[Mulgara-svn] r1010 - branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver

ronald at mulgara.org ronald at mulgara.org
Mon Jun 23 02:37:03 UTC 2008


Author: ronald
Date: 2008-06-22 19:37:02 -0700 (Sun, 22 Jun 2008)
New Revision: 1010

Modified:
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java
Log:
Fix transaction state handling after a failed operation in non-autocommit mode
and related issues:
 * track explicit transactions on the session, so that getTransaction() returns
   the current transaction even after a failure.
 * ensure the only valid operations on a session after a failure are
   setAutoCommit(true) and rollback(), both of which reset the failure; all
   others will throw an exception.
 * autoCommit is no longer a single flag on the transaction factory; it is now
   tracked per session.
 * added tests for the failed-transaction handling described above.
 * added tests for transaction operations in auto-commit mode.


Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java	2008-06-22 22:54:54 UTC (rev 1009)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java	2008-06-23 02:37:02 UTC (rev 1010)
@@ -130,6 +130,8 @@
     suite.addTest(new AdvDatabaseSessionUnitTest("testDatabaseDelete"));
     suite.addTest(new AdvDatabaseSessionUnitTest("testCreateModel"));
     suite.addTest(new AdvDatabaseSessionUnitTest("testInsertionBlankNodes"));
+    suite.addTest(new AdvDatabaseSessionUnitTest("testAutoCommitTransactionOps"));
+    suite.addTest(new AdvDatabaseSessionUnitTest("testTransactionFailure"));
 
     return suite;
   }
@@ -1170,6 +1172,160 @@
   }
 
 
