[Mulgara-svn] r594 - in branches/mgr-73: . src/jar/resolver/java/org/mulgara/resolver src/jar/util/java/org/mulgara/util
andrae at mulgara.org
andrae at mulgara.org
Tue Dec 4 04:29:13 UTC 2007
Author: andrae
Date: 2007-12-03 22:29:12 -0600 (Mon, 03 Dec 2007)
New Revision: 594
Modified:
branches/mgr-73/ExternalTransactionUnitTest.java
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResourceContext.java
branches/mgr-73/src/jar/util/java/org/mulgara/util/Assoc1toNMap.java
Log:
With these tests I am now confident that the standard path through the
XAResource to perform a commit works - including suspend/resume, write-locking,
and query isolation.
Next up: testing rollback semantics.
Modified: branches/mgr-73/ExternalTransactionUnitTest.java
===================================================================
--- branches/mgr-73/ExternalTransactionUnitTest.java 2007-12-03 22:12:33 UTC (rev 593)
+++ branches/mgr-73/ExternalTransactionUnitTest.java 2007-12-04 04:29:12 UTC (rev 594)
@@ -22,6 +22,11 @@
import org.mulgara.server.driver.SessionFactoryFinder;
import org.mulgara.util.FileUtil;
+import org.mulgara.query.QueryException;
+import org.mulgara.server.JRDFSession;
+import org.mulgara.server.SessionFactory;
+import org.mulgara.server.driver.SessionFactoryFinder;
+
public class ExternalTransactionUnitTest
{
private static final URI databaseURI;
@@ -56,7 +61,7 @@
new ExternalTransactionUnitTest().run();
}
- public void run() throws Exception{
+ public void run() throws Exception {
System.out.println("Doing setup");
setup();
System.out.println("Doing OnePhase commit");
@@ -65,6 +70,8 @@
testSimpleTwoPhaseCommit();
System.out.println("Doing Query");
testBasicQuery();
+ System.out.println("Doing Test Remote");
+ testRemote();
System.out.println("Finished Tests");
}
@@ -214,6 +221,21 @@
}
}
+ /**
+ * Checks model existence over RMI twice: once through a fresh factory and
+ * session, and again after that factory has been closed and a new factory
+ * and session obtained for the same server URI.
+ *
+ * Per the inline note on factory.close(), the second check only passes when
+ * that close call is removed. Neither session, nor the second factory, is
+ * closed by this method.
+ */
+ public void testRemote() throws Exception {
+ URI serverURI = new URI("rmi://localhost/server1");
+ URI modelURI = new URI("rmi://localhost/server1#ri");
+
+ SessionFactory factory = SessionFactoryFinder.newSessionFactory(serverURI, true);
+ Session session = factory.newSession();
+
+ // First check: the model is expected not to exist.
+ assertFalse(session.modelExists(modelURI));
+
+ factory.close(); // comment this out to pass
+ // Second check, via a brand new factory and session for the same server.
+ factory = SessionFactoryFinder.newSessionFactory(serverURI, true);
+ session = factory.newSession();
+ assertFalse(session.modelExists(modelURI));
+ }
+
//
// Internal methods
//
Modified: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java 2007-12-03 22:12:33 UTC (rev 593)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java 2007-12-04 04:29:12 UTC (rev 594)
@@ -36,6 +36,9 @@
// Third party packages
import junit.framework.*; // JUnit
import org.apache.log4j.Logger; // Log4J
+import org.jrdf.graph.SubjectNode;
+import org.jrdf.graph.PredicateNode;
+import org.jrdf.graph.ObjectNode;
// Locally written packages
import org.mulgara.query.*;
@@ -101,6 +104,14 @@
suite.addTest(new ExternalTransactionUnitTest("testSimpleOnePhaseCommit"));
suite.addTest(new ExternalTransactionUnitTest("testSimpleTwoPhaseCommit"));
suite.addTest(new ExternalTransactionUnitTest("testBasicQuery"));
+ suite.addTest(new ExternalTransactionUnitTest("testMultipleQuery"));
+ suite.addTest(new ExternalTransactionUnitTest("testBasicReadOnlyQuery"));
+ suite.addTest(new ExternalTransactionUnitTest("testConcurrentQuery"));
+ suite.addTest(new ExternalTransactionUnitTest("testConcurrentReadWrite"));
+ suite.addTest(new ExternalTransactionUnitTest("testSubqueryQuery"));
+ suite.addTest(new ExternalTransactionUnitTest("testConcurrentSubqueryQuery"));
+ suite.addTest(new ExternalTransactionUnitTest("testExplicitIsolationQuerySingleSession"));
+ suite.addTest(new ExternalTransactionUnitTest("testConcurrentExplicitTxn"));
return suite;
}
@@ -331,8 +342,8 @@
}
}
- public void testConcurrentQuery() throws URISyntaxException {
- logger.info("Testing concurrentQuery");
+ public void testMultipleQuery() throws URISyntaxException {
+ logger.info("Testing MultipleQuery");
try {
// Load some test data
@@ -395,6 +406,762 @@
fail(e);
}
}
+
+ /**
+ * Runs one query inside a transaction started on the session's read-only
+ * XAResource and checks the complete, ordered statement list.
+ *
+ * Sequence: start(TMNOFLAGS), query, close answer, end(TMSUCCESS),
+ * one-phase commit.
+ */
+ public void testBasicReadOnlyQuery() throws URISyntaxException {
+ logger.info("Testing basicReadOnlyQuery");
+
+ try {
+ // Nothing is loaded here: the expected results below require modelURI to
+ // already hold the twelve test statements (presumably from an earlier test).
+ DatabaseSession session = (DatabaseSession)database.newSession();
+ try {
+ XAResource resource = session.getReadOnlyXAResource();
+ Xid xid = new TestXid(1);
+ resource.start(xid, XAResource.TMNOFLAGS);
+
+ Variable subjectVariable = new Variable("subject");
+ Variable predicateVariable = new Variable("predicate");
+ Variable objectVariable = new Variable("object");
+
+ List selectList = new ArrayList(3);
+ selectList.add(subjectVariable);
+ selectList.add(predicateVariable);
+ selectList.add(objectVariable);
+
+ // Evaluate the query
+ Answer answer = session.query(new Query(
+ selectList, // SELECT
+ new ModelResource(modelURI), // FROM
+ new ConstraintImpl(subjectVariable, // WHERE
+ predicateVariable,
+ objectVariable),
+ null, // HAVING
+ Arrays.asList(new Order[] { // ORDER BY
+ new Order(subjectVariable, true),
+ new Order(predicateVariable, true),
+ new Order(objectVariable, true)
+ }),
+ null, // LIMIT
+ 0, // OFFSET
+ new UnconstrainedAnswer() // GIVEN
+ ));
+ // Expected rows, in subject/predicate/object order.
+ String[][] results = {
+ { "test:s01", "test:p01", "test:o01" },
+ { "test:s01", "test:p02", "test:o01" },
+ { "test:s01", "test:p02", "test:o02" },
+ { "test:s01", "test:p03", "test:o02" },
+ { "test:s02", "test:p03", "test:o02" },
+ { "test:s02", "test:p04", "test:o02" },
+ { "test:s02", "test:p04", "test:o03" },
+ { "test:s02", "test:p05", "test:o03" },
+ { "test:s03", "test:p01", "test:o01" },
+ { "test:s03", "test:p05", "test:o03" },
+ { "test:s03", "test:p06", "test:o01" },
+ { "test:s03", "test:p06", "test:o03" },
+ };
+ compareResults(results, answer);
+ answer.close();
+
+ // The answer is closed before the transaction is ended and committed.
+ resource.end(xid, XAResource.TMSUCCESS);
+ resource.commit(xid, true);
+ } finally {
+ session.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+ * Runs the same query under two transactions on one read-only XAResource
+ * and checks that both answers agree.
+ *
+ * The first transaction is suspended (end with TMSUSPEND) before the second
+ * is started; the second is suspended as well before the answers are
+ * compared and closed. Only xid1 is resumed before both are ended with
+ * TMSUCCESS and committed one-phase.
+ */
+ public void testConcurrentQuery() throws URISyntaxException {
+ logger.info("Testing concurrentQuery");
+
+ try {
+ // Nothing is loaded here; both queries read whatever modelURI already holds.
+ Session session = database.newSession();
+ XAResource resource = session.getReadOnlyXAResource();
+ Xid xid1 = new TestXid(1);
+ Xid xid2 = new TestXid(2);
+ // NOTE(review): start() is called outside the try/finally below, so the
+ // session is not closed by this method if start() itself throws.
+ resource.start(xid1, XAResource.TMNOFLAGS);
+ try {
+ Variable subjectVariable = new Variable("subject");
+ Variable predicateVariable = new Variable("predicate");
+ Variable objectVariable = new Variable("object");
+
+ List selectList = new ArrayList(3);
+ selectList.add(subjectVariable);
+ selectList.add(predicateVariable);
+ selectList.add(objectVariable);
+
+ // Evaluate the query
+ Answer answer1 = session.query(new Query(
+ selectList, // SELECT
+ new ModelResource(modelURI), // FROM
+ new ConstraintImpl(subjectVariable, // WHERE
+ predicateVariable,
+ objectVariable),
+ null, // HAVING
+ Collections.singletonList( // ORDER BY
+ new Order(subjectVariable, true)
+ ),
+ null, // LIMIT
+ 0, // OFFSET
+ new UnconstrainedAnswer() // GIVEN
+ ));
+ // Suspend the first transaction and start a second on the same resource.
+ resource.end(xid1, XAResource.TMSUSPEND);
+ resource.start(xid2, XAResource.TMNOFLAGS);
+
+ Answer answer2 = session.query(new Query(
+ selectList, // SELECT
+ new ModelResource(modelURI), // FROM
+ new ConstraintImpl(subjectVariable, // WHERE
+ predicateVariable,
+ objectVariable),
+ null, // HAVING
+ Collections.singletonList( // ORDER BY
+ new Order(subjectVariable, true)
+ ),
+ null, // LIMIT
+ 0, // OFFSET
+ new UnconstrainedAnswer() // GIVEN
+ ));
+ resource.end(xid2, XAResource.TMSUSPEND);
+
+ // Both transactions are suspended while the answers are read and closed.
+ compareResults(answer1, answer2);
+
+ answer1.close();
+ answer2.close();
+
+ // xid1 is resumed before being ended; xid2 is ended straight from its
+ // suspended state.
+ resource.start(xid1, XAResource.TMRESUME);
+ resource.end(xid1, XAResource.TMSUCCESS);
+ resource.end(xid2, XAResource.TMSUCCESS);
+ resource.commit(xid1, true);
+ resource.commit(xid2, true);
+ } finally {
+ session.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+ * Copies every statement of modelURI into a newly created model2URI while
+ * interleaving a read-only transaction (the source query) with a write
+ * transaction that is resumed and suspended once per inserted statement,
+ * then verifies the copy and removes model2URI again.
+ *
+ * Note: What this test does is a really bad idea - there is no
+ * isolation provided as each operation is within its own
+ * transaction. It does however provide a good test.
+ */
+ public void testConcurrentReadWrite() throws URISyntaxException {
+ logger.info("Testing concurrentReadWrite");
+
+ try {
+ Session session = database.newSession();
+ XAResource roResource = session.getReadOnlyXAResource();
+ XAResource rwResource = session.getXAResource();
+ Xid xid1 = new TestXid(1);
+
+ // xid1 (write): create the target model, then suspend.
+ rwResource.start(xid1, XAResource.TMNOFLAGS);
+ session.createModel(model2URI, null);
+ rwResource.end(xid1, XAResource.TMSUSPEND);
+
+ try {
+ Variable subjectVariable = new Variable("subject");
+ Variable predicateVariable = new Variable("predicate");
+ Variable objectVariable = new Variable("object");
+
+ List selectList = new ArrayList(3);
+ selectList.add(subjectVariable);
+ selectList.add(predicateVariable);
+ selectList.add(objectVariable);
+
+ // xid2 (read-only): query the source model.
+ Xid xid2 = new TestXid(2);
+ roResource.start(xid2, XAResource.TMNOFLAGS);
+
+ // Evaluate the query
+ Answer answer = session.query(new Query(
+ selectList, // SELECT
+ new ModelResource(modelURI), // FROM
+ new ConstraintImpl(subjectVariable, // WHERE
+ predicateVariable,
+ objectVariable),
+ null, // HAVING
+ Arrays.asList(new Order[] { // ORDER BY
+ new Order(subjectVariable, true),
+ new Order(predicateVariable, true),
+ new Order(objectVariable, true)
+ }),
+ null, // LIMIT
+ 0, // OFFSET
+ new UnconstrainedAnswer() // GIVEN
+ ));
+
+ // The answer is iterated while xid2 is suspended; each row is inserted
+ // under xid1, which is resumed and suspended again around every insert.
+ roResource.end(xid2, XAResource.TMSUSPEND);
+ answer.beforeFirst();
+ while (answer.next()) {
+ rwResource.start(xid1, XAResource.TMRESUME);
+ session.insert(model2URI, Collections.singleton(new TripleImpl(
+ (SubjectNode)answer.getObject(0),
+ (PredicateNode)answer.getObject(1),
+ (ObjectNode)answer.getObject(2))));
+ rwResource.end(xid1, XAResource.TMSUSPEND);
+ }
+ answer.close();
+
+ // xid1 is ended with TMSUCCESS directly from its suspended state.
+ rwResource.end(xid1, XAResource.TMSUCCESS);
+ rwResource.commit(xid1, true);
+
+ // xid3 (read-only): read back the copy after the write has committed.
+ Xid xid3 = new TestXid(3);
+ roResource.start(xid3, XAResource.TMNOFLAGS);
+
+ Answer answer2 = session.query(new Query(
+ selectList, // SELECT
+ new ModelResource(model2URI), // FROM
+ new ConstraintImpl(subjectVariable, // WHERE
+ predicateVariable,
+ objectVariable),
+ null, // HAVING
+ Arrays.asList(new Order[] { // ORDER BY
+ new Order(subjectVariable, true),
+ new Order(predicateVariable, true),
+ new Order(objectVariable, true)
+ }),
+ null, // LIMIT
+ 0, // OFFSET
+ new UnconstrainedAnswer() // GIVEN
+ ));
+
+ roResource.end(xid3, XAResource.TMSUSPEND);
+ String[][] results = {
+ { "test:s01", "test:p01", "test:o01" },
+ { "test:s01", "test:p02", "test:o01" },
+ { "test:s01", "test:p02", "test:o02" },
+ { "test:s01", "test:p03", "test:o02" },
+ { "test:s02", "test:p03", "test:o02" },
+ { "test:s02", "test:p04", "test:o02" },
+ { "test:s02", "test:p04", "test:o03" },
+ { "test:s02", "test:p05", "test:o03" },
+ { "test:s03", "test:p01", "test:o01" },
+ { "test:s03", "test:p05", "test:o03" },
+ { "test:s03", "test:p06", "test:o01" },
+ { "test:s03", "test:p06", "test:o03" },
+ };
+ compareResults(results, answer2);
+ answer2.close();
+
+ // xid4 (write): remove the copy while xid2 and xid3 are still suspended.
+ Xid xid4 = new TestXid(4);
+ rwResource.start(xid4, XAResource.TMNOFLAGS);
+ session.removeModel(model2URI);
+ rwResource.end(xid4, XAResource.TMSUCCESS);
+ rwResource.commit(xid4, true);
+
+ // Finally end and commit the two suspended read-only transactions.
+ roResource.end(xid2, XAResource.TMSUCCESS);
+ roResource.commit(xid2, true);
+ roResource.end(xid3, XAResource.TMSUCCESS);
+ roResource.commit(xid3, true);
+ } finally {
+ session.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+ * Evaluates a query whose second select column is a subquery, inside a
+ * read-only transaction, and reads the outer and nested answers after the
+ * transaction has been suspended.
+ *
+ * The transaction is never ended or committed here; as the closing comment
+ * says, it is left to be cleaned up when the session is closed.
+ */
+ public void testSubqueryQuery() throws URISyntaxException {
+ logger.info("Testing subqueryQuery");
+
+ try {
+ // Nothing is loaded here; the assertions below rely on modelURI already
+ // holding the test statements.
+ Session session = database.newSession();
+ XAResource roResource = session.getReadOnlyXAResource();
+ Xid xid1 = new TestXid(1);
+ roResource.start(xid1, XAResource.TMNOFLAGS);
+
+ try {
+ Variable subjectVariable = new Variable("subject");
+ Variable predicateVariable = new Variable("predicate");
+ Variable objectVariable = new Variable("object");
+
+ // Select list: the subject plus a subquery selecting the objects,
+ // ordered by object.
+ List selectList = new ArrayList(3);
+ selectList.add(subjectVariable);
+ selectList.add(new Subquery(new Variable("k0"), new Query(
+ Collections.singletonList(objectVariable),
+ new ModelResource(modelURI), // FROM
+ new ConstraintImpl(subjectVariable, // WHERE
+ predicateVariable,
+ objectVariable),
+ null, // HAVING
+ Collections.singletonList( // ORDER BY
+ new Order(objectVariable, true)
+ ),
+ null, // LIMIT
+ 0, // OFFSET
+ new UnconstrainedAnswer() // GIVEN
+ )));
+
+
+ // Evaluate the query
+ Answer answer = session.query(new Query(
+ selectList, // SELECT
+ new ModelResource(modelURI), // FROM
+ new ConstraintImpl(subjectVariable, // WHERE
+ new URIReferenceImpl(new URI("test:p03")),
+ objectVariable),
+ null, // HAVING
+ Collections.singletonList( // ORDER BY
+ new Order(subjectVariable, true)
+ ),
+ null, // LIMIT
+ 0, // OFFSET
+ new UnconstrainedAnswer() // GIVEN
+ ));
+
+ // Everything below reads the answers while xid1 is suspended.
+ roResource.end(xid1, XAResource.TMSUSPEND);
+
+ answer.beforeFirst();
+
+ assertTrue(answer.next());
+ assertEquals(new URIReferenceImpl(new URI("test:s01")),
+ answer.getObject(0));
+ Answer sub1 = (Answer)answer.getObject(1);
+ compareResults(new String[][] { new String[] { "test:o01" },
+ new String[] { "test:o02" } }, sub1);
+ sub1.close();
+
+ assertTrue(answer.next());
+ assertEquals(new URIReferenceImpl(new URI("test:s02")),
+ answer.getObject(0));
+ Answer sub2 = (Answer)answer.getObject(1);
+ compareResults(new String[][] { new String[] { "test:o02" },
+ new String[] { "test:o03" } }, sub2);
+ // Leave sub2 open.
+
+ // sub2 is deliberately closed only after the outer answer has been closed.
+ assertFalse(answer.next());
+ answer.close();
+ sub2.close();
+
+ // Leave transaction to be closed on session close.
+ } finally {
+ session.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+ * Evaluates a query with a subquery column under a transaction started on
+ * the session's getXAResource() resource, then walks the two nested answers
+ * in lock-step, suspending the transaction half way through the walk.
+ *
+ * After the walk the outer answer is closed and the transaction is ended
+ * with TMSUCCESS (from its suspended state) and committed one-phase.
+ */
+ public void testConcurrentSubqueryQuery() throws URISyntaxException {
+ logger.info("Testing concurrentSubqueryQuery");
+
+ try {
+ Session session = database.newSession();
+ XAResource rwResource = session.getXAResource();
+ Xid xid1 = new TestXid(1);
+ rwResource.start(xid1, XAResource.TMNOFLAGS);
+
+ try {
+ Variable subjectVariable = new Variable("subject");
+ Variable predicateVariable = new Variable("predicate");
+ Variable objectVariable = new Variable("object");
+
+ // Select list: the subject plus a subquery selecting the objects,
+ // ordered by object.
+ List selectList = new ArrayList(3);
+ selectList.add(subjectVariable);
+ selectList.add(new Subquery(new Variable("k0"), new Query(
+ Collections.singletonList(objectVariable),
+ new ModelResource(modelURI), // FROM
+ new ConstraintImpl(subjectVariable, // WHERE
+ predicateVariable,
+ objectVariable),
+ null, // HAVING
+ Collections.singletonList( // ORDER BY
+ new Order(objectVariable, true)
+ ),
+ null, // LIMIT
+ 0, // OFFSET
+ new UnconstrainedAnswer() // GIVEN
+ )));
+
+
+ // Evaluate the query
+ Answer answer = session.query(new Query(
+ selectList, // SELECT
+ new ModelResource(modelURI), // FROM
+ new ConstraintImpl(subjectVariable, // WHERE
+ new URIReferenceImpl(new URI("test:p03")),
+ objectVariable),
+ null, // HAVING
+ Collections.singletonList( // ORDER BY
+ new Order(subjectVariable, true)
+ ),
+ null, // LIMIT
+ 0, // OFFSET
+ new UnconstrainedAnswer() // GIVEN
+ ));
+
+ answer.beforeFirst();
+
+ // Collect both nested answers before reading either of them.
+ assertTrue(answer.next());
+ assertEquals(new URIReferenceImpl(new URI("test:s01")),
+ answer.getObject(0));
+ Answer sub1 = (Answer)answer.getObject(1);
+ assertTrue(answer.next());
+ assertEquals(new URIReferenceImpl(new URI("test:s02")),
+ answer.getObject(0));
+ Answer sub2 = (Answer)answer.getObject(1);
+ assertFalse(answer.next());
+
+ // First row of each nested answer is read while the transaction is active.
+ assertEquals(1, sub1.getNumberOfVariables());
+ assertEquals(1, sub2.getNumberOfVariables());
+ sub1.beforeFirst();
+ sub2.beforeFirst();
+ assertTrue(sub1.next());
+ assertTrue(sub2.next());
+ assertEquals(new URIReferenceImpl(new URI("test:o01")), sub1.getObject(0));
+ assertEquals(new URIReferenceImpl(new URI("test:o02")), sub2.getObject(0));
+
+ // The remaining rows are read after the transaction has been suspended.
+ rwResource.end(xid1, XAResource.TMSUSPEND);
+
+ assertTrue(sub1.next());
+ assertTrue(sub2.next());
+ assertEquals(new URIReferenceImpl(new URI("test:o02")), sub1.getObject(0));
+ assertEquals(new URIReferenceImpl(new URI("test:o03")), sub2.getObject(0));
+ assertFalse(sub1.next());
+ assertFalse(sub2.next());
+
+ // NOTE(review): only the outer answer is closed; sub1 and sub2 are never
+ // closed explicitly here - confirm that is intended.
+ answer.close();
+
+ rwResource.end(xid1, XAResource.TMSUCCESS);
+ rwResource.commit(xid1, true);
+ } finally {
+ session.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ /**
+ * Checks snapshot isolation between read-only transactions and one write
+ * transaction, all multiplexed over a single session by suspending and
+ * resuming.
+ *
+ * Read-only transactions are started before the update (xid2), before the
+ * prepare (xid4), before the commit (xid5) and after the commit (xid6). Only
+ * the write transaction itself (xid3) and the post-commit reader (xid6) are
+ * expected to see the statements loaded by setModel; the earlier readers
+ * must keep seeing an empty model even after the commit.
+ *
+ * NOTE(review): the log message below omits the "SingleSession" suffix of the
+ * method name.
+ */
+ public void testExplicitIsolationQuerySingleSession() throws URISyntaxException
+ {
+ logger.info("testExplicitIsolationQuery");
+ URI fileURI = new File("data/xatest-model1.rdf").toURI();
+
+ try {
+ Session session = database.newSession();
+ XAResource roResource = session.getReadOnlyXAResource();
+ XAResource rwResource = session.getXAResource();
+ Xid xid1 = new TestXid(1); // Initial create model.
+ Xid xid2 = new TestXid(2); // Started before setModel.
+ Xid xid3 = new TestXid(3); // setModel.
+ Xid xid4 = new TestXid(4); // Started before setModel prepares
+ Xid xid5 = new TestXid(5); // Started before setModel commits
+ Xid xid6 = new TestXid(6); // Started after setModel commits
+ Xid xid7 = new TestXid(7); // Final remove model.
+
+ try {
+ // Create the (empty) model in its own committed transaction.
+ rwResource.start(xid1, XAResource.TMNOFLAGS);
+ session.createModel(model3URI, null);
+ rwResource.end(xid1, XAResource.TMSUCCESS);
+ rwResource.commit(xid1, true);
+
+ // Nothing visible.
+ roResource.start(xid2, XAResource.TMNOFLAGS);
+ assertChangeNotVisible(session);
+ roResource.end(xid2, XAResource.TMSUSPEND);
+
+ // Perform update
+ rwResource.start(xid3, XAResource.TMNOFLAGS);
+ session.setModel(model3URI, new ModelResource(fileURI));
+ rwResource.end(xid3, XAResource.TMSUSPEND);
+
+ // Check uncommitted change not visible
+ roResource.start(xid4, XAResource.TMNOFLAGS);
+ assertChangeNotVisible(session);
+ roResource.end(xid4, XAResource.TMSUSPEND);
+
+ // Check original phase unaffected.
+ roResource.start(xid2, XAResource.TMRESUME);
+ assertChangeNotVisible(session);
+ roResource.end(xid2, XAResource.TMSUSPEND);
+
+ // Check micro-commit visible to current-phase
+ rwResource.start(xid3, XAResource.TMRESUME);
+ assertChangeVisible(session);
+ // Perform prepare
+ rwResource.end(xid3, XAResource.TMSUCCESS);
+ rwResource.prepare(xid3);
+
+ // Check original phase unaffected
+ roResource.start(xid2, XAResource.TMRESUME);
+ assertChangeNotVisible(session);
+ roResource.end(xid2, XAResource.TMSUSPEND);
+
+ // Check pre-prepare phase unaffected
+ roResource.start(xid4, XAResource.TMRESUME);
+ assertChangeNotVisible(session);
+ roResource.end(xid4, XAResource.TMSUSPEND);
+
+ // Check committed phase unaffected.
+ roResource.start(xid5, XAResource.TMNOFLAGS);
+ assertChangeNotVisible(session);
+ roResource.end(xid5, XAResource.TMSUSPEND);
+
+ // Do commit
+ // (second phase of the two-phase commit: onePhase is false after prepare)
+ rwResource.commit(xid3, false);
+
+ // Check original phase
+ roResource.start(xid2, XAResource.TMRESUME);
+ assertChangeNotVisible(session);
+ roResource.end(xid2, XAResource.TMSUSPEND);
+
+ // Check pre-prepare
+ roResource.start(xid4, XAResource.TMRESUME);
+ assertChangeNotVisible(session);
+ roResource.end(xid4, XAResource.TMSUSPEND);
+
+ // Check pre-commit
+ roResource.start(xid5, XAResource.TMRESUME);
+ assertChangeNotVisible(session);
+ roResource.end(xid5, XAResource.TMSUSPEND);
+
+ // Check committed phase is now updated
+ roResource.start(xid6, XAResource.TMNOFLAGS);
+ assertChangeVisible(session);
+
+ // Cleanup transactions.
+ // xid6 is still active; xid2, xid4 and xid5 are ended from suspension.
+ roResource.end(xid6, XAResource.TMSUCCESS);
+ roResource.end(xid2, XAResource.TMSUCCESS);
+ roResource.end(xid4, XAResource.TMSUCCESS);
+ roResource.end(xid5, XAResource.TMSUCCESS);
+ roResource.commit(xid2, true);
+ roResource.commit(xid4, true);
+ roResource.commit(xid5, true);
+ roResource.commit(xid6, true);
+
+ // Cleanup database
+ rwResource.start(xid7, XAResource.TMNOFLAGS);
+ session.removeModel(model3URI);
+ rwResource.end(xid7, XAResource.TMSUCCESS);
+ rwResource.commit(xid7, true);
+ } finally {
+ session.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+  /**
+   * Asserts that model3URI, as seen through the given session's current
+   * transaction, contains exactly the twelve expected test statements.
+   *
+   * The answer is closed even when the comparison fails, so a failed
+   * assertion does not leave an open answer behind.
+   *
+   * @param session the session to query through
+   * @throws Exception if the query, the comparison or the close fails
+   */
+  private void assertChangeVisible(Session session) throws Exception {
+    String[][] results = {
+      { "test:s01", "test:p01", "test:o01" },
+      { "test:s01", "test:p02", "test:o01" },
+      { "test:s01", "test:p02", "test:o02" },
+      { "test:s01", "test:p03", "test:o02" },
+      { "test:s02", "test:p03", "test:o02" },
+      { "test:s02", "test:p04", "test:o02" },
+      { "test:s02", "test:p04", "test:o03" },
+      { "test:s02", "test:p05", "test:o03" },
+      { "test:s03", "test:p01", "test:o01" },
+      { "test:s03", "test:p05", "test:o03" },
+      { "test:s03", "test:p06", "test:o01" },
+      { "test:s03", "test:p06", "test:o03" },
+    };
+
+    Answer answer = queryAllOfModel3(session);
+    try {
+      compareResults(results, answer);
+    } finally {
+      answer.close();
+    }
+  }
+
+  /**
+   * Asserts that model3URI, as seen through the given session's current
+   * transaction, contains no statements at all.
+   *
+   * The answer is closed even when the assertion fails.
+   *
+   * @param session the session to query through
+   * @throws Exception if the query, the assertion or the close fails
+   */
+  private void assertChangeNotVisible(Session session) throws Exception {
+    Answer answer = queryAllOfModel3(session);
+    try {
+      answer.beforeFirst();
+      assertFalse(answer.next());
+    } finally {
+      answer.close();
+    }
+  }
+
+  /**
+   * Selects every (subject, predicate, object) of model3URI, ordered by
+   * subject, then predicate, then object. Shared by the two visibility
+   * assertions so that both always issue the identical query.
+   *
+   * @param session the session to query through
+   * @return the open answer; the caller is responsible for closing it
+   * @throws Exception if the query cannot be evaluated
+   */
+  private Answer queryAllOfModel3(Session session) throws Exception {
+    Variable subjectVariable = new Variable("subject");
+    Variable predicateVariable = new Variable("predicate");
+    Variable objectVariable = new Variable("object");
+
+    List selectList = new ArrayList(3);
+    selectList.add(subjectVariable);
+    selectList.add(predicateVariable);
+    selectList.add(objectVariable);
+
+    return session.query(new Query(
+      selectList,                                       // SELECT
+      new ModelResource(model3URI),                     // FROM
+      new ConstraintImpl(subjectVariable,               // WHERE
+                         predicateVariable,
+                         objectVariable),
+      null,                                             // HAVING
+      Arrays.asList(new Order[] {                       // ORDER BY
+        new Order(subjectVariable, true),
+        new Order(predicateVariable, true),
+        new Order(objectVariable, true)
+      }),
+      null,                                             // LIMIT
+      0,                                                // OFFSET
+      new UnconstrainedAnswer()                         // GIVEN
+    ));
+  }
+
+  /**
+   * Test two simultaneous, explicit write transactions, in two threads and two
+   * sessions. The second thread's start() is expected to block on the write
+   * lock until the first transaction has been committed, after which it must
+   * see the statements the first transaction loaded.
+   *
+   * Any failure inside the second thread (exceptions as well as JUnit
+   * assertion errors) is recorded and re-asserted on the test thread once the
+   * second thread has terminated; a failure raised on another thread would
+   * otherwise never reach the JUnit runner and the test could pass wrongly.
+   */
+  public void testConcurrentExplicitTxn() throws URISyntaxException
+  {
+    logger.info("testConcurrentExplicitTxn");
+    URI fileURI = new File("data/xatest-model1.rdf").toURI();
+
+    try {
+      Session session1 = database.newSession();
+      try {
+        XAResource resource1 = session1.getXAResource();
+        resource1.start(new TestXid(1), XAResource.TMNOFLAGS);
+        session1.createModel(model3URI, null);
+        resource1.end(new TestXid(1), XAResource.TMSUCCESS);
+        resource1.commit(new TestXid(1), true);
+
+        // First write transaction: left open while the second thread starts.
+        resource1.start(new TestXid(2), XAResource.TMNOFLAGS);
+        session1.setModel(model3URI, new ModelResource(fileURI));
+
+        // Single-element arrays act as mutable holders shared with the thread.
+        final boolean[] tx2Started = new boolean[] { false };
+        final Throwable[] tx2Failure = new Throwable[] { null };
+
+        Thread t2 = new Thread("tx2Test") {
+          public void run() {
+            try {
+              Session session2 = database.newSession();
+              XAResource resource2 = session2.getXAResource();
+              try {
+                // Expected to block until the first transaction commits.
+                resource2.start(new TestXid(3), XAResource.TMNOFLAGS);
+
+                synchronized (tx2Started) {
+                  tx2Started[0] = true;
+                  tx2Started.notify();
+                }
+
+                Variable subjectVariable = new Variable("subject");
+                Variable predicateVariable = new Variable("predicate");
+                Variable objectVariable = new Variable("object");
+
+                List selectList = new ArrayList(3);
+                selectList.add(subjectVariable);
+                selectList.add(predicateVariable);
+                selectList.add(objectVariable);
+
+                // Evaluate the query
+                Answer answer = session2.query(new Query(
+                  selectList,                                       // SELECT
+                  new ModelResource(model3URI),                     // FROM
+                  new ConstraintImpl(subjectVariable,               // WHERE
+                                     predicateVariable,
+                                     objectVariable),
+                  null,                                             // HAVING
+                  Arrays.asList(new Order[] {                       // ORDER BY
+                    new Order(subjectVariable, true),
+                    new Order(predicateVariable, true),
+                    new Order(objectVariable, true)
+                  }),
+                  null,                                             // LIMIT
+                  0,                                                // OFFSET
+                  new UnconstrainedAnswer()                         // GIVEN
+                ));
+
+                String[][] results = {
+                  { "test:s01", "test:p01", "test:o01" },
+                  { "test:s01", "test:p02", "test:o01" },
+                  { "test:s01", "test:p02", "test:o02" },
+                  { "test:s01", "test:p03", "test:o02" },
+                  { "test:s02", "test:p03", "test:o02" },
+                  { "test:s02", "test:p04", "test:o02" },
+                  { "test:s02", "test:p04", "test:o03" },
+                  { "test:s02", "test:p05", "test:o03" },
+                  { "test:s03", "test:p01", "test:o01" },
+                  { "test:s03", "test:p05", "test:o03" },
+                  { "test:s03", "test:p06", "test:o01" },
+                  { "test:s03", "test:p06", "test:o03" },
+                };
+                compareResults(results, answer);
+                answer.close();
+
+                resource2.end(new TestXid(3), XAResource.TMSUCCESS);
+                resource2.commit(new TestXid(3), true);
+              } finally {
+                session2.close();
+              }
+            } catch (Throwable t) {
+              // Throwable, not Exception: assertion failures are Errors. Record
+              // the failure for the test thread instead of failing here, where
+              // the JUnit runner would never see it.
+              logger.error("second transaction failed", t);
+              tx2Failure[0] = t;
+            }
+          }
+        };
+        t2.start();
+
+        // Give the second thread time to (wrongly) acquire the write lock.
+        synchronized (tx2Started) {
+          if (!tx2Started[0]) {
+            try {
+              tx2Started.wait(2000L);
+            } catch (InterruptedException ie) {
+              logger.error("wait for tx2-started interrupted", ie);
+              fail(ie);
+            }
+          }
+          assertFalse("second transaction should still be waiting for write lock", tx2Started[0]);
+        }
+
+        // NOTE(review): TestXid(2) is committed without a preceding
+        // end(TMSUCCESS), unlike every other transaction in this test -
+        // confirm this is intentional.
+        resource1.commit(new TestXid(2), true);
+
+        synchronized (tx2Started) {
+          if (!tx2Started[0]) {
+            try {
+              tx2Started.wait(2000L);
+            } catch (InterruptedException ie) {
+              logger.error("wait for tx2-started interrupted", ie);
+              fail(ie);
+            }
+            assertTrue("second transaction should've started", tx2Started[0]);
+          }
+        }
+
+        try {
+          t2.join(2000L);
+        } catch (InterruptedException ie) {
+          logger.error("wait for tx2-terminated interrupted", ie);
+          fail(ie);
+        }
+        assertFalse("second transaction should've terminated", t2.isAlive());
+
+        // Safe to read without locking: the thread has terminated and been joined.
+        assertNull("second transaction failed: " + tx2Failure[0], tx2Failure[0]);
+
+        resource1.start(new TestXid(4), XAResource.TMNOFLAGS);
+        session1.removeModel(model3URI);
+        resource1.end(new TestXid(4), XAResource.TMSUCCESS);
+        resource1.commit(new TestXid(4), true);
+
+      } finally {
+        session1.close();
+      }
+    } catch (Exception e) {
+      fail(e);
+    }
+  }
+
//
// Internal methods
//
Modified: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java 2007-12-03 22:12:33 UTC (rev 593)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java 2007-12-04 04:29:12 UTC (rev 594)
@@ -182,12 +182,9 @@
public void disassociateTransaction(DatabaseSession session, MulgaraExternalTransaction xa)
throws MulgaraTransactionException {
- if (associatedTransaction.get(session) != xa) {
- throw new MulgaraTransactionException(
- "Attempt to disassociate transaction from session from wrong transaction");
+ if (associatedTransaction.get(session) == xa) {
+ associatedTransaction.remove(session);
}
-
- associatedTransaction.remove(session);
}
void abortWriteTransaction() throws MulgaraTransactionException {
Modified: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResourceContext.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResourceContext.java 2007-12-03 22:12:33 UTC (rev 593)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResourceContext.java 2007-12-04 04:29:12 UTC (rev 594)
@@ -138,7 +138,7 @@
*
* TMSUCCESS: Move to Idle and await call to rollback, prepare, or commit.
* TMFAIL: Move to RollbackOnly; await call to rollback.
- * TMSUSPEND: Move to Idle and await start(TMRESUME)
+ * TMSUSPEND: Move to Idle and await start(TMRESUME) or end(TMSUCCESS|FAIL)
*
* In all cases disassociate from current session.
*/
@@ -157,7 +157,8 @@
break;
case TMSUCCESS:
break;
- case TMSUSPEND:
+ case TMSUSPEND: // Should I be tracking the xid's state to ensure
+ // conformance with the X/Open state diagrams?
break;
default:
logger.error("Invalid flag passed to end() : " + flags);
@@ -165,6 +166,7 @@
}
try {
+ // If XA is currently associated with session, disassociate it.
factory.disassociateTransaction(session, xa);
} catch (MulgaraTransactionException em) {
logger.error("Error disassociating transaction from session", em);
Modified: branches/mgr-73/src/jar/util/java/org/mulgara/util/Assoc1toNMap.java
===================================================================
--- branches/mgr-73/src/jar/util/java/org/mulgara/util/Assoc1toNMap.java 2007-12-03 22:12:33 UTC (rev 593)
+++ branches/mgr-73/src/jar/util/java/org/mulgara/util/Assoc1toNMap.java 2007-12-04 04:29:12 UTC (rev 594)
@@ -75,10 +75,12 @@
public void removeN(T2 t2) {
T1 t1 = mapNto1.remove(t2);
- Set<T2> t2set = map1toN.get(t1);
- t2set.remove(t2);
- if (t2set.isEmpty()) {
- map1toN.remove(t1);
+ if (t1 != null) {
+ Set<T2> t2set = map1toN.get(t1);
+ t2set.remove(t2);
+ if (t2set.isEmpty()) {
+ map1toN.remove(t1);
+ }
}
}
More information about the Mulgara-svn
mailing list