Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(blooms): Implement retry in builder #13306

Merged
merged 8 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,19 @@ bloom_build:
# CLI flag: -bloom-build.builder.planner-address
[planner_address: <string> | default = ""]

backoff_config:
# Minimum backoff time
# CLI flag: -bloom-build.builder.backoff.min-backoff
[min_period: <duration> | default = 1s]

# Maximum backoff time
# CLI flag: -bloom-build.builder.backoff.max-backoff
[max_period: <duration> | default = 10s]

# Maximum number of times to retry an operation
# CLI flag: -bloom-build.builder.backoff.max-retries
[max_retries: <int> | default = 5]

# Experimental: The bloom_gateway block configures the Loki bloom gateway
# server, responsible for serving queries for filtering chunks based on filter
# expressions.
Expand Down
50 changes: 43 additions & 7 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
Expand Down Expand Up @@ -110,12 +111,33 @@ func (b *Builder) stopping(_ error) error {
}

func (b *Builder) running(ctx context.Context) error {
// Retry if the connection to the planner is lost.
retries := backoff.New(context.Background(), b.cfg.BackoffConfig)
chaudum marked this conversation as resolved.
Show resolved Hide resolved
for retries.Ongoing() {
err := b.connectAndBuild(ctx)
if err == nil {
break
}

level.Error(b.logger).Log("msg", "failed to connect and build. Retrying", "err", err)
retries.Wait()
}

if err := retries.Err(); err != nil {
return fmt.Errorf("failed to connect and build: %w", err)
}

return nil
}

