Revert "test: fix some perf context position"
This reverts commit b84bd77.

Signed-off-by: Little-Wallace <[email protected]>
Little-Wallace committed Jan 27, 2022
1 parent b84bd77 commit b914b6b
Showing 18 changed files with 85 additions and 95 deletions.
71 changes: 35 additions & 36 deletions db/c.cc
@@ -12,11 +12,6 @@
#include "rocksdb/c.h"

#include <stdlib.h>

#include <map>
#include <unordered_set>
#include <vector>

#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
@@ -49,77 +44,81 @@
#include "utilities/merge_operators.h"
#include "utilities/rate_limiters/write_amp_based_rate_limiter.h"

using rocksdb::BackupableDBOptions;
using rocksdb::BackupEngine;
using rocksdb::BackupID;
using rocksdb::BackupInfo;
using rocksdb::BatchResult;
using rocksdb::BlockBasedTableOptions;
using rocksdb::BottommostLevelCompaction;
#include <vector>
#include <unordered_set>
#include <map>

using rocksdb::BytewiseComparator;
using rocksdb::Cache;
using rocksdb::CheckPerfFlag;
using rocksdb::Checkpoint;
using rocksdb::ColumnFamilyDescriptor;
using rocksdb::ColumnFamilyHandle;
using rocksdb::ColumnFamilyOptions;
using rocksdb::CompactionFilter;
using rocksdb::CompactionFilterFactory;
using rocksdb::CompactionOptionsFIFO;
using rocksdb::CompactRangeOptions;
using rocksdb::Comparator;
using rocksdb::CompressionType;
using rocksdb::CuckooTableOptions;
using rocksdb::WALRecoveryMode;
using rocksdb::DB;
using rocksdb::DBOptions;
using rocksdb::DbPath;
using rocksdb::DisablePerfFlag;
using rocksdb::EnablePerfFlag;
using rocksdb::Env;
using rocksdb::EnvOptions;
using rocksdb::InfoLogLevel;
using rocksdb::FileLock;
using rocksdb::FilterPolicy;
using rocksdb::FlushOptions;
using rocksdb::InfoLogLevel;
using rocksdb::IngestExternalFileOptions;
using rocksdb::Iterator;
using rocksdb::LiveFileMetaData;
using rocksdb::Logger;
using rocksdb::MemoryUtil;
using rocksdb::MergeOperator;
using rocksdb::MergeOperators;
using rocksdb::NewBloomFilterPolicy;
using rocksdb::NewGenericRateLimiter;
using rocksdb::NewLRUCache;
using rocksdb::NewWriteAmpBasedRateLimiter;
using rocksdb::OptimisticTransactionDB;
using rocksdb::OptimisticTransactionOptions;
using rocksdb::Options;
using rocksdb::PerfContext;
using rocksdb::PerfLevel;
using rocksdb::PinnableSlice;
using rocksdb::BlockBasedTableOptions;
using rocksdb::CuckooTableOptions;
using rocksdb::RandomAccessFile;
using rocksdb::Range;
using rocksdb::RateLimiter;
using rocksdb::ReadOptions;
using rocksdb::RestoreOptions;
using rocksdb::SequentialFile;
using rocksdb::Slice;
using rocksdb::SliceParts;
using rocksdb::SliceTransform;
using rocksdb::Snapshot;
using rocksdb::SstFileWriter;
using rocksdb::Status;
using rocksdb::Transaction;
using rocksdb::TransactionDB;
using rocksdb::TransactionDBOptions;
using rocksdb::TransactionLogIterator;
using rocksdb::TransactionOptions;
using rocksdb::WALRecoveryMode;
using rocksdb::WritableFile;
using rocksdb::WriteBatch;
using rocksdb::WriteBatchWithIndex;
using rocksdb::WriteOptions;
using rocksdb::LiveFileMetaData;
using rocksdb::BackupEngine;
using rocksdb::BackupableDBOptions;
using rocksdb::BackupInfo;
using rocksdb::BackupID;
using rocksdb::RestoreOptions;
using rocksdb::CompactRangeOptions;
using rocksdb::BottommostLevelCompaction;
using rocksdb::RateLimiter;
using rocksdb::NewGenericRateLimiter;
using rocksdb::NewWriteAmpBasedRateLimiter;
using rocksdb::PinnableSlice;
using rocksdb::TransactionDBOptions;
using rocksdb::TransactionDB;
using rocksdb::TransactionOptions;
using rocksdb::OptimisticTransactionDB;
using rocksdb::OptimisticTransactionOptions;
using rocksdb::Transaction;
using rocksdb::Checkpoint;
using rocksdb::TransactionLogIterator;
using rocksdb::BatchResult;
using rocksdb::PerfLevel;
using rocksdb::EnablePerfFlag;
using rocksdb::DisablePerfFlag;
using rocksdb::CheckPerfFlag;
using rocksdb::PerfContext;
using rocksdb::MemoryUtil;

using std::shared_ptr;
using std::vector;
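
The db/c.cc hunks above mostly reorder the using declarations and include lines that expose RocksDB's perf-context machinery (PerfContext, PerfLevel, EnablePerfFlag, CheckPerfFlag, ...) to the C API. For orientation only, here is a minimal sketch of how those counters are typically driven from the C++ side; it assumes the standard rocksdb/perf_context.h and rocksdb/perf_level.h headers and is not code from this commit.

    #include <iostream>
    #include <string>

    #include "rocksdb/db.h"
    #include "rocksdb/perf_context.h"
    #include "rocksdb/perf_level.h"

    // Sketch: time a single Put() and dump the per-thread perf counters,
    // including the write_*_time counters this commit repositions.
    void ProfileOnePut(rocksdb::DB* db, const rocksdb::Slice& key,
                       const rocksdb::Slice& value) {
      rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTime);  // turn timers on
      rocksdb::get_perf_context()->Reset();                    // clear counters

      rocksdb::Status s = db->Put(rocksdb::WriteOptions(), key, value);
      (void)s;

      // write_pre_and_post_process_time, write_wal_time, write_memtable_time
      // and write_delay_time are among the counters the write path fills in.
      std::cout << rocksdb::get_perf_context()->ToString() << std::endl;
      rocksdb::SetPerfLevel(rocksdb::PerfLevel::kDisable);
    }
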
21 changes: 7 additions & 14 deletions db/column_family.cc
@@ -738,8 +738,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
bool needed_delay = write_controller->NeedsDelay();

if (write_stall_condition == WriteStallCondition::kStopped &&
write_stall_cause == WriteStallCause::kMemtableLimit &&
!mutable_cf_options.disable_write_stall) {
write_stall_cause == WriteStallCause::kMemtableLimit && !mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
ROCKS_LOG_WARN(
@@ -749,8 +748,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), imm()->NumNotFlushed(),
mutable_cf_options.max_write_buffer_number);
} else if (write_stall_condition == WriteStallCondition::kStopped &&
write_stall_cause == WriteStallCause::kL0FileCountLimit &&
!mutable_cf_options.disable_write_stall) {
write_stall_cause == WriteStallCause::kL0FileCountLimit && !mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
if (compaction_picker_->IsLevel0CompactionInProgress()) {
@@ -761,8 +759,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
"[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), vstorage->l0_delay_trigger_count());
} else if (write_stall_condition == WriteStallCondition::kStopped &&
write_stall_cause == WriteStallCause::kPendingCompactionBytes &&
!mutable_cf_options.disable_write_stall) {
write_stall_cause == WriteStallCause::kPendingCompactionBytes && !mutable_cf_options.disable_write_stall) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(
InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
@@ -772,8 +769,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
"bytes %" PRIu64,
name_.c_str(), compaction_needed_bytes);
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
write_stall_cause == WriteStallCause::kMemtableLimit &&
!mutable_cf_options.disable_write_stall) {
write_stall_cause == WriteStallCause::kMemtableLimit && !mutable_cf_options.disable_write_stall) {
write_controller_token_ =
SetupDelay(write_controller, compaction_needed_bytes,
prev_compaction_needed_bytes_, was_stopped,
@@ -788,8 +784,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
mutable_cf_options.max_write_buffer_number,
write_controller->delayed_write_rate());
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
write_stall_cause == WriteStallCause::kL0FileCountLimit &&
!mutable_cf_options.disable_write_stall) {
write_stall_cause == WriteStallCause::kL0FileCountLimit && !mutable_cf_options.disable_write_stall) {
// L0 is the last two files from stopping.
bool near_stop = vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_stop_writes_trigger - 2;
@@ -809,8 +804,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), vstorage->l0_delay_trigger_count(),
write_controller->delayed_write_rate());
} else if (write_stall_condition == WriteStallCondition::kDelayed &&
write_stall_cause == WriteStallCause::kPendingCompactionBytes &&
!mutable_cf_options.disable_write_stall) {
write_stall_cause == WriteStallCause::kPendingCompactionBytes && !mutable_cf_options.disable_write_stall) {
// If the distance to hard limit is less than 1/4 of the gap between soft
// and
// hard bytes limit, we think it is near stop and speed up the slowdown.
@@ -835,8 +829,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
write_controller->delayed_write_rate());
} else {
assert(write_stall_condition == WriteStallCondition::kNormal ||
mutable_cf_options.disable_write_stall);
assert(write_stall_condition == WriteStallCondition::kNormal || mutable_cf_options.disable_write_stall);
if (vstorage->l0_delay_trigger_count() >=
GetL0ThresholdSpeedupCompaction(
mutable_cf_options.level0_file_num_compaction_trigger,
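
For readers skimming the hunks above: every stop/delay branch in RecalculateWriteStallConditions() is additionally gated on !mutable_cf_options.disable_write_stall, and the revert only reflows those conditions back onto single lines. A stripped-down sketch of that gating, with hypothetical names rather than the real RocksDB types:

    enum class Condition { kNormal, kDelayed, kStopped };
    enum class Cause { kNone, kMemtableLimit, kL0FileCountLimit, kPendingCompactionBytes };

    // Returns true when the caller should install a stop/delay token.
    // disable_write_stall short-circuits every stall branch for the column family.
    bool ShouldInstallStallToken(Condition cond, Cause cause, bool disable_write_stall) {
      if (disable_write_stall) {
        return false;
      }
      return (cond == Condition::kStopped || cond == Condition::kDelayed) &&
             cause != Cause::kNone;
    }
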
12 changes: 5 additions & 7 deletions db/db_impl/db_impl.cc
@@ -163,7 +163,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
seq_per_batch_(seq_per_batch),
batch_per_txn_(batch_per_txn),
db_lock_(nullptr),
log_write_mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS, false),
shutting_down_(false),
bg_cv_(&mutex_),
logfile_number_(0),
@@ -1020,12 +1019,11 @@ Status DBImpl::SetDBOptions(
mutable_db_options_.max_background_jobs,
mutable_db_options_.base_background_compactions,
/* parallelize_compactions */ true);
const BGJobLimits new_bg_job_limits =
GetBGJobLimits(new_options.max_background_flushes,
new_options.max_background_compactions,
new_options.max_background_jobs,
new_options.base_background_compactions,
/* parallelize_compactions */ true);
const BGJobLimits new_bg_job_limits = GetBGJobLimits(
new_options.max_background_flushes,
new_options.max_background_compactions,
new_options.max_background_jobs,
new_options.base_background_compactions, /* parallelize_compactions */ true);

const bool max_flushes_increased =
new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes;
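
The SetDBOptions() hunk above only reflows the GetBGJobLimits() call that compares old and new background-job limits. For context, a hedged example of the caller-facing API that reaches this path; the option names come straight from the diff, while the function name and values are illustrative.

    #include <string>
    #include <unordered_map>

    #include "rocksdb/db.h"

    // Raise the background job limits on a live DB; DBImpl::SetDBOptions() then
    // recomputes BGJobLimits and grows the thread pools if the limits increased.
    rocksdb::Status BumpBackgroundJobs(rocksdb::DB* db) {
      std::unordered_map<std::string, std::string> changes = {
          {"max_background_jobs", "8"},
          {"max_background_compactions", "6"},
          {"max_background_flushes", "2"},
      };
      return db->SetDBOptions(changes);
    }
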
3 changes: 1 addition & 2 deletions db/db_impl/db_impl_compaction_flush.cc
@@ -1988,8 +1988,7 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
}
if (!parallelize_compactions) {
// throttle background compactions until we deem necessary
res.max_compactions =
std::max(1, std::min(base_background_compactions, res.max_compactions));
res.max_compactions = std::max(1, std::min(base_background_compactions, res.max_compactions));
}
return res;
}
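
The single hunk above just joins the clamp back onto one line; the underlying rule is unchanged. A minimal standalone restatement of that rule:

    #include <algorithm>

    // When compactions are not parallelized, throttle them to
    // base_background_compactions, but never below 1 and never above the
    // already-computed max_compactions.
    int ThrottledMaxCompactions(int base_background_compactions, int max_compactions) {
      return std::max(1, std::min(base_background_compactions, max_compactions));
    }
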
3 changes: 1 addition & 2 deletions db/db_impl/db_impl_open.cc
@@ -57,8 +57,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
}
auto bg_job_limits = DBImpl::GetBGJobLimits(
result.max_background_flushes, result.max_background_compactions,
result.max_background_jobs, result.base_background_compactions,
true /* parallelize_compactions */);
result.max_background_jobs, result.base_background_compactions, true /* parallelize_compactions */);
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
Env::Priority::LOW);
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
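
Likewise, the SanitizeOptions() hunk only reflows the GetBGJobLimits() call; the following lines in the original file grow the env thread pools to match. A small sketch of that effect using the public Env API, with illustrative pool sizes:

    #include "rocksdb/env.h"

    // Grow (never shrink) the LOW/HIGH priority background thread pools so they
    // can accommodate the computed compaction and flush job limits.
    void GrowBackgroundPools(rocksdb::Env* env, int max_compactions, int max_flushes) {
      env->IncBackgroundThreadsIfNeeded(max_compactions, rocksdb::Env::Priority::LOW);
      env->IncBackgroundThreadsIfNeeded(max_flushes, rocksdb::Env::Priority::HIGH);
    }
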
24 changes: 16 additions & 8 deletions db/db_impl/db_impl_write.cc
@@ -88,15 +88,15 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
WriteContext write_context;
bool ignore_missing_faimly = write_options.ignore_missing_column_families;
if (writer.state == WriteThread::STATE_GROUP_LEADER) {
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_delay_time);
if (writer.callback && !writer.callback->AllowWriteBatching()) {
write_thread_.WaitForMemTableWriters();
}
WriteThread::WriteGroup wal_write_group;
LogContext log_context;
PERF_TIMER_STOP(write_pre_and_post_process_time);
writer.status =
PreprocessWrite(write_options, &log_context, &write_context);
PERF_TIMER_START(write_pre_and_post_process_time);

// This can set non-OK status if callback fail.
last_batch_group_size_ =
@@ -132,6 +132,7 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

PERF_TIMER_STOP(write_pre_and_post_process_time);
if (!write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
@@ -162,14 +163,14 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
bool is_leader_thread = false;
WriteThread::WriteGroup memtable_write_group;
if (writer.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
PERF_TIMER_GUARD(write_memtable_time);
assert(writer.ShouldWriteToMemtable());
write_thread_.EnterAsMemTableWriter(&writer, &memtable_write_group);
assert(immutable_db_options_.allow_concurrent_memtable_write);
if (memtable_write_group.size > 1) {
is_leader_thread = true;
write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
} else {
PERF_TIMER_GUARD(write_memtable_time);
auto version_set = versions_->GetColumnFamilySet();
memtable_write_group.running.store(0);
for (auto it = memtable_write_group.begin();
@@ -193,7 +194,6 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
}
if (writer.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
assert(writer.ShouldWriteToMemtable());
PERF_TIMER_GUARD(write_memtable_time);
auto version_set = versions_->GetColumnFamilySet();
WriteBatchInternal::AsyncInsertInto(
&writer, writer.sequence, version_set, &flush_scheduler_,
@@ -640,8 +640,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
LogContext log_context(!write_options.disableWAL && write_options.sync);
// PreprocessWrite does its own perf timing.
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_delay_time);
w.status = PreprocessWrite(write_options, &log_context, &write_context);
PERF_TIMER_START(write_pre_and_post_process_time);

// This can set non-OK status if callback fail.
last_batch_group_size_ =
@@ -678,6 +678,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

PERF_TIMER_STOP(write_pre_and_post_process_time);

if (w.status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
@@ -750,7 +752,7 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
WriteCallback* callback, uint64_t log_ref,
SequenceNumber seq,
const size_t sub_batch_cnt) {
PERF_TIMER_GUARD(write_memtable_time);
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);

WriteThread::Writer w(write_options, my_batch, callback, log_ref,
@@ -822,8 +824,6 @@ Status DBImpl::WriteImplWALOnly(
// else we are the leader of the write batch group
assert(w.state == WriteThread::STATE_GROUP_LEADER);

PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_delay_time);
if (publish_last_seq == kDoPublishLastSeq) {
// Currently we only use kDoPublishLastSeq in unordered_write
assert(immutable_db_options_.unordered_write);
@@ -884,6 +884,8 @@ Status DBImpl::WriteImplWALOnly(
}
RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

PERF_TIMER_STOP(write_pre_and_post_process_time);

PERF_TIMER_GUARD(write_wal_time);
// LastAllocatedSequence is increased inside WriteToWAL under
// wal_write_mutex_ to ensure ordered events in WAL
@@ -932,6 +934,7 @@ Status DBImpl::WriteImplWALOnly(
status = SyncWAL();
}
}
PERF_TIMER_START(write_pre_and_post_process_time);

if (!w.CallbackFailed()) {
WriteStatusCheck(status);
@@ -1033,15 +1036,19 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
}

PERF_TIMER_STOP(write_scheduling_flushes_compactions_time);
PERF_TIMER_GUARD(write_pre_and_post_process_time);

if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
write_controller_.NeedsDelay()))) {
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_delay_time);
// We don't know size of curent batch so that we always use the size
// for previous one. It might create a fairness issue that expiration
// might happen for smaller writes but larger writes can go through.
// Can optimize it if it is an issue.
InstrumentedMutexLock l(&mutex_);
status = DelayWrite(last_batch_group_size_, write_options);
PERF_TIMER_START(write_pre_and_post_process_time);
}

InstrumentedMutexLock l(&log_write_mutex_);
@@ -1627,6 +1634,7 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
// is that in case the write is heavy, low pri writes may never have
// a chance to run. Now we guarantee we are still slowly making
// progress.
PERF_TIMER_GUARD(write_delay_time);
write_controller_.low_pri_rate_limiter()->Request(
my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */,
RateLimiter::OpType::kWrite);
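
The db_impl_write.cc hunks are the heart of the revert: they move PERF_TIMER_GUARD / PERF_TIMER_STOP / PERF_TIMER_START calls so that wall time is charged to the same counters as before b84bd77 (for example, PreprocessWrite() is again bracketed by stopping and restarting write_pre_and_post_process_time rather than running under write_delay_time). The self-contained sketch below mimics that guard/stop/start pattern with a toy timer; it is illustrative only and is not the RocksDB macro implementation.

    #include <chrono>
    #include <cstdint>
    #include <iostream>

    // Toy stand-in for a scoped perf step timer: starts on construction,
    // accumulates into its counter on Stop() or at scope exit.
    struct StepTimer {
      explicit StepTimer(uint64_t* counter) : counter_(counter) { Start(); }
      ~StepTimer() { Stop(); }
      void Start() {
        begin_ = std::chrono::steady_clock::now();
        running_ = true;
      }
      void Stop() {
        if (running_) {
          *counter_ += std::chrono::duration_cast<std::chrono::nanoseconds>(
                           std::chrono::steady_clock::now() - begin_)
                           .count();
          running_ = false;
        }
      }
      uint64_t* counter_;
      std::chrono::steady_clock::time_point begin_;
      bool running_ = false;
    };

    uint64_t write_pre_and_post_process_time = 0;
    uint64_t write_wal_time = 0;

    int main() {
      StepTimer pre_post(&write_pre_and_post_process_time);  // ~PERF_TIMER_GUARD
      pre_post.Stop();   // ~PERF_TIMER_STOP: the next call does its own timing
      // ... PreprocessWrite() would run here ...
      pre_post.Start();  // ~PERF_TIMER_START: resume pre/post accounting
      {
        StepTimer wal(&write_wal_time);  // ~PERF_TIMER_GUARD(write_wal_time)
        // ... WriteToWAL() would run here ...
      }  // wal stops at scope exit
      std::cout << "pre/post ns: " << write_pre_and_post_process_time
                << " wal ns: " << write_wal_time << std::endl;
      return 0;
    }
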
(Diffs for the remaining 12 changed files are not shown here.)
