Kcache is a kubernetes object data source similar to k8s.io/client-go/tools/cache which uses channels to create a flexible event-based toolkit. Features include typed producers, joining between multiple producers, and (re)filtering.
Kcache was originally created to drive a Kubernetes monitoring application and it currently powers kail.
Using kcache involves creating controllers to manage dynamic object sets with the kubernetes API. The monitored objects are cached and events about changing state are broadcast to subscribers.
Each controller represents a single kubernetes watch stream. There can be any number of subscribers to each controller, and subscribers can be publishers themselves.
controller, err := kcache.NewController(ctx,log,client)
// wait for the initial sync to be complete
<-controller.Ready()
fmt.Println("controller has been synced")
Controllers maintain a cache of the objects being watched.
// fetch the pod named 'pod-1' in the namespace 'default' from the cache.
pod, err := controller.Cache().Get("default","pod-1")
There are many ways to subscribe to a controller's events, the most basic is a simple channel-based subscription:
sub, err := controller.Subscribe()
<-sub.Ready()
// fetch cached list of objects
sub.Cache().List()
for event := range sub.Events() {
// handle add/update/delete event for objects
}
In addition to channels, callbacks can be used to handle events
handler := kcache.BuildHandler().
OnInitialize(func(objs []metav1.Object) { /* ... */ }).
OnCreate(func(obj metav1.Object){ /* ... */ }).
OnUpdate(func(obj metav1.Object){ /* ... */ }).
OnDelete(func(obj metav1.Object){ /* ... */ }).
Create()
controller
kcache.NewMonitor(controller,handler)
Typed controllers and subscribers are available to reduce the need for casting objects. Each type has all of the features of the untyped system (channels,callbacks, filtering, caches, etc...)
controller, err := pod.NewController(ctx,log,client,"default")
sub, err := controller.Subescribe()
...
Currently implemented types are:
- Pod
- Node
- Event
- Secret
- Service
- Ingress
- Daemonset
- ReplicaSet
- Deployment
- ReplicationController
The cache and events that are be exposed to a subscription can be limited by a filter object
The following will return a subscription that only sees the pod named "default/pod-1" pod in its cache and events:
sub, err := controller.SubscribeWithFilter(filter.NSName("default","pod-1"))
Additionally, new publishers can be created with filters. In the following example,
sub_a
will only receive events about "default/pod-1" and sub_b
will only receive events about "default/pod-2"
pub_a, err := controller.CloneWithFilter(filter.NSName("default","pod-1"))
pub_b, err := controller.CloneWithFilter(filter.NSName("default","pod-2"))
sub_a, err := pub_a.Subscribe()
sub_b, err := pub_b.Subscribe()
The filter used for filtered publishers and subscribers can be changed at any time. The cache for each will readjust and CREATE
, DELETE
events will be emitted as necessary.
In the example below, if the pods "default/pod-1" and "default/pod-2" exist, sub_a
will receive a delete event for "default/pod-1" and a create event for "default/pod-2"
pub_a, err := controller.CloneWithFilter(filter.NSName("default","pod-1"))
sub_a, err := pub_a.Subscribe()
<-sub_a.Ready()
go func() {
for evt := sub_a.Events() {
fmt.Println(evt)
}
}()
pub_a.Refilter(filter.NSName("default","pod-2"))
Refiltering allows for joining between different publishers. The join is dynamic -- as the objects of the joined publisher changes, so does the set of objects in the resulting publisher.
In the example below, sub
will only know about pods that are targeted by the "default/frontend" service.
pods, err := pod.NewController(/*...*/)
services, err := service.NewController(/*...*/)
frontend, err := services.CloneWithFilter(filter.NSName("default","frontend"))
sub, err := join.ServicePods(ctx,frontend,pods)
<- sub.Ready()
for evt := range sub.Events() {
/* ... */
}
Joining can be done by hand but there are a number of utility joins available:
ServicePods()
- restrict pods to those that match the services available in the given publisher.RCPods()
- restrict pods to those that match the replication controllers in the given publisher.RSPods()
- restrict pods to those that match the replica sets in the given publisher.DeploymentPods()
- restrict pods to those that match the deployments in the given publisher.DaemonSetPods()
- restrict pods to those that match the daemonsets in the given publisher.IngressServices()
- restrict services to those that match the ingresses in the given publisher.IngressPods()
- restrict pods to those that match the services which match the ingresses in the given publisher (double join)