Skip to content

Commit

Permalink
Merge pull request #2118 from zhiqiang-bianjie/develop
Browse files Browse the repository at this point in the history
R4R: Solve the bug that snapshot is sometimes unsuccessful
  • Loading branch information
zhangyelong authored Dec 9, 2019
2 parents 82402a6 + aee9561 commit 0500be0
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 55 deletions.
2 changes: 1 addition & 1 deletion cmd/iris/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func main() {
tendermintCmd,
server.ResetCmd(ctx, cdc, resetAppState),
server.ExportCmd(ctx, cdc, exportAppStateAndTMValidators),
server.SnapshotCmd(cdc),
server.SnapshotCmd(ctx, cdc, resetAppState),
client.LineBreak,
)

Expand Down
143 changes: 89 additions & 54 deletions server/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"strings"

"github.com/irisnet/irishub/codec"

"github.com/spf13/cobra"
"github.com/spf13/viper"
bc "github.com/tendermint/tendermint/blockchain"
Expand All @@ -23,96 +22,101 @@ const flagTmpDir = "tmp-dir"
const pathSeparator = string(os.PathSeparator)

// SnapshotCmd delete historical block data and index data
func SnapshotCmd(cdc *codec.Codec) *cobra.Command {
func SnapshotCmd(ctx *Context, cdc *codec.Codec, appReset AppReset) *cobra.Command {
cmd := &cobra.Command{
Use: "snapshot",
Short: "snapshot the latest information and drop the others",
RunE: func(cmd *cobra.Command, args []string) error {
defer func() {
if r := recover(); r != nil {
err := r.(error)
ctx.Logger.Error("snapshot file is created failed", "err", err.Error())
}
}()

home := viper.GetString(tmcli.HomeFlag)
emptyState, err := isEmptyState(home)
if err != nil || emptyState {
fmt.Println("WARNING: State is not initialized.")
ctx.Logger.Error("State is not initialized")
return nil
}
srcDir := filepath.Join(home, "data")

targetDir := viper.GetString(flagTmpDir)
if len(targetDir) == 0 {
targetDir = filepath.Join(home, "data.bak")
}
dataDir := filepath.Join(home, "data")

if err = dumpData(srcDir, targetDir, cdc); err != nil {
if err = snapshot(ctx, cdc, dataDir, targetDir, appReset); err != nil {
_ = os.RemoveAll(targetDir)
ctx.Logger.Error("snapshot file is created failed")
return err
}
fmt.Println(fmt.Sprintf("snapshot file is stored in [%s]", targetDir))
ctx.Logger.Info("snapshot file is created successful", "location", targetDir)
return nil
},
}
cmd.Flags().String(flagTmpDir, "", "snapshot file storage directory")
cmd.Flags().String(flagTmpDir, "", "Snapshot file storage directory")
return cmd
}

func loadDb(name, path string) *dbm.GoLevelDB {
db, err := dbm.NewGoLevelDB(name, path)
if err != nil {
panic("load db failed")
panic(err)
}
return db
}

func dumpData(home, targetDir string, cdc *codec.Codec) error {
//save local current block and flush disk
lastHeight := snapshotBlock(home, targetDir)
func snapshot(ctx *Context, cdc *codec.Codec, dataDir, targetDir string, appReset AppReset) error {
blockDB := loadDb("blockstore", dataDir)
blockStore := bc.NewBlockStore(blockDB)

//save local current block height consensus data
_ = snapshotCsWAL(home, targetDir, lastHeight)
stateDB := loadDb("state", dataDir)
state := tmsm.LoadState(stateDB)

//save local current block height state
if err := snapshotState(home, targetDir, lastHeight, cdc); err != nil {
return err
defer func() {
blockDB.Close()
stateDB.Close()
}()
if blockStore.Height() != state.LastBlockHeight {
if err := reset(ctx, appReset, state.LastBlockHeight); err != nil {
return err
}
}

//save local current block and flush disk
snapshotBlock(blockStore, targetDir, state.LastBlockHeight)
//save local current block height state
snapshotState(cdc, stateDB, targetDir)
//save local current block height consensus data
snapshotCsWAL(ctx, dataDir, targetDir, state.LastBlockHeight)

//copy application
appDir := filepath.Join(home, "application.db")
appDir := filepath.Join(dataDir, "application.db")
appTargetDir := filepath.Join(targetDir, "application.db")
if err := copyDir(appDir, appTargetDir); err != nil {
return err
}

//copy evidence.db
evidenceDir := filepath.Join(home, "evidence.db")
evidenceDir := filepath.Join(dataDir, "evidence.db")
evidenceTargetDir := filepath.Join(targetDir, "evidence.db")
return copyDir(evidenceDir, evidenceTargetDir)
}

func snapshotState(home, targetDir string, height int64, cdc *codec.Codec) error {
originDb := loadDb("state", home)
defer originDb.Close()

func snapshotState(cdc *codec.Codec, tmDB *dbm.GoLevelDB, targetDir string) {
targetDb := loadDb("state", targetDir)
defer targetDb.Close()

state := tmsm.LoadState(originDb)

if height != state.LastBlockHeight {
return fmt.Errorf("wrong block height,should: %d, but got %d. please restart the node, then try to snapshot again", height, state.LastBlockHeight)
}
state := tmsm.LoadState(tmDB)

saveValidatorsInfo(cdc, originDb, targetDb, height)
saveConsensusParamsInfo(cdc, originDb, targetDb, height)
saveValidatorsInfo(cdc, tmDB, targetDb, state.LastBlockHeight)
saveConsensusParamsInfo(cdc, tmDB, targetDb, state.LastBlockHeight)
tmsm.SaveState(targetDb, state)

return nil
}

func snapshotBlock(home, targetDir string) int64 {
originDb := loadDb("blockstore", home)
defer originDb.Close()

originStore := bc.NewBlockStore(originDb)
height := originStore.Height()

func snapshotBlock(originStore *bc.BlockStore, targetDir string, height int64) int64 {
targetDb := loadDb("blockstore", targetDir)
defer targetDb.Close()

Expand All @@ -127,42 +131,54 @@ func snapshotBlock(home, targetDir string) int64 {
return height
}

func snapshotCsWAL(home, targetDir string, height int64) error {
func snapshotCsWAL(ctx *Context, home, targetDir string, height int64) {
walTargetDir := filepath.Join(targetDir, "cs.wal", "wal")
targetWAL, err := consensus.NewWAL(walTargetDir)

walSourceDir := filepath.Join(home, "cs.wal", "wal")
sourceWAL, err := consensus.NewWAL(walSourceDir)
if err != nil {
return fmt.Errorf("failed to open WAL for consensus state")
ctx.Logger.Info("failed to open WAL for consensus state", "err", err.Error())
return
}

gr, found, err := sourceWAL.SearchForEndHeight(height, &consensus.WALSearchOptions{IgnoreDataCorruptionErrors: true})
if gr != nil {
defer gr.Close()
}
if err != nil {
return err
}
if !found {
return fmt.Errorf("not found cs.wal file in %s", walSourceDir)

if err != nil || !found {
ctx.Logger.Info(fmt.Sprintf("cannot replay height %d. WAL does not contain #ENDHEIGHT for %d", height, height-1))
return
}

defer func() {
if err = gr.Close(); err != nil {
ctx.Logger.Info("resource release failed", "err", err.Error())
return
}
}()

var msg *consensus.TimedWALMessage
dec := consensus.NewWALDecoder(gr)
for {
msg, err = dec.Decode()
if err == io.EOF {
break
} else if consensus.IsDataCorruptionError(err) {
return fmt.Errorf("data has been corrupted in last height %d of consensus WAL", height)
ctx.Logger.Info("data has been corrupted in last height %d of consensus WAL", height)
return
} else if err != nil {
return err
ctx.Logger.Info("decode WALMessage failed", "err", err.Error())
return
}
if err := targetWAL.Write(msg.Msg); err != nil {
ctx.Logger.Info("write data to file failed", "err", err.Error())
return
}
_ = targetWAL.Write(msg.Msg)
}
_ = targetWAL.WriteSync(consensus.EndHeightMessage{Height: height})
return nil
err = targetWAL.WriteSync(consensus.EndHeightMessage{Height: height})
if err != nil {
ctx.Logger.Info("write data to file failed", "err", err.Error())
return
}
}

func copyDir(srcPath string, destPath string) error {
Expand Down Expand Up @@ -207,10 +223,10 @@ func copyFile(src, dest string) (w int64, err error) {
}
}
dstFile, err := os.Create(dest)
defer dstFile.Close()
if err != nil {
return
}
defer dstFile.Close()
return io.Copy(dstFile, srcFile)
}

Expand Down Expand Up @@ -282,3 +298,22 @@ func calcValidatorsKey(height int64) []byte {
func calcConsensusParamsKey(height int64) []byte {
return []byte(fmt.Sprintf("consensusParamsKey:%v", height))
}

func reset(ctx *Context, appReset AppReset, height int64) error {
cfg := ctx.Config
home := cfg.RootDir
traceWriterFile := viper.GetString(flagTraceStore)

db, err := openDB(home)
if err != nil {
return err
}
traceWriter, err := openTraceWriter(traceWriterFile)
if err != nil {
return err
}
if err := appReset(ctx, ctx.Logger, db, traceWriter, height); err != nil {
return err
}
return nil
}

0 comments on commit 0500be0

Please sign in to comment.