Skip to content

Commit

Permalink
add timeout to operator execution
Browse files Browse the repository at this point in the history
  • Loading branch information
imanjra committed May 9, 2023
1 parent 7146414 commit fc4a7f2
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 3 deletions.
6 changes: 6 additions & 0 deletions fiftyone/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ def __init__(self, d=None):
self.plugins_dir = self.parse_string(
d, "plugins_dir", env_var="FIFTYONE_PLUGINS_DIR", default=None
)
self.operator_timeout = self.parse_int(
d,
"operator_timeout",
env_var="FIFTYONE_OPERATOR_TIMEOUT",
default=600, # 600 seconds (10 minutes)
)
self.dataset_zoo_manifest_paths = self.parse_path_array(
d,
"dataset_zoo_manifest_paths",
Expand Down
42 changes: 42 additions & 0 deletions fiftyone/operators/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import asyncio
import signal
from contextlib import contextmanager
from functools import wraps


def coroutine_timeout(seconds):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
try:
if asyncio.iscoroutinefunction(func):
return await asyncio.wait_for(
func(*args, **kwargs), timeout=seconds
)
else:
raise TypeError(
f"Function {func.__name__} is not a coroutine function"
)
except asyncio.TimeoutError:
raise_timeout_error(seconds)

return wrapper

return decorator


@contextmanager
def timeout(seconds: int):
signal.signal(
signal.SIGALRM, lambda signum, frame: raise_timeout_error(seconds)
)
signal.alarm(seconds)

try:
yield
finally:
signal.signal(signal.SIGALRM, signal.SIG_IGN)


def raise_timeout_error(seconds):
raise TimeoutError(f"Timeout occurred after {seconds} seconds") from None
10 changes: 8 additions & 2 deletions fiftyone/operators/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from .message import GeneratedMessage, MessageType
import types as python_types
import traceback
from .decorators import coroutine_timeout
import asyncio


class InvocationRequest:
Expand Down Expand Up @@ -71,7 +73,8 @@ def to_json(self):
}


def execute_operator(operator_name, request_params):
@coroutine_timeout(seconds=fo.config.operator_timeout)
async def execute_operator(operator_name, request_params):
"""Executes the operator with the given name.
Args:
operator_name: the name of the operator
Expand All @@ -92,7 +95,10 @@ def execute_operator(operator_name, request_params):
if validation_ctx.invalid:
return ExecutionResult(None, None, "Validation Error", validation_ctx)
try:
raw_result = operator.execute(ctx)
if asyncio.iscoroutinefunction(operator.execute):
raw_result = await operator.execute(ctx)
else:
raw_result = operator.execute(ctx)
except Exception as e:
return ExecutionResult(None, executor, str(e))

Expand Down
2 changes: 1 addition & 1 deletion fiftyone/operators/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def post(self, request: Request, data: dict) -> dict:
"loading_errors": registry.list_errors(),
}
raise HTTPException(status_code=404, detail=erroDetail)
result = execute_operator(operator_uri, data)
result = await execute_operator(operator_uri, data)
json = result.to_json()
if result.error is not None:
print(result.error)
Expand Down

0 comments on commit fc4a7f2

Please sign in to comment.