package org.jboss.mq;
import java.util.Enumeration;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageEOFException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import org.jboss.logging.Logger;
/**
 * Message producer implementation that sends messages through a {@link SpySession}.
 *
 * <p>A producer is either bound to a destination at construction time, or
 * created without one (destination <code>null</code>), in which case the
 * destination has to be supplied on every send. The default delivery mode,
 * priority and time-to-live are used by the send variants that do not take
 * these values explicitly.
 *
 * <p>The "disable message ID" and "disable timestamp" settings are stored and
 * reported back, but the send methods of this class do not consult them: a
 * message ID and a timestamp are always assigned.
 */
public class SpyMessageProducer implements MessageProducer
{
   /** Logger for this class. */
   static Logger log = Logger.getLogger(SpyMessageProducer.class);

   /** Cached trace-enabled flag; refreshed from the logger whenever a producer is constructed. */
   static boolean trace = log.isTraceEnabled();

   /** The session this producer sends through and unregisters from on close. */
   protected SpySession session;

   /** The destination given at construction, or <code>null</code> for an unidentified producer. */
   protected Destination destination;

   /** Delivery mode used by sends that do not specify one. */
   protected int defaultDeliveryMode = SpyMessage.DEFAULT_DELIVERY_MODE;

   /** Priority used by sends that do not specify one. */
   protected int defaultPriority = SpyMessage.DEFAULT_PRIORITY;

   /** Time-to-live in milliseconds used by sends that do not specify one; 0 means no expiration. */
   protected long defaultTTL = SpyMessage.DEFAULT_TIME_TO_LIVE;

   /** Set once {@link #close()} has been called. */
   protected boolean closed = false;

   /** Stored value of the "disable message ID" hint. */
   private boolean disableMessageID = false;

   /** Stored value of the "disable message timestamp" hint. */
   private boolean disableTS = false;

   /**
    * Creates a producer for the given session.
    *
    * <p>The default delivery mode is set to <code>NON_PERSISTENT</code> when the
    * destination is a temporary queue or topic and to <code>PERSISTENT</code>
    * otherwise. A <code>JMSException</code> raised while doing so is logged at
    * debug level and otherwise ignored.
    *
    * @param session the owning session
    * @param destination the destination to send to, or <code>null</code>
    */
   SpyMessageProducer(SpySession session, Destination destination)
   {
      trace = log.isTraceEnabled();
      this.session = session;
      this.destination = destination;

      boolean temporary = destination instanceof TemporaryQueue || destination instanceof TemporaryTopic;
      try
      {
         setDeliveryMode(temporary ? DeliveryMode.NON_PERSISTENT : DeliveryMode.PERSISTENT);
      }
      catch (JMSException ignored)
      {
         log.debug("Ignored error during setDeliveryMode", ignored);
      }

      if (trace)
      {
         log.trace("New message producer " + this);
      }
   }

   /**
    * Stores the "disable message ID" hint.
    *
    * @param value the new hint value
    * @throws JMSException if the producer is closed
    */
   public void setDisableMessageID(boolean value) throws JMSException
   {
      checkClosed();
      disableMessageID = value;
   }

   /**
    * Stores the "disable message timestamp" hint.
    *
    * @param value the new hint value
    * @throws JMSException if the producer is closed
    */
   public void setDisableMessageTimestamp(boolean value) throws JMSException
   {
      checkClosed();
      disableTS = value;
   }

   /**
    * Sets the default delivery mode.
    *
    * @param deli <code>DeliveryMode.NON_PERSISTENT</code> or <code>DeliveryMode.PERSISTENT</code>
    * @throws JMSException if the producer is closed or the value is neither of the two modes
    */
   public void setDeliveryMode(int deli) throws JMSException
   {
      checkClosed();
      if (deli != DeliveryMode.NON_PERSISTENT && deli != DeliveryMode.PERSISTENT)
      {
         throw new JMSException("Bad DeliveryMode value");
      }
      defaultDeliveryMode = deli;
   }

   /**
    * Sets the default priority.
    *
    * @param pri a priority in the range 0 to 9 inclusive
    * @throws JMSException if the producer is closed or the value is out of range
    */
   public void setPriority(int pri) throws JMSException
   {
      checkClosed();
      if (pri < 0 || pri > 9)
      {
         throw new JMSException("Bad priority value");
      }
      defaultPriority = pri;
   }

