[Mulgara-svn] r100 - in branches/xafix/src/jar: resolver/java/org/mulgara/resolver resolver-spi/java/org/mulgara/resolver/spi

andrae at mulgara.org andrae at mulgara.org
Fri Oct 13 11:44:41 UTC 2006


Author: andrae
Date: 2006-10-13 06:44:40 -0500 (Fri, 13 Oct 2006)
New Revision: 100

Added:
   branches/xafix/src/jar/resolver/java/org/mulgara/resolver/PreallocateOperation.java
Removed:
   branches/xafix/src/jar/resolver-spi/java/org/mulgara/resolver/spi/LocalSession.java
Modified:
   branches/xafix/src/jar/resolver/java/org/mulgara/resolver/AnswerDatabaseSession.java
   branches/xafix/src/jar/resolver/java/org/mulgara/resolver/DatabaseOperationContext.java
   branches/xafix/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java
   branches/xafix/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java
   branches/xafix/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
   branches/xafix/src/jar/resolver/java/org/mulgara/resolver/SubqueryAnswer.java
   branches/xafix/src/jar/resolver/java/org/mulgara/resolver/SubqueryAnswerUnitTest.java
Log:
This is most of the integration work required to bring the new transaction architecture into Mulgara.

There will be substantial work required to bring JRDF inline with the new architecture.
If Jena complains (which it probably will), I'll simply delete it.

Note: There are unanswered questions about the precise behaviour of the session caches, and how to properly 
clear/writeback them on transaction commit.

The enlistedResolverMap will also need to be cleared at the end of transaction, but how that happens I'm not sure.



Modified: branches/xafix/src/jar/resolver/java/org/mulgara/resolver/AnswerDatabaseSession.java
===================================================================
--- branches/xafix/src/jar/resolver/java/org/mulgara/resolver/AnswerDatabaseSession.java	2006-10-11 12:35:42 UTC (rev 99)
+++ branches/xafix/src/jar/resolver/java/org/mulgara/resolver/AnswerDatabaseSession.java	2006-10-13 11:44:40 UTC (rev 100)
@@ -59,19 +59,4 @@
    * @throws QueryException if <var>query</var> can't be answered
    */
   public Answer innerQuery(Query query) throws QueryException;
-
-  /**
-   * Answers are registered so that the session can ensure that they are closed.
-   *
-   * @param answer the answer to register with this session.
-   */
-  public void registerAnswer(SubqueryAnswer answer);
-
-  /**
-   * Answers are deregistered to indicate that they have closed themselves.
-   *
-   * @param answer the answer to deregister.
-   * @throws QueryException if the transactional block for this answer fails.
-   */
-  public void deregisterAnswer(SubqueryAnswer answer) throws QueryException;
 }

Modified: branches/xafix/src/jar/resolver/java/org/mulgara/resolver/DatabaseOperationContext.java
===================================================================
--- branches/xafix/src/jar/resolver/java/org/mulgara/resolver/DatabaseOperationContext.java	2006-10-11 12:35:42 UTC (rev 99)
+++ branches/xafix/src/jar/resolver/java/org/mulgara/resolver/DatabaseOperationContext.java	2006-10-13 11:44:40 UTC (rev 100)
@@ -141,7 +141,6 @@
                            Set                cachedResolverFactorySet,
                            Set                changedCachedModelSet,
                            DatabaseSession    databaseSession,
-                           Map                enlistedResolverMap,
                            Map                externalResolverFactoryMap,
                            Map                internalResolverFactoryMap,
                            DatabaseMetadata   metadata,
@@ -168,7 +167,7 @@
     this.cachedResolverFactorySet   = cachedResolverFactorySet;
     this.changedCachedModelSet      = changedCachedModelSet;
     this.databaseSession            = databaseSession;
-    this.enlistedResolverMap        = enlistedResolverMap;
+    this.enlistedResolverMap        = new HashMap(); // This will need to be fixed.
     this.externalResolverFactoryMap = externalResolverFactoryMap;
     this.internalResolverFactoryMap = internalResolverFactoryMap;
     this.metadata                   = metadata;

Modified: branches/xafix/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java
===================================================================
--- branches/xafix/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java	2006-10-11 12:35:42 UTC (rev 99)
+++ branches/xafix/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java	2006-10-13 11:44:40 UTC (rev 100)
@@ -91,9 +91,6 @@
   private static final Logger symbolicLogger =
     Logger.getLogger(DatabaseSession.class.getName() + "#symbolic");
 
-  private static DatabaseSession writeSession = null;
-  private boolean writing;
-
   /**
    * The models from external resolvers which have been cached as temporary
    * models.
@@ -158,56 +155,13 @@
   /** Factory used to obtain the SystemResolver */
   private final ResolverFactory temporaryResolverFactory;
 
-  /** Source of transactions.  */
-  private final TransactionManager transactionManager;
-
-  /** Session transaction */
-  private Transaction transaction;
-
-  /** The related query handler */
-  private RelatedQueryHandler relatedQueryHandler;
-
   /** The name of the rule loader to use */
   private String ruleLoaderClassName;
 
   /** A fallback rule loader */
   private static final String DUMMY_RULE_LOADER = "org.mulgara.rules.DummyRuleLoader";
 
