Commit 90ed4d8

This is an automated cherry-pick of pingcap#11339
Signed-off-by: ti-chi-bot <[email protected]>
asddongmen authored and ti-chi-bot committed Jul 4, 2024
1 parent 1ecce1d commit 90ed4d8
Showing 10 changed files with 173 additions and 99 deletions.
29 changes: 10 additions & 19 deletions cdc/processor/sinkmanager/manager.go
@@ -30,9 +30,16 @@ import (
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo"
<<<<<<< HEAD
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink/factory"
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink"
"github.com/pingcap/tiflow/pkg/config"
=======
"github.com/pingcap/tiflow/cdc/sink/dmlsink/factory"
tablesinkmetrics "github.com/pingcap/tiflow/cdc/sink/metrics/tablesink"
"github.com/pingcap/tiflow/cdc/sink/tablesink"
pconfig "github.com/pingcap/tiflow/pkg/config"
>>>>>>> 695f93240c (processor: fix a bug that will cause processor Tick get stuck when downstream is Kafka (#11339))
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/upstream"
@@ -345,12 +352,6 @@ func (m *SinkManager) run(ctx context.Context, warnings ...chan<- error) (err er
}
}

func (m *SinkManager) needsStuckCheck() bool {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == factory.CategoryMQ
}

func (m *SinkManager) initSinkFactory() (chan error, uint64) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
@@ -418,19 +419,6 @@ func (m *SinkManager) clearSinkFactory() {
}
}

func (m *SinkManager) putSinkFactoryError(err error, version uint64) (success bool) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
if version == m.sinkFactory.version {
select {
case m.sinkFactory.errors <- err:
default:
}
return true
}
return false
}

func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool) {
for i := 0; i < sinkWorkerNum; i++ {
w := newSinkWorker(m.changefeedID, m.sourceManager,
@@ -1042,6 +1030,7 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats {
m.sinkMemQuota.Release(tableID, checkpointTs)
m.redoMemQuota.Release(tableID, checkpointTs)

<<<<<<< HEAD
advanceTimeoutInSec := m.changefeedInfo.Config.Sink.AdvanceTimeoutInSec
if advanceTimeoutInSec <= 0 {
advanceTimeoutInSec = config.DefaultAdvanceTimeoutInSec
@@ -1061,6 +1050,8 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats {
}
}

=======
>>>>>>> 695f93240c (processor: fix a bug that will cause processor Tick get stuck when downstream is Kafka (#11339))
var resolvedTs model.Ts
// If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts.
if m.redoDMLMgr != nil {
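
The hunks above drop the stuck-check path: needsStuckCheck, putSinkFactoryError, and the advance-timeout block inside GetTableStats. A plausible reading, consistent with the commit title, is that this path takes the sinkFactory mutex from the processor Tick path while sink (re)initialization can hold the same mutex for a long time when the Kafka downstream is slow or unreachable, so Tick stalls behind it. The following is a minimal, self-contained sketch of that blocking pattern, not the tiflow code; apart from the method names echoed from the diff, everything in it is assumed.

// Sketch (assumption): a Tick-path getter that takes the same mutex a slow
// sink initialization holds will stall the whole processor loop.
package main

import (
	"fmt"
	"sync"
	"time"
)

type sinkManager struct {
	mu sync.Mutex // stands in for manager.go's sinkFactory lock
}

// initSinkFactory simulates creating a Kafka sink while holding the lock;
// dialing brokers can block for a long time when the downstream is unhealthy.
func (m *sinkManager) initSinkFactory() {
	m.mu.Lock()
	defer m.mu.Unlock()
	time.Sleep(3 * time.Second) // stand-in for a slow or blocked Kafka dial
}

// needsStuckCheck mirrors the removed helper: it also needs the lock,
// so a caller on the Tick path waits behind initSinkFactory.
func (m *sinkManager) needsStuckCheck() bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	return true
}

func main() {
	m := &sinkManager{}
	go m.initSinkFactory()
	time.Sleep(100 * time.Millisecond) // let the init goroutine grab the lock first

	start := time.Now()
	_ = m.needsStuckCheck() // GetTableStats-like call from Tick
	fmt.Printf("tick-path call blocked for %v\n", time.Since(start).Round(time.Millisecond))
}

On the cherry-picked (695f93240c) side the whole block is gone, which keeps GetTableStats off that mutex.
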
3 changes: 3 additions & 0 deletions cdc/processor/sinkmanager/manager_test.go
@@ -389,6 +389,7 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
}
}

<<<<<<< HEAD
func TestSinkManagerNeedsStuckCheck(t *testing.T) {
t.Parallel()

@@ -404,6 +405,8 @@ func TestSinkManagerNeedsStuckCheck(t *testing.T) {
require.False(t, manager.needsStuckCheck())
}

=======
>>>>>>> 695f93240c (processor: fix a bug that will cause processor Tick get stuck when downstream is Kafka (#11339))
func TestSinkManagerRestartTableSinks(t *testing.T) {
failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause", "return")
defer failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause")
3 changes: 3 additions & 0 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -452,6 +452,7 @@ func (t *tableSinkWrapper) cleanRangeEventCounts(upperBound engine.Position, min
return shouldClean
}

<<<<<<< HEAD
func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint64) {
t.getCheckpointTs()

@@ -473,6 +474,8 @@ func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint6

// handleRowChangedEvents is used to convert RowChangedEvents to TableSinkRowChangedEvents.
// It deals with old value compatibility.
=======
>>>>>>> 695f93240c (processor: fix a bug that will cause processor Tick get stuck when downstream is Kafka (#11339))
func handleRowChangedEvents(
changefeed model.ChangeFeedID, tableID model.TableID, events ...*model.PolymorphicEvent,
) ([]*model.RowChangedEvent, uint64) {
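
sinkMaybeStuck, kept only on the HEAD side of the conflict above, reports whether a table sink's checkpoint has stopped advancing within the given window and returns the sink version alongside the verdict. Its body is collapsed in this view, so the following is only a generic sketch of such a check under assumed semantics, not the removed implementation.

// Generic sketch of an "is this sink stuck?" check: report stuck when the
// checkpoint has not advanced within the allowed window.
package main

import (
	"fmt"
	"time"
)

type stuckDetector struct {
	lastCheckpoint uint64
	lastAdvance    time.Time
	version        uint64 // sink version reported alongside the verdict
}

// observe records a checkpoint reading and remembers when it last moved forward.
func (d *stuckDetector) observe(checkpoint uint64, now time.Time) {
	if checkpoint > d.lastCheckpoint {
		d.lastCheckpoint = checkpoint
		d.lastAdvance = now
	}
}

// maybeStuck mirrors the (bool, uint64) shape of sinkMaybeStuck in the diff.
func (d *stuckDetector) maybeStuck(window time.Duration, now time.Time) (bool, uint64) {
	return now.Sub(d.lastAdvance) > window, d.version
}

func main() {
	now := time.Now()
	d := &stuckDetector{version: 1, lastAdvance: now}
	d.observe(100, now)
	stuck, ver := d.maybeStuck(100*time.Millisecond, now.Add(time.Second))
	fmt.Println(stuck, ver) // true 1
}
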
5 changes: 3 additions & 2 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
@@ -18,7 +18,6 @@ import (
"math"
"sync"
"testing"
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
@@ -28,7 +27,6 @@ import (
"github.com/pingcap/tiflow/pkg/sink"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

type mockSink struct {
@@ -384,6 +382,7 @@ func TestTableSinkWrapperSinkVersion(t *testing.T) {
require.Nil(t, wrapper.tableSink.s)
require.Equal(t, wrapper.tableSink.version, uint64(0))
}
<<<<<<< HEAD

func TestTableSinkWrapperSinkInner(t *testing.T) {
t.Parallel()
@@ -444,3 +443,5 @@ func TestTableSinkWrapperSinkInner(t *testing.T) {
isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.True(t, isStuck)
}
=======
>>>>>>> 695f93240c (processor: fix a bug that will cause processor Tick get stuck when downstream is Kafka (#11339))
5 changes: 5 additions & 0 deletions pkg/config/sink.go
@@ -142,8 +142,13 @@ type SinkConfig struct {

// AdvanceTimeoutInSec is a duration in seconds. If a table sink's progress hasn't been
// advanced for this duration, the sink will be canceled and re-established.
<<<<<<< HEAD
AdvanceTimeoutInSec uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"`
}
=======
// Deprecated since v8.1.1
AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"`
>>>>>>> 695f93240c (processor: fix a bug that will cause processor Tick get stuck when downstream is Kafka (#11339))

// KafkaConfig represents a kafka sink configuration
type KafkaConfig struct {
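
On the cherry-picked side, AdvanceTimeoutInSec becomes a pointer and is marked deprecated since v8.1.1, so any remaining reader has to treat a nil value as "use the default" instead of comparing the field directly as the HEAD-side uint code does. Below is a hypothetical sketch of that nil-safe read; the default value and the helper name are assumptions, not the tiflow API.

// Hypothetical sketch: reading an optional (pointer) config field with a default.
package main

import "fmt"

const defaultAdvanceTimeoutInSec uint = 150 // assumed default, not taken from this diff

type sinkConfig struct {
	// Deprecated: kept only so old changefeed configs still parse.
	AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"`
}

// advanceTimeout returns the configured value, falling back to the default
// when the field is absent (nil) or zero.
func (c *sinkConfig) advanceTimeout() uint {
	if c.AdvanceTimeoutInSec == nil || *c.AdvanceTimeoutInSec == 0 {
		return defaultAdvanceTimeoutInSec
	}
	return *c.AdvanceTimeoutInSec
}

func main() {
	v := uint(600)
	fmt.Println((&sinkConfig{}).advanceTimeout())                        // 150
	fmt.Println((&sinkConfig{AdvanceTimeoutInSec: &v}).advanceTimeout()) // 600
}

Keeping the field as a pointer presumably lets old configs round-trip unchanged while new code can distinguish "unset" from an explicit value.
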
144 changes: 144 additions & 0 deletions pkg/sink/kafka/sarama_factory.go
@@ -0,0 +1,144 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"context"
"time"

"github.com/IBM/sarama"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"github.com/rcrowley/go-metrics"
"go.uber.org/zap"
)

type saramaFactory struct {
changefeedID model.ChangeFeedID
option *Options

registry metrics.Registry
}

// NewSaramaFactory constructs a Factory with sarama implementation.
func NewSaramaFactory(
o *Options,
changefeedID model.ChangeFeedID,
) (Factory, error) {
return &saramaFactory{
changefeedID: changefeedID,
option: o,
registry: metrics.NewRegistry(),
}, nil
}

func (f *saramaFactory) AdminClient(ctx context.Context) (ClusterAdminClient, error) {
start := time.Now()
config, err := NewSaramaConfig(ctx, f.option)
duration := time.Since(start).Seconds()
if duration > 2 {
log.Warn("new sarama config cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID))
}
if err != nil {
return nil, err
}

start = time.Now()
client, err := sarama.NewClient(f.option.BrokerEndpoints, config)
duration = time.Since(start).Seconds()
if duration > 2 {
log.Warn("new sarama client cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID))
}
if err != nil {
return nil, errors.Trace(err)
}

start = time.Now()
admin, err := sarama.NewClusterAdminFromClient(client)
duration = time.Since(start).Seconds()
if duration > 2 {
log.Warn("new sarama cluster admin cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID))
}
if err != nil {
return nil, errors.Trace(err)
}
return &saramaAdminClient{
client: client,
admin: admin,
changefeed: f.changefeedID,
}, nil
}

// SyncProducer returns a Sync Producer,
// it should be the caller's responsibility to close the producer
func (f *saramaFactory) SyncProducer(ctx context.Context) (SyncProducer, error) {
config, err := NewSaramaConfig(ctx, f.option)
if err != nil {
return nil, err
}
config.MetricRegistry = f.registry

client, err := sarama.NewClient(f.option.BrokerEndpoints, config)
if err != nil {
return nil, errors.Trace(err)
}

p, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
return nil, errors.Trace(err)
}
return &saramaSyncProducer{
id: f.changefeedID,
client: client,
producer: p,
}, nil
}

// AsyncProducer returns an Async Producer,
// it should be the caller's responsibility to close the producer
func (f *saramaFactory) AsyncProducer(
ctx context.Context,
failpointCh chan error,
) (AsyncProducer, error) {
config, err := NewSaramaConfig(ctx, f.option)
if err != nil {
return nil, err
}
config.MetricRegistry = f.registry

client, err := sarama.NewClient(f.option.BrokerEndpoints, config)
if err != nil {
return nil, errors.Trace(err)
}
p, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
return nil, errors.Trace(err)
}
return &saramaAsyncProducer{
client: client,
producer: p,
changefeedID: f.changefeedID,
failpointCh: failpointCh,
}, nil
}

func (f *saramaFactory) MetricsCollector(
role util.Role,
adminClient ClusterAdminClient,
) MetricsCollector {
return NewSaramaMetricsCollector(
f.changefeedID, role, adminClient, f.registry)
}
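
For orientation, a rough usage sketch of the factory added above. It calls NewSaramaFactory, AdminClient, SyncProducer, and MetricsCollector exactly as declared in this file, but the import paths, model.DefaultChangeFeedID, util.RoleProcessor, and the surrounding wiring are assumptions; in the real codebase the sink factory packages seen in the manager.go imports presumably own this setup.

// Rough usage sketch (assumptions noted above); it only exercises the call
// shapes declared in sarama_factory.go and is not a standalone program.
package example

import (
	"context"

	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/pkg/sink/kafka"
	"github.com/pingcap/tiflow/pkg/util"
)

func buildKafkaPieces(ctx context.Context, opts *kafka.Options) error {
	changefeedID := model.DefaultChangeFeedID("example") // assumed helper for an example ID

	factory, err := kafka.NewSaramaFactory(opts, changefeedID)
	if err != nil {
		return err
	}

	// AdminClient dials the brokers; per the slow-path warnings above, this is
	// exactly the step that can take a long time against an unhealthy cluster.
	admin, err := factory.AdminClient(ctx)
	if err != nil {
		return err
	}

	// The caller is responsible for closing the producer it gets back.
	producer, err := factory.SyncProducer(ctx)
	if err != nil {
		return err
	}

	collector := factory.MetricsCollector(util.RoleProcessor, admin)
	_, _ = producer, collector
	return nil
}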

This file was deleted.

29 changes: 0 additions & 29 deletions tests/integration_tests/hang_sink_suicide/conf/diff_config.toml

This file was deleted.

47 changes: 0 additions & 47 deletions tests/integration_tests/hang_sink_suicide/run.sh

This file was deleted.

5 changes: 5 additions & 0 deletions tests/integration_tests/run_group.sh
@@ -10,8 +10,13 @@ group=$2
# Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant
# changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence
# multi_cdc_cluster capture_suicide_while_balance_table
<<<<<<< HEAD
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint hang_sink_suicide server_config_compatibility changefeed_dup_error_restart"
mysql_only_http="http_api http_api_tls api_v2"
=======
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint syncpoint_check_ts server_config_compatibility changefeed_dup_error_restart"
mysql_only_http="http_api http_api_tls api_v2 http_api_tls_with_user_auth cli_tls_with_auth"
>>>>>>> 695f93240c (processor: fix a bug that will cause processor Tick get stuck when downstream is Kafka (#11339))
mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"

kafka_only="kafka_big_messages kafka_compression kafka_messages kafka_sink_error_resume mq_sink_lost_callback"
