Skip to content

Commit

Permalink
feat: Implement Server.RemoveUser
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshoulahan committed Jun 7, 2022
1 parent 2ec2361 commit 5fc77db
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 67 deletions.
29 changes: 19 additions & 10 deletions connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,6 @@ type Connector interface {
// Authorize returns whether the given username/password combination are valid for this connector.
Authorize(username, password string) bool

// GetUpdates returns a stream of updates that the gluon server should apply.
GetUpdates() <-chan imap.Update

// Pause pauses the stream of updates.
Pause()

// Resume resumes the stream of updates.
Resume()

// ValidateCreate checks whether a mailbox with the given name can be created.
// If so, the flags, permanent flags and attributes which the mailbox would have are returned.
ValidateCreate(name []string) (flags, permFlags, attrs imap.FlagSet, err error)
Expand All @@ -43,12 +34,30 @@ type Connector interface {
// DeleteLabel deletes the label with the given ID.
DeleteLabel(ctx context.Context, mboxID string) error

// GetMessage returns the message with the given ID.
GetMessage(ctx context.Context, messageID string) (imap.Message, []string, error)

// CreateMessage creates a new message on the remote.
CreateMessage(ctx context.Context, mboxID string, literal []byte, flags imap.FlagSet, date time.Time) (imap.Message, error)

// LabelMessages labels the given messages with the given mailbox ID.
LabelMessages(ctx context.Context, messageIDs []string, mboxID string) error

// UnlabelMessages unlabels the given messages with the given mailbox ID.
UnlabelMessages(ctx context.Context, messageIDs []string, mboxID string) error

// MarkMessagesSeen sets the seen value of the given messages.
MarkMessagesSeen(ctx context.Context, messageIDs []string, seen bool) error

// MarkMessagesFlagged sets the flagged value of the given messages.
MarkMessagesFlagged(ctx context.Context, messageIDs []string, flagged bool) error

Close() error
// GetUpdates returns a stream of updates that the gluon server should apply.
GetUpdates() <-chan imap.Update

// Pause pauses the stream of updates.
Pause()

// Resume resumes the stream of updates.
Resume()
}
6 changes: 0 additions & 6 deletions connector/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,6 @@ func (conn *Dummy) MarkMessagesFlagged(ctx context.Context, messageIDs []string,
return nil
}

func (conn *Dummy) Close() error {
conn.ticker.Stop()

return nil
}

func (conn *Dummy) Sync(ctx context.Context) error {
for _, mailbox := range conn.state.getLabels() {
conn.updateCh <- imap.NewMailboxCreated(mailbox)
Expand Down
28 changes: 26 additions & 2 deletions internal/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,28 @@ func (b *Backend) AddUser(conn connector.Connector, store store.Store, client *e
return userID, nil
}

func (b *Backend) RemoveUser(ctx context.Context, userID string) error {
b.usersLock.Lock()
defer b.usersLock.Unlock()

user, ok := b.users[userID]
if !ok {
return ErrNoSuchUser
}

if err := user.close(ctx); err != nil {
return fmt.Errorf("failed to close backend user: %w", err)
}

if err := b.remote.RemoveUser(ctx, userID); err != nil {
return fmt.Errorf("failed to remove remote user: %w", err)
}

delete(b.users, userID)

return nil
}

func (b *Backend) GetState(username, password string) (*State, error) {
b.usersLock.Lock()
defer b.usersLock.Unlock()
Expand All @@ -79,10 +101,12 @@ func (b *Backend) Close(ctx context.Context) error {
b.usersLock.Lock()
defer b.usersLock.Unlock()

for _, user := range b.users {
for userID, user := range b.users {
if err := user.close(ctx); err != nil {
return fmt.Errorf("failed to close backend user: %w", err)
return fmt.Errorf("failed to close backend user (%v): %w", userID, err)
}

delete(b.users, userID)
}

logrus.Debug("Backend was closed")
Expand Down
19 changes: 11 additions & 8 deletions internal/backend/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package backend
import "errors"

var (
ErrNoSuchSnapshot = errors.New("no such snapshot")
ErrNoSuchMessage = errors.New("no such message")
ErrNoSuchMailbox = errors.New("no such mailbox")
ErrExistingMailbox = errors.New("a mailbox with that name already exists")
ErrAlreadySubscribed = errors.New("already subscribed to this mailbox")
ErrAlreadyUnsubscribed = errors.New("not subscribed to this mailbox")
ErrSessionIsNotSelected = errors.New("session is not selected")
ErrNotImplemented = errors.New("not implemented")
ErrNoSuchUser = errors.New("no such user")
ErrNoSuchSnapshot = errors.New("no such snapshot")
ErrNoSuchMessage = errors.New("no such message")
ErrNoSuchMailbox = errors.New("no such mailbox")

ErrExistingMailbox = errors.New("a mailbox with that name already exists")
ErrAlreadySubscribed = errors.New("already subscribed to this mailbox")
ErrAlreadyUnsubscribed = errors.New("not subscribed to this mailbox")
ErrSessionNotSelected = errors.New("session is not selected")

ErrNotImplemented = errors.New("not implemented")
)
2 changes: 1 addition & 1 deletion internal/backend/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (state *State) Mailbox(ctx context.Context, name string, fn func(*Mailbox)

func (state *State) Selected(ctx context.Context, fn func(*Mailbox) error) error {
if !state.IsSelected() {
return ErrSessionIsNotSelected
return ErrSessionNotSelected
}

return state.tx(ctx, func(tx *ent.Tx) error {
Expand Down
69 changes: 40 additions & 29 deletions internal/pchan/pchan.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type PChan[T any] struct {
ready chan struct{}
done chan struct{}
lock sync.Mutex
wg sync.WaitGroup
}

type item[T any] struct {
Expand Down Expand Up @@ -46,6 +47,7 @@ func (ch *PChan[T]) Push(val T, withPrio ...int) chan struct{} {
panic("channel is closed")

default:
// ...
}

var prio int
Expand All @@ -66,41 +68,61 @@ func (ch *PChan[T]) Push(val T, withPrio ...int) chan struct{} {

go func() { ch.ready <- struct{}{} }()

return done
}

// Len returns the number of items queued.
func (ch *PChan[T]) Len() int {
ch.lock.Lock()
defer ch.lock.Unlock()
ch.wg.Add(1)

return len(ch.items)
return done
}

// Pop blocks until an item is available, then returns that item.
// If the channel is already closed, the call returns immediately and the bool value is false.
func (ch *PChan[T]) Pop() (T, bool) {
func (ch *PChan[T]) Pop() (t T, ok bool) {
select {
case <-ch.ready:
// ...

case <-ch.done:
// ...
}

return ch.pop()
ch.lock.Lock()
defer ch.lock.Unlock()

if len(ch.items) == 0 {
return t, false
}

var item *item[T]

item, ch.items = ch.items[0], ch.items[1:]

defer close(item.done)

ch.wg.Done()

return item.val, true
}

// Peek returns the highest priority item, if any.
// The bool is true if an item was available.
func (ch *PChan[T]) Peek() (T, bool) {
func (ch *PChan[T]) Peek() (t T, ok bool) {
ch.lock.Lock()
defer ch.lock.Unlock()

if len(ch.items) == 0 {
return *new(T), false
return t, false
}

return ch.items[0].val, true
}

// Len returns the number of items queued.
func (ch *PChan[T]) Len() int {
ch.lock.Lock()
defer ch.lock.Unlock()

return len(ch.items)
}

// Range repeatedly calls the callback with items as they are pushed onto the channel.
// It stops when the channel is closed.
func (ch *PChan[T]) Range(fn func(T)) {
Expand All @@ -125,6 +147,11 @@ func (ch *PChan[T]) Apply(fn func(T)) {
}
}

// Wait blocks until the queue is empty.
func (ch *PChan[T]) Wait() {
ch.wg.Wait()
}

// Close closes the channel, returning whatever was still queued on the channel.
func (ch *PChan[T]) Close() []T {
ch.lock.Lock()
Expand All @@ -135,6 +162,7 @@ func (ch *PChan[T]) Close() []T {
panic("channel is closed")

default:
// ...
}

for range ch.items {
Expand All @@ -159,23 +187,6 @@ func (ch *PChan[T]) String() string {
return res
}

func (ch *PChan[T]) pop() (T, bool) {
ch.lock.Lock()
defer ch.lock.Unlock()

if len(ch.items) == 0 {
return *new(T), false
}

var item *item[T]

item, ch.items = ch.items[0], ch.items[1:]

defer close(item.done)

return item.val, true
}

//nolint:gosec
func (ch *PChan[T]) getPosition(prio int) int {
lo := slices.IndexFunc(ch.items, func(item *item[T]) bool {
Expand Down
29 changes: 29 additions & 0 deletions internal/remote/manager.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package remote

import (
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -51,6 +52,34 @@ func (m *Manager) AddUser(userID string, conn connector.Connector) (*User, error
return user, nil
}

// RemoveUser removes the user with the given ID from the remote manager.
// It waits until all the user's queued operations have been performed.
// TODO: Find a better way to flush the operation queue?
func (m *Manager) RemoveUser(ctx context.Context, userID string) error {
m.usersLock.Lock()
defer m.usersLock.Unlock()

user, ok := m.users[userID]
if !ok {
return ErrNoSuchUser
}

user.queue.Wait()

path, err := m.getQueuePath(userID)
if err != nil {
return err
}

if err := os.Remove(path); err != nil {
return err
}

delete(m.users, userID)

return nil
}

// GetUserID returns the user ID of the user with the given credentials.
func (m *Manager) GetUserID(username, password string) (string, error) {
m.usersLock.Lock()
Expand Down
11 changes: 4 additions & 7 deletions internal/remote/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package remote

import (
"errors"
"fmt"
"io"
"os"
"sync"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (user *User) GetUpdates() <-chan imap.Update {
func (user *User) Close() error {
ops, err := user.closeQueue()
if err != nil {
return err
return fmt.Errorf("failed to close queue: %w", err)
}

if user.lastOp != nil {
Expand All @@ -79,15 +80,11 @@ func (user *User) Close() error {

b, err := saveOps(ops)
if err != nil {
return err
return fmt.Errorf("failed to serialize operations: %w", err)
}

if err := os.WriteFile(user.path, b, 0o600); err != nil {
return err
}

if err := user.conn.Close(); err != nil {
return err
return fmt.Errorf("failed to save operations: %w", err)
}

return nil
Expand Down
13 changes: 13 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ func (s *Server) AddUser(conn connector.Connector, store store.Store, driver, so
return userID, nil
}

// RemoveUser removes a user from the mailserver.
func (s *Server) RemoveUser(ctx context.Context, userID string) error {
if err := s.backend.RemoveUser(ctx, userID); err != nil {
return err
}

s.publish(events.EventUserRemoved{
UserID: userID,
})

return nil
}

// AddWatcher adds a new watcher.
func (s *Server) AddWatcher() chan events.Event {
s.watchersLock.Lock()
Expand Down
5 changes: 5 additions & 0 deletions tests/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ func runServer(tb testing.TB, credentials map[string]string, delim string, tests
conn.Flush()
}

// Remove all users before shutdown.
for _, userID := range userIDs {
require.NoError(tb, server.RemoveUser(ctx, userID))
}

// Expect the server to shut down successfully when closed.
require.NoError(tb, server.Close(ctx))
require.NoError(tb, <-errCh)
Expand Down
Loading

0 comments on commit 5fc77db

Please sign in to comment.