[Mulgara-svn] r173 - trunk/src/jar/server-rmi/java/org/mulgara/server/rmi

pag at mulgara.org pag at mulgara.org
Fri Feb 23 22:17:06 UTC 2007


Author: pag
Date: 2007-02-23 16:17:06 -0600 (Fri, 23 Feb 2007)
New Revision: 173

Modified:
   trunk/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteAnswerWrapperAnswer.java
Log:
Fixed possible overlaps of thread operations on RMI when Answer pages are retrieved in the background

Modified: trunk/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteAnswerWrapperAnswer.java
===================================================================
--- trunk/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteAnswerWrapperAnswer.java	2007-02-23 22:15:54 UTC (rev 172)
+++ trunk/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteAnswerWrapperAnswer.java	2007-02-23 22:17:06 UTC (rev 173)
@@ -146,11 +146,13 @@
   public Object clone() {
 
     try {
-
+      // protect the RMI threading model
+      waitForPrefetchThread();
       RemoteAnswer remoteAnswer = this.remoteAnswer.remoteClone();
       return new RemoteAnswerWrapperAnswer(remoteAnswer);
-    }
-    catch (RemoteException rex) {
+    } catch (RMITimeoutException rmie) {
+      throw new RuntimeException("Timeout waiting on server", rmie);
+    } catch (RemoteException rex) {
       throw new RuntimeException("Clone failed on server", rex);
     }
   }
@@ -172,8 +174,7 @@
         obj = new RemoteAnswerWrapperAnswer( (RemoteAnswer) obj);
       }
       return obj;
-    }
-    catch (RemoteException e) {
+    } catch (RemoteException e) {
       throw new TuplesException("Unable to get column " + column, e);
     }
   }
@@ -195,11 +196,9 @@
         obj = new RemoteAnswerWrapperAnswer( (RemoteAnswer) obj);
       }
       return obj;
+    } catch (RemoteException e) {
+      throw new TuplesException("Unable to get column \"" + columnName + "\"", e);
     }
-    catch (RemoteException e) {
-      throw new TuplesException("Unable to get column \"" + columnName + "\"",
-                                e);
-    }
   }
 
   //
@@ -216,20 +215,17 @@
       waitForPrefetchThread();
       if (onFirstPage) {
         currentPage.beforeFirstInPage();
-      }
-      else {
+      } else {
         currentPage = remoteAnswer.beforeFirstAndInitPage();
         // make onFirstPage false if the page is invalid
         onFirstPage = (currentPage != null);
         // Abandon the last prefetched page, and start a new prefetch thread
         prefetchThread = new PrefetchThread(new Throwable());
       }
-    }
-    catch (RemoteException er) {
+    } catch (RemoteException er) {
       logger.warn("RemoteException thrown in beforeFirst", er);
       throw new TuplesException("Couldn't reset remote cursor", er);
-    }
-    catch (RMITimeoutException te) {
+    } catch (RMITimeoutException te) {
       throw new TuplesException("Couldn't reset remote cursor", te);
     }
   }
@@ -244,19 +240,19 @@
     //ensure the prefetchThread is not fetching next page
     if (prefetchThread != null) {
       try {
-
-        prefetchThread.join();
-      }
-      catch (InterruptedException ie) {
-
+        prefetchThread.join(timeout);
+      } catch (InterruptedException ie) {
         logger.info("Join on prefetchThread interrupted.", ie);
       }
+      if (!prefetchThread.hasFinished()) {
+        logger.warn("No RMI data returned within " + timeout + "ms while closing");
+      }
+      prefetchThread = null;
     }
 
     if (closed != null) {
       logger.warn("Was already closed at: " + closed);
-      throw new TuplesException("Attempting to close answer twice.",
-                                new Throwable());
+      throw new TuplesException("Attempting to close answer twice.", new Throwable());
     }
     closed = new Throwable();
 
@@ -265,8 +261,7 @@
       remoteAnswer.close();
       currentPage = null;
       onFirstPage = false;
-    }
-    catch (RemoteException e) {
+    } catch (RemoteException e) {
       throw new TuplesException("Couldn't close remote cursor", e);
     }
     // set the remote answer to null for the sake of the finalize method below
