[Mulgara-svn] r1259 - trunk/src/jar/resolver/java/org/mulgara/resolver

ronald at mulgara.org ronald at mulgara.org
Tue Sep 9 09:45:26 UTC 2008


Author: ronald
Date: 2008-09-09 02:45:25 -0700 (Tue, 09 Sep 2008)
New Revision: 1259

Modified:
   trunk/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java
   trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java
Log:
Added better exception handling for errors from the internal XAResources.
This is now fairly close to the spec.

Also added proper handling of XA_RDONLY return from prepare().


Modified: trunk/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java
===================================================================
--- trunk/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java	2008-09-09 06:07:51 UTC (rev 1258)
+++ trunk/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java	2008-09-09 09:45:25 UTC (rev 1259)
@@ -1837,6 +1837,8 @@
           assertFalse(answer.next());
           answer.close();
           fail("query should've gotten aborted");
+        } catch (QueryException qe) {
+          logger.debug("query was aborted", qe);
         } catch (TuplesException te) {
           logger.debug("query was aborted", te);
         }
@@ -2301,6 +2303,57 @@
           }
           assertTrue("transaction should've completed step2", steps[2]);
         }
+
+        int RB = XAException.XA_RBOTHER;
+        // res.start fails immediately with no error-code
+        doResourceFailureTest(4, session1, resource1, testModel, 1, -1, -1, -1, false, 0, true,
+                              1, 0, 0, 0,
+                              1, 0, 0, 0, 0, 0, 0);
+
+        // res.start fails immediately with rollback error-code
+        doResourceFailureTest(5, session1, resource1, testModel, 1, -1, -1, -1, false, RB, true,
+                              1, 0, 0, 0,
+                              1, 0, 0, 0, 0, 0, 1);
+
+        // res.end fails on suspend with unspecified error-code
+        doResourceFailureTest(6, session1, resource1, testModel, -1, 1, -1, -1, false, 0, true,
+                              1, 1, 0, 0,
+                              1, 1, 0, 0, 0, 0, 0);
+
+        // res.end fails on suspend with rollback error-code
+        doResourceFailureTest(7, session1, resource1, testModel, -1, 1, -1, -1, false, RB, true,
+                              1, 1, 0, 0,
+                              1, 1, 0, 0, 0, 0, 1);
+
+        // res.start fails on resume with unspecified error-code
+        doResourceFailureTest(8, session1, resource1, testModel, -1, -1, 1, -1, false, 0, true,
+                              1, 1, 1, 0,
+                              1, 1, 1, 0, 0, 0, 0);
+
+        // res.start fails on resume with rollback error-code
+        doResourceFailureTest(9, session1, resource1, testModel, -1, -1, 1, -1, false, RB, true,
+                              1, 1, 1, 0,
+                              1, 1, 1, 0, 0, 0, 1);
+
+        // res.end fails on end with unspecified error-code
+        doResourceFailureTest(10, session1, resource1, testModel, -1, -1, -1, 1, false, 0, false,
+                              1, 2, 1, 0,
+                              1, 2, 2, 1, 0, 0, 0);
+
+        // res.end fails on end with rollback error-code
+        doResourceFailureTest(11, session1, resource1, testModel, -1, -1, -1, 1, false, RB, false,
+                              1, 2, 1, 0,
+                              1, 2, 2, 1, 0, 0, 1);
+
+        // res.prepare fails with unspecified error-code
+        doResourceFailureTest(12, session1, resource1, testModel, -1, -1, -1, -1, true, 0, false,
+                              1, 2, 1, 0,
+                              1, 2, 2, 1, 1, 0, 1);
+
+        // res.prepare fails with rollback error-code
+        doResourceFailureTest(13, session1, resource1, testModel, -1, -1, -1, -1, true, RB, false,
+                              1, 2, 1, 0,
+                              1, 2, 2, 1, 1, 0, 0);
       } finally {
         session1.close();
       }
@@ -2309,11 +2362,76 @@
     }
   }
 
