HornetQ supports sending and receiving of huge messages, even when the client and server are running with limited memory. The only realistic limit to the size of a message that can be sent or consumed is the amount of disk space you have available. We have tested sending and consuming messages up to 8 GiB in size with a client and server running in just 50MiB of RAM!
To send a large message, the user can set an InputStream on a message body, and when that message is sent, HornetQ will read the InputStream. A FileInputStream could be used, for example, to send a huge message from a huge file on disk.
As the InputStream is read, the data is sent to the server as a stream of fragments. The server persists these fragments to disk as it receives them, and when the time comes to deliver them to a consumer they are read back off the disk, also in fragments, and sent down the wire. When the consumer receives a large message, it initially receives just the message with an empty body. It can then set an OutputStream on the message to stream the huge message body to a file on disk or elsewhere. At no time is the entire message body stored fully in memory, either on the client or the server.
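The fragment-by-fragment idea described above can be sketched with plain JDK streams. This is only an analogue under stated assumptions, not HornetQ's internal implementation: the class name `FragmentCopy`, the method `copyInFragments`, and the 10 KiB fragment size are all hypothetical and chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Illustrative only: a plain-JDK analogue of streaming a body in fragments.
class FragmentCopy {

    // Copies 'in' to 'out' through one reusable buffer of 'fragmentSize' bytes,
    // so memory use stays bounded however large the stream is.
    // Returns the total number of bytes copied.
    static long copyInFragments(InputStream in, OutputStream out, int fragmentSize)
            throws IOException {
        if (fragmentSize <= 0) {
            throw new IllegalArgumentException("fragmentSize must be positive");
        }
        byte[] fragment = new byte[fragmentSize];
        long total = 0;
        int read;
        while ((read = in.read(fragment)) != -1) {
            out.write(fragment, 0, read); // forward only the bytes actually read
            total += read;
        }
        out.flush();
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] body = new byte[250_000]; // stand-in for a large message body
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // 10 KiB is an arbitrary fragment size picked for this sketch
        long copied = copyInFragments(new ByteArrayInputStream(body), sink, 10 * 1024);
        System.out.println("copied " + copied + " bytes");
    }
}
```

Only one fragment-sized buffer is ever held in memory, which is the property that lets very large bodies pass through a process with a small heap.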
Large messages are stored in a directory on disk on the server side, as configured in the main configuration file.
The configuration property large-messages-directory specifies where large messages are stored.
    <configuration xmlns="urn:hornetq"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
       ...
       <large-messages-directory>/data/large-messages</large-messages-directory>
       ...
    </configuration>
By default the large message directory is data/largemessages.
For the best performance we recommend that the large messages directory is stored on a different physical volume from the message journal or paging directory.
Any message larger than a certain size is considered a large message. Large messages will be split up and sent in fragments. The size threshold is determined by the parameter min-large-message-size. The default value is 100KiB.
If the HornetQ Core API is used, the minimum large message size is specified by ServerLocator.setMinLargeMessageSize.

    ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(
       new TransportConfiguration(NettyConnectorFactory.class.getName()));

    // Messages larger than 25KiB are treated as large messages
    locator.setMinLargeMessageSize(25 * 1024);

    ClientSessionFactory factory = locator.createSessionFactory();
Section 16.3, “Configuring the transport directly from the client side.” provides more information on how to instantiate the session factory.
If JNDI is used to look up the connection factory, the minimum large message size is specified in hornetq-jms.xml.

    ...
    <connection-factory name="ConnectionFactory">
       <connectors>
          <connector-ref connector-name="netty"/>
       </connectors>
       <entries>
          <entry name="ConnectionFactory"/>
          <entry name="XAConnectionFactory"/>
       </entries>
       <min-large-message-size>250000</min-large-message-size>
    </connection-factory>
    ...
If the connection factory is being instantiated directly, the minimum large message size is specified by HornetQConnectionFactory.setMinLargeMessageSize.
If you specify the boolean property compress-large-message on the server locator or ConnectionFactory, the system will use the ZIP algorithm to compress the message body as the message is transferred to the server's side. Notice that there is no special treatment on the server's side: all the compressing and uncompressing is done at the client.
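To illustrate what client-side compression means in practice, here is a minimal sketch using the JDK's java.util.zip deflate streams. It assumes that DEFLATE is a fair stand-in for the "ZIP algorithm" mentioned above. The class `ClientSideCompression` and its methods are hypothetical and are not part of the HornetQ API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

// Illustrative only: compression and decompression both happen at the clients;
// whatever sits in between just stores and forwards the compressed bytes.
class ClientSideCompression {

    // Sending side: compress the body before it leaves the client.
    static byte[] compress(byte[] body) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        try (OutputStream deflater = new DeflaterOutputStream(wire)) {
            deflater.write(body);
        } // closing the stream finishes the compressed data
        return wire.toByteArray();
    }

    // Receiving side: restore the original body on the consuming client.
    static byte[] decompress(byte[] compressed) throws IOException {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        try (InputStream inflater =
                 new InflaterInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = inflater.read(buffer)) != -1) {
                body.write(buffer, 0, read);
            }
        }
        return body.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] body = new byte[100_000]; // highly repetitive, so it compresses well
        byte[] onTheWire = compress(body);
        byte[] restored = decompress(onTheWire);
        System.out.println(body.length + " -> " + onTheWire.length
            + " -> " + restored.length);
    }
}
```

How much is saved depends entirely on the data: repetitive bodies shrink a lot, while already-compressed content (archives, media) may not shrink at all.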
HornetQ supports setting the body of messages using input and output streams (java.io).
These streams are then used directly for sending (input streams) and receiving (output streams) messages.
When receiving messages there are two ways to deal with the output stream. You may choose to block while the body is written to the output stream, using the method ClientMessage.saveOutputStream, or you may use the method ClientMessage.setOutputStream, which writes the message to the stream asynchronously. If you choose the latter, the consumer must be kept alive until the message has been fully received.
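The difference between the blocking and the asynchronous style can be pictured with a plain JDK analogy. This sketch does not use or reproduce the HornetQ API: `StreamDelivery`, `saveBlocking` and `saveAsync` are hypothetical names, and a CompletableFuture stands in for whatever mechanism HornetQ actually uses to complete the transfer in the background.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;

// Illustrative analogy only: blocking versus asynchronous delivery of a body.
class StreamDelivery {

    // Blocking style: returns only after the whole body has been written.
    static long saveBlocking(InputStream body, OutputStream target) {
        try {
            byte[] buffer = new byte[8192];
            long total = 0;
            int read;
            while ((read = body.read(buffer)) != -1) {
                target.write(buffer, 0, read);
                total += read;
            }
            target.flush();
            return total;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Asynchronous style: returns at once; the transfer runs on another thread.
    static CompletableFuture<Long> saveAsync(InputStream body, OutputStream target) {
        return CompletableFuture.supplyAsync(() -> saveBlocking(body, target));
    }

    public static void main(String[] args) {
        byte[] body = new byte[1_000_000];
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        CompletableFuture<Long> transfer =
            saveAsync(new ByteArrayInputStream(body), target);
        // The caller may do other work here, but must not release the
        // resources behind the transfer until it has completed.
        long written = transfer.join();
        System.out.println("written " + written + " bytes");
    }
}
```

In the analogy, calling `join()` before shutting down plays the role of keeping the consumer alive: if the caller went away earlier, the target stream could be left with only part of the body.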
You can use any kind of stream you like. The most common use case is to send files stored on your disk, but you could also send things like JDBC Blobs, SocketInputStream, things you recovered from HTTPRequests, and so on. Anything will do as long as it implements java.io.InputStream for sending messages or java.io.OutputStream for receiving them.
The following table shows a list of methods available on ClientMessage which are also available through JMS by the use of object properties.
Table 23.1. org.hornetq.api.core.client.ClientMessage API
Name | Description | JMS Equivalent Property |
---|---|---|
setBodyInputStream(InputStream) | Set the InputStream used to read a message body when sending it. | JMS_HQ_InputStream |
setOutputStream(OutputStream) | Set the OutputStream that will receive the body of a message. This method does not block. | JMS_HQ_OutputStream |
saveOutputStream(OutputStream) | Save the body of the message to the OutputStream. It will block until the entire content is transferred to the OutputStream. | JMS_HQ_SaveStream |
To set the output stream when receiving a core message:
    ...
    ClientMessage msg = consumer.receive(...);

    // This will block here until the stream has been transferred
    msg.saveOutputStream(someOutputStream);

    ClientMessage msg2 = consumer.receive(...);

    // This will not wait for the transfer to finish
    msg2.setOutputStream(someOtherOutputStream);
    ...
To set the input stream when sending a core message:

    ...
    ClientMessage msg = session.createMessage();
    msg.setBodyInputStream(dataInputStream);
    ...
Notice also that for messages larger than 2GiB, getBodySize() will return invalid values, since it returns an integer (which is also exposed to the JMS API). In those cases you can use the message property _HQ_LARGE_SIZE instead.
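The reason an integer cannot report such sizes is easy to demonstrate in plain Java. The sketch below assumes that the invalid values come from fitting a byte count into a 32-bit int. The class `BodySizeOverflow` and the method `asIntSize` are hypothetical, and the exact value HornetQ returns for a given message is not claimed here.

```java
// Illustrative only: why a 32-bit int cannot hold sizes above about 2GiB.
class BodySizeOverflow {

    // Narrowing a long byte count to int keeps only the low 32 bits,
    // so any size above Integer.MAX_VALUE comes out wrong.
    static int asIntSize(long sizeInBytes) {
        return (int) sizeInBytes;
    }

    public static void main(String[] args) {
        long threeGiB = 3L * 1024 * 1024 * 1024;
        System.out.println(threeGiB);            // prints 3221225472
        System.out.println(asIntSize(threeGiB)); // prints -1073741824
    }
}
```

A size carried as a long value does not have this problem, which is why a separate property is the safer source for the size of very large messages.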
When using JMS, HornetQ maps the streaming methods on the core API (see Table 23.1, “org.hornetq.api.core.client.ClientMessage API”) by setting object properties. You can use the method Message.setObjectProperty to set the input and output streams.
The InputStream can be defined through the JMS Object Property JMS_HQ_InputStream on messages being sent:

    BytesMessage message = session.createBytesMessage();

    FileInputStream fileInputStream = new FileInputStream(fileInput);

    BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);

    message.setObjectProperty("JMS_HQ_InputStream", bufferedInput);

    someProducer.send(message);
The OutputStream can be set through the JMS Object Property JMS_HQ_SaveStream on messages being received in a blocking way.

    BytesMessage messageReceived = (BytesMessage)messageConsumer.receive(120000);

    File outputFile = new File("huge_message_received.dat");

    FileOutputStream fileOutputStream = new FileOutputStream(outputFile);

    BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);

    // This will block until the entire content is saved on disk
    messageReceived.setObjectProperty("JMS_HQ_SaveStream", bufferedOutput);
Setting the OutputStream could also be done in a non-blocking way using the property JMS_HQ_OutputStream.

    // This won't wait for the stream to finish. You need to keep the consumer active.
    messageReceived.setObjectProperty("JMS_HQ_OutputStream", bufferedOutput);
When using JMS, streaming large messages is only supported on StreamMessage and BytesMessage.
If you choose not to use the InputStream or OutputStream capability of HornetQ, you can still access the data directly in an alternative fashion.
On the Core API just get the bytes of the body as you normally would.
    ClientMessage msg = consumer.receive();

    byte[] bytes = new byte[1024];
    for (int i = 0; i < msg.getBodySize(); i += bytes.length)
    {
       msg.getBody().readBytes(bytes);
       // Whatever you want to do with the bytes
    }
If using the JMS API, BytesMessage and StreamMessage also support it transparently.
    BytesMessage rm = (BytesMessage)cons.receive(10000);

    byte[] data = new byte[1024];

    for (int i = 0; i < rm.getBodyLength(); i += 1024)
    {
       int numberOfBytes = rm.readBytes(data);
       // Do whatever you want with the data
    }
Please see Section 11.1.31, “Large Message” for an example which shows how large messages are configured and used with JMS.