package org.jboss.mq.pm.jdbc3;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Properties;
import javax.jms.JMSException;
import javax.management.ObjectName;
import javax.naming.InitialContext;
import javax.sql.DataSource;
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.jboss.mq.DurableSubscriptionID;
import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyJMSException;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.SpyTopic;
import org.jboss.mq.pm.CacheStore;
import org.jboss.mq.pm.NewPersistenceManager;
import org.jboss.mq.pm.Tx;
import org.jboss.mq.pm.TxManager;
import org.jboss.mq.server.JMSDestination;
import org.jboss.mq.server.JMSTopic;
import org.jboss.mq.server.MessageCache;
import org.jboss.mq.server.MessageReference;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.tm.TransactionManagerService;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
public class PersistenceManager
extends ServiceMBeanSupport
implements PersistenceManagerMBean, NewPersistenceManager, CacheStore, Runnable
{
// BLOB_TYPE settings: how setBlob()/extractMessage() bind and read the MESSAGEBLOB column.
/** Message is bound with setObject and read with getObject. */
static final int OBJECT_BLOB = 0;
/** Message is serialized and bound/read as a byte array. */
static final int BYTES_BLOB = 1;
/** Message is serialized and bound/read as a binary stream. */
static final int BINARYSTREAM_BLOB = 2;
/** Message is read through java.sql.Blob; writing is not implemented (see setBlob). */
static final int BLOB_BLOB = 3;
/** Source of ids for new persistent transactions; re-seeded from the transaction log by resolveAllUncommitedTXs(). */
private SynchronizedLong nextTransactionId = new SynchronizedLong(0l);
/** Created in the constructor and handed out by getTxManager(). */
private TxManager txManager;
/** Looked up from JNDI in startService() using the connection manager's BindName attribute. */
private DataSource datasource;
/** Transaction manager used by TransactionManagerStrategy; looked up in startService(). */
private TransactionManager tm;
/** MBean name of the connection manager whose "BindName" attribute names the DataSource. */
private ObjectName connectionManagerName;
/** Overrides for the SQL defaults below; applied in startService(). */
private Properties sqlProperties = new Properties();
// Default SQL. Each statement may be replaced by a same-named entry in sqlProperties
// when the service starts. The TXOP column holds a one-character mark: 'A' is written
// by add(), 'D' is set by remove() inside a transaction, 'T' is written by saveToStorage().
String UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE TXOP=?";
String UPDATE_MARKED_REFERENCES = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE TXOP=?";
String UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
String UPDATE_MARKED_REFERENCES_WITH_TX = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
String DELETE_MARKED_MESSAGES_WITH_TX = "DELETE FROM JMS_MESSAGE_LOG WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTION_LOG) AND TXOP=?";
String DELETE_MARKED_REFERENCES_WITH_TX = "DELETE FROM JMS_REFERENCE_LOG WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTION_LOG) AND TXOP=?";
String DELETE_TX = "DELETE FROM JMS_TRANSACTION_LOG WHERE TXID = ?";
String DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE TXID=? AND TXOP=?";
String DELETE_MARKED_REFERENCES = "DELETE FROM JMS_REFERENCE_LOG WHERE TXID=? AND TXOP=?";
String DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE TXOP='T'";
String DELETE_TEMPORARY_REFERENCES = "DELETE FROM JMS_REFERENCE_LOG WHERE TXOP='T'";
String INSERT_TX = "INSERT INTO JMS_TRANSACTION_LOG (TXID) values(?)";
String SELECT_MAX_TX = "SELECT MAX(TXID) FROM JMS_TRANSACTION_LOG";
String SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG WHERE DESTINATION=?";
String SELECT_REFERENCES_IN_DEST = "SELECT R.MESSAGEID, M.MESSAGEBLOB, R.REDELIVERED, R.REDELIVERS FROM JMS_REFERENCE_LOG AS R, JMS_MESSAGE_LOG AS M" +
" WHERE R.MESSAGEID = M.MESSAGEID AND R.DESTINATION=?";
String SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG WHERE MESSAGEID=? AND DESTINATION=?";
String INSERT_MESSAGE = "INSERT INTO JMS_MESSAGE_LOG (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP, LATECLONE) VALUES(?,?,?,?,?,?)";
String INSERT_REFERENCE = "INSERT INTO JMS_REFERENCE_LOG (MESSAGEID, DESTINATION, TXID, TXOP, REDELIVERED, REDELIVERS) VALUES(?,?,?,?,?,?)";
String MARK_MESSAGE = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
String MARK_REFERENCE = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
String DELETE_MESSAGE = "DELETE FROM JMS_MESSAGE_LOG WHERE MESSAGEID=? AND DESTINATION=?";
String DELETE_REFERENCE = "DELETE FROM JMS_REFERENCE_LOG WHERE MESSAGEID=? AND DESTINATION=?";
String UPDATE_MESSAGE = "UPDATE JMS_MESSAGE_LOG SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?";
String UPDATE_REFERENCE = "UPDATE JMS_REFERENCE_LOG SET REDELIVERED=?, REDELIVERS=? WHERE MESSAGEID=? AND DESTINATION=?";
String DELETE_ORPHANED_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE LATECLONE = '1' AND MESSAGEID NOT IN (SELECT MESSAGEID FROM JMS_REFERENCE_LOG)";
String DELETE_ALL_TXS = "DELETE FROM JMS_TRANSACTION_LOG";
// DDL executed by resolveAllUncommitedTXs() when createTables is true; failures are only logged.
String CREATE_REFERENCE_TABLE =
"CREATE TABLE JMS_REFERENCE_LOG ( MESSAGEID INTEGER NOT NULL, "
+ "DESTINATION VARCHAR(256) NOT NULL, TXID INTEGER, TXOP CHAR(1), "
+ "REDELIVERED CHAR(1), REDELIVERS INTEGER, "
+ "PRIMARY KEY (MESSAGEID, DESTINATION) )";
String CREATE_MESSAGE_TABLE =
"CREATE TABLE JMS_MESSAGE_LOG ( MESSAGEID INTEGER NOT NULL, "
+ "DESTINATION VARCHAR(256), TXID INTEGER, TXOP CHAR(1), LATECLONE CHAR(1), "
+ "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )";
String CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTION_LOG ( TXID INTEGER )";
/** One of the *_BLOB constants; set from the BLOB_TYPE property in startService(). */
int blobType = OBJECT_BLOB;
/** Whether to attempt the CREATE TABLE statements at startup (CREATE_TABLES_ON_STARTUP property). */
boolean createTables = true;
/** Number of connection attempts made by getConnection(). */
private int connectionRetryAttempts = 5;
/** Sleep between orphaned-message sweeps of the gc thread, in milliseconds. */
private long gcPeriod = 60000;
// The thread executing run(). run() keeps looping while this still refers to the
// running thread; stopService() sets it to null.
// NOTE(review): the field is read and written from different threads without
// synchronization or volatile — confirm visibility is acceptable.
private Thread gcThread;
/**
 * Creates the persistence manager and the TxManager that is bound to it.
 *
 * @throws javax.jms.JMSException declared for callers; propagated from the TxManager constructor if it throws
 */
public PersistenceManager() throws javax.jms.JMSException
{
   this.txManager = new TxManager(this);
}
/**
 * @return the MBean name of the connection manager, or null when none has been set
 */
public ObjectName getConnectionManager()
{
   return this.connectionManagerName;
}
/**
 * Sets the MBean name of the connection manager. startService() reads that
 * MBean's "BindName" attribute to locate the DataSource.
 *
 * @param connectionManagerName the connection manager's object name
 */
public void setConnectionManager(ObjectName connectionManagerName)
{
   this.connectionManagerName = connectionManagerName;
}
/**
 * @return the period between orphaned-message sweeps, in whole seconds
 */
public int getGCPeriodSecs()
{
   // Divide in long arithmetic first; the original cast bound tighter than the
   // division and narrowed the millisecond value before dividing.
   return (int) (gcPeriod / 1000);
}
/**
 * Sets the period between orphaned-message sweeps.
 *
 * @param gcPeriodSecs the period in seconds; stored internally in milliseconds
 */
public void setGCPeriodSecs(int gcPeriodSecs)
{
   // Multiply as long: int arithmetic overflows once the period exceeds
   // Integer.MAX_VALUE / 1000 seconds.
   this.gcPeriod = gcPeriodSecs * 1000L;
}
/**
 * @return how many times getConnection() tries to obtain a connection
 */
public int getConnectionRetryAttempts()
{
   return connectionRetryAttempts;
}
/**
 * Sets how many times getConnection() tries to obtain a connection.
 *
 * @param value the number of attempts
 */
public void setConnectionRetryAttempts(int value)
{
   connectionRetryAttempts = value;
}
/**
 * Returns the configured SQL overrides in java.util.Properties text format.
 *
 * @return the properties as text, or an empty string if they could not be written
 */
public String getSqlProperties()
{
   try
   {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      sqlProperties.store(buffer, "");
      // Properties.store always writes ISO-8859-1, so decode with that charset
      // rather than whatever the platform default happens to be.
      return buffer.toString("ISO-8859-1");
   }
   catch (IOException shouldnothappen)
   {
      return "";
   }
}
/**
 * Replaces the SQL overrides with the entries parsed from the given text.
 * The overrides take effect the next time startService() runs.
 *
 * @param value the overrides in java.util.Properties text format; must not be null
 */
public void setSqlProperties(String value)
{
   try
   {
      // Properties.load reads ISO-8859-1, so encode with that charset instead
      // of the platform default.
      ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes("ISO-8859-1"));
      // Parse into a fresh instance and publish it only on success, so a
      // failure cannot leave a half-loaded configuration behind.
      Properties loaded = new Properties();
      loaded.load(is);
      sqlProperties = loaded;
   }
   catch (IOException e)
   {
      log.warn("Could not parse the SQL properties; keeping the previous settings", e);
   }
}
/**
 * Allocates a new transaction id and records it in the transaction log.
 *
 * @return the new transaction
 * @throws JMSException if the record could not be inserted or the surrounding
 *         transaction could not be started or ended
 */
