Skip to content

Commit

Permalink
Kubernetes source plugin (#985)
Browse files Browse the repository at this point in the history
  • Loading branch information
huseyinbabal authored Mar 2, 2023
1 parent 44f14e2 commit 3e1d8a9
Show file tree
Hide file tree
Showing 89 changed files with 3,724 additions and 3,272 deletions.
10 changes: 10 additions & 0 deletions .goreleaser.plugin.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,15 @@ builds:
goarch: *goarch
goarm: *goarm

- id: kubernetes
main: cmd/source/kubernetes/main.go
binary: source_kubernetes_{{ .Os }}_{{ .Arch }}

no_unique_dist_dir: true
env: *env
goos: *goos
goarch: *goarch
goarm: *goarm

snapshot:
name_template: 'v{{ .Version }}'
73 changes: 31 additions & 42 deletions cmd/botkube/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,18 @@ import (
"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/utils/strings"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"

"github.com/kubeshop/botkube/internal/analytics"
"github.com/kubeshop/botkube/internal/audit"
"github.com/kubeshop/botkube/internal/command"
"github.com/kubeshop/botkube/internal/graphql"
"github.com/kubeshop/botkube/internal/lifecycle"
"github.com/kubeshop/botkube/internal/loggerx"
Expand All @@ -43,10 +41,8 @@ import (
"github.com/kubeshop/botkube/pkg/controller"
"github.com/kubeshop/botkube/pkg/execute"
"github.com/kubeshop/botkube/pkg/execute/kubectl"
"github.com/kubeshop/botkube/pkg/filterengine"
"github.com/kubeshop/botkube/pkg/httpsrv"
"github.com/kubeshop/botkube/pkg/notifier"
"github.com/kubeshop/botkube/pkg/recommendation"
"github.com/kubeshop/botkube/pkg/sink"
"github.com/kubeshop/botkube/pkg/version"
)
Expand Down Expand Up @@ -129,7 +125,7 @@ func run(ctx context.Context) error {
if err != nil {
return reportFatalError("while loading k8s config", err)
}
dynamicCli, discoveryCli, mapper, err := getK8sClients(kubeConfig)
discoveryCli, err := getK8sClients(kubeConfig)
if err != nil {
return reportFatalError("while getting K8s clients", err)
}
Expand All @@ -145,7 +141,8 @@ func run(ctx context.Context) error {
}

// Health endpoint
healthSrv := newHealthServer(logger.WithField(componentLogFieldKey, "Health server"), conf.Settings.HealthPort)
healthChecker := healthChecker{applicationStarted: false}
healthSrv := newHealthServer(logger.WithField(componentLogFieldKey, "Health server"), conf.Settings.HealthPort, &healthChecker)
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(logger, reporter)
return healthSrv.Serve(ctx)
Expand All @@ -158,9 +155,6 @@ func run(ctx context.Context) error {
return metricsSrv.Serve(ctx)
})

// Set up the filter engine
filterEngine := filterengine.WithAllFilters(logger, dynamicCli, mapper, conf.Filters)

// Kubectl config merger
kcMerger := kubectl.NewMerger(conf.Executors)

Expand All @@ -177,7 +171,7 @@ func run(ctx context.Context) error {
resourceNameNormalizerFunc = resourceNameNormalizer.Normalize
}

cmdGuard := kubectl.NewCommandGuard(logger.WithField(componentLogFieldKey, "Command Guard"), discoveryCli)
cmdGuard := command.NewCommandGuard(logger.WithField(componentLogFieldKey, "Command Guard"), discoveryCli)
commander := kubectl.NewCommander(logger.WithField(componentLogFieldKey, "Commander"), kcMerger, cmdGuard)

runner := &execute.OSCommand{}
Expand All @@ -194,7 +188,6 @@ func run(ctx context.Context) error {
Log: logger.WithField(componentLogFieldKey, "Executor"),
CmdRunner: runner,
Cfg: *conf,
FilterEngine: filterEngine,
KcChecker: kubectl.NewChecker(resourceNameNormalizerFunc),
Merger: kcMerger,
CfgManager: cfgManager,
Expand All @@ -211,8 +204,6 @@ func run(ctx context.Context) error {
return reportFatalError("while creating executor factory", err)
}

router := source.NewRouter(mapper, dynamicCli, logger.WithField(componentLogFieldKey, "Router"))

var (
notifiers []notifier.Notifier
bots = map[string]bot.Bot{}
Expand All @@ -225,8 +216,6 @@ func run(ctx context.Context) error {
for commGroupName, commGroupCfg := range conf.Communications {
commGroupLogger := logger.WithField(commGroupFieldKey, commGroupName)

router.AddCommunicationsBindings(commGroupCfg)

scheduleBot := func(in bot.Bot) {
notifiers = append(notifiers, in)
bots[fmt.Sprintf("%s-%s", commGroupName, in.IntegrationName())] = in
Expand Down Expand Up @@ -352,12 +341,9 @@ func run(ctx context.Context) error {
})
}

recommFactory := recommendation.NewFactory(logger.WithField(componentLogFieldKey, "Recommendations"), dynamicCli)

actionProvider := action.NewProvider(logger.WithField(componentLogFieldKey, "Action Provider"), conf.Actions, executorFactory)
router.AddEnabledActionBindings(conf.Actions)

sourcePluginDispatcher := source.NewDispatcher(logger, notifiers, pluginManager, auditReporter)
sourcePluginDispatcher := source.NewDispatcher(logger, notifiers, pluginManager, actionProvider, reporter, auditReporter)
scheduler := source.NewScheduler(logger, conf, sourcePluginDispatcher)
err = scheduler.Start(ctx)
if err != nil {
Expand All @@ -369,21 +355,14 @@ func run(ctx context.Context) error {
logger.WithField(componentLogFieldKey, "Controller"),
conf,
notifiers,
recommFactory,
filterEngine,
dynamicCli,
mapper,
conf.Settings.InformersResyncPeriod,
router.BuildTable(conf),
actionProvider,
reporter,
statusReporter,
)

if _, err := statusReporter.ReportDeploymentStartup(ctx); err != nil {
return reportFatalError("while reporting botkube startup", err)
}

healthChecker.MarkAsReady()
err = ctrl.Start(ctx)
if err != nil {
return reportFatalError("while starting controller", err)
Expand All @@ -404,17 +383,33 @@ func newMetricsServer(log logrus.FieldLogger, metricsPort string) *httpsrv.Serve
return httpsrv.New(log, addr, router)
}

func newHealthServer(log logrus.FieldLogger, port string) *httpsrv.Server {
func newHealthServer(log logrus.FieldLogger, port string, healthChecker *healthChecker) *httpsrv.Server {
addr := fmt.Sprintf(":%s", port)
router := mux.NewRouter()
router.Handle(healthEndpointName, healthChecker{})
router.Handle(healthEndpointName, healthChecker)
return httpsrv.New(log, addr, router)
}

type healthChecker struct{}
type healthChecker struct {
applicationStarted bool
}

func (h *healthChecker) MarkAsReady() {
h.applicationStarted = true
}

func (healthChecker) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
fmt.Fprint(resp, "ok")
func (h *healthChecker) IsReady() bool {
return h.applicationStarted
}

func (h *healthChecker) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
if h.IsReady() {
resp.WriteHeader(http.StatusOK)
fmt.Fprint(resp, "ok")
} else {
resp.WriteHeader(http.StatusServiceUnavailable)
fmt.Fprint(resp, "unavailable")
}
}

func newAnalyticsReporter(disableAnalytics bool, logger logrus.FieldLogger) (analytics.Reporter, error) {
Expand Down Expand Up @@ -446,20 +441,14 @@ func newAnalyticsReporter(disableAnalytics bool, logger logrus.FieldLogger) (ana
return analyticsReporter, nil
}

func getK8sClients(cfg *rest.Config) (dynamic.Interface, discovery.DiscoveryInterface, meta.RESTMapper, error) {
func getK8sClients(cfg *rest.Config) (discovery.DiscoveryInterface, error) {
discoveryClient, err := discovery.NewDiscoveryClientForConfig(cfg)
if err != nil {
return nil, nil, nil, fmt.Errorf("while creating discovery client: %w", err)
}

dynamicK8sCli, err := dynamic.NewForConfig(cfg)
if err != nil {
return nil, nil, nil, fmt.Errorf("while creating dynamic K8s client: %w", err)
return nil, fmt.Errorf("while creating discovery client: %w", err)
}

discoCacheClient := memory.NewMemCacheClient(discoveryClient)
mapper := restmapper.NewDeferredDiscoveryRESTMapper(discoCacheClient)
return dynamicK8sCli, discoCacheClient, mapper, nil
return discoCacheClient, nil
}

func reportFatalErrFn(logger logrus.FieldLogger, reporter analytics.Reporter, status status.StatusReporter) func(ctx string, err error) error {
Expand Down
19 changes: 19 additions & 0 deletions cmd/source/kubernetes/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package main

import (
"github.com/hashicorp/go-plugin"

"github.com/kubeshop/botkube/internal/source/kubernetes"
"github.com/kubeshop/botkube/pkg/api/source"
)

// version is set via ldflags by GoReleaser.
var version = "dev"

func main() {
source.Serve(map[string]plugin.Plugin{
kubernetes.PluginName: &source.Plugin{
Source: kubernetes.NewSource(version),
},
})
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
github.com/segmentio/analytics-go v3.1.0+incompatible
github.com/sha1sum/aws_signing_client v0.0.0-20200229211254-f7815c59d5c1
github.com/sirupsen/logrus v1.9.0
github.com/slack-go/slack v0.10.4-0.20220606002947-9fd6da5aee56
github.com/slack-go/slack v0.12.1
github.com/spf13/pflag v1.0.5
github.com/spiffe/spire v1.5.3
github.com/stretchr/testify v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1773,8 +1773,8 @@ github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/slack-go/slack v0.10.4-0.20220606002947-9fd6da5aee56 h1:MH0qxpIb/gmsc/MAbsgMNAK3giE5Zd/6gH8yX/4wsrM=
github.com/slack-go/slack v0.10.4-0.20220606002947-9fd6da5aee56/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw=
github.com/slack-go/slack v0.12.1 h1:X97b9g2hnITDtNsNe5GkGx6O2/Sz/uC20ejRZN6QxOw=
github.com/slack-go/slack v0.12.1/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
Expand Down
2 changes: 1 addition & 1 deletion hack/gen-grpc-resources.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ REPO_ROOT_DIR=$(cd "${CURRENT_DIR}/.." && pwd)
readonly CURRENT_DIR
readonly REPO_ROOT_DIR

readonly STABLE_PROTOC_VERSION=3.19.4
readonly STABLE_PROTOC_VERSION=3.20.2
readonly STABLE_PROTOC_GEN_GO_GRPC_VERSION=1.2.0
readonly STABLE_PROTOC_GEN_GO_VERSION=v1.27.1

Expand Down
Loading

0 comments on commit 3e1d8a9

Please sign in to comment.