Skip to content

Commit

Permalink
Add support for executing agent-local workloads
Browse files Browse the repository at this point in the history
Workload deploy location can be specified with file:// or nats://
schemes. File-based deploy locations are treated as "agent-local"
workload deploy requests, meaning the agent will attempt to resolve the
file as an executable installed on its rootfs. File-based workload
deploy requests can only be executed in sandbox mode.
  • Loading branch information
kthomas committed Aug 5, 2024
1 parent 9b94b23 commit 501ce3c
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 79 deletions.
87 changes: 63 additions & 24 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
runloopSleepInterval = 250 * time.Millisecond
runloopTickInterval = 2500 * time.Millisecond
workloadExecutionSleepTimeoutMillis = 50
workloadLocationSchemeFile = "file"
workloadLocationSchemeNATS = "nats"
)

// Agent facilitates communication between the nex agent running in the firecracker VM
Expand Down Expand Up @@ -171,33 +173,70 @@ func (a *Agent) Version() string {
return VERSION
}

// cacheExecutableArtifact uses the underlying agent configuration to fetch
// the executable workload artifact from the cache bucket, write it to a
// temporary file and make it executable; this method returns the full
// path to the cached artifact if successful
func (a *Agent) cacheExecutableArtifact(req *agentapi.AgentWorkloadInfo) (*string, error) {
fileName := fmt.Sprintf("workload-%s", *a.md.VmID)
tempFile := path.Join(os.TempDir(), fileName)
// resolveExecutableArtifact uses the underlying agent configuration and
// location specified in the workload deploy request to prepare the workload
// artifact for execution by the agent.
//
// in the case of the workload artifact being deployed from a nats:// location
// this method fetches the workload artifact from the object store bucket,
// writes it to a temporary file and makes it executable.
//
// in the case of the workload artifact being deployed from a file:// location,
// this method verifies that the specified file exists on the agent rootfs and
// is executable.
func (a *Agent) resolveExecutableArtifact(req *agentapi.AgentWorkloadInfo) (*string, error) {
var file *string

switch strings.ToLower(req.Location.Scheme) {
case workloadLocationSchemeFile:
if !isSandboxed() {
return nil, fmt.Errorf("attempted to execute agent-local workload artifact %s outside of sandbox ", req.Location.String())
}

if strings.EqualFold(runtime.GOOS, "windows") && req.WorkloadType == controlapi.NexWorkloadNative {
tempFile = fmt.Sprintf("%s.exe", tempFile)
}
path, _ := strings.CutPrefix(req.Location.String(), fmt.Sprintf("%s://", workloadLocationSchemeFile))

err := a.cacheBucket.GetFile(*a.md.VmID, tempFile)
if err != nil {
msg := fmt.Sprintf("Failed to get and write workload artifact to temp dir: %s", err)
a.submitLog(msg, slog.LevelError)
return nil, errors.New(msg)
}
fi, err := os.Stat(path)
if os.IsNotExist(err) {
return nil, fmt.Errorf("attempted to execute agent-local workload artifact %s; file does not exist: %s", path, err.Error())
}

err = os.Chmod(tempFile, 0777)
if err != nil {
msg := fmt.Sprintf("Failed to set workload artifact as executable: %s", err)
a.submitLog(msg, slog.LevelError)
return nil, errors.New(msg)
if err != nil {
return nil, fmt.Errorf("failed to stat agent-local workload artifact %s; %s", path, err.Error())
}

if fi.IsDir() {
return nil, fmt.Errorf("failed to execute agent-local workload artifact; %s is a directory", path)
} else if fi.Mode()&0111 == 0 {
return nil, fmt.Errorf("failed to execute agent-local workload artifact; %s is not executable", path)
}

file = &path
case workloadLocationSchemeNATS:
fileName := fmt.Sprintf("workload-%s", *a.md.VmID)
tempFile := path.Join(os.TempDir(), fileName)

if strings.EqualFold(runtime.GOOS, "windows") && req.WorkloadType == controlapi.NexWorkloadNative {
tempFile = fmt.Sprintf("%s.exe", tempFile)
}

err := a.cacheBucket.GetFile(*a.md.VmID, tempFile)
if err != nil {
msg := fmt.Sprintf("Failed to get and write workload artifact to temp dir: %s", err)
a.submitLog(msg, slog.LevelError)
return nil, errors.New(msg)
}

err = os.Chmod(tempFile, 0777)
if err != nil {
msg := fmt.Sprintf("Failed to set workload artifact as executable: %s", err)
a.submitLog(msg, slog.LevelError)
return nil, errors.New(msg)
}

file = &tempFile
}

return &tempFile, nil
return file, nil
}

// deleteExecutableArtifact deletes the installed workload executable
Expand Down Expand Up @@ -275,13 +314,13 @@ func (a *Agent) handleDeploy(m *nats.Msg) {
return
}

tmpFile, err := a.cacheExecutableArtifact(&info)
path, err := a.resolveExecutableArtifact(&info)
if err != nil {
_ = a.workAck(m, false, err.Error())
return
}

params, err := a.newExecutionProviderParams(&info, *tmpFile)
params, err := a.newExecutionProviderParams(&info, *path)
if err != nil {
_ = a.workAck(m, false, err.Error())
return
Expand Down
12 changes: 12 additions & 0 deletions control-api/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net/url"
"regexp"
"strings"
"time"

"github.com/google/uuid"
Expand All @@ -16,6 +17,9 @@ import (

const (
workloadRegex = `^[a-zA-Z0-9_-]+$`

WorkloadLocationSchemeFile = "file"
WorkloadLocationSchemeNATS = "nats"
)

type DeployRequest struct {
Expand Down Expand Up @@ -133,6 +137,14 @@ func (request *DeployRequest) Validate() (*jwt.GenericClaims, error) {
return nil, errors.New("artifact hash claim does not match request")
}

if request.Essential != nil && *request.Essential && request.WorkloadType != NexWorkloadNative {
return nil, errors.New("essential workloads must be native")
}

if !strings.EqualFold(request.Location.Scheme, WorkloadLocationSchemeFile) && !strings.EqualFold(request.Location.Scheme, WorkloadLocationSchemeNATS) {
return nil, errors.New("workload location scheme invalid")
}

var vr jwt.ValidationResults
claims.Validate(&vr)
if len(vr.Issues) > 0 || len(vr.Errors()) > 0 {
Expand Down
27 changes: 18 additions & 9 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ import (
)

const (
systemNamespace = "system"
agentPoolRetryMax = 100
autostartAgentRetryMax = 10
heartbeatInterval = 30 * time.Second
publicNATSServerStartTimeout = 50 * time.Millisecond
runloopSleepInterval = 100 * time.Millisecond
runloopTickInterval = 2500 * time.Millisecond
agentPoolRetryMax = 100
systemNamespace = "system"
)

// Nex node process
Expand Down Expand Up @@ -343,14 +344,16 @@ func (n *Node) handleAutostarts() {
time.Sleep(50 * time.Millisecond)

retry += 1
if retry > agentPoolRetryMax {
n.log.Error("Exceeded warm agent retrieval retry count, terminating node",
slog.Int("allowed_retries", agentPoolRetryMax),
if retry > autostartAgentRetryMax {
n.log.Error("Exceeded warm agent retrieval retry count during attempted autostart; terminating node",
slog.Int("allowed_retries", autostartAgentRetryMax),
)

n.shutdown()
return
}

time.Sleep(time.Millisecond * 50)
}
}

Expand Down Expand Up @@ -425,7 +428,7 @@ func (n *Node) handleAutostarts() {

_, err = request.Validate()
if err != nil {
n.log.Error("Failed to validate autostart deployment request",
n.log.Error("Failed to validate autostart workload deploy request",
slog.Any("error", err),
)
agentClient.MarkUnselected()
Expand All @@ -445,9 +448,15 @@ func (n *Node) handleAutostarts() {
continue
}

agentDeployRequest := agentWorkloadInfoFromControlDeployRequest(request, autostart.Namespace, numBytes, *workloadHash)
var hash string
if workloadHash != nil {
hash = *workloadHash // HACK!!! for agent-local workloads, this should be read from the release manifest
}

agentDeployRequest := agentWorkloadInfoFromControlDeployRequest(request, autostart.Namespace, numBytes, hash)

agentDeployRequest.TotalBytes = int64(numBytes)
agentDeployRequest.Hash = *workloadHash
agentDeployRequest.Hash = hash

err = n.manager.DeployWorkload(agentClient, agentDeployRequest)
if err != nil {
Expand All @@ -470,7 +479,7 @@ func (n *Node) handleAutostarts() {
}

if successCount < len(n.config.AutostartConfiguration.Workloads) {
n.log.Error("Not all startup workloads suceeded",
n.log.Error("Failed to initialize autostart workloads",
slog.Int("expected", len(n.config.AutostartConfiguration.Workloads)),
slog.Int("actual", successCount),
)
Expand Down
109 changes: 63 additions & 46 deletions internal/node/workload_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,63 +155,80 @@ func (w *WorkloadManager) Start() {
}

func (m *WorkloadManager) CacheWorkload(workloadID string, request *controlapi.DeployRequest) (uint64, *string, error) {
bucket := request.Location.Host
key := strings.Trim(request.Location.Path, "/")
switch strings.ToLower(request.Location.Scheme) {
case controlapi.WorkloadLocationSchemeFile:
if m.config.NoSandbox {
return 0, nil, fmt.Errorf("Attempted to deploy agent-local workload artifact outside of sandbox")
}

jsLogAttr := []any{slog.String("bucket", bucket), slog.String("key", key)}
opts := []nats.JSOpt{}
if request.JsDomain != nil {
opts = append(opts, nats.Domain(*request.JsDomain))
return 0, nil, nil
case controlapi.WorkloadLocationSchemeNATS:
bucket := request.Location.Host
key := strings.Trim(request.Location.Path, "/")

jsLogAttr = append(jsLogAttr, slog.String("jsdomain", *request.JsDomain))
}
jsLogAttr := []any{slog.String("bucket", bucket), slog.String("key", key)}
opts := []nats.JSOpt{}
if request.JsDomain != nil {
opts = append(opts, nats.Domain(*request.JsDomain))

m.log.Info("Attempting object store download", jsLogAttr...)
jsLogAttr = append(jsLogAttr, slog.String("jsdomain", *request.JsDomain))
}

js, err := m.nc.JetStream(opts...)
if err != nil {
return 0, nil, err
}
m.log.Info("Attempting object store download", jsLogAttr...)

store, err := js.ObjectStore(bucket)
if err != nil {
m.log.Error("Failed to bind to source object store", slog.Any("err", err), slog.String("bucket", bucket))
return 0, nil, err
}
js, err := m.nc.JetStream(opts...)
if err != nil {
return 0, nil, err
}

_, err = store.GetInfo(key)
if err != nil {
m.log.Error("Failed to locate workload binary in source object store", slog.Any("err", err), slog.String("key", key), slog.String("bucket", bucket))
return 0, nil, err
}
store, err := js.ObjectStore(bucket)
if err != nil {
m.log.Error("Failed to bind to source object store", slog.Any("err", err), slog.String("bucket", bucket))
return 0, nil, err
}

started := time.Now()
workload, err := store.GetBytes(key)
if err != nil {
m.log.Error("Failed to download bytes from source object store", slog.Any("err", err), slog.String("key", key))
return 0, nil, err
}
finished := time.Since(started)
dlRate := float64(len(workload)) / finished.Seconds()
m.log.Debug("CacheWorkload object store download completed", slog.String("name", key), slog.String("duration", fmt.Sprintf("%.2f sec", finished.Seconds())), slog.String("rate", fmt.Sprintf("%s/sec", byteConvert(dlRate))))
_, err = store.GetInfo(key)
if err != nil {
m.log.Error("Failed to locate workload binary in source object store", slog.Any("err", err), slog.String("key", key), slog.String("bucket", bucket))
return 0, nil, err
}

err = m.natsint.StoreFileForID(workloadID, workload)
if err != nil {
m.log.Error("Failed to store bytes from source object store in cache", slog.Any("err", err), slog.String("key", key))
}
started := time.Now()
workload, err := store.GetBytes(key)
if err != nil {
m.log.Error("Failed to download bytes from source object store", slog.Any("err", err), slog.String("key", key))
return 0, nil, err
}

workloadHash := sha256.New()
workloadHash.Write(workload)
workloadHashString := hex.EncodeToString(workloadHash.Sum(nil))
finished := time.Since(started)
dlRate := float64(len(workload)) / finished.Seconds()

m.log.Info("Successfully stored workload in internal object store",
slog.String("workload_name", *request.WorkloadName),
slog.String("workload_id", workloadID),
slog.String("workload_hash", workloadHashString),
slog.Int("bytes", len(workload)),
)
m.log.Debug("CacheWorkload object store download completed",
slog.String("name", key),
slog.String("duration", fmt.Sprintf("%.2f sec", finished.Seconds())),
slog.String("rate", fmt.Sprintf("%s/sec", byteConvert(dlRate))),
)

err = m.natsint.StoreFileForID(workloadID, workload)
if err != nil {
m.log.Error("Failed to store bytes from source object store in cache", slog.Any("err", err), slog.String("key", key))
}

workloadHash := sha256.New()
workloadHash.Write(workload)
workloadHashString := hex.EncodeToString(workloadHash.Sum(nil))

m.log.Info("Successfully stored workload in internal object store",
slog.String("workload_name", *request.WorkloadName),
slog.String("workload_id", workloadID),
slog.String("workload_hash", workloadHashString),
slog.Int("bytes", len(workload)),
)

return uint64(len(workload)), &workloadHashString, nil
}

return uint64(len(workload)), &workloadHashString, nil
return 0, nil, fmt.Errorf("Unsupported scheme specified for workload location")
}

// Deploy a workload as specified by the given deploy request to an available
Expand Down

0 comments on commit 501ce3c

Please sign in to comment.