Skip to content

Commit

Permalink
fix(GODT-2425): Always use DB state for flag operation
Browse files Browse the repository at this point in the history
Out of sync flag state could be related to the fact that the snapshots
were issuing connector operations based on the information from the
snapshot rather than what is actually in the database.
  • Loading branch information
LBeernaertProton committed Mar 6, 2023
1 parent c843a91 commit e4c7ed0
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 103 deletions.
12 changes: 7 additions & 5 deletions internal/db/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,9 @@ func GetMessageUIDsWithFlagsAfterAddOrUIDBump(ctx context.Context, client *ent.C
}

type MessageFlagSet struct {
ID imap.InternalMessageID
FlagSet imap.FlagSet
ID imap.InternalMessageID
RemoteID imap.MessageID
FlagSet imap.FlagSet
}

// GetMessageFlags returns the flags of the given messages.
Expand All @@ -347,16 +348,17 @@ func GetMessageFlags(ctx context.Context, client *ent.Client, messageIDs []imap.
WithFlags(func(query *ent.MessageFlagQuery) {
query.Select(messageflag.FieldValue)
}).
Select(message.FieldID).
Select(message.FieldID, message.FieldRemoteID).
All(ctx)
if err != nil {
return nil, err
}

for _, msg := range chunkMessages {
mfs := MessageFlagSet{
ID: msg.ID,
FlagSet: imap.NewFlagSetWithCapacity(len(msg.Edges.Flags)),
ID: msg.ID,
RemoteID: msg.RemoteID,
FlagSet: imap.NewFlagSetWithCapacity(len(msg.Edges.Flags)),
}

for _, v := range msg.Edges.Flags {
Expand Down
98 changes: 0 additions & 98 deletions internal/state/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,40 +554,6 @@ func (state *State) actionAddMessageFlags(
return sm.ID.InternalID
})

// If setting messages as seen, only set those messages that aren't currently seen.
if addFlags.ContainsUnchecked(imap.FlagSeenLowerCase) {
var messagesToApply []imap.MessageID

for _, msg := range messages {
if !msg.flags.ContainsUnchecked(imap.FlagSeenLowerCase) {
messagesToApply = append(messagesToApply, msg.ID.RemoteID)
}
}

if len(messagesToApply) != 0 {
if err := state.user.GetRemote().SetMessagesSeen(ctx, messagesToApply, true); err != nil {
return err
}
}
}

// If setting messages as flagged, only set those messages that aren't currently flagged.
if addFlags.ContainsUnchecked(imap.FlagFlaggedLowerCase) {
var messagesToApply []imap.MessageID

for _, msg := range messages {
if !msg.flags.ContainsUnchecked(imap.FlagFlaggedLowerCase) {
messagesToApply = append(messagesToApply, msg.ID.RemoteID)
}
}

if len(messagesToApply) != 0 {
if err := state.user.GetRemote().SetMessagesFlagged(ctx, messagesToApply, true); err != nil {
return err
}
}
}

if err := state.applyMessageFlagsAdded(ctx, tx, internalMessageIDs, addFlags); err != nil {
return err
}
Expand All @@ -605,40 +571,6 @@ func (state *State) actionRemoveMessageFlags(
return sm.ID.InternalID
})

// If setting messages as unseen, only set those messages that are currently seen.
if remFlags.ContainsUnchecked(imap.FlagSeenLowerCase) {
var messagesToApply []imap.MessageID

for _, msg := range messages {
if msg.flags.ContainsUnchecked(imap.FlagSeenLowerCase) {
messagesToApply = append(messagesToApply, msg.ID.RemoteID)
}
}

if len(messagesToApply) != 0 {
if err := state.user.GetRemote().SetMessagesSeen(ctx, messagesToApply, false); err != nil {
return err
}
}
}

// If setting messages as unflagged, only set those messages that are currently flagged.
if remFlags.ContainsUnchecked(imap.FlagFlaggedLowerCase) {
var messagesToApply []imap.MessageID

for _, msg := range messages {
if msg.flags.ContainsUnchecked(imap.FlagFlaggedLowerCase) {
messagesToApply = append(messagesToApply, msg.ID.RemoteID)
}
}

if len(messagesToApply) != 0 {
if err := state.user.GetRemote().SetMessagesFlagged(ctx, messagesToApply, false); err != nil {
return err
}
}
}

if err := state.applyMessageFlagsRemoved(ctx, tx, internalMessageIDs, remFlags); err != nil {
return err
}
Expand All @@ -655,35 +587,5 @@ func (state *State) actionSetMessageFlags(ctx context.Context, tx *ent.Tx, messa
return sm.ID.InternalID
})

// If setting messages as seen, only set those messages that aren't currently seen, and vice versa.
setSeen := map[bool][]imap.MessageID{true: {}, false: {}}

for _, msg := range messages {
if seen := setFlags.ContainsUnchecked(imap.FlagSeenLowerCase); seen != msg.flags.ContainsUnchecked(imap.FlagSeenLowerCase) {
setSeen[seen] = append(setSeen[seen], msg.ID.RemoteID)
}
}

for seen, messageIDs := range setSeen {
if err := state.user.GetRemote().SetMessagesSeen(ctx, messageIDs, seen); err != nil {
return err
}
}

// If setting messages as flagged, only set those messages that aren't currently flagged, and vice versa.
setFlagged := map[bool][]imap.MessageID{true: {}, false: {}}

