Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay: add async/batch relay writer #7580

Merged
merged 22 commits into from
Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 96 additions & 34 deletions dm/relay/binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package relay

import (
"bytes"
"encoding/json"
"fmt"
"os"
Expand All @@ -27,17 +28,26 @@ import (
"go.uber.org/zap"
)

const (
bufferSize = 1 * 1024 * 1024 // 1MB
chanSize = 1024
)

// BinlogWriter is a binlog event writer which writes binlog events to a file.
// Open/Write/Close cannot be called concurrently.
type BinlogWriter struct {
mu sync.RWMutex

offset atomic.Int64
file *os.File
relayDir string
uuid string
filename string
uuid atomic.String
filename atomic.String
err atomic.Error

logger log.Logger

input chan []byte
flushWg sync.WaitGroup
wg sync.WaitGroup
}

// BinlogWriterStatus represents the status of a BinlogWriter.
Expand All @@ -64,6 +74,53 @@ func NewBinlogWriter(logger log.Logger, relayDir string) *BinlogWriter {
}
}

// run starts the binlog writer.
func (w *BinlogWriter) run() {
var (
buf = &bytes.Buffer{}
errOccurs bool
)

// writeToFile writes buffer to file
writeToFile := func() {
if buf.Len() == 0 {
return
}

if w.file == nil {
w.err.CompareAndSwap(nil, terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened")))
errOccurs = true
return
}
n, err := w.file.Write(buf.Bytes())
if err != nil {
w.err.CompareAndSwap(nil, terror.ErrBinlogWriterWriteDataLen.Delegate(err, n))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry seems the nil for CAS is not what I expected

func TestCAS(t *testing.T) {
	var nilErr error
	e := atomic.NewError(nil)
	e.Store(nilErr)
	require.NoError(t, e.Load())
	ok := e.CompareAndSwap(nilErr, errors.New("test"))
	require.True(t, ok)
}

Can we use the nilErr variable or simply Store? but in the latter we will store the last error not the first one.

errOccurs = true
return
}
buf.Reset()
}

for bs := range w.input {
if errOccurs {
continue
}
if bs != nil {
buf.Write(bs)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
}
// we use bs = nil to mean flush
if bs == nil || buf.Len() > bufferSize || len(w.input) == 0 {
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
writeToFile()
}
if bs == nil {
w.flushWg.Done()
}
}
if !errOccurs {
writeToFile()
}
}

func (w *BinlogWriter) Open(uuid, filename string) error {
fullName := filepath.Join(w.relayDir, uuid, filename)
f, err := os.OpenFile(fullName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o600)
Expand All @@ -79,58 +136,65 @@ func (w *BinlogWriter) Open(uuid, filename string) error {
return terror.ErrBinlogWriterGetFileStat.Delegate(err, f.Name())
}

w.mu.Lock()
defer w.mu.Unlock()

w.offset.Store(fs.Size())
w.file = f
w.uuid = uuid
w.filename = filename
w.uuid.Store(uuid)
w.filename.Store(filename)

w.input = make(chan []byte, chanSize)
w.wg.Add(1)
go func() {
defer w.wg.Done()
w.run()
}()

return nil
}

func (w *BinlogWriter) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.input != nil {
close(w.input)
}
w.wg.Wait()

var err error
if w.file != nil {
err2 := w.file.Sync() // try sync manually before close.
if err2 != nil {
w.logger.Error("fail to flush buffered data", zap.String("component", "file writer"), zap.Error(err2))
if err := w.file.Sync(); err != nil {
w.logger.Error("fail to flush buffered data", zap.String("component", "file writer"), zap.Error(err))
}
if err := w.file.Close(); err != nil {
w.err.CompareAndSwap(nil, err)
}
err = w.file.Close()
}

w.file = nil
w.offset.Store(0)
w.uuid = ""
w.filename = ""

return err
w.uuid.Store("")
w.filename.Store("")
w.input = nil
return w.err.Swap(nil)
}

func (w *BinlogWriter) Write(rawData []byte) error {
w.mu.RLock()
defer w.mu.RUnlock()

if w.file == nil {
return terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened"))
}
w.input <- rawData
w.offset.Add(int64(len(rawData)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BinlogReader will use this offset to check whether there's need to re-parse binlog, maybe move it after we actual write data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we use offset to check whether hole exist, so cannot update it later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fileOffset := w.out.Offset()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use another offset?

return w.err.Load()
}

n, err := w.file.Write(rawData)
w.offset.Add(int64(n))

return terror.ErrBinlogWriterWriteDataLen.Delegate(err, len(rawData))
func (w *BinlogWriter) Flush() error {
w.flushWg.Add(1)
if err := w.Write(nil); err != nil {
return err
}
w.flushWg.Wait()
return w.err.Load()
}

func (w *BinlogWriter) Status() *BinlogWriterStatus {
w.mu.RLock()
defer w.mu.RUnlock()

return &BinlogWriterStatus{
Filename: w.filename,
Filename: w.filename.Load(),
Offset: w.offset.Load(),
}
}
Expand All @@ -140,7 +204,5 @@ func (w *BinlogWriter) Offset() int64 {
}

func (w *BinlogWriter) isActive(uuid, filename string) (bool, int64) {
w.mu.RLock()
defer w.mu.RUnlock()
return uuid == w.uuid && filename == w.filename, w.offset.Load()
return uuid == w.uuid.Load() && filename == w.filename.Load(), w.offset.Load()
}
8 changes: 5 additions & 3 deletions dm/relay/binlog_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) {
err := w.Open(uuid, filename)
c.Assert(err, IsNil)
c.Assert(w.file, NotNil)
c.Assert(w.filename, Equals, filename)
c.Assert(w.filename.Load(), Equals, filename)
c.Assert(w.offset.Load(), Equals, int64(0))

err = w.Write(data1)
c.Assert(err, IsNil)
err = w.Flush()
c.Assert(err, IsNil)
allData.Write(data1)

fwStatus := w.Status()
Expand All @@ -85,12 +87,12 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) {
c.Assert(err, IsNil)
allData.Write(data2)

c.Assert(w.offset.Load(), Equals, int64(allData.Len()))
c.Assert(w.offset.Load(), LessEqual, int64(allData.Len()))

err = w.Close()
c.Assert(err, IsNil)
c.Assert(w.file, IsNil)
c.Assert(w.filename, Equals, "")
c.Assert(w.filename.Load(), Equals, "")
c.Assert(w.offset.Load(), Equals, int64(0))

c.Assert(w.Close(), IsNil) // noop
Expand Down
7 changes: 6 additions & 1 deletion dm/relay/local_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func newBinlogReaderForTest(logger log.Logger, cfg *BinlogReaderConfig, notify b
func (t *testReaderSuite) setActiveRelayLog(r Process, uuid, filename string, offset int64) {
relay := r.(*Relay)
writer := relay.writer.(*FileWriter)
writer.out.uuid, writer.out.filename = uuid, filename
writer.out.uuid.Store(uuid)
writer.out.filename.Store(filename)
writer.out.offset.Store(offset)
}

Expand Down Expand Up @@ -1234,6 +1235,10 @@ func (m *mockFileWriterForActiveTest) Close() error {
panic("should be used")
}

func (m *mockFileWriterForActiveTest) Flush() error {
panic("should be used")
}

func (m *mockFileWriterForActiveTest) WriteEvent(ev *replication.BinlogEvent) (WResult, error) {
panic("should be used")
}
Expand Down
6 changes: 4 additions & 2 deletions dm/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,8 @@ func (r *Relay) handleEvents(
}

firstEvent := true
relayPosGauge := relayLogPosGauge.WithLabelValues("relay")
relayFileGauge := relayLogFileGauge.WithLabelValues("relay")
for {
// 1. read events from upstream server
readTimer := time.Now()
Expand Down Expand Up @@ -704,11 +706,11 @@ func (r *Relay) handleEvents(
}

relayLogWriteSizeHistogram.Observe(float64(e.Header.EventSize))
relayLogPosGauge.WithLabelValues("relay").Set(float64(lastPos.Pos))
relayPosGauge.Set(float64(lastPos.Pos))
if index, err2 := utils.GetFilenameIndex(lastPos.Name); err2 != nil {
r.logger.Error("parse binlog file name", zap.String("file name", lastPos.Name), log.ShortError(err2))
} else {
relayLogFileGauge.WithLabelValues("relay").Set(float64(index))
relayFileGauge.Set(float64(index))
}

if needSavePos {
Expand Down
14 changes: 14 additions & 0 deletions dm/relay/relay_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type Writer interface {
WriteEvent(ev *replication.BinlogEvent) (WResult, error)
// IsActive check whether given uuid+filename is active binlog file, if true return current file offset
IsActive(uuid, filename string) (bool, int64)
// Flush flushes the binlog writer.
Flush() error
}

// FileWriter implements Writer interface.
Expand Down Expand Up @@ -104,6 +106,11 @@ func (w *FileWriter) WriteEvent(ev *replication.BinlogEvent) (WResult, error) {
}
}

// Flush implements Writer.Flush.
func (w *FileWriter) Flush() error {
return w.out.Flush()
}

// offset returns the current offset of the binlog file.
// it is only used for testing now.
func (w *FileWriter) offset() int64 {
Expand Down Expand Up @@ -145,6 +152,10 @@ func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) (
if err != nil {
return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName)
}
err = w.Flush()
if err != nil {
return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName)
return WResult{}, terror.Annotatef(err, "flush binlog file for %s", fullName)

}
}

// write the FormatDescriptionEvent if not exists one
Expand Down Expand Up @@ -277,6 +288,9 @@ func (w *FileWriter) handlePotentialHoleOrDuplicate(ev *replication.BinlogEvent)
}

if mayDuplicate {
if err := w.Flush(); err != nil {
return WResult{}, terror.Annotatef(err, "flush before handle duplicate event %v in %s", ev.Header, w.filename.Load())
}
// handle any duplicate events if exist
result, err2 := w.handleDuplicateEventsExist(ev)
if err2 != nil {
Expand Down
4 changes: 4 additions & 0 deletions dm/relay/relay_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check
filename2 := filepath.Join(relayDir, uuid, nextFilename)
_, err = os.Stat(filename1)
c.Assert(os.IsNotExist(err), check.IsTrue)
c.Assert(w2.Flush(), check.IsNil)
data, err := os.ReadFile(filename2)
c.Assert(err, check.IsNil)
fileHeaderLen := len(replication.BinLogFileHeader)
Expand Down Expand Up @@ -292,6 +293,7 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check
filename2 = filepath.Join(relayDir, uuid, nextFilename)
_, err = os.Stat(filename2)
c.Assert(os.IsNotExist(err), check.IsTrue)
c.Assert(w3.Flush(), check.IsNil)
data, err = os.ReadFile(filename1)
c.Assert(err, check.IsNil)
c.Assert(len(data), check.Equals, fileHeaderLen+len(formatDescEv.RawData))
Expand Down Expand Up @@ -399,6 +401,7 @@ func (t *testFileWriterSuite) TestWriteMultiEvents(c *check.C) {
c.Assert(result.Ignore, check.IsFalse) // no event is ignored
}

c.Assert(w.Flush(), check.IsNil)
t.verifyFilenameOffset(c, w, filename, int64(allData.Len()))

// read the data back from the file
Expand Down Expand Up @@ -448,6 +451,7 @@ func (t *testFileWriterSuite) TestHandleFileHoleExist(c *check.C) {
result, err = w.WriteEvent(queryEv)
c.Assert(err, check.IsNil)
c.Assert(result.Ignore, check.IsFalse)
c.Assert(w.Flush(), check.IsNil)
fileSize := int64(queryEv.Header.LogPos)
t.verifyFilenameOffset(c, w, filename, fileSize)

Expand Down