[Mulgara-svn] r588 - in branches/mgr-73/src/jar: query/java/org/mulgara/server resolver/java/org/mulgara/resolver server-beep/java/org/mulgara/server/beep server-rmi/java/org/mulgara/server/rmi
andrae at mulgara.org
andrae at mulgara.org
Fri Nov 30 05:31:17 UTC 2007
Author: andrae
Date: 2007-11-29 23:31:16 -0600 (Thu, 29 Nov 2007)
New Revision: 588
Added:
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResourceContext.java
Removed:
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java
Modified:
branches/mgr-73/src/jar/query/java/org/mulgara/server/Session.java
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java
branches/mgr-73/src/jar/server-beep/java/org/mulgara/server/beep/BEEPSession.java
branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteSession.java
branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteSessionWrapperSession.java
branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/SessionWrapperRemoteSession.java
Log:
Adds support for read-only transactions to the JTA interface.
The way JTA works requires that the decision to use ro/rw has to be made prior
to any transaction initiation, and moreover that multiple transctions may be
started for a given choice. As a result the ro/rw designation needs to be
carried within the XAResource itself.
However for the resources to manage the transactions correctly they need to
share the various datastructures involved. Hence the decision to introduce a
Context object, and to make the MulgaraXAResource an inner-class of the context.
Almost all the functionality lives in the inner-class; almost all the data lives
in the context.
Modified: branches/mgr-73/src/jar/query/java/org/mulgara/server/Session.java
===================================================================
--- branches/mgr-73/src/jar/query/java/org/mulgara/server/Session.java 2007-11-29 06:04:18 UTC (rev 587)
+++ branches/mgr-73/src/jar/query/java/org/mulgara/server/Session.java 2007-11-30 05:31:16 UTC (rev 588)
@@ -309,8 +309,11 @@
*
* Use of this method is incompatible with any use of implicit or internally
* mediated transactions with this Session.
+ * Transactions initiated from the XAResource returned by the read-only
+ * version of this method will be read-only.
*/
public XAResource getXAResource() throws QueryException;
+ public XAResource getReadOnlyXAResource() throws QueryException;
/**
* This class is just a devious way to get static initialization for the
Modified: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java 2007-11-29 06:04:18 UTC (rev 587)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java 2007-11-30 05:31:16 UTC (rev 588)
@@ -561,18 +561,18 @@
public void close() throws QueryException {
logger.info("Closing session");
try {
- if (transactionFactory != null) {
- transactionFactory.closingSession(this);
- }
- } catch (MulgaraTransactionException em) {
- logger.error("Error closing session", em);
- throw new QueryException("Error closing session. Forced close required", em);
+ transactionManager.closingSession(this);
+ } catch (MulgaraTransactionException em2) {
+ logger.error("Error force-closing session", em2);
+ throw new QueryException("Error force-closing session.", em2);
} finally {
try {
- transactionManager.closingSession(this);
- } catch (MulgaraTransactionException em2) {
- logger.error("Error force-closing session", em2);
- throw new QueryException("Error force-closing session.", em2);
+ if (transactionFactory != null) {
+ transactionFactory.closingSession(this);
+ }
+ } catch (MulgaraTransactionException em) {
+ logger.error("Error closing session", em);
+ throw new QueryException("Error closing session. Forced close required", em);
}
}
}
@@ -693,6 +693,11 @@
public XAResource getXAResource() throws QueryException {
assertExternallyManagedXA();
- return externalFactory.getXAResource(this);
+ return externalFactory.getXAResource(this, true);
}
+
+ public XAResource getReadOnlyXAResource() throws QueryException {
+ assertExternallyManagedXA();
+ return externalFactory.getXAResource(this, false);
+ }
}
Modified: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java 2007-11-29 06:04:18 UTC (rev 587)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java 2007-11-30 05:31:16 UTC (rev 588)
@@ -28,6 +28,7 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -330,6 +331,70 @@
}
}
+ public void testConcurrentQuery() throws URISyntaxException {
+ logger.info("Testing concurrentQuery");
+
+ try {
+ // Load some test data
+ Session session = database.newSession();
+ XAResource resource = session.getXAResource();
+ Xid xid = new TestXid(1);
+ resource.start(xid, XAResource.TMNOFLAGS);
+ try {
+ Variable subjectVariable = new Variable("subject");
+ Variable predicateVariable = new Variable("predicate");
+ Variable objectVariable = new Variable("object");
+
+ List selectList = new ArrayList(3);
+ selectList.add(subjectVariable);
+ selectList.add(predicateVariable);
+ selectList.add(objectVariable);
+
+ // Evaluate the query
+ Answer answer1 = session.query(new Query(
+ selectList, // SELECT
+ new ModelResource(modelURI), // FROM
+ new ConstraintImpl(subjectVariable, // WHERE
+ predicateVariable,
+ objectVariable),
+ null, // HAVING
+ Collections.singletonList( // ORDER BY
+ new Order(subjectVariable, true)
+ ),
+ null, // LIMIT
+ 0, // OFFSET
+ new UnconstrainedAnswer() // GIVEN
+ ));
+
+ Answer answer2 = session.query(new Query(
+ selectList, // SELECT
+ new ModelResource(modelURI), // FROM
+ new ConstraintImpl(subjectVariable, // WHERE
+ predicateVariable,
+ objectVariable),
+ null, // HAVING
+ Collections.singletonList( // ORDER BY
+ new Order(subjectVariable, true)
+ ),
+ null, // LIMIT
+ 0, // OFFSET
+ new UnconstrainedAnswer() // GIVEN
+ ));
+
+ compareResults(answer1, answer2);
+
+ answer1.close();
+ answer2.close();
+
+ resource.end(xid, XAResource.TMSUCCESS);
+ resource.commit(xid, true);
+ } finally {
+ session.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
//
// Internal methods
//
Modified: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java 2007-11-29 06:04:18 UTC (rev 587)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java 2007-11-30 05:31:16 UTC (rev 588)
@@ -53,7 +53,7 @@
private Map<DatabaseSession, MulgaraExternalTransaction> associatedTransaction;
private Assoc1toNMap<DatabaseSession, MulgaraExternalTransaction> sessionXAMap;
- private Map<DatabaseSession, MulgaraXAResource> xaResources;
+ private Map<DatabaseSession, MulgaraXAResourceContext> xaResources;
private MulgaraExternalTransaction writeTransaction;
@@ -62,7 +62,7 @@
this.associatedTransaction = new HashMap<DatabaseSession, MulgaraExternalTransaction>();
this.sessionXAMap = new Assoc1toNMap<DatabaseSession, MulgaraExternalTransaction>();
- this.xaResources = new HashMap<DatabaseSession, MulgaraXAResource>();
+ this.xaResources = new HashMap<DatabaseSession, MulgaraXAResourceContext>();
this.writeTransaction = null;
}
@@ -131,33 +131,21 @@
return xas != null ? xas : new HashSet<MulgaraExternalTransaction>();
}
- public XAResource getXAResource(DatabaseSession session) {
+ public XAResource getXAResource(DatabaseSession session, boolean writing) {
acquireMutex();
try {
- MulgaraXAResource xar = xaResources.get(session);
- if (xar == null) {
- xar = new MulgaraXAResource(this, session);
- xaResources.put(session, xar);
+ MulgaraXAResourceContext xarc = xaResources.get(session);
+ if (xarc == null) {
+ xarc = new MulgaraXAResourceContext(this, session);
+ xaResources.put(session, xarc);
}
- return xar;
+ return xarc.getResource(writing);
} finally {
releaseMutex();
}
}
- public void closingSession(DatabaseSession session) {
- acquireMutex();
- try {
- MulgaraXAResource xar = xaResources.remove(session);
- if (xar != null) {
- xar.close();
- }
- } finally {
- releaseMutex();
- }
- }
-
public void transactionComplete(MulgaraExternalTransaction xa)
throws MulgaraTransactionException {
acquireMutex();
Modified: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java 2007-11-29 06:04:18 UTC (rev 587)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java 2007-11-30 05:31:16 UTC (rev 588)
@@ -26,7 +26,6 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
Deleted: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java 2007-11-29 06:04:18 UTC (rev 587)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java 2007-11-30 05:31:16 UTC (rev 588)
@@ -1,342 +0,0 @@
-/*
- * The contents of this file are subject to the Open Software License
- * Version 3.0 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- * http://www.rosenlaw.com/OSL3.0.htm
- *
- * Software distributed under the License is distributed on an "AS IS"
- * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
- * the License for the specific language governing rights and limitations
- * under the License.
- *
- * This file is an original work developed by Netymon Pty Ltd
- * (http://www.netymon.com, mailto:mail at netymon.com) under contract to
- * Topaz Foundation. Portions created under this contract are
- * Copyright (c) 2007 Topaz Foundation
- * All Rights Reserved.
- */
-
-package org.mulgara.resolver;
-
-// Java2 packages
-import java.util.Arrays;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-// Third party packages
-import org.apache.log4j.Logger;
-
-// Local packages
-import org.mulgara.query.MulgaraTransactionException;
-import org.mulgara.util.Assoc1toNMap;
-
-/**
- * Provides an external JTA-compliant TransactionManager with the ability to
- * control Mulgara Transactions.
- *
- * @created 2007-11-07
- *
- * @author <a href="mailto:andrae at netymon.com">Andrae Muys</a>
- *
- * @company <A href="mailto:mail at netymon.com">Netymon Pty Ltd</A>
- *
- * @copyright ©2006 <a href="http://www.topazproject.org/">Topaz Project</a>
- *
- * @licence Open Software License v3.0</a>
- */
-
-public class MulgaraXAResource implements XAResource {
- private static final Logger logger =
- Logger.getLogger(MulgaraXAResource.class.getName());
- private MulgaraExternalTransactionFactory factory;
-
- private DatabaseSession session;
-
- private Assoc1toNMap<MulgaraExternalTransaction, Xid> xa2xid;
-
- MulgaraXAResource(MulgaraExternalTransactionFactory factory, DatabaseSession session) {
- logger.info("Creating MulgaraXAResource");
- this.factory = factory;
- this.session = session;
- this.xa2xid = new Assoc1toNMap<MulgaraExternalTransaction, Xid>();
- }
-
- /**
- * Commit transaction identified by xid.
- *
- * Transaction must be Idle, Prepared, or Heuristically-Completed.
- * If transaction not Heuristically-Completed we are required to finish it,
- * clean up, and forget it.
- * If transaction is Heuristically-Completed we throw an exception and wait
- * for a call to forget().
- */
- public void commit(Xid xid, boolean onePhase) throws XAException {
- xid = convertXid(xid);
- logger.info("Performing commit: " + parseXid(xid));
- MulgaraExternalTransaction xa = xa2xid.get1(xid);
- if (xa == null) {
- throw new XAException(XAException.XAER_NOTA);
- } else if (xa.isHeuristicallyRollbacked()) {
- throw new XAException(XAException.XA_HEURRB);
- } else if (xa.isHeuristicallyCommitted()) {
- throw new XAException(XAException.XA_HEURCOM);
- }
-
- if (onePhase) {
- try {
- xa.prepare(xid);
- } catch (XAException ex) {
- if (ex.errorCode != XAException.XA_RDONLY) {
- xa.rollback(xid);
- }
- throw ex;
- }
- }
-
- try {
- xa.commit(xid);
- xa2xid.remove1(xa);
- } catch (XAException ex) {
- // We are not allowed to forget this transaction if we completed
- // heuristically.
- switch (ex.errorCode) {
- case XAException.XA_HEURHAZ:
- case XAException.XA_HEURCOM:
- case XAException.XA_HEURRB:
- case XAException.XA_HEURMIX:
- throw ex;
- default:
- xa2xid.remove1(xa);
- throw ex;
- }
- }
- }
-
- /**
- * Deactivate a transaction.
- *
- * TMSUCCESS: Move to Idle and await call to rollback, prepare, or commit.
- * TMFAIL: Move to RollbackOnly; await call to rollback.
- * TMSUSPEND: Move to Idle and await start(TMRESUME)
- *
- * In all cases disassociate from current session.
- */
- public void end(Xid xid, int flags) throws XAException {
- xid = convertXid(xid);
- logger.info("Performing end: " + parseXid(xid));
- MulgaraExternalTransaction xa = xa2xid.get1(xid);
- if (xa == null) {
- throw new XAException(XAException.XAER_NOTA);
- }
- switch (flags) {
- case TMFAIL:
- xa.rollback(xid);
- break;
- case TMSUCCESS:
- break;
- case TMSUSPEND:
- break;
- default:
- logger.error("Invalid flag passed to end() : " + flags);
- throw new XAException(XAException.XAER_INVAL);
- }
-
- try {
- factory.disassociateTransaction(session, xa);
- } catch (MulgaraTransactionException em) {
- logger.error("Error disassociating transaction from session", em);
- throw new XAException(XAException.XAER_PROTO);
- }
- }
-
- public void forget(Xid xid) throws XAException {
- xid = convertXid(xid);
- logger.info("Performing forget: " + parseXid(xid));
- MulgaraExternalTransaction xa = xa2xid.get1(xid);
- if (xa == null) {
- throw new XAException(XAException.XAER_NOTA);
- }
- try {
- if (!xa.isHeuristicallyRollbacked()) {
- try {
- xa.abortTransaction("External XA Manager specified 'forget'", new Throwable());
- } catch (MulgaraTransactionException em) {
- logger.error("Failed to abort transaction in forget", em);
- throw new XAException(XAException.XAER_RMERR);
- }
- }
- } finally {
- xa2xid.remove1(xa);
- }
- }
-
- public int getTransactionTimeout() {
- logger.info("Performing getTransactionTimeout");
- return 3600;
- }
-
- public boolean isSameRM(XAResource xares) {
- logger.info("Performing isSameRM");
- if (!xares.getClass().equals(MulgaraXAResource.class)) {
- return false;
- } else {
- // Based on X/Open-XA-TP section 3.2 I believe a 'Resource Manager
- // Instance' corresponds to a session, as each session 'supports
- // independent transaction completion'.
- return ((MulgaraXAResource)xares).session == session;
- }
- }
-
-
- public int prepare(Xid xid) throws XAException {
- xid = convertXid(xid);
- logger.info("Performing prepare: " + parseXid(xid));
- MulgaraExternalTransaction xa = xa2xid.get1(xid);
- if (xa == null) {
- throw new XAException(XAException.XAER_NOTA);
- } else if (xa.isRollbacked()) {
- throw new XAException(XAException.XA_RBROLLBACK);
- }
-
- xa.prepare(xid);
-
- return XA_OK;
- }
-
- /**
- * We don't currently support recover.
- * FIXME: We should at least handle the case where we are asked to recover
- * when we haven't crashed.
- */
- public Xid[] recover(int flag) {
- logger.info("Performing recover");
- return new Xid[] {};
- }
-
-
- public void rollback(Xid xid) throws XAException {
- xid = convertXid(xid);
- logger.info("Performing rollback: " + parseXid(xid));
- MulgaraExternalTransaction xa = xa2xid.get1(xid);
- if (xa == null) {
- throw new XAException(XAException.XAER_NOTA);
- }
-
- xa.rollback(xid);
- }
-
-
- public boolean setTransactionTimeout(int seconds) {
- logger.info("Performing setTransactionTimeout");
- return false;
- }
-
- public void start(Xid xid, int flags) throws XAException {
- xid = convertXid(xid);
- logger.info("Performing start: " + parseXid(xid));
- switch (flags) {
- case TMNOFLAGS:
- if (xa2xid.containsN(xid)) {
- throw new XAException(XAException.XAER_DUPID);
- } else if (factory.hasAssociatedTransaction(session)) {
- throw new XAException(XAException.XA_RBDEADLOCK);
- } else {
- // FIXME: Need to consider read-only transactions here.
- try {
- MulgaraExternalTransaction xa = factory.createTransaction(session, xid, true);
- xa2xid.put(xa, xid);
- } catch (MulgaraTransactionException em) {
- logger.error("Failed to create transaction", em);
- throw new XAException(XAException.XAER_RMFAIL);
- }
- }
- break;
- case TMJOIN:
- if (!factory.hasAssociatedTransaction(session)) {
- throw new XAException(XAException.XAER_NOTA);
- } else {
- // FIXME
- throw new XAException("Need to finalise understanding of semantics of TMJOIN");
- }
- //break;
- case TMRESUME:
- MulgaraExternalTransaction xa = xa2xid.get1(xid);
- if (xa == null) {
- throw new XAException(XAException.XAER_NOTA);
- } else if (xa.isRollbacked()) {
- throw new XAException(XAException.XA_RBROLLBACK);
- } else {
- if (!factory.associateTransaction(session, xa)) {
- // session already associated with a transaction.
- throw new XAException(XAException.XAER_PROTO);
- }
- }
- break;
- }
- }
-
- /**
- * Called on session close to clean-up any remaining transactions.
- */
- public void close() {
- logger.info("Performing close");
- factory.closingSession(session);
- }
-
- public static String parseXid(Xid xid) {
- return xid.toString();
- }
-
- private InternalXid convertXid(Xid xid) {
- return new InternalXid(xid);
- }
-
- private static class InternalXid implements Xid {
- private byte[] bq;
- private int fi;
- private byte[] gtid;
-
- public InternalXid(Xid xid) {
- byte[] tbq = xid.getBranchQualifier();
- byte[] tgtid = xid.getGlobalTransactionId();
- this.bq = new byte[tbq.length];
- this.fi = xid.getFormatId();
- this.gtid = new byte[tgtid.length];
- System.arraycopy(tbq, 0, this.bq, 0, tbq.length);
- System.arraycopy(tgtid, 0, this.gtid, 0, tgtid.length);
- }
-
- public byte[] getBranchQualifier() {
- return bq;
- }
-
- public int getFormatId() {
- return fi;
- }
-
- public byte[] getGlobalTransactionId() {
- return gtid;
- }
-
- public int hashCode() {
- return Arrays.hashCode(bq) ^ fi ^ Arrays.hashCode(gtid);
- }
-
- public boolean equals(Object rhs) {
- if (!(rhs instanceof InternalXid)) {
- return false;
- } else {
- InternalXid rhx = (InternalXid)rhs;
- return this.fi == rhx.fi &&
- Arrays.equals(this.bq, rhx.bq) &&
- Arrays.equals(this.gtid, rhx.gtid);
- }
- }
-
- public String toString() {
- return ":" + fi + ":" + Arrays.toString(gtid) + ":" + Arrays.toString(bq) + ":";
- }
- }
-
-}
Copied: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResourceContext.java (from rev 587, branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java)
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java 2007-11-29 06:04:18 UTC (rev 587)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResourceContext.java 2007-11-30 05:31:16 UTC (rev 588)
@@ -0,0 +1,429 @@
+/*
+ * The contents of this file are subject to the Open Software License
+ * Version 3.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.rosenlaw.com/OSL3.0.htm
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * This file is an original work developed by Netymon Pty Ltd
+ * (http://www.netymon.com, mailto:mail at netymon.com) under contract to
+ * Topaz Foundation. Portions created under this contract are
+ * Copyright (c) 2007 Topaz Foundation
+ * All Rights Reserved.
+ */
+
+package org.mulgara.resolver;
+
+// Java2 packages
+import java.util.Arrays;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+// Third party packages
+import org.apache.log4j.Logger;
+
+// Local packages
+import org.mulgara.query.MulgaraTransactionException;
+import org.mulgara.util.Assoc1toNMap;
+
+/**
+ * Provides an external JTA-compliant TransactionManager with the ability to
+ * control Mulgara Transactions.
+ *
+ * @created 2007-11-07
+ *
+ * @author <a href="mailto:andrae at netymon.com">Andrae Muys</a>
+ *
+ * @company <A href="mailto:mail at netymon.com">Netymon Pty Ltd</A>
+ *
+ * @copyright ©2006 <a href="http://www.topazproject.org/">Topaz Project</a>
+ *
+ * @licence Open Software License v3.0</a>
+ */
+
+public class MulgaraXAResourceContext {
+ private static final Logger logger =
+ Logger.getLogger(MulgaraXAResourceContext.class.getName());
+ private MulgaraExternalTransactionFactory factory;
+
+ protected DatabaseSession session;
+
+ private Assoc1toNMap<MulgaraExternalTransaction, Xid> xa2xid;
+
+ private ReentrantLock mutex;
+
+ MulgaraXAResourceContext(MulgaraExternalTransactionFactory factory, DatabaseSession session) {
+ logger.info("Creating MulgaraXAResource");
+ this.factory = factory;
+ this.session = session;
+ this.xa2xid = new Assoc1toNMap<MulgaraExternalTransaction, Xid>();
+ this.mutex = new ReentrantLock();
+ }
+
+ public XAResource getResource(boolean writing) {
+ return new MulgaraXAResource(writing);
+ }
+
+ private class MulgaraXAResource implements XAResource {
+ private final boolean writing;
+
+ public MulgaraXAResource(boolean writing) {
+ this.writing = writing;
+ }
+
+ /**
+ * Commit transaction identified by xid.
+ *
+ * Transaction must be Idle, Prepared, or Heuristically-Completed.
+ * If transaction not Heuristically-Completed we are required to finish it,
+ * clean up, and forget it.
+ * If transaction is Heuristically-Completed we throw an exception and wait
+ * for a call to forget().
+ */
+ public void commit(Xid xid, boolean onePhase) throws XAException {
+ acquireMutex();
+ try {
+ xid = convertXid(xid);
+ logger.info("Performing commit: " + parseXid(xid));
+ MulgaraExternalTransaction xa = xa2xid.get1(xid);
+ if (xa == null) {
+ throw new XAException(XAException.XAER_NOTA);
+ } else if (xa.isHeuristicallyRollbacked()) {
+ throw new XAException(XAException.XA_HEURRB);
+ } else if (xa.isHeuristicallyCommitted()) {
+ throw new XAException(XAException.XA_HEURCOM);
+ }
+
+ if (onePhase) {
+ try {
+ xa.prepare(xid);
+ } catch (XAException ex) {
+ if (ex.errorCode != XAException.XA_RDONLY) {
+ xa.rollback(xid);
+ }
+ throw ex;
+ }
+ }
+
+ try {
+ xa.commit(xid);
+ xa2xid.remove1(xa);
+ } catch (XAException ex) {
+ // We are not allowed to forget this transaction if we completed
+ // heuristically.
+ switch (ex.errorCode) {
+ case XAException.XA_HEURHAZ:
+ case XAException.XA_HEURCOM:
+ case XAException.XA_HEURRB:
+ case XAException.XA_HEURMIX:
+ throw ex;
+ default:
+ xa2xid.remove1(xa);
+ throw ex;
+ }
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ /**
+ * Deactivate a transaction.
+ *
+ * TMSUCCESS: Move to Idle and await call to rollback, prepare, or commit.
+ * TMFAIL: Move to RollbackOnly; await call to rollback.
+ * TMSUSPEND: Move to Idle and await start(TMRESUME)
+ *
+ * In all cases disassociate from current session.
+ */
+ public void end(Xid xid, int flags) throws XAException {
+ acquireMutex();
+ try {
+ xid = convertXid(xid);
+ logger.info("Performing end: " + parseXid(xid));
+ MulgaraExternalTransaction xa = xa2xid.get1(xid);
+ if (xa == null) {
+ throw new XAException(XAException.XAER_NOTA);
+ }
+ switch (flags) {
+ case TMFAIL:
+ xa.rollback(xid);
+ break;
+ case TMSUCCESS:
+ break;
+ case TMSUSPEND:
+ break;
+ default:
+ logger.error("Invalid flag passed to end() : " + flags);
+ throw new XAException(XAException.XAER_INVAL);
+ }
+
+ try {
+ factory.disassociateTransaction(session, xa);
+ } catch (MulgaraTransactionException em) {
+ logger.error("Error disassociating transaction from session", em);
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ public void forget(Xid xid) throws XAException {
+ acquireMutex();
+ try {
+ xid = convertXid(xid);
+ logger.info("Performing forget: " + parseXid(xid));
+ MulgaraExternalTransaction xa = xa2xid.get1(xid);
+ if (xa == null) {
+ throw new XAException(XAException.XAER_NOTA);
+ }
+ try {
+ if (!xa.isHeuristicallyRollbacked()) {
+ try {
+ xa.abortTransaction("External XA Manager specified 'forget'", new Throwable());
+ } catch (MulgaraTransactionException em) {
+ logger.error("Failed to abort transaction in forget", em);
+ throw new XAException(XAException.XAER_RMERR);
+ }
+ }
+ } finally {
+ xa2xid.remove1(xa);
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ public int getTransactionTimeout() {
+ acquireMutex();
+ try {
+ logger.info("Performing getTransactionTimeout");
+ return 3600;
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ public boolean isSameRM(XAResource xares) {
+ acquireMutex();
+ try {
+ logger.info("Performing isSameRM");
+ if (!xares.getClass().equals(MulgaraXAResource.class)) {
+ return false;
+ } else {
+ // Based on X/Open-XA-TP section 3.2 I believe a 'Resource Manager
+ // Instance' corresponds to a session, as each session 'supports
+ // independent transaction completion'.
+ return session == ((MulgaraXAResource)xares).getSession();
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
+
+ public int prepare(Xid xid) throws XAException {
+ acquireMutex();
+ try {
+ xid = convertXid(xid);
+ logger.info("Performing prepare: " + parseXid(xid));
+ MulgaraExternalTransaction xa = xa2xid.get1(xid);
+ if (xa == null) {
+ throw new XAException(XAException.XAER_NOTA);
+ } else if (xa.isRollbacked()) {
+ throw new XAException(XAException.XA_RBROLLBACK);
+ }
+
+ xa.prepare(xid);
+
+ return XA_OK;
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ /**
+ * We don't currently support recover.
+ * FIXME: We should at least handle the case where we are asked to recover
+ * when we haven't crashed.
+ */
+ public Xid[] recover(int flag) {
+ acquireMutex();
+ try {
+ logger.info("Performing recover");
+ return new Xid[] {};
+ } finally {
+ releaseMutex();
+ }
+ }
+
+
+ public void rollback(Xid xid) throws XAException {
+ acquireMutex();
+ try {
+ xid = convertXid(xid);
+ logger.info("Performing rollback: " + parseXid(xid));
+ MulgaraExternalTransaction xa = xa2xid.get1(xid);
+ if (xa == null) {
+ throw new XAException(XAException.XAER_NOTA);
+ }
+
+ xa.rollback(xid);
+ } finally {
+ releaseMutex();
+ }
+ }
+
+
+ public boolean setTransactionTimeout(int seconds) {
+ acquireMutex();
+ try {
+ logger.info("Performing setTransactionTimeout");
+ return false;
+ } finally {
+ releaseMutex();
+ }
+ }
+
+
+ public void start(Xid xid, int flags) throws XAException {
+ acquireMutex();
+ try {
+ xid = convertXid(xid);
+ logger.info("Performing start: " + parseXid(xid));
+ switch (flags) {
+ case TMNOFLAGS:
+ if (xa2xid.containsN(xid)) {
+ throw new XAException(XAException.XAER_DUPID);
+ } else if (factory.hasAssociatedTransaction(session)) {
+ throw new XAException(XAException.XA_RBDEADLOCK);
+ } else {
+ // FIXME: Need to consider read-only transactions here.
+ try {
+ MulgaraExternalTransaction xa = factory.createTransaction(session, xid, writing);
+ xa2xid.put(xa, xid);
+ } catch (MulgaraTransactionException em) {
+ logger.error("Failed to create transaction", em);
+ throw new XAException(XAException.XAER_RMFAIL);
+ }
+ }
+ break;
+ case TMJOIN:
+ if (!factory.hasAssociatedTransaction(session)) {
+ throw new XAException(XAException.XAER_NOTA);
+ } else {
+ // FIXME
+ throw new XAException("Need to finalise understanding of semantics of TMJOIN");
+ }
+ //break;
+ case TMRESUME:
+ MulgaraExternalTransaction xa = xa2xid.get1(xid);
+ if (xa == null) {
+ throw new XAException(XAException.XAER_NOTA);
+ } else if (xa.isRollbacked()) {
+ throw new XAException(XAException.XA_RBROLLBACK);
+ } else {
+ if (!factory.associateTransaction(session, xa)) {
+ // session already associated with a transaction.
+ throw new XAException(XAException.XAER_PROTO);
+ }
+ }
+ break;
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ /**
+ * Required only because Java has trouble with accessing fields from
+ * inner-classes.
+ */
+ private DatabaseSession getSession() { return session; }
+ }
+
+ /**
+ * Used to replace the built in monitor to allow it to be properly released
+ * during potentially blocking operations. All potentially blocking
+ * operations involve writes, so in these cases the write-lock is reserved
+ * allowing the mutex to be safely released and then reobtained after the
+ * blocking operation concludes.
+ */
+ protected void acquireMutex() {
+ mutex.lock();
+ }
+
+
+ protected void releaseMutex() {
+ if (!mutex.isHeldByCurrentThread()) {
+ throw new IllegalStateException("Attempt to release mutex without holding mutex");
+ }
+
+ mutex.unlock();
+ }
+
+ public static String parseXid(Xid xid) {
+ return xid.toString();
+ }
+
+ private static InternalXid convertXid(Xid xid) {
+ return new InternalXid(xid);
+ }
+
+ /**
+ * Provides an Xid that compares equal by value.
+ */
+ private static class InternalXid implements Xid {
+ private byte[] bq;
+ private int fi;
+ private byte[] gtid;
+
+ public InternalXid(Xid xid) {
+ byte[] tbq = xid.getBranchQualifier();
+ byte[] tgtid = xid.getGlobalTransactionId();
+ this.bq = new byte[tbq.length];
+ this.fi = xid.getFormatId();
+ this.gtid = new byte[tgtid.length];
+ System.arraycopy(tbq, 0, this.bq, 0, tbq.length);
+ System.arraycopy(tgtid, 0, this.gtid, 0, tgtid.length);
+ }
+
+ public byte[] getBranchQualifier() {
+ return bq;
+ }
+
+ public int getFormatId() {
+ return fi;
+ }
+
+ public byte[] getGlobalTransactionId() {
+ return gtid;
+ }
+
+ public int hashCode() {
+ return Arrays.hashCode(bq) ^ fi ^ Arrays.hashCode(gtid);
+ }
+
+ public boolean equals(Object rhs) {
+ if (!(rhs instanceof InternalXid)) {
+ return false;
+ } else {
+ InternalXid rhx = (InternalXid)rhs;
+ return this.fi == rhx.fi &&
+ Arrays.equals(this.bq, rhx.bq) &&
+ Arrays.equals(this.gtid, rhx.gtid);
+ }
+ }
+
+ public String toString() {
+ return ":" + fi + ":" + Arrays.toString(gtid) + ":" + Arrays.toString(bq) + ":";
+ }
+ }
+}
Modified: branches/mgr-73/src/jar/server-beep/java/org/mulgara/server/beep/BEEPSession.java
===================================================================
--- branches/mgr-73/src/jar/server-beep/java/org/mulgara/server/beep/BEEPSession.java 2007-11-29 06:04:18 UTC (rev 587)
+++ branches/mgr-73/src/jar/server-beep/java/org/mulgara/server/beep/BEEPSession.java 2007-11-30 05:31:16 UTC (rev 588)
@@ -542,4 +542,8 @@
public XAResource getXAResource() throws QueryException {
throw new QueryException("External transactions not implemented under Beep");
}
+
+ public XAResource getReadOnlyXAResource() throws QueryException {
+ throw new QueryException("External transactions not implemented under Beep");
+ }
}
Modified: branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteSession.java
===================================================================
--- branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteSession.java 2007-11-29 06:04:18 UTC (rev 587)
+++ branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteSession.java 2007-11-30 05:31:16 UTC (rev 588)
@@ -312,4 +312,5 @@
* Obtain an XAResource for this session.
*/
public RemoteXAResource getXAResource() throws QueryException, RemoteException;
+ public RemoteXAResource getReadOnlyXAResource() throws QueryException, RemoteException;
}
Modified: branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteSessionWrapperSession.java
===================================================================
--- branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteSessionWrapperSession.java 2007-11-29 06:04:18 UTC (rev 587)
+++ branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteSessionWrapperSession.java 2007-11-30 05:31:16 UTC (rev 588)
@@ -677,4 +677,12 @@
throw new QueryException("Java RMI failure", re);
}
}
+
+ public XAResource getReadOnlyXAResource() throws QueryException {
+ try {
+ return new RemoteXAResourceWrapperXAResource(remoteSession.getReadOnlyXAResource());
+ } catch (RemoteException re){
+ throw new QueryException("Java RMI failure", re);
+ }
+ }
}
Modified: branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/SessionWrapperRemoteSession.java
===================================================================
--- branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/SessionWrapperRemoteSession.java 2007-11-29 06:04:18 UTC (rev 587)
+++ branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/SessionWrapperRemoteSession.java 2007-11-30 05:31:16 UTC (rev 588)
@@ -524,7 +524,15 @@
}
}
+ public RemoteXAResource getReadOnlyXAResource() throws QueryException, RemoteException {
+ try {
+ return new XAResourceWrapperRemoteXAResource(session.getReadOnlyXAResource());
+ } catch (Throwable t) {
+ throw convertToQueryException(t);
+ }
+ }
+
// Construct an exception chain that will pass over RMI.
protected Throwable mapThrowable(Throwable t) {
Throwable cause = t.getCause();
More information about the Mulgara-svn
mailing list