package org.jboss.mq.il.http;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.Properties;
import org.jboss.logging.Logger;
import org.jboss.mq.Connection;
import org.jboss.mq.il.ClientIL;
import org.jboss.mq.il.ClientILService;
public class HTTPClientILService implements Runnable, ClientILService
{
private static Logger log = Logger.getLogger(HTTPClientILService.class);
private HTTPClientIL clientIL;
private Connection connection;
private URL url = null;
private long timeout = 60000;
private long restInterval = 0;
private Thread worker;
private String clientILIdentifier;
private static int threadNumber= 0;
public HTTPClientILService()
{
if (log.isTraceEnabled())
{
log.trace("created");
}
}
public ClientIL getClientIL() throws Exception
{
if (log.isTraceEnabled())
{
log.trace("getClientIL()");
}
return this.clientIL;
}
public void init(Connection connection, Properties props) throws Exception
{
if (log.isTraceEnabled())
{
log.trace("init(Connection " + connection.toString() + ", Properties " + props.toString() + ")");
}
this.connection = connection;
this.url = HTTPClient.resolveServerUrl(props.getProperty(HTTPServerILFactory.SERVER_URL_KEY));
this.clientILIdentifier = this.getClientILIdentifier(this.url);
this.clientIL = new HTTPClientIL(this.clientILIdentifier);
try
{
if (System.getProperties().containsKey(HTTPServerILFactory.TIMEOUT_KEY))
{
this.timeout = Long.valueOf(System.getProperty(HTTPServerILFactory.TIMEOUT_KEY)).longValue();
}
else
{
this.timeout = Long.valueOf(props.getProperty(HTTPServerILFactory.TIMEOUT_KEY)).longValue();
}
if (System.getProperties().containsKey(HTTPServerILFactory.REST_INTERVAL_KEY))
{
this.restInterval = Long.valueOf(System.getProperty(HTTPServerILFactory.REST_INTERVAL_KEY)).longValue();
}
else
{
this.restInterval = Long.valueOf(props.getProperty(HTTPServerILFactory.REST_INTERVAL_KEY)).longValue();
}
}
catch (Exception exception)
{} }
public void start() throws Exception
{
if (log.isTraceEnabled())
{
log.trace("start()");
}
clientIL.stopped = false;
worker = new Thread(Connection.getThreadGroup(), this, "HTTPClientILService-" + threadNumber++);
worker.setDaemon(true);
worker.start();
}
public void stop() throws Exception
{
if (log.isTraceEnabled())
{
log.trace("stop()");
}
clientIL.stopped = true;
}
public void run()
{
if (log.isTraceEnabled())
{
log.trace("run()");
}
HTTPILRequest request = new HTTPILRequest();
request.setMethodName("clientListening");
while (clientIL.stopped == false)
{
try
{
if (this.clientILIdentifier != null && clientIL.stopped == false)
{
request.setArguments(new Object[]
{this.clientILIdentifier, new Long(this.timeout)}, new Class[]
{String.class, Long.class});
if (log.isDebugEnabled())
{
log.debug("Sending a request to '" + this.url.toString() + "' for ClientIL #" + this.clientILIdentifier + ".");
}
HTTPILRequest[] response = (HTTPILRequest[])HTTPClient.post(this.url, request);
if (response != null)
{
if (log.isDebugEnabled())
{
log.debug("Logging each response received in this batch for ClientIL #" + this.clientILIdentifier + ".");
}
for (int i = 0; i < response.length; i++)
{
if (log.isDebugEnabled())
{
log.debug(response.toString());
}
Method method = this.connection.getClass().getMethod(response[i].getMethodName(), response[i].getArgumentTypes());
method.invoke(this.connection, response[i].getArguments());
if (log.isDebugEnabled())
{
log.debug("Server invoked method '" + method.getName() + "' on ClientIL #" + this.clientILIdentifier + ".");
}
}
}
else
{
log.warn("The request posted to '" + this.url.toString() + "' on behalf of ClientIL #" + this.clientILIdentifier + " returned an unexpected response.");
}
try
{
if (log.isDebugEnabled())
{
log.debug("Resting " + String.valueOf(this.restInterval) + " milliseconds on ClientIL #" + this.clientILIdentifier + ".");
}
Thread.sleep(this.restInterval);
}
catch (InterruptedException exception)
{}
}
else
{
log.warn("ClientIL Id is null, waiting 50 milliseconds to get one.");
Thread.sleep(50);
}
}
catch (Exception exception)
{
if (log.isDebugEnabled())
{
log.debug("Exception of type '" + exception.getClass().getName() + "' occured when trying to receive request from server URL '" + this.url + ".'");
}
this.connection.asynchFailure(exception.getMessage(), exception);
break;
}
}
if (this.clientIL.stopped)
{
if (log.isDebugEnabled())
{
log.debug("Notifying the server that ClientIL #" + this.clientILIdentifier + " has stopped.");
}
try
{
request.setMethodName("stopClientListening");
request.setArguments(new Object[]
{this.clientILIdentifier}, new Class[]
{String.class});
HTTPClient.post(this.url, request);
}
catch (Exception exception)
{
if (log.isDebugEnabled())
{
log.debug("Attempt to notify the server that ClientIL #" + this.clientILIdentifier + " failed due to exception with description '" + exception.getMessage() + ".' This means that requests will stay in the storage queue even though the client has stopped.");
}
}
}
}
private String getClientILIdentifier(URL url) throws Exception
{
HTTPILRequest request = new HTTPILRequest();
request.setMethodName("getClientILIdentifer");
return (String)HTTPClient.post(url, request);
}
}