package org.jboss.jms.serverless;
import org.jboss.logging.Logger;
import javax.jms.Session;
import java.util.List;
import java.util.ArrayList;
import javax.jms.JMSException;
import java.util.Iterator;
import javax.jms.Message;
class SessionManager implements Runnable {
private static final Logger log = Logger.getLogger(SessionManager.class);
private GroupConnection connection;
private org.jgroups.util.Queue deliveryQueue;
private Thread deliveryThread;
private List sessions;
private int sessionCounter = 0;
SessionManager(GroupConnection connection, org.jgroups.util.Queue deliveryQueue) {
this.connection = connection;
this.deliveryQueue = deliveryQueue;
sessions = new ArrayList();
deliveryThread = new Thread(this, "Session Delivery Thread");
deliveryThread.start();
}
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
Session s = new SessionImpl(this, generateSessionID(), transacted, acknowledgeMode);
synchronized(sessions) {
sessions.add(s);
}
return s;
}
GroupConnection getConnection() {
return connection;
}
private void deliver(Message m) {
synchronized(sessions) {
for(Iterator i = sessions.iterator(); i.hasNext(); ) {
((SessionImpl)i.next()).deliver(m);
}
}
}
private void deliver(Message m, String sessionID, String queueReceiverID) {
SessionImpl session = null;
synchronized(sessions) {
for(Iterator i = sessions.iterator(); i.hasNext(); ) {
SessionImpl crts = (SessionImpl)i.next();
if (crts.getID().equals(sessionID)) {
session = crts;
break;
}
}
}
if (session == null) {
log.error("No such session: "+sessionID+". Delivery failed!");
}
else {
session.deliver(m, queueReceiverID);
}
}
void advertiseQueueReceiver(String sessionID, QueueReceiverImpl qr, boolean isOn)
throws JMSException {
try {
connection.
advertiseQueueReceiver(qr.getQueue().getQueueName(), sessionID, qr.getID(), isOn);
}
catch(ProviderException e) {
String msg = "Cannot advertise queue receiver";
JMSException jmse = new JMSException(msg);
jmse.setLinkedException(e);
throw jmse;
}
}
private synchronized String generateSessionID() {
return Integer.toString(sessionCounter++);
}
public void run() {
while(true) {
try {
Object o = deliveryQueue.remove();
if (o instanceof javax.jms.Message) {
deliver((javax.jms.Message)o);
}
else if (o instanceof QueueCarrier) {
QueueCarrier qc = (QueueCarrier)o;
deliver(qc.getJMSMessage(), qc.getSessionID(), qc.getReceiverID());
}
else {
log.warn("Unknown delivery object: " +
(o == null ? "null" : o.getClass().getName()));
}
}
catch(Exception e) {
log.warn("Failed to remove element from the delivery queue", e);
}
}
}
}