forked from pingcap/tidb
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
consistency: move code to a new file and add unit tests (pingcap#24)
* consistency: move code to a new file and add unit tests * consistency: resolve auto consistency for different db server * config: set default consistency option to 'auto'
- Loading branch information
Showing
8 changed files
with
295 additions
and
100 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
package export | ||
|
||
import ( | ||
"database/sql" | ||
"errors" | ||
"fmt" | ||
) | ||
|
||
func NewConsistencyController(conf *Config, session *sql.DB) (ConsistencyController, error) { | ||
resolveAutoConsistency(conf) | ||
switch conf.Consistency { | ||
case "flush": | ||
return &ConsistencyFlushTableWithReadLock{ | ||
serverType: conf.ServerInfo.ServerType, | ||
db: session, | ||
}, nil | ||
case "lock": | ||
return &ConsistencyLockDumpingTables{ | ||
db: session, | ||
allTables: conf.Tables, | ||
}, nil | ||
case "snapshot": | ||
return &ConsistencySnapshot{ | ||
serverType: conf.ServerInfo.ServerType, | ||
snapshot: conf.Snapshot, | ||
db: session, | ||
}, nil | ||
case "none": | ||
return &ConsistencyNone{}, nil | ||
default: | ||
return nil, withStack(fmt.Errorf("invalid consistency option %s", conf.Consistency)) | ||
} | ||
} | ||
|
||
type ConsistencyController interface { | ||
Setup() error | ||
TearDown() error | ||
} | ||
|
||
type ConsistencyNone struct{} | ||
|
||
func (c *ConsistencyNone) Setup() error { | ||
return nil | ||
} | ||
|
||
func (c *ConsistencyNone) TearDown() error { | ||
return nil | ||
} | ||
|
||
type ConsistencyFlushTableWithReadLock struct { | ||
serverType ServerType | ||
db *sql.DB | ||
} | ||
|
||
func (c *ConsistencyFlushTableWithReadLock) Setup() error { | ||
if c.serverType == ServerTypeTiDB { | ||
return withStack(errors.New("'flush table with read lock' cannot be used to ensure the consistency in TiDB")) | ||
} | ||
return FlushTableWithReadLock(c.db) | ||
} | ||
|
||
func (c *ConsistencyFlushTableWithReadLock) TearDown() error { | ||
err := c.db.Ping() | ||
if err != nil { | ||
return withStack(errors.New("ConsistencyFlushTableWithReadLock lost database connection")) | ||
} | ||
return UnlockTables(c.db) | ||
} | ||
|
||
type ConsistencyLockDumpingTables struct { | ||
db *sql.DB | ||
allTables DatabaseTables | ||
} | ||
|
||
func (c *ConsistencyLockDumpingTables) Setup() error { | ||
for dbName, tables := range c.allTables { | ||
for _, table := range tables { | ||
err := LockTables(c.db, dbName, table) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (c *ConsistencyLockDumpingTables) TearDown() error { | ||
err := c.db.Ping() | ||
if err != nil { | ||
return withStack(errors.New("ConsistencyLockDumpingTables lost database connection")) | ||
} | ||
return UnlockTables(c.db) | ||
} | ||
|
||
type ConsistencySnapshot struct { | ||
serverType ServerType | ||
snapshot string | ||
db *sql.DB | ||
} | ||
|
||
const showMasterStatusFieldNum = 5 | ||
const snapshotFieldIndex = 1 | ||
|
||
func (c *ConsistencySnapshot) Setup() error { | ||
if c.serverType != ServerTypeTiDB { | ||
return withStack(errors.New("snapshot consistency is not supported for this server")) | ||
} | ||
if c.snapshot == "" { | ||
str, err := ShowMasterStatus(c.db, showMasterStatusFieldNum) | ||
if err != nil { | ||
return err | ||
} | ||
c.snapshot = str[snapshotFieldIndex] | ||
} | ||
return SetTiDBSnapshot(c.db, c.snapshot) | ||
} | ||
|
||
func (c *ConsistencySnapshot) TearDown() error { | ||
return nil | ||
} | ||
|
||
func resolveAutoConsistency(conf *Config) { | ||
if conf.Consistency != "auto" { | ||
return | ||
} | ||
switch conf.ServerInfo.ServerType { | ||
case ServerTypeTiDB: | ||
conf.Consistency = "snapshot" | ||
case ServerTypeMySQL, ServerTypeMariaDB: | ||
conf.Consistency = "flush" | ||
default: | ||
conf.Consistency = "none" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
package export | ||
|
||
import ( | ||
"errors" | ||
"strings" | ||
|
||
"github.com/DATA-DOG/go-sqlmock" | ||
. "github.com/pingcap/check" | ||
) | ||
|
||
var _ = Suite(&testConsistencySuite{}) | ||
|
||
type testConsistencySuite struct{} | ||
|
||
func (s *testConsistencySuite) assertNil(err error, c *C) { | ||
if err != nil { | ||
c.Fatalf(err.Error()) | ||
} | ||
} | ||
|
||
func (s *testConsistencySuite) assertLifetimeErrNil(ctrl ConsistencyController, c *C) { | ||
s.assertNil(ctrl.Setup(), c) | ||
s.assertNil(ctrl.TearDown(), c) | ||
} | ||
|
||
func (s *testConsistencySuite) TestConsistencyController(c *C) { | ||
db, mock, err := sqlmock.New() | ||
c.Assert(err, IsNil) | ||
defer db.Close() | ||
conf := DefaultConfig() | ||
resultOk := sqlmock.NewResult(0, 1) | ||
|
||
conf.Consistency = "none" | ||
ctrl, _ := NewConsistencyController(conf, db) | ||
_, ok := ctrl.(*ConsistencyNone) | ||
c.Assert(ok, IsTrue) | ||
s.assertLifetimeErrNil(ctrl, c) | ||
|
||
conf.Consistency = "flush" | ||
mock.ExpectExec("FLUSH TABLES WITH READ LOCK").WillReturnResult(resultOk) | ||
mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk) | ||
ctrl, _ = NewConsistencyController(conf, db) | ||
_, ok = ctrl.(*ConsistencyFlushTableWithReadLock) | ||
c.Assert(ok, IsTrue) | ||
s.assertLifetimeErrNil(ctrl, c) | ||
if err = mock.ExpectationsWereMet(); err != nil { | ||
c.Fatalf(err.Error()) | ||
} | ||
|
||
conf.Consistency = "snapshot" | ||
conf.ServerInfo.ServerType = ServerTypeTiDB | ||
conf.Snapshot = "" // let dumpling detect the TSO | ||
rows := sqlmock.NewRows([]string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB", "Executed_Gtid_Set"}) | ||
rows.AddRow("tidb-binlog", "413802961528946688", "", "", "") | ||
mock.ExpectQuery("SHOW MASTER STATUS").WillReturnRows(rows) | ||
mock.ExpectExec("SET SESSION tidb_snapshot"). | ||
WillReturnResult(sqlmock.NewResult(0, 1)) | ||
ctrl, _ = NewConsistencyController(conf, db) | ||
_, ok = ctrl.(*ConsistencySnapshot) | ||
c.Assert(ok, IsTrue) | ||
s.assertLifetimeErrNil(ctrl, c) | ||
if err = mock.ExpectationsWereMet(); err != nil { | ||
c.Fatalf(err.Error()) | ||
} | ||
|
||
conf.Consistency = "lock" | ||
conf.Tables = map[databaseName][]tableName{ | ||
"db1": {"t1", "t2", "t3"}, | ||
"db2": {"t4"}, | ||
} | ||
for i := 0; i < 4; i += 1 { | ||
mock.ExpectExec("LOCK TABLES").WillReturnResult(resultOk) | ||
} | ||
mock.ExpectExec("UNLOCK TABLES").WillReturnResult(resultOk) | ||
ctrl, _ = NewConsistencyController(conf, db) | ||
_, ok = ctrl.(*ConsistencyLockDumpingTables) | ||
c.Assert(ok, IsTrue) | ||
s.assertLifetimeErrNil(ctrl, c) | ||
if err = mock.ExpectationsWereMet(); err != nil { | ||
c.Fatalf(err.Error()) | ||
} | ||
} | ||
|
||
func (s *testConsistencySuite) TestResolveAutoConsistency(c *C) { | ||
conf := DefaultConfig() | ||
cases := []struct { | ||
serverTp ServerType | ||
resolvedConsistency string | ||
}{ | ||
{ServerTypeTiDB, "snapshot"}, | ||
{ServerTypeMySQL, "flush"}, | ||
{ServerTypeMariaDB, "flush"}, | ||
{ServerTypeUnknown, "none"}, | ||
} | ||
|
||
for _, x := range cases { | ||
conf.Consistency = "auto" | ||
conf.ServerInfo.ServerType = x.serverTp | ||
resolveAutoConsistency(conf) | ||
cmt := Commentf("server type %s", x.serverTp.String()) | ||
c.Assert(conf.Consistency, Equals, x.resolvedConsistency, cmt) | ||
} | ||
} | ||
|
||
func (s *testConsistencySuite) TestConsistencyControllerError(c *C) { | ||
db, mock, err := sqlmock.New() | ||
c.Assert(err, IsNil) | ||
defer db.Close() | ||
conf := DefaultConfig() | ||
|
||
conf.Consistency = "invalid_str" | ||
_, err = NewConsistencyController(conf, db) | ||
c.Assert(err, NotNil) | ||
c.Assert(strings.Contains(err.Error(), "invalid consistency option"), IsTrue) | ||
|
||
// snapshot consistency is only available in TiDB | ||
conf.Consistency = "snapshot" | ||
conf.ServerInfo.ServerType = ServerTypeUnknown | ||
ctrl, _ := NewConsistencyController(conf, db) | ||
err = ctrl.Setup() | ||
c.Assert(err, NotNil) | ||
|
||
// flush consistency is unavailable in TiDB | ||
conf.Consistency = "flush" | ||
conf.ServerInfo.ServerType = ServerTypeTiDB | ||
ctrl, _ = NewConsistencyController(conf, db) | ||
err = ctrl.Setup() | ||
c.Assert(err, NotNil) | ||
|
||
// lock table fail | ||
conf.Consistency = "lock" | ||
conf.Tables = map[databaseName][]tableName{"db": {"t"}} | ||
mock.ExpectExec("LOCK TABLE").WillReturnError(errors.New("")) | ||
ctrl, _ = NewConsistencyController(conf, db) | ||
err = ctrl.Setup() | ||
c.Assert(err, NotNil) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.