[Mulgara-svn] r276 - in branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed: . remote

pag at mulgara.org pag at mulgara.org
Mon May 14 21:35:35 UTC 2007


Author: pag
Date: 2007-05-14 16:35:35 -0500 (Mon, 14 May 2007)
New Revision: 276

Added:
   branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/Config.java
   branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/RemotePager.java
   branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/RemotePagerImpl.java
   branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/SetProxy.java
   branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/TripleSetAdaptor.java
Modified:
   branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolverFactory.java
   branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/NetworkDelegator.java
   branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/ShortGlobalStatementSet.java
   branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/StatementSetFactory.java
Log:
Initial commit of remote paging mechanism for Statements.  Not tested, but compiles

Modified: branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolverFactory.java
===================================================================
--- branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolverFactory.java	2007-05-14 18:35:26 UTC (rev 275)
+++ branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/DistributedResolverFactory.java	2007-05-14 21:35:35 UTC (rev 276)
@@ -14,8 +14,6 @@
 package org.mulgara.resolver.distributed;
 
 // Java 2 standard packages
-import java.io.*;
-import java.net.*;
 import java.util.*;
 
 // Third party packages

Modified: branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/NetworkDelegator.java
===================================================================
--- branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/NetworkDelegator.java	2007-05-14 18:35:26 UTC (rev 275)
+++ branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/NetworkDelegator.java	2007-05-14 21:35:35 UTC (rev 276)
@@ -31,7 +31,7 @@
 import org.mulgara.server.NonRemoteSessionException;
 import org.mulgara.server.driver.SessionFactoryFinder;
 import org.mulgara.server.driver.SessionFactoryFinderException;
-import org.mulgara.resolver.distributed.rmi.StatementSetFactory;
+import org.mulgara.resolver.distributed.remote.StatementSetFactory;
 import org.mulgara.resolver.spi.GlobalizeException;
 import org.mulgara.resolver.spi.Resolution;
 import org.mulgara.resolver.spi.ResolverException;
@@ -192,7 +192,7 @@
             model
     );
 
-    // convert the variable set to a variable list
+    // convert the variable set to a variable list - add types via unchecked casts
     List<Variable> variables = new ArrayList<Variable>((Set<Variable>)globalConstraint.getVariables());
     // build the new query
     return new Query(variables, new ModelResource(model.getURI()), globalConstraint, null, Collections.EMPTY_LIST, null, 0, new UnconstrainedAnswer());

Added: branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/Config.java
===================================================================
--- branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/Config.java	2007-05-14 18:35:26 UTC (rev 275)
+++ branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/Config.java	2007-05-14 21:35:35 UTC (rev 276)
@@ -0,0 +1,68 @@
+/*
+ * The contents of this file are subject to the Open Software License
+ * Version 3.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.opensource.org/licenses/osl-3.0.txt
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ */
+
+package org.mulgara.resolver.distributed.remote;
+
+/**
+ * Defines a set of property names and default values for use with paged remote sets.
+ *
+ * @created 2007-04-23
+ * @author <a href="mailto:gearon at users.sourceforge.net">Paul Gearon</a>
+ * @copyright &copy; 2007 <a href="mailto:pgearon at users.sourceforge.net">Paul Gearon</a>
+ * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
+ */
+public class Config {
+
+  /** Default maximum number of pages that may be queued. */
+  private static final int DEFAULT_MAX_PAGES = 100;
+
+  /** Name of the Maximum Pages property. */
+  private static final String MAX_PAGES_PROPERTY = "pagedset.pages.max";
+
+  /** Default maximum time to wait for a page to arrive, in milliseconds. */
+  private static final long DEFAULT_TIMEOUT = 10000;
+
+  /** Name of the timeout property. */
+  private static final String TIMEOUT_PROPERTY = "pagedset.timeout";
+
+  /** Default number of entries in a page. */
+  private static final int DEFAULT_PAGE_SIZE = 64;
+
+  /** Name of the page size property. */
+  private static final String PAGE_SIZE_PROPERTY = "pagedset.pages.size";
+
+
+  /**
+   * Get the maximum number of pages to keep in memory at once.
+   * @return The maximum number of pages to keep.
+   */
+  public static int getMaxPages() {
+    return Integer.getInteger(MAX_PAGES_PROPERTY, DEFAULT_MAX_PAGES).intValue();
+  }
+
+  /**
+   * Get the maximum time to wait for a page to arrive, in milliseconds.
+   * @return The maximum timeout for a remote call.
+   */
+  public static long getTimeout() {
+    return Long.getLong(TIMEOUT_PROPERTY, DEFAULT_TIMEOUT).longValue();
+  }
+
+  /**
+   * Get the size of pages to transfer.
+   * @return The number of entries in a page.
+   */
+  public static int getPageSize() {
+    return Integer.getInteger(PAGE_SIZE_PROPERTY, DEFAULT_PAGE_SIZE).intValue();
+  }
+
+}

