Skip to content

Commit

Permalink
ticdc(pulsar): write DDL events to pulsar (pingcap#9433)
Browse files Browse the repository at this point in the history
  • Loading branch information
yumchina authored and 3AceShowHand committed Aug 29, 2023
1 parent 2af966e commit a199d59
Show file tree
Hide file tree
Showing 20 changed files with 1,181 additions and 46 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ CURDIR := $(shell pwd)
path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH)))
export PATH := $(CURDIR)/bin:$(CURDIR)/tools/bin:$(path_to_add):$(PATH)

# DBUS_SESSION_BUS_ADDRESS: the pulsar client uses dbus to detect the connection status,
# but it will not exit when the connection is closed.
# We tried to use leak_helper to detect the goroutine leak, but it does not work.
# https://github.com/benthosdev/benthos/issues/1184 suggests using an environment variable to disable dbus.
export DBUS_SESSION_BUS_ADDRESS := /dev/null

SHELL := /usr/bin/env bash

TEST_DIR := /tmp/tidb_cdc_test
Expand Down
5 changes: 5 additions & 0 deletions cdc/sink/ddlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
"github.com/pingcap/tiflow/cdc/sink/ddlsink/mq"
"github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer"
"github.com/pingcap/tiflow/cdc/sink/ddlsink/mysql"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/kafka"
kafkav2 "github.com/pingcap/tiflow/pkg/sink/kafka/v2"
pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar"
"github.com/pingcap/tiflow/pkg/util"
)

