Skip to content

Commit

Permalink
Drop old events after some time
Browse files Browse the repository at this point in the history
The matter of the fact is that some events are stuck in the queue and we
just can't post tweets for them. As discussed earlier this may be
because someone revoked permissions for our app or somehow ran out of
their personal API limit - the exact reason is unclear and doesn't
really matter.

This commit removes events which are stuck in the queue after some time.

The reasoning for this is twofold. Firstly the likelihood of actually
successfuly posting a tweet which is stuck is very low. As far as I can
tell they are just stuck. Secondly posting tweets for notes which are
old can be confusing and I'd personally be surprised if I suddently had
tweets for notes that are weeks old posted in my profile.

This rollout has to happen in two stages. First we start emitting tweet
created events containing new data (nostr events). After a week passes
we drop all tweet created events that don't have nostr events in them
and change the code to no longer support tweet created events with
missing nostr events.
  • Loading branch information
boreq committed Nov 17, 2023
1 parent 10bb7b4 commit ca1a0f8
Show file tree
Hide file tree
Showing 24 changed files with 800 additions and 22 deletions.
40 changes: 40 additions & 0 deletions cmd/crossposting-service/di/inject_adapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/google/wire"
"github.com/planetary-social/nos-crossposting-service/internal/logging"
"github.com/planetary-social/nos-crossposting-service/service/adapters"
"github.com/planetary-social/nos-crossposting-service/service/adapters/mocks"
"github.com/planetary-social/nos-crossposting-service/service/adapters/prometheus"
"github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite"
"github.com/planetary-social/nos-crossposting-service/service/adapters/twitter"
Expand Down Expand Up @@ -73,6 +74,45 @@ var adaptersSet = wire.NewSet(

adapters.NewTwitterAccountDetailsCache,
wire.Bind(new(app.TwitterAccountDetailsCache), new(*adapters.TwitterAccountDetailsCache)),

adapters.NewCurrentTimeProvider,
wire.Bind(new(app.CurrentTimeProvider), new(*adapters.CurrentTimeProvider)),
)

var testAdaptersSet = wire.NewSet(
prometheus.NewPrometheus,
wire.Bind(new(app.Metrics), new(*prometheus.Prometheus)),

mocks.NewTwitter,
wire.Bind(new(app.Twitter), new(*mocks.Twitter)),

mocks.NewCurrentTimeProvider,
wire.Bind(new(app.CurrentTimeProvider), new(*mocks.CurrentTimeProvider)),
)

var mockTxAdaptersSet = wire.NewSet(
mocks.NewTransactionProvider,
wire.Bind(new(app.TransactionProvider), new(*mocks.TransactionProvider)),

wire.Struct(new(app.Adapters), "*"),

mocks.NewAccountRepository,
wire.Bind(new(app.AccountRepository), new(*mocks.AccountRepository)),

mocks.NewSessionRepository,
wire.Bind(new(app.SessionRepository), new(*mocks.SessionRepository)),

mocks.NewPublicKeyRepository,
wire.Bind(new(app.PublicKeyRepository), new(*mocks.PublicKeyRepository)),

mocks.NewProcessedEventRepository,
wire.Bind(new(app.ProcessedEventRepository), new(*mocks.ProcessedEventRepository)),

mocks.NewUserTokensRepository,
wire.Bind(new(app.UserTokensRepository), new(*mocks.UserTokensRepository)),

mocks.NewPublisher,
wire.Bind(new(app.Publisher), new(*mocks.Publisher)),
)

