1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package org.modeshape.graph.connector.federation;
25
26 import java.util.Enumeration;
27 import java.util.HashMap;
28 import java.util.Hashtable;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.ExecutorService;
33 import java.util.concurrent.Executors;
34 import java.util.concurrent.TimeUnit;
35 import javax.naming.Context;
36 import javax.naming.Name;
37 import javax.naming.RefAddr;
38 import javax.naming.Reference;
39 import javax.naming.StringRefAddr;
40 import javax.naming.spi.ObjectFactory;
41 import net.jcip.annotations.GuardedBy;
42 import net.jcip.annotations.ThreadSafe;
43 import org.modeshape.common.i18n.I18n;
44 import org.modeshape.common.util.CheckArg;
45 import org.modeshape.common.util.HashCode;
46 import org.modeshape.common.util.NamedThreadFactory;
47 import org.modeshape.graph.ExecutionContext;
48 import org.modeshape.graph.GraphI18n;
49 import org.modeshape.graph.Location;
50 import org.modeshape.graph.ModeShapeLexicon;
51 import org.modeshape.graph.Node;
52 import org.modeshape.graph.Subgraph;
53 import org.modeshape.graph.SubgraphNode;
54 import org.modeshape.graph.cache.BasicCachePolicy;
55 import org.modeshape.graph.cache.CachePolicy;
56 import org.modeshape.graph.connector.RepositoryConnection;
57 import org.modeshape.graph.connector.RepositoryConnectionFactory;
58 import org.modeshape.graph.connector.RepositoryContext;
59 import org.modeshape.graph.connector.RepositorySource;
60 import org.modeshape.graph.connector.RepositorySourceCapabilities;
61 import org.modeshape.graph.connector.RepositorySourceException;
62 import org.modeshape.graph.observe.Observer;
63 import org.modeshape.graph.property.NamespaceRegistry;
64 import org.modeshape.graph.property.Path;
65 import org.modeshape.graph.property.Property;
66 import org.modeshape.graph.property.ValueFactories;
67 import org.modeshape.graph.property.ValueFactory;
68
69
70
71
72 @ThreadSafe
73 public class FederatedRepositorySource implements RepositorySource, ObjectFactory {
74
75
76
77
78 public static final int DEFAULT_RETRY_LIMIT = 0;
79
80 protected static final String SOURCE_NAME = "sourceName";
81 protected static final String RETRY_LIMIT = "retryLimit";
82
83 private static final long serialVersionUID = 1L;
84
85 private volatile String name;
86 private volatile int retryLimit;
87 private volatile RepositorySourceCapabilities capabilities = new RepositorySourceCapabilities(true, true, false, false, true);
88 private volatile transient FederatedRepository configuration;
89 private volatile transient RepositoryContext context;
90
91
92
93
94 public FederatedRepositorySource() {
95 }
96
97
98
99
100
101
102 public String getName() {
103 return name;
104 }
105
106
107
108
109 public synchronized void setName( String name ) {
110 if (this.name == name || this.name != null && this.name.equals(name)) return;
111 this.name = name;
112 changeConfiguration();
113 }
114
115
116
117
118
119
120 public int getRetryLimit() {
121 return retryLimit;
122 }
123
124
125
126
127
128
129 public synchronized void setRetryLimit( int limit ) {
130 retryLimit = limit < 0 ? 0 : limit;
131 changeConfiguration();
132 }
133
134
135
136
137
138
139 public RepositorySourceCapabilities getCapabilities() {
140 return capabilities;
141 }
142
143
144
145
146
147
148 public synchronized void initialize( RepositoryContext context ) throws RepositorySourceException {
149 this.context = context;
150 changeConfiguration();
151 }
152
153
154
155
156
157
158
159 return context;
160 }
161
162
163
164
165
166
167 public RepositoryConnection getConnection() throws RepositorySourceException {
168 FederatedRepository config = this.configuration;
169 if (config == null) {
170 synchronized (this) {
171 if (this.configuration == null) {
172
173 String name = getName();
174 if (name == null) {
175 I18n msg = GraphI18n.namePropertyIsRequiredForFederatedRepositorySource;
176 throw new RepositorySourceException(getName(), msg.text("name"));
177 }
178 RepositoryContext repositoryContext = getRepositoryContext();
179 if (repositoryContext == null) {
180 I18n msg = GraphI18n.federatedRepositorySourceMustBeInitialized;
181 throw new RepositorySourceException(getName(), msg.text("name", name));
182 }
183
184
185 this.configuration = loadRepository(name, repositoryContext);
186 }
187 config = this.configuration;
188 }
189 }
190 Observer observer = this.context != null ? this.context.getObserver() : null;
191 return new FederatedRepositoryConnection(config, observer);
192 }
193
194
195
196
197
198
199 public void close() {
200 synchronized (this) {
201 if (this.configuration != null) {
202
203 if (this.configuration.getExecutor() != null) {
204 this.configuration.getExecutor().shutdown();
205 }
206 this.configuration = null;
207 }
208 }
209 }
210
211
212
213
214
215
216 public Reference getReference() {
217 String className = getClass().getName();
218 String factoryClassName = this.getClass().getName();
219 Reference ref = new Reference(className, factoryClassName, null);
220
221 ref.add(new StringRefAddr(SOURCE_NAME, getName()));
222 ref.add(new StringRefAddr(RETRY_LIMIT, Integer.toString(getRetryLimit())));
223 return ref;
224 }
225
226
227
228
229
230
231
232 public Object getObjectInstance( Object obj,
233 Name name,
234 Context nameCtx,
235 Hashtable<?, ?> environment ) throws Exception {
236 if (obj instanceof Reference) {
237 Map<String, String> values = new HashMap<String, String>();
238 Reference ref = (Reference)obj;
239 Enumeration<?> en = ref.getAll();
240 while (en.hasMoreElements()) {
241 RefAddr subref = (RefAddr)en.nextElement();
242 if (subref instanceof StringRefAddr) {
243 String key = subref.getType();
244 Object value = subref.getContent();
245 if (value != null) values.put(key, value.toString());
246 }
247 }
248 String sourceName = values.get(SOURCE_NAME);
249 String retryLimit = values.get(RETRY_LIMIT);
250
251
252 FederatedRepositorySource source = new FederatedRepositorySource();
253 if (sourceName != null) source.setName(sourceName);
254 if (retryLimit != null) source.setRetryLimit(Integer.parseInt(retryLimit));
255 return source;
256 }
257 return null;
258 }
259
260
261
262
263 @Override
264 public int hashCode() {
265 return HashCode.compute(getName());
266 }
267
268
269
270
271 @Override
272 public boolean equals( Object obj ) {
273 if (obj == this) return true;
274 if (obj instanceof FederatedRepositorySource) {
275 FederatedRepositorySource that = (FederatedRepositorySource)obj;
276
277 if (this.getName() == null) {
278 if (that.getName() != null) return false;
279 } else {
280 if (!this.getName().equals(that.getName())) return false;
281 }
282 return true;
283 }
284 return false;
285 }
286
287
288
289
290 @GuardedBy( "this" )
291 protected void changeConfiguration() {
292 this.configuration = null;
293 }
294
295
296
297
298
299
300
301
302
303
304
305 protected FederatedRepository loadRepository( String name,
306 RepositoryContext repositoryContext ) throws RepositorySourceException {
307
308 ExecutionContext executionContext = repositoryContext.getExecutionContext();
309 RepositoryConnectionFactory connectionFactory = repositoryContext.getRepositoryConnectionFactory();
310 ValueFactories valueFactories = executionContext.getValueFactories();
311 ValueFactory<String> strings = valueFactories.getStringFactory();
312 ValueFactory<Long> longs = valueFactories.getLongFactory();
313 ProjectionParser projectionParser = ProjectionParser.getInstance();
314 NamespaceRegistry registry = executionContext.getNamespaceRegistry();
315
316 try {
317
318
319
320
321
322
323
324 Subgraph repositories = repositoryContext.getConfiguration(5);
325
326
327 String defaultWorkspaceName = null;
328 Property defaultWorkspaceNameProperty = repositories.getRoot().getProperty(ModeShapeLexicon.DEFAULT_WORKSPACE_NAME);
329 if (defaultWorkspaceNameProperty != null) {
330
331 defaultWorkspaceName = strings.create(defaultWorkspaceNameProperty.getFirstValue());
332 }
333
334
335 CachePolicy defaultCachePolicy = null;
336 Property timeToExpire = repositories.getRoot().getProperty(ModeShapeLexicon.TIME_TO_EXPIRE);
337 if (timeToExpire != null && !timeToExpire.isEmpty()) {
338 long timeToCacheInMillis = longs.create(timeToExpire.getFirstValue());
339 defaultCachePolicy = new BasicCachePolicy(timeToCacheInMillis, TimeUnit.MILLISECONDS).getUnmodifiable();
340 }
341
342
343 Node workspacesNode = repositories.getNode(ModeShapeLexicon.WORKSPACES);
344 if (workspacesNode == null) {
345 I18n msg = GraphI18n.requiredNodeDoesNotExistRelativeToNode;
346 throw new RepositorySourceException(msg.text(ModeShapeLexicon.WORKSPACES.getString(registry),
347 repositories.getLocation().getPath().getString(registry),
348 repositories.getGraph().getCurrentWorkspaceName(),
349 repositories.getGraph().getSourceName()));
350 }
351
352
353 LinkedList<FederatedWorkspace> workspaces = new LinkedList<FederatedWorkspace>();
354 for (Location workspace : workspacesNode) {
355
356
357 String workspaceName = null;
358 SubgraphNode workspaceNode = repositories.getNode(workspace);
359 Property workspaceNameProperty = workspaceNode.getProperty(ModeShapeLexicon.WORKSPACE_NAME);
360 if (workspaceNameProperty != null) {
361
362 workspaceName = strings.create(workspaceNameProperty.getFirstValue());
363 }
364 if (workspaceName == null) {
365
366 workspaceName = workspace.getPath().getLastSegment().getName().getLocalName();
367 }
368
369
370 Node projectionsNode = workspaceNode.getNode(ModeShapeLexicon.PROJECTIONS);
371 if (projectionsNode == null) {
372 I18n msg = GraphI18n.requiredNodeDoesNotExistRelativeToNode;
373 throw new RepositorySourceException(getName(), msg.text(ModeShapeLexicon.PROJECTIONS.getString(registry),
374 workspaceNode.getLocation()
375 .getPath()
376 .getString(registry),
377 repositories.getGraph().getCurrentWorkspaceName(),
378 repositories.getGraph().getSourceName()));
379 }
380
381
382 List<Projection> sourceProjections = new LinkedList<Projection>();
383 for (Location projection : projectionsNode) {
384 Node projectionNode = repositories.getNode(projection);
385 sourceProjections.add(createProjection(executionContext, projectionParser, projectionNode));
386 }
387
388
389 FederatedWorkspace space = new FederatedWorkspace(repositoryContext, name, workspaceName, sourceProjections,
390 defaultCachePolicy);
391 if (workspaceName.equals(defaultWorkspaceName)) {
392 workspaces.addFirst(space);
393 } else {
394 workspaces.add(space);
395 }
396 }
397
398
399 ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory(name));
400
401 return new FederatedRepository(name, connectionFactory, workspaces, defaultCachePolicy, executor);
402 } catch (RepositorySourceException t) {
403 throw t;
404 } catch (Throwable t) {
405 I18n msg = GraphI18n.errorReadingConfigurationForFederatedRepositorySource;
406 throw new RepositorySourceException(getName(), msg.text(name), t);
407 }
408 }
409
410
411
412
413
414
415
416
417
418
419
420 public synchronized FederatedWorkspace addWorkspace( String workspaceName,
421 Iterable<Projection> projections,
422 boolean isDefault ) {
423 CheckArg.isNotNull(workspaceName, "workspaceName");
424 CheckArg.isNotNull(projections, "projections");
425
426
427 String name = getName();
428 if (name == null) {
429 I18n msg = GraphI18n.namePropertyIsRequiredForFederatedRepositorySource;
430 throw new RepositorySourceException(getName(), msg.text("name"));
431 }
432 RepositoryContext context = getRepositoryContext();
433 if (context == null) {
434 I18n msg = GraphI18n.federatedRepositorySourceMustBeInitialized;
435 throw new RepositorySourceException(getName(), msg.text("name", name));
436 }
437
438
439 RepositoryConnectionFactory connectionFactory = null;
440 ExecutorService executor = null;
441 LinkedList<FederatedWorkspace> workspaces = new LinkedList<FederatedWorkspace>();
442 CachePolicy defaultCachePolicy = null;
443 if (this.configuration != null) {
444 connectionFactory = this.configuration.getConnectionFactory();
445 executor = this.configuration.getExecutor();
446 defaultCachePolicy = this.configuration.getDefaultCachePolicy();
447 for (String existingWorkspaceName : this.configuration.getWorkspaceNames()) {
448 if (existingWorkspaceName.equals(workspaceName)) continue;
449 workspaces.add(this.configuration.getWorkspace(existingWorkspaceName));
450 }
451 } else {
452 connectionFactory = context.getRepositoryConnectionFactory();
453 executor = Executors.newCachedThreadPool(new NamedThreadFactory(name));
454 }
455
456
457 FederatedWorkspace newWorkspace = new FederatedWorkspace(context, name, workspaceName, projections, defaultCachePolicy);
458 if (isDefault) {
459 workspaces.addFirst(newWorkspace);
460 } else {
461 workspaces.add(newWorkspace);
462 }
463
464 this.configuration = new FederatedRepository(name, connectionFactory, workspaces, defaultCachePolicy, executor);
465 return newWorkspace;
466 }
467
468
469
470
471
472
473
474
475 public synchronized boolean removeWorkspace( String workspaceName ) {
476 CheckArg.isNotNull(workspaceName, "workspaceName");
477 if (this.configuration == null) return false;
478 FederatedWorkspace workspace = this.configuration.getWorkspace(workspaceName);
479 if (workspace == null) return false;
480 List<FederatedWorkspace> workspaces = new LinkedList<FederatedWorkspace>();
481 for (String existingWorkspaceName : this.configuration.getWorkspaceNames()) {
482 if (existingWorkspaceName.equals(workspaceName)) continue;
483 workspaces.add(this.configuration.getWorkspace(existingWorkspaceName));
484 }
485 RepositoryConnectionFactory connectionFactory = this.configuration.getConnectionFactory();
486 ExecutorService executor = this.configuration.getExecutor();
487 CachePolicy defaultCachePolicy = this.configuration.getDefaultCachePolicy();
488 this.configuration = new FederatedRepository(name, connectionFactory, workspaces, defaultCachePolicy, executor);
489 return true;
490 }
491
492 public synchronized boolean hasWorkspace( String workspaceName ) {
493 CheckArg.isNotNull(workspaceName, "workspaceName");
494 return this.configuration != null && this.configuration.getWorkspaceNames().contains(workspaceName);
495 }
496
497
498
499
500
501
502
503
504
505 protected Projection createProjection( ExecutionContext context,
506 ProjectionParser projectionParser,
507 Node node ) {
508 ValueFactory<String> strings = context.getValueFactories().getStringFactory();
509
510 Path path = node.getLocation().getPath();
511
512
513 String sourceName = path.getLastSegment().getName().getLocalName();
514 Property sourceNameProperty = node.getProperty(ModeShapeLexicon.SOURCE_NAME);
515 if (sourceNameProperty != null && !sourceNameProperty.isEmpty()) {
516
517 sourceName = strings.create(sourceNameProperty.getFirstValue());
518 }
519 assert sourceName != null;
520
521
522 String workspaceName = null;
523 Property workspaceNameProperty = node.getProperty(ModeShapeLexicon.WORKSPACE_NAME);
524 if (workspaceNameProperty != null && !workspaceNameProperty.isEmpty()) {
525
526 workspaceName = strings.create(workspaceNameProperty.getFirstValue());
527 }
528
529
530 Projection.Rule[] projectionRules = null;
531 Property projectionRulesProperty = node.getProperty(ModeShapeLexicon.PROJECTION_RULES);
532 if (projectionRulesProperty != null && !projectionRulesProperty.isEmpty()) {
533 String[] projectionRuleStrs = strings.create(projectionRulesProperty.getValuesAsArray());
534 if (projectionRuleStrs != null && projectionRuleStrs.length != 0) {
535 projectionRules = projectionParser.rulesFromStrings(context, projectionRuleStrs);
536 }
537 }
538
539
540 boolean readOnly = false;
541 Property readOnlyProperty = node.getProperty(ModeShapeLexicon.READ_ONLY);
542 if (readOnlyProperty != null && !readOnlyProperty.isEmpty()) {
543 readOnly = context.getValueFactories().getBooleanFactory().create(readOnlyProperty.getFirstValue());
544 }
545
546 return new Projection(sourceName, workspaceName, readOnly, projectionRules);
547 }
548
549 }