1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package org.modeshape.graph.connector.federation;
25
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Iterator;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.NoSuchElementException;
34 import java.util.Set;
35 import java.util.concurrent.BlockingQueue;
36 import net.jcip.annotations.NotThreadSafe;
37 import org.modeshape.graph.ExecutionContext;
38 import org.modeshape.graph.GraphI18n;
39 import org.modeshape.graph.Location;
40 import org.modeshape.graph.connector.federation.FederatedRequest.ProjectedRequest;
41 import org.modeshape.graph.observe.Observer;
42 import org.modeshape.graph.property.DateTime;
43 import org.modeshape.graph.property.Name;
44 import org.modeshape.graph.property.Path;
45 import org.modeshape.graph.property.PathFactory;
46 import org.modeshape.graph.property.PathNotFoundException;
47 import org.modeshape.graph.property.Property;
48 import org.modeshape.graph.property.PropertyFactory;
49 import org.modeshape.graph.property.ValueComparators;
50 import org.modeshape.graph.request.AccessQueryRequest;
51 import org.modeshape.graph.request.CacheableRequest;
52 import org.modeshape.graph.request.ChangeRequest;
53 import org.modeshape.graph.request.CloneBranchRequest;
54 import org.modeshape.graph.request.CloneWorkspaceRequest;
55 import org.modeshape.graph.request.CopyBranchRequest;
56 import org.modeshape.graph.request.CreateNodeRequest;
57 import org.modeshape.graph.request.CreateWorkspaceRequest;
58 import org.modeshape.graph.request.DeleteBranchRequest;
59 import org.modeshape.graph.request.DeleteChildrenRequest;
60 import org.modeshape.graph.request.DestroyWorkspaceRequest;
61 import org.modeshape.graph.request.FullTextSearchRequest;
62 import org.modeshape.graph.request.GetWorkspacesRequest;
63 import org.modeshape.graph.request.InvalidRequestException;
64 import org.modeshape.graph.request.MoveBranchRequest;
65 import org.modeshape.graph.request.ReadAllChildrenRequest;
66 import org.modeshape.graph.request.ReadAllPropertiesRequest;
67 import org.modeshape.graph.request.ReadBranchRequest;
68 import org.modeshape.graph.request.ReadNodeRequest;
69 import org.modeshape.graph.request.ReadPropertyRequest;
70 import org.modeshape.graph.request.RemovePropertyRequest;
71 import org.modeshape.graph.request.RenameNodeRequest;
72 import org.modeshape.graph.request.Request;
73 import org.modeshape.graph.request.RequestType;
74 import org.modeshape.graph.request.SetPropertyRequest;
75 import org.modeshape.graph.request.UpdatePropertiesRequest;
76 import org.modeshape.graph.request.VerifyNodeExistsRequest;
77 import org.modeshape.graph.request.VerifyWorkspaceRequest;
78 import org.modeshape.graph.request.processor.RequestProcessor;
79
80
81
82
83 @NotThreadSafe
84 class JoinRequestProcessor extends RequestProcessor {
85
86 private final PathFactory pathFactory;
87 private final PropertyFactory propertyFactory;
88 private final JoinMirrorRequestProcessor mirrorProcessor;
89 protected FederatedRequest federatedRequest;
90
91
92
93
94
95
96
97
98
99 public JoinRequestProcessor( FederatedRepository repository,
100 ExecutionContext context,
101 Observer observer,
102 DateTime now ) {
103 super(repository.getSourceName(), context, observer, now, repository.getDefaultCachePolicy());
104
105 this.propertyFactory = context.getPropertyFactory();
106 this.pathFactory = context.getValueFactories().getPathFactory();
107
108 this.mirrorProcessor = new JoinMirrorRequestProcessor(repository.getSourceName(), context, null, now,
109 repository.getDefaultCachePolicy());
110 }
111
112
113
114
115
116
117
118
119 public void process( final Iterable<FederatedRequest> completedFederatedRequests ) {
120 for (FederatedRequest federatedRequest : completedFederatedRequests) {
121
122 process(federatedRequest);
123 }
124 }
125
126
127
128
129
130
131
132
133
134
135 public void process( final BlockingQueue<FederatedRequest> federatedRequestQueue ) {
136 FederatedRequest forked = null;
137 try {
138 for (;;) {
139 forked = federatedRequestQueue.take();
140 if (forked instanceof NoMoreFederatedRequests) return;
141
142 forked.await();
143
144 process(forked);
145 }
146 } catch (InterruptedException e) {
147
148
149
150 try {
151 if (forked != null) {
152 forked.original().cancel();
153 }
154 } finally {
155
156 Thread.interrupted();
157 }
158 }
159 }
160
161 protected final void process( FederatedRequest forked ) {
162
163 Request original = forked.original();
164 ProjectedRequest projectedRequest = forked.getFirstProjectedRequest();
165 boolean sameLocation = projectedRequest != null && !projectedRequest.hasNext() && projectedRequest.isSameLocation();
166
167
168 if (original instanceof CacheableRequest) {
169 CacheableRequest cacheableOriginal = (CacheableRequest)original;
170 cacheableOriginal.setCachePolicy(getDefaultCachePolicy());
171 while (projectedRequest != null) {
172 Request requestToSource = projectedRequest.getRequest();
173 if (cacheableOriginal != null) {
174 setCacheableInfo(cacheableOriginal, ((CacheableRequest)requestToSource).getCachePolicy());
175 }
176 projectedRequest = projectedRequest.next();
177 }
178 }
179
180
181 if (sameLocation) {
182 Request sourceRequest = forked.getFirstProjectedRequest().getRequest();
183 if (sourceRequest.hasError()) {
184 original.setError(sourceRequest.getError());
185 } else if (sourceRequest.isCancelled()) {
186 original.cancel();
187 }
188 mirrorProcessor.setFederatedRequest(forked);
189 mirrorProcessor.process(original);
190
191 if (original instanceof ChangeRequest && !original.hasError() && !original.isCancelled()) {
192 recordChange((ChangeRequest)original);
193 }
194 } else {
195 this.federatedRequest = forked;
196 process(original);
197 }
198 }
199
200
201
202
203
204
205 @Override
206 public void process( VerifyNodeExistsRequest request ) {
207 ProjectedRequest projectedRequest = federatedRequest.getFirstProjectedRequest();
208
209 request.setCachePolicy(getDefaultCachePolicy());
210 Location actualLocation = request.at();
211 int numMerged = 0;
212 while (projectedRequest != null) {
213 VerifyNodeExistsRequest readFromSource = (VerifyNodeExistsRequest)projectedRequest.getRequest();
214 if (readFromSource.hasError()) {
215 projectedRequest = projectedRequest.next();
216 continue;
217 }
218 if (readFromSource.isCancelled()) {
219 request.cancel();
220 return;
221 }
222
223
224 Location sourceLocation = readFromSource.getActualLocationOfNode();
225 actualLocation = determineActualLocation(actualLocation, sourceLocation, projectedRequest.getProjection());
226
227 if (sourceLocation.hasIdProperties()) {
228
229 for (Property propertyInSource : sourceLocation.getIdProperties()) {
230 Name name = propertyInSource.getName();
231 Property existing = actualLocation.getIdProperty(name);
232 if (existing != null) {
233
234 propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
235 }
236 actualLocation = actualLocation.with(propertyInSource);
237 }
238 }
239 setCacheableInfo(request, readFromSource.getCachePolicy());
240 projectedRequest = projectedRequest.next();
241 ++numMerged;
242 }
243 if (numMerged == 0) {
244
245 setPathNotFound(request, request.at(), federatedRequest.getFirstProjectedRequest());
246 } else {
247 request.setActualLocationOfNode(actualLocation);
248 }
249 }
250
251
252
253
254
255
256 @Override
257 public void process( ReadNodeRequest request ) {
258 Path federatedPath = request.at().getPath();
259 Map<Name, Property> properties = request.getPropertiesByName();
260 Map<Name, Integer> childSnsIndexes = new HashMap<Name, Integer>();
261 ProjectedRequest projectedRequest = federatedRequest.getFirstProjectedRequest();
262
263 request.setCachePolicy(getDefaultCachePolicy());
264 Location actualLocation = request.at();
265 int numMerged = 0;
266 while (projectedRequest != null) {
267 Request sourceRequest = projectedRequest.getRequest();
268 if (sourceRequest.hasError()) {
269 projectedRequest = projectedRequest.next();
270 continue;
271 }
272 if (sourceRequest.isCancelled()) {
273 request.cancel();
274 return;
275 }
276
277 Projection projection = projectedRequest.getProjection();
278 if (RequestType.VERIFY_NODE_EXISTS == sourceRequest.getType()) {
279
280 VerifyNodeExistsRequest verify = (VerifyNodeExistsRequest)sourceRequest;
281 Location childInSource = verify.getActualLocationOfNode();
282 Location childInRepos = getChildLocationWithCorrectSnsIndex(childInSource,
283 federatedPath,
284 childSnsIndexes,
285 projection);
286 request.addChild(childInRepos);
287 if (federatedPath == null) federatedPath = childInRepos.getPath().getParent();
288 } else {
289 ReadNodeRequest readFromSource = (ReadNodeRequest)sourceRequest;
290 Location sourceLocation = readFromSource.getActualLocationOfNode();
291 if (sourceLocation.hasIdProperties()) {
292
293 for (Property propertyInSource : sourceLocation.getIdProperties()) {
294 Name name = propertyInSource.getName();
295 Property existing = actualLocation.getIdProperty(name);
296 if (existing != null) {
297
298 propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
299 }
300 actualLocation = actualLocation.with(propertyInSource);
301 }
302 }
303
304
305 actualLocation = determineActualLocation(actualLocation, sourceLocation, projection);
306 if (federatedPath == null) federatedPath = actualLocation.getPath();
307
308
309 for (Location childInSource : readFromSource.getChildren()) {
310 request.addChild(getChildLocationWithCorrectSnsIndex(childInSource,
311 federatedPath,
312 childSnsIndexes,
313 projection));
314 }
315
316
317 for (Property propertyInSource : readFromSource.getProperties()) {
318 Name name = propertyInSource.getName();
319 Property existing = properties.get(name);
320 if (existing != null) {
321
322 propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
323 }
324 properties.put(name, propertyInSource);
325 }
326 setCacheableInfo(request, readFromSource.getCachePolicy());
327 }
328 projectedRequest = projectedRequest.next();
329 ++numMerged;
330 }
331 if (numMerged == 0) {
332
333 setPathNotFound(request, request.at(), federatedRequest.getFirstProjectedRequest());
334 } else {
335 if (!actualLocation.hasPath()) {
336 assert federatedPath != null;
337 actualLocation = actualLocation.with(federatedPath);
338 }
339 assert actualLocation.getPath() != null;
340 request.setActualLocationOfNode(actualLocation);
341 }
342 }
343
344 protected Location getChildLocationWithCorrectSnsIndex( Location childInSource,
345 Path federatedPath,
346 Map<Name, Integer> childSnsIndexes,
347 Projection projection ) {
348
349 Path childPath = childInSource.getPath();
350 if (childPath.isRoot() || federatedPath == null) {
351
352 for (Path path : projection.getPathsInRepository(childInSource.getPath(), pathFactory)) {
353 childPath = path;
354 if (federatedPath == null) federatedPath = path.getParent();
355 break;
356 }
357 }
358
359
360 Name childName = childPath.getLastSegment().getName();
361 Integer snsIndex = childSnsIndexes.get(childName);
362 if (snsIndex == null) {
363 snsIndex = new Integer(1);
364 childSnsIndexes.put(childName, snsIndex);
365 } else {
366 snsIndex = new Integer(snsIndex.intValue() + 1);
367 childSnsIndexes.put(childName, snsIndex);
368 }
369 Path newPath = pathFactory.create(federatedPath, childName, snsIndex.intValue());
370 return childInSource.with(newPath);
371 }
372
373
374
375
376
377
378
379
380
381 protected void setPathNotFound( Request original,
382 Location originalLocation,
383 ProjectedRequest projected ) {
384 Path lowestExistingInFederated = pathFactory.createRootPath();
385 while (projected != null) {
386 Request projectedRequest = projected.getRequest();
387 Throwable error = projectedRequest.getError();
388 if (error instanceof PathNotFoundException) {
389 PathNotFoundException notFound = (PathNotFoundException)error;
390 Path lowestExisting = notFound.getLowestAncestorThatDoesExist();
391
392 for (Path federatedPath : projected.getProjection().getPathsInRepository(lowestExisting, pathFactory)) {
393 if (federatedPath.isAtOrBelow(lowestExistingInFederated)) {
394 lowestExistingInFederated = federatedPath;
395 }
396 }
397 }
398 projected = projected.next();
399 }
400 original.setError(new PathNotFoundException(originalLocation, lowestExistingInFederated));
401 }
402
403
404
405
406
407
408 @Override
409 public void process( ReadAllChildrenRequest request ) {
410 Path federatedPath = request.of().getPath();
411 Map<Name, Integer> childSnsIndexes = new HashMap<Name, Integer>();
412 ProjectedRequest projectedRequest = federatedRequest.getFirstProjectedRequest();
413
414 request.setCachePolicy(getDefaultCachePolicy());
415 Location actualLocation = request.of();
416 int numMerged = 0;
417 while (projectedRequest != null) {
418 Request sourceRequest = projectedRequest.getRequest();
419 if (sourceRequest.hasError()) {
420 projectedRequest = projectedRequest.next();
421 continue;
422 }
423 if (sourceRequest.isCancelled()) {
424 request.cancel();
425 return;
426 }
427
428 Projection projection = projectedRequest.getProjection();
429 if (RequestType.VERIFY_NODE_EXISTS == sourceRequest.getType()) {
430
431 VerifyNodeExistsRequest verify = (VerifyNodeExistsRequest)sourceRequest;
432 Location childInSource = verify.getActualLocationOfNode();
433 Location childInRepos = getChildLocationWithCorrectSnsIndex(childInSource,
434 federatedPath,
435 childSnsIndexes,
436 projection);
437 request.addChild(childInRepos);
438 if (federatedPath == null) federatedPath = childInRepos.getPath().getParent();
439 } else {
440 ReadAllChildrenRequest readFromSource = (ReadAllChildrenRequest)sourceRequest;
441 Location sourceLocation = readFromSource.getActualLocationOfNode();
442 if (sourceLocation.hasIdProperties()) {
443
444 for (Property propertyInSource : sourceLocation.getIdProperties()) {
445 Name name = propertyInSource.getName();
446 Property existing = actualLocation.getIdProperty(name);
447 if (existing != null) {
448
449 propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
450 }
451 actualLocation = actualLocation.with(propertyInSource);
452 }
453 }
454
455
456 actualLocation = determineActualLocation(actualLocation, readFromSource.getActualLocationOfNode(), projection);
457 if (federatedPath == null) federatedPath = actualLocation.getPath();
458
459
460 for (Location childInSource : readFromSource.getChildren()) {
461 request.addChild(getChildLocationWithCorrectSnsIndex(childInSource,
462 federatedPath,
463 childSnsIndexes,
464 projection));
465 }
466 setCacheableInfo(request, readFromSource.getCachePolicy());
467 }
468
469 projectedRequest = projectedRequest.next();
470 ++numMerged;
471 }
472 if (numMerged == 0) {
473
474 setPathNotFound(request, request.of(), federatedRequest.getFirstProjectedRequest());
475 } else {
476 if (!actualLocation.hasPath()) {
477 assert federatedPath != null;
478 actualLocation = actualLocation.with(federatedPath);
479 }
480 request.setActualLocationOfNode(actualLocation);
481 }
482 }
483
484 protected Location determineActualLocation( Location actual,
485 Location inSource,
486 Projection projection ) {
487 if (actual.getPath() == null) {
488 if (projection == null) {
489
490 return inSource;
491 }
492
493 Path pathInSource = inSource.getPath();
494 for (Path path : projection.getPathsInRepository(pathInSource, pathFactory)) {
495 return actual.with(path);
496 }
497 }
498 return actual;
499 }
500
501 protected Location determineActualLocation( Location actualInSource,
502 Projection projection ) {
503 assert projection != null;
504
505 Path pathInSource = actualInSource.getPath();
506 for (Path path : projection.getPathsInRepository(pathInSource, pathFactory)) {
507 return actualInSource.with(path);
508 }
509 return actualInSource;
510 }
511
512
513
514
515
516
517 @Override
518 public void process( ReadAllPropertiesRequest request ) {
519 Map<Name, Property> properties = request.getPropertiesByName();
520 ProjectedRequest projectedRequest = federatedRequest.getFirstProjectedRequest();
521
522 request.setCachePolicy(getDefaultCachePolicy());
523 Location actualLocation = request.at();
524 int numMerged = 0;
525 while (projectedRequest != null) {
526 ReadAllPropertiesRequest readFromSource = (ReadAllPropertiesRequest)projectedRequest.getRequest();
527 if (readFromSource.hasError()) {
528 projectedRequest = projectedRequest.next();
529 continue;
530 }
531 if (readFromSource.isCancelled()) {
532 request.cancel();
533 return;
534 }
535
536
537 Location sourceLocation = readFromSource.getActualLocationOfNode();
538 actualLocation = determineActualLocation(actualLocation, sourceLocation, projectedRequest.getProjection());
539
540 if (sourceLocation.hasIdProperties()) {
541
542 for (Property propertyInSource : sourceLocation.getIdProperties()) {
543 Name name = propertyInSource.getName();
544 Property existing = actualLocation.getIdProperty(name);
545 if (existing != null) {
546
547 propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
548 }
549 actualLocation = actualLocation.with(propertyInSource);
550 }
551 }
552
553
554 for (Property propertyInSource : readFromSource.getProperties()) {
555 Name name = propertyInSource.getName();
556 Property existing = properties.get(name);
557 if (existing != null) {
558
559 propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
560 }
561 properties.put(name, propertyInSource);
562 }
563 setCacheableInfo(request, readFromSource.getCachePolicy());
564 projectedRequest = projectedRequest.next();
565 ++numMerged;
566 }
567 if (numMerged == 0) {
568
569 setPathNotFound(request, request.at(), federatedRequest.getFirstProjectedRequest());
570 } else {
571 request.setActualLocationOfNode(actualLocation);
572 }
573 }
574
575
576
577
578
579
580 @Override
581 public void process( ReadPropertyRequest request ) {
582 ProjectedRequest projectedRequest = federatedRequest.getFirstProjectedRequest();
583
584 request.setCachePolicy(getDefaultCachePolicy());
585 Location actualLocation = request.on();
586 int numMerged = 0;
587 while (projectedRequest != null) {
588 ReadPropertyRequest readFromSource = (ReadPropertyRequest)projectedRequest.getRequest();
589 if (readFromSource.hasError()) {
590 projectedRequest = projectedRequest.next();
591 continue;
592 }
593 if (readFromSource.isCancelled()) {
594 request.cancel();
595 return;
596 }
597
598
599 Location sourceLocation = readFromSource.getActualLocationOfNode();
600 actualLocation = determineActualLocation(actualLocation, sourceLocation, projectedRequest.getProjection());
601
602 if (sourceLocation.hasIdProperties()) {
603
604 for (Property propertyInSource : sourceLocation.getIdProperties()) {
605 Name name = propertyInSource.getName();
606 Property existing = actualLocation.getIdProperty(name);
607 if (existing != null) {
608
609 propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
610 }
611 actualLocation = actualLocation.with(propertyInSource);
612 }
613 }
614
615
616 Property read = readFromSource.getProperty();
617 if (read != null) {
618 Property existing = request.getProperty();
619 if (existing != null) {
620
621 request.setProperty(merge(existing, read, propertyFactory, true));
622 } else {
623 request.setProperty(read);
624 }
625 }
626 setCacheableInfo(request, readFromSource.getCachePolicy());
627 projectedRequest = projectedRequest.next();
628 ++numMerged;
629 }
630 if (numMerged == 0) {
631
632 setPathNotFound(request, request.on(), federatedRequest.getFirstProjectedRequest());
633 } else {
634 request.setActualLocationOfNode(actualLocation);
635 }
636 }
637
638
639
640
641
642
643 @Override
644 public void process( ReadBranchRequest request ) {
645 ProjectedRequest projectedRequest = federatedRequest.getFirstProjectedRequest();
646
647 request.setCachePolicy(getDefaultCachePolicy());
648 Location actualLocation = request.at();
649 int numMerged = 0;
650
651 Map<Path, Location> actualLocationsOfProxyNodes = new HashMap<Path, Location>();
652 while (projectedRequest != null) {
653 CacheableRequest fromSource = (CacheableRequest)projectedRequest.getRequest();
654 if (fromSource.hasError()) {
655 projectedRequest = projectedRequest.next();
656 continue;
657 }
658 if (fromSource.isCancelled()) {
659 request.cancel();
660 return;
661 }
662
663 Projection projection = projectedRequest.getProjection();
664 if (RequestType.READ_BRANCH == fromSource.getType()) {
665 ReadBranchRequest readFromSource = (ReadBranchRequest)fromSource;
666 for (Location parent : readFromSource) {
667 List<Location> children = readFromSource.getChildren(parent);
668 Map<Name, Property> properties = readFromSource.getPropertiesFor(parent);
669 projectToFederated(actualLocation, projection, request, parent, children, properties);
670 }
671 Location locationOfProxy = readFromSource.getActualLocationOfNode();
672
673 locationOfProxy = determineActualLocation(locationOfProxy, projection);
674 actualLocationsOfProxyNodes.put(locationOfProxy.getPath(), locationOfProxy);
675 }
676 setCacheableInfo(request, fromSource.getCachePolicy());
677 projectedRequest = projectedRequest.next();
678 ++numMerged;
679 }
680
681 projectedRequest = federatedRequest.getFirstProjectedRequest();
682 while (projectedRequest != null) {
683 CacheableRequest fromSource = (CacheableRequest)projectedRequest.getRequest();
684 Projection projection = projectedRequest.getProjection();
685 if (RequestType.READ_NODE == fromSource.getType()) {
686 ReadNodeRequest readFromSource = (ReadNodeRequest)fromSource;
687 Location parent = readFromSource.getActualLocationOfNode();
688 List<Location> children = readFromSource.getChildren();
689 for (int i = 0; i != children.size(); ++i) {
690 Location child = children.get(i);
691 if (!child.hasIdProperties()) {
692
693 Location actual = actualLocationsOfProxyNodes.get(child.getPath());
694 assert actual != null;
695 children.set(i, actual);
696 }
697 }
698 Map<Name, Property> properties = readFromSource.getPropertiesByName();
699 projectToFederated(actualLocation, projection, request, parent, children, properties);
700 }
701 setCacheableInfo(request, fromSource.getCachePolicy());
702 projectedRequest = projectedRequest.next();
703 }
704
705 if (numMerged == 0) {
706
707 setPathNotFound(request, request.at(), federatedRequest.getFirstProjectedRequest());
708 } else {
709 request.setActualLocationOfNode(actualLocation);
710 }
711 }
712
713
714
715
716
717
718
719
720
721
722
723 protected void projectToFederated( Location ancestorInFederation,
724 Projection projection,
725 ReadBranchRequest request,
726 Location parent,
727 List<Location> children,
728 Map<Name, Property> propertiesByName ) {
729 Path ancestorPath = ancestorInFederation.getPath();
730 if (projection == null) {
731
732 if (children != null) {
733
734 List<Location> existing = request.getChildren(parent);
735 if (existing == null) existing = new ArrayList<Location>(children.size());
736 for (Location child : children) {
737 existing.add(child);
738 }
739 request.setChildren(parent, existing);
740 }
741 if (propertiesByName != null) {
742
743 Map<Name, Property> propsByName = request.getPropertiesFor(parent);
744 if (propsByName == null) propsByName = new HashMap<Name, Property>();
745 for (Property property : propertiesByName.values()) {
746 Property existingProperty = propsByName.get(property.getName());
747 if (existingProperty != null) {
748
749 property = merge(existingProperty, property, propertyFactory, true);
750 }
751 propsByName.put(property.getName(), property);
752 }
753 request.setProperties(parent, propsByName.values());
754 }
755 return;
756 }
757 for (Path path : projection.getPathsInRepository(parent.getPath(), pathFactory)) {
758 if (!path.isAtOrBelow(ancestorPath)) continue;
759
760
761 Location parentInFederation = parent.with(path);
762 if (children != null) {
763
764 List<Location> existing = request.getChildren(parentInFederation);
765 if (existing == null) existing = new ArrayList<Location>(children.size());
766 for (Location child : children) {
767 Path childPath = pathFactory.create(path, child.getPath().getLastSegment());
768 existing.add(child.with(childPath));
769 }
770 request.setChildren(parentInFederation, existing);
771 }
772
773
774 if (propertiesByName != null) {
775 Map<Name, Property> propsByName = request.getPropertiesFor(parentInFederation);
776 if (propsByName == null) propsByName = new HashMap<Name, Property>();
777 for (Property property : propertiesByName.values()) {
778 Property existingProperty = propsByName.get(property.getName());
779 if (existingProperty != null) {
780
781 property = merge(existingProperty, property, propertyFactory, true);
782 }
783 propsByName.put(property.getName(), property);
784 }
785 request.setProperties(parentInFederation, propsByName.values());
786 }
787
788 return;
789 }
790 }
791
792
793
794
795
796
797
798
799
800
801
802
803 protected Location projectToFederated( Location ancestorInFederation,
804 Projection projection,
805 Location actualSourceLocation,
806 Request originalRequest ) {
807 Path ancestorPath = ancestorInFederation.getPath();
808 Path actualPathInSource = actualSourceLocation.getPath();
809
810 for (Path path : projection.getPathsInRepository(actualPathInSource, pathFactory)) {
811 if (path.isAtOrBelow(ancestorPath)) {
812 return actualSourceLocation.with(path);
813 }
814 }
815
816 String whereInSource = actualSourceLocation.getString(getExecutionContext().getNamespaceRegistry());
817 String msg = GraphI18n.unableToProjectSourceInformationIntoWorkspace.text(whereInSource, getSourceName(), projection);
818 originalRequest.setError(new InvalidRequestException(msg));
819 return null;
820 }
821
822
823
824
825
826
827
828
829
830
831
832 protected Location projectToFederated( Projection projection,
833 Location actualSourceLocation,
834 Request originalRequest ) {
835 Path actualPathInSource = actualSourceLocation.getPath();
836
837 for (Path path : projection.getPathsInRepository(actualPathInSource, pathFactory)) {
838 return actualSourceLocation.with(path);
839 }
840
841 String whereInSource = actualSourceLocation.getString(getExecutionContext().getNamespaceRegistry());
842 String msg = GraphI18n.unableToProjectSourceInformationIntoWorkspace.text(whereInSource, getSourceName(), projection);
843 originalRequest.setError(new InvalidRequestException(msg));
844 return null;
845 }
846
847
848
849
850
851
852 @Override
853 public void process( CreateNodeRequest request ) {
854 ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
855
856 if (checkErrorOrCancel(request, federatedRequest)) return;
857
858 Request projectedRequest = projected.getRequest();
859
860 if (checkErrorOrCancel(request, projectedRequest)) return;
861
862
863 Location sourceLocation = null;
864 if (RequestType.CREATE_NODE == projectedRequest.getType()) {
865 CreateNodeRequest source = (CreateNodeRequest)projectedRequest;
866 sourceLocation = source.getActualLocationOfNode();
867 } else if (RequestType.READ_NODE == projectedRequest.getType()) {
868
869
870
871 ReadNodeRequest source = (ReadNodeRequest)projectedRequest;
872 sourceLocation = source.getActualLocationOfNode();
873 }
874 request.setActualLocationOfNode(projectToFederated(request.under(), projected.getProjection(), sourceLocation, request));
875 }
876
877
878
879
880
881
882 @Override
883 public void process( UpdatePropertiesRequest request ) {
884 ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
885
886 if (checkErrorOrCancel(request, federatedRequest)) return;
887
888 UpdatePropertiesRequest source = (UpdatePropertiesRequest)projected.getRequest();
889 if (checkErrorOrCancel(request, source)) return;
890 Location sourceLocation = source.getActualLocationOfNode();
891 request.setActualLocationOfNode(projectToFederated(request.on(), projected.getProjection(), sourceLocation, request));
892 request.setNewProperties(source.getNewPropertyNames());
893 }
894
895
896
897
898
899
900 @Override
901 public void process( SetPropertyRequest request ) {
902 ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
903
904 if (checkErrorOrCancel(request, federatedRequest)) return;
905
906 SetPropertyRequest source = (SetPropertyRequest)projected.getRequest();
907 if (checkErrorOrCancel(request, source)) return;
908
909 Location sourceLocation = source.getActualLocationOfNode();
910 request.setActualLocationOfNode(projectToFederated(request.on(), projected.getProjection(), sourceLocation, request));
911 request.setNewProperty(source.isNewProperty());
912 }
913
914
915
916
917
918
919 @Override
920 public void process( RemovePropertyRequest request ) {
921 ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
922
923 if (checkErrorOrCancel(request, federatedRequest)) return;
924
925 SetPropertyRequest source = (SetPropertyRequest)projected.getRequest();
926 if (checkErrorOrCancel(request, source)) return;
927 Location sourceLocation = source.getActualLocationOfNode();
928 request.setActualLocationOfNode(projectToFederated(request.from(), projected.getProjection(), sourceLocation, request));
929 }
930
931
932
933
934
935
936 @Override
937 public void process( DeleteBranchRequest request ) {
938 ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
939
940 if (checkErrorOrCancel(request, federatedRequest)) return;
941
942
943 Request projectedSource = projected.getRequest();
944 if (checkErrorOrCancel(request, projectedSource)) return;
945
946
947 Location highest = null;
948 while (projected != null) {
949
950
951
952 Location actual = null;
953 Request sourceRequest = projected.getRequest();
954 if (RequestType.DELETE_BRANCH == sourceRequest.getType()) {
955 DeleteBranchRequest source = (DeleteBranchRequest)projected.getRequest();
956 actual = source.getActualLocationOfNode();
957 } else {
958 DeleteChildrenRequest source = (DeleteChildrenRequest)projected.getRequest();
959 actual = source.getActualLocationOfNode();
960 }
961 if (checkErrorOrCancel(request, sourceRequest)) return;
962 if (!projected.isSameLocation() && projected.getProjection() != null) {
963 actual = projectToFederated(request.at(), projected.getProjection(), actual, request);
964 }
965 if (highest == null) highest = actual;
966 else if (highest.getPath().isDecendantOf(actual.getPath())) highest = actual;
967 projected = projected.next();
968 }
969 assert highest != null;
970 request.setActualLocationOfNode(highest);
971 }
972
973
974
975
976
977
978 @Override
979 public void process( DeleteChildrenRequest request ) {
980 ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
981
982 if (checkErrorOrCancel(request, federatedRequest)) return;
983
984
985 Request projectedSource = projected.getRequest();
986 if (checkErrorOrCancel(request, projectedSource)) return;
987
988
989 Location highest = null;
990 while (projected != null) {
991
992 Request sourceRequest = projected.getRequest();
993 DeleteChildrenRequest source = (DeleteChildrenRequest)projected.getRequest();
994 Location actual = source.getActualLocationOfNode();
995 if (checkErrorOrCancel(request, sourceRequest)) return;
996 if (!projected.isSameLocation() && projected.getProjection() != null) {
997 actual = projectToFederated(request.at(), projected.getProjection(), actual, request);
998 }
999 if (highest == null) highest = actual;
1000 else if (highest.getPath().isDecendantOf(actual.getPath())) highest = actual;
1001 projected = projected.next();
1002 }
1003 assert highest != null;
1004 request.setActualLocationOfNode(highest);
1005 }
1006
1007
1008
1009
1010
1011
1012 @Override
1013 public void process( RenameNodeRequest request ) {
1014 ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
1015
1016 if (checkErrorOrCancel(request, federatedRequest)) return;
1017
1018 RenameNodeRequest source = (RenameNodeRequest)projected.getRequest();
1019 if (checkErrorOrCancel(request, source)) return;
1020 Location locationBefore = source.getActualLocationBefore();
1021 Location locationAfter = source.getActualLocationBefore();
1022 locationBefore = projectToFederated(request.at(), projected.getProjection(), locationBefore, request);
1023 locationAfter = projectToFederated(request.at(), projected.getSecondProjection(), locationAfter, request);
1024 request.setActualLocations(locationBefore, locationAfter);
1025 }
1026
1027
1028
1029
1030
1031
1032 @Override
1033 public void process( CopyBranchRequest request ) {
1034 ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
1035
1036 if (checkErrorOrCancel(request, federatedRequest)) return;
1037
1038 CopyBranchRequest source = (CopyBranchRequest)projected.getRequest();
1039 if (checkErrorOrCancel(request, source)) return;
1040 Location locationBefore = source.getActualLocationBefore();
1041 Location locationAfter = source.getActualLocationBefore();
1042 locationBefore = projectToFederated(request.from(), projected.getProjection(), locationBefore, request);
1043 locationAfter = projectToFederated(request.into(), projected.getSecondProjection(), locationAfter, request);
1044 request.setActualLocations(locationBefore, locationAfter);
1045 }
1046
1047
1048
1049
1050
1051
1052 @Override
1053 public void process( CloneBranchRequest request ) {
1054 ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
1055
1056 if (checkErrorOrCancel(request, federatedRequest)) return;
1057
1058 CloneBranchRequest source = (CloneBranchRequest)projected.getRequest();
1059 if (checkErrorOrCancel(request, source)) return;
1060 Location locationBefore = source.getActualLocationBefore();
1061 Location locationAfter = source.getActualLocationBefore();
1062 locationBefore = projectToFederated(request.from(), projected.getProjection(), locationBefore, request);
1063 locationAfter = projectToFederated(request.into(), projected.getSecondProjection(), locationAfter, request);
1064 request.setActualLocations(locationBefore, locationAfter);
1065 if (source.removeExisting()) {
1066 Set<Location> removed = new HashSet<Location>();
1067 for (Location location : request.getRemovedNodes()) {
1068 removed.add(projectToFederated(projected.getSecondProjection(), location, request));
1069 }
1070 request.setRemovedNodes(Collections.unmodifiableSet(removed));
1071 }
1072 }
1073
1074
1075
1076
1077
1078
1079 @Override
1080 public void process( MoveBranchRequest request ) {
1081 ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
1082
1083 if (checkErrorOrCancel(request, federatedRequest)) return;
1084
1085 MoveBranchRequest source = (MoveBranchRequest)projected.getRequest();
1086 if (checkErrorOrCancel(request, source)) return;
1087 Location locationBefore = source.getActualLocationBefore();
1088 Location locationAfter = source.getActualLocationBefore();
1089 locationBefore = projectToFederated(request.from(), projected.getProjection(), locationBefore, request);
1090 Projection afterProjection = projected.getSecondProjection();
1091 if (afterProjection == null) projected.getProjection();
1092 locationAfter = projectToFederated(request.into(), afterProjection, locationAfter, request);
1093 request.setActualLocations(locationBefore, locationAfter);
1094 }
1095
1096
1097
1098
1099
1100
1101 @Override
1102 public void process( VerifyWorkspaceRequest request ) {
1103 ProjectedRequest projectedRequest = federatedRequest.getFirstProjectedRequest();
1104
1105 Location actualLocation = Location.create(getExecutionContext().getValueFactories().getPathFactory().createRootPath());
1106 while (projectedRequest != null) {
1107 VerifyNodeExistsRequest readFromSource = (VerifyNodeExistsRequest)projectedRequest.getRequest();
1108 if (readFromSource.hasError()) {
1109 request.setError(readFromSource.getError());
1110 return;
1111 }
1112 request.setError(null);
1113 if (readFromSource.isCancelled()) {
1114 request.cancel();
1115 return;
1116 }
1117
1118 Location sourceLocation = readFromSource.getActualLocationOfNode();
1119 if (sourceLocation.hasIdProperties()) {
1120
1121 for (Property propertyInSource : sourceLocation.getIdProperties()) {
1122 Name name = propertyInSource.getName();
1123 Property existing = actualLocation.getIdProperty(name);
1124 if (existing != null) {
1125
1126 propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
1127 }
1128 actualLocation = actualLocation.with(propertyInSource);
1129 }
1130 }
1131 projectedRequest = projectedRequest.next();
1132 }
1133 request.setActualRootLocation(actualLocation);
1134 }
1135
1136
1137
1138
1139
1140
1141 @Override
1142 public void process( GetWorkspacesRequest request ) {
1143 throw new UnsupportedOperationException();
1144 }
1145
1146
1147
1148
1149
1150
1151 @Override
1152 public void process( CreateWorkspaceRequest request ) {
1153 throw new UnsupportedOperationException();
1154 }
1155
1156
1157
1158
1159
1160
1161 @Override
1162 public void process( CloneWorkspaceRequest request ) {
1163 throw new UnsupportedOperationException();
1164 }
1165
1166
1167
1168
1169
1170
1171 @Override
1172 public void process( DestroyWorkspaceRequest request ) {
1173 throw new UnsupportedOperationException();
1174 }
1175
1176
1177
1178
1179
1180
1181 @Override
1182 public void process( AccessQueryRequest request ) {
1183 throw new UnsupportedOperationException();
1184 }
1185
1186
1187
1188
1189
1190
1191 @Override
1192 public void process( FullTextSearchRequest request ) {
1193 throw new UnsupportedOperationException();
1194 }
1195
1196 protected boolean checkErrorOrCancel( Request request,
1197 FederatedRequest federatedRequest ) {
1198 if (federatedRequest.getFirstProjectedRequest() == null) {
1199 Request original = federatedRequest.original();
1200 if (original.hasError()) {
1201
1202 request.setError(original.getError());
1203 return true;
1204 }
1205 assert original.isCancelled();
1206 request.cancel();
1207 return true;
1208 }
1209 return false;
1210 }
1211
1212 protected boolean checkErrorOrCancel( Request request,
1213 Request sourceRequest ) {
1214 if (sourceRequest.hasError()) {
1215 request.setError(sourceRequest.getError());
1216 return true;
1217 }
1218 if (sourceRequest.isCancelled()) {
1219 request.cancel();
1220 return true;
1221 }
1222 return false;
1223 }
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243 protected Property merge( Property property1,
1244 Property property2,
1245 PropertyFactory factory,
1246 boolean removeDuplicates ) {
1247 assert property1 != null;
1248 assert property2 != null;
1249 assert property1.getName().equals(property2.getName());
1250 if (property1.isEmpty()) return property2;
1251 if (property2.isEmpty()) return property1;
1252
1253
1254 if (property1.isSingle() && property2.isSingle()) {
1255 Object value1 = property1.getValues().next();
1256 Object value2 = property2.getValues().next();
1257 if (removeDuplicates && ValueComparators.OBJECT_COMPARATOR.compare(value1, value2) == 0) return property1;
1258 return factory.create(property1.getName(), new Object[] {value1, value2});
1259 }
1260
1261
1262 if (!removeDuplicates) {
1263 Iterator<?> valueIterator = new DualIterator(property1.getValues(), property2.getValues());
1264 return factory.create(property1.getName(), valueIterator);
1265 }
1266
1267
1268 Object[] values = new Object[property1.size() + property2.size()];
1269 int index = 0;
1270 for (Object property1Value : property1) {
1271 values[index++] = property1Value;
1272 }
1273 assert index == property1.size();
1274
1275 for (Object property2Value : property2) {
1276
1277 boolean matched = false;
1278 for (Object property1Value : property1) {
1279 if (ValueComparators.OBJECT_COMPARATOR.compare(property1Value, property2Value) == 0) {
1280
1281 matched = true;
1282 break;
1283 }
1284 }
1285 if (!matched) values[index++] = property2Value;
1286 }
1287 if (index != values.length) {
1288 Object[] newValues = new Object[index];
1289 System.arraycopy(values, 0, newValues, 0, index);
1290 values = newValues;
1291 }
1292 return factory.create(property1.getName(), values);
1293 }
1294
1295 protected static class DualIterator implements Iterator<Object> {
1296
1297 private final Iterator<?>[] iterators;
1298 private Iterator<?> current;
1299 private int index = 0;
1300
1301 protected DualIterator( Iterator<?>... iterators ) {
1302 assert iterators != null;
1303 assert iterators.length > 0;
1304 this.iterators = iterators;
1305 this.current = this.iterators[0];
1306 }
1307
1308
1309
1310
1311
1312
1313 public boolean hasNext() {
1314 if (this.current != null) return this.current.hasNext();
1315 return false;
1316 }
1317
1318
1319
1320
1321
1322
1323 public Object next() {
1324 while (this.current != null) {
1325 if (this.current.hasNext()) return this.current.next();
1326
1327 if (++this.index < iterators.length) {
1328 this.current = this.iterators[this.index];
1329 } else {
1330 this.current = null;
1331 }
1332 }
1333 throw new NoSuchElementException();
1334 }
1335
1336
1337
1338
1339
1340
1341 public void remove() {
1342 throw new UnsupportedOperationException();
1343 }
1344 }
1345
1346 }