Issue #150 add some debug logging
soxofaan committed Sep 18, 2024
1 parent c6fc7ee commit f50b85c
Showing 1 changed file with 9 additions and 3 deletions.
src/openeo_aggregator/partitionedjobs/crossbackend.py (9 additions, 3 deletions)
@@ -502,8 +502,6 @@ class _GraphViewer:
"""

- # TODO: add more logging of what is happening under the hood
-
def __init__(self, node_map: dict[NodeId, _GVNode]):
self._check_consistency(node_map=node_map)
# Work with a read-only proxy to prevent accidental changes
@@ -532,6 +530,7 @@ def from_flat_graph(cls, flat_graph: FlatPG, supporting_backends: SupportingBack
"""
Build _GraphViewer from a flat process graph representation
"""
_log.debug(f"_GraphViewer.from_flat_graph: {flat_graph.keys()=}")
# Extract dependency links between nodes
depends_on = collections.defaultdict(list)
flows_to = collections.defaultdict(list)
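Note: all of the log statements added in this commit use Python's self-documenting f-string specifier, f"{expr=}" (available since Python 3.8), which renders both the expression text and its value. A minimal standalone sketch (the logger name and demo function are hypothetical, not from this module):

    import logging

    _log = logging.getLogger("example")  # hypothetical; the real module has its own `_log`

    def demo(flat_graph: dict) -> None:
        # f"{flat_graph.keys()=}" expands to "flat_graph.keys()=<repr of the keys>"
        _log.debug(f"demo: {flat_graph.keys()=}")

    logging.basicConfig(level=logging.DEBUG)
    demo({"loadcollection1": {}, "save1": {}})
    # Logs something like:
    #   DEBUG:example:demo: flat_graph.keys()=dict_keys(['loadcollection1', 'save1'])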
@@ -768,8 +767,11 @@ def produce_split_locations(self, limit: int = 2) -> Iterator[List[NodeId]]:
forsaken_nodes = sorted(
forsaken_nodes, key=lambda n: sum(p in forsaken_nodes for p in self.node(n).depends_on)
)
_log.debug(f"_GraphViewer.produce_split_locations: {forsaken_nodes=}")

# Collect nodes where we could split the graph in disjoint subgraphs
articulation_points: Set[NodeId] = set(self.find_articulation_points())
_log.debug(f"_GraphViewer.produce_split_locations: {articulation_points=}")

# TODO: allow/deny lists of what openEO processes can be split on? E.g. only split raster cube paths

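For context on the articulation points being logged here: an articulation point (cut vertex) is a node whose removal disconnects the graph, so these are exactly the candidate locations where the process graph can be split into disjoint subgraphs. The viewer's own find_articulation_points is not part of this diff; a rough illustration of the concept, using networkx purely for demonstration (not implied to be a dependency of this module):

    import networkx as nx

    # Toy process graph as directed dependency edges (hypothetical node ids).
    g = nx.DiGraph([
        ("load1", "filter1"),
        ("filter1", "merge1"),
        ("load2", "merge1"),
        ("merge1", "save1"),
    ])

    # Articulation points are defined on the undirected skeleton of the graph.
    print(sorted(nx.articulation_points(g.to_undirected())))
    # ['filter1', 'merge1']: removing either one disconnects part of the graph.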
@@ -779,13 +781,14 @@ def produce_split_locations(self, limit: int = 2) -> Iterator[List[NodeId]]:
for n in self.walk_upstream_nodes(seeds=forsaken_nodes, include_seeds=False)
if n in articulation_points
]
_log.debug(f"_GraphViewer.produce_split_locations: {split_options=}")
if not split_options:
raise GraphSplitException("No split options found.")
# TODO: how to handle limit? will it scale feasibly to iterate over all possibilities at this point?
# TODO: smarter picking of split node (e.g. one with most upstream nodes)
assert limit > 0
for split_node_id in split_options[:limit]:
- # Split graph at this articulation point
_log.debug(f"_GraphViewer.produce_split_locations: splitting at {split_node_id=}")
up, down = self.split_at(split_node_id)
if down.find_forsaken_nodes():
down_splits = list(down.produce_split_locations(limit=max(limit - 1, 1)))
@@ -820,6 +823,7 @@ def split(self, process_graph: FlatPG) -> _PGSplitResult:

# TODO: make picking "optimal" split location set a bit more deterministic (e.g. sort first)
(split_nodes,) = graph.produce_split_locations(limit=1)
_log.debug(f"DeepGraphSplitter.split: split nodes: {split_nodes=}")

secondary_graphs: List[_SubGraphData] = []
graph_to_split = graph
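A side note on the (split_nodes,) = graph.produce_split_locations(limit=1) line above: single-element tuple unpacking consumes the returned iterator and asserts at runtime that it yields exactly one item, raising ValueError otherwise. A quick illustration of the idiom (toy values, not the real splitter output):

    def candidates():
        yield ["node42"]  # hypothetical single split-location candidate

    (split_nodes,) = candidates()  # OK: the iterator yields exactly one item
    print(split_nodes)  # ['node42']

    try:
        (split_nodes,) = iter([["a"], ["b"]])  # two items: unpacking fails
    except ValueError as e:
        print(e)  # too many values to unpack (expected 1)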
@@ -831,6 +835,7 @@ def split(self, process_graph: FlatPG) -> _PGSplitResult:
# TODO: better backend selection?
# TODO handle case where backend_candidates is None?
backend_id = sorted(backend_candidates)[0]
_log.debug(f"DeepGraphSplitter.split: secondary graph: from {split_node_id=}: {backend_id=} {node_ids=}")
secondary_graphs.append(
_SubGraphData(
split_node=split_node_id,
@@ -847,6 +852,7 @@ def split(self, process_graph: FlatPG) -> _PGSplitResult:
primary_node_ids = set(n for n, _ in primary_graph.iter_nodes())
backend_candidates = primary_graph.get_backend_candidates_for_node_set(primary_node_ids)
primary_backend_id = sorted(backend_candidates)[0]
_log.debug(f"DeepGraphSplitter.split: primary graph: {primary_backend_id=} {primary_node_ids=}")

return _PGSplitResult(
primary_node_ids=primary_node_ids,
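Since all of the new messages are emitted at DEBUG level, they only appear when the logging tree is configured to let them through. A minimal sketch, assuming the module follows the usual logging.getLogger(__name__) convention for its _log (the logger name below is inferred from the file path, not confirmed by this diff):

    import logging

    # Blunt approach: DEBUG everywhere, with a handler on the root logger.
    logging.basicConfig(level=logging.DEBUG)

    # More selective: raise verbosity only for the module touched by this commit.
    logging.getLogger("openeo_aggregator.partitionedjobs.crossbackend").setLevel(logging.DEBUG)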
