Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dvc: replace pipeline with dag #3905

Merged
merged 1 commit into from
May 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dvc/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
commit,
config,
daemon,
dag,
data_sync,
destroy,
diff,
Expand All @@ -26,7 +27,6 @@
metrics,
move,
params,
pipeline,
plots,
remote,
remove,
Expand Down Expand Up @@ -67,7 +67,7 @@
root,
ls,
freeze,
pipeline,
dag,
daemon,
commit,
diff,
Expand Down
115 changes: 115 additions & 0 deletions dvc/command/dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import argparse
import logging

from dvc.command.base import CmdBase, append_doc_link
from dvc.exceptions import DvcException

# Module-level logger, named after this module, used by the dag command below.
logger = logging.getLogger(__name__)


def _show_ascii(G):
    """Return an ASCII rendering of every pipeline found in ``G``.

    ``get_pipelines(G)`` splits the graph into pipelines; each one is drawn
    from its nodes and edges, and the drawings are joined with newlines.
    """
    from dvc.repo.graph import get_pipelines
    from dvc.dagascii import draw

    return "\n".join(
        draw(subgraph.nodes, subgraph.edges) for subgraph in get_pipelines(G)
    )


def _show_dot(G):
    """Return ``G`` serialized by networkx's ``write_dot`` as a string."""
    import io
    from networkx.drawing.nx_pydot import write_dot

    # write_dot expects a file-like object, so capture its output in memory.
    with io.StringIO() as buffer:
        write_dot(G, buffer)
        return buffer.getvalue()


def _build(G, target=None, full=False):
    """Select the graph to display and relabel its nodes.

    Args:
        G: the graph to start from.
        target: optional node; when given, only the pipeline containing it
            (as found by ``get_pipeline``) is used.
        full: when false and ``target`` is given, the pipeline is trimmed to
            ``target`` plus the nodes in ``nx.descendants(G, target)``.

    Returns:
        The selected graph with every node replaced by its ``addressing``
        attribute. Relabeling is done with ``copy=False``, i.e. in place.
    """
    import networkx as nx
    from dvc.repo.graph import get_pipeline, get_pipelines

    if not target:
        graph = G
    else:
        graph = get_pipeline(get_pipelines(G), target)
        if not full:
            keep = nx.descendants(G, target) | {target}
            graph.remove_nodes_from(set(G.nodes()) - keep)

    return nx.relabel_nodes(graph, lambda stage: stage.addressing, copy=False)


class CmdDAG(CmdBase):
    """CLI command that displays the project's stage graph."""

    def run(self):
        """Build the graph for the optional target and print it.

        With ``--dot`` the graph is logged in dot format; otherwise the ASCII
        rendering is sent through the pager.

        Returns:
            0 on success; 1 when the target resolves to more than one stage
            or when a ``DvcException`` is raised along the way.
        """
        try:
            target = None
            if self.args.target:
                stages = self.repo.collect(self.args.target)
                if len(stages) > 1:
                    # Both fragments must be f-strings, otherwise "{stages}"
                    # is printed literally instead of the stage list.
                    logger.error(
                        f"'{self.args.target}' contains more than one stage "
                        f"{stages}, please specify one stage"
                    )
                    return 1
                target = stages[0]

            G = _build(self.repo.graph, target=target, full=self.args.full)

            if self.args.dot:
                logger.info(_show_dot(G))
            else:
                from dvc.utils.pager import pager

                pager(_show_ascii(G))

            return 0
        except DvcException:
            # Trailing space keeps the message from running together
            # ("failed to showpipelines").
            msg = "failed to show "
            if self.args.target:
                # Report what the user typed: ``target`` is still None when
                # the failure happened while collecting the stage.
                msg += f"a pipeline for '{self.args.target}'"
            else:
                msg += "pipelines"
            logger.exception(msg)
            return 1


def add_parser(subparsers, parent_parser):
    """Register the ``dag`` subcommand and its arguments.

    Args:
        subparsers: the object returned by ``add_subparsers()`` to which the
            ``dag`` parser is added.
        parent_parser: parser passed via ``parents=`` to the new subparser.
    """
    DAG_HELP = "Visualize DVC project DAG."
    dag_parser = subparsers.add_parser(
        "dag",
        parents=[parent_parser],
        description=append_doc_link(DAG_HELP, "dag"),
        help=DAG_HELP,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # store_true already defaults to False, so no explicit default is needed.
    dag_parser.add_argument(
        "--dot", action="store_true", help="Print DAG with .dot format.",
    )
    dag_parser.add_argument(
        "--full",
        action="store_true",
        help=(
            "Show full DAG that the target belongs to, instead of "
            "showing DAG consisting only of ancestors."
        ),
    )
    dag_parser.add_argument(
        "target",
        nargs="?",
        help="Stage or output to show pipeline for. Optional. "
        "(Finds all stages in the workspace by default.)",
    )
    dag_parser.set_defaults(func=CmdDAG)
Loading