Skip to content

Commit

Permalink
#21 write object asynchronous.
Browse files Browse the repository at this point in the history
  - creates mail data copy to std::string buffer in rados_save_continue
fkt.
  - async op to write object to rados: called in rados_save_finish
  - finish callback is delegated to rados_mail_object.
  - rados_save_finish will wait for completion before returning.
  - TODO (add: wait for completion to correct lifecycle method)
  • Loading branch information
jrse committed Jun 8, 2017
1 parent 074d595 commit 5f9d190
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 4 deletions.
14 changes: 14 additions & 0 deletions src/librmb/rados-mail-object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,18 @@ RadosMailObject::RadosMailObject() {
this->received_date = 0;
memset(this->guid, 0, GUID_128_SIZE);
this->bytes_written = 0;
completion_private = std::make_shared<librados::AioCompletion>(*librados::Rados::aio_create_completion());
aio_write_successfull = false;
aio_write_finished = false;
}

// exclusive use for write operations (not sure if radps_completion_t can be something else than int
void RadosMailObject::rados_transaction_private_complete_callback(rados_completion_t comp, void* arg) {
int ret_val = (int)comp;
if (ret_val < 0) {
aio_write_successfull = false;
} else {
aio_write_successfull = true;
}
aio_write_finished = true;
}
11 changes: 11 additions & 0 deletions src/librmb/rados-mail-object.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

namespace librmb {

typedef std::shared_ptr<librados::AioCompletion> AioCompletionPtr;

class RadosMailObject {

public:
Expand Down Expand Up @@ -49,6 +51,11 @@ class RadosMailObject {
librados::ObjectWriteOperation& get_write_op() { return this->write_op; }
librados::ObjectReadOperation& get_read_op() { return this->read_op; }

AioCompletionPtr get_completion_private() { return this->completion_private; }
void rados_transaction_private_complete_callback(rados_completion_t comp, void* arg);
bool is_aio_write_successfull() { return this->aio_write_successfull; }
bool is_aio_write_finished() { return this->aio_write_finished; }

private:
std::string oid;
std::string state;
Expand All @@ -64,6 +71,10 @@ class RadosMailObject {
librados::ObjectWriteOperation write_op;
librados::ObjectReadOperation read_op;

AioCompletionPtr completion_private;
bool aio_write_successfull;
bool aio_write_finished;

public:
// X_ATTRIBUTES
static const std::string X_ATTR_STATE;
Expand Down
30 changes: 26 additions & 4 deletions src/storage-rados/rados-save.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ using namespace ceph; // NOLINT

using std::string;


class rados_save_context {
public:
explicit rados_save_context(RadosStorage &rados_storage)
Expand Down Expand Up @@ -263,6 +264,12 @@ static void remove_from_rados(librmb::RadosStorage *_storage, const std::string
i_debug("Librados obj: %s , could not be removed", _oid);
}
}
// delegate completion call to given rados object
static void rados_transaction_private_complete_callback(rados_completion_t comp, void *arg) {
RadosMailObject *rados_mail_object = reinterpret_cast<RadosMailObject *>(arg);
rados_mail_object->rados_transaction_private_complete_callback(comp, NULL);
}

int rados_save_finish(struct mail_save_context *_ctx) {
FUNC_START();
struct rados_save_context *r_ctx = (struct rados_save_context *)_ctx;
Expand All @@ -273,25 +280,40 @@ int rados_save_finish(struct mail_save_context *_ctx) {
if (!r_ctx->failed) {
rados_save_mail_write_metadata(r_ctx);

r_ctx->current_object->get_completion_private()->set_complete_callback(r_ctx->current_object,
rados_transaction_private_complete_callback);

librados::bufferlist mail_data_bl;
mail_data_bl.append(r_ctx->current_object->get_mail_data_ref());
r_ctx->current_object->get_write_op().write_full(mail_data_bl);

int ret = r_storage->s->get_io_ctx().operate(r_ctx->current_object->get_oid(), &r_ctx->current_object->get_write_op());
i_debug("saving to : %s", r_ctx->current_object->get_oid().c_str());
int ret = r_storage->s->get_io_ctx().aio_operate(r_ctx->current_object->get_oid(),
r_ctx->current_object->get_completion_private().get(),
&r_ctx->current_object->get_write_op());

if (ret < 0) {
i_debug("rados_save_finish(): saving object %s to rados failed err=%d(%s)",
r_ctx->current_object->get_oid().c_str(), ret, strerror(-ret));
r_ctx->failed = TRUE;
} else {
// TODO(jrse) replace wait_for_complete to better suited function and
// remember fkt remove_from_rados() if you move wait_for_complete call

r_ctx->current_object->get_completion_private()->wait_for_complete();
r_ctx->failed = r_ctx->current_object->is_aio_write_successfull();

i_debug("saving : %s finished", r_ctx->current_object->get_oid().c_str());
}
}

if (!r_ctx->failed) {
r_ctx->mail_count++;
// r_ctx->saved_oids.push_back(r_ctx->current_object->get_oid());
} else {
r_ctx->current_object->get_write_op().remove();
remove_from_rados(r_storage->s, r_ctx->current_object->get_oid());
if (r_ctx->current_object->is_aio_write_successfull()) {
r_ctx->current_object->get_write_op().remove();
remove_from_rados(r_storage->s, r_ctx->current_object->get_oid());
}
}

r_ctx->finished = TRUE;
Expand Down

0 comments on commit 5f9d190

Please sign in to comment.