diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java index e4bfbf1452e2..44e37afcfc60 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java @@ -132,6 +132,14 @@ static IcebergSourceSplit deserializeV1(byte[] serialized) throws IOException { } byte[] serializeV2() throws IOException { + return serialize(2); + } + + byte[] serializeV3() throws IOException { + return serialize(3); + } + + private byte[] serialize(int version) throws IOException { if (serializedBytesCache == null) { DataOutputSerializer out = SERIALIZER_CACHE.get(); Collection fileScanTasks = task.tasks(); @@ -147,7 +155,7 @@ byte[] serializeV2() throws IOException { for (FileScanTask fileScanTask : fileScanTasks) { String taskJson = FileScanTaskParser.toJson(fileScanTask); - out.writeUTF(taskJson); + writeTaskJson(out, taskJson, version); } serializedBytesCache = out.getCopyOfBuffer(); @@ -157,8 +165,32 @@ byte[] serializeV2() throws IOException { return serializedBytesCache; } + private static void writeTaskJson(DataOutputSerializer out, String taskJson, int version) + throws IOException { + switch (version) { + case 2: + out.writeUTF(taskJson); + break; + case 3: + SerializerHelper.writeLongUTF(out, taskJson); + break; + default: + throw new IllegalArgumentException("Unsupported version: " + version); + } + } + static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive) throws IOException { + return deserialize(serialized, caseSensitive, 2); + } + + static IcebergSourceSplit deserializeV3(byte[] serialized, boolean caseSensitive) + throws IOException { + return deserialize(serialized, caseSensitive, 3); + } + + private static IcebergSourceSplit deserialize( + byte[] serialized, boolean caseSensitive, int version) throws IOException { DataInputDeserializer in = new DataInputDeserializer(serialized); int fileOffset = in.readInt(); long recordOffset = in.readLong(); @@ -166,7 +198,7 @@ static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive List tasks = Lists.newArrayListWithCapacity(taskCount); for (int i = 0; i < taskCount; ++i) { - String taskJson = in.readUTF(); + String taskJson = readTaskJson(in, version); FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive); tasks.add(task); } @@ -174,4 +206,15 @@ static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive CombinedScanTask combinedScanTask = new BaseCombinedScanTask(tasks); return IcebergSourceSplit.fromCombinedScanTask(combinedScanTask, fileOffset, recordOffset); } + + private static String readTaskJson(DataInputDeserializer in, int version) throws IOException { + switch (version) { + case 2: + return in.readUTF(); + case 3: + return SerializerHelper.readLongUTF(in); + default: + throw new IllegalArgumentException("Unsupported version: " + version); + } + } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java index 8c089819e731..d4b0f9e1977d 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java +++ 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java @@ -24,7 +24,7 @@ @Internal public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer { - private static final int VERSION = 2; + private static final int VERSION = 3; private final boolean caseSensitive; @@ -39,7 +39,7 @@ public int getVersion() { @Override public byte[] serialize(IcebergSourceSplit split) throws IOException { - return split.serializeV2(); + return split.serializeV3(); } @Override @@ -49,6 +49,8 @@ public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOE return IcebergSourceSplit.deserializeV1(serialized); case 2: return IcebergSourceSplit.deserializeV2(serialized, caseSensitive); + case 3: + return IcebergSourceSplit.deserializeV3(serialized, caseSensitive); default: throw new IOException( String.format( diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java new file mode 100644 index 000000000000..a0395f29ac5b --- /dev/null +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.split; + +import java.io.IOException; +import java.io.Serializable; +import java.io.UTFDataFormatException; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; + +/** + * Helper class to serialize and deserialize strings longer than 65K. The inspiration is mostly + * taken from the class org.apache.flink.core.memory.DataInputSerializer.readUTF and + * org.apache.flink.core.memory.DataOutputSerializer.writeUTF. + */ +class SerializerHelper implements Serializable { + + private SerializerHelper() {} + + /** + * Similar to {@link DataOutputSerializer#writeUTF(String)}. Except this supports larger payloads + * which is up to max integer value. + * + *

Note: This method can be removed once an equivalent method is available in {@link + * DataOutputSerializer}; use that one instead once it is released in Flink 1.20. + + *

See * FLINK-34228 * https://github.com/apache/flink/pull/24191 + * + * @param out the output stream to write the string to. + * @param str the string value to be written. + */ + public static void writeLongUTF(DataOutputSerializer out, String str) throws IOException { + int strlen = str.length(); + long utflen = 0; + int ch; + + /* use charAt instead of copying String to char array */ + for (int i = 0; i < strlen; i++) { + ch = str.charAt(i); + utflen += getUTFBytesSize(ch); + + if (utflen > Integer.MAX_VALUE) { + throw new UTFDataFormatException("Encoded string reached maximum length: " + utflen); + } + } + + if (utflen > Integer.MAX_VALUE - 4) { + throw new UTFDataFormatException("Encoded string is too long: " + utflen); + } + + out.writeInt((int) utflen); + writeUTFBytes(out, str, (int) utflen); + } + + /** + * Similar to {@link DataInputDeserializer#readUTF()}. Except this supports larger payloads which + * is up to max integer value. + * + *

Note: This method can be removed once an equivalent method is available in {@link + * DataOutputSerializer}; use that one instead once it is released in Flink 1.20. + + *

See * FLINK-34228 * https://github.com/apache/flink/pull/24191 + * + * @param in the input stream to read the string from. + * @return the string value read from the input stream. + * @throws IOException if an I/O error occurs when reading from the input stream. + */ + public static String readLongUTF(DataInputDeserializer in) throws IOException { + int utflen = in.readInt(); + byte[] bytearr = new byte[utflen]; + char[] chararr = new char[utflen]; + + int ch; + int char2; + int char3; + int count = 0; + int chararrCount = 0; + + in.readFully(bytearr, 0, utflen); + + while (count < utflen) { + ch = (int) bytearr[count] & 0xff; + if (ch > 127) { + break; + } + count++; + chararr[chararrCount++] = (char) ch; + } + + while (count < utflen) { + ch = (int) bytearr[count] & 0xff; + switch (ch >> 4) { + case 0: + case 1: + case 2: + case 3: + case 4: + case 5: + case 6: + case 7: + /* 0xxxxxxx */ + count++; + chararr[chararrCount++] = (char) ch; + break; + case 12: + case 13: + /* 110x xxxx 10xx xxxx */ + count += 2; + if (count > utflen) { + throw new UTFDataFormatException("malformed input: partial character at end"); + } + char2 = (int) bytearr[count - 1]; + if ((char2 & 0xC0) != 0x80) { + throw new UTFDataFormatException("malformed input around byte " + count); + } + chararr[chararrCount++] = (char) (((ch & 0x1F) << 6) | (char2 & 0x3F)); + break; + case 14: + /* 1110 xxxx 10xx xxxx 10xx xxxx */ + count += 3; + if (count > utflen) { + throw new UTFDataFormatException("malformed input: partial character at end"); + } + char2 = (int) bytearr[count - 2]; + char3 = (int) bytearr[count - 1]; + if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { + throw new UTFDataFormatException("malformed input around byte " + (count - 1)); + } + chararr[chararrCount++] = + (char) (((ch & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F)); + break; + default: + /* 10xx xxxx, 1111 xxxx */ + throw new UTFDataFormatException("malformed input around byte " + count); + } + } + // The number of chars produced may be less than utflen + return new String(chararr, 0, chararrCount); + } + + private static int getUTFBytesSize(int ch) { + if ((ch >= 0x0001) && (ch <= 0x007F)) { + return 1; + } else if (ch > 0x07FF) { + return 3; + } else { + return 2; + } + } + + private static void writeUTFBytes(DataOutputSerializer out, String str, int utflen) + throws IOException { + int strlen = str.length(); + int ch; + + int len = Math.max(1024, utflen); + + byte[] bytearr = new byte[len]; + int count = 0; + + int index; + for (index = 0; index < strlen; index++) { + ch = str.charAt(index); + if (!((ch >= 0x0001) && (ch <= 0x007F))) { + break; + } + bytearr[count++] = (byte) ch; + } + + for (; index < strlen; index++) { + ch = str.charAt(index); + if ((ch >= 0x0001) && (ch <= 0x007F)) { + bytearr[count++] = (byte) ch; + } else if (ch > 0x07FF) { + bytearr[count++] = (byte) (0xE0 | ((ch >> 12) & 0x0F)); + bytearr[count++] = (byte) (0x80 | ((ch >> 6) & 0x3F)); + bytearr[count++] = (byte) (0x80 | (ch & 0x3F)); + } else { + bytearr[count++] = (byte) (0xC0 | ((ch >> 6) & 0x1F)); + bytearr[count++] = (byte) (0x80 | (ch & 0x3F)); + } + } + + out.write(bytearr, 0, count); + } +} diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java index 3a8071523b7c..ebd220b00dba 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java +++ 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java @@ -18,19 +18,30 @@ */ package org.apache.iceberg.flink.source; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + import java.io.File; +import java.io.IOException; import java.util.List; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.BaseCombinedScanTask; +import org.apache.iceberg.BaseFileScanTask; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileMetadata; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.GenericAppenderHelper; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; import org.apache.iceberg.hadoop.HadoopCatalog; @@ -129,4 +140,64 @@ public static List createSplitsFromTransientHadoopTable( catalog.close(); } } + + /** + * This method will equip the {@code icebergSourceSplits} with mock delete files. + *

  • For each split, create {@code deleteFilesPerSplit} mock delete files + *
  • Replace the original {@code FileScanTask} with a new {@code FileScanTask} that includes the mock + * delete files + *
  • Caller should not attempt to read the deleted files since they are created as mock, and + * they are not real files + * + * @param icebergSourceSplits The real splits to equip with mock delete files + * @param temporaryFolder The temporary folder to create the mock delete files with + * @param deleteFilesPerSplit The number of delete files to create for each split + * @return The list of re-created splits with mock delete files + * @throws IOException If there is any error creating the mock delete files + */ + public static List equipSplitsWithMockDeleteFiles( + List icebergSourceSplits, + TemporaryFolder temporaryFolder, + int deleteFilesPerSplit) + throws IOException { + List icebergSourceSplitsWithMockDeleteFiles = Lists.newArrayList(); + for (IcebergSourceSplit split : icebergSourceSplits) { + final CombinedScanTask combinedScanTask = spy(split.task()); + + final List deleteFiles = Lists.newArrayList(); + final PartitionSpec spec = + PartitionSpec.builderFor(TestFixtures.SCHEMA).withSpecId(0).build(); + + for (int i = 0; i < deleteFilesPerSplit; ++i) { + final DeleteFile deleteFile = + FileMetadata.deleteFileBuilder(spec) + .withFormat(FileFormat.PARQUET) + .withPath(temporaryFolder.newFile().getPath()) + .ofPositionDeletes() + .withFileSizeInBytes(1000) + .withRecordCount(1000) + .build(); + deleteFiles.add(deleteFile); + } + + List newFileScanTasks = Lists.newArrayList(); + for (FileScanTask task : combinedScanTask.tasks()) { + String schemaString = SchemaParser.toJson(task.schema()); + String specString = PartitionSpecParser.toJson(task.spec()); + + BaseFileScanTask baseFileScanTask = + new BaseFileScanTask( + task.file(), + deleteFiles.toArray(new DeleteFile[] {}), + schemaString, + specString, + ResidualEvaluator.unpartitioned(task.residual())); + newFileScanTasks.add(baseFileScanTask); + } + doReturn(newFileScanTasks).when(combinedScanTask).tasks(); + icebergSourceSplitsWithMockDeleteFiles.add( + IcebergSourceSplit.fromCombinedScanTask( + combinedScanTask, split.fileOffset(), split.recordOffset())); + } + return icebergSourceSplitsWithMockDeleteFiles; + } } diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java index cd778309f90d..c72d622f86ba 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java @@ -100,6 +100,26 @@ private void serializeAndDeserializeV2(int splitCount, int filesPerSplit) throws } } + @Test + public void testV3WithTooManyDeleteFiles() throws Exception { + serializeAndDeserializeV3(1, 1, 5000); + } + + private void serializeAndDeserializeV3(int splitCount, int filesPerSplit, int mockDeletesPerSplit) + throws Exception { + final List splits = + SplitHelpers.createSplitsFromTransientHadoopTable( + TEMPORARY_FOLDER, splitCount, filesPerSplit); + final List splitsWithMockDeleteFiles = + SplitHelpers.equipSplitsWithMockDeleteFiles(splits, TEMPORARY_FOLDER, mockDeletesPerSplit); + + for (IcebergSourceSplit split : splitsWithMockDeleteFiles) { + byte[] result = split.serializeV3(); + IcebergSourceSplit deserialized = IcebergSourceSplit.deserializeV3(result, true); + assertSplitEquals(split, deserialized); + } + } + @Test public void testDeserializeV1() throws Exception { final List splits = diff --git 
a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java index e4bfbf1452e2..44e37afcfc60 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplit.java @@ -132,6 +132,14 @@ static IcebergSourceSplit deserializeV1(byte[] serialized) throws IOException { } byte[] serializeV2() throws IOException { + return serialize(2); + } + + byte[] serializeV3() throws IOException { + return serialize(3); + } + + private byte[] serialize(int version) throws IOException { if (serializedBytesCache == null) { DataOutputSerializer out = SERIALIZER_CACHE.get(); Collection fileScanTasks = task.tasks(); @@ -147,7 +155,7 @@ byte[] serializeV2() throws IOException { for (FileScanTask fileScanTask : fileScanTasks) { String taskJson = FileScanTaskParser.toJson(fileScanTask); - out.writeUTF(taskJson); + writeTaskJson(out, taskJson, version); } serializedBytesCache = out.getCopyOfBuffer(); @@ -157,8 +165,32 @@ byte[] serializeV2() throws IOException { return serializedBytesCache; } + private static void writeTaskJson(DataOutputSerializer out, String taskJson, int version) + throws IOException { + switch (version) { + case 2: + out.writeUTF(taskJson); + break; + case 3: + SerializerHelper.writeLongUTF(out, taskJson); + break; + default: + throw new IllegalArgumentException("Unsupported version: " + version); + } + } + static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive) throws IOException { + return deserialize(serialized, caseSensitive, 2); + } + + static IcebergSourceSplit deserializeV3(byte[] serialized, boolean caseSensitive) + throws IOException { + return deserialize(serialized, caseSensitive, 3); + } + + private static IcebergSourceSplit deserialize( + byte[] serialized, boolean caseSensitive, int version) throws IOException { DataInputDeserializer in = new DataInputDeserializer(serialized); int fileOffset = in.readInt(); long recordOffset = in.readLong(); @@ -166,7 +198,7 @@ static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive List tasks = Lists.newArrayListWithCapacity(taskCount); for (int i = 0; i < taskCount; ++i) { - String taskJson = in.readUTF(); + String taskJson = readTaskJson(in, version); FileScanTask task = FileScanTaskParser.fromJson(taskJson, caseSensitive); tasks.add(task); } @@ -174,4 +206,15 @@ static IcebergSourceSplit deserializeV2(byte[] serialized, boolean caseSensitive CombinedScanTask combinedScanTask = new BaseCombinedScanTask(tasks); return IcebergSourceSplit.fromCombinedScanTask(combinedScanTask, fileOffset, recordOffset); } + + private static String readTaskJson(DataInputDeserializer in, int version) throws IOException { + switch (version) { + case 2: + return in.readUTF(); + case 3: + return SerializerHelper.readLongUTF(in); + default: + throw new IllegalArgumentException("Unsupported version: " + version); + } + } } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java index 8c089819e731..d4b0f9e1977d 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java +++ 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/IcebergSourceSplitSerializer.java @@ -24,7 +24,7 @@ @Internal public class IcebergSourceSplitSerializer implements SimpleVersionedSerializer { - private static final int VERSION = 2; + private static final int VERSION = 3; private final boolean caseSensitive; @@ -39,7 +39,7 @@ public int getVersion() { @Override public byte[] serialize(IcebergSourceSplit split) throws IOException { - return split.serializeV2(); + return split.serializeV3(); } @Override @@ -49,6 +49,8 @@ public IcebergSourceSplit deserialize(int version, byte[] serialized) throws IOE return IcebergSourceSplit.deserializeV1(serialized); case 2: return IcebergSourceSplit.deserializeV2(serialized, caseSensitive); + case 3: + return IcebergSourceSplit.deserializeV3(serialized, caseSensitive); default: throw new IOException( String.format( diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java new file mode 100644 index 000000000000..a0395f29ac5b --- /dev/null +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/split/SerializerHelper.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.source.split; + +import java.io.IOException; +import java.io.Serializable; +import java.io.UTFDataFormatException; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; + +/** + * Helper class to serialize and deserialize strings longer than 65K. The inspiration is mostly + * taken from the class org.apache.flink.core.memory.DataInputSerializer.readUTF and + * org.apache.flink.core.memory.DataOutputSerializer.writeUTF. + */ +class SerializerHelper implements Serializable { + + private SerializerHelper() {} + + /** + * Similar to {@link DataOutputSerializer#writeUTF(String)}. Except this supports larger payloads + * which is up to max integer value. + * + *

Note: This method can be removed once an equivalent method is available in {@link + * DataOutputSerializer}; use that one instead once it is released in Flink 1.20. + + *

    See * FLINK-34228 * https://github.com/apache/flink/pull/24191 + * + * @param out the output stream to write the string to. + * @param str the string value to be written. + */ + public static void writeLongUTF(DataOutputSerializer out, String str) throws IOException { + int strlen = str.length(); + long utflen = 0; + int ch; + + /* use charAt instead of copying String to char array */ + for (int i = 0; i < strlen; i++) { + ch = str.charAt(i); + utflen += getUTFBytesSize(ch); + + if (utflen > Integer.MAX_VALUE) { + throw new UTFDataFormatException("Encoded string reached maximum length: " + utflen); + } + } + + if (utflen > Integer.MAX_VALUE - 4) { + throw new UTFDataFormatException("Encoded string is too long: " + utflen); + } + + out.writeInt((int) utflen); + writeUTFBytes(out, str, (int) utflen); + } + + /** + * Similar to {@link DataInputDeserializer#readUTF()}. Except this supports larger payloads which + * is up to max integer value. + * + *

Note: This method can be removed once an equivalent method is available in {@link + * DataOutputSerializer}; use that one instead once it is released in Flink 1.20. + + *

    See * FLINK-34228 * https://github.com/apache/flink/pull/24191 + * + * @param in the input stream to read the string from. + * @return the string value read from the input stream. + * @throws IOException if an I/O error occurs when reading from the input stream. + */ + public static String readLongUTF(DataInputDeserializer in) throws IOException { + int utflen = in.readInt(); + byte[] bytearr = new byte[utflen]; + char[] chararr = new char[utflen]; + + int ch; + int char2; + int char3; + int count = 0; + int chararrCount = 0; + + in.readFully(bytearr, 0, utflen); + + while (count < utflen) { + ch = (int) bytearr[count] & 0xff; + if (ch > 127) { + break; + } + count++; + chararr[chararrCount++] = (char) ch; + } + + while (count < utflen) { + ch = (int) bytearr[count] & 0xff; + switch (ch >> 4) { + case 0: + case 1: + case 2: + case 3: + case 4: + case 5: + case 6: + case 7: + /* 0xxxxxxx */ + count++; + chararr[chararrCount++] = (char) ch; + break; + case 12: + case 13: + /* 110x xxxx 10xx xxxx */ + count += 2; + if (count > utflen) { + throw new UTFDataFormatException("malformed input: partial character at end"); + } + char2 = (int) bytearr[count - 1]; + if ((char2 & 0xC0) != 0x80) { + throw new UTFDataFormatException("malformed input around byte " + count); + } + chararr[chararrCount++] = (char) (((ch & 0x1F) << 6) | (char2 & 0x3F)); + break; + case 14: + /* 1110 xxxx 10xx xxxx 10xx xxxx */ + count += 3; + if (count > utflen) { + throw new UTFDataFormatException("malformed input: partial character at end"); + } + char2 = (int) bytearr[count - 2]; + char3 = (int) bytearr[count - 1]; + if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) { + throw new UTFDataFormatException("malformed input around byte " + (count - 1)); + } + chararr[chararrCount++] = + (char) (((ch & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F)); + break; + default: + /* 10xx xxxx, 1111 xxxx */ + throw new UTFDataFormatException("malformed input around byte " + count); + } + } + // The number of chars produced may be less than utflen + return new String(chararr, 0, chararrCount); + } + + private static int getUTFBytesSize(int ch) { + if ((ch >= 0x0001) && (ch <= 0x007F)) { + return 1; + } else if (ch > 0x07FF) { + return 3; + } else { + return 2; + } + } + + private static void writeUTFBytes(DataOutputSerializer out, String str, int utflen) + throws IOException { + int strlen = str.length(); + int ch; + + int len = Math.max(1024, utflen); + + byte[] bytearr = new byte[len]; + int count = 0; + + int index; + for (index = 0; index < strlen; index++) { + ch = str.charAt(index); + if (!((ch >= 0x0001) && (ch <= 0x007F))) { + break; + } + bytearr[count++] = (byte) ch; + } + + for (; index < strlen; index++) { + ch = str.charAt(index); + if ((ch >= 0x0001) && (ch <= 0x007F)) { + bytearr[count++] = (byte) ch; + } else if (ch > 0x07FF) { + bytearr[count++] = (byte) (0xE0 | ((ch >> 12) & 0x0F)); + bytearr[count++] = (byte) (0x80 | ((ch >> 6) & 0x3F)); + bytearr[count++] = (byte) (0x80 | (ch & 0x3F)); + } else { + bytearr[count++] = (byte) (0xC0 | ((ch >> 6) & 0x1F)); + bytearr[count++] = (byte) (0x80 | (ch & 0x3F)); + } + } + + out.write(bytearr, 0, count); + } +} diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java index 3a8071523b7c..ebd220b00dba 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java +++ 
b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/SplitHelpers.java @@ -18,19 +18,30 @@ */ package org.apache.iceberg.flink.source; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; + import java.io.File; +import java.io.IOException; import java.util.List; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.BaseCombinedScanTask; +import org.apache.iceberg.BaseFileScanTask; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileMetadata; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.data.GenericAppenderHelper; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.source.split.IcebergSourceSplit; import org.apache.iceberg.hadoop.HadoopCatalog; @@ -129,4 +140,64 @@ public static List createSplitsFromTransientHadoopTable( catalog.close(); } } + + /** + * This method will equip the {@code icebergSourceSplits} with mock delete files. + *

  • For each split, create {@code deleteFilesPerSplit} mock delete files + *
  • Replace the original {@code FileScanTask} with a new {@code FileScanTask} that includes the mock + * delete files + *
  • Caller should not attempt to read the deleted files since they are created as mock, and + * they are not real files + * + * @param icebergSourceSplits The real splits to equip with mock delete files + * @param temporaryFolder The temporary folder to create the mock delete files with + * @param deleteFilesPerSplit The number of delete files to create for each split + * @return The list of re-created splits with mock delete files + * @throws IOException If there is any error creating the mock delete files + */ + public static List equipSplitsWithMockDeleteFiles( + List icebergSourceSplits, + TemporaryFolder temporaryFolder, + int deleteFilesPerSplit) + throws IOException { + List icebergSourceSplitsWithMockDeleteFiles = Lists.newArrayList(); + for (IcebergSourceSplit split : icebergSourceSplits) { + final CombinedScanTask combinedScanTask = spy(split.task()); + + final List deleteFiles = Lists.newArrayList(); + final PartitionSpec spec = + PartitionSpec.builderFor(TestFixtures.SCHEMA).withSpecId(0).build(); + + for (int i = 0; i < deleteFilesPerSplit; ++i) { + final DeleteFile deleteFile = + FileMetadata.deleteFileBuilder(spec) + .withFormat(FileFormat.PARQUET) + .withPath(temporaryFolder.newFile().getPath()) + .ofPositionDeletes() + .withFileSizeInBytes(1000) + .withRecordCount(1000) + .build(); + deleteFiles.add(deleteFile); + } + + List newFileScanTasks = Lists.newArrayList(); + for (FileScanTask task : combinedScanTask.tasks()) { + String schemaString = SchemaParser.toJson(task.schema()); + String specString = PartitionSpecParser.toJson(task.spec()); + + BaseFileScanTask baseFileScanTask = + new BaseFileScanTask( + task.file(), + deleteFiles.toArray(new DeleteFile[] {}), + schemaString, + specString, + ResidualEvaluator.unpartitioned(task.residual())); + newFileScanTasks.add(baseFileScanTask); + } + doReturn(newFileScanTasks).when(combinedScanTask).tasks(); + icebergSourceSplitsWithMockDeleteFiles.add( + IcebergSourceSplit.fromCombinedScanTask( + combinedScanTask, split.fileOffset(), split.recordOffset())); + } + return icebergSourceSplitsWithMockDeleteFiles; + } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java index cd778309f90d..c72d622f86ba 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/split/TestIcebergSourceSplitSerializer.java @@ -100,6 +100,26 @@ private void serializeAndDeserializeV2(int splitCount, int filesPerSplit) throws } } + @Test + public void testV3WithTooManyDeleteFiles() throws Exception { + serializeAndDeserializeV3(1, 1, 5000); + } + + private void serializeAndDeserializeV3(int splitCount, int filesPerSplit, int mockDeletesPerSplit) + throws Exception { + final List splits = + SplitHelpers.createSplitsFromTransientHadoopTable( + TEMPORARY_FOLDER, splitCount, filesPerSplit); + final List splitsWithMockDeleteFiles = + SplitHelpers.equipSplitsWithMockDeleteFiles(splits, TEMPORARY_FOLDER, mockDeletesPerSplit); + + for (IcebergSourceSplit split : splitsWithMockDeleteFiles) { + byte[] result = split.serializeV3(); + IcebergSourceSplit deserialized = IcebergSourceSplit.deserializeV3(result, true); + assertSplitEquals(split, deserialized); + } + } + @Test public void testDeserializeV1() throws Exception { final List splits =
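
An illustrative sketch (not part of the patch above) of what the version-3 path buys: a task-JSON string larger than the 64 KB limit of writeUTF/readUTF round-trips cleanly through SerializerHelper.writeLongUTF and SerializerHelper.readLongUTF. The class name LongUtfRoundTripSketch is hypothetical, and the code assumes it lives in the org.apache.iceberg.flink.source.split package, since SerializerHelper is package-private.

package org.apache.iceberg.flink.source.split;

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class LongUtfRoundTripSketch {
  public static void main(String[] args) throws Exception {
    // Build a payload well past the 65,535-byte limit of DataOutput#writeUTF.
    StringBuilder builder = new StringBuilder();
    for (int i = 0; i < 100_000; i++) {
      builder.append('a');
    }
    String taskJson = builder.toString();

    // The version-3 format writes the encoded length as a 4-byte int, so payloads
    // up to Integer.MAX_VALUE bytes are supported.
    DataOutputSerializer out = new DataOutputSerializer(4096);
    SerializerHelper.writeLongUTF(out, taskJson);

    DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
    String restored = SerializerHelper.readLongUTF(in);

    System.out.println(restored.equals(taskJson)); // expected: true
  }
}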