Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay: add async/batch relay writer #7580

Merged
merged 22 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 110 additions & 38 deletions dm/relay/binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package relay

import (
"bytes"
"encoding/json"
"fmt"
"os"
Expand All @@ -27,17 +28,31 @@ import (
"go.uber.org/zap"
)

const (
bufferSize = 1 * 1024 * 1024 // 1MB
chanSize = 1024
)

var nilErr error
lance6716 marked this conversation as resolved.
Show resolved Hide resolved

// BinlogWriter is a binlog event writer which writes binlog events to a file.
// Open/Write/Close cannot be called concurrently.
type BinlogWriter struct {
mu sync.RWMutex

offset atomic.Int64
file *os.File
relayDir string
uuid string
filename string
// offset means the offset we add to binlog writer
offset atomic.Int64
// fileOffset means the offset we finish write to the file
fileOffset atomic.Int64
file *os.File
relayDir string
uuid atomic.String
filename atomic.String
err atomic.Error

logger log.Logger

input chan []byte
flushWg sync.WaitGroup
wg sync.WaitGroup
}

// BinlogWriterStatus represents the status of a BinlogWriter.
Expand All @@ -64,6 +79,54 @@ func NewBinlogWriter(logger log.Logger, relayDir string) *BinlogWriter {
}
}

