Skip to content

Commit

Permalink
refactor: simplify mutation_log write_pending_mutations (#436)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored Apr 13, 2020
1 parent 54df4bb commit d5f06db
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,7 @@ class load_from_private_log_test : public duplication_test_base
void test_start_duplication(int num_entries, int private_log_size_mb)
{
std::vector<std::string> mutations;

mutation_log_ptr mlog = new mutation_log_private(_replica->dir(),
private_log_size_mb,
_replica->get_gpid(),
_replica.get(),
1024,
512,
50000);
EXPECT_EQ(mlog->open(nullptr, nullptr), ERR_OK);
mutation_log_ptr mlog = create_private_log(private_log_size_mb, _replica->get_gpid());

{
for (int i = 1; i <= num_entries; i++) {
Expand Down Expand Up @@ -175,7 +167,7 @@ class load_from_private_log_test : public duplication_test_base
{
load_from_private_log load(_replica.get(), duplicator.get());

int num_entries = generate_multiple_log_files(2);
generate_multiple_log_files(2);

std::vector<std::string> files;
ASSERT_EQ(log_utils::list_all_files(_log_dir, files), error_s::ok());
Expand All @@ -200,9 +192,19 @@ class load_from_private_log_test : public duplication_test_base
std::map<gpid, decree> replay_condition;
replay_condition[id] = 0; // duplicating
mutation_log::replay_callback cb = [](int, mutation_ptr &) { return true; };
mutation_log_ptr mlog = new mutation_log_private(
_replica->dir(), private_log_size_mb, id, _replica.get(), 1024, 512, 10000);
EXPECT_EQ(mlog->open(cb, nullptr, replay_condition), ERR_OK);
mutation_log_ptr mlog;

int try_cnt = 0;
while (try_cnt < 5) {
try_cnt++;
mlog = new mutation_log_private(
_replica->dir(), private_log_size_mb, id, _replica.get(), 1024, 512, 10000);
error_code err = mlog->open(cb, nullptr, replay_condition);
if (err == ERR_OK) {
break;
}
derror_f("mlog open failed, encountered error: {}", err);
}
return mlog;
}

Expand Down
98 changes: 51 additions & 47 deletions src/dist/replication/lib/mutation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,22 @@ void mutation_log_shared::write_pending_mutations(bool release_lock_required)

// move or reset pending variables
std::shared_ptr<log_block> blk = std::move(_pending_write);
int64_t start_offset = blk->start_offset();

// seperate commit_log_block from within the lock
_slock.unlock();
commit_pending_mutations(pr.first, blk);
}

pr.first->commit_log_block(
void mutation_log_shared::commit_pending_mutations(log_file_ptr &lf,
std::shared_ptr<log_block> &blk)
{
int64_t start_offset = blk->start_offset();
lf->commit_log_block(
*blk,
start_offset,
LPC_WRITE_REPLICATION_LOG_SHARED,
&_tracker,
[ this, lf = pr.first, block = blk ](error_code err, size_t sz) mutable {
[ this, lf, block = blk ](error_code err, size_t sz) mutable {
dassert(_is_writing.load(std::memory_order_relaxed), "");

auto hdr = (log_block_header *)block->front().data();
Expand Down Expand Up @@ -375,7 +380,6 @@ void mutation_log_private::write_pending_mutations(bool release_lock_required)
// move or reset pending variables
std::shared_ptr<log_block> blk = std::move(_pending_write);
_issued_write = blk;
int64_t start_offset = blk->start_offset();
_pending_write_start_time_ms = 0;
decree max_commit = _pending_write_max_commit;
_pending_write_max_commit = 0;
Expand All @@ -384,65 +388,65 @@ void mutation_log_private::write_pending_mutations(bool release_lock_required)
// Free plog from lock during committing log block, in the meantime
// new mutations can still be appended.
_plock.unlock();
commit_pending_mutations(pr.first, blk, max_commit);
}

pr.first->commit_log_block(
void mutation_log_private::commit_pending_mutations(log_file_ptr &lf,
std::shared_ptr<log_block> &blk,
decree max_commit)
{
int64_t start_offset = blk->start_offset();
lf->commit_log_block(
*blk,
start_offset,
LPC_WRITE_REPLICATION_LOG_PRIVATE,
&_tracker,
[ this, lf = pr.first, block = blk, max_commit ](error_code err, size_t sz) mutable {
[ this, lf, block = blk, max_commit ](error_code err, size_t sz) mutable {
dassert(_is_writing.load(std::memory_order_relaxed), "");

auto hdr = (log_block_header *)block->front().data();
dassert(hdr->magic == 0xdeadbeef, "header magic is changed: 0x%x", hdr->magic);

if (err == ERR_OK) {
dassert(sz == block->size(),
"log write size must equal to the given size: %d vs %d",
(int)sz,
block->size());

dassert(sz == sizeof(log_block_header) + hdr->length,
"log write size must equal to (header size + data size): %d vs (%d + %d)",
(int)sz,
(int)sizeof(log_block_header),
hdr->length);

// flush to ensure that there is no gap between private log and in-memory buffer
// so that we can get all mutations in learning process.
//
// FIXME : the file could have been closed
lf->flush();

// update _private_max_commit_on_disk after written into log file done
update_max_commit_on_disk(max_commit);
} else {
if (err != ERR_OK) {
derror("write private log failed, err = %s", err.to_string());
_is_writing.store(false, std::memory_order_relaxed);
if (_io_error_callback) {
_io_error_callback(err);
}
return;
}
dassert(sz == block->size(),
"log write size must equal to the given size: %d vs %d",
(int)sz,
block->size());

dassert(sz == sizeof(log_block_header) + hdr->length,
"log write size must equal to (header size + data size): %d vs (%d + %d)",
(int)sz,
(int)sizeof(log_block_header),
hdr->length);

// flush to ensure that there is no gap between private log and in-memory buffer
// so that we can get all mutations in learning process.
//
// FIXME : the file could have been closed
lf->flush();

// update _private_max_commit_on_disk after written into log file done
update_max_commit_on_disk(max_commit);

// here we use _is_writing instead of _issued_write.expired() to check writing done,
// because the following callbacks may run before "block" released, which may cause
// the next init_prepare() not starting the write.
_is_writing.store(false, std::memory_order_relaxed);

// notify error when necessary
if (err != ERR_OK) {
if (_io_error_callback) {
_io_error_callback(err);
}
// start to write if possible
_plock.lock();

if (!_is_writing.load(std::memory_order_acquire) && _pending_write &&
(static_cast<uint32_t>(_pending_write->size()) >= _batch_buffer_bytes ||
static_cast<uint32_t>(_pending_write->data().size()) >= _batch_buffer_max_count ||
flush_interval_expired())) {
write_pending_mutations(true);
} else {
// start to write if possible
_plock.lock();

if (!_is_writing.load(std::memory_order_acquire) && _pending_write &&
(static_cast<uint32_t>(_pending_write->size()) >= _batch_buffer_bytes ||
static_cast<uint32_t>(_pending_write->data().size()) >=
_batch_buffer_max_count ||
flush_interval_expired())) {
write_pending_mutations(true);
} else {
_plock.unlock();
}
_plock.unlock();
}
},
0);
Expand Down
6 changes: 6 additions & 0 deletions src/dist/replication/lib/mutation_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ class mutation_log_shared : public mutation_log
// appropriately for less lock contention
void write_pending_mutations(bool release_lock_required);

void commit_pending_mutations(log_file_ptr &lf, std::shared_ptr<log_block> &pending);

// flush at most count times
// if count <= 0, means flush until all data is on disk
void flush_internal(int max_count);
Expand Down Expand Up @@ -524,6 +526,10 @@ class mutation_log_private : public mutation_log, private replica_base
// appropriately for less lock contention
void write_pending_mutations(bool release_lock_required);

void commit_pending_mutations(log_file_ptr &lf,
std::shared_ptr<log_block> &pending,
decree max_commit);

virtual void init_states() override;

// flush at most count times
Expand Down

0 comments on commit d5f06db

Please sign in to comment.