[Mulgara-svn] r1357 - trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed
ronald at mulgara.org
ronald at mulgara.org
Thu Oct 23 13:12:42 UTC 2008
Author: ronald
Date: 2008-10-23 06:12:41 -0700 (Thu, 23 Oct 2008)
New Revision: 1357
Added:
trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/MultiXAResource.java
trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/MultiXAResourceUnitTest.java
trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/TransactionCoordinator.java
Modified:
trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolver.java
trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolverFactory.java
trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/NetworkDelegator.java
Log:
Added transaction support to the distributed resolver. The main chunk of this is
an XAResource capable of forwarding to a list of other XAResource's.
Modified: trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolver.java
===================================================================
--- trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolver.java 2008-10-23 13:12:35 UTC (rev 1356)
+++ trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolver.java 2008-10-23 13:12:41 UTC (rev 1357)
@@ -13,11 +13,9 @@
package org.mulgara.resolver.distributed;
// Java 2 standard packages
-import java.net.*;
-import java.util.*;
+import java.net.URI;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
// Third party packages
import org.apache.log4j.Logger;
@@ -27,15 +25,13 @@
import org.mulgara.query.ConstraintElement;
import org.mulgara.query.LocalNode;
import org.mulgara.query.QueryException;
-import org.mulgara.resolver.spi.DummyXAResource;
import org.mulgara.resolver.spi.Resolution;
import org.mulgara.resolver.spi.Resolver;
import org.mulgara.resolver.spi.ResolverException;
+import org.mulgara.resolver.spi.ResolverFactory;
import org.mulgara.resolver.spi.ResolverFactoryException;
import org.mulgara.resolver.spi.ResolverSession;
import org.mulgara.resolver.spi.Statements;
-import org.mulgara.server.Session;
-import org.mulgara.server.SessionFactory;
/**
* Resolves constraints accessible through a session.
@@ -45,42 +41,35 @@
* @copyright © 2007 <a href="mailto:pgearon at users.sourceforge.net">Paul Gearon</a>
* @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
*/
-public class DistributedResolver implements Resolver {
-
+public class DistributedResolver implements Resolver, TransactionCoordinator {
/** Logger. */
- private static Logger logger = Logger.getLogger(DistributedResolver.class.getName());
+ private static Logger logger = Logger.getLogger(DistributedResolver.class);
/** The delegator that resolves the constraint on another server. */
private final Delegator delegator;
/** our xa-resource */
- private final XAResource xares;
+ private final DistributedXAResource xares;
/**
* Construct a Distributed Resolver.
* @param resolverSession the session this resolver is associated with.
+ * @param resolverFactory the factory this resolver is associated with.
+ * @param canWrite whether the current transaction is read-only or r/w
* @throws IllegalArgumentException if <var>resolverSession</var> is <code>null</code>
* @throws ResolverFactoryException if the superclass is unable to handle its arguments
*/
- DistributedResolver(ResolverSession resolverSession) throws ResolverFactoryException {
+ DistributedResolver(ResolverSession resolverSession, ResolverFactory resolverFactory,
+ boolean canWrite) throws ResolverFactoryException {
if (logger.isDebugEnabled()) logger.debug("Instantiating a distributed resolver");
// Validate "resolverSession" parameter
if (resolverSession == null) throw new IllegalArgumentException( "Null \"resolverSession\" parameter");
- delegator = new NetworkDelegator(resolverSession);
-
- xares = new DummyXAResource() {
- public void commit(Xid xid, boolean onePhase) throws XAException {
- delegator.close();
- }
-
- public void rollback(Xid xid) throws XAException {
- delegator.close();
- }
- };
+ delegator = new NetworkDelegator(resolverSession, canWrite, this);
+ xares = new DistributedXAResource(10, resolverFactory, delegator);
}
@@ -162,4 +151,50 @@
public void abort() {
delegator.close();
}
+
+
+ public void enlistResource(XAResource xares) throws XAException { // the parameter shadows the xares field
+ this.xares.enlistResource(xares); // forward to this resolver's own DistributedXAResource
+ }
+
+ /**
+ * An XAResource which encapsulates and delegates to all the remote XAResource's.
+ *
+ * <p>Note that this can never be really correct because it's basically impossible to get
+ * the <var>isSameRM</var> semantics correct when proxying multiple XAResource's.
+ */
+ private static class DistributedXAResource extends MultiXAResource {
+ private final Delegator delegator; // copied into each DistributedTxInfo; closed when a transaction completes
+
+ /**
+ * Construct a {@link DistributedXAResource} with a specified transaction timeout.
+ *
+ * @param transactionTimeout transaction timeout period, in seconds
+ * @param resolverFactory the resolver-factory we belong to
+ * @param delegator the delegator being used
+ */
+ public DistributedXAResource(int transactionTimeout, ResolverFactory resolverFactory,
+ Delegator delegator) {
+ super(transactionTimeout, resolverFactory);
+ this.delegator = delegator;
+ }
+
+ protected DistributedTxInfo newTransactionInfo() { // covariant override: returns the delegator-carrying subtype
+ DistributedTxInfo txInfo = new DistributedTxInfo();
+ txInfo.delegator = delegator; // remember the delegator on the per-transaction info
+ return txInfo;
+ }
+
+ protected void transactionCompleted(MultiXAResource.MultiTxInfo tx) {
+ try {
+ ((DistributedTxInfo) tx).delegator.close(); // assumes tx was created by newTransactionInfo() above
+ } finally {
+ super.transactionCompleted(tx); // superclass handling runs even if close() throws
+ }
+ }
+
+ static class DistributedTxInfo extends MultiXAResource.MultiTxInfo {
+ public Delegator delegator; // set by newTransactionInfo(), read by transactionCompleted()
+ }
+ }
}
Modified: trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolverFactory.java
===================================================================
--- trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolverFactory.java 2008-10-23 13:12:35 UTC (rev 1356)
+++ trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolverFactory.java 2008-10-23 13:12:41 UTC (rev 1357)
@@ -124,7 +124,7 @@
logger.debug("Creating new distributed resolver");
if (canWrite) logger.debug("Expecting to write to distributed resolver.");
}
- return new DistributedResolver(resolverSession);
+ return new DistributedResolver(resolverSession, this, canWrite);
}
Added: trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/MultiXAResource.java
===================================================================
--- trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/MultiXAResource.java (rev 0)
+++ trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/MultiXAResource.java 2008-10-23 13:12:41 UTC (rev 1357)
@@ -0,0 +1,445 @@
+/*
+ * Copyright 2008 The Topaz Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ * Contributions:
+ */
+
+package org.mulgara.resolver.distributed;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.log4j.Logger;
+
+import org.mulgara.resolver.spi.AbstractXAResource;
+import org.mulgara.resolver.spi.AbstractXAResource.RMInfo;
+import org.mulgara.resolver.spi.AbstractXAResource.TxInfo;
+import org.mulgara.resolver.spi.ResolverFactory;
+
+/**
+ * This is an implementation of {@link XAResource} that presents a collection of xa-resources as
+ * a single xa-resource. In doing so, this implements parts of what a transaction-manager must
+ * provide in order to present a unified view. For example, a one-phase commit may need to be
+ * turned into a two-phase commit over the underlying resources, and all underlying resources must
+ * be aborted if an RMFAIL exception occurs.
+ *
+ * <p>XAResource's may be enlisted via {@link #enlistResource}; the enlistment is only valid for
+ * the duration of the transaction.
+ *
+ * <p>This class is partially thread-safe, as required by JTA. Specifically, {@link #prepare},
+ * {@link #commit}, and {@link #rollback} may be invoked concurrently with {@link #start} and
+ * {@link #end} so long as these do not involve the same xid. Also, there is no requirement that
+ * the same thread be used for any two operations on a given xid. However, {@link #start} and
+ * {@link #end} may not be nested, nor may any two methods be invoked concurrently with the same
+ * xid.
+ *
+ * <p>Limitations:
+ * <ul>
+ * <li>Transaction-id's (Xid's) are not remembered persistently, so {@link #recover} and
+ * {@link #forget} across a restart won't work</li>
+ * <li>{@link #isSameRM} cannot be properly implemented.</li>
+ * </ul>
+ *
+ * @created 2008-02-16
+ * @author Ronald Tschalär
+ * @copyright ©2008 <a href="http://www.topazproject.org/">Topaz Project</a>
+ * @licence Apache License v2.0
+ */
+public class MultiXAResource
+ extends AbstractXAResource<RMInfo<MultiXAResource.MultiTxInfo>,MultiXAResource.MultiTxInfo> {
+
+ private static final Logger logger = Logger.getLogger(MultiXAResource.class);
+
+ private final Set<XAResource> ended = new HashSet<XAResource>();
+
+ private volatile MultiTxInfo curTx;
+
+
+ /**
+ * Create a new Multi-XAResource.
+ *
+ * @param transactionTimeout transaction timeout period, in seconds
+ * @param resolverFactory the resolver-factory we belong to
+ */
+ public MultiXAResource(int transactionTimeout, ResolverFactory resolverFactory) {
+ super(transactionTimeout, resolverFactory); // both arguments are passed straight to AbstractXAResource
+ }
+
+ protected RMInfo<MultiTxInfo> newResourceManager() { // a plain RMInfo is used; no subclass-specific state is added
+ return new RMInfo<MultiTxInfo>();
+ }
+
+ protected MultiTxInfo newTransactionInfo() { // protected and non-final, so subclasses can return a richer MultiTxInfo
+ return new MultiTxInfo();
+ }
+
+ /**
+ * Enlist a resource in the current transaction. This may only be invoked while a transaction is
+ * in the ACTIVE state (between a {@link #start} and {@link #end}).
+ *
+ * @param res the resource to enlist
+ * @throws XAException if an error occurs
+ */
+ public void enlistResource(XAResource res) throws XAException {
+ if (curTx == null) // curTx is only set by doStart and cleared again by doEnd (or a failed resume)
+ throw new IllegalStateException("No transaction active");
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("enlisting resource '" + res + "' in txn '" + formatXid(curTx.xid) + "'");
+ }
+
+ int elapsed = (int)((System.currentTimeMillis() - curTx.startTime) / 1000); // whole seconds since the tx-info was created
+ res.setTransactionTimeout(Math.max(transactionTimeout - elapsed, 10)); // remaining time, but never less than 10
+
+ for (XAResource r : curTx.resources) {
+ if (res.isSameRM(r)) { // same RM as an already tracked resource: join it, do not track res separately
+ res.start(curTx.xid, TMJOIN);
+ return;
+ }
+ }
+
+ curTx.resources.add(res); // track the resource first, then start it with TMNOFLAGS
+ try {
+ res.start(curTx.xid, TMNOFLAGS);
+ } catch (Throwable t) {
+ if (isCompleted(t)) { // RMFAIL/NOTA: the resource is no longer part of this transaction
+ curTx.resources.remove(res);
+ // turn this into a non-rmfail exception so rollback() is called for us
+ t = (XAException)new XAException(XAException.XAER_RMERR).initCause(t);
+ } else {
+ ended.add(res); // stays tracked, but doEnd skips resources recorded in 'ended'
+ }
+
+ throwExc(t);
+ }
+ }
+
+ /* flags - One of TMNOFLAGS, TMJOIN, or TMRESUME
+ * Possible exceptions are: XA_RB*, XAER_RMERR, XAER_RMFAIL, XAER_DUPID, XAER_OUTSIDE, XAER_NOTA,
+ * XAER_INVAL, or XAER_PROTO
+ */
+ protected void doStart(MultiTxInfo txInfo, int flags, boolean isNew) throws XAException { // isNew is not used in this method
+ // check we're not already between a start and end
+ if (curTx != null) throw new XAException(XAException.XAER_PROTO);
+
+ // mark that we're active
+ curTx = txInfo;
+ txInfo.state = MultiTxInfo.States.ACTIVE;
+
+ // propagate the start
+ if (flags == TMJOIN) {
+ throw new XAException("Can't handle joins"); // NOTE(review): curTx/state were already set above and are not reset on this path - confirm intended
+ } else if (flags == XAResource.TMRESUME) {
+ for (Iterator<XAResource> iter = txInfo.resources.iterator(); iter.hasNext(); ) { // resume every tracked resource
+ XAResource r = iter.next();
+ try {
+ r.start(txInfo.xid, flags);
+ } catch (Throwable t) {
+ if (isCompleted(t)) { // RMFAIL/NOTA: drop the resource from the transaction
+ iter.remove();
+ // turn this into a non-rmfail exception so rollback() is called for us
+ t = (XAException)new XAException(XAException.XAER_RMERR).initCause(t);
+ }
+
+ for (Iterator<XAResource> iter2 = txInfo.resources.iterator(); iter2.hasNext(); ) { // end all remaining resources with TMFAIL
+ XAResource r2 = iter2.next();
+ if (r2 == r) continue; // skip the resource whose start just failed
+
+ try {
+ r2.end(txInfo.xid, TMFAIL);
+ } catch (Throwable t2) {
+ logger.error("Error suspending resource '" + r2 + "' while handling aborted start", t2);
+ if (isCompleted(t2)) iter2.remove(); // failures here are only logged; the first exception is the one reported
+ }
+ }
+
+ curTx = null; // no longer between start and end
+ txInfo.state = MultiTxInfo.States.IDLE;
+
+ throwExc(t); // always throws, so the outer loop never continues past a failure
+ }
+ }
+ } // any other flags value (e.g. TMNOFLAGS) propagates nothing here
+ }
+
+ /* flags - One of TMSUCCESS, TMFAIL, or TMSUSPEND
+ * Possible XAException values are: XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, XAER_PROTO, or
+ * XA_RB*.
+ */
+ protected void doEnd(MultiTxInfo txInfo, int flags) throws XAException {
+ // check we're between a start and end
+ if (curTx != txInfo) throw new XAException(XAException.XAER_PROTO);
+
+ // propagate the end
+ Throwable exc = null; // first exception seen; later ones are only logged
+
+ for (Iterator<XAResource> iter = txInfo.resources.iterator(); iter.hasNext(); ) {
+ XAResource r = iter.next();
+ if (ended.contains(r)) continue; // recorded by enlistResource when its start() failed
+
+ try {
+ r.end(txInfo.xid, flags);
+ } catch (Throwable t) {
+ if (isCompleted(t)) iter.remove(); // RMFAIL/NOTA: resource is out of the transaction
+ else if (flags == TMSUSPEND) ended.add(r); // so the TMFAIL clean-up pass below skips it
+
+ if (exc == null) exc = t;
+ else logger.error("2nd or more exception during end; resource = '" + r + "'", t);
+ }
+ }
+
+ // mark that we're idle
+ curTx = null;
+ txInfo.state = MultiTxInfo.States.IDLE;
+
+ // clean up if we failed on suspend
+ if (flags == TMSUSPEND && exc != null) {
+ for (Iterator<XAResource> iter = txInfo.resources.iterator(); iter.hasNext(); ) {
+ XAResource r = iter.next();
+ if (ended.contains(r)) continue;
+
+ try {
+ r.end(txInfo.xid, TMFAIL); // end the remaining resources with TMFAIL
+ } catch (Throwable t) {
+ if (isCompleted(t)) iter.remove();
+ logger.error("2nd or more exception during end; resource = '" + r + "'", t);
+ }
+ }
+ }
+
+ ended.clear(); // 'ended' is emptied on every doEnd call
+
+ // turn this into a non-rmfail exception so rollback() is called for us
+ if (isCompleted(exc)) { // false when exc is null (instanceof check)
+ throw (XAException)new XAException(XAException.XAER_RMERR).initCause(exc);
+ }
+ if (exc != null) throwExc(exc);
+ }
+
+ /* Possible exception values are: XA_RB*, XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or
+ * XAER_PROTO
+ */
+ protected int doPrepare(MultiTxInfo txInfo) throws XAException {
+ // check that this tx isn't in the ACTIVE state
+ if (txInfo == curTx) throw new XAException(XAException.XAER_PROTO);
+
+ // tell everyone to prepare and gather their votes
+ Throwable exc = null; // first exception seen; later ones are only debug-logged
+ txInfo.state = MultiTxInfo.States.PREPARING;
+
+ for (Iterator<XAResource> iter = txInfo.resources.iterator(); iter.hasNext(); ) {
+ XAResource r = iter.next();
+ try {
+ if (r.prepare(txInfo.xid) == XA_RDONLY) {
+ iter.remove(); // read-only don't participate in commit/rollback
+ }
+ } catch (Throwable t) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("prepare vetoed by '" + r + "'", t);
+ }
+ if (isCompleted(t) || isRollback(t)) iter.remove(); // resource is dropped so it is not called again for this tx
+
+ if (exc == null) exc = t;
+ }
+ }
+
+ txInfo.state = MultiTxInfo.States.PREPARED; // set regardless of whether a resource threw
+
+ // turn this into a non-rmfail/no-rb exception so rollback() is called for us
+ if (isCompleted(exc) || isRollback(exc)) {
+ throw (XAException)new XAException(XAException.XAER_RMERR).initCause(exc);
+ }
+ if (exc != null) throwExc(exc);
+
+ //return (txInfo.resources.size() > 0) ? XA_OK : XA_RDONLY;
+ return XA_OK; // JOTM bug
+ }
+
+ /* Possible XAExceptions are XA_HEURHAZ, XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XAER_RMERR,
+ * XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or XAER_PROTO
+ */
+ protected void doCommit(MultiTxInfo txInfo) throws XAException {
+ // check that this tx isn't in the ACTIVE state
+ if (txInfo == curTx) throw new XAException(XAException.XAER_PROTO);
+
+ Throwable exc = null; // first (or merged heuristic) exception; others are logged
+ txInfo.state = MultiTxInfo.States.COMMITTING;
+
+ try {
+ // ready to do commit
+ int numCmt = 0; // number of resources whose commit() returned normally
+
+ for (Iterator<XAResource> iter = txInfo.resources.iterator(); iter.hasNext(); ) {
+ XAResource r = iter.next();
+ try {
+ if (numCmt == 0 && exc instanceof XAException &&
+ (isRollback(exc) || ((XAException)exc).errorCode == XAException.XA_HEURRB)) {
+ r.rollback(txInfo.xid); // the first commit failed with a RB, so we roll back all
+ } else {
+ r.commit(txInfo.xid, false); // underlying resources are always committed with onePhase == false
+ numCmt++;
+ }
+ iter.remove(); // done with this resource
+ } catch (Throwable t) {
+ if (!isHeuristic(t)) iter.remove(); // resources that failed heuristically stay in the list
+
+ if (exc == null) {
+ exc = t;
+ } else if (isHeuristic(t) && isHeuristic(exc) &&
+ ((XAException)t).errorCode != ((XAException)exc).errorCode &&
+ ((XAException)exc).errorCode != XAException.XA_HEURMIX ) {
+ exc = (XAException)new XAException(XAException.XA_HEURMIX).initCause(exc); // differing heuristic outcomes merge into HEURMIX
+ logger.error("2nd or more exception during commit; resource = '" + r + "'", t);
+ } else {
+ logger.error("2nd or more exception during commit; resource = '" + r + "'", t);
+ }
+ }
+ }
+
+ if (exc instanceof XAException) {
+ XAException xae = (XAException)exc;
+ if (xae.errorCode == XAException.XA_HEURMIX) throw xae; // rethrown unchanged
+ if (numCmt == 0 && xae.errorCode == XAException.XA_HEURRB) throw xae; // nothing committed: HEURRB rethrown unchanged
+ }
+ if (exc != null) { // any other failure is wrapped, keeping the original as the cause
+ if (numCmt == 0) throw (XAException)new XAException(XAException.XA_HEURRB).initCause(exc);
+ else throw (XAException)new XAException(XAException.XA_HEURMIX).initCause(exc);
+ }
+ } finally {
+ txInfo.state = MultiTxInfo.States.FINISHED; // set on both normal and exceptional exit
+ }
+ }
+
+ /* Possible XAExceptions are XA_HEURHAZ, XA_HEURCOM, XA_HEURRB, XA_HEURMIX, XAER_RMERR,
+ * XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or XAER_PROTO
+ */
+ protected void doRollback(MultiTxInfo txInfo) throws XAException {
+ // check that this tx isn't in the ACTIVE state
+ if (txInfo == curTx) throw new XAException(XAException.XAER_PROTO);
+
+ Throwable exc = null; // first (or merged heuristic) exception; others are logged
+ txInfo.state = MultiTxInfo.States.ROLLINGBACK;
+ try {
+ for (Iterator<XAResource> iter = txInfo.resources.iterator(); iter.hasNext(); ) {
+ XAResource r = iter.next();
+ try {
+ r.rollback(txInfo.xid);
+ iter.remove(); // done with this resource
+ } catch (Throwable t) {
+ if (!isHeuristic(t)) iter.remove(); // resources that failed heuristically stay in the list
+
+ if (exc == null) {
+ exc = t;
+ } else if (isHeuristic(t) && isHeuristic(exc) &&
+ ((XAException)t).errorCode != ((XAException)exc).errorCode &&
+ ((XAException)exc).errorCode != XAException.XA_HEURMIX ) {
+ exc = (XAException)new XAException(XAException.XA_HEURMIX).initCause(exc); // differing heuristic outcomes merge into HEURMIX
+ logger.error("2nd or more exception during rollback; resource = '" + r + "'", t);
+ } else {
+ logger.error("2nd or more exception during rollback; resource = '" + r + "'", t);
+ }
+ }
+ }
+
+ if (exc instanceof XAException) {
+ if (((XAException)exc).errorCode == XAException.XA_HEURMIX) throw (XAException)exc; // rethrown unchanged
+ }
+ if (exc != null) {
+ throw (XAException)new XAException(XAException.XA_HEURHAZ).initCause(exc); // every other failure is reported as HEURHAZ
+ }
+ } finally {
+ txInfo.state = MultiTxInfo.States.FINISHED; // set on both normal and exceptional exit
+ }
+ }
+
+ // Possible exception values are: XAER_RMERR, XAER_RMFAIL, XAER_NOTA, XAER_INVAL, or XAER_PROTO.
+ protected void doForget(MultiTxInfo txInfo) throws XAException {
+ Throwable exc = null; // first exception seen; later ones are only logged
+
+ for (Iterator<XAResource> iter = txInfo.resources.iterator(); iter.hasNext(); ) { // only resources still in the list are asked to forget
+ XAResource r = iter.next();
+ try {
+ r.forget(txInfo.xid);
+ iter.remove(); // forgotten successfully
+ } catch (Throwable t) { // a resource whose forget() fails stays in the list
+ if (exc == null) {
+ exc = t;
+ } else {
+ logger.error("2nd or more exception during forget; resource = '" + r + "'", t);
+ }
+ }
+ }
+
+ if (exc != null) throwExc(exc); // report the first failure after trying every resource
+ }
+
+ /* flag - One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS
+ * Possible exception values are: XAER_RMERR, XAER_RMFAIL, XAER_INVAL, and XAER_PROTO
+ */
+ public Xid[] recover(int flag) throws XAException {
+ Set<Xid> res = new HashSet<Xid>(); // stays empty unless TMSTARTRSCAN is set
+
+ if ((flag & TMSTARTRSCAN) != 0) {
+ for (MultiTxInfo txInfo : resourceManager.transactions.values()) { // in-memory transactions only
+ if (txInfo.state == MultiTxInfo.States.PREPARING || // IDLE and ACTIVE transactions are not reported
+ txInfo.state == MultiTxInfo.States.PREPARED ||
+ txInfo.state == MultiTxInfo.States.COMMITTING ||
+ txInfo.state == MultiTxInfo.States.ROLLINGBACK ||
+ txInfo.state == MultiTxInfo.States.FINISHED) {
+ res.add(txInfo.xid);
+ }
+ }
+ }
+
+ return res.toArray(new Xid[res.size()]);
+ }
+
+ /**
+ * Tests whether the exception indicates that the resource has completed its participation in
+ * the transaction. See Table 6.4 (page 62) of X/Open for RMFAIL; for NOTA see X/Open pages 37 ff
+ * (xa_end), page 62 (Table 6.4), page 15 ("Rollback-Only", last two sentences), page 18
+ * ("Unilateral RM Action").
+ *
+ * @param t the exception to test
+ * @return true if the resource is done with this transaction
+ */
+ private static boolean isCompleted(Throwable t) { // returns false for null and for any non-XAException
+ return (t instanceof XAException) &&
+ (((XAException)t).errorCode == XAException.XAER_RMFAIL || // only these two error codes count as "completed"
+ ((XAException)t).errorCode == XAException.XAER_NOTA);
+ }
+
+ private static final void throwExc(Throwable t) throws XAException { // never returns normally: every path throws
+ if (t instanceof XAException) throw (XAException)t; // XA errors are rethrown unchanged
+ if (t instanceof RuntimeException) throw (RuntimeException)t; // unchecked exceptions are rethrown unchanged
+ throw (Error)t; // NOTE(review): unchecked cast - any other Throwable (or null) would fail here rather than be rethrown
+ }
+
+ public static class MultiTxInfo extends TxInfo { // per-transaction bookkeeping used by MultiXAResource
+ public enum States { IDLE, ACTIVE, PREPARING, PREPARED, COMMITTING, ROLLINGBACK, FINISHED }
+
+ // should be a Set, but easier to test with List
+ public final List<XAResource> resources = new ArrayList<XAResource>(); // resources tracked for this transaction
+ public States state = States.IDLE; // updated by the do* methods of MultiXAResource
+ public long startTime = System.currentTimeMillis(); // creation time in ms; used to compute the remaining timeout
+ }
+}
Property changes on: trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/MultiXAResource.java
___________________________________________________________________
Name: svn:keywords
+ Id HeadURL Revision
Name: svn:eol-style
+ native
Added: trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/MultiXAResourceUnitTest.java
===================================================================
--- trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/MultiXAResourceUnitTest.java (rev 0)
+++ trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/MultiXAResourceUnitTest.java 2008-10-23 13:12:41 UTC (rev 1357)
@@ -0,0 +1,1432 @@
+/*
+ * Copyright 2008 The Topaz Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ * Contributions:
+ */
+
+package org.mulgara.resolver.distributed;
+
+// Java 2 standard packages
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.Collection;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+// Third party packages
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+import org.apache.log4j.Logger;
+
+// Locally written packages
+import org.mulgara.resolver.spi.DummyXAResource;
+import org.mulgara.resolver.spi.Resolver;
+import org.mulgara.resolver.spi.ResolverFactory;
+import org.mulgara.resolver.spi.ResolverSession;
+
+/**
+ * Testing MultiXAResource.
+ *
+ * @created 2008-10-21
+ * @author Ronald Tschalär
+ * @copyright ©2008 <a href="http://www.topazproject.org/">Topaz Project Foundation</a>
+ * @licence Apache License v2.0
+ */
+public class MultiXAResourceUnitTest extends TestCase {
+ /** Logger. */
+ private static final Logger logger = Logger.getLogger(MultiXAResourceUnitTest.class);
+
+ // XA error codes in shorter form
+ private static final int UN = 0;
+ private static final int RB = XAException.XA_RBOTHER;
+ private static final int HR = XAException.XA_HEURHAZ;
+ private static final int ER = XAException.XAER_RMERR;
+ private static final int FL = XAException.XAER_RMFAIL;
+ private static final int NT = XAException.XAER_NOTA;
+
+ public MultiXAResourceUnitTest(String name) {
+ super(name);
+ }
+
+ public static Test suite() {
+ TestSuite suite = new TestSuite();
+ suite.addTest(new MultiXAResourceUnitTest("testCommit"));
+ suite.addTest(new MultiXAResourceUnitTest("testRollback"));
+ suite.addTest(new MultiXAResourceUnitTest("testSuspend"));
+ suite.addTest(new MultiXAResourceUnitTest("testStartFailure"));
+ suite.addTest(new MultiXAResourceUnitTest("testSuspendFailure"));
+ suite.addTest(new MultiXAResourceUnitTest("testResumeFailure"));
+ suite.addTest(new MultiXAResourceUnitTest("testEndFailure"));
+ suite.addTest(new MultiXAResourceUnitTest("testPrepareFailure"));
+ suite.addTest(new MultiXAResourceUnitTest("testCommitFailure"));
+ suite.addTest(new MultiXAResourceUnitTest("testRollbackFailure"));
+ suite.addTest(new MultiXAResourceUnitTest("testMultiFailure"));
+
+ return suite;
+ }
+
+ //
+ // Test cases
+ //
+
+ /**
+ * Test simple sequence ending in commit.
+ */
+ public void testCommit() {
+ logger.info("Testing commit");
+
+ testCommit(new MockXAResource[] { new MockXAResource() });
+ testCommit(new MockXAResource[] { new MockXAResource(), new MockXAResource() });
+ testCommit(new MockXAResource[] {
+ new MockXAResource(), new MockXAResource(), new MockXAResource() });
+ }
+
+ private void testCommit(MockXAResource[] mocks) {
+ testCommit(mocks, new int[] { });
+ testCommit(mocks, new int[] { 0 });
+
+ if (mocks.length < 2) return;
+ testCommit(mocks, new int[] { 1 });
+ testCommit(mocks, new int[] { 0, 1 });
+
+ if (mocks.length < 3) return;
+ testCommit(mocks, new int[] { 2 });
+ testCommit(mocks, new int[] { 0, 2 });
+ testCommit(mocks, new int[] { 1, 2 });
+ testCommit(mocks, new int[] { 0, 1, 2 });
+ }
+
+ private void testCommit(MockXAResource[] mocks, int[] readOnlys) {
+ try {
+ TestMultiXAResource xares = new TestMultiXAResource(15, new DummyResolverFactory());
+
+ // one-phase commit
+ for (MockXAResource mock : mocks) mock.reset();
+ for (int idx : readOnlys) mocks[idx].prepareStatus = XAResource.XA_RDONLY;
+
+ Xid xid = new TestXid(1);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ for (MockXAResource mock : mocks) assertEquals(MockXAResource.State.ACTIVE, mock.state);
+
+ xares.end(xid, XAResource.TMSUCCESS);
+ for (MockXAResource mock : mocks) assertEquals(MockXAResource.State.ENDED, mock.state);
+
+ xares.commit(xid, true);
+ for (int idx = 0; idx < mocks.length; idx++) {
+ if (Arrays.binarySearch(readOnlys, idx) >= 0) {
+ assertEquals(MockXAResource.State.ROLLEDBACK, mocks[idx].state);
+ } else {
+ assertEquals(MockXAResource.State.COMMITTED, mocks[idx].state);
+ }
+ }
+
+ assertEquals(0, xares.getTxns().size());
+
+ // two-phase commit
+ for (MockXAResource mock : mocks) mock.reset();
+ for (int idx : readOnlys) mocks[idx].prepareStatus = XAResource.XA_RDONLY;
+
+ xid = new TestXid(2);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ for (MockXAResource mock : mocks) assertEquals(MockXAResource.State.ACTIVE, mock.state);
+
+ xares.end(xid, XAResource.TMSUCCESS);
+ for (MockXAResource mock : mocks) assertEquals(MockXAResource.State.ENDED, mock.state);
+
+ xares.prepare(xid);
+ for (int idx = 0; idx < mocks.length; idx++) {
+ if (Arrays.binarySearch(readOnlys, idx) >= 0) {
+ assertEquals(MockXAResource.State.ROLLEDBACK, mocks[idx].state);
+ } else {
+ assertEquals(MockXAResource.State.PREPARED, mocks[idx].state);
+ }
+ }
+
+ xares.commit(xid, false);
+ for (int idx = 0; idx < mocks.length; idx++) {
+ if (Arrays.binarySearch(readOnlys, idx) >= 0) {
+ assertEquals(MockXAResource.State.ROLLEDBACK, mocks[idx].state);
+ } else {
+ assertEquals(MockXAResource.State.COMMITTED, mocks[idx].state);
+ }
+ }
+
+ assertEquals(0, xares.getTxns().size());
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+ * Test simple sequence ending in rollback.
+ */
+ public void testRollback() {
+ logger.info("Testing rollback");
+
+ testRollback(new MockXAResource[] { new MockXAResource() });
+ testRollback(new MockXAResource[] { new MockXAResource(), new MockXAResource() });
+ testRollback(new MockXAResource[] {
+ new MockXAResource(), new MockXAResource(), new MockXAResource() });
+ }
+
+ private void testRollback(MockXAResource[] mocks) {
+ testRollback(mocks, new int[] { });
+ testRollback(mocks, new int[] { 0 });
+
+ if (mocks.length < 2) return;
+ testRollback(mocks, new int[] { 1 });
+ testRollback(mocks, new int[] { 0, 1 });
+
+ if (mocks.length < 3) return;
+ testRollback(mocks, new int[] { 2 });
+ testRollback(mocks, new int[] { 0, 2 });
+ testRollback(mocks, new int[] { 1, 2 });
+ testRollback(mocks, new int[] { 0, 1, 2 });
+ }
+
+ private void testRollback(MockXAResource[] mocks, int[] readOnlys) {
+ try {
+ TestMultiXAResource xares = new TestMultiXAResource(15, new DummyResolverFactory());
+
+ // rollback w/o prepare
+ for (MockXAResource mock : mocks) mock.reset();
+ for (int idx : readOnlys) mocks[idx].prepareStatus = XAResource.XA_RDONLY;
+
+ Xid xid = new TestXid(1);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ for (MockXAResource mock : mocks) assertEquals(MockXAResource.State.ACTIVE, mock.state);
+
+ xares.end(xid, XAResource.TMSUCCESS);
+ for (MockXAResource mock : mocks) assertEquals(MockXAResource.State.ENDED, mock.state);
+
+ xares.rollback(xid);
+ for (MockXAResource mock : mocks) assertEquals(MockXAResource.State.ROLLEDBACK, mock.state);
+
+ assertEquals(0, xares.getTxns().size());
+
+ // rollback after prepare
+ for (MockXAResource mock : mocks) mock.reset();
+ for (int idx : readOnlys) mocks[idx].prepareStatus = XAResource.XA_RDONLY;
+
+ xid = new TestXid(2);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ for (MockXAResource mock : mocks) assertEquals(MockXAResource.State.ACTIVE, mock.state);
+
+ xares.end(xid, XAResource.TMSUCCESS);
+ for (MockXAResource mock : mocks) assertEquals(MockXAResource.State.ENDED, mock.state);
+
+ xares.prepare(xid);
+ for (int idx = 0; idx < mocks.length; idx++) {
+ if (Arrays.binarySearch(readOnlys, idx) >= 0) {
+ assertEquals(MockXAResource.State.ROLLEDBACK, mocks[idx].state);
+ } else {
+ assertEquals(MockXAResource.State.PREPARED, mocks[idx].state);
+ }
+ }
+
+ xares.rollback(xid);
+ for (MockXAResource mock : mocks) assertEquals(MockXAResource.State.ROLLEDBACK, mock.state);
+
+ assertEquals(0, xares.getTxns().size());
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+ * Test simple suspend/resume.
+ */
+ public void testSuspend() {
+ logger.info("Testing suspend");
+
+ testSuspend(new MockXAResource[] { new MockXAResource() });
+ testSuspend(new MockXAResource[] { new MockXAResource(), new MockXAResource() });
+ testSuspend(new MockXAResource[] {
+ new MockXAResource(), new MockXAResource(), new MockXAResource() });
+ }
+
+ private void testSuspend(MockXAResource[] mocks) {
+ try {
+ TestMultiXAResource xares = new TestMultiXAResource(15, new DummyResolverFactory());
+
+ // simple suspend/resume
+ Xid xid = new TestXid(1);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ for (MockXAResource mock : mocks) assertEquals(MockXAResource.State.ACTIVE, mock.state);
+
+ xares.end(xid, XAResource.TMSUSPEND);
+ for (MockXAResource mock : mocks) assertEquals(MockXAResource.State.SUSPENDED, mock.state);
+
+ xares.start(xid, XAResource.TMRESUME);
+ for (MockXAResource mock : mocks) assertEquals(MockXAResource.State.ACTIVE, mock.state);
+
+ xares.end(xid, XAResource.TMSUCCESS);
+ for (MockXAResource mock : mocks) assertEquals(MockXAResource.State.ENDED, mock.state);
+
+ assertEquals(1, xares.getTxns().size());
+
+ // start/suspend/resume first, then enlist second, suspend and resume
+ if (mocks.length < 2) return;
+
+ for (MockXAResource mock : mocks) mock.reset();
+
+ xid = new TestXid(2);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ xares.enlistResource(mocks[0]);
+ assertEquals(MockXAResource.State.ACTIVE, mocks[0].state);
+ assertEquals(MockXAResource.State.IDLE, mocks[1].state);
+ if (mocks.length > 2) assertEquals(MockXAResource.State.IDLE, mocks[2].state);
+
+ xares.end(xid, XAResource.TMSUSPEND);
+ assertEquals(MockXAResource.State.SUSPENDED, mocks[0].state);
+ assertEquals(MockXAResource.State.IDLE, mocks[1].state);
+ if (mocks.length > 2) assertEquals(MockXAResource.State.IDLE, mocks[2].state);
+
+ xares.start(xid, XAResource.TMRESUME);
+ assertEquals(MockXAResource.State.ACTIVE, mocks[0].state);
+ assertEquals(MockXAResource.State.IDLE, mocks[1].state);
+ if (mocks.length > 2) assertEquals(MockXAResource.State.IDLE, mocks[2].state);
+
+ xares.enlistResource(mocks[1]);
+ assertEquals(MockXAResource.State.ACTIVE, mocks[0].state);
+ assertEquals(MockXAResource.State.ACTIVE, mocks[1].state);
+ if (mocks.length > 2) assertEquals(MockXAResource.State.IDLE, mocks[2].state);
+
+ xares.end(xid, XAResource.TMSUCCESS);
+ assertEquals(MockXAResource.State.ENDED, mocks[0].state);
+ assertEquals(MockXAResource.State.ENDED, mocks[1].state);
+ if (mocks.length > 2) assertEquals(MockXAResource.State.IDLE, mocks[2].state);
+
+ assertEquals(2, xares.getTxns().size());
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+ * Test start failures.
+ */
+ public void testStartFailure() {
+ logger.info("Testing start failure");
+
+ testStartFailure(new MockFailingXAResource[] { new MockFailingXAResource() });
+ testStartFailure(new MockFailingXAResource[] {
+ new MockFailingXAResource(), new MockFailingXAResource() });
+ testStartFailure(new MockFailingXAResource[] {
+ new MockFailingXAResource(), new MockFailingXAResource(), new MockFailingXAResource() });
+ }
+
+ private void testStartFailure(MockFailingXAResource[] mocks) {
+ testStartFailure(mocks, UN, 0);
+ testStartFailure(mocks, RB, 0);
+ testStartFailure(mocks, FL, 0);
+ testStartFailure(mocks, NT, 0);
+
+ if (mocks.length < 2) return;
+ testStartFailure(mocks, UN, 1);
+ testStartFailure(mocks, RB, 1);
+ testStartFailure(mocks, FL, 1);
+ testStartFailure(mocks, NT, 1);
+
+ if (mocks.length < 3) return;
+ testStartFailure(mocks, UN, 2);
+ testStartFailure(mocks, RB, 2);
+ testStartFailure(mocks, FL, 2);
+ testStartFailure(mocks, NT, 2);
+ }
+
+ private void testStartFailure(MockFailingXAResource[] mocks, int errorCode, int failer) {
+ try {
+ TestMultiXAResource xares = new TestMultiXAResource(15, new DummyResolverFactory());
+
+ for (MockXAResource mock : mocks) mock.reset();
+ setFailMode(mocks[failer], errorCode, 1, -1, -1, -1, false, false, false);
+
+ Xid xid = new TestXid(1);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (int idx = 0; idx < failer; idx++) xares.enlistResource(mocks[idx]);
+ try {
+ xares.enlistResource(mocks[failer]);
+ fail("should have thrown exception");
+ } catch (XAException xae) {
+ if (errorCode == FL) assertTrue(FL != xae.errorCode);
+ else if (errorCode == NT) assertTrue(NT != xae.errorCode);
+ else assertEquals(errorCode, xae.errorCode);
+
+ assertStates(mocks, failer, MockXAResource.State.ACTIVE,
+ (errorCode == FL || errorCode == NT) ? MockXAResource.State.ROLLEDBACK :
+ MockXAResource.State.RB_ONLY,
+ MockXAResource.State.IDLE);
+ }
+
+ xares.end(xid, XAResource.TMFAIL);
+ assertStates(mocks, failer, MockXAResource.State.ENDED,
+ (errorCode == FL || errorCode == NT) ? MockXAResource.State.ROLLEDBACK :
+ MockXAResource.State.RB_ONLY,
+ MockXAResource.State.IDLE);
+
+ xares.rollback(xid);
+ assertStates(mocks, failer, MockXAResource.State.ROLLEDBACK, MockXAResource.State.ROLLEDBACK,
+ MockXAResource.State.IDLE);
+
+ assertEquals(0, xares.getTxns().size());
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+ * Test suspend failures.
+ */
+ public void testSuspendFailure() {
+ logger.info("Testing suspend failure");
+
+ testSuspendFailure(new MockFailingXAResource[] { new MockFailingXAResource() });
+ testSuspendFailure(new MockFailingXAResource[] {
+ new MockFailingXAResource(), new MockFailingXAResource() });
+ testSuspendFailure(new MockFailingXAResource[] {
+ new MockFailingXAResource(), new MockFailingXAResource(), new MockFailingXAResource() });
+ }
+
+ private void testSuspendFailure(MockFailingXAResource[] mocks) {
+ testSuspendFailure(mocks, UN, 0);
+ testSuspendFailure(mocks, RB, 0);
+ testSuspendFailure(mocks, FL, 0);
+ testSuspendFailure(mocks, NT, 0);
+
+ if (mocks.length < 2) return;
+ testSuspendFailure(mocks, UN, 1);
+ testSuspendFailure(mocks, RB, 1);
+ testSuspendFailure(mocks, FL, 1);
+ testSuspendFailure(mocks, NT, 1);
+
+ if (mocks.length < 3) return;
+ testSuspendFailure(mocks, UN, 2);
+ testSuspendFailure(mocks, RB, 2);
+ testSuspendFailure(mocks, FL, 2);
+ testSuspendFailure(mocks, NT, 2);
+ }
+
+ private void testSuspendFailure(MockFailingXAResource[] mocks, int errorCode, int failer) {
+ try {
+ TestMultiXAResource xares = new TestMultiXAResource(15, new DummyResolverFactory());
+
+ for (MockXAResource mock : mocks) mock.reset();
+ setFailMode(mocks[failer], errorCode, -1, 1, -1, -1, false, false, false);
+
+ Xid xid = new TestXid(1);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ assertStates(mocks, failer, MockXAResource.State.ACTIVE, MockXAResource.State.ACTIVE,
+ MockXAResource.State.ACTIVE);
+
+ try {
+ xares.end(xid, XAResource.TMSUSPEND);
+ fail("should have thrown exception");
+ } catch (XAException xae) {
+ if (errorCode == FL) assertTrue(FL != xae.errorCode);
+ else if (errorCode == NT) assertTrue(NT != xae.errorCode);
+ else assertEquals(errorCode, xae.errorCode);
+
+ assertStates(mocks, failer, MockXAResource.State.ENDED,
+ (errorCode == FL || errorCode == NT) ? MockXAResource.State.ROLLEDBACK :
+ MockXAResource.State.RB_ONLY,
+ MockXAResource.State.ENDED);
+ }
+
+ xares.rollback(xid);
+ assertStates(mocks, failer, MockXAResource.State.ROLLEDBACK, MockXAResource.State.ROLLEDBACK,
+ MockXAResource.State.ROLLEDBACK);
+
+ assertEquals(0, xares.getTxns().size());
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+ * Test resume failures.
+ */
+ public void testResumeFailure() {
+ logger.info("Testing resume failure");
+
+ testResumeFailure(new MockFailingXAResource[] { new MockFailingXAResource() });
+ testResumeFailure(new MockFailingXAResource[] {
+ new MockFailingXAResource(), new MockFailingXAResource() });
+ testResumeFailure(new MockFailingXAResource[] {
+ new MockFailingXAResource(), new MockFailingXAResource(), new MockFailingXAResource() });
+ }
+
+ private void testResumeFailure(MockFailingXAResource[] mocks) {
+ testResumeFailure(mocks, UN, 0);
+ testResumeFailure(mocks, RB, 0);
+ testResumeFailure(mocks, FL, 0);
+ testResumeFailure(mocks, NT, 0);
+
+ if (mocks.length < 2) return;
+ testResumeFailure(mocks, UN, 1);
+ testResumeFailure(mocks, RB, 1);
+ testResumeFailure(mocks, FL, 1);
+ testResumeFailure(mocks, NT, 1);
+
+ if (mocks.length < 3) return;
+ testResumeFailure(mocks, UN, 2);
+ testResumeFailure(mocks, RB, 2);
+ testResumeFailure(mocks, FL, 2);
+ testResumeFailure(mocks, NT, 2);
+ }
+
+ private void testResumeFailure(MockFailingXAResource[] mocks, int errorCode, int failer) {
+ try {
+ TestMultiXAResource xares = new TestMultiXAResource(15, new DummyResolverFactory());
+
+ for (MockXAResource mock : mocks) mock.reset();
+ setFailMode(mocks[failer], errorCode, -1, -1, 1, -1, false, false, false);
+
+ Xid xid = new TestXid(1);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ assertStates(mocks, failer, MockXAResource.State.ACTIVE, MockXAResource.State.ACTIVE,
+ MockXAResource.State.ACTIVE);
+
+ xares.end(xid, XAResource.TMSUSPEND);
+ assertStates(mocks, failer, MockXAResource.State.SUSPENDED, MockXAResource.State.SUSPENDED,
+ MockXAResource.State.SUSPENDED);
+
+ try {
+ xares.start(xid, XAResource.TMRESUME);
+ fail("should have thrown exception");
+ } catch (XAException xae) {
+ if (errorCode == FL) assertTrue(FL != xae.errorCode);
+ else if (errorCode == NT) assertTrue(NT != xae.errorCode);
+ else assertEquals(errorCode, xae.errorCode);
+
+ assertStates(mocks, failer, MockXAResource.State.ENDED,
+ (errorCode == FL || errorCode == NT) ? MockXAResource.State.ROLLEDBACK :
+ MockXAResource.State.RB_ONLY,
+ MockXAResource.State.ENDED);
+ }
+
+ xares.rollback(xid);
+ assertStates(mocks, failer, MockXAResource.State.ROLLEDBACK, MockXAResource.State.ROLLEDBACK,
+ MockXAResource.State.ROLLEDBACK);
+
+ assertEquals(0, xares.getTxns().size());
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+ * Test end failures.
+ */
+ public void testEndFailure() {
+ logger.info("Testing end failure");
+
+ testEndFailure(new MockFailingXAResource[] { new MockFailingXAResource() });
+ testEndFailure(new MockFailingXAResource[] {
+ new MockFailingXAResource(), new MockFailingXAResource() });
+ testEndFailure(new MockFailingXAResource[] {
+ new MockFailingXAResource(), new MockFailingXAResource(), new MockFailingXAResource() });
+ }
+
+ private void testEndFailure(MockFailingXAResource[] mocks) {
+ testEndFailure(mocks, UN, 0);
+ testEndFailure(mocks, RB, 0);
+ testEndFailure(mocks, FL, 0);
+ testEndFailure(mocks, NT, 0);
+
+ if (mocks.length < 2) return;
+ testEndFailure(mocks, UN, 1);
+ testEndFailure(mocks, RB, 1);
+ testEndFailure(mocks, FL, 1);
+ testEndFailure(mocks, NT, 1);
+
+ if (mocks.length < 3) return;
+ testEndFailure(mocks, UN, 2);
+ testEndFailure(mocks, RB, 2);
+ testEndFailure(mocks, FL, 2);
+ testEndFailure(mocks, NT, 2);
+ }
+
+ private void testEndFailure(MockFailingXAResource[] mocks, int errorCode, int failer) {
+ try {
+ TestMultiXAResource xares = new TestMultiXAResource(15, new DummyResolverFactory());
+
+ // straight start-end
+ for (MockXAResource mock : mocks) mock.reset();
+ setFailMode(mocks[failer], errorCode, -1, -1, -1, 1, false, false, false);
+
+ Xid xid = new TestXid(1);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ assertStates(mocks, failer, MockXAResource.State.ACTIVE, MockXAResource.State.ACTIVE,
+ MockXAResource.State.ACTIVE);
+
+ try {
+ xares.end(xid, XAResource.TMSUCCESS);
+ fail("should have thrown exception");
+ } catch (XAException xae) {
+ if (errorCode == FL) assertTrue(FL != xae.errorCode);
+ else if (errorCode == NT) assertTrue(NT != xae.errorCode);
+ else assertEquals(errorCode, xae.errorCode);
+
+ assertStates(mocks, failer, MockXAResource.State.ENDED,
+ (errorCode == FL || errorCode == NT) ? MockXAResource.State.ROLLEDBACK :
+ MockXAResource.State.RB_ONLY,
+ MockXAResource.State.ENDED);
+ }
+
+ xares.rollback(xid);
+ assertStates(mocks, failer, MockXAResource.State.ROLLEDBACK, MockXAResource.State.ROLLEDBACK,
+ MockXAResource.State.ROLLEDBACK);
+
+ assertEquals(0, xares.getTxns().size());
+
+ // start-suspend-resume-end
+ for (MockXAResource mock : mocks) mock.reset();
+ setFailMode(mocks[failer], errorCode, -1, -1, -1, 1, false, false, false);
+
+ xid = new TestXid(2);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ assertStates(mocks, failer, MockXAResource.State.ACTIVE, MockXAResource.State.ACTIVE,
+ MockXAResource.State.ACTIVE);
+
+ xares.end(xid, XAResource.TMSUSPEND);
+ assertStates(mocks, failer, MockXAResource.State.SUSPENDED, MockXAResource.State.SUSPENDED,
+ MockXAResource.State.SUSPENDED);
+
+ xares.start(xid, XAResource.TMRESUME);
+ assertStates(mocks, failer, MockXAResource.State.ACTIVE, MockXAResource.State.ACTIVE,
+ MockXAResource.State.ACTIVE);
+
+ try {
+ xares.end(xid, XAResource.TMSUCCESS);
+ fail("should have thrown exception");
+ } catch (XAException xae) {
+ if (errorCode == FL) assertTrue(FL != xae.errorCode);
+ else if (errorCode == NT) assertTrue(NT != xae.errorCode);
+ else assertEquals(errorCode, xae.errorCode);
+
+ assertStates(mocks, failer, MockXAResource.State.ENDED,
+ (errorCode == FL || errorCode == NT) ? MockXAResource.State.ROLLEDBACK :
+ MockXAResource.State.RB_ONLY,
+ MockXAResource.State.ENDED);
+ }
+
+ xares.rollback(xid);
+ assertStates(mocks, failer, MockXAResource.State.ROLLEDBACK, MockXAResource.State.ROLLEDBACK,
+ MockXAResource.State.ROLLEDBACK);
+
+ assertEquals(0, xares.getTxns().size());
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+ * Test prepare failures.
+ */
+ public void testPrepareFailure() {
+ logger.info("Testing prepare failure");
+
+ testPrepareFailure(new MockFailingXAResource[] { new MockFailingXAResource() });
+ testPrepareFailure(new MockFailingXAResource[] {
+ new MockFailingXAResource(), new MockFailingXAResource() });
+ testPrepareFailure(new MockFailingXAResource[] {
+ new MockFailingXAResource(), new MockFailingXAResource(), new MockFailingXAResource() });
+ }
+
+ private void testPrepareFailure(MockFailingXAResource[] mocks) {
+ testPrepareFailure(mocks, UN, 0);
+ testPrepareFailure(mocks, RB, 0);
+ testPrepareFailure(mocks, FL, 0);
+ testPrepareFailure(mocks, NT, 0);
+
+ if (mocks.length < 2) return;
+ testPrepareFailure(mocks, UN, 1);
+ testPrepareFailure(mocks, RB, 1);
+ testPrepareFailure(mocks, FL, 1);
+ testPrepareFailure(mocks, NT, 1);
+
+ if (mocks.length < 3) return;
+ testPrepareFailure(mocks, UN, 2);
+ testPrepareFailure(mocks, RB, 2);
+ testPrepareFailure(mocks, FL, 2);
+ testPrepareFailure(mocks, NT, 2);
+ }
+
+ private void testPrepareFailure(MockFailingXAResource[] mocks, int errorCode, int failer) {
+ try {
+ TestMultiXAResource xares = new TestMultiXAResource(15, new DummyResolverFactory());
+
+ // implied prepare (one phase commit)
+ for (MockXAResource mock : mocks) mock.reset();
+ setFailMode(mocks[failer], errorCode, -1, -1, -1, -1, true, false, false);
+
+ Xid xid = new TestXid(1);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ assertStates(mocks, failer, MockXAResource.State.ACTIVE, MockXAResource.State.ACTIVE,
+ MockXAResource.State.ACTIVE);
+
+ xares.end(xid, XAResource.TMSUCCESS);
+ assertStates(mocks, failer, MockXAResource.State.ENDED, MockXAResource.State.ENDED,
+ MockXAResource.State.ENDED);
+
+ try {
+ xares.commit(xid, true);
+ fail("should have thrown exception");
+ } catch (XAException xae) {
+ assertTrue("not a rollback: " + xae.errorCode, DummyXAResource.isRollback(xae.errorCode));
+
+ assertStates(mocks, failer, MockXAResource.State.ROLLEDBACK,
+ MockXAResource.State.ROLLEDBACK, MockXAResource.State.ROLLEDBACK);
+ }
+
+ assertEquals(0, xares.getTxns().size());
+
+ // explicit prepare (two phase commit)
+ for (MockXAResource mock : mocks) mock.reset();
+ setFailMode(mocks[failer], errorCode, -1, -1, -1, -1, true, false, false);
+
+ xid = new TestXid(2);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ assertStates(mocks, failer, MockXAResource.State.ACTIVE, MockXAResource.State.ACTIVE,
+ MockXAResource.State.ACTIVE);
+
+ xares.end(xid, XAResource.TMSUCCESS);
+ assertStates(mocks, failer, MockXAResource.State.ENDED, MockXAResource.State.ENDED,
+ MockXAResource.State.ENDED);
+
+ try {
+ xares.prepare(xid);
+ fail("should have thrown exception");
+ } catch (XAException xae) {
+ boolean isEndCode = errorCode == FL || errorCode == NT || DummyXAResource.isRollback(errorCode);
+ if (isEndCode) assertEquals(ER, xae.errorCode);
+ else assertEquals(errorCode, xae.errorCode);
+
+ assertStates(mocks, failer, MockXAResource.State.PREPARED,
+ (isEndCode) ? MockXAResource.State.ROLLEDBACK : MockXAResource.State.RB_ONLY,
+ MockXAResource.State.PREPARED);
+ }
+
+ xares.rollback(xid);
+ assertStates(mocks, failer, MockXAResource.State.ROLLEDBACK, MockXAResource.State.ROLLEDBACK,
+ MockXAResource.State.ROLLEDBACK);
+
+ assertEquals(0, xares.getTxns().size());
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+ * Test commit failures.
+ */
+ public void testCommitFailure() {
+ logger.info("Testing commit failure");
+
+ testCommitFailure(new MockFailingXAResource[] { new MockFailingXAResource() });
+ testCommitFailure(new MockFailingXAResource[] {
+ new MockFailingXAResource(), new MockFailingXAResource() });
+ testCommitFailure(new MockFailingXAResource[] {
+ new MockFailingXAResource(), new MockFailingXAResource(), new MockFailingXAResource() });
+ }
+
+ private void testCommitFailure(MockFailingXAResource[] mocks) {
+ testCommitFailure(mocks, UN, 0);
+ testCommitFailure(mocks, RB, 0);
+ testCommitFailure(mocks, HR, 0);
+ testCommitFailure(mocks, FL, 0);
+
+ if (mocks.length < 2) return;
+ testCommitFailure(mocks, UN, 1);
+ testCommitFailure(mocks, RB, 1);
+ testCommitFailure(mocks, HR, 1);
+ testCommitFailure(mocks, FL, 1);
+
+ if (mocks.length < 3) return;
+ testCommitFailure(mocks, UN, 2);
+ testCommitFailure(mocks, RB, 2);
+ testCommitFailure(mocks, HR, 2);
+ testCommitFailure(mocks, FL, 2);
+ }
+
+ private void testCommitFailure(MockFailingXAResource[] mocks, int errorCode, int failer) {
+ try {
+ TestMultiXAResource xares = new TestMultiXAResource(15, new DummyResolverFactory());
+
+ // one-phase
+ for (MockXAResource mock : mocks) mock.reset();
+ setFailMode(mocks[failer], errorCode, -1, -1, -1, -1, false, true, false);
+
+ Xid xid = new TestXid(1);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ assertStates(mocks, failer, MockXAResource.State.ACTIVE, MockXAResource.State.ACTIVE,
+ MockXAResource.State.ACTIVE);
+
+ xares.end(xid, XAResource.TMSUCCESS);
+ assertStates(mocks, failer, MockXAResource.State.ENDED, MockXAResource.State.ENDED,
+ MockXAResource.State.ENDED);
+
+ try {
+ xares.commit(xid, true);
+ fail("should have thrown exception");
+ } catch (XAException xae) {
+ assertTrue("expected heuristic code: " + xae.errorCode, DummyXAResource.isHeuristic(xae.errorCode));
+
+ assertStates(mocks, failer, MockXAResource.State.COMMITTED,
+ (errorCode == UN) ? MockXAResource.State.COMMITTED :
+ (errorCode == HR) ? MockXAResource.State.HEUR :
+ MockXAResource.State.ROLLEDBACK,
+ (failer == 0 && errorCode == RB) ? MockXAResource.State.ROLLEDBACK : MockXAResource.State.COMMITTED);
+ }
+
+ xares.forget(xid);
+ assertStates(mocks, failer, MockXAResource.State.COMMITTED,
+ (errorCode == UN) ? MockXAResource.State.COMMITTED :
+ (errorCode == HR) ? MockXAResource.State.HEUR_DONE :
+ MockXAResource.State.ROLLEDBACK,
+ (failer == 0 && errorCode == RB) ? MockXAResource.State.ROLLEDBACK :
+ MockXAResource.State.COMMITTED);
+
+ assertEquals(0, xares.getTxns().size());
+
+ // two-phase
+ for (MockXAResource mock : mocks) mock.reset();
+ setFailMode(mocks[failer], errorCode, -1, -1, -1, -1, false, true, false);
+
+ xid = new TestXid(2);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ assertStates(mocks, failer, MockXAResource.State.ACTIVE, MockXAResource.State.ACTIVE,
+ MockXAResource.State.ACTIVE);
+
+ xares.end(xid, XAResource.TMSUCCESS);
+ assertStates(mocks, failer, MockXAResource.State.ENDED, MockXAResource.State.ENDED,
+ MockXAResource.State.ENDED);
+
+ xares.prepare(xid);
+ assertStates(mocks, failer, MockXAResource.State.PREPARED, MockXAResource.State.PREPARED,
+ MockXAResource.State.PREPARED);
+
+ try {
+ xares.commit(xid, false);
+ fail("should have thrown exception");
+ } catch (XAException xae) {
+ assertTrue("expected heuristic code: " + xae.errorCode, DummyXAResource.isHeuristic(xae.errorCode));
+
+ assertStates(mocks, failer, MockXAResource.State.COMMITTED,
+ (errorCode == UN) ? MockXAResource.State.COMMITTED :
+ (errorCode == HR) ? MockXAResource.State.HEUR :
+ MockXAResource.State.ROLLEDBACK,
+ (failer == 0 && errorCode == RB) ? MockXAResource.State.ROLLEDBACK :
+ MockXAResource.State.COMMITTED);
+ }
+
+ xares.forget(xid);
+ assertStates(mocks, failer, MockXAResource.State.COMMITTED,
+ (errorCode == UN) ? MockXAResource.State.COMMITTED :
+ (errorCode == HR) ? MockXAResource.State.HEUR_DONE :
+ MockXAResource.State.ROLLEDBACK,
+ (failer == 0 && errorCode == RB) ? MockXAResource.State.ROLLEDBACK :
+ MockXAResource.State.COMMITTED);
+
+ assertEquals(0, xares.getTxns().size());
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+ * Test rollback failures.
+ */
+ public void testRollbackFailure() {
+ logger.info("Testing rollback failure");
+
+ testRollbackFailure(new MockFailingXAResource[] { new MockFailingXAResource() });
+ testRollbackFailure(new MockFailingXAResource[] {
+ new MockFailingXAResource(), new MockFailingXAResource() });
+ testRollbackFailure(new MockFailingXAResource[] {
+ new MockFailingXAResource(), new MockFailingXAResource(), new MockFailingXAResource() });
+ }
+
+ private void testRollbackFailure(MockFailingXAResource[] mocks) {
+ testRollbackFailure(mocks, UN, 0);
+ testRollbackFailure(mocks, RB, 0);
+ testRollbackFailure(mocks, HR, 0);
+ testRollbackFailure(mocks, FL, 0);
+
+ if (mocks.length < 2) return;
+ testRollbackFailure(mocks, UN, 1);
+ testRollbackFailure(mocks, RB, 1);
+ testRollbackFailure(mocks, HR, 1);
+ testRollbackFailure(mocks, FL, 1);
+
+ if (mocks.length < 3) return;
+ testRollbackFailure(mocks, UN, 2);
+ testRollbackFailure(mocks, RB, 2);
+ testRollbackFailure(mocks, HR, 2);
+ testRollbackFailure(mocks, FL, 2);
+ }
+
+ private void testRollbackFailure(MockFailingXAResource[] mocks, int errorCode, int failer) {
+ try {
+ TestMultiXAResource xares = new TestMultiXAResource(15, new DummyResolverFactory());
+
+ // after end
+ for (MockXAResource mock : mocks) mock.reset();
+ setFailMode(mocks[failer], errorCode, -1, -1, -1, -1, false, false, true);
+
+ Xid xid = new TestXid(1);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ assertStates(mocks, failer, MockXAResource.State.ACTIVE, MockXAResource.State.ACTIVE,
+ MockXAResource.State.ACTIVE);
+
+ xares.end(xid, XAResource.TMSUCCESS);
+ assertStates(mocks, failer, MockXAResource.State.ENDED, MockXAResource.State.ENDED,
+ MockXAResource.State.ENDED);
+
+ try {
+ xares.rollback(xid);
+ fail("should have thrown exception");
+ } catch (XAException xae) {
+ assertTrue("expected heuristic code: " + xae.errorCode, DummyXAResource.isHeuristic(xae.errorCode));
+
+ assertStates(mocks, failer, MockXAResource.State.ROLLEDBACK,
+ (errorCode == HR) ? MockXAResource.State.HEUR :
+ MockXAResource.State.ROLLEDBACK,
+ MockXAResource.State.ROLLEDBACK);
+ }
+
+ xares.forget(xid);
+ assertStates(mocks, failer, MockXAResource.State.ROLLEDBACK,
+ (errorCode == HR) ? MockXAResource.State.HEUR_DONE :
+ MockXAResource.State.ROLLEDBACK,
+ MockXAResource.State.ROLLEDBACK);
+
+ assertEquals(0, xares.getTxns().size());
+
+ // after prepare
+ for (MockXAResource mock : mocks) mock.reset();
+ setFailMode(mocks[failer], errorCode, -1, -1, -1, -1, false, false, true);
+
+ xid = new TestXid(2);
+ xares.start(xid, XAResource.TMNOFLAGS);
+
+ for (MockXAResource mock : mocks) xares.enlistResource(mock);
+ assertStates(mocks, failer, MockXAResource.State.ACTIVE, MockXAResource.State.ACTIVE,
+ MockXAResource.State.ACTIVE);
+
+ xares.end(xid, XAResource.TMSUCCESS);
+ assertStates(mocks, failer, MockXAResource.State.ENDED, MockXAResource.State.ENDED,
+ MockXAResource.State.ENDED);
+
+ xares.prepare(xid);
+ assertStates(mocks, failer, MockXAResource.State.PREPARED, MockXAResource.State.PREPARED,
+ MockXAResource.State.PREPARED);
+
+ try {
+ xares.rollback(xid);
+ fail("should have thrown exception");
+ } catch (XAException xae) {
+ assertTrue("expected heuristic code: " + xae.errorCode, DummyXAResource.isHeuristic(xae.errorCode));
+
+ assertStates(mocks, failer, MockXAResource.State.ROLLEDBACK,
+ (errorCode == HR) ? MockXAResource.State.HEUR :
+ MockXAResource.State.ROLLEDBACK,
+ MockXAResource.State.ROLLEDBACK);
+ }
+
+ xares.forget(xid);
+ assertStates(mocks, failer, MockXAResource.State.ROLLEDBACK,
+ (errorCode == HR) ? MockXAResource.State.HEUR_DONE :
+ MockXAResource.State.ROLLEDBACK,
+ MockXAResource.State.ROLLEDBACK);
+
+ assertEquals(0, xares.getTxns().size());
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+  * Test transactions in which more than one resource fails, or in which resources fail in
+  * different phases. Four scenarios are run against two failing mocks: both failing in end
+  * with different codes, both failing in end with FL, one failing in prepare and the other in
+  * the subsequent rollback, and both failing in a one-phase commit.
+  */
+ public void testMultiFailure() {
+   logger.info("Testing multiple failures");
+
+   try {
+     TestMultiXAResource xares = new TestMultiXAResource(15, new DummyResolverFactory());
+     MockFailingXAResource[] mocks =
+         new MockFailingXAResource[] { new MockFailingXAResource(), new MockFailingXAResource() };
+
+     // two in end
+     for (MockXAResource mock : mocks) mock.reset();
+     setFailMode(mocks[0], FL, -1, -1, -1, 1, false, false, false);
+     setFailMode(mocks[1], RB, -1, -1, -1, 1, false, false, false);
+
+     Xid xid = new TestXid(1);
+     xares.start(xid, XAResource.TMNOFLAGS);
+
+     for (MockXAResource mock : mocks) xares.enlistResource(mock);
+     assertEquals(MockXAResource.State.ACTIVE, mocks[0].state);
+     assertEquals(MockXAResource.State.ACTIVE, mocks[1].state);
+
+     try {
+       xares.end(xid, XAResource.TMSUCCESS);
+       fail("should have thrown exception");
+     } catch (XAException xae) {
+       // FL must never be passed on as is
+       assertTrue(xae.errorCode != FL);
+
+       assertEquals(MockXAResource.State.ROLLEDBACK, mocks[0].state);
+       assertEquals(MockXAResource.State.RB_ONLY, mocks[1].state);
+     }
+
+     xares.rollback(xid);
+     assertEquals(MockXAResource.State.ROLLEDBACK, mocks[0].state);
+     assertEquals(MockXAResource.State.ROLLEDBACK, mocks[1].state);
+
+     assertEquals(0, xares.getTxns().size());
+
+     // two RMFAIL in end
+     for (MockXAResource mock : mocks) mock.reset();
+     setFailMode(mocks[0], FL, -1, -1, -1, 1, false, false, false);
+     setFailMode(mocks[1], FL, -1, -1, -1, 1, false, false, false);
+
+     xid = new TestXid(2);
+     xares.start(xid, XAResource.TMNOFLAGS);
+
+     for (MockXAResource mock : mocks) xares.enlistResource(mock);
+     assertEquals(MockXAResource.State.ACTIVE, mocks[0].state);
+     assertEquals(MockXAResource.State.ACTIVE, mocks[1].state);
+
+     try {
+       xares.end(xid, XAResource.TMSUCCESS);
+       fail("should have thrown exception");
+     } catch (XAException xae) {
+       assertTrue(xae.errorCode != FL);
+
+       assertEquals(MockXAResource.State.ROLLEDBACK, mocks[0].state);
+       assertEquals(MockXAResource.State.ROLLEDBACK, mocks[1].state);
+     }
+
+     xares.rollback(xid);
+     assertEquals(MockXAResource.State.ROLLEDBACK, mocks[0].state);
+     assertEquals(MockXAResource.State.ROLLEDBACK, mocks[1].state);
+
+     assertEquals(0, xares.getTxns().size());
+
+     // one in prepare, one in rollback
+     for (MockXAResource mock : mocks) mock.reset();
+     setFailMode(mocks[0], UN, -1, -1, -1, -1, true, false, false);
+     setFailMode(mocks[1], HR, -1, -1, -1, -1, false, false, true);
+
+     xid = new TestXid(3);
+     xares.start(xid, XAResource.TMNOFLAGS);
+
+     for (MockXAResource mock : mocks) xares.enlistResource(mock);
+     assertEquals(MockXAResource.State.ACTIVE, mocks[0].state);
+     assertEquals(MockXAResource.State.ACTIVE, mocks[1].state);
+
+     xares.end(xid, XAResource.TMSUCCESS);
+     assertEquals(MockXAResource.State.ENDED, mocks[0].state);
+     assertEquals(MockXAResource.State.ENDED, mocks[1].state);
+
+     try {
+       xares.prepare(xid);
+       fail("should have thrown exception");
+     } catch (XAException xae) {
+       assertTrue(xae.errorCode != FL);
+
+       assertEquals(MockXAResource.State.RB_ONLY, mocks[0].state);
+       assertEquals(MockXAResource.State.PREPARED, mocks[1].state);
+     }
+
+     try {
+       xares.rollback(xid);
+       // the second resource is set up to fail in rollback; without this check a rollback
+       // that returns normally would silently skip the assertions in the catch block
+       fail("should have thrown exception");
+     } catch (XAException xae) {
+       assertTrue(xae.errorCode != FL);
+
+       assertEquals(MockXAResource.State.ROLLEDBACK, mocks[0].state);
+       assertEquals(MockXAResource.State.HEUR, mocks[1].state);
+     }
+
+     xares.forget(xid);
+     assertEquals(MockXAResource.State.ROLLEDBACK, mocks[0].state);
+     assertEquals(MockXAResource.State.HEUR_DONE, mocks[1].state);
+
+     assertEquals(0, xares.getTxns().size());
+
+     // two in commit
+     for (MockXAResource mock : mocks) mock.reset();
+     setFailMode(mocks[0], UN, -1, -1, -1, -1, false, true, false);
+     setFailMode(mocks[1], FL, -1, -1, -1, -1, false, true, false);
+
+     xid = new TestXid(4);
+     xares.start(xid, XAResource.TMNOFLAGS);
+
+     for (MockXAResource mock : mocks) xares.enlistResource(mock);
+     assertEquals(MockXAResource.State.ACTIVE, mocks[0].state);
+     assertEquals(MockXAResource.State.ACTIVE, mocks[1].state);
+
+     xares.end(xid, XAResource.TMSUCCESS);
+     assertEquals(MockXAResource.State.ENDED, mocks[0].state);
+     assertEquals(MockXAResource.State.ENDED, mocks[1].state);
+
+     try {
+       xares.commit(xid, true);
+       fail("should have thrown exception");
+     } catch (XAException xae) {
+       assertTrue("expected heuristic code: " + xae.errorCode, DummyXAResource.isHeuristic(xae.errorCode));
+
+       assertEquals(MockXAResource.State.COMMITTED, mocks[0].state);
+       assertEquals(MockXAResource.State.ROLLEDBACK, mocks[1].state);
+     }
+
+     xares.forget(xid);
+     assertEquals(MockXAResource.State.COMMITTED, mocks[0].state);
+     assertEquals(MockXAResource.State.ROLLEDBACK, mocks[1].state);
+
+     assertEquals(0, xares.getTxns().size());
+   } catch (Exception e) {
+     fail(e);
+   }
+ }
+
+
+ //
+ // Internal Test Helpers
+ //
+
+ /**
+  * Asserts the states of all given mocks relative to the position of the failing one.
+  *
+  * @param mocks    the resources to check
+  * @param failer   index into {@code mocks} of the failing resource
+  * @param preFail  the state expected of every resource before index {@code failer}
+  * @param fail     the state expected of the resource at index {@code failer}
+  * @param postFail the state expected of every resource after index {@code failer}
+  */
+ private static void assertStates(MockFailingXAResource[] mocks, int failer,
+                                  MockXAResource.State preFail, MockXAResource.State fail,
+                                  MockXAResource.State postFail) {
+   // resources ahead of the failing one
+   int pos = 0;
+   while (pos < failer) {
+     assertEquals(preFail, mocks[pos].state);
+     pos++;
+   }
+
+   // the failing resource itself
+   assertEquals(fail, mocks[failer].state);
+
+   // resources behind the failing one
+   pos = failer + 1;
+   while (pos < mocks.length) {
+     assertEquals(postFail, mocks[pos].state);
+     pos++;
+   }
+ }
+
+ private static void setFailMode(MockFailingXAResource mock, int errorCode, int failStartAfter,
+ int failSuspendAfter, int failResumeAfter, int failEndAfter,
+ boolean failPrepare, boolean failCommit, boolean failRollback) {
+ mock.errorCode = errorCode;
+ mock.failStartAfter = failStartAfter >= 0 ? failStartAfter : Integer.MAX_VALUE;
+ mock.failSuspendAfter = failSuspendAfter >= 0 ? failSuspendAfter : Integer.MAX_VALUE;
+ mock.failResumeAfter = failResumeAfter >= 0 ? failResumeAfter : Integer.MAX_VALUE;
+ mock.failEndAfter = failEndAfter >= 0 ? failEndAfter : Integer.MAX_VALUE;
+ mock.failPrepare = failPrepare;
+ mock.failCommit = failCommit;
+ mock.failRollback = failRollback;
+ }
+
+ /**
+ * A simple extension to MultiXAResource so we can get at the list of active transactions.
+ */
+ private static class TestMultiXAResource extends MultiXAResource {
+   // simply hands both arguments on to the MultiXAResource constructor
+   public TestMultiXAResource(int timeout, ResolverFactory factory) {
+     super(timeout, factory);
+   }
+
+   // exposes the values of the resource manager's transactions map to the tests
+   public Collection<MultiXAResource.MultiTxInfo> getTxns() {
+     Collection<MultiXAResource.MultiTxInfo> txns = resourceManager.transactions.values();
+     return txns;
+   }
+ }
+
+ /**
+ * A stub/mock XAResource. This verifies state transitions to make sure they follow the rules as
+ * laid out in the X/Open and JTA specs. The prepare-status can be modified to return RDONLY on
+ * prepare.
+ */
+ private static class MockXAResource extends DummyXAResource {
+ protected final ThreadLocal<Xid> currTxn = new ThreadLocal<Xid>();
+
+ public static enum State { IDLE, ACTIVE, SUSPENDED, ENDED, RB_ONLY, PREPARED, COMMITTED,
+ ROLLEDBACK, HEUR, HEUR_DONE };
+ public State state = State.IDLE;
+
+ public int startCnt = 0;
+ public int resumeCnt = 0;
+ public int suspendCnt = 0;
+ public int endCnt = 0;
+ public int prepareCnt = 0;
+ public int commitCnt = 0;
+ public int rollbackCnt = 0;
+
+ public int prepareStatus = XA_OK;
+
+ public void start(Xid xid, int flags) throws XAException {
+ super.start(xid, flags);
+
+ if (currTxn.get() != null) {
+ throw new XAException("transaction already active: " + currTxn.get());
+ }
+ currTxn.set(xid);
+
+ if (flags == XAResource.TMNOFLAGS && state == State.ACTIVE) {
+ throw new XAException("resource already active: " + state);
+ }
+ if (flags == XAResource.TMRESUME && state != State.SUSPENDED) {
+ throw new XAException("resource not suspended: " + state);
+ }
+ state = State.ACTIVE;
+
+ if (flags == XAResource.TMNOFLAGS) startCnt++;
+ if (flags == XAResource.TMRESUME) resumeCnt++;
+ }
+
+ public void end(Xid xid, int flags) throws XAException {
+ super.end(xid, flags);
+
+ if (!(state == State.SUSPENDED && (flags == XAResource.TMSUCCESS || flags == XAResource.TMFAIL))) {
+ if (!xid.equals(currTxn.get())) {
+ throw new XAException("mismatched transaction end");
+ }
+ currTxn.set(null);
+
+ if (state != State.ACTIVE) {
+ throw new XAException("resource not active: " + state);
+ }
+ }
+ state = (flags == XAResource.TMSUSPEND) ? State.SUSPENDED : State.ENDED;
+
+ if (flags == XAResource.TMSUSPEND) suspendCnt++;
+ if (flags != XAResource.TMSUSPEND) endCnt++;
+ }
+
+ public int prepare(Xid xid) throws XAException {
+ super.prepare(xid);
+
+ if (currTxn.get() != null) {
+ throw new XAException("transaction still active: " + currTxn.get());
+ }
+ if (state != State.ENDED) {
+ throw new XAException("resource not ended: " + state);
+ }
+ state = (prepareStatus == XA_OK) ? State.PREPARED : State.ROLLEDBACK;
+
+ prepareCnt++;
+ return prepareStatus;
+ }
+
+ public void commit(Xid xid, boolean onePhase) throws XAException {
+ super.commit(xid, onePhase);
+
+ if (currTxn.get() != null) {
+ throw new XAException("transaction still active: " + currTxn.get());
+ }
+
+ if (state != State.HEUR) {
+ if (onePhase && state != State.ENDED) {
+ throw new XAException("resource not ended: " + state);
+ }
+ if (!onePhase && state != State.PREPARED) {
+ throw new XAException("resource not prepared: " + state);
+ }
+ state = State.COMMITTED;
+ }
+
+ commitCnt++;
+ }
+
+ public void rollback(Xid xid) throws XAException {
+ super.rollback(xid);
+
+ if (currTxn.get() != null) throw new XAException("transaction still active: " + currTxn.get());
+
+ if (state != State.HEUR) {
+ if (state != State.ENDED && state != State.RB_ONLY && state != State.PREPARED) {
+ throw new XAException("resource not ended or prepared: " + state);
+ }
+ state = State.ROLLEDBACK;
+ }
+
+ rollbackCnt++;
+ }
+
+ public void forget(Xid xid) throws XAException {
+ super.forget(xid);
+
+ if (state != State.HEUR) throw new XAException("transaction not heuristically completed: " + state);
+ state = State.HEUR_DONE;
+ }
+
+ public void reset() {
+ state = State.IDLE;
+ startCnt = 0;
+ resumeCnt = 0;
+ suspendCnt = 0;
+ endCnt = 0;
+ prepareCnt = 0;
+ commitCnt = 0;
+ rollbackCnt = 0;
+ prepareStatus = XA_OK;
+ currTxn.set(null);
+ }
+ }
+
+ /**
+ * This extends MockXAResource to be able to force failures at various points. The fail* and
+ * errorCode fields can be set to control the behaviour.
+ */
+ private static class MockFailingXAResource extends MockXAResource {
+ public int failStartAfter = Integer.MAX_VALUE;
+ public int failSuspendAfter = Integer.MAX_VALUE;
+ public int failResumeAfter = Integer.MAX_VALUE;
+ public int failEndAfter = Integer.MAX_VALUE;
+ public int errorCode = 0;
+ public boolean failPrepare = false;
+ public boolean failCommit = false;
+ public boolean failRollback = false;
+
+ public void start(Xid xid, int flags) throws XAException {
+ super.start(xid, flags);
+ if (startCnt >= failStartAfter || resumeCnt >= failResumeAfter) {
+ currTxn.set(null);
+ state = State.RB_ONLY;
+ fail("start");
+ }
+ }
+
+ public void end(Xid xid, int flags) throws XAException {
+ super.end(xid, flags);
+ if (endCnt >= failEndAfter || suspendCnt >= failSuspendAfter) {
+ state = State.RB_ONLY;
+ fail("end");
+ }
+ }
+
+ public int prepare(Xid xid) throws XAException {
+ int sts = super.prepare(xid);
+ if (failPrepare) {
+ if (isRollback(errorCode)) state = State.ROLLEDBACK;
+ else state = State.RB_ONLY;
+ fail("prepare");
+ }
+ return sts;
+ }
+
+ public void commit(Xid xid, boolean onePhase) throws XAException {
+ super.commit(xid, onePhase);
+ if (failCommit) {
+ if (isHeuristic(errorCode)) state = State.HEUR;
+ if (isRollback(errorCode)) state = State.ROLLEDBACK;
+ fail("commit");
+ }
+ }
+
+ public void rollback(Xid xid) throws XAException {
+ super.rollback(xid);
+ if (failRollback) {
+ if (isHeuristic(errorCode)) state = State.HEUR;
+ fail("rollback");
+ }
+ }
+
+ private void fail(String op) throws XAException {
+ if (errorCode == XAException.XAER_RMFAIL || errorCode == XAException.XAER_NOTA) {
+ state = State.ROLLEDBACK;
+ }
+ throw (errorCode != 0) ? new XAException(errorCode) : new XAException("Testing " + op + " failure");
+ }
+
+ public void reset() {
+ super.reset();
+ failStartAfter = Integer.MAX_VALUE;
+ failSuspendAfter = Integer.MAX_VALUE;
+ failResumeAfter = Integer.MAX_VALUE;
+ failEndAfter = Integer.MAX_VALUE;
+ errorCode = 0;
+ failPrepare = false;
+ failCommit = false;
+ failRollback = false;
+ }
+ }
+
+ /**
+ * Basic Xid implementation.
+ */
+ private static class TestXid implements Xid {
+ private int xid;
+ public TestXid(int xid) {
+ this.xid = xid;
+ }
+
+ public int getFormatId() {
+ return 'X';
+ }
+
+ public byte[] getBranchQualifier() {
+ return new byte[] {
+ (byte)(xid >> 0x00),
+ (byte)(xid >> 0x08)
+ };
+ }
+
+ public byte[] getGlobalTransactionId() {
+ return new byte[] {
+ (byte)(xid >> 0x10),
+ (byte)(xid >> 0x18)
+ };
+ }
+ }
+
+ /**
+ * Just a dummy - nothing is ever called.
+ */
+ private static class DummyResolverFactory implements ResolverFactory {
+ public void close() { }
+ public void delete() { }
+ public Graph[] getDefaultGraphs() { return null; }
+ public boolean supportsExport() { return true; }
+ public Resolver newResolver(boolean canWrite, ResolverSession resolverSession, Resolver systemResolver) { return null; }
+ }
+
+ /**
+ * Fail the test with the full stack trace of an unexpected exception.
+ */
+ private void fail(Throwable throwable) {
+ StringWriter stringWriter = new StringWriter();
+ throwable.printStackTrace(new PrintWriter(stringWriter));
+ fail(stringWriter.toString());
+ }
+}
Property changes on: trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/MultiXAResourceUnitTest.java
___________________________________________________________________
Name: svn:keywords
+ Id HeadURL Revision
Name: svn:eol-style
+ native
Modified: trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/NetworkDelegator.java
===================================================================
--- trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/NetworkDelegator.java 2008-10-23 13:12:35 UTC (rev 1356)
+++ trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/NetworkDelegator.java 2008-10-23 13:12:41 UTC (rev 1357)
@@ -47,6 +47,8 @@
import java.net.URISyntaxException;
import java.util.*;
+import javax.transaction.xa.XAException;
+
/**
* Resolve a constraint across a socket.
*
@@ -61,8 +63,14 @@
private static Logger logger = Logger.getLogger(NetworkDelegator.class.getName());
/** The session to delegate resolutions through. */
- private ResolverSession session;
+ private final ResolverSession session;
+ /** Whether the current transaction is r/w or r/o. */
+ private final boolean forWrite;
+
+ /** The transaction coordinator with which to register new XAResources */
+ private final TransactionCoordinator txCord;
+
/** A cache of distributed sessions. */
private Map<URI,Session> sessionCache = new HashMap<URI,Session>();
@@ -76,9 +84,13 @@
/**
* Constructs a delegator, using a given session.
* @param session The session to delegate resolution through.
+ * @param forWrite Whether to open this for writes or for read-only
+ * @param txCord the transaction-coordinator being used
*/
- public NetworkDelegator(ResolverSession session) {
+ public NetworkDelegator(ResolverSession session, boolean forWrite, TransactionCoordinator txCord) {
this.session = session;
+ this.forWrite = forWrite;
+ this.txCord = txCord;
}
@@ -316,14 +328,22 @@
// The factory won't be in the cache, as a corresponding session would have already been created.
SessionFactory sessionFactory = SessionFactoryFinder.newSessionFactory(serverUri, true);
factoryCache.add(sessionFactory);
+
// now create the session
Session session = sessionFactory.newSession();
sessionCache.put(serverUri, session);
+
+ // get the XAResource and enlist it
+ txCord.enlistResource(forWrite ? session.getXAResource() : session.getReadOnlyXAResource());
+
+ // done
return session;
} catch (NonRemoteSessionException nrse) {
throw new QueryException("State Error: non-local URI was mapped to a local session", nrse);
} catch (SessionFactoryFinderException sffe) {
throw new QueryException("Unable to get a session to the server", sffe);
+ } catch (XAException xae) {
+ throw new QueryException("Error enlisting xaresource", xae);
}
}
Added: trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/TransactionCoordinator.java
===================================================================
--- trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/TransactionCoordinator.java (rev 0)
+++ trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/TransactionCoordinator.java 2008-10-23 13:12:41 UTC (rev 1357)
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2008 The Topaz Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ * Contributions:
+ */
+
+package org.mulgara.resolver.distributed;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+
+/**
+ * Interface representing a transaction coordinator for the backends.
+ *
+ * @created 2008-02-16
+ * @author Ronald Tschalär
+ * @copyright ©2008 <a href="http://www.topazproject.org/">Topaz Project</a>
+ * @licence Apache License v2.0
+ */
+public interface TransactionCoordinator {
+ /**
+ * Enlist the given resource in the current transaction.
+ *
+ * @param xares the XAResource to enlist
+ * @throws XAException if a problem occurs during enlistment
+ */
+ public void enlistResource(XAResource xares) throws XAException;
+}
Property changes on: trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/TransactionCoordinator.java
___________________________________________________________________
Name: svn:keywords
+ Id HeadURL Revision
Name: svn:eol-style
+ native
More information about the Mulgara-svn
mailing list