Skip to content

Commit

Permalink
src: add and implement shiftmanger and refactor code to adapt to it #583
Browse files Browse the repository at this point in the history


[summary]
add shiftmanager module and implement shiftMgr interface
[test case]
src/plugins/shiftmanager/shiftmanager_test
src/vendor/github.com/radondb/shift/shift/shift_test.go
src/vendor/github.com/radondb/shift/shift/pool_test.go

[patch codecov]
src/plugins/plugin.go 87.5%
src/plugins/shiftmanager/mockshift.go 94.1%
src/plugins/shiftmanager/shiftmanager.go 97.1%
src/vendor/github.com/radondb/shift/shift/shift.go  87.6%
  • Loading branch information
hustjieke authored and BohuTANG committed Mar 16, 2020
1 parent e4cd7a7 commit a23bb53
Show file tree
Hide file tree
Showing 14 changed files with 1,309 additions and 54 deletions.
3 changes: 2 additions & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ testplugins:
go test -v plugins
go test -v plugins/autoincrement
go test -v plugins/privilege
go test -v plugins/shiftmanager
testmysqlstack:
cd src/vendor/github.com/xelabs/go-mysqlstack&&make test

Expand All @@ -92,7 +93,7 @@ allpkgs = xbase\
audit\
syncer\
monitor\
plugins
plugins/...
coverage:
go build -v -o bin/gotestcover \
src/vendor/github.com/pierrre/gotestcover/*.go;
Expand Down
15 changes: 15 additions & 0 deletions src/plugins/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"plugins/autoincrement"
"plugins/privilege"
"plugins/shiftmanager"

"github.com/xelabs/go-mysqlstack/xlog"
)
Expand All @@ -27,6 +28,7 @@ type Plugin struct {
scatter *backend.Scatter
autoincrement autoincrement.AutoIncrementHandler
privilege privilege.PrivilegeHandler
shiftMgr shiftmanager.ShiftMgrHandler
}

// NewPlugin -- creates new Plugin.
Expand Down Expand Up @@ -60,13 +62,21 @@ func (plugin *Plugin) Init() error {
}
plugin.privilege = privilegePlug

// Register shiftmanager plug
shiftMgr := shiftmanager.NewShiftManager(log)
if err := shiftMgr.Init(); err != nil {
return err
}
plugin.shiftMgr = shiftMgr

return nil
}

// Close -- do nothing.
func (plugin *Plugin) Close() {
plugin.autoincrement.Close()
plugin.privilege.Close()
plugin.shiftMgr.Close()
}

// PlugAutoIncrement -- return AutoIncrement plug.
Expand All @@ -78,3 +88,8 @@ func (plugin *Plugin) PlugAutoIncrement() autoincrement.AutoIncrementHandler {
func (plugin *Plugin) PlugPrivilege() privilege.PrivilegeHandler {
return plugin.privilege
}

// PlugShiftMgr -- return ShiftMgr plug.
func (plugin *Plugin) PlugShiftMgr() shiftmanager.ShiftMgrHandler {
return plugin.shiftMgr
}
3 changes: 3 additions & 0 deletions src/plugins/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,7 @@ func TestPlugins(t *testing.T) {

privilegePlug := plugin.PlugPrivilege()
assert.NotNil(t, privilegePlug)

shiftMgrPlug := plugin.PlugShiftMgr()
assert.NotNil(t, shiftMgrPlug)
}
84 changes: 84 additions & 0 deletions src/plugins/shiftmanager/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Radon
*
* Copyright 2018-2020 The Radon Authors.
* Code is licensed under the GPLv3.
*
*/

package shiftmanager

import (
"github.com/radondb/shift/shift"
"github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes"
)

// ShiftStatus used to indicates shift instance's status.
type ShiftStatus int

const (
// ShiftStatusNone enum, if status is none, some errors happen
ShiftStatusNone ShiftStatus = iota

// ShiftStatusMigrating enum
ShiftStatusMigrating

// ShiftStatusSuccess enum
ShiftStatusSuccess

// ShiftStatusFail enum
ShiftStatusFail
)

// ShiftType is used to distinguish what different type of shift
// If it is called by reshard, the type will be ShiftTypeReshard
// If it is called by rebalance, the type will be ShiftTypeRebalance
type ShiftType int

