package org.jboss.remoting.transport.http;
import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import org.apache.commons.httpclient.Header;
import org.apache.commons.httpclient.HttpParser;
import org.jboss.remoting.InvocationRequest;
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.ServerInvoker;
import org.jboss.remoting.marshal.MarshalFactory;
import org.jboss.remoting.marshal.Marshaller;
import org.jboss.remoting.marshal.UnMarshaller;
import org.jboss.remoting.marshal.http.HTTPMarshaller;
import org.jboss.remoting.marshal.http.HTTPUnMarshaller;
import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
public class HTTPServerInvoker extends ServerInvoker implements Runnable
{
/** Default value for {@link #backlog}. */
private static final int BACKLOG_DEFAULT = 1000;
/** Default value for {@link #maxPoolSize}. */
private static final int MAX_POOL_SIZE_DEFAULT = 250;
/** Listening socket; created in start() and closed in stop(). */
private ServerSocket serverSocket = null;
/** Port the server socket binds to; overwritten with the locator's port in start(). */
private int serverBindPort = 8888;
/** Resolved form of {@link #serverBindAddress}; set in start(). */
private InetAddress bindAddress = null;
/**
 * Lifecycle flag polled by the accept loop in run(). It is written by the thread calling
 * start()/stop() and read by the accept threads, hence volatile so a stop request is seen.
 */
private volatile boolean running = false;
/** Threads executing run(), i.e. blocking in accept(). */
private Thread[] acceptThreads;
/** Number of accept threads created by start(). */
private int numAcceptThreads = 1;
/** Executor that runs one worker per accepted connection. */
private PooledExecutor threadpool;
/** Capacity of the bounded buffer that backs {@link #threadpool}. */
private int maxPoolSize = MAX_POOL_SIZE_DEFAULT;
/** Host name resolved to {@link #bindAddress} in start(). */
private String serverBindAddress = "localhost";
/** Backlog argument passed to the ServerSocket constructor. */
protected int backlog = BACKLOG_DEFAULT;
// Content type names.
public static String HTML = "text/html";
public static String PLAIN = "text/plain";
public static String SOAP = "application/soap+xml";
/** Content type that isBinary() compares the request content type against. */
public static String BINARY = "application/octet-stream";
/** Metadata (request header) key holding the caller's session id. */
public static String HEADER_SESSION_ID = "sessionId";
/** Metadata (request header) key holding the target subsystem. */
public static String HEADER_SUBSYSTEM = "subsystem";
/**
 * Creates the invoker for the given locator by delegating to the superclass constructor.
 *
 * @param locator locator passed through to the superclass; start() later reads its port
 */
public HTTPServerInvoker(InvokerLocator locator)
{
super(locator);
}
/**
 * Creates the invoker for the given locator and configuration by delegating to the
 * superclass constructor.
 *
 * @param locator       locator passed through to the superclass; start() later reads its port
 * @param configuration configuration map passed through to the superclass unchanged
 */
public HTTPServerInvoker(InvokerLocator locator, Map configuration)
{
super(locator, configuration);
}
/**
 * Returns the data type this invoker uses by default.
 *
 * @return always {@code SerializableMarshaller.DATATYPE}
 */
protected String getDefaultDataType()
{
return SerializableMarshaller.DATATYPE;
}
/**
 * Reports whether this transport is bi-directional.
 *
 * @return always {@code false} for this invoker
 */
public boolean isTransportBiDirectional()
{
return false;
}
/**
 * Binds the server socket and launches the accept threads, then delegates to
 * {@code super.start()}. Calling it while the invoker is already running only
 * delegates to the superclass.
 *
 * @throws IOException if the bind address cannot be resolved or the server socket cannot
 *                     be bound; in that case the invoker is left in the not-running state
 */
public void start() throws IOException
{
   if(!running)
   {
      try
      {
         threadpool = new PooledExecutor(new BoundedBuffer(maxPoolSize));
         threadpool.waitWhenBlocked();

         bindAddress = InetAddress.getByName(serverBindAddress);
         serverBindPort = getLocator().getPort();
         serverSocket = new ServerSocket(serverBindPort, backlog, bindAddress);
      }
      catch(IOException e)
      {
         // running is still false here, so a later start() retries the bind and stop()
         // will not touch accept threads that were never created.
         log.error("Error starting ServerSocket. Bind port: " + serverBindPort + ", bind address: " + bindAddress, e);
         throw e;
      }

      // Only flag the invoker as running once the socket is bound; the accept threads poll
      // this flag, so it must be set before any of them is started.
      running = true;

      acceptThreads = new Thread[numAcceptThreads];
      for(int i = 0; i < numAcceptThreads; i++)
      {
         String name = "HTTPServerInvoker#" + i + "-" + serverBindPort;
         acceptThreads[i] = new Thread(this, name);
         acceptThreads[i].start();
      }
   }
   super.start();
}
/**
 * Returns the current number of threads in the worker pool.
 *
 * @return the pool size reported by the executor, or 0 if the pool has not been created
 *         yet (start() has not been called)
 */
public int getCurrentThreadPoolSize()
{
   // The pool is only created by start(); read the field once so the null check and the
   // call see the same reference.
   PooledExecutor pool = threadpool;
   return (pool == null) ? 0 : pool.getPoolSize();
}
/**
 * Accept loop executed by each accept thread: blocks in accept() and hands every accepted
 * connection to the worker pool. The loop ends once the running flag has been cleared.
 */
public void run()
{
   while(running)
   {
      try
      {
         Socket socket = serverSocket.accept();
         if(socket != null)
         {
            Runnable worker = new HTTPServerWorker(socket);
            try
            {
               threadpool.execute(worker);
            }
            catch(InterruptedException e)
            {
               log.error("HTTP server worker thread interrupted.", e);
               // The worker was never handed to the pool, so nothing else will close this
               // connection; release it here instead of leaking the socket.
               try
               {
                  socket.close();
               }
               catch(IOException closeError)
               {
                  log.warn("Error closing resource.", closeError);
               }
            }
         }
      }
      catch(Throwable thr)
      {
         if(running)
         {
            log.error("Error processing incoming request.", thr);
         }
         else
         {
            // stop() closes the server socket, which makes a blocked accept() fail; that is
            // the normal shutdown path rather than an error.
            log.debug("Accept loop ending because the invoker was stopped.", thr);
         }
      }
   }
}
/**
 * Measures how many bytes the given response occupies when written with Java object
 * serialization into an in-memory buffer.
 *
 * @param response object to serialize
 * @return number of bytes produced by serializing {@code response}
 * @throws IOException if the object cannot be serialized
 */
private int getContentLength(Object response) throws IOException
{
   ByteArrayOutputStream sink = new ByteArrayOutputStream();
   ObjectOutputStream serializer = new ObjectOutputStream(sink);
   serializer.writeObject(response);
   // Flushing the object stream pushes any buffered bytes into the in-memory sink.
   serializer.flush();
   return sink.size();
}
/**
 * Turns an unmarshalled object into an invocation request. An object that already is an
 * InvocationRequest is reused and the metadata is merged into its request payload (or
 * becomes the payload if it has none); anything else is wrapped in a new request.
 *
 * @param metadata header metadata to attach to the request
 * @param obj      unmarshalled request body
 * @return the reused or newly created invocation request
 */
private InvocationRequest getInvocationRequest(Map metadata, Object obj)
{
   if(!(obj instanceof InvocationRequest))
   {
      return createNewInvocationRequest(metadata, obj);
   }

   InvocationRequest existing = (InvocationRequest) obj;
   if(existing.getRequestPayload() != null)
   {
      existing.getRequestPayload().putAll(metadata);
   }
   else
   {
      existing.setRequestPayload(metadata);
   }
   return existing;
}
/**
 * Wraps a raw payload in a new InvocationRequest. The session id comes from
 * getSessionId(metadata) and the subsystem from the HEADER_SUBSYSTEM metadata entry;
 * the metadata map itself is passed along as the request payload.
 *
 * @param metadata header metadata of the request
 * @param payload  object to use as the invocation parameter
 * @return the newly built invocation request
 */
private InvocationRequest createNewInvocationRequest(Map metadata, Object payload)
{
   final String session = getSessionId(metadata);
   final String targetSubsystem = (String) metadata.get(HEADER_SUBSYSTEM);
   return new InvocationRequest(session, targetSubsystem, payload, metadata, null, null);
}
/**
 * Determines the session id for a request. The HEADER_SESSION_ID metadata entry is used
 * when it is present and non-empty; otherwise an id is derived from the hash code of the
 * "User-Agent" and "Host" entries joined by a colon.
 *
 * @param metadata header metadata of the request
 * @return the supplied or derived session id
 */
private String getSessionId(Map metadata)
{
   String supplied = (String) metadata.get(HEADER_SESSION_ID);
   if(supplied != null && supplied.length() > 0)
   {
      return supplied;
   }

   // No usable id was sent: fall back to a value derived from the client's headers.
   String agent = (String) metadata.get("User-Agent");
   String hostName = (String) metadata.get("Host");
   return String.valueOf((agent + ":" + hostName).hashCode());
}
/**
 * Tells whether the given content type equals BINARY, ignoring case.
 *
 * @param requestContentType content type of the request; may be null
 * @return true if it matches BINARY, false otherwise (including for null)
 */
private boolean isBinary(String requestContentType)
{
   return BINARY.equalsIgnoreCase(requestContentType);
}
/**
 * Stops the transport: closes the listening socket, interrupts the accept threads and
 * waits for the worker pool to finish the tasks it has already queued, then delegates to
 * {@code super.stop()}. Calling it while the invoker is not running only delegates to the
 * superclass.
 */
public void stop()
{
   if(running)
   {
      running = false;

      // maxPoolSize is deliberately left untouched: start() passes it to BoundedBuffer, so
      // zeroing it here would make a later start() fail.

      // Close the listening socket first. This wakes accept threads blocked in accept()
      // and keeps new connections from being accepted while the pool is draining.
      try
      {
         if(serverSocket != null && !serverSocket.isClosed())
         {
            serverSocket.close();
         }
      }
      catch(Exception e)
      {
         log.warn("Error closing server socket.", e);
      }
      serverSocket = null;

      if(acceptThreads != null)
      {
         for(int i = 0; i < acceptThreads.length; i++)
         {
            try
            {
               acceptThreads[i].interrupt();
            }
            catch(Exception ignored)
            {
               // Best effort: the thread exits on its own once it sees running == false.
            }
         }
      }

      if(threadpool != null)
      {
         try
         {
            threadpool.shutdownAfterProcessingCurrentlyQueuedTasks();
            threadpool.awaitTerminationAfterShutdown();
         }
         catch(InterruptedException e)
         {
            log.warn("Interrupted while waiting for HTTP worker pool to terminate.", e);
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
         }
      }
   }
   super.stop();
   log.debug("HTTPServerInvoker stopped.");
}
/**
 * Returns the fixed object name string for this invoker.
 *
 * @return always {@code "jboss.remoting:service=invoker,transport=http"}
 */
public String getMBeanObjectName()
{
return "jboss.remoting:service=invoker,transport=http";
}
private class HTTPServerWorker implements Runnable
{
private Socket socket;
public HTTPServerWorker(Socket socket)
{
this.socket = socket;
}
public void run()
{
try
{
processRequest(this.socket);
}
catch(Throwable e)
{
log.error("Error processing http request in HTTPServerInvoker::HTTPServerWorker " + Thread.currentThread().getName(), e);
}
}
private void processRequest(Socket socket)
{
DataInputStream dataInput = null;
DataOutputStream dataOutput = null;
BufferedOutputStream bufferOutput = null;
try
{
Object response = null;
boolean isError = false;
String requestContentType = null;
try
{
dataInput = new DataInputStream(socket.getInputStream());
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int ch;
while((ch = dataInput.read()) >= 0)
{
buffer.write(ch);
if(ch == '\n')
{
break;
}
}
String methodType = null;
String path = null;
String httpVersion = null;
byte[] firstLineRaw = buffer.toByteArray();
if(firstLineRaw[firstLineRaw.length - 2] == '\r')
{
String firstLine = new String(firstLineRaw).trim();
int startIndex = 0;
int endIndex = firstLine.indexOf(' ');
methodType = firstLine.substring(startIndex, endIndex);
startIndex = endIndex + 1;
endIndex = firstLine.indexOf(' ', startIndex);
path = firstLine.substring(startIndex, endIndex);
startIndex = endIndex + 1;
httpVersion = firstLine.substring(startIndex);
}
else
{
System.err.println("Error processing first line. Should have ended in \r\n, but did not");
throw new RuntimeException("Error processing HTTP request type. First line of request is invalid.");
}
Map metadata = new HashMap();
Header[] headers = HttpParser.parseHeaders(dataInput);
for(int x = 0; x < headers.length; x++)
{
String headerName = headers[x].getName();
String headerValue = headers[x].getName();
metadata.put(headerName, headerValue);
if("Content-Type".equalsIgnoreCase(headerName))
{
requestContentType = headers[x].getValue();
}
}
UnMarshaller unmarshaller = MarshalFactory.getUnMarshaller(HTTPUnMarshaller.DATATYPE);
Object obj = unmarshaller.read(dataInput, metadata);
InvocationRequest request = null;
if(obj instanceof InvocationRequest)
{
request = (InvocationRequest) obj;
}
else
{
if(isBinary(requestContentType))
{
request = getInvocationRequest(metadata, obj);
}
else
{
request = createNewInvocationRequest(metadata, obj);
}
}
try
{
response = invoke(request);
}
catch(Throwable ex)
{
log.debug("Error thrown calling invoke on server invoker.", ex);
response = ex;
isError = true;
}
dataOutput = new DataOutputStream(socket.getOutputStream());
bufferOutput = new BufferedOutputStream(dataOutput, 1024);
}
catch(Throwable thr)
{
log.debug("Error thrown processing request. Probably error with processing headers.", thr);
if(thr instanceof Exception)
{
response = (Exception) thr;
}
else
{
response = new Exception(thr);
}
isError = true;
}
if(response == null)
{
bufferOutput.write("HTTP/1.1 ".getBytes());
bufferOutput.write("200 JBoss Remoting: Request Processed Successfully".getBytes());
String contentType = "\r\n" + "Content-Type" + ": " + "text/html";
bufferOutput.write(contentType.getBytes());
String contentLength = "\r\n" + "Content-Length" + ": " + 0;
bufferOutput.write(contentLength.getBytes());
bufferOutput.flush();
}
else
{
bufferOutput.write("HTTP/1.1 ".getBytes());
String status = isError ? "500 JBoss Remoting: Error occurred within target application." :
"200 JBoss Remoting: Request Processed Succussfully";
bufferOutput.write(status.getBytes());
String contentType = "\r\n" + "Content-Type" + ": " + requestContentType;
bufferOutput.write(contentType.getBytes());
int iContentLength = getContentLength(response);
String contentLength = "\r\n" + "Content-Length" + ": " + iContentLength;
bufferOutput.write(contentLength.getBytes());
bufferOutput.write(("\r\n" + "\r\n").getBytes());
Marshaller marshaller = MarshalFactory.getMarshaller(getLocator(), this.getClass().getClassLoader());
if(marshaller == null)
{
marshaller = MarshalFactory.getMarshaller(HTTPMarshaller.DATATYPE);
}
marshaller.write(response, bufferOutput);
}
}
catch(Exception e)
{
log.error("Error processing client request.", e);
}
finally
{
if(dataInput != null)
{
try
{
dataInput.close();
}
catch(Exception e)
{
log.warn("Error closing resource.", e);
}
}
if(dataOutput != null)
{
try
{
dataOutput.close();
}
catch(Exception e)
{
log.warn("Error closing resource.", e);
}
}
try
{
socket.close();
}
catch(Exception e)
{
log.warn("Error closing resource.", e);
}
}
}
}
}