Added: branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/RemotePager.java
===================================================================
--- branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/RemotePager.java	2007-05-14 18:35:26 UTC (rev 275)
+++ branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/RemotePager.java	2007-05-14 21:35:35 UTC (rev 276)
@@ -0,0 +1,34 @@
+/*
+ * The contents of this file are subject to the Open Software License
+ * Version 3.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.opensource.org/licenses/osl-3.0.txt
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ */
+
+package org.mulgara.resolver.distributed.remote;
+
+import java.rmi.*;
+import java.io.Serializable;
+
+/**
+ * The interface for paging an iterable object over RMI.
+ * @param <E> The elements of the paged list.
+ *
+ * @created 2007-04-23
+ * @author <a href="mailto:gearon at users.sourceforge.net">Paul Gearon</a>
+ * @copyright &copy; 2007 <a href="mailto:pgearon at users.sourceforge.net">Paul Gearon</a>
+ * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
+ */
+public interface RemotePager<E extends Serializable> extends Remote {
+
+  public int size() throws RemoteException;
+
+  public E[] firstPage() throws RemoteException;
+
+  public E[] nextPage() throws RemoteException;
+}

Added: branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/RemotePagerImpl.java
===================================================================
--- branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/RemotePagerImpl.java	2007-05-14 18:35:26 UTC (rev 275)
+++ branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/RemotePagerImpl.java	2007-05-14 21:35:35 UTC (rev 276)
@@ -0,0 +1,126 @@
+/*
+ * The contents of this file are subject to the Open Software License
+ * Version 3.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.opensource.org/licenses/osl-3.0.txt
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ */
+
+package org.mulgara.resolver.distributed.remote;
+
+import java.util.*;
+import java.rmi.*;
+import java.lang.reflect.Array;
+import java.io.Serializable;
+import java.rmi.server.UnicastRemoteObject;
+import java.util.logging.*;
+
+/**
+ * Implements the remote pager by iterating on a list and moving pages of elements over RMI.
+ * @param <E> The elements of the paged list.
+ *
+ * @created 2007-04-23
+ * @author <a href="mailto:gearon at users.sourceforge.net">Paul Gearon</a>
+ * @copyright &copy; 2007 <a href="mailto:pgearon at users.sourceforge.net">Paul Gearon</a>
+ * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
+ */
+public class RemotePagerImpl<E extends Serializable> implements RemotePager<E> {
+
+  /** Logger. */
+  protected static final Logger logger = Logger.getLogger(RemotePagerImpl.class.getName());
+
+  /** The type of the wrapped class.  Used for creating pages of elements in arrays. */
+  private Class<E> type;
+
+  /** Stored size of the wrapped collection. */
+  private final int size;
+  
+  /** The objects to page over RMI. */
+  private Collection<E> collection;
+
+  /** Internal iterator for the collection. */
+  private Iterator<E> iter;
+
+  /** The latest page of data. */
+  private E[] currentPage;
+
+  /** The size of a data page. */
+  private final int pageSize = Config.getPageSize();
+
+  /**
+   * Creates a new remote paging object.
+   * @param type The java.lang.Class of the elements to be paged.
+   * @param collection The data to be paged.
+   * @throws RemoteException If the data cannot be sent over RMI.
+   */
+  @SuppressWarnings("unchecked")
+  public RemotePagerImpl(Class<E> type, Collection<E> collection) throws RemoteException {
+    this.type = type;
+    this.collection = collection;
+    size = collection.size();
+    iter = null;
+    currentPage = (E[])Array.newInstance(type, pageSize);
+    UnicastRemoteObject.exportObject(this);
+  }
+
+
+  /**
+   * Gets the number of items in the underlying data.
+   */
+  public int size() {
+    return size;
+  }
+
+  
+  /**
+   * Gets the first page of data as an array with length equal to the size of the page.
+   * @return an array of elements.
+   */
+  public E[] firstPage() throws RemoteException {
+    iter = collection.iterator();
+    return fillPage();
+  }
+
+  
+  /**
+   * Gets the next page of data as an array with length equal to the size of the page.
+   * @return an array of elements.
+   */
+  public E[] nextPage() throws RemoteException {
+    return fillPage();
+  }
+
+  
+  /**
+   * Populates the current page with elements from the underlying collection.
+   * @return The current page.
+   */
+  private E[] fillPage() {
+    logger.finest("Filling page");
+    for (int i = 0; i < pageSize; i++) {
+      if (!iter.hasNext()) return truncatePage(i);
+      currentPage[i] = iter.next();
+    }
+    return currentPage;
+  }
+
+  
+  /**
+   * Reduces the size of an array if there are fewer valid elements than the length of the array.
+   * @param The size of the array.
+   * @return A new current page.
+   */
+  @SuppressWarnings("unchecked")
+  private E[] truncatePage(int offset) {
+    if (offset == 0) return null;
+    logger.finest("Building array of type: " + type +", with length: " + offset);
+    E[] result = (E[])Array.newInstance(type, offset);
+    System.arraycopy(currentPage, 0, result, 0, offset);
+    return result;
+  }
+
+}

