[Mulgara-svn] r1960 - in trunk/src/jar/util/java/org/mulgara/util: . io

pag at mulgara.org pag at mulgara.org
Fri Jul 2 05:12:58 UTC 2010


Author: pag
Date: 2010-07-02 05:12:58 +0000 (Fri, 02 Jul 2010)
New Revision: 1960

Added:
   trunk/src/jar/util/java/org/mulgara/util/io/
   trunk/src/jar/util/java/org/mulgara/util/io/LBufferedFile.java
   trunk/src/jar/util/java/org/mulgara/util/io/LIOBufferedFile.java
   trunk/src/jar/util/java/org/mulgara/util/io/LMappedBufferedFile.java
   trunk/src/jar/util/java/org/mulgara/util/io/LReadOnlyIOBufferedFile.java
Log:
Initial code for reading large files. Not yet tested

Added: trunk/src/jar/util/java/org/mulgara/util/io/LBufferedFile.java
===================================================================
--- trunk/src/jar/util/java/org/mulgara/util/io/LBufferedFile.java	                        (rev 0)
+++ trunk/src/jar/util/java/org/mulgara/util/io/LBufferedFile.java	2010-07-02 05:12:58 UTC (rev 1960)
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2010 Paul Gearon
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.mulgara.util.io;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.log4j.Logger;
+
+/**
+ * An abstraction for reading and writing to files through {@link ByteBuffer}s.
+ * Subclasses supply the IO strategy; this class holds the file handle, reads the
+ * system-property configuration, and provides the buffer allocation helpers.
+ * @author Paul Gearon
+ *
+ */
+public abstract class LBufferedFile {
+
+  private static final Logger logger = Logger.getLogger(LBufferedFile.class);
+
+  /** The property for the block type. May be "direct" or "heap" (case insensitive). */
+  public static final String MEM_TYPE_PROP = "mulgara.xa.memoryType";
+
+  /** The property for the IO type. May be "mapped" or "explicit" (case insensitive). */
+  public static final String IO_TYPE_PROP = "mulgara.xa.forceIOType";
+
+  /** Enumeration of the different memory types for blocks */
+  enum BlockMemoryType { DIRECT, HEAP }
+
+  /** Enumeration of the different IO mechanisms for accessing a file */
+  enum IOType { MAPPED, EXPLICIT }
+
+  /** Native ordering of the bytes. A true constant: shared and never reassigned. */
+  static final ByteOrder NATIVE_ORDER = ByteOrder.nativeOrder();
+
+  /** The default value to use for the block memory type. Used when nothing is configured. */
+  private static final BlockMemoryType DEFAULT = BlockMemoryType.DIRECT;
+
+  /** The default value to use for the IO type of read only files. Used when nothing is configured. */
+  private static final IOType DEFAULT_IO_TYPE = IOType.MAPPED;
+
+  /** The configured type of block type to use. */
+  private static final BlockMemoryType BLOCK_TYPE;
+
+  static {
+    // Resolve the memory type once; an unrecognised value is reported and replaced by the default.
+    String defBlockType = System.getProperty(MEM_TYPE_PROP, DEFAULT.name());
+    if (defBlockType.equalsIgnoreCase(BlockMemoryType.DIRECT.name())) {
+      BLOCK_TYPE = BlockMemoryType.DIRECT;
+    } else if (defBlockType.equalsIgnoreCase(BlockMemoryType.HEAP.name())) {
+      BLOCK_TYPE = BlockMemoryType.HEAP;
+    } else {
+      logger.warn("Invalid value for property " + MEM_TYPE_PROP + ": " + defBlockType);
+      BLOCK_TYPE = DEFAULT;
+    }
+  }
+
+
+  /** The file being accessed */
+  protected RandomAccessFile file;
+
+  /**
+   * Creates new buffered access for a file.
+   * @param file the file to provide buffered access for.
+   */
+  LBufferedFile(RandomAccessFile file) {
+    this.file = file;
+  }
+
+  /**
+   * Create read only buffered file access. This will be a mapped file, unless overridden
+   * through the {@link #IO_TYPE_PROP} system property. An unrecognised property value is
+   * logged as a warning and the default (mapped) is used.
+   * @param file The file to access.
+   * @return The file for buffered access.
+   * @throws IOException If there is an error accessing the file.
+   */
+  public static LBufferedFile newReadOnlyFile(RandomAccessFile file) throws IOException {
+    String ioTypeProp = System.getProperty(IO_TYPE_PROP, DEFAULT_IO_TYPE.name());
+
+    IOType ioType = DEFAULT_IO_TYPE;
+    if (ioTypeProp.equalsIgnoreCase(IOType.MAPPED.name())) {
+      ioType = IOType.MAPPED;
+    } else if (ioTypeProp.equalsIgnoreCase(IOType.EXPLICIT.name())) {
+      ioType = IOType.EXPLICIT;
+    } else {
+      logger.warn("Invalid value for property " + IO_TYPE_PROP + ": " + ioTypeProp);
+    }
+
+    if (ioType == IOType.MAPPED) return new LMappedBufferedFile(file);
+    else return new LReadOnlyIOBufferedFile(file);
+  }
+
+  /**
+   * Reads a buffer from a file, at a given position.
+   * @param offset The offset to get the buffer from.
+   * @param length The required size of the buffer.
+   * @return The buffer that was read.
+   * @throws IOException If there is an error reading the file.
+   */
+  public abstract ByteBuffer read(long offset, int length) throws IOException;
+
+  /**
+   * Create a buffer that will be used for writing to a file at a given location.
+   * @param offset The location in the file that the buffer will write to.
+   * @param length The size of the buffer.
+   * @return The buffer for accessing that part of the file.
+   * @throws IOException If there is an error accessing the file.
+   */
+  public abstract ByteBuffer allocate(long offset, int length) throws IOException;
+
+  /**
+   * Puts the contents of a buffer into the file. This will go back to wherever
+   * it was supposed to go.
+   * @param data The buffer to be written.
+   * @throws IOException If there is an error writing to the file.
+   */
+  public abstract void write(ByteBuffer data) throws IOException;
+
+  /**
+   * Moves the current file position.
+   * @param offset The new position in the file.
+   * @throws IOException If there is an error moving the file position.
+   */
+  public abstract void seek(long offset) throws IOException;
+
+  /**
+   * Ensures that all data written to this file is forced to disk.
+   * Only the file content is forced, not its metadata.
+   * @throws IOException If there is an IO error accessing the disk.
+   */
+  public void force() throws IOException {
+    file.getChannel().force(false);
+  }
+
+  /**
+   * Allocates data according to the system configured memory model,
+   * using the native byte order.
+   * @param size The number of bytes in the allocated buffer.
+   * @return the allocated buffer.
+   */
+  protected ByteBuffer allocate(int size) {
+    return allocate(size, NATIVE_ORDER);
+  }
+
+  /**
+   * Allocates data according to the system configured memory model.
+   * @param size The number of bytes in the allocated buffer.
+   * @param order The byteorder to use in the buffer.
+   * @return the allocated buffer.
+   */
+  protected ByteBuffer allocate(int size, ByteOrder order) {
+    ByteBuffer buffer = (BLOCK_TYPE == BlockMemoryType.DIRECT)
+        ? ByteBuffer.allocateDirect(size)
+        : ByteBuffer.allocate(size);
+    return buffer.order(order);
+  }
+}

