package org.jboss.web.tomcat.tc5.session;
import java.io.IOException;
import java.util.*;
import javax.management.ObjectName;
import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
import org.jboss.cache.TreeCacheListener;
import org.jboss.cache.TreeCacheMBean;
import org.jboss.cache.lock.TimeoutException;
import org.jboss.invocation.MarshalledValue;
import org.jboss.logging.Logger;
import org.jboss.mx.util.MBeanProxyExt;
import org.jboss.web.tomcat.tc5.Tomcat5;
import org.jgroups.View;
public class JBossCacheService implements TreeCacheListener
{
private TreeCacheMBean proxy_;
private ObjectName cacheServiceName_;
protected static Logger log_ = Logger.getLogger(JBossCacheService.class);
public static final String SESSION = "JSESSION";
public static final String ATTRIBUTE = "ATTRIBUTE";
public static final String KEY = "ATRR_KEY";
private static final int RETRY = 3;
private static final String VERSION_KEY = "VERSION";
private String jvmRoute_;
private String webAppPath_;
private List newSessionIDList_;
private ClassLoader tcl_;
private JBossCacheManager manager_;
public JBossCacheService() throws ClusteringNotSupportedException
{
try
{
cacheServiceName_ = new ObjectName(Tomcat5.DEFAULT_CACHE_NAME);
proxy_ = (TreeCacheMBean) MBeanProxyExt.create(TreeCacheMBean.class, cacheServiceName_);
if (proxy_ == null)
{
throw new RuntimeException("JBossCacheService: locate null TomcatCacheMbean");
}
newSessionIDList_ = new ArrayList();
}
catch (Throwable e)
{
String str = cacheServiceName_ + " service to Tomcat clustering not found";
log_.error(str);
throw new ClusteringNotSupportedException(str);
}
}
public void start(ClassLoader tcl, JBossCacheManager manager)
{
tcl_ = tcl;
manager_ = manager;
jvmRoute_ = null;
proxy_.addTreeCacheListener(this);
String path = manager_.getContainer().getName();
if( path.length() == 0 || path.equals("/")) {
webAppPath_ = "ROOT";
} if ( path.startsWith("/") ) {
webAppPath_ = path.substring(1);
} else {
webAppPath_ = path;
}
log_.debug("Old and new web app path are: " +path + ", " +webAppPath_);
}
public void stop()
{
proxy_.removeTreeCacheListener(this);
}
public List findSessionIDs()
{
List ids = new ArrayList();
try {
Object[] objs = new Object[]{SESSION, webAppPath_};
Fqn path = new Fqn( objs );
Set names = proxy_.getChildrenNames(path);
if( names == null ) return ids;
for(Iterator it = names.iterator(); it.hasNext();) {
Object id = it.next();
if(id==null) continue;
ids.add(id);
if(log_.isTraceEnabled()) {
log_.trace("Retrieving through web app path with fqn: " +path + " and session id: " +id);
}
}
} catch (CacheException e) {
e.printStackTrace();
throw new RuntimeException("JBossCacheService: exception occurred in cache getChildrenNames ... ", e);
}
return ids;
}
public Object getSession(String id)
{
String realId = stripJvmRoute(id);
Fqn fqn = getSessionFqn(realId);
return getUnMarshalledValue(_get(fqn, realId));
}
public void putSession(String id, Object session)
{
String realId = stripJvmRoute(id);
Fqn fqn = getSessionFqn(realId);
_put(fqn, realId, getMarshalledValue(session));
_put(fqn, VERSION_KEY, new Integer(((ClusteredSession)session).getVersion()));
}
public Object removeSession(String id)
{
String realId = stripJvmRoute(id);
Fqn fqn = getSessionFqn(realId);
if (log_.isDebugEnabled())
{
log_.debug("Remove session from distributed store. Fqn: " + fqn);
}
Object obj = getUnMarshalledValue(_remove(fqn, realId));
_remove(fqn);
return obj;
}
public void removeSessionLocal(String id)
{
String realId = stripJvmRoute(id);
Fqn fqn = getSessionFqn(realId);
if (log_.isDebugEnabled())
{
log_.debug("Remove session from my own distributed store only. Fqn: " + fqn);
}
_evict(fqn);
}
public boolean exists(String id)
{
String realId = stripJvmRoute(id);
Fqn fqn = getSessionFqn(realId);
return proxy_.exists(fqn);
}
public Object getAttribute(String id, String key)
{
String realId = stripJvmRoute(id);
Fqn fqn = getAttributeFqn(realId);
return getUnMarshalledValue(_get(fqn, key));
}
public Object putAttribute(String id, String key, Object value)
{
String realId = stripJvmRoute(id);
Fqn fqn = getAttributeFqn(realId);
return _put(fqn, key, getMarshalledValue(value));
}
public void putAttribute(String id, Map map)
{
String realId = stripJvmRoute(id);
Fqn fqn = getAttributeFqn(realId);
Set set = map.keySet();
Iterator it = set.iterator();
while (it.hasNext())
{
String key = (String) it.next();
_put(fqn, key, getMarshalledValue(map.get(key)));
}
}
public void removeAttributes(String id)
{
String realId = stripJvmRoute(id);
Fqn fqn = getAttributeFqn(realId);
_remove(fqn);
}
public Object removeAttribute(String id, String key)
{
String realId = stripJvmRoute(id);
Fqn fqn = getAttributeFqn(realId);
if (log_.isDebugEnabled())
{
log_.debug("Remove attribute from distributed store. Fqn: " + fqn + " key: " + key);
}
return getUnMarshalledValue(_remove(fqn, key));
}
public void removeAttributeLocal(String id)
{
String realId = stripJvmRoute(id);
Fqn fqn = getAttributeFqn(realId);
if (log_.isDebugEnabled())
{
log_.debug("Remove attributes from my own distributed store only. Fqn: " + fqn);
}
_evict(fqn);
}
public Set getAttributeKeys(String id)
{
if (id == null || id.length() == 0)
throw new IllegalArgumentException("JBossCacheService: id is either null or empty");
String realId = stripJvmRoute(id);
Fqn fqn = getAttributeFqn(realId);
try
{
return proxy_.getKeys(fqn);
}
catch (CacheException e)
{
e.printStackTrace();
}
return null;
}
public Map getAttributes(String id)
{
if (id == null || id.length() == 0) return new HashMap();
Set set = getAttributeKeys(id);
String realId = stripJvmRoute(id);
Fqn fqn = getAttributeFqn(realId);
Map map = new HashMap();
if(set == null) return map;
for (Iterator it = set.iterator(); it.hasNext();)
{
String key = (String) it.next();
Object value = getAttribute(id, key);
map.put(key, value);
}
return map;
}
public List getNewSessionsInStore()
{
List list = new ArrayList();
synchronized(newSessionIDList_)
{
if(newSessionIDList_.size() != 0)
{
list.addAll(newSessionIDList_);
newSessionIDList_.clear();
}
}
return list;
}
protected Object _get(Fqn fqn, String id)
{
Exception ex = null;
for (int i = 0; i < RETRY; i++)
{
try
{
return proxy_.get(fqn, id);
}
catch (TimeoutException e)
{
e.printStackTrace();
ex = e;
}
catch (Exception e)
{
e.printStackTrace();
throw new RuntimeException("JBossCacheService: exception occurred in cache get ... ", e);
}
}
throw new RuntimeException("JBossCacheService: exception occurred in cache get after retry ... ", ex);
}
protected Object _put(Fqn fqn, String id, Object value)
{
Exception ex = null;
for (int i = 0; i < RETRY; i++)
{
try
{
return proxy_.put(fqn, id, value);
}
catch (TimeoutException e)
{
e.printStackTrace();
ex = e;
}
catch (Exception e)
{
e.printStackTrace();
throw new RuntimeException("JBossCacheService: exception occurred in cache put ... ", e);
}
}
throw new RuntimeException("JBossCacheService: exception occurred in cache put after retry ... ", ex);
}
protected void _put(Fqn fqn, Map map)
{
Exception ex = null;
for (int i = 0; i < RETRY; i++)
{
try
{
proxy_.put(fqn, map);
return;
}
catch (TimeoutException e)
{
e.printStackTrace();
ex = e;
}
catch (Exception e)
{
e.printStackTrace();
throw new RuntimeException("JBossCacheService: exception occurred in cache put ... ", e);
}
}
throw new RuntimeException("JBossCacheService: exception occurred in cache put after retry ... ", ex);
}
protected Object _remove(Fqn fqn, String id)
{
Exception ex = null;
for (int i = 0; i < RETRY; i++)
{
try
{
return proxy_.remove(fqn, id);
}
catch (TimeoutException e)
{
e.printStackTrace();
ex = e;
}
catch (Exception e)
{
e.printStackTrace();
throw new RuntimeException("JBossCacheService: exception occurred in cache remove ... ", e);
}
}
throw new RuntimeException("JBossCacheService: exception occurred in cache remove after retry ... ", ex);
}
protected void _remove(Fqn fqn)
{
Exception ex = null;
for (int i = 0; i < RETRY; i++)
{
try
{
proxy_.remove(fqn);
return;
}
catch (TimeoutException e)
{
e.printStackTrace();
ex = e;
}
catch (Exception e)
{
e.printStackTrace();
throw new RuntimeException("JBossCacheService: exception occurred in cache remove ... ", e);
}
}
throw new RuntimeException("JBossCacheService: exception occurred in cache remove after retry ... ", ex);
}
protected void _evict(Fqn fqn)
{
Exception ex = null;
for (int i = 0; i < RETRY; i++)
{
try
{
proxy_.evict(fqn);
return;
}
catch (TimeoutException e)
{
e.printStackTrace();
ex = e;
}
catch (Exception e)
{
e.printStackTrace();
throw new RuntimeException("JBossCacheService: exception occurred in cache evict ... ", e);
}
}
throw new RuntimeException("JBossCacheService: exception occurred in cache evict after retry ... ", ex);
}
private String stripJvmRoute(String id)
{
int index = id.indexOf(".");
if (index > 0)
{
if(jvmRoute_ == null)
jvmRoute_ = id.substring(index);
return id.substring(0, index);
}
else
{
return id;
}
}
private Fqn getSessionFqn(String id)
{
Object[] objs = new Object[]{SESSION, webAppPath_, id};
return new Fqn(objs);
}
private Fqn getAttributeFqn(String id)
{
Object[] objs = new Object[]{SESSION, webAppPath_, id, ATTRIBUTE};
return new Fqn(objs);
}
private Object getMarshalledValue(Object value)
{
try
{
return new MarshalledValue(value);
}
catch (IOException e)
{
e.printStackTrace();
return null;
}
}
private Object getUnMarshalledValue(Object mv)
{
if (mv == null) return null;
ClassLoader prevTCL = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(tcl_);
try
{
return ((MarshalledValue) mv).get();
}
catch (IOException e)
{
e.printStackTrace();
return null;
}
catch (ClassNotFoundException e)
{
e.printStackTrace();
return null;
}
finally
{
Thread.currentThread().setContextClassLoader(prevTCL);
}
}
public void nodeCreated(Fqn fqn)
{
}
public void nodeRemoved(Fqn fqn)
{
nodeDirty(fqn);
}
public void nodeLoaded(Fqn fqn)
{
}
public void nodeModified(Fqn fqn)
{
nodeDirty(fqn);
}
protected void nodeDirty(Fqn fqn)
{
Integer version = (Integer)_get(fqn, VERSION_KEY);
if(version != null)
{
String realId = getIdFromFqn(fqn);
ClusteredSession session = (ClusteredSession)manager_.findLocalSession(realId);
if( session != null )
{
if( session.isNewData(version.intValue()))
{
session.setIsOutdated(true);
if(log_.isDebugEnabled())
{
log_.debug("nodeDirty(): session in-memory data is invalidated with id: " +realId
+ " and verion id: " +version.intValue());
}
}
} else
{
synchronized(newSessionIDList_)
{
if(!newSessionIDList_.contains(realId))
newSessionIDList_.add(realId);
}
}
}
}
protected String getIdFromFqn(Fqn fqn)
{
return (String)fqn.get(fqn.size()-1);
}
public void nodeVisited(Fqn fqn)
{
}
public void cacheStarted(TreeCache cache)
{
}
public void cacheStopped(TreeCache cache)
{
}
public void viewChange(View new_view)
{
}
public void nodeEvicted(Fqn fqn)
{
}
}