[Mulgara-svn] r956 - branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver

andrae at mulgara.org andrae at mulgara.org
Thu May 22 07:25:14 UTC 2008


Author: andrae
Date: 2008-05-22 00:25:13 -0700 (Thu, 22 May 2008)
New Revision: 956

Modified:
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/Database.java
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
Log:
I'm not positive there aren't concurrency problems yet; I would like to take a
bit longer to test, but this should work.



Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java	2008-05-21 06:51:34 UTC (rev 955)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java	2008-05-22 07:25:13 UTC (rev 956)
@@ -125,6 +125,7 @@
     suite.addTest(new AdvDatabaseSessionUnitTest("testImplicitCommitQuery"));
     suite.addTest(new AdvDatabaseSessionUnitTest("testConcurrentExplicitTxn"));
     suite.addTest(new AdvDatabaseSessionUnitTest("testConcurrentImplicitTxn"));
+    suite.addTest(new AdvDatabaseSessionUnitTest("testConcurrentImplicitRecovery"));
     suite.addTest(new AdvDatabaseSessionUnitTest("testPrefixingWithUnbound"));
     suite.addTest(new AdvDatabaseSessionUnitTest("testDatabaseDelete"));
     suite.addTest(new AdvDatabaseSessionUnitTest("testCreateModel"));
@@ -1519,6 +1520,106 @@
   }
 
 
