Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++ worker] Refine worker context and more #26281

Merged
merged 2 commits into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cpp/src/ray/runtime/abstract_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void AbstractRayRuntime::Put(std::shared_ptr<msgpack::sbuffer> data,
}

std::string AbstractRayRuntime::Put(std::shared_ptr<msgpack::sbuffer> data) {
ObjectID object_id{};
ObjectID object_id;
object_store_->Put(data, &object_id);
return object_id.Binary();
}
Expand Down Expand Up @@ -224,8 +224,8 @@ const JobID &AbstractRayRuntime::GetCurrentJobID() {
return GetWorkerContext().GetCurrentJobID();
}

const WorkerContext &AbstractRayRuntime::GetWorkerContext() {
return CoreWorkerProcess::GetCoreWorker().GetWorkerContext();
const ActorID &AbstractRayRuntime::GetCurrentActorID() {
return GetWorkerContext().GetCurrentActorID();
}

void AbstractRayRuntime::AddLocalReference(const std::string &id) {
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/ray/runtime/abstract_ray_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ class AbstractRayRuntime : public RayRuntime {

const JobID &GetCurrentJobID();

virtual const WorkerContext &GetWorkerContext();
const ActorID &GetCurrentActorID();

virtual const WorkerContext &GetWorkerContext() = 0;

static std::shared_ptr<AbstractRayRuntime> GetInstance();
static std::shared_ptr<AbstractRayRuntime> DoInit();
Expand All @@ -101,8 +103,6 @@ class AbstractRayRuntime : public RayRuntime {

bool WasCurrentActorRestarted();

virtual ActorID GetCurrentActorID() { return ActorID::Nil(); }

virtual std::vector<PlacementGroup> GetAllPlacementGroups();
virtual PlacementGroup GetPlacementGroupById(const std::string &id);
virtual PlacementGroup GetPlacementGroup(const std::string &name);
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/ray/runtime/local_mode_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ ActorID LocalModeRayRuntime::GetNextActorID() {
return actor_id;
}

ActorID LocalModeRayRuntime::GetCurrentActorID() { return worker_.GetCurrentActorID(); }

const WorkerContext &LocalModeRayRuntime::GetWorkerContext() { return worker_; }

std::string LocalModeRayRuntime::Put(std::shared_ptr<msgpack::sbuffer> data) {
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/ray/runtime/local_mode_ray_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class LocalModeRayRuntime : public AbstractRayRuntime {
LocalModeRayRuntime();

ActorID GetNextActorID();
ActorID GetCurrentActorID();
std::string Put(std::shared_ptr<msgpack::sbuffer> data);
const WorkerContext &GetWorkerContext();
bool IsLocalMode() { return true; }
Expand All @@ -36,4 +35,4 @@ class LocalModeRayRuntime : public AbstractRayRuntime {
};

} // namespace internal
} // namespace ray
} // namespace ray
4 changes: 2 additions & 2 deletions cpp/src/ray/runtime/native_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ NativeRayRuntime::NativeRayRuntime() {
ProcessHelper::GetInstance().CreateGlobalStateAccessor(bootstrap_address);
}

ActorID NativeRayRuntime::GetCurrentActorID() {
return core::CoreWorkerProcess::GetCoreWorker().GetWorkerContext().GetCurrentActorID();
const WorkerContext &NativeRayRuntime::GetWorkerContext() {
return core::CoreWorkerProcess::GetCoreWorker().GetWorkerContext();
}

} // namespace internal
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/ray/runtime/native_ray_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ namespace internal {
class NativeRayRuntime : public AbstractRayRuntime {
public:
NativeRayRuntime();
ActorID GetCurrentActorID();

const WorkerContext &GetWorkerContext();
};

} // namespace internal
} // namespace ray
} // namespace ray
3 changes: 2 additions & 1 deletion cpp/src/ray/runtime/object/local_mode_object_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ LocalModeObjectStore::LocalModeObjectStore(LocalModeRayRuntime &local_mode_ray_t

void LocalModeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,
ObjectID *object_id) {
PutRaw(data, (const ObjectID)(*object_id));
*object_id = ObjectID::FromRandom();
PutRaw(data, *object_id);
}

void LocalModeObjectStore::PutRaw(std::shared_ptr<msgpack::sbuffer> data,
Expand Down
6 changes: 0 additions & 6 deletions cpp/src/ray/runtime/task/task_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ using ray::core::CoreWorkerProcess;

std::shared_ptr<msgpack::sbuffer> TaskExecutor::current_actor_ = nullptr;

// TODO(SongGuyang): Make a common task execution function used for both local mode and
// cluster mode.
std::unique_ptr<ObjectID> TaskExecutor::Execute(InvocationSpec &invocation) {
return std::make_unique<ObjectID>();
};

/// TODO(qicosmos): Need to add more details of the error messages, such as object id,
/// task id etc.
std::pair<Status, std::shared_ptr<msgpack::sbuffer>> GetExecuteResult(
Expand Down
3 changes: 0 additions & 3 deletions cpp/src/ray/runtime/task/task_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ class TaskExecutor {
public:
TaskExecutor() = default;

/// TODO(SongGuyang): support multiple tasks execution
std::unique_ptr<ObjectID> Execute(InvocationSpec &invocation);

static void Invoke(
const TaskSpecification &task_spec,
std::shared_ptr<msgpack::sbuffer> actor,
Expand Down
10 changes: 6 additions & 4 deletions cpp/src/ray/util/process_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,18 @@ void ProcessHelper::StartRayNode(const int port,
cmdargs.insert(cmdargs.end(), head_args.begin(), head_args.end());
}
RAY_LOG(INFO) << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true).second);
std::this_thread::sleep_for(std::chrono::seconds(5));
auto spawn_result = Process::Spawn(cmdargs, true);
RAY_CHECK(!spawn_result.second);
spawn_result.first.Wait();
return;
}

void ProcessHelper::StopRayNode() {
std::vector<std::string> cmdargs({"ray", "stop"});
RAY_LOG(INFO) << CreateCommandLine(cmdargs);
RAY_CHECK(!Process::Spawn(cmdargs, true).second);
std::this_thread::sleep_for(std::chrono::seconds(3));
auto spawn_result = Process::Spawn(cmdargs, true);
RAY_CHECK(!spawn_result.second);
spawn_result.first.Wait();
return;
}

Expand Down