package org.jboss.mq.server;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.HashMap;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
import javax.jms.JMSException;
import javax.management.MBeanRegistration;
import javax.management.ObjectName;
import org.jboss.mq.DurableSubscriptionID;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.pm.CacheStore;
import org.jboss.system.ServiceMBeanSupport;
public class MessageCache extends ServiceMBeanSupport implements MessageCacheMBean, MBeanRegistration, Runnable
{
// Multiplier used to convert the memory marks between megabytes and bytes.
// NOTE(review): evaluates to 1,024,000, which is neither 2^20 nor 10^6 - confirm this is intended.
public static final long ONE_MEGABYTE = 1024L * 1000;
// Default value of highMemoryMark, in bytes (50 * ONE_MEGABYTE).
public static final long DEFAULT_HIGH_MEMORY_MARK = ONE_MEGABYTE * 50;
// Default value of maxMemoryMark, in bytes (60 * ONE_MEGABYTE).
public static final long DEFAULT_MAX_MEMORY_MARK = ONE_MEGABYTE * 60;
// Recency-ordered list of the message references currently tracked as hard.
// Every access in this class happens inside synchronized (lruCache).
private LRUCache lruCache = new LRUCache();
// Incremented in addInternal; the result is handed to MessageReference.init.
private SynchronizedLong messageCounter = new SynchronizedLong(0);
// Statistics exposed through getCacheHits()/getCacheMisses(). They are not updated
// anywhere in this file; presumably maintained by MessageReference - verify.
long cacheHits = 0;
long cacheMisses = 0;
// Backing store used by loadFromStorage/saveToStorage/removeFromStorage.
// Resolved in startService and set back to null in stopService.
CacheStore cacheStore;
// JMX name of the MBean whose "Instance" attribute supplies cacheStore.
ObjectName cacheStoreName;
// Thread executing run(); created in startService, interrupted in stopService.
private Thread referenceSoftner;
// Heap usage (bytes) above which validateSoftReferenceDepth starts computing a softening severity.
private long highMemoryMark = DEFAULT_HIGH_MEMORY_MARK;
// Heap usage (bytes) at which the computed severity reaches its cap of 1.0.
private long maxMemoryMark = DEFAULT_MAX_MEMORY_MARK;
// When true, soften() counts each softened reference in softRefCacheSize.
private boolean makeSoftReferences = true;
// System.currentTimeMillis() value recorded by run() when a depth check reported nothing more to do.
private long lastSoften = 0L;
// Minimum interval between depth checks in run(); 0 disables the limit.
private long softenNoMoreOftenThanMillis = 0L;
// Maximum interval between depth checks in run(); 0 disables the forced check.
private long softenAtLeastEveryMillis = 0L;
// Timeout passed to referenceQueue.remove() in run().
private long softenWaitMillis = 1000L;
// validateSoftReferenceDepth does nothing while the hard count is <= this value.
private int minimumHard = 1;
// When > 0, a hard count above this value forces softening; 0 disables the cap.
private int maximumHard = 0;
// Incremented in soften(); decremented in run() once per reference taken from referenceQueue.
int softRefCacheSize = 0;
// Incremented in addInternal; decremented in removeInternal when clear is true.
int totalCacheSize = 0;
// Queue drained by run(). Presumably MessageReference registers its soft references here - verify.
ReferenceQueue referenceQueue = new ReferenceQueue();
// Number of messages softened by validateSoftReferenceDepth whose state was STORED afterwards.
long softenedSize = 0;
// When true, run() polls referenceQueue without blocking and performs a depth check.
// Also set by messageReferenceUsedEvent when a reference that was not hard is used.
boolean checkSoftReferenceDepth = false;
/**
 * Gives callers direct access to this cache object.
 *
 * @return this MessageCache
 */
public MessageCache getInstance()
{
   return this;
}
/**
 * Adds a message to the cache, taking the durable subscription id from the
 * message header ({@code message.header.durableSubscriberID}).
 *
 * @param message the message to cache
 * @param queue the queue passed on to the new reference
 * @param stored the initial stored state of the reference
 * @return the reference created by {@link #addInternal}
 * @throws javax.jms.JMSException propagated from {@link #addInternal}
 */
public MessageReference add(SpyMessage message, BasicQueue queue, int stored) throws javax.jms.JMSException
{
   return addInternal(message, queue, stored, message.header.durableSubscriberID);
}
/**
 * Adds a message to the cache with an explicitly supplied durable subscription id.
 *
 * @param message the message to cache
 * @param queue the queue passed on to the new reference
 * @param stored the initial stored state of the reference
 * @param id the durable subscription id passed on to the new reference
 * @return the reference created by {@link #addInternal}
 * @throws javax.jms.JMSException propagated from {@link #addInternal}
 */
public MessageReference add(SpyMessage message, BasicQueue queue, int stored, DurableSubscriptionID id) throws javax.jms.JMSException
{
   final MessageReference created = addInternal(message, queue, stored, id);
   return created;
}
/**
 * Creates a reference for the message, registers it as the most recently used
 * hard reference and then runs a soft-reference depth check.
 *
 * @param message the message to cache
 * @param queue the queue passed to {@code MessageReference.init}
 * @param stored the stored state applied through {@code setStored}
 * @param id the durable subscription id passed to {@code MessageReference.init}
 * @return the newly created reference
 * @throws javax.jms.JMSException propagated from {@link #validateSoftReferenceDepth()}
 */
public MessageReference addInternal(SpyMessage message, BasicQueue queue, int stored, DurableSubscriptionID id) throws javax.jms.JMSException
{
   final MessageReference ref = new MessageReference();
   ref.init(this, messageCounter.increment(), message, queue, id);
   ref.setStored(stored);

   // Same lock order as the rest of the class: the reference first, then the LRU list.
   synchronized (ref)
   {
      synchronized (lruCache)
      {
         lruCache.addMostRecent(ref);
         totalCacheSize += 1;
      }
   }

   validateSoftReferenceDepth();
   return ref;
}
/**
 * Removes a reference from the cache, clearing it and resetting it.
 *
 * @param mr the reference to remove
 * @throws JMSException propagated from {@link #removeInternal}
 */
public void remove(MessageReference mr) throws JMSException
{
   final boolean clear = true;
   final boolean reset = true;
   removeInternal(mr, clear, reset);
}
/**
 * Removes a reference from the cache and clears it, but does not call
 * {@code reset()} on it.
 *
 * @param mr the reference to remove
 * @throws JMSException propagated from {@link #removeInternal}
 */
public void removeDelayed(MessageReference mr) throws JMSException
{
   final boolean clear = true;
   final boolean reset = false;
   removeInternal(mr, clear, reset);
}
/**
 * Takes a reference out of the hard LRU list without clearing or resetting it,
 * and counts it as a soft reference when {@code makeSoftReferences} is enabled.
 *
 * @param mr the reference being softened
 * @throws JMSException propagated from {@link #removeInternal}
 */
void soften(MessageReference mr) throws JMSException
{
   removeInternal(mr, false, false);
   if (!makeSoftReferences)
   {
      return;
   }
   softRefCacheSize++;
}
/**
 * Shared removal logic, executed while holding the monitor of {@code mr}.
 *
 * Unless the reference is already in the REMOVED state, it is taken out of the
 * LRU list (only when it still holds a hard reference) and, if {@code clear}
 * is set, the total count is decremented and the reference is cleared.
 * Independently of that state, {@code reset} triggers {@code mr.reset()}.
 *
 * @param mr the reference to process
 * @param clear whether to decrement the total size and call {@code mr.clear()}
 * @param reset whether to call {@code mr.reset()}
 * @throws JMSException declared for callers; propagated from the reference operations
 */
protected void removeInternal(MessageReference mr, boolean clear, boolean reset) throws JMSException
{
   synchronized (mr)
   {
      final boolean alreadyRemoved = (mr.stored == MessageReference.REMOVED);
      if (!alreadyRemoved)
      {
         synchronized (lruCache)
         {
            // Only hard references live in the LRU list.
            if (mr.hardReference != null)
            {
               lruCache.remove(mr);
            }
            if (clear)
            {
               totalCacheSize--;
            }
         }
         if (clear)
         {
            mr.clear();
         }
      }
      if (reset)
      {
         mr.reset();
      }
   }
}
/**
 * Main loop of the reference-softener thread.
 *
 * Each pass drains {@code referenceQueue} (decrementing the soft reference count
 * once per dequeued reference), applies the two timing settings to decide whether
 * a depth check is due, and runs {@link #validateSoftReferenceDepth()} when it is.
 * The loop ends when the thread is interrupted or any other Throwable escapes.
 */
public void run()
{
   try
   {
      for (;;)
      {
         // With a depth check pending do not block; otherwise wait up to softenWaitMillis.
         Reference dequeued = checkSoftReferenceDepth
               ? referenceQueue.poll()
               : referenceQueue.remove(softenWaitMillis);

         if (dequeued != null)
         {
            // Account for the first reference and everything else already queued.
            do
            {
               softRefCacheSize--;
               dequeued = referenceQueue.poll();
            }
            while (dequeued != null);

            if (log.isTraceEnabled())
               log.trace("soft reference cache size is now: " + softRefCacheSize);
            checkSoftReferenceDepth = true;
         }

         final long now = System.currentTimeMillis();
         final long sinceLastSoften = now - lastSoften;

         // The "no more often than" limit wins over the "at least every" trigger.
         if (softenNoMoreOftenThanMillis > 0 && sinceLastSoften < softenNoMoreOftenThanMillis)
         {
            checkSoftReferenceDepth = false;
         }
         else if (softenAtLeastEveryMillis > 0 && sinceLastSoften > softenAtLeastEveryMillis)
         {
            checkSoftReferenceDepth = true;
         }

         if (checkSoftReferenceDepth)
         {
            checkSoftReferenceDepth = validateSoftReferenceDepth();
            // A false result is recorded as the time of the last completed softening pass.
            if (!checkSoftReferenceDepth)
            {
               lastSoften = now;
            }
         }
      }
   }
   catch (InterruptedException ignored)
   {
      // Interruption is how stopService ends this thread; fall through to exit.
   }
   catch (Throwable t)
   {
      log.error("Message Cache Thread Stopped: ", t);
   }
   log.debug("Thread exiting.");
}
/**
 * Decides whether hard references should be softened and, if so, softens the
 * least recently used one. The decision is repeated in a loop for as long as
 * the service state is STARTED.
 *
 * Returns true right after softening a message whose state is STORED
 * afterwards. Returns false when the service is not started, when the hard
 * count is at or below minimumHard, when no softening is required, or when the
 * message just processed is not STORED but is persistent. A message that is
 * neither STORED nor persistent after makeSoft() causes another loop pass.
 *
 * @return true if a message was softened and is STORED, false otherwise
 * @throws JMSException declared for callers; NOTE(review): presumably raised by makeSoft() - confirm
 */
boolean validateSoftReferenceDepth() throws JMSException
{
boolean trace = log.isTraceEnabled();
while (getState() == ServiceMBeanSupport.STARTED)
{
MessageReference messageToSoften = null;
// Pick the candidate while holding the LRU lock; the softening itself is done after releasing it.
synchronized (lruCache)
{
int softenCount = 0;
int hardCount = getHardRefCacheSize();
int softCount = getSoftRefCacheSize();
// Never go below the configured minimum number of hard references.
if (hardCount <= minimumHard)
return false;
// Heap currently in use, in bytes.
long currentMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
if (currentMem > highMemoryMark)
{
// Fraction of the way from the high mark to the max mark, capped at 1.0.
float severity = ((float) (currentMem - highMemoryMark)) / (maxMemoryMark - highMemoryMark);
severity = Math.min(severity, 1.0F);
if (trace)
log.trace("Memory usage serverity=" + severity);
// Target: 'severity' share of all in-memory messages should be soft.
int totalMessageInMem = hardCount + softCount;
int howManyShouldBeSoft = (int) ((totalMessageInMem) * severity);
softenCount = howManyShouldBeSoft - softCount;
}
// An explicit cap on hard references can raise the soften count further.
if (maximumHard > 0)
{
int removeCount = hardCount - maximumHard;
if (removeCount > 0 && removeCount > softenCount)
softenCount = removeCount;
}
// Cannot soften more references than are currently hard.
if (softenCount > hardCount)
{
if (trace)
log.trace("Soften count " + softenCount + " greater than hard references " + hardCount);
softenCount = hardCount;
}
// Note: a soften count of exactly 1 does not trigger softening unless the hard cap is exceeded.
if (softenCount > 1 || (maximumHard > 0 && hardCount > maximumHard))
{
if (trace)
log.trace("Need to soften " + softenCount + " messages");
// Only one message, the least recently used, is selected per loop pass.
Node node = lruCache.getLeastRecent();
messageToSoften = (MessageReference) node.data;
}
}
// Nothing selected: no softening is needed right now.
if (messageToSoften == null)
return false;
synchronized (messageToSoften)
{
// Skip references that were removed between selection and locking.
if (messageToSoften.messageCache != null && messageToSoften.stored != MessageReference.REMOVED)
{
messageToSoften.makeSoft();
if (messageToSoften.stored == MessageReference.STORED)
{
softenedSize++;
return true;
}
else if (messageToSoften.isPersistent())
{
// Persistent but not STORED after makeSoft(): stop here and report no progress.
return false;
}
}
else if (trace)
log.trace("not softening removed message " + messageToSoften);
}
}
// Service is no longer in the STARTED state.
return false;
}
/**
 * Records that a reference was just used by moving it to the most-recent end
 * of the LRU list, or by adding it there when it was not hard before. In the
 * latter case a soft-reference depth check is also requested.
 *
 * @param mh the reference that was used
 * @param wasHard true when the reference is already in the LRU list
 * @throws JMSException declared for callers
 */
void messageReferenceUsedEvent(MessageReference mh, boolean wasHard) throws JMSException
{
   synchronized (mh)
   {
      synchronized (lruCache)
      {
         if (!wasHard)
         {
            lruCache.addMostRecent(mh);
         }
         else
         {
            lruCache.makeMostRecent(mh);
         }
      }
   }

   // A reference turned hard again, so the hard count grew: ask run() to re-check.
   if (!wasHard)
   {
      checkSoftReferenceDepth = true;
   }
}
/**
 * Delegates to the cache store to load the message for a reference.
 *
 * @param mh the reference whose message is wanted
 * @return whatever {@code cacheStore.loadFromStorage} returns
 * @throws JMSException propagated from the cache store
 */
SpyMessage loadFromStorage(MessageReference mh) throws JMSException
{
   final SpyMessage loaded = cacheStore.loadFromStorage(mh);
   return loaded;
}
/**
 * Delegates to the cache store to save a message for a reference.
 *
 * @param mh the reference the message belongs to
 * @param message the message to save
 * @throws JMSException propagated from the cache store
 */
void saveToStorage(MessageReference mh, SpyMessage message) throws JMSException
{
   this.cacheStore.saveToStorage(mh, message);
}
/**
 * Delegates to the cache store to remove the stored message of a reference.
 *
 * @param mh the reference whose stored message should be removed
 * @throws JMSException propagated from the cache store
 */
void removeFromStorage(MessageReference mh) throws JMSException
{
   this.cacheStore.removeFromStorage(mh);
}
/**
 * Resolves the cache store by reading the "Instance" attribute of the MBean
 * named by {@code cacheStoreName}, then starts the softener thread as a daemon.
 *
 * @throws Exception if the attribute lookup fails
 */
protected void startService() throws Exception
{
   final Object store = getServer().getAttribute(cacheStoreName, "Instance");
   cacheStore = (CacheStore) store;

   final Thread softener = new Thread(this, "JBossMQ Cache Reference Softner");
   referenceSoftner = softener;
   softener.setDaemon(true);
   softener.start();
}
/**
 * Stops the softener thread by interrupting it and drops the cache store.
 *
 * The thread field may be null here, for example when startService failed
 * during the cache store lookup before the thread was created, or when this
 * method is invoked a second time. That case is tolerated instead of raising
 * a NullPointerException, so the cache store reference is always released.
 */
protected void stopService()
{
   synchronized (lruCache)
   {
      final Thread softener = referenceSoftner;
      referenceSoftner = null;
      if (softener != null)
      {
         // run() treats the interrupt as its signal to exit.
         softener.interrupt();
      }
   }
   cacheStore = null;
}
/**
 * Reads the current size of the LRU list while holding its lock.
 *
 * @return the number of entries in the LRU list
 */
public int getHardRefCacheSize()
{
   final int hardCount;
   synchronized (lruCache)
   {
      hardCount = lruCache.size();
   }
   return hardCount;
}
/**
 * @return the value of the {@code softenedSize} counter
 */
public long getSoftenedSize()
{
   return this.softenedSize;
}
/**
 * @return the value of the {@code softRefCacheSize} counter
 */
public int getSoftRefCacheSize()
{
   return this.softRefCacheSize;
}
/**
 * @return the value of the {@code totalCacheSize} counter
 */
public int getTotalCacheSize()
{
   return this.totalCacheSize;
}
/**
 * @return the value of the {@code cacheMisses} counter
 */
public long getCacheMisses()
{
   return this.cacheMisses;
}
/**
 * @return the value of the {@code cacheHits} counter
 */
public long getCacheHits()
{
   return this.cacheHits;
}
/**
 * @return the current {@code makeSoftReferences} setting
 */
public boolean getMakeSoftReferences()
{
   return this.makeSoftReferences;
}
/**
 * Stores the {@code makeSoftReferences} setting, which controls whether
 * {@code soften} increments the soft reference count.
 *
 * @param makeSoftReferences the new setting
 */
public void setMakeSoftReferences(boolean makeSoftReferences)
{
   final boolean requested = makeSoftReferences;
   this.makeSoftReferences = requested;
}
/**
 * @return the configured minimum number of hard references
 */
public int getMinimumHard()
{
   return this.minimumHard;
}
/**
 * Sets the minimum number of hard references; values below 1 are stored as 1.
 *
 * @param minimumHard the requested minimum
 */
public void setMinimumHard(int minimumHard)
{
   this.minimumHard = Math.max(minimumHard, 1);
}
/**
 * @return the configured maximum number of hard references (0 means no cap)
 */
public int getMaximumHard()
{
   return this.maximumHard;
}
/**
 * Sets the maximum number of hard references; negative values are stored as 0.
 *
 * @param maximumHard the requested maximum
 */
public void setMaximumHard(int maximumHard)
{
   this.maximumHard = Math.max(maximumHard, 0);
}
/**
 * @return the timeout, in milliseconds, used when waiting on the reference queue
 */
public long getSoftenWaitMillis()
{
   return this.softenWaitMillis;
}
/**
 * Sets the reference queue wait timeout; values below 1000 are stored as 1000.
 *
 * @param millis the requested timeout in milliseconds
 */
public void setSoftenWaitMillis(long millis)
{
   softenWaitMillis = Math.max(millis, 1000L);
}
/**
 * @return the minimum interval between depth checks, in milliseconds
 */
public long getSoftenNoMoreOftenThanMillis()
{
   return this.softenNoMoreOftenThanMillis;
}
/**
 * Sets the minimum interval between depth checks; negative values are stored as 0.
 *
 * @param millis the requested interval in milliseconds
 */
public void setSoftenNoMoreOftenThanMillis(long millis)
{
   softenNoMoreOftenThanMillis = Math.max(millis, 0L);
}
/**
 * @return the maximum interval between depth checks, in milliseconds
 */
public long getSoftenAtLeastEveryMillis()
{
   return this.softenAtLeastEveryMillis;
}
/**
 * Sets the maximum interval between depth checks; negative values are stored as 0.
 *
 * @param millis the requested interval in milliseconds
 */
public void setSoftenAtLeastEveryMillis(long millis)
{
   softenAtLeastEveryMillis = Math.max(millis, 0L);
}
/**
 * @return the high memory mark divided by {@code ONE_MEGABYTE}
 */
public long getHighMemoryMark()
{
   final long markInBytes = this.highMemoryMark;
   return markInBytes / ONE_MEGABYTE;
}
/**
 * Sets the high memory mark. Positive values are multiplied by
 * {@code ONE_MEGABYTE} before being stored; anything else is stored as 0.
 *
 * @param highMemoryMark the requested mark, in units of {@code ONE_MEGABYTE}
 */
public void setHighMemoryMark(long highMemoryMark)
{
   this.highMemoryMark = (highMemoryMark > 0) ? highMemoryMark * ONE_MEGABYTE : 0;
}
/**
 * @return the max memory mark divided by {@code ONE_MEGABYTE}
 */
public long getMaxMemoryMark()
{
   final long markInBytes = this.maxMemoryMark;
   return markInBytes / ONE_MEGABYTE;
}
/**
 * Sets the max memory mark. Positive values are multiplied by
 * {@code ONE_MEGABYTE} before being stored; anything else is stored as 0.
 *
 * @param maxMemoryMark the requested mark, in units of {@code ONE_MEGABYTE}
 */
public void setMaxMemoryMark(long maxMemoryMark)
{
   this.maxMemoryMark = (maxMemoryMark > 0) ? maxMemoryMark * ONE_MEGABYTE : 0;
}
/**
 * Reports the heap currently in use (total minus free memory of the runtime).
 *
 * @return the used heap divided by {@code ONE_MEGABYTE}
 */
public long getCurrentMemoryUsage()
{
   final long total = Runtime.getRuntime().totalMemory();
   final long free = Runtime.getRuntime().freeMemory();
   return (total - free) / ONE_MEGABYTE;
}
/**
 * @return the fixed service name of this MBean
 */
public String getName()
{
   final String serviceName = "MessageCache";
   return serviceName;
}
/**
 * Stores the JMX name of the cache store MBean. The name is only resolved
 * to an actual store when {@code startService} runs.
 *
 * @param cacheStoreName the object name of the cache store MBean
 */
public void setCacheStore(ObjectName cacheStoreName)
{
   final ObjectName requested = cacheStoreName;
   this.cacheStoreName = requested;
}
/**
 * @return the JMX name of the cache store MBean, as last set
 */
public ObjectName getCacheStore()
{
   return this.cacheStoreName;
}
/**
 * Recency-ordered collection: a doubly linked list of nodes running from the
 * least recent to the most recent entry, plus a map from each object to its
 * node so entries can be located without walking the list.
 *
 * The class does no locking of its own.
 */
class LRUCache
{
   int currentSize = 0;
   HashMap map = new HashMap();
   Node mostRecent = null;
   Node leastRecent = null;

