Skip to content

Commit

Permalink
restructure some files
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaoxinyu committed Oct 21, 2022
1 parent aec00c6 commit 70d6749
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 24 deletions.
7 changes: 7 additions & 0 deletions cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,24 +285,29 @@ type ProcInfoSnap struct {
CaptureID string `json:"capture-id"`
}

// TableSet maintains a set of TableID.
type TableSet struct {
memo map[TableID]struct{}
}

// NewTableSet creates a TableSet.
func NewTableSet() *TableSet {
return &TableSet{
memo: make(map[TableID]struct{}),
}
}

// Add adds a tableID to TableSet.
func (s *TableSet) Add(tableID TableID) {
s.memo[tableID] = struct{}{}
}

// Remove removes a tableID from a TableSet.
func (s *TableSet) Remove(tableID TableID) {
delete(s.memo, tableID)
}

// Keys returns a collection of TableID.
func (s *TableSet) Keys() []TableID {
result := make([]TableID, 0, len(s.memo))
for k := range s.memo {
Expand All @@ -311,11 +316,13 @@ func (s *TableSet) Keys() []TableID {
return result
}

// Contain checks whether a TableID is in TableSet.
func (s *TableSet) Contain(tableID TableID) bool {
_, ok := s.memo[tableID]
return ok
}

// Size returns the size of TableSet.
func (s *TableSet) Size() int {
return len(s.memo)
}
13 changes: 8 additions & 5 deletions cdc/sinkv2/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tiflow/cdc/sinkv2/ddlsink"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
)

// Assert DDLEventSink implementation
Expand Down Expand Up @@ -65,14 +66,14 @@ func NewCloudStorageDDLSink(ctx context.Context, sinkURI *url.URL) (*ddlSink, er
return d, nil
}

func (d *ddlSink) generateSchemaPath(def tableDef) string {
func (d *ddlSink) generateSchemaPath(def cloudstorage.TableDef) string {
return fmt.Sprintf("%s/%s/%d/schema.json", def.Schema, def.Table, def.Version)
}

func (d *ddlSink) WriteDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
var def tableDef
var def cloudstorage.TableDef

def.fromTableInfo(ddl.TableInfo)
def.FromTableInfo(ddl.TableInfo)
encodedDef, err := json.MarshalIndent(def, "", " ")
if err != nil {
return errors.Trace(err)
Expand All @@ -97,9 +98,11 @@ func (d *ddlSink) WriteCheckpointTs(ctx context.Context,
) error {
for _, table := range tables {
ok := d.tables.Contain(table.ID)
// if table is not cached before, then create the corresponding
// schema.json file anyway.
if !ok {
var def tableDef
def.fromTableInfo(table)
var def cloudstorage.TableDef
def.FromTableInfo(table)

encodedDef, err := json.MarshalIndent(def, "", " ")
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tiflow/cdc/model"

"github.com/stretchr/testify/require"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"github.com/pingcap/tiflow/cdc/model"
)

type tableColumn struct {
// TableCol denotes the column info for a table definition.
type TableCol struct {
Name string `json:"ColumnName" `
Tp string `json:"ColumnType"`
Length string `json:"ColumnLength,omitempty"`
Expand All @@ -32,7 +33,8 @@ type tableColumn struct {
IsPK string `json:"ColumnIsPk,omitempty"`
}

func (t *tableColumn) fromTiColumnInfo(col *timodel.ColumnInfo) {
// FromTiColumnInfo converts from TiDB ColumnInfo to TableCol.
func (t *TableCol) FromTiColumnInfo(col *timodel.ColumnInfo) {
defaultFlen, defaultDecimal := mysql.GetDefaultFieldLengthAndDecimal(col.GetType())
isDecimalNotDefault := col.GetDecimal() != defaultDecimal &&
col.GetDecimal() != 0 &&
Expand Down Expand Up @@ -81,22 +83,24 @@ func (t *tableColumn) fromTiColumnInfo(col *timodel.ColumnInfo) {
}
}

type tableDef struct {
Table string `json:"Table"`
Schema string `json:"Schema"`
Version uint64 `json:"Version"`
Columns []tableColumn `json:"TableColumns"`
TotalColumns int `json:"TableColumnsTotal"`
// TableDef is the detailed table definition used for cloud storage sink.
type TableDef struct {
Table string `json:"Table"`
Schema string `json:"Schema"`
Version uint64 `json:"Version"`
Columns []TableCol `json:"TableColumns"`
TotalColumns int `json:"TableColumnsTotal"`
}

func (t *tableDef) fromTableInfo(info *model.TableInfo) {
// FromTableInfo converts from TableInfo to TableDef.
func (t *TableDef) FromTableInfo(info *model.TableInfo) {
t.Table = info.TableName.Table
t.Schema = info.TableName.Schema
t.Version = info.TableInfoVersion
t.TotalColumns = len(info.Columns)
for _, col := range info.Columns {
var tableCol tableColumn
tableCol.fromTiColumnInfo(col)
var tableCol TableCol
tableCol.FromTiColumnInfo(col)
t.Columns = append(t.Columns, tableCol)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ import (
"testing"

timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
)

func TestBuildTableColFromTiColumnInfo(t *testing.T) {
Expand Down Expand Up @@ -302,8 +301,8 @@ func TestBuildTableColFromTiColumnInfo(t *testing.T) {
ft.SetFlag(tc.flag)
}
col := &timodel.ColumnInfo{FieldType: *ft}
var tableCol tableColumn
tableCol.fromTiColumnInfo(col)
var tableCol TableCol
tableCol.FromTiColumnInfo(col)
encodedCol, err := json.Marshal(tableCol)
require.Nil(t, err, tc.name)
require.JSONEq(t, tc.expected, string(encodedCol), tc.name)
Expand All @@ -312,7 +311,7 @@ func TestBuildTableColFromTiColumnInfo(t *testing.T) {

func TestBuildTableDefFromTableInfo(t *testing.T) {
var columns []*timodel.ColumnInfo
var def tableDef
var def TableDef

tableInfo := &model.TableInfo{
TableInfoVersion: 100,
Expand Down Expand Up @@ -343,7 +342,7 @@ func TestBuildTableDefFromTableInfo(t *testing.T) {
columns = append(columns, col)

tableInfo.TableInfo = &timodel.TableInfo{Columns: columns}
def.fromTableInfo(tableInfo)
def.FromTableInfo(tableInfo)
encodedDef, err := json.MarshalIndent(def, "", " ")
require.Nil(t, err)
fmt.Println(string(encodedDef))
Expand Down

0 comments on commit 70d6749

Please sign in to comment.