Skip to content

Commit

Permalink
Ordered execution of tasks per actor handle (#5664)
Browse files Browse the repository at this point in the history
  • Loading branch information
ericl authored Sep 15, 2019
1 parent a8888c5 commit 4979b8c
Show file tree
Hide file tree
Showing 10 changed files with 388 additions and 59 deletions.
10 changes: 10 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,16 @@ cc_binary(
],
)

cc_test(
name = "scheduling_queue_test",
srcs = ["src/ray/core_worker/test/scheduling_queue_test.cc"],
copts = COPTS,
deps = [
":core_worker_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "lineage_cache_test",
srcs = ["src/ray/raylet/lineage_cache_test.cc"],
Expand Down
15 changes: 14 additions & 1 deletion src/ray/core_worker/test/core_worker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
raylet_socket_names_[0], JobID::FromInt(1), gcs_options_, "",
nullptr);
std::unique_ptr<ActorHandle> actor_handle;
std::vector<ObjectID> object_ids;

// Test creating actor.
uint8_t array[] = {1, 2, 3};
Expand All @@ -715,11 +716,14 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
30 * 1000 /* 30s */));
// Test submitting some tasks with by-value args for that actor.
int64_t start_ms = current_time_ms();
const int num_tasks = 10000;
const int num_tasks = 100000;
RAY_LOG(INFO) << "start submitting " << num_tasks << " tasks";
for (int i = 0; i < num_tasks; i++) {
// Create arguments with PassByValue.
std::vector<TaskArg> args;
int64_t array[] = {SHOULD_CHECK_MESSAGE_ORDER, i};
auto buffer = std::make_shared<LocalMemoryBuffer>(reinterpret_cast<uint8_t *>(array),
sizeof(array));
args.emplace_back(TaskArg::PassByValue(std::make_shared<RayObject>(buffer, nullptr)));

TaskOptions options{1, resources};
Expand All @@ -729,9 +733,18 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) {
RAY_CHECK_OK(
driver.Tasks().SubmitActorTask(*actor_handle, func, args, options, &return_ids));
ASSERT_EQ(return_ids.size(), 1);
object_ids.emplace_back(return_ids[0]);
}
RAY_LOG(INFO) << "finish submitting " << num_tasks << " tasks"
<< ", which takes " << current_time_ms() - start_ms << " ms";

for (const auto &object_id : object_ids) {
std::vector<std::shared_ptr<RayObject>> results;
RAY_CHECK_OK(driver.Objects().Get({object_id}, -1, &results));
ASSERT_EQ(results.size(), 1);
}
RAY_LOG(INFO) << "finish executing " << num_tasks << " tasks"
<< ", which takes " << current_time_ms() - start_ms << " ms";
}

TEST_F(ZeroNodeTest, TestWorkerContext) {
Expand Down
11 changes: 11 additions & 0 deletions src/ray/core_worker/test/mock_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ class MockWorker {
auto &data = arg->GetData();
buffer.insert(buffer.end(), data->Data(), data->Data() + data->Size());
}
if (buffer.size() >= 8) {
auto int_arr = reinterpret_cast<int64_t *>(buffer.data());
if (int_arr[0] == SHOULD_CHECK_MESSAGE_ORDER) {
auto seq_no = int_arr[1];
if (seq_no > 0) {
RAY_CHECK(seq_no == prev_seq_no_ + 1) << seq_no << " vs " << prev_seq_no_;
}
prev_seq_no_ = seq_no;
}
}
auto memory_buffer =
std::make_shared<LocalMemoryBuffer>(buffer.data(), buffer.size(), true);

Expand All @@ -56,6 +66,7 @@ class MockWorker {
}

CoreWorker worker_;
int64_t prev_seq_no_ = 0;
};

} // namespace ray
Expand Down
81 changes: 81 additions & 0 deletions src/ray/core_worker/test/scheduling_queue_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#include <thread>
#include "gtest/gtest.h"

#include "ray/core_worker/transport/direct_actor_transport.h"