-  private int opState;
-  private static final int UNINIT = 0;
-  private static final int BEGIN = 1;
-  private static final int RESUME = 2;
-  private static final int FINISH = 3;
-  private static final String[] opStates = {
-      "UNINIT", "BEGIN", "RESUME", "FINISH", };
-
-  private final Map enlistedResolverMap;
-
-  private Set outstandingAnswers;
-
   /**
-   * Whether each method call of the {@link Session} interface should
-   * implicitly have a transaction created for it and be performed within that
-   * transaction.
-   *
-   * This defaults to <code>true</code> until modified by the
-   * {@link #setAutoCommit} method.
-   */
-  private boolean autoCommit = true;
-  private boolean inFailedTransaction = false;
-
-  /**
-   * If a transaction is marked for rollback by the
-   * {@link #rollbackTransactionalBlock} method, this field holds the exception
-   * that caused the rollback so that the {@link #endTransactionalBlock} method
-   * can add it as the cause of the {@link RollbackException} it will
-   * subsequently throw.
-   */
-  private Throwable rollbackCause = null;
-
-  private boolean explicitRollback = false;
-
-  /**
    * The registered {@link ContentHandler} instances.
    */
   private ContentHandlerManager contentHandlers;
@@ -322,10 +276,6 @@
         //get appropriate constructor
         Constructor constructor = storeClass.getConstructor(new Class[] {
             Session.class });
