Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#310: read osd_max_object_size and fail if mail.size > osd_max_object… #311

Merged
merged 1 commit into from
Apr 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions src/librmb/rados-storage-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ using librmb::RadosStorageImpl;

#define DICT_USERNAME_SEPARATOR '/'
const char *RadosStorageImpl::CFG_OSD_MAX_WRITE_SIZE = "osd_max_write_size";
const char *RadosStorageImpl::CFG_OSD_MAX_OBJECT_SIZE= "osd_max_object_size";

RadosStorageImpl::RadosStorageImpl(RadosCluster *_cluster) {
cluster = _cluster;
max_write_size = 10;
max_object_size = 134217728; //ceph default 128MB
io_ctx_created = false;
wait_method = WAIT_FOR_COMPLETE_AND_CB;
}
Expand All @@ -46,7 +48,7 @@ int RadosStorageImpl::split_buffer_and_exec_op(RadosMail *current_object,
if (!cluster->is_connected() || !io_ctx_created) {
return -1;
}
std::cout << "splitting buffer and exec op " << std::endl;

current_object->set_completion(librados::Rados::aio_create_completion());

/* librados::ObjectWriteOperation *op =
Expand All @@ -58,10 +60,11 @@ int RadosStorageImpl::split_buffer_and_exec_op(RadosMail *current_object,
ceph::bufferlist tmp_buffer;
assert(max_write > 0);

if (write_buffer_size <= 0 || max_write <= 0) {
if (write_buffer_size <= 0 ||
max_write <= 0) {
ret_val = -1;
return ret_val;
}

uint64_t rest = write_buffer_size % max_write;
int div = write_buffer_size / max_write + (rest > 0 ? 1 : 0);
for (int i = 0; i < div; ++i) {
Expand Down Expand Up @@ -197,9 +200,18 @@ int RadosStorageImpl::create_connection(const std::string &poolname) {
return err;
}
max_write_size = std::stoi(max_write_size_str);

string max_object_size_str;
err = cluster->get_config_option(RadosStorageImpl::CFG_OSD_MAX_OBJECT_SIZE, &max_object_size_str);
if (err < 0) {
return err;
}
max_object_size = std::stoi(max_object_size_str);

if (err == 0) {
io_ctx_created = true;
}

// set the poolname
pool_name = poolname;
return 0;
Expand Down Expand Up @@ -384,6 +396,7 @@ bool RadosStorageImpl::save_mail(librados::ObjectWriteOperation *write_op_xattr,
}
time_t save_date = mail->get_rados_save_date();
write_op_xattr->mtime(&save_date);

int ret = split_buffer_and_exec_op(mail, write_op_xattr, get_max_write_size_bytes());
if (ret != 0) {
write_op_xattr->remove();
Expand Down
5 changes: 5 additions & 0 deletions src/librmb/rados-storage-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class RadosStorageImpl : public RadosStorage {
void set_ceph_wait_method(enum rbox_ceph_aio_wait_method wait_method_) { this->wait_method = wait_method_; }
int get_max_write_size() override { return max_write_size; }
int get_max_write_size_bytes() override { return max_write_size * 1024 * 1024; }
int get_max_object_size() override {return max_object_size;}

int split_buffer_and_exec_op(RadosMail *current_object, librados::ObjectWriteOperation *write_op_xattr,
const uint64_t &max_write) override;
Expand Down Expand Up @@ -77,14 +78,18 @@ class RadosStorageImpl : public RadosStorage {
private:
RadosCluster *cluster;
int max_write_size;
int max_object_size;
std::string nspace;
librados::IoCtx io_ctx;
bool io_ctx_created;
std::string pool_name;
enum rbox_ceph_aio_wait_method wait_method;

static const char *CFG_OSD_MAX_WRITE_SIZE;
static const char *CFG_OSD_MAX_OBJECT_SIZE;
};



} // namespace librmb

Expand Down
8 changes: 6 additions & 2 deletions src/librmb/rados-storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ class RadosStorage {
/* set the wait method for async operations */
virtual void set_ceph_wait_method(enum rbox_ceph_aio_wait_method wait_method) = 0;

/*! get the max object size in mb
/*! get the max operation size in mb
* @return the maximal number of mb to write in a single write operation*/
virtual int get_max_write_size() = 0;
/*! get the max object size in bytes
/*! get the max operation size in bytes
* @return max number of bytes to write in a single write operation*/
virtual int get_max_write_size_bytes() = 0;

/*! get the max ceph object size
*/
virtual int get_max_object_size() = 0;

/*! In case the current object size exceeds the max_write (bytes), object should be split into
* max smaller operations and executed separately.
*
Expand Down
21 changes: 15 additions & 6 deletions src/storage-rbox/rbox-save.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,6 @@ int rbox_save_finish(struct mail_save_context *_ctx) {
r_ctx->failed = true;
i_error("ERROR, mailsize is <= 0 ");
} else {
bool async_write = true;

if (!zlib_plugin_active) {
// write \0 to ceph (length()+1) if stream is not binary
Expand All @@ -533,11 +532,21 @@ int rbox_save_finish(struct mail_save_context *_ctx) {

r_storage->ms->get_storage()->save_metadata(&write_op, r_ctx->rados_mail);

if (!r_storage->config->is_write_chunks()) {
r_ctx->failed = !r_storage->s->save_mail(&write_op, r_ctx->rados_mail, async_write);
} else {
r_ctx->failed = r_storage->s->aio_operate(&r_storage->s->get_io_ctx(), *r_ctx->rados_mail->get_oid(),
r_ctx->rados_mail->get_completion(), &write_op) < 0;
int max_object_size = r_storage->s->get_max_object_size();
i_debug("max_object_size %d mail_size %d",max_object_size, r_ctx->rados_mail->get_mail_size() );
if(max_object_size < r_ctx->rados_mail->get_mail_size()) {
i_error("configured CEPH Object size %d < then mail size %d ", r_storage->s->get_max_object_size(), r_ctx->rados_mail->get_mail_size() );
r_ctx->failed = true;
}else {
if (!r_storage->config->is_write_chunks()) {
bool async_write = true;
i_info("write chunks enabled %d ", r_storage->s->get_max_write_size_bytes() );
r_ctx->failed = !r_storage->s->save_mail(&write_op, r_ctx->rados_mail, async_write);
i_info("write failed? %d",r_ctx->failed);
} else {
r_ctx->failed = r_storage->s->aio_operate(&r_storage->s->get_io_ctx(), *r_ctx->rados_mail->get_oid(),
r_ctx->rados_mail->get_completion(), &write_op) < 0;
}
}
if (r_ctx->failed) {
i_error("saved mail: %s failed metadata_count %ld, mail_size (%d)", r_ctx->rados_mail->get_oid()->c_str(),
Expand Down
2 changes: 2 additions & 0 deletions src/tests/mocks/mock_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class RadosStorageMock : public RadosStorage {

MOCK_METHOD0(get_max_write_size, int());
MOCK_METHOD0(get_max_write_size_bytes, int());
MOCK_METHOD0(get_max_object_size, int());


MOCK_METHOD3(split_buffer_and_exec_op, int(RadosMail *current_object, librados::ObjectWriteOperation *write_op_xattr,
const uint64_t &max_write));
Expand Down