Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[direct call] changes raylet to push tasks to worker #5140

Merged
merged 32 commits into from
Jul 11, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
7f635b4
refactor grpc server
zhijunfu Jun 24, 2019
2205c87
format
zhijunfu Jun 24, 2019
0703b75
change GetTask() to PushTask()
zhijunfu Jun 24, 2019
6d5228a
merge code, and fix test
zhijunfu Jun 25, 2019
4c3eb6e
merge
zhijunfu Jun 26, 2019
d1b20e5
change PushTask to AssignTask
zhijunfu Jun 26, 2019
1a663bc
format
zhijunfu Jun 26, 2019
4ef00b2
merge
zhijunfu Jul 8, 2019
6caa126
add resource_ids
zhijunfu Jul 8, 2019
302e79f
move done_callback to server call
zhijunfu Jul 8, 2019
70d6e26
remove SetTaskHandler and initialize it in task receiver's constructor
zhijunfu Jul 8, 2019
17b22e4
format
zhijunfu Jul 8, 2019
dbf666c
resolve comments
zhijunfu Jul 8, 2019
a5f301d
update
zhijunfu Jul 8, 2019
0c762f1
update
zhijunfu Jul 8, 2019
819614f
Update src/ray/core_worker/core_worker.cc
zhijunfu Jul 9, 2019
beb01be
resolve comments
zhijunfu Jul 9, 2019
be05ed6
merge
zhijunfu Jul 9, 2019
878cae4
format
zhijunfu Jul 9, 2019
722511b
Merge branch 'worker_grpc' of https://github.com/ant-tech-alliance/ra…
zhijunfu Jul 9, 2019
3514156
merge
zhijunfu Jul 9, 2019
bddc209
Update src/ray/core_worker/transport/raylet_transport.cc
zhijunfu Jul 9, 2019
78bfc24
resolve comments
zhijunfu Jul 9, 2019
df2a6ff
Merge branch 'worker_grpc' of https://github.com/ant-tech-alliance/ra…
zhijunfu Jul 9, 2019
32b3d3e
resolve comments
zhijunfu Jul 10, 2019
d157b84
fix build
zhijunfu Jul 10, 2019
42179ea
format
zhijunfu Jul 10, 2019
20cbf1f
Merge branch 'master' of https://github.com/ray-project/ray into work…
zhijunfu Jul 10, 2019
85f2d73
merge
zhijunfu Jul 11, 2019
6600779
fix
raulchen Jul 11, 2019
5e66995
format
raulchen Jul 11, 2019
b60574d
noop
stephanie-wang Jul 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ namespace ray {

CoreWorker::CoreWorker(const enum WorkerType worker_type, const ::Language language,
const std::string &store_socket, const std::string &raylet_socket,
const JobID &job_id)
const JobID &job_id,
CoreWorkerTaskExecutionInterface::TaskExecutor execution_callback)
zhijunfu marked this conversation as resolved.
Show resolved Hide resolved
: worker_type_(worker_type),
language_(language),
raylet_socket_(raylet_socket),
Expand All @@ -14,9 +15,10 @@ CoreWorker::CoreWorker(const enum WorkerType worker_type, const ::Language langu
object_interface_(worker_context_, raylet_client_, store_socket) {
int rpc_server_port = 0;
if (worker_type_ == ray::WorkerType::WORKER) {
RAY_CHECK(execution_callback != nullptr);
task_execution_interface_ = std::unique_ptr<CoreWorkerTaskExecutionInterface>(
new CoreWorkerTaskExecutionInterface(worker_context_, raylet_client_,
object_interface_));
object_interface_, execution_callback));
rpc_server_port = task_execution_interface_->worker_server_.GetPort();
}
// TODO(zhijunfu): currently RayletClient would crash in its constructor if it cannot
Expand Down
3 changes: 2 additions & 1 deletion src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ class CoreWorker {
/// NOTE(zhijunfu): the constructor would throw if a failure happens.
CoreWorker(const WorkerType worker_type, const ::Language language,
const std::string &store_socket, const std::string &raylet_socket,
const JobID &job_id = JobID::Nil());
const JobID &job_id,
CoreWorkerTaskExecutionInterface::TaskExecutor execution_callback = nullptr);
zhijunfu marked this conversation as resolved.
Show resolved Hide resolved

