Skip to content

Commit

Permalink
Fix(migrate): fix migrate
Browse files Browse the repository at this point in the history
Signed-off-by: caoxianfei1 <[email protected]>
  • Loading branch information
caoxianfei1 committed Dec 8, 2023
1 parent 9d83a9c commit 1153173
Show file tree
Hide file tree
Showing 14 changed files with 751 additions and 47 deletions.
4 changes: 1 addition & 3 deletions cli/command/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ func genDeployPlaybook(curveadm *cli.CurveAdm,
Name: options.poolset,
Type: options.poolsetDiskType,
}
diskType := options.poolsetDiskType

pb := playbook.NewPlaybook(curveadm)
for _, step := range steps {
Expand All @@ -255,8 +254,7 @@ func genDeployPlaybook(curveadm *cli.CurveAdm,
options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs)
} else if step == CREATE_LOGICAL_POOL {
options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL
options[comm.POOLSET] = poolset
options[comm.POOLSET_DISK_TYPE] = diskType
options[comm.KEY_POOLSET] = poolset
options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs)
}

Expand Down
108 changes: 87 additions & 21 deletions cli/command/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@
package command

import (
"fmt"

"github.com/fatih/color"
"github.com/opencurve/curveadm/cli/cli"
comm "github.com/opencurve/curveadm/internal/common"
"github.com/opencurve/curveadm/internal/configure"
"github.com/opencurve/curveadm/internal/configure/topology"
"github.com/opencurve/curveadm/internal/errno"
"github.com/opencurve/curveadm/internal/playbook"
tui "github.com/opencurve/curveadm/internal/tui/common"
"github.com/opencurve/curveadm/internal/task/task/common"
tui "github.com/opencurve/curveadm/internal/tui"
tuicomm "github.com/opencurve/curveadm/internal/tui/common"

cliutil "github.com/opencurve/curveadm/internal/utils"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -71,14 +76,12 @@ var (
// chunkserevr (curvebs)
MIGRATE_CHUNKSERVER_STEPS = []int{
playbook.BACKUP_ETCD_DATA,
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.CREATE_PHYSICAL_POOL, // add machine that migrate to
playbook.PULL_IMAGE,
playbook.CREATE_CONTAINER,
playbook.SYNC_CONFIG,
playbook.CREATE_PHYSICAL_POOL,
playbook.START_CHUNKSERVER,
playbook.CREATE_LOGICAL_POOL,
playbook.MARK_CHUNKSERVER_PENGDDING,
}

// metaserver (curvefs)
Expand All @@ -100,12 +103,23 @@ var (
topology.ROLE_SNAPSHOTCLONE: MIGRATE_SNAPSHOTCLONE_STEPS,
topology.ROLE_METASERVER: MIGRATE_METASERVER_STEPS,
}
GET_MIGRATE_STATUS = []int{
playbook.GET_MIGRATE_STATUS,
}
MIGRATE_POST_CLEAN_STEPS = []int{
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.CREATE_PHYSICAL_POOL, // remove machine that migrate from
playbook.UPDATE_TOPOLOGY,
}
)

type migrateOptions struct {
filename string
poolset string
poolsetDiskType string
showStatus bool
clean bool
}

func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command {
Expand All @@ -125,7 +139,8 @@ func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command {
flags := cmd.Flags()
flags.StringVar(&options.poolset, "poolset", "default", "Specify the poolset")
flags.StringVar(&options.poolsetDiskType, "poolset-disktype", "ssd", "Specify the disk type of physical pool")

flags.BoolVar(&options.showStatus, "status", false, "show copyset transferring status")
flags.BoolVar(&options.clean, "clean", false, "show copyset transferring status")
return cmd
}

Expand Down Expand Up @@ -189,10 +204,21 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
dcs2add := diffs[topology.DIFF_ADD]
dcs2del := diffs[topology.DIFF_DELETE]
migrates := getMigrates(curveadm, data)
if len(migrates) <= 0 {
return nil, fmt.Errorf("no service will be migrated")
}
role := migrates[0].From.GetRole()
steps := MIGRATE_ROLE_STEPS[role]
poolset := options.poolset
poolsetDiskType := options.poolsetDiskType
if options.showStatus {
steps = GET_MIGRATE_STATUS
}
if options.clean {
steps = MIGRATE_POST_CLEAN_STEPS
}
poolset := configure.Poolset{
Name: options.poolset,
Type: options.poolsetDiskType,
}

pb := playbook.NewPlaybook(curveadm)
for _, step := range steps {
Expand All @@ -204,8 +230,11 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
config = dcs2del
case playbook.BACKUP_ETCD_DATA:
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD)
case CREATE_PHYSICAL_POOL,
CREATE_LOGICAL_POOL:
case
playbook.CREATE_PHYSICAL_POOL,
playbook.CREATE_LOGICAL_POOL,
playbook.MARK_CHUNKSERVER_PENGDDING,
playbook.GET_MIGRATE_STATUS:
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_MDS)[:1]
}

