1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.util;
17
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.Set;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.ThreadFactory;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import java.util.concurrent.locks.ReadWriteLock;
29 import java.util.concurrent.locks.ReentrantReadWriteLock;
30
31 import org.jboss.netty.channel.ChannelPipelineFactory;
32 import org.jboss.netty.logging.InternalLogger;
33 import org.jboss.netty.logging.InternalLoggerFactory;
34 import org.jboss.netty.util.internal.ConcurrentIdentityHashMap;
35 import org.jboss.netty.util.internal.ReusableIterator;
36 import org.jboss.netty.util.internal.SharedResourceMisuseDetector;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85 public class HashedWheelTimer implements Timer {
86
87 static final InternalLogger logger =
88 InternalLoggerFactory.getInstance(HashedWheelTimer.class);
89 private static final AtomicInteger id = new AtomicInteger();
90
91 private static final SharedResourceMisuseDetector misuseDetector =
92 new SharedResourceMisuseDetector(HashedWheelTimer.class);
93
94 private final Worker worker = new Worker();
95 final Thread workerThread;
96 final AtomicBoolean shutdown = new AtomicBoolean();
97
98 private final long roundDuration;
99 final long tickDuration;
100 final Set<HashedWheelTimeout>[] wheel;
101 final ReusableIterator<HashedWheelTimeout>[] iterators;
102 final int mask;
103 final ReadWriteLock lock = new ReentrantReadWriteLock();
104 volatile int wheelCursor;
105
106
107
108
109
110
111 public HashedWheelTimer() {
112 this(Executors.defaultThreadFactory());
113 }
114
115
116
117
118
119
120
121
122
123 public HashedWheelTimer(long tickDuration, TimeUnit unit) {
124 this(Executors.defaultThreadFactory(), tickDuration, unit);
125 }
126
127
128
129
130
131
132
133
134
135 public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
136 this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
137 }
138
139
140
141
142
143
144
145
146
147 public HashedWheelTimer(ThreadFactory threadFactory) {
148 this(threadFactory, 100, TimeUnit.MILLISECONDS);
149 }
150
151
152
153
154
155
156
157
158
159
160 public HashedWheelTimer(
161 ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
162 this(threadFactory, tickDuration, unit, 512);
163 }
164
165
166
167
168
169
170
171
172
173
174
175 public HashedWheelTimer(
176 ThreadFactory threadFactory,
177 long tickDuration, TimeUnit unit, int ticksPerWheel) {
178
179 if (threadFactory == null) {
180 throw new NullPointerException("threadFactory");
181 }
182 if (unit == null) {
183 throw new NullPointerException("unit");
184 }
185 if (tickDuration <= 0) {
186 throw new IllegalArgumentException(
187 "tickDuration must be greater than 0: " + tickDuration);
188 }
189 if (ticksPerWheel <= 0) {
190 throw new IllegalArgumentException(
191 "ticksPerWheel must be greater than 0: " + ticksPerWheel);
192 }
193
194
195 wheel = createWheel(ticksPerWheel);
196 iterators = createIterators(wheel);
197 mask = wheel.length - 1;
198
199
200 this.tickDuration = tickDuration = unit.toMillis(tickDuration);
201
202
203 if (tickDuration == Long.MAX_VALUE ||
204 tickDuration >= Long.MAX_VALUE / wheel.length) {
205 throw new IllegalArgumentException(
206 "tickDuration is too long: " +
207 tickDuration + ' ' + unit);
208 }
209
210 roundDuration = tickDuration * wheel.length;
211
212 workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
213 worker, "Hashed wheel timer #" + id.incrementAndGet()));
214
215
216 misuseDetector.increase();
217 }
218
219 @SuppressWarnings("unchecked")
220 private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) {
221 if (ticksPerWheel <= 0) {
222 throw new IllegalArgumentException(
223 "ticksPerWheel must be greater than 0: " + ticksPerWheel);
224 }
225 if (ticksPerWheel > 1073741824) {
226 throw new IllegalArgumentException(
227 "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
228 }
229
230 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
231 Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
232 for (int i = 0; i < wheel.length; i ++) {
233 wheel[i] = new MapBackedSet<HashedWheelTimeout>(
234 new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4));
235 }
236 return wheel;
237 }
238
239 @SuppressWarnings("unchecked")
240 private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) {
241 ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length];
242 for (int i = 0; i < wheel.length; i ++) {
243 iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator();
244 }
245 return iterators;
246 }
247
248 private static int normalizeTicksPerWheel(int ticksPerWheel) {
249 int normalizedTicksPerWheel = 1;
250 while (normalizedTicksPerWheel < ticksPerWheel) {
251 normalizedTicksPerWheel <<= 1;
252 }
253 return normalizedTicksPerWheel;
254 }
255
256
257
258
259
260
261
262
263 public synchronized void start() {
264 if (shutdown.get()) {
265 throw new IllegalStateException("cannot be started once stopped");
266 }
267
268 if (!workerThread.isAlive()) {
269 workerThread.start();
270 }
271 }
272
273 public synchronized Set<Timeout> stop() {
274 if (Thread.currentThread() == workerThread) {
275 throw new IllegalStateException(
276 HashedWheelTimer.class.getSimpleName() +
277 ".stop() cannot be called from " +
278 TimerTask.class.getSimpleName());
279 }
280
281 if (!shutdown.compareAndSet(false, true)) {
282 return Collections.emptySet();
283 }
284
285 boolean interrupted = false;
286 while (workerThread.isAlive()) {
287 workerThread.interrupt();
288 try {
289 workerThread.join(100);
290 } catch (InterruptedException e) {
291 interrupted = true;
292 }
293 }
294
295 if (interrupted) {
296 Thread.currentThread().interrupt();
297 }
298
299 misuseDetector.decrease();
300
301 Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
302 for (Set<HashedWheelTimeout> bucket: wheel) {
303 unprocessedTimeouts.addAll(bucket);
304 bucket.clear();
305 }
306
307 return Collections.unmodifiableSet(unprocessedTimeouts);
308 }
309
310 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
311 final long currentTime = System.currentTimeMillis();
312
313 if (task == null) {
314 throw new NullPointerException("task");
315 }
316 if (unit == null) {
317 throw new NullPointerException("unit");
318 }
319
320 if (!workerThread.isAlive()) {
321 start();
322 }
323
324 delay = unit.toMillis(delay);
325 HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay);
326 scheduleTimeout(timeout, delay);
327 return timeout;
328 }
329
330 void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
331
332
333 if (delay < tickDuration) {
334 delay = tickDuration;
335 }
336
337
338 final long lastRoundDelay = delay % roundDuration;
339 final long lastTickDelay = delay % tickDuration;
340 final long relativeIndex =
341 lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0);
342
343 final long remainingRounds =
344 delay / roundDuration - (delay % roundDuration == 0? 1 : 0);
345
346
347 lock.readLock().lock();
348 try {
349 int stopIndex = (int) (wheelCursor + relativeIndex & mask);
350 timeout.stopIndex = stopIndex;
351 timeout.remainingRounds = remainingRounds;
352
353 wheel[stopIndex].add(timeout);
354 } finally {
355 lock.readLock().unlock();
356 }
357 }
358
359 private final class Worker implements Runnable {
360
361 private long startTime;
362 private long tick;
363
364 Worker() {
365 super();
366 }
367
368 public void run() {
369 List<HashedWheelTimeout> expiredTimeouts =
370 new ArrayList<HashedWheelTimeout>();
371
372 startTime = System.currentTimeMillis();
373 tick = 1;
374
375 while (!shutdown.get()) {
376 final long deadline = waitForNextTick();
377 if (deadline > 0) {
378 fetchExpiredTimeouts(expiredTimeouts, deadline);
379 notifyExpiredTimeouts(expiredTimeouts);
380 }
381 }
382 }
383
384 private void fetchExpiredTimeouts(
385 List<HashedWheelTimeout> expiredTimeouts, long deadline) {
386
387
388
389
390
391 lock.writeLock().lock();
392 try {
393 int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
394 ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor];
395 fetchExpiredTimeouts(expiredTimeouts, i, deadline);
396 } finally {
397 lock.writeLock().unlock();
398 }
399 }
400
401 private void fetchExpiredTimeouts(
402 List<HashedWheelTimeout> expiredTimeouts,
403 ReusableIterator<HashedWheelTimeout> i, long deadline) {
404
405 List<HashedWheelTimeout> slipped = null;
406 i.rewind();
407 while (i.hasNext()) {
408 HashedWheelTimeout timeout = i.next();
409 if (timeout.remainingRounds <= 0) {
410 i.remove();
411 if (timeout.deadline <= deadline) {
412 expiredTimeouts.add(timeout);
413 } else {
414
415
416
417
418 if (slipped == null) {
419 slipped = new ArrayList<HashedWheelTimer.HashedWheelTimeout>();
420 }
421 slipped.add(timeout);
422 }
423 } else {
424 timeout.remainingRounds --;
425 }
426 }
427
428
429 if (slipped != null) {
430 for (HashedWheelTimeout timeout: slipped) {
431 scheduleTimeout(timeout, timeout.deadline - deadline);
432 }
433 }
434 }
435
436 private void notifyExpiredTimeouts(
437 List<HashedWheelTimeout> expiredTimeouts) {
438
439 for (int i = expiredTimeouts.size() - 1; i >= 0; i --) {
440 expiredTimeouts.get(i).expire();
441 }
442
443
444 expiredTimeouts.clear();
445 }
446
447 private long waitForNextTick() {
448 long deadline = startTime + tickDuration * tick;
449
450 for (;;) {
451 final long currentTime = System.currentTimeMillis();
452 final long sleepTime = tickDuration * tick - (currentTime - startTime);
453
454 if (sleepTime <= 0) {
455 break;
456 }
457
458 try {
459 Thread.sleep(sleepTime);
460 } catch (InterruptedException e) {
461 if (shutdown.get()) {
462 return -1;
463 }
464 }
465 }
466
467
468 tick ++;
469 return deadline;
470 }
471 }
472
473 private final class HashedWheelTimeout implements Timeout {
474
475 private static final int ST_INIT = 0;
476 private static final int ST_CANCELLED = 1;
477 private static final int ST_EXPIRED = 2;
478
479 private final TimerTask task;
480 final long deadline;
481 volatile int stopIndex;
482 volatile long remainingRounds;
483 private final AtomicInteger state = new AtomicInteger(ST_INIT);
484
485 HashedWheelTimeout(TimerTask task, long deadline) {
486 this.task = task;
487 this.deadline = deadline;
488 }
489
490 public Timer getTimer() {
491 return HashedWheelTimer.this;
492 }
493
494 public TimerTask getTask() {
495 return task;
496 }
497
498 public void cancel() {
499 if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) {
500
501 return;
502 }
503
504 wheel[stopIndex].remove(this);
505 }
506
507 public boolean isCancelled() {
508 return state.get() == ST_CANCELLED;
509 }
510
511 public boolean isExpired() {
512 return state.get() != ST_INIT;
513 }
514
515 public void expire() {
516 if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) {
517 return;
518 }
519
520 try {
521 task.run(this);
522 } catch (Throwable t) {
523 logger.warn(
524 "An exception was thrown by " +
525 TimerTask.class.getSimpleName() + ".", t);
526 }
527 }
528
529 @Override
530 public String toString() {
531 long currentTime = System.currentTimeMillis();
532 long remaining = deadline - currentTime;
533
534 StringBuilder buf = new StringBuilder(192);
535 buf.append(getClass().getSimpleName());
536 buf.append('(');
537
538 buf.append("deadline: ");
539 if (remaining > 0) {
540 buf.append(remaining);
541 buf.append(" ms later, ");
542 } else if (remaining < 0) {
543 buf.append(-remaining);
544 buf.append(" ms ago, ");
545 } else {
546 buf.append("now, ");
547 }
548
549 if (isCancelled()) {
550 buf.append (", cancelled");
551 }
552
553 return buf.append(')').toString();
554 }
555 }
556 }