Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decomposedfs fix revision download #3473

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
fced7bd
rewrite finish upload to get atomic size diff
butonic Nov 17, 2022
c58f166
decomposedfs: make finish upload atomic
butonic Nov 17, 2022
150fe7d
add lock functions
butonic Nov 17, 2022
4dae4b6
allow locking non existing files, fix locking with existing lock
butonic Nov 17, 2022
3d0977a
make returned error recognizable
butonic Nov 17, 2022
11fc7c4
more lock fixes
butonic Nov 17, 2022
47f29a1
do not log nil error
butonic Nov 18, 2022
bf30d98
don't overwrite original error when deleting the blob fails
butonic Nov 18, 2022
786d0b3
always release node lock
butonic Nov 18, 2022
b955128
keep correct mtimes
butonic Nov 18, 2022
3db9243
fix adler checksum
butonic Nov 18, 2022
38c1d93
stat before closing
butonic Nov 18, 2022
cb990cd
fix lint ... and proper revision download is not covered by the CS3 api
butonic Nov 18, 2022
57aaca2
fix permissions when downloading grants
butonic Nov 18, 2022
82419a3
update changelog
butonic Nov 18, 2022
0686a7c
fix locks and revision restore
butonic Nov 18, 2022
b162894
assemble permissions on the node when checking a revision
butonic Nov 18, 2022
d9534ef
fix typos
butonic Nov 18, 2022
36b8a84
allow revision download when user has initiate download and list revi…
butonic Nov 18, 2022
c90d8c4
fix reading revision node
butonic Nov 21, 2022
8679103
do not forget revision delimiter
butonic Nov 21, 2022
f2d29bb
drop old revision
butonic Nov 21, 2022
2401c1f
remove unexpected failures
butonic Nov 21, 2022
19ee0cc
update changelog and unexpected passes
butonic Nov 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changelog/unreleased/decomposedfs-finish-upload-rewrite.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Bugfix: decomposedfs fix revision download

We rewrote the finish upload code to use a write lock when creating and updating node metadata. This prevents some cornercases, allows us to calculate the size diff atomically and fixes downloading revisions.

https://github.com/cs3org/reva/pull/3473
https://github.com/owncloud/ocis/issues/765
https://github.com/owncloud/ocis/issues/3868
10 changes: 10 additions & 0 deletions internal/http/services/owncloud/ocdav/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"net/http"
"path"
"strings"

rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
Expand Down Expand Up @@ -70,6 +71,15 @@ func (h *MetaHandler) Handler(s *svc) http.Handler {
errors.HandleWebdavError(logger, w, b, err)
return
}
if did.StorageId == "" && did.OpaqueId == "" && strings.Count(id, ":") >= 2 {
logger := appctx.GetLogger(r.Context())
logger.Warn().Str("id", id).Msg("detected invalid : separated resourceid id, trying to split it ... but fix the client that made the request")
// try splitting with :
parts := strings.SplitN(id, ":", 3)
did.StorageId = parts[0]
did.SpaceId = parts[1]
did.OpaqueId = parts[2]
}

var head string
head, r.URL.Path = router.ShiftPath(r.URL.Path)
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"path"
"path/filepath"
"strconv"
"strings"
"syscall"

