Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Adaptive WriteBatch allocations #3429

Merged
merged 4 commits into from
Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/dbnode/network/server/tchannelthrift/node/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2546,7 +2546,7 @@ func TestServiceWriteBatchRaw(t *testing.T) {
{"bar", time.Now().Truncate(time.Second), 42.42},
}

writeBatch := writes.NewWriteBatch(len(values), ident.StringID(nsID), nil)
writeBatch := writes.NewWriteBatch(0, ident.StringID(nsID), nil)
mockDB.EXPECT().
BatchWriter(ident.NewIDMatcher(nsID), len(values)).
Return(writeBatch, nil)
Expand Down Expand Up @@ -2600,7 +2600,7 @@ func TestServiceWriteBatchRawV2SingleNS(t *testing.T) {
{"bar", time.Now().Truncate(time.Second), 42.42},
}

writeBatch := writes.NewWriteBatch(len(values), ident.StringID(nsID), nil)
writeBatch := writes.NewWriteBatch(0, ident.StringID(nsID), nil)
mockDB.EXPECT().
BatchWriter(ident.NewIDMatcher(nsID), len(values)).
Return(writeBatch, nil)
Expand Down Expand Up @@ -2657,8 +2657,8 @@ func TestServiceWriteBatchRawV2MultiNS(t *testing.T) {
{"bar", time.Now().Truncate(time.Second), 42.42},
}

writeBatch1 = writes.NewWriteBatch(len(values), ident.StringID(nsID1), nil)
writeBatch2 = writes.NewWriteBatch(len(values), ident.StringID(nsID2), nil)
writeBatch1 = writes.NewWriteBatch(0, ident.StringID(nsID1), nil)
writeBatch2 = writes.NewWriteBatch(0, ident.StringID(nsID2), nil)
)

