Skip to content

Commit

Permalink
feat: new approach for longpoll
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed Sep 12, 2024
1 parent 9496747 commit 4c8c62c
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 72 deletions.
10 changes: 4 additions & 6 deletions config/crds/mission-control.flanksource.com_incidentrules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,15 @@ spec:
properties:
timeout:
description: How long after the health checks have been passing
before, autoclosing the incident.
format: int64
type: integer
before, autoclosing the incident (accepts goduration format)
type: string
type: object
autoResolve:
properties:
timeout:
description: How long after the health checks have been passing
before, autoclosing the incident.
format: int64
type: integer
before, autoclosing the incident (accepts goduration format)
type: string
type: object
breakOnMatch:
description: stop processing other incident rules, when matched
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/samber/oops v1.13.1
github.com/samber/slog-echo v1.14.4
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1
github.com/sethvargo/go-retry v0.3.0
github.com/slack-go/slack v0.14.0
github.com/tg123/go-htpasswd v1.2.2
github.com/timberio/go-datemath v0.1.0
Expand Down Expand Up @@ -207,7 +208,6 @@ require (
github.com/rodaine/table v1.3.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect
github.com/sethvargo/go-retry v0.3.0 // indirect
github.com/shirou/gopsutil/v3 v3.24.5 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shurcooL/githubv4 v0.0.0-20190718010115-4ba037080260 // indirect
Expand Down Expand Up @@ -359,6 +359,6 @@ require (

// replace github.com/flanksource/commons => /Users/moshe/go/src/github.com/flanksource/commons

// replace github.com/flanksource/duty => ../du31
// replace github.com/flanksource/duty => ../duty

// replace github.com/flanksource/gomplate/v3 => /Users/moshe/go/src/github.com/flanksource/gomplate
3 changes: 1 addition & 2 deletions playbook/run_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ func StartPlaybookConsumers(ctx context.Context) error {
go pg.Listen(ctx, pgNotifyPlaybookActionUpdates, actionAgentUpdatesPGNotifyChannel)
go actionAgentEventConsumer.Listen(ctx, actionAgentUpdatesPGNotifyChannel)

go pg.Listen(ctx, pgNotifyPlaybookActionUpdates, runner.ActionMgr.Chan())
go runner.ActionMgr.Listen()
go runner.ActionNotifyRouter.Run(ctx, pgNotifyPlaybookActionUpdates)

return nil
}
Expand Down
114 changes: 98 additions & 16 deletions playbook/runner/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,114 @@ type ActionForAgent struct {
TemplateEnv actions.TemplateEnv `json:"template_env"`
}

func GetActionForAgent(ctx context.Context, agent *models.Agent) (*ActionForAgent, error) {
func GetActionForAgentWithWait(ctx context.Context, agent *models.Agent) (*ActionForAgent, error) {
action, err := getActionForAgent(ctx, agent)
if err != nil {
return nil, err
}

if action != nil {
return action, err
}

// Go into waiting state
select {
case <-time.After(LongpollTimeout):
return &ActionForAgent{}, nil

case actionID := <-ActionMgr.Register(agent.ID.String()):
tx := ctx.DB().Begin()
if tx.Error != nil {
return nil, fmt.Errorf("error initiating db tx: %w", tx.Error)
case <-ActionNotifyRouter.RegisterRoutes(agent.ID.String()):
action, err := getActionForAgent(ctx, agent)
if err != nil {
return nil, err
}
defer tx.Rollback()

ctx = ctx.WithDB(tx, ctx.Pool())
ctx = ctx.WithObject(agent)
return action, err
}
}

var action models.PlaybookRunAction
if err := ctx.DB().Where("id = ?", actionID).First(&action).Error; err != nil {
return nil, err
}
func getActionForAgent(ctx context.Context, agent *models.Agent) (*ActionForAgent, error) {
tx := ctx.DB().Begin()
if tx.Error != nil {
return nil, fmt.Errorf("error initiating db tx: %w", tx.Error)
}
defer tx.Rollback()

ctx = ctx.WithDB(tx, ctx.Pool())
ctx = ctx.WithObject(agent)

query := `
SELECT playbook_run_actions.*
FROM playbook_run_actions
INNER JOIN playbook_runs ON playbook_runs.id = playbook_run_actions.playbook_run_id
INNER JOIN playbooks ON playbooks.id = playbook_runs.playbook_id
WHERE playbook_run_actions.status = ?
AND (playbook_run_actions.scheduled_time IS NULL or playbook_run_actions.scheduled_time <= NOW())
AND playbook_run_actions.agent_id = ?
ORDER BY scheduled_time
FOR UPDATE SKIP LOCKED
LIMIT 1
`

var steps []models.PlaybookRunAction
if err := ctx.DB().Raw(query, models.PlaybookRunStatusWaiting, agent.ID).Find(&steps).Error; err != nil {
return nil, ctx.Oops("db").Wrap(err)
}

actionForAgent, err := getAgentAction(ctx, agent, &action)
if err != nil {
return nil, err
if len(steps) == 0 {
return nil, nil
}
step := &steps[0]
ctx = ctx.WithObject(agent, step)

run, err := step.GetRun(ctx.DB())
if err != nil {
return nil, ctx.Oops().Wrap(err)
}
ctx = ctx.WithObject(agent, step, run)
playbook, err := step.GetPlaybook(ctx.DB())
if err != nil {
return nil, ctx.Oops().Wrap(err)
}
ctx = ctx.WithObject(playbook, agent, step, run)

templateEnv, err := CreateTemplateEnv(ctx, playbook, run)
if err != nil {
return nil, ctx.Oops().Wrapf(err, "failed to template env")
}

spec, err := getActionSpec(ctx, playbook, step.Name)
if err != nil {
return nil, ctx.Oops().Wrap(err)
}
if err := templateActionExpressions(ctx, run, step, spec, templateEnv); err != nil {
return nil, ctx.Oops().Wrap(err)
}

if spec.TemplatesOn == "" || spec.TemplatesOn == Main {
if err := TemplateAction(ctx, run, step, spec, templateEnv); err != nil {
return nil, ctx.Oops().Wrap(err)
}
}

output := ActionForAgent{
Action: *step, // step.status will still be waiting
Run: *run,
ActionSpec: *spec,
TemplateEnv: templateEnv,
}

return actionForAgent, ctx.Oops().Wrap(tx.Commit().Error)
if skip, err := filterAction(ctx, run.ID, spec.Filter); err != nil {
return nil, ctx.Oops().Wrap(err)
} else {
// We run the filter on the upstream and simply send the filter result to the agent.
spec.Filter = strconv.FormatBool(!skip)
}
// Update the step.status to Running, so that the action is locked and running only on the agent that polled it
if err := step.Start(ctx.DB()); err != nil {
return nil, ctx.Oops().Wrap(err)
}

return &output, ctx.Oops().Wrap(tx.Commit().Error)
}

func getAgentAction(ctx context.Context, agent *models.Agent, step *models.PlaybookRunAction) (*ActionForAgent, error) {
Expand Down Expand Up @@ -95,6 +176,7 @@ func getAgentAction(ctx context.Context, agent *models.Agent, step *models.Playb
// We run the filter on the upstream and simply send the filter result to the agent.
spec.Filter = strconv.FormatBool(!skip)
}

// Update the step.status to Running, so that the action is locked and running only on the agent that polled it
if err := step.Start(ctx.DB()); err != nil {
return nil, ctx.Oops().Wrap(err)
Expand Down
56 changes: 11 additions & 45 deletions playbook/runner/longpoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,29 @@ package runner

import (
"encoding/json"
"sync"
"time"
)

// Global instance
var ActionMgr = NewActionNotifyManager(make(chan string))
"github.com/flanksource/duty/postq/pg"
)

var LongpollTimeout = time.Minute

type actionNotifyManager struct {
ch chan string
mu *sync.RWMutex
subscriptions map[string]chan string
}

func NewActionNotifyManager(ch chan string) *actionNotifyManager {
return &actionNotifyManager{
ch: ch,
mu: &sync.RWMutex{},
subscriptions: make(map[string]chan string),
}
}
// Global instance
var ActionNotifyRouter = pg.NewNotifyRouter().WithRouteExtractor(playbookActionNotifyRouteExtractor)

Check failure on line 13 in playbook/runner/longpoll.go

View workflow job for this annotation

GitHub Actions / lint

pg.NewNotifyRouter().WithRouteExtractor undefined (type *pg.notifyRouter has no field or method WithRouteExtractor) (typecheck)

Check failure on line 13 in playbook/runner/longpoll.go

View workflow job for this annotation

GitHub Actions / lint

pg.NewNotifyRouter().WithRouteExtractor undefined (type *pg.notifyRouter has no field or method WithRouteExtractor)) (typecheck)

Check failure on line 13 in playbook/runner/longpoll.go

View workflow job for this annotation

GitHub Actions / lint

pg.NewNotifyRouter().WithRouteExtractor undefined (type *pg.notifyRouter has no field or method WithRouteExtractor)) (typecheck)

Check failure on line 13 in playbook/runner/longpoll.go

View workflow job for this annotation

GitHub Actions / lint

pg.NewNotifyRouter().WithRouteExtractor undefined (type *pg.notifyRouter has no field or method WithRouteExtractor)) (typecheck)

type playbookActionNotifyPayload struct {
ID string `json:"id"`
AgentID string `json:"agent_id"`
}

func (d *actionNotifyManager) Chan() chan<- string {
return d.ch
}

func (d *actionNotifyManager) Listen() {
for payload := range d.ch {
var action playbookActionNotifyPayload
if err := json.Unmarshal([]byte(payload), &action); err != nil {
continue
}

d.mu.RLock()
if e, ok := d.subscriptions[action.AgentID]; ok {
e <- action.ID
}
d.mu.RUnlock()
}
}

func (d *actionNotifyManager) Register(agentID string) chan string {
d.mu.Lock()
defer d.mu.Unlock()

if e, ok := d.subscriptions[agentID]; ok {
return e
func playbookActionNotifyRouteExtractor(payload string) (string, string, error) {
var p playbookActionNotifyPayload
if err := json.Unmarshal([]byte(payload), &p); err != nil {
return "", "", err
}

ch := make(chan string)
d.subscriptions[agentID] = ch
route := p.AgentID
extractedPayload := p.ID

return ch
return route, extractedPayload, nil
}
2 changes: 1 addition & 1 deletion upstream/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func handlePlaybookActionRequest(c echo.Context) error {
ctx := c.Request().Context().(context.Context)

agent := ctx.Agent()
response, err := runner.GetActionForAgent(ctx, agent)
response, err := runner.GetActionForAgentWithWait(ctx, agent)
if err != nil {
return dutyAPI.WriteError(c, err)
}
Expand Down

0 comments on commit 4c8c62c

Please sign in to comment.