Skip to content

Commit

Permalink
refactor: refactor to disk space error while backing up (#474)
Browse files Browse the repository at this point in the history
* fix: detect disk space error

Signed-off-by: mlycore <[email protected]>

* feat: add data node ip and address to error message

Signed-off-by: mlycore <[email protected]>

* fix: remove useless log

Signed-off-by: mlycore <[email protected]>

* feat: add disk space error check to cmd output

Signed-off-by: mlycore <[email protected]>

* fix: fix return error

Signed-off-by: mlycore <[email protected]>

* feat: hide delete backupfiles output

Signed-off-by: mlycore <[email protected]>

* fix: remove comments

Signed-off-by: mlycore <[email protected]>

* chore: add golint comment

Signed-off-by: mlycore <[email protected]>

* fix: fix test

Signed-off-by: mlycore <[email protected]>

---------

Signed-off-by: mlycore <[email protected]>
  • Loading branch information
mlycore authored Nov 27, 2023
1 parent cf30c75 commit 31e7cbc
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 29 deletions.
23 changes: 13 additions & 10 deletions pitr/agent/internal/pkg/opengauss.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ const (
)

func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode string, threadsNum uint8, dbPort uint16) (string, error) {
var (
bid string
err error
)
cmd := fmt.Sprintf(_backupFmt, backupPath, instanceName, backupMode, og.pgData, threadsNum, dbPort)
outputs, err := cmds.AsyncExec(og.shell, cmd)
if err != nil {
Expand All @@ -117,17 +121,15 @@ func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode string, th
return "", output.Error
}

// get the backup id from the first line
bid, err := og.getBackupID(output.Message)
if err != nil {
og.log.Error(fmt.Sprintf("og.getBackupID[source=%s] return err wrap: %s", output.Message, err))
return "", err
if strings.Contains(output.Message, "INFO: Backup start") {
bid, err = og.getBackupID(output.Message)
if err != nil {
og.log.Error(fmt.Sprintf("og.getBackupID[source=%s] return err wrap: %s", output.Message, err))
return "", err
}
}
// ignore other output
go og.ignore(outputs)
return bid, nil //nolint
}
return "", fmt.Errorf("unknow err")
return bid, nil //nolint
}

