Skip to content

Commit

Permalink
feat(agents-api): refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
HamadaSalhab committed Oct 1, 2024
1 parent 25370c1 commit f3de527
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,13 @@

@activity.defn
async def raise_complete_async(context: StepContext, output: StepOutcome) -> None:
# TODO: Create a transtition to "wait" and save the captured_token to the transition

activity_info = activity.info()

captured_token = activity_info.task_token
captured_token = base64.b64encode(activity_info.task_token).decode('ascii')
activity_id = activity_info.activity_id
workflow_run_id = activity_info.workflow_run_id
workflow_id = activity_info.workflow_id
print("activity_id")
print(activity_id)

captured_token = base64.b64encode(captured_token).decode('ascii')

transition_info = CreateTransitionRequest(
current=context.cursor,
Expand All @@ -32,14 +27,11 @@ async def raise_complete_async(context: StepContext, output: StepOutcome) -> Non
task_token=captured_token,
metadata={
"x-activity-id": activity_id,
"workflow_run_id": workflow_run_id,
"workflow_id": workflow_id,
"x-run-id": workflow_run_id,
"x-workflow-id": workflow_id,
},
)

await original_transition_step(context, transition_info)

# await transition(context, output=output, type="wait", next=None, task_token=captured_token)

print("transition to wait called")
activity.raise_complete_async()
17 changes: 4 additions & 13 deletions agents-api/agents_api/routers/tasks/update_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ async def update_execution(
execution_id: UUID,
data: ResumeExecutionRequest | StopExecutionRequest,
):
print("inside update execution")
print("getting temporal client")
temporal_client = await get_client()

match data:
Expand All @@ -38,18 +36,16 @@ async def update_execution(
)
await wf_handle.cancel()
except Exception as e:
print(f"Error stopping execution: {e}")
raise HTTPException(status_code=500, detail="Failed to stop execution")

case ResumeExecutionRequest():
print("Resuming execution")
token_data = get_paused_execution_token(
developer_id=x_developer_id, execution_id=execution_id
)
activity_id = token_data["metadata"].get("x-activity-id", None)
workflow_run_id = token_data["metadata"].get("workflow_run_id", None)
workflow_id = token_data["metadata"].get("workflow_id", None)
if activity_id is None or workflow_run_id is None or workflow_id is None:
run_id = token_data["metadata"].get("x-run-id", None)
workflow_id = token_data["metadata"].get("x-workflow-id", None)
if activity_id is None or run_id is None or workflow_id is None:
act_handle = temporal_client.get_async_activity_handle(
task_token=base64.b64decode(token_data["task_token"].encode('ascii')),
)
Expand All @@ -58,16 +54,11 @@ async def update_execution(
act_handle = temporal_client.get_async_activity_handle(
activity_id=activity_id,
workflow_id=workflow_id,
run_id=workflow_run_id,
run_id=run_id,
)
try:
print("Activity id")
print(act_handle._id_or_token)
await act_handle.complete(data.input)
print("Resumed execution successfully")
except Exception as e:
print(f"Error resuming execution: {e}")
raise HTTPException(status_code=500, detail="Failed to resume execution")
case _:
print("Invalid request dataaaa")
raise HTTPException(status_code=400, detail="Invalid request data")

0 comments on commit f3de527

Please sign in to comment.