diff --git a/CHANGELOG.md b/CHANGELOG.md index 9594160f..2ad78518 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,13 @@ ## [unreleased] ### Features - [#348](https://github.com/influxdata/influxdb-client-go/pull/348) Added `write.Options.Consitency` parameter to support InfluxDB Enterprise. +- [#350](https://github.com/influxdata/influxdb-client-go/pull/350) Added support for implicit batching to `WriteAPIBlocking`. It's off by default, enabled by `EnableBatching()`. ### Bug fixes - [#349](https://github.com/influxdata/influxdb-client-go/pull/349) Skip retrying on specific write errors (mostly partial write error). +### Breaking change +- [#350](https://github.com/influxdata/influxdb-client-go/pull/350) Interface `WriteAPIBlocking` is extend with `EnableBatching()` and `Flush()`. ## 2.9.2 [2022-07-29] ### Bug fixes diff --git a/README.md b/README.md index 928ea3d7..adaed299 100644 --- a/README.md +++ b/README.md @@ -286,7 +286,8 @@ func main() { ``` ### Blocking write client -Blocking write client writes given point(s) synchronously. It doesn't have implicit batching. Batch is created from given set of points. +Blocking write client writes given point(s) synchronously. It doesn't do implicit batching. Batch is created from given set of points. +Implicit batching can be enabled with `WriteAPIBlocking.EnableBatching()`. ```go package main diff --git a/api/writeAPIBlocking.go b/api/writeAPIBlocking.go index 8569c53c..d348aa8f 100644 --- a/api/writeAPIBlocking.go +++ b/api/writeAPIBlocking.go @@ -7,6 +7,7 @@ package api import ( "context" "strings" + "sync" http2 "github.com/influxdata/influxdb-client-go/v2/api/http" "github.com/influxdata/influxdb-client-go/v2/api/write" @@ -14,57 +15,45 @@ import ( ) // WriteAPIBlocking offers blocking methods for writing time series data synchronously into an InfluxDB server. -// It doesn't implicitly create batches of points. It is intended to use for writing less frequent data, such as a weather sensing, or if there is a need to have explicit control of failed batches. +// It doesn't implicitly create batches of points by default. Batches are created from array of points/records. // -// WriteAPIBlocking can be used concurrently. -// When using multiple goroutines for writing, use a single WriteAPIBlocking instance in all goroutines. -// -// To add implicit batching, use a wrapper, such as: -// type writer struct { -// batch []*write.Point -// writeAPI api.WriteAPIBlocking -// batchSize int -// } +// Implicit batching is enabled with EnableBatching(). In this mode, each call to WritePoint or WriteRecord adds a line +// to internal buffer. If length ot the buffer is equal to the batch-size (set in write.Options), the buffer is sent to the server +// and the result of the operation is returned. +// When a point is written to the buffer, nil error is always returned. +// Flush() can be used to trigger sending of batch when it doesn't have the batch-size. // -// func (w *writer) CurrentBatch() []*write.Point { -// return w.batch -// } -// -// func newWriter(writeAPI api.WriteAPIBlocking, batchSize int) *writer { -// return &writer{ -// batch: make([]*write.Point, 0, batchSize), -// writeAPI: writeAPI, -// batchSize: batchSize, -// } -// } +// Synchronous writing is intended to use for writing less frequent data, such as a weather sensing, or if there is a need to have explicit control of failed batches. + // -// func (w *writer) write(ctx context.Context, p *write.Point) error { -// w.batch = append(w.batch, p) -// if len(w.batch) == w.batchSize { -// err := w.writeAPI.WritePoint(ctx, w.batch...) -// if err != nil { -// return err -// } -// w.batch = w.batch[:0] -// } -// return nil -// } +// WriteAPIBlocking can be used concurrently. +// When using multiple goroutines for writing, use a single WriteAPIBlocking instance in all goroutines. type WriteAPIBlocking interface { // WriteRecord writes line protocol record(s) into bucket. - // WriteRecord writes without implicit batching. Batch is created from given number of records. + // WriteRecord writes lines without implicit batching by default, batch is created from given number of records. + // Automatic batching can be enabled by EnableBatching() // Individual arguments can also be batches (multiple records separated by newline). // Non-blocking alternative is available in the WriteAPI interface WriteRecord(ctx context.Context, line ...string) error // WritePoint data point into bucket. - // WritePoint writes without implicit batching. Batch is created from given number of points + // WriteRecord writes points without implicit batching by default, batch is created from given number of points. + // Automatic batching can be enabled by EnableBatching(). // Non-blocking alternative is available in the WriteAPI interface WritePoint(ctx context.Context, point ...*write.Point) error + // EnableBatching turns on implicit batching + // Batch size is controlled via write.Options + EnableBatching() + // Flush forces write of buffer if batching is enabled, even buffer doesn't have the batch-size. + Flush(ctx context.Context) error } // writeAPIBlocking implements WriteAPIBlocking interface type writeAPIBlocking struct { service *iwrite.Service writeOptions *write.Options + batching bool + batch []string + mu sync.Mutex } // NewWriteAPIBlocking creates new instance of blocking write client for writing data to bucket belonging to org @@ -72,8 +61,36 @@ func NewWriteAPIBlocking(org string, bucket string, service http2.Service, write return &writeAPIBlocking{service: iwrite.NewService(org, bucket, service, writeOptions), writeOptions: writeOptions} } +// NewWriteAPIBlockingWithBatching creates new instance of blocking write client for writing data to bucket belonging to org with batching enabled +func NewWriteAPIBlockingWithBatching(org string, bucket string, service http2.Service, writeOptions *write.Options) WriteAPIBlocking { + api := &writeAPIBlocking{service: iwrite.NewService(org, bucket, service, writeOptions), writeOptions: writeOptions} + api.EnableBatching() + return api +} + +func (w *writeAPIBlocking) EnableBatching() { + w.mu.Lock() + defer w.mu.Unlock() + if !w.batching { + w.batching = true + w.batch = make([]string, 0, w.writeOptions.BatchSize()) + } +} + func (w *writeAPIBlocking) write(ctx context.Context, line string) error { - err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.MaxRetryTime())) + w.mu.Lock() + defer w.mu.Unlock() + body := line + if w.batching { + w.batch = append(w.batch, line) + if len(w.batch) == int(w.writeOptions.BatchSize()) { + body = strings.Join(w.batch, "\n") + w.batch = w.batch[:0] + } else { + return nil + } + } + err := w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime())) if err != nil { return err } @@ -94,3 +111,14 @@ func (w *writeAPIBlocking) WritePoint(ctx context.Context, point ...*write.Point } return w.write(ctx, line) } + +func (w *writeAPIBlocking) Flush(ctx context.Context) error { + w.mu.Lock() + defer w.mu.Unlock() + if w.batching && len(w.batch) > 0 { + body := strings.Join(w.batch, "\n") + w.batch = w.batch[:0] + return w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime())) + } + return nil +} diff --git a/api/writeAPIBlocking_test.go b/api/writeAPIBlocking_test.go index 0d9ad25b..c237241e 100644 --- a/api/writeAPIBlocking_test.go +++ b/api/writeAPIBlocking_test.go @@ -40,9 +40,20 @@ func TestWriteRecord(t *testing.T) { service := test.NewTestService(t, "http://localhost:8888") writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) lines := test.GenRecords(10) + for _, line := range lines { + err := writeAPI.WriteRecord(context.Background(), line) + require.Nil(t, err) + } + require.Len(t, service.Lines(), 10) + require.Equal(t, 10, service.Requests()) + for i, l := range lines { + assert.Equal(t, l, service.Lines()[i]) + } + service.Close() + err := writeAPI.WriteRecord(context.Background(), lines...) require.Nil(t, err) - require.Len(t, service.Lines(), 10) + require.Equal(t, 1, service.Requests()) for i, l := range lines { assert.Equal(t, l, service.Lines()[i]) } @@ -120,3 +131,30 @@ func TestWriteErrors(t *testing.T) { require.Equal(t, 10, errors) } + +func TestWriteBatchIng(t *testing.T) { + service := test.NewTestService(t, "http://localhost:8888") + writeAPI := NewWriteAPIBlockingWithBatching("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) + lines := test.GenRecords(10) + for i, line := range lines { + err := writeAPI.WriteRecord(context.Background(), line) + require.Nil(t, err) + if i == 4 || i == 9 { + assert.Equal(t, 1, service.Requests()) + require.Len(t, service.Lines(), 5) + + service.Close() + } + } + + for i := 0; i < 4; i++ { + err := writeAPI.WriteRecord(context.Background(), lines[i]) + require.Nil(t, err) + } + assert.Equal(t, 0, service.Requests()) + require.Len(t, service.Lines(), 0) + err := writeAPI.Flush(context.Background()) + require.Nil(t, err) + assert.Equal(t, 1, service.Requests()) + require.Len(t, service.Lines(), 4) +} diff --git a/internal/test/http_service.go b/internal/test/http_service.go index ab97f705..26f9aca4 100644 --- a/internal/test/http_service.go +++ b/internal/test/http_service.go @@ -30,6 +30,7 @@ type HTTPService struct { requestHandler func(url string, body io.Reader) error replyError *http2.Error lock sync.Mutex + requests int } // WasGzip returns true of request was in GZip format @@ -67,6 +68,11 @@ func (t *HTTPService) HTTPClient() *http.Client { return nil } +// Requests returns number of requests +func (t *HTTPService) Requests() int { + return t.requests +} + // Close clears instance func (t *HTTPService) Close() { t.lock.Lock() @@ -76,6 +82,7 @@ func (t *HTTPService) Close() { t.wasGzip = false t.replyError = nil t.requestHandler = nil + t.requests = 0 t.lock.Unlock() } @@ -116,6 +123,7 @@ func (t *HTTPService) DoHTTPRequestWithResponse(_ *http.Request, _ http2.Request // DoPostRequest reads http request, validates URL and stores data in the request func (t *HTTPService) DoPostRequest(_ context.Context, url string, body io.Reader, requestCallback http2.RequestCallback, _ http2.ResponseCallback) *http2.Error { req, err := http.NewRequest("POST", url, nil) + t.requests++ if err != nil { return http2.NewError(err) }