Skip to content

Commit

Permalink
Registry file fsync improvements (#6988)
Browse files Browse the repository at this point in the history
* Registry file fsync improvements

- Return error if Sync fails
- Execute fsync on new parent directory
- improve metrics:
  - registrar.writes.total: total number of registry write attempts
  - registrar.writes.fail: total number of failed write attempts
  - registrar.writes.success: total number of successfull write attempts
  • Loading branch information
Steffen Siering authored and ph committed May 3, 2018
1 parent ffa812b commit 5916636
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 34 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Fix panic when log prospector configuration fails to load. {issue}6800[6800]
- Fix memory leak in log prospector when files cannot be read. {issue}6797[6797]
- Add raw JSON to message field when JSON parsing fails. {issue}6516[6516]
- Commit registry writes to stable storage to avoid corrupt registry files. {pull}6877[6877]
- Commit registry writes to stable storage to avoid corrupt registry files. {issue}6792[6792]
- Fix a data race between stopping and starting of the harverters. {issue}#6879[6879]

*Heartbeat*
Expand Down
87 changes: 54 additions & 33 deletions filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
)

type Registrar struct {
Channel chan []file.State
out successLogger
done chan struct{}
registryFile string // Path to the Registry File
registryFilePermissions os.FileMode // Permissions to apply on the Registry File
wg sync.WaitGroup
Channel chan []file.State
out successLogger
done chan struct{}
registryFile string // Path to the Registry File
fileMode os.FileMode // Permissions to apply on the Registry File
wg sync.WaitGroup

states *file.States // Map with all file paths inside and the corresponding state
gcRequired bool // gcRequired is set if registry state needs to be gc'ed before the next write
Expand All @@ -35,16 +35,20 @@ type successLogger interface {
}

var (
statesUpdate = monitoring.NewInt(nil, "registrar.states.update")
statesCleanup = monitoring.NewInt(nil, "registrar.states.cleanup")
statesCurrent = monitoring.NewInt(nil, "registrar.states.current")
registryWrites = monitoring.NewInt(nil, "registrar.writes")
statesUpdate = monitoring.NewInt(nil, "registrar.states.update")
statesCleanup = monitoring.NewInt(nil, "registrar.states.cleanup")
statesCurrent = monitoring.NewInt(nil, "registrar.states.current")
registryWrites = monitoring.NewInt(nil, "registrar.writes.total")
registryFails = monitoring.NewInt(nil, "registrar.writes.fail")
registrySuccess = monitoring.NewInt(nil, "registrar.writes.success")
)

func New(registryFile string, registryFilePermissions os.FileMode, flushTimeout time.Duration, out successLogger) (*Registrar, error) {
// New creates a new Registrar instance, updating the registry file on
// `file.State` updates. New fails if the file can not be opened or created.
func New(registryFile string, fileMode os.FileMode, flushTimeout time.Duration, out successLogger) (*Registrar, error) {
r := &Registrar{
registryFile: registryFile,
registryFilePermissions: registryFilePermissions,
registryFile: registryFile,
fileMode: fileMode,
done: make(chan struct{}),
states: file.NewStates(),
Channel: make(chan []file.State, 1),
Expand Down Expand Up @@ -258,38 +262,55 @@ func (r *Registrar) flushRegistry() {

// writeRegistry writes the new json registry file to disk.
func (r *Registrar) writeRegistry() error {
// First clean up states
r.gcStates()
states := r.states.GetStates()
statesCurrent.Set(int64(len(states)))

logp.Debug("registrar", "Write registry file: %s", r.registryFile)
registryWrites.Inc()

tempfile := r.registryFile + ".new"
f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, r.registryFilePermissions)
tempfile, err := writeTmpFile(r.registryFile, r.fileMode, states)
if err != nil {
logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, err)
registryFails.Inc()
return err
}

// First clean up states
states := r.states.GetStates()

encoder := json.NewEncoder(f)
err = encoder.Encode(states)
err = helper.SafeFileRotate(r.registryFile, tempfile)
if err != nil {
f.Close()
logp.Err("Error when encoding the states: %s", err)
registryFails.Inc()
return err
}

// Commit the changes to storage to avoid corrupt registry files
f.Sync()
// Directly close file because of windows
f.Close()
logp.Debug("registrar", "Registry file updated. %d states written.", len(states))
registrySuccess.Inc()

err = helper.SafeFileRotate(r.registryFile, tempfile)
return nil
}

logp.Debug("registrar", "Registry file updated. %d states written.", len(states))
registryWrites.Add(1)
statesCurrent.Set(int64(len(states)))
func writeTmpFile(baseName string, perm os.FileMode, states []file.State) (string, error) {
logp.Debug("registrar", "Write registry file: %s", baseName)

tempfile := baseName + ".new"
f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, perm)
if err != nil {
logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, err)
return "", err
}

defer f.Close()

encoder := json.NewEncoder(f)

if err := encoder.Encode(states); err != nil {
logp.Err("Error when encoding the states: %s", err)
return "", err
}

// Commit the changes to storage to avoid corrupt registry files
if err = f.Sync(); err != nil {
logp.Err("Error when syncing new registry file contents: %s", err)
return "", err
}

return err
return tempfile, nil
}
14 changes: 14 additions & 0 deletions libbeat/common/file/helper_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,26 @@ package file

import (
"os"
"path/filepath"
)

// SafeFileRotate safely rotates an existing file under path and replaces it with the tempfile
func SafeFileRotate(path, tempfile string) error {
parent := filepath.Dir(path)

if e := os.Rename(tempfile, path); e != nil {
return e
}

// best-effort fsync on parent directory. The fsync is required by some
// filesystems, so to update the parents directory metadata to actually
// contain the new file being rotated in.
f, err := os.Open(parent)
if err != nil {
return nil // ignore error
}
defer f.Close()
f.Sync()

return nil
}

0 comments on commit 5916636

Please sign in to comment.