@@ -341,9 +336,12 @@
    */
   public boolean isUnconstrained() throws TuplesException {
     try {
+      waitForPrefetchThread();
+      assert prefetchThread == null || prefetchThread.hasFinished();
       return remoteAnswer.isUnconstrained();
-    }
-    catch (RemoteException e) {
+    } catch (RMITimeoutException rmie) {
+      throw new RuntimeException("Timeout waiting on server", rmie);
+    } catch (RemoteException e) {
       throw new TuplesException("Can't test for unconstrained", e);
     }
   }
@@ -356,27 +354,36 @@
    */
   public long getRowCount() throws TuplesException {
     try {
+      waitForPrefetchThread();
+      assert prefetchThread == null || prefetchThread.hasFinished();
       return remoteAnswer.getRowCount();
-    }
-    catch (RemoteException e) {
+    } catch (RMITimeoutException rmie) {
+      throw new RuntimeException("Timeout waiting on server", rmie);
+    } catch (RemoteException e) {
       throw new TuplesException("Can't get remote row count", e);
     }
   }
 
   public long getRowUpperBound() throws TuplesException {
     try {
+      waitForPrefetchThread();
+      assert prefetchThread == null || prefetchThread.hasFinished();
       return remoteAnswer.getRowUpperBound();
-    }
-    catch (RemoteException e) {
+    } catch (RMITimeoutException rmie) {
+      throw new RuntimeException("Timeout waiting on server", rmie);
+    } catch (RemoteException e) {
       throw new TuplesException("Can't get remote row upper bound", e);
     }
   }
 
   public int getRowCardinality() throws TuplesException {
     try {
+      waitForPrefetchThread();
+      assert prefetchThread == null || prefetchThread.hasFinished();
       return remoteAnswer.getRowCardinality();
-    }
-    catch (RemoteException e) {
+    } catch (RMITimeoutException rmie) {
+      throw new RuntimeException("Timeout waiting on server", rmie);
+    } catch (RemoteException e) {
       throw new TuplesException("Can't get remote row cardinality", e);
     }
   }
@@ -407,8 +414,7 @@
         // move to the next page
         if (currentPage != null && currentPage.isLastPage()) {
           currentPage = null;
-        }
-        else {
+        } else {
           currentPage = nextPage();
         }
 
@@ -417,8 +423,7 @@
           boolean test = currentPage.nextInPage();
           assert test || ! (currentPage instanceof AnswerPageImpl);
           // instances of AnswerPageImpl should be null if it contains no valid rows
-        }
-        else {
+        } else {
           // no valid page: if it was the first page, then turn the flag off
           onFirstPage = false;
         }
@@ -426,11 +431,9 @@
       }
       // Return true if we have a current valid page (page can't be finished at this point)
       return currentPage != null;
-    }
-    catch (RemoteException e) {
+    } catch (RemoteException e) {
       throw new TuplesException("Can't advance remote cursor", e);
-    }
-    catch (RMITimeoutException te) {
+    } catch (RMITimeoutException te) {
       throw new TuplesException("Can't get to next page of answers", te);
     }
   }
@@ -441,8 +444,7 @@
    *
    * @return The next page from the answer.
    */
-  protected AnswerPage nextPage() throws RMITimeoutException, TuplesException,
-      RemoteException {
+  protected AnswerPage nextPage() throws RMITimeoutException, TuplesException, RemoteException {
 
     waitForPrefetchThread();
     assert prefetchThread == null || prefetchThread.hasFinished();
@@ -452,8 +454,7 @@
     if (prefetchThread != null) {
       // a finished thread exists
       page = prefetchThread.getPendingPage();
-    }
-    else {
+    } else {
       // no old thread
       page = remoteAnswer.nextPage();
     }
@@ -473,15 +474,13 @@
     if (prefetchThread != null) {
       try {
         prefetchThread.join(timeout);
-      }
-      catch (InterruptedException ie) {
+      } catch (InterruptedException ie) {
         // Not concerned about interruptions, only in finishing
       }
       if (!prefetchThread.hasFinished()) {
         // abandon the joining thread
         prefetchThread = null;
-        throw new RMITimeoutException("No data returned within " + timeout +
-                                      "ms");
+        throw new RMITimeoutException("No data returned within " + timeout + "ms");
       }
     }
   }
@@ -493,8 +492,7 @@
     if (remoteAnswer != null) {
       try {
         remoteAnswer.close();
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         // forget the exception
       }
       remoteAnswer = null;
@@ -534,8 +532,7 @@
       try {
         page = remoteAnswer.nextPage();
         finished = true;
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         // finished will never be set
 
         // log exception and include the stack trace that created this Thread.




More information about the Mulgara-svn mailing list