Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(cdc): only check sink stuck for MQ sinks (#9742) #9747

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
26 changes: 17 additions & 9 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,12 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
}
}

// needsStuckCheck reports whether the stuck-sink detection should run for
// the current sink backend. Only MQ sinks are subject to the stuck check;
// a nil factory (not yet initialized) disables the check as well.
func (m *SinkManager) needsStuckCheck() bool {
	m.sinkFactory.Lock()
	defer m.sinkFactory.Unlock()
	f := m.sinkFactory.f
	return f != nil && f.Category() == factory.CategoryMQ
}

func (m *SinkManager) initSinkFactory() (chan error, uint64) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
Expand Down Expand Up @@ -989,15 +995,17 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats {
}
stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second

isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", version))
if m.needsStuckCheck() {
isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", sinkVersion))
}
}

var resolvedTs model.Ts
Expand Down
15 changes: 15 additions & 0 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,3 +365,18 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
log.Panic("must get an error instead of a timeout")
}
}

// TestSinkManagerNeedsStuckCheck verifies that a freshly created manager
// (whose sink backend is not an MQ sink) does not enable the stuck check.
func TestSinkManagerNeedsStuckCheck(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	errCh := make(chan error, 16)
	info := getChangefeedInfo()
	mgr, _ := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), info, errCh)
	// LIFO defers: cancel runs first, then Close — same order as before.
	defer mgr.Close()
	defer cancel()

	require.False(t, mgr.needsStuckCheck())
}
4 changes: 2 additions & 2 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"go.uber.org/zap"
)

var version uint64 = 0
var tableSinkWrapperVersion uint64 = 0

// tableSinkWrapper is a wrapper of TableSink, it is used in SinkManager to manage TableSink.
// Because in the SinkManager, we write data to TableSink and RedoManager concurrently,
Expand Down Expand Up @@ -110,7 +110,7 @@ func newTableSinkWrapper(
genReplicateTs func(ctx context.Context) (model.Ts, error),
) *tableSinkWrapper {
res := &tableSinkWrapper{
version: atomic.AddUint64(&version, 1),
version: atomic.AddUint64(&tableSinkWrapperVersion, 1),
changefeed: changefeed,
tableID: tableID,
tableSinkCreater: tableSinkCreater,
Expand Down
27 changes: 27 additions & 0 deletions cdc/sinkv2/eventsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

// Category is for different DML sink categories.
type Category = int

const (
	// CategoryTxn is for Txn sink.
	CategoryTxn Category = iota + 1
	// CategoryMQ is for MQ sink.
	CategoryMQ
	// CategoryCloudStorage is for CloudStorage sink.
	CategoryCloudStorage
	// CategoryBlackhole is for Blackhole sink.
	CategoryBlackhole
)

// SinkFactory is the factory of sink.
// It is responsible for creating sink and closing it.
// Because there is no way to convert the eventsink.EventSink[*model.RowChangedEvent]
Expand All @@ -41,6 +55,7 @@ type SinkFactory struct {
sinkType sink.Type
rowSink eventsink.EventSink[*model.RowChangedEvent]
txnSink eventsink.EventSink[*model.SingleTableTxn]
category Category
}

// New creates a new SinkFactory by schema.
Expand All @@ -64,6 +79,7 @@ func New(ctx context.Context,
}
s.txnSink = txnSink
s.sinkType = sink.TxnSink
s.category = CategoryTxn
case sink.KafkaScheme, sink.KafkaSSLScheme:
mqs, err := mq.NewKafkaDMLSink(ctx, sinkURI, cfg, errCh,
kafka.NewSaramaAdminClient, dmlproducer.NewKafkaDMLProducer)
Expand All @@ -72,17 +88,20 @@ func New(ctx context.Context,
}
s.txnSink = mqs
s.sinkType = sink.TxnSink
s.category = CategoryMQ
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
storageSink, err := cloudstorage.NewCloudStorageSink(ctx, sinkURI, cfg, errCh)
if err != nil {
return nil, err
}
s.txnSink = storageSink
s.sinkType = sink.TxnSink
s.category = CategoryCloudStorage
case sink.BlackHoleScheme:
bs := blackhole.New()
s.rowSink = bs
s.sinkType = sink.RowSink
s.category = CategoryBlackhole
default:
return nil,
cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema)
Expand Down Expand Up @@ -146,3 +165,11 @@ func (s *SinkFactory) Close() {
panic("unknown sink type")
}
}

// Category returns category of s. It panics if the category was never
// assigned, which indicates the factory was not built through New.
func (s *SinkFactory) Category() Category {
	c := s.category
	if c == 0 {
		panic("should never happen")
	}
	return c
}
5 changes: 3 additions & 2 deletions tests/integration_tests/hang_sink_suicide/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ function run() {
}

trap stop_tidb_cluster EXIT
run $*
check_logs $WORK_DIR
# TODO: update the case to use kafka sink instead of mysql sink.
# run $*
# check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
Loading