+  private void doResourceFailureTest(int testNum, Session session, XAResource resource, URI testModel,
+                                     int failStartAfter, int failSuspendAfter, int failResumeAfter,
+                                     int failEndAfter, boolean failPrepare, int errorCode, boolean qryFails,
+                                     int pfStartCnt, int pfSuspendCnt, int pfResumeCnt, int pfEndCnt,
+                                     int endStartCnt, int endSuspendCnt, int endResumeCnt, int endEndCnt,
+                                     int endPrepareCnt, int endCommitCnt, int endRollbackCnt)
+      throws Exception {
+    MockFailingXAResource mockRes = new MockFailingXAResource();
+    mockRes.failStartAfter = failStartAfter >= 0 ? failStartAfter : Integer.MAX_VALUE;
+    mockRes.failSuspendAfter = failSuspendAfter >= 0 ? failSuspendAfter : Integer.MAX_VALUE;
+    mockRes.failResumeAfter = failResumeAfter >= 0 ? failResumeAfter : Integer.MAX_VALUE;
+    mockRes.failEndAfter = failEndAfter >= 0 ? failEndAfter : Integer.MAX_VALUE;
+    mockRes.failPrepare = failPrepare;
+    mockRes.errorCode = errorCode;
+    MockResolver.setNextXAResource(mockRes);
+
+    TestXid xid1 = new TestXid(testNum);
+    resource.start(xid1, XAResource.TMNOFLAGS);
+
+    try {
+      session.query(createQuery(testModel)).close();
+      if (qryFails)
+        fail("query should have failed");
+    } catch (TuplesException te) {
+      if (!qryFails)
+        throw te;
+      logger.debug("Caught expected exception", te);
+    } catch (QueryException qe) {
+      if (!qryFails)
+        throw qe;
+      logger.debug("Caught expected exception", qe);
+    }
+
+    assertEquals(pfStartCnt, mockRes.startCnt);
+    assertEquals(pfSuspendCnt, mockRes.suspendCnt);
+    assertEquals(pfResumeCnt, mockRes.resumeCnt);
+    assertEquals(pfEndCnt, mockRes.endCnt);
+
+    try {
+      resource.end(xid1, qryFails ? XAResource.TMFAIL : XAResource.TMSUCCESS);
+    } catch (XAException xae) {
+      if (!isRollback(xae) && xae.errorCode != XAException.XA_HEURRB)
+        throw xae;
+    }
+    try {
+      if (qryFails)
+        resource.rollback(xid1);
+      else
+        resource.commit(xid1, true);
+    } catch (XAException xae) {
+      if (xae.errorCode == XAException.XA_HEURRB)
+        resource.forget(xid1);
+      else if (!isRollback(xae) && qryFails)
+        throw xae;
+    }
+
+    assertEquals(endStartCnt, mockRes.startCnt);
+    assertEquals(endSuspendCnt, mockRes.suspendCnt);
+    assertEquals(endResumeCnt, mockRes.resumeCnt);
+    assertEquals(endEndCnt, mockRes.endCnt);
+    assertEquals(endPrepareCnt, mockRes.prepareCnt);
+    assertEquals(endCommitCnt, mockRes.commitCnt);
+    assertEquals(endRollbackCnt, mockRes.rollbackCnt);
+  }
+
   private static class MockXAResource extends DummyXAResource {
-    private static enum State { IDLE, ACTIVE, SUSPENDED, ENDED, PREPARED, FINISHED };
+    protected static enum State { IDLE, ACTIVE, SUSPENDED, ENDED, PREPARED, FINISHED };
 
-    private final ThreadLocal<Xid> currTxn = new ThreadLocal<Xid>();
-    private State state = State.IDLE;
+    protected final ThreadLocal<Xid> currTxn = new ThreadLocal<Xid>();
+    protected State state = State.IDLE;
 
     public int startCnt = 0;
     public int resumeCnt = 0;
@@ -2405,7 +2523,41 @@
     }
   }
 
