Skip to content

Commit

Permalink
chore: sync tables without acquiring read lock the whole time (#14179)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashwanthgoli authored Sep 27, 2024
1 parent 17c472d commit a584fb7
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1894,6 +1894,7 @@ func TestStore_BoltdbTsdbSameIndexPrefix(t *testing.T) {

// recreate the store because boltdb-shipper now runs queriers on snapshots which are created every 1 min and during startup.
store.Stop()
ResetBoltDBIndexClientsWithShipper()

// there should be 2 index tables in the object storage
indexTables, err := os.ReadDir(filepath.Join(cfg.FSConfig.Directory, "index"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ func (t *indexSet) cleanupDB(fileName string) error {
}

func (t *indexSet) Sync(ctx context.Context) (err error) {
if !t.indexMtx.isReady() {
level.Info(t.logger).Log("msg", "skip sync since the index set is not ready")
return nil
}

return t.syncWithRetry(ctx, true, false)
}

Expand Down
18 changes: 16 additions & 2 deletions pkg/storage/stores/shipper/indexshipper/downloads/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/pkg/errors"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
Expand Down Expand Up @@ -271,9 +272,22 @@ func (t *table) Sync(ctx context.Context) error {
level.Debug(t.logger).Log("msg", fmt.Sprintf("syncing files for table %s", t.name))

t.indexSetsMtx.RLock()
defer t.indexSetsMtx.RUnlock()
users := maps.Keys(t.indexSets)
t.indexSetsMtx.RUnlock()

for _, userID := range users {
if err := ctx.Err(); err != nil {
return err
}

t.indexSetsMtx.RLock()
indexSet, ok := t.indexSets[userID]
t.indexSetsMtx.RUnlock()

if !ok {
continue
}

for userID, indexSet := range t.indexSets {
if err := indexSet.Sync(ctx); err != nil {
return errors.Wrap(err, fmt.Sprintf("failed to sync index set %s for table %s", userID, t.name))
}
Expand Down
25 changes: 22 additions & 3 deletions pkg/storage/stores/shipper/indexshipper/downloads/table_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"golang.org/x/exp/maps"

"github.com/grafana/loki/v3/pkg/compactor/deletion"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
Expand Down Expand Up @@ -180,6 +181,10 @@ func (tm *tableManager) ForEach(ctx context.Context, tableName, userID string, c
}

func (tm *tableManager) getOrCreateTable(tableName string) (Table, error) {
if tm.ctx.Err() != nil {
return nil, errors.New("table manager is stopping")
}

// if table is already there, use it.
start := time.Now()
tm.tablesMtx.RLock()
Expand Down Expand Up @@ -214,7 +219,8 @@ func (tm *tableManager) getOrCreateTable(tableName string) (Table, error) {

func (tm *tableManager) syncTables(ctx context.Context) error {
tm.tablesMtx.RLock()
defer tm.tablesMtx.RUnlock()
tables := maps.Keys(tm.tables)
tm.tablesMtx.RUnlock()

start := time.Now()
var err error
Expand All @@ -231,11 +237,24 @@ func (tm *tableManager) syncTables(ctx context.Context) error {

level.Info(tm.logger).Log("msg", "syncing tables")

for name, table := range tm.tables {
for _, name := range tables {
if err := ctx.Err(); err != nil {
return err
}

level.Debug(tm.logger).Log("msg", "syncing table", "table", name)
start := time.Now()

tm.tablesMtx.RLock()
table, ok := tm.tables[name]
tm.tablesMtx.RUnlock()

if !ok {
continue
}

err := table.Sync(ctx)
duration := float64(time.Since(start))
duration := time.Since(start).Seconds()
if err != nil {
tm.metrics.tableSyncLatency.WithLabelValues(name, statusFailure).Observe(duration)
return errors.Wrapf(err, "failed to sync table '%s'", name)
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/stores/shipper/indexshipper/downloads/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ func (m *mtxWithReadiness) markReady() {
close(m.ready)
}

func (m *mtxWithReadiness) isReady() bool {
select {
case <-m.ready:
return true
default:
return false
}
}

func (m *mtxWithReadiness) awaitReady(ctx context.Context) error {
ctx, cancel := context.WithTimeoutCause(ctx, 30*time.Second, errors.New("exceeded 30 seconds in awaitReady"))
defer cancel()
Expand Down

0 comments on commit a584fb7

Please sign in to comment.