   /**
    * Creates a node for the object and records it in the map. If the object is
    * already mapped, the previous mapping is restored and the add is rejected.
    */
   private Node register(Object o)
   {
      Node fresh = new Node();
      fresh.data = o;
      Object previous = map.put(o, fresh);
      if (previous != null)
      {
         map.put(o, previous);
         throw new RuntimeException("Can't add object '" + o + "' to LRUCache that is already in cache.");
      }
      return fresh;
   }

   /**
    * Inserts an object at the most-recent end of the list.
    *
    * @throws RuntimeException if the object is already in the cache
    */
   public void addMostRecent(Object o)
   {
      Node fresh = register(o);
      if (mostRecent != null)
      {
         fresh.lessRecent = mostRecent;
         mostRecent.moreRecent = fresh;
      }
      else
      {
         // First entry: it is both ends of the list.
         leastRecent = fresh;
      }
      mostRecent = fresh;
      currentSize++;
   }

   /**
    * Inserts an object at the least-recent end of the list.
    *
    * @throws RuntimeException if the object is already in the cache
    */
   public void addLeastRecent(Object o)
   {
      Node fresh = register(o);
      if (leastRecent != null)
      {
         fresh.moreRecent = leastRecent;
         leastRecent.lessRecent = fresh;
      }
      else
      {
         // First entry: it is both ends of the list.
         mostRecent = fresh;
      }
      leastRecent = fresh;
      currentSize++;
   }

