1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package org.modeshape.connector.infinispan;
25
26 import java.io.ByteArrayInputStream;
27 import java.io.ByteArrayOutputStream;
28 import java.io.FileInputStream;
29 import java.io.IOException;
30 import java.io.InputStream;
31 import java.io.ObjectInputStream;
32 import java.io.ObjectOutputStream;
33 import java.util.Enumeration;
34 import java.util.HashMap;
35 import java.util.Hashtable;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.UUID;
39 import javax.naming.BinaryRefAddr;
40 import javax.naming.Context;
41 import javax.naming.InitialContext;
42 import javax.naming.NamingException;
43 import javax.naming.RefAddr;
44 import javax.naming.Reference;
45 import javax.naming.Referenceable;
46 import javax.naming.StringRefAddr;
47 import javax.naming.spi.ObjectFactory;
48 import net.jcip.annotations.ThreadSafe;
49 import org.infinispan.Cache;
50 import org.infinispan.manager.CacheManager;
51 import org.infinispan.manager.DefaultCacheManager;
52 import org.modeshape.common.i18n.I18n;
53 import org.modeshape.common.util.HashCode;
54 import org.modeshape.common.util.StringUtil;
55 import org.modeshape.graph.cache.CachePolicy;
56 import org.modeshape.graph.connector.RepositoryConnection;
57 import org.modeshape.graph.connector.RepositoryContext;
58 import org.modeshape.graph.connector.RepositorySource;
59 import org.modeshape.graph.connector.RepositorySourceCapabilities;
60 import org.modeshape.graph.connector.RepositorySourceException;
61 import org.modeshape.graph.connector.base.BaseRepositorySource;
62 import org.modeshape.graph.connector.base.Connection;
63 import org.modeshape.graph.observe.Observer;
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 @ThreadSafe
80 public class InfinispanSource implements BaseRepositorySource, ObjectFactory {
81
82 private static final long serialVersionUID = 2L;
83
84
85
86 public static final int DEFAULT_RETRY_LIMIT = 0;
87
88
89
90
91 public static final String DEFAULT_NAME_OF_DEFAULT_WORKSPACE = "default";
92
93
94
95
96 public static final boolean DEFAULT_UPDATES_ALLOWED = true;
97
98 protected static final String ROOT_NODE_UUID = "rootNodeUuid";
99 protected static final String SOURCE_NAME = "sourceName";
100 protected static final String DEFAULT_CACHE_POLICY = "defaultCachePolicy";
101 protected static final String CACHE_CONFIGURATION_NAME = "cacheConfigurationName";
102 protected static final String CACHE_FACTORY_JNDI_NAME = "cacheManagerJndiName";
103 protected static final String RETRY_LIMIT = "retryLimit";
104 protected static final String DEFAULT_WORKSPACE = "defaultWorkspace";
105 protected static final String PREDEFINED_WORKSPACE_NAMES = "predefinedWorkspaceNames";
106 protected static final String ALLOW_CREATING_WORKSPACES = "allowCreatingWorkspaces";
107 protected static final String UPDATES_ALLOWED = "updatesAllowed";
108
109 private volatile String name;
110 private volatile UUID rootNodeUuid = UUID.fromString("cafebabe-cafe-babe-cafe-babecafebabe");
111 private volatile CachePolicy defaultCachePolicy;
112 private volatile String cacheConfigurationName;
113 private volatile String cacheManagerJndiName;
114 private volatile int retryLimit = DEFAULT_RETRY_LIMIT;
115 private volatile String defaultWorkspace;
116 private volatile boolean updatesAllowed = DEFAULT_UPDATES_ALLOWED;
117 private volatile String[] predefinedWorkspaces = new String[] {};
118 private volatile RepositorySourceCapabilities capabilities = new RepositorySourceCapabilities(true, true, false, true, false);
119 private transient InfinispanRepository repository;
120 private transient Context jndiContext;
121 private transient RepositoryContext repositoryContext;
122
123
124
125
126 public InfinispanSource() {
127 }
128
129
130
131
132
133
134 public void initialize( RepositoryContext context ) throws RepositorySourceException {
135 this.repositoryContext = context;
136 }
137
138
139
140
141 public String getName() {
142 return this.name;
143 }
144
145
146
147
148
149
150 public RepositorySourceCapabilities getCapabilities() {
151 return capabilities;
152 }
153
154
155
156
157
158
159 public int getRetryLimit() {
160 return retryLimit;
161 }
162
163
164
165
166
167
168 public synchronized void setRetryLimit( int limit ) {
169 retryLimit = limit < 0 ? 0 : limit;
170 }
171
172
173
174
175
176
177 public synchronized void setName( String name ) {
178 if (this.name == name || this.name != null && this.name.equals(name)) return;
179 this.name = name;
180 }
181
182
183
184
185
186
187 public CachePolicy getDefaultCachePolicy() {
188 return defaultCachePolicy;
189 }
190
191
192
193
194 public synchronized void setDefaultCachePolicy( CachePolicy defaultCachePolicy ) {
195 if (this.defaultCachePolicy == defaultCachePolicy || this.defaultCachePolicy != null
196 && this.defaultCachePolicy.equals(defaultCachePolicy)) return;
197 this.defaultCachePolicy = defaultCachePolicy;
198 }
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213 public String getCacheManagerJndiName() {
214 return cacheManagerJndiName;
215 }
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231 public synchronized void setCacheManagerJndiName( String jndiName ) {
232 if (this.cacheManagerJndiName == jndiName || this.cacheManagerJndiName != null
233 && this.cacheManagerJndiName.equals(jndiName)) return;
234 this.cacheManagerJndiName = jndiName;
235 }
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251 public String getCacheConfigurationName() {
252 return cacheConfigurationName;
253 }
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269 public synchronized void setCacheConfigurationName( String cacheConfigurationName ) {
270 if (this.cacheConfigurationName == cacheConfigurationName || this.cacheConfigurationName != null
271 && this.cacheConfigurationName.equals(cacheConfigurationName)) return;
272 this.cacheConfigurationName = cacheConfigurationName;
273 }
274
275
276
277
278
279
280
281 public String getRootNodeUuid() {
282 return this.rootNodeUuid.toString();
283 }
284
285
286
287
288
289
290
291 public UUID getRootNodeUuidObject() {
292 return this.rootNodeUuid;
293 }
294
295
296
297
298
299
300
301 public synchronized void setRootNodeUuid( String rootNodeUuid ) {
302 UUID uuid = null;
303 if (rootNodeUuid == null) uuid = UUID.randomUUID();
304 else uuid = UUID.fromString(rootNodeUuid);
305 if (this.rootNodeUuid.equals(uuid)) return;
306 this.rootNodeUuid = uuid;
307 }
308
309
310
311
312
313
314 public String getDefaultWorkspaceName() {
315 return defaultWorkspace;
316 }
317
318
319
320
321
322
323
324 public synchronized void setDefaultWorkspaceName( String nameOfDefaultWorkspace ) {
325 this.defaultWorkspace = nameOfDefaultWorkspace != null ? nameOfDefaultWorkspace : DEFAULT_NAME_OF_DEFAULT_WORKSPACE;
326 }
327
328
329
330
331
332
333
334
335 public synchronized String[] getPredefinedWorkspaceNames() {
336 String[] copy = new String[predefinedWorkspaces.length];
337 System.arraycopy(predefinedWorkspaces, 0, copy, 0, predefinedWorkspaces.length);
338 return copy;
339 }
340
341
342
343
344
345
346
347
348
349 public synchronized void setPredefinedWorkspaceNames( String[] predefinedWorkspaceNames ) {
350 if (predefinedWorkspaceNames != null && predefinedWorkspaceNames.length == 1) {
351 predefinedWorkspaceNames = predefinedWorkspaceNames[0].split("\\s*,\\s*");
352 }
353 this.predefinedWorkspaces = predefinedWorkspaceNames;
354 }
355
356
357
358
359
360
361
362
363
364
365 public boolean isCreatingWorkspacesAllowed() {
366 return capabilities.supportsCreatingWorkspaces();
367 }
368
369
370
371
372
373
374
375
376
377
378 public synchronized void setCreatingWorkspacesAllowed( boolean allowWorkspaceCreation ) {
379 capabilities = new RepositorySourceCapabilities(true, capabilities.supportsUpdates(), false, allowWorkspaceCreation,
380 capabilities.supportsReferences());
381 }
382
383 private CacheManager createCacheManager() {
384 CacheManager cacheManager;
385
386 String configName = getCacheConfigurationName();
387 if (configName == null) {
388 cacheManager = new DefaultCacheManager();
389 } else {
390
391
392
393 InputStream configStream = getClass().getResourceAsStream(configName);
394 try {
395 if (configStream == null) {
396 configStream = new FileInputStream(configName);
397 }
398 } catch (IOException ioe) {
399 I18n msg = InfinispanConnectorI18n.configFileNotFound;
400 throw new RepositorySourceException(this.name, msg.text(configName), ioe);
401 }
402
403 try {
404 cacheManager = new DefaultCacheManager(configStream);
405 } catch (IOException ioe) {
406 I18n msg = InfinispanConnectorI18n.configFileNotValid;
407 throw new RepositorySourceException(this.name, msg.text(configName), ioe);
408 } finally {
409 try {
410 configStream.close();
411 } catch (IOException ioe) {
412 }
413 }
414 }
415
416 return cacheManager;
417 }
418
419 final CacheManager cacheManager() {
420 return repository.getCacheManager();
421 }
422
423
424
425
426
427
428 public synchronized RepositoryConnection getConnection() throws RepositorySourceException {
429 if (getName() == null) {
430 I18n msg = InfinispanConnectorI18n.propertyIsRequired;
431 throw new RepositorySourceException(getName(), msg.text("name"));
432 }
433 if (this.repository == null) {
434 Context context = getContext();
435 if (context == null) {
436 try {
437 context = new InitialContext();
438 } catch (NamingException err) {
439 throw new RepositorySourceException(name, err);
440 }
441 }
442
443
444 CacheManager cacheManager = null;
445 String jndiName = getCacheManagerJndiName();
446 if (jndiName != null && jndiName.trim().length() != 0) {
447 Object object = null;
448 try {
449 object = context.lookup(jndiName);
450 if (object != null) cacheManager = (CacheManager)object;
451 } catch (ClassCastException err) {
452 I18n msg = InfinispanConnectorI18n.objectFoundInJndiWasNotCacheManager;
453 String className = object != null ? object.getClass().getName() : "null";
454 throw new RepositorySourceException(getName(), msg.text(jndiName, this.getName(), className), err);
455 } catch (Throwable err) {
456 if (err instanceof RuntimeException) throw (RuntimeException)err;
457 throw new RepositorySourceException(getName(), err);
458 }
459 }
460 if (cacheManager == null) {
461 cacheManager = createCacheManager();
462 }
463
464
465 this.repository = new InfinispanRepository(this, cacheManager);
466 }
467
468 return new Connection<InfinispanNode, InfinispanWorkspace>(this, repository);
469 }
470
471
472
473
474
475
476 public synchronized void close() {
477 if (this.repository != null) {
478 try {
479 this.repository.shutdown();
480 } finally {
481 this.repository = null;
482 }
483 }
484 }
485
486
487
488
489 public RepositoryContext getRepositoryContext() {
490 return repositoryContext;
491 }
492
493 protected Observer getObserver() {
494 return repositoryContext != null ? repositoryContext.getObserver() : null;
495 }
496
497 protected synchronized Context getContext() {
498 return this.jndiContext;
499 }
500
501 protected synchronized void setContext( Context context ) {
502 this.jndiContext = context;
503 }
504
505 public boolean areUpdatesAllowed() {
506 return this.updatesAllowed;
507 }
508
509 public void setUpdatesAllowed( boolean updatesAllowed ) {
510 this.updatesAllowed = updatesAllowed;
511 }
512
513
514
515
516 @Override
517 public boolean equals( Object obj ) {
518 if (obj == this) return true;
519 if (obj instanceof InfinispanSource) {
520 InfinispanSource that = (InfinispanSource)obj;
521 if (this.getName() == null) {
522 if (that.getName() != null) return false;
523 } else {
524 if (!this.getName().equals(that.getName())) return false;
525 }
526 return true;
527 }
528 return false;
529 }
530
531 @Override
532 public int hashCode() {
533 return HashCode.compute(getName());
534 }
535
536
537
538
539 public synchronized Reference getReference() {
540 String className = getClass().getName();
541 String managerClassName = this.getClass().getName();
542 Reference ref = new Reference(className, managerClassName, null);
543
544 ref.add(new StringRefAddr(SOURCE_NAME, getName()));
545 ref.add(new StringRefAddr(ROOT_NODE_UUID, getRootNodeUuid().toString()));
546 ref.add(new StringRefAddr(CACHE_FACTORY_JNDI_NAME, getCacheManagerJndiName()));
547 ref.add(new StringRefAddr(CACHE_CONFIGURATION_NAME, getCacheConfigurationName()));
548 ref.add(new StringRefAddr(RETRY_LIMIT, Integer.toString(getRetryLimit())));
549 ref.add(new StringRefAddr(DEFAULT_WORKSPACE, getDefaultWorkspaceName()));
550 ref.add(new StringRefAddr(UPDATES_ALLOWED, String.valueOf(areUpdatesAllowed())));
551 ref.add(new StringRefAddr(ALLOW_CREATING_WORKSPACES, Boolean.toString(isCreatingWorkspacesAllowed())));
552 String[] workspaceNames = getPredefinedWorkspaceNames();
553 if (workspaceNames != null && workspaceNames.length != 0) {
554 ref.add(new StringRefAddr(PREDEFINED_WORKSPACE_NAMES, StringUtil.combineLines(workspaceNames)));
555 }
556 if (getDefaultCachePolicy() != null) {
557 ByteArrayOutputStream baos = new ByteArrayOutputStream();
558 CachePolicy policy = getDefaultCachePolicy();
559 try {
560 ObjectOutputStream oos = new ObjectOutputStream(baos);
561 oos.writeObject(policy);
562 ref.add(new BinaryRefAddr(DEFAULT_CACHE_POLICY, baos.toByteArray()));
563 } catch (IOException e) {
564 I18n msg = InfinispanConnectorI18n.errorSerializingCachePolicyInSource;
565 throw new RepositorySourceException(getName(), msg.text(policy.getClass().getName(), getName()), e);
566 }
567 }
568 return ref;
569 }
570
571
572
573
574 public Object getObjectInstance( Object obj,
575 javax.naming.Name name,
576 Context nameCtx,
577 Hashtable<?, ?> environment ) throws Exception {
578 if (obj instanceof Reference) {
579 Map<String, Object> values = new HashMap<String, Object>();
580 Reference ref = (Reference)obj;
581 Enumeration<?> en = ref.getAll();
582 while (en.hasMoreElements()) {
583 RefAddr subref = (RefAddr)en.nextElement();
584 if (subref instanceof StringRefAddr) {
585 String key = subref.getType();
586 Object value = subref.getContent();
587 if (value != null) values.put(key, value.toString());
588 } else if (subref instanceof BinaryRefAddr) {
589 String key = subref.getType();
590 Object value = subref.getContent();
591 if (value instanceof byte[]) {
592
593 ByteArrayInputStream bais = new ByteArrayInputStream((byte[])value);
594 ObjectInputStream ois = new ObjectInputStream(bais);
595 value = ois.readObject();
596 values.put(key, value);
597 }
598 }
599 }
600 String sourceName = (String)values.get(SOURCE_NAME);
601 String rootNodeUuidString = (String)values.get(ROOT_NODE_UUID);
602 String cacheManagerJndiName = (String)values.get(CACHE_FACTORY_JNDI_NAME);
603 String cacheConfigurationName = (String)values.get(CACHE_CONFIGURATION_NAME);
604 Object defaultCachePolicy = values.get(DEFAULT_CACHE_POLICY);
605 String retryLimit = (String)values.get(RETRY_LIMIT);
606 String defaultWorkspace = (String)values.get(DEFAULT_WORKSPACE);
607 String createWorkspaces = (String)values.get(ALLOW_CREATING_WORKSPACES);
608 String updatesAllowed = (String)values.get(UPDATES_ALLOWED);
609
610 String combinedWorkspaceNames = (String)values.get(PREDEFINED_WORKSPACE_NAMES);
611 String[] workspaceNames = null;
612 if (combinedWorkspaceNames != null) {
613 List<String> paths = StringUtil.splitLines(combinedWorkspaceNames);
614 workspaceNames = paths.toArray(new String[paths.size()]);
615 }
616
617
618 InfinispanSource source = new InfinispanSource();
619 if (sourceName != null) source.setName(sourceName);
620 if (rootNodeUuidString != null) source.setRootNodeUuid(rootNodeUuidString);
621 if (cacheManagerJndiName != null) source.setCacheManagerJndiName(cacheManagerJndiName);
622 if (cacheConfigurationName != null) source.setCacheConfigurationName(cacheConfigurationName);
623 if (defaultCachePolicy instanceof CachePolicy) {
624 source.setDefaultCachePolicy((CachePolicy)defaultCachePolicy);
625 }
626 if (retryLimit != null) source.setRetryLimit(Integer.parseInt(retryLimit));
627 if (defaultWorkspace != null) source.setDefaultWorkspaceName(defaultWorkspace);
628 if (createWorkspaces != null) source.setCreatingWorkspacesAllowed(Boolean.parseBoolean(createWorkspaces));
629 if (workspaceNames != null && workspaceNames.length != 0) source.setPredefinedWorkspaceNames(workspaceNames);
630 if (updatesAllowed != null) source.setUpdatesAllowed(Boolean.valueOf(updatesAllowed));
631 return source;
632 }
633 return null;
634 }
635 }