Skip to content

Commit

Permalink
dumpling: Add support for Create Placement (#29724)
Browse files Browse the repository at this point in the history
  • Loading branch information
sylzd committed Nov 29, 2021
1 parent 352811d commit 4b826a0
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 3 deletions.
29 changes: 29 additions & 0 deletions dumpling/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"database/sql"
"encoding/hex"
"fmt"
"github.com/go-sql-driver/mysql"
"math/big"
"sort"
"strconv"
Expand Down Expand Up @@ -298,6 +299,34 @@ func (d *Dumper) startWriters(tctx *tcontext.Context, wg *errgroup.Group, taskCh
func (d *Dumper) dumpDatabases(tctx *tcontext.Context, metaConn *sql.Conn, taskChan chan<- Task) error {
conf := d.conf
allTables := conf.Tables

// policy should be created before database
// placement policy in other server type can be different, so we only handle the tidb server
if conf.ServerInfo.ServerType == version.ServerTypeTiDB {
policyNames, err := ListAllPlacementPolicyNames(metaConn)
if err != nil {
errCause := errors.Cause(err)
if mysqlErr, ok := errCause.(*mysql.MySQLError); ok && mysqlErr.Number == ErrNoSuchTable {
// some old tidb version and other server type doesn't support placement rules, we can skip it.
tctx.L().Debug("cannot dump placement policy, maybe the server doesn't support it", log.ShortError(err))
} else {
tctx.L().Warn("fail to dump placement policy: ", log.ShortError(err))
}
}
for _, policy := range policyNames {
createPolicySQL, err := ShowCreatePlacementPolicy(metaConn, policy)
if err != nil {
return err
}
wrappedCreatePolicySQL := fmt.Sprintf("/*T![placement] %s */", createPolicySQL)
task := NewTaskPolicyMeta(policy, wrappedCreatePolicySQL)
ctxDone := d.sendTaskToChan(tctx, task, taskChan)
if ctxDone {
return tctx.Err()
}
}
}

for dbName, tables := range allTables {
if !conf.NoSchemas {
createDatabaseSQL, err := ShowCreateDatabase(metaConn, dbName)
Expand Down
1 change: 1 addition & 0 deletions dumpling/export/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func TestDumpBlock(t *testing.T) {
taskChan := make(chan Task, 1)
taskChan <- &TaskDatabaseMeta{}
d.conf.Tables = DatabaseTables{}.AppendTable(database, nil)
d.conf.ServerInfo.ServerType = version.ServerTypeMySQL
require.ErrorIs(t, d.dumpDatabases(writerCtx, conn, taskChan), context.Canceled)
require.ErrorIs(t, wg.Wait(), writerErr)
}
Expand Down
34 changes: 34 additions & 0 deletions dumpling/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,21 @@ func ShowCreateTable(db *sql.Conn, database, table string) (string, error) {
return oneRow[1], nil
}

// ShowCreatePlacementPolicy constructs the create policy SQL for a specified table
// returns (createPoilicySQL, error)
func ShowCreatePlacementPolicy(db *sql.Conn, policy string) (string, error) {
var oneRow [2]string
handleOneRow := func(rows *sql.Rows) error {
return rows.Scan(&oneRow[0], &oneRow[1])
}
query := fmt.Sprintf("SHOW CREATE PLACEMENT POLICY `%s`", escapeString(policy))
err := simpleQuery(db, query, handleOneRow)
if err != nil {
return "", errors.Annotatef(err, "sql: %s", query)
}
return oneRow[1], nil
}

// ShowCreateView constructs the create view SQL for a specified view
// returns (createFakeTableSQL, createViewSQL, error)
func ShowCreateView(db *sql.Conn, database, view string) (createFakeTableSQL string, createRealViewSQL string, err error) {
Expand Down Expand Up @@ -279,6 +294,25 @@ func ListAllDatabasesTables(tctx *tcontext.Context, db *sql.Conn, databaseNames
return dbTables, nil
}

func ListAllPlacementPolicyNames(db *sql.Conn) ([]string, error) {
var policyList []string
var policy string
const query = "select distinct policy_name from information_schema.placement_rules where policy_name is not null;"
rows, err := db.QueryContext(context.Background(), query)
if err != nil {
return policyList, errors.Annotatef(err, "sql: %s", query)
}
defer rows.Close()
for rows.Next() {
err := rows.Scan(&policy)
if err != nil {
return policyList, errors.Trace(err)
}
policyList = append(policyList, policy)
}
return policyList, errors.Annotatef(rows.Err(), "sql: %s", query)
}

// SelectVersion gets the version information from the database server
func SelectVersion(db *sql.DB) (string, error) {
var versionInfo string
Expand Down
54 changes: 54 additions & 0 deletions dumpling/export/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/csv"
"encoding/json"
"fmt"
"github.com/go-sql-driver/mysql"
"io"
"os"
"path"
Expand Down Expand Up @@ -340,6 +341,59 @@ func TestShowCreateView(t *testing.T) {
require.NoError(t, mock.ExpectationsWereMet())
}

func TestShowCreatePolicy(t *testing.T) {
t.Parallel()

db, mock, err := sqlmock.New()
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()

conn, err := db.Conn(context.Background())
require.NoError(t, err)

mock.ExpectQuery("SHOW CREATE PLACEMENT POLICY `policy_x`").
WillReturnRows(sqlmock.NewRows([]string{"Policy", "Create Policy"}).
AddRow("policy_x", "CREATE PLACEMENT POLICY `policy_x` LEARNERS=1"))

createPolicySQL, err := ShowCreatePlacementPolicy(conn, "policy_x")
require.NoError(t, err)
require.Equal(t, "CREATE PLACEMENT POLICY `policy_x` LEARNERS=1", createPolicySQL)
require.NoError(t, mock.ExpectationsWereMet())

}

func TestListPolicyNames(t *testing.T) {
t.Parallel()

db, mock, err := sqlmock.New()
require.NoError(t, err)
defer func() {
require.NoError(t, db.Close())
}()

conn, err := db.Conn(context.Background())
require.NoError(t, err)

mock.ExpectQuery("select distinct policy_name from information_schema.placement_rules where policy_name is not null;").
WillReturnRows(sqlmock.NewRows([]string{"policy_name"}).
AddRow("policy_x"))
policies, err := ListAllPlacementPolicyNames(conn)
require.NoError(t, err)
require.Equal(t, []string{"policy_x"}, policies)
require.NoError(t, mock.ExpectationsWereMet())

// some old tidb version doesn't support placement rules returns error
expectedErr := &mysql.MySQLError{Number: ErrNoSuchTable, Message: "Table 'information_schema.placement_rules' doesn't exist"}
mock.ExpectExec("select distinct policy_name from information_schema.placement_rules where policy_name is not null;").
WillReturnError(expectedErr)
policies, err = ListAllPlacementPolicyNames(conn)
if mysqlErr, ok := err.(*mysql.MySQLError); ok {
require.Equal(t, mysqlErr.Number, ErrNoSuchTable)
}
}

func TestGetSuitableRows(t *testing.T) {
t.Parallel()

Expand Down
22 changes: 21 additions & 1 deletion dumpling/export/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ package export

import "fmt"

// Task is a file dump task for dumpling, it could either be dumping database/table/view metadata, table data
// Task is a file dump task for dumpling, it could either be dumping database/table/view/policy metadata, table data
type Task interface {
// Brief is the brief for a dumping task
Brief() string
Expand Down Expand Up @@ -34,6 +34,13 @@ type TaskViewMeta struct {
CreateViewSQL string
}

// TaskPolicyMeta is a dumping view metadata task
type TaskPolicyMeta struct {
Task
PolicyName string
CreatePolicySQL string
}

// TaskTableData is a dumping table data task
type TaskTableData struct {
Task
Expand Down Expand Up @@ -70,6 +77,14 @@ func NewTaskViewMeta(dbName, tblName, createTableSQL, createViewSQL string) *Tas
}
}

// NewTaskPolicyMeta returns a new dumping placement policy metadata task
func NewTaskPolicyMeta(policyName, createPolicySQL string) *TaskPolicyMeta {
return &TaskPolicyMeta{
PolicyName: policyName,
CreatePolicySQL: createPolicySQL,
}
}

// NewTaskTableData returns a new dumping table data task
func NewTaskTableData(meta TableMeta, data TableDataIR, currentChunk, totalChunks int) *TaskTableData {
return &TaskTableData{
Expand All @@ -95,6 +110,11 @@ func (t *TaskViewMeta) Brief() string {
return fmt.Sprintf("meta of view '%s'.'%s'", t.DatabaseName, t.ViewName)
}

// Brief implements task.Brief
func (t *TaskPolicyMeta) Brief() string {
return fmt.Sprintf("meta of placement policy '%s'", t.PolicyName)
}

// Brief implements task.Brief
func (t *TaskTableData) Brief() string {
db, tbl := t.Meta.DatabaseName(), t.Meta.TableName()
Expand Down
2 changes: 2 additions & 0 deletions dumpling/export/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ func (w *Writer) handleTask(task Task) error {
return w.WriteTableMeta(t.DatabaseName, t.TableName, t.CreateTableSQL)
case *TaskViewMeta:
return w.WriteViewMeta(t.DatabaseName, t.ViewName, t.CreateTableSQL, t.CreateViewSQL)
case *TaskPolicyMeta:
return w.WritePolicyMeta(t.PolicyName, t.CreatePolicySQL)
case *TaskTableData:
err := w.WriteTableData(t.Meta, t.Data, t.ChunkIndex)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion dumpling/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ cd $pwd/tidb-lightning && make
cd $pwd
mv tidb-lightning/bin/tidb-lightning bin/

TIDB_TAG="v4.0.4"
TIDB_TAG="master"
# download tidb-server
git clone -b $TIDB_TAG https://github.com/pingcap/tidb
cd $pwd/tidb && make
Expand Down
2 changes: 1 addition & 1 deletion dumpling/tests/basic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ echo "expected 2, actual ${actual}"

# Test for tidb_mem_quota_query configuration
export GO_FAILPOINTS="github.com/pingcap/tidb/dumpling/export/PrintTiDBMemQuotaQuery=1*return"
run_dumpling > ${DUMPLING_OUTPUT_DIR}/dumpling.log
run_dumpling | tee ${DUMPLING_OUTPUT_DIR}/dumpling.log
actual=$(grep -w "tidb_mem_quota_query == 1073741824" ${DUMPLING_OUTPUT_DIR}/dumpling.log|wc -l)
echo "expected 1, actual ${actual}"
[ "$actual" = 1 ]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/*!40101 SET NAMES binary*/;
/*T![placement] SET PLACEMENT_CHECKS = 0*/;
/*T![placement] CREATE PLACEMENT POLICY `x` PRIMARY_REGION="cn-east-1" REGIONS="cn-east-1,cn-east" */;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/*!40101 SET NAMES binary*/;
/*T![placement] SET PLACEMENT_CHECKS = 0*/;
/*T![placement] CREATE PLACEMENT POLICY `x1` FOLLOWERS=4 */;
30 changes: 30 additions & 0 deletions dumpling/tests/placement_policy/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/sh
#
# Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.

set -eu

export DUMPLING_TEST_PORT=4000

run_sql "drop database if exists policy"
run_sql "drop placement policy if exists x"
run_sql "drop placement policy if exists x1"
run_sql "create database policy"

export DUMPLING_TEST_DATABASE="policy"

run_sql 'CREATE PLACEMENT POLICY x PRIMARY_REGION="cn-east-1" REGIONS="cn-east-1,cn-east";'
run_sql 'CREATE PLACEMENT POLICY x1 FOLLOWERS=4;'

run_dumpling

file_should_exist "$DUMPLING_OUTPUT_DIR/policy-schema-create.sql"
file_should_exist "$DUMPLING_OUTPUT_DIR/x-placement-policy-create.sql"
file_should_exist "$DUMPLING_OUTPUT_DIR/x1-placement-policy-create.sql"

diff "$DUMPLING_BASE_NAME/result/x-placement-policy-create.sql" "$DUMPLING_OUTPUT_DIR/x-placement-policy-create.sql"
diff "$DUMPLING_BASE_NAME/result/x1-placement-policy-create.sql" "$DUMPLING_OUTPUT_DIR/x1-placement-policy-create.sql"

run_sql "drop database if exists policy"
run_sql "drop placement policy if exists x"
run_sql "drop placement policy if exists x1"

0 comments on commit 4b826a0

Please sign in to comment.