[Mulgara-svn] r1056 - branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver

andrae at mulgara.org andrae at mulgara.org
Mon Jul 7 06:38:29 UTC 2008


Author: andrae
Date: 2008-07-06 23:38:28 -0700 (Sun, 06 Jul 2008)
New Revision: 1056

Modified:
   branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/StringPoolSession.java
Log:
Adds check for concurrent access to StringPoolSession.



Modified: branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/StringPoolSession.java
===================================================================
--- branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/StringPoolSession.java	2008-07-07 05:46:46 UTC (rev 1055)
+++ branches/mgr-121-lockrecovery/src/jar/resolver/java/org/mulgara/resolver/StringPoolSession.java	2008-07-07 06:38:28 UTC (rev 1056)
@@ -122,6 +122,8 @@
 
   private Object globalLock;
 
+  private Thread currentThread;
+
   StringPoolSession(URI          databaseURI,
                     Set<String>  hostnameAliases,
                     XAStringPool persistentStringPool,
@@ -145,6 +147,7 @@
     this.temporaryNodePool = temporaryNodePool;
     this.globalLock = globalLock;
     this.state = OBTAIN;
+    this.currentThread = null;
   }
 
 
@@ -153,7 +156,6 @@
   //
 
   public Node globalize(long localNode) throws GlobalizeException {
-
     // this should not require guarding, as read-only operations will usually not be on the current phase
     // any reads on the current phase are about to start failing anyway if the state changes under us
     if (state == ROLLBACK || state == RELEASE) {
@@ -199,117 +201,152 @@
   }
 
   public long localizePersistent(Node node) throws LocalizeException {
-    return localize(node, WRITE | PERSIST);
+    checkCurrentThread();
+    try {
+      return localize(node, WRITE | PERSIST);
+    } finally {
+      releaseCurrentThread();
+    }
   }
 
   public long newBlankNode() throws NodePoolException {
-    return persistentNodePool.newNode();
+    checkCurrentThread();
+    try {
+      return persistentNodePool.newNode();
+    } finally {
+      releaseCurrentThread();
+    }
   }
 
   public void refresh(SimpleXAResource[] resources) throws SimpleXAResourceException {
-    if (logger.isDebugEnabled()) {
-      logger.debug("Obtaining phase on StringPoolSession " + System.identityHashCode(this));
-    }
-    this.resources = resources;
+    checkCurrentThread();
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Obtaining phase on StringPoolSession " + System.identityHashCode(this));
+      }
+      this.resources = resources;
 
-    synchronized (this.globalLock) {
-      this.persistentStringPool.refresh();
-      this.persistentNodePool.refresh();
-      // !!Review: Call rollback on temporary? NB. Can't rollback non XA-SP/NP.
-      //this.temporaryStringPool.refresh();
-      //this.temporaryNodePool.refresh();
+      synchronized (this.globalLock) {
+        this.persistentStringPool.refresh();
+        this.persistentNodePool.refresh();
+        // !!Review: Call rollback on temporary? NB. Can't rollback non XA-SP/NP.
+        //this.temporaryStringPool.refresh();
+        //this.temporaryNodePool.refresh();
 
-      for (int i = 0; i < this.resources.length; i++) {
-        this.resources[i].refresh();
+        for (int i = 0; i < this.resources.length; i++) {
+          this.resources[i].refresh();
+        }
       }
+    } finally {
+      releaseCurrentThread();
     }
   }
 
 
   public void prepare() throws SimpleXAResourceException {
-    synchronized (globalLock) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Preparing phase on StringPoolSession " + System.identityHashCode(this) + " SP=" + System.identityHashCode(persistentStringPool));
+    checkCurrentThread();
+    try {
+      synchronized (globalLock) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Preparing phase on StringPoolSession " + System.identityHashCode(this) + " SP=" + System.identityHashCode(persistentStringPool));
+        }
+        if (state == PREPARE) {
+          return;
+        } else if (state != OBTAIN) {
+          throw new SimpleXAResourceException("Attempting to prepare phase without obtaining phase");
+        }
+    
+        state = PREPARE;
+    
+        persistentStringPool.prepare();
+        persistentNodePool.prepare();
+        for (int i = 0; i < resources.length; i++) {
+          resources[i].prepare();
+        }
       }
-      if (state == PREPARE) {
-        return;
-      } else if (state != OBTAIN) {
-        throw new SimpleXAResourceException("Attempting to prepare phase without obtaining phase");
-      }
-  
-      state = PREPARE;
-  
-      persistentStringPool.prepare();
-      persistentNodePool.prepare();
-      for (int i = 0; i < resources.length; i++) {
-        resources[i].prepare();
-      }
+    } finally {
+      releaseCurrentThread();
     }
   }
 
 
   public void commit() throws SimpleXAResourceException {
-    synchronized (globalLock) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Committing phase on StringPoolSession " + System.identityHashCode(this));
-      }
-      if (state == COMMIT) {
-        return;
-      } else if (state != PREPARE) {
-        throw new SimpleXAResourceException("Attempting to commit phase without preparing");
-      }
-  
-      state = COMMIT;
+    checkCurrentThread();
+    try {
+      synchronized (globalLock) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Committing phase on StringPoolSession " + System.identityHashCode(this));
+        }
+        if (state == COMMIT) {
+          return;
+        } else if (state != PREPARE) {
+          throw new SimpleXAResourceException("Attempting to commit phase without preparing");
+        }
+    
+        state = COMMIT;
 
