[Mulgara-svn] r536 - branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver
andrae at mulgara.org
andrae at mulgara.org
Wed Nov 14 08:13:11 UTC 2007
Author: andrae
Date: 2007-11-14 02:13:10 -0600 (Wed, 14 Nov 2007)
New Revision: 536
Added:
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java
Modified:
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java
branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
Log:
This isn't finished, it doesn't even compile - however it does capture the
'shape' of the solution.
I remain hopeful that I should eventually be able to eliminate the internal
xa-management completely and replace it with calls to the external xa-management
classes using an internal JOTM instance.
The 'factory' classes will eventually be renamed FooManager or FooCoordinator,
as that is really their role - the MTManager class now has managing the
write-lock as its sole responsibility, so will also be renamed to reflect this.
One seriously complicating factor is that JOTM is not itself a JTA Resource
Manager - much to my surprise. Ie. it doesn't export its own XAResource, so
chaining of resources is impossible. Hence my original plan to simply wrap a
JOTMXAResource object and offer a simple RMI-proxy+ReadOnly-support XAResource
from Session didn't last very long.
Hence the XAResource needs to bypass JOTM completely, and hence the
dual-transaction-manager structure of the code below. Conveniently JTA Section
3.4.7 permits us to refuse to support mixing internal and external transaction
interfaces on 'transactional connection' - so currently a DatabaseSession
latches to either internal or external depending on which interface is used
first, and throws an exception should the other interface be used subsequently.
Modified: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java 2007-11-13 22:10:39 UTC (rev 535)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java 2007-11-14 08:13:10 UTC (rev 536)
@@ -125,6 +125,10 @@
/** Source of transactions. */
private final MulgaraTransactionManager transactionManager;
+ private MulgaraTransactionFactory transactionFactory;
+ private MulgaraInternalTransactionFactory internalFactory;
+ private MulgaraExternalTransactionFactory externalFactory;
+
/** The name of the rule loader to use */
private String ruleLoaderClassName;
@@ -238,6 +242,9 @@
this.temporaryModelTypeURI = temporaryModelTypeURI;
this.ruleLoaderClassName = ruleLoaderClassName;
+ this.transactionFactory = null;
+ this.internalFactory = null;
+
if (logger.isDebugEnabled()) {
logger.debug("Constructed DatabaseSession");
}
@@ -522,8 +529,9 @@
if (logger.isInfoEnabled()) {
logger.info("setAutoCommit(" + autoCommit + ") called.");
}
+ assertInternallyManagedXA();
try {
- transactionManager.setAutoCommit(this, autoCommit);
+ internalFactory.setAutoCommit(this, autoCommit);
} catch (MulgaraTransactionException em) {
throw new QueryException("Error setting autocommit", em);
}
@@ -531,8 +539,9 @@
public void commit() throws QueryException {
logger.info("Committing transaction");
+ assertInternallyManagedXA();
try {
- transactionManager.commit(this);
+ internalFactory.commit(this);
} catch (MulgaraTransactionException em) {
throw new QueryException("Error performing commit", em);
}
@@ -540,8 +549,9 @@
public void rollback() throws QueryException {
logger.info("Rollback transaction");
+ assertInternallyManagedXA();
try {
- transactionManager.rollback(this);
+ internalFactory.rollback(this);
} catch (MulgaraTransactionException em) {
throw new QueryException("Error performing rollback", em);
}
@@ -550,7 +560,7 @@
public void close() throws QueryException {
logger.info("Closing session");
try {
- transactionManager.rollbackCurrentTransactions(this);
+ transactionFactory.rollbackCurrentTransactions(this);
} catch (MulgaraTransactionException em) {
throw new QueryException("Error closing session. Forced close required", em);
}
@@ -586,8 +596,7 @@
*/
private synchronized void backup(OutputStream outputStream, URI serverURI, URI destinationURI)
throws QueryException {
- execute(
- new BackupOperation(outputStream, serverURI, destinationURI),
+ execute(new BackupOperation(outputStream, serverURI, destinationURI),
"Unable to backup to " + destinationURI);
}
@@ -597,14 +606,13 @@
protected void modify(URI modelURI, Set statements, boolean insert) throws QueryException
{
if (logger.isInfoEnabled()) {
- logger.info("Inserting statements into " + modelURI);
+ logger.info("Modifying (ins:" + insert + ") : " + modelURI);
}
if (logger.isDebugEnabled()) {
- logger.debug("Inserting statements: " + statements);
+ logger.debug("Modifying statements: " + statements);
}
- execute(new ModifyModelOperation(modelURI, statements, insert),
- "Could not commit insert");
+ execute(new ModifyModelOperation(modelURI, statements, insert), "Could not commit modify");
}
private void modify(URI modelURI, Query query, boolean insert) throws QueryException
@@ -614,7 +622,7 @@
}
execute(new ModifyModelOperation(modelURI, query, insert, this),
- "Unable to modify " + modelURI);
+ "Unable to modify " + modelURI);
}
/**
@@ -627,7 +635,7 @@
{
try {
MulgaraTransaction transaction =
- transactionManager.getTransaction(this, operation.isWriteOperation());
+ transactionFactory.getTransaction(this, operation.isWriteOperation());
transaction.execute(operation, resolverSessionFactory, metadata);
} catch (MulgaraTransactionException em) {
logger.info("Error executing operation: " + errorString, em);
@@ -648,4 +656,25 @@
systemResolverFactory,
writing);
}
+
+ private void assertInternallyManagedXA() throws QueryException {
+ if (transactionFactory == null) {
+ internalFactory = transactionFactory = transactionManager.getInternalFactory();
+ } else if (internalFactory == null) {
+ throw new QueryException("Attempt to use internal transaction control in externally managed session");
+ }
+ }
+
+ private void assertExternallyManagedXA() throws QueryException {
+ if (transactionFactory == null) {
+ externalFactory = transactionFactory = transactionManager.getExternalFactory();
+ } else if (externalFactory == null) {
+ throw new QueryException("Attempt to use external transaction control in internally managed session");
+ }
+ }
+
+ public XAResource getXAResource() throws QueryException {
+ assertExternallyManagedXA();
+ return externalFactory.getXAResource();
+ }
}
Added: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java 2007-11-13 22:10:39 UTC (rev 535)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransaction.java 2007-11-14 08:13:10 UTC (rev 536)
@@ -0,0 +1,115 @@
+/*
+ * The contents of this file are subject to the Open Software License
+ * Version 3.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.rosenlaw.com/OSL3.0.htm
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * This file is an original work developed by Netymon Pty Ltd
+ * (http://www.netymon.com, mailto:mail at netymon.com). Portions created
+ * by Netymon Pty Ltd are Copyright (c) 2007 Netymon Pty Ltd.
+ * All Rights Reserved.
+ */
+package org.mulgara.resolver;
+
+// Java 2 enterprise packages
+
+// Third party packages
+import org.apache.log4j.Logger;
+
+// Local packages
+import org.mulgara.resolver.spi.DatabaseMetadata;
+import org.mulgara.resolver.spi.EnlistableResource;
+import org.mulgara.resolver.spi.ResolverSessionFactory;
+
+import org.mulgara.query.MulgaraTransactionException;
+import org.mulgara.query.TuplesException;
+import org.mulgara.query.QueryException;
+
+/**
+ * @created 2007-11-06
+ *
+ * @author <a href="mailto:andrae at netymon.com">Andrae Muys</a>
+ *
+ * @version $Revision: $
+ *
+ * @modified $Date: $
+ *
+ * @maintenanceAuthor $Author: $
+ *
+ * @company <a href="mailto:mail at netymon.com">Netymon Pty Ltd</a>
+ *
+ * @copyright ©2007 <a href="http://www.netymon.com/">Netymon Pty Ltd</a>
+ *
+ * @licence Open Software License v3.0
+ */
+public class MulgaraExternalTransaction implements MulgaraTransaction {
+ private Xid xid;
+ private Set<EnlistableResource> resources;
+
+ private MulgaraXAResource resource;
+
+ MulgaraExternalTransaction(Xid xid, MulgaraXAResource resource) {
+ this.xid = xid;
+ this.resources = new HashSet<EnlistableResource>();
+ this.resource = resource;
+ }
+
+ // We ignore reference counting in external transactions
+ void reference() throws MulgaraTransactionException {}
+ void dereference() throws MulgaraTransactionException {}
+
+ MulgaraTransactionException abortTransaction(String errorMessage, Throwable cause)
+ throws MulgaraTransactionException {
+ if (resource.getStatus() == MulgaraXAResource.ACTIVE) {
+ resource.setRollback(cause);
+ }
+ for (EnlistableResource resource : resources) {
+ try {
+ resource.abort();
+ } catch (Throwable throw_away) {}
+ }
+ }
+
+
+ void execute(Operation operation,
+ ResolverSessionFactory resolverSessionFactory,
+ DatabaseMetadata metadata) throws MulgaraTransactionException {
+ if (resource.getStatus() != MulgaraXAResource.ACTIVE) {
+ throw new MulgaraTransactionException("Invalid transactional context : " + resource.getStatus());
+ }
+ try {
+ operation.execute(context,
+ context.getSystemResolver(),
+ resolverSessionFactory,
+ metadata);
+ } catch (Throwable th) {
+ resource.setRollback(th);
+ throw new MulgaraTransactionException("Operation failed", th);
+ }
+ }
+
+ AnswerOperationResult execute(AnswerOperation ao) throws TuplesException {
+ if (resource.getStatus() != MulgaraXAResource.ACTIVE) {
+ throw new TuplesException("Invalid transactional context : " + resource.getStatus());
+ }
+ try {
+ return ao.execute();
+ } catch (Throwable th) {
+ resource.setRollback(th);
+ throw new TuplesException("Request failed", th);
+ }
+ }
+
+ public void enlist(EnlistableResource enlistable) throws MulgaraTransactionException {
+ if (!resources.contains(enlistable)) {
+ XAResource res = enlistable.getXAResource();
+ bringUptodate(res);
+ resources.add(enlistable);
+ }
+ }
+}
Added: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java 2007-11-13 22:10:39 UTC (rev 535)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraExternalTransactionFactory.java 2007-11-14 08:13:10 UTC (rev 536)
@@ -0,0 +1,96 @@
+/*
+ * The contents of this file are subject to the Open Software License
+ * Version 3.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.rosenlaw.com/OSL3.0.htm
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * This file is an original work developed by Netymon Pty Ltd
+ * (http://www.netymon.com, mailto:mail at netymon.com). Portions created
+ * by Netymon Pty Ltd are Copyright (c) 2006 Netymon Pty Ltd.
+ * All Rights Reserved.
+ */
+
+package org.mulgara.resolver;
+
+// Java2 packages
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+// Third party packages
+import org.apache.log4j.Logger;
+
+// Local packages
+import org.mulgara.query.MulgaraTransactionException;
+import org.mulgara.server.Session;
+import org.mulgara.transaction.TransactionManagerFactory;
+
+/**
+ * Manages transactions within Mulgara.
+ *
+ * see http://mulgara.org/confluence/display/dev/Transaction+Architecture
+ *
+ * Maintains association between Answer's and TransactionContext's.
+ * Manages tracking the ownership of the write-lock.
+ * Maintains the write-queue and any timeout algorithm desired.
+ * Provides new/existing TransactionContext's to DatabaseSession on request.
+ * Note: Returns new context unless Session is currently in a User Demarcated Transaction.
+ *
+ *
+ * @created 2006-10-06
+ *
+ * @author <a href="mailto:andrae at netymon.com">Andrae Muys</a>
+ *
+ * @version $Revision: $
+ *
+ * @modified $Date: $
+ *
+ * @maintenanceAuthor $Author: $
+ *
+ * @company <A href="mailto:mail at netymon.com">Netymon Pty Ltd</A>
+ *
+ * @copyright ©2006 <a href="http://www.netymon.com/">Netymon Pty Ltd</a>
+ *
+ * @licence Open Software License v3.0</a>
+ */
+
+public class MulgaraExternalTransactionFactory implements MulgaraTransactionFactory {
+ private Map<DatabaseSession, MulgaraExternalTransaction> activeTransaction;
+ private Assoc1toNMap<DatabaseSession, MulgaraExternalTransaction> sessionXAMap;
+
+
+ public MulgaraExternalTransactionFactory() {
+ super();
+ }
+
+ protected MulgaraTransaction createTransaction(final DatabaseSession session, boolean write)
+ throws MulgaraTransactionException {
+ return new MulgaraExternalTransaction(this, write);
+ }
+
+ /**
+ * Rollback, or abort all transactions associated with a DatabaseSession.
+ *
+ * Will only abort the transaction if the rollback attempt fails.
+ */
+ public void rollbackCurrentTransactions(DatabaseSession session)
+ throws MulgaraTransactionException {
+ }
+
+ public XAResource getXAResource(DatabaseSession session) {
+ return new MulgaraXAResource(this, session);
+ }
+}
Copied: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java (from rev 535, branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java)
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java 2007-11-13 22:10:39 UTC (rev 535)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransaction.java 2007-11-14 08:13:10 UTC (rev 536)
@@ -0,0 +1,743 @@
+/*
+ * The contents of this file are subject to the Open Software License
+ * Version 3.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.rosenlaw.com/OSL3.0.htm
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * This file is an original work developed by Netymon Pty Ltd
+ * (http://www.netymon.com, mailto:mail at netymon.com). Portions created
+ * by Netymon Pty Ltd are Copyright (c) 2006 Netymon Pty Ltd.
+ * All Rights Reserved.
+ */
+package org.mulgara.resolver;
+
+// Java 2 enterprise packages
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.xa.XAResource;
+
+// Third party packages
+import org.apache.log4j.Logger;
+
+// Local packages
+import org.mulgara.resolver.spi.DatabaseMetadata;
+import org.mulgara.resolver.spi.EnlistableResource;
+import org.mulgara.resolver.spi.ResolverSessionFactory;
+
+import org.mulgara.query.MulgaraTransactionException;
+import org.mulgara.query.TuplesException;
+import org.mulgara.query.QueryException;
+
+/**
+ * Responsible for the javax.transaction.Transaction object.
+ * Responsibilities
+ * Ensuring every begin or resume is followed by either a suspend or an end.
+ * Ensuring every suspend or end is preceeded by either a begin or a resume.
+ * In conjunction with TransactionalAnswer ensuring that
+ * all calls to operations on SubqueryAnswer are preceeded by a successful resume.
+ * all calls to operations on SubqueryAnswer conclude with a suspend as the last call prior to returning to the user.
+ * Collaborates with DatabaseTransactionManager to determine when to end the transaction.
+ *
+ * @created 2006-10-06
+ *
+ * @author <a href="mailto:andrae at netymon.com">Andrae Muys</a>
+ *
+ * @version $Revision: $
+ *
+ * @modified $Date: $
+ *
+ * @maintenanceAuthor $Author: $
+ *
+ * @company <a href="mailto:mail at netymon.com">Netymon Pty Ltd</a>
+ *
+ * @copyright ©2006 <a href="http://www.netymon.com/">Netymon Pty Ltd</a>
+ *
+ * @licence Open Software License v3.0
+ */
+public class MulgaraInternalTransaction {
+ /**
+ * This is the state machine switch matching these states.
+ switch (state) {
+ case CONSTRUCTEDREF:
+ case CONSTRUCTEDUNREF:
+ case ACTUNREF:
+ case ACTREF:
+ case DEACTREF:
+ case FINISHED:
+ case FAILED:
+ }
+ */
+ private enum State { CONSTRUCTEDREF, CONSTRUCTEDUNREF, ACTUNREF, ACTREF, DEACTREF, FINISHED, FAILED };
+
+ /** Logger. */
+ private static final Logger logger =
+ Logger.getLogger(MulgaraInternalTransaction.class.getName());
+
+ private MulgaraTransactionManager manager;
+ private DatabaseOperationContext context;
+ private Set<EnlistableResource> enlisted;
+
+ private Transaction transaction;
+ private Thread currentThread;
+
+ private ReentrantLock activationMutex;
+
+ private State state;
+ private int inuse;
+ private int using;
+
+ private Throwable rollbackCause;
+
+ public MulgaraInternalTransaction(MulgaraTransactionManager manager, DatabaseOperationContext context)
+ throws IllegalArgumentException {
+ report("Creating Transaction");
+
+ try {
+ if (manager == null) {
+ throw new IllegalArgumentException("Manager null in MulgaraTransaction");
+ } else if (context == null) {
+ throw new IllegalArgumentException("OperationContext null in MulgaraTransaction");
+ }
+ this.manager = manager;
+ this.context = context;
+ this.enlisted = new HashSet<EnlistableResource>();
+ this.activationMutex = new ReentrantLock();
+ this.currentThread = null;
+
+ inuse = 0;
+ using = 0;
+ state = State.CONSTRUCTEDUNREF;
+ rollbackCause = null;
+ } finally {
+ report("Finished Creating Transaction");
+ }
+ }
+
+
+ void activate() throws MulgaraTransactionException {
+// report("Activating Transaction");
+ try {
+ synchronized (this) {
+ if (currentThread != null && !currentThread.equals(Thread.currentThread())) {
+ throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
+ }
+ }
+
+ acquireActivationMutex();
+ try {
+ switch (state) {
+ case CONSTRUCTEDUNREF:
+ startTransaction();
+ inuse = 1;
+ state = State.ACTUNREF;
+ try {
+ context.initiate(this);
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ }
+ break;
+ case CONSTRUCTEDREF:
+ startTransaction();
+ inuse = 1;
+ using = 1;
+ state = State.ACTREF;
+ try {
+ context.initiate(this);
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ }
+ break;
+ case DEACTREF:
+ resumeTransaction();
+ inuse = 1;
+ state = State.ACTREF;
+ break;
+ case ACTREF:
+ case ACTUNREF:
+ inuse++;
+ break;
+ case FINISHED:
+ throw new MulgaraTransactionException("Attempt to activate terminated transaction");
+ case FAILED:
+ throw new MulgaraTransactionException("Attempt to activate failed transaction", rollbackCause);
+ }
+ } finally {
+ releaseActivationMutex();
+ }
+
+ try {
+ checkActivated();
+ } catch (MulgaraTransactionException em) {
+ throw abortTransaction("Activate failed post-condition check", em);
+ }
+ } catch (MulgaraTransactionException em) {
+ throw em;
+ } catch (Throwable th) {
+ throw abortTransaction("Error activating transaction", th);
+ } finally {
+// report("Leaving Activate transaction");
+ }
+ }
+
+
+ private void deactivate() throws MulgaraTransactionException {
+// report("Deactivating transaction");
+
+ try {
+ synchronized (this) {
+ if (currentThread == null) {
+ throw new MulgaraTransactionException("Transaction not associated with thread");
+ } else if (!currentThread.equals(Thread.currentThread())) {
+ throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
+ }
+ }
+
+ acquireActivationMutex();
+ try {
+ switch (state) {
+ case ACTUNREF:
+ if (inuse == 1) {
+ commitTransaction();
+ }
+ inuse--;
+ break;
+ case ACTREF:
+ if (inuse == 1) {
+ suspendTransaction();
+ }
+ inuse--;
+ break;
+ case CONSTRUCTEDREF:
+ throw new MulgaraTransactionException("Attempt to deactivate uninitiated refed transaction");
+ case CONSTRUCTEDUNREF:
+ throw new MulgaraTransactionException("Attempt to deactivate uninitiated transaction");
+ case DEACTREF:
+ throw new IllegalStateException("Attempt to deactivate unactivated transaction");
+ case FINISHED:
+ if (inuse < 0) {
+ errorReport("Activation count failure - too many deacts - in finished transaction", null);
+ } else {
+ inuse--;
+ }
+ break;
+ case FAILED:
+ // Nothing to do here.
+ break;
+ }
+ } finally {
+ releaseActivationMutex();
+ }
+ } catch (MulgaraTransactionException em) {
+ throw em;
+ } catch (Throwable th) {
+ throw abortTransaction("Error deactivating transaction", th);
+ } finally {
+// report("Leaving Deactivate Transaction");
+ }
+ }
+
+ // Note: The transaction is often not activated when these are called.
+ // This occurs when setting autocommit, as this creates and
+ // references a transaction object that won't be started/activated
+ // until it is first used.
+ void reference() throws MulgaraTransactionException {
+ try {
+ report("Referencing Transaction");
+
+ synchronized (this) {
+ if (currentThread != null && !currentThread.equals(Thread.currentThread())) {
+ throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
+ }
+ }
+
+ switch (state) {
+ case CONSTRUCTEDUNREF:
+ state = State.CONSTRUCTEDREF;
+ break;
+ case ACTREF:
+ case ACTUNREF:
+ using++;
+ state = State.ACTREF;
+ break;
+ case DEACTREF:
+ using++;
+ break;
+ case CONSTRUCTEDREF:
+ throw new MulgaraTransactionException("Attempt to reference uninitated transaction twice");
+ case FINISHED:
+ throw new MulgaraTransactionException("Attempt to reference terminated transaction");
+ case FAILED:
+ throw new MulgaraTransactionException("Attempt to reference failed transaction", rollbackCause);
+ }
+ } catch (MulgaraTransactionException em) {
+ throw em;
+ } catch (Throwable th) {
+ report("Error referencing transaction");
+ throw implicitRollback(th);
+ } finally {
+ report("Leaving Reference Transaction");
+ }
+ }
+
+ void dereference() throws MulgaraTransactionException {
+ report("Dereferencing Transaction");
+ try {
+ synchronized (this) {
+ if (currentThread != null && !currentThread.equals(Thread.currentThread())) {
+ throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
+ }
+ }
+
+ switch (state) {
+ case ACTREF:
+ if (using == 1) {
+ state = State.ACTUNREF;
+ }
+ using--;
+ break;
+ case CONSTRUCTEDREF:
+ state = State.CONSTRUCTEDUNREF;
+ break;
+ case FINISHED:
+ case FAILED:
+ if (using < 1) {
+ errorReport("Reference count failure - too many derefs - in finished transaction", null);
+ } else {
+ using--;
+ }
+ break;
+ case ACTUNREF:
+ throw new IllegalStateException("Attempt to dereference unreferenced transaction");
+ case CONSTRUCTEDUNREF:
+ throw new MulgaraTransactionException("Attempt to dereference uninitated transaction");
+ case DEACTREF:
+ throw new IllegalStateException("Attempt to dereference deactivated transaction");
+ }
+ } catch (MulgaraTransactionException em) {
+ throw em;
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ } finally {
+ report("Dereferenced Transaction");
+ }
+ }
+
+ private void startTransaction() throws MulgaraTransactionException {
+ report("Initiating transaction");
+ try {
+ transaction = manager.transactionStart(this);
+ synchronized (this) {
+ currentThread = Thread.currentThread();
+ }
+ } catch (Throwable th) {
+ throw abortTransaction("Failed to start transaction", th);
+ }
+ }
+
+ private void resumeTransaction() throws MulgaraTransactionException {
+// report("Resuming transaction");
+ try {
+ manager.transactionResumed(this, transaction);
+ synchronized (this) {
+ currentThread = Thread.currentThread();
+ }
+ } catch (Throwable th) {
+ abortTransaction("Failed to resume transaction", th);
+ }
+ }
+
+ private void suspendTransaction() throws MulgaraTransactionException {
+// report("Suspending Transaction");
+ try {
+ if (using < 1) {
+ throw implicitRollback(
+ new MulgaraTransactionException("Attempt to suspend unreferenced transaction"));
+ }
+ transaction = manager.transactionSuspended(this);
+ synchronized (this) {
+ currentThread = null;
+ }
+ state = State.DEACTREF;
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ } finally {
+// report("Finished suspending transaction");
+ }
+ }
+
+ public void commitTransaction() throws MulgaraTransactionException {
+ report("Committing Transaction");
+ try {
+ transaction.commit();
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ }
+ try {
+ try {
+ transaction = null;
+ } finally { try {
+ state = State.FINISHED;
+ } finally { try {
+ context.clear();
+ } finally { try {
+ enlisted.clear();
+ } finally { try {
+ manager.transactionComplete(this);
+ } finally { try {
+ manager = null;
+ } finally {
+ report("Committed transaction");
+ } } } } } }
+ } catch (Throwable th) {
+ errorReport("Error cleaning up transaction post-commit", th);
+ throw new MulgaraTransactionException("Error cleaning up transaction post-commit", th);
+ }
+ }
+
+ /**
+ * Rollback the transaction.
+ * We don't throw an exception here when transaction fails - this is expected,
+ * after all we requested it.
+ */
+ public void explicitRollback() throws MulgaraTransactionException {
+ synchronized (this) {
+ if (currentThread == null) {
+ throw new MulgaraTransactionException("Transaction failed activation check");
+ } else if (!currentThread.equals(Thread.currentThread())) {
+ throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
+ }
+ }
+
+ try {
+ switch (state) {
+ case ACTUNREF:
+ case ACTREF:
+ transaction.rollback();
+ context.clear();
+ enlisted.clear();
+ manager.transactionComplete(this);
+ state = State.FINISHED;
+ break;
+ case DEACTREF:
+ throw new IllegalStateException("Attempt to rollback unactivated transaction");
+ case CONSTRUCTEDREF:
+ throw new MulgaraTransactionException("Attempt to rollback uninitiated ref'd transaction");
+ case CONSTRUCTEDUNREF:
+ throw new MulgaraTransactionException("Attempt to rollback uninitiated unref'd transaction");
+ case FINISHED:
+ throw new MulgaraTransactionException("Attempt to rollback finished transaction");
+ case FAILED:
+ throw new MulgaraTransactionException("Attempt to rollback failed transaction");
+ }
+ } catch (MulgaraTransactionException em) {
+ throw em;
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ }
+ }
+
+ /**
+ * This will endevour to terminate the transaction via a rollback - if this
+ * fails it will abort the transaction.
+ * If the rollback succeeds then this method will return a suitable
+ * MulgaraTransactionException to be thrown by the caller.
+ * If the rollback fails then this method will throw the resulting exception
+ * from abortTransaction().
+ * Post-condition: The transaction is terminated and cleaned up.
+ */
+ MulgaraTransactionException implicitRollback(Throwable cause) throws MulgaraTransactionException {
+ try {
+ report("Implicit Rollback triggered");
+
+ synchronized (this) {
+ if (currentThread == null) {
+ throw new MulgaraTransactionException("Transaction not associated with thread");
+ } else if (!currentThread.equals(Thread.currentThread())) {
+ throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
+ }
+ }
+
+ if (rollbackCause != null) {
+ errorReport("Cascading error, transaction already rolled back", cause);
+ errorReport("Cascade error, expected initial cause", rollbackCause);
+
+ return new MulgaraTransactionException("Transaction already in rollback", cause);
+ }
+
+ switch (state) {
+ case ACTUNREF:
+ case ACTREF:
+ rollbackCause = cause;
+ transaction.rollback();
+ transaction = null;
+ context.clear();
+ enlisted.clear();
+ state = State.FAILED;
+ manager.transactionComplete(this);
+ manager = null;
+ return new MulgaraTransactionException("Transaction rollback triggered", cause);
+ case DEACTREF:
+ throw new IllegalStateException("Attempt to rollback deactivated transaction");
+ case CONSTRUCTEDREF:
+ throw new MulgaraTransactionException("Attempt to rollback uninitiated ref'd transaction");
+ case CONSTRUCTEDUNREF:
+ throw new MulgaraTransactionException("Attempt to rollback uninitiated unref'd transaction");
+ case FINISHED:
+ throw new MulgaraTransactionException("Attempt to rollback finished transaction");
+ case FAILED:
+ throw new MulgaraTransactionException("Attempt to rollback failed transaction");
+ default:
+ throw new MulgaraTransactionException("Unknown state");
+ }
+ } catch (Throwable th) {
+ try {
+ errorReport("Attempt to rollback failed; initiating cause: ", cause);
+ } finally {
+ throw abortTransaction("Failed to rollback normally - see log for inititing cause", th);
+ }
+ } finally {
+ report("Leaving implicitRollback");
+ }
+ }
+
+ /**
+ * Forces the transaction to be abandoned, including bypassing JTA to directly
+ * rollback/abort the underlying store-phases if required.
+ * Heavilly nested try{}finally{} should guarentee that even JVM errors should
+ * not prevent this function from cleaning up anything that can be cleaned up.
+ * We have to delegate to the OperationContext the abort() on the resolvers as
+ * only it has full knowledge of which resolvers are associated with this
+ * transaction.
+ */
+ MulgaraTransactionException abortTransaction(String errorMessage, Throwable cause)
+ throws MulgaraTransactionException {
+ // We need to notify the manager here - this is serious, we
+ // need to rollback this transaction, but if we have reached here
+ // we have failed to obtain a valid transaction to rollback!
+ try {
+ try {
+ errorReport(errorMessage + " - Aborting", cause);
+ } finally { try {
+ if (transaction != null) {
+ transaction.rollback();
+ }
+ } finally { try {
+ manager.transactionAborted(this);
+ } finally { try {
+ abortEnlistedResources();
+ } finally { try {
+ context.clear();
+ } finally { try {
+ enlisted.clear();
+ } finally { try {
+ transaction = null;
+ } finally { try {
+ manager = null;
+ } finally {
+ state = State.FAILED;
+ } } } } } } } }
+ return new MulgaraTransactionException(errorMessage + " - Aborting", cause);
+ } catch (Throwable th) {
+ throw new MulgaraTransactionException(errorMessage + " - Failed to abort cleanly", th);
+ } finally {
+ report("Leaving abortTransaction");
+ }
+ }
+
+ /**
+ * Used to bypass JTA and explicitly abort resources behind the scenes.
+ */
+ private void abortEnlistedResources() {
+ for (EnlistableResource e : enlisted) {
+ try {
+ e.abort();
+ } catch (Throwable th) {
+ try {
+ errorReport("Error aborting enlistable resource", th);
+ } catch (Throwable ignore) { }
+ }
+ }
+ }
+
+ void execute(Operation operation,
+ ResolverSessionFactory resolverSessionFactory, // FIXME: We shouldn't need this. - only used for backup and restore operations.
+ DatabaseMetadata metadata) throws MulgaraTransactionException {
+ report("Executing Operation");
+ try {
+ activate();
+ try {
+ operation.execute(context,
+ context.getSystemResolver(),
+ resolverSessionFactory,
+ metadata);
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ } finally {
+ deactivate();
+ }
+ } finally {
+ report("Executed Operation");
+ }
+ }
+
+ AnswerOperationResult execute(AnswerOperation ao) throws TuplesException {
+ debugReport("Executing AnswerOperation");
+ try {
+ activate();
+ try {
+ ao.execute();
+ return ao.getResult();
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ } finally {
+ deactivate();
+ }
+ } catch (MulgaraTransactionException em) {
+ throw new TuplesException("Transaction error", em);
+ } finally {
+ debugReport("Executed AnswerOperation");
+ }
+ }
+
+
+ void execute(TransactionOperation to) throws MulgaraTransactionException {
+ report("Executing TransactionOperation");
+ try {
+ activate();
+ try {
+ to.execute();
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ } finally {
+ deactivate();
+ }
+ } finally {
+ report("Executed TransactionOperation");
+ }
+ }
+
+ public void enlist(EnlistableResource enlistable) throws MulgaraTransactionException {
+ try {
+ synchronized (this) {
+ if (currentThread == null) {
+ throw new MulgaraTransactionException("Transaction not associated with thread");
+ } else if (!currentThread.equals(Thread.currentThread())) {
+ throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
+ }
+ }
+
+ if (enlisted.contains(enlistable)) {
+ return;
+ }
+
+ switch (state) {
+ case ACTUNREF:
+ case ACTREF:
+ transaction.enlistResource(enlistable.getXAResource());
+ enlisted.add(enlistable);
+ break;
+ case CONSTRUCTEDREF:
+ throw new MulgaraTransactionException("Attempt to enlist resource in uninitated ref'd transaction");
+ case CONSTRUCTEDUNREF:
+ throw new MulgaraTransactionException("Attempt to enlist resource in uninitated unref'd transaction");
+ case DEACTREF:
+ throw new MulgaraTransactionException("Attempt to enlist resource in unactivated transaction");
+ case FINISHED:
+ throw new MulgaraTransactionException("Attempt to enlist resource in finished transaction");
+ case FAILED:
+ throw new MulgaraTransactionException("Attempt to enlist resource in failed transaction");
+ }
+ } catch (Throwable th) {
+ throw implicitRollback(th);
+ }
+ }
+
+ //
+ // Used internally
+ //
+
+ private void checkActivated() throws MulgaraTransactionException {
+ synchronized (this) {
+ if (currentThread == null) {
+ throw new MulgaraTransactionException("Transaction not associated with thread");
+ } else if (!currentThread.equals(Thread.currentThread())) {
+ throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
+ }
+ }
+
+ switch (state) {
+ case ACTUNREF:
+ case ACTREF:
+ if (inuse < 0 || using < 0) {
+ throw new MulgaraTransactionException("Reference Failure, using: " + using + ", inuse: " + inuse);
+ }
+ return;
+ case CONSTRUCTEDREF:
+ throw new MulgaraTransactionException("Transaction (ref) uninitiated");
+ case CONSTRUCTEDUNREF:
+ throw new MulgaraTransactionException("Transaction (unref) uninitiated");
+ case DEACTREF:
+ throw new MulgaraTransactionException("Transaction deactivated");
+ case FINISHED:
+ throw new MulgaraTransactionException("Transaction is terminated");
+ case FAILED:
+ throw new MulgaraTransactionException("Transaction is failed", rollbackCause);
+ }
+ }
+
+ private void acquireActivationMutex() {
+ activationMutex.lock();
+ }
+
+ private void releaseActivationMutex() {
+ activationMutex.unlock();
+ }
+
+ protected void finalize() {
+ report("GC-finalize");
+ if (state != State.FINISHED && state != State.FAILED) {
+ errorReport("Finalizing incomplete transaction - aborting...", null);
+ try {
+ abortTransaction("Transaction finalized while still valid", new Throwable());
+ } catch (Throwable th) {
+ errorReport("Attempt to abort transaction from finalize failed", th);
+ }
+ }
+
+ if (state != State.FAILED && (inuse != 0 || using != 0)) {
+ errorReport("Reference counting error in transaction", null);
+ }
+
+ if (manager != null || transaction != null) {
+ errorReport("Transaction not terminated properly", null);
+ }
+ }
+
+ private void report(String desc) {
+ if (logger.isInfoEnabled()) {
+ logger.info(desc + ": " + System.identityHashCode(this) + ", state=" + state +
+ ", inuse=" + inuse + ", using=" + using);
+ }
+ }
+
+ private void debugReport(String desc) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(desc + ": " + System.identityHashCode(this) + ", state=" + state +
+ ", inuse=" + inuse + ", using=" + using);
+ }
+ }
+
+ private void errorReport(String desc, Throwable cause) {
+ logger.error(desc + ": " + System.identityHashCode(this) + ", state=" + state +
+ ", inuse=" + inuse + ", using=" + using, cause != null ? cause : new Throwable());
+ }
+}
Copied: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java (from rev 535, branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java)
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java 2007-11-13 22:10:39 UTC (rev 535)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraInternalTransactionFactory.java 2007-11-14 08:13:10 UTC (rev 536)
@@ -0,0 +1,351 @@
+/*
+ * The contents of this file are subject to the Open Software License
+ * Version 3.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.rosenlaw.com/OSL3.0.htm
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * This file is an original work developed by Netymon Pty Ltd
+ * (http://www.netymon.com, mailto:mail at netymon.com). Portions created
+ * by Netymon Pty Ltd are Copyright (c) 2006 Netymon Pty Ltd.
+ * All Rights Reserved.
+ */
+
+package org.mulgara.resolver;
+
+// Java2 packages
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+// Third party packages
+import org.apache.log4j.Logger;
+
+// Local packages
+import org.mulgara.query.MulgaraTransactionException;
+import org.mulgara.server.Session;
+import org.mulgara.transaction.TransactionManagerFactory;
+
+/**
+ * Manages transactions within Mulgara.
+ *
+ * see http://mulgara.org/confluence/display/dev/Transaction+Architecture
+ *
+ * Maintains association between Answer's and TransactionContext's.
+ * Manages tracking the ownership of the write-lock.
+ * Maintains the write-queue and any timeout algorithm desired.
+ * Provides new/existing TransactionContext's to DatabaseSession on request.
+ * Note: Returns new context unless Session is currently in a User Demarcated Transaction.
+ *
+ *
+ * @created 2006-10-06
+ *
+ * @author <a href="mailto:andrae at netymon.com">Andrae Muys</a>
+ *
+ * @version $Revision: $
+ *
+ * @modified $Date: $
+ *
+ * @maintenanceAuthor $Author: $
+ *
+ * @company <A href="mailto:mail at netymon.com">Netymon Pty Ltd</A>
+ *
+ * @copyright ©2006 <a href="http://www.netymon.com/">Netymon Pty Ltd</a>
+ *
+ * @licence Open Software License v3.0</a>
+ */
+
+public class MulgaraInternalTransactionFactory extends MulgaraTransactionFactory {
+ /** Logger. */
+ private static final Logger logger =
+ Logger.getLogger(MulgaraInternalTransactionFactory.class.getName());
+
+ private boolean autoCommit;
+
+ /** Set of sessions whose transactions have been rolledback.*/
+ private Set<Session> failedSessions;
+
+ /** Map of threads to active transactions. */
+ private Map<Thread, MulgaraTransaction> activeTransactions;
+
+ private final TransactionManager transactionManager;
+
+ public MulgaraInternalTransactionFactory(MulgaraTransactionManager manager, TransactionManagerFactory transactionManagerFactory) {
+ super(manager);
+ this.autoCommit = true;
+
+ this.failedSessions = new HashSet<Session>();
+ this.activeTransactions = new HashMap<Thread, MulgaraTransaction>();
+
+ this.transactionManager = transactionManagerFactory.newTransactionManager();
+ }
+
+ /**
+ * Allows DatabaseSession to initiate/obtain a transaction.
+ * <ul>
+ * <li>If the Session holds the write lock, return the current Write-Transaction.</li>
+ * <li>If the Session does not hold the write lock and requests a read-only transaction,
+ * create a new ro-transaction object and return it.</li>
+ * <li>If the Session does not hold the write lock and requests a read-write transaction,
+ * obtain the write-lock, create a new transaction object and return it.</li>
+ * </ul>
+ */
+ public MulgaraTransaction newMulgaraTransaction(DatabaseOperationContext context)
+ throws MulgaraTransactionException {
+ return new MulgaraInternalTransaction(this, context);
+ }
+
+
+ public void commit(DatabaseSession session) throws MulgaraTransactionException {
+ acquireMutex();
+ try {
+ manager.reserveWriteLock();
+ try {
+ if (failedSessions.contains(session)) {
+ throw new MulgaraTransactionException("Attempting to commit failed exception");
+ } else if (session != currentWritingSession) {
+ throw new MulgaraTransactionException(
+ "Attempting to commit while not the current writing transaction");
+ }
+
+ setAutoCommit(session, true);
+ setAutoCommit(session, false);
+ } finally {
+ manager.releaseReserve();
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
+
+ /**
+ * This is an explicit, user-specified rollback.
+ *
+ * This needs to be distinguished from an implicit rollback triggered by failure.
+ */
+ public void rollback(DatabaseSession session) throws MulgaraTransactionException {
+ acquireMutex();
+ try {
+ manager.reserveWriteLock();
+ try {
+ if (manager.isHoldingWriteLock(session)) {
+ try {
+ writeTransaction.rollback("Explicit Rollback");
+ // FIXME: Should be checking status here, not writelock.
+ if (manager.isHoldingWriteLock(session)) {
+ // transaction referenced by something - need to explicitly end it.
+ writeTransaction.abortTransaction("Rollback failed",
+ new MulgaraTransactionException("Rollback failed to terminate write transaction"));
+ }
+ } finally {
+ failedSessions.add(session);
+ setAutoCommit(session, false);
+ }
+ } else if (failedSessions.contains(session)) {
+ failedSessions.remove(session);
+ setAutoCommit(session, false);
+ } else {
+ throw new MulgaraTransactionException(
+ "Attempt to rollback while not in the current writing transaction");
+ }
+ } finally {
+ manager.releaseReserve();
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ public void setAutoCommit(DatabaseSession session, boolean autoCommit)
+ throws MulgaraTransactionException {
+ acquireMutex();
+ try {
+ if (manager.isHoldingWriteLock(session) && failedSessions.contains(session)) {
+ writeTransaction.abortTransaction("Session failed and still holding writeLock",
+ new MulgaraTransactionException("Failed Session in setAutoCommit"));
+ }
+
+ if (manager.isHoldingWriteLock(session) || failedSessions.contains(session)) {
+ if (autoCommit) {
+ // AutoCommit off -> on === branch on current state of transaction.
+ if (manager.isHoldingWriteLock(session)) {
+ // Within active transaction - commit and finalise.
+ try {
+ runWithoutMutex(new TransactionOperation() {
+ public void execute() throws MulgaraTransactionException {
+ writeTransaction.execute(new TransactionOperation() {
+ public void execute() throws MulgaraTransactionException {
+ writeTransaction.dereference();
+ writeTransaction.commitTransaction();
+ }
+ });
+ }
+ });
+ } finally {
+ releaseWriteLock();
+ this.autoCommit = true;
+ }
+ } else if (failedSessions.contains(session)) {
+ // Within failed transaction - cleanup.
+ failedSessions.remove(session);
+ }
+ } else {
+ logger.info("Attempt to set autocommit false twice");
+ // AutoCommit off -> off === no-op. Log info.
+ }
+ } else {
+ if (autoCommit) {
+ // AutoCommit on -> on === no-op. Log info.
+ logger.info("Attempting to set autocommit true without setting it false");
+ } else {
+ // AutoCommit on -> off == Start new transaction.
+ writeTransaction = getTransaction(session, true);
+ writeTransaction.reference();
+ this.autoCommit = false;
+ }
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ //
+ // Transaction livecycle callbacks.
+ //
+
+ public Transaction transactionStart(MulgaraTransaction transaction) throws MulgaraTransactionException {
+ acquireMutex();
+ try {
+ try {
+ logger.info("Beginning Transaction");
+ if (activeTransactions.get(Thread.currentThread()) != null) {
+ throw new MulgaraTransactionException(
+ "Attempt to start transaction in thread with exiting active transaction.");
+ } else if (activeTransactions.containsValue(transaction)) {
+ throw new MulgaraTransactionException("Attempt to start transaction twice");
+ }
+
+ transactionManager.begin();
+ Transaction jtaTrans = transactionManager.getTransaction();
+
+ activeTransactions.put(Thread.currentThread(), transaction);
+
+ return jtaTrans;
+ } catch (Exception e) {
+ throw new MulgaraTransactionException("Transaction Begin Failed", e);
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ public void transactionResumed(MulgaraTransaction transaction, Transaction jtaXA)
+ throws MulgaraTransactionException {
+ acquireMutex();
+ try {
+ if (activeTransactions.get(Thread.currentThread()) != null) {
+ throw new MulgaraTransactionException(
+ "Attempt to resume transaction in already activated thread");
+ } else if (activeTransactions.containsValue(transaction)) {
+ throw new MulgaraTransactionException("Attempt to resume active transaction");
+ }
+
+ try {
+ transactionManager.resume(jtaXA);
+ activeTransactions.put(Thread.currentThread(), transaction);
+ } catch (Exception e) {
+ throw new MulgaraTransactionException("Resume Failed", e);
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ public Transaction transactionSuspended(MulgaraTransaction transaction)
+ throws MulgaraTransactionException {
+ acquireMutex();
+ try {
+ try {
+ if (transaction != activeTransactions.get(Thread.currentThread())) {
+ throw new MulgaraTransactionException(
+ "Attempt to suspend transaction from outside thread");
+ }
+
+ if (autoCommit && transaction == writeTransaction) {
+ logger.error("Attempt to suspend write transaction without setting AutoCommit Off");
+ throw new MulgaraTransactionException(
+ "Attempt to suspend write transaction without setting AutoCommit Off");
+ }
+
+ Transaction xa = transactionManager.suspend();
+ activeTransactions.remove(Thread.currentThread());
+
+ return xa;
+ } catch (Throwable th) {
+ logger.error("Attempt to suspend failed", th);
+ try {
+ transactionManager.setRollbackOnly();
+ } catch (Throwable t) {
+ logger.error("Attempt to setRollbackOnly() failed", t);
+ }
+ throw new MulgaraTransactionException("Suspend failed", th);
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ public void transactionComplete(MulgaraTransaction transaction) {
+ acquireMutex();
+ try {
+ if (transaction == writeTransaction) {
+ manager.releaseWriteLock(session);
+ writeTransaction = null;
+ }
+
+ sessionXAMap.removeN(transaction);
+ activeTransactions.remove(Thread.currentThread());
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ public void transactionAborted(MulgaraTransaction transaction) {
+ acquireMutex();
+ try {
+ try {
+ // Make sure this cleans up the transaction metadata - this transaction is DEAD!
+ if (transaction == writeTransaction) {
+ failedSessions.add(currentWritingSession);
+ }
+ transactionComplete(transaction);
+ } catch (Throwable th) {
+ // FIXME: This should probably abort the entire server after logging the error!
+ logger.error("Error managing transaction abort", th);
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ public void setTransactionTimeout(int transactionTimeout) {
+ try {
+ transactionManager.setTransactionTimeout(transactionTimeout);
+ } catch (SystemException es) {
+ logger.warn("Unable to set transaction timeout: " + transactionTimeout, es);
+ }
+ }
+}
Modified: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java 2007-11-13 22:10:39 UTC (rev 535)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java 2007-11-14 08:13:10 UTC (rev 536)
@@ -11,19 +11,12 @@
*
* This file is an original work developed by Netymon Pty Ltd
* (http://www.netymon.com, mailto:mail at netymon.com). Portions created
- * by Netymon Pty Ltd are Copyright (c) 2006 Netymon Pty Ltd.
+ * by Netymon Pty Ltd are Copyright (c) 2007 Netymon Pty Ltd.
* All Rights Reserved.
*/
package org.mulgara.resolver;
// Java 2 enterprise packages
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.transaction.RollbackException;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.xa.XAResource;
// Third party packages
import org.apache.log4j.Logger;
@@ -38,17 +31,8 @@
import org.mulgara.query.QueryException;
/**
- * Responsible for the javax.transaction.Transaction object.
- * Responsibilities
- * Ensuring every begin or resume is followed by either a suspend or an end.
- * Ensuring every suspend or end is preceeded by either a begin or a resume.
- * In conjunction with TransactionalAnswer ensuring that
- * all calls to operations on SubqueryAnswer are preceeded by a successful resume.
- * all calls to operations on SubqueryAnswer conclude with a suspend as the last call prior to returning to the user.
- * Collaborates with DatabaseTransactionManager to determine when to end the transaction.
+ * @created 2007-11-06
*
- * @created 2006-10-06
- *
* @author <a href="mailto:andrae at netymon.com">Andrae Muys</a>
*
* @version $Revision: $
@@ -59,685 +43,50 @@
*
* @company <a href="mailto:mail at netymon.com">Netymon Pty Ltd</a>
*
- * @copyright ©2006 <a href="http://www.netymon.com/">Netymon Pty Ltd</a>
+ * @copyright ©2007 <a href="http://www.netymon.com/">Netymon Pty Ltd</a>
*
* @licence Open Software License v3.0
*/
public class MulgaraTransaction {
+ void reference() throws MulgaraTransactionException;
+ void dereference() throws MulgaraTransactionException;
+
/**
- * This is the state machine switch matching these states.
- switch (state) {
- case CONSTRUCTEDREF:
- case CONSTRUCTEDUNREF:
- case ACTUNREF:
- case ACTREF:
- case DEACTREF:
- case FINISHED:
- case FAILED:
- }
+ * Forces the transaction to be abandoned, including bypassing JTA to directly
+ * rollback/abort the underlying store-phases if required.
+ * This this transaction is externally managed this amounts to a heuristic
+ * rollback decision and should be treated as such.
+ *
+ * @return an exception constructed with the provided error-message and cause.
+ * @throws MulgaraTransactionException if a further error is encounted while
+ * attempting to abort.
*/
- private enum State { CONSTRUCTEDREF, CONSTRUCTEDUNREF, ACTUNREF, ACTREF, DEACTREF, FINISHED, FAILED };
+ MulgaraTransactionException abortTransaction(String errorMessage, Throwable cause) throws MulgaraTransactionException;
- /** Logger. */
- private static final Logger logger =
- Logger.getLogger(MulgaraTransaction.class.getName());
-
- private MulgaraTransactionManager manager;
- private DatabaseOperationContext context;
- private Set<EnlistableResource> enlisted;
-
- private Transaction transaction;
- private Thread currentThread;
-
- private ReentrantLock activationMutex;
-
- private State state;
- private int inuse;
- private int using;
-
- private Throwable rollbackCause;
-
- public MulgaraTransaction(MulgaraTransactionManager manager, DatabaseOperationContext context)
- throws IllegalArgumentException {
- report("Creating Transaction");
-
- try {
- if (manager == null) {
- throw new IllegalArgumentException("Manager null in MulgaraTransaction");
- } else if (context == null) {
- throw new IllegalArgumentException("OperationContext null in MulgaraTransaction");
- }
- this.manager = manager;
- this.context = context;
- this.enlisted = new HashSet<EnlistableResource>();
- this.activationMutex = new ReentrantLock();
- this.currentThread = null;
-
- inuse = 0;
- using = 0;
- state = State.CONSTRUCTEDUNREF;
- rollbackCause = null;
- } finally {
- report("Finished Creating Transaction");
- }
- }
-
-
- void activate() throws MulgaraTransactionException {
-// report("Activating Transaction");
- try {
- synchronized (this) {
- if (currentThread != null && !currentThread.equals(Thread.currentThread())) {
- throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
- }
- }
-
- acquireActivationMutex();
- try {
- switch (state) {
- case CONSTRUCTEDUNREF:
- startTransaction();
- inuse = 1;
- state = State.ACTUNREF;
- try {
- context.initiate(this);
- } catch (Throwable th) {
- throw implicitRollback(th);
- }
- break;
- case CONSTRUCTEDREF:
- startTransaction();
- inuse = 1;
- using = 1;
- state = State.ACTREF;
- try {
- context.initiate(this);
- } catch (Throwable th) {
- throw implicitRollback(th);
- }
- break;
- case DEACTREF:
- resumeTransaction();
- inuse = 1;
- state = State.ACTREF;
- break;
- case ACTREF:
- case ACTUNREF:
- inuse++;
- break;
- case FINISHED:
- throw new MulgaraTransactionException("Attempt to activate terminated transaction");
- case FAILED:
- throw new MulgaraTransactionException("Attempt to activate failed transaction", rollbackCause);
- }
- } finally {
- releaseActivationMutex();
- }
-
- try {
- checkActivated();
- } catch (MulgaraTransactionException em) {
- throw abortTransaction("Activate failed post-condition check", em);
- }
- } catch (MulgaraTransactionException em) {
- throw em;
- } catch (Throwable th) {
- throw abortTransaction("Error activating transaction", th);
- } finally {
-// report("Leaving Activate transaction");
- }
- }
-
-
- private void deactivate() throws MulgaraTransactionException {
-// report("Deactivating transaction");
-
- try {
- synchronized (this) {
- if (currentThread == null) {
- throw new MulgaraTransactionException("Transaction not associated with thread");
- } else if (!currentThread.equals(Thread.currentThread())) {
- throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
- }
- }
-
- acquireActivationMutex();
- try {
- switch (state) {
- case ACTUNREF:
- if (inuse == 1) {
- commitTransaction();
- }
- inuse--;
- break;
- case ACTREF:
- if (inuse == 1) {
- suspendTransaction();
- }
- inuse--;
- break;
- case CONSTRUCTEDREF:
- throw new MulgaraTransactionException("Attempt to deactivate uninitiated refed transaction");
- case CONSTRUCTEDUNREF:
- throw new MulgaraTransactionException("Attempt to deactivate uninitiated transaction");
- case DEACTREF:
- throw new IllegalStateException("Attempt to deactivate unactivated transaction");
- case FINISHED:
- if (inuse < 0) {
- errorReport("Activation count failure - too many deacts - in finished transaction", null);
- } else {
- inuse--;
- }
- break;
- case FAILED:
- // Nothing to do here.
- break;
- }
- } finally {
- releaseActivationMutex();
- }
- } catch (MulgaraTransactionException em) {
- throw em;
- } catch (Throwable th) {
- throw abortTransaction("Error deactivating transaction", th);
- } finally {
-// report("Leaving Deactivate Transaction");
- }
- }
-
- // Note: The transaction is often not activated when these are called.
- // This occurs when setting autocommit, as this creates and
- // references a transaction object that won't be started/activated
- // until it is first used.
- void reference() throws MulgaraTransactionException {
- try {
- report("Referencing Transaction");
-
- synchronized (this) {
- if (currentThread != null && !currentThread.equals(Thread.currentThread())) {
- throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
- }
- }
-
- switch (state) {
- case CONSTRUCTEDUNREF:
- state = State.CONSTRUCTEDREF;
- break;
- case ACTREF:
- case ACTUNREF:
- using++;
- state = State.ACTREF;
- break;
- case DEACTREF:
- using++;
- break;
- case CONSTRUCTEDREF:
- throw new MulgaraTransactionException("Attempt to reference uninitated transaction twice");
- case FINISHED:
- throw new MulgaraTransactionException("Attempt to reference terminated transaction");
- case FAILED:
- throw new MulgaraTransactionException("Attempt to reference failed transaction", rollbackCause);
- }
- } catch (MulgaraTransactionException em) {
- throw em;
- } catch (Throwable th) {
- report("Error referencing transaction");
- throw implicitRollback(th);
- } finally {
- report("Leaving Reference Transaction");
- }
- }
-
- void dereference() throws MulgaraTransactionException {
- report("Dereferencing Transaction");
- try {
- synchronized (this) {
- if (currentThread != null && !currentThread.equals(Thread.currentThread())) {
- throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
- }
- }
-
- switch (state) {
- case ACTREF:
- if (using == 1) {
- state = State.ACTUNREF;
- }
- using--;
- break;
- case CONSTRUCTEDREF:
- state = State.CONSTRUCTEDUNREF;
- break;
- case FINISHED:
- case FAILED:
- if (using < 1) {
- errorReport("Reference count failure - too many derefs - in finished transaction", null);
- } else {
- using--;
- }
- break;
- case ACTUNREF:
- throw new IllegalStateException("Attempt to dereference unreferenced transaction");
- case CONSTRUCTEDUNREF:
- throw new MulgaraTransactionException("Attempt to dereference uninitated transaction");
- case DEACTREF:
- throw new IllegalStateException("Attempt to dereference deactivated transaction");
- }
- } catch (MulgaraTransactionException em) {
- throw em;
- } catch (Throwable th) {
- throw implicitRollback(th);
- } finally {
- report("Dereferenced Transaction");
- }
- }
-
- private void startTransaction() throws MulgaraTransactionException {
- report("Initiating transaction");
- try {
- transaction = manager.transactionStart(this);
- synchronized (this) {
- currentThread = Thread.currentThread();
- }
- } catch (Throwable th) {
- throw abortTransaction("Failed to start transaction", th);
- }
- }
-
- private void resumeTransaction() throws MulgaraTransactionException {
-// report("Resuming transaction");
- try {
- manager.transactionResumed(this, transaction);
- synchronized (this) {
- currentThread = Thread.currentThread();
- }
- } catch (Throwable th) {
- abortTransaction("Failed to resume transaction", th);
- }
- }
-
- private void suspendTransaction() throws MulgaraTransactionException {
-// report("Suspending Transaction");
- try {
- if (using < 1) {
- throw implicitRollback(
- new MulgaraTransactionException("Attempt to suspend unreferenced transaction"));
- }
- transaction = manager.transactionSuspended(this);
- synchronized (this) {
- currentThread = null;
- }
- state = State.DEACTREF;
- } catch (Throwable th) {
- throw implicitRollback(th);
- } finally {
-// report("Finished suspending transaction");
- }
- }
-
- public void commitTransaction() throws MulgaraTransactionException {
- report("Committing Transaction");
- try {
- transaction.commit();
- } catch (Throwable th) {
- throw implicitRollback(th);
- }
- try {
- try {
- transaction = null;
- } finally { try {
- state = State.FINISHED;
- } finally { try {
- context.clear();
- } finally { try {
- enlisted.clear();
- } finally { try {
- manager.transactionComplete(this);
- } finally { try {
- manager = null;
- } finally {
- report("Committed transaction");
- } } } } } }
- } catch (Throwable th) {
- errorReport("Error cleaning up transaction post-commit", th);
- throw new MulgaraTransactionException("Error cleaning up transaction post-commit", th);
- }
- }
-
/**
- * Rollback the transaction.
- * We don't throw an exception here when transaction fails - this is expected,
- * after all we requested it.
+ * Execute the specified operation.
+ *
+ * FIXME: We shouldn't need resolverSessionFactory as this is only used for backup and restore operations.
*/
- public void explicitRollback() throws MulgaraTransactionException {
- synchronized (this) {
- if (currentThread == null) {
- throw new MulgaraTransactionException("Transaction failed activation check");
- } else if (!currentThread.equals(Thread.currentThread())) {
- throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
- }
- }
+ void execute(Operation operation,
+ ResolverSessionFactory resolverSessionFactory,
+ DatabaseMetadata metadata) throws MulgaraTransactionException;
- try {
- switch (state) {
- case ACTUNREF:
- case ACTREF:
- transaction.rollback();
- context.clear();
- enlisted.clear();
- manager.transactionComplete(this);
- state = State.FINISHED;
- break;
- case DEACTREF:
- throw new IllegalStateException("Attempt to rollback unactivated transaction");
- case CONSTRUCTEDREF:
- throw new MulgaraTransactionException("Attempt to rollback uninitiated ref'd transaction");
- case CONSTRUCTEDUNREF:
- throw new MulgaraTransactionException("Attempt to rollback uninitiated unref'd transaction");
- case FINISHED:
- throw new MulgaraTransactionException("Attempt to rollback finished transaction");
- case FAILED:
- throw new MulgaraTransactionException("Attempt to rollback failed transaction");
- }
- } catch (MulgaraTransactionException em) {
- throw em;
- } catch (Throwable th) {
- throw implicitRollback(th);
- }
- }
-
/**
- * This will endevour to terminate the transaction via a rollback - if this
- * fails it will abort the transaction.
- * If the rollback succeeds then this method will return a suitable
- * MulgaraTransactionException to be thrown by the caller.
- * If the rollback fails then this method will throw the resulting exception
- * from abortTransaction().
- * Post-condition: The transaction is terminated and cleaned up.
+ * Execute the specified operation.
+ * Used by TransactionalAnswer to ensure transactional guarantees are met when
+ * using a result whose transaction may be in a suspended state.
*/
- MulgaraTransactionException implicitRollback(Throwable cause) throws MulgaraTransactionException {
- try {
- report("Implicit Rollback triggered");
+ AnswerOperationResult execute(AnswerOperation ao) throws TuplesException;
- synchronized (this) {
- if (currentThread == null) {
- throw new MulgaraTransactionException("Transaction not associated with thread");
- } else if (!currentThread.equals(Thread.currentThread())) {
- throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
- }
- }
-
- if (rollbackCause != null) {
- errorReport("Cascading error, transaction already rolled back", cause);
- errorReport("Cascade error, expected initial cause", rollbackCause);
-
- return new MulgaraTransactionException("Transaction already in rollback", cause);
- }
-
- switch (state) {
- case ACTUNREF:
- case ACTREF:
- rollbackCause = cause;
- transaction.rollback();
- transaction = null;
- context.clear();
- enlisted.clear();
- state = State.FAILED;
- manager.transactionComplete(this);
- manager = null;
- return new MulgaraTransactionException("Transaction rollback triggered", cause);
- case DEACTREF:
- throw new IllegalStateException("Attempt to rollback deactivated transaction");
- case CONSTRUCTEDREF:
- throw new MulgaraTransactionException("Attempt to rollback uninitiated ref'd transaction");
- case CONSTRUCTEDUNREF:
- throw new MulgaraTransactionException("Attempt to rollback uninitiated unref'd transaction");
- case FINISHED:
- throw new MulgaraTransactionException("Attempt to rollback finished transaction");
- case FAILED:
- throw new MulgaraTransactionException("Attempt to rollback failed transaction");
- default:
- throw new MulgaraTransactionException("Unknown state");
- }
- } catch (Throwable th) {
- try {
- errorReport("Attempt to rollback failed; initiating cause: ", cause);
- } finally {
- throw abortTransaction("Failed to rollback normally - see log for inititing cause", th);
- }
- } finally {
- report("Leaving implicitRollback");
- }
- }
-
/**
- * Forces the transaction to be abandoned, including bypassing JTA to directly
- * rollback/abort the underlying store-phases if required.
- * Heavilly nested try{}finally{} should guarentee that even JVM errors should
- * not prevent this function from cleaning up anything that can be cleaned up.
- * We have to delegate to the OperationContext the abort() on the resolvers as
- * only it has full knowledge of which resolvers are associated with this
- * transaction.
+ * Used by the TransactionCoordinator/Manager.
*/
- MulgaraTransactionException abortTransaction(String errorMessage, Throwable cause)
- throws MulgaraTransactionException {
- // We need to notify the manager here - this is serious, we
- // need to rollback this transaction, but if we have reached here
- // we have failed to obtain a valid transaction to rollback!
- try {
- try {
- errorReport(errorMessage + " - Aborting", cause);
- } finally { try {
- if (transaction != null) {
- transaction.rollback();
- }
- } finally { try {
- manager.transactionAborted(this);
- } finally { try {
- abortEnlistedResources();
- } finally { try {
- context.clear();
- } finally { try {
- enlisted.clear();
- } finally { try {
- transaction = null;
- } finally { try {
- manager = null;
- } finally {
- state = State.FAILED;
- } } } } } } } }
- return new MulgaraTransactionException(errorMessage + " - Aborting", cause);
- } catch (Throwable th) {
- throw new MulgaraTransactionException(errorMessage + " - Failed to abort cleanly", th);
- } finally {
- report("Leaving abortTransaction");
- }
- }
+ void execute(TransactionOperation to) throws MulgaraTransactionException;
/**
- * Used to bypass JTA and explicitly abort resources behind the scenes.
+ * enlist an XAResource in this transaction - includes an extra method
+ * abort().
*/
- private void abortEnlistedResources() {
- for (EnlistableResource e : enlisted) {
- try {
- e.abort();
- } catch (Throwable th) {
- try {
- errorReport("Error aborting enlistable resource", th);
- } catch (Throwable ignore) { }
- }
- }
- }
-
- void execute(Operation operation,
- ResolverSessionFactory resolverSessionFactory, // FIXME: We shouldn't need this. - only used for backup and restore operations.
- DatabaseMetadata metadata) throws MulgaraTransactionException {
- report("Executing Operation");
- try {
- activate();
- try {
- operation.execute(context,
- context.getSystemResolver(),
- resolverSessionFactory,
- metadata);
- } catch (Throwable th) {
- throw implicitRollback(th);
- } finally {
- deactivate();
- }
- } finally {
- report("Executed Operation");
- }
- }
-
- AnswerOperationResult execute(AnswerOperation ao) throws TuplesException {
- debugReport("Executing AnswerOperation");
- try {
- activate();
- try {
- ao.execute();
- return ao.getResult();
- } catch (Throwable th) {
- throw implicitRollback(th);
- } finally {
- deactivate();
- }
- } catch (MulgaraTransactionException em) {
- throw new TuplesException("Transaction error", em);
- } finally {
- debugReport("Executed AnswerOperation");
- }
- }
-
-
- void execute(TransactionOperation to) throws MulgaraTransactionException {
- report("Executing TransactionOperation");
- try {
- activate();
- try {
- to.execute();
- } catch (Throwable th) {
- throw implicitRollback(th);
- } finally {
- deactivate();
- }
- } finally {
- report("Executed TransactionOperation");
- }
- }
-
- public void enlist(EnlistableResource enlistable) throws MulgaraTransactionException {
- try {
- synchronized (this) {
- if (currentThread == null) {
- throw new MulgaraTransactionException("Transaction not associated with thread");
- } else if (!currentThread.equals(Thread.currentThread())) {
- throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
- }
- }
-
- if (enlisted.contains(enlistable)) {
- return;
- }
-
- switch (state) {
- case ACTUNREF:
- case ACTREF:
- transaction.enlistResource(enlistable.getXAResource());
- enlisted.add(enlistable);
- break;
- case CONSTRUCTEDREF:
- throw new MulgaraTransactionException("Attempt to enlist resource in uninitated ref'd transaction");
- case CONSTRUCTEDUNREF:
- throw new MulgaraTransactionException("Attempt to enlist resource in uninitated unref'd transaction");
- case DEACTREF:
- throw new MulgaraTransactionException("Attempt to enlist resource in unactivated transaction");
- case FINISHED:
- throw new MulgaraTransactionException("Attempt to enlist resource in finished transaction");
- case FAILED:
- throw new MulgaraTransactionException("Attempt to enlist resource in failed transaction");
- }
- } catch (Throwable th) {
- throw implicitRollback(th);
- }
- }
-
- //
- // Used internally
- //
-
- private void checkActivated() throws MulgaraTransactionException {
- synchronized (this) {
- if (currentThread == null) {
- throw new MulgaraTransactionException("Transaction not associated with thread");
- } else if (!currentThread.equals(Thread.currentThread())) {
- throw new MulgaraTransactionException("Concurrent access attempted to transaction: Transaction has NOT been rolledback.");
- }
- }
-
- switch (state) {
- case ACTUNREF:
- case ACTREF:
- if (inuse < 0 || using < 0) {
- throw new MulgaraTransactionException("Reference Failure, using: " + using + ", inuse: " + inuse);
- }
- return;
- case CONSTRUCTEDREF:
- throw new MulgaraTransactionException("Transaction (ref) uninitiated");
- case CONSTRUCTEDUNREF:
- throw new MulgaraTransactionException("Transaction (unref) uninitiated");
- case DEACTREF:
- throw new MulgaraTransactionException("Transaction deactivated");
- case FINISHED:
- throw new MulgaraTransactionException("Transaction is terminated");
- case FAILED:
- throw new MulgaraTransactionException("Transaction is failed", rollbackCause);
- }
- }
-
- private void acquireActivationMutex() {
- activationMutex.lock();
- }
-
- private void releaseActivationMutex() {
- activationMutex.unlock();
- }
-
- protected void finalize() {
- report("GC-finalize");
- if (state != State.FINISHED && state != State.FAILED) {
- errorReport("Finalizing incomplete transaction - aborting...", null);
- try {
- abortTransaction("Transaction finalized while still valid", new Throwable());
- } catch (Throwable th) {
- errorReport("Attempt to abort transaction from finalize failed", th);
- }
- }
-
- if (state != State.FAILED && (inuse != 0 || using != 0)) {
- errorReport("Reference counting error in transaction", null);
- }
-
- if (manager != null || transaction != null) {
- errorReport("Transaction not terminated properly", null);
- }
- }
-
- private void report(String desc) {
- if (logger.isInfoEnabled()) {
- logger.info(desc + ": " + System.identityHashCode(this) + ", state=" + state +
- ", inuse=" + inuse + ", using=" + using);
- }
- }
-
- private void debugReport(String desc) {
- if (logger.isDebugEnabled()) {
- logger.debug(desc + ": " + System.identityHashCode(this) + ", state=" + state +
- ", inuse=" + inuse + ", using=" + using);
- }
- }
-
- private void errorReport(String desc, Throwable cause) {
- logger.error(desc + ": " + System.identityHashCode(this) + ", state=" + state +
- ", inuse=" + inuse + ", using=" + using, cause != null ? cause : new Throwable());
- }
+ public void enlist(EnlistableResource enlistable) throws MulgaraTransactionException;
}
Copied: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java (from rev 535, branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java)
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java 2007-11-13 22:10:39 UTC (rev 535)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionFactory.java 2007-11-14 08:13:10 UTC (rev 536)
@@ -0,0 +1,284 @@
+/*
+ * The contents of this file are subject to the Open Software License
+ * Version 3.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.rosenlaw.com/OSL3.0.htm
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * This file is an original work developed by Netymon Pty Ltd
+ * (http://www.netymon.com, mailto:mail at netymon.com). Portions created
+ * by Netymon Pty Ltd are Copyright (c) 2006 Netymon Pty Ltd.
+ * All Rights Reserved.
+ */
+
+package org.mulgara.resolver;
+
+// Java2 packages
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
+// Third party packages
+import org.apache.log4j.Logger;
+
+// Local packages
+import org.mulgara.query.MulgaraTransactionException;
+import org.mulgara.server.Session;
+import org.mulgara.transaction.TransactionManagerFactory;
+
+/**
+ * Manages transactions within Mulgara.
+ *
+ * see http://mulgara.org/confluence/display/dev/Transaction+Architecture
+ *
+ * Maintains association between Answer's and TransactionContext's.
+ * Manages tracking the ownership of the write-lock.
+ * Maintains the write-queue and any timeout algorithm desired.
+ * Provides new/existing TransactionContext's to DatabaseSession on request.
+ * Note: Returns new context unless Session is currently in a User Demarcated Transaction.
+ *
+ *
+ * @created 2006-10-06
+ *
+ * @author <a href="mailto:andrae at netymon.com">Andrae Muys</a>
+ *
+ * @version $Revision: $
+ *
+ * @modified $Date: $
+ *
+ * @maintenanceAuthor $Author: $
+ *
+ * @company <A href="mailto:mail at netymon.com">Netymon Pty Ltd</A>
+ *
+ * @copyright ©2006 <a href="http://www.netymon.com/">Netymon Pty Ltd</a>
+ *
+ * @licence Open Software License v3.0</a>
+ */
+
+public abstract class MulgaraTransactionFactory {
+ protected final MulgaraTransactionManager manager;
+
+ /**
+ * Contains a reference the the current writing transaction IFF it is managed
+ * by this factory. If there is no current writing transaction, or if the
+ * writing transaction is managed by a different factory then it is null.
+ */
+ protected MulgaraTransaction writeTransaction;
+
+ protected Map<DatabaseSession, ? extends MulgaraTransaction> activeTransaction;
+
+ protected Assoc1toNMap<DatabaseSession, ? extends MulgaraTransaction> sessionXAMap;
+
+ private ReentrantLock mutex;
+
+ protected MulgaraTransactionFactory(MulgaraTransactionManager manager) {
+ this.manager = manager;
+ this.mutex = new ReentrantLock();
+ this.writeTransaction = null;
+
+ this.activeTransaction = new Map<DatabaseSession, ? extends MulgaraTransaction>();
+ this.sessionXAMap = new Assoc1toNMap<DatabaseSession, ? extends MulgaraTransaction>();
+ }
+
+ /**
+ * Obtain a transaction context associated with a DatabaseSession.
+ *
+ * Either returns the existing context if:
+ * a) we are currently within a recursive call while under implicit XA control
+ * or
+ * b) we are currently within an active user demarcated XA.
+ * otherwise creates a new transaction context and associates it with the
+ * session.
+ */
+ public MulgaraTransaction getTransaction(final DatabaseSession session, boolean write)
+ throws MulgaraTransactionException {
+ acquireMutex();
+ try {
+ if (write && manager.isHoldingWriteLock(session)) {
+ return writeTransaction;
+ }
+
+ try {
+ MulgaraInternalTransaction transaction;
+ if (write) {
+ runWithoutMutex(new TransactionOperation() {
+ public void execute() throws MulgaraTransactionException {
+ manager.obtainWriteLock(session);
+ }
+ });
+ try {
+ assert writeTransaction == null;
+ writeTransaction = transaction = newMulgaraTransaction(session.newOperationContext(true));
+ } catch (MulgaraTransactionException em) {
+ manager.releaseWriteLock(session);
+ throw em;
+ } catch (Throwable th) {
+ manager.releaseWriteLock(session);
+ throw new MulgaraTransactionException("Error creating write transaction", th);
+ }
+ } else {
+ transaction = newMulgaraTransaction(session.newOperationContext(false));
+ }
+
+ sessionXAMap.put(session, transaction);
+
+ return transaction;
+ } catch (MulgaraTransactionException em) {
+ throw em;
+ } catch (Exception e) {
+ throw new MulgaraTransactionException("Error creating transaction", e);
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ protected abstract MulgaraTransaction newMulgaraTransaction(DatabaseOperationContext context);
+
+ /**
+ * Rollback, or abort all transactions associated with a DatabaseSession.
+ *
+ * Will only abort the transaction if the rollback attempt fails.
+ */
+ public void rollbackCurrentTransactions(DatabaseSession session) throws MulgaraTransactionException {
+ acquireMutex();
+ try {
+ try {
+ Throwable error = null;
+
+ Set<MulgaraTransaction> requiresAbort = new HashSet<MulgaraTransaction>();
+ try {
+ if (manager.isHoldingWriteLock(session)) {
+ try {
+ if (writeTransaction != null) {
+ logger.warn("Terminating session while holding writelock:" + session + ": " + writeTransaction);
+ writeTransaction.heuristicRollback("Session closed while holding write lock");
+ }
+ } finally {
+ manager.releaseWriteLock(session);
+ }
+ }
+ } catch (Throwable th) {
+ requiresAbort.add(writeTransaction);
+ error = th;
+ }
+
+ for (MulgaraTransaction transaction : sessionXAMap.getN(session)) {
+ try {
+ transaction.heuristicRollback("Rollback due to session close");
+ } catch (Throwable th) {
+ requiresAbort.add(transaction);
+ if (error == null) {
+ error = th;
+ }
+ }
+ }
+
+ if (error != null) {
+ throw new MulgaraTransactionException("Heuristic rollback failed on session close", error);
+ }
+ } finally {
+ try {
+ abortTransactions(requiresAbort);
+ } catch (Throwable th) {
+ try {
+ logger.error("Error aborting transactions after heuristic rollback failure on session close", th);
+ } catch (Throwable throw_away) { }
+ }
+ }
+ } finally {
+ releaseMutex();
+ }
+ }
+
+ /**
+ * Abort as many of the transactions as we can.
+ */
+ private void abortTransactions(Set<MulgaraTransaction> requiresAbort) {
+ try {
+ if (!requiresAbort.empty()) {
+ // At this point the originating exception has been thrown in the caller
+ // so we attempt to ensure it doesn't get superseeded by anything that
+ // might be thrown here while logging any errors.
+ try {
+ logger.error("Heuristic Rollback Failed on session close- aborting");
+ } catch (Throwable throw_away) { } // Logging difficulties.
+
+ try {
+ for (MulgaraTransaction transaction : requiresAbort) {
+ try {
+ transaction.abort("Heuristic Rollback failed on session close");
+ } catch (Throwable th) {
+ try {
+ logger.error("Error aborting transaction after heuristic rollback failure on session close", th);
+ } catch (Throwable throw_away) { }
+ }
+ }
+ } catch (Throwable th) {
+ try {
+ logger.error("Loop error while aborting transactions after heuristic rollback failure on session close", th);
+ } catch (Throwable throw_away) { }
+ }
+ }
+ } catch (Throwable th) {
+ try {
+ logger.error("Unidentified error while aborting transactions after heuristic rollback failure on session close", th);
+ } catch (Throwable throw_away) { }
+ }
+ }
+
+ /**
+ * Used to replace the built in monitor to allow it to be properly released
+ * during potentially blocking operations. All potentially blocking
+ * operations involve writes, so in these cases the write-lock is reserved
+ * allowing the mutex to be safely released and then reobtained after the
+ * blocking operation concludes.
+ */
+ protected void acquireMutex() {
+ mutex.lock();
+ }
+
+
+ protected void releaseMutex() {
+ if (!mutex.isHeldByCurrentThread()) {
+ throw new IllegalStateException("Attempt to release mutex without holding mutex");
+ }
+
+ FIXME!: Need to figure out how to handle reservation here!
+ if (mutex.getHoldCount() == 1 && Thread.currentThread().equals(reservingThread)) {
+ reservingThread = null;
+ reserveCondition.signal();
+ }
+
+ mutex.unlock();
+ }
+
+ protected void runWithoutMutex(TransactionOperation proc) throws MulgaraTransactionException {
+ if (!mutex.isHeldByCurrentThread()) {
+ throw new IllegalStateException("Attempt to run procedure without holding mutex");
+ }
+ int holdCount = mutex.getHoldCount();
+ for (int i = 0; i < holdCount; i++) {
+ mutex.unlock();
+ }
+ try {
+ proc.execute();
+ } finally {
+ for (int i = 0; i < holdCount; i++) {
+ mutex.lock();
+ }
+ }
+ }
+}
Modified: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java 2007-11-13 22:10:39 UTC (rev 535)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java 2007-11-14 08:13:10 UTC (rev 536)
@@ -20,22 +20,17 @@
// Java2 packages
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-import javax.transaction.xa.XAResource;
// Third party packages
import org.apache.log4j.Logger;
// Local packages
import org.mulgara.query.MulgaraTransactionException;
-import org.mulgara.server.Session;
import org.mulgara.transaction.TransactionManagerFactory;
/**
@@ -43,12 +38,12 @@
*
* see http://mulgara.org/confluence/display/dev/Transaction+Architecture
*
- * Maintains association between Answer's and TransactionContext's.
* Manages tracking the ownership of the write-lock.
- * Maintains the write-queue and any timeout algorithm desired.
* Provides new/existing TransactionContext's to DatabaseSession on request.
* Note: Returns new context unless Session is currently in a User Demarcated Transaction.
- *
+ * Provides a facility to trigger a heuristic rollback of any transactions still
+ * valid on session close.
+ * Maintains the write-queue and any timeout algorithm desired.
*
* @created 2006-10-06
*
@@ -73,456 +68,89 @@
Logger.getLogger(MulgaraTransactionManager.class.getName());
// Write lock is associated with a session.
- private Session currentWritingSession;
- private MulgaraTransaction userTransaction;
- private boolean autoCommit;
+ private DatabaseSession sessionHoldingWriteLock;
+ private MulgaraTransaction currentWritingTransaction;
+ // Used to synchronize access to other fields.
private ReentrantLock mutex;
private Condition writeLockCondition;
+
+ // Used to support write-lock reservation.
private Condition reserveCondition;
private Thread reservingThread;
- /** Set of sessions whose transactions have been rolledback.*/
- private Set<Session> failedSessions;
+ private MulgaraInternalTransactionFactory internalFactory;
+ private MulgaraExternalTransactionFactory externalFactory;
- /**
- * Map from transaction to initiating session.
- * FIXME: This is only required for checking while we wait for 1-N.
- * Remove once 1-N is implemented.
- */
- private Map<MulgaraTransaction, Session> sessions;
-
- /**
- * Map from initiating session to set of transactions.
- * Used to clean-up transactions upon session close.
- */
- private Map<Session, Set<MulgaraTransaction>> transactions;
-
- /** Map of threads to active transactions. */
- private Map<Thread, MulgaraTransaction> activeTransactions;
-
- private final TransactionManager transactionManager;
-
public MulgaraTransactionManager(TransactionManagerFactory transactionManagerFactory) {
- this.currentWritingSession = null;
- this.userTransaction = null;
- this.autoCommit = true;
+ this.sessionHoldingWriteLock = null;
+ this.currentWritingTransaction = null;
this.mutex = new ReentrantLock();
this.writeLockCondition = this.mutex.newCondition();
+
this.reserveCondition = this.mutex.newCondition();
this.reservingThread = null;
- this.failedSessions = new HashSet<Session>();
- this.sessions = new HashMap<MulgaraTransaction, Session>();
- this.transactions = new HashMap<Session, Set<MulgaraTransaction>>();
- this.activeTransactions = new HashMap<Thread, MulgaraTransaction>();
+ this.internalFactory = new MulgaraInternalTransactionFactory(this, transactionManagerFactory)
+ this.externalFactory = new MulgaraExternalTransactionFactory(this)
- this.transactionManager = transactionManagerFactory.newTransactionManager();
+ this.managers = new HashMap<DatabaseSession, MulgaraXAManager>
}
- /**
- * Allows DatabaseSession to initiate/obtain a transaction.
- * <ul>
- * <li>If the Session holds the write lock, return the current Write-Transaction.</li>
- * <li>If the Session does not hold the write lock and requests a read-only transaction,
- * create a new ro-transaction object and return it.</li>
- * <li>If the Session does not hold the write lock and requests a read-write transaction,
- * obtain the write-lock, create a new transaction object and return it.</li>
- * </ul>
- */
- public MulgaraTransaction getTransaction(DatabaseSession session, boolean write) throws MulgaraTransactionException {
- acquireMutex();
- try {
- if (session == currentWritingSession) {
- return userTransaction;
- }
- try {
- MulgaraTransaction transaction = write ?
- obtainWriteLock(session) :
- new MulgaraTransaction(this, session.newOperationContext(false));
+ MulgaraInternalTransactionFactory getInternalFactory() {
+ return internalFactory;
+ }
- sessions.put(transaction, session);
-
- if (!transactions.containsKey(session)) {
- transactions.put(session, new HashSet<MulgaraTransaction>());
- }
- transactions.get(session).add(transaction);
-
- return transaction;
- } catch (MulgaraTransactionException em) {
- throw em;
- } catch (Exception e) {
- throw new MulgaraTransactionException("Error creating transaction", e);
- }
- } finally {
- releaseMutex();
- }
+ MulgaraExternalTransactionFactory getExternalFactory() {
+ return externalFactory;
}
/**
* Obtains the write lock.
- * Must hold readMutex on entry - but will drop readMutex if
*/
- private MulgaraTransaction obtainWriteLock(DatabaseSession session)
- throws MulgaraTransactionException {
- while (writeLockHeld() || writeLockReserved()) {
- try {
- writeLockCondition.await();
- } catch (InterruptedException ei) {
- throw new MulgaraTransactionException("Interrupted while waiting for write lock", ei);
- }
- }
-
- try {
- currentWritingSession = session;
- userTransaction = new MulgaraTransaction(this, session.newOperationContext(true));
- return userTransaction;
- } catch (Throwable th) {
- releaseWriteLock();
- throw new MulgaraTransactionException("Error while obtaining write-lock", th);
- }
- }
-
- private void releaseWriteLock() {
- // Calling this method multiple times is safe as the lock cannot be obtained
- // between calls as this method is private, and all calling methods are
- // synchronized.
- currentWritingSession = null;
- userTransaction = null;
- writeLockCondition.signal();
- }
-
- public void commit(DatabaseSession session) throws MulgaraTransactionException {
+ void obtainWriteLock(DatabaseSession session) throws MulgaraTransactionException {
acquireMutex();
try {
- reserveWriteLock();
- if (failedSessions.contains(session)) {
- throw new MulgaraTransactionException("Attempting to commit failed exception");
- } else if (session != currentWritingSession) {
- throw new MulgaraTransactionException(
- "Attempting to commit while not the current writing transaction");
+ if (sessionHoldingWriteLock == session) {
+ return;
}
- setAutoCommit(session, true);
- setAutoCommit(session, false);
- } finally {
- releaseMutex();
- }
- }
-
-
- /**
- * This is an explicit, user-specified rollback.
- *
- * This needs to be distinguished from an implicit rollback triggered by failure.
- */
- public void rollback(DatabaseSession session) throws MulgaraTransactionException {
- acquireMutex();
- try {
- reserveWriteLock();
- if (session == currentWritingSession) {
+ while (writeLockHeld() || writeLockReserved()) {
try {
- userTransaction.execute(new TransactionOperation() {
- public void execute() throws MulgaraTransactionException {
- userTransaction.explicitRollback();
- }
- });
- if (userTransaction != null) {
- // transaction referenced by something - need to explicitly end it.
- userTransaction.abortTransaction("Rollback failed",
- new MulgaraTransactionException("Rollback failed to terminate write transaction"));
- }
- } finally {
- failedSessions.add(session);
- releaseWriteLock();
- setAutoCommit(session, false);
+ writeLockCondition.await();
+ } catch (InterruptedException ei) {
+ throw new MulgaraTransactionException("Interrupted while waiting for write lock", ei);
}
- } else if (failedSessions.contains(session)) {
- failedSessions.remove(session);
- setAutoCommit(session, false);
- } else {
- throw new MulgaraTransactionException(
- "Attempt to rollback while not in the current writing transaction");
}
- } finally {
- releaseMutex();
- }
- }
- public void setAutoCommit(DatabaseSession session, boolean autoCommit)
- throws MulgaraTransactionException {
- acquireMutex();
- try {
- if (session == currentWritingSession && failedSessions.contains(session)) {
- userTransaction.abortTransaction("Session failed and transaction not finalized",
- new MulgaraTransactionException("Failed Session in setAutoCommit"));
- }
-
- if (session == currentWritingSession || failedSessions.contains(session)) {
- if (autoCommit) {
- // AutoCommit off -> on === branch on current state of transaction.
- if (session == currentWritingSession) {
- // Within active transaction - commit and finalise.
- try {
- runWithoutMutex(new TransactionOperation() {
- public void execute() throws MulgaraTransactionException {
- userTransaction.execute(new TransactionOperation() {
- public void execute() throws MulgaraTransactionException {
- userTransaction.dereference();
- userTransaction.commitTransaction();
- }
- });
- }
- });
- } finally {
- releaseWriteLock();
- this.autoCommit = true;
- }
- } else if (failedSessions.contains(session)) {
- // Within failed transaction - cleanup.
- failedSessions.remove(session);
- }
- } else {
- logger.info("Attempt to set autocommit false twice");
- // AutoCommit off -> off === no-op. Log info.
- }
- } else {
- if (autoCommit) {
- // AutoCommit on -> on === no-op. Log info.
- logger.info("Attempting to set autocommit true without setting it false");
- } else {
- // AutoCommit on -> off == Start new transaction.
- userTransaction = getTransaction(session, true);
- userTransaction.reference();
- this.autoCommit = false;
- }
- }
+ sessionHoldingWriteLock = session;
} finally {
releaseMutex();
}
}
- public void rollbackCurrentTransactions(Session session) throws MulgaraTransactionException {
- acquireMutex();
- try {
- try {
- if (failedSessions.contains(session)) {
- failedSessions.remove(session);
- return;
- }
- Throwable error = null;
-
- try {
- if (session == currentWritingSession) {
- logger.warn("Terminating session while holding writelock:" + session + ": " + userTransaction);
- userTransaction.execute(new TransactionOperation() {
- public void execute() throws MulgaraTransactionException {
- throw new MulgaraTransactionException("Terminating session while holding writelock");
- }
- });
- }
- } catch (Throwable th) {
- error = th;
- }
-
- final Throwable trigger = new MulgaraTransactionException("trigger rollback");
-
- if (transactions.containsKey(session)) {
- for (MulgaraTransaction transaction : transactions.get(session)) {
- try {
- transaction.execute(new TransactionOperation() {
- public void execute() throws MulgaraTransactionException {
- throw new MulgaraTransactionException("Rolling back transactions due to session close");
- }
- });
- } catch (MulgaraTransactionException em) {
- // ignore.
- } catch (Throwable th) {
- if (error == null) {
- error = th;
- }
- }
- }
- }
-
- if (error != null) {
- if (error instanceof MulgaraTransactionException) {
- throw (MulgaraTransactionException)error;
- } else {
- throw new MulgaraTransactionException("Error in rollback on session close", error);
- }
- }
- } finally {
- if (transactions.containsKey(session)) {
- logger.error("Error in transaction rollback due to session close - aborting");
- abortCurrentTransactions(session);
- }
- }
- } finally {
- releaseMutex();
- }
- }
-
- private void abortCurrentTransactions(Session session) throws MulgaraTransactionException {
+ void releaseWriteLock(DatabaseSession session) MulgaraTransactionException {
acquireMutex();
try {
- try {
- Throwable error = null;
- for (MulgaraTransaction transaction : transactions.get(session)) {
- try {
- transaction.abortTransaction("Transaction still valid on session close", new Throwable());
- } catch (Throwable th) {
- try {
- if (error == null) {
- error = th;
- }
- } catch (Throwable throw_away) {}
- }
- }
-
- if (error != null) {
- if (error instanceof MulgaraTransactionException) {
- throw (MulgaraTransactionException)error;
- } else {
- throw new MulgaraTransactionException("Error in rollback on session close", error);
- }
- }
- } finally {
- if (session == currentWritingSession) {
- logger.error("Failed to abort write-transaction on session close - Server restart required");
- }
+ if (sessionHoldingWriteLock == null) {
+ return;
}
- } finally {
- releaseMutex();
- }
- }
-
- //
- // Transaction livecycle callbacks.
- //
-
- public Transaction transactionStart(MulgaraTransaction transaction) throws MulgaraTransactionException {
- acquireMutex();
- try {
- try {
- logger.info("Beginning Transaction");
- if (activeTransactions.get(Thread.currentThread()) != null) {
- throw new MulgaraTransactionException(
- "Attempt to start transaction in thread with exiting active transaction.");
- } else if (activeTransactions.containsValue(transaction)) {
- throw new MulgaraTransactionException("Attempt to start transaction twice");
- }
-
- transactionManager.begin();
- Transaction jtaTrans = transactionManager.getTransaction();
-
- activeTransactions.put(Thread.currentThread(), transaction);
-
- return jtaTrans;
- } catch (Exception e) {
- throw new MulgaraTransactionException("Transaction Begin Failed", e);
+ if (sessionHoldingWriteLock != session) {
+ throw new MulgaraTransactionException("Attempted to release write lock being held by another session");
}
- } finally {
- releaseMutex();
+ sessionHoldingWriteLock = null;
+ writeLockCondition.signal();
}
}
- public void transactionResumed(MulgaraTransaction transaction, Transaction jtaXA)
- throws MulgaraTransactionException {
- acquireMutex();
- try {
- if (activeTransactions.get(Thread.currentThread()) != null) {
- throw new MulgaraTransactionException(
- "Attempt to resume transaction in already activated thread");
- } else if (activeTransactions.containsValue(transaction)) {
- throw new MulgaraTransactionException("Attempt to resume active transaction");
- }
-
- try {
- transactionManager.resume(jtaXA);
- activeTransactions.put(Thread.currentThread(), transaction);
- } catch (Exception e) {
- throw new MulgaraTransactionException("Resume Failed", e);
- }
- } finally {
- releaseMutex();
- }
- }
- public Transaction transactionSuspended(MulgaraTransaction transaction)
- throws MulgaraTransactionException {
- acquireMutex();
- try {
- try {
- if (transaction != activeTransactions.get(Thread.currentThread())) {
- throw new MulgaraTransactionException(
- "Attempt to suspend transaction from outside thread");
- }
-
- if (autoCommit && transaction == userTransaction) {
- logger.error("Attempt to suspend write transaction without setting AutoCommit Off");
- throw new MulgaraTransactionException(
- "Attempt to suspend write transaction without setting AutoCommit Off");
- }
-
- Transaction xa = transactionManager.suspend();
- activeTransactions.remove(Thread.currentThread());
-
- return xa;
- } catch (Throwable th) {
- logger.error("Attempt to suspend failed", th);
- try {
- transactionManager.setRollbackOnly();
- } catch (Throwable t) {
- logger.error("Attempt to setRollbackOnly() failed", t);
- }
- throw new MulgaraTransactionException("Suspend failed", th);
- }
- } finally {
- releaseMutex();
- }
- }
-
- public void transactionComplete(MulgaraTransaction transaction) {
- acquireMutex();
- try {
- if (transaction == userTransaction) {
- releaseWriteLock();
- }
-
- activeTransactions.remove(Thread.currentThread());
- Session session = (Session)sessions.get(transaction);
- sessions.remove(transaction);
- transactions.remove(session);
- } finally {
- releaseMutex();
- }
- }
-
- public void transactionAborted(MulgaraTransaction transaction) {
- acquireMutex();
- try {
- try {
- // Make sure this cleans up the transaction metadata - this transaction is DEAD!
- if (transaction == userTransaction) {
- failedSessions.add(currentWritingSession);
- }
- transactionComplete(transaction);
- } catch (Throwable th) {
- // FIXME: This should probably abort the entire server after logging the error!
- logger.error("Error managing transaction abort", th);
- }
- } finally {
- releaseMutex();
- }
- }
-
public void setTransactionTimeout(int transactionTimeout) {
try {
- transactionManager.setTransactionTimeout(transactionTimeout);
+ internalFactory.setTransactionTimeout(transactionTimeout);
+ externalFactory.setTransactionTimeout(transactionTimeout);
} catch (SystemException es) {
logger.warn("Unable to set transaction timeout: " + transactionTimeout, es);
}
@@ -542,8 +170,9 @@
/**
* Used to reserve the write lock during a commit or rollback.
+ * Should only be used by a transaction manager.
*/
- private void reserveWriteLock() throws MulgaraTransactionException {
+ void reserveWriteLock() throws MulgaraTransactionException {
if (!mutex.isHeldByCurrentThread()) {
throw new IllegalStateException("Attempt to set modify without holding mutex");
}
@@ -562,13 +191,13 @@
reservingThread = Thread.currentThread();
}
- private boolean writeLockReserved() {
+ boolean writeLockReserved() {
// TRUE iff there is a reserving thread AND it is different thread to the current thread.
return reservingThread != null && !Thread.currentThread().equals(reservingThread);
}
private boolean writeLockHeld() {
- return currentWritingSession != null;
+ return sessionHoldingWriteLock != null;
}
private void releaseMutex() {
@@ -583,21 +212,4 @@
mutex.unlock();
}
-
- private void runWithoutMutex(TransactionOperation proc) throws MulgaraTransactionException {
- if (!mutex.isHeldByCurrentThread()) {
- throw new IllegalStateException("Attempt to run procedure without holding mutex");
- }
- int holdCount = mutex.getHoldCount();
- for (int i = 0; i < holdCount; i++) {
- mutex.unlock();
- }
- try {
- proc.execute();
- } finally {
- for (int i = 0; i < holdCount; i++) {
- mutex.lock();
- }
- }
- }
}
Added: branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java
===================================================================
--- branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java 2007-11-13 22:10:39 UTC (rev 535)
+++ branches/mgr-73/src/jar/resolver/java/org/mulgara/resolver/MulgaraXAResource.java 2007-11-14 08:13:10 UTC (rev 536)
@@ -0,0 +1,144 @@
+/*
+ * The contents of this file are subject to the Open Software License
+ * Version 3.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.rosenlaw.com/OSL3.0.htm
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * This file is an original work developed by Netymon Pty Ltd
+ * (http://www.netymon.com, mailto:mail at netymon.com). Portions created
+ * by Netymon Pty Ltd are Copyright (c) 2006 Netymon Pty Ltd.
+ * All Rights Reserved.
+ */
+
+package org.mulgara.resolver;
+
+// Java2 packages
+import javax.transaction.xa.XAResource;
+
+// Third party packages
+import org.apache.log4j.Logger;
+
+// Local packages
+
+/**
+ * Provides an external JTA-compliant TransactionManager with the ability to
+ * control Mulgara Transactions.
+ *
+ * @created 2007-11-07
+ *
+ * @author <a href="mailto:andrae at netymon.com">Andrae Muys</a>
+ *
+ * @company <A href="mailto:mail at netymon.com">Netymon Pty Ltd</A>
+ *
+ * @copyright ©2006 <a href="http://www.topazproject.org/">Topaz Project</a>
+ *
+ * @licence Open Software License v3.0</a>
+ */
+
+public class MulgaraXAResource implements XAResource {
+ private MulgaraExternalTransactionFactory factory;
+
+ private DatabaseSession session;
+
+ private Assoc1toNMap<MulgaraExternalTransaction, Xid> xa2xid;
+
+ MulgaraXAResource(MulgaraExternalTransactionFactory factory, DatabaseSession session) {
+ this.factory = factory;
+ this.session = session;
+ this.xa2xid = new Assoc1toNMap<MulgaraExternalTransaction, Xid>();
+ }
+
+ public void commit(Xid xid, boolean onePhase) {
+ MulgaraExternalTransaction xa = xa2xid.get1(xid);
+ if (xa.isHeuristicallyRollbacked()) {
+ throw new XAException(XAException.XA_HEURRB);
+ }
+ for (EnlistableResource er : xa.getEnlistedResources()) {
+ er.getXAResource().commit(xid, onePhase);
+ }
+ }
+
+ public void end(Xid xid, int flags) {
+ MulgaraExternalTransaction xa = xa2xid.get1(xid);
+ switch (flags) {
+ case TMSUCCESS:
+ case TMFAIL:
+ case TMSUSPEND:
+ factory.disassociate(session, xa);
+ }
+ }
+
+ public void forget(Xid xid) {
+ MulgaraExternalTransaction xa = xa2xid.get1(xid);
+ if (xa == null) {
+ throw new XAException(XAException.XAER_NOTA);
+ } else {
+ if (!xa.isHeuristicRollbackOnly()) {
+ xa.abortTransaction("External XA Manager specified 'forget'", new Throwable());
+ }
+ xa2xid.remove1(xa);
+ }
+ }
+
+ public int getTransactionTimeout() {
+ }
+
+ public boolean isSameRM(XAResource xares) {
+ }
+
+ public int prepare(Xid xid) {
+ MulgaraExternalTransaction xa = xa2xid.get1(xid);
+ // forall xa.enlistedResources():
+ // enlisted.prepare(xid);
+ // return XA_OK
+ }
+
+ /**
+ * We don't currently support recover.
+ * FIXME: We should at least handle the case where we are asked to recover
+ * when we haven't crashed.
+ */
+ public Xid[] recover(int flag) {
+ return new Xid[] {};
+ }
+
+ public boolean setTransactionTimeout(int seconds) {
+ }
+
+ public void start(Xid xid, int flags) {
+ switch (flags) {
+ case TMNOFLAGS:
+ if (xa2xid.containsN(xid)) {
+ throw new XAException(XAException.XAER_DUPID);
+ } else if (existing transaction(session)) {
+ throw new XAException(XAException.XA_RBDEADLOCK);
+ } else {
+ MulgaraTransaction xa = factory.newTransaction(session, isWrite);
+ xa2xid.put(xa, xid);
+ }
+ break;
+ case TMJOIN:
+ if (!existing transaction(session)) {
+ throw new XAException(XAException.XAER_NOTA);
+ } else {
+ // Do stuff.
+ }
+ break;
+ case TMRESUME:
+ MulgaraExternalTransaction xa = xa2xid.get1(xid);
+ if (xa == null) {
+ throw new XAException(XAException.XAER_NOTA);
+ } else if (xa.isRollbackOnly()) {
+ throw new XAException(XAException.XA_RBROLLBACK);
+ } else {
+ factory.associate(session, xa);
+ }
+ break;
+ }
+ }
+}
More information about the Mulgara-svn
mailing list