Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(slog): flush and remove all shared logs for garbage collection #1594

Merged
merged 28 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0ef570d
feat(slog): apply and remove shared logs
empiredan Aug 25, 2023
dfee154
feat(slog): apply and remove shared logs
empiredan Aug 25, 2023
261f6b4
feat(slog): apply and remove shared logs
empiredan Aug 29, 2023
3c19d64
feat(slog): apply and remove shared logs
empiredan Aug 30, 2023
05099cd
feat(slog): apply and remove shared logs
empiredan Aug 30, 2023
3fbeb61
feat(slog): apply and remove shared logs
empiredan Aug 30, 2023
1511f45
feat(slog): apply and remove shared logs
empiredan Aug 30, 2023
866e203
feat(slog): apply and remove shared logs
empiredan Aug 31, 2023
eb8be79
feat(slog): apply and remove shared logs
empiredan Sep 1, 2023
2fa23de
feat(slog): apply and remove shared logs
empiredan Sep 1, 2023
6f9e475
feat(slog): apply and remove shared logs
empiredan Sep 1, 2023
e5e01db
feat(slog): apply and remove shared logs
empiredan Sep 1, 2023
c2be2d3
feat(slog): apply and remove shared logs
empiredan Sep 1, 2023
711e5a0
feat(slog): apply and remove shared logs
empiredan Sep 4, 2023
ef5f67c
feat(slog): apply and remove shared logs
empiredan Sep 5, 2023
f804d07
feat(slog): apply and remove shared logs
empiredan Sep 5, 2023
9b43622
feat(slog): apply and remove shared logs
empiredan Sep 6, 2023
11f952b
feat(slog): apply and remove shared logs
empiredan Sep 6, 2023
2199b86
feat(slog): apply and remove shared logs
empiredan Sep 6, 2023
67ae435
feat(slog): apply and remove shared logs
empiredan Sep 6, 2023
409be4d
feat(slog): apply and remove shared logs
empiredan Sep 7, 2023
c6f23c2
feat(slog): apply and remove shared logs
empiredan Sep 8, 2023
6c46666
feat(slog): apply and remove shared logs
empiredan Sep 8, 2023
c05bc2d
feat(slog): apply and remove shared logs
empiredan Sep 11, 2023
755a0ea
feat(slog): apply and remove shared logs
empiredan Sep 19, 2023
280da66
feat(slog): apply and remove shared logs
empiredan Sep 19, 2023
e269d83
feat(slog): apply and remove shared logs
empiredan Sep 20, 2023
9b392b1
feat(slog): apply and remove shared logs
empiredan Sep 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/replica/duplication/load_from_private_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <iterator>
#include <map>
#include <utility>

