View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.queue;
17  
18  import java.io.IOException;
19  import java.util.concurrent.BlockingQueue;
20  import java.util.concurrent.TimeUnit;
21  
22  import org.jboss.netty.buffer.ChannelBuffer;
23  import org.jboss.netty.channel.Channel;
24  import org.jboss.netty.channel.ChannelEvent;
25  import org.jboss.netty.channel.ChannelHandlerContext;
26  import org.jboss.netty.channel.ChannelPipeline;
27  import org.jboss.netty.channel.ChannelStateEvent;
28  import org.jboss.netty.channel.ExceptionEvent;
29  import org.jboss.netty.channel.MessageEvent;
30  import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
31  import org.jboss.netty.util.internal.DeadLockProofWorker;
32  import org.jboss.netty.util.internal.LinkedTransferQueue;
33  
34  /**
35   * Emulates blocking read operation.  This handler stores all received messages
36   * into a {@link BlockingQueue} and returns the received messages when
37   * {@link #read()}, {@link #read(long, TimeUnit)}, {@link #readEvent()}, or
38   * {@link #readEvent(long, TimeUnit)} method is called.
39   * <p>
40   * Please note that this handler is only useful for the cases where there are
41   * very small number of connections, such as testing and simple client-side
42   * application development.
43   * <p>
44   * Also, any handler placed after this handler will never receive
45   * {@code messageReceived}, {@code exceptionCaught}, and {@code channelClosed}
46   * events, hence it should be placed in the last place in a pipeline.
47   * <p>
48   * Here is an example that demonstrates the usage:
49   * <pre>
50   * {@link BlockingReadHandler}&lt;{@link ChannelBuffer}&gt; reader =
51   *         new {@link BlockingReadHandler}&lt;{@link ChannelBuffer}&gt;();
52   * {@link ChannelPipeline} p = ...;
53   * p.addLast("reader", reader);
54   *
55   * ...
56   *
57   * // Read a message from a channel in a blocking manner.
58   * try {
59   *     {@link ChannelBuffer} buf = reader.read(60, TimeUnit.SECONDS);
60   *     if (buf == null) {
61   *         // Connection closed.
62   *     } else {
63   *         // Handle the received message here.
64   *     }
65   * } catch ({@link BlockingReadTimeoutException} e) {
66   *     // Read timed out.
67   * } catch (IOException e) {
68   *     // Other read errors
69   * }
70   * </pre>
71   *
72   * @param <E> the type of the received messages
73   *
74   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
75   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
76   * @version $Rev: 2122 $, $Date: 2010-02-02 11:00:04 +0900 (Tue, 02 Feb 2010) $
77   */
78  public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler {
79  
80      private final BlockingQueue<ChannelEvent> queue;
81      private volatile boolean closed;
82  
83      /**
84       * Creates a new instance with the default unbounded {@link BlockingQueue}
85       * implementation.
86       */
87      public BlockingReadHandler() {
88          this(new LinkedTransferQueue<ChannelEvent>());
89      }
90  
91      /**
92       * Creates a new instance with the specified {@link BlockingQueue}.
93       */
94      public BlockingReadHandler(BlockingQueue<ChannelEvent> queue) {
95          if (queue == null) {
96              throw new NullPointerException("queue");
97          }
98          this.queue = queue;
99      }
100 
101     /**
102      * Returns the queue which stores the received messages.  The default
103      * implementation returns the queue which was specified in the constructor.
104      */
105     protected BlockingQueue<ChannelEvent> getQueue() {
106         return queue;
107     }
108 
109     /**
110      * Returns {@code true} if and only if the {@link Channel} associated with
111      * this handler has been closed.
112      *
113      * @throws IllegalStateException
114      *         if this handler was not added to a {@link ChannelPipeline} yet
115      */
116     public boolean isClosed() {
117         return closed;
118     }
119 
120     /**
121      * Waits until a new message is received or the associated {@link Channel}
122      * is closed.
123      *
124      * @return the received message or {@code null} if the associated
125      *         {@link Channel} has been closed
126      * @throws IOException
127      *         if failed to receive a new message
128      * @throws InterruptedException
129      *         if the operation has been interrupted
130      */
131     public E read() throws IOException, InterruptedException {
132         ChannelEvent e = readEvent();
133         if (e == null) {
134             return null;
135         }
136 
137         if (e instanceof MessageEvent) {
138             return getMessage((MessageEvent) e);
139         } else if (e instanceof ExceptionEvent) {
140             throw (IOException) new IOException().initCause(((ExceptionEvent) e).getCause());
141         } else {
142             throw new IllegalStateException();
143         }
144     }
145 
146     /**
147      * Waits until a new message is received or the associated {@link Channel}
148      * is closed.
149      *
150      * @param timeout
151      *        the amount time to wait until a new message is received.
152      *        If no message is received within the timeout,
153      *        {@link BlockingReadTimeoutException} is thrown.
154      * @param unit
155      *        the unit of {@code timeout}
156      *
157      * @return the received message or {@code null} if the associated
158      *         {@link Channel} has been closed
159      * @throws BlockingReadTimeoutException
160      *         if no message was received within the specified timeout
161      * @throws IOException
162      *         if failed to receive a new message
163      * @throws InterruptedException
164      *         if the operation has been interrupted
165      */
166     public E read(long timeout, TimeUnit unit) throws IOException, InterruptedException {
167         ChannelEvent e = readEvent(timeout, unit);
168         if (e == null) {
169             return null;
170         }
171 
172         if (e instanceof MessageEvent) {
173             return getMessage((MessageEvent) e);
174         } else if (e instanceof ExceptionEvent) {
175             throw (IOException) new IOException().initCause(((ExceptionEvent) e).getCause());
176         } else {
177             throw new IllegalStateException();
178         }
179     }
180 
181     /**
182      * Waits until a new {@link ChannelEvent} is received or the associated
183      * {@link Channel} is closed.
184      *
185      * @return a {@link MessageEvent} or an {@link ExceptionEvent}.
186      *         {@code null} if the associated {@link Channel} has been closed
187      * @throws InterruptedException
188      *         if the operation has been interrupted
189      */
190     public ChannelEvent readEvent() throws InterruptedException {
191         detectDeadLock();
192         if (isClosed()) {
193             if (getQueue().isEmpty()) {
194                 return null;
195             }
196         }
197 
198         ChannelEvent e = getQueue().take();
199         if (e instanceof ChannelStateEvent) {
200             // channelClosed has been triggered.
201             assert closed;
202             return null;
203         } else {
204             return e;
205         }
206     }
207 
208     /**
209      * Waits until a new {@link ChannelEvent} is received or the associated
210      * {@link Channel} is closed.
211      *
212      * @param timeout
213      *        the amount time to wait until a new {@link ChannelEvent} is
214      *        received.  If no message is received within the timeout,
215      *        {@link BlockingReadTimeoutException} is thrown.
216      * @param unit
217      *        the unit of {@code timeout}
218      *
219      * @return a {@link MessageEvent} or an {@link ExceptionEvent}.
220      *         {@code null} if the associated {@link Channel} has been closed
221      * @throws BlockingReadTimeoutException
222      *         if no event was received within the specified timeout
223      * @throws InterruptedException
224      *         if the operation has been interrupted
225      */
226     public ChannelEvent readEvent(long timeout, TimeUnit unit) throws InterruptedException, BlockingReadTimeoutException {
227         detectDeadLock();
228         if (isClosed()) {
229             if (getQueue().isEmpty()) {
230                 return null;
231             }
232         }
233 
234         ChannelEvent e = getQueue().poll(timeout, unit);
235         if (e == null) {
236             throw new BlockingReadTimeoutException();
237         } else if (e instanceof ChannelStateEvent) {
238             // channelClosed has been triggered.
239             assert closed;
240             return null;
241         } else {
242             return e;
243         }
244     }
245 
246     private void detectDeadLock() {
247         if (DeadLockProofWorker.PARENT.get() != null) {
248             throw new IllegalStateException(
249                     "read*(...) in I/O thread causes a dead lock or " +
250                     "sudden performance drop. Implement a state machine or " +
251                     "call read*() from a different thread.");
252         }
253     }
254 
255     @Override
256     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
257             throws Exception {
258         getQueue().put(e);
259     }
260 
261     @Override
262     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
263             throws Exception {
264         getQueue().put(e);
265     }
266 
267     @Override
268     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
269             throws Exception {
270         closed = true;
271         getQueue().put(e);
272     }
273 
274     @SuppressWarnings("unchecked")
275     private E getMessage(MessageEvent e) {
276         return (E) e.getMessage();
277     }
278 }