Skip to content

Commit

Permalink
[#9410] Extract common code parts into separate functions to reduce c…
Browse files Browse the repository at this point in the history
…ode duplication
  • Loading branch information
javrasya committed Apr 12, 2024
1 parent 318ee3c commit 1b5a26f
Showing 1 changed file with 37 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,32 +132,14 @@ static IcebergSourceSplit deserializeV1(byte[] serialized) throws IOException {
}

byte[] serializeV2() throws IOException {
if (serializedBytesCache == null) {
DataOutputSerializer out = SERIALIZER_CACHE.get();
Collection<FileScanTask> fileScanTasks = task.tasks();
Preconditions.checkArgument(
fileOffset >= 0 && fileOffset < fileScanTasks.size(),
"Invalid file offset: %s. Should be within the range of [0, %s)",
fileOffset,
fileScanTasks.size());

out.writeInt(fileOffset);
out.writeLong(recordOffset);
out.writeInt(fileScanTasks.size());

for (FileScanTask fileScanTask : fileScanTasks) {
String taskJson = FileScanTaskParser.toJson(fileScanTask);
out.writeUTF(taskJson);
}

serializedBytesCache = out.getCopyOfBuffer();
out.clear();
}

return serializedBytesCache;
return serialize(2);
}

byte[] serializeV3() throws IOException {
return serialize(3);
}

private byte[] serialize(int version) throws IOException {
if (serializedBytesCache == null) {
DataOutputSerializer out = SERIALIZER_CACHE.get();
Collection<FileScanTask> fileScanTasks = task.tasks();
Expand All @@ -173,7 +155,7 @@ byte[] serializeV3() throws IOException {

for (FileScanTask fileScanTask : fileScanTasks) {
String taskJson = FileScanTaskParser.toJson(fileScanTask);
SerializerHelper.writeLongUTF(out, taskJson);
writeLongUTF(out, taskJson, version);
}

serializedBytesCache = out.getCopyOfBuffer();
Expand All @@ -183,39 +165,56 @@ byte[] serializeV3() throws IOException {
return serializedBytesCache;
}

static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive)
private static void writeLongUTF(DataOutputSerializer out, String taskJson, int version)
throws IOException {
DataInputDeserializer in = new DataInputDeserializer(serialized);
int fileOffset = in.readInt();
long recordOffset = in.readLong();
int taskCount = in.readInt();

List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
for (int i = 0; i < taskCount; ++i) {
String taskJson = in.readUTF();
FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
tasks.add(task);
switch (version) {
case 2:
out.writeUTF(taskJson);
break;
case 3:
SerializerHelper.writeLongUTF(out, taskJson);
break;
default:
throw new IllegalArgumentException("Unsupported version: " + version);
}
}

CombinedScanTask combinedScanTask = new BaseCombinedScanTask(tasks);
return IcebergSourceSplit.fromCombinedScanTask(combinedScanTask, fileOffset, recordOffset);
static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive)
throws IOException {
return deserialize(serialized, caseSensitive, 2);
}

static IcebergSourceSplit deserializeV3(byte[] serialized, boolean caseSensitive)
throws IOException {
return deserialize(serialized, caseSensitive, 3);
}

private static IcebergSourceSplit deserialize(
byte[] serialized, boolean caseSensitive, int version) throws IOException {
DataInputDeserializer in = new DataInputDeserializer(serialized);
int fileOffset = in.readInt();
long recordOffset = in.readLong();
int taskCount = in.readInt();

List<FileScanTask> tasks = Lists.newArrayListWithCapacity(taskCount);
for (int i = 0; i < taskCount; ++i) {
String taskJson = SerializerHelper.readLongUTF(in);
String taskJson = readTaskJson(in, version);
FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive);
tasks.add(task);
}

CombinedScanTask combinedScanTask = new BaseCombinedScanTask(tasks);
return IcebergSourceSplit.fromCombinedScanTask(combinedScanTask, fileOffset, recordOffset);
}

private static String readTaskJson(DataInputDeserializer in, int version) throws IOException {
switch (version) {
case 2:
return in.readUTF();
case 3:
return SerializerHelper.readLongUTF(in);
default:
throw new IllegalArgumentException("Unsupported version: " + version);
}
}
}

0 comments on commit 1b5a26f

Please sign in to comment.