[Mulgara-svn] r1361 - trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed

ronald at mulgara.org ronald at mulgara.org
Thu Oct 23 13:13:07 UTC 2008


Author: ronald
Date: 2008-10-23 06:13:06 -0700 (Thu, 23 Oct 2008)
New Revision: 1361

Added:
   trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/SessionCache.java
Modified:
   trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolver.java
   trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolverFactory.java
   trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/NetworkDelegator.java
Log:
Added caching of Session's and their factories across transactions.

Modified: trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolver.java
===================================================================
--- trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolver.java	2008-10-23 13:13:00 UTC (rev 1360)
+++ trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolver.java	2008-10-23 13:13:06 UTC (rev 1361)
@@ -57,18 +57,19 @@
    * @param resolverSession the session this resolver is associated with.
    * @param resolverFactory the factory this resolver is associated with.
    * @param canWrite        whether the current transaction is read-only or r/w
+   * @param sessionCache    the session cache to use
    * @throws IllegalArgumentException if <var>resolverSession</var> is <code>null</code>
    * @throws ResolverFactoryException if the superclass is unable to handle its arguments
    */
   DistributedResolver(ResolverSession resolverSession, ResolverFactory resolverFactory,
-                      boolean canWrite) throws ResolverFactoryException {
+                      boolean canWrite, SessionCache sessionCache) throws ResolverFactoryException {
 
     if (logger.isDebugEnabled()) logger.debug("Instantiating a distributed resolver");
 
     // Validate "resolverSession" parameter
     if (resolverSession == null) throw new IllegalArgumentException( "Null \"resolverSession\" parameter");
 
-    delegator = new NetworkDelegator(resolverSession, canWrite, this);
+    delegator = new NetworkDelegator(resolverSession, canWrite, this, sessionCache);
     xares = new DistributedXAResource(10, resolverFactory, delegator);
   }
 

Modified: trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolverFactory.java
===================================================================
--- trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolverFactory.java	2008-10-23 13:13:00 UTC (rev 1360)
+++ trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolverFactory.java	2008-10-23 13:13:06 UTC (rev 1361)
@@ -47,6 +47,9 @@
   /** Protocols which are handled by the served resolver. */
   private static final Set<String> protocols = new HashSet<String>(Arrays.asList("rmi"));
 
+  /** The session cache to use */
+  private final SessionCache sessionCache = new SessionCache();
+
   /**
    * Instantiate a {@link DistributedResolverFactory}.
    * @param initializer The system initializer to be registered with.
@@ -68,6 +71,7 @@
    * {@inheritDoc ResolverFactory}
    */
   public void close() {
+    sessionCache.close();
   }
 
   /**
@@ -124,7 +128,7 @@
       logger.debug("Creating new distributed resolver");
       if (canWrite) logger.debug("Expecting to write to distributed resolver.");
     }
-    return new DistributedResolver(resolverSession, this, canWrite);
+    return new DistributedResolver(resolverSession, this, canWrite, sessionCache);
   }
 
 

Modified: trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/NetworkDelegator.java
===================================================================
--- trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/NetworkDelegator.java	2008-10-23 13:13:00 UTC (rev 1360)
+++ trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/NetworkDelegator.java	2008-10-23 13:13:06 UTC (rev 1361)
@@ -26,10 +26,8 @@
 import org.mulgara.query.Variable;
 import org.mulgara.query.rdf.URIReferenceImpl;
 import org.mulgara.server.Session;
-import org.mulgara.server.SessionFactory;
 import org.mulgara.server.ServerInfo;
 import org.mulgara.server.NonRemoteSessionException;
-import org.mulgara.server.driver.SessionFactoryFinder;
 import org.mulgara.server.driver.SessionFactoryFinderException;
 import org.mulgara.resolver.distributed.remote.StatementSetFactory;
 import org.mulgara.resolver.spi.GlobalizeException;
@@ -71,14 +69,11 @@
   /** The transaction coordinator with which to register new XAResource's */
   private final TransactionCoordinator txCord;
 
