diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index fba7608597a..b383bda0fcb 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -14,30 +14,41 @@ package relay import ( + "bytes" "encoding/json" "fmt" "os" "path/filepath" "sync" - "github.com/pingcap/errors" "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/terror" "go.uber.org/atomic" "go.uber.org/zap" ) +const ( + bufferSize = 1 * 1024 * 1024 // 1MB + chanSize = 1024 +) + +var nilErr error + // BinlogWriter is a binlog event writer which writes binlog events to a file. +// Open/Write/Close cannot be called concurrently. type BinlogWriter struct { - mu sync.RWMutex - offset atomic.Int64 file *os.File relayDir string - uuid string - filename string + uuid atomic.String + filename atomic.String + err atomic.Error logger log.Logger + + input chan []byte + flushWg sync.WaitGroup + wg sync.WaitGroup } // BinlogWriterStatus represents the status of a BinlogWriter. @@ -64,6 +75,53 @@ func NewBinlogWriter(logger log.Logger, relayDir string) *BinlogWriter { } } +// run starts the binlog writer. +func (w *BinlogWriter) run() { + var ( + buf = &bytes.Buffer{} + errOccurs bool + ) + + // writeToFile writes buffer to file + writeToFile := func() { + if buf.Len() == 0 { + return + } + + if w.file == nil { + w.err.CompareAndSwap(nilErr, terror.ErrRelayWriterNotOpened.Generate()) + errOccurs = true + return + } + n, err := w.file.Write(buf.Bytes()) + if err != nil { + w.err.CompareAndSwap(nilErr, terror.ErrBinlogWriterWriteDataLen.Delegate(err, n)) + errOccurs = true + return + } + buf.Reset() + } + + for bs := range w.input { + if errOccurs { + continue + } + if bs != nil { + buf.Write(bs) + } + // we use bs = nil to mean flush + if bs == nil || buf.Len() > bufferSize || len(w.input) == 0 { + writeToFile() + } + if bs == nil { + w.flushWg.Done() + } + } + if !errOccurs { + writeToFile() + } +} + func (w *BinlogWriter) Open(uuid, filename string) error { fullName := filepath.Join(w.relayDir, uuid, filename) f, err := os.OpenFile(fullName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o600) @@ -79,58 +137,66 @@ func (w *BinlogWriter) Open(uuid, filename string) error { return terror.ErrBinlogWriterGetFileStat.Delegate(err, f.Name()) } - w.mu.Lock() - defer w.mu.Unlock() - w.offset.Store(fs.Size()) w.file = f - w.uuid = uuid - w.filename = filename + w.uuid.Store(uuid) + w.filename.Store(filename) + w.err.Store(nilErr) + + w.input = make(chan []byte, chanSize) + w.wg.Add(1) + go func() { + defer w.wg.Done() + w.run() + }() return nil } func (w *BinlogWriter) Close() error { - w.mu.Lock() - defer w.mu.Unlock() + if w.input != nil { + close(w.input) + } + w.wg.Wait() - var err error if w.file != nil { - err2 := w.file.Sync() // try sync manually before close. - if err2 != nil { - w.logger.Error("fail to flush buffered data", zap.String("component", "file writer"), zap.Error(err2)) + if err := w.file.Sync(); err != nil { + w.logger.Error("fail to flush buffered data", zap.String("component", "file writer"), zap.Error(err)) + } + if err := w.file.Close(); err != nil { + w.err.CompareAndSwap(nilErr, err) } - err = w.file.Close() } w.file = nil w.offset.Store(0) - w.uuid = "" - w.filename = "" - - return err + w.uuid.Store("") + w.filename.Store("") + w.input = nil + return w.err.Swap(nilErr) } func (w *BinlogWriter) Write(rawData []byte) error { - w.mu.RLock() - defer w.mu.RUnlock() - if w.file == nil { - return terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened")) + return terror.ErrRelayWriterNotOpened.Generate() } + w.input <- rawData + w.offset.Add(int64(len(rawData))) + return w.err.Load() +} - n, err := w.file.Write(rawData) - w.offset.Add(int64(n)) - - return terror.ErrBinlogWriterWriteDataLen.Delegate(err, len(rawData)) +func (w *BinlogWriter) Flush() error { + w.flushWg.Add(1) + if err := w.Write(nil); err != nil { + return err + } + w.flushWg.Wait() + return w.err.Load() } func (w *BinlogWriter) Status() *BinlogWriterStatus { - w.mu.RLock() - defer w.mu.RUnlock() - return &BinlogWriterStatus{ - Filename: w.filename, + Filename: w.filename.Load(), Offset: w.offset.Load(), } } @@ -140,7 +206,5 @@ func (w *BinlogWriter) Offset() int64 { } func (w *BinlogWriter) isActive(uuid, filename string) (bool, int64) { - w.mu.RLock() - defer w.mu.RUnlock() - return uuid == w.uuid && filename == w.filename, w.offset.Load() + return uuid == w.uuid.Load() && filename == w.filename.Load(), w.offset.Load() } diff --git a/dm/relay/binlog_writer_test.go b/dm/relay/binlog_writer_test.go index 454265e2055..140b2ae6227 100644 --- a/dm/relay/binlog_writer_test.go +++ b/dm/relay/binlog_writer_test.go @@ -17,21 +17,27 @@ import ( "bytes" "os" "path/filepath" - "strings" + "testing" - . "github.com/pingcap/check" "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/pingcap/tiflow/dm/pkg/terror" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" ) -var _ = Suite(&testBinlogWriterSuite{}) +type testBinlogWriterSuite struct { + suite.Suite +} -type testBinlogWriterSuite struct{} +func TestBinlogWriterSuite(t *testing.T) { + suite.Run(t, new(testBinlogWriterSuite)) +} -func (t *testBinlogWriterSuite) TestWrite(c *C) { - dir := c.MkDir() +func (t *testBinlogWriterSuite) TestWrite() { + dir := t.T().TempDir() uuid := "3ccc475b-2343-11e7-be21-6c0b84d59f30.000001" binlogDir := filepath.Join(dir, uuid) - c.Assert(os.Mkdir(binlogDir, 0o755), IsNil) + require.NoError(t.T(), os.Mkdir(binlogDir, 0o755)) filename := "test-mysql-bin.000001" var ( @@ -41,64 +47,87 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) { { w := NewBinlogWriter(log.L(), dir) - c.Assert(w, NotNil) - c.Assert(w.Open(uuid, filename), IsNil) + require.NotNil(t.T(), w) + require.NoError(t.T(), w.Open(uuid, filename)) fwStatus := w.Status() - c.Assert(fwStatus.Filename, Equals, filename) - c.Assert(fwStatus.Offset, Equals, int64(allData.Len())) + require.Equal(t.T(), filename, fwStatus.Filename) + require.Equal(t.T(), int64(allData.Len()), fwStatus.Offset) fwStatusStr := fwStatus.String() - c.Assert(strings.Contains(fwStatusStr, "filename"), IsTrue) - c.Assert(w.Close(), IsNil) + require.Contains(t.T(), fwStatusStr, filename) + require.NoError(t.T(), w.Close()) } { // not opened w := NewBinlogWriter(log.L(), dir) err := w.Write(data1) - c.Assert(err, ErrorMatches, "*not opened") + require.Contains(t.T(), err.Error(), "no underlying writer opened") // open non exist dir err = w.Open("not-exist-uuid", "bin.000001") - c.Assert(err, ErrorMatches, "*no such file or directory") + require.Contains(t.T(), err.Error(), "no such file or directory") } { // normal call flow w := NewBinlogWriter(log.L(), dir) err := w.Open(uuid, filename) - c.Assert(err, IsNil) - c.Assert(w.file, NotNil) - c.Assert(w.filename, Equals, filename) - c.Assert(w.offset.Load(), Equals, int64(0)) + require.NoError(t.T(), err) + require.NotNil(t.T(), w.file) + require.Equal(t.T(), filename, w.filename.Load()) + require.Equal(t.T(), int64(0), w.offset.Load()) err = w.Write(data1) - c.Assert(err, IsNil) + require.NoError(t.T(), err) + err = w.Flush() + require.NoError(t.T(), err) allData.Write(data1) fwStatus := w.Status() - c.Assert(fwStatus.Filename, Equals, filename) - c.Assert(fwStatus.Offset, Equals, int64(len(data1))) + require.Equal(t.T(), fwStatus.Filename, w.filename.Load()) + require.Equal(t.T(), int64(len(data1)), fwStatus.Offset) // write data again data2 := []byte("another-data") err = w.Write(data2) - c.Assert(err, IsNil) + require.NoError(t.T(), err) allData.Write(data2) - c.Assert(w.offset.Load(), Equals, int64(allData.Len())) + require.LessOrEqual(t.T(), int64(allData.Len()), w.offset.Load()) err = w.Close() - c.Assert(err, IsNil) - c.Assert(w.file, IsNil) - c.Assert(w.filename, Equals, "") - c.Assert(w.offset.Load(), Equals, int64(0)) - - c.Assert(w.Close(), IsNil) // noop + require.NoError(t.T(), err) + require.Nil(t.T(), w.file) + require.Equal(t.T(), "", w.filename.Load()) + require.Equal(t.T(), int64(0), w.offset.Load()) // try to read the data back fullName := filepath.Join(binlogDir, filename) dataInFile, err := os.ReadFile(fullName) - c.Assert(err, IsNil) - c.Assert(dataInFile, DeepEquals, allData.Bytes()) + require.NoError(t.T(), err) + require.Equal(t.T(), allData.Bytes(), dataInFile) + } + + { + // cover for error + w := NewBinlogWriter(log.L(), dir) + err := w.Open(uuid, filename) + require.NoError(t.T(), err) + require.NotNil(t.T(), w.file) + + err = w.Write(data1) + require.NoError(t.T(), err) + err = w.Flush() + require.NoError(t.T(), err) + + require.NoError(t.T(), w.file.Close()) + // write data again + data2 := []byte("another-data") + // we cannot determine the error is caused by `Write` or `Flush` + // nolint:errcheck + w.Write(data2) + // nolint:errcheck + w.Flush() + require.True(t.T(), terror.ErrBinlogWriterWriteDataLen.Equal(w.Close())) } } diff --git a/dm/relay/local_reader_test.go b/dm/relay/local_reader_test.go index 0ff4527d3d7..0480e92b6cf 100644 --- a/dm/relay/local_reader_test.go +++ b/dm/relay/local_reader_test.go @@ -76,7 +76,8 @@ func newBinlogReaderForTest(logger log.Logger, cfg *BinlogReaderConfig, notify b func (t *testReaderSuite) setActiveRelayLog(r Process, uuid, filename string, offset int64) { relay := r.(*Relay) writer := relay.writer.(*FileWriter) - writer.out.uuid, writer.out.filename = uuid, filename + writer.out.uuid.Store(uuid) + writer.out.filename.Store(filename) writer.out.offset.Store(offset) } @@ -1234,6 +1235,10 @@ func (m *mockFileWriterForActiveTest) Close() error { panic("should be used") } +func (m *mockFileWriterForActiveTest) Flush() error { + panic("should be used") +} + func (m *mockFileWriterForActiveTest) WriteEvent(ev *replication.BinlogEvent) (WResult, error) { panic("should be used") } diff --git a/dm/relay/relay.go b/dm/relay/relay.go index 678bbca2ee6..54f6de4d7df 100644 --- a/dm/relay/relay.go +++ b/dm/relay/relay.go @@ -569,6 +569,8 @@ func (r *Relay) handleEvents( } firstEvent := true + relayPosGauge := relayLogPosGauge.WithLabelValues("relay") + relayFileGauge := relayLogFileGauge.WithLabelValues("relay") for { // 1. read events from upstream server readTimer := time.Now() @@ -704,11 +706,11 @@ func (r *Relay) handleEvents( } relayLogWriteSizeHistogram.Observe(float64(e.Header.EventSize)) - relayLogPosGauge.WithLabelValues("relay").Set(float64(lastPos.Pos)) + relayPosGauge.Set(float64(lastPos.Pos)) if index, err2 := utils.GetFilenameIndex(lastPos.Name); err2 != nil { r.logger.Error("parse binlog file name", zap.String("file name", lastPos.Name), log.ShortError(err2)) } else { - relayLogFileGauge.WithLabelValues("relay").Set(float64(index)) + relayFileGauge.Set(float64(index)) } if needSavePos { diff --git a/dm/relay/relay_writer.go b/dm/relay/relay_writer.go index e300b6bf6f8..a8a8c0537d5 100644 --- a/dm/relay/relay_writer.go +++ b/dm/relay/relay_writer.go @@ -57,6 +57,8 @@ type Writer interface { WriteEvent(ev *replication.BinlogEvent) (WResult, error) // IsActive check whether given uuid+filename is active binlog file, if true return current file offset IsActive(uuid, filename string) (bool, int64) + // Flush flushes the binlog writer. + Flush() error } // FileWriter implements Writer interface. @@ -104,6 +106,11 @@ func (w *FileWriter) WriteEvent(ev *replication.BinlogEvent) (WResult, error) { } } +// Flush implements Writer.Flush. +func (w *FileWriter) Flush() error { + return w.out.Flush() +} + // offset returns the current offset of the binlog file. // it is only used for testing now. func (w *FileWriter) offset() int64 { @@ -145,6 +152,10 @@ func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) ( if err != nil { return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName) } + err = w.Flush() + if err != nil { + return WResult{}, terror.Annotatef(err, "flush binlog file for %s", fullName) + } } // write the FormatDescriptionEvent if not exists one @@ -277,6 +288,9 @@ func (w *FileWriter) handlePotentialHoleOrDuplicate(ev *replication.BinlogEvent) } if mayDuplicate { + if err := w.Flush(); err != nil { + return WResult{}, terror.Annotatef(err, "flush before handle duplicate event %v in %s", ev.Header, w.filename.Load()) + } // handle any duplicate events if exist result, err2 := w.handleDuplicateEventsExist(ev) if err2 != nil { diff --git a/dm/relay/relay_writer_test.go b/dm/relay/relay_writer_test.go index 870bce2b9c7..9b507f7e182 100644 --- a/dm/relay/relay_writer_test.go +++ b/dm/relay/relay_writer_test.go @@ -237,7 +237,7 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check defer w1.Close() w1.Init(uuid, filename) _, err = w1.WriteEvent(rotateEv) - c.Assert(err, check.ErrorMatches, ".*file not opened.*") + c.Assert(err, check.ErrorMatches, ".*no underlying writer opened") // 2. fake RotateEvent before FormatDescriptionEvent relayDir = c.MkDir() // use a new relay directory @@ -262,6 +262,7 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check filename2 := filepath.Join(relayDir, uuid, nextFilename) _, err = os.Stat(filename1) c.Assert(os.IsNotExist(err), check.IsTrue) + c.Assert(w2.Flush(), check.IsNil) data, err := os.ReadFile(filename2) c.Assert(err, check.IsNil) fileHeaderLen := len(replication.BinLogFileHeader) @@ -292,6 +293,7 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check filename2 = filepath.Join(relayDir, uuid, nextFilename) _, err = os.Stat(filename2) c.Assert(os.IsNotExist(err), check.IsTrue) + c.Assert(w3.Flush(), check.IsNil) data, err = os.ReadFile(filename1) c.Assert(err, check.IsNil) c.Assert(len(data), check.Equals, fileHeaderLen+len(formatDescEv.RawData)) @@ -399,6 +401,7 @@ func (t *testFileWriterSuite) TestWriteMultiEvents(c *check.C) { c.Assert(result.Ignore, check.IsFalse) // no event is ignored } + c.Assert(w.Flush(), check.IsNil) t.verifyFilenameOffset(c, w, filename, int64(allData.Len())) // read the data back from the file @@ -448,6 +451,7 @@ func (t *testFileWriterSuite) TestHandleFileHoleExist(c *check.C) { result, err = w.WriteEvent(queryEv) c.Assert(err, check.IsNil) c.Assert(result.Ignore, check.IsFalse) + c.Assert(w.Flush(), check.IsNil) fileSize := int64(queryEv.Header.LogPos) t.verifyFilenameOffset(c, w, filename, fileSize)