Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(cold-backup): add rate limit for fds #443

Merged
merged 65 commits into from
May 8, 2020
Merged
Show file tree
Hide file tree
Changes from 62 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
4df4d4c
rate limiter
Apr 20, 2020
cfa5a50
fix
Apr 20, 2020
18be2e3
fix
Apr 21, 2020
96cfabc
test
Apr 21, 2020
ec56f9c
test
Apr 21, 2020
45507cf
test
Apr 21, 2020
f91890f
fix
Apr 21, 2020
b438617
test
Apr 21, 2020
b8b8083
test
Apr 21, 2020
d30fcbe
test
Apr 21, 2020
0daa28c
fix
Apr 22, 2020
4bc9556
test
Apr 22, 2020
429a0d0
fiux
Apr 22, 2020
5f39178
fix
Apr 22, 2020
6cb6cb3
fix
Apr 22, 2020
e6eb81d
test
Apr 22, 2020
02b9808
test
Apr 22, 2020
4233135
fix
Apr 22, 2020
f4f8720
get content in batches
Apr 22, 2020
52b3a27
add note
Apr 22, 2020
c7f5a8a
fix
Apr 22, 2020
390809b
fix
Apr 22, 2020
38a938c
fix
Apr 22, 2020
597125b
fix
Apr 22, 2020
1630c53
fix
Apr 22, 2020
84d49b1
fix
Apr 22, 2020
0425f28
format
Apr 22, 2020
a8442f5
Merge remote-tracking branch 'origin/master' into backup-rate-limit
Apr 24, 2020
01cc1e0
Merge branch 'master' into backup-rate-limit
acelyc111 Apr 24, 2020
2158d24
Merge branch 'master' into backup-rate-limit
acelyc111 Apr 24, 2020
bddabfb
Update src/dist/block_service/fds/fds_service.h
levy5307 Apr 26, 2020
4ef68ac
Update src/dist/block_service/fds/fds_service.cpp
levy5307 Apr 26, 2020
26552ae
Update src/dist/block_service/fds/fds_service.cpp
levy5307 Apr 26, 2020
dd242f0
format
Apr 26, 2020
957b3a9
fix
Apr 26, 2020
079d078
fix
Apr 26, 2020
73b1b6f
fix
Apr 26, 2020
e36ec11
fix
Apr 26, 2020
c163ea5
Merge branch 'master' into backup-rate-limit
Apr 26, 2020
56ec29b
fix
Apr 26, 2020
0309fea
Merge branch 'backup-rate-limit' of github.com:levy5307/rdsn into bac…
Apr 26, 2020
b03211b
fix
Apr 26, 2020
b634270
Merge branch 'master' into backup-rate-limit
acelyc111 Apr 27, 2020
4743ec8
fix
levy5307 Apr 28, 2020
a43752e
Merge branch 'master' into backup-rate-limit
acelyc111 Apr 28, 2020
2f53f3d
fix
levy5307 Apr 28, 2020
544570c
Merge branch 'backup-rate-limit' of github.com:levy5307/rdsn into bac…
levy5307 Apr 28, 2020
ca107b5
fix
levy5307 Apr 28, 2020
44b73ea
fix
levy5307 Apr 28, 2020
043bad8
fix
levy5307 Apr 28, 2020
bb44ae4
fix
levy5307 Apr 28, 2020
ea2d13c
fix
Apr 29, 2020
287d79c
fix
Apr 29, 2020
94c6b8e
Merge branch 'master' into backup-rate-limit
Apr 30, 2020
c0be4ac
fiux
Apr 30, 2020
b1264e7
Merge branch 'backup-rate-limit' of github.com:levy5307/rdsn into bac…
Apr 30, 2020
5414a78
Merge branch 'master' into backup-rate-limit
acelyc111 Apr 30, 2020
d8d5e90
fix
Apr 30, 2020
645193a
Merge branch 'backup-rate-limit' of github.com:levy5307/rdsn into bac…
Apr 30, 2020
564563a
Merge branch 'master' into backup-rate-limit
Apr 30, 2020
f35c2c6
Merge branch 'master' into backup-rate-limit
acelyc111 Apr 30, 2020
ef9339b
fix
May 7, 2020
74ad3cb
fix
May 7, 2020
e75c1d8
Merge branch 'master' into backup-rate-limit
acelyc111 May 8, 2020
b92cd02
Merge branch 'master' into backup-rate-limit
May 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 163 additions & 53 deletions src/dist/block_service/fds/fds_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
#include <memory>
#include <fstream>
#include <string.h>
#include <dsn/utility/defer.h>
#include <dsn/utility/filesystem.h>
#include <dsn/utility/TokenBucket.h>
#include <dsn/dist/fmt_logging.h>

