Commit: ci-test
alangenfeld committed Apr 10, 2024
1 parent 73633a8 commit b54977c
Showing 10 changed files with 176 additions and 48 deletions.
98 changes: 92 additions & 6 deletions dagster-cloud-cli/dagster_cloud_cli/commands/ci/__init__.py
@@ -8,7 +8,7 @@
import sys
from collections import Counter
from enum import Enum
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, cast

import typer
from typer import Typer
@@ -28,11 +28,10 @@
get_org_url,
)
from dagster_cloud_cli.core import pex_builder, pydantic_yaml

from .. import metrics
from . import checks, report, state

app = Typer(hidden=True, help="CI/CD agnostic commands")
from dagster_cloud_cli.core.artifacts import (
download_organization_artifact,
upload_organization_artifact,
)
from dagster_cloud_cli.core.pex_builder import (
code_location,
deps,
@@ -42,6 +41,11 @@
)
from dagster_cloud_cli.types import CliEventTags, CliEventType

from .. import metrics
from . import checks, report, state

app = Typer(hidden=True, help="CI/CD agnostic commands")


@app.command(help="Print json information about current CI/CD environment")
def inspect(project_dir: str):
@@ -204,6 +208,10 @@ def init(
status_url: Optional[str] = None,
):
yaml_path = pathlib.Path(project_dir) / dagster_cloud_yaml_path
if not yaml_path.exists():
raise ui.error(
f"Dagster Cloud yaml file not found at specified path {yaml_path.resolve()}."
)
locations_def = pydantic_yaml.load_dagster_cloud_yaml(yaml_path.read_text())
locations = locations_def.locations
if location_name:
@@ -217,6 +225,7 @@
url = get_org_url(organization, dagster_env)
# Deploy to the branch deployment for the current context. If there is no branch deployment
# available (eg. if not in a PR) then we fallback to the --deployment flag.

try:
branch_deployment = get_deployment_from_context(url, project_dir)
if deployment:
@@ -225,9 +234,11 @@
f" --deployment={deployment}"
)
deployment = branch_deployment
is_branch_deployment = True
except ValueError as err:
if deployment:
ui.print(f"Deploying to {deployment}. No branch deployment ({err}).")
is_branch_deployment = False
else:
raise ui.error(
f"Cannot determine deployment name in current context ({err}). Please specify"
@@ -245,6 +256,7 @@
deployment_name=deployment,
location_file=str(yaml_path.absolute()),
location_name=location.location_name,
is_branch_deployment=is_branch_deployment,
build=state.BuildMetadata(
git_url=git_url, commit_hash=commit_hash, build_config=location.build
),
@@ -703,3 +715,77 @@ def _deploy(
agent_heartbeat_timeout=agent_heartbeat_timeout,
url=deployment_url,
)


dagster_dbt_app = typer.Typer(
hidden=True,
help="Dagster Cloud commands for managing the `dagster-dbt` integration.",
add_completion=False,
)
app.add_typer(dagster_dbt_app, name="dagster-dbt", no_args_is_help=True)

project_app = typer.Typer(
name="project",
no_args_is_help=True,
help="Commands for using a dbt project in Dagster.",
add_completion=False,
)
dagster_dbt_app.add_typer(project_app, name="project", no_args_is_help=True)


@project_app.command(
name="manage-state",
help="""
This CLI command will handle uploading and downloading artifacts if `state_dir` is specified on
`DbtProject`.
""",
)
def manage_state_command(
statedir: str = STATEDIR_OPTION,
file: str = typer.Option(),
source_deployment: str = typer.Option(
default="prod",
help="Which deployment should upload its manifest.json.",
),
key_prefix: str = typer.Option(
default="",
help="A key prefix for the key the manifest.json is saved with.",
),
):
try:
from dagster_dbt import DbtProject
except:
ui.print(
"Unable to import dagster_dbt, can not use dbt-prepare-for-deployment when dagster_dbt is not installed."
)
return
from dagster._core.code_pointer import load_python_file
from dagster._core.definitions.load_assets_from_modules import find_objects_in_module_of_types

state_store = state.FileStore(statedir=statedir)
locations = state_store.list_locations()
if not locations:
raise ui.error("Unable to determine deployment state.")

location = locations[0]
deployment_name = location.deployment_name
is_branch = location.is_branch_deployment

contents = load_python_file(file, None)
for project in find_objects_in_module_of_types(contents, DbtProject):
project = cast(DbtProject, project)
if project.state_path:
download_path = project.state_path.joinpath("manifest.json")
key = f"{key_prefix}{os.fspath(download_path)}"
if is_branch:
ui.print(f"Downloading {source_deployment} manifest for branch deployment.")
os.makedirs(project.state_path, exist_ok=True)
download_organization_artifact(key, download_path)
ui.print("Download complete.")

elif deployment_name == source_deployment:
ui.print(f"Uploading {source_deployment} manifest.")
upload_organization_artifact(key, project.manifest_path)
ui.print("Upload complete")

ui.print("Project ready")
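To make the new subcommand concrete, here is a minimal sketch, not part of the commit, of exercising `ci dagster-dbt project manage-state` through Typer's test runner. The `--statedir` flag name is assumed from `STATEDIR_OPTION`, and the paths shown are hypothetical.

```python
# Sketch only: invoke the new command via Typer's CliRunner.
from typer.testing import CliRunner

from dagster_cloud_cli.commands.ci import app

runner = CliRunner()
result = runner.invoke(
    app,
    [
        "dagster-dbt",
        "project",
        "manage-state",
        "--statedir", "/tmp/ci-state",           # hypothetical CI state directory
        "--file", "my_package/definitions.py",   # hypothetical module defining DbtProject objects
        "--source-deployment", "prod",
        "--key-prefix", "dbt-manifests/",
    ],
)
print(result.exit_code)
print(result.output)
```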
1 change: 1 addition & 0 deletions dagster-cloud-cli/dagster_cloud_cli/commands/ci/state.py
@@ -47,6 +47,7 @@ class LocationState(BaseModel, extra=Extra.forbid):
deployment_name: str
location_file: str
location_name: str
is_branch_deployment: bool
selected: bool = True
build: BuildMetadata
build_output: Optional[Union[DockerBuildOutput, PexBuildOutput]] = Field(
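A minimal sketch, not from the commit, of reading the new flag back out of the CI state store, mirroring `manage_state_command` above; the state directory path is hypothetical.

```python
from dagster_cloud_cli.commands.ci import state

state_store = state.FileStore(statedir="/tmp/ci-state")  # hypothetical state directory
for location in state_store.list_locations():
    print(
        location.location_name,
        location.deployment_name,
        location.is_branch_deployment,  # field added in this commit
    )
```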
2 changes: 1 addition & 1 deletion dagster-cloud-cli/dagster_cloud_cli/version.py
@@ -1 +1 @@
__version__ = "1.7.0"
__version__ = "1!0+dev"
6 changes: 4 additions & 2 deletions dagster-cloud-cli/setup.py
@@ -6,15 +6,17 @@

def get_version() -> str:
version: Dict[str, str] = {}
with open(Path(__file__).parent / "dagster_cloud_cli/version.py", encoding="utf8") as fp:
with open(
Path(__file__).parent / "dagster_cloud_cli/version.py", encoding="utf8"
) as fp:
exec(fp.read(), version)

return version["__version__"]


setup(
name="dagster-cloud-cli",
version=get_version(),
version="1.7.0", # get_version(),
author_email="[email protected]",
packages=find_packages(exclude=["dagster_cloud.cli_tests*"]),
include_package_data=True,
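For context on the version handling above: `get_version()` execs `version.py` and returns whatever `__version__` it defines, so with the dev sentinel in place it would report "1!0+dev"; the commit therefore pins `setup()` to "1.7.0" directly. A tiny illustration of the exec pattern, not part of the commit:

```python
# Illustration only: exec-ing the version module populates the dict get_version() indexes.
version: dict = {}
exec('__version__ = "1!0+dev"', version)
assert version["__version__"] == "1!0+dev"
```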
5 changes: 4 additions & 1 deletion dagster-cloud/dagster_cloud/anomaly_detection/__init__.py
@@ -1,4 +1,7 @@
from .defs import (
build_anomaly_detection_freshness_checks as build_anomaly_detection_freshness_checks,
)
from .types import AnomalyDetectionModelParams as AnomalyDetectionModelParams
from .types import (
AnomalyDetectionModelParams as AnomalyDetectionModelParams,
BetaFreshnessAnomalyDetectionParams as BetaFreshnessAnomalyDetectionParams,
)
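With the re-exports above, both parameter classes and the check factory are importable from the package root. A usage sketch; the keyword arguments in the commented call are assumptions about the factory's signature, not taken from this diff.

```python
from dagster_cloud.anomaly_detection import (
    AnomalyDetectionModelParams,
    BetaFreshnessAnomalyDetectionParams,
    build_anomaly_detection_freshness_checks,
)

# checks = build_anomaly_detection_freshness_checks(assets=[...], params=...)  # assumed argument names
```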
26 changes: 13 additions & 13 deletions dagster-cloud/dagster_cloud/anomaly_detection/defs.py
@@ -47,7 +47,7 @@ def _build_check_for_assets(
@multi_asset_check(
specs=[
AssetCheckSpec(
name="freshness_anomaly_detection_check",
name="anomaly_detection_freshness_check",
description=f"Detects anomalies in the freshness of the asset using model {params.model_version.value.lower()}.",
asset=asset_key,
)
@@ -65,7 +65,9 @@ def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]
)
instance = cast(DagsterCloudAgentInstance, context.instance)
with create_cloud_webserver_client(
instance.dagit_url,
instance.dagit_url[:-1]
if instance.dagit_url.endswith("/")
else instance.dagit_url, # Remove trailing slash
check.str_param(instance.dagster_cloud_agent_token, "dagster_cloud_agent_token"),
) as client:
for check_key in context.selected_asset_check_keys:
@@ -82,16 +84,17 @@ def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]
},
},
)
data = result["data"]["anomalyDetectionInference"]
metadata = {
"model_params": {**params.as_metadata},
"model_version": params.model_version.value,
}
if result["anomalyDetectionInference"]["__typename"] != "AnomalyDetectionSuccess":
if data["__typename"] != "AnomalyDetectionSuccess":
yield handle_anomaly_detection_inference_failure(
result, metadata, params, asset_key
data, metadata, params, asset_key
)
continue
response = result["anomalyDetectionInference"]["response"]
response = result["data"]["anomalyDetectionInference"]["response"]
overdue_seconds = check.float_param(response["overdue_seconds"], "overdue_seconds")
overdue_deadline_timestamp = response["overdue_deadline_timestamp"]
metadata["overdue_deadline_timestamp"] = MetadataValue.timestamp(
@@ -148,25 +151,22 @@ def the_check(context: AssetCheckExecutionContext) -> Iterable[AssetCheckResult]


def handle_anomaly_detection_inference_failure(
result: dict, metadata: dict, params: AnomalyDetectionModelParams, asset_key: AssetKey
data: dict, metadata: dict, params: AnomalyDetectionModelParams, asset_key: AssetKey
) -> AssetCheckResult:
if (
result["anomalyDetectionInference"]["__typename"] == "AnomalyDetectionFailure"
and result["anomalyDetectionInference"]["message"]
== params.model_version.minimum_required_records_msg
data["__typename"] == "AnomalyDetectionFailure"
and data["message"] == params.model_version.minimum_required_records_msg
):
# Intercept failure in the case of not enough records, and return a pass to avoid
# being too noisy with failures.
return AssetCheckResult(
passed=True,
severity=AssetCheckSeverity.WARN,
metadata=metadata,
description=result["anomalyDetectionInference"]["message"],
description=data["message"],
asset_key=asset_key,
)
raise DagsterCloudAnomalyDetectionFailed(
f"Anomaly detection failed: {result['anomalyDetectionInference']['message']}"
)
raise DagsterCloudAnomalyDetectionFailed(f"Anomaly detection failed: {data['message']}")


def build_anomaly_detection_freshness_checks(
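The new client setup above strips a single trailing slash from `instance.dagit_url` inline before creating the cloud webserver client. The same normalization as a small helper, for illustration only and not part of the commit; the URLs are hypothetical.

```python
def normalize_dagit_url(url: str) -> str:
    # Drop a single trailing slash, matching the inline expression added in the_check.
    return url[:-1] if url.endswith("/") else url

assert normalize_dagit_url("https://example.dagster.cloud/") == "https://example.dagster.cloud"
assert normalize_dagit_url("https://example.dagster.cloud") == "https://example.dagster.cloud"
```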
Additional changed file (path not loaded in this view)
@@ -174,7 +174,7 @@ def jaffle_shop_dbt_assets(
)
if row.bytes_billed or row.slots_ms:
cost_info = BigQueryCostInfo(
asset_key, partition, row.job_id, row.bytes_billed, row.slots_ms
asset_key, partition, row.job_id, row.slots_ms, row.bytes_billed
)
cost_by_asset[cost_info.asset_partition_key].append(cost_info)
except:
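The one-line change above swaps the last two positional arguments. Assuming the constructor order is (asset_key, partition, job_id, slots_ms, bytes_billed), as the corrected call implies, the old call silently assigned bytes_billed to slots_ms and vice versa. A hedged sketch of that assumption; the real class is defined outside this diff.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BigQueryCostInfoSketch:
    # Hypothetical stand-in for BigQueryCostInfo, field order assumed from the corrected call.
    asset_key: str
    partition: Optional[str]
    job_id: str
    slots_ms: int
    bytes_billed: int

# Before the fix the call passed (..., bytes_billed, slots_ms) positionally,
# so each metric landed in the other field.
```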
2 changes: 1 addition & 1 deletion dagster-cloud/dagster_cloud/version.py
@@ -1 +1 @@
__version__ = "1.7.0"
__version__ = "1!0+dev"