/*
 * JBossMQ, the OpenSource JMS implementation
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.mq.pm.jdbc3;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Properties;

import javax.jms.JMSException;
import javax.management.ObjectName;
import javax.naming.InitialContext;
import javax.sql.DataSource;
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;

import org.jboss.mq.DurableSubscriptionID;
import org.jboss.mq.SpyDestination;
import org.jboss.mq.SpyJMSException;
import org.jboss.mq.SpyMessage;
import org.jboss.mq.SpyTopic;
import org.jboss.mq.pm.CacheStore;
import org.jboss.mq.pm.NewPersistenceManager;
import org.jboss.mq.pm.Tx;
import org.jboss.mq.pm.TxManager;
import org.jboss.mq.server.JMSDestination;
import org.jboss.mq.server.JMSTopic;
import org.jboss.mq.server.MessageCache;
import org.jboss.mq.server.MessageReference;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.tm.TransactionManagerService;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;

/**
 * This class manages all persistence related services for JDBC based
 * persistence.
 *
 * @jmx:mbean extends="org.jboss.system.ServiceMBean, org.jboss.mq.pm.PersistenceManagerMBean, org.jboss.mq.pm.CacheStoreMBean"
 *
 * @author Jayesh Parayali (jayeshpk1@yahoo.com)
 * @author Hiram Chirino (cojonudo14@hotmail.com)
 * @author Adrian Brock (adrian@jboss.org)
 *
 *  @version $Revision: 1.6.4.3 $
 */
