package org.jboss.ejb.plugins.cmp.jdbc2.schema;
import org.jboss.deployment.DeploymentException;
import org.jboss.ejb.plugins.cmp.jdbc.SQLUtil;
import org.jboss.ejb.plugins.cmp.jdbc.JDBCUtil;
import org.jboss.ejb.plugins.cmp.jdbc.JDBCEntityPersistenceStore;
import org.jboss.ejb.plugins.cmp.jdbc.JDBCTypeFactory;
import org.jboss.ejb.plugins.cmp.jdbc.bridge.JDBCAbstractCMRFieldBridge;
import org.jboss.ejb.plugins.cmp.jdbc.metadata.JDBCEntityMetaData;
import org.jboss.ejb.plugins.cmp.jdbc.metadata.JDBCFunctionMappingMetaData;
import org.jboss.ejb.plugins.cmp.jdbc.metadata.JDBCTypeMappingMetaData;
import org.jboss.ejb.plugins.cmp.jdbc2.bridge.JDBCEntityBridge2;
import org.jboss.ejb.plugins.cmp.jdbc2.bridge.JDBCCMPFieldBridge2;
import org.jboss.logging.Logger;
import org.jboss.mx.util.MBeanServerLocator;
import org.jboss.mx.util.MBeanProxyExt;
import org.jboss.system.ServiceControllerMBean;
import org.jboss.system.Registry;
import org.jboss.metadata.ConfigurationMetaData;
import org.jboss.metadata.MetaData;
import org.jboss.metadata.EntityMetaData;
import org.jboss.cache.invalidation.InvalidationManagerMBean;
import org.jboss.cache.invalidation.InvalidationGroup;
import org.w3c.dom.Element;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.sql.DataSource;
import javax.ejb.DuplicateKeyException;
import javax.ejb.EJBException;
import javax.ejb.NoSuchEntityException;
import javax.ejb.NoSuchObjectLocalException;
import javax.transaction.Transaction;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.ResultSet;
import java.util.Map;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
public class EntityTable
implements Table
{
private static final byte UNREFERENCED = 0;
private static final byte CLEAN = 1;
private static final byte DIRTY = 2;
private static final byte CREATED = 4;
private static final byte DELETED = 8;
private static final byte DIRTY_RELATIONS = 16;
private JDBCEntityBridge2 entity;
private String tableName;
private int fieldsTotal;
private int relationsTotal;
private DataSource dataSource;
private Schema schema;
private int tableId;
private boolean dontFlushCreated;
private String deleteSql;
private String updateSql;
private String insertSql;
private String selectSql;
private String duplicatePkSql;
private final CommitStrategy insertStrategy;
private final CommitStrategy deleteStrategy;
private final CommitStrategy updateStrategy;
private Logger log;
private Cache cache;
private ServiceControllerMBean serviceController;
private ObjectName cacheName;
private int[] references;
private int[] referencedBy;
private ForeignKeyConstraint[] fkConstraints;
private CacheInvalidator cacheInvalidator;
public EntityTable(JDBCEntityMetaData metadata, JDBCEntityBridge2 entity, Schema schema, int tableId)
throws DeploymentException
{
try
{
InitialContext ic = new InitialContext();
dataSource = (DataSource) ic.lookup(metadata.getDataSourceName());
}
catch(NamingException e)
{
throw new DeploymentException("Filed to lookup: " + metadata.getDataSourceName(), e);
}
this.entity = entity;
tableName = SQLUtil.fixTableName(metadata.getDefaultTableName(), dataSource);
log = Logger.getLogger(getClass().getName() + "." + tableName);
this.schema = schema;
this.tableId = tableId;
final EntityMetaData entityMetaData = ((EntityMetaData)entity.getContainer().getBeanMetaData());
final ConfigurationMetaData containerConf = entityMetaData.getContainerConfiguration();
dontFlushCreated = containerConf.isInsertAfterEjbPostCreate();
final Element cacheConf = containerConf.getContainerCacheConf();
final Element cachePolicy = cacheConf == null ? null : MetaData.getOptionalChild(cacheConf, "cache-policy-conf");
int minCapacity;
int maxCapacity;
if(cachePolicy != null)
{
String str = MetaData.getOptionalChildContent(cachePolicy, "min-capacity");
minCapacity = (str == null ? 1000 : Integer.parseInt(str));
str = MetaData.getOptionalChildContent(cachePolicy, "max-capacity");
maxCapacity = (str == null ? 10000 : Integer.parseInt(str));
}
else
{
minCapacity = 1000;
maxCapacity = 10000;
}
final Element otherConf = cacheConf == null ? null : MetaData.getOptionalChild(cacheConf, "cache-policy-conf-other");
int partitionsTotal;
final boolean invalidable;
final Element batchCommitStrategy;
if(otherConf != null)
{
String str = MetaData.getOptionalChildContent(otherConf, "partitions");
partitionsTotal = (str == null ? 10 : Integer.parseInt(str));
batchCommitStrategy = MetaData.getOptionalChild(otherConf, "batch-commit-strategy");
invalidable = MetaData.getOptionalChild(otherConf, "invalidable") == null ? false : true;
}
else
{
partitionsTotal = 10;
batchCommitStrategy = null;
invalidable = false;
}
cache = cachePolicy == null ? Cache.NONE : new PartitionedTableCache(minCapacity, maxCapacity, partitionsTotal);
if(invalidable)
{
String groupName = entityMetaData.getDistributedCacheInvalidationConfig().getInvalidationGroupName();
String imName = entityMetaData.getDistributedCacheInvalidationConfig().getInvalidationManagerName();
InvalidationManagerMBean im = (InvalidationManagerMBean) Registry.lookup(imName);
InvalidationGroup invalidationGroup = im.getInvalidationGroup(groupName);
cacheInvalidator = new CacheInvalidator(cache, entity.getContainer().getTransactionManager(), invalidationGroup);
}
if(batchCommitStrategy == null)
{
insertStrategy = NON_BATCH_UPDATE;
deleteStrategy = NON_BATCH_UPDATE;
updateStrategy = NON_BATCH_UPDATE;
}
else
{
log.debug("batch-commit-strategy");
insertStrategy = BATCH_UPDATE;
deleteStrategy = BATCH_UPDATE;
updateStrategy = BATCH_UPDATE;
}
if(cache != Cache.NONE)
{
final MBeanServer server = MBeanServerLocator.locateJBoss();
serviceController = (ServiceControllerMBean)
MBeanProxyExt.create(ServiceControllerMBean.class,
ServiceControllerMBean.OBJECT_NAME,
server);
try
{
cacheName =
new ObjectName("jboss.cmp:service=tablecache,ejbname=" + metadata.getName() + ",table=" + tableName);
server.registerMBean(cache, cacheName);
serviceController.create(cacheName);
}
catch(Exception e)
{
throw new DeploymentException("Failed to register table cache for " + tableName, e);
}
}
}
public void start() throws DeploymentException
{
final JDBCAbstractCMRFieldBridge[] cmrFields = entity.getCMRFields();
relationsTotal = (cmrFields != null ? cmrFields.length : 0);
JDBCCMPFieldBridge2[] pkFields = (JDBCCMPFieldBridge2[]) entity.getPrimaryKeyFields();
JDBCCMPFieldBridge2[] tableFields = (JDBCCMPFieldBridge2[]) entity.getTableFields();
deleteSql = "delete from " + tableName + " where ";
deleteSql += pkFields[0].getColumnName() + "=?";
for(int i = 1; i < pkFields.length; ++i)
{
deleteSql += " and " + pkFields[i].getColumnName() + "=?";
}
log.debug("delete sql: " + deleteSql);
insertSql = "insert into " + tableName + "(";
insertSql += tableFields[0].getColumnName();
for(int i = 1; i < tableFields.length; ++i)
{
insertSql += ", " + tableFields[i].getColumnName();
}
insertSql += ") values (?";
for(int i = 1; i < tableFields.length; ++i)
{
insertSql += ", ?";
}
insertSql += ")";
log.debug("insert sql: " + insertSql);
updateSql = "update " + tableName + " set ";
int setFields = 0;
for(int i = 0; i < tableFields.length; ++i)
{
JDBCCMPFieldBridge2 field = tableFields[i];
if(!field.isPrimaryKeyMember())
{
if(setFields++ > 0)
{
updateSql += ", ";
}
updateSql += field.getColumnName() + "=?";
}
}
updateSql += " where ";
updateSql += pkFields[0].getColumnName() + "=?";
for(int i = 1; i < pkFields.length; ++i)
{
updateSql += " and " + pkFields[i].getColumnName() + "=?";
}
if(entity.getVersionField() != null)
{
updateSql += " and " + entity.getVersionField().getColumnName() + "=?";
}
log.debug("update sql: " + updateSql);
String selectColumns = tableFields[0].getColumnName();
for(int i = 1; i < tableFields.length; ++i)
{
JDBCCMPFieldBridge2 field = tableFields[i];
selectColumns += ", " + field.getColumnName();
}
String whereColumns = pkFields[0].getColumnName() + "=?";
for(int i = 1; i < pkFields.length; ++i)
{
whereColumns += " and " + pkFields[i].getColumnName() + "=?";
}
if(entity.getMetaData().hasRowLocking())
{
JDBCEntityPersistenceStore manager = entity.getManager();
JDBCTypeFactory typeFactory = manager.getJDBCTypeFactory();
JDBCTypeMappingMetaData typeMapping = typeFactory.getTypeMapping();
JDBCFunctionMappingMetaData rowLockingTemplate = typeMapping.getRowLockingTemplate();
if(rowLockingTemplate == null)
{
throw new DeploymentException("Row locking template is not defined for mapping: " + typeMapping.getName());
}
selectSql = rowLockingTemplate.getFunctionSql(new Object[]{selectColumns, tableName, whereColumns, null},
new StringBuffer()).toString();
}
else
{
selectSql = "select ";
selectSql += selectColumns;
selectSql += " from " + tableName + " where ";
selectSql += whereColumns;
}
log.debug("select sql: " + selectSql);
if(dontFlushCreated)
{
duplicatePkSql = "select ";
duplicatePkSql += pkFields[0].getColumnName();
for(int i = 1; i < pkFields.length; ++i)
{
duplicatePkSql += ", " + pkFields[i].getColumnName();
}
duplicatePkSql += " from " + tableName + " where ";
duplicatePkSql += pkFields[0].getColumnName() + "=?";
for(int i = 1; i < pkFields.length; ++i)
{
duplicatePkSql += " and " + pkFields[i].getColumnName() + "=?";
}
log.debug("duplicate pk sql: " + duplicatePkSql);
}
if(cacheName != null)
{
try
{
serviceController.start(cacheName);
}
catch(Exception e)
{
throw new DeploymentException("Failed to start table cache.", e);
}
}
}
public void stop() throws Exception
{
if(cacheInvalidator != null)
{
cacheInvalidator.unregister();
}
if(cacheName != null)
{
serviceController.stop(cacheName);
serviceController.destroy(cacheName);
serviceController.remove(cacheName);
}
serviceController = null;
}
public StringBuffer appendColumnNames(JDBCCMPFieldBridge2[] fields, String alias, StringBuffer buf)
{
for(int i = 0; i < fields.length; ++i)
{
if(i > 0)
{
buf.append(", ");
}
if(alias != null)
{
buf.append(alias).append(".");
}
buf.append(fields[i].getColumnName());
}
return buf;
}
public void addField()
{
++fieldsTotal;
}
public int addVersionField()
{
return fieldsTotal++;
}
public ForeignKeyConstraint addFkConstraint(JDBCCMPFieldBridge2[] fkFields, EntityTable referenced)
{
addReference(referenced);
referenced.addReferencedBy(this);
if(fkConstraints == null)
{
fkConstraints = new ForeignKeyConstraint[1];
}
else
{
ForeignKeyConstraint[] tmp = fkConstraints;
fkConstraints = new ForeignKeyConstraint[tmp.length + 1];
System.arraycopy(tmp, 0, fkConstraints, 0, tmp.length);
}
final int fkindex = fkConstraints.length - 1;
final ForeignKeyConstraint fkc = new ForeignKeyConstraint(fkindex, fkFields);
fkConstraints[fkindex] = fkc;
return fkc;
}
public DataSource getDataSource()
{
return dataSource;
}
public void loadRow(ResultSet rs, Object pk)
{
View view = getView();
view.loadRow(rs, pk);
}
public Object loadRow(ResultSet rs)
{
View view = getView();
Object pk = view.loadPk(rs);
if(pk != null)
{
view.loadRow(rs, pk);
}
else if(log.isTraceEnabled())
{
log.trace("loaded pk is null.");
}
return pk;
}
public Row getRow(Object id)
{
return getView().getRow(id);
}
public boolean hasRow(Object id)
{
return getView().hasRow(id);
}
public Row loadRow(Object id) throws SQLException
{
View view = getView();
Row row = view.getRowByPk(id, false);
if(row != null)
{
if(log.isTraceEnabled())
{
log.trace("row is already loaded: pk=" + id);
}
return row;
}
JDBCCMPFieldBridge2[] pkFields = (JDBCCMPFieldBridge2[]) entity.getPrimaryKeyFields();
Connection con = null;
PreparedStatement ps = null;
ResultSet rs = null;
try
{
if(log.isDebugEnabled())
{
log.debug("executing sql: " + selectSql);
}
con = dataSource.getConnection();
ps = con.prepareStatement(selectSql);
int paramInd = 1;
for(int i = 0; i < pkFields.length; ++i)
{
JDBCCMPFieldBridge2 pkField = pkFields[i];
Object pkValue = pkField.getPrimaryKeyValue(id);
paramInd = pkField.setArgumentParameters(ps, paramInd, pkValue);
}
rs = ps.executeQuery();
if(!rs.next())
{
throw new NoSuchEntityException("Row not found: " + id);
}
return view.loadRow(rs, id);
}
catch(SQLException e)
{
log.error("Failed to load row: table=" + tableName + ", pk=" + id);
throw e;
}
finally
{
JDBCUtil.safeClose(rs);
JDBCUtil.safeClose(ps);
JDBCUtil.safeClose(con);
}
}
public int getTableId()
{
return tableId;
}
public String getTableName()
{
return tableName;
}
public Table.View createView(Transaction tx)
{
return new View(tx);
}
private void addReference(EntityTable table)
{
boolean wasRegistered = false;
if(references != null)
{
for(int i = 0; i < references.length; ++i)
{
if(references[i] == table.getTableId())
{
wasRegistered = true;
break;
}
}
if(!wasRegistered)
{
int[] tmp = references;
references = new int[references.length + 1];
System.arraycopy(tmp, 0, references, 0, tmp.length);
references[tmp.length] = table.getTableId();
}
}
else
{
references = new int[1];
references[0] = table.getTableId();
}
if(!wasRegistered)
{
if(log.isTraceEnabled())
{
log.trace("references " + table.getTableName());
}
}
}
private void addReferencedBy(EntityTable table)
{
boolean wasRegistered = false;
if(referencedBy != null)
{
for(int i = 0; i < referencedBy.length; ++i)
{
if(referencedBy[i] == table.getTableId())
{
wasRegistered = true;
break;
}
}
if(!wasRegistered)
{
int[] tmp = referencedBy;
referencedBy = new int[referencedBy.length + 1];
System.arraycopy(tmp, 0, referencedBy, 0, tmp.length);
referencedBy[tmp.length] = table.getTableId();
}
}
else
{
referencedBy = new int[1];
referencedBy[0] = table.getTableId();
}
if(!wasRegistered)
{
if(log.isTraceEnabled())
{
log.trace("referenced by " + table.getTableName());
}
}
}
private void delete(View view) throws SQLException
{
JDBCCMPFieldBridge2[] pkFields = (JDBCCMPFieldBridge2[]) entity.getPrimaryKeyFields();
Connection con = null;
PreparedStatement ps = null;
try
{
if(log.isDebugEnabled())
{
log.debug("executing : " + deleteSql);
}
con = dataSource.getConnection();
ps = con.prepareStatement(deleteSql);
int batchCount = 0;
while(view.deleted != null)
{
Row row = view.deleted;
int paramInd = 1;
for(int pkInd = 0; pkInd < pkFields.length; ++pkInd)
{
JDBCCMPFieldBridge2 pkField = pkFields[pkInd];
Object fieldValue = row.fields[pkField.getRowIndex()];
paramInd = pkField.setArgumentParameters(ps, paramInd, fieldValue);
}
deleteStrategy.executeUpdate(ps);
++batchCount;
row.flushStatus();
}
deleteStrategy.executeBatch(ps);
if(view.deleted != null)
{
throw new IllegalStateException("There are still rows to delete!");
}
if(log.isTraceEnabled())
{
log.trace("deleted rows: " + batchCount);
}
}
catch(SQLException e)
{
log.error("Failed to delete view: " + e.getMessage(), e);
throw e;
}
finally
{
JDBCUtil.safeClose(ps);
JDBCUtil.safeClose(con);
}
}
private void update(View view) throws SQLException
{
JDBCCMPFieldBridge2[] tableFields = (JDBCCMPFieldBridge2[]) entity.getTableFields();
JDBCCMPFieldBridge2[] pkFields = (JDBCCMPFieldBridge2[]) entity.getPrimaryKeyFields();
Connection con = null;
PreparedStatement ps = null;
try
{
if(log.isDebugEnabled())
{
log.debug("executing : " + updateSql);
}
con = dataSource.getConnection();
ps = con.prepareStatement(updateSql);
int batchCount = 0;
while(view.dirty != null)
{
Row row = view.dirty;
int paramInd = 1;
for(int fInd = 0; fInd < tableFields.length; ++fInd)
{
JDBCCMPFieldBridge2 field = tableFields[fInd];
if(!field.isPrimaryKeyMember())
{
Object fieldValue = row.fields[field.getRowIndex()];
paramInd = field.setArgumentParameters(ps, paramInd, fieldValue);
}
}
for(int fInd = 0; fInd < pkFields.length; ++fInd)
{
JDBCCMPFieldBridge2 pkField = pkFields[fInd];
Object fieldValue = row.fields[pkField.getRowIndex()];
paramInd = pkField.setArgumentParameters(ps, paramInd, fieldValue);
}
JDBCCMPFieldBridge2 versionField = entity.getVersionField();
if(versionField != null)
{
int versionIndex = versionField.getVersionIndex();
Object curVersion = row.fields[versionIndex];
paramInd = versionField.setArgumentParameters(ps, paramInd, curVersion);
Object newVersion = row.fields[versionField.getRowIndex()];
row.fields[versionIndex] = newVersion;
}
updateStrategy.executeUpdate(ps);
++batchCount;
row.flushStatus();
}
updateStrategy.executeBatch(ps);
if(log.isTraceEnabled())
{
log.trace("updated rows: " + batchCount);
}
}
catch(SQLException e)
{
log.error("Failed to update: table=" + tableName, e);
throw e;
}
finally
{
JDBCUtil.safeClose(ps);
JDBCUtil.safeClose(con);
}
}
private void insert(View view) throws SQLException
{
JDBCCMPFieldBridge2[] tableFields = (JDBCCMPFieldBridge2[]) entity.getTableFields();
Connection con = null;
PreparedStatement ps = null;
try
{
if(log.isDebugEnabled())
{
log.debug("executing : " + insertSql);
}
con = dataSource.getConnection();
ps = con.prepareStatement(insertSql);
int batchCount = 0;
while(view.created != null)
{
Row row = view.created;
int paramInd = 1;
for(int fInd = 0; fInd < tableFields.length; ++fInd)
{
JDBCCMPFieldBridge2 field = tableFields[fInd];
Object fieldValue = row.fields[field.getRowIndex()];
paramInd = field.setArgumentParameters(ps, paramInd, fieldValue);
}
insertStrategy.executeUpdate(ps);
++batchCount;
row.flushStatus();
}
insertStrategy.executeBatch(ps);
if(log.isTraceEnabled())
{
log.trace("inserted rows: " + batchCount);
}
}
catch(SQLException e)
{
log.error("Failed to insert new rows: " + e.getMessage(), e);
throw e;
}
finally
{
JDBCUtil.safeClose(ps);
JDBCUtil.safeClose(con);
}
}
private EntityTable.View getView()
{
return (EntityTable.View) schema.getView(this);
}
public class View implements Table.View
{
private final Transaction tx;
private Map rowByPk = new HashMap();
private Row created;
private Row deleted;
private Row dirty;
private Row dirtyRelations;
private Row clean;
private Row cacheUpdates;
private List rowsWithNullFks;
private boolean inFlush;
public View(Transaction tx)
{
this.tx = tx;
}
public Row getRow(Object pk)
{
Row row;
if(pk == null)
{
row = new Row(this);
}
else
{
row = getRowByPk(pk, false);
if(row == null)
{
row = createCleanRow(pk);
}
}
return row;
}
public Row getRowByPk(Object pk, boolean required)
{
Row row = (Row) rowByPk.get(pk);
if(row == null)
{
Object[] fields;
Object[] relations = null;
try
{
cache.lock(pk);
fields = cache.getFields(pk);
if(fields != null && relationsTotal > 0)
{
relations = cache.getRelations(pk);
if(relations == null)
{
relations = new Object[relationsTotal];
}
}
}
finally
{
cache.unlock(pk);
}
if(fields != null)
{
row = createCleanRow(pk, fields, relations);
}
}
if(row == null && required)
{
throw new IllegalStateException("row not found: pk=" + pk);
}
return row;
}
public void addClean(Row row)
{
if(clean != null)
{
row.next = clean;
clean.prev = row;
}
clean = row;
row.state = CLEAN;
rowByPk.put(row.pk, row);
}
public void addCreated(Row row) throws DuplicateKeyException
{
if(created != null)
{
row.next = created;
created.prev = row;
}
created = row;
row.state = CREATED;
rowByPk.put(row.pk, row);
JDBCCMPFieldBridge2 versionField = entity.getVersionField();
if(versionField != null)
{
row.fields[versionField.getVersionIndex()] = row.fields[versionField.getRowIndex()];
}
}
public Row loadRow(ResultSet rs, Object pk)
{
Row row = getRowByPk(pk, false);
if(row != null)
{
if(log.isTraceEnabled())
{
log.trace("row is already loaded: pk=" + pk);
}
return row;
}
else if(log.isTraceEnabled())
{
log.trace("reading result set: pk=" + pk);
}
row = createCleanRow(pk);
JDBCCMPFieldBridge2[] tableFields = (JDBCCMPFieldBridge2[]) entity.getTableFields();
for(int i = 0; i < tableFields.length; ++i)
{
JDBCCMPFieldBridge2 field = tableFields[i];
Object columnValue = field.loadArgumentResults(rs, field.getRowIndex() + 1);
row.fields[field.getRowIndex()] = columnValue;
if(field.getVersionIndex() != -1)
{
row.fields[field.getVersionIndex()] = columnValue;
}
}
Object[] relations = (relationsTotal > 0 ? new Object[relationsTotal] : null);
try
{
cache.lock(row.pk);
cache.put(tx, row.pk, row.fields, relations);
}
finally
{
cache.unlock(row.pk);
}
return row;
}
public Object loadPk(ResultSet rs)
{
Object pk = null;
JDBCCMPFieldBridge2[] pkFields = (JDBCCMPFieldBridge2[]) entity.getPrimaryKeyFields();
for(int i = 0; i < pkFields.length; ++i)
{
JDBCCMPFieldBridge2 field = pkFields[i];
Object columnValue = field.loadArgumentResults(rs, field.getRowIndex() + 1);
pk = field.setPrimaryKeyValue(pk, columnValue);
}
return pk;
}
public boolean hasRow(Object id)
{
boolean has = rowByPk.containsKey(id);
if(!has)
{
try
{
cache.lock(id);
has = cache.contains(tx, id);
}
finally
{
cache.unlock(id);
}
}
return has;
}
public void addRowWithNullFk(Row row)
{
if(rowsWithNullFks == null)
{
rowsWithNullFks = new ArrayList();
}
rowsWithNullFks.add(row);
}
private Row createCleanRow(Object pk)
{
Row row = new Row(this);
row.pk = pk;
addClean(row);
return row;
}
private Row createCleanRow(Object pk, Object[] fields, Object[] relations)
{
Row row = new Row(this, fields, relations);
row.pk = pk;
addClean(row);
return row;
}
public void flushDeleted(Schema.Views views) throws SQLException
{
if(rowsWithNullFks != null)
{
nullifyForeignKeys();
rowsWithNullFks = null;
}
if(deleted == null)
{
if(log.isTraceEnabled())
{
log.trace("no rows to delete");
}
return;
}
if(referencedBy != null)
{
if(inFlush)
{
if(log.isTraceEnabled())
{
log.trace("inFlush, ignoring flushDeleted");
}
return;
}
inFlush = true;
try
{
for(int i = 0; i < referencedBy.length; ++i)
{
final Table.View view = views.entityViews[referencedBy[i]];
if(view != null)
{
view.flushDeleted(views);
}
}
}
finally
{
inFlush = false;
}
}
delete(this);
}
public void flushCreated(Schema.Views views) throws SQLException
{
if(created == null || dontFlushCreated)
{
if(log.isTraceEnabled())
{
log.trace("no rows to insert");
}
return;
}
if(references != null)
{
if(inFlush)
{
if(log.isTraceEnabled())
{
log.trace("inFlush, ignorning flushCreated");
}
return;
}
else if(log.isTraceEnabled())
{
log.trace("flushing created references");
}
inFlush = true;
try
{
for(int i = 0; i < references.length; ++i)
{
final Table.View view = views.entityViews[references[i]];
if(view != null)
{
view.flushCreated(views);
}
}
}
finally
{
inFlush = false;
}
}
insert(this);
}
public void flushUpdated() throws SQLException
{
if(dirtyRelations != null)
{
while(dirtyRelations != null)
{
Row row = dirtyRelations;
row.flushStatus();
}
}
if(dirty == null)
{
if(log.isTraceEnabled())
{
log.trace("no rows to update");
}
return;
}
update(this);
}
public void beforeCompletion()
{
}
public void committed()
{
if(cacheUpdates != null)
{
Row cursor = cacheUpdates;
while(cursor != null)
{
cache.lock(cursor.pk);
try
{
switch(cursor.state)
{
case CLEAN:
cache.put(tx, cursor.pk, cursor.fields, cursor.relations);
break;
case DELETED:
try
{
cache.remove(tx, cursor.pk);
}
catch(Cache.RemoveException e)
{
log.warn(e.getMessage());
}
break;
default:
throw new IllegalStateException("Unexpected row state: table=" +
entity.getQualifiedTableName() +
", pk=" + cursor.pk + ", state=" + cursor.state);
}
}
finally
{
cache.unlock(cursor.pk);
}
cursor = cursor.nextCacheUpdate;
}
}
}
public void rolledback()
{
}
private void nullifyForeignKeys()
throws SQLException
{
if(log.isTraceEnabled())
{
log.trace("nullifying foreign keys");
}
Connection con = null;
PreparedStatement[] ps = new PreparedStatement[fkConstraints.length];
try
{
final JDBCCMPFieldBridge2[] pkFields = (JDBCCMPFieldBridge2[]) entity.getPrimaryKeyFields();
con = dataSource.getConnection();
for(int i = 0; i < rowsWithNullFks.size(); ++i)
{
final Row row = (Row) rowsWithNullFks.get(i);
if(row.state != DELETED)
{
final ForeignKeyConstraint[] cons = row.fkUpdates;
for(int c = 0; c < fkConstraints.length; ++c)
{
if(cons[c] != null)
{
PreparedStatement s = ps[c];
if(s == null)
{
if(log.isDebugEnabled())
{
log.debug("nullifying fk: " + cons[c].nullFkSql);
}
s = con.prepareStatement(cons[c].nullFkSql);
ps[c] = s;
}
int paramInd = 1;
for(int fInd = 0; fInd < pkFields.length; ++fInd)
{
JDBCCMPFieldBridge2 pkField = pkFields[fInd];
Object fieldValue = row.fields[pkField.getRowIndex()];
paramInd = pkField.setArgumentParameters(s, paramInd, fieldValue);
}
final int affected = s.executeUpdate();
if(affected != 1)
{
throw new EJBException("Affected " + affected + " rows while expected just one");
}
}
}
}
}
}
finally
{
for(int i = 0; i < ps.length; ++i)
{
JDBCUtil.safeClose(ps[i]);
}
JDBCUtil.safeClose(con);
}
}
}
public class Row
{
private EntityTable.View view;
private Object pk;
private final Object[] fields;
private final Object[] relations;
private byte state;
private Row prev;
private Row next;
private boolean cacheUpdateScheduled;
private Row nextCacheUpdate;
private ForeignKeyConstraint[] fkUpdates;
public Row(EntityTable.View view)
{
this.view = view;
fields = new Object[fieldsTotal];
relations = (relationsTotal == 0 ? null : new Object[relationsTotal]);
state = UNREFERENCED;
}
public Row(EntityTable.View view, Object[] fields, Object[] relations)
{
this.view = view;
this.fields = fields;
this.relations = relations;
state = UNREFERENCED;
}
public Object getPk()
{
return pk;
}
public void loadCachedRelations(int index, Cache.CacheLoader loader)
{
if(relations != null)
{
final Object cached = relations[index];
relations[index] = loader.loadFromCache(cached);
}
}
public void cacheRelations(int index, Cache.CacheLoader loader)
{
relations[index] = loader.getCachedValue();
scheduleCacheUpdate();
}
public void insert(Object pk) throws DuplicateKeyException
{
this.pk = pk;
view.addCreated(this);
}
public Object getFieldValue(int i)
{
if(state == DELETED)
{
throw new NoSuchObjectLocalException("The instance was removed: " + pk);
}
return fields[i];
}
public void setFieldValue(int i, Object value)
{
fields[i] = value;
}
public boolean isDirty()
{
return state != CLEAN && state != DIRTY_RELATIONS;
}
public void setDirty()
{
if(state == CLEAN || state == DIRTY_RELATIONS)
{
updateState(DIRTY);
}
}
public void setDirtyRelations()
{
if(state == CLEAN)
{
updateState(DIRTY_RELATIONS);
}
}
public void delete()
{
if(state == CLEAN || state == DIRTY || state == DIRTY_RELATIONS)
{
updateState(DELETED);
}
else if(state == CREATED)
{
dereference();
state = DELETED;
view.rowByPk.remove(pk);
}
else if(state == DELETED)
{
throw new IllegalStateException("The row is already deleted: pk=" + pk);
}
}
public void nullForeignKey(ForeignKeyConstraint constraint)
{
if(fkUpdates == null)
{
fkUpdates = new ForeignKeyConstraint[fkConstraints.length];
view.addRowWithNullFk(this);
}
fkUpdates[constraint.index] = constraint;
}
public void nonNullForeignKey(ForeignKeyConstraint constraint)
{
if(fkUpdates != null)
{
fkUpdates[constraint.index] = null;
}
}
private void flushStatus()
{
if(state == CREATED || state == DIRTY)
{
updateState(CLEAN);
}
else if(state == DELETED)
{
dereference();
}
else if(state == DIRTY_RELATIONS)
{
updateState(CLEAN);
}
scheduleCacheUpdate();
}
private void scheduleCacheUpdate()
{
if(!cacheUpdateScheduled)
{
if(view.cacheUpdates == null)
{
view.cacheUpdates = this;
}
else
{
nextCacheUpdate = view.cacheUpdates;
view.cacheUpdates = this;
}
cacheUpdateScheduled = true;
}
}
private void updateState(byte state)
{
dereference();
if(state == CLEAN)
{
if(view.clean != null)
{
next = view.clean;
view.clean.prev = this;
}
view.clean = this;
}
else if(state == DIRTY)
{
if(view.dirty != null)
{
next = view.dirty;
view.dirty.prev = this;
}
view.dirty = this;
}
else if(state == CREATED)
{
if(view.created != null)
{
next = view.created;
view.created.prev = this;
}
view.created = this;
}
else if(state == DELETED)
{
if(view.deleted != null)
{
next = view.deleted;
view.deleted.prev = this;
}
view.deleted = this;
}
else if(state == DIRTY_RELATIONS)
{
if(view.dirtyRelations != null)
{
next = view.dirtyRelations;
view.dirtyRelations.prev = this;
}
view.dirtyRelations = this;
}
else
{
throw new IllegalStateException("Can't update to state: " + state);
}
this.state = state;
}
private void dereference()
{
if(state == CLEAN && view.clean == this)
{
view.clean = next;
}
else if(state == DIRTY && view.dirty == this)
{
view.dirty = next;
}
else if(state == CREATED && view.created == this)
{
view.created = next;
}
else if(state == DELETED && view.deleted == this)
{
view.deleted = next;
}
else if(state == DIRTY_RELATIONS && view.dirtyRelations == this)
{
view.dirtyRelations = next;
}
if(next != null)
{
next.prev = prev;
}
if(prev != null)
{
prev.next = next;
}
prev = null;
next = null;
}
public void flush() throws SQLException, DuplicateKeyException
{
if(state != CREATED)
{
if(log.isTraceEnabled())
{
log.trace("The row is already inserted: pk=" + pk);
}
return;
}
Connection con = null;
PreparedStatement duplicatePkPs = null;
PreparedStatement insertPs = null;
ResultSet rs = null;
try
{
int paramInd;
con = dataSource.getConnection();
if(log.isDebugEnabled())
{
log.debug("executing : " + insertSql);
}
insertPs = con.prepareStatement(insertSql);
paramInd = 1;
JDBCCMPFieldBridge2[] tableFields = (JDBCCMPFieldBridge2[]) entity.getTableFields();
for(int fInd = 0; fInd < tableFields.length; ++fInd)
{
JDBCCMPFieldBridge2 field = tableFields[fInd];
Object fieldValue = fields[field.getRowIndex()];
paramInd = field.setArgumentParameters(insertPs, paramInd, fieldValue);
}
insertPs.executeUpdate();
flushStatus();
}
catch(SQLException e)
{
log.error("Failed to insert new rows: " + e.getMessage(), e);
throw e;
}
finally
{
JDBCUtil.safeClose(rs);
JDBCUtil.safeClose(duplicatePkPs);
JDBCUtil.safeClose(insertPs);
JDBCUtil.safeClose(con);
}
}
}
public static interface CommitStrategy
{
void executeUpdate(PreparedStatement ps) throws SQLException;
void executeBatch(PreparedStatement ps) throws SQLException;
}
private static final CommitStrategy BATCH_UPDATE = new CommitStrategy()
{
public void executeUpdate(PreparedStatement ps) throws SQLException
{
ps.addBatch();
}
public void executeBatch(PreparedStatement ps) throws SQLException
{
int[] updates = ps.executeBatch();
for(int i = 0; i < updates.length; ++i)
{
int status = updates[i];
if(status != 1 && status != -2 )
{
String msg = (status == -3 ?
"One of the commands in the batch failed to execute" :
"Each command in the batch should update exactly 1 row but " +
"one of the commands updated " + updates[i] + " rows.");
throw new EJBException(msg);
}
}
}
};
private static final CommitStrategy NON_BATCH_UPDATE = new CommitStrategy()
{
public void executeUpdate(PreparedStatement ps) throws SQLException
{
int rows = ps.executeUpdate();
if(rows != 1)
{
throw new EJBException("Expected one updated row but got: " + rows);
}
}
public void executeBatch(PreparedStatement ps)
{
}
};
public class ForeignKeyConstraint
{
public final int index;
private final String nullFkSql;
public ForeignKeyConstraint(int index, JDBCCMPFieldBridge2[] fkFields)
{
this.index = index;
StringBuffer buf = new StringBuffer();
buf.append("update ").append(tableName).append(" set ")
.append(fkFields[0].getColumnName()).append("=null");
for(int i = 1; i < fkFields.length; ++i)
{
buf.append(", ").append(fkFields[i].getColumnName()).append("=null");
}
buf.append(" where ");
JDBCCMPFieldBridge2[] pkFields = (JDBCCMPFieldBridge2[]) entity.getPrimaryKeyFields();
buf.append(pkFields[0].getColumnName()).append("=?");
for(int i = 1; i < pkFields.length; ++i)
{
buf.append(" and ").append(pkFields[i].getColumnName()).append("=?");
}
nullFkSql = buf.toString();
if(log.isDebugEnabled())
{
log.debug("update foreign key sql: " + nullFkSql);
}
}
}
}