package org.jboss.cache.lock;
import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
import EDU.oswego.cs.dl.util.concurrent.Sync;
import org.jboss.logging.Logger;
/**
 * Read/write lock that additionally lets a thread holding a read lock try to
 * upgrade it to the write lock via {@link #upgradeLockAttempt(long)}.
 *
 * <p>Only the timed {@code attempt(long)} operations of the returned
 * {@code Sync} objects are supported; {@code acquire()} always throws a
 * {@code RuntimeException}.
 *
 * <p>Bookkeeping counters are updated by {@code synchronized} methods of this
 * object; waiting threads block on the monitor of the corresponding
 * {@link ReaderLock} / {@link WriterLock} instance. The upgrade marker
 * ({@code waitingUpgrader_} / {@code upgraderLocal_}) is updated while holding
 * the monitor of {@code writerLock_}.
 */
public class ReadWriteLockWithUpgrade implements ReadWriteLock
{
    /** Number of read locks currently held. */
    private long activeReaders_ = 0;
    /** Thread currently holding the write lock, or {@code null} if none. */
    protected Thread activeWriter_ = null;
    /** Number of threads blocked in {@link ReaderLock#attempt(long)}. */
    private long waitingReaders_ = 0;
    /** Number of threads blocked in {@link WriterLock#attempt(long)}. */
    private long waitingWriters_ = 0;
    /** Number of upgrade attempts in progress (at most one is admitted). */
    private long waitingUpgrader_ = 0;
    /** Non-null for the thread that is currently performing an upgrade. */
    protected final ThreadLocal upgraderLocal_ = new ThreadLocal();
    /** Marker value stored in {@link #upgraderLocal_} while upgrading. */
    protected static final Object dummy_ = new Object();
    protected final ReaderLock readerLock_ = new ReaderLock();
    protected final WriterLock writerLock_ = new WriterLock();
    protected static final Logger log_ = Logger.getLogger(ReadWriteLockWithUpgrade.class);

    /**
     * Returns a snapshot of the internal counters for diagnostics.
     * The fields are read without synchronization.
     */
    public String toString()
    {
        StringBuffer sb = new StringBuffer();
        sb.append("activeReaders=").append(activeReaders_).append(", activeWriter=").append(activeWriter_);
        sb.append(", waitingReaders=").append(waitingReaders_).append(", waitingWriters=").append(waitingWriters_);
        sb.append(", waitingUpgrader=").append(waitingUpgrader_);
        return sb.toString();
    }

    /** @return the write side of this lock */
    public Sync writeLock()
    {
        return writerLock_;
    }

    /** @return the read side of this lock */
    public Sync readLock()
    {
        return readerLock_;
    }

    /**
     * Attempts to upgrade a held read lock to the write lock.
     *
     * <p>If exactly one read lock is held, the lock state is switched directly.
     * Otherwise the read lock is released and the write lock is attempted for
     * up to {@code msecs}; if that times out, the read lock is re-attempted
     * (again for up to {@code msecs}) before {@code null} is returned.
     *
     * <p>NOTE(review): if the wait for the write lock is interrupted, this
     * method returns {@code null} <em>without</em> re-obtaining the read lock
     * that was released; the interrupt status of the thread is restored so the
     * caller can detect this case.
     *
     * @param msecs maximum time to wait, in milliseconds
     * @return the write lock on success, {@code null} if the upgrade failed
     * @throws UpgradeException      if another upgrade attempt is already in progress
     * @throws RuntimeException      if no read lock is currently held at all
     * @throws IllegalStateException if the upgrade timed out and the read lock
     *                               could not be re-obtained either
     */
    public Sync upgradeLockAttempt(long msecs) throws UpgradeException
    {
        if (activeReaders_ == 0)
            throw new RuntimeException("No reader lock available for upgrade");

        synchronized (writerLock_) {
            if (waitingUpgrader_ >= 1) {
                String errStr="upgradeLockAttempt(): more than one reader trying to simultaneously upgrade to write lock";
                log_.error(errStr);
                throw new UpgradeException(errStr);
            }
            waitingUpgrader_++;
            upgraderLocal_.set(dummy_);
        }

        // Sole reader: no need to release and wait, just switch the lock state.
        if (activeReaders_ == 1) {
            resetWaitingUpgrader();
            return changeLock();
        }

        readerLock_.release();

        boolean upgraded;
        try {
            upgraded = writerLock_.attempt(msecs);
        } catch (InterruptedException ex) {
            // Do not swallow the interrupt: restore the flag for the caller.
            Thread.currentThread().interrupt();
            return null;
        } finally {
            // Reset the upgrade marker exactly once, whatever the outcome.
            // Resetting it twice would drive waitingUpgrader_ negative and
            // permanently block readers (see allowReader()).
            resetWaitingUpgrader();
        }

        if (upgraded)
            return writerLock_;

        log_.error("upgradeLock(): failed");
        try {
            if (!readerLock_.attempt(msecs)) {
                String errStr="ReadWriteLockWithUpgrade.upgradeLockAttempt():" +
                        " failed to upgrade to write lock and also failed to re-obtain the read lock";
                log_.error(errStr);
                throw new IllegalStateException(errStr);
            }
        } catch (InterruptedException ex) {
            // The upgrade marker has already been reset above; only restore
            // the interrupt status here.
            Thread.currentThread().interrupt();
        }
        return null;
    }

