public abstract class TP extends Protocol
sendMulticast(byte[], int, int)
sendUnicast(org.jgroups.PhysicalAddress, byte[], int, int)
init()
start()
: subclasses must call super.start() after they initialize themselves
(e.g., created their sockets).
stop()
: subclasses must call super.stop() after they deinitialized themselves
destroy()
The receive(Address, byte[], int, int) method must
be called by subclasses when a unicast or multicast message has been received.

Modifier and Type | Class and Description |
---|---|
protected static interface |
TP.Bundler |
static class |
TP.ProtocolAdapter
Used when the transport is shared (singleton_name is not null).
|
Modifier and Type | Field and Description |
---|---|
protected InetAddress |
bind_addr |
protected String |
bind_interface_str |
protected int |
bind_port
The port to which the transport binds.
|
protected TP.Bundler |
bundler |
protected int |
bundler_capacity |
protected String |
bundler_type |
protected static boolean |
can_bind_to_mcast_addr |
protected String |
channel_name
The name of the group to which this member is connected.
|
protected int |
connect_count
Keeps track of connects and disconnects, in order to start and stop threads
|
protected ReentrantLock |
connectLock |
protected ThreadFactory |
default_thread_factory
Factory which is used by thread_pool
|
protected DiagnosticsHandler |
diag_handler |
protected InetAddress |
diagnostics_addr |
protected int |
diagnostics_port |
protected boolean |
discard_incompatible_packets
Discard packets with a different version.
|
protected boolean |
enable_bundling |
protected boolean |
enable_diagnostics |
protected boolean |
enable_unicast_bundling
Enable bundling for unicast messages.
|
protected InetAddress |
external_addr |
protected int |
external_port |
protected static NumberFormat |
f |
protected ThreadFactory |
global_thread_factory
Used by all threads created by JGroups outside of the thread pools
|
protected TpHeader |
header
The header including the cluster name, sent with each message.
|
protected long |
last_discovery_request |
protected static byte |
LIST |
protected Address |
local_addr
The address (host and port) of this member.
|
protected boolean |
log_discard_msgs
Whether or not warnings about messages from different groups are logged - private flag, not for common use
|
protected LazyRemovalCache<Address,PhysicalAddress> |
logical_addr_cache
Cache which maintains mappings between logical and physical addresses.
|
protected long |
logical_addr_cache_expiration |
protected int |
logical_addr_cache_max_size |
protected boolean |
loopback
If true, messages sent to self are treated specially: unicast messages are looped back immediately,
multicast messages get a local copy first and - when the real copy arrives - it will be discarded.
|
protected int |
max_bundle_size
Maximum number of bytes for messages to be queued until they are sent.
|
protected long |
max_bundle_timeout
Max number of milliseconds until queued messages are sent.
|
protected Set<Address> |
members
The members of this group (updated when a member joins or leaves).
|
protected static byte |
MULTICAST |
protected long |
num_bytes_received |
protected long |
num_bytes_sent |
protected long |
num_incoming_msgs_received |
protected long |
num_msgs_received |
protected long |
num_msgs_sent |
protected long |
num_oob_msgs_received |
protected static byte |
OOB |
protected ThreadFactory |
oob_thread_factory
Factory which is used by oob_thread_pool
|
protected Executor |
oob_thread_pool |
protected boolean |
oob_thread_pool_enabled |
protected long |
oob_thread_pool_keep_alive_time |
protected int |
oob_thread_pool_max_threads |
protected int |
oob_thread_pool_min_threads |
protected BlockingQueue<Runnable> |
oob_thread_pool_queue
Used if oob_thread_pool is a ThreadPoolExecutor and oob_thread_pool_queue_enabled is true
|
protected boolean |
oob_thread_pool_queue_enabled |
protected int |
oob_thread_pool_queue_max_size |
protected ThreadGroup |
pool_thread_group |
protected int |
port_range |
protected List<DiagnosticsHandler.ProbeHandler> |
preregistered_probe_handlers |
protected static LazyRemovalCache.Printable<Address,PhysicalAddress> |
print_function |
protected List<NetworkInterface> |
receive_interfaces
List<NetworkInterface> of interfaces to receive multicasts on
|
protected boolean |
receive_on_all_interfaces |
protected String |
singleton_name |
protected SocketFactory |
socket_factory |
protected String |
thread_naming_pattern |
protected Executor |
thread_pool
The thread pool which handles unmarshalling, version checks and dispatching of regular messages
|
protected boolean |
thread_pool_enabled |
protected long |
thread_pool_keep_alive_time |
protected int |
thread_pool_max_threads |
protected int |
thread_pool_min_threads |
protected BlockingQueue<Runnable> |
thread_pool_queue
Used if thread_pool is a ThreadPoolExecutor and thread_pool_queue_enabled is true
|
protected boolean |
thread_pool_queue_enabled |
protected int |
thread_pool_queue_max_size |
protected String |
thread_pool_rejection_policy |
protected long |
tick_time |
protected TimeScheduler |
timer |
protected long |
timer_keep_alive_time |
protected int |
timer_max_threads |
protected int |
timer_min_threads |
protected int |
timer_queue_max_size |
protected String |
timer_rejection_policy |
protected ThreadFactory |
timer_thread_factory |
protected String |
timer_type |
protected ConcurrentMap<String,Protocol> |
up_prots
If singleton_name is enabled, this map is used to de-multiplex incoming messages according to their cluster
names (attached to the message by the transport anyway).
|
protected int |
wheel_size |
protected AgeOutCache<Address> |
who_has_cache
Cache keeping track of WHO_HAS requests for physical addresses (given a logical address) and expiring
them after 5000ms
|
Modifier | Constructor and Description |
---|---|
protected |
TP()
Creates the TP protocol, and initializes the state variables, does
however not start any sockets or threads.
|
Modifier and Type | Method and Description |
---|---|
protected void |
addPhysicalAddressToCache(Address logical_addr,
PhysicalAddress physical_addr) |
void |
clearLogicalAddressCache()
Clears the cache.
|
protected static ExecutorService |
createThreadPool(int min_threads,
int max_threads,
long keep_alive_time,
String rejection_policy,
BlockingQueue<Runnable> queue,
ThreadFactory factory) |
void |
destroy()
This method is called on a Channel.close(). |
protected void |
dispatchToThreadPool(Executor pool,
Address sender,
byte[] data,
int offset,
int length) |
protected void |
doSend(Buffer buf,
Address dest,
boolean multicast) |
Object |
down(Event evt)
Called by the layer above this layer.
|
String |
dumpTimerTasks() |
void |
evictLogicalAddressCache() |
protected void |
fetchLocalAddresses()
Grabs the local address (or addresses in the shared transport case) and registers them with the physical address
in the transport's cache
|
InetAddress |
getBindAddress() |
int |
getBindPort() |
int |
getBundlerBufferSize() |
Executor |
getDefaultThreadPool() |
ThreadFactory |
getDefaultThreadPoolThreadFactory() |
abstract String |
getInfo() |
boolean |
getLogDiscardMessages() |
int |
getMaxBundleSize() |
long |
getMaxBundleTimeout() |
long |
getNumBytesReceived() |
long |
getNumBytesSent() |
long |
getNumMessagesReceived() |
long |
getNumMessagesSent() |
static int |
getNumThreads() |
int |
getNumTimerTasks() |
int |
getOOBMaxQueueSize() |
long |
getOOBMessages() |
int |
getOOBPoolSize() |
int |
getOOBQueueSize() |
Executor |
getOOBThreadPool() |
long |
getOOBThreadPoolKeepAliveTime() |
int |
getOOBThreadPoolMaxThreads() |
int |
getOOBThreadPoolMinThreads() |
ThreadFactory |
getOOBThreadPoolThreadFactory() |
protected abstract PhysicalAddress |
getPhysicalAddress() |
protected PhysicalAddress |
getPhysicalAddressFromCache(Address logical_addr) |
ThreadGroup |
getPoolThreadGroup() |
int |
getPortRange() |
List<NetworkInterface> |
getReceiveInterfaces() |
int |
getRegularMaxQueueSize() |
long |
getRegularMessages() |
int |
getRegularPoolSize() |
int |
getRegularQueueSize() |
String |
getSingletonName() |
SocketFactory |
getSocketFactory()
Returns the SocketFactory associated with this protocol, if overridden in a subclass, or passes the call down
|
ThreadFactory |
getThreadFactory()
Supposed to be overwritten by subclasses.
|
String |
getThreadNamingPattern()
Names the current thread.
|
long |
getThreadPoolKeepAliveTime() |
int |
getThreadPoolMaxThreads() |
int |
getThreadPoolMinThreads() |
TimeScheduler |
getTimer() |
String |
getTimerClass() |
long |
getTimerKeepAliveTime() |
int |
getTimerMaxThreads() |
int |
getTimerMinThreads() |
int |
getTimerQueueSize() |
ThreadFactory |
getTimerThreadFactory() |
int |
getTimerThreads() |
ConcurrentMap<String,Protocol> |
getUpProtocols() |
protected void |
handleConnect() |
protected void |
handleDisconnect() |
protected Object |
handleDownEvent(Event evt) |
void |
init()
Called after instance has been created (null constructor) and before protocol is started.
|
boolean |
isDefaulThreadPoolEnabled() |
boolean |
isDiscardIncompatiblePackets() |
boolean |
isEnableBundling() |
boolean |
isEnableUnicastBundling() |
boolean |
isLoopback() |
boolean |
isMulticastCapable() |
boolean |
isOOBThreadPoolEnabled() |
boolean |
isReceiveOnAllInterfaces() |
boolean |
isSingleton() |
protected void |
passMessageUp(Message msg,
boolean perform_cluster_name_matching,
boolean multicast,
boolean discard_own_mcast) |
protected void |
passToAllUpProtocols(Event evt) |
String |
printLogicalAddressCache() |
protected static Message |
readMessage(DataInputStream instream) |
protected static List<Message> |
readMessageList(DataInputStream in) |
protected void |
receive(Address sender,
byte[] data,
int offset,
int length)
Subclasses must call this method when a unicast or multicast message has been received.
|
protected void |
registerLocalAddress(Address addr)
Associates the address with the physical address fetched from the cache
|
void |
registerProbeHandler(DiagnosticsHandler.ProbeHandler handler) |
protected void |
removeLogicalAddressFromCache(Address logical_addr) |
void |
resetStats() |
protected void |
send(Message msg,
Address dest,
boolean multicast)
Serializes and sends a message.
|
abstract void |
sendMulticast(byte[] data,
int offset,
int length)
Send to all members in the group.
|
protected void |
sendToAllPhysicalAddresses(byte[] buf,
int offset,
int length) |
protected void |
sendToSingleMember(Address dest,
byte[] buf,
int offset,
int length) |
abstract void |
sendUnicast(PhysicalAddress dest,
byte[] data,
int offset,
int length)
Send a unicast to 1 member.
|
void |
setBindAddress(InetAddress bind_addr) |
void |
setBindPort(int port) |
void |
setBindToAllInterfaces(boolean flag) |
void |
setDefaultThreadPool(Executor thread_pool) |
void |
setDefaultThreadPoolThreadFactory(ThreadFactory factory) |
void |
setDiagnosticsHandler(DiagnosticsHandler handler)
Sets a DiagnosticsHandler. |
void |
setDiscardIncompatiblePackets(boolean flag) |
void |
setEnableBundling(boolean flag) |
void |
setEnableUnicastBundling(boolean enable_unicast_bundling) |
protected void |
setInAllThreadFactories(String cluster_name,
Address local_address,
String pattern) |
void |
setLogDiscardMessages(boolean flag) |
void |
setLoopback(boolean b) |
void |
setMaxBundleSize(int size) |
void |
setMaxBundleTimeout(long timeout) |
void |
setOOBRejectionPolicy(String rejection_policy) |
void |
setOOBThreadPool(Executor oob_thread_pool) |
void |
setOOBThreadPoolKeepAliveTime(long time) |
void |
setOOBThreadPoolMaxThreads(int size) |
void |
setOOBThreadPoolMinThreads(int size) |
void |
setOOBThreadPoolThreadFactory(ThreadFactory factory) |
void |
setPortRange(int range) |
void |
setRegularRejectionPolicy(String rejection_policy) |
void |
setSocketFactory(SocketFactory factory)
Sets a SocketFactory.
|
protected void |
setSourceAddress(Message msg)
If the sender is null, set our own address.
|
void |
setThreadFactory(ThreadFactory factory) |
protected void |
setThreadNames() |
void |
setThreadPoolKeepAliveTime(long time) |
void |
setThreadPoolMaxThreads(int size) |
void |
setThreadPoolMinThreads(int size) |
void |
setThreadPoolQueueEnabled(boolean flag) |
void |
setTimer(TimeScheduler timer)
Sets a new timer.
|
void |
setTimerKeepAliveTime(long time) |
void |
setTimerMaxThreads(int size) |
void |
setTimerMinThreads(int size) |
void |
setTimerThreadFactory(ThreadFactory factory) |
protected static void |
shutdownThreadPool(Executor thread_pool) |
void |
start()
Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads
|
void |
stop()
This method is called on a Channel.disconnect(). |
abstract boolean |
supportsMulticasting()
Whether or not hardware multicasting is supported
|
String |
toString() |
void |
unregisterProbeHandler(DiagnosticsHandler.ProbeHandler handler) |
protected void |
unsetThreadNames() |
Object |
up(Event evt)
handle the UP event.
|
protected static void |
writeMessage(Message msg,
DataOutputStream dos,
boolean multicast)
This method needs to be synchronized on out_stream when it is called
|
protected static void |
writeMessageList(Address dest,
Address src,
List<Message> msgs,
DataOutputStream dos,
boolean multicast)
Write a list of messages with the same destination and *mostly* the same src addresses.
|
dumpStats, enableStats, getConfigurableObjects, getDownProtocol, getId, getLevel, getName, getProtocolStack, getTransport, getUpProtocol, getValue, isErgonomics, printStats, providedDownServices, providedUpServices, requiredDownServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setLevel, setProtocolStack, setUpProtocol, setValue, setValues, statsEnabled
protected static final byte LIST
protected static final byte MULTICAST
protected static final byte OOB
protected static final boolean can_bind_to_mcast_addr
protected static NumberFormat f
protected InetAddress bind_addr
protected InetAddress external_addr
protected int external_port
protected String bind_interface_str
protected boolean receive_on_all_interfaces
protected List<NetworkInterface> receive_interfaces
protected int logical_addr_cache_max_size
protected long logical_addr_cache_expiration
protected int bind_port
protected int port_range
protected boolean loopback
protected boolean discard_incompatible_packets
protected String thread_naming_pattern
protected boolean oob_thread_pool_enabled
protected int oob_thread_pool_min_threads
protected int oob_thread_pool_max_threads
protected long oob_thread_pool_keep_alive_time
protected boolean oob_thread_pool_queue_enabled
protected int oob_thread_pool_queue_max_size
protected int thread_pool_min_threads
protected int thread_pool_max_threads
protected long thread_pool_keep_alive_time
protected boolean thread_pool_enabled
protected boolean thread_pool_queue_enabled
protected int thread_pool_queue_max_size
protected String thread_pool_rejection_policy
protected String timer_type
protected int timer_min_threads
protected int timer_max_threads
protected long timer_keep_alive_time
protected int timer_queue_max_size
protected String timer_rejection_policy
protected int wheel_size
protected long tick_time
protected boolean enable_bundling
protected boolean enable_unicast_bundling
protected boolean enable_diagnostics
protected InetAddress diagnostics_addr
protected int diagnostics_port
protected String singleton_name
protected boolean log_discard_msgs
protected int max_bundle_size
protected long max_bundle_timeout
protected String bundler_type
protected int bundler_capacity
protected long num_msgs_sent
protected long num_msgs_received
protected long num_bytes_sent
protected long num_bytes_received
protected String channel_name
protected long num_oob_msgs_received
protected long num_incoming_msgs_received
protected Address local_addr
protected final Set<Address> members
protected ThreadGroup pool_thread_group
protected int connect_count
protected final ReentrantLock connectLock
protected Executor oob_thread_pool
protected ThreadFactory oob_thread_factory
protected BlockingQueue<Runnable> oob_thread_pool_queue
protected Executor thread_pool
protected ThreadFactory default_thread_factory
protected BlockingQueue<Runnable> thread_pool_queue
protected TimeScheduler timer
protected ThreadFactory timer_thread_factory
protected ThreadFactory global_thread_factory
protected SocketFactory socket_factory
protected TP.Bundler bundler
protected DiagnosticsHandler diag_handler
protected final List<DiagnosticsHandler.ProbeHandler> preregistered_probe_handlers
protected final ConcurrentMap<String,Protocol> up_prots
protected TpHeader header
protected LazyRemovalCache<Address,PhysicalAddress> logical_addr_cache
protected long last_discovery_request
protected static final LazyRemovalCache.Printable<Address,PhysicalAddress> print_function
protected AgeOutCache<Address> who_has_cache
protected TP()
public void setMaxBundleSize(int size)
public long getMaxBundleTimeout()
public void setMaxBundleTimeout(long timeout)
public int getMaxBundleSize()
public int getBundlerBufferSize()
public void setOOBThreadPoolKeepAliveTime(long time)
public long getOOBThreadPoolKeepAliveTime()
public void setOOBThreadPoolMinThreads(int size)
public int getOOBThreadPoolMinThreads()
public void setOOBThreadPoolMaxThreads(int size)
public int getOOBThreadPoolMaxThreads()
public void setThreadPoolMinThreads(int size)
public int getThreadPoolMinThreads()
public void setThreadPoolMaxThreads(int size)
public int getThreadPoolMaxThreads()
public void setThreadPoolKeepAliveTime(long time)
public long getThreadPoolKeepAliveTime()
public void setTimerMinThreads(int size)
public int getTimerMinThreads()
public void setTimerMaxThreads(int size)
public int getTimerMaxThreads()
public void setTimerKeepAliveTime(long time)
public long getTimerKeepAliveTime()
public int getTimerQueueSize()
public String getTimerClass()
public abstract boolean supportsMulticasting()
public boolean isMulticastCapable()
public void resetStats()
resetStats
in class Protocol
public void registerProbeHandler(DiagnosticsHandler.ProbeHandler handler)
public void unregisterProbeHandler(DiagnosticsHandler.ProbeHandler handler)
public void setDiagnosticsHandler(DiagnosticsHandler handler)
Sets a DiagnosticsHandler. Should be set before the stack is started
handler -
public ThreadGroup getPoolThreadGroup()
public void setThreadPoolQueueEnabled(boolean flag)
public Executor getDefaultThreadPool()
public void setDefaultThreadPool(Executor thread_pool)
public ThreadFactory getDefaultThreadPoolThreadFactory()
public void setDefaultThreadPoolThreadFactory(ThreadFactory factory)
public Executor getOOBThreadPool()
public void setOOBThreadPool(Executor oob_thread_pool)
public ThreadFactory getOOBThreadPoolThreadFactory()
public void setOOBThreadPoolThreadFactory(ThreadFactory factory)
public ThreadFactory getTimerThreadFactory()
public void setTimerThreadFactory(ThreadFactory factory)
public TimeScheduler getTimer()
public void setTimer(TimeScheduler timer)
Sets a new timer.
timer -
public ThreadFactory getThreadFactory()
Protocol
getThreadFactory
in class Protocol
public void setThreadFactory(ThreadFactory factory)
public SocketFactory getSocketFactory()
Protocol
getSocketFactory
in class Protocol
public void setSocketFactory(SocketFactory factory)
Protocol
Sets a SocketFactory. Socket factories are typically provided by the transport (TP) or TP.ProtocolAdapter
setSocketFactory
in class Protocol
public String getThreadNamingPattern()
public long getNumMessagesSent()
public long getNumMessagesReceived()
public long getNumBytesSent()
public long getNumBytesReceived()
public InetAddress getBindAddress()
public void setBindAddress(InetAddress bind_addr)
public int getBindPort()
public void setBindPort(int port)
public void setBindToAllInterfaces(boolean flag)
public boolean isReceiveOnAllInterfaces()
public List<NetworkInterface> getReceiveInterfaces()
public boolean isDiscardIncompatiblePackets()
public void setDiscardIncompatiblePackets(boolean flag)
public boolean isEnableBundling()
public void setEnableBundling(boolean flag)
public boolean isEnableUnicastBundling()
public void setEnableUnicastBundling(boolean enable_unicast_bundling)
public void setPortRange(int range)
public int getPortRange()
public boolean isOOBThreadPoolEnabled()
public boolean isDefaulThreadPoolEnabled()
public boolean isLoopback()
public void setLoopback(boolean b)
public ConcurrentMap<String,Protocol> getUpProtocols()
public int getOOBPoolSize()
public long getOOBMessages()
public int getOOBQueueSize()
public int getOOBMaxQueueSize()
public void setOOBRejectionPolicy(String rejection_policy)
public int getRegularPoolSize()
public long getRegularMessages()
public int getRegularQueueSize()
public int getRegularMaxQueueSize()
public int getNumTimerTasks()
public String dumpTimerTasks()
public int getTimerThreads()
public static int getNumThreads()
public void setRegularRejectionPolicy(String rejection_policy)
public void setLogDiscardMessages(boolean flag)
public boolean getLogDiscardMessages()
public String printLogicalAddressCache()
public void evictLogicalAddressCache()
public abstract void sendMulticast(byte[] data, int offset, int length) throws Exception
data - The data to be sent. This is not a copy, so don't modify it
offset -
length -
Exception
public abstract void sendUnicast(PhysicalAddress dest, byte[] data, int offset, int length) throws Exception
dest - Must be a non-null unicast address
data - The data to be sent. This is not a copy, so don't modify it
offset -
length -
Exception
public abstract String getInfo()
public void init() throws Exception
Protocol
public void destroy()
Protocol
This method is called on a Channel.close().
Does some cleanup; after the call the VM will terminate
public void start() throws Exception
start
in class Protocol
Exception
- Thrown if protocol cannot be started successfully. This will cause the ProtocolStack
to fail, so Channel.connect(String) will throw an exception
public void stop()
Protocol
This method is called on a Channel.disconnect(). Stops work (e.g. by closing multicast socket).
Will be called from top to bottom. This means that at the time of the method invocation the
neighbor protocol below is still working. This method will replace the
STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that
when this method is called all messages in the down queue will have been flushed
protected void handleDisconnect()
public String getSingletonName()
public boolean isSingleton()
public Object down(Event evt)
protected void setSourceAddress(Message msg)
protected void passMessageUp(Message msg, boolean perform_cluster_name_matching, boolean multicast, boolean discard_own_mcast)
protected void receive(Address sender, byte[] data, int offset, int length)
sender -
data -
offset -
length -
protected void dispatchToThreadPool(Executor pool, Address sender, byte[] data, int offset, int length)
protected void send(Message msg, Address dest, boolean multicast) throws Exception
Exception
protected void doSend(Buffer buf, Address dest, boolean multicast) throws Exception
Exception
protected void sendToSingleMember(Address dest, byte[] buf, int offset, int length) throws Exception
Exception
protected void sendToAllPhysicalAddresses(byte[] buf, int offset, int length) throws Exception
Exception
protected static void writeMessage(Message msg, DataOutputStream dos, boolean multicast) throws Exception
msg -
IOException
Exception
protected static Message readMessage(DataInputStream instream) throws Exception
Exception
protected static void writeMessageList(Address dest, Address src, List<Message> msgs, DataOutputStream dos, boolean multicast) throws Exception
List: * | version | flags | dest | src | [Message*] |
Message: | presence | leading | flags | [src] | length | [buffer] | size | [Headers*] |
dest -
src -
msgs -
dos -
multicast -
Exception
protected static List<Message> readMessageList(DataInputStream in) throws Exception
Exception
protected void registerLocalAddress(Address addr)
addr -
protected void fetchLocalAddresses()
protected void setThreadNames()
protected void unsetThreadNames()
protected void setInAllThreadFactories(String cluster_name, Address local_address, String pattern)
protected static ExecutorService createThreadPool(int min_threads, int max_threads, long keep_alive_time, String rejection_policy, BlockingQueue<Runnable> queue, ThreadFactory factory)
protected static void shutdownThreadPool(Executor thread_pool)
protected void passToAllUpProtocols(Event evt)
protected void addPhysicalAddressToCache(Address logical_addr, PhysicalAddress physical_addr)
protected PhysicalAddress getPhysicalAddressFromCache(Address logical_addr)
protected void removeLogicalAddressFromCache(Address logical_addr)
public void clearLogicalAddressCache()
protected abstract PhysicalAddress getPhysicalAddress()
Copyright © 2012 JBoss by Red Hat. All Rights Reserved.