diff --git a/go.mod b/go.mod index 4292b67f1..2d62ca2ab 100644 --- a/go.mod +++ b/go.mod @@ -51,6 +51,7 @@ require ( google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c google.golang.org/grpc v1.51.0 google.golang.org/protobuf v1.28.1 + gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/apimachinery v0.25.0 k8s.io/client-go v0.25.0 @@ -143,7 +144,6 @@ require ( golang.org/x/text v0.5.0 // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/yaml v1.2.0 // indirect ) diff --git a/internal/primitive/container/conque/blocking/queue.go b/internal/primitive/container/conque/blocking/queue.go index 42820ad10..6dd21e72e 100644 --- a/internal/primitive/container/conque/blocking/queue.go +++ b/internal/primitive/container/conque/blocking/queue.go @@ -15,14 +15,20 @@ package blocking import ( + // standard libraries. + stdsync "sync" + "sync/atomic" + // this project. "github.com/linkall-labs/vanus/internal/primitive/container/conque/unbounded" "github.com/linkall-labs/vanus/internal/primitive/sync" ) type Queue[T any] struct { - q unbounded.Queue[T] - sem sync.Semaphore + q unbounded.Queue[T] + sem sync.Semaphore + mu stdsync.RWMutex + state int32 } func New[T any](handoff bool) *Queue[T] { @@ -35,18 +41,46 @@ func (q *Queue[T]) Init(handoff bool) *Queue[T] { } func (q *Queue[T]) Close() { - // FIXME(james.yin): implements this. + atomic.StoreInt32(&q.state, 1) + q.sem.Release() } -func (q *Queue[T]) Push(v T) { +// Wait esures that all incoming Pushes observe that the queue is closed. +func (q *Queue[T]) Wait() { + // Make sure no inflight Push. + q.mu.Lock() + + // no op + _ = 1 + + q.mu.Unlock() +} + +func (q *Queue[T]) Push(v T) bool { + // NOTE: no panic, avoid unlocking with defer. + q.mu.RLock() + + // TODO: maybe atomic is unnecessary. + if atomic.LoadInt32(&q.state) != 0 { + q.mu.RUnlock() + return false + } + _ = q.q.Push(v) q.sem.Release() + q.mu.RUnlock() + return true } func (q *Queue[T]) SharedPop() (T, bool) { q.sem.Acquire() - // FIXME(james.yin): check close. + // Check close. + if atomic.LoadInt32(&q.state) != 0 { + q.sem.Release() + var v T + return v, false + } for { v, ok := q.q.SharedPop() @@ -59,7 +93,12 @@ func (q *Queue[T]) SharedPop() (T, bool) { func (q *Queue[T]) UniquePop() (T, bool) { q.sem.Acquire() - // FIXME(james.yin): check close. + // Check close. + if atomic.LoadInt32(&q.state) != 0 { + q.sem.Release() + var v T + return v, false + } for { v, _, ok := q.q.UniquePop() @@ -68,3 +107,8 @@ func (q *Queue[T]) UniquePop() (T, bool) { } } } + +func (q *Queue[T]) RawPop() (T, bool) { + v, _, ok := q.q.UniquePop() + return v, ok +} diff --git a/internal/store/wal/config.go b/internal/store/wal/config.go index 5b488df8d..de6953273 100644 --- a/internal/store/wal/config.go +++ b/internal/store/wal/config.go @@ -32,13 +32,12 @@ const ( ) type config struct { - pos int64 - cb OnEntryCallback - blockSize int - fileSize int64 - flushDelayTime time.Duration // default: 3 * time.Millisecond - appendBufferSize int - engine ioengine.Interface + pos int64 + cb OnEntryCallback + blockSize int + fileSize int64 + flushDelayTime time.Duration // default: 3 * time.Millisecond + engine ioengine.Interface } func (cfg *config) segmentedFileOptions() []segmentedfile.Option { @@ -62,9 +61,8 @@ func (cfg *config) streamSchedulerOptions() []stream.Option { func defaultConfig() config { cfg := config{ - blockSize: defaultBlockSize, - fileSize: defaultFileSize, - appendBufferSize: defaultAppendBufferSize, + blockSize: defaultBlockSize, + fileSize: defaultFileSize, } return cfg } @@ -112,12 +110,6 @@ func WithFlushDelayTime(d time.Duration) Option { } } -func WithAppendBufferSize(size int) Option { - return func(cfg *config) { - cfg.appendBufferSize = size - } -} - func WithIOEngine(engine ioengine.Interface) Option { return func(cfg *config) { cfg.engine = engine diff --git a/internal/store/wal/config_test.go b/internal/store/wal/config_test.go index 9905fa877..c7f0af2fa 100644 --- a/internal/store/wal/config_test.go +++ b/internal/store/wal/config_test.go @@ -36,7 +36,6 @@ func TestConfig(t *testing.T) { WithBlockSize(1024), WithFileSize(4*1024*1024), WithFlushDelayTime(5*time.Millisecond), - WithAppendBufferSize(1), WithIOEngine(engine), ) @@ -44,7 +43,6 @@ func TestConfig(t *testing.T) { So(cfg.blockSize, ShouldEqual, 1024) So(cfg.fileSize, ShouldEqual, 4*1024*1024) So(cfg.flushDelayTime, ShouldEqual, 5*time.Millisecond) - So(cfg.appendBufferSize, ShouldEqual, 1) So(cfg.engine, ShouldEqual, engine) }) } diff --git a/internal/store/wal/wal.go b/internal/store/wal/wal.go index 59cc0b5e2..bff24d901 100644 --- a/internal/store/wal/wal.go +++ b/internal/store/wal/wal.go @@ -24,6 +24,7 @@ import ( "github.com/linkall-labs/vanus/observability/log" // this project. + "github.com/linkall-labs/vanus/internal/primitive/container/conque/blocking" "github.com/linkall-labs/vanus/internal/store/io/engine" "github.com/linkall-labs/vanus/internal/store/io/stream" "github.com/linkall-labs/vanus/internal/store/io/zone/segmentedfile" @@ -54,13 +55,11 @@ type WAL struct { blockSize int - appendC chan *appender + appendQ blocking.Queue[*appender] - closeMu sync.RWMutex appendWg sync.WaitGroup - closeC chan struct{} - doneC chan struct{} + doneC chan struct{} } func Open(ctx context.Context, dir string, opts ...Option) (*WAL, error) { @@ -106,11 +105,10 @@ func open(ctx context.Context, dir string, cfg config) (*WAL, error) { scheduler: scheduler, blockSize: cfg.blockSize, - appendC: make(chan *appender, cfg.appendBufferSize), - closeC: make(chan struct{}), - doneC: make(chan struct{}), + doneC: make(chan struct{}), } + w.appendQ.Init(false) go w.runAppend() return w, nil @@ -121,15 +119,7 @@ func (w *WAL) Dir() string { } func (w *WAL) Close() { - w.closeMu.Lock() - defer w.closeMu.Unlock() - - select { - case <-w.closeC: - default: - close(w.closeC) - close(w.appendC) - } + w.appendQ.Close() } func (w *WAL) doClose() { @@ -164,20 +154,31 @@ func (w *WAL) append(ctx context.Context, entries [][]byte, direct bool, cb Appe cb(nil, nil) } - // NOTE: Can not close the WAL while writing to appendC. - w.closeMu.RLock() - select { - case <-w.closeC: + if !w.appendQ.Push(w.newAppender(ctx, entries, direct, cb)) { // TODO(james.yin): invoke callback in another goroutine. cb(nil, ErrClosed) - default: - w.appendC <- w.newAppender(ctx, entries, direct, cb) } - w.closeMu.RUnlock() } func (w *WAL) runAppend() { - for task := range w.appendC { + for { + task, ok := w.appendQ.UniquePop() + if !ok { + break + } + + task.invoke() + } + + w.appendQ.Wait() + + // Invoke remaind tasks in w.appendQ. + for { + task, ok := w.appendQ.RawPop() + if !ok { + break + } + task.invoke() } diff --git a/test/wal/walbench.go b/test/wal/walbench.go new file mode 100644 index 000000000..581db4cc3 --- /dev/null +++ b/test/wal/walbench.go @@ -0,0 +1,125 @@ +// Copyright 2023 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + // standard libraries. + "context" + "fmt" + "log" + stdsync "sync" + "sync/atomic" + "time" + + // this project. + "github.com/linkall-labs/vanus/internal/primitive/sync" + walog "github.com/linkall-labs/vanus/internal/store/wal" +) + +const ( + N = 500000 + PayloadSize = 1024 +) + +func main() { + ctx := context.Background() + + wal, err := walog.Open( + ctx, + "wal-test", + // walog.WithBlockSize(4*1024), + // walog.WithFileSize(1024*1024*1024), + walog.WithFlushDelayTime(10*time.Millisecond), + // walog.WithIOEngine(uring.New()), + // walog.WithIOEngine(psync.New(psync.WithParallel(8))), + ) + if err != nil { + panic(err) + } + + defer func() { + wal.Close() + wal.Wait() + }() + + payload := generatePayload(PayloadSize) + + var count, last int64 + var totalCost, lastCost int64 + var totalWrite, lastWrite int64 + + go func() { + for { + time.Sleep(time.Second) + cu := atomic.LoadInt64(&count) + ct := atomic.LoadInt64(&totalCost) + cw := atomic.LoadInt64(&totalWrite) + if n := cu - last; n != 0 { + log.Printf("TPS: %d\tlatency: %.3fms\tthroughput: %.2fMiB/s\n", + n, float64(ct-lastCost)/float64(n)/1000, float64(cw-lastWrite)/1024/1024) + } else { + log.Printf("TPS: %d\tlatency: NaN\n", n) + } + last = cu + lastCost = ct + lastWrite = cw + } + }() + + start := time.Now() + + var sema sync.Semaphore + for i := 0; i < 256; i++ { + sema.Release() + } + + var wg stdsync.WaitGroup + wg.Add(N) + for i := 0; i < N; i++ { + sema.Acquire() + st := time.Now() + wal.AppendOne(ctx, payload, func(r walog.Range, err error) { + cost := time.Since(st) + if err != nil { + log.Printf("err: %v", err) + } else { + atomic.AddInt64(&count, 1) + atomic.AddInt64(&totalCost, cost.Microseconds()) + atomic.AddInt64(&totalWrite, r.EO-r.SO) + } + wg.Done() + sema.Release() + }) + } + wg.Wait() + + cost := time.Since(start) + fmt.Printf("write: %.2f MiB\n", float64(totalCost)/1024/1024) + fmt.Printf("cost: %d ms\n", cost.Milliseconds()) + fmt.Printf("failed: %d\n", N-count) + fmt.Printf("tps: %f\n", float64(count)/cost.Seconds()) +} + +func generatePayload(size int) []byte { + data := func() string { + str := "" + for idx := 0; idx < size-1; idx++ { + str += "a" + } + str += "\n" + return str + }() + return []byte(data) +}