View Javadoc

1   /*
2    * ModeShape (http://www.modeshape.org)
3    * See the COPYRIGHT.txt file distributed with this work for information
4    * regarding copyright ownership.  Some portions may be licensed
5    * to Red Hat, Inc. under one or more contributor license agreements.
6    * See the AUTHORS.txt file in the distribution for a full listing of 
7    * individual contributors.
8    *
9    * ModeShape is free software. Unless otherwise indicated, all code in ModeShape
10   * is licensed to you under the terms of the GNU Lesser General Public License as
11   * published by the Free Software Foundation; either version 2.1 of
12   * the License, or (at your option) any later version.
13   * 
14   * ModeShape is distributed in the hope that it will be useful,
15   * but WITHOUT ANY WARRANTY; without even the implied warranty of
16   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17   * Lesser General Public License for more details.
18   *
19   * You should have received a copy of the GNU Lesser General Public
20   * License along with this software; if not, write to the Free
21   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
22   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
23   */
24  package org.modeshape.graph.request;
25  
26  import java.util.Iterator;
27  import java.util.LinkedList;
28  import java.util.List;
29  import java.util.NoSuchElementException;
30  import java.util.concurrent.BlockingQueue;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.CancellationException;
33  import java.util.concurrent.CountDownLatch;
34  import java.util.concurrent.ExecutionException;
35  import java.util.concurrent.ExecutorService;
36  import java.util.concurrent.Future;
37  import java.util.concurrent.LinkedBlockingQueue;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import org.modeshape.graph.ExecutionContext;
40  import org.modeshape.graph.GraphI18n;
41  import org.modeshape.graph.connector.RepositoryConnection;
42  import org.modeshape.graph.connector.RepositoryConnectionFactory;
43  import org.modeshape.graph.request.processor.RequestProcessor;
44  
45  /**
46   * A channel for Request objects that can be submitted to a consumer (typically a {@link RequestProcessor} or
47   * {@link RepositoryConnection}) while allowing the channel owner to continue adding more Request objects into the channel.
48   * <p>
49   * The owner of this channel is responsible for starting the processing using one of the two <code>start(...)</code> methods,
50   * adding {@link Request}s via <code>add(...)</code> methods, {@link #close() closing} the channel when there are no more requests
51   * to be added, and finally {@link #await() awaiting} until all of the submitted requests have been processed. Note that the owner
52   * can optionally pre-fill the channel with Request objects before calling <code>start(...)</code>.
53   * </p>
54   * <p>
55   * The consumer will be handed a {@link CompositeRequest}, and should use the {@link CompositeRequest#iterator()} method to obtain
56   * an Iterator&lt;Request>. The {@link Iterator#hasNext()} method will block until there is another Request available in the
57   * channel, or until the channel is closed (at which point {@link Iterator#hasNext()} will return false. (Notice that the
58   * {@link Iterator#next()} method will also block if it is not preceeded by a {@link Iterator#hasNext()}, but will throw a
59   * {@link NoSuchElementException} when there are no more Request objects and the channel is closed.)
60   * </p>
61   * <p>
62   * Because the CompositeRequest's iterator will block, the consumer will block while processing the request. Therefore, this
63   * channel submits the CompositeRequest to the consumer asynchronously, via an {@link ExecutorService} supplied in one of the two
64   * {@link #start(ExecutorService, ExecutionContext, RepositoryConnectionFactory) start}
65   * {@link #start(ExecutorService, RequestProcessor, boolean) methods}.
66   * </p>
67   */
68  public class CompositeRequestChannel {
69  
70      protected final String sourceName;
71      /** The list of all requests that are or have been processed as part of this channel */
72      protected final LinkedList<Request> allRequests = new LinkedList<Request>();
73      /** The queue of requests that remain unprocessed */
74      private final BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>();
75      /** The CompositeRequest that is submitted to the underlying processor */
76      protected final CompositeRequest composite;
77      /**
78       * The Future that is submitted to the ExecutorService to do the processing, which is used to {@link #await()} until the
79       * processing is completed or {@link #cancel(boolean) cancel} the work
80       */
81      protected Future<String> future;
82      /** Flag that defines whether the channel has processed all requests */
83      protected final AtomicBoolean closed = new AtomicBoolean(false);
84      protected Throwable compositeError = null;
85  
86      /**
87       * Create a new channel with the supplied channel name.
88       * 
89       * @param sourceName the name of the repository source used to execute this channel's {@link #allRequests() requests}; may not
90       *        be null or empty
91       */
92      public CompositeRequestChannel( final String sourceName ) {
93          assert sourceName != null;
94          this.sourceName = sourceName;
95          this.composite = new ChannelCompositeRequest();
96      }
97  
98      /**
99       * Utility method to create an iterator over the requests in this channel. This really should be called once
100      * 
101      * @return the iterator over the channels
102      */
103     protected Iterator<Request> createIterator() {
104         final BlockingQueue<Request> queue = this.queue;
105         return new Iterator<Request>() {
106             private Request next;
107 
108             public boolean hasNext() {
109                 // If next still has a request, then 'hasNext()' has been called multiple times in a row
110                 if (next != null) return true;
111 
112                 // Now, block for a next item (this blocks) ...
113                 try {
114                     next = queue.take();
115                 } catch (InterruptedException e) {
116                     // This happens when the federated connector has been told to shutdown now, and it shuts down
117                     // its executor (the worker pool) immediately by interrupting each in-use thread.
118                     // In this case, we should consider there to be more more requests ...
119                     try {
120                         return false;
121                     } finally {
122                         // reset the interrupted status ...
123                         Thread.interrupted();
124                     }
125                 }
126                 if (RequestType.LAST == next.getType()) {
127                     return false;
128                 }
129                 return next != null;
130             }
131 
132             public Request next() {
133                 if (next == null) {
134                     // Must have been called without first calling 'hasNext()' ...
135                     try {
136                         next = queue.take();
137                     } catch (InterruptedException e) {
138                         // This happens when the federated connector has been told to shutdown now, and it shuts down
139                         // its executor (the worker pool) immediately by interrupting each in-use thread.
140                         // In this case, we should consider there to be more more requests (again, this case
141                         // is when 'next()' has been called without calling 'hasNext()') ...
142                         try {
143                             throw new NoSuchElementException();
144                         } finally {
145                             // reset the interrupted status ...
146                             Thread.interrupted();
147                         }
148                     }
149                 }
150                 if (next == null) {
151                     throw new NoSuchElementException();
152                 }
153                 Request result = next;
154                 next = null;
155                 return result;
156             }
157 
158             public void remove() {
159                 throw new UnsupportedOperationException();
160             }
161         };
162     }
163 
164     /**
165      * Begins processing any requests that have been {@link #add(Request) added} to this channel. Processing is done by submitting
166      * the channel to the supplied executor.
167      * 
168      * @param executor the executor that is to do the work; may not be null
169      * @param context the execution context in which the work is to be performed; may not be null
170      * @param connectionFactory the connection factory that should be used to create connections; may not be null
171      * @throws IllegalStateException if this channel has already been started
172      */
173     public void start( final ExecutorService executor,
174                        final ExecutionContext context,
175                        final RepositoryConnectionFactory connectionFactory ) {
176         assert executor != null;
177         assert context != null;
178         assert connectionFactory != null;
179         assert sourceName != null;
180         if (this.future != null) {
181             throw new IllegalStateException();
182         }
183         this.future = executor.submit(new Callable<String>() {
184             /**
185              * {@inheritDoc}
186              * 
187              * @see java.util.concurrent.Callable#call()
188              */
189             public String call() throws Exception {
190                 final RepositoryConnection connection = connectionFactory.createConnection(sourceName);
191                 assert connection != null;
192                 try {
193                     connection.execute(context, composite);
194                 } finally {
195                     connection.close();
196                 }
197                 return sourceName;
198             }
199         });
200     }
201 
202     /**
203      * Begins processing any requests that have been {@link #add(Request) added} to this channel. Processing is done by submitting
204      * the channel to the supplied executor.
205      * 
206      * @param executor the executor that is to do the work; may not be null
207      * @param processor the request processor that will be used to execute the requests; may not be null
208      * @param closeProcessorWhenCompleted true if this method should call {@link RequestProcessor#close()} when the channel is
209      *        completed, or false if the caller is responsible for doing this
210      * @throws IllegalStateException if this channel has already been started
211      */
212     public void start( final ExecutorService executor,
213                        final RequestProcessor processor,
214                        final boolean closeProcessorWhenCompleted ) {
215         assert executor != null;
216         assert processor != null;
217         if (this.future != null) {
218             throw new IllegalStateException();
219         }
220         this.future = executor.submit(new Callable<String>() {
221             /**
222              * {@inheritDoc}
223              * 
224              * @see java.util.concurrent.Callable#call()
225              */
226             public String call() throws Exception {
227                 try {
228                     processor.process(composite);
229                 } finally {
230                     if (closeProcessorWhenCompleted) processor.close();
231                 }
232                 return sourceName;
233             }
234         });
235     }
236 
237     /**
238      * Add the request to this channel for asynchronous processing.
239      * 
240      * @param request the request to be submitted; may not be null
241      * @throws IllegalStateException if this channel has already been {@link #close() closed}
242      */
243     public void add( Request request ) {
244         if (closed.get()) {
245             throw new IllegalStateException(GraphI18n.unableToAddRequestToChannelThatIsDone.text(sourceName, request));
246         }
247         assert request != null;
248         this.allRequests.add(request);
249         this.queue.add(request);
250     }
251 
252     /**
253      * Add the request to this channel for asynchronous processing, and supply a {@link CountDownLatch count-down latch} that
254      * should be {@link CountDownLatch#countDown() decremented} when this request is completed.
255      * 
256      * @param request the request to be submitted; may not be null
257      * @param latch the count-down latch that should be decremented when <code>request</code> has been completed; may not be null
258      * @return the same latch that was supplied, for method chaining purposes; never null
259      * @throws IllegalStateException if this channel has already been {@link #close() closed}
260      */
261     public CountDownLatch add( Request request,
262                                CountDownLatch latch ) {
263         if (closed.get()) {
264             throw new IllegalStateException(GraphI18n.unableToAddRequestToChannelThatIsDone.text(sourceName, request));
265         }
266         assert request != null;
267         assert latch != null;
268         // Submit the request for processing ...
269         this.allRequests.add(request);
270         request.setLatchForFreezing(latch);
271         this.queue.add(request);
272         return latch;
273     }
274 
275     /**
276      * Add the request to this channel for processing, but wait to return until the request has been processed.
277      * 
278      * @param request the request to be submitted; may not be null
279      * @throws InterruptedException if the current thread is interrupted while waiting
280      */
281     public void addAndAwait( Request request ) throws InterruptedException {
282         // Add the request with a latch, then block until the request has completed ...
283         add(request, new CountDownLatch(1)).await();
284     }
285 
286     /**
287      * Mark this source as having no more requests to process.
288      */
289     public void close() {
290         if (this.closed.compareAndSet(false, true)) {
291             this.queue.add(new LastRequest());
292         }
293     }
294 
295     /**
296      * Return whether this channel has been {@link #close() closed}.
297      * 
298      * @return true if the channel was marked as done, or false otherwise
299      */
300     public boolean isClosed() {
301         return closed.get();
302     }
303 
304     /**
305      * Cancel this forked channel, stopping work as soon as possible. If the channel has not yet been started, this method
306      * 
307      * @param mayInterruptIfRunning true if the channel is still being worked on, and the thread on which its being worked on may
308      *        be interrupted, or false if the channel should be allowed to finish if it is already in work.
309      */
310     public void cancel( boolean mayInterruptIfRunning ) {
311         if (this.future == null || this.future.isDone() || this.future.isCancelled()) return;
312 
313         // Mark the composite as cancelled first, so that the next composed request will be marked as
314         // cancelled.
315         this.composite.cancel();
316 
317         // Now mark the channel as being done ...
318         close();
319 
320         // Now, mark the channel as being cancelled (do allow interrupting the worker thread) ...
321         this.future.cancel(mayInterruptIfRunning);
322     }
323 
324     /**
325      * Return whether this channel has been {@link #start(ExecutorService, ExecutionContext, RepositoryConnectionFactory) started}
326      * .
327      * 
328      * @return true if this channel was started, or false otherwise
329      */
330     public boolean isStarted() {
331         return this.future != null;
332     }
333 
334     /**
335      * Return whether this channel has completed all of its work.
336      * 
337      * @return true if the channel was started and is complete, or false otherwise
338      */
339     public boolean isComplete() {
340         return this.future != null && this.future.isDone();
341     }
342 
343     /**
344      * Await until this channel has completed.
345      * 
346      * @throws CancellationException if the channel was cancelled
347      * @throws ExecutionException if the channel execution threw an exception
348      * @throws InterruptedException if the current thread is interrupted while waiting
349      */
350     public void await() throws ExecutionException, InterruptedException, CancellationException {
351         this.future.get();
352     }
353 
354     /**
355      * Get all the requests that were submitted to this queue. The resulting list is the actual list that is appended when
356      * requests are added, and may change until the channel is {@link #close() closed}.
357      * 
358      * @return all of the requests that were submitted to this channel; never null
359      */
360     public List<Request> allRequests() {
361         return allRequests;
362     }
363 
364     /**
365      * Get the name of the source that this channel uses.
366      * 
367      * @return the source name; never null
368      */
369     public String sourceName() {
370         return sourceName;
371     }
372 
373     protected class ChannelCompositeRequest extends CompositeRequest {
374         private static final long serialVersionUID = 1L;
375         private final LinkedList<Request> allRequests = CompositeRequestChannel.this.allRequests;
376 
377         protected ChannelCompositeRequest() {
378             super(false);
379         }
380 
381         /**
382          * {@inheritDoc}
383          * 
384          * @see org.modeshape.graph.request.CompositeRequest#iterator()
385          */
386         @Override
387         public Iterator<Request> iterator() {
388             return createIterator();
389         }
390 
391         /**
392          * {@inheritDoc}
393          * 
394          * @see org.modeshape.graph.request.CompositeRequest#getRequests()
395          */
396         @Override
397         public List<Request> getRequests() {
398             return allRequests;
399         }
400 
401         /**
402          * {@inheritDoc}
403          * 
404          * @see org.modeshape.graph.request.CompositeRequest#size()
405          */
406         @Override
407         public int size() {
408             return closed.get() ? allRequests.size() : CompositeRequest.UNKNOWN_NUMBER_OF_REQUESTS;
409         }
410 
411         /**
412          * {@inheritDoc}
413          * 
414          * @see org.modeshape.graph.request.Request#cancel()
415          */
416         @Override
417         public void cancel() {
418             closed.set(true);
419         }
420 
421         /**
422          * {@inheritDoc}
423          * 
424          * @see org.modeshape.graph.request.Request#setError(java.lang.Throwable)
425          */
426         @Override
427         public void setError( Throwable error ) {
428             compositeError = error;
429             super.setError(error);
430         }
431 
432         /**
433          * {@inheritDoc}
434          * 
435          * @see org.modeshape.graph.request.Request#hasError()
436          */
437         @Override
438         public boolean hasError() {
439             return compositeError != null || super.hasError();
440         }
441     }
442 
443     /**
444      * A psuedo Request that is used by the {@link CompositeRequestChannel} to insert into a request queue so that the queue's
445      * iterator knows when there are no more requests to process.
446      */
447     protected static class LastRequest extends Request {
448         private static final long serialVersionUID = 1L;
449 
450         @Override
451         public boolean isReadOnly() {
452             return false;
453         }
454 
455         @Override
456         public RequestType getType() {
457             return RequestType.LAST;
458         }
459 
460     }
461 }