public Tx createPersistentTx() throws JMSException
{
   Tx id = new Tx(nextTransactionId.increment());
   TransactionManagerStrategy tms = new TransactionManagerStrategy();
   tms.startTX();
   Connection c = null;
   PreparedStatement stmt = null;
   // Clear the interrupt flag for the duration of the database work; it is
   // re-asserted in the finally block.
   boolean threadWasInterrupted = Thread.interrupted();
   try
   {
      c = this.getConnection();
      stmt = c.prepareStatement(INSERT_TX);
      stmt.setLong(1, id.longValue());
      stmt.executeUpdate();
   }
   catch (SQLException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not create tx: " + id, e);
   }
   finally
   {
      try
      {
         if (stmt != null)
            stmt.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         if (c != null)
            c.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         tms.endTX();
      }
      finally
      {
         // Restore the flag even when ending the transaction throws.
         if (threadWasInterrupted)
            Thread.currentThread().interrupt();
      }
   }
   return id;
}
/**
 * Commits a persistent transaction: rows the transaction marked 'D' are
 * deleted from both logs and the transaction record is removed.
 *
 * @param txId the transaction to commit
 * @throws JMSException if the database work or the surrounding transaction fails
 */
public void commitPersistentTx(Tx txId) throws JMSException
{
   TransactionManagerStrategy tms = new TransactionManagerStrategy();
   tms.startTX();
   Connection c = null;
   // Clear the interrupt flag for the duration of the database work; it is
   // re-asserted in the finally block.
   boolean threadWasInterrupted = Thread.interrupted();
   try
   {
      c = this.getConnection();
      removeMarkedMessages(c, txId, "D");
      removeMarkedReferences(c, txId, "D");
      removeTXRecord(c, txId.longValue());
   }
   catch (SQLException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not commit tx: " + txId, e);
   }
   finally
   {
      try
      {
         if (c != null)
            c.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         tms.endTX();
      }
      finally
      {
         // Restore the flag even when ending the transaction throws.
         if (threadWasInterrupted)
            Thread.currentThread().interrupt();
      }
   }
}
/**
 * Rolls back a persistent transaction: rows the transaction added ('A') are
 * deleted, the transaction record is removed, and rows the transaction had
 * marked for deletion ('D') are reset to TXID=NULL, TXOP='A'.
 *
 * @param txId the transaction to roll back
 * @throws JMSException if the database work or the surrounding transaction fails
 */
public void rollbackPersistentTx(Tx txId) throws JMSException
{
   TransactionManagerStrategy tms = new TransactionManagerStrategy();
   tms.startTX();
   Connection c = null;
   PreparedStatement stmt = null;
   // Clear the interrupt flag for the duration of the database work; it is
   // re-asserted in the finally block.
   boolean threadWasInterrupted = Thread.interrupted();
   try
   {
      c = this.getConnection();
      removeMarkedMessages(c, txId, "A");
      removeMarkedReferences(c, txId, "A");
      removeTXRecord(c, txId.longValue());
      // The same un-mark is applied to the message log and then the reference log.
      String[] unmarkStatements = { UPDATE_MARKED_MESSAGES_WITH_TX, UPDATE_MARKED_REFERENCES_WITH_TX };
      for (int i = 0; i < unmarkStatements.length; i++)
      {
         stmt = c.prepareStatement(unmarkStatements[i]);
         stmt.setNull(1, Types.BIGINT);
         stmt.setString(2, "A");
         stmt.setString(3, "D");
         stmt.setLong(4, txId.longValue());
         stmt.executeUpdate();
         stmt.close();
         stmt = null;
      }
   }
   catch (SQLException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not rollback tx: " + txId, e);
   }
   finally
   {
      // Close each resource in its own try block so that a failing statement
      // close can no longer skip the connection close.
      try
      {
         if (stmt != null)
            stmt.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         if (c != null)
            c.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         tms.endTX();
      }
      finally
      {
         // Restore the flag even when ending the transaction throws.
         if (threadWasInterrupted)
            Thread.currentThread().interrupt();
      }
   }
}
/**
 * Stores a message (or, for late-clone references, a reference row) marked 'A'
 * under the given transaction and flags the reference as stored.
 *
 * @param messageRef the reference to persist
 * @param txId the transaction the add belongs to; may be null
 * @throws JMSException if the row could not be written or the surrounding
 *         transaction fails
 */
