package org.jboss.jms.serverless.client;
import org.jboss.logging.Logger;
import javax.jms.ConnectionFactory;
import javax.jms.Connection;
import java.util.List;
import java.util.ArrayList;
import java.util.Iterator;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.jms.Session;
import java.util.Map;
import java.util.HashMap;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;
import java.util.Set;
public class Interactive {
private static final Logger log = Logger.getLogger(Interactive.class);
private Context initialContext;
private Map connectionFactories; private Map destinations; private List connectionHolders;
public Interactive() throws Exception {
connectionFactories = new HashMap();
destinations = new HashMap();
connectionHolders = new ArrayList();
initJNDI();
}
public void exit() {
int exitValue = 0;
for(Iterator i = connectionHolders.iterator(); i.hasNext(); ) {
Connection c = ((ConnectionHolder)i.next()).getConnection();
try {
c.close();
}
catch(Exception e) {
exitValue ++;
log.warn("Trouble closing connection "+c, e);
}
}
System.exit(exitValue);
}
public void runtime() throws Exception {
System.out.println();
System.out.println("JMS Runtime: ");
System.out.println();
System.out.print("Connection Factories: ");
if (connectionFactories.size() == 0) {
System.out.println("No Known ConnectionFactories");
}
else {
System.out.println();
for(Iterator i = connectionFactories.keySet().iterator(); i.hasNext(); ) {
String jndiName = (String)i.next();
ConnectionFactory cf = (ConnectionFactory)connectionFactories.get(jndiName);
System.out.println("\t"+jndiName+" - "+cf);
}
}
System.out.print("Destinations: ");
if (destinations.size() == 0) {
System.out.println("No Known Destinations");
}
else {
System.out.println();
for(Iterator i = destinations.keySet().iterator(); i.hasNext(); ) {
String jndiName = (String)i.next();
Destination d = (Destination)destinations.get(jndiName);
System.out.println("\t"+jndiName+" - "+d.getClass().getName());
}
}
System.out.println();
System.out.print("Connections");
if (connectionHolders.size() == 0) {
System.out.println(": No Active Connections");
}
else {
System.out.println(": ");
int idx = 0;
for(Iterator ci = connectionHolders.iterator(); ci.hasNext(); idx++) {
ConnectionHolder ch = (ConnectionHolder)ci.next();
Connection c = ch.getConnection();
ConnectionFactory cf = ch.getConnectionFactory();
String cfJNDIName = getConnectionFactoryJNDIName(cf);
List sessionHolders = ch.getSessionHolders();
System.out.println("\t" + idx + " " + c + " produced by '" + cfJNDIName + "'");
System.out.print("\t\tSessions: ");
if (sessionHolders.isEmpty()) {
System.out.println("No Active Sessions");
}
else {
System.out.println();
int sidx = 0;
for(Iterator i = sessionHolders.iterator(); i.hasNext(); sidx++) {
SessionHolder h = (SessionHolder)i.next();
Session s = h.getSession();
System.out.println("\t\tSession "+idx+"."+sidx+" ("+
transactedToString(s.getTransacted())+", "+
acknowledgeModeToString(s.getAcknowledgeMode())+"): ");
List producers = h.getProducers();
if (producers.size() == 0) {
System.out.println("\t\t\tNo Producers");
}
else {
int pidx = 0;
for(Iterator j = producers.iterator(); j.hasNext(); pidx++) {
MessageProducer p = (MessageProducer)j.next();
System.out.println("\t\t\tProducer "+idx+"."+sidx+"."+pidx+" for "+
getDestinationJNDIName(p.getDestination()));
}
}
List consumers = h.getConsumers();
if (consumers.size() == 0) {
System.out.println("\t\t\tNo Consumers");
}
else {
int cidx = 0;
for(Iterator j = consumers.iterator(); j.hasNext(); cidx++) {
MessageConsumer mc = (MessageConsumer)j.next();
System.out.print("\t\t\tConsumer " +idx+"."+sidx+"."+cidx+" "+mc);
if (mc.getMessageListener() != null) {
System.out.println(", MessageListener ON");
}
else {
System.out.println(", MessageListener OFF");
}
}
}
}
}
}
}
System.out.println();
System.out.println();
}
public void lookupConnectionFactory(String name) throws Exception {
ConnectionFactory cf = (ConnectionFactory)initialContext.lookup(name);
connectionFactories.put(name, cf);
}
public void lookupDestination(String destinationJNDIName) throws Exception {
Destination d = (Destination)initialContext.lookup(destinationJNDIName);
destinations.put(destinationJNDIName, d);
}
public void createConnection(String connectionFactoryJNDIName) throws Exception {
lookupConnectionFactory(connectionFactoryJNDIName);
ConnectionFactory cf =
(ConnectionFactory)connectionFactories.get(connectionFactoryJNDIName);
Connection c = cf.createConnection();
ConnectionHolder ch = new ConnectionHolder(c, cf, new ArrayList());
connectionHolders.add(ch);
}
public void createConnection() throws Exception {
Set names = connectionFactories.keySet();
if (names.isEmpty()) {
log.error("No ConnectionFactory has been looked up yet!");
return;
}
if (names.size() > 1) {
String msg =
"There is more than one ConnectionFactory available. Specify the JNDI name when "+
"creating a connection";
log.error(msg);
return;
}
createConnection((String)(names.toArray()[0]));
}
public void start(int index) throws Exception {
try {
connectionOK(index);
}
catch(Exception e) {
log.error(e.getMessage());
return;
}
Connection c = ((ConnectionHolder)connectionHolders.get(index)).getConnection();
c.start();
}
public void start() throws Exception {
if (connectionHolders.size() == 0) {
log.error("No Connection has been created yet.");
return;
}
if (connectionHolders.size() > 1) {
log.error("There are more than one active Connections. Use start(index).");
return;
}
start(0);
}
public void stop(int index) throws Exception {
try {
connectionOK(index);
}
catch(Exception e) {
log.error(e.getMessage());
return;
}
Connection c = ((ConnectionHolder)connectionHolders.get(index)).getConnection();
c.stop();
}
public void close(int index) throws Exception {
try {
connectionOK(index);
}
catch(Exception e) {
log.error(e.getMessage());
return;
}
ConnectionHolder ch = (ConnectionHolder)connectionHolders.get(index);
Connection c = ch.getConnection();
c.close();
ch.destroy();
connectionHolders.remove(index);
}
public void createSession(int index, boolean transacted, String acknowledgeModeString)
throws Exception {
try {
connectionOK(index);
}
catch(Exception e) {
log.error(e.getMessage());
return;
}
int acknowledgeMode = -1;
try {
acknowledgeMode = parseAcknowledgeModeString(acknowledgeModeString);
}
catch(Exception e) {
return;
}
ConnectionHolder ch = (ConnectionHolder)connectionHolders.get(index);
List sessionHolders = ch.getSessionHolders();
Session s = ch.getConnection().createSession(transacted, acknowledgeMode);
sessionHolders.add(new SessionHolder(s, new ArrayList(), new ArrayList()));
}
public void createProducer(String sessionID, String destinationJNDIName) throws Exception {
int[] indices = parseCompositeID2(sessionID);
int connIdx = indices[0];
int sessionIdx = indices[1];
try {
connectionOK(connIdx);
}
catch(Exception e) {
log.error(e.getMessage());
return;
}
List sessionHolders =
((ConnectionHolder)connectionHolders.get(connIdx)).getSessionHolders();
if (sessionIdx >= sessionHolders.size()) {
String msg =
"There is no Session with the index "+sessionIdx+". Currently there are "+
sessionHolders.size()+" active Sessions for this Connection.";
log.error(msg);
return;
}
SessionHolder h = (SessionHolder)sessionHolders.get(sessionIdx);
Session s = h.getSession();
Destination d = getDestination(destinationJNDIName);
MessageProducer p = s.createProducer(d);
h.getProducers().add(p);
}
public void createConsumer(String sessionID, String destinationJNDIName) throws Exception {
int[] indices = parseCompositeID2(sessionID);
int connIdx = indices[0];
int sessionIdx = indices[1];
try {
connectionOK(connIdx);
}
catch(Exception e) {
log.error(e.getMessage());
return;
}
List sessionHolders =
((ConnectionHolder)connectionHolders.get(connIdx)).getSessionHolders();
if (sessionIdx >= sessionHolders.size()) {
String msg =
"There is no Session with the index "+sessionIdx+". Currently there are "+
sessionHolders.size()+" active Sessions for this Connection.";
log.error(msg);
return;
}
SessionHolder h = (SessionHolder)sessionHolders.get(sessionIdx);
Session s = h.getSession();
Destination d = getDestination(destinationJNDIName);
MessageConsumer c = s.createConsumer(d);
h.getConsumers().add(c);
}
public void closeConsumer(String consumerID) throws Exception {
MessageConsumer c = null;
try {
c = (MessageConsumer)getSessionChild(consumerID, false);
}
catch(Exception e) {
log.error(e.getMessage());
return;
}
c.close();
getSessionHolder(consumerID).getConsumers().remove(c);
}
public void setMessageListener(String consumerID) throws Exception {
MessageConsumer c = null;
try {
c = (MessageConsumer)getSessionChild(consumerID, false);
}
catch(Exception e) {
log.error(e.getMessage());
return;
}
final MessageConsumer myConsumer = c;
c.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
String myConsumersID = getSessionChildID(myConsumer);
String output = "Consumer "+myConsumersID+": ";
if (message instanceof TextMessage) {
output += ((TextMessage)message).getText();
}
else {
output += message.toString();
}
System.out.println(output);
}
catch(Exception e) {
log.error("Failed to process message", e);
}
}
});
}
public void send(String producerID, String payload) throws Exception {
TextMessage tm = getSession(producerID).createTextMessage();
tm.setText(payload);
MessageProducer p = (MessageProducer)getSessionChild(producerID, true);
p.send(tm);
}
public void forward(String consumerID, String producerID) throws Exception {
final MessageConsumer c = (MessageConsumer)getSessionChild(consumerID, false);
final MessageProducer p = (MessageProducer)getSessionChild(producerID, true);
MessageListener l = new MessageListener() {
public void onMessage(Message message) {
try {
String consumerID = getSessionChildID(c);
String producerID = getSessionChildID(p);
p.send(message);
String msg =
"Consumer "+consumerID+" forwarded message to producer "+producerID;
System.out.println(msg);
}
catch(Exception e) {
log.error("Failed to process message", e);
}
}
};
c.setMessageListener(l);
}
private void initJNDI() throws Exception {
initialContext = new InitialContext();
}
private void connectionOK(int index) throws Exception {
int size = connectionHolders.size();
String msg = null;
if (size == 0) {
msg = "No active Connection created yet!";
}
else if (index < 0 || index >= size) {
msg =
"No such Connection index. Valid indexes are 0"+
(size == 0 ? "":" ... "+(size - 1))+".";
}
if (msg != null) {
throw new Exception(msg);
}
}
private int parseAcknowledgeModeString(String s) throws Exception {
s = s.toUpperCase();
if ("AUTO_ACKNOWLEDGE".equals(s)) {
return Session.AUTO_ACKNOWLEDGE;
}
else if ("CLIENT_ACKNOWLEDGE".equals(s)) {
return Session.CLIENT_ACKNOWLEDGE;
}
else if ("DUPS_OK_ACKNOWLEDGE".equals(s)) {
return Session.DUPS_OK_ACKNOWLEDGE;
}
else {
log.error("Unknow session acknowledment type: "+s);
throw new Exception();
}
}
private String acknowledgeModeToString(int a) {
if (a == Session.AUTO_ACKNOWLEDGE) {
return "AUTO_ACKNOWLEDGE";
}
else if (a == Session.CLIENT_ACKNOWLEDGE) {
return "CLIENT_ACKNOWLEDGE";
}
else if (a == Session.DUPS_OK_ACKNOWLEDGE) {
return "DUPS_OK_ACKNOWLEDGE";
}
else {
return "UNKNOWN_ACKNOWLEDGE_TYPE";
}
}
private String transactedToString(boolean t) {
if (t) {
return "TRANSACTED";
}
return "NON TRANSACTED";
}
private Destination getDestination(String destinationJNDIName) throws Exception {
Destination d = (Destination)destinations.get(destinationJNDIName);
if (d == null) {
lookupDestination(destinationJNDIName);
d = (Destination)destinations.get(destinationJNDIName);
}
return d;
}
private String getDestinationJNDIName(Destination d) throws Exception {
for(Iterator i = destinations.keySet().iterator(); i.hasNext(); ) {
String name = (String)i.next();
if (d.equals(destinations.get(name))) {
return name;
}
}
return null;
}
private String getConnectionFactoryJNDIName(ConnectionFactory cf) throws Exception {
for(Iterator i = connectionFactories.keySet().iterator(); i.hasNext(); ) {
String name = (String)i.next();
if (cf.equals(connectionFactories.get(name))) {
return name;
}
}
return null;
}
private int[] parseCompositeID2(String compositeID) throws Exception {
try {
int first, last;
int i = compositeID.indexOf('.');
first = Integer.parseInt(compositeID.substring(0, i));
last = Integer.parseInt(compositeID.substring(i+1));
return new int[] { first, last };
}
catch(Exception e) {
String msg = "Invalid ID format: "+compositeID;
throw new Exception(msg);
}
}
private int[] parseCompositeID3(String compositeID) throws Exception {
try {
int i1;
int i = compositeID.indexOf('.');
i1 = Integer.parseInt(compositeID.substring(0, i));
int[] c = parseCompositeID2(compositeID.substring(i+1));
return new int[] { i1, c[0], c[1] };
}
catch(Exception e) {
String msg = "Invalid ID format: "+compositeID;
throw new Exception(msg);
}
}
private SessionHolder getSessionHolder(String compositeID) throws Exception {
int[] indices = parseCompositeID3(compositeID);
int connIdx = indices[0];
int sessionIdx = indices[1];
connectionOK(connIdx);
List sHolders = ((ConnectionHolder)connectionHolders.get(connIdx)).getSessionHolders();
if (sessionIdx < 0 || sessionIdx >= sHolders.size()) {
String msg = "Invalid Session index: "+sessionIdx;
throw new Exception(msg);
}
return (SessionHolder)sHolders.get(sessionIdx);
}
private Session getSession(String compositeID) throws Exception {
return getSessionHolder(compositeID).getSession();
}
private Object getSessionChild(String compositeID, boolean isProducer) throws Exception {
SessionHolder h = getSessionHolder(compositeID);
int[] indices = parseCompositeID3(compositeID);
int childIdx = indices[2];
List l = isProducer ? h.getProducers() : h.getConsumers();
if (childIdx < 0 || childIdx >= l.size()) {
String msg = "Invalid "+(isProducer?"producer":"consumer")+" index: "+childIdx;
throw new Exception(msg);
}
return l.get(childIdx);
}
private String getSessionChildID(Object sessionChild) {
String id = null;
int cidx = 0;
for(Iterator ci = connectionHolders.iterator(); ci.hasNext(); cidx++) {
List sh = ((ConnectionHolder)ci.next()).getSessionHolders();
int sidx = 0;
for(Iterator i = sh.iterator(); i.hasNext(); sidx++) {
SessionHolder h = (SessionHolder)i.next();
int idx = h.getIndex(sessionChild);
if (idx == -1) {
continue;
}
return
Integer.toString(cidx)+"."+Integer.toString(sidx)+"."+Integer.toString(idx);
}
}
return id;
}
private class ConnectionHolder {
private Connection c;
private ConnectionFactory cf;
private List sessionHolders;
public ConnectionHolder(Connection c, ConnectionFactory cf, List sessionHolders) {
this.c = c;
this.cf = cf;
this.sessionHolders = sessionHolders;
}
public Connection getConnection() {
return c;
}
public ConnectionFactory getConnectionFactory() {
return cf;
}
public List getSessionHolders() {
return sessionHolders;
}
public void destroy() {
c = null;
cf = null;
for(Iterator i = sessionHolders.iterator(); i.hasNext(); ) {
SessionHolder h = (SessionHolder)i.next();
h.destroy();
}
sessionHolders.clear();
sessionHolders = null;
}
}
private class SessionHolder {
private Session s;
private List producers; private List consumers;
public SessionHolder(Session s, List producers, List consumers) {
this.s = s;
this.producers = producers;
this.consumers = consumers;
}
public Session getSession() {
return s;
}
public List getProducers() {
return producers;
}
public List getConsumers() {
return consumers;
}
public List getChildren(Object likeThis) {
if (likeThis instanceof MessageProducer) {
return producers;
}
else if (likeThis instanceof MessageConsumer) {
return consumers;
}
return null;
}
public int getIndex(Object sessionChild) {
List l = getChildren(sessionChild);
if (l == null) {
return -1;
}
return l.indexOf(sessionChild);
}
public void destroy() {
s = null;
producers.clear();
consumers.clear();
producers = null;
consumers = null;
}
}
}