[Mulgara-svn] r1016 - branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver

ronald at mulgara.org ronald at mulgara.org
Mon Jun 23 06:02:39 UTC 2008


Author: ronald
Date: 2008-06-22 23:02:39 -0700 (Sun, 22 Jun 2008)
New Revision: 1016

Modified:
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
Log:
Move timeout code from MulgaraTransactionManager to the
MulgaraTransactionFactory's. This fixes a deadlock, because most operations
acquire the the MulgaraTransactionFactory mutex and then acquire the
MulgaraTransactionManager mutex, but the timeout code was acquiring them in
the opposite order.


Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java	2008-06-23 05:59:45 UTC (rev 1015)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java	2008-06-23 06:02:39 UTC (rev 1016)
@@ -121,6 +121,7 @@
     suite.addTest(new ExternalTransactionUnitTest("testInternalExternalConcurrentTxnRollback"));
     suite.addTest(new ExternalTransactionUnitTest("testExternalInternalConcurrentTxnRollback"));
     suite.addTest(new ExternalTransactionUnitTest("testInternalSerialMultipleSessions"));
+    suite.addTest(new ExternalTransactionUnitTest("testTransactionTimeout"));
     suite.addTest(new ExternalTransactionUnitTest("testTransactionFailure"));
 
     return suite;
