Skip to content

Commit

Permalink
Merge branch 'master' into fix-relay-heartbeat
Browse files · Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Apr 1, 2022
2 parents 46d0de9 + 7b7a79b commit b46e622
Show file tree
Hide file tree
Showing 13 changed files with 136 additions and 81 deletions.
2 changes: 1 addition & 1 deletion dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (tr *Tracker) getTableInfoByCreateStmt(tctx *tcontext.Context, tableID stri
return nil, err
}
}
createStr, err := utils.GetTableCreateSQL(tctx.Ctx, tr.dsTracker.downstreamConn.BaseConn.DBConn, tableID)
createStr, err := dbconn.GetTableCreateSQL(tctx, tr.dsTracker.downstreamConn, tableID)
if err != nil {
return nil, dmterror.ErrSchemaTrackerCannotFetchDownstreamCreateTableStmt.Delegate(err, tableID)
}
Expand Down
18 changes: 9 additions & 9 deletions dm/pkg/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *trackerSuite) SetUpSuite(c *C) {
c.Assert(err, IsNil)
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
s.dbConn = &dbconn.DBConn{Cfg: s.cfg, BaseConn: conn.NewBaseConn(con, nil)}
s.dbConn = dbconn.NewDBConn(s.cfg, conn.NewBaseConn(con, nil))
}

func (s *trackerSuite) TearDownSuite(c *C) {
Expand All @@ -88,7 +88,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) {
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn}
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
// user give correct session config

t, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
Expand Down Expand Up @@ -690,7 +690,7 @@ func (s *trackerSuite) TestNotSupportedVariable(c *C) {
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn}
dbConn := dbconn.NewDBConn(s.cfg, baseConn)

mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).
Expand All @@ -717,7 +717,7 @@ func (s *trackerSuite) TestInitDownStreamSQLModeAndParser(c *C) {
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn}
dbConn := dbconn.NewDBConn(s.cfg, baseConn)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -755,7 +755,7 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn}
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
c.Assert(err, IsNil)
defer func() {
Expand Down Expand Up @@ -797,7 +797,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) {
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn}
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
c.Assert(err, IsNil)
defer func() {
Expand Down Expand Up @@ -889,7 +889,7 @@ func (s *trackerSuite) TestVarchar20000(c *C) {
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn}
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
c.Assert(err, IsNil)
defer func() {
Expand Down Expand Up @@ -929,7 +929,7 @@ func (s *trackerSuite) TestPlacementRule(c *C) {
con, err := db.Conn(context.Background())
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(con, nil)
dbConn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: baseConn}
dbConn := dbconn.NewDBConn(s.cfg, baseConn)
tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
c.Assert(err, IsNil)
defer func() {
Expand Down Expand Up @@ -967,7 +967,7 @@ func TestTrackerRecreateTables(t *testing.T) {
defer db.Close()
con, err := db.Conn(context.Background())
require.NoError(t, err)
dbConn := &dbconn.DBConn{Cfg: cfg, BaseConn: conn.NewBaseConn(con, nil)}
dbConn := dbconn.NewDBConn(cfg, conn.NewBaseConn(con, nil))
log.SetLevel(zapcore.ErrorLevel)

tracker, err := NewTracker(context.Background(), "test-tracker", defaultTestSessionCfg, dbConn)
Expand Down
5 changes: 2 additions & 3 deletions dm/syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ func (s *testCheckpointSuite) TestCheckPoint(c *C) {

dbConn, err := db.Conn(tcontext.Background().Context())
c.Assert(err, IsNil)
conn := &dbconn.DBConn{Cfg: s.cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}

conn := dbconn.NewDBConn(s.cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}))
cp.(*RemoteCheckPoint).dbConn = conn
err = cp.(*RemoteCheckPoint).prepare(tctx)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -511,7 +510,7 @@ func TestRemoteCheckPointLoadIntoSchemaTracker(t *testing.T) {
require.NoError(t, err)
dbConn, err := db.Conn(ctx)
require.NoError(t, err)
downstreamTrackConn := &dbconn.DBConn{Cfg: cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}
downstreamTrackConn := dbconn.NewDBConn(cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}))
schemaTracker, err := schema.NewTracker(ctx, cfg.Name, defaultTestSessionCfg, downstreamTrackConn)
require.NoError(t, err)
defer schemaTracker.Close() //nolint
Expand Down
7 changes: 2 additions & 5 deletions dm/syncer/data_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ func genDBConn(t *testing.T, db *sql.DB, cfg *config.SubTaskConfig) *dbconn.DBCo
baseDB := conn.NewBaseDB(db, func() {})
baseConn, err := baseDB.GetBaseConn(context.Background())
require.NoError(t, err)
return &dbconn.DBConn{
BaseConn: baseConn,
Cfg: cfg,
}
return dbconn.NewDBConn(cfg, baseConn)
}

func TestValidatorStartStop(t *testing.T) {
Expand Down Expand Up @@ -335,7 +332,7 @@ func TestValidatorDoValidate(t *testing.T) {
)
dbConn, err := db.Conn(context.Background())
require.NoError(t, err)
syncerObj.downstreamTrackConn = &dbconn.DBConn{Cfg: cfg, BaseConn: conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})}
syncerObj.downstreamTrackConn = dbconn.NewDBConn(cfg, conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{}))
syncerObj.schemaTracker, err = schema.NewTracker(context.Background(), cfg.Name, defaultTestSessionCfg, syncerObj.downstreamTrackConn)
defer syncerObj.schemaTracker.Close()
require.NoError(t, err)
Expand Down
45 changes: 26 additions & 19 deletions dm/syncer/dbconn/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,26 +56,33 @@ func CloseBaseDB(logCtx *tcontext.Context, baseDB *conn.BaseDB) {
// DBConn represents a live DB connection
// it's not thread-safe.
type DBConn struct {
Cfg *config.SubTaskConfig
BaseConn *conn.BaseConn
cfg *config.SubTaskConfig
baseConn *conn.BaseConn

// generate new BaseConn and close old one
ResetBaseConnFn func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error)
}

