diff --git a/src/storage-rbox/rbox-mail.c b/src/storage-rbox/rbox-mail.c index 88a3515a..7492b6b3 100644 --- a/src/storage-rbox/rbox-mail.c +++ b/src/storage-rbox/rbox-mail.c @@ -13,6 +13,8 @@ #include "debug-helper.h" #include "rbox-file.h" #include "rbox-storage.h" +#include +#include "message-part.h" struct mail * rbox_mail_alloc(struct mailbox_transaction_context *t, enum mail_fetch_field wanted_fields, @@ -170,11 +172,11 @@ static int rbox_mail_get_special(struct mail *_mail, enum mail_fetch_field field int rbox_mail_open(struct dbox_mail *mail, uoff_t *offset_r, struct dbox_file **file_r) { FUNC_START(); - - struct mail *_mail = &mail->imail.mail.mail; + +struct mail *_mail = &mail->imail.mail.mail; bool deleted; int ret; - + if (_mail->lookup_abort != MAIL_LOOKUP_ABORT_NEVER) { mail_set_aborted(_mail); rbox_dbg_print_mail(_mail, "rbox_mail_open", NULL); @@ -225,8 +227,6 @@ static int rbox_mail_metadata_read(struct dbox_mail *mail, struct dbox_file **fi return -1; } - i_debug("rbox_mail_metadata_read: offset = %lu, cur_path = %s", offset, (*file_r)->cur_path); - if (dbox_file_seek(*file_r, offset) <= 0) { rbox_dbg_print_mail(&mail->imail.mail.mail, "rbox_mail_metadata_read", NULL); FUNC_END_RET("ret == -1; seek failed"); @@ -259,7 +259,6 @@ static int rbox_mail_metadata_get(struct dbox_mail *mail, enum dbox_metadata_key } *value_r = dbox_file_metadata_get(file, key); - i_debug("rbox_mail_metadata_get: key = %d, value = %s", key, *value_r); rbox_dbg_print_mail(&mail->imail.mail.mail, "rbox_mail_metadata_get", NULL); FUNC_END(); return 0; @@ -393,7 +392,7 @@ static int get_mail_stream(struct dbox_mail *mail, uoff_t offset, struct istream *stream_r = NULL; return ret; } - + *stream_r = i_stream_create_limit(file->input, file->cur_physical_size); if (pmail->v.istream_opened != NULL) { if (pmail->v.istream_opened(&pmail->mail, stream_r) < 0) @@ -403,18 +402,35 @@ static int get_mail_stream(struct dbox_mail *mail, uoff_t offset, struct istream return 1; else return dbox_attachment_file_get_stream(file, stream_r); + + return 1; } + + static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, struct message_size *hdr_size, struct message_size *body_size, struct istream **stream_r) { + struct rbox_storage *storage = (struct rbox_storage *) _mail->box->storage; struct dbox_mail *mail = (struct dbox_mail *) _mail; struct index_mail_data *data = &mail->imail.data; struct istream *input; - uoff_t offset; + struct istream *rados_input; + + uoff_t offset = 0; int ret; + off_t size_r; + if(rbox_mail_get_physical_size(_mail,&size_r) <0){ + return -1; + } + + /* temporary guid generation see rbox-save.c */ + char oid[GUID_128_SIZE]; + generate_oid(oid, _mail->box->storage, _mail->seq); + if (data->stream == NULL) { + if (storage->storage.v.mail_open(mail, &offset, &mail->open_file) < 0) return -1; @@ -430,7 +446,24 @@ static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, s return -1; } data->stream = input; - index_mail_set_read_buffer_size(_mail, input); + + char buffer[size_r]; + int read = 0; + read = rados_read(storage->ceph_io,oid,buffer,size_r,0); + if(read <0){ + return -1; + } + rados_input = i_stream_create_from_data(buffer,size_r); + + i_stream_seek(input,mail->open_file->cur_physical_size); + + if(!i_stream_add_data(input,buffer,size_r)){ + return -1; + } + + i_stream_seek(input,0); + + index_mail_set_read_buffer_size(_mail, data->stream); } return index_mail_init_stream(&mail->imail, hdr_size, body_size, stream_r); @@ -445,7 +478,6 @@ struct mail_vfuncs rbox_mail_vfuncs = { index_mail_prefetch, index_mail_precache, index_mail_add_temp_wanted_fields, - index_mail_get_flags, index_mail_get_keywords, index_mail_get_keyword_indexes, diff --git a/src/storage-rbox/rbox-save.c b/src/storage-rbox/rbox-save.c index f24619db..6dae6ae0 100644 --- a/src/storage-rbox/rbox-save.c +++ b/src/storage-rbox/rbox-save.c @@ -36,6 +36,7 @@ struct rbox_save_context { files; }; + struct dbox_file * rbox_save_file_get_file(struct mailbox_transaction_context *t, uint32_t seq) { FUNC_START(); @@ -113,13 +114,16 @@ void rbox_save_add_file(struct mail_save_context *_ctx, struct dbox_file *file) int rbox_save_begin(struct mail_save_context *_ctx, struct istream *input) { FUNC_START(); struct rbox_save_context *ctx = (struct rbox_save_context *) _ctx; + struct dbox_save_context *ctx_dbox = (struct dbox_save_context*)_ctx; + struct dbox_file *file; int ret; rbox_dbg_print_mail_save_context(_ctx, "rbox_save_begin", NULL); - file = rbox_file_create(ctx->mbox); ctx->append_ctx = dbox_file_append_init(file); + + ret = dbox_file_get_append_stream(ctx->append_ctx, &ctx->ctx.dbox_output); if (ret <= 0) { i_assert(ret != 0); @@ -130,34 +134,87 @@ int rbox_save_begin(struct mail_save_context *_ctx, struct istream *input) { return -1; } ctx->cur_file = file; + + // add mail to index + // write dummy header to file dbox_save_begin(&ctx->ctx, input); + // append file to file list. rbox_save_add_file(_ctx, file); + int count = 0; + FUNC_END(); return ctx->ctx.failed ? -1 : 0; } +off_t stream_mail_to_rados(rados_ioctx_t* ceph_io, char* guid, struct istream *instream) +{ + uoff_t start_offset; + struct const_iovec iov; + const unsigned char *data; + ssize_t ret; + int offset = 0; + start_offset = instream->v_offset; + + do{ + (void)i_stream_read_data(instream, &data, &iov.iov_len,0); + if(iov.iov_len ==0){ + /*all sent */ + if(instream->stream_errno !=0){ + return -1; + } + break; + } + + iov.iov_base = data; + + int err = rados_write(*ceph_io,guid,iov.iov_base,iov.iov_len, offset); + if(err <0){ + return -1; + } + i_stream_skip(instream,0); + + }while((size_t)ret == iov.iov_len); + + return (off_t)(instream->v_offset - start_offset); +} + int rbox_save_continue(struct mail_save_context *_ctx) { + FUNC_START(); struct dbox_save_context *ctx = (struct dbox_save_context *) _ctx; - struct mail_storage *storage = _ctx->transaction->box->storage; + struct mail_storage *storage = _ctx->transaction->box->storage; + struct rbox_storage *rbox_ctx = (struct rbox_storage *) storage; rbox_dbg_print_mail_save_context(_ctx, "rbox_save_continue", NULL); + rbox_dbg_print_mail_storage(storage,"rbox_save_continue",NULL); + rbox_dbg_print_mail_user(storage->user,"rbox_save_continue",NULL); - if (ctx->failed) { + if (ctx->failed) { FUNC_END_RET("ret == -1; ctx failed"); return -1; } - - if (_ctx->data.attach != NULL) { + + if (_ctx->data.attach != NULL) { FUNC_END_RET("ret == index_attachment_save_continue"); return index_attachment_save_continue(_ctx); } + + /* temporary guid generation see rbox-mail.c */ + char oid[GUID_128_SIZE]; + generate_oid(oid,_ctx->transaction->box->storage, ctx->seq); + int bytes_written = 0; + do { + if(rbox_ctx->use_rados_storage ==0){ + bytes_written = o_stream_send_istream(_ctx->data.output, ctx->input); + }else{ + bytes_written = stream_mail_to_rados(&rbox_ctx->ceph_io, oid, ctx->input); + } - do { - if (o_stream_send_istream(_ctx->data.output, ctx->input) < 0) { + if (bytes_written < 0) { if (!mail_storage_set_error_from_errno(storage)) { - mail_storage_set_critical(storage, "write(%s) failed: %m", o_stream_get_name(_ctx->data.output)); + mail_storage_set_critical(storage, "write(%s) failed: %m", + o_stream_get_name(_ctx->data.output)); } ctx->failed = TRUE; FUNC_END_RET("ret == -1; o_stream_send_istream failed"); @@ -165,14 +222,17 @@ int rbox_save_continue(struct mail_save_context *_ctx) { } index_mail_cache_parse_continue(_ctx->dest_mail); - /* both tee input readers may consume data from our primary - input stream. we'll have to make sure we don't return with - one of the streams still having data in them. */ + // both tee input readers may consume data from our primary + // input stream. we'll have to make sure we don't return with + // one of the streams still having data in them. } while (i_stream_read(ctx->input) > 0); + + FUNC_END(); return 0; } + static int dbox_save_mail_write_metadata(struct dbox_save_context *ctx, struct dbox_file *file) { FUNC_START(); struct rbox_file *sfile = (struct rbox_file *) file; @@ -225,7 +285,7 @@ static int dbox_save_finish_write(struct mail_save_context *_ctx) { struct dbox_file **files; rbox_dbg_print_mail_save_context(_ctx, "dbox_save_finish_write", NULL); - + ctx->ctx.finished = TRUE; if (ctx->ctx.dbox_output == NULL) { FUNC_END_RET("ret == -1; ctx.dbox_output == NULL"); @@ -271,10 +331,12 @@ static int dbox_save_finish_write(struct mail_save_context *_ctx) { int rbox_save_finish(struct mail_save_context *ctx) { FUNC_START(); - int ret; + int ret; ret = dbox_save_finish_write(ctx); index_save_context_free(ctx); + int file_fd = ( (struct rbox_file*) ((struct rbox_save_context*)ctx)->cur_file)->uid; + FUNC_END(); return ret; } diff --git a/src/storage-rbox/rbox-storage.c b/src/storage-rbox/rbox-storage.c index 3c7d3559..8771849d 100644 --- a/src/storage-rbox/rbox-storage.c +++ b/src/storage-rbox/rbox-storage.c @@ -51,7 +51,9 @@ static int rbox_storage_create(struct mail_storage *_storage, struct mail_namesp FUNC_START(); struct rbox_storage *storage = (struct rbox_storage *) _storage; enum fs_properties props; + + storage->use_rados_storage = 1; if (dbox_storage_create(_storage, ns, error_r) < 0) { FUNC_END_RET("ret == -1; dbox_storage_create failed"); return -1; @@ -68,6 +70,40 @@ static int rbox_storage_create(struct mail_storage *_storage, struct mail_namesp } } rbox_dbg_print_mail_storage(_storage, "rbox_storage_create", NULL); + + // initialize c rados_ctx (test cluster, pool and user name !) + + char cluster_name[] = "ceph"; + char *poolname = "rbd"; + char user_name[] = "client.admin"; + uint64_t flags; + + int err; + err = rados_create2(&storage->cluster, cluster_name, user_name, flags); + + if (err < 0) { + exit(EXIT_FAILURE); + } + + err = rados_conf_parse_env(storage->cluster,NULL); + if (err < 0) { + exit(EXIT_FAILURE); + } + + err = rados_conf_read_file(storage->cluster, NULL); + if(err <0){ + exit(EXIT_FAILURE); + } + err = rados_connect(storage->cluster); + if (err < 0) { + exit(EXIT_FAILURE); + } + + err = rados_ioctx_create(storage->cluster,poolname, &storage->ceph_io); + if(err < 0){ + exit(EXIT_FAILURE); + } + FUNC_END(); return 0; } @@ -81,6 +117,11 @@ static void rbox_storage_destroy(struct mail_storage *_storage) { if (storage->storage.attachment_fs != NULL) fs_deinit(&storage->storage.attachment_fs); index_storage_destroy(_storage); + + // destroy + rados_ioctx_destroy(storage->ceph_io); + rados_shutdown(storage->cluster); + FUNC_END(); } @@ -460,13 +501,14 @@ static int rbox_mailbox_get_metadata(struct mailbox *box, enum mailbox_metadata_ if (index_mailbox_get_metadata(box, items, metadata_r) < 0) { FUNC_END_RET("ret == -1; index_mailbox_get_metadata failed"); + i_debug("jrse_: index_mailbox_get_metadata failed"); return -1; } if ((items & MAILBOX_METADATA_GUID) != 0) { memcpy(metadata_r->guid, mbox->mailbox_guid, sizeof(metadata_r->guid)); } FUNC_END(); - return 0; + return -1; } static int rbox_mailbox_update(struct mailbox *box, const struct mailbox_update *update) { @@ -503,6 +545,14 @@ static void rbox_notify_changes(struct mailbox *box) { FUNC_END(); } +//TODO: remove test function to generate oid (used in rbox-mail.c, rbox-save.c) +void generate_oid(char* oid, struct mail_storage *storage, int mail_uid){ + int32_t mail_user_uid = storage->user->uid; + sprintf(oid,"INBOX.%d%d",mail_user_uid,mail_uid); +} + + + struct mail_storage rbox_storage = { .name = RBOX_STORAGE_NAME, .class_flags = MAIL_STORAGE_CLASS_FLAG_FILE_PER_MSG | MAIL_STORAGE_CLASS_FLAG_HAVE_MAIL_GUIDS | MAIL_STORAGE_CLASS_FLAG_HAVE_MAIL_SAVE_GUIDS | MAIL_STORAGE_CLASS_FLAG_BINARY_DATA | MAIL_STORAGE_CLASS_FLAG_STUBS, .v = { diff --git a/src/storage-rbox/rbox-storage.h b/src/storage-rbox/rbox-storage.h index 3d760872..7b48ecfe 100644 --- a/src/storage-rbox/rbox-storage.h +++ b/src/storage-rbox/rbox-storage.h @@ -3,6 +3,7 @@ #include "index-storage.h" #include "dbox-storage.h" +#include #define RBOX_STORAGE_NAME "rbox" #define RBOX_MAIL_FILE_PREFIX "u." @@ -19,6 +20,12 @@ struct rbox_index_header { struct rbox_storage { struct dbox_storage storage; + /* rados cluster and io ctx */ + rados_t cluster; + rados_ioctx_t ceph_io; + // currently used to control if mail is stored + // in rados or in a local file + int use_rados_storage; }; struct obox_mail_index_record { @@ -68,4 +75,6 @@ void rbox_transaction_save_rollback(struct mail_save_context *ctx); int rbox_copy(struct mail_save_context *ctx, struct mail *mail); +void generate_oid(char* oid, struct mail_storage *storage, int mail_uid); + #endif