public void add(MessageReference messageRef, Tx txId) throws JMSException
{
   boolean trace = log.isTraceEnabled();
   if (trace)
      log.trace("About to add message " + messageRef + " transaction=" + txId);
   TransactionManagerStrategy tms = new TransactionManagerStrategy();
   tms.startTX();
   Connection c = null;
   // Clear the interrupt flag for the duration of the database work; it is
   // re-asserted in the finally block.
   boolean threadWasInterrupted = Thread.interrupted();
   try
   {
      c = this.getConnection();
      synchronized (messageRef)
      {
         if (trace)
            log.trace("Inserting message " + messageRef + " transaction=" + txId);
         if (messageRef.isLateClone())
         {
            addReference(c, messageRef.getPersistentKey(), messageRef, txId, "A");
         }
         else
         {
            SpyMessage message = messageRef.getMessage();
            addMessage(c, messageRef.getPersistentKey(), message, txId, "A", "0");
         }
         messageRef.setStored(MessageReference.STORED);
         if (trace)
            log.trace("Added message " + messageRef + " transaction=" + txId);
      }
   }
   catch (IOException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not store message: " + messageRef, e);
   }
   catch (SQLException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not store message: " + messageRef, e);
   }
   finally
   {
      try
      {
         if (c != null)
            c.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         tms.endTX();
      }
      finally
      {
         // Restore the flag even when ending the transaction throws.
         if (threadWasInterrupted)
            Thread.currentThread().interrupt();
      }
   }
}
/**
 * Rewrites the stored state of a message outside of any transaction: the
 * redelivery fields for late-clone references, the message blob otherwise.
 *
 * @param messageRef the reference whose stored row is updated
 * @param txId must be null; transactional updates are not implemented
 * @throws JMSException if txId is not null, if the update did not affect
 *         exactly one row, or if the database work fails
 */
public void update(MessageReference messageRef, Tx txId) throws JMSException
{
   boolean trace = log.isTraceEnabled();
   if (trace)
      log.trace("Updating message " + messageRef + " transaction=" + txId);
   // Reject the unsupported case before a transaction and a connection are acquired.
   if (txId != null)
      throw new SpyJMSException("NYI: Updating a message in a transaction is not currently used");
   TransactionManagerStrategy tms = new TransactionManagerStrategy();
   tms.startTX();
   Connection c = null;
   PreparedStatement stmt = null;
   // Clear the interrupt flag for the duration of the database work; it is
   // re-asserted in the finally block.
   boolean threadWasInterrupted = Thread.interrupted();
   try
   {
      c = this.getConnection();
      if (messageRef.isLateClone())
      {
         stmt = c.prepareStatement(UPDATE_REFERENCE);
         stmt.setString(1, messageRef.redelivered ? "1" : "0");
         stmt.setLong(2, messageRef.redeliveryCount);
         stmt.setLong(3, messageRef.messageId);
         stmt.setString(4, messageRef.getPersistentKey());
      }
      else
      {
         stmt = c.prepareStatement(UPDATE_MESSAGE);
         setBlob(stmt, 1, messageRef.getMessage());
         stmt.setLong(2, messageRef.messageId);
         stmt.setString(3, messageRef.getPersistentKey());
      }
      int rc = stmt.executeUpdate();
      if (rc != 1)
      {
         // Without this the finally block would commit an update that hit
         // the wrong number of rows.
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not update the message in the database: update affected "+rc+" rows");
      }
      if (trace)
         log.trace("Updated message " + messageRef + " transaction=" + txId);
   }
   catch (IOException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not update message: " + messageRef, e);
   }
   catch (SQLException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not update message: " + messageRef, e);
   }
   finally
   {
      try
      {
         if (stmt != null)
            stmt.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         if (c != null)
            c.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         tms.endTX();
      }
      finally
      {
         // Restore the flag even when ending the transaction throws.
         if (threadWasInterrupted)
            Thread.currentThread().interrupt();
      }
   }
}
/**
 * Removes a message. Without a transaction the row is deleted immediately and
 * the reference is flagged as not stored; inside a transaction the row is only
 * marked 'D' with the transaction id, to be resolved on commit or rollback.
 *
 * @param messageRef the reference to remove
 * @param txId the transaction the removal belongs to, or null for none
 * @throws JMSException if the statement did not affect exactly one row or the
 *         database work fails
 */
public void remove(MessageReference messageRef, Tx txId) throws JMSException
{
   boolean trace = log.isTraceEnabled();
   if (trace)
      log.trace("Removing message " + messageRef + " transaction=" + txId);
   TransactionManagerStrategy tms = new TransactionManagerStrategy();
   tms.startTX();
   Connection c = null;
   PreparedStatement stmt = null;
   // Clear the interrupt flag for the duration of the database work; it is
   // re-asserted in the finally block.
   boolean threadWasInterrupted = Thread.interrupted();
   try
   {
      c = this.getConnection();
      synchronized (messageRef)
      {
         if (txId == null)
         {
            stmt = c.prepareStatement(messageRef.isLateClone() ? DELETE_REFERENCE : DELETE_MESSAGE);
            stmt.setLong(1, messageRef.messageId);
            stmt.setString(2, messageRef.getPersistentKey());
            int rc = stmt.executeUpdate();
            if (rc != 1)
            {
               // Roll back so a delete that hit the wrong number of rows is
               // not committed by the finally block.
               tms.setRollbackOnly();
               throw new SpyJMSException("Could not delete the message from the database: delete affected "+rc+" rows");
            }
            messageRef.setStored(MessageReference.NOT_STORED);
            messageRef.removeDelayed();
         }
         else
         {
            stmt = c.prepareStatement(messageRef.isLateClone() ? MARK_REFERENCE : MARK_MESSAGE);
            stmt.setLong(1, txId.longValue());
            stmt.setString(2, "D");
            stmt.setLong(3, messageRef.messageId);
            stmt.setString(4, messageRef.getPersistentKey());
            int rc = stmt.executeUpdate();
            if (rc != 1)
            {
               tms.setRollbackOnly();
               throw new SpyJMSException("Could not mark the message as deleted in the database: update affected "+rc+" rows");
            }
         }
         if (trace)
            log.trace("Removed message " + messageRef + " transaction=" + txId);
      }
   }
   catch (SQLException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not remove message: " + messageRef, e);
   }
   finally
   {
      try
      {
         if (stmt != null)
            stmt.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         if (c != null)
            c.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         tms.endTX();
      }
      finally
      {
         // Restore the flag even when ending the transaction throws.
         if (threadWasInterrupted)
            Thread.currentThread().interrupt();
      }
   }
}
/**
 * Reloads every stored message of a destination and hands it back to the
 * destination. Late-clone destinations are restored from the reference log
 * (joined with the message log) and get their redelivery state re-applied;
 * all others are restored directly from the message log.
 *
 * @param jmsDest the server-side destination receiving the messages; must not be null
 * @param dest the destination whose string form is the DESTINATION key; must not be null
 * @throws IllegalArgumentException if either argument is null
 * @throws javax.jms.JMSException if the messages could not be read
 */
