Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature 2 16 -> code review and merge #22

Merged
merged 3 commits into from
May 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 42 additions & 10 deletions src/storage-rbox/rbox-mail.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include "debug-helper.h"
#include "rbox-file.h"
#include "rbox-storage.h"
#include <rados/librados.h>
#include "message-part.h"

struct mail *
rbox_mail_alloc(struct mailbox_transaction_context *t, enum mail_fetch_field wanted_fields,
Expand Down Expand Up @@ -170,11 +172,11 @@ static int rbox_mail_get_special(struct mail *_mail, enum mail_fetch_field field

int rbox_mail_open(struct dbox_mail *mail, uoff_t *offset_r, struct dbox_file **file_r) {
FUNC_START();

struct mail *_mail = &mail->imail.mail.mail;
struct mail *_mail = &mail->imail.mail.mail;
bool deleted;
int ret;

if (_mail->lookup_abort != MAIL_LOOKUP_ABORT_NEVER) {
mail_set_aborted(_mail);
rbox_dbg_print_mail(_mail, "rbox_mail_open", NULL);
Expand Down Expand Up @@ -225,8 +227,6 @@ static int rbox_mail_metadata_read(struct dbox_mail *mail, struct dbox_file **fi
return -1;
}

i_debug("rbox_mail_metadata_read: offset = %lu, cur_path = %s", offset, (*file_r)->cur_path);

if (dbox_file_seek(*file_r, offset) <= 0) {
rbox_dbg_print_mail(&mail->imail.mail.mail, "rbox_mail_metadata_read", NULL);
FUNC_END_RET("ret == -1; seek failed");
Expand Down Expand Up @@ -259,7 +259,6 @@ static int rbox_mail_metadata_get(struct dbox_mail *mail, enum dbox_metadata_key
}

*value_r = dbox_file_metadata_get(file, key);
i_debug("rbox_mail_metadata_get: key = %d, value = %s", key, *value_r);
rbox_dbg_print_mail(&mail->imail.mail.mail, "rbox_mail_metadata_get", NULL);
FUNC_END();
return 0;
Expand Down Expand Up @@ -393,7 +392,7 @@ static int get_mail_stream(struct dbox_mail *mail, uoff_t offset, struct istream
*stream_r = NULL;
return ret;
}

*stream_r = i_stream_create_limit(file->input, file->cur_physical_size);
if (pmail->v.istream_opened != NULL) {
if (pmail->v.istream_opened(&pmail->mail, stream_r) < 0)
Expand All @@ -403,18 +402,35 @@ static int get_mail_stream(struct dbox_mail *mail, uoff_t offset, struct istream
return 1;
else
return dbox_attachment_file_get_stream(file, stream_r);

return 1;
}



static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, struct message_size *hdr_size,
struct message_size *body_size, struct istream **stream_r) {

struct rbox_storage *storage = (struct rbox_storage *) _mail->box->storage;
struct dbox_mail *mail = (struct dbox_mail *) _mail;
struct index_mail_data *data = &mail->imail.data;
struct istream *input;
uoff_t offset;
struct istream *rados_input;

uoff_t offset = 0;
int ret;

off_t size_r;
if(rbox_mail_get_physical_size(_mail,&size_r) <0){
return -1;
}

/* temporary guid generation see rbox-save.c */
char oid[GUID_128_SIZE];
generate_oid(oid, _mail->box->storage, _mail->seq);

if (data->stream == NULL) {

if (storage->storage.v.mail_open(mail, &offset, &mail->open_file) < 0)
return -1;

Expand All @@ -430,7 +446,24 @@ static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, s
return -1;
}
data->stream = input;
index_mail_set_read_buffer_size(_mail, input);

char buffer[size_r];
int read = 0;
read = rados_read(storage->ceph_io,oid,buffer,size_r,0);
if(read <0){
return -1;
}
rados_input = i_stream_create_from_data(buffer,size_r);

i_stream_seek(input,mail->open_file->cur_physical_size);

if(!i_stream_add_data(input,buffer,size_r)){
return -1;
}

i_stream_seek(input,0);

index_mail_set_read_buffer_size(_mail, data->stream);
}

return index_mail_init_stream(&mail->imail, hdr_size, body_size, stream_r);
Expand All @@ -445,7 +478,6 @@ struct mail_vfuncs rbox_mail_vfuncs = {
index_mail_prefetch,
index_mail_precache,
index_mail_add_temp_wanted_fields,

index_mail_get_flags,
index_mail_get_keywords,
index_mail_get_keyword_indexes,
Expand Down
88 changes: 75 additions & 13 deletions src/storage-rbox/rbox-save.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct rbox_save_context {
files;
};


struct dbox_file *
rbox_save_file_get_file(struct mailbox_transaction_context *t, uint32_t seq) {
FUNC_START();
Expand Down Expand Up @@ -113,13 +114,16 @@ void rbox_save_add_file(struct mail_save_context *_ctx, struct dbox_file *file)
int rbox_save_begin(struct mail_save_context *_ctx, struct istream *input) {
FUNC_START();
struct rbox_save_context *ctx = (struct rbox_save_context *) _ctx;
struct dbox_save_context *ctx_dbox = (struct dbox_save_context*)_ctx;

struct dbox_file *file;
int ret;

rbox_dbg_print_mail_save_context(_ctx, "rbox_save_begin", NULL);

file = rbox_file_create(ctx->mbox);
ctx->append_ctx = dbox_file_append_init(file);


ret = dbox_file_get_append_stream(ctx->append_ctx, &ctx->ctx.dbox_output);
if (ret <= 0) {
i_assert(ret != 0);
Expand All @@ -130,49 +134,105 @@ int rbox_save_begin(struct mail_save_context *_ctx, struct istream *input) {
return -1;
}
ctx->cur_file = file;

// add mail to index
// write dummy header to file
dbox_save_begin(&ctx->ctx, input);

// append file to file list.
rbox_save_add_file(_ctx, file);
int count = 0;

FUNC_END();
return ctx->ctx.failed ? -1 : 0;
}

off_t stream_mail_to_rados(rados_ioctx_t* ceph_io, char* guid, struct istream *instream)
{
uoff_t start_offset;
struct const_iovec iov;
const unsigned char *data;
ssize_t ret;
int offset = 0;
start_offset = instream->v_offset;

do{
(void)i_stream_read_data(instream, &data, &iov.iov_len,0);
if(iov.iov_len ==0){
/*all sent */
if(instream->stream_errno !=0){
return -1;
}
break;
}

iov.iov_base = data;

int err = rados_write(*ceph_io,guid,iov.iov_base,iov.iov_len, offset);
if(err <0){
return -1;
}
i_stream_skip(instream,0);

}while((size_t)ret == iov.iov_len);

return (off_t)(instream->v_offset - start_offset);
}

int rbox_save_continue(struct mail_save_context *_ctx) {

FUNC_START();
struct dbox_save_context *ctx = (struct dbox_save_context *) _ctx;
struct mail_storage *storage = _ctx->transaction->box->storage;
struct mail_storage *storage = _ctx->transaction->box->storage;
struct rbox_storage *rbox_ctx = (struct rbox_storage *) storage;

rbox_dbg_print_mail_save_context(_ctx, "rbox_save_continue", NULL);
rbox_dbg_print_mail_storage(storage,"rbox_save_continue",NULL);
rbox_dbg_print_mail_user(storage->user,"rbox_save_continue",NULL);

if (ctx->failed) {
if (ctx->failed) {
FUNC_END_RET("ret == -1; ctx failed");
return -1;
}

if (_ctx->data.attach != NULL) {
if (_ctx->data.attach != NULL) {
FUNC_END_RET("ret == index_attachment_save_continue");
return index_attachment_save_continue(_ctx);
}

/* temporary guid generation see rbox-mail.c */
char oid[GUID_128_SIZE];
generate_oid(oid,_ctx->transaction->box->storage, ctx->seq);
int bytes_written = 0;
do {
if(rbox_ctx->use_rados_storage ==0){
bytes_written = o_stream_send_istream(_ctx->data.output, ctx->input);
}else{
bytes_written = stream_mail_to_rados(&rbox_ctx->ceph_io, oid, ctx->input);
}

do {
if (o_stream_send_istream(_ctx->data.output, ctx->input) < 0) {
if (bytes_written < 0) {
if (!mail_storage_set_error_from_errno(storage)) {
mail_storage_set_critical(storage, "write(%s) failed: %m", o_stream_get_name(_ctx->data.output));
mail_storage_set_critical(storage, "write(%s) failed: %m",
o_stream_get_name(_ctx->data.output));
}
ctx->failed = TRUE;
FUNC_END_RET("ret == -1; o_stream_send_istream failed");
return -1;
}
index_mail_cache_parse_continue(_ctx->dest_mail);

/* both tee input readers may consume data from our primary
input stream. we'll have to make sure we don't return with
one of the streams still having data in them. */
// both tee input readers may consume data from our primary
// input stream. we'll have to make sure we don't return with
// one of the streams still having data in them.
} while (i_stream_read(ctx->input) > 0);


FUNC_END();
return 0;
}


static int dbox_save_mail_write_metadata(struct dbox_save_context *ctx, struct dbox_file *file) {
FUNC_START();
struct rbox_file *sfile = (struct rbox_file *) file;
Expand Down Expand Up @@ -225,7 +285,7 @@ static int dbox_save_finish_write(struct mail_save_context *_ctx) {
struct dbox_file **files;

rbox_dbg_print_mail_save_context(_ctx, "dbox_save_finish_write", NULL);

ctx->ctx.finished = TRUE;
if (ctx->ctx.dbox_output == NULL) {
FUNC_END_RET("ret == -1; ctx.dbox_output == NULL");
Expand Down Expand Up @@ -271,10 +331,12 @@ static int dbox_save_finish_write(struct mail_save_context *_ctx) {

int rbox_save_finish(struct mail_save_context *ctx) {
FUNC_START();
int ret;
int ret;

ret = dbox_save_finish_write(ctx);
index_save_context_free(ctx);
int file_fd = ( (struct rbox_file*) ((struct rbox_save_context*)ctx)->cur_file)->uid;

FUNC_END();
return ret;
}
Expand Down
52 changes: 51 additions & 1 deletion src/storage-rbox/rbox-storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ static int rbox_storage_create(struct mail_storage *_storage, struct mail_namesp
FUNC_START();
struct rbox_storage *storage = (struct rbox_storage *) _storage;
enum fs_properties props;


storage->use_rados_storage = 1;
if (dbox_storage_create(_storage, ns, error_r) < 0) {
FUNC_END_RET("ret == -1; dbox_storage_create failed");
return -1;
Expand All @@ -68,6 +70,40 @@ static int rbox_storage_create(struct mail_storage *_storage, struct mail_namesp
}
}
rbox_dbg_print_mail_storage(_storage, "rbox_storage_create", NULL);

// initialize c rados_ctx (test cluster, pool and user name !)

char cluster_name[] = "ceph";
char *poolname = "rbd";
char user_name[] = "client.admin";
uint64_t flags;

int err;
err = rados_create2(&storage->cluster, cluster_name, user_name, flags);

if (err < 0) {
exit(EXIT_FAILURE);
}

err = rados_conf_parse_env(storage->cluster,NULL);
if (err < 0) {
exit(EXIT_FAILURE);
}

err = rados_conf_read_file(storage->cluster, NULL);
if(err <0){
exit(EXIT_FAILURE);
}
err = rados_connect(storage->cluster);
if (err < 0) {
exit(EXIT_FAILURE);
}

err = rados_ioctx_create(storage->cluster,poolname, &storage->ceph_io);
if(err < 0){
exit(EXIT_FAILURE);
}

FUNC_END();
return 0;
}
Expand All @@ -81,6 +117,11 @@ static void rbox_storage_destroy(struct mail_storage *_storage) {
if (storage->storage.attachment_fs != NULL)
fs_deinit(&storage->storage.attachment_fs);
index_storage_destroy(_storage);

// destroy
rados_ioctx_destroy(storage->ceph_io);
rados_shutdown(storage->cluster);

FUNC_END();
}

Expand Down Expand Up @@ -460,13 +501,14 @@ static int rbox_mailbox_get_metadata(struct mailbox *box, enum mailbox_metadata_

if (index_mailbox_get_metadata(box, items, metadata_r) < 0) {
FUNC_END_RET("ret == -1; index_mailbox_get_metadata failed");
i_debug("jrse_: index_mailbox_get_metadata failed");
return -1;
}
if ((items & MAILBOX_METADATA_GUID) != 0) {
memcpy(metadata_r->guid, mbox->mailbox_guid, sizeof(metadata_r->guid));
}
FUNC_END();
return 0;
return -1;
}

static int rbox_mailbox_update(struct mailbox *box, const struct mailbox_update *update) {
Expand Down Expand Up @@ -503,6 +545,14 @@ static void rbox_notify_changes(struct mailbox *box) {
FUNC_END();
}

//TODO: remove test function to generate oid (used in rbox-mail.c, rbox-save.c)
void generate_oid(char* oid, struct mail_storage *storage, int mail_uid){
int32_t mail_user_uid = storage->user->uid;
sprintf(oid,"INBOX.%d%d",mail_user_uid,mail_uid);
}



struct mail_storage rbox_storage = { .name = RBOX_STORAGE_NAME, .class_flags = MAIL_STORAGE_CLASS_FLAG_FILE_PER_MSG
| MAIL_STORAGE_CLASS_FLAG_HAVE_MAIL_GUIDS | MAIL_STORAGE_CLASS_FLAG_HAVE_MAIL_SAVE_GUIDS
| MAIL_STORAGE_CLASS_FLAG_BINARY_DATA | MAIL_STORAGE_CLASS_FLAG_STUBS, .v = {
Expand Down
Loading