    /** Clears the upgrade-in-progress marker for the current thread. */
    private void resetWaitingUpgrader() {
        synchronized (writerLock_) {
            waitingUpgrader_--;
            upgraderLocal_.set(null);
        }
    }

    /**
     * Converts one held read lock into the write lock.
     *
     * @return the write lock, or {@code null} if the write lock could not be
     *         taken; in that case the reader count is left unchanged so the
     *         caller still holds its read lock
     */
    protected synchronized Sync changeLock()
    {
        --activeReaders_;
        if (!startWrite()) {
            // Undo the decrement: a failed upgrade must not drop the read lock.
            ++activeReaders_;
            return null;
        }
        return writerLock_;
    }

    /** Records that a waiting reader gave up (timeout or interrupt). */
    protected synchronized void cancelledWaitingReader()
    {
        --waitingReaders_;
    }

    /** Records that a waiting writer gave up (timeout or interrupt). */
    protected synchronized void cancelledWaitingWriter()
    {
        --waitingWriters_;
    }

    /**
     * @return {@code true} if there is no active writer, no waiting writer and
     *         no upgrade in progress
     */
    protected boolean allowReader()
    {
        return activeWriter_ == null && waitingWriters_ == 0 && waitingUpgrader_ == 0;
    }

    /**
     * Takes a read lock if {@link #allowReader()} permits it.
     *
     * @return {@code true} if the read lock was taken
     */
    protected synchronized boolean startRead()
    {
        boolean allowRead = allowReader();
        if (allowRead) {
            ++activeReaders_;
        }
        return allowRead;
    }

    /**
     * Takes the write lock for the current thread if there is neither an
     * active writer nor any active reader.
     *
     * @return {@code true} if the write lock was taken
     */
    protected synchronized boolean startWrite()
    {
        boolean allowWrite = activeWriter_ == null && activeReaders_ == 0;
        if (allowWrite) activeWriter_ = Thread.currentThread();
        return allowWrite;
    }

    /** Tries to read; on failure registers the caller as a waiting reader. */
    protected synchronized boolean startReadFromNewReader()
    {
        boolean pass = startRead();
        if (!pass) ++waitingReaders_;
        return pass;
    }

    /** Tries to write; on failure registers the caller as a waiting writer. */
    protected synchronized boolean startWriteFromNewWriter()
    {
        boolean pass = startWrite();
        if (!pass) ++waitingWriters_;
        return pass;
    }

    /** Tries to read; on success unregisters the caller as a waiting reader. */
    protected synchronized boolean startReadFromWaitingReader()
    {
        boolean pass = startRead();
        if (pass) --waitingReaders_;
        return pass;
    }

    /** Tries to write; on success unregisters the caller as a waiting writer. */
    protected synchronized boolean startWriteFromWaitingWriter()
    {
        boolean pass = startWrite();
        if (pass) --waitingWriters_;
        return pass;
    }

    /**
     * Gives back one read lock (the count never drops below zero).
     *
     * @return the writer lock if this was the last reader and writers are
     *         waiting, otherwise {@code null}
     */
    protected synchronized Signaller endRead()
    {
        if (activeReaders_ != 0 && --activeReaders_ == 0 && waitingWriters_ > 0)
            return writerLock_;
        else
            return null;
    }

    /**
     * Gives back the write lock.
     *
     * @return the lock whose waiters should be woken: the reader lock if
     *         readers are waiting and allowed, else the writer lock if writers
     *         are waiting, else {@code null}
     */
    protected synchronized Signaller endWrite()
    {
        activeWriter_ = null;
        if (waitingReaders_ > 0 && allowReader())
            return readerLock_;
        else if (waitingWriters_ > 0)
            return writerLock_;
        else
            return null;
    }

    /** Something whose waiting threads can be woken up. */
    static interface Signaller
    {
        void signalWaiters();
    }

