[Mulgara-svn] r173 - trunk/src/jar/server-rmi/java/org/mulgara/server/rmi
pag at mulgara.org
pag at mulgara.org
Fri Feb 23 22:17:06 UTC 2007
Author: pag
Date: 2007-02-23 16:17:06 -0600 (Fri, 23 Feb 2007)
New Revision: 173
Modified:
trunk/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteAnswerWrapperAnswer.java
Log:
Fixed possible overlaps of thread operations on RMI when Answer pages are retrieved in the background
Modified: trunk/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteAnswerWrapperAnswer.java
===================================================================
--- trunk/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteAnswerWrapperAnswer.java 2007-02-23 22:15:54 UTC (rev 172)
+++ trunk/src/jar/server-rmi/java/org/mulgara/server/rmi/RemoteAnswerWrapperAnswer.java 2007-02-23 22:17:06 UTC (rev 173)
@@ -146,11 +146,13 @@
public Object clone() {
try {
-
+ // protect the RMI threading model
+ waitForPrefetchThread();
RemoteAnswer remoteAnswer = this.remoteAnswer.remoteClone();
return new RemoteAnswerWrapperAnswer(remoteAnswer);
- }
- catch (RemoteException rex) {
+ } catch (RMITimeoutException rmie) {
+ throw new RuntimeException("Timeout waiting on server", rmie);
+ } catch (RemoteException rex) {
throw new RuntimeException("Clone failed on server", rex);
}
}
@@ -172,8 +174,7 @@
obj = new RemoteAnswerWrapperAnswer( (RemoteAnswer) obj);
}
return obj;
- }
- catch (RemoteException e) {
+ } catch (RemoteException e) {
throw new TuplesException("Unable to get column " + column, e);
}
}
@@ -195,11 +196,9 @@
obj = new RemoteAnswerWrapperAnswer( (RemoteAnswer) obj);
}
return obj;
+ } catch (RemoteException e) {
+ throw new TuplesException("Unable to get column \"" + columnName + "\"", e);
}
- catch (RemoteException e) {
- throw new TuplesException("Unable to get column \"" + columnName + "\"",
- e);
- }
}
//
@@ -216,20 +215,17 @@
waitForPrefetchThread();
if (onFirstPage) {
currentPage.beforeFirstInPage();
- }
- else {
+ } else {
currentPage = remoteAnswer.beforeFirstAndInitPage();
// make onFirstPage false if the page is invalid
onFirstPage = (currentPage != null);
// Abandon the last prefetched page, and start a new prefetch thread
prefetchThread = new PrefetchThread(new Throwable());
}
- }
- catch (RemoteException er) {
+ } catch (RemoteException er) {
logger.warn("RemoteException thrown in beforeFirst", er);
throw new TuplesException("Couldn't reset remote cursor", er);
- }
- catch (RMITimeoutException te) {
+ } catch (RMITimeoutException te) {
throw new TuplesException("Couldn't reset remote cursor", te);
}
}
@@ -244,19 +240,19 @@
//ensure the prefetchThread is not fetching next page
if (prefetchThread != null) {
try {
-
- prefetchThread.join();
- }
- catch (InterruptedException ie) {
-
+ prefetchThread.join(timeout);
+ } catch (InterruptedException ie) {
logger.info("Join on prefetchThread interrupted.", ie);
}
+ if (!prefetchThread.hasFinished()) {
+ logger.warn("No RMI data returned within " + timeout + "ms while closing");
+ }
+ prefetchThread = null;
}
if (closed != null) {
logger.warn("Was already closed at: " + closed);
- throw new TuplesException("Attempting to close answer twice.",
- new Throwable());
+ throw new TuplesException("Attempting to close answer twice.", new Throwable());
}
closed = new Throwable();
@@ -265,8 +261,7 @@
remoteAnswer.close();
currentPage = null;
onFirstPage = false;
- }
- catch (RemoteException e) {
+ } catch (RemoteException e) {
throw new TuplesException("Couldn't close remote cursor", e);
}
// set the remote answer to null for the sake of the finalize method below
@@ -341,9 +336,12 @@
*/
public boolean isUnconstrained() throws TuplesException {
try {
+ waitForPrefetchThread();
+ assert prefetchThread == null || prefetchThread.hasFinished();
return remoteAnswer.isUnconstrained();
- }
- catch (RemoteException e) {
+ } catch (RMITimeoutException rmie) {
+ throw new RuntimeException("Timeout waiting on server", rmie);
+ } catch (RemoteException e) {
throw new TuplesException("Can't test for unconstrained", e);
}
}
@@ -356,27 +354,36 @@
*/
public long getRowCount() throws TuplesException {
try {
+ waitForPrefetchThread();
+ assert prefetchThread == null || prefetchThread.hasFinished();
return remoteAnswer.getRowCount();
- }
- catch (RemoteException e) {
+ } catch (RMITimeoutException rmie) {
+ throw new RuntimeException("Timeout waiting on server", rmie);
+ } catch (RemoteException e) {
throw new TuplesException("Can't get remote row count", e);
}
}
public long getRowUpperBound() throws TuplesException {
try {
+ waitForPrefetchThread();
+ assert prefetchThread == null || prefetchThread.hasFinished();
return remoteAnswer.getRowUpperBound();
- }
- catch (RemoteException e) {
+ } catch (RMITimeoutException rmie) {
+ throw new RuntimeException("Timeout waiting on server", rmie);
+ } catch (RemoteException e) {
throw new TuplesException("Can't get remote row upper bound", e);
}
}
public int getRowCardinality() throws TuplesException {
try {
+ waitForPrefetchThread();
+ assert prefetchThread == null || prefetchThread.hasFinished();
return remoteAnswer.getRowCardinality();
- }
- catch (RemoteException e) {
+ } catch (RMITimeoutException rmie) {
+ throw new RuntimeException("Timeout waiting on server", rmie);
+ } catch (RemoteException e) {
throw new TuplesException("Can't get remote row cardinality", e);
}
}
@@ -407,8 +414,7 @@
// move to the next page
if (currentPage != null && currentPage.isLastPage()) {
currentPage = null;
- }
- else {
+ } else {
currentPage = nextPage();
}
@@ -417,8 +423,7 @@
boolean test = currentPage.nextInPage();
assert test || ! (currentPage instanceof AnswerPageImpl);
// instances of AnswerPageImpl should be null if it contains no valid rows
- }
- else {
+ } else {
// no valid page: if it was the first page, then turn the flag off
onFirstPage = false;
}
@@ -426,11 +431,9 @@
}
// Return true if we have a current valid page (page can't be finished at this point)
return currentPage != null;
- }
- catch (RemoteException e) {
+ } catch (RemoteException e) {
throw new TuplesException("Can't advance remote cursor", e);
- }
- catch (RMITimeoutException te) {
+ } catch (RMITimeoutException te) {
throw new TuplesException("Can't get to next page of answers", te);
}
}
@@ -441,8 +444,7 @@
*
* @return The next page from the answer.
*/
- protected AnswerPage nextPage() throws RMITimeoutException, TuplesException,
- RemoteException {
+ protected AnswerPage nextPage() throws RMITimeoutException, TuplesException, RemoteException {
waitForPrefetchThread();
assert prefetchThread == null || prefetchThread.hasFinished();
@@ -452,8 +454,7 @@
if (prefetchThread != null) {
// a finished thread exists
page = prefetchThread.getPendingPage();
- }
- else {
+ } else {
// no old thread
page = remoteAnswer.nextPage();
}
@@ -473,15 +474,13 @@
if (prefetchThread != null) {
try {
prefetchThread.join(timeout);
- }
- catch (InterruptedException ie) {
+ } catch (InterruptedException ie) {
// Not concerned about interruptions, only in finishing
}
if (!prefetchThread.hasFinished()) {
// abandon the joining thread
prefetchThread = null;
- throw new RMITimeoutException("No data returned within " + timeout +
- "ms");
+ throw new RMITimeoutException("No data returned within " + timeout + "ms");
}
}
}
@@ -493,8 +492,7 @@
if (remoteAnswer != null) {
try {
remoteAnswer.close();
- }
- catch (Exception e) {
+ } catch (Exception e) {
// forget the exception
}
remoteAnswer = null;
@@ -534,8 +532,7 @@
try {
page = remoteAnswer.nextPage();
finished = true;
- }
- catch (Exception e) {
+ } catch (Exception e) {
// finished will never be set
// log exception and include the stack trace that created this Thread.
More information about the Mulgara-svn
mailing list