View Javadoc

1   /*
2    * ModeShape (http://www.modeshape.org)
3    * See the COPYRIGHT.txt file distributed with this work for information
4    * regarding copyright ownership.  Some portions may be licensed
5    * to Red Hat, Inc. under one or more contributor license agreements.
6    * See the AUTHORS.txt file in the distribution for a full listing of 
7    * individual contributors.
8    *
9    * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
10   * is licensed to you under the terms of the GNU Lesser General Public License as
11   * published by the Free Software Foundation; either version 2.1 of
12   * the License, or (at your option) any later version.
13   * 
14   * ModeShape is distributed in the hope that it will be useful,
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   * Lesser General Public License for more details.
18   *
19   * You should have received a copy of the GNU Lesser General Public
20   * License along with this software; if not, write to the Free
21   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23   */
24  package org.modeshape.graph.search;
25  
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.concurrent.CancellationException;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.TimeUnit;
32  import javax.naming.NamingException;
33  import javax.naming.Reference;
34  import javax.transaction.xa.XAResource;
35  import net.jcip.annotations.NotThreadSafe;
36  import net.jcip.annotations.ThreadSafe;
37  import org.modeshape.common.i18n.I18n;
38  import org.modeshape.common.util.CheckArg;
39  import org.modeshape.common.util.Logger;
40  import org.modeshape.graph.ExecutionContext;
41  import org.modeshape.graph.GraphI18n;
42  import org.modeshape.graph.Subgraph;
43  import org.modeshape.graph.cache.CachePolicy;
44  import org.modeshape.graph.connector.RepositoryConnection;
45  import org.modeshape.graph.connector.RepositoryConnectionFactory;
46  import org.modeshape.graph.connector.RepositoryContext;
47  import org.modeshape.graph.connector.RepositorySource;
48  import org.modeshape.graph.connector.RepositorySourceCapabilities;
49  import org.modeshape.graph.connector.RepositorySourceException;
50  import org.modeshape.graph.observe.Changes;
51  import org.modeshape.graph.observe.Observer;
52  import org.modeshape.graph.request.AccessQueryRequest;
53  import org.modeshape.graph.request.CompositeRequest;
54  import org.modeshape.graph.request.CompositeRequestChannel;
55  import org.modeshape.graph.request.FullTextSearchRequest;
56  import org.modeshape.graph.request.Request;
57  import org.modeshape.graph.request.processor.RequestProcessor;
58  
/**
 * A {@link RepositorySource} implementation that can be used as a wrapper around another
 * {@link RepositorySourceCapabilities#supportsSearches() non-searchable} or
 * {@link RepositorySourceCapabilities#supportsQueries() non-queryable} RepositorySource instance to provide search and query
 * capability.
 */
