feat: new queue to handle groups
This changeset adds a new queue to the fluxnetes in-tree plugin. The
queue currently knows how to accept a pod for work and then just sleep
(essentially rescheduling it for 5 seconds into the future). It is not
yet hooked into Kubernetes scheduling because I want to develop the
functionality I need first, in parallel, before splicing it in. I
should still be able to schedule to Fluxion and trigger cleanup when
the actual job is done. I think we might do better to remove the group
CRD too - it would hugely simplify things (the in-tree plugin would
barely need anything aside from the fluxion interactions and the
queue) and we could instead keep track of group names and counts (which
are still growing) in a separate table, since we already have postgres.
Two things I am not sure about: 1. the extent to which in-tree plugins
support scheduling - I can either keep them (and would then need to
integrate them) or move their functionality into what fluxion can
offer; I suspect they add supplementary features, since we were able
to disable most of them. 2. given that we customize the plugin
framework, where the right place to put sort is (I will figure this
out) - if we are adding pods to a table, we will need to store the
same metadata (priority, timestamp, etc.) to allow for an equivalent
sort.

Signed-off-by: vsoch <[email protected]>
vsoch committed Jul 26, 2024
1 parent 2c1a903 commit 8047000
Showing 18 changed files with 933 additions and 36 deletions.
6 changes: 4 additions & 2 deletions Dockerfile
@@ -10,10 +10,10 @@ ENV ARCH=${ARCH}
# but since we are adding custom kube-scheduler, and we don't need the controller
# I moved the build logic up here instead of using hack/build-images.sh

RUN apt-get update && apt-get install -y wget git vim build-essential iputils-ping
RUN apt-get update && apt-get install -y wget git vim build-essential iputils-ping postgresql-client curl

# Install Go
ENV GO_VERSION=1.22.2
ENV GO_VERSION=1.22.5
RUN wget https://go.dev/dl/go${GO_VERSION}.linux-amd64.tar.gz && tar -xvf go${GO_VERSION}.linux-amd64.tar.gz && \
mv go /usr/local && rm go${GO_VERSION}.linux-amd64.tar.gz

@@ -28,6 +28,8 @@ COPY ${K8S_UPSTREAM} .
RUN go get github.com/patrickmn/go-cache && \
go get sigs.k8s.io/controller-runtime/pkg/client && \
go get sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1 && \
go get github.com/riverqueue/river && \
go get github.com/riverqueue/river/riverdriver/riverpgxv5 && \
go work vendor && \
make WHAT=cmd/kube-scheduler && \
cp /go/src/k8s.io/kubernetes/_output/local/go/bin/kube-scheduler /bin/kube-scheduler
8 changes: 6 additions & 2 deletions Makefile
@@ -15,11 +15,12 @@ ARCH ?= amd64
# These are passed to build the sidecar
REGISTRY ?= ghcr.io/flux-framework
SIDECAR_IMAGE ?= fluxnetes-sidecar:latest
POSTGRES_IMAGE ?= fluxnetes-postgres:latest
SCHEDULER_IMAGE ?= fluxnetes

.PHONY: all build build-sidecar clone update push push-sidecar push-fluxnetes
.PHONY: all build build-sidecar clone update push push-sidecar push-fluxnetes build-postgres

all: prepare build-sidecar build
all: prepare build-sidecar build build-postgres

upstreams:
mkdir -p $(UPSTREAMS)
@@ -48,4 +49,7 @@ push-fluxnetes:
build-sidecar:
make -C ./src LOCAL_REGISTRY=${REGISTRY} LOCAL_IMAGE=${SIDECAR_IMAGE}

build-postgres:
docker build -f src/build/postgres/Dockerfile -t ${REGISTRY}/${POSTGRES_IMAGE} .

push: push-sidecar push-fluxnetes
9 changes: 5 additions & 4 deletions README.md
@@ -2,7 +2,7 @@

![docs/images/fluxnetes.png](docs/images/fluxnetes.png)

