Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(workflows): Add support for simulation integration #1993

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.63.11] - 2024-10-24
### Added
- Added support for simulation tasks in data workflows.

## [7.63.10] - 2024-10-22
### Fixed
- The Not() filter now only accepts a single filter (and no longer silently ignores the rest).
Expand Down
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.63.10"
__version__ = "7.63.11"
__api_subversion__ = "20230101"
4 changes: 4 additions & 0 deletions cognite/client/data_classes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@
DynamicTaskParameters,
FunctionTaskOutput,
FunctionTaskParameters,
SimulationTaskOutput,
SimulationTaskParameters,
SubworkflowTaskParameters,
TransformationTaskOutput,
TransformationTaskParameters,
Expand Down Expand Up @@ -562,10 +564,12 @@
"WorkflowVersionList",
"FunctionTaskParameters",
"TransformationTaskParameters",
"SimulationTaskParameters",
"CDFTaskParameters",
"DynamicTaskParameters",
"SubworkflowTaskParameters",
"FunctionTaskOutput",
"SimulationTaskOutput",
"TransformationTaskOutput",
"CDFTaskOutput",
"DynamicTaskOutput",
Expand Down
92 changes: 91 additions & 1 deletion cognite/client/data_classes/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def as_write(self) -> WorkflowUpsertList:
return WorkflowUpsertList([workflow.as_write() for workflow in self.data])


ValidTaskType = Literal["function", "transformation", "cdf", "dynamic", "subworkflow"]
ValidTaskType = Literal["function", "transformation", "cdf", "dynamic", "subworkflow", "simulator"]


class WorkflowTaskParameters(CogniteObject, ABC):
Expand Down Expand Up @@ -140,6 +140,8 @@ def load_parameters(cls, data: dict) -> WorkflowTaskParameters:
return SubworkflowTaskParameters._load(parameters)
elif type_ == "subworkflow" and "workflowExternalId" in parameters["subworkflow"]:
return SubworkflowReferenceParameters._load(parameters)
elif type_ == "simulator":
return SimulationTaskParameters._load(parameters)
else:
raise ValueError(f"Unknown task type: {type_}. Expected {ValidTaskType}")

Expand Down Expand Up @@ -219,6 +221,48 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]:
return output


class SimulationTaskParameters(WorkflowTaskParameters):
"""
The simulation parameters are used to specify the simulation routine to be executed.

Args:
routine_external_id (str): The external ID of the simulation routine to be executed.
run_time (int | None): Reference timestamp used for data pre-processing and data sampling.
inputs (list[dict[str, Any]] | None): List of input overrides
"""

task_type = "simulator"

def __init__(
self,
routine_external_id: str,
run_time: int | None = None,
inputs: list[dict[str, Any]] | None = None,
) -> None:
self.routine_external_id = routine_external_id
self.run_time = run_time
self.inputs = inputs

@classmethod
def _load(cls, resource: dict, cognite_client: CogniteClient | None = None) -> SimulationTaskParameters:
simulation: dict[str, Any] = resource["simulator"]

return cls(
routine_external_id=simulation["routineExternalId"],
run_time=simulation.get("runTime"),
inputs=simulation.get("inputs"),
)

def dump(self, camel_case: bool = True) -> dict[str, Any]:
simulation = {
"routineExternalId" if camel_case else "routine_external_id": self.routine_external_id,
"runTime" if camel_case else "run_time": self.run_time,
"inputs": self.inputs,
}

return {"simulator": simulation}


class TransformationTaskParameters(WorkflowTaskParameters):
"""
The transformation parameters are used to specify the transformation to be called.
Expand Down Expand Up @@ -521,6 +565,8 @@ def load_output(cls, data: dict) -> WorkflowTaskOutput:
return DynamicTaskOutput.load(data)
elif task_type == "subworkflow":
return SubworkflowTaskOutput.load(data)
elif task_type == "simulator":
return SimulationTaskOutput.load(data)
else:
raise ValueError(f"Unknown task type: {task_type}")

Expand Down Expand Up @@ -560,6 +606,50 @@ def dump(self, camel_case: bool = True) -> dict[str, Any]:
}


class SimulationTaskOutput(WorkflowTaskOutput):
"""
The class represent the output of Simulation execution.

Args:
run_id (int | None): The run id of the simulation execution in the SimInt API.
logs (list[dict[str, Any]] | None): Logs from the simulation execution.
status_message (str | None): Status of the current simulation execution.
outputs (list[dict[str, Any]] | None): Outputs results from the simulation execution
"""

task_type: ClassVar[str] = "simulator"

def __init__(
self,
run_id: int | None,
logs: list[dict[str, Any]] | None,
status_message: str | None,
outputs: list[dict[str, Any]] | None,
) -> None:
self.run_id = run_id
self.logs = logs
self.status_message = status_message
self.outputs = outputs

@classmethod
def load(cls, data: dict[str, Any]) -> SimulationTaskOutput:
output = data["output"]
return cls(
run_id=output.get("runId"),
logs=output.get("logs"),
status_message=output.get("statusMessage"),
outputs=output.get("outputs"),
)

def dump(self, camel_case: bool = True) -> dict[str, Any]:
return {
"runId" if camel_case else "run_id": self.run_id,
"logs": self.logs,
"statusMessage" if camel_case else "status_message": self.status_message,
"outputs": self.outputs,
}


class TransformationTaskOutput(WorkflowTaskOutput):
"""
The transformation output is used to specify the output of a transformation task.
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.63.10"
version = "7.63.11"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down
Binary file added tests/data/simulators/ShowerMixer.dwxmz
Binary file not shown.
Empty file.
Loading