sink/(ticdc): Dispatch DML events to cloud storage sink (#7048)

ref #6797

zhaoxinyu authored Oct 31, 2022
1 parent a9c5dca commit 5fed59a

Showing 30 changed files with 1,724 additions and 63 deletions.
9 changes: 5 additions & 4 deletions cdc/model/sink.go

@@ -560,10 +560,11 @@ func (d *DDLEvent) FromRenameTablesJob(job *model.Job,
 //msgp:ignore SingleTableTxn
 type SingleTableTxn struct {
 	// data fields of SingleTableTxn
-	Table    *TableName
-	StartTs  uint64
-	CommitTs uint64
-	Rows     []*RowChangedEvent
+	Table        *TableName
+	TableVersion uint64
+	StartTs      uint64
+	CommitTs     uint64
+	Rows         []*RowChangedEvent

 	// control fields of SingleTableTxn
 	// FinishWg is a barrier txn, after this txn is received, the worker must
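The widened struct adds TableVersion alongside the existing fields, presumably so the cloud storage sink can tell apart files written under different schema versions of the same table. A minimal sketch of constructing the new struct (the literal values, and the use of the table info's update timestamp as the version, are illustrative assumptions, not taken from this diff):

	// Hypothetical example; the field values are made up.
	txn := &model.SingleTableTxn{
		Table:        &model.TableName{Schema: "test", Table: "t1"},
		TableVersion: 438102318587974658, // assumed: the table info's update TS
		StartTs:      438102318587974657,
		CommitTs:     438102318587974659,
		Rows:         []*model.RowChangedEvent{},
	}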
6 changes: 4 additions & 2 deletions cdc/redo/common/util.go

@@ -64,7 +64,7 @@ var InitS3storage = func(ctx context.Context, uri url.URL) (storage.ExternalStor
 	s3storage, err := storage.New(ctx, backend, &storage.ExternalStorageOptions{
 		SendCredentials: false,
 		HTTPClient:      nil,
-		S3Retryer:       defaultS3Retryer(),
+		S3Retryer:       DefaultS3Retryer(),
 	})
 	if err != nil {
 		return nil, cerror.WrapChangefeedUnretryableErr(cerror.ErrS3StorageInitialize, err)
@@ -152,7 +152,9 @@ func (rl retryerWithLog) RetryRules(r *request.Request) time.Duration {
 	return backoffTime
 }

-func defaultS3Retryer() request.Retryer {
+// DefaultS3Retryer is the default s3 retryer, maybe this function
+// should be extracted to another place.
+func DefaultS3Retryer() request.Retryer {
 	return retryerWithLog{
 		DefaultRetryer: client.DefaultRetryer{
 			NumMaxRetries: 3,
3 changes: 3 additions & 0 deletions cdc/sink/codec/builder/encoder_builder.go

@@ -20,6 +20,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/sink/codec/canal"
 	"github.com/pingcap/tiflow/cdc/sink/codec/common"
 	"github.com/pingcap/tiflow/cdc/sink/codec/craft"
+	"github.com/pingcap/tiflow/cdc/sink/codec/csv"
 	"github.com/pingcap/tiflow/cdc/sink/codec/maxwell"
 	"github.com/pingcap/tiflow/cdc/sink/codec/open"
 	"github.com/pingcap/tiflow/pkg/config"
@@ -41,6 +42,8 @@ func NewEventBatchEncoderBuilder(ctx context.Context, c *common.Config) (codec.E
 		return canal.NewJSONBatchEncoderBuilder(c), nil
 	case config.ProtocolCraft:
 		return craft.NewBatchEncoderBuilder(c), nil
+	case config.ProtocolCsv:
+		return csv.NewBatchEncoderBuilder(c), nil
 	default:
 		return nil, cerror.ErrSinkUnknownProtocol.GenWithStackByArgs(c.Protocol)
 	}
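With this case registered, callers obtain a CSV encoder through the same entry point as every other protocol. A hedged usage sketch (common.NewConfig and the zero-argument Build() are assumed from the codec API at this revision; neither appears in the hunk):

	// Build a CSV batch encoder via the generic builder.
	cfg := common.NewConfig(config.ProtocolCsv)
	b, err := builder.NewEventBatchEncoderBuilder(ctx, cfg)
	if err != nil {
		return err
	}
	encoder := b.Build() // ready to append row changed events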
2 changes: 1 addition & 1 deletion cdc/sink/codec/csv/csv_encoder.go

@@ -75,7 +75,7 @@ func (b *BatchEncoder) Build() (messages []*common.Message) {

 	ret := common.NewMsg(config.ProtocolCsv, nil, b.valueBuf.Bytes(), 0, model.MessageTypeRow, nil, nil)
 	ret.SetRowsCount(b.batchSize)
-	if len(b.callbackBuf) != 0 && len(b.callbackBuf) == b.batchSize {
+	if len(b.callbackBuf) != 0 {
 		callbacks := b.callbackBuf
 		ret.Callback = func() {
 			for _, cb := range callbacks {
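The dropped second condition matters when only some rows in a batch carry callbacks: previously len(b.callbackBuf) == b.batchSize would then be false and no callback ever fired; now any buffered callbacks are chained into one message-level callback. The aggregation pattern in isolation (a generic sketch, not code from this diff):

	// Combine buffered per-row callbacks into a single callback.
	callbacks := buffered // e.g. a []func() collected while appending rows
	combined := func() {
		for _, cb := range callbacks {
			cb()
		}
	}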
31 changes: 23 additions & 8 deletions cdc/sink/validator.go

@@ -19,6 +19,7 @@ import (

 	"github.com/pingcap/tiflow/cdc/contextutil"
 	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/sinkv2/eventsink/factory"
 	"github.com/pingcap/tiflow/pkg/config"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/util"
@@ -28,21 +29,35 @@ import (
 // TODO: For now, we create a real sink instance and validate it.
 // Maybe we should support the dry-run mode to validate sink.
 func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) error {
-	if err := preCheckSinkURI(sinkURI); err != nil {
+	var err error
+	if err = preCheckSinkURI(sinkURI); err != nil {
 		return err
 	}

 	errCh := make(chan error)
 	ctx, cancel := context.WithCancel(contextutil.PutRoleInCtx(ctx, util.RoleClient))
-	s, err := New(ctx, model.DefaultChangeFeedID("sink-verify"), sinkURI, cfg, errCh)
-	if err != nil {
+	conf := config.GetGlobalServerConfig()
+	if !conf.Debug.EnableNewSink {
+		var s Sink
+		s, err = New(ctx, model.DefaultChangeFeedID("sink-verify"), sinkURI, cfg, errCh)
+		if err != nil {
+			cancel()
+			return err
+		}
+		// NOTICE: We have to cancel the context before we close it,
+		// otherwise we will write data to closed chan after sink closed.
 		cancel()
-		return err
+		err = s.Close(ctx)
+	} else {
+		var s *factory.SinkFactory
+		s, err = factory.New(ctx, sinkURI, cfg, errCh)
+		if err != nil {
+			cancel()
+			return err
+		}
+		cancel()
+		err = s.Close()
 	}
-	// NOTICE: We have to cancel the context before we close it,
-	// otherwise we will write data to closed chan after sink closed.
-	cancel()
-	err = s.Close(ctx)
 	if err != nil {
 		return err
 	}
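Net effect: with the server's debug switch for the new sink engine enabled, Validate exercises the sinkv2 factory instead of the legacy Sink. A hedged sketch of flipping the switch, e.g. in a test in this package (StoreGlobalServerConfig is assumed to exist in pkg/config; only conf.Debug.EnableNewSink is visible in the hunk):

	// Hypothetical: route sink validation through the v2 sink factory.
	conf := config.GetGlobalServerConfig()
	conf.Debug.EnableNewSink = true
	config.StoreGlobalServerConfig(conf)
	err := Validate(ctx, "s3://bucket/prefix?protocol=csv",
		config.GetDefaultReplicaConfig())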
12 changes: 10 additions & 2 deletions cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go

@@ -23,6 +23,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tiflow/cdc/contextutil"
 	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/redo/common"
 	"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink"
 	"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
 	"github.com/pingcap/tiflow/pkg/sink"
@@ -44,13 +45,16 @@ type ddlSink struct {
 // NewCloudStorageDDLSink creates a ddl sink for cloud storage.
 func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, error) {
 	// parse backend storage from sinkURI
-	bs, err := storage.ParseBackend(sinkURI.String(), &storage.BackendOptions{})
+	bs, err := storage.ParseBackend(sinkURI.String(), nil)
 	if err != nil {
 		return nil, err
 	}

 	// create an external storage.
-	storage, err := storage.New(ctx, bs, nil)
+	storage, err := storage.New(ctx, bs, &storage.ExternalStorageOptions{
+		SendCredentials: false,
+		S3Retryer:       common.DefaultS3Retryer(),
+	})
 	if err != nil {
 		return nil, err
 	}
@@ -73,6 +77,10 @@ func (d *ddlSink) generateSchemaPath(def cloudstorage.TableDetail) string {
 func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
 	var def cloudstorage.TableDetail

+	if ddl.TableInfo.TableInfo == nil {
+		return nil
+	}
+
 	def.FromTableInfo(ddl.TableInfo)
 	encodedDef, err := json.MarshalIndent(def, "", " ")
 	if err != nil {
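Two behavioral notes: the external storage is now created with explicit options (no credential forwarding, plus the shared S3 retryer exported above), and WriteDDLEvent returns early when the event's TableInfo carries no underlying table definition to serialize. A usage sketch built only from the signatures visible in this hunk:

	// Persist a DDL event's table definition to cloud storage.
	s, err := cloudstorage.NewCloudStorageDDLSink(ctx, sinkURI)
	if err != nil {
		return err
	}
	if err := s.WriteDDLEvent(ctx, ddl); err != nil {
		return err
	}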
9 changes: 6 additions & 3 deletions cdc/sinkv2/ddlsink/factory/factory.go

@@ -20,6 +20,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
 	"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink"
 	"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink/blackhole"
+	"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink/cloudstorage"
 	"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink/mq"
 	"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink/mq/ddlproducer"
 	"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink/mysql"
@@ -41,13 +42,15 @@ func New(
 	}
 	schema := strings.ToLower(sinkURI.Scheme)
 	switch schema {
-	case sink.KafkaSchema, sink.KafkaSSLSchema:
+	case sink.KafkaScheme, sink.KafkaSSLScheme:
 		return mq.NewKafkaDDLSink(ctx, sinkURI, cfg,
 			kafka.NewAdminClientImpl, ddlproducer.NewKafkaDDLProducer)
-	case sink.BlackHoleSchema:
+	case sink.BlackHoleScheme:
 		return blackhole.New(), nil
-	case sink.MySQLSSLSchema, sink.MySQLSchema, sink.TiDBSchema, sink.TiDBSSLSchema:
+	case sink.MySQLSSLScheme, sink.MySQLScheme, sink.TiDBScheme, sink.TiDBSSLScheme:
 		return mysql.NewMySQLDDLSink(ctx, sinkURI, cfg, pmysql.CreateMySQLDBConn)
+	case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.AzblobScheme:
+		return cloudstorage.NewCloudStorageDDLSink(ctx, sinkURI)
 	default:
 		return nil,
 			cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema)
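The renamed *Scheme constants plus the new case route any changefeed whose sink URI uses the s3, file, gcs, or azblob scheme to the cloud storage DDL sink. A hedged sketch (the query parameter is illustrative; factory.New's full parameter list is not visible in this hunk):

	// An s3 scheme now selects the cloud storage DDL sink.
	sinkURI, err := url.Parse("s3://bucket/prefix?protocol=csv")
	if err != nil {
		return err
	}
	s, err := cloudstorage.NewCloudStorageDDLSink(ctx, sinkURI)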
10 changes: 5 additions & 5 deletions cdc/sinkv2/ddlsink/mq/kafka_ddl_sink.go

@@ -24,7 +24,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/sink/mq/dispatcher"
 	"github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
 	"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink/mq/ddlproducer"
-	mqutil "github.com/pingcap/tiflow/cdc/sinkv2/util/mq"
+	"github.com/pingcap/tiflow/cdc/sinkv2/util"
 	"github.com/pingcap/tiflow/pkg/config"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	pkafka "github.com/pingcap/tiflow/pkg/sink/kafka"
@@ -39,7 +39,7 @@ func NewKafkaDDLSink(
 	adminClientCreator pkafka.ClusterAdminClientCreator,
 	producerCreator ddlproducer.Factory,
 ) (_ *ddlSink, err error) {
-	topic, err := mqutil.GetTopic(sinkURI)
+	topic, err := util.GetTopic(sinkURI)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -72,7 +72,7 @@ func NewKafkaDDLSink(
 		return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
 	}

-	protocol, err := mqutil.GetProtocol(replicaConfig.Sink.Protocol)
+	protocol, err := util.GetProtocol(replicaConfig.Sink.Protocol)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -98,7 +98,7 @@ func NewKafkaDDLSink(
 		}
 	}()

-	topicManager, err := mqutil.GetTopicManagerAndTryCreateTopic(
+	topicManager, err := util.GetTopicManagerAndTryCreateTopic(
 		topic,
 		baseConfig.DeriveTopicConfig(),
 		client,
@@ -113,7 +113,7 @@ func NewKafkaDDLSink(
 		return nil, errors.Trace(err)
 	}

-	encoderConfig, err := mqutil.GetEncoderConfig(sinkURI, protocol, replicaConfig,
+	encoderConfig, err := util.GetEncoderConfig(sinkURI, protocol, replicaConfig,
 		saramaConfig.Producer.MaxMessageBytes)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
(Diff truncated: the remaining 22 changed files are not shown.)