From 664e17a18578eaf7dabe160b404ca55ae3f99d7d Mon Sep 17 00:00:00 2001 From: qupeng Date: Tue, 11 Jun 2024 11:31:30 +0800 Subject: [PATCH 1/2] cdc: log slow conflict detect every 60s (#11251) close pingcap/tiflow#11271 --- cdc/sinkv2/eventsink/txn/worker.go | 34 +++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/cdc/sinkv2/eventsink/txn/worker.go b/cdc/sinkv2/eventsink/txn/worker.go index c387cec835f..57282121b09 100644 --- a/cdc/sinkv2/eventsink/txn/worker.go +++ b/cdc/sinkv2/eventsink/txn/worker.go @@ -46,6 +46,8 @@ type worker struct { flushInterval time.Duration hasPending bool postTxnExecutedCallbacks []func() + + lastSlowConflictDetectLog map[model.TableID]time.Time } func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *worker { @@ -68,6 +70,8 @@ func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *w flushInterval: backend.MaxFlushInterval(), hasPending: false, postTxnExecutedCallbacks: make([]func(), 0, 1024), + + lastSlowConflictDetectLog: make(map[model.TableID]time.Time), } } @@ -85,6 +89,9 @@ func (w *worker) run(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error { zap.String("changefeedID", w.changefeed), zap.Int("workerID", w.ID)) + cleanSlowLogHistory := time.NewTicker(time.Hour) + defer cleanSlowLogHistory.Stop() + start := time.Now() for { select { @@ -93,6 +100,15 @@ func (w *worker) run(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error { zap.String("changefeedID", w.changefeed), zap.Int("workerID", w.ID)) return nil + case <-cleanSlowLogHistory.C: + lastSlowConflictDetectLog := w.lastSlowConflictDetectLog + w.lastSlowConflictDetectLog = make(map[model.TableID]time.Time) + now := time.Now() + for tableID, lastLog := range lastSlowConflictDetectLog { + if now.Sub(lastLog) <= time.Minute { + w.lastSlowConflictDetectLog[tableID] = lastLog + } + } case txn := <-txnCh: // we get the data from txnCh.out until no more data here or reach the state that can be flushed. // If no more data in txnCh.out, and also not reach the state that can be flushed, @@ -149,8 +165,24 @@ func (w *worker) onEvent(txn *txnEvent, postTxnExecuted func()) bool { return false } - w.metricConflictDetectDuration.Observe(txn.conflictResolved.Sub(txn.start).Seconds()) + conflictDetectTime := txn.conflictResolved.Sub(txn.start).Seconds() + w.metricConflictDetectDuration.Observe(conflictDetectTime) w.metricQueueDuration.Observe(time.Since(txn.start).Seconds()) + + // Log tables which conflict detect time larger than 1 minute. + if conflictDetectTime > float64(60) { + now := time.Now() + // Log slow conflict detect tables every minute. + if lastLog, ok := w.lastSlowConflictDetectLog[txn.Event.PhysicalTableID]; !ok || now.Sub(lastLog) > time.Minute { + log.Warn("Transaction dmlSink finds a slow transaction in conflict detector", + zap.String("changefeedID", w.changefeed), + zap.Int("workerID", w.ID), + zap.Int64("TableID", txn.Event.PhysicalTableID), + zap.Float64("seconds", conflictDetectTime)) + w.lastSlowConflictDetectLog[txn.Event.PhysicalTableID] = now + } + } + w.metricTxnWorkerHandledRows.Add(float64(len(txn.Event.Rows))) w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted) return w.backend.OnTxnEvent(txn.TxnCallbackableEvent) From 955a5aa497e6d45e39858b548c508d6b2c750389 Mon Sep 17 00:00:00 2001 From: qupeng Date: Tue, 11 Jun 2024 14:29:41 +0800 Subject: [PATCH 2/2] fix Signed-off-by: qupeng --- cdc/sinkv2/eventsink/txn/worker.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cdc/sinkv2/eventsink/txn/worker.go b/cdc/sinkv2/eventsink/txn/worker.go index 57282121b09..b85cafa5c4f 100644 --- a/cdc/sinkv2/eventsink/txn/worker.go +++ b/cdc/sinkv2/eventsink/txn/worker.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/contextutil" + "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sinkv2/metrics/txn" "github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state" "github.com/pingcap/tiflow/pkg/causality" @@ -173,13 +174,13 @@ func (w *worker) onEvent(txn *txnEvent, postTxnExecuted func()) bool { if conflictDetectTime > float64(60) { now := time.Now() // Log slow conflict detect tables every minute. - if lastLog, ok := w.lastSlowConflictDetectLog[txn.Event.PhysicalTableID]; !ok || now.Sub(lastLog) > time.Minute { + if lastLog, ok := w.lastSlowConflictDetectLog[txn.Event.TableInfo.ID]; !ok || now.Sub(lastLog) > time.Minute { log.Warn("Transaction dmlSink finds a slow transaction in conflict detector", zap.String("changefeedID", w.changefeed), zap.Int("workerID", w.ID), - zap.Int64("TableID", txn.Event.PhysicalTableID), + zap.Int64("TableID", txn.Event.TableInfo.ID), zap.Float64("seconds", conflictDetectTime)) - w.lastSlowConflictDetectLog[txn.Event.PhysicalTableID] = now + w.lastSlowConflictDetectLog[txn.Event.Table.TableID] = now } }