public synchronized void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws javax.jms.JMSException
{
   if (jmsDest == null)
      throw new IllegalArgumentException("Must supply non null JMSDestination to restoreQueue");
   if (dest == null)
      throw new IllegalArgumentException("Must supply non null SpyDestination to restoreQueue");
   TransactionManagerStrategy tms = new TransactionManagerStrategy();
   tms.startTX();
   Connection c = null;
   PreparedStatement stmt = null;
   ResultSet rs = null;
   // Clear the interrupt flag for the duration of the database work; it is
   // re-asserted in the finally block.
   boolean threadWasInterrupted = Thread.interrupted();
   try
   {
      c = this.getConnection();
      int counter = 0;
      if (jmsDest.parameters.lateClone)
      {
         JMSTopic topic = (JMSTopic) jmsDest;
         DurableSubscriptionID id = ((SpyTopic) dest).getDurableSubscriptionID();
         stmt = c.prepareStatement(SELECT_REFERENCES_IN_DEST);
         stmt.setString(1, dest.toString());
         rs = stmt.executeQuery();
         while (rs.next())
         {
            SpyMessage message = extractMessage(rs, 2);
            // Constant-first comparison: a NULL REDELIVERED column means
            // "not redelivered" instead of raising a NullPointerException.
            message.header.jmsRedelivered = "1".equals(rs.getString(3));
            message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer(rs.getInt(4)));
            topic.restoreMessage(message, id);
            counter++;
         }
      }
      else
      {
         stmt = c.prepareStatement(SELECT_MESSAGES_IN_DEST);
         stmt.setString(1, dest.toString());
         rs = stmt.executeQuery();
         while (rs.next())
         {
            SpyMessage message = extractMessage(rs, 2);
            if (dest instanceof SpyTopic)
               message.header.durableSubscriberID = ((SpyTopic) dest).getDurableSubscriptionID();
            jmsDest.restoreMessage(message);
            counter++;
         }
      }
      log.debug("Restored "+counter+" message(s) to: "+dest);
   }
   catch (IOException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
   }
   catch (SQLException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
   }
   finally
   {
      try
      {
         if (rs != null)
            rs.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         if (stmt != null)
            stmt.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         if (c != null)
            c.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         tms.endTX();
      }
      finally
      {
         // Restore the flag even when ending the transaction throws.
         if (threadWasInterrupted)
            Thread.currentThread().interrupt();
      }
   }
}
/**
 * @return the transaction manager created together with this persistence manager
 */
public TxManager getTxManager()
{
   return this.txManager;
}
/**
 * Does nothing in this implementation; both arguments are ignored.
 *
 * @param jmsDest unused
 * @param dest unused
 * @throws JMSException never thrown by this implementation
 */
public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException
{
}
/**
 * Not supported by this persistence manager.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
public MessageCache getMessageCacheInstance()
{
   final String reason = "This is now set on the destination manager";
   throw new UnsupportedOperationException(reason);
}
/**
 * Stores a message under the "*" destination key with no transaction id, no
 * TXOP mark and LATECLONE set to "1".
 *
 * @param message the message to store
 * @throws JMSException if the message could not be written or the surrounding
 *         transaction fails
 */
public void addMessage(SpyMessage message) throws JMSException
{
   TransactionManagerStrategy tms = new TransactionManagerStrategy();
   tms.startTX();
   Connection c = null;
   // Clear the interrupt flag for the duration of the database work; it is
   // re-asserted in the finally block.
   boolean threadWasInterrupted = Thread.interrupted();
   try
   {
      // Go through getConnection() like every other operation so that this
      // path also benefits from the configured connection retries.
      c = this.getConnection();
      addMessage(c, "*", message, null, null, "1");
   }
   catch (IOException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not add message:", e);
   }
   catch (SQLException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not add message:", e);
   }
   finally
   {
      try
      {
         if (c != null)
            c.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         tms.endTX();
      }
      finally
      {
         // Restore the flag even when ending the transaction throws.
         if (threadWasInterrupted)
            Thread.currentThread().interrupt();
      }
   }
}
/**
 * @return this persistence manager itself
 */
public Object getInstance()
{
   final Object self = this;
   return self;
}
/**
 * Not supported by this persistence manager.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
public ObjectName getMessageCache()
{
   final String reason = "This is now set on the destination manager";
   throw new UnsupportedOperationException(reason);
}
/**
 * Not supported by this persistence manager.
 *
 * @param messageCache ignored
 * @throws UnsupportedOperationException always
 */
public void setMessageCache(ObjectName messageCache)
{
   final String reason = "This is now set on the destination manager";
   throw new UnsupportedOperationException(reason);
}
/**
 * Reads a message back from the message log. Late-clone references are looked
 * up under the "*" destination key, all others under their persistent key.
 *
 * @param messageRef the reference identifying the message
 * @return the stored message, or null when no matching row exists
 * @throws JMSException if the message could not be read or the surrounding
 *         transaction fails
 */
public SpyMessage loadFromStorage(MessageReference messageRef) throws JMSException
{
   if (log.isTraceEnabled())
      log.trace("Loading message from storage " + messageRef);
   TransactionManagerStrategy tms = new TransactionManagerStrategy();
   tms.startTX();
   Connection c = null;
   PreparedStatement stmt = null;
   ResultSet rs = null;
   // Clear the interrupt flag for the duration of the database work; it is
   // re-asserted in the finally block.
   boolean threadWasInterrupted = Thread.interrupted();
   try
   {
      c = this.getConnection();
      stmt = c.prepareStatement(SELECT_MESSAGE);
      stmt.setLong(1, messageRef.messageId);
      if (messageRef.isLateClone())
         stmt.setString(2, "*");
      else
         stmt.setString(2, messageRef.getPersistentKey());
      rs = stmt.executeQuery();
      if (rs.next())
         return extractMessage(rs, 2);
      return null;
   }
   catch (IOException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not load message : " + messageRef, e);
   }
   catch (SQLException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not load message : " + messageRef, e);
   }
   finally
   {
      try
      {
         if (rs != null)
            rs.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         if (stmt != null)
            stmt.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         if (c != null)
            c.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         tms.endTX();
      }
      finally
      {
         // Restore the flag even when ending the transaction throws.
         if (threadWasInterrupted)
            Thread.currentThread().interrupt();
      }
   }
}
/**
 * Deletes the stored row of a non-persistent message and flags the reference
 * as not stored. Persistent references are ignored by this method.
 *
 * @param messageRef the reference whose stored row is deleted
 * @throws JMSException if the row could not be deleted or the surrounding
 *         transaction fails
 */