+  /**
+   * Exercises the transaction-control calls on a session that has not left
+   * auto-commit mode, and then checks that repeating setAutoCommit(false)
+   * inside an open transaction does not disturb that transaction.
+   */
+  public void testAutoCommitTransactionOps() {
+    logger.info("Testing autoCommitTransactionOps");
+
+    try {
+      Session autoSession = database.newSession();
+      try {
+        // turning auto-commit on again should be a no-op
+        autoSession.setAutoCommit(true);
+
+        // commit must be rejected while in auto-commit mode
+        try {
+          autoSession.commit();
+          fail("Commit did not fail in auto-commit mode");
+        } catch (QueryException expected) {
+          // expected - the rejection is what this step verifies
+        }
+
+        // rollback must be rejected as well
+        try {
+          autoSession.rollback();
+          fail("Rollback did not fail in auto-commit mode");
+        } catch (QueryException expected) {
+          // expected - the rejection is what this step verifies
+        }
+
+        // the rejected calls must not have left the session unusable
+        Answer probe = autoSession.query(createQuery(modelURI));
+        probe.close();
+
+        // a second setAutoCommit(false) inside the open transaction should be a no-op
+        autoSession.createModel(model3URI, null);
+        autoSession.setAutoCommit(false);
+        File dataFile = new File("data/xatest-model1.rdf");
+        autoSession.setModel(model3URI, new ModelResource(dataFile.toURI()));
+        autoSession.setAutoCommit(false);
+        autoSession.commit();
+
+        // the data loaded before the redundant call must be visible after the commit
+        Answer loaded = autoSession.query(createQuery(model3URI));
+        compareResults(expectedResults(), loaded);
+        loaded.close();
+
+        autoSession.removeModel(model3URI);
+      } finally {
+        autoSession.close();
+      }
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
+
+
+  /**
+   * Checks which session operations are accepted once an explicit transaction
+   * has failed. Each case is handed to {@link #shouldFail} or
+   * {@link #shouldSucceed}, which run it against a session of their own.
+   */
+  public void testTransactionFailure() {
+    logger.info("Testing transactionFailure");
+
+    try {
+      // query after failure should fail
+      TestOp queryOp = new TestOp() {
+        public void run(Session session) throws Exception {
+          Answer result = session.query(createQuery(modelURI));
+          result.close();
+        }
+      };
+      shouldFail(queryOp, "Query in failed transaction did not fail");
+
+      // insert after failure should fail
+      TestOp insertOp = new TestOp() {
+        public void run(Session session) throws Exception {
+          Triple statement = new TripleImpl(
+              new URIReferenceImpl(URI.create("test:a")),
+              new URIReferenceImpl(URI.create("test:b")),
+              new URIReferenceImpl(URI.create("test:c")));
+          session.insert(modelURI, Collections.singleton(statement));
+        }
+      };
+      shouldFail(insertOp, "Insert in failed transaction did not fail");
+
+      // commit after failure should fail
+      TestOp commitOp = new TestOp() {
+        public void run(Session session) throws Exception {
+          session.commit();
+        }
+      };
+      shouldFail(commitOp, "Commit in failed transaction did not fail");
+
+      // rollback after failure should succeed
+      TestOp rollbackOp = new TestOp() {
+        public void run(Session session) throws Exception {
+          session.rollback();
+        }
+      };
+      shouldSucceed(rollbackOp);
+
+      // setAutoCommit(false) after failure should fail
+      TestOp autoCommitOffOp = new TestOp() {
+        public void run(Session session) throws Exception {
+          session.setAutoCommit(false);
+        }
+      };
+      shouldFail(autoCommitOffOp, "setAutoCommit(false) in failed transaction did not fail");
+
+      // setAutoCommit(true) after failure should succeed
+      TestOp autoCommitOnOp = new TestOp() {
+        public void run(Session session) throws Exception {
+          session.setAutoCommit(true);
+        }
+      };
+      shouldSucceed(autoCommitOnOp);
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
+
+  /**
+   * Runs <var>op</var> against a failed transaction and requires it to be rejected.
+   *
+   * @param op   the operation to attempt
+   * @param msg  the message reported if the operation is unexpectedly accepted
+   * @throws Exception if the scenario itself breaks
+   */
+  private void shouldFail(TestOp op, String msg) throws Exception {
+    final boolean expectRejection = true;
+    testTransactionFailureOp(op, expectRejection, msg);
+  }
+
+  /**
+   * Runs <var>op</var> against a failed transaction and requires it to be accepted.
+   *
+   * @param op  the operation to attempt
+   * @throws Exception if the operation or the surrounding scenario throws
+   */
+  private void shouldSucceed(TestOp op) throws Exception {
+    final boolean expectRejection = false;
+    final String unusedMessage = null;  // no failure message is needed for this case
+    testTransactionFailureOp(op, expectRejection, unusedMessage);
+  }
+
+  /**
+   * Runs one post-failure scenario on a session of its own: starts an explicit
+   * transaction, issues a query against the model URI "urn:no:such:model" that
+   * is expected to throw, runs <var>op</var>, and then checks that the session
+   * can run a query, start a new transaction, query inside it and commit.
+   *
+   * @param op          the operation to run after the bad query
+   * @param shouldFail  true if op is expected to throw a QueryException; the
+   *                    session is then reset with setAutoCommit(true) before
+   *                    the final checks
+   * @param msg         the message reported if op is expected to throw but does not
+   * @throws Exception  anything unexpected thrown by the session or by op
+   */
+  private void testTransactionFailureOp(TestOp op, boolean shouldFail, String msg) throws Exception {
+    Session failedSession = database.newSession();
+    try {
+      // start tx
+      failedSession.setAutoCommit(false);
+
+      // run bad query -> failed tx
+      try {
+        failedSession.query(createQuery(URI.create("urn:no:such:model")));
+        fail("Bad query failed to throw exception");
+      } catch (QueryException expected) {
+        // expected - this exception is what puts the transaction into the failed state
+      }
+
+      // run test op, verify it succeeds/fails, and reset
+      if (!shouldFail) {
+        // the op itself is what is expected to reset the failure
+        op.run(failedSession);
+      } else {
+        try {
+          op.run(failedSession);
+          fail(msg);
+        } catch (QueryException expected) {
+          // expected - the op was rejected
+        }
+        failedSession.setAutoCommit(true);
+      }
+
+      // verify we're good to go
+      Answer afterReset = failedSession.query(createQuery(modelURI));
+      afterReset.close();
+
+      // and that a fresh explicit transaction works end to end
+      failedSession.setAutoCommit(false);
+      Answer inNewTx = failedSession.query(createQuery(modelURI));
+      inNewTx.close();
+      failedSession.commit();
+    } finally {
+      failedSession.close();
+    }
+  }
+
+  /** A single session operation to be attempted by the transaction-failure tests. */
+  private interface TestOp {
+    /**
+     * Performs the operation.
+     *
+     * @param session  the session to operate on
+     * @throws Exception if the operation is rejected or otherwise fails
+     */
+    void run(Session session) throws Exception;
+  }
+
   //
   // Internal methods
   //

Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java	2008-06-22 22:54:54 UTC (rev 1009)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java	2008-06-23 02:37:02 UTC (rev 1010)
@@ -487,7 +487,7 @@
             context.clear();
             enlisted.clear();
             state = State.FAILED;
-            factory.transactionComplete(this);
+            factory.transactionAborted(this);
             factory = null;
             return new MulgaraTransactionException("Transaction rollback triggered", cause);
         case DEACTREF:

Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java	2008-06-22 22:54:54 UTC (rev 1009)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java	2008-06-23 02:37:02 UTC (rev 1010)
@@ -61,25 +61,27 @@
   private static final Logger logger =
     Logger.getLogger(MulgaraInternalTransactionFactory.class.getName());
 
-  private boolean autoCommit;
-
   /** Set of sessions whose transactions have been rolledback.*/
   private Set<DatabaseSession> failedSessions;
 
   /** Map of threads to active transactions. */
   private Map<Thread, MulgaraTransaction> activeTransactions;
 
-  private Assoc1toNMap<DatabaseSession, MulgaraTransaction> sessionXAMap;
+  /** Map of transactions to associated sessions. */
+  private Map<MulgaraTransaction, DatabaseSession> xaSessionMap;
 
+  /** Per session info. */
+  private Map<DatabaseSession, SessionInfo> sessionInfoMap;
+
   private final TransactionManager transactionManager;
 
   public MulgaraInternalTransactionFactory(MulgaraTransactionManager manager, TransactionManagerFactory transactionManagerFactory) {
     super(manager);
-    this.autoCommit = true;
 
     this.failedSessions = new HashSet<DatabaseSession>();
     this.activeTransactions = new HashMap<Thread, MulgaraTransaction>();
-    this.sessionXAMap = new Assoc1toNMap<DatabaseSession, MulgaraTransaction>();
+    this.xaSessionMap = new HashMap<MulgaraTransaction, DatabaseSession>();
+    this.sessionInfoMap = new HashMap<DatabaseSession, SessionInfo>();
 
     this.transactionManager = transactionManagerFactory.newTransactionManager();
   }
@@ -88,8 +90,9 @@
       throws MulgaraTransactionException {
     acquireMutex();
     try {
-      if (manager.isHoldingWriteLock(session)) {
-        return writeTransaction;
+      SessionInfo sessInfo = getOrCreateSessionInfo(session);
+      if (sessInfo.explicitXA != null) {
+        return sessInfo.explicitXA;
       }
 
       try {
@@ -112,7 +115,8 @@
           transaction = new MulgaraInternalTransaction(this, session.newOperationContext(false));
         }
 
-        sessionXAMap.put(session, transaction);
+        xaSessionMap.put(transaction, session);
+        sessInfo.transactions.add(transaction);
 
         return transaction;
       } catch (MulgaraTransactionException em) {
@@ -128,8 +132,8 @@
   public Set<MulgaraTransaction> getTransactionsForSession(DatabaseSession session) {
     acquireMutex();
     try {
-      Set <MulgaraTransaction> xas = sessionXAMap.getN(session);
-      return xas == null ? Collections.<MulgaraTransaction>emptySet() : xas;
+      SessionInfo sessInfo = sessionInfoMap.get(session);
+      return sessInfo == null ? Collections.<MulgaraTransaction>emptySet() : sessInfo.transactions;
     } finally {
       releaseMutex();
     }
@@ -144,15 +148,15 @@
   public void commit(DatabaseSession session) throws MulgaraTransactionException {
     acquireMutex();
     try {
+      if (failedSessions.contains(session)) {
+        throw new MulgaraTransactionException("Attempting to commit failed session");
+      } else if (!manager.isHoldingWriteLock(session)) {
+        throw new MulgaraTransactionException(
+            "Attempting to commit while not the current writing transaction");
+      }
+
       manager.reserveWriteLock(session);
       try {
-        if (failedSessions.contains(session)) {
-          throw new MulgaraTransactionException("Attempting to commit failed exception");
-        } else if (!manager.isHoldingWriteLock(session)) {
-          throw new MulgaraTransactionException(
-              "Attempting to commit while not the current writing transaction");
-        }
-
         setAutoCommit(session, true);
         setAutoCommit(session, false);
       } finally {
@@ -172,9 +176,9 @@
   public void rollback(DatabaseSession session) throws MulgaraTransactionException {
     acquireMutex();
     try {
-      manager.reserveWriteLock(session);
-      try {
-        if (manager.isHoldingWriteLock(session)) {
+      if (manager.isHoldingWriteLock(session)) {
+        manager.reserveWriteLock(session);
+        try {
           try {
             writeTransaction.execute(new TransactionOperation() {
                 public void execute() throws MulgaraTransactionException {
@@ -188,18 +192,19 @@
                   new MulgaraTransactionException("Rollback failed to terminate write transaction"));
             }
           } finally {
-            failedSessions.add(session);
+            sessionInfoMap.get(session).explicitXA = null;
             setAutoCommit(session, false);
           }
-        } else if (failedSessions.contains(session)) {
-          failedSessions.remove(session);
-          setAutoCommit(session, false);
-        } else {
-          throw new MulgaraTransactionException(
-              "Attempt to rollback while not in the current writing transaction");
+        } finally {
+          manager.releaseReserve(session);
         }
-      } finally {
-        manager.releaseReserve(session);
+      } else if (failedSessions.contains(session)) {
+        sessionInfoMap.get(session).explicitXA = null;
+        failedSessions.remove(session);
+        setAutoCommit(session, false);
+      } else {
+        throw new MulgaraTransactionException(
+            "Attempt to rollback while not in the current writing transaction");
       }
     } finally {
       releaseMutex();
@@ -215,8 +220,13 @@
             new MulgaraTransactionException("Failed Session in setAutoCommit"));
       }
 
+      SessionInfo sessInfo = getOrCreateSessionInfo(session);
+
       if (manager.isHoldingWriteLock(session) || failedSessions.contains(session)) {
         if (autoCommit) {
+          sessInfo.autoCommit = true;
+          sessInfo.explicitXA = null;
+
           // AutoCommit off -> on === branch on current state of transaction.
           if (manager.isHoldingWriteLock(session)) {
             // Within active transaction - commit and finalise.
@@ -238,7 +248,6 @@
               if (manager.isHoldingWriteLock(session)) {
                 manager.releaseWriteLock(session);
               }
-              this.autoCommit = true;
             }
           } else if (failedSessions.contains(session)) {
             // Within failed transaction - cleanup.
@@ -246,12 +255,7 @@
           }
         } else {
           if (!manager.isHoldingWriteLock(session)) {
-            if (failedSessions.contains(session)) {
-              failedSessions.remove(session);
-              setAutoCommit(session, false);
-            } else {
-              throw new IllegalStateException("Can't reach here");
-            }
+            throw new MulgaraTransactionException("Attempting set auto-commit false in failed session");
           } else {
             // AutoCommit off -> off === no-op. Log info.
             if (logger.isInfoEnabled()) {
@@ -260,6 +264,7 @@
           }
         }
       } else {
+        sessInfo.explicitXA = null;
         if (autoCommit) {
           // AutoCommit on -> on === no-op.  Log info.
           logger.info("Attempting to set autocommit true without setting it false");
@@ -267,7 +272,8 @@
           // AutoCommit on -> off == Start new transaction.
           getTransaction(session, true); // Set's writeTransaction.
           writeTransaction.reference();
-          this.autoCommit = false;
+          sessInfo.explicitXA = writeTransaction;
+          sessInfo.autoCommit = false;
         }
       }
     } finally {
@@ -291,7 +297,7 @@
           throw new MulgaraTransactionException("Attempt to start transaction twice");
         }
 
-        setTransactionTimeout((int) (sessionXAMap.get1(transaction).getTransactionTimeout() / 1000));
+        setTransactionTimeout((int) (xaSessionMap.get(transaction).getTransactionTimeout() / 1000));
         transactionManager.begin();
         Transaction jtaTrans = transactionManager.getTransaction();
 
@@ -338,7 +344,8 @@
               "Attempt to suspend transaction from outside thread");
         }
 
-        if (autoCommit && transaction == writeTransaction) {
+        SessionInfo sessInfo = sessionInfoMap.get(xaSessionMap.get(transaction));
+        if (sessInfo.autoCommit && transaction == writeTransaction) {
           logger.error("Attempt to suspend write transaction without setting AutoCommit Off");
           throw new MulgaraTransactionException(
               "Attempt to suspend write transaction without setting AutoCommit Off");
@@ -362,22 +369,39 @@
     }
   }
 
+  /**
+   * Forwards the close notification to the superclass and then, whether or not
+   * that call throws, discards this factory's bookkeeping for the session: its
+   * SessionInfo entry and the transaction-to-session entries of the transactions
+   * recorded in it. The mutex is held for the whole operation.
+   *
+   * @param session  the session being closed
+   * @throws MulgaraTransactionException if the superclass handling fails
+   */
+  public void closingSession(DatabaseSession session) throws MulgaraTransactionException {
+    acquireMutex();
+    try {
+      try {
+        super.closingSession(session);
+      } finally {
+        // remove() both fetches and unregisters the record; it is null if none was ever created
+        SessionInfo closedInfo = sessionInfoMap.remove(session);
+        if (closedInfo != null) {
+          for (MulgaraTransaction pending : closedInfo.transactions) {
+            xaSessionMap.remove(pending);
+          }
+        }
+      }
+    } finally {
+      releaseMutex();
+    }
+  }
+
   public void transactionComplete(MulgaraTransaction transaction) throws MulgaraTransactionException {
     acquireMutex();
     try {
       logger.debug("Transaction Complete");
+      DatabaseSession session = xaSessionMap.get(transaction);
+      if (session == null) {
+        throw new MulgaraTransactionException("No associated session found for transaction");
+      }
       if (transaction == writeTransaction) {
-        DatabaseSession session = sessionXAMap.get1(transaction);
-        if (session == null) {
-          throw new MulgaraTransactionException("No associated session found for write transaction");
-        }
         if (manager.isHoldingWriteLock(session)) {
           manager.releaseWriteLock(session);
           writeTransaction = null;
         }
       }
 
-      sessionXAMap.removeN(transaction);
+      xaSessionMap.remove(transaction);
+      sessionInfoMap.get(session).transactions.remove(transaction);
       activeTransactions.remove(Thread.currentThread());
     } finally {
       releaseMutex();
@@ -390,7 +414,7 @@
       try {
         // Make sure this cleans up the transaction metadata - this transaction is DEAD!
         if (transaction == writeTransaction) {
-          failedSessions.add(sessionXAMap.get1(transaction));
+          failedSessions.add(xaSessionMap.get(transaction));
         }
         transactionComplete(transaction);
       } catch (Throwable th) {
@@ -409,4 +433,21 @@
       logger.warn("Unable to set transaction timeout: " + transactionTimeout, es);
     }
   }
+
+  /**
+   * Looks up the bookkeeping record for a session, registering a new
+   * default-initialised record in sessionInfoMap if none exists yet.
+   *
+   * @param session  the session whose record is wanted
+   * @return the existing or newly registered record; never null
+   */
+  private SessionInfo getOrCreateSessionInfo(DatabaseSession session) {
+    SessionInfo known = sessionInfoMap.get(session);
+    if (known != null) {
+      return known;
+    }
+
+    SessionInfo fresh = new SessionInfo();
+    sessionInfoMap.put(session, fresh);
+    return fresh;
+  }
+
+  /** Per-session bookkeeping held by the factory. */
+  private static class SessionInfo {
+    /** Currently associated explicit transaction; null when there is none */
+    public MulgaraTransaction explicitXA = null;
+
+    /** All uncompleted transactions (may be more than 1 because of unclosed answers) */
+    public Set<MulgaraTransaction> transactions = new HashSet<MulgaraTransaction>();
+
+    /** Whether the session is in auto-commit mode; a new record starts out as true */
+    public boolean autoCommit = true;
+  }
 }




More information about the Mulgara-svn mailing list