sdk: optimize caching lists (so we don't fetch twice in a row).
fiatjaf committed Sep 26, 2024
1 parent 1b786ab commit 2edc0fb
Showing 5 changed files with 74 additions and 67 deletions.
42 changes: 28 additions & 14 deletions sdk/helpers.go
@@ -1,21 +1,35 @@
 package sdk
 
-import (
-    "strings"
-)
+import "time"
 
-// IsVirtualRelay returns true if the given normalized relay URL shouldn't be considered for outbox-model calculations.
-func IsVirtualRelay(url string) bool {
-    if len(url) < 6 {
-        // this is just invalid
-        return true
-    }
+var serial = 0
+
+func pickNext(list []string) string {
+    serial++
+    return list[serial%len(list)]
+}
 
-    if strings.HasPrefix(url, "wss://feeds.nostr.band") ||
-        strings.HasPrefix(url, "wss://filter.nostr.wine") ||
-        strings.HasPrefix(url, "wss://cache") {
-        return true
+func doThisNotMoreThanOnceAnHour(key string) (doItNow bool) {
+    if _dtnmtoah == nil {
+        go func() {
+            _dtnmtoah = make(map[string]time.Time)
+            for {
+                time.Sleep(time.Minute * 10)
+                _dtnmtoahLock.Lock()
+                now := time.Now()
+                for k, v := range _dtnmtoah {
+                    if v.Before(now) {
+                        delete(_dtnmtoah, k)
+                    }
+                }
+                _dtnmtoahLock.Unlock()
+            }
+        }()
     }
 
-    return false
+    _dtnmtoahLock.Lock()
+    defer _dtnmtoahLock.Unlock()
+
+    _, exists := _dtnmtoah[key]
+    return !exists
 }
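The pickNext helper added above is a tiny round-robin selector: a package-level counter advances on every call and indexes into whatever list is passed in. Below is a minimal standalone sketch of the same pattern; the relays slice and main wrapper are illustrative, not part of the commit, and, as in the original, the bare counter is not safe for concurrent use.

package main

import "fmt"

var serial = 0

// pickNext cycles through the given list, advancing a shared counter on each call.
func pickNext(list []string) string {
    serial++
    return list[serial%len(list)]
}

func main() {
    relays := []string{"wss://a.example", "wss://b.example", "wss://c.example"}
    for i := 0; i < 4; i++ {
        // prints b, c, a, b: the counter is incremented before indexing,
        // so the first call lands on index 1
        fmt.Println(pickNext(relays))
    }
}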
43 changes: 29 additions & 14 deletions sdk/list.go
@@ -3,6 +3,7 @@ package sdk
 import (
     "context"
     "slices"
+    "sync"
     "time"
 
     "github.com/nbd-wtf/go-nostr"
@@ -20,6 +21,11 @@ type TagItemWithValue interface {
     Value() string
 }
 
+var (
+    genericListMutexes = [24]sync.Mutex{}
+    valueWasJustCached = [24]bool{}
+)
+
 func fetchGenericList[I TagItemWithValue](
     sys *System,
     ctx context.Context,
@@ -29,36 +35,45 @@ func fetchGenericList[I TagItemWithValue](
     cache cache.Cache32[GenericList[I]],
     skipFetch bool,
 ) (fl GenericList[I], fromInternal bool) {
-    if cache != nil {
-        if v, ok := cache.Get(pubkey); ok {
-            return v, true
-        }
+    // we have 24 mutexes, so we can load up to 24 lists at the same time, but if we do the same exact
+    // call that will do it only once, the subsequent ones will wait for a result to be cached
+    // and then return it from cache -- 13 is an arbitrary index for the pubkey
+    lockIdx := (int(pubkey[13]) + kind) % 24
+    genericListMutexes[lockIdx].Lock()
+
+    if valueWasJustCached[lockIdx] {
+        // this ensures the cache has had time to commit the values
+        // so we don't repeat a fetch immediately after the other
+        valueWasJustCached[lockIdx] = false
+        time.Sleep(time.Millisecond * 10)
+    }
+
+    defer genericListMutexes[lockIdx].Unlock()
+
+    if v, ok := cache.Get(pubkey); ok {
+        return v, true
     }
 
+    v := GenericList[I]{PubKey: pubkey}
+
     events, _ := sys.StoreRelay.QuerySync(ctx, nostr.Filter{Kinds: []int{kind}, Authors: []string{pubkey}})
     if len(events) != 0 {
         items := parseItemsFromEventTags(events[0], parseTag)
-        v := GenericList[I]{
-            PubKey: pubkey,
-            Event:  events[0],
-            Items:  items,
-        }
-        cache.SetWithTTL(pubkey, v, time.Hour*6)
+        v.Event = events[0]
+        v.Items = items
+        valueWasJustCached[lockIdx] = true
         return v, true
     }
 
-    v := GenericList[I]{PubKey: pubkey}
     if !skipFetch {
         thunk := sys.replaceableLoaders[kind].Load(ctx, pubkey)
         evt, err := thunk()
         if err == nil {
             items := parseItemsFromEventTags(evt, parseTag)
             v.Items = items
-            if cache != nil {
-                cache.SetWithTTL(pubkey, v, time.Hour*6)
-            }
             sys.StoreRelay.Publish(ctx, *evt)
         }
+        valueWasJustCached[lockIdx] = true
     }
 
     return v, false
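The core of the change is the striped locking in fetchGenericList above: one byte of the hex pubkey plus the kind picks one of 24 mutexes, so identical concurrent requests serialize on the same stripe, and only the first one actually queries while the rest wait and then return the cached value. Below is a minimal self-contained sketch of just that striping idea; stripes, lockIndexFor, and the dummy pubkey are illustrative names, not code from the commit.

package main

import (
    "fmt"
    "sync"
)

var stripes [24]sync.Mutex

// lockIndexFor maps a (pubkey, kind) pair onto one of the 24 stripes;
// byte 13 of the hex pubkey is an arbitrary but stable choice.
func lockIndexFor(pubkey string, kind int) int {
    return (int(pubkey[13]) + kind) % 24
}

func main() {
    pubkey := "a1b2c3d4e5f6a7b8c9d0a1b2c3d4e5f6a7b8c9d0a1b2c3d4e5f6a7b8c9d0a1b2"
    idx := lockIndexFor(pubkey, 10002)

    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(n int) {
            defer wg.Done()
            stripes[idx].Lock()
            defer stripes[idx].Unlock()
            // only one goroutine is in here at a time; in fetchGenericList the
            // first caller does the query and later callers hit the cache instead
            fmt.Println("worker", n, "holds stripe", idx)
        }(i)
    }
    wg.Wait()
}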
7 changes: 2 additions & 5 deletions sdk/outbox.go
@@ -17,13 +17,10 @@ func (sys *System) FetchOutboxRelays(ctx context.Context, pubkey string, n int)
         return relays
     }
 
-    if rl, ok := sys.RelayListCache.Get(pubkey); !ok || (rl.Event != nil && rl.Event.CreatedAt < nostr.Now()-60*60*24*7) {
-        // try to fetch relays list again if we don't have one or if ours is a week old
-        fetchGenericList(sys, ctx, pubkey, 10002, parseRelayFromKind10002, sys.RelayListCache, false)
-    }
+    // if we have it cached that means we have at least tried to fetch recently and it won't be tried again
+    fetchGenericList(sys, ctx, pubkey, 10002, parseRelayFromKind10002, sys.RelayListCache, false)
 
     relays := sys.Hints.TopN(pubkey, 6)
 
     if len(relays) == 0 {
         return []string{"wss://relay.damus.io", "wss://nos.lol"}
     }
10 changes: 3 additions & 7 deletions sdk/replaceable_loader.go
@@ -22,11 +22,8 @@ func (sys *System) initializeDataloaders() {
 
 func (sys *System) createReplaceableDataloader(kind int) *dataloader.Loader[string, *nostr.Event] {
     return dataloader.NewBatchedLoader(
-        func(
-            ctx context.Context,
-            pubkeys []string,
-        ) []*dataloader.Result[*nostr.Event] {
-            return sys.batchLoadReplaceableEvents(ctx, kind, pubkeys)
+        func(_ context.Context, pubkeys []string) []*dataloader.Result[*nostr.Event] {
+            return sys.batchLoadReplaceableEvents(kind, pubkeys)
         },
         dataloader.WithBatchCapacity[string, *nostr.Event](60),
         dataloader.WithClearCacheOnBatch[string, *nostr.Event](),
@@ -35,7 +32,6 @@ func (sys *System) createReplaceableDataloader(kind int) *dataloader.Loader[string, *nostr.Event] {
 }
 
 func (sys *System) batchLoadReplaceableEvents(
-    ctx context.Context,
     kind int,
     pubkeys []string,
 ) []*dataloader.Result[*nostr.Event] {
@@ -67,7 +63,7 @@ func (sys *System) batchLoadReplaceableEvents(
         }
 
         // save attempts here so we don't try the same failed query over and over
-        if doItNow := DoThisNotMoreThanOnceAnHour("repl:" + strconv.Itoa(kind) + pubkey); !doItNow {
+        if doItNow := doThisNotMoreThanOnceAnHour("repl:" + strconv.Itoa(kind) + pubkey); !doItNow {
             results[i] = &dataloader.Result[*nostr.Event]{
                 Error: fmt.Errorf("last attempt failed, waiting more to try again"),
             }
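For orientation, the loader built above batches many Load calls into a single batchLoadReplaceableEvents query (up to 60 pubkeys per batch, per the WithBatchCapacity option), and each caller gets back a thunk that resolves to its own result; that is how sdk/list.go consumes it (thunk := sys.replaceableLoaders[kind].Load(ctx, pubkey); evt, err := thunk()). The following is a toy illustration of that batch-plus-thunk shape only, not the dataloader library's actual implementation; batcher, load, and the string results are made up for the example.

package main

import (
    "fmt"
    "sync"
)

type result struct {
    value string
    err   error
}

// batcher collects keys until the first thunk is forced, then runs one
// batch function for all of them, mimicking the dataloader pattern above.
type batcher struct {
    mu      sync.Mutex
    keys    []string
    results map[string]result
    once    sync.Once
    batchFn func(keys []string) map[string]result
}

func (b *batcher) load(key string) func() (string, error) {
    b.mu.Lock()
    b.keys = append(b.keys, key)
    b.mu.Unlock()
    return func() (string, error) {
        b.once.Do(func() {
            b.results = b.batchFn(b.keys) // one query serves every queued key
        })
        r := b.results[key]
        return r.value, r.err
    }
}

func main() {
    b := &batcher{batchFn: func(keys []string) map[string]result {
        fmt.Println("single batch query for", keys)
        out := make(map[string]result, len(keys))
        for _, k := range keys {
            out[k] = result{value: "event for " + k}
        }
        return out
    }}

    thunkA := b.load("pubkey-a")
    thunkB := b.load("pubkey-b")

    // both thunks resolve from the one batch run
    fmt.Println(thunkA())
    fmt.Println(thunkB())
}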
39 changes: 12 additions & 27 deletions sdk/utils.go
@@ -1,6 +1,7 @@
 package sdk
 
 import (
+    "strings"
     "sync"
     "time"
 )
@@ -10,34 +11,18 @@ var (
     _dtnmtoahLock sync.Mutex
 )
 
-func DoThisNotMoreThanOnceAnHour(key string) (doItNow bool) {
-    if _dtnmtoah == nil {
-        go func() {
-            _dtnmtoah = make(map[string]time.Time)
-            for {
-                time.Sleep(time.Minute * 10)
-                _dtnmtoahLock.Lock()
-                now := time.Now()
-                for k, v := range _dtnmtoah {
-                    if v.Before(now) {
-                        delete(_dtnmtoah, k)
-                    }
-                }
-                _dtnmtoahLock.Unlock()
-            }
-        }()
+// IsVirtualRelay returns true if the given normalized relay URL shouldn't be considered for outbox-model calculations.
+func IsVirtualRelay(url string) bool {
+    if len(url) < 6 {
+        // this is just invalid
+        return true
     }
 
-    _dtnmtoahLock.Lock()
-    defer _dtnmtoahLock.Unlock()
-
-    _, exists := _dtnmtoah[key]
-    return !exists
-}
-
-var serial = 0
+    if strings.HasPrefix(url, "wss://feeds.nostr.band") ||
+        strings.HasPrefix(url, "wss://filter.nostr.wine") ||
+        strings.HasPrefix(url, "wss://cache") {
+        return true
+    }
 
-func pickNext(list []string) string {
-    serial++
-    return list[serial%len(list)]
+    return false
 }
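IsVirtualRelay, now living in sdk/utils.go, screens out aggregator/cache style relays (and obviously invalid URLs) before outbox calculations. A quick self-contained check of its behavior follows; the function body is copied from the diff above and the example URLs are made up for the demo.

package main

import (
    "fmt"
    "strings"
)

// IsVirtualRelay returns true if the given normalized relay URL shouldn't be
// considered for outbox-model calculations (copied from sdk/utils.go above).
func IsVirtualRelay(url string) bool {
    if len(url) < 6 {
        // this is just invalid
        return true
    }

    if strings.HasPrefix(url, "wss://feeds.nostr.band") ||
        strings.HasPrefix(url, "wss://filter.nostr.wine") ||
        strings.HasPrefix(url, "wss://cache") {
        return true
    }

    return false
}

func main() {
    for _, url := range []string{
        "wss://relay.damus.io",    // regular relay: false
        "wss://cache.example.com", // matches the "wss://cache" prefix: true
        "bad",                     // shorter than any valid URL: true
    } {
        fmt.Println(url, IsVirtualRelay(url))
    }
}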