    /** Read side of the lock; waiting readers block on this object's monitor. */
    protected class ReaderLock implements Signaller, Sync
    {
        /** Not supported; always throws {@code RuntimeException}. */
        public void acquire() throws InterruptedException
        {
            throw new RuntimeException("acquire(): Operation currently not supported.");
        }

        /** Releases one read lock and wakes waiting writers if appropriate. */
        public void release()
        {
            Signaller s = endRead();
            if (s != null) {
                s.signalWaiters();
            }
        }

        /** Wakes every thread waiting for the read lock. */
        public synchronized void signalWaiters()
        {
            ReaderLock.this.notifyAll();
        }

        /**
         * Tries to take a read lock, waiting at most {@code msecs} milliseconds.
         * A non-positive {@code msecs} means a single non-blocking try.
         *
         * @return {@code true} if the read lock was taken
         * @throws InterruptedException if the thread is interrupted before or
         *                              while waiting
         */
        public boolean attempt(long msecs) throws InterruptedException
        {
            if (Thread.interrupted()) throw new InterruptedException();
            InterruptedException ie = null;
            synchronized (this) {
                if (msecs <= 0)
                    return startRead();
                if (startReadFromNewReader())
                    return true;

                long waitTime = msecs;
                long start = System.currentTimeMillis();
                while (true) {
                    try {
                        ReaderLock.this.wait(waitTime);
                    }
                    catch (InterruptedException ex) {
                        cancelledWaitingReader();
                        ie = ex;
                        break;
                    }
                    if (startReadFromWaitingReader())
                        return true;
                    // Still blocked: wait only for what is left of the budget.
                    waitTime = msecs - (System.currentTimeMillis() - start);
                    if (waitTime <= 0) {
                        cancelledWaitingReader();
                        break;
                    }
                }
            }
            // This reader gave up; wake the threads waiting on the writer lock.
            writerLock_.signalWaiters();
            if (ie != null)
                throw ie;
            else
                return false;
        }
    }

    /** Write side of the lock; waiting writers block on this object's monitor. */
    protected class WriterLock implements Signaller, Sync
    {
        /** Not supported; always throws {@code RuntimeException}. */
        public void acquire() throws InterruptedException
        {
            throw new RuntimeException("acquire(): Operation currently not supported.");
        }

        /** Releases the write lock and wakes waiting readers or writers. */
        public void release()
        {
            Signaller s = endWrite();
            if (s != null) s.signalWaiters();
        }

        /** Wakes every thread waiting for the write lock. */
        public synchronized void signalWaiters()
        {
            WriterLock.this.notifyAll();
        }

        /**
         * Tries to take the write lock, waiting at most {@code msecs}
         * milliseconds. A non-positive {@code msecs} means a single
         * non-blocking try. While an upgrade is in progress, only the
         * upgrading thread may take the lock from the non-blocking path or
         * from the wait loop.
         *
         * @return {@code true} if the write lock was taken
         * @throws InterruptedException if the thread is interrupted before or
         *                              while waiting
         */
        public boolean attempt(long msecs) throws InterruptedException
        {
            if (Thread.interrupted()) throw new InterruptedException();
            InterruptedException ie = null;
            synchronized (WriterLock.this) {
                if (msecs <= 0) {
                    if (waitingUpgrader_ != 0) {
                        if (upgraderLocal_.get() != null) {
                            log_.info("attempt(): upgrade to write lock");
                            return startWrite();
                        }
                        else
                            return false;
                    } else
                        return startWrite();
                } else if (startWriteFromNewWriter())
                    return true;
                else {
                    long waitTime = msecs;
                    long start = System.currentTimeMillis();
                    while (true) {
                        try {
                            WriterLock.this.wait(waitTime);
                        }
                        catch (InterruptedException ex) {
                            cancelledWaitingWriter();
                            WriterLock.this.notifyAll();
                            ie = ex;
                            break;
                        }
                        // A pending upgrade has priority: only the upgrading
                        // thread itself may try to take the lock right now.
                        boolean mayTry = waitingUpgrader_ == 0 || upgraderLocal_.get() != null;
                        if (mayTry && startWriteFromWaitingWriter())
                            return true;
                        // Always recompute the remaining budget, including when
                        // we stood back for an upgrader, so the timeout is
                        // honoured instead of re-waiting with a stale value.
                        waitTime = msecs - (System.currentTimeMillis() - start);
                        if (waitTime <= 0) {
                            cancelledWaitingWriter();
                            WriterLock.this.notifyAll();
                            break;
                        }
                    }
                }
            }
            // This writer gave up; wake the threads waiting on the reader lock.
            readerLock_.signalWaiters();
            if (ie != null)
                throw ie;
            else
                return false;
        }
    }
}