Merge branch 'master' into use_local_files
ti-chi-bot authored Oct 21, 2022
2 parents d44df2e + 438e042 commit af7ec6e
Showing 19 changed files with 1,072 additions and 341 deletions.
4 changes: 4 additions & 0 deletions Makefile
@@ -512,11 +512,15 @@ check_third_party_binary_for_engine:
@which go || (echo "go not found in ${PATH}"; exit 1)
@which mysql || (echo "mysql not found in ${PATH}"; exit 1)
@which jq || (echo "jq not found in ${PATH}"; exit 1)
@which mc || (echo "mc not found in ${PATH}, you can use 'make bin/mc' and move bin/mc to ${PATH}"; exit 1)
@which bin/sync_diff_inspector || (echo "run 'make bin/sync_diff_inspector' to download it if you need")

check_engine_integration_test:
./engine/test/utils/check_case.sh

bin/mc:
./scripts/download-mc.sh

bin/sync_diff_inspector:
./scripts/download-sync-diff.sh

42 changes: 42 additions & 0 deletions cdc/model/owner.go
@@ -284,3 +284,45 @@ type ProcInfoSnap struct {
CfID ChangeFeedID `json:"changefeed-id"`
CaptureID string `json:"capture-id"`
}

// TableSet maintains a set of TableID.
type TableSet struct {
memo map[TableID]struct{}
}

// NewTableSet creates a TableSet.
func NewTableSet() *TableSet {
return &TableSet{
memo: make(map[TableID]struct{}),
}
}

// Add adds a tableID to TableSet.
func (s *TableSet) Add(tableID TableID) {
s.memo[tableID] = struct{}{}
}

// Remove removes a tableID from a TableSet.
func (s *TableSet) Remove(tableID TableID) {
delete(s.memo, tableID)
}

// Keys returns a collection of TableID.
func (s *TableSet) Keys() []TableID {
result := make([]TableID, 0, len(s.memo))
for k := range s.memo {
result = append(result, k)
}
return result
}

// Contain checks whether a TableID is in TableSet.
func (s *TableSet) Contain(tableID TableID) bool {
_, ok := s.memo[tableID]
return ok
}

// Size returns the size of TableSet.
func (s *TableSet) Size() int {
return len(s.memo)
}
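
For orientation while reading the rest of the diff, here is a minimal usage sketch of the new set type. It is not part of the commit; it assumes the usual github.com/pingcap/tiflow/cdc/model import path and that TableID is an integer alias, and the order of Keys follows Go map iteration, so it is unspecified.

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/model"
)

func main() {
	ts := model.NewTableSet()
	ts.Add(1)
	ts.Add(2)
	ts.Add(2) // adding an existing ID is a no-op
	fmt.Println(ts.Size())     // 2
	fmt.Println(ts.Contain(1)) // true
	ts.Remove(1)
	fmt.Println(ts.Contain(1)) // false
	fmt.Println(ts.Keys())     // remaining IDs, in unspecified order
}

Hoisting the set into cdc/model lets the scheduler and the new cloud-storage DDL sink below share one implementation instead of the scheduler-private tableSet that this commit removes.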
6 changes: 3 additions & 3 deletions cdc/scheduler/internal/v3/scheduler/scheduler_move_table.go
@@ -83,15 +83,15 @@ func (m *moveTableScheduler) Schedule(
return result
}

allTables := newTableSet()
allTables := model.NewTableSet()
for _, tableID := range currentTables {
allTables.add(tableID)
allTables.Add(tableID)
}

for tableID, task := range m.tasks {
// The table may not be in the current tables
// if it was removed after the manual move-table was triggered.
if !allTables.contain(tableID) {
if !allTables.Contain(tableID) {
log.Warn("schedulerv3: move table ignored, since the table cannot found",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
51 changes: 8 additions & 43 deletions cdc/scheduler/internal/v3/scheduler/scheduler_rebalance.go
@@ -112,24 +112,24 @@ func newBalanceMoveTables(
maxTaskLimit int,
changefeedID model.ChangeFeedID,
) []replication.MoveTable {
tablesPerCapture := make(map[model.CaptureID]*tableSet)
tablesPerCapture := make(map[model.CaptureID]*model.TableSet)
for captureID := range captures {
tablesPerCapture[captureID] = newTableSet()
tablesPerCapture[captureID] = model.NewTableSet()
}

for tableID, rep := range replications {
if rep.State != replication.ReplicationSetStateReplicating {
continue
}
tablesPerCapture[rep.Primary].add(tableID)
tablesPerCapture[rep.Primary].Add(tableID)
}

// findVictim returns the tables that need to be moved
upperLimitPerCapture := int(math.Ceil(float64(len(replications)) / float64(len(captures))))

victims := make([]model.TableID, 0)
for _, ts := range tablesPerCapture {
tables := ts.keys()
tables := ts.Keys()
if random != nil {
// Complexity note: Shuffle has O(n), where `n` is the number of tables.
// Also, during a single call of `Schedule`, Shuffle can be called at most
@@ -158,7 +158,7 @@ func newBalanceMoveTables(
break
}
victims = append(victims, table)
ts.remove(table)
ts.Remove(table)
tableNum2Remove--
}
}
@@ -168,7 +168,7 @@ func newBalanceMoveTables(

captureWorkload := make(map[model.CaptureID]int)
for captureID, ts := range tablesPerCapture {
captureWorkload[captureID] = randomizeWorkload(random, ts.size())
captureWorkload[captureID] = randomizeWorkload(random, ts.Size())
}
// for each victim table, find the target for it
moveTables := make([]replication.MoveTable, 0, len(victims))
@@ -198,8 +198,8 @@ func newBalanceMoveTables(
TableID: tableID,
DestCapture: target,
})
tablesPerCapture[target].add(tableID)
captureWorkload[target] = randomizeWorkload(random, tablesPerCapture[target].size())
tablesPerCapture[target].Add(tableID)
captureWorkload[target] = randomizeWorkload(random, tablesPerCapture[target].Size())
}

return moveTables
@@ -225,38 +225,3 @@ func randomizeWorkload(random *rand.Rand, input int) int {
// result of comparison of workloads when two workloads are equal.
return (input << randomPartBitSize) | randomPart
}

type tableSet struct {
memo map[model.TableID]struct{}
}

func newTableSet() *tableSet {
return &tableSet{
memo: make(map[model.TableID]struct{}),
}
}

func (s *tableSet) add(tableID model.TableID) {
s.memo[tableID] = struct{}{}
}

func (s *tableSet) remove(tableID model.TableID) {
delete(s.memo, tableID)
}

func (s *tableSet) keys() []model.TableID {
result := make([]model.TableID, 0, len(s.memo))
for k := range s.memo {
result = append(result, k)
}
return result
}

func (s *tableSet) contain(tableID model.TableID) bool {
_, ok := s.memo[tableID]
return ok
}

func (s *tableSet) size() int {
return len(s.memo)
}
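
A note on the balancing math shown above: upperLimitPerCapture is ceil(len(replications)/len(captures)), so with, say, 10 replicating tables over 3 captures the limit is 4, and a capture holding 6 tables donates 2 victims. The sketch below is not part of the commit; it only illustrates the randomizeWorkload trick. The real randomPartBitSize constant and the way the random part is drawn live elsewhere in the scheduler package, so the values here are assumptions.

package main

import (
	"fmt"
	"math/rand"
)

// Assumed width of the random low-order part; the real constant is defined
// in the scheduler package and may differ.
const randomPartBitSize = 8

func randomizeWorkload(random *rand.Rand, input int) int {
	var randomPart int
	if random != nil {
		randomPart = int(random.Int63() % (1 << randomPartBitSize))
	}
	// The random bits stay below the shifted workload, so captures whose
	// table counts differ keep their relative order, while ties between
	// equal counts are broken randomly.
	return (input << randomPartBitSize) | randomPart
}

func main() {
	r := rand.New(rand.NewSource(1))
	light := randomizeWorkload(r, 3) // capture with 3 tables
	heavy := randomizeWorkload(r, 4) // capture with 4 tables
	fmt.Println(light < heavy)       // always true: 3<<8 | x is below 4<<8 for x < 256
}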
135 changes: 135 additions & 0 deletions cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
@@ -0,0 +1,135 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package cloudstorage

import (
"context"
"encoding/json"
"fmt"
"net/url"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
)

// Assert DDLEventSink implementation
var _ ddlsink.DDLEventSink = (*ddlSink)(nil)

type ddlSink struct {
// id indicates which changefeed this sink belongs to.
id model.ChangeFeedID
// statistics is used to record the DDL metrics
statistics *metrics.Statistics
storage storage.ExternalStorage
tables *model.TableSet
}

// NewCloudStorageDDLSink creates a ddl sink for cloud storage.
func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, error) {
// parse backend storage from sinkURI
bs, err := storage.ParseBackend(sinkURI.String(), &storage.BackendOptions{})
if err != nil {
return nil, err
}

// create an external storage.
storage, err := storage.New(ctx, bs, nil)
if err != nil {
return nil, err
}
changefeedID := contextutil.ChangefeedIDFromCtx(ctx)

d := &ddlSink{
id: changefeedID,
storage: storage,
tables: model.NewTableSet(),
statistics: metrics.NewStatistics(ctx, sink.TxnSink),
}

return d, nil
}

func (d *ddlSink) generateSchemaPath(def cloudstorage.TableDetail) string {
return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.Version)
}

func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
var def cloudstorage.TableDetail

def.FromTableInfo(ddl.TableInfo)
encodedDef, err := json.MarshalIndent(def, "", " ")
if err != nil {
return errors.Trace(err)
}

path := d.generateSchemaPath(def)
err = d.statistics.RecordDDLExecution(func() error {
err1 := d.storage.WriteFile(ctx, path, encodedDef)
if err1 != nil {
return err1
}

d.tables.Add(ddl.TableInfo.ID)
return nil
})

return errors.Trace(err)
}

func (d *ddlSink) WriteCheckpointTs(ctx context.Context,
ts uint64, tables []*model.TableInfo,
) error {
for _, table := range tables {
ok := d.tables.Contain(table.ID)
// if the table has not been cached before, create the corresponding
// schema.json file anyway.
if !ok {
var def cloudstorage.TableDetail
def.FromTableInfo(table)

encodedDef, err := json.MarshalIndent(def, "", " ")
if err != nil {
return errors.Trace(err)
}

path := d.generateSchemaPath(def)
err = d.storage.WriteFile(ctx, path, encodedDef)
if err != nil {
return errors.Trace(err)
}
d.tables.Add(table.ID)
}
}

ckpt, err := json.Marshal(map[string]uint64{"checkpoint-ts": ts})
if err != nil {
return errors.Trace(err)
}
err = d.storage.WriteFile(ctx, "metadata", ckpt)
return errors.Trace(err)
}

func (d *ddlSink) Close() error {
if d.statistics != nil {
d.statistics.Close()
}

return nil
}
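
Putting the new sink's behavior together: for a DDL on a table whose TableDetail serializes as schema test, table t1, version 5, WriteDDLEvent uploads test/t1/5/schema.json, and WriteCheckpointTs keeps a top-level metadata object of the form {"checkpoint-ts": <ts>}, back-filling schema.json for any table not yet recorded in the TableSet. The wiring below is only a sketch with placeholder names and URI; it is not how the changefeed actually constructs the sink.

package main

import (
	"context"
	"log"
	"net/url"

	"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink/cloudstorage"
)

func main() {
	ctx := context.Background()
	// Placeholder bucket and prefix; any backend accepted by br's storage.ParseBackend should work.
	uri, err := url.Parse("s3://my-bucket/cdc-demo")
	if err != nil {
		log.Fatal(err)
	}
	sink, err := cloudstorage.NewCloudStorageDDLSink(ctx, uri)
	if err != nil {
		log.Fatal(err)
	}
	defer sink.Close()
	// sink.WriteDDLEvent(ctx, ddl) would write <schema>/<table>/<version>/schema.json,
	// and sink.WriteCheckpointTs(ctx, ts, tables) would refresh the "metadata" object.
}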
