[Mulgara-svn] r1902 - in trunk/src/jar/query/java/org/mulgara: connection query query/operation

alexhall at mulgara.org alexhall at mulgara.org
Fri Jan 29 21:19:03 UTC 2010


Author: alexhall
Date: 2010-01-29 13:19:02 -0800 (Fri, 29 Jan 2010)
New Revision: 1902

Added:
   trunk/src/jar/query/java/org/mulgara/connection/ConnectionUnitTest.java
   trunk/src/jar/query/java/org/mulgara/query/operation/AbstractCommand.java
   trunk/src/jar/query/java/org/mulgara/query/operation/SessionCommand.java
Modified:
   trunk/src/jar/query/java/org/mulgara/connection/CommandExecutor.java
   trunk/src/jar/query/java/org/mulgara/connection/Connection.java
   trunk/src/jar/query/java/org/mulgara/connection/ConnectionFactoryUnitTest.java
   trunk/src/jar/query/java/org/mulgara/connection/DummyConnection.java
   trunk/src/jar/query/java/org/mulgara/connection/SessionConnection.java
   trunk/src/jar/query/java/org/mulgara/query/AskQuery.java
   trunk/src/jar/query/java/org/mulgara/query/ConstructQuery.java
   trunk/src/jar/query/java/org/mulgara/query/Query.java
   trunk/src/jar/query/java/org/mulgara/query/operation/ApplyRules.java
   trunk/src/jar/query/java/org/mulgara/query/operation/Backup.java
   trunk/src/jar/query/java/org/mulgara/query/operation/Commit.java
   trunk/src/jar/query/java/org/mulgara/query/operation/CreateGraph.java
   trunk/src/jar/query/java/org/mulgara/query/operation/DataInputTx.java
   trunk/src/jar/query/java/org/mulgara/query/operation/DataOutputTx.java
   trunk/src/jar/query/java/org/mulgara/query/operation/Deletion.java
   trunk/src/jar/query/java/org/mulgara/query/operation/DropGraph.java
   trunk/src/jar/query/java/org/mulgara/query/operation/Export.java
   trunk/src/jar/query/java/org/mulgara/query/operation/Insertion.java
   trunk/src/jar/query/java/org/mulgara/query/operation/Load.java
   trunk/src/jar/query/java/org/mulgara/query/operation/LocalCommand.java
   trunk/src/jar/query/java/org/mulgara/query/operation/Modification.java
   trunk/src/jar/query/java/org/mulgara/query/operation/Restore.java
   trunk/src/jar/query/java/org/mulgara/query/operation/Rollback.java
   trunk/src/jar/query/java/org/mulgara/query/operation/ServerCommand.java
   trunk/src/jar/query/java/org/mulgara/query/operation/TransactionCommand.java
Log:
Refactoring the Command and Connection interfaces. Connection.getSession() has been deprecated in favor of the new method Connection.execute(SessionOp) which enforces concurrency constraints on the underlying session. There is also a new Connection.cancel() method that will interrupt the currently executing operation, although nothing is responding to the interrupt at the moment.

Modified: trunk/src/jar/query/java/org/mulgara/connection/CommandExecutor.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/connection/CommandExecutor.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/connection/CommandExecutor.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -16,6 +16,12 @@
 
 package org.mulgara.connection;
 
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 import org.mulgara.query.Answer;
 import org.mulgara.query.AskQuery;
 import org.mulgara.query.BooleanAnswer;
@@ -26,10 +32,17 @@
 import org.mulgara.query.TuplesException;
 import org.mulgara.query.operation.Command;
 import org.mulgara.query.operation.Load;