Added: trunk/src/jar/util/java/org/mulgara/util/io/LIOBufferedFile.java
===================================================================
--- trunk/src/jar/util/java/org/mulgara/util/io/LIOBufferedFile.java	                        (rev 0)
+++ trunk/src/jar/util/java/org/mulgara/util/io/LIOBufferedFile.java	2010-07-02 05:12:58 UTC (rev 1960)
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2010 Paul Gearon
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.mulgara.util.io;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.WeakHashMap;
+
+/**
+ * Object for reading and writing to files using ByteBuffers and standard IO. 
+ * @author pag
+ */
+public class LIOBufferedFile extends LBufferedFile {
+
+  private WeakHashMap<SystemBuffer,Long> offsets = new WeakHashMap<SystemBuffer,Long>();
+
+  public LIOBufferedFile(RandomAccessFile file) {
+    super(file);
+  }
+
+  @Override
+  public ByteBuffer read(long offset, int length) throws IOException {
+    ByteBuffer data = allocate(length);
+    synchronized (file) {
+      file.seek(offset);
+      file.readFully(data.array());
+    }
+    return data;
+  }
+
+  @Override
+  public ByteBuffer allocate(long offset, int length) {
+    ByteBuffer data = allocate(length);
+    offsets.put(new SystemBuffer(data), offset);
+    return data;
+  }
+
+  @Override
+  public void write(ByteBuffer data) throws IOException {
+    synchronized (file) {
+      Long offset = offsets.get(new SystemBuffer(data));
+      // if the offset is unknown, then the caller must have done a seek()
+      if (offset != null) file.seek(offset);
+      file.write(data.array());
+    }
+  }
+
+  @Override
+  public void seek(long offset) throws IOException {
+    file.seek(offset);
+  }
+
+  /**
+   * A wrapper class for allowing ByteBuffers to be mapped by == instead of their internal method.
+   */
+  class SystemBuffer {
+    ByteBuffer buffer;
+    public SystemBuffer(ByteBuffer b) { buffer = b; }
+    public boolean equals(Object o) { return buffer == ((SystemBuffer)o).buffer; }
+    public int hashCode() { return System.identityHashCode(buffer); }
+  }
+}