-
-        //instantiate
-        relatedQueryHandler = (RelatedQueryHandler) constructor.newInstance(
-            new Object[] { this });
       }
     }
     catch (Exception e) {
@@ -351,14 +301,11 @@
 
     this.outstandingAnswers         = new HashSet();
     this.transaction                = null;
-    this.enlistedResolverMap        = new HashMap();
-    this.opState                    = FINISH;
     this.operationContext           = new DatabaseOperationContext(
                                         cachedModelSet,
                                         cachedResolverFactorySet,
                                         changedCachedModelSet,
                                         this,
-                                        enlistedResolverMap,
                                         externalResolverFactoryMap,
                                         internalResolverFactoryMap,
                                         metadata,
@@ -484,24 +431,7 @@
    * @throws QueryException if the local node number can't be obtained
    */
   long preallocate(Node node) throws QueryException {
-    // Find or create the node
-    startTransactionalOperation(true);
-    try {
-      long localNode = systemResolver.localizePersistent(node);
-
-      // Create a statement linking the node to the graph so it's never reaped
-      systemResolver.modifyModel(metadata.getPreallocationModelNode(),
-          new SingletonStatements(metadata.getPreallocationSubjectNode(),
-          metadata.getPreallocationPredicateNode(),
-          localNode),
-          true);
-      return localNode;
-    } catch (Throwable e) {
-      rollbackTransactionalBlock(e);
-    } finally {
-      finishTransactionalOperation("Could not preallocate " + node);
-    }
-    throw new QueryException("Illegal transactional state in session");
+    execute(new PreallocateOperation(node), "Failure to preallocated " + node);
   }
 
   //
@@ -554,38 +484,18 @@
   }
 
   public void close() throws QueryException {
-    logger.info("Closing session");
-    if (!autoCommit) {
-      logger.warn("Closing session while holding write-lock");
-
-      try {
-        resumeTransactionalBlock();
-      } catch (Throwable th) {
-        releaseWriteLock();
-        throw new QueryException("Error while resuming transaction in close", th);
-      }
-
-      try {
-        rollbackTransactionalBlock(
-            new QueryException("Attempt to close session whilst in transaction"));
-      } finally {
-        endTransactionalBlock("Failed to release write-lock in close");
-      }
-    } else {
-      if (this.transaction != null) {
-        resumeTransactionalBlock();
-        endPreviousQueryTransaction();
-      }
+    try {
+      transactionManager.closingSession(this);
+    } catch (MulgaraTransactionException em) {
+      throw new QueryException("Error closing session", em);
     }
   }
 
   public void commit() throws QueryException {
-    logger.info("Committing transaction");
-    if (!autoCommit) {
-      synchronized (DatabaseSession.class) {
-        setAutoCommit(true);
-        setAutoCommit(false);
-      }
+    try {
+      transactionManager.commit(this);
+    } catch (MulgaraTransactionException em) {
+      throw new QueryException("Commit Failed", em);
     }
   }
 
@@ -682,10 +592,6 @@
       logger.debug("Login of " + user + " to " + securityDomain);
     }
 
-    /*
-    execute(new LoginOperation(securityDomain, user, password),
-            "Unable to login " + user + " to " + securityDomain);
-    */
     if (securityDomain.equals(metadata.getSecurityDomainURI())) {
       // Propagate the login event to the security adapters
       for (Iterator i = securityAdapterList.iterator(); i.hasNext();) {
@@ -699,138 +605,36 @@
       logger.info("QUERY: " + query);
     }
 
-    // Evaluate the query
-    QueryOperation queryOperation = new QueryOperation(query, this);
-    executeQuery(queryOperation);
-    return queryOperation.getAnswer();
+    return execute(new QueryOperation(query, this)).getAnswer();
   }
 
   public Answer innerQuery(Query query) throws QueryException {
-    // Validate "query" parameter
-    if (query == null) {
-      throw new IllegalArgumentException("Null \"query\" parameter");
-    }
-
-    if (logger.isInfoEnabled()) {
-      logger.info("Query: " + query);
-    }
-
-    boolean resumed;
-
-    if (this.transaction != null) {
-      resumeTransactionalBlock();
-      resumed = true;
-    } else {
-      resumed = false;
-    }
-
-    Answer result = null;
-    try {
-      result = doQuery(this, systemResolver, query);
-    } catch (Throwable th) {
-      try {
-        logger.warn("Inner Query failed", th);
-        rollbackTransactionalBlock(th);
-      } finally {
-        endPreviousQueryTransaction();
-        logger.error("Inner Query should have thrown exception", th);
-        throw new IllegalStateException(
-            "Inner Query should have thrown exception");
-      }
-    }
-
-    try {
-      if (resumed) {
-        suspendTransactionalBlock();
-      }
-
-      return result;
-    } catch (Throwable th) {
-      endPreviousQueryTransaction();
-      logger.error("Failed to suspend Transaction", th);
-      throw new QueryException("Failed to suspend Transaction");
-    }
+    return doQuery(this, systemResolver, query);
   }
 
   Tuples innerCount(LocalQuery localQuery) throws QueryException {
-    // Validate "query" parameter
-    if (localQuery == null) {
-      throw new IllegalArgumentException("Null \"query\" parameter");
-    }
+    LocalQuery lq = (LocalQuery)localQuery.clone();
+    transform(this, lq);
+    Tuples result = lq.resolve();
+    lq.close();
 
-    if (logger.isInfoEnabled()) {
-      logger.info("Inner Count: " + localQuery);
-    }
-
-    boolean resumed;
-
-    if (this.transaction != null) {
-      resumeTransactionalBlock();
-      resumed = true;
-    } else {
-      resumed = false;
-    }
-
-    Tuples result = null;
-    try {
-      LocalQuery lq = (LocalQuery)localQuery.clone();
-      transform(this, lq);
-      result = lq.resolve();
-      lq.close();
-    } catch (Throwable th) {
-      try {
-        logger.warn("Inner Query failed", th);
-        rollbackTransactionalBlock(th);
-      } finally {
-        endPreviousQueryTransaction();
-        logger.error("Inner Query should have thrown exception", th);
-        throw new IllegalStateException(
-            "Inner Query should have thrown exception");
-      }
-    }
-
-    try {
-      if (resumed) {
-        suspendTransactionalBlock();
-      }
-
-      return result;
-    } catch (Throwable th) {
-      endPreviousQueryTransaction();
-      logger.error("Failed to suspend Transaction", th);
-      throw new QueryException("Failed to suspend Transaction");
-    }
+    return result;
   }
 
   static Answer doQuery(DatabaseSession databaseSession,
                         SystemResolver  systemResolver,
                         Query           query) throws Exception
   {
-    Answer result;
+    LocalQuery localQuery = new LocalQuery(query, systemResolver, databaseSession);
 
-    // If a related query, do not enter into a transaction as we can't have
-    // nested transactions.
-    if (query.getRelated() != null) {
-      RelatedExpression related = query.getRelated();
-      result = databaseSession.relatedQueryHandler.related(((Node) related.getBaseNode()),
-          new Query[] {query}, related.getLimit(), related.getThresholdValue());
-    } else {
-      LocalQuery localQuery =
-        new LocalQuery(query, systemResolver, databaseSession);
+    transform(databaseSession, localQuery);
 
-      transform(databaseSession, localQuery);
+    Tuples tuples = localQuery.resolve();
+    Answer result = new TransactionalAnswer(
+        new SubqueryAnswer(databaseSession, systemResolver, tuples, query.getVariableList()));
+    tuples.close();
+    localQuery.close();
 
-      // Complete the numerical phase of resolution
-      Tuples tuples = localQuery.resolve();
-      result = new TransactionalAnswer(
-          new SubqueryAnswer(databaseSession, systemResolver, tuples, query.getVariableList()));
-      tuples.close();
-      localQuery.close();
-    }
-
-    if (logger.isDebugEnabled()) {
-      logger.debug("Answer rows = " + result.getRowCount());
-    }
     return result;
   }
 
@@ -868,84 +672,11 @@
     mutableLocalQueryImpl.close();
   }
 
-  private void endPreviousQueryTransaction() throws QueryException {
-    logger.debug("Clearing previous transaction");
 
-    // Save the exception.
-    Throwable tmpThrowable = rollbackCause;
-
-    Set answers = new HashSet(outstandingAnswers);
-    Iterator i = answers.iterator();
-    while (i.hasNext()) {
-      try {
-        SubqueryAnswer s = (SubqueryAnswer) i.next();
-        deregisterAnswer(s);
-        //Do not close tuples - for Jena and JRDF.
-        //s.close();
-      } catch (Throwable th) {
-        logger.debug("Failed to close preexisting answer", th);
-      }
-    }
-
-    // If by closing the answer we have called endTransactionalBlock then
-    // throw the saved exception.
-    if ((rollbackCause == null) && (tmpThrowable != null) &&
-        (systemResolver == null)) {
-      throw new QueryException("Failure ending previous query", tmpThrowable);
-    }
-
-    try {
-      if (!outstandingAnswers.isEmpty()) {
-        throw new QueryException("Failed to clear preexisting transaction");
-      }
-      if (this.transaction != null) {
-        throw new QueryException("Failed to void suspended transaction");
-      }
-      if (transactionManager.getTransaction() != null) {
-        throw new QueryException("Failed to end transaction");
-      }
-    } catch (QueryException eq) {
-      endTransactionalBlock("Error ending previous query");
-      throw eq;
-    } catch (Throwable th) {
-      endTransactionalBlock("Error ending previous query");
-      throw new QueryException("Failure ending previous query", th);
-    }
-  }
-
-  public void registerAnswer(SubqueryAnswer answer) {
-    if (logger.isDebugEnabled()) {
-      logger.debug("registering Answer: " + System.identityHashCode(answer));
-    }
-    outstandingAnswers.add(answer);
-  }
-
-  public void deregisterAnswer(SubqueryAnswer answer) throws QueryException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("deregistering Answer: " + System.identityHashCode(answer));
-    }
-
-    if (!outstandingAnswers.contains(answer)) {
-      logger.info("Stale answer being closed");
-    } else {
-      outstandingAnswers.remove(answer);
-      if (autoCommit && outstandingAnswers.isEmpty()) {
-        if (transaction != null) {
-          resumeTransactionalBlock();
-        }
-        endTransactionalBlock("Could not commit query");
-      }
-    }
-  }
-
   public List query(List queryList) throws QueryException {
 
     if (logger.isInfoEnabled()) {
-      StringBuffer log = new StringBuffer("QUERYING LIST: ");
-      for (int i = 0; i < queryList.size(); i++) {
-        log.append(queryList.get(i));
-      }
-      logger.info(log.toString());
+      logger.info("QUERYING LIST: " + queryList);
     }
 
     QueryOperation queryOperation = new QueryOperation(queryList, this);
@@ -1072,65 +803,19 @@
   }
 
   public void rollback() throws QueryException {
-    logger.info("Rollback transaction");
-    if (autoCommit) {
-      throw new QueryException(
-          "Attempt to rollback transaction outside transaction");
-    }
-    resumeTransactionalBlock();
     try {
-      explicitRollback = true;
-      rollbackTransactionalBlock(new QueryException(
-          "Explicit rollback requested on session"));
-    } catch (Throwable th) {
-      logger.error("Failed to rollback transaction", th);
-      throw new QueryException("Rollback failed", th);
-    } finally {
-      synchronized (DatabaseSession.class) {
-        try {
-          endTransactionalBlock("Rollback failed, ending transaction");
-        } catch (Throwable th) {
-          throw new QueryException("Rollback failed", th);
-        }
-        setAutoCommit(false);
-      }
+      transactionManager.rollback(this);
+    } catch (MulgaraTransactionException em) {
+      throw new QueryException("Error in rollback", em);
     }
   }
 
   public void setAutoCommit(boolean autoCommit) throws QueryException {
-    if (logger.isInfoEnabled()) {
-      logger.info("setAutoCommit(" + autoCommit + ") called with autoCommit = " + this.autoCommit);
+    try {
+      transactionManager.setAutoCommit(this);
+    } catch (MulgaraTransactionException em) {
+      throw new QueryException("Error setting autocommit to " + autoCommit, em);
     }
-
-    if (!this.autoCommit && autoCommit) { // Turning autoCommit on
-      try {
-        resumeTransactionalBlock();
-      } finally {
-        this.autoCommit = true;
-        this.inFailedTransaction = false;
-        endTransactionalBlock("Extended transaction failed");
-      }
-    } else if (this.autoCommit && !autoCommit) { // Turning autoCommit off
-      if (this.transaction != null) {
-        resumeTransactionalBlock();
-        endPreviousQueryTransaction();
-      }
-      systemResolver = beginTransactionalBlock(true);
-      try {
-        suspendTransactionalBlock();
-      } catch (Throwable th) {
-        logger.error("Failed to suspend transaction", th);
-        rollbackTransactionalBlock(th);
-        endTransactionalBlock("Could set auto Commit off");
-      }
-      this.autoCommit = false;
-    } else if (this.inFailedTransaction) { // Reset after failed autoCommit off based transaction.
-      this.inFailedTransaction = false;
-    } else { // Leaving autoCommit the same
-      if (logger.isInfoEnabled()) {
-        logger.info("Invalid call to setAutoCommit(" + autoCommit + ") called with autoCommit = " + this.autoCommit);
-      }
-    }
   }
 
   /**
@@ -1348,47 +1033,6 @@
   //
 
   /**
-   * Mark the beginning of a transactional block.
-   *
-   * This begins a transaction if {@link #autoCommit} is <code>true</code>.
-   *
-   * @throws QueryException if a transaction needed to be begun and couldn't be
-   */
-  private SystemResolver beginTransactionalBlock(boolean allowWrites) throws
-      QueryException {
-    if (logger.isInfoEnabled()) {
-      logger.info("Beginning transactional block: autocommit = " + autoCommit);
-    }
-
-    // Start the transaction
-    if (inFailedTransaction == true) {
-      throw new IllegalStateException("Transaction already failed, set autocommit true to reset");
-    } else if (!enlistedResolverMap.isEmpty()) {
-      throw new QueryException("Stale resolvers found in enlistedResolverMap");
-    }
-
-
-    if (allowWrites) {
-      try {
-        obtainWriteLock();
-      } catch (InterruptedException ei) {
-        throw new QueryException("Unable to obtain write lock", ei);
-      }
-    }
-
-    try {
-      transactionManager.begin();
-      if (systemResolver != null) {
-        throw new QueryException("beginning nested transaction");
-      }
-      systemResolver = systemResolverFactory.newResolver(allowWrites);
-      return (SystemResolver) operationContext.enlistResolver(systemResolver);
-    } catch (Exception e) {
-      throw new QueryException("Unable to begin transaction", e);
-    }
-  }
-
-  /**
    * Execute an {@link Operation}.
    *
    * @param operation  the {@link Operation} to execute
@@ -1399,148 +1043,15 @@
   private void execute(Operation operation, String failureMessage)
     throws QueryException
   {
-    assert operation != null;
-
-    startTransactionalOperation(operation.isWriteOperation());
-
-    assert systemResolver != null;
     try {
-      operation.execute(operationContext, systemResolver, resolverSessionFactory, metadata);
-    } catch (Throwable e) {
-      rollbackTransactionalBlock(e);
-    } finally {
-      finishTransactionalOperation(failureMessage);
+      MulgaraTransaction transaction = transactionManager.getTransaction(this, operation.isWriteOperation());
+      transaction.execute(operation, operationContext, systemResolver, resolverSessionFactory, metadata);
+    } catch (MulgaraTransactionException em) {
+      throw new QueryException(failureMessage, em);
     }
   }
 
   /**
-   * Execute an {@link Operation}.
-   *
-   * @param operation  the {@link Operation} to execute
-   * @throws QueryException if the <var>operation</var> fails
-   */
-  private void executeQuery(Operation operation) throws QueryException
-  {
-    /*
-     * Transaction semantics:
-     * AC && Suspended  -> R clr E B . S
-     * AC && !Suspended -> B . S
-     * !AC              -> R clr . S
-     */
-    if (autoCommit) {
-      if (this.transaction != null) {
-        resumeTransactionalBlock();
-        endPreviousQueryTransaction();
-      }
-      beginTransactionalBlock(operation.isWriteOperation());
-    }
-    else {
-      resumeTransactionalBlock();
-    }
-
-    try {
-      operation.execute(operationContext,
-                        systemResolver,
-                        resolverSessionFactory,
-                        metadata);
-    }
-    catch (Throwable th) {
-      try {
-        logger.warn("Query failed", th);
-        rollbackTransactionalBlock(th);
-      } finally {
-        endPreviousQueryTransaction();
-        throw new QueryException("Failed to rollback failed transaction", th);
-      }
-    }
-
-    try {
-      suspendTransactionalBlock();
-    } catch (Throwable th) {
-      endPreviousQueryTransaction();
-      logger.error("Query should have thrown exception", th);
-      throw new IllegalStateException("Query should have thrown exception");
-    }
-  }
-
-  private void obtainWriteLock() throws InterruptedException {
-    logger.info("Trying to obtain write lock. ");
-    synchronized (DatabaseSession.class) {
-      if (DatabaseSession.writeSession == this) {
-        return;
-      }
-      while (DatabaseSession.writeSession != null) {
-        DatabaseSession.class.wait();
-      }
-      DatabaseSession.writeSession = this;
-      this.writing = true;
-      logger.info("Obtained write lock. ");
-    }
-  }
-
-  private void releaseWriteLock() {
-    synchronized (DatabaseSession.class) {
-      if (DatabaseSession.writeSession == this) {
-        logger.info("Releasing write lock");
-        DatabaseSession.writeSession = null;
-        this.writing = false;
-        DatabaseSession.class.notifyAll();
-      }
-    }
-  }
-
-  /**
-   * Mark the end of a transactional block.
-   *
-   * This commits the current transaction if {@link #autoCommit} is
-   * <code>true</code>.
-   *
-   * @throws QueryException if a transaction needed to be committed and
-   *   couldn't be
-   */
-  private void endTransactionalBlock(String failureMessage) throws QueryException {
-    if (logger.isInfoEnabled()) {
-      logger.info(
-        "End Transactional Block autocommit=" + autoCommit +
-        " transaction status=" + StatusFormat.formatStatus(transactionManager)
-      );
-    }
-
-    try {
-      // Commit the transaction
-      if (rollbackCause == null) {
-        transactionManager.commit();
-      } else {
-        try {
-          transactionManager.commit();
-        } catch (RollbackException e) {
-          // Sneakily reinsert the exception recorded earlier by the
-          // rollbackTransactionalBlock method.  Without this feature, it's
-          // very difficult to determine why a rollback occurred.
-          e.initCause(rollbackCause);
-          throw e;
-        } finally {
-          rollbackCause = null;
-        }
-      }
-    } catch (Exception e) {
-      if (!explicitRollback) {
-        throw new QueryException(failureMessage, e);
-      }
-    } finally {
-      releaseWriteLock();
-      enlistedResolverMap.clear();
-      outstandingAnswers.clear();
-      clearCache();
-      operationContext.clearSystemModelCache();
-
-      systemResolver = null;
-      explicitRollback = false;
-      autoCommit = true;
-    }
-  }
-
-  /**
    * Clear the cache of temporary models.
    */
   private void clearCache()
@@ -1661,155 +1172,6 @@
     }
   }
 
