Skip to content

Commit

Permalink
Merge pull request #887 from InfuseAI/chore/sc-32209/improve-telemetr…
Browse files Browse the repository at this point in the history
…y-usage-and-run-event

[Chore] improve telemetry usage and run event
  • Loading branch information
popcornylu authored Sep 18, 2023
2 parents e4c7fcb + 3ddfeaa commit ec45850
Show file tree
Hide file tree
Showing 11 changed files with 329 additions and 191 deletions.
32 changes: 28 additions & 4 deletions piperider_cli/cli_utils/compare_with_recipe.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import time
from rich.console import Console
from piperider_cli.event import CompareEventPayload, log_event


def parse_compare_ref(ref: str):
Expand Down Expand Up @@ -48,6 +50,7 @@ def compare_with_recipe(ref, **kwargs):
select = kwargs.get('select')
modified = kwargs.get('modified')
skip_datasource_connection = kwargs.get('skip_datasource')
event_payload = CompareEventPayload()

base_ref, target_ref = parse_compare_ref(ref)
if ref is not None and base_ref is None:
Expand Down Expand Up @@ -84,7 +87,9 @@ def compare_with_recipe(ref, **kwargs):
elif is_piperider_workspace_exist() is False:
raise DbtProjectNotFoundError()

ret = 0
status = False
reason = None
start_time = time.time()
try:
# note: dry-run and interactive are set by configure_recipe_execution_flags
from piperider_cli.recipe_executor import RecipeExecutor
Expand All @@ -95,7 +100,8 @@ def compare_with_recipe(ref, **kwargs):
base_ref=base_ref,
target_ref=target_ref,
skip_datasource_connection=skip_datasource_connection,
debug=debug)
debug=debug,
event_payload=event_payload)
last = False
base = target = None
if not recipe_config.base.is_file_specified() and not recipe_config.target.is_file_specified():
Expand All @@ -105,6 +111,7 @@ def compare_with_recipe(ref, **kwargs):
target = recipe_config.target.get_run_report()

if not is_recipe_dry_run():
event_payload.step = "compare reports"
from piperider_cli.compare_report import CompareReport
CompareReport.exec(a=base, b=target, last=last, datasource=None,
output=kwargs.get('output'), tables_from="all",
Expand All @@ -115,7 +122,24 @@ def compare_with_recipe(ref, **kwargs):
project_name=project_name,
show_progress=True,
debug=debug)

status = True
reason = 'ok'
event_payload.step = "done"
return 0
except SystemExit as e:
reason = 'error'
raise e
except KeyboardInterrupt as e:
reason = 'aborted'
raise e
except Exception as e:
reason = 'fatal'
raise e

return ret
finally:
end_time = time.time()
duration = end_time - start_time
event_payload.status = status
event_payload.reason = reason
event_payload.duration = duration
log_event(event_payload.to_dict(), 'compare')
221 changes: 126 additions & 95 deletions piperider_cli/cli_utils/run_cmd.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import os
import sys
import time

from rich.console import Console

from piperider_cli.event import log_event


def run(**kwargs):
'Profile data source, run assertions, and generate report(s). By default, the raw results and reports are saved in ".piperider/outputs".'
Expand All @@ -11,101 +14,129 @@ def run(**kwargs):
from piperider_cli.cli_utils.cloud import CloudConnectorHelper
from piperider_cli.configuration import FileSystem, is_piperider_workspace_exist
from piperider_cli.error import DbtProjectNotFoundError, PipeRiderConflictOptionsError
from piperider_cli.exitcode import EC_ERR_TEST_FAILED, EC_WARN_NO_PROFILED_MODULES
from piperider_cli.exitcode import EC_WARN_NO_PROFILED_MODULES
from piperider_cli.generate_report import GenerateReport
from piperider_cli.guide import Guide
from piperider_cli.initializer import Initializer
from piperider_cli.runner import Runner

datasource = kwargs.get('datasource')
table = kwargs.get('table')
output = kwargs.get('output')
open_report = kwargs.get('open')
enable_share = kwargs.get('share')
skip_report = kwargs.get('skip_report')
dbt_target_path = kwargs.get('dbt_target_path')
dbt_list = kwargs.get('dbt_list')
force_upload = kwargs.get('upload')
project_name = kwargs.get('project')
select = kwargs.get('select')
state = kwargs.get('state')

if project_name is not None:
os.environ.get('PIPERIDER_API_PROJECT')

console = Console()
env_dbt_resources = os.environ.get('PIPERIDER_DBT_RESOURCES')

# True -> 1, False -> 0
if sum([True if table else False, dbt_list, env_dbt_resources is not None]) > 1:
console.print("[bold red]Error:[/bold red] "
"['--table', '--dbt-list'] are mutually exclusive")
sys.exit(1)

