View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.util;
17  
18  import java.util.ArrayList;
19  import java.util.Collections;
20  import java.util.HashSet;
21  import java.util.List;
22  import java.util.Set;
23  import java.util.concurrent.Executors;
24  import java.util.concurrent.ThreadFactory;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.locks.ReadWriteLock;
29  import java.util.concurrent.locks.ReentrantReadWriteLock;
30  
31  import org.jboss.netty.channel.ChannelPipelineFactory;
32  import org.jboss.netty.logging.InternalLogger;
33  import org.jboss.netty.logging.InternalLoggerFactory;
34  import org.jboss.netty.util.internal.ConcurrentIdentityHashMap;
35  import org.jboss.netty.util.internal.ReusableIterator;
36  import org.jboss.netty.util.internal.SharedResourceMisuseDetector;
37  
38  /**
39   * A {@link Timer} optimized for approximated I/O timeout scheduling.
40   *
41   * <h3>Tick Duration</h3>
42   *
43   * As described with 'approximated', this timer does not execute the scheduled
44   * {@link TimerTask} on time.  {@link HashedWheelTimer}, on every tick, will
45   * check if there are any {@link TimerTask}s behind the schedule and execute
46   * them.
47   * <p>
48   * You can increase or decrease the accuracy of the execution timing by
49   * specifying smaller or larger tick duration in the constructor.  In most
50   * network applications, I/O timeout does not need to be accurate.  Therefore,
51   * the default tick duration is 100 milliseconds and you will not need to try
52   * different configurations in most cases.
53   *
54   * <h3>Ticks per Wheel (Wheel Size)</h3>
55   *
56   * {@link HashedWheelTimer} maintains a data structure called 'wheel'.
57   * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
58   * function is 'dead line of the task'.  The default number of ticks per wheel
59   * (i.e. the size of the wheel) is 512.  You could specify a larger value
60   * if you are going to schedule a lot of timeouts.
61   *
62   * <h3>Do not create many instances.</h3>
63   *
64   * {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
65   * started.  Therefore, you should make sure to create only one instance and
66   * share it across your application.  One of the common mistakes, that makes
67   * your application unresponsive, is to create a new instance in
68   * {@link ChannelPipelineFactory}, which results in the creation of a new thread
69   * for every connection.
70   *
71   * <h3>Implementation Details</h3>
72   *
73   * {@link HashedWheelTimer} is based on
74   * <a href="http://cseweb.ucsd.edu/users/varghese/">George Varghese</a> and
75   * Tony Lauck's paper,
76   * <a href="http://cseweb.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
77   * and Hierarchical Timing Wheels: data structures to efficiently implement a
78   * timer facility'</a>.  More comprehensive slides are located
79   * <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
80   *
81   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
82   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
83   * @version $Rev: 2297 $, $Date: 2010-06-07 10:50:02 +0900 (Mon, 07 Jun 2010) $
84   */
85  public class HashedWheelTimer implements Timer {
86  
87      static final InternalLogger logger =
88          InternalLoggerFactory.getInstance(HashedWheelTimer.class);
89      private static final AtomicInteger id = new AtomicInteger();
90  
91      private static final SharedResourceMisuseDetector misuseDetector =
92          new SharedResourceMisuseDetector(HashedWheelTimer.class);
93  
94      private final Worker worker = new Worker();
95      final Thread workerThread;
96      final AtomicBoolean shutdown = new AtomicBoolean();
97  
98      private final long roundDuration;
99      final long tickDuration;
100     final Set<HashedWheelTimeout>[] wheel;
101     final ReusableIterator<HashedWheelTimeout>[] iterators;
102     final int mask;
103     final ReadWriteLock lock = new ReentrantReadWriteLock();
104     volatile int wheelCursor;
105 
106     /**
107      * Creates a new timer with the default thread factory
108      * ({@link Executors#defaultThreadFactory()}), default tick duration, and
109      * default number of ticks per wheel.
110      */
111     public HashedWheelTimer() {
112         this(Executors.defaultThreadFactory());
113     }
114 
115     /**
116      * Creates a new timer with the default thread factory
117      * ({@link Executors#defaultThreadFactory()}) and default number of ticks
118      * per wheel.
119      *
120      * @param tickDuration   the duration between tick
121      * @param unit           the time unit of the {@code tickDuration}
122      */
123     public HashedWheelTimer(long tickDuration, TimeUnit unit) {
124         this(Executors.defaultThreadFactory(), tickDuration, unit);
125     }
126 
127     /**
128      * Creates a new timer with the default thread factory
129      * ({@link Executors#defaultThreadFactory()}).
130      *
131      * @param tickDuration   the duration between tick
132      * @param unit           the time unit of the {@code tickDuration}
133      * @param ticksPerWheel  the size of the wheel
134      */
135     public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
136         this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
137     }
138 
139     /**
140      * Creates a new timer with the default tick duration and default number of
141      * ticks per wheel.
142      *
143      * @param threadFactory  a {@link ThreadFactory} that creates a
144      *                       background {@link Thread} which is dedicated to
145      *                       {@link TimerTask} execution.
146      */
147     public HashedWheelTimer(ThreadFactory threadFactory) {
148         this(threadFactory, 100, TimeUnit.MILLISECONDS);
149     }
150 
151     /**
152      * Creates a new timer with the default number of ticks per wheel.
153      *
154      * @param threadFactory  a {@link ThreadFactory} that creates a
155      *                       background {@link Thread} which is dedicated to
156      *                       {@link TimerTask} execution.
157      * @param tickDuration   the duration between tick
158      * @param unit           the time unit of the {@code tickDuration}
159      */
160     public HashedWheelTimer(
161             ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
162         this(threadFactory, tickDuration, unit, 512);
163     }
164 
165     /**
166      * Creates a new timer.
167      *
168      * @param threadFactory  a {@link ThreadFactory} that creates a
169      *                       background {@link Thread} which is dedicated to
170      *                       {@link TimerTask} execution.
171      * @param tickDuration   the duration between tick
172      * @param unit           the time unit of the {@code tickDuration}
173      * @param ticksPerWheel  the size of the wheel
174      */
175     public HashedWheelTimer(
176             ThreadFactory threadFactory,
177             long tickDuration, TimeUnit unit, int ticksPerWheel) {
178 
179         if (threadFactory == null) {
180             throw new NullPointerException("threadFactory");
181         }
182         if (unit == null) {
183             throw new NullPointerException("unit");
184         }
185         if (tickDuration <= 0) {
186             throw new IllegalArgumentException(
187                     "tickDuration must be greater than 0: " + tickDuration);
188         }
189         if (ticksPerWheel <= 0) {
190             throw new IllegalArgumentException(
191                     "ticksPerWheel must be greater than 0: " + ticksPerWheel);
192         }
193 
194         // Normalize ticksPerWheel to power of two and initialize the wheel.
195         wheel = createWheel(ticksPerWheel);
196         iterators = createIterators(wheel);
197         mask = wheel.length - 1;
198 
199         // Convert tickDuration to milliseconds.
200         this.tickDuration = tickDuration = unit.toMillis(tickDuration);
201 
202         // Prevent overflow.
203         if (tickDuration == Long.MAX_VALUE ||
204                 tickDuration >= Long.MAX_VALUE / wheel.length) {
205             throw new IllegalArgumentException(
206                     "tickDuration is too long: " +
207                     tickDuration +  ' ' + unit);
208         }
209 
210         roundDuration = tickDuration * wheel.length;
211 
212         workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
213                         worker, "Hashed wheel timer #" + id.incrementAndGet()));
214 
215         // Misuse check
216         misuseDetector.increase();
217     }
218 
219     @SuppressWarnings("unchecked")
220     private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) {
221         if (ticksPerWheel <= 0) {
222             throw new IllegalArgumentException(
223                     "ticksPerWheel must be greater than 0: " + ticksPerWheel);
224         }
225         if (ticksPerWheel > 1073741824) {
226             throw new IllegalArgumentException(
227                     "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
228         }
229 
230         ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
231         Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
232         for (int i = 0; i < wheel.length; i ++) {
233             wheel[i] = new MapBackedSet<HashedWheelTimeout>(
234                     new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4));
235         }
236         return wheel;
237     }
238 
239     @SuppressWarnings("unchecked")
240     private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) {
241         ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length];
242         for (int i = 0; i < wheel.length; i ++) {
243             iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator();
244         }
245         return iterators;
246     }
247 
248     private static int normalizeTicksPerWheel(int ticksPerWheel) {
249         int normalizedTicksPerWheel = 1;
250         while (normalizedTicksPerWheel < ticksPerWheel) {
251             normalizedTicksPerWheel <<= 1;
252         }
253         return normalizedTicksPerWheel;
254     }
255 
256     /**
257      * Starts the background thread explicitly.  The background thread will
258      * start automatically on demand even if you did not call this method.
259      *
260      * @throws IllegalStateException if this timer has been
261      *                               {@linkplain #stop() stopped} already
262      */
263     public synchronized void start() {
264         if (shutdown.get()) {
265             throw new IllegalStateException("cannot be started once stopped");
266         }
267 
268         if (!workerThread.isAlive()) {
269             workerThread.start();
270         }
271     }
272 
273     public synchronized Set<Timeout> stop() {
274         if (Thread.currentThread() == workerThread) {
275             throw new IllegalStateException(
276                     HashedWheelTimer.class.getSimpleName() +
277                     ".stop() cannot be called from " +
278                     TimerTask.class.getSimpleName());
279         }
280 
281         if (!shutdown.compareAndSet(false, true)) {
282             return Collections.emptySet();
283         }
284 
285         boolean interrupted = false;
286         while (workerThread.isAlive()) {
287             workerThread.interrupt();
288             try {
289                 workerThread.join(100);
290             } catch (InterruptedException e) {
291                 interrupted = true;
292             }
293         }
294 
295         if (interrupted) {
296             Thread.currentThread().interrupt();
297         }
298 
299         misuseDetector.decrease();
300 
301         Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
302         for (Set<HashedWheelTimeout> bucket: wheel) {
303             unprocessedTimeouts.addAll(bucket);
304             bucket.clear();
305         }
306 
307         return Collections.unmodifiableSet(unprocessedTimeouts);
308     }
309 
310     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
311         final long currentTime = System.currentTimeMillis();
312 
313         if (task == null) {
314             throw new NullPointerException("task");
315         }
316         if (unit == null) {
317             throw new NullPointerException("unit");
318         }
319 
320         if (!workerThread.isAlive()) {
321             start();
322         }
323 
324         delay = unit.toMillis(delay);
325         HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay);
326         scheduleTimeout(timeout, delay);
327         return timeout;
328     }
329 
330     void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
331         // delay must be equal to or greater than tickDuration so that the
332         // worker thread never misses the timeout.
333         if (delay < tickDuration) {
334             delay = tickDuration;
335         }
336 
337         // Prepare the required parameters to schedule the timeout object.
338         final long lastRoundDelay = delay % roundDuration;
339         final long lastTickDelay = delay % tickDuration;
340         final long relativeIndex =
341             lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0);
342 
343         final long remainingRounds =
344             delay / roundDuration - (delay % roundDuration == 0? 1 : 0);
345 
346         // Add the timeout to the wheel.
347         lock.readLock().lock();
348         try {
349             int stopIndex = (int) (wheelCursor + relativeIndex & mask);
350             timeout.stopIndex = stopIndex;
351             timeout.remainingRounds = remainingRounds;
352 
353             wheel[stopIndex].add(timeout);
354         } finally {
355             lock.readLock().unlock();
356         }
357     }
358 
359     private final class Worker implements Runnable {
360 
361         private long startTime;
362         private long tick;
363 
364         Worker() {
365             super();
366         }
367 
368         public void run() {
369             List<HashedWheelTimeout> expiredTimeouts =
370                 new ArrayList<HashedWheelTimeout>();
371 
372             startTime = System.currentTimeMillis();
373             tick = 1;
374 
375             while (!shutdown.get()) {
376                 final long deadline = waitForNextTick();
377                 if (deadline > 0) {
378                     fetchExpiredTimeouts(expiredTimeouts, deadline);
379                     notifyExpiredTimeouts(expiredTimeouts);
380                 }
381             }
382         }
383 
384         private void fetchExpiredTimeouts(
385                 List<HashedWheelTimeout> expiredTimeouts, long deadline) {
386 
387             // Find the expired timeouts and decrease the round counter
388             // if necessary.  Note that we don't send the notification
389             // immediately to make sure the listeners are called without
390             // an exclusive lock.
391             lock.writeLock().lock();
392             try {
393                 int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
394                 ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor];
395                 fetchExpiredTimeouts(expiredTimeouts, i, deadline);
396             } finally {
397                 lock.writeLock().unlock();
398             }
399         }
400 
401         private void fetchExpiredTimeouts(
402                 List<HashedWheelTimeout> expiredTimeouts,
403                 ReusableIterator<HashedWheelTimeout> i, long deadline) {
404 
405             List<HashedWheelTimeout> slipped = null;
406             i.rewind();
407             while (i.hasNext()) {
408                 HashedWheelTimeout timeout = i.next();
409                 if (timeout.remainingRounds <= 0) {
410                     i.remove();
411                     if (timeout.deadline <= deadline) {
412                         expiredTimeouts.add(timeout);
413                     } else {
414                         // Handle the case where the timeout is put into a wrong
415                         // place, usually one tick earlier.  For now, just add
416                         // it to a temporary list - we will reschedule it in a
417                         // separate loop.
418                         if (slipped == null) {
419                             slipped = new ArrayList<HashedWheelTimer.HashedWheelTimeout>();
420                         }
421                         slipped.add(timeout);
422                     }
423                 } else {
424                     timeout.remainingRounds --;
425                 }
426             }
427 
428             // Reschedule the slipped timeouts.
429             if (slipped != null) {
430                 for (HashedWheelTimeout timeout: slipped) {
431                     scheduleTimeout(timeout, timeout.deadline - deadline);
432                 }
433             }
434         }
435 
436         private void notifyExpiredTimeouts(
437                 List<HashedWheelTimeout> expiredTimeouts) {
438             // Notify the expired timeouts.
439             for (int i = expiredTimeouts.size() - 1; i >= 0; i --) {
440                 expiredTimeouts.get(i).expire();
441             }
442 
443             // Clean up the temporary list.
444             expiredTimeouts.clear();
445         }
446 
447         private long waitForNextTick() {
448             long deadline = startTime + tickDuration * tick;
449 
450             for (;;) {
451                 final long currentTime = System.currentTimeMillis();
452                 final long sleepTime = tickDuration * tick - (currentTime - startTime);
453 
454                 if (sleepTime <= 0) {
455                     break;
456                 }
457 
458                 try {
459                     Thread.sleep(sleepTime);
460                 } catch (InterruptedException e) {
461                     if (shutdown.get()) {
462                         return -1;
463                     }
464                 }
465             }
466 
467             // Increase the tick.
468             tick ++;
469             return deadline;
470         }
471     }
472 
473     private final class HashedWheelTimeout implements Timeout {
474 
475         private static final int ST_INIT = 0;
476         private static final int ST_CANCELLED = 1;
477         private static final int ST_EXPIRED = 2;
478 
479         private final TimerTask task;
480         final long deadline;
481         volatile int stopIndex;
482         volatile long remainingRounds;
483         private final AtomicInteger state = new AtomicInteger(ST_INIT);
484 
485         HashedWheelTimeout(TimerTask task, long deadline) {
486             this.task = task;
487             this.deadline = deadline;
488         }
489 
490         public Timer getTimer() {
491             return HashedWheelTimer.this;
492         }
493 
494         public TimerTask getTask() {
495             return task;
496         }
497 
498         public void cancel() {
499             if (!state.compareAndSet(ST_INIT, ST_CANCELLED)) {
500                 // TODO return false
501                 return;
502             }
503             
504             wheel[stopIndex].remove(this);
505         }
506 
507         public boolean isCancelled() {
508             return state.get() == ST_CANCELLED;
509         }
510 
511         public boolean isExpired() {
512             return state.get() != ST_INIT;
513         }
514 
515         public void expire() {
516             if (!state.compareAndSet(ST_INIT, ST_EXPIRED)) {
517                 return;
518             }
519 
520             try {
521                 task.run(this);
522             } catch (Throwable t) {
523                 logger.warn(
524                         "An exception was thrown by " +
525                         TimerTask.class.getSimpleName() + ".", t);
526             }
527         }
528 
529         @Override
530         public String toString() {
531             long currentTime = System.currentTimeMillis();
532             long remaining = deadline - currentTime;
533 
534             StringBuilder buf = new StringBuilder(192);
535             buf.append(getClass().getSimpleName());
536             buf.append('(');
537 
538             buf.append("deadline: ");
539             if (remaining > 0) {
540                 buf.append(remaining);
541                 buf.append(" ms later, ");
542             } else if (remaining < 0) {
543                 buf.append(-remaining);
544                 buf.append(" ms ago, ");
545             } else {
546                 buf.append("now, ");
547             }
548 
549             if (isCancelled()) {
550                 buf.append (", cancelled");
551             }
552 
553             return buf.append(')').toString();
554         }
555     }
556 }