Skip to content
This repository has been archived by the owner on Feb 10, 2024. It is now read-only.

Commit

Permalink
Simplify dataset subgraph logic (apache#27987)
Browse files Browse the repository at this point in the history
* fix merging connected dataset graphs

* refactor graph calculation
  • Loading branch information
bbovenzi authored and jrggggg committed Dec 1, 2022
1 parent 8c9ed8d commit fd4e6c0
Showing 1 changed file with 75 additions and 90 deletions.
165 changes: 75 additions & 90 deletions airflow/www/static/js/api/useDatasetDependencies.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import ELK, { ElkShape, ElkExtendedEdge } from 'elkjs';
import { getMetaValue } from 'src/utils';
import type { DepEdge, DepNode } from 'src/types';
import type { NodeType } from 'src/datasets/Graph/Node';
import { unionBy } from 'lodash';

interface DatasetDependencies {
edges: DepEdge[];
nodes: DepNode[];
}

interface EdgeGroup {
edges: DepEdge[];
}

interface GenerateProps {
nodes: DepNode[];
edges: DepEdge[];
Expand Down Expand Up @@ -82,111 +85,92 @@ const generateGraph = ({ nodes, edges, font }: GenerateProps) => ({
edges: edges.map((e) => ({ id: `${e.source}-${e.target}`, sources: [e.source], targets: [e.target] })),
});

interface SeparateGraphsProps extends DatasetDependencies {
graphs: DatasetDependencies[];
interface SeparateGraphsProps {
edges: DepEdge[];
graphs: EdgeGroup[];
}

const graphIndicesToMerge: Record<number, number[]> = {};
const indicesToRemove: number[] = [];

// find the downstream graph of each upstream edge
const findDownstreamGraph = (
{ edges, nodes, graphs = [] }: SeparateGraphsProps,
): DatasetDependencies[] => {
const newGraphs = [...graphs];
let filteredEdges = [...edges];

graphs.forEach((g, i) => {
// find downstream edges
const downstreamEdges = edges.filter((e) => g.edges.some((ge) => ge.target === e.source));
const downstreamNodes: DepNode[] = [];

downstreamEdges.forEach((e) => {
const newNode = nodes.find((n) => n.id === e.target);
if (newNode) {
downstreamNodes.push(newNode);

// check if the node already exists in a different graph
const existingGraphIndex = newGraphs
.findIndex(((ng) => ng.nodes.some((n) => n.id === newNode.id)));

// mark if the graph needs to merge with another
if (existingGraphIndex > -1) {
indicesToRemove.push(existingGraphIndex);
graphIndicesToMerge[i] = [...(graphIndicesToMerge[i] || []), existingGraphIndex];
{ edges, graphs = [] }: SeparateGraphsProps,
): EdgeGroup[] => {
let unassignedEdges = [...edges];

const mergedGraphs = graphs
.reduce(
(newGraphs, graph) => {
const otherGroupIndex = newGraphs.findIndex(
(otherGroup) => otherGroup.edges.some(
(otherEdge) => graph.edges.some(
(edge) => edge.target === otherEdge.target,
),
),
);
if (otherGroupIndex === -1) {
return [...newGraphs, graph];
}

// add node and edge to the graph
newGraphs[i] = {
nodes: [...newGraphs[i].nodes, newNode],
edges: [...newGraphs[i].edges, e],
};

// remove edge from edge list
filteredEdges = filteredEdges
.filter((fe) => !(fe.source === e.source && fe.target === e.target));
}
});
});

// once there are no more filtered edges left, merge relevant graphs
// we merge afterwards to make sure we captured all nodes + edges
if (!filteredEdges.length) {
Object.keys(graphIndicesToMerge).forEach((key) => {
const realKey = key as unknown as number;
const values = graphIndicesToMerge[realKey];
values.forEach((v) => {
newGraphs[realKey] = {
nodes: unionBy(newGraphs[realKey].nodes, newGraphs[v].nodes, 'id'),
edges: [...newGraphs[realKey].edges, ...newGraphs[v].edges]
.filter((e, i, s) => (
i === s.findIndex((t) => t.source === e.source && t.target === e.target)
)),
};
const mergedEdges = [...newGraphs[otherGroupIndex].edges, ...graph.edges]
.filter((edge, edgeIndex, otherEdges) => (
edgeIndex === otherEdges.findIndex(
(otherEdge) => otherEdge.source === edge.source && otherEdge.target === edge.target,
)
));
return [
...newGraphs.filter((_, newGraphIndex) => newGraphIndex !== otherGroupIndex),
{ edges: mergedEdges },
];
},
[] as EdgeGroup[],
)
.map((graph) => {
// find the next set of downstream edges and filter them out of the unassigned edges list
const downstreamEdges: DepEdge[] = [];
unassignedEdges = unassignedEdges.filter((edge) => {
const isDownstream = graph.edges.some((graphEdge) => graphEdge.target === edge.source);
if (isDownstream) downstreamEdges.push(edge);
return !isDownstream;
});

return {
edges: [...graph.edges, ...downstreamEdges],
};
});
return newGraphs.filter((g, i) => !indicesToRemove.some((j) => i === j));
}

return findDownstreamGraph({ edges: filteredEdges, nodes, graphs: newGraphs });
// recursively find downstream edges until there are no unassigned edges
return unassignedEdges.length
? findDownstreamGraph({ edges: unassignedEdges, graphs: mergedGraphs })
: mergedGraphs;
};

// separate the list of nodes/edges into distinct dataset pipeline graphs
const separateGraphs = ({ edges, nodes }: DatasetDependencies): DatasetDependencies[] => {
const separatedGraphs: DatasetDependencies[] = [];
let remainingEdges = [...edges];
let remainingNodes = [...nodes];

edges.forEach((edge) => {
const isDownstream = edges.some((e) => e.target === edge.source);

// if the edge is not downstream of anything, then start building the graph
if (!isDownstream) {
const connectedNodes = nodes.filter((n) => n.id === edge.source || n.id === edge.target);

// check if one of the nodes is already connected to a separated graph
const nodesInUse = separatedGraphs
.findIndex((g) => g.nodes.some((n) => connectedNodes.some((nn) => nn.id === n.id)));

if (nodesInUse > -1) {
// if one of the nodes is already in use, merge the graphs
const { nodes: existingNodes, edges: existingEdges } = separatedGraphs[nodesInUse];
separatedGraphs[nodesInUse] = { nodes: unionBy(existingNodes, connectedNodes, 'id'), edges: [...existingEdges, edge] };
} else {
// else just add the new separated graph
separatedGraphs.push({ nodes: connectedNodes, edges: [edge] });
}

// filter out used nodes and edges
remainingEdges = remainingEdges.filter((e) => e.source !== edge.source);
remainingNodes = remainingNodes.filter((n) => !connectedNodes.some((nn) => nn.id === n.id));
const separatedGraphs: EdgeGroup[] = [];
const remainingEdges: DepEdge[] = [];

edges.forEach((e) => {
// add a separate graph for each edge without an upstream
if (!edges.some((ee) => e.source === ee.target)) {
separatedGraphs.push({ edges: [e] });
} else {
remainingEdges.push(e);
}
});

if (remainingEdges.length) {
return findDownstreamGraph({ edges: remainingEdges, nodes, graphs: separatedGraphs });
}
return separatedGraphs;
const edgeGraphs = findDownstreamGraph({ edges: remainingEdges, graphs: separatedGraphs });

// once all the edges are found, add the nodes
return edgeGraphs.map((eg) => {
const graphNodes = nodes.filter(
(n) => eg.edges.some(
(e) => e.target === n.id || e.source === n.id,
),
);
return ({
edges: eg.edges,
nodes: graphNodes,
});
});
};

const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => {
Expand All @@ -202,6 +186,7 @@ const formatDependencies = async ({ edges, nodes }: DatasetDependencies) => {
elk.layout(generateGraph({ nodes: g.nodes, edges: g.edges, font }))
)));
const fullGraph = await elk.layout(generateGraph({ nodes, edges, font }));

return {
fullGraph,
subGraphs,
Expand Down

0 comments on commit fd4e6c0

Please sign in to comment.