[Mulgara-svn] r587 - in branches/mgr-73: . src/jar/resolver/java/org/mulgara/resolver src/jar/server-rmi/java/org/mulgara/server/rmi

andrae at mulgara.org andrae at mulgara.org
Thu Nov 29 06:04:19 UTC 2007


Author: andrae
Date: 2007-11-29 00:04:18 -0600 (Thu, 29 Nov 2007)
New Revision: 587

Added:
   branches/mgr-73/ExternalTransactionUnitTest.java
Modified:
   branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java
   branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteXAResourceWrapperXAResource.java
Log:
Bizarrely javax.transaction.xa.Xid doesn't implement Serializable, so we have to
convert all Xids received from the client into a trivial Xid class that does -
this allows the RMI layer to marshal them appropriately.

Java doesn't provide any way of requiring that Xid implement equals/hashCode to
operate by value, not reference - and bizarrely JTA doesn't require this either.
Hence MulgaraXAResource also has to wrap the received Xid in a trivial class
that does - this allows us to use them in Sets and Maps to track transactions.

Finally I have added a manual testcase to demonstrate the XAResource working
correctly over RMI.  Currently checked into the toplevel directory - once I get
the test infrastructure correct I'll move it somewhere else.  The issue is the
need to 1) run Mulgara in a separate JVM to ensure the RMI code works; 2)
perform arbitrary Java operations to simulate the presence of a TM (precluding
use of a jxtest); 3) integrate properly with the existing test reporting.