# Search dbt project config files
dbt_project_dir = kwargs.get('dbt_project_dir')
no_auto_search = kwargs.get('no_auto_search')
dbt_project_path = DbtUtil.get_dbt_project_path(dbt_project_dir, no_auto_search, recursive=False)
dbt_profiles_dir = kwargs.get('dbt_profiles_dir')
if dbt_project_path:
working_dir = os.path.dirname(dbt_project_path) if dbt_project_path.endswith('.yml') else dbt_project_path
FileSystem.set_working_directory(working_dir)
if dbt_profiles_dir:
FileSystem.set_dbt_profiles_dir(dbt_profiles_dir)
# Only run initializer when dbt project path is provided
Initializer.exec(dbt_project_path=dbt_project_path, dbt_profiles_dir=dbt_profiles_dir, interactive=False)
elif is_piperider_workspace_exist() is False:
raise DbtProjectNotFoundError()

dbt_resources = None
if select and dbt_list is True:
raise PipeRiderConflictOptionsError(
'Cannot use options "--select" with "--dbt-list"',
hint='Remove "--select" option and use "--dbt-list" instead.'
)

if dbt_list:
dbt_resources = DbtUtil.read_dbt_resources(sys.stdin)
if env_dbt_resources is not None:
dbt_resources = DbtUtil.read_dbt_resources(env_dbt_resources)

ret = Runner.exec(datasource=datasource,
table=table,
output=output,
skip_report=skip_report,
dbt_target_path=dbt_target_path,
dbt_resources=dbt_resources,
dbt_select=select,
dbt_state=state,
report_dir=kwargs.get('report_dir'),
skip_datasource_connection=kwargs.get('skip_datasource'))
if ret in (0, EC_ERR_TEST_FAILED, EC_WARN_NO_PROFILED_MODULES):
if enable_share:
force_upload = True

auto_upload = CloudConnectorHelper.is_auto_upload()
is_cloud_view = (force_upload or auto_upload)

if not skip_report:
GenerateReport.exec(None, kwargs.get('report_dir'), output, open_report, is_cloud_view)

if ret == EC_WARN_NO_PROFILED_MODULES:
# No module was profiled
if dbt_list or dbt_resources or select:
Guide().show('No resources was profiled. Please check given "--select", "--dbt-list" option or '
'environment variable "PIPERIDER_DBT_RESOURCES" to choose the resources to profile.')
ret = 0

if CloudConnectorHelper.is_login() and is_cloud_view:
ret = CloudConnectorHelper.upload_latest_report(report_dir=kwargs.get('report_dir'),
debug=kwargs.get('debug'),
open_report=open_report, enable_share=enable_share,
project_name=project_name)
elif not CloudConnectorHelper.is_login() and is_cloud_view:
console = Console()
console.print('[bold yellow]Warning: [/bold yellow]The report is not uploaded due to not logged in.')

if ret != 0:
if ret != EC_WARN_NO_PROFILED_MODULES:
sys.exit(ret)
return ret
from piperider_cli.runner import Runner, RunEventPayload

status = False
reason = None
start_time = time.time()
event_payload = RunEventPayload()
event_payload.step = 'start'
try:
datasource = kwargs.get('datasource')
table = kwargs.get('table')
output = kwargs.get('output')
open_report = kwargs.get('open')
enable_share = kwargs.get('share')
skip_report = kwargs.get('skip_report')
dbt_target_path = kwargs.get('dbt_target_path')
dbt_list = kwargs.get('dbt_list')
force_upload = kwargs.get('upload')
project_name = kwargs.get('project')
select = kwargs.get('select')
state = kwargs.get('state')

if project_name is not None:
os.environ.get('PIPERIDER_API_PROJECT')

console = Console()
env_dbt_resources = os.environ.get('PIPERIDER_DBT_RESOURCES')

# True -> 1, False -> 0
if sum([True if table else False, dbt_list, env_dbt_resources is not None]) > 1:
console.print("[bold red]Error:[/bold red] "
"['--table', '--dbt-list'] are mutually exclusive")
sys.exit(1)

# Search dbt project config files
dbt_project_dir = kwargs.get('dbt_project_dir')
no_auto_search = kwargs.get('no_auto_search')
dbt_project_path = DbtUtil.get_dbt_project_path(dbt_project_dir, no_auto_search, recursive=False)
dbt_profiles_dir = kwargs.get('dbt_profiles_dir')
if dbt_project_path:
working_dir = os.path.dirname(dbt_project_path) if dbt_project_path.endswith('.yml') else dbt_project_path
FileSystem.set_working_directory(working_dir)
if dbt_profiles_dir:
FileSystem.set_dbt_profiles_dir(dbt_profiles_dir)
# Only run initializer when dbt project path is provided
Initializer.exec(dbt_project_path=dbt_project_path, dbt_profiles_dir=dbt_profiles_dir, interactive=False)
elif is_piperider_workspace_exist() is False:
raise DbtProjectNotFoundError()

dbt_resources = None
if select and dbt_list is True:
raise PipeRiderConflictOptionsError(
'Cannot use options "--select" with "--dbt-list"',
hint='Remove "--select" option and use "--dbt-list" instead.'
)

if dbt_list:
dbt_resources = DbtUtil.read_dbt_resources(sys.stdin)
if env_dbt_resources is not None:
dbt_resources = DbtUtil.read_dbt_resources(env_dbt_resources)