-  /** A cache of distributed sessions. */
-  private Map<URI,Session> sessionCache = new HashMap<URI,Session>();
+  /** The session cache to use. */
+  private final SessionCache sessionCache;
 
-  /**
-   * A cache of distributed session factories.
-   * Each entry matches an entry in sessionCache, but a separate set for cleaner code.
-   */
-  private List<SessionFactory> factoryCache = new ArrayList<SessionFactory>();
+  /** The map of distributed sessions. */
+  private Map<URI,Session> sessionMap = new HashMap<URI,Session>();
 
 
   /**
@@ -86,11 +81,14 @@
    * @param session The session to delegate resolution through.
    * @param forWrite Whether to open this for writes or for read-only
    * @param txCord the transaction-coordinator being used
+   * @param sessionCache the session cache to use
    */
-  public NetworkDelegator(ResolverSession session, boolean forWrite, TransactionCoordinator txCord) {
+  public NetworkDelegator(ResolverSession session, boolean forWrite, TransactionCoordinator txCord,
+                          SessionCache sessionCache) {
     this.session = session;
     this.forWrite = forWrite;
     this.txCord = txCord;
+    this.sessionCache = sessionCache;
   }
 
 
@@ -313,7 +311,7 @@
    * @throws QueryException Thrown when the session cannot be created.
    */
   protected Session getServerSession(URI serverUri) throws QueryException {
-    Session session = sessionCache.get(serverUri);
+    Session session = sessionMap.get(serverUri);
     return (session != null) ? session : newSession(serverUri);
   }
 
@@ -326,14 +324,10 @@
    */
   protected Session newSession(URI serverUri) throws QueryException {
     try {
-      // The factory won't be in the cache, as a corresponding session would have already been created.
-      SessionFactory sessionFactory = SessionFactoryFinder.newSessionFactory(serverUri, true);
-      factoryCache.add(sessionFactory);
+      // get a new session
+      Session session = sessionCache.getSession(serverUri);
+      sessionMap.put(serverUri, session);
 
-      // now create the session
-      Session session = sessionFactory.newSession();
-      sessionCache.put(serverUri, session);
-
       // get the XAResource and enlist it
       txCord.enlistResource(forWrite ? session.getXAResource() : session.getReadOnlyXAResource());
 
@@ -349,22 +343,11 @@
   }
 
   /**
-   * Close all sessions and factories used by this delegator.
+   * Return all sessions used by this delegator.
    */
   public void close() {
-    for (Session s: sessionCache.values()) {
-      try {
-        s.close();
-      } catch (QueryException qe) {
-        logger.error("Exception while closing session", qe);
-      }
+    for (Map.Entry<URI,Session> e : sessionMap.entrySet()) {
+      sessionCache.returnSession(e.getKey(), e.getValue());
     }
-    for (SessionFactory sf: factoryCache) {
-      try {
-        sf.close();
-      } catch (QueryException qe) {
-        logger.error("Exception while closing session", qe);
-      }
-    }
   }
 }

Added: trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/SessionCache.java
===================================================================
--- trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/SessionCache.java	                        (rev 0)
+++ trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/SessionCache.java	2008-10-23 13:13:06 UTC (rev 1361)
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2008 The Topaz Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ * Contributions:
+ */
+
+package org.mulgara.resolver.distributed;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.log4j.Logger;
+
+import org.mulgara.query.QueryException;
+import org.mulgara.server.Session;
+import org.mulgara.server.SessionFactory;
+import org.mulgara.server.NonRemoteSessionException;
+import org.mulgara.server.driver.SessionFactoryFinder;
+import org.mulgara.server.driver.SessionFactoryFinderException;
+
+/**
+ * A simple cache of {@link SessionFactory}s and {@link Session}s. Note that there is currently
+ * no cache eviction policy, as the assumption is that this will hold a relatively small set of
+ * sessions.
+ *
+ * @created 2008-02-16
+ * @author Ronald Tschalär
+ * @copyright &copy;2008 <a href="http://www.topazproject.org/">Topaz Project</a>
+ * @licence Apache License v2.0
+ */
+public class SessionCache {
+  private static Logger logger = Logger.getLogger(SessionCache.class);
+
+  private final Map<URI,SessionFactory> factoryCache = new HashMap<URI,SessionFactory>();
+  private final ConcurrentMap<URI,List<Session>> sessionCache = new ConcurrentHashMap<URI,List<Session>>();
+
+  private SessionFactory getSessionFactory(URI serverUri)
+      throws SessionFactoryFinderException, NonRemoteSessionException {
+    synchronized (factoryCache) {  // factoryCache is a plain HashMap; all access goes through this monitor
+      SessionFactory sessionFactory = factoryCache.get(serverUri);
+      if (sessionFactory == null) {  // first request for this server: create a factory and remember it
+        if (logger.isDebugEnabled()) {
+          logger.debug("Creating session-factory for server '" + serverUri + "'");
+        }
+        factoryCache.put(serverUri,
+                         sessionFactory = SessionFactoryFinder.newSessionFactory(serverUri, true));
+      }
+
+      return sessionFactory;
+    }
+  }
+
+  /**
+   * Get a session from the cache. A new session will be created if none are available.
+   *
+   * @param serverUri the server for which to get the session
+   * @return the session
+   * @throws SessionFactoryFinderException if an error occurred creating the session-factory
+   * @throws NonRemoteSessionException     if an error occurred creating the session-factory
+   * @throws QueryException                if an error occurred creating the session
+   * @see #returnSession(URI, Session)
+   */
+  public Session getSession(URI serverUri)
+      throws SessionFactoryFinderException, NonRemoteSessionException, QueryException {
+    List<Session> sessions = sessionCache.get(serverUri);
+    if (sessions == null) {
+      sessionCache.putIfAbsent(serverUri, new ArrayList<Session>());  // keeps a list another thread added first
+      sessions = sessionCache.get(serverUri);
+    }
+
+    synchronized (sessions) {
+      if (sessions.size() > 0) {
+        return sessions.remove(sessions.size() - 1);  // hand out the most recently returned session
+      } else {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Creating session for server '" + serverUri + "'");
+        }
+        return getSessionFactory(serverUri).newSession();
+      }
+    }
+  }
+
+  /**
+   * Return a session to the cache. Locks the same per-server list that
+   * {@link #getSession(URI)} locks, so concurrent get/return calls are mutually exclusive.
+   *
+   * @param serverUri the server this session belongs to
+   * @param session   the session to return
+   */
+  public void returnSession(URI serverUri, Session session) {
+    List<Session> sessions = sessionCache.get(serverUri);
+    synchronized (sessions) { sessions.add(session); }
+  }
+
+  /**
+   * Closes all cached sessions, then the factories that created them, and empties both caches.
+   */
+  public void close() {
+    synchronized (factoryCache) {
+      for (List<Session> sl : sessionCache.values()) {
+        for (Session s : sl) {
+          try {
+            s.close();
+          } catch (QueryException qe) {
+            logger.error("Exception while closing session", qe);
+          }
+        }
+      }
+      sessionCache.clear();
+      // factories go last: every session above came from one of them (see getSession)
+      for (SessionFactory sf : factoryCache.values()) {
+        try {
+          sf.close();
+        } catch (QueryException qe) {
+          logger.error("Exception while closing session-factory", qe);
+        }
+      }
+      factoryCache.clear();
+    }
+  }
+}


Property changes on: trunk/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/SessionCache.java
___________________________________________________________________
Name: svn:keywords
   + Id HeadURL Revision
Name: svn:eol-style
   + native




More information about the Mulgara-svn mailing list