namespace dsn {
namespace dist {
Expand Down Expand Up @@ -95,10 +99,46 @@ DEFINE_THREAD_POOL_CODE(THREAD_POOL_FDS_SERVICE)
DEFINE_TASK_CODE(LPC_FDS_CALL, TASK_PRIORITY_COMMON, THREAD_POOL_FDS_SERVICE)

const std::string fds_service::FILE_LENGTH_CUSTOM_KEY = "x-xiaomi-meta-content-length";
const std::string fds_service::FILE_LENGTH_KEY = "content-length";
const std::string fds_service::FILE_MD5_KEY = "content-md5";

fds_service::fds_service() {}
fds_service::fds_service()
{
const int BYTE_TO_BIT = 8;

/// In normal scenario, the sst file size of level 0 is write_buffer_size * [0.75, 1.25]
/// And in BULK_LOAD scenario, it is 4 * write_buffer_size * [0.75, 1.25].
/// In rdsn, we can't get the scenario, so if we take BULK_LOAD scenario into consideration,
/// we must set max_sst_file_size to 4 * write_buffer_size * [0.75, 1.25], which is too big.
/// So in this implementation, we don't take BULK_LOAD scenario into consideration.
uint64_t target_file_size =
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
dsn_config_get_value_uint64("pegasus.server",
"rocksdb_target_file_size_base",
64 * 1024 * 1024,
"rocksdb options.target_file_size_base");
uint64_t write_buffer_size = dsn_config_get_value_uint64("pegasus.server",
"rocksdb_write_buffer_size",
64 * 1024 * 1024,
"rocksdb options.write_buffer_size");
uint64_t max_sst_file_size = std::max(target_file_size, (uint64_t)1.25 * write_buffer_size);
Copy link
Contributor

@neverchanje neverchanje Apr 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rDSN & FDS client should not assume anything about Pegasus, as well as the details around rocksdb. Pegasus is the submodule aka one of the storage engines of rDSN. What if we use other storage engines without rocksdb_write_buffer_size?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we should get the file size, or we must set the burst size to a very big number.
Do you have another better way?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's bit of tricky here, but in the following plan, we are going to merge rdsn and pegasus projects.


uint32_t write_rate_limit = (uint32_t)dsn_config_get_value_uint64(
"replication", "fds_write_limit_rate", 20, "rate limit of fds(Mb/s)");
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
/// For write operation, we can't send a file in batches. Because putContent interface of fds
/// will overwrite what was sent before for the same file. So we must send a file as a whole.
/// If file size > burst size, the file will be rejected by the token bucket.
/// Here we set burst_size = max_sst_file_size + 3MB, a litte greater than max_sst_file_size
uint32_t burst_size =
std::max(3 * write_rate_limit * 1e6 / BYTE_TO_BIT, max_sst_file_size + 3e6);
_write_token_bucket.reset(
new folly::TokenBucket(write_rate_limit * 1e6 / BYTE_TO_BIT, burst_size));

uint32_t read_rate_limit = (uint32_t)dsn_config_get_value_uint64(
"replication", "fds_read_limit_rate", 20, "rate limit of fds(Mb/s)");
burst_size = 3 * read_rate_limit * 1e6 / BYTE_TO_BIT;
_read_token_bucket.reset(
new folly::TokenBucket(read_rate_limit * 1e6 / BYTE_TO_BIT, burst_size));
}

fds_service::~fds_service() {}

/**
Expand Down Expand Up @@ -231,6 +271,7 @@ dsn::task_ptr fds_service::list_dir(const ls_request &req,
return t;
}

// TODO(zhaoliwei) refactor these code, because there have same code in get_file_meta()
dsn::task_ptr fds_service::create_file(const create_file_request &req,
dsn::task_code code,
const create_file_callback &cb,
Expand Down Expand Up @@ -484,82 +525,146 @@ fds_file_object::fds_file_object(fds_service *s,

fds_file_object::~fds_file_object() {}

dsn::error_code fds_file_object::get_content(uint64_t pos,
int64_t length,
/*out*/ std::ostream &os,
/*out*/ uint64_t &transfered_bytes)
error_code fds_file_object::get_file_meta()
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
{
dsn::error_code err;
galaxy::fds::GalaxyFDSClient *c = _service->get_client();
try {
auto meta = c->getObjectMetadata(_service->get_bucket_name(), _fds_path)->metadata();

// get file length
auto iter = meta.find(fds_service::FILE_LENGTH_CUSTOM_KEY);
dassert_f(iter != meta.end(),
"can't find {} in object({})'s metadata",
fds_service::FILE_LENGTH_CUSTOM_KEY.c_str(),
_fds_path.c_str());
bool valid = dsn::buf2uint64(iter->second, _size);
dassert_f(valid, "error to get file size");

// get md5 key
iter = meta.find(fds_service::FILE_MD5_KEY);
dassert_f(iter != meta.end(),
"can't find {} in object({})'s metadata",
fds_service::FILE_MD5_KEY.c_str(),
_fds_path.c_str());
_md5sum = iter->second;

_has_meta_synced = true;
return ERR_OK;
} catch (const galaxy::fds::GalaxyFDSClientException &ex) {
if (ex.code() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) {
return ERR_OBJECT_NOT_FOUND;
} else {
derror_f("fds getObjectMetadata failed: parameter({}), code({}), msg({})",
_name.c_str(),
ex.code(),
ex.what());
return ERR_FS_INTERNAL;
}
}
}

error_code fds_file_object::get_content_in_batches(uint64_t start,
int64_t length,
/*out*/ std::ostream &os,
/*out*/ uint64_t &transfered_bytes)
{
// the max batch size is 1MB
const uint64_t BATCH_MAX = 1e6;
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
error_code err = ERR_OK;
transfered_bytes = 0;

// get file meta if it is not synced
if (!_has_meta_synced) {
err = get_file_meta();
if (ERR_OK != err) {
return err;
}
}

// if length = -1, it means we should transfer the whole file
uint64_t to_transfer_bytes = (length == -1 ? _size : length);

uint64_t pos = start;
uint64_t once_transfered_bytes = 0;
while (pos < start + to_transfer_bytes) {
uint64_t batch_len = std::min(BATCH_MAX, start + to_transfer_bytes - pos);
// get tokens from token bucket
_service->_read_token_bucket->consumeWithBorrowAndWait(batch_len);

err = get_content(pos, batch_len, os, once_transfered_bytes);
transfered_bytes += once_transfered_bytes;
if (err != ERR_OK || once_transfered_bytes < batch_len) {
return err;
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
}
pos += batch_len;
}
hycdong marked this conversation as resolved.
Show resolved Hide resolved

return ERR_OK;
}

error_code fds_file_object::get_content(uint64_t pos,
uint64_t length,
/*out*/ std::ostream &os,
/*out*/ uint64_t &transfered_bytes)
{
error_code err = ERR_OK;
transfered_bytes = 0;
while (true) {
if (_has_meta_synced) {
// if we have download enough or we have reach the end
if ((length != -1 && (int64_t)transfered_bytes >= length) ||
transfered_bytes + pos >= _size) {
return dsn::ERR_OK;
}
// if we have download enough or we have reach the end
if (transfered_bytes >= length || transfered_bytes + pos >= _size) {
return ERR_OK;
}

try {
galaxy::fds::GalaxyFDSClient *c = _service->get_client();
std::shared_ptr<galaxy::fds::FDSObject> obj;
if (length == -1)
obj = c->getObject(_service->get_bucket_name(), _fds_path, pos + transfered_bytes);
else
obj = c->getObject(_service->get_bucket_name(),
_fds_path,
pos + transfered_bytes,
length - transfered_bytes);
obj = c->getObject(_service->get_bucket_name(),
_fds_path,
pos + transfered_bytes,
length - transfered_bytes);
dinfo("get object from fds succeed, remote_file(%s)", _fds_path.c_str());
if (!_has_meta_synced) {
const std::map<std::string, std::string> &meta = obj->objectMetadata().metadata();
auto iter = meta.find(fds_service::FILE_MD5_KEY);
if (iter != meta.end()) {
_md5sum = iter->second;
iter = meta.find(fds_service::FILE_LENGTH_KEY);
dassert(iter != meta.end(),
"%s: can't get %s in getObject %s",
_name.c_str(),
fds_service::FILE_LENGTH_KEY.c_str(),
_fds_path.c_str());
_size = atoll(iter->second.c_str());
_has_meta_synced = true;
}
}
std::istream &is = obj->objectContent();
transfered_bytes += utils::copy_stream(is, os, PIECE_SIZE);
err = dsn::ERR_OK;
err = ERR_OK;
} catch (const galaxy::fds::GalaxyFDSClientException &ex) {
derror("fds getObject error: remote_file(%s), code(%d), msg(%s)",
file_name().c_str(),
ex.code(),
ex.what());
if (!_has_meta_synced && ex.code() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) {
if (ex.code() == Poco::Net::HTTPResponse::HTTP_NOT_FOUND) {
_has_meta_synced = true;
_md5sum = "";
_size = 0;
err = dsn::ERR_OBJECT_NOT_FOUND;
err = ERR_OBJECT_NOT_FOUND;
} else {
err = dsn::ERR_FS_INTERNAL;
err = ERR_FS_INTERNAL;
}
}
FDS_EXCEPTION_HANDLE(err, "getObject", file_name().c_str())

if (err != dsn::ERR_OK) {
if (err != ERR_OK) {
return err;
}
}

return err;
}

dsn::error_code fds_file_object::put_content(/*in-out*/ std::istream &is,
uint64_t &transfered_bytes)
error_code fds_file_object::put_content(/*in-out*/ std::istream &is,
int64_t to_transfer_bytes,
uint64_t &transfered_bytes)
{
dsn::error_code err = dsn::ERR_OK;
error_code err = ERR_OK;
transfered_bytes = 0;
galaxy::fds::GalaxyFDSClient *c = _service->get_client();

// get tokens from token bucket
if (!_service->_write_token_bucket->consumeWithBorrowAndWait(to_transfer_bytes)) {
ddebug_f("the transfer count({}) is greater than burst size({}), so it is rejected by "
"token bucket",
to_transfer_bytes,
_service->_write_token_bucket->burst());
return ERR_BUSY;
}

try {
c->putObject(_service->get_bucket_name(), _fds_path, is, galaxy::fds::FDSObjectMetadata());
} catch (const galaxy::fds::GalaxyFDSClientException &ex) {
Expand All @@ -571,7 +676,7 @@ dsn::error_code fds_file_object::put_content(/*in-out*/ std::istream &is,
}
FDS_EXCEPTION_HANDLE(err, "putObject", file_name().c_str())

if (err != dsn::ERR_OK) {
if (err != ERR_OK) {
return err;
}

Expand Down Expand Up @@ -621,7 +726,7 @@ dsn::task_ptr fds_file_object::write(const write_request &req,
write_response resp;
std::istringstream is;
is.str(std::string(req.buffer.data(), req.buffer.length()));
resp.err = put_content(is, resp.written_size);
resp.err = put_content(is, req.buffer.length(), resp.written_size);

t->enqueue_with(resp);
release_ref();
Expand All @@ -643,6 +748,10 @@ dsn::task_ptr fds_file_object::upload(const upload_request &req,
add_ref();
auto upload_background = [this, req, t]() {
const std::string &local_file = req.input_local_name;
// get file size
int64_t file_sz = 0;
dsn::utils::filesystem::file_size(local_file, file_sz);

upload_response resp;
// TODO: we can cache the whole file in buffer, then upload the buffer rather than the
// ifstream, because if ifstream read file beyond 60s, fds-server will reset the session,
Expand All @@ -658,7 +767,7 @@ dsn::task_ptr fds_file_object::upload(const upload_request &req,
ptr);
resp.err = dsn::ERR_FILE_OPERATION_FAILED;
} else {
resp.err = put_content(is, resp.uploaded_size);
resp.err = put_content(is, file_sz, resp.uploaded_size);
is.close();
}

Expand Down Expand Up @@ -691,7 +800,7 @@ dsn::task_ptr fds_file_object::read(const read_request &req,
read_response resp;
std::ostringstream os;
uint64_t transferd_size;
resp.err = get_content(req.remote_pos, req.remote_length, os, transferd_size);
resp.err = get_content_in_batches(req.remote_pos, req.remote_length, os, transferd_size);
if (os.tellp() > 0) {
std::string *output = new std::string();
*output = os.str();
Expand Down Expand Up @@ -743,7 +852,8 @@ dsn::task_ptr fds_file_object::download(const download_request &req,
auto download_background = [this, req, handle, t]() {
download_response resp;
uint64_t transfered_size;
resp.err = get_content(req.remote_pos, req.remote_length, *handle, transfered_size);
resp.err =
get_content_in_batches(req.remote_pos, req.remote_length, *handle, transfered_size);
resp.downloaded_size = 0;
if (handle->tellp() != -1)
resp.downloaded_size = handle->tellp();
Expand All @@ -755,6 +865,6 @@ dsn::task_ptr fds_file_object::download(const download_request &req,
dsn::tasking::enqueue(LPC_FDS_CALL, nullptr, download_background);
return t;
}
}
}
}
} // namespace block_service
} // namespace dist
} // namespace dsn
29 changes: 24 additions & 5 deletions src/dist/block_service/fds/fds_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@

#include <dsn/dist/block_service.h>

namespace folly {
template <typename Clock>
class BasicTokenBucket;

using TokenBucket = BasicTokenBucket<std::chrono::steady_clock>;
}

namespace galaxy {
namespace fds {
class GalaxyFDSClient;
Expand Down Expand Up @@ -64,6 +71,10 @@ class fds_service : public block_filesystem
private:
std::shared_ptr<galaxy::fds::GalaxyFDSClient> _client;
std::string _bucket_name;
std::unique_ptr<folly::TokenBucket> _read_token_bucket;
std::unique_ptr<folly::TokenBucket> _write_token_bucket;

friend class fds_file_object;
};

class fds_file_object : public block_file
Expand Down Expand Up @@ -101,11 +112,19 @@ class fds_file_object : public block_file
dsn::task_tracker *tracker) override;

private:
dsn::error_code get_content(uint64_t pos,
int64_t length,
/*out*/ std::ostream &os,
/*out*/ uint64_t &transfered_bytes);
dsn::error_code put_content(/*in-out*/ std::istream &is, /*out*/ uint64_t &transfered_bytes);
error_code get_content_in_batches(uint64_t start,
int64_t length,
/*out*/ std::ostream &os,
/*out*/ uint64_t &transfered_bytes);
error_code get_content(uint64_t pos,
uint64_t length,
/*out*/ std::ostream &os,
/*out*/ uint64_t &transfered_bytes);
error_code put_content(/*in-out*/ std::istream &is,
/*int*/ int64_t to_transfer_bytes,
/*out*/ uint64_t &transfered_bytes);
error_code get_file_meta();

fds_service *_service;
std::string _fds_path;
std::string _md5sum;
Expand Down