+  private static class MockFailingXAResource extends MockXAResource {
+    public int failStartAfter = Integer.MAX_VALUE;
+    public int failSuspendAfter = Integer.MAX_VALUE;
+    public int failResumeAfter = Integer.MAX_VALUE;
+    public int failEndAfter = Integer.MAX_VALUE;
+    public int errorCode = 0;
+    public boolean failPrepare = false;
 
+    public void start(Xid xid, int flags) throws XAException {
+      super.start(xid, flags);
+      if (startCnt >= failStartAfter || resumeCnt >= failResumeAfter) {
+        currTxn.set(null);
+        state = State.ENDED;
+        throw (errorCode != 0) ? new XAException(errorCode) : new XAException("Testing start failure");
+      }
+    }
+
+    public void end(Xid xid, int flags) throws XAException {
+      super.end(xid, flags);
+      if (endCnt >= failEndAfter || suspendCnt >= failSuspendAfter) {
+        state = State.ENDED;
+        throw (errorCode != 0) ? new XAException(errorCode) : new XAException("Testing end failure");
+      }
+    }
+
+    public int prepare(Xid xid) throws XAException {
+      super.prepare(xid);
+      if (failPrepare) {
+        throw (errorCode != 0) ? new XAException(errorCode) : new XAException("Testing prepare failure");
+      }
+      return XA_OK;
+    }
+  }
+
+
   //
   // Internal methods
   //

Modified: trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java
===================================================================
--- trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java	2008-09-09 06:07:51 UTC (rev 1258)
+++ trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java	2008-09-09 09:45:25 UTC (rev 1259)
@@ -20,6 +20,7 @@
 // Java 2 enterprise packages
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import javax.transaction.xa.XAResource;
@@ -57,6 +58,7 @@
 
   private Set<EnlistableResource> enlisted;
   private Set<EnlistableResource> started;
+  private Set<EnlistableResource> needRollback;
   private Set<EnlistableResource> prepared;
   private Set<EnlistableResource> committed;
   private Set<EnlistableResource> rollbacked;
@@ -85,6 +87,7 @@
 
     this.enlisted = new HashSet<EnlistableResource>();
     this.started = new HashSet<EnlistableResource>();
+    this.needRollback = new HashSet<EnlistableResource>();
     this.prepared = new HashSet<EnlistableResource>();
     this.committed = new HashSet<EnlistableResource>();
     this.rollbacked = new HashSet<EnlistableResource>();
@@ -189,16 +192,20 @@
     acquireMutex(0, false, MulgaraTransactionException.class);
     try {
       checkActive(MulgaraTransactionException.class);
-      activateXARes(MulgaraTransactionException.class);
       try {
-        long la = lastActive;
-        lastActive = -1;
+        activateXARes(MulgaraTransactionException.class);
+        try {
+          long la = lastActive;
+          lastActive = -1;
 
-        operation.execute(context,
-            context.getSystemResolver(),
-            metadata);
+          operation.execute(context,
+              context.getSystemResolver(),
+              metadata);
 
-        lastActive = (la != -1) ? System.currentTimeMillis() : -1;
+          lastActive = (la != -1) ? System.currentTimeMillis() : -1;
+        } finally {
+          deactivateXARes(MulgaraTransactionException.class);
+        }
       } catch (Throwable th) {
         try {
           heuristicRollback(th.toString());
@@ -206,8 +213,6 @@
           logger.error("Error in rollback after operation failure", ex);
         }
         throw new MulgaraTransactionException("Operation failed", th);
-      } finally {
-        deactivateXARes(MulgaraTransactionException.class);
       }
     } finally {
       releaseMutex();
@@ -218,16 +223,20 @@
     acquireMutex(0, false, TuplesException.class);
     try {
       checkActive(TuplesException.class);
-      activateXARes(TuplesException.class);
       try {
-        long la = lastActive;
-        lastActive = -1;
+        activateXARes(TuplesException.class);
+        try {
+          long la = lastActive;
+          lastActive = -1;
 
-        ao.execute();
+          ao.execute();
 
-        lastActive = (la != -1) ? System.currentTimeMillis() : -1;
+          lastActive = (la != -1) ? System.currentTimeMillis() : -1;
 
-        return ao.getResult();
+          return ao.getResult();
+        } finally {
+          deactivateXARes(TuplesException.class);
+        }
       } catch (Throwable th) {
         try {
           logger.warn("Error in answer operation triggered rollback", th);
@@ -236,8 +245,6 @@
           logger.error("Error in rollback after answer-operation failure", ex);
         }
         throw new TuplesException("Request failed", th);
-      } finally {
-        deactivateXARes(TuplesException.class);
       }
     } finally {
       releaseMutex();
@@ -295,6 +302,9 @@
           started.add(eres);
         }
       } catch (XAException xae) {
+        started.remove(eres);
+        if (isRollback(xae)) needRollback.add(eres);
+
         // end started resources so we're in a consistent state
         flags = wasStarted ? XAResource.TMSUSPEND : XAResource.TMFAIL;
         for (EnlistableResource eres2 : wasStarted ? started : enlisted) {
@@ -310,6 +320,8 @@
           }
         }
 
