package org.jboss.mq.il.uil2.msgs;
import java.io.ObjectOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.lang.reflect.UndeclaredThrowableException;
import org.jboss.mq.il.uil2.SocketManager.ReadTask;
public class BaseMsg
implements Runnable
{
private static boolean useJMSServerMsgIDs = false;
private static int nextMsgID = 0;
private static Object nextMsgIDLock = new Object();
private static final int SERVER_MSG_ID_MASK = 0x80000000;
private ReadTask handler;
public int msgType;
public int msgID;
public Exception error;
public BaseMsg(int msgType)
{
this(msgType, 0);
}
public BaseMsg(int msgType, int msgID)
{
this.msgType = msgType;
this.msgID = msgID;
}
public static void setUseJMSServerMsgIDs(boolean flag)
{
useJMSServerMsgIDs = flag;
}
public static BaseMsg createMsg(int msgType) throws IllegalArgumentException
{
BaseMsg msg = null;
switch( msgType )
{
case MsgTypes.m_acknowledge:
msg = new AcknowledgementRequestMsg();
break;
case MsgTypes.m_addMessage:
msg = new AddMsg();
break;
case MsgTypes.m_browse:
msg = new BrowseMsg();
break;
case MsgTypes.m_checkID:
msg = new CheckIDMsg();
break;
case MsgTypes.m_connectionClosing:
msg = new CloseMsg();
break;
case MsgTypes.m_createQueue:
msg = new CreateDestMsg(true);
break;
case MsgTypes.m_createTopic:
msg = new CreateDestMsg(false);
break;
case MsgTypes.m_deleteTemporaryDestination:
msg = new DeleteTemporaryDestMsg();
break;
case MsgTypes.m_getID:
msg = new GetIDMsg();
break;
case MsgTypes.m_getTemporaryQueue:
msg = new TemporaryDestMsg(true);
break;
case MsgTypes.m_getTemporaryTopic:
msg = new TemporaryDestMsg(false);
break;
case MsgTypes.m_receive:
msg = new ReceiveMsg();
break;
case MsgTypes.m_setEnabled:
msg = new EnableConnectionMsg();
break;
case MsgTypes.m_setSpyDistributedConnection:
msg = new ConnectionTokenMsg();
break;
case MsgTypes.m_subscribe:
msg = new SubscribeMsg();
break;
case MsgTypes.m_transact:
msg = new TransactMsg();
break;
case MsgTypes.m_unsubscribe:
msg = new UnsubscribeMsg();
break;
case MsgTypes.m_destroySubscription:
msg = new DeleteSubscriptionMsg();
break;
case MsgTypes.m_checkUser:
msg = new CheckUserMsg(false);
break;
case MsgTypes.m_ping:
msg = new PingMsg(true);
break;
case MsgTypes.m_authenticate:
msg = new CheckUserMsg(true);
break;
case MsgTypes.m_close:
break;
case MsgTypes.m_pong:
msg = new PingMsg(false);
break;
case MsgTypes.m_receiveRequest:
msg = new ReceiveRequestMsg();
break;
default:
throw new IllegalArgumentException("Invalid msgType: "+msgType);
}
return msg;
}
public static String toString(int msgType)
{
String msgTypeString = null;
switch (msgType)
{
case MsgTypes.m_acknowledge:
msgTypeString = "m_acknowledge";
break;
case MsgTypes.m_addMessage:
msgTypeString = "m_addMessage";
break;
case MsgTypes.m_browse:
msgTypeString = "m_browse";
break;
case MsgTypes.m_checkID:
msgTypeString = "m_checkID";
break;
case MsgTypes.m_connectionClosing:
msgTypeString = "m_connectionClosing";
break;
case MsgTypes.m_createQueue:
msgTypeString = "m_createQueue";
break;
case MsgTypes.m_createTopic:
msgTypeString = "m_createTopic";
break;
case MsgTypes.m_deleteTemporaryDestination:
msgTypeString = "m_deleteTemporaryDestination";
break;
case MsgTypes.m_getID:
msgTypeString = "m_getID";
break;
case MsgTypes.m_getTemporaryQueue:
msgTypeString = "m_getTemporaryQueue";
break;
case MsgTypes.m_getTemporaryTopic:
msgTypeString = "m_getTemporaryTopic";
break;
case MsgTypes.m_receive:
msgTypeString = "m_receive";
break;
case MsgTypes.m_setEnabled:
msgTypeString = "m_setEnabled";
break;
case MsgTypes.m_setSpyDistributedConnection:
msgTypeString = "m_setSpyDistributedConnection";
break;
case MsgTypes.m_subscribe:
msgTypeString = "m_subscribe";
break;
case MsgTypes.m_transact:
msgTypeString = "m_transact";
break;
case MsgTypes.m_unsubscribe:
msgTypeString = "m_unsubscribe";
break;
case MsgTypes.m_destroySubscription:
msgTypeString = "m_destroySubscription";
break;
case MsgTypes.m_checkUser:
msgTypeString = "m_checkUser";
break;
case MsgTypes.m_ping:
msgTypeString = "m_ping";
break;
case MsgTypes.m_authenticate:
msgTypeString = "m_authenticate";
break;
case MsgTypes.m_close:
msgTypeString = "m_close";
break;
case MsgTypes.m_pong:
msgTypeString = "m_pong";
break;
case MsgTypes.m_receiveRequest:
msgTypeString = "m_receiveRequest";
break;
default:
msgTypeString = "unknown message type " + msgType;
}
return msgTypeString;
}
public int getMsgType()
{
return msgType;
}
public synchronized int getMsgID()
{
if( msgID == 0 )
{
synchronized (nextMsgIDLock)
{
msgID = ++ nextMsgID;
}
if( useJMSServerMsgIDs )
msgID += SERVER_MSG_ID_MASK;
else if( msgID >= SERVER_MSG_ID_MASK )
msgID = msgID % SERVER_MSG_ID_MASK;
}
return msgID;
}
public void setMsgID(int msgID)
{
this.msgID = msgID;
}
public Exception getError()
{
return error;
}
public void setError(Throwable e)
{
if( e instanceof Exception )
error = (Exception) e;
else
error = new UndeclaredThrowableException(e);
}
public boolean equals(Object o)
{
BaseMsg msg = (BaseMsg) o;
return msg.msgID == msgID;
}
public int hashCode()
{
return msgID;
}
public String toString()
{
StringBuffer tmp = new StringBuffer(this.getClass().getName());
tmp.append(System.identityHashCode(this));
tmp.append("[msgType: ");
tmp.append(toString(msgType));
tmp.append(", msgID: ");
tmp.append(msgID);
tmp.append(", error: ");
tmp.append(error);
tmp.append("]");
return tmp.toString();
}
public void trimReply()
{
}
public void write(ObjectOutputStream out) throws IOException
{
out.writeByte(msgType);
out.writeInt(msgID);
int hasError = error != null ? 1 : 0;
out.writeByte(hasError);
if( hasError == 1 )
out.writeObject(error);
}
public void read(ObjectInputStream in) throws IOException, ClassNotFoundException
{
int hasError = in.readByte();
if( hasError == 1 )
error = (Exception) in.readObject();
}
public void setHandler(ReadTask handler)
{
this.handler = handler;
}
public void run()
{
handler.handleMsg(this);
handler = null;
}
}