func newAdaptersFactoryFn(deps buildTransactionSqliteAdaptersDependencies) sqlite.AdaptersFactoryFn {
Expand Down
1 change: 1 addition & 0 deletions cmd/crossposting-service/di/inject_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var sqlitePubsubSet = wire.NewSet(

sqlite.NewSubscriber,
wire.Bind(new(app.Subscriber), new(*sqlite.Subscriber)),
wire.Bind(new(sqlitepubsubport.SqliteSubscriber), new(*sqlite.Subscriber)),
)

var sqliteTxPubsubSet = wire.NewSet(
Expand Down
22 changes: 22 additions & 0 deletions cmd/crossposting-service/di/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/google/wire"
"github.com/planetary-social/nos-crossposting-service/internal/fixtures"
"github.com/planetary-social/nos-crossposting-service/internal/logging"
"github.com/planetary-social/nos-crossposting-service/service/adapters/mocks"
"github.com/planetary-social/nos-crossposting-service/service/adapters/sqlite"
"github.com/planetary-social/nos-crossposting-service/service/app"
"github.com/planetary-social/nos-crossposting-service/service/config"
Expand Down Expand Up @@ -49,6 +50,27 @@ func BuildTestAdapters(context.Context, testing.TB) (sqlite.TestedItems, func(),
return sqlite.TestedItems{}, nil, nil
}

type TestApplication struct {
SendTweetHandler *app.SendTweetHandler

CurrentTimeProvider *mocks.CurrentTimeProvider
UserTokensRepository *mocks.UserTokensRepository
Twitter *mocks.Twitter
}

func BuildTestApplication(tb testing.TB) (TestApplication, error) {
wire.Build(
wire.Struct(new(TestApplication), "*"),

applicationSet,
testAdaptersSet,
mockTxAdaptersSet,

fixtures.TestLogger,
)
return TestApplication{}, nil
}

func newTestAdaptersConfig(tb testing.TB) (config.Config, error) {
return config.NewConfig(
fixtures.SomeString(),
Expand Down
62 changes: 60 additions & 2 deletions cmd/crossposting-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 47 additions & 0 deletions internal/fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ import (
"math/rand"
"os"
"testing"
"time"

"github.com/nbd-wtf/go-nostr"
"github.com/planetary-social/nos-crossposting-service/internal"
"github.com/planetary-social/nos-crossposting-service/internal/logging"
"github.com/planetary-social/nos-crossposting-service/service/domain"
"github.com/planetary-social/nos-crossposting-service/service/domain/accounts"
"github.com/planetary-social/nos-crossposting-service/service/domain/sessions"
Expand Down Expand Up @@ -130,6 +132,51 @@ func SomeError() error {
return fmt.Errorf("some error: %d", rand.Int())
}

func SomeTwitterUserAccessToken() accounts.TwitterUserAccessToken {
v, err := accounts.NewTwitterUserAccessToken(SomeString())
if err != nil {
panic(err)
}
return v
}

func SomeTwitterUserAccessSecret() accounts.TwitterUserAccessSecret {
v, err := accounts.NewTwitterUserAccessSecret(SomeString())
if err != nil {
panic(err)
}
return v
}

func SomeEventWithCreatedAt(createdAt time.Time) domain.Event {
_, sk := SomeKeyPair()

libevent := nostr.Event{
CreatedAt: nostr.Timestamp(createdAt.Unix()),
Kind: domain.EventKindNote.Int(),
Content: SomeString(),
}
err := libevent.Sign(sk)
if err != nil {
panic(err)
}

event, err := domain.NewEvent(libevent)
if err != nil {
panic(err)
}

return event
}

func SomeEvent() domain.Event {
return SomeEventWithCreatedAt(time.Now())
}

func TestLogger(tb testing.TB) logging.Logger {
return logging.NewSystemLogger(logging.NewTestingLoggingSystem(tb), "test")
}

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randSeq(n int) string {
Expand Down
14 changes: 14 additions & 0 deletions service/adapters/current_time_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package adapters

import "time"

type CurrentTimeProvider struct {
}

func NewCurrentTimeProvider() *CurrentTimeProvider {
return &CurrentTimeProvider{}
}

func (c CurrentTimeProvider) GetCurrentTime() time.Time {
return time.Now()
}
29 changes: 29 additions & 0 deletions service/adapters/mocks/account_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package mocks

import (
"github.com/boreq/errors"
"github.com/planetary-social/nos-crossposting-service/service/domain/accounts"
)

type AccountRepository struct {
}

func NewAccountRepository() (*AccountRepository, error) {
return &AccountRepository{}, nil
}

func (m *AccountRepository) GetByTwitterID(twitterID accounts.TwitterID) (*accounts.Account, error) {
return nil, errors.New("not implemented")
}

func (m *AccountRepository) GetByAccountID(accountID accounts.AccountID) (*accounts.Account, error) {
return nil, errors.New("not implemented")
}

func (m *AccountRepository) Save(account *accounts.Account) error {
return errors.New("not implemented")
}

func (m *AccountRepository) Count() (int, error) {
return 0, errors.New("not implemented")
}
19 changes: 19 additions & 0 deletions service/adapters/mocks/current_time_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package mocks

import "time"

type CurrentTimeProvider struct {
CurrentTime time.Time
}

func NewCurrentTimeProvider() *CurrentTimeProvider {
return &CurrentTimeProvider{}
}

func (c *CurrentTimeProvider) GetCurrentTime() time.Time {
return c.CurrentTime
}

func (c *CurrentTimeProvider) SetCurrentTime(currentTime time.Time) {
c.CurrentTime = currentTime
}
22 changes: 22 additions & 0 deletions service/adapters/mocks/processed_event_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package mocks

import (
"github.com/boreq/errors"
"github.com/planetary-social/nos-crossposting-service/service/domain"
"github.com/planetary-social/nos-crossposting-service/service/domain/accounts"
)

type ProcessedEventRepository struct {
}

func NewProcessedEventRepository() (*ProcessedEventRepository, error) {
return &ProcessedEventRepository{}, nil
}

func (m *ProcessedEventRepository) Save(eventID domain.EventId, twitterID accounts.TwitterID) error {
return errors.New("not implemented")
}

func (m *ProcessedEventRepository) WasProcessed(eventID domain.EventId, twitterID accounts.TwitterID) (bool, error) {
return false, errors.New("not implemented")
}
38 changes: 38 additions & 0 deletions service/adapters/mocks/public_key_repository.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package mocks

import (
"github.com/boreq/errors"
"github.com/planetary-social/nos-crossposting-service/service/domain"
"github.com/planetary-social/nos-crossposting-service/service/domain/accounts"
)

type PublicKeyRepository struct {
}

func NewPublicKeyRepository() (*PublicKeyRepository, error) {
return &PublicKeyRepository{}, nil
}

func (m *PublicKeyRepository) Save(linkedPublicKey *domain.LinkedPublicKey) error {
return errors.New("not implemented")
}

func (m *PublicKeyRepository) Delete(accountID accounts.AccountID, publicKey domain.PublicKey) error {
return errors.New("not implemented")
}

func (m *PublicKeyRepository) List() ([]*domain.LinkedPublicKey, error) {
return nil, errors.New("not implemented")
}

func (m *PublicKeyRepository) ListByPublicKey(publicKey domain.PublicKey) ([]*domain.LinkedPublicKey, error) {
return nil, errors.New("not implemented")
}

func (m *PublicKeyRepository) ListByAccountID(accountID accounts.AccountID) ([]*domain.LinkedPublicKey, error) {
return nil, errors.New("not implemented")
}

func (m *PublicKeyRepository) Count() (int, error) {
return 0, errors.New("not implemented")
}
16 changes: 16 additions & 0 deletions service/adapters/mocks/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package mocks

import (
"github.com/planetary-social/nos-crossposting-service/service/app"
)

type Publisher struct {
}

func NewPublisher() *Publisher {
return &Publisher{}
}

func (p *Publisher) PublishTweetCreated(event app.TweetCreatedEvent) error {
return nil
}
Loading

0 comments on commit ca1a0f8

Please sign in to comment.