fix: perf issue with too many API reqs when listing pods in all ns
change burst and qps

print

list jobs performance improvement

updates from brad
amandavialva01 authored and stoksc committed Nov 8, 2024
1 parent 5b03599 commit b6e5603
Showing 4 changed files with 306 additions and 48 deletions.
173 changes: 134 additions & 39 deletions master/internal/rm/kubernetesrm/jobs.go
@@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
batchV1 "k8s.io/api/batch/v1"
k8sV1 "k8s.io/api/core/v1"
k8error "k8s.io/apimachinery/pkg/api/errors"
@@ -28,6 +29,7 @@ import (
k8sClient "k8s.io/client-go/kubernetes"
typedBatchV1 "k8s.io/client-go/kubernetes/typed/batch/v1"
typedV1 "k8s.io/client-go/kubernetes/typed/core/v1"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
@@ -51,9 +53,6 @@ import (
"github.com/determined-ai/determined/master/pkg/syncx/waitgroupx"
"github.com/determined-ai/determined/master/pkg/tasks"
"github.com/determined-ai/determined/proto/pkg/apiv1"

// Used to load all auth plugins.
_ "k8s.io/client-go/plugin/pkg/client/auth"
)

const (
@@ -113,13 +112,15 @@ type jobsService struct {
internalTaskGWConfig *config.InternalTaskGatewayConfig

// System dependencies. Also set in initialization and never modified after.
syslog *logrus.Entry
clientSet k8sClient.Interface
syslog *logrus.Entry
clientSet k8sClient.Interface
// TODO(!!!): Not set in initialization and never changed anymore.. RIP.
podInterfaces map[string]typedV1.PodInterface
configMapInterfaces map[string]typedV1.ConfigMapInterface
jobInterfaces map[string]typedBatchV1.JobInterface
serviceInterfaces map[string]typedV1.ServiceInterface
tcpRouteInterfaces map[string]alphaGateway.TCPRouteInterface
// TODO(!!!): end.

resourceRequestQueue *requestQueue
requestQueueWorkers []*requestProcessingWorker
@@ -253,6 +254,7 @@ func newJobsService(
}

func (j *jobsService) syncNamespaces(ns []string, hasJSLock bool) error {
// TODO(!!!): Prob one informer per cluster too.
for _, namespace := range ns {
// Since we don't want to do duplicate namespace informers, don't start any
// listeners or informers that have already been added to namespacesWithInformers.
@@ -348,6 +350,8 @@ func (j *jobsService) startClientSet(namespaces []string) error {
return fmt.Errorf("failed to initialize kubernetes clientSet: %w", err)
}

j.jobInterfaces[""] = j.clientSet.BatchV1().Jobs("")
j.podInterfaces[""] = j.clientSet.CoreV1().Pods("")
for _, ns := range namespaces {
j.podInterfaces[ns] = j.clientSet.CoreV1().Pods(ns)
j.configMapInterfaces[ns] = j.clientSet.CoreV1().ConfigMaps(ns)
@@ -394,7 +398,17 @@ func readClientConfig(kubeconfigPath string) (*rest.Config, error) {
// and it expects to find files:
// - /var/run/secrets/kubernetes.io/serviceaccount/token
// - /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
return rest.InClusterConfig()
c, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
if c.QPS == 0.0 {
c.QPS = 20
}
if c.Burst == 0 {
c.Burst = 100
}
return c, nil
}

if parts := strings.Split(kubeconfigPath, string(os.PathSeparator)); parts[0] == "~" {
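Context for the "change burst and qps" note in the commit message: when QPS and Burst are left at zero, client-go falls back to its default rate limiter of roughly 5 requests per second with a burst of 10, which the health checks and per-namespace listing in this service can easily saturate. A minimal sketch of how a clientset picks up the raised defaults — the 20/100 values mirror the hunk above, while the helper name and the surrounding wiring are illustrative only:

	package main

	import (
		"fmt"

		k8sClient "k8s.io/client-go/kubernetes"
		"k8s.io/client-go/rest"
	)

	// buildClientSet mirrors the defaulting above: respect explicit settings,
	// otherwise lift client-go's ~5 QPS / 10 burst defaults to 20 / 100.
	func buildClientSet() (k8sClient.Interface, error) {
		c, err := rest.InClusterConfig()
		if err != nil {
			return nil, err
		}
		if c.QPS == 0.0 {
			c.QPS = 20
		}
		if c.Burst == 0 {
			c.Burst = 100
		}
		fmt.Printf("client rate limit: %.0f qps, burst %d\n", c.QPS, c.Burst)
		return k8sClient.NewForConfig(c)
	}

	func main() {
		if _, err := buildClientSet(); err != nil {
			fmt.Println("not running in a cluster:", err)
		}
	}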
@@ -1037,7 +1051,7 @@ func (j *jobsService) refreshPodStates(allocationID model.AllocationID) error {
return fmt.Errorf("failed to get namespaces for resource manager: %w", err)
}

for _, pod := range pods.Items {
for _, pod := range pods {
if !slices.Contains(ns, pod.Namespace) {
continue
}
@@ -1082,20 +1096,40 @@ func (j *jobsService) GetSlot(msg *apiv1.GetSlotRequest) *apiv1.GetSlotResponse
return j.getSlot(msg.AgentId, msg.SlotId)
}

func (j *jobsService) HealthStatus() model.HealthStatus {
j.mu.Lock()
defer j.mu.Unlock()
for _, podInterface := range j.podInterfaces {
_, err := podInterface.List(context.TODO(), metaV1.ListOptions{Limit: 1})
if err != nil {
j.syslog.WithError(err).Error("kubernetes resource manager marked as unhealthy")
return model.Unhealthy
}
return model.Healthy
func (j *jobsService) HealthStatus(ctx context.Context) model.HealthStatus {
if len(j.podInterfaces) == 0 {
logrus.Error("expected podInterface to be non empty")
return model.Unhealthy
}

_, err := j.podInterfaces[""].List(ctx, metaV1.ListOptions{Limit: 1})
if k8error.IsForbidden(err) {
return j.healthStatusFallback(ctx)
} else if err != nil {
return model.Unhealthy
}
return model.Healthy
}

logrus.Error("expected jobInterface to be non empty")
return model.Unhealthy
func (j *jobsService) healthStatusFallback(ctx context.Context) model.HealthStatus {
var g errgroup.Group
for n, podInterface := range j.podInterfaces {
if len(n) == 0 { // TODO: We store a non-namespaced client with key "".
continue
}
g.Go(func() error {
_, err := podInterface.List(ctx, metaV1.ListOptions{Limit: 1})
if err != nil {
return err
}
return nil
})
}
err := g.Wait()
if err != nil {
return model.Unhealthy
}
return model.Healthy
}

func (j *jobsService) startNodeInformer() error {
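The health check above now issues a single cluster-scoped pod list (Limit: 1) through the non-namespaced client stored under the "" key, and only falls back to probing every namespaced client, concurrently via errgroup, when that cluster-scoped list is denied by RBAC. A hedged sketch of how the Forbidden trigger could be exercised against a fake clientset — the reactor setup and namespace name are assumptions for illustration, not code from this change:

	package main

	import (
		"context"
		"fmt"

		k8error "k8s.io/apimachinery/pkg/api/errors"
		metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/apimachinery/pkg/runtime"
		"k8s.io/apimachinery/pkg/runtime/schema"
		"k8s.io/client-go/kubernetes/fake"
		k8stesting "k8s.io/client-go/testing"
	)

	func main() {
		cs := fake.NewSimpleClientset()
		// Simulate RBAC that forbids cluster-scoped pod lists but allows namespaced ones.
		cs.PrependReactor("list", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) {
			if action.GetNamespace() == "" {
				gr := schema.GroupResource{Resource: "pods"}
				return true, nil, k8error.NewForbidden(gr, "", fmt.Errorf("cluster-wide list denied"))
			}
			return false, nil, nil
		})

		_, err := cs.CoreV1().Pods("").List(context.TODO(), metaV1.ListOptions{Limit: 1})
		fmt.Println(k8error.IsForbidden(err)) // true: caller switches to the per-namespace fallback

		_, err = cs.CoreV1().Pods("default").List(context.TODO(), metaV1.ListOptions{Limit: 1})
		fmt.Println(err == nil) // true: namespaced probes still succeed
	}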
@@ -1479,7 +1513,7 @@ func (j *jobsService) releaseAllocationsOnDisabledNode(nodeName string) error {
}

notifiedAllocations := make(map[model.AllocationID]bool)
for _, pod := range pods.Items {
for _, pod := range pods {
jobName, ok := pod.Labels[kubernetesJobNameLabel]
if !ok {
j.syslog.Debugf("found pod when disabling node without %s label", kubernetesJobNameLabel)
@@ -1572,7 +1606,6 @@ type computeUsageSummary struct {
slotsAvailable int
}

// TODO(!!!): good func comment.
func (j *jobsService) summarizeComputeUsage(poolName string) (*computeUsageSummary, error) {
summary, err := j.summarize()
if err != nil {
@@ -1901,7 +1934,6 @@ func (j *jobsService) summarizeClusterByNodes() map[string]model.AgentSummary {
}
podByNode[podInfo.nodeName] = append(podByNode[podInfo.nodeName], podInfo)
}

nodeToTasks, taskSlots := j.getNonDetSlots(j.slotType)
summary := make(map[string]model.AgentSummary, len(j.currentNodes))
for _, node := range j.currentNodes {
@@ -2015,7 +2047,7 @@ func (j *jobsService) getNonDetPods() ([]k8sV1.Pod, error) {
}

var nonDetPods []k8sV1.Pod
for _, p := range allPods.Items {
for _, p := range allPods {
_, isDet := p.Labels[determinedLabel]
_, isDetSystem := p.Labels[determinedSystemLabel]

@@ -2031,7 +2063,6 @@
func (j *jobsService) getNonDetSlots(deviceType device.Type) (map[string][]string, map[string]int64) {
nodeToTasks := make(map[string][]string, len(j.currentNodes))
taskSlots := make(map[string]int64)

nonDetPods, err := j.getNonDetPods()
if err != nil {
j.syslog.WithError(err).Warn("getting non determined pods, " +
@@ -2101,32 +2132,96 @@ func numSlots(slots model.SlotsSummary) int {
func (j *jobsService) listJobsInAllNamespaces(
ctx context.Context, opts metaV1.ListOptions,
) ([]batchV1.Job, error) {
var res []batchV1.Job
for n, i := range j.jobInterfaces {
pods, err := i.List(ctx, opts)
if err != nil {
return nil, fmt.Errorf("error listing pods for namespace %s: %w", n, err)
}
allJobs, err := j.jobInterfaces[""].List(ctx, opts)
if k8error.IsForbidden(err) {
return j.listJobsInAllNamespacesFallback(ctx, opts)
} else if err != nil {
logrus.WithError(err).WithField("function", "listJobsInAllNamespaces").Error("error listing jobs in all namespace")
return nil, err
}

res = append(res, pods.Items...)
namespaces := set.FromKeys(j.jobInterfaces)
var jobsWeCareAbout []batchV1.Job
for _, j := range allJobs.Items {
if namespaces.Contains(j.Namespace) {
jobsWeCareAbout = append(jobsWeCareAbout, j)
}
}
return jobsWeCareAbout, nil
}

func (j *jobsService) listJobsInAllNamespacesFallback(
ctx context.Context,
opts metaV1.ListOptions,
) ([]batchV1.Job, error) {
var g errgroup.Group
var res []batchV1.Job
var resLock sync.Mutex
for n, i := range j.jobInterfaces {
g.Go(func() error {
pods, err := i.List(ctx, opts)
if err != nil {
return fmt.Errorf("error listing pods for namespace %s: %w", n, err)
}
resLock.Lock()
res = append(res, pods.Items...)
resLock.Unlock()
return nil
})
}
err := g.Wait()
if err != nil {
return nil, err
}
return res, nil
}

func (j *jobsService) listPodsInAllNamespaces(
ctx context.Context, opts metaV1.ListOptions,
) (*k8sV1.PodList, error) {
res := &k8sV1.PodList{}
for n, i := range j.podInterfaces {
pods, err := i.List(ctx, opts)
if err != nil {
return nil, fmt.Errorf("error listing pods for namespace %s: %w", n, err)
}
) ([]k8sV1.Pod, error) {
allPods, err := j.podInterfaces[""].List(ctx, opts)
if k8error.IsForbidden(err) {
return j.listPodsInAllNamespacesFallback(ctx, opts)
} else if err != nil {
return nil, err
}

res.Items = append(res.Items, pods.Items...)
namespaces := set.FromKeys(j.podInterfaces)
var podsWeWant []k8sV1.Pod
for _, pod := range allPods.Items {
if namespaces.Contains(pod.Namespace) {
podsWeWant = append(podsWeWant, pod)
}
}
return podsWeWant, nil
}

func (j *jobsService) listPodsInAllNamespacesFallback(
ctx context.Context,
opts metaV1.ListOptions,
) ([]k8sV1.Pod, error) {
var g errgroup.Group
var res []k8sV1.Pod
var resLock sync.Mutex
for n, podInterface := range j.podInterfaces {
if len(n) == 0 {
continue
}
g.Go(func() error {
pods, err := podInterface.List(ctx, opts)
if err != nil {
return fmt.Errorf("error listing pods for namespace %s: %w", n, err)
}
resLock.Lock()
res = append(res, pods.Items...)
resLock.Unlock()
return nil
})
}
err := g.Wait()
if err != nil {
return nil, err
}
return res, nil
}

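Taken together, the listing changes replace one LIST request per namespace on every poll with a single cluster-scoped LIST filtered down to the namespaces the service actually tracks, and the Forbidden fallback keeps RBAC-restricted installs working by fanning the per-namespace requests out concurrently instead of issuing them one after another. The same pattern, distilled into a stand-alone helper as a sketch — the function name and tracked-namespace plumbing are assumptions, and note that the real change also makes listPodsInAllNamespaces return []k8sV1.Pod instead of *k8sV1.PodList, which is why earlier hunks drop the .Items accessors:

	package example

	import (
		"context"
		"fmt"
		"sync"

		"golang.org/x/sync/errgroup"
		k8sV1 "k8s.io/api/core/v1"
		k8error "k8s.io/apimachinery/pkg/api/errors"
		metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		k8sClient "k8s.io/client-go/kubernetes"
	)

	// listPodsEverywhere sketches the pattern above: try one cluster-scoped LIST,
	// and only if RBAC forbids it fan out one concurrent LIST per tracked namespace.
	// Assumes Go 1.22+ for per-iteration loop variables captured by the goroutines.
	func listPodsEverywhere(
		ctx context.Context, cs k8sClient.Interface, namespaces []string,
	) ([]k8sV1.Pod, error) {
		all, err := cs.CoreV1().Pods("").List(ctx, metaV1.ListOptions{})
		switch {
		case err == nil:
			// Keep only pods in namespaces this service tracks.
			tracked := make(map[string]bool, len(namespaces))
			for _, ns := range namespaces {
				tracked[ns] = true
			}
			var res []k8sV1.Pod
			for _, p := range all.Items {
				if tracked[p.Namespace] {
					res = append(res, p)
				}
			}
			return res, nil
		case !k8error.IsForbidden(err):
			return nil, err
		}

		// Fallback: one LIST per namespace, issued concurrently.
		var g errgroup.Group
		var mu sync.Mutex
		var res []k8sV1.Pod
		for _, ns := range namespaces {
			g.Go(func() error {
				pods, err := cs.CoreV1().Pods(ns).List(ctx, metaV1.ListOptions{})
				if err != nil {
					return fmt.Errorf("listing pods in namespace %s: %w", ns, err)
				}
				mu.Lock()
				res = append(res, pods.Items...)
				mu.Unlock()
				return nil
			})
		}
		if err := g.Wait(); err != nil {
			return nil, err
		}
		return res, nil
	}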