Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[assets] Precalculate dependedBy for better perf, fix foreign asset deps #5917

Merged
merged 2 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,14 @@ def resolve_dependencies(self, _graphene_info):
]

def resolve_dependedBy(self, _graphene_info):
results = []
for item in self._external_repository.get_external_asset_nodes():
for dep in item.dependencies:
if dep.upstream_asset_key == self._external_asset_node.asset_key:
results.append(
GrapheneAssetDependency(
external_repository=self._external_repository,
input_name=dep.input_name,
asset_key=item.asset_key,
)
)
return results
return [
GrapheneAssetDependency(
external_repository=self._external_repository,
input_name=dep.input_name,
asset_key=dep.downstream_asset_key,
)
for dep in self._external_asset_node.depended_by
]

def resolve_assetMaterializations(self, graphene_info, **kwargs):
from ..implementation.fetch_assets import get_asset_events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,29 @@ def __new__(cls, upstream_asset_key: AssetKey, input_name: str):
)


@whitelist_for_serdes
class ExternalAssetDependedBy(
    namedtuple("_ExternalAssetDependedBy", "downstream_asset_key input_name")
):
    """A definition of a directed edge in the logical asset graph.

    Describes the edge from the point of view of the upstream asset: it names a downstream
    asset that depends on this asset, together with the input name under which that
    dependency is recorded. It is the counterpart of ExternalAssetDependency, which stores
    the upstream asset key instead.

    Fields:
        downstream_asset_key: Key of the asset that depends on the asset holding this edge.
        input_name: Name of the input associated with the dependency; the same input name
            that is recorded on the matching ExternalAssetDependency.
    """

    def __new__(cls, downstream_asset_key: AssetKey, input_name: str):
        # Forward both fields unchanged to the underlying namedtuple constructor.
        return super(ExternalAssetDependedBy, cls).__new__(
            cls,
            downstream_asset_key=downstream_asset_key,
            input_name=input_name,
        )


