1 /* 2 * ModeShape (http://www.modeshape.org) 3 * See the COPYRIGHT.txt file distributed with this work for information 4 * regarding copyright ownership. Some portions may be licensed 5 * to Red Hat, Inc. under one or more contributor license agreements. 6 * See the AUTHORS.txt file in the distribution for a full listing of 7 * individual contributors. 8 * 9 * ModeShape is free software. Unless otherwise indicated, all code in ModeShape 10 * is licensed to you under the terms of the GNU Lesser General Public License as 11 * published by the Free Software Foundation; either version 2.1 of 12 * the License, or (at your option) any later version. 13 * 14 * ModeShape is distributed in the hope that it will be useful, 15 * but WITHOUT ANY WARRANTY; without even the implied warranty of 16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 17 * Lesser General Public License for more details. 18 * 19 * You should have received a copy of the GNU Lesser General Public 20 * License along with this software; if not, write to the Free 21 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 22 * 02110-1301 USA, or see the FSF site: http://www.fsf.org. 23 */ 24 package org.modeshape.graph.request; 25 26 import java.util.Iterator; 27 import java.util.LinkedList; 28 import java.util.List; 29 import java.util.NoSuchElementException; 30 import java.util.concurrent.BlockingQueue; 31 import java.util.concurrent.Callable; 32 import java.util.concurrent.CancellationException; 33 import java.util.concurrent.CountDownLatch; 34 import java.util.concurrent.ExecutionException; 35 import java.util.concurrent.ExecutorService; 36 import java.util.concurrent.Future; 37 import java.util.concurrent.LinkedBlockingQueue; 38 import java.util.concurrent.atomic.AtomicBoolean; 39 import org.modeshape.graph.ExecutionContext; 40 import org.modeshape.graph.GraphI18n; 41 import org.modeshape.graph.connector.RepositoryConnection; 42 import org.modeshape.graph.connector.RepositoryConnectionFactory; 43 import org.modeshape.graph.request.processor.RequestProcessor; 44 45 /** 46 * A channel for Request objects that can be submitted to a consumer (typically a {@link RequestProcessor} or 47 * {@link RepositoryConnection}) while allowing the channel owner to continue adding more Request objects into the channel. 48 * <p> 49 * The owner of this channel is responsible for starting the processing using one of the two <code>start(...)</code> methods, 50 * adding {@link Request}s via <code>add(...)</code> methods, {@link #close() closing} the channel when there are no more requests 51 * to be added, and finally {@link #await() awaiting} until all of the submitted requests have been processed. Note that the owner 52 * can optionally pre-fill the channel with Request objects before calling <code>start(...)</code>. 53 * </p> 54 * <p> 55 * The consumer will be handed a {@link CompositeRequest}, and should use the {@link CompositeRequest#iterator()} method to obtain 56 * an Iterator<Request>. The {@link Iterator#hasNext()} method will block until there is another Request available in the 57 * channel, or until the channel is closed (at which point {@link Iterator#hasNext()} will return false. (Notice that the 58 * {@link Iterator#next()} method will also block if it is not preceeded by a {@link Iterator#hasNext()}, but will throw a 59 * {@link NoSuchElementException} when there are no more Request objects and the channel is closed.) 60 * </p> 61 * <p> 62 * Because the CompositeRequest's iterator will block, the consumer will block while processing the request. Therefore, this 63 * channel submits the CompositeRequest to the consumer asynchronously, via an {@link ExecutorService} supplied in one of the two 64 * {@link #start(ExecutorService, ExecutionContext, RepositoryConnectionFactory) start} 65 * {@link #start(ExecutorService, RequestProcessor, boolean) methods}. 66 * </p> 67 */ 68 public class CompositeRequestChannel { 69 70 protected final String sourceName; 71 /** The list of all requests that are or have been processed as part of this channel */ 72 protected final LinkedList<Request> allRequests = new LinkedList<Request>(); 73 /** The queue of requests that remain unprocessed */ 74 private final BlockingQueue<Request> queue = new LinkedBlockingQueue<Request>(); 75 /** The CompositeRequest that is submitted to the underlying processor */ 76 protected final CompositeRequest composite; 77 /** 78 * The Future that is submitted to the ExecutorService to do the processing, which is used to {@link #await()} until the 79 * processing is completed or {@link #cancel(boolean) cancel} the work 80 */ 81 protected Future<String> future; 82 /** Flag that defines whether the channel has processed all requests */ 83 protected final AtomicBoolean closed = new AtomicBoolean(false); 84 protected Throwable compositeError = null; 85 86 /** 87 * Create a new channel with the supplied channel name. 88 * 89 * @param sourceName the name of the repository source used to execute this channel's {@link #allRequests() requests}; may not 90 * be null or empty 91 */ 92 public CompositeRequestChannel( final String sourceName ) { 93 assert sourceName != null; 94 this.sourceName = sourceName; 95 this.composite = new ChannelCompositeRequest(); 96 } 97 98 /** 99 * Utility method to create an iterator over the requests in this channel. This really should be called once 100 * 101 * @return the iterator over the channels 102 */ 103 protected Iterator<Request> createIterator() { 104 final BlockingQueue<Request> queue = this.queue; 105 return new Iterator<Request>() { 106 private Request next; 107 108 public boolean hasNext() { 109 // If next still has a request, then 'hasNext()' has been called multiple times in a row 110 if (next != null) return true; 111 112 // Now, block for a next item (this blocks) ... 113 try { 114 next = queue.take(); 115 } catch (InterruptedException e) { 116 // This happens when the federated connector has been told to shutdown now, and it shuts down 117 // its executor (the worker pool) immediately by interrupting each in-use thread. 118 // In this case, we should consider there to be more more requests ... 119 try { 120 return false; 121 } finally { 122 // reset the interrupted status ... 123 Thread.interrupted(); 124 } 125 } 126 if (RequestType.LAST == next.getType()) { 127 return false; 128 } 129 return next != null; 130 } 131 132 public Request next() { 133 if (next == null) { 134 // Must have been called without first calling 'hasNext()' ... 135 try { 136 next = queue.take(); 137 } catch (InterruptedException e) { 138 // This happens when the federated connector has been told to shutdown now, and it shuts down 139 // its executor (the worker pool) immediately by interrupting each in-use thread. 140 // In this case, we should consider there to be more more requests (again, this case 141 // is when 'next()' has been called without calling 'hasNext()') ... 142 try { 143 throw new NoSuchElementException(); 144 } finally { 145 // reset the interrupted status ... 146 Thread.interrupted(); 147 } 148 } 149 } 150 if (next == null) { 151 throw new NoSuchElementException(); 152 } 153 Request result = next; 154 next = null; 155 return result; 156 } 157 158 public void remove() { 159 throw new UnsupportedOperationException(); 160 } 161 }; 162 } 163 164 /** 165 * Begins processing any requests that have been {@link #add(Request) added} to this channel. Processing is done by submitting 166 * the channel to the supplied executor. 167 * 168 * @param executor the executor that is to do the work; may not be null 169 * @param context the execution context in which the work is to be performed; may not be null 170 * @param connectionFactory the connection factory that should be used to create connections; may not be null 171 * @throws IllegalStateException if this channel has already been started 172 */ 173 public void start( final ExecutorService executor, 174 final ExecutionContext context, 175 final RepositoryConnectionFactory connectionFactory ) { 176 assert executor != null; 177 assert context != null; 178 assert connectionFactory != null; 179 assert sourceName != null; 180 if (this.future != null) { 181 throw new IllegalStateException(); 182 } 183 this.future = executor.submit(new Callable<String>() { 184 /** 185 * {@inheritDoc} 186 * 187 * @see java.util.concurrent.Callable#call() 188 */ 189 public String call() throws Exception { 190 final RepositoryConnection connection = connectionFactory.createConnection(sourceName); 191 assert connection != null; 192 try { 193 connection.execute(context, composite); 194 } finally { 195 connection.close(); 196 } 197 return sourceName; 198 } 199 }); 200 } 201 202 /** 203 * Begins processing any requests that have been {@link #add(Request) added} to this channel. Processing is done by submitting 204 * the channel to the supplied executor. 205 * 206 * @param executor the executor that is to do the work; may not be null 207 * @param processor the request processor that will be used to execute the requests; may not be null 208 * @param closeProcessorWhenCompleted true if this method should call {@link RequestProcessor#close()} when the channel is 209 * completed, or false if the caller is responsible for doing this 210 * @throws IllegalStateException if this channel has already been started 211 */ 212 public void start( final ExecutorService executor, 213 final RequestProcessor processor, 214 final boolean closeProcessorWhenCompleted ) { 215 assert executor != null; 216 assert processor != null; 217 if (this.future != null) { 218 throw new IllegalStateException(); 219 } 220 this.future = executor.submit(new Callable<String>() { 221 /** 222 * {@inheritDoc} 223 * 224 * @see java.util.concurrent.Callable#call() 225 */ 226 public String call() throws Exception { 227 try { 228 processor.process(composite); 229 } finally { 230 if (closeProcessorWhenCompleted) processor.close(); 231 } 232 return sourceName; 233 } 234 }); 235 } 236 237 /** 238 * Add the request to this channel for asynchronous processing. 239 * 240 * @param request the request to be submitted; may not be null 241 * @throws IllegalStateException if this channel has already been {@link #close() closed} 242 */ 243 public void add( Request request ) { 244 if (closed.get()) { 245 throw new IllegalStateException(GraphI18n.unableToAddRequestToChannelThatIsDone.text(sourceName, request)); 246 } 247 assert request != null; 248 this.allRequests.add(request); 249 this.queue.add(request); 250 } 251 252 /** 253 * Add the request to this channel for asynchronous processing, and supply a {@link CountDownLatch count-down latch} that 254 * should be {@link CountDownLatch#countDown() decremented} when this request is completed. 255 * 256 * @param request the request to be submitted; may not be null 257 * @param latch the count-down latch that should be decremented when <code>request</code> has been completed; may not be null 258 * @return the same latch that was supplied, for method chaining purposes; never null 259 * @throws IllegalStateException if this channel has already been {@link #close() closed} 260 */ 261 public CountDownLatch add( Request request, 262 CountDownLatch latch ) { 263 if (closed.get()) { 264 throw new IllegalStateException(GraphI18n.unableToAddRequestToChannelThatIsDone.text(sourceName, request)); 265 } 266 assert request != null; 267 assert latch != null; 268 // Submit the request for processing ... 269 this.allRequests.add(request); 270 request.setLatchForFreezing(latch); 271 this.queue.add(request); 272 return latch; 273 } 274 275 /** 276 * Add the request to this channel for processing, but wait to return until the request has been processed. 277 * 278 * @param request the request to be submitted; may not be null 279 * @throws InterruptedException if the current thread is interrupted while waiting 280 */ 281 public void addAndAwait( Request request ) throws InterruptedException { 282 // Add the request with a latch, then block until the request has completed ... 283 add(request, new CountDownLatch(1)).await(); 284 } 285 286 /** 287 * Mark this source as having no more requests to process. 288 */ 289 public void close() { 290 if (this.closed.compareAndSet(false, true)) { 291 this.queue.add(new LastRequest()); 292 } 293 } 294 295 /** 296 * Return whether this channel has been {@link #close() closed}. 297 * 298 * @return true if the channel was marked as done, or false otherwise 299 */ 300 public boolean isClosed() { 301 return closed.get(); 302 } 303 304 /** 305 * Cancel this forked channel, stopping work as soon as possible. If the channel has not yet been started, this method 306 * 307 * @param mayInterruptIfRunning true if the channel is still being worked on, and the thread on which its being worked on may 308 * be interrupted, or false if the channel should be allowed to finish if it is already in work. 309 */ 310 public void cancel( boolean mayInterruptIfRunning ) { 311 if (this.future == null || this.future.isDone() || this.future.isCancelled()) return; 312 313 // Mark the composite as cancelled first, so that the next composed request will be marked as 314 // cancelled. 315 this.composite.cancel(); 316 317 // Now mark the channel as being done ... 318 close(); 319 320 // Now, mark the channel as being cancelled (do allow interrupting the worker thread) ... 321 this.future.cancel(mayInterruptIfRunning); 322 } 323 324 /** 325 * Return whether this channel has been {@link #start(ExecutorService, ExecutionContext, RepositoryConnectionFactory) started} 326 * . 327 * 328 * @return true if this channel was started, or false otherwise 329 */ 330 public boolean isStarted() { 331 return this.future != null; 332 } 333 334 /** 335 * Return whether this channel has completed all of its work. 336 * 337 * @return true if the channel was started and is complete, or false otherwise 338 */ 339 public boolean isComplete() { 340 return this.future != null && this.future.isDone(); 341 } 342 343 /** 344 * Await until this channel has completed. 345 * 346 * @throws CancellationException if the channel was cancelled 347 * @throws ExecutionException if the channel execution threw an exception 348 * @throws InterruptedException if the current thread is interrupted while waiting 349 */ 350 public void await() throws ExecutionException, InterruptedException, CancellationException { 351 this.future.get(); 352 } 353 354 /** 355 * Get all the requests that were submitted to this queue. The resulting list is the actual list that is appended when 356 * requests are added, and may change until the channel is {@link #close() closed}. 357 * 358 * @return all of the requests that were submitted to this channel; never null 359 */ 360 public List<Request> allRequests() { 361 return allRequests; 362 } 363 364 /** 365 * Get the name of the source that this channel uses. 366 * 367 * @return the source name; never null 368 */ 369 public String sourceName() { 370 return sourceName; 371 } 372 373 protected class ChannelCompositeRequest extends CompositeRequest { 374 private static final long serialVersionUID = 1L; 375 private final LinkedList<Request> allRequests = CompositeRequestChannel.this.allRequests; 376 377 protected ChannelCompositeRequest() { 378 super(false); 379 } 380 381 /** 382 * {@inheritDoc} 383 * 384 * @see org.modeshape.graph.request.CompositeRequest#iterator() 385 */ 386 @Override 387 public Iterator<Request> iterator() { 388 return createIterator(); 389 } 390 391 /** 392 * {@inheritDoc} 393 * 394 * @see org.modeshape.graph.request.CompositeRequest#getRequests() 395 */ 396 @Override 397 public List<Request> getRequests() { 398 return allRequests; 399 } 400 401 /** 402 * {@inheritDoc} 403 * 404 * @see org.modeshape.graph.request.CompositeRequest#size() 405 */ 406 @Override 407 public int size() { 408 return closed.get() ? allRequests.size() : CompositeRequest.UNKNOWN_NUMBER_OF_REQUESTS; 409 } 410 411 /** 412 * {@inheritDoc} 413 * 414 * @see org.modeshape.graph.request.Request#cancel() 415 */ 416 @Override 417 public void cancel() { 418 closed.set(true); 419 } 420 421 /** 422 * {@inheritDoc} 423 * 424 * @see org.modeshape.graph.request.Request#setError(java.lang.Throwable) 425 */ 426 @Override 427 public void setError( Throwable error ) { 428 compositeError = error; 429 super.setError(error); 430 } 431 432 /** 433 * {@inheritDoc} 434 * 435 * @see org.modeshape.graph.request.Request#hasError() 436 */ 437 @Override 438 public boolean hasError() { 439 return compositeError != null || super.hasError(); 440 } 441 } 442 443 /** 444 * A psuedo Request that is used by the {@link CompositeRequestChannel} to insert into a request queue so that the queue's 445 * iterator knows when there are no more requests to process. 446 */ 447 protected static class LastRequest extends Request { 448 private static final long serialVersionUID = 1L; 449 450 @Override 451 public boolean isReadOnly() { 452 return false; 453 } 454 455 @Override 456 public RequestType getType() { 457 return RequestType.LAST; 458 } 459 460 } 461 }