Skip to content

Commit

Permalink
feat: adding initial largelistview splitAndTransfer
Browse files Browse the repository at this point in the history
  • Loading branch information
vibhatha committed Aug 12, 2024
1 parent ee86db5 commit e708ac9
Show file tree
Hide file tree
Showing 3 changed files with 618 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private void allocateBuffers() {
sizeBuffer = allocateBuffers(sizeAllocationSizeInBytes);
}

private ArrowBuf allocateBuffers(final long size) {
protected ArrowBuf allocateBuffers(final long size) {
final int curSize = (int) size;
ArrowBuf buffer = allocator.buffer(curSize);
buffer.readerIndex(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueIterableVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.compare.VectorVisitor;
import org.apache.arrow.vector.complex.impl.UnionLargeListViewReader;
import org.apache.arrow.vector.complex.impl.UnionLargeListViewWriter;
Expand Down Expand Up @@ -361,20 +362,17 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator) {

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
throw new UnsupportedOperationException(
"LargeListViewVector does not support getTransferPair(String, BufferAllocator, CallBack) yet");
return new TransferImpl(ref, allocator, callBack);
}

@Override
public TransferPair getTransferPair(Field field, BufferAllocator allocator, CallBack callBack) {
throw new UnsupportedOperationException(
"LargeListViewVector does not support getTransferPair(Field, BufferAllocator, CallBack) yet");
return new TransferImpl(field, allocator, callBack);
}

@Override
public TransferPair makeTransferPair(ValueVector target) {
throw new UnsupportedOperationException(
"LargeListViewVector does not support makeTransferPair(ValueVector) yet");
return new TransferImpl((LargeListViewVector) target);
}

@Override
Expand Down Expand Up @@ -452,6 +450,163 @@ public <OUT, IN> OUT accept(VectorVisitor<OUT, IN> visitor, IN value) {
throw new UnsupportedOperationException();
}

private class TransferImpl implements TransferPair {

LargeListViewVector to;
TransferPair dataTransferPair;

public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) {
this(new LargeListViewVector(name, allocator, field.getFieldType(), callBack));
}

public TransferImpl(Field field, BufferAllocator allocator, CallBack callBack) {
this(new LargeListViewVector(field, allocator, callBack));
}

public TransferImpl(LargeListViewVector to) {
this.to = to;
to.addOrGetVector(vector.getField().getFieldType());
if (to.getDataVector() instanceof ZeroVector) {
to.addOrGetVector(vector.getField().getFieldType());
}
dataTransferPair = getDataVector().makeTransferPair(to.getDataVector());
}

@Override
public void transfer() {
to.clear();
dataTransferPair.transfer();
to.validityBuffer = transferBuffer(validityBuffer, to.allocator);
to.offsetBuffer = transferBuffer(offsetBuffer, to.allocator);
to.sizeBuffer = transferBuffer(sizeBuffer, to.allocator);
if (valueCount > 0) {
to.setValueCount(valueCount);
}
clear();
}

@Override
public void splitAndTransfer(int startIndex, int length) {
Preconditions.checkArgument(
startIndex >= 0 && length >= 0 && startIndex + length <= valueCount,
"Invalid parameters startIndex: %s, length: %s for valueCount: %s",
startIndex,
length,
valueCount);
to.clear();
if (length > 0) {
// we have to scan by index since there are out-of-order offsets
to.offsetBuffer = to.allocateBuffers((long) length * OFFSET_WIDTH);
to.sizeBuffer = to.allocateBuffers((long) length * SIZE_WIDTH);

/* splitAndTransfer the size buffer */
int maxOffsetAndSizeSum = -1;
int minOffsetValue = -1;
for (int i = 0; i < length; i++) {
final int offsetValue = offsetBuffer.getInt((long) (startIndex + i) * OFFSET_WIDTH);
final int sizeValue = sizeBuffer.getInt((long) (startIndex + i) * SIZE_WIDTH);
to.sizeBuffer.setInt((long) i * SIZE_WIDTH, sizeValue);
if (maxOffsetAndSizeSum < offsetValue + sizeValue) {
maxOffsetAndSizeSum = offsetValue + sizeValue;
}
if (minOffsetValue == -1 || minOffsetValue > offsetValue) {
minOffsetValue = offsetValue;
}
}

/* splitAndTransfer the offset buffer */
for (int i = 0; i < length; i++) {
final int offsetValue = offsetBuffer.getInt((long) (startIndex + i) * OFFSET_WIDTH);
final int relativeOffset = offsetValue - minOffsetValue;
to.offsetBuffer.setInt((long) i * OFFSET_WIDTH, relativeOffset);
}

/* splitAndTransfer the validity buffer */
splitAndTransferValidityBuffer(startIndex, length, to);

/* splitAndTransfer the data buffer */
final int childSliceLength = maxOffsetAndSizeSum - minOffsetValue;
dataTransferPair.splitAndTransfer(minOffsetValue, childSliceLength);
to.setValueCount(length);
}
}

/*
* transfer the validity.
*/
private void splitAndTransferValidityBuffer(
int startIndex, int length, LargeListViewVector target) {
int firstByteSource = BitVectorHelper.byteIndex(startIndex);
int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1);
int byteSizeTarget = getValidityBufferSizeFromCount(length);
int offset = startIndex % 8;

if (length > 0) {
if (offset == 0) {
// slice
if (target.validityBuffer != null) {
target.validityBuffer.getReferenceManager().release();
}
target.validityBuffer = validityBuffer.slice(firstByteSource, byteSizeTarget);
target.validityBuffer.getReferenceManager().retain(1);
} else {
/* Copy data
* When the first bit starts from the middle of a byte (offset != 0),
* copy data from src BitVector.
* Each byte in the target is composed by a part in i-th byte,
* another part in (i+1)-th byte.
*/
target.allocateValidityBuffer(byteSizeTarget);

for (int i = 0; i < byteSizeTarget - 1; i++) {
byte b1 =
BitVectorHelper.getBitsFromCurrentByte(validityBuffer, firstByteSource + i, offset);
byte b2 =
BitVectorHelper.getBitsFromNextByte(
validityBuffer, firstByteSource + i + 1, offset);

target.validityBuffer.setByte(i, (b1 + b2));
}

/* Copying the last piece is done in the following manner:
* if the source vector has 1 or more bytes remaining, we copy
* the last piece as a byte formed by shifting data
* from the current byte and the next byte.
*
* if the source vector has no more bytes remaining
* (we are at the last byte), we copy the last piece as a byte
* by shifting data from the current byte.
*/
if ((firstByteSource + byteSizeTarget - 1) < lastByteSource) {
byte b1 =
BitVectorHelper.getBitsFromCurrentByte(
validityBuffer, firstByteSource + byteSizeTarget - 1, offset);
byte b2 =
BitVectorHelper.getBitsFromNextByte(
validityBuffer, firstByteSource + byteSizeTarget, offset);

target.validityBuffer.setByte(byteSizeTarget - 1, b1 + b2);
} else {
byte b1 =
BitVectorHelper.getBitsFromCurrentByte(
validityBuffer, firstByteSource + byteSizeTarget - 1, offset);
target.validityBuffer.setByte(byteSizeTarget - 1, b1);
}
}
}
}

@Override
public ValueVector getTo() {
return to;
}

@Override
public void copyValueSafe(int from, int to) {
this.to.copyFrom(from, to, LargeListViewVector.this);
}
}

@Override
protected FieldReader getReaderImpl() {
throw new UnsupportedOperationException(
Expand Down
Loading

0 comments on commit e708ac9

Please sign in to comment.