package org.jboss.mq.pm.jdbc2;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.StreamCorruptedException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
import javax.jms.JMSException;
import javax.management.ObjectName;
import javax.naming.InitialContext;
import javax.sql.DataSource;
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyJMSException;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.SpyTopic;
import org.jboss.mq.pm.Tx;
import org.jboss.mq.pm.TxManager;
import org.jboss.mq.server.JMSDestination;
import org.jboss.mq.server.MessageCache;
import org.jboss.mq.server.MessageReference;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.tm.TransactionManagerService;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
public class PersistenceManager
extends ServiceMBeanSupport
implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager, org.jboss.mq.pm.CacheStore
{
private SynchronizedLong nextTransactionId = new SynchronizedLong(0l);
private TxManager txManager;
private DataSource datasource;
private TransactionManager tm;
String UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?";
String UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
String DELETE_MARKED_MESSAGES_WITH_TX =
"DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS) AND TXOP=?";
String DELETE_TX = "DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?";
String DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?";
String DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXOP = 'T'";
String INSERT_TX = "INSERT INTO JMS_TRANSACTIONS (TXID) values(?)";
String SELECT_MAX_TX = "SELECT MAX(TXID) FROM JMS_MESSAGES";
String SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=?";
String SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
String INSERT_MESSAGE =
"INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)";
String MARK_MESSAGE = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
String DELETE_MESSAGE = "DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
String UPDATE_MESSAGE = "UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?";
String CREATE_MESSAGE_TABLE =
"CREATE TABLE JMS_MESSAGES ( MESSAGEID INTEGER NOT NULL, "
+ "DESTINATION VARCHAR(32) NOT NULL, TXID INTEGER, TXOP CHAR(1),"
+ "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )";
String CREATE_IDX_MESSAGE_TXOP_TXID = "CREATE INDEX JMS_MESSAGES_TXOP_TXID ON JMS_MESSAGES (TXOP, TXID)";
String CREATE_IDX_MESSAGE_DESTINATION = "CREATE INDEX JMS_MESSAGES_DESTINATION ON JMS_MESSAGES (DESTINATION)";
String CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, PRIMARY KEY (TXID) )";
static final int OBJECT_BLOB = 0;
static final int BYTES_BLOB = 1;
static final int BINARYSTREAM_BLOB = 2;
static final int BLOB_BLOB = 3;
int blobType = OBJECT_BLOB;
boolean createTables;
private int connectionRetryAttempts = 5;
public PersistenceManager() throws javax.jms.JMSException
{
txManager = new TxManager(this);
}
class TransactionManagerStrategy
{
Transaction threadTx;
void startTX() throws JMSException
{
try
{
threadTx = tm.suspend();
tm.begin();
}
catch (Exception e)
{
try
{
if (threadTx != null)
tm.resume(threadTx);
}
catch (Exception ignore)
{
}
throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
}
}
void setRollbackOnly() throws JMSException
{
try
{
tm.setRollbackOnly();
}
catch (Exception e)
{
throw new SpyJMSException("Could not start a mark the transaction for rollback .", e);
}
}
void endTX() throws JMSException
{
try
{
if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK)
{
tm.rollback();
}
else
{
tm.commit();
}
}
catch (Exception e)
{
throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
}
finally
{
try
{
if (threadTx != null)
tm.resume(threadTx);
}
catch (Exception ignore)
{
}
}
}
}
synchronized public void resolveAllUncommitedTXs() throws JMSException
{
TransactionManagerStrategy tms = new TransactionManagerStrategy();
tms.startTX();
Connection c = null;
PreparedStatement stmt = null;
ResultSet rs = null;
boolean threadWasInterrupted = Thread.interrupted();
try
{
if (createTables)
{
c = this.getConnection();
boolean createdMessageTable = false;
try
{
stmt = c.prepareStatement(CREATE_MESSAGE_TABLE);
stmt.executeUpdate();
createdMessageTable = true;
}
catch (SQLException e)
{
log.debug("Could not create table with SQL: " + CREATE_MESSAGE_TABLE, e);
}
finally
{
try
{
if (stmt != null)
stmt.close();
}
catch (Throwable ignored)
{
log.trace("Ignored: " + ignored);
}
stmt = null;
}
if (createdMessageTable)
{
try
{
stmt = c.prepareStatement(CREATE_IDX_MESSAGE_TXOP_TXID);
stmt.executeUpdate();
}
catch (SQLException e)
{
log.debug("Could not create index with SQL: " + CREATE_IDX_MESSAGE_TXOP_TXID, e);
}
finally
{
try
{
if (stmt != null)
stmt.close();
}
catch (Throwable ignored)
{
log.trace("Ignored: " + ignored);
}
stmt = null;
}
try
{
stmt = c.prepareStatement(CREATE_IDX_MESSAGE_DESTINATION);
stmt.executeUpdate();
}
catch (SQLException e)
{
log.debug("Could not create index with SQL: " + CREATE_IDX_MESSAGE_DESTINATION, e);
}
finally
{
try
{
if (stmt != null)
stmt.close();
}
catch (Throwable ignored)
{
log.trace("Ignored: " + ignored);
}
stmt = null;
}
}
try
{
stmt = c.prepareStatement(CREATE_TX_TABLE);
stmt.executeUpdate();
}
catch (SQLException e)
{
log.debug("Could not create table with SQL: " + CREATE_TX_TABLE, e);
}
finally
{
try
{
if (stmt != null)
stmt.close();
}
catch (Throwable ignored)
{
log.trace("Ignored: " + ignored);
}
stmt = null;
}
}
}
catch (SQLException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not get a connection for jdbc2 table construction ", e);
}
finally
{
try
{
if (stmt != null)
stmt.close();
}
catch (Throwable ignore)
{
}
stmt = null;
try
{
if (c != null)
c.close();
}
catch (Throwable ignore)
{
}
c = null;
tms.endTX();
if (threadWasInterrupted)
Thread.currentThread().interrupt();
}
tms = new TransactionManagerStrategy();
tms.startTX();
threadWasInterrupted = Thread.interrupted();
try
{
c = this.getConnection();
stmt = c.prepareStatement(SELECT_MAX_TX);
rs = stmt.executeQuery();
if (rs.next())
nextTransactionId.set(rs.getLong(1) + 1);
stmt.close();
stmt = c.prepareStatement(DELETE_TEMPORARY_MESSAGES);
stmt.executeUpdate();
stmt.close();
stmt = c.prepareStatement(DELETE_MARKED_MESSAGES_WITH_TX);
stmt.setString(1, "A");
stmt.executeUpdate();
stmt.close();
stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES);
stmt.setNull(1, java.sql.Types.BIGINT);
stmt.setString(2, "A");
stmt.setString(3, "D");
stmt.executeUpdate();
stmt.close();
}
catch (SQLException e)
{
tms.setRollbackOnly();
throw new SpyJMSException(
"Could not resolve uncommited transactions. Message recovery may not be accurate",
e);
}
finally
{
try
{
if (rs != null)
rs.close();
}
catch (Throwable ignore)
{
}
try
{
if (stmt != null)
stmt.close();
}
catch (Throwable ignore)
{
}
try
{
if (c != null)
c.close();
}
catch (Throwable ignore)
{
}
tms.endTX();
if (threadWasInterrupted)
Thread.currentThread().interrupt();
}
}
synchronized public void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws javax.jms.JMSException
{
if (jmsDest == null)
throw new IllegalArgumentException("Must supply non null JMSDestination to restoreQueue");
if (dest == null)
throw new IllegalArgumentException("Must supply non null SpyDestination to restoreQueue");
TransactionManagerStrategy tms = new TransactionManagerStrategy();
tms.startTX();
Connection c = null;
PreparedStatement stmt = null;
ResultSet rs = null;
boolean threadWasInterrupted = Thread.interrupted();
try
{
c = this.getConnection();
stmt = c.prepareStatement(SELECT_MESSAGES_IN_DEST);
stmt.setString(1, dest.toString());
rs = stmt.executeQuery();
int counter = 0;
while (rs.next())
{
SpyMessage message = extractMessage(rs);
if (dest instanceof SpyTopic)
message.header.durableSubscriberID = ((SpyTopic) dest).getDurableSubscriptionID();
jmsDest.restoreMessage(message);
counter++;
}
log.debug("Restored " + counter + " message(s) to: " + dest);
}
catch (IOException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
}
catch (SQLException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
}
finally
{
try
{
rs.close();
}
catch (Throwable ignore)
{
}
try
{
stmt.close();
}
catch (Throwable ignore)
{
}
try
{
c.close();
}
catch (Throwable ignore)
{
}
tms.endTX();
if (threadWasInterrupted)
Thread.currentThread().interrupt();
}
}
SpyMessage extractMessage(ResultSet rs) throws SQLException, IOException
{
try
{
long messageid = rs.getLong(1);
SpyMessage message = null;
if (blobType == OBJECT_BLOB)
{
message = (SpyMessage) rs.getObject(2);
}
else if (blobType == BYTES_BLOB)
{
byte[] st = rs.getBytes(2);
ByteArrayInputStream baip = new ByteArrayInputStream(st);
ObjectInputStream ois = new ObjectInputStream(baip);
message = SpyMessage.readMessage(ois);
}
else if (blobType == BINARYSTREAM_BLOB)
{
ObjectInputStream ois = new ObjectInputStream(rs.getBinaryStream(2));
message = SpyMessage.readMessage(ois);
}
else if (blobType == BLOB_BLOB)
{
ObjectInputStream ois = new ObjectInputStream(rs.getBlob(2).getBinaryStream());
message = SpyMessage.readMessage(ois);
}
message.header.messageId = messageid;
return message;
}
catch (StreamCorruptedException e)
{
throw new IOException("Could not load the message: " + e);
}
}
public void commitPersistentTx(Tx txId) throws javax.jms.JMSException
{
if (txId.wasPersisted() == false)
return;
TransactionManagerStrategy tms = new TransactionManagerStrategy();
tms.startTX();
Connection c = null;
boolean threadWasInterrupted = Thread.interrupted();
try
{
c = this.getConnection();
removeMarkedMessages(c, txId, "D");
removeTXRecord(c, txId.longValue());
}
catch (SQLException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not commit tx: " + txId, e);
}
finally
{
try
{
c.close();
}
catch (Throwable ignore)
{
}
tms.endTX();
if (threadWasInterrupted)
Thread.currentThread().interrupt();
}
}
public void removeMarkedMessages(Connection c, Tx txid, String mark) throws SQLException
{
PreparedStatement stmt = null;
try
{
stmt = c.prepareStatement(DELETE_MARKED_MESSAGES);
stmt.setLong(1, txid.longValue());
stmt.setString(2, mark);
stmt.executeUpdate();
}
finally
{
try
{
stmt.close();
}
catch (Throwable e)
{
}
}
}
public void addTXRecord(Connection c, long txid) throws SQLException
{
PreparedStatement stmt = null;
try
{
stmt = c.prepareStatement(INSERT_TX);
stmt.setLong(1, txid);
stmt.executeUpdate();
}
finally
{
try
{
stmt.close();
}
catch (Throwable e)
{
}
}
}
public void removeTXRecord(Connection c, long txid) throws SQLException
{
PreparedStatement stmt = null;
try
{
stmt = c.prepareStatement(DELETE_TX);
stmt.setLong(1, txid);
stmt.executeUpdate();
}
finally
{
try
{
stmt.close();
}
catch (Throwable e)
{
}
}
}
public void rollbackPersistentTx(Tx txId) throws JMSException
{
if (txId.wasPersisted() == false)
return;
TransactionManagerStrategy tms = new TransactionManagerStrategy();
tms.startTX();
Connection c = null;
PreparedStatement stmt = null;
boolean threadWasInterrupted = Thread.interrupted();
try
{
c = this.getConnection();
removeMarkedMessages(c, txId, "A");
removeTXRecord(c, txId.longValue());
stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES_WITH_TX);
stmt.setNull(1, java.sql.Types.BIGINT);
stmt.setString(2, "A");
stmt.setString(3, "D");
stmt.setLong(4, txId.longValue());
stmt.executeUpdate();
stmt.close();
}
catch (SQLException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not rollback tx: " + txId, e);
}
finally
{
try
{
if (stmt != null)
stmt.close();
if (c != null)
c.close();
}
catch (Throwable ignore)
{
}
tms.endTX();
if (threadWasInterrupted)
Thread.currentThread().interrupt();
}
}
public Tx createPersistentTx() throws JMSException
{
Tx id = new Tx(nextTransactionId.increment());
return id;
}
public void insertPersistentTx(TransactionManagerStrategy tms, Connection c, Tx tx) throws JMSException
{
try
{
if (tx != null && tx.checkPersisted() == false)
addTXRecord(c, tx.longValue());
}
catch (SQLException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not create tx: " + tx.longValue(), e);
}
}
public void add(MessageReference messageRef, Tx txId) throws javax.jms.JMSException
{
boolean trace = log.isTraceEnabled();
if (trace)
log.trace("About to add message " + messageRef + " transaction=" + txId);
TransactionManagerStrategy tms = new TransactionManagerStrategy();
tms.startTX();
Connection c = null;
boolean threadWasInterrupted = Thread.interrupted();
try
{
c = this.getConnection();
insertPersistentTx(tms, c, txId);
synchronized (messageRef)
{
SpyMessage message = messageRef.getMessage();
if (messageRef.stored == MessageReference.STORED)
{
if (trace)
log.trace("Updating message " + messageRef + " transaction=" + txId);
markMessage(c, messageRef.messageId, messageRef.getPersistentKey(), txId, "A");
}
else
{
if (trace)
log.trace("Inserting message " + messageRef + " transaction=" + txId);
add(c, messageRef.getPersistentKey(), message, txId, "A");
messageRef.setStored(MessageReference.STORED);
}
if (trace)
log.trace("Added message " + messageRef + " transaction=" + txId);
}
}
catch (IOException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not store message: " + messageRef, e);
}
catch (SQLException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not store message: " + messageRef, e);
}
finally
{
try
{
c.close();
}
catch (Throwable ignore)
{
}
tms.endTX();
if (threadWasInterrupted)
Thread.currentThread().interrupt();
}
}
protected void add(Connection c, String queue, SpyMessage message, Tx txId, String mark)
throws SQLException, IOException
{
PreparedStatement stmt = null;
try
{
stmt = c.prepareStatement(INSERT_MESSAGE);
stmt.setLong(1, message.header.messageId);
stmt.setString(2, queue);
setBlob(stmt, 3, message);
if (txId != null)
stmt.setLong(4, txId.longValue());
else
stmt.setNull(4, java.sql.Types.BIGINT);
stmt.setString(5, mark);
stmt.executeUpdate();
}
finally
{
try
{
stmt.close();
}
catch (Throwable ignore)
{
}
}
}
public void markMessage(Connection c, long messageid, String destination, Tx txId, String mark)
throws SQLException
{
PreparedStatement stmt = null;
try
{
stmt = c.prepareStatement(MARK_MESSAGE);
if (txId == null)
{
stmt.setNull(1, java.sql.Types.BIGINT);
}
else
{
stmt.setLong(1, txId.longValue());
}
stmt.setString(2, mark);
stmt.setLong(3, messageid);
stmt.setString(4, destination);
stmt.executeUpdate();
}
finally
{
try
{
stmt.close();
}
catch (Throwable ignore)
{
}
}
}
public void setBlob(PreparedStatement stmt, int column, SpyMessage message) throws IOException, SQLException
{
if (blobType == OBJECT_BLOB)
{
stmt.setObject(column, message);
}
else if (blobType == BYTES_BLOB)
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
SpyMessage.writeMessage(message, oos);
oos.flush();
byte[] messageAsBytes = baos.toByteArray();
stmt.setBytes(column, messageAsBytes);
}
else if (blobType == BINARYSTREAM_BLOB)
{
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
SpyMessage.writeMessage(message, oos);
oos.flush();
byte[] messageAsBytes = baos.toByteArray();
ByteArrayInputStream bais = new ByteArrayInputStream(messageAsBytes);
stmt.setBinaryStream(column, bais, messageAsBytes.length);
}
else if (blobType == BLOB_BLOB)
{
throw new RuntimeException("BLOB_TYPE: BLOB_BLOB is not yet implemented.");
}
}
public void update(MessageReference messageRef, Tx txId) throws javax.jms.JMSException
{
boolean trace = log.isTraceEnabled();
if (trace)
log.trace("Updating message " + messageRef + " transaction=" + txId);
TransactionManagerStrategy tms = new TransactionManagerStrategy();
tms.startTX();
Connection c = null;
PreparedStatement stmt = null;
boolean threadWasInterrupted = Thread.interrupted();
try
{
c = this.getConnection();
if (txId == null)
{
stmt = c.prepareStatement(UPDATE_MESSAGE);
setBlob(stmt, 1, messageRef.getMessage());
stmt.setLong(2, messageRef.messageId);
stmt.setString(3, messageRef.getPersistentKey());
int rc = stmt.executeUpdate();
if (rc != 1)
throw new SpyJMSException(
"Could not update the message in the database: update affected " + rc + " rows");
}
else
{
throw new SpyJMSException("NYI: Updating a message in a transaction is not currently used");
}
if (trace)
log.trace("Updated message " + messageRef + " transaction=" + txId);
}
catch (IOException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not update message: " + messageRef, e);
}
catch (SQLException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not update message: " + messageRef, e);
}
finally
{
try
{
stmt.close();
}
catch (Throwable ignore)
{
}
try
{
c.close();
}
catch (Throwable ignore)
{
}
tms.endTX();
if (threadWasInterrupted)
Thread.currentThread().interrupt();
}
}
public void remove(MessageReference messageRef, Tx txId) throws javax.jms.JMSException
{
boolean trace = log.isTraceEnabled();
if (trace)
log.trace("Removing message " + messageRef + " transaction=" + txId);
TransactionManagerStrategy tms = new TransactionManagerStrategy();
tms.startTX();
Connection c = null;
PreparedStatement stmt = null;
boolean threadWasInterrupted = Thread.interrupted();
try
{
c = this.getConnection();
insertPersistentTx(tms, c, txId);
synchronized (messageRef)
{
if (txId == null)
{
stmt = c.prepareStatement(DELETE_MESSAGE);
stmt.setLong(1, messageRef.messageId);
stmt.setString(2, messageRef.getPersistentKey());
int rc = stmt.executeUpdate();
if (rc != 1)
throw new SpyJMSException(
"Could not delete the message from the database: delete affected " + rc + " rows");
messageRef.setStored(MessageReference.NOT_STORED);
messageRef.removeDelayed();
}
else
{
stmt = c.prepareStatement(MARK_MESSAGE);
stmt.setLong(1, txId.longValue());
stmt.setString(2, "D");
stmt.setLong(3, messageRef.messageId);
stmt.setString(4, messageRef.getPersistentKey());
int rc = stmt.executeUpdate();
if (rc != 1)
throw new SpyJMSException(
"Could not mark the message as deleted in the database: update affected " + rc + " rows");
}
if (trace)
log.trace("Removed message " + messageRef + " transaction=" + txId);
}
}
catch (SQLException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not remove message: " + messageRef, e);
}
finally
{
try
{
stmt.close();
}
catch (Throwable ignore)
{
}
try
{
c.close();
}
catch (Throwable ignore)
{
}
tms.endTX();
if (threadWasInterrupted)
Thread.currentThread().interrupt();
}
}
public TxManager getTxManager()
{
return txManager;
}
public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException
{
}
public SpyMessage loadFromStorage(MessageReference messageRef) throws JMSException
{
if (log.isTraceEnabled())
log.trace("Loading message from storage " + messageRef);
TransactionManagerStrategy tms = new TransactionManagerStrategy();
tms.startTX();
Connection c = null;
PreparedStatement stmt = null;
ResultSet rs = null;
boolean threadWasInterrupted = Thread.interrupted();
try
{
c = this.getConnection();
stmt = c.prepareStatement(SELECT_MESSAGE);
stmt.setLong(1, messageRef.messageId);
stmt.setString(2, messageRef.getPersistentKey());
rs = stmt.executeQuery();
if (rs.next())
return extractMessage(rs);
return null;
}
catch (IOException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not load message : " + messageRef, e);
}
catch (SQLException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not load message : " + messageRef, e);
}
finally
{
try
{
rs.close();
}
catch (Throwable ignore)
{
}
try
{
stmt.close();
}
catch (Throwable ignore)
{
}
try
{
c.close();
}
catch (Throwable ignore)
{
}
tms.endTX();
if (threadWasInterrupted)
Thread.currentThread().interrupt();
}
}
public void removeFromStorage(MessageReference messageRef) throws JMSException
{
if (messageRef.isPersistent())
return;
boolean trace = log.isTraceEnabled();
if (trace)
log.trace("Removing message from storage " + messageRef);
TransactionManagerStrategy tms = new TransactionManagerStrategy();
tms.startTX();
Connection c = null;
PreparedStatement stmt = null;
boolean threadWasInterrupted = Thread.interrupted();
try
{
c = this.getConnection();
stmt = c.prepareStatement(DELETE_MESSAGE);
stmt.setLong(1, messageRef.messageId);
stmt.setString(2, messageRef.getPersistentKey());
stmt.executeUpdate();
messageRef.setStored(MessageReference.NOT_STORED);
if (trace)
log.trace("Removed message from storage " + messageRef);
}
catch (SQLException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not remove message: " + messageRef, e);
}
finally
{
try
{
stmt.close();
}
catch (Throwable ignore)
{
}
try
{
c.close();
}
catch (Throwable ignore)
{
}
tms.endTX();
if (threadWasInterrupted)
Thread.currentThread().interrupt();
}
}
public void saveToStorage(MessageReference messageRef, SpyMessage message) throws JMSException
{
if (messageRef.isPersistent())
return;
boolean trace = log.isTraceEnabled();
if (trace)
log.trace("Saving message to storage " + messageRef);
TransactionManagerStrategy tms = new TransactionManagerStrategy();
tms.startTX();
Connection c = null;
boolean threadWasInterrupted = Thread.interrupted();
try
{
c = this.getConnection();
add(c, messageRef.getPersistentKey(), message, null, "T");
messageRef.setStored(MessageReference.STORED);
if (trace)
log.trace("Saved message to storage " + messageRef);
}
catch (IOException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not store message: " + messageRef, e);
}
catch (SQLException e)
{
tms.setRollbackOnly();
throw new SpyJMSException("Could not store message: " + messageRef, e);
}
finally
{
try
{
c.close();
}
catch (Throwable ignore)
{
}
tms.endTX();
if (threadWasInterrupted)
Thread.currentThread().interrupt();
}
}
private Connection getConnection() throws SQLException
{
int attempts = this.connectionRetryAttempts;
int attemptCount = 0;
SQLException sqlException = null;
while (attempts-- > 0)
{
if (++attemptCount > 1)
{
log.debug("Retrying connection: attempt # " + attemptCount);
}
try
{
sqlException = null;
return datasource.getConnection();
}
catch (SQLException exception)
{
log.debug("Connection attempt # " + attemptCount + " failed with SQLException", exception);
sqlException = exception;
}
finally
{
if (sqlException == null && attemptCount > 1)
{
log.debug("Connection succeeded on attempt # " + attemptCount);
}
}
if (attempts > 0)
{
try
{
Thread.sleep(1500);
}
catch (InterruptedException interruptedException)
{
break;
}
}
}
if (sqlException != null)
{
throw sqlException;
}
throw new SQLException("connection attempt interrupted");
}
private ObjectName connectionManagerName;
private Properties sqlProperties = new Properties();
public void startService() throws Exception
{
UPDATE_MARKED_MESSAGES = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES", UPDATE_MARKED_MESSAGES);
UPDATE_MARKED_MESSAGES_WITH_TX =
sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_WITH_TX", UPDATE_MARKED_MESSAGES_WITH_TX);
DELETE_MARKED_MESSAGES_WITH_TX =
sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX", DELETE_MARKED_MESSAGES_WITH_TX);
DELETE_TX = sqlProperties.getProperty("DELETE_TX", DELETE_TX);
DELETE_MARKED_MESSAGES = sqlProperties.getProperty("DELETE_MARKED_MESSAGES", DELETE_MARKED_MESSAGES);
DELETE_TEMPORARY_MESSAGES = sqlProperties.getProperty("DELETE_TEMPORARY_MESSAGES", DELETE_TEMPORARY_MESSAGES);
INSERT_TX = sqlProperties.getProperty("INSERT_TX", INSERT_TX);
SELECT_MAX_TX = sqlProperties.getProperty("SELECT_MAX_TX", SELECT_MAX_TX);
SELECT_MESSAGES_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST", SELECT_MESSAGES_IN_DEST);
SELECT_MESSAGE = sqlProperties.getProperty("SELECT_MESSAGE", SELECT_MESSAGE);
INSERT_MESSAGE = sqlProperties.getProperty("INSERT_MESSAGE", INSERT_MESSAGE);
MARK_MESSAGE = sqlProperties.getProperty("MARK_MESSAGE", MARK_MESSAGE);
DELETE_MESSAGE = sqlProperties.getProperty("DELETE_MESSAGE", DELETE_MESSAGE);
UPDATE_MESSAGE = sqlProperties.getProperty("UPDATE_MESSAGE", UPDATE_MESSAGE);
CREATE_MESSAGE_TABLE = sqlProperties.getProperty("CREATE_MESSAGE_TABLE", CREATE_MESSAGE_TABLE);
CREATE_IDX_MESSAGE_TXOP_TXID = sqlProperties.getProperty("CREATE_IDX_MESSAGE_TXOP_TXID", CREATE_IDX_MESSAGE_TXOP_TXID);
CREATE_IDX_MESSAGE_DESTINATION = sqlProperties.getProperty("CREATE_IDX_MESSAGE_DESTINATION", CREATE_IDX_MESSAGE_DESTINATION);
CREATE_TX_TABLE = sqlProperties.getProperty("CREATE_TX_TABLE", CREATE_TX_TABLE);
createTables = sqlProperties.getProperty("CREATE_TABLES_ON_STARTUP", "true").equalsIgnoreCase("true");
String s = sqlProperties.getProperty("BLOB_TYPE", "OBJECT_BLOB");
if (s.equals("OBJECT_BLOB"))
{
blobType = OBJECT_BLOB;
}
else if (s.equals("BYTES_BLOB"))
{
blobType = BYTES_BLOB;
}
else if (s.equals("BINARYSTREAM_BLOB"))
{
blobType = BINARYSTREAM_BLOB;
}
else if (s.equals("BLOB_BLOB"))
{
blobType = BLOB_BLOB;
}
String dsName = (String) getServer().getAttribute(connectionManagerName, "BindName");
InitialContext ctx = new InitialContext();
datasource = (DataSource) ctx.lookup(dsName);
tm = (TransactionManager) ctx.lookup(TransactionManagerService.JNDI_NAME);
log.debug("Resolving uncommited TXS");
resolveAllUncommitedTXs();
}
public Object getInstance()
{
return this;
}
public ObjectName getMessageCache()
{
throw new UnsupportedOperationException("This is now set on the destination manager");
}
public void setMessageCache(ObjectName messageCache)
{
throw new UnsupportedOperationException("This is now set on the destination manager");
}
public ObjectName getConnectionManager()
{
return connectionManagerName;
}
public void setConnectionManager(ObjectName connectionManagerName)
{
this.connectionManagerName = connectionManagerName;
}
public MessageCache getMessageCacheInstance()
{
throw new UnsupportedOperationException("This is now set on the destination manager");
}
public String getSqlProperties()
{
try
{
ByteArrayOutputStream boa = new ByteArrayOutputStream();
sqlProperties.store(boa, "");
return new String(boa.toByteArray());
}
catch (IOException shouldnothappen)
{
return "";
}
}
public void setSqlProperties(String value)
{
try
{
ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes());
sqlProperties = new Properties();
sqlProperties.load(is);
}
catch (IOException shouldnothappen)
{
}
}
public void setConnectionRetryAttempts(int value)
{
this.connectionRetryAttempts = value;
}
public int getConnectionRetryAttempts()
{
return this.connectionRetryAttempts;
}
}