package org.jboss.cache.invalidation;
import javax.transaction.Transaction;
import org.jboss.logging.Logger;
import org.jboss.tm.TransactionLocal;
import java.util.HashMap;
import javax.transaction.Synchronization;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
public class InvalidationsTxGrouper
{
private static final TransactionLocal synchLocal = new TransactionLocal();
static Logger log = Logger.getLogger(InvalidationsTxGrouper.class);
public static void registerInvalidationSynchronization(Transaction tx, InvalidationGroup group, Serializable key)
throws Exception
{
InvalidatorSynchronization synch = (InvalidatorSynchronization) synchLocal.get(tx);
if(synch == null)
{
synch = new InvalidatorSynchronization(tx);
synchLocal.set(tx, synch);
tx.registerSynchronization(synch);
}
synch.addInvalidation(group, key);
}
}
class InvalidatorSynchronization
implements Synchronization
{
protected Transaction tx;
protected HashMap ids = new HashMap();
InvalidatorSynchronization(Transaction tx)
{
this.tx = tx;
}
public void addInvalidation(InvalidationGroup group, Serializable key)
{
InvalidationManagerMBean im = group.getInvalidationManager();
Map relatedInvalidationMgr;
synchronized(ids)
{
relatedInvalidationMgr = (HashMap) ids.get(im);
if(relatedInvalidationMgr == null)
{
relatedInvalidationMgr = new HashMap();
ids.put(im, relatedInvalidationMgr);
}
}
Set relatedInvalidations;
synchronized(relatedInvalidationMgr)
{
relatedInvalidations = (HashSet) relatedInvalidationMgr.get(group);
if(relatedInvalidations == null)
{
relatedInvalidations = new HashSet();
relatedInvalidationMgr.put(group, relatedInvalidations);
}
}
relatedInvalidations.add(key);
}
public void beforeCompletion()
{
}
public void afterCompletion(int status)
{
ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
try
{
try
{
sendBatchInvalidations();
}
catch(Exception ex)
{
InvalidationsTxGrouper.log.warn("Failed sending invalidations messages", ex);
}
}
finally
{
Thread.currentThread().setContextClassLoader(oldCl);
}
}
protected void sendBatchInvalidations()
{
boolean trace = InvalidationsTxGrouper.log.isTraceEnabled();
if(trace)
{
InvalidationsTxGrouper.log.trace("Begin sendBatchInvalidations, tx=" + tx);
}
Iterator imIter = ids.keySet().iterator();
while(imIter.hasNext())
{
InvalidationManagerMBean im = (InvalidationManagerMBean) imIter.next();
HashMap relatedInvalidationMgr = (HashMap) ids.get(im);
BatchInvalidation[] bomb = new BatchInvalidation[relatedInvalidationMgr.size()];
Iterator groupsIter = relatedInvalidationMgr.keySet().iterator();
int i = 0;
while(groupsIter.hasNext())
{
InvalidationGroup group = (InvalidationGroup) groupsIter.next();
HashSet sourceIds = (HashSet) relatedInvalidationMgr.get(group);
String groupName = group.getGroupName();
if(trace)
{
InvalidationsTxGrouper.log.trace("Adding ids to bomb(" + groupName + "): " + sourceIds);
}
Serializable[] ids = new Serializable[sourceIds.size()];
sourceIds.toArray(ids);
BatchInvalidation batch = new BatchInvalidation(ids, groupName);
bomb[i] = batch;
i++;
}
im.batchInvalidate(bomb);
}
if(trace)
{
InvalidationsTxGrouper.log.trace("End sendBatchInvalidations, tx=" + tx);
}
this.ids = null;
}
}