worker: retrieval of podspec and AskFlux
This changeset creates separate worker and podgroup fluxnetes
package files, which handle the worker definition and pod group
parsing functions, respectively. Up to this point we can now:
1. retrieve a new pod and see if it is in a group.
2. if no (size 1), add it to the worker queue immediately;
   if yes (size N), add it to the pods table to be inspected later.
3. retrieve the podspec in the work function.
4. parse it back into a podspec and ask Flux for the allocation
   (a sketch of the worker side follows this message).
I next need to do two things. First, figure out how to pass
the node assignment back to the scheduler - I am hoping
the job object "JobRow" can be modified to add metadata.
Then we need to write the function that runs at the end of
a schedule cycle and moves groups from the provisional
table to the worker queue.
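
Because the new worker file is not expanded in the rendered portion of this
diff, here is a minimal sketch of what the worker side described above could
look like, assuming the JobArgs fields referenced in SubmitJob below
(GroupName, GroupSize, Podspec) and a hypothetical AskFlux helper standing in
for the fluxion gRPC call; this is an illustration, not the committed code.

package fluxnetes

import (
	"context"
	"encoding/json"
	"time"

	corev1 "k8s.io/api/core/v1"
	klog "k8s.io/klog/v2"

	"github.com/riverqueue/river"
)

// JobArgs carries what SubmitJob serializes for the worker (json tags assumed).
type JobArgs struct {
	GroupName string `json:"groupName"`
	GroupSize int32  `json:"groupSize"`
	Podspec   string `json:"podspec"`
}

// The Kind MUST correspond to the <type>Args and <type>Worker
func (args JobArgs) Kind() string { return "job" }

type JobWorker struct {
	river.WorkerDefaults[JobArgs]
}

// AskFlux is a placeholder here for the fluxion-grpc request; the real version
// would build a pb.PodSpec with podspec.PreparePodspec and call the match service.
func AskFlux(ctx context.Context, pod *corev1.Pod, groupName string, size int32) ([]string, error) {
	return nil, nil
}

// Work retrieves the serialized podspec, parses it back into a Pod, and asks
// Flux for an allocation. If Flux cannot allocate now, the job is snoozed.
func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
	var pod corev1.Pod
	if err := json.Unmarshal([]byte(job.Args.Podspec), &pod); err != nil {
		return err
	}
	klog.Infof("[WORKER] asking Flux for group %s (size %d)", job.Args.GroupName, job.Args.GroupSize)

	nodes, err := AskFlux(ctx, &pod, job.Args.GroupName, job.Args.GroupSize)
	if err != nil {
		return river.JobSnooze(1 * time.Minute)
	}
	klog.Infof("[WORKER] allocation for group %s: %v", job.Args.GroupName, nodes)
	return nil
}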

Signed-off-by: vsoch <[email protected]>
vsoch committed Jul 28, 2024
1 parent 8047000 commit 633bf36
Showing 6 changed files with 334 additions and 47 deletions.
123 changes: 123 additions & 0 deletions kubernetes/pkg/fluxnetes/podgroup.go
@@ -0,0 +1,123 @@
package fluxnetes

import (
"encoding/json"
"strconv"
"time"

"github.com/jackc/pgx/v5/pgtype"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"context"
"fmt"

klog "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/labels"
)

// Queries
const (
getTimestampQuery = "select created_at from pods_provisional where group_name=$1 limit 1"
getPodQuery = "select group_name, name, namespace from pods_provisional where group_name=$1 and namespace=$2 and name=$3"
insertPodQuery = "insert into pods_provisional (podspec, namespace, name, created_at, group_name, group_size) values ($1, $2, $3, $4, $5, $6)"
)
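
For reference, the queries above assume a provisional pods table roughly like
the following; this schema is a sketch inferred from insertPodQuery, not a
migration included in this diff.

// Hypothetical schema for the pods_provisional table targeted by the queries above.
const createProvisionalTableSketch = `
create table if not exists pods_provisional (
	podspec    text not null,
	namespace  text not null,
	name       text not null,
	created_at timestamptz not null,
	group_name text not null,
	group_size integer not null
);`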

// A PodGroup holds the name and size of a pod group
// It is just a temporary holding structure
type PodGroup struct {
Name string
Size int32
Timestamp metav1.MicroTime
}

// GetPodGroup returns the PodGroup that a Pod belongs to in cache.
func (q *Queue) GetPodGroup(pod *corev1.Pod) (*PodGroup, error) {
groupName := labels.GetPodGroupLabel(pod)

// If we don't have a group, create one under fluxnetes namespace
if groupName == "" {
groupName = fmt.Sprintf("fluxnetes-group-%s", pod.Name)
}

// Do we have a group size? This will be parsed as a string, likely
groupSize, ok := pod.Labels[labels.PodGroupSizeLabel]
if !ok {
groupSize = "1"
pod.Labels[labels.PodGroupSizeLabel] = groupSize
}

// Log the namespace/name, group name, and size
klog.Infof("Pod Group Name for %s is %s (%s)", pod.Name, groupName, groupSize)

// Get the creation timestamp for the group
ts, err := q.GetCreationTimestamp(pod, groupName)
if err != nil {
return nil, err
}

// We need the group size to be an integer now!
size, err := strconv.ParseInt(groupSize, 10, 32)
if err != nil {
return nil, err
}

// This can be improved - only get once for the group
// and add to some single table
return &PodGroup{Size: int32(size), Name: groupName, Timestamp: ts}, nil
}

// EnqueuePod checks to see if a specific pod exists for a group. If yes, do nothing.
// If not, add it. If yes, update the podspec.
func (q *Queue) EnqueuePod(ctx context.Context, pod *corev1.Pod, group *PodGroup) error {

// This query will fail if there are no rows (the podGroup is not known)
var groupName, name, namespace string
row := q.Pool.QueryRow(context.Background(), getPodQuery, group.Name, pod.Namespace, pod.Name)
err := row.Scan(&groupName, &name, &namespace)
if err == nil {

// TODO vsoch: if we can update a podspec, need to retrieve, check, update here
klog.Infof("Found existing pod %s/%s in group %s", namespace, name, groupName)
return nil
}
klog.Infof("Did not find pod %s/%s in group %s yet", pod.Namespace, pod.Name, group.Name)

// Prepare timestamp and podspec for insertion...
ts := &pgtype.Timestamptz{Time: group.Timestamp.Time, Valid: true}
podspec, err := json.Marshal(pod)
if err != nil {
return err
}
_, err = q.Pool.Exec(ctx, insertPodQuery, string(podspec), pod.Namespace, pod.Name, ts, group.Name, group.Size)

// Log the error if the insert failed; we return err either way
if err != nil {
klog.Errorf("Error inserting pod into provisional table: %s", err)
}
return err
}

// GetCreationTimestamp returns the creation time of a podGroup or a pod in seconds (time.MicroTime)
// We either get this from the pod itself (if size 1) or from the database
func (q *Queue) GetCreationTimestamp(pod *corev1.Pod, groupName string) (metav1.MicroTime, error) {

// First see if we've seen the group before; the creation time is shared across a group.
// Scan into a plain time.Time, since pgx cannot scan a timestamptz directly into metav1.MicroTime.
var created time.Time

// This query returns no rows if the podGroup is not yet known
err := q.Pool.QueryRow(context.Background(), getTimestampQuery, groupName).Scan(&created)
if err == nil {
klog.Infof("Creation timestamp for group %s is %s", groupName, created)
return metav1.NewMicroTime(created), nil
}
klog.Infof("Group %s is not yet in the provisional table: %s", groupName, err)

// This is the first member of the group - use its CreationTimestamp
if !pod.CreationTimestamp.IsZero() {
return metav1.NewMicroTime(pod.CreationTimestamp.Time), nil
}
// If the pod for some reason doesn't have a timestamp, assume now
return metav1.NewMicroTime(time.Now()), nil
}
57 changes: 56 additions & 1 deletion kubernetes/pkg/fluxnetes/podspec/podspec.go
@@ -24,8 +24,63 @@ import (
pb "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/fluxion-grpc"
)

// TODO this package should be renamed something related to a PodSpec Info
// PreparePodspec prepares a podspec based on one pod (that will be multiplied)
// This should eventually handle unique pods
func PreparePodspec(pod *v1.Pod, groupName string) *pb.PodSpec {
podSpec := new(pb.PodSpec)
podSpec.Id = groupName

// There was an if check here to see if we had labels,
// I don't think there is risk to adding an empty list but we can add
// the check back if there is
podSpec.Labels = getPodJobspecLabels(pod)

// the jobname should be the group name
podSpec.Container = groupName

// Create accumulated requests for cpu and limits
// CPU and memory are summed across containers
// GPU cannot be shared across containers, but we
// take a count for the pod for the PodSpec
var cpus int32 = 0
var memory int64 = 0
var gpus int64 = 0

// I think we are OK to sum this too
// https://github.com/kubernetes/kubectl/blob/master/pkg/describe/describe.go#L4211-L4213
var storage int64 = 0

for _, container := range pod.Spec.Containers {

// Add on Cpu, Memory, GPU from container requests
// This is a limited set of resources owned by the pod
specRequests := container.Resources.Requests
cpus += int32(specRequests.Cpu().Value())
memory += specRequests.Memory().Value()
storage += specRequests.StorageEphemeral().Value()

specLimits := container.Resources.Limits
gpuSpec := specLimits["nvidia.com/gpu"]
gpus += gpuSpec.Value()

}

// If we have zero cpus, assume 1
// We could use math.Max here, but it is expecting float64
if cpus == 0 {
cpus = 1
}
podSpec.Cpu = cpus
podSpec.Gpu = gpus
podSpec.Memory = memory
podSpec.Storage = storage

// I removed specRequests.Cpu().MilliValue() but we can add back some derivative if desired
klog.Infof("[Jobspec] Pod spec: CPU %v, memory %v, GPU %v, storage %v", podSpec.Cpu, podSpec.Memory, podSpec.Gpu, podSpec.Storage)
return podSpec
}
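
As a quick illustration of the summing above (assuming the usual imports of
k8s.io/api/core/v1 as v1 and k8s.io/apimachinery/pkg/api/resource, the latter
not shown in this file), two containers requesting 1 and 2 CPUs and 1Gi and
2Gi of memory produce a PodSpec with Cpu 3 and Memory equal to 3Gi in bytes;
this is a sketch, not part of the commit.

// Hypothetical example of how PreparePodspec sums container requests.
func examplePodspecSum() *pb.PodSpec {
	pod := &v1.Pod{
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{Resources: v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    resource.MustParse("1"),
					v1.ResourceMemory: resource.MustParse("1Gi"),
				}}},
				{Resources: v1.ResourceRequirements{Requests: v1.ResourceList{
					v1.ResourceCPU:    resource.MustParse("2"),
					v1.ResourceMemory: resource.MustParse("2Gi"),
				}}},
			},
		},
	}
	// Cpu == 3, Memory == 3221225472 (3Gi), Gpu == 0, Storage == 0
	return PreparePodspec(pod, "example-group")
}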

// TODO delete this function to be replaced by one above
// getPodJobspecLabels looks across labels and returns those relevant
// to a jobspec
func getPodJobspecLabels(pod *v1.Pod) []string {
91 changes: 46 additions & 45 deletions kubernetes/pkg/fluxnetes/queue.go
@@ -2,26 +2,28 @@ package fluxnetes

import (
"context"
"fmt"
"encoding/json"
"log/slog"
"os"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
corev1 "k8s.io/api/core/v1"

// v1 "k8s.io/api/core/v1"
klog "k8s.io/klog/v2"

"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"

"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/logger"
// "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/podspec"
)

const (
queueMaxWorkers = 10
)

// Queue holds handles to queue database and event handles
// The database Pool also allows interacting with the pods table (database.go)
type Queue struct {
Pool *pgxpool.Pool
riverClient *river.Client[pgx.Tx]
@@ -36,35 +38,6 @@ type QueueEvent struct {
Function ChannelFunction
}

type JobArgs struct {
ShouldSnooze bool `json:"shouldSnooze"`
// Jobspec *pb.PodSpec
GroupName string `json:"groupName"`
}

// The Kind MUST correspond to the <type>Args and <type>Worker
func (args JobArgs) Kind() string { return "job" }

type JobWorker struct {
river.WorkerDefaults[JobArgs]
}

// Work performs the AskFlux action. Cases include:
// Allocated: the job was successful and does not need to be re-queued. We return nil (completed)
// NotAllocated: the job cannot be allocated and needs to be retried (Snoozed)
// Not possible for some reason, likely needs a cancel
// See https://riverqueue.com/docs/snoozing-jobs
func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error {
l := logger.NewDebugLogger(logger.LevelDebug, "/tmp/workers.log")
l.Info("I am running")
klog.Infof("[WORKER]", "JobStatus", "Running")
fmt.Println("I am running")
if job.Args.ShouldSnooze {
return river.JobSnooze(1 * time.Minute)
}
return nil
}

// NewQueue starts a new queue with a river client
func NewQueue(ctx context.Context) (*Queue, error) {
dbPool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
@@ -129,22 +102,50 @@ func (q *Queue) setupEvents() {
q.EventChannels = append(q.EventChannels, channel)
}

// MoveQueue moves pods from provisional into the jobs queue
func (q *Queue) MoveQueue() {

// If there is a group, wait for minMember.
// When we dispatch the group, will need to clean up this table

}
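
The commit message notes that the next step is the function that runs at the
end of a schedule cycle and moves ready groups from the provisional table to
the worker queue. A rough sketch of what that could look like follows; the
group-selection query and the helper name are hypothetical.

// Hypothetical query selecting groups whose member count has reached group_size.
const readyGroupsQuery = `
select group_name, group_size from pods_provisional
group by group_name, group_size
having count(*) >= group_size`

// MoveQueueSketch finds groups that have reached their minimum size, logs them,
// and (eventually) would submit one job per group and clean up the table.
func (q *Queue) MoveQueueSketch(ctx context.Context) error {
	rows, err := q.Pool.Query(ctx, readyGroupsQuery)
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var name string
		var size int32
		if err := rows.Scan(&name, &size); err != nil {
			return err
		}
		klog.Infof("Group %s (size %d) is ready to move to the worker queue", name, size)
		// TODO: load the stored podspecs for the group, call SubmitJob, and
		// delete the group's rows from pods_provisional.
	}
	return rows.Err()
}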

// Enqueue a new job to the queue
// When we add a job, we have generated the jobspec and the group is ready.
func (q *Queue) Enqueue(ctx context.Context, pod *corev1.Pod) error {
// TODO create database that has pod names, etc.
// We will add immediately to queue if no group
// If there is a group, wait for minMember.
// When we dispatch the group, will need to clean up this table
groupName := "test-group"

// TODO: this needs to be passed somehow to worker as json
// serializable for reference later, for both pod/jobspec
// Get the jobspec for the pod
//jobspec := podspec.PreparePodJobSpec(pod, groupName)
// Get the pod group - this does not alter the database, but just looks for it
podGroup, err := q.GetPodGroup(pod)
if err != nil {
return err
}
klog.Infof("Derived podgroup %s (%d) created at %s", podGroup.Name, podGroup.Size, podGroup.Timestamp)

// Case 1: if the size is 1, submit to the queue right away - we don't need to keep a record of it.
if podGroup.Size == 1 {
return q.SubmitJob(ctx, pod, podGroup)
}

// Case 2: all other cases, add to the pod table (if it does not already exist).
// Pods added to the table are checked at the end of the schedule cycle (and
// submitted if the group is ready, meaning it has reached its minimum size).
return q.EnqueuePod(ctx, pod, podGroup)
}

// SubmitJob submits the podgroup (and podspec) to the queue
// It will later be given to AskFlux
// TODO this should not rely on having a single pod.
func (q *Queue) SubmitJob(ctx context.Context, pod *corev1.Pod, group *PodGroup) error {

// podspec needs to be serialized to pass to the job
asJson, err := json.Marshal(pod)
if err != nil {
return err
}

// Create an enqueue the new job!
job := JobArgs{ShouldSnooze: true, GroupName: groupName}
// Create and submit the new job! This needs to serialize as json, hence converting
// the podspec. It's not clear to me if we need to keep the original pod object
job := JobArgs{GroupName: group.Name, Podspec: string(asJson), GroupSize: group.Size}

// Start a transaction to insert a job - without this it won't run!
tx, err := q.Pool.Begin(ctx)
@@ -161,5 +162,5 @@ func (q *Queue) Enqueue(ctx context.Context, pod *corev1.Pod) error {
if row.Job != nil {
klog.Infof("Job %d was added to the queue", row.Job.ID)
}
return err
return nil
}