+        xaResState = wasStarted ? ResourceState.SUSPENDED : ResourceState.FINISHED;
+
         throw factory.newExceptionOrCause(exc, "Error starting resource '" + res + "'", xae);
       }
     }
@@ -333,11 +345,15 @@
     int flags = XAResource.TMSUSPEND;
     T error = null;
 
-    for (EnlistableResource eres : started) {
+    for (Iterator<EnlistableResource> iter = started.iterator(); iter.hasNext(); ) {
+      EnlistableResource eres = iter.next();
       XAResource res = xaResources.get(eres);
       try {
         res.end(xid, flags);
       } catch (XAException xae) {
+        iter.remove();
+        if (isRollback(xae)) needRollback.add(eres);
+
         if (error == null) {
           error = factory.newExceptionOrCause(exc, "Error ending resource '" + res + "'", xae);
         } else {
@@ -361,7 +377,8 @@
     int flags = success ? XAResource.TMSUCCESS : XAResource.TMFAIL;
     XAException error = null;
 
-    for (EnlistableResource eres : started) {
+    for (Iterator<EnlistableResource> iter = started.iterator(); iter.hasNext(); ) {
+      EnlistableResource eres = iter.next();
       XAResource res = xaResources.get(eres);
       try {
         if (xaResState == ResourceState.SUSPENDED) {
@@ -369,6 +386,9 @@
         }
         res.end(xid, flags);
       } catch (XAException xae) {
+        iter.remove();
+        if (isRollback(xae)) needRollback.add(eres);
+
         if (error == null) {
           error = xae;
         } else {
@@ -406,6 +426,7 @@
           started.add(enlistable);
         }
       } catch (XAException ex) {
+        if (isRollback(ex)) needRollback.add(enlistable);
         throw new MulgaraTransactionException("Failed to enlist resource", ex);
       }
     } finally {
@@ -464,9 +485,18 @@
       lastActive = -1;
 
       endXARes(true);
-      for (EnlistableResource er : started) {
-        xaResources.get(er).prepare(xid);
-        prepared.add(er);
+      for (Iterator<EnlistableResource> iter = started.iterator(); iter.hasNext(); ) {
+        EnlistableResource er = iter.next();
+        try {
+          if (xaResources.get(er).prepare(xid) == XAResource.XA_OK) {
+            prepared.add(er);
+          } else {
+            iter.remove();
+          }
+        } catch (XAException xae) {
+          if (isRollback(xae)) iter.remove();
+          throw xae;
+        }
       }
 
       lastActive = (la != -1) ? System.currentTimeMillis() : -1;
@@ -495,6 +525,7 @@
           logger.error("Error ending resources - attempting to rollback anyway", ex);
         }
 
+        started.addAll(needRollback);
         for (EnlistableResource er : started) {
           try {
             if (!committed.contains(er)) {
@@ -602,6 +633,10 @@
     factory.releaseMutex();
   }
 
+  private static boolean isRollback(XAException xae) {
+    return xae.errorCode >= XAException.XA_RBBASE && xae.errorCode <= XAException.XA_RBEND;
+  }
+
   private void report(String desc) {
     if (logger.isInfoEnabled()) {
       logger.info(desc + ": " + System.identityHashCode(this));




More information about the Mulgara-svn mailing list