Skip to content

Commit

Permalink
[YUNIKORN-2844] Inject event recorder externally (#922)
Browse files Browse the repository at this point in the history
Closes: #922

Signed-off-by: Craig Condit <[email protected]>
  • Loading branch information
pbacsko authored and craigcondit committed Oct 9, 2024
1 parent f79d10c commit 4d6b4de
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 36 deletions.
39 changes: 9 additions & 30 deletions pkg/common/events/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,22 @@
package events

import (
"sync"
"sync/atomic"

"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/events"

"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/locking"
)

var eventRecorder events.EventRecorder = events.NewFakeRecorder(1024)
var once sync.Once
var lock locking.RWMutex
var eventRecorder atomic.Pointer[events.EventRecorder]

func init() {
r := events.EventRecorder(NewMockedRecorder())
eventRecorder.Store(&r)
}

func GetRecorder() events.EventRecorder {
lock.Lock()
defer lock.Unlock()
once.Do(func() {
// note, the initiation of the event recorder requires on a workable Kubernetes client,
// in test mode we should skip this and just use a fake recorder instead.
configs := conf.GetSchedulerConf()
if !configs.IsTestMode() {
k8sClient := client.NewKubeClient(configs.KubeConfig)
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{
Interface: k8sClient.GetClientSet().EventsV1()})
eventBroadcaster.StartRecordingToSink(make(<-chan struct{}))
eventRecorder = eventBroadcaster.NewRecorder(scheme.Scheme, constants.SchedulerName)
}
})

return eventRecorder
return *eventRecorder.Load()
}

func SetRecorder(recorder events.EventRecorder) {
lock.Lock()
defer lock.Unlock()
eventRecorder = recorder
once.Do(func() {}) // make sure Do() doesn't fire elsewhere
eventRecorder.Store(&recorder)
}
7 changes: 1 addition & 6 deletions pkg/common/events/recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,10 @@ import (
"testing"

"gotest.tools/v3/assert"

"github.com/apache/yunikorn-k8shim/pkg/conf"
)

func TestInit(t *testing.T) {
// simply test the get won't fail
// which means the get function honors the testMode and
// skips initiating a real event recorder
conf.GetSchedulerConf().SetTestMode(true)
recorder := GetRecorder()
assert.Equal(t, reflect.TypeOf(recorder).String(), "*events.FakeRecorder")
assert.Equal(t, reflect.TypeOf(recorder).String(), "*events.MockedRecorder")
}
17 changes: 17 additions & 0 deletions pkg/shim/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@
package shim

import (
ctx "context"
"time"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/scheme"
k8events "k8s.io/client-go/tools/events"

"github.com/apache/yunikorn-k8shim/pkg/cache"
"github.com/apache/yunikorn-k8shim/pkg/client"
"github.com/apache/yunikorn-k8shim/pkg/common/constants"
"github.com/apache/yunikorn-k8shim/pkg/common/events"
"github.com/apache/yunikorn-k8shim/pkg/common/utils"
"github.com/apache/yunikorn-k8shim/pkg/conf"
"github.com/apache/yunikorn-k8shim/pkg/dispatcher"
Expand Down Expand Up @@ -67,6 +72,18 @@ func NewShimScheduler(scheduler api.SchedulerAPI, configs *conf.SchedulerConf, b
apiFactory := client.NewAPIFactory(scheduler, informerFactory, configs, false)
context := cache.NewContextWithBootstrapConfigMaps(apiFactory, bootstrapConfigMaps)
rmCallback := cache.NewAsyncRMCallback(context)

eventBroadcaster := k8events.NewBroadcaster(&k8events.EventSinkImpl{
Interface: kubeClient.GetClientSet().EventsV1()})
err := eventBroadcaster.StartRecordingToSinkWithContext(ctx.Background())
if err != nil {
log.Log(log.Shim).Error("Could not create event broadcaster",
zap.Error(err))
} else {
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, constants.SchedulerName)
events.SetRecorder(eventRecorder)
}

return newShimSchedulerInternal(context, apiFactory, rmCallback)
}

Expand Down

0 comments on commit 4d6b4de

Please sign in to comment.