Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add report - connection init (botVer, k8sVer) #1257

Merged
merged 4 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ linters-settings:
local-prefixes: github.com/kubeshop/botkube
gocyclo:
# https://github.com/kubeshop/botkube/issues/745
min-complexity: 50
min-complexity: 51
revive:
rules:
# Disable warns about capitalized and ended with punctuation error messages
Expand Down
68 changes: 36 additions & 32 deletions cmd/botkube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func run(ctx context.Context) (err error) {
deployClient = remote.NewDeploymentClient(gqlClient)
}

statusReporter := status.GetReporter(remoteCfgEnabled, gqlClient, deployClient, nil)
if err = statusReporter.ReportDeploymentConnectionInit(ctx, ""); err != nil {
return fmt.Errorf("while reporting botkube connection initialization %w", err)
}

cfgProvider := intconfig.GetProvider(remoteCfgEnabled, deployClient)
configs, cfgVersion, err := cfgProvider.Configs(ctx)
if err != nil {
Expand All @@ -100,15 +105,10 @@ func run(ctx context.Context) (err error) {
if err != nil {
return fmt.Errorf("while merging app configuration: %w", err)
}

logger := loggerx.New(conf.Settings.Log)
if confDetails.ValidateWarnings != nil {
logger.Warnf("Configuration validation warnings: %v", confDetails.ValidateWarnings.Error())
}

statusReporter := status.GetReporter(remoteCfgEnabled, logger, gqlClient, deployClient, cfgVersion)
auditReporter := audit.GetReporter(remoteCfgEnabled, logger, gqlClient)

// Set up analytics reporter
reporter, err := getAnalyticsReporter(conf.Analytics.Disable, logger)
if err != nil {
Expand All @@ -125,6 +125,34 @@ func run(ctx context.Context) (err error) {
defer analytics.ReportPanicIfOccurs(logger, reporter)

reportFatalError := reportFatalErrFn(logger, reporter, statusReporter)
// Prepare K8s clients and mapper
kubeConfig, err := kubex.BuildConfigFromFlags("", conf.Settings.Kubeconfig, conf.Settings.SACredentialsPathPrefix)
if err != nil {
return reportFatalError("while loading k8s config", err)
}
discoveryCli, err := getK8sClients(kubeConfig)
if err != nil {
return reportFatalError("while getting K8s clients", err)
}

// Register current anonymous identity
k8sCli, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return reportFatalError("while creating K8s clientset", err)
}
botkubeVersion, k8sVer, err := findVersions(k8sCli)
if err = statusReporter.ReportDeploymentConnectionInit(ctx, k8sVer); err != nil {
return reportFatalError("while reporting botkube connection initialization", err)
}
err = reporter.RegisterCurrentIdentity(ctx, k8sCli, remoteCfg.Identifier)
if err != nil {
return reportFatalError("while registering current identity", err)
}

statusReporter.SetLogger(logger)
statusReporter.SetResourceVersion(cfgVersion)
auditReporter := audit.GetReporter(remoteCfgEnabled, logger, gqlClient)

ctx, cancel := context.WithCancel(ctx)
errGroup, ctx := errgroup.WithContext(ctx)
defer func() {
Expand Down Expand Up @@ -162,26 +190,6 @@ func run(ctx context.Context) (err error) {
}
defer pluginManager.Shutdown()

// Prepare K8s clients and mapper
kubeConfig, err := kubex.BuildConfigFromFlags("", conf.Settings.Kubeconfig, conf.Settings.SACredentialsPathPrefix)
if err != nil {
return reportFatalError("while loading k8s config", err)
}
discoveryCli, err := getK8sClients(kubeConfig)
if err != nil {
return reportFatalError("while getting K8s clients", err)
}

// Register current anonymous identity
k8sCli, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return reportFatalError("while creating K8s clientset", err)
}
err = reporter.RegisterCurrentIdentity(ctx, k8sCli, remoteCfg.Identifier)
if err != nil {
return reportFatalError("while registering current identity", err)
}

// Health endpoint
healthChecker := healthChecker{applicationStarted: false}
healthSrv := newHealthServer(logger.WithField(componentLogFieldKey, "Health server"), conf.Settings.HealthPort, &healthChecker)
Expand All @@ -198,10 +206,6 @@ func run(ctx context.Context) (err error) {
})

cmdGuard := command.NewCommandGuard(logger.WithField(componentLogFieldKey, "Command Guard"), discoveryCli)
botkubeVersion, err := findVersions(k8sCli)
if err != nil {
return reportFatalError("while fetching versions", err)
}
// Create executor factory
cfgManager := config.NewManager(remoteCfgEnabled, logger.WithField(componentLogFieldKey, "Config manager"), conf.Settings.PersistentConfig, cfgVersion, k8sCli, gqlClient, deployClient)
executorFactory, err := execute.NewExecutorFactory(
Expand Down Expand Up @@ -586,16 +590,16 @@ func sendHelp(ctx context.Context, s *storage.Help, clusterName string, executor
return s.MarkHelpAsSent(ctx, sent)
}

func findVersions(cli *kubernetes.Clientset) (string, error) {
func findVersions(cli *kubernetes.Clientset) (string, string, error) {
k8sVer, err := cli.ServerVersion()
if err != nil {
return "", fmt.Errorf("while getting server version: %w", err)
return "", "", fmt.Errorf("while getting server version: %w", err)
}

botkubeVersion := version.Short()
if len(botkubeVersion) == 0 {
botkubeVersion = "Unknown"
}

return fmt.Sprintf("K8s Server Version: %s\nBotkube version: %s", k8sVer.String(), botkubeVersion), nil
return fmt.Sprintf("K8s Server Version: %s\nBotkube version: %s", k8sVer.String(), botkubeVersion), k8sVer.String(), nil
}
47 changes: 42 additions & 5 deletions internal/status/gql_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,48 @@ type GraphQLStatusReporter struct {
resVerMutex sync.RWMutex
}

func newGraphQLStatusReporter(logger logrus.FieldLogger, client GraphQLClient, resVerClient ResVerClient, cfgVersion int) *GraphQLStatusReporter {
func newGraphQLStatusReporter(logger logrus.FieldLogger, client GraphQLClient, resVerClient ResVerClient) *GraphQLStatusReporter {
return &GraphQLStatusReporter{
log: logger,
gql: client,
resVerClient: resVerClient,
resourceVersion: cfgVersion,
log: logger,
gql: client,
resVerClient: resVerClient,
}
}

// ReportDeploymentConnectionInit reports connection initialization.
func (r *GraphQLStatusReporter) ReportDeploymentConnectionInit(ctx context.Context, k8sVer string) error {
logger := r.log.WithFields(logrus.Fields{
"deploymentID": r.gql.DeploymentID(),
"type": "connecting",
})
logger.Debug("Reporting...")
var mutation struct {
Success bool `graphql:"reportDeploymentConnectionInit(id: $id, botkubeVersion: $botkubeVersion, k8sVer: $k8sVer)"`
}
variables := map[string]interface{}{
"id": graphql.ID(r.gql.DeploymentID()),
"botkubeVersion": version.Info().Version,
"k8sVer": k8sVer,
}

err := r.withRetry(ctx, logger, func() error {
err := r.gql.Client().Mutate(ctx, &mutation, variables)
if err != nil {
return err
}
if !mutation.Success {
return errors.New("failed to report connection initialization")
}
return nil
})
if err != nil {
return errors.Wrap(err, "while reporting deployment connection initialization")
}
logger.Debug("Reporting successful.")

return nil
}

// ReportDeploymentStartup reports deployment startup to GraphQL server.
func (r *GraphQLStatusReporter) ReportDeploymentStartup(ctx context.Context) error {
logger := r.log.WithFields(logrus.Fields{
Expand Down Expand Up @@ -139,6 +172,10 @@ func (r *GraphQLStatusReporter) SetResourceVersion(resourceVersion int) {
r.resourceVersion = resourceVersion
}

func (r *GraphQLStatusReporter) SetLogger(logger logrus.FieldLogger) {
r.log = logger.WithField("component", "GraphQLStatusReporter")
}

const (
retries = 3
delay = 200 * time.Millisecond
Expand Down
8 changes: 8 additions & 0 deletions internal/status/noop_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@ package status

import (
"context"

"github.com/sirupsen/logrus"
)

var _ StatusReporter = (*NoopStatusReporter)(nil)

type NoopStatusReporter struct{}

func (n NoopStatusReporter) ReportDeploymentConnectionInit(context.Context, string) error {
return nil
}

func (n NoopStatusReporter) ReportDeploymentStartup(context.Context) error {
return nil
}
Expand All @@ -23,6 +29,8 @@ func (n NoopStatusReporter) ReportDeploymentFailure(context.Context, string) err
func (n NoopStatusReporter) SetResourceVersion(int) {
}

func (n NoopStatusReporter) SetLogger(logrus.FieldLogger) {}

func newNoopStatusReporter() *NoopStatusReporter {
return &NoopStatusReporter{}
}
20 changes: 17 additions & 3 deletions internal/status/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,38 @@ import (
"context"

"github.com/sirupsen/logrus"

"github.com/kubeshop/botkube/internal/loggerx"
"github.com/kubeshop/botkube/pkg/config"
)

type StatusReporter interface {
ReportDeploymentConnectionInit(ctx context.Context, k8sVer string) error
ReportDeploymentStartup(ctx context.Context) error
ReportDeploymentShutdown(ctx context.Context) error
ReportDeploymentFailure(ctx context.Context, errMsg string) error
SetResourceVersion(resourceVersion int)
SetLogger(logger logrus.FieldLogger)
}

func GetReporter(remoteCfgEnabled bool, logger logrus.FieldLogger, gql GraphQLClient, resVerClient ResVerClient, cfgVersion int) StatusReporter {
func GetReporter(remoteCfgEnabled bool, gql GraphQLClient, resVerClient ResVerClient, log logrus.FieldLogger) StatusReporter {
if remoteCfgEnabled {
log = withDefaultLogger(log)
return newGraphQLStatusReporter(
logger.WithField("component", "GraphQLStatusReporter"),
log.WithField("component", "GraphQLStatusReporter"),
gql,
resVerClient,
cfgVersion,
)
}

return newNoopStatusReporter()
}

func withDefaultLogger(log logrus.FieldLogger) logrus.FieldLogger {
if log != nil {
return log
}
return loggerx.New(config.Logger{
Level: "info",
})
}
Loading