Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
refactor(backup): move collect_backup_info to replica_backup_manager (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
zhao liwei authored Apr 8, 2020
1 parent 1571b8a commit de9bb3b
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 56 deletions.
58 changes: 57 additions & 1 deletion src/dist/replication/lib/backup/replica_backup_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ static bool get_policy_checkpoint_dirs(const std::string &dir,
// list sub dirs
std::vector<std::string> sub_dirs;
if (!utils::filesystem::get_subdirectories(dir, sub_dirs, false)) {
derror("list sub dirs of dir {} failed", dir.c_str());
derror_f("list sub dirs of dir {} failed", dir.c_str());
return false;
}

Expand All @@ -44,6 +44,15 @@ static bool get_policy_checkpoint_dirs(const std::string &dir,
return true;
}

replica_backup_manager::replica_backup_manager(replica *r) : replica_base(r), _replica(r) {}

replica_backup_manager::~replica_backup_manager()
{
if (_collect_info_timer != nullptr) {
_collect_info_timer->cancel(true);
}
}

void replica_backup_manager::on_clear_cold_backup(const backup_clear_request &request)
{
_replica->_checker.only_one_thread_access();
Expand Down Expand Up @@ -73,6 +82,53 @@ void replica_backup_manager::on_clear_cold_backup(const backup_clear_request &re
background_clear_backup_checkpoint(request.policy_name);
}

void replica_backup_manager::start_collect_backup_info()
{
if (_collect_info_timer == nullptr) {
_collect_info_timer =
tasking::enqueue_timer(LPC_PER_REPLICA_COLLECT_INFO_TIMER,
&_replica->_tracker,
[this]() { collect_backup_info(); },
std::chrono::milliseconds(_replica->options()->gc_interval_ms),
get_gpid().thread_hash());
}
}

void replica_backup_manager::collect_backup_info()
{
uint64_t cold_backup_running_count = 0;
uint64_t cold_backup_max_duration_time_ms = 0;
uint64_t cold_backup_max_upload_file_size = 0;
uint64_t now_ms = dsn_now_ms();

// collect backup info from all of the cold backup contexts
for (const auto &p : _replica->_cold_backup_contexts) {
const cold_backup_context_ptr &backup_context = p.second;
cold_backup_status backup_status = backup_context->status();
if (_replica->status() == partition_status::type::PS_PRIMARY) {
if (backup_status > ColdBackupInvalid && backup_status < ColdBackupCanceled) {
cold_backup_running_count++;
}
} else if (_replica->status() == partition_status::type::PS_SECONDARY) {
// secondary end backup with status ColdBackupCheckpointed
if (backup_status > ColdBackupInvalid && backup_status < ColdBackupCheckpointed) {
cold_backup_running_count++;
}
}

if (backup_status == ColdBackupUploading) {
cold_backup_max_duration_time_ms = std::max(
cold_backup_max_duration_time_ms, now_ms - backup_context->get_start_time_ms());
cold_backup_max_upload_file_size =
std::max(cold_backup_max_upload_file_size, backup_context->get_upload_file_size());
}
}

_replica->_cold_backup_running_count.store(cold_backup_running_count);
_replica->_cold_backup_max_duration_time_ms.store(cold_backup_max_duration_time_ms);
_replica->_cold_backup_max_upload_file_size.store(cold_backup_max_upload_file_size);
}

void replica_backup_manager::background_clear_backup_checkpoint(const std::string &policy_name)
{
ddebug_replica("schedule to clear all checkpoint dirs of policy({}) after {} minutes",
Expand Down
7 changes: 6 additions & 1 deletion src/dist/replication/lib/backup/replica_backup_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,20 @@ namespace replication {
class replica_backup_manager : replica_base
{
public:
replica_backup_manager(replica *r) : replica_base(r), _replica(r) {}
explicit replica_backup_manager(replica *r);
~replica_backup_manager();

void on_clear_cold_backup(const backup_clear_request &request);
void start_collect_backup_info();

private:
void clear_backup_checkpoint(const std::string &policy_name);
void send_clear_request_to_secondaries(const gpid &pid, const std::string &policy_name);
void background_clear_backup_checkpoint(const std::string &policy_name);
void collect_backup_info();

replica *_replica;
dsn::task_ptr _collect_info_timer;

friend class replica;
friend class replica_backup_manager_test;
Expand Down
7 changes: 1 addition & 6 deletions src/dist/replication/lib/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "mutation_log.h"
#include "replica_stub.h"
#include "duplication/replica_duplicator_manager.h"
#include "dist/replication/lib/backup/replica_backup_manager.h"
#include "backup/replica_backup_manager.h"

#include <dsn/cpp/json_helper.h>
#include <dsn/dist/replication/replication_app_base.h>
Expand Down Expand Up @@ -368,11 +368,6 @@ void replica::close()
_checkpoint_timer = nullptr;
}

if (_collect_info_timer != nullptr) {
_collect_info_timer->cancel(true);
_collect_info_timer = nullptr;
}

if (_app != nullptr) {
_app->cancel_background_work(true);
}
Expand Down
3 changes: 0 additions & 3 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
void set_backup_context_pause();
void clear_cold_backup_state();

void collect_backup_info();

/////////////////////////////////////////////////////////////////
// replica restore from backup
bool read_cold_backup_metadata(const std::string &file, cold_backup_metadata &backup_metadata);
Expand Down Expand Up @@ -477,7 +475,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
partition_split_context _split_states;

// timer task that running in replication-thread
dsn::task_ptr _collect_info_timer;
std::atomic<uint64_t> _cold_backup_running_count;
std::atomic<uint64_t> _cold_backup_max_duration_time_ms;
std::atomic<uint64_t> _cold_backup_max_upload_file_size;
Expand Down
37 changes: 0 additions & 37 deletions src/dist/replication/lib/replica_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,42 +703,5 @@ void replica::set_backup_context_pause()
}

void replica::clear_cold_backup_state() { _cold_backup_contexts.clear(); }

void replica::collect_backup_info()
{
uint64_t cold_backup_running_count = 0;
uint64_t cold_backup_max_duration_time_ms = 0;
uint64_t cold_backup_max_upload_file_size = 0;
if (!_cold_backup_contexts.empty()) {
for (const auto &p : _cold_backup_contexts) {
const cold_backup_context_ptr &backup_context = p.second;
cold_backup_status backup_status = backup_context->status();
if (status() == partition_status::type::PS_PRIMARY) {
if (backup_status != ColdBackupInvalid && backup_status != ColdBackupCompleted &&
backup_status != ColdBackupCanceled && backup_status != ColdBackupFailed) {
cold_backup_running_count++;
}
} else if (status() == partition_status::type::PS_SECONDARY) {
if (backup_status != ColdBackupInvalid && backup_status != ColdBackupFailed &&
backup_status != ColdBackupCanceled &&
backup_status != ColdBackupCheckpointed) {
// secondary end backup with status ColdBackupCheckpointed
cold_backup_running_count++;
}
}

if (backup_status == ColdBackupUploading) {
cold_backup_max_duration_time_ms =
std::max(cold_backup_max_duration_time_ms,
(dsn_now_ms() - backup_context->get_start_time_ms()));
cold_backup_max_upload_file_size = std::max(cold_backup_max_upload_file_size,
backup_context->get_upload_file_size());
}
}
}
_cold_backup_running_count.store(cold_backup_running_count);
_cold_backup_max_duration_time_ms.store(cold_backup_max_duration_time_ms);
_cold_backup_max_upload_file_size.store(cold_backup_max_upload_file_size);
}
}
} // namespace
10 changes: 2 additions & 8 deletions src/dist/replication/lib/replica_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "mutation.h"
#include "mutation_log.h"
#include "replica_stub.h"
#include "backup/replica_backup_manager.h"
#include <dsn/utility/factory_store.h>
#include <dsn/utility/filesystem.h>
#include <dsn/dist/replication/replication_app_base.h>
Expand Down Expand Up @@ -351,14 +352,7 @@ error_code replica::init_app_and_prepare_list(bool create_new)
get_gpid().thread_hash());
}

if (_collect_info_timer == nullptr) {
_collect_info_timer =
tasking::enqueue_timer(LPC_PER_REPLICA_COLLECT_INFO_TIMER,
&_tracker,
[this]() { collect_backup_info(); },
std::chrono::milliseconds(_options->gc_interval_ms),
get_gpid().thread_hash());
}
_backup_mgr->start_collect_backup_info();
}
}

Expand Down

0 comments on commit de9bb3b

Please sign in to comment.