1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.group;
17
18 import java.net.SocketAddress;
19 import java.util.AbstractSet;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.Iterator;
23 import java.util.LinkedHashMap;
24 import java.util.Map;
25 import java.util.concurrent.ConcurrentMap;
26 import java.util.concurrent.atomic.AtomicInteger;
27
28 import org.jboss.netty.buffer.ChannelBuffer;
29 import org.jboss.netty.channel.Channel;
30 import org.jboss.netty.channel.ChannelFuture;
31 import org.jboss.netty.channel.ChannelFutureListener;
32 import org.jboss.netty.channel.ServerChannel;
33 import org.jboss.netty.util.internal.ConcurrentHashMap;
34
35
36
37
38
39
40
41
42
43
44 public class DefaultChannelGroup extends AbstractSet<Channel> implements ChannelGroup {
45
46 private static final AtomicInteger nextId = new AtomicInteger();
47
48 private final String name;
49 private final ConcurrentMap<Integer, Channel> serverChannels = new ConcurrentHashMap<Integer, Channel>();
50 private final ConcurrentMap<Integer, Channel> nonServerChannels = new ConcurrentHashMap<Integer, Channel>();
51 private final ChannelFutureListener remover = new ChannelFutureListener() {
52 public void operationComplete(ChannelFuture future) throws Exception {
53 remove(future.getChannel());
54 }
55 };
56
57
58
59
60 public DefaultChannelGroup() {
61 this("group-0x" + Integer.toHexString(nextId.incrementAndGet()));
62 }
63
64
65
66
67
68
69 public DefaultChannelGroup(String name) {
70 if (name == null) {
71 throw new NullPointerException("name");
72 }
73 this.name = name;
74 }
75
76 public String getName() {
77 return name;
78 }
79
80 @Override
81 public boolean isEmpty() {
82 return nonServerChannels.isEmpty() && serverChannels.isEmpty();
83 }
84
85 @Override
86 public int size() {
87 return nonServerChannels.size() + serverChannels.size();
88 }
89
90 public Channel find(Integer id) {
91 Channel c = nonServerChannels.get(id);
92 if (c != null) {
93 return c;
94 } else {
95 return serverChannels.get(id);
96 }
97 }
98
99 @Override
100 public boolean contains(Object o) {
101 if (o instanceof Integer) {
102 return nonServerChannels.containsKey(o) || serverChannels.containsKey(o);
103 } else if (o instanceof Channel) {
104 Channel c = (Channel) o;
105 if (o instanceof ServerChannel) {
106 return serverChannels.containsKey(c.getId());
107 } else {
108 return nonServerChannels.containsKey(c.getId());
109 }
110 } else {
111 return false;
112 }
113 }
114
115 @Override
116 public boolean add(Channel channel) {
117 ConcurrentMap<Integer, Channel> map =
118 channel instanceof ServerChannel? serverChannels : nonServerChannels;
119
120 boolean added = map.putIfAbsent(channel.getId(), channel) == null;
121 if (added) {
122 channel.getCloseFuture().addListener(remover);
123 }
124 return added;
125 }
126
127 @Override
128 public boolean remove(Object o) {
129 Channel c = null;
130 if (o instanceof Integer) {
131 c = nonServerChannels.remove(o);
132 if (c == null) {
133 c = serverChannels.remove(o);
134 }
135 } else if (o instanceof Channel) {
136 c = (Channel) o;
137 if (c instanceof ServerChannel) {
138 c = serverChannels.remove(c.getId());
139 } else {
140 c = nonServerChannels.remove(c.getId());
141 }
142 }
143
144 if (c == null) {
145 return false;
146 }
147
148 c.getCloseFuture().removeListener(remover);
149 return true;
150 }
151
152 @Override
153 public void clear() {
154 nonServerChannels.clear();
155 serverChannels.clear();
156 }
157
158 @Override
159 public Iterator<Channel> iterator() {
160 return new CombinedIterator<Channel>(
161 serverChannels.values().iterator(),
162 nonServerChannels.values().iterator());
163 }
164
165 @Override
166 public Object[] toArray() {
167 Collection<Channel> channels = new ArrayList<Channel>(size());
168 channels.addAll(serverChannels.values());
169 channels.addAll(nonServerChannels.values());
170 return channels.toArray();
171 }
172
173 @Override
174 public <T> T[] toArray(T[] a) {
175 Collection<Channel> channels = new ArrayList<Channel>(size());
176 channels.addAll(serverChannels.values());
177 channels.addAll(nonServerChannels.values());
178 return channels.toArray(a);
179 }
180
181 public ChannelGroupFuture close() {
182 Map<Integer, ChannelFuture> futures =
183 new LinkedHashMap<Integer, ChannelFuture>(size());
184
185 for (Channel c: serverChannels.values()) {
186 futures.put(c.getId(), c.close().awaitUninterruptibly());
187 }
188 for (Channel c: nonServerChannels.values()) {
189 futures.put(c.getId(), c.close());
190 }
191
192 return new DefaultChannelGroupFuture(this, futures);
193 }
194
195 public ChannelGroupFuture disconnect() {
196 Map<Integer, ChannelFuture> futures =
197 new LinkedHashMap<Integer, ChannelFuture>(size());
198
199 for (Channel c: serverChannels.values()) {
200 futures.put(c.getId(), c.disconnect().awaitUninterruptibly());
201 }
202 for (Channel c: nonServerChannels.values()) {
203 futures.put(c.getId(), c.disconnect());
204 }
205
206 return new DefaultChannelGroupFuture(this, futures);
207 }
208
209 public ChannelGroupFuture setInterestOps(int interestOps) {
210 Map<Integer, ChannelFuture> futures =
211 new LinkedHashMap<Integer, ChannelFuture>(size());
212
213 for (Channel c: serverChannels.values()) {
214 futures.put(c.getId(), c.setInterestOps(interestOps).awaitUninterruptibly());
215 }
216 for (Channel c: nonServerChannels.values()) {
217 futures.put(c.getId(), c.setInterestOps(interestOps));
218 }
219
220 return new DefaultChannelGroupFuture(this, futures);
221 }
222
223 public ChannelGroupFuture setReadable(boolean readable) {
224 Map<Integer, ChannelFuture> futures =
225 new LinkedHashMap<Integer, ChannelFuture>(size());
226
227 for (Channel c: serverChannels.values()) {
228 futures.put(c.getId(), c.setReadable(readable).awaitUninterruptibly());
229 }
230 for (Channel c: nonServerChannels.values()) {
231 futures.put(c.getId(), c.setReadable(readable));
232 }
233
234 return new DefaultChannelGroupFuture(this, futures);
235 }
236
237 public ChannelGroupFuture unbind() {
238 Map<Integer, ChannelFuture> futures =
239 new LinkedHashMap<Integer, ChannelFuture>(size());
240
241 for (Channel c: serverChannels.values()) {
242 futures.put(c.getId(), c.unbind().awaitUninterruptibly());
243 }
244 for (Channel c: nonServerChannels.values()) {
245 futures.put(c.getId(), c.unbind());
246 }
247
248 return new DefaultChannelGroupFuture(this, futures);
249 }
250
251 public ChannelGroupFuture write(Object message) {
252 Map<Integer, ChannelFuture> futures =
253 new LinkedHashMap<Integer, ChannelFuture>(size());
254 if (message instanceof ChannelBuffer) {
255 ChannelBuffer buf = (ChannelBuffer) message;
256 for (Channel c: nonServerChannels.values()) {
257 futures.put(c.getId(), c.write(buf.duplicate()));
258 }
259 } else {
260 for (Channel c: nonServerChannels.values()) {
261 futures.put(c.getId(), c.write(message));
262 }
263 }
264 return new DefaultChannelGroupFuture(this, futures);
265 }
266
267 public ChannelGroupFuture write(Object message, SocketAddress remoteAddress) {
268 Map<Integer, ChannelFuture> futures =
269 new LinkedHashMap<Integer, ChannelFuture>(size());
270 if (message instanceof ChannelBuffer) {
271 ChannelBuffer buf = (ChannelBuffer) message;
272 for (Channel c: nonServerChannels.values()) {
273 futures.put(c.getId(), c.write(buf.duplicate(), remoteAddress));
274 }
275 } else {
276 for (Channel c: nonServerChannels.values()) {
277 futures.put(c.getId(), c.write(message, remoteAddress));
278 }
279 }
280 return new DefaultChannelGroupFuture(this, futures);
281 }
282
283 @Override
284 public int hashCode() {
285 return System.identityHashCode(this);
286 }
287
288 @Override
289 public boolean equals(Object o) {
290 return this == o;
291 }
292
293 public int compareTo(ChannelGroup o) {
294 int v = getName().compareTo(o.getName());
295 if (v != 0) {
296 return v;
297 }
298
299 return System.identityHashCode(this) - System.identityHashCode(o);
300 }
301
302 @Override
303 public String toString() {
304 return getClass().getSimpleName() +
305 "(name: " + getName() + ", size: " + size() + ')';
306 }
307 }