refactor(ingest): Call source_helpers via new WorkUnitProcessors in base Source #8101

Merged · 8 commits · May 24, 2023

Changes from 7 commits
54 changes: 51 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -3,7 +3,21 @@
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
-from typing import Dict, Generic, Iterable, Optional, Set, Type, TypeVar, Union, cast
+from functools import partial
+from typing import (
+    Callable,
+    Dict,
+    Generic,
+    Iterable,
+    List,
+    Optional,
+    Sequence,
+    Set,
+    Type,
+    TypeVar,
+    Union,
+    cast,
+)

from pydantic import BaseModel

@@ -12,6 +26,11 @@
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
from datahub.ingestion.api.report import Report
+from datahub.ingestion.api.source_helpers import (
+    auto_materialize_referenced_tags,
+    auto_status_aspect,
+    auto_workunit_reporter,
+)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.utilities.lossy_collections import LossyDict, LossyList
@@ -118,6 +137,9 @@ class TestConnectionReport(Report):
WorkUnitType = TypeVar("WorkUnitType", bound=WorkUnit)
ExtractorConfig = TypeVar("ExtractorConfig", bound=ConfigModel)

+WorkUnitProcessor = Callable[[Iterable[WorkUnitType]], Iterable[WorkUnitType]]
+MetadataWorkUnitProcessor = WorkUnitProcessor[MetadataWorkUnit]
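For orientation: a WorkUnitProcessor is any callable that consumes a stream of workunits and returns a (possibly transformed) stream, usually written as a generator. A minimal sketch of a conforming processor; the function name and filtering rule are illustrative, not part of this PR:

```python
from typing import Iterable

from datahub.ingestion.api.workunit import MetadataWorkUnit


def drop_skipped_workunits(
    stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
    """Matches MetadataWorkUnitProcessor: an iterable of workunits in, one out."""
    for wu in stream:
        # A real processor could filter, transform, or emit extra workunits here.
        if not wu.id.startswith("skip-"):  # hypothetical filtering rule
            yield wu
```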


class Extractor(Generic[WorkUnitType, ExtractorConfig], Closeable, metaclass=ABCMeta):
ctx: PipelineContext
@@ -155,9 +177,35 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
# can't make this method abstract.
raise NotImplementedError('sources must implement "create"')

-    @abstractmethod
+    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
+        """A list of functions that transform the workunits produced by this source.
Collaborator: So the returned list can have None? Not clear to me why we need Optional[MetadataWorkUnitProcessor] as opposed to simply MetadataWorkUnitProcessor.

Collaborator (Author): Yeah, the idea is that we can do something like return [*super().get_workunit_processors(), other_workunit_processor if self.config.flag else None].
+        Run in order; the first processor in the list is applied first. Be careful with ordering when overriding.
+        """
+        return [
+            auto_status_aspect,
+            auto_materialize_referenced_tags,
+            partial(auto_workunit_reporter, self.get_report()),
+        ]
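The override pattern discussed in the thread above, as a hedged sketch: MySource, enable_extra, and my_processor are hypothetical names, and the other members a concrete Source must provide are omitted.

```python
from typing import Iterable, List, Optional

from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source
from datahub.ingestion.api.workunit import MetadataWorkUnit


def my_processor(stream: Iterable[MetadataWorkUnit]) -> Iterable[MetadataWorkUnit]:
    yield from stream  # placeholder transformation


class MySource(Source):  # hypothetical; create(), get_report(), etc. omitted
    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
        # Base processors run first; a None entry is skipped entirely by
        # _apply_workunit_processors, which is what makes it conditional.
        return [
            *super().get_workunit_processors(),
            my_processor if self.config.enable_extra else None,  # hypothetical flag
        ]
```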

+    @staticmethod
+    def _apply_workunit_processors(
+        workunit_processors: Sequence[Optional[MetadataWorkUnitProcessor]],
+        stream: Iterable[MetadataWorkUnit],
+    ) -> Iterable[MetadataWorkUnit]:
+        for processor in workunit_processors:
+            if processor is not None:
+                stream = processor(stream)
+        return stream

    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
-        pass
+        return self._apply_workunit_processors(
+            self.get_workunit_processors(), self.get_workunits_internal()
+        )

+    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
+        raise NotImplementedError(
+            "get_workunits_internal must be implemented if get_workunits is not overridden."
+        )
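With this contract, a typical source now implements only get_workunits_internal and inherits get_workunits, which chains the processors around it. A minimal sketch, assuming a hypothetical source class and dataset urn:

```python
from typing import Iterable

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source import Source
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import StatusClass


class MyMinimalSource(Source):  # hypothetical; create(), get_report(), etc. omitted
    def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
        # Yield raw workunits only; status aspects, tag materialization, and
        # reporting are layered on by the inherited get_workunits().
        yield MetadataChangeProposalWrapper(
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:demo,my.table,PROD)",
            aspect=StatusClass(removed=False),
        ).as_workunit()
```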

@abstractmethod
def get_report(self) -> SourceReport:
24 changes: 10 additions & 14 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
@@ -1,12 +1,7 @@
-from typing import Callable, Iterable, Optional, Set, TypeVar, Union
+from typing import TYPE_CHECKING, Callable, Iterable, Optional, Set, TypeVar, Union

from datahub.emitter.mcp import MetadataChangeProposalWrapper
-from datahub.ingestion.api.common import WorkUnit
-from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
-from datahub.ingestion.source.state.stale_entity_removal_handler import (
-    StaleEntityRemovalHandler,
-)
from datahub.metadata.schema_classes import (
MetadataChangeEventClass,
MetadataChangeProposalClass,
@@ -17,6 +12,12 @@
from datahub.utilities.urns.urn import guess_entity_type
from datahub.utilities.urns.urn_iter import list_urns

+if TYPE_CHECKING:
+    from datahub.ingestion.api.source import SourceReport
+    from datahub.ingestion.source.state.stale_entity_removal_handler import (
+        StaleEntityRemovalHandler,
+    )


def auto_workunit(
stream: Iterable[Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]]
@@ -78,7 +79,7 @@ def _default_entity_type_fn(wu: MetadataWorkUnit) -> Optional[str]:


def auto_stale_entity_removal(
-    stale_entity_removal_handler: StaleEntityRemovalHandler,
+    stale_entity_removal_handler: "StaleEntityRemovalHandler",
stream: Iterable[MetadataWorkUnit],
entity_type_fn: Callable[
[MetadataWorkUnit], Optional[str]
@@ -104,10 +105,10 @@ def auto_stale_entity_removal(
yield from stale_entity_removal_handler.gen_removed_entity_workunits()


T = TypeVar("T", bound=WorkUnit)
T = TypeVar("T", bound=MetadataWorkUnit)


-def auto_workunit_reporter(report: SourceReport, stream: Iterable[T]) -> Iterable[T]:
+def auto_workunit_reporter(report: "SourceReport", stream: Iterable[T]) -> Iterable[T]:
"""
Calls report.report_workunit() on each workunit.
"""
@@ -119,14 +120,9 @@ def auto_workunit_reporter(report: SourceReport, stream: Iterable[T]) -> Iterable[T]:

def auto_materialize_referenced_tags(
stream: Iterable[MetadataWorkUnit],
-    active: bool = True,
Collaborator (Author): Now handled by passing in None for the workunit processor, although I don't think this was ever passed as False.
) -> Iterable[MetadataWorkUnit]:
"""For all references to tags, emit a tag key aspect to ensure that the tag exists in our backend."""

-    if not active:
-        yield from stream
-        return

referenced_tags = set()
tags_with_aspects = set()
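The rest of the body is truncated in this view; below is a condensed sketch of the pattern the docstring describes, reusing this module's imports (list_urns, guess_entity_type, MetadataChangeProposalWrapper, TagKeyClass) and simplified relative to the real implementation:

```python
def _materialize_tags_sketch(
    stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
    referenced_tags: Set[str] = set()
    for wu in stream:
        # Collect every tag urn referenced anywhere in the workunit's metadata.
        referenced_tags.update(
            urn for urn in list_urns(wu.metadata) if guess_entity_type(urn) == "tag"
        )
        yield wu
    # Once the stream is exhausted, emit a TagKey aspect for each referenced tag
    # so the tag entity is guaranteed to exist in the backend.
    for tag_urn in referenced_tags:
        yield MetadataChangeProposalWrapper(
            entityUrn=tag_urn,
            aspect=TagKeyClass(name=tag_urn[len("urn:li:tag:"):]),
        ).as_workunit()
```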

32 changes: 8 additions & 24 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -48,11 +48,7 @@
platform_name,
support_status,
)
-from datahub.ingestion.api.source_helpers import (
-    auto_stale_entity_removal,
-    auto_status_aspect,
-    auto_workunit_reporter,
-)
+from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws import s3_util
from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
@@ -62,9 +58,6 @@
DatasetSubTypes,
)
from datahub.ingestion.source.glue_profiling_config import GlueProfilingConfig
-from datahub.ingestion.source.state.sql_common_state import (
-    BaseSQLAlchemyCheckpointState,
-)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
@@ -273,15 +266,6 @@ def __init__(self, config: GlueSourceConfig, ctx: PipelineContext):
self.extract_transforms = config.extract_transforms
self.env = config.env

-        # Create and register the stateful ingestion use-case handlers.
-        self.stale_entity_removal_handler = StaleEntityRemovalHandler(
-            source=self,
-            config=self.source_config,
-            state_type_class=BaseSQLAlchemyCheckpointState,
-            pipeline_name=self.ctx.pipeline_name,
-            run_id=self.ctx.run_id,
-        )

def get_glue_arn(
self, account_id: str, database: str, table: Optional[str] = None
) -> str:
@@ -919,13 +903,13 @@ def _get_domain_wu(
domain_urn=domain_urn,
)

-    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
-        return auto_stale_entity_removal(
-            self.stale_entity_removal_handler,
-            auto_workunit_reporter(
-                self.report, auto_status_aspect(self.get_workunits_internal())
-            ),
-        )
+    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
+        return [
+            *super().get_workunit_processors(),
+            StaleEntityRemovalHandler.create(
+                self, self.source_config, self.ctx
+            ).workunit_processor,
+        ]

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
database_seen = set()
metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py
@@ -30,16 +30,11 @@
)
from datahub.ingestion.api.source import (
CapabilityReport,
+    MetadataWorkUnitProcessor,
SourceCapability,
TestableSource,
TestConnectionReport,
)
-from datahub.ingestion.api.source_helpers import (
-    auto_materialize_referenced_tags,
-    auto_stale_entity_removal,
-    auto_status_aspect,
-    auto_workunit_reporter,
-)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.bigquery_v2.bigquery_audit import (
BigqueryTableIdentifier,
@@ -80,9 +75,6 @@
from datahub.ingestion.source.state.redundant_run_skip_handler import (
RedundantRunSkipHandler,
)
-from datahub.ingestion.source.state.sql_common_state import (
-    BaseSQLAlchemyCheckpointState,
-)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
@@ -228,15 +220,6 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
self.lineage_extractor = BigqueryLineageExtractor(config, self.report)
self.usage_extractor = BigQueryUsageExtractor(config, self.report)

-        # Create and register the stateful ingestion use-case handler.
-        self.stale_entity_removal_handler = StaleEntityRemovalHandler(
-            source=self,
-            config=self.config,
-            state_type_class=BaseSQLAlchemyCheckpointState,
-            pipeline_name=self.ctx.pipeline_name,
-            run_id=self.ctx.run_id,
-        )

self.domain_registry: Optional[DomainRegistry] = None
if self.config.domain:
self.domain_registry = DomainRegistry(
@@ -491,6 +474,14 @@ def gen_dataset_containers(
tags=tags_joined,
)

+    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
+        return [
+            *super().get_workunit_processors(),
+            StaleEntityRemovalHandler.create(
+                self, self.config, self.ctx
+            ).workunit_processor,
+        ]
Collaborator: Stale entity removal would emit some workunits which won't be reported, I believe, as the workunit_processor for stale entity removal comes after auto_workunit_reporter. The same was the case earlier, so I'm guessing that's okay.

Collaborator (Author): Yeah, I was trying to match existing behavior. If we want it the other way we can always make the change.
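To make the ordering point concrete, a toy demonstration (plain strings stand in for workunits; none of this is DataHub code) of why units emitted by a later processor bypass an earlier reporter:

```python
from typing import Iterable


def reporter(stream: Iterable[str]) -> Iterable[str]:
    for wu in stream:
        print(f"reported: {wu}")  # stands in for report.report_workunit(wu)
        yield wu


def stale_removal(stream: Iterable[str]) -> Iterable[str]:
    yield from stream
    yield "soft-delete-wu"  # extra workunit emitted after the input is drained


stream: Iterable[str] = iter(["wu-1", "wu-2"])
for processor in [reporter, stale_removal]:  # list order = application order
    stream = processor(stream)

print(list(stream))
# Prints "reported: wu-1" and "reported: wu-2", then
# ['wu-1', 'wu-2', 'soft-delete-wu']: the soft-delete workunit reaches the
# sink but is never reported, because stale_removal wraps the reporter.
```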


def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
conn: bigquery.Client = get_bigquery_client(self.config)
self.add_config_to_report()
@@ -514,17 +505,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.report.set_ingestion_stage(project.id, "Lineage Extraction")
yield from self.generate_lineage(project.id)

-    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
-        return auto_materialize_referenced_tags(
-            auto_stale_entity_removal(
-                self.stale_entity_removal_handler,
-                auto_workunit_reporter(
-                    self.report,
-                    auto_status_aspect(self.get_workunits_internal()),
-                ),
-            )
-        )

def _should_ingest_usage(self) -> bool:
if not self.config.include_usage_statistics:
return False
30 changes: 8 additions & 22 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -31,12 +31,7 @@
platform_name,
support_status,
)
-from datahub.ingestion.api.source_helpers import (
-    auto_materialize_referenced_tags,
-    auto_stale_entity_removal,
-    auto_status_aspect,
-    auto_workunit_reporter,
-)
+from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.sql.sql_types import (
@@ -50,7 +45,6 @@
resolve_trino_modified_type,
resolve_vertica_modified_type,
)
-from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
@@ -688,12 +682,8 @@ def __init__(self, config: DBTCommonConfig, ctx: PipelineContext, platform: str)
self.config.owner_extraction_pattern
)
# Create and register the stateful ingestion use-case handler.
-        self.stale_entity_removal_handler = StaleEntityRemovalHandler(
-            source=self,
-            config=self.config,
-            state_type_class=GenericCheckpointState,
-            pipeline_name=self.ctx.pipeline_name,
-            run_id=self.ctx.run_id,
+        self.stale_entity_removal_handler = StaleEntityRemovalHandler.create(
+            self, self.config, ctx
        )

Collaborator (Author): This gets used to call self.stale_entity_removal_handler.add_urn_to_skip(node_datahub_urn) at some point.

def create_test_entity_mcps(
@@ -878,15 +868,11 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]:
# return dbt nodes + global custom properties
raise NotImplementedError()

-    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
-        return auto_materialize_referenced_tags(
-            auto_stale_entity_removal(
-                self.stale_entity_removal_handler,
-                auto_workunit_reporter(
-                    self.report, auto_status_aspect(self.get_workunits_internal())
-                ),
-            )
-        )
+    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
+        return [
+            *super().get_workunit_processors(),
+            self.stale_entity_removal_handler.workunit_processor,
+        ]

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if self.config.write_semantics == "PATCH" and not self.ctx.graph:
9 changes: 6 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/file.py
@@ -6,8 +6,9 @@
from collections import defaultdict
from dataclasses import dataclass, field
from enum import auto
+from functools import partial
from io import BufferedReader
-from typing import Any, Dict, Iterable, Iterator, Optional, Tuple, Union
+from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union
from urllib import parse

import ijson
@@ -28,6 +29,7 @@
)
from datahub.ingestion.api.source import (
CapabilityReport,
+    MetadataWorkUnitProcessor,
SourceReport,
TestableSource,
TestConnectionReport,
@@ -205,8 +207,9 @@ def get_filenames(self) -> Iterable[str]:
self.report.total_num_files = 1
return [str(self.config.path)]

-    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
-        return auto_workunit_reporter(self.report, self.get_workunits_internal())
+    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
+        # No super() call, as we don't want helpers that create / remove workunits
+        return [partial(auto_workunit_reporter, self.report)]
Collaborator: Add a comment that not calling super() is intentional here.

def get_workunits_internal(
self,
Expand Down