001 /*
002 * JBoss DNA (http://www.jboss.org/dna)
003 * See the COPYRIGHT.txt file distributed with this work for information
004 * regarding copyright ownership. Some portions may be licensed
005 * to Red Hat, Inc. under one or more contributor license agreements.
006 * See the AUTHORS.txt file in the distribution for a full listing of
007 * individual contributors.
008 *
009 * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA
010 * is licensed to you under the terms of the GNU Lesser General Public License as
011 * published by the Free Software Foundation; either version 2.1 of
012 * the License, or (at your option) any later version.
013 *
014 * JBoss DNA is distributed in the hope that it will be useful,
015 * but WITHOUT ANY WARRANTY; without even the implied warranty of
016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
017 * Lesser General Public License for more details.
018 *
019 * You should have received a copy of the GNU Lesser General Public
020 * License along with this software; if not, write to the Free
021 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
022 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
023 */
024 package org.jboss.dna.connector.federation;
025
026 import java.util.ArrayList;
027 import java.util.Collection;
028 import java.util.Collections;
029 import java.util.HashMap;
030 import java.util.HashSet;
031 import java.util.Iterator;
032 import java.util.LinkedList;
033 import java.util.List;
034 import java.util.Map;
035 import java.util.Set;
036 import java.util.UUID;
037 import java.util.concurrent.TimeUnit;
038 import net.jcip.annotations.Immutable;
039 import net.jcip.annotations.NotThreadSafe;
040 import org.jboss.dna.common.i18n.I18n;
041 import org.jboss.dna.common.util.CheckArg;
042 import org.jboss.dna.common.util.Logger;
043 import org.jboss.dna.connector.federation.contribution.Contribution;
044 import org.jboss.dna.connector.federation.merge.FederatedNode;
045 import org.jboss.dna.connector.federation.merge.MergePlan;
046 import org.jboss.dna.graph.DnaLexicon;
047 import org.jboss.dna.graph.ExecutionContext;
048 import org.jboss.dna.graph.JcrLexicon;
049 import org.jboss.dna.graph.Location;
050 import org.jboss.dna.graph.NodeConflictBehavior;
051 import org.jboss.dna.graph.cache.CachePolicy;
052 import org.jboss.dna.graph.connector.RepositoryConnection;
053 import org.jboss.dna.graph.connector.RepositoryConnectionFactory;
054 import org.jboss.dna.graph.connector.RepositorySource;
055 import org.jboss.dna.graph.connector.RepositorySourceException;
056 import org.jboss.dna.graph.property.DateTime;
057 import org.jboss.dna.graph.property.Name;
058 import org.jboss.dna.graph.property.Path;
059 import org.jboss.dna.graph.property.PathFactory;
060 import org.jboss.dna.graph.property.PathNotFoundException;
061 import org.jboss.dna.graph.property.Property;
062 import org.jboss.dna.graph.property.PropertyFactory;
063 import org.jboss.dna.graph.request.CloneWorkspaceRequest;
064 import org.jboss.dna.graph.request.CompositeRequest;
065 import org.jboss.dna.graph.request.CopyBranchRequest;
066 import org.jboss.dna.graph.request.CreateNodeRequest;
067 import org.jboss.dna.graph.request.CreateWorkspaceRequest;
068 import org.jboss.dna.graph.request.DeleteBranchRequest;
069 import org.jboss.dna.graph.request.DestroyWorkspaceRequest;
070 import org.jboss.dna.graph.request.GetWorkspacesRequest;
071 import org.jboss.dna.graph.request.InvalidWorkspaceException;
072 import org.jboss.dna.graph.request.MoveBranchRequest;
073 import org.jboss.dna.graph.request.ReadAllChildrenRequest;
074 import org.jboss.dna.graph.request.ReadAllPropertiesRequest;
075 import org.jboss.dna.graph.request.ReadNodeRequest;
076 import org.jboss.dna.graph.request.Request;
077 import org.jboss.dna.graph.request.UnsupportedRequestException;
078 import org.jboss.dna.graph.request.UpdatePropertiesRequest;
079 import org.jboss.dna.graph.request.VerifyWorkspaceRequest;
080 import org.jboss.dna.graph.request.processor.RequestProcessor;
081
082 /**
083 * @author Randall Hauch
084 */
085 @NotThreadSafe
086 public class FederatingRequestProcessor extends RequestProcessor {
087
088 private static final Set<Name> HIDDEN_PROPERTIES = Collections.singleton(DnaLexicon.MERGE_PLAN);
089
090 private final Map<String, FederatedWorkspace> workspaces;
091 private final FederatedWorkspace defaultWorkspace;
092 private final RepositoryConnectionFactory connectionFactory;
093 /** The set of all connections, including the cache connection */
094 private final Map<String, RepositoryConnection> connectionsBySourceName;
095 protected final PathFactory pathFactory;
096 private Logger logger;
097
098 /**
099 * Create a command executor that federates (merges) the information from multiple sources described by the source projections
100 * for the particular workspace specified by the request(s). The request processor will use the {@link Projection cache
101 * projection} of each {@link FederatedWorkspace workspace} to identify the {@link Projection#getSourceName() repository
102 * source} for the cache as well as the {@link Projection#getRules() rules} for how the paths are mapped in the cache. This
103 * cache will be consulted first for the requested information, and will be kept up to date as changes are made to the
104 * federated information.
105 *
106 * @param context the execution context in which the executor will be run; may not be null
107 * @param sourceName the name of the {@link RepositorySource} that is making use of this executor; may not be null or empty
108 * @param workspaces the configuration for each workspace, keyed by workspace name; may not be null
109 * @param defaultWorkspace the default workspace; null if there is no default
110 * @param connectionFactory the factory for {@link RepositoryConnection} instances
111 */
112 public FederatingRequestProcessor( ExecutionContext context,
113 String sourceName,
114 Map<String, FederatedWorkspace> workspaces,
115 FederatedWorkspace defaultWorkspace,
116 RepositoryConnectionFactory connectionFactory ) {
117 super(sourceName, context);
118 CheckArg.isNotEmpty(workspaces, "workspaces");
119 CheckArg.isNotNull(connectionFactory, "connectionFactory");
120 this.workspaces = workspaces;
121 this.connectionFactory = connectionFactory;
122 this.logger = context.getLogger(getClass());
123 this.connectionsBySourceName = new HashMap<String, RepositoryConnection>();
124 this.defaultWorkspace = defaultWorkspace; // may be null
125 this.pathFactory = context.getValueFactories().getPathFactory();
126 }
127
128 protected DateTime getCurrentTimeInUtc() {
129 return getExecutionContext().getValueFactories().getDateFactory().createUtc();
130 }
131
132 /**
133 * {@inheritDoc}
134 *
135 * @see RequestProcessor#close()
136 */
137 @Override
138 public void close() {
139 try {
140 super.close();
141 } finally {
142 // Make sure to close ALL open connections ...
143 for (RepositoryConnection connection : connectionsBySourceName.values()) {
144 if (connection == null) continue;
145 try {
146 connection.close();
147 } catch (Throwable t) {
148 logger.debug("Error while closing connection to {0}", connection.getSourceName());
149 }
150 }
151 connectionsBySourceName.clear();
152 }
153 }
154
155 protected RepositoryConnection getConnectionToCacheFor( FederatedWorkspace workspace ) throws RepositorySourceException {
156 return getConnection(workspace.getCacheProjection());
157 }
158
159 protected RepositoryConnection getConnection( Projection projection ) throws RepositorySourceException {
160 String sourceName = projection.getSourceName();
161 RepositoryConnection connection = connectionsBySourceName.get(sourceName);
162 if (connection == null) {
163 connection = connectionFactory.createConnection(sourceName);
164 connectionsBySourceName.put(sourceName, connection);
165 }
166 return connection;
167 }
168
169 protected Set<String> getOpenConnections() {
170 return connectionsBySourceName.keySet();
171 }
172
173 /**
174 * Utility to obtain the federated workspace referenced by the request. This method supports using the default workspace if
175 * the workspace name is null. If no such workspace, the request is marked with an appropriate error.
176 *
177 * @param request the request; may not be null
178 * @param workspaceName the name of the workspace; may be null if the default workspace should be used
179 * @return the federated workspace, or null if none was found
180 */
181 protected FederatedWorkspace getWorkspace( Request request,
182 String workspaceName ) {
183 FederatedWorkspace workspace = null;
184 if (workspaceName == null) {
185 if (defaultWorkspace != null) return defaultWorkspace;
186 // There is no default, so record the error ...
187 String msg = FederationI18n.noDefaultWorkspace.text(getSourceName());
188 request.setError(new InvalidWorkspaceException(msg));
189 }
190 workspace = workspaces.get(workspaceName);
191 if (workspace == null) {
192 // There is no workspace with this name, so record an error ...
193 String msg = FederationI18n.workspaceDoesNotExist.text(getSourceName(), workspaceName);
194 request.setError(new InvalidWorkspaceException(msg));
195 }
196 return workspace;
197 }
198
199 /**
200 * {@inheritDoc}
201 *
202 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.ReadAllChildrenRequest)
203 */
204 @Override
205 public void process( ReadAllChildrenRequest request ) {
206 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace());
207 if (workspace == null) return;
208 ReadNodeRequest nodeInfo = getNode(request.of(), workspace);
209 if (nodeInfo.hasError()) return;
210 for (Location child : nodeInfo.getChildren()) {
211 request.addChild(child);
212 }
213 request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode());
214 }
215
216 /**
217 * {@inheritDoc}
218 *
219 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.ReadAllPropertiesRequest)
220 */
221 @Override
222 public void process( ReadAllPropertiesRequest request ) {
223 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace());
224 if (workspace == null) return;
225 ReadNodeRequest nodeInfo = getNode(request.at(), workspace);
226 if (nodeInfo.hasError()) return;
227 for (Property property : nodeInfo.getProperties()) {
228 if (HIDDEN_PROPERTIES.contains(property.getName())) continue;
229 request.addProperty(property);
230 }
231 request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode());
232 }
233
234 /**
235 * {@inheritDoc}
236 *
237 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.ReadNodeRequest)
238 */
239 @Override
240 public void process( ReadNodeRequest request ) {
241 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace());
242 if (workspace == null) return;
243 ReadNodeRequest nodeInfo = getNode(request.at(), workspace);
244 if (nodeInfo.hasError()) return;
245 for (Property property : nodeInfo.getProperties()) {
246 if (HIDDEN_PROPERTIES.contains(property.getName())) continue;
247 request.addProperty(property);
248 }
249 for (Location child : nodeInfo.getChildren()) {
250 request.addChild(child);
251 }
252 request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode());
253 }
254
255 /**
256 * {@inheritDoc}
257 *
258 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.CreateNodeRequest)
259 */
260 @Override
261 public void process( CreateNodeRequest request ) {
262 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace());
263 if (workspace == null) return;
264
265 // Can push this down if and only if the entire request is within a single federated source ...
266 SingleProjection projection = asSingleProjection(workspace, request.under(), request);
267 if (projection == null) return;
268
269 // Push down the request ...
270 Location parentLocation = Location.create(projection.pathInSource);
271 String workspaceName = projection.projection.getWorkspaceName();
272 CreateNodeRequest sourceRequest = new CreateNodeRequest(parentLocation, workspaceName, request.named(),
273 request.properties());
274 execute(sourceRequest, projection.projection);
275
276 // Copy/transform the results ...
277 if (sourceRequest.hasError()) {
278 request.setError(sourceRequest.getError());
279 } else {
280 request.setActualLocationOfNode(projection.convertToRepository(sourceRequest.getActualLocationOfNode()));
281 }
282 }
283
284 /**
285 * {@inheritDoc}
286 *
287 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.DeleteBranchRequest)
288 */
289 @Override
290 public void process( DeleteBranchRequest request ) {
291 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace());
292 if (workspace == null) return;
293
294 // Can push this down if and only if the entire request is within a single federated source ...
295 SingleProjection projection = asSingleProjection(workspace, request.at(), request);
296 if (projection == null) return;
297
298 // Push down the request ...
299 Location sourceLocation = Location.create(projection.pathInSource);
300 String workspaceName = projection.projection.getWorkspaceName();
301 DeleteBranchRequest sourceRequest = new DeleteBranchRequest(sourceLocation, workspaceName);
302 execute(sourceRequest, projection.projection);
303
304 // Copy/transform the results ...
305 if (sourceRequest.hasError()) {
306 request.setError(sourceRequest.getError());
307 } else {
308 request.setActualLocationOfNode(projection.convertToRepository(sourceRequest.getActualLocationOfNode()));
309 }
310
311 // Delete in the cache ...
312 DeleteBranchRequest cacheRequest = new DeleteBranchRequest(request.at(), workspace.getCacheProjection()
313 .getWorkspaceName());
314 executeInCache(cacheRequest, workspace);
315 }
316
317 /**
318 * {@inheritDoc}
319 *
320 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.CopyBranchRequest)
321 */
322 @Override
323 public void process( CopyBranchRequest request ) {
324 FederatedWorkspace fromWorkspace = getWorkspace(request, request.fromWorkspace());
325 if (fromWorkspace == null) return;
326 FederatedWorkspace intoWorkspace = getWorkspace(request, request.intoWorkspace());
327 if (intoWorkspace == null) return;
328 if (!fromWorkspace.equals(intoWorkspace)) {
329 // Otherwise there wasn't a single projection with a single path ...
330 String msg = FederationI18n.unableToPerformOperationSpanningWorkspaces.text(fromWorkspace.getName(),
331 intoWorkspace.getName());
332 request.setError(new UnsupportedRequestException(msg));
333 }
334
335 // Can push this down if and only if the entire request is within a single federated source ...
336 SingleProjection fromProjection = asSingleProjection(fromWorkspace, request.from(), request);
337 if (fromProjection == null) return;
338 SingleProjection intoProjection = asSingleProjection(intoWorkspace, request.into(), request);
339 if (intoProjection == null) return;
340 if (!intoProjection.projection.equals(fromProjection.projection)) {
341 // Otherwise there wasn't a single projection with a single path ...
342 String msg = FederationI18n.unableToPerformOperationUnlessLocationsAreFromSingleProjection.text(request.from(),
343 request.into(),
344 fromWorkspace.getName(),
345 fromProjection.projection.getRules(),
346 intoProjection.projection.getRules());
347 request.setError(new UnsupportedRequestException(msg));
348 }
349
350 // Push down the request ...
351 Location fromLocation = Location.create(fromProjection.pathInSource);
352 Location intoLocation = Location.create(intoProjection.pathInSource);
353 String workspaceName = fromProjection.projection.getWorkspaceName();
354 CopyBranchRequest sourceRequest = new CopyBranchRequest(fromLocation, workspaceName, intoLocation, workspaceName,
355 request.desiredName(), request.conflictBehavior());
356 execute(sourceRequest, fromProjection.projection);
357
358 // Copy/transform the results ...
359 if (sourceRequest.hasError()) {
360 request.setError(sourceRequest.getError());
361 } else {
362 request.setActualLocations(fromProjection.convertToRepository(sourceRequest.getActualLocationBefore()),
363 intoProjection.convertToRepository(sourceRequest.getActualLocationAfter()));
364 }
365
366 // Delete from the cache the parent of the new location ...
367 DeleteBranchRequest cacheRequest = new DeleteBranchRequest(request.into(), fromWorkspace.getCacheProjection()
368 .getWorkspaceName());
369 executeInCache(cacheRequest, fromWorkspace);
370 }
371
372 /**
373 * {@inheritDoc}
374 *
375 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.MoveBranchRequest)
376 */
377 @Override
378 public void process( MoveBranchRequest request ) {
379 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace());
380 if (workspace == null) return;
381
382 // Can push this down if and only if the entire request is within a single federated source ...
383 SingleProjection fromProjection = asSingleProjection(workspace, request.from(), request);
384 if (fromProjection == null) return;
385 SingleProjection intoProjection = asSingleProjection(workspace, request.into(), request);
386 if (intoProjection == null) return;
387 if (!intoProjection.projection.equals(fromProjection.projection)) {
388 // Otherwise there wasn't a single projection with a single path ...
389 String msg = FederationI18n.unableToPerformOperationUnlessLocationsAreFromSingleProjection.text(request.from(),
390 request.into(),
391 workspace.getName(),
392 fromProjection.projection.getRules(),
393 intoProjection.projection.getRules());
394 request.setError(new UnsupportedRequestException(msg));
395 }
396
397 // Push down the request ...
398 Location fromLocation = Location.create(fromProjection.pathInSource);
399 Location intoLocation = Location.create(intoProjection.pathInSource);
400 String workspaceName = fromProjection.projection.getWorkspaceName();
401 MoveBranchRequest sourceRequest = new MoveBranchRequest(fromLocation, intoLocation, workspaceName, request.desiredName(),
402 request.conflictBehavior());
403 execute(sourceRequest, fromProjection.projection);
404
405 // Copy/transform the results ...
406 if (sourceRequest.hasError()) {
407 request.setError(sourceRequest.getError());
408 } else {
409 request.setActualLocations(fromProjection.convertToRepository(sourceRequest.getActualLocationBefore()),
410 intoProjection.convertToRepository(sourceRequest.getActualLocationAfter()));
411 }
412 // Delete from the cache ...
413 DeleteBranchRequest cacheRequest = new DeleteBranchRequest(request.from(), workspace.getCacheProjection()
414 .getWorkspaceName());
415 executeInCache(cacheRequest, workspace);
416 }
417
418 /**
419 * {@inheritDoc}
420 *
421 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.UpdatePropertiesRequest)
422 */
423 @Override
424 public void process( UpdatePropertiesRequest request ) {
425 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace());
426 if (workspace == null) return;
427
428 // Can push this down if and only if the entire request is within a single federated source ...
429 SingleProjection projection = asSingleProjection(workspace, request.on(), request);
430 if (projection == null) return;
431
432 // Push down the request ...
433 Location sourceLocation = Location.create(projection.pathInSource);
434 String workspaceName = projection.projection.getWorkspaceName();
435 UpdatePropertiesRequest sourceRequest = new UpdatePropertiesRequest(sourceLocation, workspaceName, request.properties());
436 execute(sourceRequest, projection.projection);
437
438 // Copy/transform the results ...
439 if (sourceRequest.hasError()) {
440 request.setError(sourceRequest.getError());
441 } else {
442 request.setActualLocationOfNode(projection.convertToRepository(sourceRequest.getActualLocationOfNode()));
443 }
444
445 // Update the cache ...
446 UpdatePropertiesRequest cacheRequest = new UpdatePropertiesRequest(request.on(), workspace.getCacheProjection()
447 .getWorkspaceName(),
448 request.properties());
449 executeInCache(cacheRequest, workspace);
450 }
451
452 /**
453 * {@inheritDoc}
454 *
455 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.VerifyWorkspaceRequest)
456 */
457 @Override
458 public void process( VerifyWorkspaceRequest request ) {
459 FederatedWorkspace workspace = getWorkspace(request, request.workspaceName());
460 if (workspace != null) {
461 request.setActualWorkspaceName(workspace.getName());
462 Location root = Location.create(pathFactory.createRootPath());
463 ReadNodeRequest nodeInfo = getNode(root, workspace);
464 if (nodeInfo.hasError()) return;
465 request.setActualRootLocation(nodeInfo.getActualLocationOfNode());
466 }
467 }
468
469 /**
470 * {@inheritDoc}
471 *
472 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.GetWorkspacesRequest)
473 */
474 @Override
475 public void process( GetWorkspacesRequest request ) {
476 request.setAvailableWorkspaceNames(workspaces.keySet());
477 super.setCacheableInfo(request);
478 }
479
480 /**
481 * {@inheritDoc}
482 *
483 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.CreateWorkspaceRequest)
484 */
485 @Override
486 public void process( CreateWorkspaceRequest request ) {
487 throw new UnsupportedOperationException();
488 }
489
490 /**
491 * {@inheritDoc}
492 *
493 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.CloneWorkspaceRequest)
494 */
495 @Override
496 public void process( CloneWorkspaceRequest request ) {
497 throw new UnsupportedOperationException();
498 }
499
500 /**
501 * {@inheritDoc}
502 *
503 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.DestroyWorkspaceRequest)
504 */
505 @Override
506 public void process( DestroyWorkspaceRequest request ) {
507 throw new UnsupportedOperationException();
508 }
509
510 @Immutable
511 protected class SingleProjection {
512 protected final Projection projection;
513 protected final Path pathInSource;
514 protected final Location federatedLocation;
515
516 protected SingleProjection( Projection projection,
517 Path pathInSource,
518 Location federatedLocation ) {
519 this.projection = projection;
520 this.federatedLocation = federatedLocation;
521 this.pathInSource = pathInSource;
522 }
523
524 protected Location convertToRepository( Location sourceLocation ) {
525 assert sourceLocation != null;
526 if (sourceLocation.hasPath()) {
527 Set<Path> paths = projection.getPathsInRepository(sourceLocation.getPath(), pathFactory);
528 assert paths.size() == 1;
529 return sourceLocation.with(paths.iterator().next());
530 }
531 return sourceLocation;
532 }
533 }
534
535 protected SingleProjection asSingleProjection( FederatedWorkspace federatedWorkspace,
536 Location location,
537 Request request ) {
538 // Check the cache for this location ...
539 ReadNodeRequest nodeInfo = getNode(location, federatedWorkspace);
540 if (nodeInfo.hasError()) {
541 request.setError(nodeInfo.getError());
542 return null;
543 }
544 Location actualLocation = nodeInfo.getActualLocationOfNode();
545 Path pathInRepository = actualLocation.getPath();
546 assert pathInRepository != null;
547
548 // Get the merge plan for the node ...
549 MergePlan plan = getMergePlan(nodeInfo);
550 assert plan != null;
551 if (plan.getRealContributionCount() == 1) {
552 for (Contribution contribution : plan) {
553 if (contribution.isEmpty() || contribution.isPlaceholder()) continue;
554 for (Projection projection : federatedWorkspace.getProjectionsFor(contribution.getSourceName())) {
555 Set<Path> paths = projection.getPathsInSource(pathInRepository, pathFactory);
556 if (paths.size() == 1) {
557 return new SingleProjection(projection, paths.iterator().next(), actualLocation);
558 }
559 }
560 }
561 }
562
563 // Otherwise there wasn't a single projection with a single path ...
564 StringBuilder projections = new StringBuilder();
565 boolean first = true;
566 for (Contribution contribution : plan) {
567 if (contribution.isPlaceholder() || contribution.isEmpty()) continue;
568 if (first) first = false;
569 else projections.append(", ");
570 for (Projection projection : federatedWorkspace.getProjectionsFor(contribution.getSourceName())) {
571 Set<Path> paths = projection.getPathsInSource(pathInRepository, pathFactory);
572 if (paths.size() == 1) {
573 projections.append(FederationI18n.pathInProjection.text(paths.iterator().next(), projection.getRules()));
574 } else {
575 projections.append(FederationI18n.pathInProjection.text(paths, projection.getRules()));
576 }
577 }
578 }
579 String msg = FederationI18n.unableToPerformOperationUnlessLocationIsFromSingleProjection.text(location,
580 federatedWorkspace.getName(),
581 projections);
582 request.setError(new UnsupportedRequestException(msg));
583 return null;
584 }
585
586 protected void execute( Request request,
587 Projection projection ) {
588 RepositoryConnection connection = getConnection(projection);
589 connection.execute(getExecutionContext(), request);
590 // Don't need to close, as we'll close all connections when this processor is closed
591 }
592
593 protected void executeInCache( Request request,
594 FederatedWorkspace workspace ) {
595 RepositoryConnection connection = getConnectionToCacheFor(workspace);
596 connection.execute(getExecutionContext(), request);
597 // Don't need to close, as we'll close all connections when this processor is closed
598 }
599
600 /**
601 * Get the node information from the underlying sources or, if possible, from the cache.
602 *
603 * @param location the location of the node to be returned
604 * @param workspace the federated workspace configuration; may be null
605 * @return the node information
606 * @throws RepositorySourceException
607 */
608 protected ReadNodeRequest getNode( Location location,
609 FederatedWorkspace workspace ) throws RepositorySourceException {
610 // Check the cache first ...
611 ReadNodeRequest fromCache = new ReadNodeRequest(location, workspace.getCacheProjection().getWorkspaceName());
612 executeInCache(fromCache, workspace);
613
614 // Look at the cache results from the cache for problems, or if found a plan in the cache look
615 // at the contributions. We'll be putting together the set of source names for which we need to
616 // get the contributions.
617 Set<String> sourceNames = null;
618 List<Contribution> contributions = new LinkedList<Contribution>();
619
620 if (fromCache.hasError()) {
621 Throwable error = fromCache.getError();
622 if (!(error instanceof PathNotFoundException)) return fromCache;
623
624 // The path was not found in the cache, so since we don't know whether the ancestors are federated
625 // from multiple source nodes, we need to populate the cache starting with the lowest ancestor
626 // that already exists in the cache.
627 PathNotFoundException notFound = (PathNotFoundException)fromCache.getError();
628 Path lowestExistingAncestor = notFound.getLowestAncestorThatDoesExist();
629 if (location.hasPath()) {
630 Path path = location.getPath();
631 Path ancestor = path.getParent();
632 if (!ancestor.equals(lowestExistingAncestor)) {
633 // Load the nodes along the path below the existing ancestor, down to (but excluding) the desired path
634 Path pathToLoad = ancestor;
635 while (!pathToLoad.equals(lowestExistingAncestor)) {
636 Location locationToLoad = Location.create(pathToLoad);
637 loadContributionsFromSources(locationToLoad, workspace, null, contributions); // sourceNames may be
638 // null or empty
639 FederatedNode mergedNode = createFederatedNode(locationToLoad, workspace, fromCache, contributions, true);
640 if (mergedNode == null) {
641 // No source had a contribution ...
642 I18n msg = FederationI18n.nodeDoesNotExistAtPath;
643 fromCache.setError(new PathNotFoundException(location, ancestor, msg.text(path, ancestor)));
644 return fromCache;
645 }
646 MergePlan mergePlan = mergedNode.getMergePlan();
647 if (mergePlan != null) {
648 Property mergePlanProperty = getExecutionContext().getPropertyFactory().create(DnaLexicon.MERGE_PLAN,
649 (Object)mergePlan);
650 fromCache.addProperty(mergePlanProperty);
651 }
652 contributions.clear();
653 // Move to the next child along the path ...
654 pathToLoad = pathToLoad.getParent();
655 }
656 }
657
658 }
659
660 // At this point, all ancestors exist ...
661 } else {
662 // There is no error, so look for the merge plan ...
663 MergePlan mergePlan = getMergePlan(fromCache);
664 if (mergePlan != null) {
665 // We found the merge plan, so check whether it's still valid ...
666 final DateTime now = getCurrentTimeInUtc();
667 if (!mergePlan.isExpired(now)) {
668 // It is still valid, so check whether any contribution is from a non-existant projection ...
669 for (Contribution contribution : mergePlan) {
670 if (!workspace.contains(contribution.getSourceName(), contribution.getWorkspaceName())) {
671 // TODO: Record that the cached contribution is from a source that is no longer in this repository
672 }
673 }
674 return fromCache;
675 }
676
677 // At least one of the contributions is expired, so go through the contributions and place
678 // the valid contributions in the 'contributions' list; any expired contribution
679 // needs to be loaded by adding the name to the 'sourceNames'
680 if (mergePlan.getContributionCount() > 0) {
681 sourceNames = new HashSet<String>();
682 for (Contribution contribution : mergePlan) {
683 if (contribution.isExpired(now)) {
684 sourceNames.add(contribution.getSourceName());
685 contributions.add(contribution);
686 }
687 }
688 }
689 }
690 }
691
692 // Get the contributions from the sources given their names ...
693 location = fromCache.getActualLocationOfNode();
694 if (location == null) {
695 // Not yet in the cache ...
696 location = fromCache.at();
697 }
698 loadContributionsFromSources(location, workspace, sourceNames, contributions); // sourceNames may be null or empty
699 FederatedNode mergedNode = createFederatedNode(location, workspace, fromCache, contributions, true);
700 if (mergedNode == null) {
701 // No source had a contribution ...
702 if (location.hasPath()) {
703 Path ancestor = location.getPath().getParent();
704 I18n msg = FederationI18n.nodeDoesNotExistAtPath;
705 fromCache.setError(new PathNotFoundException(location, ancestor, msg.text(location, ancestor)));
706 return fromCache;
707 }
708 I18n msg = FederationI18n.nodeDoesNotExistAtLocation;
709 fromCache.setError(new PathNotFoundException(location, null, msg.text(location)));
710 return fromCache;
711 }
712 return mergedNode;
713 }
714
715 protected FederatedNode createFederatedNode( Location location,
716 FederatedWorkspace federatedWorkspace,
717 ReadNodeRequest fromCache,
718 List<Contribution> contributions,
719 boolean updateCache ) throws RepositorySourceException {
720 assert location != null;
721
722 // If there are no contributions from any source ...
723 boolean foundNonEmptyContribution = false;
724 for (Contribution contribution : contributions) {
725 assert contribution != null;
726 if (!contribution.isEmpty()) {
727 foundNonEmptyContribution = true;
728 break;
729 }
730 }
731 if (!foundNonEmptyContribution) return null;
732 if (logger.isTraceEnabled()) {
733 logger.trace("Loaded {0} from sources, resulting in these contributions:", location);
734 int i = 0;
735 for (Contribution contribution : contributions) {
736 logger.trace(" {0} {1}", ++i, contribution);
737 }
738 }
739
740 // Create the node, and use the existing UUID if one is found in the cache ...
741 ExecutionContext context = getExecutionContext();
742 assert context != null;
743 FederatedNode mergedNode = new FederatedNode(location, federatedWorkspace.getName());
744
745 // Merge the results into a single set of results ...
746 assert contributions.size() > 0;
747 federatedWorkspace.getMergingStrategy().merge(mergedNode, contributions, context);
748 if (mergedNode.getCachePolicy() == null) {
749 mergedNode.setCachePolicy(federatedWorkspace.getCachePolicy());
750 }
751 if (updateCache) {
752 // Place the results into the cache ...
753 updateCache(federatedWorkspace, mergedNode, fromCache);
754 }
755 // And return the results ...
756 return mergedNode;
757 }
758
759 /**
760 * Load the node at the supplied location from the sources with the supplied name, returning the information. This method
761 * always obtains the information from the sources and does not use or update the cache.
762 *
763 * @param location the location of the node that is to be loaded
764 * @param federatedWorkspace the federated workspace
765 * @param sourceNames the names of the sources from which contributions are to be loaded; may be empty or null if all
766 * contributions from all sources are to be loaded
767 * @param contributions the list into which the contributions are to be placed
768 * @throws RepositorySourceException
769 */
770 protected void loadContributionsFromSources( Location location,
771 FederatedWorkspace federatedWorkspace,
772 Set<String> sourceNames,
773 List<Contribution> contributions ) throws RepositorySourceException {
774 // At this point, there is no merge plan, so read information from the sources ...
775 final ExecutionContext context = getExecutionContext();
776
777 CachePolicy cachePolicy = federatedWorkspace.getCachePolicy();
778 // If the location has no path, then we have to submit a request to ALL sources ...
779 if (!location.hasPath()) {
780 for (Projection projection : federatedWorkspace.getSourceProjections()) {
781 final String source = projection.getSourceName();
782 final String workspace = projection.getSourceName();
783 if (sourceNames != null && !sourceNames.contains(source)) continue;
784 final RepositoryConnection sourceConnection = getConnection(projection);
785 if (sourceConnection == null) continue; // No source exists by this name
786 // Submit the request ...
787 ReadNodeRequest request = new ReadNodeRequest(location, federatedWorkspace.getName());
788 sourceConnection.execute(context, request);
789 if (request.hasError()) continue;
790
791 // Figure out how long we can cache this contribution ...
792 long minimumTimeToLive = Long.MAX_VALUE;
793 if (cachePolicy != null) {
794 minimumTimeToLive = Math.min(minimumTimeToLive, cachePolicy.getTimeToLive());
795 }
796 CachePolicy requestCachePolicy = request.getCachePolicy();
797 if (requestCachePolicy != null) {
798 minimumTimeToLive = Math.min(minimumTimeToLive, requestCachePolicy.getTimeToLive());
799 } else {
800 // See if the source has a default policy ...
801 CachePolicy sourceCachePolicy = sourceConnection.getDefaultCachePolicy();
802 if (sourceCachePolicy != null) {
803 minimumTimeToLive = Math.min(minimumTimeToLive, sourceCachePolicy.getTimeToLive());
804 }
805 }
806 // The expiration time should be the smallest of the minimum TTL values ...
807 DateTime expirationTime = null;
808 if (minimumTimeToLive < Long.MAX_VALUE) {
809 expirationTime = getCurrentTimeInUtc().plus(minimumTimeToLive, TimeUnit.MILLISECONDS);
810 }
811
812 // Convert the locations of the children (relative to the source) to be relative to this node
813 Contribution contribution = Contribution.create(source,
814 workspace,
815 request.getActualLocationOfNode(),
816 expirationTime,
817 request.getProperties(),
818 request.getChildren());
819 contributions.add(contribution);
820 }
821 }
822
823 // Otherwise, we can do it by path and projections ...
824 Path path = location.getPath();
825 for (Projection projection : federatedWorkspace.getSourceProjections()) {
826 final String source = projection.getSourceName();
827 final String workspace = projection.getWorkspaceName();
828 if (sourceNames != null && !sourceNames.contains(source)) continue;
829 final RepositoryConnection sourceConnection = getConnection(projection);
830 if (sourceConnection == null) continue; // No source exists by this name
831 // Get the cached information ...
832 DateTime expirationTime = null;
833 if (cachePolicy != null) {
834 expirationTime = getCurrentTimeInUtc().plus(cachePolicy.getTimeToLive(), TimeUnit.MILLISECONDS);
835 }
836 // Get the paths-in-source where we should fetch node contributions ...
837 Set<Path> pathsInSource = projection.getPathsInSource(path, pathFactory);
838 if (pathsInSource.isEmpty()) {
839 // The source has no contributions, but see whether the project exists BELOW this path.
840 // We do this by getting the top-level repository paths of the projection, and then
841 // use those to figure out the children of the nodes.
842 Contribution contribution = null;
843 List<Path> topLevelPaths = projection.getTopLevelPathsInRepository(pathFactory);
844 Location input = Location.create(path);
845 switch (topLevelPaths.size()) {
846 case 0:
847 break;
848 case 1: {
849 Path topLevelPath = topLevelPaths.iterator().next();
850 if (path.isAncestorOf(topLevelPath)) {
851 Location child = Location.create(topLevelPath);
852 contribution = Contribution.createPlaceholder(source, workspace, input, expirationTime, child);
853 }
854 break;
855 }
856 default: {
857 // We assume that the top-level paths do not overlap ...
858 List<Location> children = new ArrayList<Location>(topLevelPaths.size());
859 for (Path topLevelPath : topLevelPaths) {
860 if (path.isAncestorOf(topLevelPath)) {
861 children.add(Location.create(topLevelPath));
862 }
863 }
864 if (children.size() > 0) {
865 contribution = Contribution.createPlaceholder(source, workspace, input, expirationTime, children);
866 }
867 }
868 }
869 if (contribution == null) contribution = Contribution.create(source, workspace, expirationTime);
870 contributions.add(contribution);
871 } else {
872 // There is at least one (real) contribution ...
873
874 // Get the contributions ...
875 final int numPaths = pathsInSource.size();
876 if (numPaths == 1) {
877 Path pathInSource = pathsInSource.iterator().next();
878 ReadNodeRequest fromSource = new ReadNodeRequest(Location.create(pathInSource), workspace);
879 sourceConnection.execute(getExecutionContext(), fromSource);
880 if (!fromSource.hasError()) {
881 Collection<Property> properties = fromSource.getProperties();
882 List<Location> children = fromSource.getChildren();
883
884 // Figure out how long we can cache this contribution ...
885 long minimumTimeToLive = Long.MAX_VALUE;
886 if (cachePolicy != null) {
887 minimumTimeToLive = Math.min(minimumTimeToLive, cachePolicy.getTimeToLive());
888 }
889 CachePolicy requestCachePolicy = fromSource.getCachePolicy();
890 if (requestCachePolicy != null) {
891 minimumTimeToLive = Math.min(minimumTimeToLive, requestCachePolicy.getTimeToLive());
892 } else {
893 // See if the source has a default policy ...
894 CachePolicy sourceCachePolicy = sourceConnection.getDefaultCachePolicy();
895 if (sourceCachePolicy != null) {
896 minimumTimeToLive = Math.min(minimumTimeToLive, sourceCachePolicy.getTimeToLive());
897 }
898 }
899 // The expiration time should be the smallest of the minimum TTL values ...
900 expirationTime = null;
901 if (minimumTimeToLive < Long.MAX_VALUE) {
902 expirationTime = getCurrentTimeInUtc().plus(minimumTimeToLive, TimeUnit.MILLISECONDS);
903 }
904
905 Location actualLocation = fromSource.getActualLocationOfNode();
906 Contribution contribution = Contribution.create(source,
907 workspace,
908 actualLocation,
909 expirationTime,
910 properties,
911 children);
912 contributions.add(contribution);
913 }
914 } else {
915 List<Request> fromSourceCommands = new ArrayList<Request>(numPaths);
916 for (Path pathInSource : pathsInSource) {
917 fromSourceCommands.add(new ReadNodeRequest(Location.create(pathInSource), workspace));
918 }
919 Request request = CompositeRequest.with(fromSourceCommands);
920 sourceConnection.execute(context, request);
921 for (Request requestObj : fromSourceCommands) {
922 ReadNodeRequest fromSource = (ReadNodeRequest)requestObj;
923 if (fromSource.hasError()) continue;
924
925 // Figure out how long we can cache this contribution ...
926 long minimumTimeToLive = Long.MAX_VALUE;
927 if (cachePolicy != null) {
928 minimumTimeToLive = Math.min(minimumTimeToLive, cachePolicy.getTimeToLive());
929 }
930 CachePolicy requestCachePolicy = fromSource.getCachePolicy();
931 if (requestCachePolicy != null) {
932 minimumTimeToLive = Math.min(minimumTimeToLive, requestCachePolicy.getTimeToLive());
933 } else {
934 // See if the source has a default policy ...
935 CachePolicy sourceCachePolicy = sourceConnection.getDefaultCachePolicy();
936 if (sourceCachePolicy != null) {
937 minimumTimeToLive = Math.min(minimumTimeToLive, sourceCachePolicy.getTimeToLive());
938 }
939 }
940 // The expiration time should be the smallest of the minimum TTL values ...
941 expirationTime = null;
942 if (minimumTimeToLive < Long.MAX_VALUE) {
943 expirationTime = getCurrentTimeInUtc().plus(minimumTimeToLive, TimeUnit.MILLISECONDS);
944 }
945
946 List<Location> children = fromSource.getChildren();
947 Contribution contribution = Contribution.create(source,
948 workspace,
949 fromSource.getActualLocationOfNode(),
950 expirationTime,
951 fromSource.getProperties(),
952 children);
953 contributions.add(contribution);
954 }
955 }
956 }
957 }
958 }
959
960 protected MergePlan getMergePlan( ReadNodeRequest request ) {
961 Property mergePlanProperty = request.getPropertiesByName().get(DnaLexicon.MERGE_PLAN);
962 if (mergePlanProperty == null || mergePlanProperty.isEmpty()) {
963 return null;
964 }
965 Object value = mergePlanProperty.getValues().next();
966 return value instanceof MergePlan ? (MergePlan)value : null;
967 }
968
969 protected void updateCache( FederatedWorkspace federatedWorkspace,
970 FederatedNode mergedNode,
971 ReadNodeRequest fromCache ) throws RepositorySourceException {
972 final ExecutionContext context = getExecutionContext();
973 final Location location = mergedNode.at();
974 final Path path = location.getPath();
975 final String cacheWorkspace = federatedWorkspace.getCacheProjection().getWorkspaceName();
976 assert path != null;
977 List<Request> requests = new ArrayList<Request>();
978 Name childName = null;
979
980 // If the merged node has a merge plan, then add it to the properties if it is not already there ...
981 Map<Name, Property> properties = mergedNode.getPropertiesByName();
982 MergePlan mergePlan = mergedNode.getMergePlan();
983 if (mergePlan != null && !properties.containsKey(DnaLexicon.MERGE_PLAN)) {
984 // Record the merge plan on the merged node ...
985 Property mergePlanProperty = getExecutionContext().getPropertyFactory().create(DnaLexicon.MERGE_PLAN,
986 (Object)mergePlan);
987 properties.put(mergePlanProperty.getName(), mergePlanProperty);
988 }
989
990 // Make sure the UUID is being stored ...
991 PropertyFactory propertyFactory = getExecutionContext().getPropertyFactory();
992 Property uuidProperty = properties.get(DnaLexicon.UUID);
993 if (uuidProperty == null) uuidProperty = properties.get(JcrLexicon.UUID);
994 if (uuidProperty == null) {
995 UUID uuid = mergedNode.at().getUuid();
996 if (uuid == null) uuid = UUID.randomUUID();
997 uuidProperty = propertyFactory.create(DnaLexicon.UUID, uuid);
998 properties.put(uuidProperty.getName(), uuidProperty);
999 }
1000
1001 // Have the children changed ...
1002 if (mergedNode.hasError() && !path.isRoot()) {
1003 // This is not the root node, so we need to create the node (or replace it if it exists) ...
1004 final Location parentLocation = Location.create(path.getParent());
1005 childName = path.getLastSegment().getName();
1006 requests.add(new CreateNodeRequest(parentLocation, cacheWorkspace, childName, NodeConflictBehavior.REPLACE,
1007 mergedNode.getProperties()));
1008 // logger.trace("Adding {0} to cache with properties {1}", location, properties);
1009 // Now create all of the children that this federated node knows of ...
1010 for (Location child : mergedNode.getChildren()) {
1011 childName = child.getPath().getLastSegment().getName();
1012 requests.add(new CreateNodeRequest(location, cacheWorkspace, childName, NodeConflictBehavior.APPEND, child));
1013 // logger.trace("Caching child of {0} named {1}", location, childName);
1014 }
1015 } else if (fromCache.getChildren().equals(mergedNode.getChildren())) {
1016 // Just update the properties ...
1017 requests.add(new UpdatePropertiesRequest(location, cacheWorkspace, properties));
1018 // logger.trace("Updating cached properties on the root to {0}", properties);
1019 } else {
1020 // The children have changed, so figure out how ...
1021 if (fromCache.getChildren().isEmpty()) {
1022 // No children in the cache, so just update the properties of the node ...
1023 requests.add(new UpdatePropertiesRequest(location, cacheWorkspace, properties));
1024 // logger.trace("Updating cached properties on {0} to {1}", location, properties);
1025
1026 // And create all of the children that this federated node knows of ...
1027 for (Location child : mergedNode.getChildren()) {
1028 childName = child.getPath().getLastSegment().getName();
1029 requests.add(new CreateNodeRequest(location, cacheWorkspace, childName, NodeConflictBehavior.APPEND, child));
1030 // logger.trace("Caching child of {0} named {1}", location, childName);
1031 }
1032 } else if (mergedNode.getChildren().isEmpty()) {
1033 // There were children in the cache but not in the merged node, so update the cached properties
1034 requests.add(new UpdatePropertiesRequest(location, cacheWorkspace, properties));
1035
1036 // and delete all the children ...
1037 for (Location child : fromCache.getChildren()) {
1038 requests.add(new DeleteBranchRequest(child, cacheWorkspace));
1039 // logger.trace("Removing {0} from cache", child);
1040 }
1041 } else {
1042 // There were children in the cache and in the merged node. The easy way is to just remove the
1043 // branch from the cache, the create it again ...
1044 if (path.isRoot()) {
1045 requests.add(new UpdatePropertiesRequest(location, cacheWorkspace, properties));
1046 // logger.trace("Updating cached properties on {0} to {1}", location, properties);
1047
1048 // and delete all the children ...
1049 for (Location child : fromCache.getChildren()) {
1050 requests.add(new DeleteBranchRequest(child, cacheWorkspace));
1051 // logger.trace("Removing child node {0} from cache", child);
1052 }
1053
1054 // Now create all of the children that this federated node knows of ...
1055 for (Location child : mergedNode.getChildren()) {
1056 childName = child.getPath().getLastSegment().getName();
1057 requests.add(new CreateNodeRequest(location, cacheWorkspace, childName, NodeConflictBehavior.APPEND,
1058 child));
1059 // logger.trace("Caching child of {0} named {1}", location, childName);
1060 }
1061 } else {
1062 requests.add(new DeleteBranchRequest(location, cacheWorkspace));
1063 // logger.trace("Replacing node {0} from cache", location);
1064
1065 // This is not the root node, so we need to create the node (or replace it if it exists) ...
1066 final Location parentLocation = Location.create(path.getParent());
1067 childName = path.getLastSegment().getName();
1068 requests.add(new CreateNodeRequest(parentLocation, cacheWorkspace, childName, NodeConflictBehavior.REPLACE,
1069 mergedNode.getProperties()));
1070 // logger.trace("Adding {0} to cache with properties {1}", location, properties);
1071 // Now create all of the children that this federated node knows of ...
1072 for (Location child : mergedNode.getChildren()) {
1073 childName = child.getPath().getLastSegment().getName();
1074 requests.add(new CreateNodeRequest(location, cacheWorkspace, childName, NodeConflictBehavior.APPEND,
1075 child));
1076 // logger.trace("Caching child of {0} named {1}", location, childName);
1077 }
1078 }
1079 }
1080 }
1081
1082 // Execute all the requests ...
1083 final RepositoryConnection cacheConnection = getConnectionToCacheFor(federatedWorkspace);
1084 cacheConnection.execute(context, CompositeRequest.with(requests));
1085
1086 // If the children did not have UUIDs, then find the actual locations for each of the cached nodes ...
1087 if (requests.size() > 1) {
1088 Iterator<Request> requestIter = requests.iterator();
1089 requestIter.next(); // Skip the first request, which creates/updates the node (we want children)
1090 List<Location> children = mergedNode.getChildren();
1091 for (int i = 0; i != children.size(); ++i) {
1092 Request request = requestIter.next();
1093 while (!(request instanceof CreateNodeRequest)) { // skip non-create requests
1094 request = requestIter.next();
1095 }
1096 Location actual = ((CreateNodeRequest)request).getActualLocationOfNode();
1097 Location child = children.get(i);
1098 if (!child.hasIdProperties()) {
1099 assert child.getPath().equals(actual.getPath());
1100 children.set(i, actual);
1101 }
1102 }
1103 }
1104 }
1105 }