65  @ThreadSafe
66  public class SearchableRepositorySource implements RepositorySource {
67  
68      private static final long serialVersionUID = 1L;
69  
70      private final RepositorySource delegate;
71      private final boolean executeAsynchronously;
72      private final boolean updateIndexesAsynchronously;
73      private final transient ExecutorService executorService;
74      private final transient SearchEngine searchEngine;
75  
76      /**
77       * Create a new searchable and queryable RepositorySource around an instance that is neither.
78       * 
79       * @param wrapped the RepositorySource that is not searchable and queryable
80       * @param searchEngine the search engine that is to be used
81       * @param executorService the ExecutorService that should be used when submitting requests to the wrapped service; may be null
82       *        if all operations should be performed in the calling thread
83       * @param executeAsynchronously true if an {@link ExecutorService} is provided and the requests to the wrapped source are to
84       *        be executed asynchronously
85       * @param updateIndexesAsynchronously true if an {@link ExecutorService} is provided and the indexes are to be updated in a
86       *        different thread than the thread executing the {@link RepositoryConnection#execute(ExecutionContext, Request)}
87       *        calls.
88       */
89      public SearchableRepositorySource( RepositorySource wrapped,
90                                         SearchEngine searchEngine,
91                                         ExecutorService executorService,
92                                         boolean executeAsynchronously,
93                                         boolean updateIndexesAsynchronously ) {
94          CheckArg.isNotNull(wrapped, "wrapped");
95          CheckArg.isNotNull(searchEngine, "searchEngine");
96          this.delegate = wrapped;
97          this.executorService = executorService;
98          this.searchEngine = searchEngine;
99          this.updateIndexesAsynchronously = this.executorService != null && updateIndexesAsynchronously;
100         this.executeAsynchronously = this.executorService != null && executeAsynchronously;
101     }
102 
    /**
     * Create a new searchable and queryable RepositorySource around an instance that is neither. All of the request processing
     * will be done in the calling thread, and updating the indexes will be done synchronously within the context of the
     * {@link RepositoryConnection#execute(ExecutionContext, Request)} method (and obviously on the same thread). This means that
     * the execution of the requests will not return until the indexes have been updated with any changes made by the requests.
     * <p>
     * This is equivalent to calling <code>new SearchableRepositorySource(wrapped,searchEngine,null,false,false)</code>
     * </p>
     * 
     * @param wrapped the RepositorySource that is not searchable and queryable; may not be null
     * @param searchEngine the search engine that is to be used; may not be null
     */
    public SearchableRepositorySource( RepositorySource wrapped,
                                       SearchEngine searchEngine ) {
        // No executor service, so neither request execution nor index updates can be asynchronous ...
        this(wrapped, searchEngine, null, false, false);
    }
119 
120     /**
121      * {@inheritDoc}
122      * 
123      * @see org.modeshape.graph.connector.RepositorySource#getName()
124      */
125     public String getName() {
126         return delegate.getName();
127     }
128 
129     /**
130      * {@inheritDoc}
131      * 
132      * @see org.modeshape.graph.connector.RepositorySource#close()
133      */
134     public void close() {
135         this.delegate.close();
136     }
137 
138     /**
139      * {@inheritDoc}
140      * 
141      * @see org.modeshape.graph.connector.RepositorySource#getCapabilities()
142      */
143     public RepositorySourceCapabilities getCapabilities() {
144         // Return the capabilities of the source, except with search and query suppport enabled ...
145         return new RepositorySourceCapabilities(this.delegate.getCapabilities()) {
146             /**
147              * {@inheritDoc}
148              * 
149              * @see org.modeshape.graph.connector.RepositorySourceCapabilities#supportsQueries()
150              */
151             @Override
152             public boolean supportsQueries() {
153                 return true;
154             }
155 
156             /**
157              * {@inheritDoc}
158              * 
159              * @see org.modeshape.graph.connector.RepositorySourceCapabilities#supportsSearches()
160              */
161             @Override
162             public boolean supportsSearches() {
163                 return true;
164             }
165         };
166     }
167 
168     /**
169      * {@inheritDoc}
170      * 
171      * @see org.modeshape.graph.connector.RepositorySource#getConnection()
172      */
173     public RepositoryConnection getConnection() throws RepositorySourceException {
174         if (executeRequestsAsynchronously()) {
175             // Use the executor service ...
176             assert executorService != null;
177             return new ParallelConnection(executorService);
178         }
179         // We need to do the processing in this thread ...
180         return new SynchronousConnection();
181     }
182 
183     /**
184      * {@inheritDoc}
185      * 
186      * @see org.modeshape.graph.connector.RepositorySource#getRetryLimit()
187      */
188     public int getRetryLimit() {
189         return delegate.getRetryLimit();
190     }
191 
192     /**
193      * {@inheritDoc}
194      * 
195      * @see org.modeshape.graph.connector.RepositorySource#initialize(org.modeshape.graph.connector.RepositoryContext)
196      */
197     public void initialize( final RepositoryContext context ) throws RepositorySourceException {
198         final String delegateSourceName = delegate.getName();
199 
200         // The search engine will need a connection factory to the source, but the 'context' connection factory
201         // will point back to this wrapper. So make one ...
202         final RepositoryConnectionFactory originalConnectionFactory = context.getRepositoryConnectionFactory();
203         final RepositoryConnectionFactory connectionFactory = new RepositoryConnectionFactory() {
204             /**
205              * {@inheritDoc}
206              * 
207              * @see org.modeshape.graph.connector.RepositoryConnectionFactory#createConnection(java.lang.String)
208              */
209             public RepositoryConnection createConnection( String sourceName ) throws RepositorySourceException {
210                 if (delegateSourceName.equals(sourceName)) return delegate().getConnection();
211                 return originalConnectionFactory.createConnection(sourceName);
212             }
213         };
214 
215         // Create an observer so that we know what changes are being made in the delegate ...
216         final Observer observer = new Observer() {
217             /**
218              * {@inheritDoc}
219              * 
220              * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes)
221              */
222             public void notify( final Changes changes ) {
223                 if (changes != null) {
224                     if (updateIndexesAsynchronously()) {
225                         // Enqueue the changes in the delegate content ...
226                         executorService().submit(new Runnable() {
227                             public void run() {
228                                 process(context.getExecutionContext(), changes);
229                             }
230                         });
231                     } else {
232                         process(context.getExecutionContext(), changes);
233                     }
234                 }
235             }
236         };
237 
238         // Create a new RepositoryContext that uses our observer and connection factory ...
239         final RepositoryContext newContext = new RepositoryContext() {
240             /**
241              * {@inheritDoc}
242              * 
243              * @see org.modeshape.graph.connector.RepositoryContext#getConfiguration(int)
244              */
245             public Subgraph getConfiguration( int depth ) {
246                 return context.getConfiguration(depth);
247             }
248 
249             /**
250              * {@inheritDoc}
251              * 
252              * @see org.modeshape.graph.connector.RepositoryContext#getExecutionContext()
253              */
254             public ExecutionContext getExecutionContext() {
255                 return context.getExecutionContext();
256             }
257 
258             /**
259              * {@inheritDoc}
260              * 
261              * @see org.modeshape.graph.connector.RepositoryContext#getObserver()
262              */
263             public Observer getObserver() {
264                 return observer;
265             }
266 
267             /**
268              * {@inheritDoc}
269              * 
270              * @see org.modeshape.graph.connector.RepositoryContext#getRepositoryConnectionFactory()
271              */
272             public RepositoryConnectionFactory getRepositoryConnectionFactory() {
273                 return connectionFactory;
274             }
275         };
276 
277         // Now initialize the delegate with the delegate's context ...
278         delegate.initialize(newContext);
279     }
280 
281     protected final SearchEngine searchEngine() {
282         assert searchEngine != null;
283         return searchEngine;
284     }
285 
286     protected final boolean updateIndexesAsynchronously() {
287         return executorService != null && updateIndexesAsynchronously;
288     }
289 
290     protected final boolean executeRequestsAsynchronously() {
291         return executorService != null && executeAsynchronously;
292     }
293 
294     protected final ExecutorService executorService() {
295         assert executorService != null;
296         return executorService;
297     }
298 
299     protected final RepositorySource delegate() {
300         return this.delegate;
301     }
302 
303     /**
304      * Do the work of processing the changes and updating the {@link #searchEngine}. This method may be called while on one of the
305      * threads owned by the {@link #executorService executor service} (if {@link #updateIndexesAsynchronously()} returns true), or
306      * from the thread {@link RepositoryConnection#execute(ExecutionContext, org.modeshape.graph.request.Request) executing} the
307      * requests on the {@link #delegate} (if {@link #updateIndexesAsynchronously()} returns false).
308      * 
309      * @param context the execution context in which the indexes should be updated
310      * @param changes the changes; never null
311      */
312     protected void process( ExecutionContext context,
313                             Changes changes ) {
314         assert context != null;
315         assert changes != null;
316         if (searchEngine == null) return; // null only after deserialization ...
317         // Obtain a request processor
318         searchEngine.index(context, changes.getChangeRequests());
319     }
320 
321     /**
322      * {@inheritDoc}
323      * 
324      * @see org.modeshape.graph.connector.RepositorySource#setRetryLimit(int)
325      */
326     public void setRetryLimit( int limit ) {
327         delegate.setRetryLimit(limit);
328     }
329 
330     /**
331      * {@inheritDoc}
332      * 
333      * @see javax.naming.Referenceable#getReference()
334      */
335     public Reference getReference() throws NamingException {
336         return delegate.getReference();
337     }
338 
339     @NotThreadSafe
340     protected abstract class AbstractConnection implements RepositoryConnection {
341         private RepositoryConnection delegateConnection;
342 
343         protected AbstractConnection() {
344         }
345 
346         protected RepositoryConnection delegateConnection() {
347             if (delegateConnection == null) {
348                 delegateConnection = delegate().getConnection();
349             }
350             return delegateConnection;
351         }
352 
353         /**
354          * {@inheritDoc}
355          * 
356          * @see org.modeshape.graph.connector.RepositoryConnection#ping(long, java.util.concurrent.TimeUnit)
357          */
358         public boolean ping( long time,
359                              TimeUnit unit ) throws InterruptedException {
360             return delegateConnection().ping(time, unit);
361         }
362 
363         /**
364          * {@inheritDoc}
365          * 
366          * @see org.modeshape.graph.connector.RepositoryConnection#getDefaultCachePolicy()
367          */
368         public CachePolicy getDefaultCachePolicy() {
369             return delegateConnection().getDefaultCachePolicy();
370         }
371 
372         /**
373          * {@inheritDoc}
374          * 
375          * @see org.modeshape.graph.connector.RepositoryConnection#getSourceName()
376          */
377         public String getSourceName() {
378             return delegate().getName();
379         }
380 
381         /**
382          * {@inheritDoc}
383          * 
384          * @see org.modeshape.graph.connector.RepositoryConnection#getXAResource()
385          */
386         public XAResource getXAResource() {
387             return delegateConnection().getXAResource();
388         }
389 
390         /**
391          * {@inheritDoc}
392          * 
393          * @see org.modeshape.graph.connector.RepositoryConnection#close()
394          */
395         public void close() {
396             if (delegateConnection != null) {
397                 try {
398                     delegateConnection.close();
399                 } finally {
400                     delegateConnection = null;
401                 }
402             }
403         }
404     }
405 
406     /**
407      * A {@link RepositoryConnection} implementation that calls the delegate processor in a background thread, allowing the
408      * processing of the {@link FullTextSearchRequest} and {@link AccessQueryRequest} objects to be done in this thread and in
409      * parallel with other requests.
410      */
411     @NotThreadSafe
412     protected class ParallelConnection extends AbstractConnection {
413         private final ExecutorService executorService;
414 
415         protected ParallelConnection( ExecutorService executorService ) {
416             this.executorService = executorService;
417         }
418 
419         /**
420          * {@inheritDoc}
421          * 
422          * @see org.modeshape.graph.connector.RepositoryConnection#execute(org.modeshape.graph.ExecutionContext,
423          *      org.modeshape.graph.request.Request)
424          */
425         public void execute( ExecutionContext context,
426                              Request request ) throws RepositorySourceException {
427             RequestProcessor searchProcessor = null;
428 
429             switch (request.getType()) {
430                 case ACCESS_QUERY:
431                     AccessQueryRequest queryRequest = (AccessQueryRequest)request;
432                     searchProcessor = searchEngine().createProcessor(context, null, true);
433                     try {
434                         searchProcessor.process(queryRequest);
435                     } finally {
436                         searchProcessor.close();
437                     }
438                     break;
439                 case FULL_TEXT_SEARCH:
440                     FullTextSearchRequest searchRequest = (FullTextSearchRequest)request;
441                     searchProcessor = searchEngine().createProcessor(context, null, true);
442                     try {
443                         searchProcessor.process(searchRequest);
444                     } finally {
445                         searchProcessor.close();
446                     }
447                     break;
448                 case COMPOSITE:
449                     CompositeRequest composite = (CompositeRequest)request;
450                     CompositeRequestChannel channel = null;
451                     try {
452                         for (Request nested : composite) {
453                             switch (nested.getType()) {
454                                 case ACCESS_QUERY:
455                                     queryRequest = (AccessQueryRequest)request;
456                                     if (searchProcessor == null) {
457                                         searchProcessor = searchEngine().createProcessor(context, null, true);
458                                     }
459                                     searchProcessor.process(queryRequest);
460                                     break;
461                                 case FULL_TEXT_SEARCH:
462                                     searchRequest = (FullTextSearchRequest)request;
463                                     if (searchProcessor == null) {
464                                         searchProcessor = searchEngine().createProcessor(context, null, true);
465                                     }
466                                     searchProcessor.process(searchRequest);
467                                     break;
468                                 default:
469                                     // Delegate to the channel ...
470                                     if (channel == null) {
471                                         // Create a connection factory that always returns the delegate connection ...
472                                         RepositoryConnectionFactory connectionFactory = new RepositoryConnectionFactory() {
473                                             /**
474                                              * {@inheritDoc}
475                                              * 
476                                              * @see org.modeshape.graph.connector.RepositoryConnectionFactory#createConnection(java.lang.String)
477                                              */
478                                             public RepositoryConnection createConnection( String sourceName )
479                                                 throws RepositorySourceException {
480                                                 assert delegate().getName().equals(sourceName);
481                                                 return delegateConnection();
482                                             }
483                                         };
484                                         channel = new CompositeRequestChannel(delegate().getName());
485                                         channel.start(executorService, context, connectionFactory);
486                                     }
487                                     channel.add(request);
488 
489                             }
490                         }
491                     } finally {
492                         try {
493                             if (searchProcessor != null) {
494                                 searchProcessor.close();
495                             }
496                         } finally {
497                             if (channel != null) {
498                                 try {
499                                     channel.close();
500                                 } finally {
501                                     try {
502                                         channel.await();
503                                     } catch (CancellationException err) {
504                                         composite.cancel();
505                                     } catch (ExecutionException err) {
506                                         composite.setError(err);
507                                     } catch (InterruptedException err) {
508                                         // Reset the thread ...
509                                         Thread.interrupted();
510                                         // Then log the message ...
511                                         I18n msg = GraphI18n.interruptedWhileClosingChannel;
512                                         Logger.getLogger(getClass()).warn(err, msg, delegate().getName());
513                                         composite.setError(err);
514                                     }
515                                 }
516                             }
517                         }
518                     }
519                     break;
520                 default:
521                     delegateConnection().execute(context, request);
522 
523             }
524         }
525     }
526 
527     /**
528      * A {@link RepositoryConnection} implementation that calls the delegate processor in the calling thread.
529      */
530     @NotThreadSafe
531     protected class SynchronousConnection extends AbstractConnection {
532 
533         protected SynchronousConnection() {
534         }
535 
536         /**
537          * {@inheritDoc}
538          * 
539          * @see org.modeshape.graph.connector.RepositoryConnection#execute(org.modeshape.graph.ExecutionContext,
540          *      org.modeshape.graph.request.Request)
541          */
542         public void execute( final ExecutionContext context,
543                              final Request request ) throws RepositorySourceException {
544             RequestProcessor searchProcessor = null;
545 
546             switch (request.getType()) {
547                 case ACCESS_QUERY:
548                     AccessQueryRequest queryRequest = (AccessQueryRequest)request;
549                     searchProcessor = searchEngine().createProcessor(context, null, true);
550                     try {
551                         searchProcessor.process(queryRequest);
552                     } finally {
553                         searchProcessor.close();
554                     }
555                     break;
556                 case FULL_TEXT_SEARCH:
557                     FullTextSearchRequest searchRequest = (FullTextSearchRequest)request;
558                     searchProcessor = searchEngine().createProcessor(context, null, true);
559                     try {
560                         searchProcessor.process(searchRequest);
561                     } finally {
562                         searchProcessor.close();
563                     }
564                     break;
565                 case COMPOSITE:
566                     CompositeRequest composite = (CompositeRequest)request;
567                     List<Request> delegateRequests = null;
568                     try {
569                         Request delegateRequest = composite;
570                         for (Request nested : composite) {
571                             switch (nested.getType()) {
572                                 case ACCESS_QUERY:
573                                     queryRequest = (AccessQueryRequest)request;
574                                     if (searchProcessor == null) {
575                                         searchProcessor = searchEngine().createProcessor(context, null, true);
576                                     }
577                                     searchProcessor.process(queryRequest);
578                                     delegateRequest = null;
579                                     break;
580                                 case FULL_TEXT_SEARCH:
581                                     searchRequest = (FullTextSearchRequest)request;
582                                     if (searchProcessor == null) {
583                                         searchProcessor = searchEngine().createProcessor(context, null, true);
584                                     }
585                                     searchProcessor.process(searchRequest);
586                                     delegateRequest = null;
587                                     break;
588                                 default:
589                                     // Delegate the request ...
590                                     if (delegateRequests == null) {
591                                         delegateRequests = new LinkedList<Request>();
592                                     }
593                                     delegateRequests.add(request);
594 
595                             }
596                         }
597                         if (delegateRequest == null) {
598                             // Then there was at least one query or search request ...
599                             if (delegateRequests != null) {
600                                 // There was other requests ...
601                                 assert !delegateRequests.isEmpty();
602                                 delegateRequest = CompositeRequest.with(delegateRequests);
603                                 delegateConnection().execute(context, delegateRequest);
604                             } else {
605                                 // There were no other requests in the composite other than the search and/or query requests ...
606                                 // So nothing to do ...
607                             }
608                         } else {
609                             // There were no search or query requests, so delegate the orginal composite request ...
610                             delegateConnection().execute(context, request);
611                         }
612                     } finally {
613                         if (searchProcessor != null) {
614                             searchProcessor.close();
615                         }
616                     }
617                     break;
618                 default:
619                     // Just a single, non-query and non-search request ...
620                     delegateConnection().execute(context, request);
621 
622             }
623         }
624     }
625 }