View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.group;
17  
18  import static java.util.concurrent.TimeUnit.*;
19  
20  import java.util.ArrayList;
21  import java.util.Collection;
22  import java.util.Collections;
23  import java.util.Iterator;
24  import java.util.LinkedHashMap;
25  import java.util.List;
26  import java.util.Map;
27  import java.util.concurrent.TimeUnit;
28  
29  import org.jboss.netty.channel.Channel;
30  import org.jboss.netty.channel.ChannelFuture;
31  import org.jboss.netty.channel.ChannelFutureListener;
32  import org.jboss.netty.logging.InternalLogger;
33  import org.jboss.netty.logging.InternalLoggerFactory;
34  import org.jboss.netty.util.internal.DeadLockProofWorker;
35  
36  /**
37   * The default {@link ChannelGroupFuture} implementation.
38   *
39   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
40   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
41   *
42   * @version $Rev: 2191 $, $Date: 2010-02-19 18:18:10 +0900 (Fri, 19 Feb 2010) $
43   */
44  public class DefaultChannelGroupFuture implements ChannelGroupFuture {
45  
46      private static final InternalLogger logger =
47          InternalLoggerFactory.getInstance(DefaultChannelGroupFuture.class);
48  
49      private final ChannelGroup group;
50      final Map<Integer, ChannelFuture> futures;
51      private ChannelGroupFutureListener firstListener;
52      private List<ChannelGroupFutureListener> otherListeners;
53      private boolean done;
54      int successCount;
55      int failureCount;
56      private int waiters;
57  
58      private final ChannelFutureListener childListener = new ChannelFutureListener() {
59          public void operationComplete(ChannelFuture future) throws Exception {
60              boolean success = future.isSuccess();
61              boolean callSetDone = false;
62              synchronized (DefaultChannelGroupFuture.this) {
63                  if (success) {
64                      successCount ++;
65                  } else {
66                      failureCount ++;
67                  }
68  
69                  callSetDone = successCount + failureCount == futures.size();
70                  assert successCount + failureCount <= futures.size();
71              }
72  
73              if (callSetDone) {
74                  setDone();
75              }
76          }
77      };
78  
79      /**
80       * Creates a new instance.
81       */
82      public DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures) {
83          if (group == null) {
84              throw new NullPointerException("group");
85          }
86          if (futures == null) {
87              throw new NullPointerException("futures");
88          }
89  
90          this.group = group;
91  
92          Map<Integer, ChannelFuture> futureMap = new LinkedHashMap<Integer, ChannelFuture>();
93          for (ChannelFuture f: futures) {
94              futureMap.put(f.getChannel().getId(), f);
95          }
96  
97          this.futures = Collections.unmodifiableMap(futureMap);
98  
99          for (ChannelFuture f: this.futures.values()) {
100             f.addListener(childListener);
101         }
102 
103         // Done on arrival?
104         if (this.futures.isEmpty()) {
105             setDone();
106         }
107     }
108 
109     DefaultChannelGroupFuture(ChannelGroup group, Map<Integer, ChannelFuture> futures) {
110         this.group = group;
111         this.futures = Collections.unmodifiableMap(futures);
112         for (ChannelFuture f: this.futures.values()) {
113             f.addListener(childListener);
114         }
115 
116         // Done on arrival?
117         if (this.futures.isEmpty()) {
118             setDone();
119         }
120     }
121 
122     public ChannelGroup getGroup() {
123         return group;
124     }
125 
126     public ChannelFuture find(Integer channelId) {
127         return futures.get(channelId);
128     }
129 
130     public ChannelFuture find(Channel channel) {
131         return futures.get(channel.getId());
132     }
133 
134     public Iterator<ChannelFuture> iterator() {
135         return futures.values().iterator();
136     }
137 
138     public synchronized boolean isDone() {
139         return done;
140     }
141 
142     public synchronized boolean isCompleteSuccess() {
143         return successCount == futures.size();
144     }
145 
146     public synchronized boolean isPartialSuccess() {
147         return successCount != 0 && successCount != futures.size();
148     }
149 
150     public synchronized boolean isPartialFailure() {
151         return failureCount != 0 && failureCount != futures.size();
152     }
153 
154     public synchronized boolean isCompleteFailure() {
155         int futureCnt = futures.size();
156         return futureCnt != 0 && failureCount == futureCnt;
157     }
158 
159     public void addListener(ChannelGroupFutureListener listener) {
160         if (listener == null) {
161             throw new NullPointerException("listener");
162         }
163 
164         boolean notifyNow = false;
165         synchronized (this) {
166             if (done) {
167                 notifyNow = true;
168             } else {
169                 if (firstListener == null) {
170                     firstListener = listener;
171                 } else {
172                     if (otherListeners == null) {
173                         otherListeners = new ArrayList<ChannelGroupFutureListener>(1);
174                     }
175                     otherListeners.add(listener);
176                 }
177             }
178         }
179 
180         if (notifyNow) {
181             notifyListener(listener);
182         }
183     }
184 
185     public void removeListener(ChannelGroupFutureListener listener) {
186         if (listener == null) {
187             throw new NullPointerException("listener");
188         }
189 
190         synchronized (this) {
191             if (!done) {
192                 if (listener == firstListener) {
193                     if (otherListeners != null && !otherListeners.isEmpty()) {
194                         firstListener = otherListeners.remove(0);
195                     } else {
196                         firstListener = null;
197                     }
198                 } else if (otherListeners != null) {
199                     otherListeners.remove(listener);
200                 }
201             }
202         }
203     }
204 
205     public ChannelGroupFuture await() throws InterruptedException {
206         if (Thread.interrupted()) {
207             throw new InterruptedException();
208         }
209 
210         synchronized (this) {
211             while (!done) {
212                 checkDeadLock();
213                 waiters++;
214                 try {
215                     this.wait();
216                 } finally {
217                     waiters--;
218                 }
219             }
220         }
221         return this;
222     }
223 
224     public boolean await(long timeout, TimeUnit unit)
225             throws InterruptedException {
226         return await0(unit.toNanos(timeout), true);
227     }
228 
229     public boolean await(long timeoutMillis) throws InterruptedException {
230         return await0(MILLISECONDS.toNanos(timeoutMillis), true);
231     }
232 
233     public ChannelGroupFuture awaitUninterruptibly() {
234         boolean interrupted = false;
235         synchronized (this) {
236             while (!done) {
237                 checkDeadLock();
238                 waiters++;
239                 try {
240                     this.wait();
241                 } catch (InterruptedException e) {
242                     interrupted = true;
243                 } finally {
244                     waiters--;
245                 }
246             }
247         }
248 
249         if (interrupted) {
250             Thread.currentThread().interrupt();
251         }
252 
253         return this;
254     }
255 
256     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
257         try {
258             return await0(unit.toNanos(timeout), false);
259         } catch (InterruptedException e) {
260             throw new InternalError();
261         }
262     }
263 
264     public boolean awaitUninterruptibly(long timeoutMillis) {
265         try {
266             return await0(MILLISECONDS.toNanos(timeoutMillis), false);
267         } catch (InterruptedException e) {
268             throw new InternalError();
269         }
270     }
271 
272     private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
273         if (interruptable && Thread.interrupted()) {
274             throw new InterruptedException();
275         }
276 
277         long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
278         long waitTime = timeoutNanos;
279         boolean interrupted = false;
280 
281         try {
282             synchronized (this) {
283                 if (done) {
284                     return done;
285                 } else if (waitTime <= 0) {
286                     return done;
287                 }
288 
289                 checkDeadLock();
290                 waiters++;
291                 try {
292                     for (;;) {
293                         try {
294                             this.wait(waitTime / 1000000, (int) (waitTime % 1000000));
295                         } catch (InterruptedException e) {
296                             if (interruptable) {
297                                 throw e;
298                             } else {
299                                 interrupted = true;
300                             }
301                         }
302 
303                         if (done) {
304                             return true;
305                         } else {
306                             waitTime = timeoutNanos - (System.nanoTime() - startTime);
307                             if (waitTime <= 0) {
308                                 return done;
309                             }
310                         }
311                     }
312                 } finally {
313                     waiters--;
314                 }
315             }
316         } finally {
317             if (interrupted) {
318                 Thread.currentThread().interrupt();
319             }
320         }
321     }
322 
323     private void checkDeadLock() {
324         if (DeadLockProofWorker.PARENT.get() != null) {
325             throw new IllegalStateException(
326                     "await*() in I/O thread causes a dead lock or " +
327                     "sudden performance drop. Use addListener() instead or " +
328                     "call await*() from a different thread.");
329         }
330     }
331 
332     boolean setDone() {
333         synchronized (this) {
334             // Allow only once.
335             if (done) {
336                 return false;
337             }
338 
339             done = true;
340             if (waiters > 0) {
341                 notifyAll();
342             }
343         }
344 
345         notifyListeners();
346         return true;
347     }
348 
349     private void notifyListeners() {
350         // This method doesn't need synchronization because:
351         // 1) This method is always called after synchronized (this) block.
352         //    Hence any listener list modification happens-before this method.
353         // 2) This method is called only when 'done' is true.  Once 'done'
354         //    becomes true, the listener list is never modified - see add/removeListener()
355         if (firstListener != null) {
356             notifyListener(firstListener);
357             firstListener = null;
358 
359             if (otherListeners != null) {
360                 for (ChannelGroupFutureListener l: otherListeners) {
361                     notifyListener(l);
362                 }
363                 otherListeners = null;
364             }
365         }
366     }
367 
368     private void notifyListener(ChannelGroupFutureListener l) {
369         try {
370             l.operationComplete(this);
371         } catch (Throwable t) {
372             logger.warn(
373                     "An exception was thrown by " +
374                     ChannelFutureListener.class.getSimpleName() + ".", t);
375         }
376     }
377 }