[Mulgara-svn] r265 - trunk/src/jar/resolver/java/org/mulgara/resolver

andrae at mulgara.org andrae at mulgara.org
Mon May 7 04:13:08 UTC 2007


Author: andrae
Date: 2007-05-06 23:13:08 -0500 (Sun, 06 May 2007)
New Revision: 265

Modified:
   trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
Log:
Merged branches/mgr-37 revision 251 into trunk.

This provides mutex suspension over prepare() to avoid locking the transaction
manager over a potentially blocking operation.


Modified: trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
===================================================================
--- trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java	2007-05-07 00:51:51 UTC (rev 264)
+++ trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java	2007-05-07 04:13:08 UTC (rev 265)
@@ -23,6 +23,8 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.transaction.SystemException;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
@@ -75,6 +77,11 @@
   private MulgaraTransaction userTransaction;
   private boolean autoCommit;
 
+  private ReentrantLock mutex;
+  private Condition writeLockCondition;
+  private Condition reserveCondition;
+  private Thread reservingThread;
+
   /** Set of sessions whose transactions have been rolledback.*/
   private Set<Session> failedSessions;
 
@@ -94,12 +101,16 @@
   /** Map of threads to active transactions. */
   private Map<Thread, MulgaraTransaction> activeTransactions;
 
