Skip to content

Commit

Permalink
fix(decomposedfs): Fix sync propagation
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj committed Nov 11, 2024
1 parent e4191d8 commit 8a3bfd5
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 117 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-sync-propagation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Fix sync propagation

Fixes the defers in the sync propagation.

https://github.com/cs3org/reva/pull/4924
236 changes: 119 additions & 117 deletions pkg/storage/utils/decomposedfs/tree/propagator/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/metadata/prefixes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/rogpeppe/go-internal/lockedfile"
"github.com/rs/zerolog"
)

// SyncPropagator implements synchronous treetime & treesize propagation
Expand Down Expand Up @@ -72,137 +73,138 @@ func (p SyncPropagator) Propagate(ctx context.Context, n *node.Node, sizeDiff in
sTime := time.Now().UTC()

// we loop until we reach the root
var err error
for err == nil && n.ID != root.ID {
sublog.Debug().Msg("propagating")

attrs := node.Attributes{}

var f *lockedfile.File
// lock parent before reading treesize or tree time

_, subspan := tracer.Start(ctx, "lockedfile.OpenFile")
parentFilename := p.lookup.MetadataBackend().LockfilePath(n.ParentPath())
f, err = lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600)
subspan.End()
if err != nil {
sublog.Error().Err(err).
Str("parent filename", parentFilename).
Msg("Propagation failed. Could not open metadata for parent with lock.")
return err
}
// always log error if closing node fails
defer func() {
// ignore already closed error
cerr := f.Close()
if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) {
err = cerr // only overwrite err with en error from close if the former was nil
}
_ = os.Remove(f.Name())
}()
var (
err error
stop bool
)

if n, err = n.Parent(ctx); err != nil {
sublog.Error().Err(err).
Msg("Propagation failed. Could not read parent node.")
return err
}
for err == nil && !stop && n.ID != root.ID {
n, stop, err = p.propagateItem(ctx, n, sTime, sizeDiff, sublog)
}

if !n.HasPropagation(ctx) {
sublog.Debug().Str("attr", prefixes.PropagationAttr).Msg("propagation attribute not set or unreadable, not propagating")
// if the attribute is not set treat it as false / none / no propagation
return nil
}
if err != nil {
sublog.Error().Err(err).Msg("error propagating")
return err
}
return nil
}

sublog = sublog.With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger()

