diff --git a/control-api/run.go b/control-api/run.go index 405bf5fb..78088ecc 100644 --- a/control-api/run.go +++ b/control-api/run.go @@ -120,6 +120,12 @@ func NewDeployRequest(opts ...RequestOption) (*DeployRequest, error) { return req, nil } +// Returns true if the run request supports trigger subjects +func (request *DeployRequest) SupportsTriggerSubjects() bool { + return request.WorkloadType == NexWorkloadV8 || + request.WorkloadType == NexWorkloadWasm +} + // This will validate a request's workload JWT and return the parsed claims func (request *DeployRequest) Validate() (*jwt.GenericClaims, error) { claims, err := jwt.DecodeGeneric(*request.WorkloadJWT) diff --git a/control-api/utils.go b/control-api/utils.go index 9ea90c5f..679e8ca1 100644 --- a/control-api/utils.go +++ b/control-api/utils.go @@ -2,6 +2,7 @@ package controlapi import ( "encoding/base64" + "encoding/hex" "strings" ) @@ -17,5 +18,5 @@ func SanitizeNATSDigest(input string) string { return after } - return string(h) + return hex.EncodeToString(h) } diff --git a/internal/agent-api/client.go b/internal/agent-api/client.go index 9d067edb..9a4a0cf4 100644 --- a/internal/agent-api/client.go +++ b/internal/agent-api/client.go @@ -27,7 +27,7 @@ type ContactLostCallback func(string) const ( defaultAgentPingIntervalMillis = 5000 - maxRetryAttempts = 3 + maxRetryAttempts = 10 NexTriggerSubject = "x-nex-trigger-subject" NexRuntimeNs = "x-nex-runtime-ns" diff --git a/internal/agent-api/types.go b/internal/agent-api/types.go index eee99394..36f9f484 100644 --- a/internal/agent-api/types.go +++ b/internal/agent-api/types.go @@ -54,6 +54,7 @@ type AgentWorkloadInfo struct { Essential *bool `json:"essential,omitempty"` Hash string `json:"hash,omitempty"` ID *string `json:"id"` + Location *url.URL `json:"location"` Namespace *string `json:"namespace,omitempty"` RetriedAt *time.Time `json:"retried_at,omitempty"` RetryCount *uint `json:"retry_count,omitempty"` @@ -68,7 +69,10 @@ type AgentWorkloadInfo struct { Stdout io.Writer `json:"-"` TmpFilename *string `json:"-"` - Location *url.URL `json:"-"` + EncryptedEnvironment *string `json:"-"` + JsDomain *string `json:"-"` + SenderPublicKey *string `json:"-"` + WorkloadJWT *string `json:"-"` Errors []error `json:"errors,omitempty"` } @@ -109,6 +113,10 @@ func (r *AgentWorkloadInfo) Validate() error { err = errors.Join(err, errors.New("hash is required")) } + if r.Location == nil { + err = errors.Join(err, errors.New("location is required")) + } + if r.TotalBytes == 0 { // FIXME--- this should probably be checked against *string err = errors.Join(err, errors.New("total bytes is required")) } diff --git a/internal/node/controlapi.go b/internal/node/controlapi.go index 59c12f3f..cce443dc 100644 --- a/internal/node/controlapi.go +++ b/internal/node/controlapi.go @@ -284,8 +284,7 @@ func (api *ApiListener) handleDeploy(ctx context.Context, span trace.Span, m *na return } - if len(request.TriggerSubjects) > 0 && (request.WorkloadType != controlapi.NexWorkloadV8 && - request.WorkloadType != controlapi.NexWorkloadWasm) { // FIXME -- workload type comparison + if len(request.TriggerSubjects) > 0 && !request.SupportsTriggerSubjects() { // FIXME -- workload type comparison span.SetStatus(codes.Error, "Unsupported workload type for trigger subjects") api.log.Error("Workload type does not support trigger subject registration", slog.String("trigger_subjects", string(request.WorkloadType))) respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Unsupported workload type for trigger subject registration: %s", string(request.WorkloadType))) @@ -393,6 +392,7 @@ func (api *ApiListener) handleDeploy(ctx context.Context, span trace.Span, m *na slog.String("namespace", namespace), slog.String("workload", *agentDeployRequest.WorkloadName), slog.String("workload_id", workloadID), + slog.String("workload_location", agentDeployRequest.Location.String()), slog.Uint64("workload_size", numBytes), slog.String("workload_sha256", *workloadHash), slog.String("type", string(request.WorkloadType)), diff --git a/internal/node/node.go b/internal/node/node.go index 010e3a85..13209717 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -453,12 +453,14 @@ func (n *Node) handleAutostarts() { hash = *workloadHash // HACK!!! for agent-local workloads, this should be read from the release manifest } - agentDeployRequest := agentWorkloadInfoFromControlDeployRequest(request, autostart.Namespace, numBytes, hash) + agentWorkloadInfo := agentWorkloadInfoFromControlDeployRequest(request, autostart.Namespace, numBytes, hash) - agentDeployRequest.TotalBytes = int64(numBytes) - agentDeployRequest.Hash = hash + agentWorkloadInfo.TotalBytes = int64(numBytes) + agentWorkloadInfo.Hash = hash - err = n.manager.DeployWorkload(agentClient, agentDeployRequest) + agentWorkloadInfo.Environment = autostart.Environment // HACK!!! we need to fix autostart config to allow encrypted environment... + + err = n.manager.DeployWorkload(agentClient, agentWorkloadInfo) if err != nil { n.log.Error("Failed to deploy autostart workload", slog.Any("error", err), @@ -704,21 +706,24 @@ func (n *Node) shuttingDown() bool { func agentWorkloadInfoFromControlDeployRequest(request *controlapi.DeployRequest, namespace string, numBytes uint64, hash string) *agentapi.AgentWorkloadInfo { return &agentapi.AgentWorkloadInfo{ - Argv: request.Argv, - DecodedClaims: request.DecodedClaims, - Description: request.Description, - Environment: request.WorkloadEnvironment, - Essential: request.Essential, - Hash: hash, - HostServicesConfig: request.HostServicesConfig, - ID: request.ID, - Location: request.Location, - Namespace: &namespace, - RetriedAt: request.RetriedAt, - RetryCount: request.RetryCount, - TotalBytes: int64(numBytes), - TriggerSubjects: request.TriggerSubjects, - WorkloadName: request.WorkloadName, - WorkloadType: request.WorkloadType, + Argv: request.Argv, + DecodedClaims: request.DecodedClaims, + Description: request.Description, + EncryptedEnvironment: request.Environment, + Environment: request.WorkloadEnvironment, // HACK!!! we need to fix autostart config to allow encrypted environment... + Essential: request.Essential, + Hash: hash, + HostServicesConfig: request.HostServicesConfig, + ID: request.ID, + Location: request.Location, + Namespace: &namespace, + RetriedAt: request.RetriedAt, + RetryCount: request.RetryCount, + SenderPublicKey: request.SenderPublicKey, + TotalBytes: int64(numBytes), + TriggerSubjects: request.TriggerSubjects, + WorkloadJWT: request.WorkloadJWT, + WorkloadName: request.WorkloadName, + WorkloadType: request.WorkloadType, } } diff --git a/internal/node/workload_mgr.go b/internal/node/workload_mgr.go index 3b1d86a9..7f9856f9 100644 --- a/internal/node/workload_mgr.go +++ b/internal/node/workload_mgr.go @@ -228,7 +228,7 @@ func (m *WorkloadManager) CacheWorkload(workloadID string, request *controlapi.D return uint64(len(workload)), &workloadHashString, nil } - return 0, nil, fmt.Errorf("Unsupported scheme specified for workload location") + return 0, nil, fmt.Errorf("unsupported scheme specified for workload location") } // Deploy a workload as specified by the given deploy request to an available diff --git a/internal/node/workload_mgr_events.go b/internal/node/workload_mgr_events.go index c2849bd9..9c8dff56 100644 --- a/internal/node/workload_mgr_events.go +++ b/internal/node/workload_mgr_events.go @@ -94,17 +94,22 @@ func (w *WorkloadManager) agentEvent(agentId string, evt cloudevents.Event) { digest := controlapi.SanitizeNATSDigest(info.Digest) req, _ := json.Marshal(&controlapi.DeployRequest{ - Argv: agentWorkloadInfo.Argv, - Description: agentWorkloadInfo.Description, - Hash: &digest, - Essential: agentWorkloadInfo.Essential, - ID: &id, - Location: agentWorkloadInfo.Location, - RetriedAt: agentWorkloadInfo.RetriedAt, - RetryCount: agentWorkloadInfo.RetryCount, - TriggerSubjects: agentWorkloadInfo.TriggerSubjects, - WorkloadName: agentWorkloadInfo.WorkloadName, - WorkloadType: agentWorkloadInfo.WorkloadType, + Argv: agentWorkloadInfo.Argv, + Description: agentWorkloadInfo.Description, + Hash: &digest, + Environment: agentWorkloadInfo.EncryptedEnvironment, + Essential: agentWorkloadInfo.Essential, + HostServicesConfig: agentWorkloadInfo.HostServicesConfig, + ID: &id, + JsDomain: agentWorkloadInfo.JsDomain, + Location: agentWorkloadInfo.Location, + RetriedAt: agentWorkloadInfo.RetriedAt, + RetryCount: agentWorkloadInfo.RetryCount, + SenderPublicKey: agentWorkloadInfo.SenderPublicKey, + TriggerSubjects: agentWorkloadInfo.TriggerSubjects, + WorkloadJWT: agentWorkloadInfo.WorkloadJWT, + WorkloadName: agentWorkloadInfo.WorkloadName, + WorkloadType: agentWorkloadInfo.WorkloadType, }) nodeID := w.publicKey