Expand All @@ -215,19 +244,20 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
case playbook.CLEAN_SERVICE:
options[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER}
options[comm.KEY_CLEAN_BY_RECYCLE] = true
options[comm.KEY_REMOVE_MIGRATED_SERVER] = true
case playbook.CREATE_PHYSICAL_POOL:
options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL
options[comm.KEY_MIGRATE_SERVERS] = migrates
options[comm.POOLSET] = poolset
options[comm.POOLSET_DISK_TYPE] = poolsetDiskType
options[comm.KEY_POOLSET] = poolset
case playbook.CREATE_LOGICAL_POOL:
options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL
options[comm.KEY_MIGRATE_SERVERS] = migrates
options[comm.KEY_NEW_TOPOLOGY_DATA] = data
options[comm.POOLSET] = poolset
options[comm.POOLSET_DISK_TYPE] = poolsetDiskType
options[comm.KEY_POOLSET] = poolset
case playbook.UPDATE_TOPOLOGY:
options[comm.KEY_NEW_TOPOLOGY_DATA] = data
case playbook.GET_MIGRATE_STATUS:
options[comm.KEY_MIGRATE_SERVERS] = migrates
}

pb.AddStep(&playbook.PlaybookStep{
Expand All @@ -252,6 +282,36 @@ func displayMigrateTitle(curveadm *cli.CurveAdm, data string) {
curveadm.WriteOutln(color.YellowString(" - Migrate host: from %s to %s", from.GetHost(), to.GetHost()))
}

func displayMigrateStatus(curveadm *cli.CurveAdm, role string) {
var output string
if role == topology.ROLE_CHUNKSERVER {
statuses := []common.MigrateStatus{}
v := curveadm.MemStorage().Get(comm.KEY_MIGRATE_STATUS)
if v != nil {
m := v.(map[string]common.MigrateStatus)
for _, status := range m {
statuses = append(statuses, status)
}
}

output = tui.FormatMigrateStatus(statuses)
} else {
statuses := []common.MigrateCommonStatus{}
v := curveadm.MemStorage().Get(comm.KEY_MIGRATE_COMMON_STATUS)
if v != nil {
m := v.(map[string]common.MigrateCommonStatus)
for _, status := range m {
statuses = append(statuses, status)
}
}

output = tui.FormatMigrateCommonStatus(statuses)
}

curveadm.WriteOutln("")
curveadm.WriteOut("%s", output)
}

func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
// TODO(P0): added prechek for target host
// 1) parse cluster topology
Expand All @@ -261,7 +321,7 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
}

// 2) read topology from file
data, err := readTopology(curveadm, options.filename)
data, err := readTopology(curveadm, options.filename, options.showStatus)
if err != nil {
return err
}
Expand All @@ -272,13 +332,15 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
return err
}

// 4) display title
displayMigrateTitle(curveadm, data)
if !options.showStatus {
// 4) display title
displayMigrateTitle(curveadm, data)

// 5) confirm by user
if pass := tui.ConfirmYes(tui.DEFAULT_CONFIRM_PROMPT); !pass {
curveadm.WriteOutln(tui.PromptCancelOpetation("migrate service"))
return errno.ERR_CANCEL_OPERATION
// 5) confirm by user
if pass := tuicomm.ConfirmYes(tuicomm.DEFAULT_CONFIRM_PROMPT); !pass {
curveadm.WriteOutln(tuicomm.PromptCancelOpetation("migrate service"))
return errno.ERR_CANCEL_OPERATION
}
}

// 6) generate migrate playbook
Expand All @@ -294,6 +356,10 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
}

// 9) print success prompt
if options.showStatus {
displayMigrateStatus(curveadm, getMigrates(curveadm, data)[0].From.GetRole())
return nil
}
curveadm.WriteOutln("")
curveadm.WriteOutln(color.GreenString("Services successfully migrateed ^_^."))
// TODO(P1): warning iff there is changed configs
Expand Down
8 changes: 5 additions & 3 deletions cli/command/scale_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func NewScaleOutCommand(curveadm *cli.CurveAdm) *cobra.Command {
return cmd
}

