[dbnode] Remove readers and writer from aggregator API (#3122)
linasm authored Feb 9, 2021
1 parent 9e233fe commit 59e5e2e
Showing 7 changed files with 41 additions and 259 deletions.
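The core of the change is the TileAggregator contract: the aggregator now opens its own fileset readers and streaming writer, so callers only hand it the namespaces, shard ID, cold-flush hook, and options, and it additionally reports the volume index it wrote. A minimal sketch of the new shape, reconstructed from the hunks below; the supporting type names here are simplified placeholders, not the real m3 definitions:

package sketch

import "time"

// Placeholder stand-ins for the real m3 types the diff refers to
// (context.Context, storage.Namespace, persist.OnFlushSeries, ...);
// simplified for illustration only.
type (
	Context       interface{}
	Namespace     interface{ ID() string }
	OnFlushSeries interface{}
)

// AggregateTilesOptions mirrors only the fields exercised in the tests
// below (the real struct also carries instrument options, among others).
type AggregateTilesOptions struct {
	Start, End time.Time
	Step       time.Duration
}

// TileAggregator after this commit: callers no longer pass blockReaders,
// a StreamingWriter, or sourceBlockVolumes; the aggregator opens what it
// needs itself and additionally returns the volume index it wrote.
type TileAggregator interface {
	AggregateTiles(
		ctx Context,
		sourceNs, targetNs Namespace,
		shardID uint32,
		onFlushSeries OnFlushSeries,
		opts AggregateTilesOptions,
	) (processedTileCount int64, nextVolume int, err error)
}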
47 changes: 4 additions & 43 deletions src/dbnode/storage/namespace.go
@@ -1721,6 +1721,7 @@ func (n *dbNamespace) aggregateTiles(
targetBlockStart = opts.Start.Truncate(targetBlockSize)
sourceBlockSize = sourceNs.Options().RetentionOptions().BlockSize()
lastSourceBlockEnd = opts.End.Truncate(sourceBlockSize)
processedShards = opts.InsOptions.MetricsScope().Counter("processed-shards")
)

if targetBlockStart.Add(targetBlockSize).Before(lastSourceBlockEnd) {
@@ -1732,24 +1733,6 @@ func (n *dbNamespace) aggregateTiles(
return 0, errNamespaceNotBootstrapped
}

var (
processedShards = opts.InsOptions.MetricsScope().Counter("processed-shards")
targetShards = n.OwnedShards()
bytesPool = sourceNs.StorageOptions().BytesPool()
fsOptions = sourceNs.StorageOptions().CommitLogOptions().FilesystemOptions()
blockReaders []fs.DataFileSetReader
sourceBlockStarts []time.Time
)

for sourceBlockStart := targetBlockStart; sourceBlockStart.Before(lastSourceBlockEnd); sourceBlockStart = sourceBlockStart.Add(sourceBlockSize) {
reader, err := fs.NewReader(bytesPool, fsOptions)
if err != nil {
return 0, err
}
sourceBlockStarts = append(sourceBlockStarts, sourceBlockStart)
blockReaders = append(blockReaders, reader)
}

// Cold flusher builds the reverse index for target (current) ns.
onColdFlushNs, err := n.opts.OnColdFlush().ColdFlushNamespace(n)
if err != nil {
@@ -1764,37 +1747,15 @@ func (n *dbNamespace) aggregateTiles(
if aggregationSuccess {
return
}
// Abort buildling reverse index if aggregation fails.
// Abort building reverse index if aggregation fails.
if err := onColdFlushNs.Abort(); err != nil {
n.log.Error("error aborting cold flush",
zap.Stringer("sourceNs", sourceNs.ID()), zap.Error(err))
}
}()
for _, targetShard := range targetShards {
sourceShard, _, err := sourceNs.ReadableShardAt(targetShard.ID())
if err != nil {
return 0, fmt.Errorf("no matching shard in source namespace %s: %v", sourceNs.ID(), err)
}

sourceBlockVolumes := make([]shardBlockVolume, 0, len(sourceBlockStarts))
for _, sourceBlockStart := range sourceBlockStarts {
latestVolume, err := sourceShard.LatestVolume(sourceBlockStart)
if err != nil {
n.log.Error("error getting shards latest volume",
zap.Error(err), zap.Uint32("shard", sourceShard.ID()), zap.Time("blockStart", sourceBlockStart))
return 0, err
}
sourceBlockVolumes = append(sourceBlockVolumes, shardBlockVolume{sourceBlockStart, latestVolume})
}

writer, err := fs.NewStreamingWriter(n.opts.CommitLogOptions().FilesystemOptions())
if err != nil {
return 0, err
}

for _, targetShard := range n.OwnedShards() {
shardProcessedTileCount, err := targetShard.AggregateTiles(
ctx, sourceNs, n, sourceShard.ID(), blockReaders, writer, sourceBlockVolumes,
onColdFlushNs, opts)
ctx, sourceNs, n, targetShard.ID(), onColdFlushNs, opts)

processedTileCount += shardProcessedTileCount
processedShards.Inc(1)
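With the per-block readers, streaming writer, and volume lookups gone from this method, what remains of dbNamespace.aggregateTiles is a plain loop over the owned shards. A minimal sketch of that loop, reusing the placeholder types from the sketch above (the real method also aborts the cold-flush namespace when aggregation fails, and its error handling continues past the end of this hunk):

// shardTileAggregator is a placeholder for the slice of the databaseShard
// interface this loop needs; it is not the real m3 shard type.
type shardTileAggregator interface {
	ID() uint32
	AggregateTiles(
		ctx Context,
		sourceNs, targetNs Namespace,
		shardID uint32,
		onFlushSeries OnFlushSeries,
		opts AggregateTilesOptions,
	) (int64, error)
}

// aggregateAcrossShards sketches the remaining loop: every owned target
// shard aggregates for its own ID, and the namespace just sums the counts.
func aggregateAcrossShards(
	ctx Context,
	sourceNs, targetNs Namespace,
	ownedShards []shardTileAggregator,
	onColdFlushNs OnFlushSeries,
	opts AggregateTilesOptions,
) (int64, error) {
	var processedTileCount int64
	for _, targetShard := range ownedShards {
		shardProcessedTileCount, err := targetShard.AggregateTiles(
			ctx, sourceNs, targetNs, targetShard.ID(), onColdFlushNs, opts)
		if err != nil {
			// The real method's error handling is truncated in the hunk
			// above; returning early keeps this sketch simple.
			return processedTileCount, err
		}
		processedTileCount += shardProcessedTileCount
		// The real method also bumps the "processed-shards" counter here.
	}
	return processedTileCount, nil
}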
52 changes: 15 additions & 37 deletions src/dbnode/storage/namespace_test.go
@@ -1426,7 +1426,8 @@ func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) {
sourceNsID = ident.StringID("source")
targetNsID = ident.StringID("target")
start = time.Now().Truncate(time.Hour)
opts = AggregateTilesOptions{Start: start, End: start.Add(time.Hour)}
insOpts = instrument.NewOptions()
opts = AggregateTilesOptions{Start: start, End: start.Add(time.Hour), InsOptions: insOpts}
)

sourceNs, sourceCloser := newTestNamespaceWithIDOpts(t, sourceNsID, namespace.NewOptions())
@@ -1445,22 +1446,21 @@ func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) {
}

func TestNamespaceAggregateTiles(t *testing.T) {
ctrl := gomock.NewController(t)
ctrl := xtest.NewController(t)
defer ctrl.Finish()

ctx := context.NewContext()
defer ctx.Close()

var (
sourceNsID = ident.StringID("source")
targetNsID = ident.StringID("target")
sourceBlockSize = time.Hour
targetBlockSize = 2 * time.Hour
start = time.Now().Truncate(targetBlockSize)
secondSourceBlockStart = start.Add(sourceBlockSize)
shard0ID uint32 = 10
shard1ID uint32 = 20
insOpts = instrument.NewOptions()
sourceNsID = ident.StringID("source")
targetNsID = ident.StringID("target")
sourceBlockSize = time.Hour
targetBlockSize = 2 * time.Hour
start = time.Now().Truncate(targetBlockSize)
shard0ID = uint32(10)
shard1ID = uint32(20)
insOpts = instrument.NewOptions()
)

opts, err := NewAggregateTilesOptions(start, start.Add(targetBlockSize), time.Second, targetNsID, insOpts)
@@ -1485,42 +1485,20 @@ func TestNamespaceAggregateTiles(t *testing.T) {
mockOnColdFlush.EXPECT().ColdFlushNamespace(gomock.Any()).Return(mockOnColdFlushNs, nil)
targetNs.opts = targetNs.opts.SetOnColdFlush(mockOnColdFlush)

sourceShard0 := NewMockdatabaseShard(ctrl)
sourceShard1 := NewMockdatabaseShard(ctrl)
sourceNs.shards[0] = sourceShard0
sourceNs.shards[1] = sourceShard1

sourceShard0.EXPECT().ID().Return(shard0ID)
sourceShard0.EXPECT().IsBootstrapped().Return(true)
sourceShard0.EXPECT().LatestVolume(start).Return(5, nil)
sourceShard0.EXPECT().LatestVolume(start.Add(sourceBlockSize)).Return(15, nil)

sourceShard1.EXPECT().ID().Return(shard1ID)
sourceShard1.EXPECT().IsBootstrapped().Return(true)
sourceShard1.EXPECT().LatestVolume(start).Return(7, nil)
sourceShard1.EXPECT().LatestVolume(start.Add(sourceBlockSize)).Return(17, nil)

targetShard0 := NewMockdatabaseShard(ctrl)
targetShard1 := NewMockdatabaseShard(ctrl)
targetNs.shards[0] = targetShard0
targetNs.shards[1] = targetShard1

targetShard0.EXPECT().ID().Return(uint32(0))
targetShard1.EXPECT().ID().Return(uint32(1))

sourceBlockVolumes0 := []shardBlockVolume{{start, 5}, {secondSourceBlockStart, 15}}
sourceBlockVolumes1 := []shardBlockVolume{{start, 7}, {secondSourceBlockStart, 17}}
targetShard0.EXPECT().ID().Return(shard0ID)
targetShard1.EXPECT().ID().Return(shard1ID)

targetShard0.EXPECT().
AggregateTiles(
ctx, sourceNs, targetNs, shard0ID, gomock.Len(2), gomock.Any(),
sourceBlockVolumes0, gomock.Any(), opts).
AggregateTiles(ctx, sourceNs, targetNs, shard0ID, mockOnColdFlushNs, opts).
Return(int64(3), nil)

targetShard1.EXPECT().
AggregateTiles(
ctx, sourceNs, targetNs, shard1ID, gomock.Len(2), gomock.Any(),
sourceBlockVolumes1, gomock.Any(), opts).
AggregateTiles(ctx, sourceNs, targetNs, shard1ID, mockOnColdFlushNs, opts).
Return(int64(2), nil)

processedTileCount, err := targetNs.AggregateTiles(ctx, sourceNs, opts)
6 changes: 2 additions & 4 deletions src/dbnode/storage/options.go
@@ -946,10 +946,8 @@ func (a *noopTileAggregator) AggregateTiles(
ctx context.Context,
sourceNs, targetNs Namespace,
shardID uint32,
blockReaders []fs.DataFileSetReader,
writer fs.StreamingWriter,
onFlushSeries persist.OnFlushSeries,
opts AggregateTilesOptions,
) (int64, error) {
return 0, nil
) (int64, int, error) {
return 0, 0, nil
}
87 changes: 2 additions & 85 deletions src/dbnode/storage/shard.go
@@ -2669,99 +2669,16 @@ func (s *dbShard) AggregateTiles(
ctx context.Context,
sourceNs, targetNs Namespace,
shardID uint32,
blockReaders []fs.DataFileSetReader,
writer fs.StreamingWriter,
sourceBlockVolumes []shardBlockVolume,
onFlushSeries persist.OnFlushSeries,
opts AggregateTilesOptions,
) (int64, error) {
if len(blockReaders) != len(sourceBlockVolumes) {
return 0, fmt.Errorf(
"blockReaders and sourceBlockVolumes length mismatch (%d != %d)",
len(blockReaders),
len(sourceBlockVolumes))
}

openBlockReaders := make([]fs.DataFileSetReader, 0, len(blockReaders))
defer func() {
for _, reader := range openBlockReaders {
if err := reader.Close(); err != nil {
s.logger.Error("could not close DataFileSetReader", zap.Error(err))
}
}
}()

var (
sourceNsID = sourceNs.ID()
plannedSeriesCount = 1
)

for sourceBlockPos, blockReader := range blockReaders {
sourceBlockVolume := sourceBlockVolumes[sourceBlockPos]
openOpts := fs.DataReaderOpenOptions{
Identifier: fs.FileSetFileIdentifier{
Namespace: sourceNsID,
Shard: shardID,
BlockStart: sourceBlockVolume.blockStart,
VolumeIndex: sourceBlockVolume.latestVolume,
},
FileSetType: persist.FileSetFlushType,
StreamingEnabled: true,
}

if err := blockReader.Open(openOpts); err != nil {
if err == fs.ErrCheckpointFileNotFound {
// A very recent source block might not have been flushed yet.
continue
}
s.logger.Error("blockReader.Open",
zap.Error(err),
zap.Time("blockStart", sourceBlockVolume.blockStart),
zap.Int("volumeIndex", sourceBlockVolume.latestVolume))
return 0, err
}

entries := blockReader.Entries()
if entries > plannedSeriesCount {
plannedSeriesCount = entries
}

openBlockReaders = append(openBlockReaders, blockReader)
}

latestTargetVolume, err := s.LatestVolume(opts.Start)
if err != nil {
return 0, err
}

nextVolume := latestTargetVolume + 1
writerOpenOpts := fs.StreamingWriterOpenOptions{
NamespaceID: s.namespace.ID(),
ShardID: s.ID(),
BlockStart: opts.Start,
BlockSize: s.namespace.Options().RetentionOptions().BlockSize(),
VolumeIndex: nextVolume,
PlannedRecordsCount: uint(plannedSeriesCount),
}
if err = writer.Open(writerOpenOpts); err != nil {
return 0, err
}

var multiErr xerrors.MultiError

processedTileCount, err := s.tileAggregator.AggregateTiles(
ctx, sourceNs, targetNs, s.ID(), openBlockReaders, writer, onFlushSeries, opts)
processedTileCount, nextVolume, err := s.tileAggregator.AggregateTiles(
ctx, sourceNs, targetNs, shardID, onFlushSeries, opts)
if err != nil {
// NB: cannot return on the error here, must finish writing.
multiErr = multiErr.Add(err)
}

if !multiErr.Empty() {
if err := writer.Abort(); err != nil {
multiErr = multiErr.Add(err)
}
} else if err := writer.Close(); err != nil {
multiErr = multiErr.Add(err)
} else {
// Notify all block leasers that a new volume for the namespace/shard/blockstart
// has been created. This will block until all leasers have relinquished their
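On the shard side, the code that used to open the source fileset readers, size and open the streaming writer, and close or abort it is gone; dbShard.AggregateTiles is reduced to delegating to the tile aggregator. A rough sketch of that delegation, again reusing the placeholder types from the first sketch (the block-lease notification mentioned at the end of the hunk, which consumes the returned volume index, is elided):

// delegateShardAggregation sketches what dbShard.AggregateTiles boils down
// to after this change: a single call into the TileAggregator, which now
// reports the volume index it wrote in addition to the processed-tile count.
func delegateShardAggregation(
	ctx Context,
	agg TileAggregator,
	sourceNs, targetNs Namespace,
	shardID uint32,
	onFlushSeries OnFlushSeries,
	opts AggregateTilesOptions,
) (int64, error) {
	processedTileCount, nextVolume, err := agg.AggregateTiles(
		ctx, sourceNs, targetNs, shardID, onFlushSeries, opts)
	if err != nil {
		return 0, err
	}
	// In the real method, nextVolume feeds the notification to block leasers
	// that a new volume exists for this namespace/shard/blockstart (not shown).
	_ = nextVolume
	return processedTileCount, nil
}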
80 changes: 6 additions & 74 deletions src/dbnode/storage/shard_test.go
@@ -1855,14 +1855,11 @@ func TestShardAggregateTiles(t *testing.T) {
defer ctx.Close()

var (
sourceBlockSize = time.Hour
targetBlockSize = 2 * time.Hour
start = time.Now().Truncate(targetBlockSize)
opts = AggregateTilesOptions{Start: start, End: start.Add(targetBlockSize), Step: 10 * time.Minute}

firstSourceBlockEntries = 3
secondSourceBlockEntries = 2
maxSourceBlockEntries = 3
opts = AggregateTilesOptions{
Start: start, End: start.Add(targetBlockSize), Step: 10 * time.Minute,
}

expectedProcessedTileCount = int64(4)

@@ -1872,90 +1869,25 @@ func TestShardAggregateTiles(t *testing.T) {
aggregator := NewMockTileAggregator(ctrl)
testOpts := DefaultTestOptions().SetTileAggregator(aggregator)

sourceShard := testDatabaseShard(t, testOpts)
defer assert.NoError(t, sourceShard.Close())

reader0, volume0 := getMockReader(
ctrl, t, sourceShard, start, nil)
reader0.EXPECT().Entries().Return(firstSourceBlockEntries)

secondSourceBlockStart := start.Add(sourceBlockSize)
reader1, volume1 := getMockReader(
ctrl, t, sourceShard, secondSourceBlockStart, nil)
reader1.EXPECT().Entries().Return(secondSourceBlockEntries)

thirdSourceBlockStart := secondSourceBlockStart.Add(sourceBlockSize)
reader2, volume2 := getMockReader(
ctrl, t, sourceShard, thirdSourceBlockStart, fs.ErrCheckpointFileNotFound)

blockReaders := []fs.DataFileSetReader{reader0, reader1, reader2}
sourceBlockVolumes := []shardBlockVolume{
{start, volume0},
{secondSourceBlockStart, volume1},
{thirdSourceBlockStart, volume2},
}

targetShard := testDatabaseShardWithIndexFn(t, testOpts, nil, true)
defer assert.NoError(t, targetShard.Close())

writer := fs.NewMockStreamingWriter(ctrl)
gomock.InOrder(
writer.EXPECT().Open(fs.StreamingWriterOpenOptions{
NamespaceID: targetShard.namespace.ID(),
ShardID: targetShard.shard,
BlockStart: opts.Start,
BlockSize: targetBlockSize,
VolumeIndex: 1,
PlannedRecordsCount: uint(maxSourceBlockEntries),
}),
writer.EXPECT().Close(),
)

var (
noOpColdFlushNs = &persist.NoOpColdFlushNamespace{}
sourceNs = NewMockNamespace(ctrl)
targetNs = NewMockNamespace(ctrl)
)

sourceNs.EXPECT().ID().Return(sourceShard.namespace.ID())

aggregator.EXPECT().
AggregateTiles(ctx, sourceNs, targetNs, sourceShard.ID(), gomock.Len(2), writer,
noOpColdFlushNs, opts).
Return(expectedProcessedTileCount, nil)
AggregateTiles(ctx, sourceNs, targetNs, targetShard.ID(), noOpColdFlushNs, opts).
Return(expectedProcessedTileCount, 33, nil)

processedTileCount, err := targetShard.AggregateTiles(
ctx, sourceNs, targetNs, sourceShard.ID(), blockReaders, writer,
sourceBlockVolumes, noOpColdFlushNs, opts)
ctx, sourceNs, targetNs, targetShard.ID(), noOpColdFlushNs, opts)
require.NoError(t, err)
assert.Equal(t, expectedProcessedTileCount, processedTileCount)
}

func TestShardAggregateTilesVerifySliceLengths(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

ctx := context.NewContext()
defer ctx.Close()

targetShard := testDatabaseShardWithIndexFn(t, DefaultTestOptions(), nil, true)
defer assert.NoError(t, targetShard.Close())

var (
start = time.Now()
blockReaders []fs.DataFileSetReader
sourceBlockVolumes = []shardBlockVolume{{start, 0}}
writer = fs.NewMockStreamingWriter(ctrl)
sourceNs = NewMockNamespace(ctrl)
targetNs = NewMockNamespace(ctrl)
)

_, err := targetShard.AggregateTiles(
ctx, sourceNs, targetNs, 1, blockReaders, writer, sourceBlockVolumes,
&persist.NoOpColdFlushNamespace{}, AggregateTilesOptions{})
require.EqualError(t, err, "blockReaders and sourceBlockVolumes length mismatch (0 != 1)")
}

func TestOpenStreamingReader(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()