-  /**
-   * Mark the current transaction for rollback due to an exception.
-   *
-   * This records the exception which caused the rollback in the
-   * {@link #rollbackCause} field.
-   */
-  public void rollbackTransactionalBlock(Throwable throwable) throws
-      QueryException {
-    logger.info("Rollback Transactional Block");
-    assert throwable != null;
-
-    try {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Marking transaction for rollback", throwable);
-      }
-      transactionManager.setRollbackOnly();
-    } catch (Throwable e) {
-      logger.error("Needed to mark transaction for rollback", throwable);
-      logger.error("Unable to mark transaction for rollback", e);
-      throw new QueryException("Unable to mark transaction for rollback", e);
-    }
-
-    rollbackCause = throwable;
-  }
-
-  /**
-   * Suspends current transaction, storing it in session for latter resumption.
-   *
-   * @throws Throwable Must be called inside the try/catch(Throwable) block
-   * protecting the transaction.
-   */
-  public void suspendTransactionalBlock() throws Throwable {
-    logger.info("Suspend Transactional Block");
-    if (transaction != null) {
-      throw new IllegalStateException(
-          "Attempt to suspend unresumed transaction.");
-    }
-    if (logger.isInfoEnabled()) {
-      logger.info(
-         "Suspend Transactional Block autocommit=" + autoCommit +
-         " transaction status=" + StatusFormat.formatStatus(transactionManager)
-      );
-    }
-
-    int status = transactionManager.getStatus();
-    if (!autoCommit &&
-        (status == Status.STATUS_MARKED_ROLLBACK ||
-         status == Status.STATUS_ROLLEDBACK ||
-         status == Status.STATUS_ROLLING_BACK)) {
-      inFailedTransaction = true;
-      throw new QueryException("Transaction marked for rollback");
-    }
-
-    this.transaction = transactionManager.suspend();
-  }
-
-  /**
-   * Resumes the previously suspended transaction from the current session.
-   *
-   * @throws QueryException Must be called outside the try/catch(Throwable) block
-   * protecting the transaction.
-   */
-  public void resumeTransactionalBlock() throws QueryException {
-    logger.info("Resume Transactional Block");
-    if (transaction == null) {
-      throw new IllegalStateException("Attempt to resume unsuspended transaction");
-    } else if (inFailedTransaction == true) {
-      throw new IllegalStateException("Transaction already failed, set autocommit true to reset");
-    }
-
-    try {
-      transactionManager.resume(this.transaction);
-      this.transaction = null;
-    } catch (Exception e) {
-      logger.error("Resume failed", e);
-      throw new QueryException("Failed to resume transaction", e);
-    }
-  }
-
-  /**
-   * Start's or resumes a transaction for an operation.
-   *
-   * Using start/finish TransactionalOperation ensures properly matched pairs of
-   * begin/end and suspend/resume.
-   */
-  public void startTransactionalOperation(boolean needsWrite) throws
-      QueryException {
-    logger.info("Starting Transactional Operation");
-    if (opState != FINISH) {
-      throw new IllegalArgumentException(
-          "Attempt to start transactional operation during: " +
-          opStates[opState]);
-    }
-    if (autoCommit) {
-      if (this.transaction != null) {
-        resumeTransactionalBlock();
-        endPreviousQueryTransaction();
-      }
-      beginTransactionalBlock(needsWrite);
-      logger.info("BEGIN new transaction.");
-      opState = BEGIN;
-    } else {
-      resumeTransactionalBlock();
-      logger.info("RESUME old transaction.");
-      opState = RESUME;
-    }
-  }
-
-  /**
-   * Ends's or suspends a transaction for an operation.
-   *
-   * Using start/finish TransactionalOperation ensures properly matched pairs of
-   * begin/end and suspend/resume.
-   */
-  public void finishTransactionalOperation(String errorString) throws
-      QueryException {
-    logger.info("Finishing Transactional Operation");
-    if (logger.isDebugEnabled()) {
-      logger.debug("opState = " + opStates[opState]);
-      logger.debug("autoCommit = " + autoCommit);
-    }
-    if (opState == FINISH) {
-      throw new IllegalArgumentException(
-          "Attempt to finish transactional operation during: " + opStates[opState]);
-    }
-    if (autoCommit) {
-      try {
-        endTransactionalBlock(errorString);
-      } finally {
-        logger.info("FINISH(end) implicit transaction.");
-        opState = FINISH;
-      }
-    } else {
-      try {
-        suspendTransactionalBlock();
-      } catch (Throwable th) {
-        logger.error("Failed to suspend transaction", th);
-        try {
-          rollbackTransactionalBlock(new QueryException("Failed to suspend transaction"));
-        } finally {
-          endTransactionalBlock("Failed to suspend transaction at end of operation");
-        }
-      } finally {
-        logger.info("FINISH(suspend) explicit transaction.");
-        opState = FINISH;
-      }
-    }
-  }
-
   public boolean isLocal() {
     return true;
   }

