Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk-load): bulk load download part1 - replica start download #465

Merged
merged 5 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ MAKE_EVENT_CODE(LPC_CATCHUP_WITH_PRIVATE_LOGS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_DISK_STAT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_BACKGROUND_COLD_BACKUP, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_PARTITION_SPLIT_ASYNC_LEARN, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_BACKGROUND_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_LOW, TASK_PRIORITY_LOW)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_COMMON, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_LONG_HIGH, TASK_PRIORITY_HIGH)
Expand Down
14 changes: 13 additions & 1 deletion src/dist/replication/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,21 @@
*/

#include "replication_common.h"
#include <dsn/utility/filesystem.h>

#include <fstream>

#include <dsn/dist/replication/replica_envs.h>
#include <dsn/utility/flags.h>
#include <dsn/utility/filesystem.h>

namespace dsn {
namespace replication {

DSN_DEFINE_int32("replication",
max_concurrent_bulk_load_downloading_count,
5,
"concurrent bulk load downloading replica count");

/*extern*/ const char *partition_status_to_string(partition_status::type status)
{
auto it = _partition_status_VALUES_TO_NAMES.find(status);
Expand Down Expand Up @@ -519,6 +527,8 @@ void replication_options::initialize()
"bulk_load_provider_root",
"bulk load root on remote file provider");

max_concurrent_bulk_load_downloading_count = FLAGS_max_concurrent_bulk_load_downloading_count;

replica_helper::load_meta_servers(meta_servers);

sanity_check();
Expand Down Expand Up @@ -638,6 +648,8 @@ const std::string replica_envs::BUSINESS_INFO("business.info");
const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info");
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10;
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_SHORT_INTERVAL = 5;
const std::string bulk_load_constant::BULK_LOAD_METADATA("bulk_load_metadata");
const std::string bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR(".bulk_load");

namespace cold_backup {
std::string get_policy_path(const std::string &root, const std::string &policy_name)
Expand Down
3 changes: 3 additions & 0 deletions src/dist/replication/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class replication_options
int32_t cold_backup_checkpoint_reserve_minutes;

std::string bulk_load_provider_root;
int32_t max_concurrent_bulk_load_downloading_count;

public:
replication_options();
Expand Down Expand Up @@ -162,6 +163,8 @@ class bulk_load_constant
static const std::string BULK_LOAD_INFO;
static const int32_t BULK_LOAD_REQUEST_INTERVAL;
static const int32_t BULK_LOAD_REQUEST_SHORT_INTERVAL;
static const std::string BULK_LOAD_METADATA;
static const std::string BULK_LOAD_LOCAL_ROOT_DIR;
// TODO(heyuchen): add more constant in further pr
};

Expand Down
65 changes: 63 additions & 2 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,14 +439,75 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
const std::string &cluster_name,
const std::string &provider_name);

// replica start or restart download sst files from remote provider
// \return ERR_BUSY if node has already had enough replica executing downloading
// \return download errors by function `download_sst_files`
error_code bulk_load_start_download(const std::string &app_name,
const std::string &cluster_name,
const std::string &provider_name);

// download metadata and sst files from remote provider
// metadata and sst files will be downloaded in {_dir}/.bulk_load directory
// \return ERR_FILE_OPERATION_FAILED: create local bulk load dir failed
// \return download metadata file error, see function `do_download`
// \return parse metadata file error, see function `parse_bulk_load_metadata`
error_code download_sst_files(const std::string &app_name,
const std::string &cluster_name,
const std::string &provider_name);

// download files from remote file system
// \return ERR_FILE_OPERATION_FAILED: local file system error
// \return ERR_FS_INTERNAL: remote file system error
// \return ERR_CORRUPTION: file not exist or damaged
// if download file succeed, download_err = ERR_OK and set download_file_size
error_code do_download(const std::string &remote_dir,
const std::string &local_dir,
const std::string &file_name,
dist::block_service::block_filesystem *fs,
/*out*/ uint64_t &download_file_size);

// \return ERR_FILE_OPERATION_FAILED: file not exist, get size failed, open file failed
// \return ERR_CORRUPTION: parse failed
error_code parse_bulk_load_metadata(const std::string &fname, /*out*/ bulk_load_metadata &meta);

bool verify_sst_files(const file_meta &f_meta, const std::string &local_dir);
void update_bulk_load_download_progress(uint64_t file_size, const std::string &file_name);
void try_decrease_bulk_load_download_count();

void clear_bulk_load_states();

void report_bulk_load_states_to_meta(bulk_load_status::type remote_status,
bool report_metadata,
/*out*/ bulk_load_response &response);

void report_bulk_load_states_to_primary(bulk_load_status::type remote_status,
/*out*/ group_bulk_load_response &response);

bulk_load_status::type get_bulk_load_status() { return _bulk_load_context._status; }
///
/// bulk load path on remote file provider:
/// <bulk_load_root>/<cluster_name>/<app_name>/{bulk_load_info}
/// <bulk_load_root>/<cluster_name>/<app_name>/<partition_index>/<file_name>
/// <bulk_load_root>/<cluster_name>/<app_name>/<partition_index>/bulk_load_metadata
///
// get partition's file dir on remote file provider
inline std::string get_remote_bulk_load_dir(const std::string &app_name,
const std::string &cluster_name,
uint32_t pidx) const
{
std::ostringstream oss;
oss << _options->bulk_load_provider_root << "/" << cluster_name << "/" << app_name << "/"
<< pidx;
return oss.str();
}

inline bulk_load_status::type get_bulk_load_status() const
{
return _bulk_load_context._status;
}

inline void set_bulk_load_status(bulk_load_status::type status)
{
_bulk_load_context._status = status;
}

private:
friend class ::dsn::replication::replication_checker;
Expand Down
157 changes: 157 additions & 0 deletions src/dist/replication/lib/replica_bulk_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,163 @@ error_code replica::do_bulk_load(const std::string &app_name,
return ERR_OK;
}

// ThreadPool: THREAD_POOL_REPLICATION
error_code replica::bulk_load_start_download(const std::string &app_name,
const std::string &cluster_name,
const std::string &provider_name)
{
if (_stub->_bulk_load_downloading_count.load() >=
_stub->_max_concurrent_bulk_load_downloading_count) {
dwarn_replica("node[{}] already has {} replica downloading, wait for next round",
_stub->_primary_address_str,
_stub->_bulk_load_downloading_count.load());
return ERR_BUSY;
}

// reset local bulk load context and state
if (status() == partition_status::PS_PRIMARY) {
_primary_states.cleanup_bulk_load_states();
}
clear_bulk_load_states();

set_bulk_load_status(bulk_load_status::BLS_DOWNLOADING);
++_stub->_bulk_load_downloading_count;
ddebug_replica("node[{}] has {} replica executing downloading",
_stub->_primary_address_str,
_stub->_bulk_load_downloading_count.load());
// TODO(heyuchen): add perf-counter

// start download
ddebug_replica("start to download sst files");
error_code err = download_sst_files(app_name, cluster_name, provider_name);
if (err != ERR_OK) {
try_decrease_bulk_load_download_count();
}
return err;
}

// ThreadPool: THREAD_POOL_REPLICATION
error_code replica::download_sst_files(const std::string &app_name,
const std::string &cluster_name,
const std::string &provider_name)
{
FAIL_POINT_INJECT_F("replica_bulk_load_download_sst_files",
[](string_view) -> error_code { return ERR_OK; });

// create local bulk load dir
if (!utils::filesystem::directory_exists(_dir)) {
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
derror_replica("_dir({}) is not existed", _dir);
return ERR_FILE_OPERATION_FAILED;
}
const std::string local_dir =
utils::filesystem::path_combine(_dir, bulk_load_constant::BULK_LOAD_LOCAL_ROOT_DIR);
if (!utils::filesystem::directory_exists(local_dir) &&
!utils::filesystem::create_directory(local_dir)) {
derror_replica("create bulk_load_dir({}) failed", local_dir);
return ERR_FILE_OPERATION_FAILED;
}

const std::string remote_dir =
get_remote_bulk_load_dir(app_name, cluster_name, get_gpid().get_partition_index());
dist::block_service::block_filesystem *fs =
_stub->_block_service_manager.get_block_filesystem(provider_name);

// download metadata file synchronously
uint64_t file_size = 0;
error_code err =
do_download(remote_dir, local_dir, bulk_load_constant::BULK_LOAD_METADATA, fs, file_size);
if (err != ERR_OK) {
derror_replica("download bulk load metadata file failed, error = {}", err.to_string());
return err;
}

// parse metadata
const std::string &local_metadata_file_name =
utils::filesystem::path_combine(local_dir, bulk_load_constant::BULK_LOAD_METADATA);
err = parse_bulk_load_metadata(local_metadata_file_name, _bulk_load_context._metadata);
if (err != ERR_OK) {
derror_replica("parse bulk load metadata failed, error = {}", err.to_string());
return err;
}

// download sst files asynchronously
for (const auto &f_meta : _bulk_load_context._metadata.files) {
auto bulk_load_download_task = tasking::enqueue(
LPC_BACKGROUND_BULK_LOAD, &_tracker, [this, remote_dir, local_dir, f_meta, fs]() {
uint64_t f_size = 0;
error_code ec = do_download(remote_dir, local_dir, f_meta.name, fs, f_size);
if (ec == ERR_OK && !verify_sst_files(f_meta, local_dir)) {
ec = ERR_CORRUPTION;
}
if (ec != ERR_OK) {
try_decrease_bulk_load_download_count();
_bulk_load_context._download_status.store(ec);
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
derror_replica(
"failed to download file({}), error = {}", f_meta.name, ec.to_string());
// TODO(heyuchen): add perf-counter
return;
}
// download file succeed, update progress
update_bulk_load_download_progress(f_size, f_meta.name);
// TODO(heyuchen): add perf-counter
});
_bulk_load_context._download_task[f_meta.name] = bulk_load_download_task;
}
return err;
}

// ThreadPool: THREAD_POOL_REPLICATION, THREAD_POOL_REPLICATION_LONG
error_code replica::do_download(const std::string &remote_dir,
const std::string &local_dir,
const std::string &file_name,
dist::block_service::block_filesystem *fs,
/*out*/ uint64_t &download_file_size)
{
hycdong marked this conversation as resolved.
Show resolved Hide resolved
// TODO(heyuchen): TBD
// download files from remote provider
// this function can also be used in restore
return ERR_OK;
}

// ThreadPool: THREAD_POOL_REPLICATION
error_code replica::parse_bulk_load_metadata(const std::string &fname,
/*out*/ bulk_load_metadata &meta)
{
// TODO(heyuchen): TBD
// read file and parse file content as bulk_load_metadata
return ERR_OK;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
bool replica::verify_sst_files(const file_meta &f_meta, const std::string &local_dir)
{
// TODO(heyuchen): TBD
// compare sst file metadata calculated by file and parsed by metadata
return true;
}

// ThreadPool: THREAD_POOL_REPLICATION_LONG
void replica::update_bulk_load_download_progress(uint64_t file_size, const std::string &file_name)
{
// TODO(heyuchen): TBD
// update download progress after downloading sst files succeed
}

// ThreadPool: THREAD_POOL_REPLICATION, THREAD_POOL_REPLICATION_LONG
void replica::try_decrease_bulk_load_download_count()
{
--_stub->_bulk_load_downloading_count;
ddebug_replica("node[{}] has {} replica executing downloading",
_stub->_primary_address_str,
_stub->_bulk_load_downloading_count.load());
}

void replica::clear_bulk_load_states()
{
// TODO(heyuchen): TBD
// clear replica bulk load states and cleanup bulk load context
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::report_bulk_load_states_to_meta(bulk_load_status::type remote_status,
bool report_metadata,
Expand Down
9 changes: 9 additions & 0 deletions src/dist/replication/lib/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ void primary_context::cleanup(bool clean_pending_mutations)
caught_up_children.clear();

sync_send_write_request = false;

cleanup_bulk_load_states();
}

bool primary_context::is_cleaned()
Expand Down Expand Up @@ -152,6 +154,13 @@ bool primary_context::check_exist(::dsn::rpc_address node, partition_status::typ
}
}

void primary_context::cleanup_bulk_load_states()
{
// TODO(heyuchen): TBD
// primary will save bulk load states reported from secondaries, this function is to cleanup
// those states
}

bool secondary_context::cleanup(bool force)
{
CLEANUP_TASK(checkpoint_task, force)
Expand Down
7 changes: 7 additions & 0 deletions src/dist/replication/lib/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class primary_context

void do_cleanup_pending_mutations(bool clean_pending_mutations = true);

void cleanup_bulk_load_states();

public:
// membership mgr, including learners
partition_configuration membership;
Expand Down Expand Up @@ -585,6 +587,11 @@ class bulk_load_context
friend class replica_bulk_load_test;

bulk_load_status::type _status{bulk_load_status::BLS_INVALID};
bulk_load_metadata _metadata;

std::atomic<error_code> _download_status{ERR_OK};
// file_name -> downloading task
std::map<std::string, task_ptr> _download_task;
};

//---------------inline impl----------------------------------------------------------------
Expand Down
Loading