package org.jboss.mq.il.uil2;
import java.io.Serializable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ConnectException;
import java.net.Socket;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.net.SocketFactory;
import org.jboss.logging.Logger;
import org.jboss.mq.AcknowledgementRequest;
import org.jboss.mq.Connection;
import org.jboss.mq.ConnectionToken;
import org.jboss.mq.DurableSubscriptionID;
import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.TransactionRequest;
import org.jboss.mq.il.ServerIL;
import org.jboss.mq.il.uil2.msgs.MsgTypes;
import org.jboss.mq.il.uil2.msgs.ConnectionTokenMsg;
import org.jboss.mq.il.uil2.msgs.EnableConnectionMsg;
import org.jboss.mq.il.uil2.msgs.GetIDMsg;
import org.jboss.mq.il.uil2.msgs.TemporaryDestMsg;
import org.jboss.mq.il.uil2.msgs.AcknowledgementRequestMsg;
import org.jboss.mq.il.uil2.msgs.AddMsg;
import org.jboss.mq.il.uil2.msgs.BrowseMsg;
import org.jboss.mq.il.uil2.msgs.CheckIDMsg;
import org.jboss.mq.il.uil2.msgs.CheckUserMsg;
import org.jboss.mq.il.uil2.msgs.CloseMsg;
import org.jboss.mq.il.uil2.msgs.CreateDestMsg;
import org.jboss.mq.il.uil2.msgs.DeleteTemporaryDestMsg;
import org.jboss.mq.il.uil2.msgs.DeleteSubscriptionMsg;
import org.jboss.mq.il.uil2.msgs.PingMsg;
import org.jboss.mq.il.uil2.msgs.ReceiveMsg;
import org.jboss.mq.il.uil2.msgs.SubscribeMsg;
import org.jboss.mq.il.uil2.msgs.TransactMsg;
import org.jboss.mq.il.uil2.msgs.UnsubscribeMsg;
/**
 * Client-side {@link ServerIL} implementation for the UIL2 invocation layer.
 *
 * <p>Every {@code ServerIL} operation is wrapped in the matching message
 * object from {@code org.jboss.mq.il.uil2.msgs} and handed to a
 * {@link SocketManager}. The socket and its manager are created lazily, the
 * first time an operation needs them (see {@link #getSocketMgr()}).
 *
 * <p>The instance is {@link Serializable}; the socket, the socket manager
 * and the local bind address/port are {@code transient} and are therefore
 * not part of the serialized form.
 *
 * <p>The following system properties are consulted each time a connection
 * is created:
 * <ul>
 * <li>{@code org.jboss.mq.il.uil2.useServerHost} - connect using the
 *     server host name instead of its IP address</li>
 * <li>{@code org.jboss.mq.il.uil2.localAddr} / {@code localPort} - local
 *     address and port to bind the client socket to</li>
 * <li>{@code org.jboss.mq.il.uil2.serverAddr} / {@code serverPort} -
 *     override the server address and port given to the constructor</li>
 * <li>{@code org.jboss.mq.il.uil2.retryCount} - number of connect retries
 *     (default 10)</li>
 * <li>{@code org.jboss.mq.il.uil2.retryDelay} - delay in milliseconds
 *     between retries (default 0, negative values are treated as 0)</li>
 * </ul>
 */