Modified: branches/xafix/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java
===================================================================
--- branches/xafix/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java	2006-10-11 12:35:42 UTC (rev 99)
+++ branches/xafix/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java	2006-10-13 11:44:40 UTC (rev 100)
@@ -43,6 +43,7 @@
  * @licence Open Software License v3.0</a>
  */
 public class MulgaraTransaction {
+  private MulgaraTransactionManager Mulagaramanager;
   private TransactionManager manager;
 
   private Transaction transaction;
@@ -58,7 +59,9 @@
   private int rollback;
   private Throwable rollbackCause;
 
-  public MulgaraTransaction(TransactionManager manager) { 
+  public MulgaraTransaction(MulgaraTransactionManager mulgaraManager,
+      TransactionManager manager) { 
+    this.mulgaraManager = mulgaraManager;
     this.manager = manager;
 
     manager.begin();
@@ -131,10 +134,16 @@
     }
   }
 
-  void execute(Operation operation) {
+  void execute(OperationContext operation,
+               SystemResolver systemResolver,
+               ResolverSessionFactory resolverSessionFactory, // FIXME: We shouldn't need this.
+               DatabaseMetadata databaseMetatdata) {
     activate();
     try {
-      operation.execute(this);
+      operation.execute(operationContext,
+                        systemResolver,
+                        resolverSessionFactory,
+                        metadata);
     } catch (Throwable th) {
       implicitRollback(th);
     } finally {
@@ -181,6 +190,17 @@
 
   private void finalizeTransaction() {
     // We need a whole load of error handling here, but the core operation is commit.
-    transaction.commit();
+    try {
+      transaction.commit();
+    } finally {
+      session.clearCache();
+      operationContext.clearSystemModelCache();
+      transactionManager.transactionComplete(this);
+    }
   }
+
+  public void enlistResolver(Resolver resolver) {
+    XAResource resource = resolver.getXAResource();
+    transaction.enlistResource(xaResource);
+  }
 }

Modified: branches/xafix/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
===================================================================
--- branches/xafix/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java	2006-10-11 12:35:42 UTC (rev 99)
+++ branches/xafix/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java	2006-10-13 11:44:40 UTC (rev 100)
@@ -51,9 +51,18 @@
   private Session currentWritingSession;
   private MulgaraTransaction writeTransaction;
 
+  /** Map from session to transaction for all 'write' transactions that have been rolledback. */
   private Map failedSessions;
 
-  public MulgaraTransactionManager() {
+  private TransactionManager transactionManager;
+
+  public MulgaraTransactionManager(TransactionManager transactionManager) {
+    this.currentWritingSession = null;
+    this.writeTransaction = null;
+
+    this.failedSessions = new HashMap();
+
+    this.transactionManager = transactionManager;
   }
 
   /**
@@ -66,7 +75,7 @@
    *     obtain the write-lock, create a new transaction object and return it.</li>
    * </ul>
    */
-  public synchronized MulgaraTransaction getTransaction(Session session, boolean write) {
+  public synchronized MulgaraTransaction getTransaction(Session session, boolean write) throws MulgaraTransactionException {
     if (session == currentWritingSession) {
       return writeTransaction;
     } 
@@ -74,7 +83,7 @@
     if (write) {
       obtainWriteLock(session);
     }
-    writeTransaction = new MulgaraTransaction(this);
+    writeTransaction = new MulgaraTransaction(this, transactionManager);
   }
 
   private synchronized void obtainWriteLock(Session session) {
@@ -148,6 +157,14 @@
     }
   }
 
-  public synchronized close() {
+  public synchronized void closingSession(Session session) throws MulgaraTransactionException {
+    // Check if we hold the write lock, if we do then rollback and throw exception.
+    // Otherwise - no-op.
   }
+
+  public synchronized void transactionComplete(MulgaraTransaction transaction) {
+    if (transaction == writeTransaction) {
+      releaseWriteLock();
+    }
+  }
 }

Added: branches/xafix/src/jar/resolver/java/org/mulgara/resolver/PreallocateOperation.java
===================================================================
--- branches/xafix/src/jar/resolver/java/org/mulgara/resolver/PreallocateOperation.java	2006-10-11 12:35:42 UTC (rev 99)
+++ branches/xafix/src/jar/resolver/java/org/mulgara/resolver/PreallocateOperation.java	2006-10-13 11:44:40 UTC (rev 100)
@@ -0,0 +1,110 @@
+/*
+ * The contents of this file are subject to the Mozilla Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.mozilla.org/MPL/
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * The Original Code is the Kowari Metadata Store.
+ *
+ * The Initial Developer of the Original Code is Plugged In Software Pty
+ * Ltd (http://www.pisoftware.com, mailto:info at pisoftware.com). Portions
+ * created by Plugged In Software Pty Ltd are Copyright (C) 2001,2002
+ * Plugged In Software Pty Ltd. All Rights Reserved.
+ *
+ * Contributor(s): N/A.
+ *
+ * [NOTE: The text of this Exhibit A may differ slightly from the text
+ * of the notices in the Source Code files of the Original Code. You
+ * should use the text of this Exhibit A rather than the text found in the
+ * Original Code Source Code for Your Modifications.]
+ *
+ */
+
+package org.mulgara.resolver;
+
+// Java 2 standard packages
+import java.io.*;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+
+// Java 2 enterprise packages
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.InvalidTransactionException;
+
+// Third party packages
+import org.apache.log4j.Logger;
+import org.jrdf.graph.*;
+
+/**
+ * An {@link Operation} that implements node preallocation.
+ *
+ * @created 2004-11-24
+ *
+ * @author <a href="mailto:andrae at netymon.com">Andrae Muys</a>
+ *
+ * @version $Revision: $
+ *
+ * @modified $Date: $ by $Author: $
+ *
+ * @maintenanceAuthor $Author: $
+ *
+ * @copyright &copy;2004 <a href="http://www.tucanatech.com/">Tucana
+ *   Technology, Inc</a>
+ *
+ * @licence <a href="{@docRoot}/../../LICENCE">Mozilla Public License v1.1</a>
+ */
+class PreallocateOperation implements Operation
+{
+  /** Logger.  */
+  private static final Logger logger =
+    Logger.getLogger(PreallocateOperation.class.getName());
+
+  /**
+   * The node to be preallocated
+   */
+  private final Node node;
+
+  /** 
+   * The localized node.
+   */
+  private final long localNode;
+
+  RemoveModelOperation(Node node) {
+    assert node != null;
+    this.node = node;
+  }
+
+  public void execute(OperationContext       operationContext,
+                      SystemResolver         systemResolver,
+                      ResolverSessionFactory resolverSessionFactory,
+                      DatabaseMetadata       metadata) throws Exception {
+
+    this.localNode = systemResolver.localizePersistent(node);
+
+    // Create a statement linking the node to the graph so it's never reaped
+    systemResolver.modifyModel(metadata.getPreallocationModelNode(),
+    new SingletonStatements(metadata.getPreallocationSubjectNode(),
+        metadata.getPreallocationPredicateNode(),
+            localNode),
+  }
+
+  public long getResult() {
+    return localNode;
+
+  public boolean isWriteOperation() {
+    return true;
+  }
+}

Modified: branches/xafix/src/jar/resolver/java/org/mulgara/resolver/SubqueryAnswer.java
===================================================================
--- branches/xafix/src/jar/resolver/java/org/mulgara/resolver/SubqueryAnswer.java	2006-10-11 12:35:42 UTC (rev 99)
+++ branches/xafix/src/jar/resolver/java/org/mulgara/resolver/SubqueryAnswer.java	2006-10-13 11:44:40 UTC (rev 100)
@@ -120,7 +120,6 @@
     super(tuples, resolverSession);
 
     this.databaseSession = session;
-    this.databaseSession.registerAnswer(this);
     assignVariables(tuples, variableList);
   }
 
@@ -185,7 +184,6 @@
     cloned.variables = new Variable[this.variables.length];
     System.arraycopy(this.variables, 0, cloned.variables, 0,
         this.variables.length);
-    databaseSession.registerAnswer(cloned);
     return cloned;
   }
 
@@ -196,12 +194,6 @@
 
   public void close() throws TuplesException {
     super.close();
-    try {
-      databaseSession.deregisterAnswer(this);
-    }
-    catch (QueryException eq) {
-      logger.info("Failed to deregister answer from session", eq);
-    }
   }
 
   public int getColumnIndex(Variable variable) throws TuplesException {

Modified: branches/xafix/src/jar/resolver/java/org/mulgara/resolver/SubqueryAnswerUnitTest.java
===================================================================
--- branches/xafix/src/jar/resolver/java/org/mulgara/resolver/SubqueryAnswerUnitTest.java	2006-10-11 12:35:42 UTC (rev 99)
+++ branches/xafix/src/jar/resolver/java/org/mulgara/resolver/SubqueryAnswerUnitTest.java	2006-10-13 11:44:40 UTC (rev 100)
@@ -196,11 +196,5 @@
     public Answer innerQuery(Query query) throws QueryException {
       return null;
     }
-
-    public void registerAnswer(SubqueryAnswer answer) {
-    }
-
-    public void deregisterAnswer(SubqueryAnswer answer) throws QueryException {
-    }
   }
 }

Deleted: branches/xafix/src/jar/resolver-spi/java/org/mulgara/resolver/spi/LocalSession.java
===================================================================
--- branches/xafix/src/jar/resolver-spi/java/org/mulgara/resolver/spi/LocalSession.java	2006-10-11 12:35:42 UTC (rev 99)
+++ branches/xafix/src/jar/resolver-spi/java/org/mulgara/resolver/spi/LocalSession.java	2006-10-13 11:44:40 UTC (rev 100)
@@ -1,120 +0,0 @@
-/*
- * The contents of this file are subject to the Mozilla Public License
- * Version 1.1 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- * http://www.mozilla.org/MPL/
- *
- * Software distributed under the License is distributed on an "AS IS"
- * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
- * the License for the specific language governing rights and limitations
- * under the License.
- *
- * The Original Code is the Kowari Metadata Store.
- *
- * The Initial Developer of the Original Code is Plugged In Software Pty
- * Ltd (http://www.pisoftware.com, mailto:info at pisoftware.com). Portions
- * created by Plugged In Software Pty Ltd are Copyright (C) 2001,2002
- * Plugged In Software Pty Ltd. All Rights Reserved.
- *
- * Contributor(s): N/A.
- *
- * [NOTE: The text of this Exhibit A may differ slightly from the text
- * of the notices in the Source Code files of the Original Code. You
- * should use the text of this Exhibit A rather than the text found in the
- * Original Code Source Code for Your Modifications.]
- *
- */
-
-package org.mulgara.resolver.spi;
-
-// Local packages
-import org.mulgara.query.QueryException;
-import org.mulgara.server.*;
-
-/**
- * Local extensions to the session interface.
- *
- * Note: This interface is one of the first steps towards a generic server-side
- *       extension API.  Expect it to change in signifigant, non-backwards compatible
- *       ways.
- *
- * @created 2004-11-10
- *
- * @author Andrew Newman
- *
- * @version $Revision: 1.8 $
- *
- * @modified $Date: 2005/01/05 04:58:50 $ by $Author: newmana $
- *
- * @maintenanceAuthor $Author: newmana $
- *
- * @copyright &copy;2004 <a href="http://www.tucanatech.com/">Tucana
- *   Technology, Inc</a>
- *
- * @licence <a href="{@docRoot}/../../LICENCE">Mozilla Public License v1.1</a>
- */
-public interface LocalSession extends Session {
-
-  /**
-   * Start's or resumes a transaction for an operation.
-   *
-   * Using start/finish TransactionalOperation ensures properly matched pairs of
-   * begin/end and suspend/resume.
-   *
-   * @deprecated This should not be relied upon to create resolvers as this
-   *     should go away.
-   */
-  public void startTransactionalOperation(boolean needsWrite)
-      throws QueryException;
-
-  /**
-   * Mark the current transaction for rollback due to an exception.
-   *
-   * @param throwable  the exception which caused the rollback
-   * @deprecated This should not be relied upon to create resolvers as this
-   *     should go away.
-   */
-  public void rollbackTransactionalBlock(Throwable throwable)
-      throws QueryException;
-
-  /**
-   * Ends's or suspends a transaction for an operation.
-   *
-   * Using start/finish TransactionalOperation ensures properly matched pairs of
-   * begin/end and suspend/resume.
-   *
-   * @deprecated This should not be relied upon to create resolvers as this
-   *     should go away.
-   */
-  public void finishTransactionalOperation(String errorString)
-      throws QueryException;
-
-  /**
-   * Resumes the previously suspended transaction from the current session.
-   *
-   * @throws QueryException Must be called outside the try/catch(Throwable) block
-   * protecting the transaction.
-   * @deprecated This should not be relied upon to create resolvers as this
-   *     should go away.
-   */
-  public void resumeTransactionalBlock() throws QueryException;
-
-  /**
-   * Suspends current transaction, storing it in session for latter resumption.
-   *
-   * @throws Throwable Must be called inside the try/catch(Throwable) block
-   * protecting the transaction.
-   * @deprecated This should not be relied upon to create resolvers as this
-   *     should go away.
-   */
-  public void suspendTransactionalBlock() throws Throwable;
-
-  /**
-   * Returns the current Resolver Session.
-   *
-   * @return the current resolver session.
-   * @deprecated This should not be relied upon to create resolvers as this
-   *     should go away.
-   */
-  public ResolverSession getResolverSession();
-}




More information about the Mulgara-svn mailing list