[Mulgara-svn] r1910 - in trunk/src/jar: query/java/org/mulgara/connection querylang/java/org/mulgara/protocol/http
alexhall at mulgara.org
alexhall at mulgara.org
Fri Feb 5 20:47:55 UTC 2010
Author: alexhall
Date: 2010-02-05 12:47:54 -0800 (Fri, 05 Feb 2010)
New Revision: 1910
Modified:
trunk/src/jar/query/java/org/mulgara/connection/CommandExecutor.java
trunk/src/jar/query/java/org/mulgara/connection/ConnectionFactory.java
trunk/src/jar/query/java/org/mulgara/connection/SessionConnection.java
trunk/src/jar/querylang/java/org/mulgara/protocol/http/MulgaraServlet.java
Log:
Add logic to the ConnectionFactory to use the Interruptible RMI thread factory with client connections to remote servers.
Modified: trunk/src/jar/query/java/org/mulgara/connection/CommandExecutor.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/connection/CommandExecutor.java 2010-02-05 20:46:08 UTC (rev 1909)
+++ trunk/src/jar/query/java/org/mulgara/connection/CommandExecutor.java 2010-02-05 20:47:54 UTC (rev 1910)
@@ -52,7 +52,7 @@
public abstract class CommandExecutor implements Connection {
/** Factory for creating proxy threads. */
- private final ThreadFactory threadFactory;
+ private ThreadFactory threadFactory;
// Fields used to implement session locking and cancellation.
private Thread sessionThread = null;
@@ -70,6 +70,15 @@
}
/**
+ * Sets the factory for creating proxy threads.
+ * @param threadFactory If non-null, then every call to {@link #execute(org.mulgara.connection.Connection.SessionOp)}
+ * will perform the operation in a proxy thread created with this factory.
+ */
+ void setThreadFactory(ThreadFactory threadFactory) {
+ this.threadFactory = threadFactory;
+ }
+
+ /**
* @see org.mulgara.connection.Connection#execute(org.mulgara.query.operation.Command)
*/
public String execute(Command cmd) throws Exception {
Modified: trunk/src/jar/query/java/org/mulgara/connection/ConnectionFactory.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/connection/ConnectionFactory.java 2010-02-05 20:46:08 UTC (rev 1909)
+++ trunk/src/jar/query/java/org/mulgara/connection/ConnectionFactory.java 2010-02-05 20:47:54 UTC (rev 1910)
@@ -31,6 +31,7 @@
import org.apache.log4j.Logger;
import org.mulgara.query.QueryException;
import org.mulgara.server.Session;
+import org.mulgara.util.Rmi;
/**
@@ -47,7 +48,7 @@
* @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
*/
public class ConnectionFactory {
-
+
/** The logger. */
private final static Logger logger = Logger.getLogger(ConnectionFactory.class.getName());
@@ -97,11 +98,27 @@
* connections that have been abandoned but not closed.
*/
private Set<Session> sessionsInUse;
-
+
+ /** Flag to determine whether to use interruptible RMI operations for remote session connections. */
+ private final boolean useInterruptibleRmi;
+
/**
- * Default constructor.
+ * Default constructor. Uses the configured system default behavior for interruptible RMI.
*/
public ConnectionFactory() {
+ this(Rmi.getDefaultInterrupt());
+ }
+
+
+ /**
+ * Construct a connection factory, with optional support for interruptible RMI operations
+ * on remote session connections.
+ * @param useInterruptibleRmi If <tt>true</tt>, then the connections created by this
+ * factory will support interrupting RMI operations. This behavior must also be enabled
+ * on the server in order to take advantage of this feature.
+ */
+ public ConnectionFactory(boolean useInterruptibleRmi) {
+ this.useInterruptibleRmi = useInterruptibleRmi;
cacheOnUri = new HashMap<URI,Set<Session>>();
sessionsInUse = new HashSet<Session>();
}
@@ -132,13 +149,10 @@
}
if (session == null) {
- if (isLocalServer(serverUri)) {
- c = new SessionConnection(serverUri, false);
- } else {
- c = new SessionConnection(serverUri);
- }
+ boolean isRemote = !isLocalServer(serverUri);
+ c = new SessionConnection(serverUri, isRemote, useInterruptibleRmi);
} else {
- c = new SessionConnection(session, null, serverUri);
+ c = new SessionConnection(session, null, serverUri, useInterruptibleRmi);
}
c.setFactory(this);
@@ -160,10 +174,10 @@
* @throws ConnectionException There was an error getting a connection.
*/
public Connection newConnection(Session session) throws ConnectionException {
- return new SessionConnection(session, null, null);
+ return new SessionConnection(session, null, null, useInterruptibleRmi);
}
-
+
/**
* Close all Sessions cached by this factory. Sessions belonging to connections which are
* still in use will not be affected. Exceptions are logged, but not acted on.
Modified: trunk/src/jar/query/java/org/mulgara/connection/SessionConnection.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/connection/SessionConnection.java 2010-02-05 20:46:08 UTC (rev 1909)
+++ trunk/src/jar/query/java/org/mulgara/connection/SessionConnection.java 2010-02-05 20:47:54 UTC (rev 1910)
@@ -17,6 +17,7 @@
package org.mulgara.connection;
import java.net.URI;
+import java.util.concurrent.ThreadFactory;
import org.apache.log4j.Logger;
import org.mulgara.query.QueryException;
@@ -25,6 +26,7 @@
import org.mulgara.server.SessionFactory;
import org.mulgara.server.driver.SessionFactoryFinder;
import org.mulgara.server.driver.SessionFactoryFinderException;
+import org.neilja.net.interruptiblermi.InterruptibleRMIThreadFactory;
/**
* A connection for sending commands to a server using a session object.
@@ -39,6 +41,12 @@
/** Logger. */
private static final Logger logger = Logger.getLogger(SessionConnection.class.getName());
+ /** Thread factory used to proxy session operations when interruptible RMI is enabled. */
+ private static final ThreadFactory interruptibleFactory = InterruptibleRMIThreadFactory.getInstance();
+
+ /** Flag to control whether interruptible operations on remote RMI sessions are supported. */
+ private final boolean useInterruptibleRmi;
+
/** The URI for the server to establish a session on. */
private URI serverUri;
@@ -75,7 +83,21 @@
* @throws ConnectionException There was a problem establishing the details needed for a connection.
*/
public SessionConnection(URI serverUri, boolean isRemote) throws ConnectionException {
+ this(serverUri, isRemote, false);
+ }
+
+
+ /**
+ * Creates a new connection, given a URI to a server, a flag to indicate if the server
+ * should be "remote", and another to indicate whether to use interruptible RMI operations.
+ * @param serverUri The URI to connect to.
+ * @param isRemote <code>true</code> for a remote session, <code>false</code> for local.
+ * @param useInterruptibleRmi <code>true</code> to support interruptible RMI operations on remote sessions.
+ * @throws ConnectionException There was a problem establishing the details needed for a connection.
+ */
+ public SessionConnection(URI serverUri, boolean isRemote, boolean useInterruptibleRmi) throws ConnectionException {
super(null);
+ this.useInterruptibleRmi = useInterruptibleRmi;
setServerUri(serverUri, isRemote);
}
@@ -85,7 +107,7 @@
* @param session The session to connect with.
*/
public SessionConnection(Session session) {
- this(session, null, null);
+ this(session, null, null, false);
}
/**
@@ -94,7 +116,7 @@
* @param securityDomainUri The security domain URI for the session
*/
public SessionConnection(Session session, URI securityDomainUri) {
- this(session, securityDomainUri, null);
+ this(session, securityDomainUri, null, false);
}
@@ -105,8 +127,21 @@
* @param serverUri The server URI, needed for re-caching the session with the factory
*/
public SessionConnection(Session session, URI securityDomainUri, URI serverUri) {
+ this(session, securityDomainUri, serverUri, false);
+ }
+
+
+ /**
+ * Creates a new connection, given a preassigned session
+ * @param session The session to connect with
+ * @param securityDomainUri The security domain URI for the session
+ * @param serverUri The server URI, needed for re-caching the session with the factory
+ * @param useInterruptibleRmi <code>true</code> to support interruptible RMI operations on remote sessions.
+ */
+ public SessionConnection(Session session, URI securityDomainUri, URI serverUri, boolean useInterruptibleRmi) {
super(null);
if (session == null) throw new IllegalArgumentException("Cannot create a connection without a server.");
+ this.useInterruptibleRmi = useInterruptibleRmi;
setSession(session, securityDomainUri, serverUri);
}
@@ -260,6 +295,9 @@
this.session = session;
this.securityDomainUri = securityDomainUri;
this.serverUri = serverUri;
+ if (this.useInterruptibleRmi && !session.isLocal()) {
+ setThreadFactory(interruptibleFactory);
+ }
if (logger.isDebugEnabled()) logger.debug("Set server URI to: " + serverUri);
}
Modified: trunk/src/jar/querylang/java/org/mulgara/protocol/http/MulgaraServlet.java
===================================================================
--- trunk/src/jar/querylang/java/org/mulgara/protocol/http/MulgaraServlet.java 2010-02-05 20:46:08 UTC (rev 1909)
+++ trunk/src/jar/querylang/java/org/mulgara/protocol/http/MulgaraServlet.java 2010-02-05 20:47:54 UTC (rev 1910)
@@ -142,7 +142,7 @@
private Connection getConnection() throws QueryException, IOException {
SessionFactory sessionFactory = getSessionFactory();
if (sessionFactory != null) {
- return new SessionConnection(sessionFactory.newSession(), null, null);
+ return new SessionConnection(sessionFactory.newSession(), null, null, false);
} else {
try {
return getConnectionFactory().newConnection(serverUri);
More information about the Mulgara-svn
mailing list