[Mulgara-svn] r1146 - trunk/src/jar/resolver/java/org/mulgara/resolver

ronald at mulgara.org ronald at mulgara.org
Mon Aug 18 09:13:46 UTC 2008


Author: ronald
Date: 2008-08-18 02:13:45 -0700 (Mon, 18 Aug 2008)
New Revision: 1146

Modified:
   trunk/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java
   trunk/src/jar/resolver/java/org/mulgara/resolver/MockResolver.java
   trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java
Log:
Fix resolver xa-resource activation for externally managed transactions.

The resolvers' XAResources were not being driven correctly: end() was never
invoked, and start() was invoked only once, at enlistment time. This caused
problems for resolvers that need to track per-transaction information, because
in an RMI environment different operations in the same transaction may occur
in different threads, i.e. the thread-xid association may change during the
course of a transaction.

The XAResources are now properly suspended after each operation and resumed
before each operation, just like for internally managed transactions. Also,
end() is always invoked before any prepare(), commit(), or rollback().


Modified: trunk/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java
===================================================================
--- trunk/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java	2008-08-18 09:13:39 UTC (rev 1145)
+++ trunk/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java	2008-08-18 09:13:45 UTC (rev 1146)
@@ -46,6 +46,7 @@
 import org.mulgara.query.rdf.Mulgara;
 import org.mulgara.query.rdf.URIReferenceImpl;
 import org.mulgara.query.rdf.TripleImpl;
+import org.mulgara.resolver.spi.DummyXAResource;
 import org.mulgara.server.Session;
 import org.mulgara.util.FileUtil;
 
@@ -122,6 +123,7 @@
     suite.addTest(new ExternalTransactionUnitTest("testTransactionTimeout"));
     suite.addTest(new ExternalTransactionUnitTest("testTransactionFailure"));
     suite.addTest(new ExternalTransactionUnitTest("testSessionClose"));
+    suite.addTest(new ExternalTransactionUnitTest("testResourceActivation"));
 
     return suite;
   }
@@ -2140,6 +2142,270 @@
   }
 
 
