| SessionManager.java |
/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.jms.serverless;
import org.jboss.logging.Logger;
import javax.jms.Session;
import java.util.List;
import java.util.ArrayList;
import javax.jms.JMSException;
import java.util.Iterator;
import javax.jms.Message;
/**
* The main reason for this class to exist is to insure synchronized access to the connection's
* session list. It also handles message delivery from the group to sessions and vice-versa.
*
* @author Ovidiu Feodorov <ovidiu@jboss.org>
* @version $Revision: 1.1 $ $Date: 2004/04/15 22:54:19 $
*
**/
class SessionManager implements Runnable {
private static final Logger log = Logger.getLogger(SessionManager.class);
private GroupConnection connection;
private org.jgroups.util.Queue deliveryQueue;
private Thread deliveryThread;
private List sessions;
private int sessionCounter = 0;
SessionManager(GroupConnection connection, org.jgroups.util.Queue deliveryQueue) {
this.connection = connection;
this.deliveryQueue = deliveryQueue;
sessions = new ArrayList();
deliveryThread = new Thread(this, "Session Delivery Thread");
deliveryThread.start();
}
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
Session s = new SessionImpl(this, generateSessionID(), transacted, acknowledgeMode);
synchronized(sessions) {
sessions.add(s);
}
return s;
}
/**
* The only way for the managed Session to access the Connection instance. If a session
* needs to access the connection directly, that's the way it gets the instance.
**/
GroupConnection getConnection() {
return connection;
}
// TO_DO: acknowledgement, deal with failed deliveries
private void deliver(Message m) {
// TO_DO: single threaded access for sessions
// So far, the only thread that accesses dispatch() is the connection's puller thread and
// this will be the unique thread that accesses the Sessions. This may not be sufficient
// for high load, consider the possiblity to (dynamically) add new threads to handle
// delivery, possibly a thread per session.
synchronized(sessions) {
for(Iterator i = sessions.iterator(); i.hasNext(); ) {
((SessionImpl)i.next()).deliver(m);
}
}
}
// TO_DO: acknowledgement, deal with failed deliveries
private void deliver(Message m, String sessionID, String queueReceiverID) {
// TO_DO: single threaded access for sessions
// So far, the only thread that accesses dispatch() is the connection's puller thread and
// this will be the unique thread that accesses the Sessions. This may not be sufficient
// for high load, consider the possiblity to (dynamically) add new threads to handle
// delivery, possibly a thread per session.
SessionImpl session = null;
synchronized(sessions) {
for(Iterator i = sessions.iterator(); i.hasNext(); ) {
SessionImpl crts = (SessionImpl)i.next();
if (crts.getID().equals(sessionID)) {
session = crts;
break;
}
}
}
if (session == null) {
log.error("No such session: "+sessionID+". Delivery failed!");
}
else {
session.deliver(m, queueReceiverID);
}
}
/**
* Method called by a managed sessions when a new queue receiver is created or removed.
* The queue receiver has to be advertised to the group, to update the queue section of the
* group state.
**/
void advertiseQueueReceiver(String sessionID, QueueReceiverImpl qr, boolean isOn)
throws JMSException {
try {
connection.
advertiseQueueReceiver(qr.getQueue().getQueueName(), sessionID, qr.getID(), isOn);
}
catch(ProviderException e) {
// the multicast failed, the queue receiver is invalid
String msg = "Cannot advertise queue receiver";
JMSException jmse = new JMSException(msg);
jmse.setLinkedException(e);
throw jmse;
}
}
//
//
//
/**
* Generate a session ID that is quaranteed to be unique for the life time of a SessionManager
* instance.
**/
private synchronized String generateSessionID() {
return Integer.toString(sessionCounter++);
}
//
// Runnable INTERFACE IMPLEMENTATION
//
public void run() {
while(true) {
try {
Object o = deliveryQueue.remove();
if (o instanceof javax.jms.Message) {
deliver((javax.jms.Message)o);
}
else if (o instanceof QueueCarrier) {
QueueCarrier qc = (QueueCarrier)o;
deliver(qc.getJMSMessage(), qc.getSessionID(), qc.getReceiverID());
}
else {
log.warn("Unknown delivery object: " +
(o == null ? "null" : o.getClass().getName()));
}
}
catch(Exception e) {
log.warn("Failed to remove element from the delivery queue", e);
}
}
}
//
// END Runnable INTERFACE IMPLEMENTATION
//
}
| SessionManager.java |