Skip to content

Commit

Permalink
feat(store): use blocking queue in wal appending
Browse files Browse the repository at this point in the history
Signed-off-by: James Yin <[email protected]>
  • Loading branch information
ifplusor committed Feb 23, 2023
1 parent 9b84776 commit 52eae4c
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 49 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ require (
google.golang.org/genproto v0.0.0-20221027153422-115e99e71e1c
google.golang.org/grpc v1.51.0
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/apimachinery v0.25.0
k8s.io/client-go v0.25.0
Expand Down Expand Up @@ -143,7 +144,6 @@ require (
golang.org/x/text v0.5.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)
Expand Down
56 changes: 50 additions & 6 deletions internal/primitive/container/conque/blocking/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,20 @@
package blocking

import (
// standard libraries.
stdsync "sync"
"sync/atomic"

// this project.
"github.com/linkall-labs/vanus/internal/primitive/container/conque/unbounded"
"github.com/linkall-labs/vanus/internal/primitive/sync"
)

type Queue[T any] struct {
q unbounded.Queue[T]
sem sync.Semaphore
q unbounded.Queue[T]
sem sync.Semaphore
mu stdsync.RWMutex
state int32
}

func New[T any](handoff bool) *Queue[T] {
Expand All @@ -35,18 +41,46 @@ func (q *Queue[T]) Init(handoff bool) *Queue[T] {
}

func (q *Queue[T]) Close() {
// FIXME(james.yin): implements this.
atomic.StoreInt32(&q.state, 1)
q.sem.Release()
}

func (q *Queue[T]) Push(v T) {
// Wait esures that all incoming Pushes observe that the queue is closed.
func (q *Queue[T]) Wait() {
// Make sure no inflight Push.
q.mu.Lock()

// no op
_ = 1

q.mu.Unlock()
}

func (q *Queue[T]) Push(v T) bool {
// NOTE: no panic, avoid unlocking with defer.
q.mu.RLock()

// TODO: maybe atomic is unnecessary.
if atomic.LoadInt32(&q.state) != 0 {
q.mu.RUnlock()
return false
}

_ = q.q.Push(v)
q.sem.Release()
q.mu.RUnlock()
return true
}

func (q *Queue[T]) SharedPop() (T, bool) {
q.sem.Acquire()

// FIXME(james.yin): check close.
// Check close.
if atomic.LoadInt32(&q.state) != 0 {
q.sem.Release()
var v T
return v, false
}

for {
v, ok := q.q.SharedPop()
Expand All @@ -59,7 +93,12 @@ func (q *Queue[T]) SharedPop() (T, bool) {
func (q *Queue[T]) UniquePop() (T, bool) {
q.sem.Acquire()

// FIXME(james.yin): check close.
// Check close.
if atomic.LoadInt32(&q.state) != 0 {
q.sem.Release()
var v T
return v, false
}

for {
v, _, ok := q.q.UniquePop()
Expand All @@ -68,3 +107,8 @@ func (q *Queue[T]) UniquePop() (T, bool) {
}
}
}

func (q *Queue[T]) RawPop() (T, bool) {
v, _, ok := q.q.UniquePop()
return v, ok
}
24 changes: 8 additions & 16 deletions internal/store/wal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@ const (
)

type config struct {
pos int64
cb OnEntryCallback
blockSize int
fileSize int64
flushDelayTime time.Duration // default: 3 * time.Millisecond
appendBufferSize int
engine ioengine.Interface
pos int64
cb OnEntryCallback
blockSize int
fileSize int64
flushDelayTime time.Duration // default: 3 * time.Millisecond
engine ioengine.Interface
}

func (cfg *config) segmentedFileOptions() []segmentedfile.Option {
Expand All @@ -62,9 +61,8 @@ func (cfg *config) streamSchedulerOptions() []stream.Option {

func defaultConfig() config {
cfg := config{
blockSize: defaultBlockSize,
fileSize: defaultFileSize,
appendBufferSize: defaultAppendBufferSize,
blockSize: defaultBlockSize,
fileSize: defaultFileSize,
}
return cfg
}
Expand Down Expand Up @@ -112,12 +110,6 @@ func WithFlushDelayTime(d time.Duration) Option {
}
}

func WithAppendBufferSize(size int) Option {
return func(cfg *config) {
cfg.appendBufferSize = size
}
}

func WithIOEngine(engine ioengine.Interface) Option {
return func(cfg *config) {
cfg.engine = engine
Expand Down
2 changes: 0 additions & 2 deletions internal/store/wal/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,13 @@ func TestConfig(t *testing.T) {
WithBlockSize(1024),
WithFileSize(4*1024*1024),
WithFlushDelayTime(5*time.Millisecond),
WithAppendBufferSize(1),
WithIOEngine(engine),
)

So(cfg.pos, ShouldEqual, 1024)
So(cfg.blockSize, ShouldEqual, 1024)
So(cfg.fileSize, ShouldEqual, 4*1024*1024)
So(cfg.flushDelayTime, ShouldEqual, 5*time.Millisecond)
So(cfg.appendBufferSize, ShouldEqual, 1)
So(cfg.engine, ShouldEqual, engine)
})
}
49 changes: 25 additions & 24 deletions internal/store/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/linkall-labs/vanus/observability/log"

// this project.
"github.com/linkall-labs/vanus/internal/primitive/container/conque/blocking"
"github.com/linkall-labs/vanus/internal/store/io/engine"
"github.com/linkall-labs/vanus/internal/store/io/stream"
"github.com/linkall-labs/vanus/internal/store/io/zone/segmentedfile"
Expand Down Expand Up @@ -54,13 +55,11 @@ type WAL struct {

blockSize int

appendC chan *appender
appendQ blocking.Queue[*appender]

closeMu sync.RWMutex
appendWg sync.WaitGroup

closeC chan struct{}
doneC chan struct{}
doneC chan struct{}
}

func Open(ctx context.Context, dir string, opts ...Option) (*WAL, error) {
Expand Down Expand Up @@ -106,11 +105,10 @@ func open(ctx context.Context, dir string, cfg config) (*WAL, error) {
scheduler: scheduler,
blockSize: cfg.blockSize,

appendC: make(chan *appender, cfg.appendBufferSize),
closeC: make(chan struct{}),
doneC: make(chan struct{}),
doneC: make(chan struct{}),
}

w.appendQ.Init(false)
go w.runAppend()

return w, nil
Expand All @@ -121,15 +119,7 @@ func (w *WAL) Dir() string {
}

func (w *WAL) Close() {
w.closeMu.Lock()
defer w.closeMu.Unlock()

select {
case <-w.closeC:
default:
close(w.closeC)
close(w.appendC)
}
w.appendQ.Close()
}

func (w *WAL) doClose() {
Expand Down Expand Up @@ -164,20 +154,31 @@ func (w *WAL) append(ctx context.Context, entries [][]byte, direct bool, cb Appe
cb(nil, nil)
}

// NOTE: Can not close the WAL while writing to appendC.
w.closeMu.RLock()
select {
case <-w.closeC:
if !w.appendQ.Push(w.newAppender(ctx, entries, direct, cb)) {
// TODO(james.yin): invoke callback in another goroutine.
cb(nil, ErrClosed)
default:
w.appendC <- w.newAppender(ctx, entries, direct, cb)
}
w.closeMu.RUnlock()
}

func (w *WAL) runAppend() {
for task := range w.appendC {
for {
task, ok := w.appendQ.UniquePop()
if !ok {
break
}

task.invoke()
}

w.appendQ.Wait()

// Invoke remaind tasks in w.appendQ.
for {
task, ok := w.appendQ.RawPop()
if !ok {
break
}

task.invoke()
}

Expand Down
125 changes: 125 additions & 0 deletions test/wal/walbench.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2023 Linkall Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
// standard libraries.
"context"
"fmt"
"log"
stdsync "sync"
"sync/atomic"
"time"

// this project.
"github.com/linkall-labs/vanus/internal/primitive/sync"
walog "github.com/linkall-labs/vanus/internal/store/wal"
)

const (
N = 500000
PayloadSize = 1024
)

func main() {
ctx := context.Background()

wal, err := walog.Open(
ctx,
"wal-test",
// walog.WithBlockSize(4*1024),
// walog.WithFileSize(1024*1024*1024),
walog.WithFlushDelayTime(10*time.Millisecond),
// walog.WithIOEngine(uring.New()),
// walog.WithIOEngine(psync.New(psync.WithParallel(8))),
)
if err != nil {
panic(err)
}

defer func() {
wal.Close()
wal.Wait()
}()

payload := generatePayload(PayloadSize)

var count, last int64
var totalCost, lastCost int64
var totalWrite, lastWrite int64

go func() {
for {
time.Sleep(time.Second)
cu := atomic.LoadInt64(&count)
ct := atomic.LoadInt64(&totalCost)
cw := atomic.LoadInt64(&totalWrite)
if n := cu - last; n != 0 {
log.Printf("TPS: %d\tlatency: %.3fms\tthroughput: %.2fMiB/s\n",
n, float64(ct-lastCost)/float64(n)/1000, float64(cw-lastWrite)/1024/1024)
} else {
log.Printf("TPS: %d\tlatency: NaN\n", n)
}
last = cu
lastCost = ct
lastWrite = cw
}
}()

start := time.Now()

var sema sync.Semaphore
for i := 0; i < 256; i++ {
sema.Release()
}

var wg stdsync.WaitGroup
wg.Add(N)
for i := 0; i < N; i++ {
sema.Acquire()
st := time.Now()
wal.AppendOne(ctx, payload, func(r walog.Range, err error) {
cost := time.Since(st)
if err != nil {
log.Printf("err: %v", err)
} else {
atomic.AddInt64(&count, 1)
atomic.AddInt64(&totalCost, cost.Microseconds())
atomic.AddInt64(&totalWrite, r.EO-r.SO)
}
wg.Done()
sema.Release()
})
}
wg.Wait()

cost := time.Since(start)
fmt.Printf("write: %.2f MiB\n", float64(totalCost)/1024/1024)
fmt.Printf("cost: %d ms\n", cost.Milliseconds())
fmt.Printf("failed: %d\n", N-count)
fmt.Printf("tps: %f\n", float64(count)/cost.Seconds())
}

func generatePayload(size int) []byte {
data := func() string {
str := ""
for idx := 0; idx < size-1; idx++ {
str += "a"
}
str += "\n"
return str
}()
return []byte(data)
}

0 comments on commit 52eae4c

Please sign in to comment.