   /**
    * Sets the default time-to-live from an <code>int</code> value.
    *
    * @param timeToLive the time-to-live in milliseconds; must not be negative
    * @throws JMSException if the producer is closed or the value is negative
    */
   public void setTimeToLive(int timeToLive) throws JMSException
   {
      checkClosed();
      if (timeToLive < 0)
      {
         throw new JMSException("Bad TimeToLive value");
      }
      defaultTTL = timeToLive;
   }

   /**
    * Sets the default time-to-live.
    *
    * @param timeToLive the time-to-live in milliseconds; must not be negative
    * @throws JMSException if the producer is closed or the value is negative
    */
   public void setTimeToLive(long timeToLive) throws JMSException
   {
      checkClosed();
      if (timeToLive < 0)
      {
         throw new JMSException("Bad TimeToLive value");
      }
      defaultTTL = timeToLive;
   }

   /**
    * @return the stored "disable message ID" hint
    * @throws JMSException if the producer is closed
    */
   public boolean getDisableMessageID() throws JMSException
   {
      checkClosed();
      return disableMessageID;
   }

   /**
    * @return the stored "disable message timestamp" hint
    * @throws JMSException if the producer is closed
    */
   public boolean getDisableMessageTimestamp() throws JMSException
   {
      checkClosed();
      return disableTS;
   }

   /**
    * @return the default delivery mode
    * @throws JMSException if the producer is closed
    */
   public int getDeliveryMode() throws JMSException
   {
      checkClosed();
      return defaultDeliveryMode;
   }

   /**
    * @return the default priority
    * @throws JMSException if the producer is closed
    */
   public int getPriority() throws JMSException
   {
      checkClosed();
      return defaultPriority;
   }

   /**
    * @return the default time-to-live in milliseconds
    * @throws JMSException if the producer is closed
    */
   public long getTimeToLive() throws JMSException
   {
      checkClosed();
      return defaultTTL;
   }

   /**
    * Marks the producer as closed and removes it from its session.
    *
    * <p>The closed flag is set before the session is called, so the producer
    * stays closed even if the removal throws.
    *
    * @throws JMSException if the session fails to remove the producer
    */
   public void close() throws JMSException
   {
      closed = true;
      session.removeProducer(this);
      if (trace)
      {
         log.trace("Closed " + this);
      }
   }

   /**
    * @return the destination given at construction, possibly <code>null</code>
    * @throws JMSException if the producer is closed
    */
   public Destination getDestination() throws JMSException
   {
      checkClosed();
      return destination;
   }

   /**
    * Sends a message to the producer's destination using the default
    * delivery mode, priority and time-to-live.
    *
    * @param message the message to send
    * @throws UnsupportedOperationException if the producer has no destination;
    *         this is checked before the closed state
    * @throws JMSException for any error raised by the full send
    */
   public void send(Message message) throws JMSException
   {
      if (destination == null)
      {
         throw new UnsupportedOperationException(
               "Not constructed with identifyed destination. Usage of method not allowed");
      }
      send(destination, message, defaultDeliveryMode, defaultPriority, defaultTTL);
   }

   /**
    * Sends a message to the given destination using the default delivery
    * mode, priority and time-to-live.
    *
    * @param destination the destination to send to
    * @param message the message to send
    * @throws JMSException for any error raised by the full send
    */
   public void send(Destination destination, Message message) throws JMSException
   {
      send(destination, message, defaultDeliveryMode, defaultPriority, defaultTTL);
   }

   /**
    * Sends a message to the producer's destination with explicit delivery
    * mode, priority and time-to-live.
    *
    * @param message the message to send
    * @param deliveryMode the delivery mode for this message
    * @param priority the priority for this message
    * @param ttl the time-to-live in milliseconds; 0 means no expiration
    * @throws UnsupportedOperationException if the producer has no destination;
    *         this is checked before the closed state
    * @throws JMSException for any error raised by the full send
    */
   public void send(Message message, int deliveryMode, int priority, long ttl) throws JMSException
   {
      if (destination == null)
      {
         throw new UnsupportedOperationException(
               "Not constructed with identifyed destination. Usage of method not allowed");
      }
      send(destination, message, deliveryMode, priority, ttl);
   }

