[Mulgara-svn] r251 - branches/mgr-37/src/jar/resolver/java/org/mulgara/resolver
andrae at mulgara.org
andrae at mulgara.org
Tue Apr 24 12:14:21 UTC 2007
Author: andrae
Date: 2007-04-24 07:14:21 -0500 (Tue, 24 Apr 2007)
New Revision: 251
Modified:
branches/mgr-37/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
Log:
Sustained attempts to split the mutex floundered on lock-ordering.
I still need to double check my reasoning here but I am pretty confident that
the mutex-suspension logic provided by runWithoutMutex(), combined with the
ability to reserve the right to obtain the writeLock, is sufficient to allow the
prepare() in setAutoCommit() to proceed without blocking read-only transactions.
Note: this code compiles, but is untested - testing is waiting on me synching
this with a faster box than my notebook.
Modified: branches/mgr-37/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
===================================================================
--- branches/mgr-37/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java 2007-04-23 01:48:45 UTC (rev 250)
+++ branches/mgr-37/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java 2007-04-24 12:14:21 UTC (rev 251)
@@ -23,6 +23,8 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -75,6 +77,11 @@
private MulgaraTransaction userTransaction;
private boolean autoCommit;
+ private ReentrantLock mutex;
+ private Condition writeLockCondition;
+ private Condition reserveCondition;
+ private Thread reservingThread;
+
/** Set of sessions whose transactions have been rolledback.*/
private Set<Session> failedSessions;
@@ -94,12 +101,16 @@
/** Map of threads to active transactions. */
private Map<Thread, MulgaraTransaction> activeTransactions;
- private TransactionManager transactionManager;
+ private final TransactionManager transactionManager;
public MulgaraTransactionManager(TransactionManagerFactory transactionManagerFactory) {
this.currentWritingSession = null;
this.userTransaction = null;
this.autoCommit = true;
+ this.mutex = new ReentrantLock();
+ this.writeLockCondition = this.mutex.newCondition();
+ this.reserveCondition = this.mutex.newCondition();
+ this.reservingThread = null;
this.failedSessions = new HashSet<Session>();
this.sessions = new HashMap<MulgaraTransaction, Session>();
@@ -119,38 +130,46 @@
* obtain the write-lock, create a new transaction object and return it.</li>
* </ul>
*/
- public synchronized MulgaraTransaction getTransaction(DatabaseSession session, boolean write) throws MulgaraTransactionException {
+ public MulgaraTransaction getTransaction(DatabaseSession session, boolean write) throws MulgaraTransactionException {
+ acquireMutex();
+ try {
+ if (session == currentWritingSession) {
+ return userTransaction;
+ }
- if (session == currentWritingSession) {
- return userTransaction;
- }
+ try {
+ MulgaraTransaction transaction = write ?
+ obtainWriteLock(session) :
+ new MulgaraTransaction(this, session.newOperationContext(false));
- try {
- MulgaraTransaction transaction = write ?
- obtainWriteLock(session) :
- new MulgaraTransaction(this, session.newOperationContext(false));
+ sessions.put(transaction, session);
- sessions.put(transaction, session);
+ if (!transactions.containsKey(session)) {
+ transactions.put(session, new HashSet<MulgaraTransaction>());
+ }
+ transactions.get(session).add(transaction);
- if (!transactions.containsKey(session)) {
- transactions.put(session, new HashSet<MulgaraTransaction>());
+ return transaction;
+ } catch (MulgaraTransactionException em) {
+ throw em;
+ } catch (Exception e) {
+ throw new MulgaraTransactionException("Error creating transaction", e);
}
- transactions.get(session).add(transaction);
-
- return transaction;
- } catch (MulgaraTransactionException em) {
- throw em;
- } catch (Exception e) {
- throw new MulgaraTransactionException("Error creating transaction", e);
+ } finally {
+ releaseMutex();
}
}
- private synchronized MulgaraTransaction obtainWriteLock(DatabaseSession session)
+ /**
+ * Obtains the write lock.
+ * Must hold the mutex on entry - but will drop it while waiting on writeLockCondition.
+ */
+ private MulgaraTransaction obtainWriteLock(DatabaseSession session)
throws MulgaraTransactionException {
- while (currentWritingSession != null) {
+ while (currentWritingSession != null && !writeLockReserved()) {
try {
- this.wait();
+ writeLockCondition.await();
} catch (InterruptedException ei) {
throw new MulgaraTransactionException("Interrupted while waiting for write lock", ei);
}
@@ -166,190 +185,218 @@
}
}
- private synchronized void releaseWriteLock() {
+ private void releaseWriteLock() {
// Calling this method multiple times is safe as the lock cannot be obtained
// between calls as this method is private, and all calling methods are
// synchronized.
currentWritingSession = null;
userTransaction = null;
- this.notify();
+ writeLockCondition.signal();
}
+ public void commit(DatabaseSession session) throws MulgaraTransactionException {
+ acquireMutex();
+ try {
+ reserveWriteLock();
+ if (failedSessions.contains(session)) {
+ throw new MulgaraTransactionException("Attempting to commit failed exception");
+ } else if (session != currentWritingSession) {
+ throw new MulgaraTransactionException(
+ "Attempting to commit while not the current writing transaction");
+ }
- public synchronized void commit(DatabaseSession session) throws MulgaraTransactionException {
- if (failedSessions.contains(session)) {
- throw new MulgaraTransactionException("Attempting to commit failed exception");
- } else if (session != currentWritingSession) {
- throw new MulgaraTransactionException(
- "Attempting to commit while not the current writing transaction");
+ setAutoCommit(session, true);
+ setAutoCommit(session, false);
+ } finally {
+ releaseMutex();
}
-
- setAutoCommit(session, true);
- setAutoCommit(session, false);
}
/**
* This is an explicit, user-specified rollback.
- * This
+ *
* This needs to be distinguished from an implicit rollback triggered by failure.
*/
- public synchronized void rollback(DatabaseSession session) throws MulgaraTransactionException {
- if (session == currentWritingSession) {
- try {
- userTransaction.execute(new TransactionOperation() {
- public void execute() throws MulgaraTransactionException {
- userTransaction.explicitRollback();
+ public void rollback(DatabaseSession session) throws MulgaraTransactionException {
+ acquireMutex();
+ try {
+ reserveWriteLock();
+ if (session == currentWritingSession) {
+ try {
+ userTransaction.execute(new TransactionOperation() {
+ public void execute() throws MulgaraTransactionException {
+ userTransaction.explicitRollback();
+ }
+ });
+ if (userTransaction != null) {
+ // transaction referenced by something - need to explicitly end it.
+ userTransaction.abortTransaction("Rollback failed",
+ new MulgaraTransactionException("Rollback failed to terminate write transaction"));
}
- });
- if (userTransaction != null) {
- // transaction referenced by something - need to explicitly end it.
- userTransaction.abortTransaction("Rollback failed",
- new MulgaraTransactionException("Rollback failed to terminate write transaction"));
+ } finally {
+ failedSessions.add(session);
+ releaseWriteLock();
+ setAutoCommit(session, false);
}
- } finally {
- failedSessions.add(currentWritingSession);
- releaseWriteLock();
+ } else if (failedSessions.contains(session)) {
+ failedSessions.remove(session);
setAutoCommit(session, false);
+ } else {
+ throw new MulgaraTransactionException(
+ "Attempt to rollback while not in the current writing transaction");
}
- } else if (failedSessions.contains(session)) {
- failedSessions.remove(session);
- setAutoCommit(session, false);
- } else {
- throw new MulgaraTransactionException(
- "Attempt to rollback while not in the current writing transaction");
+ } finally {
+ releaseMutex();
}
}
- public synchronized void setAutoCommit(DatabaseSession session, boolean autoCommit)
+ public void setAutoCommit(DatabaseSession session, boolean autoCommit)
throws MulgaraTransactionException {
- if (session == currentWritingSession && failedSessions.contains(session)) {
- userTransaction.abortTransaction("Session failed and transaction not finalized",
- new MulgaraTransactionException("Failed Session in setAutoCommit"));
- }
+ acquireMutex();
+ try {
+ if (session == currentWritingSession && failedSessions.contains(session)) {
+ userTransaction.abortTransaction("Session failed and transaction not finalized",
+ new MulgaraTransactionException("Failed Session in setAutoCommit"));
+ }
- if (session == currentWritingSession || failedSessions.contains(session)) {
- if (autoCommit) {
- // AutoCommit off -> on === branch on current state of transaction.
- if (session == currentWritingSession) {
- // Within active transaction - commit and finalise.
- try {
- userTransaction.execute(new TransactionOperation() {
- public void execute() throws MulgaraTransactionException {
- userTransaction.dereference();
- userTransaction.commitTransaction();
- }
- });
- } finally {
- releaseWriteLock();
- this.autoCommit = true;
+ if (session == currentWritingSession || failedSessions.contains(session)) {
+ if (autoCommit) {
+ // AutoCommit off -> on === branch on current state of transaction.
+ if (session == currentWritingSession) {
+ // Within active transaction - commit and finalise.
+ try {
+ runWithoutMutex(new TransactionOperation() {
+ public void execute() throws MulgaraTransactionException {
+ userTransaction.execute(new TransactionOperation() {
+ public void execute() throws MulgaraTransactionException {
+ userTransaction.dereference();
+ userTransaction.commitTransaction();
+ }
+ });
+ }
+ });
+ } finally {
+ releaseWriteLock();
+ this.autoCommit = true;
+ }
+ } else if (failedSessions.contains(session)) {
+ // Within failed transaction - cleanup.
+ failedSessions.remove(session);
}
- } else if (failedSessions.contains(session)) {
- // Within failed transaction - cleanup.
- failedSessions.remove(session);
+ } else {
+ logger.info("Attempt to set autocommit false twice");
+ // AutoCommit off -> off === no-op. Log info.
}
} else {
- logger.info("Attempt to set autocommit false twice");
- // AutoCommit off -> off === no-op. Log info.
+ if (autoCommit) {
+ // AutoCommit on -> on === no-op. Log info.
+ logger.info("Attempting to set autocommit true without setting it false");
+ } else {
+ // AutoCommit on -> off == Start new transaction.
+ userTransaction = getTransaction(session, true);
+ userTransaction.reference();
+ this.autoCommit = false;
+ }
}
- } else {
- if (autoCommit) {
- // AutoCommit on -> on === no-op. Log info.
- logger.info("Attempting to set autocommit true without setting it false");
- } else {
- // AutoCommit on -> off == Start new transaction.
- userTransaction = getTransaction(session, true);
- userTransaction.reference();
- this.autoCommit = false;
- }
+ } finally {
+ releaseMutex();
}
}
- public synchronized void rollbackCurrentTransactions(Session session)
- throws MulgaraTransactionException {
+ public void rollbackCurrentTransactions(Session session) throws MulgaraTransactionException {
+ acquireMutex();
try {
- if (failedSessions.contains(session)) {
- failedSessions.remove(session);
- return;
- }
+ try {
+ if (failedSessions.contains(session)) {
+ failedSessions.remove(session);
+ return;
+ }
- Throwable error = null;
+ Throwable error = null;
- try {
- if (session == currentWritingSession) {
- logger.warn("Terminating session while holding writelock:" + session +
- ":" + currentWritingSession + ": " + userTransaction);
- userTransaction.execute(new TransactionOperation() {
- public void execute() throws MulgaraTransactionException {
- throw new MulgaraTransactionException("Terminating session while holding writelock");
- }
- });
+ try {
+ if (session == currentWritingSession) {
+ logger.warn("Terminating session while holding writelock:" + session + ": " + userTransaction);
+ userTransaction.execute(new TransactionOperation() {
+ public void execute() throws MulgaraTransactionException {
+ throw new MulgaraTransactionException("Terminating session while holding writelock");
+ }
+ });
+ }
+ } catch (Throwable th) {
+ error = th;
}
- } catch (Throwable th) {
- error = th;
- }
- final Throwable trigger = new MulgaraTransactionException("trigger rollback");
+ final Throwable trigger = new MulgaraTransactionException("trigger rollback");
- if (transactions.containsKey(session)) {
- for (MulgaraTransaction transaction : transactions.get(session)) {
- try {
- transaction.execute(new TransactionOperation() {
- public void execute() throws MulgaraTransactionException {
- throw new MulgaraTransactionException("Rolling back transactions due to session close");
+ if (transactions.containsKey(session)) {
+ for (MulgaraTransaction transaction : transactions.get(session)) {
+ try {
+ transaction.execute(new TransactionOperation() {
+ public void execute() throws MulgaraTransactionException {
+ throw new MulgaraTransactionException("Rolling back transactions due to session close");
+ }
+ });
+ } catch (MulgaraTransactionException em) {
+ // ignore.
+ } catch (Throwable th) {
+ if (error == null) {
+ error = th;
}
- });
- } catch (MulgaraTransactionException em) {
- // ignore.
- } catch (Throwable th) {
- if (error == null) {
- error = th;
}
}
}
- }
- if (error != null) {
- if (error instanceof MulgaraTransactionException) {
- throw (MulgaraTransactionException)error;
- } else {
- throw new MulgaraTransactionException("Error in rollback on session close", error);
+ if (error != null) {
+ if (error instanceof MulgaraTransactionException) {
+ throw (MulgaraTransactionException)error;
+ } else {
+ throw new MulgaraTransactionException("Error in rollback on session close", error);
+ }
}
+ } finally {
+ if (transactions.containsKey(session)) {
+ logger.error("Error in transaction rollback due to session close - aborting");
+ abortCurrentTransactions(session);
+ }
}
} finally {
- if (transactions.containsKey(session)) {
- logger.error("Error in transaction rollback due to session close - aborting");
- abortCurrentTransactions(session);
- }
+ releaseMutex();
}
}
private void abortCurrentTransactions(Session session) throws MulgaraTransactionException {
+ acquireMutex();
try {
- Throwable error = null;
- for (MulgaraTransaction transaction : transactions.get(session)) {
- try {
- transaction.abortTransaction("Transaction still valid on session close", new Throwable());
- } catch (Throwable th) {
+ try {
+ Throwable error = null;
+ for (MulgaraTransaction transaction : transactions.get(session)) {
try {
- if (error == null) {
- error = th;
- }
- } catch (Throwable throw_away) {}
+ transaction.abortTransaction("Transaction still valid on session close", new Throwable());
+ } catch (Throwable th) {
+ try {
+ if (error == null) {
+ error = th;
+ }
+ } catch (Throwable throw_away) {}
+ }
}
- }
- if (error != null) {
- if (error instanceof MulgaraTransactionException) {
- throw (MulgaraTransactionException)error;
- } else {
- throw new MulgaraTransactionException("Error in rollback on session close", error);
+ if (error != null) {
+ if (error instanceof MulgaraTransactionException) {
+ throw (MulgaraTransactionException)error;
+ } else {
+ throw new MulgaraTransactionException("Error in rollback on session close", error);
+ }
}
+ } finally {
+ if (session == currentWritingSession) {
+ logger.error("Failed to abort write-transaction on session close - Server restart required");
+ }
}
} finally {
- if (session == currentWritingSession) {
- logger.error("Failed to abort write-transaction on session close - Server restart required");
- }
+ releaseMutex();
}
}
@@ -357,97 +404,119 @@
// Transaction livecycle callbacks.
//
- public synchronized Transaction transactionStart(MulgaraTransaction transaction)
- throws MulgaraTransactionException {
+ public Transaction transactionStart(MulgaraTransaction transaction) throws MulgaraTransactionException {
+ acquireMutex();
try {
- logger.info("Beginning Transaction");
- if (activeTransactions.get(Thread.currentThread()) != null) {
- throw new MulgaraTransactionException(
- "Attempt to start transaction in thread with exiting active transaction.");
- } else if (activeTransactions.containsValue(transaction)) {
- throw new MulgaraTransactionException("Attempt to start transaction twice");
- }
+ try {
+ logger.info("Beginning Transaction");
+ if (activeTransactions.get(Thread.currentThread()) != null) {
+ throw new MulgaraTransactionException(
+ "Attempt to start transaction in thread with exiting active transaction.");
+ } else if (activeTransactions.containsValue(transaction)) {
+ throw new MulgaraTransactionException("Attempt to start transaction twice");
+ }
- transactionManager.begin();
- Transaction jtaTrans = transactionManager.getTransaction();
+ transactionManager.begin();
+ Transaction jtaTrans = transactionManager.getTransaction();
- activeTransactions.put(Thread.currentThread(), transaction);
+ activeTransactions.put(Thread.currentThread(), transaction);
- return jtaTrans;
- } catch (Exception e) {
- throw new MulgaraTransactionException("Transaction Begin Failed", e);
+ return jtaTrans;
+ } catch (Exception e) {
+ throw new MulgaraTransactionException("Transaction Begin Failed", e);
+ }
+ } finally {
+ releaseMutex();
}
}
- public synchronized void transactionResumed(MulgaraTransaction transaction, Transaction jtaXA)
+ public void transactionResumed(MulgaraTransaction transaction, Transaction jtaXA)
throws MulgaraTransactionException {
- if (activeTransactions.get(Thread.currentThread()) != null) {
- throw new MulgaraTransactionException(
- "Attempt to resume transaction in already activated thread");
- } else if (activeTransactions.containsValue(transaction)) {
- throw new MulgaraTransactionException("Attempt to resume active transaction");
- }
-
+ acquireMutex();
try {
- transactionManager.resume(jtaXA);
- activeTransactions.put(Thread.currentThread(), transaction);
- } catch (Exception e) {
- throw new MulgaraTransactionException("Resume Failed", e);
+ if (activeTransactions.get(Thread.currentThread()) != null) {
+ throw new MulgaraTransactionException(
+ "Attempt to resume transaction in already activated thread");
+ } else if (activeTransactions.containsValue(transaction)) {
+ throw new MulgaraTransactionException("Attempt to resume active transaction");
+ }
+
+ try {
+ transactionManager.resume(jtaXA);
+ activeTransactions.put(Thread.currentThread(), transaction);
+ } catch (Exception e) {
+ throw new MulgaraTransactionException("Resume Failed", e);
+ }
+ } finally {
+ releaseMutex();
}
}
- public synchronized Transaction transactionSuspended(MulgaraTransaction transaction)
+ public Transaction transactionSuspended(MulgaraTransaction transaction)
throws MulgaraTransactionException {
+ acquireMutex();
try {
- if (transaction != activeTransactions.get(Thread.currentThread())) {
- throw new MulgaraTransactionException(
- "Attempt to suspend transaction from outside thread");
- }
+ try {
+ if (transaction != activeTransactions.get(Thread.currentThread())) {
+ throw new MulgaraTransactionException(
+ "Attempt to suspend transaction from outside thread");
+ }
- if (autoCommit && transaction == userTransaction) {
- logger.error("Attempt to suspend write transaction without setting AutoCommit Off");
- throw new MulgaraTransactionException(
- "Attempt to suspend write transaction without setting AutoCommit Off");
- } else {
-// logger.error("Suspended transaction: ac=" + autoCommit + " t=" + transaction + "ut=" + userTransaction);
- }
+ if (autoCommit && transaction == userTransaction) {
+ logger.error("Attempt to suspend write transaction without setting AutoCommit Off");
+ throw new MulgaraTransactionException(
+ "Attempt to suspend write transaction without setting AutoCommit Off");
+ }
- Transaction xa = transactionManager.suspend();
- activeTransactions.remove(Thread.currentThread());
+ Transaction xa = transactionManager.suspend();
+ activeTransactions.remove(Thread.currentThread());
- return xa;
- } catch (Throwable th) {
- logger.error("Attempt to suspend failed", th);
- try {
- transactionManager.setRollbackOnly();
- } catch (Throwable t) {
- logger.error("Attempt to setRollbackOnly() failed", t);
+ return xa;
+ } catch (Throwable th) {
+ logger.error("Attempt to suspend failed", th);
+ try {
+ transactionManager.setRollbackOnly();
+ } catch (Throwable t) {
+ logger.error("Attempt to setRollbackOnly() failed", t);
+ }
+ throw new MulgaraTransactionException("Suspend failed", th);
}
- throw new MulgaraTransactionException("Suspend failed", th);
+ } finally {
+ releaseMutex();
}
}
- public synchronized void transactionComplete(MulgaraTransaction transaction) {
- if (holdsWriteLock(transaction)) {
- releaseWriteLock();
+ public void transactionComplete(MulgaraTransaction transaction) {
+ acquireMutex();
+ try {
+ if (transaction == userTransaction) {
+ releaseWriteLock();
+ }
+
+ activeTransactions.remove(Thread.currentThread());
+ Session session = (Session)sessions.get(transaction);
+ sessions.remove(transaction);
+ transactions.remove(session);
+ } finally {
+ releaseMutex();
}
-
- activeTransactions.remove(Thread.currentThread());
- Session session = (Session)sessions.get(transaction);
- sessions.remove(transaction);
- transactions.remove(session);
}
- public synchronized void transactionAborted(MulgaraTransaction transaction) {
+ public void transactionAborted(MulgaraTransaction transaction) {
+ acquireMutex();
try {
- // Make sure this cleans up the transaction metadata - this transaction is DEAD!
- if (transaction == userTransaction) {
- failedSessions.add(currentWritingSession);
+ try {
+ // Make sure this cleans up the transaction metadata - this transaction is DEAD!
+ if (transaction == userTransaction) {
+ failedSessions.add(currentWritingSession);
+ }
+ transactionComplete(transaction);
+ } catch (Throwable th) {
+ // FIXME: This should probably abort the entire server after logging the error!
+ logger.error("Error managing transaction abort", th);
}
- transactionComplete(transaction);
- } catch (Throwable th) {
- // FIXME: This should probably abort the entire server after logging the error!
- logger.error("Error managing transaction abort", th);
+ } finally {
+ releaseMutex();
}
}
@@ -459,7 +528,59 @@
}
}
- private boolean holdsWriteLock(MulgaraTransaction transaction) {
- return transaction == userTransaction;
+ private void acquireMutex() {
+ mutex.lock();
}
+
+ private void reserveWriteLock() throws MulgaraTransactionException {
+ if (!mutex.isHeldByCurrentThread()) {
+ throw new IllegalStateException("Attempt to set modify without holding mutex");
+ }
+
+ if (Thread.currentThread().equals(reservingThread)) {
+ return;
+ }
+
+ while (reservingThread != null) {
+ try {
+ reserveCondition.await();
+ } catch (InterruptedException ei) {
+ throw new MulgaraTransactionException("Thread interrupted while reserving write lock", ei);
+ }
+ }
+ reservingThread = Thread.currentThread();
+ }
+
+ private boolean writeLockReserved() {
+ return reservingThread == null || Thread.currentThread().equals(reservingThread);
+ }
+
+ private void releaseMutex() {
+ if (!mutex.isHeldByCurrentThread()) {
+ throw new IllegalStateException("Attempt to release mutex without holding mutex");
+ }
+
+ if (mutex.getHoldCount() == 1 && Thread.currentThread().equals(reservingThread)) {
+ reservingThread = null;
+ reserveCondition.signal();
+ }
+
+ mutex.unlock();
+ }
+
+ private void runWithoutMutex(TransactionOperation proc) throws MulgaraTransactionException {
+ if (!mutex.isHeldByCurrentThread()) {
+ throw new IllegalStateException("Attempt to run procedure without holding mutex");
+ }
+ int holdCount = mutex.getHoldCount();
+ for (int i = 0; i < holdCount; i++) {
+ mutex.unlock();
+ }
+
+ proc.execute();
+
+ for (int i = 0; i < holdCount; i++) {
+ mutex.lock();
+ }
+ }
}
More information about the Mulgara-svn
mailing list