-  private TransactionManager transactionManager;
+  private final TransactionManager transactionManager;
 
   public MulgaraTransactionManager(TransactionManagerFactory transactionManagerFactory) {
     this.currentWritingSession = null;
     this.userTransaction = null;
     this.autoCommit = true;
+    this.mutex = new ReentrantLock();
+    this.writeLockCondition = this.mutex.newCondition();
+    this.reserveCondition = this.mutex.newCondition();
+    this.reservingThread = null;
 
     this.failedSessions = new HashSet<Session>();
     this.sessions = new HashMap<MulgaraTransaction, Session>();
@@ -119,38 +130,46 @@
    *     obtain the write-lock, create a new transaction object and return it.</li>
    * </ul>
    */
-  public synchronized MulgaraTransaction getTransaction(DatabaseSession session, boolean write) throws MulgaraTransactionException {
+  public MulgaraTransaction getTransaction(DatabaseSession session, boolean write) throws MulgaraTransactionException {
+    acquireMutex();
+    try {
+      if (session == currentWritingSession) {
+        return userTransaction;
+      } 
 
-    if (session == currentWritingSession) {
-      return userTransaction;
-    } 
+      try {
+        MulgaraTransaction transaction = write ?
+            obtainWriteLock(session) :
+            new MulgaraTransaction(this, session.newOperationContext(false));
 
-    try {
-      MulgaraTransaction transaction = write ?
-          obtainWriteLock(session) :
-          new MulgaraTransaction(this, session.newOperationContext(false));
+        sessions.put(transaction, session);
 
-      sessions.put(transaction, session);
+        if (!transactions.containsKey(session)) {
+          transactions.put(session, new HashSet<MulgaraTransaction>());
+        }
+        transactions.get(session).add(transaction);
 
-      if (!transactions.containsKey(session)) {
-        transactions.put(session, new HashSet<MulgaraTransaction>());
+        return transaction;
+      } catch (MulgaraTransactionException em) {
+        throw em;
+      } catch (Exception e) {
+        throw new MulgaraTransactionException("Error creating transaction", e);
       }
-      transactions.get(session).add(transaction);
-
-      return transaction;
-    } catch (MulgaraTransactionException em) {
-      throw em;
-    } catch (Exception e) {
-      throw new MulgaraTransactionException("Error creating transaction", e);
+    } finally {
+      releaseMutex();
     }
   }
 
 
-  private synchronized MulgaraTransaction obtainWriteLock(DatabaseSession session)
+  /** 
+   * Obtains the write lock.
+   * Must hold mutex on entry - but will drop mutex while awaiting writeLockCondition.
+   */
+  private MulgaraTransaction obtainWriteLock(DatabaseSession session)
       throws MulgaraTransactionException {
-    while (currentWritingSession != null) {
+    while (currentWritingSession != null && !writeLockReserved()) {
       try {
-        this.wait();
+        writeLockCondition.await();
       } catch (InterruptedException ei) {
         throw new MulgaraTransactionException("Interrupted while waiting for write lock", ei);
       }
@@ -166,190 +185,218 @@
     }
   }
 
-  private synchronized void releaseWriteLock() {
+  private void releaseWriteLock() {
     // Calling this method multiple times is safe as the lock cannot be obtained
     // between calls as this method is private, and all calling methods are
     // synchronized.
     currentWritingSession = null;
     userTransaction = null;
-    this.notify();
+    writeLockCondition.signal();
   }
 
+  public void commit(DatabaseSession session) throws MulgaraTransactionException {
+    acquireMutex();
+    try {
+      reserveWriteLock();
+      if (failedSessions.contains(session)) {
+        throw new MulgaraTransactionException("Attempting to commit failed exception");
+      } else if (session != currentWritingSession) {
+        throw new MulgaraTransactionException(
+            "Attempting to commit while not the current writing transaction");
+      }
 
-  public synchronized void commit(DatabaseSession session) throws MulgaraTransactionException {
-    if (failedSessions.contains(session)) {
-      throw new MulgaraTransactionException("Attempting to commit failed exception");
-    } else if (session != currentWritingSession) {
-      throw new MulgaraTransactionException(
-          "Attempting to commit while not the current writing transaction");
+      setAutoCommit(session, true);
+      setAutoCommit(session, false);
+    } finally {
+      releaseMutex();
     }
-
-    setAutoCommit(session, true);
-    setAutoCommit(session, false);
   }
 
 
   /**
    * This is an explicit, user-specified rollback.
-   * This 
+   * 
    * This needs to be distinguished from an implicit rollback triggered by failure.
    */
-  public synchronized void rollback(DatabaseSession session) throws MulgaraTransactionException {
-    if (session == currentWritingSession) {
-      try {
-        userTransaction.execute(new TransactionOperation() {
-          public void execute() throws MulgaraTransactionException {
-            userTransaction.explicitRollback();
+  public void rollback(DatabaseSession session) throws MulgaraTransactionException {
+    acquireMutex();
+    try {
+      reserveWriteLock();
+      if (session == currentWritingSession) {
+        try {
+          userTransaction.execute(new TransactionOperation() {
+            public void execute() throws MulgaraTransactionException {
+              userTransaction.explicitRollback();
+            }
+          });
+          if (userTransaction != null) {
+            // transaction referenced by something - need to explicitly end it.
+            userTransaction.abortTransaction("Rollback failed",
+                new MulgaraTransactionException("Rollback failed to terminate write transaction"));
           }
-        });
-        if (userTransaction != null) {
-          // transaction referenced by something - need to explicitly end it.
-          userTransaction.abortTransaction("Rollback failed",
-              new MulgaraTransactionException("Rollback failed to terminate write transaction"));
+        } finally {
+          failedSessions.add(session);
+          releaseWriteLock();
+          setAutoCommit(session, false);
         }
-      } finally {
-        failedSessions.add(currentWritingSession);
-        releaseWriteLock();
+      } else if (failedSessions.contains(session)) {
+        failedSessions.remove(session);
         setAutoCommit(session, false);
+      } else {
+        throw new MulgaraTransactionException(
+            "Attempt to rollback while not in the current writing transaction");
       }
-    } else if (failedSessions.contains(session)) {
-      failedSessions.remove(session);
-      setAutoCommit(session, false);
-    } else {
-      throw new MulgaraTransactionException(
-          "Attempt to rollback while not in the current writing transaction");
+    } finally {
+      releaseMutex();
     }
   }
 
-  public synchronized void setAutoCommit(DatabaseSession session, boolean autoCommit)
+  public void setAutoCommit(DatabaseSession session, boolean autoCommit)
       throws MulgaraTransactionException {
-    if (session == currentWritingSession && failedSessions.contains(session)) {
-      userTransaction.abortTransaction("Session failed and transaction not finalized",
-          new MulgaraTransactionException("Failed Session in setAutoCommit"));
-    }
+    acquireMutex();
+    try {
+      if (session == currentWritingSession && failedSessions.contains(session)) {
+        userTransaction.abortTransaction("Session failed and transaction not finalized",
+            new MulgaraTransactionException("Failed Session in setAutoCommit"));
+      }
 
-    if (session == currentWritingSession || failedSessions.contains(session)) {
-      if (autoCommit) {
-        // AutoCommit off -> on === branch on current state of transaction.
-        if (session == currentWritingSession) {
-          // Within active transaction - commit and finalise.
-          try {
-            userTransaction.execute(new TransactionOperation() {
-              public void execute() throws MulgaraTransactionException {
-                userTransaction.dereference();
-                userTransaction.commitTransaction();
-              }
-            });
-          } finally {
-            releaseWriteLock();
-            this.autoCommit = true;
+      if (session == currentWritingSession || failedSessions.contains(session)) {
+        if (autoCommit) {
+          // AutoCommit off -> on === branch on current state of transaction.
+          if (session == currentWritingSession) {
+            // Within active transaction - commit and finalise.
+            try {
+              runWithoutMutex(new TransactionOperation() {
+                public void execute() throws MulgaraTransactionException {
+                  userTransaction.execute(new TransactionOperation() {
+                    public void execute() throws MulgaraTransactionException {
+                      userTransaction.dereference();
+                      userTransaction.commitTransaction();
+                    }
+                  });
+                }
+              });
+            } finally {
+              releaseWriteLock();
+              this.autoCommit = true;
+            }
+          } else if (failedSessions.contains(session)) {
+            // Within failed transaction - cleanup.
+            failedSessions.remove(session);
           }
-        } else if (failedSessions.contains(session)) {
-          // Within failed transaction - cleanup.
-          failedSessions.remove(session);
+        } else {
+          logger.info("Attempt to set autocommit false twice");
+          // AutoCommit off -> off === no-op. Log info.
         }
       } else {
-        logger.info("Attempt to set autocommit false twice");
-        // AutoCommit off -> off === no-op. Log info.
+        if (autoCommit) {
+          // AutoCommit on -> on === no-op.  Log info.
+          logger.info("Attempting to set autocommit true without setting it false");
+        } else {
+          // AutoCommit on -> off == Start new transaction.
+          userTransaction = getTransaction(session, true);
+          userTransaction.reference();
+          this.autoCommit = false;
+        }
       }
-    } else {
-      if (autoCommit) {
-        // AutoCommit on -> on === no-op.  Log info.
-        logger.info("Attempting to set autocommit true without setting it false");
-      } else {
-        // AutoCommit on -> off == Start new transaction.
-        userTransaction = getTransaction(session, true);
-        userTransaction.reference();
-        this.autoCommit = false;
-      }
+    } finally {
+      releaseMutex();
     }
   }
 
-  public synchronized void rollbackCurrentTransactions(Session session)
-      throws MulgaraTransactionException {
+  public void rollbackCurrentTransactions(Session session) throws MulgaraTransactionException {
+    acquireMutex();
     try {
-      if (failedSessions.contains(session)) {
-        failedSessions.remove(session);
-        return;
-      }
+      try {
+        if (failedSessions.contains(session)) {
+          failedSessions.remove(session);
+          return;
+        }
 
-      Throwable error = null;
+        Throwable error = null;
 
-      try {
-        if (session == currentWritingSession) {
-          logger.warn("Terminating session while holding writelock:" + session +
-              ":" + currentWritingSession + ": " + userTransaction);
-          userTransaction.execute(new TransactionOperation() {
-              public void execute() throws MulgaraTransactionException {
-                throw new MulgaraTransactionException("Terminating session while holding writelock");
-              }
-          });
+        try {
+          if (session == currentWritingSession) {
+            logger.warn("Terminating session while holding writelock:" + session + ": " + userTransaction);
+            userTransaction.execute(new TransactionOperation() {
+                public void execute() throws MulgaraTransactionException {
+                  throw new MulgaraTransactionException("Terminating session while holding writelock");
+                }
+            });
+          }
+        } catch (Throwable th) {
+          error = th;
         }
-      } catch (Throwable th) {
-        error = th;
-      }
 
-      final Throwable trigger = new MulgaraTransactionException("trigger rollback");
+        final Throwable trigger = new MulgaraTransactionException("trigger rollback");
 
-      if (transactions.containsKey(session)) {
-        for (MulgaraTransaction transaction : transactions.get(session)) {
-          try {
-            transaction.execute(new TransactionOperation() {
-              public void execute() throws MulgaraTransactionException {
-                throw new MulgaraTransactionException("Rolling back transactions due to session close");
+        if (transactions.containsKey(session)) {
+          for (MulgaraTransaction transaction : transactions.get(session)) {
+            try {
+              transaction.execute(new TransactionOperation() {
+                public void execute() throws MulgaraTransactionException {
+                  throw new MulgaraTransactionException("Rolling back transactions due to session close");
+                }
+              });
+            } catch (MulgaraTransactionException em) {
+              // ignore.
+            } catch (Throwable th) {
+              if (error == null) {
+                error = th;
               }
-            });
-          } catch (MulgaraTransactionException em) {
-            // ignore.
-          } catch (Throwable th) {
-            if (error == null) {
-              error = th;
             }
           }
         }
-      }
 
-      if (error != null) {
-        if (error instanceof MulgaraTransactionException) {
-          throw (MulgaraTransactionException)error;
-        } else {
-          throw new MulgaraTransactionException("Error in rollback on session close", error);
+        if (error != null) {
+          if (error instanceof MulgaraTransactionException) {
+            throw (MulgaraTransactionException)error;
+          } else {
+            throw new MulgaraTransactionException("Error in rollback on session close", error);
+          }
         }
+      } finally {
+        if (transactions.containsKey(session)) {
+          logger.error("Error in transaction rollback due to session close - aborting");
+          abortCurrentTransactions(session);
+        }
       }
     } finally {
-      if (transactions.containsKey(session)) {
-        logger.error("Error in transaction rollback due to session close - aborting");
-        abortCurrentTransactions(session);
-      }
+      releaseMutex();
     }
   }
 
   private void abortCurrentTransactions(Session session) throws MulgaraTransactionException {
+    acquireMutex();
     try {
-      Throwable error = null;
-      for (MulgaraTransaction transaction : transactions.get(session)) {
-        try {
-          transaction.abortTransaction("Transaction still valid on session close", new Throwable());
-        } catch (Throwable th) {
+      try {
+        Throwable error = null;
+        for (MulgaraTransaction transaction : transactions.get(session)) {
           try {
-            if (error == null) {
-              error = th;
-            }
-          } catch (Throwable throw_away) {}
+            transaction.abortTransaction("Transaction still valid on session close", new Throwable());
+          } catch (Throwable th) {
+            try {
+              if (error == null) {
+                error = th;
+              }
+            } catch (Throwable throw_away) {}
+          }
         }
-      }
 
-      if (error != null) {
-        if (error instanceof MulgaraTransactionException) {
-          throw (MulgaraTransactionException)error;
-        } else {
-          throw new MulgaraTransactionException("Error in rollback on session close", error);
+        if (error != null) {
+          if (error instanceof MulgaraTransactionException) {
+            throw (MulgaraTransactionException)error;
+          } else {
+            throw new MulgaraTransactionException("Error in rollback on session close", error);
+          }
         }
+      } finally {
+        if (session == currentWritingSession) {
+          logger.error("Failed to abort write-transaction on session close - Server restart required");
+        }
       }
     } finally {
-      if (session == currentWritingSession) {
-        logger.error("Failed to abort write-transaction on session close - Server restart required");
-      }
+      releaseMutex();
     }
   }
 
@@ -357,95 +404,119 @@
   // Transaction livecycle callbacks.
   //
 
-  public synchronized Transaction transactionStart(MulgaraTransaction transaction)
-      throws MulgaraTransactionException {
+  public Transaction transactionStart(MulgaraTransaction transaction) throws MulgaraTransactionException {
+    acquireMutex();
     try {
-      logger.info("Beginning Transaction");
-      if (activeTransactions.get(Thread.currentThread()) != null) {
-        throw new MulgaraTransactionException(
-            "Attempt to start transaction in thread with exiting active transaction.");
-      } else if (activeTransactions.containsValue(transaction)) {
-        throw new MulgaraTransactionException("Attempt to start transaction twice");
-      }
+      try {
+        logger.info("Beginning Transaction");
+        if (activeTransactions.get(Thread.currentThread()) != null) {
+          throw new MulgaraTransactionException(
+              "Attempt to start transaction in thread with exiting active transaction.");
+        } else if (activeTransactions.containsValue(transaction)) {
+          throw new MulgaraTransactionException("Attempt to start transaction twice");
+        }
 
-      transactionManager.begin();
-      Transaction jtaTrans = transactionManager.getTransaction();
+        transactionManager.begin();
+        Transaction jtaTrans = transactionManager.getTransaction();
 
-      activeTransactions.put(Thread.currentThread(), transaction);
+        activeTransactions.put(Thread.currentThread(), transaction);
 
-      return jtaTrans;
-    } catch (Exception e) {
-      throw new MulgaraTransactionException("Transaction Begin Failed", e);
+        return jtaTrans;
+      } catch (Exception e) {
+        throw new MulgaraTransactionException("Transaction Begin Failed", e);
+      }
+    } finally {
+      releaseMutex();
     }
   }
 
-  public synchronized void transactionResumed(MulgaraTransaction transaction, Transaction jtaXA) 
+  public void transactionResumed(MulgaraTransaction transaction, Transaction jtaXA) 
       throws MulgaraTransactionException {
-    if (activeTransactions.get(Thread.currentThread()) != null) {
-      throw new MulgaraTransactionException(
-          "Attempt to resume transaction in already activated thread");
-    } else if (activeTransactions.containsValue(transaction)) {
-      throw new MulgaraTransactionException("Attempt to resume active transaction");
-    }
-    
+    acquireMutex();
     try {
-      transactionManager.resume(jtaXA);
-      activeTransactions.put(Thread.currentThread(), transaction);
-    } catch (Exception e) {
-      throw new MulgaraTransactionException("Resume Failed", e);
+      if (activeTransactions.get(Thread.currentThread()) != null) {
+        throw new MulgaraTransactionException(
+            "Attempt to resume transaction in already activated thread");
+      } else if (activeTransactions.containsValue(transaction)) {
+        throw new MulgaraTransactionException("Attempt to resume active transaction");
+      }
+      
+      try {
+        transactionManager.resume(jtaXA);
+        activeTransactions.put(Thread.currentThread(), transaction);
+      } catch (Exception e) {
+        throw new MulgaraTransactionException("Resume Failed", e);
+      }
+    } finally {
+      releaseMutex();
     }
   }
 
-  public synchronized Transaction transactionSuspended(MulgaraTransaction transaction)
+  public Transaction transactionSuspended(MulgaraTransaction transaction)
       throws MulgaraTransactionException {
+    acquireMutex();
     try {
-      if (transaction != activeTransactions.get(Thread.currentThread())) {
-        throw new MulgaraTransactionException(
-            "Attempt to suspend transaction from outside thread");
-      }
+      try {
+        if (transaction != activeTransactions.get(Thread.currentThread())) {
+          throw new MulgaraTransactionException(
+              "Attempt to suspend transaction from outside thread");
+        }
 
-      if (autoCommit && transaction == userTransaction) {
-        logger.error("Attempt to suspend write transaction without setting AutoCommit Off");
-        throw new MulgaraTransactionException(
-            "Attempt to suspend write transaction without setting AutoCommit Off");
-      }
+        if (autoCommit && transaction == userTransaction) {
+          logger.error("Attempt to suspend write transaction without setting AutoCommit Off");
+          throw new MulgaraTransactionException(
+              "Attempt to suspend write transaction without setting AutoCommit Off");
+        }
 
-      Transaction xa = transactionManager.suspend();
-      activeTransactions.remove(Thread.currentThread());
+        Transaction xa = transactionManager.suspend();
+        activeTransactions.remove(Thread.currentThread());
 
-      return xa;
-    } catch (Throwable th) {
-      logger.error("Attempt to suspend failed", th);
-      try {
-        transactionManager.setRollbackOnly();
-      } catch (Throwable t) {
-        logger.error("Attempt to setRollbackOnly() failed", t);
+        return xa;
+      } catch (Throwable th) {
+        logger.error("Attempt to suspend failed", th);
+        try {
+          transactionManager.setRollbackOnly();
+        } catch (Throwable t) {
+          logger.error("Attempt to setRollbackOnly() failed", t);
+        }
+        throw new MulgaraTransactionException("Suspend failed", th);
       }
-      throw new MulgaraTransactionException("Suspend failed", th);
+    } finally {
+      releaseMutex();
     }
   }
 
-  public synchronized void transactionComplete(MulgaraTransaction transaction) {
-    if (holdsWriteLock(transaction)) {
-      releaseWriteLock();
+  public void transactionComplete(MulgaraTransaction transaction) {
+    acquireMutex();
+    try {
+      if (transaction == userTransaction) {
+        releaseWriteLock();
+      }
+
+      activeTransactions.remove(Thread.currentThread());
+      Session session = (Session)sessions.get(transaction);
+      sessions.remove(transaction);
+      transactions.remove(session);
+    } finally {
+      releaseMutex();
     }
-
-    activeTransactions.remove(Thread.currentThread());
-    Session session = (Session)sessions.get(transaction);
-    sessions.remove(transaction);
-    transactions.remove(session);
   }
 
-  public synchronized void transactionAborted(MulgaraTransaction transaction) {
+  public void transactionAborted(MulgaraTransaction transaction) {
+    acquireMutex();
     try {
-      // Make sure this cleans up the transaction metadata - this transaction is DEAD!
-      if (transaction == userTransaction) {
-        failedSessions.add(currentWritingSession);
+      try {
+        // Make sure this cleans up the transaction metadata - this transaction is DEAD!
+        if (transaction == userTransaction) {
+          failedSessions.add(currentWritingSession);
+        }
+        transactionComplete(transaction);
+      } catch (Throwable th) {
+        // FIXME: This should probably abort the entire server after logging the error!
+        logger.error("Error managing transaction abort", th);
       }
-      transactionComplete(transaction);
-    } catch (Throwable th) {
-      // FIXME: This should probably abort the entire server after logging the error!
-      logger.error("Error managing transaction abort", th);
+    } finally {
+      releaseMutex();
     }
   }
 
@@ -457,7 +528,59 @@
     }
   }
 
-  private boolean holdsWriteLock(MulgaraTransaction transaction) {
-    return transaction == userTransaction;
+  private void acquireMutex() {
+    mutex.lock();
   }
+
+  private void reserveWriteLock() throws MulgaraTransactionException {
+    if (!mutex.isHeldByCurrentThread()) {
+      throw new IllegalStateException("Attempt to set modify without holding mutex");
+    }
+
+    if (Thread.currentThread().equals(reservingThread)) {
+      return;
+    }
+
+    while (reservingThread != null) {
+      try {
+        reserveCondition.await();
+      } catch (InterruptedException ei) {
+        throw new MulgaraTransactionException("Thread interrupted while reserving write lock", ei);
+      }
+    }
+    reservingThread = Thread.currentThread();
+  }
+
+  private boolean writeLockReserved() {
+    return reservingThread == null || Thread.currentThread().equals(reservingThread);
+  }
+
+  private void releaseMutex() {
+    if (!mutex.isHeldByCurrentThread()) {
+      throw new IllegalStateException("Attempt to release mutex without holding mutex");
+    }
+
+    if (mutex.getHoldCount() == 1 && Thread.currentThread().equals(reservingThread)) {
+      reservingThread = null;
+      reserveCondition.signal();
+    }
+
+    mutex.unlock();
+  }
+
+  private void runWithoutMutex(TransactionOperation proc) throws MulgaraTransactionException {
+    if (!mutex.isHeldByCurrentThread()) {
+      throw new IllegalStateException("Attempt to run procedure without holding mutex");
+    }
+    int holdCount = mutex.getHoldCount();
+    for (int i = 0; i < holdCount; i++) {
+      mutex.unlock();
+    }
+
+    proc.execute();
+
+    for (int i = 0; i < holdCount; i++) {
+      mutex.lock();
+    }
+  }
 }




More information about the Mulgara-svn mailing list