Expand Down Expand Up @@ -59,6 +61,9 @@ func New(
return mysql.NewDDLSink(ctx, changefeedID, sinkURI, cfg)
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
return cloudstorage.NewDDLSink(ctx, changefeedID, sinkURI, cfg)
case sink.PulsarScheme, sink.PulsarSSLScheme:
return mq.NewPulsarDDLSink(ctx, changefeedID, sinkURI, cfg, manager.NewPulsarTopicManager,
pulsarConfig.NewCreatorFactory, ddlproducer.NewPulsarProducer)
default:
return nil,
cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", scheme)
Expand Down
7 changes: 7 additions & 0 deletions cdc/sink/ddlsink/mq/ddlproducer/ddl_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package ddlproducer
import (
"context"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/kafka"
pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar"
)

// DDLProducer is the interface for DDL message producer.
Expand All @@ -38,3 +41,7 @@ type DDLProducer interface {
// Factory is a function to create a DDLProducer.
// It takes a context, the changefeed ID and a kafka.SyncProducer,
// and returns the DDLProducer without an error value.
type Factory func(ctx context.Context, changefeedID model.ChangeFeedID,
syncProducer kafka.SyncProducer) DDLProducer

// PulsarFactory is a function to create a pulsar DDLProducer.
// It takes a context, the changefeed ID, the pulsar configuration,
// a pulsar client and the sink configuration, and returns the
// DDLProducer together with an error.
type PulsarFactory func(ctx context.Context, changefeedID model.ChangeFeedID,
pConfig *pulsarConfig.Config, client pulsar.Client, sinkConfig *config.SinkConfig) (DDLProducer, error)
113 changes: 113 additions & 0 deletions cdc/sink/ddlsink/mq/ddlproducer/pulsar_ddl_mock_producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ddlproducer

import (
"context"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
pulsarConfig "github.com/pingcap/tiflow/pkg/sink/pulsar"
)

// Assert DDLProducer implementation
var _ DDLProducer = (*PulsarMockProducers)(nil)

// PulsarMockProducers is a mock pulsar producer.
// It keeps the messages it is asked to send in memory so that
// they can be inspected later.
type PulsarMockProducers struct {
// events holds the recorded messages, keyed by topic name.
events map[string][]*pulsar.ProducerMessage
}

// SyncBroadcastMessage logs the call and then records the message
// exactly as SyncSendMessage does; the mock makes no distinction
// between broadcasting and sending to a single partition.
// It returns whatever SyncSendMessage returns.
func (p *PulsarMockProducers) SyncBroadcastMessage(
	ctx context.Context,
	topic string,
	totalPartitionsNum int32,
	message *common.Message,
) error {
	log.Info("pulsarProducers SyncBroadcastMessage in")

	err := p.SyncSendMessage(ctx, topic, totalPartitionsNum, message)
	return err
}

// SyncSendMessage records the message in memory under the given topic
// instead of sending it to pulsar.
//
// The message value becomes the payload and the message partition key
// becomes the key of the recorded pulsar.ProducerMessage.
// ctx and partitionNum are not used by the mock.
// It always returns nil.
func (p *PulsarMockProducers) SyncSendMessage(ctx context.Context, topic string,
	partitionNum int32, message *common.Message,
) error {
	// A PulsarMockProducers that was not built by NewMockPulsarProducer
	// (for example a zero value) has a nil map, and assigning to a nil
	// map panics, so create the store lazily.
	if p.events == nil {
		p.events = make(map[string][]*pulsar.ProducerMessage)
	}

	data := &pulsar.ProducerMessage{
		Payload: message.Value,
		Key:     message.GetPartitionKey(),
	}
	p.events[topic] = append(p.events[topic], data)

	return nil
}

// NewMockPulsarProducer builds a PulsarMockProducers with an empty
// event store. None of the arguments are used by the mock, and the
// returned error is always nil.
func NewMockPulsarProducer(
	ctx context.Context,
	changefeedID model.ChangeFeedID,
	pConfig *pulsarConfig.Config,
	client pulsar.Client,
) (*PulsarMockProducers, error) {
	mock := &PulsarMockProducers{
		events: make(map[string][]*pulsar.ProducerMessage),
	}
	return mock, nil
}

// NewMockPulsarProducerDDL creates a mock pulsar producer and returns it
// as a DDLProducer.
//
// It delegates to NewMockPulsarProducer; sinkConfig is accepted but not used.
// On error it returns a nil DDLProducer together with that error.
func NewMockPulsarProducerDDL(
	ctx context.Context,
	changefeedID model.ChangeFeedID,
	pConfig *pulsarConfig.Config,
	client pulsar.Client,
	sinkConfig *config.SinkConfig,
) (DDLProducer, error) {
	producer, err := NewMockPulsarProducer(ctx, changefeedID, pConfig, client)
	if err != nil {
		// Return a literal nil: a nil *PulsarMockProducers stored in the
		// DDLProducer interface would compare as non-nil to callers.
		return nil, err
	}
	return producer, nil
}

// GetProducerByTopic is a stub in the mock: it ignores topicName and
// returns a nil producer together with a nil error.
func (p *PulsarMockProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error) {
	return nil, nil
}

// Close discards every recorded event by replacing the event store
// with a new empty map.
func (p *PulsarMockProducers) Close() {
	p.events = map[string][]*pulsar.ProducerMessage{}
}

// Flush is a no-op in the mock producer.
// It does not use ctx and always returns nil.
func (p *PulsarMockProducers) Flush(ctx context.Context) error {
return nil
}

// GetAllEvents returns every message recorded by the mock producer,
// across all topics, as a single slice.
// Messages of one topic keep their recording order, but the order in
// which topics appear is unspecified because it follows map iteration.
// The result is nil when nothing has been recorded.
func (p *PulsarMockProducers) GetAllEvents() []*pulsar.ProducerMessage {
	var all []*pulsar.ProducerMessage
	for topic := range p.events {
		all = append(all, p.events[topic]...)
	}
	return all
}

// GetEvents returns the messages recorded for the given topic, in the
// order they were recorded. The result is nil when the topic has no
// recorded messages.
func (p *PulsarMockProducers) GetEvents(topic string) []*pulsar.ProducerMessage {
	recorded := p.events[topic]
	return recorded
}
Loading

0 comments on commit a199d59

Please sign in to comment.