Skip to content

Commit

Permalink
Add support for executing agent-local workloads
Browse files Browse the repository at this point in the history
Workload deploy locations can now be specified with either the file://
or nats:// scheme. A file:// location marks the request as an
"agent-local" workload deploy request: the agent resolves the path as an
executable already installed on its rootfs instead of fetching the
artifact from the object store. Agent-local workload deploy requests
can only be executed in sandbox mode.
  • Loading branch information
kthomas committed Jul 12, 2024
1 parent c90e457 commit 20ce7f5
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 84 deletions.
87 changes: 63 additions & 24 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const (
runloopSleepInterval = 250 * time.Millisecond
runloopTickInterval = 2500 * time.Millisecond
workloadExecutionSleepTimeoutMillis = 100
workloadLocationSchemeFile = "file"
workloadLocationSchemeNATS = "nats"
)

// Agent facilitates communication between the nex agent running in the firecracker VM
Expand Down Expand Up @@ -168,33 +170,70 @@ func (a *Agent) Version() string {
return VERSION
}

// cacheExecutableArtifact uses the underlying agent configuration to fetch
// the executable workload artifact from the cache bucket, write it to a
// temporary file and make it executable; this method returns the full
// path to the cached artifact if successful
func (a *Agent) cacheExecutableArtifact(req *agentapi.DeployRequest) (*string, error) {
fileName := fmt.Sprintf("workload-%s", *a.md.VmID)
tempFile := path.Join(os.TempDir(), fileName)
// resolveExecutableArtifact uses the underlying agent configuration and
// location specified in the workload deploy request to prepare the workload
// artifact for execution by the agent.
//
// in the case of the workload artifact being deployed from a nats:// location
// this method fetches the workload artifact from the object store bucket,
// writes it to a temporary file and makes it executable.
//
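// in the case of the workload artifact being deployed from a file:// location,
// this method first requires isSandboxed() to report true, then verifies that
// the specified path (the location with its file:// prefix removed) exists on
// the agent rootfs, is not a directory and has at least one executable
// permission bit set.
//
// NOTE(review): a location whose scheme matches neither case falls through the
// switch, so a nil path is returned together with a nil error -- confirm that
// callers never dereference the result for an unvalidated scheme.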
func (a *Agent) resolveExecutableArtifact(req *agentapi.DeployRequest) (*string, error) {
var file *string

switch strings.ToLower(req.Location.Scheme) {
case workloadLocationSchemeFile:
if !isSandboxed() {
return nil, fmt.Errorf("attempted to execute agent-local workload artifact %s outside of sandbox ", req.Location.String())
}

if strings.EqualFold(runtime.GOOS, "windows") && req.WorkloadType == controlapi.NexWorkloadNative {
tempFile = fmt.Sprintf("%s.exe", tempFile)
}
path, _ := strings.CutPrefix(req.Location.String(), fmt.Sprintf("%s://", workloadLocationSchemeFile))

err := a.cacheBucket.GetFile(*a.md.VmID, tempFile)
if err != nil {
msg := fmt.Sprintf("Failed to get and write workload artifact to temp dir: %s", err)
a.LogError(msg)
return nil, errors.New(msg)
}
fi, err := os.Stat(path)
if os.IsNotExist(err) {
return nil, fmt.Errorf("attempted to execute agent-local workload artifact %s; file does not exist: %s", path, err.Error())
}

err = os.Chmod(tempFile, 0777)
if err != nil {
msg := fmt.Sprintf("Failed to set workload artifact as executable: %s", err)
a.LogError(msg)
return nil, errors.New(msg)
if err != nil {
return nil, fmt.Errorf("failed to stat agent-local workload artifact %s; %s", path, err.Error())
}

if fi.IsDir() {
return nil, fmt.Errorf("failed to execute agent-local workload artifact; %s is a directory", path)
} else if fi.Mode()&0111 == 0 {
return nil, fmt.Errorf("failed to execute agent-local workload artifact; %s is not executable", path)
}

file = &path
case workloadLocationSchemeNATS:
fileName := fmt.Sprintf("workload-%s", *a.md.VmID)
tempFile := path.Join(os.TempDir(), fileName)

if strings.EqualFold(runtime.GOOS, "windows") && req.WorkloadType == controlapi.NexWorkloadNative {
tempFile = fmt.Sprintf("%s.exe", tempFile)
}

err := a.cacheBucket.GetFile(*a.md.VmID, tempFile)
if err != nil {
msg := fmt.Sprintf("Failed to get and write workload artifact to temp dir: %s", err)
a.LogError(msg)
return nil, errors.New(msg)
}

err = os.Chmod(tempFile, 0777)
if err != nil {
msg := fmt.Sprintf("Failed to set workload artifact as executable: %s", err)
a.LogError(msg)
return nil, errors.New(msg)
}

file = &tempFile
}

return &tempFile, nil
return file, nil
}

// deleteExecutableArtifact deletes the installed workload executable
Expand Down Expand Up @@ -269,13 +308,13 @@ func (a *Agent) handleDeploy(m *nats.Msg) {
return
}

tmpFile, err := a.cacheExecutableArtifact(&request)
path, err := a.resolveExecutableArtifact(&request)
if err != nil {
_ = a.workAck(m, false, err.Error())
return
}

params, err := a.newExecutionProviderParams(&request, *tmpFile)
params, err := a.newExecutionProviderParams(&request, *path)
if err != nil {
_ = a.workAck(m, false, err.Error())
return
Expand Down
24 changes: 18 additions & 6 deletions control-api/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ import (
"fmt"
"net/url"
"regexp"
"strings"
"time"

"github.com/nats-io/jwt/v2"
"github.com/nats-io/nkeys"
)

const WorkloadLocationSchemeFile = "file"
const WorkloadLocationSchemeNATS = "nats"

type DeployRequest struct {
Argv []string `json:"argv,omitempty"`
Description *string `json:"description,omitempty"`
Expand Down Expand Up @@ -82,17 +86,17 @@ func NewDeployRequest(opts ...RequestOption) (*DeployRequest, error) {
req := &DeployRequest{
Argv: reqOpts.argv,
Description: &reqOpts.workloadDescription,
WorkloadType: reqOpts.workloadType,
Location: &reqOpts.location,
WorkloadJwt: &workloadJwt,
Environment: &encryptedEnv,
Essential: &reqOpts.essential,
HostServicesConfig: reqOpts.hostServicesConfiguration,
JsDomain: &reqOpts.jsDomain,
Location: &reqOpts.location,
SenderPublicKey: &senderPublic,
TargetNode: &reqOpts.targetNode,
TriggerSubjects: reqOpts.triggerSubjects,
JsDomain: &reqOpts.jsDomain,
HostServicesConfig: reqOpts.hostServicesConfiguration,
TriggerConnection: reqOpts.triggerConnection,
TriggerSubjects: reqOpts.triggerSubjects,
WorkloadJwt: &workloadJwt,
WorkloadType: reqOpts.workloadType,
}

return req, nil
Expand All @@ -111,6 +115,14 @@ func (request *DeployRequest) Validate() (*jwt.GenericClaims, error) {
return nil, fmt.Errorf("workload name claim ('%s') does not match requirements (%s)", claims.Subject, workloadRegex)
}

if request.Essential != nil && *request.Essential && request.WorkloadType != NexWorkloadNative {
return nil, errors.New("essential workloads must be native")
}

if !strings.EqualFold(request.Location.Scheme, WorkloadLocationSchemeFile) && !strings.EqualFold(request.Location.Scheme, WorkloadLocationSchemeNATS) {
return nil, errors.New("workload location scheme invalid")
}

var vr jwt.ValidationResults
claims.Validate(&vr)
if len(vr.Issues) > 0 || len(vr.Errors()) > 0 {
Expand Down
1 change: 1 addition & 0 deletions internal/node/controlapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ func (api *ApiListener) handleDeploy(ctx context.Context, span trace.Span, m *na
respondFail(controlapi.RunResponseType, m, "Could not deploy workload, agent pool did not initialize properly")
return
}

workloadName := request.DecodedClaims.Subject
span.SetAttributes(attribute.String("workload_name", workloadName))

Expand Down
35 changes: 21 additions & 14 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ import (
)

const (
systemNamespace = "system"
agentPoolRetryMax = 100
autostartAgentRetryMax = 10
heartbeatInterval = 30 * time.Second
publicNATSServerStartTimeout = 50 * time.Millisecond
runloopSleepInterval = 100 * time.Millisecond
runloopTickInterval = 2500 * time.Millisecond
agentPoolRetryMax = 100
systemNamespace = "system"
)

// Nex node process
Expand Down Expand Up @@ -339,26 +340,26 @@ func (n *Node) handleAutostarts() {
agentClient, err = n.manager.SelectRandomAgent()
if err != nil {
n.log.Warn("Failed to resolve agent for autostart", slog.String("error", err.Error()))
time.Sleep(50 * time.Millisecond)

retry += 1
if retry > agentPoolRetryMax {
n.log.Error("Exceeded warm agent retrieval retry count, terminating node",
slog.Int("allowed_retries", agentPoolRetryMax),
if retry > autostartAgentRetryMax {
n.log.Error("Exceeded warm agent retrieval retry count during attempted autostart; terminating node",
slog.Int("allowed_retries", autostartAgentRetryMax),
)

n.shutdown()
return
}

time.Sleep(time.Millisecond * 50)
}
}

// functions cannot be essential
essential := autostart.Essential && autostart.WorkloadType == controlapi.NexWorkloadNative

request, err := controlapi.NewDeployRequest(
controlapi.Argv(autostart.Argv),
controlapi.Location(autostart.Location),
controlapi.Environment(autostart.Environment),
controlapi.Essential(essential),
controlapi.Essential(autostart.Essential),
controlapi.Issuer(n.issuerKeypair),
controlapi.SenderXKey(n.api.xk),
controlapi.TargetNode(n.publicKey),
Expand All @@ -382,7 +383,7 @@ func (n *Node) handleAutostarts() {

_, err = request.Validate()
if err != nil {
n.log.Error("Failed to validate autostart deployment request",
n.log.Error("Failed to validate autostart workload deploy request",
slog.Any("error", err),
)
agentClient.MarkUnselected()
Expand All @@ -402,9 +403,15 @@ func (n *Node) handleAutostarts() {
continue
}

agentDeployRequest := agentDeployRequestFromControlDeployRequest(request, autostart.Namespace, numBytes, *workloadHash)
var hash string
if workloadHash != nil {
hash = *workloadHash // HACK!!! for agent-local workloads, this should be read from the release manifest
}

agentDeployRequest := agentDeployRequestFromControlDeployRequest(request, autostart.Namespace, numBytes, hash)

agentDeployRequest.TotalBytes = int64(numBytes)
agentDeployRequest.Hash = *workloadHash
agentDeployRequest.Hash = hash

err = n.manager.DeployWorkload(agentClient, agentDeployRequest)
if err != nil {
Expand All @@ -427,7 +434,7 @@ func (n *Node) handleAutostarts() {
}

if successCount < len(n.config.AutostartConfiguration.Workloads) {
n.log.Error("Not all startup workloads suceeded",
n.log.Error("Failed to initialize autostart workloads",
slog.Int("expected", len(n.config.AutostartConfiguration.Workloads)),
slog.Int("actual", successCount),
)
Expand Down
91 changes: 51 additions & 40 deletions internal/node/workload_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,57 +156,68 @@ func (w *WorkloadManager) Start() {
}

func (m *WorkloadManager) CacheWorkload(workloadID string, request *controlapi.DeployRequest) (uint64, *string, error) {
bucket := request.Location.Host
key := strings.Trim(request.Location.Path, "/")
switch strings.ToLower(request.Location.Scheme) {
case controlapi.WorkloadLocationSchemeFile:
if m.config.NoSandbox {
return 0, nil, fmt.Errorf("Attempted to deploy agent-local workload artifact outside of sandbox")
}

jsLogAttr := []any{slog.String("bucket", bucket), slog.String("key", key)}
opts := []nats.JSOpt{}
if request.JsDomain != nil {
opts = append(opts, nats.Domain(*request.JsDomain))
return 0, nil, nil
case controlapi.WorkloadLocationSchemeNATS:
bucket := request.Location.Host
key := strings.Trim(request.Location.Path, "/")

jsLogAttr = append(jsLogAttr, slog.String("jsdomain", *request.JsDomain))
}
jsLogAttr := []any{slog.String("bucket", bucket), slog.String("key", key)}
opts := []nats.JSOpt{}
if request.JsDomain != nil {
opts = append(opts, nats.Domain(*request.JsDomain))

m.log.Info("Attempting object store download", jsLogAttr...)
jsLogAttr = append(jsLogAttr, slog.String("jsdomain", *request.JsDomain))
}

js, err := m.nc.JetStream(opts...)
if err != nil {
return 0, nil, err
}
m.log.Info("Attempting object store download", jsLogAttr...)

store, err := js.ObjectStore(bucket)
if err != nil {
m.log.Error("Failed to bind to source object store", slog.Any("err", err), slog.String("bucket", bucket))
return 0, nil, err
}
js, err := m.nc.JetStream(opts...)
if err != nil {
return 0, nil, err
}

_, err = store.GetInfo(key)
if err != nil {
m.log.Error("Failed to locate workload binary in source object store", slog.Any("err", err), slog.String("key", key), slog.String("bucket", bucket))
return 0, nil, err
}
store, err := js.ObjectStore(bucket)
if err != nil {
m.log.Error("Failed to bind to source object store", slog.Any("err", err), slog.String("bucket", bucket))
return 0, nil, err
}

workload, err := store.GetBytes(key)
if err != nil {
m.log.Error("Failed to download bytes from source object store", slog.Any("err", err), slog.String("key", key))
return 0, nil, err
}
_, err = store.GetInfo(key)
if err != nil {
m.log.Error("Failed to locate workload binary in source object store", slog.Any("err", err), slog.String("key", key), slog.String("bucket", bucket))
return 0, nil, err
}

err = m.natsint.StoreFileForID(workloadID, workload)
if err != nil {
m.log.Error("Failed to store bytes from source object store in cache", slog.Any("err", err), slog.String("key", key))
}
workload, err := store.GetBytes(key)
if err != nil {
m.log.Error("Failed to download bytes from source object store", slog.Any("err", err), slog.String("key", key))
return 0, nil, err
}

workloadHash := sha256.New()
workloadHash.Write(workload)
workloadHashString := hex.EncodeToString(workloadHash.Sum(nil))
err = m.natsint.StoreFileForID(workloadID, workload)
if err != nil {
m.log.Error("Failed to store bytes from source object store in cache", slog.Any("err", err), slog.String("key", key))
}

m.log.Info("Successfully stored workload in internal object store",
slog.String("workload", request.DecodedClaims.Subject),
slog.String("workload_id", workloadID),
slog.Int("bytes", len(workload)))
workloadHash := sha256.New()
workloadHash.Write(workload)
workloadHashString := hex.EncodeToString(workloadHash.Sum(nil))

m.log.Info("Successfully stored workload in internal object store",
slog.String("workload", request.DecodedClaims.Subject),
slog.String("workload_id", workloadID),
slog.Int("bytes", len(workload)))

return uint64(len(workload)), &workloadHashString, nil
}

return uint64(len(workload)), &workloadHashString, nil
return 0, nil, fmt.Errorf("Unsupported scheme specified for workload location")
}

// Deploy a workload as specified by the given deploy request to an available
Expand Down

0 comments on commit 20ce7f5

Please sign in to comment.