/// Type of this worker.
enum WorkerType WorkerType() const { return worker_type_; }
Expand Down
18 changes: 10 additions & 8 deletions src/ray/core_worker/mock_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@ namespace ray {
/// for more details on how this class is used.
class MockWorker {
public:
MockWorker(const std::string &store_socket, const std::string &raylet_socket)
: worker_(WorkerType::WORKER, Language::PYTHON, store_socket, raylet_socket,
JobID::FromRandom()) {}

void Run() {
MockWorker(const std::string &store_socket, const std::string &raylet_socket) {
auto executor_func = [this](const RayFunction &ray_function,
const std::vector<std::shared_ptr<RayObject>> &args,
const TaskInfo &task_info, int num_returns) {
Expand All @@ -40,17 +36,23 @@ class MockWorker {
// Write the merged content to each of return ids.
for (int i = 0; i < num_returns; i++) {
ObjectID id = ObjectID::ForTaskReturn(task_info.task_id, i + 1);
RAY_CHECK_OK(worker_.Objects().Put(return_value, id));
RAY_CHECK_OK(worker_->Objects().Put(return_value, id));
}
return Status::OK();
};

worker_ = std::unique_ptr<CoreWorker>(
zhijunfu marked this conversation as resolved.
Show resolved Hide resolved
new CoreWorker(WorkerType::WORKER, Language::PYTHON, store_socket, raylet_socket,
JobID::FromRandom(), executor_func));
}

void Run() {
// Start executing tasks.
worker_.Execution().Run(executor_func);
worker_->Execution().Run();
}

private:
CoreWorker worker_;
std::unique_ptr<CoreWorker> worker_;
};

} // namespace ray
Expand Down
88 changes: 43 additions & 45 deletions src/ray/core_worker/task_execution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,65 +7,63 @@ namespace ray {

CoreWorkerTaskExecutionInterface::CoreWorkerTaskExecutionInterface(
WorkerContext &worker_context, std::unique_ptr<RayletClient> &raylet_client,
CoreWorkerObjectInterface &object_interface)
CoreWorkerObjectInterface &object_interface, const TaskExecutor &executor)
: worker_context_(worker_context),
object_interface_(object_interface),
execution_callback_(executor),
worker_server_("Worker", 0 /* let grpc choose port */),
main_work_(main_service_) {
RAY_CHECK(execution_callback_ != nullptr);

auto func = std::bind(&CoreWorkerTaskExecutionInterface::ExecuteTask, this,
std::placeholders::_1);
task_receivers_.emplace(
static_cast<int>(TaskTransportType::RAYLET),
std::unique_ptr<CoreWorkerRayletTaskReceiver>(new CoreWorkerRayletTaskReceiver(
raylet_client, main_service_, worker_server_)));
std::unique_ptr<CoreWorkerRayletTaskReceiver>(
new CoreWorkerRayletTaskReceiver(main_service_, worker_server_, func)));

// Start RPC server after all the task receivers are properly initialized.
worker_server_.Run();
}

Status CoreWorkerTaskExecutionInterface::Run(const TaskExecutor &executor) {
while (true) {
std::vector<TaskSpec> tasks;
auto status =
task_receivers_[static_cast<int>(TaskTransportType::RAYLET)]->GetTasks(&tasks);
if (!status.ok()) {
RAY_LOG(ERROR) << "Getting task failed with error: "
<< ray::Status::IOError(status.message());
return status;
}
Status CoreWorkerTaskExecutionInterface::ExecuteTask(
const raylet::TaskSpecification &spec) {
worker_context_.SetCurrentTask(spec);

for (const auto &task : tasks) {
const auto &spec = task.GetTaskSpecification();
worker_context_.SetCurrentTask(spec);

RayFunction func{spec.GetLanguage(), spec.FunctionDescriptor()};

std::vector<std::shared_ptr<RayObject>> args;
RAY_CHECK_OK(BuildArgsForExecutor(spec, &args));

TaskType task_type;
if (spec.IsActorCreationTask()) {
task_type = TaskType::ACTOR_CREATION_TASK;
} else if (spec.IsActorTask()) {
task_type = TaskType::ACTOR_TASK;
} else {
task_type = TaskType::NORMAL_TASK;
}

TaskInfo task_info{spec.TaskId(), spec.JobId(), task_type};

auto num_returns = spec.NumReturns();
if (spec.IsActorCreationTask() || spec.IsActorTask()) {
RAY_CHECK(num_returns > 0);
// Decrease to account for the dummy object id.
num_returns--;
}

status = executor(func, args, task_info, num_returns);
// TODO(zhijunfu):
// 1. Check and handle failure.
// 2. Save or load checkpoint.
}
RayFunction func{spec.GetLanguage(), spec.FunctionDescriptor()};

std::vector<std::shared_ptr<RayObject>> args;
RAY_CHECK_OK(BuildArgsForExecutor(spec, &args));

TaskType task_type;
if (spec.IsActorCreationTask()) {
task_type = TaskType::ACTOR_CREATION_TASK;
} else if (spec.IsActorTask()) {
task_type = TaskType::ACTOR_TASK;
} else {
task_type = TaskType::NORMAL_TASK;
}

TaskInfo task_info{spec.TaskId(), spec.JobId(), task_type};

auto num_returns = spec.NumReturns();
if (spec.IsActorCreationTask() || spec.IsActorTask()) {
RAY_CHECK(num_returns > 0);
// Decrease to account for the dummy object id.
num_returns--;
}

auto status = execution_callback_(func, args, task_info, num_returns);
// TODO(zhijunfu):
// 1. Check and handle failure.
// 2. Save or load checkpoint.
return status;
}

Status CoreWorkerTaskExecutionInterface::Run() {
zhijunfu marked this conversation as resolved.
Show resolved Hide resolved
// Run main IO service.
main_service_.run();

// should never reach here.
return Status::OK();
zhijunfu marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Down
17 changes: 12 additions & 5 deletions src/ray/core_worker/task_execution.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,6 @@ class TaskSpecification;
/// execution.
class CoreWorkerTaskExecutionInterface {
public:
CoreWorkerTaskExecutionInterface(WorkerContext &worker_context,
std::unique_ptr<RayletClient> &raylet_client,
CoreWorkerObjectInterface &object_interface);

/// The callback provided by app-language workers that executes tasks.
///
/// \param ray_function[in] Information about the function to execute.
Expand All @@ -37,9 +33,14 @@ class CoreWorkerTaskExecutionInterface {
const std::vector<std::shared_ptr<RayObject>> &args,
const TaskInfo &task_info, int num_returns)>;

CoreWorkerTaskExecutionInterface(WorkerContext &worker_context,
std::unique_ptr<RayletClient> &raylet_client,
CoreWorkerObjectInterface &object_interface,
const TaskExecutor &executor);

/// Start receiving and executing tasks in an infinite loop.
/// \return Status.
Status Run(const TaskExecutor &executor);
Status Run();

private:
/// Build arguments for task executor. This would loop through all the arguments
Expand All @@ -53,11 +54,17 @@ class CoreWorkerTaskExecutionInterface {
Status BuildArgsForExecutor(const raylet::TaskSpecification &spec,
std::vector<std::shared_ptr<RayObject>> *args);

/// Execute a task.
Status ExecuteTask(const raylet::TaskSpecification &spec);

/// Reference to the parent CoreWorker's context.
WorkerContext &worker_context_;
/// Reference to the parent CoreWorker's objects interface.
CoreWorkerObjectInterface &object_interface_;

// Task execution callback.
TaskExecutor execution_callback_;

/// All the task receivers supported.
std::unordered_map<int, std::unique_ptr<CoreWorkerTaskReceiver>> task_receivers_;

Expand Down
23 changes: 4 additions & 19 deletions src/ray/core_worker/transport/raylet_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,14 @@ CoreWorkerRayletTaskSubmitter::CoreWorkerRayletTaskSubmitter(
: raylet_client_(raylet_client) {}

Status CoreWorkerRayletTaskSubmitter::SubmitTask(const TaskSpec &task) {
RAY_CHECK(raylet_client_ != nullptr);
return raylet_client_->SubmitTask(task.GetDependencies(), task.GetTaskSpecification());
}

Status CoreWorkerRayletTaskReceiver::GetTasks(std::vector<TaskSpec> *tasks) {
std::unique_ptr<raylet::TaskSpecification> task_spec;
auto status = raylet_client_->GetTask(&task_spec);
if (!status.ok()) {
RAY_LOG(ERROR) << "Get task from raylet failed with error: "
<< ray::Status::IOError(status.message());
return status;
}

std::vector<ObjectID> dependencies;
RAY_CHECK((*tasks).empty());
(*tasks).emplace_back(*task_spec, dependencies);

return Status::OK();
}

CoreWorkerRayletTaskReceiver::CoreWorkerRayletTaskReceiver(
std::unique_ptr<RayletClient> &raylet_client, boost::asio::io_service &io_service,
rpc::GrpcServer &server)
: raylet_client_(raylet_client), task_service_(io_service, *this) {
boost::asio::io_service &io_service, rpc::GrpcServer &server,
const TaskHandler &task_handler)
: task_service_(io_service, *this), task_handler_(task_handler) {
server.RegisterService(task_service_);
}

Expand Down
18 changes: 4 additions & 14 deletions src/ray/core_worker/transport/raylet_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,9 @@ class CoreWorkerRayletTaskSubmitter : public CoreWorkerTaskSubmitter {
class CoreWorkerRayletTaskReceiver : public CoreWorkerTaskReceiver,
public rpc::WorkerTaskHandler {
public:
CoreWorkerRayletTaskReceiver(std::unique_ptr<RayletClient> &raylet_client,
boost::asio::io_service &io_service,
rpc::GrpcServer &server);
CoreWorkerRayletTaskReceiver(boost::asio::io_service &io_service,
rpc::GrpcServer &server, const TaskHandler &task_handler);

// Get tasks for execution from raylet.
virtual Status GetTasks(std::vector<TaskSpec> *tasks) override;

/// TODO(zhijunfu): This is currently unused. Later when we migrate from worker "get
/// task" to raylet "assign task", this method will be used and the `GetTask` above will
/// be removed.
///
/// Handle an `AssignTask` request.
/// The implementation can handle this request asynchronously. When handling is done, the
/// `done_callback` should be called.
Expand All @@ -54,12 +46,10 @@ class CoreWorkerRayletTaskReceiver : public CoreWorkerTaskReceiver,
rpc::RequestDoneCallback done_callback) override;

private:
/// Raylet client.
std::unique_ptr<RayletClient> &raylet_client_;
/// The callback function to process a task.
TaskHandler task_handler_;
/// The rpc service for `WorkerTaskService`.
rpc::WorkerTaskGrpcService task_service_;
/// The callback function to process a task.
TaskHandler task_handler_;
};

} // namespace ray
Expand Down
3 changes: 0 additions & 3 deletions src/ray/core_worker/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ class CoreWorkerTaskSubmitter {
class CoreWorkerTaskReceiver {
public:
using TaskHandler = std::function<Status(const raylet::TaskSpecification &task_spec)>;

// Get tasks for execution.
virtual Status GetTasks(std::vector<TaskSpec> *tasks) = 0;
};

} // namespace ray
Expand Down
6 changes: 5 additions & 1 deletion src/ray/protobuf/worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ package ray.rpc;
message AssignTaskRequest {
// The ID of the task to be pushed.
bytes task_id = 1;
// The task to be pushed. This should include task_id.
// The task to be assigned. This should include task_id.
// TODO(hchen): Currently, `task_spec` is represented as
// flatbuffers-serialized bytes. This is because the flatbuffers-defined Task data
// structure is being used in many places. We should move Task and all related data
// structures to protobuf.
bytes task_spec = 2;
// A list of the resources reserved for this worker.
// TODO(zhijunfu): `resource_ids` is represented as
// flatbuffers-serialized bytes and will be moved to protobuf later.
bytes resource_ids = 3;
}

message AssignTaskReply {
Expand Down
Loading