mockDB.EXPECT().
Expand Down Expand Up @@ -2751,7 +2751,7 @@ func TestServiceWriteBatchRawOverMaxOutstandingRequests(t *testing.T) {
testIsComplete = make(chan struct{}, 0)
requestIsOutstanding = make(chan struct{}, 0)
)
writeBatch := writes.NewWriteBatch(len(values), ident.StringID(nsID), nil)
writeBatch := writes.NewWriteBatch(0, ident.StringID(nsID), nil)
mockDB.EXPECT().
BatchWriter(ident.NewIDMatcher(nsID), len(values)).
Do(func(nsID ident.ID, numValues int) {
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/persist/fs/commitlog/commit_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func testSeries(
UniqueIndex: uniqueIndex,
Namespace: ident.StringID("testNS"),
ID: ident.StringID(id),
EncodedTags: ts.EncodedTags(encodedTagsChecked.Bytes()),
EncodedTags: encodedTagsChecked.Bytes(),
Shard: shard,
}
}
Expand Down Expand Up @@ -1064,7 +1064,7 @@ func TestCommitLogBatchWriteDoesNotAddErroredOrSkippedSeries(t *testing.T) {
finalized++
}

writes := writes.NewWriteBatch(4, ident.StringID("ns"), finalizeFn)
writes := writes.NewWriteBatch(0, ident.StringID("ns"), finalizeFn)

testSeriesWrites := []ts.Series{
testSeries(t, opts, 0, "foo.bar", testTags0, 42),
Expand Down
13 changes: 6 additions & 7 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1598,14 +1598,10 @@ func withEncodingAndPoolingOptions(
policy.IteratorPool,
scope.SubScope("multi-iterator-pool")))

var writeBatchPoolInitialBatchSize *int
writeBatchPoolInitialBatchSize := 0
if policy.WriteBatchPool.InitialBatchSize != nil {
// Use config value if available.
writeBatchPoolInitialBatchSize = policy.WriteBatchPool.InitialBatchSize
} else {
// Otherwise use the default batch size that the client will use.
clientDefaultSize := client.DefaultWriteBatchSize
writeBatchPoolInitialBatchSize = &clientDefaultSize
writeBatchPoolInitialBatchSize = *policy.WriteBatchPool.InitialBatchSize
}

var writeBatchPoolMaxBatchSize *int
Expand All @@ -1623,7 +1619,10 @@ func withEncodingAndPoolingOptions(
// writes without allocating because these objects are very expensive to
// allocate.
commitlogQueueSize := opts.CommitLogOptions().BacklogQueueSize()
expectedBatchSize := *writeBatchPoolInitialBatchSize
expectedBatchSize := writeBatchPoolInitialBatchSize
if expectedBatchSize == 0 {
expectedBatchSize = client.DefaultWriteBatchSize
}
writeBatchPoolSize = commitlogQueueSize / expectedBatchSize
}

Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/storage/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func newOptions(poolOpts pool.ObjectPoolOptions) Options {
bytesPool.Init()
seriesOpts := series.NewOptions()

writeBatchPool := writes.NewWriteBatchPool(poolOpts, nil, nil)
writeBatchPool := writes.NewWriteBatchPool(poolOpts, 0, nil)
writeBatchPool.Init()

segmentReaderPool := xio.NewSegmentReaderPool(poolOpts)
Expand Down
56 changes: 45 additions & 11 deletions src/dbnode/ts/writes/write_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ var (
errTagsAndEncodedTagsRequired = errors.New("tags iterator and encoded tags must be provided")
)

const (
	// preallocateBatchCoeff scales the requested batch size when a write batch
	// is (re)allocated, reserving slightly more capacity than the current
	// request needs so that subsequent reuse of the pooled write batch is less
	// likely to trigger another allocation.
	preallocateBatchCoeff = 1.2
)

type writeBatch struct {
writes []BatchWrite
pendingIndex []PendingIndexInsert
Expand All @@ -50,13 +57,25 @@ type writeBatch struct {

// NewWriteBatch creates a new WriteBatch.
func NewWriteBatch(
batchSize int,
initialBatchSize int,
ns ident.ID,
finalizeFn func(WriteBatch),
) WriteBatch {
var (
writes []BatchWrite
pendingIndex []PendingIndexInsert
)

if initialBatchSize > 0 {
writes = make([]BatchWrite, 0, initialBatchSize)
pendingIndex = make([]PendingIndexInsert, 0, initialBatchSize)
// Leaving nil slices if initialBatchSize == 0,
// they will be allocated when needed, based on the actual batch size.
}

return &writeBatch{
writes: make([]BatchWrite, 0, batchSize),
pendingIndex: make([]PendingIndexInsert, 0, batchSize),
writes: writes,
pendingIndex: pendingIndex,
ns: ns,
finalizeFn: finalizeFn,
}
Expand Down Expand Up @@ -102,14 +121,29 @@ func (b *writeBatch) Reset(
batchSize int,
ns ident.ID,
) {
var writes []BatchWrite
// Preallocate slightly more when not using initialBatchSize.
preallocateBatchCap := int(float32(batchSize) * preallocateBatchCoeff)

if batchSize > cap(b.writes) {
writes = make([]BatchWrite, 0, batchSize)
batchCap := batchSize
if cap(b.writes) == 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do this if cap of writes is zero?

I thought you’d always want to allocate 1.2x the size you actually need since next request will likely have a little more or a little less.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for this was to preserve the original behaviour when having InitialBatchSize set in config (so that if you have initial batch size of 128 in config, and a batch size of 200 arrives, exactly 200 would be allocated, and not 200*1.2).
I'll add an explicit writeBatch.adaptiveSize flag which will help me handle every case better.

batchCap = preallocateBatchCap
}
b.writes = make([]BatchWrite, 0, batchCap)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think it would be very useful to have a gauge for current allocated batch capacity. It could be valuable in cases when, e.g. we receive one huge batch and subsequent batches are much smaller. The memory usage will probably be increased after this even if we are receiving smaller batches so this gauge could help us to identify such cases.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would this gauge work? I guess we would increment it on line 139, but when would we decrement it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be also a counter for now, I was thinking that maybe in future we will update this to also shrink batch capacity so gauge makes sense from this point of view.

} else {
b.writes = b.writes[:0]
}

if batchSize > cap(b.pendingIndex) {
batchCap := batchSize
if cap(b.pendingIndex) == 0 {
batchCap = preallocateBatchCap
}
b.pendingIndex = make([]PendingIndexInsert, 0, batchCap)
} else {
writes = b.writes[:0]
b.pendingIndex = b.pendingIndex[:0]
}

b.writes = writes
b.ns = ns
b.finalizeEncodedTagsFn = nil
b.finalizeAnnotationFn = nil
Expand Down Expand Up @@ -144,14 +178,14 @@ func (b *writeBatch) PendingIndex() []PendingIndexInsert {
return b.pendingIndex
}

// SetFinalizeEncodedTagsFn registers the callback that is invoked for
// encodedTags when this WriteBatch is finalized, allowing the caller to
// return them to a pool.
func (b *writeBatch) SetFinalizeEncodedTagsFn(fn FinalizeEncodedTagsFn) {
	b.finalizeEncodedTagsFn = fn
}

// SetFinalizeAnnotationFn registers the callback that is invoked for
// annotations when this WriteBatch is finalized, allowing the caller to
// return them to a pool.
func (b *writeBatch) SetFinalizeAnnotationFn(fn FinalizeAnnotationFn) {
	b.finalizeAnnotationFn = fn
}
Expand Down
12 changes: 2 additions & 10 deletions src/dbnode/ts/writes/write_batch_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,10 @@ import (
)

const (
// defaultInitialBatchSize determines the initial batch size that will be used when filling up the
// pool.
defaultInitialBatchSize = 1024
// defaultWritePoolMaxBatchSize is the default maximum size for a writeBatch that the pool
// will allow to remain in the pool. Any batches larger than that will be discarded to prevent
// excessive memory use forever in the case of an exceptionally large batch write.
defaultMaxBatchSize = 100000
defaultMaxBatchSize = 10240
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reducing this default, as 100000 seems to be far too large (it would result in a 39MB batch).

)

// WriteBatchPool is a pool of WriteBatch.
Expand All @@ -44,14 +41,9 @@ type WriteBatchPool struct {
// NewWriteBatchPool constructs a new WriteBatchPool.
func NewWriteBatchPool(
opts pool.ObjectPoolOptions,
initialBatchSizeOverride,
initialBatchSize int,
maxBatchSizeOverride *int,
) *WriteBatchPool {
initialBatchSize := defaultInitialBatchSize
if initialBatchSizeOverride != nil {
initialBatchSize = *initialBatchSizeOverride
}

maxBatchSize := defaultMaxBatchSize
if maxBatchSizeOverride != nil {
maxBatchSize = *maxBatchSizeOverride
Expand Down