[Mulgara-svn] r122 - branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver
andrae at mulgara.org
andrae at mulgara.org
Wed Nov 1 06:41:28 UTC 2006
Author: andrae
Date: 2006-11-01 00:41:28 -0600 (Wed, 01 Nov 2006)
New Revision: 122
Modified:
branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/BasicDatabaseSessionUnitTest.java
branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/Database.java
branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseOperationContext.java
branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java
branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/LocalJRDFDatabaseSession.java
branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java
branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/StatusFormat.java
Log:
Starting to seriously integrate MulgaraTransaction and MTMgr into the code-path.
DatabaseSession now uses MulgaraTransactionManager/MulgaraTransaction and no
longer has any direct interaction with JTA classes. There is some hackery to
deal with DatabaseSession not calling execute on MT - which would improve the
activate/deactivate guarantees.
Modified: branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/BasicDatabaseSessionUnitTest.java
===================================================================
--- branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/BasicDatabaseSessionUnitTest.java 2006-10-31 00:11:40 UTC (rev 121)
+++ branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/BasicDatabaseSessionUnitTest.java 2006-11-01 06:41:28 UTC (rev 122)
@@ -195,10 +195,12 @@
* of the system model in the newly-created {@link Database}.
*/
public void testQuery1() {
- logger.debug("Testing testQuery1");
+ logger.warn("Testing testQuery1");
try {
// Test querying the system model (#)
+ logger.warn("Obtaining new Session");
Session session = database.newSession();
+ logger.warn("Obtained new Session");
try {
Variable subjectVariable = new Variable("subject");
Variable predicateVariable = new Variable("predicate");
@@ -210,6 +212,7 @@
selectList.add(objectVariable);
// Evaluate the query
+ logger.warn("Performing query");
Answer answer = new ArrayAnswer(session.query(new Query(
selectList, // SELECT
new ModelResource(systemModelURI), // FROM
@@ -224,6 +227,7 @@
0, // OFFSET
new UnconstrainedAnswer() // GIVEN
)));
+ logger.warn("Performed query");
// Compose the expected result of the query
@@ -238,7 +242,9 @@
// Verify that the query result is as expected
assertEquals(expectedAnswer, answer);
} finally {
+ logger.warn("Closing session");
session.close();
+ logger.warn("Closed session");
}
} catch (Exception e) {
fail(e);
@@ -251,7 +257,7 @@
*/
public void testSetModel() throws URISyntaxException
{
- logger.debug("Testing testSetModel");
+ logger.warn("Testing testSetModel");
URI fileURI = new File("data/dc.rdfs").toURI();
URI modelURI = new URI("local:database#model");
Modified: branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/Database.java
===================================================================
--- branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/Database.java 2006-10-31 00:11:40 UTC (rev 121)
+++ branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/Database.java 2006-11-01 06:41:28 UTC (rev 122)
@@ -253,10 +253,12 @@
private final TransactionManagerFactory transactionManagerFactory;
/**
- * JTS transaction manager used to distribute transactions over multiple
- * resolvers.
+ * The internal transaction manager.
+ *
+ * This class is a singleton with respect to a database instance.
+ * Passed to new DatabaseSession's.
*/
- private final TransactionManager transactionManager;
+ private final MulgaraTransactionManager transactionManager;
/** The unique {@link URI} naming this database. */
private final URI uri;
@@ -510,17 +512,9 @@
assert this.contentHandlers != null;
// FIXME: Migrate this code inside StringPoolSession. Pass config to StringPoolSession.
- this.transactionManager = transactionManagerFactory.newTransactionManager();
+ this.transactionManager = new MulgaraTransactionManager(transactionManagerFactory);
- // Set the transaction timeout to an hour
- try {
- transactionManager.setTransactionTimeout(transactionTimeout);
- }
- catch (SystemException e) {
- logger.warn(
- "Unable to set transaction timeout to " + transactionTimeout + "s", e
- );
- }
+ transactionManager.setTransactionTimeout(transactionTimeout);
// Enable resolver initialization
if (logger.isDebugEnabled()) {
Modified: branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseOperationContext.java
===================================================================
--- branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseOperationContext.java 2006-10-31 00:11:40 UTC (rev 121)
+++ branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseOperationContext.java 2006-11-01 06:41:28 UTC (rev 122)
@@ -138,7 +138,7 @@
private final List securityAdapterList;
private final URI temporaryModelTypeURI;
private final ResolverFactory temporaryResolverFactory;
- private final TransactionManager transactionManager;
+ private final MulgaraTransactionManager transactionManager;
private final Set outstandingAnswers;
/** Symbolic transformations this instance should apply. */
private final List symbolicTransformationList;
@@ -162,7 +162,7 @@
List securityAdapterList,
URI temporaryModelTypeURI,
ResolverFactory temporaryResolverFactory,
- TransactionManager transactionManager,
+ MulgaraTransactionManager transactionManager,
Set outstandingAnswers,
List symbolicTransformationList)
{
@@ -453,7 +453,7 @@
throw new QueryException("Unable to obtain transaction", e);
}
if (transaction == null) {
- if (databaseSession.getTransaction() != null) {
+ if (databaseSession.getTransaction().isSuspended()) {
logger.error("Transaction suspended and not resumed when enlisting resolver");
}
else {
@@ -846,7 +846,7 @@
} else {
outstandingAnswers.remove(answer);
if (databaseSession.autoCommit && outstandingAnswers.isEmpty()) {
- if (databaseSession.getTransaction() != null) {
+ if (databaseSession.getTransaction().isSuspended()) {
databaseSession.resumeTransactionalBlock();
}
databaseSession.endTransactionalBlock("Could not commit query");
@@ -943,9 +943,8 @@
// Complete the numerical phase of resolution
Tuples tuples = localQuery.resolve();
- MulgaraTransaction xa = new MulgaraTransaction(null, this);
- result = new TransactionalAnswer(new MulgaraTransaction(null, this), new SubqueryAnswer(this, systemResolver, tuples, query.getVariableList()));
- xa.tempDeactivate(); // FIXME: Only necessary while we introduce the manager.
+ MulgaraTransaction xa = databaseSession.getTransaction();
+ result = new TransactionalAnswer(xa, new SubqueryAnswer(this, systemResolver, tuples, query.getVariableList()));
answers.put(result, null);
tuples.close();
localQuery.close();
Modified: branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java
===================================================================
--- branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java 2006-10-31 00:11:40 UTC (rev 121)
+++ branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/DatabaseSession.java 2006-11-01 06:41:28 UTC (rev 122)
@@ -39,9 +39,6 @@
// Java 2 enterprise packages
import javax.transaction.RollbackException;
import javax.transaction.Status;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
// Third party packages
import org.apache.log4j.Logger;
@@ -154,10 +151,10 @@
private final ResolverFactory temporaryResolverFactory;
/** Source of transactions. */
- private final TransactionManager transactionManager;
+ private final MulgaraTransactionManager transactionManager;
/** Session transaction */
- private Transaction transaction;
+ private MulgaraTransaction transaction;
/** The name of the rule loader to use */
private String ruleLoaderClassName;
@@ -204,8 +201,6 @@
*/
private ContentHandlerManager contentHandlers;
- private MulgaraTransactionManager manager;
-
//
// Constructor
//
@@ -243,7 +238,7 @@
* external models
* @throws IllegalArgumentException if any argument is <code>null</code>
*/
- DatabaseSession(TransactionManager transactionManager,
+ DatabaseSession(MulgaraTransactionManager transactionManager,
List securityAdapterList,
List symbolicTransformationList,
ResolverSessionFactory resolverSessionFactory,
@@ -338,26 +333,19 @@
outstandingAnswers,
symbolicTransformationList
);
- // FIXME: This will eventually be passed as a parameter from Database.
- this.manager = new MulgaraTransactionManager(null);
-
if (logger.isDebugEnabled()) {
logger.debug("Constructed DatabaseSession");
}
// Set the transaction timeout to an hour
- try {
- transactionManager.setTransactionTimeout(3600);
- } catch (SystemException e) {
- logger.warn("Unable to set transaction timeout to 3600s", e);
- }
+ transactionManager.setTransactionTimeout(3600);
}
/**
* Non-rule version of the constructor. Accepts all parameters except ruleLoaderClassName.
*/
- DatabaseSession(TransactionManager transactionManager,
+ DatabaseSession(MulgaraTransactionManager transactionManager,
List securityAdapterList,
List symbolicTransformationList,
ResolverSessionFactory resolverSessionFactory,
@@ -696,7 +684,7 @@
endTransactionalBlock("Extended transaction failed");
}
} else if (this.autoCommit && !autoCommit) { // Turning autoCommit off
- if (this.transaction != null) {
+ if (this.transaction.isSuspended()) {
resumeTransactionalBlock();
endPreviousQueryTransaction();
}
@@ -774,7 +762,7 @@
endTransactionalBlock("Failed to release write-lock in close");
}
} else {
- if (this.transaction != null) {
+ if (transaction != null && transaction.isSuspended()) {
resumeTransactionalBlock();
endPreviousQueryTransaction();
}
@@ -824,7 +812,7 @@
opStates[opState]);
}
if (autoCommit) {
- if (this.transaction != null) {
+ if (transaction != null && transaction.isSuspended()) {
resumeTransactionalBlock();
endPreviousQueryTransaction();
}
@@ -853,7 +841,7 @@
if (logger.isDebugEnabled()) {
logger.debug("Marking transaction for rollback", throwable);
}
- transactionManager.setRollbackOnly();
+ transaction.setRollbackOnly();
} catch (Throwable e) {
logger.error("Needed to mark transaction for rollback", throwable);
logger.error("Unable to mark transaction for rollback", e);
@@ -912,15 +900,14 @@
*/
public void resumeTransactionalBlock() throws QueryException {
logger.info("Resume Transactional Block");
- if (transaction == null) {
+ if (!transaction.isSuspended()) {
throw new IllegalStateException("Attempt to resume unsuspended transaction");
} else if (inFailedTransaction == true) {
throw new IllegalStateException("Transaction already failed, set autocommit true to reset");
}
try {
- transactionManager.resume(this.transaction);
- this.transaction = null;
+ this.transaction.resume();
} catch (Exception e) {
logger.error("Resume failed", e);
throw new QueryException("Failed to resume transaction", e);
@@ -935,7 +922,7 @@
*/
public void suspendTransactionalBlock() throws Throwable {
logger.info("Suspend Transactional Block");
- if (transaction != null) {
+ if (transaction.isSuspended()) {
throw new IllegalStateException(
"Attempt to suspend unresumed transaction.");
}
@@ -946,7 +933,7 @@
);
}
- int status = transactionManager.getStatus();
+ int status = transaction.getStatus();
if (!autoCommit &&
(status == Status.STATUS_MARKED_ROLLBACK ||
status == Status.STATUS_ROLLEDBACK ||
@@ -955,7 +942,7 @@
throw new QueryException("Transaction marked for rollback");
}
- this.transaction = transactionManager.suspend();
+ this.transaction.suspend();
}
public ResolverSession getResolverSession() {
@@ -986,7 +973,6 @@
throw new QueryException("Stale resolvers found in enlistedResolverMap");
}
-
if (allowWrites) {
try {
obtainWriteLock();
@@ -995,8 +981,12 @@
}
}
+ if (transaction != null) {
+ throw new QueryException("Attempt to start nested transaction");
+ }
+
try {
- transactionManager.begin();
+ transaction = new MulgaraTransaction(transactionManager, operationContext);
if (systemResolver != null) {
throw new QueryException("beginning nested transaction");
}
@@ -1027,10 +1017,10 @@
try {
// Commit the transaction
if (rollbackCause == null) {
- transactionManager.commit();
+ transaction.commit();
} else {
try {
- transactionManager.commit();
+ transaction.commit();
} catch (RollbackException e) {
// Sneakily reinsert the exception recorded earlier by the
// rollbackTransactionalBlock method. Without this feature, it's
@@ -1046,6 +1036,9 @@
throw new QueryException(failureMessage, e);
}
} finally {
+ transaction.tempDeactivate();
+ transaction = null;
+
releaseWriteLock();
enlistedResolverMap.clear();
outstandingAnswers.clear();
@@ -1086,18 +1079,22 @@
try {
if (!outstandingAnswers.isEmpty()) {
+ logger.error("Failed to clear preexisting transaction");
throw new QueryException("Failed to clear preexisting transaction");
}
- if (this.transaction != null) {
+ if (transaction != null && transaction.isSuspended()) {
+ logger.error("Failed to void suspended transaction");
throw new QueryException("Failed to void suspended transaction");
}
if (transactionManager.getTransaction() != null) {
+ logger.error("Failed to end transaction");
throw new QueryException("Failed to end transaction");
}
} catch (QueryException eq) {
endTransactionalBlock("Error ending previous query");
throw eq;
} catch (Throwable th) {
+ logger.error("Threw exception in endPreviousQuery", th);
endTransactionalBlock("Error ending previous query");
throw new QueryException("Failure ending previous query", th);
}
@@ -1262,7 +1259,7 @@
return systemResolver;
}
- Transaction getTransaction() {
+ MulgaraTransaction getTransaction() {
return transaction;
}
@@ -1271,7 +1268,7 @@
}
boolean ensureTransactionResumed() throws QueryException {
- if (this.transaction != null) {
+ if (this.transaction.isSuspended()) {
resumeTransactionalBlock();
return true;
} else {
@@ -1366,13 +1363,12 @@
* !AC -> R clr . S
*/
if (autoCommit) {
- if (this.transaction != null) {
+ if (transaction != null && transaction.isSuspended()) {
resumeTransactionalBlock();
endPreviousQueryTransaction();
}
beginTransactionalBlock(operation.isWriteOperation());
- }
- else {
+ } else {
resumeTransactionalBlock();
}
@@ -1381,8 +1377,7 @@
systemResolver,
resolverSessionFactory,
metadata);
- }
- catch (Throwable th) {
+ } catch (Throwable th) {
try {
logger.warn("Query failed", th);
rollbackTransactionalBlock(th);
Modified: branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/LocalJRDFDatabaseSession.java
===================================================================
--- branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/LocalJRDFDatabaseSession.java 2006-10-31 00:11:40 UTC (rev 121)
+++ branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/LocalJRDFDatabaseSession.java 2006-11-01 06:41:28 UTC (rev 122)
@@ -122,7 +122,7 @@
* external models
* @throws IllegalArgumentException if any argument is <code>null</code>
*/
- LocalJRDFDatabaseSession(TransactionManager transactionManager,
+ LocalJRDFDatabaseSession(MulgaraTransactionManager transactionManager,
List securityAdapterList, List symbolicTransformationList,
ResolverSessionFactory resolverSessionFactory,
SystemResolverFactory systemResolverFactory,
Modified: branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java
===================================================================
--- branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java 2006-10-31 00:11:40 UTC (rev 121)
+++ branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransaction.java 2006-11-01 06:41:28 UTC (rev 122)
@@ -31,6 +31,7 @@
import org.mulgara.resolver.spi.ResolverSessionFactory;
import org.mulgara.query.TuplesException;
+import org.mulgara.query.QueryException;
/**
* Responsible for the javax.transaction.Transaction object.
@@ -79,24 +80,42 @@
private int rollback;
private Throwable rollbackCause;
- public MulgaraTransaction(MulgaraTransactionManager manager, OperationContext context) {
+ //
+ // Temporary scaffolding to support transition.
+ //
+
+ private boolean suspended;
+ private boolean ended;
+
+ public MulgaraTransaction(MulgaraTransactionManager manager, OperationContext context)
+ throws Exception {
report("Creating Transaction");
+ if (manager == null) {
+ throw new IllegalArgumentException("Manager null in MulgaraTransaction");
+ } else if (context == null) {
+ throw new IllegalArgumentException("OperationContext null in MulgaraTransaction");
+ }
this.manager = manager;
this.context = context;
-// FIXME: MTMgr will be null until operational.
-// this.transaction = manager.transactionStart(this);
+ this.transaction = manager.transactionStart(this);
+
inuse = 1; // Note: This implies implict activation as a part of construction.
using = 0;
rollback = NO_ROLLBACK;
rollbackCause = null;
+// FIXME: scaffolding.
+ suspended = false;
+ ended = false;
+
// FIXME: need this added to context. Sets up and enlists the system-resolver.
// context.initiate();
// FIXME: need this added to context. Allows context to cleanup caches at end of transaction.
// this.transaction.enlistResource(context.getXAResource());
+// logger.warn("Created Transaction from: ", new Throwable());
report("Created Transaction");
}
@@ -128,10 +147,6 @@
}
- public synchronized void tempDeactivate() throws MulgaraTransactionException {
- deactivate();
- }
-
// FIXME: Not yet certain I have the error handling right here.
// Need to clarify semantics and ensure the error conditions are
// properly handled.
@@ -211,7 +226,6 @@
/** Should rename this 'wrap' */
AnswerOperationResult execute(AnswerOperation ao) throws TuplesException {
-// FIXME: activate/deactivate won't work until we have MTMgr operational.
report("Executing Operation");
try {
activate();
@@ -308,4 +322,54 @@
", inuse=" + inuse + ", using=" + using);
}
}
+
+ //
+ // Scaffolding
+ //
+ public boolean isSuspended() {
+ return suspended;
+ }
+
+ public void suspend() throws Throwable {
+ if (ended) {
+ throw new MulgaraTransactionException("Attempt to suspend ended transaction");
+ }
+ manager.suspend();
+ suspended = true;
+ }
+
+ public void resume() throws Exception {
+ if (ended) {
+ throw new MulgaraTransactionException("Attempt to resume ended transaction");
+ }
+ manager.resume(this.transaction);
+ suspended = false;
+ }
+
+ public void commit() throws Exception {
+ if (suspended) {
+ throw new MulgaraTransactionException("Attempt to commit suspended transaction");
+ }
+ ended = true;
+ manager.commit();
+ }
+
+ public void setRollbackOnly() throws Exception {
+ if (suspended) {
+ throw new MulgaraTransactionException("Attempt to rollback suspended transaction");
+ }
+ manager.setRollbackOnly();
+ }
+
+ public int getStatus() throws Throwable {
+ return manager.getStatus();
+ }
+
+ public void tempDeactivate() throws QueryException {
+ try {
+ deactivate();
+ } catch (MulgaraTransactionException em) {
+ throw new QueryException("Failed to dereference in bootstrap", em);
+ }
+ }
}
Modified: branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java
===================================================================
--- branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java 2006-10-31 00:11:40 UTC (rev 121)
+++ branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/MulgaraTransactionManager.java 2006-11-01 06:41:28 UTC (rev 122)
@@ -20,6 +20,7 @@
// Java2 packages
import java.util.HashMap;
import java.util.Map;
+import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
@@ -29,6 +30,7 @@
// Local packages
import org.mulgara.server.Session;
+import org.mulgara.transaction.TransactionManagerFactory;
/**
* Manages transactions within Mulgara.
@@ -78,13 +80,13 @@
private Object writeLockMutex;
- public MulgaraTransactionManager(TransactionManager transactionManager) {
+ public MulgaraTransactionManager(TransactionManagerFactory transactionManagerFactory) {
this.currentWritingSession = null;
this.userTransaction = null;
this.failedSessions = new HashMap();
- this.transactionManager = transactionManager;
+ this.transactionManager = transactionManagerFactory.newTransactionManager();
this.writeLockMutex = new Object();
}
@@ -112,6 +114,8 @@
}
+/*
+ * disabled temporarally while migrating.
public synchronized MulgaraTransaction getTransaction()
throws MulgaraTransactionException {
MulgaraTransaction transaction = (MulgaraTransaction)activeTransactions.get(Thread.currentThread());
@@ -121,6 +125,7 @@
throw new MulgaraTransactionException("No transaction assoicated with current thread");
}
}
+ */
private synchronized void obtainWriteLock(Session session)
throws MulgaraTransactionException {
@@ -222,7 +227,8 @@
transactionManager.begin();
Transaction jtaTrans = transactionManager.getTransaction();
- activeTransactions.put(Thread.currentThread(), transaction);
+// FIXME: Not in use yet - activate later.
+// activeTransactions.put(Thread.currentThread(), transaction);
return jtaTrans;
} catch (Exception e) {
@@ -279,4 +285,44 @@
// Remove transaction from Session's list.
}
}
+
+ //
+ // Temporary methods to introduce the manager into the code-path.
+ //
+
+ void setTransactionTimeout(int transactionTimeout) {
+ try {
+ transactionManager.setTransactionTimeout(transactionTimeout);
+ } catch (SystemException es) {
+ logger.warn("Unable to set transaction timeout: " + transactionTimeout, es);
+ }
+ }
+
+ void setRollbackOnly() throws Exception {
+ transactionManager.setRollbackOnly();
+ }
+
+ void resume(Transaction t) throws Exception {
+ transactionManager.resume(t);
+ }
+
+ int getStatus() throws SystemException {
+ return transactionManager.getStatus();
+ }
+
+ void commit() throws Exception {
+ transactionManager.commit();
+ }
+
+ Transaction getTransaction() throws Exception {
+ return transactionManager.getTransaction();
+ }
+
+ Transaction suspend() throws Exception {
+ return transactionManager.suspend();
+ }
+
+ void begin() throws Exception {
+ transactionManager.begin();
+ }
}
Modified: branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/StatusFormat.java
===================================================================
--- branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/StatusFormat.java 2006-10-31 00:11:40 UTC (rev 121)
+++ branches/xafix-impl/src/jar/resolver/java/org/mulgara/resolver/StatusFormat.java 2006-11-01 06:41:28 UTC (rev 122)
@@ -30,7 +30,6 @@
// Java 2 enterprise packages
import javax.transaction.Status;
import javax.transaction.SystemException;
-import javax.transaction.TransactionManager;
/**
* Generate a presentation form for a transaction {@link Status}.
@@ -89,7 +88,7 @@
*
* @param transactionManager the transaction manager
*/
- public static String formatStatus(TransactionManager transactionManager)
+ public static String formatStatus(MulgaraTransactionManager transactionManager)
{
try {
return formatStatus(transactionManager.getStatus());
More information about the Mulgara-svn
mailing list