[Mulgara-svn] r1056 - branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver
andrae at mulgara.org
andrae at mulgara.org
Mon Jul 7 06:38:29 UTC 2008
Author: andrae
Date: 2008-07-06 23:38:28 -0700 (Sun, 06 Jul 2008)
New Revision: 1056
Modified:
branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/StringPoolSession.java
Log:
Adds check for concurrent access to StringPoolSession.
Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/StringPoolSession.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/StringPoolSession.java 2008-07-07 05:46:46 UTC (rev 1055)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/StringPoolSession.java 2008-07-07 06:38:28 UTC (rev 1056)
@@ -122,6 +122,8 @@
private Object globalLock;
+ private Thread currentThread;
+
StringPoolSession(URI databaseURI,
Set<String> hostnameAliases,
XAStringPool persistentStringPool,
@@ -145,6 +147,7 @@
this.temporaryNodePool = temporaryNodePool;
this.globalLock = globalLock;
this.state = OBTAIN;
+ this.currentThread = null;
}
@@ -153,7 +156,6 @@
//
public Node globalize(long localNode) throws GlobalizeException {
-
// this should not require guarding, as read-only operations will usually not be on the current phase
// any reads on the current phase are about to start failing anyway if the state changes under us
if (state == ROLLBACK || state == RELEASE) {
@@ -199,117 +201,152 @@
}
public long localizePersistent(Node node) throws LocalizeException {
- return localize(node, WRITE | PERSIST);
+ checkCurrentThread();
+ try {
+ return localize(node, WRITE | PERSIST);
+ } finally {
+ releaseCurrentThread();
+ }
}
public long newBlankNode() throws NodePoolException {
- return persistentNodePool.newNode();
+ checkCurrentThread();
+ try {
+ return persistentNodePool.newNode();
+ } finally {
+ releaseCurrentThread();
+ }
}
public void refresh(SimpleXAResource[] resources) throws SimpleXAResourceException {
- if (logger.isDebugEnabled()) {
- logger.debug("Obtaining phase on StringPoolSession " + System.identityHashCode(this));
- }
- this.resources = resources;
+ checkCurrentThread();
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Obtaining phase on StringPoolSession " + System.identityHashCode(this));
+ }
+ this.resources = resources;
- synchronized (this.globalLock) {
- this.persistentStringPool.refresh();
- this.persistentNodePool.refresh();
- // !!Review: Call rollback on temporary? NB. Can't rollback non XA-SP/NP.
- //this.temporaryStringPool.refresh();
- //this.temporaryNodePool.refresh();
+ synchronized (this.globalLock) {
+ this.persistentStringPool.refresh();
+ this.persistentNodePool.refresh();
+ // !!Review: Call rollback on temporary? NB. Can't rollback non XA-SP/NP.
+ //this.temporaryStringPool.refresh();
+ //this.temporaryNodePool.refresh();
- for (int i = 0; i < this.resources.length; i++) {
- this.resources[i].refresh();
+ for (int i = 0; i < this.resources.length; i++) {
+ this.resources[i].refresh();
+ }
}
+ } finally {
+ releaseCurrentThread();
}
}
public void prepare() throws SimpleXAResourceException {
- synchronized (globalLock) {
- if (logger.isDebugEnabled()) {
- logger.debug("Preparing phase on StringPoolSession " + System.identityHashCode(this) + " SP=" + System.identityHashCode(persistentStringPool));
+ checkCurrentThread();
+ try {
+ synchronized (globalLock) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Preparing phase on StringPoolSession " + System.identityHashCode(this) + " SP=" + System.identityHashCode(persistentStringPool));
+ }
+ if (state == PREPARE) {
+ return;
+ } else if (state != OBTAIN) {
+ throw new SimpleXAResourceException("Attempting to prepare phase without obtaining phase");
+ }
+
+ state = PREPARE;
+
+ persistentStringPool.prepare();
+ persistentNodePool.prepare();
+ for (int i = 0; i < resources.length; i++) {
+ resources[i].prepare();
+ }
}
- if (state == PREPARE) {
- return;
- } else if (state != OBTAIN) {
- throw new SimpleXAResourceException("Attempting to prepare phase without obtaining phase");
- }
-
- state = PREPARE;
-
- persistentStringPool.prepare();
- persistentNodePool.prepare();
- for (int i = 0; i < resources.length; i++) {
- resources[i].prepare();
- }
+ } finally {
+ releaseCurrentThread();
}
}
public void commit() throws SimpleXAResourceException {
- synchronized (globalLock) {
- if (logger.isDebugEnabled()) {
- logger.debug("Committing phase on StringPoolSession " + System.identityHashCode(this));
- }
- if (state == COMMIT) {
- return;
- } else if (state != PREPARE) {
- throw new SimpleXAResourceException("Attempting to commit phase without preparing");
- }
-
- state = COMMIT;
+ checkCurrentThread();
+ try {
+ synchronized (globalLock) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Committing phase on StringPoolSession " + System.identityHashCode(this));
+ }
+ if (state == COMMIT) {
+ return;
+ } else if (state != PREPARE) {
+ throw new SimpleXAResourceException("Attempting to commit phase without preparing");
+ }
+
+ state = COMMIT;
- persistentStringPool.commit();
- persistentNodePool.commit();
- for (int i = 0; i < resources.length; i++) {
- resources[i].commit();
+ persistentStringPool.commit();
+ persistentNodePool.commit();
+ for (int i = 0; i < resources.length; i++) {
+ resources[i].commit();
+ }
}
+ } finally {
+ releaseCurrentThread();
}
}
public void rollback() throws SimpleXAResourceException {
- synchronized (globalLock) {
- if (logger.isDebugEnabled()) {
- logger.debug("Rollback phase on StringPoolSession " + System.identityHashCode(this));
+ checkCurrentThread();
+ try {
+ synchronized (globalLock) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Rollback phase on StringPoolSession " + System.identityHashCode(this));
+ }
+ if (state == RELEASE) {
+ throw new SimpleXAResourceException("Attempting to rollback phase outside transaction");
+ }
+ state = ROLLBACK;
+ persistentStringPool.rollback();
+ persistentNodePool.rollback();
+ for (int i = 0; i < resources.length; i++) {
+ resources[i].rollback();
+ }
}
- if (state == RELEASE) {
- throw new SimpleXAResourceException("Attempting to rollback phase outside transaction");
- }
- state = ROLLBACK;
- persistentStringPool.rollback();
- persistentNodePool.rollback();
- for (int i = 0; i < resources.length; i++) {
- resources[i].rollback();
- }
+ } finally {
+ releaseCurrentThread();
}
}
public void release() throws SimpleXAResourceException {
- synchronized (globalLock) {
- if (logger.isDebugEnabled()) {
- logger.debug("Release phase on StringPoolSession " + System.identityHashCode(this));
+ checkCurrentThread();
+ try {
+ synchronized (globalLock) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Release phase on StringPoolSession " + System.identityHashCode(this));
+ }
+ if (state == RELEASE) {
+ return;
+ } else if (state != COMMIT && state != ROLLBACK) {
+ throw new SimpleXAResourceException("Attempting to release phase without commit or rollback");
+ }
+
+ state = RELEASE;
+
+ persistentStringPool.release();
+ persistentNodePool.release();
+
+ // TODO determine if release() should be called for the temp components.
+ //temporaryStringPool.release();
+ //temporaryNodePool.release();
+ for (int i = 0; i < resources.length; i++) {
+ resources[i].release();
+ }
}
- if (state == RELEASE) {
- return;
- } else if (state != COMMIT && state != ROLLBACK) {
- throw new SimpleXAResourceException("Attempting to release phase without commit or rollback");
- }
-
- state = RELEASE;
-
- persistentStringPool.release();
- persistentNodePool.release();
-
- // TODO determine if release() should be called for the temp components.
- //temporaryStringPool.release();
- //temporaryNodePool.release();
- for (int i = 0; i < resources.length; i++) {
- resources[i].release();
- }
+ } finally {
+ releaseCurrentThread();
}
}
@@ -733,4 +770,25 @@
public long findGNode(SPObject spObject) throws StringPoolException {
return persistentStringPool.findGNode(spObject, persistentNodePool);
}
+
+ /**
+ * Used purely as a sanity check in the hope that we might catch concurrency bugs in higher layers should
+ * they exist.
+ */
+ private void checkCurrentThread() {
+ synchronized(this) {
+ if (currentThread == null || currentThread.equals(Thread.currentThread())) {
+ currentThread = Thread.currentThread();
+ } else {
+ logger.warn("Concurrent Access of StringPoolSession Attempted");
+ throw new IllegalStateException("Concurrent Access of StringPoolSession Attempted");
+ }
+ }
+ }
+
+ private void releaseCurrentThread() {
+ synchronized(this) {
+ currentThread = null;
+ }
+ }
}
More information about the Mulgara-svn
mailing list