View Javadoc

1   /*
2    * ModeShape (http://www.modeshape.org)
3    * See the COPYRIGHT.txt file distributed with this work for information
4    * regarding copyright ownership.  Some portions may be licensed
5    * to Red Hat, Inc. under one or more contributor license agreements.
6    * See the AUTHORS.txt file in the distribution for a full listing of 
7    * individual contributors.
8    *
9    * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
10   * is licensed to you under the terms of the GNU Lesser General Public License as
11   * published by the Free Software Foundation; either version 2.1 of
12   * the License, or (at your option) any later version.
13   * 
14   * ModeShape is distributed in the hope that it will be useful,
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   * Lesser General Public License for more details.
18   *
19   * You should have received a copy of the GNU Lesser General Public
20   * License along with this software; if not, write to the Free
21   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23   */
24  package org.modeshape.graph.search;
25  
26  import java.util.LinkedList;
27  import java.util.List;
28  import java.util.concurrent.CancellationException;
29  import java.util.concurrent.ExecutionException;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.TimeUnit;
32  import javax.naming.NamingException;
33  import javax.naming.Reference;
34  import javax.transaction.xa.XAResource;
35  import net.jcip.annotations.NotThreadSafe;
36  import org.modeshape.common.i18n.I18n;
37  import org.modeshape.common.util.CheckArg;
38  import org.modeshape.common.util.Logger;
39  import org.modeshape.graph.ExecutionContext;
40  import org.modeshape.graph.GraphI18n;
41  import org.modeshape.graph.Subgraph;
42  import org.modeshape.graph.cache.CachePolicy;
43  import org.modeshape.graph.connector.RepositoryConnection;
44  import org.modeshape.graph.connector.RepositoryConnectionFactory;
45  import org.modeshape.graph.connector.RepositoryContext;
46  import org.modeshape.graph.connector.RepositorySource;
47  import org.modeshape.graph.connector.RepositorySourceCapabilities;
48  import org.modeshape.graph.connector.RepositorySourceException;
49  import org.modeshape.graph.observe.Changes;
50  import org.modeshape.graph.observe.Observer;
51  import org.modeshape.graph.request.AccessQueryRequest;
52  import org.modeshape.graph.request.CompositeRequest;
53  import org.modeshape.graph.request.CompositeRequestChannel;
54  import org.modeshape.graph.request.FullTextSearchRequest;
55  import org.modeshape.graph.request.Request;
56  import org.modeshape.graph.request.processor.RequestProcessor;
57  
58  /**
59   * A {@link RepositorySource} implementation that can be used as a wrapper around another
60   * {@link RepositorySourceCapabilities#supportsSearches() non-searchable} or
61   * {@link RepositorySourceCapabilities#supportsQueries() non-querable} RepositorySource instance to provide search and query
62   * capability.
63   */
64  public class SearchableRepositorySource implements RepositorySource {
65  
66      private static final long serialVersionUID = 1L;
67  
68      private final RepositorySource delegate;
69      private final boolean executeAsynchronously;
70      private final boolean updateIndexesAsynchronously;
71      private final transient ExecutorService executorService;
72      private final transient SearchEngine searchEngine;
73  
74      /**
75       * Create a new searchable and queryable RepositorySource around an instance that is neither.
76       * 
77       * @param wrapped the RepositorySource that is not searchable and queryable
78       * @param searchEngine the search engine that is to be used
79       * @param executorService the ExecutorService that should be used when submitting requests to the wrapped service; may be null
80       *        if all operations should be performed in the calling thread
81       * @param executeAsynchronously true if an {@link ExecutorService} is provided and the requests to the wrapped source are to
82       *        be executed asynchronously
83       * @param updateIndexesAsynchronously true if an {@link ExecutorService} is provided and the indexes are to be updated in a
84       *        different thread than the thread executing the {@link RepositoryConnection#execute(ExecutionContext, Request)}
85       *        calls.
86       */
87      public SearchableRepositorySource( RepositorySource wrapped,
88                                         SearchEngine searchEngine,
89                                         ExecutorService executorService,
90                                         boolean executeAsynchronously,
91                                         boolean updateIndexesAsynchronously ) {
92          CheckArg.isNotNull(wrapped, "wrapped");
93          CheckArg.isNotNull(searchEngine, "searchEngine");
94          this.delegate = wrapped;
95          this.executorService = executorService;
96          this.searchEngine = searchEngine;
97          this.updateIndexesAsynchronously = this.executorService != null && updateIndexesAsynchronously;
98          this.executeAsynchronously = this.executorService != null && executeAsynchronously;
99      }
100 
101     /**
102      * Create a new searchable and queryable RepositorySource around an instance that is neither. All of the request processing
103      * will be done in the calling thread, and updating the indexes will be done synchronously within the context of the
104      * {@link RepositoryConnection#execute(ExecutionContext, Request)} method (and obviously on the same thread). This means that
105      * the execution of the requests will not return until the indexes have been updated with any changes made by the requests.
106      * <p>
107      * This is equivalent to calling <code>new SearchableRepositorySource(wrapped,searchEngine,null,false)</code>
108      * </p>
109      * 
110      * @param wrapped the RepositorySource that is not searchable and queryable
111      * @param searchEngine the search engine that is to be used
112      */
113     public SearchableRepositorySource( RepositorySource wrapped,
114                                        SearchEngine searchEngine ) {
115         this(wrapped, searchEngine, null, false, false);
116     }
117 
118     /**
119      * {@inheritDoc}
120      * 
121      * @see org.modeshape.graph.connector.RepositorySource#getName()
122      */
123     public String getName() {
124         return delegate.getName();
125     }
126 
127     /**
128      * {@inheritDoc}
129      * 
130      * @see org.modeshape.graph.connector.RepositorySource#close()
131      */
132     public void close() {
133         this.delegate.close();
134     }
135 
136     /**
137      * {@inheritDoc}
138      * 
139      * @see org.modeshape.graph.connector.RepositorySource#getCapabilities()
140      */
141     public RepositorySourceCapabilities getCapabilities() {
142         // Return the capabilities of the source, except with search and query suppport enabled ...
143         return new RepositorySourceCapabilities(this.delegate.getCapabilities()) {
144             /**
145              * {@inheritDoc}
146              * 
147              * @see org.modeshape.graph.connector.RepositorySourceCapabilities#supportsQueries()
148              */
149             @Override
150             public boolean supportsQueries() {
151                 return true;
152             }
153 
154             /**
155              * {@inheritDoc}
156              * 
157              * @see org.modeshape.graph.connector.RepositorySourceCapabilities#supportsSearches()
158              */
159             @Override
160             public boolean supportsSearches() {
161                 return true;
162             }
163         };
164     }
165 
166     /**
167      * {@inheritDoc}
168      * 
169      * @see org.modeshape.graph.connector.RepositorySource#getConnection()
170      */
171     public RepositoryConnection getConnection() throws RepositorySourceException {
172         if (executeRequestsAsynchronously()) {
173             // Use the executor service ...
174             assert executorService != null;
175             return new ParallelConnection(executorService);
176         }
177         // We need to do the processing in this thread ...
178         return new SynchronousConnection();
179     }
180 
181     /**
182      * {@inheritDoc}
183      * 
184      * @see org.modeshape.graph.connector.RepositorySource#getRetryLimit()
185      */
186     public int getRetryLimit() {
187         return delegate.getRetryLimit();
188     }
189 
190     /**
191      * {@inheritDoc}
192      * 
193      * @see org.modeshape.graph.connector.RepositorySource#initialize(org.modeshape.graph.connector.RepositoryContext)
194      */
195     public void initialize( final RepositoryContext context ) throws RepositorySourceException {
196         final String delegateSourceName = delegate.getName();
197 
198         // The search engine will need a connection factory to the source, but the 'context' connection factory
199         // will point back to this wrapper. So make one ...
200         final RepositoryConnectionFactory originalConnectionFactory = context.getRepositoryConnectionFactory();
201         final RepositoryConnectionFactory connectionFactory = new RepositoryConnectionFactory() {
202             /**
203              * {@inheritDoc}
204              * 
205              * @see org.modeshape.graph.connector.RepositoryConnectionFactory#createConnection(java.lang.String)
206              */
207             public RepositoryConnection createConnection( String sourceName ) throws RepositorySourceException {
208                 if (delegateSourceName.equals(sourceName)) return delegate().getConnection();
209                 return originalConnectionFactory.createConnection(sourceName);
210             }
211         };
212 
213         // Create an observer so that we know what changes are being made in the delegate ...
214         final Observer observer = new Observer() {
215             /**
216              * {@inheritDoc}
217              * 
218              * @see org.modeshape.graph.observe.Observer#notify(org.modeshape.graph.observe.Changes)
219              */
220             public void notify( final Changes changes ) {
221                 if (changes != null) {
222                     if (updateIndexesAsynchronously()) {
223                         // Enqueue the changes in the delegate content ...
224                         executorService().submit(new Runnable() {
225                             public void run() {
226                                 process(context.getExecutionContext(), changes);
227                             }
228                         });
229                     } else {
230                         process(context.getExecutionContext(), changes);
231                     }
232                 }
233             }
234         };
235 
236         // Create a new RepositoryContext that uses our observer and connection factory ...
237         final RepositoryContext newContext = new RepositoryContext() {
238             /**
239              * {@inheritDoc}
240              * 
241              * @see org.modeshape.graph.connector.RepositoryContext#getConfiguration(int)
242              */
243             public Subgraph getConfiguration( int depth ) {
244                 return context.getConfiguration(depth);
245             }
246 
247             /**
248              * {@inheritDoc}
249              * 
250              * @see org.modeshape.graph.connector.RepositoryContext#getExecutionContext()
251              */
252             public ExecutionContext getExecutionContext() {
253                 return context.getExecutionContext();
254             }
255 
256             /**
257              * {@inheritDoc}
258              * 
259              * @see org.modeshape.graph.connector.RepositoryContext#getObserver()
260              */
261             public Observer getObserver() {
262                 return observer;
263             }
264 
265             /**
266              * {@inheritDoc}
267              * 
268              * @see org.modeshape.graph.connector.RepositoryContext#getRepositoryConnectionFactory()
269              */
270             public RepositoryConnectionFactory getRepositoryConnectionFactory() {
271                 return connectionFactory;
272             }
273         };
274 
275         // Now initialize the delegate with the delegate's context ...
276         delegate.initialize(newContext);
277     }
278 
279     protected final SearchEngine searchEngine() {
280         assert searchEngine != null;
281         return searchEngine;
282     }
283 
284     protected final boolean updateIndexesAsynchronously() {
285         return executorService != null && updateIndexesAsynchronously;
286     }
287 
288     protected final boolean executeRequestsAsynchronously() {
289         return executorService != null && executeAsynchronously;
290     }
291 
292     protected final ExecutorService executorService() {
293         assert executorService != null;
294         return executorService;
295     }
296 
297     protected final RepositorySource delegate() {
298         return this.delegate;
299     }
300 
301     /**
302      * Do the work of processing the changes and updating the {@link #searchEngine}. This method may be called while on one of the
303      * threads owned by the {@link #executorService executor service} (if {@link #updateIndexesAsynchronously()} returns true), or
304      * from the thread {@link RepositoryConnection#execute(ExecutionContext, org.modeshape.graph.request.Request) executing} the
305      * requests on the {@link #delegate} (if {@link #updateIndexesAsynchronously()} returns false).
306      * 
307      * @param context the execution context in which the indexes should be updated
308      * @param changes the changes; never null
309      */
310     protected void process( ExecutionContext context,
311                             Changes changes ) {
312         assert context != null;
313         assert changes != null;
314         if (searchEngine == null) return; // null only after deserialization ...
315         // Obtain a request processor
316         searchEngine.index(context, changes.getChangeRequests());
317     }
318 
319     /**
320      * {@inheritDoc}
321      * 
322      * @see org.modeshape.graph.connector.RepositorySource#setRetryLimit(int)
323      */
324     public void setRetryLimit( int limit ) {
325         delegate.setRetryLimit(limit);
326     }
327 
328     /**
329      * {@inheritDoc}
330      * 
331      * @see javax.naming.Referenceable#getReference()
332      */
333     public Reference getReference() throws NamingException {
334         return delegate.getReference();
335     }
336 
337     @NotThreadSafe
338     protected abstract class AbstractConnection implements RepositoryConnection {
339         private RepositoryConnection delegateConnection;
340 
341         protected AbstractConnection() {
342         }
343 
344         protected RepositoryConnection delegateConnection() {
345             if (delegateConnection == null) {
346                 delegateConnection = delegate().getConnection();
347             }
348             return delegateConnection;
349         }
350 
351         /**
352          * {@inheritDoc}
353          * 
354          * @see org.modeshape.graph.connector.RepositoryConnection#ping(long, java.util.concurrent.TimeUnit)
355          */
356         public boolean ping( long time,
357                              TimeUnit unit ) throws InterruptedException {
358             return delegateConnection().ping(time, unit);
359         }
360 
361         /**
362          * {@inheritDoc}
363          * 
364          * @see org.modeshape.graph.connector.RepositoryConnection#getDefaultCachePolicy()
365          */
366         public CachePolicy getDefaultCachePolicy() {
367             return delegateConnection().getDefaultCachePolicy();
368         }
369 
370         /**
371          * {@inheritDoc}
372          * 
373          * @see org.modeshape.graph.connector.RepositoryConnection#getSourceName()
374          */
375         public String getSourceName() {
376             return delegate().getName();
377         }
378 
379         /**
380          * {@inheritDoc}
381          * 
382          * @see org.modeshape.graph.connector.RepositoryConnection#getXAResource()
383          */
384         public XAResource getXAResource() {
385             return delegateConnection().getXAResource();
386         }
387 
388         /**
389          * {@inheritDoc}
390          * 
391          * @see org.modeshape.graph.connector.RepositoryConnection#close()
392          */
393         public void close() {
394             if (delegateConnection != null) {
395                 try {
396                     delegateConnection.close();
397                 } finally {
398                     delegateConnection = null;
399                 }
400             }
401         }
402     }
403 
404     /**
405      * A {@link RepositoryConnection} implementation that calls the delegate processor in a background thread, allowing the
406      * processing of the {@link FullTextSearchRequest} and {@link AccessQueryRequest} objects to be done in this thread and in
407      * parallel with other requests.
408      */
409     @NotThreadSafe
410     protected class ParallelConnection extends AbstractConnection {
411         private final ExecutorService executorService;
412 
413         protected ParallelConnection( ExecutorService executorService ) {
414             this.executorService = executorService;
415         }
416 
417         /**
418          * {@inheritDoc}
419          * 
420          * @see org.modeshape.graph.connector.RepositoryConnection#execute(org.modeshape.graph.ExecutionContext,
421          *      org.modeshape.graph.request.Request)
422          */
423         public void execute( ExecutionContext context,
424                              Request request ) throws RepositorySourceException {
425             if (request instanceof AccessQueryRequest) {
426                 AccessQueryRequest queryRequest = (AccessQueryRequest)request;
427                 RequestProcessor searchProcessor = searchEngine().createProcessor(context, null, true);
428                 try {
429                     searchProcessor.process(queryRequest);
430                 } finally {
431                     searchProcessor.close();
432                 }
433             } else if (request instanceof FullTextSearchRequest) {
434                 FullTextSearchRequest searchRequest = (FullTextSearchRequest)request;
435                 RequestProcessor searchProcessor = searchEngine().createProcessor(context, null, true);
436                 try {
437                     searchProcessor.process(searchRequest);
438                 } finally {
439                     searchProcessor.close();
440                 }
441             } else if (request instanceof CompositeRequest) {
442                 CompositeRequest composite = (CompositeRequest)request;
443                 CompositeRequestChannel channel = null;
444                 RequestProcessor searchProcessor = null;
445                 try {
446                     for (Request nested : composite) {
447                         if (nested instanceof AccessQueryRequest) {
448                             AccessQueryRequest queryRequest = (AccessQueryRequest)request;
449                             if (searchProcessor == null) searchProcessor = searchEngine().createProcessor(context, null, true);
450                             searchProcessor.process(queryRequest);
451                         } else if (nested instanceof FullTextSearchRequest) {
452                             FullTextSearchRequest searchRequest = (FullTextSearchRequest)request;
453                             if (searchProcessor == null) searchProcessor = searchEngine().createProcessor(context, null, true);
454                             searchProcessor.process(searchRequest);
455                         } else {
456                             // Delegate to the channel ...
457                             if (channel == null) {
458                                 // Create a connection factory that always returns the delegate connection ...
459                                 RepositoryConnectionFactory connectionFactory = new RepositoryConnectionFactory() {
460                                     /**
461                                      * {@inheritDoc}
462                                      * 
463                                      * @see org.modeshape.graph.connector.RepositoryConnectionFactory#createConnection(java.lang.String)
464                                      */
465                                     public RepositoryConnection createConnection( String sourceName )
466                                         throws RepositorySourceException {
467                                         assert delegate().getName().equals(sourceName);
468                                         return delegateConnection();
469                                     }
470                                 };
471                                 channel = new CompositeRequestChannel(delegate().getName());
472                                 channel.start(executorService, context, connectionFactory);
473                             }
474                             channel.add(request);
475                         }
476                     }
477                 } finally {
478                     try {
479                         if (searchProcessor != null) {
480                             searchProcessor.close();
481                         }
482                     } finally {
483                         if (channel != null) {
484                             try {
485                                 channel.close();
486                             } finally {
487                                 try {
488                                     channel.await();
489                                 } catch (CancellationException err) {
490                                     composite.cancel();
491                                 } catch (ExecutionException err) {
492                                     composite.setError(err);
493                                 } catch (InterruptedException err) {
494                                     // Reset the thread ...
495                                     Thread.interrupted();
496                                     // Then log the message ...
497                                     I18n msg = GraphI18n.interruptedWhileClosingChannel;
498                                     Logger.getLogger(getClass()).warn(err, msg, delegate().getName());
499                                     composite.setError(err);
500                                 }
501                             }
502                         }
503                     }
504                 }
505             } else {
506                 // Just a single, non-query and non-search request ...
507                 delegateConnection().execute(context, request);
508             }
509         }
510     }
511 
512     /**
513      * A {@link RepositoryConnection} implementation that calls the delegate processor in the calling thread.
514      */
515     @NotThreadSafe
516     protected class SynchronousConnection extends AbstractConnection {
517 
518         protected SynchronousConnection() {
519         }
520 
521         /**
522          * {@inheritDoc}
523          * 
524          * @see org.modeshape.graph.connector.RepositoryConnection#execute(org.modeshape.graph.ExecutionContext,
525          *      org.modeshape.graph.request.Request)
526          */
527         public void execute( final ExecutionContext context,
528                              final Request request ) throws RepositorySourceException {
529             if (request instanceof AccessQueryRequest) {
530                 AccessQueryRequest queryRequest = (AccessQueryRequest)request;
531                 RequestProcessor searchProcessor = searchEngine().createProcessor(context, null, true);
532                 try {
533                     searchProcessor.process(queryRequest);
534                 } finally {
535                     searchProcessor.close();
536                 }
537             } else if (request instanceof FullTextSearchRequest) {
538                 FullTextSearchRequest searchRequest = (FullTextSearchRequest)request;
539                 RequestProcessor searchProcessor = searchEngine().createProcessor(context, null, true);
540                 try {
541                     searchProcessor.process(searchRequest);
542                 } finally {
543                     searchProcessor.close();
544                 }
545             } else if (request instanceof CompositeRequest) {
546                 CompositeRequest composite = (CompositeRequest)request;
547                 List<Request> delegateRequests = null;
548                 RequestProcessor searchProcessor = null;
549                 try {
550                     Request delegateRequest = composite;
551                     for (Request nested : composite) {
552                         if (nested instanceof AccessQueryRequest) {
553                             AccessQueryRequest queryRequest = (AccessQueryRequest)request;
554                             if (searchProcessor == null) searchProcessor = searchEngine().createProcessor(context, null, true);
555                             searchProcessor.process(queryRequest);
556                             delegateRequest = null;
557                         } else if (nested instanceof FullTextSearchRequest) {
558                             FullTextSearchRequest searchRequest = (FullTextSearchRequest)request;
559                             if (searchProcessor == null) searchProcessor = searchEngine().createProcessor(context, null, true);
560                             searchProcessor.process(searchRequest);
561                             delegateRequest = null;
562                         } else {
563                             // Delegate the request ...
564                             if (delegateRequests == null) {
565                                 delegateRequests = new LinkedList<Request>();
566                             }
567                             delegateRequests.add(request);
568                         }
569                     }
570                     if (delegateRequest == null) {
571                         // Then there was at least one query or search request ...
572                         if (delegateRequests != null) {
573                             // There was other requests ...
574                             assert !delegateRequests.isEmpty();
575                             delegateRequest = CompositeRequest.with(delegateRequests);
576                             delegateConnection().execute(context, delegateRequest);
577                         } else {
578                             // There were no other requests in the composite other than the search and/or query requests ...
579                             // So nothing to do ...
580                         }
581                     } else {
582                         // There were no search or query requests, so delegate the orginal composite request ...
583                         delegateConnection().execute(context, request);
584                     }
585                 } finally {
586                     if (searchProcessor != null) {
587                         searchProcessor.close();
588                     }
589                 }
590             } else {
591                 // Just a single, non-query and non-search request ...
592                 delegateConnection().execute(context, request);
593             }
594         }
595     }
596 }