View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.util;
17  
18  import java.util.Collections;
19  import java.util.IdentityHashMap;
20  import java.util.List;
21  import java.util.Set;
22  import java.util.concurrent.AbstractExecutorService;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.RejectedExecutionException;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.jboss.netty.channel.ChannelFactory;
29  import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
30  
31  
/**
 * A delegating {@link ExecutorService} with its own termination management.
 * <p>
 * {@link VirtualExecutorService} is used when you want to inject an
 * {@link ExecutorService} but you do not want to allow the explicit termination
 * of threads on shutdown request.  It is particularly useful when the
 * {@link ExecutorService} to inject is shared by different components and
 * the life cycle of the components depends on the termination of the injected
 * {@link ExecutorService}.
 *
 * <pre>
 * ExecutorService globalExecutor = ...;
 * ExecutorService virtualExecutor = new {@link VirtualExecutorService}(globalExecutor);
 *
 * {@link ChannelFactory} factory =
 *         new {@link NioServerSocketChannelFactory}(virtualExecutor, virtualExecutor);
 * ...
 *
 * // ChannelFactory.releaseExternalResources() shuts down the executor and
 * // interrupts the I/O threads to terminate all I/O tasks and to release all
 * // resources acquired by ChannelFactory.
 * factory.releaseExternalResources();
 *
 * // Note that globalExecutor is not shut down because VirtualExecutorService
 * // implements its own termination management. All threads which were acquired
 * // by ChannelFactory via VirtualExecutorService are returned to the pool.
 * assert !globalExecutor.isShutdown();
 * </pre>
 *
 * <h3>The differences from an ordinary {@link ExecutorService}</h3>
 *
 * A shutdown request ({@link #shutdown()} or {@link #shutdownNow()}) does not
 * shut down its parent {@link Executor} but simply sets its internal flag to
 * reject further execution requests.
 * <p>
 * {@link #shutdownNow()} interrupts only the threads which are executing
 * tasks submitted via {@link VirtualExecutorService}.
 * <p>
 * {@link #awaitTermination(long, TimeUnit)} does not wait for real thread
 * termination but waits, for at most the given timeout, until
 * {@link VirtualExecutorService} is shut down and its active tasks are
 * finished and the threads are returned to the parent {@link Executor}.
 * <p>
 * Only tasks that are currently running are tracked.  A task that has been
 * handed to the parent {@link Executor} but has not started running yet is
 * not counted as active, and it is still run by the parent even if this
 * service has been shut down in the meantime.
 *
 * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
 * @author <a href="http://gleamynode.net/">Trustin Lee</a>
 * @version $Rev: 2122 $, $Date: 2010-02-02 11:00:04 +0900 (Tue, 02 Feb 2010) $
 *
 * @apiviz.landmark
 */
public class VirtualExecutorService extends AbstractExecutorService {

    /** The executor that actually runs the tasks; it is never shut down by this class. */
    private final Executor parent;

    /**
     * Guards {@link #activeThreads} and the shutdown transition; it is also
     * the monitor on which {@link #awaitTermination(long, TimeUnit)} blocks.
     */
    final Object startStopLock = new Object();

    /**
     * Set to {@code true} by {@link #shutdown()}.  Volatile because
     * {@link #execute(Runnable)} reads it without holding the lock.
     */
    volatile boolean shutdown;

    /**
     * Threads that are currently running a task submitted through this
     * service.  Threads are compared by identity.
     */
    final Set<Thread> activeThreads =
            Collections.newSetFromMap(new IdentityHashMap<Thread, Boolean>());

    /**
     * Creates a new instance with the specified parent {@link Executor}.
     *
     * @param parent the executor that will run the submitted tasks
     * @throws NullPointerException if {@code parent} is {@code null}
     */
    public VirtualExecutorService(Executor parent) {
        if (parent == null) {
            throw new NullPointerException("parent");
        }
        this.parent = parent;
    }

    /**
     * Returns {@code true} if {@link #shutdown()} or {@link #shutdownNow()}
     * has been called on this service.
     */
    public boolean isShutdown() {
        synchronized (startStopLock) {
            return shutdown;
        }
    }

    /**
     * Returns {@code true} if this service has been shut down and no thread
     * is currently running one of its tasks.
     */
    public boolean isTerminated() {
        synchronized (startStopLock) {
            return shutdown && activeThreads.isEmpty();
        }
    }

    /**
     * Marks this service as shut down so that further
     * {@link #execute(Runnable)} calls are rejected.  The parent
     * {@link Executor} is left untouched.  Calling this method more than once
     * has no additional effect.
     */
    public void shutdown() {
        synchronized (startStopLock) {
            if (shutdown) {
                return;
            }
            shutdown = true;
            // Wake up threads blocked in awaitTermination(): if no task is
            // running, the service has just become terminated and nobody else
            // would notify them.  Waiters re-check the condition themselves.
            startStopLock.notifyAll();
        }
    }

    /**
     * Shuts this service down and interrupts every thread that is currently
     * running one of its tasks.  The parent {@link Executor} is left
     * untouched.
     *
     * @return always an empty list; tasks waiting in the parent executor are
     *         not tracked by this class
     */
    public List<Runnable> shutdownNow() {
        synchronized (startStopLock) {
            if (!isTerminated()) {
                shutdown();
                for (Thread t: activeThreads) {
                    t.interrupt();
                }
            }
        }

        return Collections.emptyList();
    }

    /**
     * Blocks until this service is terminated (see {@link #isTerminated()}),
     * the timeout elapses, or the calling thread is interrupted, whichever
     * happens first.
     *
     * @param timeout the maximum time to wait; a value of zero or less means
     *                "do not wait"
     * @param unit    the unit of {@code timeout}
     * @return {@code true} if this service terminated, {@code false} if the
     *         timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        long remainingNanos = unit.toNanos(timeout);
        // An absolute deadline keeps the total wait bounded even when the
        // monitor is notified (or wakes up spuriously) several times.
        final long deadline = System.nanoTime() + remainingNanos;

        synchronized (startStopLock) {
            while (!isTerminated()) {
                if (remainingNanos <= 0) {
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(startStopLock, remainingNanos);
                remainingNanos = deadline - System.nanoTime();
            }
            return true;
        }
    }

    /**
     * Hands the specified task over to the parent {@link Executor}, wrapped
     * so that the thread running it is tracked as active.
     *
     * @param command the task to run
     * @throws NullPointerException       if {@code command} is {@code null}
     * @throws RejectedExecutionException if this service has been shut down
     */
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException("command");
        }

        if (shutdown) {
            throw new RejectedExecutionException();
        }

        parent.execute(new ChildExecutorRunnable(command));
    }

    /**
     * Wraps a task so that the thread running it is registered in
     * {@link #activeThreads} for the duration of the task.
     */
    private class ChildExecutorRunnable implements Runnable {

        private final Runnable runnable;

        ChildExecutorRunnable(Runnable runnable) {
            this.runnable = runnable;
        }

        public void run() {
            Thread thread = Thread.currentThread();
            // False when this thread is already inside another task of this
            // service (e.g. the parent runs tasks in the calling thread).  In
            // that case the outermost task owns the registration and is the
            // one that removes it.
            boolean registered;
            synchronized (startStopLock) {
                registered = activeThreads.add(thread);
            }
            try {
                runnable.run();
            } finally {
                if (registered) {
                    synchronized (startStopLock) {
                        boolean removed = activeThreads.remove(thread);
                        assert removed;
                        if (isTerminated()) {
                            startStopLock.notifyAll();
                        }
                    }
                }
            }
        }
    }
}