public void removeFromStorage(MessageReference messageRef) throws JMSException
{
   if (messageRef.isPersistent())
      return;
   boolean trace = log.isTraceEnabled();
   if (trace)
      log.trace("Removing message from storage " + messageRef);
   TransactionManagerStrategy tms = new TransactionManagerStrategy();
   tms.startTX();
   Connection c = null;
   PreparedStatement stmt = null;
   // Clear the interrupt flag for the duration of the database work; it is
   // re-asserted in the finally block.
   boolean threadWasInterrupted = Thread.interrupted();
   try
   {
      c = this.getConnection();
      // Late-clone references live in the reference log, everything else in
      // the message log; the statement parameters are the same for both.
      stmt = c.prepareStatement(messageRef.isLateClone() ? DELETE_REFERENCE : DELETE_MESSAGE);
      stmt.setLong(1, messageRef.messageId);
      stmt.setString(2, messageRef.getPersistentKey());
      stmt.executeUpdate();
      messageRef.setStored(MessageReference.NOT_STORED);
      if (trace)
         log.trace("Removed message from storage " + messageRef);
   }
   catch (SQLException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not remove message: " + messageRef, e);
   }
   finally
   {
      try
      {
         if (stmt != null)
            stmt.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         if (c != null)
            c.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         tms.endTX();
      }
      finally
      {
         // Restore the flag even when ending the transaction throws.
         if (threadWasInterrupted)
            Thread.currentThread().interrupt();
      }
   }
}
/**
 * Writes a non-persistent message to the database with the temporary mark 'T'
 * and flags the reference as stored. Persistent references are ignored by
 * this method. For late-clone references a reference row is written and the
 * message itself is inserted under the "*" key; a failure of that second
 * insert is only traced.
 *
 * @param messageRef the reference being saved
 * @param message the message body to store
 * @throws JMSException if the rows could not be written or the surrounding
 *         transaction fails
 */
public void saveToStorage(MessageReference messageRef, SpyMessage message) throws JMSException
{
   if (messageRef.isPersistent())
      return;
   boolean trace = log.isTraceEnabled();
   if (trace)
      log.trace("Saving message to storage " + messageRef);
   TransactionManagerStrategy tms = new TransactionManagerStrategy();
   tms.startTX();
   Connection c = null;
   // Clear the interrupt flag for the duration of the database work; it is
   // re-asserted in the finally block.
   boolean threadWasInterrupted = Thread.interrupted();
   try
   {
      c = this.getConnection();
      if (messageRef.isLateClone())
      {
         addReference(c, messageRef.getPersistentKey(), messageRef, null, "T");
         try
         {
            addMessage(c, "*", message, null, "T", "1");
         }
         catch (SQLException e)
         {
            log.trace("TODO: Check this is really a duplicate", e);
         }
      }
      else
      {
         addMessage(c, messageRef.getPersistentKey(), message, null, "T", "0");
      }
      messageRef.setStored(MessageReference.STORED);
      if (trace)
         log.trace("Saved message to storage " + messageRef);
   }
   catch (IOException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not store message: " + messageRef, e);
   }
   catch (SQLException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not store message: " + messageRef, e);
   }
   finally
   {
      try
      {
         if (c != null)
            c.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         tms.endTX();
      }
      finally
      {
         // Restore the flag even when ending the transaction throws.
         if (threadWasInterrupted)
            Thread.currentThread().interrupt();
      }
   }
}
/**
 * Body of the garbage collection thread. While this thread is still the one
 * registered in gcThread it sleeps for gcPeriod milliseconds and then deletes
 * orphaned messages. Interrupts are ignored (the loop condition decides
 * whether to stop); any other throwable is logged and the loop continues.
 */
public void run()
{
   final Thread self = Thread.currentThread();
   while (gcThread == self)
   {
      try
      {
         Thread.sleep(gcPeriod);
         if (gcThread != self)
            return;
         collectOrphanedMessages();
      }
      catch (InterruptedException ignored)
      {
      }
      catch (Throwable t)
      {
         log.warn("Unhandled throwable in gc thread:", t);
      }
   }
}

/**
 * Executes DELETE_ORPHANED_MESSAGES on a fresh connection from the datasource.
 * Failures while closing the statement or connection are only traced.
 */
private void collectOrphanedMessages() throws SQLException
{
   Connection connection = datasource.getConnection();
   try
   {
      PreparedStatement sweep = connection.prepareStatement(DELETE_ORPHANED_MESSAGES);
      try
      {
         sweep.executeUpdate();
      }
      finally
      {
         try
         {
            sweep.close();
         }
         catch (SQLException ignored)
         {
            log.trace("Error closing statement", ignored);
         }
      }
   }
   finally
   {
      try
      {
         connection.close();
      }
      catch (SQLException ignored)
      {
         log.trace("Error closing connection", ignored);
      }
   }
}
/**
 * Starts the service: applies the SQL overrides from sqlProperties, resolves
 * the blob type, looks up the DataSource and TransactionManager in JNDI,
 * runs startup recovery and finally launches the garbage collection thread.
 *
 * @throws Exception if a lookup or the recovery fails
 */
