Skip to content

Commit

Permalink
feat(wal): First pass to implement WAL and hook to PBQ store. (#344)
Browse files Browse the repository at this point in the history
Signed-off-by: Hao Hao <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: jyu6 <[email protected]>
Signed-off-by: Derek Wang <[email protected]>
Signed-off-by: Keran Yang <[email protected]>
Signed-off-by: ashwinidulams <[email protected]>
Signed-off-by: ssrigiri1 <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
Co-authored-by: Juanlu Yu <[email protected]>
Co-authored-by: jyu6 <[email protected]>
Co-authored-by: Derek Wang <[email protected]>
Co-authored-by: Keran Yang <[email protected]>
Co-authored-by: ashwinidulams <[email protected]>
Co-authored-by: shashank10456 <[email protected]>
Co-authored-by: ssrigiri1 <[email protected]>
  • Loading branch information
9 people committed Nov 29, 2022
1 parent 3cf391b commit 9bb8ebd
Show file tree
Hide file tree
Showing 33 changed files with 1,527 additions and 464 deletions.
2 changes: 1 addition & 1 deletion docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -3454,7 +3454,7 @@ StoreType (<code>string</code> alias)
</h3>
<p>
<p>
PBQ store’s backend type.
StoreType is the PBQ store’s backend type.
</p>
</p>
<h3 id="numaflow.numaproj.io/v1alpha1.TLS">
Expand Down
11 changes: 6 additions & 5 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,11 @@ const (
DefaultPBQReadBatchSize = 100 // Default read batch size for pbq

// Default persistent store options
DefaultStoreSyncDuration = 2 * time.Second // Default sync duration for pbq
DefaultStoreType = NoOpType // Default store type
DefaultStoreSize = 1000000 // Default persistent store size
DefaultStoreMaxBufferSize = 100000 // Default buffer size for pbq in bytes
DefaultStoreSyncDuration = 2 * time.Second // Default sync duration for pbq
DefaultStoreType = NoOpType // Default store type
DefaultStoreSize = 1000000 // Default persistent store size
DefaultStoreMaxBufferSize = 100000 // Default buffer size for pbq in bytes
DefaultStorePath = PathPBQMount + "/wals" // Default store path

// Default window options
DefaultWindowType = FixedType
Expand All @@ -144,7 +145,7 @@ const (
PathPBQMount = "/var/numaflow/pbq"
)

// PBQ store's backend type.
// StoreType is the PBQ store's backend type.
type StoreType string

const (
Expand Down
4 changes: 2 additions & 2 deletions pkg/isb/stores/simplebuffer/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ func (b *InMemoryBuffer) Write(_ context.Context, messages []isb.Message) ([]isb
errs[idx] = nil
b.buffer[currentIdx].dirty = true
b.writeIdx = (currentIdx + 1) % b.size
writeOffsets = append(writeOffsets, isb.SimpleIntOffset(func() int64 {
writeOffsets[idx] = isb.SimpleIntOffset(func() int64 {
return currentIdx
}))
})
// access buffer via lock
b.rwlock.Unlock()
} else {
Expand Down
17 changes: 17 additions & 0 deletions pkg/isb/testutils/rw.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -73,6 +74,22 @@ func BuildTestReadMessages(count int64, startTime time.Time) []isb.ReadMessage {
return readMessages
}

// BuildTestReadMessagesIntOffset builds test isb.ReadMessage values which can be used for testing.
// It wraps the messages returned by BuildTestWriteMessages(count, startTime) and gives each one
// an isb.SimpleIntOffset read offset parsed from the message's Header.ID.
func BuildTestReadMessagesIntOffset(count int64, startTime time.Time) []isb.ReadMessage {
	writeMessages := BuildTestWriteMessages(count, startTime)
	readMessages := make([]isb.ReadMessage, count)

	for idx, writeMessage := range writeMessages {
		// The conversion error is deliberately ignored: this is a test helper with no error
		// return, so on a parse failure the value strconv.Atoi returns alongside the error
		// (0 for malformed input) is used as the offset.
		id, _ := strconv.Atoi(writeMessage.Header.ID)
		// Convert once here so the closure captures a per-message int64.
		offset := int64(id)
		readMessages[idx] = isb.ReadMessage{
			Message:    writeMessage,
			ReadOffset: isb.SimpleIntOffset(func() int64 { return offset }),
		}
	}

	return readMessages
}

// ReadMessagesLen is used to test the length of the messages read as they arrive on the stream.
// If a stream already has 5 elements which have been read, and we then add another 10 elements, the total number would be 15.
func ReadMessagesLen(ctx context.Context, options *redis.UniversalOptions, streamName string, expectedValue int64) bool {
Expand Down
24 changes: 2 additions & 22 deletions pkg/pbq/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ limitations under the License.
package pbq

import (
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/pbq/store"
"time"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
)

type options struct {
Expand All @@ -29,12 +29,6 @@ type options struct {
readTimeout time.Duration
// readBatchSize max size of batch to read from store
readBatchSize int64
// storeOptions options for pbq store
storeOptions *store.StoreOptions
}

// StoreOptions returns the pbq store options held by o.
func (o *options) StoreOptions() *store.StoreOptions {
return o.storeOptions
}

type PBQOption func(options *options) error
Expand All @@ -44,7 +38,6 @@ func DefaultOptions() *options {
channelBufferSize: dfv1.DefaultPBQChannelBufferSize,
readTimeout: dfv1.DefaultPBQReadTimeout,
readBatchSize: dfv1.DefaultPBQReadBatchSize,
storeOptions: store.DefaultOptions(),
}
}

Expand All @@ -71,16 +64,3 @@ func WithReadBatchSize(size int64) PBQOption {
return nil
}
}

// WithPBQStoreOptions returns a PBQOption that applies the given store options, in order,
// to the pbq's store options. It stops at, and returns, the first error an option reports.
func WithPBQStoreOptions(opts ...store.StoreOption) PBQOption {
	return func(options *options) error {
		for _, apply := range opts {
			if err := apply(options.storeOptions); err != nil {
				return err
			}
		}
		return nil
	}
}
7 changes: 2 additions & 5 deletions pkg/pbq/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,23 @@ limitations under the License.
package pbq

import (
"github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/pbq/store"
"github.com/stretchr/testify/assert"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestOptions(t *testing.T) {
testOpts := []PBQOption{
WithReadBatchSize(100),
WithChannelBufferSize(10),
WithReadTimeout(2 * time.Second),
WithPBQStoreOptions(store.WithPbqStoreType(v1alpha1.NoOpType), store.WithStoreSize(1000)),
}

queueOption := &options{
channelBufferSize: 5,
readTimeout: 1,
readBatchSize: 5,
storeOptions: &store.StoreOptions{},
}

for _, opt := range testOpts {
Expand Down
12 changes: 8 additions & 4 deletions pkg/pbq/pbq.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
"context"
"errors"

"go.uber.org/zap"

"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/pbq/partition"
"github.com/numaproj/numaflow/pkg/pbq/store"
"go.uber.org/zap"
)

var ErrCOB = errors.New("error while writing to pbq, pbq is closed")
Expand All @@ -46,8 +47,8 @@ var _ ReadWriteCloser = (*PBQ)(nil)
func (p *PBQ) Write(ctx context.Context, message *isb.ReadMessage) error {
// if cob we should return
if p.cob {
p.log.Errorw("Failed to write message to pbq, pbq is closed", zap.Any("ID", p.PartitionID), zap.Any("header", message.Header))
return ErrCOB
p.log.Errorw("Failed to write message to pbq, pbq is closed", zap.Any("ID", p.PartitionID), zap.Any("header", message.Header), zap.Any("message", message))
return nil
}
var writeErr error
// we need context to get out of blocking write
Expand Down Expand Up @@ -87,9 +88,12 @@ func (p *PBQ) ReadCh() <-chan *isb.ReadMessage {
// finished forwarding the output to ISB.
func (p *PBQ) GC() error {
err := p.store.GC()
if err != nil {
return err
}
p.store = nil
p.manager.deregister(p.PartitionID)
return err
return nil
}

// replayRecordsFromStore replays store messages when replay flag is set during start up time. It replays by reading from
Expand Down
17 changes: 7 additions & 10 deletions pkg/pbq/pbq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,25 @@ import (
"testing"
"time"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/isb/testutils"
"github.com/numaproj/numaflow/pkg/pbq/partition"
"github.com/numaproj/numaflow/pkg/pbq/store"
"github.com/numaproj/numaflow/pkg/pbq/store/memory"
"github.com/stretchr/testify/assert"
)

// test cases for PBQ (store type in-memory)

func TestPBQ_ReadWrite(t *testing.T) {
// create a store of size 100 (it can store max 100 messages)
storeSize := 100
storeSize := int64(100)
// create a pbq with buffer size 5
buffSize := 5

ctx := context.Background()

qManager, _ := NewManager(ctx, WithChannelBufferSize(int64(buffSize)), WithReadTimeout(1*time.Second),
WithPBQStoreOptions(store.WithPbqStoreType(dfv1.InMemoryType), store.WithStoreSize(int64(storeSize))))
qManager, _ := NewManager(ctx, memory.NewMemoryStores(memory.WithStoreSize(storeSize)), WithChannelBufferSize(int64(buffSize)), WithReadTimeout(1*time.Second))

// write 10 isb messages to persisted store
msgCount := 10
Expand Down Expand Up @@ -94,16 +93,15 @@ func TestPBQ_ReadWrite(t *testing.T) {

func Test_PBQReadWithCanceledContext(t *testing.T) {
// create a store of size 100 (it can store max 100 messages)
storeSize := 100
storeSize := int64(100)
//create a pbq with buffer size 10
bufferSize := 10
var err error
var qManager *Manager

ctx := context.Background()

qManager, err = NewManager(ctx, WithChannelBufferSize(int64(bufferSize)), WithReadTimeout(1*time.Second),
WithPBQStoreOptions(store.WithPbqStoreType(dfv1.InMemoryType), store.WithStoreSize(int64(storeSize))))
qManager, err = NewManager(ctx, memory.NewMemoryStores(memory.WithStoreSize(storeSize)), WithChannelBufferSize(int64(bufferSize)), WithReadTimeout(1*time.Second))

assert.NoError(t, err)

Expand Down Expand Up @@ -162,15 +160,14 @@ func Test_PBQReadWithCanceledContext(t *testing.T) {
func TestPBQ_WriteWithStoreFull(t *testing.T) {

// create a store of size 100 (it can store max 100 messages)
storeSize := 100
storeSize := int64(100)
// create a pbq with buffer size 101
buffSize := 101
var qManager *Manager
var err error
ctx := context.Background()

qManager, err = NewManager(ctx, WithChannelBufferSize(int64(buffSize)), WithReadTimeout(1*time.Second),
WithPBQStoreOptions(store.WithPbqStoreType(dfv1.InMemoryType), store.WithStoreSize(int64(storeSize))))
qManager, err = NewManager(ctx, memory.NewMemoryStores(memory.WithStoreSize(storeSize)), WithChannelBufferSize(int64(buffSize)), WithReadTimeout(1*time.Second))
assert.NoError(t, err)

// write 101 isb messages to pbq, but the store size is 100, we should get store is full error
Expand Down
Loading

0 comments on commit 9bb8ebd

Please sign in to comment.