   /**
    * Unlinks an object from the list and drops it from the map.
    *
    * @throws RuntimeException if the object is not in the cache
    */
   public void remove(Object o)
   {
      Node node = (Node) map.remove(o);
      if (node == null)
         throw new RuntimeException("Can't remove object '" + o + "' that is not in cache.");

      Node newer = node.moreRecent;
      Node older = node.lessRecent;

      if (newer != null)
      {
         newer.lessRecent = older;
      }
      else
      {
         // Removed the head: the next older node becomes most recent.
         mostRecent = older;
         if (older != null)
            older.moreRecent = null;
      }

      if (older != null)
      {
         older.moreRecent = newer;
      }
      else
      {
         // Removed the tail: the next newer node becomes least recent.
         leastRecent = newer;
         if (newer != null)
            newer.lessRecent = null;
      }

      currentSize--;
   }

   /**
    * Moves an object that is already cached to the most-recent end of the list.
    *
    * @throws RuntimeException if the object is not in the cache
    */
   public void makeMostRecent(Object o)
   {
      Node node = (Node) map.get(o);
      if (node == null)
         throw new RuntimeException("Can't make most recent object '" + o + "' that is not in cache.");

      Node newer = node.moreRecent;
      if (newer == null)
      {
         // Already at the most-recent end.
         return;
      }
      Node older = node.lessRecent;

      // Unlink from the current position.
      newer.lessRecent = older;
      if (older != null)
         older.moreRecent = newer;
      else
         leastRecent = newer;

      // Relink in front of the current head.
      node.moreRecent = null;
      node.lessRecent = mostRecent;
      mostRecent.moreRecent = node;
      mostRecent = node;
   }

   /** @return the number of entries in the list */
   public int size()
   {
      return currentSize;
   }

   /** @return the node at the most-recent end, or null when empty */
   public Node getMostRecent()
   {
      return mostRecent;
   }

   /** @return the node at the least-recent end, or null when empty */
   public Node getLeastRecent()
   {
      return leastRecent;
   }
}

/**
 * List node used by LRUCache: links to the neighbouring nodes in both
 * directions and the cached object itself.
 */
class Node
{
   Node moreRecent = null;
   Node lessRecent = null;
   Object data = null;
}
}