Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow actor exceptions to propagate #4232

Merged
merged 17 commits into from
Jun 14, 2021
26 changes: 21 additions & 5 deletions distributed/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,17 @@ async def run_actor_function_on_worker():
await self._future
else:
raise OSError("Unable to contact Actor's worker")
return result["result"]
return result

if self._asynchronous:
return asyncio.ensure_future(run_actor_function_on_worker())

async def unwrap():
result = await run_actor_function_on_worker()
if "result" in result:
martindurant marked this conversation as resolved.
Show resolved Hide resolved
return result["result"]
raise result["exception"]

return asyncio.ensure_future(unwrap())
else:
# TODO: this mechanism is error prone
# we should endeavor to make dask's standard code work here
Expand All @@ -187,7 +194,10 @@ async def get_actor_attribute_from_worker():
x = await self._worker_rpc.actor_attribute(
attribute=key, actor=self.key
)
return x["result"]
if "result" in x:
martindurant marked this conversation as resolved.
Show resolved Hide resolved
return x["result"]
else:
raise x["exception"]

return self._sync(get_actor_attribute_from_worker)

Expand Down Expand Up @@ -237,10 +247,16 @@ def __await__(self):

def result(self, timeout=None):
try:
if isinstance(self._cached_result, Exception):
raise self._cached_result
return self._cached_result
except AttributeError:
self._cached_result = self.q.get(timeout=timeout)
return self._cached_result
out = self.q.get(timeout=timeout)
if "result" in out:
martindurant marked this conversation as resolved.
Show resolved Hide resolved
self._cached_result = out["result"]
else:
self._cached_result = out["exception"]
return self.result()

def __repr__(self):
return "<ActorFuture>"
24 changes: 23 additions & 1 deletion distributed/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,6 @@ def f(block, ps=None):
print(format_time(end - start))


@pytest.mark.flaky(reruns=10, reruns_delay=5)
martindurant marked this conversation as resolved.
Show resolved Hide resolved
@gen_cluster(client=True)
async def test_compute(c, s, a, b):
@dask.delayed
Expand Down Expand Up @@ -587,3 +586,26 @@ async def test_async_deadlock(client, s, a, b):
ac2 = await client.submit(UsesCounter, actor=True, workers=[ac._address])

assert (await ac2.ado_inc(ac)) == 1


def test_exception():
class MyException(Exception):
pass

class Broken:
def method(self):
raise MyException

@property
def prop(self):
raise MyException

with cluster(nworkers=2) as (cl, w):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to use cluster here instead of @gen_cluster like most of the other test in this module?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was to test the sync API, which is more typical for actors. I added an async version immediately below (could remove this one, if you like).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was to test the sync API, which is more typical for actors. I added an async version immediately below (could remove this one, if you like).

To be clear here, the sync api is more common for everything. We prefer the async tests because they are faster/easier on CI and allow for greater debuggability. Adding sync tests too is fine if we want to be extra careful, but in general we prefer async tests. In general if async tests I usually have confidence that sync works just as well, unless I'm explicitly writing code to handle synchronization.

What's here is great though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's here is great though.

I don't mind keeping it or not. Writing the sync version first probably reflects how I initially tested by hand. It was some time ago, so I can't remember if there was any other reason, given that the async version is effectively identical and works just fine.

client = Client(cl["address"])
ac = client.submit(Broken, actor=True).result()
acfut = ac.method()
with pytest.raises(MyException):
acfut.result()

with pytest.raises(MyException):
ac.prop
48 changes: 27 additions & 21 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2669,30 +2669,36 @@ async def actor_execute(
func = getattr(actor, function)
name = key_split(key) + "." + function

if iscoroutinefunction(func):
result = await func(*args, **kwargs)
elif separate_thread:
result = await self.executor_submit(
name,
apply_function_actor,
args=(
func,
args,
kwargs,
self.execution_state,
try:
if iscoroutinefunction(func):
result = await func(*args, **kwargs)
elif separate_thread:
result = await self.executor_submit(
name,
self.active_threads,
self.active_threads_lock,
),
executor=self.executors["actor"],
)
else:
result = func(*args, **kwargs)
return {"status": "OK", "result": to_serialize(result)}
apply_function_actor,
args=(
func,
args,
kwargs,
self.execution_state,
name,
self.active_threads,
self.active_threads_lock,
),
executor=self.executors["actor"],
)
else:
result = func(*args, **kwargs)
return {"status": "OK", "result": to_serialize(result)}
except Exception as ex:
return {"status": "error", "exception": to_serialize(ex)}

def actor_attribute(self, comm=None, actor=None, attribute=None):
value = getattr(self.actors[actor], attribute)
return {"status": "OK", "result": to_serialize(value)}
try:
value = getattr(self.actors[actor], attribute)
return {"status": "OK", "result": to_serialize(value)}
except Exception as ex:
return {"status": "error", "exception": to_serialize(ex)}

def meets_resource_constraints(self, key):
ts = self.tasks[key]
Expand Down