Skip to content

Commit

Permalink
Support file operations for FUSE
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex committed Nov 3, 2022
1 parent c952df9 commit c5df57a
Show file tree
Hide file tree
Showing 6 changed files with 376 additions and 20 deletions.
7 changes: 7 additions & 0 deletions pkg/dataplane/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,11 @@ type Container interface {

// PutOOSObjectSync
PutOOSObjectSync(*PutOOSObjectInput) error

GetFileAttributesSync(*GetFileAttributesInput, *GetFileAttributesOutput) error
OpenFileSync(*OpenFileInput, *OpenFileOutput) error
CloseFileSync(*CloseFileInput) error
TruncateFileSync(*TruncateFileInput) error
SymlinkSync(*SymlinkInput) error
GetWorkerDedicatedPortsSync(*DataPlaneInput) ([]string, error)
}
31 changes: 30 additions & 1 deletion pkg/dataplane/http/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ such restriction.
package v3iohttp

import (
"github.com/v3io/v3io-go/pkg/dataplane"
v3io "github.com/v3io/v3io-go/pkg/dataplane"

"github.com/nuclio/logger"
)
Expand Down Expand Up @@ -314,3 +314,32 @@ func (c *container) PutOOSObjectSync(putOOSObjectInput *v3io.PutOOSObjectInput)
c.populateInputFields(&putOOSObjectInput.DataPlaneInput)
return c.session.context.PutOOSObjectSync(putOOSObjectInput)
}

func (c *container) GetFileAttributesSync(input *v3io.GetFileAttributesInput, out *v3io.GetFileAttributesOutput) error {
c.populateInputFields(&input.DataPlaneInput)
return c.session.context.GetFileAttributesSync(input, out)
}

func (c *container) OpenFileSync(input *v3io.OpenFileInput, out *v3io.OpenFileOutput) error {
c.populateInputFields(&input.DataPlaneInput)
return c.session.context.OpenFileSync(input, out)
}

func (c *container) CloseFileSync(input *v3io.CloseFileInput) error {
c.populateInputFields(&input.DataPlaneInput)
return c.session.context.CloseFileSync(input)
}

func (c *container) TruncateFileSync(input *v3io.TruncateFileInput) error {
c.populateInputFields(&input.DataPlaneInput)
return c.session.context.TruncateFileSync(input)
}

func (c *container) SymlinkSync(input *v3io.SymlinkInput) error {
c.populateInputFields(&input.DataPlaneInput)
return c.session.context.SymlinkSync(input)
}