+  /**
+   * Test that the underlying xa-resources (from resolvers) are properly activated and deactivated.
+   */
+  public void testResourceActivation() {
+    logger.info("Testing resourceActivation");
+
+    try {
+      Session session1 = database.newSession();
+      try {
+        // two commands in transaction
+        MockXAResource mockRes = new MockXAResource();
+        MockResolver.setNextXAResource(mockRes);
+
+        XAResource resource1 = session1.getXAResource();
+        Xid xid1 = new TestXid(1);
+        resource1.start(xid1, XAResource.TMNOFLAGS);
+
+        final URI testModel = new URI("foo://mulgara/resourceActivationTest");
+        session1.createModel(testModel, URI.create(Mulgara.NAMESPACE + "MockModel"));
+
+        assertEquals(1, mockRes.startCnt);
+        assertEquals(1, mockRes.suspendCnt);
+        assertEquals(0, mockRes.resumeCnt);
+        assertEquals(0, mockRes.endCnt);
+
+        session1.query(createQuery(testModel)).close();
+        assertEquals(1, mockRes.startCnt);
+        assertEquals(3, mockRes.suspendCnt);
+        assertEquals(2, mockRes.resumeCnt);
+        assertEquals(0, mockRes.endCnt);
+
+        resource1.end(xid1, XAResource.TMSUCCESS);
+        resource1.commit(xid1, true);
+        assertEquals(1, mockRes.startCnt);
+        assertEquals(3, mockRes.suspendCnt);
+        assertEquals(3, mockRes.resumeCnt);
+        assertEquals(1, mockRes.endCnt);
+        assertEquals(1, mockRes.prepareCnt);
+        assertEquals(1, mockRes.commitCnt);
+
+        // two commands in transaction, where one resolver is enlisted later
+        mockRes = new MockXAResource();
+        MockResolver.setNextXAResource(mockRes);
+
+        xid1 = new TestXid(2);
+        resource1.start(xid1, XAResource.TMNOFLAGS);
+
+        session1.query(createQuery(modelURI)).close();
+        assertEquals(0, mockRes.startCnt);
+        assertEquals(0, mockRes.suspendCnt);
+        assertEquals(0, mockRes.resumeCnt);
+        assertEquals(0, mockRes.endCnt);
+
+        session1.query(createQuery(testModel)).close();
+        assertEquals(1, mockRes.startCnt);
+        assertEquals(2, mockRes.suspendCnt);
+        assertEquals(1, mockRes.resumeCnt);
+        assertEquals(0, mockRes.endCnt);
+
+        resource1.end(xid1, XAResource.TMSUCCESS);
+        resource1.commit(xid1, true);
+        assertEquals(1, mockRes.startCnt);
+        assertEquals(2, mockRes.suspendCnt);
+        assertEquals(2, mockRes.resumeCnt);
+        assertEquals(1, mockRes.endCnt);
+        assertEquals(1, mockRes.prepareCnt);
+        assertEquals(1, mockRes.commitCnt);
+
+        // two threads, 2 commands in each thread, all one transaction (e.g. RMI)
+        mockRes = new MockXAResource();
+        MockResolver.setNextXAResource(mockRes);
+
+        xid1 = new TestXid(3);
+        resource1.start(xid1, XAResource.TMNOFLAGS);
+
+        session1.query(createQuery(testModel)).close();
+        assertEquals(1, mockRes.startCnt);
+        assertEquals(2, mockRes.suspendCnt);
+        assertEquals(1, mockRes.resumeCnt);
+        assertEquals(0, mockRes.endCnt);
+
+        final boolean[] steps = new boolean[3];
+        final Xid theXid = xid1;
+        final XAResource theRes = resource1;
+        final Session theSession = session1;
+        final MockXAResource theMockRes = mockRes;
+
+        Thread t1 = new Thread() {
+          public void run() {
+            try {
+              theSession.query(createQuery(testModel)).close();
+              assertEquals(1, theMockRes.startCnt);
+              assertEquals(4, theMockRes.suspendCnt);
+              assertEquals(3, theMockRes.resumeCnt);
+              assertEquals(0, theMockRes.endCnt);
+
+              synchronized (this) {
+                steps[0] = true;
+                notify();
+
+                try {
+                  wait(2000L);
+                } catch (InterruptedException ie) {
+                  logger.error("wait for tx step1 interrupted", ie);
+                  fail(ie);
+                }
+                assertTrue("transaction should've completed step1", steps[1]);
+              }
+
+              theRes.end(theXid, XAResource.TMSUCCESS);
+              theRes.rollback(theXid);
+              assertEquals(1, theMockRes.startCnt);
+              assertEquals(6, theMockRes.suspendCnt);
+              assertEquals(6, theMockRes.resumeCnt);
+              assertEquals(1, theMockRes.endCnt);
+              assertEquals(0, theMockRes.prepareCnt);
+              assertEquals(0, theMockRes.commitCnt);
+              assertEquals(1, theMockRes.rollbackCnt);
+
+              synchronized (this) {
+                steps[2] = true;
+                notify();
+              }
+            } catch (Exception e) {
+              fail(e);
+            }
+          }
+        };
+
+        synchronized (t1) {
+          t1.start();
+
+          try {
+            t1.wait(2000L);
+          } catch (InterruptedException ie) {
+            logger.error("wait for tx step0 interrupted", ie);
+            fail(ie);
+          }
+          assertTrue("transaction should've completed step0", steps[0]);
+        }
+
+        session1.query(createQuery(testModel)).close();
+        assertEquals(1, mockRes.startCnt);
+        assertEquals(6, mockRes.suspendCnt);
+        assertEquals(5, mockRes.resumeCnt);
+        assertEquals(0, mockRes.endCnt);
+
+        synchronized (t1) {
+          steps[1] = true;
+          t1.notify();
+
+          try {
+            t1.wait(2000L);
+          } catch (InterruptedException ie) {
+            logger.error("wait for tx step2 interrupted", ie);
+            fail(ie);
+          }
+          assertTrue("transaction should've completed step2", steps[2]);
+        }
+      } finally {
+        session1.close();
+      }
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
+
+  private static class MockXAResource extends DummyXAResource {
+    private static enum State { IDLE, ACTIVE, SUSPENDED, ENDED, PREPARED, FINISHED };
+
+    private final ThreadLocal<Xid> currTxn = new ThreadLocal<Xid>();
+    private State state = State.IDLE;
+
+    public int startCnt = 0;
+    public int resumeCnt = 0;
+    public int suspendCnt = 0;
+    public int endCnt = 0;
+    public int prepareCnt = 0;
+    public int commitCnt = 0;
+    public int rollbackCnt = 0;
+
+    public void start(Xid xid, int flags) throws XAException {
+      super.start(xid, flags);
+
+      if (currTxn.get() != null) {
+        throw new XAException("transaction already active: " + currTxn.get());
+      }
+      currTxn.set(xid);
+
+      if (flags == XAResource.TMNOFLAGS && state == State.ACTIVE) {
+        throw new XAException("resource already active: " + state);
+      }
+      if (flags == XAResource.TMRESUME && state != State.SUSPENDED) {
+        throw new XAException("resource not suspended: " + state);
+      }
+      state = State.ACTIVE;
+
+      if (flags == XAResource.TMNOFLAGS) startCnt++;
+      if (flags == XAResource.TMRESUME) resumeCnt++;
+    }
+
+    public void end(Xid xid, int flags) throws XAException {
+      super.end(xid, flags);
+
+      if (!xid.equals(currTxn.get())) {
+        throw new XAException("mismatched transaction end");
+      }
+      currTxn.set(null);
+
+      if (state != State.ACTIVE) {
+        throw new XAException("resource not active: " + state);
+      }
+      state = (flags == XAResource.TMSUSPEND) ? State.SUSPENDED : State.ENDED;
+
+      if (flags == XAResource.TMSUSPEND) suspendCnt++;
+      if (flags != XAResource.TMSUSPEND) endCnt++;
+    }
+
+    public int prepare(Xid xid) throws XAException {
+      super.prepare(xid);
+
+      if (currTxn.get() != null) {
+        throw new XAException("transaction still active: " + currTxn.get());
+      }
+      if (state != State.ENDED) {
+        throw new XAException("resource not ended: " + state);
+      }
+      state = State.PREPARED;
+
+      prepareCnt++;
+      return XA_OK;
+    }
+
+    public void commit(Xid xid, boolean onePhase) throws XAException {
+      super.commit(xid, onePhase);
+
+      if (currTxn.get() != null) {
+        throw new XAException("transaction still active: " + currTxn.get());
+      }
+      if (onePhase && state != State.ENDED) {
+        throw new XAException("resource not ended: " + state);
+      }
+      if (!onePhase && state != State.PREPARED) {
+        throw new XAException("resource not prepared: " + state);
+      }
+      state = State.FINISHED;
+
+      commitCnt++;
+    }
+
+    public void rollback(Xid xid) throws XAException {
+      super.rollback(xid);
+
+      if (currTxn.get() != null) throw new XAException("transaction still active: " + currTxn.get());
+      if (state != State.ENDED && state != State.PREPARED) {
+        throw new XAException("resource not ended or prepared: " + state);
+      }
+      state = State.FINISHED;
+
+      rollbackCnt++;
+    }
+  }
+
+
   //
   // Internal methods
   //

Modified: trunk/src/jar/resolver/java/org/mulgara/resolver/MockResolver.java
===================================================================
--- trunk/src/jar/resolver/java/org/mulgara/resolver/MockResolver.java	2008-08-18 09:13:39 UTC (rev 1145)
+++ trunk/src/jar/resolver/java/org/mulgara/resolver/MockResolver.java	2008-08-18 09:13:45 UTC (rev 1146)
@@ -72,9 +72,16 @@
   /** Logger */
   private static Logger logger = Logger.getLogger(MockResolver.class);
 
+  /** the next XAResource to return */
+  private static XAResource nextXARes = null;
+
   /** The session that this resolver is associated with */
   private final ResolverSession resolverSession;
 
+  public static synchronized void setNextXAResource(XAResource xaRes) {
+    nextXARes = xaRes;
+  }
+
   MockResolver(ResolverSession resolverSession) {
     this.resolverSession = resolverSession;
   }
@@ -84,7 +91,9 @@
   }
 
   public XAResource getXAResource() {
-    return new DummyXAResource(10);
+    synchronized (MockResolver.class) {
+      return (nextXARes != null) ? nextXARes : new DummyXAResource(10);
+    }
   }
 
   public void modifyModel(long model, Statements statements, boolean occurs) throws ResolverException {

Modified: trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java
===================================================================
--- trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java	2008-08-18 09:13:39 UTC (rev 1145)
+++ trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java	2008-08-18 09:13:45 UTC (rev 1146)
@@ -10,7 +10,7 @@
  * under the License.
  *
  * This file is an original work developed by Netymon Pty Ltd
- * (http://www.netymon.com, mailto:mail at netymon.com) under contract to 
+ * (http://www.netymon.com, mailto:mail at netymon.com) under contract to
  * Topaz Foundation. Portions created under this contract are
  * Copyright (c) 2007 Topaz Foundation
  * All Rights Reserved.
@@ -51,15 +51,19 @@
 public class MulgaraExternalTransaction implements MulgaraTransaction {
   private static final Logger logger =
     Logger.getLogger(MulgaraExternalTransaction.class.getName());
+  private static enum ResourceState { IDLE, ACTIVE, SUSPENDED, FINISHED };
 
   private Xid xid;
 
   private Set<EnlistableResource> enlisted;
+  private Set<EnlistableResource> started;
   private Set<EnlistableResource> prepared;
   private Set<EnlistableResource> committed;
   private Set<EnlistableResource> rollbacked;
 
   private Map<EnlistableResource, XAResource> xaResources;
+  private ResourceState xaResState;
+  private int inuse;
 
   private MulgaraExternalTransactionFactory factory;
   private DatabaseOperationContext context;
@@ -80,11 +84,14 @@
     this.xid = xid;
 
     this.enlisted = new HashSet<EnlistableResource>();
+    this.started = new HashSet<EnlistableResource>();
     this.prepared = new HashSet<EnlistableResource>();
     this.committed = new HashSet<EnlistableResource>();
     this.rollbacked = new HashSet<EnlistableResource>();
 
     this.xaResources = new HashMap<EnlistableResource, XAResource>();
+    this.xaResState = ResourceState.IDLE;
+    this.inuse = 0;
 
     this.inXACompletion = false;
 
@@ -98,7 +105,7 @@
   }
 
   // We ignore reference counting in external transactions
-  public void reference() throws MulgaraTransactionException {}  
+  public void reference() throws MulgaraTransactionException {}
   public void dereference() throws MulgaraTransactionException {}
 
   /**
@@ -182,6 +189,7 @@
     acquireMutex(0, false, MulgaraTransactionException.class);
     try {
       checkActive(MulgaraTransactionException.class);
+      activateXARes(MulgaraTransactionException.class);
       try {
         long la = lastActive;
         lastActive = -1;
@@ -198,6 +206,8 @@
           logger.error("Error in rollback after operation failure", ex);
         }
         throw new MulgaraTransactionException("Operation failed", th);
+      } finally {
+        deactivateXARes(MulgaraTransactionException.class);
       }
     } finally {
       releaseMutex();
@@ -208,6 +218,7 @@
     acquireMutex(0, false, TuplesException.class);
     try {
       checkActive(TuplesException.class);
+      activateXARes(TuplesException.class);
       try {
         long la = lastActive;
         lastActive = -1;
@@ -225,6 +236,8 @@
           logger.error("Error in rollback after answer-operation failure", ex);
         }
         throw new TuplesException("Request failed", th);
+      } finally {
+        deactivateXARes(TuplesException.class);
       }
     } finally {
       releaseMutex();
@@ -236,13 +249,17 @@
     acquireMutex(0, false, MulgaraTransactionException.class);
     try {
       checkActive(MulgaraTransactionException.class);
+      activateXARes(MulgaraTransactionException.class);
+      try {
+        long la = lastActive;
+        lastActive = -1;
 
-      long la = lastActive;
-      lastActive = -1;
+        to.execute();
 
-      to.execute();
-
-      lastActive = (la != -1) ? System.currentTimeMillis() : -1;
+        lastActive = (la != -1) ? System.currentTimeMillis() : -1;
+      } finally {
+        deactivateXARes(MulgaraTransactionException.class);
+      }
     } finally {
       releaseMutex();
     }
@@ -257,6 +274,116 @@
       throw factory.newException(exc, "Transaction has been completed");
   }
 
+  private <T extends Throwable> void activateXARes(Class<T> exc) throws T {
+    assert xaResState != ResourceState.FINISHED : "Unexpected resource-state: state=" + xaResState;
+
+    if (xaResState == ResourceState.ACTIVE) {
+      inuse++;
+      return;
+    }
+
+    assert inuse == 0 : "Unexpected use-count: state=" + xaResState + ", inuse=" + inuse;
+    boolean wasStarted = (xaResState == ResourceState.SUSPENDED);
+    xaResState = ResourceState.ACTIVE;
+
+    int flags = wasStarted ? XAResource.TMRESUME : XAResource.TMNOFLAGS;
+    for (EnlistableResource eres : wasStarted ? started : enlisted) {
+      XAResource res = xaResources.get(eres);
+      try {
+        res.start(xid, flags);
+        if (!wasStarted) {
+          started.add(eres);
+        }
+      } catch (XAException xae) {
+        // end started resources so we're in a consistent state
+        flags = wasStarted ? XAResource.TMSUSPEND : XAResource.TMFAIL;
+        for (EnlistableResource eres2 : wasStarted ? started : enlisted) {
+          XAResource res2 = xaResources.get(eres2);
+          if (res2 == res) {
+            break;
+          }
+
+          try {
+            res2.end(xid, flags);
+          } catch (XAException xae2) {
+            logger.error("Error ending resource '" + res2 + "' after start failure", xae2);
+          }
+        }
+
+        throw factory.newExceptionOrCause(exc, "Error starting resource '" + res + "'", xae);
+      }
+    }
+
+    inuse = 1;
+  }
+
+  private <T extends Throwable> void deactivateXARes(Class<T> exc) throws T {
+    if (xaResState == ResourceState.FINISHED) {
+      return;
+    }
+
+    assert xaResState == ResourceState.ACTIVE : "Unexpected resource-state: state=" + xaResState;
+    assert inuse > 0 : "Unexpected use-count: state=" + xaResState + ", inuse=" + inuse;
+
+    inuse--;
+    if (inuse > 0) {
+      return;
+    }
+
+    int flags = XAResource.TMSUSPEND;
+    T error = null;
+
+    for (EnlistableResource eres : started) {
+      XAResource res = xaResources.get(eres);
+      try {
+        res.end(xid, flags);
+      } catch (XAException xae) {
+        if (error == null) {
+          error = factory.newExceptionOrCause(exc, "Error ending resource '" + res + "'", xae);
+        } else {
+          logger.error("Error ending resource '" + res + "'", xae);
+        }
+      }
+    }
+
+    xaResState = ResourceState.SUSPENDED;
+
+    if (error != null) {
+      throw error;
+    }
+  }
+
+  private void endXARes(boolean success) throws XAException {
+    if (xaResState != ResourceState.SUSPENDED && xaResState != ResourceState.ACTIVE) {
+      return;
+    }
+
+    int flags = success ? XAResource.TMSUCCESS : XAResource.TMFAIL;
+    XAException error = null;
+
+    for (EnlistableResource eres : started) {
+      XAResource res = xaResources.get(eres);
+      try {
+        if (xaResState == ResourceState.SUSPENDED) {
+          res.start(xid, XAResource.TMRESUME);
+        }
+        res.end(xid, flags);
+      } catch (XAException xae) {
+        if (error == null) {
+          error = xae;
+        } else {
+          logger.error("Error ending resource '" + res + "'", xae);
+        }
+      }
+    }
+
+    xaResState = ResourceState.FINISHED;
+
+    if (error != null) {
+      throw error;
+    }
+  }
+
   public void enlist(EnlistableResource enlistable) throws MulgaraTransactionException {
     acquireMutex(0, false, MulgaraTransactionException.class);
     try {
@@ -269,10 +396,15 @@
         }
         enlisted.add(enlistable);
         xaResources.put(enlistable, res);
-        // FIXME: We need to handle this uptodate operation properly - handle
-        // suspension or mid-prepare/commit.
-        // bringUptodate(res);
-        res.start(xid, XAResource.TMNOFLAGS);
+
+        if (xaResState == ResourceState.ACTIVE) {
+          res.start(xid, XAResource.TMNOFLAGS);
+          started.add(enlistable);
+        } else if (xaResState == ResourceState.SUSPENDED) {
+          res.start(xid, XAResource.TMNOFLAGS);
+          res.end(xid, XAResource.TMSUSPEND);
+          started.add(enlistable);
+        }
       } catch (XAException ex) {
         throw new MulgaraTransactionException("Failed to enlist resource", ex);
       }
@@ -331,10 +463,12 @@
       long la = lastActive;
       lastActive = -1;
 
-      for (EnlistableResource er : enlisted) {
+      endXARes(true);
+      for (EnlistableResource er : started) {
         xaResources.get(er).prepare(xid);
         prepared.add(er);
       }
+
       lastActive = (la != -1) ? System.currentTimeMillis() : -1;
     } finally {
       releaseMutex();
@@ -355,7 +489,13 @@
         rollback = true;
         Map<EnlistableResource, XAException> rollbackFailed = new HashMap<EnlistableResource, XAException>();
 
-        for (EnlistableResource er : enlisted) {
+        try {
+          endXARes(false);
+        } catch (XAException ex) {
+          logger.error("Error ending resources - attempting to rollback anyway", ex);
+        }
+
+        for (EnlistableResource er : started) {
           try {
             if (!committed.contains(er)) {
               xaResources.get(er).rollback(xid);
@@ -380,7 +520,7 @@
           // Then check every rollback failure code for a contradiction to all committed.
           for (XAException xaex : rollbackFailed.values()) {
             switch (xaex.errorCode) {
-              case XAException.XA_HEURHAZ:  
+              case XAException.XA_HEURHAZ:
               case XAException.XAER_NOTA:
               case XAException.XAER_RMERR:
               case XAException.XAER_RMFAIL:




More information about the Mulgara-svn mailing list