From 6d46eac755c2db039f51e3103de45abab5a8dd85 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Sat, 6 May 2023 11:23:45 -0500 Subject: [PATCH 01/12] writes run_results.json after each node --- core/dbt/task/runnable.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index a9055fc27ce..4746490a791 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -302,6 +302,13 @@ def _handle_result(self, result): cause = None self._mark_dependent_errors(node.unique_id, result, cause) + interim_run_result = self.get_result( + results=self.node_results, + elapsed_time=time.time() - self.started, + generated_at=datetime.utcnow(), + ) + interim_run_result.write(self.result_path()) + def _cancel_connections(self, pool): """Given a pool, cancel all adapter connections and wait until all runners gentle terminates. @@ -393,14 +400,14 @@ def print_results_line(self, node_results, elapsed): def execute_with_hooks(self, selected_uids: AbstractSet[str]): adapter = get_adapter(self.config) - started = time.time() + self.started = time.time() try: self.before_run(adapter, selected_uids) res = self.execute_nodes() self.after_run(adapter, res) finally: adapter.cleanup_connections() - elapsed = time.time() - started + elapsed = time.time() - self.started self.print_results_line(self.node_results, elapsed) result = self.get_result( results=self.node_results, elapsed_time=elapsed, generated_at=datetime.utcnow() @@ -408,9 +415,6 @@ def execute_with_hooks(self, selected_uids: AbstractSet[str]): return result - def write_result(self, result): - result.write(self.result_path()) - def run(self): """ Run dbt for the query, based on the graph. @@ -449,7 +453,7 @@ def run(self): if get_flags().WRITE_JSON: write_manifest(self.manifest, self.config.target_path) - self.write_result(result) + result.write(self.result_path()) self.task_end_messages(result.results) return result From aaa5599ebcbace90fd1b6fc0c40b754ce1d74722 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Sat, 6 May 2023 11:44:27 -0500 Subject: [PATCH 02/12] better class attrs --- core/dbt/task/runnable.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 4746490a791..55e54f57140 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -65,15 +65,16 @@ class GraphRunnableTask(ConfiguredTask): def __init__(self, args, config, manifest): super().__init__(args, config, manifest) - self.job_queue: Optional[GraphQueue] = None self._flattened_nodes: Optional[List[ResultNode]] = None - - self.run_count: int = 0 - self.num_nodes: int = 0 - self.node_results = [] - self._skipped_children = {} self._raise_next_tick = None + self._skipped_children = {} + self.job_queue: Optional[GraphQueue] = None + self.node_results = [] + self.num_nodes: int = 0 self.previous_state: Optional[PreviousState] = None + self.run_count: int = 0 + self.started: float = 0 + self.set_previous_state() def set_previous_state(self): From 55768a6400fa7d924f09cfd25cbc32d5f8ebce60 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Sat, 6 May 2023 14:06:22 -0500 Subject: [PATCH 03/12] Fix inconsistant result writing for freshness vs run, respect WRITE_JSON flag --- core/dbt/contracts/results.py | 3 +++ core/dbt/task/freshness.py | 5 ----- core/dbt/task/runnable.py | 7 +++++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py index 63655f972c9..00a95b573fb 100644 --- a/core/dbt/contracts/results.py +++ b/core/dbt/contracts/results.py @@ -393,6 +393,9 @@ def from_node_results( meta = FreshnessMetadata(generated_at=generated_at) return cls(metadata=meta, results=results, elapsed_time=elapsed_time) + def write(self, path): + FreshnessExecutionResultArtifact.from_result(self).write(path) + @dataclass @schema_version("sources", 3) diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 95ff76083a9..e18271778b5 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -9,7 +9,6 @@ from .runnable import GraphRunnableTask from dbt.contracts.results import ( - FreshnessExecutionResultArtifact, FreshnessResult, PartialSourceFreshnessResult, SourceFreshnessResult, @@ -173,10 +172,6 @@ def get_node_selector(self): def get_runner_type(self, _): return FreshnessRunner - def write_result(self, result): - artifact = FreshnessExecutionResultArtifact.from_result(result) - artifact.write(self.result_path()) - def get_result(self, results, elapsed_time, generated_at): return FreshnessResult.from_node_results( elapsed_time=elapsed_time, generated_at=generated_at, results=results diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 55e54f57140..4542b9dcb13 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -308,7 +308,9 @@ def _handle_result(self, result): elapsed_time=time.time() - self.started, generated_at=datetime.utcnow(), ) - interim_run_result.write(self.result_path()) + + if get_flags().WRITE_JSON and hasattr(interim_run_result, "write"): + interim_run_result.write(self.result_path()) def _cancel_connections(self, pool): """Given a pool, cancel all adapter connections and wait until all @@ -454,7 +456,8 @@ def run(self): if get_flags().WRITE_JSON: write_manifest(self.manifest, self.config.target_path) - result.write(self.result_path()) + if hasattr(result, "write"): + result.write(self.result_path()) self.task_end_messages(result.results) return result From 2f294b9af4bfcde32cca7c62502b760b16f169ed Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Mon, 8 May 2023 09:37:44 -0500 Subject: [PATCH 04/12] changelog --- .changes/unreleased/Fixes-20230508-093732.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Fixes-20230508-093732.yaml diff --git a/.changes/unreleased/Fixes-20230508-093732.yaml b/.changes/unreleased/Fixes-20230508-093732.yaml new file mode 100644 index 00000000000..b0605d11ac6 --- /dev/null +++ b/.changes/unreleased/Fixes-20230508-093732.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: '`run_results.json` is now written after every node completes.' +time: 2023-05-08T09:37:32.809356-05:00 +custom: + Author: iknox-fa + Issue: "7302" From 9be134dc6a6761eb12e3e8251a712efc43170682 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Mon, 8 May 2023 10:16:16 -0500 Subject: [PATCH 05/12] fail fast tests for run_results.json --- tests/functional/fail_fast/test_fail_fast_run.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/functional/fail_fast/test_fail_fast_run.py b/tests/functional/fail_fast/test_fail_fast_run.py index 4fb7821d072..92ed1c7aea4 100644 --- a/tests/functional/fail_fast/test_fail_fast_run.py +++ b/tests/functional/fail_fast/test_fail_fast_run.py @@ -1,11 +1,14 @@ import pytest +import json +from pathlib import Path + from dbt.contracts.results import RunResult from dbt.tests.util import run_dbt models__one_sql = """ -select 1 /failed +select 1 """ models__two_sql = """ @@ -28,6 +31,12 @@ def test_fail_fast_run( res = run_dbt(["run", "--fail-fast", "--threads", "1"], expect_pass=False) # a RunResult contains only one node so we can be sure only one model was run assert type(res) == RunResult + run_results_file = Path(project.project_root) / "target/run_results.json" + assert run_results_file.is_file() + with run_results_file.open() as run_results_str: + run_results = json.loads(run_results_str.read()) + assert run_results["results"][0]["status"] == "success" + assert run_results["results"][1]["status"] == "error" class TestFailFastFromConfig(FailFastBase): From dc92aa11cfbb26a64addc3c89649aa624f85897a Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Mon, 8 May 2023 15:59:23 -0500 Subject: [PATCH 06/12] terminate process testing --- .../functional/artifacts/test_run_results.py | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/functional/artifacts/test_run_results.py b/tests/functional/artifacts/test_run_results.py index a1e1b92187f..07e02d37ee1 100644 --- a/tests/functional/artifacts/test_run_results.py +++ b/tests/functional/artifacts/test_run_results.py @@ -1,3 +1,6 @@ +import multiprocessing +from pathlib import Path +from time import sleep import pytest from dbt.tests.util import run_dbt @@ -9,6 +12,11 @@ something bad """ +slow_model_sql = """ +{{ config(materialized='table') }} +select id from {{ ref('good_model') }}, pg_sleep(5) +""" + class TestRunResultsTimingSuccess: @pytest.fixture(scope="class") @@ -30,3 +38,31 @@ def test_timing_exists(self, project): results = run_dbt(["run"], expect_pass=False) assert len(results.results) == 1 assert len(results.results[0].timing) > 0 + + +class TestRunResultsWritesFileOnSignal: + @pytest.fixture(scope="class") + def models(self): + return {"good_model.sql": good_model_sql, "slow_model.sql": slow_model_sql} + + def test_run_results_are_written_on_signal(self, project): + + # N.B. This test is... no great. + # Due to the way that multiprocessing handles termination this test + # will always take the entire amount of time designated in pg_sleep. + # Additionally playing these timing games is probably quite fragile. + + external_process_dbt = multiprocessing.Process( + target=run_dbt, args=([["run"]]), kwargs={"expect_pass": False} + ) + external_process_dbt.start() + assert external_process_dbt.is_alive() + # More than enough time for the first model to complete + # but not enough for the second to complete. + # A bit janky, I know. + sleep(2) + external_process_dbt.terminate() + while external_process_dbt.is_alive() is True: + pass + run_results_file = Path(project.project_root) / "target/run_results.json" + assert run_results_file.is_file() From ffe9f5355f86d296ebaa7ef74225fba461572aa1 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Mon, 8 May 2023 16:01:09 -0500 Subject: [PATCH 07/12] better comments --- tests/functional/artifacts/test_run_results.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/functional/artifacts/test_run_results.py b/tests/functional/artifacts/test_run_results.py index 07e02d37ee1..1aefe038f53 100644 --- a/tests/functional/artifacts/test_run_results.py +++ b/tests/functional/artifacts/test_run_results.py @@ -47,9 +47,12 @@ def models(self): def test_run_results_are_written_on_signal(self, project): - # N.B. This test is... no great. + # N.B. This test is... not great. # Due to the way that multiprocessing handles termination this test # will always take the entire amount of time designated in pg_sleep. + # See: + # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.terminate + # # Additionally playing these timing games is probably quite fragile. external_process_dbt = multiprocessing.Process( From 0b53d3fd65b66556ece8986aac94d3ed8ec9c0c9 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Mon, 8 May 2023 16:45:48 -0500 Subject: [PATCH 08/12] new test approach --- .../functional/artifacts/test_run_results.py | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/tests/functional/artifacts/test_run_results.py b/tests/functional/artifacts/test_run_results.py index 1aefe038f53..ef0e789ec96 100644 --- a/tests/functional/artifacts/test_run_results.py +++ b/tests/functional/artifacts/test_run_results.py @@ -1,6 +1,6 @@ -import multiprocessing +from multiprocessing import Process from pathlib import Path -from time import sleep +import json import pytest from dbt.tests.util import run_dbt @@ -46,26 +46,22 @@ def models(self): return {"good_model.sql": good_model_sql, "slow_model.sql": slow_model_sql} def test_run_results_are_written_on_signal(self, project): - - # N.B. This test is... not great. - # Due to the way that multiprocessing handles termination this test - # will always take the entire amount of time designated in pg_sleep. - # See: - # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Process.terminate - # - # Additionally playing these timing games is probably quite fragile. - - external_process_dbt = multiprocessing.Process( + # Start the runner in a seperate process. + external_process_dbt = Process( target=run_dbt, args=([["run"]]), kwargs={"expect_pass": False} ) external_process_dbt.start() assert external_process_dbt.is_alive() - # More than enough time for the first model to complete - # but not enough for the second to complete. - # A bit janky, I know. - sleep(2) + + # Wait until the first file write, then kill the process. + run_results_file = Path(project.project_root) / "target/run_results.json" + while run_results_file.is_file() is False: + pass external_process_dbt.terminate() + + # Wait until the process is dead, then check the file that there is only one result. while external_process_dbt.is_alive() is True: pass - run_results_file = Path(project.project_root) / "target/run_results.json" - assert run_results_file.is_file() + with run_results_file.open() as run_results_str: + run_results = json.loads(run_results_str.read()) + assert len(run_results["results"]) == 1 From dcd73e5775da7385d66e832b426fe404c313b2ce Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Mon, 8 May 2023 17:34:34 -0500 Subject: [PATCH 09/12] Janky test gets Jankier --- tests/functional/artifacts/test_run_results.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/functional/artifacts/test_run_results.py b/tests/functional/artifacts/test_run_results.py index ef0e789ec96..44b8c38122b 100644 --- a/tests/functional/artifacts/test_run_results.py +++ b/tests/functional/artifacts/test_run_results.py @@ -2,6 +2,7 @@ from pathlib import Path import json import pytest +import platform from dbt.tests.util import run_dbt good_model_sql = """ @@ -40,6 +41,9 @@ def test_timing_exists(self, project): assert len(results.results[0].timing) > 0 +@pytest.mark.skipif( + platform.system() != "Darwin", reason="Fails on linux in github actions, reason unknown" +) class TestRunResultsWritesFileOnSignal: @pytest.fixture(scope="class") def models(self): From 064e77414cba7c3f7ccb2a79ab6c65b886101529 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Mon, 8 May 2023 17:35:46 -0500 Subject: [PATCH 10/12] Janky test gets Jankier --- tests/functional/artifacts/test_run_results.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/functional/artifacts/test_run_results.py b/tests/functional/artifacts/test_run_results.py index 44b8c38122b..c03c7abdf8f 100644 --- a/tests/functional/artifacts/test_run_results.py +++ b/tests/functional/artifacts/test_run_results.py @@ -41,9 +41,7 @@ def test_timing_exists(self, project): assert len(results.results[0].timing) > 0 -@pytest.mark.skipif( - platform.system() != "Darwin", reason="Fails on linux in github actions, reason unknown" -) +@pytest.mark.skipif(platform.system() != "Darwin", reason="Fails on linux in github actions") class TestRunResultsWritesFileOnSignal: @pytest.fixture(scope="class") def models(self): From 64756b9e9b727cf713f04b04f70f842cd2cadac2 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Tue, 9 May 2023 12:30:34 -0500 Subject: [PATCH 11/12] PR feedback --- core/dbt/task/runnable.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 4542b9dcb13..5c116a9f49b 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -73,7 +73,7 @@ def __init__(self, args, config, manifest): self.num_nodes: int = 0 self.previous_state: Optional[PreviousState] = None self.run_count: int = 0 - self.started: float = 0 + self.started_at: float = 0 self.set_previous_state() @@ -305,11 +305,11 @@ def _handle_result(self, result): interim_run_result = self.get_result( results=self.node_results, - elapsed_time=time.time() - self.started, + elapsed_time=time.time() - self.started_at, generated_at=datetime.utcnow(), ) - if get_flags().WRITE_JSON and hasattr(interim_run_result, "write"): + if self.args.WRITE_JSON and hasattr(interim_run_result, "write"): interim_run_result.write(self.result_path()) def _cancel_connections(self, pool): @@ -403,14 +403,14 @@ def print_results_line(self, node_results, elapsed): def execute_with_hooks(self, selected_uids: AbstractSet[str]): adapter = get_adapter(self.config) - self.started = time.time() + self.started_at = time.time() try: self.before_run(adapter, selected_uids) res = self.execute_nodes() self.after_run(adapter, res) finally: adapter.cleanup_connections() - elapsed = time.time() - self.started + elapsed = time.time() - self.started_at self.print_results_line(self.node_results, elapsed) result = self.get_result( results=self.node_results, elapsed_time=elapsed, generated_at=datetime.utcnow() @@ -454,7 +454,7 @@ def run(self): ) ) - if get_flags().WRITE_JSON: + if self.args.WRITE_JSON: write_manifest(self.manifest, self.config.target_path) if hasattr(result, "write"): result.write(self.result_path()) From 606aac22494eeb8dba3db21d6fbf7c7510560f03 Mon Sep 17 00:00:00 2001 From: Ian Knox Date: Tue, 9 May 2023 12:58:11 -0500 Subject: [PATCH 12/12] lcase is preferred --- core/dbt/task/runnable.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index 5c116a9f49b..494acf98904 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -309,7 +309,7 @@ def _handle_result(self, result): generated_at=datetime.utcnow(), ) - if self.args.WRITE_JSON and hasattr(interim_run_result, "write"): + if self.args.write_json and hasattr(interim_run_result, "write"): interim_run_result.write(self.result_path()) def _cancel_connections(self, pool): @@ -454,7 +454,7 @@ def run(self): ) ) - if self.args.WRITE_JSON: + if self.args.write_json: write_manifest(self.manifest, self.config.target_path) if hasattr(result, "write"): result.write(self.result_path())