View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.codec.serialization;
17  
18  import java.io.ObjectInputStream;
19  import java.io.ObjectOutputStream;
20  import java.io.OutputStream;
21  import java.util.concurrent.atomic.AtomicReference;
22  
23  import org.jboss.netty.buffer.ChannelBuffer;
24  import org.jboss.netty.buffer.ChannelBufferFactory;
25  import org.jboss.netty.buffer.ChannelBufferOutputStream;
26  import org.jboss.netty.buffer.ChannelBuffers;
27  import org.jboss.netty.channel.Channel;
28  import org.jboss.netty.channel.ChannelHandlerContext;
29  import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
30  
31  /**
32   * An encoder which serializes a Java object into a {@link ChannelBuffer}
33   * (interoperability version).
34   * <p>
35   * This encoder is interoperable with the standard Java object streams such as
36   * {@link ObjectInputStream} and {@link ObjectOutputStream}.
37   *
38   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
39   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
40   *
41   * @version $Rev:231 $, $Date:2008-06-12 16:44:50 +0900 (목, 12 6월 2008) $
42   */
43  public class CompatibleObjectEncoder extends OneToOneEncoder {
44  
45      private final AtomicReference<ChannelBuffer> buffer =
46          new AtomicReference<ChannelBuffer>();
47      private final int resetInterval;
48      private volatile ObjectOutputStream oout;
49      private int writtenObjects;
50  
51      /**
52       * Creates a new instance with the reset interval of {@code 16}.
53       */
54      public CompatibleObjectEncoder() {
55          this(16); // Reset at every sixteen writes
56      }
57  
58      /**
59       * Creates a new instance.
60       *
61       * @param resetInterval
62       *        the number of objects between {@link ObjectOutputStream#reset()}.
63       *        {@code 0} will disable resetting the stream, but the remote
64       *        peer will be at the risk of getting {@link OutOfMemoryError} in
65       *        the long term.
66       */
67      public CompatibleObjectEncoder(int resetInterval) {
68          if (resetInterval < 0) {
69              throw new IllegalArgumentException(
70                      "resetInterval: " + resetInterval);
71          }
72          this.resetInterval = resetInterval;
73      }
74  
75      /**
76       * Creates a new {@link ObjectOutputStream} which wraps the specified
77       * {@link OutputStream}.  Override this method to use a subclass of the
78       * {@link ObjectOutputStream}.
79       */
80      protected ObjectOutputStream newObjectOutputStream(OutputStream out) throws Exception {
81          return new ObjectOutputStream(out);
82      }
83  
84      @Override
85      protected Object encode(ChannelHandlerContext context, Channel channel, Object msg) throws Exception {
86          ChannelBuffer buffer = buffer(context);
87          ObjectOutputStream oout = this.oout;
88          if (resetInterval != 0) {
89              // Resetting will prevent OOM on the receiving side.
90              writtenObjects ++;
91              if (writtenObjects % resetInterval == 0) {
92                  oout.reset();
93  
94                  // Also discard the byproduct to avoid OOM on the sending side.
95                  buffer.discardReadBytes();
96              }
97          }
98          oout.writeObject(msg);
99          oout.flush();
100 
101         ChannelBuffer encoded = buffer.readBytes(buffer.readableBytes());
102         return encoded;
103     }
104 
105     private ChannelBuffer buffer(ChannelHandlerContext ctx) throws Exception {
106         ChannelBuffer buf = buffer.get();
107         if (buf == null) {
108             ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
109             buf = ChannelBuffers.dynamicBuffer(factory);
110             if (buffer.compareAndSet(null, buf)) {
111                 boolean success = false;
112                 try {
113                     oout = newObjectOutputStream(new ChannelBufferOutputStream(buf));
114                     success = true;
115                 } finally {
116                     if (!success) {
117                         oout = null;
118                     }
119                 }
120             } else {
121                 buf = buffer.get();
122             }
123         }
124         return buf;
125     }
126 }