@@ -1513,6 +1514,171 @@
 
 
   /**
+   * Test transaction timeout.
+   */
+  public void testTransactionTimeout() throws URISyntaxException {
+    logger.info("testTransactionTimeout");
+    URI fileURI  = new File("data/xatest-model1.rdf").toURI();
+
+    // test idle timeout
+    try {
+      Session session1 = database.newSession();
+      session1.setIdleTimeout(10000);
+
+      try {
+        XAResource resource = session1.getXAResource();
+        Xid xid = new TestXid(1);
+        resource.start(xid, XAResource.TMNOFLAGS);
+
+        session1.createModel(model3URI, null);
+
+        resource.end(xid, XAResource.TMSUCCESS);
+        resource.commit(xid, true);
+
+        logger.debug("Starting transaction for session1");
+        resource.start(xid, XAResource.TMNOFLAGS);
+        logger.debug("Started transaction for session1");
+
+        Thread t2 = new Thread("tx2Test") {
+          public void run() {
+            try {
+              Session session2 = database.newSession();
+              try {
+                logger.debug("Obtaining autocommit for session2");
+                session2.setAutoCommit(false);
+                logger.debug("Obtained autocommit for session2");
+
+                // Evaluate the query
+                Answer answer = session2.query(createQuery(model3URI));
+
+                answer.beforeFirst();
+                assertFalse(answer.next());
+                answer.close();
+
+                logger.debug("Releasing autocommit for session2");
+                session2.setAutoCommit(true);
+              } finally {
+                session2.close();
+              }
+            } catch (Exception e) {
+              fail(e);
+            }
+          }
+        };
+        t2.start();
+
+        session1.setModel(model3URI, new ModelResource(fileURI));
+        logger.debug("Sleeping for 20sec");
+        Thread.sleep(20000);
+        logger.debug("Slept for 20sec");
+        try {
+          t2.join(2000L);
+        } catch (InterruptedException ie) {
+          logger.error("wait for tx2-terminated interrupted", ie);
+          fail(ie);
+        }
+        assertFalse("second transaction should've terminated", t2.isAlive());
+
+        boolean xaeThrown = false;
+        try {
+          resource.end(xid, XAResource.TMSUCCESS);
+          resource.commit(xid, true);
+        } catch (XAException xae) {
+          xaeThrown = true;
+        }
+
+        assertTrue("Commit should have failed due to idle timeout", xaeThrown);
+
+        logger.debug("Rolling back transaction on session1");
+        resource.rollback(xid);
+      } finally {
+        session1.close();
+      }
+    } catch (Exception e) {
+      fail(e);
+    }
+
+    // test transaction timeout
+    try {
+      Session session1 = database.newSession();
+      try {
+        XAResource resource = session1.getXAResource();
+        Xid xid = new TestXid(1);
+        resource.start(xid, XAResource.TMNOFLAGS);
+
+        session1.createModel(model3URI, null);
+
+        resource.end(xid, XAResource.TMSUCCESS);
+        resource.commit(xid, true);
+
+        logger.debug("Starting transaction for session1");
+        assertTrue(resource.setTransactionTimeout(10));
+        assertEquals(10, resource.getTransactionTimeout());
+        resource.start(xid, XAResource.TMNOFLAGS);
+        logger.debug("Started transaction for session1");
+
+        Thread t2 = new Thread("tx2Test") {
+          public void run() {
+            try {
+              Session session2 = database.newSession();
+              try {
+                logger.debug("Obtaining autocommit for session2");
+                session2.setAutoCommit(false);
+                logger.debug("Obtained autocommit for session2");
+
+                // Evaluate the query
+                Answer answer = session2.query(createQuery(model3URI));
+
+                answer.beforeFirst();
+                assertFalse(answer.next());
+                answer.close();
+
+                logger.debug("Releasing autocommit for session2");
+                session2.setAutoCommit(true);
+              } finally {
+                session2.close();
+              }
+            } catch (Exception e) {
+              fail(e);
+            }
+          }
+        };
+        t2.start();
+
+        session1.setModel(model3URI, new ModelResource(fileURI));
+        logger.debug("Sleeping for 10sec");
+        Thread.sleep(10000);
+        logger.debug("Slept for 10sec");
+        try {
+          t2.join(2000L);
+        } catch (InterruptedException ie) {
+          logger.error("wait for tx2-terminated interrupted", ie);
+          fail(ie);
+        }
+        assertFalse("second transaction should've terminated", t2.isAlive());
+
+        boolean xaeThrown = false;
+        try {
+          resource.end(xid, XAResource.TMSUCCESS);
+          resource.commit(xid, true);
+        } catch (XAException xae) {
+          xaeThrown = true;
+        }
+
+        assertTrue("Commit should have failed due to transaction timeout", xaeThrown);
+
+        logger.debug("Rolling back transaction on session1");
+        resource.rollback(xid);
+      } finally {
+        session1.close();
+      }
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
+
+
+  /**
    * Test various operations after a transaction fails.
    */
   public void testTransactionFailure() {

Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java	2008-06-23 05:59:45 UTC (rev 1015)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java	2008-06-23 06:02:39 UTC (rev 1016)
@@ -97,7 +97,7 @@
           // see comment in MulgaraInternalTransactionFactory regarding releasing the lock here
           runWithoutMutex(new TransactionOperation() {
             public void execute() throws MulgaraTransactionException {
-              manager.obtainWriteLock(session, MulgaraExternalTransactionFactory.this);
+              manager.obtainWriteLock(session);
             }
           });
         try {
@@ -105,6 +105,7 @@
           writeTransaction = xa;
           associatedTransaction.put(session, xa);
           sessionXAMap.put(session, xa);
+          transactionCreated(xa, session);
 
           return xa;
         } catch (Throwable th) {
@@ -116,6 +117,7 @@
           MulgaraExternalTransaction xa = new MulgaraExternalTransaction(this, xid, session.newOperationContext(false));
           associatedTransaction.put(session, xa);
           sessionXAMap.put(session, xa);
+          transactionCreated(xa, session);
 
           return xa;
         } catch (QueryException eq) {
@@ -168,6 +170,8 @@
   public void transactionComplete(MulgaraExternalTransaction xa)
       throws MulgaraTransactionException {
     acquireMutex();
+    super.transactionComplete(xa);
+
     try {
       if (xa == null) {
         throw new IllegalArgumentException("Null transaction indicated completion");

Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java	2008-06-23 05:59:45 UTC (rev 1015)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java	2008-06-23 06:02:39 UTC (rev 1016)
@@ -105,7 +105,7 @@
           // ourselves first).
           runWithoutMutex(new TransactionOperation() {
             public void execute() throws MulgaraTransactionException {
-              manager.obtainWriteLock(session, MulgaraInternalTransactionFactory.this);
+              manager.obtainWriteLock(session);
             }
           });
           try {
@@ -122,6 +122,7 @@
 
         xaSessionMap.put(transaction, session);
         sessInfo.transactions.add(transaction);
+        transactionCreated(transaction, session);
 
         return transaction;
       } catch (MulgaraTransactionException em) {
@@ -394,6 +395,8 @@
 
   public void transactionComplete(MulgaraTransaction transaction) throws MulgaraTransactionException {
     acquireMutex();
+    super.transactionComplete(transaction);
+
     try {
       logger.debug("Transaction Complete");
       DatabaseSession session = xaSessionMap.get(transaction);

Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java	2008-06-23 05:59:45 UTC (rev 1015)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java	2008-06-23 06:02:39 UTC (rev 1016)
@@ -26,6 +26,8 @@
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.transaction.SystemException;
 import javax.transaction.Transaction;
@@ -67,7 +69,12 @@
     Logger.getLogger(MulgaraTransactionFactory.class.getName());
 
   protected final MulgaraTransactionManager manager;
-  
+
+  protected final Timer reaperTimer;
+  protected final IdleReaper idleReaperTask;
+  protected long idleTimeout;
+  protected ExtXAReaper extXAReaperTask;
+
   /**
    * Contains a reference the the current writing transaction IFF it is managed
    * by this factory.  If there is no current writing transaction, or if the
@@ -81,9 +88,26 @@
     this.manager = manager;
     this.mutex = new ReentrantLock();
     this.writeTransaction = null;
+
+    this.reaperTimer = new Timer("Write-lock Reaper", true);
+    this.idleReaperTask = new IdleReaper(reaperTimer, 10*1000L);        // check every 10 seconds
   }
 
 
+  protected void transactionCreated(MulgaraTransaction transaction, DatabaseSession session) {
+    if (transaction != writeTransaction)
+      return;
+
+    idleTimeout = session.getIdleTimeout() > 0 ? session.getIdleTimeout() : 15*60*1000L;
+    extXAReaperTask = new ExtXAReaper(reaperTimer,
+              session.getTransactionTimeout() > 0 ? session.getTransactionTimeout() : 60*60*1000L);
+  }
+
+  protected void transactionComplete(MulgaraTransaction transaction) throws MulgaraTransactionException {
+    if (transaction == writeTransaction)
+      extXAReaperTask.cancel();
+  }
+
   /**
    * Obtain a transaction context associated with a DatabaseSession.
    *
@@ -175,38 +199,7 @@
   }
 
 
-  /** 
-   * Return time in millis when the current write transaction last activated.
-   */
-  public long getLastActiveHoldingLock() {
-    acquireMutex();
-    try {
-      if (writeTransaction != null) {
-        return writeTransaction.lastActive();
-      } else {
-        return 0;
-      }
-    } finally {
-      releaseMutex();
-    }
-  }
-
   /**
-   * Abort the current write Transaction.
-   */
-  public void abortWriteTransaction() throws MulgaraTransactionException {
-    acquireMutex();
-    try {
-      if (writeTransaction != null) {
-        writeTransaction.abortTransaction("Explicit abort requested by write-lock manager", new Throwable());
-        writeTransaction = null;
-      }
-    } finally {
-      releaseMutex();
-    }
-  }
-
-  /**
    * Abort as many of the transactions as we can.
    */
   protected void abortTransactions(Map<MulgaraTransaction, Throwable> requiresAbort) {
@@ -279,4 +272,69 @@
       }
     }
   }
+
+  private class IdleReaper extends TimerTask {
+    public IdleReaper(Timer timer, long periodMillis) {
+      logger.info("Idle-reaper created: " + System.identityHashCode(this));
+      timer.schedule(this, periodMillis, periodMillis);
+    }
+
+    public void run() {
+      logger.debug("Idle-reaper running");
+      acquireMutex();
+
+      try {
+        if (writeTransaction != null) {
+          long lastActive = writeTransaction.lastActive();
+          if ((lastActive > 0) && (lastActive < System.currentTimeMillis() - idleTimeout)) {
+            logger.warn("Reclaiming writelock from inactive transaction");
+            writeTransaction.abortTransaction("Transaction idle-timeout", new Throwable());
+            writeTransaction = null;
+          } else {
+            if (logger.isDebugEnabled())
+              logger.debug("Transaction still active: " + lastActive + " time: " + System.currentTimeMillis() + " timeout: " + idleTimeout);
+          }
+        } else {
+          logger.debug("No write-lock held.");
+        }
+      } catch (MulgaraTransactionException em) {
+        logger.warn("Exception thrown while reclaiming writelock from inactive transaction");
+      } finally {
+        releaseMutex();
+      }
+    }
+  }
+
+  private class ExtXAReaper extends TimerTask {
+    private boolean die = false;
+
+    public ExtXAReaper(Timer timer, long timeoutMillis) {
+      logger.debug("Extended-transaction-reaper created: " + System.identityHashCode(this));
+      timer.schedule(this, timeoutMillis);
+    }
+
+    public boolean cancel() {
+      die = true;
+      return super.cancel();
+    }
+
+    public void run() {
+      acquireMutex();
+
+      try {
+        if (die)
+          return;       // we were cancelled after being scheduled but before mutex was acquired
+
+        if (writeTransaction != null) {
+          logger.warn("Reclaiming writelock from over-extended transaction");
+          writeTransaction.abortTransaction("Transaction timed out", new Throwable());
+          writeTransaction = null;
+        }
+      } catch (MulgaraTransactionException em) {
+        logger.warn("Exception thrown while reclaiming writelock from over-extended transaction", em);
+      } finally {
+        releaseMutex();
+      }
+    }
+  }
 }

Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java	2008-06-23 05:59:45 UTC (rev 1015)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java	2008-06-23 06:02:39 UTC (rev 1016)
@@ -22,8 +22,6 @@
 package org.mulgara.resolver;
 
 // Java2 packages
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -68,7 +66,6 @@
 
   // Write lock is associated with a session.
   private DatabaseSession sessionHoldingWriteLock;
-  private MulgaraTransactionFactory factoryWithWriteTransaction;
 
   // Used to support write-lock reservation.
   private DatabaseSession sessionReservingWriteLock;
@@ -80,23 +77,14 @@
   private final MulgaraInternalTransactionFactory internalFactory;
   private final MulgaraExternalTransactionFactory externalFactory;
 
-  private final Timer reaperTimer;
-  private IdleReaper idleReaperTask;
-  private long idleTimeout;
-  private LockReaper lockReaperTask;
-
   public MulgaraTransactionManager(TransactionManagerFactory transactionManagerFactory) {
     this.sessionHoldingWriteLock = null;
     this.sessionReservingWriteLock = null;
-    this.factoryWithWriteTransaction = null;
     this.mutex = new ReentrantLock();
     this.writeLockCondition = this.mutex.newCondition();
 
     this.internalFactory = new MulgaraInternalTransactionFactory(this, transactionManagerFactory);
     this.externalFactory = new MulgaraExternalTransactionFactory(this);
-
-    this.reaperTimer = new Timer("Write-lock Reaper", true);
-    this.idleReaperTask = new IdleReaper(reaperTimer, 10*1000L);        // check every 10 seconds
   }
 
 
@@ -112,7 +100,7 @@
   /** 
    * Obtains the write lock.
    */
-  void obtainWriteLock(DatabaseSession session, MulgaraTransactionFactory factory) throws MulgaraTransactionException {
+  void obtainWriteLock(DatabaseSession session) throws MulgaraTransactionException {
     acquireMutex();
     try {
       if (sessionHoldingWriteLock == session) {
@@ -131,10 +119,6 @@
         logger.debug("Obtaining write lock", new Throwable());
       }
       sessionHoldingWriteLock = session;
-      factoryWithWriteTransaction = factory;
-      this.idleTimeout = session.getIdleTimeout() > 0 ? session.getIdleTimeout() : 15*60*1000L;
-      this.lockReaperTask =
-          new LockReaper(reaperTimer, session.getTransactionTimeout() > 0 ? session.getTransactionTimeout() : 60*60*1000L);
     } finally {
       releaseMutex();
     }
@@ -163,8 +147,6 @@
         logger.debug("Releasing writelock", new Throwable());
       }
       sessionHoldingWriteLock = null;
-      factoryWithWriteTransaction = null;
-      lockReaperTask.cancel();
       writeLockCondition.signal();
     } finally {
       releaseMutex();
@@ -293,65 +275,4 @@
       releaseMutex();
     }
   }
-
-  private class IdleReaper extends TimerTask {
-    public IdleReaper(Timer timer, long periodMillis) {
-      logger.info("Idle-reaper created: " + System.identityHashCode(this));
-      timer.schedule(this, periodMillis, periodMillis);
-    }
-
-    public void run() {
-      logger.debug("Idle-reaper running");
-      acquireMutex();
-
-      try {
-        if (factoryWithWriteTransaction != null) {
-          long lastActive = factoryWithWriteTransaction.getLastActiveHoldingLock();
-          if ((lastActive > 0) && (lastActive < System.currentTimeMillis() - idleTimeout)) {
-            logger.warn("Reclaiming writelock from inactive transaction");
-            factoryWithWriteTransaction.abortWriteTransaction();
-          } else {
-            if (logger.isDebugEnabled())
-              logger.debug("Transaction still active: " + lastActive + " time: " + System.currentTimeMillis() + " timeout: " + idleTimeout);
-          }
-        } else {
-          logger.debug("No write-lock held.");
-        }
-      } catch (MulgaraTransactionException em) {
-        logger.warn("Exception thrown while reclaiming writelock from inactive transaction");
-      } finally {
-        releaseMutex();
-      }
-    }
-  }
-
-  private class LockReaper extends TimerTask {
-    private boolean die = false;
-
-    public LockReaper(Timer timer, long timeoutMillis) {
-      logger.debug("Lock-reaper created: " + System.identityHashCode(this));
-      timer.schedule(this, timeoutMillis);
-    }
-
-    public boolean cancel() {
-      die = true;
-      return super.cancel();
-    }
-
-    public void run() {
-      acquireMutex();
-
-      try {
-        if (die)
-          return;       // we were cancelled after being scheduled but before mutex was acquired
-
-        logger.warn("Reclaiming writelock from over-extended transaction");
-        factoryWithWriteTransaction.abortWriteTransaction();
-      } catch (MulgaraTransactionException em) {
-        logger.warn("Exception thrown while reclaiming writelock from over-extended transaction", em);
-      } finally {
-        releaseMutex();
-      }
-    }
-  }
 }




More information about the Mulgara-svn mailing list