   /**
    * Sends a message to a destination with explicit delivery parameters.
    *
    * <p>A message that is not a <code>SpyMessage</code> is first copied into
    * one by {@link #encapsulateMessage(Message)}; the headers assigned here
    * are then written to both the copy and the caller's message.
    *
    * <p>Unlike the setters, this method does not validate
    * <code>deliveryMode</code>, <code>priority</code> or <code>ttl</code>.
    *
    * @param destination the destination to send to; must be a <code>SpyDestination</code>
    *        and, if the producer was created with a destination, equal to it
    * @param message the message to send
    * @param deliveryMode the delivery mode for this message
    * @param priority the priority for this message
    * @param ttl the time-to-live in milliseconds; 0 means no expiration
    * @throws javax.jms.IllegalStateException if the producer is closed
    * @throws UnsupportedOperationException if the destination differs from
    *         the one the producer was created with
    * @throws InvalidDestinationException if the destination is <code>null</code>
    *         or not a <code>SpyDestination</code>
    * @throws JMSException for any error while preparing or sending the message
    */
   public void send(Destination destination, Message message, int deliveryMode, int priority, long ttl)
         throws JMSException
   {
      checkClosed();

      if (this.destination != null && !this.destination.equals(destination))
      {
         throw new UnsupportedOperationException("Sending to " + destination
               + " not allowed when producer created with " + this.destination);
      }
      // instanceof is false for null, so this also rejects a missing destination.
      if (!(destination instanceof SpyDestination))
      {
         throw new InvalidDestinationException("Destination is not an instance of SpyDestination " + destination);
      }

      SpyMessage sendMessage;
      if (message instanceof SpyMessage)
      {
         sendMessage = (SpyMessage) message;
      }
      else
      {
         sendMessage = encapsulateMessage(message);
      }

      sendMessage.setJMSDestination(destination);
      sendMessage.setJMSDeliveryMode(deliveryMode);
      long ts = System.currentTimeMillis();
      // Computed once so the sent message and the caller's message always agree.
      long expiration = computeExpiration(ts, ttl);
      sendMessage.setJMSTimestamp(ts);
      sendMessage.setJMSExpiration(expiration);
      sendMessage.setJMSPriority(priority);
      String id = session.getNewMessageID();
      sendMessage.setJMSMessageID(id);

      // A foreign message was copied above; mirror the assigned headers onto
      // the caller's object as well.
      if (message != sendMessage)
      {
         message.setJMSDestination(destination);
         message.setJMSDeliveryMode(deliveryMode);
         message.setJMSTimestamp(ts);
         message.setJMSExpiration(expiration);
         message.setJMSPriority(priority);
         message.setJMSMessageID(id);
      }

      if (trace)
      {
         log.trace("Sending message " + this + " \n" + sendMessage);
      }
      session.sendMessage(sendMessage);
   }

   /**
    * Returns a description of the producer listing its destination, defaults,
    * hint flags and session.
    *
    * @return the string representation
    */
   public String toString()
   {
      String delivery = (defaultDeliveryMode == DeliveryMode.PERSISTENT) ? "persist" : "besteffort";

      StringBuffer text = new StringBuffer(100);
      text.append("SpyMessageProducer@").append(System.identityHashCode(this));
      text.append("[ dest=").append(destination);
      text.append(" delivery=").append(delivery);
      text.append(" priority=").append(defaultPriority);
      text.append(" ttl=").append(defaultTTL);
      text.append(" disableMessageID=").append(disableMessageID);
      text.append(" disableTS=").append(disableTS);
      text.append(" session=").append(session);
      text.append(']');
      return text.toString();
   }

   /**
    * Verifies that the producer has not been closed.
    *
    * @throws JMSException a <code>javax.jms.IllegalStateException</code> if
    *         {@link #close()} has been called
    */
   protected void checkClosed() throws JMSException
   {
      if (closed)
      {
         throw new IllegalStateException("Message producer is closed");
      }
   }

