diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.cc b/cpp/src/ray/runtime/abstract_ray_runtime.cc index 8604f2ce9309..7596c79dbfab 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.cc +++ b/cpp/src/ray/runtime/abstract_ray_runtime.cc @@ -87,7 +87,7 @@ void AbstractRayRuntime::Put(std::shared_ptr data, } std::string AbstractRayRuntime::Put(std::shared_ptr data) { - ObjectID object_id{}; + ObjectID object_id; object_store_->Put(data, &object_id); return object_id.Binary(); } @@ -224,8 +224,8 @@ const JobID &AbstractRayRuntime::GetCurrentJobID() { return GetWorkerContext().GetCurrentJobID(); } -const WorkerContext &AbstractRayRuntime::GetWorkerContext() { - return CoreWorkerProcess::GetCoreWorker().GetWorkerContext(); +const ActorID &AbstractRayRuntime::GetCurrentActorID() { + return GetWorkerContext().GetCurrentActorID(); } void AbstractRayRuntime::AddLocalReference(const std::string &id) { diff --git a/cpp/src/ray/runtime/abstract_ray_runtime.h b/cpp/src/ray/runtime/abstract_ray_runtime.h index d9467c2c1b65..12b061fdc732 100644 --- a/cpp/src/ray/runtime/abstract_ray_runtime.h +++ b/cpp/src/ray/runtime/abstract_ray_runtime.h @@ -90,7 +90,9 @@ class AbstractRayRuntime : public RayRuntime { const JobID &GetCurrentJobID(); - virtual const WorkerContext &GetWorkerContext(); + const ActorID &GetCurrentActorID(); + + virtual const WorkerContext &GetWorkerContext() = 0; static std::shared_ptr GetInstance(); static std::shared_ptr DoInit(); @@ -101,8 +103,6 @@ class AbstractRayRuntime : public RayRuntime { bool WasCurrentActorRestarted(); - virtual ActorID GetCurrentActorID() { return ActorID::Nil(); } - virtual std::vector GetAllPlacementGroups(); virtual PlacementGroup GetPlacementGroupById(const std::string &id); virtual PlacementGroup GetPlacementGroup(const std::string &name); diff --git a/cpp/src/ray/runtime/local_mode_ray_runtime.cc b/cpp/src/ray/runtime/local_mode_ray_runtime.cc index 47e9c9dc1634..a8024899c33a 100644 --- a/cpp/src/ray/runtime/local_mode_ray_runtime.cc +++ b/cpp/src/ray/runtime/local_mode_ray_runtime.cc @@ -38,8 +38,6 @@ ActorID LocalModeRayRuntime::GetNextActorID() { return actor_id; } -ActorID LocalModeRayRuntime::GetCurrentActorID() { return worker_.GetCurrentActorID(); } - const WorkerContext &LocalModeRayRuntime::GetWorkerContext() { return worker_; } std::string LocalModeRayRuntime::Put(std::shared_ptr data) { diff --git a/cpp/src/ray/runtime/local_mode_ray_runtime.h b/cpp/src/ray/runtime/local_mode_ray_runtime.h index 96dcd8c18e9b..06dc6b8815f7 100644 --- a/cpp/src/ray/runtime/local_mode_ray_runtime.h +++ b/cpp/src/ray/runtime/local_mode_ray_runtime.h @@ -26,7 +26,6 @@ class LocalModeRayRuntime : public AbstractRayRuntime { LocalModeRayRuntime(); ActorID GetNextActorID(); - ActorID GetCurrentActorID(); std::string Put(std::shared_ptr data); const WorkerContext &GetWorkerContext(); bool IsLocalMode() { return true; } @@ -36,4 +35,4 @@ class LocalModeRayRuntime : public AbstractRayRuntime { }; } // namespace internal -} // namespace ray \ No newline at end of file +} // namespace ray diff --git a/cpp/src/ray/runtime/native_ray_runtime.cc b/cpp/src/ray/runtime/native_ray_runtime.cc index e6aa91c17c41..1921f3cb89c6 100644 --- a/cpp/src/ray/runtime/native_ray_runtime.cc +++ b/cpp/src/ray/runtime/native_ray_runtime.cc @@ -39,8 +39,8 @@ NativeRayRuntime::NativeRayRuntime() { ProcessHelper::GetInstance().CreateGlobalStateAccessor(bootstrap_address); } -ActorID NativeRayRuntime::GetCurrentActorID() { - return core::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentActorID(); +const WorkerContext &NativeRayRuntime::GetWorkerContext() { + return core::CoreWorkerProcess::GetCoreWorker().GetWorkerContext(); } } // namespace internal diff --git a/cpp/src/ray/runtime/native_ray_runtime.h b/cpp/src/ray/runtime/native_ray_runtime.h index 50164fa29565..be1811037a92 100644 --- a/cpp/src/ray/runtime/native_ray_runtime.h +++ b/cpp/src/ray/runtime/native_ray_runtime.h @@ -24,8 +24,9 @@ namespace internal { class NativeRayRuntime : public AbstractRayRuntime { public: NativeRayRuntime(); - ActorID GetCurrentActorID(); + + const WorkerContext &GetWorkerContext(); }; } // namespace internal -} // namespace ray \ No newline at end of file +} // namespace ray diff --git a/cpp/src/ray/runtime/object/local_mode_object_store.cc b/cpp/src/ray/runtime/object/local_mode_object_store.cc index fdc06bc1477e..1102e5c84bc3 100644 --- a/cpp/src/ray/runtime/object/local_mode_object_store.cc +++ b/cpp/src/ray/runtime/object/local_mode_object_store.cc @@ -32,7 +32,8 @@ LocalModeObjectStore::LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_t void LocalModeObjectStore::PutRaw(std::shared_ptr data, ObjectID *object_id) { - PutRaw(data, (const ObjectID)(*object_id)); + *object_id = ObjectID::FromRandom(); + PutRaw(data, *object_id); } void LocalModeObjectStore::PutRaw(std::shared_ptr data, diff --git a/cpp/src/ray/runtime/task/task_executor.cc b/cpp/src/ray/runtime/task/task_executor.cc index 948d258c69d9..426920b0cdbe 100644 --- a/cpp/src/ray/runtime/task/task_executor.cc +++ b/cpp/src/ray/runtime/task/task_executor.cc @@ -76,12 +76,6 @@ using ray::core::CoreWorkerProcess; std::shared_ptr TaskExecutor::current_actor_ = nullptr; -// TODO(SongGuyang): Make a common task execution function used for both local mode and -// cluster mode. -std::unique_ptr TaskExecutor::Execute(InvocationSpec &invocation) { - return std::make_unique(); -}; - /// TODO(qicosmos): Need to add more details of the error messages, such as object id, /// task id etc. std::pair> GetExecuteResult( diff --git a/cpp/src/ray/runtime/task/task_executor.h b/cpp/src/ray/runtime/task/task_executor.h index b4223970254a..2e48351e86dc 100644 --- a/cpp/src/ray/runtime/task/task_executor.h +++ b/cpp/src/ray/runtime/task/task_executor.h @@ -67,9 +67,6 @@ class TaskExecutor { public: TaskExecutor() = default; - /// TODO(SongGuyang): support multiple tasks execution - std::unique_ptr Execute(InvocationSpec &invocation); - static void Invoke( const TaskSpecification &task_spec, std::shared_ptr actor, diff --git a/cpp/src/ray/util/process_helper.cc b/cpp/src/ray/util/process_helper.cc index 50786bc198b1..ad3d34b966dc 100644 --- a/cpp/src/ray/util/process_helper.cc +++ b/cpp/src/ray/util/process_helper.cc @@ -43,16 +43,18 @@ void ProcessHelper::StartRayNode(const int port, cmdargs.insert(cmdargs.end(), head_args.begin(), head_args.end()); } RAY_LOG(INFO) << CreateCommandLine(cmdargs); - RAY_CHECK(!Process::Spawn(cmdargs, true).second); - std::this_thread::sleep_for(std::chrono::seconds(5)); + auto spawn_result = Process::Spawn(cmdargs, true); + RAY_CHECK(!spawn_result.second); + spawn_result.first.Wait(); return; } void ProcessHelper::StopRayNode() { std::vector cmdargs({"ray", "stop"}); RAY_LOG(INFO) << CreateCommandLine(cmdargs); - RAY_CHECK(!Process::Spawn(cmdargs, true).second); - std::this_thread::sleep_for(std::chrono::seconds(3)); + auto spawn_result = Process::Spawn(cmdargs, true); + RAY_CHECK(!spawn_result.second); + spawn_result.first.Wait(); return; }