1 /*
2 * ModeShape (http://www.modeshape.org)
3 * See the COPYRIGHT.txt file distributed with this work for information
4 * regarding copyright ownership. Some portions may be licensed
5 * to Red Hat, Inc. under one or more contributor license agreements.
6 * See the AUTHORS.txt file in the distribution for a full listing of
7 * individual contributors.
8 *
9 * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
10 * is licensed to you under the terms of the GNU Lesser General Public License as
11 * published by the Free Software Foundation; either version 2.1 of
12 * the License, or (at your option) any later version.
13 *
14 * ModeShape is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Lesser General Public License for more details.
18 *
19 * You should have received a copy of the GNU Lesser General Public
20 * License along with this software; if not, write to the Free
21 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23 */
24 package org.modeshape.graph.search;
25
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.concurrent.CancellationException;
29 import java.util.concurrent.ExecutionException;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.TimeUnit;
32 import javax.naming.NamingException;
33 import javax.naming.Reference;
34 import javax.transaction.xa.XAResource;
35 import net.jcip.annotations.NotThreadSafe;
36 import net.jcip.annotations.ThreadSafe;
37 import org.modeshape.common.i18n.I18n;
38 import org.modeshape.common.util.CheckArg;
39 import org.modeshape.common.util.Logger;
40 import org.modeshape.graph.ExecutionContext;
41 import org.modeshape.graph.GraphI18n;
42 import org.modeshape.graph.Subgraph;
43 import org.modeshape.graph.cache.CachePolicy;
44 import org.modeshape.graph.connector.RepositoryConnection;
45 import org.modeshape.graph.connector.RepositoryConnectionFactory;
46 import org.modeshape.graph.connector.RepositoryContext;
47 import org.modeshape.graph.connector.RepositorySource;
48 import org.modeshape.graph.connector.RepositorySourceCapabilities;
49 import org.modeshape.graph.connector.RepositorySourceException;
50 import org.modeshape.graph.observe.Changes;
51 import org.modeshape.graph.observe.Observer;
52 import org.modeshape.graph.request.AccessQueryRequest;
53 import org.modeshape.graph.request.CompositeRequest;
54 import org.modeshape.graph.request.CompositeRequestChannel;
55 import org.modeshape.graph.request.FullTextSearchRequest;
56 import org.modeshape.graph.request.Request;
57 import org.modeshape.graph.request.processor.RequestProcessor;
58
59 /**
60 * A {@link RepositorySource} implementation that can be used as a wrapper around another
61 * {@link RepositorySourceCapabilities#supportsSearches() non-searchable} or
62 * {@link RepositorySourceCapabilities#supportsQueries() non-querable} RepositorySource instance to provide search and query
63 * capability.
64 */
65 @ThreadSafe
66 public class SearchableRepositorySource implements RepositorySource {
67
68 private static final long serialVersionUID = 1L;
69
70 private final RepositorySource delegate;
71 private final boolean executeAsynchronously;
72 private final boolean updateIndexesAsynchronously;
73 private final transient ExecutorService executorService;
74 private final transient SearchEngine searchEngine;
75
76 /**
77 * Create a new searchable and queryable RepositorySource around an instance that is neither.
78 *
79 * @param wrapped the RepositorySource that is not searchable and queryable
80 * @param searchEngine the search engine that is to be used
81 * @param executorService the ExecutorService that should be used when submitting requests to the wrapped service; may be null
82 * if all operations should be performed in the calling thread
83 * @param executeAsynchronously true if an {@link ExecutorService} is provided and the requests to the wrapped source are to
84 * be executed asynchronously
85 * @param updateIndexesAsynchronously true if an {@link ExecutorService} is provided and the indexes are to be updated in a
86 * different thread than the thread executing the {@link RepositoryConnection#execute(ExecutionContext, Request)}
87 * calls.
88 */
89 public SearchableRepositorySource( RepositorySource wrapped,
90 SearchEngine searchEngine,
91 ExecutorService executorService,
92 boolean executeAsynchronously,
93 boolean updateIndexesAsynchronously ) {
94 CheckArg.isNotNull(wrapped, "wrapped");
95 CheckArg.isNotNull(searchEngine, "searchEngine");
96 this.delegate = wrapped;
97 this.executorService = executorService;
98 this.searchEngine = searchEngine;
99 this.updateIndexesAsynchronously = this.executorService != null && updateIndexesAsynchronously;
100 this.executeAsynchronously = this.executorService != null && executeAsynchronously;
101 }
102
103 /**
104 * Create a new searchable and queryable RepositorySource around an instance that is neither. All of the request processing
105 * will be done in the calling thread, and updating the indexes will be done synchronously within the context of the
106 * {@link RepositoryConnection#execute(ExecutionContext, Request)} method (and obviously on the same thread). This means that
107 * the execution of the requests will not return until the indexes have been updated with any changes made by the requests.
108 * <p>
109 * This is equivalent to calling <code>new SearchableRepositorySource(wrapped,searchEngine,null,false)</code>
110 * </p>
111 *
112 * @param wrapped the RepositorySource that is not searchable and queryable
113 * @param searchEngine the search engine that is to be used
114 */
115 public SearchableRepositorySource( RepositorySource wrapped,
116 SearchEngine searchEngine ) {
117 this(wrapped, searchEngine, null, false, false);
118 }
119
120 /**
121 * {@inheritDoc}
122 *
123 * @see org.modeshape.graph.connector.RepositorySource#getName()
124 */
125 public String getName() {
126 return delegate.getName();
127 }
128
129 /**
130 * {@inheritDoc}
131 *
132 * @see org.modeshape.graph.connector.RepositorySource#close()
133 */
134 public void close() {
135 this.delegate.close();
136 }
137
138 /**
139 * {@inheritDoc}
140 *
141 * @see org.modeshape.graph.connector.RepositorySource#getCapabilities()
142 */
143 public RepositorySourceCapabilities getCapabilities() {
144 // Return the capabilities of the source, except with search and query suppport enabled ...
145 return new RepositorySourceCapabilities(this.delegate.getCapabilities()) {
146 /**
147 * {@inheritDoc}
148 *
149 * @see org.modeshape.graph.connector.RepositorySourceCapabilities#supportsQueries()
150 */
151 @Override
152 public boolean supportsQueries() {
153 return true;
154 }
155
156 /**
157 * {@inheritDoc}
158 *
159 * @see org.modeshape.graph.connector.RepositorySourceCapabilities#supportsSearches()
160 */
161 @Override
162 public boolean supportsSearches() {
163 return true;
164 }
165 };
166 }
167
168 /**
169 * {@inheritDoc}
170 *
171 * @see org.modeshape.graph.connector.RepositorySource#getConnection()
172 */
173 public RepositoryConnection getConnection() throws RepositorySourceException {
174 if (executeRequestsAsynchronously()) {
175 // Use the executor service ...
176 assert executorService != null;
177 return new ParallelConnection(executorService);
178 }
179 // We need to do the processing in this thread ...
180 return new SynchronousConnection();
181 }
182
183 /**
184 * {@inheritDoc}
185 *
186 * @see org.modeshape.graph.connector.RepositorySource#getRetryLimit()
187 */
188 public int getRetryLimit() {
189 return delegate.getRetryLimit();
190 }
191
192 /**
193 * {@inheritDoc}
194 *
195 * @see org.modeshape.graph.connector.RepositorySource#initialize(org.modeshape.graph.connector.RepositoryContext)
196 */
197 public void initialize( final RepositoryContext context ) throws RepositorySourceException {
198 final String delegateSourceName = delegate.getName();
199
200 // The search engine will need a connection factory to the source, but the 'context' connection factory
201 // will point back to this wrapper. So make one ...
202 final RepositoryConnectionFactory originalConnectionFactory = context.getRepositoryConnectionFactory();
203 final RepositoryConnectionFactory connectionFactory = new RepositoryConnectionFactory() {
204 /**
205 * {@inheritDoc}
206 *
207 * @see org.modeshape.graph.connector.RepositoryConnectionFactory#createConnection(java.lang.String)
208 */
209 public RepositoryConnection createConnection( String sourceName ) throws RepositorySourceException {
210 if (delegateSourceName.equals(sourceName)) return delegate().getConnection();
211 return originalConnectionFactory.createConnection(sourceName);
212 }
213 };
214
215 // Create an observer so that we know what changes are being made in the delegate ...
216 final Observer observer = new Observer() {
217 /**
218 * {@inheritDoc}
219 *
220 * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes)
221 */
222 public void notify( final Changes changes ) {
223 if (changes != null) {
224 if (updateIndexesAsynchronously()) {
225 // Enqueue the changes in the delegate content ...
226 executorService().submit(new Runnable() {
227 public void run() {
228 process(context.getExecutionContext(), changes);
229 }
230 });
231 } else {
232 process(context.getExecutionContext(), changes);
233 }
234 }
235 }
236 };
237
238 // Create a new RepositoryContext that uses our observer and connection factory ...
239 final RepositoryContext newContext = new RepositoryContext() {
240 /**
241 * {@inheritDoc}
242 *
243 * @see org.modeshape.graph.connector.RepositoryContext#getConfiguration(int)
244 */
245 public Subgraph getConfiguration( int depth ) {
246 return context.getConfiguration(depth);
247 }
248
249 /**
250 * {@inheritDoc}
251 *
252 * @see org.modeshape.graph.connector.RepositoryContext#getExecutionContext()
253 */
254 public ExecutionContext getExecutionContext() {
255 return context.getExecutionContext();
256 }
257
258 /**
259 * {@inheritDoc}
260 *
261 * @see org.modeshape.graph.connector.RepositoryContext#getObserver()
262 */
263 public Observer getObserver() {
264 return observer;
265 }
266
267 /**
268 * {@inheritDoc}
269 *
270 * @see org.modeshape.graph.connector.RepositoryContext#getRepositoryConnectionFactory()
271 */
272 public RepositoryConnectionFactory getRepositoryConnectionFactory() {
273 return connectionFactory;
274 }
275 };
276
277 // Now initialize the delegate with the delegate's context ...
278 delegate.initialize(newContext);
279 }
280
281 protected final SearchEngine searchEngine() {
282 assert searchEngine != null;
283 return searchEngine;
284 }
285
286 protected final boolean updateIndexesAsynchronously() {
287 return executorService != null && updateIndexesAsynchronously;
288 }
289
290 protected final boolean executeRequestsAsynchronously() {
291 return executorService != null && executeAsynchronously;
292 }
293
294 protected final ExecutorService executorService() {
295 assert executorService != null;
296 return executorService;
297 }
298
299 protected final RepositorySource delegate() {
300 return this.delegate;
301 }
302
303 /**
304 * Do the work of processing the changes and updating the {@link #searchEngine}. This method may be called while on one of the
305 * threads owned by the {@link #executorService executor service} (if {@link #updateIndexesAsynchronously()} returns true), or
306 * from the thread {@link RepositoryConnection#execute(ExecutionContext, org.modeshape.graph.request.Request) executing} the
307 * requests on the {@link #delegate} (if {@link #updateIndexesAsynchronously()} returns false).
308 *
309 * @param context the execution context in which the indexes should be updated
310 * @param changes the changes; never null
311 */
312 protected void process( ExecutionContext context,
313 Changes changes ) {
314 assert context != null;
315 assert changes != null;
316 if (searchEngine == null) return; // null only after deserialization ...
317 // Obtain a request processor
318 searchEngine.index(context, changes.getChangeRequests());
319 }
320
321 /**
322 * {@inheritDoc}
323 *
324 * @see org.modeshape.graph.connector.RepositorySource#setRetryLimit(int)
325 */
326 public void setRetryLimit( int limit ) {
327 delegate.setRetryLimit(limit);
328 }
329
330 /**
331 * {@inheritDoc}
332 *
333 * @see javax.naming.Referenceable#getReference()
334 */
335 public Reference getReference() throws NamingException {
336 return delegate.getReference();
337 }
338
339 @NotThreadSafe
340 protected abstract class AbstractConnection implements RepositoryConnection {
341 private RepositoryConnection delegateConnection;
342
343 protected AbstractConnection() {
344 }
345
346 protected RepositoryConnection delegateConnection() {
347 if (delegateConnection == null) {
348 delegateConnection = delegate().getConnection();
349 }
350 return delegateConnection;
351 }
352
353 /**
354 * {@inheritDoc}
355 *
356 * @see org.modeshape.graph.connector.RepositoryConnection#ping(long, java.util.concurrent.TimeUnit)
357 */
358 public boolean ping( long time,
359 TimeUnit unit ) throws InterruptedException {
360 return delegateConnection().ping(time, unit);
361 }
362
363 /**
364 * {@inheritDoc}
365 *
366 * @see org.modeshape.graph.connector.RepositoryConnection#getDefaultCachePolicy()
367 */
368 public CachePolicy getDefaultCachePolicy() {
369 return delegateConnection().getDefaultCachePolicy();
370 }
371
372 /**
373 * {@inheritDoc}
374 *
375 * @see org.modeshape.graph.connector.RepositoryConnection#getSourceName()
376 */
377 public String getSourceName() {
378 return delegate().getName();
379 }
380
381 /**
382 * {@inheritDoc}
383 *
384 * @see org.modeshape.graph.connector.RepositoryConnection#getXAResource()
385 */
386 public XAResource getXAResource() {
387 return delegateConnection().getXAResource();
388 }
389
390 /**
391 * {@inheritDoc}
392 *
393 * @see org.modeshape.graph.connector.RepositoryConnection#close()
394 */
395 public void close() {
396 if (delegateConnection != null) {
397 try {
398 delegateConnection.close();
399 } finally {
400 delegateConnection = null;
401 }
402 }
403 }
404 }
405
406 /**
407 * A {@link RepositoryConnection} implementation that calls the delegate processor in a background thread, allowing the
408 * processing of the {@link FullTextSearchRequest} and {@link AccessQueryRequest} objects to be done in this thread and in
409 * parallel with other requests.
410 */
411 @NotThreadSafe
412 protected class ParallelConnection extends AbstractConnection {
413 private final ExecutorService executorService;
414
415 protected ParallelConnection( ExecutorService executorService ) {
416 this.executorService = executorService;
417 }
418
419 /**
420 * {@inheritDoc}
421 *
422 * @see org.modeshape.graph.connector.RepositoryConnection#execute(org.modeshape.graph.ExecutionContext,
423 * org.modeshape.graph.request.Request)
424 */
425 public void execute( ExecutionContext context,
426 Request request ) throws RepositorySourceException {
427 RequestProcessor searchProcessor = null;
428
429 switch (request.getType()) {
430 case ACCESS_QUERY:
431 AccessQueryRequest queryRequest = (AccessQueryRequest)request;
432 searchProcessor = searchEngine().createProcessor(context, null, true);
433 try {
434 searchProcessor.process(queryRequest);
435 } finally {
436 searchProcessor.close();
437 }
438 break;
439 case FULL_TEXT_SEARCH:
440 FullTextSearchRequest searchRequest = (FullTextSearchRequest)request;
441 searchProcessor = searchEngine().createProcessor(context, null, true);
442 try {
443 searchProcessor.process(searchRequest);
444 } finally {
445 searchProcessor.close();
446 }
447 break;
448 case COMPOSITE:
449 CompositeRequest composite = (CompositeRequest)request;
450 CompositeRequestChannel channel = null;
451 try {
452 for (Request nested : composite) {
453 switch (nested.getType()) {
454 case ACCESS_QUERY:
455 queryRequest = (AccessQueryRequest)request;
456 if (searchProcessor == null) {
457 searchProcessor = searchEngine().createProcessor(context, null, true);
458 }
459 searchProcessor.process(queryRequest);
460 break;
461 case FULL_TEXT_SEARCH:
462 searchRequest = (FullTextSearchRequest)request;
463 if (searchProcessor == null) {
464 searchProcessor = searchEngine().createProcessor(context, null, true);
465 }
466 searchProcessor.process(searchRequest);
467 break;
468 default:
469 // Delegate to the channel ...
470 if (channel == null) {
471 // Create a connection factory that always returns the delegate connection ...
472 RepositoryConnectionFactory connectionFactory = new RepositoryConnectionFactory() {
473 /**
474 * {@inheritDoc}
475 *
476 * @see org.modeshape.graph.connector.RepositoryConnectionFactory#createConnection(java.lang.String)
477 */
478 public RepositoryConnection createConnection( String sourceName )
479 throws RepositorySourceException {
480 assert delegate().getName().equals(sourceName);
481 return delegateConnection();
482 }
483 };
484 channel = new CompositeRequestChannel(delegate().getName());
485 channel.start(executorService, context, connectionFactory);
486 }
487 channel.add(request);
488
489 }
490 }
491 } finally {
492 try {
493 if (searchProcessor != null) {
494 searchProcessor.close();
495 }
496 } finally {
497 if (channel != null) {
498 try {
499 channel.close();
500 } finally {
501 try {
502 channel.await();
503 } catch (CancellationException err) {
504 composite.cancel();
505 } catch (ExecutionException err) {
506 composite.setError(err);
507 } catch (InterruptedException err) {
508 // Reset the thread ...
509 Thread.interrupted();
510 // Then log the message ...
511 I18n msg = GraphI18n.interruptedWhileClosingChannel;
512 Logger.getLogger(getClass()).warn(err, msg, delegate().getName());
513 composite.setError(err);
514 }
515 }
516 }
517 }
518 }
519 break;
520 default:
521 delegateConnection().execute(context, request);
522
523 }
524 }
525 }
526
527 /**
528 * A {@link RepositoryConnection} implementation that calls the delegate processor in the calling thread.
529 */
530 @NotThreadSafe
531 protected class SynchronousConnection extends AbstractConnection {
532
533 protected SynchronousConnection() {
534 }
535
536 /**
537 * {@inheritDoc}
538 *
539 * @see org.modeshape.graph.connector.RepositoryConnection#execute(org.modeshape.graph.ExecutionContext,
540 * org.modeshape.graph.request.Request)
541 */
542 public void execute( final ExecutionContext context,
543 final Request request ) throws RepositorySourceException {
544 RequestProcessor searchProcessor = null;
545
546 switch (request.getType()) {
547 case ACCESS_QUERY:
548 AccessQueryRequest queryRequest = (AccessQueryRequest)request;
549 searchProcessor = searchEngine().createProcessor(context, null, true);
550 try {
551 searchProcessor.process(queryRequest);
552 } finally {
553 searchProcessor.close();
554 }
555 break;
556 case FULL_TEXT_SEARCH:
557 FullTextSearchRequest searchRequest = (FullTextSearchRequest)request;
558 searchProcessor = searchEngine().createProcessor(context, null, true);
559 try {
560 searchProcessor.process(searchRequest);
561 } finally {
562 searchProcessor.close();
563 }
564 break;
565 case COMPOSITE:
566 CompositeRequest composite = (CompositeRequest)request;
567 List<Request> delegateRequests = null;
568 try {
569 Request delegateRequest = composite;
570 for (Request nested : composite) {
571 switch (nested.getType()) {
572 case ACCESS_QUERY:
573 queryRequest = (AccessQueryRequest)request;
574 if (searchProcessor == null) {
575 searchProcessor = searchEngine().createProcessor(context, null, true);
576 }
577 searchProcessor.process(queryRequest);
578 delegateRequest = null;
579 break;
580 case FULL_TEXT_SEARCH:
581 searchRequest = (FullTextSearchRequest)request;
582 if (searchProcessor == null) {
583 searchProcessor = searchEngine().createProcessor(context, null, true);
584 }
585 searchProcessor.process(searchRequest);
586 delegateRequest = null;
587 break;
588 default:
589 // Delegate the request ...
590 if (delegateRequests == null) {
591 delegateRequests = new LinkedList<Request>();
592 }
593 delegateRequests.add(request);
594
595 }
596 }
597 if (delegateRequest == null) {
598 // Then there was at least one query or search request ...
599 if (delegateRequests != null) {
600 // There was other requests ...
601 assert !delegateRequests.isEmpty();
602 delegateRequest = CompositeRequest.with(delegateRequests);
603 delegateConnection().execute(context, delegateRequest);
604 } else {
605 // There were no other requests in the composite other than the search and/or query requests ...
606 // So nothing to do ...
607 }
608 } else {
609 // There were no search or query requests, so delegate the orginal composite request ...
610 delegateConnection().execute(context, request);
611 }
612 } finally {
613 if (searchProcessor != null) {
614 searchProcessor.close();
615 }
616 }
617 break;
618 default:
619 // Just a single, non-query and non-search request ...
620 delegateConnection().execute(context, request);
621
622 }
623 }
624 }
625 }