cs3permissions "github.com/cs3org/go-cs3apis/cs3/permissions/v1beta1"
Expand Down Expand Up @@ -591,6 +592,12 @@ func (fs *Decomposedfs) Delete(ctx context.Context, ref *provider.Reference) (er

// Download returns a reader to the specified resource
func (fs *Decomposedfs) Download(ctx context.Context, ref *provider.Reference) (io.ReadCloser, error) {
// check if we are trying to download a revision
// TODO the CS3 api should allow initiating a revision download
if ref.ResourceId != nil && strings.Contains(ref.ResourceId.OpaqueId, node.RevisionIDDelimiter) {
return fs.DownloadRevision(ctx, ref, ref.ResourceId.OpaqueId)
}

node, err := fs.lu.NodeFromResource(ctx, ref)
if err != nil {
return nil, errors.Wrap(err, "Decomposedfs: error resolving ref")
Expand Down
6 changes: 2 additions & 4 deletions pkg/storage/utils/decomposedfs/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.
if md.Metadata != nil {
if val, ok := md.Metadata["mtime"]; ok {
delete(md.Metadata, "mtime")
err := n.SetMtime(ctx, val)
if err != nil {
if err := n.SetMtimeString(val); err != nil {
errs = append(errs, errors.Wrap(err, "could not set mtime"))
}
}
Expand All @@ -85,8 +84,7 @@ func (fs *Decomposedfs) SetArbitraryMetadata(ctx context.Context, ref *provider.
// TODO unset when folder is updated or add timestamp to etag?
if val, ok := md.Metadata["etag"]; ok {
delete(md.Metadata, "etag")
err := n.SetEtag(ctx, val)
if err != nil {
if err := n.SetEtag(ctx, val); err != nil {
errs = append(errs, errors.Wrap(err, "could not set etag"))
}
}
Expand Down
62 changes: 42 additions & 20 deletions pkg/storage/utils/decomposedfs/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,19 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
return r, nil
}

// are we reading a revision?
revisionSuffix := ""
if strings.Contains(nodeID, RevisionIDDelimiter) {
// verify revision key format
kp := strings.SplitN(nodeID, RevisionIDDelimiter, 2)
if len(kp) == 2 {
// use the actual node for the metadata lookup
nodeID = kp[0]
// remember revision for blob metadata
revisionSuffix = RevisionIDDelimiter + kp[1]
}
}

// read node
n = &Node{
SpaceID: spaceID,
Expand All @@ -223,6 +236,11 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
SpaceRoot: r,
}

// append back revision to nodeid, even when returning a not existing node
defer func() {
n.ID += revisionSuffix
}()

nodePath := n.InternalPath()

// lookup name in extended attributes
Expand All @@ -237,7 +255,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
n.Exists = true

// lookup blobID in extended attributes
n.BlobID, err = n.Xattr(xattrs.BlobIDAttr)
n.BlobID, err = ReadBlobIDAttr(nodePath + revisionSuffix)
switch {
case xattrs.IsNotExist(err):
return n, nil // swallow not found, the node defaults to exists = false
Expand All @@ -246,7 +264,7 @@ func ReadNode(ctx context.Context, lu PathLookup, spaceID, nodeID string, canLis
}

// Lookup blobsize
n.Blobsize, err = ReadBlobSizeAttr(nodePath)
n.Blobsize, err = ReadBlobSizeAttr(nodePath + revisionSuffix)
switch {
case xattrs.IsNotExist(err):
return n, nil // swallow not found, the node defaults to exists = false
Expand Down Expand Up @@ -495,25 +513,20 @@ func calculateEtag(nodeID string, tmTime time.Time) (string, error) {
return fmt.Sprintf(`"%x"`, h.Sum(nil)), nil
}

// SetMtime sets the mtime and atime of a node
func (n *Node) SetMtime(ctx context.Context, mtime string) error {
sublog := appctx.GetLogger(ctx).With().Interface("node", n).Logger()
if mt, err := parseMTime(mtime); err == nil {
nodePath := n.InternalPath()
// updating mtime also updates atime
if err := os.Chtimes(nodePath, mt, mt); err != nil {
sublog.Error().Err(err).
Time("mtime", mt).
Msg("could not set mtime")
return errors.Wrap(err, "could not set mtime")
}
} else {
sublog.Error().Err(err).
Str("mtime", mtime).
Msg("could not parse mtime")
return errors.Wrap(err, "could not parse mtime")
// SetMtimeString sets the mtime and atime of a node to the unixtime parsed from the given string
func (n *Node) SetMtimeString(mtime string) error {
mt, err := parseMTime(mtime)
if err != nil {
return err
}
return nil
return n.SetMtime(mt)
}

// SetMtime sets the mtime and atime of a node
func (n *Node) SetMtime(mtime time.Time) error {
nodePath := n.InternalPath()
// updating mtime also updates atime
return os.Chtimes(nodePath, mtime, mtime)
}

// SetEtag sets the temporary etag of a node if it differs from the current etag
Expand Down Expand Up @@ -929,6 +942,15 @@ func (n *Node) SetTreeSize(ts uint64) (err error) {
return n.SetXattr(xattrs.TreesizeAttr, strconv.FormatUint(ts, 10))
}

// GetBlobSize reads the blobsize from the extended attributes
func (n *Node) GetBlobSize() (treesize uint64, err error) {
var b string
if b, err = n.Xattr(xattrs.TreesizeAttr); err != nil {
return
}
return strconv.ParseUint(b, 10, 64)
}

// SetChecksum writes the checksum with the given checksum type to the extended attributes
func (n *Node) SetChecksum(csType string, h hash.Hash) (err error) {
return n.SetXattr(xattrs.ChecksumPrefix+csType, string(h.Sum(nil)))
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/utils/decomposedfs/node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,18 @@ var _ = Describe("Node", func() {
n, err := env.Lookup.NodeFromResource(env.Ctx, ref)
Expect(err).ToNot(HaveOccurred())

blobsize := 239485734
blobsize := int64(239485734)
n.Name = "TestName"
n.BlobID = "TestBlobID"
n.Blobsize = int64(blobsize)
n.Blobsize = blobsize

err = n.WriteAllNodeMetadata()
Expect(err).ToNot(HaveOccurred())
n2, err := env.Lookup.NodeFromResource(env.Ctx, ref)
Expect(err).ToNot(HaveOccurred())
Expect(n2.Name).To(Equal("TestName"))
Expect(n2.BlobID).To(Equal("TestBlobID"))
Expect(n2.Blobsize).To(Equal(int64(blobsize)))
Expect(n2.Blobsize).To(Equal(blobsize))
})
})

Expand Down
13 changes: 13 additions & 0 deletions pkg/storage/utils/decomposedfs/node/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ package node

import (
"context"
"strings"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/v2/pkg/appctx"
ctxpkg "github.com/cs3org/reva/v2/pkg/ctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/utils"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -91,6 +93,17 @@ func (p *Permissions) AssemblePermissions(ctx context.Context, n *Node) (ap prov
return NoPermissions(), nil
}

// are we reading a revision?
if strings.Contains(n.ID, RevisionIDDelimiter) {
// verify revision key format
kp := strings.SplitN(n.ID, RevisionIDDelimiter, 2)
if len(kp) != 2 {
return NoPermissions(), errtypes.NotFound(n.ID)
}
// use the actual node for the permission assembly
n.ID = kp[0]
}

// check if the current user is the owner
if utils.UserIDEqual(u.Id, n.Owner()) {
return OwnerPermissions(), nil
Expand Down
16 changes: 15 additions & 1 deletion pkg/storage/utils/decomposedfs/node/xattrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package node

import (
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs"
"github.com/gofrs/flock"
"github.com/pkg/xattr"
)

Expand All @@ -34,6 +35,18 @@ func (n *Node) SetXattrs(attribs map[string]string) (err error) {
return xattrs.SetMultiple(n.InternalPath(), attribs)
}

// SetXattrsWithLock sets multiple extended attributes on the write-through cache/node with a given lock
func (n *Node) SetXattrsWithLock(attribs map[string]string, fileLock *flock.Flock) (err error) {
// TODO what if writing the lock fails?
if n.xattrsCache != nil {
for k, v := range attribs {
n.xattrsCache[k] = v
}
}

return xattrs.SetMultipleWithLock(n.InternalPath(), attribs, fileLock)
}

// SetXattr sets an extended attribute on the write-through cache/node
func (n *Node) SetXattr(key, val string) (err error) {
if n.xattrsCache != nil {
Expand Down Expand Up @@ -80,5 +93,6 @@ func (n *Node) Xattr(key string) (string, error) {
if val, ok := n.xattrsCache[key]; ok {
return val, nil
}
return "", xattr.ENOATTR
// wrap the error as xattr does
return "", &xattr.Error{Op: "xattr.get", Path: n.InternalPath(), Name: key, Err: xattr.ENOATTR}
}
66 changes: 52 additions & 14 deletions pkg/storage/utils/decomposedfs/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ package decomposedfs
import (
"context"
"io"
iofs "io/fs"
"os"
"path/filepath"
"strings"
Expand All @@ -31,6 +30,7 @@ import (
"github.com/cs3org/reva/v2/pkg/appctx"
"github.com/cs3org/reva/v2/pkg/errtypes"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node"
"github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/xattrs"
"github.com/cs3org/reva/v2/pkg/storagespace"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -83,7 +83,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen
}
blobSize, err := node.ReadBlobSizeAttr(items[i])
if err != nil {
return nil, errors.Wrapf(err, "error reading blobsize xattr")
appctx.GetLogger(ctx).Error().Err(err).Str("name", fi.Name()).Msg("error reading blobsize xattr, using 0")
}
rev.Size = uint64(blobSize)
etag, err := node.CalculateEtag(np, mtime)
Expand All @@ -99,6 +99,7 @@ func (fs *Decomposedfs) ListRevisions(ctx context.Context, ref *provider.Referen
}

// DownloadRevision returns a reader for the specified revision
// FIXME the CS3 api should explicitly allow initiating revision and trash download, a related issue is https://github.com/cs3org/reva/issues/1813
func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Reference, revisionKey string) (io.ReadCloser, error) {
log := appctx.GetLogger(ctx)

Expand All @@ -125,7 +126,7 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe
switch {
case err != nil:
return nil, errtypes.InternalError(err.Error())
case !rp.ListFileVersions || !rp.RestoreFileVersion || !rp.InitiateFileDownload: // TODO add explicit permission in the CS3 api?
case !rp.ListFileVersions || !rp.InitiateFileDownload: // TODO add explicit permission in the CS3 api?
f, _ := storagespace.FormatReference(ref)
if rp.Stat {
return nil, errtypes.PermissionDenied(f)
Expand All @@ -135,14 +136,18 @@ func (fs *Decomposedfs) DownloadRevision(ctx context.Context, ref *provider.Refe

contentPath := fs.lu.InternalPath(spaceID, revisionKey)

r, err := os.Open(contentPath)
blobid, err := node.ReadBlobIDAttr(contentPath)
if err != nil {
if errors.Is(err, iofs.ErrNotExist) {
return nil, errtypes.NotFound(contentPath)
}
return nil, errors.Wrap(err, "Decomposedfs: error opening revision "+revisionKey)
return nil, errors.Wrapf(err, "Decomposedfs: could not read blob id of revision '%s' for node '%s'", n.ID, revisionKey)
}
return r, nil

revisionNode := node.Node{SpaceID: spaceID, BlobID: blobid}

reader, err := fs.tp.ReadBlob(&revisionNode)
if err != nil {
return nil, errors.Wrapf(err, "Decomposedfs: could not download blob of revision '%s' for node '%s'", n.ID, revisionKey)
}
return reader, nil
}

// RestoreRevision restores the specified revision of the resource
Expand Down Expand Up @@ -194,17 +199,50 @@ func (fs *Decomposedfs) RestoreRevision(ctx context.Context, ref *provider.Refer
// versions are stored alongside the actual file, so a rename can be efficient and does not cross storage / partition boundaries
versionsPath := fs.lu.InternalPath(spaceID, kp[0]+node.RevisionIDDelimiter+fi.ModTime().UTC().Format(time.RFC3339Nano))

err = os.Rename(nodePath, versionsPath)
// touch version node
if file, err := os.Create(versionsPath); err != nil {
return err
} else if err := file.Close(); err != nil {
return err
}

// copy blob metadata to version node
err = xattrs.CopyMetadata(nodePath, versionsPath, func(attributeName string) bool {
return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) || // for checksums
attributeName == xattrs.BlobIDAttr ||
attributeName == xattrs.BlobsizeAttr
})
if err != nil {
return
return errtypes.InternalError("failed to copy blob xattrs to version node")
}

// copy old revision to current location
// keep mtime from previous version
if err := os.Chtimes(versionsPath, fi.ModTime(), fi.ModTime()); err != nil {
return errtypes.InternalError("failed to change mtime of version node")
}

// update blob id in node

// copy blob metadata from revision to node
revisionPath := fs.lu.InternalPath(spaceID, revisionKey)
err = xattrs.CopyMetadata(revisionPath, nodePath, func(attributeName string) bool {
return strings.HasPrefix(attributeName, xattrs.ChecksumPrefix) ||
attributeName == xattrs.BlobIDAttr ||
attributeName == xattrs.BlobsizeAttr
})
if err != nil {
return errtypes.InternalError("failed to copy blob xattrs to version node")
}

// drop old revision
if err := os.Remove(revisionPath); err != nil {
log.Warn().Err(err).Interface("ref", ref).Str("originalnode", kp[0]).Str("revisionKey", revisionKey).Msg("could not delete old revision, continuing")
}

if err = os.Rename(revisionPath, nodePath); err != nil {
return
// explicitly update mtime of node as writing xattrs does not change mtime
now := time.Now()
if err := os.Chtimes(nodePath, now, now); err != nil {
return errtypes.InternalError("failed to change mtime of version node")
}

return fs.tp.Propagate(ctx, n)
Expand Down
Loading