Added: branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/SetProxy.java
===================================================================
--- branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/SetProxy.java	2007-05-14 18:35:26 UTC (rev 275)
+++ branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/SetProxy.java	2007-05-14 21:35:35 UTC (rev 276)
@@ -0,0 +1,299 @@
+package org.mulgara.resolver.distributed.remote;
+
+import java.util.*;
+import java.util.logging.*;
+import java.rmi.RemoteException;
+import java.io.Serializable;
+
+/**
+ * Represents an iterable object on a remote system as a local Set.
+ * Created at the server side, and sent across the network.
+ */
+public class SetProxy<E extends Serializable> extends AbstractSet<E> implements Serializable {
+
+  /** Serial ID for versioning. */
+  private static final long serialVersionUID = -8343698708605937025L;
+
+  /** Logger. */
+  protected static final Logger logger = Logger.getLogger(RemotePagerImpl.class.getName());
+
+  /** Stores the currently running iterator. */
+  private static Object currentIterator = null;
+
+  /** A pager for returning sequential pages of a remote collection. */
+  private final RemotePager<E> remotePager;
+
+  /** The size of the remote collection. */
+  private final int cachedSize;
+
+  /**
+   * Creates a new proxy for a remote collection, meeting the Set interface.
+   * @param remotePager A device for sending data from the remote collection one page at a time.
+   */
+  public SetProxy(RemotePager<E> remotePager) {
+    this.remotePager = remotePager;
+    // local call for size
+    try {
+      cachedSize = remotePager.size();
+    } catch (RemoteException re) {
+      throw new IllegalStateException("The proxy should be instantiated on the host side");
+    }
+  }
+
+
+  /**
+   * Returns the number of elements in the underlying collection.
+   * @return The size of the collection.
+   */
+  public int size() {
+    return cachedSize;
+  }
+
+
+  /**
+   * Returns an iterator which will access all the remote data.
+   * NOTE: The current implementation allows only one iterator to be active at a time!
+   * @return A new iterator for the remote data.
+   */
+  public Iterator<E> iterator() {
+    return new PagedIterator();
+  }
+
+
+  /**
+   * An iterator class for traversing remote data.  Network activity is reduced by moving
+   * data in large pages at a time.
+   */
+  private class PagedIterator implements Iterator<E> {
+
+    /** A thread for managing bringing the pages over the network. */
+    private Pager pager;
+
+    /** The most recent page of data. */
+    private E[] currentPage;
+
+    /** The current position in the current page of data. */
+    int index;
+
+    /**
+     * Create a new iterator for traversing pages of data.
+     */
+    public PagedIterator() {
+      currentPage = null;
+      index = 0;
+      currentIterator = this;
+      logger.info("Starting pager");
+      pager = new Pager();
+      currentPage = pager.nextPage();
+      logger.info("Started pager");
+    }
+
+    
+    /**
+     * Remove the current element from the data.  Unsupported.
+     */
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+
+    /**
+     * Queries the data to check if more data exists.  Should not need to block.
+     * @return <code>true</code> if more data exists.
+     * @throws ConcurrentModificationException If more than one iterator is active.
+     */
+    public boolean hasNext() {
+      logger.info("called SetProxy$Iterator.hasNext()");
+      testState();
+      if (currentPage != null && index < currentPage.length) return true;
+      return currentPage != null;
+    }
+
+
+    /**
+     * Returns the next element of the data.  Will block until data is available.
+     * @return The next item of data in sequence.
+     * @throws ConcurrentModificationException If more than one iterator is active.
+     */
+    public E next() {
+      logger.info("called SetProxy$Iterator.next()");
+      testState();
+      logger.info("Accessing element " + index + " of " + currentPage.length);
+
+      if (currentPage != null && index < currentPage.length) return nextPageElement();
+      return nextPageElement();
+    }
+
+
+    /**
+     * Gets the next element out of the current page.
+     * @return The next element from the current page.
+     */
+    private E nextPageElement() {
+      logger.info("Getting next page element");
+      E element = currentPage[index++];
+      if (index == currentPage.length) updatePage();
+      return element;
+    }
+
+
+    /**
+     * Moves to the next page, if another page is available.
+     */
+    private void updatePage() {
+      logger.info("Moving to next page");
+      currentPage = pager.nextPage();
+      index = 0;
+    }
+
+
+    /**
+     * Check that this is the only iterator being accessed at the moment.
+     * @throws ConcurrentModificationException If this iterator is being accessed
+     *         after a new iterator has been created.
+     */
+    private void testState() {
+      if (currentIterator != this) {
+        throw new ConcurrentModificationException("Unable to use more than one remote iterator on the set");
+      }
+    }
+
+
+    /**
+     * Private thread for getting the next page in the background.
+     */
+    private class Pager extends Thread {
+
+      /** The maximum number of pages that may be queued. */
+      private final int maxPages = Config.getMaxPages();
+
+      /** Maximum time to wait for a page to arrive, in milliseconds. */
+      private final long timeout = Config.getTimeout();
+
+      /** Indicates that the thread has finished. */
+      private boolean complete;
+
+      /** The retrieved pages. */
+      private Queue<E[]> retrievedPages;
+
+      /** Stores exception when one occurs. */
+      private PagerException lastException;
+
+      /**
+       * Initialize and start the thread.
+       * Main thread.
+       */
+      public Pager() {
+        lastException = null;
+        retrievedPages = new LinkedList<E[]>();
+        try {
+          logger.info("Getting first page");
+          E[] page = remotePager.firstPage();
+          if (page != null) {
+            logger.info("Got data in first page: size=" + page.length);
+            retrievedPages.add(page);
+            complete = false;
+            start();
+          } else logger.info("Empty initial page");
+        } catch (RemoteException re) {
+          throw new PagerException("Unable to get the first page", re);
+        }
+      }
+
+
+      /**
+       * Checks if the thread is active.  Main thread.
+       * @return <code>false</code> if the thread is still running, <code>true</code> when complete.
+       */
+      public boolean isComplete() {
+        if (lastException != null) throw lastException;
+        return complete;
+      }
+
+
+      /**
+       * Pick up all the pages.
+       * Runs in the background Paging thread.
+       */
+      public void run() {
+        try {
+          while (true) {
+            synchronized (retrievedPages) {
+              while (retrievedPages.size() >= maxPages) {
+                try {
+                  logger.info("Waiting for queue to empty.  Currently at: " + retrievedPages.size());
+                  retrievedPages.wait();
+                } catch (InterruptedException ie) { }
+              }
+            }
+            E[] page = remotePager.nextPage();
+            if (page == null) {
+              logger.info("Got final page");
+              break;
+            }
+            logger.info("Got next page.  size=" + page.length);
+            synchronized (retrievedPages) {
+              retrievedPages.add(page);
+              logger.info("Queue now at " + retrievedPages.size() + " pages");
+            }
+            synchronized (this) {
+              this.notify();
+            }
+          }
+        } catch (RemoteException re) {
+          logger.severe("Error retrieving remote data: " + re.getMessage());
+          lastException = new PagerException("Unable to retrieve page", re);
+        }
+        complete = true;
+      }
+
+
+      /**
+       * Get the next page, if available.  The page will be an array of the configured length,
+       * or shorter if it is the last page.  If there is no more data, then <code>null</code>
+       * will be returned.
+       * Runs in the Main thread.
+       * @return The next page of data, or <code>null</code> if no more data exists.
+       */
+      public E[] nextPage() {
+        logger.info("Request for next page");
+        if (lastException != null) throw lastException;
+        E[] page;
+        long startTime = System.currentTimeMillis();
+        while (true) {
+          synchronized (retrievedPages) {
+            int oldSize = retrievedPages.size();
+            logger.info("Queue has " + oldSize + " pages");
+            page = retrievedPages.poll();
+            if (oldSize >= maxPages) retrievedPages.notify();
+            logger.info("page @" + page);
+          }
+          long waitTime = timeout + startTime - System.currentTimeMillis();
+          if (waitTime <= 0) throw new PagerException("Timed out waiting for page");
+          try {
+            synchronized (this) {
+              if (page == null && !complete) {
+                logger.info("Waiting for more pages to arrive");
+                this.wait(waitTime);
+              } else break;
+            }
+          } catch (InterruptedException ie) { }
+          if (System.currentTimeMillis() - startTime >= timeout) throw new PagerException("Timed out waiting for page");
+        }
+        logger.info("Returning page = " + page);
+        return page;
+      }
+    }
+  }
+
+
+  /** Exception class for paging.  Must be runtime so it can be thrown through Set interface. */
+  @SuppressWarnings("serial")
+  public static class PagerException extends RuntimeException {
+    public PagerException() { }
+    public PagerException(String message) { super(message); }
+    public PagerException(String message, RemoteException cause) { super(message, cause); }
+    public PagerException(RemoteException cause) { super(cause); }
+  }
+}
+

