From 3ddfeaa9d82ce767d6de09226a38edcbc3ac7c94 Mon Sep 17 00:00:00 2001 From: popcorny Date: Tue, 12 Sep 2023 15:30:39 +0800 Subject: [PATCH] Improve the telemetry events Signed-off-by: popcorny --- .../cli_utils/compare_with_recipe.py | 32 ++- piperider_cli/cli_utils/run_cmd.py | 221 ++++++++++-------- piperider_cli/configuration.py | 25 ++ piperider_cli/event/__init__.py | 33 ++- piperider_cli/event/events.py | 34 +++ piperider_cli/event/track.py | 33 ++- piperider_cli/profiler/profiler.py | 5 +- piperider_cli/recipe_executor.py | 12 +- piperider_cli/recipes/__init__.py | 19 +- piperider_cli/recipes/utils.py | 14 +- piperider_cli/runner.py | 92 +++----- 11 files changed, 329 insertions(+), 191 deletions(-) create mode 100644 piperider_cli/event/events.py diff --git a/piperider_cli/cli_utils/compare_with_recipe.py b/piperider_cli/cli_utils/compare_with_recipe.py index 90ebb0f3b..11c5222b2 100644 --- a/piperider_cli/cli_utils/compare_with_recipe.py +++ b/piperider_cli/cli_utils/compare_with_recipe.py @@ -1,4 +1,6 @@ +import time from rich.console import Console +from piperider_cli.event import CompareEventPayload, log_event def parse_compare_ref(ref: str): @@ -48,6 +50,7 @@ def compare_with_recipe(ref, **kwargs): select = kwargs.get('select') modified = kwargs.get('modified') skip_datasource_connection = kwargs.get('skip_datasource') + event_payload = CompareEventPayload() base_ref, target_ref = parse_compare_ref(ref) if ref is not None and base_ref is None: @@ -84,7 +87,9 @@ def compare_with_recipe(ref, **kwargs): elif is_piperider_workspace_exist() is False: raise DbtProjectNotFoundError() - ret = 0 + status = False + reason = None + start_time = time.time() try: # note: dry-run and interactive are set by configure_recipe_execution_flags from piperider_cli.recipe_executor import RecipeExecutor @@ -95,7 +100,8 @@ def compare_with_recipe(ref, **kwargs): base_ref=base_ref, target_ref=target_ref, skip_datasource_connection=skip_datasource_connection, - debug=debug) + debug=debug, + event_payload=event_payload) last = False base = target = None if not recipe_config.base.is_file_specified() and not recipe_config.target.is_file_specified(): @@ -105,6 +111,7 @@ def compare_with_recipe(ref, **kwargs): target = recipe_config.target.get_run_report() if not is_recipe_dry_run(): + event_payload.step = "compare reports" from piperider_cli.compare_report import CompareReport CompareReport.exec(a=base, b=target, last=last, datasource=None, output=kwargs.get('output'), tables_from="all", @@ -115,7 +122,24 @@ def compare_with_recipe(ref, **kwargs): project_name=project_name, show_progress=True, debug=debug) + + status = True + reason = 'ok' + event_payload.step = "done" + return 0 + except SystemExit as e: + reason = 'error' + raise e + except KeyboardInterrupt as e: + reason = 'aborted' + raise e except Exception as e: + reason = 'fatal' raise e - - return ret + finally: + end_time = time.time() + duration = end_time - start_time + event_payload.status = status + event_payload.reason = reason + event_payload.duration = duration + log_event(event_payload.to_dict(), 'compare') diff --git a/piperider_cli/cli_utils/run_cmd.py b/piperider_cli/cli_utils/run_cmd.py index 930e8e004..8abbf8c4d 100644 --- a/piperider_cli/cli_utils/run_cmd.py +++ b/piperider_cli/cli_utils/run_cmd.py @@ -1,8 +1,11 @@ import os import sys +import time from rich.console import Console +from piperider_cli.event import log_event + def run(**kwargs): 'Profile data source, run assertions, and generate report(s). By default, the raw results and reports are saved in ".piperider/outputs".' @@ -11,101 +14,129 @@ def run(**kwargs): from piperider_cli.cli_utils.cloud import CloudConnectorHelper from piperider_cli.configuration import FileSystem, is_piperider_workspace_exist from piperider_cli.error import DbtProjectNotFoundError, PipeRiderConflictOptionsError - from piperider_cli.exitcode import EC_ERR_TEST_FAILED, EC_WARN_NO_PROFILED_MODULES + from piperider_cli.exitcode import EC_WARN_NO_PROFILED_MODULES from piperider_cli.generate_report import GenerateReport from piperider_cli.guide import Guide from piperider_cli.initializer import Initializer - from piperider_cli.runner import Runner - - datasource = kwargs.get('datasource') - table = kwargs.get('table') - output = kwargs.get('output') - open_report = kwargs.get('open') - enable_share = kwargs.get('share') - skip_report = kwargs.get('skip_report') - dbt_target_path = kwargs.get('dbt_target_path') - dbt_list = kwargs.get('dbt_list') - force_upload = kwargs.get('upload') - project_name = kwargs.get('project') - select = kwargs.get('select') - state = kwargs.get('state') - - if project_name is not None: - os.environ.get('PIPERIDER_API_PROJECT') - - console = Console() - env_dbt_resources = os.environ.get('PIPERIDER_DBT_RESOURCES') - - # True -> 1, False -> 0 - if sum([True if table else False, dbt_list, env_dbt_resources is not None]) > 1: - console.print("[bold red]Error:[/bold red] " - "['--table', '--dbt-list'] are mutually exclusive") - sys.exit(1) - - # Search dbt project config files - dbt_project_dir = kwargs.get('dbt_project_dir') - no_auto_search = kwargs.get('no_auto_search') - dbt_project_path = DbtUtil.get_dbt_project_path(dbt_project_dir, no_auto_search, recursive=False) - dbt_profiles_dir = kwargs.get('dbt_profiles_dir') - if dbt_project_path: - working_dir = os.path.dirname(dbt_project_path) if dbt_project_path.endswith('.yml') else dbt_project_path - FileSystem.set_working_directory(working_dir) - if dbt_profiles_dir: - FileSystem.set_dbt_profiles_dir(dbt_profiles_dir) - # Only run initializer when dbt project path is provided - Initializer.exec(dbt_project_path=dbt_project_path, dbt_profiles_dir=dbt_profiles_dir, interactive=False) - elif is_piperider_workspace_exist() is False: - raise DbtProjectNotFoundError() - - dbt_resources = None - if select and dbt_list is True: - raise PipeRiderConflictOptionsError( - 'Cannot use options "--select" with "--dbt-list"', - hint='Remove "--select" option and use "--dbt-list" instead.' - ) - - if dbt_list: - dbt_resources = DbtUtil.read_dbt_resources(sys.stdin) - if env_dbt_resources is not None: - dbt_resources = DbtUtil.read_dbt_resources(env_dbt_resources) - - ret = Runner.exec(datasource=datasource, - table=table, - output=output, - skip_report=skip_report, - dbt_target_path=dbt_target_path, - dbt_resources=dbt_resources, - dbt_select=select, - dbt_state=state, - report_dir=kwargs.get('report_dir'), - skip_datasource_connection=kwargs.get('skip_datasource')) - if ret in (0, EC_ERR_TEST_FAILED, EC_WARN_NO_PROFILED_MODULES): - if enable_share: - force_upload = True - - auto_upload = CloudConnectorHelper.is_auto_upload() - is_cloud_view = (force_upload or auto_upload) - - if not skip_report: - GenerateReport.exec(None, kwargs.get('report_dir'), output, open_report, is_cloud_view) - - if ret == EC_WARN_NO_PROFILED_MODULES: - # No module was profiled - if dbt_list or dbt_resources or select: - Guide().show('No resources was profiled. Please check given "--select", "--dbt-list" option or ' - 'environment variable "PIPERIDER_DBT_RESOURCES" to choose the resources to profile.') - ret = 0 - - if CloudConnectorHelper.is_login() and is_cloud_view: - ret = CloudConnectorHelper.upload_latest_report(report_dir=kwargs.get('report_dir'), - debug=kwargs.get('debug'), - open_report=open_report, enable_share=enable_share, - project_name=project_name) - elif not CloudConnectorHelper.is_login() and is_cloud_view: - console = Console() - console.print('[bold yellow]Warning: [/bold yellow]The report is not uploaded due to not logged in.') - - if ret != 0: - if ret != EC_WARN_NO_PROFILED_MODULES: - sys.exit(ret) - return ret + from piperider_cli.runner import Runner, RunEventPayload + + status = False + reason = None + start_time = time.time() + event_payload = RunEventPayload() + event_payload.step = 'start' + try: + datasource = kwargs.get('datasource') + table = kwargs.get('table') + output = kwargs.get('output') + open_report = kwargs.get('open') + enable_share = kwargs.get('share') + skip_report = kwargs.get('skip_report') + dbt_target_path = kwargs.get('dbt_target_path') + dbt_list = kwargs.get('dbt_list') + force_upload = kwargs.get('upload') + project_name = kwargs.get('project') + select = kwargs.get('select') + state = kwargs.get('state') + + if project_name is not None: + os.environ.get('PIPERIDER_API_PROJECT') + + console = Console() + env_dbt_resources = os.environ.get('PIPERIDER_DBT_RESOURCES') + + # True -> 1, False -> 0 + if sum([True if table else False, dbt_list, env_dbt_resources is not None]) > 1: + console.print("[bold red]Error:[/bold red] " + "['--table', '--dbt-list'] are mutually exclusive") + sys.exit(1) + + # Search dbt project config files + dbt_project_dir = kwargs.get('dbt_project_dir') + no_auto_search = kwargs.get('no_auto_search') + dbt_project_path = DbtUtil.get_dbt_project_path(dbt_project_dir, no_auto_search, recursive=False) + dbt_profiles_dir = kwargs.get('dbt_profiles_dir') + if dbt_project_path: + working_dir = os.path.dirname(dbt_project_path) if dbt_project_path.endswith('.yml') else dbt_project_path + FileSystem.set_working_directory(working_dir) + if dbt_profiles_dir: + FileSystem.set_dbt_profiles_dir(dbt_profiles_dir) + # Only run initializer when dbt project path is provided + Initializer.exec(dbt_project_path=dbt_project_path, dbt_profiles_dir=dbt_profiles_dir, interactive=False) + elif is_piperider_workspace_exist() is False: + raise DbtProjectNotFoundError() + + dbt_resources = None + if select and dbt_list is True: + raise PipeRiderConflictOptionsError( + 'Cannot use options "--select" with "--dbt-list"', + hint='Remove "--select" option and use "--dbt-list" instead.' + ) + + if dbt_list: + dbt_resources = DbtUtil.read_dbt_resources(sys.stdin) + if env_dbt_resources is not None: + dbt_resources = DbtUtil.read_dbt_resources(env_dbt_resources) + + ret = Runner.exec(datasource=datasource, + table=table, + output=output, + skip_report=skip_report, + dbt_target_path=dbt_target_path, + dbt_resources=dbt_resources, + dbt_select=select, + dbt_state=state, + report_dir=kwargs.get('report_dir'), + skip_datasource_connection=kwargs.get('skip_datasource'), + event_payload=event_payload) + if ret in (0, EC_WARN_NO_PROFILED_MODULES): + if enable_share: + force_upload = True + + auto_upload = CloudConnectorHelper.is_auto_upload() + is_cloud_view = (force_upload or auto_upload) + + if not skip_report: + GenerateReport.exec(None, kwargs.get('report_dir'), output, open_report, is_cloud_view) + + if ret == EC_WARN_NO_PROFILED_MODULES: + # No module was profiled + if dbt_list or dbt_resources or select: + Guide().show('No resources was profiled. Please check given "--select", "--dbt-list" option or ' + 'environment variable "PIPERIDER_DBT_RESOURCES" to choose the resources to profile.') + ret = 0 + + event_payload.step = 'upload' + if CloudConnectorHelper.is_login() and is_cloud_view: + ret = CloudConnectorHelper.upload_latest_report(report_dir=kwargs.get('report_dir'), + debug=kwargs.get('debug'), + open_report=open_report, enable_share=enable_share, + project_name=project_name) + elif not CloudConnectorHelper.is_login() and is_cloud_view: + console = Console() + console.print('[bold yellow]Warning: [/bold yellow]The report is not uploaded due to not logged in.') + + if ret != 0: + reason = 'error' + return ret + + status = True + reason = 'ok' + event_payload.step = 'done' + return 0 + except SystemExit as e: + reason = 'error' + raise e + except KeyboardInterrupt as e: + reason = 'aborted' + raise e + except Exception as e: + reason = 'fatal' + raise e + finally: + end_time = time.time() + duration = end_time - start_time + event_payload.status = status + event_payload.reason = reason + event_payload.duration = duration + log_event(event_payload.to_dict(), 'run') diff --git a/piperider_cli/configuration.py b/piperider_cli/configuration.py index af6ae0121..4b3558677 100644 --- a/piperider_cli/configuration.py +++ b/piperider_cli/configuration.py @@ -324,6 +324,31 @@ def _verify_input_config(self): if not isinstance(self.excludes, List): raise PipeRiderConfigTypeError("'excludes' should be a list of tables' name") + def get_datasource(self, datasource: str = None): + datasources = {} + datasource_names = [] + for ds in self.dataSources: + datasource_names.append(ds.name) + datasources[ds.name] = ds + + if len(datasource_names) == 0: + return None + + if datasource: + ds_name = datasource + else: + # if global dbt config exists, use dbt profile target + # else use the first datasource + ds_name = self.dbt.get('target') if self.dbt else datasource_names[0] + + if ds_name not in datasource_names: + console = Console() + console.print(f"[[bold red]Error[/bold red]] Datasource '{ds_name}' doesn't exist") + console.print(f"Available datasources: {', '.join(datasource_names)}") + return None + + return datasources[ds_name] + def get_telemetry_id(self): if self.telemetry_id is not None: return self.telemetry_id diff --git a/piperider_cli/event/__init__.py b/piperider_cli/event/__init__.py index f76a2ef43..945be5f2a 100644 --- a/piperider_cli/event/__init__.py +++ b/piperider_cli/event/__init__.py @@ -2,14 +2,16 @@ import uuid from typing import Union +import sentry_sdk from rich.console import Console from ruamel import yaml -from piperider_cli import PIPERIDER_USER_HOME, PIPERIDER_USER_PROFILE +from piperider_cli import PIPERIDER_USER_HOME, PIPERIDER_USER_PROFILE, is_executed_manually from piperider_cli.event.collector import Collector +from .events import CompareEventPayload PIPERIDER_USER_EVENT_PATH = os.path.join(PIPERIDER_USER_HOME, '.unsend_events.json') -PIPERIDER_FLUSH_EVENTS_WHITELIST = ['init', 'run', 'generate-report', 'compare-reports'] +PIPERIDER_FLUSH_EVENTS_WHITELIST = ['init', 'run', 'generate-report', 'compare-reports', 'compare'] _collector = Collector() _yml = yaml.YAML() @@ -98,16 +100,6 @@ def flush_events(command=None): _collector.send_events() -def log_usage_event(command, params, status): - prop = dict( - command=command, - status=status, - upload=params.get('upload', False), - share=params.get('share', False) - ) - log_event(prop, 'usage', params=params) - - def log_event(prop, event_type, **kwargs): with open(PIPERIDER_USER_PROFILE, 'r') as f: user_profile = _yml.load(f) @@ -135,6 +127,23 @@ def log_event(prop, event_type, **kwargs): _collector.log_event(payload, event_type) +def capture_exception(e): + user_id = load_user_profile().get('user_id') + if is_executed_manually() is False: + project_info = _obtain_project_info() + project_id = project_info.get('project_id') + if not project_id: + return + user_id = f"{project_id}_CI" + + sentry_sdk.set_tag("user_id", user_id) + sentry_sdk.capture_exception(e) + + +def flush_exceptions(): + sentry_sdk.flush() + + class UserProfileConfigurator(object): @staticmethod def update(key: str, val: Union[str, bool], name: str = None): diff --git a/piperider_cli/event/events.py b/piperider_cli/event/events.py new file mode 100644 index 000000000..8c1567ef7 --- /dev/null +++ b/piperider_cli/event/events.py @@ -0,0 +1,34 @@ +class RunEventPayload: + + def __init__(self): + self.status = False + self.reason = None + self.step = None + self.datasource_type = None + self.skip_datasource = None + self.tables = 0 + self.columns = [] + self.rows = [] + self.passed_assertions = 0 + self.failed_assertions = 0 + self.passed_dbt_testcases = 0 + self.failed_dbt_testcases = 0 + self.build_in_assertions = 0 + self.custom_assertions = 0 + self.recommended_assertions = 0 + + def to_dict(self): + return self.__dict__ + + +class CompareEventPayload: + + def __init__(self): + self.status = False + self.reason = None + self.step = None + self.datasource_type = None + self.skip_datasource = None + + def to_dict(self): + return self.__dict__ diff --git a/piperider_cli/event/track.py b/piperider_cli/event/track.py index c3b8af8fd..6c799c327 100644 --- a/piperider_cli/event/track.py +++ b/piperider_cli/event/track.py @@ -2,6 +2,7 @@ import sys import traceback import typing as t +import time import sentry_sdk from click.core import Command, Context, Group @@ -64,6 +65,8 @@ def _show_hint_message(self, hint): def invoke(self, ctx: Context) -> t.Any: status = False + start_time = time.time() + try: self._apply_project_parameters(ctx) @@ -71,7 +74,16 @@ def invoke(self, ctx: Context) -> t.Any: if ret is None or ret == 0: guide.show_tips(ctx.command.name) status = True + reason = 'ok' + else: + reason = 'error' return ret + except SystemExit as e: + reason = 'error' + raise e + except KeyboardInterrupt as e: + reason = 'aborted' + raise e except Exception as e: if _enable_trackback: print(traceback.format_exc()) @@ -99,11 +111,26 @@ def invoke(self, ctx: Context) -> t.Any: self._show_error_message(e, ctx.params) if not ignored: - sentry_sdk.capture_exception(e) - sentry_sdk.flush() + event.capture_exception(e) + reason = 'fatal' + else: + reason = 'error' + event.flush_exceptions() sys.exit(1) finally: - event.log_usage_event(ctx.command.name, ctx.params, status) + end_time = time.time() + duration = end_time - start_time + props = dict( + command=ctx.command.name, + status=status, + reason=reason, + duration=duration, + upload=ctx.params.get('upload', False), + share=ctx.params.get('share', False), + ) + + event.log_event(props, 'usage', params=ctx.params) + event.flush_events(ctx.command.name) def _apply_project_parameters(self, ctx): diff --git a/piperider_cli/profiler/profiler.py b/piperider_cli/profiler/profiler.py index bfd2b8fa6..a5f5e1ba9 100644 --- a/piperider_cli/profiler/profiler.py +++ b/piperider_cli/profiler/profiler.py @@ -21,6 +21,7 @@ from .event import ProfilerEventHandler, DefaultProfilerEventHandler from ..configuration import Configuration from ..datasource import DataSource +from ..event import capture_exception HISTOGRAM_NUM_BUCKET = 50 @@ -162,9 +163,9 @@ def _fetch_table_task(subject: ProfileSubject): table = None try: table = Table(subject.table, MetaData(), autoload_with=engine, schema=schema) - except BaseException as e: + except Exception as e: # ignore the table metadata fetch error - sentry_sdk.capture_exception(e) + capture_exception(e) pass return subject, table diff --git a/piperider_cli/recipe_executor.py b/piperider_cli/recipe_executor.py index b679e4868..e42ba14f0 100644 --- a/piperider_cli/recipe_executor.py +++ b/piperider_cli/recipe_executor.py @@ -4,6 +4,7 @@ from piperider_cli.configuration import Configuration from piperider_cli.error import RecipeConfigException +from piperider_cli.event import CompareEventPayload from piperider_cli.recipes import select_recipe_file, RecipeConfiguration, execute_recipe_configuration from piperider_cli.recipes.default_recipe_generator import generate_default_recipe, show_recipe_content @@ -13,10 +14,17 @@ class RecipeExecutor: @staticmethod def exec(recipe_name: str, select: tuple = None, modified: bool = False, base_ref: str = None, - target_ref: str = None, skip_datasource_connection: bool = False, debug=False) -> RecipeConfiguration: + target_ref: str = None, skip_datasource_connection: bool = False, debug=False, event_payload=CompareEventPayload()) -> RecipeConfiguration: config = Configuration.instance() recipe_path = select_recipe_file(recipe_name) + event_payload.step = "prepare recipe" + ds = config.get_datasource() + if ds is not None: + event_payload.datasource_type = ds.type_name + if skip_datasource_connection: + event_payload.skip_datasource = True + if recipe_name and (select or modified or base_ref or skip_datasource_connection): console.print( "[[bold yellow]Warning[/bold yellow]] " @@ -52,6 +60,6 @@ def exec(recipe_name: str, select: tuple = None, modified: bool = False, base_re recipe = RecipeConfiguration.load(recipe_path) options = dict(skip_datasource_connection=skip_datasource_connection, select=select) - execute_recipe_configuration(recipe, options, debug=debug) + execute_recipe_configuration(recipe, options, debug=debug, event_payload=event_payload) return recipe diff --git a/piperider_cli/recipes/__init__.py b/piperider_cli/recipes/__init__.py index 48334803b..3587edf73 100644 --- a/piperider_cli/recipes/__init__.py +++ b/piperider_cli/recipes/__init__.py @@ -15,6 +15,7 @@ from piperider_cli import get_run_json_path, load_jinja_template, load_json from piperider_cli.configuration import Configuration, FileSystem from piperider_cli.error import RecipeConfigException +from piperider_cli.event import CompareEventPayload from piperider_cli.recipes.utils import InteractiveStopException PIPERIDER_RECIPES_SCHEMA_PATH = os.path.join(os.path.dirname(__file__), 'recipe_schema.json') @@ -308,7 +309,7 @@ def prepare_dbt_resources_candidate(cfg: RecipeConfiguration, options: Dict): return resources, state -def execute_recipe(cfg: RecipeConfiguration, recipe_type='base', debug=False): +def execute_recipe(cfg: RecipeConfiguration, recipe_type='base', debug=False, event_payload=CompareEventPayload()): """ We execute a recipe in the following steps: 1. run dbt commands @@ -324,6 +325,7 @@ def execute_recipe(cfg: RecipeConfiguration, recipe_type='base', debug=False): return # model.dbt.commands + event_payload.step = f"{recipe_type}:dbt" for cmd in model.dbt.commands or []: console.print(f"Run: \[{cmd}]") exit_code = tool().execute_command_with_showing_output(cmd, model.dbt.envs()) @@ -336,6 +338,7 @@ def execute_recipe(cfg: RecipeConfiguration, recipe_type='base', debug=False): console.print() # model.piperider.commands + event_payload.step = f"{recipe_type}:piperider" for cmd in model.piperider.commands or []: console.print(f"Run: \[{cmd}]") exit_code = tool().execute_command_with_showing_output(cmd, model.piperider.envs()) @@ -404,7 +407,7 @@ def execute_dbt_compile(model: RecipeModel, project_dir: str = None, profiles_di console.print() -def execute_recipe_archive(cfg: RecipeConfiguration, recipe_type='base', debug=False): +def execute_recipe_archive(cfg: RecipeConfiguration, recipe_type='base', debug=False, event_payload=CompareEventPayload()): """ We execute a recipe in the following steps: 1. export the repo with specified commit or branch if needed @@ -421,6 +424,8 @@ def execute_recipe_archive(cfg: RecipeConfiguration, recipe_type='base', debug=F console.print(f"Select {recipe_type} report: \[{model.file}]") return + # git archive + event_payload.step = f"{recipe_type}:git-archive" diff_target = 'HEAD' if recipe_type == 'target': branch_or_commit = tool().git_rev_parse(cfg.target.ref) @@ -445,6 +450,7 @@ def execute_recipe_archive(cfg: RecipeConfiguration, recipe_type='base', debug=F console.print() # model.dbt.commands + event_payload.step = f"{recipe_type}:dbt" for cmd in model.dbt.commands or []: console.print(f"Run: \[{cmd}]") # TODO: handle existing flags in command from recipe @@ -459,6 +465,7 @@ def execute_recipe_archive(cfg: RecipeConfiguration, recipe_type='base', debug=F console.print() # model.piperider.commands + event_payload.step = f"{recipe_type}:piperider" for cmd in model.piperider.commands or []: console.print(f"Run: \[{cmd}]") cmd = f'{cmd} --dbt-project-dir {model.tmp_dir_path} --dbt-target-path {model.tmp_dir_path}/target' if model.tmp_dir_path else cmd @@ -507,7 +514,7 @@ def clean_up(cfg: RecipeConfiguration): tool().remove_dir(cfg.target.tmp_dir_path) -def execute_recipe_configuration(cfg: RecipeConfiguration, options, debug=False): +def execute_recipe_configuration(cfg: RecipeConfiguration, options, debug=False, event_payload=CompareEventPayload()): console.rule("Recipe executor: verify execution environments") # check the dependencies console.print("Check: git") @@ -529,13 +536,13 @@ def execute_recipe_configuration(cfg: RecipeConfiguration, options, debug=False) cfg.target.dbt.commands = replace_commands_dbt_state_path(cfg.target.dbt.commands, dbt_state_path) console.rule("Recipe executor: base phase") - execute_recipe_archive(cfg, recipe_type='base', debug=debug) + execute_recipe_archive(cfg, recipe_type='base', debug=debug, event_payload=event_payload) console.rule("Recipe executor: target phase") if cfg.target.ref: - execute_recipe_archive(cfg, recipe_type='target', debug=debug) + execute_recipe_archive(cfg, recipe_type='target', debug=debug, event_payload=event_payload) else: - execute_recipe(cfg, recipe_type='target', debug=debug) + execute_recipe(cfg, recipe_type='target', debug=debug, event_payload=event_payload) except Exception as e: if isinstance(e, InteractiveStopException): diff --git a/piperider_cli/recipes/utils.py b/piperider_cli/recipes/utils.py index 633ad5b4e..c670f7650 100644 --- a/piperider_cli/recipes/utils.py +++ b/piperider_cli/recipes/utils.py @@ -23,7 +23,7 @@ def __init__(self, console: Console): @abc.abstractmethod def execute_command_in_silent( - self, command_line, env: Dict = None + self, command_line, env: Dict = None ) -> Tuple[str, str, int]: """ Execute command without showing outputs @@ -35,7 +35,7 @@ def execute_command_in_silent( @abc.abstractmethod def execute_command_with_showing_output( - self, command_line, env: Dict = None + self, command_line, env: Dict = None ) -> int: """ Execute command and showing outputs @@ -58,6 +58,10 @@ def dryrun_ignored_execute_command(command_line, env: Dict = None): cwd=FileSystem.WORKING_DIRECTORY, ) outs, errs = proc.communicate() + except KeyboardInterrupt as e: + print("subprocess interrupted") + raise e + except BaseException as e: if proc: proc.kill() @@ -79,7 +83,7 @@ def dryrun_ignored_execute_command_no_outputs(command_line, env: Dict = None): try: proc = Popen(cmd, env=env or os.environ.copy(), cwd=FileSystem.WORKING_DIRECTORY) proc.communicate() - except BaseException: + except Exception: if proc: proc.kill() proc.communicate() @@ -242,7 +246,7 @@ def __init__(self, utils: AbstractRecipeUtils): self.decoratee = utils def execute_command_in_silent( - self, command_line, env: Dict = None + self, command_line, env: Dict = None ) -> Tuple[str, str, int]: # ask user continue if self.should_continue(command_line): @@ -253,7 +257,7 @@ def execute_command_in_silent( raise InteractiveStopException() def execute_command_with_showing_output( - self, command_line, env: Dict = None + self, command_line, env: Dict = None ) -> int: # ask user continue if self.should_continue(command_line): diff --git a/piperider_cli/runner.py b/piperider_cli/runner.py index 4c0855063..5d9e15df8 100644 --- a/piperider_cli/runner.py +++ b/piperider_cli/runner.py @@ -30,33 +30,14 @@ from piperider_cli.datasource import DataSource from piperider_cli.datasource.unsupported import UnsupportedDataSource from piperider_cli.error import PipeRiderInvalidDataSourceError, PipeRiderError, PipeRiderConnectorUnsupportedError -from piperider_cli.exitcode import EC_ERR_TEST_FAILED, EC_WARN_NO_PROFILED_MODULES +from piperider_cli.event.events import RunEventPayload +from piperider_cli.exitcode import EC_WARN_NO_PROFILED_MODULES from piperider_cli.metrics_engine import MetricEngine, MetricEventHandler from piperider_cli.profiler import ProfileSubject, Profiler, ProfilerEventHandler from piperider_cli.statistics import Statistics from piperider_cli.utils import create_link, remove_link -class RunEventPayload: - - def __init__(self): - # all fields - self.tables = 0 - self.columns = [] - self.rows = [] - self.dbt_command = '' - self.passed_assertions = 0 - self.failed_assertions = 0 - self.passed_dbt_testcases = 0 - self.failed_dbt_testcases = 0 - self.build_in_assertions = 0 - self.custom_assertions = 0 - self.recommended_assertions = 0 - - def to_dict(self): - return self.__dict__ - - class RichProfilerEventHandler(ProfilerEventHandler): def __init__(self, tables): @@ -455,10 +436,9 @@ def _append_descriptions_from_assertion(profile_result): 'description'] = f'{column_desc}' -def _analyse_and_log_run_event(profiled_result, assertion_results, dbt_test_results): +def _analyse_run_event(event_payload: RunEventPayload, profiled_result, assertion_results, dbt_test_results): tables = profiled_result.get('tables', []) tables = {k: v for k, v in tables.items() if v} - event_payload = RunEventPayload() event_payload.tables = len(tables) # Table info @@ -492,8 +472,6 @@ def _analyse_and_log_run_event(profiled_result, assertion_results, dbt_test_resu else: event_payload.failed_dbt_testcases += 1 - event.log_event(event_payload.to_dict(), 'run') - def decorate_with_metadata(profile_result: dict): from piperider_cli import __version__ @@ -644,45 +622,30 @@ class Runner: @staticmethod def exec(datasource=None, table=None, output=None, skip_report=False, dbt_target_path: str = None, dbt_resources: Optional[dict] = None, dbt_select: tuple = None, dbt_state: str = None, - report_dir: str = None, skip_datasource_connection: bool = False): + report_dir: str = None, skip_datasource_connection: bool = False, event_payload=RunEventPayload()): console = Console() raise_exception_when_directory_not_writable(output) configuration = Configuration.instance() filesystem = configuration.activate_report_directory(report_dir=report_dir) - datasources = {} - datasource_names = [] - for ds in configuration.dataSources: - datasource_names.append(ds.name) - datasources[ds.name] = ds - - if len(datasource_names) == 0: + ds = configuration.get_datasource(datasource) + if ds is None: if skip_datasource_connection is False: console.print( "[[bold red]Error[/bold red]] Data source not found. " "Please check your dbt 'profiles.yml' configuration.") return 1 else: - configuration.dbt['target'] = 'dev' - configuration.dbt['profile'] = 'unsupported-data-source' - datasources = {'dev': UnsupportedDataSource('Unknown', dbt=configuration.dbt)} - datasource_names = ['dev'] - - if datasource: - ds_name = datasource + ds = UnsupportedDataSource('Unknown', dbt=configuration.dbt) else: - # if global dbt config exists, use dbt profile target - # else use the first datasource - ds_name = configuration.dbt.get('target') if configuration.dbt else datasource_names[0] - - if ds_name not in datasource_names: - console.print(f"[[bold red]Error[/bold red]] Datasource '{ds_name}' doesn't exist") - console.print(f"Available datasources: {', '.join(datasource_names)}") - return 1 - - ds = datasources[ds_name] + event_payload.datasource_type = ds.type_name + if skip_datasource_connection: + event_payload.skip_datasource = True + # Validating + console.rule('Validating') + event_payload.step = 'validate' passed, reasons = ds.validate() if not passed: console.print(f"[bold red]Error:[/bold red] The credential of '{ds.name}' is not configured.") @@ -692,12 +655,6 @@ def exec(datasource=None, table=None, output=None, skip_report=False, dbt_target "[bold yellow]Hint:[/bold yellow]\n Please execute command 'piperider init' to move forward.") return 1 - if not datasource and configuration.dbt is None and len(datasource_names) > 1: - console.print( - f"[bold yellow]Warning: multiple datasources found ({', '.join(datasource_names)}), using '{ds_name}'[/bold yellow]\n") - - console.print(f'[bold dark_orange]DataSource:[/bold dark_orange] {ds.name}') - console.rule('Validating') result, err = _pre_run_validating(ds) if result is PreRunValidatingResult.ERROR: console.print('\n\n[[bold red]ERROR[/bold red]] Stop profiling, please fix the syntax errors above.') @@ -730,11 +687,11 @@ def exec(datasource=None, table=None, output=None, skip_report=False, dbt_target if not dbtutil.is_ready(dbt_config): console.log( '[[bold red]ERROR:[/bold red]] DBT configuration is not completed, please check the config.yml') - return sys.exit(1) + return 1 dbt_target_path, err_msg = get_dbt_state_dir(dbt_target_path, dbt_config, ds, skip_datasource_connection) if err_msg: console.print(err_msg) - return sys.exit(1) + return 1 dbt_manifest = dbtutil.get_dbt_manifest(dbt_target_path) dbt_run_results = dbtutil.get_dbt_run_results(dbt_target_path) if dbt_select: @@ -743,6 +700,9 @@ def exec(datasource=None, table=None, output=None, skip_report=False, dbt_target select=dbt_select, state=dbt_state) console.print('everything is OK.') + + # Profiling + event_payload.step = 'pre-profile' run_id = uuid.uuid4().hex created_at = datetime.utcnow() if skip_datasource_connection: @@ -788,7 +748,6 @@ def filter_fn(subject: ProfileSubject): profiler_result = {} statistics = Statistics() - # Profile the datasource profiler = Profiler(ds, RichProfilerEventHandler([subject.name for subject in subjects]), configuration) if skip_datasource_connection: @@ -799,9 +758,11 @@ def filter_fn(subject: ProfileSubject): run_result.update(profiler_result) else: try: + event_payload.step = 'schema' console.rule('Collect metadata') profiler.collect_metadata(dbt_metadata_subjects, subjects) + event_payload.step = 'profile' console.rule('Profile statistics') profiler_result = profiler.profile(subjects, metadata_subjects=dbt_metadata_subjects) run_result.update(profiler_result) @@ -813,6 +774,8 @@ def filter_fn(subject: ProfileSubject): statistics.reset() + # Query metrics + event_payload.step = 'metric' if skip_datasource_connection is False: console.rule('Query metrics') metrics = [] @@ -833,6 +796,8 @@ def filter_fn(subject: ProfileSubject): assertion_results, assertion_exceptions = _execute_assertions(console, engine, ds.name, output, profiler_result, created_at) + # Assertion + event_payload.step = 'assertion' run_result['tests'] = [] if assertion_results or dbt_test_results: console.rule('Assertion Results') @@ -874,6 +839,8 @@ def _slim_dbt_manifest(manifest): dbtutil.append_descriptions(run_result, dbt_target_path) _append_descriptions_from_assertion(run_result) + # Generate report + event_payload.step = 'report' run_result['id'] = run_id run_result['created_at'] = datetime_to_str(created_at) git_branch, git_sha = get_git_branch() @@ -904,10 +871,11 @@ def _slim_dbt_manifest(manifest): if skip_report: console.print(f'Results saved to {output if output else output_path}') - _analyse_and_log_run_event(run_result, assertion_results, dbt_test_results) + _analyse_run_event(event_payload, run_result, assertion_results, dbt_test_results) - if not _check_assertion_status(assertion_results, assertion_exceptions): - return EC_ERR_TEST_FAILED + # The assertion is deprecated. We should not run failed event the dbt test failed. + # if not _check_assertion_status(assertion_results, assertion_exceptions): + # return EC_ERR_TEST_FAILED if len(subjects) == 0 and len(run_result.get('metrics', [])) == 0 and not skip_datasource_connection: return EC_WARN_NO_PROFILED_MODULES