WorkerQueue.java |
/* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.util; /** * Class that queues {@link Executable} jobs that are executed sequentially * by a single thread. * * @see Executable * * @author <a href="mailto:simone.bordet@compaq.com">Simone Bordet</a> * @version $Revision: 1.1 $ */ public class WorkerQueue { /** The thread that runs the Executable jobs */ protected Thread m_queueThread; /** The job that will be executed by the worker thread */ private JobItem m_currentJob; /** * Creates a new worker queue with default thread name of "Worker Thread" */ public WorkerQueue() { this("Worker Thread"); } /** * Creates a new worker queue with the specified thread name */ public WorkerQueue(String threadName) { m_queueThread = new Thread(createQueueLoop(), threadName); } /** * Creates a new worker queue with the specified thread name * and daemon mode flag */ public WorkerQueue(String threadName, boolean isDaemon) { m_queueThread = new Thread(createQueueLoop(), threadName); m_queueThread.setDaemon(isDaemon); } /** * Starts the worker queue. * @see #stop */ public void start() { if (m_queueThread != null) {m_queueThread.start();} } /** * Stops nicely the worker queue. <br> * After this call trying to put a new job will result in a * InterruptedException to be thrown. The jobs queued before and not * yet processed are processed until the queue is empty, then this * worker queue is cleared. * @see #clear * @see #start * @see #isInterrupted */ public synchronized void stop() { if (m_queueThread != null) {m_queueThread.interrupt();} } /** * Called by a thread that is not the WorkerQueue thread, this method * queues the job and, if necessary, wakes up this worker queue that is * waiting in {@link #getJob}. */ public synchronized void putJob(Executable job) { // Preconditions if (m_queueThread == null || !m_queueThread.isAlive()) { throw new IllegalStateException("Can't put job, thread is not alive or not present"); } if (isInterrupted()) { throw new IllegalStateException("Can't put job, thread was interrupted"); } putJobImpl(job); } /** * Returns whether the worker thread has been interrupted. <br> * When this method returns true, it is not possible to put new jobs in the * queue and the already present jobs are executed and removed from the * queue, then the thread exits. * * @see #stop */ protected boolean isInterrupted() { return m_queueThread.isInterrupted(); } /** * Called by this class, this method checks if the queue is empty; * if it is, then waits, else returns the current job. * * @see #putJob */ protected synchronized Executable getJob() throws InterruptedException { // Preconditions if (m_queueThread == null || !m_queueThread.isAlive()) { throw new IllegalStateException(); } return getJobImpl(); } /** * Never call this method, only override in subclasses to perform * job getting in a specific way, normally tied to the data structure * holding the jobs. */ protected Executable getJobImpl() throws InterruptedException { // While the queue is empty, wait(); // when notified take an event from the queue and return it. while (m_currentJob == null) {wait();} // This one is the job to return JobItem item = m_currentJob; // Go on to the next object for the next call. m_currentJob = m_currentJob.m_next; return item.m_job; } /** * Never call this method, only override in subclasses to perform * job adding in a specific way, normally tied to the data structure * holding the jobs. */ protected void putJobImpl(Executable job) { JobItem posted = new JobItem(job); if (m_currentJob == null) { // The queue is empty, set the current job to process and // wake up the thread waiting in method getJob m_currentJob = posted; notifyAll(); } else { JobItem item = m_currentJob; // The queue is not empty, find the end of the queue ad add the // posted job at the end while (item.m_next != null) {item = item.m_next;} item.m_next = posted; } } /** * Clears the running thread after the queue has been stopped. <br> * After this call, this worker queue is unusable and can be garbaged. */ protected void clear() { m_queueThread = null; m_currentJob = null; } /** * Creates the loop that will get the next job and process it. <br> * Override in subclasses to create a custom loop. */ protected Runnable createQueueLoop() { return new QueueLoop(); } /** * Class that loops getting the next job to be executed and then * executing it, in the worker thread. */ protected class QueueLoop implements Runnable { public void run() { try { while (true) { try { if (isInterrupted()) { flush(); break; } else { getJob().execute(); } } catch (InterruptedException e) { try { flush(); } catch (Exception ignored) {} break; } catch (Exception e) { ThrowableHandler.add(ThrowableHandler.Type.ERROR, e); } } } finally { clear(); } } protected void flush() throws Exception { // Empty the queue of the posted jobs and exit while (m_currentJob != null) { m_currentJob.m_job.execute(); m_currentJob = m_currentJob.m_next; } } } /** * Simple linked cell, that has only a reference to the next job. */ private class JobItem { private Executable m_job; private JobItem m_next; private JobItem(Executable job) {m_job = job;} } }
WorkerQueue.java |