Added: branches/mgr-73/ExternalTransactionUnitTest.java
===================================================================
--- branches/mgr-73/ExternalTransactionUnitTest.java	2007-11-28 23:51:37 UTC (rev 586)
+++ branches/mgr-73/ExternalTransactionUnitTest.java	2007-11-29 06:04:18 UTC (rev 587)
@@ -0,0 +1,288 @@
+// Java 2 standard packages
+import java.io.File;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+// Third party packages
+
+// Locally written packages
+import org.mulgara.query.*;
+import org.mulgara.query.rdf.Mulgara;
+import org.mulgara.query.rdf.URIReferenceImpl;
+import org.mulgara.query.rdf.TripleImpl;
+import org.mulgara.server.Session;
+import org.mulgara.server.SessionFactory;
+import org.mulgara.server.driver.SessionFactoryFinder;
+import org.mulgara.util.FileUtil;
+
+public class ExternalTransactionUnitTest
+{
+  private static final URI databaseURI;
+
+  private static final URI systemModelURI;
+
+  private static final URI modelURI;
+  private static final URI model2URI;
+  private static final URI model3URI;
+  private static final URI model4URI;
+  private static final URI model5URI;
+
+  static {
+    try {
+      databaseURI    = new URI("rmi://localhost/server1");
+      systemModelURI = new URI("rmi://localhost/server1#");
+      modelURI       = new URI("rmi://localhost/server1#model");
+      model2URI      = new URI("rmi://localhost/server1#model2");
+      model3URI      = new URI("rmi://localhost/server1#model3");
+      model4URI      = new URI("rmi://localhost/server1#model4");
+      model5URI      = new URI("rmi://localhost/server1#model5");
+    } catch (URISyntaxException e) {
+      throw new Error("Bad hardcoded URI", e);
+    }
+  }
+
+  private SessionFactory sessionFactory;
+
+  public ExternalTransactionUnitTest() { }
+
+  public static void main(String[] args) throws Exception {
+    new ExternalTransactionUnitTest().run();
+  }
+
+  public void run() throws Exception{
+    System.out.println("Doing setup");
+    setup();
+    System.out.println("Doing OnePhase commit");
+    testSimpleOnePhaseCommit();
+    System.out.println("Doing TwoPhase commit");
+    testSimpleTwoPhaseCommit();
+    System.out.println("Doing Query");
+    testBasicQuery();
+    System.out.println("Finished Tests");
+  }
+
+  public void setup() throws Exception {
+    this.sessionFactory = SessionFactoryFinder.newSessionFactory(databaseURI);
+  }
+
+  //
+  // Test cases
+  //
+
+  private static class TestXid implements Xid {
+    private int xid;
+    public TestXid(int xid) {
+      this.xid = xid;
+    }
+    
+    public int getFormatId() {
+      return 'X';
+    }
+
+    public byte[] getBranchQualifier() {
+      return new byte[] {
+        (byte)(xid >> 0x00),
+        (byte)(xid >> 0x08)
+      };
+    }
+
+    public byte[] getGlobalTransactionId() {
+      return new byte[] {
+        (byte)(xid >> 0x10),
+        (byte)(xid >> 0x18)
+      };
+    }
+  }
+
+  /**
+   * Test the {@link DatabaseSession#create} method.
+   * As a side-effect, creates the model required by the next tests.
+   */
+  public void testSimpleOnePhaseCommit() throws URISyntaxException
+  {
+    try {
+      Session session = sessionFactory.newSession();
+      XAResource resource = session.getXAResource();
+      Xid xid = new TestXid(1);
+      resource.start(xid, XAResource.TMNOFLAGS);
+      try {
+        session.createModel(modelURI, null);
+        resource.end(xid, XAResource.TMSUCCESS);
+        resource.commit(xid, true);
+      } finally {
+        session.close();
+      }
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
+
+
+  /**
+   * Test two phase commit.
+   * As a side-effect, loads the model required by the next tests.
+   */
+  public void testSimpleTwoPhaseCommit() throws URISyntaxException
+  {
+    URI fileURI  = new File("data/xatest-model1.rdf").toURI();
+
+    try {
+      Session session = sessionFactory.newSession();
+      XAResource resource = session.getXAResource();
+      Xid xid = new TestXid(1);
+      resource.start(xid, XAResource.TMNOFLAGS);
+      try {
+        session.setModel(modelURI, new ModelResource(fileURI));
+        resource.end(xid, XAResource.TMSUCCESS);
+        resource.prepare(xid);
+        resource.commit(xid, false);
+      } finally {
+        session.close();
+      }
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
+
+
+  public void testBasicQuery() throws URISyntaxException {
+    try {
+      // Load some test data
+      Session session = sessionFactory.newSession();
+      try {
+        XAResource resource = session.getXAResource();
+        Xid xid = new TestXid(1);
+        resource.start(xid, XAResource.TMNOFLAGS);
+
+        Variable subjectVariable   = new Variable("subject");
+        Variable predicateVariable = new Variable("predicate");
+        Variable objectVariable    = new Variable("object");
+
+        List selectList = new ArrayList(3);
+        selectList.add(subjectVariable);
+        selectList.add(predicateVariable);
+        selectList.add(objectVariable);
+
+        // Evaluate the query
+        Answer answer = session.query(new Query(
+          selectList,                                       // SELECT
+          new ModelResource(modelURI),                      // FROM
+          new ConstraintImpl(subjectVariable,               // WHERE
+                         predicateVariable,
+                         objectVariable),
+          null,                                             // HAVING
+          Arrays.asList(new Order[] {                       // ORDER BY
+            new Order(subjectVariable, true),
+            new Order(predicateVariable, true),
+            new Order(objectVariable, true)
+          }),
+          null,                                             // LIMIT
+          0,                                                // OFFSET
+          new UnconstrainedAnswer()                         // GIVEN
+        ));
+        String[][] results = {
+          { "test:s01", "test:p01", "test:o01" },
+          { "test:s01", "test:p02", "test:o01" },
+          { "test:s01", "test:p02", "test:o02" },
+          { "test:s01", "test:p03", "test:o02" },
+          { "test:s02", "test:p03", "test:o02" },
+          { "test:s02", "test:p04", "test:o02" },
+          { "test:s02", "test:p04", "test:o03" },
+          { "test:s02", "test:p05", "test:o03" },
+          { "test:s03", "test:p01", "test:o01" },
+          { "test:s03", "test:p05", "test:o03" },
+          { "test:s03", "test:p06", "test:o01" },
+          { "test:s03", "test:p06", "test:o03" },
+        };
+        compareResults(results, answer);
+        answer.close();
+
+        resource.end(xid, XAResource.TMSUCCESS);
+        resource.commit(xid, true);
+      } finally {
+        session.close();
+      }
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
+
+  //
+  // Internal methods
+  //
+
+  private void compareResults(String[][] expected, Answer answer) throws Exception {
+    try {
+      answer.beforeFirst();
+      for (int i = 0; i < expected.length; i++) {
+        assertTrue("Answer short at row " + i, answer.next());
+        assertEquals(expected[i].length, answer.getNumberOfVariables());
+        for (int j = 0; j < expected[i].length; j++) {
+          URIReferenceImpl uri = new URIReferenceImpl(new URI(expected[i][j]));
+          assertEquals(uri, answer.getObject(j));
+        }
+      }
+      assertFalse(answer.next());
+    } catch (Exception e) {
+      System.out.println("Failed test - " + answer);
+      answer.close();
+      throw e;
+    }
+  }
+
+  private void compareResults(Answer answer1, Answer answer2) throws Exception {
+    answer1.beforeFirst();
+    answer2.beforeFirst();
+    assertEquals(answer1.getNumberOfVariables(), answer2.getNumberOfVariables());
+    while (answer1.next()) {
+      assertTrue(answer2.next());
+      for (int i = 0; i < answer1.getNumberOfVariables(); i++) {
+        assertEquals(answer1.getObject(i), answer2.getObject(i));
+      }
+    }
+    assertFalse(answer2.next());
+  }
+
+  private static class AssertionFailedException extends Exception {
+    public AssertionFailedException(String msg) {
+      super(msg);
+    }
+  }
+
+  private void assertTrue(boolean result) throws Exception { assertTrue("", result); }
+  private void assertTrue(String string, boolean result) throws Exception {
+    if (!result) {
+      throw new AssertionFailedException("AssertTrue Failed: " + string);
+    }
+  }
+
+  private void assertFalse(boolean result) throws Exception { assertFalse("", result); }
+  private void assertFalse(String string, boolean result) throws Exception {
+    if (result) {
+      throw new AssertionFailedException("AssertFalse Failed: " + string);
+    }
+  }
+
+  private void assertEquals(Object lhs, Object rhs) throws Exception {
+    if (!lhs.equals(rhs)) {
+      throw new AssertionFailedException("AssertEquals Failed: " + lhs + " , " + rhs);
+    }
+  }
+
+  /**
+   * Fail with an unexpected exception
+   */
+  private void fail(Throwable throwable)
+  {
+    StringWriter stringWriter = new StringWriter();
+    throwable.printStackTrace(new PrintWriter(stringWriter));
+    System.out.println(stringWriter.toString());
+  }
+}

Modified: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java	2007-11-28 23:51:37 UTC (rev 586)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java	2007-11-29 06:04:18 UTC (rev 587)
@@ -19,6 +19,7 @@
 package org.mulgara.resolver;
 
 // Java2 packages
+import java.util.Arrays;
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
@@ -55,6 +56,7 @@
   private Assoc1toNMap<MulgaraExternalTransaction, Xid> xa2xid;
 
   MulgaraXAResource(MulgaraExternalTransactionFactory factory, DatabaseSession session) {
+    logger.info("Creating MulgaraXAResource");
     this.factory = factory;
     this.session = session;
     this.xa2xid = new Assoc1toNMap<MulgaraExternalTransaction, Xid>();
@@ -70,6 +72,8 @@
    * for a call to forget().
    */
   public void commit(Xid xid, boolean onePhase) throws XAException {
+    xid = convertXid(xid);
+    logger.info("Performing commit: " + parseXid(xid));
     MulgaraExternalTransaction xa = xa2xid.get1(xid);
     if (xa == null) {
       throw new XAException(XAException.XAER_NOTA);
@@ -119,6 +123,8 @@
    * In all cases disassociate from current session.
    */
   public void end(Xid xid, int flags) throws XAException {
+    xid = convertXid(xid);
+    logger.info("Performing end: " + parseXid(xid));
     MulgaraExternalTransaction xa = xa2xid.get1(xid);
     if (xa == null) {
       throw new XAException(XAException.XAER_NOTA);
@@ -145,6 +151,8 @@
   }
 
   public void forget(Xid xid) throws XAException {
+    xid = convertXid(xid);
+    logger.info("Performing forget: " + parseXid(xid));
     MulgaraExternalTransaction xa = xa2xid.get1(xid);
     if (xa == null) {
       throw new XAException(XAException.XAER_NOTA);
@@ -164,10 +172,12 @@
   }
 
   public int getTransactionTimeout() {
+    logger.info("Performing getTransactionTimeout");
     return 3600;
   }
 
   public boolean isSameRM(XAResource xares) {
+    logger.info("Performing isSameRM");
     if (!xares.getClass().equals(MulgaraXAResource.class)) {
       return false;
     } else {
@@ -180,6 +190,8 @@
 
 
   public int prepare(Xid xid) throws XAException {
+    xid = convertXid(xid);
+    logger.info("Performing prepare: " + parseXid(xid));
     MulgaraExternalTransaction xa = xa2xid.get1(xid);
     if (xa == null) {
       throw new XAException(XAException.XAER_NOTA);
@@ -198,11 +210,14 @@
    * when we haven't crashed.
    */
   public Xid[] recover(int flag) {
+    logger.info("Performing recover");
     return new Xid[] {};
   }
 
 
   public void rollback(Xid xid) throws XAException {
+    xid = convertXid(xid);
+    logger.info("Performing rollback: " + parseXid(xid));
     MulgaraExternalTransaction xa = xa2xid.get1(xid);
     if (xa == null) {
       throw new XAException(XAException.XAER_NOTA);
@@ -213,10 +228,13 @@
 
 
   public boolean setTransactionTimeout(int seconds) {
+    logger.info("Performing setTransactionTimeout");
     return false;
   }
 
   public void start(Xid xid, int flags) throws XAException {
+    xid = convertXid(xid);
+    logger.info("Performing start: " + parseXid(xid));
     switch (flags) {
       case TMNOFLAGS:
         if (xa2xid.containsN(xid)) {
@@ -262,6 +280,63 @@
    * Called on session close to clean-up any remaining transactions.
    */
   public void close() {
+    logger.info("Performing close");
     factory.closingSession(session);
   }
+
+  public static String parseXid(Xid xid) {
+    return xid.toString();
+  }
+
+  private InternalXid convertXid(Xid xid) {
+    return new InternalXid(xid);
+  }
+
+  private static class InternalXid implements Xid {
+    private byte[] bq;
+    private int fi;
+    private byte[] gtid;
+
+    public InternalXid(Xid xid) {
+      byte[] tbq = xid.getBranchQualifier();
+      byte[] tgtid = xid.getGlobalTransactionId();
+      this.bq = new byte[tbq.length];
+      this.fi = xid.getFormatId();
+      this.gtid = new byte[tgtid.length];
+      System.arraycopy(tbq, 0, this.bq, 0, tbq.length);
+      System.arraycopy(tgtid, 0, this.gtid, 0, tgtid.length);
+    }
+
+    public byte[] getBranchQualifier() {
+      return bq;
+    }
+
+    public int getFormatId() {
+      return fi;
+    }
+
+    public byte[] getGlobalTransactionId() {
+      return gtid;
+    }
+
+    public int hashCode() {
+      return Arrays.hashCode(bq) ^ fi ^ Arrays.hashCode(gtid);
+    }
+
+    public boolean equals(Object rhs) {
+      if (!(rhs instanceof InternalXid)) {
+        return false;
+      } else {
+        InternalXid rhx = (InternalXid)rhs;
+        return this.fi == rhx.fi &&
+            Arrays.equals(this.bq, rhx.bq) &&
+            Arrays.equals(this.gtid, rhx.gtid);
+      }
+    }
+
+    public String toString() {
+      return ":" + fi + ":" + Arrays.toString(gtid) + ":" + Arrays.toString(bq) + ":";
+    }
+  }
+
 }

Modified: branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteXAResourceWrapperXAResource.java
===================================================================
--- branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteXAResourceWrapperXAResource.java	2007-11-28 23:51:37 UTC (rev 586)
+++ branches/mgr-73/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteXAResourceWrapperXAResource.java	2007-11-29 06:04:18 UTC (rev 587)
@@ -1,8 +1,10 @@
 package org.mulgara.server.rmi;
 
 // Java 2 standard packages
+import java.io.Serializable;
 import java.rmi.RemoteException;
 import java.rmi.server.UnicastRemoteObject;
+import java.util.Arrays;
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
@@ -47,7 +49,7 @@
 
   public void commit(Xid xid, boolean onePhase) throws XAException {
     try {
-      remoteResource.commit(xid, onePhase);
+      remoteResource.commit(convertXid(xid), onePhase);
     } catch (RemoteException re) {
       logger.warn("RMI Error in XAResource", re);
       throw new XAException(XAException.XAER_RMFAIL);
@@ -56,7 +58,7 @@
 
   public void end(Xid xid, int flags) throws XAException {
     try {
-      remoteResource.end(xid, flags);
+      remoteResource.end(convertXid(xid), flags);
     } catch (RemoteException re) {
       logger.warn("RMI Error in XAResource", re);
       throw new XAException(XAException.XAER_RMFAIL);
@@ -65,7 +67,7 @@
 
   public void forget(Xid xid) throws XAException {
     try {
-      remoteResource.forget(xid);
+      remoteResource.forget(convertXid(xid));
     } catch (RemoteException re) {
       logger.warn("RMI Error in XAResource", re);
       throw new XAException(XAException.XAER_RMFAIL);
@@ -92,7 +94,7 @@
 
   public int prepare(Xid xid) throws XAException {
     try {
-      return remoteResource.prepare(xid);
+      return remoteResource.prepare(convertXid(xid));
     } catch (RemoteException re) {
       logger.warn("RMI Error in XAResource", re);
       throw new XAException(XAException.XAER_RMFAIL);
@@ -110,7 +112,7 @@
 
   public void rollback(Xid xid) throws XAException {
     try {
-      remoteResource.rollback(xid);
+      remoteResource.rollback(convertXid(xid));
     } catch (RemoteException re) {
       logger.warn("RMI Error in XAResource", re);
       throw new XAException(XAException.XAER_RMFAIL);
@@ -128,10 +130,42 @@
 
   public void start(Xid xid, int flags) throws XAException {
     try {
-      remoteResource.start(xid, flags);
+      remoteResource.start(convertXid(xid), flags);
     } catch (RemoteException re) {
       logger.warn("RMI Error in XAResource", re);
       throw new XAException(XAException.XAER_RMFAIL);
     }
   }
+
+  private SerializableXid convertXid(Xid xid) {
+    return new SerializableXid(xid);
+  }
+
+  private static class SerializableXid implements Xid, Serializable {
+    private byte[] bq;
+    private int fi;
+    private byte[] gtid;
+
+    public SerializableXid(Xid xid) {
+      byte[] tbq = xid.getBranchQualifier();
+      byte[] tgtid = xid.getGlobalTransactionId();
+      this.bq = new byte[tbq.length];
+      this.fi = xid.getFormatId();
+      this.gtid = new byte[tgtid.length];
+      System.arraycopy(tbq, 0, this.bq, 0, tbq.length);
+      System.arraycopy(tgtid, 0, this.gtid, 0, tgtid.length);
+    }
+
+    public byte[] getBranchQualifier() {
+      return bq;
+    }
+
+    public int getFormatId() {
+      return fi;
+    }
+
+    public byte[] getGlobalTransactionId() {
+      return gtid;
+    }
+  }
 }




More information about the Mulgara-svn mailing list