Skip to content

Commit

Permalink
feat: Batched message create updates (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshoulahan authored Jun 14, 2022
1 parent 2e3c187 commit 1d7d456
Show file tree
Hide file tree
Showing 13 changed files with 170 additions and 117 deletions.
22 changes: 12 additions & 10 deletions connector/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ func (conn *Dummy) CreateMessage(ctx context.Context, mboxID string, literal []b
date,
)

update, err := imap.NewMessageCreated(message, literal, []string{mboxID})
if err != nil {
update := imap.NewMessagesCreated()

if err := update.Add(message, literal, []string{mboxID}); err != nil {
return imap.Message{}, err
}

Expand Down Expand Up @@ -254,15 +255,16 @@ func (conn *Dummy) Sync(ctx context.Context) error {
conn.updateCh <- imap.NewMailboxCreated(mailbox)
}

update := imap.NewMessagesCreated()

for _, message := range conn.state.getMessages() {
update, err := imap.NewMessageCreated(message, conn.state.getLiteral(message.ID), conn.state.getLabelIDs(message.ID))
if err != nil {
if err := update.Add(message, conn.state.getLiteral(message.ID), conn.state.getLabelIDs(message.ID)); err != nil {
return err
}

conn.updateCh <- update
}

conn.updateCh <- update

return nil
}

Expand Down Expand Up @@ -303,10 +305,10 @@ func (conn *Dummy) pushUpdate(update imap.Update) {
// We mimic the behaviour of the Proton sever. if several update to a message or mailbox happen in between
// two event polls, we only get one refresh update with the latest state.
switch update := update.(type) {
case imap.MessageUpdated:
case *imap.MessageUpdated:
conn.queue = removeMessageUpdatedFromSlice(conn.queue, update.MessageID)

case imap.MailboxUpdated:
case *imap.MailboxUpdated:
conn.queue = removeMailboxUpdatedFromSlice(conn.queue, update.MailboxID)
}

Expand All @@ -326,15 +328,15 @@ func (conn *Dummy) popUpdates() []imap.Update {

func removeMessageUpdatedFromSlice(updates []imap.Update, messageID string) []imap.Update {
return xslices.Filter(updates, func(update imap.Update) bool {
u, ok := update.(imap.MessageUpdated)
u, ok := update.(*imap.MessageUpdated)

return (!ok) || (u.MessageID != messageID)
})
}

func removeMailboxUpdatedFromSlice(updates []imap.Update, mailboxID string) []imap.Update {
return xslices.Filter(updates, func(update imap.Update) bool {
u, ok := update.(imap.MailboxUpdated)
u, ok := update.(*imap.MailboxUpdated)

return (!ok) || (u.MailboxID != mailboxID)
})
Expand Down
5 changes: 3 additions & 2 deletions connector/dummy_simulate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ func (conn *Dummy) MessageCreated(message imap.Message, literal []byte, mboxIDs
labelIDs: labelIDs,
}

update, err := imap.NewMessageCreated(message, literal, mboxIDs)
if err != nil {
update := imap.NewMessagesCreated()

if err := update.Add(message, literal, mboxIDs); err != nil {
return err
}

Expand Down
8 changes: 4 additions & 4 deletions imap/update_mailbox_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ type MailboxCreated struct {
Mailbox Mailbox
}

func NewMailboxCreated(mailbox Mailbox) MailboxCreated {
return MailboxCreated{
func NewMailboxCreated(mailbox Mailbox) *MailboxCreated {
return &MailboxCreated{
updateWaiter: newUpdateWaiter(),
Mailbox: mailbox,
}
}

func (u MailboxCreated) String() string {
func (u *MailboxCreated) String() string {
return fmt.Sprintf(
"MailboxCreated: Mailbox.ID = %v, Mailbox.Name = %v",
utils.ShortID(u.Mailbox.ID),
utils.ShortID(strings.Join(u.Mailbox.Name, "/")),
)
}

func (MailboxCreated) _isUpdate() {}
func (*MailboxCreated) _isUpdate() {}
8 changes: 4 additions & 4 deletions imap/update_mailbox_deleted.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ type MailboxDeleted struct {
MailboxID string
}

func NewMailboxDeleted(mailboxID string) MailboxDeleted {
return MailboxDeleted{
func NewMailboxDeleted(mailboxID string) *MailboxDeleted {
return &MailboxDeleted{
updateWaiter: newUpdateWaiter(),
MailboxID: mailboxID,
}
}

func (u MailboxDeleted) String() string {
func (u *MailboxDeleted) String() string {
return fmt.Sprintf("MailboxDeleted: MailboxID = %v", utils.ShortID(u.MailboxID))
}

func (MailboxDeleted) _isUpdate() {}
func (*MailboxDeleted) _isUpdate() {}
8 changes: 4 additions & 4 deletions imap/update_mailbox_id_changed.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ type MailboxIDChanged struct {
NewID string
}

func NewMailboxIDChanged(oldID, newID string) MailboxIDChanged {
return MailboxIDChanged{
func NewMailboxIDChanged(oldID, newID string) *MailboxIDChanged {
return &MailboxIDChanged{
updateWaiter: newUpdateWaiter(),
OldID: oldID,
NewID: newID,
}
}

func (u MailboxIDChanged) String() string {
func (u *MailboxIDChanged) String() string {
return fmt.Sprintf("MailboxIDChanged: OldID = %v, NewID = %v", utils.ShortID(u.OldID), utils.ShortID(u.NewID))
}

func (MailboxIDChanged) _isUpdate() {}
func (*MailboxIDChanged) _isUpdate() {}
8 changes: 4 additions & 4 deletions imap/update_mailbox_updated.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ type MailboxUpdated struct {
MailboxName []string
}

func NewMailboxUpdated(mailboxID string, mailboxName []string) MailboxUpdated {
return MailboxUpdated{
func NewMailboxUpdated(mailboxID string, mailboxName []string) *MailboxUpdated {
return &MailboxUpdated{
updateWaiter: newUpdateWaiter(),
MailboxID: mailboxID,
MailboxName: mailboxName,
}
}

func (u MailboxUpdated) String() string {
func (u *MailboxUpdated) String() string {
return fmt.Sprintf(
"MailboxUpdated: MailboxID = %v, MailboxName = %v",
utils.ShortID(u.MailboxID),
utils.ShortID(strings.Join(u.MailboxName, "/")),
)
}

func (MailboxUpdated) _isUpdate() {}
func (*MailboxUpdated) _isUpdate() {}
37 changes: 23 additions & 14 deletions imap/update_message_created.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package imap
import (
"fmt"

"github.com/ProtonMail/gluon/internal/utils"
"github.com/ProtonMail/gluon/rfc822"
)

type MessageCreated struct {
type MessagesCreated struct {
*updateWaiter

Messages []*MessageCreated
}

type MessageCreated struct {
Message Message
Literal []byte
MailboxIDs []string
Expand All @@ -19,42 +22,48 @@ type MessageCreated struct {
Envelope string
}

func NewMessageCreated(message Message, literal []byte, mailboxIDs []string) (MessageCreated, error) {
func NewMessagesCreated() *MessagesCreated {
return &MessagesCreated{
updateWaiter: newUpdateWaiter(),
}
}

func (u *MessagesCreated) Add(message Message, literal []byte, mailboxIDs []string) error {
root, err := rfc822.Parse(literal)
if err != nil {
return MessageCreated{}, fmt.Errorf("failed to parse message literal: %w", err)
return fmt.Errorf("failed to parse message literal: %w", err)
}

body, err := Structure(root, false)
if err != nil {
return MessageCreated{}, fmt.Errorf("failed to build message body: %w", err)
return fmt.Errorf("failed to build message body: %w", err)
}

structure, err := Structure(root, true)
if err != nil {
return MessageCreated{}, fmt.Errorf("failed to build message body structure: %w", err)
return fmt.Errorf("failed to build message body structure: %w", err)
}

envelope, err := Envelope(root.ParseHeader())
if err != nil {
return MessageCreated{}, fmt.Errorf("failed to build message envelope: %w", err)
return fmt.Errorf("failed to build message envelope: %w", err)
}

return MessageCreated{
updateWaiter: newUpdateWaiter(),

u.Messages = append(u.Messages, &MessageCreated{
Message: message,
Literal: literal,
MailboxIDs: mailboxIDs,

Body: body,
Structure: structure,
Envelope: envelope,
}, nil
})

return nil
}

func (u MessageCreated) String() string {
return fmt.Sprintf("MessageCreated: Message.ID = %v, MailboxIDs = %v", utils.ShortID(u.Message.ID), u.MailboxIDs)
func (u *MessagesCreated) String() string {
return fmt.Sprintf("MessagesCreated (length = %v)", len(u.Messages))
}

func (MessageCreated) _isUpdate() {}
func (*MessagesCreated) _isUpdate() {}
8 changes: 4 additions & 4 deletions imap/update_message_id_changed.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ type MessageIDChanged struct {
NewID string
}

func NewMessageIDChanged(oldID, newID string) MessageIDChanged {
return MessageIDChanged{
func NewMessageIDChanged(oldID, newID string) *MessageIDChanged {
return &MessageIDChanged{
updateWaiter: newUpdateWaiter(),
OldID: oldID,
NewID: newID,
}
}

func (u MessageIDChanged) String() string {
func (u *MessageIDChanged) String() string {
return fmt.Sprintf("MessageID changed: OldID = %v, NewID = %v", utils.ShortID(u.OldID), utils.ShortID(u.NewID))
}

func (MessageIDChanged) _isUpdate() {}
func (*MessageIDChanged) _isUpdate() {}
8 changes: 4 additions & 4 deletions imap/update_message_updated.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type MessageUpdated struct {
Seen, Flagged bool
}

func NewMessageUpdated(messageID string, mailboxIDs []string, seen, flagged bool) MessageUpdated {
return MessageUpdated{
func NewMessageUpdated(messageID string, mailboxIDs []string, seen, flagged bool) *MessageUpdated {
return &MessageUpdated{
updateWaiter: newUpdateWaiter(),
MessageID: messageID,
MailboxIDs: mailboxIDs,
Expand All @@ -26,7 +26,7 @@ func NewMessageUpdated(messageID string, mailboxIDs []string, seen, flagged bool
}
}

func (u MessageUpdated) String() string {
func (u *MessageUpdated) String() string {
return fmt.Sprintf(
"MessageUpdated: MessageID = %v, MailboxIDs = %v, seen = %v, flagged = %v",
utils.ShortID(u.MessageID),
Expand All @@ -36,4 +36,4 @@ func (u MessageUpdated) String() string {
)
}

func (MessageUpdated) _isUpdate() {}
func (*MessageUpdated) _isUpdate() {}
57 changes: 33 additions & 24 deletions internal/backend/message_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,42 @@ import (
"github.com/bradenaw/juniper/xslices"
)

func txCreateMessage(
ctx context.Context,
tx *ent.Tx,
message imap.Message,
literal []byte,
body, structure, envelope string,
internalID string,
) (*ent.Message, error) {
builders := xslices.Map(message.Flags.ToSlice(), func(flag string) *ent.MessageFlagCreate {
return tx.MessageFlag.Create().SetValue(flag)
})
type txCreateMessageReq struct {
message imap.Message
literal []byte
body string
structure string
envelope string
internalID string
}

entFlags, err := tx.MessageFlag.CreateBulk(builders...).Save(ctx)
if err != nil {
return nil, err
func txCreateMessages(ctx context.Context, tx *ent.Tx, reqs ...*txCreateMessageReq) ([]*ent.Message, error) {
flags := make(map[string][]*ent.MessageFlag)

for _, req := range reqs {
builders := xslices.Map(req.message.Flags.ToSlice(), func(flag string) *ent.MessageFlagCreate {
return tx.MessageFlag.Create().SetValue(flag)
})

entFlags, err := tx.MessageFlag.CreateBulk(builders...).Save(ctx)
if err != nil {
return nil, err
}

flags[req.message.ID] = entFlags
}

return tx.Message.Create().
SetMessageID(message.ID).
SetInternalID(internalID).
SetDate(message.Date).
SetBody(body).
SetBodyStructure(structure).
SetEnvelope(envelope).
SetSize(len(literal)).
AddFlags(entFlags...).
Save(ctx)
return tx.Message.CreateBulk(xslices.Map(reqs, func(req *txCreateMessageReq) *ent.MessageCreate {
return tx.Message.Create().
SetMessageID(req.message.ID).
SetInternalID(req.internalID).
SetDate(req.message.Date).
SetBody(req.body).
SetBodyStructure(req.structure).
SetEnvelope(req.envelope).
SetSize(len(req.literal)).
AddFlags(flags[req.message.ID]...)
})...).Save(ctx)
}

func txAddMessagesToMailbox(ctx context.Context, tx *ent.Tx, messageIDs []string, mboxID string) (map[string]int, error) {
Expand Down
Loading

0 comments on commit 1d7d456

Please sign in to comment.