Skip to content

Commit

Permalink
stops infinite recursion. attempts to clarify difference between the …
Browse files Browse the repository at this point in the history
…2 deploy requests (#337)

* stops infinite recursion. attempts to clarify difference between the 2 deploy requests

* a few more renames

* another rename

* renaming more variables
  • Loading branch information
autodidaddict authored Aug 2, 2024
1 parent 71fb011 commit 9b94b23
Show file tree
Hide file tree
Showing 13 changed files with 111 additions and 119 deletions.
30 changes: 15 additions & 15 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (a *Agent) Version() string {
// the executable workload artifact from the cache bucket, write it to a
// temporary file and make it executable; this method returns the full
// path to the cached artifact if successful
func (a *Agent) cacheExecutableArtifact(req *agentapi.DeployRequest) (*string, error) {
func (a *Agent) cacheExecutableArtifact(req *agentapi.AgentWorkloadInfo) (*string, error) {
fileName := fmt.Sprintf("workload-%s", *a.md.VmID)
tempFile := path.Join(os.TempDir(), fileName)

Expand Down Expand Up @@ -260,28 +260,28 @@ func (a *Agent) dispatchLogs() {
// bucket, write it to tmp, initialize the execution provider per the
// request, and then validate and deploy a workload
func (a *Agent) handleDeploy(m *nats.Msg) {
var request agentapi.DeployRequest
err := json.Unmarshal(m.Data, &request)
var info agentapi.AgentWorkloadInfo
err := json.Unmarshal(m.Data, &info)
if err != nil {
msg := fmt.Sprintf("Failed to unmarshal deploy request: %s", err)
a.submitLog(msg, slog.LevelError)
_ = a.workAck(m, false, msg)
return
}

err = request.Validate()
err = info.Validate()
if err != nil {
_ = a.workAck(m, false, fmt.Sprintf("%v", err)) // FIXME-- this message can be formatted prettier
return
}

tmpFile, err := a.cacheExecutableArtifact(&request)
tmpFile, err := a.cacheExecutableArtifact(&info)
if err != nil {
_ = a.workAck(m, false, err.Error())
return
}

params, err := a.newExecutionProviderParams(&request, *tmpFile)
params, err := a.newExecutionProviderParams(&info, *tmpFile)
if err != nil {
_ = a.workAck(m, false, err.Error())
return
Expand All @@ -297,7 +297,7 @@ func (a *Agent) handleDeploy(m *nats.Msg) {
a.provider = provider

shouldValidate := true
if !a.sandboxed && request.WorkloadType == controlapi.NexWorkloadNative {
if !a.sandboxed && info.WorkloadType == controlapi.NexWorkloadNative {
shouldValidate = false
}

Expand Down Expand Up @@ -455,28 +455,28 @@ func (a *Agent) installSignalHandlers() {

// newExecutionProviderParams initializes new execution provider params
// for the given work request and starts a goroutine listening
func (a *Agent) newExecutionProviderParams(req *agentapi.DeployRequest, tmpFile string) (*agentapi.ExecutionProviderParams, error) {
func (a *Agent) newExecutionProviderParams(info *agentapi.AgentWorkloadInfo, tmpFile string) (*agentapi.ExecutionProviderParams, error) {
if a.md.VmID == nil {
return nil, errors.New("vm id is required to initialize execution provider params")
}

if req.WorkloadName == nil {
if info.WorkloadName == nil {
return nil, errors.New("workload name is required to initialize execution provider params")
}

params := &agentapi.ExecutionProviderParams{
DeployRequest: *req,
Stderr: &logEmitter{stderr: true, name: *req.WorkloadName, logs: a.agentLogs},
Stdout: &logEmitter{stderr: false, name: *req.WorkloadName, logs: a.agentLogs},
TmpFilename: &tmpFile,
VmID: *a.md.VmID,
AgentWorkloadInfo: *info,
Stderr: &logEmitter{stderr: true, name: *info.WorkloadName, logs: a.agentLogs},
Stdout: &logEmitter{stderr: false, name: *info.WorkloadName, logs: a.agentLogs},
TmpFilename: &tmpFile,
VmID: *a.md.VmID,

Fail: make(chan bool),
Run: make(chan bool),
Exit: make(chan int),

NATSConn: a.nc,
TriggerSubjects: req.TriggerSubjects,
TriggerSubjects: info.TriggerSubjects,
PluginPath: a.md.PluginPath,
}

Expand Down
2 changes: 1 addition & 1 deletion agent/providers/plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestNoopPluginLoad(t *testing.T) {
plugPath := "../../test/fixtures"
wName := "echofunctionjs"
params := &agentapi.ExecutionProviderParams{
DeployRequest: agentapi.DeployRequest{
AgentWorkloadInfo: agentapi.AgentWorkloadInfo{
WorkloadName: &wName,
WorkloadType: "noop",
},
Expand Down
24 changes: 16 additions & 8 deletions internal/agent-api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ContactLostCallback func(string)

const (
defaultAgentPingIntervalMillis = 5000
maxRetryAttempts = 3

NexTriggerSubject = "x-nex-trigger-subject"
NexRuntimeNs = "x-nex-runtime-ns"
Expand Down Expand Up @@ -56,9 +57,10 @@ type AgentClient struct {
execTotalNanos int64
workloadStartedAt time.Time

deployRequest *DeployRequest
workloadBytes uint64
subz []*nats.Subscription
workloadInfo *AgentWorkloadInfo
workloadBytes uint64
subz []*nats.Subscription
deployRetryCount uint

selected bool // FIXME-- rename...
}
Expand All @@ -82,6 +84,7 @@ func NewAgentClient(
handshakeSucceeded: onSuccess,
log: log,
logReceived: onLog,
deployRetryCount: 0,
nc: nc,
pingTimeout: pingTimeout,
subz: make([]*nats.Subscription, 0),
Expand Down Expand Up @@ -130,7 +133,11 @@ func (a *AgentClient) Start(agentID string) error {
return nil
}

func (a *AgentClient) DeployWorkload(request *DeployRequest) (*DeployResponse, error) {
func (a *AgentClient) DeployWorkload(request *AgentWorkloadInfo) (*DeployResponse, error) {
if a.deployRetryCount > maxRetryAttempts {
return nil, fmt.Errorf("exceeded maximum number of agent workload deploy attempts: %d", maxRetryAttempts)
}

bytes, err := json.Marshal(request)
if err != nil {
return nil, err
Expand All @@ -148,6 +155,7 @@ func (a *AgentClient) DeployWorkload(request *DeployRequest) (*DeployResponse, e
return nil, errors.New("timed out waiting for acknowledgement of workload deployment")
} else if errors.Is(err, nats.ErrNoResponders) {
time.Sleep(time.Millisecond * 100)
a.deployRetryCount += 1
return a.DeployWorkload(request)
} else {
return nil, fmt.Errorf("failed to submit request for workload deployment: %s", err)
Expand All @@ -161,7 +169,7 @@ func (a *AgentClient) DeployWorkload(request *DeployRequest) (*DeployResponse, e
return nil, err
}

a.deployRequest = request
a.workloadInfo = request
a.workloadStartedAt = time.Now().UTC()
a.workloadBytes = uint64(request.TotalBytes)
return &deployResponse, nil
Expand Down Expand Up @@ -250,8 +258,8 @@ func (a *AgentClient) UptimeMillis() time.Duration {
return time.Since(a.workloadStartedAt)
}

func (a *AgentClient) DeployRequest() *DeployRequest {
return a.deployRequest
func (a *AgentClient) WorkloadInfo() *AgentWorkloadInfo {
return a.workloadInfo
}

func (a *AgentClient) IsSelected() bool {
Expand Down Expand Up @@ -286,7 +294,7 @@ func (a *AgentClient) RunTrigger(ctx context.Context, tracer trace.Tracer, subje
}

func (a *AgentClient) WorkloadType() controlapi.NexWorkload {
return a.deployRequest.WorkloadType
return a.workloadInfo.WorkloadType
}

func (a *AgentClient) awaitHandshake(agentID string) {
Expand Down
33 changes: 14 additions & 19 deletions internal/agent-api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const DefaultRunloopSleepTimeoutMillis = 25

// ExecutionProviderParams parameters for initializing a specific execution provider
type ExecutionProviderParams struct {
DeployRequest
AgentWorkloadInfo
TriggerSubjects []string `json:"trigger_subjects"`

// Fail channel receives bool upon command failing to start
Expand All @@ -45,8 +45,8 @@ type ExecutionProviderParams struct {
PluginPath *string `json:"-"`
}

// DeployRequest processed by the agent
type DeployRequest struct {
// AgentWorkloadInfo processed by the agent
type AgentWorkloadInfo struct {
Argv []string `json:"argv,omitempty"`
DecodedClaims jwt.GenericClaims `json:"-"`
Description *string `json:"description"`
Expand All @@ -68,34 +68,29 @@ type DeployRequest struct {
Stdout io.Writer `json:"-"`
TmpFilename *string `json:"-"`

EncryptedEnvironment *string `json:"-"`
JsDomain *string `json:"-"`
Location *url.URL `json:"-"`
SenderPublicKey *string `json:"-"`
TargetNode *string `json:"-"`
WorkloadJwt *string `json:"-"`
Location *url.URL `json:"-"`

Errors []error `json:"errors,omitempty"`
}

func (request *DeployRequest) IsEssential() bool {
return request.Essential != nil && *request.Essential
func (info *AgentWorkloadInfo) IsEssential() bool {
return info.Essential != nil && *info.Essential
}

// Returns true if the run request supports essential flag
func (request *DeployRequest) SupportsEssential() bool {
return request.WorkloadType == controlapi.NexWorkloadNative ||
request.WorkloadType == controlapi.NexWorkloadOCI
func (info *AgentWorkloadInfo) SupportsEssential() bool {
return info.WorkloadType == controlapi.NexWorkloadNative ||
info.WorkloadType == controlapi.NexWorkloadOCI
}

// Returns true if the run request supports trigger subjects
func (request *DeployRequest) SupportsTriggerSubjects() bool {
return (request.WorkloadType == controlapi.NexWorkloadV8 ||
request.WorkloadType == controlapi.NexWorkloadWasm) &&
len(request.TriggerSubjects) > 0
func (info *AgentWorkloadInfo) SupportsTriggerSubjects() bool {
return (info.WorkloadType == controlapi.NexWorkloadV8 ||
info.WorkloadType == controlapi.NexWorkloadWasm) &&
len(info.TriggerSubjects) > 0
}

func (r *DeployRequest) Validate() error {
func (r *AgentWorkloadInfo) Validate() error {
var err error

if r.Namespace == nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/node/controlapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (api *ApiListener) handleDeploy(ctx context.Context, span trace.Span, m *na
return
}

agentDeployRequest := agentDeployRequestFromControlDeployRequest(&request, namespace, numBytes, *workloadHash)
agentDeployRequest := agentWorkloadInfoFromControlDeployRequest(&request, namespace, numBytes, *workloadHash)

api.log.
Info("Submitting workload to agent",
Expand Down
45 changes: 19 additions & 26 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (n *Node) handleAutostarts() {
continue
}

agentDeployRequest := agentDeployRequestFromControlDeployRequest(request, autostart.Namespace, numBytes, *workloadHash)
agentDeployRequest := agentWorkloadInfoFromControlDeployRequest(request, autostart.Namespace, numBytes, *workloadHash)
agentDeployRequest.TotalBytes = int64(numBytes)
agentDeployRequest.Hash = *workloadHash

Expand Down Expand Up @@ -693,30 +693,23 @@ func (n *Node) shuttingDown() bool {
return (atomic.LoadUint32(&n.closing) > 0)
}

// For the curious - I tried a number of ways of making a common/shared deploy request and only modeling the differences
// here, but it made all the code that creates deployment requests look hideous. Will look into cleaning this up again.
func agentDeployRequestFromControlDeployRequest(request *controlapi.DeployRequest, namespace string, numBytes uint64, hash string) *agentapi.DeployRequest {
return &agentapi.DeployRequest{
Argv: request.Argv,
DecodedClaims: request.DecodedClaims,
Description: request.Description,
EncryptedEnvironment: request.Environment,
Environment: request.WorkloadEnvironment,
Essential: request.Essential,
Hash: hash,
HostServicesConfig: request.HostServicesConfig,
ID: request.ID,
JsDomain: request.JsDomain,
Location: request.Location,
Namespace: &namespace,
RetriedAt: request.RetriedAt,
RetryCount: request.RetryCount,
SenderPublicKey: request.SenderPublicKey,
TargetNode: request.TargetNode,
TotalBytes: int64(numBytes),
TriggerSubjects: request.TriggerSubjects,
WorkloadJwt: request.WorkloadJWT,
WorkloadName: request.WorkloadName,
WorkloadType: request.WorkloadType,
func agentWorkloadInfoFromControlDeployRequest(request *controlapi.DeployRequest, namespace string, numBytes uint64, hash string) *agentapi.AgentWorkloadInfo {
return &agentapi.AgentWorkloadInfo{
Argv: request.Argv,
DecodedClaims: request.DecodedClaims,
Description: request.Description,
Environment: request.WorkloadEnvironment,
Essential: request.Essential,
Hash: hash,
HostServicesConfig: request.HostServicesConfig,
ID: request.ID,
Location: request.Location,
Namespace: &namespace,
RetriedAt: request.RetriedAt,
RetryCount: request.RetryCount,
TotalBytes: int64(numBytes),
TriggerSubjects: request.TriggerSubjects,
WorkloadName: request.WorkloadName,
WorkloadType: request.WorkloadType,
}
}
6 changes: 3 additions & 3 deletions internal/node/processmanager/firecracker_procman.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type FirecrackerProcessManager struct {
natsint *internalnats.InternalNatsServer

delegate ProcessDelegate
deployRequests map[string]*agentapi.DeployRequest
deployRequests map[string]*agentapi.AgentWorkloadInfo
}

func NewFirecrackerProcessManager(
Expand All @@ -61,7 +61,7 @@ func NewFirecrackerProcessManager(
allVMs: make(map[string]*runningFirecracker),
poolVMs: make(chan *runningFirecracker, config.MachinePoolSize),
stopMutex: make(map[string]*sync.Mutex),
deployRequests: make(map[string]*agentapi.DeployRequest),
deployRequests: make(map[string]*agentapi.AgentWorkloadInfo),
}, nil
}

Expand Down Expand Up @@ -95,7 +95,7 @@ func (f *FirecrackerProcessManager) EnterLameDuck() error {
}

// Preparing a workload reads from the warmVMs channel
func (f *FirecrackerProcessManager) PrepareWorkload(workloadId string, deployRequest *agentapi.DeployRequest) error {
func (f *FirecrackerProcessManager) PrepareWorkload(workloadId string, deployRequest *agentapi.AgentWorkloadInfo) error {
f.mutex.Lock()
defer f.mutex.Unlock()

Expand Down
4 changes: 2 additions & 2 deletions internal/node/processmanager/process_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const runloopSleepInterval = 100 * time.Millisecond

// Information about an agent process without regard to the implementation of the agent process manager
type ProcessInfo struct {
DeployRequest *agentapi.DeployRequest
DeployRequest *agentapi.AgentWorkloadInfo
ID string
Name string
Namespace string
Expand All @@ -37,7 +37,7 @@ type ProcessManager interface {

// Associate a deploy request with the given workload id, and perform any
// just in time initialization of resources if necessary
PrepareWorkload(id string, request *agentapi.DeployRequest) error
PrepareWorkload(id string, request *agentapi.AgentWorkloadInfo) error

// Start the process manager and allocate a pool of agents based on an implementation-specific
// strategy, delegating callbacks to the given delegate
Expand Down
2 changes: 1 addition & 1 deletion internal/node/processmanager/running_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type runningFirecracker struct {

closing uint32
config *nexmodels.NodeConfiguration
deployRequest *agentapi.DeployRequest
deployRequest *agentapi.AgentWorkloadInfo
ip net.IP
log *slog.Logger
machine *firecracker.Machine
Expand Down
8 changes: 4 additions & 4 deletions internal/node/processmanager/spawn_procman.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ type SpawningProcessManager struct {
natsint *internalnats.InternalNatsServer

delegate ProcessDelegate
deployRequests map[string]*agentapi.DeployRequest
deployRequests map[string]*agentapi.AgentWorkloadInfo

log *slog.Logger
}

type spawnedProcess struct {
cmd *exec.Cmd
deployRequest *agentapi.DeployRequest
deployRequest *agentapi.AgentWorkloadInfo
workloadStarted time.Time

ID string
Expand Down Expand Up @@ -74,7 +74,7 @@ func NewSpawningProcessManager(

stopMutexes: make(map[string]*sync.Mutex),

deployRequests: make(map[string]*agentapi.DeployRequest),
deployRequests: make(map[string]*agentapi.AgentWorkloadInfo),
allProcs: make(map[string]*spawnedProcess),
poolProcs: make(chan *spawnedProcess, config.MachinePoolSize),
}, nil
Expand Down Expand Up @@ -110,7 +110,7 @@ func (s *SpawningProcessManager) EnterLameDuck() error {
}

// Attaches a deployment request to a running process. Until a process is prepared, it's just an empty agent
func (s *SpawningProcessManager) PrepareWorkload(workloadID string, deployRequest *agentapi.DeployRequest) error {
func (s *SpawningProcessManager) PrepareWorkload(workloadID string, deployRequest *agentapi.AgentWorkloadInfo) error {
s.mutex.Lock()
defer s.mutex.Unlock()

Expand Down
Loading

0 comments on commit 9b94b23

Please sign in to comment.