const (
// ShiftTypeNone enum, a shift type should not be none
ShiftTypeNone ShiftType = iota

// ShiftTypeReshard enum
ShiftTypeReshard

// ShiftTypeRebalance enum
ShiftTypeRebalance
)

// ShiftInfo used to record basic infos used by shift
type ShiftInfo struct {
From string
FromUser string
FromPassword string
FromDatabase string
FromTable string

To string
ToUser string
ToPassword string
ToDatabase string
ToTable string

Cleanup bool
Checksum bool
MysqlDump string
Threads int
PosBehinds int
WaitTimeBeforeChecksum int
RadonURL string
}

type ShiftMgrHandler interface {
Init() error
NewShiftInstance(shiftInfo *ShiftInfo, typ ShiftType) (shift.ShiftHandler, error)
StartShiftInstance(key string, shift shift.ShiftHandler, typ ShiftType) error
WaitInstanceFinishThread(key string) error
WaitInstanceFinish(key string) error
StopOneInstance(key string) error
StopAllInstance() error
GetStatus(key string) ShiftStatus
GetProgress(key string) *sqltypes.Result
GetShiftType(key string) ShiftType
Close() error
}
114 changes: 114 additions & 0 deletions src/plugins/shiftmanager/mockshift.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Radon
*
* Copyright 2018-2020 The Radon Authors.
* Code is licensed under the GPLv3.
*
*/

package shiftmanager

import (
"fmt"

"github.com/radondb/shift/shift"
"github.com/xelabs/go-mysqlstack/xlog"
)

// MockShiftInfo used to mock a shift info
var MockShiftInfo = &ShiftInfo{
From: "src",
FromUser: "test",
FromPassword: "test",
FromDatabase: "testdb",
FromTable: "tbl",

To: "dst",
ToUser: "user",
ToPassword: "user",
ToDatabase: "todb",
ToTable: "totbl",

Cleanup: false,
Checksum: true,
MysqlDump: "mysqldump",
Threads: 128,
PosBehinds: 2048,
WaitTimeBeforeChecksum: 10,
RadonURL: "",
}

// MockShift used to mock a shift for test
type MockShift struct {
log *xlog.Log

err chan struct{}
allDone chan struct{}
stopSignal chan struct{}
}

// NewMockShift used to new a mockshift
func NewMockShift(log *xlog.Log) shift.ShiftHandler {
return &MockShift{
log: log,
err: make(chan struct{}),
allDone: make(chan struct{}),
stopSignal: make(chan struct{}),
}
}

// Start used to start a shift work.
func (mockShift *MockShift) Start() error {
return nil
}

// WaitFinish used to wait success or fail signal to finish.
func (mockShift *MockShift) WaitFinish() error {
log := mockShift.log
select {
case <-mockShift.getAllDoneCh():
log.Info("mockshift.table.OK")
return nil
case <-mockShift.getErrorCh():
log.Error("mockshift.table.get.error")
return fmt.Errorf("mockshift.table.get.error")
case <-mockShift.getStopSignal():
log.Info("mockshift.table.get.stop.signal")
return fmt.Errorf("mockshift.table.get.stop.signal")
}
}

// ChecksumTable used to checksum data src tbl and dst tbl.
func (mockShift *MockShift) ChecksumTable() error {
return nil
}

// SetStopSignal used set a stop signal to stop a shift work.
func (mockShift *MockShift) SetStopSignal() {
close(mockShift.stopSignal)
}

// setAllDoneSignal used to set allDone signal
func (mockShift MockShift) setAllDoneSignal() {
close(mockShift.allDone)
}

// setErrSignal used to set allDone signal
func (mockShift MockShift) setErrSignal() {
close(mockShift.err)
}

// getAllDoneCh used to get success signal
func (mockShift *MockShift) getAllDoneCh() <-chan struct{} {
return mockShift.allDone
}

// getErrorCh used to get error signal
func (mockShift *MockShift) getErrorCh() <-chan struct{} {
return mockShift.err
}

// getStopSignal used to get stop signal
func (mockShift *MockShift) getStopSignal() <-chan struct{} {
return mockShift.stopSignal
}
Loading

0 comments on commit a23bb53

Please sign in to comment.