[Mulgara-svn] r1016 - branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver
ronald at mulgara.org
ronald at mulgara.org
Mon Jun 23 06:02:39 UTC 2008
Author: ronald
Date: 2008-06-22 23:02:39 -0700 (Sun, 22 Jun 2008)
New Revision: 1016
Modified:
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
Log:
Move timeout code from MulgaraTransactionManager to the
MulgaraTransactionFactory classes. This fixes a deadlock, because most operations
acquire the MulgaraTransactionFactory mutex and then acquire the
MulgaraTransactionManager mutex, but the timeout code was acquiring them in
the opposite order.
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java 2008-06-23 05:59:45 UTC (rev 1015)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/ExternalTransactionUnitTest.java 2008-06-23 06:02:39 UTC (rev 1016)
@@ -121,6 +121,7 @@
suite.addTest(new ExternalTransactionUnitTest("testInternalExternalConcurrentTxnRollback"));
suite.addTest(new ExternalTransactionUnitTest("testExternalInternalConcurrentTxnRollback"));
suite.addTest(new ExternalTransactionUnitTest("testInternalSerialMultipleSessions"));
+ suite.addTest(new ExternalTransactionUnitTest("testTransactionTimeout"));
suite.addTest(new ExternalTransactionUnitTest("testTransactionFailure"));
return suite;
@@ -1513,6 +1514,171 @@
/**
+ * Test transaction timeout.
+ */
+ public void testTransactionTimeout() throws URISyntaxException {
+ logger.info("testTransactionTimeout");
+ URI fileURI = new File("data/xatest-model1.rdf").toURI();
+
+ // test idle timeout
+ try {
+ Session session1 = database.newSession();
+ session1.setIdleTimeout(10000);
+
+ try {
+ XAResource resource = session1.getXAResource();
+ Xid xid = new TestXid(1);
+ resource.start(xid, XAResource.TMNOFLAGS);
+
+ session1.createModel(model3URI, null);
+
+ resource.end(xid, XAResource.TMSUCCESS);
+ resource.commit(xid, true);
+
+ logger.debug("Starting transaction for session1");
+ resource.start(xid, XAResource.TMNOFLAGS);
+ logger.debug("Started transaction for session1");
+
+ Thread t2 = new Thread("tx2Test") {
+ public void run() {
+ try {
+ Session session2 = database.newSession();
+ try {
+ logger.debug("Obtaining autocommit for session2");
+ session2.setAutoCommit(false);
+ logger.debug("Obtained autocommit for session2");
+
+ // Evaluate the query
+ Answer answer = session2.query(createQuery(model3URI));
+
+ answer.beforeFirst();
+ assertFalse(answer.next());
+ answer.close();
+
+ logger.debug("Releasing autocommit for session2");
+ session2.setAutoCommit(true);
+ } finally {
+ session2.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+ };
+ t2.start();
+
+ session1.setModel(model3URI, new ModelResource(fileURI));
+ logger.debug("Sleeping for 20sec");
+ Thread.sleep(20000);
+ logger.debug("Slept for 20sec");
+ try {
+ t2.join(2000L);
+ } catch (InterruptedException ie) {
+ logger.error("wait for tx2-terminated interrupted", ie);
+ fail(ie);
+ }
+ assertFalse("second transaction should've terminated", t2.isAlive());
+
+ boolean xaeThrown = false;
+ try {
+ resource.end(xid, XAResource.TMSUCCESS);
+ resource.commit(xid, true);
+ } catch (XAException xae) {
+ xaeThrown = true;
+ }
+
+ assertTrue("Commit should have failed due to idle timeout", xaeThrown);
+
+ logger.debug("Rolling back transaction on session1");
+ resource.rollback(xid);
+ } finally {
+ session1.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+
+ // test transaction timeout
+ try {
+ Session session1 = database.newSession();
+ try {
+ XAResource resource = session1.getXAResource();
+ Xid xid = new TestXid(1);
+ resource.start(xid, XAResource.TMNOFLAGS);
+
+ session1.createModel(model3URI, null);
+
+ resource.end(xid, XAResource.TMSUCCESS);
+ resource.commit(xid, true);
+
+ logger.debug("Starting transaction for session1");
+ assertTrue(resource.setTransactionTimeout(10));
+ assertEquals(10, resource.getTransactionTimeout());
+ resource.start(xid, XAResource.TMNOFLAGS);
+ logger.debug("Started transaction for session1");
+
+ Thread t2 = new Thread("tx2Test") {
+ public void run() {
+ try {
+ Session session2 = database.newSession();
+ try {
+ logger.debug("Obtaining autocommit for session2");
+ session2.setAutoCommit(false);
+ logger.debug("Obtained autocommit for session2");
+
+ // Evaluate the query
+ Answer answer = session2.query(createQuery(model3URI));
+
+ answer.beforeFirst();
+ assertFalse(answer.next());
+ answer.close();
+
+ logger.debug("Releasing autocommit for session2");
+ session2.setAutoCommit(true);
+ } finally {
+ session2.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+ };
+ t2.start();
+
+ session1.setModel(model3URI, new ModelResource(fileURI));
+ logger.debug("Sleeping for 10sec");
+ Thread.sleep(10000);
+ logger.debug("Slept for 10sec");
+ try {
+ t2.join(2000L);
+ } catch (InterruptedException ie) {
+ logger.error("wait for tx2-terminated interrupted", ie);
+ fail(ie);
+ }
+ assertFalse("second transaction should've terminated", t2.isAlive());
+
+ boolean xaeThrown = false;
+ try {
+ resource.end(xid, XAResource.TMSUCCESS);
+ resource.commit(xid, true);
+ } catch (XAException xae) {
+ xaeThrown = true;
+ }
+
+ assertTrue("Commit should have failed due to transaction timeout", xaeThrown);
+
+ logger.debug("Rolling back transaction on session1");
+ resource.rollback(xid);
+ } finally {
+ session1.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+
+ /**
* Test various operations after a transaction fails.
*/
public void testTransactionFailure() {
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java 2008-06-23 05:59:45 UTC (rev 1015)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java 2008-06-23 06:02:39 UTC (rev 1016)
@@ -97,7 +97,7 @@
// see comment in MulgaraInternalTransactionFactory regarding releasing the lock here
runWithoutMutex(new TransactionOperation() {
public void execute() throws MulgaraTransactionException {
- manager.obtainWriteLock(session, MulgaraExternalTransactionFactory.this);
+ manager.obtainWriteLock(session);
}
});
try {
@@ -105,6 +105,7 @@
writeTransaction = xa;
associatedTransaction.put(session, xa);
sessionXAMap.put(session, xa);
+ transactionCreated(xa, session);
return xa;
} catch (Throwable th) {
@@ -116,6 +117,7 @@
MulgaraExternalTransaction xa = new MulgaraExternalTransaction(this, xid, session.newOperationContext(false));
associatedTransaction.put(session, xa);
sessionXAMap.put(session, xa);
+ transactionCreated(xa, session);
return xa;
} catch (QueryException eq) {
@@ -168,6 +170,8 @@
public void transactionComplete(MulgaraExternalTransaction xa)
throws MulgaraTransactionException {
acquireMutex();
+ super.transactionComplete(xa);
+
try {
if (xa == null) {
throw new IllegalArgumentException("Null transaction indicated completion");
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java 2008-06-23 05:59:45 UTC (rev 1015)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java 2008-06-23 06:02:39 UTC (rev 1016)
@@ -105,7 +105,7 @@
// ourselves first).
runWithoutMutex(new TransactionOperation() {
public void execute() throws MulgaraTransactionException {
- manager.obtainWriteLock(session, MulgaraInternalTransactionFactory.this);
+ manager.obtainWriteLock(session);
}
});
try {
@@ -122,6 +122,7 @@
xaSessionMap.put(transaction, session);
sessInfo.transactions.add(transaction);
+ transactionCreated(transaction, session);
return transaction;
} catch (MulgaraTransactionException em) {
@@ -394,6 +395,8 @@
public void transactionComplete(MulgaraTransaction transaction) throws MulgaraTransactionException {
acquireMutex();
+ super.transactionComplete(transaction);
+
try {
logger.debug("Transaction Complete");
DatabaseSession session = xaSessionMap.get(transaction);
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java 2008-06-23 05:59:45 UTC (rev 1015)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java 2008-06-23 06:02:39 UTC (rev 1016)
@@ -26,6 +26,8 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.locks.ReentrantLock;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
@@ -67,7 +69,12 @@
Logger.getLogger(MulgaraTransactionFactory.class.getName());
protected final MulgaraTransactionManager manager;
-
+
+ protected final Timer reaperTimer;
+ protected final IdleReaper idleReaperTask;
+ protected long idleTimeout;
+ protected ExtXAReaper extXAReaperTask;
+
/**
* Contains a reference to the current writing transaction IFF it is managed
* by this factory. If there is no current writing transaction, or if the
@@ -81,9 +88,26 @@
this.manager = manager;
this.mutex = new ReentrantLock();
this.writeTransaction = null;
+
+ this.reaperTimer = new Timer("Write-lock Reaper", true);
+ this.idleReaperTask = new IdleReaper(reaperTimer, 10*1000L); // check every 10 seconds
}
+ protected void transactionCreated(MulgaraTransaction transaction, DatabaseSession session) {
+ if (transaction != writeTransaction)
+ return;
+
+ idleTimeout = session.getIdleTimeout() > 0 ? session.getIdleTimeout() : 15*60*1000L;
+ extXAReaperTask = new ExtXAReaper(reaperTimer,
+ session.getTransactionTimeout() > 0 ? session.getTransactionTimeout() : 60*60*1000L);
+ }
+
+ protected void transactionComplete(MulgaraTransaction transaction) throws MulgaraTransactionException {
+ if (transaction == writeTransaction)
+ extXAReaperTask.cancel();
+ }
+
/**
* Obtain a transaction context associated with a DatabaseSession.
*
@@ -175,38 +199,7 @@
}
- /**
- * Return time in millis when the current write transaction last activated.
- */
- public long getLastActiveHoldingLock() {
- acquireMutex();
- try {
- if (writeTransaction != null) {
- return writeTransaction.lastActive();
- } else {
- return 0;
- }
- } finally {
- releaseMutex();
- }
- }
-
/**
- * Abort the current write Transaction.
- */
- public void abortWriteTransaction() throws MulgaraTransactionException {
- acquireMutex();
- try {
- if (writeTransaction != null) {
- writeTransaction.abortTransaction("Explicit abort requested by write-lock manager", new Throwable());
- writeTransaction = null;
- }
- } finally {
- releaseMutex();
- }
- }
-
- /**
* Abort as many of the transactions as we can.
*/
protected void abortTransactions(Map<MulgaraTransaction, Throwable> requiresAbort) {
@@ -279,4 +272,69 @@
}
}
}
+
+ private class IdleReaper extends TimerTask {
+ public IdleReaper(Timer timer, long periodMillis) {
+ logger.info("Idle-reaper created: " + System.identityHashCode(this));
+ timer.schedule(this, periodMillis, periodMillis);
+ }
+
+ public void run() {
+ logger.debug("Idle-reaper running");
+ acquireMutex();
+
+ try {
+ if (writeTransaction != null) {
+ long lastActive = writeTransaction.lastActive();
+ if ((lastActive > 0) && (lastActive < System.currentTimeMillis() - idleTimeout)) {
+ logger.warn("Reclaiming writelock from inactive transaction");
+ writeTransaction.abortTransaction("Transaction idle-timeout", new Throwable());
+ writeTransaction = null;
+ } else {
+ if (logger.isDebugEnabled())
+ logger.debug("Transaction still active: " + lastActive + " time: " + System.currentTimeMillis() + " timeout: " + idleTimeout);
+ }
+ } else {
+ logger.debug("No write-lock held.");
+ }
+ } catch (MulgaraTransactionException em) {
+ logger.warn("Exception thrown while reclaiming writelock from inactive transaction");
+ } finally {
+ releaseMutex();
+ }
+ }
+ }
+
+ private class ExtXAReaper extends TimerTask {
+ private boolean die = false;
+
+ public ExtXAReaper(Timer timer, long timeoutMillis) {
+ logger.debug("Extended-transaction-reaper created: " + System.identityHashCode(this));
+ timer.schedule(this, timeoutMillis);
+ }
+
+ public boolean cancel() {
+ die = true;
+ return super.cancel();
+ }
+
+ public void run() {
+ acquireMutex();
+
+ try {
+ if (die)
+ return; // we were cancelled after being scheduled but before mutex was acquired
+
+ if (writeTransaction != null) {
+ logger.warn("Reclaiming writelock from over-extended transaction");
+ writeTransaction.abortTransaction("Transaction timed out", new Throwable());
+ writeTransaction = null;
+ }
+ } catch (MulgaraTransactionException em) {
+ logger.warn("Exception thrown while reclaiming writelock from over-extended transaction", em);
+ } finally {
+ releaseMutex();
+ }
+ }
+ }
}
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java 2008-06-23 05:59:45 UTC (rev 1015)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java 2008-06-23 06:02:39 UTC (rev 1016)
@@ -22,8 +22,6 @@
package org.mulgara.resolver;
// Java2 packages
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -68,7 +66,6 @@
// Write lock is associated with a session.
private DatabaseSession sessionHoldingWriteLock;
- private MulgaraTransactionFactory factoryWithWriteTransaction;
// Used to support write-lock reservation.
private DatabaseSession sessionReservingWriteLock;
@@ -80,23 +77,14 @@
private final MulgaraInternalTransactionFactory internalFactory;
private final MulgaraExternalTransactionFactory externalFactory;
- private final Timer reaperTimer;
- private IdleReaper idleReaperTask;
- private long idleTimeout;
- private LockReaper lockReaperTask;
-
public MulgaraTransactionManager(TransactionManagerFactory transactionManagerFactory) {
this.sessionHoldingWriteLock = null;
this.sessionReservingWriteLock = null;
- this.factoryWithWriteTransaction = null;
this.mutex = new ReentrantLock();
this.writeLockCondition = this.mutex.newCondition();
this.internalFactory = new MulgaraInternalTransactionFactory(this, transactionManagerFactory);
this.externalFactory = new MulgaraExternalTransactionFactory(this);
-
- this.reaperTimer = new Timer("Write-lock Reaper", true);
- this.idleReaperTask = new IdleReaper(reaperTimer, 10*1000L); // check every 10 seconds
}
@@ -112,7 +100,7 @@
/**
* Obtains the write lock.
*/
- void obtainWriteLock(DatabaseSession session, MulgaraTransactionFactory factory) throws MulgaraTransactionException {
+ void obtainWriteLock(DatabaseSession session) throws MulgaraTransactionException {
acquireMutex();
try {
if (sessionHoldingWriteLock == session) {
@@ -131,10 +119,6 @@
logger.debug("Obtaining write lock", new Throwable());
}
sessionHoldingWriteLock = session;
- factoryWithWriteTransaction = factory;
- this.idleTimeout = session.getIdleTimeout() > 0 ? session.getIdleTimeout() : 15*60*1000L;
- this.lockReaperTask =
- new LockReaper(reaperTimer, session.getTransactionTimeout() > 0 ? session.getTransactionTimeout() : 60*60*1000L);
} finally {
releaseMutex();
}
@@ -163,8 +147,6 @@
logger.debug("Releasing writelock", new Throwable());
}
sessionHoldingWriteLock = null;
- factoryWithWriteTransaction = null;
- lockReaperTask.cancel();
writeLockCondition.signal();
} finally {
releaseMutex();
@@ -293,65 +275,4 @@
releaseMutex();
}
}
-
- private class IdleReaper extends TimerTask {
- public IdleReaper(Timer timer, long periodMillis) {
- logger.info("Idle-reaper created: " + System.identityHashCode(this));
- timer.schedule(this, periodMillis, periodMillis);
- }
-
- public void run() {
- logger.debug("Idle-reaper running");
- acquireMutex();
-
- try {
- if (factoryWithWriteTransaction != null) {
- long lastActive = factoryWithWriteTransaction.getLastActiveHoldingLock();
- if ((lastActive > 0) && (lastActive < System.currentTimeMillis() - idleTimeout)) {
- logger.warn("Reclaiming writelock from inactive transaction");
- factoryWithWriteTransaction.abortWriteTransaction();
- } else {
- if (logger.isDebugEnabled())
- logger.debug("Transaction still active: " + lastActive + " time: " + System.currentTimeMillis() + " timeout: " + idleTimeout);
- }
- } else {
- logger.debug("No write-lock held.");
- }
- } catch (MulgaraTransactionException em) {
- logger.warn("Exception thrown while reclaiming writelock from inactive transaction");
- } finally {
- releaseMutex();
- }
- }
- }
-
- private class LockReaper extends TimerTask {
- private boolean die = false;
-
- public LockReaper(Timer timer, long timeoutMillis) {
- logger.debug("Lock-reaper created: " + System.identityHashCode(this));
- timer.schedule(this, timeoutMillis);
- }
-
- public boolean cancel() {
- die = true;
- return super.cancel();
- }
-
- public void run() {
- acquireMutex();
-
- try {
- if (die)
- return; // we were cancelled after being scheduled but before mutex was acquired
-
- logger.warn("Reclaiming writelock from over-extended transaction");
- factoryWithWriteTransaction.abortWriteTransaction();
- } catch (MulgaraTransactionException em) {
- logger.warn("Exception thrown while reclaiming writelock from over-extended transaction", em);
- } finally {
- releaseMutex();
- }
- }
- }
}
More information about the Mulgara-svn
mailing list