ret = Runner.exec(datasource=datasource,
table=table,
output=output,
skip_report=skip_report,
dbt_target_path=dbt_target_path,
dbt_resources=dbt_resources,
dbt_select=select,
dbt_state=state,
report_dir=kwargs.get('report_dir'),
skip_datasource_connection=kwargs.get('skip_datasource'),
event_payload=event_payload)
if ret in (0, EC_WARN_NO_PROFILED_MODULES):
if enable_share:
force_upload = True

auto_upload = CloudConnectorHelper.is_auto_upload()
is_cloud_view = (force_upload or auto_upload)

if not skip_report:
GenerateReport.exec(None, kwargs.get('report_dir'), output, open_report, is_cloud_view)

if ret == EC_WARN_NO_PROFILED_MODULES:
# No module was profiled
if dbt_list or dbt_resources or select:
Guide().show('No resources was profiled. Please check given "--select", "--dbt-list" option or '
'environment variable "PIPERIDER_DBT_RESOURCES" to choose the resources to profile.')
ret = 0

event_payload.step = 'upload'
if CloudConnectorHelper.is_login() and is_cloud_view:
ret = CloudConnectorHelper.upload_latest_report(report_dir=kwargs.get('report_dir'),
debug=kwargs.get('debug'),
open_report=open_report, enable_share=enable_share,
project_name=project_name)
elif not CloudConnectorHelper.is_login() and is_cloud_view:
console = Console()
console.print('[bold yellow]Warning: [/bold yellow]The report is not uploaded due to not logged in.')

if ret != 0:
reason = 'error'
return ret

status = True
reason = 'ok'
event_payload.step = 'done'
return 0
except SystemExit as e:
reason = 'error'
raise e
except KeyboardInterrupt as e:
reason = 'aborted'
raise e
except Exception as e:
reason = 'fatal'
raise e
finally:
end_time = time.time()
duration = end_time - start_time
event_payload.status = status
event_payload.reason = reason
event_payload.duration = duration
log_event(event_payload.to_dict(), 'run')
25 changes: 25 additions & 0 deletions piperider_cli/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,31 @@ def _verify_input_config(self):
if not isinstance(self.excludes, List):
raise PipeRiderConfigTypeError("'excludes' should be a list of tables' name")

def get_datasource(self, datasource: str = None):
datasources = {}
datasource_names = []
for ds in self.dataSources:
datasource_names.append(ds.name)
datasources[ds.name] = ds

if len(datasource_names) == 0:
return None

if datasource:
ds_name = datasource
else:
# if global dbt config exists, use dbt profile target
# else use the first datasource
ds_name = self.dbt.get('target') if self.dbt else datasource_names[0]

if ds_name not in datasource_names:
console = Console()
console.print(f"[[bold red]Error[/bold red]] Datasource '{ds_name}' doesn't exist")
console.print(f"Available datasources: {', '.join(datasource_names)}")
return None

return datasources[ds_name]

def get_telemetry_id(self):
if self.telemetry_id is not None:
return self.telemetry_id
Expand Down
33 changes: 21 additions & 12 deletions piperider_cli/event/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
import uuid
from typing import Union

import sentry_sdk
from rich.console import Console
from ruamel import yaml

from piperider_cli import PIPERIDER_USER_HOME, PIPERIDER_USER_PROFILE
from piperider_cli import PIPERIDER_USER_HOME, PIPERIDER_USER_PROFILE, is_executed_manually
from piperider_cli.event.collector import Collector
from .events import CompareEventPayload

PIPERIDER_USER_EVENT_PATH = os.path.join(PIPERIDER_USER_HOME, '.unsend_events.json')
PIPERIDER_FLUSH_EVENTS_WHITELIST = ['init', 'run', 'generate-report', 'compare-reports']
PIPERIDER_FLUSH_EVENTS_WHITELIST = ['init', 'run', 'generate-report', 'compare-reports', 'compare']

_collector = Collector()
_yml = yaml.YAML()
Expand Down Expand Up @@ -98,16 +100,6 @@ def flush_events(command=None):
_collector.send_events()


def log_usage_event(command, params, status):
prop = dict(
command=command,
status=status,
upload=params.get('upload', False),
share=params.get('share', False)
)
log_event(prop, 'usage', params=params)


def log_event(prop, event_type, **kwargs):
with open(PIPERIDER_USER_PROFILE, 'r') as f:
user_profile = _yml.load(f)
Expand Down Expand Up @@ -135,6 +127,23 @@ def log_event(prop, event_type, **kwargs):
_collector.log_event(payload, event_type)


def capture_exception(e):
user_id = load_user_profile().get('user_id')
if is_executed_manually() is False:
project_info = _obtain_project_info()
project_id = project_info.get('project_id')
if not project_id:
return
user_id = f"{project_id}_CI"

sentry_sdk.set_tag("user_id", user_id)
sentry_sdk.capture_exception(e)


def flush_exceptions():
sentry_sdk.flush()


class UserProfileConfigurator(object):
@staticmethod
def update(key: str, val: Union[str, bool], name: str = None):
Expand Down
Loading

0 comments on commit ec45850

Please sign in to comment.