From 358e6690f6cab992e8e12a665b3f4014460313c1 Mon Sep 17 00:00:00 2001 From: Jovan Date: Wed, 16 Jan 2019 18:37:27 +0800 Subject: [PATCH 1/7] Fix --- java/runtime/src/main/java/org/ray/runtime/Worker.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/java/runtime/src/main/java/org/ray/runtime/Worker.java b/java/runtime/src/main/java/org/ray/runtime/Worker.java index d97f4d2a6922..5813998a2eda 100644 --- a/java/runtime/src/main/java/org/ray/runtime/Worker.java +++ b/java/runtime/src/main/java/org/ray/runtime/Worker.java @@ -95,7 +95,12 @@ public void execute(TaskSpec spec) { currentActorId = returnId; } } finally { - runtime.getWorkerContext().setCurrentTask(null, null); + // Don't need to reset current driver id if the worker is an actor. + // Because the following tasks should all have the same driver id. + if (!spec.isActorTask() && !spec.isActorCreationTask()) { + runtime.getWorkerContext().setCurrentTask(null, null); + } + Thread.currentThread().setContextClassLoader(oldLoader); } } From b3128b5ba71e1c9fcf24264c19423537bb098158 Mon Sep 17 00:00:00 2001 From: Jovan Date: Thu, 17 Jan 2019 00:56:24 +0800 Subject: [PATCH 2/7] add assert --- .../src/main/java/org/ray/runtime/AbstractRayRuntime.java | 4 ++++ python/ray/worker.py | 3 +++ 2 files changed, 7 insertions(+) diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index 049e4a9c0cdb..1b6ec4993837 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -1,5 +1,6 @@ package org.ray.runtime; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Arrays; @@ -215,6 +216,7 @@ public WaitResult wait(List> waitList, int numReturns, int t @Override public RayObject call(RayFunc func, Object[] args, CallOptions options) { + Preconditions.checkState(!workerContext.getCurrentDriverId().isNil()); TaskSpec spec = createTaskSpec(func, RayActorImpl.NIL, args, false, options); rayletClient.submitTask(spec); return new RayObjectImpl(spec.returnIds[0]); @@ -222,6 +224,7 @@ public RayObject call(RayFunc func, Object[] args, CallOptions options) { @Override public RayObject call(RayFunc func, RayActor actor, Object[] args) { + Preconditions.checkState(!workerContext.getCurrentDriverId().isNil()); if (!(actor instanceof RayActorImpl)) { throw new IllegalArgumentException("Unsupported actor type: " + actor.getClass().getName()); } @@ -240,6 +243,7 @@ public RayObject call(RayFunc func, RayActor actor, Object[] args) { @SuppressWarnings("unchecked") public RayActor createActor(RayFunc actorFactoryFunc, Object[] args, ActorCreationOptions options) { + Preconditions.checkState(!workerContext.getCurrentDriverId().isNil()); TaskSpec spec = createTaskSpec(actorFactoryFunc, RayActorImpl.NIL, args, true, options); RayActorImpl actor = new RayActorImpl(spec.returnIds[0]); diff --git a/python/ray/worker.py b/python/ray/worker.py index 1531f54e6bc3..1f51b78237fa 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -620,6 +620,9 @@ def submit_task(self, self.task_context.task_index += 1 # The parent task must be set for the submitted task. assert not self.current_task_id.is_nil() + # Current driver id must not be nil when submitting a task. + # Because every task must belong to a driver. + assert not self.task_driver_id.is_nil() # Submit the task to local scheduler. function_descriptor_list = ( function_descriptor.get_function_descriptor_list()) From 151da029744470e6c7a1daebbcc96eff50d0dd6d Mon Sep 17 00:00:00 2001 From: Jovan Date: Thu, 17 Jan 2019 21:21:08 +0800 Subject: [PATCH 3/7] Fix --- .../src/main/java/org/ray/runtime/AbstractRayRuntime.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index 1b6ec4993837..7c3985f7b205 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -216,7 +216,6 @@ public WaitResult wait(List> waitList, int numReturns, int t @Override public RayObject call(RayFunc func, Object[] args, CallOptions options) { - Preconditions.checkState(!workerContext.getCurrentDriverId().isNil()); TaskSpec spec = createTaskSpec(func, RayActorImpl.NIL, args, false, options); rayletClient.submitTask(spec); return new RayObjectImpl(spec.returnIds[0]); @@ -224,7 +223,6 @@ public RayObject call(RayFunc func, Object[] args, CallOptions options) { @Override public RayObject call(RayFunc func, RayActor actor, Object[] args) { - Preconditions.checkState(!workerContext.getCurrentDriverId().isNil()); if (!(actor instanceof RayActorImpl)) { throw new IllegalArgumentException("Unsupported actor type: " + actor.getClass().getName()); } @@ -243,7 +241,6 @@ public RayObject call(RayFunc func, RayActor actor, Object[] args) { @SuppressWarnings("unchecked") public RayActor createActor(RayFunc actorFactoryFunc, Object[] args, ActorCreationOptions options) { - Preconditions.checkState(!workerContext.getCurrentDriverId().isNil()); TaskSpec spec = createTaskSpec(actorFactoryFunc, RayActorImpl.NIL, args, true, options); RayActorImpl actor = new RayActorImpl(spec.returnIds[0]); From 9793290de24c36567127356c98970a865bef58cb Mon Sep 17 00:00:00 2001 From: Jovan Date: Thu, 17 Jan 2019 21:43:54 +0800 Subject: [PATCH 4/7] Fix --- .../src/main/java/org/ray/runtime/Worker.java | 14 +++++++++----- .../main/java/org/ray/runtime/WorkerContext.java | 15 ++++++--------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/java/runtime/src/main/java/org/ray/runtime/Worker.java b/java/runtime/src/main/java/org/ray/runtime/Worker.java index 5813998a2eda..76dcc445e3ed 100644 --- a/java/runtime/src/main/java/org/ray/runtime/Worker.java +++ b/java/runtime/src/main/java/org/ray/runtime/Worker.java @@ -59,7 +59,8 @@ public void execute(TaskSpec spec) { RayFunction rayFunction = runtime.getFunctionManager() .getFunction(spec.driverId, spec.functionDescriptor); // Set context - runtime.getWorkerContext().setCurrentTask(spec, rayFunction.classLoader); + runtime.getWorkerContext().setCurrentTask( + spec.taskId, spec.driverId, rayFunction.classLoader); Thread.currentThread().setContextClassLoader(rayFunction.classLoader); // Get local actor object and arguments. Object actor = null; @@ -95,10 +96,13 @@ public void execute(TaskSpec spec) { currentActorId = returnId; } } finally { - // Don't need to reset current driver id if the worker is an actor. - // Because the following tasks should all have the same driver id. - if (!spec.isActorTask() && !spec.isActorCreationTask()) { - runtime.getWorkerContext().setCurrentTask(null, null); + if (spec.isActorTask() || spec.isActorCreationTask()) { + // Don't need to reset current driver id if the worker is an actor. + // Because the following tasks should all have the same driver id. + runtime.getWorkerContext().setCurrentTask(null, spec.driverId, null); + } else { + // For normal task, we should set current task id to null as well. + runtime.getWorkerContext().setCurrentTask(null, null, null); } Thread.currentThread().setContextClassLoader(oldLoader); diff --git a/java/runtime/src/main/java/org/ray/runtime/WorkerContext.java b/java/runtime/src/main/java/org/ray/runtime/WorkerContext.java index b97a08b5288a..11cce7fe0f7a 100644 --- a/java/runtime/src/main/java/org/ray/runtime/WorkerContext.java +++ b/java/runtime/src/main/java/org/ray/runtime/WorkerContext.java @@ -47,7 +47,7 @@ public WorkerContext(WorkerMode workerMode, UniqueId driverId) { currentClassLoader = null; } else { workerId = UniqueId.randomId(); - setCurrentTask(null, null); + setCurrentTask(null, null, null); } } @@ -63,18 +63,15 @@ public UniqueId getCurrentTaskId() { * Set the current task which is being executed by the current worker. Note, this method can only * be called from the main thread. */ - public void setCurrentTask(TaskSpec task, ClassLoader classLoader) { + public void setCurrentTask(UniqueId currentTaskId, + UniqueId currentDriverId, ClassLoader classLoader) { Preconditions.checkState( Thread.currentThread().getId() == mainThreadId, "This method should only be called from the main thread." ); - if (task != null) { - currentTaskId.set(task.taskId); - currentDriverId = task.driverId; - } else { - currentTaskId.set(UniqueId.NIL); - currentDriverId = UniqueId.NIL; - } + + this.currentTaskId.set(currentTaskId); + this.currentDriverId = currentDriverId; taskIndex.set(0); putIndex.set(0); currentClassLoader = classLoader; From f3c298ac30ee625b96fa1c00d8f2a950b8af9edb Mon Sep 17 00:00:00 2001 From: Jovan Date: Tue, 22 Jan 2019 19:52:49 +0800 Subject: [PATCH 5/7] Fix lint --- java/runtime/src/main/java/org/ray/runtime/Worker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/runtime/src/main/java/org/ray/runtime/Worker.java b/java/runtime/src/main/java/org/ray/runtime/Worker.java index 76dcc445e3ed..4d00bf50ea71 100644 --- a/java/runtime/src/main/java/org/ray/runtime/Worker.java +++ b/java/runtime/src/main/java/org/ray/runtime/Worker.java @@ -60,7 +60,7 @@ public void execute(TaskSpec spec) { .getFunction(spec.driverId, spec.functionDescriptor); // Set context runtime.getWorkerContext().setCurrentTask( - spec.taskId, spec.driverId, rayFunction.classLoader); + spec.taskId, spec.driverId, rayFunction.classLoader); Thread.currentThread().setContextClassLoader(rayFunction.classLoader); // Get local actor object and arguments. Object actor = null; From 8f84e26515bdaf0c0fad4e4369100aebe0322cda Mon Sep 17 00:00:00 2001 From: Jovan Date: Wed, 23 Jan 2019 17:52:09 +0800 Subject: [PATCH 6/7] Refine --- .../src/main/java/org/ray/runtime/Worker.java | 12 +----------- .../main/java/org/ray/runtime/WorkerContext.java | 13 +++++++------ 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/java/runtime/src/main/java/org/ray/runtime/Worker.java b/java/runtime/src/main/java/org/ray/runtime/Worker.java index 4d00bf50ea71..6e403c80aec6 100644 --- a/java/runtime/src/main/java/org/ray/runtime/Worker.java +++ b/java/runtime/src/main/java/org/ray/runtime/Worker.java @@ -59,8 +59,7 @@ public void execute(TaskSpec spec) { RayFunction rayFunction = runtime.getFunctionManager() .getFunction(spec.driverId, spec.functionDescriptor); // Set context - runtime.getWorkerContext().setCurrentTask( - spec.taskId, spec.driverId, rayFunction.classLoader); + runtime.getWorkerContext().setCurrentTask(spec, rayFunction.classLoader); Thread.currentThread().setContextClassLoader(rayFunction.classLoader); // Get local actor object and arguments. Object actor = null; @@ -96,15 +95,6 @@ public void execute(TaskSpec spec) { currentActorId = returnId; } } finally { - if (spec.isActorTask() || spec.isActorCreationTask()) { - // Don't need to reset current driver id if the worker is an actor. - // Because the following tasks should all have the same driver id. - runtime.getWorkerContext().setCurrentTask(null, spec.driverId, null); - } else { - // For normal task, we should set current task id to null as well. - runtime.getWorkerContext().setCurrentTask(null, null, null); - } - Thread.currentThread().setContextClassLoader(oldLoader); } } diff --git a/java/runtime/src/main/java/org/ray/runtime/WorkerContext.java b/java/runtime/src/main/java/org/ray/runtime/WorkerContext.java index 11cce7fe0f7a..8aa2eedea6ba 100644 --- a/java/runtime/src/main/java/org/ray/runtime/WorkerContext.java +++ b/java/runtime/src/main/java/org/ray/runtime/WorkerContext.java @@ -40,14 +40,15 @@ public WorkerContext(WorkerMode workerMode, UniqueId driverId) { taskIndex = ThreadLocal.withInitial(() -> 0); putIndex = ThreadLocal.withInitial(() -> 0); currentTaskId = ThreadLocal.withInitial(UniqueId::randomId); + currentClassLoader = null; if (workerMode == WorkerMode.DRIVER) { workerId = driverId; currentTaskId.set(UniqueId.randomId()); currentDriverId = driverId; - currentClassLoader = null; } else { workerId = UniqueId.randomId(); - setCurrentTask(null, null, null); + this.currentTaskId.set(UniqueId.NIL); + this.currentDriverId = UniqueId.NIL; } } @@ -63,15 +64,15 @@ public UniqueId getCurrentTaskId() { * Set the current task which is being executed by the current worker. Note, this method can only * be called from the main thread. */ - public void setCurrentTask(UniqueId currentTaskId, - UniqueId currentDriverId, ClassLoader classLoader) { + public void setCurrentTask(TaskSpec task, ClassLoader classLoader) { Preconditions.checkState( Thread.currentThread().getId() == mainThreadId, "This method should only be called from the main thread." ); - this.currentTaskId.set(currentTaskId); - this.currentDriverId = currentDriverId; + Preconditions.checkNotNull(task); + this.currentTaskId.set(task.taskId); + this.currentDriverId = task.driverId; taskIndex.set(0); putIndex.set(0); currentClassLoader = classLoader; From f4bf13b2c98adbdbc72f3f918ffda97af47f6665 Mon Sep 17 00:00:00 2001 From: Jovan Date: Wed, 23 Jan 2019 18:01:03 +0800 Subject: [PATCH 7/7] address comment --- .../src/main/java/org/ray/runtime/AbstractRayRuntime.java | 1 - 1 file changed, 1 deletion(-) diff --git a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java index 7c3985f7b205..049e4a9c0cdb 100644 --- a/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java @@ -1,6 +1,5 @@ package org.ray.runtime; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.Arrays;