//nolint:dupl
Expand Down Expand Up @@ -192,7 +194,7 @@ func (og *openGauss) AddInstance(backupPath, instance string) error {

if errors.Is(err, cons.CmdOperateFailed) {
og.log.Error(fmt.Sprintf("add instance failure[output=%s], err: %s, wrap: %s", output, err, cons.InstanceAlreadyExist))
return err
return fmt.Errorf("add instance failure[output=%s], err: %s, wrap: %w", output, err, cons.InstanceAlreadyExist)
}
if err != nil {
og.log.Error(fmt.Sprintf(_CmdErrorFmt, og.shell, cmd, cons.CmdAddInstanceFailed))
Expand Down Expand Up @@ -347,6 +349,7 @@ func (og *openGauss) ShowBackupList(backupPath, instanceName string) ([]*model.B
return og.showbackup(cmd, instanceName)
}

//nolint:unused
func (og *openGauss) ignore(outputs chan *cmds.Output) {
defer func() {
_ = recover()
Expand Down
36 changes: 27 additions & 9 deletions pitr/agent/pkg/cmds/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func AsyncExec(name string, args ...string) (chan *Output, error) {
if err != nil {
return nil, fmt.Errorf("can not obtain stdout pipe for command[args=%+v]:%s", args, err)
}

if err = cmd.Start(); err != nil {
return nil, fmt.Errorf("the command is err[args=%+v]:%s", args, err)
}
Expand All @@ -61,11 +62,16 @@ func AsyncExec(name string, args ...string) (chan *Output, error) {
go func() {
if err = syncutils.NewRecoverFuncWithErrRet("", func() error {
for scanner.Scan() {
output <- &Output{
op := &Output{
LineNo: index,
Message: scanner.Text(),
Error: err,
}
if strings.Contains(scanner.Text(), "No space left on device") {
op.Error = fmt.Errorf("%s", "No space left on device")
}

output <- op
index++
}

Expand All @@ -78,13 +84,14 @@ func AsyncExec(name string, args ...string) (chan *Output, error) {

if err = cmd.Wait(); err != nil {
if ee, ok := err.(*exec.ExitError); ok {
logging.Error(fmt.Sprintf("exec failure[ee=%s], wrap=%s", ee, cons.CmdOperateFailed))
output <- &Output{
Error: fmt.Errorf("exec failure[ee=%s], wrap=%w", ee, cons.CmdOperateFailed),
}
} else {
output <- &Output{
Error: fmt.Errorf("%s err: %s", cmd.String(), err),
}
}

output <- &Output{
Error: fmt.Errorf("%s err: %s", cmd.String(), err),
}

}
return nil
})(); err != nil {
Expand Down Expand Up @@ -113,6 +120,12 @@ func Exec(name string, args ...string) (string, error) {
if err != nil {
return "", fmt.Errorf("can not obtain stdout pipe for command[args=%+v]:%s", args, err)
}

stderr, err := cmd.StderrPipe()
if err != nil {
return "", fmt.Errorf("can not obtain stderr pipe for cmand[args=%+v]:%s", args, err)
}

if err = cmd.Start(); err != nil {
return "", fmt.Errorf("the command is err[args=%+v]:%s", args, err)
}
Expand All @@ -122,11 +135,16 @@ func Exec(name string, args ...string) (string, error) {
return "", fmt.Errorf("io.ReadAll return err=%w", err)
}

ereader, err := io.ReadAll(stderr)
if err != nil {
return "", fmt.Errorf("io.ReadAll return err=%w", err)
}

if err = cmd.Wait(); err != nil {
if ee, ok := err.(*exec.ExitError); ok {
logging.Error(fmt.Sprintf("exec failure[ee=%s,stdout=%s]", ee, string(reader)))
return "", fmt.Errorf("exec failure[ee=%s,stdout=%s], wrap:%w", ee, string(reader), cons.CmdOperateFailed)
}
return "", fmt.Errorf("%s err: %s", cmd.String(), err)
return "", fmt.Errorf("%s err: %s", cmd.String(), string(ereader))
}
return string(reader), nil
}
Expand Down
15 changes: 7 additions & 8 deletions pitr/cli/internal/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,7 @@ func backup() error {
}

if lsBackup != nil {
if cancel {
deleteBackupFiles(ls, lsBackup, deleteModeQuiet)
} else {
logging.Warn("Try to delete backup data ...")
deleteBackupFiles(ls, lsBackup, deleteModeNormal)
}
deleteBackupFiles(ls, lsBackup, deleteModeQuiet)
}
}
}()
Expand Down Expand Up @@ -310,20 +305,24 @@ func _execBackup(as pkg.IAgentServer, node *model.StorageNode, dnCh chan *model.
Instance: defaultInstance,
}
backupID, err := as.Backup(in)
status := model.SsBackupStatusRunning
if err != nil {
return xerr.NewCliErr(err.Error())
status = model.BackupStatus(err.Error())
}

// update DnList of lsBackup
dn := &model.DataNode{
IP: node.IP,
Port: node.Port,
Status: model.SsBackupStatusRunning,
Status: status,
BackupID: backupID,
StartTime: timeutil.Now().String(),
EndTime: timeutil.Init(),
}
dnCh <- dn
if err != nil {
return fmt.Errorf("data node %s:%d backup error: %s", node.IP, node.Port, err)
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pitr/cli/internal/cmd/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ var _ = Describe("Backup", func() {

as.EXPECT().Backup(gomock.Any()).Return("", xerr.NewCliErr("backup failed"))

Expect(_execBackup(as, bak.SsBackup.StorageNodes[0], dnCh)).ToNot(BeNil())
Expect(_execBackup(as, bak.SsBackup.StorageNodes[1], dnCh)).ToNot(BeNil())
close(dnCh)
Expect(len(dnCh)).To(Equal(1))
Expect(len(dnCh)).To(Equal(2))

})
})
Expand Down

0 comments on commit 31e7cbc

Please sign in to comment.