+  /**
+   * Test two simultaneous transactions, the first one obtains the write-lock
+   * and sleeps longer than the recovery timeout.
+   */
+  public void testConcurrentImplicitRecovery() throws URISyntaxException {
+    logger.info("testConcurrentImplicitRecovery");
+    URI fileURI  = new File("data/xatest-model1.rdf").toURI();
+
+    database.setWriteLockTimeout(30000);
+    try {
+      Session session1 = database.newSession();
+      try {
+        session1.createModel(model3URI, null);
+        logger.debug("Obtaining autocommit for session1");
+        session1.setAutoCommit(false);
+        logger.debug("Obtained autocommit for session1");
+
+        Thread t2 = new Thread("tx2Test") {
+          public void run() {
+            try {
+              Session session2 = database.newSession();
+              try {
+                logger.debug("Obtaining autocommit for session2");
+                session2.setAutoCommit(false);
+                logger.debug("Obtained autocommit for session2");
+                Variable subjectVariable   = new Variable("subject");
+                Variable predicateVariable = new Variable("predicate");
+                Variable objectVariable    = new Variable("object");
+
+                List<SelectElement> selectList = new ArrayList<SelectElement>(3);
+                selectList.add(subjectVariable);
+                selectList.add(predicateVariable);
+                selectList.add(objectVariable);
+
+                // Evaluate the query
+                Answer answer = session2.query(new Query(
+                  selectList,                                       // SELECT
+                  new ModelResource(model3URI),                      // FROM
+                  new ConstraintImpl(subjectVariable,               // WHERE
+                                 predicateVariable,
+                                 objectVariable),
+                  null,                                             // HAVING
+                  Arrays.asList(new Order[] {                       // ORDER BY
+                    new Order(subjectVariable, true),
+                    new Order(predicateVariable, true),
+                    new Order(objectVariable, true)
+                  }),
+                  null,                                             // LIMIT
+                  0,                                                // OFFSET
+                  new UnconstrainedAnswer()                         // GIVEN
+                ));
+
+                answer.beforeFirst();
+                assertFalse(answer.next());
+                answer.close();
+
+                logger.debug("Releasing autocommit for session2");
+                session2.setAutoCommit(true);
+              } finally {
+                session2.close();
+              }
+            } catch (Exception e) {
+              fail(e);
+            }
+          }
+        };
+        t2.start();
+
+        session1.setModel(model3URI, new ModelResource(fileURI));
+        logger.debug("Sleeping for 40sec");
+        Thread.sleep(40000);
+        logger.debug("Slept for 40sec");
+        try {
+          t2.join(2000L);
+        } catch (InterruptedException ie) {
+          logger.error("wait for tx2-terminated interrupted", ie);
+          fail(ie);
+        }
+        assertFalse("second transaction should've terminated", t2.isAlive());
+
+        boolean qeThrown = false;
+        try {
+          session1.commit();
+        } catch (QueryException eq) {
+          qeThrown = true;
+        }
+
+        assertTrue("Commit should have failed due to lock timeout", qeThrown);
+
+        logger.debug("Releasing autocommit for session1");
+        session1.setAutoCommit(true);
+      } finally {
+        session1.close();
+      }
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
+
+
   public void testPrefixingWithUnbound() throws URISyntaxException {
     logger.warn("testPrefixingWithUnbound");
     URI fileURI  = new File("data/prefix-unbound.rdf").toURI();

Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/Database.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/Database.java	2008-05-21 06:51:34 UTC (rev 955)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/Database.java	2008-05-22 07:25:13 UTC (rev 956)
@@ -828,6 +828,11 @@
   }
 
 
+  public void setWriteLockTimeout(long timeout) {
+    transactionManager.setLockTimeout(timeout);
+  }
+
+
   /**
    * Flush all resources associated with the database into a recoverable state.
    */

Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java	2008-05-21 06:51:34 UTC (rev 955)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java	2008-05-22 07:25:13 UTC (rev 956)
@@ -81,7 +81,7 @@
   private final MulgaraExternalTransactionFactory externalFactory;
 
   private final Timer reaperTimer;
-  private final TimerTask reaperTask;
+  private LockReaper reaperTask;
 
   public MulgaraTransactionManager(TransactionManagerFactory transactionManagerFactory) {
     this.sessionHoldingWriteLock = null;
@@ -170,6 +170,16 @@
     internalFactory.setTransactionTimeout(transactionTimeout);
   }
 
+
+  public void setLockTimeout(long timeout) {
+    acquireMutex();
+    try {
+      reaperTask.setTimeout(timeout);
+    } finally {
+      releaseMutex();
+    }
+  }
+
   /**
    * Used to replace the built in monitor to allow it to be properly released
    * during potentially blocking operations.  All potentially blocking
@@ -298,34 +308,47 @@
     private Timer timer;
     protected long timeoutMillis;
 
-    private boolean scheduled;
+    private boolean die;
 
     public LockReaper(Timer timer, long timeoutMillis) {
+      logger.info("Lock-reaper created: " + System.identityHashCode(this));
       this.timer = timer;
       this.timeoutMillis = (timeoutMillis > MIN_PERIOD) ? timeoutMillis : MIN_PERIOD; 
-      this.scheduled = false;
-      schedule(timeoutMillis);
+      this.die = false;
+      schedule();
     }
 
     public void run() {
       logger.warn("Lock-reaper running");
       acquireMutex();
-      scheduled = false;
-      boolean scheduleDefault;
+
       try {
-        long lastActive = factoryWithWriteTransaction.getLastActiveHoldingLock();
-        if ((lastActive > 0) && (lastActive < System.currentTimeMillis() - timeoutMillis)) {
-          logger.warn("Reclaiming writelock from inactive transaction");
-          factoryWithWriteTransaction.abortWriteTransaction();
-          schedule(timeoutMillis);
+        if (die) {
+          logger.info("Lock-reaper dying on request: " + System.identityHashCode(this));
+          return;
+        }
+
+        if (factoryWithWriteTransaction != null) {
+          long lastActive = factoryWithWriteTransaction.getLastActiveHoldingLock();
+          if ((lastActive > 0) && (lastActive < System.currentTimeMillis() - timeoutMillis)) {
+            logger.warn("Reclaiming writelock from inactive transaction");
+            factoryWithWriteTransaction.abortWriteTransaction();
+          } else {
+            logger.debug("Transaction still active: " + lastActive + " time: " + System.currentTimeMillis() + " timeout: " + timeoutMillis);
+          }
         } else {
-          schedule(timeoutMillis - (System.currentTimeMillis() - lastActive));
+          logger.debug("No write-lock held.");
         }
       } catch (MulgaraTransactionException em) {
         logger.warn("Exception thrown while reclaiming writelock from inactive transaction");
       } finally {
         try {
-          if (!scheduled) schedule(timeoutMillis);
+          if (!die) {
+            logger.debug("Rescheduling lock-reaper: " + System.identityHashCode(this));
+            schedule();
+          } else {
+            logger.debug("Not rescheduling lock-reaper on request: " + System.identityHashCode(this));
+          }
         } finally {
           releaseMutex();
         }
@@ -335,28 +358,10 @@
     public void setTimeout(long timeout) {
       acquireMutex();
       try {
-        this.cancel();
-
-        timeoutMillis = (timeoutMillis > MIN_PERIOD) ? timeoutMillis : MIN_PERIOD; 
-
-        if (factoryWithWriteTransaction == null) {
-          schedule(timeoutMillis);
-
-          return;
-        }
-
-        long lastActive = factoryWithWriteTransaction.getLastActiveHoldingLock();
-        if (lastActive > 0) {
-          schedule(timeoutMillis - (System.currentTimeMillis() - lastActive));
-        } else {
-          schedule(timeoutMillis);
-        }
+        reaperTask = new LockReaper(timer, timeout);
+        this.die = true;
       } finally {
-        try {
-          if (!scheduled) schedule(timeoutMillis);
-        } finally {
-          releaseMutex();
-        }
+        releaseMutex();
       }
     }
 
@@ -365,18 +370,25 @@
       try {
         return timeoutMillis;
       } finally {
-        try {
-          if (!scheduled) schedule(timeoutMillis);
-        } finally {
-          releaseMutex();
-        }
+        releaseMutex();
       }
     }
 
-    private void schedule(long timeout) {
+    private void schedule() {
+      long timeout = timeoutMillis;
+
+      if (factoryWithWriteTransaction != null) {
+        long lastActive = factoryWithWriteTransaction.getLastActiveHoldingLock();
+
+        if (lastActive > 0) {
+          timeout = timeoutMillis - (System.currentTimeMillis() - lastActive);
+        }
+      }
+        
+      timeout =  (timeout > MIN_PERIOD) ? timeout : MIN_PERIOD; 
+
       logger.warn("Scheduling lock-reaper to run in: " + timeout + "ms");
-      timer.schedule(this, (timeout < MIN_PERIOD) ? MIN_PERIOD : timeout);
-      scheduled = true;
+      timer.schedule(this, timeout);
     }
   }
 }




More information about the Mulgara-svn mailing list