@whitelist_for_serdes
class ExternalAssetNode(
namedtuple("_ExternalAssetNode", "asset_key dependencies op_name op_description job_names")
namedtuple(
"_ExternalAssetNode", "asset_key dependencies depended_by op_name op_description job_names"
)
):
"""A definition of a node in the logical asset graph.

Expand All @@ -423,6 +443,7 @@ def __new__(
cls,
asset_key: AssetKey,
dependencies: List[ExternalAssetDependency],
depended_by: List[ExternalAssetDependedBy],
op_name: Optional[str] = None,
op_description: Optional[str] = None,
job_names: Optional[List[str]] = None,
Expand All @@ -431,6 +452,7 @@ def __new__(
cls,
asset_key=asset_key,
dependencies=dependencies,
depended_by=depended_by,
op_name=op_name,
op_description=op_description,
job_names=job_names,
Expand Down Expand Up @@ -471,9 +493,11 @@ def external_asset_graph_from_defs(
node_defs_by_asset_key: Dict[
AssetKey, List[Tuple[NodeDefinition, PipelineDefinition]]
] = defaultdict(list)
deps: Dict[AssetKey, Dict[str, ExternalAssetDependency]] = defaultdict(dict)

deps: Dict[AssetKey, Dict[str, ExternalAssetDependency]] = defaultdict(dict)
dep_by: Dict[AssetKey, Dict[str, ExternalAssetDependedBy]] = defaultdict(dict)
all_upstream_asset_keys = set()

for pipeline in pipelines:
for node_def in pipeline.all_node_defs:
node_asset_keys: Set[AssetKey] = set()
Expand All @@ -488,12 +512,16 @@ def external_asset_graph_from_defs(
upstream_asset_key = input_def.hardcoded_asset_key

if upstream_asset_key:
all_upstream_asset_keys.add(upstream_asset_key)
for node_asset_key in node_asset_keys:
deps[node_asset_key][input_def.name] = ExternalAssetDependency(
upstream_asset_key=upstream_asset_key,
input_name=input_def.name,
)
all_upstream_asset_keys.add(upstream_asset_key)
dep_by[upstream_asset_key][input_def.name] = ExternalAssetDependedBy(
downstream_asset_key=node_asset_key,
input_name=input_def.name,
)

asset_keys_without_definitions = all_upstream_asset_keys.difference(
node_defs_by_asset_key.keys()
Expand All @@ -502,7 +530,8 @@ def external_asset_graph_from_defs(
asset_nodes = [
ExternalAssetNode(
asset_key=asset_key,
dependencies=[],
dependencies=list(deps[asset_key].values()),
depended_by=list(dep_by[asset_key].values()),
job_names=[],
)
for asset_key in asset_keys_without_definitions
Expand All @@ -518,7 +547,8 @@ def external_asset_graph_from_defs(
asset_nodes.append(
ExternalAssetNode(
asset_key=foreign_asset.key,
dependencies=[],
dependencies=list(deps[foreign_asset.key].values()),
depended_by=list(dep_by[foreign_asset.key].values()),
job_names=[],
op_description=foreign_asset.description,
)
Expand All @@ -531,6 +561,7 @@ def external_asset_graph_from_defs(
ExternalAssetNode(
asset_key=asset_key,
dependencies=list(deps[asset_key].values()),
depended_by=list(dep_by[asset_key].values()),
op_name=node_def.name,
op_description=node_def.description,
job_names=job_names,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dagster.core.decorator_utils import get_function_params
from dagster.core.definitions.decorators.op import _Op
from dagster.core.host_representation.external_data import (
ExternalAssetDependedBy,
ExternalAssetDependency,
ExternalAssetNode,
ExternalSensorData,
Expand Down Expand Up @@ -47,6 +48,7 @@ def my_graph():
ExternalAssetNode(
asset_key=AssetKey("asset1"),
dependencies=[],
depended_by=[],
op_name="asset1",
op_description=None,
job_names=["my_graph"],
Expand All @@ -73,6 +75,11 @@ def my_graph():
ExternalAssetNode(
asset_key=AssetKey("asset1"),
dependencies=[],
depended_by=[
ExternalAssetDependedBy(
downstream_asset_key=AssetKey("asset2"), input_name="asset1"
)
],
op_name="asset1",
op_description=None,
job_names=["my_graph"],
Expand All @@ -82,6 +89,7 @@ def my_graph():
dependencies=[
ExternalAssetDependency(upstream_asset_key=AssetKey("asset1"), input_name="asset1")
],
depended_by=[],
op_name="asset2",
op_description=None,
job_names=["my_graph"],
Expand Down Expand Up @@ -114,6 +122,11 @@ def asset2_graph():
ExternalAssetNode(
asset_key=AssetKey("asset1"),
dependencies=[],
depended_by=[
ExternalAssetDependedBy(
downstream_asset_key=AssetKey("asset2"), input_name="asset1"
)
],
op_name="asset1",
op_description=None,
job_names=["asset1_graph"],
Expand All @@ -123,6 +136,7 @@ def asset2_graph():
dependencies=[
ExternalAssetDependency(upstream_asset_key=AssetKey("asset1"), input_name="asset1")
],
depended_by=[],
op_name="asset2",
op_description=None,
job_names=["asset2_graph"],
Expand Down Expand Up @@ -151,6 +165,7 @@ def graph2():
ExternalAssetNode(
asset_key=AssetKey("asset1"),
dependencies=[],
depended_by=[],
op_name="asset1",
op_description=None,
job_names=["graph1", "graph2"],
Expand All @@ -167,10 +182,18 @@ def test_unused_foreign_asset():
)
assert external_asset_nodes == [
ExternalAssetNode(
asset_key=AssetKey("foo"), op_description="abc", dependencies=[], job_names=[]
asset_key=AssetKey("foo"),
op_description="abc",
dependencies=[],
depended_by=[],
job_names=[],
),
ExternalAssetNode(
asset_key=AssetKey("bar"), op_description="def", dependencies=[], job_names=[]
asset_key=AssetKey("bar"),
op_description="def",
dependencies=[],
depended_by=[],
job_names=[],
),
]

Expand All @@ -191,7 +214,13 @@ def job1():
)
assert external_asset_nodes == [
ExternalAssetNode(
asset_key=AssetKey("bar"), op_description="def", dependencies=[], job_names=[]
asset_key=AssetKey("bar"),
op_description="def",
dependencies=[],
depended_by=[
ExternalAssetDependedBy(downstream_asset_key=AssetKey(["foo"]), input_name="bar")
],
job_names=[],
),
ExternalAssetNode(
asset_key=AssetKey("foo"),
Expand All @@ -200,6 +229,7 @@ def job1():
dependencies=[
ExternalAssetDependency(upstream_asset_key=AssetKey(["bar"]), input_name="bar")
],
depended_by=[],
job_names=["job1"],
),
]
Expand Down