Modified: branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/ShortGlobalStatementSet.java
===================================================================
--- branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/ShortGlobalStatementSet.java	2007-05-14 18:35:26 UTC (rev 275)
+++ branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/ShortGlobalStatementSet.java	2007-05-14 21:35:35 UTC (rev 276)
@@ -1,3 +1,15 @@
+/*
+ * The contents of this file are subject to the Open Software License
+ * Version 3.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.opensource.org/licenses/osl-3.0.txt
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ */
+
 package org.mulgara.resolver.distributed.remote;
 
 import java.io.Serializable;

Modified: branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/StatementSetFactory.java
===================================================================
--- branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/StatementSetFactory.java	2007-05-14 18:35:26 UTC (rev 275)
+++ branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/StatementSetFactory.java	2007-05-14 21:35:35 UTC (rev 276)
@@ -13,8 +13,10 @@
 
 package org.mulgara.resolver.distributed.remote;
 
+import java.rmi.RemoteException;
 import java.util.Set;
 
+import org.jrdf.graph.Triple;
 import org.mulgara.query.TuplesException;
 import org.mulgara.resolver.spi.GlobalizeException;
 import org.mulgara.resolver.spi.ResolverSession;
@@ -37,7 +39,11 @@
     // make sure the WATER_MARK refers to a set that is indexable by integer
     assert (long)(int)WATER_MARK == WATER_MARK;
     if (statements.getRowUpperBound() < WATER_MARK) return new ShortGlobalStatementSet(statements, session);
