Skip to content

Commit

Permalink
feat(GODT-1619): Per connection metadata storage
Browse files Browse the repository at this point in the history
This patch adds features to the gluon so that we can store per
connection metadata. This is required to implement the IMAP ID
extension, where we need to store IMAP client identifiers for each of
the incoming connections.

To achieve this we queue operations onto the queue which create, delete
and store values in this storage space. This guarantees the operations
are in sync with any potential future IMAP commands.

This patch also updates the data written to disk to include the metadate
storage in order for serialized commands that haven't been execute to
function correctly.

This patch also addresses a bug in pchan.go related to concurrent access
after queue closing (thanks @james).

Finally this patch also address some issues in pchan reported by
golangcli-run.
  • Loading branch information
LBeernaertProton committed Jun 13, 2022
1 parent 507ee09 commit bf2f1c3
Show file tree
Hide file tree
Showing 29 changed files with 461 additions and 140 deletions.
5 changes: 4 additions & 1 deletion demo/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ func main() {
logrus.WithError(err).Fatal("Failed to make temporary directory")
}

server := gluon.New(filepath.Join(dir, "server"))
server, err := gluon.New(filepath.Join(dir, "server"))
if err != nil {
logrus.WithError(err).Fatal("Failed to create server")
}

connector := connector.NewDummy(
[]string{"[email protected]", "[email protected]"},
Expand Down
22 changes: 16 additions & 6 deletions internal/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ type Backend struct {
usersLock sync.Mutex
}

func New(dir string) *Backend {
func New(dir string) (*Backend, error) {
manager, err := remote.New(dir)
if err != nil {
return nil, err
}

return &Backend{
remote: remote.New(dir),
remote: manager,
delim: "/",
users: make(map[string]*user),
}
}, nil
}

func (b *Backend) SetDelimiter(delim string) {
Expand Down Expand Up @@ -80,7 +85,7 @@ func (b *Backend) RemoveUser(ctx context.Context, userID string) error {
return nil
}

func (b *Backend) GetState(username, password string) (*State, error) {
func (b *Backend) GetState(username, password string, sessionID int) (*State, error) {
b.usersLock.Lock()
defer b.usersLock.Unlock()

Expand All @@ -89,12 +94,17 @@ func (b *Backend) GetState(username, password string) (*State, error) {
return nil, err
}

state, err := b.users[userID].newState(remote.ConnMetadataID(sessionID))
if err != nil {
return nil, err
}

logrus.
WithField("userID", userID).
WithField("username", username).
Debug("Creating new IMAP state")
Debug("Created new IMAP state")

return b.users[userID].newState()
return state, nil
}

func (b *Backend) Close(ctx context.Context) error {
Expand Down
8 changes: 8 additions & 0 deletions internal/backend/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package backend

import (
"context"
"fmt"
"strings"
"sync"

"github.com/ProtonMail/gluon/imap"
"github.com/ProtonMail/gluon/internal/backend/ent"
"github.com/ProtonMail/gluon/internal/backend/ent/mailbox"
"github.com/ProtonMail/gluon/internal/remote"
"github.com/ProtonMail/gluon/internal/response"
"github.com/bradenaw/juniper/sets"
"github.com/bradenaw/juniper/xslices"
Expand All @@ -25,6 +27,8 @@ type State struct {

snap *snapshot
ro bool

metadataID remote.ConnMetadataID
}

func (state *State) UserID() string {
Expand Down Expand Up @@ -467,6 +471,10 @@ func (state *State) close(ctx context.Context, tx *ent.Tx) error {
}
}

if err := state.remote.DeleteConnMetadataStore(state.metadataID); err != nil {
return fmt.Errorf("failed to delete conn metadata store: %w", err)
}

state.snap = nil

state.res = nil
Expand Down
28 changes: 14 additions & 14 deletions internal/backend/state_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func (state *State) actionCreateMailbox(ctx context.Context, tx *ent.Tx, name string) (*ent.Mailbox, error) {
res, err := state.remote.CreateMailbox(ctx, strings.Split(name, state.delimiter))
res, err := state.remote.CreateMailbox(state.metadataID, strings.Split(name, state.delimiter))
if err != nil {
return nil, err
}
Expand All @@ -27,7 +27,7 @@ func (state *State) actionCreateMailbox(ctx context.Context, tx *ent.Tx, name st

// TODO(REFACTOR): What if another client is selected in the same mailbox -- should we send expunge updates?
func (state *State) actionDeleteMailbox(ctx context.Context, tx *ent.Tx, mboxID, name string) error {
if err := state.remote.DeleteMailbox(ctx, mboxID, strings.Split(name, state.delimiter)); err != nil {
if err := state.remote.DeleteMailbox(state.metadataID, mboxID, strings.Split(name, state.delimiter)); err != nil {
return err
}

Expand All @@ -36,7 +36,7 @@ func (state *State) actionDeleteMailbox(ctx context.Context, tx *ent.Tx, mboxID,

func (state *State) actionUpdateMailbox(ctx context.Context, tx *ent.Tx, mboxID, oldName, newName string) error {
if err := state.remote.UpdateMailbox(
ctx,
state.metadataID,
mboxID,
strings.Split(oldName, state.delimiter),
strings.Split(newName, state.delimiter),
Expand All @@ -48,7 +48,7 @@ func (state *State) actionUpdateMailbox(ctx context.Context, tx *ent.Tx, mboxID,
}

func (state *State) actionCreateMessage(ctx context.Context, tx *ent.Tx, mboxID string, literal []byte, flags imap.FlagSet, date time.Time) (int, error) {
res, err := state.remote.CreateMessage(ctx, mboxID, literal, flags, date)
res, err := state.remote.CreateMessage(state.metadataID, mboxID, literal, flags, date)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -91,7 +91,7 @@ func (state *State) actionAddMessagesToMailbox(ctx context.Context, tx *ent.Tx,
}
}

if err := state.remote.AddMessagesToMailbox(ctx, messageIDs, mboxID); err != nil {
if err := state.remote.AddMessagesToMailbox(state.metadataID, messageIDs, mboxID); err != nil {
return nil, err
}

Expand All @@ -117,7 +117,7 @@ func (state *State) actionRemoveMessagesFromMailbox(ctx context.Context, tx *ent
return slices.Contains(haveMessageIDs, messageID)
})

if err := state.remote.RemoveMessagesFromMailbox(ctx, messageIDs, mboxID); err != nil {
if err := state.remote.RemoveMessagesFromMailbox(state.metadataID, messageIDs, mboxID); err != nil {
return err
}

Expand All @@ -139,7 +139,7 @@ func (state *State) actionAddMessageFlags(ctx context.Context, tx *ent.Tx, messa

// If setting messages as seen, only set those messages that aren't currently seen.
if addFlags.Contains(imap.FlagSeen) {
if err := state.remote.SetMessagesSeen(ctx, xslices.Filter(messageIDs, func(messageID string) bool {
if err := state.remote.SetMessagesSeen(state.metadataID, xslices.Filter(messageIDs, func(messageID string) bool {
return !curFlags[messageID].Contains(imap.FlagSeen)
}), true); err != nil {
return nil, err
Expand All @@ -148,7 +148,7 @@ func (state *State) actionAddMessageFlags(ctx context.Context, tx *ent.Tx, messa

// If setting messages as flagged, only set those messages that aren't currently flagged.
if addFlags.Contains(imap.FlagFlagged) {
if err := state.remote.SetMessagesFlagged(ctx, xslices.Filter(messageIDs, func(messageID string) bool {
if err := state.remote.SetMessagesFlagged(state.metadataID, xslices.Filter(messageIDs, func(messageID string) bool {
return !curFlags[messageID].Contains(imap.FlagFlagged)
}), true); err != nil {
return nil, err
Expand Down Expand Up @@ -183,7 +183,7 @@ func (state *State) actionRemoveMessageFlags(ctx context.Context, tx *ent.Tx, me

// If setting messages as unseen, only set those messages that are currently seen.
if remFlags.Contains(imap.FlagSeen) {
if err := state.remote.SetMessagesSeen(ctx, xslices.Filter(messageIDs, func(messageID string) bool {
if err := state.remote.SetMessagesSeen(state.metadataID, xslices.Filter(messageIDs, func(messageID string) bool {
return curFlags[messageID].Contains(imap.FlagSeen)
}), false); err != nil {
return nil, err
Expand All @@ -192,7 +192,7 @@ func (state *State) actionRemoveMessageFlags(ctx context.Context, tx *ent.Tx, me

// If setting messages as unflagged, only set those messages that are currently flagged.
if remFlags.Contains(imap.FlagFlagged) {
if err := state.remote.SetMessagesFlagged(ctx, xslices.Filter(messageIDs, func(messageID string) bool {
if err := state.remote.SetMessagesFlagged(state.metadataID, xslices.Filter(messageIDs, func(messageID string) bool {
return curFlags[messageID].Contains(imap.FlagFlagged)
}), false); err != nil {
return nil, err
Expand Down Expand Up @@ -231,13 +231,13 @@ func (state *State) actionSetMessageFlags(ctx context.Context, tx *ent.Tx, messa

// If setting messages as seen, only set those messages that aren't currently seen.
if setFlags.Contains(imap.FlagSeen) {
if err := state.remote.SetMessagesSeen(ctx, xslices.Filter(messageIDs, func(messageID string) bool {
if err := state.remote.SetMessagesSeen(state.metadataID, xslices.Filter(messageIDs, func(messageID string) bool {
return !curFlags[messageID].Contains(imap.FlagSeen)
}), true); err != nil {
return err
}
} else {
if err := state.remote.SetMessagesSeen(ctx, xslices.Filter(messageIDs, func(messageID string) bool {
if err := state.remote.SetMessagesSeen(state.metadataID, xslices.Filter(messageIDs, func(messageID string) bool {
return curFlags[messageID].Contains(imap.FlagSeen)
}), false); err != nil {
return err
Expand All @@ -246,13 +246,13 @@ func (state *State) actionSetMessageFlags(ctx context.Context, tx *ent.Tx, messa

// If setting messages as flagged, only set those messages that aren't currently flagged.
if setFlags.Contains(imap.FlagFlagged) {
if err := state.remote.SetMessagesFlagged(ctx, xslices.Filter(messageIDs, func(messageID string) bool {
if err := state.remote.SetMessagesFlagged(state.metadataID, xslices.Filter(messageIDs, func(messageID string) bool {
return !curFlags[messageID].Contains(imap.FlagFlagged)
}), true); err != nil {
return err
}
} else {
if err := state.remote.SetMessagesFlagged(ctx, xslices.Filter(messageIDs, func(messageID string) bool {
if err := state.remote.SetMessagesFlagged(state.metadataID, xslices.Filter(messageIDs, func(messageID string) bool {
return curFlags[messageID].Contains(imap.FlagFlagged)
}), false); err != nil {
return err
Expand Down
11 changes: 8 additions & 3 deletions internal/backend/user_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,22 @@ package backend
import (
"context"
"fmt"
"github.com/ProtonMail/gluon/internal/remote"

"github.com/ProtonMail/gluon/internal/backend/ent"
"github.com/bradenaw/juniper/xslices"
"golang.org/x/exp/maps"
)

func (user *user) newState() (*State, error) {
func (user *user) newState(metadataID remote.ConnMetadataID) (*State, error) {
user.statesLock.Lock()
defer user.statesLock.Unlock()

state := &State{user: user}
if err := user.remote.CreateConnMetadataStore(metadataID); err != nil {
return nil, err
}

state := &State{user: user, metadataID: metadataID}

user.states[user.stateID] = state
user.stateID++
Expand All @@ -27,7 +32,7 @@ func (user *user) closeState(ctx context.Context, state *State) error {
defer user.statesLock.Unlock()

if err := state.close(ctx, tx); err != nil {
return err
return fmt.Errorf("failed to close state: %w", err)
}

delete(user.states, user.stateID)
Expand Down
20 changes: 9 additions & 11 deletions internal/pchan/pchan.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ func (ch *PChan[T]) Push(val T, withPrio ...int) chan struct{} {
case <-ch.done:
panic("channel is closed")

default:
// ...
default: // ...
}

var prio int
Expand Down Expand Up @@ -77,11 +76,9 @@ func (ch *PChan[T]) Push(val T, withPrio ...int) chan struct{} {
// If the channel is already closed, the call returns immediately and the bool value is false.
func (ch *PChan[T]) Pop() (t T, ok bool) {
select {
case <-ch.ready:
// ...
case <-ch.ready: // ...

case <-ch.done:
// ...
case <-ch.done: // ...
}

ch.lock.Lock()
Expand Down Expand Up @@ -161,13 +158,14 @@ func (ch *PChan[T]) Close() []T {
case <-ch.done:
panic("channel is closed")

default:
// ...
default: // ...
}

for range ch.items {
<-ch.ready
}
go func() {
for range ch.items {
<-ch.ready
}
}()

close(ch.done)

Expand Down
83 changes: 83 additions & 0 deletions internal/remote/conn_metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package remote

import (
"bytes"
"encoding/gob"
)

type ConnMetadataID uint32

// connMetadataStore provides a storage container for any type of data that needs to be associated to a connection.
type connMetadataStore struct {
data map[ConnMetadataID]map[string]any
}

func newConnMetadataStore() connMetadataStore {
return connMetadataStore{
data: make(map[ConnMetadataID]map[string]any),
}
}

func (c *connMetadataStore) CreateStore(id ConnMetadataID) {
c.data[id] = make(map[string]any)
}

func (c *connMetadataStore) DeleteStore(id ConnMetadataID) {
delete(c.data, id)
}

func (c *connMetadataStore) GetActiveStoreIDs() []ConnMetadataID {
var values []ConnMetadataID

for k, _ := range c.data {
values = append(values, k)
}

return values
}

func (c *connMetadataStore) SetValue(id ConnMetadataID, key string, value any) bool {
valueStore, ok := c.data[id]

if !ok {
return false
}

valueStore[key] = value

return true
}

func (c *connMetadataStore) GetValue(id ConnMetadataID, key string) any {
valueStore, ok := c.data[id]

if !ok {
return false
}

value, ok := valueStore[key]

if !ok {
return nil
}

return value
}

func (c *connMetadataStore) MarshalBinary() ([]byte, error) {
buf := new(bytes.Buffer)

if err := gob.NewEncoder(buf).Encode(c.data); err != nil {
return nil, err
}

return buf.Bytes(), nil
}

func (c *connMetadataStore) UnmarshalBinary(data []byte) error {
return gob.NewDecoder(bytes.NewReader(data)).Decode(&c.data)
}

func init() {
gob.Register(&connMetadataStore{})
}
Loading

0 comments on commit bf2f1c3

Please sign in to comment.