Added: trunk/src/jar/util/java/org/mulgara/util/io/LMappedBufferedFile.java
===================================================================
--- trunk/src/jar/util/java/org/mulgara/util/io/LMappedBufferedFile.java	                        (rev 0)
+++ trunk/src/jar/util/java/org/mulgara/util/io/LMappedBufferedFile.java	2010-07-02 05:12:58 UTC (rev 1960)
@@ -0,0 +1,108 @@
+package org.mulgara.util.io;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * A memory mapped read-only version of LBufferedFile.
+ * The file is mapped in pages of {@link #PAGE_SIZE} bytes, and is remapped when a read
+ * goes beyond the region that is currently mapped.
+ * @author pag
+ */
+public class LMappedBufferedFile extends LBufferedFile {
+
+  /** The size of a page to be mapped */
+  private static final int PAGE_SIZE = 33554432; // 32 MB
+
+  /** The channel to the file being accessed */
+  private FileChannel fc;
+
+  /**
+   * All the pages of mapped buffers. The array is never modified once it is assigned here;
+   * remapping replaces it with a new array.
+   */
+  private volatile MappedByteBuffer[] buffers;
+
+  /**
+   * Create a new buffered file for Long seeks.
+   * @param file The file to buffer.
+   * @throws IOException There was an error setting up initial mapping of the file.
+   */
+  public LMappedBufferedFile(RandomAccessFile file) throws IOException {
+    super(file);
+    fc = file.getChannel();
+    mapFile();
+  }
+
+  /**
+   * Reads data from the mapped file. When the requested range lies within a single page
+   * the result is a read-only view of the mapping, otherwise the data is copied into a
+   * newly allocated buffer. Either way the result is positioned at 0, has
+   * <var>length</var> bytes remaining, and uses the native byte order.
+   * @param offset The file position to start reading from.
+   * @param length The number of bytes to read.
+   * @return A buffer containing the requested data.
+   * @throws IOException If the file could not be mapped, or the requested range extends
+   *         beyond the end of the file.
+   * @throws IllegalArgumentException If <var>offset</var> or <var>length</var> is negative.
+   */
+  @Override
+  public ByteBuffer read(long offset, int length) throws IOException {
+    if (offset < 0 || length < 0) {
+      throw new IllegalArgumentException("Invalid read of " + length + " bytes at offset " + offset);
+    }
+    if (length == 0) return allocate(0);
+
+    long end = offset + length;
+    MappedByteBuffer[] pages = buffers;
+    if (end > mappedSize(pages)) {
+      // the file may have grown since it was last mapped
+      mapFile();
+      pages = buffers;
+      if (end > mappedSize(pages)) {
+        throw new java.io.EOFException("Unable to read " + length + " bytes at offset " + offset);
+      }
+    }
+
+    int page = (int)(offset / PAGE_SIZE);
+    int pageOffset = (int)(offset % PAGE_SIZE);
+
+    if (length <= PAGE_SIZE - pageOffset) {
+      // entirely inside one page: hand out a view of the mapping
+      ByteBuffer view = pages[page].asReadOnlyBuffer();
+      view.limit(pageOffset + length);
+      view.position(pageOffset);
+      // views do not inherit a byte order, so set the one used by allocated buffers
+      return view.slice().order(NATIVE_ORDER);
+    }
+
+    // spans page boundaries: gather the pieces into a single buffer
+    ByteBuffer data = allocate(length);
+    int from = pageOffset;
+    while (data.hasRemaining()) {
+      ByteBuffer src = pages[page++].asReadOnlyBuffer();
+      src.position(from);
+      src.limit(from + Math.min(src.remaining(), data.remaining()));
+      data.put(src);
+      from = 0;
+    }
+    data.rewind();
+    return data;
+  }
+
+  /**
+   * Since this file is read only, this is the same as {@link #read(long, int)}.
+   * @param offset The file position of the buffer.
+   * @param length The size of the buffer.
+   * @return A buffer containing the data at the given position.
+   * @throws IOException If the file could not be read.
+   */
+  @Override
+  public ByteBuffer allocate(long offset, int length) throws IOException {
+    return read(offset, length);
+  }
+
+  @Override
+  public void write(ByteBuffer data) throws IOException {
+    // no-op, even if read/write
+  }
+
+  @Override
+  public void seek(long offset) throws IOException {
+    // no-op
+  }
+
+  /**
+   * Calculates the number of bytes covered by a set of mapped pages.
+   * @param pages The mapped pages. Every page except the last must be a full page.
+   * @return The total number of bytes mapped.
+   */
+  private static long mappedSize(MappedByteBuffer[] pages) {
+    if (pages == null || pages.length == 0) return 0L;
+    int last = pages.length - 1;
+    return (long)last * PAGE_SIZE + pages[last].capacity();
+  }
+
+  /**
+   * Map the entire file. Full pages that are already mapped are reused; a partial page
+   * at the end of the previous mapping is always mapped again.
+   * @throws IOException If there is an error mapping the file
+   */
+  synchronized void mapFile() throws IOException {
+    long size = fc.size();
+    // get all the pages, including a possible partial page
+    int pages = (int)((size + PAGE_SIZE - 1) / PAGE_SIZE);
+    // get all the full pages. Either pages or pages-1
+    int fullPages = (int)(size / PAGE_SIZE);
+
+    // create a new buffers array, with all the original buffers in it
+    // except any partial pages
+    MappedByteBuffer[] current = buffers;
+    MappedByteBuffer[] newBuffers = new MappedByteBuffer[pages];
+    int reused = 0;
+    if (current != null) {
+      reused = current.length;
+      if (reused > 0 && current[reused - 1].capacity() < PAGE_SIZE) reused--;
+      // never reuse more than the file now holds
+      reused = Math.min(reused, fullPages);
+      System.arraycopy(current, 0, newBuffers, 0, reused);
+    }
+
+    // fill in the rest of the new array
+    for (int page = reused; page < fullPages; page++) {
+      newBuffers[page] = fc.map(FileChannel.MapMode.READ_ONLY, (long)page * PAGE_SIZE, PAGE_SIZE);
+    }
+    // if there's a partial page at the end, then map it
+    if (fullPages < pages) {
+      newBuffers[fullPages] = fc.map(FileChannel.MapMode.READ_ONLY, (long)fullPages * PAGE_SIZE, size % PAGE_SIZE);
+    }
+
+    buffers = newBuffers;
+  }
+}

Added: trunk/src/jar/util/java/org/mulgara/util/io/LReadOnlyIOBufferedFile.java
===================================================================
--- trunk/src/jar/util/java/org/mulgara/util/io/LReadOnlyIOBufferedFile.java	                        (rev 0)
+++ trunk/src/jar/util/java/org/mulgara/util/io/LReadOnlyIOBufferedFile.java	2010-07-02 05:12:58 UTC (rev 1960)
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2010 Paul Gearon
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.mulgara.util.io;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+
+/**
+ * Object for reading files using ByteBuffers and standard IO.
+ * Writing is not supported: {@link #allocate(long, int)} and {@link #write(ByteBuffer)}
+ * throw {@link UnsupportedOperationException}.
+ * @author pag
+ */
+public class LReadOnlyIOBufferedFile extends LBufferedFile {
+
+  /**
+   * Creates buffered read access to a file.
+   * @param file The file to access.
+   */
+  public LReadOnlyIOBufferedFile(RandomAccessFile file) {
+    super(file);
+  }
+
+  /**
+   * Reads <var>length</var> bytes starting at <var>offset</var> into a newly allocated buffer.
+   * The data is transferred through the file's channel, so it works for both heap and
+   * direct buffers. The returned buffer is positioned at 0.
+   * @param offset The file position to start reading from.
+   * @param length The number of bytes to read.
+   * @return A buffer filled with the requested data.
+   * @throws IOException If the file could not be read, or it ended before
+   *         <var>length</var> bytes were available.
+   */
+  @Override
+  public ByteBuffer read(long offset, int length) throws IOException {
+    ByteBuffer data = allocate(length);
+    synchronized (file) {
+      // the channel shares its position with the RandomAccessFile
+      file.seek(offset);
+      java.nio.channels.FileChannel channel = file.getChannel();
+      while (data.hasRemaining()) {
+        if (channel.read(data) < 0) {
+          throw new java.io.EOFException("Unable to read " + length + " bytes at offset " + offset);
+        }
+      }
+    }
+    data.rewind();
+    return data;
+  }
+
+  /**
+   * Not supported, as this file is read only.
+   * @throws UnsupportedOperationException always.
+   */
+  @Override
+  public ByteBuffer allocate(long offset, int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Not supported, as this file is read only.
+   * @throws UnsupportedOperationException always.
+   */
+  @Override
+  public void write(ByteBuffer data) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Moves the current file position.
+   * @param offset The new position in the file.
+   * @throws IOException If the position could not be changed.
+   */
+  @Override
+  public void seek(long offset) throws IOException {
+    synchronized (file) {
+      file.seek(offset);
+    }
+  }
+
+}



More information about the Mulgara-svn mailing list