Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Webhook refactored after k8s plugin extraction #1008

Merged
merged 6 commits into from
Mar 13, 2023

Conversation

huseyinbabal
Copy link
Contributor

@huseyinbabal huseyinbabal commented Mar 7, 2023

Description

Changes proposed in this pull request:

  • Webhook sink is updated to accept new contract api.Message
  • Introduced rawData param to send original data to sinks
  • Collector is updated to enable plugins for also sinks

Testing

Build plugins

PLUGIN_DOWNLOAD_URL_BASE_PATH=http://localhost:8080/plugin-dist make gen-plugins-index

Run plugin server

npx serve --listen 8080

Run Botkube with the following config

ℹ️ You can use Webhook Site Tester for webhook URL

export BOTKUBE_CONFIG_PATHS=/tmp/botkube.yml
export BOTKUBE_SETTINGS_KUBECONFIG=${USER}/.kube/config

go run cmd/botkube/main.go
## /tmp/botkube.yml
communications:
  'default-group':
    webhook:
      enabled: true
      url: 'webhook_url'
      bindings:
        sources: 
          - k8s-events
          - k8s-updates


settings:
  clusterName: gke-playground
  configWatcher: true
  log:
    level: debug
  lifecycleServer:
    deployment:
      name: botkube
      namespace: botkube
    port: "2113"

configWatcher:
  enabled: false
  initialSyncTimeout: 0

sources:
  'k8s-events':
    displayName: "K8s recommendations"
    'botkube/kubernetes':
      enabled: true
      config:
        log:
          level: debug
        recommendations:
          pod:
            noLatestImageTag: true
            labelsSet: true
          ingress:
            backendServiceValid: false
            tlsSecretValid: false
        namespaces:
          include:
            - botkube
        event:
          types:
            - create
            - update

  'k8s-updates':
    displayName: "K8s ConfigMaps updates"
    'botkube/kubernetes':
      enabled: true
      config:
        log:
          level: debug
        namespaces:
          include:
            - default
        event:
          types:
            - create
            - update
            - delete
        resources:
          - type: v1/configmaps
            namespaces:
              include:
                - botkube
            event: # overrides top level `event` entry
              types:
                - update
            updateSetting:
              includeDiff: true
              fields:
                - data

plugins:
  repositories:
    botkube:
      url: http://localhost:8080/plugins-index.yaml

Create a pod to see a notification

kubectl run nginx_webhook --image=nginx:latest -n botkube

Related issue(s)

Fixes #999

@huseyinbabal huseyinbabal marked this pull request as ready for review March 7, 2023 12:02
@huseyinbabal huseyinbabal requested review from a team and PrasadG193 as code owners March 7, 2023 12:02
@pkosiec pkosiec self-assigned this Mar 9, 2023
// SendMessageToAll is used for notifying about Botkube start/stop listening, possible Botkube upgrades and other events.
// Some integrations may decide to ignore such messages and have SendMessage method no-op.
// TODO: Consider option per channel to turn on/off "announcements" (Botkube start/stop/upgrade, notify/config change).
SendMessageToAll(context.Context, interactive.CoreMessage) error

// SendMessage sends a generic message for a given source bindings.
SendMessage(context.Context, interactive.CoreMessage, []string) error
SendMessage(context.Context, interactive.CoreMessage, []string, any) error
Copy link
Member

@pkosiec pkosiec Mar 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bots don't consume raw event, so, so can we specify a different interface for Sinks and Bots if they are different?

So for sinks we can still require such method:

type Sink interface {
    SendEvent(ctx context.Context, event any, eventSources []string) error
  // (...)
}

and for Bots:

type Bot interface {
  SendMessage(context.Context, interactive.CoreMessage, []string) error
  // (...)
}

And then in dispatcher, instead of iterating over notifiers, we can iterate over both bots and sinks and send the data properly (message + raw data).

Could you please update also Elasticsearch integration? It can be one PR if it will be easier for you 👍 Thank you!

BTW We can remove SendEvent methods from bots as they are not used and won't be in such setup 🙂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need @mszostok 's input here, I don't want to break something :) After that I will introduce 2 interfaces.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I agree with Paweł's comment 👍 For now, Bots don't need to have the raw event and I don't see any use case for that at least for the current features that we plan to have. Having that separated will be better for our code-base 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pkosiec @mszostok I separated sink and bot notifiers as suggested