func (b *Builder) connectAndBuild(
ctx context.Context,
) error {
opts, err := b.cfg.GrpcConfig.DialOption(nil, nil)
if err != nil {
return fmt.Errorf("failed to create grpc dial options: %w", err)
}

// TODO: Wrap hereafter in retry logic
conn, err := grpc.DialContext(ctx, b.cfg.PlannerAddress, opts...)
if err != nil {
return fmt.Errorf("failed to dial bloom planner: %w", err)
Expand Down Expand Up @@ -162,14 +184,16 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro
return fmt.Errorf("failed to receive task from planner: %w", err)
}

logger := log.With(b.logger, "task", protoTask.Task.Id)

b.metrics.taskStarted.Inc()
start := time.Now()
status := statusSuccess

newMetas, err := b.processTask(c.Context(), protoTask.Task)
if err != nil {
status = statusFailure
level.Error(b.logger).Log("msg", "failed to process task", "err", err)
level.Error(logger).Log("msg", "failed to process task", "err", err)
}

b.metrics.taskCompleted.WithLabelValues(status).Inc()
Expand Down Expand Up @@ -197,13 +221,25 @@ func (b *Builder) notifyTaskCompletedToPlanner(
CreatedMetas: metas,
}

// TODO: Implement retry
if err := c.Send(&protos.BuilderToPlanner{
BuilderID: b.ID,
Result: *result.ToProtoTaskResult(),
}); err != nil {
// We have a retry mechanism upper in the stack, but we add another one here
// to try our best to avoid losing the task result.
retries := backoff.New(context.Background(), b.cfg.BackoffConfig)
for retries.Ongoing() {
if err := c.Send(&protos.BuilderToPlanner{
BuilderID: b.ID,
Result: *result.ToProtoTaskResult(),
}); err == nil {
break
}

level.Error(b.logger).Log("msg", "failed to acknowledge task completion to planner. Retrying", "err", err)
retries.Wait()
}

if err := retries.Err(); err != nil {
return fmt.Errorf("failed to acknowledge task completion to planner: %w", err)
}

return nil
}

Expand Down
91 changes: 69 additions & 22 deletions pkg/bloombuild/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"google.golang.org/grpc"

"github.com/grafana/loki/v3/pkg/bloombuild/protos"
Expand All @@ -26,6 +26,7 @@ import (

func Test_BuilderLoop(t *testing.T) {
logger := log.NewNopLogger()
//logger := log.NewLogfmtLogger(os.Stdout)
chaudum marked this conversation as resolved.
Show resolved Hide resolved

schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{
Expand Down Expand Up @@ -69,9 +70,17 @@ func Test_BuilderLoop(t *testing.T) {
server, err := newFakePlannerServer(tasks)
require.NoError(t, err)

// Start the server so the builder can connect and receive tasks.
server.Start()

limits := fakeLimits{}
cfg := Config{
PlannerAddress: server.Addr(),
BackoffConfig: backoff.Config{
MinBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Second,
MaxRetries: 5,
},
}
flagext.DefaultValues(&cfg.GrpcConfig)

Expand All @@ -87,10 +96,28 @@ func Test_BuilderLoop(t *testing.T) {
err = services.StartAndAwaitRunning(context.Background(), builder)
require.NoError(t, err)

// Wait for at least one task to be processed.
require.Eventually(t, func() bool {
return int(server.completedTasks.Load()) == len(tasks)
return server.CompletedTasks() > 0
}, 5*time.Second, 100*time.Millisecond)

// Right after stop it so connection is broken, and builder will retry.
server.Stop()

// While the server is stopped, the builder should keep retrying to connect but no tasks should be processed.
// Note this is just a way to sleep while making sure no tasks are processed.
tasksProcessedSoFar := server.CompletedTasks()
require.Never(t, func() bool {
return server.CompletedTasks() > tasksProcessedSoFar
}, 5*time.Second, 500*time.Millisecond)

// Now we start the server so the builder can connect and receive tasks.
server.Start()

require.Eventually(t, func() bool {
return server.CompletedTasks() >= len(tasks)
}, 10*time.Second, 500*time.Millisecond)

err = services.StopAndAwaitTerminated(context.Background(), builder)
require.NoError(t, err)

Expand All @@ -99,41 +126,56 @@ func Test_BuilderLoop(t *testing.T) {

type fakePlannerServer struct {
tasks []*protos.ProtoTask
completedTasks atomic.Int64
completedTasks int
chaudum marked this conversation as resolved.
Show resolved Hide resolved
shutdownCalled bool

addr string
lisAddr string
chaudum marked this conversation as resolved.
Show resolved Hide resolved
grpcServer *grpc.Server
}

func newFakePlannerServer(tasks []*protos.ProtoTask) (*fakePlannerServer, error) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, err
}

server := &fakePlannerServer{
tasks: tasks,
addr: lis.Addr().String(),
grpcServer: grpc.NewServer(),
tasks: tasks,
}

protos.RegisterPlannerForBuilderServer(server.grpcServer, server)
go func() {
if err := server.grpcServer.Serve(lis); err != nil {
panic(err)
}
}()

return server, nil
}

func (f *fakePlannerServer) Addr() string {
return f.addr
if f.lisAddr == "" {
panic("server not started")
}
return f.lisAddr
}

func (f *fakePlannerServer) Stop() {
f.grpcServer.Stop()
if f.grpcServer != nil {
f.grpcServer.Stop()
}
}

func (f *fakePlannerServer) Start() {
f.Stop()

lisAddr := "localhost:0"
if f.lisAddr != "" {
// Reuse the same address if the server was stopped and started again.
lisAddr = f.lisAddr
}

lis, err := net.Listen("tcp", lisAddr)
if err != nil {
panic(err)
}
f.lisAddr = lis.Addr().String()

f.grpcServer = grpc.NewServer()
protos.RegisterPlannerForBuilderServer(f.grpcServer, f)
go func() {
if err := f.grpcServer.Serve(lis); err != nil {
panic(err)
}
}()
}

func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoopServer) error {
Expand All @@ -149,14 +191,19 @@ func (f *fakePlannerServer) BuilderLoop(srv protos.PlannerForBuilder_BuilderLoop
if _, err := srv.Recv(); err != nil {
return fmt.Errorf("failed to receive task response: %w", err)
}
f.completedTasks.Add(1)
f.completedTasks++
time.Sleep(10 * time.Millisecond) // Simulate task processing time to add some latency.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the sleep be before the task is counted as completed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarilly but I changed that.

}

// No more tasks. Wait until shutdown.
<-srv.Context().Done()
return nil
}

func (f *fakePlannerServer) CompletedTasks() int {
return f.completedTasks
}

func (f *fakePlannerServer) NotifyBuilderShutdown(_ context.Context, _ *protos.NotifyBuilderShutdownRequest) (*protos.NotifyBuilderShutdownResponse, error) {
f.shutdownCalled = true
return &protos.NotifyBuilderShutdownResponse{}, nil
Expand Down
7 changes: 7 additions & 0 deletions pkg/bloombuild/builder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,27 @@ package builder
import (
"flag"
"fmt"
"time"

"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/grpcclient"
)

// Config configures the bloom-builder component.
type Config struct {
GrpcConfig grpcclient.Config `yaml:"grpc_config"`
PlannerAddress string `yaml:"planner_address"`
BackoffConfig backoff.Config `yaml:"backoff_config"`
}

// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.PlannerAddress, prefix+".planner-address", "", "Hostname (and port) of the bloom planner")
cfg.GrpcConfig.RegisterFlagsWithPrefix(prefix+".grpc", f)

f.DurationVar(&cfg.BackoffConfig.MinBackoff, prefix+".backoff.min-backoff", 1*time.Second, "Minimum backoff time")
f.DurationVar(&cfg.BackoffConfig.MaxBackoff, prefix+".backoff.max-backoff", 10*time.Second, "Maximum backoff time")
f.IntVar(&cfg.BackoffConfig.MaxRetries, prefix+".backoff.max-retries", 5, "Maximum number of times to retry an operation")
chaudum marked this conversation as resolved.
Show resolved Hide resolved
}

func (cfg *Config) Validate() error {
Expand Down
Loading