Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: append mlog in fixed-size blocks using log_appender #418

Merged
merged 3 commits into from
Apr 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/dist/replication/lib/log_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,22 @@ void log_block::init()
add(temp_writer.get_buffer());
}

void log_block::append_mutation(const mutation_ptr &mu, const aio_task_ptr &cb)
void log_appender::append_mutation(const mutation_ptr &mu, const aio_task_ptr &cb)
{
_mutations.push_back(mu);
if (cb) {
_callbacks.push_back(cb);
}
mu->data.header.log_offset = _start_offset + size();
mu->write_to([this](const blob &bb) { add(bb); });
log_block *blk = &_blocks.back();
if (blk->size() > DEFAULT_MAX_BLOCK_BYTES) {
_full_blocks_size += blk->size();
_full_blocks_blob_cnt += blk->data().size();
int64_t new_block_start_offset = blk->start_offset() + blk->size();
_blocks.emplace_back(new_block_start_offset);
blk = &_blocks.back();
}
mu->data.header.log_offset = blk->start_offset() + blk->size();
mu->write_to([blk](const blob &bb) { blk->add(bb); });
}

} // namespace replication
Expand Down
53 changes: 45 additions & 8 deletions src/dist/replication/lib/log_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ class log_block
{
std::vector<blob> _data; // the first blob is log_block_header
size_t _size{0}; // total data size of all blobs
std::vector<mutation_ptr> _mutations;
std::vector<aio_task_ptr> _callbacks;
int64_t _start_offset{0};

public:
Expand Down Expand Up @@ -60,21 +58,60 @@ class log_block
_data.push_back(bb);
}

void append_mutation(const mutation_ptr &mu, const aio_task_ptr &cb);

const std::vector<mutation_ptr> &mutations() const { return _mutations; }

const std::vector<aio_task_ptr> &callbacks() const { return _callbacks; }

// return total data size in the block
size_t size() const { return _size; }

// global offset to start writting this block
int64_t start_offset() const { return _start_offset; }

private:
friend class log_appender;
void init();
};

// Append writes into a buffer which consists of one or more fixed-size log blocks,
// which will be continuously flushed into one log file.
// Not thread-safe. Requires lock protection.
class log_appender
neverchanje marked this conversation as resolved.
Show resolved Hide resolved
{
public:
explicit log_appender(int64_t start_offset) { _blocks.emplace_back(start_offset); }

log_appender(int64_t start_offset, log_block &block)
{
block._start_offset = start_offset;
_blocks.emplace_back(std::move(block));
}

void append_mutation(const mutation_ptr &mu, const aio_task_ptr &cb);

size_t size() const { return _full_blocks_size + _blocks.crbegin()->size(); }
size_t blob_count() const { return _full_blocks_blob_cnt + _blocks.crbegin()->data().size(); }

std::vector<mutation_ptr> mutations() const { return _mutations; }

// The callback registered for each write.
const std::vector<aio_task_ptr> &callbacks() const { return _callbacks; }

// Returns the heading block's start_offset.
int64_t start_offset() const { return _blocks.cbegin()->start_offset(); }

std::vector<log_block> &all_blocks() { return _blocks; }

protected:
static constexpr size_t DEFAULT_MAX_BLOCK_BYTES = 1 * 1024 * 1024; // 1MB

// |---------------------- _blocks ----------------------|
// | full block 0 | full block 1 | .... | unfilled block |

// New block is appended to tail.
// The tailing block is the only block that may be unfilled.
std::vector<log_block> _blocks;
size_t _full_blocks_size{0};
size_t _full_blocks_blob_cnt{0};
std::vector<aio_task_ptr> _callbacks;
std::vector<mutation_ptr> _mutations;
};

} // namespace replication
} // namespace dsn
Loading