for _, msg := range messages {
if flagged := setFlags.ContainsUnchecked(imap.FlagFlaggedLowerCase); flagged != msg.flags.ContainsUnchecked(imap.FlagFlaggedLowerCase) {
setFlagged[flagged] = append(setFlagged[flagged], msg.ID.RemoteID)
}
}

for flagged, messageIDs := range setFlagged {
if err := state.user.GetRemote().SetMessagesFlagged(ctx, messageIDs, flagged); err != nil {
return err
}
}

return state.applyMessageFlagsSet(ctx, tx, internalMessageIDs, setFlags)
}
102 changes: 102 additions & 0 deletions internal/state/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,40 @@ func (state *State) applyMessageFlagsAdded(ctx context.Context, tx *ent.Tx, mess
return err
}

// If setting messages as seen, only set those messages that aren't currently seen.
if addFlags.ContainsUnchecked(imap.FlagSeenLowerCase) {
var messagesToApply []imap.MessageID

for _, msg := range curFlags {
if !msg.FlagSet.ContainsUnchecked(imap.FlagSeenLowerCase) {
messagesToApply = append(messagesToApply, msg.RemoteID)
}
}

if len(messagesToApply) != 0 {
if err := state.user.GetRemote().SetMessagesSeen(ctx, messagesToApply, true); err != nil {
return err
}
}
}

// If setting messages as flagged, only set those messages that aren't currently flagged.
if addFlags.ContainsUnchecked(imap.FlagFlaggedLowerCase) {
var messagesToApply []imap.MessageID

for _, msg := range curFlags {
if !msg.FlagSet.ContainsUnchecked(imap.FlagFlaggedLowerCase) {
messagesToApply = append(messagesToApply, msg.RemoteID)
}
}

if len(messagesToApply) != 0 {
if err := state.user.GetRemote().SetMessagesFlagged(ctx, messagesToApply, true); err != nil {
return err
}
}
}

if addFlags.ContainsUnchecked(imap.FlagDeletedLowerCase) {
if err := db.SetDeletedFlag(ctx, tx, state.snap.mboxID.InternalID, messageIDs, true); err != nil {
return err
Expand Down Expand Up @@ -166,6 +200,39 @@ func (state *State) applyMessageFlagsRemoved(ctx context.Context, tx *ent.Tx, me
if err != nil {
return err
}
// If setting messages as unseen, only set those messages that are currently seen.
if remFlags.ContainsUnchecked(imap.FlagSeenLowerCase) {
var messagesToApply []imap.MessageID

for _, msg := range curFlags {
if msg.FlagSet.ContainsUnchecked(imap.FlagSeenLowerCase) {
messagesToApply = append(messagesToApply, msg.RemoteID)
}
}

if len(messagesToApply) != 0 {
if err := state.user.GetRemote().SetMessagesSeen(ctx, messagesToApply, false); err != nil {
return err
}
}
}

// If setting messages as unflagged, only set those messages that are currently flagged.
if remFlags.ContainsUnchecked(imap.FlagFlaggedLowerCase) {
var messagesToApply []imap.MessageID

for _, msg := range curFlags {
if msg.FlagSet.ContainsUnchecked(imap.FlagFlaggedLowerCase) {
messagesToApply = append(messagesToApply, msg.RemoteID)
}
}

if len(messagesToApply) != 0 {
if err := state.user.GetRemote().SetMessagesFlagged(ctx, messagesToApply, false); err != nil {
return err
}
}
}

if remFlags.ContainsUnchecked(imap.FlagDeletedLowerCase) {
if err := db.SetDeletedFlag(ctx, tx, state.snap.mboxID.InternalID, messageIDs, false); err != nil {
Expand Down Expand Up @@ -251,6 +318,41 @@ func (state *State) applyMessageFlagsSet(ctx context.Context, tx *ent.Tx, messag
return nil
}

curFlags, err := db.GetMessageFlags(ctx, tx.Client(), messageIDs)
if err != nil {
return err
}

// If setting messages as seen, only set those messages that aren't currently seen, and vice versa.
setSeen := map[bool][]imap.MessageID{true: {}, false: {}}

for _, msg := range curFlags {
if seen := setFlags.ContainsUnchecked(imap.FlagSeenLowerCase); seen != msg.FlagSet.ContainsUnchecked(imap.FlagSeenLowerCase) {
setSeen[seen] = append(setSeen[seen], msg.RemoteID)
}
}

for seen, messageIDs := range setSeen {
if err := state.user.GetRemote().SetMessagesSeen(ctx, messageIDs, seen); err != nil {
return err
}
}

// If setting messages as flagged, only set those messages that aren't currently flagged, and vice versa.
setFlagged := map[bool][]imap.MessageID{true: {}, false: {}}

for _, msg := range curFlags {
if flagged := setFlags.ContainsUnchecked(imap.FlagFlaggedLowerCase); flagged != msg.FlagSet.ContainsUnchecked(imap.FlagFlaggedLowerCase) {
setFlagged[flagged] = append(setFlagged[flagged], msg.RemoteID)
}
}

for flagged, messageIDs := range setFlagged {
if err := state.user.GetRemote().SetMessagesFlagged(ctx, messageIDs, flagged); err != nil {
return err
}
}

if err := db.SetDeletedFlag(ctx, tx, state.snap.mboxID.InternalID, messageIDs, setFlags.Contains(imap.FlagDeleted)); err != nil {
return err
}
Expand Down

0 comments on commit e4c7ed0

Please sign in to comment.