-      persistentStringPool.commit();
-      persistentNodePool.commit();
-      for (int i = 0; i < resources.length; i++) {
-        resources[i].commit();
+        persistentStringPool.commit();
+        persistentNodePool.commit();
+        for (int i = 0; i < resources.length; i++) {
+          resources[i].commit();
+        }
       }
+    } finally {
+      releaseCurrentThread();
     }
   }
 
 
   public void rollback() throws SimpleXAResourceException {
-    synchronized (globalLock) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Rollback phase on StringPoolSession " + System.identityHashCode(this));
+    checkCurrentThread();
+    try {
+      synchronized (globalLock) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Rollback phase on StringPoolSession " + System.identityHashCode(this));
+        }
+        if (state == RELEASE) {
+          throw new SimpleXAResourceException("Attempting to rollback phase outside transaction");
+        }
+        state = ROLLBACK;
+        persistentStringPool.rollback();
+        persistentNodePool.rollback();
+        for (int i = 0; i < resources.length; i++) {
+          resources[i].rollback();
+        }
       }
-      if (state == RELEASE) {
-        throw new SimpleXAResourceException("Attempting to rollback phase outside transaction");
-      }
-      state = ROLLBACK;
-      persistentStringPool.rollback();
-      persistentNodePool.rollback();
-      for (int i = 0; i < resources.length; i++) {
-        resources[i].rollback();
-      }
+    } finally {
+      releaseCurrentThread();
     }
   }
 
 
   public void release() throws SimpleXAResourceException {
-    synchronized (globalLock) {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Release phase on StringPoolSession " + System.identityHashCode(this));
+    checkCurrentThread();
+    try {
+      synchronized (globalLock) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Release phase on StringPoolSession " + System.identityHashCode(this));
+        }
+        if (state == RELEASE) {
+          return;
+        } else if (state != COMMIT && state != ROLLBACK) {
+          throw new SimpleXAResourceException("Attempting to release phase without commit or rollback");
+        }
+    
+        state = RELEASE;
+    
+        persistentStringPool.release();
+        persistentNodePool.release();
+    
+        // TODO determine if release() should be called for the temp components.
+        //temporaryStringPool.release();
+        //temporaryNodePool.release();
+        for (int i = 0; i < resources.length; i++) {
+          resources[i].release();
+        }
       }
-      if (state == RELEASE) {
-        return;
-      } else if (state != COMMIT && state != ROLLBACK) {
-        throw new SimpleXAResourceException("Attempting to release phase without commit or rollback");
-      }
-  
-      state = RELEASE;
-  
-      persistentStringPool.release();
-      persistentNodePool.release();
-  
-      // TODO determine if release() should be called for the temp components.
-      //temporaryStringPool.release();
-      //temporaryNodePool.release();
-      for (int i = 0; i < resources.length; i++) {
-        resources[i].release();
-      }
+    } finally {
+      releaseCurrentThread();
     }
   }
 
@@ -733,4 +770,25 @@
   public long findGNode(SPObject spObject) throws StringPoolException {
     return persistentStringPool.findGNode(spObject, persistentNodePool);
   }
+
+  /**
+   * Used purely as a sanity check in the hope that we might catch concurrency bugs in higher layers should
+   * they exist.
+   */
+  private void checkCurrentThread() {
+    synchronized(this) {
+      if (currentThread == null || currentThread.equals(Thread.currentThread())) {
+        currentThread = Thread.currentThread();
+      } else {
+        logger.warn("Concurrent Access of StringPoolSession Attempted");
+        throw new IllegalStateException("Concurrent Access of StringPoolSession Attempted");
+      }
+    }
+  }
+
+  private void releaseCurrentThread() {
+    synchronized(this) {
+      currentThread = null;
+    }
+  }
 }




More information about the Mulgara-svn mailing list