1 /* 2 * Copyright 2009 Red Hat, Inc. 3 * 4 * Red Hat licenses this file to you under the Apache License, version 2.0 5 * (the "License"); you may not use this file except in compliance with the 6 * License. You may obtain a copy of the License at: 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package org.jboss.netty.handler.queue; 17 18 import java.io.IOException; 19 import java.util.concurrent.BlockingQueue; 20 import java.util.concurrent.TimeUnit; 21 22 import org.jboss.netty.buffer.ChannelBuffer; 23 import org.jboss.netty.channel.Channel; 24 import org.jboss.netty.channel.ChannelEvent; 25 import org.jboss.netty.channel.ChannelHandlerContext; 26 import org.jboss.netty.channel.ChannelPipeline; 27 import org.jboss.netty.channel.ChannelStateEvent; 28 import org.jboss.netty.channel.ExceptionEvent; 29 import org.jboss.netty.channel.MessageEvent; 30 import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 31 import org.jboss.netty.util.internal.DeadLockProofWorker; 32 import org.jboss.netty.util.internal.LinkedTransferQueue; 33 34 /** 35 * Emulates blocking read operation. This handler stores all received messages 36 * into a {@link BlockingQueue} and returns the received messages when 37 * {@link #read()}, {@link #read(long, TimeUnit)}, {@link #readEvent()}, or 38 * {@link #readEvent(long, TimeUnit)} method is called. 39 * <p> 40 * Please note that this handler is only useful for the cases where there are 41 * very small number of connections, such as testing and simple client-side 42 * application development. 43 * <p> 44 * Also, any handler placed after this handler will never receive 45 * {@code messageReceived}, {@code exceptionCaught}, and {@code channelClosed} 46 * events, hence it should be placed in the last place in a pipeline. 47 * <p> 48 * Here is an example that demonstrates the usage: 49 * <pre> 50 * {@link BlockingReadHandler}<{@link ChannelBuffer}> reader = 51 * new {@link BlockingReadHandler}<{@link ChannelBuffer}>(); 52 * {@link ChannelPipeline} p = ...; 53 * p.addLast("reader", reader); 54 * 55 * ... 56 * 57 * // Read a message from a channel in a blocking manner. 58 * try { 59 * {@link ChannelBuffer} buf = reader.read(60, TimeUnit.SECONDS); 60 * if (buf == null) { 61 * // Connection closed. 62 * } else { 63 * // Handle the received message here. 64 * } 65 * } catch ({@link BlockingReadTimeoutException} e) { 66 * // Read timed out. 67 * } catch (IOException e) { 68 * // Other read errors 69 * } 70 * </pre> 71 * 72 * @param <E> the type of the received messages 73 * 74 * @author <a href="http://www.jboss.org/netty/">The Netty Project</a> 75 * @author <a href="http://gleamynode.net/">Trustin Lee</a> 76 * @version $Rev: 2122 $, $Date: 2010-02-02 11:00:04 +0900 (Tue, 02 Feb 2010) $ 77 */ 78 public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler { 79 80 private final BlockingQueue<ChannelEvent> queue; 81 private volatile boolean closed; 82 83 /** 84 * Creates a new instance with the default unbounded {@link BlockingQueue} 85 * implementation. 86 */ 87 public BlockingReadHandler() { 88 this(new LinkedTransferQueue<ChannelEvent>()); 89 } 90 91 /** 92 * Creates a new instance with the specified {@link BlockingQueue}. 93 */ 94 public BlockingReadHandler(BlockingQueue<ChannelEvent> queue) { 95 if (queue == null) { 96 throw new NullPointerException("queue"); 97 } 98 this.queue = queue; 99 } 100 101 /** 102 * Returns the queue which stores the received messages. The default 103 * implementation returns the queue which was specified in the constructor. 104 */ 105 protected BlockingQueue<ChannelEvent> getQueue() { 106 return queue; 107 } 108 109 /** 110 * Returns {@code true} if and only if the {@link Channel} associated with 111 * this handler has been closed. 112 * 113 * @throws IllegalStateException 114 * if this handler was not added to a {@link ChannelPipeline} yet 115 */ 116 public boolean isClosed() { 117 return closed; 118 } 119 120 /** 121 * Waits until a new message is received or the associated {@link Channel} 122 * is closed. 123 * 124 * @return the received message or {@code null} if the associated 125 * {@link Channel} has been closed 126 * @throws IOException 127 * if failed to receive a new message 128 * @throws InterruptedException 129 * if the operation has been interrupted 130 */ 131 public E read() throws IOException, InterruptedException { 132 ChannelEvent e = readEvent(); 133 if (e == null) { 134 return null; 135 } 136 137 if (e instanceof MessageEvent) { 138 return getMessage((MessageEvent) e); 139 } else if (e instanceof ExceptionEvent) { 140 throw (IOException) new IOException().initCause(((ExceptionEvent) e).getCause()); 141 } else { 142 throw new IllegalStateException(); 143 } 144 } 145 146 /** 147 * Waits until a new message is received or the associated {@link Channel} 148 * is closed. 149 * 150 * @param timeout 151 * the amount time to wait until a new message is received. 152 * If no message is received within the timeout, 153 * {@link BlockingReadTimeoutException} is thrown. 154 * @param unit 155 * the unit of {@code timeout} 156 * 157 * @return the received message or {@code null} if the associated 158 * {@link Channel} has been closed 159 * @throws BlockingReadTimeoutException 160 * if no message was received within the specified timeout 161 * @throws IOException 162 * if failed to receive a new message 163 * @throws InterruptedException 164 * if the operation has been interrupted 165 */ 166 public E read(long timeout, TimeUnit unit) throws IOException, InterruptedException { 167 ChannelEvent e = readEvent(timeout, unit); 168 if (e == null) { 169 return null; 170 } 171 172 if (e instanceof MessageEvent) { 173 return getMessage((MessageEvent) e); 174 } else if (e instanceof ExceptionEvent) { 175 throw (IOException) new IOException().initCause(((ExceptionEvent) e).getCause()); 176 } else { 177 throw new IllegalStateException(); 178 } 179 } 180 181 /** 182 * Waits until a new {@link ChannelEvent} is received or the associated 183 * {@link Channel} is closed. 184 * 185 * @return a {@link MessageEvent} or an {@link ExceptionEvent}. 186 * {@code null} if the associated {@link Channel} has been closed 187 * @throws InterruptedException 188 * if the operation has been interrupted 189 */ 190 public ChannelEvent readEvent() throws InterruptedException { 191 detectDeadLock(); 192 if (isClosed()) { 193 if (getQueue().isEmpty()) { 194 return null; 195 } 196 } 197 198 ChannelEvent e = getQueue().take(); 199 if (e instanceof ChannelStateEvent) { 200 // channelClosed has been triggered. 201 assert closed; 202 return null; 203 } else { 204 return e; 205 } 206 } 207 208 /** 209 * Waits until a new {@link ChannelEvent} is received or the associated 210 * {@link Channel} is closed. 211 * 212 * @param timeout 213 * the amount time to wait until a new {@link ChannelEvent} is 214 * received. If no message is received within the timeout, 215 * {@link BlockingReadTimeoutException} is thrown. 216 * @param unit 217 * the unit of {@code timeout} 218 * 219 * @return a {@link MessageEvent} or an {@link ExceptionEvent}. 220 * {@code null} if the associated {@link Channel} has been closed 221 * @throws BlockingReadTimeoutException 222 * if no event was received within the specified timeout 223 * @throws InterruptedException 224 * if the operation has been interrupted 225 */ 226 public ChannelEvent readEvent(long timeout, TimeUnit unit) throws InterruptedException, BlockingReadTimeoutException { 227 detectDeadLock(); 228 if (isClosed()) { 229 if (getQueue().isEmpty()) { 230 return null; 231 } 232 } 233 234 ChannelEvent e = getQueue().poll(timeout, unit); 235 if (e == null) { 236 throw new BlockingReadTimeoutException(); 237 } else if (e instanceof ChannelStateEvent) { 238 // channelClosed has been triggered. 239 assert closed; 240 return null; 241 } else { 242 return e; 243 } 244 } 245 246 private void detectDeadLock() { 247 if (DeadLockProofWorker.PARENT.get() != null) { 248 throw new IllegalStateException( 249 "read*(...) in I/O thread causes a dead lock or " + 250 "sudden performance drop. Implement a state machine or " + 251 "call read*() from a different thread."); 252 } 253 } 254 255 @Override 256 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) 257 throws Exception { 258 getQueue().put(e); 259 } 260 261 @Override 262 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) 263 throws Exception { 264 getQueue().put(e); 265 } 266 267 @Override 268 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) 269 throws Exception { 270 closed = true; 271 getQueue().put(e); 272 } 273 274 @SuppressWarnings("unchecked") 275 private E getMessage(MessageEvent e) { 276 return (E) e.getMessage(); 277 } 278 }