package org.jboss.ha.jmx;
import java.io.Serializable;
import java.util.List;
import java.util.Set;
import javax.management.Notification;
import javax.management.ObjectInstance;
import javax.management.Query;
import javax.management.QueryExp;
import javax.management.InstanceNotFoundException;
import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
import org.jboss.ha.framework.interfaces.DistributedState;
import org.jboss.ha.framework.interfaces.HAPartition;
import org.jboss.ha.framework.server.ClusterPartition;
import org.jboss.ha.framework.server.ClusterPartitionMBean;
import org.jboss.mx.util.MBeanProxy;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.system.server.ServerConfigUtil;
public class HAServiceMBeanSupport
extends ServiceMBeanSupport
implements HAServiceMBean
{
private HAPartition partition_;
private String partitionName = ServerConfigUtil.getDefaultPartitionName();
private DistributedReplicantManager.ReplicantListener drmListener = null;
private String REPLICANT_TOKEN = "";
public HAServiceMBeanSupport()
{
}
public String getPartitionName()
{
return partitionName;
}
public void setPartitionName(String newPartitionName)
{
if ((getState() != STARTED) && (getState() != STARTING))
{
partitionName = newPartitionName;
}
}
public void setDistributedState(String key, Serializable value)
throws Exception
{
DistributedState ds = getPartition().getDistributedStateService();
ds.set(getServiceHAName(), key, value);
}
public Serializable getDistributedState(String key)
{
DistributedState ds = getPartition().getDistributedStateService();
return ds.get(getServiceHAName(), key);
}
protected void startService() throws Exception
{
boolean debug = log.isDebugEnabled();
if (debug)
log.debug("start HAServiceMBeanSupport");
setupPartition();
registerRPCHandler();
registerDRMListener();
}
protected void stopService() throws Exception
{
boolean debug = log.isDebugEnabled();
if (debug)
log.debug("stop HAServiceMBeanSupport");
unregisterDRMListener();
unregisterRPCHandler();
}
protected void setupPartition() throws Exception
{
String pName = getPartitionName();
partition_ = findHAPartitionWithName(pName);
}
protected void registerRPCHandler()
{
partition_.registerRPCHandler(getServiceHAName(), this);
}
protected void unregisterRPCHandler()
{
partition_.unregisterRPCHandler(getServiceHAName(), this);
}
protected void registerDRMListener() throws Exception
{
DistributedReplicantManager drm =
this.partition_.getDistributedReplicantManager();
drmListener = new DistributedReplicantManager.ReplicantListener()
{
public void replicantsChanged(
String key,
List newReplicants,
int newReplicantsViewId)
{
if (key.equals(getServiceHAName()))
{
HAServiceMBeanSupport.this.partitionTopologyChanged(newReplicants, newReplicantsViewId);
}
}
};
drm.registerListener(getServiceHAName(), drmListener);
drm.add(getServiceHAName(), REPLICANT_TOKEN);
}
protected void unregisterDRMListener() throws Exception
{
DistributedReplicantManager drm =
this.partition_.getDistributedReplicantManager();
drm.remove(getServiceHAName());
drm.unregisterListener(getServiceHAName(), drmListener);
}
public void partitionTopologyChanged(List newReplicants, int newReplicantsViewId)
{
boolean debug = log.isDebugEnabled();
if (debug)
{
log.debug("partitionTopologyChanged(). cluster view id: " + newReplicantsViewId);
}
}
protected boolean isDRMMasterReplica()
{
DistributedReplicantManager drm =
getPartition().getDistributedReplicantManager();
return drm.isMasterReplica(getServiceHAName());
}
public HAPartition getPartition()
{
return partition_;
}
public void callMethodOnPartition(String methodName, Object[] args)
throws Exception
{
getPartition().callMethodOnCluster(
getServiceHAName(),
methodName,
args,
true);
}
public void callMethodOnPartition(String methodName, Object[] args, Class[] types)
throws Exception
{
getPartition().callMethodOnCluster(
getServiceHAName(),
methodName,
args, types,
true);
}
public void sendNotification(Notification notification)
{
try
{
notification.setSource(this.getServiceName());
sendNotificationRemote(notification);
}
catch (Throwable th)
{
boolean debug = log.isDebugEnabled();
if (debug)
log.debug("sendNotificationRemote( " + notification + " ) failed ", th);
}
sendNotificationToLocalListeners(notification);
}
protected void sendNotificationToLocalListeners(Notification notification)
{
super.sendNotification(notification);
}
protected void callAsyncMethodOnPartition(String methodName, Object[] args, Class[] types)
throws Exception
{
HAPartition partition = getPartition();
if (partition != null)
{
getPartition().callAsynchMethodOnCluster(
getServiceHAName(),
methodName,
args, types,
true);
}
}
protected void sendNotificationRemote(Notification notification)
throws Exception
{
callAsyncMethodOnPartition("_receiveRemoteNotification",
new Object[]{notification}, new Class[]{Notification.class});
}
public void _receiveRemoteNotification(Notification notification)
{
super.sendNotification(notification);
}
public String getServiceHAName()
{
return getServiceName().getCanonicalName();
}
protected HAPartition findHAPartitionWithName(String name) throws Exception
{
log.debug("findHAPartitionWithName, name="+name);
HAPartition result = null;
QueryExp classEQ = Query.eq(Query.classattr(),
Query.value(ClusterPartition.class.getName()));
QueryExp matchPartitionName = Query.match(Query.attr("PartitionName"),
Query.value(name));
QueryExp exp = Query.and(classEQ, matchPartitionName);
Set mbeans = this.getServer().queryMBeans(null, exp);
if (mbeans != null && mbeans.size() > 0)
{
ObjectInstance inst = (ObjectInstance) (mbeans.iterator().next());
ClusterPartitionMBean cp =
(ClusterPartitionMBean) MBeanProxy.get(
ClusterPartitionMBean.class,
inst.getObjectName(),
this.getServer());
result = cp.getHAPartition();
}
if( result == null )
{
String msg = "Failed to find HAPartition with PartitionName="+name;
throw new InstanceNotFoundException(msg);
}
return result;
}
}