[Mulgara-svn] r121 - in branches/xafix-impl: . src/jar/resolver/java/org/mulgara/resolver

andrae at mulgara.org andrae at mulgara.org
Tue Oct 31 00:11:40 UTC 2006


Author: andrae
Date: 2006-10-30 18:11:40 -0600 (Mon, 30 Oct 2006)
New Revision: 121

Modified:
   branches/xafix-impl/log4j-conf.xml
   branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/BasicDatabaseSessionUnitTest.java
   branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseOperationContext.java
   branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java
   branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java
   branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/TransactionalAnswer.java
Log:
This is a repeat of revision 115, only hopefully this time in the correct
branch.  r115's message is included below:

 - r115:
Wraps TransactionalAnswer around SubqueryAnswer as per final plan.  This
introduces MulgaraTransaction into the execution path of Answers.  In light of
this, there is new sanity-checking code in MulgaraTransaction and
TransactionalAnswer ensuring that everything is always cleaned up properly.

One unconsidered gotcha uncovered thus far is the need to force Answers to be
closed when their enclosing session is closed.  Failure to do this is detected
via the finalize methods on TransactionalAnswer (TA) and MulgaraTransaction (MT).



Modified: branches/xafix-impl/log4j-conf.xml
===================================================================
--- branches/xafix-impl/log4j-conf.xml	2006-10-31 00:03:42 UTC (rev 120)
+++ branches/xafix-impl/log4j-conf.xml	2006-10-31 00:11:40 UTC (rev 121)
@@ -51,6 +51,12 @@
   </appender>
 
   <!-- (Insert category elements here) -->
+  <category name="org.mulgara.resolver.MulgaraTransaction">
+    <priority value="debug"/>
+  </category>
+  <category name="org.mulgara.resolver.TransactionalAnswer">
+    <priority value="debug"/>
+  </category>
 
   <!-- Default is to log messages of "warn" priority to the logfile appender -->
   <root>

Modified: branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/BasicDatabaseSessionUnitTest.java
===================================================================
--- branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/BasicDatabaseSessionUnitTest.java	2006-10-31 00:03:42 UTC (rev 120)
+++ branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/BasicDatabaseSessionUnitTest.java	2006-10-31 00:11:40 UTC (rev 121)
@@ -225,6 +225,7 @@
             new UnconstrainedAnswer()           // GIVEN
           )));
 
+
         // Compose the expected result of the query
         Answer expectedAnswer = new ArrayAnswer(
             new Variable[] { subjectVariable, predicateVariable, objectVariable },

Modified: branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseOperationContext.java
===================================================================
--- branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseOperationContext.java	2006-10-31 00:03:42 UTC (rev 120)
+++ branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseOperationContext.java	2006-10-31 00:11:40 UTC (rev 121)
@@ -143,6 +143,7 @@
   /** Symbolic transformations this instance should apply. */
   private final List symbolicTransformationList;
 
+  private WeakHashMap answers;  // Used as a set, all values are null.  Java doesn't provide a WeakHashSet.
   //
   // Constructor
   //
@@ -195,6 +196,7 @@
     // before the end of the transaction fix.
     this.outstandingAnswers         = outstandingAnswers;
     this.symbolicTransformationList = symbolicTransformationList;
+    this.answers                    = new WeakHashMap();
   }
 
   //
@@ -941,7 +943,10 @@
 
     // Complete the numerical phase of resolution
     Tuples tuples = localQuery.resolve();
-    result = new SubqueryAnswer(this, systemResolver, tuples, query.getVariableList());
+    MulgaraTransaction xa = new MulgaraTransaction(null, this);
+    result = new TransactionalAnswer(new MulgaraTransaction(null, this), new SubqueryAnswer(this, systemResolver, tuples, query.getVariableList()));
+    xa.tempDeactivate();  // FIXME: Only necessary while we introduce the manager.
+    answers.put(result, null);
     tuples.close();
     localQuery.close();
 
@@ -984,4 +989,14 @@
     mutableLocalQueryImpl.close();
   }
 
