diff --git a/Dockerfile b/Dockerfile index 7836725..c6a8f41 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,10 +10,10 @@ ENV ARCH=${ARCH} # but since we are adding custom kube-scheduler, and we don't need the controller # I moved the build logic up here instead of using hack/build-images.sh -RUN apt-get update && apt-get install -y wget git vim build-essential iputils-ping +RUN apt-get update && apt-get install -y wget git vim build-essential iputils-ping postgresql-client curl # Install Go -ENV GO_VERSION=1.22.2 +ENV GO_VERSION=1.22.5 RUN wget https://go.dev/dl/go${GO_VERSION}.linux-amd64.tar.gz && tar -xvf go${GO_VERSION}.linux-amd64.tar.gz && \ mv go /usr/local && rm go${GO_VERSION}.linux-amd64.tar.gz @@ -28,6 +28,8 @@ COPY ${K8S_UPSTREAM} . RUN go get github.com/patrickmn/go-cache && \ go get sigs.k8s.io/controller-runtime/pkg/client && \ go get sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1 && \ + go get github.com/riverqueue/river && \ + go get github.com/riverqueue/river/riverdriver/riverpgxv5 && \ go work vendor && \ make WHAT=cmd/kube-scheduler && \ cp /go/src/k8s.io/kubernetes/_output/local/go/bin/kube-scheduler /bin/kube-scheduler diff --git a/Makefile b/Makefile index c0f985c..e12df4a 100644 --- a/Makefile +++ b/Makefile @@ -15,11 +15,12 @@ ARCH ?= amd64 # These are passed to build the sidecar REGISTRY ?= ghcr.io/flux-framework SIDECAR_IMAGE ?= fluxnetes-sidecar:latest +POSTGRES_IMAGE ?= fluxnetes-postgres:latest SCHEDULER_IMAGE ?= fluxnetes -.PHONY: all build build-sidecar clone update push push-sidecar push-fluxnetes +.PHONY: all build build-sidecar clone update push push-sidecar push-fluxnetes build-postgres -all: prepare build-sidecar build +all: prepare build-sidecar build build-postgres upstreams: mkdir -p $(UPSTREAMS) @@ -48,4 +49,7 @@ push-fluxnetes: build-sidecar: make -C ./src LOCAL_REGISTRY=${REGISTRY} LOCAL_IMAGE=${SIDECAR_IMAGE} +build-postgres: + docker build -f src/build/postgres/Dockerfile -t ${REGISTRY}/${POSTGRES_IMAGE} . + push: push-sidecar push-fluxnetes diff --git a/README.md b/README.md index 171a25e..61beb58 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ![docs/images/fluxnetes.png](docs/images/fluxnetes.png) -Fluxnetes enables is a combination of Kubernetes and [Fluence](https://github.com/flux-framework/flux-k8s), both of which use the HPC-grade pod scheduling [Fluxion scheduler](https://github.com/flux-framework/flux-sched) to schedule pod groups to nodes. +Fluxnetes enables is a combination of Kubernetes and [Fluence](https://github.com/flux-framework/flux-k8s), both of which use the HPC-grade pod scheduling [Fluxion scheduler](https://github.com/flux-framework/flux-sched) to schedule pod groups to nodes. For our queue, we use [river](https://riverqueue.com/docs) backed by a Postgres database. The database is deployed alongside fluence and could be customized to use an operator instead. **Important** This is an experiment, and is under development. I will change this design a million times - it's how I tend to learn and work. I'll share updates when there is something to share. It deploys but does not work yet! @@ -37,14 +37,15 @@ Then you can deploy as follows: ```bash ./hack/quick-build-kind.sh ``` -You'll then have the fluxnetes service running, along with the scheduler plugins controller, which we +You'll then have the fluxnetes service running, a postgres database (for the job queue), along with the scheduler plugins controller, which we currently have to use PodGroup. 
```bash $ kubectl get pods NAME READY STATUS RESTARTS AGE -fluxnetes-66575b59d8-ghx8h 2/2 Running 0 8m53s -scheduler-plugins-controller-8676df7769-ss9kz 1/1 Running 0 8m53s +fluxnetes-6954cdcf64-gv7s7 2/2 Running 0 87s +postgres-c8d55999c-t6dtt 1/1 Running 0 87s +scheduler-plugins-controller-8676df7769-jvtwp 1/1 Running 0 87s ``` You can then create a job: @@ -75,6 +76,41 @@ scheduler-plugins-controller-8676df7769-ss9kz 1/1 Running 0 10 And that's it! This is fully working, but this only means that we are going to next work on the new design. See [docs](docs) for notes on that. +## Development + +### Debugging Postgres + +It is often helpful to shell into the postgres container to see the database directly: + +```bash +kubectl exec -it postgres-597db46977-9lb25 bash +psql -U postgres + +# Connect to database +\c + +# list databases +\l + +# show tables +\dt + +# test a query +SELECT group_name, group_size from pods_provisional; +``` + +### TODO + +- [ ] I'd like a more efficient query (or strategy) to move pods from provisional into the worker queue. Right now I have three queries and it's too many. +- [ ] Discussion about how to respond to a "failed" allocation request (meaning we just can't give nodes now, likely to happen a lot). Maybe we need to do a reservation instead? +- [ ] I think maybe we should do a match allocate else reserve instead (see issue [here](https://github.com/converged-computing/fluxnetes/issues/4)) +- [ ] Restarting with postgres shouldn't have crashloopbackoff when the database isn't ready yet +- [ ] In-tree registry plugins (that are related to resources) should be run first to inform fluxion what nodes not to bind, where there are volumes, etc. +- [ ] The queue should inherit (and return) the start time (when the pod was first seen) "start" in scheduler.go +- [ ] The provisional -> scheduled should do a sort for the timestamp (I mostly just forgot this)! +- [ ] when in basic working state, add back build and test workflows +- [ ] There should be a label (or existing value in the pod) to indicate an expected completion time (this is for Fluxion). We can have a worker task that explicitly cleans up the pods when the job should be completed. +- [x] remove fluence previous code ## License diff --git a/chart/templates/configmap.yaml b/chart/templates/configmap.yaml index 9f3d8bf..59fc5c5 100644 --- a/chart/templates/configmap.yaml +++ b/chart/templates/configmap.yaml @@ -1,4 +1,3 @@ -{{- if .Values.plugins.enabled }} apiVersion: v1 kind: ConfigMap metadata: @@ -14,6 +13,11 @@ data: # Compose all plugins in one profile - schedulerName: {{ .Values.scheduler.name }} plugins: + queueSort: + enabled: + {{- range $.Values.plugins.enabled }} + - name: {{ title . }} + {{- end }} preBind: disabled: - name: {{ .Values.scheduler.name }} @@ -48,17 +52,10 @@ data: - name: {{ title . }} {{- end }} multiPoint: - enabled: - {{- range $.Values.plugins.enabled }} - - name: {{ title . }} - {{- end }} disabled: {{- range $.Values.plugins.disabled }} - name: {{ title . }} {{- end }} {{- if $.Values.pluginConfig }} pluginConfig: {{ toYaml $.Values.pluginConfig | nindent 6 }} - {{- end }} - - {{- /* TODO: wire CRD installation with enabled plugins. 
*/}} -{{- end }} + {{- end }} \ No newline at end of file diff --git a/chart/templates/deployment.yaml b/chart/templates/deployment.yaml index 85898a8..7d8d9f8 100644 --- a/chart/templates/deployment.yaml +++ b/chart/templates/deployment.yaml @@ -66,6 +66,17 @@ spec: - command: - /bin/kube-scheduler - --config=/etc/kubernetes/scheduler-config.yaml + env: + - name: DATABASE_URL + value: postgres://postgres:postgres@postgres:5432/postgres + - name: PGHOST + value: postgres + - name: PGDATABASE + value: postgres + - name: PGPORT + value: "5432" + - name: PGPASSWORD + value: postgres image: {{ .Values.scheduler.image }} imagePullPolicy: {{ .Values.scheduler.pullPolicy }} livenessProbe: @@ -76,10 +87,13 @@ spec: initialDelaySeconds: 15 name: scheduler readinessProbe: - httpGet: - path: /healthz - port: 10259 - scheme: HTTPS + exec: + command: + - "sh" + - "-c" + - > + status=$(curl -ks https://localhost:10259/healthz); if [ "$status" -ne "ok" ]; then exit 1; fi + pg_isready -d postgres -h postgres -p 5432 -U postgres; resources: requests: cpu: '0.1' diff --git a/chart/templates/postgres.yaml b/chart/templates/postgres.yaml new file mode 100644 index 0000000..5ef6f11 --- /dev/null +++ b/chart/templates/postgres.yaml @@ -0,0 +1,63 @@ +# Note: This is intended for development/test deployments +apiVersion: apps/v1 +kind: Deployment +metadata: + name: postgres + namespace: {{ .Release.Namespace }} +spec: + replicas: 1 + selector: + matchLabels: + app: postgres + template: + metadata: + labels: + app: postgres + spec: + containers: + - name: postgres + image: {{ .Values.postgres.image }} + imagePullPolicy: {{ .Values.postgres.pullPolicy }} + ports: + - name: postgres-port + containerPort: 5432 + env: + - name: POSTGRES_USER + value: postgres + - name: POSTGRES_PASSWORD + value: postgres + - name: POSTGRES_DB + value: postgres + resources: + requests: + memory: "1Gi" + cpu: "500m" + limits: + memory: "1Gi" + cpu: "2" + readinessProbe: + exec: + command: + - "sh" + - "-c" + - > + pg_isready -q -d postgres -U postgres; + runuser -l postgres -c '/usr/local/bin/river migrate-up --database-url postgres://localhost:5432/postgres > /tmp/post-start.log' + initialDelaySeconds: 5 + periodSeconds: 10 + timeoutSeconds: 5 + successThreshold: 1 + failureThreshold: 3 +--- +apiVersion: v1 +kind: Service +metadata: + name: postgres + namespace: {{ .Release.Namespace }} +spec: + selector: + app: postgres + ports: + - protocol: TCP + port: 5432 + targetPort: 5432 \ No newline at end of file diff --git a/chart/values.yaml b/chart/values.yaml index 53ee9bf..5893400 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -11,6 +11,10 @@ scheduler: pullPolicy: Always leaderElect: false +database: + image: ghcr.io/flux-framework/fluxnetes-postgres:latest + pullPolicy: Always + # The sidecar is explicitly the fluxion service. I'd like to # simplify this to use fluxion as a service sidecar: @@ -38,6 +42,7 @@ controller: # as they need extra RBAC privileges on metrics.k8s.io. 
plugins: + # We keep this enabled for the custom queue sort enabled: ["Fluxnetes"] disabled: ["CapacityScheduling","NodeResourceTopologyMatch","NodeResourcesAllocatable","PrioritySort","Coscheduling"] # only in-tree plugins need to be defined here # Disable EVERYTHING except for fluxnetes diff --git a/docs/README.md b/docs/README.md index 866503d..3712812 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2,13 +2,39 @@ ## Design Notes +> July 29, 2024 + +Today we are merging in the "gut out and refactor" branch that does the following: + + - Add Queue Manager (and queues design, shown and described below) + - Remove what remains of Fluence (now Fluxnetes is just a shell to provide sort) + - Replace the default scheduler (schedulingCycle) with this approach (we still use bindingCycle) + +The new queue design is based on a producer consumer model in that there are workers (the number of our choosing) each associated with different queues. The workers themselves can do different things, and this depends on both the queue and Queuing strategy (I say this because two different strategies can share a common worker design). Before we hit a worker queue, we have a provisional queue step. This means that: + +1. Incoming pods are added to a provisional table with their name, group, timestamp, and expected size. +2. Pods are moved from the provisional table to the worker queue when they reach quorum (the minimum size) +3. At this point, they go into the hands of a Queue Manager, ordered by their group timestamp. + +For the first that I've added, which I'm calling FCFS with backfill, the worker task does a call to fluxion, specifically a `MatchAllocate`. I am planning to change this to a `MatchAllocateElseReserve` so I can "snooze" the job to trigger again in the future given that it cannot be scheduled then and there. When the work is allocated, the metadata for the job (specifically args for "Nodes") is updated to carry the nodes forward to events that are listening for them. A subscription event is sent back to the main scheduler, which receives the nodes, and then performs binding. The pods are received as a group, meaning the binding of the group happens at the same time (in a loop, still one by one, but guaranteed to be in that order I think) and the work is run. Some (high level) work that still needs to be done: + +- The provisional queue hardened up to be provided (and exposed) as explicit interfaces (it is part of the main fluxnetes queue module now) +- A pod label for an expected time (and a default time) could be used so every job has an expected end time (for Fluxion). A cancel queue would handle this. +- The in-tree plugin outputs (needs for volumes, and what nodes can provide) needs to be exposed to Fluxion. Either fluxion can be told: + - "These nodes aren't possible for this work" + - "These are the only nodes you can consider for this work" + - "Here is a resource requirement you know about in your graph" + +There are more features that still need to be worked on and added (see the README.md of this repository) but this is a good start! One thing I am tickled by is that this does not need to be Kubernetes specific. It happens to be implemented within it, but the only detail that is relevant to Kubernetes is having a pod derive the underlying unit of work. All of the logic could be moved outside of it, with some other unit of work. 
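+
+To make the worker pattern above concrete, here is a minimal, hypothetical sketch using river's documented `JobArgs` and `Worker` interfaces with the `riverpgxv5` driver. The `GroupArgs` type, the worker name, and the stubbed Fluxion call are illustrative only (not the exact implementation in this repository); `DATABASE_URL` is the connection string the chart already provides to the scheduler.
+
+```go
+package main
+
+import (
+	"context"
+	"log"
+	"os"
+	"time"
+
+	"github.com/jackc/pgx/v5/pgxpool"
+	"github.com/riverqueue/river"
+	"github.com/riverqueue/river/riverdriver/riverpgxv5"
+)
+
+// GroupArgs carries a pod group that reached quorum in the provisional table.
+type GroupArgs struct {
+	GroupName string `json:"group_name"`
+	GroupSize int32  `json:"group_size"`
+}
+
+func (GroupArgs) Kind() string { return "pod-group-allocate" }
+
+// allocateWorker is the FCFS-style consumer: one job per pod group.
+type allocateWorker struct {
+	river.WorkerDefaults[GroupArgs]
+}
+
+func (w *allocateWorker) Work(ctx context.Context, job *river.Job[GroupArgs]) error {
+	// The real worker asks Fluxion for nodes here (MatchAllocate, eventually
+	// MatchAllocateElseReserve) and publishes the node list back to the
+	// scheduler (e.g. via job metadata and a subscription event) for binding.
+	log.Printf("allocating group %s (size %d)", job.Args.GroupName, job.Args.GroupSize)
+	return nil
+}
+
+func main() {
+	ctx := context.Background()
+
+	// DATABASE_URL is set in the scheduler deployment, e.g.
+	// postgres://postgres:postgres@postgres:5432/postgres
+	pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer pool.Close()
+
+	workers := river.NewWorkers()
+	river.AddWorker(workers, &allocateWorker{})
+
+	client, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
+		Queues:  map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 10}},
+		Workers: workers,
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	if err := client.Start(ctx); err != nil {
+		log.Fatal(err)
+	}
+
+	// The queue manager would insert a job like this once a group reaches quorum.
+	if _, err := client.Insert(ctx, GroupArgs{GroupName: "job", GroupSize: 2}, nil); err != nil {
+		log.Fatal(err)
+	}
+
+	// Give the background workers a moment to pick up the job; a real
+	// service blocks on a signal handler instead.
+	time.Sleep(2 * time.Second)
+	_ = client.Stop(ctx)
+}
+```
+
+This sketch only shows the producer/consumer shape; the provisional table, the queue strategies, and the subscription back to the scheduler live in the main fluxnetes queue module described above.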
+ +![images/fluxnetes.png](images/fluxnetes.png) + > July 10th, 2024 Fluxnetes is functioning, on equal par with what fluence does to schedule and cancel pods. The difference is that I removed the webhook and controller to create PodGroup, and (for the time being) am relying on the user to create them. The reason is because I don't want to add the complexity of a new controller and webhook to Kubernetes. And instead of doing a custom CR (custom resource) for our PodGroup, I am using the one from coscheduling. THis allows install of the module without breaking smaller level dependencies. I'm not sure why that works, but it does! So the current state is that Fluxnetes is scheduling! My next step is to slowly add components for the new design, ensuring I don't break anything as I go, and going as far with that approach as I can until I need to swap it in. Then I'll likely need to be a bit more destructive and careful. - > This was a group update on July 8th, 2024 An update on design thinking for what I'm calling "fluxnetes" - a next step experiment for Kubernetes and Fluxion integration. Apologies in advance this is long - I do a lot of thinking and have desire to express it, because I don't think the design process (our thinking!) is always shared transparently. To start, there are two strategies to take: diff --git a/docs/images/fluxnetes-v1.png b/docs/images/fluxnetes-v1.png new file mode 100644 index 0000000..3c1e2d2 Binary files /dev/null and b/docs/images/fluxnetes-v1.png differ diff --git a/docs/images/fluxnetes.png b/docs/images/fluxnetes.png index 3c1e2d2..90e3f89 100644 Binary files a/docs/images/fluxnetes.png and b/docs/images/fluxnetes.png differ diff --git a/examples/job.yaml b/examples/job.yaml index 3bfd890..d809e17 100644 --- a/examples/job.yaml +++ b/examples/job.yaml @@ -1,12 +1,3 @@ -# PodGroup CRD spec -apiVersion: scheduling.x-k8s.io/v1alpha1 -kind: PodGroup -metadata: - name: job -spec: - scheduleTimeoutSeconds: 10 - minMember: 1 ---- apiVersion: batch/v1 kind: Job metadata: diff --git a/hack/quick-build-kind.sh b/hack/quick-build-kind.sh index 2bca901..d941353 100755 --- a/hack/quick-build-kind.sh +++ b/hack/quick-build-kind.sh @@ -16,12 +16,15 @@ make REGISTRY=${REGISTRY} SCHEDULER_IMAGE=fluxnetes SIDECAR_IMAGE=fluxnetes-side # We load into kind so we don't need to push/pull and use up internet data ;) kind load docker-image ${REGISTRY}/fluxnetes-sidecar:latest kind load docker-image ${REGISTRY}/fluxnetes:latest +kind load docker-image ${REGISTRY}/fluxnetes-postgres:latest # And then install using the charts. The pull policy ensures we use the loaded ones helm uninstall fluxnetes || true helm install \ + --set postgres.image=${REGISTRY}/fluxnetes-postgres:latest \ --set scheduler.image=${REGISTRY}/fluxnetes:latest \ + --set sidecar.image=${REGISTRY}/fluxnetes-sidecar:latest \ + --set postgres.pullPolicy=Never \ --set scheduler.pullPolicy=Never \ --set sidecar.pullPolicy=Never \ - --set sidecar.image=${REGISTRY}/fluxnetes-sidecar:latest \ fluxnetes chart/ diff --git a/kubernetes/pkg/fluxnetes/core/core.go b/kubernetes/pkg/fluxnetes/core/core.go deleted file mode 100644 index 95ce293..0000000 --- a/kubernetes/pkg/fluxnetes/core/core.go +++ /dev/null @@ -1,415 +0,0 @@ -/* -Copyright 2020 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package core - -import ( - "context" - "fmt" - "sync" - "time" - - gochache "github.com/patrickmn/go-cache" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" - informerv1 "k8s.io/client-go/informers/core/v1" - listerv1 "k8s.io/client-go/listers/core/v1" - klog "k8s.io/klog/v2" - - "k8s.io/kubernetes/pkg/scheduler/framework" - "sigs.k8s.io/controller-runtime/pkg/client" - - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/logger" - "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" - - // Moved from sig-scheduler-plugins/pkg/util - util "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/labels" -) - -type Status string - -const ( - // PodGroupNotSpecified denotes no PodGroup is specified in the Pod spec. - PodGroupNotSpecified Status = "PodGroup not specified" - // PodGroupNotFound denotes the specified PodGroup in the Pod spec is - // not found in API server. - PodGroupNotFound Status = "PodGroup not found" - Success Status = "Success" - Wait Status = "Wait" - - permitStateKey = "PermitFluxnetes" -) - -// TODO should eventually store group name here to reassociate on reload -type FluxStateData struct { - NodeName string -} - -type PermitState struct { - Activate bool -} - -func (s *PermitState) Clone() framework.StateData { - return &PermitState{Activate: s.Activate} -} - -func (s *FluxStateData) Clone() framework.StateData { - clone := &FluxStateData{ - NodeName: s.NodeName, - } - return clone -} - -// Manager defines the interfaces for PodGroup management. -type Manager interface { - PreFilter(context.Context, *corev1.Pod, *framework.CycleState) error - GetPodNode(*corev1.Pod) string - GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup) - GetCreationTimestamp(*corev1.Pod, time.Time) metav1.MicroTime - DeletePermittedPodGroup(string) - Permit(context.Context, *framework.CycleState, *corev1.Pod) Status - CalculateAssignedPods(string, string) int - ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) - BackoffPodGroup(string, time.Duration) -} - -// PodGroupManager defines the scheduling operation called -type PodGroupManager struct { - // client is a generic controller-runtime client to manipulate both core resources and PodGroups. - client client.Client - // snapshotSharedLister is pod shared list - snapshotSharedLister framework.SharedLister - // scheduleTimeout is the default timeout for podgroup scheduling. - // If podgroup's scheduleTimeoutSeconds is set, it will be used. - scheduleTimeout *time.Duration - // permittedpodGroup stores the podgroup name which has passed the pre resource check. - permittedpodGroup *gochache.Cache - // backedOffpodGroup stores the podgorup name which failed scheduling recently. - backedOffpodGroup *gochache.Cache - // podLister is pod lister - podLister listerv1.PodLister - - // This isn't great to save state, but we can improve upon it - // we should have a way to load jobids into this if fluxnetes is recreated - // If we can annotate them in fluxion and query for that, we can! 
- groupToJobId map[string]uint64 - podToNode map[string]string - - // Probably should just choose one... oh well - sync.RWMutex - mutex sync.Mutex - log *logger.DebugLogger -} - -// NewPodGroupManager creates a new operation object. -func NewPodGroupManager( - client client.Client, - snapshotSharedLister framework.SharedLister, - scheduleTimeout *time.Duration, - podInformer informerv1.PodInformer, - log *logger.DebugLogger, -) *PodGroupManager { - podGroupManager := &PodGroupManager{ - client: client, - snapshotSharedLister: snapshotSharedLister, - scheduleTimeout: scheduleTimeout, - podLister: podInformer.Lister(), - permittedpodGroup: gochache.New(3*time.Second, 3*time.Second), - backedOffpodGroup: gochache.New(10*time.Second, 10*time.Second), - groupToJobId: map[string]uint64{}, - podToNode: map[string]string{}, - log: log, - } - return podGroupManager -} - -func (podGroupManager *PodGroupManager) BackoffPodGroup(groupName string, backoff time.Duration) { - if backoff == time.Duration(0) { - return - } - podGroupManager.backedOffpodGroup.Add(groupName, nil, backoff) -} - -// ActivateSiblings stashes the pods belonging to the same PodGroup of the given pod -// in the given state, with a reserved key "kubernetes.io/pods-to-activate". -func (podGroupManager *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) { - groupName := util.GetPodGroupLabel(pod) - if groupName == "" { - return - } - - // Only proceed if it's explicitly requested to activate sibling pods. - if c, err := state.Read(permitStateKey); err != nil { - return - } else if s, ok := c.(*PermitState); !ok || !s.Activate { - return - } - - pods, err := podGroupManager.podLister.Pods(pod.Namespace).List( - labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: groupName}), - ) - if err != nil { - klog.ErrorS(err, "Failed to obtain pods belong to a PodGroup", "podGroup", groupName) - return - } - - for i := range pods { - if pods[i].UID == pod.UID { - pods = append(pods[:i], pods[i+1:]...) - break - } - } - - if len(pods) != 0 { - if c, err := state.Read(framework.PodsToActivateKey); err == nil { - if s, ok := c.(*framework.PodsToActivate); ok { - s.Lock() - for _, pod := range pods { - namespacedName := GetNamespacedName(pod) - s.Map[namespacedName] = pod - } - s.Unlock() - } - } - } -} - -// GetStatuses string (of all pods) to show for debugging purposes -func (podGroupManager *PodGroupManager) GetStatuses( - pods []*corev1.Pod, -) string { - statuses := "" - - // We need to distinguish 0 from the default and not finding anything - for _, pod := range pods { - statuses += " " + fmt.Sprintf("%s", pod.Status.Phase) - } - return statuses -} - -// GetPodNode is a quick lookup to see if we have a node -func (podGroupManager *PodGroupManager) GetPodNode(pod *corev1.Pod) string { - node, ok := podGroupManager.podToNode[pod.Name] - if !ok { - return "" - } - return node -} - -// Permit permits a pod to run, if the minMember match, it would send a signal to chan. -func (podGroupManager *PodGroupManager) Permit(ctx context.Context, state *framework.CycleState, pod *corev1.Pod) Status { - groupName, podGroup := podGroupManager.GetPodGroup(ctx, pod) - if groupName == "" { - return PodGroupNotSpecified - } - if podGroup == nil { - // A Pod with a podGroup name but without a PodGroup found is denied. - return PodGroupNotFound - } - - assigned := podGroupManager.CalculateAssignedPods(podGroup.Name, podGroup.Namespace) - // The number of pods that have been assigned nodes is calculated from the snapshot. 
- // The current pod in not included in the snapshot during the current scheduling cycle. - if int32(assigned)+1 >= podGroup.Spec.MinMember { - return Success - } - - if assigned == 0 { - // Given we've reached Permit(), it's mean all PreFilter checks (minMember & minResource) - // already pass through, so if assigned == 0, it could be due to: - // - minResource get satisfied - // - new pods added - // In either case, we should and only should use this 0-th pod to trigger activating - // its siblings. - // It'd be in-efficient if we trigger activating siblings unconditionally. - // See https://github.com/kubernetes-sigs/scheduler-plugins/issues/682 - state.Write(permitStateKey, &PermitState{Activate: true}) - } - - return Wait -} - -// PreFilter filters out a pod if -// 1. it belongs to a podgroup that was recently denied or -// 2. the total number of pods in the podgroup is less than the minimum number of pods -// that is required to be scheduled. -func (podGroupManager *PodGroupManager) PreFilter( - ctx context.Context, - pod *corev1.Pod, - state *framework.CycleState, -) error { - - podGroupManager.log.Info("[PodGroup PreFilter] pod %s", klog.KObj(pod)) - groupName, podGroup := podGroupManager.GetPodGroup(ctx, pod) - if podGroup == nil { - return nil - } - - _, exists := podGroupManager.backedOffpodGroup.Get(groupName) - if exists { - return fmt.Errorf("podGroup %v failed recently", groupName) - } - - pods, err := podGroupManager.podLister.Pods(pod.Namespace).List( - labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: util.GetPodGroupLabel(pod)}), - ) - if err != nil { - return fmt.Errorf("podLister list pods failed: %w", err) - } - - // Only allow scheduling the first in the group so the others come after - - // Get statuses to show for debugging - statuses := podGroupManager.GetStatuses(pods) - - // This shows us the number of pods we have in the set and their states - podGroupManager.log.Info("[PodGroup PreFilter] group: %s pods: %s MinMember: %d Size: %d", groupName, statuses, podGroup.Spec.MinMember, len(pods)) - if len(pods) < int(podGroup.Spec.MinMember) { - return fmt.Errorf("pre-filter pod %v cannot find enough sibling pods, "+ - "current pods number: %v, minMember of group: %v", pod.Name, len(pods), podGroup.Spec.MinMember) - } - - // TODO we likely can take advantage of these resources or other custom - // attributes we add. For now ignore and calculate based on pod needs (above) - // if podGroup.Spec.MinResources == nil { - // fmt.Printf("Fluxnetes Min resources are null, skipping PreFilter") - // return nil - // } - - // This is from coscheduling. - // TODO(cwdsuzhou): This resource check may not always pre-catch unschedulable pod group. - // It only tries to PreFilter resource constraints so even if a PodGroup passed here, - // it may not necessarily pass Filter due to other constraints such as affinity/taints. - _, exists = podGroupManager.permittedpodGroup.Get(groupName) - if exists { - podGroupManager.log.Info("[PodGroup PreFilter] Pod Group %s is already admitted", groupName) - return nil - } - - // TODO: right now we ask Fluxion for a podspec based on ONE representative pod, but - // we have the whole group! 
We can handle different pod needs now :) - repPod := pods[0] - nodes, err := podGroupManager.AskFlux(ctx, *repPod, podGroup, groupName) - if err != nil { - podGroupManager.log.Info("[PodGroup PreFilter] Fluxion returned an error %s, not schedulable", err.Error()) - return err - } - podGroupManager.log.Info("Node Selected %s (pod group %s)", nodes, groupName) - - // Some reason fluxion gave us the wrong size? - if len(nodes) != len(pods) { - podGroupManager.log.Warning("[PodGroup PreFilter] group %s needs %d nodes but Fluxion returned the wrong number nodes %d.", groupName, len(pods), len(nodes)) - podGroupManager.mutex.Lock() - podGroupManager.cancelFluxJob(groupName, repPod) - podGroupManager.mutex.Unlock() - } - - // Create a fluxState (CycleState) with all nodes - this is used to retrieve - // the specific node assigned to the pod in Filter, which returns a node - // Note that this probably is not useful beyond the pod we are in the context - // of, but why not do it. - for i, node := range nodes { - pod := pods[i] - stateData := FluxStateData{NodeName: node} - state.Write(framework.StateKey(pod.Name), &stateData) - // Also save to the podToNode lookup - podGroupManager.mutex.Lock() - podGroupManager.podToNode[pod.Name] = node - podGroupManager.mutex.Unlock() - } - podGroupManager.permittedpodGroup.Add(groupName, groupName, *podGroupManager.scheduleTimeout) - return nil -} - -// GetCreationTimestamp returns the creation time of a podGroup or a pod in seconds (time.MicroTime) -// The Status.CreationTime is set by the PodGroup reconciler, which has to happen before we have -// a PodGroup. I don't see cases when this wouldn't happen, but in case we fall back to -// converting the pg.CreationTime to a MicroTime -func (podGroupManager *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time) metav1.MicroTime { - groupName := util.GetPodGroupLabel(pod) - if len(groupName) == 0 { - return metav1.NewMicroTime(ts) - } - - var podGroup v1alpha1.PodGroup - if err := podGroupManager.client.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: groupName}, &podGroup); err != nil { - return metav1.NewMicroTime(ts) - } - // First preference goes to microseconds. This should be set, as it is set by the first - // reconcile, and we wouldn'thave a pod group if it didn't pass through that. - if !podGroup.CreationTimestamp.IsZero() { - return metav1.NewMicroTime(podGroup.CreationTimestamp.Time) - } - // Fall back to CreationTime from Kubernetes, in seconds - // In practice this should not happen - return metav1.NewMicroTime(podGroup.CreationTimestamp.Time) -} - -// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound. -func (podGroupManager *PodGroupManager) CalculateAssignedPods(podGroupName, namespace string) int { - nodeInfos, err := podGroupManager.snapshotSharedLister.NodeInfos().List() - if err != nil { - podGroupManager.log.Error("Cannot get nodeInfos from frameworkHandle: %s", err) - return 0 - } - var count int - for _, nodeInfo := range nodeInfos { - for _, podInfo := range nodeInfo.Pods { - pod := podInfo.Pod - if util.GetPodGroupLabel(pod) == podGroupName && pod.Namespace == namespace && pod.Spec.NodeName != "" { - count++ - } - } - } - return count -} - -// DeletePermittedPodGroup deletes a podGroup that passes Pre-Filter but reaches PostFilter. 
-func (podGroupManager *PodGroupManager) DeletePermittedPodGroup(groupName string) { - podGroupManager.permittedpodGroup.Delete(groupName) -} - -// GetPodGroup returns the PodGroup that a Pod belongs to in cache. -func (podGroupManager *PodGroupManager) GetPodGroup(ctx context.Context, pod *corev1.Pod) (string, *v1alpha1.PodGroup) { - groupName := util.GetPodGroupLabel(pod) - - podGroupManager.log.Info("Pod Group Name is %s", groupName) - - // If we don't have a group, create one under fluxnetes namespace - if groupName == "" { - groupName = fmt.Sprintf("fluxnetes-group-%s", pod.Name) - } - - // Do we have a group size? This will be parsed as a string, likely - //groupSize, ok := pod.Labels[labels.PodGroupSizeLabel] - //if !ok { - // groupSize = "1" - // pod.Labels[labels.PodGroupSizeLabel] = groupSize - //} - - var podGroup v1alpha1.PodGroup - if err := podGroupManager.client.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: groupName}, &podGroup); err != nil { - return fmt.Sprintf("%v/%v", pod.Namespace, groupName), nil - } - return fmt.Sprintf("%v/%v", pod.Namespace, groupName), &podGroup -} - -// GetNamespacedName returns the namespaced name. -func GetNamespacedName(obj metav1.Object) string { - return fmt.Sprintf("%v/%v", obj.GetNamespace(), obj.GetName()) -} diff --git a/kubernetes/pkg/fluxnetes/core/flux.go b/kubernetes/pkg/fluxnetes/core/flux.go deleted file mode 100644 index 52732a0..0000000 --- a/kubernetes/pkg/fluxnetes/core/flux.go +++ /dev/null @@ -1,258 +0,0 @@ -package core - -import ( - "context" - "time" - - "google.golang.org/grpc" - "k8s.io/apimachinery/pkg/labels" - fgroup "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group" - - pb "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/fluxion-grpc" - "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" - - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/podspec" - - corev1 "k8s.io/api/core/v1" -) - -// AskFlux will ask flux for an allocation for nodes for the pod group. -// We return the list of nodes, and assign to the entire group! -func (podGroupManager *PodGroupManager) AskFlux( - ctx context.Context, - pod corev1.Pod, - podGroup *v1alpha1.PodGroup, - groupName string, -) ([]string, error) { - - // clean up previous match if a pod has already allocated previously - podGroupManager.mutex.Lock() - _, isAllocated := podGroupManager.groupToJobId[groupName] - podGroupManager.mutex.Unlock() - - // This case happens when there is some reason that an initial job pods partially allocated, - // but then the job restarted, and new pods are present but fluxnetes had assigned nodes to - // the old ones (and there aren't enough). The job would have had to complete in some way, - // and the PodGroup would have to then recreate, and have the same job id (the group name). - // This happened when I cancalled a bunch of jobs and they didn't have the chance to - // cancel in fluxnetes. What we can do here is assume the previous pods are no longer running - // and cancel the flux job to create again. - if isAllocated { - podGroupManager.log.Warning("[PodGroup AskFlux] group %s was previously allocated and is requesting again, so must have completed.", groupName) - podGroupManager.mutex.Lock() - podGroupManager.cancelFluxJob(groupName, &pod) - podGroupManager.mutex.Unlock() - } - nodes := []string{} - - // IMPORTANT: this is a JobSpec for *one* pod, assuming they are all the same. - // This obviously may not be true if we have a hetereogenous PodGroup. 
- // We name it based on the group, since it will represent the group - jobspec := podspec.PreparePodJobSpec(&pod, groupName) - podGroupManager.log.Info("[PodGroup AskFlux] Inspect pod info, jobspec: %s\n", jobspec) - conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure()) - - // TODO change this to just return fmt.Errorf - if err != nil { - podGroupManager.log.Error("[PodGroup AskFlux] Error connecting to server: %v\n", err) - return nodes, err - } - defer conn.Close() - - grpcclient := pb.NewFluxionServiceClient(conn) - _, cancel := context.WithTimeout(context.Background(), 200*time.Second) - defer cancel() - - request := &pb.MatchRequest{ - Ps: jobspec, - Request: "allocate", - Count: podGroup.Spec.MinMember, - } - - // An error here is an error with making the request - response, err := grpcclient.Match(context.Background(), request) - if err != nil { - podGroupManager.log.Warning("[PodGroup AskFlux] did not receive any match response: %v\n", err) - return nodes, err - } - - // TODO GetPodID should be renamed, because it will reflect the group - podGroupManager.log.Info("[PodGroup AskFlux] Match response ID %s\n", response.GetPodID()) - - // Get the nodelist and inspect - nodelist := response.GetNodelist() - for _, node := range nodelist { - nodes = append(nodes, node.NodeID) - } - jobid := uint64(response.GetJobID()) - podGroupManager.log.Info("[PodGroup AskFlux] parsed node pods list %s for job id %d\n", nodes, jobid) - - // TODO would be nice to actually be able to ask flux jobs -a to fluxnetes - // That way we can verify assignments, etc. - podGroupManager.mutex.Lock() - podGroupManager.groupToJobId[groupName] = jobid - podGroupManager.mutex.Unlock() - return nodes, nil -} - -// cancelFluxJobForPod cancels the flux job for a pod. -// We assume that the cancelled job also means deleting the pod group -func (podGroupManager *PodGroupManager) cancelFluxJob(groupName string, pod *corev1.Pod) error { - - jobid, exists := podGroupManager.groupToJobId[groupName] - - // The job was already cancelled by another pod - if !exists { - podGroupManager.log.Info("[PodGroup cancelFluxJob] Request for cancel of group %s is already complete.", groupName) - return nil - } - podGroupManager.log.Info("[PodGroup cancelFluxJob] Cancel flux job: %v for group %s", jobid, groupName) - - // This first error is about connecting to the server - conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure()) - if err != nil { - podGroupManager.log.Error("[PodGroup cancelFluxJob] Error connecting to server: %v", err) - return err - } - defer conn.Close() - - grpcclient := pb.NewFluxionServiceClient(conn) - _, cancel := context.WithTimeout(context.Background(), 200*time.Second) - defer cancel() - - // This error reflects the success or failure of the cancel request - request := &pb.CancelRequest{JobID: int64(jobid)} - response, err := grpcclient.Cancel(context.Background(), request) - if err != nil { - podGroupManager.log.Error("[PodGroup cancelFluxJob] did not receive any cancel response: %v", err) - return err - } - podGroupManager.log.Info("[PodGroup cancelFluxJob] Job cancellation for group %s result: %d", groupName, response.Error) - - // And this error is if the cancel was successful or not - if response.Error == 0 { - podGroupManager.log.Info("[PodGroup cancelFluxJob] Successful cancel of flux job: %d for group %s", jobid, groupName) - podGroupManager.cleanup(pod, groupName) - } else { - podGroupManager.log.Warning("[PodGroup cancelFluxJob] Failed to cancel flux job %d for group %s", jobid, groupName) 
- } - return nil -} - -// cleanup deletes the group name from groupToJobId, and pods names from the node lookup -func (podGroupManager *PodGroupManager) cleanup(pod *corev1.Pod, groupName string) { - - delete(podGroupManager.groupToJobId, groupName) - - // Clean up previous pod->node assignments - pods, err := podGroupManager.podLister.Pods(pod.Namespace).List( - labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: groupName}), - ) - // TODO need to handle this / understand why it's the case - if err != nil { - return - } - for _, pod := range pods { - delete(podGroupManager.podToNode, pod.Name) - } -} - -// UpdatePod is called on an update, and the old and new object are presented -func (podGroupManager *PodGroupManager) UpdatePod(oldObj, newObj interface{}) { - - oldPod := oldObj.(*corev1.Pod) - newPod := newObj.(*corev1.Pod) - - // a pod is updated, get the group - // TODO should we be checking group / size for old vs new? - groupName, podGroup := podGroupManager.GetPodGroup(context.TODO(), oldPod) - - // If PodGroup is nil, still try to look up a faux name - // TODO need to check if this might be problematic - if podGroup == nil { - podGroup = fgroup.CreateFakeGroup(oldPod) - groupName = podGroup.Name - } - - podGroupManager.log.Verbose("[PodGroup UpdatePod] Processing event for pod %s in group %s from %s to %s", newPod.Name, groupName, oldPod.Status.Phase, newPod.Status.Phase) - - switch newPod.Status.Phase { - case corev1.PodPending: - // in this state we don't know if a pod is going to be running, thus we don't need to update job map - case corev1.PodRunning: - // if a pod is start running, we can add it state to the delta graph if it is scheduled by other scheduler - case corev1.PodSucceeded: - podGroupManager.log.Info("[PodGroup UpdatePod] Pod %s succeeded, Fluxnetes needs to free the resources", newPod.Name) - - podGroupManager.mutex.Lock() - defer podGroupManager.mutex.Unlock() - - // Do we have the group id in our cache? 
If yes, we haven't deleted the jobid yet - // I am worried here that if some pods are succeeded and others pending, this could - // be a mistake - fluxnetes would schedule it again - _, exists := podGroupManager.groupToJobId[groupName] - if exists { - podGroupManager.cancelFluxJob(groupName, oldPod) - } else { - podGroupManager.log.Verbose("[PodGroup UpdatePod] Succeeded pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName) - } - - case corev1.PodFailed: - - // a corner case need to be tested, the pod exit code is not 0, can be created with segmentation fault pi test - podGroupManager.log.Warning("[PodGroup UpdatePod] Pod %s in group %s failed, Fluxnetes needs to free the resources", newPod.Name, groupName) - - podGroupManager.mutex.Lock() - defer podGroupManager.mutex.Unlock() - - _, exists := podGroupManager.groupToJobId[groupName] - if exists { - podGroupManager.cancelFluxJob(groupName, oldPod) - } else { - podGroupManager.log.Error("[PodGroup UpdatePod] Failed pod %s/%s in group %s doesn't have flux jobid", newPod.Namespace, newPod.Name, groupName) - } - case corev1.PodUnknown: - // don't know how to deal with it as it's unknown phase - default: - // shouldn't enter this branch - } -} - -// DeletePod handles the delete event handler -func (podGroupManager *PodGroupManager) DeletePod(podObj interface{}) { - pod := podObj.(*corev1.Pod) - groupName, podGroup := podGroupManager.GetPodGroup(context.TODO(), pod) - - // If PodGroup is nil, still try to look up a faux name - if podGroup == nil { - podGroup = fgroup.CreateFakeGroup(pod) - groupName = podGroup.Name - } - - podGroupManager.log.Verbose("[PodGroup DeletePod] Delete pod %s in group %s has status %s", pod.Status.Phase, pod.Name, groupName) - switch pod.Status.Phase { - case corev1.PodSucceeded: - case corev1.PodPending: - podGroupManager.log.Verbose("[PodGroup DeletePod] Pod %s completed and is Pending termination, Fluxnetes needs to free the resources", pod.Name) - - podGroupManager.mutex.Lock() - defer podGroupManager.mutex.Unlock() - - _, exists := podGroupManager.groupToJobId[groupName] - if exists { - podGroupManager.cancelFluxJob(groupName, pod) - } else { - podGroupManager.log.Info("[PodGroup DeletePod] Terminating pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName) - } - case corev1.PodRunning: - podGroupManager.mutex.Lock() - defer podGroupManager.mutex.Unlock() - - _, exists := podGroupManager.groupToJobId[groupName] - if exists { - podGroupManager.cancelFluxJob(groupName, pod) - } else { - podGroupManager.log.Info("[PodGroup DeletePod] Deleted pod %s/%s in group %s doesn't have flux jobid", pod.Namespace, pod.Name, groupName) - } - } -} diff --git a/kubernetes/pkg/fluxnetes/defaults/defaults.go b/kubernetes/pkg/fluxnetes/defaults/defaults.go new file mode 100644 index 0000000..4b64d0f --- /dev/null +++ b/kubernetes/pkg/fluxnetes/defaults/defaults.go @@ -0,0 +1,11 @@ +package defaults + +import ( + "math" +) + +const ( + // https://github.com/riverqueue/river/discussions/475 + // The database column is an int16 + MaxAttempts = math.MaxInt16 +) diff --git a/kubernetes/pkg/fluxnetes/fluxnetes.go b/kubernetes/pkg/fluxnetes/fluxnetes.go index fc5781a..d755ead 100644 --- a/kubernetes/pkg/fluxnetes/fluxnetes.go +++ b/kubernetes/pkg/fluxnetes/fluxnetes.go @@ -18,172 +18,59 @@ package fluxnetes import ( "context" - "fmt" - "sync" - "time" + "strings" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/util/sets" - 
"k8s.io/client-go/tools/cache" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" - - // Our logger moved into Kubernetes - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/logger" - - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" - clientscheme "k8s.io/client-go/kubernetes/scheme" - corev1helpers "k8s.io/component-helpers/scheduling/corev1" + helpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/kubernetes/pkg/scheduler/framework" - fcore "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/core" - fgroup "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group" - flabel "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/labels" + groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group" ) var ( GroupName = "scheduling.x-k8s.io" ) -// Fluxnetes schedules pods in a group using Fluxion as a backend -// We inherit cosched.Coscheduling to use some of the primary functions -type Fluxnetes struct { - mutex sync.Mutex - handle framework.Handle - podGroupManager fcore.Manager - scheduleTimeout *time.Duration - podGroupBackoff *time.Duration - log *logger.DebugLogger +// JobResult serializes a result from Fluxnetes in the scheduler back to metadata +type JobResult struct { + JobID int32 `json:"jobid"` + Nodes string `json:"nodes"` + PodID string `json:"podid"` + PodSpec string `json:"podspec"` } -var ( - _ framework.QueueSortPlugin = &Fluxnetes{} - _ framework.PreFilterPlugin = &Fluxnetes{} - _ framework.FilterPlugin = &Fluxnetes{} - - _ framework.PostFilterPlugin = &Fluxnetes{} - _ framework.PermitPlugin = &Fluxnetes{} - _ framework.ReservePlugin = &Fluxnetes{} +func (j JobResult) GetNodes() []string { + return strings.Split(j.Nodes, ",") +} - _ framework.EnqueueExtensions = &Fluxnetes{} +// Fluxnetes (as a plugin) is only enabled for the queue sort +type Fluxnetes struct{} - // Set to be the same as coscheduling - permitWaitingTimeSeconds int64 = 300 - podGroupBackoffSeconds int64 = 0 +var ( + _ framework.QueueSortPlugin = &Fluxnetes{} ) const ( - // Name is the name of the plugin used in Registry and configurations. Name = "Fluxnetes" ) -// Initialize and return a new Fluxnetes Scheduler Plugin -func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { - - ctx := context.TODO() - - scheme := runtime.NewScheme() - _ = clientscheme.AddToScheme(scheme) - _ = v1.AddToScheme(scheme) - _ = v1alpha1.AddToScheme(scheme) - kubeClient, err := client.New(handle.KubeConfig(), client.Options{Scheme: scheme}) - if err != nil { - return nil, err - } - - // Make fluxnetes his own little logger! - // This can eventually be a flag, but just going to set for now - // It shall be a very chonky file. Oh lawd he comin! - l := logger.NewDebugLogger(logger.LevelDebug, "/tmp/fluxnetes.log") - - // PermitWaitingTimeSeconds is the waiting timeout in seconds. - scheduleTimeDuration := time.Duration(permitWaitingTimeSeconds) * time.Second - - // Performance improvement when retrieving list of objects by namespace or we'll log 'index not exist' warning. - fluxPodsInformer := handle.SharedInformerFactory().Core().V1().Pods().Informer() - fluxPodsInformer.AddIndexers(cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - - podGroupManager := fcore.NewPodGroupManager( - kubeClient, - handle.SnapshotSharedLister(), - &scheduleTimeDuration, - // Keep the podInformer (from frameworkHandle) as the single source of Pods. 
- handle.SharedInformerFactory().Core().V1().Pods(), - l, - ) - - // Event handlers to call on podGroupManager - fluxPodsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - UpdateFunc: podGroupManager.UpdatePod, - DeleteFunc: podGroupManager.DeletePod, - }) - go fluxPodsInformer.Run(ctx.Done()) - - backoffSeconds := time.Duration(podGroupBackoffSeconds) * time.Second - plugin := &Fluxnetes{ - handle: handle, - podGroupManager: podGroupManager, - scheduleTimeout: &scheduleTimeDuration, - log: l, - podGroupBackoff: &backoffSeconds, - } - - // TODO this is not supported yet - // Account for resources in running cluster - err = plugin.RegisterExisting(ctx) - return plugin, err -} - func (fluxnetes *Fluxnetes) Name() string { return Name } -// Fluxnetes has added delete, although I wonder if update includes that signal -// and it's redundant? -func (fluxnetes *Fluxnetes) EventsToRegister() []framework.ClusterEventWithHint { - // To register a custom event, follow the naming convention at: - // https://git.k8s.io/kubernetes/pkg/scheduler/eventhandlers.go#L403-L410 - podGroupGVK := fmt.Sprintf("podgroups.v1alpha1.%v", GroupName) - return []framework.ClusterEventWithHint{ - {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add | framework.Delete}}, - {Event: framework.ClusterEvent{Resource: framework.GVK(podGroupGVK), ActionType: framework.Add | framework.Update | framework.Delete}}, - } -} - -func (fluxnetes *Fluxnetes) Filter( - ctx context.Context, - cycleState *framework.CycleState, - pod *corev1.Pod, - nodeInfo *framework.NodeInfo, -) *framework.Status { - - fluxnetes.log.Verbose("[Fluxnetes Filter] Filtering input node %s", nodeInfo.Node().Name) - state, err := cycleState.Read(framework.StateKey(pod.Name)) - - // No error means we retrieved the state - if err == nil { - - // Try to convert the state to FluxStateDate - value, ok := state.(*fcore.FluxStateData) - - // If we have state data that isn't equal to the current assignment, no go - if ok && value.NodeName != nodeInfo.Node().Name { - return framework.NewStatus(framework.Unschedulable, "pod is not permitted") - } else { - fluxnetes.log.Info("[Fluxnetes Filter] node %s selected for %s\n", value.NodeName, pod.Name) - } - } - return framework.NewStatus(framework.Success) +// New returns an empty Fluxnetes plugin, which only provides a queue sort! +func New(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return &Fluxnetes{}, nil } // Less is used to sort pods in the scheduling queue in the following order. // 1. Compare the priorities of Pods. -// 2. Compare the initialization timestamps of PodGroups or Pods. +// 2. Compare the initialization timestamps of Pods. // 3. Compare the keys of PodGroups/Pods: /. +// In practice this step isn't hugely important because the Fluxnetes queue does +// more arranging of pods, but this helps to pre-sort. func (fluxnetes *Fluxnetes) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { - prio1 := corev1helpers.PodPriority(podInfo1.Pod) - prio2 := corev1helpers.PodPriority(podInfo2.Pod) + prio1 := helpers.PodPriority(podInfo1.Pod) + prio2 := helpers.PodPriority(podInfo2.Pod) if prio1 != prio2 { return prio1 > prio2 } @@ -192,169 +79,17 @@ func (fluxnetes *Fluxnetes) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bo // which is what fluxnetes needs to distinguish between namespaces. 
Just the // name could be replicated between different namespaces // TODO add some representation of PodGroup back - ctx := context.TODO() - name1, podGroup1 := fluxnetes.podGroupManager.GetPodGroup(ctx, podInfo1.Pod) - name2, podGroup2 := fluxnetes.podGroupManager.GetPodGroup(ctx, podInfo2.Pod) + name1 := groups.GetPodGroupFullName(podInfo1.Pod) + name2 := groups.GetPodGroupFullName(podInfo2.Pod) - // Fluxnetes can only compare if we have two known groups. - // This tries for that first, and falls back to the initial attempt timestamp - creationTime1 := fgroup.GetCreationTimestamp(name1, podGroup1, podInfo1) - creationTime2 := fgroup.GetCreationTimestamp(name2, podGroup2, podInfo2) + // Try for creation time first, and fall back to naming + creationTime1 := groups.GetPodCreationTimestamp(podInfo1.Pod) + creationTime2 := groups.GetPodCreationTimestamp(podInfo2.Pod) // If they are the same, fall back to sorting by name. if creationTime1.Equal(&creationTime2) { - return fcore.GetNamespacedName(podInfo1.Pod) < fcore.GetNamespacedName(podInfo2.Pod) + return name1 < name2 } return creationTime1.Before(&creationTime2) } - -// PreFilterExtensions allow for callbacks on filtered states -// This is required to be defined for a PreFilter plugin -// https://github.com/kubernetes/kubernetes/blob/master/pkg/scheduler/framework/interface.go#L383 -func (fluxnetes *Fluxnetes) PreFilterExtensions() framework.PreFilterExtensions { - return nil -} - -// PreFilter performs the following validations. -// 1. Whether the PodGroup that the Pod belongs to is on the deny list. -// 2. Whether the total number of pods in a PodGroup is less than its `minMember`. -func (fluxnetes *Fluxnetes) PreFilter( - ctx context.Context, - state *framework.CycleState, - pod *corev1.Pod, -) (*framework.PreFilterResult, *framework.Status) { - - // Quick check if the pod is already scheduled - fluxnetes.mutex.Lock() - node := fluxnetes.podGroupManager.GetPodNode(pod) - fluxnetes.mutex.Unlock() - if node != "" { - fluxnetes.log.Info("[Fluxnetes PreFilter] assigned pod %s to node %s\n", pod.Name, node) - result := framework.PreFilterResult{NodeNames: sets.New(node)} - return &result, framework.NewStatus(framework.Success, "") - } - fluxnetes.log.Info("[Fluxnetes PreFilter] pod %s does not have a node assigned\n", pod.Name) - - // This will populate the node name into the pod group manager - err := fluxnetes.podGroupManager.PreFilter(ctx, pod, state) - if err != nil { - fluxnetes.log.Error("[Fluxnetes PreFilter] failed pod %s: %s", pod.Name, err.Error()) - return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) - } - node = fluxnetes.podGroupManager.GetPodNode(pod) - result := framework.PreFilterResult{NodeNames: sets.New(node)} - return &result, framework.NewStatus(framework.Success, "") -} - -// PostFilter is used to reject a group of pods if a pod does not pass PreFilter or Filter. -func (fluxnetes *Fluxnetes) PostFilter( - ctx context.Context, - state *framework.CycleState, - pod *corev1.Pod, - filteredNodeStatusMap framework.NodeToStatusMap, -) (*framework.PostFilterResult, *framework.Status) { - - groupName, podGroup := fluxnetes.podGroupManager.GetPodGroup(ctx, pod) - if podGroup == nil { - fluxnetes.log.Info("Pod does not belong to any group, pod %s", pod.Name) - return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, "cannot find pod group") - } - - // This explicitly checks nodes, and we can skip scheduling another pod if we already - // have the minimum. 
For fluxnetes since we expect an exact size this likely is not needed - assigned := fluxnetes.podGroupManager.CalculateAssignedPods(podGroup.Name, pod.Namespace) - if assigned >= int(podGroup.Spec.MinMember) { - fluxnetes.log.Info("Assigned pods podGroup %s is assigned %s", groupName, assigned) - return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable) - } - - // Took out percentage chcek here, doesn't make sense to me. - - // It's based on an implicit assumption: if the nth Pod failed, - // it's inferrable other Pods belonging to the same PodGroup would be very likely to fail. - fluxnetes.handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { - if waitingPod.GetPod().Namespace == pod.Namespace && flabel.GetPodGroupLabel(waitingPod.GetPod()) == podGroup.Name { - fluxnetes.log.Info("PostFilter rejects the pod for podGroup %s and pod %s", groupName, waitingPod.GetPod().Name) - waitingPod.Reject(fluxnetes.Name(), "optimistic rejection in PostFilter") - } - }) - - if fluxnetes.podGroupBackoff != nil { - pods, err := fluxnetes.handle.SharedInformerFactory().Core().V1().Pods().Lister().Pods(pod.Namespace).List( - labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: flabel.GetPodGroupLabel(pod)}), - ) - if err == nil && len(pods) >= int(podGroup.Spec.MinMember) { - fluxnetes.podGroupManager.BackoffPodGroup(groupName, *fluxnetes.podGroupBackoff) - } - } - - fluxnetes.podGroupManager.DeletePermittedPodGroup(groupName) - return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, - fmt.Sprintf("PodGroup %v gets rejected due to Pod %v is unschedulable even after PostFilter", groupName, pod.Name)) -} - -// Permit is the functions invoked by the framework at "Permit" extension point. -func (fluxnetes *Fluxnetes) Permit( - ctx context.Context, - state *framework.CycleState, - pod *corev1.Pod, - nodeName string, -) (*framework.Status, time.Duration) { - - fluxnetes.log.Info("Checking permit for pod %s to node %s", pod.Name, nodeName) - waitTime := *fluxnetes.scheduleTimeout - s := fluxnetes.podGroupManager.Permit(ctx, state, pod) - var retStatus *framework.Status - switch s { - case fcore.PodGroupNotSpecified: - fluxnetes.log.Info("Checking permit for pod %s to node %s: PodGroupNotSpecified", pod.Name, nodeName) - return framework.NewStatus(framework.Success, ""), 0 - case fcore.PodGroupNotFound: - fluxnetes.log.Info("Checking permit for pod %s to node %s: PodGroupNotFound", pod.Name, nodeName) - return framework.NewStatus(framework.Unschedulable, "PodGroup not found"), 0 - case fcore.Wait: - fluxnetes.log.Info("Pod %s is waiting to be scheduled to node %s", pod.Name, nodeName) - _, podGroup := fluxnetes.podGroupManager.GetPodGroup(ctx, pod) - if wait := fgroup.GetWaitTimeDuration(podGroup, fluxnetes.scheduleTimeout); wait != 0 { - waitTime = wait - } - retStatus = framework.NewStatus(framework.Wait) - - // We will also request to move the sibling pods back to activeQ. 
- fluxnetes.podGroupManager.ActivateSiblings(pod, state) - case fcore.Success: - podGroupFullName := flabel.GetPodGroupFullName(pod) - fluxnetes.handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { - if flabel.GetPodGroupFullName(waitingPod.GetPod()) == podGroupFullName { - fluxnetes.log.Info("Permit allows pod %s", waitingPod.GetPod().Name) - waitingPod.Allow(fluxnetes.Name()) - } - }) - fluxnetes.log.Info("Permit allows pod %s", pod.Name) - retStatus = framework.NewStatus(framework.Success) - waitTime = 0 - } - - return retStatus, waitTime -} - -// Reserve is the functions invoked by the framework at "reserve" extension point. -func (fluxnetes *Fluxnetes) Reserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status { - return nil -} - -// Unreserve rejects all other Pods in the PodGroup when one of the pods in the group times out. -func (fluxnetes *Fluxnetes) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) { - groupName, podGroup := fluxnetes.podGroupManager.GetPodGroup(ctx, pod) - if podGroup == nil { - return - } - fluxnetes.handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { - if waitingPod.GetPod().Namespace == pod.Namespace && flabel.GetPodGroupLabel(waitingPod.GetPod()) == podGroup.Name { - fluxnetes.log.Info("Unreserve rejects pod %s in group %s", waitingPod.GetPod().Name, groupName) - waitingPod.Reject(fluxnetes.Name(), "rejection in Unreserve") - } - }) - fluxnetes.podGroupManager.DeletePermittedPodGroup(groupName) -} diff --git a/kubernetes/pkg/fluxnetes/group/group.go b/kubernetes/pkg/fluxnetes/group/group.go index f29647a..13c32ed 100644 --- a/kubernetes/pkg/fluxnetes/group/group.go +++ b/kubernetes/pkg/fluxnetes/group/group.go @@ -4,60 +4,66 @@ import ( "fmt" "time" + "strconv" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - klog "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/scheduler/framework" - "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/labels" ) -// DefaultWaitTime is 60s if ScheduleTimeoutSeconds is not specified. -const DefaultWaitTime = 60 * time.Second - -// CreateFakeGroup wraps an arbitrary pod in a fake group for fluxnetes to schedule -// This happens only in PreFilter so we already sorted -func CreateFakeGroup(pod *corev1.Pod) *v1alpha1.PodGroup { - groupName := fmt.Sprintf("fluxnetes-solo-%s-%s", pod.Namespace, pod.Name) - return &v1alpha1.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Name: groupName, - Namespace: pod.Namespace, - }, - Spec: v1alpha1.PodGroupSpec{MinMember: int32(1)}, +// A PodGroup holds the name and size of a pod group +// It is just a temporary holding structure +type PodGroup struct { + Name string + Size int32 + Timestamp metav1.MicroTime +} + +// getPodGroupName returns the pod group name +// 1. We first look to see if the pod is explicitly labeled +// 2. 
If not, we fall back to a default based on the pod name and namespace +func GetPodGroupName(pod *corev1.Pod) string { + groupName := labels.GetPodGroupLabel(pod) + + // If we don't have a group, create one under fluxnetes namespace + if groupName == "" { + groupName = fmt.Sprintf("fluxnetes-group-%s-%s", pod.Namespace, pod.Name) } + return groupName } -// GetCreationTimestamp first tries the group, then falls back to the initial attempt timestamp -// This is the only update we have made to the upstream PodGroupManager, because we are expecting -// a MicroTime and not a time.Time. -func GetCreationTimestamp(groupName string, podGroup *v1alpha1.PodGroup, podInfo *framework.QueuedPodInfo) metav1.MicroTime { +// GetPodGroupFullName get namespaced group name from pod labels +// This is primarily for sorting, so we consider namespace too. +func GetPodGroupFullName(pod *corev1.Pod) string { + groupName := GetPodGroupName(pod) + return fmt.Sprintf("%v/%v", pod.Namespace, groupName) +} + +// getPodGroupSize gets the group size, first from label then default of 1 +func GetPodGroupSize(pod *corev1.Pod) (int32, error) { - // Don't try to get a time for a pod group that does not exist - if podGroup == nil { - return metav1.NewMicroTime(*podInfo.InitialAttemptTimestamp) + // Do we have a group size? This will be parsed as a string, likely + groupSize, ok := pod.Labels[labels.PodGroupSizeLabel] + if !ok { + groupSize = "1" + pod.Labels[labels.PodGroupSizeLabel] = groupSize } - // IsZero is an indicator if this was actually set - // If the group label was present and we have a group, this will be true - if !podGroup.Status.ScheduleStartTime.IsZero() { - klog.Infof(" [Fluxnetes] Pod group %s was created at %s\n", groupName, podGroup.Status.ScheduleStartTime) - return metav1.NewMicroTime(podGroup.Status.ScheduleStartTime.Time) + // We need the group size to be an integer now! + size, err := strconv.ParseInt(groupSize, 10, 32) + if err != nil { + return 0, err } - // We should actually never get here. - klog.Errorf(" [Fluxnetes] Pod group %s time IsZero, we should not have reached here", groupName) - return metav1.NewMicroTime(*podInfo.InitialAttemptTimestamp) + return int32(size), nil } -// GetWaitTimeDuration returns a wait timeout based on the following precedences: -// 1. spec.scheduleTimeoutSeconds of the given podGroup, if specified -// 2. given scheduleTimeout, if not nil -// 3. fall back to DefaultWaitTime -func GetWaitTimeDuration(podGroup *v1alpha1.PodGroup, scheduleTimeout *time.Duration) time.Duration { - if podGroup != nil && podGroup.Spec.ScheduleTimeoutSeconds != nil { - return time.Duration(*podGroup.Spec.ScheduleTimeoutSeconds) * time.Second - } - if scheduleTimeout != nil && *scheduleTimeout != 0 { - return *scheduleTimeout +// GetPodCreationTimestamp +func GetPodCreationTimestamp(pod *corev1.Pod) metav1.MicroTime { + + // This is the first member of the group - use its CreationTimestamp + if !pod.CreationTimestamp.IsZero() { + return metav1.NewMicroTime(pod.CreationTimestamp.Time) } - return DefaultWaitTime + // If the pod for some reasond doesn't have a timestamp, assume now + return metav1.NewMicroTime(time.Now()) } diff --git a/kubernetes/pkg/fluxnetes/labels/labels.go b/kubernetes/pkg/fluxnetes/labels/labels.go index fd1579c..41aeaa0 100644 --- a/kubernetes/pkg/fluxnetes/labels/labels.go +++ b/kubernetes/pkg/fluxnetes/labels/labels.go @@ -1,27 +1,7 @@ -/* -Copyright 2020 The Kubernetes Authors. 
- -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - package labels import ( - "fmt" - "time" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // Labels to be shared between different components @@ -40,32 +20,3 @@ const ( func GetPodGroupLabel(pod *v1.Pod) string { return pod.Labels[PodGroupLabel] } - -// GetPodGroupFullName get namespaced group name from pod labels -func GetPodGroupFullName(pod *v1.Pod) string { - groupName := GetPodGroupLabel(pod) - if len(groupName) == 0 { - return "" - } - return fmt.Sprintf("%v/%v", pod.Namespace, groupName) -} - -// GetPodGroupSize gets the pod group size from the label -func GetPodGroupSize(pod *v1.Pod) string { - return pod.Labels[PodGroupSizeLabel] -} - -// getTimeCreated returns the timestamp when we saw the object -func GetTimeCreated() string { - - // Set the time created for a label - createdAt := metav1.NewMicroTime(time.Now()) - - // If we get an error here, the reconciler will set the time - var timestamp string - timeCreated, err := createdAt.MarshalJSON() - if err == nil { - timestamp = string(timeCreated) - } - return timestamp -} diff --git a/kubernetes/pkg/fluxnetes/logger/logger.go b/kubernetes/pkg/fluxnetes/logger/logger.go deleted file mode 100644 index d1e238e..0000000 --- a/kubernetes/pkg/fluxnetes/logger/logger.go +++ /dev/null @@ -1,87 +0,0 @@ -package logger - -// A small debug logger to write to file instead of klog -// I don't know where to close, so I'm opening and appending each time -// It's a bad design, but will work for debugging. - -import ( - "fmt" - "log" - "os" -) - -const ( - LevelNone = iota - LevelInfo - LevelWarning - LevelError - LevelVerbose - LevelDebug -) - -type DebugLogger struct { - level int - Filename string - handle *os.File -} - -func NewDebugLogger(level int, filename string) *DebugLogger { - return &DebugLogger{ - level: level, - Filename: filename, - } -} - -func (l *DebugLogger) Start() (*log.Logger, error) { - f, err := os.OpenFile(l.Filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, os.ModePerm) - if err != nil { - return nil, err - } - logger := log.New(f, "", 0) - l.handle = f - return logger, nil -} -func (l *DebugLogger) Stop() error { - if l.handle != nil { - return l.handle.Close() - } - return nil -} - -// Logging functions you should use! -func (l *DebugLogger) Info(message ...any) error { - return l.log(LevelInfo, " INFO: ", message...) -} -func (l *DebugLogger) Error(message ...any) error { - return l.log(LevelError, " ERROR: ", message...) -} -func (l *DebugLogger) Debug(message ...any) error { - return l.log(LevelDebug, " DEBUG: ", message...) -} -func (l *DebugLogger) Verbose(message ...any) error { - return l.log(LevelVerbose, "VERBOSE: ", message...) -} -func (l *DebugLogger) Warning(message ...any) error { - return l.log(LevelWarning, "WARNING: ", message...) 
-} - -// log is the shared class function for actually printing to the log -func (l *DebugLogger) log(level int, prefix string, message ...any) error { - logger, err := l.Start() - if err != nil { - return err - } - // Assume the prolog (to be formatted) is at index 0 - prolog := message[0].(string) - if prefix != "" { - prolog = prefix + " " + prolog - } - rest := message[1:] - - // msg := fmt.Sprintf(message...) - fmt.Printf("Compariing level %d <= %d\n", level, l.level) - if level <= l.level { - logger.Printf(prolog, rest...) - } - return l.Stop() -} diff --git a/kubernetes/pkg/fluxnetes/podgroup/podgroup.go b/kubernetes/pkg/fluxnetes/podgroup/podgroup.go deleted file mode 100644 index de90afa..0000000 --- a/kubernetes/pkg/fluxnetes/podgroup/podgroup.go +++ /dev/null @@ -1,61 +0,0 @@ -/* -Copyright 2020 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "encoding/json" - "fmt" - "time" - - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/strategicpatch" - "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" -) - -// DefaultWaitTime is 60s if ScheduleTimeoutSeconds is not specified. -const DefaultWaitTime = 60 * time.Second - -// CreateMergePatch return patch generated from original and new interfaces -func CreateMergePatch(original, new interface{}) ([]byte, error) { - pvByte, err := json.Marshal(original) - if err != nil { - return nil, err - } - cloneByte, err := json.Marshal(new) - if err != nil { - return nil, err - } - patch, err := strategicpatch.CreateTwoWayMergePatch(pvByte, cloneByte, original) - if err != nil { - return nil, err - } - return patch, nil -} - -// GetPodGroupLabel get pod group name from pod labels -func GetPodGroupLabel(pod *v1.Pod) string { - return pod.Labels[v1alpha1.PodGroupLabel] -} - -// GetPodGroupFullName get namespaced group name from pod labels -func GetPodGroupFullName(pod *v1.Pod) string { - pgName := GetPodGroupLabel(pod) - if len(pgName) == 0 { - return "" - } - return fmt.Sprintf("%v/%v", pod.Namespace, pgName) -} diff --git a/kubernetes/pkg/fluxnetes/podgroup/resource.go b/kubernetes/pkg/fluxnetes/podgroup/resource.go deleted file mode 100644 index aee05f9..0000000 --- a/kubernetes/pkg/fluxnetes/podgroup/resource.go +++ /dev/null @@ -1,77 +0,0 @@ -/* -Copyright 2020 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package util - -import ( - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" - "k8s.io/kubernetes/pkg/scheduler/framework" -) - -// ResourceList returns a resource list of this resource. -// Note: this code used to exist in k/k, but removed in k/k#101465. -func ResourceList(r *framework.Resource) v1.ResourceList { - result := v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(r.MilliCPU, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(r.Memory, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(int64(r.AllowedPodNumber), resource.BinarySI), - v1.ResourceEphemeralStorage: *resource.NewQuantity(r.EphemeralStorage, resource.BinarySI), - } - for rName, rQuant := range r.ScalarResources { - if v1helper.IsHugePageResourceName(rName) { - result[rName] = *resource.NewQuantity(rQuant, resource.BinarySI) - } else { - result[rName] = *resource.NewQuantity(rQuant, resource.DecimalSI) - } - } - return result -} - -// GetPodEffectiveRequest gets the effective request resource of a pod to the origin resource. -// The Pod's effective request is the higher of: -// - the sum of all app containers(spec.Containers) request for a resource. -// - the effective init containers(spec.InitContainers) request for a resource. -// The effective init containers request is the highest request on all init containers. -func GetPodEffectiveRequest(pod *v1.Pod) v1.ResourceList { - initResources := make(v1.ResourceList) - resources := make(v1.ResourceList) - - for _, container := range pod.Spec.InitContainers { - for name, quantity := range container.Resources.Requests { - if q, ok := initResources[name]; ok && quantity.Cmp(q) <= 0 { - continue - } - initResources[name] = quantity - } - } - for _, container := range pod.Spec.Containers { - for name, quantity := range container.Resources.Requests { - if q, ok := resources[name]; ok { - quantity.Add(q) - } - resources[name] = quantity - } - } - for name, quantity := range initResources { - if q, ok := resources[name]; ok && quantity.Cmp(q) <= 0 { - continue - } - resources[name] = quantity - } - return resources -} diff --git a/kubernetes/pkg/fluxnetes/podspec/podspec.go b/kubernetes/pkg/fluxnetes/podspec/podspec.go index 919d636..636c5bd 100644 --- a/kubernetes/pkg/fluxnetes/podspec/podspec.go +++ b/kubernetes/pkg/fluxnetes/podspec/podspec.go @@ -1,19 +1,3 @@ -/* -Copyright 2022 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - package podspec import ( @@ -24,8 +8,7 @@ import ( pb "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/fluxion-grpc" ) -// TODO this package should be renamed something related to a PodSpec Info - +// TODO delete this function to be replaced by one above // getPodJobpsecLabels looks across labels and returns those relevant // to a jobspec func getPodJobspecLabels(pod *v1.Pod) []string { diff --git a/kubernetes/pkg/fluxnetes/queries/queries.go b/kubernetes/pkg/fluxnetes/queries/queries.go new file mode 100644 index 0000000..62b3d8f --- /dev/null +++ b/kubernetes/pkg/fluxnetes/queries/queries.go @@ -0,0 +1,19 @@ +package queries + +// Queries used by the main queue (and shared across strategies sometimes) +const ( + GetTimestampQuery = "select created_at from pods_provisional where group_name=$1 limit 1" + GetPodQuery = "select * from pods_provisional where group_name=$1 and namespace=$2 and name=$3" + InsertPodQuery = "insert into pods_provisional (podspec, namespace, name, created_at, group_name, group_size) values ($1, $2, $3, $4, $5, $6)" + CountPodsQuery = "select count(*) from pods_provisional where group_name=$1" + UpdateNodesQuery = "update river_job set args = jsonb_set(args, '{nodes}', to_jsonb($1::text)) where id=$2;" + + // This could use improvement from someone good at SQL. We need to: + // 1. Select groups for which the size >= the number of pods we've seen + // 2. Then get the group_name, group_size, and podspec for each (this goes to scheduler) + // 3. Delete all from the table + // Ensure we are sorting by the timestamp when they were added (should be DESC I think) + SelectGroupsAtSizeQuery = "select group_name from pods_provisional group by group_name, group_size having group_size >= count(*);" + SelectGroupsQuery = "select group_name, group_size, podspec from pods_provisional where group_name in ('%s');" + DeleteGroupsQuery = "delete from pods_provisional where group_name in ('%s');" +) diff --git a/kubernetes/pkg/fluxnetes/queue.go b/kubernetes/pkg/fluxnetes/queue.go new file mode 100644 index 0000000..db3a60d --- /dev/null +++ b/kubernetes/pkg/fluxnetes/queue.go @@ -0,0 +1,203 @@ +package fluxnetes + +import ( + "context" + "encoding/json" + "log/slog" + "os" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + klog "k8s.io/klog/v2" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/jackc/pgx/v5/pgtype" + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + + groups "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/group" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries" + strategies "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy" +) + +const ( + queueMaxWorkers = 10 +) + +// Queue holds handles to queue database and event handles +// The database Pool also allows interacting with the pods table (database.go) +type Queue struct { + Pool *pgxpool.Pool + riverClient *river.Client[pgx.Tx] + EventChannels []*QueueEvent + Strategy strategies.QueueStrategy +} + +type ChannelFunction func() + +// QueueEvent holds the channel and defer function +type QueueEvent struct { + Channel <-chan *river.Event + Function ChannelFunction +} + +// NewQueue starts a new queue with a river client +func NewQueue(ctx context.Context) (*Queue, error) { + dbPool, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL")) + if err != nil { + return nil, err + } + + // The default strategy now mirrors what fluence with 
Kubernetes does + // This can eventually be customizable + strategy := strategies.FCFSBackfill{} + workers := river.NewWorkers() + + // Each strategy has its own worker type + strategy.AddWorkers(workers) + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Logger: slog.Default().With("id", "Fluxnetes"), + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: queueMaxWorkers}, + }, + Workers: workers, + }) + if err != nil { + return nil, err + } + + // Create the queue and setup events for it + err = riverClient.Start(ctx) + if err != nil { + return nil, err + } + queue := Queue{riverClient: riverClient, Pool: dbPool, Strategy: strategy} + queue.setupEvents() + return &queue, nil +} + +// StopQueue creates a client (without calling start) only intended to +// issue stop, so we can leave out workers and queue from Config +func (q *Queue) Stop(ctx context.Context) error { + if q.riverClient != nil { + return q.riverClient.Stop(ctx) + } + return nil +} + +// We can tell how a job runs via events +// setupEvents create subscription channels for each event type +func (q *Queue) setupEvents() { + + q.EventChannels = []*QueueEvent{} + + // Subscribers tell the River client the kinds of events they'd like to receive. + // We add them to a listing to be used by Kubernetes. These can be subscribed + // to from elsewhere too (anywhere). Note that we are not subscribing to failed + // or snoozed, because they right now mean "allocation not possible" and that + // is too much noise. + for _, event := range []river.EventKind{ + river.EventKindJobCompleted, + river.EventKindJobCancelled, + // river.EventKindJobFailed, + // river.EventKindJobSnoozed, + } { + c, trigger := q.riverClient.Subscribe(event) + channel := &QueueEvent{Function: trigger, Channel: c} + q.EventChannels = append(q.EventChannels, channel) + } +} + +// Enqueue a new job to the provisional queue +// 1. Assemble (discover or define) the group +// 2. Add to provisional table +func (q *Queue) Enqueue(ctx context.Context, pod *corev1.Pod) error { + + // Get the pod name and size, first from labels, then defaults + groupName := groups.GetPodGroupName(pod) + size, err := groups.GetPodGroupSize(pod) + if err != nil { + return err + } + + // Get the creation timestamp for the group + ts, err := q.GetCreationTimestamp(pod, groupName) + if err != nil { + return err + } + + // Log the namespace/name, group name, and size + klog.Infof("Pod %s has Group %s (%d) created at %s", pod.Name, groupName, size, ts) + + // Add the pod to the provisional table. + group := &groups.PodGroup{Size: size, Name: groupName, Timestamp: ts} + return q.enqueueProvisional(ctx, pod, group) +} + +// EnqueueProvisional adds a pod to the provisional queue +func (q *Queue) enqueueProvisional(ctx context.Context, pod *corev1.Pod, group *groups.PodGroup) error { + + // This query will fail if there are no rows (the podGroup is not known) + var groupName, name, namespace string + row := q.Pool.QueryRow(context.Background(), queries.GetPodQuery, group.Name, pod.Namespace, pod.Name) + err := row.Scan(groupName, name, namespace) + + // We didn't find the pod in the table - add it. + if err != nil { + klog.Errorf("Did not find pod %s in group %s in table", pod.Name, group) + + // Prepare timestamp and podspec for insertion... 
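+		// We store the group timestamp as a Postgres timestamptz, and we serialize the
+		// whole pod to JSON so the worker can later rebuild a corev1.Pod from it.
+		// (A possible simplification, assuming a unique constraint on
+		// (group_name, namespace, name), would be a single
+		// "INSERT ... ON CONFLICT DO NOTHING" in place of this lookup plus insert.)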
+ ts := &pgtype.Timestamptz{Time: group.Timestamp.Time, Valid: true} + podspec, err := json.Marshal(pod) + if err != nil { + return err + } + _, err = q.Pool.Query(ctx, queries.InsertPodQuery, string(podspec), pod.Namespace, pod.Name, ts, group.Name, group.Size) + + // Show the user a success or an error, we return it either way + if err != nil { + klog.Infof("Error inserting provisional pod %s", err) + } + return err + } + return err +} + +// Schedule moves jobs from provisional to work queue +// This is based on a queue strategy. The default is fcfs with backfill. +// This mimics what Kubernetes does. Note that jobs can be sorted +// based on the scheduled at time AND priority. +func (q *Queue) Schedule(ctx context.Context) error { + + // Queue Strategy "Schedule" moves provional to the worker queue + // We get them back in a back to schedule + batch, err := q.Strategy.Schedule(ctx, q.Pool) + if err != nil { + return err + } + + count, err := q.riverClient.InsertMany(ctx, batch) + if err != nil { + return err + } + klog.Infof("[Fluxnetes] Schedule inserted %d jobs\n", count) + return nil +} + +// GetCreationTimestamp returns the creation time of a podGroup or a pod in seconds (time.MicroTime) +// We either get this from the pod itself (if size 1) or from the database +func (q *Queue) GetCreationTimestamp(pod *corev1.Pod, groupName string) (metav1.MicroTime, error) { + + // First see if we've seen the group before, the creation times are shared across a group + ts := metav1.MicroTime{} + + // This query will fail if there are no rows (the podGroup is not known) + err := q.Pool.QueryRow(context.Background(), queries.GetTimestampQuery, groupName).Scan(&ts) + if err == nil { + klog.Info("Creation timestamp is", ts) + return ts, err + } + return groups.GetPodCreationTimestamp(pod), nil +} diff --git a/kubernetes/pkg/fluxnetes/register.go b/kubernetes/pkg/fluxnetes/register.go deleted file mode 100644 index 0fd36ff..0000000 --- a/kubernetes/pkg/fluxnetes/register.go +++ /dev/null @@ -1,55 +0,0 @@ -/* -Copyright 2020 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package fluxnetes - -import ( - "context" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" -) - -// RegisterExisting uses the in cluster API to ensure existing pods -// are known to fluxnetes, This is a one-time, static approach, so if a resource -// here goes away we cannot remove it from being known. But it's better than -// not having it, and having fluxion assume more resources than the -// cluster has available. 
This is a TODO as fluxion does not support it -func (fluxnetes *Fluxnetes) RegisterExisting(ctx context.Context) error { - - // creates an in-cluster config and client - config, err := rest.InClusterConfig() - if err != nil { - fluxnetes.log.Error("[Fluxnetes RegisterExisting] Error creating in-cluster config: %s\n", err) - return err - } - // creates the clientset - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - fluxnetes.log.Error("[Fluxnetes RegisterExisting] Error creating client for config: %s\n", err) - return err - } - // get pods in all the namespaces by omitting namespace - // Or specify namespace to get pods in particular namespace - pods, err := clientset.CoreV1().Pods("").List(ctx, metav1.ListOptions{}) - if err != nil { - fluxnetes.log.Info("[Fluxnetes RegisterExisting] Error listing pods: %s\n", err) - return err - } - fluxnetes.log.Info("[Fluxnetes RegisterExisting] Found %d existing pods in the cluster\n", len(pods.Items)) - return nil -} diff --git a/kubernetes/pkg/fluxnetes/strategy/fcfs.go b/kubernetes/pkg/fluxnetes/strategy/fcfs.go new file mode 100644 index 0000000..58e0d53 --- /dev/null +++ b/kubernetes/pkg/fluxnetes/strategy/fcfs.go @@ -0,0 +1,159 @@ +package strategy + +import ( + "context" + "fmt" + "strings" + + klog "k8s.io/klog/v2" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/riverqueue/river" + + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/defaults" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries" + work "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/strategy/workers" +) + +// FCFS with Backfill +// Schedule jobs that come in first, but allow smaller jobs to fill in +type FCFSBackfill struct{} + +// Name returns shortened "first come first serve" +func (FCFSBackfill) Name() string { + return "fcfs-backfill" +} + +// Database Model we are retrieving for the Schedule function +// We will eventually want more than these three +type JobModel struct { + GroupName string `db:"group_name"` + GroupSize int32 `db:"group_size"` + Podspec string `db:"podspec"` + // CreatedAt time.Time `db:"created_at"` +} + +// GetWorker returns the worker for the queue strategy +// TODO(vsoch) rename this to be more specific for the worker type +func (FCFSBackfill) AddWorkers(workers *river.Workers) { + river.AddWorker(workers, &work.JobWorker{}) +} + +// queryGroupsAtSize returns groups that have achieved minimum size +func (s FCFSBackfill) queryGroupsAtSize(ctx context.Context, pool *pgxpool.Pool) ([]string, error) { + + // First retrieve the group names that are the right size + rows, err := pool.Query(ctx, queries.SelectGroupsAtSizeQuery) + if err != nil { + return nil, err + } + defer rows.Close() + + // Collect rows into single result + groupNames, err := pgx.CollectRows(rows, pgx.RowTo[string]) + klog.Infof("GROUP NAMES %s", groupNames) + return groupNames, err +} + +// queryGroupsAtSize returns groups that have achieved minimum size +func (s FCFSBackfill) deleteGroups(ctx context.Context, pool *pgxpool.Pool, groupNames []string) error { + + // First retrieve the group names that are the right size + query := fmt.Sprintf(queries.DeleteGroupsQuery, strings.Join(groupNames, ",")) + klog.Infof("DELETE %s", query) + rows, err := pool.Query(ctx, query) + if err != nil { + return err + } + defer rows.Close() + return err +} + +// queryGroupsAtSize returns groups that have achieved minimum size +func (s FCFSBackfill) getGroupsAtSize(ctx context.Context, pool *pgxpool.Pool, 
groupNames []string) ([]work.JobArgs, error) { + + // Now we need to collect all the pods that match that. + query := fmt.Sprintf(queries.SelectGroupsQuery, strings.Join(groupNames, "','")) + klog.Infof("GET %s", query) + rows, err := pool.Query(ctx, query) + if err != nil { + return nil, err + } + defer rows.Close() + + // Collect rows into map, and then slice of jobs + // The map whittles down the groups into single entries + // We will eventually not want to do that, assuming podspecs are different in a group + jobs := []work.JobArgs{} + lookup := map[string]work.JobArgs{} + + // Collect rows into single result + models, err := pgx.CollectRows(rows, pgx.RowToStructByName[JobModel]) + + // TODO(vsoch) we need to collect all podspecs here and be able to give that to the worker + for _, model := range models { + jobArgs := work.JobArgs{GroupName: model.GroupName, Podspec: model.Podspec, GroupSize: model.GroupSize} + lookup[model.GroupName] = jobArgs + } + for _, jobArgs := range lookup { + jobs = append(jobs, jobArgs) + } + return jobs, nil +} + +// queryReady checks if the number of pods we know of is >= required group size +// TODO(vsoch) This currently returns the entire table, and the sql needs to be tweaked +func (s FCFSBackfill) queryReady(ctx context.Context, pool *pgxpool.Pool) ([]work.JobArgs, error) { + + // 1. Get the list of group names that have pod count >= their size + groupNames, err := s.queryGroupsAtSize(ctx, pool) + if err != nil { + return nil, err + } + + // 2. Now we need to collect all the pods that match that. + jobs, err := s.getGroupsAtSize(ctx, pool, groupNames) + if err != nil { + return nil, err + } + + // 3. Finally, we need to delete them from provisional + err = s.deleteGroups(ctx, pool, groupNames) + return jobs, err +} + +// Schedule moves pod groups from provisional to workers based on a strategy. +// We return a listing of river.JobArgs (JobArgs here) to be submit with batch. +// In this case it is first come first serve - we just sort based on the timestamp +// and add them to the worker queue. They run with they can, with smaller +// jobs being allowed to fill in. Other strategies will need to handle AskFlux +// and submitting batch differently. +func (s FCFSBackfill) Schedule(ctx context.Context, pool *pgxpool.Pool) ([]river.InsertManyParams, error) { + + // Is this group ready to be scheduled with the addition of this pod? + jobs, err := s.queryReady(ctx, pool) + if err != nil { + klog.Errorf("Issue FCFS with backfill querying for ready groups", err) + return nil, err + } + + // Shared insertOpts. + // Tags can eventually be specific to job attributes, queues, etc. + insertOpts := river.InsertOpts{ + MaxAttempts: defaults.MaxAttempts, + Tags: []string{s.Name()}, + } + + // https://riverqueue.com/docs/batch-job-insertion + // Note: this is how to eventually add Priority (1-4, 4 is lowest) + // And we can customize other InsertOpts. Of interest is Pending: + // https://github.com/riverqueue/river/blob/master/insert_opts.go#L35-L40 + // Note also that ScheduledAt can be used for a reservation! 
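+	// As a sketch only (not enabled here), a reservation-style insert could look like:
+	//
+	//	insertOpts := river.InsertOpts{
+	//		MaxAttempts: defaults.MaxAttempts,
+	//		Priority:    1,                                // 1 is highest, 4 is lowest
+	//		ScheduledAt: time.Now().Add(30 * time.Second), // hold the group until this time
+	//		Tags:        []string{s.Name()},
+	//	}
+	//
+	// where the 30 second delay is an arbitrary placeholder, not something fluxion gives us yet.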
+ batch := []river.InsertManyParams{} + for _, jobArgs := range jobs { + args := river.InsertManyParams{Args: jobArgs, InsertOpts: &insertOpts} + batch = append(batch, args) + } + return batch, err +} diff --git a/kubernetes/pkg/fluxnetes/strategy/strategy.go b/kubernetes/pkg/fluxnetes/strategy/strategy.go new file mode 100644 index 0000000..c9b2e53 --- /dev/null +++ b/kubernetes/pkg/fluxnetes/strategy/strategy.go @@ -0,0 +1,22 @@ +package strategy + +import ( + "context" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/riverqueue/river" +) + +// Interface for a queue strategy +// A queue strategy both controls a work function (what the worker does, and arguments) +// Along with how to orchestrate the last part of the schedule loop, schedule, which +// moves pods from provisional (waiting for groups to be ready) into worker queues + +// We currently just return a name, and provide a schedule function to move things around! +type QueueStrategy interface { + Name() string + + // provide the entire queue to interact with + Schedule(ctx context.Context, pool *pgxpool.Pool) ([]river.InsertManyParams, error) + AddWorkers(*river.Workers) +} diff --git a/kubernetes/pkg/fluxnetes/strategy/workers/workers.go b/kubernetes/pkg/fluxnetes/strategy/workers/workers.go new file mode 100644 index 0000000..b235cdb --- /dev/null +++ b/kubernetes/pkg/fluxnetes/strategy/workers/workers.go @@ -0,0 +1,136 @@ +package workers + +import ( + "context" + "encoding/json" + "os" + "strings" + "time" + + "github.com/jackc/pgx/v5/pgxpool" + "google.golang.org/grpc" + + corev1 "k8s.io/api/core/v1" + klog "k8s.io/klog/v2" + + pb "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/fluxion-grpc" + + "github.com/riverqueue/river" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/podspec" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes/queries" +) + +type JobArgs struct { + + // Submit Args + Jobspec string `json:"jobspec"` + Podspec string `json:"podspec"` + GroupName string `json:"groupName"` + GroupSize int32 `json:"groupSize"` + + // Nodes return to Kubernetes to bind, and MUST + // have attributes for the Nodes and Podspecs. + // We can eventually have a kubectl command + // to get a job too ;) + Nodes string `json:"nodes"` + FluxJob int64 `json:"jobid"` + PodId string `json:"podid"` +} + +// The Kind MUST correspond to the Args and Worker +func (args JobArgs) Kind() string { return "job" } + +type JobWorker struct { + river.WorkerDefaults[JobArgs] +} + +// Work performs the AskFlux action. Cases include: +// Allocated: the job was successful and does not need to be re-queued. We return nil (completed) +// NotAllocated: the job cannot be allocated and needs to be requeued +// Not possible for some reason, likely needs a cancel +// Are there cases of scheduling out into the future further? +// See https://riverqueue.com/docs/snoozing-jobs +func (w JobWorker) Work(ctx context.Context, job *river.Job[JobArgs]) error { + klog.Infof("[WORKER] JobStatus Running for group %s", job.Args.GroupName) + + // Convert jobspec back to json, and then pod + var pod corev1.Pod + err := json.Unmarshal([]byte(job.Args.Podspec), &pod) + if err != nil { + return err + } + + // IMPORTANT: this is a JobSpec for *one* pod, assuming they are all the same. + // This obviously may not be true if we have a hetereogenous PodGroup. 
+ // We name it based on the group, since it will represent the group + jobspec := podspec.PreparePodJobSpec(&pod, job.Args.GroupName) + klog.Infof("Prepared pod jobspec %s", jobspec) + + // Connect to the Fluxion service. Returning an error means we retry + // see: https://riverqueue.com/docs/job-retries + conn, err := grpc.Dial("127.0.0.1:4242", grpc.WithInsecure()) + if err != nil { + klog.Error("[Fluxnetes] AskFlux error connecting to server: %v\n", err) + return err + } + defer conn.Close() + + // Let's ask Flux if we can allocate the job! + fluxion := pb.NewFluxionServiceClient(conn) + _, cancel := context.WithTimeout(context.Background(), 200*time.Second) + defer cancel() + + // Prepare the request to allocate. + request := &pb.MatchRequest{ + Ps: jobspec, + Request: "allocate", + Count: job.Args.GroupSize, + } + + // An error here is an error with making the request + response, err := fluxion.Match(context.Background(), request) + if err != nil { + klog.Error("[Fluxnetes] AskFlux did not receive any match response: %v\n", err) + return err + } + klog.Info("Fluxion response %s", response) + + // These don't actually update, eventually we can update them also in the database update + // We update the "Args" of the job to pass the node assignment back to the scheduler + // job.Args.PodId = response.GetPodID() + // job.Args.FluxJob = response.GetJobID() + + // Get the nodelist and serialize into list of strings for job args + nodelist := response.GetNodelist() + nodes := []string{} + for _, node := range nodelist { + nodes = append(nodes, node.NodeID) + } + nodeStr := strings.Join(nodes, ",") + + // We must update the database with nodes from here with a query + // This will be sent back to the Kubernetes scheduler + pool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL")) + if err != nil { + return err + } + + rows, err := pool.Query(ctx, queries.UpdateNodesQuery, nodeStr, job.ID) + if err != nil { + return err + } + defer rows.Close() + + // Collect rows into single result + // pgx.CollectRows(rows, pgx.RowTo[string]) + // klog.Infof("Values: %s", values) + + klog.Infof("[Fluxnetes] nodes allocated %s for flux job id %d\n", nodeStr, job.Args.FluxJob) + return nil +} + +// If needed, to get a client from a worker (to submit more jobs) +// client, err := river.ClientFromContextSafely[pgx.Tx](ctx) +// if err != nil { +// return fmt.Errorf("error getting client from context: %w", err) +// } diff --git a/kubernetes/pkg/fluxnetes/types/types.go b/kubernetes/pkg/fluxnetes/types/types.go deleted file mode 100644 index ae77ef1..0000000 --- a/kubernetes/pkg/fluxnetes/types/types.go +++ /dev/null @@ -1,136 +0,0 @@ -/* -Copyright 2020 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package types - -import ( - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -// PodGroupPhase is the phase of a pod group at the current time. -type PodGroupPhase string - -// These are the valid phase of podGroups. 
-const ( - // PodGroupPending means the pod group has been accepted by the system, but scheduler can not allocate - // enough resources to it. - PodGroupPending PodGroupPhase = "Pending" - - // PodGroupRunning means the `spec.minMember` pods of the pod group are in running phase. - PodGroupRunning PodGroupPhase = "Running" - - // PodGroupScheduling means the number of pods scheduled is bigger than `spec.minMember` - // but the number of running pods has not reached the `spec.minMember` pods of PodGroups. - PodGroupScheduling PodGroupPhase = "Scheduling" - - // PodGroupUnknown means a part of `spec.minMember` pods of the pod group have been scheduled but the others can not - // be scheduled due to, e.g. not enough resource; scheduler will wait for related controllers to recover them. - PodGroupUnknown PodGroupPhase = "Unknown" - - // PodGroupFinished means the `spec.minMember` pods of the pod group are successfully finished. - PodGroupFinished PodGroupPhase = "Finished" - - // PodGroupFailed means at least one of `spec.minMember` pods have failed. - PodGroupFailed PodGroupPhase = "Failed" - - // PodGroupLabel is the default label of coscheduling - PodGroupLabel = "scheduling.x-k8s.io/pod-group" -) - -// PodGroup is a collection of Pod; used for batch workload. -// +genclient -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object -// +kubebuilder:object:root=true -// +kubebuilder:resource:shortName={pg,pgs} -// +kubebuilder:subresource:status -// +kubebuilder:metadata:annotations="api-approved.kubernetes.io=https://github.com/kubernetes-sigs/scheduler-plugins/pull/50" -type PodGroup struct { - metav1.TypeMeta `json:",inline"` - // Standard object's metadata. - // +optional - metav1.ObjectMeta `json:"metadata,omitempty"` - - // Specification of the desired behavior of the pod group. - // +optional - Spec PodGroupSpec `json:"spec,omitempty"` - - // Status represents the current information about a pod group. - // This data may not be up to date. - // +optional - Status PodGroupStatus `json:"status,omitempty"` -} - -// PodGroupSpec represents the template of a pod group. -type PodGroupSpec struct { - // MinMember defines the minimal number of members/tasks to run the pod group; - // if there's not enough resources to start all tasks, the scheduler - // will not start any. - MinMember int32 `json:"minMember,omitempty"` - - // MinResources defines the minimal resource of members/tasks to run the pod group; - // if there's not enough resources to start all tasks, the scheduler - // will not start any. - MinResources v1.ResourceList `json:"minResources,omitempty"` - - // ScheduleTimeoutSeconds defines the maximal time of members/tasks to wait before run the pod group; - ScheduleTimeoutSeconds *int32 `json:"scheduleTimeoutSeconds,omitempty"` -} - -// PodGroupStatus represents the current state of a pod group. -type PodGroupStatus struct { - // Current phase of PodGroup. - Phase PodGroupPhase `json:"phase,omitempty"` - - // OccupiedBy marks the workload (e.g., deployment, statefulset) UID that occupy the podgroup. - // It is empty if not initialized. - OccupiedBy string `json:"occupiedBy,omitempty"` - - // The number of actively running pods. - // +optional - Running int32 `json:"running,omitempty"` - - // The number of pods which reached phase Succeeded. - // +optional - Succeeded int32 `json:"succeeded,omitempty"` - - // The number of pods which reached phase Failed. 
- // +optional - Failed int32 `json:"failed,omitempty"` - - // CreationTime is intended to mock the object CreationTime, - // but set by us to be MicroTime instead of Time. - // +optional - CreationTime metav1.MicroTime `json:"creationTime,omitempty"` - - // ScheduleStartTime of the group is when we want to start counting - // "at time N plus 48 hours, this is when we deem time waited is too long" - // +optional - ScheduleStartTime metav1.MicroTime `json:"scheduleStartTime,omitempty"` -} - -// +kubebuilder:object:root=true - -// PodGroupList is a collection of pod groups. -type PodGroupList struct { - metav1.TypeMeta `json:",inline"` - // Standard list metadata - // +optional - metav1.ListMeta `json:"metadata,omitempty"` - - // Items is the list of PodGroup - Items []PodGroup `json:"items"` -} diff --git a/kubernetes/pkg/scheduler/schedule_one.go b/kubernetes/pkg/scheduler/schedule_one.go index 02f7916..d42e0d7 100644 --- a/kubernetes/pkg/scheduler/schedule_one.go +++ b/kubernetes/pkg/scheduler/schedule_one.go @@ -95,43 +95,23 @@ func (sched *Scheduler) ScheduleOne(ctx context.Context) { return } - logger.V(3).Info("Attempting to schedule pod", "pod", klog.KObj(pod)) - - // Synchronously attempt to find a fit for the pod. - start := time.Now() - state := framework.NewCycleState() - state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent) - - // Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty. - podsToActivate := framework.NewPodsToActivate() - state.Write(framework.PodsToActivateKey, podsToActivate) - - schedulingCycleCtx, cancel := context.WithCancel(ctx) - defer cancel() - - scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate) - if !status.IsSuccess() { - sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start) - return + // Add the pod to the provisional queue + fluxCtx, cancelEnqueue := context.WithCancel(ctx) + defer cancelEnqueue() + err = sched.Queue.Enqueue(fluxCtx, pod) + if err != nil { + logger.Error(err, "Issue with fluxnetes Enqueue") } - // bind the pod to its host asynchronously (we can do this b/c of the assumption step above). - go func() { - bindingCycleCtx, cancel := context.WithCancel(ctx) - defer cancel() - - metrics.Goroutines.WithLabelValues(metrics.Binding).Inc() - defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec() - - status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate) - if !status.IsSuccess() { - sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status) - return - } - // Usually, DonePod is called inside the scheduling queue, - // but in this case, we need to call it here because this Pod won't go back to the scheduling queue. - sched.SchedulingQueue.Done(assumedPodInfo.Pod.UID) - }() + // TODO(vsoch): the schedulingCycle should be run here, and we should save everything we need for either + // the bindingCycle directly, OR information to pass to fluxion (nodes to skip, resources needed, etc) + // Right now I'm running it "faux" before the binding cycle, but not using any results, mainly to + // populate information about volumes, etc. 
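+	// A rough sketch of that future call (not active yet), mirroring the original
+	// synchronous path and how the queue event handler in scheduler.go drives it:
+	//
+	//	state := framework.NewCycleState()
+	//	podsToActivate := framework.NewPodsToActivate()
+	//	_, _, _ = sched.schedulingCycle(ctx, state, fwk, podInfo, time.Now(), podsToActivate)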
+ // Move from provisional to worker queue to schedule via events + err = sched.Queue.Schedule(fluxCtx) + if err != nil { + logger.Error(err, "Issue with fluxnetes Schedule") + } } var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""} @@ -191,6 +171,9 @@ func (sched *Scheduler) schedulingCycle( } metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) + + // NOTE: when refactored, this needs to be given the node from fluence + // Right now we get a message about mismatch between scheduled and assumed // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet. // This allows us to keep scheduling without waiting on binding to occur. assumedPodInfo := podInfo.DeepCopy() @@ -251,14 +234,6 @@ func (sched *Scheduler) schedulingCycle( return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, runPermitStatus } - - // At the end of a successful scheduling cycle, pop and move up Pods if needed. - if len(podsToActivate.Map) != 0 { - sched.SchedulingQueue.Activate(logger, podsToActivate.Map) - // Clear the entries after activation. - podsToActivate.Map = make(map[string]*v1.Pod) - } - return scheduleResult, assumedPodInfo, nil } @@ -275,22 +250,7 @@ func (sched *Scheduler) bindingCycle( assumedPod := assumedPodInfo.Pod - // Run "permit" plugins. - if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() { - if status.IsRejected() { - fitErr := &framework.FitError{ - NumAllNodes: 1, - Pod: assumedPodInfo.Pod, - Diagnosis: framework.Diagnosis{ - NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: status}, - UnschedulablePlugins: sets.New(status.Plugin()), - }, - } - return framework.NewStatus(status.Code()).WithError(fitErr) - } - return status - } - + // NOTE: permit plugins removed from here // Run "prebind" plugins. if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() { if status.IsRejected() { @@ -314,22 +274,12 @@ func (sched *Scheduler) bindingCycle( // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2. logger.V(2).Info("Successfully bound pod to node", "pod", klog.KObj(assumedPod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes) - metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start)) - metrics.PodSchedulingAttempts.Observe(float64(assumedPodInfo.Attempts)) - if assumedPodInfo.InitialAttemptTimestamp != nil { - metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp)) - metrics.PodSchedulingSLIDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp)) - } + + // Note(vsoch): metrics removed from here. // Run "postbind" plugins. fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) - // At the end of a successful binding cycle, move up Pods if needed. - if len(podsToActivate.Map) != 0 { - sched.SchedulingQueue.Activate(logger, podsToActivate.Map) - // Unlike the logic in schedulingCycle(), we don't bother deleting the entries - // as `podsToActivate.Map` is no longer consumed. - } - + // Note(vsoch): pods to activate update removed from here. 
return nil } diff --git a/kubernetes/pkg/scheduler/scheduler.go b/kubernetes/pkg/scheduler/scheduler.go new file mode 100644 index 0000000..a689630 --- /dev/null +++ b/kubernetes/pkg/scheduler/scheduler.go @@ -0,0 +1,684 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/riverqueue/river" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/dynamic/dynamicinformer" + "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + configv1 "k8s.io/kube-scheduler/config/v1" + "k8s.io/kubernetes/pkg/features" + schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" + frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/fluxnetes" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" + frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache" + cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger" + internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue" + "k8s.io/kubernetes/pkg/scheduler/metrics" + "k8s.io/kubernetes/pkg/scheduler/profile" + "k8s.io/kubernetes/pkg/scheduler/util/assumecache" +) + +const ( + // Duration the scheduler will wait before expiring an assumed pod. + // See issue #106361 for more details about this parameter and its value. + durationToExpireAssumedPod time.Duration = 0 +) + +// ErrNoNodesAvailable is used to describe the error that no nodes available to schedule pods. +var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods") + +// Scheduler watches for new unscheduled pods. It attempts to find +// nodes that they fit on and writes bindings back to the api server. +type Scheduler struct { + // It is expected that changes made via Cache will be observed + // by NodeLister and Algorithm. + Cache internalcache.Cache + + Extenders []framework.Extender + + // Fluxnetes queue + Queue *fluxnetes.Queue + + // NextPod should be a function that blocks until the next pod + // is available. We don't use a channel for this, because scheduling + // a pod may take some amount of time and we don't want pods to get + // stale while they sit in a channel. + NextPod func(logger klog.Logger) (*framework.QueuedPodInfo, error) + + // FailureHandler is called upon a scheduling failure. 
+ FailureHandler FailureHandlerFn + + // SchedulePod tries to schedule the given pod to one of the nodes in the node list. + // Return a struct of ScheduleResult with the name of suggested host on success, + // otherwise will return a FitError with reasons. + SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) + + // Close this to shut down the scheduler. + StopEverything <-chan struct{} + + // SchedulingQueue holds pods to be scheduled + SchedulingQueue internalqueue.SchedulingQueue + + // Profiles are the scheduling profiles. + Profiles profile.Map + + client clientset.Interface + + nodeInfoSnapshot *internalcache.Snapshot + + percentageOfNodesToScore int32 + + nextStartNodeIndex int + + // logger *must* be initialized when creating a Scheduler, + // otherwise logging functions will access a nil sink and + // panic. + logger klog.Logger + + // registeredHandlers contains the registrations of all handlers. It's used to check if all handlers have finished syncing before the scheduling cycles start. + registeredHandlers []cache.ResourceEventHandlerRegistration +} + +func (sched *Scheduler) applyDefaultHandlers() { + sched.SchedulePod = sched.schedulePod + sched.FailureHandler = sched.handleSchedulingFailure +} + +type schedulerOptions struct { + componentConfigVersion string + kubeConfig *restclient.Config + // Overridden by profile level percentageOfNodesToScore if set in v1. + percentageOfNodesToScore int32 + podInitialBackoffSeconds int64 + podMaxBackoffSeconds int64 + podMaxInUnschedulablePodsDuration time.Duration + // Contains out-of-tree plugins to be merged with the in-tree registry. + frameworkOutOfTreeRegistry frameworkruntime.Registry + profiles []schedulerapi.KubeSchedulerProfile + extenders []schedulerapi.Extender + frameworkCapturer FrameworkCapturer + parallelism int32 + applyDefaultProfile bool +} + +// Option configures a Scheduler +type Option func(*schedulerOptions) + +// ScheduleResult represents the result of scheduling a pod. +type ScheduleResult struct { + // Name of the selected node. + SuggestedHost string + // The number of nodes the scheduler evaluated the pod against in the filtering + // phase and beyond. + EvaluatedNodes int + // The number of nodes out of the evaluated ones that fit the pod. + FeasibleNodes int + // The nominating info for scheduling cycle. + nominatingInfo *framework.NominatingInfo +} + +// WithComponentConfigVersion sets the component config version to the +// KubeSchedulerConfiguration version used. The string should be the full +// scheme group/version of the external type we converted from (for example +// "kubescheduler.config.k8s.io/v1") +func WithComponentConfigVersion(apiVersion string) Option { + return func(o *schedulerOptions) { + o.componentConfigVersion = apiVersion + } +} + +// WithKubeConfig sets the kube config for Scheduler. +func WithKubeConfig(cfg *restclient.Config) Option { + return func(o *schedulerOptions) { + o.kubeConfig = cfg + } +} + +// WithProfiles sets profiles for Scheduler. By default, there is one profile +// with the name "default-scheduler". +func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option { + return func(o *schedulerOptions) { + o.profiles = p + o.applyDefaultProfile = false + } +} + +// WithParallelism sets the parallelism for all scheduler algorithms. Default is 16. 
+func WithParallelism(threads int32) Option { + return func(o *schedulerOptions) { + o.parallelism = threads + } +} + +// WithPercentageOfNodesToScore sets percentageOfNodesToScore for Scheduler. +// The default value of 0 will use an adaptive percentage: 50 - (num of nodes)/125. +func WithPercentageOfNodesToScore(percentageOfNodesToScore *int32) Option { + return func(o *schedulerOptions) { + if percentageOfNodesToScore != nil { + o.percentageOfNodesToScore = *percentageOfNodesToScore + } + } +} + +// WithFrameworkOutOfTreeRegistry sets the registry for out-of-tree plugins. Those plugins +// will be appended to the default registry. +func WithFrameworkOutOfTreeRegistry(registry frameworkruntime.Registry) Option { + return func(o *schedulerOptions) { + o.frameworkOutOfTreeRegistry = registry + } +} + +// WithPodInitialBackoffSeconds sets podInitialBackoffSeconds for Scheduler, the default value is 1 +func WithPodInitialBackoffSeconds(podInitialBackoffSeconds int64) Option { + return func(o *schedulerOptions) { + o.podInitialBackoffSeconds = podInitialBackoffSeconds + } +} + +// WithPodMaxBackoffSeconds sets podMaxBackoffSeconds for Scheduler, the default value is 10 +func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option { + return func(o *schedulerOptions) { + o.podMaxBackoffSeconds = podMaxBackoffSeconds + } +} + +// WithPodMaxInUnschedulablePodsDuration sets podMaxInUnschedulablePodsDuration for PriorityQueue. +func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option { + return func(o *schedulerOptions) { + o.podMaxInUnschedulablePodsDuration = duration + } +} + +// WithExtenders sets extenders for the Scheduler +func WithExtenders(e ...schedulerapi.Extender) Option { + return func(o *schedulerOptions) { + o.extenders = e + } +} + +// FrameworkCapturer is used for registering a notify function in building framework. +type FrameworkCapturer func(schedulerapi.KubeSchedulerProfile) + +// WithBuildFrameworkCapturer sets a notify function for getting buildFramework details. +func WithBuildFrameworkCapturer(fc FrameworkCapturer) Option { + return func(o *schedulerOptions) { + o.frameworkCapturer = fc + } +} + +var defaultSchedulerOptions = schedulerOptions{ + percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, + podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()), + podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()), + podMaxInUnschedulablePodsDuration: internalqueue.DefaultPodMaxInUnschedulablePodsDuration, + parallelism: int32(parallelize.DefaultParallelism), + // Ideally we would statically set the default profile here, but we can't because + // creating the default profile may require testing feature gates, which may get + // set dynamically in tests. Therefore, we delay creating it until New is actually + // invoked. 
+ applyDefaultProfile: true, +} + +// New returns a Scheduler +func New(ctx context.Context, + client clientset.Interface, + informerFactory informers.SharedInformerFactory, + dynInformerFactory dynamicinformer.DynamicSharedInformerFactory, + recorderFactory profile.RecorderFactory, + opts ...Option) (*Scheduler, error) { + + logger := klog.FromContext(ctx) + stopEverything := ctx.Done() + + options := defaultSchedulerOptions + for _, opt := range opts { + opt(&options) + } + + if options.applyDefaultProfile { + var versionedCfg configv1.KubeSchedulerConfiguration + scheme.Scheme.Default(&versionedCfg) + cfg := schedulerapi.KubeSchedulerConfiguration{} + if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil { + return nil, err + } + options.profiles = cfg.Profiles + } + + registry := frameworkplugins.NewInTreeRegistry() + if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil { + return nil, err + } + + metrics.Register() + + extenders, err := buildExtenders(logger, options.extenders, options.profiles) + if err != nil { + return nil, fmt.Errorf("couldn't build extenders: %w", err) + } + + podLister := informerFactory.Core().V1().Pods().Lister() + nodeLister := informerFactory.Core().V1().Nodes().Lister() + + snapshot := internalcache.NewEmptySnapshot() + metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything) + // waitingPods holds all the pods that are in the scheduler and waiting in the permit stage + waitingPods := frameworkruntime.NewWaitingPodsMap() + + var resourceClaimCache *assumecache.AssumeCache + if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) { + resourceClaimInformer := informerFactory.Resource().V1alpha2().ResourceClaims().Informer() + resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil) + } + + profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory, + frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion), + frameworkruntime.WithClientSet(client), + frameworkruntime.WithKubeConfig(options.kubeConfig), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithResourceClaimCache(resourceClaimCache), + frameworkruntime.WithSnapshotSharedLister(snapshot), + frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)), + frameworkruntime.WithParallelism(int(options.parallelism)), + frameworkruntime.WithExtenders(extenders), + frameworkruntime.WithMetricsRecorder(metricsRecorder), + frameworkruntime.WithWaitingPods(waitingPods), + ) + if err != nil { + return nil, fmt.Errorf("initializing profiles: %v", err) + } + + if len(profiles) == 0 { + return nil, errors.New("at least one profile is required") + } + + preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin) + queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile) + for profileName, profile := range profiles { + preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins() + queueingHintsPerProfile[profileName] = buildQueueingHintMap(profile.EnqueueExtensions()) + } + + podQueue := internalqueue.NewSchedulingQueue( + profiles[options.profiles[0].SchedulerName].QueueSortFunc(), + informerFactory, + internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second), + internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second), + internalqueue.WithPodLister(podLister), + 
+		internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
+		internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
+		internalqueue.WithQueueingHintMapPerProfile(queueingHintsPerProfile),
+		internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
+		internalqueue.WithMetricsRecorder(*metricsRecorder),
+	)
+
+	for _, fwk := range profiles {
+		fwk.SetPodNominator(podQueue)
+	}
+
+	schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)
+
+	// Setup cache debugger.
+	debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
+	debugger.ListenForSignal(ctx)
+
+	sched := &Scheduler{
+		Cache: schedulerCache,
+		client: client,
+		nodeInfoSnapshot: snapshot,
+		percentageOfNodesToScore: options.percentageOfNodesToScore,
+		Extenders: extenders,
+		StopEverything: stopEverything,
+		SchedulingQueue: podQueue,
+		Profiles: profiles,
+		logger: logger,
+	}
+	sched.NextPod = podQueue.Pop
+	sched.applyDefaultHandlers()
+
+	if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, resourceClaimCache, unionedGVKs(queueingHintsPerProfile)); err != nil {
+		return nil, fmt.Errorf("adding event handlers: %w", err)
+	}
+
+	return sched, nil
+}
+
+// defaultQueueingHintFn is the default queueing hint function.
+// It always returns Queue as the queueing hint.
+var defaultQueueingHintFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
+	return framework.Queue, nil
+}
+
+func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap {
+	queueingHintMap := make(internalqueue.QueueingHintMap)
+	for _, e := range es {
+		events := e.EventsToRegister()
+
+		// This will happen when plugin registers with empty events, it's usually the case a pod
+		// will become reschedulable only for self-update, e.g. schedulingGates plugin, the pod
+		// will enter into the activeQ via priorityQueue.Update().
+		if len(events) == 0 {
+			continue
+		}
+
+		// Note: Rarely, a plugin implements EnqueueExtensions but returns nil.
+		// We treat it as: the plugin is not interested in any event, and hence a pod failed by that plugin
+		// cannot be moved by any regular cluster event.
+		// So, we can just ignore such EventsToRegister here.
+
+		registerNodeAdded := false
+		registerNodeTaintUpdated := false
+		for _, event := range events {
+			fn := event.QueueingHintFn
+			if fn == nil || !utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
+				fn = defaultQueueingHintFn
+			}
+
+			if event.Event.Resource == framework.Node {
+				if event.Event.ActionType&framework.Add != 0 {
+					registerNodeAdded = true
+				}
+				if event.Event.ActionType&framework.UpdateNodeTaint != 0 {
+					registerNodeTaintUpdated = true
+				}
+			}
+
+			queueingHintMap[event.Event] = append(queueingHintMap[event.Event], &internalqueue.QueueingHintFunction{
+				PluginName: e.Name(),
+				QueueingHintFn: fn,
+			})
+		}
+		if registerNodeAdded && !registerNodeTaintUpdated {
+			// Temporary fix for the issue https://github.com/kubernetes/kubernetes/issues/109437
+			// NodeAdded QueueingHint isn't always called because of preCheck.
+			// It's definitely not something expected for plugin developers,
+			// and registering the UpdateNodeTaint event is the only mitigation for now.
+			//
+			// So, here we register the UpdateNodeTaint event for plugins that have a NodeAdded event but no UpdateNodeTaint event.
+			// It hurts requeuing efficiency, but that is a lot better than some Pods being stuck in the
+			// unschedulable pod pool.
+			// This behavior will be removed when we remove the preCheck feature.
+			// See: https://github.com/kubernetes/kubernetes/issues/110175
+			queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}] =
+				append(queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}],
+					&internalqueue.QueueingHintFunction{
+						PluginName: e.Name(),
+						QueueingHintFn: defaultQueueingHintFn,
+					},
+				)
+		}
+	}
+	return queueingHintMap
+}
+
+// Run begins watching and scheduling. It starts scheduling and blocks until the context is done.
+func (sched *Scheduler) Run(ctx context.Context) {
+	logger := klog.FromContext(ctx)
+	sched.SchedulingQueue.Run(logger)
+
+	// We need to start scheduleOne loop in a dedicated goroutine,
+	// because scheduleOne function hangs on getting the next item
+	// from the SchedulingQueue.
+	// If there are no new pods to schedule, it will be hanging there
+	// and if done in this goroutine it will be blocking closing
+	// SchedulingQueue, in effect causing a deadlock on shutdown.
+	go wait.UntilWithContext(ctx, sched.ScheduleOne, 0)
+
+	// This is the only added line to start our queue
+	logger.Info("[FLUXNETES]", "Starting", "queue")
+	queue, err := fluxnetes.NewQueue(ctx)
+	if err != nil {
+		logger.Error(err, "Issue with Fluxnetes queue")
+		return
+	}
+
+	// Set the queue on the scheduler for access across interface
+	sched.Queue = queue
+	defer sched.Queue.Pool.Close()
+
+	// Make an empty state for now, just for functions
+	state := framework.NewCycleState()
+
+	// Setup a function to fire when a job event happens
+	// We probably want to add sched functions here
+	// Note that we can see queue stats here too:
+	// https://github.com/riverqueue/river/blob/master/event.go#L67-L71
+	waitForJob := func(subscribeChan <-chan *river.Event) {
+		select {
+		case event := <-subscribeChan:
+			if event == nil {
+				klog.Infof("Channel is closed\n")
+				return
+			}
+
+			// Parse event result into type
+			args := fluxnetes.JobResult{}
+			if err := json.Unmarshal(event.Job.EncodedArgs[:], &args); err != nil {
+				klog.Errorf("Error parsing job result: %s", err)
+				return
+			}
+			nodes := args.GetNodes()
+
+			if len(nodes) > 0 {
+
+				// TODO(vsoch): if we care about this, get from original schedule
+				start := time.Now()
+				podsToActivate := framework.NewPodsToActivate()
+
+				klog.Infof("Got job with state %s and nodes: %s\n", event.Job.State, nodes)
+
+				var pod v1.Pod
+				err := json.Unmarshal([]byte(args.PodSpec), &pod)
+				if err != nil {
+					klog.Errorf("Podspec unmarshal error: %s", err)
+					return
+				}
+				fwk, _ := sched.frameworkForPod(&pod)
+
+				// Parse the pod into PodInfo
+				// TODO add back in creation timestamp
+				podInfo, _ := framework.NewPodInfo(&pod)
+				queuedInfo := &framework.QueuedPodInfo{
+					PodInfo: podInfo,
+					Timestamp: time.Now(),
+				}
+
+				// This is temporary because we need to still run the scheduling plugins that are in-tree (core)
+				// However, we aren't going to use the scheduleResult from here, we will derive our own!
+				// We eventually want to run this in the function above and provide the same volume, etc.
+				// information to fluxnetes (fluxion) to take into account.
+				schedulingCycleCtx, cancel := context.WithCancel(ctx)
+				defer cancel()
+				_, queuedInfo, _ = sched.schedulingCycle(schedulingCycleCtx, state, fwk, queuedInfo, start, podsToActivate)
+
+				// We need to run a bind for each pod and node
+				for _, node := range nodes {
+					plan := ScheduleResult{SuggestedHost: node}
+
+					// bind the pod to its host asynchronously (we can do this because of the assume step above).
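+					// Each node returned by the Fluxion allocation gets its own binding goroutine;
+					// the upstream bindingCycle below still runs the WaitOnPermit, PreBind, Bind,
+					// and PostBind plugin phases for the pod on that suggested host.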
+					go func() {
+						bindingCycleCtx, cancel := context.WithCancel(ctx)
+						defer cancel()
+
+						status := sched.bindingCycle(bindingCycleCtx, state, fwk, plan, queuedInfo, start, podsToActivate)
+						if !status.IsSuccess() {
+							sched.handleBindingCycleError(bindingCycleCtx, state, fwk, queuedInfo, start, plan, status)
+							return
+						}
+						// Usually, DonePod is called inside the scheduling queue,
+						// but in this case, we need to call it here because this Pod won't go back to the scheduling queue.
+						sched.SchedulingQueue.Done(queuedInfo.Pod.UID)
+					}()
+
+				}
+				// queuedInfo is the QueuedPodInfo wrapper around this pod
+
+				klog.Infof("Pod %s/%s", pod.Namespace, pod.Name)
+			} else {
+				klog.Infof("Got job with state %s\n", event.Job.State)
+			}
+
+		}
+	}
+
+	// For each channel, defer its function until we finish up here and wait for a job
+	for _, channel := range sched.Queue.EventChannels {
+		defer channel.Function()
+		waitForJob(channel.Channel)
+	}
+
+	<-ctx.Done()
+	sched.SchedulingQueue.Close()
+
+	logger.Info("[FLUXNETES]", "Stopping", "queue")
+	err = queue.Stop(ctx)
+	if err != nil {
+		logger.Error(err, "Failed to stop Fluxnetes")
+	}
+
+	// If the plugins satisfy the io.Closer interface, they are closed.
+	err = sched.Profiles.Close()
+	if err != nil {
+		logger.Error(err, "Failed to close plugins")
+	}
+}
+
+// NewInformerFactory creates a SharedInformerFactory and initializes a scheduler specific
+// in-place podInformer.
+func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory {
+	informerFactory := informers.NewSharedInformerFactory(cs, resyncPeriod)
+	informerFactory.InformerFor(&v1.Pod{}, newPodInformer)
+	return informerFactory
+}
+
+func buildExtenders(logger klog.Logger, extenders []schedulerapi.Extender, profiles []schedulerapi.KubeSchedulerProfile) ([]framework.Extender, error) {
+	var fExtenders []framework.Extender
+	if len(extenders) == 0 {
+		return nil, nil
+	}
+
+	var ignoredExtendedResources []string
+	var ignorableExtenders []framework.Extender
+	for i := range extenders {
+		logger.V(2).Info("Creating extender", "extender", extenders[i])
+		extender, err := NewHTTPExtender(&extenders[i])
+		if err != nil {
+			return nil, err
+		}
+		if !extender.IsIgnorable() {
+			fExtenders = append(fExtenders, extender)
+		} else {
+			ignorableExtenders = append(ignorableExtenders, extender)
+		}
+		for _, r := range extenders[i].ManagedResources {
+			if r.IgnoredByScheduler {
+				ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
+			}
+		}
+	}
+	// place ignorable extenders to the tail of extenders
+	fExtenders = append(fExtenders, ignorableExtenders...)
+
+	// If there are any extended resources found from the Extenders, append them to the pluginConfig for each profile.
+	// This should only have an effect on ComponentConfig, where it is possible to configure Extenders and
+	// plugin args (and in which case the extender ignored resources take precedence).
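+	// Nothing more to do if no extender-managed resources are ignored by the scheduler.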
+ if len(ignoredExtendedResources) == 0 { + return fExtenders, nil + } + + for i := range profiles { + prof := &profiles[i] + var found = false + for k := range prof.PluginConfig { + if prof.PluginConfig[k].Name == noderesources.Name { + // Update the existing args + pc := &prof.PluginConfig[k] + args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs) + if !ok { + return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args) + } + args.IgnoredResources = ignoredExtendedResources + found = true + break + } + } + if !found { + return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config") + } + } + return fExtenders, nil +} + +type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time) + +func unionedGVKs(queueingHintsPerProfile internalqueue.QueueingHintMapPerProfile) map[framework.GVK]framework.ActionType { + gvkMap := make(map[framework.GVK]framework.ActionType) + for _, queueingHints := range queueingHintsPerProfile { + for evt := range queueingHints { + if _, ok := gvkMap[evt.Resource]; ok { + gvkMap[evt.Resource] |= evt.ActionType + } else { + gvkMap[evt.Resource] = evt.ActionType + } + } + } + return gvkMap +} + +// newPodInformer creates a shared index informer that returns only non-terminal pods. +// The PodInformer allows indexers to be added, but note that only non-conflict indexers are allowed. +func newPodInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + selector := fmt.Sprintf("status.phase!=%v,status.phase!=%v", v1.PodSucceeded, v1.PodFailed) + tweakListOptions := func(options *metav1.ListOptions) { + options.FieldSelector = selector + } + informer := coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, cache.Indexers{}, tweakListOptions) + + // Dropping `.metadata.managedFields` to improve memory usage. + // The Extract workflow (i.e. `ExtractPod`) should be unused. 
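+	// trim clears managedFields on each object before it is stored in the informer cache.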
+	trim := func(obj interface{}) (interface{}, error) {
+		if accessor, err := meta.Accessor(obj); err == nil {
+			if accessor.GetManagedFields() != nil {
+				accessor.SetManagedFields(nil)
+			}
+		}
+		return obj, nil
+	}
+	informer.SetTransform(trim)
+	return informer
+}
diff --git a/src/build/postgres/Dockerfile b/src/build/postgres/Dockerfile
new file mode 100644
index 0000000..68d105d
--- /dev/null
+++ b/src/build/postgres/Dockerfile
@@ -0,0 +1,16 @@
+FROM postgres:15.5-bookworm
+
+ENV DEBIAN_FRONTEND=noninteractive
+ENV GO_VERSION=1.22.5
+
+RUN apt-get update && apt-get install -y wget && apt clean -y && apt -y autoremove
+
+# Install go
+RUN wget https://go.dev/dl/go${GO_VERSION}.linux-amd64.tar.gz && tar -xvf go${GO_VERSION}.linux-amd64.tar.gz && \
+    mv go /usr/local && rm go${GO_VERSION}.linux-amd64.tar.gz
+ENV PATH=/usr/local/go/bin:/root/go/bin:$PATH
+RUN go install github.com/riverqueue/river/cmd/river@latest && mv /root/go/bin/river /usr/local/bin
+# On start we need to run:
+# runuser -l postgres -c '/usr/local/bin/river migrate-up --database-url postgres://localhost:5432/postgres'
+# The COPY path below is relative to the Makefile that calls the build
+COPY ./src/build/postgres/create-tables.sql /docker-entrypoint-initdb.d/
diff --git a/src/build/postgres/create-tables.sql b/src/build/postgres/create-tables.sql
new file mode 100644
index 0000000..7104941
--- /dev/null
+++ b/src/build/postgres/create-tables.sql
@@ -0,0 +1,9 @@
+CREATE TABLE pods_provisional (
+    podspec TEXT NOT NULL,
+    namespace TEXT NOT NULL,
+    name TEXT NOT NULL,
+    created_at timestamptz NOT NULL default NOW(),
+    group_name TEXT NOT NULL,
+    group_size INTEGER NOT NULL
+);
+CREATE INDEX group_name_index ON pods_provisional (group_name);
diff --git a/src/fluxnetes/pkg/fluxion/fluxion.go b/src/fluxnetes/pkg/fluxion/fluxion.go
index 9b4a029..070613b 100644
--- a/src/fluxnetes/pkg/fluxion/fluxion.go
+++ b/src/fluxnetes/pkg/fluxion/fluxion.go
@@ -20,7 +20,7 @@ type Fluxion struct {
 }
 
 // InitFluxion creates a new client to interaction with the fluxion API (via go bindings)
-func (fluxion *Fluxion) InitFluxion(policy string, label string) {
+func (fluxion *Fluxion) InitFluxion(policy, label string) {
 	fluxion.cli = fluxcli.NewReapiClient()
 
 	klog.Infof("[Fluxnetes] Created flux resource client %s", fluxion.cli)
@@ -29,6 +29,7 @@ func (fluxion *Fluxion) InitFluxion(policy string, label string) {
 		return
 	}
 
+	// This file needs to be written for GetResources to read later
 	jgf, err := os.ReadFile(defaults.KubernetesJsonGraphFormat)
 	if err != nil {
 		klog.Error("Error reading JGF")
diff --git a/src/fluxnetes/pkg/jgf/jgf.go b/src/fluxnetes/pkg/jgf/jgf.go
index ba4d268..070703a 100644
--- a/src/fluxnetes/pkg/jgf/jgf.go
+++ b/src/fluxnetes/pkg/jgf/jgf.go
@@ -231,12 +231,11 @@ func (g *FluxJGF) InitCluster(name string) (Node, error) {
 	return g.makeNewNode(resource, "", defaultUnit, defaultSize), nil
 }
 
+// WriteJGF writes the JGF to file
+// We need to write the file so that GetResources can later read and return the graph
 func (g *FluxJGF) WriteJGF(path string) error {
-
-	encodedJGF, err := json.MarshalIndent(g, "", "  ")
-
+	encodedJGF, err := g.ToBytes()
 	if err != nil {
-		log.Fatalf("[JGF] json.Marshal failed with '%s'\n", err)
 		return err
 	}
@@ -254,3 +253,15 @@ func (g *FluxJGF) WriteJGF(path string) error {
 	}
 	return nil
 }
+
+// ToBytes returns the JGF as bytes
+func (g *FluxJGF) ToBytes() ([]byte, error) {
+	encodedJGF, err := json.MarshalIndent(g, "", "  ")
+
+	// Log a meaningful error message here; the error itself is still
+	// returned to the caller below
+	if err != nil {
+		log.Printf("[JGF] json.Marshal failed with '%s'\n", err)
+	}
+	return encodedJGF, err
+}
diff --git a/src/fluxnetes/pkg/service-grpc/service.proto b/src/fluxnetes/pkg/service-grpc/service.proto
index 64bd79a..6dd508c 100644
--- a/src/fluxnetes/pkg/service-grpc/service.proto
+++ b/src/fluxnetes/pkg/service-grpc/service.proto
@@ -29,6 +29,4 @@ message GroupResponse {
 message ResourceRequest {}
 message ResourceResponse {
   string graph = 1;
-}
-
-
+}
\ No newline at end of file
diff --git a/src/fluxnetes/pkg/utils/utils.go b/src/fluxnetes/pkg/utils/utils.go
index e14de28..9144119 100644
--- a/src/fluxnetes/pkg/utils/utils.go
+++ b/src/fluxnetes/pkg/utils/utils.go
@@ -69,10 +69,7 @@ func RegisterExisting(clientset *kubernetes.Clientset, ctx context.Context) (map
 }
 
 // CreateInClusterJGF creates the Json Graph Format from the Kubernetes API
-// We currently don't have support in fluxion to allocate jobs for existing pods,
-// so instead we create the graph with fewer resources. When that support is
-// remove the adjustment here, which is more of a hack
-func CreateInClusterJGF(filename string, skipLabel string) error {
+func CreateInClusterJGF(filename, skipLabel string) error {
 	ctx := context.Background()
 	config, err := rest.InClusterConfig()
 	if err != nil {
@@ -231,6 +228,8 @@ func CreateInClusterJGF(filename string, skipLabel string) error {
 		}
 	}
 	fmt.Printf("\nCan request at most %d exclusive cpu", totalAllocCpu)
+
+	// Write the JGF to file so that GetResources can read it later
 	err = fluxgraph.WriteJGF(filename)
 	if err != nil {
 		return err
@@ -239,8 +238,8 @@ func CreateInClusterJGF(filename string, skipLabel string) error {
 }
 
 // computeTotalRequests sums up the pod requests for the list. We do not consider limits.
-func computeTotalRequests(podList *corev1.PodList) (total map[corev1.ResourceName]resource.Quantity) {
-	total = map[corev1.ResourceName]resource.Quantity{}
+func computeTotalRequests(podList *corev1.PodList) map[corev1.ResourceName]resource.Quantity {
+	total := map[corev1.ResourceName]resource.Quantity{}
 	for _, pod := range podList.Items {
 		podReqs, _ := resourcehelper.PodRequestsAndLimits(&pod)
 		for podReqName, podReqValue := range podReqs {
@@ -252,7 +251,7 @@ func computeTotalRequests(podList *corev1.PodList) (total map[corev1.ResourceNam
 			}
 		}
 	}
-	return
+	return total
 }
 
 type allocation struct {