From fd4e6c0b713d5ec73830cdcf80a117627410cdbd Mon Sep 17 00:00:00 2001 From: Brent Bovenzi Date: Wed, 30 Nov 2022 16:35:06 -0600 Subject: [PATCH] Simplify dataset subgraph logic (#27987) * fix merging connected dataset graphs * refactor graph calculation --- .../static/js/api/useDatasetDependencies.ts | 165 ++++++++---------- 1 file changed, 75 insertions(+), 90 deletions(-) diff --git a/airflow/www/static/js/api/useDatasetDependencies.ts b/airflow/www/static/js/api/useDatasetDependencies.ts index bd61ee9087a063..a4167c6f8fbdb0 100644 --- a/airflow/www/static/js/api/useDatasetDependencies.ts +++ b/airflow/www/static/js/api/useDatasetDependencies.ts @@ -24,13 +24,16 @@ import ELK, { ElkShape, ElkExtendedEdge } from 'elkjs'; import { getMetaValue } from 'src/utils'; import type { DepEdge, DepNode } from 'src/types'; import type { NodeType } from 'src/datasets/Graph/Node'; -import { unionBy } from 'lodash'; interface DatasetDependencies { edges: DepEdge[]; nodes: DepNode[]; } +interface EdgeGroup { + edges: DepEdge[]; +} + interface GenerateProps { nodes: DepNode[]; edges: DepEdge[]; @@ -82,111 +85,92 @@ const generateGraph = ({ nodes, edges, font }: GenerateProps) => ({ edges: edges.map((e) => ({ id: `${e.source}-${e.target}`, sources: [e.source], targets: [e.target] })), }); -interface SeparateGraphsProps extends DatasetDependencies { - graphs: DatasetDependencies[]; +interface SeparateGraphsProps { + edges: DepEdge[]; + graphs: EdgeGroup[]; } -const graphIndicesToMerge: Record = {}; -const indicesToRemove: number[] = []; - // find the downstream graph of each upstream edge const findDownstreamGraph = ( - { edges, nodes, graphs = [] }: SeparateGraphsProps, -): DatasetDependencies[] => { - const newGraphs = [...graphs]; - let filteredEdges = [...edges]; - - graphs.forEach((g, i) => { - // find downstream edges - const downstreamEdges = edges.filter((e) => g.edges.some((ge) => ge.target === e.source)); - const downstreamNodes: DepNode[] = []; - - downstreamEdges.forEach((e) => { - const newNode = nodes.find((n) => n.id === e.target); - if (newNode) { - downstreamNodes.push(newNode); - - // check if the node already exists in a different graph - const existingGraphIndex = newGraphs - .findIndex(((ng) => ng.nodes.some((n) => n.id === newNode.id))); - - // mark if the graph needs to merge with another - if (existingGraphIndex > -1) { - indicesToRemove.push(existingGraphIndex); - graphIndicesToMerge[i] = [...(graphIndicesToMerge[i] || []), existingGraphIndex]; + { edges, graphs = [] }: SeparateGraphsProps, +): EdgeGroup[] => { + let unassignedEdges = [...edges]; + + const mergedGraphs = graphs + .reduce( + (newGraphs, graph) => { + const otherGroupIndex = newGraphs.findIndex( + (otherGroup) => otherGroup.edges.some( + (otherEdge) => graph.edges.some( + (edge) => edge.target === otherEdge.target, + ), + ), + ); + if (otherGroupIndex === -1) { + return [...newGraphs, graph]; } - // add node and edge to the graph - newGraphs[i] = { - nodes: [...newGraphs[i].nodes, newNode], - edges: [...newGraphs[i].edges, e], - }; - - // remove edge from edge list - filteredEdges = filteredEdges - .filter((fe) => !(fe.source === e.source && fe.target === e.target)); - } - }); - }); - - // once there are no more filtered edges left, merge relevant graphs - // we merge afterwards to make sure we captured all nodes + edges - if (!filteredEdges.length) { - Object.keys(graphIndicesToMerge).forEach((key) => { - const realKey = key as unknown as number; - const values = graphIndicesToMerge[realKey]; - values.forEach((v) => { - newGraphs[realKey] = { - nodes: unionBy(newGraphs[realKey].nodes, newGraphs[v].nodes, 'id'), - edges: [...newGraphs[realKey].edges, ...newGraphs[v].edges] - .filter((e, i, s) => ( - i === s.findIndex((t) => t.source === e.source && t.target === e.target) - )), - }; + const mergedEdges = [...newGraphs[otherGroupIndex].edges, ...graph.edges] + .filter((edge, edgeIndex, otherEdges) => ( + edgeIndex === otherEdges.findIndex( + (otherEdge) => otherEdge.source === edge.source && otherEdge.target === edge.target, + ) + )); + return [ + ...newGraphs.filter((_, newGraphIndex) => newGraphIndex !== otherGroupIndex), + { edges: mergedEdges }, + ]; + }, + [] as EdgeGroup[], + ) + .map((graph) => { + // find the next set of downstream edges and filter them out of the unassigned edges list + const downstreamEdges: DepEdge[] = []; + unassignedEdges = unassignedEdges.filter((edge) => { + const isDownstream = graph.edges.some((graphEdge) => graphEdge.target === edge.source); + if (isDownstream) downstreamEdges.push(edge); + return !isDownstream; }); + + return { + edges: [...graph.edges, ...downstreamEdges], + }; }); - return newGraphs.filter((g, i) => !indicesToRemove.some((j) => i === j)); - } - return findDownstreamGraph({ edges: filteredEdges, nodes, graphs: newGraphs }); + // recursively find downstream edges until there are no unassigned edges + return unassignedEdges.length + ? findDownstreamGraph({ edges: unassignedEdges, graphs: mergedGraphs }) + : mergedGraphs; }; // separate the list of nodes/edges into distinct dataset pipeline graphs const separateGraphs = ({ edges, nodes }: DatasetDependencies): DatasetDependencies[] => { - const separatedGraphs: DatasetDependencies[] = []; - let remainingEdges = [...edges]; - let remainingNodes = [...nodes]; - - edges.forEach((edge) => { - const isDownstream = edges.some((e) => e.target === edge.source); - - // if the edge is not downstream of anything, then start building the graph - if (!isDownstream) { - const connectedNodes = nodes.filter((n) => n.id === edge.source || n.id === edge.target); - - // check if one of the nodes is already connected to a separated graph - const nodesInUse = separatedGraphs - .findIndex((g) => g.nodes.some((n) => connectedNodes.some((nn) => nn.id === n.id))); - - if (nodesInUse > -1) { - // if one of the nodes is already in use, merge the graphs - const { nodes: existingNodes, edges: existingEdges } = separatedGraphs[nodesInUse]; - separatedGraphs[nodesInUse] = { nodes: unionBy(existingNodes, connectedNodes, 'id'), edges: [...existingEdges, edge] }; - } else { - // else just add the new separated graph - separatedGraphs.push({ nodes: connectedNodes, edges: [edge] }); - } - - // filter out used nodes and edges - remainingEdges = remainingEdges.filter((e) => e.source !== edge.source); - remainingNodes = remainingNodes.filter((n) => !connectedNodes.some((nn) => nn.id === n.id)); + const separatedGraphs: EdgeGroup[] = []; + const remainingEdges: DepEdge[] = []; + + edges.forEach((e) => { + // add a separate graph for each edge without an upstream + if (!edges.some((ee) => e.source === ee.target)) { + separatedGraphs.push({ edges: [e] }); + } else { + remainingEdges.push(e); } }); - if (remainingEdges.length) { - return findDownstreamGraph({ edges: remainingEdges, nodes, graphs: separatedGraphs }); - } - return separatedGraphs; + const edgeGraphs = findDownstreamGraph({ edges: remainingEdges, graphs: separatedGraphs }); + + // once all the edges are found, add the nodes + return edgeGraphs.map((eg) => { + const graphNodes = nodes.filter( + (n) => eg.edges.some( + (e) => e.target === n.id || e.source === n.id, + ), + ); + return ({ + edges: eg.edges, + nodes: graphNodes, + }); + }); }; const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => { @@ -202,6 +186,7 @@ const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => { elk.layout(generateGraph({ nodes: g.nodes, edges: g.edges, font })) ))); const fullGraph = await elk.layout(generateGraph({ nodes, edges, font })); + return { fullGraph, subGraphs,