package org.jboss.resource.work;
import javax.resource.spi.work.ExecutionContext;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.Xid;
import org.jboss.tm.JBossXATerminator;
import org.jboss.util.threadpool.Task;
import org.jboss.util.threadpool.ThreadPool;
import javax.management.ObjectName;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.mx.util.MBeanProxyExt;
/**
 * A {@link WorkManager} implementation packaged as a JBoss service MBean.
 *
 * <p>Each submitted {@link Work} is wrapped in a {@code WorkWrapper}, optionally
 * registered with the configured {@link JBossXATerminator} (only when the
 * execution context carries an {@link Xid}), and then handed to the configured
 * {@link ThreadPool}. The {@code startWork}/{@code endWork}/{@code cancelWork}
 * callbacks forward the matching lifecycle event to the XA terminator under the
 * same condition.
 *
 * <p>NOTE(review): the lifecycle callbacks are presumably invoked by
 * {@code WorkWrapper}; that class is not visible from here - confirm.
 */
public class JBossWorkManager extends ServiceMBeanSupport implements WorkManager, JBossWorkManagerMBean
{
   /** Cached trace-enabled flag; re-read from the logger every time work is imported. */
   private boolean trace = log.isTraceEnabled();

   /** The thread pool that work is submitted to. */
   private ThreadPool threadPool;

   /** The object name used to look the thread pool up at service start. */
   private ObjectName threadPoolName;

   /** The XA terminator notified about transactional work. */
   private JBossXATerminator xaTerminator;

   /** The object name used to look the XA terminator up at service start. */
   private ObjectName xaTerminatorName;

   /**
    * Get the thread pool.
    *
    * @return the thread pool currently held, possibly {@code null}
    */
   public ThreadPool getThreadPool()
   {
      return threadPool;
   }

   /**
    * Set the thread pool. Note that {@link #startService()} replaces this value
    * with the instance looked up through the thread pool name.
    *
    * @param threadPool the thread pool
    */
   public void setThreadPool(ThreadPool threadPool)
   {
      this.threadPool = threadPool;
   }

   /**
    * Get the thread pool object name.
    *
    * @return the configured name, possibly {@code null}
    */
   public ObjectName getThreadPoolName()
   {
      return threadPoolName;
   }

   /**
    * Set the thread pool object name.
    *
    * @param threadPoolName the name resolved during {@link #startService()}
    */
   public void setThreadPoolName(ObjectName threadPoolName)
   {
      this.threadPoolName = threadPoolName;
   }

   /**
    * Get the XA terminator object name.
    *
    * @return the configured name, possibly {@code null}
    */
   public ObjectName getXATerminatorName()
   {
      return xaTerminatorName;
   }

   /**
    * Set the XA terminator object name.
    *
    * @param xaTerminatorName the name resolved during {@link #startService()}
    */
   public void setXATerminatorName(ObjectName xaTerminatorName)
   {
      this.xaTerminatorName = xaTerminatorName;
   }

   /**
    * Get the work manager instance.
    *
    * @return this object
    */
   public WorkManager getInstance()
   {
      return this;
   }

   /**
    * Submit work using the {@code Task.WAIT_FOR_COMPLETE} wait type.
    *
    * @param work the work
    * @param startTimeout the start timeout handed to the wrapper
    * @param ctx the execution context; a fresh one is created when {@code null}
    * @param listener the work listener handed to the wrapper, may be {@code null}
    * @throws WorkException raised while importing or submitting, or the exception
    *         recorded on the wrapper once submission has returned
    */
   public void doWork(Work work, long startTimeout, ExecutionContext ctx, WorkListener listener) throws WorkException
   {
      importAndExecute(
            new WorkWrapper(this, work, Task.WAIT_FOR_COMPLETE, startTimeout, contextOrDefault(ctx), listener));
   }