+  void close() throws QueryException {
+    try {
+      Iterator i = answers.keySet().iterator();
+      while (i.hasNext()) {
+        ((TransactionalAnswer)i.next()).sessionClose();
+      }
+    } catch (TuplesException et) {
+      throw new QueryException("Error force-closing answers", et);
+    }
+  }
 }

Modified: branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java
===================================================================
--- branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java	2006-10-31 00:03:42 UTC (rev 120)
+++ branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java	2006-10-31 00:11:40 UTC (rev 121)
@@ -204,6 +204,8 @@
    */
   private ContentHandlerManager contentHandlers;
 
+  private MulgaraTransactionManager manager;
+
   //
   // Constructor
   //
@@ -336,6 +338,8 @@
                                         outstandingAnswers,
                                         symbolicTransformationList
                                       );
+    // FIXME: This will eventually be passed as a parameter from Database.
+    this.manager                    = new MulgaraTransactionManager(null);
 
     if (logger.isDebugEnabled()) {
       logger.debug("Constructed DatabaseSession");
@@ -752,27 +756,31 @@
 
   public void close() throws QueryException {
     logger.info("Closing session");
-    if (!autoCommit) {
-      logger.warn("Closing session while holding write-lock");
+    try {
+      if (!autoCommit) {
+        logger.warn("Closing session while holding write-lock");
 
-      try {
-        resumeTransactionalBlock();
-      } catch (Throwable th) {
-        releaseWriteLock();
-        throw new QueryException("Error while resuming transaction in close", th);
-      }
+        try {
+          resumeTransactionalBlock();
+        } catch (Throwable th) {
+          releaseWriteLock();
+          throw new QueryException("Error while resuming transaction in close", th);
+        }
 
-      try {
-        rollbackTransactionalBlock(
-            new QueryException("Attempt to close session whilst in transaction"));
-      } finally {
-        endTransactionalBlock("Failed to release write-lock in close");
+        try {
+          rollbackTransactionalBlock(
+              new QueryException("Attempt to close session whilst in transaction"));
+        } finally {
+          endTransactionalBlock("Failed to release write-lock in close");
+        }
+      } else {
+        if (this.transaction != null) {
+          resumeTransactionalBlock();
+          endPreviousQueryTransaction();
+        }
       }
-    } else {
-      if (this.transaction != null) {
-        resumeTransactionalBlock();
-        endPreviousQueryTransaction();
-      }
+    } finally {
+      operationContext.close();
     }
   }
 
@@ -1393,4 +1401,7 @@
     }
   }
 
+  OperationContext getOperationContext() {
+    return operationContext;
+  }
 }

Modified: branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java
===================================================================
--- branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java	2006-10-31 00:03:42 UTC (rev 120)
+++ branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java	2006-10-31 00:11:40 UTC (rev 121)
@@ -80,6 +80,7 @@
   private Throwable rollbackCause;
 
   public MulgaraTransaction(MulgaraTransactionManager manager, OperationContext context) {
+    report("Creating Transaction");
     this.manager = manager;
     this.context = context;
 
@@ -96,12 +97,14 @@
 
 //    FIXME: need this added to context. Allows context to cleanup caches at end of transaction.
 //    this.transaction.enlistResource(context.getXAResource());
+    report("Created Transaction");
   }
 
   // FIXME: Not yet certain I have the error handling right here.
   // Need to clarify semantics and ensure the error conditions are 
   // properly handled.
   private synchronized void activate() throws MulgaraTransactionException {
+    report("Activating Transaction");
     if (currentThread == null) {
       currentThread = Thread.currentThread();
     } else if (!currentThread.equals(Thread.currentThread())) {
@@ -109,57 +112,84 @@
     }
 
     if (inuse == 0) {
-      try {
-        manager.transactionResumed(this);
-      } catch (Throwable th) {
-        logger.warn("Error resuming transaction: ", th);
-        failTransaction();
-        throw new MulgaraTransactionException("Error resuming transaction", th);
-      }
+      report("Resuming transaction");
+//      try {
+//        manager.transactionResumed(this);
+//      } catch (Throwable th) {
+//        logger.warn("Error resuming transaction: ", th);
+//        failTransaction();
+//        throw new MulgaraTransactionException("Error resuming transaction", th);
+//      }
     }
 
     inuse++;
+
+    report("Activated transaction");
   }
 
 
