diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java index 58d2cdb27..bec0b8617 100644 --- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java @@ -96,13 +96,15 @@ public RecordsIterable> apply(final RecordsIterable record = iterator.next(); boolean doHasNext = hasNext || iterator.hasNext(); - FilterContext context = getContextFor(record.offset(), this.context.metadata()); + // Create new context for current record and file-object metadata. + FilterContext context = newContextFor(record.offset(), this.context.metadata()); + // Apply the filter-chain on current record. results.addAll(apply(context, record.value(), doHasNext)); } return new RecordsIterable<>(results); } - private FilterContext getContextFor(final FileRecordOffset offset, + private FilterContext newContextFor(final FileRecordOffset offset, final FileObjectMeta metadata) { return FilterContextBuilder .newBuilder() @@ -141,7 +143,7 @@ private class FilterNode { * Creates a new {@link FilterNode} instance. * * @param filter the current filter. - * @param onSuccess the next filter ot be apply on success. + * @param onSuccess the next filter to be applied on success. */ private FilterNode(final RecordFilter filter, final FilterNode onSuccess) { @@ -181,7 +183,8 @@ public List> apply(final FilterContext context, LOG.error( "Error occurred while executing filter '{}' on record='{}'", filter.label(), - record); + record + ); throw e; } // Some filters can aggregate records which follow each other by maintaining internal buffers. 
@@ -194,15 +197,17 @@ public List> apply(final FilterContext context, filtered.addAll(flushed); if (filter.onFailure() != null) { - final FilterContext errorContext = FilterContextBuilder.newBuilder(context) + final FilterContext errorContext = FilterContextBuilder + .newBuilder(context) .withError(FilterError.of(e, filter.label())) .build(); filtered.addAll(filter.onFailure().apply(errorContext, record, hasNext)); + // Ignore exception (i.e. ignoreFailure = true) } else { if (onSuccess != null) { filtered.addAll(onSuccess.apply(context, record, hasNext)); } else { - filtered.add(new TypedFileRecord(context.offset(), record)); + filtered.add(newRecordFor(context, record)); } } return filtered; @@ -214,17 +219,16 @@ public List> apply(final FilterContext context, filtered.addAll(onSuccess.apply(context, record, hasNext)); } else { if (!hasNext) { - final List> flushed = flush(context); - filtered.addAll(flushed); + filtered.addAll(flush(context)); } - // add current record to filtered result. - filtered.add(new TypedFileRecord(context.offset(), record)); + // add current record to filtered result and make sure to copy all context information. + filtered.add(newRecordFor(context, record)); } return filtered; } - private TypedFileRecord newRecordFor(final FilterContext context, final TypedStruct s) { - return new TypedFileRecord(context.offset(), s) + private TypedFileRecord newRecordFor(final FilterContext context, final TypedStruct object) { + return new TypedFileRecord(context.offset(), object) .withTopic(context.topic()) .withPartition(context.partition()) .withTimestamp(context.timestamp()) @@ -248,7 +252,7 @@ List> flush(final FilterContext context) { while (iterator.hasNext()) { final FileRecord record = iterator.next(); // create a new context for buffered records. 
- final FilterContext renewedContext = getContextFor(record.offset(), context.metadata()); + final FilterContext renewedContext = newContextFor(record.offset(), context.metadata()); filtered.addAll(onSuccess.apply(renewedContext, record.value(), iterator.hasNext())); } } else { diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/RecordFilter.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/RecordFilter.java index 19a08b29c..def66b883 100644 --- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/RecordFilter.java +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/RecordFilter.java @@ -73,7 +73,7 @@ RecordsIterable apply(final FilterContext context, /** * Clears all internal states (i.s buffered records) - * This method is invoke each time records from a new file is starting to be filtered. + * This method is invoked each time records from a new file start to be filtered. */ default void clear() {