   /**
    * Submit work with an indefinite start timeout, no execution context and no listener.
    *
    * @param work the work
    * @throws WorkException see {@link #doWork(Work, long, ExecutionContext, WorkListener)}
    */
   public void doWork(Work work) throws WorkException
   {
      doWork(work, WorkManager.INDEFINITE, null, null);
   }

   /**
    * Submit work using the {@code Task.WAIT_FOR_START} wait type.
    *
    * @param work the work
    * @param startTimeout the start timeout handed to the wrapper
    * @param ctx the execution context; a fresh one is created when {@code null}
    * @param listener the work listener handed to the wrapper, may be {@code null}
    * @return the blocked elapsed time reported by the wrapper
    * @throws WorkException raised while importing or submitting, or the exception
    *         recorded on the wrapper once submission has returned
    */
   public long startWork(Work work, long startTimeout, ExecutionContext ctx, WorkListener listener) throws WorkException
   {
      WorkWrapper submitted = importAndExecute(
            new WorkWrapper(this, work, Task.WAIT_FOR_START, startTimeout, contextOrDefault(ctx), listener));
      return submitted.getBlockedElapsed();
   }

   /**
    * Start work with an indefinite start timeout, no execution context and no listener.
    *
    * @param work the work
    * @return the blocked elapsed time reported by the wrapper
    * @throws WorkException see {@link #startWork(Work, long, ExecutionContext, WorkListener)}
    */
   public long startWork(Work work) throws WorkException
   {
      return startWork(work, WorkManager.INDEFINITE, null, null);
   }

   /**
    * Submit work using the {@code Task.WAIT_NONE} wait type.
    *
    * @param work the work
    * @param startTimeout the start timeout handed to the wrapper
    * @param ctx the execution context; a fresh one is created when {@code null}
    * @param listener the work listener handed to the wrapper, may be {@code null}
    * @throws WorkException raised while importing or submitting, or the exception
    *         recorded on the wrapper once submission has returned
    */
   public void scheduleWork(Work work, long startTimeout, ExecutionContext ctx, WorkListener listener) throws WorkException
   {
      importAndExecute(
            new WorkWrapper(this, work, Task.WAIT_NONE, startTimeout, contextOrDefault(ctx), listener));
   }

   /**
    * Schedule work with an indefinite start timeout, no execution context and no listener.
    *
    * @param work the work
    * @throws WorkException see {@link #scheduleWork(Work, long, ExecutionContext, WorkListener)}
    */
   public void scheduleWork(Work work) throws WorkException
   {
      scheduleWork(work, WorkManager.INDEFINITE, null, null);
   }

   /**
    * Resolve the collaborators from the MBean server: the thread pool from the
    * {@code Instance} attribute of the thread pool name, then the XA terminator
    * from the {@code XATerminator} attribute of the XA terminator name.
    *
    * @throws IllegalStateException when either object name has not been configured
    * @throws Exception for any error raised by the attribute lookups
    */
   protected void startService() throws Exception
   {
      if (threadPoolName == null)
      {
         throw new IllegalStateException("No thread pool name");
      }
      threadPool = (ThreadPool) server.getAttribute(threadPoolName, "Instance");

      // The terminator name is checked only after the pool lookup has been done
      if (xaTerminatorName == null)
      {
         throw new IllegalStateException("No xa terminator name");
      }
      xaTerminator = (JBossXATerminator) server.getAttribute(xaTerminatorName, "XATerminator");
   }

   /**
    * Import the work: when its execution context carries an {@link Xid}, register
    * the work with the XA terminator using the context's transaction timeout.
    * Also refreshes the cached trace flag.
    *
    * @param wrapper the work wrapper
    * @throws WorkException for any error raised by the registration
    */
   protected void importWork(WorkWrapper wrapper) throws WorkException
   {
      trace = log.isTraceEnabled();
      if (trace)
      {
         log.trace("Importing work " + wrapper);
      }

      ExecutionContext context = wrapper.getExecutionContext();
      Xid xid = (context == null) ? null : context.getXid();
      if (xid != null)
      {
         long txTimeout = context.getTransactionTimeout();
         xaTerminator.registerWork(wrapper.getWork(), xid, txTimeout);
      }

      if (trace)
      {
         log.trace("Imported work " + wrapper);
      }
   }

