[Mulgara-svn] r1060 - branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver
ronald at mulgara.org
ronald at mulgara.org
Mon Jul 7 12:54:20 UTC 2008
Author: ronald
Date: 2008-07-07 05:54:19 -0700 (Mon, 07 Jul 2008)
New Revision: 1060
Modified:
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResourceContext.java
Log:
Fixed missing synchronization and deadlocks.
Specifically, this changes the transaction, transaction-factory, and the
xa-resource instances to all use the same, shared, mutex for a given Session
(as opposed to each instance having its own mutex). This avoids all the
deadlock problems once and for all. Additionally, all the methods on the
transaction classes are also protected by the mutex (see below). And lastly,
ensured proper synchronization between timeout threads and the rest by making
the last-active variables volatile and by putting access to the current
write-transaction variable in a synchronized block.
Some background on this. While the contract for (Database)Session does not
allow the application to use a given instance concurrently in multiple
threads, we end up having concurrent access to it from multiple sources:
* RemoteAnswerWrapperAnswer uses a prefetch thread to pre-fetch a page of
results; this may run concurrently behind the application's back with
whatever operation the application is doing on the same Session, including
both transaction-control and data operations. This means a
MulgaraTransaction.execute() may occur concurrently with any other operation
(transaction-control or data) on the transaction, the transaction-factory,
or the xa-resource.
* Timeouts also run in a separate thread, so a heuristicRollback may run
concurrently with any operation on the transaction, the transaction-factory,
or the xa-resource.
* The Session may get closed by the RMI 'unreferenced' hook in
SessionWrapperRemoteSession, which also runs in a separate thread. While
somewhat rarer, a long running operation may still be in progress when this
occurs (e.g. the user gave up and terminated the client), or an operation
may be waiting for the write-lock and acquire it at the same time as the close
happens.
* A buggy client may violate the contract.
This therefore requires all entry points to be fully synchronized.
One "drawback" of using the same mutex for all parties for a given Session is
reduced parallelism of operations on a given Session. This basically only
affects the pre-fetching of pages of query results which will now be serialized
on entry to the transaction execute() (an application is not allowed to issue
multiple operations concurrently on a given Session). However, the resulting
complexity of trying to support this is not worth it IMNSHO (witness the
numerous bugs in this area so far).
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java 2008-07-07 12:54:12 UTC (rev 1059)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java 2008-07-07 12:54:19 UTC (rev 1060)
@@ -64,10 +64,12 @@
private MulgaraExternalTransactionFactory factory;
private DatabaseOperationContext context;
+ private boolean inXACompletion;
+
private boolean hRollback;
private int heurCode;
private boolean rollback;
- private long lastActive;
+ private volatile long lastActive;
MulgaraExternalTransaction(MulgaraExternalTransactionFactory factory, Xid xid, DatabaseOperationContext context)
throws QueryException {
@@ -82,6 +84,8 @@
this.xaResources = new HashMap<EnlistableResource, XAResource>();
+ this.inXACompletion = false;
+
this.hRollback = false;
this.heurCode = 0;
this.rollback = false;
@@ -97,103 +101,144 @@
public MulgaraTransactionException abortTransaction(String errorMessage, Throwable cause)
throws MulgaraTransactionException {
report("abortTransaction");
+
+ acquireMutex(10*1000L, true, MulgaraTransactionException.class); // 10 seconds
try {
- for (EnlistableResource resource : enlisted) {
- try {
- resource.abort();
- } catch (Throwable throw_away) {}
+ try {
+ for (EnlistableResource resource : enlisted) {
+ try {
+ resource.abort();
+ } catch (Throwable throw_away) {}
+ }
+ for (EnlistableResource resource : prepared) {
+ try {
+ resource.abort();
+ } catch (Throwable throw_away) {}
+ }
+
+ return new MulgaraTransactionException(errorMessage, cause);
+ } finally {
+ factory.transactionComplete(this);
}
- for (EnlistableResource resource : prepared) {
- try {
- resource.abort();
- } catch (Throwable throw_away) {}
- }
-
- return new MulgaraTransactionException(errorMessage, cause);
} finally {
- factory.transactionComplete(this);
+ releaseMutex();
}
}
public void heuristicRollback(String cause) throws MulgaraTransactionException {
- report("heuristicRollback");
- hRollback = true;
+ report("heuristicRollback: " + cause);
+
+ synchronized (factory.getMutexLock()) {
+ if (factory.getMutexHolder() != null && factory.getMutexHolder() != Thread.currentThread()) {
+ if (inXACompletion) {
+ return; // this txn is already being cleaned up, so let it go
+ }
+ }
+
+ acquireMutex(10*1000L, true, MulgaraTransactionException.class); // 10 seconds
+ }
+
try {
- rollback(xid);
- } catch (XAException xa) {
- throw new MulgaraTransactionException("Failed heuristic rollback", xa);
+ hRollback = true;
+ try {
+ rollback(xid);
+ } catch (XAException xa) {
+ throw new MulgaraTransactionException("Failed heuristic rollback", xa);
+ } finally {
+ heurCode = heurCode == 0 ? XAException.XA_HEURRB : heurCode;
+ }
} finally {
- heurCode = heurCode == 0 ? XAException.XA_HEURRB : heurCode;
+ releaseMutex();
}
}
public void execute(Operation operation, DatabaseMetadata metadata) throws MulgaraTransactionException {
+ acquireMutex(0, false, MulgaraTransactionException.class);
try {
- long la = lastActive;
- lastActive = -1;
+ try {
+ long la = lastActive;
+ lastActive = -1;
- operation.execute(context,
- context.getSystemResolver(),
- metadata);
+ operation.execute(context,
+ context.getSystemResolver(),
+ metadata);
- lastActive = (la != -1) ? System.currentTimeMillis() : -1;
- } catch (Throwable th) {
- try {
- heuristicRollback(th.toString());
- } catch (MulgaraTransactionException ex) {
- logger.error("Error in rollback after operation failure", ex);
+ lastActive = (la != -1) ? System.currentTimeMillis() : -1;
+ } catch (Throwable th) {
+ try {
+ heuristicRollback(th.toString());
+ } catch (MulgaraTransactionException ex) {
+ logger.error("Error in rollback after operation failure", ex);
+ }
+ throw new MulgaraTransactionException("Operation failed", th);
}
- throw new MulgaraTransactionException("Operation failed", th);
+ } finally {
+ releaseMutex();
}
}
public AnswerOperationResult execute(AnswerOperation ao) throws TuplesException {
+ acquireMutex(0, false, TuplesException.class);
try {
- long la = lastActive;
- lastActive = -1;
+ try {
+ long la = lastActive;
+ lastActive = -1;
- ao.execute();
+ ao.execute();
- lastActive = (la != -1) ? System.currentTimeMillis() : -1;
+ lastActive = (la != -1) ? System.currentTimeMillis() : -1;
- return ao.getResult();
- } catch (Throwable th) {
- try {
- logger.warn("Error in answer operation triggered rollback", th);
- heuristicRollback(th.toString());
- } catch (MulgaraTransactionException ex) {
- logger.error("Error in rollback after answer-operation failure", ex);
+ return ao.getResult();
+ } catch (Throwable th) {
+ try {
+ logger.warn("Error in answer operation triggered rollback", th);
+ heuristicRollback(th.toString());
+ } catch (MulgaraTransactionException ex) {
+ logger.error("Error in rollback after answer-operation failure", ex);
+ }
+ throw new TuplesException("Request failed", th);
}
- throw new TuplesException("Request failed", th);
+ } finally {
+ releaseMutex();
}
}
// FIXME: See if we can't rearrange things to allow this to be deleted.
public void execute(TransactionOperation to) throws MulgaraTransactionException {
- long la = lastActive;
- lastActive = -1;
+ acquireMutex(0, false, MulgaraTransactionException.class);
+ try {
+ long la = lastActive;
+ lastActive = -1;
- to.execute();
+ to.execute();
- lastActive = (la != -1) ? System.currentTimeMillis() : -1;
+ lastActive = (la != -1) ? System.currentTimeMillis() : -1;
+ } finally {
+ releaseMutex();
+ }
}
public void enlist(EnlistableResource enlistable) throws MulgaraTransactionException {
+ acquireMutex(0, false, MulgaraTransactionException.class);
try {
- XAResource res = enlistable.getXAResource();
- for (EnlistableResource eres : enlisted) {
- if (res.isSameRM(xaResources.get(eres))) {
- return;
+ try {
+ XAResource res = enlistable.getXAResource();
+ for (EnlistableResource eres : enlisted) {
+ if (res.isSameRM(xaResources.get(eres))) {
+ return;
+ }
}
+ enlisted.add(enlistable);
+ xaResources.put(enlistable, res);
+ // FIXME: We need to handle this uptodate operation properly - handle
+ // suspension or mid-prepare/commit.
+ // bringUptodate(res);
+ res.start(xid, XAResource.TMNOFLAGS);
+ } catch (XAException ex) {
+ throw new MulgaraTransactionException("Failed to enlist resource", ex);
}
- enlisted.add(enlistable);
- xaResources.put(enlistable, res);
- // FIXME: We need to handle this uptodate operation properly - handle
- // suspension or mid-prepare/commit.
- // bringUptodate(res);
- res.start(xid, XAResource.TMNOFLAGS);
- } catch (XAException ex) {
- throw new MulgaraTransactionException("Failed to enlist resource", ex);
+ } finally {
+ releaseMutex();
}
}
@@ -207,14 +252,20 @@
void commit(Xid xid) throws XAException {
report("commit");
- lastActive = -1;
- // FIXME: Consider the possiblity prepare failed, or was incomplete.
- for (EnlistableResource er : prepared) {
- xaResources.get(er).commit(xid, false);
- committed.add(er);
+ acquireMutex(0, true, XAException.class);
+ try {
+ lastActive = -1;
+
+ // FIXME: Consider the possiblity prepare failed, or was incomplete.
+ for (EnlistableResource er : prepared) {
+ xaResources.get(er).commit(xid, false);
+ committed.add(er);
+ }
+ cleanupTransaction();
+ } finally {
+ releaseMutex();
}
- cleanupTransaction();
}
boolean isHeuristicallyRollbacked() {
@@ -235,14 +286,20 @@
void prepare(Xid xid) throws XAException {
report("prepare");
- long la = lastActive;
- lastActive = -1;
- for (EnlistableResource er : enlisted) {
- xaResources.get(er).prepare(xid);
- prepared.add(er);
+ acquireMutex(0, true, XAException.class);
+ try {
+ long la = lastActive;
+ lastActive = -1;
+
+ for (EnlistableResource er : enlisted) {
+ xaResources.get(er).prepare(xid);
+ prepared.add(er);
+ }
+ lastActive = (la != -1) ? System.currentTimeMillis() : -1;
+ } finally {
+ releaseMutex();
}
- lastActive = (la != -1) ? System.currentTimeMillis() : -1;
}
/**
@@ -251,78 +308,84 @@
*/
void rollback(Xid xid) throws XAException {
report("rollback");
- lastActive = -1;
+
+ acquireMutex(0, true, XAException.class);
try {
- rollback = true;
- Map<EnlistableResource, XAException> rollbackFailed = new HashMap<EnlistableResource, XAException>();
+ lastActive = -1;
+ try {
+ rollback = true;
+ Map<EnlistableResource, XAException> rollbackFailed = new HashMap<EnlistableResource, XAException>();
- for (EnlistableResource er : enlisted) {
- try {
- if (!committed.contains(er)) {
- xaResources.get(er).rollback(xid);
- rollbacked.add(er);
+ for (EnlistableResource er : enlisted) {
+ try {
+ if (!committed.contains(er)) {
+ xaResources.get(er).rollback(xid);
+ rollbacked.add(er);
+ }
+ } catch (XAException ex) {
+ logger.error("Attempt to rollback resource failed", ex);
+ rollbackFailed.put(er, ex);
}
- } catch (XAException ex) {
- logger.error("Attempt to rollback resource failed", ex);
- rollbackFailed.put(er, ex);
}
- }
- if (rollbackFailed.isEmpty()) {
- if (committed.isEmpty()) { // Clean failure and rollback
- return; // SUCCESSFUL ROLLBACK - RETURN
- } else { // No rollback-failure, but partial commit
- heurCode = XAException.XA_HEURMIX;
- throw new XAException(heurCode);
- }
- } else {
- // Something went wrong - start by assuming if one committed all committed
- heurCode = (committed.isEmpty()) ? 0 : XAException.XA_HEURCOM;
- // Then check every rollback failure code for a contradiction to all committed.
- for (XAException xaex : rollbackFailed.values()) {
- switch (xaex.errorCode) {
- case XAException.XA_HEURHAZ:
- case XAException.XAER_NOTA:
- case XAException.XAER_RMERR:
- case XAException.XAER_RMFAIL:
- case XAException.XAER_INVAL:
- case XAException.XAER_PROTO:
- // All these amount to not knowing the result - so we have a hazard
- // unless we already know we have a mixed result.
- if (heurCode != XAException.XA_HEURMIX) {
- heurCode = XAException.XA_HEURHAZ;
- }
- break;
- case XAException.XA_HEURCOM:
- if (!rollbacked.isEmpty() || heurCode == XAException.XA_HEURRB) {
- // We know something else was rollbacked, so we know we have a mixed result.
+ if (rollbackFailed.isEmpty()) {
+ if (committed.isEmpty()) { // Clean failure and rollback
+ return; // SUCCESSFUL ROLLBACK - RETURN
+ } else { // No rollback-failure, but partial commit
+ heurCode = XAException.XA_HEURMIX;
+ throw new XAException(heurCode);
+ }
+ } else {
+ // Something went wrong - start by assuming if one committed all committed
+ heurCode = (committed.isEmpty()) ? 0 : XAException.XA_HEURCOM;
+ // Then check every rollback failure code for a contradiction to all committed.
+ for (XAException xaex : rollbackFailed.values()) {
+ switch (xaex.errorCode) {
+ case XAException.XA_HEURHAZ:
+ case XAException.XAER_NOTA:
+ case XAException.XAER_RMERR:
+ case XAException.XAER_RMFAIL:
+ case XAException.XAER_INVAL:
+ case XAException.XAER_PROTO:
+ // All these amount to not knowing the result - so we have a hazard
+ // unless we already know we have a mixed result.
+ if (heurCode != XAException.XA_HEURMIX) {
+ heurCode = XAException.XA_HEURHAZ;
+ }
+ break;
+ case XAException.XA_HEURCOM:
+ if (!rollbacked.isEmpty() || heurCode == XAException.XA_HEURRB) {
+ // We know something else was rollbacked, so we know we have a mixed result.
+ heurCode = XAException.XA_HEURMIX;
+ } else if (heurCode == 0) {
+ heurCode = XAException.XA_HEURCOM;
+ } // else it's a HEURHAZ or a HEURCOM and stays that way.
+ break;
+ case XAException.XA_HEURRB:
+ if (!committed.isEmpty() || heurCode == XAException.XA_HEURCOM) {
+ heurCode = XAException.XA_HEURMIX;
+ } else if (heurCode == 0) {
+ heurCode = XAException.XA_HEURRB;
+ } // else it's a HEURHAZ or a HEURRB and stays that way.
+ break;
+ case XAException.XA_HEURMIX:
+ // It can't get worse than, we know we have a mixed result.
heurCode = XAException.XA_HEURMIX;
- } else if (heurCode == 0) {
- heurCode = XAException.XA_HEURCOM;
- } // else it's a HEURHAZ or a HEURCOM and stays that way.
- break;
- case XAException.XA_HEURRB:
- if (!committed.isEmpty() || heurCode == XAException.XA_HEURCOM) {
- heurCode = XAException.XA_HEURMIX;
- } else if (heurCode == 0) {
- heurCode = XAException.XA_HEURRB;
- } // else it's a HEURHAZ or a HEURRB and stays that way.
- break;
- case XAException.XA_HEURMIX:
- // It can't get worse than, we know we have a mixed result.
- heurCode = XAException.XA_HEURMIX;
- break;
- default:
- // The codes above are the only codes permitted from a rollback() so
- // anything else indicates a serious error in the resource-manager.
- throw new XAException(XAException.XAER_RMERR);
+ break;
+ default:
+ // The codes above are the only codes permitted from a rollback() so
+ // anything else indicates a serious error in the resource-manager.
+ throw new XAException(XAException.XAER_RMERR);
+ }
}
+
+ throw new XAException(heurCode);
}
-
- throw new XAException(heurCode);
+ } finally {
+ cleanupTransaction();
}
} finally {
- cleanupTransaction();
+ releaseMutex();
}
}
@@ -347,6 +410,17 @@
}
}
+ private <T extends Throwable> void acquireMutex(long timeout, boolean isXACompletion, Class<T> exc) throws T {
+ synchronized (factory.getMutexLock()) {
+ factory.acquireMutex(timeout, exc);
+ inXACompletion = isXACompletion;
+ }
+ }
+
+ private void releaseMutex() {
+ factory.releaseMutex();
+ }
+
private void report(String desc) {
if (logger.isInfoEnabled()) {
logger.info(desc + ": " + System.identityHashCode(this));
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java 2008-07-07 12:54:12 UTC (rev 1059)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java 2008-07-07 12:54:19 UTC (rev 1060)
@@ -63,7 +63,7 @@
}
public MulgaraTransaction getTransaction(boolean write) throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
try {
if (associatedTransaction == null) {
throw new MulgaraTransactionException("No externally mediated transaction associated with session");
@@ -79,7 +79,7 @@
protected MulgaraExternalTransaction createTransaction(Xid xid, boolean write)
throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
try {
if (associatedTransaction != null) {
throw new MulgaraTransactionException(
@@ -90,15 +90,11 @@
}
if (write) {
- // see comment in MulgaraInternalTransactionFactory regarding releasing the lock here
- runWithoutMutex(new TransactionOperation() {
- public void execute() throws MulgaraTransactionException {
- manager.obtainWriteLock(session);
- }
- });
+ manager.obtainWriteLock(session);
+ MulgaraExternalTransaction xa = null;
try {
- MulgaraExternalTransaction xa = new MulgaraExternalTransaction(this, xid, session.newOperationContext(true));
- writeTransaction = xa;
+ xa = new MulgaraExternalTransaction(this, xid, session.newOperationContext(true));
+ setWriteTransaction(xa);
associatedTransaction = xa;
transactions.add(xa);
transactionCreated(xa);
@@ -106,6 +102,8 @@
return xa;
} catch (Throwable th) {
manager.releaseWriteLock(session);
+ if (xa != null)
+ transactionComplete(xa);
throw new MulgaraTransactionException("Error initiating write transaction", th);
}
} else {
@@ -130,7 +128,7 @@
}
public XAResource getXAResource(boolean writing) {
- acquireMutex();
+ acquireMutex(0, RuntimeException.class);
try {
return xaResource.getResource(writing);
} finally {
@@ -140,7 +138,7 @@
public void transactionComplete(MulgaraExternalTransaction xa)
throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
try {
super.transactionComplete(xa);
@@ -149,7 +147,7 @@
}
if (xa == writeTransaction) {
manager.releaseWriteLock(session);
- writeTransaction = null;
+ setWriteTransaction(null);
}
transactions.remove(xa);
if (associatedTransaction == xa) {
@@ -161,7 +159,7 @@
}
public boolean hasAssociatedTransaction() {
- acquireMutex();
+ acquireMutex(0, RuntimeException.class);
try {
return associatedTransaction != null;
} finally {
@@ -170,7 +168,7 @@
}
public boolean associateTransaction(MulgaraExternalTransaction xa) {
- acquireMutex();
+ acquireMutex(0, RuntimeException.class);
try {
if (associatedTransaction != null) {
return false;
@@ -184,7 +182,7 @@
}
public MulgaraExternalTransaction getAssociatedTransaction() {
- acquireMutex();
+ acquireMutex(0, RuntimeException.class);
try {
return associatedTransaction;
} finally {
@@ -194,7 +192,7 @@
public void disassociateTransaction(MulgaraExternalTransaction xa)
throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
try {
if (associatedTransaction == xa) {
associatedTransaction = null;
@@ -205,7 +203,7 @@
}
public void closingSession() throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
try {
try {
super.closingSession();
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java 2008-07-07 12:54:12 UTC (rev 1059)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java 2008-07-07 12:54:19 UTC (rev 1060)
@@ -81,8 +81,8 @@
private Transaction transaction;
private Thread currentThread;
- private ReentrantLock activationMutex;
- private long deactivateTime;
+ private volatile long deactivateTime;
+ private boolean inXACompletion;
private State state;
private int inuse;
@@ -103,7 +103,6 @@
this.factory = factory;
this.context = context;
this.enlisted = new HashSet<EnlistableResource>();
- this.activationMutex = new ReentrantLock();
this.currentThread = null;
inuse = 0;
@@ -120,53 +119,46 @@
private void activate() throws MulgaraTransactionException {
// report("Activating Transaction");
try {
- synchronized (this) {
- if (currentThread != null && !currentThread.equals(Thread.currentThread())) {
- throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
- }
+ if (currentThread != null && !currentThread.equals(Thread.currentThread())) {
+ throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
}
-
- acquireActivationMutex();
+
deactivateTime = -1;
- try {
- switch (state) {
- case CONSTRUCTEDUNREF:
- startTransaction();
- inuse = 1;
- state = State.ACTUNREF;
- try {
- context.initiate(this);
- } catch (Throwable th) {
- throw implicitRollback(th);
- }
- break;
- case CONSTRUCTEDREF:
- startTransaction();
- inuse = 1;
- using = 1;
- state = State.ACTREF;
- try {
- context.initiate(this);
- } catch (Throwable th) {
- throw implicitRollback(th);
- }
- break;
- case DEACTREF:
- resumeTransaction();
- inuse = 1;
- state = State.ACTREF;
- break;
- case ACTREF:
- case ACTUNREF:
- inuse++;
- break;
- case FINISHED:
- throw new MulgaraTransactionException("Attempt to activate terminated transaction");
- case FAILED:
- throw new MulgaraTransactionException("Attempt to activate failed transaction", rollbackCause);
- }
- } finally {
- releaseActivationMutex();
+ switch (state) {
+ case CONSTRUCTEDUNREF:
+ startTransaction();
+ inuse = 1;
+ state = State.ACTUNREF;
+ try {
+ context.initiate(this);
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ }
+ break;
+ case CONSTRUCTEDREF:
+ startTransaction();
+ inuse = 1;
+ using = 1;
+ state = State.ACTREF;
+ try {
+ context.initiate(this);
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ }
+ break;
+ case DEACTREF:
+ resumeTransaction();
+ inuse = 1;
+ state = State.ACTREF;
+ break;
+ case ACTREF:
+ case ACTUNREF:
+ inuse++;
+ break;
+ case FINISHED:
+ throw new MulgaraTransactionException("Attempt to activate terminated transaction");
+ case FAILED:
+ throw new MulgaraTransactionException("Attempt to activate failed transaction", rollbackCause);
}
try {
@@ -188,49 +180,42 @@
// report("Deactivating transaction");
try {
- synchronized (this) {
- if (currentThread == null) {
- throw new MulgaraTransactionException("Transaction not associated with thread");
- } else if (!currentThread.equals(Thread.currentThread())) {
- throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
- }
+ if (currentThread == null) {
+ throw new MulgaraTransactionException("Transaction not associated with thread");
+ } else if (!currentThread.equals(Thread.currentThread())) {
+ throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
}
-
- acquireActivationMutex();
+
deactivateTime = System.currentTimeMillis();
- try {
- switch (state) {
- case ACTUNREF:
- if (inuse == 1) {
- commitTransaction();
- }
+ switch (state) {
+ case ACTUNREF:
+ if (inuse == 1) {
+ commitTransaction();
+ }
+ inuse--;
+ break;
+ case ACTREF:
+ if (inuse == 1) {
+ suspendTransaction();
+ }
+ inuse--;
+ break;
+ case CONSTRUCTEDREF:
+ throw new MulgaraTransactionException("Attempt to deactivate uninitiated refed transaction");
+ case CONSTRUCTEDUNREF:
+ throw new MulgaraTransactionException("Attempt to deactivate uninitiated transaction");
+ case DEACTREF:
+ throw new IllegalStateException("Attempt to deactivate unactivated transaction");
+ case FINISHED:
+ if (inuse < 0) {
+ errorReport("Activation count failure - too many deacts - in finished transaction", null);
+ } else {
inuse--;
- break;
- case ACTREF:
- if (inuse == 1) {
- suspendTransaction();
- }
- inuse--;
- break;
- case CONSTRUCTEDREF:
- throw new MulgaraTransactionException("Attempt to deactivate uninitiated refed transaction");
- case CONSTRUCTEDUNREF:
- throw new MulgaraTransactionException("Attempt to deactivate uninitiated transaction");
- case DEACTREF:
- throw new IllegalStateException("Attempt to deactivate unactivated transaction");
- case FINISHED:
- if (inuse < 0) {
- errorReport("Activation count failure - too many deacts - in finished transaction", null);
- } else {
- inuse--;
- }
- break;
- case FAILED:
- // Nothing to do here.
- break;
- }
- } finally {
- releaseActivationMutex();
+ }
+ break;
+ case FAILED:
+ // Nothing to do here.
+ break;
}
} catch (MulgaraTransactionException em) {
throw em;
@@ -246,161 +231,204 @@
// references a transaction object that won't be started/activated
// until it is first used.
public void reference() throws MulgaraTransactionException {
+ report("Referencing Transaction");
+
+ acquireMutex(0, false, MulgaraTransactionException.class);
try {
- report("Referencing Transaction");
-
- synchronized (this) {
+ try {
if (currentThread != null && !currentThread.equals(Thread.currentThread())) {
throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
}
- }
- switch (state) {
- case CONSTRUCTEDUNREF:
- state = State.CONSTRUCTEDREF;
- break;
- case ACTREF:
- case ACTUNREF:
- using++;
- state = State.ACTREF;
- break;
- case DEACTREF:
- using++;
- break;
- case CONSTRUCTEDREF:
- throw new MulgaraTransactionException("Attempt to reference uninitated transaction twice");
- case FINISHED:
- throw new MulgaraTransactionException("Attempt to reference terminated transaction");
- case FAILED:
- throw new MulgaraTransactionException("Attempt to reference failed transaction", rollbackCause);
+ switch (state) {
+ case CONSTRUCTEDUNREF:
+ state = State.CONSTRUCTEDREF;
+ break;
+ case ACTREF:
+ case ACTUNREF:
+ using++;
+ state = State.ACTREF;
+ break;
+ case DEACTREF:
+ using++;
+ break;
+ case CONSTRUCTEDREF:
+ throw new MulgaraTransactionException("Attempt to reference uninitated transaction twice");
+ case FINISHED:
+ throw new MulgaraTransactionException("Attempt to reference terminated transaction");
+ case FAILED:
+ throw new MulgaraTransactionException("Attempt to reference failed transaction", rollbackCause);
+ }
+ } catch (MulgaraTransactionException em) {
+ throw em;
+ } catch (Throwable th) {
+ report("Error referencing transaction");
+ throw implicitRollback(th);
+ } finally {
+ report("Leaving Reference Transaction");
}
- } catch (MulgaraTransactionException em) {
- throw em;
- } catch (Throwable th) {
- report("Error referencing transaction");
- throw implicitRollback(th);
} finally {
- report("Leaving Reference Transaction");
+ releaseMutex();
}
}
public void dereference() throws MulgaraTransactionException {
report("Dereferencing Transaction");
+
+ acquireMutex(0, false, MulgaraTransactionException.class);
try {
- synchronized (this) {
- if (currentThread != null && !currentThread.equals(Thread.currentThread())) {
- throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
+ try {
+ synchronized (this) {
+ if (currentThread != null && !currentThread.equals(Thread.currentThread())) {
+ throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
+ }
}
- }
- switch (state) {
- case ACTREF:
- if (using == 1) {
- state = State.ACTUNREF;
- }
- using--;
- break;
- case CONSTRUCTEDREF:
- state = State.CONSTRUCTEDUNREF;
- break;
- case FINISHED:
- case FAILED:
- if (using < 1) {
- errorReport("Reference count failure - too many derefs - in finished transaction", null);
- } else {
+ switch (state) {
+ case ACTREF:
+ if (using == 1) {
+ state = State.ACTUNREF;
+ }
using--;
- }
- break;
- case ACTUNREF:
- throw new IllegalStateException("Attempt to dereference unreferenced transaction");
- case CONSTRUCTEDUNREF:
- throw new MulgaraTransactionException("Attempt to dereference uninitated transaction");
- case DEACTREF:
- throw new IllegalStateException("Attempt to dereference deactivated transaction");
+ break;
+ case CONSTRUCTEDREF:
+ state = State.CONSTRUCTEDUNREF;
+ break;
+ case FINISHED:
+ case FAILED:
+ if (using < 1) {
+ errorReport("Reference count failure - too many derefs - in finished transaction", null);
+ } else {
+ using--;
+ }
+ break;
+ case ACTUNREF:
+ throw new IllegalStateException("Attempt to dereference unreferenced transaction");
+ case CONSTRUCTEDUNREF:
+ throw new MulgaraTransactionException("Attempt to dereference uninitated transaction");
+ case DEACTREF:
+ throw new IllegalStateException("Attempt to dereference deactivated transaction");
+ }
+ } catch (MulgaraTransactionException em) {
+ throw em;
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ } finally {
+ report("Dereferenced Transaction");
}
- } catch (MulgaraTransactionException em) {
- throw em;
- } catch (Throwable th) {
- throw implicitRollback(th);
} finally {
- report("Dereferenced Transaction");
+ releaseMutex();
}
}
private void startTransaction() throws MulgaraTransactionException {
report("Initiating transaction");
+ acquireMutex(0, false, MulgaraTransactionException.class);
try {
- transaction = factory.transactionStart(this);
- synchronized (this) {
- currentThread = Thread.currentThread();
+ try {
+ transaction = factory.transactionStart(this);
+ synchronized (this) {
+ currentThread = Thread.currentThread();
+ }
+ } catch (Throwable th) {
+ throw abortTransaction("Failed to start transaction", th);
}
- } catch (Throwable th) {
- throw abortTransaction("Failed to start transaction", th);
+ } finally {
+ releaseMutex();
}
}
private void resumeTransaction() throws MulgaraTransactionException {
// report("Resuming transaction");
+ acquireMutex(0, false, MulgaraTransactionException.class);
try {
- factory.transactionResumed(this, transaction);
- synchronized (this) {
- currentThread = Thread.currentThread();
+ try {
+ factory.transactionResumed(this, transaction);
+ synchronized (this) {
+ currentThread = Thread.currentThread();
+ }
+ } catch (Throwable th) {
+ throw abortTransaction("Failed to resume transaction", th);
}
- } catch (Throwable th) {
- throw abortTransaction("Failed to resume transaction", th);
+ } finally {
+ releaseMutex();
}
}
private void suspendTransaction() throws MulgaraTransactionException {
// report("Suspending Transaction");
+ acquireMutex(0, false, MulgaraTransactionException.class);
try {
- if (using < 1) {
- throw implicitRollback(
- new MulgaraTransactionException("Attempt to suspend unreferenced transaction"));
+ try {
+ if (using < 1) {
+ throw implicitRollback(
+ new MulgaraTransactionException("Attempt to suspend unreferenced transaction"));
+ }
+ transaction = factory.transactionSuspended(this);
+ synchronized (this) {
+ currentThread = null;
+ }
+ state = State.DEACTREF;
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ } finally {
+// report("Finished suspending transaction");
}
- transaction = factory.transactionSuspended(this);
- synchronized (this) {
- currentThread = null;
- }
- state = State.DEACTREF;
- } catch (Throwable th) {
- throw implicitRollback(th);
} finally {
-// report("Finished suspending transaction");
+ releaseMutex();
}
}
public void commitTransaction() throws MulgaraTransactionException {
report("Committing Transaction");
+ acquireMutex(0, true, MulgaraTransactionException.class);
try {
- transaction.commit();
- } catch (Throwable th) {
- throw implicitRollback(th);
- }
- try {
try {
- transaction = null;
- } finally { try {
- state = State.FINISHED;
- } finally { try {
- context.clear();
- } finally { try {
- enlisted.clear();
- } finally { try {
- factory.transactionComplete(this);
- } finally { try {
- factory = null;
- } finally {
- report("Committed transaction");
- } } } } } }
- } catch (Throwable th) {
- errorReport("Error cleaning up transaction post-commit", th);
- throw new MulgaraTransactionException("Error cleaning up transaction post-commit", th);
+ transaction.commit();
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ }
+ try {
+ try {
+ transaction = null;
+ } finally { try {
+ state = State.FINISHED;
+ } finally { try {
+ context.clear();
+ } finally { try {
+ enlisted.clear();
+ } finally { try {
+ factory.transactionComplete(this);
+ } finally { try {
+ //FIXME: factory = null;
+ } finally {
+ report("Committed transaction");
+ } } } } } }
+ } catch (Throwable th) {
+ errorReport("Error cleaning up transaction post-commit", th);
+ throw new MulgaraTransactionException("Error cleaning up transaction post-commit", th);
+ }
+ } finally {
+ releaseMutex();
}
}
public void heuristicRollback(String cause) throws MulgaraTransactionException {
- implicitRollback(new MulgaraTransactionException(cause));
+ synchronized (factory.getMutexLock()) {
+ if (factory.getMutexHolder() != null && factory.getMutexHolder() != Thread.currentThread()) {
+ if (inXACompletion) {
+ return; // this txn is already being cleaned up, so let it go
+ }
+ }
+
+ acquireMutex(10*1000L, true, MulgaraTransactionException.class); // 10 seconds
+ }
+
+ try {
+ implicitRollback(new MulgaraTransactionException(cause));
+ } finally {
+ releaseMutex();
+ }
}
/**
@@ -409,41 +437,44 @@
* after all we requested it.
*/
public void explicitRollback() throws MulgaraTransactionException {
- synchronized (this) {
+ acquireMutex(0, true, MulgaraTransactionException.class);
+ try {
if (currentThread == null) {
throw new MulgaraTransactionException("Transaction failed activation check");
} else if (!currentThread.equals(Thread.currentThread())) {
throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
}
- }
- try {
- switch (state) {
- case ACTUNREF:
- case ACTREF:
- transaction.rollback();
- context.clear();
- enlisted.clear();
- factory.transactionComplete(this);
- transaction = null;
- factory = null;
- state = State.FINISHED;
- break;
- case DEACTREF:
- throw new IllegalStateException("Attempt to rollback unactivated transaction");
- case CONSTRUCTEDREF:
- throw new MulgaraTransactionException("Attempt to rollback uninitiated ref'd transaction");
- case CONSTRUCTEDUNREF:
- throw new MulgaraTransactionException("Attempt to rollback uninitiated unref'd transaction");
- case FINISHED:
- throw new MulgaraTransactionException("Attempt to rollback finished transaction");
- case FAILED:
- throw new MulgaraTransactionException("Attempt to rollback failed transaction");
+ try {
+ switch (state) {
+ case ACTUNREF:
+ case ACTREF:
+ transaction.rollback();
+ context.clear();
+ enlisted.clear();
+ factory.transactionComplete(this);
+ transaction = null;
+ //FIXME: factory = null;
+ state = State.FINISHED;
+ break;
+ case DEACTREF:
+ throw new IllegalStateException("Attempt to rollback unactivated transaction");
+ case CONSTRUCTEDREF:
+ throw new MulgaraTransactionException("Attempt to rollback uninitiated ref'd transaction");
+ case CONSTRUCTEDUNREF:
+ throw new MulgaraTransactionException("Attempt to rollback uninitiated unref'd transaction");
+ case FINISHED:
+ throw new MulgaraTransactionException("Attempt to rollback finished transaction");
+ case FAILED:
+ throw new MulgaraTransactionException("Attempt to rollback failed transaction");
+ }
+ } catch (MulgaraTransactionException em) {
+ throw em;
+ } catch (Throwable th) {
+ throw implicitRollback(th);
}
- } catch (MulgaraTransactionException em) {
- throw em;
- } catch (Throwable th) {
- throw implicitRollback(th);
+ } finally {
+ releaseMutex();
}
}
@@ -456,16 +487,18 @@
* from abortTransaction().
* Post-condition: The transaction is terminated and cleaned up.
*/
- MulgaraTransactionException implicitRollback(Throwable cause) throws MulgaraTransactionException {
+ private MulgaraTransactionException implicitRollback(Throwable cause) throws MulgaraTransactionException {
+ report("Implicit Rollback triggered");
+
+ synchronized (factory.getMutexLock()) {
+ inXACompletion = true;
+ }
+
try {
- report("Implicit Rollback triggered");
-
- synchronized (this) {
- if (currentThread == null) {
- throw new MulgaraTransactionException("Transaction not associated with thread");
- } else if (!currentThread.equals(Thread.currentThread())) {
- throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
- }
+ if (currentThread == null) {
+ throw new MulgaraTransactionException("Transaction not associated with thread");
+ } else if (!currentThread.equals(Thread.currentThread())) {
+ throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
}
if (rollbackCause != null) {
@@ -485,7 +518,7 @@
enlisted.clear();
state = State.FAILED;
factory.transactionAborted(this);
- factory = null;
+ //FIXME: factory = null;
return new MulgaraTransactionException("Transaction rollback triggered", cause);
case DEACTREF:
throw new IllegalStateException("Attempt to rollback deactivated transaction");
@@ -522,36 +555,41 @@
*/
public MulgaraTransactionException abortTransaction(String errorMessage, Throwable cause)
throws MulgaraTransactionException {
- // We need to notify the factory here - this is serious, we
- // need to rollback this transaction, but if we have reached here
- // we have failed to obtain a valid transaction to rollback!
+ acquireMutex(10*1000L, true, MulgaraTransactionException.class); // 10 seconds
try {
+ // We need to notify the factory here - this is serious, we
+ // need to rollback this transaction, but if we have reached here
+ // we have failed to obtain a valid transaction to rollback!
try {
- errorReport(errorMessage + " - Aborting", cause);
- } finally { try {
- if (transaction != null) {
- transaction.rollback();
- }
- } finally { try {
- factory.transactionAborted(this);
- } finally { try {
- abortEnlistedResources();
- } finally { try {
- context.clear();
- } finally { try {
- enlisted.clear();
- } finally { try {
- transaction = null;
- } finally { try {
- factory = null;
+ try {
+ errorReport(errorMessage + " - Aborting", cause);
+ } finally { try {
+ if (transaction != null) {
+ transaction.rollback();
+ }
+ } finally { try {
+ factory.transactionAborted(this);
+ } finally { try {
+ abortEnlistedResources();
+ } finally { try {
+ context.clear();
+ } finally { try {
+ enlisted.clear();
+ } finally { try {
+ transaction = null;
+ } finally { try {
+ //FIXME: factory = null;
+ } finally {
+ state = State.FAILED;
+ } } } } } } } }
+ return new MulgaraTransactionException(errorMessage + " - Aborting", cause);
+ } catch (Throwable th) {
+ throw new MulgaraTransactionException(errorMessage + " - Failed to abort cleanly", th);
} finally {
- state = State.FAILED;
- } } } } } } } }
- return new MulgaraTransactionException(errorMessage + " - Aborting", cause);
- } catch (Throwable th) {
- throw new MulgaraTransactionException(errorMessage + " - Failed to abort cleanly", th);
+ report("Leaving abortTransaction");
+ }
} finally {
- report("Leaving abortTransaction");
+ releaseMutex();
}
}
@@ -572,91 +610,112 @@
public void execute(Operation operation, DatabaseMetadata metadata) throws MulgaraTransactionException {
report("Executing Operation");
+
+ acquireMutex(0, false, MulgaraTransactionException.class);
try {
- activate();
try {
- operation.execute(context,
- context.getSystemResolver(),
- metadata);
- } catch (Throwable th) {
- throw implicitRollback(th);
+ activate();
+ try {
+ operation.execute(context,
+ context.getSystemResolver(),
+ metadata);
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ } finally {
+ deactivate();
+ }
} finally {
- deactivate();
+ report("Executed Operation");
}
} finally {
- report("Executed Operation");
+ releaseMutex();
}
}
public AnswerOperationResult execute(AnswerOperation ao) throws TuplesException {
debugReport("Executing AnswerOperation");
+
+ acquireMutex(0, false, TuplesException.class);
try {
- activate();
try {
- ao.execute();
- return ao.getResult();
- } catch (Throwable th) {
- throw implicitRollback(th);
+ activate();
+ try {
+ ao.execute();
+ return ao.getResult();
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ } finally {
+ deactivate();
+ }
+ } catch (MulgaraTransactionException em) {
+ throw new TuplesException("Transaction error", em);
} finally {
- deactivate();
+ debugReport("Executed AnswerOperation");
}
- } catch (MulgaraTransactionException em) {
- throw new TuplesException("Transaction error", em);
} finally {
- debugReport("Executed AnswerOperation");
+ releaseMutex();
}
}
public void execute(TransactionOperation to) throws MulgaraTransactionException {
report("Executing TransactionOperation");
+
+ acquireMutex(0, false, MulgaraTransactionException.class);
try {
- activate();
try {
- to.execute();
- } catch (Throwable th) {
- throw implicitRollback(th);
+ activate();
+ try {
+ to.execute();
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ } finally {
+ deactivate();
+ }
} finally {
- deactivate();
+ report("Executed TransactionOperation");
}
} finally {
- report("Executed TransactionOperation");
+ releaseMutex();
}
}
public void enlist(EnlistableResource enlistable) throws MulgaraTransactionException {
+ acquireMutex(0, false, MulgaraTransactionException.class);
try {
- synchronized (this) {
+ try {
if (currentThread == null) {
throw new MulgaraTransactionException("Transaction not associated with thread");
} else if (!currentThread.equals(Thread.currentThread())) {
throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
}
- }
- if (enlisted.contains(enlistable)) {
- return;
- }
+ if (enlisted.contains(enlistable)) {
+ return;
+ }
- switch (state) {
- case ACTUNREF:
- case ACTREF:
- transaction.enlistResource(enlistable.getXAResource());
- enlisted.add(enlistable);
- break;
- case CONSTRUCTEDREF:
- throw new MulgaraTransactionException("Attempt to enlist resource in uninitated ref'd transaction");
- case CONSTRUCTEDUNREF:
- throw new MulgaraTransactionException("Attempt to enlist resource in uninitated unref'd transaction");
- case DEACTREF:
- throw new MulgaraTransactionException("Attempt to enlist resource in unactivated transaction");
- case FINISHED:
- throw new MulgaraTransactionException("Attempt to enlist resource in finished transaction");
- case FAILED:
- throw new MulgaraTransactionException("Attempt to enlist resource in failed transaction");
+ switch (state) {
+ case ACTUNREF:
+ case ACTREF:
+ transaction.enlistResource(enlistable.getXAResource());
+ enlisted.add(enlistable);
+ break;
+ case CONSTRUCTEDREF:
+ throw new MulgaraTransactionException("Attempt to enlist resource in uninitated ref'd transaction");
+ case CONSTRUCTEDUNREF:
+ throw new MulgaraTransactionException("Attempt to enlist resource in uninitated unref'd transaction");
+ case DEACTREF:
+ throw new MulgaraTransactionException("Attempt to enlist resource in unactivated transaction");
+ case FINISHED:
+ throw new MulgaraTransactionException("Attempt to enlist resource in finished transaction");
+ case FAILED:
+ throw new MulgaraTransactionException("Attempt to enlist resource in failed transaction");
+ }
+ } catch (Throwable th) {
+ throw implicitRollback(th);
}
- } catch (Throwable th) {
- throw implicitRollback(th);
+ } finally {
+ releaseMutex();
}
}
@@ -669,12 +728,10 @@
//
private void checkActivated() throws MulgaraTransactionException {
- synchronized (this) {
- if (currentThread == null) {
- throw new MulgaraTransactionException("Transaction not associated with thread");
- } else if (!currentThread.equals(Thread.currentThread())) {
- throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
- }
+ if (currentThread == null) {
+ throw new MulgaraTransactionException("Transaction not associated with thread");
+ } else if (!currentThread.equals(Thread.currentThread())) {
+ throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
}
switch (state) {
@@ -697,12 +754,15 @@
}
}
- private void acquireActivationMutex() {
- activationMutex.lock();
+ private <T extends Throwable> void acquireMutex(long timeout, boolean isXACompletion, Class<T> exc) throws T {
+ synchronized (factory.getMutexLock()) {
+ factory.acquireMutex(timeout, exc);
+ inXACompletion = isXACompletion;
+ }
}
- private void releaseActivationMutex() {
- activationMutex.unlock();
+ private void releaseMutex() {
+ factory.releaseMutex();
}
protected void finalize() {
@@ -720,7 +780,8 @@
errorReport("Reference counting error in transaction", null);
}
- if (factory != null || transaction != null) {
+ //FIXME: if (factory != null || transaction != null) {
+ if (transaction != null) {
errorReport("Transaction not terminated properly", null);
}
}
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java 2008-07-07 12:54:12 UTC (rev 1059)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java 2008-07-07 12:54:19 UTC (rev 1060)
@@ -87,7 +87,7 @@
}
public MulgaraTransaction getTransaction(boolean write) throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
try {
if (explicitXA != null) {
return explicitXA;
@@ -96,20 +96,11 @@
try {
MulgaraInternalTransaction transaction;
if (write) {
- // We must release the mutex so that if the manager blocks trying to acquire the write
- // lock then the client holding it can still enter here and release the lock (i.e. we
- // end up acquiring two sets of mutexes, this one and the one in the manager, but the
- // wait() in the manager only releases the second mutex, hence we must release this one
- // ourselves first).
- runWithoutMutex(new TransactionOperation() {
- public void execute() throws MulgaraTransactionException {
- manager.obtainWriteLock(session);
- }
- });
+ manager.obtainWriteLock(session);
try {
assert writeTransaction == null;
- writeTransaction = transaction =
- new MulgaraInternalTransaction(this, session.newOperationContext(true));
+ setWriteTransaction(transaction =
+ new MulgaraInternalTransaction(this, session.newOperationContext(true)));
} catch (Throwable th) {
manager.releaseWriteLock(session);
throw new MulgaraTransactionException("Error creating write transaction", th);
@@ -138,7 +129,7 @@
public void commit() throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
try {
if (isFailed) {
throw new MulgaraTransactionException("Attempting to commit failed session");
@@ -166,7 +157,7 @@
* This needs to be distinguished from an implicit rollback triggered by failure.
*/
public void rollback() throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
try {
if (manager.isHoldingWriteLock(session)) {
manager.reserveWriteLock(session);
@@ -205,7 +196,7 @@
}
public void setAutoCommit(boolean autoCommit) throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
try {
if (manager.isHoldingWriteLock(session) && isFailed) {
writeTransaction.abortTransaction("Session failed and still holding writeLock",
@@ -221,16 +212,10 @@
if (manager.isHoldingWriteLock(session)) {
// Within active transaction - commit and finalise.
try {
- // Release mutex to avoid blocking everybody while the commit is in progress,
- // since this can be a lengthy operations (several minutes)
- runWithoutMutex(new TransactionOperation() {
+ writeTransaction.execute(new TransactionOperation() {
public void execute() throws MulgaraTransactionException {
- writeTransaction.execute(new TransactionOperation() {
- public void execute() throws MulgaraTransactionException {
- writeTransaction.dereference();
- ((MulgaraInternalTransaction)writeTransaction).commitTransaction();
- }
- });
+ writeTransaction.dereference();
+ ((MulgaraInternalTransaction)writeTransaction).commitTransaction();
}
});
} finally {
@@ -278,7 +263,7 @@
//
public Transaction transactionStart(MulgaraTransaction transaction) throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
try {
try {
logger.info("Beginning Transaction");
@@ -306,7 +291,7 @@
public void transactionResumed(MulgaraTransaction transaction, Transaction jtaXA)
throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
try {
if (activeTransactions.get(Thread.currentThread()) != null) {
throw new MulgaraTransactionException(
@@ -328,7 +313,7 @@
public Transaction transactionSuspended(MulgaraTransaction transaction)
throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
try {
try {
if (transaction != activeTransactions.get(Thread.currentThread())) {
@@ -361,7 +346,7 @@
}
public void closingSession() throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
try {
try {
super.closingSession();
@@ -374,7 +359,7 @@
}
public void transactionComplete(MulgaraTransaction transaction) throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
try {
super.transactionComplete(transaction);
@@ -383,7 +368,7 @@
if (transaction == writeTransaction) {
if (manager.isHoldingWriteLock(session)) {
manager.releaseWriteLock(session);
- writeTransaction = null;
+ setWriteTransaction(null);
}
}
@@ -395,7 +380,7 @@
}
public void transactionAborted(MulgaraTransaction transaction) {
- acquireMutex();
+ acquireMutex(0, RuntimeException.class);
try {
try {
// Make sure this cleans up the transaction metadata - this transaction is DEAD!
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java 2008-07-07 12:54:12 UTC (rev 1059)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java 2008-07-07 12:54:19 UTC (rev 1060)
@@ -27,7 +27,6 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.locks.ReentrantLock;
// Third party packages
import org.apache.log4j.Logger;
@@ -73,21 +72,30 @@
* Contains a reference to the current writing transaction IFF it is managed
* by this factory. If there is no current writing transaction, or if the
* writing transaction is managed by a different factory then it is null.
+ *
+ * This field must only be modified while holding the mutexLock.
*/
protected MulgaraTransaction writeTransaction;
- private ReentrantLock mutex;
+ private Thread mutexHolder;
+ private int lockCnt;
protected MulgaraTransactionFactory(DatabaseSession session, MulgaraTransactionManager manager) {
this.session = session;
this.manager = manager;
- this.mutex = new ReentrantLock();
+ this.mutexHolder = null;
+ this.lockCnt = 0;
this.writeTransaction = null;
this.reaperTimer = new Timer("Write-lock Reaper", true);
this.idleReaperTask = new IdleReaper(reaperTimer, 10*1000L); // check every 10 seconds
}
+ protected void setWriteTransaction(MulgaraTransaction txn) {
+ synchronized (getMutexLock()) {
+ writeTransaction = txn;
+ }
+ }
protected void transactionCreated(MulgaraTransaction transaction) {
if (transaction != writeTransaction)
@@ -124,7 +132,7 @@
* Will only abort the transaction if the rollback attempt fails.
*/
public void closingSession() throws MulgaraTransactionException {
- acquireMutex();
+ acquireMutex(0, MulgaraTransactionException.class);
logger.debug("Cleaning up any stale transactions on session close");
try {
Map<MulgaraTransaction, Throwable> requiresAbort = new HashMap<MulgaraTransaction, Throwable>();
@@ -148,7 +156,7 @@
error = th;
}
} finally {
- writeTransaction = null;
+ setWriteTransaction(null);
}
}
} finally {
@@ -231,40 +239,88 @@
}
}
- /**
- * Used to replace the built in monitor to allow it to be properly released
- * during potentially blocking operations. All potentially blocking
- * operations involve writes, so in these cases the write-lock is reserved
- * allowing the mutex to be safely released and then reobtained after the
- * blocking operation concludes.
+ /**
+ * Acquire the mutex. The mutex is re-entrant, but {@link #releaseMutex} must be called as many
+ * times as this is called.
+ *
+ * <p>We use our own implementation here (as opposed to, say, java.util.concurrent.Lock) so we
+ * can reliably get the current mutex-owner, and we use a lock around the mutex acquisition and
+ * release to do atomic tests and setting of additional variables associated with the mutex.
+ *
+ * @param timeout how many milliseconds to wait for the mutex, or 0 to wait indefinitely
+ * @param exc the class of the exception to throw on failure
+ * @throws T if the mutex could not be acquired, either due to a timeout or due to an
+ * interrupt
*/
- protected void acquireMutex() {
- mutex.lock();
+ public <T extends Throwable> void acquireMutex(long timeout, Class<T> exc) throws T {
+ synchronized (getMutexLock()) {
+ if (mutexHolder == Thread.currentThread()) {
+ lockCnt++;
+ return;
+ }
+
+ long deadline = System.currentTimeMillis() + timeout;
+
+ while (mutexHolder != null) {
+ long wait = deadline - System.currentTimeMillis();
+ if (timeout == 0) {
+ wait = 0;
+ } else if (wait <= 0) {
+ throw newException(exc, "Timed out waiting to acquire lock");
+ }
+
+ try {
+ getMutexLock().wait(wait);
+ } catch (InterruptedException ie) {
+ throw exc.cast(newException(exc, "Interrupted while waiting to acquire lock").initCause(ie));
+ }
+ }
+
+ mutexHolder = Thread.currentThread();
+ lockCnt = 1;
+ }
}
+ /**
+ * Release the mutex.
+ *
+ * @throws IllegalStateException if the mutex is not held by the current thread
+ */
+ public void releaseMutex() {
+ synchronized (getMutexLock()) {
+ if (Thread.currentThread() != mutexHolder) {
+ throw new IllegalStateException("Attempt to release mutex without holding mutex");
+ }
- protected void releaseMutex() {
- if (!mutex.isHeldByCurrentThread()) {
- throw new IllegalStateException("Attempt to release mutex without holding mutex");
+ assert lockCnt > 0;
+ if (--lockCnt <= 0) {
+ mutexHolder = null;
+ getMutexLock().notify();
+ }
}
+ }
- mutex.unlock();
+ /**
+ * @return the lock used to protect access to and to implement the mutex; must be held as briefly
+ * as possible (no blocking operations)
+ */
+ public Object getMutexLock() {
+ return session;
}
- protected void runWithoutMutex(TransactionOperation proc) throws MulgaraTransactionException {
- if (!mutex.isHeldByCurrentThread()) {
- throw new IllegalStateException("Attempt to run procedure without holding mutex");
- }
- int holdCount = mutex.getHoldCount();
- for (int i = 0; i < holdCount; i++) {
- mutex.unlock();
- }
+ /**
+ * @return the current holder of the mutex, or null if none. Must hold the mutex-lock while
+ * calling this.
+ */
+ public Thread getMutexHolder() {
+ return mutexHolder;
+ }
+
+ private static <T> T newException(Class<T> exc, String msg) {
try {
- proc.execute();
- } finally {
- for (int i = 0; i < holdCount; i++) {
- mutex.lock();
- }
+ return exc.getConstructor(String.class).newInstance(msg);
+ } catch (Exception e) {
+ throw new Error("Internal error creating " + exc, e);
}
}
@@ -276,14 +332,18 @@
public void run() {
logger.debug("Idle-reaper running");
- acquireMutex();
+ MulgaraTransaction txn;
+ synchronized (getMutexLock()) {
+ txn = writeTransaction;
+ }
+ long lastActive = (txn != null) ? txn.lastActive() : 0;
+
try {
- if (writeTransaction != null) {
- long lastActive = writeTransaction.lastActive();
+ if (txn != null) {
if ((lastActive > 0) && (lastActive < System.currentTimeMillis() - idleTimeout)) {
logger.warn("Reclaiming writelock from inactive transaction");
- writeTransaction.heuristicRollback("Transaction idle-timeout");
+ txn.heuristicRollback("Transaction idle-timeout");
} else {
if (logger.isDebugEnabled())
logger.debug("Transaction still active: " + lastActive + " time: " + System.currentTimeMillis() + " timeout: " + idleTimeout);
@@ -293,8 +353,6 @@
}
} catch (MulgaraTransactionException em) {
logger.warn("Exception thrown while reclaiming writelock from inactive transaction");
- } finally {
- releaseMutex();
}
}
}
@@ -307,26 +365,19 @@
timer.schedule(this, timeoutMillis);
}
- public boolean cancel() {
- die = true;
- return super.cancel();
- }
-
public void run() {
- acquireMutex();
+ MulgaraTransaction txn;
+ synchronized (getMutexLock()) {
+ txn = writeTransaction;
+ }
try {
- if (die)
- return; // we were cancelled after being scheduled but before mutex was acquired
-
- if (writeTransaction != null) {
+ if (txn != null) {
logger.warn("Reclaiming writelock from over-extended transaction");
- writeTransaction.heuristicRollback("Transaction timed out");
+ txn.heuristicRollback("Transaction timed out");
}
} catch (MulgaraTransactionException em) {
logger.warn("Exception thrown while reclaiming writelock from over-extended transaction", em);
- } finally {
- releaseMutex();
}
}
}
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java 2008-07-07 12:54:12 UTC (rev 1059)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java 2008-07-07 12:54:19 UTC (rev 1060)
@@ -169,11 +169,21 @@
}
boolean writeLockReserved() {
- return sessionReservingWriteLock != null;
+ acquireMutex();
+ try {
+ return sessionReservingWriteLock != null;
+ } finally {
+ releaseMutex();
+ }
}
boolean writeLockReserved(DatabaseSession session) {
- return session == sessionReservingWriteLock;
+ acquireMutex();
+ try {
+ return session == sessionReservingWriteLock;
+ } finally {
+ releaseMutex();
+ }
}
void releaseReserve(DatabaseSession session) {
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResourceContext.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResourceContext.java 2008-07-07 12:54:12 UTC (rev 1059)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResourceContext.java 2008-07-07 12:54:19 UTC (rev 1060)
@@ -24,7 +24,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.locks.ReentrantLock;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -79,8 +78,6 @@
private final Assoc1toNMap<MulgaraExternalTransaction, Xid> xa2xid;
- private final ReentrantLock mutex;
-
private final UUID uniqueId;
MulgaraXAResourceContext(MulgaraExternalTransactionFactory factory, DatabaseSession session) {
@@ -88,7 +85,6 @@
this.factory = factory;
this.session = session;
this.xa2xid = new Assoc1toNMap<MulgaraExternalTransaction, Xid>();
- this.mutex = new ReentrantLock();
this.uniqueId = UUID.randomUUID();
}
@@ -114,7 +110,7 @@
* for a call to forget().
*/
public void commit(Xid xid, boolean onePhase) throws XAException {
- acquireMutex();
+ factory.acquireMutex(0, XAException.class);
try {
xid = convertXid(xid);
logger.info("Performing commit: " + parseXid(xid));
@@ -167,7 +163,7 @@
}
}
} finally {
- releaseMutex();
+ factory.releaseMutex();
}
}
@@ -181,7 +177,7 @@
* In all cases disassociate from current session.
*/
public void end(Xid xid, int flags) throws XAException {
- acquireMutex();
+ factory.acquireMutex(0, XAException.class);
try {
xid = convertXid(xid);
logger.info("Performing end(" + formatFlags(flags) + "): " + parseXid(xid));
@@ -214,12 +210,12 @@
throw new XAException(XAException.XAER_PROTO);
}
} finally {
- releaseMutex();
+ factory.releaseMutex();
}
}
public void forget(Xid xid) throws XAException {
- acquireMutex();
+ factory.acquireMutex(0, XAException.class);
try {
xid = convertXid(xid);
logger.info("Performing forget: " + parseXid(xid));
@@ -240,22 +236,22 @@
xa2xid.remove1(xa);
}
} finally {
- releaseMutex();
+ factory.releaseMutex();
}
}
- public int getTransactionTimeout() {
- acquireMutex();
+ public int getTransactionTimeout() throws XAException {
+ factory.acquireMutex(0, XAException.class);
try {
logger.info("Performing getTransactionTimeout");
return (int) (session.getTransactionTimeout() / 1000);
} finally {
- releaseMutex();
+ factory.releaseMutex();
}
}
- public boolean isSameRM(XAResource xares) {
- acquireMutex();
+ public boolean isSameRM(XAResource xares) throws XAException {
+ factory.acquireMutex(0, XAException.class);
try {
logger.info("Performing isSameRM");
if (!xares.getClass().equals(MulgaraXAResource.class)) {
@@ -267,13 +263,13 @@
return session == ((MulgaraXAResource)xares).getSession();
}
} finally {
- releaseMutex();
+ factory.releaseMutex();
}
}
public int prepare(Xid xid) throws XAException {
- acquireMutex();
+ factory.acquireMutex(0, XAException.class);
try {
xid = convertXid(xid);
logger.info("Performing prepare: " + parseXid(xid));
@@ -288,7 +284,7 @@
return XA_OK;
} finally {
- releaseMutex();
+ factory.releaseMutex();
}
}
@@ -297,19 +293,19 @@
* FIXME: We should at least handle the case where we are asked to recover
* when we haven't crashed.
*/
- public Xid[] recover(int flag) {
- acquireMutex();
+ public Xid[] recover(int flag) throws XAException {
+ factory.acquireMutex(0, XAException.class);
try {
logger.info("Performing recover");
return new Xid[] {};
} finally {
- releaseMutex();
+ factory.releaseMutex();
}
}
public void rollback(Xid xid) throws XAException {
- acquireMutex();
+ factory.acquireMutex(0, XAException.class);
try {
xid = convertXid(xid);
logger.info("Performing rollback: " + parseXid(xid));
@@ -323,7 +319,7 @@
// transaction. doRollback only throws Heuristic Exceptions.
xa2xid.remove1(xa);
} finally {
- releaseMutex();
+ factory.releaseMutex();
}
}
@@ -346,19 +342,19 @@
if (seconds < 0)
throw new XAException(XAException.XAER_INVAL);
- acquireMutex();
+ factory.acquireMutex(0, XAException.class);
try {
logger.info("Performing setTransactionTimeout");
session.setTransactionTimeout(seconds * 1000L);
return true;
} finally {
- releaseMutex();
+ factory.releaseMutex();
}
}
public void start(Xid xid, int flags) throws XAException {
- acquireMutex();
+ factory.acquireMutex(0, XAException.class);
try {
xid = convertXid(xid);
logger.info("Performing start(" + formatFlags(flags) + "): " + parseXid(xid));
@@ -401,7 +397,7 @@
break;
}
} finally {
- releaseMutex();
+ factory.releaseMutex();
}
}
@@ -417,26 +413,6 @@
private DatabaseSession getSession() { return session; }
}
- /**
- * Used to replace the built in monitor to allow it to be properly released
- * during potentially blocking operations. All potentially blocking
- * operations involve writes, so in these cases the write-lock is reserved
- * allowing the mutex to be safely released and then reobtained after the
- * blocking operation concludes.
- */
- protected void acquireMutex() {
- mutex.lock();
- }
-
-
- protected void releaseMutex() {
- if (!mutex.isHeldByCurrentThread()) {
- throw new IllegalStateException("Attempt to release mutex without holding mutex");
- }
-
- mutex.unlock();
- }
-
public static String parseXid(Xid xid) {
return xid.toString();
}
More information about the Mulgara-svn
mailing list