package org.jboss.mq.il.uil2;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Properties;
import org.jboss.logging.Logger;
import org.jboss.mq.Connection;
import org.jboss.mq.ReceiveRequest;
import org.jboss.mq.SpyDestination;
import org.jboss.mq.il.ClientILService;
import org.jboss.mq.il.ClientIL;
import org.jboss.mq.il.uil2.msgs.MsgTypes;
import org.jboss.mq.il.uil2.msgs.BaseMsg;
import org.jboss.mq.il.uil2.msgs.ReceiveRequestMsg;
import org.jboss.mq.il.uil2.msgs.DeleteTemporaryDestMsg;
import org.jboss.mq.il.uil2.msgs.PingMsg;
public class UILClientILService
implements ClientILService, MsgTypes, SocketManagerHandler
{
static Logger log = Logger.getLogger(UILClientILService.class);
private UILClientIL clientIL;
private SocketManager socketMgr;
private Connection connection;
private boolean sendReceiveReplies = true;
public ClientIL getClientIL()
throws Exception
{
return clientIL;
}
public void init(Connection connection, Properties props)
throws Exception
{
this.connection = connection;
clientIL = new UILClientIL();
UILServerIL serverIL = (UILServerIL) connection.getServerIL();
socketMgr = serverIL.getSocketMgr();
String t = props.getProperty(UILServerILFactory.UIL_BUFFERSIZE_KEY);
if (t != null)
socketMgr.setBufferSize(Integer.parseInt(t));
t = props.getProperty(UILServerILFactory.UIL_CHUNKSIZE_KEY);
if (t != null)
socketMgr.setChunkSize(Integer.parseInt(t));
t = props.getProperty(UILServerILFactory.UIL_RECEIVE_REPLIES_KEY);
if (t != null)
sendReceiveReplies = false;
socketMgr.setHandler(this);
}
public void handleMsg(BaseMsg msg)
throws Exception
{
boolean trace = log.isTraceEnabled();
int msgType = msg.getMsgType();
if (trace)
log.trace("Begin handleMsg, msgType: " + msgType);
switch( msgType )
{
case m_receiveRequest:
ReceiveRequestMsg rmsg = (ReceiveRequestMsg) msg;
ReceiveRequest[] messages = rmsg.getMessages();
connection.asynchDeliver(messages);
if (sendReceiveReplies)
{
rmsg.trimTheMessages();
socketMgr.sendReply(msg);
}
break;
case m_deleteTemporaryDestination:
DeleteTemporaryDestMsg dmsg = (DeleteTemporaryDestMsg) msg;
SpyDestination dest = dmsg.getDest();
connection.asynchDeleteTemporaryDestination(dest);
socketMgr.sendReply(msg);
break;
case m_close:
connection.asynchClose();
socketMgr.sendReply(msg);
break;
case m_pong:
PingMsg pmsg = (PingMsg) msg;
long time = pmsg.getTime();
connection.asynchPong(time);
break;
default:
connection.asynchFailure("UILClientILService received bad msg: "+msg, null);
}
if (trace)
log.trace("End handleMsg");
}
public void start() throws Exception
{
log.debug("Starting");
}
public void stop() throws Exception
{
log.debug("Stopping");
socketMgr.stop();
}
public void onStreamNotification(Object stream, int size)
{
connection.asynchPong(System.currentTimeMillis());
}
public void asynchFailure(String error, Throwable e)
{
if (e instanceof Exception)
connection.asynchFailure(error, e);
else
connection.asynchFailure(error, new UndeclaredThrowableException(e));
}
public void close()
{
}
}