Skip to content

Commit

Permalink
feat(vm-route-forge): add pprof server (#244)
Browse files Browse the repository at this point in the history
add pprof server
---------
Signed-off-by: yaroslavborbat <[email protected]>
Signed-off-by: Yaroslav Borbat <[email protected]>
Co-authored-by: Ivan Mikheykin <[email protected]>
  • Loading branch information
yaroslavborbat authored Aug 1, 2024
1 parent 41e43ae commit c61eb2e
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 87 deletions.
8 changes: 8 additions & 0 deletions images/virtualization-artifact/Taskfile.dist.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,14 @@ tasks:
desc: "Wipe local pyroscope for virtualization-controller"
cmd: ./hack/pyroscope.sh wipe --namespace={{ .BaseNamespace }}

pyroscope:local:run:vm-route-forge:
desc: "Run pyroscope locally for vm-route-forge"
cmd: ./hack/pyroscope.sh run --namespace={{ .BaseNamespace }} --service=vm-route-forge --port=8119

pyroscope:local:wipe:vm-route-forge:
desc: "Wipe local pyroscope for vm-route-forge"
cmd: ./hack/pyroscope.sh wipe --namespace={{ .BaseNamespace }}

_copy_d8_registry_secret:
internal: true
vars:
Expand Down
8 changes: 4 additions & 4 deletions images/virtualization-artifact/hack/pyroscope/config.alloy
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ local.file "endpoints" {
// for the Alloy export to.
filename = "/etc/alloy/endpoints.json"
}
pyroscope.write "write_virtualziation_controller" {
pyroscope.write "write_virtualziation" {
endpoint {
url = json_path(local.file.endpoints.content, ".profiles.url")[0]
}
}

pyroscope.scrape "scrape_virtualziation_controller" {
targets = [{"__address__" = "localhost:8081", "service_name" = "virtualziation_controller"}]
forward_to = [pyroscope.write.write_virtualziation_controller.receiver]
pyroscope.scrape "scrape_virtualziation" {
targets = [{"__address__" = "localhost:8081", "service_name" = "virtualziation"}]
forward_to = [pyroscope.write.write_virtualziation.receiver]

profiling_config {
profile.process_cpu {
Expand Down
13 changes: 9 additions & 4 deletions images/vm-route-forge/cmd/vm-route-forge/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Options struct {
Cidrs netutil.CIDRSet
DryRun bool
ProbeAddr string
PprofAddr string
NodeName string
TableID string
}
Expand All @@ -41,14 +42,17 @@ const (
flagCidr, flagCidrShort = "cidr", "c"
flagDryRun, flagDryRunShort = "dry-run", "d"
flagProbeAddr = "health-probe-bind-address"
flagPprofAddr = "pprof-bind-address"
flagVerbosity, flagVerbosityShort = "verbosity", "v"
flagNodeName, flagNodeNameShort = "nodeName", "n"
flagTableId, flagTableIdShort = "tableId", "t"
defaultVerbosity = 1

VerbosityEnv = "VERBOSITY"
NodeNameEnv = "NODE_NAME"
TableIDEnv = "TABLE_ID"
HealthProbeBindAddressEnv = "HEALTH_PROBE_BIND_ADDRESS"
PprofBindAddressEnv = "PPROF_BIND_ADDRESS"
VerbosityEnv = "VERBOSITY"
NodeNameEnv = "NODE_NAME"
TableIDEnv = "TABLE_ID"
)

func NewOptions() Options {
Expand All @@ -62,7 +66,8 @@ func NewOptions() Options {
func (o *Options) Flags(fs *pflag.FlagSet) {
fs.StringSliceVarP((*[]string)(&o.Cidrs), flagCidr, flagCidrShort, []string{}, "CIDRs enabled to route (multiple flags allowed)")
fs.BoolVarP(&o.DryRun, flagDryRun, flagDryRunShort, false, "Don't perform any changes on the node.")
fs.StringVar(&o.ProbeAddr, flagProbeAddr, ":0", "The address the probe endpoint binds to.")
fs.StringVar(&o.ProbeAddr, flagProbeAddr, os.Getenv(HealthProbeBindAddressEnv), "The address the probe endpoint binds to.")
fs.StringVar(&o.PprofAddr, flagPprofAddr, os.Getenv(PprofBindAddressEnv), "The address the pprof endpoint binds to.")
fs.StringVarP(&o.NodeName, flagNodeName, flagNodeNameShort, os.Getenv(NodeNameEnv), "The name of the node.")
fs.StringVarP(&o.TableID, flagTableId, flagTableIdShort, os.Getenv(TableIDEnv), "The id of the table.")
fs.IntVarP(&o.Verbosity, flagVerbosity, flagVerbosityShort, getDefaultVerbosity(), "Verbosity of output")
Expand Down
6 changes: 5 additions & 1 deletion images/vm-route-forge/cmd/vm-route-forge/app/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,13 @@ func run(opts options.Options) error {
}
go routeCtrl.Run(ctx, countWorkersRouteController)

serverOptions := server.Options{
HealthProbeBindAddress: opts.ProbeAddr,
PprofBindAddress: opts.PprofAddr,
}
srv, err := server.NewServer(
kubeClient,
server.Options{HealthProbeBindAddress: opts.ProbeAddr},
serverOptions,
log,
)
if err != nil {
Expand Down
23 changes: 12 additions & 11 deletions images/vm-route-forge/internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@ limitations under the License.
package cache

import (
"net"
"sync"

"k8s.io/apimachinery/pkg/types"
)

type Cache interface {
GetAddresses(k types.NamespacedName) (Addresses, bool)
GetName(ip string) (types.NamespacedName, bool)
GetName(ip net.IP) (types.NamespacedName, bool)
Set(k types.NamespacedName, addrs Addresses)
DeleteByKey(k types.NamespacedName)
DeleteByIP(ip string)
DeleteByIP(ip net.IP)
}

func NewCache() Cache {
Expand All @@ -50,41 +51,41 @@ func (c *defaultCache) GetAddresses(k types.NamespacedName) (Addresses, bool) {
return res, ok
}

func (c *defaultCache) GetName(ip string) (types.NamespacedName, bool) {
func (c *defaultCache) GetName(ip net.IP) (types.NamespacedName, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
res, ok := c.addrVm[ip]
res, ok := c.addrVm[ip.String()]
return res, ok
}

func (c *defaultCache) Set(k types.NamespacedName, addrs Addresses) {
c.mu.Lock()
defer c.mu.Unlock()
c.vmAddr[k] = addrs
c.addrVm[addrs.VMIP] = k
c.addrVm[addrs.VMIP.String()] = k
}

func (c *defaultCache) DeleteByKey(k types.NamespacedName) {
c.mu.Lock()
defer c.mu.Unlock()
addrs, ok := c.vmAddr[k]
if ok {
delete(c.addrVm, addrs.VMIP)
delete(c.addrVm, addrs.VMIP.String())
}
delete(c.vmAddr, k)
}

func (c *defaultCache) DeleteByIP(ip string) {
func (c *defaultCache) DeleteByIP(ip net.IP) {
c.mu.Lock()
defer c.mu.Unlock()
k, ok := c.addrVm[ip]
k, ok := c.addrVm[ip.String()]
if ok {
delete(c.vmAddr, k)
}
delete(c.addrVm, ip)
delete(c.addrVm, ip.String())
}

type Addresses struct {
NodeIP string
VMIP string
NodeIP net.IP
VMIP net.IP
}
56 changes: 39 additions & 17 deletions images/vm-route-forge/internal/controller/route/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,25 @@ import (
"k8s.io/client-go/util/workqueue"

"vm-route-forge/internal/cache"
"vm-route-forge/internal/netlinkwrap"
)

func NewHostController(queue workqueue.RateLimitingInterface, cidrs []*net.IPNet, cache cache.Cache, log logr.Logger) *HostRouteController {
return &HostRouteController{
queue: queue,
cidrs: cidrs,
cache: cache,
log: log,
queue: queue,
cidrs: cidrs,
cache: cache,
log: log,
routeGet: netlinkwrap.NewFuncs().RouteGet,
}
}

type HostRouteController struct {
queue workqueue.RateLimitingInterface
cidrs []*net.IPNet
cache cache.Cache
log logr.Logger
queue workqueue.RateLimitingInterface
cidrs []*net.IPNet
cache cache.Cache
log logr.Logger
routeGet func(net.IP) ([]netlink.Route, error)
}

func (r *HostRouteController) Run(ctx context.Context) error {
Expand All @@ -65,39 +68,58 @@ func (r *HostRouteController) Run(ctx context.Context) error {
// We monitor updates in the routes and if we find a mismatch with the cache,
// we put the virtual machine in the queue for processing.
func (r *HostRouteController) sync(ru netlink.RouteUpdate) error {
dst := ru.Dst
if dst == nil {
vmIP := ru.Dst.IP
if vmIP == nil {
return nil
}
isManaged, err := r.isManagedIP(dst.IP)
isManaged, err := r.isManagedIP(vmIP)
if err != nil {
return err
}
if !isManaged {
return nil
}
src := ru.Src
ciliumInternalIP := ru.Src

r.log.V(7).Info("Got new RouteUpdate", "value", ru)

key, found := r.cache.GetName(vmIP)

log := r.log.WithValues(
"ciliumInternalIP", ciliumInternalIP,
"inHostVMIP", vmIP.String(),
"virtualMachine", key)
log.Info("Started processing route")

key, found := r.cache.GetName(dst.IP.String())
switch ru.Type {
case unix.RTM_NEWROUTE:
// if the route was added but not added to cache, then do nothing, because we can't get name of vm.
if !found {
break
}
// if the route has been added, but there is no addresses in the cache, then add the VM to the queue.
addrs, found := r.cache.GetAddresses(key)
if !found {
log.Info("The route was added, but there is no addresses in the cache. Add the VM to the queue.")
r.enqueueKey(key)
break
}
// if the route was added, but the addresses from the cache and from the route do not match, then add the VM to the queue.
if addrs.NodeIP != src.String() || addrs.VMIP != dst.String() {
routes, err := r.routeGet(addrs.NodeIP)
if err != nil || len(routes) == 0 {
return fmt.Errorf("failed to get routes: %w", err)
}
ciliumInternalIPByNodeIP := routes[0].Src

if !ciliumInternalIP.Equal(ciliumInternalIPByNodeIP) || !addrs.VMIP.Equal(vmIP) {
log.Info("The route was added, but the addresses from the cache and from the route do not match. Add the VM to the queue.",
"inCacheNodeIP", addrs.NodeIP.String(),
"inCacheVMIP", addrs.VMIP.String(),
"ciliumInternalIPByNodeIP", ciliumInternalIPByNodeIP.String(),
)
r.enqueueKey(key)
}
// if the route was deleted but not deleted from the cache, then add the VM to the queue.
case unix.RTM_DELROUTE:
if found {
log.Info("The route was deleted but not deleted from the cache. Add the VM to the queue.")
r.enqueueKey(key)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (c *Controller) worker(ctx context.Context) {
}
defer c.queue.Done(key)

if err := c.sync(ctx, key.(string)); err != nil {
if err := c.sync(key.(string)); err != nil {
c.log.Error(err, fmt.Sprintf("re-enqueuing VirtualMachine %v", key))
c.queue.AddRateLimited(key)
} else {
Expand All @@ -265,27 +265,36 @@ func (c *Controller) worker(ctx context.Context) {
}
}

func (c *Controller) sync(ctx context.Context, key string) error {
func (c *Controller) sync(key string) error {
log := c.log.WithValues("virtualmachine", key)
log.Info("Started processing vm")

obj, exists, err := c.vmIndexer.GetByKey(key)
if err != nil {
return err
}
ns, name, _ := strings.Cut(key, "/")
k := types.NamespacedName{Name: name, Namespace: ns}
if !exists {
c.netlinkMgr.DeleteRoute(k, "")
if err = c.netlinkMgr.DeleteRoute(k, ""); err != nil {
log.Error(err, "Failed to delete route")
}
return nil
}
originalVM := obj.(*v1alpha2.VirtualMachine)
vm := originalVM.DeepCopy()
log := c.log.WithValues("virtualmachine", key)
log.Info("Started processing vm")

if vm.GetDeletionTimestamp() != nil {
c.netlinkMgr.DeleteRoute(k, vm.Status.IPAddress)
if err = c.netlinkMgr.DeleteRoute(k, vm.Status.IPAddress); err != nil {
log.Error(err, "Failed to delete toute")
return err
}
return nil
}

c.netlinkMgr.UpdateRoute(ctx, vm)
if err = c.netlinkMgr.UpdateRoute(vm); err != nil {
log.Error(err, "Failed to update route")
return err
}
return nil
}
Loading

0 comments on commit c61eb2e

Please sign in to comment.