1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 package org.modeshape.graph.search;
25
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.concurrent.CancellationException;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.TimeUnit;
32 import javax.naming.NamingException;
33 import javax.naming.Reference;
34 import javax.transaction.xa.XAResource;
35 import net.jcip.annotations.NotThreadSafe;
36 import org.modeshape.common.i18n.I18n;
37 import org.modeshape.common.util.CheckArg;
38 import org.modeshape.common.util.Logger;
39 import org.modeshape.graph.ExecutionContext;
40 import org.modeshape.graph.GraphI18n;
41 import org.modeshape.graph.Subgraph;
42 import org.modeshape.graph.cache.CachePolicy;
43 import org.modeshape.graph.connector.RepositoryConnection;
44 import org.modeshape.graph.connector.RepositoryConnectionFactory;
45 import org.modeshape.graph.connector.RepositoryContext;
46 import org.modeshape.graph.connector.RepositorySource;
47 import org.modeshape.graph.connector.RepositorySourceCapabilities;
48 import org.modeshape.graph.connector.RepositorySourceException;
49 import org.modeshape.graph.observe.Changes;
50 import org.modeshape.graph.observe.Observer;
51 import org.modeshape.graph.request.AccessQueryRequest;
52 import org.modeshape.graph.request.CompositeRequest;
53 import org.modeshape.graph.request.CompositeRequestChannel;
54 import org.modeshape.graph.request.FullTextSearchRequest;
55 import org.modeshape.graph.request.Request;
56 import org.modeshape.graph.request.processor.RequestProcessor;
57
58
59
60
61
62
63
64 public class SearchableRepositorySource implements RepositorySource {
65
66 private static final long serialVersionUID = 1L;
67
68 private final RepositorySource delegate;
69 private final boolean executeAsynchronously;
70 private final boolean updateIndexesAsynchronously;
71 private final transient ExecutorService executorService;
72 private final transient SearchEngine searchEngine;
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87 public SearchableRepositorySource( RepositorySource wrapped,
88 SearchEngine searchEngine,
89 ExecutorService executorService,
90 boolean executeAsynchronously,
91 boolean updateIndexesAsynchronously ) {
92 CheckArg.isNotNull(wrapped, "wrapped");
93 CheckArg.isNotNull(searchEngine, "searchEngine");
94 this.delegate = wrapped;
95 this.executorService = executorService;
96 this.searchEngine = searchEngine;
97 this.updateIndexesAsynchronously = this.executorService != null && updateIndexesAsynchronously;
98 this.executeAsynchronously = this.executorService != null && executeAsynchronously;
99 }
100
101
102
103
104
105
106
107
108
109
110
111
112
/**
 * Create a searchable source that wraps the supplied source and does everything synchronously: no executor service
 * is used, and both asynchronous options are switched off.
 *
 * @param wrapped the source that is wrapped by this source
 * @param searchEngine the search engine used by this source
 */
public SearchableRepositorySource( RepositorySource wrapped,
                                   SearchEngine searchEngine ) {
    this(wrapped, searchEngine, (ExecutorService)null, false, false);
}
117
118
119
120
121
122
123 public String getName() {
124 return delegate.getName();
125 }
126
127
128
129
130
131
132 public void close() {
133 this.delegate.close();
134 }
135
136
137
138
139
140
141 public RepositorySourceCapabilities getCapabilities() {
142
143 return new RepositorySourceCapabilities(this.delegate.getCapabilities()) {
144
145
146
147
148
149 @Override
150 public boolean supportsQueries() {
151 return true;
152 }
153
154
155
156
157
158
159 @Override
160 public boolean supportsSearches() {
161 return true;
162 }
163 };
164 }
165
166
167
168
169
170
171 public RepositoryConnection getConnection() throws RepositorySourceException {
172 if (executeRequestsAsynchronously()) {
173
174 assert executorService != null;
175 return new ParallelConnection(executorService);
176 }
177
178 return new SynchronousConnection();
179 }
180
181
182
183
184
185
186 public int getRetryLimit() {
187 return delegate.getRetryLimit();
188 }
189
190
191
192
193
194
195 public void initialize( final RepositoryContext context ) throws RepositorySourceException {
196 final String delegateSourceName = delegate.getName();
197
198
199
200 final RepositoryConnectionFactory originalConnectionFactory = context.getRepositoryConnectionFactory();
201 final RepositoryConnectionFactory connectionFactory = new RepositoryConnectionFactory() {
202
203
204
205
206
207 public RepositoryConnection createConnection( String sourceName ) throws RepositorySourceException {
208 if (delegateSourceName.equals(sourceName)) return delegate().getConnection();
209 return originalConnectionFactory.createConnection(sourceName);
210 }
211 };
212
213
214 final Observer observer = new Observer() {
215
216
217
218
219
220 public void notify( final Changes changes ) {
221 if (changes != null) {
222 if (updateIndexesAsynchronously()) {
223
224 executorService().submit(new Runnable() {
225 public void run() {
226 process(context.getExecutionContext(), changes);
227 }
228 });
229 } else {
230 process(context.getExecutionContext(), changes);
231 }
232 }
233 }
234 };
235
236
237 final RepositoryContext newContext = new RepositoryContext() {
238
239
240
241
242
243 public Subgraph getConfiguration( int depth ) {
244 return context.getConfiguration(depth);
245 }
246
247
248
249
250
251
252 public ExecutionContext getExecutionContext() {
253 return context.getExecutionContext();
254 }
255
256
257
258
259
260
261 public Observer getObserver() {
262 return observer;
263 }
264
265
266
267
268
269
270 public RepositoryConnectionFactory getRepositoryConnectionFactory() {
271 return connectionFactory;
272 }
273 };
274
275
276 delegate.initialize(newContext);
277 }
278
279 protected final SearchEngine searchEngine() {
280 assert searchEngine != null;
281 return searchEngine;
282 }
283
284 protected final boolean updateIndexesAsynchronously() {
285 return executorService != null && updateIndexesAsynchronously;
286 }
287
288 protected final boolean executeRequestsAsynchronously() {
289 return executorService != null && executeAsynchronously;
290 }
291
292 protected final ExecutorService executorService() {
293 assert executorService != null;
294 return executorService;
295 }
296
297 protected final RepositorySource delegate() {
298 return this.delegate;
299 }
300
301
302
303
304
305
306
307
308
309
310 protected void process( ExecutionContext context,
311 Changes changes ) {
312 assert context != null;
313 assert changes != null;
314 if (searchEngine == null) return;
315
316 searchEngine.index(context, changes.getChangeRequests());
317 }
318
319
320
321
322
323
324 public void setRetryLimit( int limit ) {
325 delegate.setRetryLimit(limit);
326 }
327
328
329
330
331
332
333 public Reference getReference() throws NamingException {
334 return delegate.getReference();
335 }
336
337 @NotThreadSafe
338 protected abstract class AbstractConnection implements RepositoryConnection {
339 private RepositoryConnection delegateConnection;
340
341 protected AbstractConnection() {
342 }
343
344 protected RepositoryConnection delegateConnection() {
345 if (delegateConnection == null) {
346 delegateConnection = delegate().getConnection();
347 }
348 return delegateConnection;
349 }
350
351
352
353
354
355
356 public boolean ping( long time,
357 TimeUnit unit ) throws InterruptedException {
358 return delegateConnection().ping(time, unit);
359 }
360
361
362
363
364
365
366 public CachePolicy getDefaultCachePolicy() {
367 return delegateConnection().getDefaultCachePolicy();
368 }
369
370
371
372
373
374
375 public String getSourceName() {
376 return delegate().getName();
377 }
378
379
380
381
382
383
384 public XAResource getXAResource() {
385 return delegateConnection().getXAResource();
386 }
387
388
389
390
391
392
393 public void close() {
394 if (delegateConnection != null) {
395 try {
396 delegateConnection.close();
397 } finally {
398 delegateConnection = null;
399 }
400 }
401 }
402 }
403
404
405
406
407
408
409 @NotThreadSafe
410 protected class ParallelConnection extends AbstractConnection {
411 private final ExecutorService executorService;
412
413 protected ParallelConnection( ExecutorService executorService ) {
414 this.executorService = executorService;
415 }
416
417
418
419
420
421
422
423 public void execute( ExecutionContext context,
424 Request request ) throws RepositorySourceException {
425 if (request instanceof AccessQueryRequest) {
426 AccessQueryRequest queryRequest = (AccessQueryRequest)request;
427 RequestProcessor searchProcessor = searchEngine().createProcessor(context, null, true);
428 try {
429 searchProcessor.process(queryRequest);
430 } finally {
431 searchProcessor.close();
432 }
433 } else if (request instanceof FullTextSearchRequest) {
434 FullTextSearchRequest searchRequest = (FullTextSearchRequest)request;
435 RequestProcessor searchProcessor = searchEngine().createProcessor(context, null, true);
436 try {
437 searchProcessor.process(searchRequest);
438 } finally {
439 searchProcessor.close();
440 }
441 } else if (request instanceof CompositeRequest) {
442 CompositeRequest composite = (CompositeRequest)request;
443 CompositeRequestChannel channel = null;
444 RequestProcessor searchProcessor = null;
445 try {
446 for (Request nested : composite) {
447 if (nested instanceof AccessQueryRequest) {
448 AccessQueryRequest queryRequest = (AccessQueryRequest)request;
449 if (searchProcessor == null) searchProcessor = searchEngine().createProcessor(context, null, true);
450 searchProcessor.process(queryRequest);
451 } else if (nested instanceof FullTextSearchRequest) {
452 FullTextSearchRequest searchRequest = (FullTextSearchRequest)request;
453 if (searchProcessor == null) searchProcessor = searchEngine().createProcessor(context, null, true);
454 searchProcessor.process(searchRequest);
455 } else {
456
457 if (channel == null) {
458
459 RepositoryConnectionFactory connectionFactory = new RepositoryConnectionFactory() {
460
461
462
463
464
465 public RepositoryConnection createConnection( String sourceName )
466 throws RepositorySourceException {
467 assert delegate().getName().equals(sourceName);
468 return delegateConnection();
469 }
470 };
471 channel = new CompositeRequestChannel(delegate().getName());
472 channel.start(executorService, context, connectionFactory);
473 }
474 channel.add(request);
475 }
476 }
477 } finally {
478 try {
479 if (searchProcessor != null) {
480 searchProcessor.close();
481 }
482 } finally {
483 if (channel != null) {
484 try {
485 channel.close();
486 } finally {
487 try {
488 channel.await();
489 } catch (CancellationException err) {
490 composite.cancel();
491 } catch (ExecutionException err) {
492 composite.setError(err);
493 } catch (InterruptedException err) {
494
495 Thread.interrupted();
496
497 I18n msg = GraphI18n.interruptedWhileClosingChannel;
498 Logger.getLogger(getClass()).warn(err, msg, delegate().getName());
499 composite.setError(err);
500 }
501 }
502 }
503 }
504 }
505 } else {
506
507 delegateConnection().execute(context, request);
508 }
509 }
510 }
511
512
513
514
515 @NotThreadSafe
516 protected class SynchronousConnection extends AbstractConnection {
517
518 protected SynchronousConnection() {
519 }
520
521
522
523
524
525
526
527 public void execute( final ExecutionContext context,
528 final Request request ) throws RepositorySourceException {
529 if (request instanceof AccessQueryRequest) {
530 AccessQueryRequest queryRequest = (AccessQueryRequest)request;
531 RequestProcessor searchProcessor = searchEngine().createProcessor(context, null, true);
532 try {
533 searchProcessor.process(queryRequest);
534 } finally {
535 searchProcessor.close();
536 }
537 } else if (request instanceof FullTextSearchRequest) {
538 FullTextSearchRequest searchRequest = (FullTextSearchRequest)request;
539 RequestProcessor searchProcessor = searchEngine().createProcessor(context, null, true);
540 try {
541 searchProcessor.process(searchRequest);
542 } finally {
543 searchProcessor.close();
544 }
545 } else if (request instanceof CompositeRequest) {
546 CompositeRequest composite = (CompositeRequest)request;
547 List<Request> delegateRequests = null;
548 RequestProcessor searchProcessor = null;
549 try {
550 Request delegateRequest = composite;
551 for (Request nested : composite) {
552 if (nested instanceof AccessQueryRequest) {
553 AccessQueryRequest queryRequest = (AccessQueryRequest)request;
554 if (searchProcessor == null) searchProcessor = searchEngine().createProcessor(context, null, true);
555 searchProcessor.process(queryRequest);
556 delegateRequest = null;
557 } else if (nested instanceof FullTextSearchRequest) {
558 FullTextSearchRequest searchRequest = (FullTextSearchRequest)request;
559 if (searchProcessor == null) searchProcessor = searchEngine().createProcessor(context, null, true);
560 searchProcessor.process(searchRequest);
561 delegateRequest = null;
562 } else {
563
564 if (delegateRequests == null) {
565 delegateRequests = new LinkedList<Request>();
566 }
567 delegateRequests.add(request);
568 }
569 }
570 if (delegateRequest == null) {
571
572 if (delegateRequests != null) {
573
574 assert !delegateRequests.isEmpty();
575 delegateRequest = CompositeRequest.with(delegateRequests);
576 delegateConnection().execute(context, delegateRequest);
577 } else {
578
579
580 }
581 } else {
582
583 delegateConnection().execute(context, request);
584 }
585 } finally {
586 if (searchProcessor != null) {
587 searchProcessor.close();
588 }
589 }
590 } else {
591
592 delegateConnection().execute(context, request);
593 }
594 }
595 }
596 }