From 0eb0fa1646e8060aba55aa8512ee516a69a95676 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 23 Aug 2021 14:16:03 +0800 Subject: [PATCH] *: reduce dumpling accessing database and information_schema usage to improve its stability (#305) (#314) --- Makefile | 1 + go.mod | 3 - go.sum | 13 - tests/run.sh | 2 - tests/views/run.sh | 12 +- v4/export/block_allow_list.go | 6 +- v4/export/block_allow_list_test.go | 10 +- v4/export/consistency.go | 22 +- v4/export/consistency_test.go | 19 +- v4/export/dump.go | 213 ++++++----- v4/export/dump_test.go | 39 ++- v4/export/ir.go | 4 + v4/export/ir_impl.go | 35 +- v4/export/metrics.go | 45 ++- v4/export/prepare.go | 51 ++- v4/export/prepare_test.go | 94 ++++- v4/export/sql.go | 298 ++++++++-------- v4/export/sql_test.go | 544 ++++++++++++----------------- v4/export/status.go | 25 +- v4/export/test_util.go | 39 ++- v4/export/util.go | 8 + v4/export/writer_util.go | 78 +++-- v4/export/writer_util_test.go | 125 +++++-- 23 files changed, 943 insertions(+), 743 deletions(-) diff --git a/Makefile b/Makefile index 545166eb..108c118f 100644 --- a/Makefile +++ b/Makefile @@ -96,6 +96,7 @@ static: tools --disable gosec \ --disable errorlint \ --disable sqlclosecheck \ + --disable scopelint \ $$($(PACKAGE_DIRECTORIES)) # pingcap/errors APIs are mixed with multiple patterns 'pkg/errors', # 'juju/errors' and 'pingcap/parser'. To avoid confusion and mistake, diff --git a/go.mod b/go.mod index bbabfdc2..3744ad0b 100644 --- a/go.mod +++ b/go.mod @@ -23,9 +23,6 @@ require ( github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible github.com/prometheus/client_golang v1.5.1 github.com/prometheus/client_model v0.2.0 - github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect - github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed // indirect - github.com/siddontang/go-mysql v0.0.0-20200222075837-12e89848f047 github.com/soheilhy/cmux v0.1.4 github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index 0da64174..8011ef6e 100644 --- a/go.sum +++ b/go.sum @@ -464,7 +464,6 @@ github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= -github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= github.com/pingcap/parser v0.0.0-20210623034316-5ee95ed0081f h1:YaLHP/U93ZPMIPDH5vpX3oZDZM3oOw/QvZESl4bBcYM= github.com/pingcap/parser v0.0.0-20210623034316-5ee95ed0081f/go.mod h1:GbEr2PgY72/4XqPZzmzstlOU/+il/wrjeTNFs6ihsSE= github.com/pingcap/sysutil v0.0.0-20200206130906-2bfa6dc40bcd/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= @@ -476,7 +475,6 @@ github.com/pingcap/tidb v1.1.0-beta.0.20210817073159-2fbd25d032df/go.mod h1:hWsO github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:OzFN8H0EDMMqeulPhPMw2i2JaiZWOKFQ7zdRPhENNgo= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible h1:ceznmu/lLseGHP/jKyOa/3u/5H3wtLLLqkH2V3ssSjg= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tipb 
v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pingcap/tipb v0.0.0-20210601083426-79a378b6d1c4 h1:n47+OwdI/uxKenfBT8Y2/be11MwbeLKNLdzOWnxNQKg= github.com/pingcap/tipb v0.0.0-20210601083426-79a378b6d1c4/go.mod h1:nsEhnMokcn7MRqd2J60yxpn/ac3ZH8A6GOJ9NslabUo= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -525,8 +523,6 @@ github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sasha-s/go-deadlock v0.2.0/go.mod h1:StQn567HiB1fF2yJ44N9au7wOhrPS3iZqiDbRupzT10= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= -github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b h1:gQZ0qzfKHQIybLANtM3mBXNUtOfsCFXeTsnBqCsx1KM= -github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sergi/go-diff v1.0.1-0.20180205163309-da645544ed44/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shirou/gopsutil v2.19.10+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= @@ -534,22 +530,13 @@ github.com/shirou/gopsutil v3.20.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMT github.com/shirou/gopsutil v3.21.2+incompatible h1:U+YvJfjCh6MslYlIAXvPtzhW3YZEtc9uncueUNpD/0A= github.com/shirou/gopsutil v3.21.2+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/w32 v0.0.0-20160930032740-bb4de0191aa4/go.mod h1:qsXQc7+bwAM3Q1u/4XEfrquwF8Lw7D7y5cD8CuHnfIc= -github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06Bs80sCeARAlK8lhwqGyi6UT8ymuGk= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/httpgzip v0.0.0-20190720172056-320755c1c1b0/go.mod h1:919LwcH0M7/W4fcZ0/jy0qGght1GIhqyS/EgWGH2j5Q= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1Kg6qskGBoP3Vnj/aNYFTznWvlkGo0= github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw= -github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8iUrN18JYed2TvG9yN5ULG2jATM= -github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= -github.com/siddontang/go-log v0.0.0-20180807004314-8d05993dda07/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= -github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed h1:KMgQoLJGCq1IoZpLZE3AIffh9veYWoVlsvA4ib55TMM= -github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed/go.mod h1:yFdBgwXP24JziuRl2NMUahT7nGLNOKi1SIiFxMttVD4= -github.com/siddontang/go-mysql v0.0.0-20200222075837-12e89848f047 h1:boyJ8EgQN/aC3grvx8QUoJrptt7RvneezSJSCbW25a4= -github.com/siddontang/go-mysql v0.0.0-20200222075837-12e89848f047/go.mod h1:+W4RCzesQDI11HvIkaDjS8yM36SpAnGNQ7jmTLn5BnU= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= -github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus 
v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= diff --git a/tests/run.sh b/tests/run.sh index c0de699e..407a1af6 100755 --- a/tests/run.sh +++ b/tests/run.sh @@ -4,8 +4,6 @@ # To avoid permission denied error, please run `chmod +x tests/_utils/*`. -set -e - DUMPLING_TEST_DIR=${DUMPLING_TEST_DIR:-"/tmp/dumpling_test_result"} DUMPLING_TEST_USER=${DUMPLING_TEST_USER:-"root"} diff --git a/tests/views/run.sh b/tests/views/run.sh index f0d8d001..c9462e6c 100644 --- a/tests/views/run.sh +++ b/tests/views/run.sh @@ -17,8 +17,18 @@ run_dumpling --no-views file_not_exist "$DUMPLING_OUTPUT_DIR/views.v-schema.sql" file_not_exist "$DUMPLING_OUTPUT_DIR/views.v-schema-view.sql" +rm -rf $DUMPLING_OUTPUT_DIR run_dumpling --no-views=false #diff "$DUMPLING_BASE_NAME/data/views-schema-create.sql" "$DUMPLING_OUTPUT_DIR/views-schema-create.sql" diff "$DUMPLING_BASE_NAME/data/views.v-schema.sql" "$DUMPLING_OUTPUT_DIR/views.v-schema.sql" diff "$DUMPLING_BASE_NAME/data/views.v-schema-view.sql" "$DUMPLING_OUTPUT_DIR/views.v-schema-view.sql" -file_not_exist "$DUMPLING_BASE_NAME/data/views.v.000000000.sql" \ No newline at end of file +file_not_exist "$DUMPLING_OUTPUT_DIR/views.v.000000000.sql" + +# test --no-schemas +rm -rf $DUMPLING_OUTPUT_DIR +run_dumpling --no-schemas +file_not_exist "$DUMPLING_OUTPUT_DIR/views-schema-create.sql" +file_not_exist "$DUMPLING_OUTPUT_DIR/views.t-schema.sql" +file_not_exist "$DUMPLING_OUTPUT_DIR/views.v-schema.sql" +file_not_exist "$DUMPLING_OUTPUT_DIR/views.v-schema-view.sql" +file_not_exist "$DUMPLING_OUTPUT_DIR/views.v.000000000.sql" diff --git a/v4/export/block_allow_list.go b/v4/export/block_allow_list.go index 2ce57eb3..381bea64 100644 --- a/v4/export/block_allow_list.go +++ b/v4/export/block_allow_list.go @@ -9,13 +9,17 @@ import ( ) func filterTables(tctx *tcontext.Context, conf *Config) { + filterTablesFunc(tctx, conf, conf.TableFilter.MatchTable) +} + +func filterTablesFunc(tctx *tcontext.Context, conf *Config, matchTable func(string, string) bool) { tctx.L().Debug("start to filter tables") dbTables := DatabaseTables{} ignoredDBTable := DatabaseTables{} for dbName, tables := range conf.Tables { for _, table := range tables { - if conf.TableFilter.MatchTable(dbName, table.Name) { + if matchTable(dbName, table.Name) { dbTables.AppendTable(dbName, table) } else { ignoredDBTable.AppendTable(dbName, table) diff --git a/v4/export/block_allow_list_test.go b/v4/export/block_allow_list_test.go index b82a3c41..fddc7716 100644 --- a/v4/export/block_allow_list_test.go +++ b/v4/export/block_allow_list_test.go @@ -20,11 +20,11 @@ func (s *testBWListSuite) TestFilterTables(c *C) { dbTables := DatabaseTables{} expectedDBTables := DatabaseTables{} - dbTables.AppendTables(filter.InformationSchemaName, []string{"xxx"}...) - dbTables.AppendTables(strings.ToUpper(filter.PerformanceSchemaName), []string{"xxx"}...) - dbTables.AppendTables("xxx", []string{"yyy"}...) - expectedDBTables.AppendTables("xxx", []string{"yyy"}...) - dbTables.AppendTables("yyy", []string{"xxx"}...) 
+	dbTables.AppendTables(filter.InformationSchemaName, []string{"xxx"}, []uint64{0})
+	dbTables.AppendTables(strings.ToUpper(filter.PerformanceSchemaName), []string{"xxx"}, []uint64{0})
+	dbTables.AppendTables("xxx", []string{"yyy"}, []uint64{0})
+	expectedDBTables.AppendTables("xxx", []string{"yyy"}, []uint64{0})
+	dbTables.AppendTables("yyy", []string{"xxx"}, []uint64{0})
 
 	tableFilter, err := tf.Parse([]string{"*.*"})
 	c.Assert(err, IsNil)
diff --git a/v4/export/consistency.go b/v4/export/consistency.go
index 0c418ffe..68fbea00 100644
--- a/v4/export/consistency.go
+++ b/v4/export/consistency.go
@@ -34,8 +34,8 @@ func NewConsistencyController(ctx context.Context, conf *Config, session *sql.DB
 		}, nil
 	case consistencyTypeLock:
 		return &ConsistencyLockDumpingTables{
-			conn:      conn,
-			allTables: conf.Tables,
+			conn: conn,
+			conf: conf,
 		}, nil
 	case consistencyTypeSnapshot:
 		if conf.ServerInfo.ServerType != ServerTypeTiDB {
@@ -110,16 +110,28 @@ func (c *ConsistencyFlushTableWithReadLock) PingContext(ctx context.Context) err
 
 // ConsistencyLockDumpingTables execute lock tables read on all tables before dump
 type ConsistencyLockDumpingTables struct {
-	conn      *sql.Conn
-	allTables DatabaseTables
+	conn *sql.Conn
+	conf *Config
 }
 
 // Setup implements ConsistencyController.Setup
 func (c *ConsistencyLockDumpingTables) Setup(tctx *tcontext.Context) error {
 	blockList := make(map[string]map[string]interface{})
 	return utils.WithRetry(tctx, func() error {
-		lockTablesSQL := buildLockTablesSQL(c.allTables, blockList)
+		lockTablesSQL := buildLockTablesSQL(c.conf.Tables, blockList)
 		_, err := c.conn.ExecContext(tctx, lockTablesSQL)
+		if err == nil {
+			if len(blockList) > 0 {
+				filterTablesFunc(tctx, c.conf, func(db string, tbl string) bool {
+					if blockTable, ok := blockList[db]; ok {
+						if _, ok := blockTable[tbl]; ok {
+							return false
+						}
+					}
+					return true
+				})
+			}
+		}
 		return errors.Trace(err)
 	}, newLockTablesBackoffer(tctx, blockList))
 }
diff --git a/v4/export/consistency_test.go b/v4/export/consistency_test.go
index 2f1b8834..931dc49e 100644
--- a/v4/export/consistency_test.go
+++ b/v4/export/consistency_test.go
@@ -65,7 +65,7 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) {
 
 	conf.Consistency = consistencyTypeLock
 	conf.Tables = NewDatabaseTables().
-		AppendTables("db1", "t1", "t2", "t3").
+		AppendTables("db1", []string{"t1", "t2", "t3"}, []uint64{1, 2, 3}).
 		AppendViews("db2", "t4")
 	mock.ExpectExec("LOCK TABLES `db1`.`t1` READ,`db1`.`t2` READ,`db1`.`t3` READ").WillReturnResult(resultOk)
 	mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk)
@@ -73,9 +73,7 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) {
 	ctrl, _ := NewConsistencyController(ctx, conf, db)
 	_, ok = ctrl.(*ConsistencyLockDumpingTables)
 	c.Assert(ok, IsTrue)
 	s.assertLifetimeErrNil(tctx, ctrl, c)
-	if err = mock.ExpectationsWereMet(); err != nil {
-		c.Fatal(err.Error())
-	}
+	c.Assert(mock.ExpectationsWereMet(), IsNil)
 }
 
 func (s *testConsistencySuite) TestConsistencyLockControllerRetry(c *C) {
@@ -90,7 +88,7 @@ func (s *testConsistencySuite) TestConsistencyLockControllerRetry(c *C) {
 	conf.Consistency = consistencyTypeLock
 	conf.Tables = NewDatabaseTables().
-		AppendTables("db1", "t1", "t2", "t3").
+		AppendTables("db1", []string{"t1", "t2", "t3"}, []uint64{1, 2, 3}).
 		AppendViews("db2", "t4")
 	mock.ExpectExec("LOCK TABLES `db1`.`t1` READ,`db1`.`t2` READ,`db1`.`t3` READ").
WillReturnError(&mysql.MySQLError{Number: ErrNoSuchTable, Message: "Table 'db1.t3' doesn't exist"}) @@ -100,9 +98,12 @@ func (s *testConsistencySuite) TestConsistencyLockControllerRetry(c *C) { _, ok := ctrl.(*ConsistencyLockDumpingTables) c.Assert(ok, IsTrue) s.assertLifetimeErrNil(tctx, ctrl, c) - if err = mock.ExpectationsWereMet(); err != nil { - c.Fatal(err.Error()) - } + // should remove table db1.t3 in tables to dump + expectedDumpTables := NewDatabaseTables(). + AppendTables("db1", []string{"t1", "t2"}, []uint64{1, 2}). + AppendViews("db2", "t4") + c.Assert(conf.Tables, DeepEquals, expectedDumpTables) + c.Assert(mock.ExpectationsWereMet(), IsNil) } func (s *testConsistencySuite) TestResolveAutoConsistency(c *C) { @@ -156,7 +157,7 @@ func (s *testConsistencySuite) TestConsistencyControllerError(c *C) { // lock table fail conf.Consistency = consistencyTypeLock - conf.Tables = NewDatabaseTables().AppendTables("db", "t") + conf.Tables = NewDatabaseTables().AppendTables("db", []string{"t"}, []uint64{1}) mock.ExpectExec("LOCK TABLE").WillReturnError(errors.New("")) ctrl, _ = NewConsistencyController(ctx, conf, db) err = ctrl.Setup(tctx) diff --git a/v4/export/dump.go b/v4/export/dump.go index 5f0d6582..cd9a86b6 100755 --- a/v4/export/dump.go +++ b/v4/export/dump.go @@ -45,7 +45,7 @@ type Dumper struct { dbHandle *sql.DB tidbPDClientForGC pd.Client - selectTiDBTableRegionFunc func(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) + selectTiDBTableRegionFunc func(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta) (pkFields []string, pkVals [][]string, err error) } // NewDumper returns a new Dumper @@ -226,11 +226,6 @@ func (d *Dumper) Dump() (dumpErr error) { } }) - // get estimate total count - if err = d.getEstimateTotalRowsCount(tctx, metaConn); err != nil { - tctx.L().Error("fail to get estimate total count", zap.Error(err)) - } - if conf.SQL == "" { if err = d.dumpDatabases(writerCtx, metaConn, taskChan); err != nil && !errors.ErrorEqual(err, context.Canceled) { return err @@ -239,6 +234,7 @@ func (d *Dumper) Dump() (dumpErr error) { d.dumpSQL(writerCtx, taskChan) } close(taskChan) + _ = metaConn.Close() if err := wg.Wait(); err != nil { summary.CollectFailureUnit("dump table data", err) return errors.Trace(err) @@ -298,14 +294,16 @@ func (d *Dumper) dumpDatabases(tctx *tcontext.Context, metaConn *sql.Conn, taskC conf := d.conf allTables := conf.Tables for dbName, tables := range allTables { - createDatabaseSQL, err := ShowCreateDatabase(metaConn, dbName) - if err != nil { - return err - } - task := NewTaskDatabaseMeta(dbName, createDatabaseSQL) - ctxDone := d.sendTaskToChan(tctx, task, taskChan) - if ctxDone { - return tctx.Err() + if !conf.NoSchemas { + createDatabaseSQL, err := ShowCreateDatabase(metaConn, dbName) + if err != nil { + return err + } + task := NewTaskDatabaseMeta(dbName, createDatabaseSQL) + ctxDone := d.sendTaskToChan(tctx, task, taskChan) + if ctxDone { + return tctx.Err() + } } for _, table := range tables { @@ -316,18 +314,22 @@ func (d *Dumper) dumpDatabases(tctx *tcontext.Context, metaConn *sql.Conn, taskC return err } - if table.Type == TableTypeView { - task := NewTaskViewMeta(dbName, table.Name, meta.ShowCreateTable(), meta.ShowCreateView()) - ctxDone = d.sendTaskToChan(tctx, task, taskChan) - if ctxDone { - return tctx.Err() - } - } else { - task := NewTaskTableMeta(dbName, table.Name, meta.ShowCreateTable()) - ctxDone = d.sendTaskToChan(tctx, task, taskChan) - if ctxDone { - return 
tctx.Err() + if !conf.NoSchemas { + if table.Type == TableTypeView { + task := NewTaskViewMeta(dbName, table.Name, meta.ShowCreateTable(), meta.ShowCreateView()) + ctxDone := d.sendTaskToChan(tctx, task, taskChan) + if ctxDone { + return tctx.Err() + } + } else { + task := NewTaskTableMeta(dbName, table.Name, meta.ShowCreateTable()) + ctxDone := d.sendTaskToChan(tctx, task, taskChan) + if ctxDone { + return tctx.Err() + } } + } + if table.Type == TableTypeBase { err = d.dumpTableData(tctx, metaConn, meta, taskChan) if err != nil { return err @@ -344,6 +346,12 @@ func (d *Dumper) dumpTableData(tctx *tcontext.Context, conn *sql.Conn, meta Tabl if conf.NoData { return nil } + + // Update total rows + fieldName, _ := pickupPossibleField(meta, conn) + c := estimateCount(tctx, meta.DatabaseName(), meta.TableName(), conn, fieldName, conf) + AddCounter(estimateTotalRowsCounter, conf.Labels, float64(c)) + if conf.Rows == UnspecifiedSize { return d.sequentialDumpTable(tctx, conn, meta, taskChan) } @@ -355,7 +363,7 @@ func (d *Dumper) buildConcatTask(tctx *tcontext.Context, conn *sql.Conn, meta Ta errCh := make(chan error, 1) go func() { // adjust rows to suitable rows for this table - d.conf.Rows = GetSuitableRows(tctx, conn, meta.DatabaseName(), meta.TableName()) + d.conf.Rows = GetSuitableRows(meta.AvgRowLength()) err := d.concurrentDumpTable(tctx, conn, meta, tableChan) d.conf.Rows = UnspecifiedSize if err != nil { @@ -412,12 +420,9 @@ func (d *Dumper) buildConcatTask(tctx *tcontext.Context, conn *sql.Conn, meta Ta } } -func (d *Dumper) dumpWholeTableDirectly(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, partition string, currentChunk, totalChunks int) error { +func (d *Dumper) dumpWholeTableDirectly(tctx *tcontext.Context, meta TableMeta, taskChan chan<- Task, partition, orderByClause string, currentChunk, totalChunks int) error { conf := d.conf - tableIR, err := SelectAllFromTable(conf, conn, meta, partition) - if err != nil { - return err - } + tableIR := SelectAllFromTable(conf, meta, partition, orderByClause) task := NewTaskTableData(meta, tableIR, currentChunk, totalChunks) ctxDone := d.sendTaskToChan(tctx, task, taskChan) if ctxDone { @@ -444,7 +449,11 @@ func (d *Dumper) sequentialDumpTable(tctx *tcontext.Context, conn *sql.Conn, met zap.String("database", meta.DatabaseName()), zap.String("table", meta.TableName())) } - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "", 0, 1) + orderByClause, err := buildOrderByClause(conf, conn, meta.DatabaseName(), meta.TableName(), meta.HasImplicitRowID()) + if err != nil { + return err + } + return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1) } // concurrentDumpTable tries to split table into several chunks to dump @@ -455,26 +464,27 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met conf.ServerInfo.ServerVersion != nil && (conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 || (conf.ServerInfo.HasTiKV && conf.ServerInfo.ServerVersion.Compare(*decodeRegionVersion) >= 0)) { - return d.concurrentDumpTiDBTables(tctx, conn, meta, taskChan) + err := d.concurrentDumpTiDBTables(tctx, conn, meta, taskChan) + // don't retry on context error and successful tasks + if err2 := errors.Cause(err); err2 == nil || err2 == context.DeadlineExceeded || err2 == context.Canceled { + return err + } + tctx.L().Warn("fallback to concurrent dump tables using rows due to tidb error", + zap.String("database", db), zap.String("table", tbl), zap.Error(err)) } - 
field, err := pickupPossibleField(db, tbl, conn, conf) + + orderByClause, err := buildOrderByClause(conf, conn, db, tbl, meta.HasImplicitRowID()) if err != nil { return err } - if field == "" { + + field, err := pickupPossibleField(meta, conn) + if err != nil || field == "" { // skip split chunk logic if not found proper field tctx.L().Warn("fallback to sequential dump due to no proper field", - zap.String("database", db), zap.String("table", tbl)) - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "", 0, 1) - } - - min, max, err := d.selectMinAndMaxIntValue(conn, db, tbl, field) - if err != nil { - return err + zap.String("database", db), zap.String("table", tbl), zap.Error(err)) + return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1) } - tctx.L().Debug("get int bounding values", - zap.String("lower", min.String()), - zap.String("upper", max.String())) count := estimateCount(d.tctx, db, tbl, conn, field, conf) tctx.L().Info("get estimated rows count", @@ -488,9 +498,17 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met zap.Uint64("conf.rows", conf.Rows), zap.String("database", db), zap.String("table", tbl)) - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, "", 0, 1) + return d.dumpWholeTableDirectly(tctx, meta, taskChan, "", orderByClause, 0, 1) } + min, max, err := d.selectMinAndMaxIntValue(conn, db, tbl, field) + if err != nil { + return err + } + tctx.L().Debug("get int bounding values", + zap.String("lower", min.String()), + zap.String("upper", max.String())) + // every chunk would have eventual adjustments estimatedChunks := count / conf.Rows estimatedStep := new(big.Int).Sub(max, min).Uint64()/estimatedChunks + 1 @@ -501,15 +519,7 @@ func (d *Dumper) concurrentDumpTable(tctx *tcontext.Context, conn *sql.Conn, met totalChunks = new(big.Int).Sub(max, min).Uint64() + 1 } - selectField, selectLen, err := buildSelectField(conn, db, tbl, conf.CompleteInsert) - if err != nil { - return err - } - - orderByClause, err := buildOrderByClause(conf, conn, db, tbl) - if err != nil { - return err - } + selectField, selectLen := meta.SelectedField(), meta.SelectedLen() chunkIndex := 0 nullValueCondition := "" @@ -594,7 +604,7 @@ func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn if d.conf.ServerInfo.ServerVersion.Compare(*tableSampleVersion) >= 0 { tctx.L().Debug("dumping TiDB tables with TABLESAMPLE", zap.String("database", db), zap.String("table", tbl)) - handleColNames, handleVals, err = selectTiDBTableSample(tctx, conn, db, tbl) + handleColNames, handleVals, err = selectTiDBTableSample(tctx, conn, meta) } else { // for TiDB v3.0+, we can use table region decode in TiDB directly tctx.L().Debug("dumping TiDB tables with TABLE REGIONS", @@ -605,7 +615,7 @@ func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn } if err == nil { if len(partitions) == 0 { - handleColNames, handleVals, err = d.selectTiDBTableRegionFunc(tctx, conn, db, tbl) + handleColNames, handleVals, err = d.selectTiDBTableRegionFunc(tctx, conn, meta) } else { return d.concurrentDumpTiDBPartitionTables(tctx, conn, meta, taskChan, partitions) } @@ -614,7 +624,7 @@ func (d *Dumper) concurrentDumpTiDBTables(tctx *tcontext.Context, conn *sql.Conn if err != nil { return err } - return d.sendConcurrentDumpTiDBTasks(tctx, conn, meta, taskChan, handleColNames, handleVals, "", 0, len(handleVals)+1) + return d.sendConcurrentDumpTiDBTasks(tctx, meta, taskChan, handleColNames, handleVals, "", 0, 
len(handleVals)+1) } func (d *Dumper) concurrentDumpTiDBPartitionTables(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta, taskChan chan<- Task, partitions []string) error { @@ -626,7 +636,7 @@ func (d *Dumper) concurrentDumpTiDBPartitionTables(tctx *tcontext.Context, conn totalChunk := 0 cachedHandleVals := make([][][]string, len(partitions)) - handleColNames, _, err := selectTiDBRowKeyFields(conn, db, tbl, checkTiDBTableRegionPkFields) + handleColNames, _, err := selectTiDBRowKeyFields(conn, meta, checkTiDBTableRegionPkFields) if err != nil { return err } @@ -640,7 +650,7 @@ func (d *Dumper) concurrentDumpTiDBPartitionTables(tctx *tcontext.Context, conn cachedHandleVals[i] = handleVals } for i, partition := range partitions { - err := d.sendConcurrentDumpTiDBTasks(tctx, conn, meta, taskChan, handleColNames, cachedHandleVals[i], partition, startChunkIdx, totalChunk) + err := d.sendConcurrentDumpTiDBTasks(tctx, meta, taskChan, handleColNames, cachedHandleVals[i], partition, startChunkIdx, totalChunk) if err != nil { return err } @@ -650,17 +660,18 @@ func (d *Dumper) concurrentDumpTiDBPartitionTables(tctx *tcontext.Context, conn } func (d *Dumper) sendConcurrentDumpTiDBTasks(tctx *tcontext.Context, - conn *sql.Conn, meta TableMeta, taskChan chan<- Task, + meta TableMeta, taskChan chan<- Task, handleColNames []string, handleVals [][]string, partition string, startChunkIdx, totalChunk int) error { + db, tbl := meta.DatabaseName(), meta.TableName() if len(handleVals) == 0 { - return d.dumpWholeTableDirectly(tctx, conn, meta, taskChan, partition, startChunkIdx, totalChunk) + if partition == "" { + // return error to make outside function try using rows method to dump data + return errors.Errorf("empty handleVals for TiDB table `%s`.`%s`", escapeString(db), escapeString(tbl)) + } + return d.dumpWholeTableDirectly(tctx, meta, taskChan, partition, buildOrderByClauseString(handleColNames), startChunkIdx, totalChunk) } conf := d.conf - db, tbl := meta.DatabaseName(), meta.TableName() - selectField, selectLen, err := buildSelectField(conn, db, tbl, conf.CompleteInsert) - if err != nil { - return err - } + selectField, selectLen := meta.SelectedField(), meta.SelectedLen() where := buildWhereClauses(handleColNames, handleVals) orderByClause := buildOrderByClauseString(handleColNames) @@ -680,13 +691,13 @@ func (d *Dumper) L() log.Logger { return d.tctx.L() } -func selectTiDBTableSample(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) { - pkFields, pkColTypes, err := selectTiDBRowKeyFields(conn, dbName, tableName, nil) +func selectTiDBTableSample(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta) (pkFields []string, pkVals [][]string, err error) { + pkFields, pkColTypes, err := selectTiDBRowKeyFields(conn, meta, nil) if err != nil { return nil, nil, errors.Trace(err) } - query := buildTiDBTableSampleQuery(pkFields, dbName, tableName) + query := buildTiDBTableSampleQuery(pkFields, meta.DatabaseName(), meta.TableName()) rows, err := conn.QueryContext(tctx, query) if err != nil { return nil, nil, errors.Trace(err) @@ -725,15 +736,11 @@ func buildTiDBTableSampleQuery(pkFields []string, dbName, tblName string) string return fmt.Sprintf(template, pks, escapeString(dbName), escapeString(tblName), pks) } -func selectTiDBRowKeyFields(conn *sql.Conn, dbName, tableName string, checkPkFields func([]string, []string) error) (pkFields, pkColTypes []string, err error) { - hasImplicitRowID, err := SelectTiDBRowID(conn, dbName, tableName) - if 
err != nil { - return - } - if hasImplicitRowID { +func selectTiDBRowKeyFields(conn *sql.Conn, meta TableMeta, checkPkFields func([]string, []string) error) (pkFields, pkColTypes []string, err error) { + if meta.HasImplicitRowID() { pkFields, pkColTypes = []string{"_tidb_rowid"}, []string{"BIGINT"} } else { - pkFields, pkColTypes, err = GetPrimaryKeyAndColumnTypes(conn, dbName, tableName) + pkFields, pkColTypes, err = GetPrimaryKeyAndColumnTypes(conn, meta) if err == nil { if checkPkFields != nil { err = checkPkFields(pkFields, pkColTypes) @@ -754,8 +761,8 @@ func checkTiDBTableRegionPkFields(pkFields, pkColTypes []string) (err error) { return } -func selectTiDBTableRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) { - pkFields, _, err = selectTiDBRowKeyFields(conn, dbName, tableName, checkTiDBTableRegionPkFields) +func selectTiDBTableRegion(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta) (pkFields []string, pkVals [][]string, err error) { + pkFields, _, err = selectTiDBRowKeyFields(conn, meta, checkTiDBTableRegionPkFields) if err != nil { return } @@ -768,6 +775,7 @@ func selectTiDBTableRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, table tableRegionSQL = "SELECT START_KEY,tidb_decode_key(START_KEY) from INFORMATION_SCHEMA.TIKV_REGION_STATUS s WHERE s.DB_NAME = ? AND s.TABLE_NAME = ? AND IS_INDEX = 0 ORDER BY START_KEY;" tidbRowID = "_tidb_rowid=" ) + dbName, tableName := meta.DatabaseName(), meta.TableName() logger := tctx.L().With(zap.String("database", dbName), zap.String("table", tableName)) err = simpleQueryWithArgs(conn, func(rows *sql.Rows) error { rowID++ @@ -832,7 +840,7 @@ func selectTiDBPartitionRegion(tctx *tcontext.Context, conn *sql.Conn, dbName, t } } - return pkVals, err + return pkVals, nil } func extractTiDBRowIDFromDecodedKey(indexField, key string) (string, error) { @@ -849,31 +857,38 @@ func prepareTableListToDump(tctx *tcontext.Context, conf *Config, db *sql.Conn) return err } - conf.Tables, err = listAllTables(db, databases) + tableTypes := []TableType{TableTypeBase} + if !conf.NoViews { + tableTypes = append(tableTypes, TableTypeView) + } + // for consistency lock, we need to build the tables to dump as soon as possible + asap := conf.Consistency == consistencyTypeLock + conf.Tables, err = ListAllDatabasesTables(tctx, db, databases, asap, tableTypes...) 
if err != nil { return err } - if !conf.NoViews { - views, err := listAllViews(db, databases) - if err != nil { - return err - } - conf.Tables.Merge(views) - } - filterTables(tctx, conf) return nil } func dumpTableMeta(conf *Config, conn *sql.Conn, db string, table *TableInfo) (TableMeta, error) { tbl := table.Name - selectField, _, err := buildSelectField(conn, db, tbl, conf.CompleteInsert) + selectField, selectLen, err := buildSelectField(conn, db, tbl, conf.CompleteInsert) if err != nil { return nil, err } + var ( + colTypes []*sql.ColumnType + hasImplicitRowID bool + ) + if conf.ServerInfo.ServerType == ServerTypeTiDB { + hasImplicitRowID, err = SelectTiDBRowID(conn, db, tbl) + if err != nil { + return nil, err + } + } - var colTypes []*sql.ColumnType // If all columns are generated if selectField == "" { colTypes, err = GetColumnTypes(conn, "*", db, tbl) @@ -885,10 +900,13 @@ func dumpTableMeta(conf *Config, conn *sql.Conn, db string, table *TableInfo) (T } meta := &tableMeta{ - database: db, - table: tbl, - colTypes: colTypes, - selectedField: selectField, + avgRowLength: table.AvgRowLength, + database: db, + table: tbl, + colTypes: colTypes, + selectedField: selectField, + selectedLen: selectLen, + hasImplicitRowID: hasImplicitRowID, specCmts: []string{ "/*!40101 SET NAMES binary*/;", }, @@ -1241,11 +1259,12 @@ func (d *Dumper) renewSelectTableRegionFuncForLowerTiDB(tctx *tcontext.Context) } } - d.selectTiDBTableRegionFunc = func(tctx *tcontext.Context, conn *sql.Conn, dbName, tableName string) (pkFields []string, pkVals [][]string, err error) { - pkFields, _, err = selectTiDBRowKeyFields(conn, dbName, tableName, checkTiDBTableRegionPkFields) + d.selectTiDBTableRegionFunc = func(tctx *tcontext.Context, conn *sql.Conn, meta TableMeta) (pkFields []string, pkVals [][]string, err error) { + pkFields, _, err = selectTiDBRowKeyFields(conn, meta, checkTiDBTableRegionPkFields) if err != nil { return } + dbName, tableName := meta.DatabaseName(), meta.TableName() if tbInfos, ok := tableInfoMap[dbName]; ok { if tbInfo, ok := tbInfos[tableName]; ok { pkVals = make([][]string, len(tbInfo)) diff --git a/v4/export/dump_test.go b/v4/export/dump_test.go index 64cccc29..8b2fbc92 100644 --- a/v4/export/dump_test.go +++ b/v4/export/dump_test.go @@ -20,7 +20,6 @@ func (s *testSQLSuite) TestDumpBlock(c *C) { c.Assert(err, IsNil) defer db.Close() - database := "test" mock.ExpectQuery(fmt.Sprintf("SHOW CREATE DATABASE `%s`", escapeString(database))). WillReturnRows(sqlmock.NewRows([]string{"Database", "Create Database"}). 
AddRow("test", "CREATE DATABASE `test` /*!40100 DEFAULT CHARACTER SET utf8mb4 */")) @@ -49,7 +48,43 @@ func (s *testSQLSuite) TestDumpBlock(c *C) { // simulate taskChan is full taskChan := make(chan Task, 1) taskChan <- &TaskDatabaseMeta{} - d.conf.Tables = DatabaseTables{}.AppendTable("test", nil) + d.conf.Tables = DatabaseTables{}.AppendTable(database, nil) c.Assert(errors.ErrorEqual(d.dumpDatabases(writerCtx, conn, taskChan), context.Canceled), IsTrue) c.Assert(errors.ErrorEqual(wg.Wait(), writerErr), IsTrue) } + +func (s *testSQLSuite) TestDumpTableMeta(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + defer db.Close() + + tctx, cancel := tcontext.Background().WithLogger(appLogger).WithCancel() + defer cancel() + conn, err := db.Conn(tctx) + c.Assert(err, IsNil) + conf := DefaultConfig() + conf.NoSchemas = true + + for serverType := ServerTypeUnknown; serverType < ServerTypeAll; serverType++ { + conf.ServerInfo.ServerType = ServerType(serverType) + hasImplicitRowID := false + mock.ExpectQuery("SHOW COLUMNS FROM"). + WillReturnRows(sqlmock.NewRows([]string{"Field", "Type", "Null", "Key", "Default", "Extra"}). + AddRow("id", "int(11)", "NO", "PRI", nil, "")) + if serverType == ServerTypeTiDB { + mock.ExpectExec("SELECT _tidb_rowid from"). + WillReturnResult(sqlmock.NewResult(0, 0)) + hasImplicitRowID = true + } + mock.ExpectQuery(fmt.Sprintf("SELECT \\* FROM `%s`.`%s`", database, table)). + WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow(1)) + meta, err := dumpTableMeta(conf, conn, database, &TableInfo{Type: TableTypeBase, Name: table}) + c.Assert(err, IsNil) + c.Assert(meta.DatabaseName(), Equals, database) + c.Assert(meta.TableName(), Equals, table) + c.Assert(meta.SelectedField(), Equals, "*") + c.Assert(meta.SelectedLen(), Equals, 1) + c.Assert(meta.ShowCreateTable(), Equals, "") + c.Assert(meta.HasImplicitRowID(), Equals, hasImplicitRowID) + } +} diff --git a/v4/export/ir.go b/v4/export/ir.go index 2b1d8d07..35e6614d 100644 --- a/v4/export/ir.go +++ b/v4/export/ir.go @@ -29,9 +29,12 @@ type TableMeta interface { ColumnTypes() []string ColumnNames() []string SelectedField() string + SelectedLen() int SpecialComments() StringIter ShowCreateTable() string ShowCreateView() string + AvgRowLength() uint64 + HasImplicitRowID() bool } // SQLRowIter is the iterator on a collection of sql.Row. 
@@ -98,6 +101,7 @@ func setTableMetaFromRows(rows *sql.Rows) (TableMeta, error) { return &tableMeta{ colTypes: tps, selectedField: strings.Join(nms, ","), + selectedLen: len(nms), specCmts: []string{"/*!40101 SET NAMES binary*/;"}, }, nil } diff --git a/v4/export/ir_impl.go b/v4/export/ir_impl.go index ee5e01d9..8170a1c4 100644 --- a/v4/export/ir_impl.go +++ b/v4/export/ir_impl.go @@ -4,7 +4,6 @@ package export import ( "database/sql" - "fmt" "strings" "github.com/pingcap/errors" @@ -250,13 +249,16 @@ func (td *tableData) RawRows() *sql.Rows { } type tableMeta struct { - database string - table string - colTypes []*sql.ColumnType - selectedField string - specCmts []string - showCreateTable string - showCreateView string + database string + table string + colTypes []*sql.ColumnType + selectedField string + selectedLen int + specCmts []string + showCreateTable string + showCreateView string + avgRowLength uint64 + hasImplicitRowID bool } func (tm *tableMeta) ColumnTypes() []string { @@ -288,10 +290,11 @@ func (tm *tableMeta) ColumnCount() uint { } func (tm *tableMeta) SelectedField() string { - if tm.selectedField == "*" || tm.selectedField == "" { - return tm.selectedField - } - return fmt.Sprintf("(%s)", tm.selectedField) + return tm.selectedField +} + +func (tm *tableMeta) SelectedLen() int { + return tm.selectedLen } func (tm *tableMeta) SpecialComments() StringIter { @@ -306,6 +309,14 @@ func (tm *tableMeta) ShowCreateView() string { return tm.showCreateView } +func (tm *tableMeta) AvgRowLength() uint64 { + return tm.avgRowLength +} + +func (tm *tableMeta) HasImplicitRowID() bool { + return tm.hasImplicitRowID +} + type metaData struct { target string metaSQL string diff --git a/v4/export/metrics.go b/v4/export/metrics.go index a2d671d0..cb62caf0 100644 --- a/v4/export/metrics.go +++ b/v4/export/metrics.go @@ -10,8 +10,8 @@ import ( ) var ( - finishedSizeCounter *prometheus.CounterVec - finishedRowsCounter *prometheus.CounterVec + finishedSizeGauge *prometheus.GaugeVec + finishedRowsGauge *prometheus.GaugeVec finishedTablesCounter *prometheus.CounterVec estimateTotalRowsCounter *prometheus.CounterVec writeTimeHistogram *prometheus.HistogramVec @@ -27,8 +27,8 @@ func InitMetricsVector(labels prometheus.Labels) { for name := range labels { labelNames = append(labelNames, name) } - finishedSizeCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ + finishedSizeGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ Namespace: "dumpling", Subsystem: "dump", Name: "finished_size", @@ -41,8 +41,8 @@ func InitMetricsVector(labels prometheus.Labels) { Name: "estimate_total_rows", Help: "estimate total rows for dumpling tables", }, labelNames) - finishedRowsCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ + finishedRowsGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ Namespace: "dumpling", Subsystem: "dump", Name: "finished_rows", @@ -89,11 +89,11 @@ func InitMetricsVector(labels prometheus.Labels) { // RegisterMetrics registers metrics. 
func RegisterMetrics(registry *prometheus.Registry) { - if finishedSizeCounter == nil { + if finishedSizeGauge == nil { return } - registry.MustRegister(finishedSizeCounter) - registry.MustRegister(finishedRowsCounter) + registry.MustRegister(finishedSizeGauge) + registry.MustRegister(finishedRowsGauge) registry.MustRegister(estimateTotalRowsCounter) registry.MustRegister(finishedTablesCounter) registry.MustRegister(writeTimeHistogram) @@ -104,11 +104,11 @@ func RegisterMetrics(registry *prometheus.Registry) { // RemoveLabelValuesWithTaskInMetrics removes metrics of specified labels. func RemoveLabelValuesWithTaskInMetrics(labels prometheus.Labels) { - if finishedSizeCounter == nil { + if finishedSizeGauge == nil { return } - finishedSizeCounter.Delete(labels) - finishedRowsCounter.Delete(labels) + finishedSizeGauge.Delete(labels) + finishedRowsGauge.Delete(labels) estimateTotalRowsCounter.Delete(labels) finishedTablesCounter.Delete(labels) writeTimeHistogram.Delete(labels) @@ -154,6 +154,19 @@ func ObserveHistogram(histogramVec *prometheus.HistogramVec, labels prometheus.L histogramVec.With(labels).Observe(v) } +// ReadGauge reports the current value of the gauge. +func ReadGauge(gaugeVec *prometheus.GaugeVec, labels prometheus.Labels) float64 { + if gaugeVec == nil { + return math.NaN() + } + gauge := gaugeVec.With(labels) + var metric dto.Metric + if err := gauge.Write(&metric); err != nil { + return math.NaN() + } + return metric.Gauge.GetValue() +} + // AddGauge adds a gauge func AddGauge(gaugeVec *prometheus.GaugeVec, labels prometheus.Labels, v float64) { if gaugeVec == nil { @@ -162,6 +175,14 @@ func AddGauge(gaugeVec *prometheus.GaugeVec, labels prometheus.Labels, v float64 gaugeVec.With(labels).Add(v) } +// SubGauge subs a gauge +func SubGauge(gaugeVec *prometheus.GaugeVec, labels prometheus.Labels, v float64) { + if gaugeVec == nil { + return + } + gaugeVec.With(labels).Sub(v) +} + // IncGauge incs a gauge func IncGauge(gaugeVec *prometheus.GaugeVec, labels prometheus.Labels) { if gaugeVec == nil { diff --git a/v4/export/prepare.go b/v4/export/prepare.go index 000e1cf9..e995f8aa 100644 --- a/v4/export/prepare.go +++ b/v4/export/prepare.go @@ -96,14 +96,6 @@ func prepareDumpingDatabases(conf *Config, db *sql.Conn) ([]string, error) { return conf.Databases, nil } -func listAllTables(db *sql.Conn, databaseNames []string) (DatabaseTables, error) { - return ListAllDatabasesTables(db, databaseNames, TableTypeBase) -} - -func listAllViews(db *sql.Conn, databaseNames []string) (DatabaseTables, error) { - return ListAllDatabasesTables(db, databaseNames, TableTypeView) -} - type databaseName = string // TableType represents the type of table @@ -116,10 +108,41 @@ const ( TableTypeView ) +const ( + // TableTypeBaseStr represents the basic table string + TableTypeBaseStr = "BASE TABLE" + // TableTypeViewStr represents the view table string + TableTypeViewStr = "VIEW" +) + +func (t TableType) String() string { + switch t { + case TableTypeBase: + return TableTypeBaseStr + case TableTypeView: + return TableTypeViewStr + default: + return "UNKNOWN" + } +} + +// ParseTableType parses table type string to TableType +func ParseTableType(s string) (TableType, error) { + switch s { + case TableTypeBaseStr: + return TableTypeBase, nil + case TableTypeViewStr: + return TableTypeView, nil + default: + return TableTypeBase, errors.Errorf("unknown table type %s", s) + } +} + // TableInfo is the table info for a table in database type TableInfo struct { - Name string - Type TableType + Name string + 
AvgRowLength uint64 + Type TableType } // Equals returns true the table info is the same with another one @@ -142,9 +165,9 @@ func (d DatabaseTables) AppendTable(dbName string, table *TableInfo) DatabaseTab } // AppendTables appends several basic tables to DatabaseTables -func (d DatabaseTables) AppendTables(dbName string, tableNames ...string) DatabaseTables { - for _, t := range tableNames { - d[dbName] = append(d[dbName], &TableInfo{t, TableTypeBase}) +func (d DatabaseTables) AppendTables(dbName string, tableNames []string, avgRowLengths []uint64) DatabaseTables { + for i, t := range tableNames { + d[dbName] = append(d[dbName], &TableInfo{t, avgRowLengths[i], TableTypeBase}) } return d } @@ -152,7 +175,7 @@ func (d DatabaseTables) AppendTables(dbName string, tableNames ...string) Databa // AppendViews appends several views to DatabaseTables func (d DatabaseTables) AppendViews(dbName string, viewNames ...string) DatabaseTables { for _, v := range viewNames { - d[dbName] = append(d[dbName], &TableInfo{v, TableTypeView}) + d[dbName] = append(d[dbName], &TableInfo{v, 0, TableTypeView}) } return d } diff --git a/v4/export/prepare_test.go b/v4/export/prepare_test.go index 28832543..acf0691a 100644 --- a/v4/export/prepare_test.go +++ b/v4/export/prepare_test.go @@ -6,6 +6,8 @@ import ( "context" "fmt" + tcontext "github.com/pingcap/dumpling/v4/context" + "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" ) @@ -64,14 +66,16 @@ func (s *testPrepareSuite) TestListAllTables(c *C) { defer db.Close() conn, err := db.Conn(context.Background()) c.Assert(err, IsNil) + tctx := tcontext.Background().WithLogger(appLogger) + // Test list all tables and skipping views. data := NewDatabaseTables(). - AppendTables("db1", "t1", "t2"). - AppendTables("db2", "t3", "t4", "t5"). + AppendTables("db1", []string{"t1", "t2"}, []uint64{1, 2}). + AppendTables("db2", []string{"t3", "t4", "t5"}, []uint64{3, 4, 5}). AppendViews("db3", "t6", "t7", "t8") dbNames := make([]databaseName, 0, len(data)) - rows := sqlmock.NewRows([]string{"table_schema", "table_name"}) + rows := sqlmock.NewRows([]string{"TABLE_SCHEMA", "TABLE_NAME", "TABLE_TYPE", "AVG_ROW_LENGTH"}) for dbName, tableInfos := range data { dbNames = append(dbNames, dbName) @@ -79,13 +83,13 @@ func (s *testPrepareSuite) TestListAllTables(c *C) { if tbInfo.Type == TableTypeView { continue } - rows.AddRow(dbName, tbInfo.Name) + rows.AddRow(dbName, tbInfo.Name, tbInfo.Type.String(), tbInfo.AvgRowLength) } } - query := "SELECT table_schema,table_name FROM information_schema.tables WHERE table_type = (.*)" + query := "SELECT TABLE_SCHEMA,TABLE_NAME,TABLE_TYPE,AVG_ROW_LENGTH FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE='BASE TABLE'" mock.ExpectQuery(query).WillReturnRows(rows) - tables, err := listAllTables(conn, dbNames) + tables, err := ListAllDatabasesTables(tctx, conn, dbNames, true, TableTypeBase) c.Assert(err, IsNil) for d, t := range tables { @@ -99,15 +103,81 @@ func (s *testPrepareSuite) TestListAllTables(c *C) { // Test list all tables and not skipping views. data = NewDatabaseTables(). - AppendTables("db", "t1"). + AppendTables("db", []string{"t1"}, []uint64{1}). 
AppendViews("db", "t2") - query = "SELECT table_schema,table_name FROM information_schema.tables WHERE table_type = (.*)" - mock.ExpectQuery(query).WillReturnRows(sqlmock.NewRows([]string{"table_schema", "table_name"}).AddRow("db", "t2")) - tables, err = listAllViews(conn, []string{"db"}) + query = "SELECT TABLE_SCHEMA,TABLE_NAME,TABLE_TYPE,AVG_ROW_LENGTH FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE='BASE TABLE' OR TABLE_TYPE='VIEW'" + mock.ExpectQuery(query).WillReturnRows(sqlmock.NewRows([]string{"TABLE_SCHEMA", "TABLE_NAME", "TABLE_TYPE", "AVG_ROW_LENGTH"}). + AddRow("db", "t1", TableTypeBaseStr, 1).AddRow("db", "t2", TableTypeViewStr, nil)) + tables, err = ListAllDatabasesTables(tctx, conn, []string{"db"}, true, TableTypeBase, TableTypeView) c.Assert(err, IsNil) c.Assert(len(tables), Equals, 1) - c.Assert(len(tables["db"]), Equals, 1) - c.Assert(tables["db"][0].Equals(data["db"][1]), IsTrue, Commentf("%v mismatch %v", tables["db"][0], data["db"][1])) + c.Assert(len(tables["db"]), Equals, 2) + for i := 0; i < len(tables["db"]); i++ { + cmt := Commentf("%v mismatch: %v", tables["db"][i], data["db"][i]) + c.Assert(tables["db"][i].Equals(data["db"][i]), IsTrue, cmt) + } + + c.Assert(mock.ExpectationsWereMet(), IsNil) +} + +func (s *testPrepareSuite) TestListAllTablesByTableStatus(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + defer db.Close() + conn, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + tctx := tcontext.Background().WithLogger(appLogger) + + // Test list all tables and skipping views. + data := NewDatabaseTables(). + AppendTables("db1", []string{"t1", "t2"}, []uint64{1, 2}). + AppendTables("db2", []string{"t3", "t4", "t5"}, []uint64{3, 4, 5}). + AppendViews("db3", "t6", "t7", "t8") + + query := "SHOW TABLE STATUS FROM `%s`" + showTableStatusColumnNames := []string{"Name", "Engine", "Version", "Row_format", "Rows", "Avg_row_length", "Data_length", "Max_data_length", "Index_length", "Data_free", "Auto_increment", "Create_time", "Update_time", "Check_time", "Collation", "Checksum", "Create_options", "Comment"} + dbNames := make([]databaseName, 0, len(data)) + for dbName, tableInfos := range data { + dbNames = append(dbNames, dbName) + rows := sqlmock.NewRows(showTableStatusColumnNames) + + for _, tbInfo := range tableInfos { + if tbInfo.Type == TableTypeBase { + rows.AddRow(tbInfo.Name, "InnoDB", 10, "Dynamic", 0, 0, 16384, 0, 0, 0, nil, "2021-07-08 03:04:07", nil, nil, "latin1_swedish_ci", nil, "", "") + } else { + rows.AddRow(tbInfo.Name, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, "VIEW") + } + } + mock.ExpectQuery(fmt.Sprintf(query, dbName)).WillReturnRows(rows) + } + + tables, err := ListAllDatabasesTables(tctx, conn, dbNames, false, TableTypeBase) + c.Assert(err, IsNil) + + for d, t := range tables { + expectedTbs, ok := data[d] + c.Assert(ok, IsTrue) + for i := 0; i < len(t); i++ { + cmt := Commentf("%v mismatch: %v", t[i], expectedTbs[i]) + c.Assert(t[i].Equals(expectedTbs[i]), IsTrue, cmt) + } + } + + // Test list all tables and not skipping views. + data = NewDatabaseTables(). + AppendTables("db", []string{"t1"}, []uint64{1}). + AppendViews("db", "t2") + mock.ExpectQuery(fmt.Sprintf(query, "db")).WillReturnRows(sqlmock.NewRows(showTableStatusColumnNames). + AddRow("t1", "InnoDB", 10, "Dynamic", 0, 1, 16384, 0, 0, 0, nil, "2021-07-08 03:04:07", nil, nil, "latin1_swedish_ci", nil, "", ""). 
+ AddRow("t2", nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, "VIEW")) + tables, err = ListAllDatabasesTables(tctx, conn, []string{"db"}, false, TableTypeBase, TableTypeView) + c.Assert(err, IsNil) + c.Assert(len(tables), Equals, 1) + c.Assert(len(tables["db"]), Equals, 2) + for i := 0; i < len(tables["db"]); i++ { + cmt := Commentf("%v mismatch: %v", tables["db"][i], data["db"][i]) + c.Assert(tables["db"][i].Equals(data["db"][i]), IsTrue, cmt) + } c.Assert(mock.ExpectationsWereMet(), IsNil) } diff --git a/v4/export/sql.go b/v4/export/sql.go index f6c1b80a..4d7a03a3 100644 --- a/v4/export/sql.go +++ b/v4/export/sql.go @@ -143,36 +143,91 @@ func RestoreCharset(w io.StringWriter) { } // ListAllDatabasesTables lists all the databases and tables from the database -func ListAllDatabasesTables(db *sql.Conn, databaseNames []string, tableType TableType) (DatabaseTables, error) { - var tableTypeStr string - switch tableType { - case TableTypeBase: - tableTypeStr = "BASE TABLE" - case TableTypeView: - tableTypeStr = "VIEW" - default: - return nil, errors.Errorf("unknown table type %v", tableType) - } - - query := fmt.Sprintf("SELECT table_schema,table_name FROM information_schema.tables WHERE table_type = '%s'", tableTypeStr) +// if asap is true, will use information_schema to get table info in one query +// if asap is false, will use show table status for each database because it has better performance according to our tests +func ListAllDatabasesTables(tctx *tcontext.Context, db *sql.Conn, databaseNames []string, asap bool, tableTypes ...TableType) (DatabaseTables, error) { // revive:disable-line:flag-parameter dbTables := DatabaseTables{} - for _, schema := range databaseNames { - dbTables[schema] = make([]*TableInfo, 0) - } - - if err := simpleQueryWithArgs(db, func(rows *sql.Rows) error { - var schema, table string - if err := rows.Scan(&schema, &table); err != nil { - return errors.Trace(err) + var ( + schema, table, tableTypeStr string + tableType TableType + avgRowLength uint64 + err error + ) + if asap { + tableTypeConditions := make([]string, len(tableTypes)) + for i, tableType := range tableTypes { + tableTypeConditions[i] = fmt.Sprintf("TABLE_TYPE='%s'", tableType) + } + query := fmt.Sprintf("SELECT TABLE_SCHEMA,TABLE_NAME,TABLE_TYPE,AVG_ROW_LENGTH FROM INFORMATION_SCHEMA.TABLES WHERE %s", strings.Join(tableTypeConditions, " OR ")) + for _, schema := range databaseNames { + dbTables[schema] = make([]*TableInfo, 0) } + if err = simpleQueryWithArgs(db, func(rows *sql.Rows) error { + var ( + sqlAvgRowLength sql.NullInt64 + err2 error + ) + if err2 = rows.Scan(&schema, &table, &tableTypeStr, &sqlAvgRowLength); err != nil { + return errors.Trace(err2) + } + tableType, err2 = ParseTableType(tableTypeStr) + if err2 != nil { + return errors.Trace(err2) + } - // only append tables to schemas in databaseNames - if _, ok := dbTables[schema]; ok { - dbTables[schema] = append(dbTables[schema], &TableInfo{table, tableType}) + if sqlAvgRowLength.Valid { + avgRowLength = uint64(sqlAvgRowLength.Int64) + } else { + avgRowLength = 0 + } + // only append tables to schemas in databaseNames + if _, ok := dbTables[schema]; ok { + dbTables[schema] = append(dbTables[schema], &TableInfo{table, avgRowLength, tableType}) + } + return nil + }, query); err != nil { + return nil, errors.Annotatef(err, "sql: %s", query) + } + } else { + queryTemplate := "SHOW TABLE STATUS FROM `%s`" + selectedTableType := make(map[TableType]struct{}) + for _, tableType = range tableTypes { + 
selectedTableType[tableType] = struct{}{} + } + for _, schema = range databaseNames { + dbTables[schema] = make([]*TableInfo, 0) + query := fmt.Sprintf(queryTemplate, escapeString(schema)) + rows, err := db.QueryContext(tctx, query) + if err != nil { + return nil, errors.Annotatef(err, "sql: %s", query) + } + results, err := GetSpecifiedColumnValuesAndClose(rows, "NAME", "ENGINE", "AVG_ROW_LENGTH", "COMMENT") + if err != nil { + return nil, errors.Annotatef(err, "sql: %s", query) + } + for _, oneRow := range results { + table, engine, avgRowLengthStr, comment := oneRow[0], oneRow[1], oneRow[2], oneRow[3] + if avgRowLengthStr != "" { + avgRowLength, err = strconv.ParseUint(avgRowLengthStr, 10, 64) + if err != nil { + return nil, errors.Annotatef(err, "sql: %s", query) + } + } else { + avgRowLength = 0 + } + tableType = TableTypeBase + if engine == "" && (comment == "" || comment == TableTypeViewStr) { + tableType = TableTypeView + } else if engine == "" { + tctx.L().Warn("Invalid table without engine found", zap.String("database", schema), zap.String("table", table)) + continue + } + if _, ok := selectedTableType[tableType]; !ok { + continue + } + dbTables[schema] = append(dbTables[schema], &TableInfo{table, avgRowLength, tableType}) + } } - return nil - }, query); err != nil { - return nil, errors.Annotatef(err, "sql: %s", query) } return dbTables, nil } @@ -190,23 +245,15 @@ func SelectVersion(db *sql.DB) (string, error) { } // SelectAllFromTable dumps data serialized from a specified table -func SelectAllFromTable(conf *Config, db *sql.Conn, meta TableMeta, partition string) (TableDataIR, error) { +func SelectAllFromTable(conf *Config, meta TableMeta, partition, orderByClause string) TableDataIR { database, table := meta.DatabaseName(), meta.TableName() - selectedField, selectLen, err := buildSelectField(db, database, table, conf.CompleteInsert) - if err != nil { - return nil, err - } - - orderByClause, err := buildOrderByClause(conf, db, database, table) - if err != nil { - return nil, err - } + selectedField, selectLen := meta.SelectedField(), meta.SelectedLen() query := buildSelectQuery(database, table, selectedField, partition, buildWhereCondition(conf, ""), orderByClause) return &tableData{ query: query, colLen: selectLen, - }, nil + } } func buildSelectQuery(database, table, fields, partition, where, orderByClause string) string { @@ -242,18 +289,12 @@ func buildSelectQuery(database, table, fields, partition, where, orderByClause s return query.String() } -func buildOrderByClause(conf *Config, db *sql.Conn, database, table string) (string, error) { +func buildOrderByClause(conf *Config, db *sql.Conn, database, table string, hasImplicitRowID bool) (string, error) { // revive:disable-line:flag-parameter if !conf.SortByPk { return "", nil } - if conf.ServerInfo.ServerType == ServerTypeTiDB { - ok, err := SelectTiDBRowID(db, database, table) - if err != nil { - return "", errors.Trace(err) - } - if ok { - return orderByTiDBRowID, nil - } + if hasImplicitRowID { + return orderByTiDBRowID, nil } cols, err := GetPrimaryKeyColumns(db, database, table) if err != nil { @@ -278,15 +319,13 @@ func SelectTiDBRowID(db *sql.Conn, database, table string) (bool, error) { } // GetSuitableRows gets suitable rows for each table -func GetSuitableRows(tctx *tcontext.Context, db *sql.Conn, database, table string) uint64 { +func GetSuitableRows(avgRowLength uint64) uint64 { const ( defaultRows = 200000 maxRows = 1000000 bytesPerFile = 128 * 1024 * 1024 // 128MB per file by default ) - avgRowLength, err := 
GetAVGRowLength(tctx, db, database, table) - if err != nil || avgRowLength == 0 { - tctx.L().Debug("fail to get average row length", zap.Uint64("averageRowLength", avgRowLength), zap.Error(err)) + if avgRowLength == 0 { return defaultRows } estimateRows := bytesPerFile / avgRowLength @@ -296,18 +335,6 @@ func GetSuitableRows(tctx *tcontext.Context, db *sql.Conn, database, table strin return estimateRows } -// GetAVGRowLength gets whether this table's average row length -func GetAVGRowLength(tctx *tcontext.Context, db *sql.Conn, database, table string) (uint64, error) { - const query = "select AVG_ROW_LENGTH from INFORMATION_SCHEMA.TABLES where table_schema=? and table_name=?;" - var avgRowLength uint64 - row := db.QueryRowContext(tctx, query, database, table) - err := row.Scan(&avgRowLength) - if err != nil { - return 0, errors.Annotatef(err, "sql: %s", query) - } - return avgRowLength, nil -} - // GetColumnTypes gets *sql.ColumnTypes from a specified table func GetColumnTypes(db *sql.Conn, fields, database, table string) ([]*sql.ColumnType, error) { query := fmt.Sprintf("SELECT %s FROM `%s`.`%s` LIMIT 1", fields, escapeString(database), escapeString(table)) @@ -323,78 +350,68 @@ func GetColumnTypes(db *sql.Conn, fields, database, table string) ([]*sql.Column } // GetPrimaryKeyAndColumnTypes gets all primary columns and their types in ordinal order -func GetPrimaryKeyAndColumnTypes(conn *sql.Conn, database, table string) ([]string, []string, error) { - query := - `SELECT c.COLUMN_NAME, DATA_TYPE FROM -INFORMATION_SCHEMA.COLUMNS c INNER JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE k ON -c.column_name = k.column_name and -c.table_schema = k.table_schema and -c.table_name = k.table_name and -c.table_schema = ? and -c.table_name = ? -WHERE COLUMN_KEY = 'PRI' -ORDER BY k.ORDINAL_POSITION;` - var colNames, colTypes []string - if err := simpleQueryWithArgs(conn, func(rows *sql.Rows) error { - var colName, colType string - if err := rows.Scan(&colName, &colType); err != nil { - return errors.Trace(err) - } - colNames = append(colNames, colName) - colTypes = append(colTypes, strings.ToUpper(colType)) - return nil - }, query, database, table); err != nil { - return nil, nil, errors.Annotatef(err, "sql: %s", query) +func GetPrimaryKeyAndColumnTypes(conn *sql.Conn, meta TableMeta) ([]string, []string, error) { + var ( + colNames, colTypes []string + err error + ) + colNames, err = GetPrimaryKeyColumns(conn, meta.DatabaseName(), meta.TableName()) + if err != nil { + return nil, nil, err + } + colName2Type := string2Map(meta.ColumnNames(), meta.ColumnTypes()) + colTypes = make([]string, len(colNames)) + for i, colName := range colNames { + colTypes[i] = colName2Type[colName] } return colNames, colTypes, nil } // GetPrimaryKeyColumns gets all primary columns in ordinal order func GetPrimaryKeyColumns(db *sql.Conn, database, table string) ([]string, error) { - priKeyColsQuery := "SELECT column_name FROM information_schema.KEY_COLUMN_USAGE " + - "WHERE table_schema = ? AND table_name = ? 
AND CONSTRAINT_NAME = 'PRIMARY' order by ORDINAL_POSITION;"
-	rows, err := db.QueryContext(context.Background(), priKeyColsQuery, database, table)
+	priKeyColsQuery := fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", escapeString(database), escapeString(table))
+	rows, err := db.QueryContext(context.Background(), priKeyColsQuery)
 	if err != nil {
 		return nil, errors.Annotatef(err, "sql: %s", priKeyColsQuery)
 	}
-	defer rows.Close()
-	var cols []string
-	var col string
-	for rows.Next() {
-		err = rows.Scan(&col)
-		if err != nil {
-			return nil, errors.Annotatef(err, "sql: %s", priKeyColsQuery)
-		}
-		cols = append(cols, col)
-	}
-	if err = rows.Err(); err != nil {
+	results, err := GetSpecifiedColumnValuesAndClose(rows, "KEY_NAME", "COLUMN_NAME")
+	if err != nil {
 		return nil, errors.Annotatef(err, "sql: %s", priKeyColsQuery)
 	}
+	cols := make([]string, 0, len(results))
+	for _, oneRow := range results {
+		keyName, columnName := oneRow[0], oneRow[1]
+		if keyName == "PRIMARY" {
+			cols = append(cols, columnName)
+		}
+	}
 	return cols, nil
 }
 
-// GetPrimaryKeyName try to get a numeric primary index
-func GetPrimaryKeyName(db *sql.Conn, database, table string) (string, error) {
-	return getNumericIndex(db, database, table, "PRI")
-}
-
-// GetUniqueIndexName try to get a numeric unique index
-func GetUniqueIndexName(db *sql.Conn, database, table string) (string, error) {
-	return getNumericIndex(db, database, table, "UNI")
-}
-
-func getNumericIndex(db *sql.Conn, database, table, indexType string) (string, error) {
-	keyQuery := "SELECT column_name FROM information_schema.columns " +
-		"WHERE table_schema = ? AND table_name = ? AND column_key = ? AND data_type IN ('int', 'bigint');"
-	var colName string
-	row := db.QueryRowContext(context.Background(), keyQuery, database, table, indexType)
-	err := row.Scan(&colName)
-	if errors.Cause(err) == sql.ErrNoRows {
-		return "", nil
-	} else if err != nil {
-		return "", errors.Annotatef(err, "sql: %s, indexType: %s", keyQuery, indexType)
+func getNumericIndex(db *sql.Conn, meta TableMeta) (string, error) {
+	database, table := meta.DatabaseName(), meta.TableName()
+	colName2Type := string2Map(meta.ColumnNames(), meta.ColumnTypes())
+	keyQuery := fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", escapeString(database), escapeString(table))
+	rows, err := db.QueryContext(context.Background(), keyQuery)
+	if err != nil {
+		return "", errors.Annotatef(err, "sql: %s", keyQuery)
+	}
+	results, err := GetSpecifiedColumnValuesAndClose(rows, "NON_UNIQUE", "KEY_NAME", "COLUMN_NAME")
+	if err != nil {
+		return "", errors.Annotatef(err, "sql: %s", keyQuery)
 	}
-	return colName, nil
+	uniqueColumnName := ""
+	// check primary key first, then unique key
+	for _, oneRow := range results {
+		var ok bool
+		if _, ok = dataTypeNum[colName2Type[oneRow[2]]]; ok && oneRow[1] == "PRIMARY" {
+			return oneRow[2], nil
+		}
+		if uniqueColumnName == "" && oneRow[0] == "0" && ok {
+			uniqueColumnName = oneRow[2]
+		}
+	}
+	return uniqueColumnName, nil
 }
 
 // FlushTableWithReadLock flush tables with read lock
@@ -633,8 +650,8 @@ func createConnWithConsistency(ctx context.Context, db *sql.DB) (*sql.Conn, erro
 // buildSelectField returns the selecting fields' string(joined by comma(`,`)),
 // and the number of writable fields.
 func buildSelectField(db *sql.Conn, dbName, tableName string, completeInsert bool) (string, int, error) { // revive:disable-line:flag-parameter
-	query := `SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=? AND TABLE_NAME=? 
ORDER BY ORDINAL_POSITION;` - rows, err := db.QueryContext(context.Background(), query, dbName, tableName) + query := fmt.Sprintf("SHOW COLUMNS FROM `%s`.`%s`", escapeString(dbName), escapeString(tableName)) + rows, err := db.QueryContext(context.Background(), query) if err != nil { return "", 0, errors.Annotatef(err, "sql: %s", query) } @@ -642,13 +659,12 @@ func buildSelectField(db *sql.Conn, dbName, tableName string, completeInsert boo availableFields := make([]string, 0) hasGenerateColumn := false - var fieldName string - var extra string - for rows.Next() { - err = rows.Scan(&fieldName, &extra) - if err != nil { - return "", 0, errors.Annotatef(err, "sql: %s", query) - } + results, err := GetSpecifiedColumnValuesAndClose(rows, "FIELD", "EXTRA") + if err != nil { + return "", 0, errors.Annotatef(err, "sql: %s", query) + } + for _, oneRow := range results { + fieldName, extra := oneRow[0], oneRow[1] switch extra { case "STORED GENERATED", "VIRTUAL GENERATED": hasGenerateColumn = true @@ -656,9 +672,6 @@ func buildSelectField(db *sql.Conn, dbName, tableName string, completeInsert boo } availableFields = append(availableFields, wrapBackTicks(escapeString(fieldName))) } - if err = rows.Err(); err != nil { - return "", 0, errors.Annotatef(err, "sql: %s", query) - } if completeInsert || hasGenerateColumn { return strings.Join(availableFields, ","), len(availableFields), nil } @@ -871,29 +884,16 @@ func simpleQueryWithArgs(conn *sql.Conn, handleOneRow func(*sql.Rows) error, sql return errors.Annotatef(rows.Err(), "sql: %s, args: %s", sql, args) } -func pickupPossibleField(dbName, tableName string, db *sql.Conn, conf *Config) (string, error) { - // If detected server is TiDB, try using _tidb_rowid - if conf.ServerInfo.ServerType == ServerTypeTiDB { - ok, err := SelectTiDBRowID(db, dbName, tableName) - if err != nil { - return "", nil - } - if ok { - return "_tidb_rowid", nil - } +func pickupPossibleField(meta TableMeta, db *sql.Conn) (string, error) { + // try using _tidb_rowid first + if meta.HasImplicitRowID() { + return "_tidb_rowid", nil } // try to use pk - fieldName, err := GetPrimaryKeyName(db, dbName, tableName) + fieldName, err := getNumericIndex(db, meta) if err != nil { return "", err } - // try to use first uniqueIndex - if fieldName == "" { - fieldName, err = GetUniqueIndexName(db, dbName, tableName) - if err != nil { - return "", err - } - } // if fieldName == "", there is no proper index return fieldName, nil diff --git a/v4/export/sql_test.go b/v4/export/sql_test.go index 4a956f9b..9de2de58 100644 --- a/v4/export/sql_test.go +++ b/v4/export/sql_test.go @@ -7,7 +7,6 @@ import ( "database/sql" "database/sql/driver" "encoding/csv" - "errors" "fmt" "io" "os" @@ -21,10 +20,18 @@ import ( "github.com/DATA-DOG/go-sqlmock" "github.com/coreos/go-semver/semver" . "github.com/pingcap/check" - "github.com/siddontang/go-mysql/mysql" + "github.com/pingcap/errors" ) -var _ = Suite(&testSQLSuite{}) +var ( + _ = Suite(&testSQLSuite{}) + showIndexHeaders = []string{"Table", "Non_unique", "Key_name", "Seq_in_index", "Column_name", "Collation", "Cardinality", "Sub_part", "Packed", "Null", "Index_type", "Comment", "Index_comment"} +) + +const ( + database = "foo" + table = "bar" +) type testSQLSuite struct{} @@ -80,41 +87,33 @@ func (s *testSQLSuite) TestBuildSelectAllQuery(c *C) { // Test TiDB server. mockConf.ServerInfo.ServerType = ServerTypeTiDB - // _tidb_rowid is available. - mock.ExpectExec("SELECT _tidb_rowid from `test`.`t`"). 
- WillReturnResult(sqlmock.NewResult(0, 0)) - - orderByClause, err := buildOrderByClause(mockConf, conn, "test", "t") + orderByClause, err := buildOrderByClause(mockConf, conn, database, table, true) c.Assert(err, IsNil) - mock.ExpectQuery("SELECT COLUMN_NAME"). - WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg()). - WillReturnRows(sqlmock.NewRows([]string{"column_name", "extra"}).AddRow("id", "")) + mock.ExpectQuery("SHOW COLUMNS FROM"). + WillReturnRows(sqlmock.NewRows([]string{"Field", "Type", "Null", "Key", "Default", "Extra"}). + AddRow("id", "int(11)", "NO", "PRI", nil, "")) - selectedField, _, err := buildSelectField(conn, "test", "t", false) + selectedField, _, err := buildSelectField(conn, database, table, false) c.Assert(err, IsNil) - q := buildSelectQuery("test", "t", selectedField, "", "", orderByClause) - c.Assert(q, Equals, "SELECT * FROM `test`.`t` ORDER BY `_tidb_rowid`") + q := buildSelectQuery(database, table, selectedField, "", "", orderByClause) + c.Assert(q, Equals, fmt.Sprintf("SELECT * FROM `%s`.`%s` ORDER BY `_tidb_rowid`", database, table)) - // _tidb_rowid is unavailable, or PKIsHandle. - mock.ExpectExec("SELECT _tidb_rowid from `test`.`t`"). - WillReturnError(errors.New(`1054, "Unknown column '_tidb_rowid' in 'field list'"`)) + mock.ExpectQuery(fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)). + WillReturnRows(sqlmock.NewRows(showIndexHeaders). + AddRow(table, 0, "PRIMARY", 1, "id", "A", 0, nil, nil, "", "BTREE", "", "")) - mock.ExpectQuery("SELECT column_name FROM information_schema.KEY_COLUMN_USAGE"). - WithArgs("test", "t"). - WillReturnRows(sqlmock.NewRows([]string{"column_name"}).AddRow("id")) - - orderByClause, err = buildOrderByClause(mockConf, conn, "test", "t") + orderByClause, err = buildOrderByClause(mockConf, conn, database, table, false) c.Assert(err, IsNil) - mock.ExpectQuery("SELECT COLUMN_NAME"). - WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg()). - WillReturnRows(sqlmock.NewRows([]string{"column_name", "extra"}).AddRow("id", "")) + mock.ExpectQuery("SHOW COLUMNS FROM"). + WillReturnRows(sqlmock.NewRows([]string{"Field", "Type", "Null", "Key", "Default", "Extra"}). + AddRow("id", "int(11)", "NO", "PRI", nil, "")) - selectedField, _, err = buildSelectField(conn, "test", "t", false) + selectedField, _, err = buildSelectField(conn, database, table, false) c.Assert(err, IsNil) - q = buildSelectQuery("test", "t", selectedField, "", "", orderByClause) - c.Assert(q, Equals, "SELECT * FROM `test`.`t` ORDER BY `id`") + q = buildSelectQuery(database, table, selectedField, "", "", orderByClause) + c.Assert(q, Equals, fmt.Sprintf("SELECT * FROM `%s`.`%s` ORDER BY `id`", database, table)) c.Assert(mock.ExpectationsWereMet(), IsNil) // Test other servers. @@ -124,20 +123,20 @@ func (s *testSQLSuite) TestBuildSelectAllQuery(c *C) { for _, serverTp := range otherServers { mockConf.ServerInfo.ServerType = serverTp cmt := Commentf("server type: %s", serverTp) - mock.ExpectQuery("SELECT column_name FROM information_schema.KEY_COLUMN_USAGE"). - WithArgs("test", "t"). - WillReturnRows(sqlmock.NewRows([]string{"column_name"}).AddRow("id")) - orderByClause, err := buildOrderByClause(mockConf, conn, "test", "t") + mock.ExpectQuery(fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)). + WillReturnRows(sqlmock.NewRows(showIndexHeaders). + AddRow(table, 0, "PRIMARY", 1, "id", "A", 0, nil, nil, "", "BTREE", "", "")) + orderByClause, err := buildOrderByClause(mockConf, conn, database, table, false) c.Assert(err, IsNil, cmt) - mock.ExpectQuery("SELECT COLUMN_NAME"). 
- WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg()). - WillReturnRows(sqlmock.NewRows([]string{"column_name", "extra"}).AddRow("id", "")) + mock.ExpectQuery("SHOW COLUMNS FROM"). + WillReturnRows(sqlmock.NewRows([]string{"Field", "Type", "Null", "Key", "Default", "Extra"}). + AddRow("id", "int(11)", "NO", "PRI", nil, "")) - selectedField, _, err = buildSelectField(conn, "test", "t", false) + selectedField, _, err = buildSelectField(conn, database, table, false) c.Assert(err, IsNil) - q = buildSelectQuery("test", "t", selectedField, "", "", orderByClause) - c.Assert(q, Equals, "SELECT * FROM `test`.`t` ORDER BY `id`", cmt) + q = buildSelectQuery(database, table, selectedField, "", "", orderByClause) + c.Assert(q, Equals, fmt.Sprintf("SELECT * FROM `%s`.`%s` ORDER BY `id`", database, table), cmt) err = mock.ExpectationsWereMet() c.Assert(err, IsNil, cmt) c.Assert(mock.ExpectationsWereMet(), IsNil, cmt) @@ -147,21 +146,20 @@ func (s *testSQLSuite) TestBuildSelectAllQuery(c *C) { for _, serverTp := range otherServers { mockConf.ServerInfo.ServerType = serverTp cmt := Commentf("server type: %s", serverTp) - mock.ExpectQuery("SELECT column_name FROM information_schema.KEY_COLUMN_USAGE"). - WithArgs("test", "t"). - WillReturnRows(sqlmock.NewRows([]string{"column_name"})) + mock.ExpectQuery(fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)). + WillReturnRows(sqlmock.NewRows(showIndexHeaders)) - orderByClause, err := buildOrderByClause(mockConf, conn, "test", "t") + orderByClause, err := buildOrderByClause(mockConf, conn, database, table, false) c.Assert(err, IsNil, cmt) - mock.ExpectQuery("SELECT COLUMN_NAME"). - WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg()). - WillReturnRows(sqlmock.NewRows([]string{"column_name", "extra"}).AddRow("id", "")) + mock.ExpectQuery("SHOW COLUMNS FROM"). + WillReturnRows(sqlmock.NewRows([]string{"Field", "Type", "Null", "Key", "Default", "Extra"}). + AddRow("id", "int(11)", "NO", "PRI", nil, "")) selectedField, _, err = buildSelectField(conn, "test", "t", false) c.Assert(err, IsNil) - q := buildSelectQuery("test", "t", selectedField, "", "", orderByClause) - c.Assert(q, Equals, "SELECT * FROM `test`.`t`", cmt) + q := buildSelectQuery(database, table, selectedField, "", "", orderByClause) + c.Assert(q, Equals, fmt.Sprintf("SELECT * FROM `%s`.`%s`", database, table), cmt) err = mock.ExpectationsWereMet() c.Assert(err, IsNil, cmt) c.Assert(mock.ExpectationsWereMet(), IsNil) @@ -173,14 +171,14 @@ func (s *testSQLSuite) TestBuildSelectAllQuery(c *C) { mockConf.ServerInfo.ServerType = ServerType(tp) cmt := Commentf("current server type: ", tp) - mock.ExpectQuery("SELECT COLUMN_NAME"). - WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg()). - WillReturnRows(sqlmock.NewRows([]string{"column_name", "extra"}).AddRow("id", "")) + mock.ExpectQuery("SHOW COLUMNS FROM"). + WillReturnRows(sqlmock.NewRows([]string{"Field", "Type", "Null", "Key", "Default", "Extra"}). + AddRow("id", "int(11)", "NO", "PRI", nil, "")) selectedField, _, err := buildSelectField(conn, "test", "t", false) c.Assert(err, IsNil) - q := buildSelectQuery("test", "t", selectedField, "", "", "") - c.Assert(q, Equals, "SELECT * FROM `test`.`t`", cmt) + q := buildSelectQuery(database, table, selectedField, "", "", "") + c.Assert(q, Equals, fmt.Sprintf("SELECT * FROM `%s`.`%s`", database, table), cmt) c.Assert(mock.ExpectationsWereMet(), IsNil, cmt) } } @@ -198,73 +196,47 @@ func (s *testSQLSuite) TestBuildOrderByClause(c *C) { // Test TiDB server. 
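(Editorial aside: every rewritten expectation in these tests follows one recipe. Instead of seeding information_schema result sets, the mock now returns SHOW INDEX-shaped rows under the shared showIndexHeaders columns. Below is a minimal, self-contained sketch of that recipe; it assumes only the public go-sqlmock API, and the foo/bar names simply mirror the test constants defined above.)

// Sketch: the SHOW INDEX mocking recipe used throughout the reworked tests.
// Only the public go-sqlmock API is assumed; `foo`/`bar` mirror the database/
// table constants the tests define.
package main

import (
	"context"
	"fmt"
	"log"

	sqlmock "github.com/DATA-DOG/go-sqlmock"
)

func main() {
	db, mock, err := sqlmock.New()
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// the 13 columns MySQL/TiDB return for SHOW INDEX, as in showIndexHeaders
	headers := []string{"Table", "Non_unique", "Key_name", "Seq_in_index", "Column_name",
		"Collation", "Cardinality", "Sub_part", "Packed", "Null", "Index_type", "Comment", "Index_comment"}

	// a single-column primary key: a consumer scanning KEY_NAME/COLUMN_NAME
	// (as GetPrimaryKeyColumns now does) would pick out ("PRIMARY", "id")
	mock.ExpectQuery("SHOW INDEX FROM `foo`.`bar`").
		WillReturnRows(sqlmock.NewRows(headers).
			AddRow("bar", 0, "PRIMARY", 1, "id", "A", 0, nil, nil, "", "BTREE", "", ""))

	rows, err := db.QueryContext(context.Background(), "SHOW INDEX FROM `foo`.`bar`")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	cols, _ := rows.Columns()
	fmt.Println(len(cols)) // 13
}
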
mockConf.ServerInfo.ServerType = ServerTypeTiDB - // _tidb_rowid is available. - mock.ExpectExec("SELECT _tidb_rowid from `test`.`t`"). - WillReturnResult(sqlmock.NewResult(0, 0)) - - orderByClause, err := buildOrderByClause(mockConf, conn, "test", "t") + orderByClause, err := buildOrderByClause(mockConf, conn, database, table, true) c.Assert(err, IsNil) c.Assert(orderByClause, Equals, orderByTiDBRowID) - // _tidb_rowid is unavailable, or PKIsHandle. - mock.ExpectExec("SELECT _tidb_rowid from `test`.`t`"). - WillReturnError(errors.New(`1054, "Unknown column '_tidb_rowid' in 'field list'"`)) - - mock.ExpectQuery("SELECT column_name FROM information_schema.KEY_COLUMN_USAGE"). - WithArgs("test", "t"). - WillReturnRows(sqlmock.NewRows([]string{"column_name"}).AddRow("id")) + mock.ExpectQuery(fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)). + WillReturnRows(sqlmock.NewRows(showIndexHeaders).AddRow(table, 0, "PRIMARY", 1, "id", "A", 0, nil, nil, "", "BTREE", "", "")) - orderByClause, err = buildOrderByClause(mockConf, conn, "test", "t") + orderByClause, err = buildOrderByClause(mockConf, conn, database, table, false) c.Assert(err, IsNil) c.Assert(orderByClause, Equals, "ORDER BY `id`") - // Test other servers. - otherServers := []ServerType{ServerTypeUnknown, ServerTypeMySQL, ServerTypeMariaDB} - // Test table with primary key. - for _, serverTp := range otherServers { - mockConf.ServerInfo.ServerType = serverTp - cmt := Commentf("server type: %s", serverTp) - mock.ExpectQuery("SELECT column_name FROM information_schema.KEY_COLUMN_USAGE"). - WithArgs("test", "t"). - WillReturnRows(sqlmock.NewRows([]string{"column_name"}).AddRow("id")) - orderByClause, err := buildOrderByClause(mockConf, conn, "test", "t") - c.Assert(err, IsNil, cmt) - c.Assert(orderByClause, Equals, "ORDER BY `id`", cmt) - } + mock.ExpectQuery(fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)). + WillReturnRows(sqlmock.NewRows(showIndexHeaders).AddRow(table, 0, "PRIMARY", 1, "id", "A", 0, nil, nil, "", "BTREE", "", "")) + orderByClause, err = buildOrderByClause(mockConf, conn, database, table, false) + c.Assert(err, IsNil) + c.Assert(orderByClause, Equals, "ORDER BY `id`") // Test table with joint primary key. - for _, serverTp := range otherServers { - mockConf.ServerInfo.ServerType = serverTp - cmt := Commentf("server type: %s", serverTp) - mock.ExpectQuery("SELECT column_name FROM information_schema.KEY_COLUMN_USAGE"). - WithArgs("test", "t"). - WillReturnRows(sqlmock.NewRows([]string{"column_name"}).AddRow("id").AddRow("name")) - orderByClause, err := buildOrderByClause(mockConf, conn, "test", "t") - c.Assert(err, IsNil, cmt) - c.Assert(orderByClause, Equals, "ORDER BY `id`,`name`", cmt) - } + mock.ExpectQuery(fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)). + WillReturnRows(sqlmock.NewRows(showIndexHeaders). + AddRow(table, 0, "PRIMARY", 1, "id", "A", 0, nil, nil, "", "BTREE", "", ""). + AddRow(table, 0, "PRIMARY", 2, "name", "A", 0, nil, nil, "", "BTREE", "", "")) + orderByClause, err = buildOrderByClause(mockConf, conn, database, table, false) + c.Assert(err, IsNil) + c.Assert(orderByClause, Equals, "ORDER BY `id`,`name`") // Test table without primary key. - for _, serverTp := range otherServers { - mockConf.ServerInfo.ServerType = serverTp - cmt := Commentf("server type: %s", serverTp) - mock.ExpectQuery("SELECT column_name FROM information_schema.KEY_COLUMN_USAGE"). - WithArgs("test", "t"). 
- WillReturnRows(sqlmock.NewRows([]string{"column_name"})) + mock.ExpectQuery(fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)). + WillReturnRows(sqlmock.NewRows(showIndexHeaders)) - orderByClause, err := buildOrderByClause(mockConf, conn, "test", "t") - c.Assert(err, IsNil, cmt) - c.Assert(orderByClause, Equals, "", cmt) - } + orderByClause, err = buildOrderByClause(mockConf, conn, database, table, false) + c.Assert(err, IsNil) + c.Assert(orderByClause, Equals, "") // Test when config.SortByPk is disabled. mockConf.SortByPk = false - for tp := ServerTypeUnknown; tp < ServerTypeAll; tp++ { - mockConf.ServerInfo.ServerType = ServerType(tp) - cmt := Commentf("current server type: ", tp) + for _, hasImplicitRowID := range []bool{false, true} { + cmt := Commentf("current hasImplicitRowID: ", hasImplicitRowID) - orderByClause, err := buildOrderByClause(mockConf, conn, "test", "t") + orderByClause, err := buildOrderByClause(mockConf, conn, database, table, hasImplicitRowID) c.Assert(err, IsNil, cmt) c.Assert(orderByClause, Equals, "", cmt) } @@ -278,9 +250,9 @@ func (s *testSQLSuite) TestBuildSelectField(c *C) { c.Assert(err, IsNil) // generate columns not found - mock.ExpectQuery("SELECT COLUMN_NAME"). - WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg()). - WillReturnRows(sqlmock.NewRows([]string{"column_name", "extra"}).AddRow("id", "")) + mock.ExpectQuery("SHOW COLUMNS FROM"). + WillReturnRows(sqlmock.NewRows([]string{"Field", "Type", "Null", "Key", "Default", "Extra"}). + AddRow("id", "int(11)", "NO", "PRI", nil, "")) selectedField, _, err := buildSelectField(conn, "test", "t", false) c.Assert(selectedField, Equals, "*") @@ -288,10 +260,11 @@ func (s *testSQLSuite) TestBuildSelectField(c *C) { c.Assert(mock.ExpectationsWereMet(), IsNil) // user assigns completeInsert - mock.ExpectQuery("SELECT COLUMN_NAME"). - WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg()). - WillReturnRows(sqlmock.NewRows([]string{"column_name", "extra"}).AddRow("id", ""). - AddRow("name", "").AddRow("quo`te", "")) + mock.ExpectQuery("SHOW COLUMNS FROM"). + WillReturnRows(sqlmock.NewRows([]string{"Field", "Type", "Null", "Key", "Default", "Extra"}). + AddRow("id", "int(11)", "NO", "PRI", nil, ""). + AddRow("name", "varchar(12)", "NO", "", nil, ""). + AddRow("quo`te", "varchar(12)", "NO", "UNI", nil, "")) selectedField, _, err = buildSelectField(conn, "test", "t", true) c.Assert(selectedField, Equals, "`id`,`name`,`quo``te`") @@ -299,10 +272,12 @@ func (s *testSQLSuite) TestBuildSelectField(c *C) { c.Assert(mock.ExpectationsWereMet(), IsNil) // found generate columns, rest columns is `id`,`name` - mock.ExpectQuery("SELECT COLUMN_NAME"). - WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg()). - WillReturnRows(sqlmock.NewRows([]string{"column_name", "extra"}). - AddRow("id", "").AddRow("name", "").AddRow("quo`te", "").AddRow("generated", "VIRTUAL GENERATED")) + mock.ExpectQuery("SHOW COLUMNS FROM"). + WillReturnRows(sqlmock.NewRows([]string{"Field", "Type", "Null", "Key", "Default", "Extra"}). + AddRow("id", "int(11)", "NO", "PRI", nil, ""). + AddRow("name", "varchar(12)", "NO", "", nil, ""). + AddRow("quo`te", "varchar(12)", "NO", "UNI", nil, ""). 
+ AddRow("generated", "varchar(12)", "NO", "", nil, "VIRTUAL GENERATED")) selectedField, _, err = buildSelectField(conn, "test", "t", false) c.Assert(selectedField, Equals, "`id`,`name`,`quo``te`") @@ -359,64 +334,64 @@ func (s *testSQLSuite) TestShowCreateView(c *C) { } func (s *testSQLSuite) TestGetSuitableRows(c *C) { - db, mock, err := sqlmock.New() - c.Assert(err, IsNil) - defer db.Close() - conn, err := db.Conn(context.Background()) - c.Assert(err, IsNil) - tctx, cancel := tcontext.Background().WithCancel() - defer cancel() - const ( - query = "select AVG_ROW_LENGTH from INFORMATION_SCHEMA.TABLES where table_schema=\\? and table_name=\\?;" - database = "foo" - table = "bar" - ) - testCases := []struct { avgRowLength uint64 expectedRows uint64 - returnErr error }{ - { - 32, - 200000, - sql.ErrNoRows, - }, { 0, 200000, - nil, }, { 32, 1000000, - nil, }, { 1024, 131072, - nil, }, { 4096, 32768, - nil, }, } for _, testCase := range testCases { - if testCase.returnErr == nil { - mock.ExpectQuery(query).WithArgs(database, table). - WillReturnRows(sqlmock.NewRows([]string{"AVG_ROW_LENGTH"}). - AddRow(testCase.avgRowLength)) - } else { - mock.ExpectQuery(query).WithArgs(database, table). - WillReturnError(testCase.returnErr) - } - rows := GetSuitableRows(tctx, conn, database, table) + rows := GetSuitableRows(testCase.avgRowLength) c.Assert(rows, Equals, testCase.expectedRows) } } +func (s *testSQLSuite) TestSelectTiDBRowID(c *C) { + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + defer db.Close() + conn, err := db.Conn(context.Background()) + c.Assert(err, IsNil) + database, table := "test", "t" + + // _tidb_rowid is unavailable, or PKIsHandle. + mock.ExpectExec("SELECT _tidb_rowid from `test`.`t`"). + WillReturnError(errors.New(`1054, "Unknown column '_tidb_rowid' in 'field list'"`)) + hasImplicitRowID, err := SelectTiDBRowID(conn, database, table) + c.Assert(err, IsNil) + c.Assert(hasImplicitRowID, IsFalse) + + // _tidb_rowid is available. + mock.ExpectExec("SELECT _tidb_rowid from `test`.`t`"). + WillReturnResult(sqlmock.NewResult(0, 0)) + hasImplicitRowID, err = SelectTiDBRowID(conn, database, table) + c.Assert(err, IsNil) + c.Assert(hasImplicitRowID, IsTrue) + + // _tidb_rowid returns error + expectedErr := errors.New("mock error") + mock.ExpectExec("SELECT _tidb_rowid from `test`.`t`"). 
+ WillReturnError(expectedErr) + hasImplicitRowID, err = SelectTiDBRowID(conn, database, table) + c.Assert(errors.Cause(err), Equals, expectedErr) + c.Assert(hasImplicitRowID, IsFalse) +} + func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) @@ -437,11 +412,6 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { ServerVersion: tableSampleVersion, } - const ( - database = "foo" - table = "bar" - ) - testCases := []struct { handleColNames []string handleColTypes []string @@ -458,7 +428,7 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { }, { []string{"a"}, - []string{"bigint"}, + []string{"BIGINT"}, [][]driver.Value{{1}}, []string{"`a`<1", "`a`>=1"}, false, @@ -466,7 +436,7 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { // check whether dumpling can turn to dump whole table { []string{"a"}, - []string{"bigint"}, + []string{"BIGINT"}, [][]driver.Value{}, nil, false, @@ -474,21 +444,21 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { // check whether dumpling can turn to dump whole table { []string{"_tidb_rowid"}, - []string{"bigint"}, + []string{"BIGINT"}, [][]driver.Value{}, nil, true, }, { []string{"_tidb_rowid"}, - []string{"bigint"}, + []string{"BIGINT"}, [][]driver.Value{{1}}, []string{"`_tidb_rowid`<1", "`_tidb_rowid`>=1"}, true, }, { []string{"a"}, - []string{"bigint"}, + []string{"BIGINT"}, [][]driver.Value{ {1}, {2}, @@ -499,14 +469,14 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { }, { []string{"a", "b"}, - []string{"bigint", "bigint"}, + []string{"BIGINT", "BIGINT"}, [][]driver.Value{{1, 2}}, []string{"`a`<1 or(`a`=1 and `b`<2)", "`a`>1 or(`a`=1 and `b`>=2)"}, false, }, { []string{"a", "b"}, - []string{"bigint", "bigint"}, + []string{"BIGINT", "BIGINT"}, [][]driver.Value{ {1, 2}, {3, 4}, @@ -522,7 +492,7 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { }, { []string{"a", "b", "c"}, - []string{"bigint", "bigint", "bigint"}, + []string{"BIGINT", "BIGINT", "BIGINT"}, [][]driver.Value{ {1, 2, 3}, {4, 5, 6}, @@ -536,7 +506,7 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { }, { []string{"a", "b", "c"}, - []string{"bigint", "bigint", "bigint"}, + []string{"BIGINT", "BIGINT", "BIGINT"}, [][]driver.Value{ {1, 2, 3}, {1, 4, 5}, @@ -550,7 +520,7 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { }, { []string{"a", "b", "c"}, - []string{"bigint", "bigint", "bigint"}, + []string{"BIGINT", "BIGINT", "BIGINT"}, [][]driver.Value{ {1, 2, 3}, {1, 2, 8}, @@ -565,7 +535,7 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { // special case: avoid return same samples { []string{"a", "b", "c"}, - []string{"bigint", "bigint", "bigint"}, + []string{"BIGINT", "BIGINT", "BIGINT"}, [][]driver.Value{ {1, 2, 3}, {1, 2, 3}, @@ -580,7 +550,7 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { // special case: numbers has bigger lexicographically order but lower number { []string{"a", "b", "c"}, - []string{"bigint", "bigint", "bigint"}, + []string{"BIGINT", "BIGINT", "BIGINT"}, [][]driver.Value{ {12, 2, 3}, {111, 4, 5}, @@ -595,7 +565,7 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { // test string fields { []string{"a", "b", "c"}, - []string{"bigint", "bigint", "varchar"}, + []string{"BIGINT", "BIGINT", "varchar"}, [][]driver.Value{ {1, 2, "3"}, {1, 4, "5"}, @@ -609,7 +579,7 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { }, { []string{"a", "b", "c", "d"}, - []string{"bigint", "bigint", "bigint", "bigint"}, + 
[]string{"BIGINT", "BIGINT", "BIGINT", "BIGINT"}, [][]driver.Value{ {1, 2, 3, 4}, {5, 6, 7, 8}, @@ -663,30 +633,24 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { quotaCols = append(quotaCols, wrapBackTicks(col)) } selectFields := strings.Join(quotaCols, ",") - meta := &tableMeta{ - database: database, - table: table, - selectedField: selectFields, - specCmts: []string{ + meta := &mockTableIR{ + dbName: database, + tblName: table, + selectedField: selectFields, + hasImplicitRowID: testCase.hasTiDBRowID, + colTypes: handleColTypes, + colNames: handleColNames, + specCmt: []string{ "/*!40101 SET NAMES binary*/;", }, } - if testCase.hasTiDBRowID { - mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). - WillReturnResult(sqlmock.NewResult(0, 0)) - } else { - mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). - WillReturnError(&mysql.MyError{ - Code: mysql.ER_BAD_FIELD_ERROR, - State: "42S22", - Message: "Unknown column '_tidb_rowid' in 'field list'", - }) - rows := sqlmock.NewRows([]string{"COLUMN_NAME", "DATA_TYPE"}) - for i := range handleColNames { - rows.AddRow(handleColNames[i], handleColTypes[i]) + if !testCase.hasTiDBRowID { + rows := sqlmock.NewRows(showIndexHeaders) + for i, handleColName := range handleColNames { + rows.AddRow(table, 0, "PRIMARY", i, handleColName, "A", 0, nil, nil, "", "BTREE", "", "") } - mock.ExpectQuery("SELECT c.COLUMN_NAME, DATA_TYPE FROM").WithArgs(database, table).WillReturnRows(rows) + mock.ExpectQuery(fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)).WillReturnRows(rows) } rows := sqlmock.NewRows(handleColNames) @@ -694,17 +658,21 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { rows.AddRow(handleVal...) } mock.ExpectQuery(fmt.Sprintf("SELECT .* FROM `%s`.`%s` TABLESAMPLE REGIONS", database, table)).WillReturnRows(rows) - - rows = sqlmock.NewRows([]string{"COLUMN_NAME", "EXTRA"}) - for _, handleCol := range handleColNames { - rows.AddRow(handleCol, "") - } - mock.ExpectQuery("SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS").WithArgs(database, table). - WillReturnRows(rows) - // special case, no value found, will scan whole table and try build order clause + // special case, no enough value to split chunks if len(handleVals) == 0 { - mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). - WillReturnResult(sqlmock.NewResult(0, 0)) + if !testCase.hasTiDBRowID { + rows = sqlmock.NewRows(showIndexHeaders) + for i, handleColName := range handleColNames { + rows.AddRow(table, 0, "PRIMARY", i, handleColName, "A", 0, nil, nil, "", "BTREE", "", "") + } + mock.ExpectQuery(fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)).WillReturnRows(rows) + mock.ExpectQuery("SHOW INDEX FROM").WillReturnRows(sqlmock.NewRows(showIndexHeaders)) + } else { + d.conf.Rows = 200000 + mock.ExpectQuery("EXPLAIN SELECT `_tidb_rowid`"). + WillReturnRows(sqlmock.NewRows([]string{"id", "count", "task", "operator info"}). 
+ AddRow("IndexReader_5", "0.00", "root", "index:IndexScan_4")) + } } c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil) @@ -723,14 +691,13 @@ func (s *testSQLSuite) TestBuildTableSampleQueries(c *C) { // special case, no value found if len(handleVals) == 0 { - orderByClause = orderByTiDBRowID - query := buildSelectQuery(database, table, "*", "", "", orderByClause) + query := buildSelectQuery(database, table, selectFields, "", "", orderByClause) checkQuery(0, query) continue } for i, w := range testCase.expectedWhereClauses { - query := buildSelectQuery(database, table, "*", "", buildWhereCondition(d.conf, w), orderByClause) + query := buildSelectQuery(database, table, selectFields, "", buildWhereCondition(d.conf, w), orderByClause) checkQuery(i, query) } } @@ -814,6 +781,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) { ServerType: ServerTypeTiDB, ServerVersion: gcSafePointVersion, } + d.conf.Rows = 200000 database := "foo" table := "bar" @@ -829,7 +797,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) { {"7480000000000000FF3300000000000000F8", "7480000000000000FF3300000000000000F8"}, }, []string{"a"}, - []string{"bigint"}, + []string{"BIGINT"}, []string{ "", }, @@ -840,7 +808,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) { {"7480000000000000FF3300000000000000F8", "7480000000000000FF3300000000000000F8"}, }, []string{"_tidb_rowid"}, - []string{"bigint"}, + []string{"BIGINT"}, []string{ "", }, @@ -854,7 +822,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) { {"7480000000000000FF335F728000000000FF2BF2010000000000FA", "tableID=51, _tidb_rowid=2880001"}, }, []string{"a"}, - []string{"bigint"}, + []string{"BIGINT"}, []string{ "`a`<960001", "`a`>=960001 and `a`<1920001", @@ -873,7 +841,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) { {"7480000000000000FF335F728000000000FF2BF2010000000000FA", "tableID=51, _tidb_rowid=2880001"}, }, []string{"_tidb_rowid"}, - []string{"bigint"}, + []string{"BIGINT"}, []string{ "`_tidb_rowid`<960001", "`_tidb_rowid`>=960001 and `_tidb_rowid`<1920001", @@ -892,16 +860,15 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) { // Test build tasks through table region taskChan := make(chan Task, 128) - quotaCols := make([]string, 0, len(handleColNames)) - for _, col := range handleColNames { - quotaCols = append(quotaCols, wrapBackTicks(col)) - } - selectFields := strings.Join(quotaCols, ",") - meta := &tableMeta{ - database: database, - table: table, - selectedField: selectFields, - specCmts: []string{ + meta := &mockTableIR{ + dbName: database, + tblName: table, + selectedField: "*", + selectedLen: len(handleColNames), + hasImplicitRowID: testCase.hasTiDBRowID, + colTypes: handleColTypes, + colNames: handleColNames, + specCmt: []string{ "/*!40101 SET NAMES binary*/;", }, } @@ -909,21 +876,12 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) { mock.ExpectQuery("SELECT PARTITION_NAME from INFORMATION_SCHEMA.PARTITIONS"). WithArgs(database, table).WillReturnRows(sqlmock.NewRows([]string{"PARTITION_NAME"}).AddRow(nil)) - if testCase.hasTiDBRowID { - mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). - WillReturnResult(sqlmock.NewResult(0, 0)) - } else { - mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). 
- WillReturnError(&mysql.MyError{ - Code: mysql.ER_BAD_FIELD_ERROR, - State: "42S22", - Message: "Unknown column '_tidb_rowid' in 'field list'", - }) - rows := sqlmock.NewRows([]string{"COLUMN_NAME", "DATA_TYPE"}) - for i := range handleColNames { - rows.AddRow(handleColNames[i], handleColTypes[i]) + if !testCase.hasTiDBRowID { + rows := sqlmock.NewRows(showIndexHeaders) + for i, handleColName := range handleColNames { + rows.AddRow(table, 0, "PRIMARY", i, handleColName, "A", 0, nil, nil, "", "BTREE", "", "") } - mock.ExpectQuery("SELECT c.COLUMN_NAME, DATA_TYPE FROM").WithArgs(database, table).WillReturnRows(rows) + mock.ExpectQuery(fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)).WillReturnRows(rows) } rows := sqlmock.NewRows([]string{"START_KEY", "tidb_decode_key(START_KEY)"}) @@ -933,19 +891,15 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithoutPartition(c *C) { mock.ExpectQuery("SELECT START_KEY,tidb_decode_key\\(START_KEY\\) from INFORMATION_SCHEMA.TIKV_REGION_STATUS"). WithArgs(database, table).WillReturnRows(rows) - rows = sqlmock.NewRows([]string{"COLUMN_NAME", "EXTRA"}) - for _, handleCol := range handleColNames { - rows.AddRow(handleCol, "") - } - mock.ExpectQuery("SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS").WithArgs(database, table). - WillReturnRows(rows) - orderByClause := buildOrderByClauseString(handleColNames) // special case, no enough value to split chunks - if len(regionResults) <= 1 { - mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). - WillReturnResult(sqlmock.NewResult(0, 0)) - orderByClause = orderByTiDBRowID + if !testCase.hasTiDBRowID && len(regionResults) <= 1 { + rows = sqlmock.NewRows(showIndexHeaders) + for i, handleColName := range handleColNames { + rows.AddRow(table, 0, "PRIMARY", i, handleColName, "A", 0, nil, nil, "", "BTREE", "", "") + } + mock.ExpectQuery(fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)).WillReturnRows(rows) + mock.ExpectQuery("SHOW INDEX FROM").WillReturnRows(sqlmock.NewRows(showIndexHeaders)) } c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) @@ -982,8 +936,6 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) { ServerType: ServerTypeTiDB, ServerVersion: gcSafePointVersion, } - database := "foo" - table := "bar" partitions := []string{"p0", "p1", "p2"} testCases := []struct { @@ -1010,7 +962,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) { }, }, []string{"_tidb_rowid"}, - []string{"bigint"}, + []string{"BIGINT"}, [][]string{ {""}, {""}, {""}, }, @@ -1039,7 +991,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) { }, }, []string{"_tidb_rowid"}, - []string{"bigint"}, + []string{"BIGINT"}, [][]string{ { "`_tidb_rowid`<10001", @@ -1076,7 +1028,7 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) { }, }, []string{"a"}, - []string{"bigint"}, + []string{"BIGINT"}, [][]string{ { @@ -1105,16 +1057,15 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) { // Test build tasks through table region taskChan := make(chan Task, 128) - quotaCols := make([]string, 0, len(handleColNames)) - for _, col := range handleColNames { - quotaCols = append(quotaCols, wrapBackTicks(col)) - } - selectFields := strings.Join(quotaCols, ",") - meta := &tableMeta{ - database: database, - table: table, - selectedField: selectFields, - specCmts: []string{ + meta := &mockTableIR{ + dbName: database, + tblName: table, + selectedField: 
"*", + selectedLen: len(handleColNames), + hasImplicitRowID: testCase.hasTiDBRowID, + colTypes: handleColTypes, + colNames: handleColNames, + specCmt: []string{ "/*!40101 SET NAMES binary*/;", }, } @@ -1126,21 +1077,12 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) { mock.ExpectQuery("SELECT PARTITION_NAME from INFORMATION_SCHEMA.PARTITIONS"). WithArgs(database, table).WillReturnRows(rows) - if testCase.hasTiDBRowID { - mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). - WillReturnResult(sqlmock.NewResult(0, 0)) - } else { - mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). - WillReturnError(&mysql.MyError{ - Code: mysql.ER_BAD_FIELD_ERROR, - State: "42S22", - Message: "Unknown column '_tidb_rowid' in 'field list'", - }) - rows = sqlmock.NewRows([]string{"COLUMN_NAME", "DATA_TYPE"}) - for i := range handleColNames { - rows.AddRow(handleColNames[i], handleColTypes[i]) + if !testCase.hasTiDBRowID { + rows = sqlmock.NewRows(showIndexHeaders) + for i, handleColName := range handleColNames { + rows.AddRow(table, 0, "PRIMARY", i, handleColName, "A", 0, nil, nil, "", "BTREE", "", "") } - mock.ExpectQuery("SELECT c.COLUMN_NAME, DATA_TYPE FROM").WithArgs(database, table).WillReturnRows(rows) + mock.ExpectQuery(fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)).WillReturnRows(rows) } for i, partition := range partitions { @@ -1152,20 +1094,6 @@ func (s *testSQLSuite) TestBuildRegionQueriesWithPartitions(c *C) { WillReturnRows(rows) } - for range partitions { - rows = sqlmock.NewRows([]string{"COLUMN_NAME", "EXTRA"}) - for _, handleCol := range handleColNames { - rows.AddRow(handleCol, "") - } - mock.ExpectQuery("SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS").WithArgs(database, table). - WillReturnRows(rows) - // special case, dump whole table - if testCase.dumpWholeTable { - mock.ExpectExec(fmt.Sprintf("SELECT _tidb_rowid from `%s`.`%s` LIMIT 0", database, table)). 
- WillReturnResult(sqlmock.NewResult(0, 0)) - } - } - orderByClause := buildOrderByClauseString(handleColNames) c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil) c.Assert(mock.ExpectationsWereMet(), IsNil) @@ -1244,10 +1172,10 @@ func (s *testSQLSuite) TestBuildVersion3RegionQueries(c *C) { database := "test" conf.Tables = DatabaseTables{ database: []*TableInfo{ - {"t1", TableTypeBase}, - {"t2", TableTypeBase}, - {"t3", TableTypeBase}, - {"t4", TableTypeBase}, + {"t1", 0, TableTypeBase}, + {"t2", 0, TableTypeBase}, + {"t3", 0, TableTypeBase}, + {"t4", 0, TableTypeBase}, }, } d := &Dumper{ @@ -1380,7 +1308,7 @@ func (s *testSQLSuite) TestBuildVersion3RegionQueries(c *C) { { "t1", []string{"a"}, - []string{"int"}, + []string{"INT"}, []string{ "`a`<960001", "`a`>=960001 and `a`<1920001", @@ -1396,7 +1324,7 @@ func (s *testSQLSuite) TestBuildVersion3RegionQueries(c *C) { { "t2", []string{"a"}, - []string{"int"}, + []string{"INT"}, []string{ "`a`<960001", "`a`>=960001 and `a`<2960001", @@ -1409,7 +1337,7 @@ func (s *testSQLSuite) TestBuildVersion3RegionQueries(c *C) { { "t3", []string{"_tidb_rowid"}, - []string{"int"}, + []string{"BIGINT"}, []string{ "`_tidb_rowid`<81584", "`_tidb_rowid`>=81584 and `_tidb_rowid`<1041584", @@ -1426,7 +1354,7 @@ func (s *testSQLSuite) TestBuildVersion3RegionQueries(c *C) { { "t4", []string{"_tidb_rowid"}, - []string{"int"}, + []string{"BIGINT"}, []string{ "`_tidb_rowid`<180001", "`_tidb_rowid`>=180001 and `_tidb_rowid`<1140001", @@ -1450,43 +1378,25 @@ func (s *testSQLSuite) TestBuildVersion3RegionQueries(c *C) { // Test build tasks through table region taskChan := make(chan Task, 128) - quotaCols := make([]string, 0, len(handleColNames)) - for _, col := range handleColNames { - quotaCols = append(quotaCols, wrapBackTicks(col)) - } - selectFields := strings.Join(quotaCols, ",") - meta := &tableMeta{ - database: database, - table: table, - selectedField: selectFields, - specCmts: []string{ + meta := &mockTableIR{ + dbName: database, + tblName: table, + selectedField: "*", + hasImplicitRowID: testCase.hasTiDBRowID, + colNames: handleColNames, + colTypes: handleColTypes, + specCmt: []string{ "/*!40101 SET NAMES binary*/;", }, } - if testCase.hasTiDBRowID { - mock.ExpectExec("SELECT _tidb_rowid"). - WillReturnResult(sqlmock.NewResult(0, 0)) - } else { - mock.ExpectExec("SELECT _tidb_rowid"). - WillReturnError(&mysql.MyError{ - Code: mysql.ER_BAD_FIELD_ERROR, - State: "42S22", - Message: "Unknown column '_tidb_rowid' in 'field list'", - }) - rows := sqlmock.NewRows([]string{"COLUMN_NAME", "DATA_TYPE"}) - for i := range handleColNames { - rows.AddRow(handleColNames[i], handleColTypes[i]) + if !testCase.hasTiDBRowID { + rows := sqlmock.NewRows(showIndexHeaders) + for i, handleColName := range handleColNames { + rows.AddRow(table, 0, "PRIMARY", i, handleColName, "A", 0, nil, nil, "", "BTREE", "", "") } - mock.ExpectQuery("SELECT c.COLUMN_NAME, DATA_TYPE FROM").WithArgs(database, table).WillReturnRows(rows) - } - - rows := sqlmock.NewRows([]string{"COLUMN_NAME", "EXTRA"}) - for _, handleCol := range handleColNames { - rows.AddRow(handleCol, "") + mock.ExpectQuery(fmt.Sprintf("SHOW INDEX FROM `%s`.`%s`", database, table)).WillReturnRows(rows) } - mock.ExpectQuery("SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS").WithArgs(database, table). 
- WillReturnRows(rows) orderByClause := buildOrderByClauseString(handleColNames) c.Assert(d.concurrentDumpTable(tctx, conn, meta, taskChan), IsNil) diff --git a/v4/export/status.go b/v4/export/status.go index 909469c0..7c368783 100644 --- a/v4/export/status.go +++ b/v4/export/status.go @@ -3,7 +3,6 @@ package export import ( - "database/sql" "fmt" "time" @@ -31,8 +30,8 @@ func (d *Dumper) runLogProgress(tctx *tcontext.Context) { nanoseconds := float64(time.Since(lastCheckpoint).Nanoseconds()) completedTables := ReadCounter(finishedTablesCounter, conf.Labels) - finishedBytes := ReadCounter(finishedSizeCounter, conf.Labels) - finishedRows := ReadCounter(finishedRowsCounter, conf.Labels) + finishedBytes := ReadGauge(finishedSizeGauge, conf.Labels) + finishedRows := ReadGauge(finishedRowsGauge, conf.Labels) estimateTotalRows := ReadCounter(estimateTotalRowsCounter, conf.Labels) tctx.L().Info("progress", @@ -60,23 +59,3 @@ func calculateTableCount(m DatabaseTables) int { } return cnt } - -func (d *Dumper) getEstimateTotalRowsCount(tctx *tcontext.Context, conn *sql.Conn) error { - conf := d.conf - var totalCount uint64 - for db, tables := range conf.Tables { - for _, m := range tables { - if m.Type == TableTypeBase { - // get pk or uk for explain - field, err := pickupPossibleField(db, m.Name, conn, conf) - if err != nil { - return err - } - c := estimateCount(tctx, db, m.Name, conn, field, conf) - totalCount += c - } - } - } - AddCounter(estimateTotalRowsCounter, conf.Labels, float64(totalCount)) - return nil -} diff --git a/v4/export/test_util.go b/v4/export/test_util.go index f8827371..d5f7e579 100644 --- a/v4/export/test_util.go +++ b/v4/export/test_util.go @@ -58,17 +58,19 @@ func newMockMetaIR(targetName string, meta string, specialComments []string) Met } type mockTableIR struct { - dbName string - tblName string - chunIndex int - data [][]driver.Value - selectedField string - specCmt []string - colTypes []string - colNames []string - escapeBackSlash bool - rowErr error - rows *sql.Rows + dbName string + tblName string + chunIndex int + data [][]driver.Value + selectedField string + selectedLen int + specCmt []string + colTypes []string + colNames []string + escapeBackSlash bool + hasImplicitRowID bool + rowErr error + rows *sql.Rows SQLRowIter } @@ -84,7 +86,15 @@ func (m *mockTableIR) ShowCreateView() string { return "" } -func (m *mockTableIR) Start(tctx *tcontext.Context, conn *sql.Conn) error { +func (m *mockTableIR) AvgRowLength() uint64 { + return 0 +} + +func (m *mockTableIR) HasImplicitRowID() bool { + return m.hasImplicitRowID +} + +func (m *mockTableIR) Start(_ *tcontext.Context, conn *sql.Conn) error { return nil } @@ -116,6 +126,10 @@ func (m *mockTableIR) SelectedField() string { return m.selectedField } +func (m *mockTableIR) SelectedLen() int { + return m.selectedLen +} + func (m *mockTableIR) SpecialComments() StringIter { return newStringIter(m.specCmt...) 
 }
 
@@ -160,6 +174,7 @@ func newMockTableIR(databaseName, tableName string, data [][]driver.Value, speci
 		data:          data,
 		specCmt:       specialComments,
 		selectedField: "*",
+		selectedLen:   len(colTypes),
 		colTypes:      colTypes,
 		SQLRowIter:    nil,
 	}
diff --git a/v4/export/util.go b/v4/export/util.go
index 6df177c9..5f29119c 100644
--- a/v4/export/util.go
+++ b/v4/export/util.go
@@ -66,3 +66,11 @@ func sameStringArray(a, b []string) bool {
 	}
 	return true
 }
+
+func string2Map(a, b []string) map[string]string {
+	a2b := make(map[string]string, len(a))
+	for i, str := range a {
+		a2b[str] = b[i]
+	}
+	return a2b
+}
diff --git a/v4/export/writer_util.go b/v4/export/writer_util.go
index 7a9f7f69..18bcc6e0 100755
--- a/v4/export/writer_util.go
+++ b/v4/export/writer_util.go
@@ -76,7 +76,7 @@ func (b *writerPipe) Run(tctx *tcontext.Context) {
 			receiveChunkTime = time.Now()
 			err := writeBytes(tctx, b.w, s.Bytes())
 			ObserveHistogram(writeTimeHistogram, b.labels, time.Since(receiveChunkTime).Seconds())
-			AddCounter(finishedSizeCounter, b.labels, float64(s.Len()))
+			AddGauge(finishedSizeGauge, b.labels, float64(s.Len()))
 			b.finishedFileSize += uint64(s.Len())
 			s.Reset()
 			pool.Put(s)
@@ -175,11 +175,32 @@ func WriteInsert(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR Tabl
 		escapeBackslash = cfg.EscapeBackslash
 	)
 
+	defer func() {
+		if err != nil {
+			pCtx.L().Warn("fail to dump table(chunk), will revert some metrics now",
+				zap.Error(err),
+				zap.String("database", meta.DatabaseName()),
+				zap.String("table", meta.TableName()),
+				zap.Uint64("finished rows", lastCounter),
+				zap.Uint64("finished size", wp.finishedFileSize))
+			SubGauge(finishedRowsGauge, cfg.Labels, float64(lastCounter))
+			SubGauge(finishedSizeGauge, cfg.Labels, float64(wp.finishedFileSize))
+		} else {
+			pCtx.L().Debug("finish dumping table(chunk)",
+				zap.String("database", meta.DatabaseName()),
+				zap.String("table", meta.TableName()),
+				zap.Uint64("finished rows", counter),
+				zap.Uint64("finished size", wp.finishedFileSize))
+			summary.CollectSuccessUnit(summary.TotalBytes, 1, wp.finishedFileSize)
+			summary.CollectSuccessUnit("total rows", 1, counter)
+		}
+	}()
+
 	selectedField := meta.SelectedField()
 
 	// if has generated column
 	if selectedField != "" && selectedField != "*" {
-		insertStatementPrefix = fmt.Sprintf("INSERT INTO %s %s VALUES\n",
+		insertStatementPrefix = fmt.Sprintf("INSERT INTO %s (%s) VALUES\n",
 			wrapBackTicks(escapeString(meta.TableName())), selectedField)
 	} else {
 		insertStatementPrefix = fmt.Sprintf("INSERT INTO %s VALUES\n",
@@ -196,7 +217,6 @@ func WriteInsert(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR Tabl
 		lastBfSize := bf.Len()
 		if selectedField != "" {
 			if err = fileRowIter.Decode(row); err != nil {
-				pCtx.L().Error("fail to scan from sql.Row", zap.Error(err))
 				return counter, errors.Trace(err)
 			}
 			row.WriteToBuffer(bf, escapeBackslash)
@@ -227,7 +247,7 @@ func WriteInsert(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR Tabl
 			if bfCap := bf.Cap(); bfCap < lengthLimit {
 				bf.Grow(lengthLimit - bfCap)
 			}
-			AddCounter(finishedRowsCounter, cfg.Labels, float64(counter-lastCounter))
+			AddGauge(finishedRowsGauge, cfg.Labels, float64(counter-lastCounter))
 			lastCounter = counter
 		}
 	}
@@ -240,22 +260,13 @@ func WriteInsert(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR Tabl
 			break
 		}
 	}
-	pCtx.L().Debug("finish dumping table(chunk)",
-		zap.String("database", meta.DatabaseName()),
-		zap.String("table", meta.TableName()),
-		zap.Uint64("total rows", counter))
 	if bf.Len() > 0 {
 		wp.input <- bf
 	}
 	close(wp.input)
 	<-wp.closed
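(Editorial aside: the AddCounter-to-AddGauge/SubGauge switch in this hunk is what makes the new defer blocks workable. Prometheus counters are monotonic, while gauges can be decremented, so a failed chunk can revert the partial progress it already reported. Here is a minimal sketch of that pattern; it assumes only the prometheus/client_golang API, and the metric and function names are illustrative stand-ins, not dumpling's helpers.)

// Sketch of the revert-on-error metric pattern, assuming only the
// prometheus/client_golang API. Names here are hypothetical stand-ins for
// dumpling's finishedRowsGauge/AddGauge/SubGauge helpers.
package main

import "github.com/prometheus/client_golang/prometheus"

var finishedRows = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "finished_rows"},
	[]string{"task"},
)

func dumpChunk(labels prometheus.Labels) (err error) {
	var reported float64
	defer func() {
		if err != nil {
			// failure: take the partial progress back out, so a later
			// retry of this chunk does not double-count rows
			finishedRows.With(labels).Sub(reported)
		}
	}()
	// as rows are flushed, report progress incrementally:
	finishedRows.With(labels).Add(1000)
	reported += 1000
	// an error returned from here would trigger the deferred rollback
	return nil
}

func main() {
	_ = dumpChunk(prometheus.Labels{"task": "example"})
}
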
-	AddCounter(finishedRowsCounter, cfg.Labels, float64(counter-lastCounter))
-	defer func() {
-		if err == nil {
-			summary.CollectSuccessUnit(summary.TotalBytes, 1, wp.finishedFileSize)
-			summary.CollectSuccessUnit("total rows", 1, counter)
-		}
-	}()
+	AddGauge(finishedRowsGauge, cfg.Labels, float64(counter-lastCounter))
+	lastCounter = counter
 	if err = fileRowIter.Error(); err != nil {
 		return counter, errors.Trace(err)
 	}
@@ -302,6 +313,27 @@ func WriteInsertInCsv(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR
 		selectedFields  = meta.SelectedField()
 	)
 
+	defer func() {
+		if err != nil {
+			pCtx.L().Warn("fail to dump table(chunk), will revert some metrics now",
+				zap.Error(err),
+				zap.String("database", meta.DatabaseName()),
+				zap.String("table", meta.TableName()),
+				zap.Uint64("finished rows", lastCounter),
+				zap.Uint64("finished size", wp.finishedFileSize))
+			SubGauge(finishedRowsGauge, cfg.Labels, float64(lastCounter))
+			SubGauge(finishedSizeGauge, cfg.Labels, float64(wp.finishedFileSize))
+		} else {
+			pCtx.L().Debug("finish dumping table(chunk)",
+				zap.String("database", meta.DatabaseName()),
+				zap.String("table", meta.TableName()),
+				zap.Uint64("finished rows", counter),
+				zap.Uint64("finished size", wp.finishedFileSize))
+			summary.CollectSuccessUnit(summary.TotalBytes, 1, wp.finishedFileSize)
+			summary.CollectSuccessUnit("total rows", 1, counter)
+		}
+	}()
+
 	if !cfg.NoHeader && len(meta.ColumnNames()) != 0 && selectedFields != "" {
 		for i, col := range meta.ColumnNames() {
 			bf.Write(opt.delimiter)
@@ -319,7 +351,6 @@ func WriteInsertInCsv(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR
 		lastBfSize := bf.Len()
 		if selectedFields != "" {
 			if err = fileRowIter.Decode(row); err != nil {
-				pCtx.L().Error("fail to scan from sql.Row", zap.Error(err))
 				return counter, errors.Trace(err)
 			}
 			row.WriteToBufferInCsv(bf, escapeBackslash, opt)
@@ -339,7 +370,7 @@ func WriteInsertInCsv(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR
 			if bfCap := bf.Cap(); bfCap < lengthLimit {
 				bf.Grow(lengthLimit - bfCap)
 			}
-			AddCounter(finishedRowsCounter, cfg.Labels, float64(counter-lastCounter))
+			AddGauge(finishedRowsGauge, cfg.Labels, float64(counter-lastCounter))
 			lastCounter = counter
 		}
 	}
@@ -350,22 +381,13 @@ func WriteInsertInCsv(pCtx *tcontext.Context, cfg *Config, meta TableMeta, tblIR
 		}
 	}
 
-	pCtx.L().Debug("finish dumping table(chunk)",
-		zap.String("database", meta.DatabaseName()),
-		zap.String("table", meta.TableName()),
-		zap.Uint64("total rows", counter))
 	if bf.Len() > 0 {
 		wp.input <- bf
 	}
 	close(wp.input)
 	<-wp.closed
-	defer func() {
-		if err == nil {
-			summary.CollectSuccessUnit(summary.TotalBytes, 1, wp.finishedFileSize)
-			summary.CollectSuccessUnit("total rows", 1, counter)
-		}
-	}()
-	AddCounter(finishedRowsCounter, cfg.Labels, float64(counter-lastCounter))
+	AddGauge(finishedRowsGauge, cfg.Labels, float64(counter-lastCounter))
+	lastCounter = counter
 	if err = fileRowIter.Error(); err != nil {
 		return counter, errors.Trace(err)
 	}
diff --git a/v4/export/writer_util_test.go b/v4/export/writer_util_test.go
index 73cc5fae..63b3afef 100644
--- a/v4/export/writer_util_test.go
+++ b/v4/export/writer_util_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/pingcap/br/pkg/storage"
 	. 
"github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/prometheus/client_golang/prometheus" ) var appLogger log.Logger @@ -30,22 +31,34 @@ func TestT(t *testing.T) { t.Fail() } appLogger = logger + registry := prometheus.NewRegistry() + registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) + registry.MustRegister(prometheus.NewGoCollector()) + RegisterMetrics(registry) TestingT(t) } -var _ = Suite(&testUtilSuite{}) +var _ = SerialSuites(&testWriteSuite{}) -type testUtilSuite struct { +type testWriteSuite struct { mockCfg *Config } -func (s *testUtilSuite) SetUpSuite(_ *C) { +func (s *testWriteSuite) SetUpSuite(_ *C) { s.mockCfg = &Config{ FileSize: UnspecifiedSize, } + InitMetricsVector(s.mockCfg.Labels) } -func (s *testUtilSuite) TestWriteMeta(c *C) { +func (s *testWriteSuite) TearDownTest(c *C) { + RemoveLabelValuesWithTaskInMetrics(s.mockCfg.Labels) + + c.Assert(ReadGauge(finishedRowsGauge, s.mockCfg.Labels), Equals, float64(0)) + c.Assert(ReadGauge(finishedSizeGauge, s.mockCfg.Labels), Equals, float64(0)) +} + +func (s *testWriteSuite) TestWriteMeta(c *C) { createTableStmt := "CREATE TABLE `t1` (\n" + " `a` int(11) DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;\n" @@ -62,7 +75,7 @@ func (s *testUtilSuite) TestWriteMeta(c *C) { c.Assert(writer.String(), Equals, expected) } -func (s *testUtilSuite) TestWriteInsert(c *C) { +func (s *testWriteSuite) TestWriteInsert(c *C) { data := [][]driver.Value{ {"1", "male", "bob@mail.com", "020-1234", nil}, {"2", "female", "sarah@mail.com", "020-1253", "healthy"}, @@ -77,7 +90,7 @@ func (s *testUtilSuite) TestWriteInsert(c *C) { tableIR := newMockTableIR("test", "employee", data, specCmts, colTypes) bf := storage.NewBufferWriter() - conf := configForWriteSQL(UnspecifiedSize, UnspecifiedSize) + conf := configForWriteSQL(s.mockCfg, UnspecifiedSize, UnspecifiedSize) n, err := WriteInsert(tcontext.Background(), conf, tableIR, tableIR, bf) c.Assert(n, Equals, uint64(4)) c.Assert(err, IsNil) @@ -89,9 +102,11 @@ func (s *testUtilSuite) TestWriteInsert(c *C) { "(3,'male','john@mail.com','020-1256','healthy'),\n" + "(4,'female','sarah@mail.com','020-1235','healthy');\n" c.Assert(bf.String(), Equals, expected) + c.Assert(ReadGauge(finishedRowsGauge, conf.Labels), Equals, float64(len(data))) + c.Assert(ReadGauge(finishedSizeGauge, conf.Labels), Equals, float64(len(expected))) } -func (s *testUtilSuite) TestWriteInsertReturnsError(c *C) { +func (s *testWriteSuite) TestWriteInsertReturnsError(c *C) { data := [][]driver.Value{ {"1", "male", "bob@mail.com", "020-1234", nil}, {"2", "female", "sarah@mail.com", "020-1253", "healthy"}, @@ -109,7 +124,7 @@ func (s *testUtilSuite) TestWriteInsertReturnsError(c *C) { tableIR.rowErr = rowErr bf := storage.NewBufferWriter() - conf := configForWriteSQL(UnspecifiedSize, UnspecifiedSize) + conf := configForWriteSQL(s.mockCfg, UnspecifiedSize, UnspecifiedSize) n, err := WriteInsert(tcontext.Background(), conf, tableIR, tableIR, bf) c.Assert(n, Equals, uint64(3)) c.Assert(err, Equals, rowErr) @@ -120,9 +135,12 @@ func (s *testUtilSuite) TestWriteInsertReturnsError(c *C) { "(2,'female','sarah@mail.com','020-1253','healthy'),\n" + "(3,'male','john@mail.com','020-1256','healthy');\n" c.Assert(bf.String(), Equals, expected) + // error occurred, should revert pointer to zero + c.Assert(ReadGauge(finishedRowsGauge, conf.Labels), Equals, float64(0)) + c.Assert(ReadGauge(finishedSizeGauge, conf.Labels), Equals, float64(0)) } -func (s *testUtilSuite) 
TestWriteInsertInCsv(c *C) {
+func (s *testWriteSuite) TestWriteInsertInCsv(c *C) {
 	data := [][]driver.Value{
 		{"1", "male", "bob@mail.com", "020-1234", nil},
 		{"2", "female", "sarah@mail.com", "020-1253", "healthy"},
@@ -135,7 +153,7 @@ func (s *testUtilSuite) TestWriteInsertInCsv(c *C) {
 
 	// test nullValue
 	opt := &csvOption{separator: []byte(","), delimiter: doubleQuotationMark, nullValue: "\\N"}
-	conf := configForWriteCSV(true, opt)
+	conf := configForWriteCSV(s.mockCfg, true, opt)
 	n, err := WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf)
 	c.Assert(n, Equals, uint64(4))
 	c.Assert(err, IsNil)
@@ -144,12 +162,15 @@ func (s *testUtilSuite) TestWriteInsertInCsv(c *C) {
 		"3,\"male\",\"john@mail.com\",\"020-1256\",\"healthy\"\n" +
 		"4,\"female\",\"sarah@mail.com\",\"020-1235\",\"healthy\"\n"
 	c.Assert(bf.String(), Equals, expected)
+	c.Assert(ReadGauge(finishedRowsGauge, conf.Labels), Equals, float64(len(data)))
+	c.Assert(ReadGauge(finishedSizeGauge, conf.Labels), Equals, float64(len(expected)))
+	RemoveLabelValuesWithTaskInMetrics(conf.Labels)
 
 	// test delimiter
 	bf.Reset()
 	opt.delimiter = quotationMark
 	tableIR = newMockTableIR("test", "employee", data, nil, colTypes)
-	conf = configForWriteCSV(true, opt)
+	conf = configForWriteCSV(s.mockCfg, true, opt)
 	n, err = WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf)
 	c.Assert(n, Equals, uint64(4))
 	c.Assert(err, IsNil)
@@ -158,12 +179,15 @@ func (s *testUtilSuite) TestWriteInsertInCsv(c *C) {
 		"3,'male','john@mail.com','020-1256','healthy'\n" +
 		"4,'female','sarah@mail.com','020-1235','healthy'\n"
 	c.Assert(bf.String(), Equals, expected)
+	c.Assert(ReadGauge(finishedRowsGauge, conf.Labels), Equals, float64(len(data)))
+	c.Assert(ReadGauge(finishedSizeGauge, conf.Labels), Equals, float64(len(expected)))
+	RemoveLabelValuesWithTaskInMetrics(conf.Labels)
 
 	// test separator
 	bf.Reset()
 	opt.separator = []byte(";")
 	tableIR = newMockTableIR("test", "employee", data, nil, colTypes)
-	conf = configForWriteCSV(true, opt)
+	conf = configForWriteCSV(s.mockCfg, true, opt)
 	n, err = WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf)
 	c.Assert(n, Equals, uint64(4))
 	c.Assert(err, IsNil)
@@ -172,6 +196,9 @@ func (s *testUtilSuite) TestWriteInsertInCsv(c *C) {
 		"3;'male';'john@mail.com';'020-1256';'healthy'\n" +
 		"4;'female';'sarah@mail.com';'020-1235';'healthy'\n"
 	c.Assert(bf.String(), Equals, expected)
+	c.Assert(ReadGauge(finishedRowsGauge, conf.Labels), Equals, float64(len(data)))
+	c.Assert(ReadGauge(finishedSizeGauge, conf.Labels), Equals, float64(len(expected)))
+	RemoveLabelValuesWithTaskInMetrics(conf.Labels)
 
 	// test delimiter that included in values
 	bf.Reset()
@@ -179,7 +206,7 @@ func (s *testUtilSuite) TestWriteInsertInCsv(c *C) {
 	opt.delimiter = []byte("ma")
 	tableIR = newMockTableIR("test", "employee", data, nil, colTypes)
 	tableIR.colNames = []string{"id", "gender", "email", "phone_number", "status"}
-	conf = configForWriteCSV(false, opt)
+	conf = configForWriteCSV(s.mockCfg, false, opt)
 	n, err = WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf)
 	c.Assert(n, Equals, uint64(4))
 	c.Assert(err, IsNil)
@@ -189,9 +216,42 @@ func (s *testUtilSuite) TestWriteInsertInCsv(c *C) {
 		"3&;,?mamamalema&;,?majohn@mamail.comma&;,?ma020-1256ma&;,?mahealthyma\n" +
 		"4&;,?mafemamalema&;,?masarah@mamail.comma&;,?ma020-1235ma&;,?mahealthyma\n"
 	c.Assert(bf.String(), Equals, expected)
+	c.Assert(ReadGauge(finishedRowsGauge, conf.Labels), Equals, float64(len(data)))
+	c.Assert(ReadGauge(finishedSizeGauge, conf.Labels), Equals, float64(len(expected)))
+	RemoveLabelValuesWithTaskInMetrics(conf.Labels)
+}
+
+func (s *testWriteSuite) TestWriteInsertInCsvReturnsError(c *C) {
+	data := [][]driver.Value{
+		{"1", "male", "bob@mail.com", "020-1234", nil},
+		{"2", "female", "sarah@mail.com", "020-1253", "healthy"},
+		{"3", "male", "john@mail.com", "020-1256", "healthy"},
+		{"4", "female", "sarah@mail.com", "020-1235", "healthy"},
+	}
+	colTypes := []string{"INT", "SET", "VARCHAR", "VARCHAR", "TEXT"}
+
+	// row errors at last line
+	rowErr := errors.New("mock row error")
+	tableIR := newMockTableIR("test", "employee", data, nil, colTypes)
+	tableIR.rowErr = rowErr
+	bf := storage.NewBufferWriter()
+
+	// test nullValue
+	opt := &csvOption{separator: []byte(","), delimiter: doubleQuotationMark, nullValue: "\\N"}
+	conf := configForWriteCSV(s.mockCfg, true, opt)
+	n, err := WriteInsertInCsv(tcontext.Background(), conf, tableIR, tableIR, bf)
+	c.Assert(n, Equals, uint64(3))
+	c.Assert(err, Equals, rowErr)
+	expected := "1,\"male\",\"bob@mail.com\",\"020-1234\",\\N\n" +
+		"2,\"female\",\"sarah@mail.com\",\"020-1253\",\"healthy\"\n" +
+		"3,\"male\",\"john@mail.com\",\"020-1256\",\"healthy\"\n"
+	c.Assert(bf.String(), Equals, expected)
+	c.Assert(ReadGauge(finishedRowsGauge, conf.Labels), Equals, float64(0))
+	c.Assert(ReadGauge(finishedSizeGauge, conf.Labels), Equals, float64(0))
+	RemoveLabelValuesWithTaskInMetrics(conf.Labels)
 }
 
-func (s *testUtilSuite) TestSQLDataTypes(c *C) {
+func (s *testWriteSuite) TestSQLDataTypes(c *C) {
 	data := [][]driver.Value{
 		{"CHAR", "char1", `'char1'`},
 		{"INT", 12345, `12345`},
@@ -206,17 +266,20 @@ func (s *testUtilSuite) TestSQLDataTypes(c *C) {
 		tableIR := newMockTableIR("test", "t", tableData, nil, colType)
 		bf := storage.NewBufferWriter()
 
-		conf := configForWriteSQL(UnspecifiedSize, UnspecifiedSize)
+		conf := configForWriteSQL(s.mockCfg, UnspecifiedSize, UnspecifiedSize)
 		n, err := WriteInsert(tcontext.Background(), conf, tableIR, tableIR, bf)
 		c.Assert(n, Equals, uint64(1))
 		c.Assert(err, IsNil)
 		lines := strings.Split(bf.String(), "\n")
 		c.Assert(len(lines), Equals, 3)
 		c.Assert(lines[1], Equals, fmt.Sprintf("(%s);", result))
+		c.Assert(ReadGauge(finishedRowsGauge, conf.Labels), Equals, float64(1))
+		c.Assert(ReadGauge(finishedSizeGauge, conf.Labels), Equals, float64(len(bf.String())))
+		RemoveLabelValuesWithTaskInMetrics(conf.Labels)
 	}
 }
 
-func (s *testUtilSuite) TestWrite(c *C) {
+func (s *testWriteSuite) TestWrite(c *C) {
 	mocksw := &mockPoisonWriter{}
 	src := []string{"test", "loooooooooooooooooooong", "poison"}
 	exp := []string{"test", "loooooooooooooooooooong", "poison_error"}
@@ -234,16 +297,26 @@ func (s *testUtilSuite) TestWrite(c *C) {
 	c.Assert(err, IsNil)
 }
 
-func configForWriteSQL(fileSize, statementSize uint64) *Config {
-	return &Config{FileSize: fileSize, StatementSize: statementSize}
+// cloneConfigForTest clones a dumpling config.
+func cloneConfigForTest(conf *Config) *Config {
+	clone := &Config{}
+	*clone = *conf
+	return clone
 }
 
-func configForWriteCSV(noHeader bool, opt *csvOption) *Config {
-	return &Config{
-		NoHeader:     noHeader,
-		CsvNullValue: opt.nullValue,
-		CsvDelimiter: string(opt.delimiter),
-		CsvSeparator: string(opt.separator),
-		FileSize:     UnspecifiedSize,
-	}
+func configForWriteSQL(config *Config, fileSize, statementSize uint64) *Config {
+	cfg := cloneConfigForTest(config)
+	cfg.FileSize = fileSize
+	cfg.StatementSize = statementSize
+	return cfg
+}
+
+func configForWriteCSV(config *Config, noHeader bool, opt *csvOption) *Config {
+	cfg := cloneConfigForTest(config)
+	cfg.NoHeader = noHeader
+	cfg.CsvNullValue = opt.nullValue
+	cfg.CsvDelimiter = string(opt.delimiter)
+	cfg.CsvSeparator = string(opt.separator)
+	cfg.FileSize = UnspecifiedSize
+	return cfg
 }
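
A note on cloneConfigForTest above: `*clone = *conf` is a shallow copy, so value fields such as FileSize and CsvSeparator are duplicated, while any pointer, map, or slice fields of Config stay aliased to the original. The sketch below demonstrates that caveat with an assumed, much-reduced Config shape (the Labels map merely stands in for dumpling's prometheus labels; it is not the real struct):

package main

import "fmt"

// Illustrative stand-in for dumpling's Config; only these two fields are assumed here.
type Config struct {
	FileSize uint64            // value field: safely duplicated by a shallow copy
	Labels   map[string]string // reference field: still shared after a shallow copy
}

func cloneConfig(conf *Config) *Config {
	clone := &Config{}
	*clone = *conf // shallow copy, same pattern as cloneConfigForTest in the patch
	return clone
}

func main() {
	base := &Config{FileSize: 64, Labels: map[string]string{"task": "t1"}}
	c := cloneConfig(base)

	c.FileSize = 128        // does not affect base.FileSize
	c.Labels["task"] = "t2" // mutates base.Labels too: the map is shared

	fmt.Println(base.FileSize) // 64
	fmt.Println(base.Labels)   // map[task:t2]
}

In the tests above this is benign, since configForWriteSQL and configForWriteCSV only overwrite value fields on the clone; anything reference-typed in the suite's mock config would still be shared between tests.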
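
The ReadGauge / RemoveLabelValuesWithTaskInMetrics pairing in the rewritten tests follows the standard prometheus client lifecycle: a GaugeVec keeps one series per label set, so each test must delete its label values afterwards or the next test would observe leftover counts. A minimal, self-contained sketch of that lifecycle against the prometheus client directly (the metric name and task label here are illustrative, not dumpling's registration code):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// A gauge vector partitioned by task, in the spirit of finishedRowsGauge.
	rows := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "finished_rows", Help: "rows written so far"},
		[]string{"task"},
	)
	labels := prometheus.Labels{"task": "dump-1"}

	rows.With(labels).Add(4) // what the writer does as rows complete

	// Read the current value back, as a ReadGauge-style helper would.
	var m dto.Metric
	if err := rows.With(labels).Write(&m); err == nil {
		fmt.Println(m.GetGauge().GetValue()) // 4
	}

	// Drop this task's series so a later test starts from a clean slate,
	// mirroring RemoveLabelValuesWithTaskInMetrics(conf.Labels).
	rows.Delete(labels)
}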