Skip to content

Commit

Permalink
queue-manager: reorganize into strategies
Browse files Browse the repository at this point in the history
I realized that we need flexibility in defining queue strategies,
not just in how the worker is designed, but also how the queue
strategy handles the schedule function. This is an overhaul (not
quite done yet) that does that. I stil need to plug the final
query back in to move provisional to the worker queue. Also
note that it looks like we have priority, pending, and other
insert params to play with.

Signed-off-by: vsoch <[email protected]>
  • Loading branch information
vsoch committed Jul 28, 2024
1 parent 633bf36 commit 98e2c08
Show file tree
Hide file tree
Showing 9 changed files with 341 additions and 207 deletions.
43 changes: 43 additions & 0 deletions kubernetes/pkg/fluxnetes/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,56 @@ import (
"fmt"
"time"

"strconv"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klog "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/labels"
"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
)

// A PodGroup holds the name and size of a pod group
// It is just a temporary holding structure
type PodGroup struct {
Name string
Size int32
Timestamp metav1.MicroTime
}

// getPodGroupName returns the pod group name
// 1. We first look to see if the pod is explicitly labeled
// 2. If not, we fall back to a default based on the pod name and namespace
func GetPodGroupName(pod *corev1.Pod) string {
groupName := labels.GetPodGroupLabel(pod)

// If we don't have a group, create one under fluxnetes namespace
if groupName == "" {
groupName = fmt.Sprintf("fluxnetes-group-%s-%s", pod.Namespace, pod.Name)
}
return groupName
}

// getPodGroupSize gets the group size, first from label then default of 1
func GetPodGroupSize(pod *corev1.Pod) (int32, error) {

// Do we have a group size? This will be parsed as a string, likely
groupSize, ok := pod.Labels[labels.PodGroupSizeLabel]
if !ok {
groupSize = "1"
pod.Labels[labels.PodGroupSizeLabel] = groupSize
}

// We need the group size to be an integer now!
size, err := strconv.ParseInt(groupSize, 10, 32)
if err != nil {
return 0, err
}
return int32(size), nil
}

// TODO(vsoch) delete everything below here when PodGroup is no longer used
// DefaultWaitTime is 60s if ScheduleTimeoutSeconds is not specified.
const DefaultWaitTime = 60 * time.Second

Expand Down
123 changes: 0 additions & 123 deletions kubernetes/pkg/fluxnetes/podgroup.go

This file was deleted.

10 changes: 10 additions & 0 deletions kubernetes/pkg/fluxnetes/queries/queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package queries

// Queries used by the main queue (and shared across strategies sometimes)
const (
GetTimestampQuery = "select created_at from pods_provisional where group_name=$1 limit 1"
SelectGroupsQuery = "select group_name, group_size, podspec from pods_provisional"
GetPodQuery = "select * from pods_provisional where group_name=$1 and namespace=$2 and name=$3"
InsertPodQuery = "insert into pods_provisional (podspec, namespace, name, created_at, group_name, group_size) values ($1, $2, $3, $4, $5, $6)"
CountPodsQuery = "select count(*) from pods_provisional where group_name=$1"
)
Loading

0 comments on commit 98e2c08

Please sign in to comment.