// run starts the binlog writer.
func (w *BinlogWriter) run() {
var (
buf = &bytes.Buffer{}
errOccurs bool
)

// writeToFile writes buffer to file
writeToFile := func() {
if buf.Len() == 0 {
return
}

if w.file == nil {
w.err.CompareAndSwap(nilErr, terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened")))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we simply use Generate() rather than Delegate?

errOccurs = true
return
}
n, err := w.file.Write(buf.Bytes())
if err != nil {
w.err.CompareAndSwap(nilErr, terror.ErrBinlogWriterWriteDataLen.Delegate(err, n))
errOccurs = true
return
}
w.fileOffset.Add(int64(n))
buf.Reset()
}

for bs := range w.input {
if errOccurs {
continue
}
if bs != nil {
buf.Write(bs)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}
// we use bs = nil to mean flush
if bs == nil || buf.Len() > bufferSize || len(w.input) == 0 {
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
writeToFile()
}
if bs == nil {
w.flushWg.Done()
}
}
if !errOccurs {
writeToFile()
}
}

func (w *BinlogWriter) Open(uuid, filename string) error {
fullName := filepath.Join(w.relayDir, uuid, filename)
f, err := os.OpenFile(fullName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o600)
Expand All @@ -79,58 +142,69 @@ func (w *BinlogWriter) Open(uuid, filename string) error {
return terror.ErrBinlogWriterGetFileStat.Delegate(err, f.Name())
}

w.mu.Lock()
defer w.mu.Unlock()

w.offset.Store(fs.Size())
size := fs.Size()
w.offset.Store(size)
w.fileOffset.Store(size)
w.file = f
w.uuid = uuid
w.filename = filename
w.uuid.Store(uuid)
w.filename.Store(filename)
w.err.Store(nilErr)

w.input = make(chan []byte, chanSize)
w.wg.Add(1)
go func() {
defer w.wg.Done()
w.run()
}()

return nil
}

func (w *BinlogWriter) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.input != nil {
close(w.input)
}
w.wg.Wait()

var err error
if w.file != nil {
err2 := w.file.Sync() // try sync manually before close.
if err2 != nil {
w.logger.Error("fail to flush buffered data", zap.String("component", "file writer"), zap.Error(err2))
if err := w.file.Sync(); err != nil {
w.logger.Error("fail to flush buffered data", zap.String("component", "file writer"), zap.Error(err))
}
if err := w.file.Close(); err != nil {
w.err.CompareAndSwap(nilErr, err)
}
err = w.file.Close()
}

w.file = nil
w.offset.Store(0)
w.uuid = ""
w.filename = ""

return err
w.fileOffset.Store(0)
w.uuid.Store("")
w.filename.Store("")
w.input = nil
return w.err.Swap(nilErr)
}

func (w *BinlogWriter) Write(rawData []byte) error {
w.mu.RLock()
defer w.mu.RUnlock()

if w.file == nil {
return terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened"))
}
w.input <- rawData
w.offset.Add(int64(len(rawData)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BinlogReader will use this offset to check whether there's need to re-parse binlog, maybe move it after we actual write data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we use offset to check whether hole exist, so cannot update it later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fileOffset := w.out.Offset()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use another offset?

return w.err.Load()
}

n, err := w.file.Write(rawData)
w.offset.Add(int64(n))

return terror.ErrBinlogWriterWriteDataLen.Delegate(err, len(rawData))
func (w *BinlogWriter) Flush() error {
w.flushWg.Add(1)
if err := w.Write(nil); err != nil {
return err
}
w.flushWg.Wait()
return w.err.Load()
}

func (w *BinlogWriter) Status() *BinlogWriterStatus {
w.mu.RLock()
defer w.mu.RUnlock()

return &BinlogWriterStatus{
Filename: w.filename,
Filename: w.filename.Load(),
Offset: w.offset.Load(),
}
}
Expand All @@ -140,7 +214,5 @@ func (w *BinlogWriter) Offset() int64 {
}

func (w *BinlogWriter) isActive(uuid, filename string) (bool, int64) {
w.mu.RLock()
defer w.mu.RUnlock()
return uuid == w.uuid && filename == w.filename, w.offset.Load()
return uuid == w.uuid.Load() && filename == w.filename.Load(), w.fileOffset.Load()
}
93 changes: 61 additions & 32 deletions dm/relay/binlog_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,27 @@ import (
"bytes"
"os"
"path/filepath"
"strings"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

var _ = Suite(&testBinlogWriterSuite{})
type testBinlogWriterSuite struct {
suite.Suite
}

type testBinlogWriterSuite struct{}
func TestBinlogWriterSuite(t *testing.T) {
suite.Run(t, new(testBinlogWriterSuite))
}

func (t *testBinlogWriterSuite) TestWrite(c *C) {
dir := c.MkDir()
func (t *testBinlogWriterSuite) TestWrite() {
dir := t.T().TempDir()
uuid := "3ccc475b-2343-11e7-be21-6c0b84d59f30.000001"
binlogDir := filepath.Join(dir, uuid)
c.Assert(os.Mkdir(binlogDir, 0o755), IsNil)
require.NoError(t.T(), os.Mkdir(binlogDir, 0o755))

filename := "test-mysql-bin.000001"
var (
Expand All @@ -41,64 +47,87 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) {

{
w := NewBinlogWriter(log.L(), dir)
c.Assert(w, NotNil)
c.Assert(w.Open(uuid, filename), IsNil)
require.NotNil(t.T(), w)
require.NoError(t.T(), w.Open(uuid, filename))
fwStatus := w.Status()
c.Assert(fwStatus.Filename, Equals, filename)
c.Assert(fwStatus.Offset, Equals, int64(allData.Len()))
require.Equal(t.T(), filename, fwStatus.Filename)
require.Equal(t.T(), int64(allData.Len()), fwStatus.Offset)
fwStatusStr := fwStatus.String()
c.Assert(strings.Contains(fwStatusStr, "filename"), IsTrue)
c.Assert(w.Close(), IsNil)
require.Contains(t.T(), fwStatusStr, filename)
require.NoError(t.T(), w.Close())
}

{
// not opened
w := NewBinlogWriter(log.L(), dir)
err := w.Write(data1)
c.Assert(err, ErrorMatches, "*not opened")
require.Contains(t.T(), err.Error(), "not opened")

// open non exist dir
err = w.Open("not-exist-uuid", "bin.000001")
c.Assert(err, ErrorMatches, "*no such file or directory")
require.Contains(t.T(), err.Error(), "no such file or directory")
}

{
// normal call flow
w := NewBinlogWriter(log.L(), dir)
err := w.Open(uuid, filename)
c.Assert(err, IsNil)
c.Assert(w.file, NotNil)
c.Assert(w.filename, Equals, filename)
c.Assert(w.offset.Load(), Equals, int64(0))
require.NoError(t.T(), err)
require.NotNil(t.T(), w.file)
require.Equal(t.T(), filename, w.filename.Load())
require.Equal(t.T(), int64(0), w.offset.Load())

err = w.Write(data1)
c.Assert(err, IsNil)
require.NoError(t.T(), err)
err = w.Flush()
require.NoError(t.T(), err)
allData.Write(data1)

fwStatus := w.Status()
c.Assert(fwStatus.Filename, Equals, filename)
c.Assert(fwStatus.Offset, Equals, int64(len(data1)))
require.Equal(t.T(), fwStatus.Filename, w.filename.Load())
require.Equal(t.T(), int64(len(data1)), fwStatus.Offset)

// write data again
data2 := []byte("another-data")
err = w.Write(data2)
c.Assert(err, IsNil)
require.NoError(t.T(), err)
allData.Write(data2)

c.Assert(w.offset.Load(), Equals, int64(allData.Len()))
require.LessOrEqual(t.T(), int64(allData.Len()), w.offset.Load())

err = w.Close()
c.Assert(err, IsNil)
c.Assert(w.file, IsNil)
c.Assert(w.filename, Equals, "")
c.Assert(w.offset.Load(), Equals, int64(0))

c.Assert(w.Close(), IsNil) // noop
require.NoError(t.T(), err)
require.Nil(t.T(), w.file)
require.Equal(t.T(), "", w.filename.Load())
require.Equal(t.T(), int64(0), w.offset.Load())

// try to read the data back
fullName := filepath.Join(binlogDir, filename)
dataInFile, err := os.ReadFile(fullName)
c.Assert(err, IsNil)
c.Assert(dataInFile, DeepEquals, allData.Bytes())
require.NoError(t.T(), err)
require.Equal(t.T(), allData.Bytes(), dataInFile)
}

{
// cover for error
w := NewBinlogWriter(log.L(), dir)
err := w.Open(uuid, filename)
require.NoError(t.T(), err)
require.NotNil(t.T(), w.file)

err = w.Write(data1)
require.NoError(t.T(), err)
err = w.Flush()
require.NoError(t.T(), err)

require.NoError(t.T(), w.file.Close())
// write data again
data2 := []byte("another-data")
// we cannot determine the error is caused by `Write` or `Flush`
// nolint:errcheck
w.Write(data2)
// nolint:errcheck
w.Flush()
require.True(t.T(), terror.ErrBinlogWriterWriteDataLen.Equal(w.Close()))
}
}
8 changes: 7 additions & 1 deletion dm/relay/local_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ func newBinlogReaderForTest(logger log.Logger, cfg *BinlogReaderConfig, notify b
func (t *testReaderSuite) setActiveRelayLog(r Process, uuid, filename string, offset int64) {
relay := r.(*Relay)
writer := relay.writer.(*FileWriter)
writer.out.uuid, writer.out.filename = uuid, filename
writer.out.uuid.Store(uuid)
writer.out.filename.Store(filename)
writer.out.offset.Store(offset)
writer.out.fileOffset.Store(offset)
}

func (t *testReaderSuite) createBinlogFileParseState(c *C, relayLogDir, relayLogFile string, offset int64, possibleLast bool) *binlogFileParseState {
Expand Down Expand Up @@ -1234,6 +1236,10 @@ func (m *mockFileWriterForActiveTest) Close() error {
panic("should be used")
}

func (m *mockFileWriterForActiveTest) Flush() error {
panic("should be used")
}

func (m *mockFileWriterForActiveTest) WriteEvent(ev *replication.BinlogEvent) (WResult, error) {
panic("should be used")
}
Expand Down
Loading