View Javadoc

1   /*
2    * Copyright 2009 Red Hat, Inc.
3    *
4    * Red Hat licenses this file to you under the Apache License, version 2.0
5    * (the "License"); you may not use this file except in compliance with the
6    * License.  You may obtain a copy of the License at:
7    *
8    *    http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.group;
17  
18  import java.net.SocketAddress;
19  import java.util.AbstractSet;
20  import java.util.ArrayList;
21  import java.util.Collection;
22  import java.util.Iterator;
23  import java.util.LinkedHashMap;
24  import java.util.Map;
25  import java.util.concurrent.ConcurrentMap;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import org.jboss.netty.buffer.ChannelBuffer;
29  import org.jboss.netty.channel.Channel;
30  import org.jboss.netty.channel.ChannelFuture;
31  import org.jboss.netty.channel.ChannelFutureListener;
32  import org.jboss.netty.channel.ServerChannel;
33  import org.jboss.netty.util.internal.ConcurrentHashMap;
34  
35  /**
36   * The default {@link ChannelGroup} implementation.
37   *
38   * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
39   * @author <a href="http://gleamynode.net/">Trustin Lee</a>
40   * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
41   *
42   * @apiviz.landmark
43   */
44  public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup {
45  
46      private static final AtomicInteger nextId = new AtomicInteger();
47  
48      private final String name;
49      private final ConcurrentMap<Integer, Channel> serverChannels = new ConcurrentHashMap<Integer, Channel>();
50      private final ConcurrentMap<Integer, Channel> nonServerChannels = new ConcurrentHashMap<Integer, Channel>();
51      private final ChannelFutureListener remover = new ChannelFutureListener() {
52          public void operationComplete(ChannelFuture future) throws Exception {
53              remove(future.getChannel());
54          }
55      };
56  
57      /**
58       * Creates a new group with a generated name.
59       */
60      public DefaultChannelGroup() {
61          this("group-0x" + Integer.toHexString(nextId.incrementAndGet()));
62      }
63  
64      /**
65       * Creates a new group with the specified {@code name}.  Please note that
66       * different groups can have the same name, which means no duplicate check
67       * is done against group names.
68       */
69      public DefaultChannelGroup(String name) {
70          if (name == null) {
71              throw new NullPointerException("name");
72          }
73          this.name = name;
74      }
75  
76      public String getName() {
77          return name;
78      }
79  
80      @Override
81      public boolean isEmpty() {
82          return nonServerChannels.isEmpty() && serverChannels.isEmpty();
83      }
84  
85      @Override
86      public int size() {
87          return nonServerChannels.size() + serverChannels.size();
88      }
89  
90      public Channel find(Integer id) {
91          Channel c = nonServerChannels.get(id);
92          if (c != null) {
93              return c;
94          } else {
95              return serverChannels.get(id);
96          }
97      }
98  
99      @Override
100     public boolean contains(Object o) {
101         if (o instanceof Integer) {
102             return nonServerChannels.containsKey(o) || serverChannels.containsKey(o);
103         } else if (o instanceof Channel) {
104             Channel c = (Channel) o;
105             if (o instanceof ServerChannel) {
106                 return serverChannels.containsKey(c.getId());
107             } else {
108                 return nonServerChannels.containsKey(c.getId());
109             }
110         } else {
111             return false;
112         }
113     }
114 
115     @Override
116     public boolean add(Channel channel) {
117         ConcurrentMap<Integer, Channel> map =
118             channel instanceof ServerChannel? serverChannels : nonServerChannels;
119 
120         boolean added = map.putIfAbsent(channel.getId(), channel) == null;
121         if (added) {
122             channel.getCloseFuture().addListener(remover);
123         }
124         return added;
125     }
126 
127     @Override
128     public boolean remove(Object o) {
129         Channel c = null;
130         if (o instanceof Integer) {
131             c = nonServerChannels.remove(o);
132             if (c == null) {
133                 c = serverChannels.remove(o);
134             }
135         } else if (o instanceof Channel) {
136             c = (Channel) o;
137             if (c instanceof ServerChannel) {
138                 c = serverChannels.remove(c.getId());
139             } else {
140                 c = nonServerChannels.remove(c.getId());
141             }
142         }
143 
144         if (c == null) {
145             return false;
146         }
147 
148         c.getCloseFuture().removeListener(remover);
149         return true;
150     }
151 
152     @Override
153     public void clear() {
154         nonServerChannels.clear();
155         serverChannels.clear();
156     }
157 
158     @Override
159     public Iterator<Channel> iterator() {
160         return new CombinedIterator<Channel>(
161                 serverChannels.values().iterator(),
162                 nonServerChannels.values().iterator());
163     }
164 
165     @Override
166     public Object[] toArray() {
167         Collection<Channel> channels = new ArrayList<Channel>(size());
168         channels.addAll(serverChannels.values());
169         channels.addAll(nonServerChannels.values());
170         return channels.toArray();
171     }
172 
173     @Override
174     public <T> T[] toArray(T[] a) {
175         Collection<Channel> channels = new ArrayList<Channel>(size());
176         channels.addAll(serverChannels.values());
177         channels.addAll(nonServerChannels.values());
178         return channels.toArray(a);
179     }
180 
181     public ChannelGroupFuture close() {
182         Map<Integer, ChannelFuture> futures =
183             new LinkedHashMap<Integer, ChannelFuture>(size());
184 
185         for (Channel c: serverChannels.values()) {
186             futures.put(c.getId(), c.close().awaitUninterruptibly());
187         }
188         for (Channel c: nonServerChannels.values()) {
189             futures.put(c.getId(), c.close());
190         }
191 
192         return new DefaultChannelGroupFuture(this, futures);
193     }
194 
195     public ChannelGroupFuture disconnect() {
196         Map<Integer, ChannelFuture> futures =
197             new LinkedHashMap<Integer, ChannelFuture>(size());
198 
199         for (Channel c: serverChannels.values()) {
200             futures.put(c.getId(), c.disconnect().awaitUninterruptibly());
201         }
202         for (Channel c: nonServerChannels.values()) {
203             futures.put(c.getId(), c.disconnect());
204         }
205 
206         return new DefaultChannelGroupFuture(this, futures);
207     }
208 
209     public ChannelGroupFuture setInterestOps(int interestOps) {
210         Map<Integer, ChannelFuture> futures =
211             new LinkedHashMap<Integer, ChannelFuture>(size());
212 
213         for (Channel c: serverChannels.values()) {
214             futures.put(c.getId(), c.setInterestOps(interestOps).awaitUninterruptibly());
215         }
216         for (Channel c: nonServerChannels.values()) {
217             futures.put(c.getId(), c.setInterestOps(interestOps));
218         }
219 
220         return new DefaultChannelGroupFuture(this, futures);
221     }
222 
223     public ChannelGroupFuture setReadable(boolean readable) {
224         Map<Integer, ChannelFuture> futures =
225             new LinkedHashMap<Integer, ChannelFuture>(size());
226 
227         for (Channel c: serverChannels.values()) {
228             futures.put(c.getId(), c.setReadable(readable).awaitUninterruptibly());
229         }
230         for (Channel c: nonServerChannels.values()) {
231             futures.put(c.getId(), c.setReadable(readable));
232         }
233 
234         return new DefaultChannelGroupFuture(this, futures);
235     }
236 
237     public ChannelGroupFuture unbind() {
238         Map<Integer, ChannelFuture> futures =
239             new LinkedHashMap<Integer, ChannelFuture>(size());
240 
241         for (Channel c: serverChannels.values()) {
242             futures.put(c.getId(), c.unbind().awaitUninterruptibly());
243         }
244         for (Channel c: nonServerChannels.values()) {
245             futures.put(c.getId(), c.unbind());
246         }
247 
248         return new DefaultChannelGroupFuture(this, futures);
249     }
250 
251     public ChannelGroupFuture write(Object message) {
252         Map<Integer, ChannelFuture> futures =
253             new LinkedHashMap<Integer, ChannelFuture>(size());
254         if (message instanceof ChannelBuffer) {
255             ChannelBuffer buf = (ChannelBuffer) message;
256             for (Channel c: nonServerChannels.values()) {
257                 futures.put(c.getId(), c.write(buf.duplicate()));
258             }
259         } else {
260             for (Channel c: nonServerChannels.values()) {
261                 futures.put(c.getId(), c.write(message));
262             }
263         }
264         return new DefaultChannelGroupFuture(this, futures);
265     }
266 
267     public ChannelGroupFuture write(Object message, SocketAddress remoteAddress) {
268         Map<Integer, ChannelFuture> futures =
269             new LinkedHashMap<Integer, ChannelFuture>(size());
270         if (message instanceof ChannelBuffer) {
271             ChannelBuffer buf = (ChannelBuffer) message;
272             for (Channel c: nonServerChannels.values()) {
273                 futures.put(c.getId(), c.write(buf.duplicate(), remoteAddress));
274             }
275         } else {
276             for (Channel c: nonServerChannels.values()) {
277                 futures.put(c.getId(), c.write(message, remoteAddress));
278             }
279         }
280         return new DefaultChannelGroupFuture(this, futures);
281     }
282 
283     @Override
284     public int hashCode() {
285         return System.identityHashCode(this);
286     }
287 
288     @Override
289     public boolean equals(Object o) {
290         return this == o;
291     }
292 
293     public int compareTo(ChannelGroup o) {
294         int v = getName().compareTo(o.getName());
295         if (v != 0) {
296             return v;
297         }
298 
299         return System.identityHashCode(this) - System.identityHashCode(o);
300     }
301 
302     @Override
303     public String toString() {
304         return getClass().getSimpleName() +
305                "(name: " + getName() + ", size: " + size() + ')';
306     }
307 }