func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) {
func readTopology(curveadm *cli.CurveAdm, filename string, showStatus bool) (string, error) {
if !utils.PathExist(filename) {
return "", errno.ERR_TOPOLOGY_FILE_NOT_FOUND.
F("%s: no such file", utils.AbsPath(filename))
Expand All @@ -156,7 +156,9 @@ func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) {
}

oldData := curveadm.ClusterTopologyData()
curveadm.WriteOut("%s", utils.Diff(oldData, data))
if !showStatus {
curveadm.WriteOut("%s", utils.Diff(oldData, data))
}
return data, nil
}

Expand Down Expand Up @@ -384,7 +386,7 @@ func runScaleOut(curveadm *cli.CurveAdm, options scaleOutOptions) error {
}

// 2) read topology from file
data, err := readTopology(curveadm, options.filename)
data, err := readTopology(curveadm, options.filename, false)
if err != nil {
return err
}
Expand Down
17 changes: 11 additions & 6 deletions internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ const (
// format
KEY_ALL_FORMAT_STATUS = "ALL_FORMAT_STATUS"

// migrate
KEY_MIGRATE_STATUS = "MIGRATE_STATUS"
KEY_MIGRATE_COMMON_STATUS = "MIGRATE_COMMON_STATUS"

// check
KEY_CHECK_WITH_WEAK = "CHECK_WITH_WEAK"
KEY_CHECK_KERNEL_MODULE_NAME = "CHECK_KERNEL_MODULE_NAME"
Expand All @@ -71,12 +75,13 @@ const (
SERVICE_STATUS_UNKNOWN = "Unknown"

// clean
KEY_CLEAN_ITEMS = "CLEAN_ITEMS"
KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE"
CLEAN_ITEM_LOG = "log"
CLEAN_ITEM_DATA = "data"
CLEAN_ITEM_CONTAINER = "container"
CLEANED_CONTAINER_ID = "-"
KEY_CLEAN_ITEMS = "CLEAN_ITEMS"
KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE"
CLEAN_ITEM_LOG = "log"
CLEAN_ITEM_DATA = "data"
CLEAN_ITEM_CONTAINER = "container"
CLEANED_CONTAINER_ID = "-"
KEY_REMOVE_MIGRATED_SERVER = "REMOVE_MIGRATED_SERVER"

// client
KEY_CLIENT_HOST = "CLIENT_HOST"
Expand Down
49 changes: 37 additions & 12 deletions internal/configure/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,26 +263,51 @@ func ScaleOutClusterPool(old *CurveClusterTopo, dcs []*topology.DeployConfig, po
old.NPools = old.NPools + 1
}

func MigrateClusterServer(old *CurveClusterTopo, migrates []*MigrateServer) {
func MigrateClusterServer(old *CurveClusterTopo, migrates []*MigrateServer, removeMigratedServer bool) {
m := map[string]*topology.DeployConfig{} // key: from.Name, value: to.DeployConfig
// 此时m里面保存的就是所有的要迁移的server,比如migrate.From是curve16, migrate.To是curve21
for _, migrate := range migrates {
m[formatName(migrate.From)] = migrate.To
}

for i, server := range old.Servers {
dc, ok := m[server.Name]
if !ok {
continue
// server.InternalIp = dc.GetListenIp()
// server.ExternalIp = dc.GetListenExternalIp()
// server.Name = formatName(dc)
// if server.InternalPort != 0 && server.ExternalPort != 0 {
// server.InternalPort = dc.GetListenPort()
// server.ExternalPort = dc.GetListenExternalPort()
// }
// old.Servers[i] = server
// }

// add server that will migrate to
if !removeMigratedServer {
for fromName, toDc := range m {
server := Server{}
server.InternalIp = toDc.GetListenIp()
server.ExternalIp = toDc.GetListenExternalIp()
server.InternalPort = toDc.GetListenPort()
server.ExternalPort = toDc.GetListenExternalPort()
server.Name = formatName(toDc)

for _, oldServer := range old.Servers {
if oldServer.Name == fromName {
server.PhysicalPool = oldServer.PhysicalPool
server.Poolset = oldServer.Poolset
server.Zone = oldServer.Zone
}
}
old.Servers = append(old.Servers, server)
}
return
}

server.InternalIp = dc.GetListenIp()
server.ExternalIp = dc.GetListenExternalIp()
server.Name = formatName(dc)
if server.InternalPort != 0 && server.ExternalPort != 0 {
server.InternalPort = dc.GetListenPort()
server.ExternalPort = dc.GetListenExternalPort()
// remove server that has migrated
for i := 0; i < len(old.Servers); i++ {
_, ok := m[old.Servers[i].Name]
if ok {
old.Servers = append(old.Servers[:i], old.Servers[i+1:]...)
}
old.Servers[i] = server
}
}

Expand Down
6 changes: 5 additions & 1 deletion internal/errno/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,11 @@ var (
ERR_ENCRYPT_FILE_FAILED = EC(410021, "encrypt file failed")
ERR_CLIENT_ID_NOT_FOUND = EC(410022, "client id not found")
ERR_ENABLE_ETCD_AUTH_FAILED = EC(410023, "enable etcd auth failed")

ERR_MARK_CHUNKSERVER_PENDDING = EC(410024, "mark chunkserver pendding status failed when migrate")
RRR_GET_CLUSTER_MDSADDR = EC(410025, "failed to get cluster mds addr")
ERR_GET_CHUNKSERVER_COPYSET = EC(410026, "failed to get chunkserver copyset")
ERR_GET_MIGRATE_COPYSET = EC(410027, "migrate chunkserver copyset info must be 2")
ERR_CONTAINER_NOT_REMOVED = EC(410027, "container not removed")
// 420: common (curvebs client)
ERR_VOLUME_ALREADY_MAPPED = EC(420000, "volume already mapped")
ERR_VOLUME_CONTAINER_LOSED = EC(420001, "volume container is losed")
Expand Down
Loading

0 comments on commit 1153173

Please sign in to comment.