namespace ray {

TEST(SchedulingQueueTest, TestInOrder) {
boost::asio::io_service io_service;
SchedulingQueue queue(io_service, 0);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
auto fn_rej = [&n_rej]() { n_rej++; };
queue.Add(0, -1, fn_ok, fn_rej);
queue.Add(1, -1, fn_ok, fn_rej);
queue.Add(2, -1, fn_ok, fn_rej);
queue.Add(3, -1, fn_ok, fn_rej);
io_service.run();
ASSERT_EQ(n_ok, 4);
ASSERT_EQ(n_rej, 0);
}

TEST(SchedulingQueueTest, TestOutOfOrder) {
boost::asio::io_service io_service;
SchedulingQueue queue(io_service, 0);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
auto fn_rej = [&n_rej]() { n_rej++; };
queue.Add(2, -1, fn_ok, fn_rej);
queue.Add(0, -1, fn_ok, fn_rej);
queue.Add(3, -1, fn_ok, fn_rej);
queue.Add(1, -1, fn_ok, fn_rej);
io_service.run();
ASSERT_EQ(n_ok, 4);
ASSERT_EQ(n_rej, 0);
}

TEST(SchedulingQueueTest, TestDepWaitTimeout) {
boost::asio::io_service io_service;
SchedulingQueue queue(io_service, 0);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
auto fn_rej = [&n_rej]() { n_rej++; };
queue.Add(2, -1, fn_ok, fn_rej);
queue.Add(0, -1, fn_ok, fn_rej);
queue.Add(3, -1, fn_ok, fn_rej);
ASSERT_EQ(n_ok, 1);
ASSERT_EQ(n_rej, 0);
io_service.run(); // immediately triggers timeout
ASSERT_EQ(n_ok, 1);
ASSERT_EQ(n_rej, 2);
queue.Add(4, -1, fn_ok, fn_rej);
queue.Add(5, -1, fn_ok, fn_rej);
ASSERT_EQ(n_ok, 3);
ASSERT_EQ(n_rej, 2);
}

TEST(SchedulingQueueTest, TestSkipAlreadyProcessedByClient) {
boost::asio::io_service io_service;
SchedulingQueue queue(io_service, 0);
int n_ok = 0;
int n_rej = 0;
auto fn_ok = [&n_ok]() { n_ok++; };
auto fn_rej = [&n_rej]() { n_rej++; };
queue.Add(2, 2, fn_ok, fn_rej);
queue.Add(3, 2, fn_ok, fn_rej);
queue.Add(1, 2, fn_ok, fn_rej);
io_service.run();
ASSERT_EQ(n_ok, 1);
ASSERT_EQ(n_rej, 2);
}

} // namespace ray

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
95 changes: 54 additions & 41 deletions src/ray/core_worker/transport/direct_actor_transport.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#include "ray/core_worker/transport/direct_actor_transport.h"
#include "ray/common/task/task.h"

