Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

fix(dup): don't GC by valid_start_offset during duplication & add app_name to replica_base #448

Merged
merged 5 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions include/dsn/dist/replication/replica_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,26 @@ namespace replication {
/// Base class for types that are one-instance-per-replica.
/// Holds the replica's gpid, replica name and app name; all three members are
/// const and therefore fixed at construction.
struct replica_base
{
// Builds a base from a gpid and a replica name. `_app_name` is not mentioned in the
// initializer list, so it is default-constructed (empty).
replica_base(gpid id, string_view name) : _gpid(id), _name(name) {}
// Builds a base from a gpid, a replica name and an app name.
replica_base(gpid id, string_view name, string_view app_name)
: _gpid(id), _name(name), _app_name(app_name)
{
}

// NOTE(review): the two `replica_base(replica_base *)` definitions below share the same
// signature. Presumably they are the before/after lines of a diff and only the
// three-field variant is meant to remain — confirm.
// Copies the gpid and replica name of `rhs` by delegating to the two-argument constructor.
explicit replica_base(replica_base *rhs) : replica_base(rhs->get_gpid(), rhs->replica_name()) {}
// Copies the gpid, replica name and app name of `rhs` by delegating to the
// three-argument constructor.
explicit replica_base(replica_base *rhs)
: replica_base(rhs->get_gpid(), rhs->replica_name(), rhs->_app_name)
{
}

// Returns the gpid given at construction.
gpid get_gpid() const { return _gpid; }

// Returns the replica name as a C string backed by the internal `_name` member.
const char *replica_name() const { return _name.c_str(); }

// Returns the app name as a C string backed by the internal `_app_name` member.
const char *app_name() const { return _app_name.c_str(); }

private:
const gpid _gpid;
const std::string _name;
const std::string _app_name;
};

} // namespace replication
Expand Down
3 changes: 2 additions & 1 deletion src/dist/replication/lib/duplication/mutation_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ mutation_batch::mutation_batch(replica_duplicator *r) : replica_base(r)
// Prepend a special tag identifying this is a mutation_batch,
// so `dxxx_replica` logging in prepare_list will print along with its real caller.
// This helps for debugging.
replica_base base(r->get_gpid(), std::string("mutation_batch@") + r->replica_name());
replica_base base(
r->get_gpid(), std::string("mutation_batch@") + r->replica_name(), r->app_name());
_mutation_buffer =
make_unique<prepare_list>(&base, 0, PREPARE_LIST_NUM_ENTRIES, [this](mutation_ptr &mu) {
// committer
Expand Down
14 changes: 7 additions & 7 deletions src/dist/replication/lib/duplication/replica_duplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,13 @@ void replica_duplicator::verify_start_decree(decree start_decree)
decree confirmed_decree = progress().confirmed_decree;
decree last_decree = progress().last_decree;
decree max_gced_decree = get_max_gced_decree();
dassert(max_gced_decree < start_decree,
"the logs haven't yet duplicated were accidentally truncated "
"[max_gced_decree: {}, start_decree: {}, confirmed_decree: {}, last_decree: {}]",
max_gced_decree,
start_decree,
confirmed_decree,
last_decree);
dassert_f(max_gced_decree < start_decree,
"the logs haven't yet duplicated were accidentally truncated "
"[max_gced_decree: {}, start_decree: {}, confirmed_decree: {}, last_decree: {}]",
max_gced_decree,
start_decree,
confirmed_decree,
last_decree);
}

decree replica_duplicator::get_max_gced_decree() const
Expand Down
43 changes: 19 additions & 24 deletions src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,7 @@ static bool should_reserve_file(log_file_ptr log,
}

int mutation_log::garbage_collection(gpid gpid,
decree durable_decree,
decree cleanable_decree,
int64_t valid_start_offset,
int64_t reserve_max_size,
int64_t reserve_max_time)
Expand Down Expand Up @@ -1252,27 +1252,25 @@ int mutation_log::garbage_collection(gpid gpid,

// log is invalid, ok to delete
else if (valid_start_offset >= log->end_offset()) {
dinfo("gc_private @ %d.%d: max_offset for %s is %" PRId64 " vs %" PRId64
" as app.valid_start_offset.private,"
" safe to delete this and all older logs",
_private_gpid.get_app_id(),
_private_gpid.get_partition_index(),
mark_it->second->path().c_str(),
mark_it->second->end_offset(),
valid_start_offset);
ddebug_f("gc_private @ {}: will remove files {} ~ log.{} because "
"valid_start_offset={} outdates log_end_offset={}",
_private_gpid,
files.begin()->second->path(),
log->index(),
valid_start_offset,
log->end_offset());
break;
}

// all decrees are durable, ok to delete
else if (durable_decree >= max_decree) {
dinfo("gc_private @ %d.%d: max_decree for %s is %" PRId64 " vs %" PRId64
" as app.durable decree,"
" safe to delete this and all older logs",
_private_gpid.get_app_id(),
_private_gpid.get_partition_index(),
mark_it->second->path().c_str(),
max_decree,
durable_decree);
// all mutations are cleanable, ok to delete
else if (cleanable_decree >= max_decree) {
ddebug_f("gc_private @ {}: will remove files {} ~ log.{} because "
"cleanable_decree={} outdates max_decree={}",
_private_gpid,
files.begin()->second->path(),
log->index(),
cleanable_decree,
max_decree);
break;
}

Expand All @@ -1296,7 +1294,7 @@ int mutation_log::garbage_collection(gpid gpid,
for (auto it = files.begin(); it != files.end() && it->second->index() <= largest_to_delete;
++it) {
log_file_ptr log = it->second;
dassert(it->first == log->index(), "%d VS %d", it->first, log->index());
dcheck_eq(it->first, log->index());

// close first
log->close();
Expand All @@ -1312,10 +1310,7 @@ int mutation_log::garbage_collection(gpid gpid,
}

// delete succeed
ddebug("gc_private @ %d.%d: log file %s is removed",
_private_gpid.get_app_id(),
_private_gpid.get_partition_index(),
fpath.c_str());
ddebug_f("gc_private @ {}: log file {} is removed", _private_gpid, fpath);
deleted++;

// erase from _log_files
Expand Down
3 changes: 2 additions & 1 deletion src/dist/replication/lib/mutation_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ class mutation_log : public ref_counter
// Decree of the maximum garbage-collected mutation.
// For example, given mutations [20, 100], if [20, 50] is garbage-collected,
// the max_gced_decree=50.
// In production the mutations may not be ordered with the file-id. Given 3 log files:
// In real-world cases, mutations may not be ordered by file-id.
// Given 3 log files:
// #1:[20, 30], #2:[30, 50], #3:[10, 50]
// The third file is learned from primary of new epoch. Since it contains mutations smaller
// than the others, the max_gced_decree = 9.
Expand Down
2 changes: 1 addition & 1 deletion src/dist/replication/lib/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace replication {
replica::replica(
replica_stub *stub, gpid gpid, const app_info &app, const char *dir, bool need_restore)
: serverlet<replica>("replica"),
replica_base(gpid, fmt::format("{}@{}", gpid, stub->_primary_address_str)),
replica_base(gpid, fmt::format("{}@{}", gpid, stub->_primary_address_str), app.app_name),
_app_info(app),
_primary_states(
gpid, stub->options().staleness_for_commit, stub->options().batch_write_disabled),
Expand Down
24 changes: 12 additions & 12 deletions src/dist/replication/lib/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,12 @@ void replica::on_checkpoint_timer()
decree last_durable_decree = _app->last_durable_decree();
decree min_confirmed_decree = _duplication_mgr->min_confirmed_decree();
decree cleanable_decree = last_durable_decree;
int64_t valid_start_offset = _app->init_info().init_offset_in_private_log;

if (min_confirmed_decree >= 0) {
// Do not rely on valid_start_offset for GC during duplication.
// cleanable_decree is the only GC trigger.
valid_start_offset = 0;
if (min_confirmed_decree < last_durable_decree) {
ddebug_replica("gc_private {}: delay gc for duplication: min_confirmed_decree({}) "
"last_durable_decree({})",
Expand All @@ -87,20 +92,15 @@ void replica::on_checkpoint_timer()
min_confirmed_decree,
last_durable_decree);
}
} else {
// protect the logs from being truncated
// if this app is in duplication
if (is_duplicating()) {
// unsure if the logs can be dropped, because min_confirmed_decree
// is currently unavailable
ddebug_replica(
"gc_private {}: skip gc because confirmed duplication progress is unknown",
enum_to_string(status()));
return;
}
} else if (is_duplicating()) {
// unsure if the logs can be dropped, because min_confirmed_decree
// is currently unavailable
ddebug_replica(
"gc_private {}: skip gc because confirmed duplication progress is unknown",
enum_to_string(status()));
return;
}

int64_t valid_start_offset = _app->init_info().init_offset_in_private_log;
tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS,
&_tracker,
[this, plog, cleanable_decree, valid_start_offset] {
Expand Down
5 changes: 0 additions & 5 deletions src/dist/replication/lib/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,6 @@ void replica_stub::install_perf_counters()
"dup.pending_mutations_count",
COUNTER_TYPE_VOLATILE_NUMBER,
"number of mutations pending for duplication");
_counter_dup_time_lag.init_app_counter(
"eon.replica_stub",
"dup.time_lag(ms)",
COUNTER_TYPE_NUMBER_PERCENTILES,
"time (in ms) lag between master and slave in the duplication");

// <- Cold Backup Metrics ->

Expand Down
1 change: 0 additions & 1 deletion src/dist/replication/lib/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,6 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
// if we need to duplicate to multiple clusters someday.
perf_counter_wrapper _counter_dup_confirmed_rate;
perf_counter_wrapper _counter_dup_pending_mutations_count;
perf_counter_wrapper _counter_dup_time_lag;

perf_counter_wrapper _counter_cold_backup_running_count;
perf_counter_wrapper _counter_cold_backup_recent_start_count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,55 @@ class mutation_log_test : public replica_test_base
ASSERT_EQ(std::string(lhs.data(), lhs.length()), std::string(rhs.data(), rhs.length()));
}

// return number of entries written
int generate_multiple_log_files(uint files_num = 3)
{
// decree ranges from [1, files_num*10)
for (int f = 0; f < files_num; f++) {
// each round mlog will replay the former logs, and create new file
mutation_log_ptr mlog = create_private_log();
for (int i = 1; i <= 10; i++) {
std::string msg = "hello!";
mutation_ptr mu = create_test_mutation(msg, 10 * f + i);
mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
}
mlog->tracker()->wait_outstanding_tasks();
mlog->close();
}
return static_cast<int>(files_num * 10);
}

mutation_log_ptr create_private_log() { return create_private_log(1); }

mutation_log_ptr create_private_log(int private_log_size_mb, decree replay_start_decree = 0)
{
gpid id = get_gpid();
std::map<gpid, decree> replay_condition;
replay_condition[id] = replay_start_decree;
mutation_log::replay_callback cb = [](int, mutation_ptr &) { return true; };
mutation_log_ptr mlog;

int try_cnt = 0;
while (try_cnt < 5) {
try_cnt++;
mlog = new mutation_log_private(
_replica->dir(), private_log_size_mb, id, _replica.get(), 1024, 512, 10000);
error_code err = mlog->open(cb, nullptr, replay_condition);
if (err == ERR_OK) {
break;
}
derror_f("mlog open failed, encountered error: {}", err);
}
EXPECT_NE(mlog, nullptr);
return mlog;
}

void test_replay_single_file(int num_entries)
{
std::vector<mutation_ptr> mutations;

{ // writing logs
mutation_log_ptr mlog = new mutation_log_private(
_log_dir, 1024, get_gpid(), _replica.get(), 1024, 512, 10000);

EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK);
mutation_log_ptr mlog = create_private_log();

for (int i = 0; i < num_entries; i++) {
mutation_ptr mu = create_test_mutation("hello!", 2 + i);
Expand Down Expand Up @@ -354,10 +394,7 @@ class mutation_log_test : public replica_test_base
std::vector<mutation_ptr> mutations;

{ // writing logs
mutation_log_ptr mlog = new mutation_log_private(
_log_dir, private_log_file_size_mb, get_gpid(), _replica.get(), 1024, 512, 10000);
EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK);

mutation_log_ptr mlog = create_private_log(private_log_file_size_mb);
for (int i = 0; i < num_entries; i++) {
mutation_ptr mu = create_test_mutation("hello!", 2 + i);
mutations.push_back(mu);
Expand All @@ -366,8 +403,7 @@ class mutation_log_test : public replica_test_base
}

{ // reading logs
mutation_log_ptr mlog =
new mutation_log_private(_log_dir, 4, get_gpid(), _replica.get(), 1024, 512, 10000);
mutation_log_ptr mlog = create_private_log(private_log_file_size_mb);

std::vector<std::string> log_files;
ASSERT_TRUE(utils::filesystem::get_subfiles(mlog->dir(), log_files, false));
Expand Down Expand Up @@ -411,10 +447,7 @@ TEST_F(mutation_log_test, open)
std::vector<mutation_ptr> mutations;

{ // writing logs
mutation_log_ptr mlog =
new mutation_log_private(_log_dir, 4, get_gpid(), _replica.get(), 1024, 512, 10000);

EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK);
mutation_log_ptr mlog = create_private_log(4);

for (int i = 0; i < 1000; i++) {
mutation_ptr mu = create_test_mutation("hello!", 2 + i);
Expand Down Expand Up @@ -449,5 +482,18 @@ TEST_F(mutation_log_test, replay_multiple_files_20000_1mb) { test_replay_multipl

TEST_F(mutation_log_test, replay_multiple_files_50000_1mb) { test_replay_multiple_files(50000, 1); }

// Verifies that opening the private log with a replay start decree beyond the first
// file's mutations still keeps all log files in the file map.
TEST_F(mutation_log_test, replay_start_decree)
{
    // 3 files with 10 mutations each: decrees cover the inclusive range [1, 30].
    generate_multiple_log_files(3);

    decree replay_start_decree = 11; // start replay from second file, the first file is ignored.
    mutation_log_ptr mlog = create_private_log(1, replay_start_decree);

    // ensure the first file is not stripped out.
    ASSERT_EQ(mlog->max_gced_decree(get_gpid()), 0);
    // Unsigned literal: size() is unsigned, and a signed literal here makes gtest's
    // comparison helper emit a sign-compare warning.
    ASSERT_EQ(mlog->get_log_file_map().size(), 3u);
}

} // namespace replication
} // namespace dsn