View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.ssl;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.nio.ByteBuffer;
22  import java.nio.channels.ClosedChannelException;
23  import java.util.LinkedList;
24  import java.util.Queue;
25  import java.util.concurrent.Executor;
26  import java.util.concurrent.atomic.AtomicBoolean;
27  import java.util.regex.Pattern;
28  
29  import javax.net.ssl.SSLEngine;
30  import javax.net.ssl.SSLEngineResult;
31  import javax.net.ssl.SSLEngineResult.HandshakeStatus;
32  import javax.net.ssl.SSLEngineResult.Status;
33  import javax.net.ssl.SSLException;
34  
35  import org.jboss.netty.buffer.ChannelBuffer;
36  import org.jboss.netty.buffer.ChannelBuffers;
37  import org.jboss.netty.channel.Channel;
38  import org.jboss.netty.channel.ChannelDownstreamHandler;
39  import org.jboss.netty.channel.ChannelEvent;
40  import org.jboss.netty.channel.ChannelFuture;
41  import org.jboss.netty.channel.ChannelFutureListener;
42  import org.jboss.netty.channel.ChannelHandlerContext;
43  import org.jboss.netty.channel.ChannelPipeline;
44  import org.jboss.netty.channel.ChannelStateEvent;
45  import org.jboss.netty.channel.Channels;
46  import org.jboss.netty.channel.DownstreamMessageEvent;
47  import org.jboss.netty.channel.ExceptionEvent;
48  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
49  import org.jboss.netty.channel.MessageEvent;
50  import org.jboss.netty.handler.codec.frame.FrameDecoder;
51  import org.jboss.netty.logging.InternalLogger;
52  import org.jboss.netty.logging.InternalLoggerFactory;
53  import org.jboss.netty.util.internal.LinkedTransferQueue;
54  import org.jboss.netty.util.internal.NonReentrantLock;
55  
56  /**
57   * Adds <a href="http://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
58   * &middot; TLS</a> and StartTLS support to a {@link Channel}.  Please refer
59   * to the <strong>"SecureChat"</strong> example in the distribution or the web
60   * site for the detailed usage.
61   *
62   * <h3>Beginning the handshake</h3>
63   * <p>
64   * You must make sure not to write a message while the
65   * {@linkplain #handshake() handshake} is in progress unless you are
66   * renegotiating.  You will be notified by the {@link ChannelFuture} which is
67   * returned by the {@link #handshake()} method when the handshake
68   * process succeeds or fails.
69   *
70   * <h3>Renegotiation</h3>
71   * <p>
72   * If {@link #isEnableRenegotiation() enableRenegotiation} is {@code true}
73   * (default) and the initial handshake has been done successfully, you can call
74   * {@link #handshake()} to trigger the renegotiation.
75   * <p>
76   * If {@link #isEnableRenegotiation() enableRenegotiation} is {@code false},
77   * an attempt to trigger renegotiation will result in the connection closure.
78   * <p>
79   * Please note that TLS renegotiation had a security issue before.  If your
80   * runtime environment did not fix it, please make sure to disable TLS
81   * renegotiation by calling {@link #setEnableRenegotiation(boolean)} with
82   * {@code false}.  For more information, please refer to the following documents:
83   * <ul>
84   *   <li><a href="http://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2009-3555">CVE-2009-3555</a></li>
85   *   <li><a href="http://www.ietf.org/rfc/rfc5746.txt">RFC5746</a></li>
86   *   <li><a href="http://www.oracle.com/technetwork/java/javase/documentation/tlsreadme2-176330.html">Phased Approach to Fixing the TLS Renegotiation Issue</a></li>
87   * </ul>
88   *
89   * <h3>Closing the session</h3>
90   * <p>
91   * To close the SSL session, the {@link #close()} method should be
92   * called to send the {@code close_notify} message to the remote peer.  One
93   * exception is when you close the {@link Channel} - {@link SslHandler}
94   * intercepts the close request and send the {@code close_notify} message
95   * before the channel closure automatically.  Once the SSL session is closed,
96   * it is not reusable, and consequently you should create a new
97   * {@link SslHandler} with a new {@link SSLEngine} as explained in the
98   * following section.
99   *
100  * <h3>Restarting the session</h3>
101  * <p>
102  * To restart the SSL session, you must remove the existing closed
103  * {@link SslHandler} from the {@link ChannelPipeline}, insert a new
104  * {@link SslHandler} with a new {@link SSLEngine} into the pipeline,
105  * and start the handshake process as described in the first section.
106  *
107  * <h3>Implementing StartTLS</h3>
108  * <p>
109  * <a href="http://en.wikipedia.org/wiki/STARTTLS">StartTLS</a> is the
110  * communication pattern that secures the wire in the middle of the plaintext
111  * connection.  Please note that it is different from SSL &middot; TLS, that
112  * secures the wire from the beginning of the connection.  Typically, StartTLS
113  * is composed of three steps:
114  * <ol>
115  * <li>Client sends a StartTLS request to server.</li>
116  * <li>Server sends a StartTLS response to client.</li>
117  * <li>Client begins SSL handshake.</li>
118  * </ol>
119  * If you implement a server, you need to:
120  * <ol>
121  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
122  *     to {@code true},</li>
123  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
124  * <li>write a StartTLS response.</li>
125  * </ol>
126  * Please note that you must insert {@link SslHandler} <em>before</em> sending
127  * the StartTLS response.  Otherwise the client can send begin SSL handshake
128  * before {@link SslHandler} is inserted to the {@link ChannelPipeline}, causing
129  * data corruption.
130  * <p>
131  * The client-side implementation is much simpler.
132  * <ol>
133  * <li>Write a StartTLS request,</li>
134  * <li>wait for the StartTLS response,</li>
135  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
136  *     to {@code false},</li>
137  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
138  * <li>Initiate SSL handshake by calling {@link SslHandler#handshake()}.</li>
139  * </ol>
140  *
141  * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
142  * @author <a href="http://gleamynode.net/">Trustin Lee</a>
143  *
144  * @version $Rev: 2369 $, $Date: 2010-10-19 13:05:28 +0900 (Tue, 19 Oct 2010) $
145  *
146  * @apiviz.landmark
147  * @apiviz.uses org.jboss.netty.handler.ssl.SslBufferPool
148  */
149 public class SslHandler extends FrameDecoder
150                         implements ChannelDownstreamHandler,
151                                    LifeCycleAwareChannelHandler {
152 
153     private static final InternalLogger logger =
154         InternalLoggerFactory.getInstance(SslHandler.class);
155 
156     private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
157 
158     private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
159             "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
160             Pattern.CASE_INSENSITIVE);
161 
162     private static SslBufferPool defaultBufferPool;
163 
164     /**
165      * Returns the default {@link SslBufferPool} used when no pool is
166      * specified in the constructor.
167      */
168     public static synchronized SslBufferPool getDefaultBufferPool() {
169         if (defaultBufferPool == null) {
170             defaultBufferPool = new SslBufferPool();
171         }
172         return defaultBufferPool;
173     }
174 
175     private volatile ChannelHandlerContext ctx;
176     private final SSLEngine engine;
177     private final SslBufferPool bufferPool;
178     private final Executor delegatedTaskExecutor;
179     private final boolean startTls;
180 
181     private volatile boolean enableRenegotiation = true;
182 
183     final Object handshakeLock = new Object();
184     private boolean handshaking;
185     private volatile boolean handshaken;
186     private volatile ChannelFuture handshakeFuture;
187 
188     private final AtomicBoolean sentFirstMessage = new AtomicBoolean();
189     private final AtomicBoolean sentCloseNotify = new AtomicBoolean();
190     int ignoreClosedChannelException;
191     final Object ignoreClosedChannelExceptionLock = new Object();
192     private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
193     private final Queue<MessageEvent> pendingEncryptedWrites = new LinkedTransferQueue<MessageEvent>();
194     private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();
195 
196     /**
197      * Creates a new instance.
198      *
199      * @param engine  the {@link SSLEngine} this handler will use
200      */
201     public SslHandler(SSLEngine engine) {
202         this(engine, getDefaultBufferPool(), ImmediateExecutor.INSTANCE);
203     }
204 
205     /**
206      * Creates a new instance.
207      *
208      * @param engine      the {@link SSLEngine} this handler will use
209      * @param bufferPool  the {@link SslBufferPool} where this handler will
210      *                    acquire the buffers required by the {@link SSLEngine}
211      */
212     public SslHandler(SSLEngine engine, SslBufferPool bufferPool) {
213         this(engine, bufferPool, ImmediateExecutor.INSTANCE);
214     }
215 
216     /**
217      * Creates a new instance.
218      *
219      * @param engine    the {@link SSLEngine} this handler will use
220      * @param startTls  {@code true} if the first write request shouldn't be
221      *                  encrypted by the {@link SSLEngine}
222      */
223     public SslHandler(SSLEngine engine, boolean startTls) {
224         this(engine, getDefaultBufferPool(), startTls);
225     }
226 
227     /**
228      * Creates a new instance.
229      *
230      * @param engine      the {@link SSLEngine} this handler will use
231      * @param bufferPool  the {@link SslBufferPool} where this handler will
232      *                    acquire the buffers required by the {@link SSLEngine}
233      * @param startTls    {@code true} if the first write request shouldn't be
234      *                    encrypted by the {@link SSLEngine}
235      */
236     public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls) {
237         this(engine, bufferPool, startTls, ImmediateExecutor.INSTANCE);
238     }
239 
240     /**
241      * Creates a new instance.
242      *
243      * @param engine
244      *        the {@link SSLEngine} this handler will use
245      * @param delegatedTaskExecutor
246      *        the {@link Executor} which will execute the delegated task
247      *        that {@link SSLEngine#getDelegatedTask()} will return
248      */
249     public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
250         this(engine, getDefaultBufferPool(), delegatedTaskExecutor);
251     }
252 
253     /**
254      * Creates a new instance.
255      *
256      * @param engine
257      *        the {@link SSLEngine} this handler will use
258      * @param bufferPool
259      *        the {@link SslBufferPool} where this handler will acquire
260      *        the buffers required by the {@link SSLEngine}
261      * @param delegatedTaskExecutor
262      *        the {@link Executor} which will execute the delegated task
263      *        that {@link SSLEngine#getDelegatedTask()} will return
264      */
265     public SslHandler(SSLEngine engine, SslBufferPool bufferPool, Executor delegatedTaskExecutor) {
266         this(engine, bufferPool, false, delegatedTaskExecutor);
267     }
268 
269     /**
270      * Creates a new instance.
271      *
272      * @param engine
273      *        the {@link SSLEngine} this handler will use
274      * @param startTls
275      *        {@code true} if the first write request shouldn't be encrypted
276      *        by the {@link SSLEngine}
277      * @param delegatedTaskExecutor
278      *        the {@link Executor} which will execute the delegated task
279      *        that {@link SSLEngine#getDelegatedTask()} will return
280      */
281     public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
282         this(engine, getDefaultBufferPool(), startTls, delegatedTaskExecutor);
283     }
284 
285     /**
286      * Creates a new instance.
287      *
288      * @param engine
289      *        the {@link SSLEngine} this handler will use
290      * @param bufferPool
291      *        the {@link SslBufferPool} where this handler will acquire
292      *        the buffers required by the {@link SSLEngine}
293      * @param startTls
294      *        {@code true} if the first write request shouldn't be encrypted
295      *        by the {@link SSLEngine}
296      * @param delegatedTaskExecutor
297      *        the {@link Executor} which will execute the delegated task
298      *        that {@link SSLEngine#getDelegatedTask()} will return
299      */
300     public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor) {
301         if (engine == null) {
302             throw new NullPointerException("engine");
303         }
304         if (bufferPool == null) {
305             throw new NullPointerException("bufferPool");
306         }
307         if (delegatedTaskExecutor == null) {
308             throw new NullPointerException("delegatedTaskExecutor");
309         }
310         this.engine = engine;
311         this.bufferPool = bufferPool;
312         this.delegatedTaskExecutor = delegatedTaskExecutor;
313         this.startTls = startTls;
314     }
315 
316     /**
317      * Returns the {@link SSLEngine} which is used by this handler.
318      */
319     public SSLEngine getEngine() {
320         return engine;
321     }
322 
323     /**
324      * Starts an SSL / TLS handshake for the specified channel.
325      *
326      * @return a {@link ChannelFuture} which is notified when the handshake
327      *         succeeds or fails.
328      */
329     public ChannelFuture handshake() {
330         if (handshaken && !isEnableRenegotiation()) {
331             throw new IllegalStateException("renegotiation disabled");
332         }
333 
334         ChannelHandlerContext ctx = this.ctx;
335         Channel channel = ctx.getChannel();
336         ChannelFuture handshakeFuture;
337         Exception exception = null;
338 
339         synchronized (handshakeLock) {
340             if (handshaking) {
341                 return this.handshakeFuture;
342             } else {
343                 handshaking = true;
344                 try {
345                     engine.beginHandshake();
346                     runDelegatedTasks();
347                     handshakeFuture = this.handshakeFuture = future(channel);
348                 } catch (Exception e) {
349                     handshakeFuture = this.handshakeFuture = failedFuture(channel, e);
350                     exception = e;
351                 }
352             }
353         }
354 
355         if (exception == null) { // Began handshake successfully.
356             try {
357                 wrapNonAppData(ctx, channel);
358             } catch (SSLException e) {
359                 fireExceptionCaught(ctx, e);
360                 handshakeFuture.setFailure(e);
361             }
362         } else { // Failed to initiate handshake.
363             fireExceptionCaught(ctx, exception);
364         }
365 
366         return handshakeFuture;
367     }
368 
369     /**
370      * @deprecated Use {@link #handshake()} instead.
371      */
372     @Deprecated
373     public ChannelFuture handshake(Channel channel) {
374         return handshake();
375     }
376 
377     /**
378      * Sends an SSL {@code close_notify} message to the specified channel and
379      * destroys the underlying {@link SSLEngine}.
380      */
381     public ChannelFuture close() {
382         ChannelHandlerContext ctx = this.ctx;
383         Channel channel = ctx.getChannel();
384         try {
385             engine.closeOutbound();
386             return wrapNonAppData(ctx, channel);
387         } catch (SSLException e) {
388             fireExceptionCaught(ctx, e);
389             return failedFuture(channel, e);
390         }
391     }
392 
393     /**
394      * @deprecated Use {@link #close()} instead.
395      */
396     @Deprecated
397     public ChannelFuture close(Channel channel) {
398         return close();
399     }
400 
401     /**
402      * Returns {@code true} if and only if TLS renegotiation is enabled.
403      */
404     public boolean isEnableRenegotiation() {
405         return enableRenegotiation;
406     }
407 
408     /**
409      * Enables or disables TLS renegotiation.
410      */
411     public void setEnableRenegotiation(boolean enableRenegotiation) {
412         this.enableRenegotiation = enableRenegotiation;
413     }
414 
415     public void handleDownstream(
416             final ChannelHandlerContext context, final ChannelEvent evt) throws Exception {
417         if (evt instanceof ChannelStateEvent) {
418             ChannelStateEvent e = (ChannelStateEvent) evt;
419             switch (e.getState()) {
420             case OPEN:
421             case CONNECTED:
422             case BOUND:
423                 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
424                     closeOutboundAndChannel(context, e);
425                     return;
426                 }
427             }
428         }
429         if (!(evt instanceof MessageEvent)) {
430             context.sendDownstream(evt);
431             return;
432         }
433 
434         MessageEvent e = (MessageEvent) evt;
435         if (!(e.getMessage() instanceof ChannelBuffer)) {
436             context.sendDownstream(evt);
437             return;
438         }
439 
440         // Do not encrypt the first write request if this handler is
441         // created with startTLS flag turned on.
442         if (startTls && sentFirstMessage.compareAndSet(false, true)) {
443             context.sendDownstream(evt);
444             return;
445         }
446 
447         // Otherwise, all messages are encrypted.
448         ChannelBuffer msg = (ChannelBuffer) e.getMessage();
449         PendingWrite pendingWrite;
450 
451         if (msg.readable()) {
452             pendingWrite = new PendingWrite(evt.getFuture(), msg.toByteBuffer(msg.readerIndex(), msg.readableBytes()));
453         } else {
454             pendingWrite = new PendingWrite(evt.getFuture(), null);
455         }
456         synchronized (pendingUnencryptedWrites) {
457             boolean offered = pendingUnencryptedWrites.offer(pendingWrite);
458             assert offered;
459         }
460 
461         wrap(context, evt.getChannel());
462     }
463 
464     @Override
465     public void channelDisconnected(ChannelHandlerContext ctx,
466             ChannelStateEvent e) throws Exception {
467 
468         // Make sure the handshake future is notified when a connection has
469         // been closed during handshake.
470         synchronized (handshakeLock) {
471             if (handshaking) {
472                 handshakeFuture.setFailure(new ClosedChannelException());
473             }
474         }
475 
476         try {
477             super.channelDisconnected(ctx, e);
478         } finally {
479             unwrap(ctx, e.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
480             engine.closeOutbound();
481             if (!sentCloseNotify.get() && handshaken) {
482                 try {
483                     engine.closeInbound();
484                 } catch (SSLException ex) {
485                     logger.debug("Failed to clean up SSLEngine.", ex);
486                 }
487             }
488         }
489     }
490 
491     @Override
492     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
493             throws Exception {
494 
495         Throwable cause = e.getCause();
496         if (cause instanceof IOException) {
497             if (cause instanceof ClosedChannelException) {
498                 synchronized (ignoreClosedChannelExceptionLock) {
499                     if (ignoreClosedChannelException > 0) {
500                         ignoreClosedChannelException --;
501                         logger.debug(
502                                 "Swallowing an exception raised while " +
503                                 "writing non-app data", cause);
504                         return;
505                     }
506                 }
507             } else if (engine.isOutboundDone()) {
508                 String message = String.valueOf(cause.getMessage()).toLowerCase();
509                 if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
510                     // It is safe to ignore the 'connection reset by peer' or
511                     // 'broken pipe' error after sending closure_notify.
512                     logger.debug(
513                             "Swallowing a 'connection reset by peer / " +
514                             "broken pipe' error occurred while writing " +
515                             "'closure_notify'", cause);
516 
517                     // Close the connection explicitly just in case the transport
518                     // did not close the connection automatically.
519                     Channels.close(ctx, succeededFuture(e.getChannel()));
520                     return;
521                 }
522             }
523         }
524 
525         ctx.sendUpstream(e);
526     }
527 
528     @Override
529     protected Object decode(
530             final ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
531 
532         if (buffer.readableBytes() < 5) {
533             return null;
534         }
535 
536         int packetLength = 0;
537 
538         // SSLv3 or TLS - Check ContentType
539         boolean tls;
540         switch (buffer.getUnsignedByte(buffer.readerIndex())) {
541         case 20:  // change_cipher_spec
542         case 21:  // alert
543         case 22:  // handshake
544         case 23:  // application_data
545             tls = true;
546             break;
547         default:
548             // SSLv2 or bad data
549             tls = false;
550         }
551 
552         if (tls) {
553             // SSLv3 or TLS - Check ProtocolVersion
554             int majorVersion = buffer.getUnsignedByte(buffer.readerIndex() + 1);
555             if (majorVersion >= 3 && majorVersion < 10) {
556                 // SSLv3 or TLS
557                 packetLength = (getShort(buffer, buffer.readerIndex() + 3) & 0xFFFF) + 5;
558                 if (packetLength <= 5) {
559                     // Neither SSLv2 or TLSv1 (i.e. SSLv2 or bad data)
560                     tls = false;
561                 }
562             } else {
563                 // Neither SSLv2 or TLSv1 (i.e. SSLv2 or bad data)
564                 tls = false;
565             }
566         }
567 
568         if (!tls) {
569             // SSLv2 or bad data - Check the version
570             boolean sslv2 = true;
571             int headerLength = (buffer.getUnsignedByte(
572                     buffer.readerIndex()) & 0x80) != 0 ? 2 : 3;
573             int majorVersion = buffer.getUnsignedByte(
574                     buffer.readerIndex() + headerLength + 1);
575             if (majorVersion >= 2 && majorVersion < 10) {
576                 // SSLv2
577                 if (headerLength == 2) {
578                     packetLength = (getShort(buffer, buffer.readerIndex()) & 0x7FFF) + 2;
579                 } else {
580                     packetLength = (getShort(buffer, buffer.readerIndex()) & 0x3FFF) + 3;
581                 }
582                 if (packetLength <= headerLength) {
583                     sslv2 = false;
584                 }
585             } else {
586                 sslv2 = false;
587             }
588 
589             if (!sslv2) {
590                 // Bad data - discard the buffer and raise an exception.
591                 SSLException e = new SSLException(
592                         "not an SSL/TLS record: " + ChannelBuffers.hexDump(buffer));
593                 buffer.skipBytes(buffer.readableBytes());
594                 throw e;
595             }
596         }
597 
598         assert packetLength > 0;
599 
600         if (buffer.readableBytes() < packetLength) {
601             return null;
602         }
603 
604         // We advance the buffer's readerIndex before calling unwrap() because
605         // unwrap() can trigger FrameDecoder call decode(), this method, recursively.
606         // The recursive call results in decoding the same packet twice if
607         // the readerIndex is advanced *after* decode().
608         //
609         // Here's an example:
610         // 1) An SSL packet is received from the wire.
611         // 2) SslHandler.decode() deciphers the packet and calls the user code.
612         // 3) The user closes the channel in the same thread.
613         // 4) The same thread triggers a channelDisconnected() event.
614         // 5) FrameDecoder.cleanup() is called, and it calls SslHandler.decode().
615         // 6) SslHandler.decode() will feed the same packet with what was
616         //    deciphered at the step 2 again if the readerIndex was not advanced
617         //    before calling the user code.
618         final int packetOffset = buffer.readerIndex();
619         buffer.skipBytes(packetLength);
620         return unwrap(ctx, channel, buffer, packetOffset, packetLength);
621     }
622 
623     /**
624      * Reads a big-endian short integer from the buffer.  Please note that we do not use
625      * {@link ChannelBuffer#getShort(int)} because it might be a little-endian buffer.
626      */
627     private static short getShort(ChannelBuffer buf, int offset) {
628         return (short) (buf.getByte(offset) << 8 | buf.getByte(offset + 1) & 0xFF);
629     }
630 
631     private ChannelFuture wrap(ChannelHandlerContext context, Channel channel)
632             throws SSLException {
633 
634         ChannelFuture future = null;
635         ChannelBuffer msg;
636         ByteBuffer outNetBuf = bufferPool.acquire();
637         boolean success = true;
638         boolean offered = false;
639         boolean needsUnwrap = false;
640         try {
641             loop:
642             for (;;) {
643                 // Acquire a lock to make sure unencrypted data is polled
644                 // in order and their encrypted counterpart is offered in
645                 // order.
646                 synchronized (pendingUnencryptedWrites) {
647                     PendingWrite pendingWrite = pendingUnencryptedWrites.peek();
648                     if (pendingWrite == null) {
649                         break;
650                     }
651 
652                     ByteBuffer outAppBuf = pendingWrite.outAppBuf;
653                     if (outAppBuf == null) {
654                         // A write request with an empty buffer
655                         pendingUnencryptedWrites.remove();
656                         offerEncryptedWriteRequest(
657                                 new DownstreamMessageEvent(
658                                         channel, pendingWrite.future,
659                                         ChannelBuffers.EMPTY_BUFFER,
660                                         channel.getRemoteAddress()));
661                         offered = true;
662                     } else {
663                         SSLEngineResult result = null;
664                         try {
665                             synchronized (handshakeLock) {
666                                 result = engine.wrap(outAppBuf, outNetBuf);
667                             }
668                         } finally {
669                             if (!outAppBuf.hasRemaining()) {
670                                 pendingUnencryptedWrites.remove();
671                             }
672                         }
673 
674                         if (result.bytesProduced() > 0) {
675                             outNetBuf.flip();
676                             msg = ChannelBuffers.buffer(outNetBuf.remaining());
677                             msg.writeBytes(outNetBuf.array(), 0, msg.capacity());
678                             outNetBuf.clear();
679 
680                             if (pendingWrite.outAppBuf.hasRemaining()) {
681                                 // pendingWrite's future shouldn't be notified if
682                                 // only partial data is written.
683                                 future = succeededFuture(channel);
684                             } else {
685                                 future = pendingWrite.future;
686                             }
687 
688                             MessageEvent encryptedWrite = new DownstreamMessageEvent(
689                                     channel, future, msg, channel.getRemoteAddress());
690                             offerEncryptedWriteRequest(encryptedWrite);
691                             offered = true;
692                         } else if (result.getStatus() == Status.CLOSED) {
693                             // SSLEngine has been closed already.
694                             // Any further write attempts should be denied.
695                             success = false;
696                             break;
697                         } else {
698                             final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
699                             handleRenegotiation(handshakeStatus);
700                             switch (handshakeStatus) {
701                             case NEED_WRAP:
702                                 if (outAppBuf.hasRemaining()) {
703                                     break;
704                                 } else {
705                                     break loop;
706                                 }
707                             case NEED_UNWRAP:
708                                 needsUnwrap = true;
709                                 break loop;
710                             case NEED_TASK:
711                                 runDelegatedTasks();
712                                 break;
713                             case FINISHED:
714                             case NOT_HANDSHAKING:
715                                 if (handshakeStatus == HandshakeStatus.FINISHED) {
716                                     setHandshakeSuccess(channel);
717                                 }
718                                 if (result.getStatus() == Status.CLOSED) {
719                                     success = false;
720                                 }
721                                 break loop;
722                             default:
723                                 throw new IllegalStateException(
724                                         "Unknown handshake status: " +
725                                         handshakeStatus);
726                             }
727                         }
728                     }
729                 }
730             }
731         } catch (SSLException e) {
732             success = false;
733             setHandshakeFailure(channel, e);
734             throw e;
735         } finally {
736             bufferPool.release(outNetBuf);
737 
738             if (offered) {
739                 flushPendingEncryptedWrites(context);
740             }
741 
742             if (!success) {
743                 IllegalStateException cause =
744                     new IllegalStateException("SSLEngine already closed");
745                 // Mark all remaining pending writes as failure if anything
746                 // wrong happened before the write requests are wrapped.
747                 // Please note that we do not call setFailure while a lock is
748                 // acquired, to avoid a potential dead lock.
749                 for (;;) {
750                     PendingWrite pendingWrite;
751                     synchronized (pendingUnencryptedWrites) {
752                         pendingWrite = pendingUnencryptedWrites.poll();
753                         if (pendingWrite == null) {
754                             break;
755                         }
756                     }
757 
758                     pendingWrite.future.setFailure(cause);
759                 }
760             }
761         }
762 
763         if (needsUnwrap) {
764             unwrap(context, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0);
765         }
766 
767         if (future == null) {
768             future = succeededFuture(channel);
769         }
770         return future;
771     }
772 
773     private void offerEncryptedWriteRequest(MessageEvent encryptedWrite) {
774         final boolean locked = pendingEncryptedWritesLock.tryLock();
775         try {
776             pendingEncryptedWrites.offer(encryptedWrite);
777         } finally {
778             if (locked) {
779                 pendingEncryptedWritesLock.unlock();
780             }
781         }
782     }
783 
784     private void flushPendingEncryptedWrites(ChannelHandlerContext ctx) {
785         // Avoid possible dead lock and data integrity issue
786         // which is caused by cross communication between more than one channel
787         // in the same VM.
788         if (!pendingEncryptedWritesLock.tryLock()) {
789             return;
790         }
791 
792         try {
793             MessageEvent e;
794             while ((e = pendingEncryptedWrites.poll()) != null) {
795                 ctx.sendDownstream(e);
796             }
797         } finally {
798             pendingEncryptedWritesLock.unlock();
799         }
800     }
801 
802     private ChannelFuture wrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException {
803         ChannelFuture future = null;
804         ByteBuffer outNetBuf = bufferPool.acquire();
805 
806         SSLEngineResult result;
807         try {
808             for (;;) {
809                 synchronized (handshakeLock) {
810                     result = engine.wrap(EMPTY_BUFFER, outNetBuf);
811                 }
812 
813                 if (result.bytesProduced() > 0) {
814                     outNetBuf.flip();
815                     ChannelBuffer msg = ChannelBuffers.buffer(outNetBuf.remaining());
816                     msg.writeBytes(outNetBuf.array(), 0, msg.capacity());
817                     outNetBuf.clear();
818 
819                     future = future(channel);
820                     future.addListener(new ChannelFutureListener() {
821                         public void operationComplete(ChannelFuture future)
822                                 throws Exception {
823                             if (future.getCause() instanceof ClosedChannelException) {
824                                 synchronized (ignoreClosedChannelExceptionLock) {
825                                     ignoreClosedChannelException ++;
826                                 }
827                             }
828                         }
829                     });
830 
831                     write(ctx, future, msg);
832                 }
833 
834                 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
835                 handleRenegotiation(handshakeStatus);
836                 switch (handshakeStatus) {
837                 case FINISHED:
838                     setHandshakeSuccess(channel);
839                     runDelegatedTasks();
840                     break;
841                 case NEED_TASK:
842                     runDelegatedTasks();
843                     break;
844                 case NEED_UNWRAP:
845                     if (!Thread.holdsLock(handshakeLock)) {
846                         // unwrap shouldn't be called when this method was
847                         // called by unwrap - unwrap will keep running after
848                         // this method returns.
849                         unwrap(ctx, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0);
850                     }
851                     break;
852                 case NOT_HANDSHAKING:
853                 case NEED_WRAP:
854                     break;
855                 default:
856                     throw new IllegalStateException(
857                             "Unexpected handshake status: " + handshakeStatus);
858                 }
859 
860                 if (result.bytesProduced() == 0) {
861                     break;
862                 }
863             }
864         } catch (SSLException e) {
865             setHandshakeFailure(channel, e);
866             throw e;
867         } finally {
868             bufferPool.release(outNetBuf);
869         }
870 
871         if (future == null) {
872             future = succeededFuture(channel);
873         }
874 
875         return future;
876     }
877 
878     private ChannelBuffer unwrap(
879             ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, int offset, int length) throws SSLException {
880         ByteBuffer inNetBuf = buffer.toByteBuffer(offset, length);
881         ByteBuffer outAppBuf = bufferPool.acquire();
882 
883         try {
884             boolean needsWrap = false;
885             loop:
886             for (;;) {
887                 SSLEngineResult result;
888                 synchronized (handshakeLock) {
889                     if (!handshaken && !handshaking &&
890                         !engine.getUseClientMode() &&
891                         !engine.isInboundDone() && !engine.isOutboundDone()) {
892                         handshake();
893                     }
894 
895                     result = engine.unwrap(inNetBuf, outAppBuf);
896 
897                     final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
898                     handleRenegotiation(handshakeStatus);
899                     switch (handshakeStatus) {
900                     case NEED_UNWRAP:
901                         if (inNetBuf.hasRemaining() && !engine.isInboundDone()) {
902                             break;
903                         } else {
904                             break loop;
905                         }
906                     case NEED_WRAP:
907                         wrapNonAppData(ctx, channel);
908                         break;
909                     case NEED_TASK:
910                         runDelegatedTasks();
911                         break;
912                     case FINISHED:
913                         setHandshakeSuccess(channel);
914                         needsWrap = true;
915                         break loop;
916                     case NOT_HANDSHAKING:
917                         needsWrap = true;
918                         break loop;
919                     default:
920                         throw new IllegalStateException(
921                                 "Unknown handshake status: " + handshakeStatus);
922                     }
923                 }
924             }
925 
926             if (needsWrap) {
927                 // wrap() acquires pendingUnencryptedWrites first and then
928                 // handshakeLock.  If handshakeLock is already hold by the
929                 // current thread, calling wrap() will lead to a dead lock
930                 // i.e. pendingUnencryptedWrites -> handshakeLock vs.
931                 //      handshakeLock -> pendingUnencryptedLock -> handshakeLock
932                 //
933                 // There is also a same issue between pendingEncryptedWrites
934                 // and pendingUnencryptedWrites.
935                 if (!Thread.holdsLock(handshakeLock) &&
936                     !pendingEncryptedWritesLock.isHeldByCurrentThread()) {
937                     wrap(ctx, channel);
938                 }
939             }
940 
941             outAppBuf.flip();
942 
943             if (outAppBuf.hasRemaining()) {
944                 ChannelBuffer frame = ChannelBuffers.buffer(outAppBuf.remaining());
945                 frame.writeBytes(outAppBuf.array(), 0, frame.capacity());
946                 return frame;
947             } else {
948                 return null;
949             }
950         } catch (SSLException e) {
951             setHandshakeFailure(channel, e);
952             throw e;
953         } finally {
954             bufferPool.release(outAppBuf);
955         }
956     }
957 
958     private void handleRenegotiation(HandshakeStatus handshakeStatus) {
959         if (handshakeStatus == HandshakeStatus.NOT_HANDSHAKING ||
960             handshakeStatus == HandshakeStatus.FINISHED) {
961             // Not handshaking
962             return;
963         }
964 
965         if (!handshaken) {
966             // Not renegotiation
967             return;
968         }
969 
970         final boolean renegotiate;
971         synchronized (handshakeLock) {
972             if (handshaking) {
973                 // Renegotiation in progress or failed already.
974                 // i.e. Renegotiation check has been done already below.
975                 return;
976             }
977 
978             if (engine.isInboundDone() || engine.isOutboundDone()) {
979                 // Not handshaking but closing.
980                 return;
981             }
982 
983             if (isEnableRenegotiation()) {
984                 // Continue renegotiation.
985                 renegotiate = true;
986             } else {
987                 // Do not renegotiate.
988                 renegotiate = false;
989                 // Prevent reentrance of this method.
990                 handshaking = true;
991             }
992         }
993 
994         if (renegotiate) {
995             // Renegotiate.
996             handshake();
997         } else {
998             // Raise an exception.
999             fireExceptionCaught(
1000                     ctx, new SSLException(
1001                             "renegotiation attempted by peer; " +
1002                             "closing the connection"));
1003 
1004             // Close the connection to stop renegotiation.
1005             Channels.close(ctx, succeededFuture(ctx.getChannel()));
1006         }
1007     }
1008 
1009     private void runDelegatedTasks() {
1010         for (;;) {
1011             final Runnable task;
1012             synchronized (handshakeLock) {
1013                 task = engine.getDelegatedTask();
1014             }
1015 
1016             if (task == null) {
1017                 break;
1018             }
1019 
1020             delegatedTaskExecutor.execute(new Runnable() {
1021                 public void run() {
1022                     synchronized (handshakeLock) {
1023                         task.run();
1024                     }
1025                 }
1026             });
1027         }
1028     }
1029 
1030     private void setHandshakeSuccess(Channel channel) {
1031         synchronized (handshakeLock) {
1032             handshaking = false;
1033             handshaken = true;
1034 
1035             if (handshakeFuture == null) {
1036                 handshakeFuture = future(channel);
1037             }
1038         }
1039 
1040         handshakeFuture.setSuccess();
1041     }
1042 
1043     private void setHandshakeFailure(Channel channel, SSLException cause) {
1044         synchronized (handshakeLock) {
1045             if (!handshaking) {
1046                 return;
1047             }
1048             handshaking = false;
1049             handshaken = false;
1050 
1051             if (handshakeFuture == null) {
1052                 handshakeFuture = future(channel);
1053             }
1054 
1055             // Release all resources such as internal buffers that SSLEngine
1056             // is managing.
1057 
1058             engine.closeOutbound();
1059             
1060             try {
1061                 engine.closeInbound();
1062             } catch (SSLException e) {
1063                 logger.debug(
1064                         "SSLEngine.closeInbound() raised an exception after " +
1065                         "a handshake failure.", e);
1066             }
1067         }
1068         
1069         handshakeFuture.setFailure(cause);
1070     }
1071 
1072     private void closeOutboundAndChannel(
1073             final ChannelHandlerContext context, final ChannelStateEvent e) {
1074         if (!e.getChannel().isConnected()) {
1075             context.sendDownstream(e);
1076             return;
1077         }
1078 
1079         boolean success = false;
1080         try {
1081             try {
1082                 unwrap(context, e.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
1083             } catch (SSLException ex) {
1084                 logger.debug("Failed to unwrap before sending a close_notify message", ex);
1085             }
1086 
1087             if (!engine.isInboundDone()) {
1088                 if (sentCloseNotify.compareAndSet(false, true)) {
1089                     engine.closeOutbound();
1090                     try {
1091                         ChannelFuture closeNotifyFuture = wrapNonAppData(context, e.getChannel());
1092                         closeNotifyFuture.addListener(
1093                                 new ClosingChannelFutureListener(context, e));
1094                         success = true;
1095                     } catch (SSLException ex) {
1096                         logger.debug("Failed to encode a close_notify message", ex);
1097                     }
1098                 }
1099             } else {
1100                 success = true;
1101             }
1102         } finally {
1103             if (!success) {
1104                 context.sendDownstream(e);
1105             }
1106         }
1107     }
1108 
1109     private static final class PendingWrite {
1110         final ChannelFuture future;
1111         final ByteBuffer outAppBuf;
1112 
1113         PendingWrite(ChannelFuture future, ByteBuffer outAppBuf) {
1114             this.future = future;
1115             this.outAppBuf = outAppBuf;
1116         }
1117     }
1118 
1119     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
1120 
1121         private final ChannelHandlerContext context;
1122         private final ChannelStateEvent e;
1123 
1124         ClosingChannelFutureListener(
1125                 ChannelHandlerContext context, ChannelStateEvent e) {
1126             this.context = context;
1127             this.e = e;
1128         }
1129 
1130         public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
1131             if (!(closeNotifyFuture.getCause() instanceof ClosedChannelException)) {
1132                 Channels.close(context, e.getFuture());
1133             } else {
1134                 e.getFuture().setSuccess();
1135             }
1136         }
1137     }
1138 
1139     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
1140         this.ctx = ctx;
1141     }
1142 
1143     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
1144         // Unused
1145     }
1146 
1147     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
1148         // Unused
1149     }
1150 
1151     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
1152         // Unused
1153     }
1154 }