diff --git a/api-docs-runners.yml b/api-docs-runners.yml deleted file mode 100644 index 1f834c43b..000000000 --- a/api-docs-runners.yml +++ /dev/null @@ -1,54 +0,0 @@ -swagger: '3.0' -info: - title: SEMAPHORE - description: Semaphore Runner API - version: "2.2.0" - -host: localhost:3000 - -consumes: - - application/json -produces: - - application/json - - text/plain; charset=utf-8 - -tags: - - name: authentication - description: Authentication, Logout & API Tokens - - name: project - description: Everything related to a project - - name: user - description: User-related API - -schemes: - - http - - https - -basePath: /api/runners - -definitions: - -paths: - /register: - post: - requestBody: - content: - application/json: - schema: - type: object - required: - - registrationToken - properties: - registrationToken: { type: string } - responses: - 200: - description: API Token - - /unregister: - post: - - /status: - put: - - /jobs: - get: \ No newline at end of file diff --git a/api-docs.yml b/api-docs.yml index 95e25e33f..e93c6ec07 100644 --- a/api-docs.yml +++ b/api-docs.yml @@ -470,6 +470,11 @@ definitions: position: type: integer + Runner: + type: object + properties: + token: + type: string Event: type: object @@ -1624,3 +1629,24 @@ paths: type: array items: $ref: "#/definitions/TaskOutput" + +# /runners: +# post: +# tags: +# - project +# summary: Starts a job +# parameters: +# - name: task +# in: body +# required: true +# schema: +# type: object +# properties: +# registration_token: +# type: string +# example: test123 +# responses: +# 201: +# description: Task queued +# schema: +# $ref: "#/definitions/Runner" diff --git a/api/helpers/helpers.go b/api/helpers/helpers.go index 10ec2f341..e3e911ed8 100644 --- a/api/helpers/helpers.go +++ b/api/helpers/helpers.go @@ -48,10 +48,10 @@ func GetIntParam(name string, w http.ResponseWriter, r *http.Request) (int, erro return intParam, nil } -//H just a string-to-anything map +// H just a string-to-anything map type H map[string]interface{} -//Bind decodes json into object +// Bind decodes json into object func Bind(w http.ResponseWriter, r *http.Request, out interface{}) bool { err := json.NewDecoder(r.Body).Decode(out) if err != nil { @@ -61,7 +61,7 @@ func Bind(w http.ResponseWriter, r *http.Request, out interface{}) bool { return err == nil } -//WriteJSON writes object as JSON +// WriteJSON writes object as JSON func WriteJSON(w http.ResponseWriter, code int, out interface{}) { w.Header().Set("content-type", "application/json") w.WriteHeader(code) diff --git a/api/router.go b/api/router.go index dbae5099c..b970f787c 100644 --- a/api/router.go +++ b/api/router.go @@ -2,6 +2,7 @@ package api import ( "fmt" + "github.com/ansible-semaphore/semaphore/api/runners" "net/http" "os" "strings" @@ -17,12 +18,13 @@ import ( var publicAssets2 = packr.NewBox("../web/dist") +// StoreMiddleware WTF? func StoreMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { store := helpers.Store(r) - var url = r.URL.String() + //var url = r.URL.String() - db.StoreSession(store, url, func() { + db.StoreSession(store, util.RandString(12), func() { next.ServeHTTP(w, r) }) }) @@ -81,11 +83,17 @@ func Route() *mux.Router { publicAPIRouter.Use(StoreMiddleware, JSONMiddleware) + publicAPIRouter.HandleFunc("/runners", runners.RegisterRunner).Methods("POST") publicAPIRouter.HandleFunc("/auth/login", login).Methods("GET", "POST") publicAPIRouter.HandleFunc("/auth/logout", logout).Methods("POST") publicAPIRouter.HandleFunc("/auth/oidc/{provider}/login", oidcLogin).Methods("GET") publicAPIRouter.HandleFunc("/auth/oidc/{provider}/redirect", oidcRedirect).Methods("GET") + routersAPI := r.PathPrefix(webPath + "api").Subrouter() + routersAPI.Use(StoreMiddleware, JSONMiddleware, runners.RunnerMiddleware) + routersAPI.Path("/runners/{runner_id}").HandlerFunc(runners.GetRunner).Methods("GET", "HEAD") + routersAPI.Path("/runners/{runner_id}").HandlerFunc(runners.UpdateRunner).Methods("PUT") + authenticatedWS := r.PathPrefix(webPath + "api").Subrouter() authenticatedWS.Use(JSONMiddleware, authenticationWithStore) authenticatedWS.Path("/ws").HandlerFunc(sockets.Handler).Methods("GET", "HEAD") diff --git a/api/runners/handler.go b/api/runners/handler.go deleted file mode 100644 index eb94f1bd9..000000000 --- a/api/runners/handler.go +++ /dev/null @@ -1,52 +0,0 @@ -package runners - -import ( - "github.com/ansible-semaphore/semaphore/api/helpers" - "github.com/ansible-semaphore/semaphore/db" - "github.com/ansible-semaphore/semaphore/util" - "github.com/gorilla/mux" - "net/http" - "strings" -) - -func RunnerRoute() *mux.Router { - r := mux.NewRouter() - - webPath := "/" - if util.WebHostURL != nil { - webPath = util.WebHostURL.Path - if !strings.HasSuffix(webPath, "/") { - webPath += "/" - } - } - - pingRouter := r.Path(webPath + "api/runners/register").Subrouter() - - pingRouter.Methods("POST", "HEAD").HandlerFunc(registerRunner) - - return r -} - -func registerRunner(w http.ResponseWriter, r *http.Request) { - var register struct { - RegistrationToken string `json:"registration_token" binding:"required"` - } - - if !helpers.Bind(w, r, ®ister) { - return - } - - if register.RegistrationToken != util.Config.RegistrationToken { - return - } - - runner, err := helpers.Store(r).CreateRunner(db.Runner{ - State: db.RunnerActive, - }) - - if err != nil { - return - } - - helpers.WriteJSON(w, http.StatusOK, runner) -} diff --git a/api/runners/runners.go b/api/runners/runners.go new file mode 100644 index 000000000..0b8ce5a9f --- /dev/null +++ b/api/runners/runners.go @@ -0,0 +1,154 @@ +package runners + +import ( + "github.com/ansible-semaphore/semaphore/api/helpers" + "github.com/ansible-semaphore/semaphore/db" + "github.com/ansible-semaphore/semaphore/services/runners" + "github.com/ansible-semaphore/semaphore/util" + "github.com/gorilla/context" + "net/http" +) + +func RunnerMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + runnerID, err := helpers.GetIntParam("runner_id", w, r) + + if err != nil { + helpers.WriteJSON(w, http.StatusBadRequest, map[string]string{ + "error": "runner_id required", + }) + return + } + + store := helpers.Store(r) + + runner, err := store.GetGlobalRunner(runnerID) + + if err != nil { + helpers.WriteJSON(w, http.StatusNotFound, map[string]string{ + "error": "Runner not found", + }) + return + } + + context.Set(r, "runner", runner) + next.ServeHTTP(w, r) + }) +} + +func GetRunner(w http.ResponseWriter, r *http.Request) { + runner := context.Get(r, "runner").(db.Runner) + + data := runners.RunnerState{ + AccessKeys: make(map[int]db.AccessKey), + } + + tasks := helpers.TaskPool(r).GetRunningTasks() + + for _, tsk := range tasks { + if tsk.RunnerID != runner.ID { + continue + } + + if tsk.Task.Status == db.TaskRunningStatus { + + data.NewJobs = append(data.NewJobs, runners.JobData{ + Username: tsk.Username, + IncomingVersion: tsk.IncomingVersion, + Task: tsk.Task, + Template: tsk.Template, + Inventory: tsk.Inventory, + Repository: tsk.Repository, + Environment: tsk.Environment, + }) + + if tsk.Inventory.SSHKeyID != nil { + data.AccessKeys[*tsk.Inventory.SSHKeyID] = tsk.Inventory.SSHKey + } + + if tsk.Inventory.BecomeKeyID != nil { + data.AccessKeys[*tsk.Inventory.BecomeKeyID] = tsk.Inventory.BecomeKey + } + + data.AccessKeys[tsk.Repository.SSHKeyID] = tsk.Repository.SSHKey + + } else { + data.CurrentJobs = append(data.CurrentJobs, runners.JobState{ + ID: tsk.Task.ID, + Status: tsk.Task.Status, + }) + } + } + + helpers.WriteJSON(w, http.StatusOK, data) +} + +func UpdateRunner(w http.ResponseWriter, r *http.Request) { + var body runners.RunnerProgress + + if !helpers.Bind(w, r, &body) { + helpers.WriteJSON(w, http.StatusBadRequest, map[string]string{ + "error": "Invalid format", + }) + return + } + + taskPool := helpers.TaskPool(r) + + if body.Jobs == nil { + w.WriteHeader(http.StatusNoContent) + return + } + + for _, job := range body.Jobs { + tsk := taskPool.GetTask(job.ID) + + if tsk == nil { + // TODO: log + continue + } + + for _, logRecord := range job.LogRecords { + tsk.Log2(logRecord.Message, logRecord.Time) + } + } + + w.WriteHeader(http.StatusNoContent) +} + +func RegisterRunner(w http.ResponseWriter, r *http.Request) { + var register runners.RunnerRegistration + + if !helpers.Bind(w, r, ®ister) { + helpers.WriteJSON(w, http.StatusBadRequest, map[string]string{ + "error": "Invalid format", + }) + return + } + + if util.Config.RunnerRegistrationToken == "" || register.RegistrationToken != util.Config.RunnerRegistrationToken { + helpers.WriteJSON(w, http.StatusBadRequest, map[string]string{ + "error": "Invalid registration token", + }) + return + } + + runner, err := helpers.Store(r).CreateRunner(db.Runner{ + State: db.RunnerActive, + }) + + if err != nil { + helpers.WriteJSON(w, http.StatusInternalServerError, map[string]string{ + "error": "Unexpected error", + }) + return + } + + res := runners.RunnerConfig{ + RunnerID: runner.ID, + Token: runner.Token, + } + + helpers.WriteJSON(w, http.StatusOK, res) +} diff --git a/cli/cmd/root.go b/cli/cmd/root.go index a79bb645d..90b206d08 100644 --- a/cli/cmd/root.go +++ b/cli/cmd/root.go @@ -95,16 +95,6 @@ func createStore(token string) db.Store { store.Connect(token) - //if err := store.Connect(token); err != nil { - // switch err { - // case bbolt.ErrTimeout: - // fmt.Println("\n BoltDB supports only one connection at a time. You should stop Semaphore to use CLI.") - // default: - // fmt.Println("\n Have you run `semaphore setup`?") - // } - // os.Exit(1) - //} - err := db.Migrate(store) if err != nil { diff --git a/cli/cmd/runner.go b/cli/cmd/runner.go index 229c6cb7e..c2270d6df 100644 --- a/cli/cmd/runner.go +++ b/cli/cmd/runner.go @@ -2,6 +2,7 @@ package cmd import ( "github.com/ansible-semaphore/semaphore/services/runners" + "github.com/ansible-semaphore/semaphore/util" "github.com/spf13/cobra" ) @@ -10,10 +11,11 @@ func init() { } func runRunner() { + util.ConfigInit(configPath) taskPool := runners.JobPool{} - go taskPool.Run() + taskPool.Run() } var runnerCmd = &cobra.Command{ diff --git a/db/Runner.go b/db/Runner.go index 4057c49bb..8cefc2d88 100644 --- a/db/Runner.go +++ b/db/Runner.go @@ -9,7 +9,7 @@ const ( type Runner struct { ID int `db:"id" json:"-"` - Token string `db:"token" json:"token"` + Token string `db:"token" json:"-"` ProjectID *int `db:"project_id" json:"project_id"` State RunnerState `db:"state" json:"state"` } diff --git a/db/Task.go b/db/Task.go index c09347648..d7159fa0e 100644 --- a/db/Task.go +++ b/db/Task.go @@ -15,15 +15,15 @@ const ( TaskFailStatus TaskStatus = "error" ) -//Task is a model of a task which will be executed by the runner +// Task is a model of a task which will be executed by the runner type Task struct { ID int `db:"id" json:"id"` TemplateID int `db:"template_id" json:"template_id" binding:"required"` ProjectID int `db:"project_id" json:"project_id"` Status TaskStatus `db:"status" json:"status"` - Debug bool `db:"debug" json:"debug"` + Debug bool `db:"debug" json:"debug"` DryRun bool `db:"dry_run" json:"dry_run"` Diff bool `db:"diff" json:"diff"` diff --git a/db/bolt/BoltDb.go b/db/bolt/BoltDb.go index 9c8cb3746..04faf1dce 100644 --- a/db/bolt/BoltDb.go +++ b/db/bolt/BoltDb.go @@ -83,7 +83,7 @@ func (d *BoltDb) Connect(token string) { if _, exists := d.connections[token]; exists { // Use for debugging - //panic(fmt.Errorf("Connection " + token + " already exists")) + panic(fmt.Errorf("Connection " + token + " already exists")) } if len(d.connections) > 0 { @@ -122,7 +122,7 @@ func (d *BoltDb) Close(token string) { if !exists { // Use for debugging - //panic(fmt.Errorf("can not close closed connection " + token)) + panic(fmt.Errorf("can not close closed connection " + token)) } if len(d.connections) > 1 { diff --git a/db/bolt/runner.go b/db/bolt/runner.go index bb91ffbbe..0623a4777 100644 --- a/db/bolt/runner.go +++ b/db/bolt/runner.go @@ -1,6 +1,9 @@ package bolt -import "github.com/ansible-semaphore/semaphore/db" +import ( + "github.com/ansible-semaphore/semaphore/db" + "github.com/ansible-semaphore/semaphore/util" +) func (d *BoltDb) GetRunner(projectID int, runnerID int) (runner db.Runner, err error) { return @@ -15,10 +18,13 @@ func (d *BoltDb) DeleteRunner(projectID int, runnerID int) (err error) { } func (d *BoltDb) GetGlobalRunner(runnerID int) (runner db.Runner, err error) { + err = d.getObject(0, db.RunnerProps, intObjectID(runnerID), &runner) + return } func (d *BoltDb) GetGlobalRunners() (runners []db.Runner, err error) { + err = d.getObjects(0, db.RunnerProps, db.RetrieveQueryParams{}, nil, &runners) return } @@ -31,5 +37,12 @@ func (d *BoltDb) UpdateRunner(runner db.Runner) (err error) { } func (d *BoltDb) CreateRunner(runner db.Runner) (newRunner db.Runner, err error) { + runner.Token = util.RandString(12) + + res, err := d.createObject(0, db.RunnerProps, runner) + if err != nil { + return + } + newRunner = res.(db.Runner) return } diff --git a/lib/Logger.go b/lib/Logger.go index 3a65668e0..e2610da30 100644 --- a/lib/Logger.go +++ b/lib/Logger.go @@ -1,8 +1,12 @@ package lib -import "os/exec" +import ( + "os/exec" + "time" +) type Logger interface { Log(msg string) + Log2(msg string, now time.Time) LogCmd(cmd *exec.Cmd) } diff --git a/services/runners/JobPool.go b/services/runners/JobPool.go index 72400c91a..9222c419a 100644 --- a/services/runners/JobPool.go +++ b/services/runners/JobPool.go @@ -6,21 +6,26 @@ package runners import ( + "bufio" + "bytes" "encoding/json" "fmt" log "github.com/Sirupsen/logrus" "github.com/ansible-semaphore/semaphore/db" + "github.com/ansible-semaphore/semaphore/lib" "github.com/ansible-semaphore/semaphore/services/tasks" + "github.com/ansible-semaphore/semaphore/util" "io/ioutil" "net/http" + "os" + "os/exec" "strconv" "time" ) -type logRecord struct { - job *job - output string - time time.Time +type jobLogRecord struct { + taskID int + record LogRecord } type resourceLock struct { @@ -30,74 +35,128 @@ type resourceLock struct { // job presents current job on semaphore server. type job struct { + username string + incomingVersion *string // job presents remote or local job information - job *tasks.LocalAnsibleJob + job *tasks.LocalJob Status db.TaskStatus - kind jobType args []string environmentVars []string - id int } -type jobType int +type RunnerConfig struct { + RunnerID int `json:"runner_id"` + Token string `json:"token"` +} -type Response struct { - Message string `json:"message"` - Status int `json:"status"` +type JobData struct { + Username string + IncomingVersion *string + Task db.Task `json:"task" binding:"required"` + Template db.Template `json:"template" binding:"required"` + Inventory db.Inventory `json:"inventory" binding:"required"` + Repository db.Repository `json:"repository" binding:"required"` + Environment db.Environment `json:"environment" binding:"required"` } -const ( - playbook jobType = iota - galaxy -) +type RunnerState struct { + CurrentJobs []JobState + NewJobs []JobData `json:"new_jobs" binding:"required"` + AccessKeys map[int]db.AccessKey `json:"access_keys" binding:"required"` +} -func (j *job) run() { - var err error - switch j.kind { - case playbook: - err = j.job.RunPlaybook(j.args, &j.environmentVars, nil) - case galaxy: - err = j.job.RunGalaxy(j.args) - default: - panic("Unknown job type") - } +type JobState struct { + ID int `json:"id" binding:"required"` + Status db.TaskStatus `json:"status" binding:"required"` +} - if err != nil { - // TODO: some logging - } +type LogRecord struct { + Time time.Time `json:"time" binding:"required"` + Message string `json:"message" binding:"required"` +} + +type RunnerProgress struct { + Jobs []JobProgress `json:"jobs" binding:"required"` +} + +type JobProgress struct { + ID int + Status db.TaskStatus + LogRecords []LogRecord +} + +type runningJob struct { + status db.TaskStatus + logRecords []LogRecord } type JobPool struct { // logger channel used to putting log records to database. - logger chan logRecord + logger chan jobLogRecord // register channel used to put tasks to queue. register chan *job resourceLocker chan *resourceLock - logRecords []logRecord + runningJobs map[int]*runningJob queue []*job + + config *RunnerConfig +} + +type RunnerRegistration struct { + RegistrationToken string `json:"registration_token" binding:"required"` +} + +func (p *runningJob) Log2(msg string, now time.Time) { + p.logRecords = append(p.logRecords, LogRecord{Time: now, Message: msg}) +} + +func (p *runningJob) Log(msg string) { + p.Log2(msg, time.Now()) +} + +func (p *runningJob) LogCmd(cmd *exec.Cmd) { + stderr, _ := cmd.StderrPipe() + stdout, _ := cmd.StdoutPipe() + + go p.logPipe(bufio.NewReader(stderr)) + go p.logPipe(bufio.NewReader(stdout)) +} + +func (p *runningJob) logPipe(reader *bufio.Reader) { + + line, err := tasks.Readln(reader) + for err == nil { + p.Log(line) + line, err = tasks.Readln(reader) + } + + if err != nil && err.Error() != "EOF" { + //don't panic on these errors, sometimes it throws not dangerous "read |0: file already closed" error + util.LogWarningWithFields(err, log.Fields{"error": "Failed to read TaskRunner output"}) + } + } func (p *JobPool) Run() { - ticker := time.NewTicker(5 * time.Second) + queueTicker := time.NewTicker(5 * time.Second) + requestTimer := time.NewTicker(5 * time.Second) + p.runningJobs = make(map[int]*runningJob) defer func() { - ticker.Stop() + queueTicker.Stop() }() for { select { - case record := <-p.logger: // new log message which should be put to database - p.logRecords = append(p.logRecords, record) - - case job := <-p.register: // new task created by API or schedule - p.queue = append(p.queue, job) + //case j := <-p.register: // new task created by API or schedule + // p.queue = append(p.queue, j) - case <-ticker.C: // timer 5 seconds: get task from queue and run it + case <-queueTicker.C: // timer 5 seconds: get task from queue and run it if len(p.queue) == 0 { break } @@ -106,25 +165,169 @@ func (p *JobPool) Run() { if t.Status == db.TaskFailStatus { //delete failed TaskRunner from queue p.queue = p.queue[1:] - log.Info("Task " + strconv.Itoa(t.id) + " removed from queue") + log.Info("Task " + strconv.Itoa(t.job.Task.ID) + " removed from queue") break } - log.Info("Set resource locker with TaskRunner " + strconv.Itoa(t.id)) - p.resourceLocker <- &resourceLock{lock: true, holder: t} + //log.Info("Set resource locker with TaskRunner " + strconv.Itoa(t.id)) + //p.resourceLocker <- &resourceLock{lock: true, holder: t} + + p.runningJobs[t.job.Task.ID] = &runningJob{} + + t.job.Logger = p.runningJobs[t.job.Task.ID] + t.job.Playbook.Logger = t.job.Logger + + go t.job.Run(t.username, t.incomingVersion) - go t.run() p.queue = p.queue[1:] - log.Info("Task " + strconv.Itoa(t.id) + " removed from queue") + log.Info("Task " + strconv.Itoa(t.job.Task.ID) + " removed from queue") + + case <-requestTimer.C: + + go p.checkNewJobs() + go p.sendProgress() + + } + } +} + +func (p *JobPool) sendProgress() { + + if !p.tryRegisterRunner() { + return + } + + client := &http.Client{} + + url := util.Config.Runner.ApiURL + "/runners/" + strconv.Itoa(p.config.RunnerID) + + body := RunnerProgress{ + Jobs: nil, + } + + for id, j := range p.runningJobs { + body.Jobs = append(body.Jobs, JobProgress{ + ID: id, + LogRecords: j.logRecords, + Status: j.status, + }) + + // TODO: clean logs + } + + jsonBytes, err := json.Marshal(body) + + req, err := http.NewRequest("PUT", url, bytes.NewBuffer(jsonBytes)) + if err != nil { + fmt.Println("Error creating request:", err) + return + } + + resp, err := client.Do(req) + if err != nil { + fmt.Println("Error making request:", err) + return + } + + defer resp.Body.Close() +} + +func (p *JobPool) tryRegisterRunner() bool { + if p.config != nil { + return true + } + + _, err := os.Stat(util.Config.Runner.ConfigFile) + + if err == nil { + configBytes, err := os.ReadFile(util.Config.Runner.ConfigFile) + + if err != nil { + panic(err) } + + var config RunnerConfig + + err = json.Unmarshal(configBytes, &config) + + if err != nil { + panic(err) + } + + p.config = &config + + return true + } + + if !os.IsNotExist(err) { + panic(err) + } + + if util.Config.Runner.RegistrationToken == "" { + panic("registration token cannot be empty") } + + client := &http.Client{} + + url := util.Config.Runner.ApiURL + "/runners" + + jsonBytes, err := json.Marshal(RunnerRegistration{ + RegistrationToken: util.Config.Runner.RegistrationToken, + }) + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes)) + if err != nil { + fmt.Println("Error creating request:", err) + return false + } + + resp, err := client.Do(req) + if err != nil || resp.StatusCode != 200 { + fmt.Println("Error making request:", err) + return false + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Println("Error reading response body:", err) + return false + } + + var config RunnerConfig + err = json.Unmarshal(body, &config) + if err != nil { + fmt.Println("Error parsing JSON:", err) + return false + } + + configBytes, err := json.Marshal(config) + + if err != nil { + panic("cannot save runner config") + } + + err = os.WriteFile(util.Config.Runner.ConfigFile, configBytes, 0644) + + p.config = &config + + defer resp.Body.Close() + + return true } // checkNewJobs tries to find runner to queued jobs func (p *JobPool) checkNewJobs() { + + if !p.tryRegisterRunner() { + return + } + client := &http.Client{} - url := "https://example.com" + + url := util.Config.Runner.ApiURL + "/runners/" + strconv.Itoa(p.config.RunnerID) + req, err := http.NewRequest("GET", url, nil) + if err != nil { fmt.Println("Error creating request:", err) return @@ -143,16 +346,45 @@ func (p *JobPool) checkNewJobs() { return } - var response Response + var response RunnerState err = json.Unmarshal(body, &response) if err != nil { fmt.Println("Error parsing JSON:", err) return } - taskRunner := job{ - job: &tasks.LocalAnsibleJob{}, - } + for _, newJob := range response.NewJobs { + if _, exists := p.runningJobs[newJob.Task.ID]; exists { + continue + } + + taskRunner := job{ + username: newJob.Username, + incomingVersion: newJob.IncomingVersion, - p.register <- &taskRunner + job: &tasks.LocalJob{ + Task: newJob.Task, + Template: newJob.Template, + Inventory: newJob.Inventory, + Repository: newJob.Repository, + Environment: newJob.Environment, + Playbook: &lib.AnsiblePlaybook{ + TemplateID: newJob.Template.ID, + Repository: newJob.Repository, + }, + }, + } + + taskRunner.job.Repository.SSHKey = response.AccessKeys[taskRunner.job.Repository.SSHKeyID] + + if taskRunner.job.Inventory.SSHKeyID != nil { + taskRunner.job.Inventory.SSHKey = response.AccessKeys[*taskRunner.job.Inventory.SSHKeyID] + } + + if taskRunner.job.Inventory.BecomeKeyID != nil { + taskRunner.job.Inventory.BecomeKey = response.AccessKeys[*taskRunner.job.Inventory.BecomeKeyID] + } + + p.queue = append(p.queue, &taskRunner) + } } diff --git a/services/tasks/AnsibleJob.go b/services/tasks/AnsibleJob.go deleted file mode 100644 index 6683743de..000000000 --- a/services/tasks/AnsibleJob.go +++ /dev/null @@ -1,23 +0,0 @@ -package tasks - -import ( - "github.com/ansible-semaphore/semaphore/lib" - "os" -) - -type AnsibleJob interface { - RunGalaxy(args []string) error - RunPlaybook(args []string, environmentVars *[]string, cb func(*os.Process)) error -} - -type LocalAnsibleJob struct { - playbook *lib.AnsiblePlaybook -} - -func (j *LocalAnsibleJob) RunGalaxy(args []string) error { - return j.playbook.RunGalaxy(args) -} - -func (j *LocalAnsibleJob) RunPlaybook(args []string, environmentVars *[]string, cb func(*os.Process)) error { - return j.playbook.RunPlaybook(args, environmentVars, cb) -} diff --git a/services/tasks/LocalJob.go b/services/tasks/LocalJob.go new file mode 100644 index 000000000..860b4098d --- /dev/null +++ b/services/tasks/LocalJob.go @@ -0,0 +1,476 @@ +package tasks + +import ( + "encoding/json" + "fmt" + "github.com/ansible-semaphore/semaphore/db" + "github.com/ansible-semaphore/semaphore/lib" + "github.com/ansible-semaphore/semaphore/util" + "os" + "path" + "strconv" +) + +type LocalJob struct { + // Received constant fields + Task db.Task + Template db.Template + Inventory db.Inventory + Repository db.Repository + Environment db.Environment + Playbook *lib.AnsiblePlaybook + Logger lib.Logger + + // Internal field + Process *os.Process +} + +func (t *LocalJob) Kill() { + if t.Process == nil { + panic("running process can not be nil") + } + err := t.Process.Kill() + if err != nil { + t.Log(err.Error()) + } +} + +func (t *LocalJob) Log(msg string) { + t.Logger.Log(msg) +} + +func (t *LocalJob) getEnvironmentExtraVars(username string, incomingVersion *string) (str string, err error) { + extraVars := make(map[string]interface{}) + + if t.Environment.JSON != "" { + err = json.Unmarshal([]byte(t.Environment.JSON), &extraVars) + if err != nil { + return + } + } + + taskDetails := make(map[string]interface{}) + + taskDetails["id"] = t.Task.ID + + if t.Task.Message != "" { + taskDetails["message"] = t.Task.Message + } + + taskDetails["username"] = username + + if t.Template.Type != db.TemplateTask { + taskDetails["type"] = t.Template.Type + if incomingVersion != nil { + taskDetails["incoming_version"] = incomingVersion + } + if t.Template.Type == db.TemplateBuild { + taskDetails["target_version"] = t.Task.Version + } + } + + vars := make(map[string]interface{}) + vars["task_details"] = taskDetails + extraVars["semaphore_vars"] = vars + + ev, err := json.Marshal(extraVars) + if err != nil { + return + } + + str = string(ev) + + return +} + +func (t *LocalJob) getEnvironmentENV() (arr []string, err error) { + environmentVars := make(map[string]string) + + if t.Environment.ENV != nil { + err = json.Unmarshal([]byte(*t.Environment.ENV), &environmentVars) + if err != nil { + return + } + } + + for key, val := range environmentVars { + arr = append(arr, fmt.Sprintf("%s=%s", key, val)) + } + + return +} + +// nolint: gocyclo +func (t *LocalJob) getPlaybookArgs(username string, incomingVersion *string) (args []string, err error) { + playbookName := t.Task.Playbook + if playbookName == "" { + playbookName = t.Template.Playbook + } + + var inventory string + switch t.Inventory.Type { + case db.InventoryFile: + inventory = t.Inventory.Inventory + case db.InventoryStatic, db.InventoryStaticYaml: + inventory = util.Config.TmpPath + "/inventory_" + strconv.Itoa(t.Task.ID) + if t.Inventory.Type == db.InventoryStaticYaml { + inventory += ".yml" + } + default: + err = fmt.Errorf("invalid invetory type") + return + } + + args = []string{ + "-i", inventory, + } + + if t.Inventory.SSHKeyID != nil { + switch t.Inventory.SSHKey.Type { + case db.AccessKeySSH: + args = append(args, "--private-key="+t.Inventory.SSHKey.GetPath()) + //args = append(args, "--extra-vars={\"ansible_ssh_private_key_file\": \""+t.inventory.SSHKey.GetPath()+"\"}") + if t.Inventory.SSHKey.SshKey.Login != "" { + args = append(args, "--extra-vars={\"ansible_user\": \""+t.Inventory.SSHKey.SshKey.Login+"\"}") + } + case db.AccessKeyLoginPassword: + args = append(args, "--extra-vars=@"+t.Inventory.SSHKey.GetPath()) + case db.AccessKeyNone: + default: + err = fmt.Errorf("access key does not suite for inventory's user credentials") + return + } + } + + if t.Inventory.BecomeKeyID != nil { + switch t.Inventory.BecomeKey.Type { + case db.AccessKeyLoginPassword: + args = append(args, "--extra-vars=@"+t.Inventory.BecomeKey.GetPath()) + case db.AccessKeyNone: + default: + err = fmt.Errorf("access key does not suite for inventory's sudo user credentials") + return + } + } + + if t.Task.Debug { + args = append(args, "-vvvv") + } + + if t.Task.Diff { + args = append(args, "--diff") + } + + if t.Task.DryRun { + args = append(args, "--check") + } + + if t.Template.VaultKeyID != nil { + args = append(args, "--vault-password-file", t.Template.VaultKey.GetPath()) + } + + extraVars, err := t.getEnvironmentExtraVars(username, incomingVersion) + if err != nil { + t.Log(err.Error()) + t.Log("Could not remove command environment, if existant it will be passed to --extra-vars. This is not fatal but be aware of side effects") + } else if extraVars != "" { + args = append(args, "--extra-vars", extraVars) + } + + var templateExtraArgs []string + if t.Template.Arguments != nil { + err = json.Unmarshal([]byte(*t.Template.Arguments), &templateExtraArgs) + if err != nil { + t.Log("Invalid format of the template extra arguments, must be valid JSON") + return + } + } + + var taskExtraArgs []string + if t.Template.AllowOverrideArgsInTask && t.Task.Arguments != nil { + err = json.Unmarshal([]byte(*t.Task.Arguments), &taskExtraArgs) + if err != nil { + t.Log("Invalid format of the TaskRunner extra arguments, must be valid JSON") + return + } + } + + if t.Task.Limit != "" { + t.Log("--limit=" + t.Task.Limit) + taskExtraArgs = append(taskExtraArgs, "--limit="+t.Task.Limit) + } + + args = append(args, templateExtraArgs...) + args = append(args, taskExtraArgs...) + args = append(args, playbookName) + + return +} + +func (t *LocalJob) destroyKeys() { + err := t.Inventory.SSHKey.Destroy() + if err != nil { + t.Log("Can't destroy inventory user key, error: " + err.Error()) + } + + err = t.Inventory.BecomeKey.Destroy() + if err != nil { + t.Log("Can't destroy inventory become user key, error: " + err.Error()) + } + + err = t.Template.VaultKey.Destroy() + if err != nil { + t.Log("Can't destroy inventory vault password file, error: " + err.Error()) + } +} + +func (t *LocalJob) Run(username string, incomingVersion *string) (err error) { + err = t.prepareRun() + if err != nil { + return err + } + + defer func() { + t.destroyKeys() + }() + + args, err := t.getPlaybookArgs(username, incomingVersion) + if err != nil { + return + } + + environmentVariables, err := t.getEnvironmentENV() + if err != nil { + return + } + + return t.Playbook.RunPlaybook(args, &environmentVariables, func(p *os.Process) { + t.Process = p + }) + +} + +func (t *LocalJob) prepareRun() error { + defer func() { + //t.pool.resourceLocker <- &resourceLock{lock: false, holder: t} + + //log.Info("Stopped preparing TaskRunner " + strconv.Itoa(t.task.ID)) + //log.Info("Release resource locker with TaskRunner " + strconv.Itoa(t.task.ID)) + // + //t.createTaskEvent() + + err := t.Repository.SSHKey.Destroy() + if err != nil { + t.Log("Can't destroy repository access key, error: " + err.Error()) + } + }() + + t.Log("Preparing: " + strconv.Itoa(t.Task.ID)) + + if err := checkTmpDir(util.Config.TmpPath); err != nil { + t.Log("Creating tmp dir failed: " + err.Error()) + return err + } + + if t.Repository.GetType() == db.RepositoryLocal { + if _, err := os.Stat(t.Repository.GitURL); err != nil { + t.Log("Failed in finding static repository at " + t.Repository.GitURL + ": " + err.Error()) + return err + } + } else { + if err := t.updateRepository(); err != nil { + t.Log("Failed updating repository: " + err.Error()) + return err + } + if err := t.checkoutRepository(); err != nil { + t.Log("Failed to checkout repository to required commit: " + err.Error()) + return err + } + } + + if err := t.installInventory(); err != nil { + t.Log("Failed to install inventory: " + err.Error()) + return err + } + + if err := t.installRequirements(); err != nil { + t.Log("Running galaxy failed: " + err.Error()) + return err + } + + if err := t.installVaultKeyFile(); err != nil { + t.Log("Failed to install vault password file: " + err.Error()) + return err + } + + return nil +} + +func (t *LocalJob) updateRepository() error { + repo := lib.GitRepository{ + Logger: t.Logger, + TemplateID: t.Template.ID, + Repository: t.Repository, + Client: lib.CreateDefaultGitClient(), + } + + err := repo.ValidateRepo() + + if err != nil { + if !os.IsNotExist(err) { + err = os.RemoveAll(repo.GetFullPath()) + if err != nil { + return err + } + } + return repo.Clone() + } + + if repo.CanBePulled() { + err = repo.Pull() + if err == nil { + return nil + } + } + + err = os.RemoveAll(repo.GetFullPath()) + if err != nil { + return err + } + + return repo.Clone() +} + +func (t *LocalJob) checkoutRepository() error { + + repo := lib.GitRepository{ + Logger: t.Logger, + TemplateID: t.Template.ID, + Repository: t.Repository, + Client: lib.CreateDefaultGitClient(), + } + + err := repo.ValidateRepo() + + if err != nil { + return err + } + + if t.Task.CommitHash != nil { + // checkout to commit if it is provided for TaskRunner + return repo.Checkout(*t.Task.CommitHash) + } + + // store commit to TaskRunner table + + //commitHash, err := repo.GetLastCommitHash() + // + //if err != nil { + // return err + //} + // + //commitMessage, _ := repo.GetLastCommitMessage() + // + //t.task.CommitHash = &commitHash + //t.task.CommitMessage = commitMessage + // + //return t.pool.store.UpdateTask(t.task) + return nil +} + +func (t *LocalJob) installRequirements() error { + if err := t.installCollectionsRequirements(); err != nil { + return err + } + if err := t.installRolesRequirements(); err != nil { + return err + } + return nil +} + +func (t *LocalJob) getRepoPath() string { + repo := lib.GitRepository{ + Logger: t.Logger, + TemplateID: t.Template.ID, + Repository: t.Repository, + Client: lib.CreateDefaultGitClient(), + } + + return repo.GetFullPath() +} + +func (t *LocalJob) installRolesRequirements() error { + requirementsFilePath := fmt.Sprintf("%s/roles/requirements.yml", t.getRepoPath()) + requirementsHashFilePath := fmt.Sprintf("%s.md5", requirementsFilePath) + + if _, err := os.Stat(requirementsFilePath); err != nil { + t.Log("No roles/requirements.yml file found. Skip galaxy install process.\n") + return nil + } + + if hasRequirementsChanges(requirementsFilePath, requirementsHashFilePath) { + if err := t.runGalaxy([]string{ + "role", + "install", + "-r", + requirementsFilePath, + "--force", + }); err != nil { + return err + } + if err := writeMD5Hash(requirementsFilePath, requirementsHashFilePath); err != nil { + return err + } + } else { + t.Log("roles/requirements.yml has no changes. Skip galaxy install process.\n") + } + + return nil +} + +func (t *LocalJob) getPlaybookDir() string { + playbookPath := path.Join(t.getRepoPath(), t.Template.Playbook) + + return path.Dir(playbookPath) +} + +func (t *LocalJob) installCollectionsRequirements() error { + requirementsFilePath := path.Join(t.getPlaybookDir(), "collections", "requirements.yml") + requirementsHashFilePath := fmt.Sprintf("%s.md5", requirementsFilePath) + + if _, err := os.Stat(requirementsFilePath); err != nil { + t.Log("No collections/requirements.yml file found. Skip galaxy install process.\n") + return nil + } + + if hasRequirementsChanges(requirementsFilePath, requirementsHashFilePath) { + if err := t.runGalaxy([]string{ + "collection", + "install", + "-r", + requirementsFilePath, + "--force", + }); err != nil { + return err + } + if err := writeMD5Hash(requirementsFilePath, requirementsHashFilePath); err != nil { + return err + } + } else { + t.Log("collections/requirements.yml has no changes. Skip galaxy install process.\n") + } + + return nil +} + +func (t *LocalJob) runGalaxy(args []string) error { + return t.Playbook.RunGalaxy(args) +} + +func (t *LocalJob) installVaultKeyFile() error { + if t.Template.VaultKeyID == nil { + return nil + } + + return t.Template.VaultKey.Install(db.AccessKeyRoleAnsiblePasswordVault) +} diff --git a/services/tasks/RemoteJob.go b/services/tasks/RemoteJob.go new file mode 100644 index 000000000..d8ba18e96 --- /dev/null +++ b/services/tasks/RemoteJob.go @@ -0,0 +1,70 @@ +package tasks + +import ( + "fmt" + "github.com/ansible-semaphore/semaphore/db" + "github.com/ansible-semaphore/semaphore/lib" + "math/rand" + "time" +) + +type RemoteJob struct { + Task db.Task + Template db.Template + Inventory db.Inventory + Repository db.Repository + Environment db.Environment + Playbook *lib.AnsiblePlaybook + Logger lib.Logger + + taskPool *TaskPool +} + +func (t *RemoteJob) Run(username string, incomingVersion *string) (err error) { + + tsk := t.taskPool.GetTask(t.Task.ID) + + if tsk == nil { + return fmt.Errorf("task not found") + } + + tsk.IncomingVersion = incomingVersion + tsk.Username = username + + var runners []db.Runner + db.StoreSession(t.taskPool.store, "run remote job", func() { + runners, err = t.taskPool.store.GetGlobalRunners() + }) + + if err != nil { + return + } + + if len(runners) == 0 { + err = fmt.Errorf("no runners") + return + } + + runner := runners[rand.Intn(len(runners))] + + if err != nil { + return + } + + tsk.RunnerID = runner.ID + + for { + time.Sleep(1000000000) + tsk = t.taskPool.GetTask(t.Task.ID) + if tsk.Task.Status == db.TaskSuccessStatus || + tsk.Task.Status == db.TaskStoppedStatus || + tsk.Task.Status == db.TaskFailStatus { + break + } + } + + return +} + +func (t *RemoteJob) Kill() { +} diff --git a/services/tasks/RunnerPool.go b/services/tasks/RunnerPool.go deleted file mode 100644 index 45cb10acd..000000000 --- a/services/tasks/RunnerPool.go +++ /dev/null @@ -1,12 +0,0 @@ -package tasks - -import "github.com/ansible-semaphore/semaphore/lib" - -// RunnerPool is a collection of the registered runners. -type RunnerPool struct { -} - -func (p *RunnerPool) CreateJob(playbook *lib.AnsiblePlaybook) (AnsibleJob, error) { - - return &LocalAnsibleJob{playbook: playbook}, nil -} diff --git a/services/tasks/TaskPool.go b/services/tasks/TaskPool.go index c6910fe38..9a85311e5 100644 --- a/services/tasks/TaskPool.go +++ b/services/tasks/TaskPool.go @@ -41,14 +41,19 @@ type TaskPool struct { store db.Store resourceLocker chan *resourceLock +} - runners RunnerPool +func (p *TaskPool) GetRunningTasks() (res []*TaskRunner) { + for _, task := range p.runningTasks { + res = append(res, task) + } + return } func (p *TaskPool) GetTask(id int) (task *TaskRunner) { for _, t := range p.queue { - if t.task.ID == id { + if t.Task.ID == id { task = t break } @@ -56,7 +61,7 @@ func (p *TaskPool) GetTask(id int) (task *TaskRunner) { if task == nil { for _, t := range p.runningTasks { - if t.task.ID == id { + if t.Task.ID == id { task = t break } @@ -85,24 +90,24 @@ func (p *TaskPool) Run() { panic("Trying to lock an already locked resource!") } - projTasks, ok := p.activeProj[t.task.ProjectID] + projTasks, ok := p.activeProj[t.Task.ProjectID] if !ok { projTasks = make(map[int]*TaskRunner) - p.activeProj[t.task.ProjectID] = projTasks + p.activeProj[t.Task.ProjectID] = projTasks } - projTasks[t.task.ID] = t - p.runningTasks[t.task.ID] = t + projTasks[t.Task.ID] = t + p.runningTasks[t.Task.ID] = t continue } - if p.activeProj[t.task.ProjectID] != nil && p.activeProj[t.task.ProjectID][t.task.ID] != nil { - delete(p.activeProj[t.task.ProjectID], t.task.ID) - if len(p.activeProj[t.task.ProjectID]) == 0 { - delete(p.activeProj, t.task.ProjectID) + if p.activeProj[t.Task.ProjectID] != nil && p.activeProj[t.Task.ProjectID][t.Task.ID] != nil { + delete(p.activeProj[t.Task.ProjectID], t.Task.ID) + if len(p.activeProj[t.Task.ProjectID]) == 0 { + delete(p.activeProj, t.Task.ProjectID) } } - delete(p.runningTasks, t.task.ID) + delete(p.runningTasks, t.Task.ID) } }(p.resourceLocker) @@ -111,7 +116,7 @@ func (p *TaskPool) Run() { case record := <-p.logger: // new log message which should be put to database db.StoreSession(p.store, "logger", func() { _, err := p.store.CreateTaskOutput(db.TaskOutput{ - TaskID: record.task.task.ID, + TaskID: record.task.Task.ID, Output: record.output, Time: record.time, }) @@ -125,7 +130,7 @@ func (p *TaskPool) Run() { db.StoreSession(p.store, "new task", func() { p.queue = append(p.queue, task) log.Debug(task) - msg := "Task " + strconv.Itoa(task.task.ID) + " added to queue" + msg := "Task " + strconv.Itoa(task.Task.ID) + " added to queue" task.Log(msg) log.Info(msg) task.updateStatus() @@ -138,10 +143,10 @@ func (p *TaskPool) Run() { //get TaskRunner from top of queue t := p.queue[0] - if t.task.Status == db.TaskFailStatus { + if t.Task.Status == db.TaskFailStatus { //delete failed TaskRunner from queue p.queue = p.queue[1:] - log.Info("Task " + strconv.Itoa(t.task.ID) + " removed from queue") + log.Info("Task " + strconv.Itoa(t.Task.ID) + " removed from queue") break } @@ -151,16 +156,13 @@ func (p *TaskPool) Run() { break } - log.Info("Set resource locker with TaskRunner " + strconv.Itoa(t.task.ID)) + log.Info("Set resource locker with TaskRunner " + strconv.Itoa(t.Task.ID)) p.resourceLocker <- &resourceLock{lock: true, holder: t} - if !t.prepared { - go t.prepareRun() - break - } go t.run() + p.queue = p.queue[1:] - log.Info("Task " + strconv.Itoa(t.task.ID) + " removed from queue") + log.Info("Task " + strconv.Itoa(t.Task.ID) + " removed from queue") } } } @@ -171,24 +173,24 @@ func (p *TaskPool) blocks(t *TaskRunner) bool { return true } - if p.activeProj[t.task.ProjectID] == nil || len(p.activeProj[t.task.ProjectID]) == 0 { + if p.activeProj[t.Task.ProjectID] == nil || len(p.activeProj[t.Task.ProjectID]) == 0 { return false } - for _, r := range p.activeProj[t.task.ProjectID] { - if r.template.ID == t.task.TemplateID { + for _, r := range p.activeProj[t.Task.ProjectID] { + if r.Template.ID == t.Task.TemplateID { return true } } - proj, err := p.store.GetProject(t.task.ProjectID) + proj, err := p.store.GetProject(t.Task.ProjectID) if err != nil { log.Error(err) return false } - return proj.MaxParallelTasks > 0 && len(p.activeProj[t.task.ProjectID]) >= proj.MaxParallelTasks + return proj.MaxParallelTasks > 0 && len(p.activeProj[t.Task.ProjectID]) >= proj.MaxParallelTasks } func CreateTaskPool(store db.Store) TaskPool { @@ -207,7 +209,7 @@ func (p *TaskPool) StopTask(targetTask db.Task) error { tsk := p.GetTask(targetTask.ID) if tsk == nil { // task not active, but exists in database tsk = &TaskRunner{ - task: targetTask, + Task: targetTask, pool: p, } err := tsk.populateDetails() @@ -217,16 +219,12 @@ func (p *TaskPool) StopTask(targetTask db.Task) error { tsk.setStatus(db.TaskStoppedStatus) tsk.createTaskEvent() } else { - status := tsk.task.Status + status := tsk.Task.Status + tsk.setStatus(db.TaskStoppingStatus) + if status == db.TaskRunningStatus { - if tsk.process == nil { - panic("running process can not be nil") - } - err := tsk.process.Kill() - if err != nil { - return err - } + tsk.kill() } } @@ -323,7 +321,7 @@ func (p *TaskPool) AddTask(taskObj db.Task, userID *int, projectID int) (newTask } taskRunner := TaskRunner{ - task: newTask, + Task: newTask, pool: p, } @@ -334,17 +332,26 @@ func (p *TaskPool) AddTask(taskObj db.Task, userID *int, projectID int) (newTask return } - job, err := p.runners.CreateJob(&lib.AnsiblePlaybook{ - Logger: &taskRunner, - TemplateID: taskRunner.template.ID, - Repository: taskRunner.repository, - }) + job := RemoteJob{ + Task: taskRunner.Task, + Template: taskRunner.Template, + Inventory: taskRunner.Inventory, + Repository: taskRunner.Repository, + Environment: taskRunner.Environment, + Logger: &taskRunner, + Playbook: &lib.AnsiblePlaybook{ + Logger: &taskRunner, + TemplateID: taskRunner.Template.ID, + Repository: taskRunner.Repository, + }, + taskPool: p, + } if err != nil { return } - taskRunner.job = job + taskRunner.job = &job p.register <- &taskRunner diff --git a/services/tasks/TaskRunner.go b/services/tasks/TaskRunner.go index 132be8c5d..98f5377e6 100644 --- a/services/tasks/TaskRunner.go +++ b/services/tasks/TaskRunner.go @@ -7,35 +7,39 @@ import ( "io" "io/ioutil" "os" - "path" "strconv" "strings" "time" - "github.com/ansible-semaphore/semaphore/lib" - log "github.com/Sirupsen/logrus" "github.com/ansible-semaphore/semaphore/api/sockets" "github.com/ansible-semaphore/semaphore/db" "github.com/ansible-semaphore/semaphore/util" ) +type Job interface { + Run(username string, incomingVersion *string) error + Kill() +} + type TaskRunner struct { - task db.Task - template db.Template - inventory db.Inventory - repository db.Repository - environment db.Environment + Task db.Task + Template db.Template + Inventory db.Inventory + Repository db.Repository + Environment db.Environment users []int alert bool alertChat *string - prepared bool - process *os.Process pool *TaskPool // job executes Ansible and returns stdout to Semaphore logs - job AnsibleJob + job Job + + RunnerID int + Username string + IncomingVersion *string } func getMD5Hash(filepath string) (string, error) { @@ -52,25 +56,8 @@ func getMD5Hash(filepath string) (string, error) { return fmt.Sprintf("%x", hash.Sum(nil)), nil } -func (t *TaskRunner) getPlaybookDir() string { - playbookPath := path.Join(t.getRepoPath(), t.template.Playbook) - - return path.Dir(playbookPath) -} - -func (t *TaskRunner) getRepoPath() string { - repo := lib.GitRepository{ - Logger: t, - TemplateID: t.template.ID, - Repository: t.repository, - Client: lib.CreateDefaultGitClient(), - } - - return repo.GetFullPath() -} - func (t *TaskRunner) setStatus(status db.TaskStatus) { - if t.task.Status == db.TaskStoppingStatus { + if t.Task.Status == db.TaskStoppingStatus { switch status { case db.TaskFailStatus: status = db.TaskStoppedStatus @@ -80,7 +67,7 @@ func (t *TaskRunner) setStatus(status db.TaskStatus) { } } - t.task.Status = status + t.Task.Status = status t.updateStatus() @@ -98,13 +85,13 @@ func (t *TaskRunner) updateStatus() { for _, user := range t.users { b, err := json.Marshal(&map[string]interface{}{ "type": "update", - "start": t.task.Start, - "end": t.task.End, - "status": t.task.Status, - "task_id": t.task.ID, - "template_id": t.task.TemplateID, - "project_id": t.task.ProjectID, - "version": t.task.Version, + "start": t.Task.Start, + "end": t.Task.End, + "status": t.Task.Status, + "task_id": t.Task.ID, + "template_id": t.Task.TemplateID, + "project_id": t.Task.ProjectID, + "version": t.Task.Version, }) util.LogPanic(err) @@ -112,41 +99,28 @@ func (t *TaskRunner) updateStatus() { sockets.Message(user, b) } - if err := t.pool.store.UpdateTask(t.task); err != nil { + if err := t.pool.store.UpdateTask(t.Task); err != nil { t.panicOnError(err, "Failed to update TaskRunner status") } } -func (t *TaskRunner) fail() { - t.setStatus(db.TaskFailStatus) +func (t *TaskRunner) kill() { + t.job.Kill() } -func (t *TaskRunner) destroyKeys() { - err := t.inventory.SSHKey.Destroy() - if err != nil { - t.Log("Can't destroy inventory user key, error: " + err.Error()) - } - - err = t.inventory.BecomeKey.Destroy() - if err != nil { - t.Log("Can't destroy inventory become user key, error: " + err.Error()) - } - - err = t.template.VaultKey.Destroy() - if err != nil { - t.Log("Can't destroy inventory vault password file, error: " + err.Error()) - } +func (t *TaskRunner) fail() { + t.setStatus(db.TaskFailStatus) } func (t *TaskRunner) createTaskEvent() { objType := db.EventTask - desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Name + ")" + " finished - " + strings.ToUpper(string(t.task.Status)) + desc := "Task ID " + strconv.Itoa(t.Task.ID) + " (" + t.Template.Name + ")" + " finished - " + strings.ToUpper(string(t.Task.Status)) _, err := t.pool.store.CreateEvent(db.Event{ - UserID: t.task.UserID, - ProjectID: &t.task.ProjectID, + UserID: t.Task.UserID, + ProjectID: &t.Task.ProjectID, ObjectType: &objType, - ObjectID: &t.task.ID, + ObjectID: &t.Task.ID, Description: &desc, }) @@ -155,130 +129,41 @@ func (t *TaskRunner) createTaskEvent() { } } -func (t *TaskRunner) prepareRun() { - t.prepared = false - - if !t.pool.store.PermanentConnection() { - t.pool.store.Connect("prepare task " + strconv.Itoa(t.task.ID)) - defer t.pool.store.Close("prepare task " + strconv.Itoa(t.task.ID)) - } - - defer func() { - log.Info("Stopped preparing TaskRunner " + strconv.Itoa(t.task.ID)) - log.Info("Release resource locker with TaskRunner " + strconv.Itoa(t.task.ID)) - t.pool.resourceLocker <- &resourceLock{lock: false, holder: t} - - t.createTaskEvent() - - err := t.repository.SSHKey.Destroy() - if err != nil { - t.Log("Can't destroy repository access key, error: " + err.Error()) - } - }() - - t.Log("Preparing: " + strconv.Itoa(t.task.ID)) - - if err := checkTmpDir(util.Config.TmpPath); err != nil { - t.Log("Creating tmp dir failed: " + err.Error()) - t.fail() - return - } - - objType := db.EventTask - desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Name + ")" + " is preparing" - evt := db.Event{ - UserID: t.task.UserID, - ProjectID: &t.task.ProjectID, - ObjectType: &objType, - ObjectID: &t.task.ID, - Description: &desc, - } - - if _, err := t.pool.store.CreateEvent(evt); err != nil { - t.Log("Fatal error inserting an event") - panic(err) - } - - t.Log("Prepare TaskRunner with template: " + t.template.Name + "\n") - - t.updateStatus() - - if t.repository.GetType() == db.RepositoryLocal { - if _, err := os.Stat(t.repository.GitURL); err != nil { - t.Log("Failed in finding static repository at " + t.repository.GitURL + ": " + err.Error()) - t.fail() - return - } - } else { - if err := t.updateRepository(); err != nil { - t.Log("Failed updating repository: " + err.Error()) - t.fail() - return - } - if err := t.checkoutRepository(); err != nil { - t.Log("Failed to checkout repository to required commit: " + err.Error()) - t.fail() - return - } - } - - if err := t.installInventory(); err != nil { - t.Log("Failed to install inventory: " + err.Error()) - t.fail() - return - } - - if err := t.installRequirements(); err != nil { - t.Log("Running galaxy failed: " + err.Error()) - t.fail() - return - } - - if err := t.installVaultKeyFile(); err != nil { - t.Log("Failed to install vault password file: " + err.Error()) - t.fail() - return - } - - t.prepared = true -} - func (t *TaskRunner) run() { if !t.pool.store.PermanentConnection() { - t.pool.store.Connect("run task " + strconv.Itoa(t.task.ID)) - defer t.pool.store.Close("run task " + strconv.Itoa(t.task.ID)) + t.pool.store.Connect("run task " + strconv.Itoa(t.Task.ID)) + defer t.pool.store.Close("run task " + strconv.Itoa(t.Task.ID)) } defer func() { - log.Info("Stopped running TaskRunner " + strconv.Itoa(t.task.ID)) - log.Info("Release resource locker with TaskRunner " + strconv.Itoa(t.task.ID)) + log.Info("Stopped running TaskRunner " + strconv.Itoa(t.Task.ID)) + log.Info("Release resource locker with TaskRunner " + strconv.Itoa(t.Task.ID)) t.pool.resourceLocker <- &resourceLock{lock: false, holder: t} now := time.Now() - t.task.End = &now + t.Task.End = &now t.updateStatus() t.createTaskEvent() - t.destroyKeys() }() // TODO: more details - if t.task.Status == db.TaskStoppingStatus { + if t.Task.Status == db.TaskStoppingStatus { t.setStatus(db.TaskStoppedStatus) return } now := time.Now() - t.task.Start = &now + t.Task.Start = &now t.setStatus(db.TaskRunningStatus) objType := db.EventTask - desc := "Task ID " + strconv.Itoa(t.task.ID) + " (" + t.template.Name + ")" + " is running" + desc := "Task ID " + strconv.Itoa(t.Task.ID) + " (" + t.Template.Name + ")" + " is running" _, err := t.pool.store.CreateEvent(db.Event{ - UserID: t.task.UserID, - ProjectID: &t.task.ProjectID, + UserID: t.Task.UserID, + ProjectID: &t.Task.ProjectID, ObjectType: &objType, - ObjectID: &t.task.ID, + ObjectID: &t.Task.ID, Description: &desc, }) @@ -287,16 +172,33 @@ func (t *TaskRunner) run() { panic(err) } - t.Log("Started: " + strconv.Itoa(t.task.ID)) - t.Log("Run TaskRunner with template: " + t.template.Name + "\n") + t.Log("Started: " + strconv.Itoa(t.Task.ID)) + t.Log("Run TaskRunner with template: " + t.Template.Name + "\n") // Mark task as stopped if user stops task during preparation (before task run). - if t.task.Status == db.TaskStoppingStatus { + if t.Task.Status == db.TaskStoppingStatus { t.setStatus(db.TaskStoppedStatus) return } - err = t.runPlaybook() + var username string + var incomingVersion *string + + if t.Task.UserID != nil { + var user db.User + user, err = t.pool.store.GetUser(*t.Task.UserID) + if err == nil { + username = user.Username + } + } + + if t.Template.Type != db.TemplateTask { + incomingVersion = t.Task.GetIncomingVersion(t.pool.store) + + } + + err = t.job.Run(username, incomingVersion) + if err != nil { t.Log("Running playbook failed: " + err.Error()) t.fail() @@ -305,8 +207,8 @@ func (t *TaskRunner) run() { t.setStatus(db.TaskSuccessStatus) - templates, err := t.pool.store.GetTemplates(t.task.ProjectID, db.TemplateFilter{ - BuildTemplateID: &t.task.TemplateID, + templates, err := t.pool.store.GetTemplates(t.Task.ProjectID, db.TemplateFilter{ + BuildTemplateID: &t.Task.TemplateID, AutorunOnly: true, }, db.RetrieveQueryParams{}) if err != nil { @@ -318,7 +220,7 @@ func (t *TaskRunner) run() { _, err = t.pool.AddTask(db.Task{ TemplateID: tpl.ID, ProjectID: tpl.ProjectID, - BuildTaskID: &t.task.ID, + BuildTaskID: &t.Task.ID, }, nil, tpl.ProjectID) if err != nil { t.Log("Running playbook failed: " + err.Error()) @@ -346,13 +248,13 @@ func (t *TaskRunner) populateDetails() error { // get template var err error - t.template, err = t.pool.store.GetTemplate(t.task.ProjectID, t.task.TemplateID) + t.Template, err = t.pool.store.GetTemplate(t.Task.ProjectID, t.Task.TemplateID) if err != nil { return t.prepareError(err, "Template not found!") } // get project alert setting - project, err := t.pool.store.GetProject(t.template.ProjectID) + project, err := t.pool.store.GetProject(t.Template.ProjectID) if err != nil { return t.prepareError(err, "Project not found!") } @@ -361,7 +263,7 @@ func (t *TaskRunner) populateDetails() error { t.alertChat = project.AlertChat // get project users - users, err := t.pool.store.GetProjectUsers(t.template.ProjectID, db.RetrieveQueryParams{}) + users, err := t.pool.store.GetProjectUsers(t.Template.ProjectID, db.RetrieveQueryParams{}) if err != nil { return t.prepareError(err, "Users not found!") } @@ -372,42 +274,42 @@ func (t *TaskRunner) populateDetails() error { } // get inventory - t.inventory, err = t.pool.store.GetInventory(t.template.ProjectID, t.template.InventoryID) + t.Inventory, err = t.pool.store.GetInventory(t.Template.ProjectID, t.Template.InventoryID) if err != nil { return t.prepareError(err, "Template Inventory not found!") } // get repository - t.repository, err = t.pool.store.GetRepository(t.template.ProjectID, t.template.RepositoryID) + t.Repository, err = t.pool.store.GetRepository(t.Template.ProjectID, t.Template.RepositoryID) if err != nil { return err } - err = t.repository.SSHKey.DeserializeSecret() + err = t.Repository.SSHKey.DeserializeSecret() if err != nil { return err } // get environment - if t.template.EnvironmentID != nil { - t.environment, err = t.pool.store.GetEnvironment(t.template.ProjectID, *t.template.EnvironmentID) + if t.Template.EnvironmentID != nil { + t.Environment, err = t.pool.store.GetEnvironment(t.Template.ProjectID, *t.Template.EnvironmentID) if err != nil { return err } } - if t.task.Environment != "" { + if t.Task.Environment != "" { environment := make(map[string]interface{}) - if t.environment.JSON != "" { - err = json.Unmarshal([]byte(t.task.Environment), &environment) + if t.Environment.JSON != "" { + err = json.Unmarshal([]byte(t.Task.Environment), &environment) if err != nil { return err } } taskEnvironment := make(map[string]interface{}) - err = json.Unmarshal([]byte(t.environment.JSON), &taskEnvironment) + err = json.Unmarshal([]byte(t.Environment.JSON), &taskEnvironment) if err != nil { return err } @@ -422,352 +324,12 @@ func (t *TaskRunner) populateDetails() error { return err } - t.environment.JSON = string(ev) + t.Environment.JSON = string(ev) } return nil } -func (t *TaskRunner) installVaultKeyFile() error { - if t.template.VaultKeyID == nil { - return nil - } - - return t.template.VaultKey.Install(db.AccessKeyRoleAnsiblePasswordVault) -} - -func (t *TaskRunner) checkoutRepository() error { - - repo := lib.GitRepository{ - Logger: t, - TemplateID: t.template.ID, - Repository: t.repository, - Client: lib.CreateDefaultGitClient(), - } - - err := repo.ValidateRepo() - - if err != nil { - return err - } - - if t.task.CommitHash != nil { - // checkout to commit if it is provided for TaskRunner - return repo.Checkout(*t.task.CommitHash) - } - - // store commit to TaskRunner table - - commitHash, err := repo.GetLastCommitHash() - - if err != nil { - return err - } - - commitMessage, _ := repo.GetLastCommitMessage() - - t.task.CommitHash = &commitHash - t.task.CommitMessage = commitMessage - - return t.pool.store.UpdateTask(t.task) -} - -func (t *TaskRunner) updateRepository() error { - repo := lib.GitRepository{ - Logger: t, - TemplateID: t.template.ID, - Repository: t.repository, - Client: lib.CreateDefaultGitClient(), - } - - err := repo.ValidateRepo() - - if err != nil { - if !os.IsNotExist(err) { - err = os.RemoveAll(repo.GetFullPath()) - if err != nil { - return err - } - } - return repo.Clone() - } - - if repo.CanBePulled() { - err = repo.Pull() - if err == nil { - return nil - } - } - - err = os.RemoveAll(repo.GetFullPath()) - if err != nil { - return err - } - - return repo.Clone() -} - -func (t *TaskRunner) installCollectionsRequirements() error { - requirementsFilePath := path.Join(t.getPlaybookDir(), "collections", "requirements.yml") - requirementsHashFilePath := fmt.Sprintf("%s.md5", requirementsFilePath) - - if _, err := os.Stat(requirementsFilePath); err != nil { - t.Log("No collections/requirements.yml file found. Skip galaxy install process.\n") - return nil - } - - if hasRequirementsChanges(requirementsFilePath, requirementsHashFilePath) { - if err := t.runGalaxy([]string{ - "collection", - "install", - "-r", - requirementsFilePath, - "--force", - }); err != nil { - return err - } - if err := writeMD5Hash(requirementsFilePath, requirementsHashFilePath); err != nil { - return err - } - } else { - t.Log("collections/requirements.yml has no changes. Skip galaxy install process.\n") - } - - return nil -} - -func (t *TaskRunner) installRolesRequirements() error { - requirementsFilePath := fmt.Sprintf("%s/roles/requirements.yml", t.getRepoPath()) - requirementsHashFilePath := fmt.Sprintf("%s.md5", requirementsFilePath) - - if _, err := os.Stat(requirementsFilePath); err != nil { - t.Log("No roles/requirements.yml file found. Skip galaxy install process.\n") - return nil - } - - if hasRequirementsChanges(requirementsFilePath, requirementsHashFilePath) { - if err := t.runGalaxy([]string{ - "role", - "install", - "-r", - requirementsFilePath, - "--force", - }); err != nil { - return err - } - if err := writeMD5Hash(requirementsFilePath, requirementsHashFilePath); err != nil { - return err - } - } else { - t.Log("roles/requirements.yml has no changes. Skip galaxy install process.\n") - } - - return nil -} - -func (t *TaskRunner) installRequirements() error { - if err := t.installCollectionsRequirements(); err != nil { - return err - } - if err := t.installRolesRequirements(); err != nil { - return err - } - return nil -} - -func (t *TaskRunner) runGalaxy(args []string) error { - return t.job.RunGalaxy(args) -} - -func (t *TaskRunner) runPlaybook() (err error) { - args, err := t.getPlaybookArgs() - if err != nil { - return - } - - environmentVariables, err := t.getEnvironmentENV() - if err != nil { - return - } - - return t.job.RunPlaybook(args, &environmentVariables, func(p *os.Process) { t.process = p }) -} - -func (t *TaskRunner) getEnvironmentENV() (arr []string, err error) { - environmentVars := make(map[string]string) - - if t.environment.ENV != nil { - err = json.Unmarshal([]byte(*t.environment.ENV), &environmentVars) - if err != nil { - return - } - } - - for key, val := range environmentVars { - arr = append(arr, fmt.Sprintf("%s=%s", key, val)) - } - - return -} - -func (t *TaskRunner) getEnvironmentExtraVars() (str string, err error) { - extraVars := make(map[string]interface{}) - - if t.environment.JSON != "" { - err = json.Unmarshal([]byte(t.environment.JSON), &extraVars) - if err != nil { - return - } - } - - taskDetails := make(map[string]interface{}) - - taskDetails["id"] = t.task.ID - - if t.task.Message != "" { - taskDetails["message"] = t.task.Message - } - - if t.task.UserID != nil { - var user db.User - user, err = t.pool.store.GetUser(*t.task.UserID) - if err == nil { - taskDetails["username"] = user.Username - } - } - - if t.template.Type != db.TemplateTask { - taskDetails["type"] = t.template.Type - incomingVersion := t.task.GetIncomingVersion(t.pool.store) - if incomingVersion != nil { - taskDetails["incoming_version"] = incomingVersion - } - if t.template.Type == db.TemplateBuild { - taskDetails["target_version"] = t.task.Version - } - } - - vars := make(map[string]interface{}) - vars["task_details"] = taskDetails - extraVars["semaphore_vars"] = vars - - ev, err := json.Marshal(extraVars) - if err != nil { - return - } - - str = string(ev) - - return -} - -// nolint: gocyclo -func (t *TaskRunner) getPlaybookArgs() (args []string, err error) { - playbookName := t.task.Playbook - if playbookName == "" { - playbookName = t.template.Playbook - } - - var inventory string - switch t.inventory.Type { - case db.InventoryFile: - inventory = t.inventory.Inventory - case db.InventoryStatic, db.InventoryStaticYaml: - inventory = util.Config.TmpPath + "/inventory_" + strconv.Itoa(t.task.ID) - if t.inventory.Type == db.InventoryStaticYaml { - inventory += ".yml" - } - default: - err = fmt.Errorf("invalid invetory type") - return - } - - args = []string{ - "-i", inventory, - } - - if t.inventory.SSHKeyID != nil { - switch t.inventory.SSHKey.Type { - case db.AccessKeySSH: - args = append(args, "--private-key="+t.inventory.SSHKey.GetPath()) - //args = append(args, "--extra-vars={\"ansible_ssh_private_key_file\": \""+t.inventory.SSHKey.GetPath()+"\"}") - if t.inventory.SSHKey.SshKey.Login != "" { - args = append(args, "--extra-vars={\"ansible_user\": \""+t.inventory.SSHKey.SshKey.Login+"\"}") - } - case db.AccessKeyLoginPassword: - args = append(args, "--extra-vars=@"+t.inventory.SSHKey.GetPath()) - case db.AccessKeyNone: - default: - err = fmt.Errorf("access key does not suite for inventory's user credentials") - return - } - } - - if t.inventory.BecomeKeyID != nil { - switch t.inventory.BecomeKey.Type { - case db.AccessKeyLoginPassword: - args = append(args, "--extra-vars=@"+t.inventory.BecomeKey.GetPath()) - case db.AccessKeyNone: - default: - err = fmt.Errorf("access key does not suite for inventory's sudo user credentials") - return - } - } - - if t.task.Debug { - args = append(args, "-vvvv") - } - - if t.task.Diff { - args = append(args, "--diff") - } - - if t.task.DryRun { - args = append(args, "--check") - } - - if t.template.VaultKeyID != nil { - args = append(args, "--vault-password-file", t.template.VaultKey.GetPath()) - } - - extraVars, err := t.getEnvironmentExtraVars() - if err != nil { - t.Log(err.Error()) - t.Log("Could not remove command environment, if existant it will be passed to --extra-vars. This is not fatal but be aware of side effects") - } else if extraVars != "" { - args = append(args, "--extra-vars", extraVars) - } - - var templateExtraArgs []string - if t.template.Arguments != nil { - err = json.Unmarshal([]byte(*t.template.Arguments), &templateExtraArgs) - if err != nil { - t.Log("Invalid format of the template extra arguments, must be valid JSON") - return - } - } - - var taskExtraArgs []string - if t.template.AllowOverrideArgsInTask && t.task.Arguments != nil { - err = json.Unmarshal([]byte(*t.task.Arguments), &taskExtraArgs) - if err != nil { - t.Log("Invalid format of the TaskRunner extra arguments, must be valid JSON") - return - } - } - - if t.task.Limit != "" { - t.Log("--limit=" + t.task.Limit) - taskExtraArgs = append(taskExtraArgs, "--limit="+t.task.Limit) - } - - args = append(args, templateExtraArgs...) - args = append(args, taskExtraArgs...) - args = append(args, playbookName) - - return -} - func hasRequirementsChanges(requirementsFilePath string, requirementsHashFilePath string) bool { oldFileMD5HashBytes, err := ioutil.ReadFile(requirementsHashFilePath) if err != nil { diff --git a/services/tasks/TaskRunner_test.go b/services/tasks/TaskRunner_test.go index b4ae00100..5575281e7 100644 --- a/services/tasks/TaskRunner_test.go +++ b/services/tasks/TaskRunner_test.go @@ -1,6 +1,7 @@ package tasks import ( + "github.com/ansible-semaphore/semaphore/lib" "math/rand" "os" "path" @@ -47,10 +48,22 @@ func TestTaskRunnerRun(t *testing.T) { } taskRunner := TaskRunner{ - task: task, + Task: task, pool: &pool, } - + taskRunner.job = &LocalJob{ + Task: taskRunner.Task, + Template: taskRunner.Template, + Inventory: taskRunner.Inventory, + Repository: taskRunner.Repository, + Environment: taskRunner.Environment, + Logger: &taskRunner, + Playbook: &lib.AnsiblePlaybook{ + Logger: &taskRunner, + TemplateID: taskRunner.Template.ID, + Repository: taskRunner.Repository, + }, + } taskRunner.run() } @@ -62,8 +75,8 @@ func TestGetRepoPath(t *testing.T) { inventoryID := 1 tsk := TaskRunner{ - task: db.Task{}, - inventory: db.Inventory{ + Task: db.Task{}, + Inventory: db.Inventory{ SSHKeyID: &inventoryID, SSHKey: db.AccessKey{ ID: 12345, @@ -71,12 +84,25 @@ func TestGetRepoPath(t *testing.T) { }, Type: db.InventoryStatic, }, - template: db.Template{ + Template: db.Template{ Playbook: "deploy/test.yml", }, } + tsk.job = &LocalJob{ + Task: tsk.Task, + Template: tsk.Template, + Inventory: tsk.Inventory, + Repository: tsk.Repository, + Environment: tsk.Environment, + Logger: &tsk, + Playbook: &lib.AnsiblePlaybook{ + Logger: &tsk, + TemplateID: tsk.Template.ID, + Repository: tsk.Repository, + }, + } - dir := tsk.getPlaybookDir() + dir := tsk.job.(*LocalJob).getPlaybookDir() if dir != "/tmp/repository_0_0/deploy" { t.Fatal("Invalid playbook dir: " + dir) } @@ -90,8 +116,8 @@ func TestGetRepoPath_whenStartsWithSlash(t *testing.T) { inventoryID := 1 tsk := TaskRunner{ - task: db.Task{}, - inventory: db.Inventory{ + Task: db.Task{}, + Inventory: db.Inventory{ SSHKeyID: &inventoryID, SSHKey: db.AccessKey{ ID: 12345, @@ -99,12 +125,25 @@ func TestGetRepoPath_whenStartsWithSlash(t *testing.T) { }, Type: db.InventoryStatic, }, - template: db.Template{ + Template: db.Template{ Playbook: "/deploy/test.yml", }, } + tsk.job = &LocalJob{ + Task: tsk.Task, + Template: tsk.Template, + Inventory: tsk.Inventory, + Repository: tsk.Repository, + Environment: tsk.Environment, + Logger: &tsk, + Playbook: &lib.AnsiblePlaybook{ + Logger: &tsk, + TemplateID: tsk.Template.ID, + Repository: tsk.Repository, + }, + } - dir := tsk.getPlaybookDir() + dir := tsk.job.(*LocalJob).getPlaybookDir() if dir != "/tmp/repository_0_0/deploy" { t.Fatal("Invalid playbook dir: " + dir) } @@ -171,18 +210,31 @@ func TestPopulateDetails(t *testing.T) { tsk := TaskRunner{ pool: &pool, - task: db.Task{ + Task: db.Task{ TemplateID: tpl.ID, ProjectID: proj.ID, Environment: `{"comment": "Just do it!", "time": "2021-11-02"}`, }, } + tsk.job = &LocalJob{ + Task: tsk.Task, + Template: tsk.Template, + Inventory: tsk.Inventory, + Repository: tsk.Repository, + Environment: tsk.Environment, + Logger: &tsk, + Playbook: &lib.AnsiblePlaybook{ + Logger: &tsk, + TemplateID: tsk.Template.ID, + Repository: tsk.Repository, + }, + } err = tsk.populateDetails() if err != nil { t.Fatal(err) } - if tsk.environment.JSON != `{"author":"Denis","comment":"Hello, World!","time":"2021-11-02"}` { + if tsk.Environment.JSON != `{"author":"Denis","comment":"Hello, World!","time":"2021-11-02"}` { t.Fatal(err) } } @@ -195,8 +247,8 @@ func TestTaskGetPlaybookArgs(t *testing.T) { inventoryID := 1 tsk := TaskRunner{ - task: db.Task{}, - inventory: db.Inventory{ + Task: db.Task{}, + Inventory: db.Inventory{ SSHKeyID: &inventoryID, SSHKey: db.AccessKey{ ID: 12345, @@ -204,19 +256,32 @@ func TestTaskGetPlaybookArgs(t *testing.T) { }, Type: db.InventoryStatic, }, - template: db.Template{ + Template: db.Template{ Playbook: "test.yml", }, } + tsk.job = &LocalJob{ + Task: tsk.Task, + Template: tsk.Template, + Inventory: tsk.Inventory, + Repository: tsk.Repository, + Environment: tsk.Environment, + Logger: &tsk, + Playbook: &lib.AnsiblePlaybook{ + Logger: &tsk, + TemplateID: tsk.Template.ID, + Repository: tsk.Repository, + }, + } - args, err := tsk.getPlaybookArgs() + args, err := tsk.job.(*LocalJob).getPlaybookArgs("", nil) if err != nil { t.Fatal(err) } res := strings.Join(args, " ") - if res != "-i /tmp/inventory_0 --private-key=/tmp/access_key_0 --extra-vars {\"semaphore_vars\":{\"task_details\":{\"id\":0}}} test.yml" { + if res != "-i /tmp/inventory_0 --private-key=/tmp/access_key_0 --extra-vars {\"semaphore_vars\":{\"task_details\":{\"id\":0,\"username\":\"\"}}} test.yml" { t.Fatal("incorrect result") } } @@ -229,8 +294,8 @@ func TestTaskGetPlaybookArgs2(t *testing.T) { inventoryID := 1 tsk := TaskRunner{ - task: db.Task{}, - inventory: db.Inventory{ + Task: db.Task{}, + Inventory: db.Inventory{ Type: db.InventoryStatic, SSHKeyID: &inventoryID, SSHKey: db.AccessKey{ @@ -242,19 +307,32 @@ func TestTaskGetPlaybookArgs2(t *testing.T) { }, }, }, - template: db.Template{ + Template: db.Template{ Playbook: "test.yml", }, } + tsk.job = &LocalJob{ + Task: tsk.Task, + Template: tsk.Template, + Inventory: tsk.Inventory, + Repository: tsk.Repository, + Environment: tsk.Environment, + Logger: &tsk, + Playbook: &lib.AnsiblePlaybook{ + Logger: &tsk, + TemplateID: tsk.Template.ID, + Repository: tsk.Repository, + }, + } - args, err := tsk.getPlaybookArgs() + args, err := tsk.job.(*LocalJob).getPlaybookArgs("", nil) if err != nil { t.Fatal(err) } res := strings.Join(args, " ") - if res != "-i /tmp/inventory_0 --extra-vars=@/tmp/access_key_0 --extra-vars {\"semaphore_vars\":{\"task_details\":{\"id\":0}}} test.yml" { + if res != "-i /tmp/inventory_0 --extra-vars=@/tmp/access_key_0 --extra-vars {\"semaphore_vars\":{\"task_details\":{\"id\":0,\"username\":\"\"}}} test.yml" { t.Fatal("incorrect result") } } @@ -267,8 +345,8 @@ func TestTaskGetPlaybookArgs3(t *testing.T) { inventoryID := 1 tsk := TaskRunner{ - task: db.Task{}, - inventory: db.Inventory{ + Task: db.Task{}, + Inventory: db.Inventory{ Type: db.InventoryStatic, BecomeKeyID: &inventoryID, BecomeKey: db.AccessKey{ @@ -280,19 +358,32 @@ func TestTaskGetPlaybookArgs3(t *testing.T) { }, }, }, - template: db.Template{ + Template: db.Template{ Playbook: "test.yml", }, } + tsk.job = &LocalJob{ + Task: tsk.Task, + Template: tsk.Template, + Inventory: tsk.Inventory, + Repository: tsk.Repository, + Environment: tsk.Environment, + Logger: &tsk, + Playbook: &lib.AnsiblePlaybook{ + Logger: &tsk, + TemplateID: tsk.Template.ID, + Repository: tsk.Repository, + }, + } - args, err := tsk.getPlaybookArgs() + args, err := tsk.job.(*LocalJob).getPlaybookArgs("", nil) if err != nil { t.Fatal(err) } res := strings.Join(args, " ") - if res != "-i /tmp/inventory_0 --extra-vars=@/tmp/access_key_0 --extra-vars {\"semaphore_vars\":{\"task_details\":{\"id\":0}}} test.yml" { + if res != "-i /tmp/inventory_0 --extra-vars=@/tmp/access_key_0 --extra-vars {\"semaphore_vars\":{\"task_details\":{\"id\":0,\"username\":\"\"}}} test.yml" { t.Fatal("incorrect result") } } diff --git a/services/tasks/alert.go b/services/tasks/alert.go index ff54342e1..43206c68a 100644 --- a/services/tasks/alert.go +++ b/services/tasks/alert.go @@ -43,11 +43,11 @@ func (t *TaskRunner) sendMailAlert() { var mailBuffer bytes.Buffer alert := Alert{ - TaskID: strconv.Itoa(t.task.ID), - Name: t.template.Name, - TaskURL: util.Config.WebHost + "/project/" + strconv.Itoa(t.template.ProjectID) + - "/templates/" + strconv.Itoa(t.template.ID) + - "?t=" + strconv.Itoa(t.task.ID), + TaskID: strconv.Itoa(t.Task.ID), + Name: t.Template.Name, + TaskURL: util.Config.WebHost + "/project/" + strconv.Itoa(t.Template.ProjectID) + + "/templates/" + strconv.Itoa(t.Template.ID) + + "?t=" + strconv.Itoa(t.Task.ID), From: util.Config.EmailSender, } tpl := template.New("mail body template") @@ -83,7 +83,7 @@ func (t *TaskRunner) sendTelegramAlert() { return } - if t.template.SuppressSuccessAlerts && t.task.Status == db.TaskSuccessStatus { + if t.Template.SuppressSuccessAlerts && t.Task.Status == db.TaskSuccessStatus { return } @@ -95,22 +95,22 @@ func (t *TaskRunner) sendTelegramAlert() { var telegramBuffer bytes.Buffer var version string - if t.task.Version != nil { - version = *t.task.Version - } else if t.task.BuildTaskID != nil { - version = "build " + strconv.Itoa(*t.task.BuildTaskID) + if t.Task.Version != nil { + version = *t.Task.Version + } else if t.Task.BuildTaskID != nil { + version = "build " + strconv.Itoa(*t.Task.BuildTaskID) } else { version = "" } var message string - if t.task.Message != "" { - message = "- " + t.task.Message + if t.Task.Message != "" { + message = "- " + t.Task.Message } var author string - if t.task.UserID != nil { - user, err := t.pool.store.GetUser(*t.task.UserID) + if t.Task.UserID != nil { + user, err := t.pool.store.GetUser(*t.Task.UserID) if err != nil { panic(err) } @@ -118,11 +118,11 @@ func (t *TaskRunner) sendTelegramAlert() { } alert := Alert{ - TaskID: strconv.Itoa(t.task.ID), - Name: t.template.Name, - TaskURL: util.Config.WebHost + "/project/" + strconv.Itoa(t.template.ProjectID) + "/templates/" + strconv.Itoa(t.template.ID) + "?t=" + strconv.Itoa(t.task.ID), + TaskID: strconv.Itoa(t.Task.ID), + Name: t.Template.Name, + TaskURL: util.Config.WebHost + "/project/" + strconv.Itoa(t.Template.ProjectID) + "/templates/" + strconv.Itoa(t.Template.ID) + "?t=" + strconv.Itoa(t.Task.ID), ChatID: chatID, - TaskResult: strings.ToUpper(string(t.task.Status)), + TaskResult: strings.ToUpper(string(t.Task.Status)), TaskVersion: version, TaskDescription: message, Author: author, @@ -156,7 +156,7 @@ func (t *TaskRunner) sendSlackAlert() { return } - if t.template.SuppressSuccessAlerts && t.task.Status == db.TaskSuccessStatus { + if t.Template.SuppressSuccessAlerts && t.Task.Status == db.TaskSuccessStatus { return } @@ -165,22 +165,22 @@ func (t *TaskRunner) sendSlackAlert() { var slackBuffer bytes.Buffer var version string - if t.task.Version != nil { - version = *t.task.Version - } else if t.task.BuildTaskID != nil { - version = "build " + strconv.Itoa(*t.task.BuildTaskID) + if t.Task.Version != nil { + version = *t.Task.Version + } else if t.Task.BuildTaskID != nil { + version = "build " + strconv.Itoa(*t.Task.BuildTaskID) } else { version = "" } var message string - if t.task.Message != "" { - message = "- " + t.task.Message + if t.Task.Message != "" { + message = "- " + t.Task.Message } var author string - if t.task.UserID != nil { - user, err := t.pool.store.GetUser(*t.task.UserID) + if t.Task.UserID != nil { + user, err := t.pool.store.GetUser(*t.Task.UserID) if err != nil { panic(err) } @@ -188,24 +188,24 @@ func (t *TaskRunner) sendSlackAlert() { } var color string - if t.task.Status == db.TaskSuccessStatus { + if t.Task.Status == db.TaskSuccessStatus { color = "good" - } else if t.task.Status == db.TaskFailStatus { + } else if t.Task.Status == db.TaskFailStatus { color = "bad" - } else if t.task.Status == db.TaskRunningStatus { + } else if t.Task.Status == db.TaskRunningStatus { color = "#333CFF" - } else if t.task.Status == db.TaskWaitingStatus { + } else if t.Task.Status == db.TaskWaitingStatus { color = "#FFFC33" - } else if t.task.Status == db.TaskStoppingStatus { + } else if t.Task.Status == db.TaskStoppingStatus { color = "#BEBEBE" - } else if t.task.Status == db.TaskStoppedStatus { + } else if t.Task.Status == db.TaskStoppedStatus { color = "#5B5B5B" } alert := Alert{ - TaskID: strconv.Itoa(t.task.ID), - Name: t.template.Name, - TaskURL: util.Config.WebHost + "/project/" + strconv.Itoa(t.template.ProjectID) + "/templates/" + strconv.Itoa(t.template.ID) + "?t=" + strconv.Itoa(t.task.ID), - TaskResult: strings.ToUpper(string(t.task.Status)), + TaskID: strconv.Itoa(t.Task.ID), + Name: t.Template.Name, + TaskURL: util.Config.WebHost + "/project/" + strconv.Itoa(t.Template.ProjectID) + "/templates/" + strconv.Itoa(t.Template.ID) + "?t=" + strconv.Itoa(t.Task.ID), + TaskResult: strings.ToUpper(string(t.Task.Status)), TaskVersion: version, TaskDescription: message, Author: author, diff --git a/services/tasks/inventory.go b/services/tasks/inventory.go index dd67e9206..62691c6a4 100644 --- a/services/tasks/inventory.go +++ b/services/tasks/inventory.go @@ -8,36 +8,36 @@ import ( "github.com/ansible-semaphore/semaphore/util" ) -func (t *TaskRunner) installInventory() (err error) { - if t.inventory.SSHKeyID != nil { - err = t.inventory.SSHKey.Install(db.AccessKeyRoleAnsibleUser) +func (t *LocalJob) installInventory() (err error) { + if t.Inventory.SSHKeyID != nil { + err = t.Inventory.SSHKey.Install(db.AccessKeyRoleAnsibleUser) if err != nil { return } } - if t.inventory.BecomeKeyID != nil { - err = t.inventory.BecomeKey.Install(db.AccessKeyRoleAnsibleBecomeUser) + if t.Inventory.BecomeKeyID != nil { + err = t.Inventory.BecomeKey.Install(db.AccessKeyRoleAnsibleBecomeUser) if err != nil { return } } - if t.inventory.Type == db.InventoryStatic || t.inventory.Type == db.InventoryStaticYaml { + if t.Inventory.Type == db.InventoryStatic || t.Inventory.Type == db.InventoryStaticYaml { err = t.installStaticInventory() } return } -func (t *TaskRunner) installStaticInventory() error { +func (t *LocalJob) installStaticInventory() error { t.Log("installing static inventory") - path := util.Config.TmpPath + "/inventory_" + strconv.Itoa(t.task.ID) - if t.inventory.Type == db.InventoryStaticYaml { + path := util.Config.TmpPath + "/inventory_" + strconv.Itoa(t.Task.ID) + if t.Inventory.Type == db.InventoryStaticYaml { path += ".yml" } // create inventory file - return ioutil.WriteFile(path, []byte(t.inventory.Inventory), 0664) + return ioutil.WriteFile(path, []byte(t.Inventory.Inventory), 0664) } diff --git a/services/tasks/logging.go b/services/tasks/logging.go index 5ff9a072d..7b38da7fa 100644 --- a/services/tasks/logging.go +++ b/services/tasks/logging.go @@ -3,24 +3,21 @@ package tasks import ( "bufio" "encoding/json" - "os/exec" - "time" - log "github.com/Sirupsen/logrus" "github.com/ansible-semaphore/semaphore/api/sockets" "github.com/ansible-semaphore/semaphore/util" + "os/exec" + "time" ) -func (t *TaskRunner) Log(msg string) { - now := time.Now() - +func (t *TaskRunner) Log2(msg string, now time.Time) { for _, user := range t.users { b, err := json.Marshal(&map[string]interface{}{ "type": "log", "output": msg, "time": now, - "task_id": t.task.ID, - "project_id": t.task.ProjectID, + "task_id": t.Task.ID, + "project_id": t.Task.ProjectID, }) util.LogPanic(err) @@ -35,6 +32,10 @@ func (t *TaskRunner) Log(msg string) { } } +func (t *TaskRunner) Log(msg string) { + t.Log2(msg, time.Now()) +} + // Readln reads from the pipe func Readln(r *bufio.Reader) (string, error) { var ( diff --git a/util/config.go b/util/config.go index 9660b6e52..363c1436b 100644 --- a/util/config.go +++ b/util/config.go @@ -82,6 +82,12 @@ const ( CmdGitClientId GitClientId = "cmd_git" ) +type RunnerSettings struct { + ApiURL string `json:"api_url"` + RegistrationToken string `json:"registration_token"` + ConfigFile string `json:"config_file"` +} + // ConfigType mapping between Config and the json file that sets it type ConfigType struct { MySQL DbConfig `json:"mysql"` @@ -150,11 +156,13 @@ type ConfigType struct { // task concurrency MaxParallelTasks int `json:"max_parallel_tasks"` - RegistrationToken string `json:"registration_token"` + RunnerRegistrationToken string `json:"runner_registration_token"` // feature switches PasswordLoginDisable bool `json:"password_login_disable"` NonAdminCanCreateProject bool `json:"non_admin_can_create_project"` + + Runner RunnerSettings `json:"runner"` } // Config exposes the application configuration storage for use in the application