Skip to content

Commit

Permalink
Merge pull request #2 from converged-computing/reorganize-queue-manager
Browse files Browse the repository at this point in the history
queue-manager: reorganize into strategies
  • Loading branch information
vsoch committed Jul 29, 2024
2 parents 633bf36 + 73d587d commit 7add491
Show file tree
Hide file tree
Showing 13 changed files with 551 additions and 281 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,38 @@ scheduler-plugins-controller-8676df7769-ss9kz 1/1 Running 0 10
And that's it! This is fully working, but this only means that we are going to next work on the new design.
See [docs](docs) for notes on that.

## Development

### Debugging Postgres

It is often helpful to shell into the postgres container to see the database directly:

```bash
kubectl exec -it postgres-597db46977-9lb25 bash
psql -U postgres

# Connect to database
\c

# list databases
\l

# show tables
\dt

# test a query
SELECT group_name, group_size from pods_provisional;
```

### TODO

- [ ] I'd like a more efficient query (or strategy) to move pods from provisional into the worker queue. Right now I have three queries and it's too many.
- [ ] Discussion about how to respond to a "failed" allocation request (meaning we just can't give nodes now, likely to happen a lot). Maybe we need to do a reservation instead?
- [ ] I think maybe we should do a match allocate else reserve instead (see issue [here](https://github.com/converged-computing/fluxnetes/issues/4))
- [ ] Restarting with postgres shouldn't have crashloopbackoff when the database isn't ready yet
- [ ] In-tree registry plugins (that are related to resources) should be run first to inform fluxion what nodes not to bind, where there are volumes, etc.
- [ ] The queue should inherit (and return) the start time (when the pod was first seen) "start" in scheduler.go
- [ ] The provisional -> scheduled should do a sort for the timestamp (I mostly just forgot this)!

## License

Expand Down
11 changes: 11 additions & 0 deletions kubernetes/pkg/fluxnetes/defaults/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package defaults

import (
"math"
)

const (
// https://github.com/riverqueue/river/discussions/475
// The database column is an int16
MaxAttempts = math.MaxInt16
)
13 changes: 13 additions & 0 deletions kubernetes/pkg/fluxnetes/fluxnetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package fluxnetes
import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -46,6 +47,18 @@ var (
GroupName = "scheduling.x-k8s.io"
)

// JobResult serializes a result from Fluxnetes in the scheduler back to metadata
type JobResult struct {
JobID int32 `json:"jobid"`
Nodes string `json:"nodes"`
PodID string `json:"podid"`
PodSpec string `json:"podspec"`
}

func (j JobResult) GetNodes() []string {
return strings.Split(j.Nodes, ",")
}

// Fluxnetes schedules pods in a group using Fluxion as a backend
// We inherit cosched.Coscheduling to use some of the primary functions
type Fluxnetes struct {
Expand Down
43 changes: 43 additions & 0 deletions kubernetes/pkg/fluxnetes/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,56 @@ import (
"fmt"
"time"

"strconv"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klog "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/labels"
"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
)

// A PodGroup holds the name and size of a pod group
// It is just a temporary holding structure
type PodGroup struct {
Name string
Size int32
Timestamp metav1.MicroTime
}

// getPodGroupName returns the pod group name
// 1. We first look to see if the pod is explicitly labeled
// 2. If not, we fall back to a default based on the pod name and namespace
func GetPodGroupName(pod *corev1.Pod) string {
groupName := labels.GetPodGroupLabel(pod)

// If we don't have a group, create one under fluxnetes namespace
if groupName == "" {
groupName = fmt.Sprintf("fluxnetes-group-%s-%s", pod.Namespace, pod.Name)
}
return groupName
}

// getPodGroupSize gets the group size, first from label then default of 1
func GetPodGroupSize(pod *corev1.Pod) (int32, error) {

// Do we have a group size? This will be parsed as a string, likely
groupSize, ok := pod.Labels[labels.PodGroupSizeLabel]
if !ok {
groupSize = "1"
pod.Labels[labels.PodGroupSizeLabel] = groupSize
}

// We need the group size to be an integer now!
size, err := strconv.ParseInt(groupSize, 10, 32)
if err != nil {
return 0, err
}
return int32(size), nil
}

// TODO(vsoch) delete everything below here when PodGroup is no longer used
// DefaultWaitTime is 60s if ScheduleTimeoutSeconds is not specified.
const DefaultWaitTime = 60 * time.Second

Expand Down
123 changes: 0 additions & 123 deletions kubernetes/pkg/fluxnetes/podgroup.go

This file was deleted.

19 changes: 19 additions & 0 deletions kubernetes/pkg/fluxnetes/queries/queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package queries

// Queries used by the main queue (and shared across strategies sometimes)
const (
GetTimestampQuery = "select created_at from pods_provisional where group_name=$1 limit 1"
GetPodQuery = "select * from pods_provisional where group_name=$1 and namespace=$2 and name=$3"
InsertPodQuery = "insert into pods_provisional (podspec, namespace, name, created_at, group_name, group_size) values ($1, $2, $3, $4, $5, $6)"
CountPodsQuery = "select count(*) from pods_provisional where group_name=$1"
UpdateNodesQuery = "update river_job set args = jsonb_set(args, '{nodes}', to_jsonb($1::text)) where id=$2;"

// This could use improvement from someone good at SQL. We need to:
// 1. Select groups for which the size >= the number of pods we've seen
// 2. Then get the group_name, group_size, and podspec for each (this goes to scheduler)
// 3. Delete all from the table
// Ensure we are sorting by the timestamp when they were added (should be DESC I think)
SelectGroupsAtSizeQuery = "select group_name from pods_provisional group by group_name, group_size having group_size >= count(*);"
SelectGroupsQuery = "select group_name, group_size, podspec from pods_provisional where group_name in ('%s');"
DeleteGroupsQuery = "delete from pods_provisional where group_name in ('%s');"
)
Loading

0 comments on commit 7add491

Please sign in to comment.