diff --git a/include/dsn/dist/replication/replica_base.h b/include/dsn/dist/replication/replica_base.h index 9b1a7fb88d..36fb87ddc5 100644 --- a/include/dsn/dist/replication/replica_base.h +++ b/include/dsn/dist/replication/replica_base.h @@ -35,17 +35,26 @@ namespace replication { /// Base class for types that are one-instance-per-replica. struct replica_base { - replica_base(gpid id, string_view name) : _gpid(id), _name(name) {} + replica_base(gpid id, string_view name, string_view app_name) + : _gpid(id), _name(name), _app_name(app_name) + { + } - explicit replica_base(replica_base *rhs) : replica_base(rhs->get_gpid(), rhs->replica_name()) {} + explicit replica_base(replica_base *rhs) + : replica_base(rhs->get_gpid(), rhs->replica_name(), rhs->_app_name) + { + } gpid get_gpid() const { return _gpid; } const char *replica_name() const { return _name.c_str(); } + const char *app_name() const { return _app_name.c_str(); } + private: const gpid _gpid; const std::string _name; + const std::string _app_name; }; } // namespace replication diff --git a/src/dist/replication/lib/duplication/mutation_batch.cpp b/src/dist/replication/lib/duplication/mutation_batch.cpp index 980db26afe..fcb5a2be30 100644 --- a/src/dist/replication/lib/duplication/mutation_batch.cpp +++ b/src/dist/replication/lib/duplication/mutation_batch.cpp @@ -80,7 +80,8 @@ mutation_batch::mutation_batch(replica_duplicator *r) : replica_base(r) // Prepend a special tag identifying this is a mutation_batch, // so `dxxx_replica` logging in prepare_list will print along with its real caller. // This helps for debugging. - replica_base base(r->get_gpid(), std::string("mutation_batch@") + r->replica_name()); + replica_base base( + r->get_gpid(), std::string("mutation_batch@") + r->replica_name(), r->app_name()); _mutation_buffer = make_unique(&base, 0, PREPARE_LIST_NUM_ENTRIES, [this](mutation_ptr &mu) { // committer diff --git a/src/dist/replication/lib/duplication/replica_duplicator.cpp b/src/dist/replication/lib/duplication/replica_duplicator.cpp index 59af3022d5..5a15cf6988 100644 --- a/src/dist/replication/lib/duplication/replica_duplicator.cpp +++ b/src/dist/replication/lib/duplication/replica_duplicator.cpp @@ -154,13 +154,13 @@ void replica_duplicator::verify_start_decree(decree start_decree) decree confirmed_decree = progress().confirmed_decree; decree last_decree = progress().last_decree; decree max_gced_decree = get_max_gced_decree(); - dassert(max_gced_decree < start_decree, - "the logs haven't yet duplicated were accidentally truncated " - "[max_gced_decree: {}, start_decree: {}, confirmed_decree: {}, last_decree: {}]", - max_gced_decree, - start_decree, - confirmed_decree, - last_decree); + dassert_f(max_gced_decree < start_decree, + "the logs haven't yet duplicated were accidentally truncated " + "[max_gced_decree: {}, start_decree: {}, confirmed_decree: {}, last_decree: {}]", + max_gced_decree, + start_decree, + confirmed_decree, + last_decree); } decree replica_duplicator::get_max_gced_decree() const diff --git a/src/dist/replication/lib/mutation_log.cpp b/src/dist/replication/lib/mutation_log.cpp index 46b86f29db..c1f7f9ab7c 100644 --- a/src/dist/replication/lib/mutation_log.cpp +++ b/src/dist/replication/lib/mutation_log.cpp @@ -1201,7 +1201,7 @@ static bool should_reserve_file(log_file_ptr log, } int mutation_log::garbage_collection(gpid gpid, - decree durable_decree, + decree cleanable_decree, int64_t valid_start_offset, int64_t reserve_max_size, int64_t reserve_max_time) @@ -1252,27 +1252,25 @@ int mutation_log::garbage_collection(gpid gpid, // log is invalid, ok to delete else if (valid_start_offset >= log->end_offset()) { - dinfo("gc_private @ %d.%d: max_offset for %s is %" PRId64 " vs %" PRId64 - " as app.valid_start_offset.private," - " safe to delete this and all older logs", - _private_gpid.get_app_id(), - _private_gpid.get_partition_index(), - mark_it->second->path().c_str(), - mark_it->second->end_offset(), - valid_start_offset); + ddebug_f("gc_private @ {}: will remove files {} ~ log.{} because " + "valid_start_offset={} outdates log_end_offset={}", + _private_gpid, + files.begin()->second->path(), + log->index(), + valid_start_offset, + log->end_offset()); break; } - // all decrees are durable, ok to delete - else if (durable_decree >= max_decree) { - dinfo("gc_private @ %d.%d: max_decree for %s is %" PRId64 " vs %" PRId64 - " as app.durable decree," - " safe to delete this and all older logs", - _private_gpid.get_app_id(), - _private_gpid.get_partition_index(), - mark_it->second->path().c_str(), - max_decree, - durable_decree); + // all mutations are cleanable, ok to delete + else if (cleanable_decree >= max_decree) { + ddebug_f("gc_private @ {}: will remove files {} ~ log.{} because " + "cleanable_decree={} outdates max_decree={}", + _private_gpid, + files.begin()->second->path(), + log->index(), + cleanable_decree, + max_decree); break; } @@ -1296,7 +1294,7 @@ int mutation_log::garbage_collection(gpid gpid, for (auto it = files.begin(); it != files.end() && it->second->index() <= largest_to_delete; ++it) { log_file_ptr log = it->second; - dassert(it->first == log->index(), "%d VS %d", it->first, log->index()); + dcheck_eq(it->first, log->index()); // close first log->close(); @@ -1312,10 +1310,7 @@ int mutation_log::garbage_collection(gpid gpid, } // delete succeed - ddebug("gc_private @ %d.%d: log file %s is removed", - _private_gpid.get_app_id(), - _private_gpid.get_partition_index(), - fpath.c_str()); + ddebug_f("gc_private @ {}: log file {} is removed", _private_gpid, fpath); deleted++; // erase from _log_files diff --git a/src/dist/replication/lib/mutation_log.h b/src/dist/replication/lib/mutation_log.h index acbfb6faea..116b00faf5 100644 --- a/src/dist/replication/lib/mutation_log.h +++ b/src/dist/replication/lib/mutation_log.h @@ -300,7 +300,8 @@ class mutation_log : public ref_counter // Decree of the maximum garbage-collected mutation. // For example, given mutations [20, 100], if [20, 50] is garbage-collected, // the max_gced_decree=50. - // In production the mutations may not be ordered with the file-id. Given 3 log files: + // Under the real-world cases, the mutations may not be ordered with the file-id. + // Given 3 log files: // #1:[20, 30], #2:[30, 50], #3:[10, 50] // The third file is learned from primary of new epoch. Since it contains mutations smaller // than the others, the max_gced_decree = 9. diff --git a/src/dist/replication/lib/replica.cpp b/src/dist/replication/lib/replica.cpp index 16e82d0557..61e9973583 100644 --- a/src/dist/replication/lib/replica.cpp +++ b/src/dist/replication/lib/replica.cpp @@ -45,7 +45,7 @@ namespace replication { replica::replica( replica_stub *stub, gpid gpid, const app_info &app, const char *dir, bool need_restore) : serverlet("replica"), - replica_base(gpid, fmt::format("{}@{}", gpid, stub->_primary_address_str)), + replica_base(gpid, fmt::format("{}@{}", gpid, stub->_primary_address_str), app.app_name), _app_info(app), _primary_states( gpid, stub->options().staleness_for_commit, stub->options().batch_write_disabled), diff --git a/src/dist/replication/lib/replica_chkpt.cpp b/src/dist/replication/lib/replica_chkpt.cpp index f60748fc17..dfefdd5859 100644 --- a/src/dist/replication/lib/replica_chkpt.cpp +++ b/src/dist/replication/lib/replica_chkpt.cpp @@ -73,7 +73,12 @@ void replica::on_checkpoint_timer() decree last_durable_decree = _app->last_durable_decree(); decree min_confirmed_decree = _duplication_mgr->min_confirmed_decree(); decree cleanable_decree = last_durable_decree; + int64_t valid_start_offset = _app->init_info().init_offset_in_private_log; + if (min_confirmed_decree >= 0) { + // Do not rely on valid_start_offset for GC during duplication. + // cleanable_decree is the only GC trigger. + valid_start_offset = 0; if (min_confirmed_decree < last_durable_decree) { ddebug_replica("gc_private {}: delay gc for duplication: min_confirmed_decree({}) " "last_durable_decree({})", @@ -87,20 +92,15 @@ void replica::on_checkpoint_timer() min_confirmed_decree, last_durable_decree); } - } else { - // protect the logs from being truncated - // if this app is in duplication - if (is_duplicating()) { - // unsure if the logs can be dropped, because min_confirmed_decree - // is currently unavailable - ddebug_replica( - "gc_private {}: skip gc because confirmed duplication progress is unknown", - enum_to_string(status())); - return; - } + } else if (is_duplicating()) { + // unsure if the logs can be dropped, because min_confirmed_decree + // is currently unavailable + ddebug_replica( + "gc_private {}: skip gc because confirmed duplication progress is unknown", + enum_to_string(status())); + return; } - int64_t valid_start_offset = _app->init_info().init_offset_in_private_log; tasking::enqueue(LPC_GARBAGE_COLLECT_LOGS_AND_REPLICAS, &_tracker, [this, plog, cleanable_decree, valid_start_offset] { diff --git a/src/dist/replication/lib/replica_stub.cpp b/src/dist/replication/lib/replica_stub.cpp index 36ab9683bd..a5851d041a 100644 --- a/src/dist/replication/lib/replica_stub.cpp +++ b/src/dist/replication/lib/replica_stub.cpp @@ -242,11 +242,6 @@ void replica_stub::install_perf_counters() "dup.pending_mutations_count", COUNTER_TYPE_VOLATILE_NUMBER, "number of mutations pending for duplication"); - _counter_dup_time_lag.init_app_counter( - "eon.replica_stub", - "dup.time_lag(ms)", - COUNTER_TYPE_NUMBER_PERCENTILES, - "time (in ms) lag between master and slave in the duplication"); // <- Cold Backup Metrics -> diff --git a/src/dist/replication/lib/replica_stub.h b/src/dist/replication/lib/replica_stub.h index 2c62a3a4a9..458b18729d 100644 --- a/src/dist/replication/lib/replica_stub.h +++ b/src/dist/replication/lib/replica_stub.h @@ -398,7 +398,6 @@ class replica_stub : public serverlet, public ref_counter // if we need to duplicate to multiple clusters someday. perf_counter_wrapper _counter_dup_confirmed_rate; perf_counter_wrapper _counter_dup_pending_mutations_count; - perf_counter_wrapper _counter_dup_time_lag; perf_counter_wrapper _counter_cold_backup_running_count; perf_counter_wrapper _counter_cold_backup_recent_start_count; diff --git a/src/dist/replication/test/replica_test/unit_test/mutation_log_test.cpp b/src/dist/replication/test/replica_test/unit_test/mutation_log_test.cpp index 644eb0ca92..8a8dbd6b87 100644 --- a/src/dist/replication/test/replica_test/unit_test/mutation_log_test.cpp +++ b/src/dist/replication/test/replica_test/unit_test/mutation_log_test.cpp @@ -307,15 +307,55 @@ class mutation_log_test : public replica_test_base ASSERT_EQ(std::string(lhs.data(), lhs.length()), std::string(rhs.data(), rhs.length())); } + // return number of entries written + int generate_multiple_log_files(uint files_num = 3) + { + // decree ranges from [1, files_num*10) + for (int f = 0; f < files_num; f++) { + // each round mlog will replay the former logs, and create new file + mutation_log_ptr mlog = create_private_log(); + for (int i = 1; i <= 10; i++) { + std::string msg = "hello!"; + mutation_ptr mu = create_test_mutation(msg, 10 * f + i); + mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0); + } + mlog->tracker()->wait_outstanding_tasks(); + mlog->close(); + } + return static_cast(files_num * 10); + } + + mutation_log_ptr create_private_log() { return create_private_log(1); } + + mutation_log_ptr create_private_log(int private_log_size_mb, decree replay_start_decree = 0) + { + gpid id = get_gpid(); + std::map replay_condition; + replay_condition[id] = replay_start_decree; + mutation_log::replay_callback cb = [](int, mutation_ptr &) { return true; }; + mutation_log_ptr mlog; + + int try_cnt = 0; + while (try_cnt < 5) { + try_cnt++; + mlog = new mutation_log_private( + _replica->dir(), private_log_size_mb, id, _replica.get(), 1024, 512, 10000); + error_code err = mlog->open(cb, nullptr, replay_condition); + if (err == ERR_OK) { + break; + } + derror_f("mlog open failed, encountered error: {}", err); + } + EXPECT_NE(mlog, nullptr); + return mlog; + } + void test_replay_single_file(int num_entries) { std::vector mutations; { // writing logs - mutation_log_ptr mlog = new mutation_log_private( - _log_dir, 1024, get_gpid(), _replica.get(), 1024, 512, 10000); - - EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); + mutation_log_ptr mlog = create_private_log(); for (int i = 0; i < num_entries; i++) { mutation_ptr mu = create_test_mutation("hello!", 2 + i); @@ -354,10 +394,7 @@ class mutation_log_test : public replica_test_base std::vector mutations; { // writing logs - mutation_log_ptr mlog = new mutation_log_private( - _log_dir, private_log_file_size_mb, get_gpid(), _replica.get(), 1024, 512, 10000); - EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); - + mutation_log_ptr mlog = create_private_log(private_log_file_size_mb); for (int i = 0; i < num_entries; i++) { mutation_ptr mu = create_test_mutation("hello!", 2 + i); mutations.push_back(mu); @@ -366,8 +403,7 @@ class mutation_log_test : public replica_test_base } { // reading logs - mutation_log_ptr mlog = - new mutation_log_private(_log_dir, 4, get_gpid(), _replica.get(), 1024, 512, 10000); + mutation_log_ptr mlog = create_private_log(private_log_file_size_mb); std::vector log_files; ASSERT_TRUE(utils::filesystem::get_subfiles(mlog->dir(), log_files, false)); @@ -411,10 +447,7 @@ TEST_F(mutation_log_test, open) std::vector mutations; { // writing logs - mutation_log_ptr mlog = - new mutation_log_private(_log_dir, 4, get_gpid(), _replica.get(), 1024, 512, 10000); - - EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK); + mutation_log_ptr mlog = create_private_log(4); for (int i = 0; i < 1000; i++) { mutation_ptr mu = create_test_mutation("hello!", 2 + i); @@ -449,5 +482,18 @@ TEST_F(mutation_log_test, replay_multiple_files_20000_1mb) { test_replay_multipl TEST_F(mutation_log_test, replay_multiple_files_50000_1mb) { test_replay_multiple_files(50000, 1); } +TEST_F(mutation_log_test, replay_start_decree) +{ + // decree ranges from [1, 30) + generate_multiple_log_files(3); + + decree replay_start_decree = 11; // start replay from second file, the first file is ignored. + mutation_log_ptr mlog = create_private_log(1, replay_start_decree); + + // ensure the first file is not stripped out. + ASSERT_EQ(mlog->max_gced_decree(get_gpid()), 0); + ASSERT_EQ(mlog->get_log_file_map().size(), 3); +} + } // namespace replication } // namespace dsn