-    // TODO return a remote set
-    return null;
+    try {
+      RemotePager<Triple> pager = new RemotePagerImpl<Triple>(Triple.class, new TripleSetAdaptor(statements, session));
+      return new SetProxy<Triple>(pager);
+    } catch (RemoteException re) {
+      throw new TuplesException("Error accessing remote data", re);
+    }
   }
 }

Added: branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/TripleSetAdaptor.java
===================================================================
--- branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/TripleSetAdaptor.java	2007-05-14 18:35:26 UTC (rev 275)
+++ branches/blank_nodes/src/jar/resolver-distributed/java/org/mulgara/resolver/distributed/remote/TripleSetAdaptor.java	2007-05-14 21:35:35 UTC (rev 276)
@@ -0,0 +1,116 @@
+/*
+ * The contents of this file are subject to the Open Software License
+ * Version 3.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.opensource.org/licenses/osl-3.0.txt
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ */
+
+package org.mulgara.resolver.distributed.remote;
+
+import java.util.AbstractSet;
+import java.util.Iterator;
+
+import org.jrdf.graph.ObjectNode;
+import org.jrdf.graph.PredicateNode;
+import org.jrdf.graph.SubjectNode;
+import org.jrdf.graph.Triple;
+import org.mulgara.query.TuplesException;
+import org.mulgara.query.rdf.TripleImpl;
+import org.mulgara.resolver.spi.GlobalizeException;
+import org.mulgara.resolver.spi.ResolverSession;
+import org.mulgara.resolver.spi.Statements;
+
+/**
+ * Converts a StatementSet into a List of Triple.
+ *
+ * @created 2007-05-14
+ * @author <a href="mailto:pgearon at users.sourceforge.net">Paul Gearon</a>
+ * @copyright &copy; 2007 <a href="mailto:pgearon at users.sourceforge.net">Paul Gearon</a>
+ * @licence <a href="{@docRoot}/../../LICENCE.txt">Open Software License v3.0</a>
+ */
+public class TripleSetAdaptor extends AbstractSet<Triple> {
+
+  /** The underlying set of statements. */
+  private Statements statements;
+  
+  /** A ResolverSession for use in globalizing nodes. */
+  private ResolverSession session;
+  
+  /**
+   * Builds a new iterable List of Triple, based on a Statements.
+   * @param statements The statements to adapt to a List.
+   */
+  public TripleSetAdaptor(Statements statements, ResolverSession session) {
+    this.statements = statements;
+    this.session = session;
+  }
+  
+  
+  @Override
+  public int size() {
+    try {
+      long size = statements.getRowCount();
+      return (long)(int)size == size ? (int)size : Integer.MAX_VALUE;
+    } catch (TuplesException te) {
+      throw new RuntimeException("Unable to get result size", te);
+    }
+  }
+
+
+  @Override
+  public Iterator<Triple> iterator() {
+    return new TripleCursorIterator(statements);
+  }
+  
+  
+  /**
+   * An iterator class for Cursor objects.
+   */
+  public class TripleCursorIterator implements Iterator<Triple> {
+    
+    /** The cursor to iterate on. */
+    private Statements s;
+
+    /** A flag to indicate if there is more data. */
+    private boolean hasNext;
+    
+    TripleCursorIterator(Statements s) {
+      this.s = s;
+      try {
+        hasNext = s.getRowCardinality() != 0;
+      } catch (TuplesException te) {
+        throw new RuntimeException("Unable to access result size", te);
+      }
+    }
+
+    public boolean hasNext() {
+      return hasNext;
+    }
+
+    public Triple next() {
+      try {
+        hasNext = s.next();
+        return new TripleImpl(
+            (SubjectNode)session.globalize(statements.getSubject()),
+            (PredicateNode)session.globalize(statements.getPredicate()),
+            (ObjectNode)session.globalize(statements.getObject())
+        );
+      } catch (TuplesException te) {
+        throw new RuntimeException("Unable to access result data", te);
+      } catch (GlobalizeException ge) {
+        throw new RuntimeException("Unable to globalize result data", ge);
+      }
+    }
+
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+  
+
+}




More information about the Mulgara-svn mailing list