package org.jboss.mq.server;
import java.lang.ref.SoftReference;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import org.jboss.logging.Logger;
import org.jboss.mq.DurableSubscriptionID;
import org.jboss.mq.SpyMessage;
public class MessageReference implements Comparable
{
static Logger log = Logger.getLogger(MessageReference.class);
public static final int NOT_STORED = 1;
public static final int STORED = 2;
public static final int REMOVED = 3;
public long referenceId;
public SpyMessage hardReference;
public byte jmsPriority;
public long messageId;
public int jmsDeliveryMode;
public long messageScheduledDelivery;
public long messageExpiration;
public boolean redelivered;
public long redeliveryDelay;
public int redeliveryCount;
public BasicQueue queue;
public MessageCache messageCache;
public SoftReference softReference;
public DurableSubscriptionID durableSubscriberID;
public int stored;
transient public Object persistData;
MessageReference()
{
}
void init(MessageCache messageCache, long referenceId, SpyMessage message, BasicQueue queue, DurableSubscriptionID id) throws JMSException
{
this.messageCache = messageCache;
this.hardReference = message;
this.referenceId = referenceId;
this.jmsPriority = (byte) message.getJMSPriority();
this.messageId = message.header.messageId;
this.stored = NOT_STORED;
this.jmsDeliveryMode = message.header.jmsDeliveryMode;
if (message.propertyExists(SpyMessage.PROPERTY_SCHEDULED_DELIVERY))
this.messageScheduledDelivery = message.getLongProperty(SpyMessage.PROPERTY_SCHEDULED_DELIVERY);
if (message.propertyExists(SpyMessage.PROPERTY_REDELIVERY_DELAY))
this.redeliveryDelay = message.getLongProperty(SpyMessage.PROPERTY_REDELIVERY_DELAY);
else if (queue.parameters.redeliveryDelay > 0)
this.redeliveryDelay = queue.parameters.redeliveryDelay;
if (queue.parameters.redeliveryLimit > -1 && !message.propertyExists(SpyMessage.PROPERTY_REDELIVERY_LIMIT))
message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_LIMIT,
new Integer(queue.parameters.redeliveryLimit));
this.messageExpiration = message.getJMSExpiration();
this.redelivered = message.getJMSRedelivered();
if (message.propertyExists(SpyMessage.PROPERTY_REDELIVERY_COUNT))
this.redeliveryCount = message.getIntProperty(SpyMessage.PROPERTY_REDELIVERY_COUNT);
this.durableSubscriberID = id;
this.queue = queue;
this.persistData = null;
}
void reset()
{
if (softReference != null && softReference.get() != null)
messageCache.softRefCacheSize--;
this.messageCache = null;
this.hardReference = null;
this.softReference = null;
this.queue = null;
this.stored = 0;
this.jmsDeliveryMode = 0;
this.persistData = null;
this.durableSubscriberID = null;
}
public SpyMessage getMessageForDelivery() throws JMSException
{
SpyMessage message = getMessage();
if (queue.parameters.lateClone)
{
message = message.myClone();
message.header.durableSubscriberID = durableSubscriberID;
message.header.jmsRedelivered = redelivered;
message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer(redeliveryCount));
}
return message;
}
public SpyMessage getMessage() throws JMSException
{
SpyMessage result = null;
synchronized (this)
{
if (hardReference == null)
{
makeHard();
result = hardReference;
messageCache.messageReferenceUsedEvent(this, false);
}
else
{
result = hardReference;
messageCache.cacheHits++;
messageCache.messageReferenceUsedEvent(this, true);
}
return result;
}
}
public void redelivered() throws JMSException
{
this.redelivered = true;
if (redeliveryDelay != 0)
{
log.trace("message has redelivery delay");
messageScheduledDelivery = System.currentTimeMillis() + redeliveryDelay;
}
++redeliveryCount;
if (queue.parameters.lateClone == false)
{
SpyMessage message = getMessage();
message.setJMSRedelivered(redelivered);
message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer(redeliveryCount));
}
}
public boolean isExpired()
{
if (messageExpiration == 0)
return false;
long ts = System.currentTimeMillis();
return messageExpiration < ts;
}
public boolean isPersistent()
{
return queue instanceof PersistentQueue && jmsDeliveryMode == DeliveryMode.PERSISTENT;
}
public boolean inMemory()
{
return queue.parameters.inMemory;
}
public String getPersistentKey()
{
return queue.getDescription();
}
public boolean isLateClone()
{
return queue.parameters.lateClone;
}
public SpyMessage.Header getHeaders() throws JMSException
{
return getMessage().header;
}
void clear() throws JMSException
{
synchronized (this)
{
if (stored == STORED)
messageCache.removeFromStorage(this);
stored = MessageReference.REMOVED;
}
}
public void invalidate() throws JMSException
{
synchronized (this)
{
if (stored == STORED)
{
if (hardReference == null)
{
makeHard();
messageCache.messageReferenceUsedEvent(this, false);
}
messageCache.removeFromStorage(this);
}
}
}
public void removeDelayed() throws JMSException
{
messageCache.removeDelayed(this);
}
void makeSoft() throws JMSException
{
boolean trace = log.isTraceEnabled();
synchronized (this)
{
if (stored == REMOVED)
throw new JMSException("CACHE ERROR: makeSoft() on a removed message " + this);
if (softReference != null)
{
if (stored == NOT_STORED)
throw new JMSException("CACHE ERROR: soft reference to unstored message " + this);
return;
}
if (stored == NOT_STORED)
messageCache.saveToStorage(this, hardReference);
if (stored != STORED)
{
if (trace)
log.trace("saveToStorage rejected by cache " + toString());
return;
}
if (messageCache.getMakeSoftReferences())
softReference = new SoftReference(hardReference, messageCache.referenceQueue);
messageCache.soften(this);
hardReference = null;
}
}
public void setStored(int stored)
{
this.stored = stored;
}
void makeHard() throws JMSException
{
synchronized (this)
{
if (stored == REMOVED)
throw new JMSException("CACHE ERROR: makeHard() on a removed message " + this);
if (hardReference != null)
return;
if (softReference != null)
hardReference = (SpyMessage) softReference.get();
if (hardReference == null)
{
hardReference = messageCache.loadFromStorage(this);
messageCache.cacheMisses++;
}
else
{
messageCache.cacheHits++;
}
if (softReference != null && softReference.get() != null)
messageCache.softRefCacheSize--;
softReference = null;
}
}
public boolean equals(Object o)
{
try
{
return referenceId == ((MessageReference) o).referenceId;
}
catch (Throwable e)
{
return false;
}
}
public int compareTo(Object o)
{
MessageReference sm = (MessageReference) o;
if (jmsPriority > sm.jmsPriority)
{
return -1;
}
if (jmsPriority < sm.jmsPriority)
{
return 1;
}
return (int) (messageId - sm.messageId);
}
public String toString()
{
StringBuffer buffer = new StringBuffer(100);
if (messageCache == null)
buffer.append(" NOT IN CACHE hashCode=").append(hashCode());
else
{
buffer.append(referenceId);
buffer.append(" msg=").append(messageId);
if (hardReference != null)
buffer.append(" hard");
if (softReference != null)
buffer.append(" soft");
switch (stored)
{
case NOT_STORED :
buffer.append(" NOT_STORED");
break;
case STORED :
buffer.append(" STORED");
break;
case REMOVED :
buffer.append(" REMOVED");
break;
}
switch (jmsDeliveryMode)
{
case DeliveryMode.NON_PERSISTENT :
buffer.append(" NON_PERSISTENT");
break;
case DeliveryMode.PERSISTENT :
buffer.append(" PERSISTENT");
break;
}
if (persistData != null)
buffer.append(" persistData=").append(persistData);
if (queue != null)
buffer.append(" queue=").append(queue.getDescription());
else
buffer.append(" NO_QUEUE");
buffer.append(" priority=").append(jmsPriority);
buffer.append(" lateClone=").append(isLateClone());
buffer.append(" hashCode=").append(hashCode());
}
return buffer.toString();
}
}