refactor(ingest): Call source_helpers via new WorkUnitProcessors in base Source #8101
Conversation
@@ -119,14 +118,9 @@ def auto_workunit_reporter(report: SourceReport, stream: Iterable[T]) -> Iterabl

def auto_materialize_referenced_tags(
    stream: Iterable[MetadataWorkUnit],
    active: bool = True,
Now handled by passing in None for the workunit processor, although I don't think this was ever passed as False.
self.stale_entity_removal_handler = StaleEntityRemovalHandler.create(
    state_type_class=GenericCheckpointState,
    pipeline_name=self.ctx.pipeline_name,
    run_id=self.ctx.run_id,
)
This gets used to call self.stale_entity_removal_handler.add_urn_to_skip(node_datahub_urn) at some point.
@@ -1197,6 +1185,17 @@ def get_workspace_workunit(
    for workunit in dataset_workunits:
        yield workunit

def get_workunit_processors(self) -> Sequence[Optional[MetadataWorkUnitProcessor]]:
Pay attention to this file, some pretty non-standard behavior. Not sure if there's a cleaner way to do this... could directly override get_workunits as one option.
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    return auto_workunit_reporter(self.report, self.get_workunits_internal())

def get_workunit_processors(self) -> Sequence[Optional[MetadataWorkUnitProcessor]]:
    return [partial(auto_workunit_reporter, self.report)]
add a comment that not calling super() is intentional here
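A minimal sketch of what such an override with that comment might look like. The types here are simplified stand-ins for the real MetadataWorkUnitProcessor/SourceReport machinery, so names like TestSource and the list-based report are assumptions for illustration:

```python
from functools import partial
from typing import Callable, Iterable, List, Optional


def auto_workunit_reporter(report: list, stream: Iterable) -> Iterable:
    # Simplified stand-in for the real helper: record each workunit
    # on the report as it streams through, then pass it along.
    for wu in stream:
        report.append(wu)
        yield wu


class TestSource:
    def __init__(self, report: list) -> None:
        self.report = report

    def get_workunit_processors(self) -> List[Optional[Callable]]:
        # Intentionally not calling super() here: this source wants only
        # the reporter, none of the base class's default processors.
        return [partial(auto_workunit_reporter, self.report)]
```

The explicit comment makes clear to future readers that dropping the base class's defaults is deliberate rather than an oversight.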
@@ -155,9 +176,35 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
    # can't make this method abstract.
    raise NotImplementedError('sources must implement "create"')

@abstractmethod
def get_workunit_processors(self) -> Sequence[Optional[MetadataWorkUnitProcessor]]:
I think we should change this return type to List[Optional[MetadataWorkUnitProcessor]]
So we can append to it? I guess it's more specific.
not for append - we shouldn't be using append here
Sequence is a bit weird to work with in some places with mypy. We can always make it more general in the future, but let's not restrain ourselves unnecessarily
Oh really, what issues have you seen with Sequence? In general, I think we want to use Sequence (if possible) for parameters but List / other specific types for return types.
Actually i think sequences are fine. iirc the issue i've seen is that Iterable is not covariant (so passing Iterable[Square] to a function that takes Iterable[Shape] would throw an error), but sequence works fine there
In general, I think we want to use Sequence (if possible) for parameters but List / other specific types for return types
yup this seems like a good rule of thumb - might be worth adding to the list here https://datahubproject.io/docs/metadata-ingestion/developing/#code-style
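The variance behavior mypy enforces can be shown with a small example (the Shape/Square hierarchy is illustrative, not from the PR). List is invariant, so a List[Square] is not accepted where List[Shape] is expected; Sequence is read-only and covariant, so the same argument type-checks:

```python
from typing import List, Sequence


class Shape:
    pass


class Square(Shape):
    pass


def count_list(shapes: List[Shape]) -> int:
    return len(shapes)


def count_seq(shapes: Sequence[Shape]) -> int:
    return len(shapes)


squares: List[Square] = [Square(), Square()]

# mypy rejects this call: List is invariant, because count_list could
# legally append a non-Square Shape to the list it receives.
n_list = count_list(squares)  # type-check error under mypy; runs fine

# mypy accepts this call: Sequence has no mutating methods, so
# Sequence[Square] is safely usable as Sequence[Shape].
n_seq = count_seq(squares)
```

Both calls succeed at runtime; the difference only shows up under the type checker, which is exactly why Sequence is the friendlier parameter type.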
)

def get_current_checkpoint_from_pipeline(
we're gonna get merge conflicts on this with my PR
ref #8104
LGTM
LGTM
@@ -155,9 +177,35 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
    # can't make this method abstract.
    raise NotImplementedError('sources must implement "create"')

@abstractmethod
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
    """A list of functions that transforms the workunits produced by this source.
So the returned list can have None? Not clear to me why we need Optional[MetadataWorkUnitProcessor] as opposed to simply MetadataWorkUnitProcessor.
Yeah, the idea is we can do something like return [*super().get_workunit_processors(), other_workunit_processor if self.config.flag else None]
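A small sketch of that pattern with simplified stand-in types (WorkUnitProcessor, upper_processor, and the flag attribute are illustrative assumptions, not the real datahub names). Allowing None entries lets a subclass gate a processor on config inline, without conditional list-building:

```python
from typing import Callable, Iterable, List, Optional

# Hypothetical processor type: takes a workunit stream, returns a stream.
WorkUnitProcessor = Callable[[Iterable[str]], Iterable[str]]


def upper_processor(stream: Iterable[str]) -> Iterable[str]:
    # Toy processor standing in for a real source helper.
    for wu in stream:
        yield wu.upper()


class BaseSource:
    def get_workunit_processors(self) -> List[Optional[WorkUnitProcessor]]:
        return []  # the default helpers would live here


class FlaggedSource(BaseSource):
    def __init__(self, flag: bool) -> None:
        self.flag = flag

    def get_workunit_processors(self) -> List[Optional[WorkUnitProcessor]]:
        # A None entry means "processor disabled"; the caller skips it
        # when applying the chain, so no list surgery is needed here.
        return [
            *super().get_workunit_processors(),
            upper_processor if self.flag else None,
        ]
```

Without Optional in the signature, each subclass would have to build the list imperatively and append conditionally, which is exactly the append-style code the reviewers wanted to avoid.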
StaleEntityRemovalHandler.create(
    self, self.config, self.ctx
).workunit_processor,
]
Stale entity removal would emit some workunits, which won't be reported, I believe, since the workunit_processor for stale entity removal comes after auto_workunit_reporter. The same was the case earlier, so I'm guessing that's okay.
Yeah, was trying to match existing behavior. If we want it the other way we can always make the change
Somewhat serious refactor that provides a default implementation of get_workunits and changes get_workunits_internal to be the main overridden method. Adds get_workunit_processors(), which returns a list of callables that take in a WorkUnit stream and alter that stream (i.e. source helpers). Provides a default list of workunit processors (source helpers) that can be changed by overriding the method.
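The description above can be sketched as a base class that folds each non-None processor over the raw stream in order. This is a simplified reading of the refactor under stated assumptions (WorkUnit as a plain string, toy processor types), not the actual datahub implementation:

```python
from typing import Callable, Iterable, List, Optional

# Illustrative stand-ins: real workunits are MetadataWorkUnit objects.
WorkUnit = str
MetadataWorkUnitProcessor = Callable[[Iterable[WorkUnit]], Iterable[WorkUnit]]


class Source:
    def get_workunits_internal(self) -> Iterable[WorkUnit]:
        # Concrete sources override this to emit their raw workunit stream.
        raise NotImplementedError

    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
        # Default source helpers (reporter, status, etc.) would be listed
        # here; subclasses override to add, remove, or gate processors.
        return []

    def get_workunits(self) -> Iterable[WorkUnit]:
        # Default implementation: thread the raw stream through every
        # non-None processor, in list order.
        stream = self.get_workunits_internal()
        for processor in self.get_workunit_processors():
            if processor is not None:
                stream = processor(stream)
        return stream
```

A subclass then only implements get_workunits_internal and, when needed, adjusts the processor list, instead of reimplementing the helper-chaining logic in get_workunits.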