package org.jboss.mq.il.oil2;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.Socket;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.net.SocketFactory;
import org.jboss.logging.Logger;
import org.jboss.mq.AcknowledgementRequest;
import org.jboss.mq.Connection;
import org.jboss.mq.ConnectionToken;
import org.jboss.mq.DurableSubscriptionID;
import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.TransactionRequest;
import org.jboss.mq.il.ServerIL;
/**
 * Client-side {@link ServerIL} implementation for the OIL2 invocation layer.
 *
 * <p>Every <code>ServerIL</code> operation is packaged as an
 * {@link OIL2Request}, sent through an {@link OIL2SocketHandler} and the
 * matching {@link OIL2Response} is evaluated for a result or an exception.
 * The socket connection is opened lazily by {@link #connect()} on first use
 * and torn down by {@link #close()}.
 *
 * <p>The instance is serializable: only the target address, port, socket
 * factory class name and the TCP-no-delay flag are meant to travel; the
 * connection itself is rebuilt on demand.
 */
public final class OIL2ServerIL
   implements java.io.Serializable, java.lang.Cloneable, org.jboss.mq.il.ServerIL, OIL2Constants
{
   static final long serialVersionUID = 1841984837999477932L;

   private final static Logger log = Logger.getLogger(OIL2ServerIL.class);

   /** System property naming the local address the client socket is bound to. */
   private final static String LOCAL_ADDR = "org.jboss.mq.il.oil2.localAddr";

   /** System property naming the local port the client socket is bound to. */
   private final static String LOCAL_PORT = "org.jboss.mq.il.oil2.localPort";

   /** Host name or address of the server to connect to. */
   private String addr;

   /** Port of the server to connect to. */
   private int port;

   /** Class name of an optional custom {@link SocketFactory}; may be null. */
   private String socketFactoryName;

   /** Value passed to {@link Socket#setTcpNoDelay(boolean)} on connect. */
   private boolean enableTcpNoDelay = false;

   /** Local bind address read from {@link #LOCAL_ADDR} during connect. */
   private transient InetAddress localAddr;

   /** Local bind port read from {@link #LOCAL_PORT} during connect. */
   private transient int localPort;

   /** Object stream reading from the socket; null while disconnected. */
   private transient ObjectInputStream in;

   /** Object stream writing to the socket; null while disconnected. */
   private transient ObjectOutputStream out;

   /** The open socket; null is the marker for "not connected". */
   private transient Socket socket;

   // NOTE(review): unlike the other connection fields this one is not
   // declared transient, so serializing a connected instance would try to
   // write the handler. Left unchanged here because it affects the
   // serialized form - confirm and consider making it transient.
   OIL2SocketHandler socketHandler;

   /**
    * Listener registered with the socket handler. Both callbacks are
    * intentionally empty: this side only issues requests and ignores
    * anything pushed to it.
    */
   class RequestListner implements OIL2RequestListner
   {
      public void handleConnectionException(Exception e)
      {
         // Ignored.
      }

      public void handleRequest(OIL2Request request)
      {
         // Ignored.
      }
   }

   /**
    * Creates an unconnected proxy; no network activity happens here.
    *
    * @param addr host name or address of the server
    * @param port server port
    * @param socketFactoryName class name of a {@link SocketFactory} to
    *        instantiate, or null to use the default factory
    * @param enableTcpNoDelay value applied to the socket's TCP_NODELAY option
    */
   public OIL2ServerIL(String addr, int port,
      String socketFactoryName, boolean enableTcpNoDelay)
   {
      this.addr = addr;
      this.port = port;
      this.socketFactoryName = socketFactoryName;
      this.enableTcpNoDelay = enableTcpNoDelay;
   }

   /**
    * Opens the connection to the server if it is not already open.
    *
    * <p>The socket, its object streams and the socket handler are built on
    * local variables and only published to the instance fields once all of
    * them are ready. If any step fails the freshly opened socket is closed
    * and the instance stays in the "not connected" state, so a later call
    * can retry instead of working with a half-initialized connection.
    *
    * @throws IOException if the socket or its streams cannot be set up
    */
   synchronized public void connect() throws IOException
   {
      if (socket != null)
      {
         return;
      }

      boolean tracing = log.isTraceEnabled();
      if (tracing)
      {
         log.trace("Connecting to : "+addr+":"+port);
      }

      SocketFactory socketFactory = loadSocketFactory();

      // Optional local bind address/port supplied through system properties.
      String tmp = System.getProperty(LOCAL_ADDR);
      if (tmp != null)
      {
         this.localAddr = InetAddress.getByName(tmp);
      }
      tmp = System.getProperty(LOCAL_PORT);
      if (tmp != null)
      {
         this.localPort = Integer.parseInt(tmp);
      }

      if (tracing)
      {
         log.trace("Connecting with addr="+addr+", port="+port
            + ", localAddr="+localAddr+", localPort="+localPort
            + ", socketFactory="+socketFactory);
      }

      Socket newSocket;
      if (localAddr != null)
      {
         newSocket = socketFactory.createSocket(addr, port, localAddr, localPort);
      }
      else
      {
         newSocket = socketFactory.createSocket(addr, port);
      }

      boolean connected = false;
      try
      {
         if (tracing)
         {
            log.trace("Connection established.");
         }
         newSocket.setTcpNoDelay(enableTcpNoDelay);

         // The output stream is created and flushed first so its stream
         // header is on the wire before the input stream is constructed.
         ObjectOutputStream newOut =
            new ObjectOutputStream(new BufferedOutputStream(newSocket.getOutputStream()));
         newOut.flush();
         ObjectInputStream newIn =
            new ObjectInputStream(new BufferedInputStream(newSocket.getInputStream()));
         if (tracing)
         {
            log.trace("Streams initialized.");
         }

         OIL2SocketHandler newHandler =
            new OIL2SocketHandler(newIn, newOut, Connection.getThreadGroup());
         newHandler.setRequestListner(new RequestListner());
         newHandler.start();

         // Publish only a fully working connection.
         out = newOut;
         in = newIn;
         socketHandler = newHandler;
         socket = newSocket;
         connected = true;
      }
      finally
      {
         if (!connected)
         {
            // Do not leak the socket when setup failed part-way through.
            try
            {
               newSocket.close();
            }
            catch (IOException ignored)
            {
               log.debug("Failed to close socket after unsuccessful connect: ", ignored);
            }
         }
      }
   }

   /**
    * Returns the socket factory to use: an instance of
    * {@link #socketFactoryName} loaded through the thread context class
    * loader, or the default factory when no name is configured or the
    * class cannot be loaded/instantiated (the failure is logged at debug).
    */
   private SocketFactory loadSocketFactory()
   {
      SocketFactory factory = null;
      if (socketFactoryName != null)
      {
         try
         {
            ClassLoader loader = Thread.currentThread().getContextClassLoader();
            Class factoryClass = loader.loadClass(socketFactoryName);
            factory = (SocketFactory) factoryClass.newInstance();
         }
         catch (Exception e)
         {
            log.debug("Failed to load socket factory: "+socketFactoryName, e);
         }
      }
      if (factory == null)
      {
         factory = SocketFactory.getDefault();
      }
      return factory;
   }

   /**
    * Connects if necessary and returns the current handler. Both steps run
    * under the instance lock so a concurrent {@link #close()} cannot clear
    * the handler between the connect and the read.
    */
   private synchronized OIL2SocketHandler connectedHandler() throws IOException
   {
      connect();
      return socketHandler;
   }

   /**
    * Sends the request over the (lazily opened) connection, waits for the
    * response and evaluates it.
    *
    * @param request the request to send
    * @return whatever the response evaluates to
    * @throws Exception if connecting, sending or evaluating the response fails
    */
   private Object invoke(OIL2Request request) throws Exception
   {
      OIL2Response response = connectedHandler().synchRequest(request);
      return response.evalThrowsJMSException();
   }

   /**
    * Sends a SERVER_SET_SPY_DISTRIBUTED_CONNECTION request carrying the token.
    *
    * @param dest the connection token sent to the server
    */
   public void setConnectionToken(ConnectionToken dest) throws Exception
   {
      invoke(new OIL2Request(OIL2Constants.SERVER_SET_SPY_DISTRIBUTED_CONNECTION, new Object[] { dest }));
   }

   /**
    * Sends a SERVER_SET_ENABLED request.
    *
    * @param dc not sent with the request
    * @param enabled the flag sent to the server
    */
   public void setEnabled(ConnectionToken dc, boolean enabled) throws JMSException, Exception
   {
      invoke(new OIL2Request(OIL2Constants.SERVER_SET_ENABLED, new Object[] { new Boolean(enabled)}));
   }

   /**
    * Sends a SERVER_GET_ID request.
    *
    * @return the response value, cast to String
    */
   public String getID() throws Exception
   {
      return (String) invoke(new OIL2Request(OIL2Constants.SERVER_GET_ID, null));
   }

   /**
    * Sends a SERVER_GET_TEMPORARY_QUEUE request.
    *
    * @param dc not sent with the request
    * @return the response value, cast to TemporaryQueue
    */
   public TemporaryQueue getTemporaryQueue(ConnectionToken dc) throws JMSException, Exception
   {
      return (TemporaryQueue) invoke(new OIL2Request(OIL2Constants.SERVER_GET_TEMPORARY_QUEUE, null));
   }

   /**
    * Sends a SERVER_GET_TEMPORARY_TOPIC request.
    *
    * @param dc not sent with the request
    * @return the response value, cast to TemporaryTopic
    */
   public TemporaryTopic getTemporaryTopic(ConnectionToken dc) throws JMSException, Exception
   {
      return (TemporaryTopic) invoke(new OIL2Request(OIL2Constants.SERVER_GET_TEMPORARY_TOPIC, null));
   }

   /**
    * Sends a SERVER_ACKNOWLEDGE request.
    *
    * @param dc not sent with the request
    * @param item the acknowledgement sent to the server
    */
   public void acknowledge(ConnectionToken dc, AcknowledgementRequest item) throws JMSException, Exception
   {
      invoke(new OIL2Request(OIL2Constants.SERVER_ACKNOWLEDGE, new Object[] { item }));
   }

   /**
    * Sends a SERVER_ADD_MESSAGE request.
    *
    * @param dc not sent with the request
    * @param val the message sent to the server
    */
   public void addMessage(ConnectionToken dc, SpyMessage val) throws Exception
   {
      invoke(new OIL2Request(OIL2Constants.SERVER_ADD_MESSAGE, new Object[] { val }));
   }

   /**
    * Sends a SERVER_BROWSE request.
    *
    * @param dc not sent with the request
    * @param dest the destination sent to the server
    * @param selector the selector sent to the server
    * @return the response value, cast to SpyMessage[]
    */
   public SpyMessage[] browse(ConnectionToken dc, Destination dest, String selector)
      throws JMSException, Exception
   {
      return (SpyMessage[]) invoke(new OIL2Request(OIL2Constants.SERVER_BROWSE, new Object[] { dest, selector }));
   }

   /**
    * Sends a SERVER_CHECK_ID request.
    *
    * @param ID the id sent to the server
    */
   public void checkID(String ID) throws JMSException, Exception
   {
      invoke(new OIL2Request(OIL2Constants.SERVER_CHECK_ID, new Object[] { ID }));
   }

   /**
    * Sends a SERVER_CHECK_USER request.
    *
    * @param userName the user name sent to the server
    * @param password the password sent to the server
    * @return the response value, cast to String
    */
   public String checkUser(String userName, String password) throws JMSException, Exception
   {
      return (String) invoke(new OIL2Request(OIL2Constants.SERVER_CHECK_USER, new Object[] { userName, password }));
   }

   /**
    * Sends a SERVER_AUTHENTICATE request.
    *
    * @param userName the user name sent to the server
    * @param password the password sent to the server
    * @return the response value, cast to String
    */
   public String authenticate(String userName, String password) throws JMSException, Exception
   {
      return (String) invoke(new OIL2Request(OIL2Constants.SERVER_AUTHENTICATE, new Object[] { userName, password }));
   }

   /**
    * Returns a copy that carries the same configuration but no connection.
    *
    * <p>The socket, streams and handler of this instance are deliberately
    * not shared with the copy: a shared connection would be torn down for
    * both objects as soon as either one is closed. The copy opens its own
    * connection on first use.
    */
   public Object clone() throws CloneNotSupportedException
   {
      OIL2ServerIL copy = (OIL2ServerIL) super.clone();
      copy.socket = null;
      copy.in = null;
      copy.out = null;
      copy.socketHandler = null;
      return copy;
   }

   /**
    * Returns {@link #clone()} typed as a {@link ServerIL}.
    */
   public ServerIL cloneServerIL() throws Exception
   {
      return (ServerIL) clone();
   }

   /**
    * Sends a SERVER_CONNECTION_CLOSING request and then closes the
    * connection, whether or not the request succeeded.
    *
    * @param dc not sent with the request
    */
   public void connectionClosing(ConnectionToken dc) throws JMSException, Exception
   {
      try
      {
         invoke(new OIL2Request(OIL2Constants.SERVER_CONNECTION_CLOSING, null));
      }
      finally
      {
         close();
      }
   }

   /**
    * Sends a SERVER_CREATE_QUEUE request.
    *
    * @param dc not sent with the request
    * @param dest the queue name sent to the server
    * @return the response value, cast to Queue
    */
   public Queue createQueue(ConnectionToken dc, String dest) throws JMSException, Exception
   {
      return (Queue) invoke(new OIL2Request(OIL2Constants.SERVER_CREATE_QUEUE, new Object[] { dest }));
   }

   /**
    * Sends a SERVER_CREATE_TOPIC request.
    *
    * @param dc not sent with the request
    * @param dest the topic name sent to the server
    * @return the response value, cast to Topic
    */
   public Topic createTopic(ConnectionToken dc, String dest) throws JMSException, Exception
   {
      return (Topic) invoke(new OIL2Request(OIL2Constants.SERVER_CREATE_TOPIC, new Object[] { dest }));
   }

   /**
    * Sends a SERVER_DELETE_TEMPORARY_DESTINATION request.
    *
    * @param dc not sent with the request
    * @param dest the destination sent to the server
    */
   public void deleteTemporaryDestination(ConnectionToken dc, SpyDestination dest)
      throws JMSException, Exception
   {
      invoke(new OIL2Request(OIL2Constants.SERVER_DELETE_TEMPORARY_DESTINATION, new Object[] { dest }));
   }

   /**
    * Sends a SERVER_DESTROY_SUBSCRIPTION request.
    *
    * @param dc not sent with the request
    * @param id the durable subscription id sent to the server
    */
   public void destroySubscription(ConnectionToken dc, DurableSubscriptionID id)
      throws JMSException, Exception
   {
      invoke(new OIL2Request(OIL2Constants.SERVER_DESTROY_SUBSCRIPTION, new Object[] { id }));
   }

   /**
    * Sends a SERVER_PING request.
    *
    * @param dc not sent with the request
    * @param clientTime the client time sent to the server
    */
   public void ping(ConnectionToken dc, long clientTime) throws Exception
   {
      invoke(new OIL2Request(OIL2Constants.SERVER_PING, new Object[] { new Long(clientTime)}));
   }

   /**
    * Sends a SERVER_RECEIVE request.
    *
    * @param dc not sent with the request
    * @param subscriberId the subscriber id sent to the server
    * @param wait the wait value sent to the server
    * @return the response value, cast to SpyMessage
    */
   public SpyMessage receive(ConnectionToken dc, int subscriberId, long wait) throws Exception
   {
      return (SpyMessage) invoke(
         new OIL2Request(OIL2Constants.SERVER_RECEIVE, new Object[] { new Integer(subscriberId), new Long(wait)}));
   }

   /**
    * Sends a SERVER_SUBSCRIBE request.
    *
    * @param dc not sent with the request
    * @param s the subscription sent to the server
    */
   public void subscribe(ConnectionToken dc, org.jboss.mq.Subscription s) throws JMSException, Exception
   {
      invoke(new OIL2Request(OIL2Constants.SERVER_SUBSCRIBE, new Object[] { s }));
   }

   /**
    * Sends a SERVER_TRANSACT request.
    *
    * @param dc not sent with the request
    * @param t the transaction request sent to the server
    */
   public void transact(org.jboss.mq.ConnectionToken dc, TransactionRequest t)
      throws JMSException, Exception
   {
      invoke(new OIL2Request(OIL2Constants.SERVER_TRANSACT, new Object[] { t }));
   }

   /**
    * Sends a SERVER_UNSUBSCRIBE request.
    *
    * @param dc not sent with the request
    * @param subscriptionId the subscription id sent to the server
    */
   public void unsubscribe(ConnectionToken dc, int subscriptionId) throws JMSException, Exception
   {
      invoke(
         new OIL2Request(OIL2Constants.SERVER_UNSUBSCRIBE, new Object[] { new Integer(subscriptionId)}));
   }

   /**
    * Stops the socket handler and closes the streams and the socket.
    *
    * <p>Each resource is released even if releasing an earlier one fails,
    * and the instance is always returned to the "not connected" state so
    * that a later call can reconnect. An {@link IOException} raised while
    * closing is logged at debug level and not propagated. Does nothing if
    * no connection is open.
    */
   synchronized public void close()
   {
      if (socket == null)
      {
         return;
      }
      try
      {
         // Nested finally blocks guarantee every step runs regardless of
         // failures in the preceding ones.
         try
         {
            if (socketHandler != null)
            {
               socketHandler.stop();
            }
         }
         finally
         {
            try
            {
               if (in != null)
               {
                  in.close();
               }
            }
            finally
            {
               try
               {
                  if (out != null)
                  {
                     out.close();
                  }
               }
               finally
               {
                  socket.close();
               }
            }
         }
      }
      catch (IOException e)
      {
         log.debug("Exception occured while closing opened resources: ", e);
      }
      finally
      {
         socketHandler = null;
         in = null;
         out = null;
         socket = null;
      }
   }
}