Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[java] Add maxTaskRetries option for java actor creation #28377

Merged
merged 5 commits into from
Sep 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions java/api/src/main/java/io/ray/api/call/BaseActorCreator.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ public T setMaxRestarts(int maxRestarts) {
return self();
}

/**
* This specifies the maximum number of times that an actor task can be retried. The minimum valid
* value is 0 (default), which indicates that the actor task can't be retried. A value of -1
* indicates that an actor task can be retried indefinitely.
*
* @param maxTaskRetries max number of actor task retries
* @return self
* @see ActorCreationOptions.Builder#setMaxTaskRetries(int)
*/
public T setMaxTaskRetries(int maxTaskRetries) {
builder.setMaxTaskRetries(maxTaskRetries);
return self();
}

/**
* Set the max number of concurrent calls to allow for this actor.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
public final String name;
public ActorLifetime lifetime;
public final int maxRestarts;
public final int maxTaskRetries;
public final List<String> jvmOptions;
public final int maxConcurrency;
public final PlacementGroup group;
Expand All @@ -32,6 +33,7 @@ private ActorCreationOptions(
ActorLifetime lifetime,
Map<String, Double> resources,
int maxRestarts,
int maxTaskRetries,
List<String> jvmOptions,
int maxConcurrency,
PlacementGroup group,
Expand All @@ -45,6 +47,7 @@ private ActorCreationOptions(
this.name = name;
this.lifetime = lifetime;
this.maxRestarts = maxRestarts;
this.maxTaskRetries = maxTaskRetries;
this.jvmOptions = jvmOptions;
this.maxConcurrency = maxConcurrency;
this.group = group;
Expand All @@ -62,6 +65,7 @@ public static class Builder {
private ActorLifetime lifetime = null;
private Map<String, Double> resources = new HashMap<>();
private int maxRestarts = 0;
private int maxTaskRetries = 0;
private List<String> jvmOptions = new ArrayList<>();
private int maxConcurrency = 1;
private PlacementGroup group;
Expand Down Expand Up @@ -131,6 +135,19 @@ public Builder setMaxRestarts(int maxRestarts) {
return this;
}

/**
* This specifies the maximum number of times that the actor task can be resubmitted. The
* minimum valid value is 0 (default), which indicates that the actor task can't be resubmited.
* A value of -1 indicates that an actor task can be resubmited indefinitely.
*
* @param maxTaskRetries max number of actor task retries
* @return self
*/
public Builder setMaxTaskRetries(int maxTaskRetries) {
this.maxTaskRetries = maxTaskRetries;
return this;
}