protected void startService() throws Exception
{
   // Every statement keeps its built-in default unless a same-named property overrides it.
   UPDATE_MARKED_MESSAGES = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES", UPDATE_MARKED_MESSAGES);
   UPDATE_MARKED_REFERENCES = sqlProperties.getProperty("UPDATE_MARKED_REFERENCES", UPDATE_MARKED_REFERENCES);
   UPDATE_MARKED_MESSAGES_WITH_TX = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_WITH_TX", UPDATE_MARKED_MESSAGES_WITH_TX);
   UPDATE_MARKED_REFERENCES_WITH_TX = sqlProperties.getProperty("UPDATE_MARKED_REFERENCES_WITH_TX", UPDATE_MARKED_REFERENCES_WITH_TX);
   DELETE_MARKED_MESSAGES_WITH_TX = sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX", DELETE_MARKED_MESSAGES_WITH_TX);
   DELETE_MARKED_REFERENCES_WITH_TX = sqlProperties.getProperty("DELETE_MARKED_REFERENCES_WITH_TX", DELETE_MARKED_REFERENCES_WITH_TX);
   DELETE_TX = sqlProperties.getProperty("DELETE_TX", DELETE_TX);
   DELETE_MARKED_MESSAGES = sqlProperties.getProperty("DELETE_MARKED_MESSAGES", DELETE_MARKED_MESSAGES);
   DELETE_MARKED_REFERENCES = sqlProperties.getProperty("DELETE_MARKED_REFERENCES", DELETE_MARKED_REFERENCES);
   DELETE_TEMPORARY_MESSAGES = sqlProperties.getProperty("DELETE_TEMPORARY_MESSAGES", DELETE_TEMPORARY_MESSAGES);
   DELETE_TEMPORARY_REFERENCES = sqlProperties.getProperty("DELETE_TEMPORARY_REFERENCES", DELETE_TEMPORARY_REFERENCES);
   INSERT_TX = sqlProperties.getProperty("INSERT_TX", INSERT_TX);
   SELECT_MAX_TX = sqlProperties.getProperty("SELECT_MAX_TX", SELECT_MAX_TX);
   SELECT_MESSAGES_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST", SELECT_MESSAGES_IN_DEST);
   SELECT_REFERENCES_IN_DEST = sqlProperties.getProperty("SELECT_REFERENCES_IN_DEST", SELECT_REFERENCES_IN_DEST);
   SELECT_MESSAGE = sqlProperties.getProperty("SELECT_MESSAGE", SELECT_MESSAGE);
   INSERT_MESSAGE = sqlProperties.getProperty("INSERT_MESSAGE", INSERT_MESSAGE);
   INSERT_REFERENCE = sqlProperties.getProperty("INSERT_REFERENCE", INSERT_REFERENCE);
   MARK_MESSAGE = sqlProperties.getProperty("MARK_MESSAGE", MARK_MESSAGE);
   MARK_REFERENCE = sqlProperties.getProperty("MARK_REFERENCE", MARK_REFERENCE);
   DELETE_MESSAGE = sqlProperties.getProperty("DELETE_MESSAGE", DELETE_MESSAGE);
   DELETE_REFERENCE = sqlProperties.getProperty("DELETE_REFERENCE", DELETE_REFERENCE);
   UPDATE_MESSAGE = sqlProperties.getProperty("UPDATE_MESSAGE", UPDATE_MESSAGE);
   UPDATE_REFERENCE = sqlProperties.getProperty("UPDATE_REFERENCE", UPDATE_REFERENCE);
   DELETE_ORPHANED_MESSAGES = sqlProperties.getProperty("DELETE_ORPHANED_MESSAGES", DELETE_ORPHANED_MESSAGES);
   DELETE_ALL_TXS = sqlProperties.getProperty("DELETE_ALL_TXS", DELETE_ALL_TXS);
   CREATE_REFERENCE_TABLE = sqlProperties.getProperty("CREATE_REFERENCE_TABLE", CREATE_REFERENCE_TABLE);
   CREATE_MESSAGE_TABLE = sqlProperties.getProperty("CREATE_MESSAGE_TABLE", CREATE_MESSAGE_TABLE);
   CREATE_TX_TABLE = sqlProperties.getProperty("CREATE_TX_TABLE", CREATE_TX_TABLE);
   createTables = sqlProperties.getProperty("CREATE_TABLES_ON_STARTUP", "true").equalsIgnoreCase("true");
   String s = sqlProperties.getProperty("BLOB_TYPE", "OBJECT_BLOB");
   if (s.equals("OBJECT_BLOB"))
      blobType = OBJECT_BLOB;
   else if (s.equals("BYTES_BLOB"))
      blobType = BYTES_BLOB;
   else if (s.equals("BINARYSTREAM_BLOB"))
      blobType = BINARYSTREAM_BLOB;
   else if (s.equals("BLOB_BLOB"))
      blobType = BLOB_BLOB;
   else
      // Previously a misspelt value was ignored without any trace; the blob
      // type is still left unchanged, but the misconfiguration is now visible.
      log.warn("Unrecognized BLOB_TYPE '" + s + "'; keeping blob type " + blobType);
   String dsName = (String) getServer().getAttribute(connectionManagerName, "BindName");
   InitialContext ctx = new InitialContext();
   datasource = (DataSource) ctx.lookup(dsName);
   tm = (TransactionManager) ctx.lookup(TransactionManagerService.JNDI_NAME);
   log.debug("Resolving uncommited TXS");
   resolveAllUncommitedTXs();
   gcThread = new Thread(this, "JBossMQ persistent message garbage collection");
   gcThread.setDaemon(true);
   gcThread.start();
}
/**
 * Stops the garbage collection thread.
 *
 * @throws Exception declared by the service contract; not thrown here
 */
protected void stopService() throws Exception
{
   Thread collector = gcThread;
   // Clear the field BEFORE interrupting: run() re-checks gcThread as soon as
   // it wakes up, and if it still saw itself registered it would start another
   // full sleep period before noticing the shutdown.
   gcThread = null;
   if (collector != null)
      collector.interrupt();
}
/**
 * Startup recovery, run in two separate transactions.
 * <p>
 * Phase 1 (only when createTables is set) attempts to create the three
 * tables; a failing CREATE is logged at debug level and otherwise ignored.
 * <p>
 * Phase 2 cleans up after unfinished work, in this order for the message log
 * and then the reference log: delete temporary ('T') rows, delete 'A' rows
 * whose transaction is still in the transaction log, and reset 'D' rows to
 * TXID=NULL, TXOP='A'. It then deletes orphaned messages, seeds
 * nextTransactionId from the highest logged transaction id plus one, and
 * empties the transaction log.
 *
 * @throws JMSException if a connection cannot be obtained, a recovery
 *         statement fails, or a surrounding transaction fails
 */
protected synchronized void resolveAllUncommitedTXs() throws JMSException
{
   TransactionManagerStrategy tms = new TransactionManagerStrategy();
   tms.startTX();
   Connection c = null;
   // Clear the interrupt flag for the duration of the database work; it is
   // re-asserted in the finally block.
   boolean threadWasInterrupted = Thread.interrupted();
   try
   {
      if (createTables)
      {
         c = this.getConnection();
         createTable(c, CREATE_REFERENCE_TABLE);
         createTable(c, CREATE_MESSAGE_TABLE);
         createTable(c, CREATE_TX_TABLE);
      }
   }
   catch (SQLException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not get a connection for jdbc2 table construction ", e);
   }
   finally
   {
      try
      {
         if (c != null)
            c.close();
      }
      catch (Throwable ignore)
      {
      }
      c = null;
      try
      {
         tms.endTX();
      }
      finally
      {
         // Restore the flag even when ending the transaction throws.
         if (threadWasInterrupted)
            Thread.currentThread().interrupt();
      }
   }

   tms = new TransactionManagerStrategy();
   tms.startTX();
   PreparedStatement stmt = null;
   ResultSet rs = null;
   threadWasInterrupted = Thread.interrupted();
   try
   {
      c = this.getConnection();
      runRecoveryStatement(c, DELETE_TEMPORARY_MESSAGES, null);
      runRecoveryStatement(c, DELETE_MARKED_MESSAGES_WITH_TX, "A");
      unmarkDeletedRows(c, UPDATE_MARKED_MESSAGES);
      runRecoveryStatement(c, DELETE_TEMPORARY_REFERENCES, null);
      runRecoveryStatement(c, DELETE_MARKED_REFERENCES_WITH_TX, "A");
      unmarkDeletedRows(c, UPDATE_MARKED_REFERENCES);
      runRecoveryStatement(c, DELETE_ORPHANED_MESSAGES, null);
      // The highest logged id must be read before the log is emptied below.
      stmt = c.prepareStatement(SELECT_MAX_TX);
      rs = stmt.executeQuery();
      if (rs.next())
         nextTransactionId.set(rs.getLong(1) + 1);
      rs.close();
      rs = null;
      stmt.close();
      stmt = null;
      runRecoveryStatement(c, DELETE_ALL_TXS, null);
   }
   catch (SQLException e)
   {
      tms.setRollbackOnly();
      throw new SpyJMSException("Could not resolve uncommited transactions. Message recovery may not be accurate", e);
   }
   finally
   {
      try
      {
         if (rs != null)
            rs.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         if (stmt != null)
            stmt.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         if (c != null)
            c.close();
      }
      catch (Throwable ignore)
      {
      }
      try
      {
         tms.endTX();
      }
      finally
      {
         // Restore the flag even when ending the transaction throws.
         if (threadWasInterrupted)
            Thread.currentThread().interrupt();
      }
   }
}

