[ARROW-499]: [Java] Update file serialization to use the streaming serialization format.

This removes the old serialization code and unifies the code paths. This also changes
the serialization format to match the alignment requirements from the previous file
format.
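To make the format change concrete, here is a minimal sketch (not part of the commit; the package path in the import and the numbers are illustrative assumptions) of the new footer bookkeeping: each Block entry now records just where a record batch message starts and how many bytes it spans, instead of separate metadata and body lengths.

import org.apache.arrow.vector.file.ArrowBlock;

public class BlockSketch {
  public static void main(String[] args) {
    // Old footer entry: (offset, metadataLength, bodyLength).
    // New footer entry: (offset, length), where length covers the entire
    // serialized record batch message (metadata prefix and body together).
    ArrowBlock block = new ArrowBlock(440L, 1024);  // hypothetical offset and size
    System.out.println("batch at offset " + block.getOffset()
        + ", " + block.getLength() + " bytes");
  }
}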
nongli committed Jan 19, 2017
1 parent 6811d3f commit 27b3909
Showing 9 changed files with 119 additions and 133 deletions.
format/File.fbs (1 addition & 4 deletions)
@@ -37,10 +37,7 @@ struct Block {
 
   offset: long;
 
-  metaDataLength: int;
-
-  bodyLength: long;
-
+  length: int;
 }
 
 root_type Footer;
ArrowBlock.java
@@ -25,39 +25,32 @@
 public class ArrowBlock implements FBSerializable {
 
   private final long offset;
-  private final int metadataLength;
-  private final long bodyLength;
+  private final int length;
 
-  public ArrowBlock(long offset, int metadataLength, long bodyLength) {
+  public ArrowBlock(long offset, int length) {
     super();
     this.offset = offset;
-    this.metadataLength = metadataLength;
-    this.bodyLength = bodyLength;
+    this.length = length;
   }
 
   public long getOffset() {
     return offset;
   }
 
-  public int getMetadataLength() {
-    return metadataLength;
-  }
-
-  public long getBodyLength() {
-    return bodyLength;
+  public int getLength() {
+    return length;
   }
 
   @Override
   public int writeTo(FlatBufferBuilder builder) {
-    return Block.createBlock(builder, offset, metadataLength, bodyLength);
+    return Block.createBlock(builder, offset, length);
   }
 
   @Override
   public int hashCode() {
     final int prime = 31;
     int result = 1;
-    result = prime * result + (int) (bodyLength ^ (bodyLength >>> 32));
-    result = prime * result + metadataLength;
+    result = prime * result + (length ^ (length >>> 32));
     result = prime * result + (int) (offset ^ (offset >>> 32));
     return result;
   }
@@ -71,9 +64,7 @@ public boolean equals(Object obj) {
     if (getClass() != obj.getClass())
       return false;
     ArrowBlock other = (ArrowBlock) obj;
-    if (bodyLength != other.bodyLength)
-      return false;
-    if (metadataLength != other.metadataLength)
+    if (length != other.length)
       return false;
     if (offset != other.offset)
       return false;
ArrowFooter.java
@@ -55,21 +55,23 @@ public ArrowFooter(Footer footer) {
   private static List<ArrowBlock> recordBatches(Footer footer) {
     List<ArrowBlock> recordBatches = new ArrayList<>();
     Block tempBlock = new Block();
+
     int recordBatchesLength = footer.recordBatchesLength();
     for (int i = 0; i < recordBatchesLength; i++) {
       Block block = footer.recordBatches(tempBlock, i);
-      recordBatches.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength()));
+      recordBatches.add(new ArrowBlock(block.offset(), block.length()));
     }
     return recordBatches;
   }
 
   private static List<ArrowBlock> dictionaries(Footer footer) {
     List<ArrowBlock> dictionaries = new ArrayList<>();
-    Block tempBLock = new Block();
+    Block tempBlock = new Block();
+
     int dictionariesLength = footer.dictionariesLength();
     for (int i = 0; i < dictionariesLength; i++) {
-      Block block = footer.dictionaries(tempBLock, i);
-      dictionaries.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength()));
+      Block block = footer.dictionaries(tempBlock, i);
+      dictionaries.add(new ArrowBlock(block.offset(), block.length()));
     }
     return dictionaries;
   }
ArrowReader.java
@@ -20,23 +20,15 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SeekableByteChannel;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 
-import org.apache.arrow.flatbuf.Buffer;
-import org.apache.arrow.flatbuf.FieldNode;
 import org.apache.arrow.flatbuf.Footer;
-import org.apache.arrow.flatbuf.RecordBatch;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.schema.ArrowFieldNode;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.MessageSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ArrowBuf;
-
 public class ArrowReader implements AutoCloseable {
   private static final Logger LOGGER = LoggerFactory.getLogger(ArrowReader.class);
 
@@ -54,15 +46,6 @@ public ArrowReader(SeekableByteChannel in, BufferAllocator allocator) {
     this.allocator = allocator;
   }
 
-  private int readFully(ArrowBuf buffer, int l) throws IOException {
-    int n = readFully(buffer.nioBuffer(buffer.writerIndex(), l));
-    buffer.writerIndex(n);
-    if (n != l) {
-      throw new IllegalStateException(n + " != " + l);
-    }
-    return n;
-  }
-
   private int readFully(ByteBuffer buffer) throws IOException {
     int total = 0;
     int n;
@@ -104,46 +87,20 @@ public ArrowFooter readFooter() throws IOException {
 
   // TODO: read dictionaries
 
-  public ArrowRecordBatch readRecordBatch(ArrowBlock recordBatchBlock) throws IOException {
-    LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", recordBatchBlock.getOffset(), recordBatchBlock.getMetadataLength(), recordBatchBlock.getBodyLength()));
-    int l = (int)(recordBatchBlock.getMetadataLength() + recordBatchBlock.getBodyLength());
-    if (l < 0) {
-      throw new InvalidArrowFileException("block invalid: " + recordBatchBlock);
-    }
-    final ArrowBuf buffer = allocator.buffer(l);
-    LOGGER.debug("allocated buffer " + buffer);
-    in.position(recordBatchBlock.getOffset());
-    int n = readFully(buffer, l);
-    if (n != l) {
-      throw new IllegalStateException(n + " != " + l);
-    }
-
-    // Record batch flatbuffer is prefixed by its size as int32le
-    final ArrowBuf metadata = buffer.slice(4, recordBatchBlock.getMetadataLength() - 4);
-    RecordBatch recordBatchFB = RecordBatch.getRootAsRecordBatch(metadata.nioBuffer().asReadOnlyBuffer());
-
-    int nodesLength = recordBatchFB.nodesLength();
-    final ArrowBuf body = buffer.slice(recordBatchBlock.getMetadataLength(), (int)recordBatchBlock.getBodyLength());
-    List<ArrowFieldNode> nodes = new ArrayList<>();
-    for (int i = 0; i < nodesLength; ++i) {
-      FieldNode node = recordBatchFB.nodes(i);
-      nodes.add(new ArrowFieldNode(node.length(), node.nullCount()));
+  public ArrowRecordBatch readRecordBatch(ArrowBlock block) throws IOException {
+    LOGGER.debug(String.format("RecordBatch at offset %d len: %d",
+        block.getOffset(), block.getLength()));
+    in.position(block.getOffset());
+    ArrowRecordBatch batch = MessageSerializer.deserializeRecordBatch(
+        new ReadChannel(in, block.getOffset()), (int)block.getLength(), allocator);
+    if (batch == null) {
+      throw new IOException("Invalid file. No batch at offset: " + block.getOffset());
     }
-    List<ArrowBuf> buffers = new ArrayList<>();
-    for (int i = 0; i < recordBatchFB.buffersLength(); ++i) {
-      Buffer bufferFB = recordBatchFB.buffers(i);
-      LOGGER.debug(String.format("Buffer in RecordBatch at %d, length: %d", bufferFB.offset(), bufferFB.length()));
-      ArrowBuf vectorBuffer = body.slice((int)bufferFB.offset(), (int)bufferFB.length());
-      buffers.add(vectorBuffer);
-    }
-    ArrowRecordBatch arrowRecordBatch = new ArrowRecordBatch(recordBatchFB.length(), nodes, buffers);
-    LOGGER.debug("released buffer " + buffer);
-    buffer.release();
-    return arrowRecordBatch;
+    return batch;
   }
 
   @Override
   public void close() throws IOException {
     in.close();
   }
 
 }
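For context, a minimal sketch of how the new reader path might be driven (not part of this commit; the file name and package paths are assumptions, and it presumes ArrowFooter exposes its blocks via a getRecordBatches() getter backed by the helper shown earlier):

import java.io.FileInputStream;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.file.ArrowBlock;
import org.apache.arrow.vector.file.ArrowFooter;
import org.apache.arrow.vector.file.ArrowReader;
import org.apache.arrow.vector.schema.ArrowRecordBatch;

public class ReadExample {
  public static void main(String[] args) throws Exception {
    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
    try (FileInputStream fis = new FileInputStream("example.arrow");  // hypothetical file
         ArrowReader reader = new ArrowReader(fis.getChannel(), allocator)) {
      ArrowFooter footer = reader.readFooter();
      // Each footer block now carries only an offset and a total length; the
      // batch itself is decoded by MessageSerializer inside readRecordBatch.
      for (ArrowBlock block : footer.getRecordBatches()) {
        ArrowRecordBatch batch = reader.readRecordBatch(block);
        System.out.println("read a batch spanning " + block.getLength() + " bytes");
      }
      // Buffer and allocator cleanup is omitted for brevity.
    }
  }
}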
ArrowWriter.java
@@ -23,14 +23,12 @@
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.arrow.vector.schema.ArrowBuffer;
 import org.apache.arrow.vector.schema.ArrowRecordBatch;
+import org.apache.arrow.vector.stream.MessageSerializer;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ArrowBuf;
-
 public class ArrowWriter implements AutoCloseable {
   private static final Logger LOGGER = LoggerFactory.getLogger(ArrowWriter.class);
 
@@ -39,7 +37,6 @@ public class ArrowWriter implements AutoCloseable {
   private final Schema schema;
 
   private final List<ArrowBlock> recordBatches = new ArrayList<>();
-
   private boolean started = false;
 
   public ArrowWriter(WritableByteChannel out, Schema schema) {
@@ -49,47 +46,17 @@ public ArrowWriter(WritableByteChannel out, Schema schema) {
 
   private void start() throws IOException {
     writeMagic();
+    MessageSerializer.serialize(out, schema);
   }
 
 
   // TODO: write dictionaries
 
   public void writeRecordBatch(ArrowRecordBatch recordBatch) throws IOException {
     checkStarted();
-    out.align();
-
-    // write metadata header with int32 size prefix
-    long offset = out.getCurrentPosition();
-    out.write(recordBatch, true);
-    out.align();
-    // write body
-    long bodyOffset = out.getCurrentPosition();
-    List<ArrowBuf> buffers = recordBatch.getBuffers();
-    List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
-    if (buffers.size() != buffersLayout.size()) {
-      throw new IllegalStateException("the layout does not match: " + buffers.size() + " != " + buffersLayout.size());
-    }
-    for (int i = 0; i < buffers.size(); i++) {
-      ArrowBuf buffer = buffers.get(i);
-      ArrowBuffer layout = buffersLayout.get(i);
-      long startPosition = bodyOffset + layout.getOffset();
-      if (startPosition != out.getCurrentPosition()) {
-        out.writeZeros((int)(startPosition - out.getCurrentPosition()));
-      }
-
-      out.write(buffer);
-      if (out.getCurrentPosition() != startPosition + layout.getSize()) {
-        throw new IllegalStateException("wrong buffer size: " + out.getCurrentPosition() + " != " + startPosition + layout.getSize());
-      }
-    }
-    int metadataLength = (int)(bodyOffset - offset);
-    if (metadataLength <= 0) {
-      throw new InvalidArrowFileException("invalid recordBatch");
-    }
-    long bodyLength = out.getCurrentPosition() - bodyOffset;
-    LOGGER.debug(String.format("RecordBatch at %d, metadata: %d, body: %d", offset, metadataLength, bodyLength));
-    // add metadata to footer
-    recordBatches.add(new ArrowBlock(offset, metadataLength, bodyLength));
+    ArrowBlock batchDesc = MessageSerializer.serialize(out, recordBatch);
+    LOGGER.debug(String.format("RecordBatch at offset: %d len: %d",
+        batchDesc.getOffset(), batchDesc.getLength()));
+    recordBatches.add(batchDesc);
   }
 
   private void checkStarted() throws IOException {
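A small usage sketch of the writer side (again not part of the commit; package paths are assumed and building the record batches themselves is out of scope here). With this change every batch flows through MessageSerializer and the returned ArrowBlock is what lands in the footer:

import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.List;

import org.apache.arrow.vector.file.ArrowWriter;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;

public class WriteExample {
  // Writes already-built record batches to 'channel' in the file format.
  static void writeBatches(WritableByteChannel channel, Schema schema,
      List<ArrowRecordBatch> batches) throws IOException {
    try (ArrowWriter writer = new ArrowWriter(channel, schema)) {
      for (ArrowRecordBatch batch : batches) {
        // Each call serializes the batch with the streaming format and records
        // the resulting (offset, length) block for the footer.
        writer.writeRecordBatch(batch);
      }
    } // close() finishes the file, writing the footer that points at the blocks
  }
}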
ReadChannel.java
@@ -32,9 +32,16 @@ public class ReadChannel implements AutoCloseable {

   private ReadableByteChannel in;
   private long bytesRead = 0;
+  // The starting byte offset into 'in'.
+  private final long startByteOffset;
 
-  public ReadChannel(ReadableByteChannel in) {
+  public ReadChannel(ReadableByteChannel in, long startByteOffset) {
     this.in = in;
+    this.startByteOffset = startByteOffset;
   }
 
+  public ReadChannel(ReadableByteChannel in) {
+    this(in, 0);
+  }
+
   public long bytesRead() { return bytesRead; }
@@ -65,6 +72,8 @@ public int readFully(ArrowBuf buffer, int l) throws IOException {
     return n;
   }
 
+  public long getCurrentPositiion() { return startByteOffset + bytesRead; }
+
   @Override
   public void close() throws IOException {
     if (this.in != null) {
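Finally, a tiny sketch of why ReadChannel grew a start offset (not part of the commit; the package path is assumed, and the accessor name is spelled as it appears in the diff). ArrowReader now hands MessageSerializer a channel that already sits at a block's offset, and the stored offset keeps position reporting in absolute file coordinates:

import java.io.ByteArrayInputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

import org.apache.arrow.vector.file.ReadChannel;

public class ReadChannelExample {
  public static void main(String[] args) throws Exception {
    ReadableByteChannel raw = Channels.newChannel(new ByteArrayInputStream(new byte[64]));

    // Pretend 'raw' is already positioned at a record batch that starts at
    // file offset 128; the wrapper then reports absolute positions.
    try (ReadChannel in = new ReadChannel(raw, 128L)) {
      System.out.println("position = " + in.getCurrentPositiion()
          + " (startByteOffset + bytesRead = 128 + " + in.bytesRead() + ")");
    }
  }
}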

0 comments on commit 27b3909
