Skip to content

Commit

Permalink
#36 manual tests
Browse files Browse the repository at this point in the history
- fixed bug waiting for rados write finish: handling of return value was
not valid.
- made some changes to rados-copy.cpp 
- added rbox_add_to_index function (used in rbox-copy instead of
save_begin.
-TODO: rollback in case write to rados not suceessul is untested.
- removed save_continue from rbox-copy
  • Loading branch information
jrse committed Jun 21, 2017
1 parent f5c3037 commit 38ab07c
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 132 deletions.
12 changes: 0 additions & 12 deletions src/librmb/rados-mail-object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,4 @@ RadosMailObject::RadosMailObject() {
this->received_date = 0;
memset(this->guid, 0, GUID_128_SIZE);
completion_private = std::make_shared<librados::AioCompletion>(*librados::Rados::aio_create_completion());
aio_write_successful = true;
aio_write_finished = false;
}

// exclusive use for write operations (not sure if radps_completion_t can be something else than int
void RadosMailObject::rados_transaction_private_complete_callback(rados_completion_t comp, void* arg) {
int ret_val = (int)comp;

if (ret_val < 0) {
aio_write_successful = false;
}
aio_write_finished = true;
}
5 changes: 0 additions & 5 deletions src/librmb/rados-mail-object.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ class RadosMailObject {
librados::ObjectWriteOperation& get_write_op() { return this->write_op; }

AioCompletionPtr get_completion_private() { return this->completion_private; }
void rados_transaction_private_complete_callback(rados_completion_t comp, void* arg);
bool is_aio_write_successful() { return this->aio_write_successful; }
bool is_aio_write_finished() { return this->aio_write_finished; }


private:
Expand All @@ -66,8 +63,6 @@ class RadosMailObject {
librados::ObjectWriteOperation write_op;

AioCompletionPtr completion_private;
bool aio_write_successful;
bool aio_write_finished;

public:
// X_ATTRIBUTES
Expand Down
65 changes: 19 additions & 46 deletions src/storage-rbox/rbox-copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,70 +92,42 @@ static int rbox_mail_storage_try_copy(struct mail_save_context **_ctx, struct ma

i_debug("rbox_mail_storage_try_copy: mail = %p", mail);
debug_print_mail_save_context(*_ctx, "rbox_mail_storage_try_copy", NULL);
int ret_val = 0;

ctx->copying_via_save = TRUE;

if (r_ctx->copying != TRUE) {
/* we need to open the file in any case. caching metadata is unlikely
to help anything. */
pmail->v.set_uid_cache_updates(mail, TRUE);

if (mail_get_stream_because(mail, NULL, NULL, "copying", &input) < 0) {
rbox_mail_copy_set_failed(ctx, mail, "stream");
FUNC_END_RET("ret == -1, mail_get_stream_because failed");
return -1;
}
} else {
if (r_ctx->copying == TRUE) {
if (rbox_get_index_record(mail) < 0) {
rbox_mail_copy_set_failed(ctx, mail, "index record");
FUNC_END_RET("ret == -1, rbox_get_index_record failed");
return -1;
}
}

if (rbox_mail_save_copy_default_metadata(ctx, mail) < 0) {
FUNC_END_RET("ret == -1, mail_save_copy_default_metadata failed");
return -1;
}

if (mailbox_save_begin(_ctx, input) < 0) {
FUNC_END_RET("ret == -1, mailbox_save_begin failed");
return -1;
}

i_debug("rbox_mail_storage_try_copy: dest mail oid = %s", r_ctx->current_object->get_oid().c_str());

if (r_ctx->copying != TRUE) {
ssize_t ret;
do {
if (mailbox_save_continue(ctx) < 0)
break;
ret = i_stream_read(input);
i_assert(ret != 0);
} while (ret != -1);

if (input->stream_errno != 0) {
mail_storage_set_critical(ctx->transaction->box->storage, "copy: i_stream_read(%s) failed: %s",
i_stream_get_name(input), i_stream_get_error(input));
FUNC_END_RET("ret == -1, input->stream_errno != 0");
if (rbox_mail_save_copy_default_metadata(ctx, mail) < 0) {
FUNC_END_RET("ret == -1, mail_save_copy_default_metadata failed");
return -1;
}
}
rbox_add_to_index(ctx);
index_copy_cache_fields(ctx, mail, r_ctx->seq);
mail_set_seq_saving(ctx->dest_mail, r_ctx->seq);
i_debug("rbox_mail_storage_try_copy: dest mail oid = %s", r_ctx->current_object->get_oid().c_str());

if (r_ctx->copying == TRUE) {
librados::ObjectWriteOperation write_op;
struct rbox_mail *rmail = (struct rbox_mail *)mail;
std::string src_oid = rmail->mail_object->get_oid();
i_debug("rbox_mail_storage_try_copy: source mail oid = %s", src_oid.c_str());
r_ctx->current_object->get_write_op().copy_from(src_oid, r_storage->s->get_io_ctx(),
r_storage->s->get_io_ctx().get_last_version());

write_op.copy_from(src_oid, r_storage->s->get_io_ctx(), r_storage->s->get_io_ctx().get_last_version());
ret_val = r_storage->s->get_io_ctx().operate(r_ctx->current_object->get_oid(), &write_op);
}
FUNC_END();
return 0;
return ret_val;
}

int rbox_mail_storage_copy(struct mail_save_context *ctx, struct mail *mail) {
struct rbox_save_context *r_ctx = (struct rbox_save_context *)ctx;

FUNC_START();

i_assert(ctx->copying_or_moving);
r_ctx->finished = TRUE;

if (ctx->data.keywords != NULL) {
/* keywords gets unreferenced twice: first in
Expand All @@ -171,5 +143,6 @@ int rbox_mail_storage_copy(struct mail_save_context *ctx, struct mail *mail) {
return -1;
}
FUNC_END();
return mailbox_save_finish(&ctx);

return mail_storage_copy(ctx, mail);
}
142 changes: 74 additions & 68 deletions src/storage-rbox/rbox-save.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,44 @@ struct mail_save_context *rbox_save_alloc(struct mailbox_transaction_context *t)
return t->save_ctx;
}

void rbox_add_to_index(struct mail_save_context *_ctx) {
struct mail_save_data *mdata = &_ctx->data;
rbox_save_context *r_ctx = (struct rbox_save_context *)_ctx;
enum mail_flags save_flags;

/* add to index */
save_flags = mdata->flags & ~MAIL_RECENT;
mail_index_append(r_ctx->trans, 0, &r_ctx->seq);
mail_index_update_flags(r_ctx->trans, r_ctx->seq, MODIFY_REPLACE, static_cast<mail_flags>(save_flags));
if (_ctx->data.keywords != NULL) {
mail_index_update_keywords(r_ctx->trans, r_ctx->seq, MODIFY_REPLACE, _ctx->data.keywords);
}
if (_ctx->data.min_modseq != 0) {
mail_index_update_modseq(r_ctx->trans, r_ctx->seq, _ctx->data.min_modseq);
}

mail_set_seq_saving(_ctx->dest_mail, r_ctx->seq);

guid_128_generate(r_ctx->mail_oid);

r_ctx->current_object = new RadosMailObject();
r_ctx->current_object->set_oid(guid_128_to_string(r_ctx->mail_oid));

if (mdata->guid != NULL) {
mail_generate_guid_128_hash(mdata->guid, r_ctx->mail_guid);
} else {
guid_128_generate(r_ctx->mail_guid);
}

/* save the 128bit GUID/OID to index record */
struct obox_mail_index_record rec;
i_zero(&rec);
memcpy(rec.guid, r_ctx->mail_guid, sizeof(r_ctx->mail_guid));
memcpy(rec.oid, r_ctx->mail_oid, sizeof(r_ctx->mail_oid));
i_debug("SAVE OID: %s", guid_128_to_string(rec.oid));
mail_index_update_ext(r_ctx->trans, r_ctx->seq, r_ctx->mbox->ext_id, &rec, NULL);
}

int rbox_save_begin(struct mail_save_context *_ctx, struct istream *input) {
FUNC_START();
rbox_save_context *r_ctx = (struct rbox_save_context *)_ctx;
Expand All @@ -67,16 +105,17 @@ int rbox_save_begin(struct mail_save_context *_ctx, struct istream *input) {
struct mailbox_transaction_context *trans = _ctx->transaction;
struct mail_storage *storage = &r_ctx->mbox->storage->storage;
struct rbox_storage *r_storage = (struct rbox_storage *)storage;

int save_flags;
struct istream *crlf_input;

r_ctx->failed = FALSE;

guid_128_generate(r_ctx->mail_oid);
if (r_ctx->copying != TRUE) {
rbox_add_to_index(_ctx);
crlf_input = i_stream_create_crlf(input);
r_ctx->input = index_mail_cache_parse_init(_ctx->dest_mail, crlf_input);
i_stream_unref(&crlf_input);
}

r_ctx->current_object = new RadosMailObject();
r_ctx->current_object->set_oid(guid_128_to_string(r_ctx->mail_oid));

r_ctx->mail_buffer = buffer_create_dynamic(default_pool, 1014);
if (r_ctx->mail_buffer == NULL) {
Expand All @@ -99,24 +138,6 @@ int rbox_save_begin(struct mail_save_context *_ctx, struct istream *input) {
return -1;
}

/* add to index */
save_flags = _ctx->data.flags & ~MAIL_RECENT;
mail_index_append(r_ctx->trans, 0, &r_ctx->seq);
mail_index_update_flags(r_ctx->trans, r_ctx->seq, MODIFY_REPLACE, static_cast<mail_flags>(save_flags));
if (_ctx->data.keywords != NULL) {
mail_index_update_keywords(r_ctx->trans, r_ctx->seq, MODIFY_REPLACE, _ctx->data.keywords);
}
if (_ctx->data.min_modseq != 0) {
mail_index_update_modseq(r_ctx->trans, r_ctx->seq, _ctx->data.min_modseq);
}

mail_set_seq_saving(_ctx->dest_mail, r_ctx->seq);

if (r_ctx->copying != TRUE) {
crlf_input = i_stream_create_crlf(input);
r_ctx->input = index_mail_cache_parse_init(_ctx->dest_mail, crlf_input);
i_stream_unref(&crlf_input);
}
debug_print_mail_save_context(_ctx, "rbox-save::rbox_save_begin", NULL);

FUNC_END();
Expand Down Expand Up @@ -170,20 +191,6 @@ static int rbox_save_mail_write_metadata(struct rbox_save_context *ctx) {
struct rbox_storage *r_storage = (struct rbox_storage *)storage;
struct mail_save_data *mdata = &ctx->ctx.data;

if (ctx->ctx.data.guid != NULL) {
mail_generate_guid_128_hash(ctx->ctx.data.guid, ctx->mail_guid);
} else {
guid_128_generate(ctx->mail_guid);
}

/* save the 128bit GUID/OID to index record */
struct obox_mail_index_record rec;
i_zero(&rec);
memcpy(rec.guid, ctx->mail_guid, sizeof(ctx->mail_guid));
memcpy(rec.oid, ctx->mail_oid, sizeof(ctx->mail_oid));
i_debug("SAVE OID: %s", guid_128_to_string(rec.oid));
mail_index_update_ext(ctx->trans, ctx->seq, mbox->ext_id, &rec, NULL);

{
bufferlist bl;
bl.append((const char *)ctx->mail_guid, sizeof(ctx->mail_guid));
Expand Down Expand Up @@ -225,18 +232,15 @@ static void remove_from_rados(librmb::RadosStorage *_storage, const std::string
// delegate completion call to given rados object
static void rbox_transaction_private_complete_callback(rados_completion_t comp, void *arg) {
RadosMailObject *rados_mail_object = reinterpret_cast<RadosMailObject *>(arg);
rados_mail_object->rados_transaction_private_complete_callback(comp, NULL);
i_debug("mail_saved ! %s is sucessful %d ", rados_mail_object->get_oid().c_str(),
rados_mail_object->is_aio_write_successful());
i_debug("mail_saved ! %s callback ", rados_mail_object->get_oid().c_str());
}

void clean_up_failed(struct rbox_save_context *_r_ctx) {
struct rbox_storage *r_storage = (struct rbox_storage *)&_r_ctx->mbox->storage->storage;

// do some expunges
mail_index_expunge(_r_ctx->trans, _r_ctx->seq);
mail_cache_transaction_reset(_r_ctx->ctx.transaction->cache_trans);

/* delete aio operate */
// TOD(jrse) reenable it mail_cache_transaction_reset(_r_ctx->ctx.transaction->cache_trans);
_r_ctx->current_object->get_completion_private()->wait_for_complete();
_r_ctx->current_object->get_write_op().remove();
remove_from_rados(r_storage->s, _r_ctx->current_object->get_oid());
Expand All @@ -261,33 +265,30 @@ int rbox_save_finish(struct mail_save_context *_ctx) {
int ret = -1;

r_ctx->finished = TRUE;
i_debug("finish called !");
if (_ctx->data.save_date != (time_t)-1) {
struct index_mail *mail = (struct index_mail *)_ctx->dest_mail;
uint32_t t = _ctx->data.save_date;
index_mail_cache_add(mail, MAIL_CACHE_SAVE_DATE, &t, sizeof(t));
}

if (!r_ctx->failed) {
T_BEGIN {
rbox_save_mail_write_metadata(r_ctx);

ret = r_ctx->current_object->get_completion_private()->set_complete_callback(
r_ctx->current_object, rbox_transaction_private_complete_callback);
if (ret == 0) {
if (r_ctx->copying != TRUE) {
librados::bufferlist mail_data_bl;
mail_data_bl.append(str_c(r_ctx->mail_buffer));
r_ctx->current_object->get_write_op().write_full(mail_data_bl);
}
// MAKE SYNC, ASYNC
ret = r_storage->s->get_io_ctx().aio_operate(r_ctx->current_object->get_oid(),
r_ctx->current_object->get_completion_private().get(),
&r_ctx->current_object->get_write_op());
rbox_save_mail_write_metadata(r_ctx);

ret = r_ctx->current_object->get_completion_private()->set_complete_callback(
r_ctx->current_object, rbox_transaction_private_complete_callback);
if (ret == 0) {
if (r_ctx->copying != TRUE) {
i_debug("copying is true ");
librados::bufferlist mail_data_bl;
mail_data_bl.append(str_c(r_ctx->mail_buffer));
r_ctx->current_object->get_write_op().write_full(mail_data_bl);
}
// MAKE SYNC, ASYNC
ret = r_storage->s->get_io_ctx().aio_operate(r_ctx->current_object->get_oid(),
r_ctx->current_object->get_completion_private().get(),
&r_ctx->current_object->get_write_op());
}
r_ctx->failed = ret < 0;
}
T_END;
}

if (r_ctx->failed) {
Expand Down Expand Up @@ -350,13 +351,18 @@ void rbox_transaction_save_commit_post(struct mail_save_context *_ctx,
struct rbox_save_context *r_ctx = (struct rbox_save_context *)_ctx;
struct rbox_storage *r_storage = (struct rbox_storage *)&r_ctx->mbox->storage->storage;

// wait for rados async write to complete
// TODO(jrse): add to rollback
r_ctx->current_object->get_completion_private()->wait_for_complete();
r_ctx->failed = !r_ctx->current_object->is_aio_write_successful();

if (r_ctx->failed) {
clean_up_failed(r_ctx);
if (r_ctx->copying != TRUE) {
int ret = r_storage->s->get_io_ctx().aio_flush();
if (ret != 0) {
r_ctx->failed = true;
} else if (r_ctx->current_object->get_completion_private()->get_return_value() < 0) {
r_ctx->failed = true;
}
// clean
r_ctx->current_object->get_write_op().remove();
if (r_ctx->failed) {
clean_up_failed(r_ctx);
}
}

_ctx->transaction = NULL; /* transaction is already freed */
Expand Down
2 changes: 2 additions & 0 deletions src/storage-rbox/rbox-save.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,6 @@ class rbox_save_context {
unsigned int copying : 1;
};

void rbox_add_to_index(struct mail_save_context *_ctx);

#endif /* SRC_STORAGE_RBOX_RBOX_SAVE_H_ */
1 change: 0 additions & 1 deletion src/storage-rbox/rbox-sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ static void rbox_sync_expunge(struct rbox_sync_context *ctx, uint32_t seq1, uint
// continue anyway
} else {
array_append(&ctx->expunged_items, &item, 1);
mail_index_expunge(ctx->trans, seq1);
}
}
debug_print_rbox_sync_context(ctx, "rbox-sync::rbox_sync_expunge", NULL);
Expand Down

0 comments on commit 38ab07c

Please sign in to comment.