Skip to content

Commit

Permalink
[SKV-636] fix: fix a crash bug when open an incomplete created RocksD…
Browse files Browse the repository at this point in the history
…B instance (apache#1451)

对应社区commit: https://github.com/apache/incubator-pegasus/pull/1451/files

注: 由于单测部分变更较大,本次未合入

apache#1450

If replica server attempt to open an incomplete RocksDB instance (maybe caused
by a previous crash), it will crash before moving the incomplete path to ".err"
trash path, and it will crash again if restart the server.

This patch avoid to crash before moving the incomplete RocksDB path to ".err" path,
thus the replica has an opportunity to recovery automatically without move the
incomplete RocksDB path manually.
  • Loading branch information
acelyc111 authored and 王聃 committed May 5, 2023
1 parent 38d4da0 commit 4c24c98
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 34 deletions.
31 changes: 16 additions & 15 deletions src/server/meta_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,30 @@ meta_store::meta_store(pegasus_server_impl *server,
_wt_opts.disableWAL = true;
}

uint64_t meta_store::get_last_flushed_decree() const
dsn::error_code meta_store::get_last_flushed_decree(uint64_t *decree) const
{
uint64_t last_flushed_decree = 0;
auto ec = get_value_from_meta_cf(true, LAST_FLUSHED_DECREE, &last_flushed_decree);
dcheck_eq_replica(::dsn::ERR_OK, ec);
return last_flushed_decree;
LOG_AND_RETURN_NOT_OK(error_replica,
get_value_from_meta_cf(true, LAST_FLUSHED_DECREE, decree),
"get_value_from_meta_cf failed");
return dsn::ERR_OK;
}

uint32_t meta_store::get_data_version() const
dsn::error_code meta_store::get_data_version(uint32_t *version) const
{
uint64_t pegasus_data_version = 0;
auto ec = get_value_from_meta_cf(false, DATA_VERSION, &pegasus_data_version);
dcheck_eq_replica(::dsn::ERR_OK, ec);
return static_cast<uint32_t>(pegasus_data_version);
LOG_AND_RETURN_NOT_OK(error_replica,
get_value_from_meta_cf(false, DATA_VERSION, &pegasus_data_version),
"get_value_from_meta_cf failed");
*version = static_cast<uint32_t>(pegasus_data_version);
return dsn::ERR_OK;
}

uint64_t meta_store::get_last_manual_compact_finish_time() const
dsn::error_code meta_store::get_last_manual_compact_finish_time(uint64_t *ts) const
{
uint64_t last_manual_compact_finish_time = 0;
auto ec = get_value_from_meta_cf(
false, LAST_MANUAL_COMPACT_FINISH_TIME, &last_manual_compact_finish_time);
dcheck_eq_replica(::dsn::ERR_OK, ec);
return last_manual_compact_finish_time;
LOG_AND_RETURN_NOT_OK(error_replica,
get_value_from_meta_cf(false, LAST_MANUAL_COMPACT_FINISH_TIME, ts),
"get_value_from_meta_cf failed");
return dsn::ERR_OK;
}

uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB *db,
Expand Down
6 changes: 3 additions & 3 deletions src/server/meta_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ class meta_store : public dsn::replication::replica_base
public:
meta_store(pegasus_server_impl *server, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf);

uint64_t get_last_flushed_decree() const;
dsn::error_code get_last_flushed_decree(uint64_t *decree) const;
uint64_t get_decree_from_readonly_db(rocksdb::DB *db,
rocksdb::ColumnFamilyHandle *meta_cf) const;
uint32_t get_data_version() const;
uint64_t get_last_manual_compact_finish_time() const;
dsn::error_code get_data_version(uint32_t *version) const;
dsn::error_code get_last_manual_compact_finish_time(uint64_t *ts) const;
std::string get_usage_scenario() const;

void set_last_flushed_decree(uint64_t decree) const;
Expand Down
56 changes: 40 additions & 16 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <rocksdb/utilities/checkpoint.h>
#include <rocksdb/utilities/options_util.h>
#include <dsn/utility/chrono_literals.h>
#include <dsn/utility/defer.h>
#include <dsn/utility/utils.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/string_conv.h>
Expand Down Expand Up @@ -1574,9 +1575,10 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
// When DB exists, meta CF and data CF must be present.
bool missing_meta_cf = true;
bool missing_data_cf = true;
if (check_column_families(rdb_path, &missing_meta_cf, &missing_data_cf) != dsn::ERR_OK) {
auto ec = check_column_families(rdb_path, &missing_meta_cf, &missing_data_cf);
if (ec != dsn::ERR_OK) {
derror_replica("check column families failed");
return dsn::ERR_LOCAL_APP_FAILURE;
return ec;
}
dassert_replica(!missing_meta_cf, "You must upgrade Pegasus server from 2.0");
dassert_replica(!missing_data_cf, "Missing data column family");
Expand Down Expand Up @@ -1652,19 +1654,29 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
_meta_store = dsn::make_unique<meta_store>(this, _db, _meta_cf);

if (db_exist) {
_last_committed_decree = _meta_store->get_last_flushed_decree();
_pegasus_data_version = _meta_store->get_data_version();
auto cleanup = dsn::defer([this]() { release_db(); });
uint64_t decree = 0;
LOG_AND_RETURN_NOT_OK(error_replica,
_meta_store->get_last_flushed_decree(&decree),
"get_last_flushed_decree failed");
_last_committed_decree.store(static_cast<int64_t>(decree));
LOG_AND_RETURN_NOT_OK(error_replica,
_meta_store->get_data_version(&_pegasus_data_version),
"get_data_version failed");
_usage_scenario = _meta_store->get_usage_scenario();
uint64_t last_manual_compact_finish_time =
_meta_store->get_last_manual_compact_finish_time();
if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
derror_replica("open app failed, unsupported data version {}", _pegasus_data_version);
release_db();
return dsn::ERR_LOCAL_APP_FAILURE;
}

uint64_t last_manual_compact_finish_time = 0;
LOG_AND_RETURN_NOT_OK(
error_replica,
_meta_store->get_last_manual_compact_finish_time(&last_manual_compact_finish_time),
"get_last_manual_compact_finish_time failed");
LOG_AND_RETURN_NOT_TRUE(error_replica,
_pegasus_data_version <= PEGASUS_DATA_VERSION_MAX,
dsn::ERR_LOCAL_APP_FAILURE,
"open app failed, unsupported data version {}",
_pegasus_data_version);
// update last manual compact finish timestamp
_manual_compact_svc.init_last_finish_time_ms(last_manual_compact_finish_time);
cleanup.cancel();
} else {
// Write initial meta data to meta CF and flush when create new DB.
_meta_store->set_data_version(PEGASUS_DATA_VERSION_MAX);
Expand Down Expand Up @@ -1902,7 +1914,8 @@ ::dsn::error_code pegasus_server_impl::sync_checkpoint()
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
dcheck_gt_replica(last_commit, last_durable_decree());
int64_t last_flushed = static_cast<int64_t>(_meta_store->get_last_flushed_decree());
uint64_t last_flushed = 0;
dcheck_eq_replica(::dsn::ERR_OK, _meta_store->get_last_flushed_decree(&last_flushed));
dcheck_eq_replica(last_commit, last_flushed);
if (!_checkpoints.empty()) {
dcheck_gt_replica(last_commit, _checkpoints.back());
Expand All @@ -1927,7 +1940,8 @@ ::dsn::error_code pegasus_server_impl::async_checkpoint(bool flush_memtable)
return ::dsn::ERR_WRONG_TIMING;

int64_t last_durable = last_durable_decree();
int64_t last_flushed = static_cast<int64_t>(_meta_store->get_last_flushed_decree());
uint64_t last_flushed = 0;
dcheck_eq_replica(::dsn::ERR_OK, _meta_store->get_last_flushed_decree(&last_flushed));
int64_t last_commit = last_committed_decree();

dcheck_le_replica(last_durable, last_flushed);
Expand Down Expand Up @@ -3090,6 +3104,11 @@ ::dsn::error_code pegasus_server_impl::check_column_families(const std::string &
return ::dsn::ERR_LOCAL_APP_FAILURE;
}

if (column_families.empty()) {
derror_replica("column families are empty");
return ::dsn::ERR_LOCAL_APP_FAILURE;
}

for (const auto &column_family : column_families) {
if (column_family == META_COLUMN_FAMILY_NAME) {
*missing_meta_cf = false;
Expand Down Expand Up @@ -3143,7 +3162,11 @@ uint64_t pegasus_server_impl::do_manual_compact(const rocksdb::CompactRangeOptio
// update rocksdb statistics immediately
update_replica_rocksdb_statistics();

return _meta_store->get_last_manual_compact_finish_time();
uint64_t last_manual_compact_finish_time = 0;
dcheck_eq_replica(
::dsn::ERR_OK,
_meta_store->get_last_manual_compact_finish_time(&last_manual_compact_finish_time));
return last_manual_compact_finish_time;
}

bool pegasus_server_impl::release_storage_after_manual_compact()
Expand All @@ -3169,7 +3192,8 @@ bool pegasus_server_impl::release_storage_after_manual_compact()
gc_checkpoints(true);
ddebug_replica("finish gc_checkpoints, time_used = {}ms", dsn_now_ms() - start_time);

int64_t new_last_durable = _meta_store->get_last_flushed_decree();
uint64_t new_last_durable = 0;
dcheck_eq_replica(::dsn::ERR_OK, _meta_store->get_last_flushed_decree(&new_last_durable));
if (new_last_durable > old_last_durable) {
ddebug_replica("release storage succeed, last_durable_decree changed from {} to {}",
old_last_durable,
Expand Down

0 comments on commit 4c24c98

Please sign in to comment.