From bd33f86a39f4b3f2a768acff1dab3d391d8172b9 Mon Sep 17 00:00:00 2001 From: lchu Date: Tue, 8 Mar 2022 11:32:15 -0500 Subject: [PATCH 1/6] align behavior of max_retries with the one in remote functions --- python/ray/workflow/common.py | 6 ++---- python/ray/workflow/step_executor.py | 17 +++++++++++------ .../ray/workflow/tests/test_basic_workflows.py | 15 ++++++--------- python/ray/workflow/virtual_actor_class.py | 4 ++-- 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/python/ray/workflow/common.py b/python/ray/workflow/common.py index 6461fbab251e..2813498ef549 100644 --- a/python/ray/workflow/common.py +++ b/python/ray/workflow/common.py @@ -216,12 +216,10 @@ def make( ): if max_retries is None: max_retries = 3 - elif not isinstance(max_retries, int) or max_retries < 1: - raise ValueError("max_retries should be greater or equal to 1.") + elif not isinstance(max_retries, int) or max_retries < -1: + raise ValueError("'max_retries' only accepts 0, -1 or a positive integer.") if catch_exceptions is None: catch_exceptions = False - if max_retries is None: - max_retries = 3 if not isinstance(checkpoint, bool) and checkpoint is not None: raise ValueError("'checkpoint' should be None or a boolean.") if ray_options is None: diff --git a/python/ray/workflow/step_executor.py b/python/ray/workflow/step_executor.py index b7aa091593a1..cb7f41887563 100644 --- a/python/ray/workflow/step_executor.py +++ b/python/ray/workflow/step_executor.py @@ -320,17 +320,22 @@ def _wrap_run( result = None # max_retries are for application level failure. # For ray failure, we should use max_retries. - for i in range(runtime_options.max_retries): - logger.info( - f"{get_step_status_info(WorkflowStatus.RUNNING)}" - f"\t[{i + 1}/{runtime_options.max_retries}]" - ) + for i in range(runtime_options.max_retries + 1): + if i == 0: + logger.info( + f"{get_step_status_info(WorkflowStatus.RUNNING)}" + ) + else: + logger.info( + f"{get_step_status_info(WorkflowStatus.RUNNING)}" + f"\tretries: [{i}/{runtime_options.max_retries}]" + ) try: result = func(*args, **kwargs) exception = None break except BaseException as e: - if i + 1 == runtime_options.max_retries: + if i == runtime_options.max_retries: retry_msg = "Maximum retry reached, stop retry." else: retry_msg = "The step will be retried." diff --git a/python/ray/workflow/tests/test_basic_workflows.py b/python/ray/workflow/tests/test_basic_workflows.py index 7e03a323e8e5..e9d5df4c4d8c 100644 --- a/python/ray/workflow/tests/test_basic_workflows.py +++ b/python/ray/workflow/tests/test_basic_workflows.py @@ -197,19 +197,16 @@ def unstable_step(): return v with pytest.raises(Exception): - unstable_step.options(max_retries=-1).step().run() - - with pytest.raises(Exception): - unstable_step.options(max_retries=3).step().run() - assert 10 == unstable_step.options(max_retries=8).step().run() + unstable_step.options(max_retries=2).step().run() + assert 10 == unstable_step.options(max_retries=7).step().run() (tmp_path / "test").write_text("0") (ret, err) = ( - unstable_step.options(max_retries=3, catch_exceptions=True).step().run() + unstable_step.options(max_retries=2, catch_exceptions=True).step().run() ) assert ret is None assert isinstance(err, ValueError) (ret, err) = ( - unstable_step.options(max_retries=8, catch_exceptions=True).step().run() + unstable_step.options(max_retries=7, catch_exceptions=True).step().run() ) assert ret == 10 assert err is None @@ -218,7 +215,7 @@ def unstable_step(): def test_step_failure_decorator(workflow_start_regular_shared, tmp_path): (tmp_path / "test").write_text("0") - @workflow.step(max_retries=11) + @workflow.step(max_retries=10) def unstable_step(): v = int((tmp_path / "test").read_text()) (tmp_path / "test").write_text(f"{v + 1}") @@ -244,7 +241,7 @@ def unstable_step_exception(): (tmp_path / "test").write_text("0") - @workflow.step(catch_exceptions=True, max_retries=4) + @workflow.step(catch_exceptions=True, max_retries=3) def unstable_step_exception(): v = int((tmp_path / "test").read_text()) (tmp_path / "test").write_text(f"{v + 1}") diff --git a/python/ray/workflow/virtual_actor_class.py b/python/ray/workflow/virtual_actor_class.py index 3d5efbf8e3b1..ee861bfb410e 100644 --- a/python/ray/workflow/virtual_actor_class.py +++ b/python/ray/workflow/virtual_actor_class.py @@ -92,7 +92,7 @@ def run_async(self, *args, **kwargs) -> "ObjectRef": def options( self, *, - max_retries: int = 1, + max_retries: int = 0, catch_exceptions: bool = False, name: str = None, metadata: Dict[str, Any] = None, @@ -262,7 +262,7 @@ def step(self, *args, **kwargs): def options( self, *, - max_retries=1, + max_retries=0, catch_exceptions=False, name=None, metadata=None, From 3d832cd9b9e66d33d31fc24673674b333a1eac46 Mon Sep 17 00:00:00 2001 From: lchu Date: Tue, 8 Mar 2022 11:53:36 -0500 Subject: [PATCH 2/6] up --- python/ray/workflow/step_executor.py | 69 ++++++++++++++++++---------- 1 file changed, 46 insertions(+), 23 deletions(-) diff --git a/python/ray/workflow/step_executor.py b/python/ray/workflow/step_executor.py index cb7f41887563..2e93b5e752a0 100644 --- a/python/ray/workflow/step_executor.py +++ b/python/ray/workflow/step_executor.py @@ -320,31 +320,54 @@ def _wrap_run( result = None # max_retries are for application level failure. # For ray failure, we should use max_retries. - for i in range(runtime_options.max_retries + 1): - if i == 0: - logger.info( - f"{get_step_status_info(WorkflowStatus.RUNNING)}" - ) - else: - logger.info( - f"{get_step_status_info(WorkflowStatus.RUNNING)}" - f"\tretries: [{i}/{runtime_options.max_retries}]" - ) - try: - result = func(*args, **kwargs) - exception = None - break - except BaseException as e: - if i == runtime_options.max_retries: - retry_msg = "Maximum retry reached, stop retry." + if runtime_options.max_retries != -1: + for i in range(runtime_options.max_retries + 1): + if i == 0: + logger.info( + f"{get_step_status_info(WorkflowStatus.RUNNING)}" + ) + else: + logger.info( + f"{get_step_status_info(WorkflowStatus.RUNNING)}" + f"\tretries: [{i}/{runtime_options.max_retries}]" + ) + try: + result = func(*args, **kwargs) + exception = None + break + except BaseException as e: + if i == runtime_options.max_retries: + retry_msg = "Maximum retry reached, stop retry." + else: + retry_msg = "The step will be retried." + logger.error( + f"{workflow_context.get_name()} failed with error message" + f" {e}. {retry_msg}" + ) + exception = e + else: + i = 0 + while True: + if i == 0: + logger.info( + f"{get_step_status_info(WorkflowStatus.RUNNING)}" + ) else: + logger.info( + f"{get_step_status_info(WorkflowStatus.RUNNING)}" + f"\tretries: [{i}/inf]" + ) + try: + result = func(*args, **kwargs) + exception = None + break + except BaseException as e: retry_msg = "The step will be retried." - logger.error( - f"{workflow_context.get_name()} failed with error message" - f" {e}. {retry_msg}" - ) - exception = e - + logger.error( + f"{workflow_context.get_name()} failed with error message" + f" {e}. {retry_msg}" + ) + i += 1 step_type = runtime_options.step_type if runtime_options.catch_exceptions: if step_type == StepType.FUNCTION: From 255209d4cd50392919fd640fce5ce0c66bdbcd18 Mon Sep 17 00:00:00 2001 From: lchu Date: Tue, 8 Mar 2022 12:07:15 -0500 Subject: [PATCH 3/6] format --- python/ray/workflow/step_executor.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/python/ray/workflow/step_executor.py b/python/ray/workflow/step_executor.py index 2e93b5e752a0..e9e6f744a8b2 100644 --- a/python/ray/workflow/step_executor.py +++ b/python/ray/workflow/step_executor.py @@ -323,9 +323,7 @@ def _wrap_run( if runtime_options.max_retries != -1: for i in range(runtime_options.max_retries + 1): if i == 0: - logger.info( - f"{get_step_status_info(WorkflowStatus.RUNNING)}" - ) + logger.info(f"{get_step_status_info(WorkflowStatus.RUNNING)}") else: logger.info( f"{get_step_status_info(WorkflowStatus.RUNNING)}" @@ -349,9 +347,7 @@ def _wrap_run( i = 0 while True: if i == 0: - logger.info( - f"{get_step_status_info(WorkflowStatus.RUNNING)}" - ) + logger.info(f"{get_step_status_info(WorkflowStatus.RUNNING)}") else: logger.info( f"{get_step_status_info(WorkflowStatus.RUNNING)}" From 0b9dcfa3cc7d7cb081f5b1aee0694449825e4e8b Mon Sep 17 00:00:00 2001 From: lchu Date: Tue, 8 Mar 2022 15:35:12 -0500 Subject: [PATCH 4/6] up --- python/ray/workflow/tests/test_basic_workflows_2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/workflow/tests/test_basic_workflows_2.py b/python/ray/workflow/tests/test_basic_workflows_2.py index 214368614e44..0c82fbe843e5 100644 --- a/python/ray/workflow/tests/test_basic_workflows_2.py +++ b/python/ray/workflow/tests/test_basic_workflows_2.py @@ -109,7 +109,7 @@ def incr(): return 10 with pytest.raises(ray.exceptions.RaySystemError): - incr.options(max_retries=1).step().run("incr") + incr.options(max_retries=0).step().run("incr") assert cnt_file.read_text() == "1" From 95c1d6212eba4337f270c9c5b4c2f1441c74b7aa Mon Sep 17 00:00:00 2001 From: lchu Date: Tue, 22 Mar 2022 14:56:20 -0400 Subject: [PATCH 5/6] Trigger Build From 3dc796d90db33bf863cef9af4437dbd033a46499 Mon Sep 17 00:00:00 2001 From: lchu Date: Wed, 23 Mar 2022 16:07:16 -0400 Subject: [PATCH 6/6] up --- python/ray/workflow/step_executor.py | 67 ++++++++----------- .../workflow/tests/test_basic_workflows.py | 3 + 2 files changed, 30 insertions(+), 40 deletions(-) diff --git a/python/ray/workflow/step_executor.py b/python/ray/workflow/step_executor.py index e9e6f744a8b2..ec261bac5e89 100644 --- a/python/ray/workflow/step_executor.py +++ b/python/ray/workflow/step_executor.py @@ -318,52 +318,39 @@ def _wrap_run( """ exception = None result = None + done = False # max_retries are for application level failure. # For ray failure, we should use max_retries. - if runtime_options.max_retries != -1: - for i in range(runtime_options.max_retries + 1): - if i == 0: - logger.info(f"{get_step_status_info(WorkflowStatus.RUNNING)}") - else: - logger.info( - f"{get_step_status_info(WorkflowStatus.RUNNING)}" - f"\tretries: [{i}/{runtime_options.max_retries}]" - ) - try: - result = func(*args, **kwargs) - exception = None - break - except BaseException as e: - if i == runtime_options.max_retries: - retry_msg = "Maximum retry reached, stop retry." - else: - retry_msg = "The step will be retried." - logger.error( - f"{workflow_context.get_name()} failed with error message" - f" {e}. {retry_msg}" - ) + i = 0 + while not done: + if i == 0: + logger.info(f"{get_step_status_info(WorkflowStatus.RUNNING)}") + else: + total_retries = ( + runtime_options.max_retries + if runtime_options.max_retries != -1 + else "inf" + ) + logger.info( + f"{get_step_status_info(WorkflowStatus.RUNNING)}" + f"\tretries: [{i}/{total_retries}]" + ) + try: + result = func(*args, **kwargs) + exception = None + done = True + except BaseException as e: + if i == runtime_options.max_retries: + retry_msg = "Maximum retry reached, stop retry." exception = e - else: - i = 0 - while True: - if i == 0: - logger.info(f"{get_step_status_info(WorkflowStatus.RUNNING)}") + done = True else: - logger.info( - f"{get_step_status_info(WorkflowStatus.RUNNING)}" - f"\tretries: [{i}/inf]" - ) - try: - result = func(*args, **kwargs) - exception = None - break - except BaseException as e: retry_msg = "The step will be retried." - logger.error( - f"{workflow_context.get_name()} failed with error message" - f" {e}. {retry_msg}" - ) i += 1 + logger.error( + f"{workflow_context.get_name()} failed with error message" + f" {e}. {retry_msg}" + ) step_type = runtime_options.step_type if runtime_options.catch_exceptions: if step_type == StepType.FUNCTION: diff --git a/python/ray/workflow/tests/test_basic_workflows.py b/python/ray/workflow/tests/test_basic_workflows.py index e9d5df4c4d8c..bd046b4263c2 100644 --- a/python/ray/workflow/tests/test_basic_workflows.py +++ b/python/ray/workflow/tests/test_basic_workflows.py @@ -196,6 +196,9 @@ def unstable_step(): raise ValueError("Invalid") return v + with pytest.raises(Exception): + unstable_step.options(max_retries=-2).step().run() + with pytest.raises(Exception): unstable_step.options(max_retries=2).step().run() assert 10 == unstable_step.options(max_retries=7).step().run()