func (c *container) GetWorkerDedicatedPortsSync(in *v3io.DataPlaneInput) ([]string, error) {
return c.session.context.GetWorkerDedicatedPortsSync(in)
}
249 changes: 238 additions & 11 deletions pkg/dataplane/http/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ import (
"encoding/xml"
"fmt"
"io"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"path"
"reflect"
"regexp"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"

v3io "github.com/v3io/v3io-go/pkg/dataplane"
Expand Down Expand Up @@ -541,7 +544,10 @@ func (c *context) GetObjectSync(getObjectInput *v3io.GetObjectInput) (*v3io.Resp
// Range header is inclusive in both 'start' and 'end', thus reducing 1
headers["Range"] = fmt.Sprintf("bytes=%v-%v", getObjectInput.Offset, getObjectInput.Offset+getObjectInput.NumBytes-1)
}

if getObjectInput.Handle.Fh != "" {
headers["x-v3io-handle"] = getObjectInput.Handle.Fh
getObjectInput.DataPlaneInput.URLAlternativePorts = []string{getObjectInput.Handle.URLPort}
}
if getObjectInput.CtimeSec > 0 {
if headers == nil {
headers = make(map[string]string)
Expand Down Expand Up @@ -569,12 +575,22 @@ func (c *context) PutObject(putObjectInput *v3io.PutObjectInput,
// PutObjectSync
func (c *context) PutObjectSync(putObjectInput *v3io.PutObjectInput) error {

var headers map[string]string
headers := make(map[string]string)
if putObjectInput.Offset != 0 {
headers["Range"] = strconv.Itoa(putObjectInput.Offset)
}
if putObjectInput.Append {
headers = make(map[string]string)
headers["Range"] = "-1"
}

if putObjectInput.Mode != 0 {
headers["fs-open-mode"] = fmt.Sprintf("%d", putObjectInput.Mode)
}
if putObjectInput.Handle.Fh != "" {
headers["x-v3io-handle"] = putObjectInput.Handle.Fh
putObjectInput.DataPlaneInput.URLAlternativePorts = []string{putObjectInput.Handle.URLPort}
}

_, err := c.sendRequest(&putObjectInput.DataPlaneInput,
http.MethodPut,
putObjectInput.Path,
Expand All @@ -589,7 +605,12 @@ func (c *context) PutObjectSync(putObjectInput *v3io.PutObjectInput) error {
// UpdateObjectSync
func (c *context) UpdateObjectSync(updateObjectInput *v3io.UpdateObjectInput) error {
headers := map[string]string{
"X-v3io-function": "DirSetAttr",
"X-v3io-function": "DirSetAttrs",
}

if updateObjectInput.Handle.Fh != "" {
headers["x-v3io-handle"] = updateObjectInput.Handle.Fh
updateObjectInput.DataPlaneInput.URLAlternativePorts = []string{updateObjectInput.Handle.URLPort}
}

marshaledDirAttributes, err := json.Marshal(updateObjectInput.DirAttributes)
Expand Down Expand Up @@ -1064,7 +1085,7 @@ func (c *context) sendRequest(dataPlaneInput *v3io.DataPlaneInput,
request := fasthttp.AcquireRequest()
response := c.allocateResponse()

uri, err := c.buildRequestURI(dataPlaneInput.URL, dataPlaneInput.ContainerName, query, path)
uri, err := c.buildRequestURI(dataPlaneInput.URL, dataPlaneInput.URLAlternativePorts, dataPlaneInput.ContainerName, query, path)
if err != nil {
return nil, err
}
Expand All @@ -1079,9 +1100,12 @@ func (c *context) sendRequest(dataPlaneInput *v3io.DataPlaneInput,
if len(dataPlaneInput.AuthenticationToken) > 0 {
request.Header.Set("Authorization", dataPlaneInput.AuthenticationToken)
}

if len(dataPlaneInput.AccessKey) > 0 {
request.Header.Set("X-v3io-session-key", dataPlaneInput.AccessKey)
if strings.Contains(dataPlaneInput.AccessKey, "Basic") {
request.Header.Set("Authorization", dataPlaneInput.AccessKey)
} else {
if len(dataPlaneInput.AccessKey) > 0 {
request.Header.Set("X-v3io-session-key", dataPlaneInput.AccessKey)
}
}

for headerName, headerValue := range headers {
Expand Down Expand Up @@ -1144,13 +1168,25 @@ func (c *context) sendRequest(dataPlaneInput *v3io.DataPlaneInput,
sanitizedRequest := re.ReplaceAllString(request.String(), "X-V3io-Session-Key: SANITIZED")
_err := fmt.Errorf("Expected a 2xx response status code: %s\nRequest details:\n%s",
response.HTTPResponse.String(), sanitizedRequest)
responseBodyParsed := struct {
ErrorCode int
}{}
var errnoCode int
if json.Unmarshal(response.Body(), &responseBodyParsed) == nil {
errnoCode = -responseBodyParsed.ErrorCode
if errnoCode == 0 || errnoCode > 300 {
errnoCode = int(syscall.EINVAL)
}
} else {
errnoCode = int(syscall.EINVAL)
}

// Include response in error only if caller has requested it
// Otherwise it will be released automatically
if dataPlaneInput.IncludeResponseInError {
err = v3ioerrors.NewErrorWithStatusCodeAndResponse(_err, statusCode, response)
err = v3ioerrors.NewErrorWithStatusCodeAndResponse(_err, statusCode, errnoCode, response)
} else {
err = v3ioerrors.NewErrorWithStatusCode(_err, statusCode)
err = v3ioerrors.NewErrorWithStatusCode(_err, statusCode, errnoCode)
}

goto cleanup
Expand Down Expand Up @@ -1178,7 +1214,7 @@ cleanup:
return response, nil
}

func (c *context) buildRequestURI(urlString string, containerName string, query string, pathStr string) (*url.URL, error) {
func (c *context) buildRequestURI(urlString string, alternativePorts []string, containerName string, query string, pathStr string) (*url.URL, error) {
uri, err := url.Parse(urlString)
if err != nil {
return nil, errors.Wrapf(err, "Failed to parse cluster endpoint URL %s", urlString)
Expand All @@ -1188,6 +1224,11 @@ func (c *context) buildRequestURI(urlString string, containerName string, query
uri.Path += "/" // retain trailing slash
}
uri.RawQuery = strings.Replace(query, " ", "%20", -1)
if len(alternativePorts) != 0 {
alternativePort := alternativePorts[rand.Intn(len(alternativePorts))]
uri.Host = uri.Hostname() + ":" + alternativePort
}

return uri, nil
}

Expand Down Expand Up @@ -1706,6 +1747,26 @@ func trimAndParseInt(str string) (int, error) {
return strconv.Atoi(trimmed)
}

func trimAndParseUInt(str string) (uint64, error) {
trimmed := strings.TrimSpace(str)
return strconv.ParseUint(trimmed, 10, 64)
}

func trimAndParsePairUInt(str string) (uint64, uint64, error) {
var err error
var firstInt, secondInt uint64
parts := strings.Split(str, ",")
firstInt, err = trimAndParseUInt(parts[0])
if err != nil {
return 0, 0, err
}
secondInt, err = trimAndParseUInt(parts[1])
if err != nil {
return 0, 0, err
}
return firstInt, secondInt, nil
}

// PutOOSObject
func (c *context) PutOOSObject(putOOSObjectInput *v3io.PutOOSObjectInput,
context interface{},
Expand Down Expand Up @@ -1755,3 +1816,169 @@ func (c *context) PutOOSObjectSync(putOOSObjectInput *v3io.PutOOSObjectInput) er

return err
}

// checkPathExistsSync
func (c *context) GetFileAttributesSync(getAttrInput *v3io.GetFileAttributesInput, out *v3io.GetFileAttributesOutput) error {
var headers map[string]string
if getAttrInput.Handle.Fh != "" {
headers = map[string]string{
"x-v3io-handle": getAttrInput.Handle.Fh,
}
getAttrInput.DataPlaneInput.URLAlternativePorts = []string{getAttrInput.Handle.URLPort}

}
response, err := c.sendRequest(&getAttrInput.DataPlaneInput,
http.MethodHead,
getAttrInput.Path,
"",
headers,
nil,
false)

if err == nil {
out.Ino, _ = trimAndParseUInt(string(response.HeaderPeek("x-v3io-inode")))
out.Size, _ = trimAndParseUInt(string(response.HeaderPeek("Content-Length")))
out.Atime, out.Atimensec, _ = trimAndParsePairUInt(string(response.HeaderPeek("x-v3io-atime")))
out.Ctime, out.Ctimensec, _ = trimAndParsePairUInt(string(response.HeaderPeek("x-v3io-ctime")))
out.Mtime, out.Mtimensec, _ = trimAndParsePairUInt(string(response.HeaderPeek("x-v3io-mtime")))
out.Mode, _ = trimAndParseUInt(string(response.HeaderPeek("x-v3io-mode")))
out.OwnerUid, _ = trimAndParseUInt(string(response.HeaderPeek("x-v3io-uid")))
out.OwnerGid, _ = trimAndParseUInt(string(response.HeaderPeek("x-v3io-gid")))
response.Release()
}
return err
}

func (c *context) OpenFileSync(input *v3io.OpenFileInput, out *v3io.OpenFileOutput) error {
openFlags := 0
if (int(input.Flags) & 0xF) == os.O_RDONLY {
openFlags = 1
}
if (int(input.Flags) & 0xF) == os.O_WRONLY {
openFlags = 2
}
if (int(input.Flags) & 0xF) == os.O_RDWR {
openFlags = 3
}
if input.Flags&uint32(os.O_TRUNC) != 0 {
openFlags |= 4 // Truncate
}

createFlags := 0 // No creation
if (input.Flags & uint32(os.O_CREATE)) != 0 {
if (input.Flags & uint32(os.O_EXCL)) != 0 {
createFlags = 1
} else {
createFlags = 2
}
if openFlags < 2 {
openFlags = 2
}
}

headers := map[string]string{
"Content-Type": OpenFileHeaders["Content-Type"],
"X-v3io-function": OpenFileHeaders["X-v3io-function"],
"fs-open-mode": fmt.Sprintf("%d", input.Mode),
"fs-open-flags": fmt.Sprintf("%d", openFlags),
"fs-creation": fmt.Sprintf("%d", createFlags),
}
var alternativePort string
if len(input.DataPlaneInput.URLAlternativePorts) != 0 {
alternativePort = input.DataPlaneInput.URLAlternativePorts[rand.Intn(len(input.DataPlaneInput.URLAlternativePorts))]
input.DataPlaneInput.URLAlternativePorts = []string{alternativePort}
}

response, err := c.sendRequest(&input.DataPlaneInput,
http.MethodPut,
input.Path,
"",
headers,
nil,
false)
if err == nil {
out.Handle = v3io.FileHandle{
Fh: string(response.HeaderPeek("x-v3io-handle")),
URLPort: alternativePort,
}
response.Release()
}
return err
}

func (c *context) CloseFileSync(input *v3io.CloseFileInput) error {
headers := map[string]string{
"x-v3io-handle": input.Handle.Fh,
"Content-Type": CloseFileHeaders["Content-Type"],
"X-v3io-function": CloseFileHeaders["X-v3io-function"],
}
input.DataPlaneInput.URLAlternativePorts = []string{input.Handle.URLPort}

_, err := c.sendRequest(&input.DataPlaneInput,
http.MethodPut,
input.Handle.Fh,
"",
headers,
nil,
true)

return err
}

func (c *context) TruncateFileSync(input *v3io.TruncateFileInput) error {
headers := map[string]string{
"x-v3io-handle": input.Handle.Fh,
"Content-Type": TruncateFileHeaders["Content-Type"],
"X-v3io-function": TruncateFileHeaders["X-v3io-function"],
"range": fmt.Sprintf("%v", input.Size),
}
input.DataPlaneInput.URLAlternativePorts = []string{input.Handle.URLPort}

_, err := c.sendRequest(&input.DataPlaneInput,
http.MethodPut,
input.Handle.Fh,
"",
headers,
nil,
true)
return err
}

func (c *context) SymlinkSync(input *v3io.SymlinkInput) error {
headers := map[string]string{
"Content-Type": SymlinkHeaders["Content-Type"],
"X-v3io-function": SymlinkHeaders["X-v3io-function"],
}
_, err := c.sendRequest(&input.DataPlaneInput,
http.MethodPut,
input.Path,
"",
headers,
[]byte(input.TargetPath),
true)
return err
}

func (c *context) GetWorkerDedicatedPortsSync(in *v3io.DataPlaneInput) ([]string, error) {
response, err := c.sendRequest(in,
http.MethodPut,
"",
"",
nil,
nil,
false)
if err == nil {
responseBodyParsed := struct {
DedicatedWorkerPorts []string
}{}
err = json.Unmarshal(response.Body(), &responseBodyParsed)
response.Release()
if err == nil {
return responseBodyParsed.DedicatedWorkerPorts, nil
} else {
return nil, err
}
}
return nil, err

}
Loading

0 comments on commit c5df57a

Please sign in to comment.