
Chapter 38. Clusters

38.1. Clusters Overview
38.2. Server discovery
38.2.1. Dynamic Discovery
38.2.2. Discovery using static Connectors
38.3. Server-Side Message Load Balancing
38.3.1. Configuring Cluster Connections
38.3.2. Cluster User Credentials
38.4. Client-Side Load balancing
38.5. Specifying Members of a Cluster Explicitly
38.6. Message Redistribution
38.7. Cluster topologies
38.7.1. Symmetric cluster
38.7.2. Chain cluster

HornetQ clusters allow HornetQ servers to be grouped together in order to share message processing load. Each active node in the cluster is an active HornetQ server which manages its own messages and handles its own connections.

The cluster is formed by each node declaring cluster connections to other nodes in the core configuration file hornetq-configuration.xml. When a node forms a cluster connection to another node, internally it creates a core bridge (as described in Chapter 36, Core Bridges) connection between it and the other node. This is done transparently behind the scenes, so you don't have to declare an explicit bridge for each node. These cluster connections allow messages to flow between the nodes of the cluster to balance load.

Nodes can be connected together to form a cluster in many different topologies. We will discuss a couple of the more common topologies later in this chapter.

We'll also discuss client side load balancing, where we can balance client connections across the nodes of the cluster, and we'll consider message redistribution where HornetQ will redistribute messages between nodes to avoid starvation.

Another important part of clustering is server discovery where servers can broadcast their connection details so clients or other servers can connect to them with the minimum of configuration.

Server discovery is a mechanism by which servers can propagate their connection details to messaging clients and to other servers in the cluster.

This information, let's call it the Cluster Topology, is sent over normal HornetQ connections to clients and over cluster connections to other servers. This being the case, we need a way of establishing the initial connection. This can be done using dynamic discovery techniques like UDP and JGroups, or by providing a list of initial connectors.

Server discovery uses UDP multicast or JGroups to broadcast server connection settings.

A broadcast group is the means by which a server broadcasts connectors over the network. A connector defines a way in which a client (or other server) can make connections to the server. For more information on what a connector is, please see Chapter 16, Configuring the Transport.

The broadcast group takes a set of connector pairs, where each connector pair contains connection settings for a live and backup server (if one exists), and broadcasts them on the network. Depending on which broadcasting technique you configure for the cluster, it uses either UDP or JGroups to broadcast the connector pair information.

Broadcast groups are defined in the server configuration file hornetq-configuration.xml. There can be many broadcast groups per HornetQ server. All broadcast groups must be defined in a broadcast-groups element.

Let's take a look at an example broadcast group from hornetq-configuration.xml that defines a UDP broadcast group:

<broadcast-groups>
   <broadcast-group name="my-broadcast-group">
      <local-bind-address>172.16.9.3</local-bind-address>
      <local-bind-port>5432</local-bind-port>
      <group-address>231.7.7.7</group-address>
      <group-port>9876</group-port>
      <broadcast-period>2000</broadcast-period>
      <connector-ref connector-name="netty-connector"/>
   </broadcast-group>
</broadcast-groups>

Some of the broadcast group parameters are optional and you'll normally use the defaults, but we specify them all in the above example for clarity. Let's discuss each one in turn:

  • name attribute. Each broadcast group in the server must have a unique name.

  • local-bind-address. This is the local bind address that the datagram socket is bound to. If you have multiple network interfaces on your server, you would specify which one you wish to use for broadcasts by setting this property. If this property is not specified then the socket will be bound to the wildcard address, an IP address chosen by the kernel. This is a UDP specific attribute.

  • local-bind-port. If you want to specify a local port to which the datagram socket is bound you can specify it here. Normally you would just use the default value of -1 which signifies that an anonymous port should be used. This parameter is always specified in conjunction with local-bind-address. This is a UDP specific attribute.

  • group-address. This is the multicast address to which the data will be broadcast. It is a class D IP address in the range 224.0.0.0 to 239.255.255.255, inclusive. The address 224.0.0.0 is reserved and is not available for use. This parameter is mandatory. This is a UDP specific attribute.

  • group-port. This is the UDP port number used for broadcasting. This parameter is mandatory. This is a UDP specific attribute.

  • broadcast-period. This is the period in milliseconds between consecutive broadcasts. This parameter is optional; the default value is 2000 milliseconds.

  • connector-ref. This specifies the connector and optional backup connector that will be broadcast (see Chapter 16, Configuring the Transport for more information on connectors). The connector to be broadcast is specified by the connector-name attribute.
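The range rule given for group-address can be checked with the JDK alone before a value is put into the configuration. The following is a minimal sketch and not part of HornetQ: the class and method names (GroupAddressCheck, isUsableGroupAddress) are made up for illustration, and it assumes the value is written as an IPv4 literal.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;

final class GroupAddressCheck {

    // Returns true when the given IPv4 literal is a multicast (class D) address,
    // i.e. within 224.0.0.0 to 239.255.255.255, and is not the reserved 224.0.0.0.
    static boolean isUsableGroupAddress(String ipv4Literal) {
        try {
            // For an IP literal this only parses the text; no DNS lookup is made.
            InetAddress address = InetAddress.getByName(ipv4Literal);
            return address instanceof Inet4Address
                && address.isMulticastAddress()
                && !"224.0.0.0".equals(address.getHostAddress());
        } catch (UnknownHostException e) {
            // Not a valid address literal (or an unresolvable name).
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isUsableGroupAddress("231.7.7.7"));  // group address from the example above
        System.out.println(isUsableGroupAddress("224.0.0.0"));  // reserved address
        System.out.println(isUsableGroupAddress("172.16.9.3")); // a unicast address
    }
}
```

A check like this only validates the address range. It says nothing about whether multicast traffic is actually routed on the network in question.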

Here is another example broadcast group that defines a JGroups broadcast group:

<broadcast-groups>
   <broadcast-group name="my-broadcast-group">
      <jgroups-file>test-jgroups-file_ping.xml</jgroups-file>
      <jgroups-channel>hornetq_broadcast_channel</jgroups-channel>
      <broadcast-period>2000</broadcast-period>
      <connector-ref connector-name="netty-connector"/>
   </broadcast-group>
</broadcast-groups>

To be able to use JGroups to broadcast, one must specify two attributes, jgroups-file and jgroups-channel, described below:

  • jgroups-file attribute. This is the name of the JGroups configuration file. It will be used to initialize JGroups channels. Make sure the file is in the Java resource path so that HornetQ can load it.

  • jgroups-channel attribute. The name that JGroups channels connect to for broadcasting.

Note

The JGroups attributes (jgroups-file and jgroups-channel) and UDP specific attributes described above are exclusive of each other. Only one set can be specified in a broadcast group configuration. Don't mix them!

The following is an example of a JGroups file:

<config xmlns="urn:org:jgroups"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
   <TCP loopback="true"
      recv_buf_size="20000000"
      send_buf_size="640000"
      discard_incompatible_packets="true"
      max_bundle_size="64000"
      max_bundle_timeout="30"
      enable_bundling="true"
      use_send_queues="false"
      sock_conn_timeout="300"

      thread_pool.enabled="true"
      thread_pool.min_threads="1"
      thread_pool.max_threads="10"
      thread_pool.keep_alive_time="5000"
      thread_pool.queue_enabled="false"
      thread_pool.queue_max_size="100"
      thread_pool.rejection_policy="run"

      oob_thread_pool.enabled="true"
      oob_thread_pool.min_threads="1"
      oob_thread_pool.max_threads="8"
      oob_thread_pool.keep_alive_time="5000"
      oob_thread_pool.queue_enabled="false"
      oob_thread_pool.queue_max_size="100"
      oob_thread_pool.rejection_policy="run"/>

   <FILE_PING location="../file.ping.dir"/>
   <MERGE2 max_interval="30000"
      min_interval="10000"/>
   <FD_SOCK/>
   <FD timeout="10000" max_tries="5" />
   <VERIFY_SUSPECT timeout="1500"  />
   <BARRIER />
   <pbcast.NAKACK
      use_mcast_xmit="false"
      retransmit_timeout="300,600,1200,2400,4800"
      discard_delivered_msgs="true"/>
   <UNICAST timeout="300,600,1200" />
   <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
      max_bytes="400000"/>
   <pbcast.GMS print_local_addr="true" join_timeout="3000"
      view_bundling="true"/>
   <FC max_credits="2000000"
      min_threshold="0.10"/>
   <FRAG2 frag_size="60000"  />
   <pbcast.STATE_TRANSFER/>
   <pbcast.FLUSH timeout="0"/>
</config>

As shown, the file defines a JGroups protocol stack. If you want HornetQ to use this stack for channel creation, make sure that the value of jgroups-file in your broadcast-group/discovery-group configuration is the name of this JGroups configuration file. For example, if the above stack configuration is stored in a file named "jgroups-stacks.xml", then your jgroups-file should look like this:

<jgroups-file>jgroups-stacks.xml</jgroups-file>

For cluster connections, discovery groups are defined in the server side configuration file hornetq-configuration.xml. All discovery groups must be defined inside a discovery-groups element. There can be many discovery groups defined per HornetQ server. Let's look at an example:

<discovery-groups>
   <discovery-group name="my-discovery-group">
      <local-bind-address>172.16.9.7</local-bind-address>
      <group-address>231.7.7.7</group-address>
      <group-port>9876</group-port>
      <refresh-timeout>10000</refresh-timeout>
   </discovery-group>
</discovery-groups>

We'll consider each parameter of the discovery group:

Here is another example that defines a JGroups discovery group:

<discovery-groups>
   <discovery-group name="my-broadcast-group">
      <jgroups-file>test-jgroups-file_ping.xml</jgroups-file>
      <jgroups-channel>hornetq_broadcast_channel</jgroups-channel>
      <refresh-timeout>10000</refresh-timeout>
   </discovery-group>
</discovery-groups>

To receive broadcasts from JGroups channels, one must specify two attributes, jgroups-file and jgroups-channel, the same two used for the JGroups broadcast group above.

Let's discuss how to configure a HornetQ client to use discovery to discover a list of servers to which it can connect. The way to do this differs depending on whether you're using JMS or the core API.

If you're using JMS and you're also using the JMS Service on the server to load your JMS connection factory instances into JNDI, then you can specify which discovery group to use for your JMS connection factory in the server side xml configuration hornetq-jms.xml. Let's take a look at an example:

<connection-factory name="ConnectionFactory">
   <discovery-group-ref discovery-group-name="my-discovery-group"/>
   <entries>
      <entry name="ConnectionFactory"/>
   </entries>
</connection-factory>

The element discovery-group-ref specifies the name of a discovery group defined in hornetq-configuration.xml.

When this connection factory is downloaded from JNDI by a client application and JMS connections are created from it, those connections will be load-balanced across the list of servers that the discovery group maintains by listening on the multicast address specified in the discovery group configuration.

If you're using JMS but not using JNDI to look up a connection factory (that is, you're instantiating the JMS connection factory directly), then you can specify the discovery group parameters directly when creating the JMS connection factory. Here's an example:

final String groupAddress = "231.7.7.7";
final int groupPort = 9876;

// Connections created from this factory are spread across the servers
// discovered on the multicast group above.
ConnectionFactory jmsConnectionFactory =
   HornetQJMSClient.createConnectionFactoryWithHA(new DiscoveryGroupConfiguration(groupAddress, groupPort), JMSFactoryType.CF);

Connection jmsConnection1 = jmsConnectionFactory.createConnection();
Connection jmsConnection2 = jmsConnectionFactory.createConnection();

The refresh-timeout can be set directly on the DiscoveryGroupConfiguration by using the setter method setDiscoveryRefreshTimeout() if you want to change the default value.

There is also a further parameter settable on the DiscoveryGroupConfiguration using the setter method setDiscoveryInitialWaitTimeout(). If the connection factory is used immediately after creation then it may not have had enough time to receive broadcasts from all the nodes in the cluster. On first usage, the connection factory will make sure it waits this long since creation before creating the first connection. The default value for this parameter is 10000 milliseconds.

Sometimes it may be impossible to use UDP on the network you are using. In this case it's possible to configure a connection with an initial list of possible servers. This could be just one server that you know will always be available, or a list of servers where at least one will be available.

This doesn't mean that you have to know where all your servers are going to be hosted. You can configure these servers to connect to the reliable servers. Once they are connected, their connection details will be propagated via the server they connect to.

A static list of possible servers can also be used by a normal client.

If you're using JMS and you're also using the JMS Service on the server to load your JMS connection factory instances into JNDI, then you can specify which connectors to use for your JMS connection factory in the server side xml configuration hornetq-jms.xml. Let's take a look at an example:

<connection-factory name="ConnectionFactory">
   <connectors>
      <connector-ref connector-name="netty-connector"/>
      <connector-ref connector-name="netty-connector2"/>
      <connector-ref connector-name="netty-connector3"/>
   </connectors>
   <entries>
      <entry name="ConnectionFactory"/>
   </entries>
</connection-factory>

The connectors element contains a list of connectors predefined in the hornetq-configuration.xml file. When this connection factory is downloaded from JNDI by a client application and JMS connections are created from it, those connections will be load-balanced across the list of servers defined by these connectors.

If you're using JMS but not using JNDI to look up a connection factory (that is, you're instantiating the JMS connection factory directly), then you can specify the connector list directly when creating the JMS connection factory. Here's an example:

HashMap<String, Object> map = new HashMap<String, Object>();
map.put("host", "myhost");
map.put("port", "5445");
TransportConfiguration server1 = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
HashMap<String, Object> map2 = new HashMap<String, Object>();
map2.put("host", "myhost2");
map2.put("port", "5446");
TransportConfiguration server2 = new TransportConfiguration(NettyConnectorFactory.class.getName(), map2);

HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, server1, server2);

If cluster connections are defined between nodes of a cluster, then HornetQ will load balance messages arriving at a particular node from a client.

Let's take a simple example of a cluster of four nodes A, B, C, and D arranged in a symmetric cluster (described in Section 38.7.1, “Symmetric cluster”). We have a queue called OrderQueue deployed on each node of the cluster.

We have client Ca connected to node A, sending orders to the server. We also have order processor clients Pa, Pb, Pc, and Pd connected to nodes A, B, C, and D respectively. If no cluster connection were defined on node A, then as order messages arrive on node A they would all end up in the OrderQueue on node A, and so would only be consumed by Pa, the order processor client attached to node A.

If we define a cluster connection on node A, then as order messages arrive on node A, instead of all of them going into the local OrderQueue instance, they are distributed in a round-robin fashion between all the nodes of the cluster. The messages are forwarded from the receiving node to other nodes of the cluster. This is all done on the server side; the client maintains a single connection to node A.

For example, messages arriving on node A might be distributed in the following order between the nodes: B, D, C, A, B, D, C, A, B, D. The exact order depends on the order the nodes started up, but the algorithm used is round robin.

HornetQ cluster connections can be configured to always blindly load balance messages in a round robin fashion irrespective of whether there are any matching consumers on other nodes, but they can be a bit cleverer than that and also be configured to only distribute to other nodes if they have matching consumers. We'll look at both these cases in turn with some examples, but first we'll discuss configuring cluster connections in general.
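The two distribution modes described above can be pictured with a small, self-contained model. This is only a sketch of the idea, not HornetQ's implementation: the class RoundRobinSketch and its members are hypothetical, the node order is simply the order in which nodes are supplied, and the sketch assumes that a message stays on the local node when no node has a matching consumer.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RoundRobinSketch {
    private final String localNode;
    private final Map<String, Boolean> hasConsumers; // node name -> has a matching consumer
    private final List<String> order;                // round-robin order of the nodes
    private final boolean forwardWhenNoConsumers;
    private int next = 0;

    RoundRobinSketch(String localNode, Map<String, Boolean> hasConsumers, boolean forwardWhenNoConsumers) {
        this.localNode = localNode;
        this.hasConsumers = new LinkedHashMap<>(hasConsumers);
        this.order = new ArrayList<>(this.hasConsumers.keySet());
        this.forwardWhenNoConsumers = forwardWhenNoConsumers;
    }

    // Picks the node that receives the next message.
    String route() {
        for (int tried = 0; tried < order.size(); tried++) {
            String candidate = order.get(next);
            next = (next + 1) % order.size();
            // Blind mode accepts every node; otherwise skip nodes without consumers.
            if (forwardWhenNoConsumers || Boolean.TRUE.equals(hasConsumers.get(candidate))) {
                return candidate;
            }
        }
        // Assumption of this sketch: with no eligible node the message stays local.
        return localNode;
    }

    public static void main(String[] args) {
        Map<String, Boolean> nodes = new LinkedHashMap<>();
        nodes.put("B", true);
        nodes.put("D", false);
        nodes.put("C", true);
        nodes.put("A", false);

        RoundRobinSketch blind = new RoundRobinSketch("A", nodes, true);
        RoundRobinSketch selective = new RoundRobinSketch("A", nodes, false);
        for (int i = 0; i < 6; i++) {
            System.out.println(blind.route() + " " + selective.route());
        }
    }
}
```

In the blind mode every node takes its turn, whereas in the selective mode nodes D and A are skipped because they have no matching consumers in this example.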

Cluster connections group servers into clusters so that messages can be load balanced between the nodes of the cluster. Let's take a look at a typical cluster connection. Cluster connections are always defined in hornetq-configuration.xml inside a cluster-connection element. There can be zero or more cluster connections defined per HornetQ server.

<cluster-connections>
   <cluster-connection name="my-cluster">
      <address>jms</address>
      <retry-interval>500</retry-interval>
      <use-duplicate-detection>true</use-duplicate-detection>
      <forward-when-no-consumers>false</forward-when-no-consumers>
      <max-hops>1</max-hops>
      <discovery-group-ref discovery-group-name="my-discovery-group"/>
   </cluster-connection>
</cluster-connections>

In the above cluster connection all parameters have been explicitly specified. The following shows all the available configuration options

Alternatively if you would like your cluster connections to use a static list of servers for discovery then you can do it like this.

<cluster-connection name="my-cluster">
   <address>jms</address>
   <connector-ref>netty-connector</connector-ref>
   <retry-interval>500</retry-interval>
   <use-duplicate-detection>true</use-duplicate-detection>
   <forward-when-no-consumers>true</forward-when-no-consumers>
   <max-hops>1</max-hops>
   <static-connectors>
      <connector-ref>server0-connector</connector-ref>
      <connector-ref>server1-connector</connector-ref>
   </static-connectors>
</cluster-connection>

Here we have defined two servers, at least one of which we know will be available. There may be many more servers in the cluster, but these will be discovered via one of these connectors once an initial connection has been made.

With HornetQ client-side load balancing, subsequent sessions created using a single session factory can be connected to different nodes of the cluster. This allows sessions to spread smoothly across the nodes of a cluster and not be "clumped" on any particular node.

The load balancing policy to be used by the client factory is configurable. HornetQ provides four out-of-the-box load balancing policies, and you can also implement your own and use that.

The out-of-the-box policies are

You can also implement your own policy by implementing the interface org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy.
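To give a feel for what a custom policy involves, here is a minimal, self-contained sketch. It does not use the HornetQ classes: the nested SelectionPolicy interface is a local stand-in that assumes the real interface boils down to a single method choosing an index among the known servers (int select(int max)). Check the Javadoc of ConnectionLoadBalancingPolicy before relying on that shape. The class names PolicySketch and CyclingPolicy are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class PolicySketch {

    // Local stand-in for the policy interface; assumed shape, see the note above.
    interface SelectionPolicy {
        // Returns an index in the range [0, max).
        int select(int max);
    }

    // Hands out indexes 0, 1, ..., max - 1 and then starts again from 0.
    static final class CyclingPolicy implements SelectionPolicy {
        private final AtomicInteger counter = new AtomicInteger();

        @Override
        public int select(int max) {
            // floorMod keeps the result non-negative even if the counter overflows.
            return Math.floorMod(counter.getAndIncrement(), max);
        }
    }

    public static void main(String[] args) {
        SelectionPolicy policy = new CyclingPolicy();
        for (int i = 0; i < 5; i++) {
            System.out.println(policy.select(3));
        }
    }
}
```

The atomic counter is a design choice for the sketch: it keeps the policy safe to call from several threads without locking.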

Specifying which load balancing policy to use differs depending on whether you are using JMS or the core API. If you don't specify a policy then the default will be used, which is org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy.

If you're using JMS and your JMS connection factories are bound into JNDI on the server, then you can specify the load balancing policy directly in the hornetq-jms.xml configuration file on the server as follows:

<connection-factory name="ConnectionFactory">
   <discovery-group-ref discovery-group-name="my-discovery-group"/>
   <entries>
      <entry name="ConnectionFactory"/>
   </entries>
   <connection-load-balancing-policy-class-name>
      org.hornetq.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy
   </connection-load-balancing-policy-class-name>
</connection-factory>

The above example would deploy a JMS connection factory that uses the random connection load balancing policy.

If you're using JMS but you're instantiating your connection factory directly on the client side then you can set the load balancing policy using the setter on the HornetQConnectionFactory before using it:

HornetQConnectionFactory jmsConnectionFactory = HornetQJMSClient.createConnectionFactory(...);
jmsConnectionFactory.setConnectionLoadBalancingPolicyClassName("com.acme.MyLoadBalancingPolicy");

If you're using the core API, you can set the load balancing policy directly on the ServerLocator instance you are using:

ServerLocator locator = HornetQClient.createServerLocatorWithHA(server1, server2);
locator.setConnectionLoadBalancingPolicyClassName("com.acme.MyLoadBalancingPolicy");

The set of servers over which the factory load balances can be determined in one of two ways:

Sometimes you want to define a cluster more explicitly, that is, to control which servers connect to each other in the cluster. This is typically used to form non-symmetrical clusters such as chain clusters or ring clusters. This can only be done using a static list of connectors and is configured as follows:

<cluster-connection name="my-cluster">
   <address>jms</address>
   <connector-ref>netty-connector</connector-ref>
   <retry-interval>500</retry-interval>
   <use-duplicate-detection>true</use-duplicate-detection>
   <forward-when-no-consumers>true</forward-when-no-consumers>
   <max-hops>1</max-hops>
   <static-connectors allow-direct-connections-only="true">
      <connector-ref>server1-connector</connector-ref>
   </static-connectors>
</cluster-connection>

In this example we have set the attribute allow-direct-connections-only which means that the only server that this server can create a cluster connection to is server1-connector. This means you can explicitly create any cluster topology you want.

Another important part of clustering is message redistribution. Earlier we learned how server side message load balancing round robins messages across the cluster. If forward-when-no-consumers is false, then messages won't be forwarded to nodes which don't have matching consumers. This ensures that messages don't arrive on a queue which has no consumers to consume them. However, there is a situation it doesn't solve: what happens if the consumers on a queue close after the messages have been sent to the node? If there are no consumers on the queue, the messages won't get consumed and we have a starvation situation.

This is where message redistribution comes in. With message redistribution HornetQ can be configured to automatically redistribute messages from queues which have no consumers back to other nodes in the cluster which do have matching consumers.

Message redistribution can be configured to kick in immediately after the last consumer on a queue is closed, or to wait a configurable delay after the last consumer on a queue is closed before redistributing. By default message redistribution is disabled.

Message redistribution can be configured on a per address basis by specifying the redistribution delay in the address settings. For more information on configuring address settings, please see Chapter 25, Queue Attributes.

Here's an address settings snippet from hornetq-configuration.xml showing how message redistribution is enabled for a set of queues:

<address-settings>
   <address-setting match="jms.#">
      <redistribution-delay>0</redistribution-delay>
   </address-setting>
</address-settings>

The above address-settings block would set a redistribution-delay of 0 for any queue which is bound to an address that starts with "jms.". All JMS queues and topic subscriptions are bound to addresses that start with "jms.", so the above would enable instant (no delay) redistribution for all JMS queues and topic subscriptions.

The attribute match can be an exact match or it can be a string that conforms to the HornetQ wildcard syntax (described in Chapter 13, Understanding the HornetQ Wildcard Syntax).

The element redistribution-delay defines the delay in milliseconds after the last consumer is closed on a queue before redistributing messages from that queue to other nodes of the cluster which do have matching consumers. A delay of zero means the messages will be immediately redistributed. A value of -1 signifies that messages will never be redistributed. The default value is -1.

It often makes sense to introduce a delay before redistributing, as it's common for a consumer to close and another one to be created quickly on the same queue. In such a case you probably don't want to redistribute immediately, since the new consumer will arrive shortly.
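The meaning of the three kinds of redistribution-delay value can be summarised in a few lines of code. This is a simplified model for illustration only, not HornetQ code: the names RedistributionDelaySketch, Decision and decide are hypothetical, and the sketch assumes that any negative value behaves like -1.

```java
final class RedistributionDelaySketch {

    enum Decision { NEVER, WAIT, REDISTRIBUTE }

    // delayMillis: the configured redistribution-delay.
    // millisSinceLastConsumerClosed: time elapsed since the queue lost its last consumer.
    static Decision decide(long delayMillis, long millisSinceLastConsumerClosed) {
        if (delayMillis < 0) {
            // -1 (the default) disables redistribution; other negatives are treated alike here.
            return Decision.NEVER;
        }
        if (millisSinceLastConsumerClosed >= delayMillis) {
            // A delay of 0 is satisfied immediately.
            return Decision.REDISTRIBUTE;
        }
        // A new consumer may still arrive before the delay expires.
        return Decision.WAIT;
    }

    public static void main(String[] args) {
        System.out.println(decide(-1, 60000));
        System.out.println(decide(0, 0));
        System.out.println(decide(5000, 1000));
        System.out.println(decide(5000, 5000));
    }
}
```

The sketch leaves out one case mentioned in the text: if a new consumer attaches to the queue while the delay is still running, there is no longer any need to redistribute.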

HornetQ clusters can be connected together in many different topologies. Let's consider the two most common ones here.

With a chain cluster, each node in the cluster is not connected to every node in the cluster directly. Instead, the nodes form a chain with a node on each end of the chain and all other nodes just connecting to the previous and next nodes in the chain.

An example of this would be a three node chain consisting of nodes A, B and C. Node A is hosted in one network and has many producer clients connected to it sending order messages. Due to corporate policy, the order consumer clients need to be hosted in a different network, and that network is only accessible via a third network. In this setup node B acts as a mediator with no producers or consumers on it. Any messages arriving on node A will be forwarded to node B, which will in turn forward them to node C where they can get consumed. Node A does not need to directly connect to C, but all the nodes can still act as a part of the cluster.

To set up a cluster in this way, node A would define a cluster connection that connects to node B, and node B would define a cluster connection that connects to node C. In this case we only want cluster connections in one direction since we're only moving messages from node A->B->C and never from C->B->A.

For this topology we would set max-hops to 2. With a value of 2, the knowledge of what queues and consumers exist on node C would be propagated from node C to node B to node A. Node A would then know to distribute messages to node B when they arrive: even though node B has no consumers itself, node A would know that a further hop away is node C, which does have consumers.
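The effect of max-hops on the A, B, C chain can be illustrated with a small graph walk. The sketch below is a simplified model rather than HornetQ's actual propagation mechanism: MaxHopsSketch and visibleNodes are hypothetical names, and the map of connections follows the direction of message flow (A to B, B to C), whereas the consumer information itself travels the opposite way.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class MaxHopsSketch {

    // Returns the nodes whose queues and consumers are known to 'start'
    // when that knowledge may travel at most maxHops cluster connections.
    static Set<String> visibleNodes(Map<String, List<String>> connections, String start, int maxHops) {
        Set<String> seen = new HashSet<>();
        seen.add(start);
        Set<String> visible = new LinkedHashSet<>();
        List<String> frontier = Collections.singletonList(start);
        for (int hop = 0; hop < maxHops && !frontier.isEmpty(); hop++) {
            List<String> nextFrontier = new ArrayList<>();
            for (String node : frontier) {
                for (String neighbour : connections.getOrDefault(node, Collections.emptyList())) {
                    if (seen.add(neighbour)) { // true only the first time a node is reached
                        visible.add(neighbour);
                        nextFrontier.add(neighbour);
                    }
                }
            }
            frontier = nextFrontier;
        }
        return visible;
    }

    public static void main(String[] args) {
        Map<String, List<String>> chain = new HashMap<>();
        chain.put("A", Arrays.asList("B")); // cluster connection defined on node A
        chain.put("B", Arrays.asList("C")); // cluster connection defined on node B

        System.out.println(visibleNodes(chain, "A", 1));
        System.out.println(visibleNodes(chain, "A", 2));
    }
}
```

With a limit of one hop, node A only learns about node B, which has no consumers. With two hops it also learns about node C, which is why the chain needs max-hops set to 2.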