[Mulgara-svn] r1010 - branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver
ronald at mulgara.org
ronald at mulgara.org
Mon Jun 23 02:37:03 UTC 2008
Author: ronald
Date: 2008-06-22 19:37:02 -0700 (Sun, 22 Jun 2008)
New Revision: 1010
Modified:
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java
Log:
Fix transaction state handling after a failed operation in non-autocommit mode
and related issues:
* track explicit transactions on the session, so that getTransaction() returns
the current transaction even after a failure.
* ensure the only valid operations on a session after a failure are
setAutoCommit(true) and rollback(), both of which reset the failure; all
others will throw an exception.
* autoCommit is not a global flag; it must instead be tracked per session.
* added tests for the post-failure session behaviour described above.
* added tests for transaction operations in auto-commit mode.
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java 2008-06-22 22:54:54 UTC (rev 1009)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java 2008-06-23 02:37:02 UTC (rev 1010)
@@ -130,6 +130,8 @@
suite.addTest(new AdvDatabaseSessionUnitTest("testDatabaseDelete"));
suite.addTest(new AdvDatabaseSessionUnitTest("testCreateModel"));
suite.addTest(new AdvDatabaseSessionUnitTest("testInsertionBlankNodes"));
+ suite.addTest(new AdvDatabaseSessionUnitTest("testAutoCommitTransactionOps"));
+ suite.addTest(new AdvDatabaseSessionUnitTest("testTransactionFailure"));
return suite;
}
@@ -1170,6 +1172,160 @@
}
+ /**
+ * Test various transaction operations in auto-commit mode.
+ */
+ public void testAutoCommitTransactionOps() {
+ logger.info("Testing autoCommitTransactionOps");
+
+ try {
+ Session session = database.newSession();
+ try {
+ // should be a no-op
+ session.setAutoCommit(true);
+
+ // commit should not be allowed
+ try {
+ session.commit();
+ fail("Commit did not fail in auto-commit mode");
+ } catch (QueryException qe) {
+ }
+
+ // rollback should not be allowed
+ try {
+ session.rollback();
+ fail("Rollback did not fail in auto-commit mode");
+ } catch (QueryException qe) {
+ }
+
+ // verify we're still good to go
+ session.query(createQuery(modelURI)).close();
+
+ // verify second setAutoCommit(false) is a no-op
+ session.createModel(model3URI, null);
+ session.setAutoCommit(false);
+ session.setModel(model3URI, new ModelResource(new File("data/xatest-model1.rdf").toURI()));
+ session.setAutoCommit(false);
+ session.commit();
+
+ Answer answer = session.query(createQuery(model3URI));
+ compareResults(expectedResults(), answer);
+ answer.close();
+
+ session.removeModel(model3URI);
+ } finally {
+ session.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+
+ /**
+ * Test various operations after a transaction fails.
+ */
+ public void testTransactionFailure() {
+ logger.info("Testing transactionFailure");
+
+ try {
+ // query after failure should fail
+ shouldFail(new TestOp() {
+ public void run(Session session) throws Exception {
+ session.query(createQuery(modelURI)).close();
+ }
+ }, "Query in failed transaction did not fail");
+
+ // insert after failure should fail
+ shouldFail(new TestOp() {
+ public void run(Session session) throws Exception {
+ session.insert(modelURI, Collections.singleton((Triple)new TripleImpl(
+ new URIReferenceImpl(URI.create("test:a")),
+ new URIReferenceImpl(URI.create("test:b")),
+ new URIReferenceImpl(URI.create("test:c")))));
+ }
+ }, "Insert in failed transaction did not fail");
+
+ // commit after failure should fail
+ shouldFail(new TestOp() {
+ public void run(Session session) throws Exception {
+ session.commit();
+ }
+ }, "Commit in failed transaction did not fail");
+
+ // rollback after failure should succeed
+ shouldSucceed(new TestOp() {
+ public void run(Session session) throws Exception {
+ session.rollback();
+ }
+ });
+
+ // setAutoCommit(false) after failure should fail
+ shouldFail(new TestOp() {
+ public void run(Session session) throws Exception {
+ session.setAutoCommit(false);
+ }
+ }, "setAutoCommit(false) in failed transaction did not fail");
+
+ // setAutoCommit(true) after failure should succeed
+ shouldSucceed(new TestOp() {
+ public void run(Session session) throws Exception {
+ session.setAutoCommit(true);
+ }
+ });
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+ private void shouldFail(TestOp op, String msg) throws Exception {
+ testTransactionFailureOp(op, true, msg);
+ }
+
+ private void shouldSucceed(TestOp op) throws Exception {
+ testTransactionFailureOp(op, false, null);
+ }
+
+ private void testTransactionFailureOp(TestOp op, boolean shouldFail, String msg) throws Exception {
+ Session session = database.newSession();
+ try {
+ // start tx
+ session.setAutoCommit(false);
+
+ // run bad query -> failed tx
+ try {
+ session.query(createQuery(URI.create("urn:no:such:model")));
+ fail("Bad query failed to throw exception");
+ } catch (QueryException qe) {
+ }
+
+ // run test op, verify it succeeds/fails, and reset
+ if (shouldFail) {
+ try {
+ op.run(session);
+ fail(msg);
+ } catch (QueryException qe) {
+ }
+ session.setAutoCommit(true);
+ } else {
+ op.run(session);
+ }
+
+ // verify we're good to go
+ session.query(createQuery(modelURI)).close();
+
+ session.setAutoCommit(false);
+ session.query(createQuery(modelURI)).close();
+ session.commit();
+ } finally {
+ session.close();
+ }
+ }
+
+ private static interface TestOp {
+ public void run(Session session) throws Exception;
+ }
+
//
// Internal methods
//
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java 2008-06-22 22:54:54 UTC (rev 1009)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java 2008-06-23 02:37:02 UTC (rev 1010)
@@ -487,7 +487,7 @@
context.clear();
enlisted.clear();
state = State.FAILED;
- factory.transactionComplete(this);
+ factory.transactionAborted(this);
factory = null;
return new MulgaraTransactionException("Transaction rollback triggered", cause);
case DEACTREF:
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java 2008-06-22 22:54:54 UTC (rev 1009)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java 2008-06-23 02:37:02 UTC (rev 1010)
@@ -61,25 +61,27 @@
private static final Logger logger =
Logger.getLogger(MulgaraInternalTransactionFactory.class.getName());
- private boolean autoCommit;
-
/** Set of sessions whose transactions have been rolledback.*/
private Set<DatabaseSession> failedSessions;
/** Map of threads to active transactions. */
private Map<Thread, MulgaraTransaction> activeTransactions;
- private Assoc1toNMap<DatabaseSession, MulgaraTransaction> sessionXAMap;
+ /** Map of transactions to associated sessions. */
+ private Map<MulgaraTransaction, DatabaseSession> xaSessionMap;
+ /** Per session info. */
+ private Map<DatabaseSession, SessionInfo> sessionInfoMap;
+
private final TransactionManager transactionManager;
public MulgaraInternalTransactionFactory(MulgaraTransactionManager manager, TransactionManagerFactory transactionManagerFactory) {
super(manager);
- this.autoCommit = true;
this.failedSessions = new HashSet<DatabaseSession>();
this.activeTransactions = new HashMap<Thread, MulgaraTransaction>();
- this.sessionXAMap = new Assoc1toNMap<DatabaseSession, MulgaraTransaction>();
+ this.xaSessionMap = new HashMap<MulgaraTransaction, DatabaseSession>();
+ this.sessionInfoMap = new HashMap<DatabaseSession, SessionInfo>();
this.transactionManager = transactionManagerFactory.newTransactionManager();
}
@@ -88,8 +90,9 @@
throws MulgaraTransactionException {
acquireMutex();
try {
- if (manager.isHoldingWriteLock(session)) {
- return writeTransaction;
+ SessionInfo sessInfo = getOrCreateSessionInfo(session);
+ if (sessInfo.explicitXA != null) {
+ return sessInfo.explicitXA;
}
try {
@@ -112,7 +115,8 @@
transaction = new MulgaraInternalTransaction(this, session.newOperationContext(false));
}
- sessionXAMap.put(session, transaction);
+ xaSessionMap.put(transaction, session);
+ sessInfo.transactions.add(transaction);
return transaction;
} catch (MulgaraTransactionException em) {
@@ -128,8 +132,8 @@
public Set<MulgaraTransaction> getTransactionsForSession(DatabaseSession session) {
acquireMutex();
try {
- Set <MulgaraTransaction> xas = sessionXAMap.getN(session);
- return xas == null ? Collections.<MulgaraTransaction>emptySet() : xas;
+ SessionInfo sessInfo = sessionInfoMap.get(session);
+ return sessInfo == null ? Collections.<MulgaraTransaction>emptySet() : sessInfo.transactions;
} finally {
releaseMutex();
}
@@ -144,15 +148,15 @@
public void commit(DatabaseSession session) throws MulgaraTransactionException {
acquireMutex();
try {
+ if (failedSessions.contains(session)) {
+ throw new MulgaraTransactionException("Attempting to commit failed session");
+ } else if (!manager.isHoldingWriteLock(session)) {
+ throw new MulgaraTransactionException(
+ "Attempting to commit while not the current writing transaction");
+ }
+
manager.reserveWriteLock(session);
try {
- if (failedSessions.contains(session)) {
- throw new MulgaraTransactionException("Attempting to commit failed exception");
- } else if (!manager.isHoldingWriteLock(session)) {
- throw new MulgaraTransactionException(
- "Attempting to commit while not the current writing transaction");
- }
-
setAutoCommit(session, true);
setAutoCommit(session, false);
} finally {
@@ -172,9 +176,9 @@
public void rollback(DatabaseSession session) throws MulgaraTransactionException {
acquireMutex();
try {
- manager.reserveWriteLock(session);
- try {
- if (manager.isHoldingWriteLock(session)) {
+ if (manager.isHoldingWriteLock(session)) {
+ manager.reserveWriteLock(session);
+ try {
try {
writeTransaction.execute(new TransactionOperation() {
public void execute() throws MulgaraTransactionException {
@@ -188,18 +192,19 @@
new MulgaraTransactionException("Rollback failed to terminate write transaction"));
}
} finally {
- failedSessions.add(session);
+ sessionInfoMap.get(session).explicitXA = null;
setAutoCommit(session, false);
}
- } else if (failedSessions.contains(session)) {
- failedSessions.remove(session);
- setAutoCommit(session, false);
- } else {
- throw new MulgaraTransactionException(
- "Attempt to rollback while not in the current writing transaction");
+ } finally {
+ manager.releaseReserve(session);
}
- } finally {
- manager.releaseReserve(session);
+ } else if (failedSessions.contains(session)) {
+ sessionInfoMap.get(session).explicitXA = null;
+ failedSessions.remove(session);
+ setAutoCommit(session, false);
+ } else {
+ throw new MulgaraTransactionException(
+ "Attempt to rollback while not in the current writing transaction");
}
} finally {
releaseMutex();
@@ -215,8 +220,13 @@
new MulgaraTransactionException("Failed Session in setAutoCommit"));
}
+ SessionInfo sessInfo = getOrCreateSessionInfo(session);
+
if (manager.isHoldingWriteLock(session) || failedSessions.contains(session)) {
if (autoCommit) {
+ sessInfo.autoCommit = true;
+ sessInfo.explicitXA = null;
+
// AutoCommit off -> on === branch on current state of transaction.
if (manager.isHoldingWriteLock(session)) {
// Within active transaction - commit and finalise.
@@ -238,7 +248,6 @@
if (manager.isHoldingWriteLock(session)) {
manager.releaseWriteLock(session);
}
- this.autoCommit = true;
}
} else if (failedSessions.contains(session)) {
// Within failed transaction - cleanup.
@@ -246,12 +255,7 @@
}
} else {
if (!manager.isHoldingWriteLock(session)) {
- if (failedSessions.contains(session)) {
- failedSessions.remove(session);
- setAutoCommit(session, false);
- } else {
- throw new IllegalStateException("Can't reach here");
- }
+ throw new MulgaraTransactionException("Attempting set auto-commit false in failed session");
} else {
// AutoCommit off -> off === no-op. Log info.
if (logger.isInfoEnabled()) {
@@ -260,6 +264,7 @@
}
}
} else {
+ sessInfo.explicitXA = null;
if (autoCommit) {
// AutoCommit on -> on === no-op. Log info.
logger.info("Attempting to set autocommit true without setting it false");
@@ -267,7 +272,8 @@
// AutoCommit on -> off == Start new transaction.
getTransaction(session, true); // Set's writeTransaction.
writeTransaction.reference();
- this.autoCommit = false;
+ sessInfo.explicitXA = writeTransaction;
+ sessInfo.autoCommit = false;
}
}
} finally {
@@ -291,7 +297,7 @@
throw new MulgaraTransactionException("Attempt to start transaction twice");
}
- setTransactionTimeout((int) (sessionXAMap.get1(transaction).getTransactionTimeout() / 1000));
+ setTransactionTimeout((int) (xaSessionMap.get(transaction).getTransactionTimeout() / 1000));
transactionManager.begin();
Transaction jtaTrans = transactionManager.getTransaction();
@@ -338,7 +344,8 @@
"Attempt to suspend transaction from outside thread");
}
- if (autoCommit && transaction == writeTransaction) {
+ SessionInfo sessInfo = sessionInfoMap.get(xaSessionMap.get(transaction));
+ if (sessInfo.autoCommit && transaction == writeTransaction) {
logger.error("Attempt to suspend write transaction without setting AutoCommit Off");
throw new MulgaraTransactionException(
"Attempt to suspend write transaction without setting AutoCommit Off");
@@ -362,22 +369,39 @@
}
}
+ public void closingSession(DatabaseSession session) throws MulgaraTransactionException {
+ acquireMutex();
+ try {
+ try {
+ super.closingSession(session);
+ } finally {
+ SessionInfo sessInfo = sessionInfoMap.remove(session);
+ if (sessInfo != null) {
+ xaSessionMap.keySet().removeAll(sessInfo.transactions);
+ }
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
public void transactionComplete(MulgaraTransaction transaction) throws MulgaraTransactionException {
acquireMutex();
try {
logger.debug("Transaction Complete");
+ DatabaseSession session = xaSessionMap.get(transaction);
+ if (session == null) {
+ throw new MulgaraTransactionException("No associated session found for transaction");
+ }
if (transaction == writeTransaction) {
- DatabaseSession session = sessionXAMap.get1(transaction);
- if (session == null) {
- throw new MulgaraTransactionException("No associated session found for write transaction");
- }
if (manager.isHoldingWriteLock(session)) {
manager.releaseWriteLock(session);
writeTransaction = null;
}
}
- sessionXAMap.removeN(transaction);
+ xaSessionMap.remove(transaction);
+ sessionInfoMap.get(session).transactions.remove(transaction);
activeTransactions.remove(Thread.currentThread());
} finally {
releaseMutex();
@@ -390,7 +414,7 @@
try {
// Make sure this cleans up the transaction metadata - this transaction is DEAD!
if (transaction == writeTransaction) {
- failedSessions.add(sessionXAMap.get1(transaction));
+ failedSessions.add(xaSessionMap.get(transaction));
}
transactionComplete(transaction);
} catch (Throwable th) {
@@ -409,4 +433,21 @@
logger.warn("Unable to set transaction timeout: " + transactionTimeout, es);
}
}
+
+ private SessionInfo getOrCreateSessionInfo(DatabaseSession session) {
+ SessionInfo info = sessionInfoMap.get(session);
+ if (info == null)
+ sessionInfoMap.put(session, info = new SessionInfo());
+ return info;
+ }
+
+ private static class SessionInfo {
+ public boolean autoCommit = true;
+
+ /** All uncompleted transactions (may be more than 1 because of unclosed answers) */
+ public Set<MulgaraTransaction> transactions = new HashSet<MulgaraTransaction>();
+
+ /** Currently associated explicit transaction */
+ public MulgaraTransaction explicitXA;
+ }
}
More information about the Mulgara-svn mailing list