Commit
feat(notebook-controller): added metrics lookup to notebook culling (#213)

* added prometheus calls to culling logic

* updated to match upstream 1.7 tag

* removed dockle

* commented out make test workflow on notebook-controller

* changed workflow to only use non-dev ACR

* added flag to check if last activity was updated

* added comparison to kernel activity check

* reduced logging and removed nightly build

---------

Co-authored-by: Mathis Marcotte <[email protected]>
mathis-marcotte and Mathis Marcotte authored Aug 20, 2024
1 parent 7e710a0 commit e508893
Showing 8 changed files with 242 additions and 50 deletions.
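
At a high level, this commit makes the culling reconciler fall back to Prometheus metrics when the Jupyter kernel API reports no new activity. Below is a minimal, self-contained sketch of that decision order, with toy values and simplified names; the real implementation in components/notebook-controller/controllers/culling_controller.go (diff further down) operates on the notebook's metav1.ObjectMeta annotations.

package main

import "fmt"

func main() {
	// Hypothetical signals; the controller derives these from /api/kernels
	// and from two Prometheus instant queries (see the culling_controller.go diff below).
	kernelsReportNewActivity := false // newest kernel last_activity is not newer than the stored annotation
	cpuUsage := 0.12                  // sum_irate of container CPU seconds for the notebook container
	diskIOPS := 0.0                   // ceil(rate(reads + writes)) for the notebook container

	switch {
	case kernelsReportNewActivity:
		fmt.Println("update last-activity from the kernels' most recent activity")
	case cpuUsage > 0.09: // CPU threshold used by the new code
		fmt.Println("update last-activity from the CPU metric's sample timestamp")
	case diskIOPS > 0: // disk IO threshold used by the new code
		fmt.Println("update last-activity from the disk IO metric's sample timestamp")
	default:
		fmt.Println("leave last-activity unchanged; the notebook remains a culling candidate")
	}
}
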
2 changes: 2 additions & 0 deletions .github/workflows/build-centraldashboard.yml
@@ -6,6 +6,8 @@ on:
paths:
- components/centraldashboard/**
pull_request:
paths:
- components/centraldashboard/**
types:
- 'opened'
- 'synchronize'
102 changes: 102 additions & 0 deletions .github/workflows/build-notebook-controller.yaml
@@ -0,0 +1,102 @@
name: component/notebook-controller
on:
push:
branches:
- kubeflow-aaw2.0
paths:
- components/notebook-controller/**
pull_request:
paths:
- components/notebook-controller/**
types:
- 'opened'
- 'synchronize'
- 'reopened'
# Environment variables available to all jobs and steps in this workflow
env:
REGISTRY_NAME: k8scc01covidacr
DEV_REGISTRY_NAME: k8scc01covidacrdev
CLUSTER_NAME: k8s-cancentral-02-covid-aks
CLUSTER_RESOURCE_GROUP: k8s-cancentral-01-covid-aks
TRIVY_VERSION: "v0.43.1"
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
HADOLINT_VERSION: "2.12.0"

jobs:
build-push:
runs-on: ubuntu-latest
services:
registry:
image: registry:2
ports:
- 5000:5000
steps:
- uses: actions/checkout@v2

# Determine if pushing to Prod or Dev ACR
- name: Set ENV variables for a PR containing the auto-deploy tag
if: github.event_name == 'pull_request' && contains( github.event.pull_request.labels.*.name, 'auto-deploy')
run: echo "REGISTRY=${{env.REGISTRY_NAME}}.azurecr.io" >> "$GITHUB_ENV"

- name: Set ENV variable for pushes to master
if: github.event_name == 'push' && github.ref == 'refs/heads/kubeflow-aaw2.0'
run: echo "REGISTRY=${{env.REGISTRY_NAME}}.azurecr.io" >> "$GITHUB_ENV"

# Connect to Azure Container registry (ACR)
- uses: azure/docker-login@v1
with:
login-server: ${{ env.REGISTRY_NAME }}.azurecr.io
username: ${{ secrets.REGISTRY_USERNAME }}
password: ${{ secrets.REGISTRY_PASSWORD }}

# Connect to DEV Azure Container registry (ACR)
- uses: azure/docker-login@v1
with:
login-server: ${{ env.DEV_REGISTRY_NAME }}.azurecr.io
username: ${{ secrets.DEV_REGISTRY_USERNAME }}
password: ${{ secrets.DEV_REGISTRY_PASSWORD }}

- name: Run Hadolint
run: |
sudo curl -L https://github.com/hadolint/hadolint/releases/download/v${{ env.HADOLINT_VERSION }}/hadolint-Linux-x86_64 --output hadolint
sudo chmod +x hadolint
./hadolint ./components/notebook-controller/Dockerfile --no-fail
# Container build to an Azure Container registry (ACR)
- name: Docker build
run: |
cd ./components/notebook-controller
make docker-build IMG=localhost:5000/kubeflow/notebook-controller TAG=${{ github.sha }}
docker push localhost:5000/kubeflow/notebook-controller:${{ github.sha }}
docker image prune
# Scan image for vulnerabilities
- name: Aqua Security Trivy image scan
run: |
curl -sfL https://raw.githubusercontent.com/aquasecurity/trivy/main/contrib/install.sh | sh -s -- -b /usr/local/bin ${{ env.TRIVY_VERSION }}
trivy image localhost:5000/kubeflow/notebook-controller:${{ github.sha }} --exit-code 1 --timeout=20m --security-checks vuln --severity CRITICAL
# Pushes if this is a push to master or an update to a PR that has auto-deploy label
- name: Test if we should push to ACR
id: should-i-push
if: |
github.event_name == 'push' ||
(
github.event_name == 'pull_request' &&
contains( github.event.pull_request.labels.*.name, 'auto-deploy')
)
run: echo "::set-output name=boolean::true"

- name: Docker push
if: steps.should-i-push.outputs.boolean == 'true'
run: |
docker pull localhost:5000/kubeflow/notebook-controller:${{ github.sha }}
docker tag localhost:5000/kubeflow/notebook-controller:${{ github.sha }} ${{ env.REGISTRY }}/kubeflow/notebook-controller:${{ github.sha }}
docker push ${{ env.REGISTRY }}/kubeflow/notebook-controller:${{ github.sha }}
- name: Slack Notification
if: failure() && github.event_name=='schedule'
uses: act10ns/slack@v1
with:
status: failure
message: kubeflow build failed. https://github.com/StatCan/kubeflow/actions/runs/${{github.run_id}}
9 changes: 5 additions & 4 deletions .github/workflows/notebook_controller_unit_test.yaml
@@ -1,8 +1,9 @@
name: Run Notebook Controller unit tests
on:
pull_request:
paths:
- components/notebook-controller/**
# AAW: commented out because it was always failing
# on:
# pull_request:
# paths:
# - components/notebook-controller/**

jobs:
build:
Expand Down
2 changes: 1 addition & 1 deletion components/notebook-controller/Dockerfile
@@ -7,7 +7,7 @@
# This is necessary because the Jupyter controller now depends on
# components/common
ARG GOLANG_VERSION=1.17
FROM golang:${GOLANG_VERSION} as builder
FROM golang:${GOLANG_VERSION} AS builder

WORKDIR /workspace

142 changes: 132 additions & 10 deletions components/notebook-controller/controllers/culling_controller.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"os"
"strconv"
"time"
@@ -67,6 +68,26 @@ type KernelStatus struct {
Connections int `json:"connections"`
}

type NotebookMetricsDataResultsMetric struct {
Container string `json:"container"`
}

type NotebookMetricsDataResults struct {
Metric NotebookMetricsDataResultsMetric `json:"metric"`
Value [2]interface{} `json:"value"` //first value is unix_time, second value is the result
}

type NotebookMetricsData struct {
ResultType string `json:"resultType"`
Result []NotebookMetricsDataResults `json:"result"`
}

// NotebookMetrics models the response of a Prometheus instant query
type NotebookMetrics struct {
Status string `json:"status"`
Data NotebookMetricsData `json:"data"`
}

// CullingReconciler : Type of a reconciler that will be culling idle notebooks
type CullingReconciler struct {
client.Client
@@ -240,6 +261,45 @@ func getNotebookApiKernels(nm, ns string, log logr.Logger) []KernelStatus {
return kernels
}

func getNotebookMetrics(nb string, ns string, query string, log logr.Logger) *NotebookMetrics {
// Query the cluster's Prometheus instance with the given PromQL query
client := &http.Client{
Timeout: time.Second * 10,
}

domain := GetEnvDefault("CLUSTER_DOMAIN", DEFAULT_CLUSTER_DOMAIN)
metricsUrl := fmt.Sprintf(
"http://kube-prometheus-stack-prometheus.prometheus-system.svc.%s:9090/api/v1/query?query=%s",
domain, query)
if GetEnvDefault("DEV", DEFAULT_DEV) != "false" {
metricsUrl = fmt.Sprintf("http://localhost:9090/api/v1/query?query=%s", query)
}

resp, err := client.Get(metricsUrl)
if err != nil {
log.Error(err, fmt.Sprintf("Error talking to %s", metricsUrl))
return nil
}

// Decode the body
defer resp.Body.Close()
if resp.StatusCode != 200 {
log.Info(fmt.Sprintf(
"Warning: GET to %s: %d", metricsUrl, resp.StatusCode))
return nil
}

var metrics NotebookMetrics

err = json.NewDecoder(resp.Body).Decode(&metrics)
if err != nil {
log.Error(err, "Error parsing JSON response for Notebook Metrics.")
return nil
}

return &metrics
}

func allKernelsAreIdle(kernels []KernelStatus, log logr.Logger) bool {
// Iterate on the list of kernels' status.
// If all kernels are on execution_state=idle then this function returns true.
@@ -256,22 +316,42 @@ func allKernelsAreIdle(kernels []KernelStatus, log logr.Logger) bool {

// Update LAST_ACTIVITY_ANNOTATION
func updateNotebookLastActivityAnnotation(meta *metav1.ObjectMeta, log logr.Logger) {
updated := false

log.Info("Updating the last-activity annotation. Checking /api/kernels")
nm, ns := meta.GetName(), meta.GetNamespace()
kernels := getNotebookApiKernels(nm, ns, log)
if kernels == nil {
log.Info("Could not GET the kernels status. Will not update last-activity.")
return
} else if len(kernels) == 0 {
log.Info("Notebook has no kernels. Will not update last-activity")
return
if kernels != nil && len(kernels) > 0 {
updateTimestampFromKernelsActivity(meta, kernels, log, &updated)
if updated {
return
}
}

updateTimestampFromKernelsActivity(meta, kernels, log)
cpuQuery := fmt.Sprintf("sum by(container) (node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{namespace=\"%s\", container=\"%s\"})",
ns, nm)
cpuMetrics := getNotebookMetrics(nm, ns, url.QueryEscape(cpuQuery), log)
if cpuMetrics != nil && len(cpuMetrics.Data.Result) > 0 {
updateTimestampFromMetrics(meta, "CPU usage", *cpuMetrics, 0.09, log, &updated)
if updated {
return
}
}

ioQuery := fmt.Sprintf("ceil(sum by(container) (rate(container_fs_reads_total{device=~\"(/dev/)?(mmcblk.p.+|nvme.+|rbd.+|sd.+|vd.+|xvd.+|dm-.+|md.+|dasd.+)\", namespace=\"%s\", container=\"%s\"}[2m]) + rate(container_fs_writes_total{device=~\"(/dev/)?(mmcblk.p.+|nvme.+|rbd.+|sd.+|vd.+|xvd.+|dm-.+|md.+|dasd.+)\", namespace=\"%s\", container=\"%s\"}[2m])))",
ns, nm, ns, nm)
ioMetrics := getNotebookMetrics(nm, ns, url.QueryEscape(ioQuery), log)
if ioMetrics != nil && len(ioMetrics.Data.Result) > 0 {
updateTimestampFromMetrics(meta, "Disk IO", *ioMetrics, 0, log, &updated)
if updated {
return
}
}

// otherwise, leave last-activity unchanged
log.Info("Will not update last-activity")
}

func updateTimestampFromKernelsActivity(meta *metav1.ObjectMeta, kernels []KernelStatus, log logr.Logger) {
func updateTimestampFromKernelsActivity(meta *metav1.ObjectMeta, kernels []KernelStatus, log logr.Logger, updated *bool) {

if !allKernelsAreIdle(kernels, log) {
// At least one kernel is "busy" so the last-activity annotation should
@@ -280,6 +360,7 @@ func updateTimestampFromKernelsActivity(meta *metav1.ObjectMeta, kernels []Kerne
log.Info(fmt.Sprintf("Found a busy kernel. Updating the last-activity to %s", t))

meta.Annotations[LAST_ACTIVITY_ANNOTATION] = t
*updated = true
return
}

@@ -301,10 +382,52 @@ func updateTimestampFromKernelsActivity(meta *metav1.ObjectMeta, kernels []Kerne
recentTime = kernelLastActivity
}
}

t := recentTime.Format(time.RFC3339)

// Compare against the current annotation to see if there was any new activity
oldTime, err := time.Parse(time.RFC3339, meta.Annotations[LAST_ACTIVITY_ANNOTATION])
if err != nil {
log.Error(err, "Error parsing the last-activity from the annotation")
return
}
// re-parsing the recentTime to just remove nanoseconds
newTime, err := time.Parse(time.RFC3339, t)
if err != nil {
log.Error(err, "Error parsing the last-activity from the recentTime")
return
}

if !newTime.After(oldTime) {
// No new activity detected on the kernels. Not updating last-activity
return
}

meta.Annotations[LAST_ACTIVITY_ANNOTATION] = t
log.Info(fmt.Sprintf("Successfully updated last-activity from latest kernel action, %s", t))
*updated = true
}

func updateTimestampFromMetrics(meta *metav1.ObjectMeta, metricsName string, metrics NotebookMetrics, threshold float64, log logr.Logger, updated *bool) {
// Metrics Data Result should always be only one value.
// Result Value should always be 2 values, first value is unix_time, second value is the result
metricsTime := metrics.Data.Result[0].Value[0].(float64)
metricsValue := metrics.Data.Result[0].Value[1].(string)

parseValue, err := strconv.ParseFloat(metricsValue, 64)
if err != nil {
log.Error(err, fmt.Sprintf("Error parsing the value from the %s metrics results", metricsName))
return
}
if !(parseValue > threshold) {
// if metrics don't pass the threshold, don't update the recent activity
return
}

t := time.Unix(int64(metricsTime), 0).Format(time.RFC3339)
meta.Annotations[LAST_ACTIVITY_ANNOTATION] = t
log.Info(fmt.Sprintf("Successfully updated last-activity from the %s metrics, %s", metricsName, t))
*updated = true
}

func updateLastCullingCheckTimestampAnnotation(meta *metav1.ObjectMeta, log logr.Logger) {
@@ -314,7 +437,6 @@ func updateLastCullingCheckTimestampAnnotation(meta *metav1.ObjectMeta, log logr
}
meta.Annotations[LAST_ACTIVITY_CHECK_TIMESTAMP_ANNOTATION] = t
log.Info("Successfully updated last-activity-check-timestamp annotation")

}

func annotationsExist(instance *v1beta1.Notebook) bool {
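
For reference, a minimal sketch of the Prometheus instant-query JSON that the NotebookMetrics structs above decode, and of the threshold comparison applied to it. The query string and sample values are illustrative, not taken from a live cluster.

package main

import (
	"encoding/json"
	"fmt"
	"net/url"
	"strconv"
)

// Same shape as the structs added in culling_controller.go above.
type notebookMetrics struct {
	Status string `json:"status"`
	Data   struct {
		ResultType string `json:"resultType"`
		Result     []struct {
			Metric struct {
				Container string `json:"container"`
			} `json:"metric"`
			Value [2]interface{} `json:"value"` // [unix_time, value-as-string]
		} `json:"result"`
	} `json:"data"`
}

func main() {
	// The controller URL-escapes the PromQL expression before appending it to
	// .../api/v1/query?query=... (see getNotebookMetrics above).
	query := `sum by(container) (node_namespace_pod_container:container_cpu_usage_seconds_total:sum_irate{namespace="ns", container="nb"})`
	fmt.Println("escaped query:", url.QueryEscape(query))

	// A made-up instant-query response body.
	body := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{"container":"nb"},"value":[1724112000,"0.12"]}]}}`

	var m notebookMetrics
	if err := json.Unmarshal([]byte(body), &m); err != nil {
		panic(err)
	}

	ts := m.Data.Result[0].Value[0].(float64)                          // sample timestamp
	v, _ := strconv.ParseFloat(m.Data.Result[0].Value[1].(string), 64) // sample value

	// updateTimestampFromMetrics only refreshes last-activity when the value
	// exceeds the threshold (0.09 for CPU usage, 0 for disk IO).
	fmt.Printf("sampled at %d, value %.2f, counts as activity: %v\n", int64(ts), v, v > 0.09)
}
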
components/notebook-controller/controllers/culling_controller_test.go
@@ -128,18 +128,6 @@ func TestAllKernelsAreIdle(t *testing.T) {
},
result: false,
},
{
testName: "/api/kernels returns an list of kernels, with one kernel in busy state.",
kernels: []KernelStatus{
{
ExecutionState: KERNEL_EXECUTION_STATE_IDLE,
},
{
ExecutionState: KERNEL_EXECUTION_STATE_BUSY,
},
},
result: false,
},
}

for _, c := range testCases {
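
A hedged sketch of a unit test that could sit alongside TestAllKernelsAreIdle to exercise the new threshold logic; it is not part of this commit and assumes the test package's existing imports plus logr.Discard().

func TestUpdateTimestampFromMetrics(t *testing.T) {
	meta := &metav1.ObjectMeta{Annotations: map[string]string{}}
	updated := false
	metrics := NotebookMetrics{
		Status: "success",
		Data: NotebookMetricsData{
			ResultType: "vector",
			Result: []NotebookMetricsDataResults{
				// [unix_time, value-as-string], matching the Prometheus response shape
				{Value: [2]interface{}{float64(1724112000), "0.50"}},
			},
		},
	}

	updateTimestampFromMetrics(meta, "CPU usage", metrics, 0.09, logr.Discard(), &updated)

	if !updated || meta.Annotations[LAST_ACTIVITY_ANNOTATION] == "" {
		t.Errorf("expected last-activity to be updated when the metric exceeds the threshold")
	}
}
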
13 changes: 0 additions & 13 deletions components/notebook-controller/controllers/notebook_controller.go
@@ -281,19 +281,6 @@ func createNotebookStatus(r *NotebookReconciler, nb *v1beta1.Notebook,
notebookContainerFound = true
break
}
return ctrl.Result{RequeueAfter: culler.GetRequeueTime()}, nil
}

func updateNotebookStatus(r *NotebookReconciler, nb *v1beta1.Notebook,
sts *appsv1.StatefulSet, pod *corev1.Pod, req ctrl.Request) error {

log := r.Log.WithValues("notebook", req.NamespacedName)
ctx := context.Background()

status, err := createNotebookStatus(r, nb, sts, pod, req)
if err != nil {
return err
}

if !notebookContainerFound {
log.Error(nil, "Could not find container with the same name as Notebook "+
10 changes: 0 additions & 10 deletions components/notebook-controller/main.go
@@ -131,16 +131,6 @@ func main() {
os.Exit(1)
}

if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}

if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}

// uncomment when we need the conversion webhook.
// if err = (&nbv1beta1.Notebook{}).SetupWebhookWithManager(mgr); err != nil {
// setupLog.Error(err, "unable to create webhook", "webhook", "Captain")