/** Runs one CREATE TABLE statement; a failure is logged at debug level and swallowed. */
private void createTable(Connection c, String sql)
{
   PreparedStatement stmt = null;
   try
   {
      stmt = c.prepareStatement(sql);
      stmt.executeUpdate();
   }
   catch (SQLException e)
   {
      log.debug("Could not create table with SQL: " + sql + ", got : " + e);
   }
   finally
   {
      try
      {
         if (stmt != null)
            stmt.close();
      }
      catch (Throwable ignored)
      {
         log.trace("Ignored: " + ignored);
      }
   }
}

/** Executes a recovery statement; when mark is not null it is bound as the single parameter. */
private void runRecoveryStatement(Connection c, String sql, String mark) throws SQLException
{
   PreparedStatement stmt = c.prepareStatement(sql);
   try
   {
      if (mark != null)
         stmt.setString(1, mark);
      stmt.executeUpdate();
   }
   finally
   {
      try
      {
         stmt.close();
      }
      catch (Throwable ignore)
      {
      }
   }
}

/** Executes an UPDATE_MARKED_* statement that resets rows marked 'D' to TXID=NULL, TXOP='A'. */
private void unmarkDeletedRows(Connection c, String sql) throws SQLException
{
   PreparedStatement stmt = c.prepareStatement(sql);
   try
   {
      stmt.setNull(1, Types.BIGINT);
      stmt.setString(2, "A");
      stmt.setString(3, "D");
      stmt.executeUpdate();
   }
   finally
   {
      try
      {
         stmt.close();
      }
      catch (Throwable ignore)
      {
      }
   }
}
/**
 * Deletes one transaction's record from the transaction log.
 *
 * @param c the connection to use
 * @param txid the id of the transaction record to delete
 * @throws SQLException if the delete fails
 */
protected void removeTXRecord(Connection c, long txid) throws SQLException
{
   PreparedStatement delete = c.prepareStatement(DELETE_TX);
   try
   {
      delete.setLong(1, txid);
      delete.executeUpdate();
   }
   finally
   {
      try
      {
         delete.close();
      }
      catch (Throwable ignored)
      {
      }
   }
}
/**
 * Inserts a row into the message log.
 *
 * @param c the connection to use
 * @param queue the DESTINATION key; "*" is used when this is null
 * @param message the message to store
 * @param txId the owning transaction, or null to store a NULL TXID
 * @param mark the TXOP mark, or null to store NULL
 * @param lateClone the LATECLONE value; when it is "1" a failing insert is
 *        assumed to be a duplicate and is only traced
 * @throws SQLException if the insert fails (and lateClone is not "1")
 * @throws IOException if the message cannot be serialized
 */
protected void addMessage(Connection c, String queue, SpyMessage message, Tx txId, String mark, String lateClone)
throws SQLException, IOException
{
   PreparedStatement insert = c.prepareStatement(INSERT_MESSAGE);
   try
   {
      insert.setLong(1, message.header.messageId);
      insert.setString(2, queue == null ? "*" : queue);
      setBlob(insert, 3, message);
      if (txId == null)
         insert.setNull(4, Types.BIGINT);
      else
         insert.setLong(4, txId.longValue());
      if (mark != null)
         insert.setString(5, mark);
      else
         insert.setNull(5, Types.VARCHAR);
      insert.setString(6, lateClone);
      try
      {
         insert.executeUpdate();
      }
      catch (SQLException e)
      {
         if (!lateClone.equals("1"))
            throw e;
         log.trace("Assumed already added to message log: " + message.header.messageId);
      }
   }
   finally
   {
      try
      {
         insert.close();
      }
      catch (Throwable ignored)
      {
      }
   }
}
/**
 * Inserts a row into the reference log, including the reference's current
 * redelivery flag and count.
 *
 * @param c the connection to use
 * @param queue the DESTINATION key
 * @param message the reference supplying the message id and redelivery state
 * @param txId the owning transaction, or null to store a NULL TXID
 * @param mark the TXOP mark
 * @throws SQLException if the insert fails
 * @throws IOException declared for callers; not raised by this implementation
 */
protected void addReference(Connection c, String queue, MessageReference message, Tx txId, String mark)
throws SQLException, IOException
{
   PreparedStatement insert = c.prepareStatement(INSERT_REFERENCE);
   try
   {
      insert.setLong(1, message.messageId);
      insert.setString(2, queue);
      if (txId == null)
         insert.setNull(3, Types.BIGINT);
      else
         insert.setLong(3, txId.longValue());
      insert.setString(4, mark);
      insert.setString(5, message.redelivered ? "1" : "0");
      insert.setLong(6, message.redeliveryCount);
      insert.executeUpdate();
   }
   finally
   {
      try
      {
         insert.close();
      }
      catch (Throwable ignored)
      {
      }
   }
}
/**
 * Deletes the message-log rows that carry the given transaction id and mark.
 *
 * @param c the connection to use
 * @param txid the owning transaction; null binds a NULL TXID, matching the
 *        handling in removeMarkedReferences
 * @param mark the TXOP mark to match
 * @throws SQLException if the delete fails
 */
protected void removeMarkedMessages(Connection c, Tx txid, String mark) throws SQLException
{
   PreparedStatement stmt = null;
   try
   {
      stmt = c.prepareStatement(DELETE_MARKED_MESSAGES);
      // Mirror removeMarkedReferences instead of failing with a
      // NullPointerException when no transaction is supplied.
      if (txid != null)
         stmt.setLong(1, txid.longValue());
      else
         stmt.setNull(1, Types.BIGINT);
      stmt.setString(2, mark);
      stmt.executeUpdate();
   }
   finally
   {
      try
      {
         if (stmt != null)
            stmt.close();
      }
      catch (Throwable e)
      {
      }
   }
}
/**
 * Deletes the reference-log rows that carry the given transaction id and mark.
 *
 * @param c the connection to use
 * @param txid the owning transaction; null binds a NULL TXID
 * @param mark the TXOP mark to match
 * @throws SQLException if the delete fails
 */