Fluxnetes is a combination of Kubernetes and [Fluence](https://github.com/flux-framework/flux-k8s), both of which use the HPC-grade pod scheduling [Fluxion scheduler](https://github.com/flux-framework/flux-sched) to schedule pod groups to nodes.
Fluxnetes is a combination of Kubernetes and [Fluence](https://github.com/flux-framework/flux-k8s), both of which use the HPC-grade pod scheduling [Fluxion scheduler](https://github.com/flux-framework/flux-sched) to schedule pod groups to nodes. For our queue, we use [river](https://riverqueue.com/docs) backed by a Postgres database. The database is deployed alongside fluxnetes and could be customized to use an operator instead.

**Important** This is an experiment, and is under development. I will change this design a million times - it's how I tend to learn and work. I'll share updates when there is something to share. It deploys but does not work yet!

@@ -37,14 +37,15 @@ Then you can deploy as follows:
```bash
./hack/quick-build-kind.sh
```
You'll then have the fluxnetes service running, along with the scheduler plugins controller, which we
You'll then have the fluxnetes service running, a postgres database (for the job queue), along with the scheduler plugins controller, which we
currently need in order to use PodGroup.

```bash
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
fluxnetes-66575b59d8-ghx8h 2/2 Running 0 8m53s
scheduler-plugins-controller-8676df7769-ss9kz 1/1 Running 0 8m53s
fluxnetes-6954cdcf64-gv7s7 2/2 Running 0 87s
postgres-c8d55999c-t6dtt 1/1 Running 0 87s
scheduler-plugins-controller-8676df7769-jvtwp 1/1 Running 0 87s
```

You can then create a job:
22 changes: 18 additions & 4 deletions chart/templates/deployment.yaml
@@ -66,6 +66,17 @@ spec:
- command:
- /bin/kube-scheduler
- --config=/etc/kubernetes/scheduler-config.yaml
env:
- name: DATABASE_URL
value: postgres://postgres:postgres@postgres:5432/postgres
- name: PGHOST
value: postgres
- name: PGDATABASE
value: postgres
- name: PGPORT
value: "5432"
- name: PGPASSWORD
value: postgres
image: {{ .Values.scheduler.image }}
imagePullPolicy: {{ .Values.scheduler.pullPolicy }}
livenessProbe:
@@ -76,10 +87,13 @@ spec:
initialDelaySeconds: 15
name: scheduler
readinessProbe:
httpGet:
path: /healthz
port: 10259
scheme: HTTPS
exec:
command:
- "sh"
- "-c"
- >
status=$(curl -ks https://localhost:10259/healthz); if [ "$status" != "ok" ]; then exit 1; fi
pg_isready -d postgres -h postgres -p 5432 -U postgres;
resources:
requests:
cpu: '0.1'
63 changes: 63 additions & 0 deletions chart/templates/postgres.yaml
@@ -0,0 +1,63 @@
# Note: This is intended for development/test deployments
apiVersion: apps/v1
kind: Deployment
metadata:
name: postgres
namespace: {{ .Release.Namespace }}
spec:
replicas: 1
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: {{ .Values.postgres.image }}
imagePullPolicy: {{ .Values.postgres.pullPolicy }}
ports:
- name: postgres-port
containerPort: 5432
env:
- name: POSTGRES_USER
value: postgres
- name: POSTGRES_PASSWORD
value: postgres
- name: POSTGRES_DB
value: postgres
resources:
requests:
memory: "1Gi"
cpu: "500m"
limits:
memory: "1Gi"
cpu: "2"
readinessProbe:
exec:
command:
- "sh"
- "-c"
- >
pg_isready -q -d postgres -U postgres;
runuser -l postgres -c '/usr/local/bin/river migrate-up --database-url postgres://localhost:5432/postgres > /tmp/post-start.log'
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 3
---
apiVersion: v1
kind: Service
metadata:
name: postgres
namespace: {{ .Release.Namespace }}
spec:
selector:
app: postgres
ports:
- protocol: TCP
port: 5432
targetPort: 5432
4 changes: 4 additions & 0 deletions chart/values.yaml
@@ -11,6 +11,10 @@ scheduler:
pullPolicy: Always
leaderElect: false

postgres:
image: ghcr.io/flux-framework/fluxnetes-postgres:latest
pullPolicy: Always

# The sidecar is explicitly the fluxion service. I'd like to
# simplify this to use fluxion as a service
sidecar:
5 changes: 4 additions & 1 deletion hack/quick-build-kind.sh
@@ -16,12 +16,15 @@ make REGISTRY=${REGISTRY} SCHEDULER_IMAGE=fluxnetes SIDECAR_IMAGE=fluxnetes-side
# We load into kind so we don't need to push/pull and use up internet data ;)
kind load docker-image ${REGISTRY}/fluxnetes-sidecar:latest
kind load docker-image ${REGISTRY}/fluxnetes:latest
kind load docker-image ${REGISTRY}/fluxnetes-postgres:latest

# And then install using the charts. The pull policy ensures we use the loaded ones
helm uninstall fluxnetes || true
helm install \
--set postgres.image=${REGISTRY}/fluxnetes-postgres:latest \
--set scheduler.image=${REGISTRY}/fluxnetes:latest \
--set sidecar.image=${REGISTRY}/fluxnetes-sidecar:latest \
--set postgres.pullPolicy=Never \
--set scheduler.pullPolicy=Never \
--set sidecar.pullPolicy=Never \
fluxnetes chart/
2 changes: 2 additions & 0 deletions kubernetes/pkg/fluxnetes/core/flux.go
@@ -63,6 +63,8 @@ func (podGroupManager *PodGroupManager) AskFlux(
_, cancel := context.WithTimeout(context.Background(), 200*time.Second)
defer cancel()

// TODO: get a response, and if the response is "cannot allocate now"
// we need to just save to our data structure for later (service)
request := &pb.MatchRequest{
Ps: jobspec,
Request: "allocate",
8 changes: 4 additions & 4 deletions kubernetes/pkg/fluxnetes/fluxnetes.go
@@ -78,6 +78,10 @@ const (
Name = "Fluxnetes"
)

func (fluxnetes *Fluxnetes) Name() string {
return Name
}

// Initialize and return a new Fluxnetes Scheduler Plugin
func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {

@@ -135,10 +139,6 @@ func New(_ context.Context, obj runtime.Object, handle framework.Handle) (framew
return plugin, err
}

func (fluxnetes *Fluxnetes) Name() string {
return Name
}

// Fluxnetes has added delete, although I wonder if update includes that signal
// and it's redundant?
func (fluxnetes *Fluxnetes) EventsToRegister() []framework.ClusterEventWithHint {
4 changes: 0 additions & 4 deletions kubernetes/pkg/fluxnetes/logger/logger.go
@@ -5,7 +5,6 @@ package logger
// It's a bad design, but will work for debugging.

import (
"fmt"
"log"
"os"
)
@@ -77,9 +76,6 @@ func (l *DebugLogger) log(level int, prefix string, message ...any) error {
prolog = prefix + " " + prolog
}
rest := message[1:]

// msg := fmt.Sprintf(message...)
fmt.Printf("Compariing level %d <= %d\n", level, l.level)
if level <= l.level {
logger.Printf(prolog, rest...)
}
