1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel;
17
18 import static java.util.concurrent.TimeUnit.*;
19
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.List;
23 import java.util.concurrent.TimeUnit;
24
25 import org.jboss.netty.logging.InternalLogger;
26 import org.jboss.netty.logging.InternalLoggerFactory;
27 import org.jboss.netty.util.internal.DeadLockProofWorker;
28
29
30
31
32
33
34
35
36
37
38
39
40 public class DefaultChannelFuture implements ChannelFuture {
41
42 private static final InternalLogger logger =
43 InternalLoggerFactory.getInstance(DefaultChannelFuture.class);
44
45 private static final Throwable CANCELLED = new Throwable();
46
47 private static volatile boolean useDeadLockChecker = true;
48 private static boolean disabledDeadLockCheckerOnce;
49
50
51
52
53 public static boolean isUseDeadLockChecker() {
54 return useDeadLockChecker;
55 }
56
57
58
59
60
61 public static void setUseDeadLockChecker(boolean useDeadLockChecker) {
62 if (!useDeadLockChecker && !disabledDeadLockCheckerOnce) {
63 disabledDeadLockCheckerOnce = true;
64 logger.debug(
65 "The dead lock checker in " +
66 DefaultChannelFuture.class.getSimpleName() +
67 " has been disabled as requested at your own risk.");
68 }
69 DefaultChannelFuture.useDeadLockChecker = useDeadLockChecker;
70 }
71
72 private final Channel channel;
73 private final boolean cancellable;
74
75 private ChannelFutureListener firstListener;
76 private List<ChannelFutureListener> otherListeners;
77 private List<ChannelFutureProgressListener> progressListeners;
78 private boolean done;
79 private Throwable cause;
80 private int waiters;
81
82
83
84
85
86
87
88
89
90 public DefaultChannelFuture(Channel channel, boolean cancellable) {
91 this.channel = channel;
92 this.cancellable = cancellable;
93 }
94
95 public Channel getChannel() {
96 return channel;
97 }
98
99 public synchronized boolean isDone() {
100 return done;
101 }
102
103 public synchronized boolean isSuccess() {
104 return done && cause == null;
105 }
106
107 public synchronized Throwable getCause() {
108 if (cause != CANCELLED) {
109 return cause;
110 } else {
111 return null;
112 }
113 }
114
115 public synchronized boolean isCancelled() {
116 return cause == CANCELLED;
117 }
118
119 public void addListener(ChannelFutureListener listener) {
120 if (listener == null) {
121 throw new NullPointerException("listener");
122 }
123
124 boolean notifyNow = false;
125 synchronized (this) {
126 if (done) {
127 notifyNow = true;
128 } else {
129 if (firstListener == null) {
130 firstListener = listener;
131 } else {
132 if (otherListeners == null) {
133 otherListeners = new ArrayList<ChannelFutureListener>(1);
134 }
135 otherListeners.add(listener);
136 }
137
138 if (listener instanceof ChannelFutureProgressListener) {
139 if (progressListeners == null) {
140 progressListeners = new ArrayList<ChannelFutureProgressListener>(1);
141 }
142 progressListeners.add((ChannelFutureProgressListener) listener);
143 }
144 }
145 }
146
147 if (notifyNow) {
148 notifyListener(listener);
149 }
150 }
151
152 public void removeListener(ChannelFutureListener listener) {
153 if (listener == null) {
154 throw new NullPointerException("listener");
155 }
156
157 synchronized (this) {
158 if (!done) {
159 if (listener == firstListener) {
160 if (otherListeners != null && !otherListeners.isEmpty()) {
161 firstListener = otherListeners.remove(0);
162 } else {
163 firstListener = null;
164 }
165 } else if (otherListeners != null) {
166 otherListeners.remove(listener);
167 }
168
169 if (listener instanceof ChannelFutureProgressListener) {
170 progressListeners.remove(listener);
171 }
172 }
173 }
174 }
175
176 public ChannelFuture await() throws InterruptedException {
177 if (Thread.interrupted()) {
178 throw new InterruptedException();
179 }
180
181 synchronized (this) {
182 while (!done) {
183 checkDeadLock();
184 waiters++;
185 try {
186 this.wait();
187 } finally {
188 waiters--;
189 }
190 }
191 }
192 return this;
193 }
194
195 public boolean await(long timeout, TimeUnit unit)
196 throws InterruptedException {
197 return await0(unit.toNanos(timeout), true);
198 }
199
200 public boolean await(long timeoutMillis) throws InterruptedException {
201 return await0(MILLISECONDS.toNanos(timeoutMillis), true);
202 }
203
204 public ChannelFuture awaitUninterruptibly() {
205 boolean interrupted = false;
206 synchronized (this) {
207 while (!done) {
208 checkDeadLock();
209 waiters++;
210 try {
211 this.wait();
212 } catch (InterruptedException e) {
213 interrupted = true;
214 } finally {
215 waiters--;
216 }
217 }
218 }
219
220 if (interrupted) {
221 Thread.currentThread().interrupt();
222 }
223
224 return this;
225 }
226
227 public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
228 try {
229 return await0(unit.toNanos(timeout), false);
230 } catch (InterruptedException e) {
231 throw new InternalError();
232 }
233 }
234
235 public boolean awaitUninterruptibly(long timeoutMillis) {
236 try {
237 return await0(MILLISECONDS.toNanos(timeoutMillis), false);
238 } catch (InterruptedException e) {
239 throw new InternalError();
240 }
241 }
242
243 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
244 if (interruptable && Thread.interrupted()) {
245 throw new InterruptedException();
246 }
247
248 long startTime = timeoutNanos <= 0 ? 0 : System.nanoTime();
249 long waitTime = timeoutNanos;
250 boolean interrupted = false;
251
252 try {
253 synchronized (this) {
254 if (done) {
255 return done;
256 } else if (waitTime <= 0) {
257 return done;
258 }
259
260 checkDeadLock();
261 waiters++;
262 try {
263 for (;;) {
264 try {
265 this.wait(waitTime / 1000000, (int) (waitTime % 1000000));
266 } catch (InterruptedException e) {
267 if (interruptable) {
268 throw e;
269 } else {
270 interrupted = true;
271 }
272 }
273
274 if (done) {
275 return true;
276 } else {
277 waitTime = timeoutNanos - (System.nanoTime() - startTime);
278 if (waitTime <= 0) {
279 return done;
280 }
281 }
282 }
283 } finally {
284 waiters--;
285 }
286 }
287 } finally {
288 if (interrupted) {
289 Thread.currentThread().interrupt();
290 }
291 }
292 }
293
294 private void checkDeadLock() {
295 if (isUseDeadLockChecker() && DeadLockProofWorker.PARENT.get() != null) {
296 throw new IllegalStateException(
297 "await*() in I/O thread causes a dead lock or " +
298 "sudden performance drop. Use addListener() instead or " +
299 "call await*() from a different thread.");
300 }
301 }
302
303 public boolean setSuccess() {
304 synchronized (this) {
305
306 if (done) {
307 return false;
308 }
309
310 done = true;
311 if (waiters > 0) {
312 notifyAll();
313 }
314 }
315
316 notifyListeners();
317 return true;
318 }
319
320 public boolean setFailure(Throwable cause) {
321 synchronized (this) {
322
323 if (done) {
324 return false;
325 }
326
327 this.cause = cause;
328 done = true;
329 if (waiters > 0) {
330 notifyAll();
331 }
332 }
333
334 notifyListeners();
335 return true;
336 }
337
338 public boolean cancel() {
339 if (!cancellable) {
340 return false;
341 }
342
343 synchronized (this) {
344
345 if (done) {
346 return false;
347 }
348
349 cause = CANCELLED;
350 done = true;
351 if (waiters > 0) {
352 notifyAll();
353 }
354 }
355
356 notifyListeners();
357 return true;
358 }
359
360 private void notifyListeners() {
361
362
363
364
365
366 if (firstListener != null) {
367 notifyListener(firstListener);
368 firstListener = null;
369
370 if (otherListeners != null) {
371 for (ChannelFutureListener l: otherListeners) {
372 notifyListener(l);
373 }
374 otherListeners = null;
375 }
376 }
377 }
378
379 private void notifyListener(ChannelFutureListener l) {
380 try {
381 l.operationComplete(this);
382 } catch (Throwable t) {
383 logger.warn(
384 "An exception was thrown by " +
385 ChannelFutureListener.class.getSimpleName() + ".", t);
386 }
387 }
388
389 public boolean setProgress(long amount, long current, long total) {
390 ChannelFutureProgressListener[] plisteners;
391 synchronized (this) {
392
393 if (done) {
394 return false;
395 }
396
397 Collection<ChannelFutureProgressListener> progressListeners =
398 this.progressListeners;
399 if (progressListeners == null || progressListeners.isEmpty()) {
400
401 return true;
402 }
403
404 plisteners = progressListeners.toArray(
405 new ChannelFutureProgressListener[progressListeners.size()]);
406 }
407
408 for (ChannelFutureProgressListener pl: plisteners) {
409 notifyProgressListener(pl, amount, current, total);
410 }
411
412 return true;
413 }
414
415 private void notifyProgressListener(
416 ChannelFutureProgressListener l,
417 long amount, long current, long total) {
418
419 try {
420 l.operationProgressed(this, amount, current, total);
421 } catch (Throwable t) {
422 logger.warn(
423 "An exception was thrown by " +
424 ChannelFutureProgressListener.class.getSimpleName() + ".", t);
425 }
426 }
427 }