From 656dd1cf97a24d648368b7c4d768bcb386bf5974 Mon Sep 17 00:00:00 2001 From: jan Date: Wed, 17 May 2017 19:59:33 +0200 Subject: [PATCH 1/3] initial version to save e-mailmessage to rados and receive it. NOTE: due to incorrect physical file size and initial access to tmp file, first msg receive will fail --- src/storage-rbox/rbox-mail.c | 54 ++++++++++++--- src/storage-rbox/rbox-save.c | 112 ++++++++++++++++++++++++++------ src/storage-rbox/rbox-storage.c | 51 ++++++++++++++- src/storage-rbox/rbox-storage.h | 9 +++ 4 files changed, 194 insertions(+), 32 deletions(-) diff --git a/src/storage-rbox/rbox-mail.c b/src/storage-rbox/rbox-mail.c index 88a3515a..ca1117a3 100644 --- a/src/storage-rbox/rbox-mail.c +++ b/src/storage-rbox/rbox-mail.c @@ -13,6 +13,8 @@ #include "debug-helper.h" #include "rbox-file.h" #include "rbox-storage.h" +#include +#include "message-part.h" struct mail * rbox_mail_alloc(struct mailbox_transaction_context *t, enum mail_fetch_field wanted_fields, @@ -170,11 +172,11 @@ static int rbox_mail_get_special(struct mail *_mail, enum mail_fetch_field field int rbox_mail_open(struct dbox_mail *mail, uoff_t *offset_r, struct dbox_file **file_r) { FUNC_START(); - - struct mail *_mail = &mail->imail.mail.mail; + +struct mail *_mail = &mail->imail.mail.mail; bool deleted; int ret; - + if (_mail->lookup_abort != MAIL_LOOKUP_ABORT_NEVER) { mail_set_aborted(_mail); rbox_dbg_print_mail(_mail, "rbox_mail_open", NULL); @@ -225,8 +227,6 @@ static int rbox_mail_metadata_read(struct dbox_mail *mail, struct dbox_file **fi return -1; } - i_debug("rbox_mail_metadata_read: offset = %lu, cur_path = %s", offset, (*file_r)->cur_path); - if (dbox_file_seek(*file_r, offset) <= 0) { rbox_dbg_print_mail(&mail->imail.mail.mail, "rbox_mail_metadata_read", NULL); FUNC_END_RET("ret == -1; seek failed"); @@ -259,7 +259,6 @@ static int rbox_mail_metadata_get(struct dbox_mail *mail, enum dbox_metadata_key } *value_r = dbox_file_metadata_get(file, key); - i_debug("rbox_mail_metadata_get: key = %d, value = %s", key, *value_r); rbox_dbg_print_mail(&mail->imail.mail.mail, "rbox_mail_metadata_get", NULL); FUNC_END(); return 0; @@ -393,7 +392,7 @@ static int get_mail_stream(struct dbox_mail *mail, uoff_t offset, struct istream *stream_r = NULL; return ret; } - + *stream_r = i_stream_create_limit(file->input, file->cur_physical_size); if (pmail->v.istream_opened != NULL) { if (pmail->v.istream_opened(&pmail->mail, stream_r) < 0) @@ -403,18 +402,36 @@ static int get_mail_stream(struct dbox_mail *mail, uoff_t offset, struct istream return 1; else return dbox_attachment_file_get_stream(file, stream_r); + + return 1; } + + static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, struct message_size *hdr_size, struct message_size *body_size, struct istream **stream_r) { + struct rbox_storage *storage = (struct rbox_storage *) _mail->box->storage; struct dbox_mail *mail = (struct dbox_mail *) _mail; struct index_mail_data *data = &mail->imail.data; struct istream *input; - uoff_t offset; + struct istream *rados_input; + + uoff_t offset = 0; int ret; + off_t size_r; + if(rbox_mail_get_virtual_size(_mail, &size_r) <0 ){ + i_debug("rbox_mail_get_stream: error getting mail_virtual_size, lookup in rados not yet implemented"); + return -1; + } + + /* temporary guid generation see rbox-save.c */ + char oid[GUID_128_SIZE]; + generate_oid(oid, _mail->box->storage, _mail->seq); + if (data->stream == NULL) { + if (storage->storage.v.mail_open(mail, &offset, &mail->open_file) < 0) return -1; @@ -430,7 +447,25 @@ static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, s return -1; } data->stream = input; - index_mail_set_read_buffer_size(_mail, input); + + + char buffer[size_r]; + int read = 0; + read = rados_read(storage->ceph_io,oid,buffer,size_r,0); + if(read <0){ + return -1; + } + rados_input = i_stream_create_from_data(buffer,size_r); + + i_stream_seek(input,mail->open_file->cur_physical_size); + + if(!i_stream_add_data(input,buffer,size_r)){ + return -1; + } + + i_stream_seek(input,0); + + index_mail_set_read_buffer_size(_mail, data->stream); } return index_mail_init_stream(&mail->imail, hdr_size, body_size, stream_r); @@ -445,7 +480,6 @@ struct mail_vfuncs rbox_mail_vfuncs = { index_mail_prefetch, index_mail_precache, index_mail_add_temp_wanted_fields, - index_mail_get_flags, index_mail_get_keywords, index_mail_get_keyword_indexes, diff --git a/src/storage-rbox/rbox-save.c b/src/storage-rbox/rbox-save.c index f24619db..c697e159 100644 --- a/src/storage-rbox/rbox-save.c +++ b/src/storage-rbox/rbox-save.c @@ -36,6 +36,7 @@ struct rbox_save_context { files; }; + struct dbox_file * rbox_save_file_get_file(struct mailbox_transaction_context *t, uint32_t seq) { FUNC_START(); @@ -113,13 +114,16 @@ void rbox_save_add_file(struct mail_save_context *_ctx, struct dbox_file *file) int rbox_save_begin(struct mail_save_context *_ctx, struct istream *input) { FUNC_START(); struct rbox_save_context *ctx = (struct rbox_save_context *) _ctx; + struct dbox_save_context *ctx_dbox = (struct dbox_save_context*)_ctx; + struct dbox_file *file; int ret; rbox_dbg_print_mail_save_context(_ctx, "rbox_save_begin", NULL); - file = rbox_file_create(ctx->mbox); ctx->append_ctx = dbox_file_append_init(file); + + ret = dbox_file_get_append_stream(ctx->append_ctx, &ctx->ctx.dbox_output); if (ret <= 0) { i_assert(ret != 0); @@ -130,49 +134,113 @@ int rbox_save_begin(struct mail_save_context *_ctx, struct istream *input) { return -1; } ctx->cur_file = file; + + // add mail to index + // write dummy header to file dbox_save_begin(&ctx->ctx, input); + // append file to file list. rbox_save_add_file(_ctx, file); + int count = 0; + FUNC_END(); return ctx->ctx.failed ? -1 : 0; } +off_t stream_mail_to_rados(rados_ioctx_t* ceph_io, char* guid, struct istream *instream) +{ + uoff_t start_offset; + struct const_iovec iov; + const unsigned char *data; + ssize_t ret; + int offset = 0; + start_offset = instream->v_offset; + do{ + (void)i_stream_read_data(instream, &data, &iov.iov_len,0); + if(iov.iov_len ==0){ + /*all sent */ + if(instream->stream_errno !=0){ + return -1; + } + break; + } + + iov.iov_base = data; + + int err = rados_write(*ceph_io,guid,iov.iov_base,iov.iov_len, offset); + if(err <0){ + i_debug("cannot write to pool %lu",ceph_io); + return -1; + } + i_stream_skip(instream,0); + + }while((size_t)ret == iov.iov_len); + + return (off_t)(instream->v_offset - start_offset); +} + int rbox_save_continue(struct mail_save_context *_ctx) { + FUNC_START(); struct dbox_save_context *ctx = (struct dbox_save_context *) _ctx; - struct mail_storage *storage = _ctx->transaction->box->storage; - + struct mail_storage *storage = _ctx->transaction->box->storage; + rbox_dbg_print_mail_save_context(_ctx, "rbox_save_continue", NULL); + rbox_dbg_print_mail_storage(storage,"rbox_save_continue",NULL); + rbox_dbg_print_mail_user(storage->user,"rbox_save_continue",NULL); - if (ctx->failed) { + if (ctx->failed) { FUNC_END_RET("ret == -1; ctx failed"); return -1; } - - if (_ctx->data.attach != NULL) { + + if (_ctx->data.attach != NULL) { FUNC_END_RET("ret == index_attachment_save_continue"); return index_attachment_save_continue(_ctx); } - do { - if (o_stream_send_istream(_ctx->data.output, ctx->input) < 0) { - if (!mail_storage_set_error_from_errno(storage)) { - mail_storage_set_critical(storage, "write(%s) failed: %m", o_stream_get_name(_ctx->data.output)); + if(((struct rbox_storage*) storage)->use_rados_storage ==0 ){ + do { + if (o_stream_send_istream(_ctx->data.output, ctx->input) < 0) { + if (!mail_storage_set_error_from_errno(storage)) { + mail_storage_set_critical(storage, "write(%s) failed: %m", o_stream_get_name(_ctx->data.output)); + } + ctx->failed = TRUE; + FUNC_END_RET("ret == -1; o_stream_send_istream failed"); + return -1; } - ctx->failed = TRUE; - FUNC_END_RET("ret == -1; o_stream_send_istream failed"); - return -1; - } - index_mail_cache_parse_continue(_ctx->dest_mail); + index_mail_cache_parse_continue(_ctx->dest_mail); + + // both tee input readers may consume data from our primary + // input stream. we'll have to make sure we don't return with + // one of the streams still having data in them. + } while (i_stream_read(ctx->input) > 0); + + } + else{ + do { + /* temporary guid generation see rbox-save.c */ + char oid[GUID_128_SIZE]; + generate_oid(oid,_ctx->transaction->box->storage, ctx->seq); + + if (stream_mail_to_rados(&((struct rbox_storage*) storage)->ceph_io, oid, ctx->input) < 0) { + if (!mail_storage_set_error_from_errno(storage)) { + mail_storage_set_critical(storage, "write(%s) failed: %m", o_stream_get_name(_ctx->data.output)); + } + ctx->failed = TRUE; + FUNC_END_RET("ret == -1; o_stream_send_istream failed"); + return -1; + } + index_mail_cache_parse_continue(_ctx->dest_mail); + + } while (i_stream_read(ctx->input) > 0); + } - /* both tee input readers may consume data from our primary - input stream. we'll have to make sure we don't return with - one of the streams still having data in them. */ - } while (i_stream_read(ctx->input) > 0); FUNC_END(); return 0; } + static int dbox_save_mail_write_metadata(struct dbox_save_context *ctx, struct dbox_file *file) { FUNC_START(); struct rbox_file *sfile = (struct rbox_file *) file; @@ -225,7 +293,7 @@ static int dbox_save_finish_write(struct mail_save_context *_ctx) { struct dbox_file **files; rbox_dbg_print_mail_save_context(_ctx, "dbox_save_finish_write", NULL); - + ctx->ctx.finished = TRUE; if (ctx->ctx.dbox_output == NULL) { FUNC_END_RET("ret == -1; ctx.dbox_output == NULL"); @@ -271,10 +339,12 @@ static int dbox_save_finish_write(struct mail_save_context *_ctx) { int rbox_save_finish(struct mail_save_context *ctx) { FUNC_START(); - int ret; + int ret; ret = dbox_save_finish_write(ctx); index_save_context_free(ctx); + int file_fd = ( (struct rbox_file*) ((struct rbox_save_context*)ctx)->cur_file)->uid; + FUNC_END(); return ret; } diff --git a/src/storage-rbox/rbox-storage.c b/src/storage-rbox/rbox-storage.c index 3c7d3559..570d26f3 100644 --- a/src/storage-rbox/rbox-storage.c +++ b/src/storage-rbox/rbox-storage.c @@ -51,7 +51,9 @@ static int rbox_storage_create(struct mail_storage *_storage, struct mail_namesp FUNC_START(); struct rbox_storage *storage = (struct rbox_storage *) _storage; enum fs_properties props; + + storage->use_rados_storage = 1; if (dbox_storage_create(_storage, ns, error_r) < 0) { FUNC_END_RET("ret == -1; dbox_storage_create failed"); return -1; @@ -68,6 +70,40 @@ static int rbox_storage_create(struct mail_storage *_storage, struct mail_namesp } } rbox_dbg_print_mail_storage(_storage, "rbox_storage_create", NULL); + + // initialize c rados_ctx (test cluster, pool and user name !) + + char cluster_name[] = "ceph"; + char *poolname = "rbd"; + char user_name[] = "client.admin"; + uint64_t flags; + + int err; + err = rados_create2(&storage->cluster, cluster_name, user_name, flags); + + if (err < 0) { + exit(EXIT_FAILURE); + } + + err = rados_conf_parse_env(storage->cluster,NULL); + if (err < 0) { + exit(EXIT_FAILURE); + } + + err = rados_conf_read_file(storage->cluster, NULL); + if(err <0){ + exit(EXIT_FAILURE); + } + err = rados_connect(storage->cluster); + if (err < 0) { + exit(EXIT_FAILURE); + } + + err = rados_ioctx_create(storage->cluster,poolname, &storage->ceph_io); + if(err < 0){ + exit(EXIT_FAILURE); + } + FUNC_END(); return 0; } @@ -81,6 +117,11 @@ static void rbox_storage_destroy(struct mail_storage *_storage) { if (storage->storage.attachment_fs != NULL) fs_deinit(&storage->storage.attachment_fs); index_storage_destroy(_storage); + + // destroy + rados_ioctx_destroy(storage->ceph_io); + rados_shutdown(storage->cluster); + FUNC_END(); } @@ -460,13 +501,14 @@ static int rbox_mailbox_get_metadata(struct mailbox *box, enum mailbox_metadata_ if (index_mailbox_get_metadata(box, items, metadata_r) < 0) { FUNC_END_RET("ret == -1; index_mailbox_get_metadata failed"); + i_debug("jrse_: index_mailbox_get_metadata failed"); return -1; } if ((items & MAILBOX_METADATA_GUID) != 0) { memcpy(metadata_r->guid, mbox->mailbox_guid, sizeof(metadata_r->guid)); } FUNC_END(); - return 0; + return -1; } static int rbox_mailbox_update(struct mailbox *box, const struct mailbox_update *update) { @@ -503,6 +545,13 @@ static void rbox_notify_changes(struct mailbox *box) { FUNC_END(); } +void generate_oid(char* oid, struct mail_storage *storage, int mail_uid){ + int32_t mail_user_uid = storage->user->uid; + sprintf(oid,"INBOX.%d%d",mail_user_uid,mail_uid); +} + + + struct mail_storage rbox_storage = { .name = RBOX_STORAGE_NAME, .class_flags = MAIL_STORAGE_CLASS_FLAG_FILE_PER_MSG | MAIL_STORAGE_CLASS_FLAG_HAVE_MAIL_GUIDS | MAIL_STORAGE_CLASS_FLAG_HAVE_MAIL_SAVE_GUIDS | MAIL_STORAGE_CLASS_FLAG_BINARY_DATA | MAIL_STORAGE_CLASS_FLAG_STUBS, .v = { diff --git a/src/storage-rbox/rbox-storage.h b/src/storage-rbox/rbox-storage.h index 3d760872..7b48ecfe 100644 --- a/src/storage-rbox/rbox-storage.h +++ b/src/storage-rbox/rbox-storage.h @@ -3,6 +3,7 @@ #include "index-storage.h" #include "dbox-storage.h" +#include #define RBOX_STORAGE_NAME "rbox" #define RBOX_MAIL_FILE_PREFIX "u." @@ -19,6 +20,12 @@ struct rbox_index_header { struct rbox_storage { struct dbox_storage storage; + /* rados cluster and io ctx */ + rados_t cluster; + rados_ioctx_t ceph_io; + // currently used to control if mail is stored + // in rados or in a local file + int use_rados_storage; }; struct obox_mail_index_record { @@ -68,4 +75,6 @@ void rbox_transaction_save_rollback(struct mail_save_context *ctx); int rbox_copy(struct mail_save_context *ctx, struct mail *mail); +void generate_oid(char* oid, struct mail_storage *storage, int mail_uid); + #endif From f413a61441a62b6bf12b1f1c934694e6cfd04149 Mon Sep 17 00:00:00 2001 From: jan Date: Wed, 17 May 2017 20:41:17 +0200 Subject: [PATCH 2/3] remove duplicated code --- src/storage-rbox/rbox-save.c | 60 ++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 34 deletions(-) diff --git a/src/storage-rbox/rbox-save.c b/src/storage-rbox/rbox-save.c index c697e159..e05c0a0b 100644 --- a/src/storage-rbox/rbox-save.c +++ b/src/storage-rbox/rbox-save.c @@ -155,9 +155,10 @@ off_t stream_mail_to_rados(rados_ioctx_t* ceph_io, char* guid, struct istream *i ssize_t ret; int offset = 0; start_offset = instream->v_offset; + do{ (void)i_stream_read_data(instream, &data, &iov.iov_len,0); - if(iov.iov_len ==0){ + if(iov.iov_len ==0){ /*all sent */ if(instream->stream_errno !=0){ return -1; @@ -166,10 +167,9 @@ off_t stream_mail_to_rados(rados_ioctx_t* ceph_io, char* guid, struct istream *i } iov.iov_base = data; - + int err = rados_write(*ceph_io,guid,iov.iov_base,iov.iov_len, offset); if(err <0){ - i_debug("cannot write to pool %lu",ceph_io); return -1; } i_stream_skip(instream,0); @@ -184,7 +184,8 @@ int rbox_save_continue(struct mail_save_context *_ctx) { FUNC_START(); struct dbox_save_context *ctx = (struct dbox_save_context *) _ctx; struct mail_storage *storage = _ctx->transaction->box->storage; - + struct rbox_storage *rbox_ctx = (struct rbox_storage *) storage; + rbox_dbg_print_mail_save_context(_ctx, "rbox_save_continue", NULL); rbox_dbg_print_mail_storage(storage,"rbox_save_continue",NULL); rbox_dbg_print_mail_user(storage->user,"rbox_save_continue",NULL); @@ -198,43 +199,34 @@ int rbox_save_continue(struct mail_save_context *_ctx) { FUNC_END_RET("ret == index_attachment_save_continue"); return index_attachment_save_continue(_ctx); } + + /* temporary guid generation see rbox-mail.c */ + char oid[GUID_128_SIZE]; + generate_oid(oid,_ctx->transaction->box->storage, ctx->seq); + int bytes_written = 0; + do { + if(((struct rbox_storage*) storage)->use_rados_storage ==0){ + bytes_written = o_stream_send_istream(_ctx->data.output, ctx->input); + }else{ + bytes_written = stream_mail_to_rados(&rbox_ctx->ceph_io, oid, ctx->input); + } - if(((struct rbox_storage*) storage)->use_rados_storage ==0 ){ - do { - if (o_stream_send_istream(_ctx->data.output, ctx->input) < 0) { - if (!mail_storage_set_error_from_errno(storage)) { - mail_storage_set_critical(storage, "write(%s) failed: %m", o_stream_get_name(_ctx->data.output)); - } - ctx->failed = TRUE; - FUNC_END_RET("ret == -1; o_stream_send_istream failed"); - return -1; + if (bytes_written < 0) { + if (!mail_storage_set_error_from_errno(storage)) { + mail_storage_set_critical(storage, "write(%s) failed: %m", + o_stream_get_name(_ctx->data.output)); } - index_mail_cache_parse_continue(_ctx->dest_mail); + ctx->failed = TRUE; + FUNC_END_RET("ret == -1; o_stream_send_istream failed"); + return -1; + } + index_mail_cache_parse_continue(_ctx->dest_mail); // both tee input readers may consume data from our primary // input stream. we'll have to make sure we don't return with // one of the streams still having data in them. - } while (i_stream_read(ctx->input) > 0); + } while (i_stream_read(ctx->input) > 0); - } - else{ - do { - /* temporary guid generation see rbox-save.c */ - char oid[GUID_128_SIZE]; - generate_oid(oid,_ctx->transaction->box->storage, ctx->seq); - - if (stream_mail_to_rados(&((struct rbox_storage*) storage)->ceph_io, oid, ctx->input) < 0) { - if (!mail_storage_set_error_from_errno(storage)) { - mail_storage_set_critical(storage, "write(%s) failed: %m", o_stream_get_name(_ctx->data.output)); - } - ctx->failed = TRUE; - FUNC_END_RET("ret == -1; o_stream_send_istream failed"); - return -1; - } - index_mail_cache_parse_continue(_ctx->dest_mail); - - } while (i_stream_read(ctx->input) > 0); - } FUNC_END(); return 0; From a5783277353d688d1412f4c4d8f7665f223fc4ff Mon Sep 17 00:00:00 2001 From: jan Date: Wed, 17 May 2017 20:53:44 +0200 Subject: [PATCH 3/3] added comment to test oid generate function, removed unnecessary cast from rbox-save.c, fixed invalid use of tmail_virtual_size to determine mail size in rados --- src/storage-rbox/rbox-mail.c | 6 ++---- src/storage-rbox/rbox-save.c | 2 +- src/storage-rbox/rbox-storage.c | 1 + 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/storage-rbox/rbox-mail.c b/src/storage-rbox/rbox-mail.c index ca1117a3..7492b6b3 100644 --- a/src/storage-rbox/rbox-mail.c +++ b/src/storage-rbox/rbox-mail.c @@ -421,11 +421,10 @@ static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, s int ret; off_t size_r; - if(rbox_mail_get_virtual_size(_mail, &size_r) <0 ){ - i_debug("rbox_mail_get_stream: error getting mail_virtual_size, lookup in rados not yet implemented"); + if(rbox_mail_get_physical_size(_mail,&size_r) <0){ return -1; } - + /* temporary guid generation see rbox-save.c */ char oid[GUID_128_SIZE]; generate_oid(oid, _mail->box->storage, _mail->seq); @@ -448,7 +447,6 @@ static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, s } data->stream = input; - char buffer[size_r]; int read = 0; read = rados_read(storage->ceph_io,oid,buffer,size_r,0); diff --git a/src/storage-rbox/rbox-save.c b/src/storage-rbox/rbox-save.c index e05c0a0b..6dae6ae0 100644 --- a/src/storage-rbox/rbox-save.c +++ b/src/storage-rbox/rbox-save.c @@ -205,7 +205,7 @@ int rbox_save_continue(struct mail_save_context *_ctx) { generate_oid(oid,_ctx->transaction->box->storage, ctx->seq); int bytes_written = 0; do { - if(((struct rbox_storage*) storage)->use_rados_storage ==0){ + if(rbox_ctx->use_rados_storage ==0){ bytes_written = o_stream_send_istream(_ctx->data.output, ctx->input); }else{ bytes_written = stream_mail_to_rados(&rbox_ctx->ceph_io, oid, ctx->input); diff --git a/src/storage-rbox/rbox-storage.c b/src/storage-rbox/rbox-storage.c index 570d26f3..8771849d 100644 --- a/src/storage-rbox/rbox-storage.c +++ b/src/storage-rbox/rbox-storage.c @@ -545,6 +545,7 @@ static void rbox_notify_changes(struct mailbox *box) { FUNC_END(); } +//TODO: remove test function to generate oid (used in rbox-mail.c, rbox-save.c) void generate_oid(char* oid, struct mail_storage *storage, int mail_uid){ int32_t mail_user_uid = storage->user->uid; sprintf(oid,"INBOX.%d%d",mail_user_uid,mail_uid);