Comment on lines 61 to 57
Source: sources[0],
Data: rawData,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 That changes the contract which were before. Can we try to keep the same shape of data? so basically we can inline data.

And why do we send just a first source binding which instead of all of them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we cannot keep the previous format since, that format was all about kubernetes events. I applied same strategy we did for telemetry. So, those information is called RawData and we send it to webhook and es. Notice that, those data is coming from plugins which can be anything

We haven't changed the signature for sources yet, it is still accepting list of sources, and that is what I get only first since there is only one source inside it. this will be removed soon, and we will have only single source.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, fine with me - but please make sure we mention this change in the breaking changes 🙂

I understand that there is one item, but let's do this in the meantime it is not yet refactored to a single source:

Suggested change
Source: sources[0],
Data: rawData,
Source: strings.Join(sources, ","),
Data: rawData,

Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good suggestion :)

Copy link
Member

@pkosiec pkosiec left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works well! Just minor code-related suggestions.

Comment on lines 11 to 12
// SendMessage sends a generic message for a given source bindings.
SendMessage(context.Context, any, []string) error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it could stay as SendEvent, as we don't send message, but raw event data? What do you think?

bots = map[string]bot.Bot{}
botNotifiers []notifier.Bot
sinkNotifiers []notifier.Sink
bots = map[string]bot.Bot{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

botNotifiers []notifier.Bot is redundant aa we have this bot map. This map type can be changed to

bots = map[string]notifier.Bot{}

and essentially pass it everywhere instead of the slice. But we can do it later - can you change the type and add this TODO at least? Thanks!

Suggested change
bots = map[string]bot.Bot{}
// TODO: Use bots everywhere instead of `botNotifiers` slice
bots = map[string]notifier.Bot{}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bot.Bot refers to bot implementation while bot.Notifier is for notification. When I compare with main, we have bots and notifiers

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already discussed f2f 🙂 tl;dr for others: we use bots map for sendHelp where we need only the SendMessageToAll

Comment on lines 175 to 202
for _, n := range d.sinkNotifiers {
go func(n notifier.Sink) {
defer analytics.ReportPanicIfOccurs(d.log, d.reporter)
err := n.SendMessage(ctx, event.RawObject, sources)
if err != nil {
reportErr := d.reporter.ReportHandledEventError(analytics.ReportEvent{
IntegrationType: n.Type(),
Platform: n.IntegrationName(),
PluginName: pluginName,
AnonymizedEventFields: event.AnalyticsLabels,
}, err)
if reportErr != nil {
err = multierror.Append(err, fmt.Errorf("while reporting sink analytics: %w", reportErr))
}

d.log.Errorf("while sending sink message: %s", err.Error())
}
reportErr := d.reporter.ReportHandledEventSuccess(analytics.ReportEvent{
IntegrationType: n.Type(),
Platform: n.IntegrationName(),
PluginName: pluginName,
AnonymizedEventFields: event.AnalyticsLabels,
})
if reportErr != nil {
d.log.Errorf("while reporting sink analytics: %w", err)
}
if err := d.reportAudit(ctx, pluginName, fmt.Sprintf("%v", event.RawObject), dispatch.sourceName); err != nil {
d.log.Errorf("while reporting sink audit event: %s", err.Error())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just checked and this is duplicated code for bots. Can we unify it? At least the reporting part can be extracted to a separate method with all the error wrapping

image

For example something like this:

func (d *Dispatcher) dispatchMsg(ctx context.Context, event source.Event, dispatch PluginDispatch) {
	var (
		pluginName = dispatch.pluginName
		sources    = []string{dispatch.sourceName}
	)

	for _, n := range d.getBotNotifiers(dispatch) {
		go func(n notifier.Bot) {
			defer analytics.ReportPanicIfOccurs(d.log, d.reporter)
			msg := interactive.CoreMessage{
				Message: event.Message,
			}
			err := n.SendMessage(ctx, msg, sources)
			if err != nil {
				reportErr := d.reportError(err, n, pluginName, event)
				if reportErr != nil {
					err = multierror.Append(err, fmt.Errorf("while reporting error: %w", reportErr))
				}

				d.log.Errorf("while sending bot message: %s", err.Error())
				// TODO(Huseyin): Shouldn't we have return here? Currently we don't, which is weird...
			}

			reportErr := d.reportSuccess(ctx, n, pluginName, event, dispatch.sourceName)
			if reportErr != nil {
				d.log.Error(err)
			}
		}(n)
	}

	for _, n := range d.sinkNotifiers {
		go func(n notifier.Sink) {
			defer analytics.ReportPanicIfOccurs(d.log, d.reporter)
			err := n.SendMessage(ctx, event.RawObject, sources)
			if err != nil {
				reportErr := d.reportError(err, n, pluginName, event)
				if reportErr != nil {
					err = multierror.Append(err, fmt.Errorf("while reporting error: %w", reportErr))
				}

				d.log.Errorf("while sending sink message: %s", err.Error())
				// TODO(Huseyin): Shouldn't we have return here? Currently we don't, which is weird...
			}

			reportErr := d.reportSuccess(ctx, n, pluginName, event, dispatch.sourceName)
			if reportErr != nil {
				d.log.Error(err)
			}
		}(n)
	}

	// execute actions
	actions, err := d.actionProvider.RenderedActions(event.RawObject, sources)
	if err != nil {
		d.log.Errorf("while rendering automated actions: %s", err.Error())
		return
	}
	for _, act := range actions {
		d.log.Infof("Executing action %q (command: %q)...", act.DisplayName, act.Command)
		genericMsg := d.actionProvider.ExecuteAction(ctx, act)
		for _, n := range d.getBotNotifiers(dispatch) {
			go func(n notifier.Bot) {
				defer analytics.ReportPanicIfOccurs(d.log, d.reporter)
				err := n.SendMessage(ctx, genericMsg, sources)
				if err != nil {
					d.log.Errorf("while sending action result message: %s", err.Error())
				}
			}(n)
		}
	}
}

type genericNotifier interface {
	IntegrationName() config.CommPlatformIntegration
	Type() config.IntegrationType
}

func (d *Dispatcher) reportSuccess(ctx context.Context, n genericNotifier, pluginName string, event source.Event, sourceName string) error {
	errs := multierror.New()
	reportErr := d.reporter.ReportHandledEventSuccess(analytics.ReportEvent{
		IntegrationType:       n.Type(),
		Platform:              n.IntegrationName(),
		PluginName:            pluginName,
		AnonymizedEventFields: event.AnalyticsLabels,
	})
	if reportErr != nil {
		errs = multierror.Append(errs, fmt.Errorf("while reporting %s analytics: %w", n.Type(), reportErr))
	}
	if err := d.reportAudit(ctx, pluginName, fmt.Sprintf("%v", event.RawObject), sourceName); err != nil {
		errs = multierror.Append(errs, fmt.Errorf("while reporting %s audit event: %w", n.Type(), reportErr))
	}
	
	return errs.ErrorOrNil()
}

func (d *Dispatcher) reportError(err error, n genericNotifier, pluginName string, event source.Event) error {
	reportErr := d.reporter.ReportHandledEventError(analytics.ReportEvent{
		IntegrationType:       n.Type(),
		Platform:              n.IntegrationName(),
		PluginName:            pluginName,
		AnonymizedEventFields: event.AnalyticsLabels,
	}, err)
	if reportErr != nil {
		return fmt.Errorf("while reporting %s analytics: %w", n.Type(), reportErr)
	}
	
	return nil
}

Copy link
Member

@pkosiec pkosiec Mar 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: after discussion let's go with this snippet:

func (d *Dispatcher) dispatchMsg(ctx context.Context, event source.Event, dispatch PluginDispatch) {
	var (
		pluginName = dispatch.pluginName
		sources    = []string{dispatch.sourceName}
	)

	for _, n := range d.getBotNotifiers(dispatch) {
		go func(n notifier.Bot) {
			defer analytics.ReportPanicIfOccurs(d.log, d.reporter)
			msg := interactive.CoreMessage{
				Message: event.Message,
			}
			err := n.SendMessage(ctx, msg, sources)
			if err != nil {
				reportErr := d.reportError(ctx, err, n, pluginName, event, dispatch.sourceName
				if reportErr != nil {
					err = multierror.Append(err, fmt.Errorf("while reporting error: %w", reportErr))
				}

				d.log.Errorf("while sending bot message: %s", err.Error())
				return
			}

			reportErr := d.reportSuccess(ctx, n, pluginName, event, dispatch.sourceName)
			if reportErr != nil {
				d.log.Error(err)
			}
		}(n)
	}

	for _, n := range d.sinkNotifiers {
		go func(n notifier.Sink) {
			defer analytics.ReportPanicIfOccurs(d.log, d.reporter)
			err := n.SendMessage(ctx, event.RawObject, sources)
			if err != nil {
				reportErr := d.reportError(ctx, err, n, pluginName, event, dispatch.sourceName)
				if reportErr != nil {
					err = multierror.Append(err, fmt.Errorf("while reporting error: %w", reportErr))
				}

				d.log.Errorf("while sending sink message: %s", err.Error())
				return
			}

			reportErr := d.reportSuccess(ctx, n, pluginName, event, dispatch.sourceName)
			if reportErr != nil {
				d.log.Error(err)
			}
		}(n)
	}

	// execute actions
	actions, err := d.actionProvider.RenderedActions(event.RawObject, sources)
	if err != nil {
		d.log.Errorf("while rendering automated actions: %s", err.Error())
		return
	}
	for _, act := range actions {
		d.log.Infof("Executing action %q (command: %q)...", act.DisplayName, act.Command)
		genericMsg := d.actionProvider.ExecuteAction(ctx, act)
		for _, n := range d.getBotNotifiers(dispatch) {
			go func(n notifier.Bot) {
				defer analytics.ReportPanicIfOccurs(d.log, d.reporter)
				err := n.SendMessage(ctx, genericMsg, sources)
				if err != nil {
					d.log.Errorf("while sending action result message: %s", err.Error())
				}
			}(n)
		}
	}
}

type genericNotifier interface {
	IntegrationName() config.CommPlatformIntegration
	Type() config.IntegrationType
}

func (d *Dispatcher) reportSuccess(ctx context.Context, n genericNotifier, pluginName string, event source.Event, sourceName string) error {
	errs := multierror.New()
	reportErr := d.reporter.ReportHandledEventSuccess(analytics.ReportEvent{
		IntegrationType:       n.Type(),
		Platform:              n.IntegrationName(),
		PluginName:            pluginName,
		AnonymizedEventFields: event.AnalyticsLabels,
	})
	if reportErr != nil {
		errs = multierror.Append(errs, fmt.Errorf("while reporting %s analytics: %w", n.Type(), reportErr))
	}
	if err := d.reportAudit(ctx, pluginName, fmt.Sprintf("%v", event.RawObject), sourceName); err != nil {
		errs = multierror.Append(errs, fmt.Errorf("while reporting %s audit event: %w", n.Type(), reportErr))
	}

	return errs.ErrorOrNil()
}

func (d *Dispatcher) reportError(ctx context.Context, err error, n genericNotifier, pluginName string, event source.Event, sourceName string) error {
	errs := multierror.New()
	reportErr := d.reporter.ReportHandledEventError(analytics.ReportEvent{
		IntegrationType:       n.Type(),
		Platform:              n.IntegrationName(),
		PluginName:            pluginName,
		AnonymizedEventFields: event.AnalyticsLabels,
	}, err)
	if reportErr != nil {
		errs = multierror.Append(errs, fmt.Errorf("while reporting %s analytics: %w", n.Type(), reportErr))
	}
	// TODO: add additional metadata about event failed to send
	if err := d.reportAudit(ctx, pluginName, fmt.Sprintf("%v", event.RawObject), sourceName); err != nil {
		errs = multierror.Append(errs, fmt.Errorf("while reporting %s audit event: %w", n.Type(), reportErr))
	}

	return nil
}

@huseyinbabal
Copy link
Contributor Author

@pkosiec addressed all suggestions and created a ticket for audit event metadata https://github.com/kubeshop/botkube-cloud/issues/203

@huseyinbabal huseyinbabal merged commit 0ed9a27 into kubeshop:main Mar 13, 2023
@huseyinbabal huseyinbabal deleted the webhook-sink-refactor branch March 13, 2023 11:08
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Refactor Webhook sink
3 participants