From dc9edc540e47abbfea1407b5e55b5517364af2c4 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Thu, 10 Nov 2022 21:09:42 +0800 Subject: [PATCH 01/15] add async relay writer --- dm/relay/binlog_writer.go | 115 +++++++++++++++++++++++++++++++-- dm/relay/binlog_writer_test.go | 4 +- dm/relay/local_reader_test.go | 4 ++ dm/relay/relay_writer.go | 14 ++++ dm/relay/relay_writer_test.go | 4 ++ 5 files changed, 135 insertions(+), 6 deletions(-) diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index fba7608597a..5c61b032369 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -14,6 +14,7 @@ package relay import ( + "bytes" "encoding/json" "fmt" "os" @@ -27,6 +28,11 @@ import ( "go.uber.org/zap" ) +const ( + bufferSize = 1024 * 1024 // 1MB + chanSize = 1024 +) + // BinlogWriter is a binlog event writer which writes binlog events to a file. type BinlogWriter struct { mu sync.RWMutex @@ -38,6 +44,11 @@ type BinlogWriter struct { filename string logger log.Logger + + input chan []byte + flushWg sync.WaitGroup + errCh chan error + wg sync.WaitGroup } // BinlogWriterStatus represents the status of a BinlogWriter. @@ -61,6 +72,64 @@ func NewBinlogWriter(logger log.Logger, relayDir string) *BinlogWriter { return &BinlogWriter{ logger: logger, relayDir: relayDir, + errCh: make(chan error, 1), + } +} + +// run starts the binlog writer. 
+func (w *BinlogWriter) run() { + var ( + buf = &bytes.Buffer{} + errOccurs bool + ) + + // writeToFile writes buffer to file + writeToFile := func() { + if buf.Len() == 0 { + return + } + + w.mu.Lock() + defer w.mu.Unlock() + if w.file == nil { + select { + case w.errCh <- terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened")): + default: + } + errOccurs = true + return + } + n, err := w.file.Write(buf.Bytes()) + if err != nil { + select { + case w.errCh <- terror.ErrBinlogWriterWriteDataLen.Delegate(err, n): + default: + } + errOccurs = true + return + } + buf.Reset() + } + + for bs := range w.input { + if errOccurs { + continue + } + + if len(bs) != 0 { + buf.Write(bs) + } + + if len(bs) == 0 || buf.Len() > bufferSize || len(w.input) == 0 { + writeToFile() + } + + if len(bs) == 0 { + w.flushWg.Done() + } + } + if !errOccurs { + writeToFile() } } @@ -87,10 +156,24 @@ func (w *BinlogWriter) Open(uuid, filename string) error { w.uuid = uuid w.filename = filename + w.input = make(chan []byte, chanSize) + w.wg.Add(1) + go func() { + defer w.wg.Done() + w.run() + }() + return nil } func (w *BinlogWriter) Close() error { + w.mu.Lock() + if w.input != nil { + close(w.input) + } + w.mu.Unlock() + w.wg.Wait() + w.mu.Lock() defer w.mu.Unlock() @@ -107,22 +190,44 @@ func (w *BinlogWriter) Close() error { w.offset.Store(0) w.uuid = "" w.filename = "" + w.input = nil + if writeErr := w.Error(); writeErr != nil { + return writeErr + } return err } func (w *BinlogWriter) Write(rawData []byte) error { w.mu.RLock() - defer w.mu.RUnlock() - if w.file == nil { + w.mu.RUnlock() return terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened")) } + input := w.input + w.mu.RUnlock() + + input <- rawData + w.offset.Add(int64(len(rawData))) + return w.Error() +} - n, err := w.file.Write(rawData) - w.offset.Add(int64(n)) +func (w *BinlogWriter) Flush() error { + w.flushWg.Add(1) + if err := w.Write(nil); err != nil { + return err + } + w.flushWg.Wait() + return 
w.Error() +} - return terror.ErrBinlogWriterWriteDataLen.Delegate(err, len(rawData)) +func (w *BinlogWriter) Error() error { + select { + case err := <-w.errCh: + return err + default: + return nil + } } func (w *BinlogWriter) Status() *BinlogWriterStatus { diff --git a/dm/relay/binlog_writer_test.go b/dm/relay/binlog_writer_test.go index 454265e2055..65eca5fb04b 100644 --- a/dm/relay/binlog_writer_test.go +++ b/dm/relay/binlog_writer_test.go @@ -73,6 +73,8 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) { err = w.Write(data1) c.Assert(err, IsNil) + err = w.Flush() + c.Assert(err, IsNil) allData.Write(data1) fwStatus := w.Status() @@ -85,7 +87,7 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) { c.Assert(err, IsNil) allData.Write(data2) - c.Assert(w.offset.Load(), Equals, int64(allData.Len())) + c.Assert(w.offset.Load(), LessEqual, int64(allData.Len())) err = w.Close() c.Assert(err, IsNil) diff --git a/dm/relay/local_reader_test.go b/dm/relay/local_reader_test.go index 0ff4527d3d7..8a16cbddd2c 100644 --- a/dm/relay/local_reader_test.go +++ b/dm/relay/local_reader_test.go @@ -1234,6 +1234,10 @@ func (m *mockFileWriterForActiveTest) Close() error { panic("should be used") } +func (m *mockFileWriterForActiveTest) Flush() error { + panic("should be used") +} + func (m *mockFileWriterForActiveTest) WriteEvent(ev *replication.BinlogEvent) (WResult, error) { panic("should be used") } diff --git a/dm/relay/relay_writer.go b/dm/relay/relay_writer.go index e300b6bf6f8..4675c5ef25f 100644 --- a/dm/relay/relay_writer.go +++ b/dm/relay/relay_writer.go @@ -57,6 +57,8 @@ type Writer interface { WriteEvent(ev *replication.BinlogEvent) (WResult, error) // IsActive check whether given uuid+filename is active binlog file, if true return current file offset IsActive(uuid, filename string) (bool, int64) + // Flush flushes the binlog writer. + Flush() error } // FileWriter implements Writer interface. 
@@ -104,6 +106,11 @@ func (w *FileWriter) WriteEvent(ev *replication.BinlogEvent) (WResult, error) { } } +// Flush implements Writer.Flush. +func (w *FileWriter) Flush() error { + return w.out.Flush() +} + // offset returns the current offset of the binlog file. // it is only used for testing now. func (w *FileWriter) offset() int64 { @@ -145,6 +152,10 @@ func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) ( if err != nil { return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName) } + err = w.Flush() + if err != nil { + return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName) + } } // write the FormatDescriptionEvent if not exists one @@ -277,6 +288,9 @@ func (w *FileWriter) handlePotentialHoleOrDuplicate(ev *replication.BinlogEvent) } if mayDuplicate { + if err := w.Flush(); err != nil { + return WResult{}, terror.Annotatef(err, "flush before handle duplicate event %v in %s", ev.Header, w.filename.Load()) + } // handle any duplicate events if exist result, err2 := w.handleDuplicateEventsExist(ev) if err2 != nil { diff --git a/dm/relay/relay_writer_test.go b/dm/relay/relay_writer_test.go index 870bce2b9c7..3b0a7e33a95 100644 --- a/dm/relay/relay_writer_test.go +++ b/dm/relay/relay_writer_test.go @@ -262,6 +262,7 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check filename2 := filepath.Join(relayDir, uuid, nextFilename) _, err = os.Stat(filename1) c.Assert(os.IsNotExist(err), check.IsTrue) + c.Assert(w2.Flush(), check.IsNil) data, err := os.ReadFile(filename2) c.Assert(err, check.IsNil) fileHeaderLen := len(replication.BinLogFileHeader) @@ -292,6 +293,7 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check filename2 = filepath.Join(relayDir, uuid, nextFilename) _, err = os.Stat(filename2) c.Assert(os.IsNotExist(err), check.IsTrue) + c.Assert(w3.Flush(), check.IsNil) data, err = os.ReadFile(filename1) c.Assert(err, 
check.IsNil) c.Assert(len(data), check.Equals, fileHeaderLen+len(formatDescEv.RawData)) @@ -399,6 +401,7 @@ func (t *testFileWriterSuite) TestWriteMultiEvents(c *check.C) { c.Assert(result.Ignore, check.IsFalse) // no event is ignored } + c.Assert(w.Flush(), check.IsNil) t.verifyFilenameOffset(c, w, filename, int64(allData.Len())) // read the data back from the file @@ -448,6 +451,7 @@ func (t *testFileWriterSuite) TestHandleFileHoleExist(c *check.C) { result, err = w.WriteEvent(queryEv) c.Assert(err, check.IsNil) c.Assert(result.Ignore, check.IsFalse) + c.Assert(w.Flush(), check.IsNil) fileSize := int64(queryEv.Header.LogPos) t.verifyFilenameOffset(c, w, filename, fileSize) From a13cb43912cfc8fac647bd69fd0468f5dd8c33c5 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 11 Nov 2022 10:17:28 +0800 Subject: [PATCH 02/15] update --- dm/relay/binlog_writer.go | 47 +++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index 5c61b032369..64718ec529e 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -20,6 +20,7 @@ import ( "os" "path/filepath" "sync" + "time" "github.com/pingcap/errors" "github.com/pingcap/tiflow/dm/pkg/log" @@ -29,8 +30,9 @@ import ( ) const ( - bufferSize = 1024 * 1024 // 1MB + bufferSize = 1 * 1024 * 1024 // 1MB chanSize = 1024 + waitTime = 10 * time.Millisecond ) // BinlogWriter is a binlog event writer which writes binlog events to a file. 
@@ -111,26 +113,33 @@ func (w *BinlogWriter) run() { buf.Reset() } - for bs := range w.input { - if errOccurs { - continue - } - - if len(bs) != 0 { - buf.Write(bs) - } - - if len(bs) == 0 || buf.Len() > bufferSize || len(w.input) == 0 { - writeToFile() - } - - if len(bs) == 0 { - w.flushWg.Done() + for { + select { + case bs, ok := <-w.input: + if !ok { + if !errOccurs { + writeToFile() + } + return + } + if errOccurs { + continue + } + if bs != nil { + buf.Write(bs) + } + if bs == nil || buf.Len() > bufferSize { + writeToFile() + } + if bs == nil { + w.flushWg.Done() + } + case <-time.After(waitTime): + if !errOccurs { + writeToFile() + } } } - if !errOccurs { - writeToFile() - } } func (w *BinlogWriter) Open(uuid, filename string) error { From 20c870a54ddc43b52dbc5567c06fb7b8eec3cbff Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Thu, 17 Nov 2022 19:32:40 +0800 Subject: [PATCH 03/15] remove lock --- dm/relay/binlog_writer.go | 81 +++++++++++----------------------- dm/relay/binlog_writer_test.go | 4 +- dm/relay/local_reader_test.go | 3 +- 3 files changed, 29 insertions(+), 59 deletions(-) diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index 64718ec529e..c3f88e2a1c9 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -36,14 +36,13 @@ const ( ) // BinlogWriter is a binlog event writer which writes binlog events to a file. 
+// Open/Write/Close cannot be called concurrently type BinlogWriter struct { - mu sync.RWMutex - offset atomic.Int64 file *os.File relayDir string - uuid string - filename string + uuid atomic.String + filename atomic.String logger log.Logger @@ -91,8 +90,6 @@ func (w *BinlogWriter) run() { return } - w.mu.Lock() - defer w.mu.Unlock() if w.file == nil { select { case w.errCh <- terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened")): @@ -113,33 +110,23 @@ func (w *BinlogWriter) run() { buf.Reset() } - for { - select { - case bs, ok := <-w.input: - if !ok { - if !errOccurs { - writeToFile() - } - return - } - if errOccurs { - continue - } - if bs != nil { - buf.Write(bs) - } - if bs == nil || buf.Len() > bufferSize { - writeToFile() - } - if bs == nil { - w.flushWg.Done() - } - case <-time.After(waitTime): - if !errOccurs { - writeToFile() - } + for bs := range w.input { + if errOccurs { + continue + } + if bs != nil { + buf.Write(bs) + } + if bs == nil || buf.Len() > bufferSize || len(w.input) == 0 { + writeToFile() + } + if bs == nil { + w.flushWg.Done() } } + if !errOccurs { + writeToFile() + } } func (w *BinlogWriter) Open(uuid, filename string) error { @@ -157,13 +144,10 @@ func (w *BinlogWriter) Open(uuid, filename string) error { return terror.ErrBinlogWriterGetFileStat.Delegate(err, f.Name()) } - w.mu.Lock() - defer w.mu.Unlock() - w.offset.Store(fs.Size()) w.file = f - w.uuid = uuid - w.filename = filename + w.uuid.Store(uuid) + w.filename.Store(filename) w.input = make(chan []byte, chanSize) w.wg.Add(1) @@ -176,16 +160,11 @@ func (w *BinlogWriter) Open(uuid, filename string) error { } func (w *BinlogWriter) Close() error { - w.mu.Lock() if w.input != nil { close(w.input) } - w.mu.Unlock() w.wg.Wait() - w.mu.Lock() - defer w.mu.Unlock() - var err error if w.file != nil { err2 := w.file.Sync() // try sync manually before close. 
@@ -197,8 +176,8 @@ func (w *BinlogWriter) Close() error { w.file = nil w.offset.Store(0) - w.uuid = "" - w.filename = "" + w.uuid.Store("") + w.filename.Store("") w.input = nil if writeErr := w.Error(); writeErr != nil { @@ -208,15 +187,10 @@ func (w *BinlogWriter) Close() error { } func (w *BinlogWriter) Write(rawData []byte) error { - w.mu.RLock() if w.file == nil { - w.mu.RUnlock() return terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened")) } - input := w.input - w.mu.RUnlock() - - input <- rawData + w.input <- rawData w.offset.Add(int64(len(rawData))) return w.Error() } @@ -240,11 +214,8 @@ func (w *BinlogWriter) Error() error { } func (w *BinlogWriter) Status() *BinlogWriterStatus { - w.mu.RLock() - defer w.mu.RUnlock() - return &BinlogWriterStatus{ - Filename: w.filename, + Filename: w.filename.Load(), Offset: w.offset.Load(), } } @@ -254,7 +225,5 @@ func (w *BinlogWriter) Offset() int64 { } func (w *BinlogWriter) isActive(uuid, filename string) (bool, int64) { - w.mu.RLock() - defer w.mu.RUnlock() - return uuid == w.uuid && filename == w.filename, w.offset.Load() + return uuid == w.uuid.Load() && filename == w.filename.Load(), w.offset.Load() } diff --git a/dm/relay/binlog_writer_test.go b/dm/relay/binlog_writer_test.go index 65eca5fb04b..553879af584 100644 --- a/dm/relay/binlog_writer_test.go +++ b/dm/relay/binlog_writer_test.go @@ -68,7 +68,7 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) { err := w.Open(uuid, filename) c.Assert(err, IsNil) c.Assert(w.file, NotNil) - c.Assert(w.filename, Equals, filename) + c.Assert(w.filename.Load(), Equals, filename) c.Assert(w.offset.Load(), Equals, int64(0)) err = w.Write(data1) @@ -92,7 +92,7 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) { err = w.Close() c.Assert(err, IsNil) c.Assert(w.file, IsNil) - c.Assert(w.filename, Equals, "") + c.Assert(w.filename.Load(), Equals, "") c.Assert(w.offset.Load(), Equals, int64(0)) c.Assert(w.Close(), IsNil) // noop diff --git 
a/dm/relay/local_reader_test.go b/dm/relay/local_reader_test.go index 8a16cbddd2c..0480e92b6cf 100644 --- a/dm/relay/local_reader_test.go +++ b/dm/relay/local_reader_test.go @@ -76,7 +76,8 @@ func newBinlogReaderForTest(logger log.Logger, cfg *BinlogReaderConfig, notify b func (t *testReaderSuite) setActiveRelayLog(r Process, uuid, filename string, offset int64) { relay := r.(*Relay) writer := relay.writer.(*FileWriter) - writer.out.uuid, writer.out.filename = uuid, filename + writer.out.uuid.Store(uuid) + writer.out.filename.Store(filename) writer.out.offset.Store(offset) } From ab28ad7d74ba696418043783ea12b3e5b122facf Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Tue, 22 Nov 2022 18:03:25 +0800 Subject: [PATCH 04/15] fix lint --- dm/relay/binlog_writer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index c3f88e2a1c9..aed17408fd2 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -36,7 +36,7 @@ const ( ) // BinlogWriter is a binlog event writer which writes binlog events to a file. -// Open/Write/Close cannot be called concurrently +// Open/Write/Close cannot be called concurrently. 
type BinlogWriter struct { offset atomic.Int64 file *os.File From 0ee15f36a3d4bf22b89592cdc78de7f67153726c Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Thu, 24 Nov 2022 15:55:51 +0800 Subject: [PATCH 05/15] address comment --- dm/relay/binlog_writer.go | 44 ++++++++++----------------------------- dm/relay/relay.go | 6 ++++-- 2 files changed, 15 insertions(+), 35 deletions(-) diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index aed17408fd2..f178c1c8f21 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -20,7 +20,6 @@ import ( "os" "path/filepath" "sync" - "time" "github.com/pingcap/errors" "github.com/pingcap/tiflow/dm/pkg/log" @@ -32,7 +31,6 @@ import ( const ( bufferSize = 1 * 1024 * 1024 // 1MB chanSize = 1024 - waitTime = 10 * time.Millisecond ) // BinlogWriter is a binlog event writer which writes binlog events to a file. @@ -43,12 +41,12 @@ type BinlogWriter struct { relayDir string uuid atomic.String filename atomic.String + err atomic.Error logger log.Logger input chan []byte flushWg sync.WaitGroup - errCh chan error wg sync.WaitGroup } @@ -73,7 +71,6 @@ func NewBinlogWriter(logger log.Logger, relayDir string) *BinlogWriter { return &BinlogWriter{ logger: logger, relayDir: relayDir, - errCh: make(chan error, 1), } } @@ -91,19 +88,13 @@ func (w *BinlogWriter) run() { } if w.file == nil { - select { - case w.errCh <- terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened")): - default: - } + w.err.CompareAndSwap(nil, terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened"))) errOccurs = true return } n, err := w.file.Write(buf.Bytes()) if err != nil { - select { - case w.errCh <- terror.ErrBinlogWriterWriteDataLen.Delegate(err, n): - default: - } + w.err.CompareAndSwap(nil, terror.ErrBinlogWriterWriteDataLen.Delegate(err, n)) errOccurs = true return } @@ -165,13 +156,13 @@ func (w *BinlogWriter) Close() error { } w.wg.Wait() - var err error if w.file != nil { - err2 := w.file.Sync() // try 
sync manually before close. - if err2 != nil { - w.logger.Error("fail to flush buffered data", zap.String("component", "file writer"), zap.Error(err2)) + if err := w.file.Sync(); err != nil { + w.logger.Error("fail to flush buffered data", zap.String("component", "file writer"), zap.Error(err)) + } + if err := w.file.Close(); err != nil { + w.err.CompareAndSwap(nil, err) } - err = w.file.Close() } w.file = nil @@ -179,11 +170,7 @@ func (w *BinlogWriter) Close() error { w.uuid.Store("") w.filename.Store("") w.input = nil - - if writeErr := w.Error(); writeErr != nil { - return writeErr - } - return err + return w.err.Swap(nil) } func (w *BinlogWriter) Write(rawData []byte) error { @@ -192,7 +179,7 @@ func (w *BinlogWriter) Write(rawData []byte) error { } w.input <- rawData w.offset.Add(int64(len(rawData))) - return w.Error() + return w.err.Load() } func (w *BinlogWriter) Flush() error { @@ -201,16 +188,7 @@ func (w *BinlogWriter) Flush() error { return err } w.flushWg.Wait() - return w.Error() -} - -func (w *BinlogWriter) Error() error { - select { - case err := <-w.errCh: - return err - default: - return nil - } + return w.err.Load() } func (w *BinlogWriter) Status() *BinlogWriterStatus { diff --git a/dm/relay/relay.go b/dm/relay/relay.go index 678bbca2ee6..54f6de4d7df 100644 --- a/dm/relay/relay.go +++ b/dm/relay/relay.go @@ -569,6 +569,8 @@ func (r *Relay) handleEvents( } firstEvent := true + relayPosGauge := relayLogPosGauge.WithLabelValues("relay") + relayFileGauge := relayLogFileGauge.WithLabelValues("relay") for { // 1. 
read events from upstream server readTimer := time.Now() @@ -704,11 +706,11 @@ func (r *Relay) handleEvents( } relayLogWriteSizeHistogram.Observe(float64(e.Header.EventSize)) - relayLogPosGauge.WithLabelValues("relay").Set(float64(lastPos.Pos)) + relayPosGauge.Set(float64(lastPos.Pos)) if index, err2 := utils.GetFilenameIndex(lastPos.Name); err2 != nil { r.logger.Error("parse binlog file name", zap.String("file name", lastPos.Name), log.ShortError(err2)) } else { - relayLogFileGauge.WithLabelValues("relay").Set(float64(index)) + relayFileGauge.Set(float64(index)) } if needSavePos { From 9057ed78d145561872d9de3fa64f1ec7bc6d68be Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Thu, 24 Nov 2022 15:57:17 +0800 Subject: [PATCH 06/15] add comment --- dm/relay/binlog_writer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index f178c1c8f21..1f503935dae 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -108,6 +108,7 @@ func (w *BinlogWriter) run() { if bs != nil { buf.Write(bs) } + // we use bs = nil to mean flush if bs == nil || buf.Len() > bufferSize || len(w.input) == 0 { writeToFile() } From 4453b50173b235d269345229d054d9636a1135e1 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 25 Nov 2022 11:01:58 +0800 Subject: [PATCH 07/15] add fileOffset --- dm/relay/binlog_writer.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index 1f503935dae..8bbc67b9f0e 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -36,12 +36,15 @@ const ( // BinlogWriter is a binlog event writer which writes binlog events to a file. // Open/Write/Close cannot be called concurrently. 
type BinlogWriter struct { - offset atomic.Int64 - file *os.File - relayDir string - uuid atomic.String - filename atomic.String - err atomic.Error + // offset means the offset we add to binlog writer + offset atomic.Int64 + // fileOffset means the offset we finish write to the file + fileOffset atomic.Int64 + file *os.File + relayDir string + uuid atomic.String + filename atomic.String + err atomic.Error logger log.Logger @@ -98,6 +101,7 @@ func (w *BinlogWriter) run() { errOccurs = true return } + w.fileOffset.Add(int64(n)) buf.Reset() } @@ -136,7 +140,9 @@ func (w *BinlogWriter) Open(uuid, filename string) error { return terror.ErrBinlogWriterGetFileStat.Delegate(err, f.Name()) } - w.offset.Store(fs.Size()) + size := fs.Size() + w.offset.Store(size) + w.fileOffset.Store(size) w.file = f w.uuid.Store(uuid) w.filename.Store(filename) @@ -168,6 +174,7 @@ func (w *BinlogWriter) Close() error { w.file = nil w.offset.Store(0) + w.fileOffset.Store(0) w.uuid.Store("") w.filename.Store("") w.input = nil @@ -204,5 +211,5 @@ func (w *BinlogWriter) Offset() int64 { } func (w *BinlogWriter) isActive(uuid, filename string) (bool, int64) { - return uuid == w.uuid.Load() && filename == w.filename.Load(), w.offset.Load() + return uuid == w.uuid.Load() && filename == w.filename.Load(), w.fileOffset.Load() } From 6ed8b7cbcd92b034bde915fec804ccd79b7083b6 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 25 Nov 2022 11:17:36 +0800 Subject: [PATCH 08/15] use nil error --- dm/relay/binlog_writer.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index 8bbc67b9f0e..d5e40c9ce64 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -33,6 +33,8 @@ const ( chanSize = 1024 ) +var nilErr error + // BinlogWriter is a binlog event writer which writes binlog events to a file. // Open/Write/Close cannot be called concurrently. 
type BinlogWriter struct { @@ -91,13 +93,13 @@ func (w *BinlogWriter) run() { } if w.file == nil { - w.err.CompareAndSwap(nil, terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened"))) + w.err.CompareAndSwap(nilErr, terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened"))) errOccurs = true return } n, err := w.file.Write(buf.Bytes()) if err != nil { - w.err.CompareAndSwap(nil, terror.ErrBinlogWriterWriteDataLen.Delegate(err, n)) + w.err.CompareAndSwap(nilErr, terror.ErrBinlogWriterWriteDataLen.Delegate(err, n)) errOccurs = true return } @@ -168,7 +170,7 @@ func (w *BinlogWriter) Close() error { w.logger.Error("fail to flush buffered data", zap.String("component", "file writer"), zap.Error(err)) } if err := w.file.Close(); err != nil { - w.err.CompareAndSwap(nil, err) + w.err.CompareAndSwap(nilErr, err) } } @@ -178,7 +180,7 @@ func (w *BinlogWriter) Close() error { w.uuid.Store("") w.filename.Store("") w.input = nil - return w.err.Swap(nil) + return w.err.Swap(nilErr) } func (w *BinlogWriter) Write(rawData []byte) error { From 2a44a25a9d3d7c744004b175c7dcfc22ce09fbcb Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 25 Nov 2022 12:24:18 +0800 Subject: [PATCH 09/15] fix ut --- dm/relay/local_reader_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/dm/relay/local_reader_test.go b/dm/relay/local_reader_test.go index 0480e92b6cf..df6c33fbd81 100644 --- a/dm/relay/local_reader_test.go +++ b/dm/relay/local_reader_test.go @@ -79,6 +79,7 @@ func (t *testReaderSuite) setActiveRelayLog(r Process, uuid, filename string, of writer.out.uuid.Store(uuid) writer.out.filename.Store(filename) writer.out.offset.Store(offset) + writer.out.fileOffset.Store(offset) } func (t *testReaderSuite) createBinlogFileParseState(c *C, relayLogDir, relayLogFile string, offset int64, possibleLast bool) *binlogFileParseState { From e3fa74117da77ac2f7bb8e41fbf2cf6fe498d88b Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Mon, 28 Nov 2022 13:54:47 +0800 Subject: [PATCH 
10/15] address comment --- dm/relay/binlog_writer.go | 1 + dm/relay/binlog_writer_test.go | 88 +++++++++++++++++++++------------- 2 files changed, 56 insertions(+), 33 deletions(-) diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index d5e40c9ce64..68a2ececa78 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -148,6 +148,7 @@ func (w *BinlogWriter) Open(uuid, filename string) error { w.file = f w.uuid.Store(uuid) w.filename.Store(filename) + w.err.Store(nilErr) w.input = make(chan []byte, chanSize) w.wg.Add(1) diff --git a/dm/relay/binlog_writer_test.go b/dm/relay/binlog_writer_test.go index 553879af584..91e34a05ac6 100644 --- a/dm/relay/binlog_writer_test.go +++ b/dm/relay/binlog_writer_test.go @@ -17,21 +17,27 @@ import ( "bytes" "os" "path/filepath" - "strings" + "testing" - . "github.com/pingcap/check" "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/pingcap/tiflow/dm/pkg/terror" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" ) -var _ = Suite(&testBinlogWriterSuite{}) +type testBinlogWriterSuite struct { + suite.Suite +} -type testBinlogWriterSuite struct{} +func TestBinlogWriterSuite(t *testing.T) { + suite.Run(t, new(testBinlogWriterSuite)) +} -func (t *testBinlogWriterSuite) TestWrite(c *C) { - dir := c.MkDir() +func (t *testBinlogWriterSuite) TestWrite() { + dir := t.T().TempDir() uuid := "3ccc475b-2343-11e7-be21-6c0b84d59f30.000001" binlogDir := filepath.Join(dir, uuid) - c.Assert(os.Mkdir(binlogDir, 0o755), IsNil) + require.NoError(t.T(), os.Mkdir(binlogDir, 0o755)) filename := "test-mysql-bin.000001" var ( @@ -41,66 +47,82 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) { { w := NewBinlogWriter(log.L(), dir) - c.Assert(w, NotNil) - c.Assert(w.Open(uuid, filename), IsNil) + require.NotNil(t.T(), w) + require.NoError(t.T(), w.Open(uuid, filename)) fwStatus := w.Status() - c.Assert(fwStatus.Filename, Equals, filename) - c.Assert(fwStatus.Offset, Equals, int64(allData.Len())) + 
require.Equal(t.T(), filename, fwStatus.Filename) + require.Equal(t.T(), int64(allData.Len()), fwStatus.Offset) fwStatusStr := fwStatus.String() - c.Assert(strings.Contains(fwStatusStr, "filename"), IsTrue) - c.Assert(w.Close(), IsNil) + require.Contains(t.T(), fwStatusStr, filename) + require.NoError(t.T(), w.Close()) } { // not opened w := NewBinlogWriter(log.L(), dir) err := w.Write(data1) - c.Assert(err, ErrorMatches, "*not opened") + require.Contains(t.T(), err.Error(), "not opened") // open non exist dir err = w.Open("not-exist-uuid", "bin.000001") - c.Assert(err, ErrorMatches, "*no such file or directory") + require.Contains(t.T(), err.Error(), "no such file or directory") } { // normal call flow w := NewBinlogWriter(log.L(), dir) err := w.Open(uuid, filename) - c.Assert(err, IsNil) - c.Assert(w.file, NotNil) - c.Assert(w.filename.Load(), Equals, filename) - c.Assert(w.offset.Load(), Equals, int64(0)) + require.NoError(t.T(), err) + require.NotNil(t.T(), w.file) + require.Equal(t.T(), filename, w.filename.Load()) + require.Equal(t.T(), int64(0), w.offset.Load()) err = w.Write(data1) - c.Assert(err, IsNil) + require.NoError(t.T(), err) err = w.Flush() - c.Assert(err, IsNil) + require.NoError(t.T(), err) allData.Write(data1) fwStatus := w.Status() - c.Assert(fwStatus.Filename, Equals, filename) - c.Assert(fwStatus.Offset, Equals, int64(len(data1))) + require.Equal(t.T(), fwStatus.Filename, w.filename.Load()) + require.Equal(t.T(), int64(len(data1)), fwStatus.Offset) // write data again data2 := []byte("another-data") err = w.Write(data2) - c.Assert(err, IsNil) + require.NoError(t.T(), err) allData.Write(data2) - c.Assert(w.offset.Load(), LessEqual, int64(allData.Len())) + require.LessOrEqual(t.T(), int64(allData.Len()), w.offset.Load()) err = w.Close() - c.Assert(err, IsNil) - c.Assert(w.file, IsNil) - c.Assert(w.filename.Load(), Equals, "") - c.Assert(w.offset.Load(), Equals, int64(0)) - - c.Assert(w.Close(), IsNil) // noop + require.NoError(t.T(), err) + 
require.Nil(t.T(), w.file) + require.Equal(t.T(), "", w.filename.Load()) + require.Equal(t.T(), int64(0), w.offset.Load()) // try to read the data back fullName := filepath.Join(binlogDir, filename) dataInFile, err := os.ReadFile(fullName) - c.Assert(err, IsNil) - c.Assert(dataInFile, DeepEquals, allData.Bytes()) + require.NoError(t.T(), err) + require.Equal(t.T(), allData.Bytes(), dataInFile) + } + + { + // cover for error + w := NewBinlogWriter(log.L(), dir) + err := w.Open(uuid, filename) + require.NoError(t.T(), err) + require.NotNil(t.T(), w.file) + + err = w.Write(data1) + require.NoError(t.T(), err) + + w.file = nil + // write data again + data2 := []byte("another-data") + err = w.Write(data2) + require.True(t.T(), terror.ErrRelayWriterNotOpened.Equal(err)) + require.True(t.T(), terror.ErrRelayWriterNotOpened.Equal(w.Close())) } } From b0b75a0bb8998a150e475a3418209f275fc236c8 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Mon, 28 Nov 2022 16:29:04 +0800 Subject: [PATCH 11/15] fix ut --- dm/relay/binlog_writer_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dm/relay/binlog_writer_test.go b/dm/relay/binlog_writer_test.go index 91e34a05ac6..5e4f462ba26 100644 --- a/dm/relay/binlog_writer_test.go +++ b/dm/relay/binlog_writer_test.go @@ -117,6 +117,8 @@ func (t *testBinlogWriterSuite) TestWrite() { err = w.Write(data1) require.NoError(t.T(), err) + err = w.Flush() + require.NoError(t.T(), err) w.file = nil // write data again From 63dfc31f14596c174667ec8beaf04b136d508b9a Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Mon, 28 Nov 2022 18:07:15 +0800 Subject: [PATCH 12/15] fix ut --- dm/relay/binlog_writer_test.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/dm/relay/binlog_writer_test.go b/dm/relay/binlog_writer_test.go index 5e4f462ba26..347d8ac4ea0 100644 --- a/dm/relay/binlog_writer_test.go +++ b/dm/relay/binlog_writer_test.go @@ -120,11 +120,14 @@ func (t *testBinlogWriterSuite) TestWrite() { err = w.Flush() 
require.NoError(t.T(), err) - w.file = nil + require.NoError(t.T(), w.file.Close()) // write data again data2 := []byte("another-data") - err = w.Write(data2) - require.True(t.T(), terror.ErrRelayWriterNotOpened.Equal(err)) - require.True(t.T(), terror.ErrRelayWriterNotOpened.Equal(w.Close())) + // we cannot determine the error is caused by `Write` or `Flush` + // nolint:errcheck + w.Write(data2) + // nolint:errcheck + w.Flush() + require.True(t.T(), terror.ErrBinlogWriterWriteDataLen.Equal(w.Close())) } } From 71a78960f5f810285787709bdaf27f7627f8a830 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Tue, 29 Nov 2022 13:58:44 +0800 Subject: [PATCH 13/15] address comment --- dm/relay/binlog_writer.go | 5 ++--- dm/relay/binlog_writer_test.go | 2 +- dm/relay/relay_writer.go | 2 +- dm/relay/relay_writer_test.go | 2 +- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index 68a2ececa78..f2a1f9dd5f0 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -21,7 +21,6 @@ import ( "path/filepath" "sync" - "github.com/pingcap/errors" "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/terror" "go.uber.org/atomic" @@ -93,7 +92,7 @@ func (w *BinlogWriter) run() { } if w.file == nil { - w.err.CompareAndSwap(nilErr, terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened"))) + w.err.CompareAndSwap(nilErr, terror.ErrRelayWriterNotOpened.Generate()) errOccurs = true return } @@ -186,7 +185,7 @@ func (w *BinlogWriter) Close() error { func (w *BinlogWriter) Write(rawData []byte) error { if w.file == nil { - return terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened")) + return terror.ErrRelayWriterNotOpened.Generate() } w.input <- rawData w.offset.Add(int64(len(rawData))) diff --git a/dm/relay/binlog_writer_test.go b/dm/relay/binlog_writer_test.go index 347d8ac4ea0..140b2ae6227 100644 --- a/dm/relay/binlog_writer_test.go +++ b/dm/relay/binlog_writer_test.go 
@@ -61,7 +61,7 @@ func (t *testBinlogWriterSuite) TestWrite() { // not opened w := NewBinlogWriter(log.L(), dir) err := w.Write(data1) - require.Contains(t.T(), err.Error(), "not opened") + require.Contains(t.T(), err.Error(), "no underlying writer opened") // open non exist dir err = w.Open("not-exist-uuid", "bin.000001") diff --git a/dm/relay/relay_writer.go b/dm/relay/relay_writer.go index 4675c5ef25f..a8a8c0537d5 100644 --- a/dm/relay/relay_writer.go +++ b/dm/relay/relay_writer.go @@ -154,7 +154,7 @@ func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) ( } err = w.Flush() if err != nil { - return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName) + return WResult{}, terror.Annotatef(err, "flush binlog file for %s", fullName) } } diff --git a/dm/relay/relay_writer_test.go b/dm/relay/relay_writer_test.go index 3b0a7e33a95..9b507f7e182 100644 --- a/dm/relay/relay_writer_test.go +++ b/dm/relay/relay_writer_test.go @@ -237,7 +237,7 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check defer w1.Close() w1.Init(uuid, filename) _, err = w1.WriteEvent(rotateEv) - c.Assert(err, check.ErrorMatches, ".*file not opened.*") + c.Assert(err, check.ErrorMatches, ".*no underlying writer opened") // 2. fake RotateEvent before FormatDescriptionEvent relayDir = c.MkDir() // use a new relay directory From 570fdb658c065812e2f4fbbd9eb3f97935b86371 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Tue, 29 Nov 2022 16:19:52 +0800 Subject: [PATCH 14/15] Revert "add fileOffset" This reverts commit 4453b50173b235d269345229d054d9636a1135e1. 
--- dm/relay/binlog_writer.go | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/dm/relay/binlog_writer.go b/dm/relay/binlog_writer.go index f2a1f9dd5f0..b383bda0fcb 100644 --- a/dm/relay/binlog_writer.go +++ b/dm/relay/binlog_writer.go @@ -37,15 +37,12 @@ var nilErr error // BinlogWriter is a binlog event writer which writes binlog events to a file. // Open/Write/Close cannot be called concurrently. type BinlogWriter struct { - // offset means the offset we add to binlog writer - offset atomic.Int64 - // fileOffset means the offset we finish write to the file - fileOffset atomic.Int64 - file *os.File - relayDir string - uuid atomic.String - filename atomic.String - err atomic.Error + offset atomic.Int64 + file *os.File + relayDir string + uuid atomic.String + filename atomic.String + err atomic.Error logger log.Logger @@ -102,7 +99,6 @@ func (w *BinlogWriter) run() { errOccurs = true return } - w.fileOffset.Add(int64(n)) buf.Reset() } @@ -141,9 +137,7 @@ func (w *BinlogWriter) Open(uuid, filename string) error { return terror.ErrBinlogWriterGetFileStat.Delegate(err, f.Name()) } - size := fs.Size() - w.offset.Store(size) - w.fileOffset.Store(size) + w.offset.Store(fs.Size()) w.file = f w.uuid.Store(uuid) w.filename.Store(filename) @@ -176,7 +170,6 @@ func (w *BinlogWriter) Close() error { w.file = nil w.offset.Store(0) - w.fileOffset.Store(0) w.uuid.Store("") w.filename.Store("") w.input = nil @@ -213,5 +206,5 @@ func (w *BinlogWriter) Offset() int64 { } func (w *BinlogWriter) isActive(uuid, filename string) (bool, int64) { - return uuid == w.uuid.Load() && filename == w.filename.Load(), w.fileOffset.Load() + return uuid == w.uuid.Load() && filename == w.filename.Load(), w.offset.Load() } From 4d0a3241e6c7f09bf6a1bfd8ca31553c6cee496c Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Tue, 29 Nov 2022 16:21:23 +0800 Subject: [PATCH 15/15] Revert "fix ut" This reverts commit 2a44a25a9d3d7c744004b175c7dcfc22ce09fbcb. 
--- dm/relay/local_reader_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dm/relay/local_reader_test.go b/dm/relay/local_reader_test.go index df6c33fbd81..0480e92b6cf 100644 --- a/dm/relay/local_reader_test.go +++ b/dm/relay/local_reader_test.go @@ -79,7 +79,6 @@ func (t *testReaderSuite) setActiveRelayLog(r Process, uuid, filename string, of writer.out.uuid.Store(uuid) writer.out.filename.Store(filename) writer.out.offset.Store(offset) - writer.out.fileOffset.Store(offset) } func (t *testReaderSuite) createBinlogFileParseState(c *C, relayLogDir, relayLogFile string, offset int64, possibleLast bool) *binlogFileParseState {