Skip to content

Commit

Permalink
Fix regressions related to AgentWorkloadInfo refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
kthomas committed Aug 5, 2024
1 parent 501ce3c commit 307bfbc
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 37 deletions.
6 changes: 6 additions & 0 deletions control-api/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ func NewDeployRequest(opts ...RequestOption) (*DeployRequest, error) {
return req, nil
}

// Returns true if the run request supports trigger subjects
func (request *DeployRequest) SupportsTriggerSubjects() bool {
return request.WorkloadType == NexWorkloadV8 ||
request.WorkloadType == NexWorkloadWasm
}

// This will validate a request's workload JWT and return the parsed claims
func (request *DeployRequest) Validate() (*jwt.GenericClaims, error) {
claims, err := jwt.DecodeGeneric(*request.WorkloadJWT)
Expand Down
3 changes: 2 additions & 1 deletion control-api/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controlapi

import (
"encoding/base64"
"encoding/hex"
"strings"
)

Expand All @@ -17,5 +18,5 @@ func SanitizeNATSDigest(input string) string {
return after
}

return string(h)
return hex.EncodeToString(h)
}
2 changes: 1 addition & 1 deletion internal/agent-api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ContactLostCallback func(string)

const (
defaultAgentPingIntervalMillis = 5000
maxRetryAttempts = 3
maxRetryAttempts = 10

NexTriggerSubject = "x-nex-trigger-subject"
NexRuntimeNs = "x-nex-runtime-ns"
Expand Down
10 changes: 9 additions & 1 deletion internal/agent-api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type AgentWorkloadInfo struct {
Essential *bool `json:"essential,omitempty"`
Hash string `json:"hash,omitempty"`
ID *string `json:"id"`
Location *url.URL `json:"location"`
Namespace *string `json:"namespace,omitempty"`
RetriedAt *time.Time `json:"retried_at,omitempty"`
RetryCount *uint `json:"retry_count,omitempty"`
Expand All @@ -68,7 +69,10 @@ type AgentWorkloadInfo struct {
Stdout io.Writer `json:"-"`
TmpFilename *string `json:"-"`

Location *url.URL `json:"-"`
EncryptedEnvironment *string `json:"-"`
JsDomain *string `json:"-"`
SenderPublicKey *string `json:"-"`
WorkloadJWT *string `json:"-"`

Errors []error `json:"errors,omitempty"`
}
Expand Down Expand Up @@ -109,6 +113,10 @@ func (r *AgentWorkloadInfo) Validate() error {
err = errors.Join(err, errors.New("hash is required"))
}

if r.Location == nil {
err = errors.Join(err, errors.New("location is required"))
}

if r.TotalBytes == 0 { // FIXME--- this should probably be checked against *string
err = errors.Join(err, errors.New("total bytes is required"))
}
Expand Down
4 changes: 2 additions & 2 deletions internal/node/controlapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,7 @@ func (api *ApiListener) handleDeploy(ctx context.Context, span trace.Span, m *na
return
}

if len(request.TriggerSubjects) > 0 && (request.WorkloadType != controlapi.NexWorkloadV8 &&
request.WorkloadType != controlapi.NexWorkloadWasm) { // FIXME -- workload type comparison
if len(request.TriggerSubjects) > 0 && !request.SupportsTriggerSubjects() { // FIXME -- workload type comparison
span.SetStatus(codes.Error, "Unsupported workload type for trigger subjects")
api.log.Error("Workload type does not support trigger subject registration", slog.String("trigger_subjects", string(request.WorkloadType)))
respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Unsupported workload type for trigger subject registration: %s", string(request.WorkloadType)))
Expand Down Expand Up @@ -393,6 +392,7 @@ func (api *ApiListener) handleDeploy(ctx context.Context, span trace.Span, m *na
slog.String("namespace", namespace),
slog.String("workload", *agentDeployRequest.WorkloadName),
slog.String("workload_id", workloadID),
slog.String("workload_location", agentDeployRequest.Location.String()),
slog.Uint64("workload_size", numBytes),
slog.String("workload_sha256", *workloadHash),
slog.String("type", string(request.WorkloadType)),
Expand Down
45 changes: 25 additions & 20 deletions internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,14 @@ func (n *Node) handleAutostarts() {
hash = *workloadHash // HACK!!! for agent-local workloads, this should be read from the release manifest
}

agentDeployRequest := agentWorkloadInfoFromControlDeployRequest(request, autostart.Namespace, numBytes, hash)
agentWorkloadInfo := agentWorkloadInfoFromControlDeployRequest(request, autostart.Namespace, numBytes, hash)

agentDeployRequest.TotalBytes = int64(numBytes)
agentDeployRequest.Hash = hash
agentWorkloadInfo.TotalBytes = int64(numBytes)
agentWorkloadInfo.Hash = hash

err = n.manager.DeployWorkload(agentClient, agentDeployRequest)
agentWorkloadInfo.Environment = autostart.Environment // HACK!!! we need to fix autostart config to allow encrypted environment...

err = n.manager.DeployWorkload(agentClient, agentWorkloadInfo)
if err != nil {
n.log.Error("Failed to deploy autostart workload",
slog.Any("error", err),
Expand Down Expand Up @@ -704,21 +706,24 @@ func (n *Node) shuttingDown() bool {

func agentWorkloadInfoFromControlDeployRequest(request *controlapi.DeployRequest, namespace string, numBytes uint64, hash string) *agentapi.AgentWorkloadInfo {
return &agentapi.AgentWorkloadInfo{
Argv: request.Argv,
DecodedClaims: request.DecodedClaims,
Description: request.Description,
Environment: request.WorkloadEnvironment,
Essential: request.Essential,
Hash: hash,
HostServicesConfig: request.HostServicesConfig,
ID: request.ID,
Location: request.Location,
Namespace: &namespace,
RetriedAt: request.RetriedAt,
RetryCount: request.RetryCount,
TotalBytes: int64(numBytes),
TriggerSubjects: request.TriggerSubjects,
WorkloadName: request.WorkloadName,
WorkloadType: request.WorkloadType,
Argv: request.Argv,
DecodedClaims: request.DecodedClaims,
Description: request.Description,
EncryptedEnvironment: request.Environment,
Environment: request.WorkloadEnvironment, // HACK!!! we need to fix autostart config to allow encrypted environment...
Essential: request.Essential,
Hash: hash,
HostServicesConfig: request.HostServicesConfig,
ID: request.ID,
Location: request.Location,
Namespace: &namespace,
RetriedAt: request.RetriedAt,
RetryCount: request.RetryCount,
SenderPublicKey: request.SenderPublicKey,
TotalBytes: int64(numBytes),
TriggerSubjects: request.TriggerSubjects,
WorkloadJWT: request.WorkloadJWT,
WorkloadName: request.WorkloadName,
WorkloadType: request.WorkloadType,
}
}
2 changes: 1 addition & 1 deletion internal/node/workload_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (m *WorkloadManager) CacheWorkload(workloadID string, request *controlapi.D
return uint64(len(workload)), &workloadHashString, nil
}

return 0, nil, fmt.Errorf("Unsupported scheme specified for workload location")
return 0, nil, fmt.Errorf("unsupported scheme specified for workload location")
}

// Deploy a workload as specified by the given deploy request to an available
Expand Down
27 changes: 16 additions & 11 deletions internal/node/workload_mgr_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,22 @@ func (w *WorkloadManager) agentEvent(agentId string, evt cloudevents.Event) {
digest := controlapi.SanitizeNATSDigest(info.Digest)

req, _ := json.Marshal(&controlapi.DeployRequest{
Argv: agentWorkloadInfo.Argv,
Description: agentWorkloadInfo.Description,
Hash: &digest,
Essential: agentWorkloadInfo.Essential,
ID: &id,
Location: agentWorkloadInfo.Location,
RetriedAt: agentWorkloadInfo.RetriedAt,
RetryCount: agentWorkloadInfo.RetryCount,
TriggerSubjects: agentWorkloadInfo.TriggerSubjects,
WorkloadName: agentWorkloadInfo.WorkloadName,
WorkloadType: agentWorkloadInfo.WorkloadType,
Argv: agentWorkloadInfo.Argv,
Description: agentWorkloadInfo.Description,
Hash: &digest,
Environment: agentWorkloadInfo.EncryptedEnvironment,
Essential: agentWorkloadInfo.Essential,
HostServicesConfig: agentWorkloadInfo.HostServicesConfig,
ID: &id,
JsDomain: agentWorkloadInfo.JsDomain,
Location: agentWorkloadInfo.Location,
RetriedAt: agentWorkloadInfo.RetriedAt,
RetryCount: agentWorkloadInfo.RetryCount,
SenderPublicKey: agentWorkloadInfo.SenderPublicKey,
TriggerSubjects: agentWorkloadInfo.TriggerSubjects,
WorkloadJWT: agentWorkloadInfo.WorkloadJWT,
WorkloadName: agentWorkloadInfo.WorkloadName,
WorkloadType: agentWorkloadInfo.WorkloadType,
})

nodeID := w.publicKey
Expand Down

0 comments on commit 307bfbc

Please sign in to comment.