// NewDBConn wraps the given base connection together with its subtask
// configuration into a DBConn. The ResetBaseConnFn field is left nil and
// may be assigned by the caller afterwards.
func NewDBConn(cfg *config.SubTaskConfig, baseConn *conn.BaseConn) *DBConn {
	c := &DBConn{}
	c.cfg = cfg
	c.baseConn = baseConn
	return c
}

// ResetConn reset one worker connection from specify *BaseDB.
func (conn *DBConn) ResetConn(tctx *tcontext.Context) error {
baseConn, err := conn.ResetBaseConnFn(tctx, conn.BaseConn)
baseConn, err := conn.ResetBaseConnFn(tctx, conn.baseConn)
if err != nil {
return err
}
conn.BaseConn = baseConn
conn.baseConn = baseConn
return nil
}

// QuerySQL does one query.
func (conn *DBConn) QuerySQL(tctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error) {
if conn == nil || conn.BaseConn == nil {
if conn == nil || conn.baseConn == nil {
return nil, terror.ErrDBUnExpect.Generate("database base connection not valid")
}
// nolint:dupl
Expand All @@ -93,35 +100,35 @@ func (conn *DBConn) QuerySQL(tctx *tcontext.Context, query string, args ...inter
log.ShortError(err))
return false
}
metrics.SQLRetriesTotal.WithLabelValues("query", conn.Cfg.Name).Add(1)
metrics.SQLRetriesTotal.WithLabelValues("query", conn.cfg.Name).Add(1)
return true
}
if dbutil.IsRetryableError(err) {
tctx.L().Warn("query statement", zap.Int("retry", retryTime),
zap.String("query", utils.TruncateString(query, -1)),
zap.String("argument", utils.TruncateInterface(args, -1)),
log.ShortError(err))
metrics.SQLRetriesTotal.WithLabelValues("query", conn.Cfg.Name).Add(1)
metrics.SQLRetriesTotal.WithLabelValues("query", conn.cfg.Name).Add(1)
return true
}
return false
},
}

