Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always write run_results.json #7539

Merged
merged 12 commits into from
May 9, 2023
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230508-093732.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: '`run_results.json` is now written after every node completes.'
time: 2023-05-08T09:37:32.809356-05:00
custom:
Author: iknox-fa
Issue: "7302"
3 changes: 3 additions & 0 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,9 @@ def from_node_results(
meta = FreshnessMetadata(generated_at=generated_at)
return cls(metadata=meta, results=results, elapsed_time=elapsed_time)

def write(self, path):
    """Serialize this freshness result to *path* as a versioned artifact.

    Wraps ``self`` in a ``FreshnessExecutionResultArtifact`` before writing so
    the on-disk schema matches the published artifact contract.
    """
    artifact = FreshnessExecutionResultArtifact.from_result(self)
    artifact.write(path)
aranke marked this conversation as resolved.
Show resolved Hide resolved


@dataclass
@schema_version("sources", 3)
Expand Down
5 changes: 0 additions & 5 deletions core/dbt/task/freshness.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from .runnable import GraphRunnableTask

from dbt.contracts.results import (
FreshnessExecutionResultArtifact,
FreshnessResult,
PartialSourceFreshnessResult,
SourceFreshnessResult,
Expand Down Expand Up @@ -173,10 +172,6 @@ def get_node_selector(self):
def get_runner_type(self, _):
    # Freshness tasks use one runner class for every node; the node argument
    # (required by the GraphRunnableTask interface) is deliberately ignored.
    return FreshnessRunner

def write_result(self, result):
artifact = FreshnessExecutionResultArtifact.from_result(result)
artifact.write(self.result_path())

def get_result(self, results, elapsed_time, generated_at):
return FreshnessResult.from_node_results(
elapsed_time=elapsed_time, generated_at=generated_at, results=results
Expand Down
32 changes: 20 additions & 12 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,16 @@ class GraphRunnableTask(ConfiguredTask):

def __init__(self, args, config, manifest):
super().__init__(args, config, manifest)
self.job_queue: Optional[GraphQueue] = None
self._flattened_nodes: Optional[List[ResultNode]] = None

self.run_count: int = 0
self.num_nodes: int = 0
self.node_results = []
self._skipped_children = {}
self._raise_next_tick = None
self._skipped_children = {}
self.job_queue: Optional[GraphQueue] = None
self.node_results = []
self.num_nodes: int = 0
self.previous_state: Optional[PreviousState] = None
self.run_count: int = 0
self.started: float = 0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.started is the only new value here. The rest of the changes are just sorting things for readability.

stu-k marked this conversation as resolved.
Show resolved Hide resolved

self.set_previous_state()

def set_previous_state(self):
Expand Down Expand Up @@ -302,6 +303,15 @@ def _handle_result(self, result):
cause = None
self._mark_dependent_errors(node.unique_id, result, cause)

interim_run_result = self.get_result(
results=self.node_results,
elapsed_time=time.time() - self.started,
generated_at=datetime.utcnow(),
)

if get_flags().WRITE_JSON and hasattr(interim_run_result, "write"):
stu-k marked this conversation as resolved.
Show resolved Hide resolved
interim_run_result.write(self.result_path())

def _cancel_connections(self, pool):
"""Given a pool, cancel all adapter connections and wait until all
runners gentle terminates.
Expand Down Expand Up @@ -393,24 +403,21 @@ def print_results_line(self, node_results, elapsed):

def execute_with_hooks(self, selected_uids: AbstractSet[str]):
    """Run before-hooks, the selected nodes, and after-hooks, returning the task result.

    Connections are always cleaned up, the summary line printed, and the result
    object built — even if node execution raises — via the ``finally`` block.
    ``self.started`` records the wall-clock start so interim results written
    elsewhere can share the same elapsed-time baseline.
    """
    adapter = get_adapter(self.config)
    self.started = time.time()
    try:
        self.before_run(adapter, selected_uids)
        execution_output = self.execute_nodes()
        self.after_run(adapter, execution_output)
    finally:
        adapter.cleanup_connections()
        elapsed_time = time.time() - self.started
        self.print_results_line(self.node_results, elapsed_time)
        final_result = self.get_result(
            results=self.node_results,
            elapsed_time=elapsed_time,
            generated_at=datetime.utcnow(),
        )

    return final_result

def write_result(self, result):
result.write(self.result_path())

def run(self):
"""
Run dbt for the query, based on the graph.
Expand Down Expand Up @@ -449,7 +456,8 @@ def run(self):

if get_flags().WRITE_JSON:
write_manifest(self.manifest, self.config.target_path)
self.write_result(result)
if hasattr(result, "write"):
result.write(self.result_path())
stu-k marked this conversation as resolved.
Show resolved Hide resolved

self.task_end_messages(result.results)
return result
Expand Down
37 changes: 37 additions & 0 deletions tests/functional/artifacts/test_run_results.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
from multiprocessing import Process
from pathlib import Path
import json
import pytest
import platform
from dbt.tests.util import run_dbt

good_model_sql = """
Expand All @@ -9,6 +13,11 @@
something bad
"""

slow_model_sql = """
{{ config(materialized='table') }}
select id from {{ ref('good_model') }}, pg_sleep(5)
"""


class TestRunResultsTimingSuccess:
@pytest.fixture(scope="class")
Expand All @@ -30,3 +39,31 @@ def test_timing_exists(self, project):
results = run_dbt(["run"], expect_pass=False)
assert len(results.results) == 1
assert len(results.results[0].timing) > 0


@pytest.mark.skipif(platform.system() != "Darwin", reason="Fails on linux in github actions")
aranke marked this conversation as resolved.
Show resolved Hide resolved
class TestRunResultsWritesFileOnSignal:
@pytest.fixture(scope="class")
def models(self):
    """Provide one fast passing model and one deliberately slow model."""
    return {
        "good_model.sql": good_model_sql,
        "slow_model.sql": slow_model_sql,
    }

aranke marked this conversation as resolved.
Show resolved Hide resolved
def test_run_results_are_written_on_signal(self, project):
    """Terminate a dbt run mid-flight and verify run_results.json already
    contains the results completed so far (exactly one node).

    Fixes two test-hygiene defects in the original: the wait loops were
    unbounded (the suite would hang forever if the file never appeared or the
    process never died) and busy-spun a full CPU core; both now sleep briefly
    and fail after a deadline.
    """
    import time  # local import keeps this edit self-contained

    # Start the runner in a separate process so we can kill it externally.
    external_process_dbt = Process(
        target=run_dbt, args=([["run"]]), kwargs={"expect_pass": False}
    )
    external_process_dbt.start()
    assert external_process_dbt.is_alive()

    # Wait until the first file write, then kill the process. Bound the wait
    # so a broken run fails the test instead of hanging it.
    run_results_file = Path(project.project_root) / "target/run_results.json"
    deadline = time.time() + 120
    while not run_results_file.is_file():
        assert time.time() < deadline, "run_results.json was never written"
        time.sleep(0.1)
    external_process_dbt.terminate()

    # Wait until the process is dead, then check the file holds exactly one result.
    deadline = time.time() + 30
    while external_process_dbt.is_alive():
        assert time.time() < deadline, "dbt process did not terminate"
        time.sleep(0.1)
    with run_results_file.open() as run_results_str:
        run_results = json.loads(run_results_str.read())
    assert len(run_results["results"]) == 1
11 changes: 10 additions & 1 deletion tests/functional/fail_fast/test_fail_fast_run.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import pytest
import json
from pathlib import Path


from dbt.contracts.results import RunResult
from dbt.tests.util import run_dbt


models__one_sql = """
select 1 /failed
select 1
"""

models__two_sql = """
Expand All @@ -28,6 +31,12 @@ def test_fail_fast_run(
res = run_dbt(["run", "--fail-fast", "--threads", "1"], expect_pass=False)
# a RunResult contains only one node so we can be sure only one model was run
assert type(res) == RunResult
run_results_file = Path(project.project_root) / "target/run_results.json"
assert run_results_file.is_file()
with run_results_file.open() as run_results_str:
run_results = json.loads(run_results_str.read())
assert run_results["results"][0]["status"] == "success"
assert run_results["results"][1]["status"] == "error"


class TestFailFastFromConfig(FailFastBase):
Expand Down