View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel;
17  
18  import static java.util.concurrent.TimeUnit.*;
19  
20  import java.util.ArrayList;
21  import java.util.Collection;
22  import java.util.List;
23  import java.util.concurrent.TimeUnit;
24  
25  import org.jboss.netty.logging.InternalLogger;
26  import org.jboss.netty.logging.InternalLoggerFactory;
27  import org.jboss.netty.util.internal.DeadLockProofWorker;
28  
29  /**
30   * The default {@link ChannelFuture} implementation.  It is recommended to
31   * use {@link Channels#future(Channel)} and {@link Channels#future(Channel, boolean)}
32   * to create a new {@link ChannelFuture} rather than calling the constructor
33   * explicitly.
34   *
35   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
36   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
37   *
38   * @version $Rev: 2201 $, $Date: 2010-02-23 14:45:53 +0900 (Tue, 23 Feb 2010) $
39   */
40  public class DefaultChannelFuture implements ChannelFuture {
41  
42      private static final InternalLogger logger =
43          InternalLoggerFactory.getInstance(DefaultChannelFuture.class);
44  
45      private static final Throwable CANCELLED = new Throwable();
46  
47      private static volatile boolean useDeadLockChecker = true;
48      private static boolean disabledDeadLockCheckerOnce;
49  
50      /**
51       * Returns {@code true} if and only if the dead lock checker is enabled.
52       */
53      public static boolean isUseDeadLockChecker() {
54          return useDeadLockChecker;
55      }
56  
57      /**
58       * Enables or disables the dead lock checker.  It is not recommended to
59       * disable the dead lock checker.  Disable it at your own risk!
60       */
61      public static void setUseDeadLockChecker(boolean useDeadLockChecker) {
62          if (!useDeadLockChecker && !disabledDeadLockCheckerOnce) {
63              disabledDeadLockCheckerOnce = true;
64              logger.debug(
65                      "The dead lock checker in " +
66                      DefaultChannelFuture.class.getSimpleName() +
67                      " has been disabled as requested at your own risk.");
68          }
69          DefaultChannelFuture.useDeadLockChecker = useDeadLockChecker;
70      }
71  
72      private final Channel channel;
73      private final boolean cancellable;
74  
75      private ChannelFutureListener firstListener;
76      private List<ChannelFutureListener> otherListeners;
77      private List<ChannelFutureProgressListener> progressListeners;
78      private boolean done;
79      private Throwable cause;
80      private int waiters;
81  
82      /**
83       * Creates a new instance.
84       *
85       * @param channel
86       *        the {@link Channel} associated with this future
87       * @param cancellable
88       *        {@code true} if and only if this future can be canceled
89       */
90      public DefaultChannelFuture(Channel channel, boolean cancellable) {
91          this.channel = channel;
92          this.cancellable = cancellable;
93      }
94  
95      public Channel getChannel() {
96          return channel;
97      }
98  
99      public synchronized boolean isDone() {
100         return done;
101     }
102 
103     public synchronized boolean isSuccess() {
104         return done && cause == null;
105     }
106 
107     public synchronized Throwable getCause() {
108         if (cause != CANCELLED) {
109             return cause;
110         } else {
111             return null;
112         }
113     }
114 
115     public synchronized boolean isCancelled() {
116         return cause == CANCELLED;
117     }
118 
119     public void addListener(ChannelFutureListener listener) {
120         if (listener == null) {
121             throw new NullPointerException("listener");
122         }
123 
124         boolean notifyNow = false;
125         synchronized (this) {
126             if (done) {
127                 notifyNow = true;
128             } else {
129                 if (firstListener == null) {
130                     firstListener = listener;
131                 } else {
132                     if (otherListeners == null) {
133                         otherListeners = new ArrayList<ChannelFutureListener>(1);
134                     }
135                     otherListeners.add(listener);
136                 }
137 
138                 if (listener instanceof ChannelFutureProgressListener) {
139                     if (progressListeners == null) {
140                         progressListeners = new ArrayList<ChannelFutureProgressListener>(1);
141                     }
142                     progressListeners.add((ChannelFutureProgressListener) listener);
143                 }
144             }
145         }
146 
147         if (notifyNow) {
148             notifyListener(listener);
149         }
150     }
151 
152     public void removeListener(ChannelFutureListener listener) {
153         if (listener == null) {
154             throw new NullPointerException("listener");
155         }
156 
157         synchronized (this) {
158             if (!done) {
159                 if (listener == firstListener) {
160                     if (otherListeners != null && !otherListeners.isEmpty()) {
161                         firstListener = otherListeners.remove(0);
162                     } else {
163                         firstListener = null;
164                     }
165                 } else if (otherListeners != null) {
166                     otherListeners.remove(listener);
167                 }
168 
169                 if (listener instanceof ChannelFutureProgressListener) {
170                     progressListeners.remove(listener);
171                 }
172             }
173         }
174     }
175 
176     public ChannelFuture await() throws InterruptedException {
177         if (Thread.interrupted()) {
178             throw new InterruptedException();
179         }
180 
181         synchronized (this) {
182             while (!done) {
183                 checkDeadLock();
184                 waiters++;
185                 try {
186                     this.wait();
187                 } finally {
188                     waiters--;
189                 }
190             }
191         }
192         return this;
193     }
194 
195     public boolean await(long timeout, TimeUnit unit)
196             throws InterruptedException {
197         return await0(unit.toNanos(timeout), true);
198     }
199 
200     public boolean await(long timeoutMillis) throws InterruptedException {
201         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
202     }
203 
204     public ChannelFuture awaitUninterruptibly() {
205         boolean interrupted = false;
206         synchronized (this) {
207             while (!done) {
208                 checkDeadLock();
209                 waiters++;
210                 try {
211                     this.wait();
212                 } catch (InterruptedException e) {
213                     interrupted = true;
214                 } finally {
215                     waiters--;
216                 }
217             }
218         }
219 
220         if (interrupted) {
221             Thread.currentThread().interrupt();
222         }
223 
224         return this;
225     }
226 
227     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
228         try {
229             return await0(unit.toNanos(timeout), false);
230         } catch (InterruptedException e) {
231             throw new InternalError();
232         }
233     }
234 
235     public boolean awaitUninterruptibly(long timeoutMillis) {
236         try {
237             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
238         } catch (InterruptedException e) {
239             throw new InternalError();
240         }
241     }
242 
243     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
244         if (interruptable && Thread.interrupted()) {
245             throw new InterruptedException();
246         }
247 
248         long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
249         long waitTime = timeoutNanos;
250         boolean interrupted = false;
251 
252         try {
253             synchronized (this) {
254                 if (done) {
255                     return done;
256                 } else if (waitTime <= 0) {
257                     return done;
258                 }
259 
260                 checkDeadLock();
261                 waiters++;
262                 try {
263                     for (;;) {
264                         try {
265                             this.wait(waitTime / 1000000, (int) (waitTime % 1000000));
266                         } catch (InterruptedException e) {
267                             if (interruptable) {
268                                 throw e;
269                             } else {
270                                 interrupted = true;
271                             }
272                         }
273 
274                         if (done) {
275                             return true;
276                         } else {
277                             waitTime = timeoutNanos - (System.nanoTime() - startTime);
278                             if (waitTime <= 0) {
279                                 return done;
280                             }
281                         }
282                     }
283                 } finally {
284                     waiters--;
285                 }
286             }
287         } finally {
288             if (interrupted) {
289                 Thread.currentThread().interrupt();
290             }
291         }
292     }
293 
294     private void checkDeadLock() {
295         if (isUseDeadLockChecker() && DeadLockProofWorker.PARENT.get() != null) {
296             throw new IllegalStateException(
297                     "await*() in I/O thread causes a dead lock or " +
298                     "sudden performance drop. Use addListener() instead or " +
299                     "call await*() from a different thread.");
300         }
301     }
302 
303     public boolean setSuccess() {
304         synchronized (this) {
305             // Allow only once.
306             if (done) {
307                 return false;
308             }
309 
310             done = true;
311             if (waiters > 0) {
312                 notifyAll();
313             }
314         }
315 
316         notifyListeners();
317         return true;
318     }
319 
320     public boolean setFailure(Throwable cause) {
321         synchronized (this) {
322             // Allow only once.
323             if (done) {
324                 return false;
325             }
326 
327             this.cause = cause;
328             done = true;
329             if (waiters > 0) {
330                 notifyAll();
331             }
332         }
333 
334         notifyListeners();
335         return true;
336     }
337 
338     public boolean cancel() {
339         if (!cancellable) {
340             return false;
341         }
342 
343         synchronized (this) {
344             // Allow only once.
345             if (done) {
346                 return false;
347             }
348 
349             cause = CANCELLED;
350             done = true;
351             if (waiters > 0) {
352                 notifyAll();
353             }
354         }
355 
356         notifyListeners();
357         return true;
358     }
359 
360     private void notifyListeners() {
361         // This method doesn't need synchronization because:
362         // 1) This method is always called after synchronized (this) block.
363         //    Hence any listener list modification happens-before this method.
364         // 2) This method is called only when 'done' is true.  Once 'done'
365         //    becomes true, the listener list is never modified - see add/removeListener()
366         if (firstListener != null) {
367             notifyListener(firstListener);
368             firstListener = null;
369 
370             if (otherListeners != null) {
371                 for (ChannelFutureListener l: otherListeners) {
372                     notifyListener(l);
373                 }
374                 otherListeners = null;
375             }
376         }
377     }
378 
379     private void notifyListener(ChannelFutureListener l) {
380         try {
381             l.operationComplete(this);
382         } catch (Throwable t) {
383             logger.warn(
384                     "An exception was thrown by " +
385                     ChannelFutureListener.class.getSimpleName() + ".", t);
386         }
387     }
388 
389     public boolean setProgress(long amount, long current, long total) {
390         ChannelFutureProgressListener[] plisteners;
391         synchronized (this) {
392             // Do not generate progress event after completion.
393             if (done) {
394                 return false;
395             }
396 
397             Collection<ChannelFutureProgressListener> progressListeners =
398                 this.progressListeners;
399             if (progressListeners == null || progressListeners.isEmpty()) {
400                 // Nothing to notify - no need to create an empty array.
401                 return true;
402             }
403 
404             plisteners = progressListeners.toArray(
405                     new ChannelFutureProgressListener[progressListeners.size()]);
406         }
407 
408         for (ChannelFutureProgressListener pl: plisteners) {
409             notifyProgressListener(pl, amount, current, total);
410         }
411 
412         return true;
413     }
414 
415     private void notifyProgressListener(
416             ChannelFutureProgressListener l,
417             long amount, long current, long total) {
418 
419         try {
420             l.operationProgressed(this, amount, current, total);
421         } catch (Throwable t) {
422             logger.warn(
423                     "An exception was thrown by " +
424                     ChannelFutureProgressListener.class.getSimpleName() + ".", t);
425         }
426     }
427 }