From c0ee1f611b016bef4b30ec0576ba80ea8ab1af70 Mon Sep 17 00:00:00 2001 From: HeYuchen <377710264@qq.com> Date: Wed, 13 May 2020 11:05:24 +0800 Subject: [PATCH] feat(bulk-load): meta server send bulk load request (#457) --- .../dsn/dist/replication/replication.codes.h | 1 + .../dsn/dist/replication/replication_types.h | 355 ++++++++ .../replication/common/replication_common.h | 1 + .../replication/common/replication_types.cpp | 838 ++++++++++++++++++ src/dist/replication/lib/replica.h | 18 + .../replication/lib/replica_bulk_load.cpp | 96 ++ src/dist/replication/lib/replica_context.h | 11 + src/dist/replication/lib/replica_stub.cpp | 13 + src/dist/replication/lib/replica_stub.h | 1 + .../meta_server/meta_bulk_load_service.cpp | 68 +- .../meta_server/meta_bulk_load_service.h | 27 + src/dist/replication/replication.thrift | 50 ++ .../unit_test/replica_bulk_load_test.cpp | 103 +++ 13 files changed, 1581 insertions(+), 1 deletion(-) create mode 100644 src/dist/replication/lib/replica_bulk_load.cpp create mode 100644 src/dist/replication/test/replica_test/unit_test/replica_bulk_load_test.cpp diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index baa3f9f054..9932b6477b 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -152,6 +152,7 @@ MAKE_EVENT_CODE(LPC_EXEC_COMMAND_ON_REPLICA, TASK_PRIORITY_LOW) MAKE_EVENT_CODE(LPC_PARTITION_SPLIT, TASK_PRIORITY_LOW) MAKE_EVENT_CODE(LPC_PARTITION_SPLIT_ERROR, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE_RPC(RPC_SPLIT_NOTIFY_CATCH_UP, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE_RPC(RPC_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_REPLICATION_LOW, TASK_PRIORITY_LOW) MAKE_EVENT_CODE(LPC_REPLICATION_COMMON, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_REPLICATION_HIGH, TASK_PRIORITY_HIGH) diff --git a/include/dsn/dist/replication/replication_types.h b/include/dsn/dist/replication/replication_types.h index 
8f02380e14..c06bda09f4 100644 --- a/include/dsn/dist/replication/replication_types.h +++ b/include/dsn/dist/replication/replication_types.h @@ -193,6 +193,19 @@ struct bulk_load_status extern const std::map _bulk_load_status_VALUES_TO_NAMES; +struct ingestion_status +{ + enum type + { + IS_INVALID = 0, + IS_RUNNING = 1, + IS_SUCCEED = 2, + IS_FAILED = 3 + }; +}; + +extern const std::map _ingestion_status_VALUES_TO_NAMES; + class mutation_header; class mutation_update; @@ -383,6 +396,12 @@ class start_bulk_load_request; class start_bulk_load_response; +class partition_bulk_load_state; + +class bulk_load_request; + +class bulk_load_response; + typedef struct _mutation_header__isset { _mutation_header__isset() @@ -6431,6 +6450,342 @@ inline std::ostream &operator<<(std::ostream &out, const start_bulk_load_respons obj.printTo(out); return out; } + +typedef struct _partition_bulk_load_state__isset +{ + _partition_bulk_load_state__isset() + : download_progress(true), + download_status(false), + ingest_status(true), + is_cleanuped(true), + is_paused(true) + { + } + bool download_progress : 1; + bool download_status : 1; + bool ingest_status : 1; + bool is_cleanuped : 1; + bool is_paused : 1; +} _partition_bulk_load_state__isset; + +class partition_bulk_load_state +{ +public: + partition_bulk_load_state(const partition_bulk_load_state &); + partition_bulk_load_state(partition_bulk_load_state &&); + partition_bulk_load_state &operator=(const partition_bulk_load_state &); + partition_bulk_load_state &operator=(partition_bulk_load_state &&); + partition_bulk_load_state() + : download_progress(0), + ingest_status((ingestion_status::type)0), + is_cleanuped(false), + is_paused(false) + { + ingest_status = (ingestion_status::type)0; + } + + virtual ~partition_bulk_load_state() throw(); + int32_t download_progress; + ::dsn::error_code download_status; + ingestion_status::type ingest_status; + bool is_cleanuped; + bool is_paused; + + _partition_bulk_load_state__isset __isset; + + 
void __set_download_progress(const int32_t val); + + void __set_download_status(const ::dsn::error_code &val); + + void __set_ingest_status(const ingestion_status::type val); + + void __set_is_cleanuped(const bool val); + + void __set_is_paused(const bool val); + + bool operator==(const partition_bulk_load_state &rhs) const + { + if (__isset.download_progress != rhs.__isset.download_progress) + return false; + else if (__isset.download_progress && !(download_progress == rhs.download_progress)) + return false; + if (__isset.download_status != rhs.__isset.download_status) + return false; + else if (__isset.download_status && !(download_status == rhs.download_status)) + return false; + if (__isset.ingest_status != rhs.__isset.ingest_status) + return false; + else if (__isset.ingest_status && !(ingest_status == rhs.ingest_status)) + return false; + if (__isset.is_cleanuped != rhs.__isset.is_cleanuped) + return false; + else if (__isset.is_cleanuped && !(is_cleanuped == rhs.is_cleanuped)) + return false; + if (__isset.is_paused != rhs.__isset.is_paused) + return false; + else if (__isset.is_paused && !(is_paused == rhs.is_paused)) + return false; + return true; + } + bool operator!=(const partition_bulk_load_state &rhs) const { return !(*this == rhs); } + + bool operator<(const partition_bulk_load_state &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(partition_bulk_load_state &a, partition_bulk_load_state &b); + +inline std::ostream &operator<<(std::ostream &out, const partition_bulk_load_state &obj) +{ + obj.printTo(out); + return out; +} + +typedef struct _bulk_load_request__isset +{ + _bulk_load_request__isset() + : pid(false), + app_name(false), + primary_addr(false), + remote_provider_name(false), + cluster_name(false), + ballot(false), + meta_bulk_load_status(false), + query_bulk_load_metadata(false) + 
{ + } + bool pid : 1; + bool app_name : 1; + bool primary_addr : 1; + bool remote_provider_name : 1; + bool cluster_name : 1; + bool ballot : 1; + bool meta_bulk_load_status : 1; + bool query_bulk_load_metadata : 1; +} _bulk_load_request__isset; + +class bulk_load_request +{ +public: + bulk_load_request(const bulk_load_request &); + bulk_load_request(bulk_load_request &&); + bulk_load_request &operator=(const bulk_load_request &); + bulk_load_request &operator=(bulk_load_request &&); + bulk_load_request() + : app_name(), + remote_provider_name(), + cluster_name(), + ballot(0), + meta_bulk_load_status((bulk_load_status::type)0), + query_bulk_load_metadata(0) + { + } + + virtual ~bulk_load_request() throw(); + ::dsn::gpid pid; + std::string app_name; + ::dsn::rpc_address primary_addr; + std::string remote_provider_name; + std::string cluster_name; + int64_t ballot; + bulk_load_status::type meta_bulk_load_status; + bool query_bulk_load_metadata; + + _bulk_load_request__isset __isset; + + void __set_pid(const ::dsn::gpid &val); + + void __set_app_name(const std::string &val); + + void __set_primary_addr(const ::dsn::rpc_address &val); + + void __set_remote_provider_name(const std::string &val); + + void __set_cluster_name(const std::string &val); + + void __set_ballot(const int64_t val); + + void __set_meta_bulk_load_status(const bulk_load_status::type val); + + void __set_query_bulk_load_metadata(const bool val); + + bool operator==(const bulk_load_request &rhs) const + { + if (!(pid == rhs.pid)) + return false; + if (!(app_name == rhs.app_name)) + return false; + if (!(primary_addr == rhs.primary_addr)) + return false; + if (!(remote_provider_name == rhs.remote_provider_name)) + return false; + if (!(cluster_name == rhs.cluster_name)) + return false; + if (!(ballot == rhs.ballot)) + return false; + if (!(meta_bulk_load_status == rhs.meta_bulk_load_status)) + return false; + if (!(query_bulk_load_metadata == rhs.query_bulk_load_metadata)) + return false; + return 
true; + } + bool operator!=(const bulk_load_request &rhs) const { return !(*this == rhs); } + + bool operator<(const bulk_load_request &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(bulk_load_request &a, bulk_load_request &b); + +inline std::ostream &operator<<(std::ostream &out, const bulk_load_request &obj) +{ + obj.printTo(out); + return out; +} + +typedef struct _bulk_load_response__isset +{ + _bulk_load_response__isset() + : err(false), + pid(false), + app_name(false), + primary_bulk_load_status(false), + group_bulk_load_state(false), + metadata(false), + total_download_progress(false), + is_group_ingestion_finished(false), + is_group_bulk_load_context_cleaned(false), + is_group_bulk_load_paused(false) + { + } + bool err : 1; + bool pid : 1; + bool app_name : 1; + bool primary_bulk_load_status : 1; + bool group_bulk_load_state : 1; + bool metadata : 1; + bool total_download_progress : 1; + bool is_group_ingestion_finished : 1; + bool is_group_bulk_load_context_cleaned : 1; + bool is_group_bulk_load_paused : 1; +} _bulk_load_response__isset; + +class bulk_load_response +{ +public: + bulk_load_response(const bulk_load_response &); + bulk_load_response(bulk_load_response &&); + bulk_load_response &operator=(const bulk_load_response &); + bulk_load_response &operator=(bulk_load_response &&); + bulk_load_response() + : app_name(), + primary_bulk_load_status((bulk_load_status::type)0), + total_download_progress(0), + is_group_ingestion_finished(0), + is_group_bulk_load_context_cleaned(0), + is_group_bulk_load_paused(0) + { + } + + virtual ~bulk_load_response() throw(); + ::dsn::error_code err; + ::dsn::gpid pid; + std::string app_name; + bulk_load_status::type primary_bulk_load_status; + std::map<::dsn::rpc_address, partition_bulk_load_state> group_bulk_load_state; + bulk_load_metadata metadata; + 
int32_t total_download_progress; + bool is_group_ingestion_finished; + bool is_group_bulk_load_context_cleaned; + bool is_group_bulk_load_paused; + + _bulk_load_response__isset __isset; + + void __set_err(const ::dsn::error_code &val); + + void __set_pid(const ::dsn::gpid &val); + + void __set_app_name(const std::string &val); + + void __set_primary_bulk_load_status(const bulk_load_status::type val); + + void + __set_group_bulk_load_state(const std::map<::dsn::rpc_address, partition_bulk_load_state> &val); + + void __set_metadata(const bulk_load_metadata &val); + + void __set_total_download_progress(const int32_t val); + + void __set_is_group_ingestion_finished(const bool val); + + void __set_is_group_bulk_load_context_cleaned(const bool val); + + void __set_is_group_bulk_load_paused(const bool val); + + bool operator==(const bulk_load_response &rhs) const + { + if (!(err == rhs.err)) + return false; + if (!(pid == rhs.pid)) + return false; + if (!(app_name == rhs.app_name)) + return false; + if (!(primary_bulk_load_status == rhs.primary_bulk_load_status)) + return false; + if (!(group_bulk_load_state == rhs.group_bulk_load_state)) + return false; + if (__isset.metadata != rhs.__isset.metadata) + return false; + else if (__isset.metadata && !(metadata == rhs.metadata)) + return false; + if (__isset.total_download_progress != rhs.__isset.total_download_progress) + return false; + else if (__isset.total_download_progress && + !(total_download_progress == rhs.total_download_progress)) + return false; + if (__isset.is_group_ingestion_finished != rhs.__isset.is_group_ingestion_finished) + return false; + else if (__isset.is_group_ingestion_finished && + !(is_group_ingestion_finished == rhs.is_group_ingestion_finished)) + return false; + if (__isset.is_group_bulk_load_context_cleaned != + rhs.__isset.is_group_bulk_load_context_cleaned) + return false; + else if (__isset.is_group_bulk_load_context_cleaned && + !(is_group_bulk_load_context_cleaned == 
rhs.is_group_bulk_load_context_cleaned)) + return false; + if (__isset.is_group_bulk_load_paused != rhs.__isset.is_group_bulk_load_paused) + return false; + else if (__isset.is_group_bulk_load_paused && + !(is_group_bulk_load_paused == rhs.is_group_bulk_load_paused)) + return false; + return true; + } + bool operator!=(const bulk_load_response &rhs) const { return !(*this == rhs); } + + bool operator<(const bulk_load_response &) const; + + uint32_t read(::apache::thrift::protocol::TProtocol *iprot); + uint32_t write(::apache::thrift::protocol::TProtocol *oprot) const; + + virtual void printTo(std::ostream &out) const; +}; + +void swap(bulk_load_response &a, bulk_load_response &b); + +inline std::ostream &operator<<(std::ostream &out, const bulk_load_response &obj) +{ + obj.printTo(out); + return out; +} } } // namespace diff --git a/src/dist/replication/common/replication_common.h b/src/dist/replication/common/replication_common.h index 67d24474c9..6c844cda4f 100644 --- a/src/dist/replication/common/replication_common.h +++ b/src/dist/replication/common/replication_common.h @@ -36,6 +36,7 @@ typedef std::unordered_map<::dsn::rpc_address, partition_status::type> node_stat typedef std::unordered_map<::dsn::rpc_address, dsn::task_ptr> node_tasks; typedef rpc_holder start_bulk_load_rpc; +typedef rpc_holder bulk_load_rpc; class replication_options { diff --git a/src/dist/replication/common/replication_types.cpp b/src/dist/replication/common/replication_types.cpp index 985882a087..80d384503f 100644 --- a/src/dist/replication/common/replication_types.cpp +++ b/src/dist/replication/common/replication_types.cpp @@ -169,6 +169,15 @@ const std::map _bulk_load_status_VALUES_TO_NAMES( ::apache::thrift::TEnumIterator(9, _kbulk_load_statusValues, _kbulk_load_statusNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL)); +int _kingestion_statusValues[] = {ingestion_status::IS_INVALID, + ingestion_status::IS_RUNNING, + ingestion_status::IS_SUCCEED, + ingestion_status::IS_FAILED}; 
+const char *_kingestion_statusNames[] = {"IS_INVALID", "IS_RUNNING", "IS_SUCCEED", "IS_FAILED"}; +const std::map _ingestion_status_VALUES_TO_NAMES( + ::apache::thrift::TEnumIterator(4, _kingestion_statusValues, _kingestion_statusNames), + ::apache::thrift::TEnumIterator(-1, NULL, NULL)); + mutation_header::~mutation_header() throw() {} void mutation_header::__set_pid(const ::dsn::gpid &val) { this->pid = val; } @@ -15140,5 +15149,834 @@ void start_bulk_load_response::printTo(std::ostream &out) const << "hint_msg=" << to_string(hint_msg); out << ")"; } + +partition_bulk_load_state::~partition_bulk_load_state() throw() {} + +void partition_bulk_load_state::__set_download_progress(const int32_t val) +{ + this->download_progress = val; + __isset.download_progress = true; +} + +void partition_bulk_load_state::__set_download_status(const ::dsn::error_code &val) +{ + this->download_status = val; + __isset.download_status = true; +} + +void partition_bulk_load_state::__set_ingest_status(const ingestion_status::type val) +{ + this->ingest_status = val; + __isset.ingest_status = true; +} + +void partition_bulk_load_state::__set_is_cleanuped(const bool val) +{ + this->is_cleanuped = val; + __isset.is_cleanuped = true; +} + +void partition_bulk_load_state::__set_is_paused(const bool val) +{ + this->is_paused = val; + __isset.is_paused = true; +} + +uint32_t partition_bulk_load_state::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->download_progress); + 
this->__isset.download_progress = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->download_status.read(iprot); + this->__isset.download_status = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_I32) { + int32_t ecast656; + xfer += iprot->readI32(ecast656); + this->ingest_status = (ingestion_status::type)ecast656; + this->__isset.ingest_status = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 4: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->is_cleanuped); + this->__isset.is_cleanuped = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 5: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->is_paused); + this->__isset.is_paused = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t partition_bulk_load_state::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("partition_bulk_load_state"); + + if (this->__isset.download_progress) { + xfer += oprot->writeFieldBegin("download_progress", ::apache::thrift::protocol::T_I32, 1); + xfer += oprot->writeI32(this->download_progress); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.download_status) { + xfer += oprot->writeFieldBegin("download_status", ::apache::thrift::protocol::T_STRUCT, 2); + xfer += this->download_status.write(oprot); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.ingest_status) { + xfer += oprot->writeFieldBegin("ingest_status", ::apache::thrift::protocol::T_I32, 3); + xfer += oprot->writeI32((int32_t)this->ingest_status); + xfer += 
oprot->writeFieldEnd(); + } + if (this->__isset.is_cleanuped) { + xfer += oprot->writeFieldBegin("is_cleanuped", ::apache::thrift::protocol::T_BOOL, 4); + xfer += oprot->writeBool(this->is_cleanuped); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.is_paused) { + xfer += oprot->writeFieldBegin("is_paused", ::apache::thrift::protocol::T_BOOL, 5); + xfer += oprot->writeBool(this->is_paused); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(partition_bulk_load_state &a, partition_bulk_load_state &b) +{ + using ::std::swap; + swap(a.download_progress, b.download_progress); + swap(a.download_status, b.download_status); + swap(a.ingest_status, b.ingest_status); + swap(a.is_cleanuped, b.is_cleanuped); + swap(a.is_paused, b.is_paused); + swap(a.__isset, b.__isset); +} + +partition_bulk_load_state::partition_bulk_load_state(const partition_bulk_load_state &other657) +{ + download_progress = other657.download_progress; + download_status = other657.download_status; + ingest_status = other657.ingest_status; + is_cleanuped = other657.is_cleanuped; + is_paused = other657.is_paused; + __isset = other657.__isset; +} +partition_bulk_load_state::partition_bulk_load_state(partition_bulk_load_state &&other658) +{ + download_progress = std::move(other658.download_progress); + download_status = std::move(other658.download_status); + ingest_status = std::move(other658.ingest_status); + is_cleanuped = std::move(other658.is_cleanuped); + is_paused = std::move(other658.is_paused); + __isset = std::move(other658.__isset); +} +partition_bulk_load_state &partition_bulk_load_state:: +operator=(const partition_bulk_load_state &other659) +{ + download_progress = other659.download_progress; + download_status = other659.download_status; + ingest_status = other659.ingest_status; + is_cleanuped = other659.is_cleanuped; + is_paused = other659.is_paused; + __isset = other659.__isset; + return *this; +} 
+partition_bulk_load_state &partition_bulk_load_state:: +operator=(partition_bulk_load_state &&other660) +{ + download_progress = std::move(other660.download_progress); + download_status = std::move(other660.download_status); + ingest_status = std::move(other660.ingest_status); + is_cleanuped = std::move(other660.is_cleanuped); + is_paused = std::move(other660.is_paused); + __isset = std::move(other660.__isset); + return *this; +} +void partition_bulk_load_state::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "partition_bulk_load_state("; + out << "download_progress="; + (__isset.download_progress ? (out << to_string(download_progress)) : (out << "")); + out << ", " + << "download_status="; + (__isset.download_status ? (out << to_string(download_status)) : (out << "")); + out << ", " + << "ingest_status="; + (__isset.ingest_status ? (out << to_string(ingest_status)) : (out << "")); + out << ", " + << "is_cleanuped="; + (__isset.is_cleanuped ? (out << to_string(is_cleanuped)) : (out << "")); + out << ", " + << "is_paused="; + (__isset.is_paused ? 
(out << to_string(is_paused)) : (out << "")); + out << ")"; +} + +bulk_load_request::~bulk_load_request() throw() {} + +void bulk_load_request::__set_pid(const ::dsn::gpid &val) { this->pid = val; } + +void bulk_load_request::__set_app_name(const std::string &val) { this->app_name = val; } + +void bulk_load_request::__set_primary_addr(const ::dsn::rpc_address &val) +{ + this->primary_addr = val; +} + +void bulk_load_request::__set_remote_provider_name(const std::string &val) +{ + this->remote_provider_name = val; +} + +void bulk_load_request::__set_cluster_name(const std::string &val) { this->cluster_name = val; } + +void bulk_load_request::__set_ballot(const int64_t val) { this->ballot = val; } + +void bulk_load_request::__set_meta_bulk_load_status(const bulk_load_status::type val) +{ + this->meta_bulk_load_status = val; +} + +void bulk_load_request::__set_query_bulk_load_metadata(const bool val) +{ + this->query_bulk_load_metadata = val; +} + +uint32_t bulk_load_request::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->pid.read(iprot); + this->__isset.pid = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->app_name); + this->__isset.app_name = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->primary_addr.read(iprot); + this->__isset.primary_addr = true; + } else { 
+ xfer += iprot->skip(ftype); + } + break; + case 4: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->remote_provider_name); + this->__isset.remote_provider_name = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 5: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->cluster_name); + this->__isset.cluster_name = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 6: + if (ftype == ::apache::thrift::protocol::T_I64) { + xfer += iprot->readI64(this->ballot); + this->__isset.ballot = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 7: + if (ftype == ::apache::thrift::protocol::T_I32) { + int32_t ecast661; + xfer += iprot->readI32(ecast661); + this->meta_bulk_load_status = (bulk_load_status::type)ecast661; + this->__isset.meta_bulk_load_status = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 8: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->query_bulk_load_metadata); + this->__isset.query_bulk_load_metadata = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t bulk_load_request::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("bulk_load_request"); + + xfer += oprot->writeFieldBegin("pid", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->pid.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("app_name", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString(this->app_name); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("primary_addr", ::apache::thrift::protocol::T_STRUCT, 3); + xfer += 
this->primary_addr.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("remote_provider_name", ::apache::thrift::protocol::T_STRING, 4); + xfer += oprot->writeString(this->remote_provider_name); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("cluster_name", ::apache::thrift::protocol::T_STRING, 5); + xfer += oprot->writeString(this->cluster_name); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("ballot", ::apache::thrift::protocol::T_I64, 6); + xfer += oprot->writeI64(this->ballot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("meta_bulk_load_status", ::apache::thrift::protocol::T_I32, 7); + xfer += oprot->writeI32((int32_t)this->meta_bulk_load_status); + xfer += oprot->writeFieldEnd(); + + xfer += + oprot->writeFieldBegin("query_bulk_load_metadata", ::apache::thrift::protocol::T_BOOL, 8); + xfer += oprot->writeBool(this->query_bulk_load_metadata); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(bulk_load_request &a, bulk_load_request &b) +{ + using ::std::swap; + swap(a.pid, b.pid); + swap(a.app_name, b.app_name); + swap(a.primary_addr, b.primary_addr); + swap(a.remote_provider_name, b.remote_provider_name); + swap(a.cluster_name, b.cluster_name); + swap(a.ballot, b.ballot); + swap(a.meta_bulk_load_status, b.meta_bulk_load_status); + swap(a.query_bulk_load_metadata, b.query_bulk_load_metadata); + swap(a.__isset, b.__isset); +} + +bulk_load_request::bulk_load_request(const bulk_load_request &other662) +{ + pid = other662.pid; + app_name = other662.app_name; + primary_addr = other662.primary_addr; + remote_provider_name = other662.remote_provider_name; + cluster_name = other662.cluster_name; + ballot = other662.ballot; + meta_bulk_load_status = other662.meta_bulk_load_status; + query_bulk_load_metadata = other662.query_bulk_load_metadata; + __isset = other662.__isset; +} 
+bulk_load_request::bulk_load_request(bulk_load_request &&other663) +{ + pid = std::move(other663.pid); + app_name = std::move(other663.app_name); + primary_addr = std::move(other663.primary_addr); + remote_provider_name = std::move(other663.remote_provider_name); + cluster_name = std::move(other663.cluster_name); + ballot = std::move(other663.ballot); + meta_bulk_load_status = std::move(other663.meta_bulk_load_status); + query_bulk_load_metadata = std::move(other663.query_bulk_load_metadata); + __isset = std::move(other663.__isset); +} +bulk_load_request &bulk_load_request::operator=(const bulk_load_request &other664) +{ + pid = other664.pid; + app_name = other664.app_name; + primary_addr = other664.primary_addr; + remote_provider_name = other664.remote_provider_name; + cluster_name = other664.cluster_name; + ballot = other664.ballot; + meta_bulk_load_status = other664.meta_bulk_load_status; + query_bulk_load_metadata = other664.query_bulk_load_metadata; + __isset = other664.__isset; + return *this; +} +bulk_load_request &bulk_load_request::operator=(bulk_load_request &&other665) +{ + pid = std::move(other665.pid); + app_name = std::move(other665.app_name); + primary_addr = std::move(other665.primary_addr); + remote_provider_name = std::move(other665.remote_provider_name); + cluster_name = std::move(other665.cluster_name); + ballot = std::move(other665.ballot); + meta_bulk_load_status = std::move(other665.meta_bulk_load_status); + query_bulk_load_metadata = std::move(other665.query_bulk_load_metadata); + __isset = std::move(other665.__isset); + return *this; +} +void bulk_load_request::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "bulk_load_request("; + out << "pid=" << to_string(pid); + out << ", " + << "app_name=" << to_string(app_name); + out << ", " + << "primary_addr=" << to_string(primary_addr); + out << ", " + << "remote_provider_name=" << to_string(remote_provider_name); + out << ", " + << "cluster_name=" << 
to_string(cluster_name); + out << ", " + << "ballot=" << to_string(ballot); + out << ", " + << "meta_bulk_load_status=" << to_string(meta_bulk_load_status); + out << ", " + << "query_bulk_load_metadata=" << to_string(query_bulk_load_metadata); + out << ")"; +} + +bulk_load_response::~bulk_load_response() throw() {} + +void bulk_load_response::__set_err(const ::dsn::error_code &val) { this->err = val; } + +void bulk_load_response::__set_pid(const ::dsn::gpid &val) { this->pid = val; } + +void bulk_load_response::__set_app_name(const std::string &val) { this->app_name = val; } + +void bulk_load_response::__set_primary_bulk_load_status(const bulk_load_status::type val) +{ + this->primary_bulk_load_status = val; +} + +void bulk_load_response::__set_group_bulk_load_state( + const std::map<::dsn::rpc_address, partition_bulk_load_state> &val) +{ + this->group_bulk_load_state = val; +} + +void bulk_load_response::__set_metadata(const bulk_load_metadata &val) +{ + this->metadata = val; + __isset.metadata = true; +} + +void bulk_load_response::__set_total_download_progress(const int32_t val) +{ + this->total_download_progress = val; + __isset.total_download_progress = true; +} + +void bulk_load_response::__set_is_group_ingestion_finished(const bool val) +{ + this->is_group_ingestion_finished = val; + __isset.is_group_ingestion_finished = true; +} + +void bulk_load_response::__set_is_group_bulk_load_context_cleaned(const bool val) +{ + this->is_group_bulk_load_context_cleaned = val; + __isset.is_group_bulk_load_context_cleaned = true; +} + +void bulk_load_response::__set_is_group_bulk_load_paused(const bool val) +{ + this->is_group_bulk_load_paused = val; + __isset.is_group_bulk_load_paused = true; +} + +uint32_t bulk_load_response::read(::apache::thrift::protocol::TProtocol *iprot) +{ + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += 
iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + while (true) { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->err.read(iprot); + this->__isset.err = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 2: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->pid.read(iprot); + this->__isset.pid = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 3: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->app_name); + this->__isset.app_name = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 4: + if (ftype == ::apache::thrift::protocol::T_I32) { + int32_t ecast666; + xfer += iprot->readI32(ecast666); + this->primary_bulk_load_status = (bulk_load_status::type)ecast666; + this->__isset.primary_bulk_load_status = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 5: + if (ftype == ::apache::thrift::protocol::T_MAP) { + { + this->group_bulk_load_state.clear(); + uint32_t _size667; + ::apache::thrift::protocol::TType _ktype668; + ::apache::thrift::protocol::TType _vtype669; + xfer += iprot->readMapBegin(_ktype668, _vtype669, _size667); + uint32_t _i671; + for (_i671 = 0; _i671 < _size667; ++_i671) { + ::dsn::rpc_address _key672; + xfer += _key672.read(iprot); + partition_bulk_load_state &_val673 = this->group_bulk_load_state[_key672]; + xfer += _val673.read(iprot); + } + xfer += iprot->readMapEnd(); + } + this->__isset.group_bulk_load_state = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 6: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->metadata.read(iprot); + this->__isset.metadata = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 7: + if (ftype == ::apache::thrift::protocol::T_I32) { + 
xfer += iprot->readI32(this->total_download_progress); + this->__isset.total_download_progress = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 8: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->is_group_ingestion_finished); + this->__isset.is_group_ingestion_finished = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 9: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->is_group_bulk_load_context_cleaned); + this->__isset.is_group_bulk_load_context_cleaned = true; + } else { + xfer += iprot->skip(ftype); + } + break; + case 10: + if (ftype == ::apache::thrift::protocol::T_BOOL) { + xfer += iprot->readBool(this->is_group_bulk_load_paused); + this->__isset.is_group_bulk_load_paused = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t bulk_load_response::write(::apache::thrift::protocol::TProtocol *oprot) const +{ + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("bulk_load_response"); + + xfer += oprot->writeFieldBegin("err", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->err.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("pid", ::apache::thrift::protocol::T_STRUCT, 2); + xfer += this->pid.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("app_name", ::apache::thrift::protocol::T_STRING, 3); + xfer += oprot->writeString(this->app_name); + xfer += oprot->writeFieldEnd(); + + xfer += + oprot->writeFieldBegin("primary_bulk_load_status", ::apache::thrift::protocol::T_I32, 4); + xfer += oprot->writeI32((int32_t)this->primary_bulk_load_status); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldBegin("group_bulk_load_state", 
::apache::thrift::protocol::T_MAP, 5); + { + xfer += oprot->writeMapBegin(::apache::thrift::protocol::T_STRUCT, + ::apache::thrift::protocol::T_STRUCT, + static_cast<uint32_t>(this->group_bulk_load_state.size())); + std::map<::dsn::rpc_address, partition_bulk_load_state>::const_iterator _iter674; + for (_iter674 = this->group_bulk_load_state.begin(); + _iter674 != this->group_bulk_load_state.end(); + ++_iter674) { + xfer += _iter674->first.write(oprot); + xfer += _iter674->second.write(oprot); + } + xfer += oprot->writeMapEnd(); + } + xfer += oprot->writeFieldEnd(); + + if (this->__isset.metadata) { + xfer += oprot->writeFieldBegin("metadata", ::apache::thrift::protocol::T_STRUCT, 6); + xfer += this->metadata.write(oprot); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.total_download_progress) { + xfer += + oprot->writeFieldBegin("total_download_progress", ::apache::thrift::protocol::T_I32, 7); + xfer += oprot->writeI32(this->total_download_progress); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.is_group_ingestion_finished) { + xfer += oprot->writeFieldBegin( + "is_group_ingestion_finished", ::apache::thrift::protocol::T_BOOL, 8); + xfer += oprot->writeBool(this->is_group_ingestion_finished); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.is_group_bulk_load_context_cleaned) { + xfer += oprot->writeFieldBegin( + "is_group_bulk_load_context_cleaned", ::apache::thrift::protocol::T_BOOL, 9); + xfer += oprot->writeBool(this->is_group_bulk_load_context_cleaned); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.is_group_bulk_load_paused) { + xfer += oprot->writeFieldBegin( + "is_group_bulk_load_paused", ::apache::thrift::protocol::T_BOOL, 10); + xfer += oprot->writeBool(this->is_group_bulk_load_paused); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + +void swap(bulk_load_response &a, bulk_load_response &b) +{ + using ::std::swap; + swap(a.err, b.err); + 
swap(a.pid, b.pid); + swap(a.app_name, b.app_name); + swap(a.primary_bulk_load_status, b.primary_bulk_load_status); + swap(a.group_bulk_load_state, b.group_bulk_load_state); + swap(a.metadata, b.metadata); + swap(a.total_download_progress, b.total_download_progress); + swap(a.is_group_ingestion_finished, b.is_group_ingestion_finished); + swap(a.is_group_bulk_load_context_cleaned, b.is_group_bulk_load_context_cleaned); + swap(a.is_group_bulk_load_paused, b.is_group_bulk_load_paused); + swap(a.__isset, b.__isset); +} + +bulk_load_response::bulk_load_response(const bulk_load_response &other675) +{ + err = other675.err; + pid = other675.pid; + app_name = other675.app_name; + primary_bulk_load_status = other675.primary_bulk_load_status; + group_bulk_load_state = other675.group_bulk_load_state; + metadata = other675.metadata; + total_download_progress = other675.total_download_progress; + is_group_ingestion_finished = other675.is_group_ingestion_finished; + is_group_bulk_load_context_cleaned = other675.is_group_bulk_load_context_cleaned; + is_group_bulk_load_paused = other675.is_group_bulk_load_paused; + __isset = other675.__isset; +} +bulk_load_response::bulk_load_response(bulk_load_response &&other676) +{ + err = std::move(other676.err); + pid = std::move(other676.pid); + app_name = std::move(other676.app_name); + primary_bulk_load_status = std::move(other676.primary_bulk_load_status); + group_bulk_load_state = std::move(other676.group_bulk_load_state); + metadata = std::move(other676.metadata); + total_download_progress = std::move(other676.total_download_progress); + is_group_ingestion_finished = std::move(other676.is_group_ingestion_finished); + is_group_bulk_load_context_cleaned = std::move(other676.is_group_bulk_load_context_cleaned); + is_group_bulk_load_paused = std::move(other676.is_group_bulk_load_paused); + __isset = std::move(other676.__isset); +} +bulk_load_response &bulk_load_response::operator=(const bulk_load_response &other677) +{ + err = other677.err; 
+ pid = other677.pid; + app_name = other677.app_name; + primary_bulk_load_status = other677.primary_bulk_load_status; + group_bulk_load_state = other677.group_bulk_load_state; + metadata = other677.metadata; + total_download_progress = other677.total_download_progress; + is_group_ingestion_finished = other677.is_group_ingestion_finished; + is_group_bulk_load_context_cleaned = other677.is_group_bulk_load_context_cleaned; + is_group_bulk_load_paused = other677.is_group_bulk_load_paused; + __isset = other677.__isset; + return *this; +} +bulk_load_response &bulk_load_response::operator=(bulk_load_response &&other678) +{ + err = std::move(other678.err); + pid = std::move(other678.pid); + app_name = std::move(other678.app_name); + primary_bulk_load_status = std::move(other678.primary_bulk_load_status); + group_bulk_load_state = std::move(other678.group_bulk_load_state); + metadata = std::move(other678.metadata); + total_download_progress = std::move(other678.total_download_progress); + is_group_ingestion_finished = std::move(other678.is_group_ingestion_finished); + is_group_bulk_load_context_cleaned = std::move(other678.is_group_bulk_load_context_cleaned); + is_group_bulk_load_paused = std::move(other678.is_group_bulk_load_paused); + __isset = std::move(other678.__isset); + return *this; +} +void bulk_load_response::printTo(std::ostream &out) const +{ + using ::apache::thrift::to_string; + out << "bulk_load_response("; + out << "err=" << to_string(err); + out << ", " + << "pid=" << to_string(pid); + out << ", " + << "app_name=" << to_string(app_name); + out << ", " + << "primary_bulk_load_status=" << to_string(primary_bulk_load_status); + out << ", " + << "group_bulk_load_state=" << to_string(group_bulk_load_state); + out << ", " + << "metadata="; + (__isset.metadata ? (out << to_string(metadata)) : (out << "")); + out << ", " + << "total_download_progress="; + (__isset.total_download_progress ? 
(out << to_string(total_download_progress)) + : (out << "")); + out << ", " + << "is_group_ingestion_finished="; + (__isset.is_group_ingestion_finished ? (out << to_string(is_group_ingestion_finished)) + : (out << "")); + out << ", " + << "is_group_bulk_load_context_cleaned="; + (__isset.is_group_bulk_load_context_cleaned + ? (out << to_string(is_group_bulk_load_context_cleaned)) + : (out << "")); + out << ", " + << "is_group_bulk_load_paused="; + (__isset.is_group_bulk_load_paused ? (out << to_string(is_group_bulk_load_paused)) + : (out << "")); + out << ")"; +} } } // namespace diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 25c52d4976..e96b3cb5bc 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -424,6 +424,22 @@ class replica : public serverlet, public ref_counter, public replica_ba void init_table_level_latency_counters(); + ///////////////////////////////////////////////////////////////// + // replica bulk load + void on_bulk_load(const bulk_load_request &request, /*out*/ bulk_load_response &response); + void broadcast_group_bulk_load(const bulk_load_request &meta_req); + + error_code do_bulk_load(const std::string &app_name, + bulk_load_status::type meta_status, + const std::string &cluster_name, + const std::string &provider_name); + + void report_bulk_load_states_to_meta(bulk_load_status::type remote_status, + bool report_metadata, + /*out*/ bulk_load_response &response); + + bulk_load_status::type get_bulk_load_status() { return _bulk_load_context._status; } + private: friend class ::dsn::replication::replication_checker; friend class ::dsn::replication::test::test_checker; @@ -437,6 +453,7 @@ class replica : public serverlet, public ref_counter, public replica_ba friend class replica_split_test; friend class replica_test; friend class replica_backup_manager; + friend class replica_bulk_load_test; // replica configuration, updated by update_local_configuration ONLY 
replica_configuration _config; @@ -484,6 +501,7 @@ class replica : public serverlet, public ref_counter, public replica_ba // policy_name --> cold_backup_context std::map _cold_backup_contexts; partition_split_context _split_states; + bulk_load_context _bulk_load_context; // timer task that running in replication-thread std::atomic _cold_backup_running_count; diff --git a/src/dist/replication/lib/replica_bulk_load.cpp b/src/dist/replication/lib/replica_bulk_load.cpp new file mode 100644 index 0000000000..eae1ec3b6c --- /dev/null +++ b/src/dist/replication/lib/replica_bulk_load.cpp @@ -0,0 +1,96 @@ +// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "replica.h" +#include "replica_stub.h" + +#include + +#include +#include +#include +#include +#include + +namespace dsn { +namespace replication { + +// ThreadPool: THREAD_POOL_REPLICATION +void replica::on_bulk_load(const bulk_load_request &request, /*out*/ bulk_load_response &response) +{ + _checker.only_one_thread_access(); + + response.pid = request.pid; + response.app_name = request.app_name; + response.err = ERR_OK; + + if (status() != partition_status::PS_PRIMARY) { + dwarn_replica("receive bulk load request with wrong status {}", enum_to_string(status())); + response.err = ERR_INVALID_STATE; + return; + } + + if (request.ballot != get_ballot()) { + dwarn_replica( + "receive bulk load request with wrong version, remote ballot={}, local ballot={}", + request.ballot, + get_ballot()); + response.err = ERR_INVALID_STATE; + return; + } + + ddebug_replica( + "receive bulk load request, remote provider = {}, cluster_name = {}, app_name = {}, " + "meta_bulk_load_status = {}, local bulk_load_status = {}", + request.remote_provider_name, + request.cluster_name, + request.app_name, + enum_to_string(request.meta_bulk_load_status), + 
enum_to_string(get_bulk_load_status())); + + error_code ec = do_bulk_load(request.app_name, + request.meta_bulk_load_status, + request.cluster_name, + request.remote_provider_name); + if (ec != ERR_OK) { + response.err = ec; + response.primary_bulk_load_status = get_bulk_load_status(); + return; + } + + report_bulk_load_states_to_meta( + request.meta_bulk_load_status, request.query_bulk_load_metadata, response); + if (response.err != ERR_OK) { + return; + } + + broadcast_group_bulk_load(request); +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica::broadcast_group_bulk_load(const bulk_load_request &meta_req) +{ + // TODO(heyuchen): TBD +} + +// ThreadPool: THREAD_POOL_REPLICATION +error_code replica::do_bulk_load(const std::string &app_name, + bulk_load_status::type meta_status, + const std::string &cluster_name, + const std::string &provider_name) +{ + // TODO(heyuchen): TBD + return ERR_OK; +} + +// ThreadPool: THREAD_POOL_REPLICATION +void replica::report_bulk_load_states_to_meta(bulk_load_status::type remote_status, + bool report_metadata, + /*out*/ bulk_load_response &response) +{ + // TODO(heyuchen): TBD +} + +} // namespace replication +} // namespace dsn diff --git a/src/dist/replication/lib/replica_context.h b/src/dist/replication/lib/replica_context.h index f1a6f8c2b0..75c8c242e5 100644 --- a/src/dist/replication/lib/replica_context.h +++ b/src/dist/replication/lib/replica_context.h @@ -548,6 +548,17 @@ class partition_split_context dsn::task_ptr async_learn_task; }; +class bulk_load_context +{ +public: + // TODO(heyuchen): add public functions +private: + friend class replica; + friend class replica_bulk_load_test; + + bulk_load_status::type _status{bulk_load_status::BLS_INVALID}; +}; + //---------------inline impl---------------------------------------------------------------- inline partition_status::type primary_context::get_node_status(::dsn::rpc_address addr) const diff --git a/src/dist/replication/lib/replica_stub.cpp 
b/src/dist/replication/lib/replica_stub.cpp index a5851d041a..871e71e5ee 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -2049,6 +2049,7 @@ void replica_stub::open_service() register_rpc_handler(RPC_SPLIT_NOTIFY_CATCH_UP, "child_notify_catch_up", &replica_stub::on_notify_primary_split_catch_up); + register_rpc_handler(RPC_BULK_LOAD, "bulk_load", &replica_stub::on_bulk_load); _kill_partition_command = ::dsn::command_manager::instance().register_app_command( {"kill_partition"}, @@ -2633,5 +2634,17 @@ void replica_stub::update_disk_holding_replicas() } } +void replica_stub::on_bulk_load(const bulk_load_request &request, bulk_load_response &response) +{ + ddebug_f("[{}@{}]: receive bulk load request", request.pid, _primary_address_str); + replica_ptr rep = get_replica(request.pid); + if (rep != nullptr) { + rep->on_bulk_load(request, response); + } else { + derror_f("replica({}) is not existed", request.pid); + response.err = ERR_OBJECT_NOT_FOUND; + } +} + } // namespace replication } // namespace dsn diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index 458b18729d..495a0d8261 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -101,6 +101,7 @@ class replica_stub : public serverlet, public ref_counter /*out*/ query_app_info_response &resp); void on_cold_backup(const backup_request &request, /*out*/ backup_response &response); void on_clear_cold_backup(const backup_clear_request &request); + void on_bulk_load(const bulk_load_request &request, /*out*/ bulk_load_response &response); // // messages from peers (primary or secondary) diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.cpp b/src/dist/replication/meta_server/meta_bulk_load_service.cpp index 333f06eaa1..6ca2167197 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.cpp +++ 
b/src/dist/replication/meta_server/meta_bulk_load_service.cpp @@ -193,7 +193,7 @@ void bulk_load_service::create_app_bulk_load_dir(const std::string &app_name, int32_t partition_count, start_bulk_load_rpc rpc) { - const auto req = rpc.request(); + const auto &req = rpc.request(); app_bulk_load_info ainfo; ainfo.app_id = app_id; @@ -252,6 +252,72 @@ void bulk_load_service::create_partition_bulk_load_dir(const std::string &app_na // ThreadPool: THREAD_POOL_META_STATE void bulk_load_service::partition_bulk_load(const std::string &app_name, const gpid &pid) +{ + FAIL_POINT_INJECT_F("meta_bulk_load_partition_bulk_load", [](dsn::string_view) {}); + + rpc_address primary_addr; + ballot b; + { + zauto_read_lock l(app_lock()); + std::shared_ptr<app_state> app = _state->get_app(pid.get_app_id()); + if (app == nullptr || app->status != app_status::AS_AVAILABLE) { + dwarn_f("app(name={}, id={}) is not existed, set bulk load failed", + app_name, + pid.get_app_id()); + handle_app_unavailable(pid.get_app_id(), app_name); + return; + } + primary_addr = app->partitions[pid.get_partition_index()].primary; + b = app->partitions[pid.get_partition_index()].ballot; + } + + if (primary_addr.is_invalid()) { + dwarn_f("app({}) partition({}) primary is invalid, try it later", app_name, pid); + tasking::enqueue(LPC_META_STATE_NORMAL, + _meta_svc->tracker(), + std::bind(&bulk_load_service::partition_bulk_load, this, app_name, pid), + 0, + std::chrono::seconds(1)); + return; + } + + zauto_read_lock l(_lock); + const app_bulk_load_info &ainfo = _app_bulk_load_info[pid.get_app_id()]; + auto req = make_unique<bulk_load_request>(); + req->pid = pid; + req->app_name = app_name; + req->primary_addr = primary_addr; + req->remote_provider_name = ainfo.file_provider_type; + req->cluster_name = ainfo.cluster_name; + req->meta_bulk_load_status = get_partition_bulk_load_status_unlock(pid); + req->ballot = b; + req->query_bulk_load_metadata = is_partition_metadata_not_updated_unlock(pid); + + ddebug_f("send bulk load request to 
node({}), app({}), partition({}), partition " + "status = {}, remote provider = {}, cluster_name = {}", + primary_addr.to_string(), + app_name, + pid, + dsn::enum_to_string(req->meta_bulk_load_status), + req->remote_provider_name, + req->cluster_name); + + bulk_load_rpc rpc(std::move(req), RPC_BULK_LOAD, 0_ms, 0, pid.thread_hash()); + rpc.call(primary_addr, _meta_svc->tracker(), [this, rpc](error_code err) mutable { + on_partition_bulk_load_reply(err, rpc.request(), rpc.response()); + }); +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::on_partition_bulk_load_reply(error_code err, + const bulk_load_request &request, + const bulk_load_response &response) +{ + // TODO(heyuchen): TBD +} + +// ThreadPool: THREAD_POOL_META_STATE +void bulk_load_service::handle_app_unavailable(int32_t app_id, const std::string &app_name) { // TODO(heyuchen): TBD } diff --git a/src/dist/replication/meta_server/meta_bulk_load_service.h b/src/dist/replication/meta_server/meta_bulk_load_service.h index 8578e675f9..5a65b1e855 100644 --- a/src/dist/replication/meta_server/meta_bulk_load_service.h +++ b/src/dist/replication/meta_server/meta_bulk_load_service.h @@ -107,6 +107,13 @@ class bulk_load_service void partition_bulk_load(const std::string &app_name, const gpid &pid); + void on_partition_bulk_load_reply(error_code err, + const bulk_load_request &request, + const bulk_load_response &response); + + // app not existed or not available during bulk load + void handle_app_unavailable(int32_t app_id, const std::string &app_name); + /// /// update bulk load states to remote storage functions /// @@ -175,6 +182,26 @@ class bulk_load_service return oss.str(); } + inline bool is_partition_metadata_not_updated_unlock(gpid pid) const + { + const auto &iter = _partition_bulk_load_info.find(pid); + if (iter == _partition_bulk_load_info.end()) { + return false; + } + const auto &metadata = iter->second.metadata; + return (metadata.files.size() == 0 && metadata.file_total_size == 0); 
+ } + + inline bulk_load_status::type get_partition_bulk_load_status_unlock(gpid pid) const + { + const auto &iter = _partition_bulk_load_info.find(pid); + if (iter != _partition_bulk_load_info.end()) { + return iter->second.status; + } else { + return bulk_load_status::BLS_INVALID; + } + } + private: friend class bulk_load_service_test; diff --git a/src/dist/replication/replication.thrift b/src/dist/replication/replication.thrift index 8ead061d22..b1e5aa0af5 100644 --- a/src/dist/replication/replication.thrift +++ b/src/dist/replication/replication.thrift @@ -898,6 +898,14 @@ enum bulk_load_status BLS_CANCELED } +enum ingestion_status +{ + IS_INVALID, + IS_RUNNING, + IS_SUCCEED, + IS_FAILED +} + struct bulk_load_metadata { 1:list<file_meta> files; @@ -928,6 +936,48 @@ struct start_bulk_load_response 2:string hint_msg; } +struct partition_bulk_load_state +{ + 1:optional i32 download_progress = 0; + 2:optional dsn.error_code download_status; + 3:optional ingestion_status ingest_status = ingestion_status.IS_INVALID; + 4:optional bool is_cleanuped = false; + 5:optional bool is_paused = false; +} + +struct bulk_load_request +{ + 1:dsn.gpid pid; + 2:string app_name; + 3:dsn.rpc_address primary_addr; + 4:string remote_provider_name; + 5:string cluster_name; + 6:i64 ballot; + 7:bulk_load_status meta_bulk_load_status; + 8:bool query_bulk_load_metadata; +} + +struct bulk_load_response +{ + // Possible error: + // - ERR_OBJECT_NOT_FOUND: replica not found + // - ERR_INVALID_STATE: replica has invalid state + // - ERR_BUSY: node has enough replica executing bulk load downloading + // - ERR_FILE_OPERATION_FAILED: local file system error during bulk load downloading + // - ERR_FS_INTERNAL: remote file provider error during bulk load downloading + // - ERR_CORRUPTION: metadata corruption during bulk load downloading + 1:dsn.error_code err; + 2:dsn.gpid pid; + 3:string app_name; + 4:bulk_load_status primary_bulk_load_status; + 5:map<dsn.rpc_address, partition_bulk_load_state> group_bulk_load_state; + 6:optional bulk_load_metadata 
metadata; + 7:optional i32 total_download_progress; + 8:optional bool is_group_ingestion_finished; + 9:optional bool is_group_bulk_load_context_cleaned; + 10:optional bool is_group_bulk_load_paused; +} + /* service replica_s { diff --git a/src/dist/replication/test/replica_test/unit_test/replica_bulk_load_test.cpp b/src/dist/replication/test/replica_test/unit_test/replica_bulk_load_test.cpp new file mode 100644 index 0000000000..136951ab3c --- /dev/null +++ b/src/dist/replication/test/replica_test/unit_test/replica_bulk_load_test.cpp @@ -0,0 +1,103 @@ +// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. +// This source code is licensed under the Apache License Version 2.0, which +// can be found in the LICENSE file in the root directory of this source tree. + +#include "replica_test_base.h" + +#include + +#include +#include + +namespace dsn { +namespace replication { + +class replica_bulk_load_test : public replica_test_base +{ +public: + replica_bulk_load_test() { _replica = create_mock_replica(stub.get()); } + + /// bulk load functions + + error_code test_on_bulk_load() + { + bulk_load_response resp; + _replica->on_bulk_load(_req, resp); + return resp.err; + } + + /// mock structure functions + + void + create_bulk_load_request(bulk_load_status::type status, ballot b, int32_t downloading_count = 0) + { + _req.app_name = APP_NAME; + _req.ballot = b; + _req.cluster_name = CLUSTER; + _req.meta_bulk_load_status = status; + _req.pid = PID; + _req.remote_provider_name = PROVIDER; + // TODO(heyuchen): set downloading_count in further pull request + } + + void create_bulk_load_request(bulk_load_status::type status, int32_t downloading_count = 0) + { + if (status != bulk_load_status::BLS_DOWNLOADING) { + downloading_count = 0; + } + create_bulk_load_request(status, BALLOT, downloading_count); + } + + void mock_replica_config(partition_status::type status) + { + replica_configuration rconfig; + rconfig.ballot = BALLOT; + rconfig.pid = PID; + rconfig.primary = 
PRIMARY; + rconfig.status = status; + _replica->_config = rconfig; + } + + void mock_primary_states() + { + mock_replica_config(partition_status::PS_PRIMARY); + partition_configuration config; + config.max_replica_count = 3; + config.pid = PID; + config.ballot = BALLOT; + config.primary = PRIMARY; + config.secondaries.emplace_back(SECONDARY); + config.secondaries.emplace_back(SECONDARY2); + _replica->_primary_states.membership = config; + } + +public: + std::unique_ptr _replica; + bulk_load_request _req; + + std::string APP_NAME = "replica"; + std::string CLUSTER = "cluster"; + std::string PROVIDER = "local_service"; + gpid PID = gpid(1, 0); + ballot BALLOT = 3; + rpc_address PRIMARY = rpc_address("127.0.0.2", 34801); + rpc_address SECONDARY = rpc_address("127.0.0.3", 34801); + rpc_address SECONDARY2 = rpc_address("127.0.0.4", 34801); +}; + +// on_bulk_load unit tests +TEST_F(replica_bulk_load_test, on_bulk_load_not_primary) +{ + create_bulk_load_request(bulk_load_status::BLS_DOWNLOADING); + ASSERT_EQ(test_on_bulk_load(), ERR_INVALID_STATE); +} + +TEST_F(replica_bulk_load_test, on_bulk_load_ballot_change) +{ + create_bulk_load_request(bulk_load_status::BLS_DOWNLOADING, BALLOT + 1); + mock_primary_states(); + ASSERT_EQ(test_on_bulk_load(), ERR_INVALID_STATE); +} + +} // namespace replication +} // namespace dsn