package org.jboss.invocation.pooled.interfaces;
import java.io.IOException;
import java.io.Externalizable;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.BufferedOutputStream;
import java.io.BufferedInputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.rmi.MarshalledObject;
import java.rmi.NoSuchObjectException;
import java.rmi.ServerException;
import java.rmi.ConnectException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import javax.transaction.TransactionRolledbackException;
import javax.transaction.SystemException;
import org.jboss.invocation.Invocation;
import org.jboss.invocation.Invoker;
import org.jboss.tm.TransactionPropagationContextFactory;
/**
 * An {@link Invoker} that ships invocations to the server described by a
 * {@link ServerAddress} over plain sockets, reusing sockets through a pool
 * that is shared (per address) by every proxy instance in this class loader.
 *
 * <p>The proxy is {@link Externalizable}: only the address and the maximum
 * pool size travel over the wire; the pool itself is re-bound locally after
 * deserialization (and lazily on first use for directly constructed
 * instances).
 *
 * <p>NOTE(review): the public static timing/usage counters are updated
 * without synchronization, so under concurrent use they are approximate.
 */
public class PooledInvokerProxy
   implements Invoker, Externalizable
{
   private static final long serialVersionUID = -1456509931095566410L;

   /** Factory used to obtain the transaction propagation context; may be null. */
   protected static TransactionPropagationContextFactory tpcFactory = null;

   /**
    * Installs the factory used by {@link #getTransactionPropagationContext()}.
    *
    * @param tpcf the factory to use, or null to disable propagation
    */
   public static void setTPCFactory(TransactionPropagationContextFactory tpcf)
   {
      tpcFactory = tpcf;
   }

   /** Accumulated milliseconds spent in {@link #getConnection()} from {@link #invoke}. */
   public static long getSocketTime = 0;
   /** Accumulated milliseconds spent reading responses in {@link #invoke}. */
   public static long readTime = 0;
   /** Accumulated milliseconds spent writing requests in {@link #invoke}. */
   public static long writeTime = 0;
   /** Not updated within this class. */
   public static long serializeTime = 0;
   /** Not updated within this class. */
   public static long deserializeTime = 0;
   /** Number of times a pooled connection was reused instead of opening a new socket. */
   public static long usedPooled = 0;
   /** Maximum number of attempts {@link #getConnection()} makes to obtain a connection. */
   public static int MAX_RETRIES = 10;

   /** Shared pools, keyed by ServerAddress; guarded by synchronizing on the map itself. */
   protected static HashMap connectionPools = new HashMap();

   /** Target server this proxy talks to. */
   protected ServerAddress address;
   /** The shared pool for {@link #address}; guarded by synchronizing on the list. */
   protected LinkedList pool = null;
   /** Upper bound on idle connections kept in {@link #pool}. */
   protected int maxPoolSize;

   /**
    * A socket bundled with the object streams layered on top of it.
    */
   protected static class ClientSocket
   {
      public ObjectOutputStream out;
      public ObjectInputStream in;
      public Socket socket;
      public int timeout;

      /**
       * Applies the read timeout to the socket and opens both object streams.
       *
       * @param socket  a connected socket
       * @param timeout value passed to {@link Socket#setSoTimeout(int)}
       * @throws Exception if the timeout cannot be set or a stream cannot be
       *         created; the socket is NOT closed here, the caller owns it
       */
      public ClientSocket(Socket socket, int timeout) throws Exception
      {
         this.socket = socket;
         socket.setSoTimeout(timeout);
         this.timeout = timeout;
         out = new OptimizedObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()));
         // Flush right after creating the output stream and before creating the input stream.
         // NOTE(review): presumably pushes the stream header to the peer so that
         // constructing the input stream cannot block on it - confirm.
         out.flush();
         in = new OptimizedObjectInputStream(new BufferedInputStream(socket.getInputStream()));
      }

      /**
       * Safety net: closes the socket if this wrapper is collected while the
       * socket reference is still set.
       */
      protected void finalize()
      {
         if (socket != null)
         {
            try { socket.close(); } catch (Exception ignored) {}
         }
      }
   }

   /**
    * No-arg constructor required by {@link Externalizable}.
    */
   public PooledInvokerProxy()
   {
      super();
   }

   /**
    * Creates a proxy for the given server.
    *
    * @param sa          address of the target server
    * @param maxPoolSize maximum number of idle connections to keep pooled
    */
   public PooledInvokerProxy(ServerAddress sa, int maxPoolSize)
   {
      this.address = sa;
      this.maxPoolSize = maxPoolSize;
   }

   /**
    * Closes and removes every idle connection pooled for the given address.
    * Best effort: never throws.
    *
    * @param sa the address whose pool should be emptied
    */
   public static void clearPool(ServerAddress sa)
   {
      try
      {
         LinkedList thepool;
         // The map is a plain HashMap mutated by initPool(); read it under the same lock.
         synchronized (connectionPools)
         {
            thepool = (LinkedList)connectionPools.get(sa);
         }
         if (thepool == null) return;
         synchronized (thepool)
         {
            while (!thepool.isEmpty())
            {
               ClientSocket socket = (ClientSocket)thepool.removeFirst();
               try
               {
                  socket.socket.close();
                  // Cleared so that finalize() does not try to close it again.
                  socket.socket = null;
               }
               catch (Exception ignored)
               {
                  // Best effort: keep draining the remaining connections.
               }
            }
         }
      }
      catch (Exception ignored)
      {
         // Deliberately swallowed: clearing pools must never fail the caller.
      }
   }

   /**
    * Empties the pools of every known address.
    */
   public static void clearPools()
   {
      synchronized (connectionPools)
      {
         Iterator it = connectionPools.keySet().iterator();
         while (it.hasNext())
         {
            ServerAddress sa = (ServerAddress)it.next();
            clearPool(sa);
         }
      }
   }

   /**
    * Binds {@link #pool} to the shared pool registered for {@link #address},
    * creating and registering an empty one if none exists yet.
    */
   protected void initPool()
   {
      synchronized (connectionPools)
      {
         pool = (LinkedList)connectionPools.get(address);
         if (pool == null)
         {
            pool = new LinkedList();
            connectionPools.put(address, pool);
         }
      }
   }

   /**
    * Obtains a connection to the server: a verified pooled one when
    * available, otherwise a newly opened socket. Up to {@link #MAX_RETRIES}
    * attempts are made, sleeping 1 ms between failed connects.
    *
    * @return a ready-to-use connection, never null
    * @throws Exception the last connect failure, a stream setup failure, an
    *         interruption while sleeping between retries, or a
    *         {@link ConnectException} when {@link #MAX_RETRIES} permits no attempt
    */
   protected ClientSocket getConnection() throws Exception
   {
      // Directly constructed proxies never went through readExternal(), so
      // the pool may not be bound yet.
      if (pool == null)
      {
         initPool();
      }

      Socket socket = null;
      for (int i = 0; i < MAX_RETRIES; i++)
      {
         synchronized (pool)
         {
            if (pool.size() > 0)
            {
               ClientSocket pooled = getPooledConnection();
               if (pooled != null)
               {
                  usedPooled++;
                  return pooled;
               }
            }
         }
         try
         {
            socket = new Socket(address.address, address.port);
            break;
         }
         catch (Exception ex)
         {
            if (i + 1 < MAX_RETRIES)
            {
               Thread.sleep(1);
               continue;
            }
            throw ex;
         }
      }

      if (socket == null)
      {
         // Only reachable when MAX_RETRIES <= 0: the loop never ran.
         throw new ConnectException("Failed to connect: MAX_RETRIES=" + MAX_RETRIES
            + " permits no connection attempt");
      }

      try
      {
         socket.setTcpNoDelay(address.enableTcpNoDelay);
         return new ClientSocket(socket, address.timeout);
      }
      catch (Exception ex)
      {
         // Do not leak the freshly opened socket if configuring it or
         // creating the object streams fails.
         try { socket.close(); } catch (Exception ignored) {}
         throw ex;
      }
   }

   /**
    * Takes connections off the front of the pool until one answers a
    * one-byte ping; connections that fail the ping are closed and dropped.
    * {@link #getConnection()} calls this while holding the pool lock.
    *
    * @return a live pooled connection, or null if the pool ran empty
    */
   protected ClientSocket getPooledConnection()
   {
      ClientSocket socket = null;
      while (pool.size() > 0)
      {
         socket = (ClientSocket)pool.removeFirst();
         try
         {
            // Round-trip a single byte to detect stale connections.
            final byte ACK = 1;
            socket.out.writeByte(ACK);
            socket.out.flush();
            socket.in.readByte();
            return socket;
         }
         catch (Exception ex)
         {
            try
            {
               socket.socket.close();
            }
            catch (Exception ignored) {}
         }
      }
      return null;
   }

   /**
    * @return the host part of the server address
    */
   public String getServerHostName() throws Exception
   {
      return address.address;
   }

   /**
    * @return the context supplied by the installed factory, or null when no
    *         factory has been set
    * @throws SystemException if the factory fails
    */
   public Object getTransactionPropagationContext()
      throws SystemException
   {
      return (tpcFactory == null) ? null : tpcFactory.getTransactionPropagationContext();
   }

   /**
    * Sends the invocation to the server and returns its result.
    *
    * @param invocation the invocation to ship
    * @return the response; a {@link MarshalledObject} response is unwrapped
    * @throws ConnectException if writing the request or reading the response
    *         fails (the connection is closed, not pooled)
    * @throws Exception the server-side exception when the response is one; a
    *         {@link ServerException} wrapping a {@link NoSuchObjectException} or
    *         {@link TransactionRolledbackException} is unwrapped to that detail
    */
   public Object invoke(Invocation invocation)
      throws Exception
   {
      PooledMarshalledInvocation mi = new PooledMarshalledInvocation(invocation);
      mi.setTransactionPropagationContext(getTransactionPropagationContext());

      Object response = null;
      long start = System.currentTimeMillis();
      ClientSocket socket = getConnection();
      getSocketTime += System.currentTimeMillis() - start;
      try
      {
         // Restart the clock so writeTime does not re-count the time already
         // attributed to getSocketTime.
         start = System.currentTimeMillis();
         socket.out.writeObject(mi);
         socket.out.reset();
         // A second, throw-away object is written after the reset.
         // NOTE(review): presumably forces the peer's input stream to act on
         // the reset marker - confirm against the server side.
         socket.out.writeObject(Boolean.TRUE);
         socket.out.flush();
         socket.out.reset();
         writeTime += System.currentTimeMillis() - start;

         start = System.currentTimeMillis();
         response = socket.in.readObject();
         // The server's matching throw-away object; read and discarded.
         socket.in.readObject();
         readTime += System.currentTimeMillis() - start;
      }
      catch (Exception ex)
      {
         // The stream state is unknown after a failure: never pool this socket.
         try
         {
            socket.socket.close();
         }
         catch (Exception ignored) {}
         throw new ConnectException("Failed to communicate", ex);
      }

      // Guard for subclasses whose getConnection() does not bind the pool.
      if (pool == null)
      {
         initPool();
      }
      // Return the healthy connection to the pool, or close it if the pool is full.
      synchronized (pool)
      {
         if (pool.size() < maxPoolSize)
         {
            pool.add(socket);
         }
         else
         {
            try
            {
               socket.socket.close();
            }
            catch (Exception ignored) {}
         }
      }

      try
      {
         if (response instanceof Exception)
         {
            throw ((Exception)response);
         }
         if (response instanceof MarshalledObject)
         {
            return ((MarshalledObject)response).get();
         }
         return response;
      }
      catch (ServerException ex)
      {
         // Surface the two well-known causes directly instead of the wrapper.
         if (ex.detail instanceof NoSuchObjectException)
         {
            throw (NoSuchObjectException) ex.detail;
         }
         if (ex.detail instanceof TransactionRolledbackException)
         {
            throw (TransactionRolledbackException) ex.detail;
         }
         throw ex;
      }
   }

   /**
    * Writes the server address followed by the maximum pool size.
    */
   public void writeExternal(final ObjectOutput out)
      throws IOException
   {
      out.writeObject(address);
      out.writeInt(maxPoolSize);
   }

   /**
    * Reads the state written by {@link #writeExternal} and binds this proxy
    * to the local shared pool for the address.
    */
   public void readExternal(final ObjectInput in)
      throws IOException, ClassNotFoundException
   {
      address = (ServerAddress)in.readObject();
      maxPoolSize = in.readInt();
      initPool();
   }
}