+import org.mulgara.server.Session;
 
 
 /**
  * A central point to direct to commands on a connection.
+ * 
+ * This class also synchronizes access to the session backing this connection, to
+ * ensure that the session is not accessed concurrently by multiple threads.
+ * 
+ * Cancellation is implemented by calling {@link Thread#interrupt()} on the thread
+ * currently accessing the session.
  *
  * @created Feb 22, 2008
  * @author Paul Gearon
@@ -37,8 +50,26 @@
  * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
  */
 public abstract class CommandExecutor implements Connection {
+  
+  /** Factory for creating proxy threads. */
+  private final ThreadFactory threadFactory;
+  
+  // Fields used to implement session locking and cancellation.
+  private Thread sessionThread = null;
+  private final Lock sessionLock = new ReentrantLock();
+  private final ReadWriteLock threadLock = new ReentrantReadWriteLock();
 
   /**
+   * Construct an command executor, with an optional thread factory that will be used
+   * to create proxy threads for executing operations.
+   * @param threadFactory If non-null, then every call to {@link #execute(org.mulgara.connection.Connection.SessionOp)}
+   * will perform the operation in a proxy thread created with this factory.
+   */
+  public CommandExecutor(ThreadFactory threadFactory) {
+    this.threadFactory = threadFactory;
+  }
+
+  /**
    * @see org.mulgara.connection.Connection#execute(org.mulgara.query.operation.Command)
    */
   public String execute(Command cmd) throws Exception {
@@ -73,4 +104,133 @@
     return (GraphAnswer)cmd.execute(this);
   }
 
+  /* (non-Javadoc)
+   * @see org.mulgara.connection.Connection#execute(org.mulgara.connection.Connection.Executable)
+   */
+  @Override
+  final public <T,E extends Exception> T execute(SessionOp<T,E> cmd) throws E {
+    return (this.threadFactory != null) ? executeWithProxy(cmd) : doExecute(cmd);
+  }
+  
+  /**
+   * Execute the given operation atomically on the session, using a new proxy thread.
+   */
+  @SuppressWarnings("unchecked")
+  private <T,E extends Exception> T executeWithProxy(final SessionOp<T,E> cmd) throws E {
+    assert this.threadFactory != null;
+    final Wrapper<T> result = new Wrapper<T>();
+    final Wrapper<Throwable> exception = new Wrapper<Throwable>();
+    
+    Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          result.set(doExecute(cmd));
+        } catch (Throwable t) {
+          // Save the error to re-throw in the calling thread - catch Throwable to make
+          // sure all possible errors get reported to the caller (uncaught errors in a
+          // child thread are usually lost).
+          exception.set(t);
+        }
+      }
+    };
+    
+    Thread t = this.threadFactory.newThread(r);
+    t.start();
+    
+    while (t.isAlive()) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        t.interrupt();
+      }
+    }
+    
+    Throwable th = exception.get();
+    if (th != null) {
+      // The caught exception should have been an instance of the generic type E, but
+      // we can't use generic types in a catch statement. First, try an unchecked cast
+      // to the declared generic exception type.
+      try {
+        throw (E)th;
+      } catch (ClassCastException cce) {
+        // Whatever was caught wasn't of the declared generic exception type.
+        // It could be RuntimeException or Error, which don't need to be declared.
+        // Check for those, and if all else fails wrap in a RuntimeException so we can re-throw.
+        if (th instanceof RuntimeException) {
+          throw (RuntimeException)th;
+        } else if (th instanceof Error) {
+          throw (Error)th;
+        } else {
+          // TODO This could potentially mask a more serious exception -- 
+          // don't know how else to throw the proper generic exception type
+          throw new RuntimeException("Unexpected exception in proxy thread", th);
+        }
+      }
+    }
+    
+    // No error, so return the operation result.
+    return result.get();
+  }
+
+  /**
+   * Execute the given operation atomically on the session that backs this connection.
+   */
+  @SuppressWarnings("deprecation")
+  private <T,E extends Exception> T doExecute(SessionOp<T,E> cmd) throws E {
+    sessionLock.lock();
+    try {
+      Session session = getSession();
+      setSessionThread(Thread.currentThread());
+      
+      try {
+        // TODO To be completely safe, we could wrap the session in a closeable facade, but that's probably overkill.
+        return cmd.fn(session);
+      } finally {
+        setSessionThread(null);
+      }
+    } finally {
+      sessionLock.unlock();
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.mulgara.connection.Connection#cancel()
+   */
+  @Override
+  final public void cancel() {
+    threadLock.readLock().lock();
+    try {
+      if (sessionThread != null) sessionThread.interrupt();
+    } finally {
+      threadLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Sets the thread currently using the session, subject to a write-lock on the thread.
+   * @param t
+   */
+  private void setSessionThread(Thread t) {
+    threadLock.writeLock().lock();
+    try {
+      this.sessionThread = t;
+    } finally {
+      threadLock.writeLock().unlock();
+    }
+  }
+  
+  /**
+   * Utility class for wrapping a variable so it can be get and set from an anonymous inner class.
+   * @param <T> The type of object being wrapped.
+   */
+  private static class Wrapper<T> {
+    private T value = null;
+    
+    /** Set the wrapped value. */
+    public void set(T value) { this.value = value; }
+    
+    /** Get the wrapped value. */
+    public T get() { return this.value; }
+  }
 }

Modified: trunk/src/jar/query/java/org/mulgara/connection/Connection.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/connection/Connection.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/connection/Connection.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -24,8 +24,10 @@
 import org.mulgara.query.Query;
 import org.mulgara.query.QueryException;
 import org.mulgara.query.TuplesException;
+import org.mulgara.query.operation.Command;
+import org.mulgara.query.operation.Load;
 import org.mulgara.server.Session;
-import org.mulgara.query.operation.*;
+import org.mulgara.util.functional.Fn1E;
 
 /**
  * This interface abstracts connections to a server, holding any information relevant to that
@@ -116,6 +118,7 @@
 
   /**
    * @return the session
+   * @deprecated Clients should not access the session directly; use {@link #execute(SessionOp)} instead.
    */
   public Session getSession();
 
@@ -162,6 +165,18 @@
    * @throws Exception A general exception catchall
    */
   public String execute(Command cmd) throws Exception;
+  
+  /**
+   * Executes the given operation synchronously and atomically against the session underlying this connection.
+   * Note that the session passed to the operation may be <tt>null</tt> in the case of
+   * a dummy local connection.
+   * @param <T> The type of object returned by the operation.
+   * @param <E> The type of exception thrown by the operation.
+   * @param cmd The operation to be executed.
+   * @return The result of the operation.
+   * @throws E If an exception occurred executing the operation.
+   */
+  public <T,E extends Exception> T execute(SessionOp<T,E> cmd) throws E;
 
   /**
    * Loads data from a file or URL
@@ -183,5 +198,18 @@
    * @return A BooleanAnswer with the true/false result of the query.
    */
   public BooleanAnswer execute(AskQuery cmd) throws QueryException, TuplesException;
+  
+  /**
+   * Cancels the operation currently executing against this connection's session, if
+   * there is one. The operation will be canceled using the {@link Thread#interrupt()} method.
+   */
+  public void cancel();
 
+  /**
+   * An operation that can be executed against a session and throw an exception.
+   * @param <T> The result type of the operation.
+   * @param <E> The type of exception thrown by the operation.
+   */
+  public interface SessionOp<T,E extends Exception> extends Fn1E<Session,T,E> {}
+  
 }

Modified: trunk/src/jar/query/java/org/mulgara/connection/ConnectionFactoryUnitTest.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/connection/ConnectionFactoryUnitTest.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/connection/ConnectionFactoryUnitTest.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -45,6 +45,7 @@
  * @copyright &copy; 2008 <a href="http://www.revelytix.com">Revelytix, Inc.</a>
  * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
  */
+ at SuppressWarnings("deprecation") // Tests use the deprecated Connection.getSession() method.
 public class ConnectionFactoryUnitTest extends TestCase {
   
   /** Factory under testing */

Added: trunk/src/jar/query/java/org/mulgara/connection/ConnectionUnitTest.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/connection/ConnectionUnitTest.java	                        (rev 0)
+++ trunk/src/jar/query/java/org/mulgara/connection/ConnectionUnitTest.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -0,0 +1,298 @@
+/*
+ * Copyright 2010 Revelytix.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.mulgara.connection;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+import org.mulgara.connection.Connection.SessionOp;
+import org.mulgara.server.Session;
+
+/**
+ * Test the command execution methods of Connection interface.
+ */
+public class ConnectionUnitTest extends TestCase {
+  
+  protected static ThreadFactory threadFactory = new ThreadFactory() {
+    @Override
+    public Thread newThread(Runnable r) {
+      return new Thread(r);
+    }
+  };
+  
+  public ConnectionUnitTest(String name) {
+    super(name);
+  }
+
+  public static Test suite() {
+    TestSuite suite = new TestSuite();
+    suite.addTest(new ConnectionUnitTest("testExecute"));
+    suite.addTest(new ConnectionUnitTest("testExecuteProxy"));
+    suite.addTest(new ConnectionUnitTest("testException"));
+    suite.addTest(new ConnectionUnitTest("testExceptionProxy"));
+    suite.addTest(new ConnectionUnitTest("testLock"));
+    suite.addTest(new ConnectionUnitTest("testLockProxy"));
+    suite.addTest(new ConnectionUnitTest("testCancel"));
+    suite.addTest(new ConnectionUnitTest("testCancelProxy"));
+    return suite;
+  }
+  
+  public void testExecute() throws Exception {
+    doTestExecute(null);
+  }
+  
+  public void testExecuteProxy() throws Exception {
+    doTestExecute(threadFactory);
+  }
+  
+  public void testException() throws Exception {
+    doTestException(null);
+  }
+  
+  public void testExceptionProxy() throws Exception {
+    doTestException(threadFactory);
+  }
+  
+  public void testLock() throws Exception {
+    doTestLock(null);
+  }
+  
+  public void testLockProxy() throws Exception {
+    doTestLock(threadFactory);
+  }
+  
+  public void testCancel() throws Exception {
+    doTestCancel(null);
+  }
+  
+  public void testCancelProxy() throws Exception {
+    doTestCancel(threadFactory);
+  }
+  
+  protected void doTestExecute(ThreadFactory factory) throws Exception {
+    Connection conn = new DummyConnection(factory);
+    final int testValue = 123;
+    int value = conn.execute(new SessionOp<Integer,Exception>() {
+      @Override
+      public Integer fn(Session session) throws Exception {
+        return testValue;
+      }
+    });
+    assertEquals(testValue, value);
+    
+    final String exMsg = "Test Exception Message";
+    try {
+      conn.execute(new SessionOp<Object,Exception>() {
+        @Override
+        public Object fn(Session arg) throws Exception {
+          throw new Exception(exMsg);
+        }
+      });
+      fail("Should have thrown exception");
+    } catch (Exception ex) {
+      assertEquals(exMsg, ex.getMessage());
+    }
+  }
+  
+  protected void doTestException(ThreadFactory factory) throws Exception {
+    Connection conn = new DummyConnection(factory);
+    final Wrapper<Object> exception = new Wrapper<Object>();
+    
+    try {
+      conn.execute(new SessionOp<Object,ConnectionTestException>() {
+        @Override
+        public Object fn(Session arg) throws ConnectionTestException {
+          ConnectionTestException e = new ConnectionTestException("Test Exception Message");
+          exception.set(e);
+          throw e;
+        }
+      });
+      fail("Should have thrown exception");
+    } catch (ConnectionTestException e) {
+      assertTrue(e == exception.get());
+    }
+    
+    exception.set(null);
+    try {
+      conn.execute(new SessionOp<Object,ConnectionTestException>() {
+        @Override
+        public Object fn(Session arg) throws ConnectionTestException {
+          RuntimeException e = new RuntimeException("Test Exception Message");
+          exception.set(e);
+          throw e;
+        }
+      });
+      fail("Should have thrown exception");
+    } catch (RuntimeException e) {
+      assertTrue(e == exception.get());
+    }
+    
+    exception.set(null);
+    try {
+      conn.execute(new SessionOp<Object,ConnectionTestException>() {
+        @Override
+        public Object fn(Session arg) throws ConnectionTestException {
+          Error e = new Error("Test Exception Message");
+          exception.set(e);
+          throw e;
+        }
+      });
+      fail("Should have thrown exception");
+    } catch (Throwable th) {
+      assertTrue(th == exception.get());
+    }
+  }
+  
+  protected void doTestLock(ThreadFactory factory) throws Exception {
+    final Connection conn = new DummyConnection(factory);
+    final AtomicBoolean t1Started = new AtomicBoolean(false);
+    final AtomicBoolean t1Complete = new AtomicBoolean(false);
+    final AtomicBoolean t2Started = new AtomicBoolean(false);
+    final AtomicBoolean t1Error = new AtomicBoolean(false);
+    final AtomicBoolean t2Error = new AtomicBoolean(false);
+    
+    Runnable r1 = new Runnable() {
+      @Override
+      public void run() {
+        conn.execute(new SessionOp<Object,RuntimeException>() {
+          @Override
+          public Object fn(Session session) throws RuntimeException {
+            synchronized (t1Started) {
+              t1Started.set(true);
+              t1Started.notify();
+            }
+            safeSleep(5000);
+            if (t2Started.get()) t1Error.set(true);
+            t1Complete.set(true);
+            return null;
+          }
+        });
+      }
+    };
+    
+    Runnable r2 = new Runnable() {
+      @Override
+      public void run() {
+        synchronized (t1Started) {
+          while (!t1Started.get()) safeWait(t1Started);
+        }
+        conn.execute(new SessionOp<Object,RuntimeException>(){
+          @Override
+          public Object fn(Session session) throws RuntimeException {
+            if (!t1Complete.get()) t2Error.set(true);
+            return null;
+          }
+        });
+      }
+    };
+    
+    runAll(r1, r2);
+    
+    assertFalse("t2 started before t1 complete", t1Error.get() || t2Error.get());
+  }
+  
+  protected void doTestCancel(ThreadFactory factory) throws Exception {
+    final Connection conn = new DummyConnection(factory);
+    final AtomicBoolean interrupted = new AtomicBoolean(false);
+    
+    final Runnable r = new Runnable() {
+      @Override
+      public void run() {
+        boolean result = conn.execute(new SessionOp<Boolean,RuntimeException>() {
+          @Override
+          public Boolean fn(Session session) throws RuntimeException {
+            try {
+              Thread.sleep(5000);
+            } catch (InterruptedException e) {
+              return true;
+            }
+            return false;
+          }
+        });
+        interrupted.set(result);
+      }
+    };
+    
+    Thread t = new Thread(r);
+    t.start();
+    
+    safeSleep(1000);
+    conn.cancel();
+    
+    safeJoin(t);
+    assertTrue("thread should have been interrupted", interrupted.get());
+  }
+  
+  protected static void safeSleep(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      fail("Interrupted while sleeping");
+    }
+  }
+  
+  protected static void safeWait(Object obj) {
+    try {
+      obj.wait();
+    } catch (InterruptedException e) {
+      fail("Interrupted while waiting");
+    }
+  }
+  
+  protected static void safeJoin(Thread t) {
+    try {
+      t.join();
+    } catch (InterruptedException e) {
+      fail("Interrupted while joining");
+    }
+  }
+  
+  protected static void runAll(Runnable... ops) {
+    List<Thread> threads = new ArrayList<Thread>(ops.length);
+    for (Runnable r : ops) {
+      Thread t = new Thread(r);
+      t.start();
+      threads.add(t);
+    }
+    for (Thread t : threads) {
+      safeJoin(t);
+    }
+  }
+  
+  protected static class ConnectionTestException extends Exception {
+    private static final long serialVersionUID = 7763762484291936870L;
+
+    public ConnectionTestException() {
+      super();
+    }
+
+    public ConnectionTestException(String message) {
+      super(message);
+    }
+  }
+  
+  private static class Wrapper<T> {
+    private T value = null;
+    public void set(T value) { this.value = value; }
+    public T get() { return this.value; }
+  }
+}

Modified: trunk/src/jar/query/java/org/mulgara/connection/DummyConnection.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/connection/DummyConnection.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/connection/DummyConnection.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -17,6 +17,7 @@
 package org.mulgara.connection;
 
 import java.net.URI;
+import java.util.concurrent.ThreadFactory;
 
 import org.apache.log4j.Logger;
 import org.mulgara.query.QueryException;
@@ -45,10 +46,22 @@
    * Creates a new connection.
    */
   public DummyConnection() {
+    this(null);
   }
 
 
   /**
+   * Creates a new connection, using the given factory for proxy threads.
+   * This constructor is mostly for testing purposes; operations on a dummy connection
+   * are executed locally and don't need a proxy.
+   * @param threadFactory
+   */
+  public DummyConnection(ThreadFactory threadFactory) {
+    super(threadFactory);
+  }
+
+
+  /**
    * Give login credentials and security domain to a session.  This operation is ignored.
    * @param securityDomainUri The security domain for the login.
    * @param user The username.

Modified: trunk/src/jar/query/java/org/mulgara/connection/SessionConnection.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/connection/SessionConnection.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/connection/SessionConnection.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -75,6 +75,7 @@
    * @throws ConnectionException There was a problem establishing the details needed for a connection.
    */
   public SessionConnection(URI serverUri, boolean isRemote) throws ConnectionException {
+    super(null);
     setServerUri(serverUri, isRemote);
   }
 
@@ -104,6 +105,7 @@
    * @param serverUri The server URI, needed for re-caching the session with the factory
    */
   public SessionConnection(Session session, URI securityDomainUri, URI serverUri) {
+    super(null);
     if (session == null) throw new IllegalArgumentException("Cannot create a connection without a server.");
     setSession(session, securityDomainUri, serverUri);    
   }

Modified: trunk/src/jar/query/java/org/mulgara/query/AskQuery.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/AskQuery.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/AskQuery.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -20,6 +20,8 @@
 import java.util.List;
 
 import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
+import org.mulgara.server.Session;
 
 /**
  * A query type to indicate that the result should be boolean. A true result indicates
@@ -59,7 +61,13 @@
    * @return The answer to this query.  Closing is optional.
    */
   public BooleanAnswer execute(Connection conn) throws QueryException, TuplesException {
-    return new BooleanAnswer(conn.getSession().query(this));
+    boolean answer = conn.execute(new SessionOp<Boolean,QueryException>() {
+      @Override
+      public Boolean fn(Session session) throws QueryException {
+        return session.query(AskQuery.this);
+      }
+    });
+    return new BooleanAnswer(answer);
   }
 
 }

Modified: trunk/src/jar/query/java/org/mulgara/query/ConstructQuery.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/ConstructQuery.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/ConstructQuery.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -19,6 +19,8 @@
 import java.util.List;
 
 import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
+import org.mulgara.server.Session;
 
 /**
  * A query type similar to SELECT, though it may only contain results with multiples of
@@ -56,7 +58,12 @@
    */
   public GraphAnswer execute(Connection conn) throws QueryException, TuplesException {
     // pipe all the query types through the one Session method
-    GraphAnswer answer = (GraphAnswer)conn.getSession().query(this);
+    GraphAnswer answer = conn.execute(new SessionOp<GraphAnswer,QueryException>(){
+      @Override
+      public GraphAnswer fn(Session session) throws QueryException {
+        return session.query(ConstructQuery.this);
+      }
+    });
     if (answer == null) throw new QueryException("Invalid answer received");
     // move to the first row
     answer.beforeFirst();

Modified: trunk/src/jar/query/java/org/mulgara/query/Query.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/Query.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/Query.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -29,15 +29,22 @@
 package org.mulgara.query;
 
 // Java 2 standard packages;
-import java.io.*;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import java.net.URI;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
-// Third party packages
-
-import org.apache.log4j.*;
+import org.apache.log4j.Logger;
 import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
 import org.mulgara.query.operation.Command;
+import org.mulgara.server.Session;
 
 /**
  * An ITQL query. This is a data structure used as an argument to the
@@ -586,7 +593,12 @@
    */
   public Answer execute(Connection conn) throws QueryException, TuplesException {
     if (logger.isDebugEnabled()) logger.debug("Executing query " + toString());
-    Answer answer = conn.getSession().query(this);
+    Answer answer = conn.execute(new SessionOp<Answer,QueryException>() {
+      @Override
+      public Answer fn(Session session) throws QueryException {
+        return session.query(Query.this);
+      }
+    });
     if (answer == null) throw new QueryException("Invalid answer received");
     if (logger.isDebugEnabled()) logger.debug("Successfully executed query");
     // move to the first row

Added: trunk/src/jar/query/java/org/mulgara/query/operation/AbstractCommand.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/AbstractCommand.java	                        (rev 0)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/AbstractCommand.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2010 Revelytix.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.mulgara.query.operation;
+
+public abstract class AbstractCommand implements Command {
+
+  /** The text used to create this command. */
+  private String textualForm;
+  /** The message set by the result of this command. */
+  private String resultMessage = "";
+
+  /**
+   * Indicates that the command modifies the state in a transaction.
+   * @return <code>true</code> If the transaction state is to be modified.
+   */
+  public boolean isTxCommitRollback() {
+    return false;
+  }
+
+  /**
+   * Indicates that this command cannot return an Answer.
+   * @return <code>false</code> by default.
+   */
+  public boolean isAnswerable() {
+    return false;
+  }
+
+  /** @see org.mulgara.query.operation.Command#setText(java.lang.String) */
+  public void setText(String text) {
+    textualForm = text;
+  }
+
+  /**
+   * Returns the textual representation of this Command. Same as {@link #toString()}.
+   * @return The text of the command.
+   */
+  public String getText() {
+    return textualForm;
+  }
+
+  /**
+   * Returns the textual representation of this Command.
+   * @return The text of the command.
+   */
+  public String toString() {
+    return textualForm;
+  }
+
+  /**
+   * Sets message text relevant to the operation.  Useful for the UI.
+   * @return The set text.
+   */
+  public String setResultMessage(String resultMessage) {
+    return this.resultMessage = resultMessage;
+  }
+  
+  /**
+   * Gets a message text relevant to the operation.  Useful for the UI.
+   * @return A text message associated with the result of this
+   * operation.
+   */
+  public String getResultMessage() {
+    return resultMessage;
+  }
+
+}
\ No newline at end of file

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/ApplyRules.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/ApplyRules.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/ApplyRules.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -20,11 +20,13 @@
 import java.rmi.RemoteException;
 
 import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
 import org.mulgara.query.GraphExpression;
 import org.mulgara.query.GraphResource;
 import org.mulgara.query.QueryException;
 import org.mulgara.rules.InitializerException;
 import org.mulgara.rules.RulesRef;
+import org.mulgara.server.Session;
 
 /**
  * Represents a command to apply rules to a set of data.
@@ -97,7 +99,7 @@
    * @throws QueryException Unable to read the rules.
    * @throws RemoteException There was a connectivity problem with the server.
    */
-  public Object execute(Connection conn) throws RemoteException, QueryException, InitializerException {
+  public Object execute(Connection conn) throws QueryException, InitializerException {
     return execute(conn, conn);
   }
 
@@ -110,12 +112,33 @@
    * @throws QueryException Unable to read the rules.
    * @throws RemoteException There was a connectivity problem with the server.
    */
-  public Object execute(Connection conn, Connection ruleConn) throws RemoteException, QueryException, InitializerException {
+  public Object execute(Connection conn, Connection ruleConn) throws QueryException, InitializerException {
     if (conn == null) throw new IllegalArgumentException("Connection may not be null");
+    if (ruleConn == null) throw new IllegalArgumentException("Rule Connection may not be null");
     // get the structure from the rule model
-    RulesRef rules = ruleConn.getSession().buildRules(ruleGraph, baseGraph, destGraph);
+    final RulesRef rules;
+    try {
+      rules = ruleConn.execute(new SessionOp<RulesRef,Exception>() {
+        @Override
+        public RulesRef fn(Session session) throws Exception {
+          return session.buildRules(ruleGraph, baseGraph, destGraph);
+        }
+      });
+    } catch (QueryException qe) {
+      throw qe;
+    } catch (InitializerException ie) {
+      throw ie;
+    } catch (Exception e) { // shouldn't happen.
+      throw new QueryException("Unexpected exception building rules", e);
+    }
     // create apply the rules to the model
-    conn.getSession().applyRules(rules);
+    conn.execute(new SessionOp<Object,QueryException>() {
+      @Override
+      public Object fn(Session session) throws QueryException {
+        session.applyRules(rules);
+        return null;
+      }
+    });
     return setResultMessage("Successfully applied " + ruleGraph + " to " + baseGraph + (GraphResource.sameAs(baseGraph, destGraph) ? "" : " => " + destGraph));
   }
 

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/Backup.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/Backup.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/Backup.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -23,7 +23,9 @@
 
 import org.apache.log4j.Logger;
 import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
 import org.mulgara.query.QueryException;
+import org.mulgara.server.Session;
 
 /**
  * Represents a command to back data up from a server.
@@ -111,7 +113,7 @@
       if (isLocal()) {
         getMarshalledData(conn);
       } else {
-        conn.getSession().backup(dest);
+        doTx(conn, dest);
       }
       
       if (logger.isDebugEnabled()) logger.debug("Completed backing up " + src + " to " + dest);
@@ -140,12 +142,33 @@
   }
 
   
-  /**
-   * Perform the transfer with the configured data stream.
+  /* (non-Javadoc)
+   * @see org.mulgara.query.operation.DataOutputTx#getOp(java.io.OutputStream)
    */
   @Override
-  protected void doTx(Connection conn, OutputStream outputStream) throws QueryException {
-    conn.getSession().backup(outputStream);
+  protected SessionOp<Object,QueryException> getOp(final OutputStream outputStream) {
+    return new SessionOp<Object,QueryException>() {
+      @Override
+      public Object fn(Session session) throws QueryException {
+        session.backup(outputStream);
+        return null;
+      }
+    };
   }
 
+  
+  /* (non-Javadoc)
+   * @see org.mulgara.query.operation.DataOutputTx#getOp(java.net.URI)
+   */
+  @Override
+  protected SessionOp<Object,QueryException> getOp(final URI destUri) {
+    return new SessionOp<Object,QueryException>() {
+      @Override
+      public Object fn(Session session) throws QueryException {
+        session.backup(destUri);
+        return null;
+      }
+    };
+  }
+
 }

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/Commit.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/Commit.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/Commit.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -36,22 +36,26 @@
  * @copyright &copy; 2007 <a href="mailto:pgearon at users.sourceforge.net">Paul Gearon</a>
  * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
  */
-public class Commit extends TransactionCommand implements Command, TxOp {
+public class Commit extends TransactionCommand {
   
   /**
    * Commits the transaction on a connection.
    * @param conn Contains the session to commit. 
    * @throws QueryException There was a server error commiting the transaction.
    */
-  public Object execute(Connection conn) throws QueryException {
-    Session session = conn.getSession();
-    if (session != null) {
-      session.commit();
-      return setResultMessage("Successfully committed transaction");
-    } else {
-      assert conn instanceof org.mulgara.connection.DummyConnection;
-      return setResultMessage("Successfully committed transaction");
-    }
+  public TxExecutable getExecutable(final Connection conn) {
+    return new TxExecutable() {
+      @Override
+      public Object fn(Session session) throws QueryException {
+        if (session != null) {
+          session.commit();
+          return setResultMessage("Successfully committed transaction");
+        } else {
+          assert conn instanceof org.mulgara.connection.DummyConnection;
+          return setResultMessage("Skipped commit for internal connection");
+        }
+      }
+    };
   }
 
 
@@ -61,5 +65,4 @@
   public boolean stayInTx() {
     return true;
   }
-
 }

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/CreateGraph.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/CreateGraph.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/CreateGraph.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -19,8 +19,9 @@
 import java.net.URI;
 
 import org.apache.log4j.Logger;
-import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
 import org.mulgara.query.QueryException;
+import org.mulgara.server.Session;
 
 
 /**
@@ -30,7 +31,7 @@
  * @copyright &copy; 2007 <a href="mailto:pgearon at users.sourceforge.net">Paul Gearon</a>
  * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
  */
-public class CreateGraph extends ServerCommand {
+public class CreateGraph extends SessionCommand {
 
   /** The logger */
   private static final Logger logger = Logger.getLogger(CreateGraph.class.getName());
@@ -83,10 +84,15 @@
    * @param conn The connection to a session to create the graph in.
    * @return Text describing the outcome.
    */
-  public Object execute(Connection conn) throws QueryException {
-    if (logger.isDebugEnabled()) logger.debug("Creating new graph " + graphUri);
-    conn.getSession().createModel(graphUri, type);
-    return setResultMessage("Successfully created graph " + graphUri);
+  public SessionOp<String,QueryException> getExecutable() {
+    return new SessionOp<String,QueryException>() {
+      @Override
+      public String fn(Session session) throws QueryException {
+        if (logger.isDebugEnabled()) logger.debug("Creating new graph " + graphUri);
+        session.createModel(graphUri, type);
+        return setResultMessage("Successfully created graph " + graphUri);
+      }
+    };
   }
 
 }

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/DataInputTx.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/DataInputTx.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/DataInputTx.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -25,6 +25,7 @@
 import java.util.zip.ZipInputStream;
 
 import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
 import org.mulgara.query.QueryException;
 import org.mulgara.util.Rmi;
 
@@ -33,13 +34,14 @@
 
 /**
  * Represents a command to move data into a graph (Load) or server (Restore).
+ * @param SourceType The type of object that provides the input for the command.
  * 
  * @created Jun 27, 2008
  * @author Alex Hall
  * @copyright &copy; 2008 <a href="http://www.revelytix.com">Revelytix, Inc.</a>
  * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
  */
-public abstract class DataInputTx extends DataTx {
+public abstract class DataInputTx<SourceType> extends DataTx {
 
   /** String constant for the extension of gzip files. */
   private static final String GZIP_EXTENSION = ".gz";
@@ -84,10 +86,28 @@
    * Perform the input transfer with the configured datastream.
    * @return The number of statements affected, or <code>null</code> if this is not relevant.
    */
-  protected abstract Long doTx(Connection conn, InputStream inputStream) throws QueryException;
+  protected Long doTx(Connection conn, InputStream inputStream) throws QueryException {
+    return conn.execute(getExecutable(inputStream));
+  }
   
+  /**
+   * Perform the input transfer with the configured source.
+   */
+  protected Long doTx(Connection conn, SourceType src) throws QueryException {
+    return conn.execute(getExecutable(src));
+  }
   
   /**
+   * Get the operation that will transfer from the given source stream.
+   */
+  protected abstract SessionOp<Long,QueryException> getExecutable(InputStream inputStream);
+  
+  /**
+   * Get the operation that will transfer from the given source object.
+   */
+  protected abstract SessionOp<Long,QueryException> getExecutable(SourceType src);
+  
+  /**
    * Wrap the local source data (input stream or file URI) in an RMI object for marshalling, 
    * and send over the connection. Used by Load and Restore. Delegates to the {@link #doTx(Connection, InputStream)}
    * abstract method to send the data over the connection.

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/DataOutputTx.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/DataOutputTx.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/DataOutputTx.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -24,6 +24,7 @@
 import java.rmi.NoSuchObjectException;
 
 import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
 import org.mulgara.query.QueryException;
 import org.mulgara.util.Rmi;
 
@@ -82,10 +83,28 @@
   /**
    * Perform the output transfer with the configured datastream.
    */
-  protected abstract void doTx(Connection conn, OutputStream outputStream) throws QueryException;
+  protected void doTx(Connection conn, OutputStream outputStream) throws QueryException {
+    conn.execute(getOp(outputStream));
+  }
+  
+  /**
+   * Perform the output transfer to the configured destination URI.
+   */
+  protected void doTx(Connection conn, URI destUri) throws QueryException {
+    conn.execute(getOp(destUri));
+  }
 
+  /**
+   * Get the operation that will be trasnfer to the output stream.
+   */
+  protected abstract SessionOp<Object,QueryException> getOp(final OutputStream outputStream);
 
   /**
+   * Get the operation that will be transfer to the destination URI.
+   */
+  protected abstract SessionOp<Object,QueryException> getOp(final URI destUri);
+
+  /**
    * Wrap the local destination object data (output stream or file URI) in an RMI object for marshalling, 
    * and receive over the connection. Delegates to the {@link #doTx(Connection, OutputStream)}
    * abstract method to send the data over the connection.

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/Deletion.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/Deletion.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/Deletion.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -20,9 +20,10 @@
 import java.util.Set;
 
 import org.jrdf.graph.Triple;
-import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
 import org.mulgara.query.Query;
 import org.mulgara.query.QueryException;
+import org.mulgara.server.Session;
 
 /**
  * An AST element for deleting from a graph.
@@ -51,15 +52,19 @@
     super(graph, selectQuery);
   }
 
-  /**
-   * Performs the deletion.
-   * @param conn the session to delete the data from the graph in.
-   * @return Text describing the action.
+  /* (non-Javadoc)
+   * @see org.mulgara.query.operation.SessionCommand#getExecutable()
    */
-  public Object execute(Connection conn) throws QueryException {
-    if (isSelectBased()) conn.getSession().delete(graph, getSelectQuery());
-    else conn.getSession().delete(graph, getStatements());
-    return setResultMessage("Successfully deleted statements from " + graph);
+  @Override
+  protected SessionOp<String,QueryException> getExecutable() {
+    return new SessionOp<String,QueryException>() {
+      @Override
+      public String fn(Session session) throws QueryException {
+        if (isSelectBased()) session.delete(graph, getSelectQuery());
+        else session.delete(graph, getStatements());
+        return setResultMessage("Successfully deleted statements from " + graph);
+      }
+    };
   }
 
 }

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/DropGraph.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/DropGraph.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/DropGraph.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -18,8 +18,9 @@
 
 import java.net.URI;
 
-import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
 import org.mulgara.query.QueryException;
+import org.mulgara.server.Session;
 
 /**
  * Represents a command to drop a graph.
@@ -28,7 +29,7 @@
  * @copyright &copy; 2007 <a href="mailto:pgearon at users.sourceforge.net">Paul Gearon</a>
  * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
  */
-public class DropGraph extends ServerCommand {
+public class DropGraph extends SessionCommand {
 
   /** The URI for the graph. */
   private final URI graphUri;
@@ -46,14 +47,18 @@
     return graphUri;
   }
 
-  /**
-   * Performs the deletion.
-   * @param conn the session to delete the graph in.
-   * @return Text describing the action.
+  /* (non-Javadoc)
+   * @see org.mulgara.query.operation.SessionCommand#getExecutable()
    */
-  public Object execute(Connection conn) throws QueryException {
-    conn.getSession().removeModel(graphUri);
-    return setResultMessage("Successfully dropped graph " + graphUri);
+  @Override
+  protected SessionOp<String,QueryException> getExecutable() {
+    return new SessionOp<String,QueryException>() {
+      @Override
+      public String fn(Session session) throws QueryException {
+        session.removeModel(graphUri);
+        return setResultMessage("Successfully dropped graph " + graphUri);
+      }
+    };
   }
 
 }

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/Export.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/Export.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/Export.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -23,7 +23,9 @@
 import java.util.Map;
 
 import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
 import org.mulgara.query.QueryException;
+import org.mulgara.server.Session;
 
 /**
  * Represents a command to export data from a graph.
@@ -88,7 +90,7 @@
       if (isLocal()) {
         getMarshalledData(conn);
       } else {
-        conn.getSession().export(src, dest, namespacePrefixes);
+        doTx(conn, dest);
       } 
       
       if (logger.isDebugEnabled()) logger.debug("Completed backing up " + src + " to " + dest);
@@ -116,12 +118,33 @@
     export.execute(conn);
   }
 
-  /**
-   * Perform the transfer with the configured data stream.
+  /* (non-Javadoc)
+   * @see org.mulgara.query.operation.DataOutputTx#getOp(java.io.OutputStream)
    */
   @Override
-  protected void doTx(Connection conn, OutputStream outputStream) throws QueryException {
-    conn.getSession().export(getSource(), outputStream, namespacePrefixes);
+  protected SessionOp<Object,QueryException> getOp(final OutputStream outputStream) {
+    return new SessionOp<Object,QueryException>() {
+      @Override
+      public Object fn(Session session) throws QueryException {
+        session.export(getSource(), outputStream, namespacePrefixes);
+        return null;
+      }
+    };
   }
 
+  
+  /* (non-Javadoc)
+   * @see org.mulgara.query.operation.DataOutputTx#getOp(java.net.URI)
+   */
+  @Override
+  protected SessionOp<Object,QueryException> getOp(final URI destUri) {
+    return new SessionOp<Object,QueryException>() {
+      @Override
+      public Object fn(Session session) throws QueryException {
+        session.export(getSource(), destUri, namespacePrefixes);
+        return null;
+      }
+    };
+  }
+
 }

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/Insertion.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/Insertion.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/Insertion.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -21,9 +21,10 @@
 import java.util.Set;
 
 import org.jrdf.graph.Triple;
-import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
 import org.mulgara.query.Query;
 import org.mulgara.query.QueryException;
+import org.mulgara.server.Session;
 
 /**
  * An AST element for inserting into a graph.
@@ -61,15 +62,19 @@
     super(graph, selectQuery);
   }
 
-  /**
-   * Performs the insertion.
-   * @param conn the session for the graph to insert the data into.
-   * @return Text describing the action.
+  /* (non-Javadoc)
+   * @see org.mulgara.query.operation.SessionCommand#getExecutable()
    */
-  public Object execute(Connection conn) throws QueryException {
-    if (isSelectBased()) conn.getSession().insert(graph, getSelectQuery());
-    else conn.getSession().insert(graph, getStatements());
-    return setResultMessage("Successfully inserted statements into " + graph);
+  @Override
+  protected SessionOp<String,QueryException> getExecutable() {
+    return new SessionOp<String,QueryException>() {
+      @Override
+      public String fn(Session session) throws QueryException {
+        if (isSelectBased()) session.insert(graph, getSelectQuery());
+        else session.insert(graph, getStatements());
+        return setResultMessage("Successfully inserted statements into " + graph);
+      }
+    };
   }
 
 }

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/Load.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/Load.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/Load.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -25,9 +25,11 @@
 
 import org.apache.log4j.Logger;
 import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
 import org.mulgara.query.GraphResource;
 import org.mulgara.query.QueryException;
 import org.mulgara.query.rdf.Mulgara;
+import org.mulgara.server.Session;
 
 /**
  * Represents a command to load data into a model.
@@ -37,7 +39,7 @@
  * @copyright &copy; 2007 <a href="mailto:pgearon at users.sourceforge.net">Paul Gearon</a>
  * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
  */
-public class Load extends DataInputTx {
+public class Load extends DataInputTx<GraphResource> {
 
   /** The logger */
   static final Logger logger = Logger.getLogger(Load.class.getName());
@@ -106,7 +108,7 @@
     }
 
     try {
-      long stmtCount = isLocal() ? sendMarshalledData(conn, true) : conn.getSession().setModel(dest, srcRsc);
+      long stmtCount = isLocal() ? sendMarshalledData(conn, true) : doTx(conn, srcRsc);
       if (logger.isDebugEnabled()) logger.debug("Loaded " + stmtCount + " statements from " + src + " into " + dest);
   
       if (stmtCount > 0L) setResultMessage("Successfully loaded " + stmtCount + " statements from " + 
@@ -122,16 +124,34 @@
   }
 
 
-  /**
-   * Perform the transfer with the configured datastream.
-   * @return The number of statements affected, or <code>null</code> if this is not relevant.
+  /* (non-Javadoc)
+   * @see org.mulgara.query.operation.DataInputTx#getExecutable(java.io.InputStream)
    */
   @Override
-  protected Long doTx(Connection conn, InputStream inputStream) throws QueryException {
-    return conn.getSession().setModel(inputStream, getDestination(), srcRsc, contentType);
+  protected SessionOp<Long,QueryException> getExecutable(final InputStream inputStream) {
+    return new SessionOp<Long,QueryException>() {
+      @Override
+      public Long fn(Session session) throws QueryException {
+        return session.setModel(inputStream, getDestination(), srcRsc, contentType);
+      }
+    };
   }
 
 
+  /* (non-Javadoc)
+   * @see org.mulgara.query.operation.DataInputTx#getExecutable(java.net.URI)
+   */
+  @Override
+  protected SessionOp<Long,QueryException> getExecutable(final GraphResource src) {
+    return new SessionOp<Long,QueryException>() {
+      @Override
+      public Long fn(Session session) throws QueryException {
+        return session.setModel(getDestination(), src);
+      }
+    };
+  }
+
+
   /**
    * Get the text of the command, or generate a virtual command if no text was parsed.
    * @return The query that created this command, or a generated query if no query exists.

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/LocalCommand.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/LocalCommand.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/LocalCommand.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -26,23 +26,9 @@
  * @copyright &copy; 2007 <a href="mailto:pgearon at users.sourceforge.net">Paul Gearon</a>
  * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
  */
-public abstract class LocalCommand implements Command {
+public abstract class LocalCommand extends AbstractCommand {
 
-  /** The text used to create this command. */
-  private String textualForm;
-
-  /** The message set by the result of this command. */
-  private String resultMessage = "";
-
   /**
-   * Indicates that the command modifies the state in a transaction.
-   * @return <code>true</code> If the transaction state is to be modified.
-   */
-  public boolean isTxCommitRollback() {
-    return false;
-  }
-  
-  /**
    * Indicates that this operation is local.
    * @return Always <code>true</code> to indicate this command is local.
    */
@@ -60,15 +46,6 @@
 
 
   /**
-   * Indicates that this command cannot return an Answer.
-   * @return <code>false</code> by default.
-   */
-  public boolean isAnswerable() {
-    return false;
-  }
-
-
-  /**
    * Gets the associated server for a non-local operation.
    * @return <code>null</code>
    */
@@ -77,31 +54,7 @@
   }
 
 
-  /** @see org.mulgara.query.operation.Command#setText(java.lang.String) */
-  public void setText(String text) {
-    textualForm = text;
-  }
-
-
   /**
-   * Returns the textual representation of this Command. Same as {@link #toString()}.
-   * @return The text of the command.
-   */
-  public String getText() {
-    return textualForm;
-  }
-
-
-  /**
-   * Returns the textual representation of this Command.
-   * @return The text of the command.
-   */
-  public String toString() {
-    return textualForm;
-  }
-
-
-  /**
    * Executes the operation. This is highly specific to each operation.
    * @return Data specific to the operation.
    * @throws Exception specific to the operation.
@@ -111,21 +64,4 @@
   }
 
 
-  /**
-   * Gets a message text relevant to the operation.  Useful for the UI.
-   * @return A text message associated with the result of this
-   * operation.
-   */
-  public String getResultMessage() {
-    return resultMessage;
-  }
-
-
-  /**
-   * Sets message text relevant to the operation.  Useful for the UI.
-   * @return The set text.
-   */
-  public String setResultMessage(String resultMessage) {
-    return this.resultMessage = resultMessage;
-  }
 }

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/Modification.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/Modification.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/Modification.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -22,7 +22,7 @@
 import org.jrdf.graph.Triple;
 import org.mulgara.query.Query;
 
-public abstract class Modification extends ServerCommand {
+public abstract class Modification extends SessionCommand {
 
   /** The graph to insert into. */
   protected final URI graph;

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/Restore.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/Restore.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/Restore.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -21,7 +21,9 @@
 import java.net.URI;
 
 import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
 import org.mulgara.query.QueryException;
+import org.mulgara.server.Session;
 
 /**
  * Represents a command to restore a server from backup data.
@@ -31,7 +33,7 @@
  * @copyright &copy; 2007 <a href="mailto:pgearon at users.sourceforge.net">Paul Gearon</a>
  * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
  */
-public class Restore extends DataInputTx {
+public class Restore extends DataInputTx<URI> {
 
   /**
    * Creates a restore operation, restoring the server from backup data at the given location.
@@ -102,7 +104,7 @@
 
     try {
       if (isLocal()) sendMarshalledData(conn, false);
-      else conn.getSession().restore(src);
+      else doTx(conn, src);
 
       String message;
       if (dest == null) message = "Successfully restored from " + src;
@@ -118,14 +120,32 @@
     }
   }
 
-  /**
-   * Perform the transfer with the configured datastream.
-   * @return <code>0</code>, as this operation does not return a number.
+  /* (non-Javadoc)
+   * @see org.mulgara.query.operation.DataInputTx#getExecutable(java.io.InputStream)
    */
   @Override
-  protected Long doTx(Connection conn, InputStream inputStream) throws QueryException {
-    conn.getSession().restore(inputStream, getSource());
-    return 0L;
+  protected SessionOp<Long,QueryException> getExecutable(final InputStream inputStream) {
+    return new SessionOp<Long,QueryException>() {
+      @Override
+      public Long fn(Session session) throws QueryException {
+        session.restore(inputStream, getSource());
+        return 0L;
+      }
+    };
   }
 
+  /* (non-Javadoc)
+   * @see org.mulgara.query.operation.DataInputTx#getExecutable(java.lang.Object)
+   */
+  @Override
+  protected SessionOp<Long,QueryException> getExecutable(final URI srcUri) {
+    return new SessionOp<Long,QueryException>() {
+      @Override
+      public Long fn(Session session) throws QueryException {
+        session.restore(srcUri);
+        return 0L;
+      }
+    };
+  }
+
 }

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/Rollback.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/Rollback.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/Rollback.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -36,24 +36,27 @@
  * @copyright &copy; 2007 <a href="mailto:pgearon at users.sourceforge.net">Paul Gearon</a>
  * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
  */
-public class Rollback extends TransactionCommand implements Command, TxOp {
+public class Rollback extends TransactionCommand {
 
   /**
    * Commits the transaction on a connection.
    * @param conn Contains the session to commit. 
    * @throws QueryException There was a server error commiting the transaction.
    */
-  public Object execute(Connection conn) throws QueryException {
-    Session session = conn.getSession();
-    if (session != null) {
-      session.rollback();
-      conn.setAutoCommit(true);  // this is called because stayInTx returns false
-      return setResultMessage("Successfully rolled back changes");
-    } else {
-      assert conn instanceof org.mulgara.connection.DummyConnection;
-      conn.setAutoCommit(true);
-      return setResultMessage("Successfully rolled back changes");
-    }
+  public TxExecutable getExecutable(final Connection conn) {
+    return new TxExecutable() {
+      @Override
+      public Object fn(Session session) throws QueryException {
+        if (session != null) {
+          session.rollback();
+          conn.setAutoCommit(true);  // this is called because stayInTx returns false
+          return setResultMessage("Successfully rolled back changes");
+        } else {
+          assert conn instanceof org.mulgara.connection.DummyConnection;
+          return setResultMessage("Skipped rollback for internal connection");
+        }
+      }
+    };
   }
 
 

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/ServerCommand.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/ServerCommand.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/ServerCommand.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -30,24 +30,17 @@
  * @copyright &copy; 2007 <a href="mailto:pgearon at users.sourceforge.net">Paul Gearon</a>
  * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
  */
-public abstract class ServerCommand implements Command {
+public abstract class ServerCommand extends AbstractCommand {
 
-  /** The text used to create this command. */
-  private String textualForm = "";
-
   /** The graph being referred to on the server. */
   private final GraphResource serverGraph;
   
-  /** The message set by the result of this command. */
-  private String resultMessage;
-  
   /**
    * Creates a new command, with a principle graph URI.
    * @param serverGraphUri The URI of the graph.
    */
   public ServerCommand(URI serverGraphUri) {
     serverGraph = (serverGraphUri != null) ? new GraphResource(serverGraphUri) : null;
-    resultMessage = "";
   }
   
   
@@ -72,15 +65,6 @@
 
 
   /**
-   * Indicates that the command modifies the state in a transaction.
-   * @return <code>true</code> If the transaction state is to be modified.
-   */
-  public boolean isTxCommitRollback() {
-    return false;
-  }
-  
-
-  /**
    * Indicates that this operation is not local.
    * @return Always <code>false</code> to indicate this command is not local.
    */
@@ -97,48 +81,4 @@
   public boolean isUICommand() {
     return false;
   }
-
-
-  /**
-   * Gets a message text relevant to the operation.  Useful for the UI.
-   * @return A text message associated with the result of this
-   * operation.
-   */
-  public String getResultMessage() {
-    return resultMessage;
-  }
-
-
-  /**
-   * Indicates that this command returns an Answer. Saves the overhead of checking
-   * the return type of execute.
-   * @return <code>false</code> by default.
-   */
-  public boolean isAnswerable() {
-    return false;
-  }
-
-
-  /** @see org.mulgara.query.operation.Command#setText(java.lang.String) */
-  public void setText(String text) {
-    textualForm = text;
-  }
-
-
-  /**
-   * Returns the textual representation of this Command. Same as {@link #toString()}.
-   * @return The text of the command.
-   */
-  public String getText() {
-    return textualForm;
-  }
-
-
-  /**
-   * Sets message text relevant to the operation.  Useful for the UI.
-   * @return The set text.
-   */
-  public String setResultMessage(String resultMessage) {
-    return this.resultMessage = resultMessage;
-  }
 }

Added: trunk/src/jar/query/java/org/mulgara/query/operation/SessionCommand.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/SessionCommand.java	                        (rev 0)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/SessionCommand.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 Revelytix.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.mulgara.query.operation;
+
+import java.net.URI;
+
+import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
+import org.mulgara.query.QueryException;
+
+/**
+ * A common superclass for commands that perform an operation on a server session.
+ */
+public abstract class SessionCommand extends ServerCommand {
+
+	/**
+	 * Construct a command to operate on the given server graph URI.
+	 * @param serverGraphUri The principal graph URI involved in the operation on the server.
+	 */
+	public SessionCommand(URI serverGraphUri) {
+    super(serverGraphUri);
+  }
+
+  /* (non-Javadoc)
+   * @see org.mulgara.query.operation.Command#execute(org.mulgara.connection.Connection)
+   */
+  @Override
+	final public Object execute(Connection conn) throws QueryException {
+		return conn.execute(getExecutable());
+	}
+
+	/**
+	 * Gets the operation that will be performed on the server session.
+	 * @return The session operation that implements this command.
+	 */
+	protected abstract SessionOp<?, ? extends QueryException> getExecutable();
+}

Modified: trunk/src/jar/query/java/org/mulgara/query/operation/TransactionCommand.java
===================================================================
--- trunk/src/jar/query/java/org/mulgara/query/operation/TransactionCommand.java	2010-01-28 20:57:46 UTC (rev 1901)
+++ trunk/src/jar/query/java/org/mulgara/query/operation/TransactionCommand.java	2010-01-29 21:19:02 UTC (rev 1902)
@@ -18,6 +18,10 @@
 
 import java.net.URI;
 
+import org.mulgara.connection.Connection;
+import org.mulgara.connection.Connection.SessionOp;
+import org.mulgara.query.QueryException;
+
 /**
  * An AST element for controlling transactions.  These commands are considered
  * local, as they do not establish a new connection to a server.  However, if
@@ -64,4 +68,31 @@
     return null;
   }
 
+  /**
+   * Sets message text relevant to the operation.  Exposes this publicly, but only for internal use.
+   * @return The set text.
+   */
+  public String setResultMessage(String resultMessage) {
+    return super.setResultMessage(resultMessage);
+  }
+  
+  /* (non-Javadoc)
+   * @see org.mulgara.query.operation.Command#execute(org.mulgara.connection.Connection)
+   */
+  @Override
+  final public Object execute(Connection conn) throws QueryException {
+    return conn.execute(getExecutable(conn));
+  }
+
+  /**
+   * Gets the operation to perform on the given connection.
+   * @param conn The connection.
+   * @return The operation that implements this transactional command.
+   */
+  protected abstract TxExecutable getExecutable(Connection conn);
+  
+  /**
+   * Shorthand for an operation that returns an Object and throws a QueryException.
+   */
+  protected interface TxExecutable extends SessionOp<Object,QueryException> {}
 }




More information about the Mulgara-svn mailing list