[Mulgara-svn] r1910 - in trunk/src/jar: query/java/org/mulgara/connection querylang/java/org/mulgara/protocol/http

alexhall at mulgara.org alexhall at mulgara.org
Fri Feb 5 20:47:55 UTC 2010


Author: alexhall
Date: 2010-02-05 12:47:54 -0800 (Fri, 05 Feb 2010)
New Revision: 1910

Modified:
   trunk/src/jar/query/java/org/mulgara/connection/CommandExecutor.java
   trunk/src/jar/query/java/org/mulgara/connection/ConnectionFactory.java
   trunk/src/jar/query/java/org/mulgara/connection/SessionConnection.java
   trunk/src/jar/querylang/java/org/mulgara/protocol/http/MulgaraServlet.java
Log:
Add logic to the ConnectionFactory to use the Interruptible RMI thread factory with client connections to remote servers.

Modified: trunk/src/jar/query/java/org/mulgara/connection/CommandExecutor.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/connection/CommandExecutor.java	2010-02-05 20:46:08 UTC (rev 1909)
+++ trunk/src/jar/query/java/org/mulgara/connection/CommandExecutor.java	2010-02-05 20:47:54 UTC (rev 1910)
@@ -52,7 +52,7 @@
 public abstract class CommandExecutor implements Connection {
   
   /** Factory for creating proxy threads. */
-  private final ThreadFactory threadFactory;
+  private ThreadFactory threadFactory;
   
   // Fields used to implement session locking and cancellation.
   private Thread sessionThread = null;
@@ -70,6 +70,15 @@
   }
 
   /**
+   * Sets the factory for creating proxy threads.
+   * @param threadFactory If non-null, then every call to {@link #execute(org.mulgara.connection.Connection.SessionOp)}
+   * will perform the operation in a proxy thread created with this factory.
+   */
+  void setThreadFactory(ThreadFactory threadFactory) {
+    this.threadFactory = threadFactory;
+  }
+  
+  /**
    * @see org.mulgara.connection.Connection#execute(org.mulgara.query.operation.Command)
    */
   public String execute(Command cmd) throws Exception {

Modified: trunk/src/jar/query/java/org/mulgara/connection/ConnectionFactory.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/connection/ConnectionFactory.java	2010-02-05 20:46:08 UTC (rev 1909)
+++ trunk/src/jar/query/java/org/mulgara/connection/ConnectionFactory.java	2010-02-05 20:47:54 UTC (rev 1910)
@@ -31,6 +31,7 @@
 import org.apache.log4j.Logger;
 import org.mulgara.query.QueryException;
 import org.mulgara.server.Session;
+import org.mulgara.util.Rmi;
 
 
 /**
@@ -47,7 +48,7 @@
  * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
  */
 public class ConnectionFactory {
-
+  
   /** The logger. */
   private final static Logger logger = Logger.getLogger(ConnectionFactory.class.getName());
 
@@ -97,11 +98,27 @@
    * connections that have been abandoned but not closed.
    */
   private Set<Session> sessionsInUse;
- 
+  
+  /** Flag to determine whether to use interruptible RMI operations for remote session connections. */
+  private final boolean useInterruptibleRmi;
+  
   /**
-   * Default constructor.
+   * Default constructor. Uses the configured system default behavior for interruptible RMI.
    */
   public ConnectionFactory() {
+    this(Rmi.getDefaultInterrupt());
+  }
+  
+
+  /**
+   * Construct a connection factory, with optional support for interruptible RMI operations
+   * on remote session connections.
+   * @param useInterruptibleRmi If <code>true</code>, then the connections created by this
+   * factory will support interrupting RMI operations. This behavior must also be enabled
+   * on the server in order to take advantage of this feature.
+   */
+  public ConnectionFactory(boolean useInterruptibleRmi) {
+    this.useInterruptibleRmi = useInterruptibleRmi;
     cacheOnUri = new HashMap<URI,Set<Session>>();
     sessionsInUse = new HashSet<Session>();
   }
@@ -132,13 +149,10 @@
     }
       
     if (session == null) {
-      if (isLocalServer(serverUri)) {
-        c = new SessionConnection(serverUri, false);
-      } else {
-        c = new SessionConnection(serverUri);
-      }
+      boolean isRemote = !isLocalServer(serverUri);
+      c = new SessionConnection(serverUri, isRemote, useInterruptibleRmi);
     } else {
-      c = new SessionConnection(session, null, serverUri);
+      c = new SessionConnection(session, null, serverUri, useInterruptibleRmi);
     }
     c.setFactory(this);
     
@@ -160,10 +174,10 @@
    * @throws ConnectionException There was an error getting a connection.
    */
   public Connection newConnection(Session session) throws ConnectionException {
-    return new SessionConnection(session, null, null);
+    return new SessionConnection(session, null, null, useInterruptibleRmi);
   }
 
-
+  
   /**
    * Close all Sessions cached by this factory. Sessions belonging to connections which are
    * still in use will not be affected. Exceptions are logged, but not acted on.

Modified: trunk/src/jar/query/java/org/mulgara/connection/SessionConnection.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/connection/SessionConnection.java	2010-02-05 20:46:08 UTC (rev 1909)
+++ trunk/src/jar/query/java/org/mulgara/connection/SessionConnection.java	2010-02-05 20:47:54 UTC (rev 1910)
@@ -17,6 +17,7 @@
 package org.mulgara.connection;
 
 import java.net.URI;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.log4j.Logger;
 import org.mulgara.query.QueryException;
@@ -25,6 +26,7 @@
 import org.mulgara.server.SessionFactory;
 import org.mulgara.server.driver.SessionFactoryFinder;
 import org.mulgara.server.driver.SessionFactoryFinderException;
+import org.neilja.net.interruptiblermi.InterruptibleRMIThreadFactory;
 
 /**
  * A connection for sending commands to a server using a session object.
@@ -39,6 +41,12 @@
   /** Logger. */
   private static final Logger logger = Logger.getLogger(SessionConnection.class.getName());
   
+  /** Thread factory used to proxy session operations when interruptible RMI is enabled. */
+  private static final ThreadFactory interruptibleFactory = InterruptibleRMIThreadFactory.getInstance();
+  
+  /** Flag to control whether interruptible operations on remote RMI sessions are supported. */
+  private final boolean useInterruptibleRmi;
+  
   /** The URI for the server to establish a session on. */
   private URI serverUri;
   
@@ -75,7 +83,21 @@
    * @throws ConnectionException There was a problem establishing the details needed for a connection.
    */
   public SessionConnection(URI serverUri, boolean isRemote) throws ConnectionException {
+    this(serverUri, isRemote, false);
+  }
+
+
+  /**
+   * Creates a new connection, given a URI to a server, a flag to indicate if the server
+   * should be "remote", and another to indicate whether to use interruptible RMI operations.
+   * @param serverUri The URI to connect to.
+   * @param isRemote <code>true</code> for a remote session, <code>false</code> for local.
+   * @param useInterruptibleRmi <code>true</code> to support interruptible RMI operations on remote sessions.
+   * @throws ConnectionException There was a problem establishing the details needed for a connection.
+   */
+  public SessionConnection(URI serverUri, boolean isRemote, boolean useInterruptibleRmi) throws ConnectionException {
     super(null);
+    this.useInterruptibleRmi = useInterruptibleRmi;
     setServerUri(serverUri, isRemote);
   }
 
@@ -85,7 +107,7 @@
    * @param session The session to connect with.
    */
   public SessionConnection(Session session) {
-    this(session, null, null);
+    this(session, null, null, false);
   }
   
   /**
@@ -94,7 +116,7 @@
    * @param securityDomainUri The security domain URI for the session
    */
   public SessionConnection(Session session, URI securityDomainUri) {
-    this(session, securityDomainUri, null);
+    this(session, securityDomainUri, null, false);
   }
   
   
@@ -105,8 +127,21 @@
    * @param serverUri The server URI, needed for re-caching the session with the factory
    */
   public SessionConnection(Session session, URI securityDomainUri, URI serverUri) {
+    this(session, securityDomainUri, serverUri, false);
+  }
+  
+  
+  /**
+   * Creates a new connection, given a preassigned session
+   * @param session The session to connect with
+   * @param securityDomainUri The security domain URI for the session
+   * @param serverUri The server URI, needed for re-caching the session with the factory
+   * @param useInterruptibleRmi <code>true</code> to support interruptible RMI operations on remote sessions.
+   */
+  public SessionConnection(Session session, URI securityDomainUri, URI serverUri, boolean useInterruptibleRmi) {
     super(null);
     if (session == null) throw new IllegalArgumentException("Cannot create a connection without a server.");
+    this.useInterruptibleRmi = useInterruptibleRmi;
     setSession(session, securityDomainUri, serverUri);    
   }
   
@@ -260,6 +295,9 @@
     this.session = session;
     this.securityDomainUri = securityDomainUri;
     this.serverUri = serverUri;
+    if (this.useInterruptibleRmi && !session.isLocal()) {
+      setThreadFactory(interruptibleFactory);
+    }
     if (logger.isDebugEnabled()) logger.debug("Set server URI to: " + serverUri);
   }
 

Modified: trunk/src/jar/querylang/java/org/mulgara/protocol/http/MulgaraServlet.java
===================================================================
--- trunk/src/jar/querylang/java/org/mulgara/protocol/http/MulgaraServlet.java	2010-02-05 20:46:08 UTC (rev 1909)
+++ trunk/src/jar/querylang/java/org/mulgara/protocol/http/MulgaraServlet.java	2010-02-05 20:47:54 UTC (rev 1910)
@@ -142,7 +142,7 @@
   private Connection getConnection() throws QueryException, IOException {
     SessionFactory sessionFactory = getSessionFactory();
     if (sessionFactory != null) {
-      return new SessionConnection(sessionFactory.newSession(), null, null);
+      return new SessionConnection(sessionFactory.newSession(), null, null, false);
     } else {
       try {
         return getConnectionFactory().newConnection(serverUri);




More information about the Mulgara-svn mailing list