/*
 * ModeShape (http://www.modeshape.org)
 * See the COPYRIGHT.txt file distributed with this work for information
 * regarding copyright ownership.  Some portions may be licensed
 * to Red Hat, Inc. under one or more contributor license agreements.
 * See the AUTHORS.txt file in the distribution for a full listing of
 * individual contributors.
 *
 * Unless otherwise indicated, all code in ModeShape is licensed
 * to you under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * ModeShape is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
24  package org.modeshape.graph.connector.federation;
25  
26  import java.util.ArrayList;
27  import java.util.Collections;
28  import java.util.HashMap;
29  import java.util.HashSet;
30  import java.util.Iterator;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.NoSuchElementException;
34  import java.util.Set;
35  import java.util.concurrent.BlockingQueue;
36  import net.jcip.annotations.NotThreadSafe;
37  import org.modeshape.graph.ExecutionContext;
38  import org.modeshape.graph.GraphI18n;
39  import org.modeshape.graph.Location;
40  import org.modeshape.graph.connector.federation.FederatedRequest.ProjectedRequest;
41  import org.modeshape.graph.observe.Observer;
42  import org.modeshape.graph.property.DateTime;
43  import org.modeshape.graph.property.Name;
44  import org.modeshape.graph.property.Path;
45  import org.modeshape.graph.property.PathFactory;
46  import org.modeshape.graph.property.PathNotFoundException;
47  import org.modeshape.graph.property.Property;
48  import org.modeshape.graph.property.PropertyFactory;
49  import org.modeshape.graph.property.ValueComparators;
50  import org.modeshape.graph.request.AccessQueryRequest;
51  import org.modeshape.graph.request.CacheableRequest;
52  import org.modeshape.graph.request.ChangeRequest;
53  import org.modeshape.graph.request.CloneBranchRequest;
54  import org.modeshape.graph.request.CloneWorkspaceRequest;
55  import org.modeshape.graph.request.CopyBranchRequest;
56  import org.modeshape.graph.request.CreateNodeRequest;
57  import org.modeshape.graph.request.CreateWorkspaceRequest;
58  import org.modeshape.graph.request.DeleteBranchRequest;
59  import org.modeshape.graph.request.DeleteChildrenRequest;
60  import org.modeshape.graph.request.DestroyWorkspaceRequest;
61  import org.modeshape.graph.request.FullTextSearchRequest;
62  import org.modeshape.graph.request.GetWorkspacesRequest;
63  import org.modeshape.graph.request.InvalidRequestException;
64  import org.modeshape.graph.request.MoveBranchRequest;
65  import org.modeshape.graph.request.ReadAllChildrenRequest;
66  import org.modeshape.graph.request.ReadAllPropertiesRequest;
67  import org.modeshape.graph.request.ReadBranchRequest;
68  import org.modeshape.graph.request.ReadNodeRequest;
69  import org.modeshape.graph.request.ReadPropertyRequest;
70  import org.modeshape.graph.request.RemovePropertyRequest;
71  import org.modeshape.graph.request.RenameNodeRequest;
72  import org.modeshape.graph.request.Request;
73  import org.modeshape.graph.request.RequestType;
74  import org.modeshape.graph.request.SetPropertyRequest;
75  import org.modeshape.graph.request.UpdatePropertiesRequest;
76  import org.modeshape.graph.request.VerifyNodeExistsRequest;
77  import org.modeshape.graph.request.VerifyWorkspaceRequest;
78  import org.modeshape.graph.request.processor.RequestProcessor;
79  
80  /**
81   * A {@link RequestProcessor} that performs the join portion of the fork-join operation.
82   */
83  @NotThreadSafe
84  class JoinRequestProcessor extends RequestProcessor {
85  
86      private final PathFactory pathFactory;
87      private final PropertyFactory propertyFactory;
88      private final JoinMirrorRequestProcessor mirrorProcessor;
89      protected FederatedRequest federatedRequest;
90  
91      /**
92       * Create a new join processor
93       * 
94       * @param repository the federated repository configuration; never null
95       * @param context the execution context in which this processor is executing; may not be null
96       * @param observer the observer for change events; may be null
97       * @param now the timestamp representing the current time in UTC; may not be null
98       */
99      public JoinRequestProcessor( FederatedRepository repository,
100                                  ExecutionContext context,
101                                  Observer observer,
102                                  DateTime now ) {
103         super(repository.getSourceName(), context, observer, now, repository.getDefaultCachePolicy());
104         // this.repository = repository;
105         this.propertyFactory = context.getPropertyFactory();
106         this.pathFactory = context.getValueFactories().getPathFactory();
107         // The mirror processor should never send anything to an observer, since all requests go to this processor's observer
108         this.mirrorProcessor = new JoinMirrorRequestProcessor(repository.getSourceName(), context, null, now,
109                                                               repository.getDefaultCachePolicy());
110     }
111 
112     /**
113      * Process all of the {@link FederatedRequest} objects that are in the supplied collection.
114      * 
115      * @param completedFederatedRequests the collection of {@link FederatedRequest} whose projected requests have already been
116      *        processed; may not be null
117      * @see FederatedRepositoryConnection#execute(ExecutionContext, org.modeshape.graph.request.Request)
118      */
119     public void process( final Iterable<FederatedRequest> completedFederatedRequests ) {
120         for (FederatedRequest federatedRequest : completedFederatedRequests) {
121             // No need to await for the forked request, since it will be done
122             process(federatedRequest);
123         }
124     }
125 
126     /**
127      * Process the {@link FederatedRequest} objects that are in the supplied queue. The queue contains {@link FederatedRequest}
128      * that may have projected requests that have not yet been processed by the respective source, so this method
129      * {@link FederatedRequest#await() waits} until all source requests have been processed. This method returns only when it
130      * obtains from the queue a {@link NoMoreFederatedRequests} instance.
131      * 
132      * @param federatedRequestQueue the queue containing the federated requests; may not be null
133      * @see FederatedRepositoryConnection#execute(ExecutionContext, org.modeshape.graph.request.Request)
134      */
135     public void process( final BlockingQueue<FederatedRequest> federatedRequestQueue ) {
136         FederatedRequest forked = null;
137         try {
138             for (;;) {
139                 forked = federatedRequestQueue.take();
140                 if (forked instanceof NoMoreFederatedRequests) return;
141                 // Block until this forked request has completed
142                 forked.await();
143                 // Now process ...
144                 process(forked);
145             }
146         } catch (InterruptedException e) {
147             // This happens when the federated connector has been told to shutdown now, and it shuts down
148             // its executor (the worker pool) immediately by interrupting each in-use thread.
149             // In this case, we should cancel the current request but should NOT iterate over any remaining requests.
150             try {
151                 if (forked != null) {
152                     forked.original().cancel();
153                 }
154             } finally {
155                 // Clear the interrupted status of the thread ...
156                 Thread.interrupted();
157             }
158         }
159     }
160 
161     protected final void process( FederatedRequest forked ) {
162         // Determine whether this is a single mirror request ...
163         Request original = forked.original();
164         ProjectedRequest projectedRequest = forked.getFirstProjectedRequest();
165         boolean sameLocation = projectedRequest != null && !projectedRequest.hasNext() && projectedRequest.isSameLocation();
166 
167         // Set the cachable information ...
168         if (original instanceof CacheableRequest) {
169             CacheableRequest cacheableOriginal = (CacheableRequest)original;
170             cacheableOriginal.setCachePolicy(getDefaultCachePolicy());
171             while (projectedRequest != null) {
172                 Request requestToSource = projectedRequest.getRequest();
173                 if (cacheableOriginal != null) {
174                     setCacheableInfo(cacheableOriginal, ((CacheableRequest)requestToSource).getCachePolicy());
175                 }
176                 projectedRequest = projectedRequest.next();
177             }
178         }
179 
180         // Now do the join on this request ...
181         if (sameLocation) {
182             Request sourceRequest = forked.getFirstProjectedRequest().getRequest();
183             if (sourceRequest.hasError()) {
184                 original.setError(sourceRequest.getError());
185             } else if (sourceRequest.isCancelled()) {
186                 original.cancel();
187             }
188             mirrorProcessor.setFederatedRequest(forked);
189             mirrorProcessor.process(original);
190             // If this is a change request, record it on this processor so it goes to the observer ...
191             if (original instanceof ChangeRequest && !original.hasError() && !original.isCancelled()) {
192                 recordChange((ChangeRequest)original);
193             }
194         } else {
195             this.federatedRequest = forked;
196             process(original);
197         }
198     }
199 
200     /**
201      * {@inheritDoc}
202      * 
203      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.VerifyNodeExistsRequest)
204      */
205     @Override
206     public void process( VerifyNodeExistsRequest request ) {
207         ProjectedRequest projectedRequest = federatedRequest.getFirstProjectedRequest();
208 
209         request.setCachePolicy(getDefaultCachePolicy());
210         Location actualLocation = request.at();
211         int numMerged = 0;
212         while (projectedRequest != null) {
213             VerifyNodeExistsRequest readFromSource = (VerifyNodeExistsRequest)projectedRequest.getRequest();
214             if (readFromSource.hasError()) {
215                 projectedRequest = projectedRequest.next();
216                 continue;
217             }
218             if (readFromSource.isCancelled()) {
219                 request.cancel();
220                 return;
221             }
222 
223             // Make sure we have an actual location ...
224             Location sourceLocation = readFromSource.getActualLocationOfNode();
225             actualLocation = determineActualLocation(actualLocation, sourceLocation, projectedRequest.getProjection());
226 
227             if (sourceLocation.hasIdProperties()) {
228                 // Accumulate the identification properties ...
229                 for (Property propertyInSource : sourceLocation.getIdProperties()) {
230                     Name name = propertyInSource.getName();
231                     Property existing = actualLocation.getIdProperty(name);
232                     if (existing != null) {
233                         // Merge the property values ...
234                         propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
235                     }
236                     actualLocation = actualLocation.with(propertyInSource);
237                 }
238             }
239             setCacheableInfo(request, readFromSource.getCachePolicy());
240             projectedRequest = projectedRequest.next();
241             ++numMerged;
242         }
243         if (numMerged == 0) {
244             // No source requests had results ...
245             setPathNotFound(request, request.at(), federatedRequest.getFirstProjectedRequest());
246         } else {
247             request.setActualLocationOfNode(actualLocation);
248         }
249     }
250 
251     /**
252      * {@inheritDoc}
253      * 
254      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.ReadNodeRequest)
255      */
256     @Override
257     public void process( ReadNodeRequest request ) {
258         Path federatedPath = request.at().getPath();
259         Map<Name, Property> properties = request.getPropertiesByName();
260         Map<Name, Integer> childSnsIndexes = new HashMap<Name, Integer>();
261         ProjectedRequest projectedRequest = federatedRequest.getFirstProjectedRequest();
262 
263         request.setCachePolicy(getDefaultCachePolicy());
264         Location actualLocation = request.at();
265         int numMerged = 0;
266         while (projectedRequest != null) {
267             Request sourceRequest = projectedRequest.getRequest();
268             if (sourceRequest.hasError()) {
269                 projectedRequest = projectedRequest.next();
270                 continue;
271             }
272             if (sourceRequest.isCancelled()) {
273                 request.cancel();
274                 return;
275             }
276 
277             Projection projection = projectedRequest.getProjection();
278             if (RequestType.VERIFY_NODE_EXISTS == sourceRequest.getType()) {
279                 // We needed to verify the existance of a child node ...
280                 VerifyNodeExistsRequest verify = (VerifyNodeExistsRequest)sourceRequest;
281                 Location childInSource = verify.getActualLocationOfNode();
282                 Location childInRepos = getChildLocationWithCorrectSnsIndex(childInSource,
283                                                                             federatedPath,
284                                                                             childSnsIndexes,
285                                                                             projection);
286                 request.addChild(childInRepos);
287                 if (federatedPath == null) federatedPath = childInRepos.getPath().getParent();
288             } else {
289                 ReadNodeRequest readFromSource = (ReadNodeRequest)sourceRequest;
290                 Location sourceLocation = readFromSource.getActualLocationOfNode();
291                 if (sourceLocation.hasIdProperties()) {
292                     // Accumulate the identification properties ...
293                     for (Property propertyInSource : sourceLocation.getIdProperties()) {
294                         Name name = propertyInSource.getName();
295                         Property existing = actualLocation.getIdProperty(name);
296                         if (existing != null) {
297                             // Merge the property values ...
298                             propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
299                         }
300                         actualLocation = actualLocation.with(propertyInSource);
301                     }
302                 }
303 
304                 // Make sure we have an actual location ...
305                 actualLocation = determineActualLocation(actualLocation, sourceLocation, projection);
306                 if (federatedPath == null) federatedPath = actualLocation.getPath();
307 
308                 // Add all the children from the source ...
309                 for (Location childInSource : readFromSource.getChildren()) {
310                     request.addChild(getChildLocationWithCorrectSnsIndex(childInSource,
311                                                                          federatedPath,
312                                                                          childSnsIndexes,
313                                                                          projection));
314                 }
315 
316                 // Add all the properties ...
317                 for (Property propertyInSource : readFromSource.getProperties()) {
318                     Name name = propertyInSource.getName();
319                     Property existing = properties.get(name);
320                     if (existing != null) {
321                         // Merge the property values ...
322                         propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
323                     }
324                     properties.put(name, propertyInSource);
325                 }
326                 setCacheableInfo(request, readFromSource.getCachePolicy());
327             }
328             projectedRequest = projectedRequest.next();
329             ++numMerged;
330         }
331         if (numMerged == 0) {
332             // No source requests had results ...
333             setPathNotFound(request, request.at(), federatedRequest.getFirstProjectedRequest());
334         } else {
335             if (!actualLocation.hasPath()) {
336                 assert federatedPath != null;
337                 actualLocation = actualLocation.with(federatedPath);
338             }
339             assert actualLocation.getPath() != null;
340             request.setActualLocationOfNode(actualLocation);
341         }
342     }
343 
344     protected Location getChildLocationWithCorrectSnsIndex( Location childInSource,
345                                                             Path federatedPath,
346                                                             Map<Name, Integer> childSnsIndexes,
347                                                             Projection projection ) {
348         // Project back into the federated repository ...
349         Path childPath = childInSource.getPath();
350         if (childPath.isRoot() || federatedPath == null) {
351             // We've lost the name of the child, so we need to recompute the path ...
352             for (Path path : projection.getPathsInRepository(childInSource.getPath(), pathFactory)) {
353                 childPath = path;
354                 if (federatedPath == null) federatedPath = path.getParent();
355                 break;
356             }
357         }
358 
359         // Correct the same-name-sibling index for the child ...
360         Name childName = childPath.getLastSegment().getName();
361         Integer snsIndex = childSnsIndexes.get(childName);
362         if (snsIndex == null) {
363             snsIndex = new Integer(1);
364             childSnsIndexes.put(childName, snsIndex);
365         } else {
366             snsIndex = new Integer(snsIndex.intValue() + 1);
367             childSnsIndexes.put(childName, snsIndex);
368         }
369         Path newPath = pathFactory.create(federatedPath, childName, snsIndex.intValue());
370         return childInSource.with(newPath);
371     }
372 
373     /**
374      * Sets the request {@link Request#setError(Throwable) error} to a {@link PathNotFoundException} that has the lowest existing
375      * ancestor computed from the {@link PathNotFoundException}s in the projected requests.
376      * 
377      * @param original
378      * @param originalLocation
379      * @param projected
380      */
381     protected void setPathNotFound( Request original,
382                                     Location originalLocation,
383                                     ProjectedRequest projected ) {
384         Path lowestExistingInFederated = pathFactory.createRootPath();
385         while (projected != null) {
386             Request projectedRequest = projected.getRequest();
387             Throwable error = projectedRequest.getError();
388             if (error instanceof PathNotFoundException) {
389                 PathNotFoundException notFound = (PathNotFoundException)error;
390                 Path lowestExisting = notFound.getLowestAncestorThatDoesExist();
391                 // Project back to the repository level ...
392                 for (Path federatedPath : projected.getProjection().getPathsInRepository(lowestExisting, pathFactory)) {
393                     if (federatedPath.isAtOrBelow(lowestExistingInFederated)) {
394                         lowestExistingInFederated = federatedPath;
395                     }
396                 }
397             }
398             projected = projected.next();
399         }
400         original.setError(new PathNotFoundException(originalLocation, lowestExistingInFederated));
401     }
402 
403     /**
404      * {@inheritDoc}
405      * 
406      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.ReadAllChildrenRequest)
407      */
408     @Override
409     public void process( ReadAllChildrenRequest request ) {
410         Path federatedPath = request.of().getPath();
411         Map<Name, Integer> childSnsIndexes = new HashMap<Name, Integer>();
412         ProjectedRequest projectedRequest = federatedRequest.getFirstProjectedRequest();
413 
414         request.setCachePolicy(getDefaultCachePolicy());
415         Location actualLocation = request.of();
416         int numMerged = 0;
417         while (projectedRequest != null) {
418             Request sourceRequest = projectedRequest.getRequest();
419             if (sourceRequest.hasError()) {
420                 projectedRequest = projectedRequest.next();
421                 continue;
422             }
423             if (sourceRequest.isCancelled()) {
424                 request.cancel();
425                 return;
426             }
427 
428             Projection projection = projectedRequest.getProjection();
429             if (RequestType.VERIFY_NODE_EXISTS == sourceRequest.getType()) {
430                 // We needed to verify the existance of a child node ...
431                 VerifyNodeExistsRequest verify = (VerifyNodeExistsRequest)sourceRequest;
432                 Location childInSource = verify.getActualLocationOfNode();
433                 Location childInRepos = getChildLocationWithCorrectSnsIndex(childInSource,
434                                                                             federatedPath,
435                                                                             childSnsIndexes,
436                                                                             projection);
437                 request.addChild(childInRepos);
438                 if (federatedPath == null) federatedPath = childInRepos.getPath().getParent();
439             } else {
440                 ReadAllChildrenRequest readFromSource = (ReadAllChildrenRequest)sourceRequest;
441                 Location sourceLocation = readFromSource.getActualLocationOfNode();
442                 if (sourceLocation.hasIdProperties()) {
443                     // Accumulate the identification properties ...
444                     for (Property propertyInSource : sourceLocation.getIdProperties()) {
445                         Name name = propertyInSource.getName();
446                         Property existing = actualLocation.getIdProperty(name);
447                         if (existing != null) {
448                             // Merge the property values ...
449                             propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
450                         }
451                         actualLocation = actualLocation.with(propertyInSource);
452                     }
453                 }
454 
455                 // Make sure we have an actual location ...
456                 actualLocation = determineActualLocation(actualLocation, readFromSource.getActualLocationOfNode(), projection);
457                 if (federatedPath == null) federatedPath = actualLocation.getPath();
458 
459                 // Add all the children from the source ...
460                 for (Location childInSource : readFromSource.getChildren()) {
461                     request.addChild(getChildLocationWithCorrectSnsIndex(childInSource,
462                                                                          federatedPath,
463                                                                          childSnsIndexes,
464                                                                          projection));
465                 }
466                 setCacheableInfo(request, readFromSource.getCachePolicy());
467             }
468 
469             projectedRequest = projectedRequest.next();
470             ++numMerged;
471         }
472         if (numMerged == 0) {
473             // No source requests had results ...
474             setPathNotFound(request, request.of(), federatedRequest.getFirstProjectedRequest());
475         } else {
476             if (!actualLocation.hasPath()) {
477                 assert federatedPath != null;
478                 actualLocation = actualLocation.with(federatedPath);
479             }
480             request.setActualLocationOfNode(actualLocation);
481         }
482     }
483 
484     protected Location determineActualLocation( Location actual,
485                                                 Location inSource,
486                                                 Projection projection ) {
487         if (actual.getPath() == null) {
488             if (projection == null) {
489                 // It must be a placeholder node ...
490                 return inSource;
491             }
492             // Get the projection from the source-specific location ...
493             Path pathInSource = inSource.getPath();
494             for (Path path : projection.getPathsInRepository(pathInSource, pathFactory)) {
495                 return actual.with(path);
496             }
497         }
498         return actual;
499     }
500 
501     protected Location determineActualLocation( Location actualInSource,
502                                                 Projection projection ) {
503         assert projection != null;
504         // Get the projection from the source-specific location ...
505         Path pathInSource = actualInSource.getPath();
506         for (Path path : projection.getPathsInRepository(pathInSource, pathFactory)) {
507             return actualInSource.with(path);
508         }
509         return actualInSource;
510     }
511 
512     /**
513      * {@inheritDoc}
514      * 
515      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.ReadAllPropertiesRequest)
516      */
517     @Override
518     public void process( ReadAllPropertiesRequest request ) {
519         Map<Name, Property> properties = request.getPropertiesByName();
520         ProjectedRequest projectedRequest = federatedRequest.getFirstProjectedRequest();
521 
522         request.setCachePolicy(getDefaultCachePolicy());
523         Location actualLocation = request.at();
524         int numMerged = 0;
525         while (projectedRequest != null) {
526             ReadAllPropertiesRequest readFromSource = (ReadAllPropertiesRequest)projectedRequest.getRequest();
527             if (readFromSource.hasError()) {
528                 projectedRequest = projectedRequest.next();
529                 continue;
530             }
531             if (readFromSource.isCancelled()) {
532                 request.cancel();
533                 return;
534             }
535 
536             // Make sure we have an actual location ...
537             Location sourceLocation = readFromSource.getActualLocationOfNode();
538             actualLocation = determineActualLocation(actualLocation, sourceLocation, projectedRequest.getProjection());
539 
540             if (sourceLocation.hasIdProperties()) {
541                 // Accumulate the identification properties ...
542                 for (Property propertyInSource : sourceLocation.getIdProperties()) {
543                     Name name = propertyInSource.getName();
544                     Property existing = actualLocation.getIdProperty(name);
545                     if (existing != null) {
546                         // Merge the property values ...
547                         propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
548                     }
549                     actualLocation = actualLocation.with(propertyInSource);
550                 }
551             }
552 
553             // Add all the properties ...
554             for (Property propertyInSource : readFromSource.getProperties()) {
555                 Name name = propertyInSource.getName();
556                 Property existing = properties.get(name);
557                 if (existing != null) {
558                     // Merge the property values ...
559                     propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
560                 }
561                 properties.put(name, propertyInSource);
562             }
563             setCacheableInfo(request, readFromSource.getCachePolicy());
564             projectedRequest = projectedRequest.next();
565             ++numMerged;
566         }
567         if (numMerged == 0) {
568             // No source requests had results ...
569             setPathNotFound(request, request.at(), federatedRequest.getFirstProjectedRequest());
570         } else {
571             request.setActualLocationOfNode(actualLocation);
572         }
573     }
574 
575     /**
576      * {@inheritDoc}
577      * 
578      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.ReadPropertyRequest)
579      */
580     @Override
581     public void process( ReadPropertyRequest request ) {
582         ProjectedRequest projectedRequest = federatedRequest.getFirstProjectedRequest();
583 
584         request.setCachePolicy(getDefaultCachePolicy());
585         Location actualLocation = request.on();
586         int numMerged = 0;
587         while (projectedRequest != null) {
588             ReadPropertyRequest readFromSource = (ReadPropertyRequest)projectedRequest.getRequest();
589             if (readFromSource.hasError()) {
590                 projectedRequest = projectedRequest.next();
591                 continue;
592             }
593             if (readFromSource.isCancelled()) {
594                 request.cancel();
595                 return;
596             }
597 
598             // Make sure we have an actual location ...
599             Location sourceLocation = readFromSource.getActualLocationOfNode();
600             actualLocation = determineActualLocation(actualLocation, sourceLocation, projectedRequest.getProjection());
601 
602             if (sourceLocation.hasIdProperties()) {
603                 // Accumulate the identification properties ...
604                 for (Property propertyInSource : sourceLocation.getIdProperties()) {
605                     Name name = propertyInSource.getName();
606                     Property existing = actualLocation.getIdProperty(name);
607                     if (existing != null) {
608                         // Merge the property values ...
609                         propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
610                     }
611                     actualLocation = actualLocation.with(propertyInSource);
612                 }
613             }
614 
615             // Add all the properties ...
616             Property read = readFromSource.getProperty();
617             if (read != null) {
618                 Property existing = request.getProperty();
619                 if (existing != null) {
620                     // Merge the property values ...
621                     request.setProperty(merge(existing, read, propertyFactory, true));
622                 } else {
623                     request.setProperty(read);
624                 }
625             }
626             setCacheableInfo(request, readFromSource.getCachePolicy());
627             projectedRequest = projectedRequest.next();
628             ++numMerged;
629         }
630         if (numMerged == 0) {
631             // No source requests had results ...
632             setPathNotFound(request, request.on(), federatedRequest.getFirstProjectedRequest());
633         } else {
634             request.setActualLocationOfNode(actualLocation);
635         }
636     }
637 
638     /**
639      * {@inheritDoc}
640      * 
641      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.ReadBranchRequest)
642      */
643     @Override
644     public void process( ReadBranchRequest request ) {
645         ProjectedRequest projectedRequest = federatedRequest.getFirstProjectedRequest();
646 
647         request.setCachePolicy(getDefaultCachePolicy());
648         Location actualLocation = request.at();
649         int numMerged = 0;
650         // The first pass will only capture the actual ReadBranchRequests to the underlying sources ...
651         Map<Path, Location> actualLocationsOfProxyNodes = new HashMap<Path, Location>();
652         while (projectedRequest != null) {
653             CacheableRequest fromSource = (CacheableRequest)projectedRequest.getRequest();
654             if (fromSource.hasError()) {
655                 projectedRequest = projectedRequest.next();
656                 continue;
657             }
658             if (fromSource.isCancelled()) {
659                 request.cancel();
660                 return;
661             }
662 
663             Projection projection = projectedRequest.getProjection();
664             if (RequestType.READ_BRANCH == fromSource.getType()) {
665                 ReadBranchRequest readFromSource = (ReadBranchRequest)fromSource;
666                 for (Location parent : readFromSource) {
667                     List<Location> children = readFromSource.getChildren(parent);
668                     Map<Name, Property> properties = readFromSource.getPropertiesFor(parent);
669                     projectToFederated(actualLocation, projection, request, parent, children, properties);
670                 }
671                 Location locationOfProxy = readFromSource.getActualLocationOfNode();
672                 // The location is in terms of the source, so get the projected location ...
673                 locationOfProxy = determineActualLocation(locationOfProxy, projection);
674                 actualLocationsOfProxyNodes.put(locationOfProxy.getPath(), locationOfProxy);
675             }
676             setCacheableInfo(request, fromSource.getCachePolicy());
677             projectedRequest = projectedRequest.next();
678             ++numMerged;
679         }
680         // Go through the requests and process the ReadNodeRequests (which were reading children of placeholders)...
681         projectedRequest = federatedRequest.getFirstProjectedRequest();
682         while (projectedRequest != null) {
683             CacheableRequest fromSource = (CacheableRequest)projectedRequest.getRequest();
684             Projection projection = projectedRequest.getProjection();
685             if (RequestType.READ_NODE == fromSource.getType()) {
686                 ReadNodeRequest readFromSource = (ReadNodeRequest)fromSource;
687                 Location parent = readFromSource.getActualLocationOfNode();
688                 List<Location> children = readFromSource.getChildren();
689                 for (int i = 0; i != children.size(); ++i) {
690                     Location child = children.get(i);
691                     if (!child.hasIdProperties()) {
692                         // The the child must have been a proxy node ...
693                         Location actual = actualLocationsOfProxyNodes.get(child.getPath());
694                         assert actual != null;
695                         children.set(i, actual);
696                     }
697                 }
698                 Map<Name, Property> properties = readFromSource.getPropertiesByName();
699                 projectToFederated(actualLocation, projection, request, parent, children, properties);
700             }
701             setCacheableInfo(request, fromSource.getCachePolicy());
702             projectedRequest = projectedRequest.next();
703         }
704 
705         if (numMerged == 0) {
706             // No source requests had results ...
707             setPathNotFound(request, request.at(), federatedRequest.getFirstProjectedRequest());
708         } else {
709             request.setActualLocationOfNode(actualLocation);
710         }
711     }
712 
713     /**
714      * Project the supplied node information read from a source and update the supplied request.
715      * 
716      * @param ancestorInFederation the federated node under which this information is being projected; may not be null
717      * @param projection the projection used to make the original source request; may not be null
718      * @param request the federated request upon which the results are to be recorded; may not be null
719      * @param parent the location of the parent in the source; may not be null
720      * @param children the location of the children in the source; may be null or empty
721      * @param propertiesByName the properties on the parent in the source; may be null or empty
722      */
723     protected void projectToFederated( Location ancestorInFederation,
724                                        Projection projection,
725                                        ReadBranchRequest request,
726                                        Location parent,
727                                        List<Location> children,
728                                        Map<Name, Property> propertiesByName ) {
729         Path ancestorPath = ancestorInFederation.getPath();
730         if (projection == null) {
731             // This is a placeholder node ...
732             if (children != null) {
733                 // Add the children (to any existing children) ...
734                 List<Location> existing = request.getChildren(parent);
735                 if (existing == null) existing = new ArrayList<Location>(children.size());
736                 for (Location child : children) {
737                     existing.add(child);
738                 }
739                 request.setChildren(parent, existing);
740             }
741             if (propertiesByName != null) {
742                 // Add the properties to any existing properties ...
743                 Map<Name, Property> propsByName = request.getPropertiesFor(parent);
744                 if (propsByName == null) propsByName = new HashMap<Name, Property>();
745                 for (Property property : propertiesByName.values()) {
746                     Property existingProperty = propsByName.get(property.getName());
747                     if (existingProperty != null) {
748                         // Merge the property values ...
749                         property = merge(existingProperty, property, propertyFactory, true);
750                     }
751                     propsByName.put(property.getName(), property);
752                 }
753                 request.setProperties(parent, propsByName.values());
754             }
755             return;
756         }
757         for (Path path : projection.getPathsInRepository(parent.getPath(), pathFactory)) {
758             if (!path.isAtOrBelow(ancestorPath)) continue;
759 
760             // Determine the list of children ...
761             Location parentInFederation = parent.with(path);
762             if (children != null) {
763                 // Add the children to any existing children ...
764                 List<Location> existing = request.getChildren(parentInFederation);
765                 if (existing == null) existing = new ArrayList<Location>(children.size());
766                 for (Location child : children) {
767                     Path childPath = pathFactory.create(path, child.getPath().getLastSegment());
768                     existing.add(child.with(childPath));
769                 }
770                 request.setChildren(parentInFederation, existing);
771             }
772 
773             // Set or update the properties ...
774             if (propertiesByName != null) {
775                 Map<Name, Property> propsByName = request.getPropertiesFor(parentInFederation);
776                 if (propsByName == null) propsByName = new HashMap<Name, Property>();
777                 for (Property property : propertiesByName.values()) {
778                     Property existingProperty = propsByName.get(property.getName());
779                     if (existingProperty != null) {
780                         // Merge the property values ...
781                         property = merge(existingProperty, property, propertyFactory, true);
782                     }
783                     propsByName.put(property.getName(), property);
784                 }
785                 request.setProperties(parentInFederation, propsByName.values());
786             }
787             // We're done, since we found a path that's on the ancestor path ...
788             return;
789         }
790     }
791 
792     /**
793      * Project the supplied location in a source into its federated location. The projection is used to find the location under
794      * the supplied ancestor. Any errors are recorded on the original request.
795      * 
796      * @param ancestorInFederation the ancestor in the federated repository; may not be null
797      * @param projection the projection that should be used; may not be null
798      * @param actualSourceLocation the actual location in the source that is to be projected back into the federated repository;
799      *        may not be null
800      * @param originalRequest the original request, if there are errors; may not be null
801      * @return the location in the federated repository
802      */
803     protected Location projectToFederated( Location ancestorInFederation,
804                                            Projection projection,
805                                            Location actualSourceLocation,
806                                            Request originalRequest ) {
807         Path ancestorPath = ancestorInFederation.getPath();
808         Path actualPathInSource = actualSourceLocation.getPath();
809         // Project the actual location ...
810         for (Path path : projection.getPathsInRepository(actualPathInSource, pathFactory)) {
811             if (path.isAtOrBelow(ancestorPath)) {
812                 return actualSourceLocation.with(path);
813             }
814         }
815         // Record that there was an error projecting the results ...
816         String whereInSource = actualSourceLocation.getString(getExecutionContext().getNamespaceRegistry());
817         String msg = GraphI18n.unableToProjectSourceInformationIntoWorkspace.text(whereInSource, getSourceName(), projection);
818         originalRequest.setError(new InvalidRequestException(msg));
819         return null;
820     }
821 
822     /**
823      * Project the supplied location in a source into its federated location. The projection is used to find the location under
824      * the supplied ancestor. Any errors are recorded on the original request.
825      * 
826      * @param projection the projection that should be used; may not be null
827      * @param actualSourceLocation the actual location in the source that is to be projected back into the federated repository;
828      *        may not be null
829      * @param originalRequest the original request, if there are errors; may not be null
830      * @return the location in the federated repository
831      */
832     protected Location projectToFederated( Projection projection,
833                                            Location actualSourceLocation,
834                                            Request originalRequest ) {
835         Path actualPathInSource = actualSourceLocation.getPath();
836         // Project the actual location ...
837         for (Path path : projection.getPathsInRepository(actualPathInSource, pathFactory)) {
838             return actualSourceLocation.with(path);
839         }
840         // Record that there was an error projecting the results ...
841         String whereInSource = actualSourceLocation.getString(getExecutionContext().getNamespaceRegistry());
842         String msg = GraphI18n.unableToProjectSourceInformationIntoWorkspace.text(whereInSource, getSourceName(), projection);
843         originalRequest.setError(new InvalidRequestException(msg));
844         return null;
845     }
846 
847     /**
848      * {@inheritDoc}
849      * 
850      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.CreateNodeRequest)
851      */
852     @Override
853     public void process( CreateNodeRequest request ) {
854         ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
855         // Check the projection first ...
856         if (checkErrorOrCancel(request, federatedRequest)) return;
857 
858         Request projectedRequest = projected.getRequest();
859         // Check the error on the projected request ...
860         if (checkErrorOrCancel(request, projectedRequest)) return;
861 
862         // No error, so project the results back to the federated repository ...
863         Location sourceLocation = null;
864         if (RequestType.CREATE_NODE == projectedRequest.getType()) {
865             CreateNodeRequest source = (CreateNodeRequest)projectedRequest;
866             sourceLocation = source.getActualLocationOfNode();
867         } else if (RequestType.READ_NODE == projectedRequest.getType()) {
868             // In this case, the original request was to create the node only if it was missing,
869             // but we knew it already exists because the parent was a placeholder and the child
870             // mapped to an existing proxy node. Therefore, record the location...
871             ReadNodeRequest source = (ReadNodeRequest)projectedRequest;
872             sourceLocation = source.getActualLocationOfNode();
873         }
874         request.setActualLocationOfNode(projectToFederated(request.under(), projected.getProjection(), sourceLocation, request));
875     }
876 
877     /**
878      * {@inheritDoc}
879      * 
880      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.UpdatePropertiesRequest)
881      */
882     @Override
883     public void process( UpdatePropertiesRequest request ) {
884         ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
885         // Check the projection first ...
886         if (checkErrorOrCancel(request, federatedRequest)) return;
887 
888         UpdatePropertiesRequest source = (UpdatePropertiesRequest)projected.getRequest();
889         if (checkErrorOrCancel(request, source)) return;
890         Location sourceLocation = source.getActualLocationOfNode();
891         request.setActualLocationOfNode(projectToFederated(request.on(), projected.getProjection(), sourceLocation, request));
892         request.setNewProperties(source.getNewPropertyNames());
893     }
894 
895     /**
896      * {@inheritDoc}
897      * 
898      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.SetPropertyRequest)
899      */
900     @Override
901     public void process( SetPropertyRequest request ) {
902         ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
903         // Check the projection first ...
904         if (checkErrorOrCancel(request, federatedRequest)) return;
905 
906         SetPropertyRequest source = (SetPropertyRequest)projected.getRequest();
907         if (checkErrorOrCancel(request, source)) return;
908         // Set the actual location and created flags ...
909         Location sourceLocation = source.getActualLocationOfNode();
910         request.setActualLocationOfNode(projectToFederated(request.on(), projected.getProjection(), sourceLocation, request));
911         request.setNewProperty(source.isNewProperty());
912     }
913 
914     /**
915      * {@inheritDoc}
916      * 
917      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.RemovePropertyRequest)
918      */
919     @Override
920     public void process( RemovePropertyRequest request ) {
921         ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
922         // Check the projection first ...
923         if (checkErrorOrCancel(request, federatedRequest)) return;
924 
925         SetPropertyRequest source = (SetPropertyRequest)projected.getRequest();
926         if (checkErrorOrCancel(request, source)) return;
927         Location sourceLocation = source.getActualLocationOfNode();
928         request.setActualLocationOfNode(projectToFederated(request.from(), projected.getProjection(), sourceLocation, request));
929     }
930 
931     /**
932      * {@inheritDoc}
933      * 
934      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.DeleteBranchRequest)
935      */
936     @Override
937     public void process( DeleteBranchRequest request ) {
938         ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
939         // Check the projection first ...
940         if (checkErrorOrCancel(request, federatedRequest)) return;
941 
942         // Do an initial check to make sure that there was no error on the source that prevented projection
943         Request projectedSource = projected.getRequest();
944         if (checkErrorOrCancel(request, projectedSource)) return;
945 
946         // Go through the projected requests, and look for the top-most node ...
947         Location highest = null;
948         while (projected != null) {
949             // The projected request should either be a DeleteChildrenRequest (if the node being deleted is
950             // at the top of a projection and therefore required to exist) or a DeleteBranchRequest (in all
951             // other cases)...
952             Location actual = null;
953             Request sourceRequest = projected.getRequest();
954             if (RequestType.DELETE_BRANCH == sourceRequest.getType()) {
955                 DeleteBranchRequest source = (DeleteBranchRequest)projected.getRequest();
956                 actual = source.getActualLocationOfNode();
957             } else {
958                 DeleteChildrenRequest source = (DeleteChildrenRequest)projected.getRequest();
959                 actual = source.getActualLocationOfNode();
960             }
961             if (checkErrorOrCancel(request, sourceRequest)) return;
962             if (!projected.isSameLocation() && projected.getProjection() != null) {
963                 actual = projectToFederated(request.at(), projected.getProjection(), actual, request);
964             }
965             if (highest == null) highest = actual;
966             else if (highest.getPath().isDecendantOf(actual.getPath())) highest = actual;
967             projected = projected.next();
968         }
969         assert highest != null;
970         request.setActualLocationOfNode(highest);
971     }
972 
973     /**
974      * {@inheritDoc}
975      * 
976      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.DeleteChildrenRequest)
977      */
978     @Override
979     public void process( DeleteChildrenRequest request ) {
980         ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
981         // Check the projection first ...
982         if (checkErrorOrCancel(request, federatedRequest)) return;
983 
984         // Do an initial check to make sure that there was no error on the source that prevented projection
985         Request projectedSource = projected.getRequest();
986         if (checkErrorOrCancel(request, projectedSource)) return;
987 
988         // Go through the projected requests, and look for the top-most node ...
989         Location highest = null;
990         while (projected != null) {
991             // The projected request should a DeleteChildrenRequest ...
992             Request sourceRequest = projected.getRequest();
993             DeleteChildrenRequest source = (DeleteChildrenRequest)projected.getRequest();
994             Location actual = source.getActualLocationOfNode();
995             if (checkErrorOrCancel(request, sourceRequest)) return;
996             if (!projected.isSameLocation() && projected.getProjection() != null) {
997                 actual = projectToFederated(request.at(), projected.getProjection(), actual, request);
998             }
999             if (highest == null) highest = actual;
1000             else if (highest.getPath().isDecendantOf(actual.getPath())) highest = actual;
1001             projected = projected.next();
1002         }
1003         assert highest != null;
1004         request.setActualLocationOfNode(highest);
1005     }
1006 
1007     /**
1008      * {@inheritDoc}
1009      * 
1010      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.RenameNodeRequest)
1011      */
1012     @Override
1013     public void process( RenameNodeRequest request ) {
1014         ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
1015         // Check the projection first ...
1016         if (checkErrorOrCancel(request, federatedRequest)) return;
1017 
1018         RenameNodeRequest source = (RenameNodeRequest)projected.getRequest();
1019         if (checkErrorOrCancel(request, source)) return;
1020         Location locationBefore = source.getActualLocationBefore();
1021         Location locationAfter = source.getActualLocationBefore();
1022         locationBefore = projectToFederated(request.at(), projected.getProjection(), locationBefore, request);
1023         locationAfter = projectToFederated(request.at(), projected.getSecondProjection(), locationAfter, request);
1024         request.setActualLocations(locationBefore, locationAfter);
1025     }
1026 
1027     /**
1028      * {@inheritDoc}
1029      * 
1030      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.CopyBranchRequest)
1031      */
1032     @Override
1033     public void process( CopyBranchRequest request ) {
1034         ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
1035         // Check the projection first ...
1036         if (checkErrorOrCancel(request, federatedRequest)) return;
1037 
1038         CopyBranchRequest source = (CopyBranchRequest)projected.getRequest();
1039         if (checkErrorOrCancel(request, source)) return;
1040         Location locationBefore = source.getActualLocationBefore();
1041         Location locationAfter = source.getActualLocationBefore();
1042         locationBefore = projectToFederated(request.from(), projected.getProjection(), locationBefore, request);
1043         locationAfter = projectToFederated(request.into(), projected.getSecondProjection(), locationAfter, request);
1044         request.setActualLocations(locationBefore, locationAfter);
1045     }
1046 
1047     /**
1048      * {@inheritDoc}
1049      * 
1050      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.CloneBranchRequest)
1051      */
1052     @Override
1053     public void process( CloneBranchRequest request ) {
1054         ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
1055         // Check the projection first ...
1056         if (checkErrorOrCancel(request, federatedRequest)) return;
1057 
1058         CloneBranchRequest source = (CloneBranchRequest)projected.getRequest();
1059         if (checkErrorOrCancel(request, source)) return;
1060         Location locationBefore = source.getActualLocationBefore();
1061         Location locationAfter = source.getActualLocationBefore();
1062         locationBefore = projectToFederated(request.from(), projected.getProjection(), locationBefore, request);
1063         locationAfter = projectToFederated(request.into(), projected.getSecondProjection(), locationAfter, request);
1064         request.setActualLocations(locationBefore, locationAfter);
1065         if (source.removeExisting()) {
1066             Set<Location> removed = new HashSet<Location>();
1067             for (Location location : request.getRemovedNodes()) {
1068                 removed.add(projectToFederated(projected.getSecondProjection(), location, request));
1069             }
1070             request.setRemovedNodes(Collections.unmodifiableSet(removed));
1071         }
1072     }
1073 
1074     /**
1075      * {@inheritDoc}
1076      * 
1077      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.MoveBranchRequest)
1078      */
1079     @Override
1080     public void process( MoveBranchRequest request ) {
1081         ProjectedRequest projected = federatedRequest.getFirstProjectedRequest();
1082         // Check the projection first ...
1083         if (checkErrorOrCancel(request, federatedRequest)) return;
1084 
1085         MoveBranchRequest source = (MoveBranchRequest)projected.getRequest();
1086         if (checkErrorOrCancel(request, source)) return;
1087         Location locationBefore = source.getActualLocationBefore();
1088         Location locationAfter = source.getActualLocationBefore();
1089         locationBefore = projectToFederated(request.from(), projected.getProjection(), locationBefore, request);
1090         Projection afterProjection = projected.getSecondProjection();
1091         if (afterProjection == null) projected.getProjection();
1092         locationAfter = projectToFederated(request.into(), afterProjection, locationAfter, request);
1093         request.setActualLocations(locationBefore, locationAfter);
1094     }
1095 
1096     /**
1097      * {@inheritDoc}
1098      * 
1099      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.VerifyWorkspaceRequest)
1100      */
1101     @Override
1102     public void process( VerifyWorkspaceRequest request ) {
1103         ProjectedRequest projectedRequest = federatedRequest.getFirstProjectedRequest();
1104 
1105         Location actualLocation = Location.create(getExecutionContext().getValueFactories().getPathFactory().createRootPath());
1106         while (projectedRequest != null) {
1107             VerifyNodeExistsRequest readFromSource = (VerifyNodeExistsRequest)projectedRequest.getRequest();
1108             if (readFromSource.hasError()) {
1109                 request.setError(readFromSource.getError());
1110                 return;
1111             }
1112             request.setError(null);
1113             if (readFromSource.isCancelled()) {
1114                 request.cancel();
1115                 return;
1116             }
1117 
1118             Location sourceLocation = readFromSource.getActualLocationOfNode();
1119             if (sourceLocation.hasIdProperties()) {
1120                 // Accumulate the identification properties ...
1121                 for (Property propertyInSource : sourceLocation.getIdProperties()) {
1122                     Name name = propertyInSource.getName();
1123                     Property existing = actualLocation.getIdProperty(name);
1124                     if (existing != null) {
1125                         // Merge the property values ...
1126                         propertyInSource = merge(existing, propertyInSource, propertyFactory, true);
1127                     }
1128                     actualLocation = actualLocation.with(propertyInSource);
1129                 }
1130             }
1131             projectedRequest = projectedRequest.next();
1132         }
1133         request.setActualRootLocation(actualLocation);
1134     }
1135 
1136     /**
1137      * {@inheritDoc}
1138      * 
1139      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.GetWorkspacesRequest)
1140      */
1141     @Override
1142     public void process( GetWorkspacesRequest request ) {
1143         throw new UnsupportedOperationException(); // should never be called, since it's handled in the ForkProcessor
1144     }
1145 
1146     /**
1147      * {@inheritDoc}
1148      * 
1149      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.CreateWorkspaceRequest)
1150      */
1151     @Override
1152     public void process( CreateWorkspaceRequest request ) {
1153         throw new UnsupportedOperationException(); // should never be called, since it's handled in the ForkProcessor
1154     }
1155 
1156     /**
1157      * {@inheritDoc}
1158      * 
1159      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.CloneWorkspaceRequest)
1160      */
1161     @Override
1162     public void process( CloneWorkspaceRequest request ) {
1163         throw new UnsupportedOperationException(); // should never be called, since it's handled in the ForkProcessor
1164     }
1165 
1166     /**
1167      * {@inheritDoc}
1168      * 
1169      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.DestroyWorkspaceRequest)
1170      */
1171     @Override
1172     public void process( DestroyWorkspaceRequest request ) {
1173         throw new UnsupportedOperationException(); // should never be called, since it's handled in the ForkProcessor
1174     }
1175 
1176     /**
1177      * {@inheritDoc}
1178      * 
1179      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.AccessQueryRequest)
1180      */
1181     @Override
1182     public void process( AccessQueryRequest request ) {
1183         throw new UnsupportedOperationException(); // should never be called
1184     }
1185 
1186     /**
1187      * {@inheritDoc}
1188      * 
1189      * @see org.modeshape.graph.request.processor.RequestProcessor#process(org.modeshape.graph.request.FullTextSearchRequest)
1190      */
1191     @Override
1192     public void process( FullTextSearchRequest request ) {
1193         throw new UnsupportedOperationException(); // should never be called
1194     }
1195 
1196     protected boolean checkErrorOrCancel( Request request,
1197                                           FederatedRequest federatedRequest ) {
1198         if (federatedRequest.getFirstProjectedRequest() == null) {
1199             Request original = federatedRequest.original();
1200             if (original.hasError()) {
1201                 // No source requests had results ...
1202                 request.setError(original.getError());
1203                 return true;
1204             }
1205             assert original.isCancelled();
1206             request.cancel();
1207             return true;
1208         }
1209         return false;
1210     }
1211 
1212     protected boolean checkErrorOrCancel( Request request,
1213                                           Request sourceRequest ) {
1214         if (sourceRequest.hasError()) {
1215             request.setError(sourceRequest.getError());
1216             return true;
1217         }
1218         if (sourceRequest.isCancelled()) {
1219             request.cancel();
1220             return true;
1221         }
1222         return false;
1223     }
1224 
1225     /**
1226      * Merge the values from the two properties with the same name, returning a new property with the newly merged values.
1227      * <p>
1228      * The current algorithm merges the values by concatenating the values from <code>property1</code> and <code>property2</code>,
1229      * and if <code>removeDuplicates</code> is true any values in <code>property2</code> that are identical to values found in
1230      * <code>property1</code> are skipped.
1231      * </p>
1232      * 
1233      * @param property1 the first property; may not be null, and must have the same {@link Property#getName() name} as
1234      *        <code>property2</code>
1235      * @param property2 the second property; may not be null, and must have the same {@link Property#getName() name} as
1236      *        <code>property1</code>
1237      * @param factory the property factory, used to create the result
1238      * @param removeDuplicates true if this method removes any values in the second property that duplicate values found in the
1239      *        first property.
1240      * @return the property that contains the same {@link Property#getName() name} as the input properties, but with values that
1241      *         are merged from both of the input properties
1242      */
1243     protected Property merge( Property property1,
1244                               Property property2,
1245                               PropertyFactory factory,
1246                               boolean removeDuplicates ) {
1247         assert property1 != null;
1248         assert property2 != null;
1249         assert property1.getName().equals(property2.getName());
1250         if (property1.isEmpty()) return property2;
1251         if (property2.isEmpty()) return property1;
1252 
1253         // If they are both single-valued, then we can use a more efficient algorithm ...
1254         if (property1.isSingle() && property2.isSingle()) {
1255             Object value1 = property1.getValues().next();
1256             Object value2 = property2.getValues().next();
1257             if (removeDuplicates && ValueComparators.OBJECT_COMPARATOR.compare(value1, value2) == 0) return property1;
1258             return factory.create(property1.getName(), new Object[] {value1, value2});
1259         }
1260 
1261         // One or both properties are multi-valued, so use an algorithm that works with in all cases ...
1262         if (!removeDuplicates) {
1263             Iterator<?> valueIterator = new DualIterator(property1.getValues(), property2.getValues());
1264             return factory.create(property1.getName(), valueIterator);
1265         }
1266 
1267         // First copy all the values from property 1 ...
1268         Object[] values = new Object[property1.size() + property2.size()];
1269         int index = 0;
1270         for (Object property1Value : property1) {
1271             values[index++] = property1Value;
1272         }
1273         assert index == property1.size();
1274         // Now add any values of property2 that don't match a value in property1 ...
1275         for (Object property2Value : property2) {
1276             // Brute force, go through the values of property1 and compare ...
1277             boolean matched = false;
1278             for (Object property1Value : property1) {
1279                 if (ValueComparators.OBJECT_COMPARATOR.compare(property1Value, property2Value) == 0) {
1280                     // The values are the same ...
1281                     matched = true;
1282                     break;
1283                 }
1284             }
1285             if (!matched) values[index++] = property2Value;
1286         }
1287         if (index != values.length) {
1288             Object[] newValues = new Object[index];
1289             System.arraycopy(values, 0, newValues, 0, index);
1290             values = newValues;
1291         }
1292         return factory.create(property1.getName(), values);
1293     }
1294 
1295     protected static class DualIterator implements Iterator<Object> {
1296 
1297         private final Iterator<?>[] iterators;
1298         private Iterator<?> current;
1299         private int index = 0;
1300 
1301         protected DualIterator( Iterator<?>... iterators ) {
1302             assert iterators != null;
1303             assert iterators.length > 0;
1304             this.iterators = iterators;
1305             this.current = this.iterators[0];
1306         }
1307 
1308         /**
1309          * {@inheritDoc}
1310          * 
1311          * @see java.util.Iterator#hasNext()
1312          */
1313         public boolean hasNext() {
1314             if (this.current != null) return this.current.hasNext();
1315             return false;
1316         }
1317 
1318         /**
1319          * {@inheritDoc}
1320          * 
1321          * @see java.util.Iterator#next()
1322          */
1323         public Object next() {
1324             while (this.current != null) {
1325                 if (this.current.hasNext()) return this.current.next();
1326                 // Get the next iterator ...
1327                 if (++this.index < iterators.length) {
1328                     this.current = this.iterators[this.index];
1329                 } else {
1330                     this.current = null;
1331                 }
1332             }
1333             throw new NoSuchElementException();
1334         }
1335 
1336         /**
1337          * {@inheritDoc}
1338          * 
1339          * @see java.util.Iterator#remove()
1340          */
1341         public void remove() {
1342             throw new UnsupportedOperationException();
1343         }
1344     }
1345 
1346 }