1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.group;
17
18 import static java.util.concurrent.TimeUnit.*;
19
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Collections;
23 import java.util.Iterator;
24 import java.util.LinkedHashMap;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.TimeUnit;
28
29 import org.jboss.netty.channel.Channel;
30 import org.jboss.netty.channel.ChannelFuture;
31 import org.jboss.netty.channel.ChannelFutureListener;
32 import org.jboss.netty.logging.InternalLogger;
33 import org.jboss.netty.logging.InternalLoggerFactory;
34 import org.jboss.netty.util.internal.DeadLockProofWorker;
35
36
37
38
39
40
41
42
43
44 public class DefaultChannelGroupFuture implements ChannelGroupFuture {
45
46 private static final InternalLogger logger =
47 InternalLoggerFactory.getInstance(DefaultChannelGroupFuture.class);
48
49 private final ChannelGroup group;
50 final Map<Integer, ChannelFuture> futures;
51 private ChannelGroupFutureListener firstListener;
52 private List<ChannelGroupFutureListener> otherListeners;
53 private boolean done;
54 int successCount;
55 int failureCount;
56 private int waiters;
57
58 private final ChannelFutureListener childListener = new ChannelFutureListener() {
59 public void operationComplete(ChannelFuture future) throws Exception {
60 boolean success = future.isSuccess();
61 boolean callSetDone = false;
62 synchronized (DefaultChannelGroupFuture.this) {
63 if (success) {
64 successCount ++;
65 } else {
66 failureCount ++;
67 }
68
69 callSetDone = successCount + failureCount == futures.size();
70 assert successCount + failureCount <= futures.size();
71 }
72
73 if (callSetDone) {
74 setDone();
75 }
76 }
77 };
78
79
80
81
82 public DefaultChannelGroupFuture(ChannelGroup group, Collection<ChannelFuture> futures) {
83 if (group == null) {
84 throw new NullPointerException("group");
85 }
86 if (futures == null) {
87 throw new NullPointerException("futures");
88 }
89
90 this.group = group;
91
92 Map<Integer, ChannelFuture> futureMap = new LinkedHashMap<Integer, ChannelFuture>();
93 for (ChannelFuture f: futures) {
94 futureMap.put(f.getChannel().getId(), f);
95 }
96
97 this.futures = Collections.unmodifiableMap(futureMap);
98
99 for (ChannelFuture f: this.futures.values()) {
100 f.addListener(childListener);
101 }
102
103
104 if (this.futures.isEmpty()) {
105 setDone();
106 }
107 }
108
109 DefaultChannelGroupFuture(ChannelGroup group, Map<Integer, ChannelFuture> futures) {
110 this.group = group;
111 this.futures = Collections.unmodifiableMap(futures);
112 for (ChannelFuture f: this.futures.values()) {
113 f.addListener(childListener);
114 }
115
116
117 if (this.futures.isEmpty()) {
118 setDone();
119 }
120 }
121
122 public ChannelGroup getGroup() {
123 return group;
124 }
125
126 public ChannelFuture find(Integer channelId) {
127 return futures.get(channelId);
128 }
129
130 public ChannelFuture find(Channel channel) {
131 return futures.get(channel.getId());
132 }
133
134 public Iterator<ChannelFuture> iterator() {
135 return futures.values().iterator();
136 }
137
138 public synchronized boolean isDone() {
139 return done;
140 }
141
142 public synchronized boolean isCompleteSuccess() {
143 return successCount == futures.size();
144 }
145
146 public synchronized boolean isPartialSuccess() {
147 return successCount != 0 && successCount != futures.size();
148 }
149
150 public synchronized boolean isPartialFailure() {
151 return failureCount != 0 && failureCount != futures.size();
152 }
153
154 public synchronized boolean isCompleteFailure() {
155 int futureCnt = futures.size();
156 return futureCnt != 0 && failureCount == futureCnt;
157 }
158
159 public void addListener(ChannelGroupFutureListener listener) {
160 if (listener == null) {
161 throw new NullPointerException("listener");
162 }
163
164 boolean notifyNow = false;
165 synchronized (this) {
166 if (done) {
167 notifyNow = true;
168 } else {
169 if (firstListener == null) {
170 firstListener = listener;
171 } else {
172 if (otherListeners == null) {
173 otherListeners = new ArrayList<ChannelGroupFutureListener>(1);
174 }
175 otherListeners.add(listener);
176 }
177 }
178 }
179
180 if (notifyNow) {
181 notifyListener(listener);
182 }
183 }
184
185 public void removeListener(ChannelGroupFutureListener listener) {
186 if (listener == null) {
187 throw new NullPointerException("listener");
188 }
189
190 synchronized (this) {
191 if (!done) {
192 if (listener == firstListener) {
193 if (otherListeners != null && !otherListeners.isEmpty()) {
194 firstListener = otherListeners.remove(0);
195 } else {
196 firstListener = null;
197 }
198 } else if (otherListeners != null) {
199 otherListeners.remove(listener);
200 }
201 }
202 }
203 }
204
205 public ChannelGroupFuture await() throws InterruptedException {
206 if (Thread.interrupted()) {
207 throw new InterruptedException();
208 }
209
210 synchronized (this) {
211 while (!done) {
212 checkDeadLock();
213 waiters++;
214 try {
215 this.wait();
216 } finally {
217 waiters--;
218 }
219 }
220 }
221 return this;
222 }
223
224 public boolean await(long timeout, TimeUnit unit)
225 throws InterruptedException {
226 return await0(unit.toNanos(timeout), true);
227 }
228
229 public boolean await(long timeoutMillis) throws InterruptedException {
230 return await0(MILLISECONDS.toNanos(timeoutMillis), true);
231 }
232
233 public ChannelGroupFuture awaitUninterruptibly() {
234 boolean interrupted = false;
235 synchronized (this) {
236 while (!done) {
237 checkDeadLock();
238 waiters++;
239 try {
240 this.wait();
241 } catch (InterruptedException e) {
242 interrupted = true;
243 } finally {
244 waiters--;
245 }
246 }
247 }
248
249 if (interrupted) {
250 Thread.currentThread().interrupt();
251 }
252
253 return this;
254 }
255
256 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
257 try {
258 return await0(unit.toNanos(timeout), false);
259 } catch (InterruptedException e) {
260 throw new InternalError();
261 }
262 }
263
264 public boolean awaitUninterruptibly(long timeoutMillis) {
265 try {
266 return await0(MILLISECONDS.toNanos(timeoutMillis), false);
267 } catch (InterruptedException e) {
268 throw new InternalError();
269 }
270 }
271
272 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
273 if (interruptable && Thread.interrupted()) {
274 throw new InterruptedException();
275 }
276
277 long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
278 long waitTime = timeoutNanos;
279 boolean interrupted = false;
280
281 try {
282 synchronized (this) {
283 if (done) {
284 return done;
285 } else if (waitTime <= 0) {
286 return done;
287 }
288
289 checkDeadLock();
290 waiters++;
291 try {
292 for (;;) {
293 try {
294 this.wait(waitTime / 1000000, (int) (waitTime % 1000000));
295 } catch (InterruptedException e) {
296 if (interruptable) {
297 throw e;
298 } else {
299 interrupted = true;
300 }
301 }
302
303 if (done) {
304 return true;
305 } else {
306 waitTime = timeoutNanos - (System.nanoTime() - startTime);
307 if (waitTime <= 0) {
308 return done;
309 }
310 }
311 }
312 } finally {
313 waiters--;
314 }
315 }
316 } finally {
317 if (interrupted) {
318 Thread.currentThread().interrupt();
319 }
320 }
321 }
322
323 private void checkDeadLock() {
324 if (DeadLockProofWorker.PARENT.get() != null) {
325 throw new IllegalStateException(
326 "await*() in I/O thread causes a dead lock or " +
327 "sudden performance drop. Use addListener() instead or " +
328 "call await*() from a different thread.");
329 }
330 }
331
332 boolean setDone() {
333 synchronized (this) {
334
335 if (done) {
336 return false;
337 }
338
339 done = true;
340 if (waiters > 0) {
341 notifyAll();
342 }
343 }
344
345 notifyListeners();
346 return true;
347 }
348
349 private void notifyListeners() {
350
351
352
353
354
355 if (firstListener != null) {
356 notifyListener(firstListener);
357 firstListener = null;
358
359 if (otherListeners != null) {
360 for (ChannelGroupFutureListener l: otherListeners) {
361 notifyListener(l);
362 }
363 otherListeners = null;
364 }
365 }
366 }
367
368 private void notifyListener(ChannelGroupFutureListener l) {
369 try {
370 l.operationComplete(this);
371 } catch (Throwable t) {
372 logger.warn(
373 "An exception was thrown by " +
374 ChannelFutureListener.class.getSimpleName() + ".", t);
375 }
376 }
377 }