Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: longpoll for playbook runners #1374

Merged
merged 5 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/samber/oops v1.13.1
github.com/samber/slog-echo v1.14.4
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1
github.com/sethvargo/go-retry v0.3.0
github.com/slack-go/slack v0.14.0
github.com/tg123/go-htpasswd v1.2.2
github.com/timberio/go-datemath v0.1.0
Expand Down Expand Up @@ -207,7 +208,6 @@ require (
github.com/rodaine/table v1.3.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect
github.com/sethvargo/go-retry v0.3.0 // indirect
github.com/shirou/gopsutil/v3 v3.24.5 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shurcooL/githubv4 v0.0.0-20190718010115-4ba037080260 // indirect
Expand Down
25 changes: 24 additions & 1 deletion jobs/jobs.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package jobs

import (
gocontext "context"
"fmt"
"strings"
"time"

"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/job"
"github.com/flanksource/duty/query"
"github.com/flanksource/incident-commander/api"
"github.com/flanksource/incident-commander/incidents"
"github.com/robfig/cron/v3"
"github.com/sethvargo/go-retry"
)

const (
Expand All @@ -24,7 +30,22 @@ var agentJobs = []*job.Job{
ReconcileAll,
SyncArtifactData,
PushPlaybookActions,
PullPlaybookActions,
}

// RunPullPlaybookActionsJob runs the PullPlaybookActions job in a loop until
// ctx is cancelled. It blocks, so callers are expected to start it in its own
// goroutine.
//
// Every iteration builds a fresh job via PullPlaybookActions(ctx) and runs it.
// A run that records errors on job.LastJob is retried with an exponential
// backoff (base of one second, at most 10 retries). When a run succeeds, or
// the retries are exhausted, the next iteration starts right away.
func RunPullPlaybookActionsJob(ctx context.Context) {
	for {
		// Stop once the context is done. Without this guard retry.Do returns
		// immediately on a cancelled context and the loop would spin forever
		// without ever yielding or exiting.
		if ctx.Err() != nil {
			return
		}

		job := PullPlaybookActions(ctx)
		backoff := retry.WithMaxRetries(10, retry.NewExponential(time.Second))

		// The returned error is intentionally ignored: it only repeats the
		// errors already present on job.LastJob.Errors, or reports context
		// cancellation, which the guard at the top of the loop handles.
		_ = retry.Do(ctx, backoff, func(_ gocontext.Context) error {
			job.Run()

			if len(job.LastJob.Errors) != 0 {
				// Mark the failure as retryable so the backoff applies.
				return retry.RetryableError(fmt.Errorf("%s", strings.Join(job.LastJob.Errors, ", ")))
			}

			return nil
		})
	}
}

func Start(ctx context.Context) {
Expand Down Expand Up @@ -56,6 +77,8 @@ func Start(ctx context.Context) {
logger.Errorf("Failed to schedule %s: %v", j, err)
}
}

go RunPullPlaybookActionsJob(ctx)
}

cleanupStaleJobHistory.Context = ctx
Expand Down
27 changes: 15 additions & 12 deletions jobs/playbook.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package jobs

import (
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/job"
"github.com/flanksource/incident-commander/api"

Expand All @@ -9,19 +10,21 @@ import (

// PullPlaybookActions periodically pulls playbook actions to run
// from the upstream
var PullPlaybookActions = &job.Job{
Name: "PullPlaybookActions",
Schedule: "@every 60s",
Retention: job.RetentionFailed,
JobHistory: true,
RunNow: true,
Singleton: false,
Fn: func(ctx job.JobRuntime) error {
ctx.History.ResourceType = job.ResourceTypePlaybook
ctx.History.ResourceID = api.UpstreamConf.Host
func PullPlaybookActions(ctx context.Context) *job.Job {
return &job.Job{
Name: "PullPlaybookActions",
Retention: job.RetentionFailed,
JobHistory: true,
RunNow: true,
Context: ctx,
Singleton: false,
Fn: func(ctx job.JobRuntime) error {
ctx.History.ResourceType = job.ResourceTypePlaybook
ctx.History.ResourceID = api.UpstreamConf.Host

return playbook.PullPlaybookAction(ctx, api.UpstreamConf)
},
return playbook.PullPlaybookAction(ctx, api.UpstreamConf)
},
}
}

// PushPlaybookActions pushes actions that have been fully run to the upstream
Expand Down
2 changes: 2 additions & 0 deletions playbook/run_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ func StartPlaybookConsumers(ctx context.Context) error {
go pg.Listen(ctx, pgNotifyPlaybookActionUpdates, actionAgentUpdatesPGNotifyChannel)
go actionAgentEventConsumer.Listen(ctx, actionAgentUpdatesPGNotifyChannel)

go runner.ActionNotifyRouter.Run(ctx, pgNotifyPlaybookActionUpdates)

return nil
}

Expand Down
28 changes: 27 additions & 1 deletion playbook/runner/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package runner
import (
"fmt"
"strconv"
"time"

"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
Expand All @@ -19,7 +20,32 @@ type ActionForAgent struct {
TemplateEnv actions.TemplateEnv `json:"template_env"`
}

func GetActionForAgent(ctx context.Context, agent *models.Agent) (*ActionForAgent, error) {
// GetActionForAgentWithWait returns the next action for the given agent,
// long-polling when none is available immediately.
//
// It first queries once via getActionForAgent. If that yields an action (or an
// error) the result is returned straight away. Otherwise it waits for the
// first of:
//   - ctx being done (e.g. the request was cancelled): returns ctx.Err();
//   - the long-poll timeout, read from the "playbook.runner.longpoll.timeout"
//     property with DefaultLongpollTimeout as fallback: returns an empty
//     ActionForAgent;
//   - a notification on the agent's channel from ActionNotifyRouter: queries
//     again and returns that result.
//
// NOTE(review): after a notification the second query's result is returned
// as-is, so a nil action is possible on that path while the timeout path
// returns an empty struct — confirm callers handle both.
func GetActionForAgentWithWait(ctx context.Context, agent *models.Agent) (*ActionForAgent, error) {
	action, err := getActionForAgent(ctx, agent)
	if err != nil {
		return nil, err
	}

	if action != nil {
		return action, nil
	}

	// Go into waiting state. An explicit timer (instead of time.After) is
	// stopped on return so it does not linger when another branch wins.
	timeout := ctx.Properties().Duration("playbook.runner.longpoll.timeout", DefaultLongpollTimeout)
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		// The caller went away; do not keep waiting on its behalf.
		return nil, ctx.Err()

	case <-timer.C:
		return &ActionForAgent{}, nil

	case <-ActionNotifyRouter.GetOrCreateChannel(agent.ID.String()):
		action, err := getActionForAgent(ctx, agent)
		if err != nil {
			return nil, err
		}

		return action, nil
	}
}

func getActionForAgent(ctx context.Context, agent *models.Agent) (*ActionForAgent, error) {
tx := ctx.DB().Begin()
if tx.Error != nil {
return nil, fmt.Errorf("error initiating db tx: %w", tx.Error)
Expand Down
30 changes: 30 additions & 0 deletions playbook/runner/longpoll.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package runner

import (
"encoding/json"
"time"

"github.com/flanksource/duty/postq/pg"
)

// DefaultLongpollTimeout is the fallback wait used by the long-poll when the
// "playbook.runner.longpoll.timeout" property is not set.
var DefaultLongpollTimeout = time.Minute

// ActionNotifyRouter is the package-level (global) notify router instance.
// It is configured with playbookActionNotifyRouteExtractor, which derives the
// route from each notification payload.
var ActionNotifyRouter = pg.NewNotifyRouter().WithRouteExtractor(playbookActionNotifyRouteExtractor)

// playbookActionNotifyPayload mirrors the JSON body of a playbook action
// notification: an "id" field and the "agent_id" it belongs to.
type playbookActionNotifyPayload struct {
	ID      string `json:"id"`
	AgentID string `json:"agent_id"`
}

// playbookActionNotifyRouteExtractor decodes a JSON notification payload and
// splits it into a route and the payload to forward.
//
// The route is the payload's "agent_id" and the forwarded payload is its "id".
// If the input is not valid JSON, both strings are empty and the decoding
// error is returned unchanged.
func playbookActionNotifyRouteExtractor(payload string) (string, string, error) {
	decoded := playbookActionNotifyPayload{}

	err := json.Unmarshal([]byte(payload), &decoded)
	if err != nil {
		return "", "", err
	}

	return decoded.AgentID, decoded.ID, nil
}
2 changes: 1 addition & 1 deletion upstream/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func handlePlaybookActionRequest(c echo.Context) error {
ctx := c.Request().Context().(context.Context)

agent := ctx.Agent()
response, err := runner.GetActionForAgent(ctx, agent)
response, err := runner.GetActionForAgentWithWait(ctx, agent)
if err != nil {
logger.Warnf("failed to get action for agent: %+v", err)
return dutyAPI.WriteError(c, err)
Expand Down
Loading