if p.treeTimeAccounting {
// update the parent tree time if it is older than the nodes mtime
updateSyncTime := false

var tmTime time.Time
tmTime, err = n.GetTMTime(ctx)
switch {
case err != nil:
// missing attribute, or invalid format, overwrite
sublog.Debug().Err(err).
Msg("could not read tmtime attribute, overwriting")
updateSyncTime = true
case tmTime.Before(sTime):
sublog.Debug().
Time("tmtime", tmTime).
Time("stime", sTime).
Msg("parent tmtime is older than node mtime, updating")
updateSyncTime = true
default:
sublog.Debug().
Time("tmtime", tmTime).
Time("stime", sTime).
Dur("delta", sTime.Sub(tmTime)).
Msg("parent tmtime is younger than node mtime, not updating")
}
func (p SyncPropagator) propagateItem(ctx context.Context, n *node.Node, sTime time.Time, sizeDiff int64, log zerolog.Logger) (*node.Node, bool, error) {
log.Debug().Msg("propagating")

if updateSyncTime {
// update the tree time of the parent node
attrs.SetString(prefixes.TreeMTimeAttr, sTime.UTC().Format(time.RFC3339Nano))
}
attrs := node.Attributes{}

var f *lockedfile.File
// lock parent before reading treesize or tree time

attrs.SetString(prefixes.TmpEtagAttr, "")
_, subspan := tracer.Start(ctx, "lockedfile.OpenFile")
parentFilename := p.lookup.MetadataBackend().LockfilePath(n.ParentPath())
f, err := lockedfile.OpenFile(parentFilename, os.O_RDWR|os.O_CREATE, 0600)
subspan.End()
if err != nil {
log.Error().Err(err).
Str("parent filename", parentFilename).
Msg("Propagation failed. Could not open metadata for parent with lock.")
return nil, true, err
}
// always log error if closing node fails
defer func() {
// ignore already closed error
cerr := f.Close()
if err == nil && cerr != nil && !errors.Is(cerr, os.ErrClosed) {
err = cerr // only overwrite err with en error from close if the former was nil
}
_ = os.Remove(f.Name())
}()

// size accounting
if p.treeSizeAccounting && sizeDiff != 0 {
var newSize uint64

// read treesize
treeSize, err := n.GetTreeSize(ctx)
switch {
case metadata.IsAttrUnset(err):
// fallback to calculating the treesize
sublog.Warn().Msg("treesize attribute unset, falling back to calculating the treesize")
newSize, err = calculateTreeSize(ctx, p.lookup, n.InternalPath())
if err != nil {
return err
}
case err != nil:
sublog.Error().Err(err).
Msg("Faild to propagate treesize change. Error when reading the treesize attribute from parent")
return err
case sizeDiff > 0:
newSize = treeSize + uint64(sizeDiff)
case uint64(-sizeDiff) > treeSize:
// The sizeDiff is larger than the current treesize. Which would result in
// a negative new treesize. Something must have gone wrong with the accounting.
// Reset the current treesize to 0.
sublog.Error().Uint64("treeSize", treeSize).Int64("sizeDiff", sizeDiff).
Msg("Error when updating treesize of parent node. Updated treesize < 0. Reestting to 0")
newSize = 0
default:
newSize = treeSize - uint64(-sizeDiff)
}
if n, err = n.Parent(ctx); err != nil {
log.Error().Err(err).
Msg("Propagation failed. Could not read parent node.")
return n, true, err
}

if !n.HasPropagation(ctx) {
log.Debug().Str("attr", prefixes.PropagationAttr).Msg("propagation attribute not set or unreadable, not propagating")
// if the attribute is not set treat it as false / none / no propagation
return n, true, nil
}

// update the tree size of the node
attrs.SetString(prefixes.TreesizeAttr, strconv.FormatUint(newSize, 10))
sublog.Debug().Uint64("newSize", newSize).Msg("updated treesize of parent node")
log = log.With().Str("spaceid", n.SpaceID).Str("nodeid", n.ID).Logger()

if p.treeTimeAccounting {
// update the parent tree time if it is older than the nodes mtime
updateSyncTime := false

var tmTime time.Time
tmTime, err = n.GetTMTime(ctx)
switch {
case err != nil:
// missing attribute, or invalid format, overwrite
log.Debug().Err(err).
Msg("could not read tmtime attribute, overwriting")
updateSyncTime = true
case tmTime.Before(sTime):
log.Debug().
Time("tmtime", tmTime).
Time("stime", sTime).
Msg("parent tmtime is older than node mtime, updating")
updateSyncTime = true
default:
log.Debug().
Time("tmtime", tmTime).
Time("stime", sTime).
Dur("delta", sTime.Sub(tmTime)).
Msg("parent tmtime is younger than node mtime, not updating")
}

if err = n.SetXattrsWithContext(ctx, attrs, false); err != nil {
sublog.Error().Err(err).Msg("Failed to update extend attributes of parent node")
return err
if updateSyncTime {
// update the tree time of the parent node
attrs.SetString(prefixes.TreeMTimeAttr, sTime.UTC().Format(time.RFC3339Nano))
}

// Release node lock early, ignore already closed error
_, subspan = tracer.Start(ctx, "f.Close")
cerr := f.Close()
subspan.End()
if cerr != nil && !errors.Is(cerr, os.ErrClosed) {
sublog.Error().Err(cerr).Msg("Failed to close parent node and release lock")
return cerr
attrs.SetString(prefixes.TmpEtagAttr, "")
}

// size accounting
if p.treeSizeAccounting && sizeDiff != 0 {
var newSize uint64

// read treesize
treeSize, err := n.GetTreeSize(ctx)
switch {
case metadata.IsAttrUnset(err):
// fallback to calculating the treesize
log.Warn().Msg("treesize attribute unset, falling back to calculating the treesize")
newSize, err = calculateTreeSize(ctx, p.lookup, n.InternalPath())
if err != nil {
return n, true, err
}
case err != nil:
log.Error().Err(err).
Msg("Faild to propagate treesize change. Error when reading the treesize attribute from parent")
return n, true, err
case sizeDiff > 0:
newSize = treeSize + uint64(sizeDiff)
case uint64(-sizeDiff) > treeSize:
// The sizeDiff is larger than the current treesize. Which would result in
// a negative new treesize. Something must have gone wrong with the accounting.
// Reset the current treesize to 0.
log.Error().Uint64("treeSize", treeSize).Int64("sizeDiff", sizeDiff).
Msg("Error when updating treesize of parent node. Updated treesize < 0. Reestting to 0")
newSize = 0
default:
newSize = treeSize - uint64(-sizeDiff)
}

// update the tree size of the node
attrs.SetString(prefixes.TreesizeAttr, strconv.FormatUint(newSize, 10))
log.Debug().Uint64("newSize", newSize).Msg("updated treesize of parent node")
}
if err != nil {
sublog.Error().Err(err).Msg("error propagating")
return err

if err = n.SetXattrsWithContext(ctx, attrs, false); err != nil {
log.Error().Err(err).Msg("Failed to update extend attributes of parent node")
return n, true, err
}
return nil

return n, false, nil
}

0 comments on commit 8a3bfd5

Please sign in to comment.