Skip to content

Commit

Permalink
#243: changed mail buffer. mail buffer for i_stream needs to exist un…
Browse files Browse the repository at this point in the history
…til index_mail_close finished.
  • Loading branch information
jrse committed Feb 5, 2019
1 parent f5d88ce commit 96c12f6
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 50 deletions.
3 changes: 2 additions & 1 deletion src/librmb/rados-mail.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ RadosMail::RadosMail()
active_op(0),
save_date_rados(-1),
valid(true),
index_ref(false) {}
index_ref(false),
mail_buffer(nullptr) {}

RadosMail::~RadosMail() {}

Expand Down
6 changes: 4 additions & 2 deletions src/librmb/rados-mail.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ class RadosMail {
/*!
* @return ptr to internal buffer .
*/
librados::bufferlist* get_mail_buffer() { return &this->mail_buffer; }
librados::bufferlist* get_mail_buffer() { return this->mail_buffer; }
void set_mail_buffer(librados::bufferlist* buffer) { this->mail_buffer = buffer; }

map<string, ceph::bufferlist>* get_metadata() { return &this->attrset; }

AioCompletion* get_completion() { return completion; }
Expand Down Expand Up @@ -112,7 +114,7 @@ class RadosMail {
ObjectWriteOperation* write_operation;

int active_op;
ceph::bufferlist mail_buffer;
ceph::bufferlist* mail_buffer;
time_t save_date_rados;

map<string, ceph::bufferlist> attrset;
Expand Down
9 changes: 7 additions & 2 deletions src/librmb/rados-storage-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ bool RadosStorageImpl::wait_for_rados_operations(const std::vector<librmb::Rados
// (*it_cur_obj)->set_completion(nullptr);
(*it_cur_obj)->set_write_operation(nullptr);
}
// free mail's buffer cause we don't need it anymore
librados::bufferlist *mail_buffer = (*it_cur_obj)->get_mail_buffer();
delete mail_buffer;
}
return ctx_failed;
}
Expand Down Expand Up @@ -405,6 +408,8 @@ bool RadosStorageImpl::save_mail(RadosMail *mail, bool &save_async) {
}
librmb::RadosMail *RadosStorageImpl::alloc_rados_mail() { return new librmb::RadosMail(); }
void RadosStorageImpl::free_rados_mail(librmb::RadosMail *mail) {
delete mail;
mail = nullptr;
if (mail != nullptr) {
delete mail;
mail = nullptr;
}
}
3 changes: 3 additions & 0 deletions src/librmb/rados-util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ int RadosUtils::copy_to_alt(std::string &src_oid, std::string &dest_oid, RadosSt
RadosMail mail;
mail.set_oid(src_oid);

librados::bufferlist *bl = new librados::bufferlist();
mail.set_mail_buffer(bl);

if (inverse) {
ret = alt_storage->read_mail(src_oid, mail.get_mail_buffer());
metadata->get_storage()->set_io_ctx(&alt_storage->get_io_ctx());
Expand Down
5 changes: 3 additions & 2 deletions src/librmb/tools/rmb/rmb-commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,8 @@ int RmbCommands::print_mail(std::map<std::string, librmb::RadosMailBox *> *mailb
for (std::vector<librmb::RadosMail *>::iterator it_mail = it->second->get_mails().begin();
it_mail != it->second->get_mails().end(); ++it_mail) {
const std::string oid = *(*it_mail)->get_oid();

librados::bufferlist bl;
(*it_mail)->set_mail_buffer(&bl);
if (storage->read_mail(oid, (*it_mail)->get_mail_buffer()) > 0) {
if (tools.save_mail((*it_mail)) < 0) {
std::cout << " error saving mail : " << oid << " to " << tools.get_mailbox_path() << std::endl;
Expand All @@ -520,7 +521,7 @@ int RmbCommands::print_mail(std::map<std::string, librmb::RadosMailBox *> *mailb
int RmbCommands::query_mail_storage(std::vector<librmb::RadosMail *> *mail_objects, librmb::CmdLineParser *parser,
bool download, bool silent) {
print_debug("entry: query_mail_storage");

std::map<std::string, librmb::RadosMailBox *> mailbox;
for (std::vector<librmb::RadosMail *>::iterator it = mail_objects->begin(); it != mail_objects->end(); ++it) {
std::string mailbox_key = std::string(1, static_cast<char>(librmb::RBOX_METADATA_MAILBOX_GUID));
Expand Down
8 changes: 3 additions & 5 deletions src/storage-rbox/doveadm-rbox-plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ static int cmd_rmb_search_run(std::map<std::string, std::string> &opts, struct m
}
if (user->namespaces != NULL) {
struct mail_namespace *ns = mail_namespace_find_inbox(user->namespaces);
for (; ns != NULL; ns = ns->next) {
for (; ns != NULL; ns = ns->next) {
check_namespace_mailboxes(ns, mail_objects);
}
}
Expand Down Expand Up @@ -685,10 +685,8 @@ int check_namespace_mailboxes(const struct mail_namespace *ns, const std::vector
struct mailbox_list_iterate_context *iter;
const struct mailbox_info *info;
int ret = 0;
std::cout << "INDEX: Check" << std::endl;
iter = mailbox_list_iter_init(
ns->list, "*",
static_cast<enum mailbox_list_iter_flags>(MAILBOX_LIST_ITER_RAW_LIST | MAILBOX_LIST_ITER_RETURN_NO_FLAGS));
iter = mailbox_list_iter_init(ns->list, "*", static_cast<enum mailbox_list_iter_flags>(
MAILBOX_LIST_ITER_RAW_LIST | MAILBOX_LIST_ITER_RETURN_NO_FLAGS));
while ((info = mailbox_list_iter_next(iter)) != NULL) {
if ((info->flags & (MAILBOX_NONEXISTENT | MAILBOX_NOSELECT)) == 0) {
ret = iterate_mailbox(ns, info, mail_objects);
Expand Down
39 changes: 23 additions & 16 deletions src/storage-rbox/istream-bufferlist.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ extern "C" {
#include "istream-bufferlist.h"
#include <rados/librados.hpp>

struct bufferlist_istream {
struct istream_private istream;
librados::bufferlist *bl;
};
static ssize_t i_stream_data_read(struct istream_private *stream) {
stream->istream.eof = TRUE; // all in!
return -1;
Expand All @@ -31,30 +35,33 @@ static void i_stream_data_seek(struct istream_private *stream, uoff_t v_offset,
static void rbox_istream_destroy(struct iostream_private *stream) {
// nothing to do. but required, so that default destroy is not evoked!
// buffer is member of RboxMailObjec, which destroys the bufferlist
struct bufferlist_istream *bstream = (struct bufferlist_istream *)stream;
delete bstream->bl;
}
struct istream *i_stream_create_from_bufferlist(librados::bufferlist *data, const size_t &size) {
struct istream_private *stream;
struct bufferlist_istream *bstream;

stream = i_new(struct istream_private, 1);
bstream = i_new(struct bufferlist_istream, 1);
// use unsigned char* for binary data!
stream->buffer = reinterpret_cast<const unsigned char *>(data->c_str());
stream->pos = size;
stream->max_buffer_size = (size_t)-1;
bstream->istream.buffer = reinterpret_cast<unsigned char *>(data->c_str());
bstream->istream.pos = size;
bstream->istream.max_buffer_size = (size_t)-1;

stream->read = i_stream_data_read;
stream->seek = i_stream_data_seek; // use default
bstream->istream.read = i_stream_data_read;
bstream->istream.seek = i_stream_data_seek; // use default

stream->istream.readable_fd = FALSE;
stream->istream.blocking = TRUE;
stream->istream.seekable = TRUE;
stream->iostream.destroy = rbox_istream_destroy;
bstream->istream.istream.readable_fd = FALSE;
bstream->istream.istream.blocking = TRUE;
bstream->istream.istream.seekable = TRUE;
bstream->istream.iostream.destroy = rbox_istream_destroy;
bstream->bl = data;

#if DOVECOT_PREREQ(2, 3)
i_stream_create(stream, NULL, -1, ISTREAM_CREATE_FLAG_NOOP_SNAPSHOT);
i_stream_create(&bstream->istream, NULL, -1, ISTREAM_CREATE_FLAG_NOOP_SNAPSHOT);
#else
i_stream_create(stream, NULL, -1);
i_stream_create(&bstream->istream, NULL, -1);
#endif
stream->statbuf.st_size = size - 1;
i_stream_set_name(&stream->istream, "(buffer)");
return &stream->istream;
bstream->istream.statbuf.st_size = size - 1;
i_stream_set_name(&bstream->istream.istream, "(buffer)");
return &bstream->istream.istream;
}
6 changes: 4 additions & 2 deletions src/storage-rbox/ostream-bufferlist.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ static int o_stream_buffer_seek(struct ostream_private *stream, uoff_t offset) {

int o_stream_buffer_write_at(struct ostream_private *stream, const void *data, size_t size, uoff_t offset) {
i_assert(stream != NULL);
i_error("unused !");
i_error("unused (o_stream_buffer_write_at) !");
return -1;
}

Expand All @@ -47,6 +47,9 @@ static void rbox_ostream_destroy(struct iostream_private *stream) {
// wait for write operation
struct bufferlist_ostream *bstream = (struct bufferlist_ostream *)stream;
i_assert(bstream->buf != nullptr);

// do not free the outbut stream! cause, it is needed until all write operations are finished!
// delete bstream->buf;
}

static ssize_t o_stream_buffer_sendv(struct ostream_private *stream, const struct const_iovec *iov,
Expand Down Expand Up @@ -98,7 +101,6 @@ struct ostream *o_stream_create_bufferlist(librmb::RadosMail *rados_mail, const
if (execute_write_ops) {
rados_mail->set_completion(librados::Rados::aio_create_completion());
rados_mail->set_active_op(1);
// rados_mail->set_write_operation(new librados::ObjectWriteOperation());
}
output = o_stream_create(&bstream->ostream, NULL, -1);
o_stream_set_name(output, "(buffer)");
Expand Down
10 changes: 9 additions & 1 deletion src/storage-rbox/rbox-mail.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, s
return -1;
}
}
rmail->rados_mail->get_mail_buffer()->clear();
// create mail buffer!
rmail->rados_mail->set_mail_buffer(new librados::bufferlist());

uint64_t psize;
time_t save_date;
Expand All @@ -436,11 +437,13 @@ static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, s
rmail->rados_mail->get_oid()->c_str(), rados_storage->get_namespace().c_str(), getpid(), alt_storage);
rbox_mail_set_expunged(rmail);
FUNC_END_RET("ret == -1");
delete rmail->rados_mail->get_mail_buffer();
return -1;
} else {
i_error("reading mail return code(%d), oid(%s),namespace(%s), alt_storage(%d)", ret,
rmail->rados_mail->get_oid()->c_str(), rados_storage->get_namespace().c_str(), alt_storage);
FUNC_END_RET("ret == -1");
delete rmail->rados_mail->get_mail_buffer();
return -1;
}
}
Expand All @@ -457,16 +460,19 @@ static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, s
rmail->rados_mail->get_oid()->c_str(), rados_storage->get_namespace().c_str(), alt_storage, _mail->uid);
FUNC_END_RET("ret == 0");
rbox_mail_set_expunged(rmail);
delete rmail->rados_mail->get_mail_buffer();
return -1;
} else if (physical_size == INT_MAX) {
i_error("trying to read a mail with INT_MAX size. (uid=%d,oid=%s,namespace=%s,alt_storage=%d)", _mail->uid,
rmail->rados_mail->get_oid()->c_str(), rados_storage->get_namespace().c_str(), alt_storage);
FUNC_END_RET("ret == -1");
delete rmail->rados_mail->get_mail_buffer();
return -1;
}

if (get_mail_stream(rmail, rmail->rados_mail->get_mail_buffer(), physical_size, &input) < 0) {
FUNC_END_RET("ret == -1");
delete rmail->rados_mail->get_mail_buffer();
return -1;
}

Expand Down Expand Up @@ -650,6 +656,7 @@ static void rbox_mail_close(struct mail *_mail) {

if (rmail_->rados_mail != nullptr) {
r_storage->s->free_rados_mail(rmail_->rados_mail);
i_debug("freeing mail");
rmail_->rados_mail = nullptr;
}

Expand All @@ -664,6 +671,7 @@ static void rbox_index_mail_set_seq(struct mail *_mail, uint32_t seq, bool savin

if (rmail_->rados_mail == nullptr) {
struct rbox_storage *r_storage = (struct rbox_storage *)_mail->box->storage;

rmail_->rados_mail = r_storage->s->alloc_rados_mail();
rbox_get_index_record(_mail);
}
Expand Down
12 changes: 7 additions & 5 deletions src/storage-rbox/rbox-save.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ void init_output_stream(mail_save_context *_ctx) {
o_stream_unref(&_ctx->data.output);
}

// create buffer ( delete is in wait_for_write_operations)
r_ctx->rados_mail->set_mail_buffer(new librados::bufferlist());
r_ctx->output_stream = o_stream_create_bufferlist(r_ctx->rados_mail, &r_ctx->rados_storage,
rbox->storage->config->is_create_write_op_in_write_continue());
o_stream_cork(r_ctx->output_stream);
Expand Down Expand Up @@ -389,10 +391,10 @@ static void clean_up_failed(struct rbox_save_context *r_ctx, bool wait_for_opera
struct rbox_storage *r_storage = (struct rbox_storage *)&r_ctx->mbox->storage->storage;

if (wait_for_operations) {
/* if (r_storage->s->wait_for_rados_operations(r_ctx->rados_mails)) {
i_error("Librados waiting for rados operations failed (mails: %lu), namespace=%s", r_ctx->rados_mails.size(),
r_storage->s->get_namespace().c_str());
}*/
if (r_storage->s->wait_for_rados_operations(r_ctx->rados_mails)) {
i_error("Librados waiting for rados operations failed (mails: %lu), namespace=%s", r_ctx->rados_mails.size(),
r_storage->s->get_namespace().c_str());
}
}
// try to clean up!
int delete_ret = 0;
Expand Down Expand Up @@ -512,7 +514,7 @@ int rbox_save_finish(struct mail_save_context *_ctx) {
rbox_save_mail_set_metadata(r_ctx, r_ctx->rados_mail);

// write_op will be deleted in [wait_for_operations]
librados::ObjectWriteOperation write_op; // = new librados::ObjectWriteOperation();
librados::ObjectWriteOperation write_op; // = new librados::ObjectWriteOperation();
r_storage->ms->get_storage()->save_metadata(&write_op, r_ctx->rados_mail);

if (!r_storage->config->is_create_write_op_in_write_continue()) {
Expand Down
Loading

0 comments on commit 96c12f6

Please sign in to comment.