Skip to content

Commit

Permalink
feat: Implement owned streams calculation using Partition Ring (#14282)
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive authored Sep 26, 2024
1 parent 923a3e4 commit 3c36ba9
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 74 deletions.
22 changes: 11 additions & 11 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestIngesterWAL(t *testing.T) {

readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestIngesterWAL(t *testing.T) {
expectCheckpoint(t, walDir, false, time.Second)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand All @@ -130,7 +130,7 @@ func TestIngesterWAL(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand All @@ -155,7 +155,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {

readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand Down Expand Up @@ -201,7 +201,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
require.NoError(t, err)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand Down Expand Up @@ -260,7 +260,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {

readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand All @@ -281,7 +281,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
expectCheckpoint(t, walDir, false, time.Second)

// restart the ingester, ensuring we replayed from WAL.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand All @@ -304,7 +304,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {

readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand All @@ -325,7 +325,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// restart the ingester, ensuring we can replay from the checkpoint as well.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand Down Expand Up @@ -602,7 +602,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {

readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand Down Expand Up @@ -674,7 +674,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
require.NoError(t, err)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore,
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokitlog.NewNopLogger(), nil, readRingMock)
ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokitlog.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))

Expand Down
12 changes: 9 additions & 3 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,15 +292,15 @@ type Ingester struct {

// recalculateOwnedStreams periodically checks the ring for changes and recalculates owned streams for each instance.
readRing ring.ReadRing
recalculateOwnedStreams *recalculateOwnedStreams
recalculateOwnedStreams *recalculateOwnedStreamsSvc

ingestPartitionID int32
partitionRingLifecycler *ring.PartitionInstanceLifecycler
partitionReader *partition.Reader
}

// New makes a new Ingester.
func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing) (*Ingester, error) {
func New(cfg Config, clientConfig client.Config, store Store, limits Limits, configs *runtime.TenantConfigs, registerer prometheus.Registerer, writeFailuresCfg writefailures.Cfg, metricsNamespace string, logger log.Logger, customStreamsTracker push.UsageTracker, readRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher) (*Ingester, error) {
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}
Expand Down Expand Up @@ -408,7 +408,13 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
i.SetExtractorWrapper(i.cfg.SampleExtractorWrapper)
}

i.recalculateOwnedStreams = newRecalculateOwnedStreams(i.getInstances, i.lifecycler.ID, i.readRing, cfg.OwnedStreamsCheckInterval, util_log.Logger)
var ownedStreamsStrategy ownershipStrategy
if i.cfg.KafkaIngestion.Enabled {
ownedStreamsStrategy = newOwnedStreamsPartitionStrategy(i.ingestPartitionID, partitionRingWatcher, util_log.Logger)
} else {
ownedStreamsStrategy = newOwnedStreamsIngesterStrategy(i.lifecycler.ID, i.readRing, util_log.Logger)
}
i.recalculateOwnedStreams = newRecalculateOwnedStreamsSvc(i.getInstances, ownedStreamsStrategy, cfg.OwnedStreamsCheckInterval, util_log.Logger)

return i, nil
}
Expand Down
26 changes: 13 additions & 13 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestPrepareShutdownMarkerPathNotSet(t *testing.T) {

mockRing := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, mockRing)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, mockRing, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

Expand All @@ -88,7 +88,7 @@ func TestPrepareShutdown(t *testing.T) {

readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

Expand Down Expand Up @@ -151,7 +151,7 @@ func TestIngester_GetStreamRates_Correctness(t *testing.T) {

readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

Expand Down Expand Up @@ -184,7 +184,7 @@ func BenchmarkGetStreamRatesAllocs(b *testing.B) {
}
readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(b, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

Expand All @@ -210,7 +210,7 @@ func TestIngester(t *testing.T) {

readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

Expand Down Expand Up @@ -397,7 +397,7 @@ func TestIngesterStreamLimitExceeded(t *testing.T) {

readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, store, overrides, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, store, overrides, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

Expand Down Expand Up @@ -819,7 +819,7 @@ func Test_InMemoryLabels(t *testing.T) {

readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

Expand Down Expand Up @@ -874,7 +874,7 @@ func TestIngester_GetDetectedLabels(t *testing.T) {
}
readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

Expand Down Expand Up @@ -938,7 +938,7 @@ func TestIngester_GetDetectedLabelsWithQuery(t *testing.T) {
}
readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

Expand Down Expand Up @@ -1306,7 +1306,7 @@ func TestStats(t *testing.T) {
require.NoError(t, err)
readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)

i.instances["test"] = defaultInstance(t)
Expand Down Expand Up @@ -1334,7 +1334,7 @@ func TestVolume(t *testing.T) {
require.NoError(t, err)
readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)

i.instances["test"] = defaultInstance(t)
Expand Down Expand Up @@ -1414,7 +1414,7 @@ func createIngesterServer(t *testing.T, ingesterConfig Config) (ingesterClient,
require.NoError(t, err)
readRingMock := mockReadRingWithOneActiveIngester()

ing, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
ing, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)

listener := bufconn.Listen(1024 * 1024)
Expand Down Expand Up @@ -1631,7 +1631,7 @@ func TestUpdateOwnedStreams(t *testing.T) {
require.NoError(t, err)
readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock)
i, err := New(ingesterConfig, client.Config{}, &mockStore{}, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, readRingMock, nil)
require.NoError(t, err)

i.instances["test"] = defaultInstance(t)
Expand Down
Loading

0 comments on commit 3c36ba9

Please sign in to comment.