Flink: Apply row-level deletes when reading (#1517)
chenjunjiedada authored Oct 14, 2020
1 parent 9d98b1d commit a238a90
Showing 15 changed files with 286 additions and 51 deletions.
@@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.iceberg.flink.sink;
package org.apache.iceberg.flink;

import java.lang.reflect.Array;
import java.nio.ByteBuffer;
@@ -35,13 +35,13 @@
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.UUIDUtil;

class RowDataWrapper implements StructLike {
public class RowDataWrapper implements StructLike {

private final LogicalType[] types;
private final PositionalGetter<?>[] getters;
private RowData rowData = null;

RowDataWrapper(RowType rowType, Types.StructType struct) {
public RowDataWrapper(RowType rowType, Types.StructType struct) {
int size = rowType.getFieldCount();

types = (LogicalType[]) Array.newInstance(LogicalType.class, size);
@@ -53,7 +53,7 @@ class RowDataWrapper {
}
}

RowDataWrapper wrap(RowData data) {
public RowDataWrapper wrap(RowData data) {
this.rowData = data;
return this;
}
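
Note: RowDataWrapper moves to org.apache.iceberg.flink and becomes public so the new read-side delete filter can view a Flink RowData as an Iceberg StructLike when comparing rows against equality deletes. A minimal sketch of the adapter in use (the one-column schema and row are placeholders, not part of this commit):

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.types.Types;

public class RowDataWrapperSketch {
  public static void main(String[] args) {
    // Hypothetical single-column schema, used only to illustrate the wrapper.
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    RowType rowType = FlinkSchemaUtil.convert(schema);

    // Wrap a Flink row so Iceberg code can read it through the StructLike interface.
    RowDataWrapper wrapper = new RowDataWrapper(rowType, schema.asStruct());
    StructLike struct = wrapper.wrap(GenericRowData.of(1L));
    System.out.println(struct.get(0, Long.class));  // prints 1
  }
}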
@@ -35,6 +35,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
@@ -112,6 +113,9 @@ public ParquetValueReader<RowData> struct(Types.StructType expected, GroupType s
// containsKey is used because the constant may be null
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
types.add(null);
} else if (id == MetadataColumns.ROW_POSITION.fieldId()) {
reorderedFields.add(ParquetValueReaders.position());
types.add(null);
} else {
ParquetValueReader<?> reader = readersById.get(id);
if (reader != null) {
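Note: the new branch above wires Iceberg's _pos metadata column (MetadataColumns.ROW_POSITION) into the Flink Parquet reader via ParquetValueReaders.position(), so each row carries its ordinal position in the data file. Position delete files refer to rows by (file path, position), which is what makes positional deletes checkable at read time. A simplified, illustrative sketch of the idea (this is not the DeleteFilter implementation, just the membership test it boils down to; the parameters are hypothetical):

import java.util.Set;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.io.CloseableIterable;

class PositionDeleteSketch {
  // Keep only rows whose position in the file is not listed in a position-delete file.
  static CloseableIterable<RowData> applyPositionDeletes(CloseableIterable<RowData> rows,
                                                         Set<Long> deletedPositions,
                                                         int posFieldIndex) {
    return CloseableIterable.filter(rows, row -> !deletedPositions.contains(row.getLong(posFieldIndex)));
  }
}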
@@ -32,6 +32,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.FlinkAvroWriter;
import org.apache.iceberg.flink.data.FlinkOrcWriter;
import org.apache.iceberg.flink.data.FlinkParquetWriters;
@@ -24,6 +24,8 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.flink.table.data.DecimalData;
@@ -32,11 +34,14 @@
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.encryption.EncryptedFiles;
import org.apache.iceberg.encryption.EncryptedInputFile;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
@@ -50,23 +55,38 @@
abstract class DataIterator<T> implements CloseableIterator<T> {

private Iterator<FileScanTask> tasks;
private final FileIO io;
private final EncryptionManager encryption;
private final Map<String, InputFile> inputFiles;

private CloseableIterator<T> currentIterator;

DataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption) {
this.tasks = task.files().iterator();
this.io = io;
this.encryption = encryption;

Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
task.files().stream()
.flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
.forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
.map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));

// decrypt with the batch call to avoid multiple RPCs to a key server, if possible
Iterable<InputFile> decryptedFiles = encryption.decrypt(encrypted::iterator);

ImmutableMap.Builder<String, InputFile> inputFileBuilder = ImmutableMap.builder();
decryptedFiles.forEach(decrypted -> inputFileBuilder.put(decrypted.location(), decrypted));
this.inputFiles = inputFileBuilder.build();

this.currentIterator = CloseableIterator.empty();
}

InputFile getInputFile(FileScanTask task) {
Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
return encryption.decrypt(EncryptedFiles.encryptedInput(
io.newInputFile(task.file().path().toString()),
task.file().keyMetadata()));

return inputFiles.get(task.file().path().toString());
}

InputFile getInputFile(String location) {
return inputFiles.get(location);
}

@Override
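Note: instead of decrypting each data file on demand, the iterator now collects the key metadata of every data file and delete file referenced by the scan task, decrypts them in a single batched call, and caches the resulting InputFiles by location; the new getInputFile(String) overload lets the delete filter open delete files through the same cache. A hypothetical helper restating that pattern (parameter names are illustrative; imports match those already shown in the file above):

static Map<String, InputFile> indexDecryptedFiles(FileIO io, EncryptionManager encryption,
                                                  Map<String, ByteBuffer> keyMetadataByPath) {
  // One batched decrypt() call for all data and delete files, then index by location.
  Iterable<InputFile> decrypted = encryption.decrypt(
      keyMetadataByPath.entrySet().stream()
          .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()))
          .collect(java.util.stream.Collectors.toList()));

  Map<String, InputFile> byLocation = Maps.newHashMap();
  decrypted.forEach(file -> byLocation.put(file.location(), file));
  return byLocation;
}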
@@ -42,14 +42,17 @@ public class FlinkInputFormat extends RichInputFormat<RowData, FlinkInputSplit>
private static final long serialVersionUID = 1L;

private final TableLoader tableLoader;
private final Schema tableSchema;
private final FileIO io;
private final EncryptionManager encryption;
private final ScanContext context;

private transient RowDataIterator iterator;

FlinkInputFormat(TableLoader tableLoader, FileIO io, EncryptionManager encryption, ScanContext context) {
FlinkInputFormat(TableLoader tableLoader, Schema tableSchema, FileIO io, EncryptionManager encryption,
ScanContext context) {
this.tableLoader = tableLoader;
this.tableSchema = tableSchema;
this.io = io;
this.encryption = encryption;
this.context = context;
@@ -88,7 +91,8 @@ public void configure(Configuration parameters) {
@Override
public void open(FlinkInputSplit split) {
this.iterator = new RowDataIterator(
split.getTask(), io, encryption, context.projectedSchema(), context.nameMapping(), context.caseSensitive());
split.getTask(), io, encryption, tableSchema, context.projectedSchema(), context.nameMapping(),
context.caseSensitive());
}

@Override
@@ -180,7 +180,7 @@ public FlinkInputFormat buildFormat() {
context = context.project(projectedSchema == null ? icebergSchema :
FlinkSchemaUtil.convert(icebergSchema, projectedSchema));

return new FlinkInputFormat(tableLoader, io, encryption, context);
return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, context);
}

public DataStream<RowData> build() {
@@ -25,14 +25,19 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.data.FlinkAvroReader;
import org.apache.iceberg.flink.data.FlinkOrcReader;
import org.apache.iceberg.flink.data.FlinkParquetReaders;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
@@ -43,13 +48,15 @@

class RowDataIterator extends DataIterator<RowData> {

private final Schema tableSchema;
private final Schema projectedSchema;
private final String nameMapping;
private final boolean caseSensitive;

RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema projectedSchema,
String nameMapping, boolean caseSensitive) {
RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema tableSchema,
Schema projectedSchema, String nameMapping, boolean caseSensitive) {
super(task, io, encryption);
this.tableSchema = tableSchema;
this.projectedSchema = projectedSchema;
this.nameMapping = nameMapping;
this.caseSensitive = caseSensitive;
@@ -61,26 +68,29 @@ protected CloseableIterator<RowData> openTaskIterator(FileScanTask task) {

Map<Integer, ?> idToConstant = partitionSchema.columns().isEmpty() ? ImmutableMap.of() :
PartitionUtil.constantsMap(task, RowDataIterator::convertConstant);
CloseableIterable<RowData> iterable = newIterable(task, idToConstant);

FlinkDeleteFilter deletes = new FlinkDeleteFilter(task, tableSchema, projectedSchema);
CloseableIterable<RowData> iterable = deletes.filter(newIterable(task, deletes.requiredSchema(), idToConstant));

return iterable.iterator();
}

private CloseableIterable<RowData> newIterable(FileScanTask task, Map<Integer, ?> idToConstant) {
private CloseableIterable<RowData> newIterable(FileScanTask task, Schema schema, Map<Integer, ?> idToConstant) {
CloseableIterable<RowData> iter;
if (task.isDataTask()) {
throw new UnsupportedOperationException("Cannot read data task.");
} else {
switch (task.file().format()) {
case PARQUET:
iter = newParquetIterable(task, idToConstant);
iter = newParquetIterable(task, schema, idToConstant);
break;

case AVRO:
iter = newAvroIterable(task, idToConstant);
iter = newAvroIterable(task, schema, idToConstant);
break;

case ORC:
iter = newOrcIterable(task, idToConstant);
iter = newOrcIterable(task, schema, idToConstant);
break;

default:
@@ -92,12 +102,12 @@ private CloseableIterable<RowData> newIterable(FileScanTask task, Map<Integer, ?
return iter;
}

private CloseableIterable<RowData> newAvroIterable(FileScanTask task, Map<Integer, ?> idToConstant) {
private CloseableIterable<RowData> newAvroIterable(FileScanTask task, Schema schema, Map<Integer, ?> idToConstant) {
Avro.ReadBuilder builder = Avro.read(getInputFile(task))
.reuseContainers()
.project(projectedSchema)
.project(schema)
.split(task.start(), task.length())
.createReaderFunc(readSchema -> new FlinkAvroReader(projectedSchema, readSchema, idToConstant));
.createReaderFunc(readSchema -> new FlinkAvroReader(schema, readSchema, idToConstant));

if (nameMapping != null) {
builder.withNameMapping(NameMappingParser.fromJson(nameMapping));
@@ -106,12 +116,13 @@ private CloseableIterable<RowData> newAvroIterable(FileScanTask task, Map<Intege
return builder.build();
}

private CloseableIterable<RowData> newParquetIterable(FileScanTask task, Map<Integer, ?> idToConstant) {
private CloseableIterable<RowData> newParquetIterable(FileScanTask task, Schema schema,
Map<Integer, ?> idToConstant) {
Parquet.ReadBuilder builder = Parquet.read(getInputFile(task))
.reuseContainers()
.split(task.start(), task.length())
.project(projectedSchema)
.createReaderFunc(fileSchema -> FlinkParquetReaders.buildReader(projectedSchema, fileSchema, idToConstant))
.project(schema)
.createReaderFunc(fileSchema -> FlinkParquetReaders.buildReader(schema, fileSchema, idToConstant))
.filter(task.residual())
.caseSensitive(caseSensitive)
.reuseContainers();
@@ -123,14 +134,14 @@ private CloseableIterable<RowData> newParquetIterable(FileScanTask task, Map<Int
return builder.build();
}

private CloseableIterable<RowData> newOrcIterable(FileScanTask task, Map<Integer, ?> idToConstant) {
Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(projectedSchema,
private CloseableIterable<RowData> newOrcIterable(FileScanTask task, Schema schema, Map<Integer, ?> idToConstant) {
Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(schema,
Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds()));

ORC.ReadBuilder builder = ORC.read(getInputFile(task))
.project(readSchemaWithoutConstantAndMetadataFields)
.split(task.start(), task.length())
.createReaderFunc(readOrcSchema -> new FlinkOrcReader(projectedSchema, readOrcSchema, idToConstant))
.createReaderFunc(readOrcSchema -> new FlinkOrcReader(schema, readOrcSchema, idToConstant))
.filter(task.residual())
.caseSensitive(caseSensitive);

@@ -140,4 +151,23 @@ private CloseableIterable<RowData> newOrcIterable(FileScanTask task, Map<Integer

return builder.build();
}

private class FlinkDeleteFilter extends DeleteFilter<RowData> {
private final RowDataWrapper asStructLike;

FlinkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) {
super(task, tableSchema, requestedSchema);
this.asStructLike = new RowDataWrapper(FlinkSchemaUtil.convert(requiredSchema()), requiredSchema().asStruct());
}

@Override
protected StructLike asStructLike(RowData row) {
return asStructLike.wrap(row);
}

@Override
protected InputFile getInputFile(String location) {
return RowDataIterator.this.getInputFile(location);
}
}
}
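
Note: FlinkDeleteFilter is a thin subclass of the core org.apache.iceberg.data.DeleteFilter. The base class computes requiredSchema(), which extends the requested projection with any columns referenced by equality deletes and, when position deletes are present, the _pos metadata column; that is why the Avro/Parquet/ORC builders above now project the filter's required schema rather than the raw projectedSchema, and why FlinkParquetReaders gained the ROW_POSITION branch. asStructLike adapts each RowData for equality-delete comparison, and getInputFile routes delete-file reads through the iterator's decrypted-file cache. A minimal end-to-end usage sketch (the environment setup and table path are placeholders; builder options may differ between Iceberg versions):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

public class ReadWithDeletesSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Placeholder table location; rows emitted by the source have row-level deletes applied.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://namenode:8020/warehouse/db/tbl");

    DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        .build();

    stream.print();
    env.execute("read iceberg table with deletes applied");
  }
}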