[Mulgara-svn] r1960 - in trunk/src/jar/util/java/org/mulgara/util: . io
pag at mulgara.org
pag at mulgara.org
Fri Jul 2 05:12:58 UTC 2010
Author: pag
Date: 2010-07-02 05:12:58 +0000 (Fri, 02 Jul 2010)
New Revision: 1960
Added:
trunk/src/jar/util/java/org/mulgara/util/io/
trunk/src/jar/util/java/org/mulgara/util/io/LBufferedFile.java
trunk/src/jar/util/java/org/mulgara/util/io/LIOBufferedFile.java
trunk/src/jar/util/java/org/mulgara/util/io/LMappedBufferedFile.java
trunk/src/jar/util/java/org/mulgara/util/io/LReadOnlyIOBufferedFile.java
Log:
Initial code for reading large files. Not yet tested
Added: trunk/src/jar/util/java/org/mulgara/util/io/LBufferedFile.java
===================================================================
--- trunk/src/jar/util/java/org/mulgara/util/io/LBufferedFile.java (rev 0)
+++ trunk/src/jar/util/java/org/mulgara/util/io/LBufferedFile.java 2010-07-02 05:12:58 UTC (rev 1960)
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2010 Paul Gearon
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.mulgara.util.io;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.log4j.Logger;
+
+/**
+ * An abstraction for reading and writing to files through {@link ByteBuffer}s.
+ * Subclasses choose the IO mechanism; this class selects the memory type used
+ * for the buffers it allocates.
+ * @author Paul Gearon
+ */
+public abstract class LBufferedFile {
+
+  private static final Logger logger = Logger.getLogger(LBufferedFile.class);
+
+  /** The property for the block type. May be "direct" or "heap" (case insensitive). */
+  public static final String MEM_TYPE_PROP = "mulgara.xa.memoryType";
+
+  /** The property for the IO type. May be "mapped" or "explicit" (case insensitive). */
+  public static final String IO_TYPE_PROP = "mulgara.xa.forceIOType";
+
+  /** Enumeration of the different memory types for blocks */
+  enum BlockMemoryType { DIRECT, HEAP }
+
+  /** Enumeration of the different IO mechanisms for accessing a file */
+  enum IOType { MAPPED, EXPLICIT }
+
+  /** Native ordering of the bytes. This never changes, so it is held as a constant. */
+  static final ByteOrder NATIVE_ORDER = ByteOrder.nativeOrder();
+
+  /** The default value to use for the block memory type. Used when nothing is configured. */
+  private static final BlockMemoryType DEFAULT = BlockMemoryType.DIRECT;
+
+  /** The configured type of block type to use. */
+  private static final BlockMemoryType BLOCK_TYPE;
+
+  static {
+    // resolve the memory type once; unrecognised values are logged and fall back to the default
+    String defBlockType = System.getProperty(MEM_TYPE_PROP, DEFAULT.name());
+    if (defBlockType.equalsIgnoreCase(BlockMemoryType.DIRECT.name())) {
+      BLOCK_TYPE = BlockMemoryType.DIRECT;
+    } else if (defBlockType.equalsIgnoreCase(BlockMemoryType.HEAP.name())) {
+      BLOCK_TYPE = BlockMemoryType.HEAP;
+    } else {
+      logger.warn("Invalid value for property " + MEM_TYPE_PROP + ": " + defBlockType);
+      BLOCK_TYPE = DEFAULT;
+    }
+  }
+
+  /** The file being accessed */
+  protected RandomAccessFile file;
+
+  /**
+   * Creates new buffered access for a file.
+   * @param file the file to provide buffered access for.
+   */
+  LBufferedFile(RandomAccessFile file) {
+    this.file = file;
+  }
+
+  /**
+   * Create read only buffered file access. This will be a mapped file, unless
+   * overridden by setting {@link #IO_TYPE_PROP} to "explicit". Unrecognised
+   * values are logged and treated as "mapped".
+   * @param file The file to access.
+   * @return The file for buffered access.
+   * @throws IOException If there is an error accessing the file.
+   */
+  public static LBufferedFile newReadOnlyFile(RandomAccessFile file) throws IOException {
+    // use the declared constant rather than repeating the property name
+    String ioTypeProp = System.getProperty(IO_TYPE_PROP, IOType.MAPPED.name());
+
+    IOType ioType = IOType.MAPPED;
+    if (ioTypeProp.equalsIgnoreCase(IOType.EXPLICIT.name())) {
+      ioType = IOType.EXPLICIT;
+    } else if (!ioTypeProp.equalsIgnoreCase(IOType.MAPPED.name())) {
+      logger.warn("Invalid value for property " + IO_TYPE_PROP + ": " + ioTypeProp);
+    }
+
+    if (ioType == IOType.MAPPED) return new LMappedBufferedFile(file);
+    return new LReadOnlyIOBufferedFile(file);
+  }
+
+  /**
+   * Reads a buffer from a file, at a given position.
+   * @param offset The offset to get the buffer from.
+   * @param length The required size of the buffer.
+   * @return The buffer that was read.
+   * @throws IOException If there is an error reading the file.
+   */
+  public abstract ByteBuffer read(long offset, int length) throws IOException;
+
+  /**
+   * Create a buffer that will be used for writing to a file at a given location.
+   * @param offset The location in the file that the buffer will write to.
+   * @param length The size of the buffer.
+   * @return The buffer for accessing that part of the file.
+   * @throws IOException If there is an error accessing the file.
+   */
+  public abstract ByteBuffer allocate(long offset, int length) throws IOException;
+
+  /**
+   * Puts the contents of a buffer into the file. This will go back to wherever
+   * it was supposed to go.
+   * @param data The buffer to be written.
+   * @throws IOException If there is an error writing to the file.
+   */
+  public abstract void write(ByteBuffer data) throws IOException;
+
+  /**
+   * Moves the current file position.
+   * @param offset The new position in the file.
+   * @throws IOException If there is an error moving the file position.
+   */
+  public abstract void seek(long offset) throws IOException;
+
+  /**
+   * Ensures that all data written to this file is forced to disk.
+   * @throws IOException If there is an IO error accessing the disk.
+   */
+  public void force() throws IOException {
+    // false: only the file content is forced, not metadata updates
+    file.getChannel().force(false);
+  }
+
+  /**
+   * Allocates data according to the system configured memory model.
+   * The buffer uses the native byte order of the platform.
+   * @param size The number of bytes in the allocated buffer.
+   * @return the allocated buffer.
+   */
+  protected ByteBuffer allocate(int size) {
+    return allocate(size, NATIVE_ORDER);
+  }
+
+  /**
+   * Allocates data according to the system configured memory model.
+   * @param size The number of bytes in the allocated buffer.
+   * @param order The byteorder to use in the buffer.
+   * @return the allocated buffer.
+   */
+  protected ByteBuffer allocate(int size, ByteOrder order) {
+    ByteBuffer buffer = BLOCK_TYPE == BlockMemoryType.DIRECT ?
+        ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+    return buffer.order(order);
+  }
+}
Added: trunk/src/jar/util/java/org/mulgara/util/io/LIOBufferedFile.java
===================================================================
--- trunk/src/jar/util/java/org/mulgara/util/io/LIOBufferedFile.java (rev 0)
+++ trunk/src/jar/util/java/org/mulgara/util/io/LIOBufferedFile.java 2010-07-02 05:12:58 UTC (rev 1960)
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2010 Paul Gearon
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.mulgara.util.io;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.WeakHashMap;
+
+/**
+ * Object for reading and writing to files using ByteBuffers and standard IO.
+ * @author pag
+ */
+public class LIOBufferedFile extends LBufferedFile {
+
+ private WeakHashMap<SystemBuffer,Long> offsets = new WeakHashMap<SystemBuffer,Long>();
+
+ public LIOBufferedFile(RandomAccessFile file) {
+ super(file);
+ }
+
+ @Override
+ public ByteBuffer read(long offset, int length) throws IOException {
+ ByteBuffer data = allocate(length);
+ synchronized (file) {
+ file.seek(offset);
+ file.readFully(data.array());
+ }
+ return data;
+ }
+
+ @Override
+ public ByteBuffer allocate(long offset, int length) {
+ ByteBuffer data = allocate(length);
+ offsets.put(new SystemBuffer(data), offset);
+ return data;
+ }
+
+ @Override
+ public void write(ByteBuffer data) throws IOException {
+ synchronized (file) {
+ Long offset = offsets.get(new SystemBuffer(data));
+ // if the offset is unknown, then the caller must have done a seek()
+ if (offset != null) file.seek(offset);
+ file.write(data.array());
+ }
+ }
+
+ @Override
+ public void seek(long offset) throws IOException {
+ file.seek(offset);
+ }
+
+ /**
+ * A wrapper class for allowing ByteBuffers to be mapped by == instead of their internal method.
+ */
+ class SystemBuffer {
+ ByteBuffer buffer;
+ public SystemBuffer(ByteBuffer b) { buffer = b; }
+ public boolean equals(Object o) { return buffer == ((SystemBuffer)o).buffer; }
+ public int hashCode() { return System.identityHashCode(buffer); }
+ }
+}
Added: trunk/src/jar/util/java/org/mulgara/util/io/LMappedBufferedFile.java
===================================================================
--- trunk/src/jar/util/java/org/mulgara/util/io/LMappedBufferedFile.java (rev 0)
+++ trunk/src/jar/util/java/org/mulgara/util/io/LMappedBufferedFile.java 2010-07-02 05:12:58 UTC (rev 1960)
@@ -0,0 +1,108 @@
+package org.mulgara.util.io;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+/**
+ * A memory mapped read-only version of LBufferedFile. The file is mapped in
+ * fixed size pages, so that files larger than a single mapping can be
+ * addressed with long offsets.
+ * @author pag
+ */
+public class LMappedBufferedFile extends LBufferedFile {
+
+  /** The size of a page to be mapped */
+  private static final int PAGE_SIZE = 33554432; // 32 MB
+
+  /** The channel to the file being accessed */
+  private final FileChannel fc;
+
+  /** All the pages of mapped buffers. Replaced as a whole by mapFile(), never edited in place. */
+  private volatile MappedByteBuffer[] buffers;
+
+  /**
+   * Create a new buffered file for Long seeks.
+   * @param file The file to buffer.
+   * @throws IOException There was an error setting up initial mapping of the file.
+   */
+  public LMappedBufferedFile(RandomAccessFile file) throws IOException {
+    super(file);
+    fc = file.getChannel();
+    mapFile();
+  }
+
+  /**
+   * Reads a region of the file, remapping first if the region lies beyond what is
+   * currently mapped. A region inside one page is returned as a read-only view of
+   * the mapping; a region crossing page boundaries is copied into a new buffer.
+   * @param offset The offset in the file to read from.
+   * @param length The number of bytes to read.
+   * @return A buffer holding <var>length</var> bytes, positioned at its start.
+   * @throws IOException If the file cannot be remapped, or the region extends past the end of the file.
+   */
+  @Override
+  public ByteBuffer read(long offset, int length) throws IOException {
+    if (offset < 0 || length < 0) {
+      throw new IllegalArgumentException("Invalid region: offset=" + offset + ", length=" + length);
+    }
+    long end = offset + length;
+    // work on one snapshot of the page array, as mapFile() may swap it at any time
+    MappedByteBuffer[] pages = buffers;
+    if (end > mappedSize(pages)) {
+      // the file may have grown since it was last mapped
+      mapFile();
+      pages = buffers;
+      if (end > mappedSize(pages)) {
+        throw new java.io.EOFException("Unable to read " + length + " bytes at offset " + offset);
+      }
+    }
+    // an empty read at the very end of the file has no page to take a view of
+    if (length == 0) return ByteBuffer.allocate(0).asReadOnlyBuffer();
+
+    int page = (int)(offset / PAGE_SIZE);
+    int pageOffset = (int)(offset % PAGE_SIZE);
+    if ((long)pageOffset + length <= PAGE_SIZE) {
+      // NOTE(review): this view has ByteBuffer's default byte order, while the copy
+      // below gets its order from allocate(int) - confirm which one callers expect.
+      ByteBuffer view = pages[page].asReadOnlyBuffer();
+      // the limit is an absolute index within the page, not a length
+      view.limit(pageOffset + length);
+      view.position(pageOffset);
+      return view.slice();
+    }
+
+    // the region spans pages: gather it with bulk puts, which also work for direct buffers
+    ByteBuffer data = allocate(length);
+    while (data.hasRemaining()) {
+      ByteBuffer src = pages[page++].asReadOnlyBuffer();
+      src.position(pageOffset);
+      src.limit(pageOffset + Math.min(data.remaining(), src.remaining()));
+      data.put(src);
+      pageOffset = 0;
+    }
+    data.rewind();
+    return data;
+  }
+
+  /**
+   * Returns the current content of the region. Nothing can be written back
+   * through this class, so this is the same as {@link #read(long, int)}.
+   */
+  @Override
+  public ByteBuffer allocate(long offset, int length) throws IOException {
+    return read(offset, length);
+  }
+
+  @Override
+  public void write(ByteBuffer data) throws IOException {
+    // no-op, even if read/write
+  }
+
+  @Override
+  public void seek(long offset) throws IOException {
+    // no-op
+  }
+
+  /**
+   * Calculates the number of bytes covered by an array of pages. Every page but
+   * the last is a full page.
+   * @param pages The pages to measure. May be <code>null</code> or empty.
+   * @return The number of mapped bytes.
+   */
+  private static long mappedSize(MappedByteBuffer[] pages) {
+    if (pages == null || pages.length == 0) return 0L;
+    int last = pages.length - 1;
+    return (long)last * PAGE_SIZE + pages[last].capacity();
+  }
+
+  /**
+   * Map the entire file. Full pages that are already mapped are reused; a
+   * partial final page is always mapped again, since the file may have grown.
+   * @throws IOException If there is an error mapping the file
+   */
+  synchronized void mapFile() throws IOException {
+    long size = fc.size();
+    // get all the pages, including a possible partial page
+    int pages = (int)((size + PAGE_SIZE - 1) / PAGE_SIZE);
+    // get all the full pages. Either pages or pages-1
+    int fullPages = (int)(size / PAGE_SIZE);
+
+    // build a new array, leaving the current one intact for concurrent readers
+    MappedByteBuffer[] newBuffers = new MappedByteBuffer[pages];
+    MappedByteBuffer[] old = buffers;
+    int reusable = 0;
+    if (old != null && old.length > 0) {
+      reusable = old.length;
+      // a partial top page is stale once the file grows, so it must be mapped again
+      if (old[reusable - 1].capacity() < PAGE_SIZE) reusable--;
+      // never carry over more than the file currently holds
+      reusable = Math.min(reusable, fullPages);
+      System.arraycopy(old, 0, newBuffers, 0, reusable);
+    }
+
+    // fill in the rest of the new array. Positions are calculated as longs to go beyond 2GB
+    for (int page = reusable; page < fullPages; page++) {
+      newBuffers[page] = fc.map(FileChannel.MapMode.READ_ONLY, (long)page * PAGE_SIZE, PAGE_SIZE);
+    }
+    // if there's a partial page at the end, then map it
+    if (fullPages < pages) {
+      newBuffers[fullPages] = fc.map(FileChannel.MapMode.READ_ONLY, (long)fullPages * PAGE_SIZE, size % PAGE_SIZE);
+    }
+
+    buffers = newBuffers;
+  }
+}
Added: trunk/src/jar/util/java/org/mulgara/util/io/LReadOnlyIOBufferedFile.java
===================================================================
--- trunk/src/jar/util/java/org/mulgara/util/io/LReadOnlyIOBufferedFile.java (rev 0)
+++ trunk/src/jar/util/java/org/mulgara/util/io/LReadOnlyIOBufferedFile.java 2010-07-02 05:12:58 UTC (rev 1960)
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2010 Paul Gearon
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.mulgara.util.io;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+
+/**
+ * Object for read-only access to files using ByteBuffers and standard IO.
+ * The operations that would modify the file are unsupported.
+ * @author pag
+ */
+public class LReadOnlyIOBufferedFile extends LBufferedFile {
+
+  /**
+   * Creates read-only explicit IO access for a file.
+   * @param file The file to read from.
+   */
+  public LReadOnlyIOBufferedFile(RandomAccessFile file) {
+    super(file);
+  }
+
+  /**
+   * Reads <var>length</var> bytes starting at <var>offset</var> into a new buffer.
+   * @param offset The position in the file to read from.
+   * @param length The number of bytes to read.
+   * @return The filled buffer, positioned at its start.
+   * @throws IOException If the read fails, or the file ends before the buffer is full.
+   */
+  @Override
+  public ByteBuffer read(long offset, int length) throws IOException {
+    ByteBuffer data = allocate(length);
+    synchronized (file) {
+      file.seek(offset);
+      // read through the channel: the buffer may be direct, and direct buffers have no array()
+      while (data.hasRemaining()) {
+        if (file.getChannel().read(data) < 0) {
+          throw new java.io.EOFException("Unable to read " + length + " bytes at offset " + offset);
+        }
+      }
+    }
+    data.rewind();
+    return data;
+  }
+
+  /**
+   * Not supported, as this file is read-only.
+   * @throws UnsupportedOperationException always.
+   */
+  @Override
+  public ByteBuffer allocate(long offset, int length) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Not supported, as this file is read-only.
+   * @throws UnsupportedOperationException always.
+   */
+  @Override
+  public void write(ByteBuffer data) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Moves the current file position.
+   * @param offset The new position in the file.
+   * @throws IOException If the position cannot be changed.
+   */
+  @Override
+  public void seek(long offset) throws IOException {
+    // take the same lock as read() so a seek cannot land between its seek and transfer
+    synchronized (file) {
+      file.seek(offset);
+    }
+  }
+
+}
More information about the Mulgara-svn
mailing list