Skip to content

Commit

Permalink
[java] Add maxTaskRetries option for java actor creation (ray-project…
Browse files Browse the repository at this point in the history
  • Loading branch information
kira-lin authored Sep 19, 2022
1 parent 586c217 commit 7280ef4
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 6 deletions.
14 changes: 14 additions & 0 deletions java/api/src/main/java/io/ray/api/call/BaseActorCreator.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ public T setMaxRestarts(int maxRestarts) {
return self();
}

/**
* This specifies the maximum number of times that an actor task can be retried. The minimum valid
* value is 0 (default), which indicates that the actor task can't be retried. A value of -1
* indicates that an actor task can be retried indefinitely.
*
* @param maxTaskRetries max number of actor task retries
* @return self
* @see ActorCreationOptions.Builder#setMaxTaskRetries(int)
*/
public T setMaxTaskRetries(int maxTaskRetries) {
builder.setMaxTaskRetries(maxTaskRetries);
return self();
}

/**
* Set the max number of concurrent calls to allow for this actor.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class ActorCreationOptions extends BaseTaskOptions {
public final String name;
public ActorLifetime lifetime;
public final int maxRestarts;
public final int maxTaskRetries;
public final List<String> jvmOptions;
public final int maxConcurrency;
public final PlacementGroup group;
Expand All @@ -32,6 +33,7 @@ private ActorCreationOptions(
ActorLifetime lifetime,
Map<String, Double> resources,
int maxRestarts,
int maxTaskRetries,
List<String> jvmOptions,
int maxConcurrency,
PlacementGroup group,
Expand All @@ -45,6 +47,7 @@ private ActorCreationOptions(
this.name = name;
this.lifetime = lifetime;
this.maxRestarts = maxRestarts;
this.maxTaskRetries = maxTaskRetries;
this.jvmOptions = jvmOptions;
this.maxConcurrency = maxConcurrency;
this.group = group;
Expand All @@ -62,6 +65,7 @@ public static class Builder {
private ActorLifetime lifetime = null;
private Map<String, Double> resources = new HashMap<>();
private int maxRestarts = 0;
private int maxTaskRetries = 0;
private List<String> jvmOptions = new ArrayList<>();
private int maxConcurrency = 1;
private PlacementGroup group;
Expand Down Expand Up @@ -131,6 +135,19 @@ public Builder setMaxRestarts(int maxRestarts) {
return this;
}

/**
* This specifies the maximum number of times that the actor task can be resubmitted. The
* minimum valid value is 0 (default), which indicates that the actor task can't be resubmited.
* A value of -1 indicates that an actor task can be resubmited indefinitely.
*
* @param maxTaskRetries max number of actor task retries
* @return self
*/
public Builder setMaxTaskRetries(int maxTaskRetries) {
this.maxTaskRetries = maxTaskRetries;
return this;
}

/**
* Set the JVM options for the Java worker that this actor is running in.
*
Expand Down Expand Up @@ -210,6 +227,7 @@ public ActorCreationOptions build() {
lifetime,
resources,
maxRestarts,
maxTaskRetries,
jvmOptions,
maxConcurrency,
group,
Expand Down
50 changes: 45 additions & 5 deletions java/test/src/main/java/io/ray/test/ActorRestartTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.ray.test;

import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.exception.RayActorException;
import io.ray.api.exception.RayException;
Expand Down Expand Up @@ -32,6 +33,16 @@ public int increase() {
return value;
}

public int increaseAfterTimeout(int timeout) {
try {
Thread.sleep(timeout);
} catch (Exception e) {
e.printStackTrace();
}
value += 1;
return value;
}

public boolean checkWasCurrentActorRestartedInActorTask() {
return Ray.getRuntimeContext().wasCurrentActorRestarted();
}
Expand All @@ -55,7 +66,8 @@ public void testActorRestart() throws InterruptedException, IOException {
actor.task(Counter::checkWasCurrentActorRestartedInActorTask).remote().get());

// Kill the actor process.
killActorProcess(actor);
int pid = actor.task(Counter::getPid).remote().get();
killActorProcess(pid);

waitForActorAlive(actor);
int value = actor.task(Counter::increase).remote().get();
Expand All @@ -67,18 +79,46 @@ public void testActorRestart() throws InterruptedException, IOException {
Assert.assertTrue(actor.task(Counter::checkWasCurrentActorRestartedInActorTask).remote().get());

// Kill the actor process again.
killActorProcess(actor);
pid = actor.task(Counter::getPid).remote().get();
killActorProcess(pid);

// Try calling increase on this actor again and this should fail.
Assert.assertThrows(
RayActorException.class, () -> actor.task(Counter::increase).remote().get());
}

public void testActorRestartWithRetry() throws InterruptedException, IOException {
ActorHandle<Counter> actor =
Ray.actor(Counter::new).setMaxRestarts(1).setMaxTaskRetries(1).remote();
// Call increase 3 times.
for (int i = 0; i < 3; i++) {
int result = actor.task(Counter::increase).remote().get();
Assert.assertEquals(result, i + 1);
}
// Need to call getPid before submitting the task to kill
int pid = actor.task(Counter::getPid).remote().get();
// Task to kill
ObjectRef<Integer> ref = actor.task(Counter::increaseAfterTimeout, 3000).remote();
// Kill the actor process.
killActorProcess(pid);
waitForActorAlive(actor);
// The task should fail and retry, so result is 1
int result = ref.get();
Assert.assertEquals(result, 1);
// Check that we can still call the actor
result = actor.task(Counter::increase).remote().get();
Assert.assertEquals(result, 2);
// Kill the actor process again.
pid = actor.task(Counter::getPid).remote().get();
killActorProcess(pid);
// Try calling increase on this actor again and this should fail.
Assert.assertThrows(
RayActorException.class, () -> actor.task(Counter::increase).remote().get());
}

/** The helper to kill a counter actor. */
private static void killActorProcess(ActorHandle<Counter> actor)
throws IOException, InterruptedException {
private static void killActorProcess(int pid) throws IOException, InterruptedException {
// Kill the actor process.
int pid = actor.task(Counter::getPid).remote().get();
Process p = Runtime.getRuntime().exec("kill -9 " + pid);
// Wait for the actor to be killed.
TimeUnit.SECONDS.sleep(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
std::string name = "";
std::optional<bool> is_detached = std::nullopt;
int64_t max_restarts = 0;
int64_t max_task_retries = 0;
std::unordered_map<std::string, double> resources;
std::vector<std::string> dynamic_worker_options;
uint64_t max_concurrency = 1;
Expand All @@ -186,6 +187,8 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,

max_restarts =
env->GetIntField(actorCreationOptions, java_actor_creation_options_max_restarts);
max_task_retries =
env->GetIntField(actorCreationOptions, java_actor_creation_options_max_task_retries);
jobject java_resources =
env->GetObjectField(actorCreationOptions, java_base_task_options_resources);
resources = ToResources(env, java_resources);
Expand Down Expand Up @@ -277,7 +280,7 @@ inline ActorCreationOptions ToActorCreationOptions(JNIEnv *env,
}
ActorCreationOptions actor_creation_options{
max_restarts,
0, // TODO: Allow setting max_task_retries from Java.
max_task_retries,
static_cast<int>(max_concurrency),
resources,
resources,
Expand Down
3 changes: 3 additions & 0 deletions src/ray/core_worker/lib/java/jni_init.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ jclass java_actor_creation_options_class;
jfieldID java_actor_creation_options_name;
jfieldID java_actor_creation_options_lifetime;
jfieldID java_actor_creation_options_max_restarts;
jfieldID java_actor_creation_options_max_task_retries;
jfieldID java_actor_creation_options_jvm_options;
jfieldID java_actor_creation_options_max_concurrency;
jfieldID java_actor_creation_options_group;
Expand Down Expand Up @@ -350,6 +351,8 @@ jint JNI_OnLoad(JavaVM *vm, void *reserved) {
"Lio/ray/api/options/ActorLifetime;");
java_actor_creation_options_max_restarts =
env->GetFieldID(java_actor_creation_options_class, "maxRestarts", "I");
java_actor_creation_options_max_task_retries =
env->GetFieldID(java_actor_creation_options_class, "maxTaskRetries", "I");
java_actor_creation_options_jvm_options = env->GetFieldID(
java_actor_creation_options_class, "jvmOptions", "Ljava/util/List;");
java_actor_creation_options_max_concurrency =
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/lib/java/jni_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ extern jfieldID java_actor_creation_options_name;
extern jfieldID java_actor_creation_options_lifetime;
/// maxRestarts field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_max_restarts;
/// maxTaskRetries field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_max_task_retries;
/// jvmOptions field of ActorCreationOptions class
extern jfieldID java_actor_creation_options_jvm_options;
/// maxConcurrency field of ActorCreationOptions class
Expand Down

0 comments on commit 7280ef4

Please sign in to comment.