
Add code to ease batching #103

srebhan opened this issue Sep 20, 2024 · 0 comments

Use Case

When writing data to InfluxDB continuously, it is preferable to send the metrics in batches for performance reasons. However, the API does not currently support automatic batching. It would be useful to help users create batches more easily.

Expected behavior

Provide a means to easily create batches.

Actual behavior

There is no means to automatically create batches.
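For context, without library support a user has to hand-roll batching along these lines. This is a reduced sketch: the `flush` function is a hypothetical stand-in for an actual client write call, and plain ints stand in for points.

```go
package main

import "fmt"

// flush is a hypothetical stand-in for a client write call.
func flush(batch []int) {
	fmt.Println("writing", len(batch), "points")
}

func main() {
	const batchSize = 5
	batch := make([]int, 0, batchSize)
	for i := 0; i < 12; i++ {
		batch = append(batch, i)
		// Flush whenever a full batch has accumulated.
		if len(batch) >= batchSize {
			flush(batch)
			batch = batch[:0]
		}
	}
	// Drain the remainder that did not fill a full batch.
	if len(batch) > 0 {
		flush(batch)
	}
}
```

Every caller has to repeat this accumulate/flush/drain boilerplate, which is exactly what a shared batcher would remove.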

Additional info

I envision the following:

package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/InfluxCommunity/influxdb3-go/influxdb3"
)

// Option to adapt properties of a batcher
type Option func(*Batcher)

// WithSize changes the batch-size emitted by the batcher
func WithSize(size int) Option {
	return func(b *Batcher) {
		b.size = size
	}
}

// WithCapacity changes the initial capacity of the points buffer
func WithCapacity(capacity int) Option {
	return func(b *Batcher) {
		b.capacity = capacity
	}
}

// WithReadyCallback sets the function called when a new batch is ready. The
// batcher will wait for the callback to finish, so please return as fast as
// possible and move long-running processing to a goroutine.
func WithReadyCallback(f func()) Option {
	return func(b *Batcher) {
		b.callbackReady = f
	}
}

// WithEmitCallback sets the function called with the batch of points when a
// new batch is ready. The batcher will wait for the callback to finish, so
// please return as fast as possible and move long-running processing to a
// goroutine.
func WithEmitCallback(f func([]*influxdb3.Point)) Option {
	return func(b *Batcher) {
		b.callbackEmit = f
	}
}

// DefaultBatchSize is the default number of points emitted
const DefaultBatchSize = 1000

// DefaultCapacity is the default initial capacity of the point buffer
const DefaultCapacity = 2 * DefaultBatchSize

// Batcher collects points and emits them as batches
type Batcher struct {
	size     int
	capacity int

	callbackReady func()
	callbackEmit  func([]*influxdb3.Point)

	points []*influxdb3.Point
	sync.Mutex
}

// NewBatcher creates and initializes a new Batcher instance applying the
// specified options. By default the batch size is DefaultBatchSize and the
// initial capacity is DefaultCapacity.
func NewBatcher(options ...Option) *Batcher {
	// Setup a batcher with the default values
	b := &Batcher{
		size:     DefaultBatchSize,
		capacity: DefaultCapacity,
	}

	// Apply the options
	for _, o := range options {
		o(b)
	}

	// Setup the internal data
	b.points = make([]*influxdb3.Point, 0, b.capacity)

	return b
}

// Add a point to the batcher and call the given callbacks if a batch is ready
func (b *Batcher) Add(p *influxdb3.Point) {
	b.Lock()
	defer b.Unlock()

	// Add the point
	b.points = append(b.points, p)

	// Call callbacks if a new batch is ready
	if b.isReady() {
		if b.callbackReady != nil {
			b.callbackReady()
		}
		if b.callbackEmit != nil {
			b.callbackEmit(b.emitPoints())
		}
	}
}

// Ready tells the caller whether a new batch is ready to be emitted
func (b *Batcher) Ready() bool {
	b.Lock()
	defer b.Unlock()
	return b.isReady()
}

func (b *Batcher) isReady() bool {
	return len(b.points) >= b.size
}

// Emit returns a new batch of points with up to the configured batch size, or
// the remaining points if fewer are buffered. Please call it at the end of
// your processing to drain the remaining points that do not fill a full batch.
func (b *Batcher) Emit() []*influxdb3.Point {
	b.Lock()
	defer b.Unlock()

	return b.emitPoints()
}

func (b *Batcher) emitPoints() []*influxdb3.Point {
	l := min(b.size, len(b.points))

	points := b.points[:l]
	b.points = b.points[l:]

	return points
}

// Example usage demonstrating synchronous and asynchronous operation
func main() {
	fmt.Println("Synchronous use")
	b := NewBatcher(WithSize(5))
	for i := 0; i < 16; i++ {
		p := influxdb3.NewPoint("stat", map[string]string{"place": "home"}, map[string]any{"value": i}, time.Now())
		b.Add(p)
		if b.Ready() {
			fmt.Printf("%d: %v\n", i, b.Emit())
		}
	}
	fmt.Println("final batch:", b.Emit())

	fmt.Println("Asynchronous use")
	b = NewBatcher(
		WithSize(5),
		WithReadyCallback(func() { fmt.Println("ready") }),
		WithEmitCallback(func(points []*influxdb3.Point) { fmt.Println(points) }),
	)
	fmt.Printf("b=%+v\n", b)
	for i := 0; i < 16; i++ {
		p := influxdb3.NewPoint("stat", map[string]string{"place": "home"}, map[string]any{"value": i}, time.Now())
		b.Add(p)
	}
	fmt.Println("final batch:", b.Emit())
}

Data can be sent to the server at the fmt.Print locations. This allows users to introduce batching flexibly without complicating the API.
The batcher code should probably go into influxdb3/batching or similar.
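To illustrate the intended emit semantics (full batches while enough points are buffered, then the remainder on a final drain), here is a reduced, self-contained sketch of the same slicing logic as in emitPoints above, using plain ints in place of points:

```go
package main

import "fmt"

// emit removes and returns up to size elements from *buf,
// mirroring the slicing logic of the proposed emitPoints.
func emit(buf *[]int, size int) []int {
	// min is a Go 1.21+ builtin.
	l := min(size, len(*buf))
	out := (*buf)[:l]
	*buf = (*buf)[l:]
	return out
}

func main() {
	buf := []int{0, 1, 2, 3, 4, 5, 6}
	fmt.Println(emit(&buf, 5)) // full batch: [0 1 2 3 4]
	fmt.Println(emit(&buf, 5)) // final drain: [5 6]
	fmt.Println(emit(&buf, 5)) // nothing left: []
}
```

Note that re-slicing instead of copying keeps Emit cheap; the emitted prefix lives before the new slice start, so later appends to the buffer cannot overwrite it.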
