1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.ssl;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.nio.ByteBuffer;
22 import java.nio.channels.ClosedChannelException;
23 import java.util.LinkedList;
24 import java.util.Queue;
25 import java.util.concurrent.Executor;
26 import java.util.concurrent.atomic.AtomicBoolean;
27 import java.util.regex.Pattern;
28
29 import javax.net.ssl.SSLEngine;
30 import javax.net.ssl.SSLEngineResult;
31 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
32 import javax.net.ssl.SSLEngineResult.Status;
33 import javax.net.ssl.SSLException;
34
35 import org.jboss.netty.buffer.ChannelBuffer;
36 import org.jboss.netty.buffer.ChannelBuffers;
37 import org.jboss.netty.channel.Channel;
38 import org.jboss.netty.channel.ChannelDownstreamHandler;
39 import org.jboss.netty.channel.ChannelEvent;
40 import org.jboss.netty.channel.ChannelFuture;
41 import org.jboss.netty.channel.ChannelFutureListener;
42 import org.jboss.netty.channel.ChannelHandlerContext;
43 import org.jboss.netty.channel.ChannelPipeline;
44 import org.jboss.netty.channel.ChannelStateEvent;
45 import org.jboss.netty.channel.Channels;
46 import org.jboss.netty.channel.DownstreamMessageEvent;
47 import org.jboss.netty.channel.ExceptionEvent;
48 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
49 import org.jboss.netty.channel.MessageEvent;
50 import org.jboss.netty.handler.codec.frame.FrameDecoder;
51 import org.jboss.netty.logging.InternalLogger;
52 import org.jboss.netty.logging.InternalLoggerFactory;
53 import org.jboss.netty.util.internal.LinkedTransferQueue;
54 import org.jboss.netty.util.internal.NonReentrantLock;
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149 public class SslHandler extends FrameDecoder
150 implements ChannelDownstreamHandler,
151 LifeCycleAwareChannelHandler {
152
153 private static final InternalLogger logger =
154 InternalLoggerFactory.getInstance(SslHandler.class);
155
156 private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
157
158 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
159 "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
160 Pattern.CASE_INSENSITIVE);
161
162 private static SslBufferPool defaultBufferPool;
163
164
165
166
167
168 public static synchronized SslBufferPool getDefaultBufferPool() {
169 if (defaultBufferPool == null) {
170 defaultBufferPool = new SslBufferPool();
171 }
172 return defaultBufferPool;
173 }
174
175 private volatile ChannelHandlerContext ctx;
176 private final SSLEngine engine;
177 private final SslBufferPool bufferPool;
178 private final Executor delegatedTaskExecutor;
179 private final boolean startTls;
180
181 private volatile boolean enableRenegotiation = true;
182
183 final Object handshakeLock = new Object();
184 private boolean handshaking;
185 private volatile boolean handshaken;
186 private volatile ChannelFuture handshakeFuture;
187
188 private final AtomicBoolean sentFirstMessage = new AtomicBoolean();
189 private final AtomicBoolean sentCloseNotify = new AtomicBoolean();
190 int ignoreClosedChannelException;
191 final Object ignoreClosedChannelExceptionLock = new Object();
192 private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
193 private final Queue<MessageEvent> pendingEncryptedWrites = new LinkedTransferQueue<MessageEvent>();
194 private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();
195
196
197
198
199
200
201 public SslHandler(SSLEngine engine) {
202 this(engine, getDefaultBufferPool(), ImmediateExecutor.INSTANCE);
203 }
204
205
206
207
208
209
210
211
212 public SslHandler(SSLEngine engine, SslBufferPool bufferPool) {
213 this(engine, bufferPool, ImmediateExecutor.INSTANCE);
214 }
215
216
217
218
219
220
221
222
223 public SslHandler(SSLEngine engine, boolean startTls) {
224 this(engine, getDefaultBufferPool(), startTls);
225 }
226
227
228
229
230
231
232
233
234
235
236 public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls) {
237 this(engine, bufferPool, startTls, ImmediateExecutor.INSTANCE);
238 }
239
240
241
242
243
244
245
246
247
248
249 public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
250 this(engine, getDefaultBufferPool(), delegatedTaskExecutor);
251 }
252
253
254
255
256
257
258
259
260
261
262
263
264
265 public SslHandler(SSLEngine engine, SslBufferPool bufferPool, Executor delegatedTaskExecutor) {
266 this(engine, bufferPool, false, delegatedTaskExecutor);
267 }
268
269
270
271
272
273
274
275
276
277
278
279
280
281 public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
282 this(engine, getDefaultBufferPool(), startTls, delegatedTaskExecutor);
283 }
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300 public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor) {
301 if (engine == null) {
302 throw new NullPointerException("engine");
303 }
304 if (bufferPool == null) {
305 throw new NullPointerException("bufferPool");
306 }
307 if (delegatedTaskExecutor == null) {
308 throw new NullPointerException("delegatedTaskExecutor");
309 }
310 this.engine = engine;
311 this.bufferPool = bufferPool;
312 this.delegatedTaskExecutor = delegatedTaskExecutor;
313 this.startTls = startTls;
314 }
315
316
317
318
319 public SSLEngine getEngine() {
320 return engine;
321 }
322
323
324
325
326
327
328
329 public ChannelFuture handshake() {
330 if (handshaken && !isEnableRenegotiation()) {
331 throw new IllegalStateException("renegotiation disabled");
332 }
333
334 ChannelHandlerContext ctx = this.ctx;
335 Channel channel = ctx.getChannel();
336 ChannelFuture handshakeFuture;
337 Exception exception = null;
338
339 synchronized (handshakeLock) {
340 if (handshaking) {
341 return this.handshakeFuture;
342 } else {
343 handshaking = true;
344 try {
345 engine.beginHandshake();
346 runDelegatedTasks();
347 handshakeFuture = this.handshakeFuture = future(channel);
348 } catch (Exception e) {
349 handshakeFuture = this.handshakeFuture = failedFuture(channel, e);
350 exception = e;
351 }
352 }
353 }
354
355 if (exception == null) {
356 try {
357 wrapNonAppData(ctx, channel);
358 } catch (SSLException e) {
359 fireExceptionCaught(ctx, e);
360 handshakeFuture.setFailure(e);
361 }
362 } else {
363 fireExceptionCaught(ctx, exception);
364 }
365
366 return handshakeFuture;
367 }
368
369
370
371
372 @Deprecated
373 public ChannelFuture handshake(Channel channel) {
374 return handshake();
375 }
376
377
378
379
380
381 public ChannelFuture close() {
382 ChannelHandlerContext ctx = this.ctx;
383 Channel channel = ctx.getChannel();
384 try {
385 engine.closeOutbound();
386 return wrapNonAppData(ctx, channel);
387 } catch (SSLException e) {
388 fireExceptionCaught(ctx, e);
389 return failedFuture(channel, e);
390 }
391 }
392
393
394
395
396 @Deprecated
397 public ChannelFuture close(Channel channel) {
398 return close();
399 }
400
401
402
403
404 public boolean isEnableRenegotiation() {
405 return enableRenegotiation;
406 }
407
408
409
410
411 public void setEnableRenegotiation(boolean enableRenegotiation) {
412 this.enableRenegotiation = enableRenegotiation;
413 }
414
415 public void handleDownstream(
416 final ChannelHandlerContext context, final ChannelEvent evt) throws Exception {
417 if (evt instanceof ChannelStateEvent) {
418 ChannelStateEvent e = (ChannelStateEvent) evt;
419 switch (e.getState()) {
420 case OPEN:
421 case CONNECTED:
422 case BOUND:
423 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
424 closeOutboundAndChannel(context, e);
425 return;
426 }
427 }
428 }
429 if (!(evt instanceof MessageEvent)) {
430 context.sendDownstream(evt);
431 return;
432 }
433
434 MessageEvent e = (MessageEvent) evt;
435 if (!(e.getMessage() instanceof ChannelBuffer)) {
436 context.sendDownstream(evt);
437 return;
438 }
439
440
441
442 if (startTls && sentFirstMessage.compareAndSet(false, true)) {
443 context.sendDownstream(evt);
444 return;
445 }
446
447
448 ChannelBuffer msg = (ChannelBuffer) e.getMessage();
449 PendingWrite pendingWrite;
450
451 if (msg.readable()) {
452 pendingWrite = new PendingWrite(evt.getFuture(), msg.toByteBuffer(msg.readerIndex(), msg.readableBytes()));
453 } else {
454 pendingWrite = new PendingWrite(evt.getFuture(), null);
455 }
456 synchronized (pendingUnencryptedWrites) {
457 boolean offered = pendingUnencryptedWrites.offer(pendingWrite);
458 assert offered;
459 }
460
461 wrap(context, evt.getChannel());
462 }
463
464 @Override
465 public void channelDisconnected(ChannelHandlerContext ctx,
466 ChannelStateEvent e) throws Exception {
467
468
469
470 synchronized (handshakeLock) {
471 if (handshaking) {
472 handshakeFuture.setFailure(new ClosedChannelException());
473 }
474 }
475
476 try {
477 super.channelDisconnected(ctx, e);
478 } finally {
479 unwrap(ctx, e.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
480 engine.closeOutbound();
481 if (!sentCloseNotify.get() && handshaken) {
482 try {
483 engine.closeInbound();
484 } catch (SSLException ex) {
485 logger.debug("Failed to clean up SSLEngine.", ex);
486 }
487 }
488 }
489 }
490
491 @Override
492 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
493 throws Exception {
494
495 Throwable cause = e.getCause();
496 if (cause instanceof IOException) {
497 if (cause instanceof ClosedChannelException) {
498 synchronized (ignoreClosedChannelExceptionLock) {
499 if (ignoreClosedChannelException > 0) {
500 ignoreClosedChannelException --;
501 logger.debug(
502 "Swallowing an exception raised while " +
503 "writing non-app data", cause);
504 return;
505 }
506 }
507 } else if (engine.isOutboundDone()) {
508 String message = String.valueOf(cause.getMessage()).toLowerCase();
509 if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
510
511
512 logger.debug(
513 "Swallowing a 'connection reset by peer / " +
514 "broken pipe' error occurred while writing " +
515 "'closure_notify'", cause);
516
517
518
519 Channels.close(ctx, succeededFuture(e.getChannel()));
520 return;
521 }
522 }
523 }
524
525 ctx.sendUpstream(e);
526 }
527
528 @Override
529 protected Object decode(
530 final ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
531
532 if (buffer.readableBytes() < 5) {
533 return null;
534 }
535
536 int packetLength = 0;
537
538
539 boolean tls;
540 switch (buffer.getUnsignedByte(buffer.readerIndex())) {
541 case 20:
542 case 21:
543 case 22:
544 case 23:
545 tls = true;
546 break;
547 default:
548
549 tls = false;
550 }
551
552 if (tls) {
553
554 int majorVersion = buffer.getUnsignedByte(buffer.readerIndex() + 1);
555 if (majorVersion >= 3 && majorVersion < 10) {
556
557 packetLength = (getShort(buffer, buffer.readerIndex() + 3) & 0xFFFF) + 5;
558 if (packetLength <= 5) {
559
560 tls = false;
561 }
562 } else {
563
564 tls = false;
565 }
566 }
567
568 if (!tls) {
569
570 boolean sslv2 = true;
571 int headerLength = (buffer.getUnsignedByte(
572 buffer.readerIndex()) & 0x80) != 0 ? 2 : 3;
573 int majorVersion = buffer.getUnsignedByte(
574 buffer.readerIndex() + headerLength + 1);
575 if (majorVersion >= 2 && majorVersion < 10) {
576
577 if (headerLength == 2) {
578 packetLength = (getShort(buffer, buffer.readerIndex()) & 0x7FFF) + 2;
579 } else {
580 packetLength = (getShort(buffer, buffer.readerIndex()) & 0x3FFF) + 3;
581 }
582 if (packetLength <= headerLength) {
583 sslv2 = false;
584 }
585 } else {
586 sslv2 = false;
587 }
588
589 if (!sslv2) {
590
591 SSLException e = new SSLException(
592 "not an SSL/TLS record: " + ChannelBuffers.hexDump(buffer));
593 buffer.skipBytes(buffer.readableBytes());
594 throw e;
595 }
596 }
597
598 assert packetLength > 0;
599
600 if (buffer.readableBytes() < packetLength) {
601 return null;
602 }
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618 final int packetOffset = buffer.readerIndex();
619 buffer.skipBytes(packetLength);
620 return unwrap(ctx, channel, buffer, packetOffset, packetLength);
621 }
622
623
624
625
626
627 private static short getShort(ChannelBuffer buf, int offset) {
628 return (short) (buf.getByte(offset) << 8 | buf.getByte(offset + 1) & 0xFF);
629 }
630
631 private ChannelFuture wrap(ChannelHandlerContext context, Channel channel)
632 throws SSLException {
633
634 ChannelFuture future = null;
635 ChannelBuffer msg;
636 ByteBuffer outNetBuf = bufferPool.acquire();
637 boolean success = true;
638 boolean offered = false;
639 boolean needsUnwrap = false;
640 try {
641 loop:
642 for (;;) {
643
644
645
646 synchronized (pendingUnencryptedWrites) {
647 PendingWrite pendingWrite = pendingUnencryptedWrites.peek();
648 if (pendingWrite == null) {
649 break;
650 }
651
652 ByteBuffer outAppBuf = pendingWrite.outAppBuf;
653 if (outAppBuf == null) {
654
655 pendingUnencryptedWrites.remove();
656 offerEncryptedWriteRequest(
657 new DownstreamMessageEvent(
658 channel, pendingWrite.future,
659 ChannelBuffers.EMPTY_BUFFER,
660 channel.getRemoteAddress()));
661 offered = true;
662 } else {
663 SSLEngineResult result = null;
664 try {
665 synchronized (handshakeLock) {
666 result = engine.wrap(outAppBuf, outNetBuf);
667 }
668 } finally {
669 if (!outAppBuf.hasRemaining()) {
670 pendingUnencryptedWrites.remove();
671 }
672 }
673
674 if (result.bytesProduced() > 0) {
675 outNetBuf.flip();
676 msg = ChannelBuffers.buffer(outNetBuf.remaining());
677 msg.writeBytes(outNetBuf.array(), 0, msg.capacity());
678 outNetBuf.clear();
679
680 if (pendingWrite.outAppBuf.hasRemaining()) {
681
682
683 future = succeededFuture(channel);
684 } else {
685 future = pendingWrite.future;
686 }
687
688 MessageEvent encryptedWrite = new DownstreamMessageEvent(
689 channel, future, msg, channel.getRemoteAddress());
690 offerEncryptedWriteRequest(encryptedWrite);
691 offered = true;
692 } else if (result.getStatus() == Status.CLOSED) {
693
694
695 success = false;
696 break;
697 } else {
698 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
699 handleRenegotiation(handshakeStatus);
700 switch (handshakeStatus) {
701 case NEED_WRAP:
702 if (outAppBuf.hasRemaining()) {
703 break;
704 } else {
705 break loop;
706 }
707 case NEED_UNWRAP:
708 needsUnwrap = true;
709 break loop;
710 case NEED_TASK:
711 runDelegatedTasks();
712 break;
713 case FINISHED:
714 case NOT_HANDSHAKING:
715 if (handshakeStatus == HandshakeStatus.FINISHED) {
716 setHandshakeSuccess(channel);
717 }
718 if (result.getStatus() == Status.CLOSED) {
719 success = false;
720 }
721 break loop;
722 default:
723 throw new IllegalStateException(
724 "Unknown handshake status: " +
725 handshakeStatus);
726 }
727 }
728 }
729 }
730 }
731 } catch (SSLException e) {
732 success = false;
733 setHandshakeFailure(channel, e);
734 throw e;
735 } finally {
736 bufferPool.release(outNetBuf);
737
738 if (offered) {
739 flushPendingEncryptedWrites(context);
740 }
741
742 if (!success) {
743 IllegalStateException cause =
744 new IllegalStateException("SSLEngine already closed");
745
746
747
748
749 for (;;) {
750 PendingWrite pendingWrite;
751 synchronized (pendingUnencryptedWrites) {
752 pendingWrite = pendingUnencryptedWrites.poll();
753 if (pendingWrite == null) {
754 break;
755 }
756 }
757
758 pendingWrite.future.setFailure(cause);
759 }
760 }
761 }
762
763 if (needsUnwrap) {
764 unwrap(context, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0);
765 }
766
767 if (future == null) {
768 future = succeededFuture(channel);
769 }
770 return future;
771 }
772
773 private void offerEncryptedWriteRequest(MessageEvent encryptedWrite) {
774 final boolean locked = pendingEncryptedWritesLock.tryLock();
775 try {
776 pendingEncryptedWrites.offer(encryptedWrite);
777 } finally {
778 if (locked) {
779 pendingEncryptedWritesLock.unlock();
780 }
781 }
782 }
783
784 private void flushPendingEncryptedWrites(ChannelHandlerContext ctx) {
785
786
787
788 if (!pendingEncryptedWritesLock.tryLock()) {
789 return;
790 }
791
792 try {
793 MessageEvent e;
794 while ((e = pendingEncryptedWrites.poll()) != null) {
795 ctx.sendDownstream(e);
796 }
797 } finally {
798 pendingEncryptedWritesLock.unlock();
799 }
800 }
801
802 private ChannelFuture wrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException {
803 ChannelFuture future = null;
804 ByteBuffer outNetBuf = bufferPool.acquire();
805
806 SSLEngineResult result;
807 try {
808 for (;;) {
809 synchronized (handshakeLock) {
810 result = engine.wrap(EMPTY_BUFFER, outNetBuf);
811 }
812
813 if (result.bytesProduced() > 0) {
814 outNetBuf.flip();
815 ChannelBuffer msg = ChannelBuffers.buffer(outNetBuf.remaining());
816 msg.writeBytes(outNetBuf.array(), 0, msg.capacity());
817 outNetBuf.clear();
818
819 future = future(channel);
820 future.addListener(new ChannelFutureListener() {
821 public void operationComplete(ChannelFuture future)
822 throws Exception {
823 if (future.getCause() instanceof ClosedChannelException) {
824 synchronized (ignoreClosedChannelExceptionLock) {
825 ignoreClosedChannelException ++;
826 }
827 }
828 }
829 });
830
831 write(ctx, future, msg);
832 }
833
834 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
835 handleRenegotiation(handshakeStatus);
836 switch (handshakeStatus) {
837 case FINISHED:
838 setHandshakeSuccess(channel);
839 runDelegatedTasks();
840 break;
841 case NEED_TASK:
842 runDelegatedTasks();
843 break;
844 case NEED_UNWRAP:
845 if (!Thread.holdsLock(handshakeLock)) {
846
847
848
849 unwrap(ctx, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0);
850 }
851 break;
852 case NOT_HANDSHAKING:
853 case NEED_WRAP:
854 break;
855 default:
856 throw new IllegalStateException(
857 "Unexpected handshake status: " + handshakeStatus);
858 }
859
860 if (result.bytesProduced() == 0) {
861 break;
862 }
863 }
864 } catch (SSLException e) {
865 setHandshakeFailure(channel, e);
866 throw e;
867 } finally {
868 bufferPool.release(outNetBuf);
869 }
870
871 if (future == null) {
872 future = succeededFuture(channel);
873 }
874
875 return future;
876 }
877
878 private ChannelBuffer unwrap(
879 ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, int offset, int length) throws SSLException {
880 ByteBuffer inNetBuf = buffer.toByteBuffer(offset, length);
881 ByteBuffer outAppBuf = bufferPool.acquire();
882
883 try {
884 boolean needsWrap = false;
885 loop:
886 for (;;) {
887 SSLEngineResult result;
888 synchronized (handshakeLock) {
889 if (!handshaken && !handshaking &&
890 !engine.getUseClientMode() &&
891 !engine.isInboundDone() && !engine.isOutboundDone()) {
892 handshake();
893 }
894
895 result = engine.unwrap(inNetBuf, outAppBuf);
896
897 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
898 handleRenegotiation(handshakeStatus);
899 switch (handshakeStatus) {
900 case NEED_UNWRAP:
901 if (inNetBuf.hasRemaining() && !engine.isInboundDone()) {
902 break;
903 } else {
904 break loop;
905 }
906 case NEED_WRAP:
907 wrapNonAppData(ctx, channel);
908 break;
909 case NEED_TASK:
910 runDelegatedTasks();
911 break;
912 case FINISHED:
913 setHandshakeSuccess(channel);
914 needsWrap = true;
915 break loop;
916 case NOT_HANDSHAKING:
917 needsWrap = true;
918 break loop;
919 default:
920 throw new IllegalStateException(
921 "Unknown handshake status: " + handshakeStatus);
922 }
923 }
924 }
925
926 if (needsWrap) {
927
928
929
930
931
932
933
934
935 if (!Thread.holdsLock(handshakeLock) &&
936 !pendingEncryptedWritesLock.isHeldByCurrentThread()) {
937 wrap(ctx, channel);
938 }
939 }
940
941 outAppBuf.flip();
942
943 if (outAppBuf.hasRemaining()) {
944 ChannelBuffer frame = ChannelBuffers.buffer(outAppBuf.remaining());
945 frame.writeBytes(outAppBuf.array(), 0, frame.capacity());
946 return frame;
947 } else {
948 return null;
949 }
950 } catch (SSLException e) {
951 setHandshakeFailure(channel, e);
952 throw e;
953 } finally {
954 bufferPool.release(outAppBuf);
955 }
956 }
957
958 private void handleRenegotiation(HandshakeStatus handshakeStatus) {
959 if (handshakeStatus == HandshakeStatus.NOT_HANDSHAKING ||
960 handshakeStatus == HandshakeStatus.FINISHED) {
961
962 return;
963 }
964
965 if (!handshaken) {
966
967 return;
968 }
969
970 final boolean renegotiate;
971 synchronized (handshakeLock) {
972 if (handshaking) {
973
974
975 return;
976 }
977
978 if (engine.isInboundDone() || engine.isOutboundDone()) {
979
980 return;
981 }
982
983 if (isEnableRenegotiation()) {
984
985 renegotiate = true;
986 } else {
987
988 renegotiate = false;
989
990 handshaking = true;
991 }
992 }
993
994 if (renegotiate) {
995
996 handshake();
997 } else {
998
999 fireExceptionCaught(
1000 ctx, new SSLException(
1001 "renegotiation attempted by peer; " +
1002 "closing the connection"));
1003
1004
1005 Channels.close(ctx, succeededFuture(ctx.getChannel()));
1006 }
1007 }
1008
1009 private void runDelegatedTasks() {
1010 for (;;) {
1011 final Runnable task;
1012 synchronized (handshakeLock) {
1013 task = engine.getDelegatedTask();
1014 }
1015
1016 if (task == null) {
1017 break;
1018 }
1019
1020 delegatedTaskExecutor.execute(new Runnable() {
1021 public void run() {
1022 synchronized (handshakeLock) {
1023 task.run();
1024 }
1025 }
1026 });
1027 }
1028 }
1029
1030 private void setHandshakeSuccess(Channel channel) {
1031 synchronized (handshakeLock) {
1032 handshaking = false;
1033 handshaken = true;
1034
1035 if (handshakeFuture == null) {
1036 handshakeFuture = future(channel);
1037 }
1038 }
1039
1040 handshakeFuture.setSuccess();
1041 }
1042
1043 private void setHandshakeFailure(Channel channel, SSLException cause) {
1044 synchronized (handshakeLock) {
1045 if (!handshaking) {
1046 return;
1047 }
1048 handshaking = false;
1049 handshaken = false;
1050
1051 if (handshakeFuture == null) {
1052 handshakeFuture = future(channel);
1053 }
1054
1055
1056
1057
1058 engine.closeOutbound();
1059
1060 try {
1061 engine.closeInbound();
1062 } catch (SSLException e) {
1063 logger.debug(
1064 "SSLEngine.closeInbound() raised an exception after " +
1065 "a handshake failure.", e);
1066 }
1067 }
1068
1069 handshakeFuture.setFailure(cause);
1070 }
1071
1072 private void closeOutboundAndChannel(
1073 final ChannelHandlerContext context, final ChannelStateEvent e) {
1074 if (!e.getChannel().isConnected()) {
1075 context.sendDownstream(e);
1076 return;
1077 }
1078
1079 boolean success = false;
1080 try {
1081 try {
1082 unwrap(context, e.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
1083 } catch (SSLException ex) {
1084 logger.debug("Failed to unwrap before sending a close_notify message", ex);
1085 }
1086
1087 if (!engine.isInboundDone()) {
1088 if (sentCloseNotify.compareAndSet(false, true)) {
1089 engine.closeOutbound();
1090 try {
1091 ChannelFuture closeNotifyFuture = wrapNonAppData(context, e.getChannel());
1092 closeNotifyFuture.addListener(
1093 new ClosingChannelFutureListener(context, e));
1094 success = true;
1095 } catch (SSLException ex) {
1096 logger.debug("Failed to encode a close_notify message", ex);
1097 }
1098 }
1099 } else {
1100 success = true;
1101 }
1102 } finally {
1103 if (!success) {
1104 context.sendDownstream(e);
1105 }
1106 }
1107 }
1108
1109 private static final class PendingWrite {
1110 final ChannelFuture future;
1111 final ByteBuffer outAppBuf;
1112
1113 PendingWrite(ChannelFuture future, ByteBuffer outAppBuf) {
1114 this.future = future;
1115 this.outAppBuf = outAppBuf;
1116 }
1117 }
1118
1119 private static final class ClosingChannelFutureListener implements ChannelFutureListener {
1120
1121 private final ChannelHandlerContext context;
1122 private final ChannelStateEvent e;
1123
1124 ClosingChannelFutureListener(
1125 ChannelHandlerContext context, ChannelStateEvent e) {
1126 this.context = context;
1127 this.e = e;
1128 }
1129
1130 public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
1131 if (!(closeNotifyFuture.getCause() instanceof ClosedChannelException)) {
1132 Channels.close(context, e.getFuture());
1133 } else {
1134 e.getFuture().setSuccess();
1135 }
1136 }
1137 }
1138
1139 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
1140 this.ctx = ctx;
1141 }
1142
1143 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
1144
1145 }
1146
1147 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
1148
1149 }
1150
1151 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
1152
1153 }
1154 }