Skip to content

Commit

Permalink
Runners (#1444)
Browse files Browse the repository at this point in the history
* feat(runners): add register endpoint

* feat(runners): add remote runner

* refactor(runners): move functionality TaskRunner -> AnsibleJobRunner

* fix(runners): init job

* chore(runners): remote unused field

* feat(runners): use external logger from AnsibleJobRunner

* refactor(runners): remove status field

* refactor(runners): remove mutation from job

* feat(runners): pass username and verison to task

* test(runners): fix tests

* fix(runners): params for Run

* feat(runners): implement runner selection

* feat(runners): fill required fields

* fix(runners): session block

* feat(runners): kill process

* refactor(runners): rename fields to public

* feat(runners): remote runner functionallity

* refactor(runners): remove unused class

* fix(runners): send json

* feat(runners): runner registration

* feat(runners): logging

* feat(runners): server <-> running communication works

* feat(runners): pass creds to runenr
  • Loading branch information
fiftin authored Aug 28, 2023
1 parent 58fc07e commit d1b7ad0
Show file tree
Hide file tree
Showing 25 changed files with 1,361 additions and 858 deletions.
54 changes: 0 additions & 54 deletions api-docs-runners.yml

This file was deleted.

26 changes: 26 additions & 0 deletions api-docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,11 @@ definitions:
position:
type: integer

Runner:
type: object
properties:
token:
type: string

Event:
type: object
Expand Down Expand Up @@ -1624,3 +1629,24 @@ paths:
type: array
items:
$ref: "#/definitions/TaskOutput"

# /runners:
# post:
# tags:
# - project
# summary: Starts a job
# parameters:
# - name: task
# in: body
# required: true
# schema:
# type: object
# properties:
# registration_token:
# type: string
# example: test123
# responses:
# 201:
# description: Task queued
# schema:
# $ref: "#/definitions/Runner"
6 changes: 3 additions & 3 deletions api/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func GetIntParam(name string, w http.ResponseWriter, r *http.Request) (int, erro
return intParam, nil
}

//H just a string-to-anything map
// H just a string-to-anything map
type H map[string]interface{}

//Bind decodes json into object
// Bind decodes json into object
func Bind(w http.ResponseWriter, r *http.Request, out interface{}) bool {
err := json.NewDecoder(r.Body).Decode(out)
if err != nil {
Expand All @@ -61,7 +61,7 @@ func Bind(w http.ResponseWriter, r *http.Request, out interface{}) bool {
return err == nil
}

//WriteJSON writes object as JSON
// WriteJSON writes object as JSON
func WriteJSON(w http.ResponseWriter, code int, out interface{}) {
w.Header().Set("content-type", "application/json")
w.WriteHeader(code)
Expand Down
12 changes: 10 additions & 2 deletions api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"fmt"
"github.com/ansible-semaphore/semaphore/api/runners"
"net/http"
"os"
"strings"
Expand All @@ -17,12 +18,13 @@ import (

var publicAssets2 = packr.NewBox("../web/dist")

// StoreMiddleware WTF?
func StoreMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
store := helpers.Store(r)
var url = r.URL.String()
//var url = r.URL.String()

db.StoreSession(store, url, func() {
db.StoreSession(store, util.RandString(12), func() {
next.ServeHTTP(w, r)
})
})
Expand Down Expand Up @@ -81,11 +83,17 @@ func Route() *mux.Router {

publicAPIRouter.Use(StoreMiddleware, JSONMiddleware)

publicAPIRouter.HandleFunc("/runners", runners.RegisterRunner).Methods("POST")
publicAPIRouter.HandleFunc("/auth/login", login).Methods("GET", "POST")
publicAPIRouter.HandleFunc("/auth/logout", logout).Methods("POST")
publicAPIRouter.HandleFunc("/auth/oidc/{provider}/login", oidcLogin).Methods("GET")
publicAPIRouter.HandleFunc("/auth/oidc/{provider}/redirect", oidcRedirect).Methods("GET")

routersAPI := r.PathPrefix(webPath + "api").Subrouter()
routersAPI.Use(StoreMiddleware, JSONMiddleware, runners.RunnerMiddleware)
routersAPI.Path("/runners/{runner_id}").HandlerFunc(runners.GetRunner).Methods("GET", "HEAD")
routersAPI.Path("/runners/{runner_id}").HandlerFunc(runners.UpdateRunner).Methods("PUT")

authenticatedWS := r.PathPrefix(webPath + "api").Subrouter()
authenticatedWS.Use(JSONMiddleware, authenticationWithStore)
authenticatedWS.Path("/ws").HandlerFunc(sockets.Handler).Methods("GET", "HEAD")
Expand Down
52 changes: 0 additions & 52 deletions api/runners/handler.go

This file was deleted.

154 changes: 154 additions & 0 deletions api/runners/runners.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package runners

import (
"github.com/ansible-semaphore/semaphore/api/helpers"
"github.com/ansible-semaphore/semaphore/db"
"github.com/ansible-semaphore/semaphore/services/runners"
"github.com/ansible-semaphore/semaphore/util"
"github.com/gorilla/context"
"net/http"
)

func RunnerMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

runnerID, err := helpers.GetIntParam("runner_id", w, r)

if err != nil {
helpers.WriteJSON(w, http.StatusBadRequest, map[string]string{
"error": "runner_id required",
})
return
}

store := helpers.Store(r)

runner, err := store.GetGlobalRunner(runnerID)

if err != nil {
helpers.WriteJSON(w, http.StatusNotFound, map[string]string{
"error": "Runner not found",
})
return
}

context.Set(r, "runner", runner)
next.ServeHTTP(w, r)
})
}

func GetRunner(w http.ResponseWriter, r *http.Request) {
runner := context.Get(r, "runner").(db.Runner)

data := runners.RunnerState{
AccessKeys: make(map[int]db.AccessKey),
}

tasks := helpers.TaskPool(r).GetRunningTasks()

for _, tsk := range tasks {
if tsk.RunnerID != runner.ID {
continue
}

if tsk.Task.Status == db.TaskRunningStatus {

data.NewJobs = append(data.NewJobs, runners.JobData{
Username: tsk.Username,
IncomingVersion: tsk.IncomingVersion,
Task: tsk.Task,
Template: tsk.Template,
Inventory: tsk.Inventory,
Repository: tsk.Repository,
Environment: tsk.Environment,
})

if tsk.Inventory.SSHKeyID != nil {
data.AccessKeys[*tsk.Inventory.SSHKeyID] = tsk.Inventory.SSHKey
}

if tsk.Inventory.BecomeKeyID != nil {
data.AccessKeys[*tsk.Inventory.BecomeKeyID] = tsk.Inventory.BecomeKey
}

data.AccessKeys[tsk.Repository.SSHKeyID] = tsk.Repository.SSHKey

} else {
data.CurrentJobs = append(data.CurrentJobs, runners.JobState{
ID: tsk.Task.ID,
Status: tsk.Task.Status,
})
}
}

helpers.WriteJSON(w, http.StatusOK, data)
}

func UpdateRunner(w http.ResponseWriter, r *http.Request) {
var body runners.RunnerProgress

if !helpers.Bind(w, r, &body) {
helpers.WriteJSON(w, http.StatusBadRequest, map[string]string{
"error": "Invalid format",
})
return
}

taskPool := helpers.TaskPool(r)

if body.Jobs == nil {
w.WriteHeader(http.StatusNoContent)
return
}

for _, job := range body.Jobs {
tsk := taskPool.GetTask(job.ID)

if tsk == nil {
// TODO: log
continue
}

for _, logRecord := range job.LogRecords {
tsk.Log2(logRecord.Message, logRecord.Time)
}
}

w.WriteHeader(http.StatusNoContent)
}

func RegisterRunner(w http.ResponseWriter, r *http.Request) {
var register runners.RunnerRegistration

if !helpers.Bind(w, r, &register) {
helpers.WriteJSON(w, http.StatusBadRequest, map[string]string{
"error": "Invalid format",
})
return
}

if util.Config.RunnerRegistrationToken == "" || register.RegistrationToken != util.Config.RunnerRegistrationToken {
helpers.WriteJSON(w, http.StatusBadRequest, map[string]string{
"error": "Invalid registration token",
})
return
}

runner, err := helpers.Store(r).CreateRunner(db.Runner{
State: db.RunnerActive,
})

if err != nil {
helpers.WriteJSON(w, http.StatusInternalServerError, map[string]string{
"error": "Unexpected error",
})
return
}

res := runners.RunnerConfig{
RunnerID: runner.ID,
Token: runner.Token,
}

helpers.WriteJSON(w, http.StatusOK, res)
}
10 changes: 0 additions & 10 deletions cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,6 @@ func createStore(token string) db.Store {

store.Connect(token)

//if err := store.Connect(token); err != nil {
// switch err {
// case bbolt.ErrTimeout:
// fmt.Println("\n BoltDB supports only one connection at a time. You should stop Semaphore to use CLI.")
// default:
// fmt.Println("\n Have you run `semaphore setup`?")
// }
// os.Exit(1)
//}

err := db.Migrate(store)

if err != nil {
Expand Down
Loading

0 comments on commit d1b7ad0

Please sign in to comment.