From 2d7003b034aa80adfdf6756f6f74b16bb1a38cdb Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Sat, 11 Sep 2021 14:06:42 -0700 Subject: [PATCH 01/15] Add support for nested Aws StepFunctions service integration --- doc/services.rst | 7 ++ src/stepfunctions/steps/__init__.py | 1 + .../steps/integration_resources.py | 1 + src/stepfunctions/steps/service.py | 61 ++++++++++ tests/unit/test_service_steps.py | 106 ++++++++++++++++++ 5 files changed, 176 insertions(+) diff --git a/doc/services.rst b/doc/services.rst index eee4e9e..c2ac225 100644 --- a/doc/services.rst +++ b/doc/services.rst @@ -20,6 +20,8 @@ This module provides classes to build steps that integrate with Amazon DynamoDB, - `Amazon SQS <#amazon-sqs>`__ +- `AWS Step Functions <#aws-step-functions>`__ + Amazon DynamoDB ---------------- @@ -82,3 +84,8 @@ Amazon SNS Amazon SQS ----------- .. autoclass:: stepfunctions.steps.service.SqsSendMessageStep + +AWS Step Functions +------------------ +.. autoclass:: stepfunctions.steps.service.StepFunctionsStartExecutionStep + diff --git a/src/stepfunctions/steps/__init__.py b/src/stepfunctions/steps/__init__.py index 93bc0d9..cb71b3b 100644 --- a/src/stepfunctions/steps/__init__.py +++ b/src/stepfunctions/steps/__init__.py @@ -34,3 +34,4 @@ from stepfunctions.steps.service import EventBridgePutEventsStep from stepfunctions.steps.service import GlueDataBrewStartJobRunStep from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep +from stepfunctions.steps.service import StepFunctionsStartExecutionStep diff --git a/src/stepfunctions/steps/integration_resources.py b/src/stepfunctions/steps/integration_resources.py index 5223f14..e7ffcb4 100644 --- a/src/stepfunctions/steps/integration_resources.py +++ b/src/stepfunctions/steps/integration_resources.py @@ -25,6 +25,7 @@ class IntegrationPattern(Enum): WaitForTaskToken = "waitForTaskToken" WaitForCompletion = "sync" RequestResponse = "" + WaitForCompletionWithJsonResponse = "sync:2" def get_service_integration_arn(service, api, integration_pattern=IntegrationPattern.RequestResponse): diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index a0c3950..09a2fea 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -24,6 +24,7 @@ GLUE_DATABREW_SERVICE_NAME = "databrew" SNS_SERVICE_NAME = "sns" SQS_SERVICE_NAME = "sqs" +STEP_FUNCTIONS_SERVICE_NAME = "states" class DynamoDBApi(Enum): @@ -70,6 +71,10 @@ class SqsApi(Enum): SendMessage = "sendMessage" +class StepFunctions(Enum): + StartExecution = "startExecution" + + class DynamoDBGetItemStep(Task): """ Creates a Task state to get an item from DynamoDB. See `Call DynamoDB APIs with Step Functions `_ for more details. @@ -887,3 +892,59 @@ def __init__(self, state_id, **kwargs): ElasticMapReduceApi.ModifyInstanceGroupByName) super(EmrModifyInstanceGroupByNameStep, self).__init__(state_id, **kwargs) + + +class StepFunctionsStartExecutionStep(Task): + + """ + Creates a Task state that starts an execution of another state machine. See `Manage AWS Step Functions Executions as an Integrated Service Date: Sun, 12 Sep 2021 13:39:34 -0700 Subject: [PATCH 02/15] Json response by default when wait_for_completion enabled and raise exception when more than one flag is enabled --- src/stepfunctions/steps/service.py | 35 +++++++++++++++++++++--------- tests/unit/test_service_steps.py | 26 ++++++++++++++-------- 2 files changed, 42 insertions(+), 19 deletions(-) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 09a2fea..dc4b5ca 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -898,13 +898,18 @@ class StepFunctionsStartExecutionStep(Task): """ Creates a Task state that starts an execution of another state machine. See `Manage AWS Step Functions Executions as an Integrated Service 1: + raise ValueError(f"More than 1 activated resource flags: {enabled_property_flags} - " + f"Please enable one at most.") + if wait_for_callback: """ Example resource arn: arn:aws:states:::states:startExecution.waitForTaskToken @@ -926,20 +940,21 @@ def __init__(self, state_id, wait_for_callback=False, wait_for_completion=True, StepFunctions.StartExecution, IntegrationPattern.WaitForTaskToken) elif wait_for_completion: - if json_ouput: + if string_response: """ - Example resource arn:aws:states:::states:startExecution.sync:2 + Example resource arn:aws:states:::states:startExecution.sync """ kwargs[Field.Resource.value] = get_service_integration_arn(STEP_FUNCTIONS_SERVICE_NAME, - StepFunctions.StartExecution, - IntegrationPattern.WaitForCompletionWithJsonResponse) + StepFunctions.StartExecution, + IntegrationPattern.WaitForCompletion) else: """ - Example resource arn:aws:states:::states:startExecution.sync + Example resource arn:aws:states:::states:startExecution.sync:2 """ + kwargs[Field.Resource.value] = get_service_integration_arn(STEP_FUNCTIONS_SERVICE_NAME, - StepFunctions.StartExecution, - IntegrationPattern.WaitForCompletion) + StepFunctions.StartExecution, + IntegrationPattern.WaitForCompletionWithJsonResponse) else: """ Example resource arn:aws:states:::states:startExecution diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index ac8c7df..33769dd 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -13,6 +13,7 @@ from __future__ import absolute_import import boto3 +import pytest from unittest.mock import patch from stepfunctions.steps.service import DynamoDBGetItemStep, DynamoDBPutItemStep, DynamoDBUpdateItemStep, DynamoDBDeleteItemStep @@ -1188,10 +1189,10 @@ def test_step_functions_start_execution_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') -def test_step_functions_start_execution_step_creation_sync_json_response(): +def test_step_functions_start_execution_step_creation_sync_string_response(): step = StepFunctionsStartExecutionStep( - "SFN Start Execution - Sync with json response", wait_for_callback=False, wait_for_completion=True, - json_ouput=True, parameters={ + "SFN Start Execution - Sync with string response", wait_for_callback=False, wait_for_completion=True, + string_response=True, parameters={ "Input": { "Comment": "Hello world!" }, @@ -1201,7 +1202,7 @@ def test_step_functions_start_execution_step_creation_sync_json_response(): assert step.to_dict() == { "Type": "Task", - "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Resource": "arn:aws:states:::states:startExecution.sync", "Parameters": { "Input": { "Comment": "Hello world!" @@ -1214,10 +1215,10 @@ def test_step_functions_start_execution_step_creation_sync_json_response(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') -def test_step_functions_start_execution_step_creation_sync_json_response(): +def test_step_functions_start_execution_step_creation_sync(): step = StepFunctionsStartExecutionStep( - "SFN Start Execution - Sync with string response", wait_for_callback=False, wait_for_completion=True, - json_ouput=False, parameters={ + "SFN Start Execution - Sync", wait_for_callback=False, wait_for_completion=True, + parameters={ "Input": { "Comment": "Hello world!" }, @@ -1227,7 +1228,7 @@ def test_step_functions_start_execution_step_creation_sync_json_response(): assert step.to_dict() == { "Type": "Task", - "Resource": "arn:aws:states:::states:startExecution.sync", + "Resource": "arn:aws:states:::states:startExecution.sync:2", "Parameters": { "Input": { "Comment": "Hello world!" @@ -1242,7 +1243,7 @@ def test_step_functions_start_execution_step_creation_sync_json_response(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_step_functions_start_execution_step_creation_wait_for_callback(): step = StepFunctionsStartExecutionStep( - "SFN Start Execution - Wait for Callback", wait_for_callback=True, parameters={ + "SFN Start Execution - Wait for Callback", wait_for_callback=True, wait_for_completion=False, parameters={ "Input": { "Comment": "Hello world!", "token.$": "$$.Task.Token" @@ -1264,3 +1265,10 @@ def test_step_functions_start_execution_step_creation_wait_for_callback(): }, "End": True } + + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_step_functions_start_execution_step_creation_multiple_enabled_flags_raises_exception(): + with pytest.raises(ValueError): + StepFunctionsStartExecutionStep("SFN Start Execution - Multiple flags", wait_for_callback=True, + wait_for_completion=True) From e03be34652bc33b5a3f60f8b26bdfdc5d2979f48 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Sun, 12 Sep 2021 13:54:16 -0700 Subject: [PATCH 03/15] Removed string_response arg - wait_for_completion will use :sync:2 resource that returns a JSON response --- src/stepfunctions/steps/service.py | 17 ++--------------- tests/unit/test_service_steps.py | 26 -------------------------- 2 files changed, 2 insertions(+), 41 deletions(-) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index dc4b5ca..06e9126 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -903,13 +903,12 @@ class StepFunctionsStartExecutionStep(Task): Disable the two flags for the Task state to start an execution of another state machine and proceed to the next step in the workflow without waiting for completion. """ - def __init__(self, state_id, wait_for_callback=False, wait_for_completion=True, string_response=False, **kwargs): + def __init__(self, state_id, wait_for_callback=False, wait_for_completion=True, **kwargs): """ Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. wait_for_callback(bool, optional): Boolean value set to `True` if the Task state should wait for callback to resume the operation. (default: False) wait_for_completion (bool, optional): Boolean value set to `True` if the Task state should wait to complete before proceeding to the next step in the workflow. (default: True) - string_response(bool, optional): Boolean value set to `True` if the the Task should return a response in string format instead of the default JSON format (default: False) timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. @@ -921,12 +920,8 @@ def __init__(self, state_id, wait_for_callback=False, wait_for_completion=True, output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ # Validate that only 1 one of the property flags are set to true - enabled_property_flags = [] property_flags = [(wait_for_callback, "wait_for_callback"), (wait_for_completion, "wait_for_completion")] - for property_flag in property_flags: - if property_flag[0]: - enabled_property_flags.append(property_flag[1]) - + enabled_property_flags = [property_flag[1] for property_flag in property_flags if property_flag[0]] if len(enabled_property_flags) > 1: raise ValueError(f"More than 1 activated resource flags: {enabled_property_flags} - " f"Please enable one at most.") @@ -940,14 +935,6 @@ def __init__(self, state_id, wait_for_callback=False, wait_for_completion=True, StepFunctions.StartExecution, IntegrationPattern.WaitForTaskToken) elif wait_for_completion: - if string_response: - """ - Example resource arn:aws:states:::states:startExecution.sync - """ - kwargs[Field.Resource.value] = get_service_integration_arn(STEP_FUNCTIONS_SERVICE_NAME, - StepFunctions.StartExecution, - IntegrationPattern.WaitForCompletion) - else: """ Example resource arn:aws:states:::states:startExecution.sync:2 """ diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 33769dd..dae4d78 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -1188,32 +1188,6 @@ def test_step_functions_start_execution_step_creation(): } -@patch.object(boto3.session.Session, 'region_name', 'us-east-1') -def test_step_functions_start_execution_step_creation_sync_string_response(): - step = StepFunctionsStartExecutionStep( - "SFN Start Execution - Sync with string response", wait_for_callback=False, wait_for_completion=True, - string_response=True, parameters={ - "Input": { - "Comment": "Hello world!" - }, - "StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:HelloWorld", - "Name": "ExecutionName" - }) - - assert step.to_dict() == { - "Type": "Task", - "Resource": "arn:aws:states:::states:startExecution.sync", - "Parameters": { - "Input": { - "Comment": "Hello world!" - }, - "StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:HelloWorld", - "Name": "ExecutionName" - }, - "End": True - } - - @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_step_functions_start_execution_step_creation_sync(): step = StepFunctionsStartExecutionStep( From d1185a9a099606b083b9acd35b39c7ba613f56a3 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Sun, 12 Sep 2021 14:25:55 -0700 Subject: [PATCH 04/15] Add async_call flag to use arn:aws:states:::states:startExecution resource --- src/stepfunctions/steps/service.py | 19 ++++++++++++------- tests/unit/test_service_steps.py | 25 +++++++++++++++++++------ 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 06e9126..6fb2f18 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -899,16 +899,17 @@ class StepFunctionsStartExecutionStep(Task): """ Creates a Task state that starts an execution of another state machine. See `Manage AWS Step Functions Executions as an Integrated Service 1: - raise ValueError(f"More than 1 activated resource flags: {enabled_property_flags} - " - f"Please enable one at most.") + if len(enabled_property_flags) == 0: + raise ValueError(f"No resource flag enabled - Please enable one of " + f"{[property_flag[1] for property_flag in property_flags]}") + elif len(enabled_property_flags) > 1: + raise ValueError(f"Multiple resource flags enabled({enabled_property_flags}) - " + f"Please enable only one.") if wait_for_callback: """ diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index dae4d78..18c63b8 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -1165,7 +1165,7 @@ def test_eks_call_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_step_functions_start_execution_step_creation(): step = StepFunctionsStartExecutionStep( - "SFN Start Execution", wait_for_callback=False, wait_for_completion=False, + "SFN Start Execution", wait_for_callback=False, wait_for_completion=False, async_call=True, parameters={ "Input": { "Comment": "Hello world!" @@ -1191,7 +1191,7 @@ def test_step_functions_start_execution_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_step_functions_start_execution_step_creation_sync(): step = StepFunctionsStartExecutionStep( - "SFN Start Execution - Sync", wait_for_callback=False, wait_for_completion=True, + "SFN Start Execution - Sync", wait_for_callback=False, wait_for_completion=True, async_call=False, parameters={ "Input": { "Comment": "Hello world!" @@ -1217,7 +1217,8 @@ def test_step_functions_start_execution_step_creation_sync(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_step_functions_start_execution_step_creation_wait_for_callback(): step = StepFunctionsStartExecutionStep( - "SFN Start Execution - Wait for Callback", wait_for_callback=True, wait_for_completion=False, parameters={ + "SFN Start Execution - Wait for Callback", wait_for_callback=True, wait_for_completion=False, async_call=False, + parameters={ "Input": { "Comment": "Hello world!", "token.$": "$$.Task.Token" @@ -1241,8 +1242,20 @@ def test_step_functions_start_execution_step_creation_wait_for_callback(): } +@pytest.mark.parametrize("resource_flags", [ + {'wait_for_callback': True, 'wait_for_completion': True, 'async_call': True}, + {'wait_for_callback': False, 'wait_for_completion': True, 'async_call': True}, + {'wait_for_callback': True, 'wait_for_completion': False, 'async_call': True}, + {'wait_for_callback': True, 'wait_for_completion': True, 'async_call': False}, +]) +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_step_functions_start_execution_step_creation_multiple_enabled_flags_raises_exception(resource_flags): + with pytest.raises(ValueError): + StepFunctionsStartExecutionStep("SFN Start Execution - Multiple flags", **resource_flags) + + @patch.object(boto3.session.Session, 'region_name', 'us-east-1') -def test_step_functions_start_execution_step_creation_multiple_enabled_flags_raises_exception(): +def test_step_functions_start_execution_step_creation_no_enabled_flags_raises_exception(): with pytest.raises(ValueError): - StepFunctionsStartExecutionStep("SFN Start Execution - Multiple flags", wait_for_callback=True, - wait_for_completion=True) + StepFunctionsStartExecutionStep("SFN Start Execution - Multiple flags", wait_for_callback=False, + wait_for_completion=False, async_call=False) From 1c7a4acc8d083f37dfc70740b805b4b5bcbff011 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Sun, 12 Sep 2021 14:45:42 -0700 Subject: [PATCH 05/15] Updated flags validation --- src/stepfunctions/steps/service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 6fb2f18..5d3116d 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -924,7 +924,7 @@ def __init__(self, state_id, wait_for_callback=False, wait_for_completion=True, property_flags = [(wait_for_callback, "wait_for_callback"), (wait_for_completion, "wait_for_completion"), (async_call, "async_call")] enabled_property_flags = [property_flag[1] for property_flag in property_flags if property_flag[0]] - if len(enabled_property_flags) == 0: + if not enabled_property_flags: raise ValueError(f"No resource flag enabled - Please enable one of " f"{[property_flag[1] for property_flag in property_flags]}") elif len(enabled_property_flags) > 1: From c08140a708e3eddde08880105d439a28bc6788cf Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Mon, 13 Sep 2021 09:28:07 -0700 Subject: [PATCH 06/15] Updated test --- tests/unit/test_service_steps.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 18c63b8..01889f3 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -1257,5 +1257,5 @@ def test_step_functions_start_execution_step_creation_multiple_enabled_flags_rai @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_step_functions_start_execution_step_creation_no_enabled_flags_raises_exception(): with pytest.raises(ValueError): - StepFunctionsStartExecutionStep("SFN Start Execution - Multiple flags", wait_for_callback=False, + StepFunctionsStartExecutionStep("SFN Start Execution - No active flags", wait_for_callback=False, wait_for_completion=False, async_call=False) From 1c52e6c60053d70455b111301fec027e1b89acbf Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Tue, 14 Sep 2021 13:11:33 -0700 Subject: [PATCH 07/15] Use enum to select service integration type --- .../steps/integration_resources.py | 31 +++++++++++ src/stepfunctions/steps/service.py | 53 +++++++++---------- tests/unit/test_service_steps.py | 29 ++++------ tests/unit/test_steps_utils.py | 35 ++++++++++-- 4 files changed, 97 insertions(+), 51 deletions(-) diff --git a/src/stepfunctions/steps/integration_resources.py b/src/stepfunctions/steps/integration_resources.py index e7ffcb4..d986362 100644 --- a/src/stepfunctions/steps/integration_resources.py +++ b/src/stepfunctions/steps/integration_resources.py @@ -13,9 +13,13 @@ from __future__ import absolute_import +import logging + from enum import Enum from stepfunctions.steps.utils import get_aws_partition +logger = logging.getLogger('stepfunctions') + class IntegrationPattern(Enum): """ @@ -28,6 +32,16 @@ class IntegrationPattern(Enum): WaitForCompletionWithJsonResponse = "sync:2" +class ServiceIntegrationType(Enum): + """ + Service Integration Types for service integration resources (see `Service Integration Patterns 1: - raise ValueError(f"Multiple resource flags enabled({enabled_property_flags}) - " - f"Please enable only one.") + supported_integ_types = [ServiceIntegrationType.REQUEST_RESPONSE, ServiceIntegrationType.RUN_A_JOB, + ServiceIntegrationType.WAIT_FOR_CALLBACK] - if wait_for_callback: + if not isinstance(service_integration_type, ServiceIntegrationType): + raise ValueError(f"Invalid type used for service_integration_type arg ({service_integration_type}, " + f"{type(service_integration_type)}). Accepted type: {ServiceIntegrationType}") + elif service_integration_type not in supported_integ_types: + raise ValueError(f"Service Integration Type ({service_integration_type.name}) is not supported for this step - " + f"Please use one of the following: " + f"{[integ_type.name for integ_type in supported_integ_types]}") + + if service_integration_type == ServiceIntegrationType.RUN_A_JOB: + # For RunAJob service integration type, use the resource that returns json response (`.sync:2`) """ - Example resource arn: arn:aws:states:::states:startExecution.waitForTaskToken + Example resource arn:aws:states:::states:startExecution.sync:2 """ - kwargs[Field.Resource.value] = get_service_integration_arn(STEP_FUNCTIONS_SERVICE_NAME, StepFunctions.StartExecution, - IntegrationPattern.WaitForTaskToken) - elif wait_for_completion: - """ - Example resource arn:aws:states:::states:startExecution.sync:2 - """ - - kwargs[Field.Resource.value] = get_service_integration_arn(STEP_FUNCTIONS_SERVICE_NAME, - StepFunctions.StartExecution, - IntegrationPattern.WaitForCompletionWithJsonResponse) + IntegrationPattern.WaitForCompletionWithJsonResponse) else: """ - Example resource arn:aws:states:::states:startExecution + Example resource arn: + - arn:aws:states:::states:startExecution.waitForTaskToken + - arn:aws:states:::states:startExecution """ kwargs[Field.Resource.value] = get_service_integration_arn(STEP_FUNCTIONS_SERVICE_NAME, - StepFunctions.StartExecution) + StepFunctions.StartExecution, + get_integration_pattern_from_service_integration_type(service_integration_type)) super(StepFunctionsStartExecutionStep, self).__init__(state_id, **kwargs) diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 01889f3..39e71d7 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -32,6 +32,7 @@ from stepfunctions.steps.service import SnsPublishStep, SqsSendMessageStep from stepfunctions.steps.service import GlueDataBrewStartJobRunStep from stepfunctions.steps.service import StepFunctionsStartExecutionStep +from stepfunctions.steps.integration_resources import IntegrationPattern, ServiceIntegrationType @patch.object(boto3.session.Session, 'region_name', 'us-east-1') @@ -1165,8 +1166,7 @@ def test_eks_call_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_step_functions_start_execution_step_creation(): step = StepFunctionsStartExecutionStep( - "SFN Start Execution", wait_for_callback=False, wait_for_completion=False, async_call=True, - parameters={ + "SFN Start Execution", service_integration_type=ServiceIntegrationType.REQUEST_RESPONSE, parameters={ "Input": { "Comment": "Hello world!" }, @@ -1191,8 +1191,7 @@ def test_step_functions_start_execution_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_step_functions_start_execution_step_creation_sync(): step = StepFunctionsStartExecutionStep( - "SFN Start Execution - Sync", wait_for_callback=False, wait_for_completion=True, async_call=False, - parameters={ + "SFN Start Execution - Sync", service_integration_type=ServiceIntegrationType.RUN_A_JOB, parameters={ "Input": { "Comment": "Hello world!" }, @@ -1217,7 +1216,7 @@ def test_step_functions_start_execution_step_creation_sync(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') def test_step_functions_start_execution_step_creation_wait_for_callback(): step = StepFunctionsStartExecutionStep( - "SFN Start Execution - Wait for Callback", wait_for_callback=True, wait_for_completion=False, async_call=False, + "SFN Start Execution - Wait for Callback", service_integration_type=ServiceIntegrationType.WAIT_FOR_CALLBACK, parameters={ "Input": { "Comment": "Hello world!", @@ -1242,20 +1241,12 @@ def test_step_functions_start_execution_step_creation_wait_for_callback(): } -@pytest.mark.parametrize("resource_flags", [ - {'wait_for_callback': True, 'wait_for_completion': True, 'async_call': True}, - {'wait_for_callback': False, 'wait_for_completion': True, 'async_call': True}, - {'wait_for_callback': True, 'wait_for_completion': False, 'async_call': True}, - {'wait_for_callback': True, 'wait_for_completion': True, 'async_call': False}, +@pytest.mark.parametrize("service_integration_type", [ + None, + "ServiceIntegrationTypeStr", + IntegrationPattern.RequestResponse ]) @patch.object(boto3.session.Session, 'region_name', 'us-east-1') -def test_step_functions_start_execution_step_creation_multiple_enabled_flags_raises_exception(resource_flags): - with pytest.raises(ValueError): - StepFunctionsStartExecutionStep("SFN Start Execution - Multiple flags", **resource_flags) - - -@patch.object(boto3.session.Session, 'region_name', 'us-east-1') -def test_step_functions_start_execution_step_creation_no_enabled_flags_raises_exception(): +def test_step_functions_start_execution_step_creation_invalid_service_integration_type_raises_exception(service_integration_type): with pytest.raises(ValueError): - StepFunctionsStartExecutionStep("SFN Start Execution - No active flags", wait_for_callback=False, - wait_for_completion=False, async_call=False) + StepFunctionsStartExecutionStep("SFN Start Execution - invalid ServiceType", service_integration_type=service_integration_type) diff --git a/tests/unit/test_steps_utils.py b/tests/unit/test_steps_utils.py index 7e06e37..91ea054 100644 --- a/tests/unit/test_steps_utils.py +++ b/tests/unit/test_steps_utils.py @@ -13,11 +13,16 @@ # Test if boto3 session can fetch correct aws partition info from test environment -from stepfunctions.steps.utils import get_aws_partition, merge_dicts -from stepfunctions.steps.integration_resources import IntegrationPattern, get_service_integration_arn import boto3 -from unittest.mock import patch +import logging +import pytest + from enum import Enum +from unittest.mock import patch + +from stepfunctions.steps.utils import get_aws_partition, merge_dicts +from stepfunctions.steps.integration_resources import IntegrationPattern, ServiceIntegrationType,\ + get_integration_pattern_from_service_integration_type, get_service_integration_arn testService = "sagemaker" @@ -86,3 +91,27 @@ def test_merge_dicts(): 'b': 2, 'c': 3 } + + +@pytest.mark.parametrize("service_integration_type, expected_integration_pattern", [ + (ServiceIntegrationType.REQUEST_RESPONSE, IntegrationPattern.RequestResponse), + (ServiceIntegrationType.RUN_A_JOB, IntegrationPattern.WaitForCompletion), + (ServiceIntegrationType.WAIT_FOR_CALLBACK, IntegrationPattern.WaitForTaskToken) +]) +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_get_integration_pattern_from_service_integration_type(service_integration_type, expected_integration_pattern): + integration_pattern = get_integration_pattern_from_service_integration_type(service_integration_type) + assert integration_pattern == expected_integration_pattern + + +@pytest.mark.parametrize("service_integration_type", [ + None, + "ServiceIntegrationTypeStr", + IntegrationPattern.RequestResponse +]) +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_get_integration_pattern_from_service_integration_type_with_invalid_type(service_integration_type, caplog): + with caplog.at_level(logging.WARNING): + integration_pattern = get_integration_pattern_from_service_integration_type(service_integration_type) + assert 'WARNING' in caplog.text + assert integration_pattern == IntegrationPattern.RequestResponse From 5d7c99e661a097622b3882fdaf95584cca3c9e38 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Wed, 6 Oct 2021 18:06:24 -0700 Subject: [PATCH 08/15] Update per PR review --- .../steps/integration_resources.py | 13 +++++- src/stepfunctions/steps/service.py | 21 ++++------ tests/unit/test_service_steps.py | 41 +++++++++++-------- tests/unit/test_steps_utils.py | 19 ++++++++- 4 files changed, 58 insertions(+), 36 deletions(-) diff --git a/src/stepfunctions/steps/integration_resources.py b/src/stepfunctions/steps/integration_resources.py index d986362..debaf18 100644 --- a/src/stepfunctions/steps/integration_resources.py +++ b/src/stepfunctions/steps/integration_resources.py @@ -38,7 +38,7 @@ class ServiceIntegrationType(Enum): """ REQUEST_RESPONSE = "RequestResponse", - RUN_A_JOB = "RunAJob", + RUN_JOB = "RunJob", WAIT_FOR_CALLBACK = "WaitForCallback" @@ -67,7 +67,7 @@ def get_integration_pattern_from_service_integration_type(service_integration_ty service_integration_type(ServiceIntegrationType): Service integration type to use to get the integration pattern """ - if service_integration_type == ServiceIntegrationType.RUN_A_JOB: + if service_integration_type == ServiceIntegrationType.RUN_JOB: return IntegrationPattern.WaitForCompletion elif service_integration_type == ServiceIntegrationType.WAIT_FOR_CALLBACK: return IntegrationPattern.WaitForTaskToken @@ -76,3 +76,12 @@ def get_integration_pattern_from_service_integration_type(service_integration_ty logger.warning(f"Invalid Service integration type ({service_integration_type}) - returning IntegrationPattern.RequestResponse by default") return IntegrationPattern.RequestResponse + +def is_integration_type_valid(service_integration_type, supported_integration_types): + if not isinstance(service_integration_type, ServiceIntegrationType): + raise ValueError(f"Invalid type used for service_integration_type arg ({service_integration_type}, " + f"{type(service_integration_type)}). Accepted type: {ServiceIntegrationType}") + elif service_integration_type not in supported_integration_types: + raise ValueError(f"Service Integration Type ({service_integration_type.name}) is not supported for this step - " + f"Please use one of the following: " + f"{[integ_type.name for integ_type in supported_integration_types]}") diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index f38dcd9..3295887 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -16,7 +16,7 @@ from stepfunctions.steps.states import Task from stepfunctions.steps.fields import Field from stepfunctions.steps.integration_resources import IntegrationPattern, ServiceIntegrationType,\ - get_integration_pattern_from_service_integration_type, get_service_integration_arn + get_integration_pattern_from_service_integration_type, get_service_integration_arn, is_integration_type_valid DYNAMODB_SERVICE_NAME = "dynamodb" EKS_SERVICES_NAME = "eks" @@ -904,12 +904,12 @@ class StepFunctionsStartExecutionStep(Task): One of three must be enabled to create the step successfully. """ - def __init__(self, state_id, service_integration_type, **kwargs): + def __init__(self, state_id, service_integration_type=ServiceIntegrationType.RUN_JOB, **kwargs): """ Args: state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. - integration_pattern (stepfunctions.steps.integration_resources.ServiceIntegrationType): Service integration type to use to build resource. - Supported service integration types: REQUEST_RESPONSE, RUN_A_JOB and WAIT_FOR_CALLBACK + service_integration_type (stepfunctions.steps.integration_resources.ServiceIntegrationType, optional): Service integration type to use to build resource. (default: RUN_JOB) + Supported service integration types: REQUEST_RESPONSE, RUN_JOB and WAIT_FOR_CALLBACK timeout_seconds (int, optional): Positive integer specifying timeout for the state in seconds. If the state runs longer than the specified timeout, then the interpreter fails the state with a `States.Timeout` Error Name. (default: 60) timeout_seconds_path (str, optional): Path specifying the state's timeout value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. heartbeat_seconds (int, optional): Positive integer specifying heartbeat timeout for the state in seconds. This value should be lower than the one specified for `timeout_seconds`. If more time than the specified heartbeat elapses between heartbeats from the task, then the interpreter fails the state with a `States.Timeout` Error Name. @@ -920,19 +920,12 @@ def __init__(self, state_id, service_integration_type, **kwargs): result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ - supported_integ_types = [ServiceIntegrationType.REQUEST_RESPONSE, ServiceIntegrationType.RUN_A_JOB, + supported_integ_types = [ServiceIntegrationType.REQUEST_RESPONSE, ServiceIntegrationType.RUN_JOB, ServiceIntegrationType.WAIT_FOR_CALLBACK] - if not isinstance(service_integration_type, ServiceIntegrationType): - raise ValueError(f"Invalid type used for service_integration_type arg ({service_integration_type}, " - f"{type(service_integration_type)}). Accepted type: {ServiceIntegrationType}") - elif service_integration_type not in supported_integ_types: - raise ValueError(f"Service Integration Type ({service_integration_type.name}) is not supported for this step - " - f"Please use one of the following: " - f"{[integ_type.name for integ_type in supported_integ_types]}") + is_integration_type_valid(service_integration_type, supported_integ_types) - if service_integration_type == ServiceIntegrationType.RUN_A_JOB: - # For RunAJob service integration type, use the resource that returns json response (`.sync:2`) + if service_integration_type == ServiceIntegrationType.RUN_JOB: """ Example resource arn:aws:states:::states:startExecution.sync:2 """ diff --git a/tests/unit/test_service_steps.py b/tests/unit/test_service_steps.py index 39e71d7..d4b2a7f 100644 --- a/tests/unit/test_service_steps.py +++ b/tests/unit/test_service_steps.py @@ -1164,12 +1164,28 @@ def test_eks_call_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') -def test_step_functions_start_execution_step_creation(): +def test_step_functions_start_execution_step_creation_default(): + step = StepFunctionsStartExecutionStep( + "SFN Start Execution", parameters={ + "StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:HelloWorld", + "Name": "ExecutionName" + }) + + assert step.to_dict() == { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:HelloWorld", + "Name": "ExecutionName" + }, + "End": True + } + + +@patch.object(boto3.session.Session, 'region_name', 'us-east-1') +def test_step_functions_start_execution_step_creation_request_response(): step = StepFunctionsStartExecutionStep( "SFN Start Execution", service_integration_type=ServiceIntegrationType.REQUEST_RESPONSE, parameters={ - "Input": { - "Comment": "Hello world!" - }, "StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:HelloWorld", "Name": "ExecutionName" }) @@ -1178,9 +1194,6 @@ def test_step_functions_start_execution_step_creation(): "Type": "Task", "Resource": "arn:aws:states:::states:startExecution", "Parameters": { - "Input": { - "Comment": "Hello world!" - }, "StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:HelloWorld", "Name": "ExecutionName" }, @@ -1189,12 +1202,9 @@ def test_step_functions_start_execution_step_creation(): @patch.object(boto3.session.Session, 'region_name', 'us-east-1') -def test_step_functions_start_execution_step_creation_sync(): +def test_step_functions_start_execution_step_creation_run_job(): step = StepFunctionsStartExecutionStep( - "SFN Start Execution - Sync", service_integration_type=ServiceIntegrationType.RUN_A_JOB, parameters={ - "Input": { - "Comment": "Hello world!" - }, + "SFN Start Execution - Sync", service_integration_type=ServiceIntegrationType.RUN_JOB, parameters={ "StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:HelloWorld", "Name": "ExecutionName" }) @@ -1203,9 +1213,6 @@ def test_step_functions_start_execution_step_creation_sync(): "Type": "Task", "Resource": "arn:aws:states:::states:startExecution.sync:2", "Parameters": { - "Input": { - "Comment": "Hello world!" - }, "StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:HelloWorld", "Name": "ExecutionName" }, @@ -1219,7 +1226,6 @@ def test_step_functions_start_execution_step_creation_wait_for_callback(): "SFN Start Execution - Wait for Callback", service_integration_type=ServiceIntegrationType.WAIT_FOR_CALLBACK, parameters={ "Input": { - "Comment": "Hello world!", "token.$": "$$.Task.Token" }, "StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:HelloWorld", @@ -1231,7 +1237,6 @@ def test_step_functions_start_execution_step_creation_wait_for_callback(): "Resource": "arn:aws:states:::states:startExecution.waitForTaskToken", "Parameters": { "Input": { - "Comment": "Hello world!", "token.$": "$$.Task.Token" }, "StateMachineArn": "arn:aws:states:us-east-1:123456789012:stateMachine:HelloWorld", @@ -1247,6 +1252,6 @@ def test_step_functions_start_execution_step_creation_wait_for_callback(): IntegrationPattern.RequestResponse ]) @patch.object(boto3.session.Session, 'region_name', 'us-east-1') -def test_step_functions_start_execution_step_creation_invalid_service_integration_type_raises_exception(service_integration_type): +def test_step_functions_start_execution_step_creation_invalid_service_integration_type_raises_value_error(service_integration_type): with pytest.raises(ValueError): StepFunctionsStartExecutionStep("SFN Start Execution - invalid ServiceType", service_integration_type=service_integration_type) diff --git a/tests/unit/test_steps_utils.py b/tests/unit/test_steps_utils.py index 91ea054..292e457 100644 --- a/tests/unit/test_steps_utils.py +++ b/tests/unit/test_steps_utils.py @@ -22,7 +22,7 @@ from stepfunctions.steps.utils import get_aws_partition, merge_dicts from stepfunctions.steps.integration_resources import IntegrationPattern, ServiceIntegrationType,\ - get_integration_pattern_from_service_integration_type, get_service_integration_arn + get_integration_pattern_from_service_integration_type, get_service_integration_arn, is_integration_type_valid testService = "sagemaker" @@ -95,7 +95,7 @@ def test_merge_dicts(): @pytest.mark.parametrize("service_integration_type, expected_integration_pattern", [ (ServiceIntegrationType.REQUEST_RESPONSE, IntegrationPattern.RequestResponse), - (ServiceIntegrationType.RUN_A_JOB, IntegrationPattern.WaitForCompletion), + (ServiceIntegrationType.RUN_JOB, IntegrationPattern.WaitForCompletion), (ServiceIntegrationType.WAIT_FOR_CALLBACK, IntegrationPattern.WaitForTaskToken) ]) @patch.object(boto3.session.Session, 'region_name', 'us-east-1') @@ -115,3 +115,18 @@ def test_get_integration_pattern_from_service_integration_type_with_invalid_type integration_pattern = get_integration_pattern_from_service_integration_type(service_integration_type) assert 'WARNING' in caplog.text assert integration_pattern == IntegrationPattern.RequestResponse + + +@pytest.mark.parametrize("service_integration_type", [ + None, + "ServiceIntegrationTypeStr", + IntegrationPattern.RequestResponse +]) +def test_is_integration_type_valid_with_invalid_type_raises_value_error(service_integration_type): + with pytest.raises(ValueError): + is_integration_type_valid(service_integration_type, [ServiceIntegrationType.REQUEST_RESPONSE]) + + +def test_is_integration_type_valid_with_non_supported_type_raises_value_error(): + with pytest.raises(ValueError): + is_integration_type_valid(ServiceIntegrationType.WAIT_FOR_CALLBACK, [ServiceIntegrationType.REQUEST_RESPONSE]) From 79bc5dd41926271a670dcdc174c59275b6500c5f Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen <83104894+ca-nguyen@users.noreply.github.com> Date: Fri, 8 Oct 2021 09:43:19 -0700 Subject: [PATCH 09/15] Update src/stepfunctions/steps/service.py Co-authored-by: Adam Wong <55506708+wong-a@users.noreply.github.com> --- src/stepfunctions/steps/service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 3295887..216ab87 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -898,7 +898,7 @@ def __init__(self, state_id, **kwargs): class StepFunctionsStartExecutionStep(Task): """ - Creates a Task state that starts an execution of another state machine. See `Manage AWS Step Functions Executions as an Integrated Service Date: Fri, 8 Oct 2021 10:55:06 -0700 Subject: [PATCH 10/15] Remove ServiceIntegrationType to use existing IntegrationPattern enum --- .../steps/integration_resources.py | 53 +++++-------------- src/stepfunctions/steps/service.py | 29 +++++----- tests/unit/test_service_steps.py | 24 ++++----- tests/unit/test_steps_utils.py | 42 ++++----------- 4 files changed, 50 insertions(+), 98 deletions(-) diff --git a/src/stepfunctions/steps/integration_resources.py b/src/stepfunctions/steps/integration_resources.py index debaf18..8febb53 100644 --- a/src/stepfunctions/steps/integration_resources.py +++ b/src/stepfunctions/steps/integration_resources.py @@ -28,60 +28,35 @@ class IntegrationPattern(Enum): WaitForTaskToken = "waitForTaskToken" WaitForCompletion = "sync" - RequestResponse = "" - WaitForCompletionWithJsonResponse = "sync:2" + CallAndContinue = "" -class ServiceIntegrationType(Enum): - """ - Service Integration Types for service integration resources (see `Service Integration Patterns Date: Thu, 14 Oct 2021 10:03:56 -0700 Subject: [PATCH 11/15] Update src/stepfunctions/steps/integration_resources.py Co-authored-by: Adam Wong <55506708+wong-a@users.noreply.github.com> --- src/stepfunctions/steps/integration_resources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stepfunctions/steps/integration_resources.py b/src/stepfunctions/steps/integration_resources.py index 8febb53..c86820b 100644 --- a/src/stepfunctions/steps/integration_resources.py +++ b/src/stepfunctions/steps/integration_resources.py @@ -55,7 +55,7 @@ def get_service_integration_arn(service, api, integration_pattern=IntegrationPat def is_integration_pattern_valid(integration_pattern, supported_integration_patterns): if not isinstance(integration_pattern, IntegrationPattern): - raise TypeError(f"{integration_pattern} must be of type {IntegrationPattern}") + raise TypeError(f"Integration pattern must be of type {IntegrationPattern}") elif integration_pattern not in supported_integration_patterns: raise ValueError(f"Service Integration Type ({integration_pattern.name}) is not supported for this step - " f"Please use one of the following: " From 64708c489ccc108049ac8a5d72d2bf68834f3a43 Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Thu, 14 Oct 2021 21:54:45 -0700 Subject: [PATCH 12/15] Removed unused logger and updated docstring --- src/stepfunctions/steps/integration_resources.py | 4 ---- src/stepfunctions/steps/service.py | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/src/stepfunctions/steps/integration_resources.py b/src/stepfunctions/steps/integration_resources.py index c86820b..caeb257 100644 --- a/src/stepfunctions/steps/integration_resources.py +++ b/src/stepfunctions/steps/integration_resources.py @@ -13,13 +13,9 @@ from __future__ import absolute_import -import logging - from enum import Enum from stepfunctions.steps.utils import get_aws_partition -logger = logging.getLogger('stepfunctions') - class IntegrationPattern(Enum): """ diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index b1cd6c4..3fee139 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -907,7 +907,7 @@ def __init__(self, state_id, integration_pattern=IntegrationPattern.WaitForCompl state_id (str): State name whose length **must be** less than or equal to 128 unicode characters. State names **must be** unique within the scope of the whole state machine. integration_pattern (stepfunctions.steps.integration_resources.IntegrationPattern, optional): Service integration pattern used to call the integrated service. (default: WaitForCompletion) Supported integration patterns: - WaitForCompletion: Wait for a request to complete before progressing to the next state (See `Run A Job Date: Fri, 15 Oct 2021 10:07:49 -0700 Subject: [PATCH 13/15] Fixing error message and documenting args default values --- src/stepfunctions/steps/integration_resources.py | 4 ++-- src/stepfunctions/steps/service.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stepfunctions/steps/integration_resources.py b/src/stepfunctions/steps/integration_resources.py index caeb257..46a15df 100644 --- a/src/stepfunctions/steps/integration_resources.py +++ b/src/stepfunctions/steps/integration_resources.py @@ -35,7 +35,7 @@ def get_service_integration_arn(service, api, integration_pattern=IntegrationPat service (str): The service name for the service integration api (str): The api of the service integration integration_pattern (IntegrationPattern, optional): The integration pattern for the task. (Default: IntegrationPattern.CallAndContinue) - version (int, optional): The version of the resource to use + version (int, optional): The version of the resource to use. (Default: None) """ arn = "" if integration_pattern == IntegrationPattern.CallAndContinue: @@ -53,6 +53,6 @@ def is_integration_pattern_valid(integration_pattern, supported_integration_patt if not isinstance(integration_pattern, IntegrationPattern): raise TypeError(f"Integration pattern must be of type {IntegrationPattern}") elif integration_pattern not in supported_integration_patterns: - raise ValueError(f"Service Integration Type ({integration_pattern.name}) is not supported for this step - " + raise ValueError(f"Integration Pattern ({integration_pattern.name}) is not supported for this step - " f"Please use one of the following: " f"{[integ_type.name for integ_type in supported_integration_patterns]}") diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index 3fee139..bfadc5e 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -916,7 +916,7 @@ def __init__(self, state_id, integration_pattern=IntegrationPattern.WaitForCompl heartbeat_seconds_path (str, optional): Path specifying the state's heartbeat value in seconds from the state input. When resolved, the path must select a field whose value is a positive integer. comment (str, optional): Human-readable comment or description. (default: None) input_path (str, optional): Path applied to the state’s raw input to select some or all of it; that selection is used by the state. (default: '$') - parameters (dict, optional): The value of this field becomes the effective input for the state. + parameters (dict, optional): The value of this field becomes the effective input for the state. (default: None) result_path (str, optional): Path specifying the raw input’s combination with or replacement by the state’s result. (default: '$') output_path (str, optional): Path applied to the state’s output after the application of `result_path`, producing the effective output which serves as the raw input for the next state. (default: '$') """ From a2dba89c0ed112d2ba97c2297a7d3040ee56d62f Mon Sep 17 00:00:00 2001 From: Carolyn Nguyen Date: Fri, 15 Oct 2021 10:48:28 -0700 Subject: [PATCH 14/15] Updated arg description doc string for CallAndContinue --- src/stepfunctions/steps/service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index bfadc5e..b5d8b49 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -909,7 +909,7 @@ def __init__(self, state_id, integration_pattern=IntegrationPattern.WaitForCompl Supported integration patterns: WaitForCompletion: Wait for the state machine execution to complete before going to the next state. (See `Run A Job Date: Fri, 15 Oct 2021 11:18:41 -0700 Subject: [PATCH 15/15] Updated doc string for WaitForTaskToken --- src/stepfunctions/steps/service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stepfunctions/steps/service.py b/src/stepfunctions/steps/service.py index b5d8b49..986217c 100644 --- a/src/stepfunctions/steps/service.py +++ b/src/stepfunctions/steps/service.py @@ -908,7 +908,7 @@ def __init__(self, state_id, integration_pattern=IntegrationPattern.WaitForCompl integration_pattern (stepfunctions.steps.integration_resources.IntegrationPattern, optional): Service integration pattern used to call the integrated service. (default: WaitForCompletion) Supported integration patterns: WaitForCompletion: Wait for the state machine execution to complete before going to the next state. (See `Run A Job