#include "common/duplication_common.h"
Expand Down Expand Up @@ -57,11 +58,11 @@ bool load_from_private_log::will_fail_fast() const
// we try to list all files and select a new one to start (find_log_file_to_start).
bool load_from_private_log::switch_to_next_log_file()
{
auto file_map = _private_log->get_log_file_map();
auto next_file_it = file_map.find(_current->index() + 1);
const auto &file_map = _private_log->get_log_file_map();
const auto &next_file_it = file_map.find(_current->index() + 1);
if (next_file_it != file_map.end()) {
log_file_ptr file;
error_s es = log_utils::open_read(next_file_it->second->path(), file);
const auto &es = log_utils::open_read(next_file_it->second->path(), file);
if (!es.is_ok()) {
LOG_ERROR_PREFIX("{}", es);
_current = nullptr;
Expand Down Expand Up @@ -123,11 +124,11 @@ void load_from_private_log::run()
void load_from_private_log::find_log_file_to_start()
{
// `file_map` has already excluded the useless log files during replica init.
auto file_map = _private_log->get_log_file_map();
const auto &file_map = _private_log->get_log_file_map();

// Reopen the files. Because the internal file handle of `file_map`
// is cleared once WAL replay finished. They are unable to read.
std::map<int, log_file_ptr> new_file_map;
mutation_log::log_file_map_by_index new_file_map;
for (const auto &pr : file_map) {
log_file_ptr file;
error_s es = log_utils::open_read(pr.second->path(), file);
Expand All @@ -141,7 +142,8 @@ void load_from_private_log::find_log_file_to_start()
find_log_file_to_start(std::move(new_file_map));
}

void load_from_private_log::find_log_file_to_start(std::map<int, log_file_ptr> log_file_map)
void load_from_private_log::find_log_file_to_start(
const mutation_log::log_file_map_by_index &log_file_map)
{
_current = nullptr;
if (dsn_unlikely(log_file_map.empty())) {
Expand Down
3 changes: 1 addition & 2 deletions src/replica/duplication/load_from_private_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <stddef.h>
#include <stdint.h>
#include <chrono>
#include <map>

#include "common/replication_other_types.h"
#include "mutation_batch.h"
Expand Down Expand Up @@ -61,7 +60,7 @@ class load_from_private_log final : public replica_base,

/// Find the log file that contains `_start_decree`.
void find_log_file_to_start();
void find_log_file_to_start(std::map<int, log_file_ptr> log_files);
void find_log_file_to_start(const mutation_log::log_file_map_by_index &log_files);

void replay_log_block();

Expand Down
4 changes: 2 additions & 2 deletions src/replica/duplication/test/duplication_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ class duplication_test_base : public replica_test_base
return duplicator;
}

std::map<int, log_file_ptr> open_log_file_map(const std::string &log_dir)
mutation_log::log_file_map_by_index open_log_file_map(const std::string &log_dir)
{
std::map<int, log_file_ptr> log_file_map;
mutation_log::log_file_map_by_index log_file_map;
error_s err = log_utils::open_log_file_map(log_dir, log_file_map);
EXPECT_EQ(err, error_s::ok());
return log_file_map;
Expand Down
18 changes: 9 additions & 9 deletions src/replica/log_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,15 @@ log_file::~log_file() { close(); }

log_file::log_file(
const char *path, disk_file *handle, int index, int64_t start_offset, bool is_read)
: _is_read(is_read)
: _crc32(0),
_start_offset(start_offset),
_end_offset(start_offset),
_handle(handle),
_is_read(is_read),
_path(path),
_index(index),
_last_write_time(0)
{
_start_offset = start_offset;
_end_offset = start_offset;
_handle = handle;
_path = path;
_index = index;
_crc32 = 0;
_last_write_time = 0;
memset(&_header, 0, sizeof(_header));

if (is_read) {
Expand Down Expand Up @@ -357,7 +357,7 @@ void log_file::reset_stream(size_t offset /*default = 0*/)
}
}

decree log_file::previous_log_max_decree(const dsn::gpid &pid)
decree log_file::previous_log_max_decree(const dsn::gpid &pid) const
{
auto it = _previous_log_max_decrees.find(pid);
return it == _previous_log_max_decrees.end() ? 0 : it->second.max_decree;
Expand Down
19 changes: 11 additions & 8 deletions src/replica/log_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ struct log_file_header
// a structure to record replica's log info
struct replica_log_info
{
int64_t max_decree;
decree max_decree;
int64_t valid_start_offset; // valid start offset in global space
replica_log_info(int64_t d, int64_t o)
replica_log_info(decree d, int64_t o)
{
max_decree = d;
valid_start_offset = o;
Expand Down Expand Up @@ -184,11 +184,14 @@ class log_file : public ref_counter
// file path
const std::string &path() const { return _path; }
// previous decrees
const replica_log_info_map &previous_log_max_decrees() { return _previous_log_max_decrees; }
const replica_log_info_map &previous_log_max_decrees() const
{
return _previous_log_max_decrees;
}
// previous decree for speicified gpid
decree previous_log_max_decree(const gpid &pid);
decree previous_log_max_decree(const gpid &pid) const;
// file header
log_file_header &header() { return _header; }
const log_file_header &header() const { return _header; }

// read file header from reader, return byte count consumed
int read_file_header(binary_reader &reader);
Expand All @@ -213,16 +216,16 @@ class log_file : public ref_counter
friend class mock_log_file;

uint32_t _crc32;
int64_t _start_offset; // start offset in the global space
const int64_t _start_offset; // start offset in the global space
std::atomic<int64_t>
_end_offset; // end offset in the global space: end_offset = start_offset + file_size
class file_streamer;

std::unique_ptr<file_streamer> _stream;
disk_file *_handle; // file handle
const bool _is_read; // if opened for read or write
std::string _path; // file path
int _index; // file index
const std::string _path; // file path
const int _index; // file index
log_file_header _header; // file header
uint64_t _last_write_time; // seconds from epoch time

Expand Down
Loading