Skip to content

Commit

Permalink
pulsar (ticdc): fix pulsar sink related errors (#9770)
Browse files Browse the repository at this point in the history
ref #9413
  • Loading branch information
asddongmen authored Sep 20, 2023
1 parent bd83f8d commit 907d937
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 39 deletions.
8 changes: 4 additions & 4 deletions cdc/sink/dmlsink/mq/manager/pulsar_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func NewPulsarTopicManager(
return mgr, nil
}

// GetPartitionNum spend more time,but no use.
// Neither synchronous nor asynchronous sending of pulsar will use PartitionNum
// but this method is used in mq_ddl_sink.go, so an empty implementation is required
// GetPartitionNum always return 1 because we pass a message key to pulsar producer,
// and pulsar producer will hash the key to a partition.
// This method is only used to meet the requirement of mq sink's interface.
func (m *pulsarTopicManager) GetPartitionNum(ctx context.Context, topic string) (int32, error) {
return 0, nil
return 1, nil
}

// CreateTopicAndWaitUntilVisible no need to create first
Expand Down
37 changes: 2 additions & 35 deletions cmd/pulsar-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,9 @@ import (
"github.com/pingcap/tiflow/cdc/sink/ddlsink"
ddlsinkfactory "github.com/pingcap/tiflow/cdc/sink/ddlsink/factory"
eventsinkfactory "github.com/pingcap/tiflow/cdc/sink/dmlsink/factory"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher"
"github.com/pingcap/tiflow/cdc/sink/tablesink"
sutil "github.com/pingcap/tiflow/cdc/sink/util"
cmdUtil "github.com/pingcap/tiflow/pkg/cmd/util"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/sink"
Expand All @@ -64,9 +61,6 @@ type ConsumerOption struct {
protocol config.Protocol
enableTiDBExtension bool

// the replicaConfig of the changefeed which produce data to the pulsar topic
replicaConfig *config.ReplicaConfig

logPath string
logLevel string
timezone string
Expand All @@ -83,7 +77,7 @@ func newConsumerOption() *ConsumerOption {
}

// Adjust the consumer option by the upstream uri passed in parameters.
func (o *ConsumerOption) Adjust(upstreamURI *url.URL, configFile string) error {
func (o *ConsumerOption) Adjust(upstreamURI *url.URL, configFile string) {
// the default value of partitionNum is 1
o.partitionNum = 1

Expand Down Expand Up @@ -120,26 +114,12 @@ func (o *ConsumerOption) Adjust(upstreamURI *url.URL, configFile string) error {
o.enableTiDBExtension = enableTiDBExtension
}

if configFile != "" {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Sink.Protocol = util.AddressOf(o.protocol.String())
err := cmdUtil.StrictDecodeFile(configFile, "pulsar consumer", replicaConfig)
if err != nil {
return errors.Trace(err)
}
if _, err := filter.VerifyTableRules(replicaConfig.Filter); err != nil {
return errors.Trace(err)
}
o.replicaConfig = replicaConfig
}

log.Info("consumer option adjusted",
zap.String("configFile", configFile),
zap.String("address", strings.Join(o.address, ",")),
zap.String("topic", o.topic),
zap.Any("protocol", o.protocol),
zap.Bool("enableTiDBExtension", o.enableTiDBExtension))
return nil
}

var (
Expand Down Expand Up @@ -195,10 +175,7 @@ func run(cmd *cobra.Command, args []string) {
zap.String("upstreamURI", upstreamURIStr))
}

err = consumerOption.Adjust(upstreamURI, configFile)
if err != nil {
log.Panic("adjust consumer option failed", zap.Error(err))
}
consumerOption.Adjust(upstreamURI, configFile)

ctx, cancel := context.WithCancel(context.Background())
consumer, err := NewConsumer(ctx, consumerOption)
Expand Down Expand Up @@ -311,8 +288,6 @@ type Consumer struct {
// initialize to 0 by default
globalResolvedTs uint64

eventRouter *dispatcher.EventRouter

tz *time.Location

codecConfig *common.Config
Expand Down Expand Up @@ -344,14 +319,6 @@ func NewConsumer(ctx context.Context, o *ConsumerOption) (*Consumer, error) {
c.codecConfig.AvroEnableWatermark = true
}

if o.replicaConfig != nil {
eventRouter, err := dispatcher.NewEventRouter(o.replicaConfig, o.protocol, o.topic, sink.PulsarScheme)
if err != nil {
return nil, errors.Trace(err)
}
c.eventRouter = eventRouter
}

c.sinks = make([]*partitionSinks, o.partitionNum)
ctx, cancel := context.WithCancel(ctx)
errChan := make(chan error, 1)
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,12 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
}
}

if sink.IsPulsarScheme(sinkURI.Scheme) && s.PulsarConfig == nil {
s.PulsarConfig = &PulsarConfig{
SinkURI: sinkURI,
}
}

if s.PulsarConfig != nil {
if err := s.PulsarConfig.validate(); err != nil {
return err
Expand Down

0 comments on commit 907d937

Please sign in to comment.