notify: add working events to send nodes
This changeset includes a query that updates the job Args (nodes)
from within a worker job so we can send them back to the
scheduler. The last piece I am working on is the command so
that the initial query moves provisional pods (and groups) from
the provisional table to the worker queue.

Signed-off-by: vsoch <[email protected]>
vsoch committed Jul 29, 2024
1 parent 69e7624 commit 4ef00c2
Showing 9 changed files with 258 additions and 76 deletions.
10 changes: 10 additions & 0 deletions README.md
@@ -99,6 +99,16 @@ psql -U postgres
SELECT group_name, group_size from pods_provisional;
```

### TODO

- [ ] I'd like a more efficient query (or strategy) to move pods from provisional into the worker queue. Right now I have three queries, and that's too many.
- [ ] Discussion about how to respond to a "failed" allocation request (meaning we just can't give nodes now, which is likely to happen a lot). Maybe we need to do a reservation instead?
- [ ] I think maybe we should do a match "allocate else reserve" instead (see issue [here](https://github.com/converged-computing/fluxnetes/issues/4))
- [ ] Restarting with Postgres shouldn't hit CrashLoopBackOff when the database isn't ready yet (see the sketch after this list)
- [ ] In-tree registry plugins (that are related to resources) should be run first to inform Fluxion which nodes not to bind, where there are volumes, etc.
- [ ] The queue should inherit (and return) the start time (when the pod was first seen) as "start" in scheduler.go
- [ ] The provisional -> scheduled move should sort by the timestamp (I mostly just forgot this)!
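
For the CrashLoopBackOff item above, here is a minimal sketch of one way to wait for the database before starting work, assuming the `DATABASE_URL` environment variable already used by the worker code; the `waitForDatabase` helper and the retry counts are illustrative, not part of the code.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

// waitForDatabase retries the connection until Postgres is ready, instead of
// letting the pod crash (and CrashLoopBackOff) while the database starts up.
func waitForDatabase(ctx context.Context, retries int, delay time.Duration) (*pgxpool.Pool, error) {
	var lastErr error
	for i := 0; i < retries; i++ {
		pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
		if err == nil {
			if err = pool.Ping(ctx); err == nil {
				return pool, nil
			}
			pool.Close()
		}
		lastErr = err
		time.Sleep(delay)
	}
	return nil, fmt.Errorf("database not ready after %d attempts: %w", retries, lastErr)
}

func main() {
	pool, err := waitForDatabase(context.Background(), 30, 2*time.Second)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer pool.Close()
	fmt.Println("database is ready")
}
```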

## License

HPCIC DevTools is distributed under the terms of the MIT license.
9 changes: 7 additions & 2 deletions kubernetes/pkg/fluxnetes/defaults/defaults.go
@@ -1,6 +1,11 @@
package defaults

import (
"math"
)

const (
MaxUint = ^uint(0)
MaxAttempts = int(MaxUint >> 1)
// https://github.com/riverqueue/river/discussions/475
// The database column is an int16
MaxAttempts = math.MaxInt16
)
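
For context on where this cap lands, a minimal sketch of applying it at insert time, assuming River's `InsertOpts.MaxAttempts` field and import paths that mirror the ones used elsewhere in this repository; the `enqueue` helper and `example` package are illustrative, not part of the diff.

```go
package example

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"

	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/defaults"
	work "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy/workers"
)

// enqueue shows one place the cap could be applied: per-job insert options.
// Anything larger than math.MaxInt16 would overflow River's max_attempts column.
func enqueue(ctx context.Context, client *river.Client[pgx.Tx], args work.JobArgs) error {
	_, err := client.Insert(ctx, args, &river.InsertOpts{MaxAttempts: defaults.MaxAttempts})
	return err
}
```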
13 changes: 13 additions & 0 deletions kubernetes/pkg/fluxnetes/fluxnetes.go
@@ -19,6 +19,7 @@ package fluxnetes
import (
"context"
"fmt"
"strings"
"sync"
"time"

@@ -46,6 +47,18 @@
GroupName = "scheduling.x-k8s.io"
)

// JobResult serializes a result from a Fluxnetes worker back into job metadata for the scheduler
type JobResult struct {
JobID int32 `json:"jobid"`
Nodes string `json:"nodes"`
PodID string `json:"podid"`
PodSpec string `json:"podspec"`
}

func (j JobResult) GetNodes() []string {
return strings.Split(j.Nodes, ",")
}

// Fluxnetes schedules pods in a group using Fluxion as a backend
// We inherit cosched.Coscheduling to use some of the primary functions
type Fluxnetes struct {
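
A minimal sketch of how the scheduler side might consume the JobResult above once the worker has written the nodes back into the job args; the `parseJobResult` helper is illustrative and assumes it lives in the same `fluxnetes` package with `"encoding/json"` imported.

```go
// parseJobResult decodes the JSON job args written back by the worker into a
// JobResult and returns the node list to bind.
func parseJobResult(raw []byte) ([]string, error) {
	var result JobResult
	if err := json.Unmarshal(raw, &result); err != nil {
		return nil, err
	}
	// Nodes is a comma-separated string; GetNodes splits it into a slice.
	return result.GetNodes(), nil
}
```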
11 changes: 10 additions & 1 deletion kubernetes/pkg/fluxnetes/queries/queries.go
@@ -3,8 +3,17 @@ package queries
// Queries used by the main queue (and shared across strategies sometimes)
const (
GetTimestampQuery = "select created_at from pods_provisional where group_name=$1 limit 1"
SelectGroupsQuery = "select group_name, group_size, podspec from pods_provisional"
GetPodQuery = "select * from pods_provisional where group_name=$1 and namespace=$2 and name=$3"
InsertPodQuery = "insert into pods_provisional (podspec, namespace, name, created_at, group_name, group_size) values ($1, $2, $3, $4, $5, $6)"
CountPodsQuery = "select count(*) from pods_provisional where group_name=$1"
UpdateNodesQuery = "update river_job set args = jsonb_set(args, '{nodes}', to_jsonb($1::text)) where id=$2;"

// This could use improvement from someone good at SQL. We need to:
// 1. Select groups for which the number of pods we've seen >= the group size
// 2. Then get the group_name, group_size, and podspec for each (this goes to the scheduler)
// 3. Delete all of them from the table
// Ensure we are sorting by the timestamp when they were added (ascending, so the oldest groups go first)
SelectGroupsAtSizeQuery = "select group_name from pods_provisional group by group_name, group_size having count(*) >= group_size;"
SelectGroupsQuery = "select group_name, group_size, podspec from pods_provisional where group_name in ('%s');"
DeleteGroupsQuery = "delete from pods_provisional where group_name in ('%s');"
)
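
One possible answer to the comment above: the three steps can collapse into a single statement with Postgres data-modifying CTEs, which also handles the ordering. This is a sketch, not part of this commit, and the constant name `SelectAndDeleteGroupsQuery` is hypothetical.

```go
// SelectAndDeleteGroupsQuery finds groups whose pod count has reached the
// group size, deletes their provisional rows, and returns them oldest-first,
// all in one round trip.
const SelectAndDeleteGroupsQuery = `
with ready as (
    select group_name from pods_provisional
    group by group_name, group_size
    having count(*) >= group_size
),
deleted as (
    delete from pods_provisional p
    using ready r
    where p.group_name = r.group_name
    returning p.group_name, p.group_size, p.podspec, p.created_at
)
select group_name, group_size, podspec from deleted order by created_at asc;`
```

The Go side would still whittle the returned rows down to one entry per group, as getGroupsAtSize does today.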
8 changes: 5 additions & 3 deletions kubernetes/pkg/fluxnetes/queue.go
@@ -96,12 +96,14 @@ func (q *Queue) setupEvents() {

// Subscribers tell the River client the kinds of events they'd like to receive.
// We add them to a listing to be used by Kubernetes. These can be subscribed
// to from elsewhere too (anywhere)
// to from elsewhere too (anywhere). Note that we are not subscribing to failed
// or snoozed, because right now they mean "allocation not possible" and that
// is too much noise.
for _, event := range []river.EventKind{
river.EventKindJobCompleted,
river.EventKindJobCancelled,
river.EventKindJobFailed,
river.EventKindJobSnoozed,
// river.EventKindJobFailed,
// river.EventKindJobSnoozed,
} {
c, trigger := q.riverClient.Subscribe(event)
channel := &QueueEvent{Function: trigger, Channel: c}
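
For completeness, a sketch of how one of these subscriptions might be drained elsewhere, assuming the QueueEvent wrapper above (Channel and Function) and that River's Event exposes its Kind and the underlying Job row; the `watchEvents` name is illustrative.

```go
// watchEvents logs completed/cancelled jobs until the context is done, then
// cancels the subscription.
func watchEvents(ctx context.Context, qe *QueueEvent) {
	defer qe.Function() // unsubscribe
	for {
		select {
		case <-ctx.Done():
			return
		case event := <-qe.Channel:
			if event == nil {
				// channel closed
				return
			}
			klog.Infof("[Fluxnetes] event %s for job %d", event.Kind, event.Job.ID)
		}
	}
}
```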
78 changes: 68 additions & 10 deletions kubernetes/pkg/fluxnetes/strategy/fcfs.go
@@ -2,6 +2,8 @@ package strategy

import (
"context"
"fmt"
"strings"

klog "k8s.io/klog/v2"

@@ -38,30 +40,86 @@ func (FCFSBackfill) AddWorkers(workers *river.Workers) {
river.AddWorker(workers, &work.JobWorker{})
}

// queryReady checks if the number of pods we know of is >= required group size
// TODO(vsoch) This currently returns the entire table, and the sql needs to be tweaked
func (s FCFSBackfill) queryReady(ctx context.Context, pool *pgxpool.Pool) ([]work.JobArgs, error) {
rows, err := pool.Query(ctx, queries.SelectGroupsQuery)
// queryGroupsAtSize returns groups that have achieved minimum size
func (s FCFSBackfill) queryGroupsAtSize(ctx context.Context, pool *pgxpool.Pool) ([]string, error) {

// First retrieve the group names that are the right size
rows, err := pool.Query(ctx, queries.SelectGroupsAtSizeQuery)
if err != nil {
return nil, err
}
defer rows.Close()

// Collect rows into single result
groupNames, err := pgx.CollectRows(rows, pgx.RowTo[string])
klog.Infof("GROUP NAMES %s", groupNames)
return groupNames, err
}

// deleteGroups removes the named groups from the provisional table
func (s FCFSBackfill) deleteGroups(ctx context.Context, pool *pgxpool.Pool, groupNames []string) error {

// Delete all pods for the groups we are moving to the worker queue
query := fmt.Sprintf(queries.DeleteGroupsQuery, strings.Join(groupNames, "','"))
klog.Infof("DELETE %s", query)
rows, err := pool.Query(ctx, query)
if err != nil {
return err
}
defer rows.Close()
return err
}

// getGroupsAtSize returns the pods (as job args) for groups that have achieved minimum size
func (s FCFSBackfill) getGroupsAtSize(ctx context.Context, pool *pgxpool.Pool, groupNames []string) ([]work.JobArgs, error) {

// Now we need to collect all the pods that match that.
query := fmt.Sprintf(queries.SelectGroupsQuery, strings.Join(groupNames, "','"))
klog.Infof("GET %s", query)
rows, err := pool.Query(ctx, query)
if err != nil {
return nil, err
}
defer rows.Close()

// Collect rows into slice of jobs
// Collect rows into map, and then slice of jobs
// The map whittles down the groups into single entries
// We will eventually not want to do that, assuming podspecs are different in a group
jobs := []work.JobArgs{}
lookup := map[string]work.JobArgs{}

// Collect rows into single result
models, err := pgx.CollectRows(rows, pgx.RowToStructByName[JobModel])
if err != nil {
return nil, err
}
klog.Infof("Models: %s", models)

// TODO(vsoch) we need to get unique podgroups here, and one representative podspec
// we will eventually want to instead have the ability to support unique podspecs
// under one group
// TODO(vsoch) we need to collect all podspecs here and be able to give that to the worker
for _, model := range models {
jobArgs := work.JobArgs{GroupName: model.GroupName, Podspec: model.Podspec, GroupSize: model.GroupSize}
lookup[model.GroupName] = jobArgs
}
for _, jobArgs := range lookup {
jobs = append(jobs, jobArgs)
}
klog.Infof("jobs: %s\n", jobs)
return jobs, nil
}

// queryReady checks if the number of pods we know of is >= required group size
// TODO(vsoch) This currently returns the entire table, and the sql needs to be tweaked
func (s FCFSBackfill) queryReady(ctx context.Context, pool *pgxpool.Pool) ([]work.JobArgs, error) {

// 1. Get the list of group names that have pod count >= their size
groupNames, err := s.queryGroupsAtSize(ctx, pool)
if err != nil {
return nil, err
}

// 2. Now we need to collect all the pods that match that.
jobs, err := s.getGroupsAtSize(ctx, pool, groupNames)
if err != nil {
return nil, err
}

// 3. Finally, we need to delete them from provisional
err = s.deleteGroups(ctx, pool, groupNames)
return jobs, err
}

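
As an aside on the fmt.Sprintf-built IN lists above, here is a sketch (not part of this commit) of the delete using a bound parameter instead, assuming pgx v5's mapping of a Go []string to a Postgres text[]; the `deleteGroupsParam` name and inline query are illustrative.

```go
// deleteGroupsParam deletes the groups by binding the names as a text[]
// parameter rather than interpolating them into the SQL string.
func (s FCFSBackfill) deleteGroupsParam(ctx context.Context, pool *pgxpool.Pool, groupNames []string) error {
	_, err := pool.Exec(ctx, "delete from pods_provisional where group_name = any($1)", groupNames)
	return err
}
```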
46 changes: 37 additions & 9 deletions kubernetes/pkg/fluxnetes/strategy/workers/workers.go
@@ -3,18 +3,21 @@ package workers
import (
"context"
"encoding/json"
"os"
"strings"
"time"

"github.com/jackc/pgx/v5/pgxpool"
"google.golang.org/grpc"

// v1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
klog "k8s.io/klog/v2"

pb "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/fluxion-grpc"

"github.com/riverqueue/river"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/podspec"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries"
)

type JobArgs struct {
@@ -25,12 +28,13 @@ type JobArgs struct {
GroupName string `json:"groupName"`
GroupSize int32 `json:"groupSize"`

// Nodes return to Kubernetes to bind
// Nodes are returned to Kubernetes to bind, and MUST
// have attributes for the Nodes and Podspecs.
// We can eventually have a kubectl command
// to get a job too ;)
Nodes []string `json:"nodes"`
FluxJob int64 `json:"jobid"`
PodId string `json:"podid"`
Nodes string `json:"nodes"`
FluxJob int64 `json:"jobid"`
PodId string `json:"podid"`
}

// The Kind MUST correspond to the <type>Args and <type>Worker
@@ -91,18 +95,42 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
}
klog.Info("Fluxion response %s", response)

// These don't actually update, eventually we can update them also in the database update
// We update the "Args" of the job to pass the node assignment back to the scheduler
job.Args.PodId = response.GetPodID()
job.Args.FluxJob = response.GetJobID()
// job.Args.PodId = response.GetPodID()
// job.Args.FluxJob = response.GetJobID()

// Get the nodelist and serialize into list of strings for job args
nodelist := response.GetNodelist()
nodes := []string{}
for _, node := range nodelist {
nodes = append(nodes, node.NodeID)
}
job.Args.Nodes = nodes
nodeStr := strings.Join(nodes, ",")

klog.Infof("[Fluxnetes] nodes allocated %s for flux job id %d\n", nodes, job.Args.FluxJob)
// We must update the job args in the database with the nodes via a query;
// this is how they are sent back to the Kubernetes scheduler
pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
if err != nil {
return err
}
defer pool.Close()

rows, err := pool.Query(ctx, queries.UpdateNodesQuery, nodeStr, job.ID)
if err != nil {
return err
}
defer rows.Close()

// Collect rows into single result
// pgx.CollectRows(rows, pgx.RowTo[string])
// klog.Infof("Values: %s", values)

klog.Infof("[Fluxnetes] nodes allocated %s for flux job id %d\n", nodeStr, job.Args.FluxJob)
return nil
}

// If needed, to get a client from a worker (to submit more jobs)
// client, err := river.ClientFromContextSafely[pgx.Tx](ctx)
// if err != nil {
// return fmt.Errorf("error getting client from context: %w", err)
// }
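
To close the loop described in the commit message, a sketch (not part of this commit) of how the scheduler side could read the nodes back out of the job row that UpdateNodesQuery writes to, assuming the river_job table and jsonb args column used above; the query constant and `getJobNodes` helper are hypothetical, and this assumes `context`, `strings`, and `pgxpool` are imported.

```go
// GetJobNodesQuery pulls the comma-separated node list back out of the job
// args that the worker updated above.
const GetJobNodesQuery = "select args->>'nodes' from river_job where id=$1"

// getJobNodes returns the nodes assigned to a finished job, ready for binding.
func getJobNodes(ctx context.Context, pool *pgxpool.Pool, jobID int64) ([]string, error) {
	var nodes string
	if err := pool.QueryRow(ctx, GetJobNodesQuery, jobID).Scan(&nodes); err != nil {
		return nil, err
	}
	if nodes == "" {
		return nil, nil
	}
	return strings.Split(nodes, ","), nil
}
```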
