Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink: Emit watermarks from the IcebergSource #8553

Merged
merged 15 commits into from
Nov 23, 2023
Merged

Conversation

pvary
Copy link
Contributor

@pvary pvary commented Sep 12, 2023

There are several cases when it is worthwhile to emitt watermarks from the source.
In these cases we need to make sure that the emitted watermarks are in order.
If we do this correctly we could be use:

  • Windowing operators on the source
  • Watermark Alignment to throttle runaway sources

The PR has 2 main additions to the current IcebergSource:

  • Adding a new IcebergSourceRecordEmitter interface to do the actual record emission
    • Default implementation which does only the record emission
    • EventTimeExtractorRecordEmitter which emits the watermark for every split and emits the record too
  • Adding a new IcebergEventTimeExtractor interface to extract the split watermark, and the record timestamp

The solution uses the OrderedSplitAssignerFactory to order the splits based on the split watermark, and enhances the IcebergSource so the users could provide the eventTimeExtractor

@jerqi
Copy link
Contributor

jerqi commented Sep 13, 2023

Minor typo about title

Emitt -> Emit

@pvary pvary changed the title Flink: Emitt watermarks from the IcebergSource Flink: Emit watermarks from the IcebergSource Sep 13, 2023
@pvary
Copy link
Contributor Author

pvary commented Sep 13, 2023

Minor typo about title

Emitt -> Emit

Fixed, thanks!

@pvary
Copy link
Contributor Author

pvary commented Sep 14, 2023

cc: @sundargates - you might be interested in this

@stevenzwu
Copy link
Contributor

Here is the design doc that discussed the proposal from this PR compared to a previously approach: https://docs.google.com/document/d/1JCF77SkeB8Z4h5pXRCRcMh4aiAVuBzHM5IwohLwNCto.

We are going with this approach due to simpler implementation and reuse of most of what Flink framework provided in terms of watermark alignment.

@pvary
Copy link
Contributor Author

pvary commented Oct 11, 2023

Created #8803 to enable the possibility to avoid keeping all of the stats when creating the ScanTasks

@pvary pvary force-pushed the watermark branch 2 times, most recently from 4e92111 to 372f93d Compare November 15, 2023 12:06
@pvary
Copy link
Contributor Author

pvary commented Nov 15, 2023

@stevenzwu, @sundargates: Finally, I was able to get #8803 in. So I updated the PR, and the new versions allows to set a Timestamp column name as a watermark source column:

 IcebergSource.<RowData>builder()
        .tableLoader(sourceTableResource.tableLoader())
        .watermarkColumn("ts") 

Please review!
Thanks!

Copy link
Contributor

@stevenzwu stevenzwu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still need to check the unit test code. didn't quite get it on the first look

});

// Use static variable to collect the windows, since other solutions were flaky
windows.clear();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is somewhat inelegant, but the windowed.collectAsync() solution was flaky. Failed once per ~1000 runs. Even though the window operator was called, and emitted the records (checked by logs), the results did not show up in the resultIterator. Maybe some Flink glitch?

So opted for a static ConcurrentMap instead to collect the generated windows.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure how the windows variable work here. won't it be serialized and shipped to the task manager?

If the windowing testing is flaky, I would be inclined to just remove this. I feel the throttling test method below this is more relevant and clear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I would use a non-static windows variable, then it will be serialized, and I could not use it in my test. That is why I had to use a static variable - which only works correctly in tests which runs everything in one JVM.

Flakiness is solved by this change.

Copy link
Contributor

@stevenzwu stevenzwu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one more nit comment on Javadoc.

otherwise, LGTM

@@ -429,6 +444,30 @@ public Builder<T> setAll(Map<String, String> properties) {
return this;
}

/**
* Emits watermarks once per split based on the file statistics for the given split. The
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: based on the file statistics -> based on the min value of column statistics from file metadata

@@ -453,6 +492,18 @@ public IcebergSource<T> build() {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
}

SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
if (watermarkColumn != null) {
Copy link
Contributor

@stevenzwu stevenzwu Nov 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with tabling this as a follow-up.

  • The default combining logic is fine for batch processing where ordering doesn't matter. for streaming read and watermark alignment, ordering is important as it affects data buffering in the flink state. That is precisely what watermark alignment intended to solve.
  • Flink watermark alignment has been designed mostly with unbounded splits (like Kafka) in mind. also within an unbounded Kafka split, records are FIFO. there is a natural order within a split.

@pvary not sure every users understand the internal details. At least, we can at least document this option of disabling combining for better ordering in the doc then. Then users can make an informed choice. @dchristle can probably also chime in here and help review the doc change in a follow up PR.

this.eventTimeFieldId = field.fieldId();
this.eventTimeFieldName = eventTimeFieldName;
// Use the timeUnit only for Long columns.
this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pvary let's create an issue to follow up on the time unit when PR #9008 is merged

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the pointer!
Created #9137 to tackle this

});

// Use static variable to collect the windows, since other solutions were flaky
windows.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure how the windows variable work here. won't it be serialized and shipped to the task manager?

If the windowing testing is flaky, I would be inclined to just remove this. I feel the throttling test method below this is more relevant and clear.

@pvary pvary merged commit 0831eb0 into apache:main Nov 23, 2023
45 checks passed
@pvary
Copy link
Contributor Author

pvary commented Nov 23, 2023

Merged to main.
Thanks @stevenzwu, @dchristle, @sundargates, @gyfora and @jerqi for the reviews!

@pvary pvary deleted the watermark branch November 23, 2023 09:41
pvary pushed a commit to pvary/iceberg that referenced this pull request Nov 23, 2023
pvary pushed a commit to pvary/iceberg that referenced this pull request Nov 23, 2023
pvary pushed a commit to pvary/iceberg that referenced this pull request Nov 24, 2023
pvary pushed a commit to pvary/iceberg that referenced this pull request Nov 24, 2023
pvary pushed a commit to pvary/iceberg that referenced this pull request Nov 24, 2023
pvary pushed a commit to pvary/iceberg that referenced this pull request Nov 24, 2023
pvary pushed a commit to pvary/iceberg that referenced this pull request Nov 24, 2023
pvary pushed a commit to pvary/iceberg that referenced this pull request Nov 24, 2023
pvary pushed a commit to pvary/iceberg that referenced this pull request Nov 27, 2023
pvary added a commit that referenced this pull request Nov 28, 2023
lisirrx pushed a commit to lisirrx/iceberg that referenced this pull request Jan 4, 2024
mas-chen pushed a commit to mas-chen/iceberg that referenced this pull request Jan 9, 2024
rodmeneses pushed a commit to rodmeneses/iceberg that referenced this pull request Feb 19, 2024
rodmeneses added a commit to rodmeneses/iceberg that referenced this pull request Feb 19, 2024
devangjhabakh pushed a commit to cdouglas/iceberg that referenced this pull request Apr 22, 2024
devangjhabakh pushed a commit to cdouglas/iceberg that referenced this pull request Apr 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants