diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ee03ab8..3394a48e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ ## [unreleased] +### Bug fixes +- [#354](https://github.com/influxdata/influxdb-client-go/pull/354) More efficient synchronization in WriteAPIBlocking. + ## 2.10.0 [2022-08-25] ### Features diff --git a/api/writeAPIBlocking.go b/api/writeAPIBlocking.go index d348aa8f..b1711a86 100644 --- a/api/writeAPIBlocking.go +++ b/api/writeAPIBlocking.go @@ -8,6 +8,7 @@ import ( "context" "strings" "sync" + "sync/atomic" http2 "github.com/influxdata/influxdb-client-go/v2/api/http" "github.com/influxdata/influxdb-client-go/v2/api/write" @@ -51,7 +52,7 @@ type WriteAPIBlocking interface { type writeAPIBlocking struct { service *iwrite.Service writeOptions *write.Options - batching bool + batching int32 batch []string mu sync.Mutex } @@ -69,28 +70,28 @@ func NewWriteAPIBlockingWithBatching(org string, bucket string, service http2.Se } func (w *writeAPIBlocking) EnableBatching() { - w.mu.Lock() - defer w.mu.Unlock() - if !w.batching { - w.batching = true + if atomic.LoadInt32(&w.batching) == 0 { + w.mu.Lock() + w.batching = 1 w.batch = make([]string, 0, w.writeOptions.BatchSize()) + w.mu.Unlock() } } func (w *writeAPIBlocking) write(ctx context.Context, line string) error { - w.mu.Lock() - defer w.mu.Unlock() - body := line - if w.batching { - w.batch = append(w.batch, line) - if len(w.batch) == int(w.writeOptions.BatchSize()) { - body = strings.Join(w.batch, "\n") - w.batch = w.batch[:0] - } else { - return nil - } + if atomic.LoadInt32(&w.batching) > 0 { + return func(b string) error { + w.mu.Lock() + defer w.mu.Unlock() + w.batch = append(w.batch, b) + if len(w.batch) == int(w.writeOptions.BatchSize()) { + return w.flush(ctx) + } else { + return nil + } + }(line) } - err := w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime())) + err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.MaxRetryTime())) if err != nil { return err } @@ -112,13 +113,23 @@ func (w *writeAPIBlocking) WritePoint(ctx context.Context, point ...*write.Point return w.write(ctx, line) } -func (w *writeAPIBlocking) Flush(ctx context.Context) error { - w.mu.Lock() - defer w.mu.Unlock() - if w.batching && len(w.batch) > 0 { +// flush is unsychronized helper for creating and sending batch +// Must be called from synchronized block +func (w *writeAPIBlocking) flush(ctx context.Context) error { + if len(w.batch) > 0 { body := strings.Join(w.batch, "\n") w.batch = w.batch[:0] - return w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime())) + b := iwrite.NewBatch(body, w.writeOptions.MaxRetryTime()) + return w.service.WriteBatch(ctx, b) + } + return nil +} + +func (w *writeAPIBlocking) Flush(ctx context.Context) error { + if atomic.LoadInt32(&w.batching) > 0 { + w.mu.Lock() + defer w.mu.Unlock() + return w.flush(ctx) } return nil } diff --git a/internal/test/http_service.go b/internal/test/http_service.go index ff72e3a0..6f35f686 100644 --- a/internal/test/http_service.go +++ b/internal/test/http_service.go @@ -123,7 +123,9 @@ func (t *HTTPService) DoHTTPRequestWithResponse(_ *http.Request, _ http2.Request // DoPostRequest reads http request, validates URL and stores data in the request func (t *HTTPService) DoPostRequest(_ context.Context, url string, body io.Reader, requestCallback http2.RequestCallback, _ http2.ResponseCallback) *http2.Error { req, err := http.NewRequest("POST", url, nil) + t.lock.Lock() t.requests++ + t.lock.Unlock() if err != nil { return http2.NewError(err) }