[Mulgara-svn] r955 - branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver

andrae at mulgara.org andrae at mulgara.org
Wed May 21 06:51:35 UTC 2008


Author: andrae
Date: 2008-05-20 23:51:34 -0700 (Tue, 20 May 2008)
New Revision: 955

Modified:
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
Log:
Implements a LockRecovery task assoicated with the TransactionManager, involved in recovering the write-lock
if a session fails to use the write-transaction within a given timeout.

Untested, committed to allow testing.



Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java	2008-05-21 06:13:27 UTC (rev 954)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java	2008-05-21 06:51:34 UTC (rev 955)
@@ -68,6 +68,7 @@
   private boolean hRollback;
   private int heurCode;
   private boolean rollback;
+  private long lastActive;
 
   MulgaraExternalTransaction(MulgaraExternalTransactionFactory factory, Xid xid, DatabaseOperationContext context)
       throws QueryException {
@@ -85,6 +86,7 @@
     this.hRollback = false;
     this.heurCode = 0;
     this.rollback = false;
+    this.lastActive = System.currentTimeMillis();
 
     this.context.initiate(this);
   }
@@ -128,9 +130,14 @@
 
   public void execute(Operation operation, DatabaseMetadata metadata) throws MulgaraTransactionException {
     try {
+      long la = lastActive;
+      lastActive = -1;
+
       operation.execute(context,
                         context.getSystemResolver(),
                         metadata);
+
+      lastActive = (la != -1) ? System.currentTimeMillis() : -1;
     } catch (Throwable th) {
       try {
         rollback(xid);
@@ -143,7 +150,13 @@
 
   public AnswerOperationResult execute(AnswerOperation ao) throws TuplesException {
     try {
+      long la = lastActive;
+      lastActive = -1;
+
       ao.execute();
+
+      lastActive = (la != -1) ? System.currentTimeMillis() : -1;
+
       return ao.getResult();
     } catch (Throwable th) {
       try {
@@ -158,7 +171,12 @@
 
   // FIXME: See if we can't rearrange things to allow this to be deleted.
   public void execute(TransactionOperation to) throws MulgaraTransactionException {
+    long la = lastActive;
+    lastActive = -1;
+
     to.execute();
+
+    lastActive = (la != -1) ? System.currentTimeMillis() : -1;
   }
 
   public void enlist(EnlistableResource enlistable) throws MulgaraTransactionException {
@@ -180,12 +198,18 @@
     }
   }
 
+  public long lastActive() {
+    return lastActive;
+  }
+
   //
   // Methods used to manage transaction from XAResource.
   //
 
   void commit(Xid xid) throws XAException {
     report("commit");
+    lastActive = -1;
+
     // FIXME: Consider the possiblity prepare failed, or was incomplete.
     for (EnlistableResource er : prepared) {
       xaResources.get(er).commit(xid, false);
@@ -212,11 +236,14 @@
 
   void prepare(Xid xid) throws XAException {
     report("prepare");
+    long la = lastActive;
+    lastActive = -1;
+
     for (EnlistableResource er : enlisted) {
       xaResources.get(er).prepare(xid);
       prepared.add(er);
     }
-    // status = PREPARED; ?
+    lastActive = (la != -1) ? System.currentTimeMillis() : -1;
   }
 
   /**
@@ -225,6 +252,7 @@
    */
   void rollback(Xid xid) throws XAException {
     report("rollback");
+    lastActive = -1;
     try {
       rollback = true;
       Map<EnlistableResource, XAException> rollbackFailed = new HashMap<EnlistableResource, XAException>();

Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java	2008-05-21 06:13:27 UTC (rev 954)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java	2008-05-21 06:51:34 UTC (rev 955)
@@ -94,9 +94,10 @@
       }
 
       if (write) {
+          final MulgaraTransactionFactory This = this; // Required so inner-class can close over it.
           runWithoutMutex(new TransactionOperation() {
             public void execute() throws MulgaraTransactionException {
-              manager.obtainWriteLock(session);
+              manager.obtainWriteLock(session, This);
             }
           });
         try {
@@ -228,16 +229,4 @@
       releaseMutex();
     }
   }
-
-  void abortWriteTransaction() throws MulgaraTransactionException {
-    acquireMutex();
-    try {
-      if (writeTransaction != null) {
-        writeTransaction.abortTransaction("Explicit abort requested by write-lock manager", new Throwable());
-        writeTransaction = null;
-      }
-    } finally {
-      releaseMutex();
-    }
-  }
 }

Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java	2008-05-21 06:13:27 UTC (rev 954)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java	2008-05-21 06:51:34 UTC (rev 955)
@@ -87,6 +87,7 @@
   private Thread currentThread;
 
   private ReentrantLock activationMutex;
+  private long deactivateTime;
 
   private State state;
   private int inuse;
@@ -114,6 +115,7 @@
       using = 0;
       state = State.CONSTRUCTEDUNREF;
       rollbackCause = null;
+      deactivateTime = 0;
     } finally {
       report("Finished Creating Transaction");
     }
@@ -130,6 +132,7 @@
       }
       
       acquireActivationMutex();
+      deactivateTime = -1;
       try {
         switch (state) {
           case CONSTRUCTEDUNREF:
@@ -199,6 +202,7 @@
       }
       
       acquireActivationMutex();
+      deactivateTime = System.currentTimeMillis();
       try {
         switch (state) {
           case ACTUNREF:
@@ -659,6 +663,10 @@
     }
   }
 
+  public long lastActive() {
+    return deactivateTime;
+  }
+
   //
   // Used internally
   //

Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java	2008-05-21 06:13:27 UTC (rev 954)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java	2008-05-21 06:51:34 UTC (rev 955)
@@ -94,10 +94,11 @@
 
       try {
         MulgaraInternalTransaction transaction;
+        final MulgaraTransactionFactory This = this; // Required so inner-class can close over it.
         if (write) {
           runWithoutMutex(new TransactionOperation() {
             public void execute() throws MulgaraTransactionException {
-              manager.obtainWriteLock(session);
+              manager.obtainWriteLock(session, This);
             }
           });
           try {
@@ -408,16 +409,4 @@
       logger.warn("Unable to set transaction timeout: " + transactionTimeout, es);
     }
   }
-
-  void abortWriteTransaction() throws MulgaraTransactionException {
-    acquireMutex();
-    try {
-      if (writeTransaction != null) {
-        writeTransaction.abortTransaction("Explicit abort requested by write-lock manager", new Throwable());
-        writeTransaction = null;
-      }
-    } finally {
-      releaseMutex();
-    }
-  }
 }

Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java	2008-05-21 06:13:27 UTC (rev 954)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java	2008-05-21 06:51:34 UTC (rev 955)
@@ -90,4 +90,11 @@
    * abort().
    */
   public void enlist(EnlistableResource enlistable) throws MulgaraTransactionException;
+
+  /**
+   * Returns the currentTimeMillis when the transaction last used;
+   *   -1 if it is currently in use; or
+   *    0 if it has never been used.
+   */
+  public long lastActive();
 }

Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java	2008-05-21 06:13:27 UTC (rev 954)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java	2008-05-21 06:51:34 UTC (rev 955)
@@ -173,7 +173,39 @@
     }
   }
 
+
+  /** 
+   * Return time in millis when the current write transaction last activated.
+   */
+  public long getLastActiveHoldingLock() {
+    acquireMutex();
+    try {
+      if (writeTransaction != null) {
+        return writeTransaction.lastActive();
+      } else {
+        return 0;
+      }
+    } finally {
+      releaseMutex();
+    }
+  }
+
   /**
+   * Abort the current write Transaction.
+   */
+  public void abortWriteTransaction() throws MulgaraTransactionException {
+    acquireMutex();
+    try {
+      if (writeTransaction != null) {
+        writeTransaction.abortTransaction("Explicit abort requested by write-lock manager", new Throwable());
+        writeTransaction = null;
+      }
+    } finally {
+      releaseMutex();
+    }
+  }
+
+  /**
    * Abort as many of the transactions as we can.
    */
   protected void abortTransactions(Map<MulgaraTransaction, Throwable> requiresAbort) {

Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java	2008-05-21 06:13:27 UTC (rev 954)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java	2008-05-21 06:51:34 UTC (rev 955)
@@ -22,6 +22,8 @@
 package org.mulgara.resolver;
 
 // Java2 packages
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -66,6 +68,7 @@
 
   // Write lock is associated with a session.
   private DatabaseSession sessionHoldingWriteLock;
+  private MulgaraTransactionFactory factoryWithWriteTransaction;
 
   // Used to support write-lock reservation.
   private DatabaseSession sessionReservingWriteLock;
@@ -77,14 +80,21 @@
   private final MulgaraInternalTransactionFactory internalFactory;
   private final MulgaraExternalTransactionFactory externalFactory;
 
+  private final Timer reaperTimer;
+  private final TimerTask reaperTask;
+
   public MulgaraTransactionManager(TransactionManagerFactory transactionManagerFactory) {
     this.sessionHoldingWriteLock = null;
     this.sessionReservingWriteLock = null;
+    this.factoryWithWriteTransaction = null;
     this.mutex = new ReentrantLock();
     this.writeLockCondition = this.mutex.newCondition();
 
     this.internalFactory = new MulgaraInternalTransactionFactory(this, transactionManagerFactory);
     this.externalFactory = new MulgaraExternalTransactionFactory(this);
+
+    this.reaperTimer = new Timer("Write-lock Reaper", true);
+    this.reaperTask = new LockReaper(reaperTimer, 900000); // 15 minutes inactivity.
   }
 
 
@@ -100,7 +110,7 @@
   /** 
    * Obtains the write lock.
    */
-  void obtainWriteLock(DatabaseSession session) throws MulgaraTransactionException {
+  void obtainWriteLock(DatabaseSession session, MulgaraTransactionFactory factory) throws MulgaraTransactionException {
     acquireMutex();
     try {
       if (sessionHoldingWriteLock == session) {
@@ -119,6 +129,7 @@
         logger.debug("Obtaining write lock", new Throwable());
       }
       sessionHoldingWriteLock = session;
+      factoryWithWriteTransaction = factory;
     } finally {
       releaseMutex();
     }
@@ -147,6 +158,7 @@
         logger.debug("Releasing writelock", new Throwable());
       }
       sessionHoldingWriteLock = null;
+      factoryWithWriteTransaction = null;
       writeLockCondition.signal();
     } finally {
       releaseMutex();
@@ -279,4 +291,92 @@
       releaseMutex();
     }
   }
+
+  class LockReaper extends TimerTask {
+    private final long MIN_PERIOD = 30000;
+
+    private Timer timer;
+    protected long timeoutMillis;
+
+    private boolean scheduled;
+
+    public LockReaper(Timer timer, long timeoutMillis) {
+      this.timer = timer;
+      this.timeoutMillis = (timeoutMillis > MIN_PERIOD) ? timeoutMillis : MIN_PERIOD; 
+      this.scheduled = false;
+      schedule(timeoutMillis);
+    }
+
+    public void run() {
+      logger.warn("Lock-reaper running");
+      acquireMutex();
+      scheduled = false;
+      boolean scheduleDefault;
+      try {
+        long lastActive = factoryWithWriteTransaction.getLastActiveHoldingLock();
+        if ((lastActive > 0) && (lastActive < System.currentTimeMillis() - timeoutMillis)) {
+          logger.warn("Reclaiming writelock from inactive transaction");
+          factoryWithWriteTransaction.abortWriteTransaction();
+          schedule(timeoutMillis);
+        } else {
+          schedule(timeoutMillis - (System.currentTimeMillis() - lastActive));
+        }
+      } catch (MulgaraTransactionException em) {
+        logger.warn("Exception thrown while reclaiming writelock from inactive transaction");
+      } finally {
+        try {
+          if (!scheduled) schedule(timeoutMillis);
+        } finally {
+          releaseMutex();
+        }
+      }
+    }
+
+    public void setTimeout(long timeout) {
+      acquireMutex();
+      try {
+        this.cancel();
+
+        timeoutMillis = (timeoutMillis > MIN_PERIOD) ? timeoutMillis : MIN_PERIOD; 
+
+        if (factoryWithWriteTransaction == null) {
+          schedule(timeoutMillis);
+
+          return;
+        }
+
+        long lastActive = factoryWithWriteTransaction.getLastActiveHoldingLock();
+        if (lastActive > 0) {
+          schedule(timeoutMillis - (System.currentTimeMillis() - lastActive));
+        } else {
+          schedule(timeoutMillis);
+        }
+      } finally {
+        try {
+          if (!scheduled) schedule(timeoutMillis);
+        } finally {
+          releaseMutex();
+        }
+      }
+    }
+
+    public long getTimeout() {
+      acquireMutex();
+      try {
+        return timeoutMillis;
+      } finally {
+        try {
+          if (!scheduled) schedule(timeoutMillis);
+        } finally {
+          releaseMutex();
+        }
+      }
+    }
+
+    private void schedule(long timeout) {
+      logger.warn("Scheduling lock-reaper to run in: " + timeout + "ms");
+      timer.schedule(this, (timeout < MIN_PERIOD) ? MIN_PERIOD : timeout);
+      scheduled = true;
+    }
+  }
 }




More information about the Mulgara-svn mailing list