Skip to content

Commit

Permalink
Fix stability of workload deployment and process accounting (#324)
Browse files Browse the repository at this point in the history
  • Loading branch information
kthomas authored Jul 12, 2024
1 parent 2e4e2e9 commit c90e457
Show file tree
Hide file tree
Showing 29 changed files with 665 additions and 367 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ltb.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
runs-on: ${{ matrix.os }}
timeout-minutes: 10
timeout-minutes: 20
steps:
-
uses: actions/checkout@v4
Expand Down
8 changes: 6 additions & 2 deletions Taskfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ tasks:
sources:
- "*.go"
cmds:
# - go build -tags netgo -ldflags '-extldflags "-static"'
- go build
- go build -tags netgo -ldflags '-extldflags "-static"'

clean:
cmds:
- rm -f nex/nex
- rm -f rootfs.ext4.gz
- sudo killall -9 nex || true
- sudo killall -9 nex-agent || true
- sudo killall -9 firecracker || true
- sudo rm -rf /opt/cni/bin/*
- sudo rm -rf /var/lib/cni/*
- sudo rm -rf /etc/cni/conf.d/*
Expand All @@ -33,6 +35,8 @@ tasks:
- sudo rm -rf /tmp/*-spec-nex-wd
- sudo rm -rf /tmp/rootfs-*.ext4
- sudo rm -rf /tmp/*-rootfs.ext4
- sudo rm -rf /tmp/workload-*
- sudo rm -rf /tmp/*-non-existent-nex-resource-dir

nex:
dir: nex
Expand Down
77 changes: 50 additions & 27 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
)

const (
defaultAgentHandshakeTimeoutMillis = 1000
defaultAgentHandshakeAttempts = 5
defaultAgentHandshakeTimeoutMillis = 500
runloopSleepInterval = 250 * time.Millisecond
runloopTickInterval = 2500 * time.Millisecond
workloadExecutionSleepTimeoutMillis = 1000
workloadCacheFileKey = "workload"
workloadExecutionSleepTimeoutMillis = 100
)

// Agent facilitates communication between the nex agent running in the firecracker VM
Expand All @@ -46,6 +46,7 @@ type Agent struct {
sigs chan os.Signal

provider providers.ExecutionProvider
subz []*nats.Subscription

cacheBucket nats.ObjectStore
md *agentapi.MachineMetadata
Expand Down Expand Up @@ -84,6 +85,7 @@ func NewAgent(ctx context.Context, cancelF context.CancelFunc) (*Agent, error) {
sandboxed: isSandboxed(),
md: metadata,
started: time.Now().UTC(),
subz: make([]*nats.Subscription, 0),
}, nil
}

Expand Down Expand Up @@ -136,32 +138,30 @@ func (a *Agent) requestHandshake() error {
}
raw, _ := json.Marshal(msg)

hs := false
var attempts int
for attempts = 0; attempts < 3; attempts++ {
attempts := 0
for attempts < defaultAgentHandshakeAttempts-1 && !a.shuttingDown() {
attempts++

resp, err := a.nc.Request(fmt.Sprintf("hostint.%s.handshake", *a.md.VmID), raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis)
if err != nil {
a.LogError(fmt.Sprintf("Agent failed to request initial sync message: %s, attempt %d", err, attempts+1))
time.Sleep(time.Millisecond * 100)
time.Sleep(time.Millisecond * 25)
continue
}

var handshakeResponse *agentapi.HandshakeResponse
err = json.Unmarshal(resp.Data, &handshakeResponse)
if err != nil {
a.LogError(fmt.Sprintf("Failed to parse handshake response: %s", err))
time.Sleep(time.Millisecond * 100)
time.Sleep(time.Millisecond * 25)
continue
}
hs = true
break
}
if hs {
a.LogInfo(fmt.Sprintf("Agent is up after %d attempts", attempts+1))

a.LogInfo(fmt.Sprintf("Agent is up after %d attempt(s)", attempts))
return nil
} else {
return errors.New("Failed to obtain handshake from host")
}

return errors.New("Failed to obtain handshake from host")
}

func (a *Agent) Version() string {
Expand All @@ -180,7 +180,7 @@ func (a *Agent) cacheExecutableArtifact(req *agentapi.DeployRequest) (*string, e
tempFile = fmt.Sprintf("%s.exe", tempFile)
}

err := a.cacheBucket.GetFile(workloadCacheFileKey, tempFile)
err := a.cacheBucket.GetFile(*a.md.VmID, tempFile)
if err != nil {
msg := fmt.Sprintf("Failed to get and write workload artifact to temp dir: %s", err)
a.LogError(msg)
Expand All @@ -197,6 +197,18 @@ func (a *Agent) cacheExecutableArtifact(req *agentapi.DeployRequest) (*string, e
return &tempFile, nil
}

// deleteExecutableArtifact deletes the installed workload executable
// and purges it from the internal object store
func (a *Agent) deleteExecutableArtifact() error {
fileName := fmt.Sprintf("workload-%s", *a.md.VmID)
tempFile := path.Join(os.TempDir(), fileName)

_ = os.Remove(tempFile)
_ = a.cacheBucket.Delete(*a.md.VmID)

return nil
}

// Run inside a goroutine to pull event entries and publish them to the node host.
func (a *Agent) dispatchEvents() {
for !a.shuttingDown() {
Expand Down Expand Up @@ -351,24 +363,27 @@ func (a *Agent) init() error {
}

subject := fmt.Sprintf("agentint.%s.deploy", *a.md.VmID)
_, err = a.nc.Subscribe(subject, a.handleDeploy)
sub, err := a.nc.Subscribe(subject, a.handleDeploy)
if err != nil {
a.LogError(fmt.Sprintf("Failed to subscribe to agent deploy subject: %s", err))
return err
}
a.subz = append(a.subz, sub)

udsubject := fmt.Sprintf("agentint.%s.undeploy", *a.md.VmID)
_, err = a.nc.Subscribe(udsubject, a.handleUndeploy)
sub, err = a.nc.Subscribe(udsubject, a.handleUndeploy)
if err != nil {
a.LogError(fmt.Sprintf("Failed to subscribe to agent undeploy subject: %s", err))
return err
}
a.subz = append(a.subz, sub)

pingSubject := fmt.Sprintf("agentint.%s.ping", *a.md.VmID)
_, err = a.nc.Subscribe(pingSubject, a.handlePing)
sub, err = a.nc.Subscribe(pingSubject, a.handlePing)
if err != nil {
a.LogError(fmt.Sprintf("failed to subscribe to ping subject: %s", err))
}
a.subz = append(a.subz, sub)

go a.dispatchEvents()
go a.dispatchLogs()
Expand Down Expand Up @@ -405,13 +420,13 @@ func (a *Agent) initNATS() error {

js, err := a.nc.JetStream()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get JetStream context from shared NATS: %s", err)
fmt.Fprintf(os.Stderr, "failed to get JetStream context from internal NATS: %s", err)
return err
}

a.cacheBucket, err = js.ObjectStore(agentapi.WorkloadCacheBucket)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get reference to shared object store: %s", err)
fmt.Fprintf(os.Stderr, "failed to get reference to internal object store: %s", err)
return err
}

Expand Down Expand Up @@ -483,18 +498,26 @@ func (a *Agent) newExecutionProviderParams(req *agentapi.DeployRequest, tmpFile

func (a *Agent) shutdown() {
if atomic.AddUint32(&a.closing, 1) == 1 {
_ = a.deleteExecutableArtifact()

for _, sub := range a.subz {
_ = sub.Drain()
}

if a.nc != nil {
_ = a.nc.Drain()
for !a.nc.IsClosed() {
time.Sleep(time.Millisecond * 25)
}
}

if a.provider != nil {
err := a.provider.Undeploy()
if err != nil {
fmt.Printf("failed to undeploy workload: %s", err)
fmt.Printf("failed to undeploy workload: %s\n", err)
}
}

_ = a.nc.Drain()
for !a.nc.IsClosed() {
time.Sleep(time.Millisecond * 25)
}

HaltVM(nil)
}
}
Expand Down
1 change: 1 addition & 0 deletions agent/providers/lib/v8.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ func (v *V8) Validate() error {
}

v.initUtils()
_ = os.Remove(v.tmpFilename)

return nil
}
Expand Down
6 changes: 4 additions & 2 deletions agent/providers/lib/wasm.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Wasm struct {
runtime wazero.Runtime
runtimeConfig wazero.ModuleConfig
module wazero.CompiledModule
tmpFile string

fail chan bool
run chan bool
Expand Down Expand Up @@ -92,7 +93,7 @@ func (e *Wasm) Execute(ctx context.Context, payload []byte) ([]byte, error) {
}

func (e *Wasm) Undeploy() error {
// We shouldn't have to do anything here since the wasm "owns" no resources
_ = os.Remove(e.tmpFile)
return nil
}

Expand Down Expand Up @@ -157,7 +158,8 @@ func InitNexExecutionProviderWasm(params *agentapi.ExecutionProviderParams) (*Wa
run: params.Run,
exit: params.Exit,

nc: params.NATSConn,
nc: params.NATSConn,
tmpFile: *params.TmpFilename,
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions control-api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (api *Client) StartWorkload(request *DeployRequest) (*RunResponse, error) {
if err != nil {
return nil, err
}

return &response, nil
}

Expand Down Expand Up @@ -394,6 +395,7 @@ func handleLogEntry(api *Client, ch chan EmittedLog) func(m *nats.Msg) {
func (api *Client) performRequest(subject string, raw interface{}) ([]byte, error) {
var bytes []byte
var err error

if raw == nil {
bytes = []byte{}
} else {
Expand All @@ -407,24 +409,29 @@ func (api *Client) performRequest(subject string, raw interface{}) ([]byte, erro
if err != nil {
return nil, err
}

env, err := extractEnvelope(resp.Data)
if err != nil {
return nil, err
}

if env.Error != nil {
return nil, fmt.Errorf("%v", env.Error)
}

return json.Marshal(env.Data)
}

func extractEnvelope(data []byte) (*Envelope, error) {
if len(data) == 0 {
return nil, errors.New("no data for envelope")
}

var env Envelope
err := json.Unmarshal(data, &env)
if err != nil {
return nil, err
}

return &env, nil
}
18 changes: 10 additions & 8 deletions control-api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,29 +99,31 @@ type MemoryStat struct {
}

type InfoResponse struct {
Version string `json:"version"`
Uptime string `json:"uptime"`
PublicXKey string `json:"public_xkey"`
Tags map[string]string `json:"tags,omitempty"`
AvailableAgents int `json:"available_agents"`
Machines []MachineSummary `json:"machines"` // FIXME-- rename to workloads?
Memory *MemoryStat `json:"memory,omitempty"`
Machines []MachineSummary `json:"machines"`
PublicXKey string `json:"public_xkey"`
SupportedWorkloadTypes []NexWorkload `json:"supported_workload_types,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Uptime string `json:"uptime"`
Version string `json:"version"`
}

type MachineSummary struct {
type MachineSummary struct { // FIXME-- rename to workload summary?
Id string `json:"id"`
Healthy bool `json:"healthy"`
Uptime string `json:"uptime"`
Namespace string `json:"namespace,omitempty"`
Workload WorkloadSummary `json:"workload,omitempty"`
Workload WorkloadSummary `json:"workload,omitempty"` // FIXME-- rename to deploy request?
}

type WorkloadSummary struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Hash string `json:"hash"`
Runtime string `json:"runtime"`
Uptime string `json:"uptime"`
WorkloadType NexWorkload `json:"type"`
Hash string `json:"hash"`
}

type Envelope struct {
Expand Down
Loading

0 comments on commit c90e457

Please sign in to comment.