
HornetQ User Manual

Putting the buzz in messaging



1. Legal Notice
2. Preface
3. Project Information
3.1. Software Download
3.2. Project Information
4. Messaging Concepts
4.1. Messaging Concepts
4.2. Messaging styles
4.2.1. The Message Queue Pattern
4.2.2. The Publish-Subscribe Pattern
4.3. Delivery guarantees
4.4. Transactions
4.5. Durability
4.6. Messaging APIs and protocols
4.6.1. Java Message Service (JMS)
4.6.2. System specific APIs
4.6.3. RESTful API
4.6.4. STOMP
4.6.5. AMQP
4.7. High Availability
4.8. Clusters
4.9. Bridges and routing
5. Architecture
5.1. Core Architecture
5.2. HornetQ embedded in your own application
5.3. HornetQ integrated with a JEE application server
5.4. HornetQ stand-alone server
6. Using the Server
6.1. Starting and Stopping the standalone server
6.2. Server JVM settings
6.3. Server classpath
6.4. Library Path
6.5. System properties
6.6. Configuration files
6.7. JBoss Microcontainer Beans File
6.8. JBoss AS4 MBean Service.
6.9. The main configuration file.
7. Using JMS
7.1. A simple ordering system
7.2. JMS Server Configuration
7.3. Connection Factory Types
7.4. JNDI configuration
7.5. The code
7.6. Directly instantiating JMS Resources without using JNDI
7.7. Setting The Client ID
7.8. Setting The Batch Size for DUPS_OK
7.9. Setting The Transaction Batch Size
8. Using Core
8.1. Core Messaging Concepts
8.1.1. Message
8.1.2. Address
8.1.3. Queue
8.1.4. ServerLocator
8.1.5. ClientSessionFactory
8.1.6. ClientSession
8.1.7. ClientConsumer
8.1.8. ClientProducer
8.2. A simple example of using Core
9. Mapping JMS Concepts to the Core API
10. The Client Classpath
10.1. HornetQ Core Client
10.2. JMS Client
10.3. JMS Client with JNDI
11. Examples
11.1. JMS Examples
11.1.1. Applet
11.1.2. Application-Layer Failover
11.1.3. Core Bridge Example
11.1.4. Browser
11.1.5. Client Kickoff
11.1.6. Client-Side Load-Balancing
11.1.7. Clustered Durable Subscription
11.1.8. Clustered Grouping
11.1.9. Clustered Queue
11.1.10. Clustering with JGroups
11.1.11. Clustered Standalone
11.1.12. Clustered Static Discovery
11.1.13. Clustered Static Cluster One Way
11.1.14. Clustered Topic
11.1.15. Message Consumer Rate Limiting
11.1.16. Dead Letter
11.1.17. Delayed Redelivery
11.1.18. Divert
11.1.19. Durable Subscription
11.1.20. Embedded
11.1.21. Embedded Simple
11.1.22. Message Expiration
11.1.23. Failover Manual Stop
11.1.24. HTTP Transport
11.1.25. Instantiate JMS Objects Directly
11.1.26. Interceptor
11.1.27. JAAS
11.1.28. JMS Bridge
11.1.29. JMX Management
11.1.30. Large Message
11.1.31. Last-Value Queue
11.1.32. Management
11.1.33. Management Notification
11.1.34. Message Counter
11.1.35. Message Group
11.1.36. Message Group
11.1.37. Message Priority
11.1.38. Multiple Failover
11.1.39. Multiple Failover Failback
11.1.40. No Consumer Buffering
11.1.41. Non-Transaction Failover With Server Data Replication
11.1.42. Paging
11.1.43. Pre-Acknowledge
11.1.44. Message Producer Rate Limiting
11.1.45. Queue
11.1.46. Message Redistribution
11.1.47. Queue Requestor
11.1.48. Queue with Message Selector
11.1.49. Reattach Node example
11.1.50. Request-Reply example
11.1.51. Scheduled Message
11.1.52. Security
11.1.53. Send Acknowledgements
11.1.54. Spring Integration
11.1.55. SSL Transport
11.1.56. Static Message Selector
11.1.57. Static Message Selector Using JMS
11.1.58. Stomp
11.1.59. Stomp1.1
11.1.60. Stomp Over Web Sockets
11.1.61. Symmetric Cluster
11.1.62. Temporary Queue
11.1.63. Topic
11.1.64. Topic Hierarchy
11.1.65. Topic Selector 1
11.1.66. Topic Selector 2
11.1.67. Transaction Failover
11.1.68. Transactional Session
11.1.69. XA Heuristic
11.1.70. XA Receive
11.1.71. XA Send
11.1.72. XA with Transaction Manager
11.2. Core API Examples
11.2.1. Embedded
11.3. Java EE Examples
11.3.1. EJB/JMS Transaction
11.3.2. HAJNDI (High Availability)
11.3.3. Resource Adapter Configuration
11.3.4. Resource Adapter Remote Server Configuration
11.3.5. JMS Bridge
11.3.6. MDB (Message Driven Bean)
11.3.7. Servlet Transport
11.3.8. Servlet SSL Transport
11.3.9. XA Recovery
12. Routing Messages With Wild Cards
13. Understanding the HornetQ Wildcard Syntax
14. Filter Expressions
15. Persistence
15.1. Configuring the bindings journal
15.2. Configuring the jms journal
15.3. Configuring the message journal
15.4. An important note on disabling disk write cache.
15.5. Installing AIO
15.6. Configuring HornetQ for Zero Persistence
15.7. Import/Export the Journal Data
16. Configuring the Transport
16.1. Understanding Acceptors
16.2. Understanding Connectors
16.3. Configuring the transport directly from the client side.
16.4. Configuring the Netty transport
16.4.1. Configuring Netty TCP
16.4.2. Configuring Netty SSL
16.4.3. Configuring Netty HTTP
16.4.4. Configuring Netty Servlet
17. Detecting Dead Connections
17.1. Cleaning up Dead Connection Resources on the Server
17.1.1. Closing core sessions or JMS connections that you have failed to close
17.2. Detecting failure from the client side.
17.3. Configuring Asynchronous Connection Execution
18. Resource Manager Configuration
19. Flow Control
19.1. Consumer Flow Control
19.1.1. Window-Based Flow Control
19.1.2. Rate limited flow control
19.2. Producer flow control
19.2.1. Window based flow control
19.2.2. Rate limited flow control
20. Guarantees of sends and commits
20.1. Guarantees of Transaction Completion
20.2. Guarantees of Non Transactional Message Sends
20.3. Guarantees of Non Transactional Acknowledgements
20.4. Asynchronous Send Acknowledgements
20.4.1. Asynchronous Send Acknowledgements
21. Message Redelivery and Undelivered Messages
21.1. Delayed Redelivery
21.1.1. Configuring Delayed Redelivery
21.1.2. Example
21.2. Dead Letter Addresses
21.2.1. Configuring Dead Letter Addresses
21.2.2. Dead Letter Properties
21.2.3. Example
21.3. Delivery Count Persistence
22. Message Expiry
22.1. Message Expiry
22.2. Configuring Expiry Addresses
22.3. Configuring The Expiry Reaper Thread
22.4. Example
23. Large Messages
23.1. Configuring the server
23.2. Configuring Parameters
23.2.1. Using Core API
23.2.2. Using JMS
23.2.3. Compressed Large Messages
23.3. Streaming large messages
23.3.1. Streaming over Core API
23.3.2. Streaming over JMS
23.4. Streaming Alternative
23.5. Large message example
24. Paging
24.1. Page Files
24.2. Configuration
24.3. Paging Mode
24.3.1. Configuration
24.4. Dropping messages
24.5. Dropping messages and throwing an exception to producers
24.6. Blocking producers
24.7. Caution with Addresses with Multiple Queues
24.8. Example
25. Queue Attributes
25.1. Predefined Queues
25.2. Using the API
25.3. Configuring Queues Via Address Settings
26. Scheduled Messages
26.1. Scheduled Delivery Property
26.2. Example
27. Last-Value Queues
27.1. Configuring Last-Value Queues
27.2. Using Last-Value Property
27.3. Example
28. Message Grouping
28.1. Using Core API
28.2. Using JMS
28.3. Example
28.4. Example
28.5. Clustered Grouping
28.5.1. Clustered Grouping Best Practices
28.5.2. Clustered Grouping Example
29. Extra Acknowledge Modes
29.1. Using PRE_ACKNOWLEDGE
29.2. Individual Acknowledge
29.3. Example
30. Management
30.1. The Management API
30.1.1. Core Management API
30.1.2. JMS Management API
30.2. Using Management Via JMX
30.2.1. Configuring JMX
30.2.2. Example
30.3. Using Management Via Core API
30.3.1. Configuring Core Management
30.4. Using Management Via JMS
30.4.1. Configuring JMS Management
30.4.2. Example
30.5. Management Notifications
30.5.1. JMX Notifications
30.5.2. Core Messages Notifications
30.5.3. JMS Messages Notifications
30.5.4. Example
30.6. Message Counters
30.6.1. Configuring Message Counters
30.6.2. Example
30.7. Administering HornetQ Resources Using The JBoss AS Admin Console
30.7.1. JMS Queues
30.7.2. JMS Topics
30.7.3. JMS Connection Factories
31. Security
31.1. Role based security for addresses
31.2. Secure Sockets Layer (SSL) Transport
31.3. Basic user credentials
31.4. Changing the security manager
31.5. JAAS Security Manager
31.5.1. Example
31.6. JBoss AS Security Manager
31.6.1. Configuring Client Login
31.6.2. Changing the Security Domain
31.7. Changing the username/password for clustering
32. Application Server Integration and Java EE
32.1. Configuring Message-Driven Beans
32.1.1. Using Container-Managed Transactions
32.1.2. Using Bean-Managed Transactions
32.1.3. Using Message Selectors with Message-Driven Beans
32.2. Sending Messages from within JEE components
32.3. MDB and Consumer pool size
32.4. Configuring the JCA Adaptor
32.4.1. Global Properties
32.4.2. Adapter Outbound Configuration
32.4.3. Adapter Inbound Configuration
32.4.4. Configuring the adapter to use a standalone HornetQ Server
32.5. Configuring the JBoss Application Server to connect to Remote HornetQ Server
32.5.1. Configuring Jboss 5
32.5.2. Configuring Jboss 5
32.6. High Availability JNDI (HA-JNDI)
32.7. XA Recovery
32.7.1. XA Recovery Configuration
32.7.2. Example
33. The JMS Bridge
33.1. JMS Bridge Parameters
33.2. Source and Target Connection Factories
33.3. Source and Target Destination Factories
33.4. Quality Of Service
33.4.1. AT_MOST_ONCE
33.4.2. DUPLICATES_OK
33.4.3. ONCE_AND_ONLY_ONCE
33.4.4. Time outs and the JMS bridge
33.4.5. Examples
34. Client Reconnection and Session Reattachment
34.1. 100% Transparent session re-attachment
34.2. Session reconnection
34.3. Configuring reconnection/reattachment attributes
34.4. ExceptionListeners and SessionFailureListeners
35. Diverting and Splitting Message Flows
35.1. Exclusive Divert
35.2. Non-exclusive Divert
36. Core Bridges
36.1. Configuring Bridges
37. Duplicate Message Detection
37.1. Using Duplicate Detection for Message Sending
37.2. Configuring the Duplicate ID Cache
37.3. Duplicate Detection and Bridges
37.4. Duplicate Detection and Cluster Connections
38. Clusters
38.1. Clusters Overview
38.2. Server discovery
38.2.1. Dynamic Discovery
38.2.2. Discovery using static Connectors
38.3. Server-Side Message Load Balancing
38.3.1. Configuring Cluster Connections
38.3.2. Cluster User Credentials
38.4. Client-Side Load balancing
38.5. Specifying Members of a Cluster Explicitly
38.6. Message Redistribution
38.7. Cluster topologies
38.7.1. Symmetric cluster
38.7.2. Chain cluster
39. High Availability and Failover
39.1. Live - Backup Groups
39.1.1. HA modes
39.1.2. Data Replication
39.1.3. Shared Store
39.1.4. Failing Back to live Server
39.2. Failover Modes
39.2.1. Automatic Client Failover
39.2.2. Getting Notified of Connection Failure
39.2.3. Application-Level Failover
40. Libaio Native Libraries
40.1. Compiling the native libraries
40.1.1. Install requirements
40.1.2. Invoking the compilation
41. Thread management
41.1. Server-Side Thread Management
41.1.1. Server Scheduled Thread Pool
41.1.2. General Purpose Server Thread Pool
41.1.3. Expiry Reaper Thread
41.1.4. Asynchronous IO
41.2. Client-Side Thread Management
42. Logging
42.1. Logging in a client or with an Embedded server
42.2. Logging With The JBoss Application Server
43. REST Interface
43.1. Goals of REST Interface
43.2. Installation and Configuration
43.2.1. Installing Within Pre-configured Environment
43.2.2. Bootstrapping HornetQ Along with REST
43.2.3. REST Configuration
43.3. HornetQ REST Interface Basics
43.3.1. Queue and Topic Resources
43.3.2. Queue Resource Response Headers
43.3.3. Topic Resource Responses Headers
43.4. Posting Messages
43.4.1. Duplicate Detection
43.4.2. Persistent Messages
43.4.3. TTL, Expiration and Priority
43.5. Consuming Messages via Pull
43.5.1. Auto-Acknowledge
43.5.2. Manual Acknowledgement
43.5.3. Blocking Pulls with Accept-Wait
43.5.4. Clean Up Your Consumers!
43.6. Pushing Messages
43.6.1. The Queue Push Subscription XML
43.6.2. The Topic Push Subscription XML
43.6.3. Creating a Push Subscription at Runtime
43.6.4. Creating a Push Subscription by Hand
43.6.5. Pushing to Authenticated Servers
43.7. Creating Destinations
43.8. Securing the HornetQ REST Interface
43.8.1. Within JBoss Application server
43.8.2. Security in other environments
43.9. Mixing JMS and REST
43.9.1. JMS Producers - REST Consumers
43.9.2. REST Producers - JMS Consumers
44. Embedding HornetQ
44.1. Simple Config File Embedding
44.1.1. Core API Only
44.1.2. JMS API
44.2. POJO instantiation - Embedding Programmatically
44.3. Dependency Frameworks
45. Spring Integration
46. Intercepting Operations
46.1. Implementing The Interceptors
46.2. Configuring The Interceptors
46.3. Interceptors on the Client Side
46.4. Example
47. Interoperability
47.1. Stomp
47.1.1. Native Stomp support
47.1.2. Mapping Stomp destinations to HornetQ addresses and queues
47.1.3. STOMP and connection-ttl
47.1.4. Stomp and JMS interoperability
47.1.5. Stomp Over Web Sockets
47.1.6. StompConnect
47.2. REST
47.3. AMQP
48. Performance Tuning
48.1. Tuning persistence
48.2. Tuning JMS
48.3. Other Tunings
48.4. Tuning Transport Settings
48.5. Tuning the VM
48.6. Avoiding Anti-Patterns
49. Configuration Reference
49.1. Server Configuration
49.1.1. hornetq-configuration.xml
49.1.2. hornetq-jms.xml
49.1.3. Using Masked Passwords in Configuration Files

Copyright © 2010 Red Hat, Inc. and others.

The text of and illustrations in this document are licensed by Red Hat under a Creative Commons Attribution–Share Alike 3.0 Unported license ("CC-BY-SA").

An explanation of CC-BY-SA is available at http://creativecommons.org/licenses/by-sa/3.0/. In accordance with CC-BY-SA, if you distribute this document or an adaptation of it, you must provide the URL for the original version.

Red Hat, as the licensor of this document, waives the right to enforce, and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent permitted by applicable law.

What is HornetQ?

Why use HornetQ? Here are just a few of the reasons:

  • 100% open source software. HornetQ is licenced using the Apache Software License v 2.0 to minimise barriers to adoption.

  • HornetQ is designed with usability in mind.

  • Written in Java. Runs on any platform with a Java 6+ runtime: everything from Windows desktops to IBM mainframes.

  • Amazing performance. Our ground-breaking high performance journal provides persistent messaging performance at rates normally seen for non-persistent messaging. Our non-persistent messaging performance rocks the boat too.

  • Full feature set. All the features you'd expect in any serious messaging system, and others you won't find anywhere else.

  • Elegant, clean-cut design with minimal third party dependencies. Run HornetQ stand-alone, run it integrated in your favourite JEE application server, or run it embedded inside your own product. It's up to you.

  • Seamless high availability. We provide a HA solution with automatic client failover so you can guarantee zero message loss or duplication in the event of server failure.

  • Hugely flexible clustering. Create clusters of servers that know how to load balance messages. Link geographically distributed clusters over unreliable connections to form a global network. Configure routing of messages in a highly flexible way.

  • For a full list of features, please see the features wiki page.

The official HornetQ project page is http://hornetq.org/.

Red Hat kindly employs developers to work full time on HornetQ. They are:

  • Clebert Suconic (project lead)

  • Andy Taylor

  • Howard Gao

  • Francisco Borges

And many thanks to all our contributors, both old and new, who helped create HornetQ. For a full list of the people who made it happen, take a look at our team page.

HornetQ is an asynchronous messaging system, an example of Message Oriented Middleware. We'll just call them messaging systems in the remainder of this book.

We'll first present a brief overview of what kind of things messaging systems do, where they're useful and the kind of concepts you'll hear about in the messaging world.

If you're already familiar with what a messaging system is and what it's capable of, then you can skip this chapter.

Messaging systems allow you to loosely couple heterogeneous systems together, whilst typically providing reliability, transactions and many other features.

Unlike systems based on a Remote Procedure Call (RPC) pattern, messaging systems primarily use an asynchronous message passing pattern with no tight relationship between requests and responses. Most messaging systems also support a request-response mode but this is not a primary feature of messaging systems.

Designing systems to be asynchronous from end-to-end allows you to really take advantage of your hardware resources, minimizing the number of threads blocking on IO operations, and to use your network bandwidth to its full capacity. With an RPC approach you have to wait for a response for each request you make, so you are limited by the network round trip time, or latency, of your network. With an asynchronous system you can pipeline flows of messages in different directions, so you are limited by the network bandwidth, not the latency. This typically allows you to create much higher performance applications.
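
To put rough numbers on the latency versus bandwidth argument, here is a back-of-the-envelope sketch. The figures are assumptions chosen purely for illustration (a 1 ms round trip, a 1 Gbit/s link, 1 KiB messages), and the class and method names are made up for this example. It ignores protocol overhead, batching and server processing time, so treat the results as upper bounds rather than measurements.

```java
/** Back-of-the-envelope comparison of latency-bound and bandwidth-bound messaging. */
final class ThroughputSketch {

    /** Synchronous request/response on one connection: one message per round trip. */
    static double rpcMessagesPerSecond(double roundTripMillis) {
        return 1000.0 / roundTripMillis;
    }

    /** Pipelined sends: bounded by how many messages fit through the link each second. */
    static double pipelinedMessagesPerSecond(double linkBitsPerSecond, int messageBytes) {
        return linkBitsPerSecond / (messageBytes * 8.0);
    }

    public static void main(String[] args) {
        double rpc = rpcMessagesPerSecond(1.0);                    // assumed 1 ms round trip
        double pipelined = pipelinedMessagesPerSecond(1e9, 1024);  // assumed 1 Gbit/s, 1 KiB messages
        System.out.println("RPC ceiling:       " + rpc + " msg/s");
        System.out.println("Pipelined ceiling: " + pipelined + " msg/s");
    }
}
```

Under these assumptions a single blocking caller tops out at about 1,000 messages per second, while a pipelined sender could in principle approach 122,000 messages per second on the same link.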

Messaging systems decouple the senders of messages from the consumers of messages. The senders and consumers of messages are completely independent and know nothing of each other. This allows you to create flexible, loosely coupled systems.

Often, large enterprises use a messaging system to implement a message bus which loosely couples heterogeneous systems together. Message buses often form the core of an Enterprise Service Bus (ESB). Using a message bus to de-couple disparate systems can allow the system to grow and adapt more easily. It also allows more flexibility to add new systems or retire old ones since they don't have brittle dependencies on each other.

Messaging systems normally support two main styles of asynchronous messaging: message queue messaging (also known as point-to-point messaging) and publish-subscribe messaging. We'll summarise them briefly here:

With this type of messaging you send a message to a queue. The message is then typically persisted to provide a guarantee of delivery, then some time later the messaging system delivers the message to a consumer. The consumer then processes the message and when it is done, it acknowledges the message. Once the message is acknowledged it disappears from the queue and is not available to be delivered again. If the system crashes before the messaging server receives an acknowledgement from the consumer, then on recovery, the message will be available to be delivered to a consumer again.

With point-to-point messaging, there can be many consumers on the queue but a particular message will only ever be consumed by a maximum of one of them. Senders (also known as producers) to the queue are completely decoupled from receivers (also known as consumers) of the queue - they do not know of each other's existence.

A classic example of point-to-point messaging would be an order queue in a company's book ordering system. Each order is represented as a message which is sent to the order queue. Let's imagine there are many front end ordering systems which send orders to the order queue. When a message arrives on the queue it is persisted - this ensures that if the server crashes the order is not lost. Let's also imagine there are many consumers on the order queue - each representing an instance of an order processing component - these can be on different physical machines but consuming from the same queue. The messaging system delivers each message to one and only one of the order processing components. Different messages can be processed by different order processors, but a single order is only processed by one order processor - this ensures orders aren't processed twice.

As an order processor receives a message, it fulfills the order, sends order information to the warehouse system and then updates the order database with the order details. Once it's done that it acknowledges the message to tell the server that the order has been processed and can be forgotten about. Often the send to the warehouse system, update in database and acknowledgement will be completed in a single transaction to ensure ACID properties.
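
The acknowledge-or-redeliver behaviour described above can be sketched with a toy in-memory queue. This is not HornetQ code and uses no HornetQ or JMS API: the class AckQueueSketch and its methods are hypothetical names invented for this illustration, it keeps everything in memory (so nothing is actually persisted), and it is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy in-memory queue illustrating acknowledge-or-redeliver; not a HornetQ API. */
final class AckQueueSketch {
    private final Deque<String> pending = new ArrayDeque<String>();   // not yet delivered
    private final Deque<String> inFlight = new ArrayDeque<String>();  // delivered, awaiting acknowledgement

    void send(String message) {
        pending.addLast(message);
    }

    /** Hands the next message to exactly one caller, or returns null if none is pending. */
    String receive() {
        String message = pending.pollFirst();
        if (message != null) {
            inFlight.addLast(message);
        }
        return message;
    }

    /** The consumer has finished processing, so the message is forgotten. */
    void acknowledge(String message) {
        inFlight.remove(message);
    }

    /** Simulates recovery after a crash: unacknowledged messages become deliverable again, in order. */
    void recover() {
        while (!inFlight.isEmpty()) {
            pending.addFirst(inFlight.pollLast());
        }
    }

    public static void main(String[] args) {
        AckQueueSketch orders = new AckQueueSketch();
        orders.send("order-1");
        orders.send("order-2");
        String first = orders.receive();  // order-1 goes to one consumer only
        orders.acknowledge(first);        // processed: removed for good
        orders.receive();                 // order-2 is delivered but never acknowledged
        orders.recover();                 // crash before the acknowledgement arrives
        System.out.println(orders.receive()); // prints "order-2"
    }
}
```

In a real system the send to the warehouse, the database update and the acknowledgement would typically be wrapped in one transaction, which this sketch does not attempt to model.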

How do client applications interact with messaging systems in order to send and consume messages?

Several messaging systems provide their own proprietary APIs with which the client communicates with the messaging system.

There are also some standard ways of operating with messaging systems and some emerging standards in this space.

Let's take a brief look at these:

JMS is part of Sun's JEE specification. It's a Java API that encapsulates both message queue and publish-subscribe messaging patterns. JMS is a lowest common denominator specification - i.e. it was created to encapsulate common functionality of the already existing messaging systems that were available at the time of its creation.

JMS is a very popular API and is implemented by most messaging systems. JMS is only available to clients running Java.

JMS does not define a standard wire format - it only defines a programmatic API so JMS clients and servers from different vendors cannot directly interoperate since each will use the vendor's own internal wire protocol.

HornetQ provides a fully compliant JMS 1.1 API.

REST approaches to messaging have attracted a lot of interest recently.

It seems plausible that API standards for cloud computing may converge on a REST style set of interfaces and consequently a REST messaging approach is a very strong contender for becoming the de facto method for messaging interoperability.

With a REST approach, messaging resources are identified by URIs and manipulated with a simple set of operations on those resources, e.g. PUT, POST, GET etc. REST approaches to messaging often use HTTP as their underlying protocol.

The advantage of a REST approach with HTTP is in its simplicity and the fact that the internet is already tuned to deal with HTTP optimally.

Please see Chapter 43, REST Interface for using HornetQ's RESTful interface.

Stomp is a very simple text protocol for interoperating with messaging systems. It defines a wire format, so theoretically any Stomp client can work with any messaging system that supports Stomp. Stomp clients are available in many different programming languages.

Please see Section 47.1, “Stomp” for using STOMP with HornetQ.

AMQP is a specification for interoperable messaging. It also defines a wire format, so any AMQP client can work with any messaging system that supports AMQP. AMQP clients are available in many different programming languages.

HornetQ will shortly be implementing AMQP.

In this section we will give an overview of the HornetQ high level architecture.

HornetQ core is designed simply as a set of Plain Old Java Objects (POJOs) - we hope you like its clean-cut design.

We've also designed it to have as few dependencies on external jars as possible. In fact, HornetQ core has only one jar dependency, netty.jar, other than the standard JDK classes! This is because we use some of the netty buffer classes internally.

This allows HornetQ to be easily embedded in your own project, or instantiated in any dependency injection framework such as JBoss Microcontainer, Spring or Google Guice.

Each HornetQ server has its own ultra high performance persistent journal, which it uses for message and other persistence.

Using a high performance journal allows outrageous persistent message performance, something not achievable when using a relational database for persistence.

HornetQ clients, potentially on different physical machines, interact with the HornetQ server. HornetQ currently provides two APIs for messaging at the client side: the core client API and the JMS client API.

JMS semantics are implemented by a thin JMS facade layer on the client side.

The HornetQ server does not speak JMS and in fact does not know anything about JMS. It's a protocol agnostic messaging server designed to be used with multiple different protocols.

When a user uses the JMS API on the client side, all JMS interactions are translated into operations on the HornetQ core client API before being transferred over the wire using the HornetQ wire format.

The server always just deals with core API interactions.

A schematic illustrating this relationship is shown in figure 3.1 below:

Figure 3.1 shows two user applications interacting with a HornetQ server. User Application 1 is using the JMS API, while User Application 2 is using the core client API directly.

You can see from the diagram that the JMS API is implemented by a thin facade layer on the client side.

HornetQ provides its own fully functional Java Connector Architecture (JCA) adaptor which enables it to be integrated easily into any JEE compliant application server or servlet engine.

JEE application servers provide Message Driven Beans (MDBs), which are a special type of Enterprise Java Beans (EJBs) that can process messages from sources such as JMS systems or mail systems.

Probably the most common use of an MDB is to consume messages from a JMS messaging system.

According to the JEE specification, a JEE application server uses a JCA adapter to integrate with a JMS messaging system so it can consume messages for MDBs.

However, the JCA adapter is not only used by the JEE application server for consuming messages via MDBs, it is also used when sending messages to the JMS messaging system, e.g. from inside an EJB or servlet.

When integrating with a JMS messaging system from inside a JEE application server it is always recommended that this is done via a JCA adaptor. In fact, communicating with a JMS messaging system directly, without using JCA would be illegal according to the JEE specification.

The application server's JCA service provides extra functionality such as connection pooling and automatic transaction enlistment, which are desirable when using messaging, say, from inside an EJB. It is possible to talk to a JMS messaging system directly from an EJB, MDB or servlet without going through a JCA adapter, but this is not recommended since you will not be able to take advantage of the JCA features, such as caching of JMS sessions, which can result in poor performance.

Figure 3.2 below shows a JEE application server integrating with a HornetQ server via the HornetQ JCA adaptor. Note that all communication between EJB session or entity beans and Message Driven beans goes through the adaptor and not directly to HornetQ.

The large arrow with the prohibited sign shows an EJB session bean talking directly to the HornetQ server. This is not recommended as you'll most likely end up creating a new connection and session every time you want to interact from the EJB, which is an anti-pattern.

For more information on using the JCA adaptor, please see Chapter 32, Application Server Integration and Java EE.

HornetQ can also be deployed as a stand-alone server. This means a fully independent messaging server not dependent on a JEE application server.

The standard stand-alone messaging server configuration comprises a core messaging server, a JMS service and a JNDI service.

The role of the JMS Service is to deploy any JMS Queue, Topic and ConnectionFactory instances from any server side hornetq-jms.xml configuration files. It also provides a simple management API for creating and destroying Queues, Topics and ConnectionFactory instances which can be accessed via JMX or the connection. It is a separate service to the HornetQ core server, since the core server is JMS agnostic. If you don't want to deploy any JMS Queue, Topic or ConnectionFactory instances via server side XML configuration and don't require a JMS management API on the server side then you can disable this service.

We also include a JNDI server since JNDI is a common requirement when using JMS to lookup Queues, Topics and ConnectionFactory instances. If you do not require JNDI then this service can also be disabled. HornetQ allows you to programmatically create JMS and core objects directly on the client side as opposed to looking them up from JNDI, so a JNDI server is not always a requirement.

The stand-alone server configuration uses JBoss Microcontainer to instantiate and enforce dependencies between the components. JBoss Microcontainer is a very lightweight POJO bootstrapper.

The stand-alone server architecture is shown in figure 3.3 below:

For more information on server configuration files see Section 49.1, “Server Configuration”.

This chapter will familiarise you with how to use the HornetQ server.

We'll show where it is, how to start and stop it, and we'll describe the directory layout and what all the files are and what they do.

For the remainder of this chapter when we talk about the HornetQ server we mean the HornetQ standalone server, in its default configuration with a JMS Service and JNDI service enabled.

When running embedded in JBoss Application Server the layout may be slightly different but by-and-large will be the same.

If you're using the Asynchronous IO Journal on Linux, you need to specify java.library.path as a property on your Java options. This is done automatically in the run.sh script.

If you don't specify java.library.path in your Java options then the JVM will use the environment variable LD_LIBRARY_PATH.
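
If you are unsure which native library path a JVM actually ended up with, a small check like the following can help. It uses only standard JDK calls; the class name LibraryPathCheck is just a placeholder for this example, and the output depends entirely on your environment.

```java
/** Prints the native library search path as seen by the running JVM. */
final class LibraryPathCheck {

    static String describe() {
        String property = System.getProperty("java.library.path");
        String environment = System.getenv("LD_LIBRARY_PATH"); // null when the variable is not set
        return "java.library.path=" + property + "\n"
             + "LD_LIBRARY_PATH=" + (environment == null ? "(not set)" : environment);
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Run it with the same Java options as the server to see the value the server would use.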

The configuration directory is specified on the classpath in the run scripts run.sh and run.bat. This directory can contain the following files.

Note

When the property file-deployment-enabled in the hornetq-configuration.xml configuration is set to false, the other configuration files are not loaded. It is true by default.

It is also possible to use system property substitution in all the configuration files by replacing a value with the name of a system property. Here is an example of this with a connector configuration:

<connector name="netty">
   <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
   <param key="host" value="${hornetq.remoting.netty.host:localhost}" type="String"/>
   <param key="port" value="${hornetq.remoting.netty.port:5445}" type="Integer"/>
</connector>

Here you can see we have replaced two values with the system properties hornetq.remoting.netty.host and hornetq.remoting.netty.port. Each value is taken from the system property if it is set; otherwise it falls back to the default, localhost or 5445 respectively. It is also possible to omit the default, e.g. ${hornetq.remoting.netty.host}, but the system property must be supplied in that case.
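
The substitution rule can be illustrated with a few lines of plain Java. This is only a sketch of the semantics described above, not the parser HornetQ itself uses. The class PropertySubstitution and its resolve method are hypothetical, and throwing an exception when a property without a default is missing is a choice made for this example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: mimics the ${name:default} substitution described above. */
final class PropertySubstitution {

    // Matches ${name} or ${name:default}; group 1 is the name, group 2 the optional default.
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{([^}:]+)(?::([^}]*))?\\}");

    static String resolve(String value) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer result = new StringBuffer();
        while (matcher.find()) {
            String name = matcher.group(1);
            String fallback = matcher.group(2); // null when no default was written
            String resolved = System.getProperty(name, fallback);
            if (resolved == null) {
                // Assumption of this sketch: a missing property without a default is an error.
                throw new IllegalArgumentException(
                    "System property '" + name + "' is not set and has no default");
            }
            matcher.appendReplacement(result, Matcher.quoteReplacement(resolved));
        }
        matcher.appendTail(result);
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(resolve("${hornetq.remoting.netty.host:localhost}"));
        System.out.println(resolve("${hornetq.remoting.netty.port:5445}"));
    }
}
```

Started without any -D options, the two lines resolve to the defaults. Started with, say, -Dhornetq.remoting.netty.port=5446, the second line resolves to 5446 instead.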

The stand-alone server is basically a set of POJOs which are instantiated by the lightweight JBoss Microcontainer engine.

Note

A beans file is also needed when the server is deployed in the JBoss Application Server, but this will deploy a slightly different set of objects since the Application Server will already have services such as security deployed.

Let's take a look at an example beans file from the stand-alone server:

<?xml version="1.0" encoding="UTF-8"?>
<deployment xmlns="urn:jboss:bean-deployer:2.0">
   <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
   <!-- JNDI server. Disable this if you don't want JNDI -->
   <bean name="JNDIServer" class="org.jnp.server.Main">
      <property name="namingInfo">
         <inject bean="Naming"/>
      </property>
      <property name="port">1099</property>
      <property name="bindAddress">localhost</property>
      <property name="rmiPort">1098</property>
      <property name="rmiBindAddress">localhost</property>
   </bean>

   <!-- MBean server -->
   <bean name="MBeanServer" class="javax.management.MBeanServer">
      <constructor factoryClass="java.lang.management.ManagementFactory"
         factoryMethod="getPlatformMBeanServer"/>
   </bean>

   <!-- The core configuration -->
   <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration">
   </bean>

   <!-- The security manager -->
   <bean name="HornetQSecurityManager"
         class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl">
      <start ignored="true"/>
      <stop ignored="true"/>
   </bean>

   <!-- The core server -->
   <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
      <start ignored="true"/>
      <stop ignored="true"/>
      <constructor>
         <parameter>
            <inject bean="Configuration"/>
         </parameter>
         <parameter>
            <inject bean="MBeanServer"/>
         </parameter>
         <parameter>
            <inject bean="HornetQSecurityManager"/>
         </parameter>
      </constructor>
   </bean>

   <!-- The JMS server -->
   <bean name="JMSServerManager"
         class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
      <constructor>
         <parameter>
            <inject bean="HornetQServer"/>
         </parameter>
      </constructor>
   </bean>
</deployment>

We can see that, as well as the core HornetQ server, the stand-alone server instantiates various different POJOs. Let's look at them in turn:

<?xml version="1.0" encoding="UTF-8"?>
<server>
   <mbean code="org.hornetq.service.HornetQFileConfigurationService"
      name="org.hornetq:service=HornetQFileConfigurationService">
   </mbean>

   <mbean code="org.hornetq.service.JBossASSecurityManagerService"
      name="org.hornetq:service=JBossASSecurityManagerService">
   </mbean>

   <mbean code="org.hornetq.service.HornetQStarterService" 
      name="org.hornetq:service=HornetQStarterService">
      <!--lets let the JMS Server start us-->
         <attribute name="Start">false</attribute>

      <depends optional-attribute-name="SecurityManagerService"
         proxy-type="attribute">org.hornetq:service=JBossASSecurityManagerService</depends>
      <depends optional-attribute-name="ConfigurationService"
         proxy-type="attribute">org.hornetq:service=HornetQFileConfigurationService</depends>
   </mbean>

   <mbean code="org.hornetq.service.HornetQJMSStarterService"
      name="org.hornetq:service=HornetQJMSStarterService">
      <depends optional-attribute-name="HornetQServer"
         proxy-type="attribute">org.hornetq:service=HornetQStarterService</depends>
   </mbean>
</server>

This jboss-service.xml configuration file is included inside the hornetq-service.sar on AS4 with embedded HornetQ. As you can see, in this configuration file we are starting various services:

Although HornetQ provides a JMS agnostic messaging API, many users will be more comfortable using JMS.

JMS is a very popular API standard for messaging, and most messaging systems provide a JMS API. If you are completely new to JMS we suggest you follow the Sun JMS tutorial - a full JMS tutorial is out of scope for this guide.

HornetQ also ships with a wide range of examples, many of which demonstrate JMS API usage. A good place to start would be to play around with the simple JMS Queue and Topic example, but we also provide examples for many other parts of the JMS API. A full description of the examples is available in Chapter 11, Examples.

In this section we'll go through the main steps in configuring the server for JMS and creating a simple JMS program. We'll also show how to configure and use JNDI, and also how to use JMS with HornetQ without using any JNDI.

The file hornetq-jms.xml on the server classpath contains any JMS Queue, Topic and ConnectionFactory instances that we wish to create and make available for lookup via JNDI.

A JMS ConnectionFactory object is used by the client to make connections to the server. It knows the location of the server it is connecting to, as well as many other configuration parameters. In most cases the defaults will be acceptable.

We'll deploy a single JMS Queue and a single JMS Connection Factory instance on the server for this example but there are no limits to the number of Queues, Topics and Connection Factory instances you can deploy from the file. Here's our configuration:

<configuration xmlns="urn:hornetq" 
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="urn:hornetq ../schemas/hornetq-jms.xsd ">
    
   <connection-factory name="ConnectionFactory">
      <connectors>
         <connector-ref connector-name="netty"/>
      </connectors>
      <entries>
         <entry name="ConnectionFactory"/>
      </entries>
   </connection-factory>

   <queue name="OrderQueue">
      <entry name="queues/OrderQueue"/>
   </queue>
</configuration>

We deploy one ConnectionFactory called ConnectionFactory and bind it in just one place in JNDI as given by the entry element. ConnectionFactory instances can be bound in many places in JNDI if you require.

When using JNDI from the client side you need to specify a set of JNDI properties which tell the JNDI client where to locate the JNDI server, amongst other things. These are often specified in a file called jndi.properties on the client classpath, or you can specify them directly when creating the JNDI initial context. A full JNDI tutorial is outside the scope of this document; please see the Sun JNDI tutorial for more information on how to use JNDI.

For talking to the JBoss JNDI Server, the jndi properties will look something like this:

java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.provider.url=jnp://myhost:1099
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces

Where myhost is the hostname or IP address of the JNDI server. 1099 is the port used by the JNDI server and may vary depending on how you have configured your JNDI server.
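As a minimal sketch of the second option, the same three settings can be built in code and passed to the initial context instead of being read from jndi.properties. The class JndiEnvironment and its forHost method are hypothetical names used only for illustration; the property keys are the standard constants of javax.naming.Context.

```java
import java.util.Hashtable;
import javax.naming.Context;

// Hypothetical helper: builds the JNDI environment shown in jndi.properties above.
final class JndiEnvironment {
    static Hashtable<String, String> forHost(String host, int port) {
        Hashtable<String, String> env = new Hashtable<String, String>();
        // java.naming.factory.initial
        env.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
        // java.naming.provider.url
        env.put(Context.PROVIDER_URL, "jnp://" + host + ":" + port);
        // java.naming.factory.url.pkgs
        env.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
        return env;
    }
    // Usage, assuming the JBoss naming client classes are on the classpath:
    //   Context ic = new javax.naming.InitialContext(JndiEnvironment.forHost("myhost", 1099));
}
```

Creating the InitialContext itself is left as a comment because it needs the org.jnp classes at runtime.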

In the default standalone configuration, JNDI server ports are configured in the file hornetq-beans.xml by setting properties on the JNDIServer bean:

<bean name="JNDIServer" class="org.jnp.server.Main">
   <property name="namingInfo">
      <inject bean="Naming"/>
   </property>
   <property name="port">1099</property>
   <property name="bindAddress">localhost</property>
   <property name="rmiPort">1098</property>
   <property name="rmiBindAddress">localhost</property>
</bean>

Note

If you want your JNDI server to be available to non-local clients, make sure you change its bind address to something other than localhost!

Note

The JNDIServer bean must be defined only when HornetQ is running in stand-alone mode. When HornetQ is integrated to JBoss Application Server, JBoss AS will provide a ready-to-use JNDI server without any additional configuration.

Here's the code for the example:

First we'll create a JNDI initial context from which to look up our JMS objects:

InitialContext ic = new InitialContext();

Now we'll look up the connection factory:

ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");

And look up the Queue:

Queue orderQueue = (Queue)ic.lookup("/queues/OrderQueue");

Next we create a JMS connection using the connection factory:

Connection connection = cf.createConnection();

And we create a non transacted JMS Session, with AUTO_ACKNOWLEDGE acknowledge mode:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

We create a MessageProducer that will send orders to the queue:

MessageProducer producer = session.createProducer(orderQueue);

And we create a MessageConsumer which will consume orders from the queue:

MessageConsumer consumer = session.createConsumer(orderQueue);

We make sure we start the connection, or delivery won't occur on it:

connection.start();

We create a simple TextMessage and send it:

TextMessage message = session.createTextMessage("This is an order");
producer.send(message);

And we consume the message:

TextMessage receivedMessage = (TextMessage)consumer.receive();
System.out.println("Got order: " + receivedMessage.getText());

It's as simple as that. For a wide range of working JMS examples please see the examples directory in the distribution.

Although it's a very common JMS usage pattern to look up JMS Administered Objects (that's JMS Queue, Topic and ConnectionFactory instances) from JNDI, in some cases a JNDI server is not available and you still want to use JMS, or you just think "Why do I need JNDI? Why can't I just instantiate these objects directly?"

With HornetQ you can do exactly that. HornetQ supports the direct instantiation of JMS Queue, Topic and ConnectionFactory instances, so you don't have to use JNDI at all.

For a full working example of direct instantiation please see the JMS examples in Chapter 11, Examples.

Here's our simple example, rewritten to not use JNDI at all:

We create the JMS ConnectionFactory object via the HornetQJMSClient utility class. Note that we need to provide connection parameters and specify which transport we are using. For more information on connectors please see Chapter 16, Configuring the Transport.

              
TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName());
ConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,transportConfiguration);

We also create the JMS Queue object via the HornetQJMSClient Utility class:

Queue orderQueue = HornetQJMSClient.createQueue("OrderQueue");

Next we create a JMS connection using the connection factory:

Connection connection = cf.createConnection();

And we create a non transacted JMS Session, with AUTO_ACKNOWLEDGE acknowledge mode:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

We create a MessageProducer that will send orders to the queue:

MessageProducer producer = session.createProducer(orderQueue);

And we create a MessageConsumer which will consume orders from the queue:

MessageConsumer consumer = session.createConsumer(orderQueue);

We make sure we start the connection, or delivery won't occur on it:

connection.start();

We create a simple TextMessage and send it:

TextMessage message = session.createTextMessage("This is an order");
producer.send(message);

And we consume the message:

TextMessage receivedMessage = (TextMessage)consumer.receive();
System.out.println("Got order: " + receivedMessage.getText());

HornetQ core is a completely JMS-agnostic messaging system with its own non-JMS API. We call this the core API.

If you don't want to use JMS you can use the core API directly. The core API provides all the functionality of JMS but without much of the complexity. It also provides features that are not available using JMS.

Some of the core messaging concepts are similar to JMS concepts, but core messaging concepts differ in some ways. In general the core messaging API is simpler than the JMS API, since we remove distinctions between queues, topics and subscriptions. We'll discuss each of the major core messaging concepts in turn, but to see the API in detail, please consult the Javadoc.

A client uses a ClientSession for consuming and producing messages and for grouping them in transactions. ClientSession instances can support both transactional and non transactional semantics and also provide an XAResource interface so messaging operations can be performed as part of a JTA transaction.

ClientSession instances group ClientConsumers and ClientProducers.

ClientSession instances can be registered with an optional SendAcknowledgementHandler. This allows your client code to be notified asynchronously when sent messages have successfully reached the server. This unique HornetQ feature allows you to have full guarantees that sent messages have reached the server without having to block on each message sent until a response is received. Blocking on each message sent is costly since it requires a network round trip per message. By not blocking and receiving send acknowledgements asynchronously you can create true end-to-end asynchronous systems, which is not possible using the standard JMS API. For more information on this advanced feature please see Chapter 20, Guarantees of sends and commits.

This chapter describes how JMS destinations are mapped to HornetQ addresses.

HornetQ core is JMS-agnostic. It does not have any concept of a JMS topic. A JMS topic is implemented in core as an address (the topic name) with zero or more queues bound to it. Each queue bound to that address represents a topic subscription. Likewise, a JMS queue is implemented as an address (the JMS queue name) with one single queue bound to it which represents the JMS queue.

By convention, all JMS queues map to core queues where the core queue name has the string jms.queue. prepended to it. E.g. the JMS queue with the name "orders.europe" would map to the core queue with the name "jms.queue.orders.europe". The address at which the core queue is bound is also given by the core queue name.

For JMS topics the address at which the queues that represent the subscriptions are bound is given by prepending the string "jms.topic." to the name of the JMS topic. E.g. the JMS topic with name "news.europe" would map to the core address "jms.topic.news.europe"

In other words if you send a JMS message to a JMS queue with name "orders.europe" it will get routed on the server to any core queues bound to the address "jms.queue.orders.europe". If you send a JMS message to a JMS topic with name "news.europe" it will get routed on the server to any core queues bound to the address "jms.topic.news.europe".
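The naming convention above amounts to simple string prefixing. The following sketch makes that explicit; the class JmsCoreNames and its methods are hypothetical helpers written for this illustration and are not part of the HornetQ API.

```java
// Hypothetical helper illustrating the JMS-to-core naming convention.
final class JmsCoreNames {
    static final String QUEUE_PREFIX = "jms.queue.";
    static final String TOPIC_PREFIX = "jms.topic.";

    // Core queue name (and address) for a JMS queue.
    static String coreQueueName(String jmsQueueName) {
        return QUEUE_PREFIX + jmsQueueName;
    }

    // Core address to which the subscription queues of a JMS topic are bound.
    static String coreTopicAddress(String jmsTopicName) {
        return TOPIC_PREFIX + jmsTopicName;
    }
}
```

For example, coreQueueName("orders.europe") gives "jms.queue.orders.europe" and coreTopicAddress("news.europe") gives "jms.topic.news.europe".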

If you want to configure settings for a JMS Queue with the name "orders.europe", you need to configure the corresponding core queue "jms.queue.orders.europe":

<!-- expired messages in JMS Queue "orders.europe" will be sent to the JMS Queue "expiry.europe" -->
<address-setting match="jms.queue.orders.europe">
   <expiry-address>jms.queue.expiry.europe</expiry-address>
   ...
</address-setting>
11.1. JMS Examples
11.1.1. Applet
11.1.2. Application-Layer Failover
11.1.3. Core Bridge Example
11.1.4. Browser
11.1.5. Client Kickoff
11.1.6. Client-Side Load-Balancing
11.1.7. Clustered Durable Subscription
11.1.8. Clustered Grouping
11.1.9. Clustered Queue
11.1.10. Clustering with JGroups
11.1.11. Clustered Standalone
11.1.12. Clustered Static Discovery
11.1.13. Clustered Static Cluster One Way
11.1.14. Clustered Topic
11.1.15. Message Consumer Rate Limiting
11.1.16. Dead Letter
11.1.17. Delayed Redelivery
11.1.18. Divert
11.1.19. Durable Subscription
11.1.20. Embedded
11.1.21. Embedded Simple
11.1.22. Message Expiration
11.1.23. Failover Manual Stop
11.1.24. HTTP Transport
11.1.25. Instantiate JMS Objects Directly
11.1.26. Interceptor
11.1.27. JAAS
11.1.28. JMS Bridge
11.1.29. JMX Management
11.1.30. Large Message
11.1.31. Last-Value Queue
11.1.32. Management
11.1.33. Management Notification
11.1.34. Message Counter
11.1.35. Message Group
11.1.36. Message Group
11.1.37. Message Priority
11.1.38. Multiple Failover
11.1.39. Multiple Failover Failback
11.1.40. No Consumer Buffering
11.1.41. Non-Transaction Failover With Server Data Replication
11.1.42. Paging
11.1.43. Pre-Acknowledge
11.1.44. Message Producer Rate Limiting
11.1.45. Queue
11.1.46. Message Redistribution
11.1.47. Queue Requestor
11.1.48. Queue with Message Selector
11.1.49. Reattach Node example
11.1.50. Request-Reply example
11.1.51. Scheduled Message
11.1.52. Security
11.1.53. Send Acknowledgements
11.1.54. Spring Integration
11.1.55. SSL Transport
11.1.56. Static Message Selector
11.1.57. Static Message Selector Using JMS
11.1.58. Stomp
11.1.59. Stomp1.1
11.1.60. Stomp Over Web Sockets
11.1.61. Symmetric Cluster
11.1.62. Temporary Queue
11.1.63. Topic
11.1.64. Topic Hierarchy
11.1.65. Topic Selector 1
11.1.66. Topic Selector 2
11.1.67. Transaction Failover
11.1.68. Transactional Session
11.1.69. XA Heuristic
11.1.70. XA Receive
11.1.71. XA Send
11.1.72. XA with Transaction Manager
11.2. Core API Examples
11.2.1. Embedded
11.3. Java EE Examples
11.3.1. EJB/JMS Transaction
11.3.2. HAJNDI (High Availability)
11.3.3. Resource Adapter Configuration
11.3.4. Resource Adapter Remote Server Configuration
11.3.5. JMS Bridge
11.3.6. MDB (Message Driven Bean)
11.3.7. Servlet Transport
11.3.8. Servlet SSL Transport
11.3.9. XA Recovery

The HornetQ distribution comes with over 70 examples that run out of the box and demonstrate many of its features.

The examples are available in the distribution, in the examples directory. Examples are split into JMS and core examples. JMS examples show how a particular feature can be used by a normal JMS client. Core examples show how the equivalent feature can be used by a core messaging client.

A set of Java EE examples is also provided; these need the JBoss Application Server to be installed in order to run.

To run a JMS example, simply cd into the appropriate example directory and type ./build.sh (or build.bat if you are on Windows).

Here's a listing of the examples with a brief description.

HornetQ uses a specific syntax for representing wildcards in security settings, address settings and when creating consumers.

The syntax is similar to that used by AMQP.

A HornetQ wildcard expression contains words delimited by the character '.' (full stop).

The special characters '#' and '*' also have special meaning and can take the place of a word.

The character '#' means 'match any sequence of zero or more words'.

The character '*' means 'match a single word'.

So the wildcard 'news.europe.#' would match 'news.europe', 'news.europe.sport', 'news.europe.politics', and 'news.europe.politics.regional' but would not match 'news.usa', 'news.usa.sport' nor 'entertainment'.

The wildcard 'news.*' would match 'news.europe', but not 'news.europe.sport'.

The wildcard 'news.*.sport' would match 'news.europe.sport' and also 'news.usa.sport', but not 'news.europe.politics'.
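The matching rules can be sketched in a few lines of Java. This is an illustrative implementation of the rules as stated in this section, not HornetQ's own matcher; the class WildcardSketch and its matches method are hypothetical names.

```java
// Hypothetical sketch of the wildcard rules: words are separated by '.',
// '#' matches zero or more words and '*' matches exactly one word.
final class WildcardSketch {
    static boolean matches(String pattern, String address) {
        return match(pattern.split("\\."), 0, address.split("\\."), 0);
    }

    private static boolean match(String[] p, int pi, String[] a, int ai) {
        if (pi == p.length) {
            // Pattern exhausted: succeed only if the address is exhausted too.
            return ai == a.length;
        }
        if (p[pi].equals("#")) {
            // Let '#' absorb zero, one, two, ... of the remaining words.
            for (int next = ai; next <= a.length; next++) {
                if (match(p, pi + 1, a, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ai == a.length) {
            return false; // pattern still expects a word but none is left
        }
        if (p[pi].equals("*") || p[pi].equals(a[ai])) {
            return match(p, pi + 1, a, ai + 1);
        }
        return false;
    }
}
```

Applied to the examples above, matches("news.europe.#", "news.europe") and matches("news.*.sport", "news.usa.sport") return true, while matches("news.*", "news.europe.sport") returns false.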

HornetQ provides a powerful filter language based on a subset of the SQL 92 expression syntax.

It is the same as the syntax used for JMS selectors, but the predefined identifiers are different. For documentation on JMS selector syntax please see the JMS javadoc for javax.jms.Message.

Filter expressions are used in several places in HornetQ:

  • Predefined Queues. When pre-defining a queue, either in hornetq-configuration.xml or hornetq-jms.xml, a filter expression can be defined for the queue. Only messages that match the filter expression will enter the queue.

  • Core bridges can be defined with an optional filter expression; only matching messages will be bridged (see Chapter 36, Core Bridges).

  • Diverts can be defined with an optional filter expression; only matching messages will be diverted (see Chapter 35, Diverting and Splitting Message Flows).

  • Filters are also used programmatically when creating consumers and queues, and in several places described in Chapter 30, Management.

There are some differences between JMS selector expressions and HornetQ core filter expressions. Whereas JMS selector expressions operate on a JMS message, HornetQ core filter expressions operate on a core message.

The following identifiers can be used in a core filter expression to refer to attributes of the core message:

  • HQPriority. To refer to the priority of a message. Message priorities are integers with valid values from 0 - 9. 0 is the lowest priority and 9 is the highest. E.g. HQPriority = 3 AND animal = 'aardvark'

  • HQExpiration. To refer to the expiration time of a message. The value is a long integer.

  • HQDurable. To refer to whether a message is durable or not. The value is a string with valid values: DURABLE or NON_DURABLE.

  • HQTimestamp. The timestamp of when the message was created. The value is a long integer.

  • HQSize. The size of a message in bytes. The value is an integer.

Any other identifiers used in core filter expressions will be assumed to be properties of the message.

In this chapter we will describe how persistence works with HornetQ and how to configure it.

HornetQ ships with a high performance journal. Since HornetQ handles its own persistence, rather than relying on a database or other third-party persistence engine, it is very highly optimised for the specific messaging use cases.

A HornetQ journal is an append only journal. It consists of a set of files on disk. Each file is pre-created to a fixed size and initially filled with padding. As operations are performed on the server, e.g. add message, update message, delete message, records are appended to the journal. When one journal file is full we move to the next one.

Because records are only appended, i.e. added to the end of the journal we minimise disk head movement, i.e. we minimise random access operations which is typically the slowest operation on a disk.

Making the file size configurable means that an optimal size can be chosen, i.e. making each file fit on a disk cylinder. Modern disk topologies are complex and we are not in control over which cylinder(s) the file is mapped onto so this is not an exact science. But by minimising the number of disk cylinders the file is using, we can minimise the amount of disk head movement, since an entire disk cylinder is accessible simply by the disk rotating - the head does not have to move.

As delete records are added to the journal, HornetQ has a sophisticated file garbage collection algorithm which can determine if a particular journal file is needed any more - i.e. has all its data been deleted in the same or other files. If so, the file can be reclaimed and re-used.

HornetQ also has a compaction algorithm which removes dead space from the journal and compresses the data so it takes up fewer files on disk.

The journal also fully supports transactional operation if required, supporting both local and XA transactions.

The majority of the journal is written in Java, however we abstract out the interaction with the actual file system to allow different pluggable implementations. HornetQ ships with two implementations:

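The standard HornetQ core server uses two instances of the journal, the bindings journal and the message journal. A third instance, the JMS journal, is created only if JMS is being used: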

  • Bindings journal.

    This journal is used to store bindings related data. That includes the set of queues that are deployed on the server and their attributes. It also stores data such as id sequence counters.

    The bindings journal is always a NIO journal as it is typically low throughput compared to the message journal.

    The files on this journal are prefixed with hornetq-bindings. Each file has a bindings extension. The file size is 1048576 bytes, and the files are located in the bindings folder.

  • JMS journal.

    This journal instance stores all JMS related data. This is basically any JMS Queues, Topics and Connection Factories and any JNDI bindings for these resources.

    Any JMS Resources created via the management API will be persisted to this journal. Any resources configured via configuration files will not. The JMS Journal will only be created if JMS is being used.

    The files on this journal are prefixed with hornetq-jms. Each file has a jms extension. The file size is 1048576 bytes, and the files are located in the bindings folder.

  • Message journal.

    This journal instance stores all message related data, including the messages themselves and also duplicate-id caches.

    By default HornetQ will try and use an AIO journal. If AIO is not available, e.g. the platform is not Linux with the correct kernel version or AIO has not been installed then it will automatically fall back to using Java NIO which is available on any Java platform.

    The files on this journal are prefixed with hornetq-data. Each file has a hq extension. The file size is 10485760 bytes by default (configurable), and the files are located in the journal folder.

For large messages, HornetQ persists them outside the message journal. This is discussed in Chapter 23, Large Messages.

HornetQ can also be configured to page messages to disk in low memory situations. This is discussed in Chapter 24, Paging.

If no persistence is required at all, HornetQ can also be configured not to persist any data at all to storage as discussed in Section 15.6, “Configuring HornetQ for Zero Persistence”.

The message journal is configured using the following attributes in hornetq-configuration.xml

Warning

Most disks contain hardware write caches. A write cache can increase the apparent performance of the disk because writes just go into the cache and are then lazily written to the disk later.

This happens irrespective of whether you have executed an fsync() from the operating system or correctly synced data from inside a Java program!

By default many systems ship with disk write cache enabled. This means that even after syncing from the operating system there is no guarantee the data has actually made it to disk, so if a failure occurs, critical data can be lost.

Some more expensive disks have non-volatile or battery-backed write caches which won't necessarily lose data in the event of failure, but you need to test them!

If your disk does not have an expensive non volatile or battery backed cache and it's not part of some kind of redundant array (e.g. RAID), and you value your data integrity you need to make sure disk write cache is disabled.

Be aware that disabling disk write cache can give you a nasty shock performance wise. If you've been used to using disks with write cache enabled in their default setting, unaware that your data integrity could be compromised, then disabling it will give you an idea of how fast your disk can perform when acting really reliably.

On Linux you can inspect and/or change your disk's write cache settings using the tools hdparm (for IDE disks) or sdparm or sginfo (for SCSI/SATA disks).

On Windows you can check / change the setting by right clicking on the disk and clicking properties.

HornetQ has a fully pluggable and highly flexible transport layer and defines its own Service Provider Interface (SPI) to make plugging in a new transport provider relatively straightforward.

In this chapter we'll describe the concepts required for understanding HornetQ transports and where and how they're configured.

One of the most important concepts in HornetQ transports is the acceptor. Let's dive straight in and take a look at an acceptor defined in xml in the configuration file hornetq-configuration.xml.

<acceptors>
   <acceptor name="netty">
      <factory-class>
         org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory
      </factory-class>
      <param key="port" value="5446"/>
   </acceptor>
</acceptors>

Acceptors are always defined inside an acceptors element. There can be one or more acceptors defined in the acceptors element. There's no upper limit to the number of acceptors per server.

Each acceptor defines a way in which connections can be made to the HornetQ server.

In the above example we're defining an acceptor that uses Netty to listen for connections at port 5446.

The acceptor element contains a sub-element factory-class, which defines the factory used to create acceptor instances. In this case we're using Netty to listen for connections, so we use the Netty implementation of an AcceptorFactory. Basically, the factory-class element determines which pluggable transport we're going to use to do the actual listening.

The acceptor element can also be configured with zero or more param sub-elements. Each param element defines a key-value pair. These key-value pairs are used to configure the specific transport. The set of valid key-value pairs depends on the specific transport being used, and the pairs are passed straight through to the underlying transport.

Examples of key-value pairs for a particular transport would be, say, to configure the IP address to bind to, or the port to listen at.

Whereas acceptors are used on the server to define how we accept connections, connectors are used by a client to define how it connects to a server.

Let's look at a connector defined in our hornetq-configuration.xml file:

<connectors>
   <connector name="netty">
      <factory-class>
         org.hornetq.core.remoting.impl.netty.NettyConnectorFactory
      </factory-class>
      <param key="port" value="5446"/>
   </connector>
</connectors>

Connectors can be defined inside a connectors element. There can be one or more connectors defined in the connectors element. There's no upper limit to the number of connectors per server.

You may ask yourself, if connectors are used by the client to make connections then why are they defined on the server? There are a couple of reasons for this:

How do we configure a core ClientSessionFactory with the information that it needs to connect with a server?

Connectors are also used indirectly when configuring a core ClientSessionFactory to talk directly to a server. In this case there's no need to define a connector in the server side configuration; instead we just create the parameters and tell the ClientSessionFactory which connector factory to use.

Here's an example of creating a ClientSessionFactory which will connect directly to the acceptor we defined earlier in this chapter. It uses the standard Netty TCP transport and will try to connect on port 5446 to localhost (the default):

Map<String, Object> connectionParams = new HashMap<String, Object>();
    
connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, 
                    5446);

TransportConfiguration transportConfiguration = 
    new TransportConfiguration(
    "org.hornetq.core.remoting.impl.netty.NettyConnectorFactory", 
    connectionParams);

ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(transportConfiguration);

ClientSessionFactory sessionFactory = locator.createClientSessionFactory();

ClientSession session = sessionFactory.createSession(...);

etc

Similarly, if you're using JMS, you can configure the JMS connection factory directly on the client side without having to define a connector on the server side or define a connection factory in hornetq-jms.xml:

Map<String, Object> connectionParams = new HashMap<String, Object>();

connectionParams.put(org.hornetq.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, 5446);

TransportConfiguration transportConfiguration = 
    new TransportConfiguration(
    "org.hornetq.core.remoting.impl.netty.NettyConnectorFactory", 
    connectionParams);

ConnectionFactory connectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, transportConfiguration);

Connection jmsConnection = connectionFactory.createConnection();

etc

Out of the box, HornetQ currently uses Netty, a high performance low level network library.

Our Netty transport can be configured in several different ways: to use old (blocking) Java IO or NIO (non-blocking), and to use straightforward TCP sockets, SSL, or tunnelling over HTTP or HTTPS. On top of that we also provide a servlet transport.

We believe this caters for the vast majority of transport requirements.

Netty TCP is a simple unencrypted TCP sockets based transport. Netty TCP can be configured to use old blocking Java IO or non-blocking Java NIO. We recommend you use Java NIO on the server side for better scalability with many concurrent connections. However, using old Java IO can sometimes give you better latency than NIO when you're not so worried about supporting many thousands of concurrent connections.

If you're running connections across an untrusted network please bear in mind this transport is unencrypted. You may want to look at the SSL or HTTPS configurations.

With the Netty TCP transport all connections are initiated from the client side. I.e. the server does not initiate any connections to the client. This works well with firewall policies that typically only allow connections to be initiated in one direction.

All the valid Netty transport keys are defined in the class org.hornetq.core.remoting.impl.netty.TransportConstants. Most parameters can be used either with acceptors or connectors, some only work with acceptors. The following parameters can be used to configure Netty for simple TCP:

We also provide a Netty servlet transport for use with HornetQ. The servlet transport allows HornetQ traffic to be tunneled over HTTP to a servlet running in a servlet engine which then redirects it to an in-VM HornetQ server.

The servlet transport differs from the Netty HTTP transport in that, with the HTTP transport, HornetQ effectively acts as a web server listening for HTTP traffic on, e.g. port 80 or 8080, whereas with the servlet transport HornetQ traffic is proxied through a servlet engine which may already be serving a web site or other applications. This allows HornetQ to be used where corporate policies may only allow a single web server listening on an HTTP port, and this needs to serve all applications including messaging.

Please see the examples for a full working example of the servlet transport being used.

To configure a servlet engine to work with the Netty Servlet transport we need to do the following things:

Here's a list of the init params and what they are used for:

The servlet pattern configured in the web.xml is the path of the URL that is used. The connector param servlet-path on the connector config must match this using the application context of the web app if there is one.

It's also possible to use the servlet transport over SSL. Simply add the following configuration to the connector:

<connector name="netty-servlet">
   <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
   <param key="host" value="localhost"/>
   <param key="port" value="8443"/>
   <param key="use-servlet" value="true"/>
   <param key="servlet-path" value="/messaging/HornetQServlet"/>
   <param key="ssl-enabled" value="true"/>
   <param key="key-store-path" value="path to a keystore"/>
   <param key="key-store-password" value="keystore password"/>
</connector>

You will also have to configure the Application server to use a KeyStore. Edit the server.xml file that can be found under server/default/deploy/jbossweb.sar of the Application Server installation and edit the SSL/TLS connector configuration to look like the following:

<Connector protocol="HTTP/1.1" SSLEnabled="true"
     port="8443" address="${jboss.bind.address}"
     scheme="https" secure="true" clientAuth="false"
     keystoreFile="path to a keystore"
     keystorePass="keystore password" sslProtocol = "TLS" />

In both cases you will need to provide a keystore and password. Take a look at the servlet ssl example shipped with HornetQ for more detail.

In this section we will discuss connection time-to-live (TTL) and explain how HornetQ deals with crashed clients and clients which have exited without cleanly closing their resources.

Before a HornetQ client application exits it is considered good practice that it should close its resources in a controlled manner, using a finally block.

Here's an example of a well behaved core client application closing its session and session factory in a finally block:

ServerLocator locator = null;
ClientSessionFactory sf = null;
ClientSession session = null;

try
{
   locator = HornetQClient.createServerLocatorWithoutHA(..);

   sf = locator.createClientSessionFactory();

   session = sf.createSession(...);
   
   ... do some stuff with the session...
}
finally
{
   if (session != null)
   {
      session.close();
   }
   
   if (sf != null)
   {
      sf.close();
   }

   if(locator != null)
   {
      locator.close();
   }
}

And here's an example of a well behaved JMS client application:

Connection jmsConnection = null;

try
{
   ConnectionFactory jmsConnectionFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(...);

   jmsConnection = jmsConnectionFactory.createConnection();

   ... do some stuff with the connection...
}
finally
{
   if (jmsConnection != null)
   {
      jmsConnection.close();
   }
}

Unfortunately users don't always write well behaved applications, and sometimes clients just crash so they don't have a chance to clean up their resources!

If this occurs then it can leave server side resources, like sessions, hanging on the server. If these were not removed they would cause a resource leak on the server, and over time this would result in the server running out of memory or other resources.

We have to balance the requirement for cleaning up dead client resources with the fact that sometimes the network between the client and the server can fail and then come back, allowing the client to reconnect. HornetQ supports client reconnection, so we don't want to clean up "dead" server side resources too soon or this will prevent any client from reconnecting, as it won't be able to find its old sessions on the server.

HornetQ makes all of this configurable. For each ClientSessionFactory we define a connection TTL. Basically, the TTL determines how long the server will keep a connection alive in the absence of any data arriving from the client. The client will automatically send "ping" packets periodically to prevent the server from closing it down. If the server doesn't receive any packets on a connection for the connection TTL time, then it will automatically close all the sessions on the server that relate to that connection.

If you're using JMS, the connection TTL is defined by the ConnectionTTL attribute on a HornetQConnectionFactory instance, or if you're deploying JMS connection factory instances directly into JNDI on the server side, you can specify it in the XML config, using the parameter connection-ttl.

The default value for connection TTL is 60000ms, i.e. 1 minute. A value of -1 for ConnectionTTL means the server will never time out the connection on the server side.

If you do not wish clients to be able to specify their own connection TTL, you can override all values used by a global value set on the server side. This can be done by specifying the connection-ttl-override attribute in the server side configuration. The default value for connection-ttl-override is -1 which means "do not override" (i.e. let clients use their own values).
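As a rough illustration of the rules above, the following sketch models how a server could decide whether a connection has outlived its TTL. It is a toy model and not HornetQ's implementation: the class and method names are hypothetical, and it assumes a connection counts as expired once the full TTL has elapsed since the last packet (data or ping) arrived.

```java
/** Toy model of the server-side connection TTL check (not HornetQ code). */
final class ConnectionTtlModel {

    /** The TTL the server applies: connection-ttl-override wins unless it is -1. */
    static long effectiveTtl(long clientTtlMillis, long overrideMillis) {
        return overrideMillis == -1 ? clientTtlMillis : overrideMillis;
    }

    /** True if no packet has arrived within the TTL; a TTL of -1 never expires. */
    static boolean isExpired(long lastPacketMillis, long nowMillis, long ttlMillis) {
        if (ttlMillis == -1) {
            return false;
        }
        return nowMillis - lastPacketMillis >= ttlMillis;
    }

    public static void main(String[] args) {
        long ttl = effectiveTtl(60000, -1);            // default client TTL, no override
        System.out.println(isExpired(0, 30000, ttl));  // prints false
        System.out.println(isExpired(0, 60000, ttl));  // prints true
        System.out.println(isExpired(0, 999999, -1));  // prints false
    }
}
```

The periodic client "ping" simply keeps the last-packet timestamp fresh, so a healthy but idle client does not expire.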

Flow control is used to limit the flow of data between a client and server, or a server and another server in order to prevent the client or server being overwhelmed with data.

This controls the flow of data between the server and the client as the client consumes messages. For performance reasons clients normally buffer messages before delivering to the consumer via the receive() method or asynchronously via a message listener. If the consumer cannot process messages as fast as they are being delivered and stored in the internal buffer, then you could end up with a situation where messages would keep building up possibly causing out of memory on the client if they cannot be processed in time.

By default, HornetQ consumers buffer messages from the server in a client side buffer before the client consumes them. This improves performance: otherwise every time the client consumes a message, HornetQ would have to go to the server to request the next message. In turn, this message would then get sent to the client side, if one was available.

A network round trip would be involved for every message and considerably reduce performance.

To prevent this, HornetQ pre-fetches messages into a buffer on each consumer. The total maximum size of messages (in bytes) that will be buffered on each consumer is determined by the consumer-window-size parameter.

By default, the consumer-window-size is set to 1 MiB (1024 * 1024 bytes).

The value can be:

Setting the consumer window size can considerably improve performance depending on the messaging use case. As an example, let's consider the two extremes:

Fast consumers

Fast consumers can process messages as fast as they consume them (or even faster)

To allow fast consumers, set the consumer-window-size to -1. This will allow unbounded message buffering on the client side.

Use this setting with caution: it can overflow the client memory if the consumer is not able to process messages as fast as it receives them.

Slow consumers

Slow consumers take significant time to process each message, and it is desirable to prevent buffering messages on the client side so that they can be delivered to another consumer instead.

Consider a situation where a queue has 2 consumers, 1 of which is very slow. Messages are delivered in a round robin fashion to both consumers, and the fast consumer processes all of its messages very quickly until its buffer is empty. At this point there are still messages waiting to be processed in the buffer of the slow consumer, which prevents them from being processed by the fast consumer. The fast consumer is therefore sitting idle when it could be processing the other messages.

To allow slow consumers, set the consumer-window-size to 0 (for no buffer at all). This will prevent the slow consumer from buffering any messages on the client side. Messages will remain on the server side ready to be consumed by other consumers.

Setting this to 0 can give deterministic distribution between multiple consumers on a queue.

Most of the consumers cannot be clearly identified as fast or slow consumers but are in-between. In that case, setting the value of consumer-window-size to optimize performance depends on the messaging use case and requires benchmarks to find the optimal value, but a value of 1MiB is fine in most cases.

It is also possible to control the rate at which a consumer can consume messages. This is a form of throttling and can be used to make sure that a consumer never consumes messages at a rate faster than the rate specified.

The rate must be a positive integer to enable this functionality and is the maximum desired message consumption rate specified in units of messages per second. Setting this to -1 disables rate limited flow control. The default value is -1.
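To make the messages-per-second limit concrete, here is a minimal sketch of one way such a throttle could work, using a fixed one-second window. This is an assumption made for illustration only: the class is hypothetical and HornetQ's own throttling algorithm may well differ.

```java
/** Toy fixed-window limiter; HornetQ's own throttling algorithm may differ. */
final class ConsumerRateModel {
    private final int maxRate;          // messages per second; non-positive disables limiting
    private long windowStart = 0;       // start of the current one-second window (ms)
    private int consumedInWindow = 0;   // messages consumed in the current window

    ConsumerRateModel(int maxRate) {
        this.maxRate = maxRate;
    }

    /** Returns true if a message may be consumed at the given time. */
    boolean tryConsume(long nowMillis) {
        if (maxRate <= 0) {
            return true; // e.g. -1: rate limited flow control is disabled
        }
        if (nowMillis - windowStart >= 1000) {
            windowStart = nowMillis;   // a new window begins, reset the counter
            consumedInWindow = 0;
        }
        if (consumedInWindow < maxRate) {
            consumedInWindow++;
            return true;
        }
        return false; // limit reached for this window
    }

    public static void main(String[] args) {
        ConsumerRateModel limiter = new ConsumerRateModel(2);
        System.out.println(limiter.tryConsume(0));     // prints true
        System.out.println(limiter.tryConsume(10));    // prints true
        System.out.println(limiter.tryConsume(20));    // prints false
        System.out.println(limiter.tryConsume(1000));  // prints true
    }
}
```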

Please see Section 11.1.15, “Message Consumer Rate Limiting” for a working example of limiting consumer rate.

HornetQ also can limit the amount of data sent from a client to a server to prevent the server being overwhelmed.

In a similar way to consumer window based flow control, HornetQ producers, by default, can only send messages to an address as long as they have sufficient credits to do so. The amount of credits required to send a message is given by the size of the message.

As producers run low on credits they request more from the server. When the server sends them more credits they can send more messages.

The amount of credits a producer requests in one go is known as the window size.

The window size therefore determines the amount of bytes that can be in-flight at any one time before more need to be requested - this prevents the remoting connection from getting overloaded.

Normally the server will always give the same number of credits as have been requested. However, it is also possible to set a maximum size on any address, and the server will never send more credits than could cause the address's upper memory limit to be exceeded.

For example, if I have a JMS queue called "myqueue", I could set the maximum memory size to 10MiB, and the server will control the number of credits sent to any producers which are sending any messages to myqueue such that the total size of the messages in the queue never exceeds 10MiB.

When the address gets full, producers will block on the client side until more space frees up on the address, i.e. until messages are consumed from the queue thus freeing up space for more messages to be sent.

We call this blocking producer flow control, and it's an efficient way to prevent the server running out of memory due to producers sending more messages than can be handled at any time.

It is an alternative approach to paging, which does not block producers but instead pages messages to storage.
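The credit mechanism described above can be sketched as a small model. It is deliberately simplified and is not HornetQ's implementation: the class and method names are hypothetical, and it assumes the server tracks one running total of bytes per address, granting at most the space that remains under the limit.

```java
/** Toy model of credit-based producer flow control for one address (not HornetQ code). */
final class ProducerCreditModel {
    private final long maxSizeBytes;   // upper memory limit of the address
    private long committedBytes = 0;   // credits granted and not yet freed by consumption

    ProducerCreditModel(long maxSizeBytes) {
        this.maxSizeBytes = maxSizeBytes;
    }

    /** Grants the requested window, capped so the address limit is never exceeded. */
    long requestCredits(long requestedBytes) {
        long grant = Math.min(requestedBytes, maxSizeBytes - committedBytes);
        committedBytes += grant;
        return grant; // a grant of 0 means the producer has to block
    }

    /** Consuming messages frees space, so later requests can be granted again. */
    void messagesConsumed(long bytes) {
        committedBytes = Math.max(0, committedBytes - bytes);
    }

    public static void main(String[] args) {
        ProducerCreditModel address = new ProducerCreditModel(100000);
        System.out.println(address.requestCredits(60000)); // prints 60000
        System.out.println(address.requestCredits(60000)); // prints 40000
        System.out.println(address.requestCredits(60000)); // prints 0
        address.messagesConsumed(50000);
        System.out.println(address.requestCredits(60000)); // prints 50000
    }
}
```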

To configure an address with a maximum size and tell the server that you want to block producers for this address if it becomes full, you need to define an AddressSettings (Section 25.3, “Configuring Queues Via Address Settings”) block for the address and specify max-size-bytes and address-full-policy.

The address block applies to all queues registered to that address. I.e. the total memory for all queues bound to that address will not exceed max-size-bytes. In the case of JMS topics this means the total memory of all subscriptions in the topic won't exceed max-size-bytes.

Here's an example:

<address-settings>
   <address-setting match="jms.queue.exampleQueue">
      <max-size-bytes>100000</max-size-bytes>
      <address-full-policy>BLOCK</address-full-policy>
   </address-setting>
</address-settings>

The above example would set the max size of the JMS queue "exampleQueue" to be 100000 bytes and would block any producers sending to that address to prevent that max size being exceeded.

Note the policy must be set to BLOCK to enable blocking producer flow control.

Note

Note that in the default configuration all addresses are set to block producers after 10 MiB of message data is in the address. This means you cannot send more than 10MiB of message data to an address without it being consumed before the producers will be blocked. If you do not want this behaviour increase the max-size-bytes parameter or change the address full message policy.

If you are sending messages to a server using a non transacted session, HornetQ can be configured to block the call to send until the message has definitely reached the server, and a response has been sent back to the client. This can be configured individually for durable and non-durable messages, and is determined by the following two parameters:

Setting block on sends to true can reduce performance since each send requires a network round trip before the next send can be performed. This means the performance of sending messages will be limited by the network round trip time (RTT) of your network, rather than the bandwidth of your network. For better performance we recommend either batching many message sends together in a transaction (with a transactional session only the commit / rollback blocks, not every send) or using HornetQ's advanced asynchronous send acknowledgements feature described in Section 20.4, “Asynchronous Send Acknowledgements”.

If you are using JMS and you're using the JMS service on the server to load your JMS connection factory instances into JNDI then these parameters can be configured in hornetq-jms.xml using the elements block-on-durable-send and block-on-non-durable-send. If you're using JMS but not using JNDI then you can set these values directly on the HornetQConnectionFactory instance using the appropriate setter methods.

If you're using core you can set these values directly on the ClientSessionFactory instance using the appropriate setter methods.

When the server receives a message sent from a non transactional session, and that message is durable and the message is routed to at least one durable queue, then the server will persist the message in permanent storage. If the journal parameter journal-sync-non-transactional is set to true the server will not send a response back to the client until the message has been persisted and the server has a guarantee that the data has been persisted to disk. The default value for this parameter is true.

If you are using a non transacted session but want a guarantee that every message sent to the server has reached it, then, as discussed in Section 20.2, “Guarantees of Non Transactional Message Sends”, you can configure HornetQ to block the call to send until the server has received the message, persisted it and sent back a response. This works well but has a severe performance penalty - each call to send needs to block for at least the time of a network round trip (RTT) - the performance of sending is thus limited by the latency of the network, not limited by the network bandwidth.

Let's do a little bit of maths to see how severe that is. We'll consider a standard 1Gib ethernet network with a network round trip between the server and the client of 0.25 ms.

With an RTT of 0.25 ms, the client can send at most 1000 / 0.25 = 4000 messages per second if it blocks on each message send.

If each message is < 1500 bytes and a standard 1500 bytes MTU size is used on the network, then a 1Gib network has a theoretical upper limit of (1024 * 1024 * 1024 / 8) / 1500 = 89478 messages per second if messages are sent without blocking! These figures aren't an exact science but you can clearly see that being limited by network RTT can have a serious effect on performance.
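The arithmetic above can be reproduced with a few lines of Java. The class and method names are hypothetical, and the inputs are simply the figures used in the text (0.25 ms RTT, a 1Gib link, 1500 bytes per message).

```java
/** Back-of-the-envelope send rate estimate using the figures from the text. */
final class SendRateEstimate {

    /** A blocking sender completes one send per network round trip. */
    static long blockingSendsPerSecond(double rttMillis) {
        return (long) (1000.0 / rttMillis);
    }

    /** A non-blocking sender is bounded by link bandwidth (bits/s converted to bytes/s). */
    static long nonBlockingSendsPerSecond(long linkBitsPerSecond, int bytesPerMessage) {
        return linkBitsPerSecond / 8 / bytesPerMessage;
    }

    public static void main(String[] args) {
        long oneGibit = 1024L * 1024L * 1024L;
        System.out.println(blockingSendsPerSecond(0.25));              // prints 4000
        System.out.println(nonBlockingSendsPerSecond(oneGibit, 1500)); // prints 89478
    }
}
```

With these inputs the bandwidth bound is roughly 22 times higher than the round trip bound.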

To remedy this, HornetQ provides an advanced new feature called asynchronous send acknowledgements. With this feature, HornetQ can be configured to send messages without blocking in one direction and asynchronously getting acknowledgement from the server that the messages were received in a separate stream. By de-coupling the send from the acknowledgement of the send, the system is not limited by the network RTT, but is limited by the network bandwidth. Consequently better throughput can be achieved than is possible using a blocking approach, while at the same time having absolute guarantees that messages have successfully reached the server.

The window size for send acknowledgements is determined by the confirmation-window-size parameter on the connection factory or client session factory. Please see Chapter 34, Client Reconnection and Session Reattachment for more info on this.

Messages can be delivered unsuccessfully (e.g. if the transacted session used to consume them is rolled back). Such a message goes back to its queue ready to be redelivered. However, this means it is possible for a message to be delivered again and again without any success and remain in the queue, clogging the system.

There are 2 ways to deal with these undelivered messages:

  • Delayed redelivery.

    It is possible to delay message redelivery to give the client some time to recover from transient failures and avoid overloading its network or CPU resources.

  • Dead Letter Address.

    It is also possible to configure a dead letter address so that after a specified number of unsuccessful deliveries, messages are removed from the queue and will not be delivered again.

Both options can be combined for maximum flexibility.

To prevent a client infinitely receiving the same undelivered message (regardless of what is causing the unsuccessful deliveries), messaging systems define dead letter addresses: after a specified number of unsuccessful delivery attempts, the message is removed from the queue and sent instead to a dead letter address.

Any such messages can then be diverted to queue(s) where they can later be perused by the system administrator for action to be taken.

HornetQ's addresses can be assigned a dead letter address. Once the messages have been unsuccessfully delivered for a given number of attempts, they are removed from the queue and sent to the dead letter address. These dead letter messages can later be consumed for further inspection.
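The behaviour just described can be pictured with a small in-memory model. This is only a sketch under stated assumptions: the class, its fields and the message identifiers are hypothetical, and the real server tracks delivery counts per message reference rather than by string.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of moving a message to a dead letter address (not HornetQ code). */
final class DeadLetterModel {
    final Deque<String> queue = new ArrayDeque<>();        // the original queue
    final Deque<String> deadLetters = new ArrayDeque<>();  // stands in for the dead letter address
    private final Map<String, Integer> failedAttempts = new HashMap<>();
    private final int maxDeliveryAttempts;

    DeadLetterModel(int maxDeliveryAttempts) {
        this.maxDeliveryAttempts = maxDeliveryAttempts;
    }

    /** Records one unsuccessful delivery, e.g. the consuming session was rolled back. */
    void deliveryFailed(String message) {
        int attempts = failedAttempts.merge(message, 1, Integer::sum);
        if (attempts >= maxDeliveryAttempts) {
            queue.remove(message);     // no further redelivery from the queue
            deadLetters.add(message);  // kept for later inspection
            failedAttempts.remove(message);
        }
    }

    public static void main(String[] args) {
        DeadLetterModel model = new DeadLetterModel(3);
        model.queue.add("order-1");
        model.deliveryFailed("order-1");
        model.deliveryFailed("order-1");
        System.out.println(model.queue);        // prints [order-1]
        model.deliveryFailed("order-1");
        System.out.println(model.queue);        // prints []
        System.out.println(model.deadLetters);  // prints [order-1]
    }
}
```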

See Section 11.1.16, “Dead Letter” for an example which shows how dead letter is configured and used with JMS.

Messages can be set with an optional time to live when sending them.

HornetQ will not deliver a message to a consumer after its time to live has been exceeded. If the message hasn't been delivered by the time that time to live is reached the server can discard it.

HornetQ's addresses can be assigned an expiry address so that, when messages are expired, they are removed from the queue and sent to the expiry address. Many different queues can be bound to an expiry address. These expired messages can later be consumed for further inspection.

See Section 11.1.22, “Message Expiration” for an example which shows how message expiry is configured and used with JMS.

HornetQ supports sending and receiving of huge messages, even when the client and server are running with limited memory. The only realistic limit to the size of a message that can be sent or consumed is the amount of disk space you have available. We have tested sending and consuming messages up to 8 GiB in size with a client and server running in just 50MiB of RAM!

To send a large message, the user can set an InputStream on a message body, and when that message is sent, HornetQ will read the InputStream. A FileInputStream could be used for example to send a huge message from a huge file on disk.

As the InputStream is read the data is sent to the server as a stream of fragments. The server persists these fragments to disk as it receives them, and when the time comes to deliver them to a consumer they are read back off the disk, also in fragments, and sent down the wire. When the consumer receives a large message it initially receives just the message with an empty body. It can then set an OutputStream on the message to stream the huge message body to a file on disk or elsewhere. At no time is the entire message body stored fully in memory, either on the client or the server.

Any message larger than a certain size is considered a large message. Large messages will be split up and sent in fragments. This is determined by the parameter min-large-message-size.

The default value is 100KiB.
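The following sketch illustrates the idea of reading a body as a sequence of fragments so that only one fragment is held in memory at a time. It is not HornetQ's wire protocol: the class and method names are hypothetical, and using the min-large-message-size value as the fragment size is an assumption made purely for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Toy model of large message detection and fragmenting (not HornetQ code). */
final class LargeMessageModel {

    /** A body strictly larger than the threshold is treated as a large message. */
    static boolean isLarge(long bodySizeBytes, long minLargeMessageSize) {
        return bodySizeBytes > minLargeMessageSize;
    }

    /** Reads the body one fragment at a time and returns the size of each fragment. */
    static List<Integer> fragmentSizes(InputStream body, int fragmentSize) {
        List<Integer> sizes = new ArrayList<>();
        byte[] fragment = new byte[fragmentSize]; // the only buffer ever held in memory
        try {
            while (true) {
                int filled = 0;
                while (filled < fragmentSize) {
                    int n = body.read(fragment, filled, fragmentSize - filled);
                    if (n < 0) {
                        break; // end of stream
                    }
                    filled += n;
                }
                if (filled == 0) {
                    break; // nothing left to send
                }
                sizes.add(filled); // a real sender would transmit the fragment here
                if (filled < fragmentSize) {
                    break; // a short fragment is the last one
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sizes;
    }

    public static void main(String[] args) {
        int minLargeMessageSize = 100 * 1024;  // the documented default of 100KiB
        byte[] body = new byte[250 * 1024];
        System.out.println(isLarge(body.length, minLargeMessageSize)); // prints true
        System.out.println(fragmentSizes(new ByteArrayInputStream(body), minLargeMessageSize));
        // prints [102400, 102400, 51200]
    }
}
```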

HornetQ supports setting the body of messages using input and output streams (java.io).

These streams are then used directly for sending (input streams) and receiving (output streams) messages.

When receiving messages there are 2 ways to deal with the output stream: you may choose to block while the message body is written to the output stream using the method ClientMessage.saveToOutputStream, or alternatively use the method ClientMessage.setOutputStream, which will asynchronously write the message to the stream. If you choose the latter, the consumer must be kept alive until the message has been fully received.

You can use any kind of stream you like. The most common use case is to send files stored in your disk, but you could also send things like JDBC Blobs, SocketInputStream, things you recovered from HTTPRequests etc. Anything as long as it implements java.io.InputStream for sending messages or java.io.OutputStream for receiving them.

When using JMS, HornetQ maps the streaming methods on the core API (see Table 23.1, “org.hornetq.api.core.client.ClientMessage API”) by setting object properties. You can use the method Message.setObjectProperty to set the input and output streams.

The InputStream can be defined through the JMS Object Property JMS_HQ_InputStream on messages being sent:

BytesMessage message = session.createBytesMessage();

FileInputStream fileInputStream = new FileInputStream(fileInput);

BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);

message.setObjectProperty("JMS_HQ_InputStream", bufferedInput);

someProducer.send(message);

The OutputStream can be set through the JMS Object Property JMS_HQ_SaveStream on messages being received in a blocking way.

BytesMessage messageReceived = (BytesMessage)messageConsumer.receive(120000);
                
File outputFile = new File("huge_message_received.dat");
                
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
                
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
                
// This will block until the entire content is saved on disk
messageReceived.setObjectProperty("JMS_HQ_SaveStream", bufferedOutput);

Setting the OutputStream could also be done in a non blocking way using the property JMS_HQ_OutputStream.

// This won't wait for the stream to finish. You need to keep the consumer active.
messageReceived.setObjectProperty("JMS_HQ_OutputStream", bufferedOutput);

Note

When using JMS, streaming of large messages is only supported on StreamMessage and BytesMessage.

Please see Section 11.1.30, “Large Message” for an example which shows how large messages are configured and used with JMS.

HornetQ transparently supports huge queues containing millions of messages while the server is running with limited memory.

In such a situation it's not possible to store all of the queues in memory at any one time, so HornetQ transparently pages messages into and out of memory as they are needed, thus allowing massive queues with a low memory footprint.

HornetQ will start paging messages to disk, when the size of all messages in memory for an address exceeds a configured maximum size.

By default, HornetQ does not page messages - this must be explicitly configured to activate it.

As soon as messages delivered to an address exceed the configured size, that address alone goes into page mode.

See Section 11.1.42, “Paging” for an example which shows how to use paging with HornetQ.

Queue attributes can be set in one of two ways. Either by configuring them using the configuration file or by using the core API. This chapter will explain how to configure each attribute and what effect the attribute has.

Queues can be predefined via configuration at a core level or at a JMS level. First, let's look at the JMS level.

The following shows a queue predefined in the hornetq-jms.xml configuration file.

<queue name="selectorQueue">
   <entry name="/queue/selectorQueue"/>
   <selector string="color='red'"/>
   <durable>true</durable>
</queue>

The name attribute of queue defines the name of the queue. When we do this at a JMS level we follow a naming convention, so the actual name of the core queue will be jms.queue.selectorQueue.

The entry element configures the name that will be used to bind the queue to JNDI. This is a mandatory element and the queue can contain multiple of these to bind the same queue to different names.

The selector element defines what JMS message selector the predefined queue will have. Only messages that match the selector will be added to the queue. This is an optional element with a default of null when omitted.

The durable element specifies whether the queue will be persisted. This again is optional and defaults to true if omitted.

Secondly a queue can be predefined at a core level in the hornetq-configuration.xml file. The following is an example.

<queues>
   <queue name="jms.queue.selectorQueue">
      <address>jms.queue.selectorQueue</address>
      <filter string="color='red'"/>
      <durable>true</durable>
   </queue>
</queues>

This is very similar to the JMS configuration, with 3 real differences which are.

There are some attributes that are defined against an address wildcard rather than a specific queue. Here is an example of an address-setting entry that would be found in the hornetq-configuration.xml file.

<address-settings>
   <address-setting match="jms.queue.exampleQueue">
      <dead-letter-address>jms.queue.deadLetterQueue</dead-letter-address>
      <max-delivery-attempts>3</max-delivery-attempts>
      <redelivery-delay>5000</redelivery-delay>
      <expiry-address>jms.queue.expiryQueue</expiry-address>
      <last-value-queue>true</last-value-queue>
      <max-size-bytes>100000</max-size-bytes>
      <page-size-bytes>20000</page-size-bytes>
      <redistribution-delay>0</redistribution-delay>
      <send-to-dla-on-no-route>true</send-to-dla-on-no-route>
      <address-full-policy>PAGE</address-full-policy>
   </address-setting>
</address-settings>

The idea with address settings is that you can provide a block of settings which will be applied against any addresses that match the string in the match attribute. In the above example the settings would only be applied to any addresses which exactly match the address jms.queue.exampleQueue, but you can also use wildcards to apply sets of configuration against many addresses. The wildcard syntax used is described here.

For example, if you used the match string jms.queue.# the settings would be applied to all addresses which start with jms.queue. which would be all JMS queues.
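To show how such a match string could be evaluated, here is a minimal matcher sketch. It assumes the word-based wildcard syntax in which words are separated by '.', '#' stands for any sequence of zero or more words and '*' stands for exactly one word. The class and method names are hypothetical and this is not the server's own matching code.

```java
/** Toy matcher for address-setting match strings (not HornetQ code). */
final class AddressMatchModel {

    static boolean matches(String pattern, String address) {
        return match(pattern.split("\\."), 0, address.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] a, int j) {
        if (i == p.length) {
            return j == a.length; // the pattern is used up, so the address must be too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int k = j; k <= a.length; k++) {
                if (match(p, i + 1, a, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == a.length) {
            return false; // pattern words remain but the address has ended
        }
        if (p[i].equals("*") || p[i].equals(a[j])) {
            return match(p, i + 1, a, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("jms.queue.#", "jms.queue.exampleQueue"));            // prints true
        System.out.println(matches("jms.queue.#", "jms.topic.news"));                    // prints false
        System.out.println(matches("jms.queue.exampleQueue", "jms.queue.exampleQueue")); // prints true
    }
}
```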

The meaning of the specific settings is explained fully throughout the user manual; however, here is a brief description with a link to the appropriate chapter if available.

max-delivery-attempts defines how many times a cancelled message can be redelivered before sending to the dead-letter-address. A full explanation can be found here.

redelivery-delay defines how long to wait before attempting redelivery of a cancelled message. See here.

expiry-address defines where to send a message that has expired. See here.

expiry-delay defines the expiration time that will be used for messages which are using the default expiration time (i.e. 0). For example, if expiry-delay is set to "10" and a message which is using the default expiration time (i.e. 0) arrives then its expiration time of "0" will be changed to "10." However, if a message which is using an expiration time of "20" arrives then its expiration time will remain unchanged. Setting expiry-delay to "-1" will disable this feature. The default is "-1".

last-value-queue defines whether a queue only uses last values or not. See here.

max-size-bytes and page-size-bytes are used to set paging on an address. This is explained here.

redistribution-delay defines how long to wait when the last consumer is closed on a queue before redistributing any messages. See here.

send-to-dla-on-no-route. If a message is sent to an address, but the server does not route it to any queues, for example, there might be no queues bound to that address, or none of the queues have filters that match, then normally that message would be discarded. However if this parameter is set to true for that address, if the message is not routed to any queues it will instead be sent to the dead letter address (DLA) for that address, if it exists.

address-full-policy. This attribute can have one of the following values: PAGE, DROP, FAIL or BLOCK and determines what happens when an address where max-size-bytes is specified becomes full. The default value is PAGE. If the value is PAGE then further messages will be paged to disk. If the value is DROP then further messages will be silently dropped. If the value is FAIL then further messages will be dropped and an exception will be thrown on the client-side. If the value is BLOCK then client message producers will block when they try to send further messages. For more information see Chapter 19, Flow Control and Chapter 24, Paging.
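Of the settings above, the expiry-delay rule is the easiest to misread, so here it is restated as a small function. This sketch follows the wording of the description literally (a default expiration of 0 is replaced by the expiry-delay value, anything else is left alone). The class and method names are hypothetical.

```java
/** Toy restatement of the expiry-delay rule (not HornetQ code). */
final class ExpiryDelayModel {

    /** Returns the expiration a message ends up with under the given expiry-delay. */
    static long effectiveExpiration(long messageExpiration, long expiryDelay) {
        if (expiryDelay == -1) {
            return messageExpiration; // -1 disables the feature
        }
        if (messageExpiration != 0) {
            return messageExpiration; // an explicit expiration is never changed
        }
        return expiryDelay;           // the default (0) is replaced by the expiry-delay
    }

    public static void main(String[] args) {
        System.out.println(effectiveExpiration(0, 10));   // prints 10
        System.out.println(effectiveExpiration(20, 10));  // prints 20
        System.out.println(effectiveExpiration(0, -1));   // prints 0
    }
}
```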

Last-Value queues are special queues which discard any messages when a newer message with the same value for a well-defined Last-Value property is put in the queue. In other words, a Last-Value queue only retains the last value.

A typical example of a Last-Value queue is stock prices, where you are only interested in the latest value for a particular stock.
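A Last-Value queue can be pictured as a map keyed by the Last-Value property, as in the sketch below. The class and its lastValueKey parameter are hypothetical, and the model ignores delivery, persistence and where exactly the replacement sits in the queue order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a Last-Value queue (not HornetQ code). */
final class LastValueQueueModel {
    // one retained message body per Last-Value key
    private final Map<String, String> latest = new LinkedHashMap<>();

    /** A newer message with the same key replaces the older one. */
    void put(String lastValueKey, String body) {
        latest.put(lastValueKey, body);
    }

    /** The message bodies currently retained by the queue. */
    List<String> contents() {
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        LastValueQueueModel prices = new LastValueQueueModel();
        prices.put("ACME", "ACME=10");
        prices.put("INIT", "INIT=5");
        prices.put("ACME", "ACME=12");         // discards ACME=10
        System.out.println(prices.contents()); // prints [ACME=12, INIT=5]
    }
}
```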

See Section 11.1.31, “Last-Value Queue” for an example which shows how last value queues are configured and used with JMS.

Message groups are sets of messages that have the following characteristics:

  • Messages in a message group share the same group id, i.e. they have same group identifier property (JMSXGroupID for JMS, _HQ_GROUP_ID for HornetQ Core API).

  • Messages in a message group are always consumed by the same consumer, even if there are many consumers on a queue. Message groups pin all messages with the same group id to the same consumer. If that consumer closes, another consumer is chosen and will receive all messages with the same group id.

Message groups are useful when you want all messages for a certain value of the property to be processed serially by the same consumer.

An example might be orders for a certain stock. You may want orders for any particular stock to be processed serially by the same consumer. To do this you can create a pool of consumers (perhaps one for each stock, but fewer will work too), then set the stock name as the value of the _HQ_GROUP_ID property.

This will ensure that all messages for a particular stock will always be processed by the same consumer.
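The pinning behaviour can be sketched with a small routing model. The class, its methods and the consumer names are hypothetical, and choosing the next consumer round robin is an assumption made for the example rather than a statement about how the server picks one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of pinning group ids to consumers on one queue (not HornetQ code). */
final class MessageGroupModel {
    private final List<String> consumers;
    private final Map<String, String> pinned = new HashMap<>(); // group id -> consumer
    private int next = 0;                                       // round robin cursor

    MessageGroupModel(List<String> consumers) {
        this.consumers = new ArrayList<>(consumers);
    }

    /** Returns the consumer for a group id, choosing one on first use. */
    String route(String groupId) {
        return pinned.computeIfAbsent(groupId, id -> consumers.get(next++ % consumers.size()));
    }

    /** When a consumer closes, its groups are released and re-pinned on the next message. */
    void consumerClosed(String consumer) {
        consumers.remove(consumer);
        pinned.values().removeIf(consumer::equals);
    }

    public static void main(String[] args) {
        MessageGroupModel queue = new MessageGroupModel(List.of("consumer-1", "consumer-2"));
        System.out.println(queue.route("ACME"));  // prints consumer-1
        System.out.println(queue.route("INIT"));  // prints consumer-2
        System.out.println(queue.route("ACME"));  // prints consumer-1
        queue.consumerClosed("consumer-1");
        System.out.println(queue.route("ACME"));  // prints consumer-2
    }
}
```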

See Section 11.1.35, “Message Group” for an example which shows how message groups are configured and used with JMS.

See Section 11.1.36, “Message Group” for an example which shows how message groups are configured via a connection factory.

Using message groups in a cluster is a bit more complex. This is because messages with a particular group id can arrive on any node so each node needs to know about which group id's are bound to which consumer on which node. The consumer handling messages for a particular group id may be on a different node of the cluster, so each node needs to know this information so it can route the message correctly to the node which has that consumer.

To solve this there is the notion of a grouping handler. Each node will have its own grouping handler and when a message is sent with a group id assigned, the handlers will decide between them which route the message should take.

There are 2 types of handlers: Local and Remote. Each cluster should choose 1 node to have a local grouping handler, and all the other nodes should have remote handlers. It is the local handler that actually makes the decision as to what route should be used; the remote handlers converse with it. Here is a sample config for both types of handler. This should be configured in the hornetq-configuration.xml file.

<grouping-handler name="my-grouping-handler">
   <type>LOCAL</type>
   <address>jms</address>
   <timeout>5000</timeout>
</grouping-handler>

<grouping-handler name="my-grouping-handler">
   <type>REMOTE</type>
   <address>jms</address>
   <timeout>5000</timeout>
</grouping-handler>

The address attribute refers to a cluster connection and the address it uses; refer to the clustering section for how to configure clusters. The timeout attribute refers to how long to wait for a decision to be made. If this timeout is reached, an exception is thrown during the send, which ensures that strict ordering is kept.

The decision as to where a message should be routed is initially proposed by the node that receives the message. The node picks a suitable route as per the normal clustered routing conditions, i.e. round robin over the available queues, use a local queue first, and choose a queue that has a consumer. If the proposal is accepted by the grouping handlers, the node routes messages to this queue from that point on. If it is rejected, an alternative route is offered and the node routes to that queue indefinitely instead. All other nodes also route to the queue chosen at proposal time. Once the message arrives at the queue, normal single-server message group semantics take over and the message is pinned to a consumer on that queue.
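The propose-and-accept exchange can be pictured with the following toy model. It is a minimal sketch, not HornetQ's implementation: the class GroupingHandlerSketch, the propose method and the queue names are hypothetical, and the network round trip between remote and local handlers is reduced to a method call.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of the propose/accept exchange; all names are illustrative. */
class GroupingHandlerSketch {
    // State held by the single LOCAL handler: group id -> queue chosen for that group.
    private final Map<String, String> decisions = new ConcurrentHashMap<>();

    /**
     * A node proposes a queue for a group id. The first proposal is accepted.
     * A later, different proposal is rejected and the earlier decision is
     * returned as the alternative route.
     */
    String propose(String groupId, String proposedQueue) {
        String existing = decisions.putIfAbsent(groupId, proposedQueue);
        return existing == null ? proposedQueue : existing;
    }

    public static void main(String[] args) {
        GroupingHandlerSketch local = new GroupingHandlerSketch();
        // Node 1 receives the first ACME message and proposes its own queue.
        System.out.println(local.propose("ACME", "queue-on-node-1"));
        // Node 2 later proposes a different queue and is told to use node 1's queue.
        System.out.println(local.propose("ACME", "queue-on-node-2"));
    }
}
```

Because only one handler records decisions, every node ends up routing a given group id to the same queue, which is also why that handler is the critical component discussed next.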

You may have noticed that the single local handler is a single point of failure. If this node crashes then no decisions can be made. Any messages sent will not be delivered and an exception will be thrown. To avoid this, Local Handlers can be replicated on another backup node. Simply create your backup node and configure it with the same Local handler.

See Section 11.1.8, “Clustered Grouping” for an example of how to configure message groups with a HornetQ cluster.

JMS specifies 3 acknowledgement modes:

  • AUTO_ACKNOWLEDGE

  • CLIENT_ACKNOWLEDGE

  • DUPS_OK_ACKNOWLEDGE

HornetQ supports two additional modes: PRE_ACKNOWLEDGE and INDIVIDUAL_ACKNOWLEDGE.

In some cases you can afford to lose messages in event of failure, so it would make sense to acknowledge the message on the server before delivering it to the client.

This extra mode is supported by HornetQ and is called pre-acknowledge mode.

The disadvantage of acknowledging on the server before delivery is that the message will be lost if the system crashes after acknowledging the message on the server but before it is delivered to the client. In that case, the message will not be recovered when the system restarts.

Depending on your messaging case, pre-acknowledgement mode can avoid extra network traffic and CPU at the cost of coping with message loss.

An example of a use case for pre-acknowledgement is for stock price update messages. With these messages it might be reasonable to lose a message in event of crash, since the next price update message will arrive soon, overriding the previous price.

Note

Please note, that if you use pre-acknowledge mode, then you will lose transactional semantics for messages being consumed, since clearly they are being acknowledged first on the server, not when you commit the transaction. This may be stating the obvious but we like to be clear on these things to avoid confusion!

See Section 11.1.43, “Pre-Acknowledge” for an example which shows how to use pre-acknowledgement mode with JMS.

HornetQ has an extensive management API that allows a user to modify a server configuration, create new resources (e.g. JMS queues and topics), inspect these resources (e.g. how many messages are currently held in a queue) and interact with them (e.g. to remove messages from a queue). Together, these operations allow a client to manage HornetQ. The API also allows clients to subscribe to management notifications.

There are 3 ways to manage HornetQ:

  • Using JMX -- JMX is the standard way to manage Java applications

  • Using the core API -- management operations are sent to HornetQ server using core messages

  • Using the JMS API -- management operations are sent to HornetQ server using JMS messages

Although there are 3 different ways to manage HornetQ, each API supports the same functionality. If it is possible to manage a resource using JMX it is also possible to achieve the same result using Core messages or JMS messages.

Which way suits you best depends on your requirements, your application settings and your environment.

Regardless of the way you invoke management operations, the management API is the same.

For each managed resource, there exists a Java interface describing what can be invoked for this type of resource.

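HornetQ exposes its managed resources in 2 packages: org.hornetq.api.core.management for core resources and org.hornetq.api.jms.management for JMS resources.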

The way to invoke a management operation depends on whether JMX, core messages, or JMS messages are used.

HornetQ defines a core management API to manage core resources. For full details of the API please consult the javadoc. In summary:

The bulk of the core management API deals with core queues. The QueueControl class defines the Core queue management operations (with the ObjectName org.hornetq:module=Core,type=Queue,address="<the bound address>",name="<the queue name>" or the resource name core.queue.<the queue name>).

Most of the management operations on queues take either a single message ID (e.g. to remove a single message) or a filter (e.g. to expire all messages with a given property.)

HornetQ allows you to start and stop its remote resources (acceptors, diverts, bridges, etc.) so that a server can be taken offline for a given period of time without stopping it completely (e.g. if other management operations must be performed such as resolving heuristic transactions). These resources are:

HornetQ defines a JMS Management API to manage JMS administered objects (i.e. JMS queues, topics and connection factories).

JMS Resources (connection factories and destinations) can be created using the JMSServerControl class (with the ObjectName org.hornetq:module=JMS,type=Server or the resource name jms.server).

JMS queues can be managed using the JMSQueueControl class (with the ObjectName org.hornetq:module=JMS,type=Queue,name="<the queue name>" or the resource name jms.queue.<the queue name>).

The management operations on a JMS queue are very similar to the operations on a core queue.

HornetQ can be managed using JMX.

The management API is exposed by HornetQ using MBeans interfaces. HornetQ registers its resources with the domain org.hornetq.

For example, the ObjectName to manage a JMS Queue exampleQueue is:

org.hornetq:module=JMS,type=Queue,name="exampleQueue"

and the MBean is:

org.hornetq.api.jms.management.JMSQueueControl

The MBeans' ObjectNames are built using the helper class org.hornetq.api.core.management.ObjectNameBuilder. You can also use jconsole to find the ObjectName of the MBeans you want to manage.

Managing HornetQ using JMX is identical to managing any Java application using JMX. It can be done by reflection or by creating proxies of the MBeans.
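The two access styles can be sketched with the standard JMX API alone. In the sketch below, DemoQueueControlMBean is a hypothetical stand-in for a management interface such as JMSQueueControl, and the domain org.hornetq.sketch is made up so that nothing clashes with a real server; only the javax.management calls are real.

```java
import java.lang.management.ManagementFactory;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.ObjectName;

class JmxAccessSketch {
    /** Hypothetical stand-in for a management interface such as JMSQueueControl. */
    public interface DemoQueueControlMBean {
        long getMessageCount();
    }

    /** Trivial implementation that reports a fixed message count. */
    public static class DemoQueueControl implements DemoQueueControlMBean {
        @Override
        public long getMessageCount() {
            return 3;
        }
    }

    /** Reads the message count once by reflection and once through a typed proxy. */
    static long[] readMessageCount() {
        try {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            ObjectName on = new ObjectName(
                "org.hornetq.sketch:module=JMS,type=Queue,name=\"exampleQueue\"");
            if (!mbs.isRegistered(on)) {
                mbs.registerMBean(new DemoQueueControl(), on);
            }
            // Reflection style: the attribute is addressed by name and returned as an Object.
            long viaReflection = (Long) mbs.getAttribute(on, "MessageCount");
            // Proxy style: a typed proxy forwards each call to the registered MBean.
            DemoQueueControlMBean proxy =
                JMX.newMBeanProxy(mbs, on, DemoQueueControlMBean.class);
            long viaProxy = proxy.getMessageCount();
            mbs.unregisterMBean(on);
            return new long[] { viaReflection, viaProxy };
        } catch (Exception e) {
            throw new IllegalStateException("JMX access failed", e);
        }
    }

    public static void main(String[] args) {
        long[] counts = readMessageCount();
        System.out.println("reflection: " + counts[0] + ", proxy: " + counts[1]);
    }
}
```

The proxy style gives compile-time checking of operation names, while the reflection style works without the management interfaces on the client classpath.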

By default, JMX is enabled to manage HornetQ. It can be disabled by setting jmx-management-enabled to false in hornetq-configuration.xml:

<!-- false to disable JMX management for HornetQ -->
<jmx-management-enabled>false</jmx-management-enabled>

If JMX is enabled, HornetQ can be managed locally using jconsole.

Note

Remote connections to JMX are not enabled by default for security reasons. Please refer to the Java Management guide to configure the server for remote management (system properties must be set in the run.sh or run.bat scripts).

By default, HornetQ server uses the JMX domain "org.hornetq". To manage several HornetQ servers from the same MBeanServer, the JMX domain can be configured for each individual HornetQ server by setting jmx-domain in hornetq-configuration.xml:

<!-- use a specific JMX domain for HornetQ MBeans -->
<jmx-domain>my.org.hornetq</jmx-domain>

When HornetQ is run standalone, it uses the Java Virtual Machine's Platform MBeanServer to register its MBeans. This is configured in the JBoss Microcontainer Beans file (see Section 6.7, “JBoss Microcontainer Beans File”):

<!-- MBeanServer -->
<bean name="MBeanServer" class="javax.management.MBeanServer">
   <constructor factoryClass="java.lang.management.ManagementFactory"
                   factoryMethod="getPlatformMBeanServer" />
</bean>

When it is integrated in JBoss AS 5+, it uses the Application Server's own MBean Server so that it can be managed using AS 5's jmx-console:

<!-- MBeanServer -->
<bean name="MBeanServer" class="javax.management.MBeanServer">
   <constructor factoryClass="org.jboss.mx.util.MBeanServerLocator"
                   factoryMethod="locateJBoss" />
</bean>

See Section 11.1.29, “JMX Management” for an example which shows how to use a remote connection to JMX and MBean proxies to manage HornetQ.

The core management API in HornetQ is called by sending Core messages to a special address, the management address.

Management messages are regular Core messages with well-known properties that the server needs to understand to interact with the management API:

When such a management message is sent to the management address, HornetQ server will handle it, extract the information, invoke the operation on the managed resources and send a management reply to the management message's reply-to address (specified by ClientMessageImpl.REPLYTO_HEADER_NAME).

A ClientConsumer can be used to consume the management reply and retrieve the result of the operation (if any) stored in the reply's body. For portability, results are returned as a JSON String rather than Java Serialization (the org.hornetq.api.core.management.ManagementHelper can be used to convert the JSON string to Java objects).

These steps can be simplified to make it easier to invoke management operations using Core messages:

  1. Create a ClientRequestor to send messages to the management address and receive replies

  2. Create a ClientMessage

  3. Use the helper class org.hornetq.api.core.management.ManagementHelper to fill the message with the management properties

  4. Send the message using the ClientRequestor

  5. Use the helper class org.hornetq.api.core.management.ManagementHelper to retrieve the operation result from the management reply

For example, to find out the number of messages in the core queue exampleQueue:

ClientSession session = ...
ClientRequestor requestor = new ClientRequestor(session, "jms.queue.hornetq.management");
ClientMessage message = session.createMessage(false);
ManagementHelper.putAttribute(message, "core.queue.exampleQueue", "messageCount");
session.start();
ClientMessage reply = requestor.request(message);
int count = (Integer) ManagementHelper.getResult(reply);
System.out.println("There are " + count + " messages in exampleQueue");

Management operation name and parameters must conform to the Java interfaces defined in the management packages.

Names of the resources are built using the helper class org.hornetq.api.core.management.ResourceNames and are straightforward (core.queue.exampleQueue for the Core Queue exampleQueue, jms.topic.exampleTopic for the JMS Topic exampleTopic, etc.).
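The naming patterns quoted in this chapter can be reproduced with plain string concatenation and the standard javax.management.ObjectName class. The helper below is a hypothetical sketch that only mirrors those patterns; it is not the ResourceNames or ObjectNameBuilder class, and real code should prefer those helpers.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical helper mirroring the naming patterns quoted in this chapter. */
class ManagementNamesSketch {
    /** Resource name of a core queue, e.g. core.queue.exampleQueue. */
    static String coreQueueResource(String queue) {
        return "core.queue." + queue;
    }

    /** Resource name of a JMS queue, e.g. jms.queue.exampleQueue. */
    static String jmsQueueResource(String queue) {
        return "jms.queue." + queue;
    }

    /** Resource name of a JMS topic, e.g. jms.topic.exampleTopic. */
    static String jmsTopicResource(String topic) {
        return "jms.topic." + topic;
    }

    /** ObjectName of a JMS queue in the given JMX domain, with the queue name quoted. */
    static ObjectName jmsQueueObjectName(String domain, String queue) {
        try {
            return new ObjectName(
                domain + ":module=JMS,type=Queue,name=" + ObjectName.quote(queue));
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("invalid domain or queue name", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(coreQueueResource("exampleQueue"));
        System.out.println(jmsTopicResource("exampleTopic"));
        System.out.println(jmsQueueObjectName("org.hornetq", "exampleQueue"));
    }
}
```

Resource names are used with management messages, whereas ObjectNames are used with JMX; both identify the same underlying resource.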

Using JMS messages to manage HornetQ is very similar to using the core API.

An important difference is that JMS requires a JMS queue to send the messages to (instead of an address for the core API).

The management queue is a special queue and needs to be instantiated directly by the client:

Queue managementQueue = HornetQJMSClient.createQueue("hornetq.management");

All the other steps are the same as for the Core API but they use the JMS API instead:

For example, to find out the number of messages in the JMS queue exampleQueue:

Queue managementQueue = HornetQJMSClient.createQueue("hornetq.management");

QueueConnection connection = ...
QueueSession session = ...
QueueRequestor requestor = new QueueRequestor(session, managementQueue);
connection.start();
Message message = session.createMessage();
JMSManagementHelper.putAttribute(message, "jms.queue.exampleQueue", "messageCount");
Message reply = requestor.request(message);
int count = (Integer)JMSManagementHelper.getResult(reply);
System.out.println("There are " + count + " messages in exampleQueue");

See Section 11.1.32, “Management” for an example which shows how to use JMS messages to manage HornetQ server.

HornetQ emits notifications to inform listeners of potentially interesting events (creation of new resources, security violation, etc.).

These notifications can be received in 3 different ways:

If JMX is enabled (see Section 30.2.1, “Configuring JMX”), JMX notifications can be received by subscribing to 2 MBeans:

  • org.hornetq:module=Core,type=Server for notifications on Core resources

  • org.hornetq:module=JMS,type=Server for notifications on JMS resources
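Subscribing to an MBean for notifications uses the standard JMX listener API. The sketch below is self-contained and therefore registers its own emitter: DemoServerMBean, the domain org.hornetq.sketch and the notification type sketch.queue.created are all invented for illustration and say nothing about the notification types HornetQ actually emits.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import javax.management.MBeanServer;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationListener;
import javax.management.ObjectName;

class JmxNotificationSketch {
    /** Hypothetical stand-in for a server MBean that emits notifications. */
    public interface DemoServerMBean {
        void createQueue(String name);
    }

    public static class DemoServer extends NotificationBroadcasterSupport
            implements DemoServerMBean {
        private long sequence = 0;

        @Override
        public void createQueue(String name) {
            // The notification type is made up for this sketch.
            sendNotification(new Notification("sketch.queue.created", this, ++sequence, name));
        }
    }

    /** Registers the emitter, subscribes a listener, triggers one event. */
    static List<String> subscribeAndTrigger() {
        List<String> received = new ArrayList<>();
        try {
            MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            ObjectName on = new ObjectName("org.hornetq.sketch:module=Core,type=Server");
            if (mbs.isRegistered(on)) {
                mbs.unregisterMBean(on);
            }
            DemoServer server = new DemoServer();
            mbs.registerMBean(server, on);

            // The listener records the type and message of every notification it sees.
            NotificationListener listener = (notification, handback) ->
                received.add(notification.getType() + ":" + notification.getMessage());
            mbs.addNotificationListener(on, listener, null, null);

            server.createQueue("exampleQueue");

            mbs.removeNotificationListener(on, listener);
            mbs.unregisterMBean(on);
        } catch (Exception e) {
            throw new IllegalStateException("JMX notification sketch failed", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(subscribeAndTrigger());
    }
}
```

Against a running server the same addNotificationListener call would be made on an MBeanServerConnection, using one of the two ObjectNames listed above.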

See Section 11.1.33, “Management Notification” for an example which shows how to use a JMS MessageListener to receive management notifications from HornetQ server.

Message counters can be used to obtain information on queues over time as HornetQ keeps a history on queue metrics.

They can be used to show trends on queues. For example, using the management API, it would be possible to query the number of messages in a queue at regular intervals. However, this would not be enough to know if the queue is used: the number of messages can remain constant because nobody is sending or receiving messages from the queue, or because as many messages are sent to the queue as are consumed from it. The number of messages in the queue remains the same in both cases but its use is widely different.
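The difference between the two cases becomes visible once each sample also records how many messages have been added in total. The following toy model is a sketch only; the class CounterSampleSketch and its field names are assumptions for illustration and do not reproduce HornetQ's message counter classes.

```java
/** Toy model of two counter samples; names are illustrative, not HornetQ's. */
class CounterSampleSketch {
    final long added; // total number of messages ever added to the queue
    final long depth; // number of messages currently held in the queue

    CounterSampleSketch(long added, long depth) {
        this.added = added;
        this.depth = depth;
    }

    /** Messages added to the queue between two samples. */
    static long countDelta(CounterSampleSketch previous, CounterSampleSketch current) {
        return current.added - previous.added;
    }

    /** Change in queue depth between two samples. */
    static long depthDelta(CounterSampleSketch previous, CounterSampleSketch current) {
        return current.depth - previous.depth;
    }

    public static void main(String[] args) {
        CounterSampleSketch before = new CounterSampleSketch(100, 5);
        CounterSampleSketch idle = new CounterSampleSketch(100, 5); // nothing sent or consumed
        CounterSampleSketch busy = new CounterSampleSketch(160, 5); // 60 sent and 60 consumed

        // Both queues show an unchanged depth, but only the busy one shows added messages.
        System.out.println("idle: depthDelta=" + depthDelta(before, idle)
            + " countDelta=" + countDelta(before, idle));
        System.out.println("busy: depthDelta=" + depthDelta(before, busy)
            + " countDelta=" + countDelta(before, busy));
    }
}
```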

Message counters give additional information about the queues:

By default, message counters are disabled as they might have a small negative effect on memory.

To enable message counters, set message-counter-enabled to true in hornetq-configuration.xml:

<message-counter-enabled>true</message-counter-enabled>

Message counters keep a history of the queue metrics (10 days by default) and sample all the queues at regular intervals (10 seconds by default). If message counters are enabled, these values should be configured to suit your messaging use case in hornetq-configuration.xml:

<!-- keep history for a week -->
<message-counter-max-day-history>7</message-counter-max-day-history>
<!-- sample the queues every minute (60000ms) -->
<message-counter-sample-period>60000</message-counter-sample-period>

Message counters can be retrieved using the Management API. For example, to retrieve message counters on a JMS Queue using JMX:

// retrieve a connection to HornetQ's MBeanServer
MBeanServerConnection mbsc = ...
// ObjectName of the JMS queue to inspect
ObjectName on = new ObjectName("org.hornetq:module=JMS,type=Queue,name=\"exampleQueue\"");
JMSQueueControl queueControl = (JMSQueueControl)MBeanServerInvocationHandler.newProxyInstance(mbsc,
   on,
   JMSQueueControl.class,
   false);
// message counters are retrieved as a JSON String
String counters = queueControl.listMessageCounter();
// use the MessageCounterInfo helper class to manipulate message counters more easily
MessageCounterInfo messageCounter = MessageCounterInfo.fromJSON(counters);
System.out.format("%s message(s) in the queue (since last sample: %s)\n",
   messageCounter.getDepth(),
   messageCounter.getDepthDelta());

See Section 11.1.34, “Message Counter” for an example which shows how to use message counters to retrieve information on a JMS Queue.

It's possible to create and configure HornetQ resources via the admin console within the JBoss Application Server.

The Admin Console will allow you to create destinations (JMS Topics and Queues) and JMS Connection Factories.

Once logged in to the admin console you will see a JMS Manager item in the left hand tree. All HornetQ resources will be configured via this. It has child items for JMS Queues, Topics and Connection Factories; clicking on each node will reveal which resources are currently available. The following sections explain how to create and configure each resource in turn.

To create a new JMS Queue click on the JMS Queues item to reveal the available queues. On the right hand panel you will see an add a new resource button. Click on this, then choose the default (JMS Queue) template and click continue. The important things to fill in here are the name of the queue and the JNDI name of the queue. The JNDI name is what you will use to look up the queue in JNDI from your client. For most queues this will be the only info you will need to provide, as sensible defaults are provided for the others. You will also see a security roles section near the bottom. If you do not provide any roles for this queue then the server's default security configuration will be used. After you have created the queue these will be shown in the configuration. All configuration values, except the name and JNDI name, can be changed via the configuration tab after clicking on the queue in the admin console. The following section explains these in more detail.

After highlighting the configuration you will see the following screen

The name and JNDI name can't be changed. If you want to change these, recreate the queue with the appropriate settings. The rest of the configuration options, apart from security roles, relate to address settings for a particular address. The default address settings are picked up from the server's configuration. If you change any of these settings or create a queue via the console, a new Address Settings entry will be added. For a full explanation of Address Settings see Section 25.3, “Configuring Queues Via Address Settings”.

To delete a queue simply click on the delete button beside the queue name in the main JMS Queues screen. This will also delete any address settings or security settings previously created for the queue's address.

The last part of the configuration options is the security roles. If none are provided on creation then the server's default security settings will be shown. If these are changed or updated then new security settings are created for the address of this queue. For more information on security settings see Chapter 31, Security.

It is also possible via the metrics tab to view statistics for this queue. This will show statistics such as message count, consumer count etc.

Operations can be performed on a queue via the control tab. This will allow you to start and stop the queue, list, move, expire and delete messages from the queue, and perform other useful operations. To invoke an operation click on the button for the operation you want. This will take you to a screen where the parameters for the operation can be set. Once they are set, clicking the ok button will invoke the operation; results appear at the bottom of the screen.

This chapter describes how security works with HornetQ and how you can configure it. To disable security completely simply set the security-enabled property to false in the hornetq-configuration.xml file.

For performance reasons security is cached and the cache is invalidated periodically. To change this period set the property security-invalidation-interval, which is in milliseconds. The default is 10000 ms.

HornetQ contains a flexible role-based security model for applying security to queues, based on their addresses.

As explained in Chapter 8, Using Core, HornetQ core consists mainly of sets of queues bound to addresses. A message is sent to an address and the server looks up the set of queues that are bound to that address. The server then routes the message to that set of queues.

HornetQ allows sets of permissions to be defined against the queues based on their address. An exact match on the address can be used or a wildcard match can be used using the wildcard characters '#' and '*'.

Seven different permissions can be given to the set of queues which match the address. Those permissions are:

  • createDurableQueue. This permission allows the user to create a durable queue under matching addresses.

  • deleteDurableQueue. This permission allows the user to delete a durable queue under matching addresses.

  • createNonDurableQueue. This permission allows the user to create a non-durable queue under matching addresses.

  • deleteNonDurableQueue. This permission allows the user to delete a non-durable queue under matching addresses.

  • send. This permission allows the user to send a message to matching addresses.

  • consume. This permission allows the user to consume a message from a queue bound to matching addresses.

  • manage. This permission allows the user to invoke management operations by sending management messages to the management address.

For each permission, a list of roles who are granted that permission is specified. If the user has any of those roles, he/she will be granted that permission for that set of addresses.

Let's take a simple example. Here's a security block from the hornetq-configuration.xml or hornetq-queues.xml file:

<security-setting match="globalqueues.europe.#">
   <permission type="createDurableQueue" roles="admin"/>
   <permission type="deleteDurableQueue" roles="admin"/>
   <permission type="createNonDurableQueue" roles="admin, guest, europe-users"/>
   <permission type="deleteNonDurableQueue" roles="admin, guest, europe-users"/>
   <permission type="send" roles="admin, europe-users"/>
   <permission type="consume" roles="admin, europe-users"/>
</security-setting>

The '#' character signifies "any sequence of words". Words are delimited by the '.' character. For a full description of the wildcard syntax please see Chapter 13, Understanding the HornetQ Wildcard Syntax. The above security block applies to any address that starts with the string "globalqueues.europe.":

Only users who have the admin role can create or delete durable queues bound to an address that starts with the string "globalqueues.europe."

Any users with the roles admin, guest, or europe-users can create or delete non-durable queues bound to an address that starts with the string "globalqueues.europe."

Any users with the roles admin or europe-users can send messages to these addresses or consume messages from queues bound to an address that starts with the string "globalqueues.europe."

The mapping between a user and what roles they have is handled by the security manager. HornetQ ships with a user manager that reads user credentials from a file on disk, and can also plug into JAAS or JBoss Application Server security.

For more information on configuring the security manager, please see Section 31.4, “Changing the security manager”.

There can be zero or more security-setting elements in each xml file. Where more than one match applies to a set of addresses the more specific match takes precedence.

Let's look at an example of that. Here's another security-setting block:

<security-setting match="globalqueues.europe.orders.#">
   <permission type="send" roles="europe-users"/>
   <permission type="consume" roles="europe-users"/>
</security-setting>

In this security-setting block the match 'globalqueues.europe.orders.#' is more specific than the previous match 'globalqueues.europe.#'. So any addresses which match 'globalqueues.europe.orders.#' will take their security settings only from the latter security-setting block.

Note that settings are not inherited from the former block. All the settings will be taken from the more specific matching block, so for the address 'globalqueues.europe.orders.plastics' the only permissions that exist are send and consume for the role europe-users. The permissions createDurableQueue, deleteDurableQueue, createNonDurableQueue, deleteNonDurableQueue are not inherited from the other security-setting block.

Not inheriting permissions allows you to effectively deny permissions in more specific security-setting blocks by simply not specifying them. Otherwise it would not be possible to deny permissions in sub-groups of addresses.
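The "most specific match wins, nothing is inherited" rule can be made concrete with a small model. This is a simplified sketch and not HornetQ's matcher: the class SecurityMatchSketch is hypothetical, '#' is treated as zero or more words and '*' as exactly one word, and specificity is approximated by counting the literal (non-wildcard) words in a match.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Simplified model of security-setting lookup; not HornetQ's implementation. */
class SecurityMatchSketch {
    // match pattern -> (permission type -> roles)
    private final Map<String, Map<String, Set<String>>> settings = new LinkedHashMap<>();

    void grant(String match, String permission, String... roles) {
        settings.computeIfAbsent(match, m -> new LinkedHashMap<>())
                .put(permission, new HashSet<>(Arrays.asList(roles)));
    }

    static boolean matches(String match, String address) {
        return matches(match.split("\\."), 0, address.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] a, int j) {
        if (i == p.length) {
            return j == a.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb any number of the remaining words, including none.
            for (int k = j; k <= a.length; k++) {
                if (matches(p, i + 1, a, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == a.length) {
            return false;
        }
        return (p[i].equals("*") || p[i].equals(a[j])) && matches(p, i + 1, a, j + 1);
    }

    /** Specificity heuristic of this sketch: number of non-wildcard words. */
    private static int literalWords(String match) {
        int count = 0;
        for (String word : match.split("\\.")) {
            if (!word.equals("#") && !word.equals("*")) {
                count++;
            }
        }
        return count;
    }

    /** Roles for a permission, taken only from the most specific matching block. */
    Set<String> rolesFor(String address, String permission) {
        String best = null;
        for (String match : settings.keySet()) {
            if (matches(match, address)
                    && (best == null || literalWords(match) > literalWords(best))) {
                best = match;
            }
        }
        if (best == null) {
            return Collections.emptySet();
        }
        // A permission missing from the chosen block is denied, not inherited.
        return settings.get(best).getOrDefault(permission, Collections.emptySet());
    }

    public static void main(String[] args) {
        SecurityMatchSketch security = new SecurityMatchSketch();
        security.grant("globalqueues.europe.#", "createDurableQueue", "admin");
        security.grant("globalqueues.europe.#", "send", "admin", "europe-users");
        security.grant("globalqueues.europe.orders.#", "send", "europe-users");

        String address = "globalqueues.europe.orders.plastics";
        System.out.println("send: " + security.rolesFor(address, "send"));
        System.out.println("createDurableQueue: "
            + security.rolesFor(address, "createDurableQueue"));
    }
}
```

For the address globalqueues.europe.orders.plastics the lookup selects the orders block, so createDurableQueue yields no roles even though the broader block grants it to admin.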

The JBoss AS security manager is used when running HornetQ inside the JBoss Application server. This allows tight integration with the JBoss Application Server's security model.

The class name of this security manager is org.hornetq.integration.jboss.security.JBossASSecurityManager

Take a look at one of the default hornetq-jboss-beans.xml files for JBoss Application Server that are bundled in the distribution for an example of how this is configured.

HornetQ can be easily installed in JBoss Application Server 4 or later. For details on installing HornetQ in the JBoss Application Server please refer to the quick-start guide.

Since HornetQ also provides a JCA adapter, it is also possible to integrate HornetQ as a JMS provider in other JEE compliant app servers. For instructions on how to integrate a remote JCA adaptor into another application server, please consult the other application server's instructions.

A JCA Adapter basically controls the inflow of messages to Message-Driven Beans (MDBs) and the outflow of messages sent from other JEE components, e.g. EJBs and Servlets.

This section explains the basics behind configuring the different JEE components in the AS.

The delivery of messages to an MDB using HornetQ is configured on the JCA Adapter via the configuration file ra.xml, which can be found under the jms-ra.rar directory. By default this is configured to consume messages using an InVM connector from the instance of HornetQ running within the application server. The configuration properties are listed later in this chapter.

All MDBs however need to have the destination type and the destination configured. The following example shows how this can be done using annotations:

@MessageDriven(name = "MDBExample", activationConfig =
{
   @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
   @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue")
})
@ResourceAdapter("hornetq-ra.rar")
public class MDBExample implements MessageListener
{
   public void onMessage(Message message)...
}

In this example you can see that the MDB will consume messages from a queue that is mapped into JNDI with the binding queue/testQueue. This queue must be preconfigured in the usual way using the HornetQ configuration files.

The ResourceAdapter annotation is used to specify which adaptor should be used. To use this you will need to import org.jboss.ejb3.annotation.ResourceAdapter for JBoss AS 5.X and later versions. It is provided by jboss-ejb3-ext-api.jar, which can be found in the JBoss repository. For JBoss AS 4.X, the annotation to use is org.jboss.annotation.ejb.ResourceAdaptor.

Alternatively you can use a deployment descriptor and add something like the following to jboss.xml:

<message-driven>
   <ejb-name>ExampleMDB</ejb-name>
   <resource-adapter-name>hornetq-ra.rar</resource-adapter-name>
</message-driven>

You can also rename the hornetq-ra.rar directory to jms-ra.rar, in which case neither the annotation nor the extra descriptor information will be needed. If you do this you will need to edit the jms-ds.xml datasource file and change the rar-name element.

All the examples shipped with the HornetQ distribution use the annotation.

When an MDB is using Container-Managed Transactions (CMT), the delivery of the message is done within the scope of a JTA transaction. The commit or rollback of this transaction is controlled by the container itself. If the transaction is rolled back then the message delivery semantics will kick in (by default, it will try to redeliver the message up to 10 times before sending to a DLQ). Using annotations this would be configured as follows:

@MessageDriven(name = "MDB_CMP_TxRequiredExample", activationConfig =
{
   @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
   @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue")
})
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
@ResourceAdapter("hornetq-ra.rar")
public class MDB_CMP_TxRequiredExample implements MessageListener
{
   public void onMessage(Message message)...
}

The TransactionManagement annotation tells the container to manage the transaction. The TransactionAttribute annotation tells the container that a JTA transaction is required for this MDB. Note that the only other valid value for this is TransactionAttributeType.NOT_SUPPORTED which tells the container that this MDB does not support JTA transactions and one should not be created.

It is also possible to inform the container that it must rollback the transaction by calling setRollbackOnly on the MessageDrivenContext. The code for this would look something like:

@Resource
MessageDrivenContext ctx;

public void onMessage(Message message)
{
   try
   {
      //something here fails
   }
   catch (Exception e)
   {
      ctx.setRollbackOnly();
   }
}

If you do not want the overhead of an XA transaction being created every time but you would still like the message delivered within a transaction (i.e. you are only using a JMS resource) then you can configure the MDB to use a local transaction. This would be configured as such:

@MessageDriven(name = "MDB_CMP_TxLocalExample", activationConfig =
{
      @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
      @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue"),
      @ActivationConfigProperty(propertyName = "useLocalTx", propertyValue = "true")
})
@TransactionManagement(value = TransactionManagementType.CONTAINER)
@TransactionAttribute(value = TransactionAttributeType.NOT_SUPPORTED)
@ResourceAdapter("hornetq-ra.rar")
public class MDB_CMP_TxLocalExample implements MessageListener
{
   public void onMessage(Message message)...
}

Message-driven beans can also be configured to use Bean-Managed Transactions (BMT). In this case a User Transaction is created. This would be configured as follows:

@MessageDriven(name = "MDB_BMPExample", activationConfig =
{
   @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
   @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue"),
   @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Dups-ok-acknowledge")
})
@TransactionManagement(value= TransactionManagementType.BEAN)
@ResourceAdapter("hornetq-ra.rar")
public class MDB_BMPExample implements MessageListener
{
   public void onMessage(Message message)...
}

When using Bean-Managed Transactions the message delivery to the MDB will occur outside the scope of the user transaction and use the acknowledge mode specified by the user with the acknowledgeMode property. There are only 2 acceptable values for this: Auto-acknowledge and Dups-ok-acknowledge. Please note that because the message delivery is outside the scope of the transaction, a failure within the MDB will not cause the message to be redelivered.

A user would control the lifecycle of the transaction something like the following:

@Resource
MessageDrivenContext ctx;

public void onMessage(Message message)
{
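   // obtain the transaction before the try block so it is visible in the catch block
   UserTransaction tx = ctx.getUserTransaction();
   try
   {
      TextMessage textMessage = (TextMessage)message;

      String text = textMessage.getText();

      tx.begin();

      //do some stuff within the transaction

      tx.commit();

   }
   catch (Exception e)
   {
      try
      {
         tx.rollback();
      }
      catch (Exception rollbackFailure)
      {
         // rollback can itself fail, e.g. if no transaction is active
         rollbackFailure.printStackTrace();
      }
   }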
}

The JCA adapter can also be used for sending messages. The Connection Factory to use is configured by default in the jms-ds.xml file and is mapped to java:/JmsXA. Using this from within a JEE component will mean that the sending of the message will be done as part of the JTA transaction being used by the component.

This means that if the sending of the message fails, the overall transaction will roll back and the message will be re-sent. Here's an example of this from within an MDB:

@MessageDriven(name = "MDBMessageSendTxExample", activationConfig =
{
   @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
   @ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/testQueue")
})
@TransactionManagement(value= TransactionManagementType.CONTAINER)
@TransactionAttribute(value= TransactionAttributeType.REQUIRED)
@ResourceAdapter("hornetq-ra.rar")
public class MDBMessageSendTxExample implements MessageListener
{
   @Resource(mappedName = "java:/JmsXA")
   ConnectionFactory connectionFactory;

   @Resource(mappedName = "queue/replyQueue")
   Queue replyQueue;

   public void onMessage(Message message)
   {
      Connection conn = null;
      try
      {
         //Step 9. We know the client is sending a text message so we cast
         TextMessage textMessage = (TextMessage)message;

         //Step 10. get the text from the message.
         String text = textMessage.getText();

         System.out.println("message " + text);

         conn = connectionFactory.createConnection();

         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         MessageProducer producer = sess.createProducer(replyQueue);

         producer.send(sess.createTextMessage("this is a reply"));

      }
      catch (Exception e)
      {
         e.printStackTrace();
      }
      finally
      {
         if(conn != null)
         {
            try
            {
               conn.close();
            }
            catch (JMSException e)
            { 
            }
         }
      }
   }
}

In JBoss Application Server you can use the JMS JCA adapter for sending messages from EJBs (including Session, Entity and Message-Driven Beans), Servlets (including JSPs) and custom MBeans.

The Java Connector Architecture (JCA) Adapter is what allows HornetQ to be integrated with JEE components such as MDBs and EJBs. It configures how components such as MDBs consume messages from the HornetQ server and also how components such as EJBs or Servlets can send messages.

The HornetQ JCA adapter is deployed via the jms-ra.rar archive. The configuration of the adapter is found in this archive under META-INF/ra.xml.

The configuration will look something like the following:

<resourceadapter>
   <resourceadapter-class>org.hornetq.ra.HornetQResourceAdapter</resourceadapter-class>
   <config-property>
      <description>The transport type. Multiple connectors can be configured by using a comma separated list,
         i.e. org.hornetq.core.remoting.impl.invm.InVMConnectorFactory,org.hornetq.core.remoting.impl.invm.InVMConnectorFactory.</description>
      <config-property-name>ConnectorClassName</config-property-name>
      <config-property-type>java.lang.String</config-property-type>
      <config-property-value>org.hornetq.core.remoting.impl.invm.InVMConnectorFactory</config-property-value>
   </config-property>
   <config-property>
      <description>The transport configuration. These values must be in the form of key=val;key=val;,
         if multiple connectors are used then each set must be separated by a comma i.e. host=host1;port=5445,host=host2;port=5446.
         Each set of params maps to the connector classname specified.</description>
      <config-property-name>ConnectionParameters</config-property-name>
      <config-property-type>java.lang.String</config-property-type>
      <config-property-value>server-id=0</config-property-value>
   </config-property>

   <outbound-resourceadapter>
      <connection-definition>
         <managedconnectionfactory-class>org.hornetq.ra.HornetQRAManagedConnectionFactory</managedconnectionfactory-class>

         <config-property>
            <description>The default session type</description>
            <config-property-name>SessionDefaultType</config-property-name>
            <config-property-type>java.lang.String</config-property-type>
            <config-property-value>javax.jms.Queue</config-property-value>
         </config-property>
         <config-property>
            <description>Try to obtain a lock within specified number of seconds; less
            than or equal to 0 disable this functionality</description>
            <config-property-name>UseTryLock</config-property-name>
            <config-property-type>java.lang.Integer</config-property-type>
            <config-property-value>0</config-property-value>
         </config-property>

         <connectionfactory-interface>org.hornetq.ra.HornetQRAConnectionFactory
         </connectionfactory-interface>
         <connectionfactory-impl-class>org.hornetq.ra.HornetQRAConnectionFactoryImpl
         </connectionfactory-impl-class>
         <connection-interface>javax.jms.Session</connection-interface>
         <connection-impl-class>org.hornetq.ra.HornetQRASession
         </connection-impl-class>
      </connection-definition>
      <transaction-support>XATransaction</transaction-support>
      <authentication-mechanism>
         <authentication-mechanism-type>BasicPassword
         </authentication-mechanism-type>
         <credential-interface>javax.resource.spi.security.PasswordCredential
         </credential-interface>
      </authentication-mechanism>
      <reauthentication-support>false</reauthentication-support>
   </outbound-resourceadapter>

   <inbound-resourceadapter>
      <messageadapter>
         <messagelistener>
            <messagelistener-type>javax.jms.MessageListener</messagelistener-type>
            <activationspec>
               <activationspec-class>org.hornetq.ra.inflow.HornetQActivationSpec
               </activationspec-class>
               <required-config-property>
                   <config-property-name>destination</config-property-name>
               </required-config-property>
            </activationspec>
         </messagelistener>
      </messageadapter>
   </inbound-resourceadapter>
</resourceadapter>

There are three main parts to this configuration.

The first element you see is resourceadapter-class which should be left unchanged. This is the HornetQ resource adapter class.

After that there is a list of configuration properties. This will be where most of the configuration is done. The first two properties configure the transport used by the adapter and the rest configure the connection factory itself.

The following table explains what each property is for.

Table 32.1. Global Configuration Properties

Property Name | Property Type | Property Description
ConnectorClassName | String | The Connector class name (see Chapter 16, Configuring the Transport for more information). If multiple connectors are needed this should be provided as a comma separated list.
ConnectionParameters | String | The transport configuration. These parameters must be in the form of key1=val1;key2=val2; and will be specific to the connector used. If multiple connectors are configured then params should be supplied for each connector separated by a comma.
hA | boolean | True if high availability is needed.
useLocalTx | boolean | True will enable local transaction optimisation.
UserName | String | The user name to use when making a connection.
Password | String | The password to use when making a connection.
DiscoveryAddress | String | The discovery group address to use to autodetect a server.
DiscoveryPort | Integer | The port to use for discovery.
DiscoveryRefreshTimeout | Long | The timeout, in milliseconds, to refresh.
DiscoveryInitialWaitTimeout | Long | The initial time to wait for discovery.
ConnectionLoadBalancingPolicyClassName | String | The load balancing policy class to use.
ConnectionTTL | Long | The time to live (in milliseconds) for the connection.
CallTimeout | Long | The call timeout (in milliseconds) for each packet sent.
DupsOKBatchSize | Integer | The batch size (in bytes) between acknowledgements when using DUPS_OK_ACKNOWLEDGE mode.
TransactionBatchSize | Integer | The batch size (in bytes) between acknowledgements when using a transactional session.
ConsumerWindowSize | Integer | The window size (in bytes) for consumer flow control.
ConsumerMaxRate | Integer | The fastest rate a consumer may consume messages per second.
ConfirmationWindowSize | Integer | The window size (in bytes) for reattachment confirmations.
ProducerMaxRate | Integer | The maximum rate of messages per second that can be sent.
MinLargeMessageSize | Integer | The size (in bytes) before a message is treated as large.
BlockOnAcknowledge | Boolean | Whether or not messages are acknowledged synchronously.
BlockOnNonDurableSend | Boolean | Whether or not non-durable messages are sent synchronously.
BlockOnDurableSend | Boolean | Whether or not durable messages are sent synchronously.
AutoGroup | Boolean | Whether or not message grouping is automatically used.
PreAcknowledge | Boolean | Whether messages are pre-acknowledged by the server before sending.
ReconnectAttempts | Integer | Maximum number of retry attempts; the default for the resource adapter is -1 (infinite attempts).
RetryInterval | Long | The time (in milliseconds) to retry a connection after failing.
RetryIntervalMultiplier | Double | Multiplier to apply to successive retry intervals.
FailoverOnServerShutdown | Boolean | If true, the client will reconnect to another server if available.
ClientID | String | The pre-configured client ID for the connection factory.
ClientFailureCheckPeriod | Long | The period (in ms) after which the client will consider the connection failed after not receiving packets from the server.
UseGlobalPools | Boolean | Whether or not to use a global thread pool for threads.
ScheduledThreadPoolMaxSize | Integer | The size of the scheduled thread pool.
ThreadPoolMaxSize | Integer | The size of the thread pool.
SetupAttempts | Integer | Number of attempts to set up a JMS connection (default is 10, -1 means to attempt infinitely). It is possible that the MDB is deployed before the JMS resources are available. In that case, the resource adapter will try to set up the connection several times until the resources are available. This applies only to inbound connections.
SetupInterval | Long | Interval in milliseconds between consecutive attempts to set up a JMS connection (default is 2000 ms). This applies only to inbound connections.
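
To make the relationship between RetryInterval and RetryIntervalMultiplier more concrete, the following is a minimal sketch of how the wait before each successive attempt could grow. It assumes simple geometric growth (each interval is the previous one multiplied by the multiplier). The class and method names are hypothetical, this is not the HornetQ implementation, and the real client may apply further limits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of HornetQ: illustrates how a retry interval
// grows when a multiplier is applied to each successive attempt.
class RetryIntervalSketch
{
   // Returns the wait (in milliseconds) before each of the first 'attempts' retries.
   static List<Long> intervals(long retryInterval, double multiplier, int attempts)
   {
      List<Long> result = new ArrayList<Long>();
      double current = retryInterval;
      for (int i = 0; i < attempts; i++)
      {
         result.add(Math.round(current));
         current = current * multiplier; // assumption: simple geometric growth
      }
      return result;
   }

   public static void main(String[] args)
   {
      // RetryInterval=2000 and RetryIntervalMultiplier=1.5, first four attempts
      System.out.println(intervals(2000L, 1.5, 4)); // prints [2000, 3000, 4500, 6750]
   }
}
```

With a multiplier of 1.0 every interval stays at the configured RetryInterval.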

The outbound configuration should remain unchanged, as it defines the connection factories that are used by Java EE components. These connection factories can be defined inside a configuration file whose name matches *-ds.xml. You'll find a default jms-ds.xml configuration under the hornetq directory in the JBoss AS deployment. The connection factories defined in this file inherit their properties from the main ra.xml configuration, but these properties can also be overridden. The following example shows how to override them.

<tx-connection-factory>
   <jndi-name>RemoteJmsXA</jndi-name>
   <xa-transaction/>
   <rar-name>jms-ra.rar</rar-name>
   <connection-definition>org.hornetq.ra.HornetQRAConnectionFactory</connection-definition>
   <config-property name="SessionDefaultType" type="String">javax.jms.Topic</config-property>
   <config-property name="ConnectorClassName" type="String">
      org.hornetq.core.remoting.impl.netty.NettyConnectorFactory
   </config-property>
   <config-property name="ConnectionParameters" type="String">
      port=5445</config-property>
   <max-pool-size>20</max-pool-size>
</tx-connection-factory>

In this example the connection factory will be bound to JNDI with the name RemoteJmsXA and can be looked up in the usual way using JNDI or defined within the EJB or MDB as such:

@Resource(mappedName="java:/RemoteJmsXA")
private ConnectionFactory connectionFactory;

The config-property elements override those in the ra.xml configuration file. Any of the elements pertaining to the connection factory can be overridden here.

In addition to the global configuration properties, the outbound configuration defines further properties of its own.


Sometimes you may want your messaging server on a different machine or separate from the application server. If this is the case you will only need the HornetQ client libraries installed. This section explains what configuration to create and which jar dependencies are needed.

Two configuration files are needed to do this: one for the inbound adapter used by MDBs, and one for the outbound connections that the JCA managed connection pool provides to JEE components.

Firstly you will need to create a directory under the deploy directory with a name ending in .rar. For this example we will name the directory hornetq-ra.rar. This detail is important as the name of the directory is referred to by the MDBs and the outgoing configuration.

Under the hornetq-ra.rar directory you will need to create a META-INF directory in which you should create an ra.xml configuration file. You can find a template for the ra.xml under the config directory of the HornetQ distribution.

To configure MDBs to consume messages from a remote HornetQ server you need to edit the ra.xml file under deploy/hornetq-ra.rar/META-INF and change the transport type to use a netty connector (instead of the invm connector that is defined) and configure its transport params. Here's an example of what this would look like:

<resourceadapter-class>org.hornetq.ra.HornetQResourceAdapter</resourceadapter-class>
<config-property>
   <description>The transport type</description>
   <config-property-name>ConnectorClassName</config-property-name>
   <config-property-type>java.lang.String</config-property-type>
   <config-property-value>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</config-property-value>
</config-property>
<config-property>
   <description>The transport configuration. These values must be in the form of key=val;key=val;</description>
   <config-property-name>ConnectionParameters</config-property-name>
   <config-property-type>java.lang.String</config-property-type>
   <config-property-value>host=127.0.0.1;port=5446</config-property-value>
</config-property>

If you want to provide a list of servers that the adapter can connect to you can provide a list of connectors, each separated by a comma.

<resourceadapter-class>org.hornetq.ra.HornetQResourceAdapter</resourceadapter-class>
<config-property>
   <description>The transport type</description>
   <config-property-name>ConnectorClassName</config-property-name>
   <config-property-type>java.lang.String</config-property-type>
   <config-property-value>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</config-property-value>
</config-property>
<config-property>
   <description>The transport configuration. These values must be in the form of key=val;key=val;</description>
   <config-property-name>ConnectionParameters</config-property-name>
   <config-property-type>java.lang.String</config-property-type>
   <config-property-value>host=127.0.0.1;port=5446,host=127.0.0.2;port=5447</config-property-value>
</config-property>

The first configuration above connects the resource adapter to a server running on localhost and listening on port 5446. The second adds a further server at 127.0.0.2 listening on port 5447.
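
The way the comma separated ConnectorClassName list lines up with the comma separated ConnectionParameters sets can be sketched in a few lines of Java. This is only an illustration of the string format described above. The class ConnectorConfigSketch and its method are hypothetical and are not part of HornetQ, whose own parsing may differ in detail.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of HornetQ: shows how each set of
// ConnectionParameters maps onto the connector class name in the same position.
class ConnectorConfigSketch
{
   // Splits "k=v;k=v,k=v;k=v" into one map per connector, in order.
   static List<Map<String, String>> parseParameters(String connectionParameters)
   {
      List<Map<String, String>> result = new ArrayList<Map<String, String>>();
      for (String set : connectionParameters.split(","))
      {
         Map<String, String> params = new LinkedHashMap<String, String>();
         for (String pair : set.split(";"))
         {
            String trimmed = pair.trim();
            if (trimmed.isEmpty())
            {
               continue; // tolerate empty segments such as a doubled ';'
            }
            int eq = trimmed.indexOf('=');
            if (eq < 0)
            {
               throw new IllegalArgumentException("Expected key=val but found: " + trimmed);
            }
            params.put(trimmed.substring(0, eq).trim(), trimmed.substring(eq + 1).trim());
         }
         result.add(params);
      }
      return result;
   }

   public static void main(String[] args)
   {
      String[] connectors = ("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory,"
         + "org.hornetq.core.remoting.impl.netty.NettyConnectorFactory").split(",");
      List<Map<String, String>> params =
         parseParameters("host=127.0.0.1;port=5446,host=127.0.0.2;port=5447");
      for (int i = 0; i < connectors.length; i++)
      {
         // The i-th parameter set belongs to the i-th connector class name
         System.out.println(connectors[i] + " -> " + params.get(i));
      }
   }
}
```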

You will also need to configure the outbound connection by creating a hornetq-ds.xml and placing it under any directory that will be deployed under the deploy directory. In a standard HornetQ JBoss configuration this would be under hornetq or hornetq.sar, but you can place it wherever you like. In fact, as long as its name ends in -ds.xml you can call it anything you like. You can again find a template for this file under the config directory of the HornetQ distribution, where it is called jms-ds.xml, which is the JBoss default.

The following example shows a sample configuration:

<tx-connection-factory>
   <jndi-name>RemoteJmsXA</jndi-name>
   <xa-transaction/>
   <rar-name>hornetq-ra.rar</rar-name>
   <connection-definition>org.hornetq.ra.HornetQRAConnectionFactory</connection-definition>
   <config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property>
   <config-property name="ConnectorClassName" type="java.lang.String">org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</config-property>
   <config-property name="ConnectionParameters" type="java.lang.String">host=127.0.0.1;port=5446</config-property>
   <max-pool-size>20</max-pool-size>
</tx-connection-factory>

Again you will see that this uses the netty connector type and will connect to the HornetQ server running on localhost and listening on port 5446. JEE components can look up the connection factory in JNDI using java:/RemoteJmsXA; this name is defined by the jndi-name element. You will also note that the outgoing connection will be created by the resource adapter configured under the directory hornetq-ra.rar as explained in the last section.

Also, if you want to configure multiple connectors, do this as a comma separated list as in the ra.xml configuration.

This is a step by step guide on how to configure a JBoss application server that doesn't have HornetQ installed to use a remote instance of HornetQ.

Firstly download and install JBoss AS 5 as per the JBoss installation guide and HornetQ as per the HornetQ installation guide. After that the following steps are required:

At this point you should be able to deploy MDBs that consume from the remote server. You will, however, have to make sure that your MDBs have the annotation @ResourceAdapter("hornetq-ra.rar") added; this is illustrated in Section 32.1, “Configuring Message-Driven Beans”. If you don't want to add this annotation then you can delete the generic resource adapter jms-ra.rar and rename hornetq-ra.rar to jms-ra.rar.

If you also want to use the remote HornetQ server for outgoing connections, i.e. sending messages, then do the following:

  • Create a file called hornetq-ds.xml in the deploy directory (in fact you can call this anything you want as long as it ends in -ds.xml). Then add the following:

    <connection-factories>
      <!--
       JMS XA Resource adapter, use this for outbound JMS connections.
       Inbound connections are defined at the @MDB activation or at the resource-adapter properties.
      -->
      <tx-connection-factory>
         <jndi-name>RemoteJmsXA</jndi-name>
         <xa-transaction/>
         <rar-name>hornetq-ra.rar</rar-name>
         <connection-definition>org.hornetq.ra.HornetQRAConnectionFactory</connection-definition>
         <config-property name="SessionDefaultType" type="java.lang.String">javax.jms.Topic</config-property>
         <config-property name="ConnectorClassName" type="java.lang.String">org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</config-property>
         <config-property name="ConnectionParameters" type="java.lang.String">host=127.0.0.1;port=5445</config-property>
         <max-pool-size>20</max-pool-size>
      </tx-connection-factory>
    
    
    </connection-factories>

    Again you will see that the host and port are configured here to match the remote HornetQ server's configuration. The other important attributes are:

    • jndi-name - This is the name used to look up the JMS connection factory from within your JEE client

    • rar-name - This should match the directory that you created to hold the Resource Adapter configuration

Now you should be able to send messages using the JCA JMS connection pooling within an XA transaction.

XA recovery deals with system or application failures to ensure that the results of a transaction are applied consistently to all resources affected by the transaction, even if any of the application processes or the machine hosting them crash or lose network connectivity. For more information on XA Recovery, please refer to JBoss Transactions.

When HornetQ is integrated with JBoss AS, it can take advantage of JBoss Transactions to provide recovery of messaging resources. If messages are involved in an XA transaction, in the event of a server crash, the recovery manager will ensure that the transactions are recovered and the messages will either be committed or rolled back (depending on the transaction outcome) when the server is restarted.

To enable HornetQ's XA Recovery, the Recovery Manager must be configured to connect to HornetQ to recover its resources. The following property must be added to the jta section of conf/jbossts-properties.xml of JBoss AS profiles:

<properties depends="arjuna" name="jta">
   ...
                     
   <property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HornetQ1"
                value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;[connection configuration]"/>
   <property name="com.arjuna.ats.jta.xaRecoveryNode" value="1"/>
</properties>

The [connection configuration] contains all the information required to connect to a HornetQ node, in the form [connector factory class name],[user name], [password], [connector parameters].

Also note the com.arjuna.ats.jta.xaRecoveryNode parameter. If you want recovery enabled then this must be set to whatever the tx node id is set to; the node id is configured in the same file by the com.arjuna.ats.arjuna.xa.nodeIdentifier property.

Note

HornetQ must have a valid acceptor which corresponds to the connector specified in conf/jbossts-properties.xml.

If HornetQ is configured with a default in-vm acceptor:

<acceptor name="in-vm">
   <factory-class>org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory</factory-class>
</acceptor>

the corresponding configuration in conf/jbossts-properties.xml is:

<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
   value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"/>

If it is now configured with a netty acceptor on a non-default port:

<acceptor name="netty">
   <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
   <param key="port" value="8888"/>
</acceptor>

the corresponding configuration in conf/jbossts-properties.xml is:

<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
       value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory, , , port=8888"/>

If recovery must connect with the user name admin and the password adminpass, the configuration would be:

<property name="com.arjuna.ats.jta.recovery.XAResourceRecovery.HORNETQ1"
      value="org.hornetq.jms.server.recovery.HornetQXAResourceRecovery;org.hornetq.core.remoting.impl.netty.NettyConnectorFactory, admin, adminpass, port=8888"/>
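
The three property values shown above follow the same pattern, which can be reproduced with a small helper. The sketch below is purely illustrative: RecoveryPropertySketch is a hypothetical class, not part of HornetQ or JBoss Transactions, and it only assembles the string in the [connector factory class name],[user name], [password], [connector parameters] form described earlier.

```java
// Hypothetical helper, not part of HornetQ or JBoss Transactions: assembles the
// value of the XAResourceRecovery property in the form described in the text.
class RecoveryPropertySketch
{
   static final String RECOVERY_CLASS = "org.hornetq.jms.server.recovery.HornetQXAResourceRecovery";

   // Pass null for any part that is not needed; empty slots are kept so that
   // the connector parameters always stay in the fourth position.
   static String value(String connectorFactory, String user, String password, String connectorParams)
   {
      StringBuilder sb = new StringBuilder(RECOVERY_CLASS).append(';').append(connectorFactory);
      boolean hasDetails = user != null || password != null || connectorParams != null;
      if (hasDetails)
      {
         sb.append(", ").append(user == null ? "" : user);
         sb.append(", ").append(password == null ? "" : password);
         sb.append(", ").append(connectorParams == null ? "" : connectorParams);
      }
      return sb.toString();
   }

   public static void main(String[] args)
   {
      String invm = "org.hornetq.core.remoting.impl.invm.InVMConnectorFactory";
      String netty = "org.hornetq.core.remoting.impl.netty.NettyConnectorFactory";
      System.out.println(value(invm, null, null, null));
      System.out.println(value(netty, null, null, "port=8888"));
      System.out.println(value(netty, "admin", "adminpass", "port=8888"));
   }
}
```

The three lines printed correspond to the in-vm, netty and netty-with-credentials values listed above.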

Configuring HornetQ with an invm acceptor and configuring the Recovery Manager with an invm connector is the recommended way to enable XA Recovery.

See Section 11.3.9, “XA Recovery” which shows how to configure XA Recovery and recover messages after a server crash.

HornetQ includes a fully functional JMS message bridge.

The function of the bridge is to consume messages from a source queue or topic, and send them to a target queue or topic, typically on a different server.

The source and target servers do not have to be in the same cluster which makes bridging suitable for reliably sending messages from one cluster to another, for instance across a WAN, and where the connection may be unreliable.

A bridge can be deployed as a standalone application, with HornetQ standalone server or inside a JBoss AS instance. The source and the target can be located in the same virtual machine or another one.

The bridge can also be used to bridge messages from other non-HornetQ JMS servers, as long as they are JMS 1.1 compliant.

Note

Do not confuse a JMS bridge with a core bridge. A JMS bridge can be used to bridge any two JMS 1.1 compliant JMS providers and uses the JMS API. A core bridge (described in Chapter 36, Core Bridges) is used to bridge any two HornetQ instances and uses the core API. Always use a core bridge if you can in preference to a JMS bridge. The core bridge will typically provide better performance than a JMS bridge. Also the core bridge can provide once and only once delivery guarantees without using XA.

The bridge has built-in resilience to failure, so if the source or target server connection is lost, e.g. due to network failure, the bridge will retry connecting to the source and/or target until they come back online. When they come back online the bridge will resume operation as normal.
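
The retry behaviour can be pictured as a bounded loop that waits a fixed interval between connection attempts. The following is a conceptual sketch only, assuming a fixed wait and a maximum number of attempts. BridgeRetrySketch and its method are hypothetical names, and this is not the source of the JMS bridge.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch, not the JMS bridge implementation: retries a connection
// attempt at a fixed interval until it succeeds or the attempts are used up.
class BridgeRetrySketch
{
   // Returns the attempt number that succeeded, or -1 if every attempt failed.
   static int connectWithRetry(Callable<Boolean> connect, long retryIntervalMs, int maxAttempts)
   {
      for (int attempt = 1; attempt <= maxAttempts; attempt++)
      {
         boolean connected;
         try
         {
            connected = connect.call();
         }
         catch (Exception e)
         {
            connected = false; // treat any failure as "server still offline"
         }
         if (connected)
         {
            return attempt;
         }
         if (attempt < maxAttempts)
         {
            try
            {
               Thread.sleep(retryIntervalMs); // wait before the next attempt
            }
            catch (InterruptedException e)
            {
               Thread.currentThread().interrupt();
               return -1;
            }
         }
      }
      return -1;
   }

   public static void main(String[] args)
   {
      final int[] calls = {0};
      // Simulated server that comes back online on the third attempt
      Callable<Boolean> flakyServer = () -> ++calls[0] >= 3;
      System.out.println("connected on attempt " + connectWithRetry(flakyServer, 10L, 10));
   }
}
```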

The bridge can be configured with an optional JMS selector, so it will only consume messages matching that JMS selector.

It can be configured to consume from a queue or a topic. When it consumes from a topic it can be configured to consume using a non-durable or durable subscription.

Typically, the bridge is deployed by the JBoss Micro Container via a beans configuration file. This would typically be deployed inside the JBoss Application Server. The following example shows a beans file that bridges two destinations which are actually on the same server.

<?xml version="1.0" encoding="UTF-8"?>
<deployment xmlns="urn:jboss:bean-deployer:2.0">
   <bean name="JMSBridge" class="org.hornetq.api.jms.bridge.impl.JMSBridgeImpl">
      <!-- HornetQ must be started before the bridge -->
      <depends>HornetQServer</depends>
      <constructor>
         <!-- Source ConnectionFactory Factory -->
         <parameter>
            <inject bean="SourceCFF"/>
         </parameter>
         <!-- Target ConnectionFactory Factory -->
         <parameter>
            <inject bean="TargetCFF"/>
         </parameter>
         <!-- Source DestinationFactory -->
         <parameter>
            <inject bean="SourceDestinationFactory"/>
         </parameter>
         <!-- Target DestinationFactory -->
         <parameter>
            <inject bean="TargetDestinationFactory"/>
         </parameter>
         <!-- Source User Name (no username here) -->
         <parameter><null /></parameter>
         <!-- Source Password (no password here)-->
         <parameter><null /></parameter>
         <!-- Target User Name (no username here)-->
         <parameter><null /></parameter>
         <!-- Target Password (no password here)-->
         <parameter><null /></parameter>
         <!-- Selector -->
         <parameter><null /></parameter>
         <!-- Failure Retry Interval (in ms) -->
         <parameter>5000</parameter>
         <!-- Max Retries -->
         <parameter>10</parameter>
         <!-- Quality Of Service -->
         <parameter>ONCE_AND_ONLY_ONCE</parameter>
         <!-- Max Batch Size -->
         <parameter>1</parameter>
         <!-- Max Batch Time (-1 means infinite) -->
         <parameter>-1</parameter>
         <!-- Subscription name (no subscription name here)-->
         <parameter><null /></parameter>
         <!-- Client ID  (no client ID here)-->
         <parameter><null /></parameter>
         <!-- Add MessageID In Header -->
         <parameter>true</parameter>
         <!-- register the JMS Bridge in the AS MBeanServer -->
         <parameter>
            <inject bean="MBeanServer"/>
         </parameter>
         <parameter>org.hornetq:service=JMSBridge</parameter>
      </constructor>
      <property name="transactionManager">
         <inject bean="RealTransactionManager"/>
      </property>
   </bean>

   <!-- SourceCFF describes the ConnectionFactory used to connect to the source destination -->
   <bean name="SourceCFF"
       class="org.hornetq.api.jms.bridge.impl.JNDIConnectionFactoryFactory">
      <constructor>
         <parameter>
            <inject bean="JNDI" />
         </parameter>
         <parameter>/ConnectionFactory</parameter>
      </constructor>
   </bean>

   <!-- TargetCFF describes the ConnectionFactory used to connect to the target destination -->
   <bean name="TargetCFF"
       class="org.hornetq.api.jms.bridge.impl.JNDIConnectionFactoryFactory">
      <constructor>
         <parameter>
            <inject bean="JNDI" />
         </parameter>
         <parameter>/ConnectionFactory</parameter>
      </constructor>
   </bean>

   <!-- SourceDestinationFactory describes the Destination used as the source -->
   <bean name="SourceDestinationFactory" class="org.hornetq.api.jms.bridge.impl.JNDIDestinationFactory">
      <constructor>
         <parameter>
            <inject bean="JNDI" />
         </parameter>
         <parameter>/queue/source</parameter>
      </constructor>
   </bean>

   <!-- TargetDestinationFactory describes the Destination used as the target -->
   <bean name="TargetDestinationFactory" class="org.hornetq.api.jms.bridge.impl.JNDIDestinationFactory">
      <constructor>
         <parameter>
            <inject bean="JNDI" />
         </parameter>
         <parameter>/queue/target</parameter>
      </constructor>
   </bean>

   <!-- JNDI is a Hashtable containing the JNDI properties required -->
   <!-- to connect to the source and target JMS resources           -->
   <bean name="JNDI" class="java.util.Hashtable">
      <constructor class="java.util.Map">
         <map class="java.util.Hashtable" keyClass="String"
                                          valueClass="String">
            <entry>
               <key>java.naming.factory.initial</key>
               <value>org.jnp.interfaces.NamingContextFactory</value>
            </entry>
            <entry>
               <key>java.naming.provider.url</key>
               <value>jnp://localhost:1099</value>
            </entry>
            <entry>
               <key>java.naming.factory.url.pkgs</key>
               <value>org.jboss.naming:org.jnp.interfaces</value>
            </entry>
            <entry>
               <key>jnp.timeout</key>
               <value>5000</value>
            </entry>
            <entry>
               <key>jnp.sotimeout</key>
               <value>5000</value>
            </entry>
         </map>
      </constructor>
   </bean>

   <bean name="MBeanServer" class="javax.management.MBeanServer">
      <constructor factoryClass="org.jboss.mx.util.MBeanServerLocator" factoryMethod="locateJBoss"/>
   </bean>
</deployment>

The main bean deployed is the JMSBridge bean. The bean is configurable by the parameters passed to its constructor.

The quality of service modes used by the bridge are described here in more detail.

The ONCE_AND_ONLY_ONCE QoS mode ensures messages will reach the destination from the source once and only once. (Sometimes this mode is known as "exactly once".) If both the source and the destination are on the same HornetQ server instance then this can be achieved by sending and acknowledging the messages in the same local transaction. If the source and destination are on different servers this is achieved by enlisting the sending and consuming sessions in a JTA transaction. The JTA transaction is controlled by the JBoss Transactions JTA implementation, which is a fully recovering transaction manager, thus providing a very high degree of durability. If JTA is required then both supplied connection factories need to be XAConnectionFactory implementations. This is likely to be the slowest mode since it requires extra persistence for the transaction logging.

This mode is only available for durable messages.

Please see Section 11.3.5, “JMS Bridge” which shows how to configure and use a JMS Bridge with JBoss AS to send messages to the source destination and consume them from the target destination.

Please see Section 11.1.28, “JMS Bridge” which shows how to configure and use a JMS Bridge between two standalone HornetQ servers.

HornetQ clients can be configured to automatically reconnect or re-attach to the server in the event that a failure is detected in the connection between the client and the server.

If the failure was due to some transient cause such as a temporary network failure, and the target server was not restarted, then the sessions will still exist on the server, assuming the client hasn't been disconnected for more than connection-ttl (see Chapter 17, Detecting Dead Connections).

In this scenario, HornetQ will automatically re-attach the client sessions to the server sessions when the connection reconnects. This is done 100% transparently and the client can continue exactly as if nothing had happened.

The way this works is as follows:

As HornetQ clients send commands to their servers they store each sent command in an in-memory buffer. If a connection failure occurs and the client subsequently reattaches to the same server, then, as part of the reattachment protocol, the server informs the client of the id of the last command it successfully received from that client.

If the client has sent more commands than were received before failover it can replay any sent commands from its buffer so that the client and server can reconcile their states.

The size of this buffer is configured by the ConfirmationWindowSize parameter. When the server has received ConfirmationWindowSize bytes of commands and processed them, it will send back a command confirmation to the client, and the client can then free up space in the buffer.

If you are using JMS and you're using the JMS service on the server to load your JMS connection factory instances into JNDI then this parameter can be configured in hornetq-jms.xml using the element confirmation-window-size. If you're using JMS but not using JNDI then you can set these values directly on the HornetQConnectionFactory instance using the appropriate setter method.

If you're using the core API you can set these values directly on the ServerLocator instance using the appropriate setter method.

The window is specified in bytes.

Setting this parameter to -1 disables any buffering and prevents any re-attachment from occurring, forcing reconnect instead. The default value for this parameter is -1, which means that by default no automatic re-attachment will occur.
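The buffering and replay behaviour described above can be modelled in a few lines. The following is a minimal sketch, not HornetQ's implementation: the class CommandReplayBuffer and all of its methods are hypothetical names invented for illustration, and commands are reduced to an id and a size in bytes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative model only: these class and method names are hypothetical, not HornetQ API.
final class CommandReplayBuffer {
    private static final class Command {
        final long id;
        final int sizeBytes;
        Command(long id, int sizeBytes) { this.id = id; this.sizeBytes = sizeBytes; }
    }

    private final Deque<Command> unconfirmed = new ArrayDeque<>();
    private long nextId = 0;

    // Record a command as sent; it stays buffered until the server confirms it.
    long send(int sizeBytes) {
        long id = nextId++;
        unconfirmed.addLast(new Command(id, sizeBytes));
        return id;
    }

    // The server confirmed everything up to and including lastReceivedId, so that space is freed.
    void confirmUpTo(long lastReceivedId) {
        while (!unconfirmed.isEmpty() && unconfirmed.peekFirst().id <= lastReceivedId) {
            unconfirmed.removeFirst();
        }
    }

    // On reattachment the server reports the last id it received; later commands are replayed.
    List<Long> idsToReplay(long lastReceivedId) {
        confirmUpTo(lastReceivedId);
        List<Long> ids = new ArrayList<>();
        for (Command c : unconfirmed) {
            ids.add(c.id);
        }
        return ids;
    }

    // Bytes currently held in the buffer awaiting confirmation.
    int bufferedBytes() {
        int total = 0;
        for (Command c : unconfirmed) {
            total += c.sizeBytes;
        }
        return total;
    }

    public static void main(String[] args) {
        CommandReplayBuffer buffer = new CommandReplayBuffer();
        for (int i = 0; i < 5; i++) {
            buffer.send(100);                          // ids 0..4
        }
        System.out.println(buffer.idsToReplay(2));     // server reported id 2 as the last one received
    }
}
```

In this model a window of -1 would correspond to never buffering anything, which is why re-attachment is impossible in that case: there is nothing to replay.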

Client reconnection is configured using the following parameters: retry-interval, retry-interval-multiplier, max-retry-interval and reconnect-attempts.

If you're using JMS, and you're using the JMS Service on the server to load your JMS connection factory instances directly into JNDI, then you can specify these parameters in the xml configuration in hornetq-jms.xml, for example:

<connection-factory name="ConnectionFactory">
   <connectors>
      <connector-ref connector-name="netty"/>
   </connectors>
   <entries>
      <entry name="ConnectionFactory"/>
      <entry name="XAConnectionFactory"/>
   </entries>
   <retry-interval>1000</retry-interval>
   <retry-interval-multiplier>1.5</retry-interval-multiplier>
   <max-retry-interval>60000</max-retry-interval>
   <reconnect-attempts>1000</reconnect-attempts>
</connection-factory>

If you're using JMS, but instantiating your JMS connection factory directly, you can specify the parameters using the appropriate setter methods on the HornetQConnectionFactory immediately after creating it.

If you're using the core API and instantiating the ServerLocator instance directly you can also specify the parameters using the appropriate setter methods on the ServerLocator immediately after creating it.
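To get a feel for how the retry settings interact, the sketch below computes the wait before each reconnection attempt. It is a rough model under stated assumptions, not HornetQ code: RetrySchedule and waits() are hypothetical names, and the sketch assumes that each wait is the previous wait multiplied by retry-interval-multiplier and that max-retry-interval acts as an upper cap.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: RetrySchedule is a hypothetical helper, not part of HornetQ.
final class RetrySchedule {
    // Returns the wait in milliseconds before each of the first `attempts` reconnection attempts.
    // Assumption: each wait is the previous wait times the multiplier, capped at maxRetryInterval.
    static List<Long> waits(long retryInterval, double multiplier, long maxRetryInterval, int attempts) {
        List<Long> result = new ArrayList<>();
        double next = retryInterval;
        for (int i = 0; i < attempts; i++) {
            long wait = Math.min((long) next, maxRetryInterval);
            result.add(wait);
            next = wait * multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the connection-factory example above.
        System.out.println(waits(1000, 1.5, 60000, 5));
    }
}
```

With a retry interval of 1000 ms and a multiplier of 2.0 this model yields waits of 1000 ms, 2000 ms and 4000 ms, after which the cap decides how far the backoff can grow.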

If your client does manage to reconnect but the session is no longer available on the server, for instance if the server has been restarted or it has timed out, then the client won't be able to re-attach, and any ExceptionListener or FailureListener instances registered on the connection or session will be called.

HornetQ allows you to configure objects called diverts with some simple server configuration.

Diverts allow you to transparently divert messages routed to one address to some other address, without making any changes to any client application logic.

Diverts can be exclusive, meaning that the message is diverted to the new address and does not go to the old address at all, or they can be non-exclusive, which means the message continues to go to the old address and a copy of it is also sent to the new address. Non-exclusive diverts can therefore be used for splitting message flows, e.g. there may be a requirement to monitor every order sent to an order queue.

Diverts can also be configured to have an optional message filter. If specified then only messages that match the filter will be diverted.

Diverts can also be configured to apply a Transformer. If specified, all diverted messages will have the opportunity of being transformed by the Transformer.

A divert will only divert a message to an address on the same server. However, if you want to divert to an address on a different server, a common pattern is to divert to a local store-and-forward queue, then set up a bridge which consumes from that queue and forwards to an address on a different server.

Diverts are therefore a very sophisticated concept, which when combined with bridges can be used to create interesting and complex routings. The set of diverts on a server can be thought of as a type of routing table for messages. Combining diverts with bridges allows you to create a distributed network of reliable routing connections between multiple geographically distributed servers, creating your global messaging mesh.

Diverts are defined as xml in the hornetq-configuration.xml file. There can be zero or more diverts in the file.

Please see Section 11.1.18, “Divert” for a full working example showing you how to configure and use diverts.

Let's take a look at some divert examples:

Let's take a look at an exclusive divert. An exclusive divert diverts all matching messages that are routed to the old address to the new address. Matching messages do not get routed to the old address.

Here's some example xml configuration for an exclusive divert, taken from the divert example:

<divert name="prices-divert">
   <address>jms.topic.priceUpdates</address>
   <forwarding-address>jms.queue.priceForwarding</forwarding-address>
   <filter string="office='New York'"/>
   <transformer-class-name>
      org.hornetq.jms.example.AddForwardingTimeTransformer
   </transformer-class-name>
   <exclusive>true</exclusive>
</divert>

We define a divert called 'prices-divert' that will divert any messages sent to the address 'jms.topic.priceUpdates' (this corresponds to any messages sent to a JMS Topic called 'priceUpdates') to another local address 'jms.queue.priceForwarding' (this corresponds to a local JMS queue called 'priceForwarding').

We also specify a message filter string so only messages with the message property office with value New York will get diverted. All other messages will continue to be routed to the normal address. The filter string is optional; if it is not specified then all messages will be considered matched.

In this example a transformer class is specified. Again this is optional, and if specified the transformer will be executed for each matching message. This allows you to change the message's body or properties before it is diverted. In this example the transformer simply adds a header that records the time the divert happened.

This example is actually diverting messages to a local store and forward queue, which is configured with a bridge which forwards the message to an address on another HornetQ server. Please see the example for more details.
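The routing decision for exclusive and non-exclusive diverts can be summarised in a short sketch. This is only a model of the rules described in this chapter, not server code: DivertSketch, Divert and route() are hypothetical names, message properties are simplified to a map of strings, the filter is a plain Java predicate rather than a HornetQ filter expression, and transformers are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Illustrative model only: these types are hypothetical and are not HornetQ classes.
final class DivertSketch {
    static final class Divert {
        final String address;
        final String forwardingAddress;
        final boolean exclusive;
        final Predicate<Map<String, String>> filter;

        Divert(String address, String forwardingAddress, boolean exclusive,
               Predicate<Map<String, String>> filter) {
            this.address = address;
            this.forwardingAddress = forwardingAddress;
            this.exclusive = exclusive;
            this.filter = filter;
        }
    }

    // Returns the addresses a message sent to `address` ends up being routed to.
    static List<String> route(String address, Map<String, String> properties, Divert divert) {
        boolean matches = divert.address.equals(address) && divert.filter.test(properties);
        List<String> targets = new ArrayList<>();
        if (!matches || !divert.exclusive) {
            targets.add(address);                   // the old address still receives the message
        }
        if (matches) {
            targets.add(divert.forwardingAddress);  // matching messages go to the new address
        }
        return targets;
    }

    public static void main(String[] args) {
        Divert pricesDivert = new Divert("jms.topic.priceUpdates", "jms.queue.priceForwarding", true,
                props -> "New York".equals(props.get("office")));
        Map<String, String> props = java.util.Collections.singletonMap("office", "New York");
        System.out.println(route("jms.topic.priceUpdates", props, pricesDivert));
    }
}
```

Setting the exclusive flag to false in this model makes a matching message appear at both addresses, which is the message-splitting behaviour described for non-exclusive diverts.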

The function of a bridge is to consume messages from a source queue, and forward them to a target address, typically on a different HornetQ server.

The source and target servers do not have to be in the same cluster, which makes bridging suitable for reliably sending messages from one cluster to another, for instance across a WAN or the internet, where the connection may be unreliable.

The bridge has built in resilience to failure so if the target server connection is lost, e.g. due to network failure, the bridge will retry connecting to the target until it comes back online. When it comes back online it will resume operation as normal.

In summary, bridges are a way to reliably connect two separate HornetQ servers together. With a core bridge both source and target servers must be HornetQ servers.

Bridges can be configured to provide once and only once delivery guarantees even in the event of the failure of the source or the target server. They do this by using duplicate detection (described in Chapter 37, Duplicate Message Detection).

Note

Although they have similar function, don't confuse core bridges with JMS bridges!

Core bridges are for linking a HornetQ node with another HornetQ node and do not use the JMS API. A JMS Bridge is used for linking any two JMS 1.1 compliant JMS providers, so a JMS Bridge could be used for bridging to or from a different JMS-compliant messaging system. It's always preferable to use a core bridge if you can. Core bridges use duplicate detection to provide once and only once guarantees. To provide the same guarantee using a JMS bridge you would have to use XA, which has a higher overhead and is more complex to configure.

Bridges are configured in hornetq-configuration.xml. Let's kick off with an example (this is actually from the bridge example):

<bridge name="my-bridge">
   <queue-name>jms.queue.sausage-factory</queue-name>
   <forwarding-address>jms.queue.mincing-machine</forwarding-address>
   <filter string="name='aardvark'"/>
   <transformer-class-name>
      org.hornetq.jms.example.HatColourChangeTransformer
   </transformer-class-name>
   <retry-interval>1000</retry-interval>
   <ha>true</ha>
   <retry-interval-multiplier>1.0</retry-interval-multiplier>
   <reconnect-attempts>-1</reconnect-attempts>
   <failover-on-server-shutdown>false</failover-on-server-shutdown>
   <use-duplicate-detection>true</use-duplicate-detection>
   <confirmation-window-size>10000000</confirmation-window-size>
   <connector-ref connector-name="remote-connector" backup-connector-name="backup-remote-connector"/>
   <user>foouser</user>
   <password>foopassword</password>
</bridge>

In the above example we have shown all the parameters it's possible to configure for a bridge. In practice you might use many of the defaults so it won't be necessary to specify them all explicitly.

Let's take a look at all the parameters in turn:

  • name attribute. All bridges must have a unique name in the server.

  • queue-name. This is the unique name of the local queue that the bridge consumes from. It is a mandatory parameter.

    The queue must already exist by the time the bridge is instantiated at start-up.

  • forwarding-address. This is the address on the target server that the message will be forwarded to. If a forwarding address is not specified, then the original address of the message will be retained.

  • filter-string. An optional filter string can be supplied. If specified then only messages which match the filter expression specified in the filter string will be forwarded. The filter string follows the HornetQ filter expression syntax described in Chapter 14, Filter Expressions.

  • transformer-class-name. An optional transformer-class-name can be specified. This is the name of a user-defined class which implements the org.hornetq.core.server.cluster.Transformer interface.

    If this is specified then the transformer's transform() method will be invoked with the message before it is forwarded. This gives you the opportunity to transform the message's header or body before forwarding it.

  • ha. This optional parameter determines whether or not this bridge should support high availability. True means it will connect to any available server in a cluster and support failover. The default value is false.

  • retry-interval. This optional parameter determines the period in milliseconds between subsequent reconnection attempts, if the connection to the target server has failed. The default value is 2000 milliseconds.

  • retry-interval-multiplier. This optional parameter determines a multiplier to apply to the time since the last retry to compute the time to the next retry.

    This allows you to implement an exponential backoff between retry attempts.

    Let's take an example:

    If we set retry-interval to 1000 ms and we set retry-interval-multiplier to 2.0, then, if the first reconnect attempt fails, we will wait 1000 ms then 2000 ms then 4000 ms between subsequent reconnection attempts.

    The default value is 1.0 meaning each reconnect attempt is spaced at equal intervals.

  • reconnect-attempts. This optional parameter determines the total number of reconnect attempts the bridge will make before giving up and shutting down. A value of -1 signifies an unlimited number of attempts. The default value is -1.

  • failover-on-server-shutdown. This optional parameter determines whether the bridge will attempt to failover onto a backup server (if specified) when the target server is cleanly shutdown rather than crashed.

    The bridge connector can specify both a live and a backup server. If it specifies a backup server and this parameter is set to true, then if the target server is cleanly shut down the bridge connection will attempt to failover onto its backup. If the bridge connector has no backup server configured then this parameter has no effect.

    Sometimes you want a bridge configured with a live and a backup target server, but you don't want to failover to the backup if the live server is simply taken down temporarily for maintenance. This is when this parameter comes in handy.

    The default value for this parameter is false.

  • use-duplicate-detection. This optional parameter determines whether the bridge will automatically insert a duplicate id property into each message that it forwards.

    Doing so allows the target server to perform duplicate detection on messages it receives from the source server. If the connection fails or the server crashes, then, when the bridge resumes, it will resend unacknowledged messages. This might result in duplicate messages being sent to the target server. Enabling duplicate detection allows these duplicates to be screened out and ignored.

    This allows the bridge to provide a once and only once delivery guarantee without using heavyweight methods such as XA (see Chapter 37, Duplicate Message Detection for more information).

    The default value for this parameter is true.

  • confirmation-window-size. This optional parameter determines the confirmation-window-size to use for the connection used to forward messages to the target node. This attribute is described in Chapter 34, Client Reconnection and Session Reattachment.

    Warning

    When using the bridge to forward messages from a queue which has a max-size-bytes set it's important that confirmation-window-size is less than or equal to max-size-bytes to prevent the flow of messages from ceasing.

  • connector-ref. This mandatory parameter determines which connector pair the bridge will use to actually make the connection to the target server.

    A connector encapsulates knowledge of what transport to use (TCP, SSL, HTTP etc) as well as the server connection parameters (host, port etc). For more information about what connectors are and how to configure them, please see Chapter 16, Configuring the Transport.

    The connector-ref element can be configured with two attributes:

    • connector-name. This references the name of a connector defined in the core configuration file hornetq-configuration.xml. The bridge will use this connector to make its connection to the target server. This attribute is mandatory.

    • backup-connector-name. This optional parameter also references the name of a connector defined in the core configuration file hornetq-configuration.xml. It represents the connector that the bridge will fail-over onto if it detects the live server connection has failed. If this is specified and failover-on-server-shutdown is set to true then it will also attempt failover onto this connector if the live target server is cleanly shut-down.

  • user. This optional parameter determines the user name to use when creating the bridge connection to the remote server. If it is not specified the default cluster user specified by cluster-user in hornetq-configuration.xml will be used.

  • password. This optional parameter determines the password to use when creating the bridge connection to the remote server. If it is not specified the default cluster password specified by cluster-password in hornetq-configuration.xml will be used.

HornetQ includes powerful automatic duplicate message detection, filtering out duplicate messages without you having to code your own fiddly duplicate detection logic at the application level. This chapter will explain what duplicate detection is, how HornetQ uses it and how and where to configure it.

When sending messages from a client to a server, or indeed from a server to another server, if the target server or connection fails sometime after sending the message, but before the sender receives a response that the send (or commit) was processed successfully then the sender cannot know for sure if the message was sent successfully to the address.

If the target server or connection failed after the send was received and processed but before the response was sent back then the message will have been sent to the address successfully, but if the target server or connection failed before the send was received and finished processing then it will not have been sent to the address successfully. From the sender's point of view it's not possible to distinguish these two cases.

When the server recovers this leaves the client in a difficult situation. It knows the target server failed, but it does not know if the last message reached its destination ok. If it decides to resend the last message, then that could result in a duplicate message being sent to the address. If each message was an order or a trade then this could result in the order being fulfilled twice or the trade being double booked. This is clearly not a desirable situation.

Sending the message(s) in a transaction does not help out either. If the server or connection fails while the transaction commit is being processed it is also indeterminate whether the transaction was successfully committed or not!

To solve these issues HornetQ provides automatic duplicate message detection for messages sent to addresses.

Enabling duplicate message detection for sent messages is simple: you just need to set a special property on the message to a unique value. You can create the value however you like, as long as it is unique. When the target server receives the message it will check whether that property is set. If it is, the server will check its in-memory cache to see whether it has already received a message with that value of the property. If it has received a message with the same value before then it will ignore the message.

If you're sending messages in a transaction then you don't have to set the property for every message you send in that transaction, you only need to set it once in the transaction. If the server detects a duplicate message for any message in the transaction, then it will ignore the entire transaction.

The name of the property that you set is given by the value of org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID, which is _HQ_DUPL_ID.

The value of the property can be of type byte[] or SimpleString if you're using the core API. If you're using JMS it must be a String, and its value should be unique. An easy way of generating a unique id is by generating a UUID.

Here's an example of setting the property using the core API:

...

ClientMessage message = session.createMessage(true);

// Could use a UUID for this
SimpleString myUniqueID = new SimpleString("This is my unique id");

// Message here is org.hornetq.api.core.Message
message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, myUniqueID);

...

And here's an example using the JMS API:

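...

Message jmsMessage = session.createMessage();

String myUniqueID = "This is my unique id";   // Could use a UUID for this

// The property name constant is defined on org.hornetq.api.core.Message
jmsMessage.setStringProperty(org.hornetq.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

...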
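To make the server-side check more concrete, here is a minimal sketch of an in-memory duplicate id cache together with UUID-based id generation. It is not HornetQ's implementation: DuplicateIdCache and accept() are hypothetical names, and the bounded capacity with eviction of the oldest id is an assumption made for the sake of the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

// Illustrative model only: DuplicateIdCache is a hypothetical class, not HornetQ API.
final class DuplicateIdCache {
    private final Map<String, Boolean> seen;

    // Assumption: the cache is bounded and forgets the oldest id once capacity is exceeded.
    DuplicateIdCache(final int capacity) {
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Returns true if the message should be processed, false if it is a duplicate to ignore.
    boolean accept(String duplicateId) {
        if (duplicateId == null) {
            return true;   // property not set, so no duplicate detection is applied
        }
        return seen.put(duplicateId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        DuplicateIdCache cache = new DuplicateIdCache(2000);
        String id = UUID.randomUUID().toString();   // an easy way to get a unique value
        System.out.println(cache.accept(id));       // first delivery
        System.out.println(cache.accept(id));       // resend after a failure
    }
}
```

The second call models a sender that resends after a failure without knowing whether the first send arrived: the id is already in the cache, so the resend is ignored.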

HornetQ clusters allow HornetQ servers to be grouped together in order to share message processing load. Each active node in the cluster is an active HornetQ server which manages its own messages and handles its own connections.

The cluster is formed by each node declaring cluster connections to other nodes in the core configuration file hornetq-configuration.xml. When a node forms a cluster connection to another node, internally it creates a core bridge (as described in Chapter 36, Core Bridges) connection between it and the other node. This is done transparently behind the scenes, so you don't have to declare an explicit bridge for each node. These cluster connections allow messages to flow between the nodes of the cluster to balance load.

Nodes can be connected together to form a cluster in many different topologies. We will discuss a couple of the more common topologies later in this chapter.

We'll also discuss client side load balancing, where we can balance client connections across the nodes of the cluster, and we'll consider message redistribution where HornetQ will redistribute messages between nodes to avoid starvation.

Another important part of clustering is server discovery where servers can broadcast their connection details so clients or other servers can connect to them with the minimum of configuration.

Server discovery is a mechanism by which servers can propagate their connection details to messaging clients and to other servers.

This information, let's call it the Cluster Topology, is actually sent around normal HornetQ connections to clients and to other servers over cluster connections. This being the case we need a way of establishing the initial connection. This can be done using dynamic discovery techniques like UDP and JGroups, or by providing a list of initial connectors.

Server discovery uses UDP multicast or JGroups to broadcast server connection settings.

A broadcast group is the means by which a server broadcasts connectors over the network. A connector defines a way in which a client (or other server) can make connections to the server. For more information on what a connector is, please see Chapter 16, Configuring the Transport.

The broadcast group takes a set of connector pairs and broadcasts them on the network. Each connector pair contains connection settings for a live and backup server (if one exists). Depending on which broadcasting technique you configure for the cluster, it uses either UDP or JGroups to broadcast the connector pair information.

Broadcast groups are defined in the server configuration file hornetq-configuration.xml. There can be many broadcast groups per HornetQ server. All broadcast groups must be defined in a broadcast-groups element.

Let's take a look at an example broadcast group from hornetq-configuration.xml that defines a UDP broadcast group:

<broadcast-groups>
   <broadcast-group name="my-broadcast-group">
      <local-bind-address>172.16.9.3</local-bind-address>
      <local-bind-port>5432</local-bind-port>
      <group-address>231.7.7.7</group-address>
      <group-port>9876</group-port>
      <broadcast-period>2000</broadcast-period>
      <connector-ref connector-name="netty-connector"/>
   </broadcast-group>
</broadcast-groups>

Some of the broadcast group parameters are optional and you'll normally use the defaults, but we specify them all in the above example for clarity. Let's discuss each one in turn:

  • name attribute. Each broadcast group in the server must have a unique name.

  • local-bind-address. This is the local bind address that the datagram socket is bound to. If you have multiple network interfaces on your server, you would specify which one you wish to use for broadcasts by setting this property. If this property is not specified then the socket will be bound to the wildcard address, an IP address chosen by the kernel. This is a UDP specific attribute.

  • local-bind-port. If you want to specify a local port to which the datagram socket is bound you can specify it here. Normally you would just use the default value of -1 which signifies that an anonymous port should be used. This parameter is always specified in conjunction with local-bind-address. This is a UDP specific attribute.

  • group-address. This is the multicast address to which the data will be broadcast. It is a class D IP address in the range 224.0.0.0 to 239.255.255.255, inclusive. The address 224.0.0.0 is reserved and is not available for use. This parameter is mandatory. This is a UDP specific attribute.

  • group-port. This is the UDP port number used for broadcasting. This parameter is mandatory. This is a UDP specific attribute.

  • broadcast-period. This is the period in milliseconds between consecutive broadcasts. This parameter is optional, the default value is 2000 milliseconds.

  • connector-ref. This specifies the connector and optional backup connector that will be broadcast (see Chapter 16, Configuring the Transport for more information on connectors). The connector to be broadcast is specified by the connector-name attribute.
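Since group-address must be a multicast address other than the reserved 224.0.0.0, it can be handy to sanity-check a value before putting it into the configuration. The following sketch uses only the JDK; GroupAddressCheck and isUsableGroupAddress() are hypothetical names and are not part of HornetQ, and the input is assumed to be an IPv4 literal.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Illustrative helper only: not part of HornetQ.
final class GroupAddressCheck {
    // Returns true if the literal is a multicast address other than the reserved 224.0.0.0.
    static boolean isUsableGroupAddress(String literal) {
        try {
            InetAddress address = InetAddress.getByName(literal);
            return address.isMulticastAddress() && !"224.0.0.0".equals(address.getHostAddress());
        } catch (UnknownHostException e) {
            return false;   // not a valid address at all
        }
    }

    public static void main(String[] args) {
        System.out.println(isUsableGroupAddress("231.7.7.7"));    // the group-address from the example
        System.out.println(isUsableGroupAddress("172.16.9.3"));   // the local-bind-address from the example
    }
}
```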

Here is another example broadcast group that defines a JGroups broadcast group:

<broadcast-groups>
   <broadcast-group name="my-broadcast-group">
      <jgroups-file>test-jgroups-file_ping.xml</jgroups-file>
      <jgroups-channel>hornetq_broadcast_channel</jgroups-channel>
      <broadcast-period>2000</broadcast-period>
      <connector-ref connector-name="netty-connector"/>
   </broadcast-group>
</broadcast-groups>

To be able to use JGroups to broadcast, you must specify two attributes, jgroups-file and jgroups-channel, as described below:

  • jgroups-file attribute. This is the name of the JGroups configuration file. It will be used to initialize JGroups channels. Make sure the file is in the Java resource path so that HornetQ can load it.

  • jgroups-channel attribute. The name that JGroups channels connect to for broadcasting.

Note

The JGroups attributes (jgroups-file and jgroups-channel) and UDP specific attributes described above are exclusive of each other. Only one set can be specified in a broadcast group configuration. Don't mix them!

The following is an example of a JGroups file:

<config xmlns="urn:org:jgroups"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.0.xsd">
   <TCP loopback="true"
      recv_buf_size="20000000"
      send_buf_size="640000"
      discard_incompatible_packets="true"
      max_bundle_size="64000"
      max_bundle_timeout="30"
      enable_bundling="true"
      use_send_queues="false"
      sock_conn_timeout="300"

      thread_pool.enabled="true"
      thread_pool.min_threads="1"
      thread_pool.max_threads="10"
      thread_pool.keep_alive_time="5000"
      thread_pool.queue_enabled="false"
      thread_pool.queue_max_size="100"
      thread_pool.rejection_policy="run"

      oob_thread_pool.enabled="true"
      oob_thread_pool.min_threads="1"
      oob_thread_pool.max_threads="8"
      oob_thread_pool.keep_alive_time="5000"
      oob_thread_pool.queue_enabled="false"
      oob_thread_pool.queue_max_size="100"
      oob_thread_pool.rejection_policy="run"/>

   <FILE_PING location="../file.ping.dir"/>
   <MERGE2 max_interval="30000"
      min_interval="10000"/>
   <FD_SOCK/>
   <FD timeout="10000" max_tries="5" />
   <VERIFY_SUSPECT timeout="1500"  />
   <BARRIER />
   <pbcast.NAKACK
      use_mcast_xmit="false"
      retransmit_timeout="300,600,1200,2400,4800"
      discard_delivered_msgs="true"/>
   <UNICAST timeout="300,600,1200" />
   <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
      max_bytes="400000"/>
   <pbcast.GMS print_local_addr="true" join_timeout="3000"
      view_bundling="true"/>
   <FC max_credits="2000000"
      min_threshold="0.10"/>
   <FRAG2 frag_size="60000"  />
   <pbcast.STATE_TRANSFER/>
   <pbcast.FLUSH timeout="0"/>
</config>

As shown, the file content defines a JGroups protocol stack. If you want HornetQ to use this stack for channel creation, make sure that the value of jgroups-file in your broadcast-group/discovery-group configuration is the name of this JGroups configuration file. For example, if the above stack configuration is stored in a file named "jgroups-stacks.xml" then your jgroups-file element should look like this:

<jgroups-file>jgroups-stacks.xml</jgroups-file>

For cluster connections, discovery groups are defined in the server side configuration file hornetq-configuration.xml. All discovery groups must be defined inside a discovery-groups element. There can be many discovery groups defined on a HornetQ server. Let's look at an example:

<discovery-groups>
   <discovery-group name="my-discovery-group">
      <local-bind-address>172.16.9.7</local-bind-address>
      <group-address>231.7.7.7</group-address>
      <group-port>9876</group-port>
      <refresh-timeout>10000</refresh-timeout>
   </discovery-group>
</discovery-groups>

We'll consider each parameter of the discovery group:

Here is another example that defines a JGroups discovery group:

<discovery-groups>
   <discovery-group name="my-broadcast-group">
      <jgroups-file>test-jgroups-file_ping.xml</jgroups-file>
      <jgroups-channel>hornetq_broadcast_channel</jgroups-channel>
      <refresh-timeout>10000</refresh-timeout>
   </discovery-group>
</discovery-groups>

To receive broadcasts from JGroups channels, you must specify two attributes, jgroups-file and jgroups-channel, which name the JGroups configuration file and the JGroups channel in the same way as for a broadcast group.

Let's discuss how to configure a HornetQ client to use discovery to discover a list of servers to which it can connect. The way to do this differs depending on whether you're using JMS or the core API.

If you're using JMS and you're also using the JMS Service on the server to load your JMS connection factory instances into JNDI, then you can specify which discovery group to use for your JMS connection factory in the server side xml configuration hornetq-jms.xml. Let's take a look at an example:

<connection-factory name="ConnectionFactory">
   <discovery-group-ref discovery-group-name="my-discovery-group"/>
   <entries>
      <entry name="ConnectionFactory"/>
   </entries>
</connection-factory>

The element discovery-group-ref specifies the name of a discovery group defined in hornetq-configuration.xml.

When this connection factory is downloaded from JNDI by a client application and JMS connections are created from it, those connections will be load-balanced across the list of servers that the discovery group maintains by listening on the multicast address specified in the discovery group configuration.

If you're using JMS, but you're not using JNDI to look up a connection factory (that is, you're instantiating the JMS connection factory directly), then you can specify the discovery group parameters directly when creating the JMS connection factory. Here's an example:

final String groupAddress = "231.7.7.7";

final int groupPort = 9876;

ConnectionFactory jmsConnectionFactory =
   HornetQJMSClient.createConnectionFactory(new DiscoveryGroupConfiguration(groupAddress, groupPort), JMSFactoryType.CF);

Connection jmsConnection1 = jmsConnectionFactory.createConnection();

Connection jmsConnection2 = jmsConnectionFactory.createConnection();

The refresh-timeout can be set directly on the DiscoveryGroupConfiguration by using the setter method setDiscoveryRefreshTimeout() if you want to change the default value.

There is also a further parameter settable on the DiscoveryGroupConfiguration using the setter method setDiscoveryInitialWaitTimeout(). If the connection factory is used immediately after creation then it may not have had enough time to received broadcasts from all the nodes in the cluster. On first usage, the connection factory will make sure it waits this long since creation before creating the first connection. The default value for this parameter is 10000 milliseconds.

Sometimes it may be impossible to use UDP on the network you are using. In this case it's possible to configure a connection with an initial list of possible servers. This could be just one server that you know will always be available, or a list of servers where at least one will be available.

This doesn't mean that you have to know where all your servers are going to be hosted. You can configure these servers to connect to the reliable servers; once they are connected, their connection details will be propagated via the server they connect to.

A static list of possible servers can also be used by a normal client.

If you're using JMS and you're also using the JMS Service on the server to load your JMS connection factory instances into JNDI, then you can specify which connectors to use for your JMS connection factory in the server side xml configuration hornetq-jms.xml. Let's take a look at an example:

<connection-factory name="ConnectionFactory">
   <connectors>
      <connector-ref connector-name="netty-connector"/>
      <connector-ref connector-name="netty-connector2"/>
      <connector-ref connector-name="netty-connector3"/>
   </connectors>
   <entries>
      <entry name="ConnectionFactory"/>
   </entries>
</connection-factory>

The element connectors contains a list of predefined connectors from the hornetq-configuration.xml file. When this connection factory is downloaded from JNDI by a client application and JMS connections are created from it, those connections will be load-balanced across the list of servers defined by these connectors.

If you're using JMS but not using JNDI to look up a connection factory (that is, you're instantiating the JMS connection factory directly), then you can specify the connector list directly when creating the JMS connection factory. Here's an example:

HashMap<String, Object> map = new HashMap<String, Object>();
map.put("host", "myhost");
map.put("port", "5445");
TransportConfiguration server1 = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
HashMap<String, Object> map2 = new HashMap<String, Object>();
map2.put("host", "myhost2");
map2.put("port", "5446");
TransportConfiguration server2 = new TransportConfiguration(NettyConnectorFactory.class.getName(), map2);

HornetQConnectionFactory cf = HornetQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, server1, server2);

If cluster connections are defined between nodes of a cluster, then HornetQ will load balance messages arriving at a particular node from a client.

Let's take a simple example of a cluster of four nodes A, B, C, and D arranged in a symmetric cluster (described in Section 38.7.1, “Symmetric cluster”). We have a queue called OrderQueue deployed on each node of the cluster.

We have client Ca connected to node A, sending orders to the server. We also have order processor clients Pa, Pb, Pc, and Pd connected to nodes A, B, C, and D respectively. If no cluster connection was defined on node A, then as order messages arrive on node A they will all end up in the OrderQueue on node A, so they will only get consumed by the order processor client attached to node A, Pa.

If we define a cluster connection on node A, then as order messages arrive on node A, instead of all of them going into the local OrderQueue instance, they are distributed in a round-robin fashion between all the nodes of the cluster. The messages are forwarded from the receiving node to other nodes of the cluster. This is all done on the server side; the client maintains a single connection to node A.

For example, messages arriving on node A might be distributed in the following order between the nodes: B, D, C, A, B, D, C, A, B, D. The exact order depends on the order the nodes started up, but the algorithm used is round robin.

HornetQ cluster connections can be configured to always blindly load balance messages in a round robin fashion irrespective of whether there are any matching consumers on other nodes, but they can be a bit cleverer than that and also be configured to only distribute to other nodes if they have matching consumers. We'll look at both these cases in turn with some examples, but first we'll discuss configuring cluster connections in general.
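To make the two modes concrete, here is a minimal sketch of round-robin selection with and without a check for matching consumers. It is a toy model and not the server's implementation: the class name, the use of node names as strings, and the choice to return null when no node qualifies are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only; ClusterRoundRobinSketch is a hypothetical name, not a HornetQ class.
final class ClusterRoundRobinSketch {
    private final List<String> nodes;
    private final boolean forwardWhenNoConsumers;
    private int position = 0;

    ClusterRoundRobinSketch(List<String> nodes, boolean forwardWhenNoConsumers) {
        this.nodes = new ArrayList<>(nodes);
        this.forwardWhenNoConsumers = forwardWhenNoConsumers;
    }

    /**
     * Picks the next target node in round-robin order. When forwardWhenNoConsumers
     * is false, nodes without matching consumers are skipped. Returns null if no
     * node qualifies (what happens to the message then is outside this sketch).
     */
    String next(Set<String> nodesWithConsumers) {
        for (int tried = 0; tried < nodes.size(); tried++) {
            String candidate = nodes.get(position);
            position = (position + 1) % nodes.size();
            if (forwardWhenNoConsumers || nodesWithConsumers.contains(candidate)) {
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Blind round robin over the node order used in the example above.
        ClusterRoundRobinSketch blind =
            new ClusterRoundRobinSketch(Arrays.asList("B", "D", "C", "A"), true);
        Set<String> noConsumers = new HashSet<>();
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            order.append(blind.next(noConsumers)).append(' ');
        }
        System.out.println(order.toString().trim());
    }
}
```

With the flag set to true every node takes its turn regardless of consumers; with it set to false only the nodes in the supplied set are ever chosen.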

Cluster connections group servers into clusters so that messages can be load balanced between the nodes of the cluster. Let's take a look at a typical cluster connection. Cluster connections are always defined in hornetq-configuration.xml inside a cluster-connection element. There can be zero or more cluster connections defined per HornetQ server.

<cluster-connections>
   <cluster-connection name="my-cluster">
      <address>jms</address>
      <retry-interval>500</retry-interval>
      <use-duplicate-detection>true</use-duplicate-detection>
      <forward-when-no-consumers>false</forward-when-no-consumers>
      <max-hops>1</max-hops>
      <discovery-group-ref discovery-group-name="my-discovery-group"/>
   </cluster-connection>
</cluster-connections>

In the above cluster connection all parameters have been explicitly specified. The following shows all the available configuration options

Alternatively if you would like your cluster connections to use a static list of servers for discovery then you can do it like this.

<cluster-connection name="my-cluster">
   <address>jms</address>
   <connector-ref>netty-connector</connector-ref>
   <retry-interval>500</retry-interval>
   <use-duplicate-detection>true</use-duplicate-detection>
   <forward-when-no-consumers>true</forward-when-no-consumers>
   <max-hops>1</max-hops>
   <static-connectors>
      <connector-ref>server0-connector</connector-ref>
      <connector-ref>server1-connector</connector-ref>
   </static-connectors>
</cluster-connection>

Here we have defined two servers, of which we know for sure that at least one will be available. There may be many more servers in the cluster, but these will be discovered via one of these connectors once an initial connection has been made.

With HornetQ client-side load balancing, subsequent sessions created using a single session factory can be connected to different nodes of the cluster. This allows sessions to spread smoothly across the nodes of a cluster and not be "clumped" on any particular node.

The load balancing policy to be used by the client factory is configurable. HornetQ provides four out-of-the-box load balancing policies, and you can also implement your own and use that.

The out-of-the-box policies are

You can also implement your own policy by implementing the interface org.hornetq.api.core.client.loadbalance.ConnectionLoadBalancingPolicy
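As a rough idea of what a custom policy might look like, here is a self-contained sketch. To keep it runnable without the HornetQ jars it declares a local stand-in interface; we assume here that the real ConnectionLoadBalancingPolicy has the same single method, int select(int max), returning an index between 0 and max - 1, so please check the Javadoc for your version. The policy class itself is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Local stand-in for the HornetQ interface so that the sketch compiles on its own.
// Assumption: the real interface exposes the same method, int select(int max).
interface LoadBalancingPolicySketch {
    int select(int max);
}

// Hypothetical policy: always begins with the first server in the list and then
// cycles through the rest in order. Safe to call from several threads.
final class StartAtFirstPolicy implements LoadBalancingPolicySketch {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public int select(int max) {
        // floorMod keeps the result in [0, max) even if the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), max);
    }

    public static void main(String[] args) {
        StartAtFirstPolicy policy = new StartAtFirstPolicy();
        for (int i = 0; i < 4; i++) {
            System.out.println(policy.select(3));
        }
    }
}
```

In a real deployment the class would implement the HornetQ interface instead of the stand-in, and its fully qualified class name is what you would pass as the policy class name.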

How you specify which load balancing policy to use depends on whether you are using JMS or the core API. If you don't specify a policy then the default will be used, which is org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy.

If you're using JMS, and you're using JNDI on the server to put your JMS connection factories into JNDI, then you can specify the load balancing policy directly in the hornetq-jms.xml configuration file on the server as follows:

<connection-factory name="ConnectionFactory">
   <discovery-group-ref discovery-group-name="my-discovery-group"/>
   <entries>
      <entry name="ConnectionFactory"/>
   </entries>
   <connection-load-balancing-policy-class-name>
      org.hornetq.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy
   </connection-load-balancing-policy-class-name>
</connection-factory>

The above example would deploy a JMS connection factory that uses the random connection load balancing policy.

If you're using JMS but you're instantiating your connection factory directly on the client side then you can set the load balancing policy using the setter on the HornetQConnectionFactory before using it:

HornetQConnectionFactory jmsConnectionFactory = HornetQJMSClient.createConnectionFactory(...);
jmsConnectionFactory.setConnectionLoadBalancingPolicyClassName("com.acme.MyLoadBalancingPolicy");

If you're using the core API, you can set the load balancing policy directly on the ServerLocator instance you are using:

ServerLocator locator = HornetQClient.createServerLocatorWithHA(server1, server2);
locator.setConnectionLoadBalancingPolicyClassName("com.acme.MyLoadBalancingPolicy");

The set of servers over which the factory load balances can be determined in one of two ways:

Another important part of clustering is message redistribution. Earlier we learned how server-side message load balancing round robins messages across the cluster. If forward-when-no-consumers is false, then messages won't be forwarded to nodes which don't have matching consumers. This ensures that messages don't arrive on a queue which has no consumers to consume them. However, there is a situation it doesn't solve: what happens if the consumers on a queue close after the messages have been sent to the node? If there are no consumers on the queue, the messages won't get consumed and we have a starvation situation.

This is where message redistribution comes in. With message redistribution HornetQ can be configured to automatically redistribute messages from queues which have no consumers back to other nodes in the cluster which do have matching consumers.

Message redistribution can be configured to kick in immediately after the last consumer on a queue is closed, or to wait a configurable delay after the last consumer on a queue is closed before redistributing. By default message redistribution is disabled.

Message redistribution can be configured on a per-address basis by specifying the redistribution delay in the address settings. For more information on configuring address settings, please see Chapter 25, Queue Attributes.

Here's an address settings snippet from hornetq-configuration.xml showing how message redistribution is enabled for a set of queues:

<address-settings>
   <address-setting match="jms.#">
      <redistribution-delay>0</redistribution-delay>
   </address-setting>
</address-settings>

The above address-settings block would set a redistribution-delay of 0 for any queue which is bound to an address that starts with "jms.". All JMS queues and topic subscriptions are bound to addresses that start with "jms.", so the above would enable instant (no delay) redistribution for all JMS queues and topic subscriptions.

The attribute match can be an exact match or it can be a string that conforms to the HornetQ wildcard syntax (described in Chapter 13, Understanding the HornetQ Wildcard Syntax).

The element redistribution-delay defines the delay in milliseconds after the last consumer is closed on a queue before redistributing messages from that queue to other nodes of the cluster which do have matching consumers. A delay of zero means the messages will be immediately redistributed. A value of -1 signifies that messages will never be redistributed. The default value is -1.

It often makes sense to introduce a delay before redistributing, as it's a common case that a consumer closes but another one is quickly created on the same queue. In such a case you probably don't want to redistribute immediately, since the new consumer will arrive shortly.
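The meaning of the three kinds of redistribution-delay value (-1, 0, and a positive number of milliseconds) can be summarised in a few lines. This is only a sketch of the rule as described above, with made-up names; it is not how the server is implemented internally.

```java
// Sketch of the redistribution-delay rule; the class and method names are hypothetical.
final class RedistributionDelaySketch {

    /**
     * Decides whether messages on a queue should be redistributed now.
     *
     * @param redistributionDelayMillis     configured delay; -1 means never redistribute
     * @param millisSinceLastConsumerClosed time elapsed since the last consumer closed
     * @param queueHasConsumers             true if a consumer is (again) attached
     */
    static boolean shouldRedistribute(long redistributionDelayMillis,
                                      long millisSinceLastConsumerClosed,
                                      boolean queueHasConsumers) {
        if (queueHasConsumers) {
            return false; // a new consumer arrived in time, nothing to move
        }
        if (redistributionDelayMillis < 0) {
            return false; // -1 (the default): redistribution is disabled
        }
        // 0 redistributes immediately; a positive value waits for the delay to pass.
        return millisSinceLastConsumerClosed >= redistributionDelayMillis;
    }

    public static void main(String[] args) {
        System.out.println(shouldRedistribute(0L, 0L, false));
        System.out.println(shouldRedistribute(5000L, 3000L, false));
        System.out.println(shouldRedistribute(-1L, 60000L, false));
    }
}
```

A delay of a few seconds therefore gives a replacement consumer time to attach before any messages are moved away.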

HornetQ clusters can be connected together in many different topologies. Let's consider the two most common ones here.

With a chain cluster, each node in the cluster is not connected to every node in the cluster directly; instead, the nodes form a chain with a node on each end of the chain and all other nodes just connecting to the previous and next nodes in the chain.

An example of this would be a three node chain consisting of nodes A, B and C. Node A is hosted in one network and has many producer clients connected to it sending order messages. Due to corporate policy, the order consumer clients need to be hosted in a different network, and that network is only accessible via a third network. In this setup node B acts as a mediator with no producers or consumers on it. Any messages arriving on node A will be forwarded to node B, which will in turn forward them to node C where they can get consumed. Node A does not need to directly connect to C, but all the nodes can still act as a part of the cluster.

To set up a cluster in this way, node A would define a cluster connection that connects to node B, and node B would define a cluster connection that connects to node C. In this case we only want cluster connections in one direction since we're only moving messages from node A->B->C and never from C->B->A.

For this topology we would set max-hops to 2. With a value of 2, the knowledge of which queues and consumers exist on node C would be propagated from node C to node B to node A. Node A would then know to distribute messages to node B when they arrive: even though node B has no consumers itself, node A would know that a further hop away is node C, which does have consumers.
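The effect of max-hops on the chain A, B, C can be illustrated with a tiny model. This is a sketch under simplifying assumptions (a strictly linear chain, with hop count taken as the distance along it); the names are invented for the example and do not correspond to HornetQ classes.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of max-hops in a linear chain; MaxHopsSketch is a hypothetical name.
final class MaxHopsSketch {

    /** Number of hops between two nodes of the chain, or -1 if either is unknown. */
    static int hops(List<String> chain, String from, String to) {
        int a = chain.indexOf(from);
        int b = chain.indexOf(to);
        return (a < 0 || b < 0) ? -1 : Math.abs(a - b);
    }

    /** True if consumer information on consumerNode reaches observer within maxHops. */
    static boolean learnsAbout(List<String> chain, String observer, String consumerNode, int maxHops) {
        int distance = hops(chain, observer, consumerNode);
        return distance >= 0 && distance <= maxHops;
    }

    public static void main(String[] args) {
        List<String> chain = Arrays.asList("A", "B", "C");
        System.out.println(learnsAbout(chain, "A", "C", 1));
        System.out.println(learnsAbout(chain, "A", "C", 2));
    }
}
```

In this model, with max-hops set to 1 node A only learns about node B, which is why the chain example needs a value of 2.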

We define high availability as the ability for the system to continue functioning after failure of one or more of the servers.

A part of high availability is failover which we define as the ability for client connections to migrate from one server to another in event of server failure so client applications can continue to operate.

HornetQ allows servers to be linked together as live-backup groups, where each live server can have one or more backup servers. A backup server is owned by only one live server. Backup servers are not operational until failover occurs; however, one chosen backup, which will be in passive mode, announces its status and waits to take over the live server's work.

Before failover, only the live server is serving the HornetQ clients, while the backup servers remain passive or wait to become the passive backup. When a live server crashes or is brought down in the correct mode, the backup server currently in passive mode will become live and another backup server will become passive. If a live server restarts after a failover then it will have priority and be the next server to become live when the current live server goes down. If the current live server is configured to allow automatic failback, then it will detect the live server coming back up and automatically stop.

Replication is supported since version 2.3.

When using replication, the live and the backup servers do not share the same data directories; all data synchronization is done through network traffic. Therefore all (persistent) data traffic received by the live server will be duplicated to the backup.

Notice that upon start-up the backup server will first need to synchronize all existing data from the live server, before becoming capable of replacing the live server should it fail. So unlike the shared store case, a replicating backup will not be a fully operational backup right after start, but only after it finishes synchronizing the data. The time it will take for this to happen will depend on the amount of data to be synchronized and the connection speed.

The replicating live and backup pair must be part of a cluster, meaning that even though you may have a single live/backup pair, it is still regarded as a cluster and must have a cluster connection configured on both the live and the backup for the same address. Also, all servers must be in the same cluster and have the same cluster user and password.

There are two ways that a backup server will locate a live server to replicate from, these are:

The backup will search for any live server that it is configured to connect to. It then tries to replicate with each live server in turn until it finds a live server that has no current backup configured. If no live server is available, it will wait until the cluster topology changes and repeat the process.

Much like in the shared-store case, when the live server stops or crashes, its replicating backup will become active and take over its duties. Specifically, the backup will become active when it loses connection to its live server. This can be problematic because it can also happen as a result of a temporary network problem. In order to address this issue, the backup will try to determine whether it can still connect to the other servers in the cluster. If it can connect to more than half the servers, it will become active. If more than half the servers also disappeared with the live, the backup will wait and try reconnecting with the live. This avoids a split brain situation.
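The majority check described above boils down to a simple comparison. The sketch below uses invented names and is not taken from the HornetQ source. Note one assumption: the text does not say what happens when exactly half of the servers are reachable, so the sketch treats that case as "wait".

```java
// Sketch of the backup's majority check; names are hypothetical, not HornetQ classes.
final class BackupQuorumSketch {

    enum Decision { BECOME_ACTIVE, WAIT_AND_RETRY_LIVE }

    /**
     * @param reachableServers  how many of the other cluster servers the backup can reach
     * @param totalOtherServers how many other servers the backup knows about
     */
    static Decision decide(int reachableServers, int totalOtherServers) {
        // Strictly more than half reachable: the live server most likely really failed.
        // Assumption: an exact tie is treated like the "wait" case.
        return (2 * reachableServers > totalOtherServers)
            ? Decision.BECOME_ACTIVE
            : Decision.WAIT_AND_RETRY_LIVE;
    }

    public static void main(String[] args) {
        System.out.println(decide(3, 4));
        System.out.println(decide(1, 4));
    }
}
```

Requiring a strict majority means that a backup cut off from most of the cluster by a network partition stays passive rather than starting a second live server.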

When using a shared store, both live and backup servers share the same entire data directory using a shared file system. This means the paging directory, journal directory, large messages and binding journal.

When failover occurs and a backup server takes over, it will load the persistent storage from the shared file system and clients can connect to it.

This style of high availability differs from data replication in that it requires a shared file system which is accessible by both the live and backup nodes. Typically this will be some kind of high performance Storage Area Network (SAN). We do not recommend you use Network Attached Storage (NAS), e.g. NFS mounts to store any shared journal (NFS is slow).

The advantage of shared-store high availability is that no replication occurs between the live and backup nodes. This means it does not suffer any performance penalties due to the overhead of replication during normal operation.

The disadvantage of shared-store high availability is that it requires a shared file system, and when the backup server activates it needs to load the journal from the shared store, which can take some time depending on the amount of data in the store.

If you require the highest performance during normal operation, have access to a fast SAN, and can live with a slightly slower failover (depending on amount of data), we recommend shared-store high availability.

After a live server has failed and a backup has taken over its duties, you may want to restart the live server and have clients fail back. To do this, simply restart the original live server and kill the new live server. You can do this by killing the process itself or just waiting for the server to crash naturally.

It is also possible to cause failover to occur on normal server shutdown. To enable this, set the following property to true in the hornetq-configuration.xml configuration file:

<failover-on-shutdown>true</failover-on-shutdown>

By default this is set to false. If you have left it set to false but still want to stop the server normally and cause failover, you can do this by using the management API as explained in Section 30.1.1.1, “Core Server Management”.

You can also force the new live server to shut down when the old live server comes back up, allowing the original live server to take over automatically, by setting the following property in the hornetq-configuration.xml configuration file:

<allow-failback>true</allow-failback>

In replication HA mode you need to set an extra property, check-for-live-server, to true in the hornetq-configuration.xml configuration file in order to force the new live server to shut down when the old live server comes back up:

<check-for-live-server>true</check-for-live-server>

HornetQ defines two types of client failover:

HornetQ also provides 100% transparent automatic reattachment of connections to the same server (e.g. in case of transient network problems). This is similar to failover, except it's reconnecting to the same server, and is discussed in Chapter 34, Client Reconnection and Session Reattachment.

During failover, if the client has consumers on any non persistent or temporary queues, those queues will be automatically recreated during failover on the backup node, since the backup node will not have any knowledge of non persistent queues.

HornetQ clients can be configured to receive knowledge of all live and backup servers, so that in event of connection failure at the client - live server connection, the client will detect this and reconnect to the backup server. The backup server will then automatically recreate any sessions and consumers that existed on each connection before failover, thus saving the user from having to hand-code manual reconnection logic.

HornetQ clients detect connection failure when they have not received packets from the server within the time given by client-failure-check-period, as explained in Chapter 17, Detecting Dead Connections. If the client does not receive data in good time, it will assume the connection has failed and attempt failover. Also, if the socket is closed by the OS, usually because the server process is killed rather than the machine itself crashing, then the client will fail over straight away.

HornetQ clients can be configured to discover the list of live-backup server groups in a number of different ways. They can be configured explicitly or, probably the most common way, they can use server discovery to discover the list automatically. For full details on how to configure server discovery, please see Chapter 38, Clusters. Alternatively, the clients can explicitly connect to a specific server and download the current servers and backups; see Chapter 38, Clusters.

To enable automatic client failover, the client must be configured to allow non-zero reconnection attempts (as explained in Chapter 34, Client Reconnection and Session Reattachment).

By default failover will only occur after at least one connection has been made to the live server. In other words, by default, failover will not occur if the client fails to make an initial connection to the live server - in this case it will simply retry connecting to the live server according to the reconnect-attempts property and fail after this number of attempts.

For examples of automatic failover with transacted and non-transacted JMS sessions, please see Section 11.1.67, “Transaction Failover” and Section 11.1.41, “Non-Transaction Failover With Server Data Replication”.

HornetQ does not replicate full server state between live and backup servers. When the new session is automatically recreated on the backup it won't have any knowledge of messages already sent or acknowledged in that session. Any in-flight sends or acknowledgements at the time of failover might also be lost.

By replicating full server state, theoretically we could provide a 100% transparent seamless failover, which would avoid any lost messages or acknowledgements. However, this comes at a great cost: replicating the full server state (including the queues, session, etc.). This would require replication of the entire server state machine; every operation on the live server would have to be replicated on the replica server(s) in the exact same global order to ensure a consistent replica state. This is extremely hard to do in a performant and scalable way, especially when one considers that multiple threads are changing the live server state concurrently.

It is possible to provide full state machine replication using techniques such as virtual synchrony, but this does not scale well and effectively serializes all operations to a single thread, dramatically reducing concurrency.

Other techniques for multi-threaded active replication exist such as replicating lock states or replicating thread scheduling but this is very hard to achieve at a Java level.

Consequently it was decided that it was not worth massively reducing performance and concurrency for the sake of 100% transparent failover. Even without 100% transparent failover, it is simple to guarantee once and only once delivery, even in the case of failure, by using a combination of duplicate detection and retrying of transactions. However this is not 100% transparent to the client code.

If the session is transactional and messages have already been sent or acknowledged in the current transaction, then the server cannot be sure that messages sent or acknowledgements have not been lost during the failover.

Consequently the transaction will be marked as rollback-only, and any subsequent attempt to commit it will throw a javax.jms.TransactionRolledBackException (if using JMS), or a HornetQException with error code HornetQException.TRANSACTION_ROLLED_BACK if using the core API.

It is up to the user to catch the exception, and perform any client side local rollback code as necessary. There is no need to manually rollback the session - it is already rolled back. The user can then just retry the transactional operations again on the same session.

HornetQ ships with a fully functioning example demonstrating how to do this; please see Section 11.1.67, “Transaction Failover”.

If failover occurs when a commit call is being executed, the server, as previously described, will unblock the call to prevent a hang, since no response will come back. In this case it is not easy for the client to determine whether the transaction commit was actually processed on the live server before failure occurred.

Note

If XA is being used, either via JMS or through the core API, then an XAException.XA_RETRY is thrown. This is to inform Transaction Managers that a retry should occur at some point. At some later point in time the Transaction Manager will retry the commit. If the original commit hadn't occurred then the transaction will still exist and be committed. If it doesn't exist then it is assumed to have been committed, although the transaction manager may log a warning.

To remedy this, the client can simply enable duplicate detection (Chapter 37, Duplicate Message Detection) in the transaction, and retry the transaction operations again after the call is unblocked. If the transaction had indeed been committed on the live server successfully before failover, then when the transaction is retried, duplicate detection will ensure that any durable messages resent in the transaction will be ignored on the server to prevent them getting sent more than once.

Note

By catching the rollback exceptions and retrying, catching unblocked calls and enabling duplicate detection, once and only once delivery guarantees for messages can be provided in the case of failure, guaranteeing 100% no loss or duplication of messages.
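To see why retrying plus duplicate detection gives once and only once delivery, here is a small self-contained simulation. Everything in it is a stand-in written for this illustration: the toy server, the exception type, and the method names are not JMS or HornetQ APIs. The toy server remembers the duplicate IDs it has seen, and the client retries with the same ID when the outcome of a commit is unknown.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simulation only; none of these types are JMS or HornetQ classes.
final class OnceOnlyRetrySketch {

    /** Stand-in for "the commit was rolled back or unblocked by failover". */
    static final class CommitOutcomeUnknownException extends Exception {
        private static final long serialVersionUID = 1L;
    }

    /** Toy server that ignores any message whose duplicate ID it has already seen. */
    static final class ToyServer {
        final Set<String> seenIds = new HashSet<>();
        final List<String> queue = new ArrayList<>();
        boolean failNextCommitAfterProcessing;

        void commit(String duplicateId, String body) throws CommitOutcomeUnknownException {
            if (seenIds.add(duplicateId)) {
                queue.add(body); // first time this ID is seen: accept the message
            }
            if (failNextCommitAfterProcessing) {
                // Simulates failover happening after the commit was processed
                // but before the client received the response.
                failNextCommitAfterProcessing = false;
                throw new CommitOutcomeUnknownException();
            }
        }
    }

    /** Retries with the same duplicate ID; returns the number of attempts used. */
    static int sendWithRetry(ToyServer server, String duplicateId, String body, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                server.commit(duplicateId, body);
                return attempt;
            } catch (CommitOutcomeUnknownException e) {
                // Outcome unknown: simply try again with the same duplicate ID.
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        ToyServer server = new ToyServer();
        server.failNextCommitAfterProcessing = true;
        int attempts = sendWithRetry(server, "order-1", "payload", 3);
        System.out.println(attempts + " attempts, " + server.queue.size() + " message(s) stored");
    }
}
```

Even though the client sends the message twice in this run, the second copy is dropped because its duplicate ID is already known, so the queue ends up with a single message.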

HornetQ distributes a native library, used as a bridge between HornetQ and Linux libaio.

libaio is a library developed as part of the Linux kernel project. With libaio we submit writes to the operating system where they are processed asynchronously. Some time later the OS will call our code back when they have been processed.

We use this in our high performance journal if configured to do so; please see Chapter 15, Persistence.

These are the native libraries distributed by HornetQ:

  • libHornetQAIO32.so - x86 32 bits

  • libHornetQAIO64.so - x86 64 bits

When using libaio, HornetQ will always try loading these files as long as they are on the library path.

In the case that you are using Linux on a platform other than x86_32 or x86_64 (for example Itanium 64 bits or IBM Power) you may need to compile the native library, since we do not distribute binaries for those platforms with the release.

The native library uses autoconf, which makes the compilation process easy. However, you need to install some extra packages as a requirement for compilation:

  • gcc - C Compiler

  • gcc-c++ or g++ - Extension to gcc with support for C++

  • autoconf - Tool for automating native build process

  • make - Plain old make

  • automake - Tool for automating make generation

  • libtool - Tool for link editing native libraries

  • libaio - Library giving access to the kernel's asynchronous disk IO functions

  • libaio-dev - Compilation support for libaio

  • A full JDK installed with the environment variable JAVA_HOME set to its location

To perform this installation on RHEL or Fedora, you can simply type this at a command line:

sudo yum install automake libtool autoconf gcc-c++ gcc libaio libaio-devel make

Or on debian systems:

sudo apt-get install automake libtool autoconf g++ gcc libaio1 libaio-dev make

Note

You may find slight variations in the package names depending on the version and Linux distribution (for example gcc-c++ on Fedora versus g++ on Debian systems).

This chapter describes how HornetQ uses and pools threads and how you can manage them.

First we'll discuss how threads are managed and used on the server side, then we'll look at the client side.

Each HornetQ Server maintains a single thread pool for general use, and a scheduled thread pool for scheduled use. A Java scheduled thread pool cannot be configured to use a standard thread pool, otherwise we could use a single thread pool for both scheduled and non scheduled activity.

When using old (blocking) IO, a separate thread pool is also used to service connections. Since old IO requires a thread per connection it does not make sense to get them from the standard pool as the pool will easily get exhausted if too many connections are made, resulting in the server "hanging" since it has no remaining threads to do anything else. If you require the server to handle many concurrent connections you should make sure you use NIO, not old IO.

When using new IO (NIO), HornetQ will, by default, use a number of threads equal to three times the number of cores (or hyper-threads) as reported by Runtime.getRuntime().availableProcessors() for processing incoming packets. If you want to override this value, you can set the number of threads by specifying the parameter nio-remoting-threads in the transport configuration. See Chapter 16, Configuring the Transport for more information on this.
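The default just described is easy to work out for your own machine. The sketch below is illustrative only, with invented names; in particular, treating any non-positive configured value as "not set" is an assumption made for the example rather than a statement about the transport's actual handling.

```java
// Sketch of the default NIO thread count; NioThreadCountSketch is a hypothetical name.
final class NioThreadCountSketch {

    /**
     * Returns the number of threads to use for processing incoming packets.
     * Assumption: a non-positive configured value means "use the default".
     */
    static int resolveThreads(int configuredNioRemotingThreads, int availableProcessors) {
        return configuredNioRemotingThreads > 0
            ? configuredNioRemotingThreads
            : 3 * availableProcessors;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores=" + cores + ", default threads=" + resolveThreads(-1, cores));
    }
}
```

On a machine reporting 4 processors, for instance, the default works out to 12 threads unless nio-remoting-threads is set explicitly.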

There are also a small number of other places where threads are used directly; we'll discuss each in turn.

On the client side, HornetQ maintains a single static scheduled thread pool and a single static general thread pool for use by all clients using the same classloader in that JVM instance.

The static scheduled thread pool has a maximum size of 5 threads, and the general purpose thread pool has an unbounded maximum size.

If required HornetQ can also be configured so that each ClientSessionFactory instance does not use these static pools but instead maintains its own scheduled and general purpose pool. Any sessions created from that ClientSessionFactory will use those pools instead.

To configure a ClientSessionFactory instance to use its own pools, simply use the appropriate setter methods immediately after creation, for example:

ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(...);
ClientSessionFactory myFactory = locator.createClientSessionFactory();
myFactory.setUseGlobalPools(false);
myFactory.setScheduledThreadPoolMaxSize(10);
myFactory.setThreadPoolMaxSize(-1);   

If you're using the JMS API, you can set the same parameters on the ClientSessionFactory and use it to create the ConnectionFactory instance, for example:

ConnectionFactory myConnectionFactory = HornetQJMSClient.createConnectionFactory(myFactory);

If you're using JNDI to instantiate HornetQConnectionFactory instances, you can also set these parameters in the hornetq-jms.xml file where you describe your connection factory, for example:

<connection-factory name="ConnectionFactory">
   <connectors>
      <connector-ref connector-name="netty"/>
   </connectors>
   <entries>
      <entry name="ConnectionFactory"/>
      <entry name="XAConnectionFactory"/>
   </entries>
   <use-global-pools>false</use-global-pools>
   <scheduled-thread-pool-max-size>10</scheduled-thread-pool-max-size>
   <thread-pool-max-size>-1</thread-pool-max-size>
</connection-factory>

HornetQ uses the JBoss Logging framework to do its logging and is configurable via the logging.properties file found in the configuration directories. This is configured by default to log to both the console and to a file.

There are 6 loggers available which are as follows:


Firstly, if you want to enable logging on the client side you need to include the JBoss Logging jars on your classpath. If you are using the distribution, make sure jnp-client.jar is included, or if you are using Maven, add the following dependencies:

<dependency>
   <groupId>org.jboss.naming</groupId>
   <artifactId>jnp-client</artifactId>
   <version>5.0.5.Final</version>
   <exclusions>
      <exclusion>
         <groupId>org.jboss.logging</groupId>
         <artifactId>jboss-logging-spi</artifactId>
      </exclusion>
   </exclusions>
</dependency>
<dependency>
   <groupId>org.jboss.logmanager</groupId>
   <artifactId>jboss-logmanager</artifactId>
   <version>1.3.1.Final</version>
</dependency>
<dependency>
   <groupId>org.hornetq</groupId>
   <artifactId>hornetq-core-client</artifactId>
   <version>2.3.0.Final</version>
</dependency>

The first dependency, jnp-client, is not actually needed for logging. It is needed for JNDI, however, and it pulls in an older version of JBoss Logging, which needs to be excluded.

There are 2 properties you need to set when starting your Java program. The first sets the Log Manager to the JBoss Log Manager via the -Djava.util.logging.manager property, i.e. -Djava.util.logging.manager=org.jboss.logmanager.LogManager.

The second sets the location of the logging.properties file to use via the -Dlogging.configuration property, for instance -Dlogging.configuration=file:///home/user/projects/myProject/logging.properties.
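If the client is launched from another Java program, the two options can be assembled as in the sketch below. The class and method names are hypothetical; only the two property names and the LogManager class name come from the text above.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical helper: assembles the two -D options described above.
class ClientLoggingArgs {

    static List<String> jvmArgs(Path loggingProperties) {
        return List.of(
            // Make java.util.logging delegate to the JBoss Log Manager.
            "-Djava.util.logging.manager=org.jboss.logmanager.LogManager",
            // Path.toUri() produces a file: URI for paths on the default file system.
            "-Dlogging.configuration=" + loggingProperties.toAbsolutePath().toUri());
    }

    public static void main(String[] args) {
        // Print the options so they can be pasted into a launch script.
        jvmArgs(Paths.get("logging.properties")).forEach(System.out::println);
    }
}
```

The returned list could be passed to a ProcessBuilder ahead of the main class name, or simply printed and copied into a start script.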

The following is a typical logging.properties for a client:

# Root logger option
loggers=org.jboss.logging,org.hornetq.core.server,org.hornetq.utils,org.hornetq.journal,org.hornetq.jms,org.hornetq.ra

# Root logger level
logger.level=INFO
# HornetQ logger levels
logger.org.hornetq.core.server.level=INFO
logger.org.hornetq.utils.level=INFO
logger.org.hornetq.jms.level=DEBUG

# Root logger handlers
logger.handlers=FILE,CONSOLE

# Console handler configuration
handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler
handler.CONSOLE.properties=autoFlush
handler.CONSOLE.level=FINE
handler.CONSOLE.autoFlush=true
handler.CONSOLE.formatter=PATTERN

# File handler configuration
handler.FILE=org.jboss.logmanager.handlers.FileHandler
handler.FILE.level=FINE
handler.FILE.properties=autoFlush,fileName
handler.FILE.autoFlush=true
handler.FILE.fileName=hornetq.log
handler.FILE.formatter=PATTERN

# Formatter pattern configuration
formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
formatter.PATTERN.properties=pattern
formatter.PATTERN.pattern=%d{HH:mm:ss,SSS} %-5p [%c] %s%E%n

The HornetQ REST interface allows you to leverage the reliability and scalability features of HornetQ over a simple REST/HTTP interface. Messages are produced and consumed by sending and receiving simple HTTP messages that contain the content you want to push around. For instance, here's a simple example of posting an order to an order processing queue, expressed as an HTTP message:

POST /queue/orders/create HTTP/1.1
Host: example.com
Content-Type: application/xml

<order>
   <name>Bill</name>
   <item>iPhone 4</item>
   <cost>$199.99</cost>
</order>

As you can see, we're just posting some arbitrary XML document to a URL. When the XML is received on the server, it is processed within HornetQ as a JMS message and distributed through core HornetQ. Simple and easy. Consuming messages from a queue or topic looks very similar. We'll discuss the entire interface in detail later in this document.

HornetQ's REST interface is installed as a Web archive (WAR). It depends on the RESTEasy project and can currently only run within a servlet container. Installing the HornetQ REST interface differs a little depending on whether HornetQ is already installed and configured for your environment (e.g. you're deploying within JBoss AS 7) or you want the HornetQ REST WAR to start up and manage the HornetQ server (e.g. you're deploying within something like Apache Tomcat).

This section should be used when you want to use the HornetQ REST interface in an environment that already has HornetQ installed and running, e.g. JBoss AS 7. You must create a Web archive (.WAR) file with the following web.xml settings:

<web-app>
   <listener>
      <listener-class>
         org.jboss.resteasy.plugins.server.servlet.ResteasyBootstrap
      </listener-class>
   </listener>

   <listener>
      <listener-class>
         org.hornetq.rest.integration.RestMessagingBootstrapListener
      </listener-class>
   </listener>

   <filter>
      <filter-name>Rest-Messaging</filter-name>
      <filter-class>
         org.jboss.resteasy.plugins.server.servlet.FilterDispatcher
      </filter-class>
   </filter>

   <filter-mapping>
      <filter-name>Rest-Messaging</filter-name>
      <url-pattern>/*</url-pattern>
   </filter-mapping>
</web-app>

Within your WEB-INF/lib directory you must have the hornetq-rest.jar file. If RESTEasy is not installed within your environment, you must add the RESTEasy jar files within the lib directory as well. Here's a sample Maven pom.xml that can build your WAR for this case.

<project xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

   <modelVersion>4.0.0</modelVersion>
   <groupId>org.somebody</groupId>
   <artifactId>myapp</artifactId>
   <packaging>war</packaging>
   <name>My App</name>
   <version>0.1-SNAPSHOT</version>
   <repositories>
      <repository>
         <id>jboss</id>
         <url>http://repository.jboss.org/nexus/content/groups/public/</url>
      </repository>
   </repositories>

   <build>
      <plugins>
         <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
               <source>1.6</source>
               <target>1.6</target>
            </configuration>
         </plugin>
      </plugins>
   </build>

   <dependencies>
      <dependency>
         <groupId>org.hornetq.rest</groupId>
         <artifactId>hornetq-rest</artifactId>
         <version>2.3.0-SNAPSHOT</version>
      </dependency>
   </dependencies>
</project>

You can bootstrap HornetQ within your WAR as well. To do this, you must have the HornetQ core and JMS jars along with the Netty, RESTEasy, and HornetQ REST jars within your WEB-INF/lib. You must also have the hornetq-configuration.xml, hornetq-jms.xml, and hornetq-users.xml config files within WEB-INF/classes. The examples that come with the HornetQ REST distribution show how to do this. You must also add an additional listener to your web.xml file. Here's an example:

<web-app>
   <listener>
      <listener-class>
         org.jboss.resteasy.plugins.server.servlet.ResteasyBootstrap
      </listener-class>
   </listener>

   <listener>
      <listener-class>
         org.hornetq.rest.integration.HornetqBootstrapListener
      </listener-class>
   </listener>

   <listener>
      <listener-class>
         org.hornetq.rest.integration.RestMessagingBootstrapListener
      </listener-class>
   </listener>

   <filter>
      <filter-name>Rest-Messaging</filter-name>
      <filter-class>
         org.jboss.resteasy.plugins.server.servlet.FilterDispatcher
      </filter-class>
   </filter>

   <filter-mapping>
      <filter-name>Rest-Messaging</filter-name>
      <url-pattern>/*</url-pattern>
   </filter-mapping>
</web-app>

Here's a Maven pom.xml file for creating a WAR for this environment. Make sure your HornetQ configuration files are within the src/main/resources directory so that they end up in the WAR's WEB-INF/classes directory!

<project xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

   <modelVersion>4.0.0</modelVersion>
   <groupId>org.somebody</groupId>
   <artifactId>myapp</artifactId>
   <packaging>war</packaging>
   <name>My App</name>
   <version>0.1-SNAPSHOT</version>
   <repositories>
      <repository>
         <id>jboss</id>
         <url>http://repository.jboss.org/nexus/content/groups/public/</url>
      </repository>
   </repositories>
   <build>
      <plugins>
         <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
               <source>1.6</source>
               <target>1.6</target>
            </configuration>
         </plugin>
      </plugins>
   </build>
   <dependencies>
      <dependency>
         <groupId>org.hornetq</groupId>
         <artifactId>hornetq-core</artifactId>
         <version>2.3.0-SNAPSHOT</version>
      </dependency>
      <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty</artifactId>
         <version>3.4.5.Final</version>
      </dependency>
      <dependency>
         <groupId>org.hornetq</groupId>
         <artifactId>hornetq-jms</artifactId>
         <version>2.3.0-SNAPSHOT</version>
      </dependency>
      <dependency>
         <groupId>org.jboss.spec.javax.jms</groupId>
         <artifactId>jboss-jms-api_1.1_spec</artifactId>
         <version>1.0.0.Final</version>
      </dependency>
      <dependency>
         <groupId>org.hornetq.rest</groupId>
         <artifactId>hornetq-rest</artifactId>
         <version>2.3.0-SNAPSHOT</version>
      </dependency>
      <dependency>
         <groupId>org.jboss.resteasy</groupId>
         <artifactId>resteasy-jaxrs</artifactId>
         <version>2.3.4.Final</version>
      </dependency>
      <dependency>
         <groupId>org.jboss.resteasy</groupId>
         <artifactId>resteasy-jaxb-provider</artifactId>
         <version>2.3.4.Final</version>
      </dependency>
   </dependencies>
</project>

The HornetQ REST implementation has some configuration options. These are configured via an XML configuration file that must be in your WEB-INF/classes directory. You must set the web.xml context-param rest.messaging.config.file to specify the name of the configuration file. Below is the format of the XML configuration file, shown with the default value of each option.

<rest-messaging>
   <server-in-vm-id>0</server-in-vm-id>
   <use-link-headers>false</use-link-headers>
   <default-durable-send>false</default-durable-send>
   <dups-ok>true</dups-ok>
   <topic-push-store-dir>topic-push-store</topic-push-store-dir>
   <queue-push-store-dir>queue-push-store</queue-push-store-dir>
   <producer-time-to-live>0</producer-time-to-live>
   <producer-session-pool-size>10</producer-session-pool-size>
   <session-timeout-task-interval>1</session-timeout-task-interval>
   <consumer-session-timeout-seconds>300</consumer-session-timeout-seconds>
   <consumer-window-size>-1</consumer-window-size>
</rest-messaging>

Let's give an explanation of each config option.

The HornetQ REST interface publishes a variety of REST resources to perform various tasks on a queue or topic. Only the top-level queue and topic URI schemes are published to the outside world. You must discover all other resources to interact with by looking for and traversing links. You'll find published links within custom response headers and embedded in published XML representations. Let's look at how this works.

To interact with a queue or topic you do a HEAD or GET request on the following relative URI pattern:

/queues/{name}
/topics/{name}

The base of the URI is the base URL of the WAR in which you deployed the HornetQ REST server, as defined in the Installation and Configuration section of this document. Replace the {name} string in the above URI pattern with the name of the queue or topic you want to interact with. For example, if you have configured a JMS topic named "foo" in your hornetq-jms.xml file, the URI name should be "jms.topic.foo". If you have configured a JMS queue named "bar" in your hornetq-jms.xml file, the URI name should be "jms.queue.bar". Internally, HornetQ prepends the "jms.topic" or "jms.queue" strings to the name of the deployed destination. Next, perform your HEAD or GET request on this URI. Here's what a request/response would look like.

HEAD /queues/jms.queue.bar HTTP/1.1
Host: example.com

--- Response ---
HTTP/1.1 200 Ok
msg-create: http://example.com/queues/jms.queue.bar/create
msg-pull-consumers: http://example.com/queues/jms.queue.bar/pull-consumers
msg-push-consumers: http://example.com/queues/jms.queue.bar/push-consumers
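The naming rule described above can be sketched as a small helper. The class and method names are hypothetical, and the base URL stands for wherever your WAR is deployed; only the /queues/ and /topics/ patterns and the jms.queue. and jms.topic. prefixes come from the text.

```java
// Hypothetical helper: builds the top-level resource URI for a JMS destination
// deployed in hornetq-jms.xml, following the naming rule described above.
class RestDestinations {

    static String queueUri(String baseUrl, String jmsQueueName) {
        return stripTrailingSlash(baseUrl) + "/queues/jms.queue." + jmsQueueName;
    }

    static String topicUri(String baseUrl, String jmsTopicName) {
        return stripTrailingSlash(baseUrl) + "/topics/jms.topic." + jmsTopicName;
    }

    // Avoids a double slash when the base URL already ends with one.
    private static String stripTrailingSlash(String s) {
        return s.endsWith("/") ? s.substring(0, s.length() - 1) : s;
    }

    public static void main(String[] args) {
        System.out.println(queueUri("http://example.com", "bar"));
        System.out.println(topicUri("http://example.com/", "foo"));
    }
}
```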

The HEAD or GET response contains a number of custom response headers that are URLs to additional REST resources that allow you to interact with the queue or topic in different ways. It is important not to rely on the scheme of the URLs returned within these headers as they are an implementation detail. Treat them as opaque and query for them each and every time you initially interact (at boot time) with the server. If you treat all URLs as opaque then you will be isolated from implementation changes as the HornetQ REST interface evolves over time.
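One way to follow this advice from a Java client is to read the link headers once at start-up and keep them in a map, rather than hard-coding any of the URLs. The sketch below assumes Java 11 or later and uses the JDK's java.net.http client; the class name and the choice to collect every header starting with "msg-" are illustrative assumptions, not part of the HornetQ REST API.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpHeaders;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: discovers the link headers of a queue or topic resource.
class LinkDiscovery {

    // Collects the first value of every header whose name starts with "msg-".
    static Map<String, String> links(HttpHeaders headers) {
        Map<String, String> links = new TreeMap<>();
        headers.map().forEach((name, values) -> {
            String key = name.toLowerCase(Locale.ROOT);
            if (key.startsWith("msg-") && !values.isEmpty()) {
                links.put(key, values.get(0));
            }
        });
        return links;
    }

    // Sends a HEAD request to the destination resource and returns its links.
    static Map<String, String> discover(HttpClient client, URI destination)
            throws IOException, InterruptedException {
        HttpRequest head = HttpRequest.newBuilder(destination)
                .method("HEAD", HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<Void> response =
                client.send(head, HttpResponse.BodyHandlers.discarding());
        return links(response.headers());
    }
}
```

A client would then look up, say, links.get("msg-create") and treat the value as an opaque URL.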

Below is a list of response headers you should expect when interacting with a Queue resource.

Below is a list of response headers you should expect when interacting with a Topic resource.

This chapter discusses the protocol for posting messages to a queue or a topic. In HornetQ REST Interface Basics, you saw that a queue or topic resource publishes various custom headers that are links to other RESTful resources. The msg-create header is the URL you post messages to. Messages are published to a queue or topic by sending a simple HTTP message to the URL published by the msg-create header. The HTTP message contains whatever content you want to publish to the HornetQ destination. Here's an example scenario:

  1. Obtain the starting msg-create header from the queue or topic resource.

    HEAD /queues/jms.queue.bar HTTP/1.1
    Host: example.com
    
    --- Response ---
    HTTP/1.1 200 Ok
    msg-create: http://example.com/queues/jms.queue.bar/create
    msg-create-with-id: http://example.com/queues/jms.queue.bar/create/{id}

  2. Do a POST to the URL contained in the msg-create header.

    POST /queues/jms.queue.bar/create
    Host: example.com
    Content-Type: application/xml
    
    <order>
       <name>Bill</name>
       <item>iPhone4</item>
       <cost>$199.99</cost>
    </order>
    
    --- Response ---
    HTTP/1.1 201 Created
    msg-create-next: http://example.com/queues/jms.queue.bar/create/002

    A successful response will return a 201 response code. Also notice that a msg-create-next response header is sent as well. You must use this URL to POST your next message.

  3. POST your next message to the queue using the URL returned in the msg-create-next header.

    POST /queues/jms.queue.bar/create/002
    Host: example.com
    Content-Type: application/xml
    
    <order>
       <name>Monica</name>
       <item>iPad</item>
       <cost>$499.99</cost>
    </order>
    
    --- Response ---
    HTTP/1.1 201 Created
    msg-create-next: http://example.com/queues/jms.queue.bar/create/003

    Continue using the new msg-create-next header returned with each response.

Warning

It is VERY IMPORTANT that you never re-use returned msg-create-next headers to post new messages. This URL may be uniquely generated for each message and used for duplicate detection. If you lose the URL within the msg-create-next header, then just go back to the queue or topic resource to get the msg-create URL.
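A producer can keep track of which URL to use with very little state. The sketch below is one possible way to do it; the class and its method names are hypothetical. Each msg-create-next value replaces the previous one, and if none is known the tracker falls back to the msg-create URL.

```java
// Hypothetical helper: remembers which URL the next new message should be POSTed to.
class CreateLinkTracker {

    private final String msgCreateUrl; // from the destination's msg-create header
    private String nextUrl;            // latest msg-create-next value, or null if unknown

    CreateLinkTracker(String msgCreateUrl) {
        this.msgCreateUrl = msgCreateUrl;
    }

    // The latest msg-create-next URL if one is known, otherwise the msg-create URL.
    String urlForNextMessage() {
        return nextUrl != null ? nextUrl : msgCreateUrl;
    }

    // Call after each 201 Created response with the value of its msg-create-next
    // header; passing null (header lost) resets the tracker to msg-create.
    void onCreated(String msgCreateNextHeader) {
        nextUrl = msgCreateNextHeader;
    }
}
```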

Sometimes you might have network problems when posting new messages to a queue or topic. You may do a POST and never receive a response. Unfortunately, you don't know whether or not the server received the message, so re-posting the message might cause duplicates to be posted to the queue or topic. By default, the HornetQ REST interface is configured to accept and post duplicate messages. You can turn on duplicate message detection by setting the dups-ok config option to false as described in HornetQ REST Interface Basics. When you do this, the initial POST to the msg-create URL will redirect you, using the standard HTTP 307 redirection mechanism, to a unique URL to POST to. All other interactions remain the same as discussed earlier. Here's an example:

  1. Obtain the starting msg-create header from the queue or topic resource.

    HEAD /queues/jms.queue.bar HTTP/1.1
    Host: example.com
    
    --- Response ---
    HTTP/1.1 200 Ok
    msg-create: http://example.com/queues/jms.queue.bar/create
    msg-create-with-id: http://example.com/queues/jms.queue.bar/create/{id}

  2. Do a POST to the URL contained in the msg-create header.

    POST /queues/jms.queue.bar/create
    Host: example.com
    Content-Type: application/xml
    
    <order>
       <name>Bill</name>
       <item>iPhone4</item>
       <cost>$199.99</cost>
    </order>
    
    --- Response ---
    HTTP/1.1 307 Redirect
    Location: http://example.com/queues/jms.queue.bar/create/001

    A successful response will return a 307 response code. This is standard HTTP protocol. It is telling you that you must re-POST to the URL contained within the Location header.

  3. re-POST your message to the URL provided within the Location header.

    POST /queues/jms.queue.bar/create/001
    Host: example.com
    Content-Type: application/xml
    
    <order>
       <name>Bill</name>
       <item>iPhone4</item>
       <cost>$199.99</cost>
    </order>
    
    --- Response ---
    HTTP/1.1 201 Created
    msg-create-next: http://example.com/queues/jms.queue.bar/create/002

    You should receive a 201 Created response. If there is a network failure, just re-POST to the URL from the Location header. For new messages, use the msg-create-next header returned with each response.

  4. POST any new message to the returned msg-create-next header.

    POST /queues/jms.queue.bar/create/002
    Host: example.com
    Content-Type: application/xml
    
    <order>
       <name>Monica</name>
       <item>iPad</item>
       <cost>$499.99</cost>
    </order>
    
    --- Response ---
    HTTP/1.1 201 Created
    msg-create-next: http://example.com/queues/jms.queue.bar/create/003

    If there ever is a network problem, just repost to the URL provided in the msg-create-next header.
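The steps above can be sketched as a small retry loop. This is an illustration under assumptions rather than a reference client: Transport and Reply are hypothetical types that stand in for a real HTTP client so the flow can be exercised without a server, and the loop presumes dups-ok is set to false, since blind re-POSTs could otherwise create duplicates.

```java
import java.io.IOException;

// Hypothetical sketch of the POST, 307, re-POST flow with retries on network failure.
class ReliablePoster {

    // Minimal view of a response: status code plus the two headers this flow needs.
    static final class Reply {
        final int status;
        final String location;   // Location header, or null
        final String createNext; // msg-create-next header, or null

        Reply(int status, String location, String createNext) {
            this.status = status;
            this.location = location;
            this.createNext = createNext;
        }
    }

    // Stand-in for a real HTTP client.
    interface Transport {
        Reply post(String url, String body) throws IOException;
    }

    // Posts the body and returns the msg-create-next URL for the next new message.
    static String post(Transport http, String msgCreateUrl, String body, int maxAttempts) {
        String target = msgCreateUrl;
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Reply reply;
            try {
                reply = http.post(target, body);
            } catch (IOException e) {
                lastFailure = e; // outcome unknown: re-POST to the same URL
                continue;
            }
            if (reply.status == 307 && reply.location != null) {
                target = reply.location; // unique URL handed out by the server
            } else if (reply.status == 201) {
                return reply.createNext;
            } else {
                throw new IllegalStateException("Unexpected status " + reply.status);
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts", lastFailure);
    }
}
```

Note that the redirect itself counts as one attempt in this sketch, so maxAttempts should be at least 2.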

How can this work? As you can see, with each successful response, the HornetQ REST server returns a uniquely generated URL within the msg-create-next header. This URL is dedicated to the next new message you want to post. Behind the scenes, the code extracts an identity from the URL and uses HornetQ's duplicate detection mechanism by setting the DUPLICATE_DETECTION_ID property of the JMS message that is actually posted to the system.

An alternative to this approach is to use the msg-create-with-id header. This is not an invokable URL, but a URL template. The idea is that the client provides the DUPLICATE_DETECTION_ID and creates its own create-next URL. The msg-create-with-id header looks like this (you've seen it in previous examples, but we haven't used it):

msg-create-with-id: http://example.com/queues/jms.queue.bar/create/{id}

You see that it is a regular URL appended with a {id}. This {id} is a pattern matching substring. A client would generate its DUPLICATE_DETECTION_ID and replace {id} with that generated id, then POST to the new URL. The URL the client creates works exactly like a create-next URL described earlier. The response of this POST would also return a new msg-create-next header. The client can continue to generate its own DUPLICATE_DETECTION_ID, or use the new URL returned via the msg-create-next header.

The advantage of this approach is that the client does not have to repost the message. It also only has to come up with a unique DUPLICATE_DETECTION_ID once.
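Expanding the template is a plain string substitution. The sketch below is a minimal illustration; the class name is hypothetical, and using a random UUID as the DUPLICATE_DETECTION_ID is an assumption (any value that is unique per message and safe to place in a URL path would do).

```java
import java.util.UUID;

// Hypothetical helper: turns the msg-create-with-id template into a concrete URL.
class CreateWithId {

    // Replaces the {id} placeholder with a client-chosen duplicate detection ID.
    static String expand(String template, String duplicateDetectionId) {
        if (!template.contains("{id}")) {
            throw new IllegalArgumentException("No {id} placeholder in: " + template);
        }
        return template.replace("{id}", duplicateDetectionId);
    }

    public static void main(String[] args) {
        String template = "http://example.com/queues/jms.queue.bar/create/{id}";
        // A UUID contains only hex digits and hyphens, so it needs no URL encoding.
        System.out.println(expand(template, UUID.randomUUID().toString()));
    }
}
```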

There are two different ways to consume messages from a topic or queue. You can wait and have the messaging server push them to you, or you can continuously poll the server yourself to see if messages are available. This chapter discusses the latter. Consuming messages via a pull works almost identically for queues and topics with some minor, but important caveats. To start consuming you must create a consumer resource on the server that is dedicated to your client. Now, this pretty much breaks the stateless principle of REST, but after much prototyping, this is the best way to work most effectively with HornetQ through a REST interface.

You create consumer resources by doing a simple POST to the URL published by the msg-pull-consumers response header if you're interacting with a queue, or the msg-pull-subscribers response header if you're interacting with a topic. These headers are provided by the main queue or topic resource discussed in HornetQ REST Interface Basics. Doing an empty POST to one of these URLs will create a consumer resource that follows an auto-acknowledge protocol and, if you're interacting with a topic, creates a temporary subscription to the topic. If you want to use the acknowledgement protocol and/or create a durable subscription (topics only), then you must use the form parameters (application/x-www-form-urlencoded) described below.

  • autoAck. A value of true or false can be given. This defaults to true if you do not pass this parameter.

  • durable. A value of true or false can be given. This defaults to false if you do not pass this parameter. Only available on topics. This specifies whether you want a durable subscription or not. A durable subscription persists through server restart.

  • name. This is the name of the durable subscription. If you do not provide this parameter, the name will be automatically generated by the server. Only usable on topics.

  • selector. This is an optional JMS selector string. The HornetQ REST interface adds HTTP headers to the JMS message for REST produced messages. HTTP headers are prefixed with "http_" and every '-' character is converted to a '$'.

  • idle-timeout. For a topic subscription, idle time in milliseconds in which the consumer connections will be closed if idle.

  • delete-when-idle. Boolean value. If true, a topic subscription will be deleted (even if it is durable) when the idle timeout is reached.
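The form body for such a POST can be built with the JDK alone. The sketch below is illustrative: the class and method names are hypothetical, and selectorName only applies the two rules stated above (the "http_" prefix and the '-' to '$' replacement). The text does not say how letter case is handled, so the sketch leaves it unchanged.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for building the consumer-creation form body.
class ConsumerForm {

    // Encodes parameters as application/x-www-form-urlencoded, preserving map order.
    static String encode(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
                        + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }

    // Maps an HTTP header name to the property name usable in a selector.
    static String selectorName(String httpHeader) {
        return "http_" + httpHeader.replace('-', '$');
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("autoAck", "false");
        params.put("durable", "true");
        params.put("name", "orders sub");
        System.out.println(encode(params));
        System.out.println(selectorName("Content-Type"));
    }
}
```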

This section focuses on the auto-acknowledge protocol for consuming messages via a pull. Here's a list of the response headers and URLs you'll be interested in.

Here is an example of creating an auto-acknowledged queue pull consumer.

Creating an auto-acknowledged consumer for a topic is pretty much the same. Here's an example of creating a durable auto-acknowledged topic pull subscription.

After you have created a consumer resource, you are ready to start pulling messages from the server. Notice that when you created the consumer for either the queue or topic, the response contained a msg-consume-next response header. POST to the URL contained within this header to consume the next message in the queue or topic subscription. A successful POST causes the server to extract a message from the queue or topic subscription, acknowledge it, and return it to the consuming client. If there are no messages in the queue or topic subscription, a 503 (Service Unavailable) HTTP code is returned.

Here's an example of pulling multiple messages from the consumer resource.

  1. Do a POST on the msg-consume-next URL that was returned with the consumer or subscription resource discussed earlier.

    POST /queues/jms.queue.bar/pull-consumers/consume-next-1
    Host: example.com
    
    --- Response ---
    HTTP/1.1 200 Ok
    Content-Type: application/xml
    msg-consume-next: http://example.com/queues/jms.queue.bar/pull-consumers/333/consume-next-2
    msg-consumer: http://example.com/queues/jms.queue.bar/pull-consumers/333
    
    <order>...</order>

    The POST returns the message consumed from the queue. It also returns a new msg-consume-next link. Use this new link to get the next message. Notice also a msg-consumer response header is returned. This is a URL that points back to the consumer or subscription resource. You will need that to clean up your connection after you are finished using the queue or topic.

  2. POST to the URL within the msg-consume-next header returned in step 1 to request the next message.

    POST /queues/jms.queue.bar/pull-consumers/consume-next-2
    Host: example.com
    
    --- Response ---
    HTTP/1.1 503 Service Unavailable
    Retry-After: 5
    msg-consume-next: http://example.com/queues/jms.queue.bar/pull-consumers/333/consume-next-2

    In this case, there are no messages in the queue, so we get a 503 response back. As per the HTTP 1.1 spec, a 503 response may return a Retry-After header specifying the time in seconds after which you should retry the POST. Also notice that another new msg-consume-next URL is present. Although it is probably the same URL you used in the last POST, get in the habit of using URLs returned in response headers, as future versions of HornetQ REST might redirect you or add additional data to the URL after timeouts like this.

  3. POST to the URL within the last msg-consume-next to get the next message.

    POST /queues/jms.queue.bar/pull-consumers/consume-next-2
    Host: example.com
    
    --- Response ---
    HTTP/1.1 200 Ok
    Content-Type: application/xml
    msg-consume-next: http://example.com/queues/jms.queue.bar/pull-consumers/333/consume-next-3
    
    <order>...</order>
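A polling client has to decide how long to wait after each response. The sketch below shows one way to make that decision from the status code and the Retry-After header; the class name and the one-second fallback are assumptions made for illustration, not values taken from HornetQ.

```java
// Hypothetical helper: decides how long a pull consumer should wait before its next POST.
class PullPolicy {

    // Assumed fallback when a 503 carries no usable Retry-After value.
    static final int DEFAULT_RETRY_SECONDS = 1;

    // Returns 0 when a message was delivered (200), or the delay to honour after a 503.
    static int waitSeconds(int status, String retryAfter) {
        if (status == 200) {
            return 0;
        }
        if (status == 503) {
            if (retryAfter == null) {
                return DEFAULT_RETRY_SECONDS;
            }
            try {
                return Math.max(0, Integer.parseInt(retryAfter.trim()));
            } catch (NumberFormatException e) {
                // HTTP also allows Retry-After to be a date; this sketch does not parse it.
                return DEFAULT_RETRY_SECONDS;
            }
        }
        throw new IllegalStateException("Unexpected status " + status);
    }

    public static void main(String[] args) {
        System.out.println(waitSeconds(503, "5"));
        System.out.println(waitSeconds(200, null));
    }
}
```

After waiting, the client POSTs to the msg-consume-next URL from the most recent response, whether that response was a 200 or a 503.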

The manual acknowledgement protocol is similar to the auto-ack protocol except there is an additional round trip to the server to tell it that you have received the message and that the server can internally ack the message. Here is a list of the response headers you will be interested in.

Here is an example of creating a manually-acknowledged queue pull consumer.

Creating a manually-acknowledged consumer for a topic is pretty much the same. Here's an example of creating a durable manually-acknowledged topic pull subscription.

After you have created a consumer resource, you are ready to start pulling messages from the server. Notice that when you created the consumer for either the queue or topic, the response contained a msg-acknowledge-next response header. POST to the URL contained within this header to consume the next message in the queue or topic subscription. If there are no messages in the queue or topic subscription, a 503 (Service Unavailable) HTTP code is returned. A successful POST causes the server to extract a message from the queue or topic subscription and return it to the consuming client. It does not acknowledge the message though. The response will contain the acknowledgement header which you will use to acknowledge the message.

Here's an example of pulling multiple messages from the consumer resource.

  1. Do a POST on the msg-acknowledge-next URL that was returned with the consumer or subscription resource discussed earlier.

    POST /queues/jms.queue.bar/pull-consumers/consume-next-1
    Host: example.com
    
    --- Response ---
    HTTP/1.1 200 Ok
    Content-Type: application/xml
    msg-acknowledgement: http://example.com/queues/jms.queue.bar/pull-consumers/333/acknowledgement/2
    msg-consumer: http://example.com/queues/jms.queue.bar/pull-consumers/333
    
    <order>...</order>

    The POST returns the message consumed from the queue. It also returns a msg-acknowledgement link. You will use this new link to acknowledge the message. Notice also a msg-consumer response header is returned. This is a URL that points back to the consumer or subscription resource. You will need that to clean up your connection after you are finished using the queue or topic.

  2. Acknowledge or unacknowledge the message by doing a POST to the URL contained in the msg-acknowledgement header. You must pass an acknowledge form parameter set to true or false depending on whether you want to acknowledge or unacknowledge the message on the server.

    POST /queues/jms.queue.bar/pull-consumers/acknowledgement/2
    Host: example.com
    Content-Type: application/x-www-form-urlencoded
    
    acknowledge=true
    
    --- Response ---
    HTTP/1.1 200 Ok
    msg-acknowledge-next: http://example.com/queues/jms.queue.bar/pull-consumers/333/acknowledge-next-2

    Whether you acknowledge or unacknowledge the message, the response will contain a new msg-acknowledge-next header that you must use to obtain the next message.
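The acknowledgement request in step 2 can be built with the JDK's java.net.http API (Java 11 or later). This is a minimal sketch: the class and method names are hypothetical, and the URL passed in is expected to be the value of the msg-acknowledgement header, used as-is.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds the acknowledge/unacknowledge request described above.
class AckRequests {

    // Builds a form POST carrying acknowledge=true or acknowledge=false.
    static HttpRequest acknowledge(String msgAcknowledgementUrl, boolean acknowledge) {
        return HttpRequest.newBuilder(URI.create(msgAcknowledgementUrl))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString("acknowledge=" + acknowledge))
                .build();
    }
}
```

Sending the request with HttpClient.send and reading msg-acknowledge-next from the response headers gives the URL for the next message.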

When the client is done with its consumer or topic subscription it should do an HTTP DELETE call on the consumer URL passed back from the Location header or the msg-consumer response header. The server will time out a consumer with the value of consumer-session-timeout-seconds configured from REST configuration, so you don't have to clean up if you don't want to, but if you are a good kid, you will clean up your messes. A consumer timeout for durable subscriptions will not delete the underlying durable JMS subscription though, only the server-side consumer resource (and underlying JMS session).
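In Java, the clean-up can be tied to try-with-resources so the DELETE is sent even if message processing throws. The sketch below assumes Java 11 or later and the JDK's java.net.http client; PullConsumerHandle is a hypothetical wrapper, not a HornetQ class.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical wrapper: deletes the server-side consumer resource when closed.
class PullConsumerHandle implements AutoCloseable {

    private final HttpClient client;
    private final URI consumerUri; // value of the Location or msg-consumer header

    PullConsumerHandle(HttpClient client, URI consumerUri) {
        this.client = client;
        this.consumerUri = consumerUri;
    }

    // Builds the DELETE request for the consumer or subscription resource.
    static HttpRequest deleteRequest(URI consumerUri) {
        return HttpRequest.newBuilder(consumerUri).DELETE().build();
    }

    @Override
    public void close() throws IOException, InterruptedException {
        // The response body, if any, is not needed.
        client.send(deleteRequest(consumerUri), HttpResponse.BodyHandlers.discarding());
    }
}
```

A client would create the handle right after the consumer resource is created and do its pulling inside a try (PullConsumerHandle handle = ...) block.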

You can configure the HornetQ REST server to push messages to a registered URL either remotely through the REST interface, or by creating a pre-configured XML file for the HornetQ REST server to load at boot time.

Creating a push consumer for a queue first involves creating a very simple XML document. This document tells the server if the push subscription should survive server reboots (is it durable). It must provide a URL to ship the forwarded message to. Finally, you have to provide authentication information if the final endpoint requires authentication. Here's a simple example:

<push-registration>
   <durable>false</durable>
   <selector><![CDATA[
   SomeAttribute > 1
   ]]>
   </selector>
   <link rel="push" href="http://somewhere.com" type="application/json" method="PUT"/>
   <maxRetries>5</maxRetries>
   <retryWaitMillis>1000</retryWaitMillis>
   <disableOnFailure>true</disableOnFailure>
</push-registration>

The durable element specifies whether the registration should be saved to disk so that if there is a server restart, the push subscription will still work. This element is not required. If left out, it defaults to false. If durable is set to true, an XML file for the push subscription will be created within the directory specified by the queue-push-store-dir config variable defined in Chapter 2 (topic-push-store-dir for topics).

The selector element is optional and defines a JMS message selector. You should enclose it within CDATA blocks as some of the selector characters are illegal XML.

The maxRetries element specifies how many times the server will try to push a message to a URL if there is a connection failure.

The retryWaitMillis element specifies how long to wait before performing a retry.

The disableOnFailure element, if set to true, will disable the registration if all retries have failed. It will not disable the connection on non-connection-failure issues (like a bad request for instance). In these cases, the dead letter queue logic of HornetQ will take over.

The link element specifies the basis of the interaction. The href attribute contains the URL you want to interact with. It is the only required attribute. The type attribute specifies the content type that the push URL expects. The method attribute defines which HTTP method the server will use when it sends the message to the push URL. If it is not provided, it defaults to POST. The rel attribute is very important, and its value triggers different behavior. Here are the values a rel attribute can have:

Creating a push subscription at runtime involves getting the factory resource URL from the msg-push-consumers header, if the destination is a queue, or msg-push-subscriptions header, if the destination is a topic. Here's an example of creating a push registration for a queue:

Here's an example of creating a push registration for a topic:

The HornetQ REST interface supports mixing JMS and REST producers and consumers. You can send an ObjectMessage through a JMS Producer, and have a REST client consume it. You can have a REST client POST a message to a topic and have a JMS Consumer receive it. Some simple transformations are supported if you have the correct RESTEasy providers installed.

HornetQ is designed as a set of simple Plain Old Java Objects (POJOs). This means HornetQ can be instantiated and run in any dependency injection framework such as JBoss Microcontainer, Spring or Google Guice. It also means that if you have an application that could use messaging functionality internally, then it can directly instantiate HornetQ clients and servers in its own application code to perform that functionality. We call this embedding HornetQ.

Examples of applications that might want to do this include any application that needs very high performance, transactional, persistent messaging but doesn't want the hassle of writing it all from scratch.

Embedding HornetQ takes only a few easy steps. Instantiate the configuration object, instantiate the server, start it, and you have HornetQ running in your virtual machine. It's as simple and easy as that.

The simplest way to embed HornetQ is to use the embedded wrapper classes and configure HornetQ through its configuration files. There are two different helper classes for this depending on whether you're using the HornetQ Core API or JMS.

You can follow this step-by-step guide to programmatically embed the core, non-JMS HornetQ Server instance:

Create the configuration object. It contains the configuration information for a HornetQ instance. The setter methods of this class allow you to programmatically set configuration options as described in Section 49.1, “Server Configuration”.

The acceptors are configured through ConfigurationImpl. Just add the NettyAcceptorFactory on the transports the same way you would through the main configuration file.

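import java.util.HashSet;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;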

...

Configuration config = new ConfigurationImpl();
HashSet<TransportConfiguration> transports = new HashSet<TransportConfiguration>();
      
transports.add(new TransportConfiguration(NettyAcceptorFactory.class.getName()));
transports.add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));

config.setAcceptorConfigurations(transports);

You need to create an instance of org.hornetq.core.server.embedded.EmbeddedHornetQ and add the configuration object to it.

import org.hornetq.api.core.server.HornetQ;
import org.hornetq.core.server.embedded.EmbeddedHornetQ;

...

EmbeddedHornetQ server = new EmbeddedHornetQ();
server.setConfiguration(config);

server.start();

You also have the option of instantiating HornetQServerImpl directly:

HornetQServer server = new HornetQServerImpl(config);
server.start();

For JMS POJO instantiation, you work with the EmbeddedJMS class instead as described earlier. First you define the configuration programmatically for your ConnectionFactory and Destination objects, then set the JmsConfiguration property of the EmbeddedJMS class. Here is an example of this:

// Step 1. Create HornetQ core configuration, and set the properties accordingly
Configuration configuration = new ConfigurationImpl();
configuration.setPersistenceEnabled(false);
configuration.setSecurityEnabled(false);
configuration.getAcceptorConfigurations().add(new TransportConfiguration(NettyAcceptorFactory.class.getName()));

// Step 2. Create the JMS configuration
JMSConfiguration jmsConfig = new JMSConfigurationImpl();

// Step 3. Configure the JMS ConnectionFactory
TransportConfiguration connectorConfig = new TransportConfiguration(NettyConnectorFactory.class.getName());
ConnectionFactoryConfiguration cfConfig = new ConnectionFactoryConfigurationImpl("cf", connectorConfig, "/cf");
jmsConfig.getConnectionFactoryConfigurations().add(cfConfig);

// Step 4. Configure the JMS Queue
JMSQueueConfiguration queueConfig = new JMSQueueConfigurationImpl("queue1", null, false, "/queue/queue1");
jmsConfig.getQueueConfigurations().add(queueConfig);

// Step 5. Start the JMS Server using the HornetQ core server and the JMS configuration
EmbeddedJMS jmsServer = new EmbeddedJMS();
jmsServer.setConfiguration(configuration);
jmsServer.setJmsConfiguration(jmsConfig);
jmsServer.start();

Please see Section 11.1.20, “Embedded” for an example which shows how to set up and run HornetQ embedded with JMS.

You may also choose to use a dependency injection framework such as JBoss Micro Container™ or Spring Framework™. See Chapter 45, Spring Integration for more details on Spring and HornetQ, but here's how you would do things with the JBoss Micro Container.

HornetQ standalone uses JBoss Micro Container as the injection framework. HornetQBootstrapServer and hornetq-beans.xml which are part of the HornetQ distribution provide a very complete implementation of what's needed to bootstrap the server using JBoss Micro Container.

When using JBoss Micro Container, you need to provide an XML file declaring the HornetQServer and Configuration objects. You can also inject a security manager and an MBean server, but those are optional.

A very basic XML Bean declaration for the JBoss Micro Container would be:

<?xml version="1.0" encoding="UTF-8"?>
<deployment xmlns="urn:jboss:bean-deployer:2.0">
   <!-- The core configuration -->
   <bean name="Configuration" 
         class="org.hornetq.core.config.impl.FileConfiguration">
   </bean>

   <!-- The core server -->
   <bean name="HornetQServer" 
         class="org.hornetq.core.server.impl.HornetQServerImpl">
      <constructor>
         <parameter>
            <inject bean="Configuration"/>
         </parameter>
      </constructor>
   </bean>
</deployment>

HornetQBootstrapServer provides an easy encapsulation of JBoss Micro Container.

HornetQBootstrapServer bootStrap = new HornetQBootstrapServer(new String[] {"hornetq-beans.xml"});
bootStrap.run();

HornetQ provides a simple bootstrap class, org.hornetq.integration.spring.SpringJmsBootstrap, for integration with Spring. To use it, you configure HornetQ as you always would, through its various configuration files like hornetq-configuration.xml, hornetq-jms.xml, and hornetq-users.xml. The Spring helper class starts the HornetQ server and adds any factories or destinations configured within hornetq-jms.xml directly into the namespace of the Spring context. Let's take this hornetq-jms.xml file for instance:

<configuration xmlns="urn:hornetq"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
   <!--the connection factory used by the example-->
   <connection-factory name="ConnectionFactory">
      <connectors>
         <connector-ref connector-name="in-vm"/>
      </connectors>
      <entries>
         <entry name="ConnectionFactory"/>
      </entries>
   </connection-factory>

   <!--the queue used by the example-->
   <queue name="exampleQueue">
      <entry name="/queue/exampleQueue"/>
   </queue>
</configuration>

Here we've specified a javax.jms.ConnectionFactory we want bound to a ConnectionFactory entry as well as a queue destination bound to a /queue/exampleQueue entry. Using the SpringJmsBootstrap bean will automatically populate the Spring context with references to those beans so that you can use them. Below is an example Spring JMS bean file taking advantage of this feature:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

   <bean id="EmbeddedJms" class="org.hornetq.integration.spring.SpringJmsBootstrap" init-method="start"/>

   <bean id="listener" class="org.hornetq.tests.integration.spring.ExampleListener"/>
    
   <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="connectionFactory" ref="ConnectionFactory"/>
      <property name="destination" ref="/queue/exampleQueue"/>
      <property name="messageListener" ref="listener"/>
   </bean>
</beans>

As you can see, the listenerContainer bean references the components defined in the hornetq-jms.xml file. The SpringJmsBootstrap class extends the EmbeddedJMS class discussed in Section 44.1.2, “JMS API”, and the same defaults and configuration options apply. Also notice that an init-method must be declared with a start value so that the bean's lifecycle is executed. See the javadocs for more details on other properties of the bean class.

HornetQ supports interceptors to intercept packets entering and exiting the server. Incoming and outgoing interceptors are called for any packet entering or exiting the server respectively. This allows custom code to be executed, e.g. for auditing packets, filtering or other reasons. Interceptors can change the packets they intercept. This makes interceptors powerful, but also potentially dangerous.

The interceptors can also be run on the client side to intercept packets either sent by the client to the server or by the server to the client. This is done by adding the interceptor to the ServerLocator with the addIncomingInterceptor(Interceptor) or addOutgoingInterceptor(Interceptor) methods.

As noted above, if an interceptor returns false then the sending of the packet is aborted, which means that no other interceptors are called and the packet is not processed further by the client. Typically this process happens transparently to the client (i.e. it has no idea if a packet was aborted or not). However, in the case of an outgoing packet that is sent in a blocking fashion a HornetQException will be thrown to the caller. The exception is thrown because blocking sends provide reliability and it is considered an error for them not to succeed. Blocking sends occur when, for example, an application invokes setBlockOnNonDurableSend(true) or setBlockOnDurableSend(true) on its ServerLocator or if an application is using a JMS connection factory retrieved from JNDI that has either block-on-durable-send or block-on-non-durable-send set to true. Blocking is also used for packets dealing with transactions (e.g. commit, roll-back, etc.). The HornetQException thrown will contain the name of the interceptor that returned false.
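The abort behavior described above can be modelled with a small self-contained sketch. The types below are simplified stand-ins invented for illustration; they are not the HornetQ Interceptor, Packet or HornetQException classes, and the real client may differ in detail. The sketch only shows the control flow: the chain stops at the first interceptor that returns false, and a blocking send surfaces the rejection as an exception naming that interceptor.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: these types are simplified stand-ins, not the HornetQ classes.
final class InterceptorChainSketch {

    /** Hypothetical stand-in for an interceptor: return false to abort the packet. */
    interface PacketInterceptor {
        boolean intercept(String packet);
    }

    /** Hypothetical stand-in for the exception raised on an aborted blocking send. */
    static final class SendAbortedException extends RuntimeException {
        SendAbortedException(String interceptorName) {
            super("Packet rejected by interceptor " + interceptorName);
        }
    }

    private final List<PacketInterceptor> interceptors = new ArrayList<>();

    void add(PacketInterceptor interceptor) {
        interceptors.add(interceptor);
    }

    /** Returns true if the packet may proceed; stops at the first interceptor returning false. */
    boolean process(String packet, boolean blockingSend) {
        for (PacketInterceptor interceptor : interceptors) {
            if (!interceptor.intercept(packet)) {
                if (blockingSend) {
                    // Blocking senders are told which interceptor rejected the packet.
                    throw new SendAbortedException(interceptor.getClass().getName());
                }
                return false; // non-blocking: the packet is dropped silently
            }
        }
        return true;
    }

    public static void main(String[] args) {
        InterceptorChainSketch chain = new InterceptorChainSketch();
        chain.add(packet -> true);
        chain.add(packet -> !packet.contains("forbidden"));
        System.out.println(chain.process("hello", false));
        System.out.println(chain.process("forbidden payload", false));
    }
}
```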

As on the server, the client interceptor classes (and their dependencies) must be added to the classpath to be properly instantiated and invoked.

See Section 11.1.26, “Interceptor” for an example which shows how to use interceptors to add properties to a message on the server.

Stomp is a text-orientated wire protocol that allows Stomp clients to communicate with Stomp Brokers. HornetQ now supports both Stomp 1.0 and Stomp 1.1.

Stomp clients are available for several languages and platforms making it a good choice for interoperability.

Well behaved STOMP clients will always send a DISCONNECT frame before closing their connections. In this case the server will clear up any server side resources such as sessions and consumers synchronously. However if STOMP clients exit without sending a DISCONNECT frame or if they crash the server will have no way of knowing immediately whether the client is still alive or not. STOMP connections therefore default to a connection-ttl value of 1 minute (see chapter on connection-ttl for more information). This value can be overridden using connection-ttl-override.

If you need a specific connection-ttl for your stomp connections without affecting the connection-ttl-override setting, you can configure your stomp acceptor with the "connection-ttl" property, which is used to set the ttl for connections that are created from that acceptor. For example:

<acceptor name="stomp-acceptor">
   <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
   <param key="protocol"  value="stomp"/>
   <param key="port"  value="61613"/>
   <param key="connection-ttl"  value="20000"/>
</acceptor>

The above configuration will make sure that any stomp connection that is created from that acceptor will have its connection-ttl set to 20 seconds.

Note

Please note that the STOMP protocol version 1.0 does not contain any heartbeat frame. It is therefore the user's responsibility to make sure data is sent within connection-ttl or the server will assume the client is dead and clean up server side resources. With Stomp 1.1 users can use heart-beats to maintain the life cycle of stomp connections.
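The liveness rule in the note can be illustrated with a toy model. This is a hypothetical sketch, not HornetQ code: the class name is invented, timestamps are passed in explicitly to keep it deterministic, and whether the server's real check is strict or inclusive at exactly connection-ttl is an assumption.

```java
// Sketch only: a hypothetical model of the liveness rule, not HornetQ code.
final class StompTtlSketch {

    private final long connectionTtlMillis;
    private long lastDataMillis;

    StompTtlSketch(long connectionTtlMillis, long nowMillis) {
        this.connectionTtlMillis = connectionTtlMillis;
        this.lastDataMillis = nowMillis;
    }

    /** Call whenever a frame (or, with Stomp 1.1, a heart-beat) arrives from the client. */
    void dataReceived(long nowMillis) {
        lastDataMillis = nowMillis;
    }

    /** True once no data has arrived for longer than connection-ttl. */
    boolean isConsideredDead(long nowMillis) {
        return nowMillis - lastDataMillis > connectionTtlMillis;
    }

    public static void main(String[] args) {
        // 20000 ms matches the connection-ttl of the acceptor example above.
        StompTtlSketch connection = new StompTtlSketch(20_000L, 0L);
        connection.dataReceived(15_000L);
        System.out.println(connection.isConsideredDead(30_000L)); // 15 s of silence
        System.out.println(connection.isConsideredDead(36_000L)); // 21 s of silence
    }
}
```

A Stomp 1.0 client therefore has to send some frame at least once per connection-ttl interval to keep its server side resources alive.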

As explained in Chapter 9, Mapping JMS Concepts to the Core API, JMS destinations are also mapped to HornetQ addresses and queues. If you want to use Stomp to send messages to JMS destinations, the Stomp destinations must follow the same convention:

  • send or subscribe to a JMS Queue by prefixing the queue name with jms.queue.

    For example, to send a message to the orders JMS Queue, the Stomp client must send the frame:

    SEND
    destination:jms.queue.orders
    
    hello queue orders
    ^@
  • send or subscribe to a JMS Topic by prefixing the topic name with jms.topic.

    For example to subscribe to the stocks JMS Topic, the Stomp client must send the frame:

    SUBSCRIBE
    destination:jms.topic.stocks
    
    ^@
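The two frames above can be assembled programmatically. The following is a minimal sketch with a hypothetical helper class; it only builds the frame text (the ^@ shown above is the NUL character that terminates a Stomp frame) and leaves the socket handling to whichever Stomp client you use.

```java
// Sketch only: StompFrames is a hypothetical helper, not part of HornetQ.
final class StompFrames {

    private static final char NUL = '\0'; // shown as ^@ in the frames above

    private StompFrames() {
    }

    /** Maps a JMS queue name to the Stomp destination expected by HornetQ. */
    static String jmsQueue(String queueName) {
        return "jms.queue." + queueName;
    }

    /** Maps a JMS topic name to the Stomp destination expected by HornetQ. */
    static String jmsTopic(String topicName) {
        return "jms.topic." + topicName;
    }

    /** Builds a SEND frame: command, destination header, blank line, body, NUL. */
    static String send(String destination, String body) {
        return "SEND\n" + "destination:" + destination + "\n\n" + body + NUL;
    }

    /** Builds a SUBSCRIBE frame with an empty body. */
    static String subscribe(String destination) {
        return "SUBSCRIBE\n" + "destination:" + destination + "\n\n" + NUL;
    }

    public static void main(String[] args) {
        String frame = send(jmsQueue("orders"), "hello queue orders\n");
        // Make the terminating NUL visible when printing.
        System.out.println(frame.replace("\0", "^@"));
    }
}
```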

HornetQ also supports Stomp over Web Sockets. Modern web browsers that support Web Sockets can send and receive Stomp messages from HornetQ.

To enable Stomp over Web Sockets, you must configure a NettyAcceptor with a protocol parameter set to stomp_ws:

<acceptor name="stomp-ws-acceptor">
   <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
   <param key="protocol" value="stomp_ws"/>
   <param key="port" value="61614"/>
</acceptor>

With this configuration, HornetQ will accept Stomp connections over Web Sockets on port 61614 with the URL path /stomp. Web browsers can then connect to ws://<server>:61614/stomp using a Web Socket to send and receive Stomp messages.

A companion JavaScript library to ease client-side development is available from GitHub (please see its documentation for a complete description).

The stomp-websockets example shows how to configure a HornetQ server so that web browsers and Java applications can exchange messages on a JMS topic.

StompConnect is a server that can act as a Stomp broker and proxy the Stomp protocol to the standard JMS API. Consequently, using StompConnect it is possible to turn HornetQ into a Stomp Broker and use any of the available Stomp clients. These include clients written in C, C++, C# and .NET.

To run StompConnect first start the HornetQ server and make sure that it is using JNDI.

StompConnect requires the file jndi.properties to be available on the classpath. This should look something like:

java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.provider.url=jnp://localhost:1099
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces

Make sure this file is in the classpath along with the StompConnect jar and the HornetQ jars and simply run java org.codehaus.stomp.jms.Main.
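Before launching StompConnect it can be handy to confirm that the jndi.properties content parses and carries the three keys shown above. The helper below is a hypothetical sketch using only java.util.Properties; treating exactly these three keys as the ones to check is an assumption drawn from the example, not a documented requirement.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Sketch only: JndiPropertiesCheck is a hypothetical helper.
final class JndiPropertiesCheck {

    // Keys taken from the example jndi.properties above.
    private static final String[] EXPECTED_KEYS = {
        "java.naming.factory.initial",
        "java.naming.provider.url",
        "java.naming.factory.url.pkgs"
    };

    private JndiPropertiesCheck() {
    }

    /** Parses properties-file text into a Properties object. */
    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    /** Returns the expected keys that are missing or blank. */
    static List<String> missingKeys(Properties properties) {
        List<String> missing = new ArrayList<>();
        for (String key : EXPECTED_KEYS) {
            String value = properties.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // A real check would read the file from the classpath instead of a literal.
        Properties properties = parse("java.naming.provider.url=jnp://localhost:1099\n");
        System.out.println("Missing keys: " + missingKeys(properties));
    }
}
```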

In this chapter we'll discuss how to tune HornetQ for optimum performance.

There are a few areas where some tweaks can be done if you are using the JMS API.

There are various other places in HornetQ where we can perform some tuning:

  • Use Asynchronous Send Acknowledgements. If you need to send durable messages non transactionally and you need a guarantee that they have reached the server by the time the call to send() returns, don't set durable messages to be sent blocking, instead use asynchronous send acknowledgements to get your acknowledgements of send back in a separate stream, see Chapter 20, Guarantees of sends and commits for more information on this.

  • Use pre-acknowledge mode. With pre-acknowledge mode, messages are acknowledged before they are sent to the client. This reduces the amount of acknowledgement traffic on the wire. For more information on this, see Chapter 29, Extra Acknowledge Modes.

  • Disable security. You may get a small performance boost by disabling security by setting the security-enabled parameter to false in hornetq-configuration.xml.

  • Disable persistence. If you don't need message persistence, turn it off altogether by setting persistence-enabled to false in hornetq-configuration.xml.

  • Sync transactions lazily. Setting journal-sync-transactional to false in hornetq-configuration.xml can give you better transactional persistent performance at the expense of some possibility of loss of transactions on failure. See Chapter 20, Guarantees of sends and commits for more information.

  • Sync non transactional lazily. Setting journal-sync-non-transactional to false in hornetq-configuration.xml can give you better non-transactional persistent performance at the expense of some possibility of loss of durable messages on failure. See Chapter 20, Guarantees of sends and commits for more information.

  • Send messages non blocking. Set block-on-durable-send and block-on-non-durable-send to false in hornetq-jms.xml (if you're using JMS and JNDI) or directly on the ClientSessionFactory. This means you don't have to wait a whole network round trip for every message sent. See Chapter 20, Guarantees of sends and commits for more information.

  • If you have very fast consumers, you can increase consumer-window-size. This effectively disables consumer flow control.

  • Socket NIO vs Socket Old IO. By default HornetQ uses old (blocking) IO on the server and the client side (see Chapter 16, Configuring the Transport for more information). NIO is much more scalable but can give you some latency hit compared to old blocking IO. If you need to be able to service many thousands of connections on the server, then you should make sure you're using NIO on the server. However, if you don't expect many thousands of connections on the server you can keep the server acceptors using old IO, and might get a small performance advantage.

  • Use the core API not JMS. Using the JMS API you will have slightly lower performance than using the core API, since all JMS operations need to be translated into core operations before the server can handle them. If using the core API try to use methods that take SimpleString as much as possible. SimpleString, unlike java.lang.String does not require copying before it is written to the wire, so if you re-use SimpleString instances between calls then you can avoid some unnecessary copying.

  • TCP buffer sizes. If you have a fast network and fast machines you may get a performance boost by increasing the TCP send and receive buffer sizes. See Chapter 16, Configuring the Transport for more information on this.

    Note

    Note that some operating systems like later versions of Linux include TCP auto-tuning and setting TCP buffer sizes manually can prevent auto-tune from working and actually give you worse performance!

  • Increase limit on file handles on the server. If you expect a lot of concurrent connections on your servers, or if clients are rapidly opening and closing connections, you should make sure the user running the server has permission to create sufficient file handles.

    This varies from operating system to operating system. On Linux systems you can increase the number of allowable open file handles in the file /etc/security/limits.conf e.g. add the lines

    serveruser     soft    nofile  20000
    serveruser     hard    nofile  20000

    This would allow up to 20000 file handles to be open by the user serveruser.

  • Use batch-delay and set direct-deliver to false for the best throughput for very small messages. HornetQ comes with a preconfigured connector/acceptor pair (netty-throughput) in hornetq-configuration.xml and JMS connection factory (ThroughputConnectionFactory) in hornetq-jms.xml which can be used to give the very best throughput, especially for small messages. See Chapter 16, Configuring the Transport for more information on this.

We highly recommend you use the latest JVM for the best performance. We test internally using the Sun JVM, so some of these tunings won't apply to JDKs from other providers (e.g. IBM or JRockit).

This section is a quick index for looking up configuration. Click on the element name to go to the specific chapter.

This is the main core server configuration file.


Element Name | Element Type | Description | Default
file-deployment-enabled | Boolean | true means that the server will load configuration from the configuration files | true
failover-on-shutdown | Boolean | Will this backup server come live on a normal server shutdown | false
id-cache-size | Integer | the size of the cache for pre creating message id's | 2000
journal-buffer-size | Long | The size of the internal buffer on the journal. | 128 KiB
journal-buffer-timeout | Long | The timeout (in nanoseconds) used to flush internal buffers on the journal. | 20000
journal-compact-min-files | Integer | The minimal number of data files before we can start compacting | 10
journal-compact-percentage | Integer | The percentage of live data on which we consider compacting the journal | 30
journal-directory | String | the directory to store the journal files in | data/journal
journal-file-size | Long | the size (in bytes) of each journal file | 10 * 1024 * 1024 (10 MiB)
journal-max-io | Integer | the maximum number of write requests that can be in the AIO queue at any one time | 500
journal-min-files | Integer | how many journal files to pre-create | 2
journal-sync-transactional | Boolean | if true wait for transaction data to be synchronized to the journal before returning response to client | true
journal-sync-non-transactional | Boolean | if true wait for non transaction data to be synced to the journal before returning response to client. | true
journal-type | ASYNCIO or NIO | the type of journal to use | ASYNCIO
jmx-management-enabled | Boolean | true means that the management API is available via JMX | true
jmx-domain | String | the JMX domain used to register HornetQ MBeans in the MBeanServer | org.hornetq
log-delegate-factory-class-name | String | todo | todo
large-messages-directory | String | the directory to store large messages | data/largemessages
management-address | String | the name of the management address to send management messages to | jms.queue.hornetq.management
cluster-user | String | the user used by cluster connections to communicate between the clustered nodes | HORNETQ.CLUSTER.ADMIN.USER
cluster-password | String | the password used by cluster connections to communicate between the clustered nodes | CHANGE ME!!
management-notification-address | String | the name of the address that consumers bind to receive management notifications | hornetq.notifications
message-counter-enabled | Boolean | true means that message counters are enabled | false
message-counter-max-day-history | Integer | how many days to keep message counter history | 10
message-counter-sample-period | Long | the sample period (in ms) to use for message counters | 10000
message-expiry-scan-period | Long | how often (in ms) to scan for expired messages | 30000
message-expiry-thread-priority | Integer | the priority of the thread expiring messages | 3
paging-directory | String | the directory to store paged messages in | data/paging
persist-delivery-count-before-delivery | Boolean | true means that the delivery count is persisted before delivery. False means that this only happens after a message has been cancelled. | false
persistence-enabled | Boolean | true means that the server will use the file based journal for persistence. | true
persist-id-cache | Boolean | true means that id's are persisted to the journal | true
remoting-interceptors | todo | todo | todo
shared-store | Boolean | is this server using a shared store for failover | false
scheduled-thread-pool-max-size | Integer | the number of threads that the main scheduled thread pool has. | 5
security-enabled | Boolean | true means that security is enabled | true
security-invalidation-interval | Long | how long (in ms) to wait before invalidating the security cache | 10000
thread-pool-max-size | Integer | the number of threads that the main thread pool has. -1 means no limit | -1
async-connection-execution-enabled | Boolean | Should incoming packets on the server be handed off to a thread from the thread pool for processing or should they be handled on the remoting thread? | true
transaction-timeout | Long | how long (in ms) before a transaction can be removed from the resource manager after create time | 60000
transaction-timeout-scan-period | Long | how often (in ms) to scan for timeout transactions | 1000
wild-card-routing-enabled | Boolean | true means that the server supports wild card routing | true
memory-measure-interval | Long | frequency to sample JVM memory in ms (or -1 to disable memory sampling) | -1
memory-warning-threshold | Integer | Percentage of available memory at which a warning is logged | 25
connectors | Connector | a list of remoting connectors configurations to create |
connector.name (attribute) | String | Name of the connector - mandatory |
connector.factory-class | String | Name of the ConnectorFactory implementation - mandatory |
connector.param | A connector configuration parameter | A key-value pair used to configure the connector. A connector can have many param |
connector.param.key (attribute) | String | Key of a configuration parameter - mandatory |
connector.param.value (attribute) | String | Value of a configuration parameter - mandatory |
acceptors | Acceptor | a list of remoting acceptors to create |
acceptor.name (attribute) | String | Name of the acceptor - optional |
acceptor.factory-class | String | Name of the AcceptorFactory implementation - mandatory |
acceptor.param | An acceptor configuration parameter | A key-value pair used to configure the acceptor. An acceptor can have many param |
acceptor.param.key (attribute) | String | Key of a configuration parameter - mandatory |
acceptor.param.value (attribute) | String | Value of a configuration parameter - mandatory |
broadcast-groups | BroadcastGroup | a list of broadcast groups to create |
broadcast-group.name (attribute) | String | a unique name for the broadcast group - mandatory |
broadcast-group.local-bind-address | String | local bind address that the datagram socket is bound to | wildcard IP address chosen by the kernel
broadcast-group.local-bind-port | Integer | local port to which the datagram socket is bound | -1 (anonymous port)
broadcast-group.group-address | String | multicast address to which the data will be broadcast - mandatory |
broadcast-group.group-port | Integer | UDP port number used for broadcasting - mandatory |
broadcast-group.broadcast-period | Long | period in milliseconds between consecutive broadcasts | 2000 (in milliseconds)
broadcast-group.jgroups-file | String | Name of JGroups configuration file. If specified, the server uses JGroups for broadcasting. - Optional |
broadcast-group.jgroups-channel | String | Name of JGroups Channel. If specified, the server uses the named channel for broadcasting. - Optional |
broadcast-group.connector-ref | A pair of connectors | A connector and optional backup connector pair that will be broadcast. A broadcast-group can have multiple connector-ref |
broadcast-group.connector-ref.connector-name (attribute) | String | Name of the live connector - mandatory |
broadcast-group.connector-ref.backup-connector-name (attribute) | String | Name of the backup connector - optional |
discovery-groups | DiscoveryGroup | a list of discovery groups to create |
discovery-group.name (attribute) | String | a unique name for the discovery group - mandatory |
discovery-group.local-bind-address | String | the discovery group will be bound only to this local address |
discovery-group.group-address | String | Multicast IP address of the group to listen on - mandatory |
discovery-group.group-port | Integer | UDP port of the multicast group - mandatory |
discovery-group.refresh-timeout | Integer | Period the discovery group waits after receiving the last broadcast from a particular server before removing that server's connector pair entry from its list. | 5000 (in milliseconds)
discovery-group.jgroups-file | String | Name of JGroups configuration file. If specified, the server uses JGroups for discovery. - Optional |
discovery-group.jgroups-channel | String | Name of JGroups Channel. If specified, the server uses the named channel for discovery. - Optional |
diverts | Divert | a list of diverts to use |
divert.name (attribute) | String | a unique name for the divert - mandatory |
divert.routing-name | String | the routing name for the divert - mandatory |
divert.address | String | the address this divert will divert from - mandatory |
divert.forwarding-address | String | the forwarding address for the divert - mandatory |
divert.exclusive | Boolean | is this divert exclusive? | false
divert.filter | String | an optional core filter expression | null
divert.transformer-class-name | String | an optional class name of a transformer |
queues | Queue | a list of pre configured queues to create |
queues.name (attribute) | String | unique name of this queue |
queues.address | String | address for this queue - mandatory |
queues.filter | String | optional core filter expression for this queue | null
queues.durable | Boolean | is this queue durable? | true
bridges | Bridge | a list of bridges to create |
bridges.name (attribute) | String | unique name for this bridge |
bridges.queue-name | String | name of queue that this bridge consumes from - mandatory |
bridges.forwarding-address | String | address to forward to. If omitted original address is used | null
bridges.filter | String | optional core filter expression | null
bridges.transformer-class-name | String | optional name of transformer class | null
bridges.retry-interval | Long | period (in ms) between successive retries | 2000 ms
bridges.retry-interval-multiplier | Double | multiplier to apply to successive retry intervals | 1.0
bridges.reconnect-attempts | Integer | maximum number of retry attempts, -1 signifies infinite | -1
bridges.failover-on-server-shutdown | Boolean | should failover be prompted if target server is cleanly shutdown? | false
bridges.use-duplicate-detection | Boolean | should duplicate detection headers be inserted in forwarded messages? | true
bridges.discovery-group-ref | String | name of discovery group used by this bridge | null
bridges.connector-ref.connector-name (attribute) | String | name of connector to use for live connection |
bridges.connector-ref.backup-connector-name (attribute) | String | optional name of connector to use for backup connection | null
cluster-connections | ClusterConnection | a list of cluster connections |
cluster-connections.name (attribute) | String | unique name for this cluster connection |
cluster-connections.address | String | name of address this cluster connection applies to |
cluster-connections.forward-when-no-consumers | Boolean | should messages be load balanced if there are no matching consumers on target? | false
cluster-connections.max-hops | Integer | maximum number of hops cluster topology is propagated | 1
cluster-connections.retry-interval | Long | period (in ms) between successive retries | 2000
cluster-connections.use-duplicate-detection | Boolean | should duplicate detection headers be inserted in forwarded messages? | true
cluster-connections.discovery-group-ref | String | name of discovery group used by this bridge | null
cluster-connections.connector-ref.connector-name (attribute) | String | name of connector to use for live connection |
cluster-connections.connector-ref.backup-connector-name (attribute) | String | optional name of connector to use for backup connection | null
security-settings | SecuritySetting | a list of security settings |
security-settings.match (attribute) | String | the string to use for matching security against an address |
security-settings.permission | Security Permission | a permission to add to the address |
security-settings.permission.type (attribute) | Permission Type | the type of permission |
security-settings.permission.roles (attribute) | Roles | a comma-separated list of roles to apply the permission to |
address-settings | AddressSetting | a list of address settings |
address-settings.dead-letter-address | String | the address to send dead messages to |
address-settings.max-delivery-attempts | Integer | how many times to attempt to deliver a message before sending to dead letter address | 10
address-settings.expiry-address | String | the address to send expired messages to |
address-settings.redelivery-delay | Long | the time (in ms) to wait before redelivering a cancelled message. | 0
address-settings.last-value-queue | boolean | whether to treat the queue as a last value queue | false
address-settings.page-size-bytes | Long | the page size (in bytes) to use for an address | 10 * 1024 * 1024
address-settings.max-size-bytes | Long | the maximum size (in bytes) to use in paging for an address | -1
address-settings.redistribution-delay | Long | how long (in ms) to wait after the last consumer is closed on a queue before redistributing messages. | -1
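To make the bridge retry settings listed above more concrete, the sketch below computes the wait before each retry from retry-interval, retry-interval-multiplier and reconnect-attempts. It is an illustration only: the class and method names are hypothetical, and it assumes that each wait is simply the previous one multiplied by the multiplier, with no upper bound applied.

```java
// Sketch only: RetryScheduleSketch is a hypothetical helper, not HornetQ code.
final class RetryScheduleSketch {

    private RetryScheduleSketch() {
    }

    /**
     * Delay before retry number attempt (1-based), assuming each delay is the
     * previous one multiplied by retryIntervalMultiplier.
     */
    static long delayMillis(long retryIntervalMillis, double retryIntervalMultiplier, int attempt) {
        return Math.round(retryIntervalMillis * Math.pow(retryIntervalMultiplier, attempt - 1));
    }

    /** A reconnect-attempts value of -1 means retry forever. */
    static boolean shouldRetry(int reconnectAttempts, int attemptsSoFar) {
        return reconnectAttempts == -1 || attemptsSoFar < reconnectAttempts;
    }

    public static void main(String[] args) {
        // Defaults from the table: 2000 ms interval, multiplier 1.0, so every wait is the same.
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println("default wait " + attempt + ": " + delayMillis(2000L, 1.0, attempt) + " ms");
        }
        // With a multiplier of 2.0 the wait doubles on each retry.
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println("doubling wait " + attempt + ": " + delayMillis(2000L, 2.0, attempt) + " ms");
        }
    }
}
```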

This is the configuration file used by the server-side JMS service to load JMS queues, topics and connection factories.


Continued..

connection-factory.signature (attribute) | String | type of connection factory | generic
connection-factory.xa | Boolean | whether this is an XA connection factory | false
connection-factory.auto-group | Boolean | whether or not message grouping is automatically used | false
connection-factory.connectors | String | a list of connectors used by the connection factory |
connection-factory.connectors.connector-ref.connector-name (attribute) | String | name of the connector to connect to the live server |
connection-factory.connectors.connector-ref.backup-connector-name (attribute) | String | name of the connector to connect to the backup server |
connection-factory.discovery-group-ref.discovery-group-name (attribute) | String | name of discovery group used by this connection factory |
connection-factory.discovery-initial-wait-timeout | Long | the initial time (in ms) for discovery groups to wait for broadcasts | 10000
connection-factory.block-on-acknowledge | Boolean | whether or not messages are acknowledged synchronously | false
connection-factory.block-on-non-durable-send | Boolean | whether or not non-durable messages are sent synchronously | false
connection-factory.block-on-durable-send | Boolean | whether or not durable messages are sent synchronously | true
connection-factory.call-timeout | Long | the timeout (in ms) for remote calls | 30000
connection-factory.client-failure-check-period | Long | the period (in ms) after which the client will consider the connection failed after not receiving packets from the server | 5000
connection-factory.client-id | String | the pre-configured client ID for the connection factory | null
connection-factory.connection-load-balancing-policy-class-name | String | the name of the load balancing class | org.hornetq.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy
connection-factory.connection-ttl | Long | the time to live (in ms) for connections | 1 * 60000
connection-factory.consumer-max-rate | Integer | the fastest rate a consumer may consume messages per second | -1
connection-factory.consumer-window-size | Integer | the window size (in bytes) for consumer flow control | 1024 * 1024
connection-factory.dups-ok-batch-size | Integer | the batch size (in bytes) between acknowledgements when using DUPS_OK_ACKNOWLEDGE mode | 1024 * 1024
connection-factory.failover-on-initial-connection | Boolean | whether or not to fail over to backup in the event that the initial connection to the live server fails | false
connection-factory.failover-on-server-shutdown | Boolean | whether or not to fail over on server shutdown | false
connection-factory.min-large-message-size | Integer | the size (in bytes) before a message is treated as large | 100 * 1024
connection-factory.cache-large-message-client | Boolean | if true, clients using this connection factory will hold the large message body in temporary files | false
connection-factory.pre-acknowledge | Boolean | whether messages are pre-acknowledged by the server before sending | false
connection-factory.producer-max-rate | Integer | the maximum rate of messages per second that can be sent | -1
connection-factory.producer-window-size | Integer | the window size (in bytes) for producers sending messages | 1024 * 1024
connection-factory.confirmation-window-size | Integer | the window size (in bytes) for reattachment confirmations | 1024 * 1024
connection-factory.reconnect-attempts | Integer | maximum number of retry attempts, -1 signifies infinite | 0
connection-factory.retry-interval | Long | the time (in ms) to retry a connection after failing | 2000
connection-factory.retry-interval-multiplier | Double | multiplier to apply to successive retry intervals | 1.0
connection-factory.max-retry-interval | Integer | the maximum retry interval in the case a retry-interval-multiplier has been specified | 2000
connection-factory.scheduled-thread-pool-max-size | Integer | the size of the scheduled thread pool | 5
connection-factory.thread-pool-max-size | Integer | the size of the thread pool | -1
connection-factory.transaction-batch-size | Integer | the batch size (in bytes) between acknowledgements when using a transactional session | 1024 * 1024
connection-factory.use-global-pools | Boolean | whether or not to use a global thread pool for threads | true
queue | Queue | a queue to create and add to JNDI |
queue.name (attribute) | String | unique name of the queue |
queue.entry | String | context where the queue will be bound in JNDI (there can be many) |
queue.durable | Boolean | is the queue durable? | true
queue.filter | String | optional filter expression for the queue |
topic | Topic | a topic to create and add to JNDI |
topic.name (attribute) | String | unique name of the topic |
topic.entry | String | context where the topic will be bound in JNDI (there can be many) |
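
The three connection-factory retry settings listed above (retry-interval, retry-interval-multiplier and max-retry-interval) interact, and a small calculation may make that easier to see. The following is a minimal sketch, assuming each retry multiplies the previous interval by the multiplier and the result is capped at max-retry-interval. The class RetryIntervalSketch and its method intervalFor are hypothetical names used only for illustration; they are not part of the HornetQ API.

```java
/** Hypothetical illustration of how the retry settings could combine. */
public class RetryIntervalSketch {

    /**
     * Returns the wait (in ms) before retry number 'attempt' (0-based),
     * assuming the interval grows by 'multiplier' on each retry and is
     * capped at 'maxRetryInterval'.
     */
    public static long intervalFor(int attempt, long retryInterval,
                                   double multiplier, long maxRetryInterval) {
        long interval = retryInterval;
        for (int i = 0; i < attempt; i++) {
            interval = (long) (interval * multiplier);
            if (interval > maxRetryInterval) {
                // Stop early once the cap is reached; this also avoids overflow.
                return maxRetryInterval;
            }
        }
        return Math.min(interval, maxRetryInterval);
    }

    public static void main(String[] args) {
        // Defaults from the table: 2000 ms, multiplier 1.0, cap 2000 ms.
        for (int attempt = 0; attempt < 3; attempt++) {
            System.out.println("default, attempt " + attempt + ": "
                    + intervalFor(attempt, 2000, 1.0, 2000) + " ms");
        }
        // Assumed tuning for illustration: double each time, cap at 10 seconds.
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("tuned, attempt " + attempt + ": "
                    + intervalFor(attempt, 2000, 2.0, 10000) + " ms");
        }
    }
}
```

Under these assumptions the defaults give a constant 2000 ms wait, while the tuned values give 2000, 4000, 8000, 10000 and 10000 ms. Note that because the default max-retry-interval equals the default retry-interval, raising only the multiplier would have no visible effect in this model; the cap would need to be raised as well.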

By default, all passwords in the HornetQ server's configuration files are stored in plain text. This usually poses no security problem, because those files should be well protected from unauthorized access. In some circumstances, however, a user may not want to expose passwords to more eyes than necessary.

HornetQ can be configured to use 'masked' passwords in its configuration files. A masked password is an obscured string representation of a real password. To mask a password, a user runs it through an 'encoder', which takes the real password and outputs the masked version. The user can then replace the real password in the configuration files with the masked one. When HornetQ loads a masked password, it uses a suitable 'decoder' to turn it back into the real password.

HornetQ provides a default password encoder and decoder. Optionally, users can implement and use their own encoder and decoder for masking passwords.

As described in the previous sections, all password masking requires a decoder. A decoder uses an algorithm to convert a masked password back into its original cleartext form so that it can be used in security operations. The decoding algorithm must match the one used for encoding; otherwise decoding may fail.
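
The requirement that encoder and decoder share the same algorithm can be illustrated with a short round trip. This is a minimal sketch under stated assumptions, not HornetQ's built-in codec: the class MaskingSketch, the hard-coded demo key and the choice of AES with Base64 output are all hypothetical, picked only because they are available in the standard Java library.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical encoder/decoder pair; illustration only, not HornetQ's codec. */
public class MaskingSketch {

    // Assumed 16-byte demo key. A real deployment would not hard-code a key.
    private static final byte[] KEY =
            "0123456789abcdef".getBytes(StandardCharsets.UTF_8);

    // Both directions build the cipher from the same algorithm and key,
    // which is what allows decoding to reverse encoding.
    private static Cipher cipher(int mode) throws GeneralSecurityException {
        Cipher c = Cipher.getInstance("AES/ECB/PKCS5Padding");
        c.init(mode, new SecretKeySpec(KEY, "AES"));
        return c;
    }

    /** Encoder: takes the real password and returns the masked string. */
    public static String encode(String password) throws GeneralSecurityException {
        byte[] encrypted = cipher(Cipher.ENCRYPT_MODE)
                .doFinal(password.getBytes(StandardCharsets.UTF_8));
        return Base64.getEncoder().encodeToString(encrypted);
    }

    /** Decoder: takes the masked string and returns the real password. */
    public static String decode(String masked) throws GeneralSecurityException {
        byte[] decrypted = cipher(Cipher.DECRYPT_MODE)
                .doFinal(Base64.getDecoder().decode(masked));
        return new String(decrypted, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws GeneralSecurityException {
        String masked = encode("secret");
        System.out.println("masked:  " + masked);          // value to place in a config file
        System.out.println("decoded: " + decode(masked));  // prints "decoded: secret"
    }
}
```

If the decoder used a different algorithm or key than the encoder, the decode call would either throw an exception or return something other than the original password, which is the failure the paragraph above warns about. The cipher mode here was chosen for brevity rather than strength.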

For convenience, HornetQ provides a default built-in decoder. Users can, however, implement their own if they wish.