   /**
    * Hand the wrapped work to the thread pool.
    *
    * @param wrapper the work wrapper
    * @throws WorkException declared for overriding implementations
    */
   protected void executeWork(WorkWrapper wrapper) throws WorkException
   {
      if (trace)
      {
         log.trace("Submitting work to thread pool " + wrapper);
      }

      threadPool.runTaskWrapper(wrapper);

      if (trace)
      {
         log.trace("Submitted work to thread pool " + wrapper);
      }
   }

   /**
    * Tell the XA terminator that the work is starting, provided the execution
    * context carries an {@link Xid}.
    *
    * @param wrapper the work wrapper
    * @throws WorkException for any error raised by the XA terminator
    */
   protected void startWork(WorkWrapper wrapper) throws WorkException
   {
      if (trace)
      {
         log.trace("Starting work " + wrapper);
      }

      Xid xid = transactionIdOf(wrapper);
      if (xid != null)
      {
         xaTerminator.startWork(wrapper.getWork(), xid);
      }

      if (trace)
      {
         log.trace("Started work " + wrapper);
      }
   }

   /**
    * Tell the XA terminator that the work has ended, provided the execution
    * context carries an {@link Xid}.
    *
    * @param wrapper the work wrapper
    */
   protected void endWork(WorkWrapper wrapper)
   {
      if (trace)
      {
         log.trace("Ending work " + wrapper);
      }

      Xid xid = transactionIdOf(wrapper);
      if (xid != null)
      {
         xaTerminator.endWork(wrapper.getWork(), xid);
      }

      if (trace)
      {
         log.trace("Ended work " + wrapper);
      }
   }

   /**
    * Tell the XA terminator that the work was cancelled, provided the execution
    * context carries an {@link Xid}.
    *
    * @param wrapper the work wrapper
    */
   protected void cancelWork(WorkWrapper wrapper)
   {
      if (trace)
      {
         log.trace("Cancel work " + wrapper);
      }

      Xid xid = transactionIdOf(wrapper);
      if (xid != null)
      {
         xaTerminator.cancelWork(wrapper.getWork(), xid);
      }

      if (trace)
      {
         log.trace("Canceled work " + wrapper);
      }
   }

   /**
    * Shared tail of the submit methods: import the work, execute it, then
    * rethrow any exception recorded on the wrapper.
    *
    * @param wrapper the freshly built work wrapper
    * @return the same wrapper, when no work exception was recorded
    * @throws WorkException raised by import/execute or recorded on the wrapper
    */
   private WorkWrapper importAndExecute(WorkWrapper wrapper) throws WorkException
   {
      importWork(wrapper);
      executeWork(wrapper);
      if (wrapper.getWorkException() == null)
      {
         return wrapper;
      }
      throw wrapper.getWorkException();
   }

   /**
    * Substitute a new, empty execution context for a missing one.
    *
    * @param ctx the caller supplied context, may be {@code null}
    * @return {@code ctx} when non-null, otherwise a new {@link ExecutionContext}
    */
   private static ExecutionContext contextOrDefault(ExecutionContext ctx)
   {
      return (ctx != null) ? ctx : new ExecutionContext();
   }

   /**
    * Extract the transaction id from the wrapper's execution context.
    *
    * @param wrapper the work wrapper
    * @return the context's {@link Xid}, or {@code null} when there is no context or no xid
    */
   private static Xid transactionIdOf(WorkWrapper wrapper)
   {
      ExecutionContext context = wrapper.getExecutionContext();
      return (context == null) ? null : context.getXid();
   }
}