From 5fc77dbb50cbcdd769410c74c83177d9bdad3a78 Mon Sep 17 00:00:00 2001 From: James Houlahan Date: Tue, 7 Jun 2022 12:08:35 +0200 Subject: [PATCH] feat: Implement Server.RemoveUser --- connector/connector.go | 29 ++++++++++------ connector/dummy.go | 6 ---- internal/backend/backend.go | 28 +++++++++++++-- internal/backend/errors.go | 19 +++++----- internal/backend/state.go | 2 +- internal/pchan/pchan.go | 69 +++++++++++++++++++++---------------- internal/remote/manager.go | 29 ++++++++++++++++ internal/remote/user.go | 11 +++--- server.go | 13 +++++++ tests/server_test.go | 5 +++ tests/state_test.go | 8 ++--- 11 files changed, 152 insertions(+), 67 deletions(-) diff --git a/connector/connector.go b/connector/connector.go index fc238bc5..27481fd9 100644 --- a/connector/connector.go +++ b/connector/connector.go @@ -12,15 +12,6 @@ type Connector interface { // Authorize returns whether the given username/password combination are valid for this connector. Authorize(username, password string) bool - // GetUpdates returns a stream of updates that the gluon server should apply. - GetUpdates() <-chan imap.Update - - // Pause pauses the stream of updates. - Pause() - - // Resume resumes the stream of updates. - Resume() - // ValidateCreate checks whether a mailbox with the given name can be created. // If so, the flags, permanent flags and attributes which the mailbox would have are returned. ValidateCreate(name []string) (flags, permFlags, attrs imap.FlagSet, err error) @@ -43,12 +34,30 @@ type Connector interface { // DeleteLabel deletes the label with the given ID. DeleteLabel(ctx context.Context, mboxID string) error + // GetMessage returns the message with the given ID. GetMessage(ctx context.Context, messageID string) (imap.Message, []string, error) + + // CreateMessage creates a new message on the remote. CreateMessage(ctx context.Context, mboxID string, literal []byte, flags imap.FlagSet, date time.Time) (imap.Message, error) + + // LabelMessages labels the given messages with the given mailbox ID. LabelMessages(ctx context.Context, messageIDs []string, mboxID string) error + + // UnlabelMessages unlabels the given messages with the given mailbox ID. UnlabelMessages(ctx context.Context, messageIDs []string, mboxID string) error + + // MarkMessagesSeen sets the seen value of the given messages. MarkMessagesSeen(ctx context.Context, messageIDs []string, seen bool) error + + // MarkMessagesFlagged sets the flagged value of the given messages. MarkMessagesFlagged(ctx context.Context, messageIDs []string, flagged bool) error - Close() error + // GetUpdates returns a stream of updates that the gluon server should apply. + GetUpdates() <-chan imap.Update + + // Pause pauses the stream of updates. + Pause() + + // Resume resumes the stream of updates. + Resume() } diff --git a/connector/dummy.go b/connector/dummy.go index 14da24e2..fce77988 100644 --- a/connector/dummy.go +++ b/connector/dummy.go @@ -241,12 +241,6 @@ func (conn *Dummy) MarkMessagesFlagged(ctx context.Context, messageIDs []string, return nil } -func (conn *Dummy) Close() error { - conn.ticker.Stop() - - return nil -} - func (conn *Dummy) Sync(ctx context.Context) error { for _, mailbox := range conn.state.getLabels() { conn.updateCh <- imap.NewMailboxCreated(mailbox) diff --git a/internal/backend/backend.go b/internal/backend/backend.go index 4c08f130..e46b58ac 100644 --- a/internal/backend/backend.go +++ b/internal/backend/backend.go @@ -58,6 +58,28 @@ func (b *Backend) AddUser(conn connector.Connector, store store.Store, client *e return userID, nil } +func (b *Backend) RemoveUser(ctx context.Context, userID string) error { + b.usersLock.Lock() + defer b.usersLock.Unlock() + + user, ok := b.users[userID] + if !ok { + return ErrNoSuchUser + } + + if err := user.close(ctx); err != nil { + return fmt.Errorf("failed to close backend user: %w", err) + } + + if err := b.remote.RemoveUser(ctx, userID); err != nil { + return fmt.Errorf("failed to remove remote user: %w", err) + } + + delete(b.users, userID) + + return nil +} + func (b *Backend) GetState(username, password string) (*State, error) { b.usersLock.Lock() defer b.usersLock.Unlock() @@ -79,10 +101,12 @@ func (b *Backend) Close(ctx context.Context) error { b.usersLock.Lock() defer b.usersLock.Unlock() - for _, user := range b.users { + for userID, user := range b.users { if err := user.close(ctx); err != nil { - return fmt.Errorf("failed to close backend user: %w", err) + return fmt.Errorf("failed to close backend user (%v): %w", userID, err) } + + delete(b.users, userID) } logrus.Debug("Backend was closed") diff --git a/internal/backend/errors.go b/internal/backend/errors.go index 8961b617..1b49f2f7 100644 --- a/internal/backend/errors.go +++ b/internal/backend/errors.go @@ -3,12 +3,15 @@ package backend import "errors" var ( - ErrNoSuchSnapshot = errors.New("no such snapshot") - ErrNoSuchMessage = errors.New("no such message") - ErrNoSuchMailbox = errors.New("no such mailbox") - ErrExistingMailbox = errors.New("a mailbox with that name already exists") - ErrAlreadySubscribed = errors.New("already subscribed to this mailbox") - ErrAlreadyUnsubscribed = errors.New("not subscribed to this mailbox") - ErrSessionIsNotSelected = errors.New("session is not selected") - ErrNotImplemented = errors.New("not implemented") + ErrNoSuchUser = errors.New("no such user") + ErrNoSuchSnapshot = errors.New("no such snapshot") + ErrNoSuchMessage = errors.New("no such message") + ErrNoSuchMailbox = errors.New("no such mailbox") + + ErrExistingMailbox = errors.New("a mailbox with that name already exists") + ErrAlreadySubscribed = errors.New("already subscribed to this mailbox") + ErrAlreadyUnsubscribed = errors.New("not subscribed to this mailbox") + ErrSessionNotSelected = errors.New("session is not selected") + + ErrNotImplemented = errors.New("not implemented") ) diff --git a/internal/backend/state.go b/internal/backend/state.go index fe4e0408..fb2b3c35 100644 --- a/internal/backend/state.go +++ b/internal/backend/state.go @@ -260,7 +260,7 @@ func (state *State) Mailbox(ctx context.Context, name string, fn func(*Mailbox) func (state *State) Selected(ctx context.Context, fn func(*Mailbox) error) error { if !state.IsSelected() { - return ErrSessionIsNotSelected + return ErrSessionNotSelected } return state.tx(ctx, func(tx *ent.Tx) error { diff --git a/internal/pchan/pchan.go b/internal/pchan/pchan.go index 1b09d80a..ee023872 100644 --- a/internal/pchan/pchan.go +++ b/internal/pchan/pchan.go @@ -16,6 +16,7 @@ type PChan[T any] struct { ready chan struct{} done chan struct{} lock sync.Mutex + wg sync.WaitGroup } type item[T any] struct { @@ -46,6 +47,7 @@ func (ch *PChan[T]) Push(val T, withPrio ...int) chan struct{} { panic("channel is closed") default: + // ... } var prio int @@ -66,41 +68,61 @@ func (ch *PChan[T]) Push(val T, withPrio ...int) chan struct{} { go func() { ch.ready <- struct{}{} }() - return done -} - -// Len returns the number of items queued. -func (ch *PChan[T]) Len() int { - ch.lock.Lock() - defer ch.lock.Unlock() + ch.wg.Add(1) - return len(ch.items) + return done } // Pop blocks until an item is available, then returns that item. // If the channel is already closed, the call returns immediately and the bool value is false. -func (ch *PChan[T]) Pop() (T, bool) { +func (ch *PChan[T]) Pop() (t T, ok bool) { select { case <-ch.ready: + // ... + case <-ch.done: + // ... } - return ch.pop() + ch.lock.Lock() + defer ch.lock.Unlock() + + if len(ch.items) == 0 { + return t, false + } + + var item *item[T] + + item, ch.items = ch.items[0], ch.items[1:] + + defer close(item.done) + + ch.wg.Done() + + return item.val, true } // Peek returns the highest priority item, if any. // The bool is true if an item was available. -func (ch *PChan[T]) Peek() (T, bool) { +func (ch *PChan[T]) Peek() (t T, ok bool) { ch.lock.Lock() defer ch.lock.Unlock() if len(ch.items) == 0 { - return *new(T), false + return t, false } return ch.items[0].val, true } +// Len returns the number of items queued. +func (ch *PChan[T]) Len() int { + ch.lock.Lock() + defer ch.lock.Unlock() + + return len(ch.items) +} + // Range repeatedly calls the callback with items as they are pushed onto the channel. // It stops when the channel is closed. func (ch *PChan[T]) Range(fn func(T)) { @@ -125,6 +147,11 @@ func (ch *PChan[T]) Apply(fn func(T)) { } } +// Wait blocks until the queue is empty. +func (ch *PChan[T]) Wait() { + ch.wg.Wait() +} + // Close closes the channel, returning whatever was still queued on the channel. func (ch *PChan[T]) Close() []T { ch.lock.Lock() @@ -135,6 +162,7 @@ func (ch *PChan[T]) Close() []T { panic("channel is closed") default: + // ... } for range ch.items { @@ -159,23 +187,6 @@ func (ch *PChan[T]) String() string { return res } -func (ch *PChan[T]) pop() (T, bool) { - ch.lock.Lock() - defer ch.lock.Unlock() - - if len(ch.items) == 0 { - return *new(T), false - } - - var item *item[T] - - item, ch.items = ch.items[0], ch.items[1:] - - defer close(item.done) - - return item.val, true -} - //nolint:gosec func (ch *PChan[T]) getPosition(prio int) int { lo := slices.IndexFunc(ch.items, func(item *item[T]) bool { diff --git a/internal/remote/manager.go b/internal/remote/manager.go index 3215d5eb..ba8276b6 100644 --- a/internal/remote/manager.go +++ b/internal/remote/manager.go @@ -1,6 +1,7 @@ package remote import ( + "context" "errors" "fmt" "os" @@ -51,6 +52,34 @@ func (m *Manager) AddUser(userID string, conn connector.Connector) (*User, error return user, nil } +// RemoveUser removes the user with the given ID from the remote manager. +// It waits until all the user's queued operations have been performed. +// TODO: Find a better way to flush the operation queue? +func (m *Manager) RemoveUser(ctx context.Context, userID string) error { + m.usersLock.Lock() + defer m.usersLock.Unlock() + + user, ok := m.users[userID] + if !ok { + return ErrNoSuchUser + } + + user.queue.Wait() + + path, err := m.getQueuePath(userID) + if err != nil { + return err + } + + if err := os.Remove(path); err != nil { + return err + } + + delete(m.users, userID) + + return nil +} + // GetUserID returns the user ID of the user with the given credentials. func (m *Manager) GetUserID(username, password string) (string, error) { m.usersLock.Lock() diff --git a/internal/remote/user.go b/internal/remote/user.go index 8157bf4c..49e92193 100644 --- a/internal/remote/user.go +++ b/internal/remote/user.go @@ -2,6 +2,7 @@ package remote import ( "errors" + "fmt" "io" "os" "sync" @@ -70,7 +71,7 @@ func (user *User) GetUpdates() <-chan imap.Update { func (user *User) Close() error { ops, err := user.closeQueue() if err != nil { - return err + return fmt.Errorf("failed to close queue: %w", err) } if user.lastOp != nil { @@ -79,15 +80,11 @@ func (user *User) Close() error { b, err := saveOps(ops) if err != nil { - return err + return fmt.Errorf("failed to serialize operations: %w", err) } if err := os.WriteFile(user.path, b, 0o600); err != nil { - return err - } - - if err := user.conn.Close(); err != nil { - return err + return fmt.Errorf("failed to save operations: %w", err) } return nil diff --git a/server.go b/server.go index ac20e98e..d9972bae 100644 --- a/server.go +++ b/server.go @@ -83,6 +83,19 @@ func (s *Server) AddUser(conn connector.Connector, store store.Store, driver, so return userID, nil } +// RemoveUser removes a user from the mailserver. +func (s *Server) RemoveUser(ctx context.Context, userID string) error { + if err := s.backend.RemoveUser(ctx, userID); err != nil { + return err + } + + s.publish(events.EventUserRemoved{ + UserID: userID, + }) + + return nil +} + // AddWatcher adds a new watcher. func (s *Server) AddWatcher() chan events.Event { s.watchersLock.Lock() diff --git a/tests/server_test.go b/tests/server_test.go index 9ba921f4..0840bc79 100644 --- a/tests/server_test.go +++ b/tests/server_test.go @@ -84,6 +84,11 @@ func runServer(tb testing.TB, credentials map[string]string, delim string, tests conn.Flush() } + // Remove all users before shutdown. + for _, userID := range userIDs { + require.NoError(tb, server.RemoveUser(ctx, userID)) + } + // Expect the server to shut down successfully when closed. require.NoError(tb, server.Close(ctx)) require.NoError(tb, <-errCh) diff --git a/tests/state_test.go b/tests/state_test.go index dcb98ee5..43eb638d 100644 --- a/tests/state_test.go +++ b/tests/state_test.go @@ -49,7 +49,7 @@ func TestErrorsWhenAuthenticated(t *testing.T) { runOneToOneTestWithAuth(t, "user", "pass", "/", func(c *testConnection, _ *testSession) { for i, command := range notAuthenticatedCommands { c.C(fmt.Sprintf("%d %v", i, command)) - c.Sx(fmt.Sprintf("%d BAD %v", i, session.ErrAlreadyAuthenticated.Error())) + c.Sx(fmt.Sprintf("%d BAD %v", i, session.ErrAlreadyAuthenticated)) } }) } @@ -58,14 +58,14 @@ func TestErrorsWhenNotAuthenticated(t *testing.T) { runOneToOneTest(t, "user", "pass", "/", func(c *testConnection, _ *testSession) { for i, command := range append(authenticatedCommands, selectedCommands...) { c.C(fmt.Sprintf("%d %v", i, command)) - c.Sx(fmt.Sprintf("%d NO %v", i, session.ErrNotAuthenticated.Error())) + c.Sx(fmt.Sprintf("%d NO %v", i, session.ErrNotAuthenticated)) } // Currently, the parser requires to read the message content before the error can be reported. c.C(`A001 APPEND INBOX {2}`) c.Sx(`\+ `) c.C(`12`) - c.Sx(session.ErrNotAuthenticated.Error()) + c.Sx(fmt.Sprintf("NO %v", session.ErrNotAuthenticated)) }) } @@ -73,7 +73,7 @@ func TestErrorsWhenNotSelected(t *testing.T) { runOneToOneTestWithAuth(t, "user", "pass", "/", func(c *testConnection, _ *testSession) { for i, command := range selectedCommands { c.C(fmt.Sprintf("%d %v", i, command)) - c.Sx(fmt.Sprintf("%d NO %v", i, backend.ErrSessionIsNotSelected.Error())) + c.Sx(fmt.Sprintf("%d NO %v", i, backend.ErrSessionNotSelected)) } }) }