+  public synchronized void tempDeactivate() throws MulgaraTransactionException {
+    deactivate();
+  }
+
   // FIXME: Not yet certain I have the error handling right here.
   // Need to clarify semantics and ensure the error conditions are 
   // properly handled.
   private synchronized void deactivate() throws MulgaraTransactionException {
+    report("Deactivating transaction");
 
     inuse--;
 
     if (inuse < 0) {
-      throw implicitRollback(
-          new MulgaraTransactionException("Mismatched activate/deactivate.  inuse < 0: " + inuse));
+        throw new MulgaraTransactionException("Mismatched activate/deactivate.  inuse < 0: " + inuse);
+//      throw implicitRollback(
+//          new MulgaraTransactionException("Mismatched activate/deactivate.  inuse < 0: " + inuse));
+    } else if (using < 0) {
+        throw new MulgaraTransactionException("Reference Failure.  using < 0: " + using);
     }
 
     if (inuse == 0) {
       if (using == 0) {
+        report("Completing Transaction");
         // END TRANSACTION HERE.  But commit might fail.
-        manager.transactionComplete(this);
+//        manager.transactionComplete(this);
+          manager = null;
+          transaction = null;
       } else {
+        report("Suspending Transaction");
         // What happens if suspend fails?
         // Rollback and terminate transaction.
         // JTA isn't entirely unambiguous as to the long-term stability of the original
         // transaction object - can suspend return a new object?
-        this.transaction = manager.transactionSuspended(this);
+//        this.transaction = manager.transactionSuspended(this);
       }
       currentThread = null;
     }
+    report("Deactivated Transaction");
   }
 
   // Do I want to check for currentThread here?  Do I want a seperate check() method to 
   // cover precondition checks against currentThread?
-  void reference() {
+  void reference() throws MulgaraTransactionException {
+    report("Referencing Transaction");
+    if (inuse < 1) {
+        throw new MulgaraTransactionException("Mismatched activate/deactivate.  inuse < 1: " + inuse);
+    } else if (using < 0) {
+        throw new MulgaraTransactionException("Reference Failure.  using < 0: " + using);
+    }
     using++;
+    report("Referenced Transaction");
   }
 
   void dereference() throws MulgaraTransactionException {
+    report("Dereferencing Transaction");
+    if (inuse < 1) {
+        throw new MulgaraTransactionException("Mismatched activate/deactivate.  inuse < 1: " + inuse);
+    } else if (using < 1) {
+        throw new MulgaraTransactionException("Reference Failure.  using < 1: " + using);
+    }
     using--;
-    if (using < 0) {
-      throw implicitRollback(new MulgaraTransactionException("ERROR: Transaction dereferenced more times than referenced!"));
-    }
+    report("Dereferenced Transaction");
   }
 
   void execute(Operation operation,
@@ -182,15 +212,22 @@
   /** Should rename this 'wrap' */
   AnswerOperationResult execute(AnswerOperation ao) throws TuplesException {
 //    FIXME: activate/deactivate won't work until we have MTMgr operational.
-//    activate();
+    report("Executing Operation");
     try {
-      ao.execute();
-      return ao.getResult();
-    } catch (Throwable th) {
-      throw new TuplesException("Error accessing Answer", th);
-//      throw implicitRollback(th);
+      activate();
+      try {
+        ao.execute();
+        return ao.getResult();
+      } catch (Throwable th) {
+        throw new TuplesException("Error accessing Answer", th);
+  //      throw implicitRollback(th);
+      } finally {
+        deactivate();
+      }
+    } catch (MulgaraTransactionException em) {
+      throw new TuplesException("Transaction error", em);
     } finally {
-//      deactivate();
+      report("Executed Operation");
     }
   }
 
@@ -230,7 +267,7 @@
     } catch (Exception e) {
       throw new MulgaraTransactionException("Error while trying to commit", e);
     } finally {
-      manager.transactionComplete(this);
+//      manager.transactionComplete(this);
     }
   }
 
