[Mulgara-svn] r956 - branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver
andrae at mulgara.org
andrae at mulgara.org
Thu May 22 07:25:14 UTC 2008
Author: andrae
Date: 2008-05-22 00:25:13 -0700 (Thu, 22 May 2008)
New Revision: 956
Modified:
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/Database.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
Log:
I'm not positive there aren't concurreny problems yet, I would like to take a
bit longer to test, but this should work.
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java 2008-05-21 06:51:34 UTC (rev 955)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/AdvDatabaseSessionUnitTest.java 2008-05-22 07:25:13 UTC (rev 956)
@@ -125,6 +125,7 @@
suite.addTest(new AdvDatabaseSessionUnitTest("testImplicitCommitQuery"));
suite.addTest(new AdvDatabaseSessionUnitTest("testConcurrentExplicitTxn"));
suite.addTest(new AdvDatabaseSessionUnitTest("testConcurrentImplicitTxn"));
+ suite.addTest(new AdvDatabaseSessionUnitTest("testConcurrentImplicitRecovery"));
suite.addTest(new AdvDatabaseSessionUnitTest("testPrefixingWithUnbound"));
suite.addTest(new AdvDatabaseSessionUnitTest("testDatabaseDelete"));
suite.addTest(new AdvDatabaseSessionUnitTest("testCreateModel"));
@@ -1519,6 +1520,106 @@
}
+ /**
+ * Test two simultaneous transactions, the first one obtains the write-lock
+ * and sleeps longer than the recovery timeout.
+ */
+ public void testConcurrentImplicitRecovery() throws URISyntaxException {
+ logger.info("testConcurrentImplicitRecovery");
+ URI fileURI = new File("data/xatest-model1.rdf").toURI();
+
+ database.setWriteLockTimeout(30000);
+ try {
+ Session session1 = database.newSession();
+ try {
+ session1.createModel(model3URI, null);
+ logger.debug("Obtaining autocommit for session1");
+ session1.setAutoCommit(false);
+ logger.debug("Obtained autocommit for session1");
+
+ Thread t2 = new Thread("tx2Test") {
+ public void run() {
+ try {
+ Session session2 = database.newSession();
+ try {
+ logger.debug("Obtaining autocommit for session2");
+ session2.setAutoCommit(false);
+ logger.debug("Obtained autocommit for session2");
+ Variable subjectVariable = new Variable("subject");
+ Variable predicateVariable = new Variable("predicate");
+ Variable objectVariable = new Variable("object");
+
+ List<SelectElement> selectList = new ArrayList<SelectElement>(3);
+ selectList.add(subjectVariable);
+ selectList.add(predicateVariable);
+ selectList.add(objectVariable);
+
+ // Evaluate the query
+ Answer answer = session2.query(new Query(
+ selectList, // SELECT
+ new ModelResource(model3URI), // FROM
+ new ConstraintImpl(subjectVariable, // WHERE
+ predicateVariable,
+ objectVariable),
+ null, // HAVING
+ Arrays.asList(new Order[] { // ORDER BY
+ new Order(subjectVariable, true),
+ new Order(predicateVariable, true),
+ new Order(objectVariable, true)
+ }),
+ null, // LIMIT
+ 0, // OFFSET
+ new UnconstrainedAnswer() // GIVEN
+ ));
+
+ answer.beforeFirst();
+ assertFalse(answer.next());
+ answer.close();
+
+ logger.debug("Releasing autocommit for session2");
+ session2.setAutoCommit(true);
+ } finally {
+ session2.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+ };
+ t2.start();
+
+ session1.setModel(model3URI, new ModelResource(fileURI));
+ logger.debug("Sleeping for 40sec");
+ Thread.sleep(40000);
+ logger.debug("Slept for 40sec");
+ try {
+ t2.join(2000L);
+ } catch (InterruptedException ie) {
+ logger.error("wait for tx2-terminated interrupted", ie);
+ fail(ie);
+ }
+ assertFalse("second transaction should've terminated", t2.isAlive());
+
+ boolean qeThrown = false;
+ try {
+ session1.commit();
+ } catch (QueryException eq) {
+ qeThrown = true;
+ }
+
+ assertTrue("Commit should have failed due to lock timeout", qeThrown);
+
+ logger.debug("Releasing autocommit for session1");
+ session1.setAutoCommit(true);
+ } finally {
+ session1.close();
+ }
+ } catch (Exception e) {
+ fail(e);
+ }
+ }
+
+
public void testPrefixingWithUnbound() throws URISyntaxException {
logger.warn("testPrefixingWithUnbound");
URI fileURI = new File("data/prefix-unbound.rdf").toURI();
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/Database.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/Database.java 2008-05-21 06:51:34 UTC (rev 955)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/Database.java 2008-05-22 07:25:13 UTC (rev 956)
@@ -828,6 +828,11 @@
}
+ public void setWriteLockTimeout(long timeout) {
+ transactionManager.setLockTimeout(timeout);
+ }
+
+
/**
* Flush all resources associated with the database into a recoverable state.
*/
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java 2008-05-21 06:51:34 UTC (rev 955)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java 2008-05-22 07:25:13 UTC (rev 956)
@@ -81,7 +81,7 @@
private final MulgaraExternalTransactionFactory externalFactory;
private final Timer reaperTimer;
- private final TimerTask reaperTask;
+ private LockReaper reaperTask;
public MulgaraTransactionManager(TransactionManagerFactory transactionManagerFactory) {
this.sessionHoldingWriteLock = null;
@@ -170,6 +170,16 @@
internalFactory.setTransactionTimeout(transactionTimeout);
}
+
+ public void setLockTimeout(long timeout) {
+ acquireMutex();
+ try {
+ reaperTask.setTimeout(timeout);
+ } finally {
+ releaseMutex();
+ }
+ }
+
/**
* Used to replace the built in monitor to allow it to be properly released
* during potentially blocking operations. All potentially blocking
@@ -298,34 +308,47 @@
private Timer timer;
protected long timeoutMillis;
- private boolean scheduled;
+ private boolean die;
public LockReaper(Timer timer, long timeoutMillis) {
+ logger.info("Lock-reaper created: " + System.identityHashCode(this));
this.timer = timer;
this.timeoutMillis = (timeoutMillis > MIN_PERIOD) ? timeoutMillis : MIN_PERIOD;
- this.scheduled = false;
- schedule(timeoutMillis);
+ this.die = false;
+ schedule();
}
public void run() {
logger.warn("Lock-reaper running");
acquireMutex();
- scheduled = false;
- boolean scheduleDefault;
+
try {
- long lastActive = factoryWithWriteTransaction.getLastActiveHoldingLock();
- if ((lastActive > 0) && (lastActive < System.currentTimeMillis() - timeoutMillis)) {
- logger.warn("Reclaiming writelock from inactive transaction");
- factoryWithWriteTransaction.abortWriteTransaction();
- schedule(timeoutMillis);
+ if (die) {
+ logger.info("Lock-reaper dying on request: " + System.identityHashCode(this));
+ return;
+ }
+
+ if (factoryWithWriteTransaction != null) {
+ long lastActive = factoryWithWriteTransaction.getLastActiveHoldingLock();
+ if ((lastActive > 0) && (lastActive < System.currentTimeMillis() - timeoutMillis)) {
+ logger.warn("Reclaiming writelock from inactive transaction");
+ factoryWithWriteTransaction.abortWriteTransaction();
+ } else {
+ logger.debug("Transaction still active: " + lastActive + " time: " + System.currentTimeMillis() + " timeout: " + timeoutMillis);
+ }
} else {
- schedule(timeoutMillis - (System.currentTimeMillis() - lastActive));
+ logger.debug("No write-lock held.");
}
} catch (MulgaraTransactionException em) {
logger.warn("Exception thrown while reclaiming writelock from inactive transaction");
} finally {
try {
- if (!scheduled) schedule(timeoutMillis);
+ if (!die) {
+ logger.debug("Rescheduling lock-reaper: " + System.identityHashCode(this));
+ schedule();
+ } else {
+ logger.debug("Not rescheduling lock-reaper on request: " + System.identityHashCode(this));
+ }
} finally {
releaseMutex();
}
@@ -335,28 +358,10 @@
public void setTimeout(long timeout) {
acquireMutex();
try {
- this.cancel();
-
- timeoutMillis = (timeoutMillis > MIN_PERIOD) ? timeoutMillis : MIN_PERIOD;
-
- if (factoryWithWriteTransaction == null) {
- schedule(timeoutMillis);
-
- return;
- }
-
- long lastActive = factoryWithWriteTransaction.getLastActiveHoldingLock();
- if (lastActive > 0) {
- schedule(timeoutMillis - (System.currentTimeMillis() - lastActive));
- } else {
- schedule(timeoutMillis);
- }
+ reaperTask = new LockReaper(timer, timeout);
+ this.die = true;
} finally {
- try {
- if (!scheduled) schedule(timeoutMillis);
- } finally {
- releaseMutex();
- }
+ releaseMutex();
}
}
@@ -365,18 +370,25 @@
try {
return timeoutMillis;
} finally {
- try {
- if (!scheduled) schedule(timeoutMillis);
- } finally {
- releaseMutex();
- }
+ releaseMutex();
}
}
- private void schedule(long timeout) {
+ private void schedule() {
+ long timeout = timeoutMillis;
+
+ if (factoryWithWriteTransaction != null) {
+ long lastActive = factoryWithWriteTransaction.getLastActiveHoldingLock();
+
+ if (lastActive > 0) {
+ timeout = timeoutMillis - (System.currentTimeMillis() - lastActive);
+ }
+ }
+
+ timeout = (timeout > MIN_PERIOD) ? timeout : MIN_PERIOD;
+
logger.warn("Scheduling lock-reaper to run in: " + timeout + "ms");
- timer.schedule(this, (timeout < MIN_PERIOD) ? MIN_PERIOD : timeout);
- scheduled = true;
+ timer.schedule(this, timeout);
}
}
}
More information about the Mulgara-svn
mailing list