[Mulgara-svn] r587 - in branches/mgr-73: . src/jar/resolver/java/org/mulgara/resolver src/jar/server-rmi/java/org/mulgara/server/rmi
andrae at mulgara.org
andrae at mulgara.org
Thu Nov 29 06:04:19 UTC 2007
Author: andrae
Date: 2007-11-29 00:04:18 -0600 (Thu, 29 Nov 2007)
New Revision: 587
Added:
branches/mgr-73/ExternalTransactionUnitTest.java
Modified:
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java
branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteXAResourceWrapperXAResource.java
Log:
Bizzarely javax.transaction.xa.Xid doesn't implement Serializable, so we have to
convert all Xid's received from the client into a trivial Xid class that does -
this allows the RMI layer to Marshall them appropriately.
Java doesn't provide any way of requiring that Xid implement equals/hashCode to
operate by value, not reference - and bizzarely JTA doesn't require this either.
Hence MuglaraXAResource also has to wrap the received Xid in a trivial class
that does - this allows us to use them in Sets and Maps to track transactions.
Finally I have added a manual testcase to demonstrate the XAResource working
correctly over RMI. Currently checked into the toplevel directory - once I get
the test infrastructure correct I'll move it somewhere else. The issue is the
need to 1) run mulgara in a seperate JVM to ensure the RMI code works; 2)
perform arbitary java operations to simulate the presence of a TM (precluding
use of a jxtest). 3) integrate properly with the existing test reporting.
Added: branches/mgr-73/ExternalTransactionUnitTest.java
===================================================================
--- branches/mgr-73/ExternalTransactionUnitTest.java 2007-11-28 23:51:37 UTC (rev 586)
+++ branches/mgr-73/ExternalTransactionUnitTest.java 2007-11-29 06:04:18 UTC (rev 587)
@@ -0,0 +1,288 @@
+// Java 2 standard packages
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+// Third party packages
+
+// Locally written packages
+import org.mulgara.query.*;
+import org.mulgara.query.rdf.Mulgara;
+import org.mulgara.query.rdf.URIReferenceImpl;
+import org.mulgara.query.rdf.TripleImpl;
+import org.mulgara.server.Session;
+import org.mulgara.server.SessionFactory;
+import org.mulgara.server.driver.SessionFactoryFinder;
+import org.mulgara.util.FileUtil;
+
+public class ExternalTransactionUnitTest
+{
+ private static final URI databaseURI;
+
+ private static final URI systemModelURI;
+
+ private static final URI modelURI;
+ private static final URI model2URI;
+ private static final URI model3URI;
+ private static final URI model4URI;
+ private static final URI model5URI;
+
+ static {
+ try {
+ databaseURI = new URI("rmi://localhost/server1");
+ systemModelURI = new URI("rmi://localhost/server1#");
+ modelURI = new URI("rmi://localhost/server1#model");
+ model2URI = new URI("rmi://localhost/server1#model2");
+ model3URI = new URI("rmi://localhost/server1#model3");
+ model4URI = new URI("rmi://localhost/server1#model4");
+ model5URI = new URI("rmi://localhost/server1#model5");
+ } catch (URISyntaxException e) {
+ throw new Error("Bad hardcoded URI", e);
+ }
+ }
+
+ private SessionFactory sessionFactory;
+
+ public ExternalTransactionUnitTest() { }
+
+ public static void main(String[] args) throws Exception {
+ new ExternalTransactionUnitTest().run();
+ }
+
+ public void run() throws Exception{
+ System.out.println("Doing setup");
+ setup();
+ System.out.println("Doing OnePhase commit");
+ testSimpleOnePhaseCommit();
+ System.out.println("Doing TwoPhase commit");
+ testSimpleTwoPhaseCommit();
+ System.out.println("Doing Query");
+ testBasicQuery();
+ System.out.println("Finished Tests");
+ }
+
+ public void setup() throws Exception {
+ this.sessionFactory = SessionFactoryFinder.newSessionFactory(databaseURI);
+ }
+
+ //
+ // Test cases
+ //
+
+ private static class TestXid implements Xid {
+ private int xid;
+ public TestXid(int xid) {
+ this.xid = xid;
+ }
+
+ public int getFormatId() {
+ return 'X';
+ }
+
+ public byte[] getBranchQualifier() {
+ return new byte[] {
+ (byte)(xid >> 0x00),
+ (byte)(xid >> 0x08)
+ };
+ }
+
+ public byte[] getGlobalTransactionId() {
+ return new byte[] {
+ (byte)(xid >> 0x10),
+ (byte)(xid >> 0x18)
+ };
+ }
+ }
+
+ /**
+ * Test the {@link DatabaseSession#create} method.
+ * As a side-effect, creates the model required by the next tests.
+ */
+ public void testSimpleOnePhaseCommit() throws URISyntaxException
+ {
+ try {
+ Session session = sessionFactory.newSession();
+ XAResource resource = session.getXAResource();
+ Xid xid = new TestXid(1);
+ resource.start(xid, XAResource.TMNOFLAGS);
+ try {
+ session.createModel(modelURI, null);
+ resource.end(xid, XAResource.TMSUCCESS);
+ resource.commit(xid, true);
+ } finally {
+ session.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+
+ /**
+ * Test two phase commit.
+ * As a side-effect, loads the model required by the next tests.
+ */
+ public void testSimpleTwoPhaseCommit() throws URISyntaxException
+ {
+ URI fileURI = new File("data/xatest-model1.rdf").toURI();
+
+ try {
+ Session session = sessionFactory.newSession();
+ XAResource resource = session.getXAResource();
+ Xid xid = new TestXid(1);
+ resource.start(xid, XAResource.TMNOFLAGS);
+ try {
+ session.setModel(modelURI, new ModelResource(fileURI));
+ resource.end(xid, XAResource.TMSUCCESS);
+ resource.prepare(xid);
+ resource.commit(xid, false);
+ } finally {
+ session.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+
+ public void testBasicQuery() throws URISyntaxException {
+ try {
+ // Load some test data
+ Session session = sessionFactory.newSession();
+ try {
+ XAResource resource = session.getXAResource();
+ Xid xid = new TestXid(1);
+ resource.start(xid, XAResource.TMNOFLAGS);
+
+ Variable subjectVariable = new Variable("subject");
+ Variable predicateVariable = new Variable("predicate");
+ Variable objectVariable = new Variable("object");
+
+ List selectList = new ArrayList(3);
+ selectList.add(subjectVariable);
+ selectList.add(predicateVariable);
+ selectList.add(objectVariable);
+
+ // Evaluate the query
+ Answer answer = session.query(new Query(
+ selectList, // SELECT
+ new ModelResource(modelURI), // FROM
+ new ConstraintImpl(subjectVariable, // WHERE
+ predicateVariable,
+ objectVariable),
+ null, // HAVING
+ Arrays.asList(new Order[] { // ORDER BY
+ new Order(subjectVariable, true),
+ new Order(predicateVariable, true),
+ new Order(objectVariable, true)
+ }),
+ null, // LIMIT
+ 0, // OFFSET
+ new UnconstrainedAnswer() // GIVEN
+ ));
+ String[][] results = {
+ { "test:s01", "test:p01", "test:o01" },
+ { "test:s01", "test:p02", "test:o01" },
+ { "test:s01", "test:p02", "test:o02" },
+ { "test:s01", "test:p03", "test:o02" },
+ { "test:s02", "test:p03", "test:o02" },
+ { "test:s02", "test:p04", "test:o02" },
+ { "test:s02", "test:p04", "test:o03" },
+ { "test:s02", "test:p05", "test:o03" },
+ { "test:s03", "test:p01", "test:o01" },
+ { "test:s03", "test:p05", "test:o03" },
+ { "test:s03", "test:p06", "test:o01" },
+ { "test:s03", "test:p06", "test:o03" },
+ };
+ compareResults(results, answer);
+ answer.close();
+
+ resource.end(xid, XAResource.TMSUCCESS);
+ resource.commit(xid, true);
+ } finally {
+ session.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ //
+ // Internal methods
+ //
+
+ private void compareResults(String[][] expected, Answer answer) throws Exception {
+ try {
+ answer.beforeFirst();
+ for (int i = 0; i < expected.length; i++) {
+ assertTrue("Answer short at row " + i, answer.next());
+ assertEquals(expected[i].length, answer.getNumberOfVariables());
+ for (int j = 0; j < expected[i].length; j++) {
+ URIReferenceImpl uri = new URIReferenceImpl(new URI(expected[i][j]));
+ assertEquals(uri, answer.getObject(j));
+ }
+ }
+ assertFalse(answer.next());
+ } catch (Exception e) {
+ System.out.println("Failed test - " + answer);
+ answer.close();
+ throw e;
+ }
+ }
+
+ private void compareResults(Answer answer1, Answer answer2) throws Exception {
+ answer1.beforeFirst();
+ answer2.beforeFirst();
+ assertEquals(answer1.getNumberOfVariables(), answer2.getNumberOfVariables());
+ while (answer1.next()) {
+ assertTrue(answer2.next());
+ for (int i = 0; i < answer1.getNumberOfVariables(); i++) {
+ assertEquals(answer1.getObject(i), answer2.getObject(i));
+ }
+ }
+ assertFalse(answer2.next());
+ }
+
+ private static class AssertionFailedException extends Exception {
+ public AssertionFailedException(String msg) {
+ super(msg);
+ }
+ }
+
+ private void assertTrue(boolean result) throws Exception { assertTrue("", result); }
+ private void assertTrue(String string, boolean result) throws Exception {
+ if (!result) {
+ throw new AssertionFailedException("AssertTrue Failed: " + string);
+ }
+ }
+
+ private void assertFalse(boolean result) throws Exception { assertFalse("", result); }
+ private void assertFalse(String string, boolean result) throws Exception {
+ if (result) {
+ throw new AssertionFailedException("AssertFalse Failed: " + string);
+ }
+ }
+
+ private void assertEquals(Object lhs, Object rhs) throws Exception {
+ if (!lhs.equals(rhs)) {
+ throw new AssertionFailedException("AssertEquals Failed: " + lhs + " , " + rhs);
+ }
+ }
+
+ /**
+ * Fail with an unexpected exception
+ */
+ private void fail(Throwable throwable)
+ {
+ StringWriter stringWriter = new StringWriter();
+ throwable.printStackTrace(new PrintWriter(stringWriter));
+ System.out.println(stringWriter.toString());
+ }
+}
Modified: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java 2007-11-28 23:51:37 UTC (rev 586)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java 2007-11-29 06:04:18 UTC (rev 587)
@@ -19,6 +19,7 @@
package org.mulgara.resolver;
// Java2 packages
+import java.util.Arrays;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -55,6 +56,7 @@
private Assoc1toNMap<MulgaraExternalTransaction, Xid> xa2xid;
MulgaraXAResource(MulgaraExternalTransactionFactory factory, DatabaseSession session) {
+ logger.info("Creating MulgaraXAResource");
this.factory = factory;
this.session = session;
this.xa2xid = new Assoc1toNMap<MulgaraExternalTransaction, Xid>();
@@ -70,6 +72,8 @@
* for a call to forget().
*/
public void commit(Xid xid, boolean onePhase) throws XAException {
+ xid = convertXid(xid);
+ logger.info("Performing commit: " + parseXid(xid));
MulgaraExternalTransaction xa = xa2xid.get1(xid);
if (xa == null) {
throw new XAException(XAException.XAER_NOTA);
@@ -119,6 +123,8 @@
* In all cases disassociate from current session.
*/
public void end(Xid xid, int flags) throws XAException {
+ xid = convertXid(xid);
+ logger.info("Performing end: " + parseXid(xid));
MulgaraExternalTransaction xa = xa2xid.get1(xid);
if (xa == null) {
throw new XAException(XAException.XAER_NOTA);
@@ -145,6 +151,8 @@
}
public void forget(Xid xid) throws XAException {
+ xid = convertXid(xid);
+ logger.info("Performing forget: " + parseXid(xid));
MulgaraExternalTransaction xa = xa2xid.get1(xid);
if (xa == null) {
throw new XAException(XAException.XAER_NOTA);
@@ -164,10 +172,12 @@
}
public int getTransactionTimeout() {
+ logger.info("Performing getTransactionTimeout");
return 3600;
}
public boolean isSameRM(XAResource xares) {
+ logger.info("Performing isSameRM");
if (!xares.getClass().equals(MulgaraXAResource.class)) {
return false;
} else {
@@ -180,6 +190,8 @@
public int prepare(Xid xid) throws XAException {
+ xid = convertXid(xid);
+ logger.info("Performing prepare: " + parseXid(xid));
MulgaraExternalTransaction xa = xa2xid.get1(xid);
if (xa == null) {
throw new XAException(XAException.XAER_NOTA);
@@ -198,11 +210,14 @@
* when we haven't crashed.
*/
public Xid[] recover(int flag) {
+ logger.info("Performing recover");
return new Xid[] {};
}
public void rollback(Xid xid) throws XAException {
+ xid = convertXid(xid);
+ logger.info("Performing rollback: " + parseXid(xid));
MulgaraExternalTransaction xa = xa2xid.get1(xid);
if (xa == null) {
throw new XAException(XAException.XAER_NOTA);
@@ -213,10 +228,13 @@
public boolean setTransactionTimeout(int seconds) {
+ logger.info("Performing setTransactionTimeout");
return false;
}
public void start(Xid xid, int flags) throws XAException {
+ xid = convertXid(xid);
+ logger.info("Performing start: " + parseXid(xid));
switch (flags) {
case TMNOFLAGS:
if (xa2xid.containsN(xid)) {
@@ -262,6 +280,63 @@
* Called on session close to clean-up any remaining transactions.
*/
public void close() {
+ logger.info("Performing close");
factory.closingSession(session);
}
+
+ public static String parseXid(Xid xid) {
+ return xid.toString();
+ }
+
+ private InternalXid convertXid(Xid xid) {
+ return new InternalXid(xid);
+ }
+
+ private static class InternalXid implements Xid {
+ private byte[] bq;
+ private int fi;
+ private byte[] gtid;
+
+ public InternalXid(Xid xid) {
+ byte[] tbq = xid.getBranchQualifier();
+ byte[] tgtid = xid.getGlobalTransactionId();
+ this.bq = new byte[tbq.length];
+ this.fi = xid.getFormatId();
+ this.gtid = new byte[tgtid.length];
+ System.arraycopy(tbq, 0, this.bq, 0, tbq.length);
+ System.arraycopy(tgtid, 0, this.gtid, 0, tgtid.length);
+ }
+
+ public byte[] getBranchQualifier() {
+ return bq;
+ }
+
+ public int getFormatId() {
+ return fi;
+ }
+
+ public byte[] getGlobalTransactionId() {
+ return gtid;
+ }
+
+ public int hashCode() {
+ return Arrays.hashCode(bq) ^ fi ^ Arrays.hashCode(gtid);
+ }
+
+ public boolean equals(Object rhs) {
+ if (!(rhs instanceof InternalXid)) {
+ return false;
+ } else {
+ InternalXid rhx = (InternalXid)rhs;
+ return this.fi == rhx.fi &&
+ Arrays.equals(this.bq, rhx.bq) &&
+ Arrays.equals(this.gtid, rhx.gtid);
+ }
+ }
+
+ public String toString() {
+ return ":" + fi + ":" + Arrays.toString(gtid) + ":" + Arrays.toString(bq) + ":";
+ }
+ }
+
}
Modified: branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteXAResourceWrapperXAResource.java
===================================================================
--- branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteXAResourceWrapperXAResource.java 2007-11-28 23:51:37 UTC (rev 586)
+++ branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteXAResourceWrapperXAResource.java 2007-11-29 06:04:18 UTC (rev 587)
@@ -1,8 +1,10 @@
package org.mulgara.server.rmi;
// Java 2 standard packages
+import java.io.Serializable;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
+import java.util.Arrays;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -47,7 +49,7 @@
public void commit(Xid xid, boolean onePhase) throws XAException {
try {
- remoteResource.commit(xid, onePhase);
+ remoteResource.commit(convertXid(xid), onePhase);
} catch (RemoteException re) {
logger.warn("RMI Error in XAResource", re);
throw new XAException(XAException.XAER_RMFAIL);
@@ -56,7 +58,7 @@
public void end(Xid xid, int flags) throws XAException {
try {
- remoteResource.end(xid, flags);
+ remoteResource.end(convertXid(xid), flags);
} catch (RemoteException re) {
logger.warn("RMI Error in XAResource", re);
throw new XAException(XAException.XAER_RMFAIL);
@@ -65,7 +67,7 @@
public void forget(Xid xid) throws XAException {
try {
- remoteResource.forget(xid);
+ remoteResource.forget(convertXid(xid));
} catch (RemoteException re) {
logger.warn("RMI Error in XAResource", re);
throw new XAException(XAException.XAER_RMFAIL);
@@ -92,7 +94,7 @@
public int prepare(Xid xid) throws XAException {
try {
- return remoteResource.prepare(xid);
+ return remoteResource.prepare(convertXid(xid));
} catch (RemoteException re) {
logger.warn("RMI Error in XAResource", re);
throw new XAException(XAException.XAER_RMFAIL);
@@ -110,7 +112,7 @@
public void rollback(Xid xid) throws XAException {
try {
- remoteResource.rollback(xid);
+ remoteResource.rollback(convertXid(xid));
} catch (RemoteException re) {
logger.warn("RMI Error in XAResource", re);
throw new XAException(XAException.XAER_RMFAIL);
@@ -128,10 +130,42 @@
public void start(Xid xid, int flags) throws XAException {
try {
- remoteResource.start(xid, flags);
+ remoteResource.start(convertXid(xid), flags);
} catch (RemoteException re) {
logger.warn("RMI Error in XAResource", re);
throw new XAException(XAException.XAER_RMFAIL);
}
}
+
+ private SerializableXid convertXid(Xid xid) {
+ return new SerializableXid(xid);
+ }
+
+ private static class SerializableXid implements Xid, Serializable {
+ private byte[] bq;
+ private int fi;
+ private byte[] gtid;
+
+ public SerializableXid(Xid xid) {
+ byte[] tbq = xid.getBranchQualifier();
+ byte[] tgtid = xid.getGlobalTransactionId();
+ this.bq = new byte[tbq.length];
+ this.fi = xid.getFormatId();
+ this.gtid = new byte[tgtid.length];
+ System.arraycopy(tbq, 0, this.bq, 0, tbq.length);
+ System.arraycopy(tgtid, 0, this.gtid, 0, tgtid.length);
+ }
+
+ public byte[] getBranchQualifier() {
+ return bq;
+ }
+
+ public int getFormatId() {
+ return fi;
+ }
+
+ public byte[] getGlobalTransactionId() {
+ return gtid;
+ }
+ }
}
More information about the Mulgara-svn
mailing list