From 647fed761e5801a0bbe39c24553436fd06633b16 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Thu, 14 May 2020 17:34:52 +0800 Subject: [PATCH 1/3] add replica start_download --- .../dsn/dist/replication/replication.codes.h | 1 + .../replication/common/replication_common.cpp | 10 ++ .../replication/common/replication_common.h | 3 + src/dist/replication/lib/replica.h | 61 +++++++- .../replication/lib/replica_bulk_load.cpp | 148 ++++++++++++++++++ src/dist/replication/lib/replica_context.cpp | 9 ++ src/dist/replication/lib/replica_context.h | 7 + src/dist/replication/lib/replica_stub.cpp | 35 ++++- src/dist/replication/lib/replica_stub.h | 5 + .../replica_test/unit_test/config-test.ini | 6 +- .../test/replica_test/unit_test/mock_utils.h | 6 + .../unit_test/replica_bulk_load_test.cpp | 57 ++++++- 12 files changed, 343 insertions(+), 5 deletions(-) diff --git a/include/dsn/dist/replication/replication.codes.h b/include/dsn/dist/replication/replication.codes.h index 9932b6477b..a3f86ed355 100644 --- a/include/dsn/dist/replication/replication.codes.h +++ b/include/dsn/dist/replication/replication.codes.h @@ -175,6 +175,7 @@ MAKE_EVENT_CODE(LPC_CATCHUP_WITH_PRIVATE_LOGS, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_DISK_STAT, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_BACKGROUND_COLD_BACKUP, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_PARTITION_SPLIT_ASYNC_LEARN, TASK_PRIORITY_COMMON) +MAKE_EVENT_CODE(LPC_BACKGROUND_BULK_LOAD, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_REPLICATION_LONG_LOW, TASK_PRIORITY_LOW) MAKE_EVENT_CODE(LPC_REPLICATION_LONG_COMMON, TASK_PRIORITY_COMMON) MAKE_EVENT_CODE(LPC_REPLICATION_LONG_HIGH, TASK_PRIORITY_HIGH) diff --git a/src/dist/replication/common/replication_common.cpp b/src/dist/replication/common/replication_common.cpp index 6b7d0a5840..262108c0cf 100644 --- a/src/dist/replication/common/replication_common.cpp +++ b/src/dist/replication/common/replication_common.cpp @@ -110,6 +110,8 @@ replication_options::replication_options() max_concurrent_uploading_file_count = 10; cold_backup_checkpoint_reserve_minutes = 10; + + max_concurrent_bulk_load_downloading_count = 5; } replication_options::~replication_options() {} @@ -518,6 +520,11 @@ void replication_options::initialize() "bulk_load_provider_root", "bulk_load_provider_root", "bulk load root on remote file provider"); + max_concurrent_bulk_load_downloading_count = + (int32_t)dsn_config_get_value_uint64("replication", + "max_concurrent_bulk_load_downloading_count", + max_concurrent_bulk_load_downloading_count, + "concurrent bulk load downloading replica count"); replica_helper::load_meta_servers(meta_servers); @@ -634,7 +641,10 @@ const std::string replica_envs::ROCKSDB_CHECKPOINT_RESERVE_TIME_SECONDS( const std::string replica_envs::ROCKSDB_ITERATION_THRESHOLD_TIME_MS( "replica.rocksdb_iteration_threshold_time_ms"); const std::string replica_envs::BUSINESS_INFO("business.info"); + const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info"); +const std::string bulk_load_constant::BULK_LOAD_METADATA("bulk_load_metadata"); +const std::string bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR(".bulk_load"); namespace cold_backup { std::string get_policy_path(const std::string &root, const std::string &policy_name) diff --git a/src/dist/replication/common/replication_common.h b/src/dist/replication/common/replication_common.h index 6c844cda4f..3c268fedc9 100644 --- a/src/dist/replication/common/replication_common.h +++ b/src/dist/replication/common/replication_common.h @@ -118,6 +118,7 @@ class replication_options int32_t cold_backup_checkpoint_reserve_minutes; std::string bulk_load_provider_root; + int32_t max_concurrent_bulk_load_downloading_count; public: replication_options(); @@ -160,6 +161,8 @@ class bulk_load_constant { public: static const std::string BULK_LOAD_INFO; + static const std::string BULK_LOAD_METADATA; + static const std::string BULK_LOAD_LOCAL_ROOT_DIR; // TODO(heyuchen): add more constant in further pr }; diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index e96b3cb5bc..cb2e5caa29 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -434,11 +434,70 @@ class replica : public serverlet, public ref_counter, public replica_ba const std::string &cluster_name, const std::string &provider_name); + // replica start or restart download sst files from remote provider + // \return ERR_BUSY if node has already had enought replica executing downloading + // \return download errors by function `download_sst_files` + error_code bulk_load_start_download(const std::string &app_name, + const std::string &cluster_name, + const std::string &provider_name); + + // \return ERR_FILE_OPERATION_FAILED: create local bulk load dir failed + // \return download metadata file error, see function `do_download` + // \return parse metadata file error, see function `parse_bulk_load_metadata` + error_code download_sst_files(const std::string &app_name, + const std::string &cluster_name, + const std::string &provider_name); + + // download file from remote file system + // download_err = ERR_FILE_OPERATION_FAILED: local file system errors + // download_err = ERR_FS_INTERNAL: remote file system error + // download_err = ERR_CORRUPTION: file not exist or damaged or not pass verify + // if download file succeed, download_err = ERR_OK and set download_file_size + void do_download(const std::string &remote_dir, + const std::string &local_dir, + const std::string &file_name, + dist::block_service::block_filesystem *fs, + /*out*/ error_code &download_err, + /*out*/ uint64_t &download_file_size); + + // \return ERR_FILE_OPERATION_FAILED: file not exist, get size failed, open file failed + // \return ERR_CORRUPTION: parse failed + error_code parse_bulk_load_metadata(const std::string &fname, /*out*/ bulk_load_metadata &meta); + + bool verify_sst_files(const file_meta &f_meta, const std::string &local_dir); + void update_bulk_load_download_progress(uint64_t file_size, const std::string &file_name); + void try_decrease_bulk_load_download_count(); + + void clear_bulk_load_states(); + void report_bulk_load_states_to_meta(bulk_load_status::type remote_status, bool report_metadata, /*out*/ bulk_load_response &response); - bulk_load_status::type get_bulk_load_status() { return _bulk_load_context._status; } + /// + /// bulk load path on remote file provider: + /// ///{bulk_load_info} + /// //// + /// ////bulk_load_metadata + /// + // get partition's file dir on remote file provider + inline std::string get_remote_bulk_load_dir(const std::string &app_name, + const std::string &cluster_name, + uint32_t pidx) const + { + std::ostringstream oss; + oss << _options->bulk_load_provider_root << "/" << cluster_name << "/" << app_name << "/" + << pidx; + return oss.str(); + } + inline bulk_load_status::type get_bulk_load_status() const + { + return _bulk_load_context._status; + } + inline void set_bulk_load_status(bulk_load_status::type status) + { + _bulk_load_context._status = status; + } private: friend class ::dsn::replication::replication_checker; diff --git a/src/dist/replication/lib/replica_bulk_load.cpp b/src/dist/replication/lib/replica_bulk_load.cpp index eae1ec3b6c..34f35fb3df 100644 --- a/src/dist/replication/lib/replica_bulk_load.cpp +++ b/src/dist/replication/lib/replica_bulk_load.cpp @@ -84,6 +84,154 @@ error_code replica::do_bulk_load(const std::string &app_name, return ERR_OK; } +// ThreadPool: THREAD_POOL_REPLICATION +error_code replica::bulk_load_start_download(const std::string &app_name, + const std::string &cluster_name, + const std::string &provider_name) +{ + if (_stub->_bulk_load_downloading_count.load() >= + _stub->_max_concurrent_bulk_load_downloading_count) { + dwarn_replica("node[{}] already has {} replica downloading, wait for next round", + _stub->_primary_address_str, + _stub->_bulk_load_downloading_count.load()); + return ERR_BUSY; + } + + // reset local bulk load context and state + if (status() == partition_status::PS_PRIMARY) { + _primary_states.cleanup_bulk_load_states(); + } + clear_bulk_load_states(); + + set_bulk_load_status(bulk_load_status::BLS_DOWNLOADING); + _stub->_bulk_load_downloading_count.fetch_add(1); + // TODO(heyuchen): add perf-counter + + // start download + ddebug_replica("start to download sst files"); + error_code err = download_sst_files(app_name, cluster_name, provider_name); + if (err != ERR_OK) { + try_decrease_bulk_load_download_count(); + } + return err; +} + +// ThreadPool: THREAD_POOL_REPLICATION +error_code replica::download_sst_files(const std::string &app_name, + const std::string &cluster_name, + const std::string &provider_name) +{ + FAIL_POINT_INJECT_F("replica_bulk_load_download_sst_files", + [](string_view) -> error_code { return ERR_OK; }); + + // create local bulk load dir + const std::string local_dir = + utils::filesystem::path_combine(_dir, bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR); + if (!utils::filesystem::directory_exists(_dir)) { + derror_replica("_dir({}) is not existed", _dir); + return ERR_FILE_OPERATION_FAILED; + } + if (!utils::filesystem::directory_exists(local_dir) && + !utils::filesystem::create_directory(local_dir)) { + derror_replica("create bulk_load_dir({}) failed", local_dir); + return ERR_FILE_OPERATION_FAILED; + } + + const std::string remote_dir = + get_remote_bulk_load_dir(app_name, cluster_name, get_gpid().get_partition_index()); + dist::block_service::block_filesystem *fs = + _stub->_block_service_manager.get_block_filesystem(provider_name); + + // download metadata file synchronously + error_code err = ERR_OK; + uint64_t file_size = 0; + do_download(remote_dir, local_dir, bulk_load_constant::BULK_LOAD_METADATA, fs, err, file_size); + if (err != ERR_OK) { + derror_replica("download bulk load metadata file failed, error = {}", err.to_string()); + return err; + } + + // parse metadata + const std::string &local_metadata_file_name = + utils::filesystem::path_combine(local_dir, bulk_load_constant::BULK_LOAD_METADATA); + err = parse_bulk_load_metadata(local_metadata_file_name, _bulk_load_context._metadata); + if (err != ERR_OK) { + derror_replica("parse bulk load metadata failed, error = {}", err.to_string()); + return err; + } + + // download sst files asynchronously + for (const auto &f_meta : _bulk_load_context._metadata.files) { + auto bulk_load_download_task = tasking::enqueue( + LPC_BACKGROUND_BULK_LOAD, &_tracker, [this, remote_dir, local_dir, f_meta, fs]() { + error_code ec = ERR_OK; + uint64_t f_size = 0; + do_download(remote_dir, local_dir, f_meta.name, fs, ec, f_size); + if (ec == ERR_OK && !verify_sst_files(f_meta, local_dir)) { + ec = ERR_CORRUPTION; + } + if (ec != ERR_OK) { + try_decrease_bulk_load_download_count(); + _bulk_load_context._download_status.store(ec); + derror_replica( + "failed to download file({}), error = {}", f_meta.name, ec.to_string()); + // TODO(heyuchen): add perf-counter + return; + } + // download file succeed, update progress + update_bulk_load_download_progress(f_size, f_meta.name); + // TODO(heyuchen): add perf-counter + }); + _bulk_load_context._download_task[f_meta.name] = bulk_load_download_task; + } + return err; +} + +// ThreadPool: THREAD_POOL_REPLICATION, THREAD_POOL_REPLICATION_LONG +void replica::do_download(const std::string &remote_dir, + const std::string &local_dir, + const std::string &file_name, + dist::block_service::block_filesystem *fs, + /*out*/ error_code &download_err, + /*out*/ uint64_t &download_file_size) +{ + // TODO(heyuchen): TBD +} + +// ThreadPool: THREAD_POOL_REPLICATION +error_code replica::parse_bulk_load_metadata(const std::string &fname, + /*out*/ bulk_load_metadata &meta) +{ + // TODO(heyuchen): TBD + return ERR_OK; +} + +// ThreadPool: THREAD_POOL_REPLICATION_LONG +bool replica::verify_sst_files(const file_meta &f_meta, const std::string &local_dir) +{ + // TODO(heyuchen): TBD + return true; +} + +// ThreadPool: THREAD_POOL_REPLICATION_LONG +void replica::update_bulk_load_download_progress(uint64_t file_size, const std::string &file_name) +{ + // TODO(heyuchen): TBD +} + +// ThreadPool: THREAD_POOL_REPLICATION, THREAD_POOL_REPLICATION_LONG +void replica::try_decrease_bulk_load_download_count() +{ + if (_stub->_bulk_load_downloading_count.load() > 0) { + _stub->_bulk_load_downloading_count.fetch_sub(1); + } +} + +void replica::clear_bulk_load_states() +{ + // TODO(heyuchen): TBD +} + // ThreadPool: THREAD_POOL_REPLICATION void replica::report_bulk_load_states_to_meta(bulk_load_status::type remote_status, bool report_metadata, diff --git a/src/dist/replication/lib/replica_context.cpp b/src/dist/replication/lib/replica_context.cpp index 3c96e443ca..e2177b7069 100644 --- a/src/dist/replication/lib/replica_context.cpp +++ b/src/dist/replication/lib/replica_context.cpp @@ -99,6 +99,8 @@ void primary_context::cleanup(bool clean_pending_mutations) caught_up_children.clear(); sync_send_write_request = false; + + cleanup_bulk_load_states(); } bool primary_context::is_cleaned() @@ -170,6 +172,13 @@ bool primary_context::check_exist(::dsn::rpc_address node, partition_status::typ } } +void primary_context::cleanup_bulk_load_states() +{ + // TODO(heyuchen): TBD + // primary will save bulk load states reported from secondaries, this function is to cleanup + // those states +} + bool secondary_context::cleanup(bool force) { CLEANUP_TASK(checkpoint_task, force) diff --git a/src/dist/replication/lib/replica_context.h b/src/dist/replication/lib/replica_context.h index 75c8c242e5..ee1bae17d1 100644 --- a/src/dist/replication/lib/replica_context.h +++ b/src/dist/replication/lib/replica_context.h @@ -73,6 +73,8 @@ class primary_context void do_cleanup_pending_mutations(bool clean_pending_mutations = true); + void cleanup_bulk_load_states(); + public: // membership mgr, including learners partition_configuration membership; @@ -557,6 +559,11 @@ class bulk_load_context friend class replica_bulk_load_test; bulk_load_status::type _status{bulk_load_status::BLS_INVALID}; + bulk_load_metadata _metadata; + + std::atomic _download_status{ERR_OK}; + // file_name -> downloading task + std::map _download_task; }; //---------------inline impl---------------------------------------------------------------- diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 871e71e5ee..64de2435e8 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -69,6 +69,7 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, _query_compact_command(nullptr), _query_app_envs_command(nullptr), _useless_dir_reserve_seconds_command(nullptr), + _max_concurrent_bulk_load_downloading_count_command(nullptr), _deny_client(false), _verbose_client_log(false), _verbose_commit_log(false), @@ -76,8 +77,10 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/, _gc_disk_garbage_replica_interval_seconds(3600), _release_tcmalloc_memory(false), _mem_release_max_reserved_mem_percentage(10), + _max_concurrent_bulk_load_downloading_count(5), _learn_app_concurrent_count(0), - _fs_manager(false) + _fs_manager(false), + _bulk_load_downloading_count(0) { #ifdef DSN_ENABLE_GPERF _release_tcmalloc_memory_command = nullptr; @@ -360,6 +363,8 @@ void replica_stub::initialize(const replication_options &opts, bool clear /* = f _gc_disk_garbage_replica_interval_seconds = _options.gc_disk_garbage_replica_interval_seconds; _release_tcmalloc_memory = _options.mem_release_enabled; _mem_release_max_reserved_mem_percentage = _options.mem_release_max_reserved_mem_percentage; + _max_concurrent_bulk_load_downloading_count = + _options.max_concurrent_bulk_load_downloading_count; // clear dirs if need if (clear) { @@ -2200,6 +2205,31 @@ void replica_stub::open_service() return result; }); #endif + _max_concurrent_bulk_load_downloading_count_command = + dsn::command_manager::instance().register_app_command( + {"max-concurrent-bulk-load-downloading-count"}, + "max-concurrent-bulk-load-downloading-count [num | DEFAULT]", + "control stub max_concurrent_bulk_load_downloading_count", + [this](const std::vector &args) { + std::string result("OK"); + if (args.empty()) { + result = "max_concurrent_bulk_load_downloading_count=" + + std::to_string(_max_concurrent_bulk_load_downloading_count); + } else { + if (args[0] == "DEFAULT") { + _max_concurrent_bulk_load_downloading_count = + _options.max_concurrent_bulk_load_downloading_count; + } else { + int32_t count = 0; + if (!dsn::buf2int32(args[0], count) || count <= 0) { + result = std::string("ERR: invalid arguments"); + } else { + _max_concurrent_bulk_load_downloading_count = count; + } + } + } + return result; + }); } std::string @@ -2329,6 +2359,8 @@ void replica_stub::close() dsn::command_manager::instance().deregister_command(_release_tcmalloc_memory_command); dsn::command_manager::instance().deregister_command(_max_reserved_memory_percentage_command); #endif + dsn::command_manager::instance().deregister_command( + _max_concurrent_bulk_load_downloading_count_command); _kill_partition_command = nullptr; _deny_client_command = nullptr; @@ -2342,6 +2374,7 @@ void replica_stub::close() _release_tcmalloc_memory_command = nullptr; _max_reserved_memory_percentage_command = nullptr; #endif + _max_concurrent_bulk_load_downloading_count_command = nullptr; if (_config_sync_timer_task != nullptr) { _config_sync_timer_task->cancel(true); diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index 495a0d8261..d74573f49e 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -333,6 +333,7 @@ class replica_stub : public serverlet, public ref_counter dsn_handle_t _release_tcmalloc_memory_command; dsn_handle_t _max_reserved_memory_percentage_command; #endif + dsn_handle_t _max_concurrent_bulk_load_downloading_count_command; bool _deny_client; bool _verbose_client_log; @@ -341,6 +342,7 @@ class replica_stub : public serverlet, public ref_counter int32_t _gc_disk_garbage_replica_interval_seconds; bool _release_tcmalloc_memory; int32_t _mem_release_max_reserved_mem_percentage; + int32_t _max_concurrent_bulk_load_downloading_count; // we limit LT_APP max concurrent count, because nfs service implementation is // too simple, it do not support priority. @@ -362,6 +364,9 @@ class replica_stub : public serverlet, public ref_counter // write body size exceed this threshold will be logged and reject, 0 means no check uint64_t _max_allowed_write_size; + // replica count exectuting bulk load downloading concurrently + std::atomic_int _bulk_load_downloading_count; + // performance counters perf_counter_wrapper _counter_replicas_count; perf_counter_wrapper _counter_replicas_opening_count; diff --git a/src/dist/replication/test/replica_test/unit_test/config-test.ini b/src/dist/replication/test/replica_test/unit_test/config-test.ini index 26d446d584..b4d14163c0 100644 --- a/src/dist/replication/test/replica_test/unit_test/config-test.ini +++ b/src/dist/replication/test/replica_test/unit_test/config-test.ini @@ -10,7 +10,7 @@ type = replica run = true count = 1 ports = 54321 -pools = THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_REPLICATION +pools = THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_REPLICATION,THREAD_POOL_LOCAL_SERVICE [core] ;tool = simulator @@ -58,6 +58,10 @@ worker_count = 3 [threadpool.THREAD_POOL_REPLICATION_LONG] name = replica_long +[threadpool.THREAD_POOL_LOCAL_SERVICE] +name = local_service +worker_count = 1 + [task..default] is_trace = true is_profile = true diff --git a/src/dist/replication/test/replica_test/unit_test/mock_utils.h b/src/dist/replication/test/replica_test/unit_test/mock_utils.h index 79f4445584..33eff788df 100644 --- a/src/dist/replication/test/replica_test/unit_test/mock_utils.h +++ b/src/dist/replication/test/replica_test/unit_test/mock_utils.h @@ -236,6 +236,12 @@ class mock_replica_stub : public replica_stub } void set_log(mutation_log_ptr log) { _log = log; } + + int32_t get_bulk_load_downloading_count() const { return _bulk_load_downloading_count.load(); } + void set_bulk_load_downloading_count(int32_t count) + { + _bulk_load_downloading_count.store(count); + } }; class mock_log_file : public log_file diff --git a/src/dist/replication/test/replica_test/unit_test/replica_bulk_load_test.cpp b/src/dist/replication/test/replica_test/unit_test/replica_bulk_load_test.cpp index 136951ab3c..dd475979c2 100644 --- a/src/dist/replication/test/replica_test/unit_test/replica_bulk_load_test.cpp +++ b/src/dist/replication/test/replica_test/unit_test/replica_bulk_load_test.cpp @@ -15,7 +15,13 @@ namespace replication { class replica_bulk_load_test : public replica_test_base { public: - replica_bulk_load_test() { _replica = create_mock_replica(stub.get()); } + replica_bulk_load_test() + { + _replica = create_mock_replica(stub.get()); + fail::setup(); + } + + ~replica_bulk_load_test() { fail::teardown(); } /// bulk load functions @@ -26,6 +32,11 @@ class replica_bulk_load_test : public replica_test_base return resp.err; } + error_code test_start_downloading() + { + return _replica->bulk_load_start_download(APP_NAME, CLUSTER, PROVIDER); + } + /// mock structure functions void @@ -37,7 +48,7 @@ class replica_bulk_load_test : public replica_test_base _req.meta_bulk_load_status = status; _req.pid = PID; _req.remote_provider_name = PROVIDER; - // TODO(heyuchen): set downloading_count in further pull request + stub->set_bulk_load_downloading_count(downloading_count); } void create_bulk_load_request(bulk_load_status::type status, int32_t downloading_count = 0) @@ -71,6 +82,12 @@ class replica_bulk_load_test : public replica_test_base _replica->_primary_states.membership = config; } + // helper functions + bulk_load_status::type get_bulk_load_status() const + { + return _replica->_bulk_load_context._status; + } + public: std::unique_ptr _replica; bulk_load_request _req; @@ -83,6 +100,7 @@ class replica_bulk_load_test : public replica_test_base rpc_address PRIMARY = rpc_address("127.0.0.2", 34801); rpc_address SECONDARY = rpc_address("127.0.0.3", 34801); rpc_address SECONDARY2 = rpc_address("127.0.0.4", 34801); + int32_t MAX_DOWNLOADING_COUNT = 5; }; // on_bulk_load unit tests @@ -99,5 +117,40 @@ TEST_F(replica_bulk_load_test, on_bulk_load_ballot_change) ASSERT_EQ(test_on_bulk_load(), ERR_INVALID_STATE); } +// start_downloading unit tests +TEST_F(replica_bulk_load_test, start_downloading_test) +{ + // Test cases: + // - stub concurrent downloading count excceed + // - downloading error + // - downloading succeed + struct test_struct + { + bool mock_function; + int32_t downloading_count; + error_code expected_err; + bulk_load_status::type expected_status; + int32_t expected_downloading_count; + } tests[]{{false, + MAX_DOWNLOADING_COUNT, + ERR_BUSY, + bulk_load_status::BLS_INVALID, + MAX_DOWNLOADING_COUNT}, + // TODO(heyuchen): add this test after implemtation do_download + /*{false, 1, ERR_CORRUPTION, bulk_load_status::BLS_DOWNLOADING, 1},*/ + {true, 1, ERR_OK, bulk_load_status::BLS_DOWNLOADING, 2}}; + + for (auto test : tests) { + if (test.mock_function) { + fail::cfg("replica_bulk_load_download_sst_files", "return()"); + } + create_bulk_load_request(bulk_load_status::BLS_DOWNLOADING, test.downloading_count); + + ASSERT_EQ(test_start_downloading(), test.expected_err); + ASSERT_EQ(get_bulk_load_status(), test.expected_status); + ASSERT_EQ(stub->get_bulk_load_downloading_count(), test.expected_downloading_count); + } +} + } // namespace replication } // namespace dsn From e2457fc7c6c438232f5499d40a40362f787f5efb Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 19 May 2020 15:07:09 +0800 Subject: [PATCH 2/3] small fix --- .../replication/common/replication_common.cpp | 18 ++++++---- src/dist/replication/lib/replica.h | 17 +++++---- .../replication/lib/replica_bulk_load.cpp | 35 ++++++++++--------- src/dist/replication/lib/replica_stub.cpp | 24 +++++++------ 4 files changed, 52 insertions(+), 42 deletions(-) diff --git a/src/dist/replication/common/replication_common.cpp b/src/dist/replication/common/replication_common.cpp index 5eae5c4d3f..c8bd62c25f 100644 --- a/src/dist/replication/common/replication_common.cpp +++ b/src/dist/replication/common/replication_common.cpp @@ -25,13 +25,22 @@ */ #include "replication_common.h" -#include + #include + #include +#include +#include namespace dsn { namespace replication { +DSN_DEFINE_int32("nfs", max_copy_rate_megabytes, 500, "max rate of copying from remote node(MB/s)"); +DSN_DEFINE_int32("replication", + max_concurrent_bulk_load_downloading_count, + 5, + "concurrent bulk load downloading replica count"); + /*extern*/ const char *partition_status_to_string(partition_status::type status) { auto it = _partition_status_VALUES_TO_NAMES.find(status); @@ -520,11 +529,8 @@ void replication_options::initialize() "bulk_load_provider_root", "bulk_load_provider_root", "bulk load root on remote file provider"); - max_concurrent_bulk_load_downloading_count = - (int32_t)dsn_config_get_value_uint64("replication", - "max_concurrent_bulk_load_downloading_count", - max_concurrent_bulk_load_downloading_count, - "concurrent bulk load downloading replica count"); + + max_concurrent_bulk_load_downloading_count = FLAGS_max_concurrent_bulk_load_downloading_count; replica_helper::load_meta_servers(meta_servers); diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index 84e67a1374..4853826d79 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -456,16 +456,15 @@ class replica : public serverlet, public ref_counter, public replica_ba const std::string &provider_name); // download files from remote file system - // download_err = ERR_FILE_OPERATION_FAILED: local file system error - // download_err = ERR_FS_INTERNAL: remote file system error - // download_err = ERR_CORRUPTION: file not exist or damaged + // \return ERR_FILE_OPERATION_FAILED: local file system error + // \return ERR_FS_INTERNAL: remote file system error + // \return ERR_CORRUPTION: file not exist or damaged // if download file succeed, download_err = ERR_OK and set download_file_size - void do_download(const std::string &remote_dir, - const std::string &local_dir, - const std::string &file_name, - dist::block_service::block_filesystem *fs, - /*out*/ error_code &download_err, - /*out*/ uint64_t &download_file_size); + error_code do_download(const std::string &remote_dir, + const std::string &local_dir, + const std::string &file_name, + dist::block_service::block_filesystem *fs, + /*out*/ uint64_t &download_file_size); // \return ERR_FILE_OPERATION_FAILED: file not exist, get size failed, open file failed // \return ERR_CORRUPTION: parse failed diff --git a/src/dist/replication/lib/replica_bulk_load.cpp b/src/dist/replication/lib/replica_bulk_load.cpp index 9ac157d6aa..61bf70dee7 100644 --- a/src/dist/replication/lib/replica_bulk_load.cpp +++ b/src/dist/replication/lib/replica_bulk_load.cpp @@ -214,7 +214,10 @@ error_code replica::bulk_load_start_download(const std::string &app_name, clear_bulk_load_states(); set_bulk_load_status(bulk_load_status::BLS_DOWNLOADING); - _stub->_bulk_load_downloading_count.fetch_add(1); + ++_stub->_bulk_load_downloading_count; + ddebug_replica("node[{}] has {} replica executing downloading", + _stub->_primary_address_str, + _stub->_bulk_load_downloading_count.load()); // TODO(heyuchen): add perf-counter // start download @@ -235,12 +238,12 @@ error_code replica::download_sst_files(const std::string &app_name, [](string_view) -> error_code { return ERR_OK; }); // create local bulk load dir - const std::string local_dir = - utils::filesystem::path_combine(_dir, bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR); if (!utils::filesystem::directory_exists(_dir)) { derror_replica("_dir({}) is not existed", _dir); return ERR_FILE_OPERATION_FAILED; } + const std::string local_dir = + utils::filesystem::path_combine(_dir, bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR); if (!utils::filesystem::directory_exists(local_dir) && !utils::filesystem::create_directory(local_dir)) { derror_replica("create bulk_load_dir({}) failed", local_dir); @@ -253,9 +256,9 @@ error_code replica::download_sst_files(const std::string &app_name, _stub->_block_service_manager.get_block_filesystem(provider_name); // download metadata file synchronously - error_code err = ERR_OK; uint64_t file_size = 0; - do_download(remote_dir, local_dir, bulk_load_constant::BULK_LOAD_METADATA, fs, err, file_size); + error_code err = + do_download(remote_dir, local_dir, bulk_load_constant::BULK_LOAD_METADATA, fs, file_size); if (err != ERR_OK) { derror_replica("download bulk load metadata file failed, error = {}", err.to_string()); return err; @@ -274,9 +277,8 @@ error_code replica::download_sst_files(const std::string &app_name, for (const auto &f_meta : _bulk_load_context._metadata.files) { auto bulk_load_download_task = tasking::enqueue( LPC_BACKGROUND_BULK_LOAD, &_tracker, [this, remote_dir, local_dir, f_meta, fs]() { - error_code ec = ERR_OK; uint64_t f_size = 0; - do_download(remote_dir, local_dir, f_meta.name, fs, ec, f_size); + error_code ec = do_download(remote_dir, local_dir, f_meta.name, fs, f_size); if (ec == ERR_OK && !verify_sst_files(f_meta, local_dir)) { ec = ERR_CORRUPTION; } @@ -298,16 +300,16 @@ error_code replica::download_sst_files(const std::string &app_name, } // ThreadPool: THREAD_POOL_REPLICATION, THREAD_POOL_REPLICATION_LONG -void replica::do_download(const std::string &remote_dir, - const std::string &local_dir, - const std::string &file_name, - dist::block_service::block_filesystem *fs, - /*out*/ error_code &download_err, - /*out*/ uint64_t &download_file_size) +error_code replica::do_download(const std::string &remote_dir, + const std::string &local_dir, + const std::string &file_name, + dist::block_service::block_filesystem *fs, + /*out*/ uint64_t &download_file_size) { // TODO(heyuchen): TBD // download files from remote provider // this function can also be used in restore + return ERR_OK; } // ThreadPool: THREAD_POOL_REPLICATION @@ -337,9 +339,10 @@ void replica::update_bulk_load_download_progress(uint64_t file_size, const std:: // ThreadPool: THREAD_POOL_REPLICATION, THREAD_POOL_REPLICATION_LONG void replica::try_decrease_bulk_load_download_count() { - if (_stub->_bulk_load_downloading_count.load() > 0) { - _stub->_bulk_load_downloading_count.fetch_sub(1); - } + --_stub->_bulk_load_downloading_count; + ddebug_replica("node[{}] has {} replica executing downloading", + _stub->_primary_address_str, + _stub->_bulk_load_downloading_count.load()); } void replica::clear_bulk_load_states() diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 28b1bceda0..0f37bf390d 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -2216,18 +2216,20 @@ void replica_stub::open_service() if (args.empty()) { result = "max_concurrent_bulk_load_downloading_count=" + std::to_string(_max_concurrent_bulk_load_downloading_count); + return result; + } + + if (args[0] == "DEFAULT") { + _max_concurrent_bulk_load_downloading_count = + _options.max_concurrent_bulk_load_downloading_count; + return result; + } + + int32_t count = 0; + if (!dsn::buf2int32(args[0], count) || count <= 0) { + result = std::string("ERR: invalid arguments"); } else { - if (args[0] == "DEFAULT") { - _max_concurrent_bulk_load_downloading_count = - _options.max_concurrent_bulk_load_downloading_count; - } else { - int32_t count = 0; - if (!dsn::buf2int32(args[0], count) || count <= 0) { - result = std::string("ERR: invalid arguments"); - } else { - _max_concurrent_bulk_load_downloading_count = count; - } - } + _max_concurrent_bulk_load_downloading_count = count; } return result; }); From 4aeddca2e34ad44dfea82dc6d078860283756c36 Mon Sep 17 00:00:00 2001 From: heyuchen Date: Tue, 19 May 2020 15:11:07 +0800 Subject: [PATCH 3/3] fix --- src/dist/replication/common/replication_common.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/dist/replication/common/replication_common.cpp b/src/dist/replication/common/replication_common.cpp index c8bd62c25f..bc1da85f51 100644 --- a/src/dist/replication/common/replication_common.cpp +++ b/src/dist/replication/common/replication_common.cpp @@ -35,7 +35,6 @@ namespace dsn { namespace replication { -DSN_DEFINE_int32("nfs", max_copy_rate_megabytes, 500, "max rate of copying from remote node(MB/s)"); DSN_DEFINE_int32("replication", max_concurrent_bulk_load_downloading_count, 5, @@ -119,8 +118,6 @@ replication_options::replication_options() max_concurrent_uploading_file_count = 10; cold_backup_checkpoint_reserve_minutes = 10; - - max_concurrent_bulk_load_downloading_count = 5; } replication_options::~replication_options() {}