Skip to content

Commit

Permalink
Reject GRPC connections on stand-by replicas
Browse files Browse the repository at this point in the history
  • Loading branch information
aledbf committed Aug 18, 2023
1 parent 5c7fa0b commit 3371869
Showing 1 changed file with 36 additions and 13 deletions.
49 changes: 36 additions & 13 deletions components/ws-manager-mk2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package main

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"net"
"os"
"sync/atomic"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand Down Expand Up @@ -59,6 +61,8 @@ var (

scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")

LeaderInstance atomic.Bool
)

func init() {
Expand Down Expand Up @@ -138,6 +142,11 @@ func main() {

mgrCtx := ctrl.SetupSignalHandler()

go func() {
<-mgr.Elected()
LeaderInstance.Store(true)
}()

maintenanceReconciler, err := controllers.NewMaintenanceReconciler(mgr.GetClient())
if err != nil {
setupLog.Error(err, "unable to create maintenance controller", "controller", "Maintenance")
Expand Down Expand Up @@ -170,18 +179,13 @@ func main() {
os.Exit(1)
}

// Wait for leader election to start the GRPC server
go func() {
<-mgr.Elected()

wsmanService, err := setupGRPCService(cfg, mgr.GetClient(), maintenanceReconciler)
if err != nil {
setupLog.Error(err, "unable to start manager service")
os.Exit(1)
}
wsmanService, err := setupGRPCService(cfg, mgr.GetClient(), maintenanceReconciler)
if err != nil {
setupLog.Error(err, "unable to start manager service")
os.Exit(1)
}

workspaceReconciler.OnReconcile = wsmanService.OnWorkspaceReconcile
}()
workspaceReconciler.OnReconcile = wsmanService.OnWorkspaceReconcile

if err = workspaceReconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to setup workspace controller with manager", "controller", "Workspace")
Expand Down Expand Up @@ -234,8 +238,27 @@ func setupGRPCService(cfg *config.ServiceConfiguration, k8s client.Client, maint
metrics.Registry.MustRegister(grpcMetrics)

grpcOpts := common_grpc.ServerOptionsWithInterceptors(
[]grpc.StreamServerInterceptor{grpcMetrics.StreamServerInterceptor()},
[]grpc.UnaryServerInterceptor{grpcMetrics.UnaryServerInterceptor(), ratelimits.UnaryInterceptor()},
[]grpc.StreamServerInterceptor{
grpcMetrics.StreamServerInterceptor(),
func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if LeaderInstance.Load() {
return handler(srv, ss)
}

return fmt.Errorf("Rejecting connection due leader election")
},
},
[]grpc.UnaryServerInterceptor{
grpcMetrics.UnaryServerInterceptor(),
ratelimits.UnaryInterceptor(),
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
if LeaderInstance.Load() {
return handler(ctx, req)
}

return nil, fmt.Errorf("Rejecting connection due leader election")
},
},
)
if cfg.RPCServer.TLS.CA != "" && cfg.RPCServer.TLS.Certificate != "" && cfg.RPCServer.TLS.PrivateKey != "" {
tlsConfig, err := common_grpc.ClientAuthTLSConfig(
Expand Down

0 comments on commit 3371869

Please sign in to comment.