[Mulgara-svn] r1259 - trunk/src/jar/resolver/java/org/mulgara/resolver
ronald at mulgara.org
ronald at mulgara.org
Tue Sep 9 09:45:26 UTC 2008
Author: ronald
Date: 2008-09-09 02:45:25 -0700 (Tue, 09 Sep 2008)
New Revision: 1259
Modified:
trunk/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java
trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java
Log:
Added better exception handling for errors from the internal XAResources.
This is now fairly close to the spec.
Also added proper handling of XA_RDONLY return from prepare().
Modified: trunk/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java
===================================================================
--- trunk/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java 2008-09-09 06:07:51 UTC (rev 1258)
+++ trunk/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java 2008-09-09 09:45:25 UTC (rev 1259)
@@ -1837,6 +1837,8 @@
assertFalse(answer.next());
answer.close();
fail("query should've gotten aborted");
+ } catch (QueryException qe) {
+ logger.debug("query was aborted", qe);
} catch (TuplesException te) {
logger.debug("query was aborted", te);
}
@@ -2301,6 +2303,57 @@
}
assertTrue("transaction should've completed step2", steps[2]);
}
+
+ int RB = XAException.XA_RBOTHER;
+ // res.start fails immediately with no error-code
+ doResourceFailureTest(4, session1, resource1, testModel, 1, -1, -1, -1, false, 0, true,
+ 1, 0, 0, 0,
+ 1, 0, 0, 0, 0, 0, 0);
+
+ // res.start fails immediately with rollback error-code
+ doResourceFailureTest(5, session1, resource1, testModel, 1, -1, -1, -1, false, RB, true,
+ 1, 0, 0, 0,
+ 1, 0, 0, 0, 0, 0, 1);
+
+ // res.end fails on suspend with unspecified error-code
+ doResourceFailureTest(6, session1, resource1, testModel, -1, 1, -1, -1, false, 0, true,
+ 1, 1, 0, 0,
+ 1, 1, 0, 0, 0, 0, 0);
+
+ // res.end fails on suspend with rollback error-code
+ doResourceFailureTest(7, session1, resource1, testModel, -1, 1, -1, -1, false, RB, true,
+ 1, 1, 0, 0,
+ 1, 1, 0, 0, 0, 0, 1);
+
+ // res.start fails on resume with unspecified error-code
+ doResourceFailureTest(8, session1, resource1, testModel, -1, -1, 1, -1, false, 0, true,
+ 1, 1, 1, 0,
+ 1, 1, 1, 0, 0, 0, 0);
+
+ // res.start fails on resume with rollback error-code
+ doResourceFailureTest(9, session1, resource1, testModel, -1, -1, 1, -1, false, RB, true,
+ 1, 1, 1, 0,
+ 1, 1, 1, 0, 0, 0, 1);
+
+ // res.end fails on end with unspecified error-code
+ doResourceFailureTest(10, session1, resource1, testModel, -1, -1, -1, 1, false, 0, false,
+ 1, 2, 1, 0,
+ 1, 2, 2, 1, 0, 0, 0);
+
+ // res.end fails on end with rollback error-code
+ doResourceFailureTest(11, session1, resource1, testModel, -1, -1, -1, 1, false, RB, false,
+ 1, 2, 1, 0,
+ 1, 2, 2, 1, 0, 0, 1);
+
+ // res.prepare fails with unspecified error-code
+ doResourceFailureTest(12, session1, resource1, testModel, -1, -1, -1, -1, true, 0, false,
+ 1, 2, 1, 0,
+ 1, 2, 2, 1, 1, 0, 1);
+
+ // res.prepare fails with rollback error-code
+ doResourceFailureTest(13, session1, resource1, testModel, -1, -1, -1, -1, true, RB, false,
+ 1, 2, 1, 0,
+ 1, 2, 2, 1, 1, 0, 0);
} finally {
session1.close();
}
@@ -2309,11 +2362,76 @@
}
}
+ private void doResourceFailureTest(int testNum, Session session, XAResource resource, URI testModel,
+ int failStartAfter, int failSuspendAfter, int failResumeAfter,
+ int failEndAfter, boolean failPrepare, int errorCode, boolean qryFails,
+ int pfStartCnt, int pfSuspendCnt, int pfResumeCnt, int pfEndCnt,
+ int endStartCnt, int endSuspendCnt, int endResumeCnt, int endEndCnt,
+ int endPrepareCnt, int endCommitCnt, int endRollbackCnt)
+ throws Exception {
+ MockFailingXAResource mockRes = new MockFailingXAResource();
+ mockRes.failStartAfter = failStartAfter >= 0 ? failStartAfter : Integer.MAX_VALUE;
+ mockRes.failSuspendAfter = failSuspendAfter >= 0 ? failSuspendAfter : Integer.MAX_VALUE;
+ mockRes.failResumeAfter = failResumeAfter >= 0 ? failResumeAfter : Integer.MAX_VALUE;
+ mockRes.failEndAfter = failEndAfter >= 0 ? failEndAfter : Integer.MAX_VALUE;
+ mockRes.failPrepare = failPrepare;
+ mockRes.errorCode = errorCode;
+ MockResolver.setNextXAResource(mockRes);
+
+ TestXid xid1 = new TestXid(testNum);
+ resource.start(xid1, XAResource.TMNOFLAGS);
+
+ try {
+ session.query(createQuery(testModel)).close();
+ if (qryFails)
+ fail("query should have failed");
+ } catch (TuplesException te) {
+ if (!qryFails)
+ throw te;
+ logger.debug("Caught expected exception", te);
+ } catch (QueryException qe) {
+ if (!qryFails)
+ throw qe;
+ logger.debug("Caught expected exception", qe);
+ }
+
+ assertEquals(pfStartCnt, mockRes.startCnt);
+ assertEquals(pfSuspendCnt, mockRes.suspendCnt);
+ assertEquals(pfResumeCnt, mockRes.resumeCnt);
+ assertEquals(pfEndCnt, mockRes.endCnt);
+
+ try {
+ resource.end(xid1, qryFails ? XAResource.TMFAIL : XAResource.TMSUCCESS);
+ } catch (XAException xae) {
+ if (!isRollback(xae) && xae.errorCode != XAException.XA_HEURRB)
+ throw xae;
+ }
+ try {
+ if (qryFails)
+ resource.rollback(xid1);
+ else
+ resource.commit(xid1, true);
+ } catch (XAException xae) {
+ if (xae.errorCode == XAException.XA_HEURRB)
+ resource.forget(xid1);
+ else if (!isRollback(xae) && qryFails)
+ throw xae;
+ }
+
+ assertEquals(endStartCnt, mockRes.startCnt);
+ assertEquals(endSuspendCnt, mockRes.suspendCnt);
+ assertEquals(endResumeCnt, mockRes.resumeCnt);
+ assertEquals(endEndCnt, mockRes.endCnt);
+ assertEquals(endPrepareCnt, mockRes.prepareCnt);
+ assertEquals(endCommitCnt, mockRes.commitCnt);
+ assertEquals(endRollbackCnt, mockRes.rollbackCnt);
+ }
+
private static class MockXAResource extends DummyXAResource {
- private static enum State { IDLE, ACTIVE, SUSPENDED, ENDED, PREPARED, FINISHED };
+ protected static enum State { IDLE, ACTIVE, SUSPENDED, ENDED, PREPARED, FINISHED };
- private final ThreadLocal<Xid> currTxn = new ThreadLocal<Xid>();
- private State state = State.IDLE;
+ protected final ThreadLocal<Xid> currTxn = new ThreadLocal<Xid>();
+ protected State state = State.IDLE;
public int startCnt = 0;
public int resumeCnt = 0;
@@ -2405,7 +2523,41 @@
}
}
+ private static class MockFailingXAResource extends MockXAResource {
+ public int failStartAfter = Integer.MAX_VALUE;
+ public int failSuspendAfter = Integer.MAX_VALUE;
+ public int failResumeAfter = Integer.MAX_VALUE;
+ public int failEndAfter = Integer.MAX_VALUE;
+ public int errorCode = 0;
+ public boolean failPrepare = false;
+ public void start(Xid xid, int flags) throws XAException {
+ super.start(xid, flags);
+ if (startCnt >= failStartAfter || resumeCnt >= failResumeAfter) {
+ currTxn.set(null);
+ state = State.ENDED;
+ throw (errorCode != 0) ? new XAException(errorCode) : new XAException("Testing start failure");
+ }
+ }
+
+ public void end(Xid xid, int flags) throws XAException {
+ super.end(xid, flags);
+ if (endCnt >= failEndAfter || suspendCnt >= failSuspendAfter) {
+ state = State.ENDED;
+ throw (errorCode != 0) ? new XAException(errorCode) : new XAException("Testing end failure");
+ }
+ }
+
+ public int prepare(Xid xid) throws XAException {
+ super.prepare(xid);
+ if (failPrepare) {
+ throw (errorCode != 0) ? new XAException(errorCode) : new XAException("Testing prepare failure");
+ }
+ return XA_OK;
+ }
+ }
+
+
//
// Internal methods
//
Modified: trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java
===================================================================
--- trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java 2008-09-09 06:07:51 UTC (rev 1258)
+++ trunk/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java 2008-09-09 09:45:25 UTC (rev 1259)
@@ -20,6 +20,7 @@
// Java 2 enterprise packages
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.transaction.xa.XAResource;
@@ -57,6 +58,7 @@
private Set<EnlistableResource> enlisted;
private Set<EnlistableResource> started;
+ private Set<EnlistableResource> needRollback;
private Set<EnlistableResource> prepared;
private Set<EnlistableResource> committed;
private Set<EnlistableResource> rollbacked;
@@ -85,6 +87,7 @@
this.enlisted = new HashSet<EnlistableResource>();
this.started = new HashSet<EnlistableResource>();
+ this.needRollback = new HashSet<EnlistableResource>();
this.prepared = new HashSet<EnlistableResource>();
this.committed = new HashSet<EnlistableResource>();
this.rollbacked = new HashSet<EnlistableResource>();
@@ -189,16 +192,20 @@
acquireMutex(0, false, MulgaraTransactionException.class);
try {
checkActive(MulgaraTransactionException.class);
- activateXARes(MulgaraTransactionException.class);
try {
- long la = lastActive;
- lastActive = -1;
+ activateXARes(MulgaraTransactionException.class);
+ try {
+ long la = lastActive;
+ lastActive = -1;
- operation.execute(context,
- context.getSystemResolver(),
- metadata);
+ operation.execute(context,
+ context.getSystemResolver(),
+ metadata);
- lastActive = (la != -1) ? System.currentTimeMillis() : -1;
+ lastActive = (la != -1) ? System.currentTimeMillis() : -1;
+ } finally {
+ deactivateXARes(MulgaraTransactionException.class);
+ }
} catch (Throwable th) {
try {
heuristicRollback(th.toString());
@@ -206,8 +213,6 @@
logger.error("Error in rollback after operation failure", ex);
}
throw new MulgaraTransactionException("Operation failed", th);
- } finally {
- deactivateXARes(MulgaraTransactionException.class);
}
} finally {
releaseMutex();
@@ -218,16 +223,20 @@
acquireMutex(0, false, TuplesException.class);
try {
checkActive(TuplesException.class);
- activateXARes(TuplesException.class);
try {
- long la = lastActive;
- lastActive = -1;
+ activateXARes(TuplesException.class);
+ try {
+ long la = lastActive;
+ lastActive = -1;
- ao.execute();
+ ao.execute();
- lastActive = (la != -1) ? System.currentTimeMillis() : -1;
+ lastActive = (la != -1) ? System.currentTimeMillis() : -1;
- return ao.getResult();
+ return ao.getResult();
+ } finally {
+ deactivateXARes(TuplesException.class);
+ }
} catch (Throwable th) {
try {
logger.warn("Error in answer operation triggered rollback", th);
@@ -236,8 +245,6 @@
logger.error("Error in rollback after answer-operation failure", ex);
}
throw new TuplesException("Request failed", th);
- } finally {
- deactivateXARes(TuplesException.class);
}
} finally {
releaseMutex();
@@ -295,6 +302,9 @@
started.add(eres);
}
} catch (XAException xae) {
+ started.remove(eres);
+ if (isRollback(xae)) needRollback.add(eres);
+
// end started resources so we're in a consistent state
flags = wasStarted ? XAResource.TMSUSPEND : XAResource.TMFAIL;
for (EnlistableResource eres2 : wasStarted ? started : enlisted) {
@@ -310,6 +320,8 @@
}
}
+ xaResState = wasStarted ? ResourceState.SUSPENDED : ResourceState.FINISHED;
+
throw factory.newExceptionOrCause(exc, "Error starting resource '" + res + "'", xae);
}
}
@@ -333,11 +345,15 @@
int flags = XAResource.TMSUSPEND;
T error = null;
- for (EnlistableResource eres : started) {
+ for (Iterator<EnlistableResource> iter = started.iterator(); iter.hasNext(); ) {
+ EnlistableResource eres = iter.next();
XAResource res = xaResources.get(eres);
try {
res.end(xid, flags);
} catch (XAException xae) {
+ iter.remove();
+ if (isRollback(xae)) needRollback.add(eres);
+
if (error == null) {
error = factory.newExceptionOrCause(exc, "Error ending resource '" + res + "'", xae);
} else {
@@ -361,7 +377,8 @@
int flags = success ? XAResource.TMSUCCESS : XAResource.TMFAIL;
XAException error = null;
- for (EnlistableResource eres : started) {
+ for (Iterator<EnlistableResource> iter = started.iterator(); iter.hasNext(); ) {
+ EnlistableResource eres = iter.next();
XAResource res = xaResources.get(eres);
try {
if (xaResState == ResourceState.SUSPENDED) {
@@ -369,6 +386,9 @@
}
res.end(xid, flags);
} catch (XAException xae) {
+ iter.remove();
+ if (isRollback(xae)) needRollback.add(eres);
+
if (error == null) {
error = xae;
} else {
@@ -406,6 +426,7 @@
started.add(enlistable);
}
} catch (XAException ex) {
+ if (isRollback(ex)) needRollback.add(enlistable);
throw new MulgaraTransactionException("Failed to enlist resource", ex);
}
} finally {
@@ -464,9 +485,18 @@
lastActive = -1;
endXARes(true);
- for (EnlistableResource er : started) {
- xaResources.get(er).prepare(xid);
- prepared.add(er);
+ for (Iterator<EnlistableResource> iter = started.iterator(); iter.hasNext(); ) {
+ EnlistableResource er = iter.next();
+ try {
+ if (xaResources.get(er).prepare(xid) == XAResource.XA_OK) {
+ prepared.add(er);
+ } else {
+ iter.remove();
+ }
+ } catch (XAException xae) {
+ if (isRollback(xae)) iter.remove();
+ throw xae;
+ }
}
lastActive = (la != -1) ? System.currentTimeMillis() : -1;
@@ -495,6 +525,7 @@
logger.error("Error ending resources - attempting to rollback anyway", ex);
}
+ started.addAll(needRollback);
for (EnlistableResource er : started) {
try {
if (!committed.contains(er)) {
@@ -602,6 +633,10 @@
factory.releaseMutex();
}
+ private static boolean isRollback(XAException xae) {
+ return xae.errorCode >= XAException.XA_RBBASE && xae.errorCode <= XAException.XA_RBEND;
+ }
+
private void report(String desc) {
if (logger.isInfoEnabled()) {
logger.info(desc + ": " + System.identityHashCode(this));
More information about the Mulgara-svn
mailing list