@@ -254,4 +291,21 @@
   protected Transaction getTransaction() {
     return transaction;
   }
+
+  protected void finalize() {
+    report("GC-finalize");
+    if (inuse != 0 || using != 0) {
+      logger.error("Referernce counting error in transaction, inuse=" + inuse + ", using=" + using);
+    }
+    if (manager != null || transaction != null) {
+      logger.error("Transaction not terminated properly");
+    }
+  }
+
+  private void report(String desc) {
+    if (logger.isInfoEnabled()) {
+      logger.info(desc + ": " + System.identityHashCode(this) +
+          ", inuse=" + inuse + ", using=" + using);
+    }
+  }
 }

Modified: branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/TransactionalAnswer.java
===================================================================
--- branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/TransactionalAnswer.java	2006-10-31 00:03:42 UTC (rev 120)
+++ branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/TransactionalAnswer.java	2006-10-31 00:11:40 UTC (rev 121)
@@ -17,6 +17,10 @@
 
 package org.mulgara.resolver;
 
+// Third party packages
+import org.apache.log4j.Logger;
+
+// Local packages
 import org.mulgara.query.Answer;
 import org.mulgara.query.TuplesException;
 import org.mulgara.query.Variable;
@@ -45,15 +49,26 @@
  */
 
 public class TransactionalAnswer implements Answer {
+  /** Logger.  */
+  private static final Logger logger =
+    Logger.getLogger(MulgaraTransaction.class.getName());
 
   private Answer answer;
 
   private MulgaraTransaction transaction;
 
-  public TransactionalAnswer(MulgaraTransaction transaction, Answer answer) {
-    this.answer = answer;
-    this.transaction = transaction;
-    transaction.reference();
+  public TransactionalAnswer(MulgaraTransaction transaction, Answer answer) throws TuplesException {
+    try {
+      report("Creating Answer");
+
+      this.answer = answer;
+      this.transaction = transaction;
+      transaction.reference();
+
+      report("Created Answer");
+    } catch (MulgaraTransactionException em) {
+      throw new TuplesException("Failed to associate with transaction", em);
+    }
   }
 
   public Object getObject(final int column) throws TuplesException {
@@ -81,16 +96,24 @@
   }
 
   public void close() throws TuplesException {
-    transaction.execute(new AnswerOperation() {
-        public void execute() throws TuplesException {
-          answer.close();
-          try {
-            transaction.dereference();
-          } catch (MulgaraTransactionException em) {
-            throw new TuplesException("Error dereferencing transaction", em);
+    report("Closing Answer");
+    try {
+      transaction.execute(new AnswerOperation() {
+          public void execute() throws TuplesException {
+            answer.close();
+            try {
+              transaction.dereference();
+            } catch (MulgaraTransactionException em) {
+              throw new TuplesException("Error dereferencing transaction", em);
+            }
           }
-        }
-      });
+        });
+    } finally {
+      // !!FIXME: Note - We will need to add checks for null to all operations.
+      transaction = null;
+      answer = null;    // Note this permits the gc of the answer.
+      report("Closed Answer");
+    }
   }
 
   public int getColumnIndex(final Variable column) throws TuplesException {
@@ -174,6 +197,29 @@
       return c;
     } catch (CloneNotSupportedException ec) {
       throw new IllegalStateException("Clone failed on Cloneable");
+    } catch (MulgaraTransactionException em) {
+      throw new IllegalStateException("Failed to associate with transaction", em);
     }
   }
+
+  private void report(String desc) {
+    if (logger.isInfoEnabled()) {
+      logger.info(desc + ": " + System.identityHashCode(this) + ", xa=" + System.identityHashCode(transaction));
+    }
+  }
+
+  public void finalize() {
+    report("GC-finalizing");
+    if (transaction != null) {
+      logger.error("TransactionalAnswer not closed");
+    }
+  }
+
+
+  void sessionClose() throws TuplesException {
+    if (answer != null) {
+      report("Session forced close");
+      close();
+    }
+  }
 }




More information about the Mulgara-svn mailing list