Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow actor exceptions to propagate #4232

Merged
merged 17 commits into from
Jun 14, 2021
26 changes: 21 additions & 5 deletions distributed/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,17 @@ async def run_actor_function_on_worker():
await self._future
else:
raise OSError("Unable to contact Actor's worker")
return result["result"]
return result

if self._asynchronous:
return asyncio.ensure_future(run_actor_function_on_worker())

async def unwrap():
result = await run_actor_function_on_worker()
if result["status"] == "OK":
return result["result"]
raise result["exception"]

return asyncio.ensure_future(unwrap())
else:
# TODO: this mechanism is error prone
# we should endeavor to make dask's standard code work here
Expand All @@ -187,7 +194,10 @@ async def get_actor_attribute_from_worker():
x = await self._worker_rpc.actor_attribute(
attribute=key, actor=self.key
)
return x["result"]
if x["status"] == "OK":
return x["result"]
else:
raise x["exception"]

return self._sync(get_actor_attribute_from_worker)

Expand Down Expand Up @@ -237,10 +247,16 @@ def __await__(self):

def result(self, timeout=None):
try:
if isinstance(self._cached_result, Exception):
raise self._cached_result
return self._cached_result
except AttributeError:
self._cached_result = self.q.get(timeout=timeout)
return self._cached_result
out = self.q.get(timeout=timeout)
if out["status"] == "OK":
self._cached_result = out["result"]
else:
self._cached_result = out["exception"]
return self.result()

def __repr__(self):
return "<ActorFuture>"
59 changes: 53 additions & 6 deletions distributed/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@

from distributed import Actor, ActorFuture, Client, Future, Nanny, wait
from distributed.metrics import time
from distributed.utils_test import cluster, gen_cluster
from distributed.utils_test import ( # noqa: F401
async_wait_for,
cluster,
cluster_fixture,
gen_cluster,
loop,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following #4888, all pytest fixtures in distributed.utils_test are globally available. So there's no need for the cluster_fixture or loop imports (and hopefully we can drop # noqa: F401 too).



class Counter:
Expand Down Expand Up @@ -465,7 +471,6 @@ def f(block, ps=None):
print(format_time(end - start))


@pytest.mark.flaky(reruns=10, reruns_delay=5)
martindurant marked this conversation as resolved.
Show resolved Hide resolved
@gen_cluster(client=True)
async def test_compute(c, s, a, b):
@dask.delayed
Expand All @@ -485,10 +490,7 @@ def check(counter, blanks):
result = await c.compute(final, actors=counter)
assert result == 0 + 1 + 2 + 3 + 4

start = time()
while a.data or b.data:
await asyncio.sleep(0.01)
assert time() < start + 30
await async_wait_for(lambda: a.data or b.data, timeout=10)
martindurant marked this conversation as resolved.
Show resolved Hide resolved


def test_compute_sync(client):
Expand Down Expand Up @@ -581,3 +583,48 @@ async def test_async_deadlock(client, s, a, b):
ac2 = await client.submit(UsesCounter, actor=True, workers=[ac._address])

assert (await ac2.ado_inc(ac)) == 1


def test_exception():
class MyException(Exception):
pass

class Broken:
def method(self):
raise MyException

@property
def prop(self):
raise MyException

with cluster(nworkers=2) as (cl, w):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to use cluster here instead of @gen_cluster like most of the other tests in this module?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was to test the sync API, which is more typical for actors. I added an async version immediately below (could remove this one, if you like).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was to test the sync API, which is more typical for actors. I added an async version immediately below (could remove this one, if you like).

To be clear here, the sync API is more common for everything. We prefer the async tests because they are faster/easier on CI and allow for greater debuggability. Adding sync tests too is fine if we want to be extra careful, but in general we prefer async tests. In general, if the async tests pass, I usually have confidence that sync works just as well, unless I'm explicitly writing code to handle synchronization.

What's here is great though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's here is great though.

I don't mind keeping it or not. Writing the sync version first probably reflects how I initially tested by hand. It was some time ago, so I can't remember if there was any other reason, given that the async version is effectively identical and works just fine.

client = Client(cl["address"])
ac = client.submit(Broken, actor=True).result()
acfut = ac.method()
with pytest.raises(MyException):
acfut.result()

with pytest.raises(MyException):
ac.prop


@gen_cluster(client=True)
async def test_exception_async(client, s, a, b):
    """Check that actor errors reach the caller through the async API.

    An actor whose method and property both raise ``MyException`` is
    submitted; awaiting the method's result and awaiting the property
    access are each expected to re-raise that same exception type.
    """

    class MyException(Exception):
        pass

    class Broken:
        @property
        def prop(self):
            raise MyException

        def method(self):
            raise MyException

    actor = await client.submit(Broken, actor=True)

    # The call itself is made outside ``pytest.raises``: the test expects
    # the error to surface only when the returned object is awaited.
    pending = actor.method()
    with pytest.raises(MyException):
        await pending

    with pytest.raises(MyException):
        await actor.prop
48 changes: 27 additions & 21 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2830,30 +2830,36 @@ async def actor_execute(
func = getattr(actor, function)
name = key_split(key) + "." + function

if iscoroutinefunction(func):
result = await func(*args, **kwargs)
elif separate_thread:
result = await self.executor_submit(
name,
apply_function_actor,
args=(
func,
args,
kwargs,
self.execution_state,
try:
if iscoroutinefunction(func):
result = await func(*args, **kwargs)
elif separate_thread:
result = await self.executor_submit(
name,
self.active_threads,
self.active_threads_lock,
),
executor=self.executors["actor"],
)
else:
result = func(*args, **kwargs)
return {"status": "OK", "result": to_serialize(result)}
apply_function_actor,
args=(
func,
args,
kwargs,
self.execution_state,
name,
self.active_threads,
self.active_threads_lock,
),
executor=self.executors["actor"],
)
else:
result = func(*args, **kwargs)
return {"status": "OK", "result": to_serialize(result)}
except Exception as ex:
return {"status": "error", "exception": to_serialize(ex)}

def actor_attribute(self, comm=None, actor=None, attribute=None):
value = getattr(self.actors[actor], attribute)
return {"status": "OK", "result": to_serialize(value)}
try:
value = getattr(self.actors[actor], attribute)
return {"status": "OK", "result": to_serialize(value)}
except Exception as ex:
return {"status": "error", "exception": to_serialize(ex)}

def meets_resource_constraints(self, key):
ts = self.tasks[key]
Expand Down