[Mulgara-svn] r1065 - branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver
ronald at mulgara.org
ronald at mulgara.org
Mon Jul 7 12:54:45 UTC 2008
Author: ronald
Date: 2008-07-07 05:54:45 -0700 (Mon, 07 Jul 2008)
New Revision: 1065
Modified:
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java
Log:
Reworked timeout task management and fixed invocation of heuristic-rollback.
If a timeout fires, a new thread is now spun up from which heuristicRollback
is called. This lets it block until the mutex is released without having to
worry about preventing other timeouts from triggering and keeps things simple.
Also, heuristicRollback now interrupts the thread holding the mutex, if any.
This will kick it out of waiting for a lock, but will not (yet) help if it's
in a long-running store operation (to fix this we'd have to sprinkle checks
for Thread.interrupted() inside the stores and resolvers).
Because the timeout tasks are now very short, the timer has been changed to
a static singleton, i.e. a shared timer thread for everybody. Aditionally,
instead of polling for the idle timeout, a new task is scheduled each time
for the next earliest possible idle timeout. This both reduces the number of
checks in the common case, and allows for short idle timeouts in order to
speed up testing (no minimal timeout is enforced). In many respects this
reverts to the pre- r977 algorithm with respect to idle timeouts.
In addition, the transaction-timeout and idle-timeout handling have been
merged into a single timeout task. There is one such task per transaction.
And lastly, as of now even read-only transactions are subject to the timeouts.
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java 2008-07-07 12:54:40 UTC (rev 1064)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java 2008-07-07 12:54:45 UTC (rev 1065)
@@ -102,7 +102,8 @@
throws MulgaraTransactionException {
report("abortTransaction");
- acquireMutex(10*1000L, true, MulgaraTransactionException.class); // 10 seconds
+ // we should actually already have the mutex, but let's make sure
+ acquireMutex(0L, true, MulgaraTransactionException.class);
try {
try {
for (EnlistableResource resource : enlisted) {
@@ -135,7 +136,8 @@
}
}
- acquireMutex(10*1000L, true, MulgaraTransactionException.class); // 10 seconds
+ factory.acquireMutexWithInterrupt(0L, MulgaraTransactionException.class);
+ inXACompletion = true;
}
try {
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java 2008-07-07 12:54:40 UTC (rev 1064)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java 2008-07-07 12:54:45 UTC (rev 1065)
@@ -396,7 +396,8 @@
}
}
- acquireMutex(10*1000L, true, MulgaraTransactionException.class); // 10 seconds
+ factory.acquireMutexWithInterrupt(0L, MulgaraTransactionException.class);
+ inXACompletion = true;
}
try {
@@ -469,12 +470,6 @@
}
try {
- if (currentThread == null) {
- throw new MulgaraTransactionException("Transaction not associated with thread");
- } else if (!currentThread.equals(Thread.currentThread())) {
- throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
- }
-
if (rollbackCause != null) {
errorReport("Cascading error, transaction already rolled back", cause);
errorReport("Cascade error, expected initial cause", rollbackCause);
@@ -528,7 +523,8 @@
*/
public MulgaraTransactionException abortTransaction(String errorMessage, Throwable cause)
throws MulgaraTransactionException {
- acquireMutex(10*1000L, true, MulgaraTransactionException.class); // 10 seconds
+ // we should actually already have the mutex, but let's make sure
+ acquireMutex(0L, true, MulgaraTransactionException.class);
try {
// We need to notify the factory here - this is serious, we
// need to rollback this transaction, but if we have reached here
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java 2008-07-07 12:54:40 UTC (rev 1064)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java 2008-07-07 12:54:45 UTC (rev 1065)
@@ -21,6 +21,7 @@
package org.mulgara.resolver;
// Java2 packages
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -57,17 +58,15 @@
*/
public abstract class MulgaraTransactionFactory {
- private static final Logger logger =
- Logger.getLogger(MulgaraTransactionFactory.class.getName());
+ private static final Logger logger = Logger.getLogger(MulgaraTransactionFactory.class.getName());
+ private static final Timer reaperTimer = new Timer("Write-lock Reaper", true);
protected final MulgaraTransactionManager manager;
- protected final Timer reaperTimer;
- protected final IdleReaper idleReaperTask;
- protected long idleTimeout;
- protected ExtXAReaper extXAReaperTask;
protected final DatabaseSession session;
+ private final Map<MulgaraTransaction, XAReaper> timeoutTasks;
+
/**
* Contains a reference the the current writing transaction IFF it is managed
* by this factory. If there is no current writing transaction, or if the
@@ -83,12 +82,10 @@
protected MulgaraTransactionFactory(DatabaseSession session, MulgaraTransactionManager manager) {
this.session = session;
this.manager = manager;
+ this.timeoutTasks = new HashMap<MulgaraTransaction, XAReaper>();
this.mutexHolder = null;
this.lockCnt = 0;
this.writeTransaction = null;
-
- this.reaperTimer = new Timer("Write-lock Reaper", true);
- this.idleReaperTask = new IdleReaper(reaperTimer, 10*1000L); // check every 10 seconds
}
protected void setWriteTransaction(MulgaraTransaction txn) {
@@ -98,17 +95,22 @@
}
protected void transactionCreated(MulgaraTransaction transaction) {
- if (transaction != writeTransaction)
- return;
+ long idleTimeout = session.getIdleTimeout() > 0 ? session.getIdleTimeout() : 15*60*1000L;
+ long txnTimeout = session.getTransactionTimeout() > 0 ? session.getTransactionTimeout() : 60*60*1000L;
+ long now = System.currentTimeMillis();
- idleTimeout = session.getIdleTimeout() > 0 ? session.getIdleTimeout() : 15*60*1000L;
- extXAReaperTask = new ExtXAReaper(reaperTimer,
- session.getTransactionTimeout() > 0 ? session.getTransactionTimeout() : 60*60*1000L);
+ synchronized (getMutexLock()) {
+ timeoutTasks.put(transaction, new XAReaper(transaction, now + txnTimeout, idleTimeout, now));
+ }
}
protected void transactionComplete(MulgaraTransaction transaction) throws MulgaraTransactionException {
- if (transaction == writeTransaction)
- extXAReaperTask.cancel();
+ synchronized (getMutexLock()) {
+ XAReaper reaper = timeoutTasks.remove(transaction);
+ if (reaper != null) {
+ reaper.cancel();
+ }
+ }
}
/**
@@ -135,7 +137,6 @@
acquireMutex(0, MulgaraTransactionException.class);
try {
logger.debug("Cleaning up any stale transactions on session close");
- reaperTimer.cancel();
Map<MulgaraTransaction, Throwable> requiresAbort = new HashMap<MulgaraTransaction, Throwable>();
try {
@@ -197,6 +198,13 @@
logger.error("Error aborting transactions after heuristic rollback failure on session close", th);
} catch (Throwable throw_away) { }
}
+
+ synchronized (getMutexLock()) {
+ for (XAReaper reaper : timeoutTasks.values()) {
+ reaper.cancel();
+ }
+ timeoutTasks.clear();
+ }
}
} finally {
releaseMutex();
@@ -337,7 +345,7 @@
return mutexHolder;
}
- private static <T extends Throwable> T newException(Class<T> exc, String msg) {
+ public static <T extends Throwable> T newException(Class<T> exc, String msg) {
try {
return exc.getConstructor(String.class).newInstance(msg);
} catch (Exception e) {
@@ -345,67 +353,67 @@
}
}
- private static <T extends Throwable> T newExceptionOrCause(Class<T> exc, String msg, Throwable cause) {
+ public static <T extends Throwable> T newExceptionOrCause(Class<T> exc, String msg, Throwable cause) {
if (exc.isAssignableFrom(cause.getClass()))
return exc.cast(cause);
return exc.cast(newException(exc, msg).initCause(cause));
}
- private class IdleReaper extends TimerTask {
- public IdleReaper(Timer timer, long periodMillis) {
- logger.info("Idle-reaper created: " + System.identityHashCode(this));
- timer.schedule(this, periodMillis, periodMillis);
- }
+ private class XAReaper extends TimerTask {
+ private final MulgaraTransaction transaction;
+ private final long txnDeadline;
+ private final long idleTimeout;
- public void run() {
- logger.debug("Idle-reaper running");
+ public XAReaper(MulgaraTransaction transaction, long txnDeadline, long idleTimeout, long lastActive) {
+ this.transaction = transaction;
+ this.txnDeadline = txnDeadline;
+ this.idleTimeout = idleTimeout;
- MulgaraTransaction txn;
- synchronized (getMutexLock()) {
- txn = writeTransaction;
+ if (lastActive <= 0)
+ lastActive = System.currentTimeMillis();
+ long nextWakeup = Math.min(txnDeadline, lastActive + idleTimeout);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Transaction-reaper created, txn=" + transaction + ", txnDeadline=" + txnDeadline +
+ ", idleTimeout=" + idleTimeout + ", nextWakeup=" + nextWakeup + ": " +
+ System.identityHashCode(getMutexLock()));
}
- long lastActive = (txn != null) ? txn.lastActive() : 0;
- try {
- if (txn != null) {
- if ((lastActive > 0) && (lastActive < System.currentTimeMillis() - idleTimeout)) {
- logger.warn("Reclaiming writelock from inactive transaction");
- txn.heuristicRollback("Transaction idle-timeout");
- } else {
- if (logger.isDebugEnabled())
- logger.debug("Transaction still active: " + lastActive + " time: " + System.currentTimeMillis() + " timeout: " + idleTimeout);
- }
- } else {
- logger.debug("No write-lock held.");
- }
- } catch (MulgaraTransactionException em) {
- logger.warn("Exception thrown while reclaiming writelock from inactive transaction");
- }
+ reaperTimer.schedule(this, new Date(nextWakeup));
}
- }
- private class ExtXAReaper extends TimerTask {
- private boolean die = false;
+ public void run() {
+ logger.debug("Transaction-reaper running, txn=" + transaction + ": " + System.identityHashCode(getMutexLock()));
- public ExtXAReaper(Timer timer, long timeoutMillis) {
- logger.debug("Extended-transaction-reaper created: " + System.identityHashCode(this));
- timer.schedule(this, timeoutMillis);
- }
+ long lastActive = transaction.lastActive();
+ long now = System.currentTimeMillis();
- public void run() {
- MulgaraTransaction txn;
synchronized (getMutexLock()) {
- txn = writeTransaction;
+ if (timeoutTasks.remove(transaction) == null)
+ return; // looks like we got cleaned up
+
+ if (now < txnDeadline && ((lastActive <= 0) || (now < lastActive + idleTimeout))) {
+ if (logger.isDebugEnabled())
+ logger.debug("Transaction still active: " + lastActive + " time: " + now + " idle-timeout: " + idleTimeout + " - rescheduling timer");
+
+ timeoutTasks.put(transaction, new XAReaper(transaction, txnDeadline, idleTimeout, lastActive));
+ return;
+ }
}
- try {
- if (txn != null) {
- logger.warn("Reclaiming writelock from over-extended transaction");
- txn.heuristicRollback("Transaction timed out");
+ final String txnType = (now >= txnDeadline) ? "over-extended" : "inactive";
+ final String toType = (now >= txnDeadline) ? "transaction" : "idle";
+
+ logger.warn("Rolling back " + txnType + " transaction");
+ new Thread(toType + "-timeout executor") {
+ public void run() {
+ try {
+ transaction.heuristicRollback(toType + "-timeout");
+ } catch (MulgaraTransactionException em) {
+ logger.warn("Exception thrown while rolling back " + txnType + " transaction");
+ }
}
- } catch (MulgaraTransactionException em) {
- logger.warn("Exception thrown while reclaiming writelock from over-extended transaction", em);
- }
+ }.start();
}
}
}
More information about the Mulgara-svn
mailing list