   /**
    * Copies a foreign JMS message into a <code>SpyMessage</code> obtained from
    * <code>MessagePool</code>.
    *
    * <p>The body is copied according to the first matching message interface
    * (bytes, map, stream, object, text); any other message gets a plain
    * message with no body. The correlation ID, reply-to, type and all
    * properties are copied afterwards. Map entries and properties that cannot
    * be copied are skipped and logged at trace level.
    *
    * @param message the foreign message to copy
    * @return a new <code>SpyMessage</code> carrying the copied content
    * @throws JMSException if reading from the original or writing to the copy
    *         fails in a way that is not skipped as described above
    */
   protected SpyMessage encapsulateMessage(Message message) throws JMSException
   {
      SpyMessage result;

      if (message instanceof BytesMessage)
      {
         result = MessagePool.getBytesMessage();
         BytesMessage source = (BytesMessage) message;
         BytesMessage copy = (BytesMessage) result;
         // reset() is called first so the body is read from its beginning (JMS API).
         source.reset();
         byte[] chunk = new byte[1024];
         // readBytes signals the end of the body with -1.
         for (int count = source.readBytes(chunk); count != -1; count = source.readBytes(chunk))
         {
            copy.writeBytes(chunk, 0, count);
         }
      }
      else if (message instanceof MapMessage)
      {
         result = MessagePool.getMapMessage();
         MapMessage source = (MapMessage) message;
         MapMessage copy = (MapMessage) result;
         Enumeration names = source.getMapNames();
         while (names.hasMoreElements())
         {
            String key = (String) names.nextElement();
            try
            {
               copy.setObject(key, source.getObject(key));
            }
            catch (JMSException ignored)
            {
               // Best effort: an entry that cannot be copied is dropped.
               if (trace)
               {
                  log.trace("Unable to copy map entry " + key, ignored);
               }
            }
         }
      }
      else if (message instanceof StreamMessage)
      {
         result = MessagePool.getStreamMessage();
         StreamMessage source = (StreamMessage) message;
         StreamMessage copy = (StreamMessage) result;
         source.reset();
         try
         {
            // The loop is ended by the MessageEOFException raised at the end of the stream.
            while (true)
            {
               copy.writeObject(source.readObject());
            }
         }
         catch (MessageEOFException expected)
         {
            // End of the original stream reached; the copy is complete.
         }
      }
      else if (message instanceof ObjectMessage)
      {
         result = MessagePool.getObjectMessage();
         ((ObjectMessage) result).setObject(((ObjectMessage) message).getObject());
      }
      else if (message instanceof TextMessage)
      {
         result = MessagePool.getTextMessage();
         ((TextMessage) result).setText(((TextMessage) message).getText());
      }
      else
      {
         result = MessagePool.getMessage();
      }

      // Prefer the string form of the correlation ID; fall back to the byte
      // form when the string accessors fail.
      try
      {
         result.setJMSCorrelationID(message.getJMSCorrelationID());
      }
      catch (JMSException e)
      {
         result.setJMSCorrelationIDAsBytes(message.getJMSCorrelationIDAsBytes());
      }
      result.setJMSReplyTo(message.getJMSReplyTo());
      result.setJMSType(message.getJMSType());

      Enumeration propertyNames = message.getPropertyNames();
      while (propertyNames.hasMoreElements())
      {
         String key = (String) propertyNames.nextElement();
         try
         {
            result.setObjectProperty(key, message.getObjectProperty(key));
         }
         catch (JMSException ignored)
         {
            // Best effort: a property that cannot be copied is dropped.
            if (trace)
            {
               log.trace("Unable to copy property " + key, ignored);
            }
         }
      }
      return result;
   }

   /**
    * Computes the expiration header for a message sent at the given time.
    *
    * <p>A time-to-live of 0 yields 0 (no expiration). Otherwise the result is
    * <code>timestamp + ttl</code>, except that a positive time-to-live whose
    * sum with a positive timestamp would overflow is capped at
    * <code>Long.MAX_VALUE</code> instead of wrapping to a negative value.
    *
    * @param timestamp the send time in milliseconds
    * @param ttl the time-to-live in milliseconds
    * @return the expiration time in milliseconds, or 0 for no expiration
    */
   private static long computeExpiration(long timestamp, long ttl)
   {
      if (ttl == 0)
      {
         return 0;
      }
      long expiration = timestamp + ttl;
      // Two positive operands producing a negative sum means the addition wrapped.
      if (ttl > 0 && timestamp > 0 && expiration < 0)
      {
         return Long.MAX_VALUE;
      }
      return expiration;
   }
}