From 98e2c0874687fbe8cc5dc478e3521801027c255c Mon Sep 17 00:00:00 2001
From: vsoch
Date: Sun, 28 Jul 2024 00:50:58 -0600
Subject: [PATCH] queue-manager: reorganize into strategies

I realized that we need flexibility in defining queue strategies, not just
in how the worker is designed, but also in how the queue strategy handles
the schedule function. This is an overhaul (not quite done yet) that does
that. I still need to plug the final query back in to move provisional pods
to the worker queue. Also note that it looks like we have priority, pending,
and other insert params to play with.

Signed-off-by: vsoch
---
 kubernetes/pkg/fluxnetes/group/group.go       |  43 ++++
 kubernetes/pkg/fluxnetes/podgroup.go          | 123 ------------
 kubernetes/pkg/fluxnetes/queries/queries.go   |  10 +
 kubernetes/pkg/fluxnetes/queue.go             | 188 ++++++++++++------
 kubernetes/pkg/fluxnetes/strategy/fcfs.go     |  96 +++++++++
 kubernetes/pkg/fluxnetes/strategy/strategy.go |  22 ++
 .../workers/workers.go}                       |  42 ++--
 kubernetes/pkg/scheduler/schedule_one.go      |  15 +-
 src/build/postgres/create-tables.sql          |   9 +
 9 files changed, 341 insertions(+), 207 deletions(-)
 delete mode 100644 kubernetes/pkg/fluxnetes/podgroup.go
 create mode 100644 kubernetes/pkg/fluxnetes/queries/queries.go
 create mode 100644 kubernetes/pkg/fluxnetes/strategy/fcfs.go
 create mode 100644 kubernetes/pkg/fluxnetes/strategy/strategy.go
 rename kubernetes/pkg/fluxnetes/{worker.go => strategy/workers/workers.go} (75%)
 create mode 100644 src/build/postgres/create-tables.sql

diff --git a/kubernetes/pkg/fluxnetes/group/group.go b/kubernetes/pkg/fluxnetes/group/group.go
index f29647a..1735c24 100644
--- a/kubernetes/pkg/fluxnetes/group/group.go
+++ b/kubernetes/pkg/fluxnetes/group/group.go
@@ -4,13 +4,56 @@ import (
 	"fmt"
 	"time"
 
+	"strconv"
+
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	klog "k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/labels"
 	"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
 )
 
+// A PodGroup holds the name and size of a pod group
+// It is just a temporary holding structure
+type PodGroup struct {
+	Name      string
+	Size      int32
+	Timestamp metav1.MicroTime
+}
+
+// GetPodGroupName returns the pod group name
+// 1. We first look to see if the pod is explicitly labeled
+// 2. If not, we fall back to a default based on the pod name and namespace
+func GetPodGroupName(pod *corev1.Pod) string {
+	groupName := labels.GetPodGroupLabel(pod)
+
+	// If we don't have a group, create one under the fluxnetes namespace
+	if groupName == "" {
+		groupName = fmt.Sprintf("fluxnetes-group-%s-%s", pod.Namespace, pod.Name)
+	}
+	return groupName
+}
+
+// GetPodGroupSize gets the group size, first from the label, then a default of 1
+func GetPodGroupSize(pod *corev1.Pod) (int32, error) {
+
+	// Do we have a group size? This will be parsed as a string, likely
+	groupSize, ok := pod.Labels[labels.PodGroupSizeLabel]
+	if !ok {
+		groupSize = "1"
+		pod.Labels[labels.PodGroupSizeLabel] = groupSize
+	}
+
+	// We need the group size to be an integer now!
+	size, err := strconv.ParseInt(groupSize, 10, 32)
+	if err != nil {
+		return 0, err
+	}
+	return int32(size), nil
+}
+
+// TODO(vsoch) delete everything below here when PodGroup is no longer used
 // DefaultWaitTime is 60s if ScheduleTimeoutSeconds is not specified.
 const DefaultWaitTime = 60 * time.Second
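A quick usage sketch for the two helpers added to group/group.go above (not part of the patch). The pod name, namespace, and size label value are made up for illustration; only the exported helpers and the labels.PodGroupSizeLabel constant referenced by this patch are assumed to exist.

    package main

    import (
    	"fmt"

    	corev1 "k8s.io/api/core/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    	groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group"
    	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/labels"
    )

    func main() {
    	// No explicit group name label, so GetPodGroupName falls back to
    	// fluxnetes-group-<namespace>-<name>; the size label is set explicitly here
    	pod := &corev1.Pod{
    		ObjectMeta: metav1.ObjectMeta{
    			Name:      "lammps-0",
    			Namespace: "default",
    			Labels:    map[string]string{labels.PodGroupSizeLabel: "4"},
    		},
    	}
    	name := groups.GetPodGroupName(pod)
    	size, err := groups.GetPodGroupSize(pod)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(name, size) // fluxnetes-group-default-lammps-0 4
    }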
diff --git a/kubernetes/pkg/fluxnetes/podgroup.go b/kubernetes/pkg/fluxnetes/podgroup.go
deleted file mode 100644
index c985e41..0000000
--- a/kubernetes/pkg/fluxnetes/podgroup.go
+++ /dev/null
@@ -1,123 +0,0 @@
-package fluxnetes
-
-import (
-	"encoding/json"
-	"strconv"
-	"time"
-
-	"github.com/jackc/pgx/v5/pgtype"
-
-	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-
-	"context"
-	"fmt"
-
-	klog "k8s.io/klog/v2"
-	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/labels"
-)
-
-// Queries
-const (
-	getTimestampQuery = "select created_at from pods_provisional where group_name=$1 limit 1"
-	getPodQuery       = "select * from pods_provisional where group_name=$1 and namespace=$2 and name=$3"
-	insertPodQuery    = "insert into pods_provisional (podspec, namespace, name, created_at, group_name, group_size) values ($1, $2, $3, $4, $5, $6)"
-)
-
-// A PodGroup holds the name and size of a pod group
-// It is just a temporary holding structure
-type PodGroup struct {
-	Name      string
-	Size      int32
-	Timestamp metav1.MicroTime
-}
-
-// GetPodGroup returns the PodGroup that a Pod belongs to in cache.
-func (q *Queue) GetPodGroup(pod *corev1.Pod) (*PodGroup, error) {
-	groupName := labels.GetPodGroupLabel(pod)
-
-	// If we don't have a group, create one under fluxnetes namespace
-	if groupName == "" {
-		groupName = fmt.Sprintf("fluxnetes-group-%s", pod.Name)
-	}
-
-	// Do we have a group size? This will be parsed as a string, likely
-	groupSize, ok := pod.Labels[labels.PodGroupSizeLabel]
-	if !ok {
-		groupSize = "1"
-		pod.Labels[labels.PodGroupSizeLabel] = groupSize
-	}
-
-	// Log the namespace/name, group name, and size
-	klog.Infof("Pod Group Name for %s is %s (%s)", pod.Name, groupName, groupSize)
-
-	// Get the creation timestamp for the group
-	ts, err := q.GetCreationTimestamp(pod, groupName)
-	if err != nil {
-		return nil, err
-	}
-
-	// We need the group size to be an integer now!
-	size, err := strconv.ParseInt(groupSize, 10, 32)
-	if err != nil {
-		return nil, err
-	}
-
-	// This can be improved - only get once for the group
-	// and add to some single table
-	return &PodGroup{Size: int32(size), Name: groupName, Timestamp: ts}, nil
-}
-
-// EnqueuePod checks to see if a specific pod exists for a group. If yes, do nothing.
-// If not, add it. If yes, update the podspec.
-func (q *Queue) EnqueuePod(ctx context.Context, pod *corev1.Pod, group *PodGroup) error {
-
-	// This query will fail if there are no rows (the podGroup is not known)
-	var groupName, name, namespace string
-	row := q.Pool.QueryRow(context.Background(), getPodQuery, group.Name, pod.Namespace, pod.Name)
-	err := row.Scan(groupName, name, namespace)
-	if err == nil {
-
-		// TODO vsoch: if we can update a podspec, need to retrieve, check, update here
-		klog.Info("Found existing pod %s/%s in group %s", namespace, name, groupName)
-		return nil
-	}
-	klog.Error("Did not find pod in table", group)
-
-	// Prepare timestamp and podspec for insertion...
-	ts := &pgtype.Timestamptz{Time: group.Timestamp.Time, Valid: true}
-	podspec, err := json.Marshal(pod)
-	if err != nil {
-		return err
-	}
-	_, err = q.Pool.Query(ctx, insertPodQuery, string(podspec), pod.Namespace, pod.Name, ts, group.Name, group.Size)
-
-	// Show the user a success or an error, we return it either way
-	if err != nil {
-		klog.Infof("Error inserting rows %s", err)
-	}
-	return err
-}
-
-// GetCreationTimestamp returns the creation time of a podGroup or a pod in seconds (time.MicroTime)
-// We either get this from the pod itself (if size 1) or from the database
-func (q *Queue) GetCreationTimestamp(pod *corev1.Pod, groupName string) (metav1.MicroTime, error) {
-
-	// First see if we've seen the group before, the creation times are shared across a group
-	ts := metav1.MicroTime{}
-
-	// This query will fail if there are no rows (the podGroup is not known)
-	err := q.Pool.QueryRow(context.Background(), getTimestampQuery, groupName).Scan(&ts)
-	if err == nil {
-		klog.Info("Creation timestamp is", ts)
-		return ts, err
-	}
-	klog.Error("This is the error", err)
-
-	// This is the first member of the group - use its CreationTimestamp
-	if !pod.CreationTimestamp.IsZero() {
-		return metav1.NewMicroTime(pod.CreationTimestamp.Time), nil
-	}
-	// If the pod for some reasond doesn't have a timestamp, assume now
-	return metav1.NewMicroTime(time.Now()), nil
-}
diff --git a/kubernetes/pkg/fluxnetes/queries/queries.go b/kubernetes/pkg/fluxnetes/queries/queries.go
new file mode 100644
index 0000000..a424e75
--- /dev/null
+++ b/kubernetes/pkg/fluxnetes/queries/queries.go
@@ -0,0 +1,10 @@
+package queries
+
+// Queries used by the main queue (and sometimes shared across strategies)
+const (
+	GetTimestampQuery = "select created_at from pods_provisional where group_name=$1 limit 1"
+	SelectGroupsQuery = "select group_name, group_size, podspec from pods_provisional"
+	GetPodQuery       = "select * from pods_provisional where group_name=$1 and namespace=$2 and name=$3"
+	InsertPodQuery    = "insert into pods_provisional (podspec, namespace, name, created_at, group_name, group_size) values ($1, $2, $3, $4, $5, $6)"
+	CountPodsQuery    = "select count(*) from pods_provisional where group_name=$1"
+)
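A standalone sketch (not part of the patch) of how these shared constants are exercised with pgx. The DATABASE_URL environment variable and the group name are assumptions for illustration only.

    package main

    import (
    	"context"
    	"fmt"
    	"os"

    	"github.com/jackc/pgx/v5/pgxpool"
    	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries"
    )

    func main() {
    	ctx := context.Background()
    	pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
    	if err != nil {
    		panic(err)
    	}
    	defer pool.Close()

    	// Count how many members of a (hypothetical) group are already provisional
    	var count int
    	err = pool.QueryRow(ctx, queries.CountPodsQuery, "fluxnetes-group-default-lammps-0").Scan(&count)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println("pods in group:", count)
    }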
diff --git a/kubernetes/pkg/fluxnetes/queue.go b/kubernetes/pkg/fluxnetes/queue.go
index c205323..fde2e7b 100644
--- a/kubernetes/pkg/fluxnetes/queue.go
+++ b/kubernetes/pkg/fluxnetes/queue.go
@@ -5,21 +5,28 @@ import (
 	"encoding/json"
 	"log/slog"
 	"os"
+	"time"
 
-	"github.com/jackc/pgx/v5"
-	"github.com/jackc/pgx/v5/pgxpool"
 	corev1 "k8s.io/api/core/v1"
-
-	// v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	klog "k8s.io/klog/v2"
 
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
+
+	"github.com/jackc/pgx/v5/pgtype"
 	"github.com/riverqueue/river"
 	"github.com/riverqueue/river/riverdriver/riverpgxv5"
-	// "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/podspec"
+
+	groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries"
+	strategies "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy"
 )
 
 const (
 	queueMaxWorkers = 10
+	MaxUint         = ^uint(0)
+	MaxAttempts     = int(MaxUint >> 1)
 )
 
 // Queue holds handles to queue database and event handles
@@ -28,6 +35,7 @@ type Queue struct {
 	Pool          *pgxpool.Pool
 	riverClient   *river.Client[pgx.Tx]
 	EventChannels []*QueueEvent
+	Strategy      strategies.QueueStrategy
 }
 
 type ChannelFunction func()
@@ -44,8 +52,14 @@ func NewQueue(ctx context.Context) (*Queue, error) {
 	if err != nil {
 		return nil, err
 	}
+
+	// The default strategy now mirrors what fluence with Kubernetes does
+	// This can eventually be customizable
+	strategy := strategies.FCFSBackfill{}
 	workers := river.NewWorkers()
-	river.AddWorker(workers, &JobWorker{})
+
+	// Each strategy has its own worker type
+	strategy.AddWorkers(workers)
 	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
 		Logger: slog.Default().With("id", "Fluxnetes"),
 		Queues: map[string]river.QueueConfig{
@@ -62,7 +76,7 @@ func NewQueue(ctx context.Context) (*Queue, error) {
 	if err != nil {
 		return nil, err
 	}
-	queue := Queue{riverClient: riverClient, Pool: dbPool}
+	queue := Queue{riverClient: riverClient, Pool: dbPool, Strategy: strategy}
 	queue.setupEvents()
 	return &queue, nil
 }
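The MaxAttempts constant defined above relies on a bit trick: ^uint(0) is a word of all ones, and shifting it right once yields the largest value that fits in a signed int, so jobs are effectively retried without limit. A tiny standalone check of that arithmetic (not part of the patch):

    package main

    import (
    	"fmt"
    	"math"
    )

    func main() {
    	// ^uint(0) sets every bit; >> 1 clears the top bit, so the result
    	// converts to the maximum signed int on any platform
    	maxUint := ^uint(0)
    	maxAttempts := int(maxUint >> 1)
    	fmt.Println(maxAttempts == math.MaxInt) // true
    }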
@@ -83,84 +97,134 @@ func (q *Queue) setupEvents() {
 	q.EventChannels = []*QueueEvent{}
 
 	// Subscribers tell the River client the kinds of events they'd like to receive.
-	// We add them to a listing to be used by Kubernetes. Note that we are skipping
-	// the snooze channel (schedule job for later)
-	completedChan, completedFunc := q.riverClient.Subscribe(river.EventKindJobCompleted)
-	channel := &QueueEvent{Function: completedFunc, Channel: completedChan}
-	q.EventChannels = append(q.EventChannels, channel)
-
-	failedChan, failedFunc := q.riverClient.Subscribe(river.EventKindJobFailed)
-	channel = &QueueEvent{Function: failedFunc, Channel: failedChan}
-	q.EventChannels = append(q.EventChannels, channel)
-
-	delayChan, delayFunc := q.riverClient.Subscribe(river.EventKindJobSnoozed)
-	channel = &QueueEvent{Function: delayFunc, Channel: delayChan}
-	q.EventChannels = append(q.EventChannels, channel)
-
-	cancelChan, cancelFunc := q.riverClient.Subscribe(river.EventKindJobCancelled)
-	channel = &QueueEvent{Function: cancelFunc, Channel: cancelChan}
-	q.EventChannels = append(q.EventChannels, channel)
-}
-
-// MoveQueue moves pods from provisional into the jobs queue
-func (q *Queue) MoveQueue() {
-
-	// If there is a group, wait for minMember.
-	// When we dispatch the group, will need to clean up this table
-
+	// We add them to a listing to be used by Kubernetes. These can be subscribed
+	// to from elsewhere too (anywhere)
+	for _, event := range []river.EventKind{
+		river.EventKindJobCompleted,
+		river.EventKindJobCancelled,
+		river.EventKindJobFailed,
+		river.EventKindJobSnoozed,
+	} {
+		c, trigger := q.riverClient.Subscribe(event)
+		channel := &QueueEvent{Function: trigger, Channel: c}
+		q.EventChannels = append(q.EventChannels, channel)
+	}
 }
 
-// Enqueue a new job to the queue
-// When we add a job, we have generated the jobspec and the group is ready.
+// Enqueue a new job to the provisional queue
+// 1. Assemble (discover or define) the group
+// 2. Add to the provisional table
 func (q *Queue) Enqueue(ctx context.Context, pod *corev1.Pod) error {
 
-	// Get the pod group - this does not alter the database, but just looks for it
-	podGroup, err := q.GetPodGroup(pod)
+	// Get the pod group name and size, first from labels, then defaults
+	groupName := groups.GetPodGroupName(pod)
+	size, err := groups.GetPodGroupSize(pod)
 	if err != nil {
 		return err
 	}
-	klog.Infof("Derived podgroup %s (%d) created at %s", podGroup.Name, podGroup.Size, podGroup.Timestamp)
 
-	// Case 1: add to queue if size 1 - we don't need to keep a record of it.
-	if podGroup.Size == 1 {
-		err = q.SubmitJob(ctx, pod, podGroup)
+	// Get the creation timestamp for the group
+	ts, err := q.GetCreationTimestamp(pod, groupName)
+	if err != nil {
+		return err
 	}
-	// Case 2: all other cases, add to pod table (if does not exist)
-	// Pods added to the table are checked at the end (and Submit if
-	// the group is ready, meaning having achieved minimum size.
-	q.EnqueuePod(ctx, pod, podGroup)
-	return nil
+
+	// Log the pod name, group name, and size
+	klog.Infof("Pod %s has group %s (%d) created at %s", pod.Name, groupName, size, ts)
+
+	// Add the pod to the provisional table.
+	group := &groups.PodGroup{Size: size, Name: groupName, Timestamp: ts}
+	return q.enqueueProvisional(ctx, pod, group)
 }
 
-// SubmitJob subits the podgroup (and podspec) to the queue
-// It will later be given to AskFlux
-// TODO this should not rely on having a single pod.
-func (q *Queue) SubmitJob(ctx context.Context, pod *corev1.Pod, group *PodGroup) error {
+// enqueueProvisional adds a pod to the provisional queue
+func (q *Queue) enqueueProvisional(ctx context.Context, pod *corev1.Pod, group *groups.PodGroup) error {
+
+	// This query will fail if there are no rows (the podGroup is not known)
+	var groupName, name, namespace string
+	row := q.Pool.QueryRow(context.Background(), queries.GetPodQuery, group.Name, pod.Namespace, pod.Name)
+	err := row.Scan(groupName, name, namespace)
 
-	// podspec needs to be serialized to pass to the job
-	asJson, err := json.Marshal(pod)
+	// We didn't find the pod in the table - add it.
 	if err != nil {
+		klog.Error("Did not find pod in table", group)
+
+		// Prepare timestamp and podspec for insertion...
+		ts := &pgtype.Timestamptz{Time: group.Timestamp.Time, Valid: true}
+		podspec, err := json.Marshal(pod)
+		if err != nil {
+			return err
+		}
+		_, err = q.Pool.Query(ctx, queries.InsertPodQuery, string(podspec), pod.Namespace, pod.Name, ts, group.Name, group.Size)
+
+		// Show the user a success or an error, we return it either way
+		if err != nil {
+			klog.Infof("Error inserting provisional pod %s", err)
+		}
 		return err
 	}
+	return err
+}
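For orientation, a sketch (not part of the patch) of how a consumer on the Kubernetes side might drain the channels that setupEvents collects. It assumes the QueueEvent fields used above and that river.Event exposes a Kind field; the handling is illustrative only.

    // Illustrative sketch: drain every subscription channel until the context
    // is cancelled, then release the River subscriptions.
    func watchEvents(ctx context.Context, q *Queue) {
    	for _, ev := range q.EventChannels {
    		go func(ev *QueueEvent) {
    			defer ev.Function() // unsubscribe when we stop listening
    			for {
    				select {
    				case event := <-ev.Channel:
    					if event == nil {
    						return
    					}
    					klog.Infof("[Fluxnetes] received %s event", event.Kind)
    				case <-ctx.Done():
    					return
    				}
    			}
    		}(ev)
    	}
    }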
 
-	// Create and submit the new job! This needs to serialize as json, hence converting
-	// the podspec. It's not clear to me if we need to keep the original pod object
-	job := JobArgs{GroupName: group.Name, Podspec: string(asJson), GroupSize: group.Size}
+// Schedule moves jobs from provisional to the work queue.
+// This is based on a queue strategy. The default is FCFS with backfill,
+// which mimics what Kubernetes does. Note that jobs can be sorted
+// based on the scheduled-at time AND priority.
+func (q *Queue) Schedule(ctx context.Context) error {
 
-	// Start a transaction to insert a job - without this it won't run!
-	tx, err := q.Pool.Begin(ctx)
+	// The queue strategy "Schedule" moves provisional pods to the worker queue (TBA)
+	// This doesn't add priority or insert opts yet, but easily could.
+	// We get back river.JobArgs, the interface of args for the worker type
+	err, jobs := q.Strategy.Schedule(ctx, q.Pool)
 	if err != nil {
 		return err
 	}
-	defer tx.Rollback(ctx)
-	row, err := q.riverClient.InsertTx(ctx, tx, job, nil)
-	err = tx.Commit(ctx)
+
+	// Shared insertOpts.
+	// Tags can eventually be specific to job attributes, queues, etc.
+	// These should eventually be moved into the strategy.
+	insertOpts := river.InsertOpts{
+		MaxAttempts: MaxAttempts,
+		Tags:        []string{q.Strategy.Name()},
+	}
+
+	// https://riverqueue.com/docs/batch-job-insertion
+	// Note: this is how to eventually add Priority (1-4, 4 is lowest)
+	// And we can customize other InsertOpts. Of interest is Pending:
+	// https://github.com/riverqueue/river/blob/master/insert_opts.go#L35-L40
+	// Note also that ScheduledAt can be used for a reservation!
+	batch := []river.InsertManyParams{}
+	for _, jobArgs := range jobs {
+		args := river.InsertManyParams{Args: jobArgs, InsertOpts: &insertOpts}
+		batch = append(batch, args)
+	}
+	count, err := q.riverClient.InsertMany(ctx, batch)
 	if err != nil {
 		return err
 	}
-	if row.Job != nil {
-		klog.Infof("Job %d was added to the queue", row.Job.ID)
-	}
+	klog.Infof("[Fluxnetes] Schedule inserted %d jobs\n", count)
 	return nil
 }
+
+// GetCreationTimestamp returns the creation time of a podGroup or a pod in seconds (time.MicroTime)
+// We either get this from the pod itself (if size 1) or from the database
+func (q *Queue) GetCreationTimestamp(pod *corev1.Pod, groupName string) (metav1.MicroTime, error) {
+
+	// First see if we've seen the group before, the creation times are shared across a group
+	ts := metav1.MicroTime{}
+
+	// This query will fail if there are no rows (the podGroup is not known)
+	err := q.Pool.QueryRow(context.Background(), queries.GetTimestampQuery, groupName).Scan(&ts)
+	if err == nil {
+		klog.Info("Creation timestamp is", ts)
+		return ts, err
+	}
+	klog.Error("This is the error", err)
+
+	// This is the first member of the group - use its CreationTimestamp
+	if !pod.CreationTimestamp.IsZero() {
+		return metav1.NewMicroTime(pod.CreationTimestamp.Time), nil
+	}
+	// If the pod for some reason doesn't have a timestamp, assume now
+	return metav1.NewMicroTime(time.Now()), nil
+}
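The comments in Schedule above point at River's per-job insert options. A sketch (not part of the patch) of what a strategy-specific variant could look like; the priority value and the 30-second reservation window are made-up numbers, and only documented river.InsertOpts fields are used.

    // Illustrative only: options a strategy might attach to jobs it wants to
    // run sooner, using a short ScheduledAt offset as a reservation.
    func reservationOpts(strategyName string) river.InsertOpts {
    	return river.InsertOpts{
    		MaxAttempts: MaxAttempts,
    		Priority:    1,                                // 1 is highest, 4 is lowest
    		ScheduledAt: time.Now().Add(30 * time.Second), // acts like a reservation
    		Tags:        []string{strategyName, "reservation"},
    	}
    }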
diff --git a/kubernetes/pkg/fluxnetes/strategy/fcfs.go b/kubernetes/pkg/fluxnetes/strategy/fcfs.go
new file mode 100644
index 0000000..c49d620
--- /dev/null
+++ b/kubernetes/pkg/fluxnetes/strategy/fcfs.go
@@ -0,0 +1,96 @@
+package strategy
+
+import (
+	"context"
+	"fmt"
+
+	klog "k8s.io/klog/v2"
+
+	"github.com/jackc/pgx/v5/pgxpool"
+	"github.com/riverqueue/river"
+	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries"
+	work "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy/workers"
+	// "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group"
+)
+
+// FCFS with Backfill
+// Schedule jobs that come in first, but allow smaller jobs to fill in
+type FCFSBackfill struct{}
+
+// Name returns shortened "first come first serve"
+func (FCFSBackfill) Name() string {
+	return "fcfs-backfill"
+}
+
+// AddWorkers registers the worker(s) for the queue strategy
+// TODO(vsoch) rename this to be more specific for the worker type
+func (FCFSBackfill) AddWorkers(workers *river.Workers) {
+	river.AddWorker(workers, &work.JobWorker{})
+}
+
+// queryReady checks if the number of pods we know of is >= required group size
+// we also sort by the timestamp added so jobs added earlier are submitted sooner
+func (s FCFSBackfill) queryReady(ctx context.Context, pool *pgxpool.Pool) (bool, error) {
+	// var count int32
+	// row, err := pool.Query(ctx, queries.CountPodsQuery, group.Name)
+	//
+	// if err != nil {
+	// 	return false, err
+	// }
+	//
+	// err = row.Scan(count)
+	//
+	// if err != nil {
+	// 	klog.Error("Cannot count number of members in group", err)
+	// 	return false, err
+	// }
+	//
+	// if count >= group.Size {
+	// 	return true, nil
+	// }
+	//
+	return false, nil
+}
+
+func (s FCFSBackfill) queryAll(ctx context.Context, pool *pgxpool.Pool) error {
+	rows, err := pool.Query(ctx, queries.SelectGroupsQuery)
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+
+	// For now we just log each provisional group that we find
+	for rows.Next() {
+		var groupName, podspec string
+		var groupSize int32
+		if err := rows.Scan(&groupName, &groupSize, &podspec); err != nil {
+			klog.Error("Cannot retrieve all members in group", err)
+			return err
+		}
+		fmt.Println(groupName)
+	}
+	return rows.Err()
+}
+
+// Schedule moves pod groups from provisional to workers based on a strategy.
+// We return a listing of river.JobArgs (JobArgs here) to be submitted as a batch.
+// In this case it is first come first serve - we just sort based on the timestamp
+// and add them to the worker queue. They run when they can, with smaller
+// jobs being allowed to fill in. Other strategies will need to handle AskFlux
+// and submitting the batch differently.
+func (s FCFSBackfill) Schedule(ctx context.Context, pool *pgxpool.Pool) (error, []river.JobArgs) {
+
+	// Is this group ready to be scheduled with the addition of this pod?
+	err := s.queryAll(ctx, pool)
+	if err != nil {
+		klog.Errorf("Issue with FCFS backfill querying for ready groups: %s", err)
+	}
+
+	// podspec needs to be serialized to pass to the job
+	// asJson, err := json.Marshal(pod)
+	// if err != nil {
+	// 	return err
+	// }
+
+	// Create and submit the new job! This needs to serialize as json, hence converting
+	// the podspec. It's not clear to me if we need to keep the original pod object
+	// job := JobArgs{GroupName: group.Name, Podspec: string(asJson), GroupSize: group.Size}
+	return nil, []river.JobArgs{}
+}
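For reference, one way the commented-out logic in queryReady could be completed. The group parameter is an assumption (the current signature does not take one yet), and it relies on enabling the commented-out group import shown above.

    // Sketch only (not part of the patch): count provisional pods for a group
    // and compare against the group size recorded with it.
    func (s FCFSBackfill) queryGroupReady(ctx context.Context, pool *pgxpool.Pool, pg *group.PodGroup) (bool, error) {
    	var count int32
    	err := pool.QueryRow(ctx, queries.CountPodsQuery, pg.Name).Scan(&count)
    	if err != nil {
    		klog.Error("Cannot count number of members in group", err)
    		return false, err
    	}
    	return count >= pg.Size, nil
    }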
diff --git a/kubernetes/pkg/fluxnetes/strategy/strategy.go b/kubernetes/pkg/fluxnetes/strategy/strategy.go
new file mode 100644
index 0000000..b625a52
--- /dev/null
+++ b/kubernetes/pkg/fluxnetes/strategy/strategy.go
@@ -0,0 +1,22 @@
+package strategy
+
+import (
+	"context"
+
+	"github.com/jackc/pgx/v5/pgxpool"
+	"github.com/riverqueue/river"
+)
+
+// Interface for a queue strategy
+// A queue strategy controls both the work function (what the worker does, and its
+// arguments) and how to orchestrate the last part of the schedule loop: Schedule,
+// which moves pods from provisional (waiting for groups to be ready) into worker queues.
+//
+// We currently just return a name, and provide a schedule function to move things around!
+type QueueStrategy interface {
+	Name() string
+
+	// provide the entire queue to interact with
+	Schedule(ctx context.Context, pool *pgxpool.Pool) (error, []river.JobArgs)
+	AddWorkers(*river.Workers)
+}
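To make the contract concrete, a hypothetical second implementation (not in the patch) that satisfies QueueStrategy but never promotes anything out of the provisional table. It assumes the same imports used by fcfs.go (river, pgxpool, and the workers package aliased as work); the name and behavior are made up.

    // Hypothetical example only: the smallest possible QueueStrategy.
    // It registers the same JobWorker, but its Schedule returns no job args,
    // so nothing ever leaves the provisional table.
    type NoopStrategy struct{}

    // Name identifies the strategy in logs and job tags
    func (NoopStrategy) Name() string {
    	return "noop"
    }

    // AddWorkers registers the worker type this strategy submits
    func (NoopStrategy) AddWorkers(workers *river.Workers) {
    	river.AddWorker(workers, &work.JobWorker{})
    }

    // Schedule never returns work, so no jobs are inserted
    func (NoopStrategy) Schedule(ctx context.Context, pool *pgxpool.Pool) (error, []river.JobArgs) {
    	return nil, []river.JobArgs{}
    }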
diff --git a/kubernetes/pkg/fluxnetes/worker.go b/kubernetes/pkg/fluxnetes/strategy/workers/workers.go
similarity index 75%
rename from kubernetes/pkg/fluxnetes/worker.go
rename to kubernetes/pkg/fluxnetes/strategy/workers/workers.go
index 60a903b..cb99529 100644
--- a/kubernetes/pkg/fluxnetes/worker.go
+++ b/kubernetes/pkg/fluxnetes/strategy/workers/workers.go
@@ -1,4 +1,4 @@
-package fluxnetes
+package workers
 
 import (
 	"context"
@@ -18,10 +18,19 @@ import (
 )
 
 type JobArgs struct {
+
+	// Submit Args
 	Jobspec   string `json:"jobspec"`
 	Podspec   string `json:"podspec"`
 	GroupName string `json:"groupName"`
 	GroupSize int32  `json:"groupSize"`
+
+	// Nodes to return to Kubernetes to bind
+	// We can eventually have a kubectl command
+	// to get a job too ;)
+	Nodes   []string `json:"nodes"`
+	FluxJob int64    `json:"jobid"`
+	PodId   string   `json:"podid"`
 }
 
 // The Kind MUST correspond to the Args and Worker
@@ -82,23 +91,18 @@ func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
 	}
 	klog.Info("Fluxion response %s", response)
 
-	// TODO we need a way to pass this information back to the scheduler as a notification
-	// TODO GetPodID should be renamed, because it will reflect the group
-	// podGroupManager.log.Info("[PodGroup AskFlux] Match response ID %s\n", response.GetPodID())
-
-	// Get the nodelist and inspect
-	// nodelist := response.GetNodelist()
-	// for _, node := range nodelist {
-	//	nodes = append(nodes, node.NodeID)
-	// }
-	// jobid := uint64(response.GetJobID())
-	// podGroupManager.log.Info("[PodGroup AskFlux] parsed node pods list %s for job id %d\n", nodes, jobid)
-
-	// TODO would be nice to actually be able to ask flux jobs -a to fluxnetes
-	// That way we can verify assignments, etc.
-	// podGroupManager.mutex.Lock()
-	// podGroupManager.groupToJobId[groupName] = jobid
-	// podGroupManager.mutex.Unlock()
-	// return nodes, nil
+	// We update the "Args" of the job to pass the node assignment back to the scheduler
+	job.Args.PodId = response.GetPodID()
+	job.Args.FluxJob = response.GetJobID()
+
+	// Get the nodelist and serialize into a list of strings for job args
+	nodelist := response.GetNodelist()
+	nodes := []string{}
+	for _, node := range nodelist {
+		nodes = append(nodes, node.NodeID)
+	}
+	job.Args.Nodes = nodes
+
+	klog.Infof("[Fluxnetes] nodes allocated %s for flux job id %d\n", nodes, job.Args.FluxJob)
 	return nil
 }
diff --git a/kubernetes/pkg/scheduler/schedule_one.go b/kubernetes/pkg/scheduler/schedule_one.go
index 84ac7e4..9093cc5 100644
--- a/kubernetes/pkg/scheduler/schedule_one.go
+++ b/kubernetes/pkg/scheduler/schedule_one.go
@@ -95,11 +95,20 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) {
 		return
 	}
 
-	// Dummy test to add job to queue, single pod
-	err = sched.Queue.Enqueue(ctx, pod)
+	// Add the pod to the provisional queue
+	fluxCtx, cancelEnqueue := context.WithCancel(ctx)
+	defer cancelEnqueue()
+	err = sched.Queue.Enqueue(fluxCtx, pod)
 	if err != nil {
-		logger.Error(err, "Issue with enqueue")
+		logger.Error(err, "Issue with fluxnetes Enqueue")
 	}
+
+	// Move from provisional to worker queue to schedule via events
+	err = sched.Queue.Schedule(fluxCtx)
+	if err != nil {
+		logger.Error(err, "Issue with fluxnetes Schedule")
+	}
+
 	logger.V(3).Info("Attempting to schedule pod", "pod", klog.KObj(pod))
 
 	// Synchronously attempt to find a fit for the pod.
diff --git a/src/build/postgres/create-tables.sql b/src/build/postgres/create-tables.sql
new file mode 100644
index 0000000..6e7545b
--- /dev/null
+++ b/src/build/postgres/create-tables.sql
@@ -0,0 +1,9 @@
+CREATE TABLE pods_provisional (
+    podspec TEXT,
+    namespace TEXT,
+    name TEXT,
+    created_at timestamptz not null default NOW(),
+    group_name TEXT,
+    group_size INTEGER
+);
+CREATE INDEX group_name_index ON pods_provisional (group_name);
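The pods_provisional table above is what the still-missing "final query" from the commit message will read when moving groups to the worker queue. A sketch of one possible shape for that step, written against this schema; the SQL, the helper, and its placement in the strategy package are all assumptions, not part of the patch.

    package strategy

    import (
    	"context"
    	"time"

    	"github.com/jackc/pgx/v5/pgxpool"
    )

    // One possible shape of the "final query": groups whose provisional pod
    // count has reached group_size, oldest first. Assumption only.
    const selectReadyGroupsQuery = `
    	select group_name, group_size, min(created_at) as created_at
    	from pods_provisional
    	group by group_name, group_size
    	having count(*) >= group_size
    	order by created_at asc`

    // readyGroups returns the names of groups that have reached their size,
    // in first-come-first-serve order.
    func readyGroups(ctx context.Context, pool *pgxpool.Pool) ([]string, error) {
    	rows, err := pool.Query(ctx, selectReadyGroupsQuery)
    	if err != nil {
    		return nil, err
    	}
    	defer rows.Close()

    	names := []string{}
    	for rows.Next() {
    		var name string
    		var size int32
    		var createdAt time.Time
    		if err := rows.Scan(&name, &size, &createdAt); err != nil {
    			return nil, err
    		}
    		names = append(names, name)
    	}
    	return names, rows.Err()
    }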