From a2d5f425c01930fdcc9da1c180e917a1a7608eb6 Mon Sep 17 00:00:00 2001
From: qupeng
Date: Thu, 14 Sep 2023 13:30:09 +0800
Subject: [PATCH 1/3] This is an automated cherry-pick of #9742

Signed-off-by: ti-chi-bot
---
 cdc/processor/sinkmanager/manager.go      | 20 +++++++
 cdc/processor/sinkmanager/manager_test.go | 15 ++++++
 .../sinkmanager/table_sink_wrapper.go     |  4 +-
 cdc/sinkv2/eventsink/factory/factory.go   | 53 +++++++++++++++++++
 .../hang_sink_suicide/run.sh              |  5 +-
 5 files changed, 93 insertions(+), 4 deletions(-)

diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index 7e07f051ba4..c73bfce6ca9 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -314,6 +314,12 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
 	}
 }
 
+func (m *SinkManager) needsStuckCheck() bool {
+	m.sinkFactory.Lock()
+	defer m.sinkFactory.Unlock()
+	return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == factory.CategoryMQ
+}
+
 func (m *SinkManager) initSinkFactory() (chan error, uint64) {
 	m.sinkFactory.Lock()
 	defer m.sinkFactory.Unlock()
@@ -989,15 +995,29 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats {
 	}
 
 	stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second
+<<<<<<< HEAD
 	isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
 	if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
 		log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
 			zap.String("namespace", m.changefeedID.Namespace),
 			zap.String("changefeed", m.changefeedID.ID),
 			zap.Int64("tableID", tableID),
 			zap.Any("checkpointTs", checkpointTs),
 			zap.Float64("stuckCheck", stuckCheck.Seconds()),
 			zap.Uint64("factoryVersion", version))
+=======
+	if m.needsStuckCheck() {
+		isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
+		if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
+			log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
+				zap.String("namespace", m.changefeedID.Namespace),
+				zap.String("changefeed", m.changefeedID.ID),
+				zap.Stringer("span", &span),
+				zap.Any("checkpointTs", checkpointTs),
+				zap.Float64("stuckCheck", stuckCheck.Seconds()),
+				zap.Uint64("factoryVersion", sinkVersion))
+		}
+>>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742))
 	}
 
 	var resolvedTs model.Ts
diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go
index 63d79185c2f..d1ce99256a4 100644
--- a/cdc/processor/sinkmanager/manager_test.go
+++ b/cdc/processor/sinkmanager/manager_test.go
@@ -365,3 +365,18 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
 		log.Panic("must get an error instead of a timeout")
 	}
 }
+
+func TestSinkManagerNeedsStuckCheck(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	errCh := make(chan error, 16)
+	changefeedInfo := getChangefeedInfo()
+	manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh)
+	defer func() {
+		cancel()
+		manager.Close()
+	}()
+
+	require.False(t, manager.needsStuckCheck())
+}
diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go
index 7dc99436a4f..c1372d27345 100644
--- a/cdc/processor/sinkmanager/table_sink_wrapper.go
+++ b/cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -33,7 +33,7 @@ import (
 	"go.uber.org/zap"
 )
 
-var version uint64 = 0
+var tableSinkWrapperVersion uint64 = 0
 
 // tableSinkWrapper is a wrapper of TableSink, it is used in SinkManager to manage TableSink.
 // Because in the SinkManager, we write data to TableSink and RedoManager concurrently,
@@ -110,7 +110,7 @@ func newTableSinkWrapper(
 	genReplicateTs func(ctx context.Context) (model.Ts, error),
 ) *tableSinkWrapper {
 	res := &tableSinkWrapper{
-		version:          atomic.AddUint64(&version, 1),
+		version:          atomic.AddUint64(&tableSinkWrapperVersion, 1),
 		changefeed:       changefeed,
 		tableID:          tableID,
 		tableSinkCreater: tableSinkCreater,
diff --git a/cdc/sinkv2/eventsink/factory/factory.go b/cdc/sinkv2/eventsink/factory/factory.go
index 06f3d49baf1..ead5362956a 100644
--- a/cdc/sinkv2/eventsink/factory/factory.go
+++ b/cdc/sinkv2/eventsink/factory/factory.go
@@ -32,15 +32,35 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
+// Category is for different DML sink categories.
+type Category = int
+
+const (
+	// CategoryTxn is for Txn sink.
+	CategoryTxn Category = 1
+	// CategoryMQ is for MQ sink.
+	CategoryMQ = 2
+	// CategoryCloudStorage is for CloudStorage sink.
+	CategoryCloudStorage = 3
+	// CategoryBlackhole is for Blackhole sink.
+	CategoryBlackhole = 4
+)
+
 // SinkFactory is the factory of sink.
 // It is responsible for creating sink and closing it.
 // Because there is no way to convert the eventsink.EventSink[*model.RowChangedEvent]
 // to eventsink.EventSink[eventsink.TableEvent].
 // So we have to use this factory to create and store the sink.
 type SinkFactory struct {
+<<<<<<< HEAD:cdc/sinkv2/eventsink/factory/factory.go
 	sinkType sink.Type
 	rowSink  eventsink.EventSink[*model.RowChangedEvent]
 	txnSink  eventsink.EventSink[*model.SingleTableTxn]
+=======
+	rowSink  dmlsink.EventSink[*model.RowChangedEvent]
+	txnSink  dmlsink.EventSink[*model.SingleTableTxn]
+	category Category
+>>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742)):cdc/sink/dmlsink/factory/factory.go
 }
 
 // New creates a new SinkFactory by schema.
@@ -63,7 +83,11 @@ func New(ctx context.Context,
 		return nil, err
 	}
 	s.txnSink = txnSink
+<<<<<<< HEAD:cdc/sinkv2/eventsink/factory/factory.go
 	s.sinkType = sink.TxnSink
+=======
+	s.category = CategoryTxn
+>>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742)):cdc/sink/dmlsink/factory/factory.go
 	case sink.KafkaScheme, sink.KafkaSSLScheme:
 		mqs, err := mq.NewKafkaDMLSink(ctx, sinkURI, cfg, errCh,
 			kafka.NewSaramaAdminClient, dmlproducer.NewKafkaDMLProducer)
@@ -71,18 +95,39 @@ func New(ctx context.Context,
 			return nil, err
 		}
 		s.txnSink = mqs
+<<<<<<< HEAD:cdc/sinkv2/eventsink/factory/factory.go
 		s.sinkType = sink.TxnSink
+=======
+		s.category = CategoryMQ
+>>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742)):cdc/sink/dmlsink/factory/factory.go
 	case sink.S3Scheme, sink.FileScheme, sink.GCSScheme,
 		sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
 		storageSink, err := cloudstorage.NewCloudStorageSink(ctx, sinkURI, cfg, errCh)
 		if err != nil {
 			return nil, err
 		}
 		s.txnSink = storageSink
+<<<<<<< HEAD:cdc/sinkv2/eventsink/factory/factory.go
 		s.sinkType = sink.TxnSink
+=======
+		s.category = CategoryCloudStorage
+>>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742)):cdc/sink/dmlsink/factory/factory.go
 	case sink.BlackHoleScheme:
 		bs := blackhole.New()
 		s.rowSink = bs
+<<<<<<< HEAD:cdc/sinkv2/eventsink/factory/factory.go
 		s.sinkType = sink.RowSink
+=======
+		s.category = CategoryBlackhole
+	case sink.PulsarScheme:
+		mqs, err := mq.NewPulsarDMLSink(ctx, changefeedID, sinkURI, cfg, errCh,
+			manager.NewPulsarTopicManager,
+			pulsarConfig.NewCreatorFactory, dmlproducer.NewPulsarDMLProducer)
+		if err != nil {
+			return nil, err
+		}
+		s.txnSink = mqs
+		s.category = CategoryMQ
+>>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742)):cdc/sink/dmlsink/factory/factory.go
 	default:
 		return nil, cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported",
 			schema)
@@ -146,3 +191,11 @@ func (s *SinkFactory) Close() {
 		panic("unknown sink type")
 	}
 }
+
+// Category returns category of s.
+func (s *SinkFactory) Category() Category {
+	if s.category == 0 {
+		panic("should never happen")
+	}
+	return s.category
+}
diff --git a/tests/integration_tests/hang_sink_suicide/run.sh b/tests/integration_tests/hang_sink_suicide/run.sh
index 3489df74e05..e4e663cb975 100755
--- a/tests/integration_tests/hang_sink_suicide/run.sh
+++ b/tests/integration_tests/hang_sink_suicide/run.sh
@@ -41,6 +41,7 @@ function run() {
 }
 
 trap stop_tidb_cluster EXIT
-run $*
-check_logs $WORK_DIR
+# TODO: update the case to use kafka sink instead of mysql sink.
+# run $*
+# check_logs $WORK_DIR
>>>>>>" From 2f747cfb39baa7e20bb46767172d812aae8ae1a6 Mon Sep 17 00:00:00 2001 From: qupeng Date: Thu, 14 Sep 2023 13:45:20 +0800 Subject: [PATCH 2/3] fix conflicts Signed-off-by: qupeng --- cdc/processor/sinkmanager/manager.go | 14 +------------ cdc/sinkv2/eventsink/factory/factory.go | 26 ------------------------- 2 files changed, 1 insertion(+), 39 deletions(-) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index c73bfce6ca9..bf92f812f2f 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -995,29 +995,17 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats { } stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second -<<<<<<< HEAD - isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck) - if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) { - log.Warn("Table checkpoint is stuck too long, will restart the sink backend", - zap.String("namespace", m.changefeedID.Namespace), - zap.String("changefeed", m.changefeedID.ID), - zap.Int64("tableID", tableID), - zap.Any("checkpointTs", checkpointTs), - zap.Float64("stuckCheck", stuckCheck.Seconds()), - zap.Uint64("factoryVersion", version)) -======= if m.needsStuckCheck() { isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck) if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) { log.Warn("Table checkpoint is stuck too long, will restart the sink backend", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), - zap.Stringer("span", &span), + zap.Int64("tableID", tableID), zap.Any("checkpointTs", checkpointTs), zap.Float64("stuckCheck", stuckCheck.Seconds()), zap.Uint64("factoryVersion", sinkVersion)) } ->>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742)) } var resolvedTs model.Ts diff --git a/cdc/sinkv2/eventsink/factory/factory.go b/cdc/sinkv2/eventsink/factory/factory.go index ead5362956a..b91fa6fda7a 100644 --- a/cdc/sinkv2/eventsink/factory/factory.go +++ b/cdc/sinkv2/eventsink/factory/factory.go @@ -52,15 +52,10 @@ const ( // to eventsink.EventSink[eventsink.TableEvent]. // So we have to use this factory to create and store the sink. type SinkFactory struct { -<<<<<<< HEAD:cdc/sinkv2/eventsink/factory/factory.go sinkType sink.Type rowSink eventsink.EventSink[*model.RowChangedEvent] txnSink eventsink.EventSink[*model.SingleTableTxn] -======= - rowSink dmlsink.EventSink[*model.RowChangedEvent] - txnSink dmlsink.EventSink[*model.SingleTableTxn] category Category ->>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742)):cdc/sink/dmlsink/factory/factory.go } // New creates a new SinkFactory by schema. 
@@ -83,11 +78,8 @@ func New(ctx context.Context,
 		return nil, err
 	}
 	s.txnSink = txnSink
-<<<<<<< HEAD:cdc/sinkv2/eventsink/factory/factory.go
 	s.sinkType = sink.TxnSink
-=======
 	s.category = CategoryTxn
->>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742)):cdc/sink/dmlsink/factory/factory.go
 	case sink.KafkaScheme, sink.KafkaSSLScheme:
 		mqs, err := mq.NewKafkaDMLSink(ctx, sinkURI, cfg, errCh,
 			kafka.NewSaramaAdminClient, dmlproducer.NewKafkaDMLProducer)
@@ -95,39 +87,21 @@ func New(ctx context.Context,
 			return nil, err
 		}
 		s.txnSink = mqs
-<<<<<<< HEAD:cdc/sinkv2/eventsink/factory/factory.go
 		s.sinkType = sink.TxnSink
-=======
 		s.category = CategoryMQ
->>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742)):cdc/sink/dmlsink/factory/factory.go
 	case sink.S3Scheme, sink.FileScheme, sink.GCSScheme,
 		sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
 		storageSink, err := cloudstorage.NewCloudStorageSink(ctx, sinkURI, cfg, errCh)
 		if err != nil {
 			return nil, err
 		}
 		s.txnSink = storageSink
-<<<<<<< HEAD:cdc/sinkv2/eventsink/factory/factory.go
 		s.sinkType = sink.TxnSink
-=======
 		s.category = CategoryCloudStorage
->>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742)):cdc/sink/dmlsink/factory/factory.go
 	case sink.BlackHoleScheme:
 		bs := blackhole.New()
 		s.rowSink = bs
-<<<<<<< HEAD:cdc/sinkv2/eventsink/factory/factory.go
 		s.sinkType = sink.RowSink
-=======
 		s.category = CategoryBlackhole
-	case sink.PulsarScheme:
-		mqs, err := mq.NewPulsarDMLSink(ctx, changefeedID, sinkURI, cfg, errCh,
-			manager.NewPulsarTopicManager,
-			pulsarConfig.NewCreatorFactory, dmlproducer.NewPulsarDMLProducer)
-		if err != nil {
-			return nil, err
-		}
-		s.txnSink = mqs
-		s.category = CategoryMQ
->>>>>>> 141c9a782f (sink(cdc): only check sink stuck for MQ sinks (#9742)):cdc/sink/dmlsink/factory/factory.go
 	default:
 		return nil, cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported",
 			schema)

From 8f5a24652ac91317f8720277c06e93a8cd6624a9 Mon Sep 17 00:00:00 2001
From: qupeng
Date: Thu, 14 Sep 2023 14:16:33 +0800
Subject: [PATCH 3/3] fix

Signed-off-by: qupeng
---
 cdc/processor/sinkmanager/manager_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go
index d1ce99256a4..d33fcb5aeae 100644
--- a/cdc/processor/sinkmanager/manager_test.go
+++ b/cdc/processor/sinkmanager/manager_test.go
@@ -372,7 +372,7 @@ func TestSinkManagerNeedsStuckCheck(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	errCh := make(chan error, 16)
 	changefeedInfo := getChangefeedInfo()
-	manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh)
+	manager, _ := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh)
 	defer func() {
 		cancel()
 		manager.Close()
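For reference, the behavioral core of this series is small: after the three patches, SinkManager restarts a "stuck" table sink only when the active sink factory is an MQ sink. The standalone Go sketch below (not part of the patches) mirrors that gating logic; sinkManager, factoryHolder, and sinkFactory are simplified stand-in types invented for this sketch, not tiflow's real definitions — only the shape of needsStuckCheck follows the diff above.

package main

import (
	"fmt"
	"sync"
)

// Category mirrors the sink categories the series adds to the factory package.
type Category int

const (
	CategoryTxn Category = iota + 1
	CategoryMQ
	CategoryCloudStorage
	CategoryBlackhole
)

// sinkFactory stands in for the real factory; only Category() matters here.
type sinkFactory struct {
	category Category
}

func (f *sinkFactory) Category() Category { return f.category }

// factoryHolder stands in for SinkManager's sinkFactory field: a pointer that
// stays nil until the sink backend is initialized, guarded by a mutex.
type factoryHolder struct {
	sync.Mutex
	f *sinkFactory
}

type sinkManager struct {
	sinkFactory factoryHolder
}

// needsStuckCheck mirrors the gate the series introduces: the "table sink
// stuck" restart path runs only for an initialized MQ sink.
func (m *sinkManager) needsStuckCheck() bool {
	m.sinkFactory.Lock()
	defer m.sinkFactory.Unlock()
	return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == CategoryMQ
}

func (m *sinkManager) setFactory(f *sinkFactory) {
	m.sinkFactory.Lock()
	defer m.sinkFactory.Unlock()
	m.sinkFactory.f = f
}

func main() {
	m := &sinkManager{}

	// No factory yet: no stuck check, which is what
	// TestSinkManagerNeedsStuckCheck in patch 1 asserts.
	fmt.Println(m.needsStuckCheck()) // false

	// A MySQL-style txn sink is exempt from the stuck-restart path.
	m.setFactory(&sinkFactory{category: CategoryTxn})
	fmt.Println(m.needsStuckCheck()) // false

	// Only MQ sinks (Kafka here; Pulsar upstream) keep the check enabled.
	m.setFactory(&sinkFactory{category: CategoryMQ})
	fmt.Println(m.needsStuckCheck()) // true
}

The sketch reads the factory pointer only under its mutex, matching how the patch accesses m.sinkFactory.f: initSinkFactory and the error path rebuild the factory concurrently, so f can flip between nil and non-nil and must not be read unguarded.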