public class UILServerIL
   implements Cloneable, MsgTypes, Serializable, ServerIL
{
   private static final long serialVersionUID = 853594001646066224L;

   private static final Logger log = Logger.getLogger(UILServerIL.class);

   /** System property: use the server host name rather than its IP address. */
   private final static String USE_SERVER_HOST = "org.jboss.mq.il.uil2.useServerHost";
   /** System property: local address the client socket is bound to. */
   private final static String LOCAL_ADDR = "org.jboss.mq.il.uil2.localAddr";
   /** System property: local port the client socket is bound to. */
   private final static String LOCAL_PORT = "org.jboss.mq.il.uil2.localPort";
   /** System property: overrides the server address passed to the constructor. */
   private final static String SERVER_ADDR = "org.jboss.mq.il.uil2.serverAddr";
   /** System property: overrides the server port passed to the constructor. */
   private final static String SERVER_PORT = "org.jboss.mq.il.uil2.serverPort";
   /** System property: maximum number of connect retries. */
   private final static String RETRY_COUNT = "org.jboss.mq.il.uil2.retryCount";
   /** System property: delay in milliseconds between connect retries. */
   private final static String RETRY_DELAY = "org.jboss.mq.il.uil2.retryDelay";

   /** Server address to connect to unless overridden by a system property. */
   private InetAddress addr;
   /** Server port to connect to unless overridden by a system property. */
   private int port;
   /** Class name of the SocketFactory to use; null selects the default factory. */
   private String socketFactoryName;
   /** Value passed to {@link Socket#setTcpNoDelay(boolean)} on the new socket. */
   private boolean enableTcpNoDelay = false;
   /** Buffer size handed to the socket manager. */
   private int bufferSize;
   /** Chunk size handed to the socket manager. */
   private int chunkSize;

   /** Local bind address, taken from the localAddr system property. */
   private transient InetAddress localAddr;
   /** Local bind port, taken from the localPort system property. */
   private transient int localPort;
   /** The socket to the server, null until a connection has been created. */
   protected transient Socket socket;
   /** Manager for {@link #socket}, null until a connection has been created. */
   protected transient SocketManager socketMgr;

   /**
    * Stores the connection settings. No connection is opened here.
    *
    * @param addr the server address
    * @param port the server port
    * @param socketFactoryName class name of the SocketFactory to use, may be null
    * @param enableTcpNoDelay the TCP_NODELAY setting for the socket
    * @param bufferSize the buffer size passed to the socket manager
    * @param chunkSize the chunk size passed to the socket manager
    * @throws Exception declared for compatibility; not thrown by this body
    */
   public UILServerIL(InetAddress addr, int port, String socketFactoryName,
      boolean enableTcpNoDelay, int bufferSize, int chunkSize)
      throws Exception
   {
      this.addr = addr;
      this.port = port;
      this.socketFactoryName = socketFactoryName;
      this.enableTcpNoDelay = enableTcpNoDelay;
      this.bufferSize = bufferSize;
      this.chunkSize = chunkSize;
   }

   /**
    * Sends the connection token to the server.
    *
    * @param dest the connection token to send
    * @throws Exception if the connection cannot be created or the send fails
    */
   public void setConnectionToken(ConnectionToken dest)
      throws Exception
   {
      ConnectionTokenMsg msg = new ConnectionTokenMsg(dest);
      getSocketMgr().sendMessage(msg);
   }

   /**
    * Sends an enable/disable request for the connection.
    *
    * @param dc not used by this implementation
    * @param enabled the flag carried by the message
    * @throws Exception if the connection cannot be created or the send fails
    */
   public void setEnabled(ConnectionToken dc, boolean enabled)
      throws JMSException, Exception
   {
      EnableConnectionMsg msg = new EnableConnectionMsg(enabled);
      getSocketMgr().sendMessage(msg);
   }

   /**
    * Sends a get-ID request and returns the ID read back from the message.
    * Presumably {@code sendMessage} fills in the reply before returning -
    * confirm against {@link SocketManager}.
    *
    * @return the ID held by the message after the send
    * @throws Exception if the connection cannot be created or the send fails
    */
   public String getID()
      throws Exception
   {
      GetIDMsg msg = new GetIDMsg();
      getSocketMgr().sendMessage(msg);
      return msg.getID();
   }

   /**
    * Requests a temporary queue.
    *
    * @param dc not used by this implementation
    * @return the queue held by the message after the send
    * @throws Exception if the connection cannot be created or the send fails
    */
   public TemporaryQueue getTemporaryQueue(ConnectionToken dc)
      throws JMSException, Exception
   {
      TemporaryDestMsg msg = new TemporaryDestMsg(true);
      getSocketMgr().sendMessage(msg);
      return msg.getQueue();
   }

   /**
    * Requests a temporary topic.
    *
    * @param dc not used by this implementation
    * @return the topic held by the message after the send
    * @throws Exception if the connection cannot be created or the send fails
    */
   public TemporaryTopic getTemporaryTopic(ConnectionToken dc)
      throws JMSException, Exception
   {
      TemporaryDestMsg msg = new TemporaryDestMsg(false);
      getSocketMgr().sendMessage(msg);
      return msg.getTopic();
   }

   /**
    * Sends an acknowledgement request.
    *
    * @param dc not used by this implementation
    * @param item the acknowledgement to send
    * @throws Exception if the connection cannot be created or the send fails
    */
   public void acknowledge(ConnectionToken dc, AcknowledgementRequest item)
      throws JMSException, Exception
   {
      AcknowledgementRequestMsg msg = new AcknowledgementRequestMsg(item);
      getSocketMgr().sendMessage(msg);
   }

   /**
    * Sends a message to be added on the server.
    *
    * @param dc not used by this implementation
    * @param val the message to send
    * @throws Exception if the connection cannot be created or the send fails
    */
   public void addMessage(ConnectionToken dc, SpyMessage val)
      throws Exception
   {
      AddMsg msg = new AddMsg(val);
      getSocketMgr().sendMessage(msg);
   }

   /**
    * Sends a browse request for a destination.
    *
    * @param dc not used by this implementation
    * @param dest the destination to browse
    * @param selector the selector carried by the request
    * @return the messages held by the request after the send
    * @throws Exception if the connection cannot be created or the send fails
    */
   public SpyMessage[] browse(ConnectionToken dc, Destination dest, String selector)
      throws JMSException, Exception
   {
      BrowseMsg msg = new BrowseMsg(dest, selector);
      getSocketMgr().sendMessage(msg);
      return msg.getMessages();
   }

   /**
    * Sends a check-ID request.
    *
    * @param id the ID to check
    * @throws Exception if the connection cannot be created or the send fails
    */
   public void checkID(String id)
      throws JMSException, Exception
   {
      CheckIDMsg msg = new CheckIDMsg(id);
      getSocketMgr().sendMessage(msg);
   }

   /**
    * Sends a check-user request (third {@code CheckUserMsg} argument false).
    *
    * @param username the user name
    * @param password the password
    * @return the ID held by the message after the send
    * @throws Exception if the connection cannot be created or the send fails
    */
   public String checkUser(String username, String password)
      throws JMSException, Exception
   {
      CheckUserMsg msg = new CheckUserMsg(username, password, false);
      getSocketMgr().sendMessage(msg);
      return msg.getID();
   }

   /**
    * Sends an authenticate request (third {@code CheckUserMsg} argument true).
    *
    * @param username the user name
    * @param password the password
    * @return the ID held by the message after the send
    * @throws Exception if the connection cannot be created or the send fails
    */
   public String authenticate(String username, String password)
      throws JMSException, Exception
   {
      CheckUserMsg msg = new CheckUserMsg(username, password, true);
      getSocketMgr().sendMessage(msg);
      return msg.getID();
   }

   /**
    * Returns the shallow copy produced by {@link Object#clone()}.
    *
    * <p>NOTE(review): the {@code socket} and {@code socketMgr} references are
    * copied too, so cloning an already-connected instance yields a clone that
    * shares that connection - confirm callers only clone unconnected instances.
    *
    * @return the copy
    * @throws CloneNotSupportedException as declared by {@code Object.clone()}
    */
   public Object clone()
      throws CloneNotSupportedException
   {
      return super.clone();
   }

   /**
    * Returns {@link #clone()} cast to {@link ServerIL}.
    *
    * @return the cloned instance
    * @throws Exception if cloning fails
    */
   public ServerIL cloneServerIL()
      throws Exception
   {
      return (ServerIL)clone();
   }

   /**
    * Sends a close message and then tears down the connection. An
    * {@link IOException} while sending is ignored so that the local teardown
    * still happens; other exceptions propagate before the teardown.
    *
    * @param dc not used by this implementation
    * @throws Exception if obtaining the socket manager or sending fails with
    *         something other than an IOException
    */
   public void connectionClosing(ConnectionToken dc)
      throws JMSException, Exception
   {
      CloseMsg msg = new CloseMsg();
      try
      {
         getSocketMgr().sendMessage(msg);
      }
      catch (IOException ignored)
      {
         // Best effort: the connection is being closed anyway.
      }
      destroyConnection();
   }

   /**
    * Sends a create-destination request for a queue.
    *
    * @param dc not used by this implementation
    * @param destName the destination name
    * @return the queue held by the message after the send
    * @throws Exception if the connection cannot be created or the send fails
    */
   public Queue createQueue(ConnectionToken dc, String destName)
      throws JMSException, Exception
   {
      CreateDestMsg msg = new CreateDestMsg(destName, true);
      getSocketMgr().sendMessage(msg);
      return msg.getQueue();
   }

   /**
    * Sends a create-destination request for a topic.
    *
    * @param dc not used by this implementation
    * @param destName the destination name
    * @return the topic held by the message after the send
    * @throws Exception if the connection cannot be created or the send fails
    */
   public Topic createTopic(ConnectionToken dc, String destName)
      throws JMSException, Exception
   {
      CreateDestMsg msg = new CreateDestMsg(destName, false);
      getSocketMgr().sendMessage(msg);
      return msg.getTopic();
   }

   /**
    * Sends a delete request for a temporary destination.
    *
    * @param dc not used by this implementation
    * @param dest the temporary destination to delete
    * @throws Exception if the connection cannot be created or the send fails
    */
   public void deleteTemporaryDestination(ConnectionToken dc, SpyDestination dest)
      throws JMSException, Exception
   {
      DeleteTemporaryDestMsg msg = new DeleteTemporaryDestMsg(dest);
      getSocketMgr().sendMessage(msg);
   }

   /**
    * Sends a delete request for a durable subscription.
    *
    * @param dc not used by this implementation
    * @param id the durable subscription to delete
    * @throws Exception if the connection cannot be created or the send fails
    */
   public void destroySubscription(ConnectionToken dc,DurableSubscriptionID id)
      throws JMSException, Exception
   {
      DeleteSubscriptionMsg msg = new DeleteSubscriptionMsg(id);
      getSocketMgr().sendMessage(msg);
   }

   /**
    * Sends a ping via {@code sendReply} rather than {@code sendMessage}.
    *
    * @param dc not used by this implementation
    * @param clientTime the client time carried by the ping
    * @throws Exception if the connection cannot be created or the send fails
    */
   public void ping(ConnectionToken dc, long clientTime)
      throws Exception
   {
      PingMsg msg = new PingMsg(clientTime, true);
      // Result intentionally discarded; presumably the call assigns the
      // message its ID as a side effect - confirm against the message class.
      msg.getMsgID();
      getSocketMgr().sendReply(msg);
   }

   /**
    * Sends a receive request for a subscriber.
    *
    * @param dc not used by this implementation
    * @param subscriberId the subscriber to receive for
    * @param wait the wait value carried by the request
    * @return the message held by the request after the send
    * @throws Exception if the connection cannot be created or the send fails
    */
   public SpyMessage receive(ConnectionToken dc, int subscriberId, long wait)
      throws Exception
   {
      ReceiveMsg msg = new ReceiveMsg(subscriberId, wait);
      getSocketMgr().sendMessage(msg);
      return msg.getMessage();
   }

   /**
    * Sends a subscribe request.
    *
    * @param dc not used by this implementation
    * @param s the subscription to register
    * @throws Exception if the connection cannot be created or the send fails
    */
   public void subscribe(ConnectionToken dc, org.jboss.mq.Subscription s)
      throws JMSException, Exception
   {
      SubscribeMsg msg = new SubscribeMsg(s);
      getSocketMgr().sendMessage(msg);
   }

   /**
    * Sends a transaction request.
    *
    * @param dc not used by this implementation
    * @param t the transaction request to send
    * @throws Exception if the connection cannot be created or the send fails
    */
   public void transact(org.jboss.mq.ConnectionToken dc, TransactionRequest t)
      throws JMSException, Exception
   {
      TransactMsg msg = new TransactMsg(t);
      getSocketMgr().sendMessage(msg);
   }

   /**
    * Sends an unsubscribe request.
    *
    * @param dc not used by this implementation
    * @param subscriptionID the subscription to remove
    * @throws Exception if the connection cannot be created or the send fails
    */
   public void unsubscribe(ConnectionToken dc, int subscriptionID)
      throws JMSException, Exception
   {
      UnsubscribeMsg msg = new UnsubscribeMsg(subscriptionID);
      getSocketMgr().sendMessage(msg);
   }

   /**
    * Returns the socket manager, creating the connection first when there
    * is none yet.
    *
    * @return the socket manager
    * @throws Exception if the connection cannot be created
    */
   final SocketManager getSocketMgr()
      throws Exception
   {
      if( socketMgr == null )
         createConnection();
      return socketMgr;
   }

   /**
    * Creates the connection when no socket manager exists yet.
    *
    * @throws Exception if the connection cannot be created
    */
   protected void checkConnection()
      throws Exception
   {
      if (socketMgr == null)
      {
         createConnection();
      }
   }

   /**
    * Opens the socket to the server and starts a {@link SocketManager} on it.
    *
    * <p>The target address and port default to the constructor values and
    * may be overridden through the serverAddr/serverPort system properties;
    * the effective values are the ones actually connected to. Connection
    * attempts failing with {@link ConnectException} are retried according
    * to the retryCount/retryDelay system properties.
    *
    * @throws Exception if a property cannot be parsed, the address cannot be
    *         resolved, the connection cannot be established or the calling
    *         thread is interrupted while waiting between retries
    */
   protected void createConnection()
      throws Exception
   {
      boolean tracing = log.isTraceEnabled();
      if( tracing )
         log.trace("Connecting to : "+addr+":"+port);

      SocketFactory socketFactory = loadSocketFactory();

      // Optional local bind address/port
      String tmp = getProperty(LOCAL_ADDR);
      if( tmp != null )
         this.localAddr = InetAddress.getByName(tmp);
      tmp = getProperty(LOCAL_PORT);
      if( tmp != null )
         this.localPort = Integer.parseInt(tmp);

      // Optional overrides of the server address/port
      InetAddress serverAddr = addr;
      int serverPort = port;
      tmp = getProperty(SERVER_ADDR);
      if( tmp != null )
         serverAddr = InetAddress.getByName(tmp);
      tmp = getProperty(SERVER_PORT);
      if( tmp != null )
         serverPort = Integer.parseInt(tmp);

      String useHostNameProp = getProperty(USE_SERVER_HOST);
      String serverHost = serverAddr.getHostAddress();
      if (Boolean.valueOf(useHostNameProp).booleanValue())
         serverHost = serverAddr.getHostName();

      if( tracing )
      {
         log.trace("Connecting with addr="+serverHost+", port="+serverPort
            + ", localAddr="+localAddr+", localPort="+localPort
            + ", socketFactory="+socketFactory
            + ", enableTcpNoDelay="+enableTcpNoDelay
            + ", bufferSize="+bufferSize
            + ", chunkSize="+chunkSize
            );
      }

      int maxRetries = 10;
      tmp = getProperty(RETRY_COUNT);
      if( tmp != null )
         maxRetries = Integer.parseInt(tmp);
      long retryDelay = 0;
      tmp = getProperty(RETRY_DELAY);
      if( tmp != null )
      {
         retryDelay = Long.parseLong(tmp);
         // Thread.sleep rejects negative values
         if( retryDelay < 0 )
            retryDelay = 0;
      }
      if( tracing )
         log.trace("Begin connect loop, maxRetries="+maxRetries+", delay="+retryDelay);

      socket = connectWithRetry(socketFactory, serverHost, serverPort,
         maxRetries, retryDelay, tracing);

      socket.setTcpNoDelay(enableTcpNoDelay);
      socketMgr = new SocketManager(socket);
      socketMgr.setBufferSize(bufferSize);
      socketMgr.setChunkSize(chunkSize);
      socketMgr.start(Connection.getThreadGroup());
   }

   /**
    * Instantiates the configured socket factory through the thread context
    * class loader, falling back to {@link SocketFactory#getDefault()} when no
    * name is configured or the class cannot be loaded or instantiated.
    *
    * @return the socket factory to use, never null
    */
   private SocketFactory loadSocketFactory()
   {
      SocketFactory socketFactory = null;
      if( socketFactoryName != null )
      {
         try
         {
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            Class factoryClass = loader.loadClass(socketFactoryName);
            socketFactory = (SocketFactory) factoryClass.newInstance();
         }
         catch(Exception e)
         {
            log.debug("Failed to load socket factory: "+socketFactoryName, e);
         }
      }
      if( socketFactory == null )
      {
         socketFactory = SocketFactory.getDefault();
      }
      return socketFactory;
   }

   /**
    * Tries to open the socket, retrying after a {@link ConnectException}
    * until {@code maxRetries} retries have been used up.
    *
    * @param socketFactory the factory creating the socket
    * @param serverHost the host name or address to connect to
    * @param serverPort the port to connect to
    * @param maxRetries the number of retries after the first failed attempt
    * @param retryDelay the non-negative delay in milliseconds between attempts
    * @param tracing whether trace logging is enabled
    * @return the connected socket, never null
    * @throws IOException if the connection fails, or if the thread is
    *         interrupted while waiting between attempts (the interrupt status
    *         is restored and the InterruptedException is set as the cause)
    */
   private Socket connectWithRetry(SocketFactory socketFactory, String serverHost,
      int serverPort, int maxRetries, long retryDelay, boolean tracing)
      throws IOException
   {
      int retries = 0;
      while (true)
      {
         try
         {
            if( localAddr != null )
               return socketFactory.createSocket(serverHost, serverPort, localAddr, localPort);
            return socketFactory.createSocket(serverHost, serverPort);
         }
         catch (ConnectException e)
         {
            if (++retries > maxRetries)
               throw e;
            if( tracing )
               log.trace("Failed to connect, retries="+retries, e);
         }
         try
         {
            Thread.sleep(retryDelay);
         }
         catch(InterruptedException e)
         {
            // Give up instead of continuing without a socket, and keep the
            // interrupt visible to the caller.
            Thread.currentThread().interrupt();
            IOException ioe = new IOException(
               "Interrupted while waiting to retry connection to "+serverHost+":"+serverPort);
            ioe.initCause(e);
            throw ioe;
         }
      }
   }

   /**
    * Stops the socket manager, if any, and closes the socket. Does nothing
    * when there is no socket. An {@link IOException} during teardown is
    * ignored.
    *
    * <p>NOTE(review): the {@code socket} and {@code socketMgr} fields are left
    * set, so later operations reuse the stopped manager instead of
    * reconnecting - confirm this is intended.
    */
   protected void destroyConnection()
   {
      try
      {
         if( socket != null )
         {
            try
            {
               // The manager may be missing when connection setup failed
               // after the socket had been opened.
               if( socketMgr != null )
                  socketMgr.stop();
            }
            finally
            {
               socket.close();
            }
         }
      }
      catch(IOException ignore)
      {
         // Best effort: nothing useful can be done about a failed close.
      }
   }

   /**
    * Reads a system property, treating any failure to read it as "not set".
    *
    * @param name the property name
    * @return the property value, or null when it is unset or cannot be read
    */
   private String getProperty(String name)
   {
      String value = null;
      try
      {
         value = System.getProperty(name);
      }
      catch (Throwable ignored)
      {
         log.trace("Cannot retrieve system property " + name);
      }
      return value;
   }
}