Skip to content

Commit

Permalink
fix(api): fix filter-pipeline to always copy record context between f…
Browse files Browse the repository at this point in the history
…ilters (#235)

Resolves: #235
  • Loading branch information
fhussonnois committed Mar 21, 2022
1 parent f639209 commit e8cbbf2
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,15 @@ public RecordsIterable<FileRecord<TypedStruct>> apply(final RecordsIterable<File
while (iterator.hasNext()) {
FileRecord<TypedStruct> record = iterator.next();
boolean doHasNext = hasNext || iterator.hasNext();
FilterContext context = getContextFor(record.offset(), this.context.metadata());
// Create new context for current record and file-object metadata.
FilterContext context = newContextFor(record.offset(), this.context.metadata());
// Apply the filter-chain on current record.
results.addAll(apply(context, record.value(), doHasNext));
}
return new RecordsIterable<>(results);
}

private FilterContext getContextFor(final FileRecordOffset offset,
private FilterContext newContextFor(final FileRecordOffset offset,
final FileObjectMeta metadata) {
return FilterContextBuilder
.newBuilder()
Expand Down Expand Up @@ -141,7 +143,7 @@ private class FilterNode {
* Creates a new {@link FilterNode} instance.
*
* @param filter the current filter.
* @param onSuccess the next filter ot be apply on success.
* @param onSuccess the next filter ot be applied on success.
*/
private FilterNode(final RecordFilter filter,
final FilterNode onSuccess) {
Expand Down Expand Up @@ -181,7 +183,8 @@ public List<FileRecord<TypedStruct>> apply(final FilterContext context,
LOG.error(
"Error occurred while executing filter '{}' on record='{}'",
filter.label(),
record);
record
);
throw e;
}
// Some filters can aggregate records which follow each other by maintaining internal buffers.
Expand All @@ -194,15 +197,17 @@ public List<FileRecord<TypedStruct>> apply(final FilterContext context,
filtered.addAll(flushed);

if (filter.onFailure() != null) {
final FilterContext errorContext = FilterContextBuilder.newBuilder(context)
final FilterContext errorContext = FilterContextBuilder
.newBuilder(context)
.withError(FilterError.of(e, filter.label()))
.build();
filtered.addAll(filter.onFailure().apply(errorContext, record, hasNext));
// Ignore exception (i.e. ignoreFailure = true)
} else {
if (onSuccess != null) {
filtered.addAll(onSuccess.apply(context, record, hasNext));
} else {
filtered.add(new TypedFileRecord(context.offset(), record));
filtered.add(newRecordFor(context, record));
}
}
return filtered;
Expand All @@ -214,17 +219,16 @@ public List<FileRecord<TypedStruct>> apply(final FilterContext context,
filtered.addAll(onSuccess.apply(context, record, hasNext));
} else {
if (!hasNext) {
final List<FileRecord<TypedStruct>> flushed = flush(context);
filtered.addAll(flushed);
filtered.addAll(flush(context));
}
// add current record to filtered result.
filtered.add(new TypedFileRecord(context.offset(), record));
// add current record to filtered result and make sure to copy all context information.
filtered.add(newRecordFor(context, record));
}
return filtered;
}

private TypedFileRecord newRecordFor(final FilterContext context, final TypedStruct s) {
return new TypedFileRecord(context.offset(), s)
private TypedFileRecord newRecordFor(final FilterContext context, final TypedStruct object) {
return new TypedFileRecord(context.offset(), object)
.withTopic(context.topic())
.withPartition(context.partition())
.withTimestamp(context.timestamp())
Expand All @@ -248,7 +252,7 @@ List<FileRecord<TypedStruct>> flush(final FilterContext context) {
while (iterator.hasNext()) {
final FileRecord<TypedStruct> record = iterator.next();
// create a new context for buffered records.
final FilterContext renewedContext = getContextFor(record.offset(), context.metadata());
final FilterContext renewedContext = newContextFor(record.offset(), context.metadata());
filtered.addAll(onSuccess.apply(renewedContext, record.value(), iterator.hasNext()));
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ RecordsIterable<TypedStruct> apply(final FilterContext context,

/**
* Clears all internal states (i.s buffered records)
* This method is invoke each time records from a new file is starting to be filtered.
* This method is invoked each time records from a new file is starting to be filtered.
*/
default void clear() {

Expand Down

0 comments on commit e8cbbf2

Please sign in to comment.