public class PersistenceManager
   extends ServiceMBeanSupport
   implements PersistenceManagerMBean, NewPersistenceManager, CacheStore, Runnable
{
   // Constants --------------------------------------------------------------------

   /** Message is an object */
   static final int OBJECT_BLOB = 0;

   /** Message is a byte array */
   static final int BYTES_BLOB = 1;

   /** Message is a binary stream */
   static final int BINARYSTREAM_BLOB = 2;

   /** Message is a blob */
   static final int BLOB_BLOB = 3;

   // Attributes -------------------------------------------------------------------

   /** The next transaction id */
   private SynchronizedLong nextTransactionId = new SynchronizedLong(0l);
   
   /** The jbossmq transaction manager */
   private TxManager txManager;

   /** The data source */
   private DataSource datasource;

   /** The jta transaction manager */
   private TransactionManager tm;
   
   /** The object name of the connection manager */
   private ObjectName connectionManagerName;
   
   /** The sql properties */
   private Properties sqlProperties = new Properties();

   String UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE TXOP=?";
   String UPDATE_MARKED_REFERENCES = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE TXOP=?";
   String UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
   String UPDATE_MARKED_REFERENCES_WITH_TX = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
   String DELETE_MARKED_MESSAGES_WITH_TX = "DELETE FROM JMS_MESSAGE_LOG WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTION_LOG) AND TXOP=?";
   String DELETE_MARKED_REFERENCES_WITH_TX = "DELETE FROM JMS_REFERENCE_LOG WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTION_LOG) AND TXOP=?";
   String DELETE_TX = "DELETE FROM JMS_TRANSACTION_LOG WHERE TXID = ?";
   String DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE TXID=? AND TXOP=?";
   String DELETE_MARKED_REFERENCES = "DELETE FROM JMS_REFERENCE_LOG WHERE TXID=? AND TXOP=?";
   String DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE TXOP='T'";
   String DELETE_TEMPORARY_REFERENCES = "DELETE FROM JMS_REFERENCE_LOG WHERE TXOP='T'";
   String INSERT_TX = "INSERT INTO JMS_TRANSACTION_LOG (TXID) values(?)";
   String SELECT_MAX_TX = "SELECT MAX(TXID) FROM JMS_TRANSACTION_LOG";
   String SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG WHERE DESTINATION=?";
   String SELECT_REFERENCES_IN_DEST = "SELECT R.MESSAGEID, M.MESSAGEBLOB, R.REDELIVERED, R.REDELIVERS FROM JMS_REFERENCE_LOG AS R, JMS_MESSAGE_LOG AS M" + 
                                      " WHERE R.MESSAGEID = M.MESSAGEID AND R.DESTINATION=?";
   String SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGE_LOG WHERE MESSAGEID=? AND DESTINATION=?";
   String INSERT_MESSAGE = "INSERT INTO JMS_MESSAGE_LOG (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP, LATECLONE) VALUES(?,?,?,?,?,?)";
   String INSERT_REFERENCE = "INSERT INTO JMS_REFERENCE_LOG (MESSAGEID, DESTINATION, TXID, TXOP, REDELIVERED, REDELIVERS) VALUES(?,?,?,?,?,?)";
   String MARK_MESSAGE = "UPDATE JMS_MESSAGE_LOG SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
   String MARK_REFERENCE = "UPDATE JMS_REFERENCE_LOG SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
   String DELETE_MESSAGE = "DELETE FROM JMS_MESSAGE_LOG WHERE MESSAGEID=? AND DESTINATION=?";
   String DELETE_REFERENCE = "DELETE FROM JMS_REFERENCE_LOG WHERE MESSAGEID=? AND DESTINATION=?";
   String UPDATE_MESSAGE = "UPDATE JMS_MESSAGE_LOG SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?";
   String UPDATE_REFERENCE = "UPDATE JMS_REFERENCE_LOG SET REDELIVERED=?, REDELIVERS=? WHERE MESSAGEID=? AND DESTINATION=?";
   String DELETE_ORPHANED_MESSAGES = "DELETE FROM JMS_MESSAGE_LOG WHERE LATECLONE = '1' AND MESSAGEID NOT IN (SELECT MESSAGEID FROM JMS_REFERENCE_LOG)";
   String DELETE_ALL_TXS = "DELETE FROM JMS_TRANSACTION_LOG";
   String CREATE_REFERENCE_TABLE =
      "CREATE TABLE JMS_REFERENCE_LOG ( MESSAGEID INTEGER NOT NULL, "
         + "DESTINATION VARCHAR(256) NOT NULL, TXID INTEGER, TXOP CHAR(1), "
         + "REDELIVERED CHAR(1), REDELIVERS INTEGER, "
         + "PRIMARY KEY (MESSAGEID, DESTINATION) )";
   String CREATE_MESSAGE_TABLE =
      "CREATE TABLE JMS_MESSAGE_LOG ( MESSAGEID INTEGER NOT NULL, "
         + "DESTINATION VARCHAR(256), TXID INTEGER, TXOP CHAR(1), LATECLONE CHAR(1), "
         + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )";
   String CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTION_LOG ( TXID INTEGER )";

   /** The blob type */
   int blobType = OBJECT_BLOB;

   /** Whether to create tables */
   boolean createTables = true;
   
   /** Number of retry attempts to connect to the db */
   private int connectionRetryAttempts = 5;
   
   /** The garbage collection period  in millis */
   private long gcPeriod = 60000;

   /** The background gc thread */
   private Thread gcThread;

   // Constructors -----------------------------------------------------------------

   /**
    * Create a new persistence manager
    *
    * @throws JMSException for any error
    */   
   public PersistenceManager() throws javax.jms.JMSException
   {
      txManager = new TxManager(this);
   }

   // Public -----------------------------------------------------------------------

   /**
    * Retrieve the connection manager object name
    * 
    * @jmx:managed-attribute
    */
   public ObjectName getConnectionManager()
   {
      return connectionManagerName;
   }

   /**
    * Set the connection manager object name
    * 
    * @jmx:managed-attribute
    */
   public void setConnectionManager(ObjectName connectionManagerName)
   {
      this.connectionManagerName = connectionManagerName;
   }

   /**
    * Set the garbage collection period
    * 
    * @jmx:managed-attribute
    */
   public int getGCPeriodSecs()
   {
      return (int) gcPeriod / 1000;
   }

   /**
    * Set the garbage collection period in seconds
    * 
    * @jmx:managed-attribute
    */
   public void setGCPeriodSecs(int gcPeriodSecs)
   {
      this.gcPeriod = gcPeriodSecs * 1000;
   }
   
   /**
    * Gets the ConnectionRetryAttempts.
    *
    * @jmx:managed-attribute
    * @return the number of retry events
    */
   public int getConnectionRetryAttempts()
   {
      return this.connectionRetryAttempts;
   }
   
   /**
    * Sets the ConnectionRetryAttempts.
    *
    * @jmx:managed-attribute
    * @param value the number of retry attempts
    */
   public void setConnectionRetryAttempts(int value)
   {
      this.connectionRetryAttempts = value;
   }

   /**
    * Gets the sqlProperties.
    *
    * @jmx:managed-attribute
    * @return Returns the Properties
    */
   public String getSqlProperties()
   {
      try
      {
         ByteArrayOutputStream boa = new ByteArrayOutputStream();
         sqlProperties.store(boa, "");
         return new String(boa.toByteArray());
      }
      catch (IOException shouldnothappen)
      {
         return "";
      }
   }

   /**
    * Sets the sqlProperties.
    *
    * @jmx:managed-attribute
    * @param sqlProperties The sqlProperties to set
    */
   public void setSqlProperties(String value)
   {
      try
      {
         ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes());
         sqlProperties = new Properties();
         sqlProperties.load(is);
      }
      catch (IOException shouldnothappen)
      {
      }
   }

   // PersistenceManager implementation --------------------------------------------

   public Tx createPersistentTx() throws JMSException
   {
      Tx id = new Tx(nextTransactionId.increment());
      TransactionManagerStrategy tms = new TransactionManagerStrategy();
      tms.startTX();
      Connection c = null;
      PreparedStatement stmt = null;
      boolean threadWasInterrupted = Thread.interrupted();
      try
      {
         c = this.getConnection();
         stmt = c.prepareStatement(INSERT_TX);
         stmt.setLong(1, id.longValue());
         stmt.executeUpdate();

      }
      catch (SQLException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not crate tx: " + id, e);
      }
      finally
      {
         try
         {
            stmt.close();
         }
         catch (Throwable ignore)
         {
         }
         try
         {
            c.close();
         }
         catch (Throwable ignore)
         {
         }
         tms.endTX();

         // Restore the interrupted state of the thread
         if( threadWasInterrupted )
            Thread.currentThread().interrupt();
      }

      return id;
   }

   public void commitPersistentTx(Tx txId) throws JMSException
   {
      TransactionManagerStrategy tms = new TransactionManagerStrategy();
      tms.startTX();
      Connection c = null;
      boolean threadWasInterrupted = Thread.interrupted();
      try
      {
         c = this.getConnection();
         removeMarkedMessages(c, txId, "D");
         removeMarkedReferences(c, txId, "D");
         removeTXRecord(c, txId.longValue());
      }
      catch (SQLException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not commit tx: " + txId, e);
      }
      finally
      {
         try
         {
            c.close();
         }
         catch (Throwable ignore)
         {
         }
         tms.endTX();

         // Restore the interrupted state of the thread
         if( threadWasInterrupted )
            Thread.currentThread().interrupt();
      }
   }

   public void rollbackPersistentTx(Tx txId) throws JMSException
   {

      TransactionManagerStrategy tms = new TransactionManagerStrategy();
      tms.startTX();
      Connection c = null;
      PreparedStatement stmt = null;
      boolean threadWasInterrupted = Thread.interrupted();
      try
      {
         c = this.getConnection();
         removeMarkedMessages(c, txId, "A");
         removeMarkedReferences(c, txId, "A");
         removeTXRecord(c, txId.longValue());

         // Restore all the messages that were logically removed.
         stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES_WITH_TX);
         stmt.setNull(1, Types.BIGINT);
         stmt.setString(2, "A");
         stmt.setString(3, "D");
         stmt.setLong(4, txId.longValue());
         stmt.executeUpdate();
         stmt.close();

         // Restore all the references that were logically removed.
         stmt = c.prepareStatement(UPDATE_MARKED_REFERENCES_WITH_TX);
         stmt.setNull(1, Types.BIGINT);
         stmt.setString(2, "A");
         stmt.setString(3, "D");
         stmt.setLong(4, txId.longValue());
         stmt.executeUpdate();
         stmt.close();
      }
      catch (SQLException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not rollback tx: " + txId, e);
      }
      finally
      {
         try
         {
            if (stmt != null)
               stmt.close();
            if (c != null)
               c.close();
         }
         catch (Throwable ignore)
         {
         }
         tms.endTX();

         // Restore the interrupted state of the thread
         if( threadWasInterrupted )
            Thread.currentThread().interrupt();
      }
   }

   public void add(MessageReference messageRef, Tx txId) throws JMSException
   {
      boolean trace = log.isTraceEnabled();
      if (trace)
         log.trace("About to add message " + messageRef + " transaction=" + txId);

      TransactionManagerStrategy tms = new TransactionManagerStrategy();
      tms.startTX();
      Connection c = null;
      boolean threadWasInterrupted = Thread.interrupted();
      try
      {
         c = this.getConnection();
         // Synchronize on the message to avoid a race with the softener
         synchronized(messageRef)
         {
            if (trace)
               log.trace("Inserting message " + messageRef + " transaction=" + txId);

            if (messageRef.isLateClone())
            {
               addReference(c, messageRef.getPersistentKey(), messageRef, txId, "A");
            }
            else
            {
               SpyMessage message = messageRef.getMessage();
               addMessage(c, messageRef.getPersistentKey(), message, txId, "A", "0");
            }
            messageRef.setStored(MessageReference.STORED);

            if (trace)
               log.trace("Added message " + messageRef + " transaction=" + txId);
         }
      }
      catch (IOException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not store message: " + messageRef, e);
      }
      catch (SQLException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not store message: " + messageRef, e);
      }
      finally
      {
         try
         {
            c.close();
         }
         catch (Throwable ignore)
         {
         }
         tms.endTX();

         // Restore the interrupted state of the thread
         if( threadWasInterrupted )
            Thread.currentThread().interrupt();
      }
   }

   public void update(MessageReference messageRef, Tx txId) throws JMSException
   {
      boolean trace = log.isTraceEnabled();
      if (trace)
         log.trace("Updating message " + messageRef + " transaction=" + txId);

      TransactionManagerStrategy tms = new TransactionManagerStrategy();
      tms.startTX();
      Connection c = null;
      PreparedStatement stmt = null;
      boolean threadWasInterrupted = Thread.interrupted();
      try
      {
         c = this.getConnection();
         if (txId == null)
         {
            if (messageRef.isLateClone())
            {
               stmt = c.prepareStatement(UPDATE_REFERENCE);
               if (messageRef.redelivered)
                  stmt.setString(1, "1");
               else
                  stmt.setString(1, "0");
               stmt.setLong(2, messageRef.redeliveryCount);
               stmt.setLong(3, messageRef.messageId);
               stmt.setString(4, messageRef.getPersistentKey());
            }
            else
            {
               stmt = c.prepareStatement(UPDATE_MESSAGE);
               setBlob(stmt, 1, messageRef.getMessage());
               stmt.setLong(2, messageRef.messageId);
               stmt.setString(3, messageRef.getPersistentKey());
            }
            int rc = stmt.executeUpdate();
            if(  rc != 1 ) 
               throw new SpyJMSException("Could not update the message in the database: update affected "+rc+" rows");
         }
         else
         {
            throw new SpyJMSException("NYI: Updating a message in a transaction is not currently used");
         }
         if (trace)
            log.trace("Updated message " + messageRef + " transaction=" + txId);

      }
      catch (IOException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not update message: " + messageRef, e);
      }
      catch (SQLException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not update message: " + messageRef, e);
      }
      finally
      {
         try
         {
            stmt.close();
         }
         catch (Throwable ignore)
         {
         }
         try
         {
            c.close();
         }
         catch (Throwable ignore)
         {
         }
         tms.endTX();

         // Restore the interrupted state of the thread
         if( threadWasInterrupted )
            Thread.currentThread().interrupt();
      }
   }

   public void remove(MessageReference messageRef, Tx txId) throws JMSException
   {
      boolean trace = log.isTraceEnabled();
      if (trace)
         log.trace("Removing message " + messageRef + " transaction=" + txId);

      TransactionManagerStrategy tms = new TransactionManagerStrategy();
      tms.startTX();
      Connection c = null;
      PreparedStatement stmt = null;
      boolean threadWasInterrupted = Thread.interrupted();
      try
      {
         c = this.getConnection();
         // Synchronize on the message to avoid a race with the softener
         synchronized(messageRef)
         {
            if (txId == null)
            {
               if (messageRef.isLateClone())
                  stmt = c.prepareStatement(DELETE_REFERENCE);
               else
                  stmt = c.prepareStatement(DELETE_MESSAGE);
               stmt.setLong(1, messageRef.messageId);
               stmt.setString(2, messageRef.getPersistentKey());
               int rc = stmt.executeUpdate();
               if(  rc != 1 ) 
                  throw new SpyJMSException("Could not delete the message from the database: delete affected "+rc+" rows");

               // Adrian Brock:
               // Remove the message from the cache, but don't 
               // return it to the pool just yet. The queue still holds
               // a reference to the message and will return it
               // to the pool once it gets enough time slice.
               // The alternative is to remove the validation
               // for double removal from the cache, 
               // which I don't want to do because it is useful
               // for spotting errors
               messageRef.setStored(MessageReference.NOT_STORED);
               messageRef.removeDelayed();
            }
            else
            {
               if (messageRef.isLateClone())
               {
                  stmt = c.prepareStatement(MARK_REFERENCE);
                  stmt.setLong(1, txId.longValue());
                  stmt.setString(2, "D");
                  stmt.setLong(3, messageRef.messageId);
                  stmt.setString(4, messageRef.getPersistentKey());
               }
               else
               {
                  stmt = c.prepareStatement(MARK_MESSAGE);
                  stmt.setLong(1, txId.longValue());
                  stmt.setString(2, "D");
                  stmt.setLong(3, messageRef.messageId);
                  stmt.setString(4, messageRef.getPersistentKey());
               }
               int rc = stmt.executeUpdate();
               if(  rc != 1 )
                  throw new SpyJMSException("Could not mark the message as deleted in the database: update affected "+rc+" rows");
            }
            if (trace)
               log.trace("Removed message " + messageRef + " transaction=" + txId);
         }
      }
      catch (SQLException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not remove message: " + messageRef, e);
      }
      finally
      {
         try
         {
            stmt.close();
         }
         catch (Throwable ignore)
         {
         }
         try
         {
            c.close();
         }
         catch (Throwable ignore)
         {
         }
         tms.endTX();

         // Restore the interrupted state of the thread
         if( threadWasInterrupted )
            Thread.currentThread().interrupt();
      }
   }

   public synchronized void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws javax.jms.JMSException
   {
      if (jmsDest == null)
         throw new IllegalArgumentException("Must supply non null JMSDestination to restoreQueue");
      if (dest == null)
         throw new IllegalArgumentException("Must supply non null SpyDestination to restoreQueue");

      TransactionManagerStrategy tms = new TransactionManagerStrategy();
      tms.startTX();
      Connection c = null;
      PreparedStatement stmt = null;
      ResultSet rs = null;
      boolean threadWasInterrupted = Thread.interrupted();
      try
      {
         c = this.getConnection();
         int counter=0;
         if (jmsDest.parameters.lateClone)
         {
            JMSTopic topic = (JMSTopic) jmsDest;
            // The durable subscription is not serialized
            DurableSubscriptionID id = ((SpyTopic) dest).getDurableSubscriptionID();

            stmt = c.prepareStatement(SELECT_REFERENCES_IN_DEST);
            stmt.setString(1, dest.toString());

            rs = stmt.executeQuery();
            while (rs.next())
            {
               SpyMessage message = extractMessage(rs, 2);
               boolean redelivered = false;
               if (rs.getString(3).equals("1"))
                  redelivered = true;
               message.header.jmsRedelivered = redelivered;
               message.header.jmsProperties.put(SpyMessage.PROPERTY_REDELIVERY_COUNT, new Integer(rs.getInt(4)));
               topic.restoreMessage(message, id);
               counter++;
            }
         }
         else
         {
            stmt = c.prepareStatement(SELECT_MESSAGES_IN_DEST);
            stmt.setString(1, dest.toString());

            rs = stmt.executeQuery();
            while (rs.next())
            {
               SpyMessage message = extractMessage(rs, 2);
               // The durable subscription is not serialized
               if (dest instanceof SpyTopic)
                  message.header.durableSubscriberID = ((SpyTopic)dest).getDurableSubscriptionID();
               jmsDest.restoreMessage(message);
               counter++;
            }
         }
         
         log.debug("Restored "+counter+" message(s) to: "+dest);
      }
      catch (IOException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
      }
      catch (SQLException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
      }
      finally
      {
         try
         {
            rs.close();
         }
         catch (Throwable ignore)
         {
         }
         try
         {
            stmt.close();
         }
         catch (Throwable ignore)
         {
         }
         try
         {
            c.close();
         }
         catch (Throwable ignore)
         {
         }
         tms.endTX();

         // Restore the interrupted state of the thread
         if( threadWasInterrupted )
            Thread.currentThread().interrupt();
      }

   }

   public TxManager getTxManager()
   {
      return txManager;
   }

   public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException
   {
      // Nothing to clean up, all the state is in the db.
   }

   /**
    * Unsupported operation
    */
   public MessageCache getMessageCacheInstance()
   {
      throw new UnsupportedOperationException("This is now set on the destination manager");
   }

   // NewPersistenceManager implementation -----------------------------------------

   public void addMessage(SpyMessage message) throws JMSException
   {

      TransactionManagerStrategy tms = new TransactionManagerStrategy();
      tms.startTX();
      Connection c = null;
      boolean threadWasInterrupted = Thread.interrupted();
      try
      {
         c = datasource.getConnection();
         addMessage(c, "*", message, null, null, "1");
      }
      catch (IOException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not add message:", e);
      }
      catch (SQLException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not add message:", e);
      }
      finally
      {
         try
         {
            if (c != null)
               c.close();
         }
         catch (Throwable ignore)
         {
         }
         tms.endTX();

         // Restore the interrupted state of the thread
         if( threadWasInterrupted )
            Thread.currentThread().interrupt();
      }
   }

   // PersistenceManagerMBean implementation ---------------------------------------

   public Object getInstance()
   {
      return this;
   }

   /**
    * Unsupported operation
    */
   public ObjectName getMessageCache()
   {
      throw new UnsupportedOperationException("This is now set on the destination manager");
   }

   /**
    * Unsupported operation
    */
   public void setMessageCache(ObjectName messageCache)
   {
      throw new UnsupportedOperationException("This is now set on the destination manager");
   }

   // CacheStore implementation ----------------------------------------------------

   public SpyMessage loadFromStorage(MessageReference messageRef) throws JMSException
   {
      if (log.isTraceEnabled())
         log.trace("Loading message from storage " + messageRef);

      TransactionManagerStrategy tms = new TransactionManagerStrategy();
      tms.startTX();
      Connection c = null;
      PreparedStatement stmt = null;
      ResultSet rs = null;
      boolean threadWasInterrupted = Thread.interrupted();
      try
      {
         c = this.getConnection();
         stmt = c.prepareStatement(SELECT_MESSAGE);
         stmt.setLong(1, messageRef.messageId);
         if (messageRef.isLateClone())
            stmt.setString(2, "*");
         else
            stmt.setString(2, messageRef.getPersistentKey());

         rs = stmt.executeQuery();
         if (rs.next())
            return extractMessage(rs, 2);

         return null;

      }
      catch (IOException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not load message : " + messageRef, e);
      }
      catch (SQLException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not load message : " + messageRef, e);
      }
      finally
      {
         try
         {
            rs.close();
         }
         catch (Throwable ignore)
         {
         }
         try
         {
            stmt.close();
         }
         catch (Throwable ignore)
         {
         }
         try
         {
            c.close();
         }
         catch (Throwable ignore)
         {
         }
         tms.endTX();

         // Restore the interrupted state of the thread
         if( threadWasInterrupted )
            Thread.currentThread().interrupt();
      }
   }

   public void removeFromStorage(MessageReference messageRef) throws JMSException
   {
      // We don't remove persistent messages sent to persistent queues
      if (messageRef.isPersistent())
         return;

      boolean trace = log.isTraceEnabled();
      if (trace)
         log.trace("Removing message from storage " + messageRef);
      
      TransactionManagerStrategy tms = new TransactionManagerStrategy();
      tms.startTX();
      Connection c = null;
      PreparedStatement stmt = null;
      boolean threadWasInterrupted = Thread.interrupted();
      try
      {
         c = this.getConnection();
         if (messageRef.isLateClone())
         {
            stmt = c.prepareStatement(DELETE_REFERENCE);
            stmt.setLong(1, messageRef.messageId);
            stmt.setString(2, messageRef.getPersistentKey());
            stmt.executeUpdate();
            messageRef.setStored(MessageReference.NOT_STORED);
         }
         else
         {
            stmt = c.prepareStatement(DELETE_MESSAGE);
            stmt.setLong(1, messageRef.messageId);
            stmt.setString(2, messageRef.getPersistentKey());
            stmt.executeUpdate();
            messageRef.setStored(MessageReference.NOT_STORED);
         }

         if (trace)
            log.trace("Removed message from storage " + messageRef);
      }
      catch (SQLException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not remove message: " + messageRef, e);
      }
      finally
      {
         try
         {
            stmt.close();
         }
         catch (Throwable ignore)
         {
         }
         try
         {
            c.close();
         }
         catch (Throwable ignore)
         {
         }
         tms.endTX();

         // Restore the interrupted state of the thread
         if( threadWasInterrupted )
            Thread.currentThread().interrupt();
      }
   }

   public void saveToStorage(MessageReference messageRef, SpyMessage message) throws JMSException
   {
      // Ignore save operations for persistent messages sent to persistent queues
      // The queues handle the persistence
      if (messageRef.isPersistent())
         return;

      boolean trace = log.isTraceEnabled();
      if (trace)
         log.trace("Saving message to storage " + messageRef);
      
      TransactionManagerStrategy tms = new TransactionManagerStrategy();
      tms.startTX();
      Connection c = null;
      boolean threadWasInterrupted = Thread.interrupted();
      try
      {
         c = this.getConnection();
         if (messageRef.isLateClone())
         {
            addReference(c, messageRef.getPersistentKey(), messageRef, null, "T");
            try
            {
               addMessage(c, "*", message, null, "T", "1");
            }
            catch (SQLException e)
            {
               log.trace("TODO: Check this is really a duplicate", e);
            }
         }
         else
         {
            addMessage(c, messageRef.getPersistentKey(), message, null, "T", "0");
         }
         messageRef.setStored(MessageReference.STORED);

         if (trace)
            log.trace("Saved message to storage " + messageRef);
      }
      catch (IOException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not store message: " + messageRef, e);
      }
      catch (SQLException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not store message: " + messageRef, e);
      }
      finally
      {
         try
         {
            c.close();
         }
         catch (Throwable ignore)
         {
         }
         tms.endTX();

         // Restore the interrupted state of the thread
         if( threadWasInterrupted )
            Thread.currentThread().interrupt();
      }
   }

   // Runnable implementation ------------------------------------------------------
   
   public void run()
   {
      Thread current = Thread.currentThread();
      while (gcThread == current)
      {
         try
         {
            Thread.sleep(gcPeriod);
            if (gcThread != current)
               return;
               
            Connection connection = datasource.getConnection();
            try
            {
               PreparedStatement stmt = connection.prepareStatement(DELETE_ORPHANED_MESSAGES);
               try
               {
                  stmt.executeUpdate();
               }
               finally
               {
                  try
                  {
                     stmt.close();
                  }
                  catch (SQLException ignored)
                  {
                     log.trace("Error closing statement", ignored);
                  }
               }
            }
            finally
            {
               try
               {
                  connection.close();
               }
               catch (SQLException ignored)
               {
                  log.trace("Error closing connection", ignored);
               }
            }
         }
         catch (InterruptedException ignored)
         {
         }
         catch (Throwable t)
         {
            log.warn("Unhandled throwable in gc thread:", t);
         }
      }
   }

   // ServerMBeanSupport overrides -------------------------------------------------

   protected void startService() throws Exception
   {
      UPDATE_MARKED_MESSAGES = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES", UPDATE_MARKED_MESSAGES);
      UPDATE_MARKED_REFERENCES = sqlProperties.getProperty("UPDATE_MARKED_REFERENCES", UPDATE_MARKED_REFERENCES);
      UPDATE_MARKED_MESSAGES_WITH_TX = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_WITH_TX", UPDATE_MARKED_MESSAGES_WITH_TX);
      UPDATE_MARKED_REFERENCES_WITH_TX = sqlProperties.getProperty("UPDATE_MARKED_REFERENCES_WITH_TX", UPDATE_MARKED_REFERENCES_WITH_TX);
      DELETE_MARKED_MESSAGES_WITH_TX = sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX", DELETE_MARKED_MESSAGES_WITH_TX);
      DELETE_MARKED_REFERENCES_WITH_TX = sqlProperties.getProperty("DELETE_MARKED_REFERENCES_WITH_TX", DELETE_MARKED_REFERENCES_WITH_TX);
      DELETE_TX = sqlProperties.getProperty("DELETE_TX", DELETE_TX);
      DELETE_MARKED_MESSAGES = sqlProperties.getProperty("DELETE_MARKED_MESSAGES", DELETE_MARKED_MESSAGES);
      DELETE_MARKED_REFERENCES = sqlProperties.getProperty("DELETE_MARKED_REFERENCES", DELETE_MARKED_REFERENCES);
      DELETE_TEMPORARY_MESSAGES = sqlProperties.getProperty("DELETE_TEMPORARY_MESSAGES", DELETE_TEMPORARY_MESSAGES);
      DELETE_TEMPORARY_REFERENCES = sqlProperties.getProperty("DELETE_TEMPORARY_REFERENCES", DELETE_TEMPORARY_REFERENCES);
      INSERT_TX = sqlProperties.getProperty("INSERT_TX", INSERT_TX);
      SELECT_MAX_TX = sqlProperties.getProperty("SELECT_MAX_TX", SELECT_MAX_TX);
      SELECT_MESSAGES_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST", SELECT_MESSAGES_IN_DEST);
      SELECT_REFERENCES_IN_DEST = sqlProperties.getProperty("SELECT_REFERENCES_IN_DEST", SELECT_REFERENCES_IN_DEST);
      SELECT_MESSAGE = sqlProperties.getProperty("SELECT_MESSAGE", SELECT_MESSAGE);
      INSERT_MESSAGE = sqlProperties.getProperty("INSERT_MESSAGE", INSERT_MESSAGE);
      INSERT_REFERENCE = sqlProperties.getProperty("INSERT_REFERENCE", INSERT_REFERENCE);
      MARK_MESSAGE = sqlProperties.getProperty("MARK_MESSAGE", MARK_MESSAGE);
      MARK_REFERENCE = sqlProperties.getProperty("MARK_REFERENCE", MARK_REFERENCE);
      DELETE_MESSAGE = sqlProperties.getProperty("DELETE_MESSAGE", DELETE_MESSAGE);
      DELETE_REFERENCE = sqlProperties.getProperty("DELETE_REFERENCE", DELETE_REFERENCE);
      UPDATE_MESSAGE = sqlProperties.getProperty("UPDATE_MESSAGE", UPDATE_MESSAGE);
      UPDATE_REFERENCE = sqlProperties.getProperty("UPDATE_REFERENCE", UPDATE_REFERENCE);
      DELETE_ORPHANED_MESSAGES = sqlProperties.getProperty("DELETE_ORPHANED_MESSAGES", DELETE_ORPHANED_MESSAGES);
      DELETE_ALL_TXS = sqlProperties.getProperty("DELETE_ALL_TXS", DELETE_ALL_TXS);
      CREATE_REFERENCE_TABLE = sqlProperties.getProperty("CREATE_REFERENCE_TABLE", CREATE_REFERENCE_TABLE);
      CREATE_MESSAGE_TABLE = sqlProperties.getProperty("CREATE_MESSAGE_TABLE", CREATE_MESSAGE_TABLE);
      CREATE_TX_TABLE = sqlProperties.getProperty("CREATE_TX_TABLE", CREATE_TX_TABLE);
      createTables = sqlProperties.getProperty("CREATE_TABLES_ON_STARTUP", "true").equalsIgnoreCase("true");
      String s = sqlProperties.getProperty("BLOB_TYPE", "OBJECT_BLOB");

      if (s.equals("OBJECT_BLOB"))
         blobType = OBJECT_BLOB;
      else if (s.equals("BYTES_BLOB"))
         blobType = BYTES_BLOB;
      else if (s.equals("BINARYSTREAM_BLOB"))
         blobType = BINARYSTREAM_BLOB;
      else if (s.equals("BLOB_BLOB"))
         blobType = BLOB_BLOB;

      //Find the ConnectionFactoryLoader MBean so we can find the datasource
      String dsName = (String) getServer().getAttribute(connectionManagerName, "BindName");

      //Get an InitialContext
      InitialContext ctx = new InitialContext();
      datasource = (DataSource) ctx.lookup(dsName);

      //Get the Transaction Manager so we can control the jdbc tx
      tm = (TransactionManager) ctx.lookup(TransactionManagerService.JNDI_NAME);

      log.debug("Resolving uncommited TXS");
      resolveAllUncommitedTXs();
      
      gcThread = new Thread(this, "JBossMQ persistent message garbage collection");
      gcThread.setDaemon(true);
      gcThread.start();
   }

   protected void stopService() throws Exception
   {
      if (gcThread != null)
         gcThread.interrupt();
      gcThread = null;
   }

   // Protected --------------------------------------------------------------------

   /**
    * Resolve uncommitted transactions
    * 
    * @throws JMSException for any error
    */
   protected synchronized void resolveAllUncommitedTXs() throws JMSException
   {
      TransactionManagerStrategy tms = new TransactionManagerStrategy();
      tms.startTX();
      Connection c = null;
      PreparedStatement stmt = null;
      ResultSet rs = null;
      boolean threadWasInterrupted = Thread.interrupted();
      try
      {
         if (createTables)
         {
            c = this.getConnection();

            try
            {
               stmt = c.prepareStatement(CREATE_REFERENCE_TABLE);
               stmt.executeUpdate();
            }
            catch (SQLException e)
            {
               log.debug("Could not create table with SQL: " + CREATE_REFERENCE_TABLE + ", got : " + e);
            }
            finally
            {
               try
               {
                  if (stmt != null)
                     stmt.close();
               }
               catch (Throwable ignored)
               {
                  log.trace("Ignored: " + ignored);
               }
               stmt = null;
            }

            try
            {
               stmt = c.prepareStatement(CREATE_MESSAGE_TABLE);
               stmt.executeUpdate();
            }
            catch (SQLException e)
            {
               log.debug("Could not create table with SQL: " + CREATE_MESSAGE_TABLE + ", got : " + e);
            }
            finally
            {
               try
               {
                  if (stmt != null)
                     stmt.close();
               }
               catch (Throwable ignored)
               {
                  log.trace("Ignored: " + ignored);
               }
               stmt = null;
            }

            try
            {
               stmt = c.prepareStatement(CREATE_TX_TABLE);
               stmt.executeUpdate();
            }
            catch (SQLException e)
            {
               log.debug("Could not create table with SQL: " + CREATE_TX_TABLE + ", got : " + e);
            }
            finally
            {
               try
               {
                  if (stmt != null)
                     stmt.close();
               }
               catch (Throwable ignored)
               {
                  log.trace("Ignored: " + ignored);
               }
               stmt = null;
            }
         }
      }
      catch (SQLException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not get a connection for jdbc2 table construction ", e);
      }
      finally
      {
         try
         {
            if (stmt != null)
               stmt.close();
         }
         catch (Throwable ignore)
         {
         }
         stmt = null;
         try
         {
            c.close();
         }
         catch (Throwable ignore)
         {
         }
         c = null;
         tms.endTX();

         // Restore the interrupted state of the thread
         if( threadWasInterrupted )
            Thread.currentThread().interrupt();
      }

      // We perform recovery in a different thread to the table creation
      // Postgres doesn't like create table failing in the same transaction
      // as other operations

      tms = new TransactionManagerStrategy();
      tms.startTX();
      threadWasInterrupted = Thread.interrupted();
      try
      {
         c = this.getConnection();

         // Delete the temporary messages.
         stmt = c.prepareStatement(DELETE_TEMPORARY_MESSAGES);
         stmt.executeUpdate();
         stmt.close();

         // Delete all the messages that were added but their tx's were not commited.
         stmt = c.prepareStatement(DELETE_MARKED_MESSAGES_WITH_TX);
         stmt.setString(1, "A");
         stmt.executeUpdate();
         stmt.close();

         // Restore all the messages that were removed but their tx's were not commited.
         stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES);
         stmt.setNull(1, Types.BIGINT);
         stmt.setString(2, "A");
         stmt.setString(3, "D");
         stmt.executeUpdate();
         stmt.close();

         // Delete the temporary references.
         stmt = c.prepareStatement(DELETE_TEMPORARY_REFERENCES);
         stmt.executeUpdate();
         stmt.close();

         // Delete all the references that were added but their tx's were not commited.
         stmt = c.prepareStatement(DELETE_MARKED_REFERENCES_WITH_TX);
         stmt.setString(1, "A");
         stmt.executeUpdate();
         stmt.close();

         // Restore all the references that were removed but their tx's were not commited.
         stmt = c.prepareStatement(UPDATE_MARKED_REFERENCES);
         stmt.setNull(1, Types.BIGINT);
         stmt.setString(2, "A");
         stmt.setString(3, "D");
         stmt.executeUpdate();
         stmt.close();
         
         // Remove orphaned messages
         stmt = c.prepareStatement(DELETE_ORPHANED_MESSAGES);
         stmt.executeUpdate();
         stmt.close();

         // Find out what the next TXID should be
         stmt = c.prepareStatement(SELECT_MAX_TX);
         rs = stmt.executeQuery();
         if (rs.next())
            nextTransactionId.set(rs.getLong(1) + 1);
         rs.close();
         stmt.close();
            
         // Delete all transactions.
         stmt = c.prepareStatement(DELETE_ALL_TXS);
         stmt.executeUpdate();
         stmt.close();
      }
      catch (SQLException e)
      {
         tms.setRollbackOnly();
         throw new SpyJMSException("Could not resolve uncommited transactions.  Message recovery may not be accurate", e);
      }
      finally
      {
         try
         {
            rs.close();
         }
         catch (Throwable ignore)
         {
         }
         try
         {
            stmt.close();
         }
         catch (Throwable ignore)
         {
         }
         try
         {
            c.close();
         }
         catch (Throwable ignore)
         {
         }
         tms.endTX();

         // Restore the interrupted state of the thread
         if( threadWasInterrupted )
            Thread.currentThread().interrupt();
      }
   }

   /**
    * Remove a transaction record
    * 
    * @param c the connection
    * @param txid the transaction
    * @throws SQLException for any error
    */
   protected void removeTXRecord(Connection c, long txid) throws SQLException
   {
      PreparedStatement stmt = null;
      try
      {
         stmt = c.prepareStatement(DELETE_TX);
         stmt.setLong(1, txid);
         stmt.executeUpdate();
      }
      finally
      {
         try
         {
            stmt.close();
         }
         catch (Throwable e)
         {
         }
      }
   }

   /**
    * Add a message
    * 
    * @param c the connection
    * @param queue the queue name
    * @param message the message
    * @param txid the transaction id
    * @param mark the mark to set for the message
    * @throws SQLException for an error in the db
    * @throws IOException for an error serializing the message
    */
   protected void addMessage(Connection c, String queue, SpyMessage message, Tx txId, String mark, String lateClone)
      throws SQLException, IOException
   {
      PreparedStatement stmt = null;
      try
      {
         stmt = c.prepareStatement(INSERT_MESSAGE);

         stmt.setLong(1, message.header.messageId);
         String dest = "*";
         if (queue != null)
            dest = queue;
         stmt.setString(2, dest);
         setBlob(stmt, 3, message);
         if (txId != null)
            stmt.setLong(4, txId.longValue());
         else
            stmt.setNull(4, Types.BIGINT);
         if (mark == null)
            stmt.setNull(5, Types.VARCHAR);
         else
            stmt.setString(5, mark);
         stmt.setString(6, lateClone);

         try
         {
            stmt.executeUpdate();
         }
         catch (SQLException e)
         {
            if (lateClone.equals("1"))
               log.trace("Assumed already added to message log: " + message.header.messageId);
            else
               throw e;
         }
      }
      finally
      {
         try
         {
            stmt.close();
         }
         catch (Throwable ignore)
         {
         }
      }
   }

   /**
    * Add a reference
    * 
    * @param c the connection
    * @param queue the queue name
    * @param message the reference
    * @param txid the transaction id
    * @param mark the mark to set for the message
    * @throws SQLException for an error in the db
    * @throws IOException for an error serializing the message
    */
   protected void addReference(Connection c, String queue, MessageReference message, Tx txId, String mark)
      throws SQLException, IOException
   {
      PreparedStatement stmt = null;
      try
      {
         stmt = c.prepareStatement(INSERT_REFERENCE);

         stmt.setLong(1, message.messageId);
         stmt.setString(2, queue);

         if (txId != null)
            stmt.setLong(3, txId.longValue());
         else
            stmt.setNull(3, Types.BIGINT);
         stmt.setString(4, mark);
         if (message.redelivered)
            stmt.setString(5, "1");
         else
            stmt.setString(5, "0");
         stmt.setLong(6, message.redeliveryCount);

         stmt.executeUpdate();
      }
      finally
      {
         try
         {
            stmt.close();
         }
         catch (Throwable ignore)
         {
         }
      }
   }

   /**
    * Remove messages for a given transaction and mark
    * 
    * @param c the connection
    * @param txid the transaction id
    * @param mark the mark
    * @throws SQLException for any error
    */
   protected void removeMarkedMessages(Connection c, Tx txid, String mark) throws SQLException
   {
      PreparedStatement stmt = null;
      try
      {
         stmt = c.prepareStatement(DELETE_MARKED_MESSAGES);
         stmt.setLong(1, txid.longValue());
         stmt.setString(2, mark);
         stmt.executeUpdate();
      }
      finally
      {
         try
         {
            stmt.close();
         }
         catch (Throwable e)
         {
         }
      }
   }

   /**
    * Remove references for a given transaction and mark
    * 
    * @param c the connection
    * @param txid the transaction id
    * @param mark the mark
    * @throws SQLException for any error
    */
   protected void removeMarkedReferences(Connection c, Tx txid, String mark) throws SQLException
   {
      PreparedStatement stmt = null;
      try
      {
         stmt = c.prepareStatement(DELETE_MARKED_REFERENCES);
         if (txid != null)
            stmt.setLong(1, txid.longValue());
         else
            stmt.setNull(1, Types.BIGINT);
         stmt.setString(2, mark);
         stmt.executeUpdate();
      }
      finally
      {
         try
         {
            stmt.close();
         }
         catch (Throwable e)
         {
         }
      }
   }

   /**
    * Store the message in a blob
    * 
    * @param stmt the prepared statement
    * @param column the column in the prepared statement
    * @param message the message
    * @param IOException for an error serializing the message
    * @param SQLException for an error accessing the db
    */
   protected void setBlob(PreparedStatement stmt, int column, SpyMessage message)
      throws IOException, SQLException
   {
      if (blobType == OBJECT_BLOB)
         stmt.setObject(column, message);
      else if (blobType == BYTES_BLOB)
      {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(baos);
         SpyMessage.writeMessage(message,oos);
         oos.flush();
         byte[] messageAsBytes = baos.toByteArray();
         stmt.setBytes(column, messageAsBytes);
      }
      else if (blobType == BINARYSTREAM_BLOB)
      {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(baos);
         SpyMessage.writeMessage(message,oos);
         oos.flush();
         byte[] messageAsBytes = baos.toByteArray();
         ByteArrayInputStream bais = new ByteArrayInputStream(messageAsBytes);
         stmt.setBinaryStream(column, bais, messageAsBytes.length);
      }
      else if (blobType == BLOB_BLOB)
      {
         throw new RuntimeException("BLOB_TYPE: BLOB_BLOB is not yet implemented.");
         /** TODO:
         ByteArrayOutputStream baos= new ByteArrayOutputStream();
         ObjectOutputStream oos= new ObjectOutputStream(baos);
         oos.writeObject(message);
         byte[] messageAsBytes= baos.toByteArray();
         ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
         stmt.setBsetBinaryStream(column, bais, messageAsBytes.length);
         */
      }
   }

   /**
    * Extract a message from a result
    * 
    * @param rs the result set
    * @param column the column number
    * @return the message
    * @throws SQLException for an error accessing the db
    * @throws IOException for an error extracting the message
    */
   protected SpyMessage extractMessage(ResultSet rs, int column) throws SQLException, IOException
   {
      long messageid = rs.getLong(1);
      SpyMessage message = null;
      if (blobType == OBJECT_BLOB)
      {
         message = (SpyMessage) rs.getObject(column);
      }
      else if (blobType == BYTES_BLOB)
      {
         byte[] st = rs.getBytes(column);
         ByteArrayInputStream baip = new ByteArrayInputStream(st);
         ObjectInputStream ois = new ObjectInputStream(baip);
         message = SpyMessage.readMessage(ois);
      }
      else if (blobType == BINARYSTREAM_BLOB)
      {
         ObjectInputStream ois = new ObjectInputStream(rs.getBinaryStream(column));
         message = SpyMessage.readMessage(ois);
      }
      else if (blobType == BLOB_BLOB)
      {
         ObjectInputStream ois = new ObjectInputStream(rs.getBlob(column).getBinaryStream());
         message = SpyMessage.readMessage(ois);
      }
      else throw new IllegalStateException();
      message.header.messageId = messageid;
      return message;
   }
   
   /**
    * Gets a connection from the datasource, retrying as needed.  This was
    * implemented because in some minimal configurations (i.e. little logging
    * and few services) the database wasn't ready when we tried to get a
    * connection.  We, therefore, implement a retry loop wich is controled
    * by the ConnectionRetryAttempts attribute.  Submitted by terry@amicas.com
    *
    * @throws SQLException if an error occurs.
    */
   protected Connection getConnection() throws SQLException
   {
      int attempts = this.connectionRetryAttempts;
      int attemptCount = 0;
      SQLException sqlException = null;
      while (attempts-- > 0)
      {
         if (++attemptCount > 1)
            log.debug("Retrying connection: attempt # " + attemptCount);

         try
         {
            sqlException = null;
            return datasource.getConnection();
         }
         catch (SQLException exception)
         {
            log.debug("Connection attempt # " + attemptCount + " failed with SQLException", exception);
            sqlException = exception;
         }
         finally
         {
            if (sqlException == null && attemptCount > 1)
               log.debug("Connection succeeded on attempt # " + attemptCount);
         }
           
         if (attempts > 0)
         {
             try
             {
                 Thread.sleep(1500);
             }
             catch(InterruptedException interruptedException)
             {
                 break;
             }
         }
      }
      if (sqlException != null)
         throw sqlException;
      throw new SQLException("connection attempt interrupted");
   }

   // Inner Classes ----------------------------------------------------------------

   /**
    * This inner class helps handle the tx management of the jdbc connections.
    */
   class TransactionManagerStrategy
   {

      Transaction threadTx;

      void startTX() throws JMSException
      {
         try
         {
            // Thread arriving must be clean (jboss doesn't set the thread
            // previously). However optimized calls come with associated
            // thread for example. We suspend the thread association here, and
            // resume in the finally block of the following try.
            threadTx = tm.suspend();

            tm.begin();
         }
         catch (Exception e)
         {
            try
            {
               if (threadTx != null)
                  tm.resume(threadTx);
            }
            catch (Exception ignore)
            {
            }
            throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
         }
      }

      void setRollbackOnly() throws JMSException
      {
         try
         {
            tm.setRollbackOnly();
         }
         catch (Exception e)
         {
            throw new SpyJMSException("Could not start a mark the transaction for rollback .", e);
         }
      }

      void endTX() throws JMSException
      {
         try
         {
            if (tm.getStatus() == Status.STATUS_MARKED_ROLLBACK)
            {
               tm.rollback();
            }
            else
            {
               tm.commit();
            }
         }
         catch (Exception e)
         {
            throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
         }
         finally
         {
            try
            {
               if (threadTx != null)
                  tm.resume(threadTx);
            }
            catch (Exception ignore)
            {
            }
         }
      }
   }
}