ret, _, err := conn.BaseConn.ApplyRetryStrategy(
ret, _, err := conn.baseConn.ApplyRetryStrategy(
tctx,
params,
func(ctx *tcontext.Context) (interface{}, error) {
startTime := time.Now()
ret, err := conn.BaseConn.QuerySQL(ctx, query, args...)
ret, err := conn.baseConn.QuerySQL(ctx, query, args...)
if err == nil {
if ret.Err() != nil {
return err, ret.Err()
}
cost := time.Since(startTime)
// duration seconds
ds := cost.Seconds()
metrics.QueryHistogram.WithLabelValues(conn.Cfg.Name, conn.Cfg.WorkerName, conn.Cfg.SourceID).Observe(ds)
metrics.QueryHistogram.WithLabelValues(conn.cfg.Name, conn.cfg.WorkerName, conn.cfg.SourceID).Observe(ds)
if ds > 1 {
ctx.L().Warn("query statement too slow",
zap.Duration("cost time", cost),
Expand Down Expand Up @@ -156,7 +163,7 @@ func (conn *DBConn) ExecuteSQLWithIgnore(tctx *tcontext.Context, ignoreError fun
return 0, nil
}

if conn == nil || conn.BaseConn == nil {
if conn == nil || conn.baseConn == nil {
return 0, terror.ErrDBUnExpect.Generate("database base connection not valid")
}

Expand All @@ -177,7 +184,7 @@ func (conn *DBConn) ExecuteSQLWithIgnore(tctx *tcontext.Context, ignoreError fun
}
tctx.L().Warn("execute sql failed by connection error", zap.Int("retry", retryTime),
zap.Error(err))
metrics.SQLRetriesTotal.WithLabelValues("stmt_exec", conn.Cfg.Name).Add(1)
metrics.SQLRetriesTotal.WithLabelValues("stmt_exec", conn.cfg.Name).Add(1)
return true
}
if dbutil.IsRetryableError(err) {
Expand All @@ -187,27 +194,27 @@ func (conn *DBConn) ExecuteSQLWithIgnore(tctx *tcontext.Context, ignoreError fun
log.ShortError(err))
tctx.L().Warn("execute sql failed by retryable error", zap.Int("retry", retryTime),
zap.Error(err))
metrics.SQLRetriesTotal.WithLabelValues("stmt_exec", conn.Cfg.Name).Add(1)
metrics.SQLRetriesTotal.WithLabelValues("stmt_exec", conn.cfg.Name).Add(1)
return true
}
return false
},
}

ret, _, err := conn.BaseConn.ApplyRetryStrategy(
ret, _, err := conn.baseConn.ApplyRetryStrategy(
tctx,
params,
func(ctx *tcontext.Context) (interface{}, error) {
startTime := time.Now()
ret, err := conn.BaseConn.ExecuteSQLWithIgnoreError(ctx, metrics.StmtHistogram, conn.Cfg.Name, ignoreError, queries, args...)
ret, err := conn.baseConn.ExecuteSQLWithIgnoreError(ctx, metrics.StmtHistogram, conn.cfg.Name, ignoreError, queries, args...)
if err == nil {
cost := time.Since(startTime)
// duration seconds
ds := cost.Seconds()
metrics.TxnHistogram.WithLabelValues(conn.Cfg.Name, conn.Cfg.WorkerName, conn.Cfg.SourceID).Observe(ds)
metrics.TxnHistogram.WithLabelValues(conn.cfg.Name, conn.cfg.WorkerName, conn.cfg.SourceID).Observe(ds)
// calculate idealJobCount metric: connection count * 1 / (one sql cost time)
qps := float64(conn.Cfg.WorkerCount) / (cost.Seconds() / float64(len(queries)))
metrics.IdealQPS.WithLabelValues(conn.Cfg.Name, conn.Cfg.WorkerName, conn.Cfg.SourceID).Set(qps)
qps := float64(conn.cfg.WorkerCount) / (cost.Seconds() / float64(len(queries)))
metrics.IdealQPS.WithLabelValues(conn.cfg.Name, conn.cfg.WorkerName, conn.cfg.SourceID).Set(qps)
if ds > 1 {
ctx.L().Warn("execute transaction too slow",
zap.Duration("cost time", cost),
Expand Down Expand Up @@ -252,7 +259,7 @@ func CreateConns(tctx *tcontext.Context, cfg *config.SubTaskConfig, dbCfg *confi
}
return baseDB.GetBaseConn(tctx.Context())
}
conns = append(conns, &DBConn{BaseConn: baseConn, Cfg: cfg, ResetBaseConnFn: resetBaseConnFn})
conns = append(conns, &DBConn{baseConn: baseConn, cfg: cfg, ResetBaseConnFn: resetBaseConnFn})
}
return baseDB, conns, nil
}
52 changes: 52 additions & 0 deletions dm/syncer/dbconn/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,17 @@ package dbconn

import (
"fmt"
"strconv"
"strings"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/parser"
tmysql "github.com/pingcap/tidb/parser/mysql"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
"go.uber.org/zap"
)

// GetTableCreateSQL gets table create sql by 'show create table schema.table'.
Expand All @@ -42,3 +50,47 @@ func GetTableCreateSQL(tctx *tcontext.Context, conn *DBConn, tableID string) (sq
}
return createStr, nil
}

// GetParserForConn builds a SQL parser whose behaviour matches the
// sql_mode currently set on the given downstream connection.
func GetParserForConn(tctx *tcontext.Context, conn *DBConn) (*parser.Parser, error) {
	mode, err := getSessionVariable(tctx, conn, "sql_mode")
	if err != nil {
		return nil, err
	}
	return utils.GetParserFromSQLModeStr(mode)
}

// getSessionVariable fetches the current value of a session/system variable
// from the downstream connection via "SHOW VARIABLES LIKE '<variable>'".
// If the variable does not exist, an empty value and a nil error are returned.
//
//nolint:unparam
func getSessionVariable(tctx *tcontext.Context, conn *DBConn, variable string) (value string, err error) {
	// failpoint value format: "<variable name>,<mysql error code>", e.g. "sql_mode,1105";
	// when the queried variable matches, the injected MySQL error is returned instead.
	failpoint.Inject("GetSessionVariableFailed", func(val failpoint.Value) {
		items := strings.Split(val.(string), ",")
		if len(items) != 2 {
			log.L().Fatal("failpoint GetSessionVariableFailed's value is invalid", zap.String("val", val.(string)))
		}
		variableName := items[0]
		errCode, err1 := strconv.ParseUint(items[1], 10, 16)
		if err1 != nil {
			log.L().Fatal("failpoint GetSessionVariableFailed's value is invalid", zap.String("val", val.(string)))
		}
		if variable == variableName {
			err = tmysql.NewErr(uint16(errCode))
			log.L().Warn("GetSessionVariable failed", zap.String("variable", variable), zap.String("failpoint", "GetSessionVariableFailed"), zap.Error(err))
			failpoint.Return("", terror.DBErrorAdapt(err, terror.ErrDBDriverError))
		}
	})
	template := "SHOW VARIABLES LIKE '%s'"
	query := fmt.Sprintf(template, variable)
	rows, err := conn.QuerySQL(tctx, query)
	if err != nil {
		return "", terror.DBErrorAdapt(err, terror.ErrDBDriverError)
	}
	defer rows.Close()
	if rows.Next() {
		// result set columns are: Variable_name, Value.
		if err = rows.Scan(&variable, &value); err != nil {
			return "", terror.DBErrorAdapt(err, terror.ErrDBDriverError)
		}
	}
	// surface any error encountered during row iteration before trusting the value.
	if err = rows.Err(); err != nil {
		return "", terror.DBErrorAdapt(err, terror.ErrDBDriverError)
	}
	if err = rows.Close(); err != nil {
		// wrap the Close error itself; wrapping rows.Err() here (as before) can
		// yield a nil error even though Close failed, silently dropping it.
		return "", terror.DBErrorAdapt(err, terror.ErrDBDriverError)
	}
	return value, nil
}
18 changes: 11 additions & 7 deletions dm/syncer/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,9 @@ func newMysqlErr(number uint16, message string) *mysql.MySQLError {

func (s *testSyncerSuite) TestHandleSpecialDDLError(c *C) {
var (
syncer = NewSyncer(s.cfg, nil, nil)
tctx = tcontext.Background()
conn2 = &dbconn.DBConn{Cfg: s.cfg, ResetBaseConnFn: func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error) {
return nil, nil
}}
syncer = NewSyncer(s.cfg, nil, nil)
tctx = tcontext.Background()
conn2 = dbconn.NewDBConn(s.cfg, nil)
customErr = errors.New("custom error")
invalidDDL = "SQL CAN NOT BE PARSED"
insertDML = "INSERT INTO tbl VALUES (1)"
Expand Down Expand Up @@ -140,7 +138,9 @@ func (s *testSyncerSuite) TestHandleSpecialDDLError(c *C) {
},
}
)

conn2.ResetBaseConnFn = func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error) {
return nil, nil
}
for _, cs := range cases {
err2 := syncer.handleSpecialDDLError(tctx, cs.err, cs.ddls, cs.index, conn2)
if cs.handled {
Expand All @@ -159,7 +159,11 @@ func (s *testSyncerSuite) TestHandleSpecialDDLError(c *C) {
c.Assert(err, IsNil)
conn1, err := db.Conn(context.Background())
c.Assert(err, IsNil)
conn2.BaseConn = conn.NewBaseConn(conn1, nil)
conn2.ResetBaseConnFn = func(_ *tcontext.Context, _ *conn.BaseConn) (*conn.BaseConn, error) {
return conn.NewBaseConn(conn1, nil), nil
}
err = conn2.ResetConn(tctx)
c.Assert(err, IsNil)

// dropColumnF test successful
mock.ExpectQuery("SELECT INDEX_NAME FROM information_schema.statistics WHERE.*").WillReturnRows(
Expand Down
Loading

0 comments on commit b46e622

Please sign in to comment.