Skip to content

Commit

Permalink
#310: read osd_max_object_size and fail if mail.size > osd_max_object…
Browse files Browse the repository at this point in the history
…_size
  • Loading branch information
jrse committed Apr 2, 2022
1 parent 447695f commit e873ee3
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 11 deletions.
19 changes: 16 additions & 3 deletions src/librmb/rados-storage-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ using librmb::RadosStorageImpl;

#define DICT_USERNAME_SEPARATOR '/'
const char *RadosStorageImpl::CFG_OSD_MAX_WRITE_SIZE = "osd_max_write_size";
const char *RadosStorageImpl::CFG_OSD_MAX_OBJECT_SIZE= "osd_max_object_size";

RadosStorageImpl::RadosStorageImpl(RadosCluster *_cluster) {
cluster = _cluster;
max_write_size = 10;
max_object_size = 134217728; //ceph default 128MB
io_ctx_created = false;
wait_method = WAIT_FOR_COMPLETE_AND_CB;
}
Expand All @@ -46,7 +48,7 @@ int RadosStorageImpl::split_buffer_and_exec_op(RadosMail *current_object,
if (!cluster->is_connected() || !io_ctx_created) {
return -1;
}
std::cout << "splitting buffer and exec op " << std::endl;

current_object->set_completion(librados::Rados::aio_create_completion());

/* librados::ObjectWriteOperation *op =
Expand All @@ -58,10 +60,11 @@ int RadosStorageImpl::split_buffer_and_exec_op(RadosMail *current_object,
ceph::bufferlist tmp_buffer;
assert(max_write > 0);

if (write_buffer_size <= 0 || max_write <= 0) {
if (write_buffer_size <= 0 ||
max_write <= 0) {
ret_val = -1;
return ret_val;
}

uint64_t rest = write_buffer_size % max_write;
int div = write_buffer_size / max_write + (rest > 0 ? 1 : 0);
for (int i = 0; i < div; ++i) {
Expand Down Expand Up @@ -197,9 +200,18 @@ int RadosStorageImpl::create_connection(const std::string &poolname) {
return err;
}
max_write_size = std::stoi(max_write_size_str);

string max_object_size_str;
err = cluster->get_config_option(RadosStorageImpl::CFG_OSD_MAX_OBJECT_SIZE, &max_object_size_str);
if (err < 0) {
return err;
}
max_object_size = std::stoi(max_object_size_str);

if (err == 0) {
io_ctx_created = true;
}

// set the poolname
pool_name = poolname;
return 0;
Expand Down Expand Up @@ -384,6 +396,7 @@ bool RadosStorageImpl::save_mail(librados::ObjectWriteOperation *write_op_xattr,
}
time_t save_date = mail->get_rados_save_date();
write_op_xattr->mtime(&save_date);

int ret = split_buffer_and_exec_op(mail, write_op_xattr, get_max_write_size_bytes());
if (ret != 0) {
write_op_xattr->remove();
Expand Down
5 changes: 5 additions & 0 deletions src/librmb/rados-storage-impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class RadosStorageImpl : public RadosStorage {
void set_ceph_wait_method(enum rbox_ceph_aio_wait_method wait_method_) { this->wait_method = wait_method_; }
int get_max_write_size() override { return max_write_size; }
int get_max_write_size_bytes() override { return max_write_size * 1024 * 1024; }
int get_max_object_size() override {return max_object_size;}

int split_buffer_and_exec_op(RadosMail *current_object, librados::ObjectWriteOperation *write_op_xattr,
const uint64_t &max_write) override;
Expand Down Expand Up @@ -77,14 +78,18 @@ class RadosStorageImpl : public RadosStorage {
private:
RadosCluster *cluster;
int max_write_size;
int max_object_size;
std::string nspace;
librados::IoCtx io_ctx;
bool io_ctx_created;
std::string pool_name;
enum rbox_ceph_aio_wait_method wait_method;

static const char *CFG_OSD_MAX_WRITE_SIZE;
static const char *CFG_OSD_MAX_OBJECT_SIZE;
};



} // namespace librmb

Expand Down
8 changes: 6 additions & 2 deletions src/librmb/rados-storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ class RadosStorage {
/* set the wait method for async operations */
virtual void set_ceph_wait_method(enum rbox_ceph_aio_wait_method wait_method) = 0;

/*! get the max object size in mb
/*! get the max operation size in mb
* @return the maximal number of mb to write in a single write operation*/
virtual int get_max_write_size() = 0;
/*! get the max object size in bytes
/*! get the max operation size in bytes
* @return max number of bytes to write in a single write operation*/
virtual int get_max_write_size_bytes() = 0;

/*! get the max ceph object size
*/
virtual int get_max_object_size() = 0;

/*! In case the current object size exceeds the max_write (bytes), object should be split into
* max smaller operations and executed separately.
*
Expand Down
21 changes: 15 additions & 6 deletions src/storage-rbox/rbox-save.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,6 @@ int rbox_save_finish(struct mail_save_context *_ctx) {
r_ctx->failed = true;
i_error("ERROR, mailsize is <= 0 ");
} else {
bool async_write = true;

if (!zlib_plugin_active) {
// write \0 to ceph (length()+1) if stream is not binary
Expand All @@ -533,11 +532,21 @@ int rbox_save_finish(struct mail_save_context *_ctx) {

r_storage->ms->get_storage()->save_metadata(&write_op, r_ctx->rados_mail);

if (!r_storage->config->is_write_chunks()) {
r_ctx->failed = !r_storage->s->save_mail(&write_op, r_ctx->rados_mail, async_write);
} else {
r_ctx->failed = r_storage->s->aio_operate(&r_storage->s->get_io_ctx(), *r_ctx->rados_mail->get_oid(),
r_ctx->rados_mail->get_completion(), &write_op) < 0;
int max_object_size = r_storage->s->get_max_object_size();
i_debug("max_object_size %d mail_size %d",max_object_size, r_ctx->rados_mail->get_mail_size() );
if(max_object_size < r_ctx->rados_mail->get_mail_size()) {
i_error("configured CEPH Object size %d < then mail size %d ", r_storage->s->get_max_object_size(), r_ctx->rados_mail->get_mail_size() );
r_ctx->failed = true;
}else {
if (!r_storage->config->is_write_chunks()) {
bool async_write = true;
i_info("write chunks enabled %d ", r_storage->s->get_max_write_size_bytes() );
r_ctx->failed = !r_storage->s->save_mail(&write_op, r_ctx->rados_mail, async_write);
i_info("write failed? %d",r_ctx->failed);
} else {
r_ctx->failed = r_storage->s->aio_operate(&r_storage->s->get_io_ctx(), *r_ctx->rados_mail->get_oid(),
r_ctx->rados_mail->get_completion(), &write_op) < 0;
}
}
if (r_ctx->failed) {
i_error("saved mail: %s failed metadata_count %ld, mail_size (%d)", r_ctx->rados_mail->get_oid()->c_str(),
Expand Down
2 changes: 2 additions & 0 deletions src/tests/mocks/mock_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class RadosStorageMock : public RadosStorage {

MOCK_METHOD0(get_max_write_size, int());
MOCK_METHOD0(get_max_write_size_bytes, int());
MOCK_METHOD0(get_max_object_size, int());


MOCK_METHOD3(split_buffer_and_exec_op, int(RadosMail *current_object, librados::ObjectWriteOperation *write_op_xattr,
const uint64_t &max_write));
Expand Down

0 comments on commit e873ee3

Please sign in to comment.