Skip to content

Commit

Permalink
Cherry-pick: Wal debug 6.29.tikv (tikv#2)
Browse files Browse the repository at this point in the history
Signed-off-by: Qi Xu <[email protected]>
Co-authored-by: Qi Xu <[email protected]>
  • Loading branch information
2 people authored and mittalrishabh committed Apr 3, 2024
1 parent ce0ea61 commit 60df0d6
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 9 deletions.
17 changes: 15 additions & 2 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1471,13 +1471,19 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
auto& wal = *it;
assert(wal.IsSyncing());

ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Synced log %" PRIu64 " from logs_\n", wal.number);
if (logs_.size() > 1) {
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
wal.GetPreSyncSize() > 0) {
synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
}
logs_to_free_.push_back(wal.ReleaseWriter());
auto writer = wal.ReleaseWriter();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"deleting log %" PRIu64
" from logs_. Last Seq number of the WAL is %" PRIu64 "\n",
wal.number, writer->GetLastSequence());
logs_to_free_.push_back(writer);
it = logs_.erase(it);
} else {
wal.FinishSync();
Expand All @@ -1491,12 +1497,19 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,

void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
log_write_mutex_.AssertHeld();
uint64_t min_wal = 0;
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
++it) {
auto& wal = *it;
if (min_wal == 0) {
min_wal = it->number;
}
wal.FinishSync();
}
log_sync_cv_.SignalAll();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"MarkLogsNotSynced from %" PRIu64 " to %" PRIu64 "\n", min_wal,
up_to);
}

SequenceNumber DBImpl::GetLatestSequenceNumber() const {
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1822,7 +1822,7 @@ class DBImpl : public DB {

IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
uint64_t* log_used, uint64_t* log_size,
LogFileNumberSize& log_file_number_size);
LogFileNumberSize& log_file_number_size, int caller_id);

IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
Expand Down
13 changes: 12 additions & 1 deletion db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
earliest.number);
log_recycle_files_.push_back(earliest.number);
} else {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"deleting WAL log %" PRIu64 "\n", earliest.number);
job_context->log_delete_files.push_back(earliest.number);
}
if (job_context->size_log_to_delete == 0) {
Expand All @@ -317,7 +319,12 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// logs_ could have changed while we were waiting.
continue;
}
logs_to_free_.push_back(log.ReleaseWriter());
auto writer = log.ReleaseWriter();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"deleting log %" PRIu64
" from logs_, last seq number of WAL %" PRIu64 "\n",
log.number, writer->GetLastSequence());
logs_to_free_.push_back(writer);
logs_.pop_front();
}
// Current log cannot be obsolete.
Expand Down Expand Up @@ -491,6 +498,10 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
// Close WALs before trying to delete them.
for (const auto w : state.logs_to_free) {
// TODO: maybe check the return value of Close.
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Close log %" PRIu64
" from logs_, last Seq number in WAL %" PRIu64 "\n",
w->get_log_number(), w->GetLastSequence());
auto s = w->Close();
s.PermitUncheckedError();
}
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1735,7 +1735,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
assert(log_writer->get_log_number() == log_file_number_size.number);
impl->mutex_.AssertHeld();
s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size,
log_file_number_size);
log_file_number_size, 0);
if (s.ok()) {
// Need to fsync, otherwise it might get lost after a power reset.
s = impl->FlushWAL(false);
Expand Down
22 changes: 18 additions & 4 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1412,10 +1412,23 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
log::Writer* log_writer, uint64_t* log_used,
uint64_t* log_size,
LogFileNumberSize& log_file_number_size) {
LogFileNumberSize& log_file_number_size,
int caller_id) {
assert(log_size != nullptr);

if (log_writer->file()->GetFileSize() == 0) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Start writing to WAL: [%" PRIu64 " ]",
log_writer->get_log_number());
}
if (log_writer->get_log_number() != logs_.back().number) {
ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Not writing to latest WAL: [%" PRIu64 ", %" PRIu64 "] CallerId: %d",
log_writer->get_log_number(), logs_.back().number, caller_id);
}
Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
SequenceNumber seq = WriteBatchInternal::Sequence(&merged_batch);
log_writer->SetLastSequence(seq);
*log_size = log_entry.size();
// When two_write_queues_ WriteToWAL has to be protected from concurretn calls
// from the two queues anyway and log_write_mutex_ is already held. Otherwise
Expand Down Expand Up @@ -1468,7 +1481,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,

uint64_t log_size;
io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
log_file_number_size);
log_file_number_size, 1);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
Expand Down Expand Up @@ -1530,6 +1543,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
}

return io_s;
}

Expand Down Expand Up @@ -1569,7 +1583,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL(

uint64_t log_size;
io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
log_file_number_size);
log_file_number_size, 2);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
Expand Down
1 change: 1 addition & 0 deletions db/log_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
char t = static_cast<char>(i);
type_crc_[i] = crc32c::Value(&t, 1);
}
last_seq_ = 0;
}

Writer::~Writer() {
Expand Down
6 changes: 6 additions & 0 deletions db/log_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rocksdb/io_status.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -92,11 +93,16 @@ class Writer {

bool TEST_BufferIsEmpty();

void SetLastSequence(SequenceNumber seq) { last_seq_ = seq; }

SequenceNumber GetLastSequence() const { return last_seq_; }

private:
std::unique_ptr<WritableFileWriter> dest_;
size_t block_offset_; // Current offset in block
uint64_t log_number_;
bool recycle_log_files_;
SequenceNumber last_seq_;

// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the
Expand Down

0 comments on commit 60df0d6

Please sign in to comment.