protected void removeMarkedReferences(Connection c, Tx txid, String mark) throws SQLException
{
   PreparedStatement delete = c.prepareStatement(DELETE_MARKED_REFERENCES);
   try
   {
      if (txid == null)
         delete.setNull(1, Types.BIGINT);
      else
         delete.setLong(1, txid.longValue());
      delete.setString(2, mark);
      delete.executeUpdate();
   }
   finally
   {
      try
      {
         delete.close();
      }
      catch (Throwable ignored)
      {
      }
   }
}
/**
 * Binds a message to a statement parameter using the configured blob type.
 * Any blob type other than the four known constants leaves the parameter
 * untouched.
 *
 * @param stmt the statement to bind on
 * @param column the 1-based parameter index
 * @param message the message to bind
 * @throws IOException if the message cannot be serialized
 * @throws SQLException if the driver rejects the value
 * @throws RuntimeException when the blob type is BLOB_BLOB, which is not implemented for writing
 */
protected void setBlob(PreparedStatement stmt, int column, SpyMessage message)
throws IOException, SQLException
{
   switch (blobType)
   {
      case OBJECT_BLOB:
         stmt.setObject(column, message);
         break;
      case BYTES_BLOB:
         stmt.setBytes(column, serializeMessage(message));
         break;
      case BINARYSTREAM_BLOB:
         byte[] payload = serializeMessage(message);
         stmt.setBinaryStream(column, new ByteArrayInputStream(payload), payload.length);
         break;
      case BLOB_BLOB:
         throw new RuntimeException("BLOB_TYPE: BLOB_BLOB is not yet implemented.");
      default:
         break;
   }
}

/** Serializes a message through SpyMessage.writeMessage into a byte array. */
private byte[] serializeMessage(SpyMessage message) throws IOException
{
   ByteArrayOutputStream bytes = new ByteArrayOutputStream();
   ObjectOutputStream out = new ObjectOutputStream(bytes);
   SpyMessage.writeMessage(message, out);
   out.flush();
   return bytes.toByteArray();
}
/**
 * Reads a message from the current row of a result set using the configured
 * blob type, and stamps it with the message id found in column 1.
 *
 * @param rs the result set, positioned on the row to read; column 1 must hold the message id
 * @param column the 1-based index of the blob column
 * @return the reconstructed message
 * @throws SQLException if a column cannot be read
 * @throws IOException if the message cannot be deserialized
 * @throws IllegalStateException if the blob type is not one of the known constants
 */
protected SpyMessage extractMessage(ResultSet rs, int column) throws SQLException, IOException
{
   final long storedId = rs.getLong(1);
   SpyMessage result;
   switch (blobType)
   {
      case OBJECT_BLOB:
         result = (SpyMessage) rs.getObject(column);
         break;
      case BYTES_BLOB:
         byte[] raw = rs.getBytes(column);
         result = SpyMessage.readMessage(new ObjectInputStream(new ByteArrayInputStream(raw)));
         break;
      case BINARYSTREAM_BLOB:
         result = SpyMessage.readMessage(new ObjectInputStream(rs.getBinaryStream(column)));
         break;
      case BLOB_BLOB:
         result = SpyMessage.readMessage(new ObjectInputStream(rs.getBlob(column).getBinaryStream()));
         break;
      default:
         throw new IllegalStateException();
   }
   result.header.messageId = storedId;
   return result;
}
/**
 * Obtains a connection from the datasource, retrying after a 1.5 second pause
 * when an attempt fails with an SQLException. At least one attempt is always
 * made, even when connectionRetryAttempts is configured as zero or negative.
 *
 * @return an open connection
 * @throws SQLException the failure of the last attempt
 */
protected Connection getConnection() throws SQLException
{
   // A non-positive setting used to skip the loop entirely and report a
   // misleading "interrupted" error without ever contacting the datasource.
   int attempts = Math.max(1, this.connectionRetryAttempts);
   int attemptCount = 0;
   SQLException sqlException = null;
   while (attempts-- > 0)
   {
      if (++attemptCount > 1)
         log.debug("Retrying connection: attempt # " + attemptCount);
      try
      {
         Connection connection = datasource.getConnection();
         // Logged only after the datasource really returned a connection.
         if (attemptCount > 1)
            log.debug("Connection succeeded on attempt # " + attemptCount);
         return connection;
      }
      catch (SQLException exception)
      {
         log.debug("Connection attempt # " + attemptCount + " failed with SQLException", exception);
         sqlException = exception;
      }
      if (attempts > 0)
      {
         try
         {
            Thread.sleep(1500);
         }
         catch (InterruptedException interruptedException)
         {
            // Stop retrying and report the last failure.
            // NOTE(review): the interrupt flag is not re-asserted here — confirm
            // whether callers rely on it staying clear for their cleanup.
            break;
         }
      }
   }
   if (sqlException != null)
      throw sqlException;
   throw new SQLException("connection attempt interrupted");
}
/**
 * Runs one persistence operation in its own transaction. startTX() suspends
 * whatever transaction is associated with the calling thread and begins a new
 * one; endTX() commits or rolls that transaction back and resumes the
 * suspended one. An instance is meant for a single startTX()/endTX() pair.
 */
class TransactionManagerStrategy
{
   /** The transaction suspended by startTX(), or null if the thread had none. */
   Transaction threadTx;

   /**
    * Suspends the thread's current transaction and begins a new one. If that
    * fails, the suspended transaction (if any) is resumed before the error is
    * reported.
    *
    * @throws JMSException if the transaction could not be started
    */
   void startTX() throws JMSException
   {
      try
      {
         threadTx = tm.suspend();
         tm.begin();
      }
      catch (Exception e)
      {
         try
         {
            if (threadTx != null)
               tm.resume(threadTx);
         }
         catch (Exception ignore)
         {
         }
         throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
      }
   }

   /**
    * Marks the current transaction so that endTX() rolls it back.
    *
    * @throws JMSException if the transaction could not be marked
    */
   void setRollbackOnly() throws JMSException
   {
      try
      {
         tm.setRollbackOnly();
      }
      catch (Exception e)
      {
         throw new SpyJMSException("Could not mark the transaction for rollback.", e);
      }
   }

   /**
    * Rolls the transaction back if it was marked for rollback, commits it
    * otherwise, and in either case resumes the transaction suspended by
    * startTX(). A failure to resume is ignored.
    *
    * @throws JMSException if the commit or rollback fails
    */
   void endTX() throws JMSException
   {
      try
      {
         if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK)
         {
            tm.rollback();
         }
         else
         {
            tm.commit();
         }
      }
      catch (Exception e)
      {
         // The message used to claim the transaction could not be *started*.
         throw new SpyJMSException("Could not end the transaction with the transaction manager.", e);
      }
      finally
      {
         try
         {
            if (threadTx != null)
               tm.resume(threadTx);
         }
         catch (Exception ignore)
         {
         }
      }
   }
}
}