/**
* Set the JVM options for the Java worker that this actor is running in.
*
Expand Down Expand Up @@ -210,6 +227,7 @@ public ActorCreationOptions build() {
lifetime,
resources,
maxRestarts,
maxTaskRetries,
jvmOptions,
maxConcurrency,
group,
Expand Down
50 changes: 45 additions & 5 deletions java/test/src/main/java/io/ray/test/ActorRestartTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.ray.test;

import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.exception.RayActorException;
import io.ray.api.exception.RayException;
Expand Down Expand Up @@ -32,6 +33,16 @@ public int increase() {
return value;
}

public int increaseAfterTimeout(int timeout) {
try {
Thread.sleep(timeout);
} catch (Exception e) {
e.printStackTrace();
}
value += 1;
return value;
}

public boolean checkWasCurrentActorRestartedInActorTask() {
return Ray.getRuntimeContext().wasCurrentActorRestarted();
}
Expand All @@ -55,7 +66,8 @@ public void testActorRestart() throws InterruptedException, IOException {
actor.task(Counter::checkWasCurrentActorRestartedInActorTask).remote().get());

// Kill the actor process.
killActorProcess(actor);
int pid = actor.task(Counter::getPid).remote().get();
killActorProcess(pid);

waitForActorAlive(actor);
int value = actor.task(Counter::increase).remote().get();
Expand All @@ -67,18 +79,46 @@ public void testActorRestart() throws InterruptedException, IOException {
Assert.assertTrue(actor.task(Counter::checkWasCurrentActorRestartedInActorTask).remote().get());

// Kill the actor process again.
killActorProcess(actor);
pid = actor.task(Counter::getPid).remote().get();
killActorProcess(pid);

// Try calling increase on this actor again and this should fail.
Assert.assertThrows(
RayActorException.class, () -> actor.task(Counter::increase).remote().get());
}

public void testActorRestartWithRetry() throws InterruptedException, IOException {
ActorHandle<Counter> actor =
Ray.actor(Counter::new).setMaxRestarts(1).setMaxTaskRetries(1).remote();
// Call increase 3 times.
for (int i = 0; i < 3; i++) {
int result = actor.task(Counter::increase).remote().get();
Assert.assertEquals(result, i + 1);
}
// Need to call getPid before submitting the task to kill
int pid = actor.task(Counter::getPid).remote().get();
// Task to kill
ObjectRef<Integer> ref = actor.task(Counter::increaseAfterTimeout, 3000).remote();
// Kill the actor process.
killActorProcess(pid);
waitForActorAlive(actor);
// The task should fail and retry, so result is 1
int result = ref.get();
Assert.assertEquals(result, 1);
// Check that we can still call the actor
result = actor.task(Counter::increase).remote().get();
Assert.assertEquals(result, 2);
// Kill the actor process again.
pid = actor.task(Counter::getPid).remote().get();
killActorProcess(pid);
// Try calling increase on this actor again and this should fail.
Assert.assertThrows(
RayActorException.class, () -> actor.task(Counter::increase).remote().get());
}

/** The helper to kill a counter actor. */
private static void killActorProcess(ActorHandle<Counter> actor)
throws IOException, InterruptedException {
private static void killActorProcess(int pid) throws IOException, InterruptedException {
// Kill the actor process.
int pid = actor.task(Counter::getPid).remote().get();
Process p = Runtime.getRuntime().exec("kill -9 " + pid);
// Wait for the actor to be killed.
TimeUnit.SECONDS.sleep(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
std::string name = "";
std::optional<bool> is_detached = std::nullopt;
int64_t max_restarts = 0;
int64_t max_task_retries = 0;
std::unordered_map<std::string, double> resources;
std::vector<std::string> dynamic_worker_options;
uint64_t max_concurrency = 1;
Expand All @@ -186,6 +187,8 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,

max_restarts =
env->GetIntField(actorCreationOptions, java_actor_creation_options_max_restarts);
max_task_retries =
env->GetIntField(actorCreationOptions, java_actor_creation_options_max_task_retries);
jobject java_resources =
env->GetObjectField(actorCreationOptions, java_base_task_options_resources);
resources = ToResources(env, java_resources);
Expand Down Expand Up @@ -277,7 +280,7 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
}
ActorCreationOptions actor_creation_options{
max_restarts,
0, // TODO: Allow setting max_task_retries from Java.
max_task_retries,
static_cast<int>(max_concurrency),
resources,
resources,
Expand Down
3 changes: 3 additions & 0 deletions src/ray/core_worker/lib/java/jni_init.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ jclass java_actor_creation_options_class;
jfieldID java_actor_creation_options_name;
jfieldID java_actor_creation_options_lifetime;
jfieldID java_actor_creation_options_max_restarts;
jfieldID java_actor_creation_options_max_task_retries;
jfieldID java_actor_creation_options_jvm_options;
jfieldID java_actor_creation_options_max_concurrency;
jfieldID java_actor_creation_options_group;
Expand Down Expand Up @@ -350,6 +351,8 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) {
"Lio/ray/api/options/ActorLifetime;");
java_actor_creation_options_max_restarts =
env->GetFieldID(java_actor_creation_options_class, "maxRestarts", "I");
java_actor_creation_options_max_task_retries =
env->GetFieldID(java_actor_creation_options_class, "maxTaskRetries", "I");
java_actor_creation_options_jvm_options = env->GetFieldID(
java_actor_creation_options_class, "jvmOptions", "Ljava/util/List;");
java_actor_creation_options_max_concurrency =
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/lib/java/jni_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ extern jfieldID java_actor_creation_options_name;
extern jfieldID java_actor_creation_options_lifetime;
/// maxRestarts field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_max_restarts;
/// maxTaskRetries field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_max_task_retries;
/// jvmOptions field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_jvm_options;
/// maxConcurrency field of ActorCreationOptions class
Expand Down