Skip to content

Commit

Permalink
feat: Give update.Wait() a context parameter so waiting can be cancelled
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshoulahan authored and LBeernaertProton committed Oct 10, 2022
1 parent 51dfe8c commit 4d561c1
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 15 deletions.
5 changes: 3 additions & 2 deletions connector/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func NewDummy(usernames []string, password []byte, period time.Duration, flags,
conn.ticker.Tick(func(time.Time) {
for _, update := range conn.popUpdates() {
defer update.Wait()

select {
case conn.updateCh <- update:
continue
Expand Down Expand Up @@ -258,7 +259,7 @@ func (conn *Dummy) SetUIDValidity(imap.UID) error {
func (conn *Dummy) Sync(ctx context.Context) error {
for _, mailbox := range conn.state.getLabels() {
update := imap.NewMailboxCreated(mailbox)
defer update.Wait()
defer update.WaitContext(ctx)

conn.updateCh <- update
}
Expand All @@ -275,7 +276,7 @@ func (conn *Dummy) Sync(ctx context.Context) error {
}

update := imap.NewMessagesCreated(updates...)
defer update.Wait()
defer update.WaitContext(ctx)

conn.updateCh <- update

Expand Down
22 changes: 21 additions & 1 deletion imap/update_waiter.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package imap

import "sync"
import (
"context"
"sync"
)

type Waiter interface {
// Wait waits until the update has been marked as done.
Wait()

// WaitContext waits until the update has been marked as done or the context is cancelled.
WaitContext(context.Context)

// Done marks the update as done.
Done()
}

Expand All @@ -23,6 +32,17 @@ func (w *updateWaiter) Wait() {
w.wg.Wait()
}

func (w *updateWaiter) WaitContext(ctx context.Context) {
waitCh := make(chan struct{})

go func() { w.wg.Wait(); close(waitCh) }()

select {
case <-ctx.Done():
case <-waitCh:
}
}

func (w *updateWaiter) Done() {
w.wg.Done()
}
4 changes: 2 additions & 2 deletions internal/backend/state_connector_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (sc *stateConnectorImpl) refresh(ctx context.Context, messageIDs []imap.Mes
return err
}

sc.user.updateInjector.send(imap.NewMessageLabelsUpdated(
sc.user.updateInjector.send(ctx, imap.NewMessageLabelsUpdated(
message.ID,
mboxIDs,
message.Flags.ContainsUnchecked(imap.FlagSeenLowerCase),
Expand All @@ -184,7 +184,7 @@ func (sc *stateConnectorImpl) refresh(ctx context.Context, messageIDs []imap.Mes
return err
}

sc.user.updateInjector.send(imap.NewMailboxUpdated(
sc.user.updateInjector.send(ctx, imap.NewMailboxUpdated(
mailbox.ID,
mailbox.Name,
), true)
Expand Down
22 changes: 12 additions & 10 deletions internal/backend/update_injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@ func newUpdateInjector(ctx context.Context, connector connector.Connector, userI
injector.forwardWG.Add(1)

go func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

labels := pprof.Labels("go", "forward()", "UserID", userID)
pprof.Do(ctx, labels, func(_ context.Context) {
injector.forward(connector.GetUpdates())
pprof.Do(ctx, labels, func(ctx context.Context) {
injector.forward(ctx, connector.GetUpdates())
})
}()

Expand All @@ -51,7 +54,7 @@ func (u *updateInjector) Close(ctx context.Context) error {
}

// forward pulls updates off the stream and forwards them to the outgoing update channel.
func (u *updateInjector) forward(updateCh <-chan imap.Update) {
func (u *updateInjector) forward(ctx context.Context, updateCh <-chan imap.Update) {
defer func() {
close(u.updatesCh)
u.forwardWG.Done()
Expand All @@ -64,7 +67,7 @@ func (u *updateInjector) forward(updateCh <-chan imap.Update) {
return
}

u.send(update)
u.send(ctx, update)

case <-u.forwardQuitCh:
return
Expand All @@ -73,15 +76,14 @@ func (u *updateInjector) forward(updateCh <-chan imap.Update) {
}

// send the update on the updates channel, optionally blocking until it has been processed.
func (u *updateInjector) send(update imap.Update, withBlock ...bool) {
func (u *updateInjector) send(ctx context.Context, update imap.Update, withBlock ...bool) {
select {
case u.updatesCh <- update:

case <-u.forwardQuitCh:
return
}

if len(withBlock) > 0 && withBlock[0] {
update.Wait()
case u.updatesCh <- update:
if len(withBlock) > 0 && withBlock[0] {
update.WaitContext(ctx)
}
}
}

0 comments on commit 4d561c1

Please sign in to comment.