Expand Down Expand Up @@ -68,7 +67,7 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(

// Submit request.
auto &client = rpc_clients_[actor_id];
PushTask(*client, *request, actor_id, task_id, num_returns);
PushTask(*client, std::move(request), actor_id, task_id, num_returns);
return Status::OK();
} else {
// Actor is dead, treat the task as failure.
Expand Down Expand Up @@ -138,32 +137,31 @@ Status CoreWorkerDirectActorTaskSubmitter::SubscribeActorUpdates(

void CoreWorkerDirectActorTaskSubmitter::ConnectAndSendPendingTasks(
const ActorID &actor_id, std::string ip_address, int port) {
std::unique_ptr<rpc::DirectActorClient> grpc_client(
new rpc::DirectActorClient(ip_address, port, client_call_manager_));
std::shared_ptr<rpc::DirectActorClient> grpc_client =
rpc::DirectActorClient::make(ip_address, port, client_call_manager_);
RAY_CHECK(rpc_clients_.emplace(actor_id, std::move(grpc_client)).second);

// Submit all pending requests.
auto &client = rpc_clients_[actor_id];
auto &requests = pending_requests_[actor_id];
while (!requests.empty()) {
const auto &request = *requests.front();
PushTask(*client, request, actor_id,
TaskID::FromBinary(request.task_spec().task_id()),
request.task_spec().num_returns());
auto request = std::move(requests.front());
auto num_returns = request->task_spec().num_returns();
auto task_id = TaskID::FromBinary(request->task_spec().task_id());
PushTask(*client, std::move(request), actor_id, task_id, num_returns);
requests.pop_front();
}
}

void CoreWorkerDirectActorTaskSubmitter::PushTask(rpc::DirectActorClient &client,
const rpc::PushTaskRequest &request,
const ActorID &actor_id,
const TaskID &task_id,
int num_returns) {
void CoreWorkerDirectActorTaskSubmitter::PushTask(
rpc::DirectActorClient &client, std::unique_ptr<rpc::PushTaskRequest> request,
const ActorID &actor_id, const TaskID &task_id, int num_returns) {
RAY_LOG(DEBUG) << "Pushing task " << task_id << " to actor " << actor_id;
waiting_reply_tasks_[actor_id].insert(std::make_pair(task_id, num_returns));
auto status =
client.PushTask(request, [this, actor_id, task_id, num_returns](
Status status, const rpc::PushTaskReply &reply) {

auto status = client.PushTask(
std::move(request), [this, actor_id, task_id, num_returns](
Status status, const rpc::PushTaskReply &reply) {
{
std::unique_lock<std::mutex> guard(mutex_);
waiting_reply_tasks_[actor_id].erase(task_id);
Expand Down Expand Up @@ -230,6 +228,7 @@ CoreWorkerDirectActorTaskReceiver::CoreWorkerDirectActorTaskReceiver(
boost::asio::io_service &io_service, rpc::GrpcServer &server,
const TaskHandler &task_handler)
: worker_context_(worker_context),
io_service_(io_service),
object_interface_(object_interface),
task_service_(io_service, *this),
task_handler_(task_handler) {
Expand All @@ -253,33 +252,47 @@ void CoreWorkerDirectActorTaskReceiver::HandlePushTask(
return;
}

auto num_returns = task_spec.NumReturns();
RAY_CHECK(task_spec.IsActorCreationTask() || task_spec.IsActorTask());
RAY_CHECK(num_returns > 0);
// Decrease to account for the dummy object id.
num_returns--;

std::vector<std::shared_ptr<RayObject>> results;
auto status = task_handler_(task_spec, &results);
RAY_CHECK(results.size() == num_returns) << results.size() << " " << num_returns;

for (size_t i = 0; i < results.size(); i++) {
auto return_object = (*reply).add_return_objects();
ObjectID id = ObjectID::ForTaskReturn(
task_spec.TaskId(), /*index=*/i + 1,
/*transport_type=*/static_cast<int>(TaskTransportType::DIRECT_ACTOR));
return_object->set_object_id(id.Binary());
const auto &result = results[i];
if (result->HasData()) {
return_object->set_data(result->GetData()->Data(), result->GetData()->Size());
}
if (result->HasMetadata()) {
return_object->set_metadata(result->GetMetadata()->Data(),
result->GetMetadata()->Size());
}
auto it = scheduling_queue_.find(task_spec.ActorHandleId());
if (it == scheduling_queue_.end()) {
auto result = scheduling_queue_.emplace(
task_spec.ActorHandleId(),
std::unique_ptr<SchedulingQueue>(new SchedulingQueue(io_service_)));
it = result.first;
}
it->second->Add(
request.sequence_number(), request.client_processed_up_to(),
[this, reply, send_reply_callback, task_spec]() {
auto num_returns = task_spec.NumReturns();
RAY_CHECK(task_spec.IsActorCreationTask() || task_spec.IsActorTask());
RAY_CHECK(num_returns > 0);
// Decrease to account for the dummy object id.
num_returns--;

std::vector<std::shared_ptr<RayObject>> results;
auto status = task_handler_(task_spec, &results);
RAY_CHECK(results.size() == num_returns) << results.size() << " " << num_returns;

for (size_t i = 0; i < results.size(); i++) {
auto return_object = (*reply).add_return_objects();
ObjectID id = ObjectID::ForTaskReturn(
task_spec.TaskId(), /*index=*/i + 1,
/*transport_type=*/static_cast<int>(TaskTransportType::DIRECT_ACTOR));
return_object->set_object_id(id.Binary());
const auto &result = results[i];
if (result->GetData() != nullptr) {
return_object->set_data(result->GetData()->Data(), result->GetData()->Size());
}
if (result->GetMetadata() != nullptr) {
return_object->set_metadata(result->GetMetadata()->Data(),
result->GetMetadata()->Size());
}
}

send_reply_callback(status, nullptr, nullptr);
send_reply_callback(status, nullptr, nullptr);
},
[this, send_reply_callback]() {
send_reply_callback(Status::Invalid("client cancelled rpc"), nullptr, nullptr);
});
}

} // namespace ray
Loading

0 comments on commit 4979b8c

Please sign in to comment.