Skip to content

Commit

Permalink
dumpling: export consistency names, fix path log format and ping for …
Browse files Browse the repository at this point in the history
…new created db (#34084)

close #31064
  • Loading branch information
lichunzhu authored May 30, 2022
1 parent eb46685 commit 5d895ee
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 70 deletions.
11 changes: 6 additions & 5 deletions dumpling/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ import (
"github.com/docker/go-units"
"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"go.uber.org/zap"

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/util"
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -165,7 +166,7 @@ func DefaultConfig() *Config {
SortByPk: true,
Tables: nil,
Snapshot: "",
Consistency: consistencyTypeAuto,
Consistency: ConsistencyTypeAuto,
NoViews: true,
NoSequences: true,
Rows: UnspecifiedSize,
Expand Down Expand Up @@ -231,7 +232,7 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) {
flags.String(flagLoglevel, "info", "Log level: {debug|info|warn|error|dpanic|panic|fatal}")
flags.StringP(flagLogfile, "L", "", "Log file `path`, leave empty to write to console")
flags.String(flagLogfmt, "text", "Log `format`: {text|json}")
flags.String(flagConsistency, consistencyTypeAuto, "Consistency level during dumping: {auto|none|flush|lock|snapshot}")
flags.String(flagConsistency, ConsistencyTypeAuto, "Consistency level during dumping: {auto|none|flush|lock|snapshot}")
flags.String(flagSnapshot, "", "Snapshot position (uint64 or MySQL style string timestamp). Valid only when consistency=snapshot")
flags.BoolP(flagNoViews, "W", true, "Do not dump views")
flags.Bool(flagNoSequences, true, "Do not dump sequences")
Expand Down
24 changes: 15 additions & 9 deletions dumpling/export/consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@ import (
)

const (
consistencyTypeAuto = "auto"
consistencyTypeFlush = "flush"
consistencyTypeLock = "lock"
consistencyTypeSnapshot = "snapshot"
consistencyTypeNone = "none"
// ConsistencyTypeAuto will use flush for MySQL/MariaDB and snapshot for TiDB.
ConsistencyTypeAuto = "auto"
// ConsistencyTypeFlush will use FLUSH TABLES WITH READ LOCK to temporarily interrupt the DML and DDL operations of the replica database,
// to ensure the global consistency of the backup connection.
ConsistencyTypeFlush = "flush"
// ConsistencyTypeLock will add read locks on all tables to be exported.
ConsistencyTypeLock = "lock"
// ConsistencyTypeSnapshot gets a consistent snapshot of the specified timestamp and exports it.
ConsistencyTypeSnapshot = "snapshot"
// ConsistencyTypeNone doesn't guarantee for consistency.
ConsistencyTypeNone = "none"
)

var tiDBDisableTableLockErr = errors.New("try to apply lock consistency on TiDB but it doesn't enable table lock. please set enable-table-lock=true in tidb server config")
Expand All @@ -30,22 +36,22 @@ func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB
return nil, errors.Trace(err)
}
switch conf.Consistency {
case consistencyTypeFlush:
case ConsistencyTypeFlush:
return &ConsistencyFlushTableWithReadLock{
serverType: conf.ServerInfo.ServerType,
conn: conn,
}, nil
case consistencyTypeLock:
case ConsistencyTypeLock:
return &ConsistencyLockDumpingTables{
conn: conn,
conf: conf,
}, nil
case consistencyTypeSnapshot:
case ConsistencyTypeSnapshot:
if conf.ServerInfo.ServerType != version.ServerTypeTiDB {
return nil, errors.New("snapshot consistency is not supported for this server")
}
return &ConsistencyNone{}, nil
case consistencyTypeNone:
case ConsistencyTypeNone:
return &ConsistencyNone{}, nil
default:
return nil, errors.Errorf("invalid consistency option %s", conf.Consistency)
Expand Down
28 changes: 14 additions & 14 deletions dumpling/export/consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ func TestConsistencyController(t *testing.T) {
conf := defaultConfigForTest(t)
resultOk := sqlmock.NewResult(0, 1)

conf.Consistency = consistencyTypeNone
conf.Consistency = ConsistencyTypeNone
ctrl, _ := NewConsistencyController(ctx, conf, db)
_, ok := ctrl.(*ConsistencyNone)
require.True(t, ok)
require.NoError(t, ctrl.Setup(tctx))
require.NoError(t, ctrl.TearDown(tctx))

conf.Consistency = consistencyTypeFlush
conf.Consistency = ConsistencyTypeFlush
mock.ExpectExec("FLUSH TABLES WITH READ LOCK").WillReturnResult(resultOk)
mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk)
ctrl, _ = NewConsistencyController(ctx, conf, db)
Expand All @@ -48,7 +48,7 @@ func TestConsistencyController(t *testing.T) {
require.NoError(t, ctrl.TearDown(tctx))
require.NoError(t, mock.ExpectationsWereMet())

conf.Consistency = consistencyTypeSnapshot
conf.Consistency = ConsistencyTypeSnapshot
conf.ServerInfo.ServerType = version.ServerTypeTiDB
ctrl, _ = NewConsistencyController(ctx, conf, db)
_, ok = ctrl.(*ConsistencyNone)
Expand All @@ -57,7 +57,7 @@ func TestConsistencyController(t *testing.T) {
require.NoError(t, ctrl.TearDown(tctx))

conf.ServerInfo.ServerType = version.ServerTypeMySQL
conf.Consistency = consistencyTypeLock
conf.Consistency = ConsistencyTypeLock
conf.Tables = NewDatabaseTables().
AppendTables("db1", []string{"t1", "t2", "t3"}, []uint64{1, 2, 3}).
AppendViews("db2", "t4")
Expand Down Expand Up @@ -85,7 +85,7 @@ func TestConsistencyLockControllerRetry(t *testing.T) {
resultOk := sqlmock.NewResult(0, 1)

conf.ServerInfo.ServerType = version.ServerTypeMySQL
conf.Consistency = consistencyTypeLock
conf.Consistency = ConsistencyTypeLock
conf.Tables = NewDatabaseTables().
AppendTables("db1", []string{"t1", "t2", "t3"}, []uint64{1, 2, 3}).
AppendViews("db2", "t4")
Expand Down Expand Up @@ -113,14 +113,14 @@ func TestResolveAutoConsistency(t *testing.T) {
serverTp version.ServerType
resolvedConsistency string
}{
{version.ServerTypeTiDB, consistencyTypeSnapshot},
{version.ServerTypeMySQL, consistencyTypeFlush},
{version.ServerTypeMariaDB, consistencyTypeFlush},
{version.ServerTypeUnknown, consistencyTypeNone},
{version.ServerTypeTiDB, ConsistencyTypeSnapshot},
{version.ServerTypeMySQL, ConsistencyTypeFlush},
{version.ServerTypeMariaDB, ConsistencyTypeFlush},
{version.ServerTypeUnknown, ConsistencyTypeNone},
}

for _, x := range cases {
conf.Consistency = consistencyTypeAuto
conf.Consistency = ConsistencyTypeAuto
conf.ServerInfo.ServerType = x.serverTp
d := &Dumper{conf: conf}
require.NoError(t, resolveAutoConsistency(d))
Expand All @@ -146,20 +146,20 @@ func TestConsistencyControllerError(t *testing.T) {
require.Contains(t, err.Error(), "invalid consistency option")

// snapshot consistency is only available in TiDB
conf.Consistency = consistencyTypeSnapshot
conf.Consistency = ConsistencyTypeSnapshot
conf.ServerInfo.ServerType = version.ServerTypeUnknown
_, err = NewConsistencyController(ctx, conf, db)
require.Error(t, err)

// flush consistency is unavailable in TiDB
conf.Consistency = consistencyTypeFlush
conf.Consistency = ConsistencyTypeFlush
conf.ServerInfo.ServerType = version.ServerTypeTiDB
ctrl, _ := NewConsistencyController(ctx, conf, db)
err = ctrl.Setup(tctx)
require.Error(t, err)

// lock table fail
conf.Consistency = consistencyTypeLock
conf.Consistency = ConsistencyTypeLock
conf.Tables = NewDatabaseTables().AppendTables("db", []string{"t"}, []uint64{1})
mock.ExpectExec("LOCK TABLE").WillReturnError(errors.New(""))
ctrl, _ = NewConsistencyController(ctx, conf, db)
Expand All @@ -181,7 +181,7 @@ func TestConsistencyLockTiDBCheck(t *testing.T) {
resultOk := sqlmock.NewResult(0, 1)

conf.ServerInfo.ServerType = version.ServerTypeTiDB
conf.Consistency = consistencyTypeLock
conf.Consistency = ConsistencyTypeLock
conf.Tables = NewDatabaseTables().
AppendTables("db1", []string{"t1"}, []uint64{1})
ctrl, err := NewConsistencyController(ctx, conf, db)
Expand Down
26 changes: 13 additions & 13 deletions dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (d *Dumper) Dump() (dumpErr error) {
}()

// for consistency lock, we should get table list at first to generate the lock tables SQL
if conf.Consistency == consistencyTypeLock {
if conf.Consistency == ConsistencyTypeLock {
conn, err = createConnWithConsistency(tctx, pool, repeatableRead)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -176,7 +176,7 @@ func (d *Dumper) Dump() (dumpErr error) {
}

// for other consistencies, we should get table list after consistency is set up and GlobalMetaData is cached
if conf.Consistency != consistencyTypeLock {
if conf.Consistency != ConsistencyTypeLock {
if err = prepareTableListToDump(tctx, conf, metaConn); err != nil {
return err
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func (d *Dumper) Dump() (dumpErr error) {
defer tearDownWriters()

if conf.TransactionalConsistency {
if conf.Consistency == consistencyTypeFlush || conf.Consistency == consistencyTypeLock {
if conf.Consistency == ConsistencyTypeFlush || conf.Consistency == ConsistencyTypeLock {
tctx.L().Info("All the dumping transactions have started. Start to unlock tables")
}
if err = conCtrl.TearDown(tctx); err != nil {
Expand Down Expand Up @@ -1075,10 +1075,10 @@ func extractTiDBRowIDFromDecodedKey(indexField, key string) (string, error) {
func getListTableTypeByConf(conf *Config) listTableType {
// use listTableByShowTableStatus by default because it has better performance
listType := listTableByShowTableStatus
if conf.Consistency == consistencyTypeLock {
if conf.Consistency == ConsistencyTypeLock {
// for consistency lock, we need to build the tables to dump as soon as possible
listType = listTableByInfoSchema
} else if conf.Consistency == consistencyTypeFlush && matchMysqlBugversion(conf.ServerInfo) {
} else if conf.Consistency == ConsistencyTypeFlush && matchMysqlBugversion(conf.ServerInfo) {
// For some buggy versions of mysql, we need a workaround to get a list of table names.
listType = listTableByShowFullTables
}
Expand Down Expand Up @@ -1208,9 +1208,9 @@ func (d *Dumper) dumpSQL(tctx *tcontext.Context, metaConn *BaseConn, taskChan ch

func canRebuildConn(consistency string, trxConsistencyOnly bool) bool {
switch consistency {
case consistencyTypeLock, consistencyTypeFlush:
case ConsistencyTypeLock, ConsistencyTypeFlush:
return !trxConsistencyOnly
case consistencyTypeSnapshot, consistencyTypeNone:
case ConsistencyTypeSnapshot, ConsistencyTypeNone:
return true
default:
return false
Expand Down Expand Up @@ -1313,23 +1313,23 @@ func detectServerInfo(d *Dumper) error {
// resolveAutoConsistency is an initialization step of Dumper.
func resolveAutoConsistency(d *Dumper) error {
conf := d.conf
if conf.Consistency != consistencyTypeAuto {
if conf.Consistency != ConsistencyTypeAuto {
return nil
}
switch conf.ServerInfo.ServerType {
case version.ServerTypeTiDB:
conf.Consistency = consistencyTypeSnapshot
conf.Consistency = ConsistencyTypeSnapshot
case version.ServerTypeMySQL, version.ServerTypeMariaDB:
conf.Consistency = consistencyTypeFlush
conf.Consistency = ConsistencyTypeFlush
default:
conf.Consistency = consistencyTypeNone
conf.Consistency = ConsistencyTypeNone
}
return nil
}

func validateResolveAutoConsistency(d *Dumper) error {
conf := d.conf
if conf.Consistency != consistencyTypeSnapshot && conf.Snapshot != "" {
if conf.Consistency != ConsistencyTypeSnapshot && conf.Snapshot != "" {
return errors.Errorf("can't specify --snapshot when --consistency isn't snapshot, resolved consistency: %s", conf.Consistency)
}
return nil
Expand Down Expand Up @@ -1457,7 +1457,7 @@ func setSessionParam(d *Dumper) error {
if si.ServerType != version.ServerTypeTiDB {
return errors.New("snapshot consistency is not supported for this server")
}
if consistency == consistencyTypeSnapshot {
if consistency == ConsistencyTypeSnapshot {
conf.ServerInfo.HasTiKV, err = CheckTiDBWithTiKV(pool)
if err != nil {
d.L().Info("cannot check whether TiDB has TiKV, will apply tidb_snapshot by default. This won't affect dump process", log.ShortError(err))
Expand Down
15 changes: 8 additions & 7 deletions dumpling/export/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/pingcap/errors"

"github.com/pingcap/tidb/br/pkg/version"
tcontext "github.com/pingcap/tidb/dumpling/context"
"github.com/pingcap/tidb/parser"
Expand Down Expand Up @@ -113,16 +114,16 @@ func TestGetListTableTypeByConf(t *testing.T) {
consistency string
expected listTableType
}{
{version.ParseServerInfo("5.7.25-TiDB-3.0.6"), consistencyTypeSnapshot, listTableByShowTableStatus},
{version.ParseServerInfo("5.7.25-TiDB-3.0.6"), ConsistencyTypeSnapshot, listTableByShowTableStatus},
// no bug version
{version.ParseServerInfo("8.0.2"), consistencyTypeLock, listTableByInfoSchema},
{version.ParseServerInfo("8.0.2"), consistencyTypeFlush, listTableByShowTableStatus},
{version.ParseServerInfo("8.0.23"), consistencyTypeNone, listTableByShowTableStatus},
{version.ParseServerInfo("8.0.2"), ConsistencyTypeLock, listTableByInfoSchema},
{version.ParseServerInfo("8.0.2"), ConsistencyTypeFlush, listTableByShowTableStatus},
{version.ParseServerInfo("8.0.23"), ConsistencyTypeNone, listTableByShowTableStatus},

// bug version
{version.ParseServerInfo("8.0.3"), consistencyTypeLock, listTableByInfoSchema},
{version.ParseServerInfo("8.0.3"), consistencyTypeFlush, listTableByShowFullTables},
{version.ParseServerInfo("8.0.3"), consistencyTypeNone, listTableByShowTableStatus},
{version.ParseServerInfo("8.0.3"), ConsistencyTypeLock, listTableByInfoSchema},
{version.ParseServerInfo("8.0.3"), ConsistencyTypeFlush, listTableByShowFullTables},
{version.ParseServerInfo("8.0.3"), ConsistencyTypeNone, listTableByShowTableStatus},
}

for _, x := range cases {
Expand Down
20 changes: 10 additions & 10 deletions dumpling/export/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,16 +308,16 @@ func TestValidateResolveAutoConsistency(t *testing.T) {
confSnapshot string
err bool
}{
{consistencyTypeAuto, "", true},
{consistencyTypeAuto, "123", false},
{consistencyTypeFlush, "", true},
{consistencyTypeFlush, "456", false},
{consistencyTypeLock, "", true},
{consistencyTypeLock, "789", false},
{consistencyTypeSnapshot, "", true},
{consistencyTypeSnapshot, "456", true},
{consistencyTypeNone, "", true},
{consistencyTypeNone, "123", false},
{ConsistencyTypeAuto, "", true},
{ConsistencyTypeAuto, "123", false},
{ConsistencyTypeFlush, "", true},
{ConsistencyTypeFlush, "456", false},
{ConsistencyTypeLock, "", true},
{ConsistencyTypeLock, "789", false},
{ConsistencyTypeSnapshot, "", true},
{ConsistencyTypeSnapshot, "456", true},
{ConsistencyTypeNone, "", true},
{ConsistencyTypeNone, "123", false},
}
for _, testCase := range testCases {
conf.Consistency = testCase.confConsistency
Expand Down
7 changes: 6 additions & 1 deletion dumpling/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,9 +873,14 @@ func resetDBWithSessionParams(tctx *tcontext.Context, db *sql.DB, dsn string, pa
dsn += fmt.Sprintf("&%s=%s", k, url.QueryEscape(s))
}

db.Close()
newDB, err := sql.Open("mysql", dsn)
if err == nil {
db.Close()
// ping to make sure all session parameters are set correctly
err = newDB.PingContext(tctx)
if err != nil {
newDB.Close()
}
}
return newDB, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion dumpling/export/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,5 @@ func string2Map(a, b []string) map[string]string {
}

func needRepeatableRead(serverType version.ServerType, consistency string) bool {
return consistency != consistencyTypeSnapshot || serverType != version.ServerTypeTiDB
return consistency != ConsistencyTypeSnapshot || serverType != version.ServerTypeTiDB
}
15 changes: 8 additions & 7 deletions dumpling/export/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import (
"fmt"
"testing"

"github.com/pingcap/tidb/br/pkg/version"
"github.com/stretchr/testify/require"

"github.com/pingcap/tidb/br/pkg/version"
)

func TestRepeatableRead(t *testing.T) {
data := [][]interface{}{
{version.ServerTypeUnknown, consistencyTypeNone, true},
{version.ServerTypeMySQL, consistencyTypeFlush, true},
{version.ServerTypeMariaDB, consistencyTypeLock, true},
{version.ServerTypeTiDB, consistencyTypeNone, true},
{version.ServerTypeTiDB, consistencyTypeSnapshot, false},
{version.ServerTypeTiDB, consistencyTypeLock, true},
{version.ServerTypeUnknown, ConsistencyTypeNone, true},
{version.ServerTypeMySQL, ConsistencyTypeFlush, true},
{version.ServerTypeMariaDB, ConsistencyTypeLock, true},
{version.ServerTypeTiDB, ConsistencyTypeNone, true},
{version.ServerTypeTiDB, ConsistencyTypeSnapshot, false},
{version.ServerTypeTiDB, ConsistencyTypeLock, true},
}
dec := func(d []interface{}) (version.ServerType, string, bool) {
return version.ServerType(d[0].(int)), d[1].(string), d[2].(bool)
Expand Down
Loading

0 comments on commit 5d895ee

Please sign in to comment.