From 95918ec778dfb4866366bbc96eed389e870df8a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Kosiec?= Date: Fri, 5 Aug 2022 11:08:12 +0200 Subject: [PATCH] Handle multiple configurations for multiple bots and sinks (#670) - Spawn multiple bots and sinks for each communication group - Support multiple notification channels for all bots - Support multiple indices for Elasticsearch sink - Support toggling notifications on / off by channel for all bots - Do not fall back to sending notifications to default channel (as there are no "default channel") when annotation config is wrong; instead, report an error - Pass executor bindings to Executor - Refactor trimming message prefix for all bots - Always return when running notifier commands (even if in not authorized channel) - Add E2E tests - Fix outdated E2E instructions - Fix security vulnerability with yaml library --- cmd/botkube/main.go | 118 ++++--- go.mod | 2 +- go.sum | 4 +- helm/botkube/e2e-test-values.yaml | 11 +- helm/botkube/templates/tests/e2e-test.yaml | 4 +- pkg/bot/bot.go | 18 +- pkg/bot/discord.go | 203 +++++++---- pkg/bot/discord_fmt.go | 4 +- pkg/bot/discord_test.go | 57 ++++ pkg/bot/mattermost.go | 323 ++++++++++-------- pkg/bot/mattermost_fmt.go | 27 ++ pkg/bot/mattermost_test.go | 64 ++++ pkg/bot/slack.go | 250 ++++++++------ pkg/bot/slack_fmt.go | 9 +- pkg/bot/slack_test.go | 52 +++ pkg/bot/teams.go | 295 ++++++++++------ pkg/bot/teams_test.go | 49 +++ pkg/config/config.go | 74 ++-- pkg/config/config_test.go | 6 +- .../TestLoadConfigSuccess/config-all.yaml | 13 +- .../TestLoadConfigSuccess/config.golden.yaml | 13 +- pkg/controller/notifier.go | 1 + pkg/execute/executor.go | 71 ++-- pkg/execute/factory.go | 12 +- pkg/execute/notifier.go | 48 ++- pkg/execute/notifier_test.go | 76 +++-- pkg/sink/elasticsearch.go | 53 ++- test/README.md | 2 +- test/e2e/k8s_helpers_test.go | 16 +- test/e2e/slack_test.go | 69 +++- test/e2e/slack_tester_test.go | 17 + 31 files changed, 1312 insertions(+), 649 deletions(-) create mode 100644 pkg/bot/discord_test.go create mode 100644 pkg/bot/mattermost_test.go create mode 100644 pkg/bot/slack_test.go create mode 100644 pkg/bot/teams_test.go diff --git a/cmd/botkube/main.go b/cmd/botkube/main.go index 3c99a2438..868780e13 100644 --- a/cmd/botkube/main.go +++ b/cmd/botkube/main.go @@ -41,6 +41,7 @@ const ( componentLogFieldKey = "component" botLogFieldKey = "bot" sinkLogFieldKey = "sink" + commGroupFieldKey = "commGroup" printAPIKeyCharCount = 3 ) @@ -142,71 +143,82 @@ func run() error { reporter, ) - commCfg := conf.Communications.GetFirst() + commCfg := conf.Communications var notifiers []controller.Notifier - // Run bots - if commCfg.Slack.Enabled { - sb, err := bot.NewSlack(logger.WithField(botLogFieldKey, "Slack"), conf, executorFactory, reporter) - if err != nil { - return reportFatalError("while creating Slack bot", err) + // TODO: Current limitation: Communication platform config should be separate inside every group: + // For example, if in both communication groups there's a Slack configuration pointing to the same workspace, + // when user executes `kubectl` command, one Bot instance will execute the command and return response, + // and the second "Sorry, this channel is not authorized to execute kubectl command" error. + for commGroupName, commGroupCfg := range commCfg { + commGroupLogger := logger.WithField(commGroupFieldKey, commGroupName) + + // Run bots + if commGroupCfg.Slack.Enabled { + sb, err := bot.NewSlack(commGroupLogger.WithField(botLogFieldKey, "Slack"), commGroupCfg.Slack, executorFactory, reporter) + if err != nil { + return reportFatalError("while creating Slack bot", err) + } + notifiers = append(notifiers, sb) + errGroup.Go(func() error { + defer analytics.ReportPanicIfOccurs(commGroupLogger, reporter) + return sb.Start(ctx) + }) } - notifiers = append(notifiers, sb) - errGroup.Go(func() error { - defer analytics.ReportPanicIfOccurs(logger, reporter) - return sb.Start(ctx) - }) - } - if commCfg.Mattermost.Enabled { - mb, err := bot.NewMattermost(logger.WithField(botLogFieldKey, "Mattermost"), conf, executorFactory, reporter) - if err != nil { - return reportFatalError("while creating Mattermost bot", err) + if commGroupCfg.Mattermost.Enabled { + mb, err := bot.NewMattermost(commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupCfg.Mattermost, executorFactory, reporter) + if err != nil { + return reportFatalError("while creating Mattermost bot", err) + } + notifiers = append(notifiers, mb) + errGroup.Go(func() error { + defer analytics.ReportPanicIfOccurs(commGroupLogger, reporter) + return mb.Start(ctx) + }) } - notifiers = append(notifiers, mb) - errGroup.Go(func() error { - defer analytics.ReportPanicIfOccurs(logger, reporter) - return mb.Start(ctx) - }) - } - if commCfg.Teams.Enabled { - tb := bot.NewTeams(logger.WithField(botLogFieldKey, "MS Teams"), conf, executorFactory, reporter) - notifiers = append(notifiers, tb) - errGroup.Go(func() error { - defer analytics.ReportPanicIfOccurs(logger, reporter) - return tb.Start(ctx) - }) - } - - if commCfg.Discord.Enabled { - db, err := bot.NewDiscord(logger.WithField(botLogFieldKey, "Discord"), conf, executorFactory, reporter) - if err != nil { - return reportFatalError("while creating Discord bot", err) + if commGroupCfg.Teams.Enabled { + tb, err := bot.NewTeams(commGroupLogger.WithField(botLogFieldKey, "MS Teams"), commGroupCfg.Teams, conf.Settings.ClusterName, executorFactory, reporter) + if err != nil { + return reportFatalError("while creating Teams bot", err) + } + notifiers = append(notifiers, tb) + errGroup.Go(func() error { + defer analytics.ReportPanicIfOccurs(commGroupLogger, reporter) + return tb.Start(ctx) + }) } - notifiers = append(notifiers, db) - errGroup.Go(func() error { - defer analytics.ReportPanicIfOccurs(logger, reporter) - return db.Start(ctx) - }) - } - // Run sinks - if commCfg.Elasticsearch.Enabled { - es, err := sink.NewElasticsearch(logger.WithField(sinkLogFieldKey, "Elasticsearch"), commCfg.Elasticsearch, reporter) - if err != nil { - return reportFatalError("while creating Elasticsearch sink", err) + if commGroupCfg.Discord.Enabled { + db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupCfg.Discord, executorFactory, reporter) + if err != nil { + return reportFatalError("while creating Discord bot", err) + } + notifiers = append(notifiers, db) + errGroup.Go(func() error { + defer analytics.ReportPanicIfOccurs(commGroupLogger, reporter) + return db.Start(ctx) + }) } - notifiers = append(notifiers, es) - } - if commCfg.Webhook.Enabled { - wh, err := sink.NewWebhook(logger.WithField(sinkLogFieldKey, "Webhook"), commCfg.Webhook, reporter) - if err != nil { - return reportFatalError("while creating Webhook sink", err) + // Run sinks + if commGroupCfg.Elasticsearch.Enabled { + es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupCfg.Elasticsearch, reporter) + if err != nil { + return reportFatalError("while creating Elasticsearch sink", err) + } + notifiers = append(notifiers, es) } - notifiers = append(notifiers, wh) + if commGroupCfg.Webhook.Enabled { + wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupCfg.Webhook, reporter) + if err != nil { + return reportFatalError("while creating Webhook sink", err) + } + + notifiers = append(notifiers, wh) + } } // Start upgrade checker diff --git a/go.mod b/go.mod index a0bd14758..1ef3b8aba 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/vrischmann/envconfig v1.3.0 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c golang.org/x/text v0.3.7 - gopkg.in/yaml.v3 v3.0.0 + gopkg.in/yaml.v3 v3.0.1 gotest.tools/v3 v3.0.3 k8s.io/api v0.24.0 k8s.io/apimachinery v0.24.0 diff --git a/go.sum b/go.sum index dd7fa3cd0..6bea05684 100644 --- a/go.sum +++ b/go.sum @@ -1790,8 +1790,8 @@ gopkg.in/yaml.v3 v3.0.0-20191120175047-4206685974f2/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= -gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3 h1:4AuOwCGf4lLR9u3YOe2awrHygurzhO/HeQ6laiA6Sx0= diff --git a/helm/botkube/e2e-test-values.yaml b/helm/botkube/e2e-test-values.yaml index c6ff20ea6..f2c96c047 100644 --- a/helm/botkube/e2e-test-values.yaml +++ b/helm/botkube/e2e-test-values.yaml @@ -5,13 +5,22 @@ communications: token: "" # Provide a valid token for BotKube app channels: 'default': - name: "" # Test will override that + name: "" # Tests will override this temporarily bindings: executors: - kubectl-read-only - kubectl-wait-cmd - kubectl-exec-cmd - kubectl-allow-all + sources: + - k8s-events + 'secondary': + name: "" # Tests will override this temporarily + bindings: + executors: + - kubectl-read-only + sources: + - k8s-events sources: 'k8s-events': diff --git a/helm/botkube/templates/tests/e2e-test.yaml b/helm/botkube/templates/tests/e2e-test.yaml index 7c9bc5fc9..bed1f00d4 100644 --- a/helm/botkube/templates/tests/e2e-test.yaml +++ b/helm/botkube/templates/tests/e2e-test.yaml @@ -32,8 +32,10 @@ spec: value: "{{ .Values.e2eTest.deployment.waitTimeout }}" - name: DEPLOYMENT_ENVS_SLACK_ENABLED_NAME value: "BOTKUBE_COMMUNICATIONS_DEFAULT-GROUP_SLACK_ENABLED" - - name: DEPLOYMENT_ENVS_SLACK_CHANNEL_ID_NAME + - name: DEPLOYMENT_ENVS_DEFAULT_SLACK_CHANNEL_ID_NAME value: "BOTKUBE_COMMUNICATIONS_DEFAULT-GROUP_SLACK_CHANNELS_DEFAULT_NAME" + - name: DEPLOYMENT_ENVS_SECONDARY_SLACK_CHANNEL_ID_NAME + value: "BOTKUBE_COMMUNICATIONS_DEFAULT-GROUP_SLACK_CHANNELS_SECONDARY_NAME" - name: CLUSTER_NAME value: "{{ .Values.settings.clusterName }}" - name: SLACK_BOT_NAME diff --git a/pkg/bot/bot.go b/pkg/bot/bot.go index 64e31705d..4e1417cf8 100644 --- a/pkg/bot/bot.go +++ b/pkg/bot/bot.go @@ -8,6 +8,10 @@ import ( "github.com/kubeshop/botkube/pkg/execute" ) +const ( + defaultNotifyValue = true +) + // Bot connects to communication channels and reads/sends messages. It is a two-way integration. type Bot interface { Start(ctx context.Context) error @@ -17,7 +21,7 @@ type Bot interface { // ExecutorFactory facilitates creation of execute.Executor instances. type ExecutorFactory interface { - NewDefault(platform config.CommPlatformIntegration, notifierHandler execute.NotifierHandler, isAuthChannel bool, message string) execute.Executor + NewDefault(platform config.CommPlatformIntegration, notifierHandler execute.NotifierHandler, isAuthChannel bool, conversationID string, bindings []string, message string) execute.Executor } // AnalyticsReporter defines a reporter that collects analytics data. @@ -36,3 +40,15 @@ type FatalErrorAnalyticsReporter interface { // Close cleans up the reporter resources. Close() error } + +type channelConfigByID struct { + config.ChannelBindingsByID + + notify bool +} + +type channelConfigByName struct { + config.ChannelBindingsByName + + notify bool +} diff --git a/pkg/bot/discord.go b/pkg/bot/discord.go index 41d189f9d..7386c95a0 100644 --- a/pkg/bot/discord.go +++ b/pkg/bot/discord.go @@ -3,6 +3,7 @@ package bot import ( "context" "fmt" + "regexp" "strings" "sync" @@ -11,7 +12,9 @@ import ( "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/events" + "github.com/kubeshop/botkube/pkg/execute" "github.com/kubeshop/botkube/pkg/format" + "github.com/kubeshop/botkube/pkg/multierror" ) // TODO: Refactor this file as a part of https://github.com/kubeshop/botkube/issues/667 @@ -22,7 +25,13 @@ import ( var _ Bot = &Discord{} // customTimeFormat holds custom time format string. -const customTimeFormat = "2006-01-02T15:04:05Z" +const ( + customTimeFormat = "2006-01-02T15:04:05Z" + + // discordBotMentionRegexFmt supports also nicknames (the exclamation mark). + // Read more: https://discordjs.guide/miscellaneous/parsing-mention-arguments.html#how-discord-mentions-work + discordBotMentionRegexFmt = "^<@!?%s>" +) var embedColor = map[config.Level]int{ config.Info: 8311585, // green @@ -37,15 +46,13 @@ type Discord struct { log logrus.FieldLogger executorFactory ExecutorFactory reporter AnalyticsReporter - notifyMutex sync.RWMutex - notify bool api *discordgo.Session - - Notification config.Notification - Token string - ChannelID string - BotID string - ExecutorBindings []string + notification config.Notification + botID string + channelsMutex sync.RWMutex + channels map[string]channelConfigByID + notifyMutex sync.Mutex + botMentionRegex *regexp.Regexp } // discordMessage contains message details to execute command and send back the result. @@ -54,7 +61,6 @@ type discordMessage struct { executorFactory ExecutorFactory Event *discordgo.MessageCreate - BotID string Request string Response string IsAuthChannel bool @@ -62,26 +68,28 @@ type discordMessage struct { } // NewDiscord creates a new Discord instance. -func NewDiscord(log logrus.FieldLogger, c *config.Config, executorFactory ExecutorFactory, reporter AnalyticsReporter) (*Discord, error) { - discord := c.Communications.GetFirst().Discord +func NewDiscord(log logrus.FieldLogger, cfg config.Discord, executorFactory ExecutorFactory, reporter AnalyticsReporter) (*Discord, error) { + botMentionRegex, err := discordBotMentionRegex(cfg.BotID) + if err != nil { + return nil, err + } - api, err := discordgo.New("Bot " + discord.Token) + api, err := discordgo.New("Bot " + cfg.Token) if err != nil { return nil, fmt.Errorf("while creating Discord session: %w", err) } + channelsCfg := discordChannelsConfigFrom(cfg.Channels) + return &Discord{ log: log, reporter: reporter, executorFactory: executorFactory, - notify: true, // enabled by default api: api, - - Token: discord.Token, - BotID: discord.BotID, - ChannelID: discord.Channels.GetFirst().ID, - ExecutorBindings: discord.Channels.GetFirst().Bindings.Executors, - Notification: discord.Notification, + botID: cfg.BotID, + notification: cfg.Notification, + channels: channelsCfg, + botMentionRegex: botMentionRegex, }, nil } @@ -95,7 +103,6 @@ func (b *Discord) Start(ctx context.Context) error { log: b.log, executorFactory: b.executorFactory, Event: m, - BotID: b.BotID, Session: s, } @@ -128,33 +135,39 @@ func (b *Discord) Start(ctx context.Context) error { // SendEvent sends event notification to Discord ChannelID. // Context is not supported by client: See https://github.com/bwmarrin/discordgo/issues/752. func (b *Discord) SendEvent(_ context.Context, event events.Event) (err error) { - if !b.notify { - b.log.Info("Notifications are disabled. Skipping event...") - return nil - } - b.log.Debugf(">> Sending to Discord: %+v", event) - messageSend := b.formatMessage(event, b.Notification) + msgToSend := b.formatMessage(event) + + errs := multierror.New() + for _, channelID := range b.getChannelsToNotify() { + msg := msgToSend // copy as the struct is modified when using Discord API client + if _, err := b.api.ChannelMessageSendComplex(channelID, &msg); err != nil { + errs = multierror.Append(errs, fmt.Errorf("while sending Discord message to channel %q: %w", channelID, err)) + continue + } - if _, err := b.api.ChannelMessageSendComplex(b.ChannelID, &messageSend); err != nil { - return fmt.Errorf("while sending Discord message to channel %q: %w", b.ChannelID, err) + b.log.Debugf("Event successfully sent to channel %q", channelID) } - b.log.Debugf("Event successfully sent to channel %s", b.ChannelID) - return nil + return errs.ErrorOrNil() } // SendMessage sends message to Discord channel. // Context is not supported by client: See https://github.com/bwmarrin/discordgo/issues/752. func (b *Discord) SendMessage(_ context.Context, msg string) error { - b.log.Debugf(">> Sending to Discord: %+v", msg) - - if _, err := b.api.ChannelMessageSend(b.ChannelID, msg); err != nil { - return fmt.Errorf("while sending Discord message to channel %q: %w", b.ChannelID, err) + errs := multierror.New() + for _, channel := range b.getChannels() { + channelID := channel.ID + b.log.Debugf(">> Sending message to channel %q: %+v", channelID, msg) + if _, err := b.api.ChannelMessageSend(channelID, msg); err != nil { + errs = multierror.Append(errs, fmt.Errorf("while sending Discord message to channel %q: %w", channelID, err)) + continue + } + b.log.Debugf("Message successfully sent to channel %q", channelID) } - b.log.Debugf("Event successfully sent to Discord %v", msg) - return nil + + return errs.ErrorOrNil() } // IntegrationName describes the integration name. @@ -167,51 +180,70 @@ func (b *Discord) Type() config.IntegrationType { return config.BotIntegrationType } -// NotificationsEnabled returns current notification status. -func (b *Discord) NotificationsEnabled() bool { - b.notifyMutex.RLock() - defer b.notifyMutex.RUnlock() - return b.notify +// TODO: Support custom routing via annotations for Discord as well +func (b *Discord) getChannelsToNotify() []string { + // TODO(https://github.com/kubeshop/botkube/issues/596): Support source bindings - filter events here or at source level and pass it every time via event property? + var channelsToNotify []string + for _, channelCfg := range b.getChannels() { + if !channelCfg.notify { + b.log.Info("Skipping notification for channel %q as notifications are disabled.", channelCfg.Identifier()) + continue + } + + channelsToNotify = append(channelsToNotify, channelCfg.Identifier()) + } + return channelsToNotify } -// SetNotificationsEnabled sets a new notification status. -func (b *Discord) SetNotificationsEnabled(enabled bool) error { +// NotificationsEnabled returns current notification status for a given channel ID. +func (b *Discord) NotificationsEnabled(channelID string) bool { + channel, exists := b.getChannels()[channelID] + if !exists { + return false + } + + return channel.notify +} + +// SetNotificationsEnabled sets a new notification status for a given channel ID. +func (b *Discord) SetNotificationsEnabled(channelID string, enabled bool) error { + // avoid race conditions with using the setter concurrently, as we set whole map b.notifyMutex.Lock() defer b.notifyMutex.Unlock() - b.notify = enabled + + channels := b.getChannels() + channel, exists := channels[channelID] + if !exists { + return execute.ErrNotificationsNotConfigured + } + + channel.notify = enabled + channels[channelID] = channel + b.setChannels(channels) + return nil } // HandleMessage handles the incoming messages. func (dm *discordMessage) HandleMessage(b *Discord) { - // Serve only if starts with mention - if !strings.HasPrefix(dm.Event.Content, "<@!"+dm.BotID+"> ") && !strings.HasPrefix(dm.Event.Content, "<@"+dm.BotID+"> ") { + // Handle message only if starts with mention + trimmedMsg, found := b.findAndTrimBotMention(dm.Event.Content) + if !found { + b.log.Debugf("Ignoring message as it doesn't contain %q mention", b.botID) return } + dm.Request = trimmedMsg - // Serve only if current channel is in config - if b.ChannelID == dm.Event.ChannelID { - dm.IsAuthChannel = true - } - - // Trim the @BotKube prefix - if strings.HasPrefix(dm.Event.Content, "<@!"+dm.BotID+"> ") { - dm.Request = strings.TrimPrefix(dm.Event.Content, "<@!"+dm.BotID+"> ") - } else if strings.HasPrefix(dm.Event.Content, "<@"+dm.BotID+"> ") { - dm.Request = strings.TrimPrefix(dm.Event.Content, "<@"+dm.BotID+"> ") - } - - if len(dm.Request) == 0 { - return - } + channel, exists := b.getChannels()[dm.Event.ChannelID] + dm.IsAuthChannel = exists - e := dm.executorFactory.NewDefault(b.IntegrationName(), b, dm.IsAuthChannel, dm.Request) + e := dm.executorFactory.NewDefault(b.IntegrationName(), b, dm.IsAuthChannel, channel.Identifier(), channel.Bindings.Executors, dm.Request) - dm.Response = e.Execute(b.ExecutorBindings) + dm.Response = e.Execute() dm.Send() } -func (dm discordMessage) Send() { +func (dm *discordMessage) Send() { dm.log.Debugf("Discord incoming Request: %s", dm.Request) dm.log.Debugf("Discord Response: %s", dm.Response) @@ -241,3 +273,44 @@ func (dm discordMessage) Send() { dm.log.Error("Error in sending message:", err) } } + +func (b *Discord) getChannels() map[string]channelConfigByID { + b.channelsMutex.RLock() + defer b.channelsMutex.RUnlock() + return b.channels +} + +func (b *Discord) setChannels(channels map[string]channelConfigByID) { + b.channelsMutex.Lock() + defer b.channelsMutex.Unlock() + b.channels = channels +} + +func (b *Discord) findAndTrimBotMention(msg string) (string, bool) { + if !b.botMentionRegex.MatchString(msg) { + return "", false + } + + return b.botMentionRegex.ReplaceAllString(msg, ""), true +} + +func discordChannelsConfigFrom(channelsCfg config.IdentifiableMap[config.ChannelBindingsByID]) map[string]channelConfigByID { + res := make(map[string]channelConfigByID) + for _, channCfg := range channelsCfg { + res[channCfg.Identifier()] = channelConfigByID{ + ChannelBindingsByID: channCfg, + notify: defaultNotifyValue, + } + } + + return res +} + +func discordBotMentionRegex(botID string) (*regexp.Regexp, error) { + botMentionRegex, err := regexp.Compile(fmt.Sprintf(discordBotMentionRegexFmt, botID)) + if err != nil { + return nil, fmt.Errorf("while compiling bot mention regex: %w", err) + } + + return botMentionRegex, nil +} diff --git a/pkg/bot/discord_fmt.go b/pkg/bot/discord_fmt.go index 467e5f12a..2fb7676e2 100644 --- a/pkg/bot/discord_fmt.go +++ b/pkg/bot/discord_fmt.go @@ -10,10 +10,10 @@ import ( formatx "github.com/kubeshop/botkube/pkg/format" ) -func (b *Discord) formatMessage(event events.Event, notification config.Notification) discordgo.MessageSend { +func (b *Discord) formatMessage(event events.Event) discordgo.MessageSend { var messageEmbed discordgo.MessageEmbed - switch notification.Type { + switch b.notification.Type { case config.LongNotification: // generate Long notification message messageEmbed = b.longNotification(event) diff --git a/pkg/bot/discord_test.go b/pkg/bot/discord_test.go new file mode 100644 index 000000000..b483d7b52 --- /dev/null +++ b/pkg/bot/discord_test.go @@ -0,0 +1,57 @@ +package bot + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDiscord_FindAndTrimBotMention(t *testing.T) { + /// given + botID := "976786722706821120" + testCases := []struct { + Name string + Input string + ExpectedTrimmedMsg string + ExpectedFound bool + }{ + { + Name: "Mention", + Input: "<@976786722706821120> get pods", + ExpectedFound: true, + ExpectedTrimmedMsg: " get pods", + }, + { + Name: "Nickname", + Input: "<@!976786722706821120> get pods", + ExpectedFound: true, + ExpectedTrimmedMsg: " get pods", + }, + { + Name: "Not at the beginning", + Input: "Not at the beginning <@!976786722706821120> get pods", + ExpectedFound: false, + }, + { + Name: "Different mention", + Input: "<@97678> get pods", + ExpectedFound: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + botMentionRegex, err := discordBotMentionRegex(botID) + require.NoError(t, err) + b := &Discord{botMentionRegex: botMentionRegex} + + // when + actualTrimmedMsg, actualFound := b.findAndTrimBotMention(tc.Input) + + // then + assert.Equal(t, tc.ExpectedFound, actualFound) + assert.Equal(t, tc.ExpectedTrimmedMsg, actualTrimmedMsg) + }) + } +} diff --git a/pkg/bot/mattermost.go b/pkg/bot/mattermost.go index f278609d0..67f8deb2e 100644 --- a/pkg/bot/mattermost.go +++ b/pkg/bot/mattermost.go @@ -2,11 +2,9 @@ package bot import ( "context" - "encoding/json" "fmt" "net/url" "regexp" - "strconv" "strings" "sync" @@ -15,6 +13,7 @@ import ( "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/events" + "github.com/kubeshop/botkube/pkg/execute" formatx "github.com/kubeshop/botkube/pkg/format" "github.com/kubeshop/botkube/pkg/multierror" ) @@ -26,21 +25,14 @@ import ( var _ Bot = &Mattermost{} -// mmChannelType to find Mattermost channel type -type mmChannelType string - -const ( - mmChannelPrivate mmChannelType = "P" - mmChannelPublic mmChannelType = "O" -) - const ( // WebSocketProtocol stores protocol initials for web socket WebSocketProtocol = "ws://" // WebSocketSecureProtocol stores protocol initials for web socket WebSocketSecureProtocol = "wss://" - httpsScheme = "https" + httpsScheme = "https" + mattermostBotMentionRegexFmt = "^@(?i)%s" ) // TODO: @@ -52,19 +44,17 @@ type Mattermost struct { log logrus.FieldLogger executorFactory ExecutorFactory reporter AnalyticsReporter - notifyMutex sync.RWMutex - notify bool - - Notification config.Notification - Token string - BotName string - TeamName string - ChannelID string - ServerURL string - WebSocketURL string - WSClient *model.WebSocketClient - APIClient *model.Client4 - ExecutorBindings []string + notification config.Notification + serverURL string + botName string + teamName string + webSocketURL string + wsClient *model.WebSocketClient + apiClient *model.Client4 + channelsMutex sync.RWMutex + channels map[string]channelConfigByID + notifyMutex sync.Mutex + botMentionRegex *regexp.Regexp } // mattermostMessage contains message details to execute command and send back the result @@ -80,12 +70,15 @@ type mattermostMessage struct { } // NewMattermost creates a new Mattermost instance. -func NewMattermost(log logrus.FieldLogger, c *config.Config, executorFactory ExecutorFactory, reporter AnalyticsReporter) (*Mattermost, error) { - mattermost := c.Communications.GetFirst().Mattermost +func NewMattermost(log logrus.FieldLogger, cfg config.Mattermost, executorFactory ExecutorFactory, reporter AnalyticsReporter) (*Mattermost, error) { + botMentionRegex, err := mattermostBotMentionRegex(cfg.BotName) + if err != nil { + return nil, err + } - checkURL, err := url.Parse(mattermost.URL) + checkURL, err := url.Parse(cfg.URL) if err != nil { - return nil, fmt.Errorf("while parsing Mattermost URL %q: %w", mattermost.URL, err) + return nil, fmt.Errorf("while parsing Mattermost URL %q: %w", cfg.URL, err) } // Create WebSocketClient and handle messages @@ -94,32 +87,31 @@ func NewMattermost(log logrus.FieldLogger, c *config.Config, executorFactory Exe webSocketURL = WebSocketSecureProtocol + checkURL.Host } - client := model.NewAPIv4Client(mattermost.URL) - client.SetOAuthToken(mattermost.Token) + client := model.NewAPIv4Client(cfg.URL) + client.SetOAuthToken(cfg.Token) - botTeam, resp := client.GetTeamByName(mattermost.Team, "") + botTeam, resp := client.GetTeamByName(cfg.Team, "") if resp.Error != nil { - return nil, resp.Error + return nil, fmt.Errorf("while getting team by name: %w", resp.Error) } - channel, resp := client.GetChannelByName(mattermost.Channels.GetFirst().Name, botTeam.Id, "") - if resp.Error != nil { - return nil, resp.Error + + channelsByIDCfg, err := mattermostChannelsCfgFrom(client, botTeam.Id, cfg.Channels) + if err != nil { + return nil, fmt.Errorf("while producing channels configuration map by ID: %w", err) } return &Mattermost{ - log: log, - executorFactory: executorFactory, - reporter: reporter, - notify: true, // enabled by default - Notification: mattermost.Notification, - ServerURL: mattermost.URL, - BotName: mattermost.BotName, - Token: mattermost.Token, - TeamName: mattermost.Team, - ChannelID: channel.Id, - ExecutorBindings: mattermost.Channels.GetFirst().Bindings.Executors, - APIClient: client, - WebSocketURL: webSocketURL, + log: log, + executorFactory: executorFactory, + reporter: reporter, + notification: cfg.Notification, + serverURL: cfg.URL, + botName: cfg.BotName, + teamName: cfg.Team, + apiClient: client, + webSocketURL: webSocketURL, + channels: channelsByIDCfg, + botMentionRegex: botMentionRegex, }, nil } @@ -130,7 +122,7 @@ func (b *Mattermost) Start(ctx context.Context) error { // Check connection to Mattermost server err := b.checkServerConnection() if err != nil { - return fmt.Errorf("while pinging Mattermost server %q: %w", b.ServerURL, err) + return fmt.Errorf("while pinging Mattermost server %q: %w", b.serverURL, err) } err = b.reporter.ReportBotEnabled(b.IntegrationName()) @@ -149,7 +141,7 @@ func (b *Mattermost) Start(ctx context.Context) error { return nil default: var appErr *model.AppError - b.WSClient, appErr = model.NewWebSocketClient4(b.WebSocketURL, b.APIClient.AuthToken) + b.wsClient, appErr = model.NewWebSocketClient4(b.webSocketURL, b.apiClient.AuthToken) if appErr != nil { return fmt.Errorf("while creating WebSocket connection: %w", appErr) } @@ -168,45 +160,53 @@ func (b *Mattermost) Type() config.IntegrationType { return config.BotIntegrationType } -// NotificationsEnabled returns current notification status. -func (b *Mattermost) NotificationsEnabled() bool { - b.notifyMutex.RLock() - defer b.notifyMutex.RUnlock() - return b.notify +// NotificationsEnabled returns current notification status for a given channel ID. +func (b *Mattermost) NotificationsEnabled(channelID string) bool { + channel, exists := b.getChannels()[channelID] + if !exists { + return false + } + + return channel.notify } -// SetNotificationsEnabled sets a new notification status. -func (b *Mattermost) SetNotificationsEnabled(enabled bool) error { +// SetNotificationsEnabled sets a new notification status for a given channel ID. +func (b *Mattermost) SetNotificationsEnabled(channelID string, enabled bool) error { + // avoid race conditions with using the setter concurrently, as we set whole map b.notifyMutex.Lock() defer b.notifyMutex.Unlock() - b.notify = enabled + + channels := b.getChannels() + channel, exists := channels[channelID] + if !exists { + return execute.ErrNotificationsNotConfigured + } + + channel.notify = enabled + channels[channelID] = channel + b.setChannels(channels) + return nil } // Check incoming message and take action func (mm *mattermostMessage) handleMessage(b *Mattermost) { post := model.PostFromJson(strings.NewReader(mm.Event.Data["post"].(string))) - channelType := mmChannelType(mm.Event.Data["channel_type"].(string)) - if channelType == mmChannelPrivate || channelType == mmChannelPublic { - // Message posted in a channel - // Serve only if starts with mention - if !strings.HasPrefix(strings.ToLower(post.Message), fmt.Sprintf("@%s ", strings.ToLower(b.BotName))) { - return - } - } - // Check if message posted in authenticated channel - if mm.Event.Broadcast.ChannelId == b.ChannelID { - mm.IsAuthChannel = true + // Handle message only if starts with mention + trimmedMsg, found := b.findAndTrimBotMention(post.Message) + if !found { + b.log.Debugf("Ignoring message as it doesn't contain %q mention", b.botName) + return } - mm.log.Debugf("Received mattermost event: %+v", mm.Event.Data) + mm.Request = trimmedMsg - // remove @BotKube prefix if exists - r := regexp.MustCompile(`^(?i)@BotKube `) - mm.Request = r.ReplaceAllString(post.Message, ``) + channelID := mm.Event.Broadcast.ChannelId + channel, exists := b.getChannels()[channelID] + mm.IsAuthChannel = exists - e := mm.executorFactory.NewDefault(b.IntegrationName(), b, mm.IsAuthChannel, mm.Request) - mm.Response = e.Execute(b.ExecutorBindings) + e := mm.executorFactory.NewDefault(b.IntegrationName(), b, mm.IsAuthChannel, channelID, channel.Bindings.Executors, mm.Request) + mm.Response = e.Execute() mm.sendMessage() } @@ -240,12 +240,12 @@ func (mm mattermostMessage) sendMessage() { // Check if Mattermost server is reachable func (b *Mattermost) checkServerConnection() error { // Check api connection - if _, resp := b.APIClient.GetOldClientConfig(""); resp.Error != nil { + if _, resp := b.apiClient.GetOldClientConfig(""); resp.Error != nil { return resp.Error } // Get channel list - _, resp := b.APIClient.GetTeamByName(b.TeamName, "") + _, resp := b.apiClient.GetTeamByName(b.teamName, "") if resp.Error != nil { return resp.Error } @@ -254,34 +254,34 @@ func (b *Mattermost) checkServerConnection() error { // Check if team exists in Mattermost func (b *Mattermost) getTeam() *model.Team { - botTeam, resp := b.APIClient.GetTeamByName(b.TeamName, "") + botTeam, resp := b.apiClient.GetTeamByName(b.teamName, "") if resp.Error != nil { - b.log.Fatalf("There was a problem finding Mattermost team %s. %s", b.TeamName, resp.Error) + b.log.Fatalf("There was a problem finding Mattermost team %s. %s", b.teamName, resp.Error) } return botTeam } // Check if BotKube user exists in Mattermost func (b *Mattermost) getUser() *model.User { - users, resp := b.APIClient.AutocompleteUsersInTeam(b.getTeam().Id, b.BotName, 1, "") + users, resp := b.apiClient.AutocompleteUsersInTeam(b.getTeam().Id, b.botName, 1, "") if resp.Error != nil { - b.log.Fatalf("There was a problem finding Mattermost user %s. %s", b.BotName, resp.Error) + b.log.Fatalf("There was a problem finding Mattermost user %s. %s", b.botName, resp.Error) } return users.Users[0] } func (b *Mattermost) listen(ctx context.Context) { - b.WSClient.Listen() - defer b.WSClient.Close() + b.wsClient.Listen() + defer b.wsClient.Close() for { select { case <-ctx.Done(): b.log.Info("Shutdown requested. Finishing...") return - case event, ok := <-b.WSClient.EventChannel: + case event, ok := <-b.wsClient.EventChannel: if !ok { - if b.WSClient.ListenError != nil { - b.log.Debugf("while listening on websocket connection: %s", b.WSClient.ListenError.Error()) + if b.wsClient.ListenError != nil { + b.log.Debugf("while listening on websocket connection: %s", b.wsClient.ListenError.Error()) } b.log.Info("Incoming events channel closed. Finishing...") @@ -309,7 +309,7 @@ func (b *Mattermost) listen(ctx context.Context) { executorFactory: b.executorFactory, Event: event, IsAuthChannel: false, - APIClient: b.APIClient, + APIClient: b.apiClient, } mm.handleMessage(b) } @@ -317,92 +317,113 @@ func (b *Mattermost) listen(ctx context.Context) { } // SendEvent sends event notification to Mattermost -func (b *Mattermost) SendEvent(ctx context.Context, event events.Event) error { - if !b.notify { - b.log.Info("Notifications are disabled. Skipping event...") - return nil - } - +func (b *Mattermost) SendEvent(_ context.Context, event events.Event) error { b.log.Debugf(">> Sending to Mattermost: %+v", event) + attachment := b.formatAttachments(event) + + errs := multierror.New() + for _, channelID := range b.getChannelsToNotify(event) { + post := &model.Post{ + Props: map[string]interface{}{ + "attachments": attachment, + }, + ChannelId: channelID, + } - var fields []*model.SlackAttachmentField + _, resp := b.apiClient.CreatePost(post) + if resp.Error != nil { + errs = multierror.Append(errs, fmt.Errorf("while posting message to channel %q: %w", channelID, resp.Error)) + continue + } - switch b.Notification.Type { - case config.LongNotification: - fields = b.longNotification(event) - case config.ShortNotification: - fallthrough + b.log.Debugf("Event successfully sent to channel %q", post.ChannelId) + } - default: - // set missing cluster name to event object - fields = b.shortNotification(event) + return errs.ErrorOrNil() +} + +func (b *Mattermost) getChannelsToNotify(event events.Event) []string { + if event.Channel != "" { + return []string{event.Channel} } - attachment := []*model.SlackAttachment{ - { - Color: attachmentColor[event.Level], - Title: event.Title, - Fields: fields, - Footer: "BotKube", - Timestamp: json.Number(strconv.FormatInt(event.TimeStamp.Unix(), 10)), - }, + // TODO(https://github.com/kubeshop/botkube/issues/596): Support source bindings - filter events here or at source level and pass it every time via event property? + var channelsToNotify []string + for _, channelCfg := range b.getChannels() { + if !channelCfg.notify { + b.log.Info("Skipping notification for channel %q as notifications are disabled.", channelCfg.Identifier()) + continue + } + + channelsToNotify = append(channelsToNotify, channelCfg.Identifier()) } + return channelsToNotify +} + +// SendMessage sends message to Mattermost channel +func (b *Mattermost) SendMessage(_ context.Context, msg string) error { + errs := multierror.New() + for _, channel := range b.getChannels() { + channelID := channel.ID + b.log.Debugf(">> Sending message to channel %q: %+v", channelID, msg) + post := &model.Post{ + ChannelId: channelID, + Message: msg, + } + if _, resp := b.apiClient.CreatePost(post); resp.Error != nil { + errs = multierror.Append(errs, fmt.Errorf("while creating a post: %w", resp.Error)) + } - targetChannel := event.Channel - if targetChannel == "" { - // empty value in event.channel sends notifications to default channel. - targetChannel = b.ChannelID + b.log.Debugf("Message successfully sent to channel %q", channelID) } - isDefaultChannel := targetChannel == b.ChannelID + return errs.ErrorOrNil() +} - post := &model.Post{ - Props: map[string]interface{}{ - "attachments": attachment, - }, - ChannelId: targetChannel, +func (b *Mattermost) findAndTrimBotMention(msg string) (string, bool) { + if !b.botMentionRegex.MatchString(msg) { + return "", false } - _, resp := b.APIClient.CreatePost(post) - if resp.Error != nil { - createPostWrappedErr := fmt.Errorf("while posting message to channel %q: %w", targetChannel, resp.Error) + return b.botMentionRegex.ReplaceAllString(msg, ""), true +} - if isDefaultChannel { - return createPostWrappedErr - } +func (b *Mattermost) getChannels() map[string]channelConfigByID { + b.channelsMutex.RLock() + defer b.channelsMutex.RUnlock() + return b.channels +} - // fallback to default channel +func (b *Mattermost) setChannels(channels map[string]channelConfigByID) { + b.channelsMutex.Lock() + defer b.channelsMutex.Unlock() + b.channels = channels +} - // send error message to default channel - msg := fmt.Sprintf("Unable to send message to channel %q: `%s`\n```add Botkube app to the channel %q\nMissed events follows below:```", targetChannel, resp.Error, targetChannel) - sendMessageErr := b.SendMessage(ctx, msg) - if sendMessageErr != nil { - return multierror.Append(createPostWrappedErr, sendMessageErr) +func mattermostChannelsCfgFrom(client *model.Client4, teamID string, channelsCfg config.IdentifiableMap[config.ChannelBindingsByName]) (map[string]channelConfigByID, error) { + res := make(map[string]channelConfigByID) + for _, channCfg := range channelsCfg { + fetchedChannel, resp := client.GetChannelByName(channCfg.Identifier(), teamID, "") + if resp.Error != nil { + return nil, fmt.Errorf("while getting channel by name %q: %w", channCfg.Name, resp.Error) } - // sending missed event to default channel - // reset channel, so it will be defaulted - event.Channel = "" - sendEventErr := b.SendEvent(ctx, event) - if sendEventErr != nil { - return multierror.Append(createPostWrappedErr, sendEventErr) + res[fetchedChannel.Id] = channelConfigByID{ + ChannelBindingsByID: config.ChannelBindingsByID{ + ID: fetchedChannel.Id, + Bindings: channCfg.Bindings, + }, + notify: defaultNotifyValue, } - - return createPostWrappedErr } - b.log.Debugf("Event successfully sent to channel %q", post.ChannelId) - return nil + return res, nil } -// SendMessage sends message to Mattermost channel -func (b *Mattermost) SendMessage(_ context.Context, msg string) error { - post := &model.Post{ - ChannelId: b.ChannelID, - Message: msg, - } - if _, resp := b.APIClient.CreatePost(post); resp.Error != nil { - return fmt.Errorf("while creating a post: %w", resp.Error) +func mattermostBotMentionRegex(botName string) (*regexp.Regexp, error) { + botMentionRegex, err := regexp.Compile(fmt.Sprintf(mattermostBotMentionRegexFmt, botName)) + if err != nil { + return nil, fmt.Errorf("while compiling bot mention regex: %w", err) } - return nil + return botMentionRegex, nil } diff --git a/pkg/bot/mattermost_fmt.go b/pkg/bot/mattermost_fmt.go index 4b0a83ce2..7f02590af 100644 --- a/pkg/bot/mattermost_fmt.go +++ b/pkg/bot/mattermost_fmt.go @@ -1,12 +1,39 @@ package bot import ( + "encoding/json" + "strconv" + "github.com/mattermost/mattermost-server/v5/model" + "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/events" formatx "github.com/kubeshop/botkube/pkg/format" ) +func (b *Mattermost) formatAttachments(event events.Event) []*model.SlackAttachment { + var fields []*model.SlackAttachmentField + switch b.notification.Type { + case config.LongNotification: + fields = b.longNotification(event) + case config.ShortNotification: + fallthrough + default: + // set missing cluster name to the event object + fields = b.shortNotification(event) + } + + return []*model.SlackAttachment{ + { + Color: attachmentColor[event.Level], + Title: event.Title, + Fields: fields, + Footer: "BotKube", + Timestamp: json.Number(strconv.FormatInt(event.TimeStamp.Unix(), 10)), + }, + } +} + func (b *Mattermost) longNotification(event events.Event) []*model.SlackAttachmentField { fields := []*model.SlackAttachmentField{ { diff --git a/pkg/bot/mattermost_test.go b/pkg/bot/mattermost_test.go new file mode 100644 index 000000000..4667052d9 --- /dev/null +++ b/pkg/bot/mattermost_test.go @@ -0,0 +1,64 @@ +package bot + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMattermost_FindAndTrimBotMention(t *testing.T) { + /// given + botName := "BotKube" + testCases := []struct { + Name string + Input string + ExpectedTrimmedMsg string + ExpectedFound bool + }{ + { + Name: "Mention", + Input: "@BotKube get pods", + ExpectedFound: true, + ExpectedTrimmedMsg: " get pods", + }, + { + Name: "Lowercase", + Input: "@botkube get pods", + ExpectedFound: true, + ExpectedTrimmedMsg: " get pods", + }, + { + Name: "Yet another different casing", + Input: "@BOTKUBE get pods", + ExpectedFound: true, + ExpectedTrimmedMsg: " get pods", + }, + { + Name: "Not at the beginning", + Input: "Not at the beginning @BotKube get pods", + ExpectedFound: false, + }, + { + Name: "Different mention", + Input: "@bootkube get pods", + ExpectedFound: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + botMentionRegex, err := mattermostBotMentionRegex(botName) + require.NoError(t, err) + b := &Mattermost{botMentionRegex: botMentionRegex} + require.NoError(t, err) + + // when + actualTrimmedMsg, actualFound := b.findAndTrimBotMention(tc.Input) + + // then + assert.Equal(t, tc.ExpectedFound, actualFound) + assert.Equal(t, tc.ExpectedTrimmedMsg, actualTrimmedMsg) + }) + } +} diff --git a/pkg/bot/slack.go b/pkg/bot/slack.go index ba07045bd..9f09ba534 100644 --- a/pkg/bot/slack.go +++ b/pkg/bot/slack.go @@ -3,7 +3,7 @@ package bot import ( "context" "fmt" - "strings" + "regexp" "sync" "github.com/sirupsen/logrus" @@ -12,6 +12,7 @@ import ( "github.com/kubeshop/botkube/internal/analytics" "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/events" + "github.com/kubeshop/botkube/pkg/execute" formatx "github.com/kubeshop/botkube/pkg/format" "github.com/kubeshop/botkube/pkg/multierror" ) @@ -23,10 +24,7 @@ import ( var _ Bot = &Slack{} -const ( - sendFailureMessageFmt = "Unable to send message to channel `%s`: `%s`\n```add Botkube app to the channel %s\nMissed events follows below:```" - channelNotFoundCode = "channel_not_found" -) +const slackBotMentionPrefixFmt = "^<@%s>" var attachmentColor = map[config.Level]string{ config.Info: "good", @@ -41,14 +39,13 @@ type Slack struct { log logrus.FieldLogger executorFactory ExecutorFactory reporter FatalErrorAnalyticsReporter - notifyMutex sync.RWMutex - notify bool botID string - - Client *slack.Client - Notification config.Notification - ChannelName string - ExecutorBindings []string + client *slack.Client + notification config.Notification + channelsMutex sync.RWMutex + channels map[string]channelConfigByName + notifyMutex sync.Mutex + botMentionRegex *regexp.Regexp } // slackMessage contains message details to execute command and send back the result @@ -56,19 +53,16 @@ type slackMessage struct { log logrus.FieldLogger executorFactory ExecutorFactory - Event *slack.MessageEvent - BotID string - Request string - Response string - IsAuthChannel bool - RTM *slack.RTM + Event *slack.MessageEvent + BotID string + Request string + Response string + RTM *slack.RTM } // NewSlack creates a new Slack instance. -func NewSlack(log logrus.FieldLogger, c *config.Config, executorFactory ExecutorFactory, reporter FatalErrorAnalyticsReporter) (*Slack, error) { - slackCfg := c.Communications.GetFirst().Slack - - client := slack.New(slackCfg.Token) +func NewSlack(log logrus.FieldLogger, cfg config.Slack, executorFactory ExecutorFactory, reporter FatalErrorAnalyticsReporter) (*Slack, error) { + client := slack.New(cfg.Token) authResp, err := client.AuthTest() if err != nil { @@ -76,16 +70,25 @@ func NewSlack(log logrus.FieldLogger, c *config.Config, executorFactory Executor } botID := authResp.UserID + botMentionRegex, err := slackBotMentionRegex(botID) + if err != nil { + return nil, err + } + + channels := slackChannelsConfigFrom(cfg.Channels) + if err != nil { + return nil, fmt.Errorf("while producing channels configuration map by ID: %w", err) + } + return &Slack{ - log: log, - executorFactory: executorFactory, - reporter: reporter, - notify: true, // enabled by default - botID: botID, - Client: client, - Notification: slackCfg.Notification, - ChannelName: slackCfg.Channels.GetFirst().Name, - ExecutorBindings: slackCfg.Channels.GetFirst().Bindings.Executors, + log: log, + executorFactory: executorFactory, + reporter: reporter, + botID: botID, + client: client, + notification: cfg.Notification, + channels: channels, + botMentionRegex: botMentionRegex, }, nil } @@ -93,7 +96,7 @@ func NewSlack(log logrus.FieldLogger, c *config.Config, executorFactory Executor func (b *Slack) Start(ctx context.Context) error { b.log.Info("Starting bot") - rtm := b.Client.NewRTM() + rtm := b.client.NewRTM() go func() { defer analytics.ReportPanicIfOccurs(b.log, b.reporter) rtm.ManageConnection() @@ -172,48 +175,58 @@ func (b *Slack) IntegrationName() config.CommPlatformIntegration { return config.SlackCommPlatformIntegration } -// NotificationsEnabled returns current notification status. -func (b *Slack) NotificationsEnabled() bool { - b.notifyMutex.RLock() - defer b.notifyMutex.RUnlock() - return b.notify +// NotificationsEnabled returns current notification status for a given channel name. +func (b *Slack) NotificationsEnabled(channelName string) bool { + channel, exists := b.getChannels()[channelName] + if !exists { + return false + } + + return channel.notify } -// SetNotificationsEnabled sets a new notification status. -func (b *Slack) SetNotificationsEnabled(enabled bool) error { +// SetNotificationsEnabled sets a new notification status for a given channel name. +func (b *Slack) SetNotificationsEnabled(channelName string, enabled bool) error { + // avoid race conditions with using the setter concurrently, as we set whole map b.notifyMutex.Lock() defer b.notifyMutex.Unlock() - b.notify = enabled + + channels := b.getChannels() + channel, exists := channels[channelName] + if !exists { + return execute.ErrNotificationsNotConfigured + } + + channel.notify = enabled + channels[channelName] = channel + b.setChannels(channels) + return nil } func (sm *slackMessage) HandleMessage(b *Slack) error { - // Check if message posted in authenticated channel - info, err := b.Client.GetConversationInfo(sm.Event.Channel, true) - if err == nil { - if info.IsChannel || info.IsPrivate { - // Message posted in a channel - // Serve only if starts with mention - if !strings.HasPrefix(sm.Event.Text, "<@"+sm.BotID+">") { - sm.log.Debugf("Ignoring message as it doesn't contain %q prefix", sm.BotID) - return nil - } - // Serve only if current channel is in config - if b.ChannelName == info.Name { - sm.IsAuthChannel = true - } - } + // Handle message only if starts with mention + trimmedMsg, found := b.findAndTrimBotMention(sm.Event.Text) + if !found { + b.log.Debugf("Ignoring message as it doesn't contain %q mention", b.botID) + return nil } - // Serve only if current channel is in config - if b.ChannelName == sm.Event.Channel { - sm.IsAuthChannel = true + sm.Request = trimmedMsg + + // Unfortunately we need to do a call for channel name based on ID every time a message arrives. + // I wanted to query for channel IDs based on names and prepare a map in the `slackChannelsConfigFrom`, + // but unfortunately BotKube would need another scope (get all conversations). + // Keeping current way of doing this until we come up with a better idea. + channelID := sm.Event.Channel + info, err := b.client.GetConversationInfo(channelID, true) + if err != nil { + return fmt.Errorf("while getting conversation info: %w", err) } - // Trim the @BotKube prefix - sm.Request = strings.TrimPrefix(sm.Event.Text, "<@"+sm.BotID+">") + channel, isAuthChannel := b.getChannels()[info.Name] - e := sm.executorFactory.NewDefault(b.IntegrationName(), b, sm.IsAuthChannel, sm.Request) - sm.Response = e.Execute(b.ExecutorBindings) + e := sm.executorFactory.NewDefault(b.IntegrationName(), b, isAuthChannel, info.Name, channel.Bindings.Executors, sm.Request) + sm.Response = e.Execute() err = sm.Send() if err != nil { return fmt.Errorf("while sending message: %w", err) @@ -259,61 +272,96 @@ func (sm *slackMessage) Send() error { // SendEvent sends event notification to slack func (b *Slack) SendEvent(ctx context.Context, event events.Event) error { - if !b.notify { - b.log.Info("Notifications are disabled. Skipping event...") - return nil - } + b.log.Debugf(">> Sending to Slack: %+v", event) + attachment := b.formatMessage(event) - b.log.Debugf(">> Sending to slack: %+v", event) - attachment := b.formatMessage(event, b.Notification) + errs := multierror.New() + for _, channelName := range b.getChannelsToNotify(event) { + channelID, timestamp, err := b.client.PostMessageContext(ctx, channelName, slack.MsgOptionAttachments(attachment), slack.MsgOptionAsUser(true)) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("while posting message to channel %q: %w", channelName, err)) + continue + } - targetChannel := event.Channel - if targetChannel == "" { - // empty value in event.channel sends notifications to default channel. - targetChannel = b.ChannelName + b.log.Debugf("Event successfully sent to channel %q (ID: %q) at %b", channelName, channelID, timestamp) } - isDefaultChannel := targetChannel == b.ChannelName - channelID, timestamp, err := b.Client.PostMessage(targetChannel, slack.MsgOptionAttachments(attachment), slack.MsgOptionAsUser(true)) - if err != nil { - postMessageWrappedErr := fmt.Errorf("while posting message to channel %q: %w", targetChannel, err) + return errs.ErrorOrNil() +} + +func (b *Slack) getChannelsToNotify(event events.Event) []string { + // support custom event routing + if event.Channel != "" { + return []string{event.Channel} + } - if isDefaultChannel || err.Error() != channelNotFoundCode { - return postMessageWrappedErr + // TODO(https://github.com/kubeshop/botkube/issues/596): Support source bindings - filter events here or at source level and pass it every time via event property? + var channelsToNotify []string + for _, channelCfg := range b.getChannels() { + if !channelCfg.notify { + b.log.Info("Skipping notification for channel %q as notifications are disabled.", channelCfg.Identifier()) + continue } - // channel not found, fallback to default channel + channelsToNotify = append(channelsToNotify, channelCfg.Identifier()) + } + return channelsToNotify +} - // send error message to default channel - msg := fmt.Sprintf(sendFailureMessageFmt, targetChannel, err.Error(), targetChannel) - sendMessageErr := b.SendMessage(ctx, msg) - if sendMessageErr != nil { - return multierror.Append(postMessageWrappedErr, sendMessageErr) +// SendMessage sends message to slack channel +func (b *Slack) SendMessage(ctx context.Context, msg string) error { + errs := multierror.New() + for _, channel := range b.getChannels() { + channelName := channel.Name + b.log.Debugf(">> Sending message to channel %q: %+v", channelName, msg) + channelID, timestamp, err := b.client.PostMessageContext(ctx, channelName, slack.MsgOptionText(msg, false), slack.MsgOptionAsUser(true)) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("while sending Slack message to channel %q: %w", channelName, err)) + continue } + b.log.Debugf("Message successfully sent to channel %b at %b", channelID, timestamp) + } - // sending missed event to default channel - // reset channel, so it will be defaulted - event.Channel = "" - sendEventErr := b.SendEvent(ctx, event) - if sendEventErr != nil { - return multierror.Append(postMessageWrappedErr, sendEventErr) - } + return errs.ErrorOrNil() +} - return postMessageWrappedErr +func (b *Slack) getChannels() map[string]channelConfigByName { + b.channelsMutex.RLock() + defer b.channelsMutex.RUnlock() + return b.channels +} + +func (b *Slack) setChannels(channels map[string]channelConfigByName) { + b.channelsMutex.Lock() + defer b.channelsMutex.Unlock() + b.channels = channels +} + +func (b *Slack) findAndTrimBotMention(msg string) (string, bool) { + if !b.botMentionRegex.MatchString(msg) { + return "", false } - b.log.Debugf("Event successfully sent to channel %q at %b", channelID, timestamp) - return nil + return b.botMentionRegex.ReplaceAllString(msg, ""), true } -// SendMessage sends message to slack channel -func (b *Slack) SendMessage(ctx context.Context, msg string) error { - b.log.Debugf(">> Sending to slack: %+v", msg) - channelID, timestamp, err := b.Client.PostMessageContext(ctx, b.ChannelName, slack.MsgOptionText(msg, false), slack.MsgOptionAsUser(true)) +func slackChannelsConfigFrom(channelsCfg config.IdentifiableMap[config.ChannelBindingsByName]) map[string]channelConfigByName { + channels := make(map[string]channelConfigByName) + for _, channCfg := range channelsCfg { + channels[channCfg.Identifier()] = channelConfigByName{ + ChannelBindingsByName: channCfg, + notify: defaultNotifyValue, + } + } + + return channels +} + +func slackBotMentionRegex(botID string) (*regexp.Regexp, error) { + botMentionRegex, err := regexp.Compile(fmt.Sprintf(slackBotMentionPrefixFmt, botID)) if err != nil { - return fmt.Errorf("while sending Slack message to channel %q: %w", b.ChannelName, err) + return nil, fmt.Errorf("while compiling bot mention regex: %w", err) } - b.log.Debugf("Message successfully sent to channel %b at %b", channelID, timestamp) - return nil + return botMentionRegex, nil } diff --git a/pkg/bot/slack_fmt.go b/pkg/bot/slack_fmt.go index c688ea05d..2798456a8 100644 --- a/pkg/bot/slack_fmt.go +++ b/pkg/bot/slack_fmt.go @@ -12,16 +12,15 @@ import ( formatx "github.com/kubeshop/botkube/pkg/format" ) -func (b *Slack) formatMessage(event events.Event, notification config.Notification) (attachment slack.Attachment) { - switch notification.Type { +func (b *Slack) formatMessage(event events.Event) slack.Attachment { + var attachment slack.Attachment + + switch b.notification.Type { case config.LongNotification: attachment = b.longNotification(event) - case config.ShortNotification: fallthrough - default: - // set missing cluster name to an event object attachment = b.shortNotification(event) } diff --git a/pkg/bot/slack_test.go b/pkg/bot/slack_test.go new file mode 100644 index 000000000..b7ad553c4 --- /dev/null +++ b/pkg/bot/slack_test.go @@ -0,0 +1,52 @@ +package bot + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSlack_FindAndTrimBotMention(t *testing.T) { + /// given + botName := "BotKube" + testCases := []struct { + Name string + Input string + ExpectedTrimmedMsg string + ExpectedFound bool + }{ + { + Name: "Mention", + Input: "<@BotKube> get pods", + ExpectedFound: true, + ExpectedTrimmedMsg: " get pods", + }, + { + Name: "Not at the beginning", + Input: "Not at the beginning <@BotKube> get pods", + ExpectedFound: false, + }, + { + Name: "Different mention", + Input: "<@bootkube> get pods", + ExpectedFound: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + botMentionRegex, err := slackBotMentionRegex(botName) + require.NoError(t, err) + b := &Slack{botMentionRegex: botMentionRegex} + require.NoError(t, err) + + // when + actualTrimmedMsg, actualFound := b.findAndTrimBotMention(tc.Input) + + // then + assert.Equal(t, tc.ExpectedFound, actualFound) + assert.Equal(t, tc.ExpectedTrimmedMsg, actualTrimmedMsg) + }) + } +} diff --git a/pkg/bot/teams.go b/pkg/bot/teams.go index 69056dcb8..f553546aa 100644 --- a/pkg/bot/teams.go +++ b/pkg/bot/teams.go @@ -7,7 +7,7 @@ import ( "fmt" "net/http" "net/url" - "strings" + "regexp" "sync" "github.com/gorilla/mux" @@ -46,13 +46,25 @@ const ( var _ Bot = &Teams{} +const teamsBotMentionPrefixFmt = "^%s" + +type conversation struct { + ref schema.ConversationReference + notify bool +} + // Teams listens for user's message, execute commands and sends back the response. type Teams struct { log logrus.FieldLogger executorFactory ExecutorFactory reporter AnalyticsReporter - notifyMutex sync.RWMutex - notify bool + // TODO: Be consistent with other communicators when Teams supports multiple channels + //channels map[string][ChannelBindingsByName] + bindings config.BotBindings + conversationsMutex sync.RWMutex + conversations map[string]conversation + notifyMutex sync.Mutex + botMentionRegex *regexp.Regexp BotName string AppID string @@ -62,10 +74,6 @@ type Teams struct { ClusterName string Notification config.Notification Adapter core.Adapter - - conversationRefMutex sync.RWMutex - conversationRef *schema.ConversationReference - ExecutorsBindings []string } type consentContext struct { @@ -73,31 +81,35 @@ type consentContext struct { } // NewTeams creates a new Teams instance. -func NewTeams(log logrus.FieldLogger, c *config.Config, executorFactory ExecutorFactory, reporter AnalyticsReporter) *Teams { - teams := c.Communications.GetFirst().Teams +func NewTeams(log logrus.FieldLogger, cfg config.Teams, clusterName string, executorFactory ExecutorFactory, reporter AnalyticsReporter) (*Teams, error) { + botMentionRegex, err := teamsBotMentionRegex(cfg.BotName) + if err != nil { + return nil, err + } - port := teams.Port + port := cfg.Port if port == "" { port = defaultPort } - msgPath := teams.MessagePath + msgPath := cfg.MessagePath if msgPath == "" { msgPath = "/" } return &Teams{ - log: log, - executorFactory: executorFactory, - reporter: reporter, - notify: false, // disabled by default - BotName: teams.BotName, - AppID: teams.AppID, - AppPassword: teams.AppPassword, - Notification: teams.Notification, - MessagePath: msgPath, - Port: port, - ExecutorsBindings: teams.Channels.GetFirst().Bindings.Executors, - ClusterName: c.Settings.ClusterName, - } + log: log, + executorFactory: executorFactory, + reporter: reporter, + BotName: cfg.BotName, + ClusterName: clusterName, + AppID: cfg.AppID, + AppPassword: cfg.AppPassword, + Notification: cfg.Notification, + bindings: cfg.Bindings, + MessagePath: msgPath, + Port: port, + conversations: make(map[string]conversation), + botMentionRegex: botMentionRegex, + }, nil } // Start MS Teams server to serve messages from Teams client @@ -180,7 +192,7 @@ func (b *Teams) processActivity(w http.ResponseWriter, req *http.Request) { OnInvokeFunc: func(turn *coreActivity.TurnContext) (schema.Activity, error) { b.deleteConsent(ctx, turn.Activity.ReplyToID, coreActivity.GetCoversationReference(turn.Activity)) if err != nil { - return schema.Activity{}, fmt.Errorf("failed to read file: %s", err.Error()) + return schema.Activity{}, fmt.Errorf("while reading file: %w", err) } if turn.Activity.Value["type"] != activityFileUpload { return schema.Activity{}, nil @@ -199,32 +211,39 @@ func (b *Teams) processActivity(w http.ResponseWriter, req *http.Request) { return schema.Activity{}, err } if err := json.Unmarshal(infoJSON, &uploadInfo); err != nil { - return schema.Activity{}, err + return schema.Activity{}, fmt.Errorf("while unmarshalling activity: %w", err) } // Parse context consentCtx := consentContext{} ctxJSON, err := json.Marshal(turn.Activity.Value["context"]) if err != nil { - return schema.Activity{}, err + return schema.Activity{}, fmt.Errorf("while marshalling activity context: %w", err) } if err := json.Unmarshal(ctxJSON, &consentCtx); err != nil { - return schema.Activity{}, err + return schema.Activity{}, fmt.Errorf("while unmarshalling activity context: %w", err) + } + + msgWithoutPrefix := b.trimBotMention(consentCtx.Command) + + ref, err := b.getConversationReferenceFrom(activity) + if err != nil { + return schema.Activity{}, fmt.Errorf("while getting conversation reference: %w", err) } - msgPrefix := fmt.Sprintf("%s", b.BotName) - msgWithoutPrefix := strings.TrimPrefix(consentCtx.Command, msgPrefix) - msg := strings.TrimSpace(msgWithoutPrefix) - e := b.executorFactory.NewDefault(b.IntegrationName(), newTeamsNotifMgrForActivity(b, activity), true, msg) - out := e.Execute(b.ExecutorsBindings) + e := b.executorFactory.NewDefault(b.IntegrationName(), newTeamsNotifMgrForActivity(b, ref), true, activity.ChannelID, b.bindings.Executors, msgWithoutPrefix) + out := e.Execute() - actJSON, _ := json.MarshalIndent(turn.Activity, "", " ") + actJSON, err := json.MarshalIndent(turn.Activity, "", " ") + if err != nil { + return schema.Activity{}, fmt.Errorf("while marshalling activity: %w", err) + } b.log.Debugf("Incoming MSTeams Activity: %s", actJSON) // upload file err = b.putRequest(uploadInfo.UploadURL, []byte(out)) if err != nil { - return schema.Activity{}, fmt.Errorf("failed to upload file: %s", err.Error()) + return schema.Activity{}, fmt.Errorf("while uploading file: %w", err) } // notify user about uploaded file @@ -248,14 +267,18 @@ func (b *Teams) processActivity(w http.ResponseWriter, req *http.Request) { } func (b *Teams) processMessage(activity schema.Activity) string { - msgPrefix := fmt.Sprintf("%s", b.BotName) - msgWithoutPrefix := strings.TrimPrefix(activity.Text, msgPrefix) - msg := strings.TrimSpace(msgWithoutPrefix) + trimmedMsg := b.trimBotMention(activity.Text) // Multicluster is not supported for Teams - e := b.executorFactory.NewDefault(b.IntegrationName(), newTeamsNotifMgrForActivity(b, activity), true, msg) - return format.CodeBlock(e.Execute(b.ExecutorsBindings)) + ref, err := b.getConversationReferenceFrom(activity) + if err != nil { + b.log.Errorf("while getting conversation reference: %s", err.Error()) + return "" + } + + e := b.executorFactory.NewDefault(b.IntegrationName(), newTeamsNotifMgrForActivity(b, ref), true, ref.ChannelID, b.bindings.Executors, trimmedMsg) + return format.CodeBlock(e.Execute()) } func (b *Teams) putRequest(u string, data []byte) (err error) { @@ -290,37 +313,43 @@ func (b *Teams) putRequest(u string, data []byte) (err error) { // SendEvent sends event message via Bot interface func (b *Teams) SendEvent(ctx context.Context, event events.Event) error { - if !b.notify { - b.log.Info("Notifications are disabled. Skipping event...") - return nil - } + b.log.Debugf(">> Sending to Teams: %+v", event) card := b.formatMessage(event, b.Notification) - if err := b.sendProactiveMessage(ctx, card); err != nil { - return fmt.Errorf("while sending notification: %w", err) + + errs := multierror.New() + for _, convRef := range b.getConversationRefsToNotify() { + err := b.sendProactiveMessage(ctx, convRef, card) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("while posting message to channel %q: %w", convRef.ChannelID, err)) + continue + } + + b.log.Debugf("Event successfully sent to channel %q at %b", convRef.ChannelID) } - b.log.Debugf("Event successfully sent to MS Teams >> %+v", event) - return nil + return errs.ErrorOrNil() } // SendMessage sends message to MsTeams func (b *Teams) SendMessage(ctx context.Context, msg string) error { - b.conversationRefMutex.RLock() - defer b.conversationRefMutex.RUnlock() - if b.conversationRef == nil { - b.log.Infof("Skipping SendMessage since conversation ref not set") - return nil - } - err := b.Adapter.ProactiveMessage(ctx, *b.conversationRef, coreActivity.HandlerFuncs{ - OnMessageFunc: func(turn *coreActivity.TurnContext) (schema.Activity, error) { - return turn.SendActivity(coreActivity.MsgOptionText(msg)) - }, - }) - if err != nil { - return err + errs := multierror.New() + for _, convCfg := range b.getConversations() { + channelID := convCfg.ref.ChannelID + + b.log.Debugf(">> Sending message to channel %q: %+v", channelID, msg) + err := b.Adapter.ProactiveMessage(ctx, convCfg.ref, coreActivity.HandlerFuncs{ + OnMessageFunc: func(turn *coreActivity.TurnContext) (schema.Activity, error) { + return turn.SendActivity(coreActivity.MsgOptionText(msg)) + }, + }) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("while sending Teams message to channel %q: %w", channelID, err)) + continue + } + b.log.Debugf("Message successfully sent to channel %q", channelID) } - b.log.Debug("Message successfully sent to MS Teams") - return nil + + return errs.ErrorOrNil() } // IntegrationName describes the integration name. @@ -333,43 +362,42 @@ func (b *Teams) Type() config.IntegrationType { return config.BotIntegrationType } -// NotificationsEnabled returns current notification status. -func (b *Teams) NotificationsEnabled() bool { - b.notifyMutex.RLock() - defer b.notifyMutex.RUnlock() - return b.notify +// NotificationsEnabled returns current notification status for a given channel ID. +func (b *Teams) NotificationsEnabled(channelID string) bool { + channel, exists := b.getConversations()[channelID] + if !exists { + return false + } + + return channel.notify } -// SetNotificationsEnabled sets a new notification status. -func (b *Teams) SetNotificationsEnabled(enabled bool, activity schema.Activity) error { - if enabled { - b.conversationRefMutex.Lock() - defer b.conversationRefMutex.Unlock() - - // Set conversation reference - ref := coreActivity.GetCoversationReference(activity) - b.conversationRef = &ref - // Remove messageID from the ChannelID - if ID, ok := activity.ChannelData["teamsChannelId"]; ok { - b.conversationRef.ChannelID = ID.(string) - b.conversationRef.Conversation.ID = ID.(string) +// SetNotificationsEnabled sets a new notification status for a given channel ID. +func (b *Teams) SetNotificationsEnabled(enabled bool, ref schema.ConversationReference) error { + // avoid race conditions with using the setter concurrently, as we set whole map + b.notifyMutex.Lock() + defer b.notifyMutex.Unlock() + + conversations := b.getConversations() + conv, exists := conversations[ref.ChannelID] + if !exists { + // not returning execute.ErrNotificationsNotConfigured error, as MS Teams channels are configured dynamically. + // In such case this shouldn't be considered as an error. + + conv = conversation{ + ref: ref, } } - b.notifyMutex.Lock() - defer b.notifyMutex.Unlock() - b.notify = enabled + conv.notify = enabled + conversations[ref.ChannelID] = conv + b.setConversations(conversations) + return nil } -func (b *Teams) sendProactiveMessage(ctx context.Context, card map[string]interface{}) error { - b.conversationRefMutex.RLock() - defer b.conversationRefMutex.RUnlock() - if b.conversationRef == nil { - b.log.Infof("Skipping SendMessage since conversation ref not set") - return nil - } - err := b.Adapter.ProactiveMessage(ctx, *b.conversationRef, coreActivity.HandlerFuncs{ +func (b *Teams) sendProactiveMessage(ctx context.Context, convRef schema.ConversationReference, card map[string]interface{}) error { + err := b.Adapter.ProactiveMessage(ctx, convRef, coreActivity.HandlerFuncs{ OnMessageFunc: func(turn *coreActivity.TurnContext) (schema.Activity, error) { attachments := []schema.Attachment{ { @@ -383,21 +411,84 @@ func (b *Teams) sendProactiveMessage(ctx context.Context, card map[string]interf return err } +func (b *Teams) getConversationRefsToNotify() []schema.ConversationReference { + // TODO(https://github.com/kubeshop/botkube/issues/596): Support source bindings - filter events here or at source level and pass it every time via event property? + var convRefsToNotify []schema.ConversationReference + for _, convConfig := range b.getConversations() { + if !convConfig.notify { + b.log.Infof("Skipping notification for channel %q as notifications are disabled.", convConfig.ref.ChannelID) + continue + } + + convRefsToNotify = append(convRefsToNotify, convConfig.ref) + } + return convRefsToNotify +} + +func (b *Teams) getConversations() map[string]conversation { + b.conversationsMutex.RLock() + defer b.conversationsMutex.RUnlock() + return b.conversations +} + +func (b *Teams) setConversations(conversations map[string]conversation) { + b.conversationsMutex.Lock() + defer b.conversationsMutex.Unlock() + b.conversations = conversations +} + +// The whole integration should be rewritten using a different library. See the TODO on the top of the file. +func (b *Teams) getConversationReferenceFrom(activity schema.Activity) (schema.ConversationReference, error) { + // Such ref has the ChannelID property always set to `msteams`. Why? ¯\_(ツ)_/¯ + ref := coreActivity.GetCoversationReference(activity) + + // Set proper IDs as seen in previous implementation. Why both activity and channel IDs are needed? ¯\_(ツ)_/¯ + rawChannelID, exists := activity.ChannelData["teamsChannelId"] + if !exists { + // Apparently `msteams` ID is sometimes OK, for example in private conversation. + // Why? Is there a separation for two users? I guess the Activity ID also matters... ¯\_(ツ)_/¯ + b.log.Info("Teams Channel ID not found. Using default ID...`") + return ref, nil + } + + channelID, ok := rawChannelID.(string) + if !ok { + return schema.ConversationReference{}, fmt.Errorf("couldn't convert channelID from channel data to string") + } + + ref.ChannelID = channelID + ref.Conversation.ID = channelID + return ref, nil +} + +func (b *Teams) trimBotMention(msg string) string { + return b.botMentionRegex.ReplaceAllString(msg, "") +} + type teamsNotificationManager struct { - b *Teams - activity schema.Activity + b *Teams + ref schema.ConversationReference +} + +func newTeamsNotifMgrForActivity(b *Teams, ref schema.ConversationReference) *teamsNotificationManager { + return &teamsNotificationManager{b: b, ref: ref} } -func newTeamsNotifMgrForActivity(b *Teams, activity schema.Activity) *teamsNotificationManager { - return &teamsNotificationManager{b: b, activity: activity} +// NotificationsEnabled returns current notification status for a given channel ID. +func (n *teamsNotificationManager) NotificationsEnabled(channelID string) bool { + return n.b.NotificationsEnabled(channelID) } -// NotificationsEnabled returns current notification status. -func (n *teamsNotificationManager) NotificationsEnabled() bool { - return n.b.NotificationsEnabled() +// SetNotificationsEnabled sets a new notification status for a given channel ID. +func (n *teamsNotificationManager) SetNotificationsEnabled(_ string, enabled bool) error { + return n.b.SetNotificationsEnabled(enabled, n.ref) } -// SetNotificationsEnabled sets a new notification status. -func (n *teamsNotificationManager) SetNotificationsEnabled(enabled bool) error { - return n.b.SetNotificationsEnabled(enabled, n.activity) +func teamsBotMentionRegex(BotName string) (*regexp.Regexp, error) { + botMentionRegex, err := regexp.Compile(fmt.Sprintf(teamsBotMentionPrefixFmt, BotName)) + if err != nil { + return nil, fmt.Errorf("while compiling bot mention regex: %w", err) + } + + return botMentionRegex, nil } diff --git a/pkg/bot/teams_test.go b/pkg/bot/teams_test.go new file mode 100644 index 000000000..c783858a9 --- /dev/null +++ b/pkg/bot/teams_test.go @@ -0,0 +1,49 @@ +package bot + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTeams_TrimBotMention(t *testing.T) { + /// given + botName := "BotKube" + testCases := []struct { + Name string + Input string + ExpectedTrimmedMsg string + }{ + { + Name: "Mention", + Input: "BotKube get pods", + ExpectedTrimmedMsg: " get pods", + }, + { + Name: "Not at the beginning", + Input: "Not at the beginning BotKube get pods", + ExpectedTrimmedMsg: "Not at the beginning BotKube get pods", + }, + { + Name: "Different mention", + Input: "bootkube get pods", + ExpectedTrimmedMsg: "bootkube get pods", + }, + } + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + botMentionRegex, err := teamsBotMentionRegex(botName) + require.NoError(t, err) + b := &Teams{botMentionRegex: botMentionRegex} + require.NoError(t, err) + + // when + actualTrimmedMsg := b.trimBotMention(tc.Input) + + // then + assert.Equal(t, tc.ExpectedTrimmedMsg, actualTrimmedMsg) + }) + } +} diff --git a/pkg/config/config.go b/pkg/config/config.go index acf86f2b2..e367dfa2c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -122,8 +122,8 @@ const ( // Config structure of configuration yaml file type Config struct { Sources IndexableMap[Sources] `yaml:"sources"` - Executors IndexableMap[Executors] `yaml:"executors" validate:"required,min=1"` - Communications IndexableMap[Communications] `yaml:"communications" validate:"required,eq=1"` + Executors IndexableMap[Executors] `yaml:"executors" validate:"required"` + Communications IndexableMap[Communications] `yaml:"communications" validate:"required,min=1"` Analytics Analytics `yaml:"analytics"` Settings Settings `yaml:"settings"` @@ -135,12 +135,22 @@ type ChannelBindingsByName struct { Bindings BotBindings `yaml:"bindings"` } +// Identifier returns ChannelBindingsByID identifier. +func (c ChannelBindingsByName) Identifier() string { + return c.Name +} + // ChannelBindingsByID contains configuration bindings per channel. type ChannelBindingsByID struct { ID string `yaml:"id"` Bindings BotBindings `yaml:"bindings"` } +// Identifier returns ChannelBindingsByID identifier. +func (c ChannelBindingsByID) Identifier() string { + return c.ID +} + // BotBindings contains configuration for possible Bot bindings. type BotBindings struct { Sources []string `yaml:"sources"` @@ -254,10 +264,10 @@ type Communications struct { // Slack configuration to authentication and send notifications type Slack struct { - Enabled bool `yaml:"enabled"` - Channels IndexableMap[ChannelBindingsByName] `yaml:"channels" validate:"required,eq=1"` - Notification Notification `yaml:"notification,omitempty"` - Token string `yaml:"token,omitempty"` + Enabled bool `yaml:"enabled"` + Channels IdentifiableMap[ChannelBindingsByName] `yaml:"channels" validate:"required,eq=1"` + Notification Notification `yaml:"notification,omitempty"` + Token string `yaml:"token,omitempty"` } // Elasticsearch config auth settings @@ -290,13 +300,13 @@ type ELSIndex struct { // Mattermost configuration to authentication and send notifications type Mattermost struct { - Enabled bool `yaml:"enabled"` - BotName string `yaml:"botName"` - URL string `yaml:"url"` - Token string `yaml:"token"` - Team string `yaml:"team"` - Channels IndexableMap[ChannelBindingsByName] `yaml:"channels" validate:"required,eq=1"` - Notification Notification `yaml:"notification,omitempty"` + Enabled bool `yaml:"enabled"` + BotName string `yaml:"botName"` + URL string `yaml:"url"` + Token string `yaml:"token"` + Team string `yaml:"team"` + Channels IdentifiableMap[ChannelBindingsByName] `yaml:"channels" validate:"required,eq=1"` + Notification Notification `yaml:"notification,omitempty"` } // Teams creds for authentication with MS Teams @@ -308,18 +318,19 @@ type Teams struct { Team string `yaml:"team"` Port string `yaml:"port"` MessagePath string `yaml:"messagePath,omitempty"` - // TODO: not used yet. - Channels IndexableMap[ChannelBindingsByName] `yaml:"channels"` - Notification Notification `yaml:"notification,omitempty"` + // TODO: Be consistent with other communicators when MS Teams support multiple channels + //Channels IndexableMap[ChannelBindingsByName] `yaml:"channels"` + Bindings BotBindings `yaml:"bindings"` + Notification Notification `yaml:"notification,omitempty"` } // Discord configuration for authentication and send notifications type Discord struct { - Enabled bool `yaml:"enabled"` - Token string `yaml:"token"` - BotID string `yaml:"botID"` - Channels IndexableMap[ChannelBindingsByID] `yaml:"channels" validate:"required,eq=1"` - Notification Notification `yaml:"notification,omitempty"` + Enabled bool `yaml:"enabled"` + Token string `yaml:"token"` + BotID string `yaml:"botID"` + Channels IdentifiableMap[ChannelBindingsByID] `yaml:"channels" validate:"required,eq=1"` + Notification Notification `yaml:"notification,omitempty"` } // Webhook configuration to send notifications @@ -455,3 +466,24 @@ func (t IndexableMap[T]) GetFirst() T { return empty } + +// IdentifiableMap provides an option to construct an indexable map for identifiable items. +type IdentifiableMap[T Identifiable] map[string]T + +// Identifiable exports an Identifier method. +type Identifiable interface { + Identifier() string +} + +// GetByIdentifier gets an item from a map by identifier. +func (t IdentifiableMap[T]) GetByIdentifier(val string) (T, bool) { + for _, v := range t { + if v.Identifier() != val { + continue + } + return v, true + } + + var empty T + return empty, false +} diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 0b6370ac1..4391207db 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -158,12 +158,10 @@ func TestLoadedConfigValidation(t *testing.T) { configFiles: nil, }, { - // TODO(remove): https://github.com/kubeshop/botkube/issues/596 name: "empty executors and communications settings", expErrMsg: heredoc.Doc(` - while validating loaded configuration: 2 errors occurred: - * Key: 'Config.Executors' Error:Field validation for 'Executors' failed on the 'min' tag - * Key: 'Config.Communications' Error:Field validation for 'Communications' failed on the 'eq' tag`), + while validating loaded configuration: 1 error occurred: + * Key: 'Config.Communications' Error:Field validation for 'Communications' failed on the 'min' tag`), configFiles: []string{ testdataFile(t, "empty-executors-communications.yaml"), }, diff --git a/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml b/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml index 6bb74901b..c9705b5d8 100644 --- a/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml +++ b/pkg/config/testdata/TestLoadConfigSuccess/config-all.yaml @@ -34,14 +34,11 @@ communications: # req 1 elm. enabled: false appID: 'APPLICATION_ID' appPassword: 'APPLICATION_PASSWORD' - channels: - 'alias': - name: 'TEAMS_CHANNEL_ID' - bindings: - executors: - - kubectl-read-only - sources: - - k8s-events + bindings: + executors: + - kubectl-read-only + sources: + - k8s-events notification: type: short port: 3978 diff --git a/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml b/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml index cd4d05b01..eee2d0967 100644 --- a/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml +++ b/pkg/config/testdata/TestLoadConfigSuccess/config.golden.yaml @@ -320,14 +320,11 @@ communications: appPassword: APPLICATION_PASSWORD team: "" port: "3978" - channels: - alias: - name: TEAMS_CHANNEL_ID - bindings: - sources: - - k8s-events - executors: - - kubectl-read-only + bindings: + sources: + - k8s-events + executors: + - kubectl-read-only notification: type: short webhook: diff --git a/pkg/controller/notifier.go b/pkg/controller/notifier.go index e4b9b9a45..cd6f41ea6 100644 --- a/pkg/controller/notifier.go +++ b/pkg/controller/notifier.go @@ -16,6 +16,7 @@ type Notifier interface { // SendMessage is used for notifying about BotKube start/stop listening, possible BotKube upgrades and other events. // Some integrations may decide to ignore such messages and have SendMessage method no-op. + // TODO: Consider option per channel to turn on/off "announcements" (BotKube start/stop/upgrade notify/config change. SendMessage(context.Context, string) error // IntegrationName returns a name of a given communication platform. diff --git a/pkg/execute/executor.go b/pkg/execute/executor.go index 4f427b263..bb43957a7 100644 --- a/pkg/execute/executor.go +++ b/pkg/execute/executor.go @@ -2,6 +2,7 @@ package execute import ( "bytes" + "errors" "fmt" "strings" "text/tabwriter" @@ -16,10 +17,6 @@ import ( ) var ( - // validNotifierCommand is a map of valid notifier commands - validNotifierCommand = map[string]bool{ - "notifier": true, - } validPingCommand = map[string]bool{ "ping": true, } @@ -66,18 +63,18 @@ const ( // DefaultExecutor is a default implementations of Executor type DefaultExecutor struct { - cfg config.Config - filterEngine filterengine.FilterEngine - log logrus.FieldLogger - runCmdFn CommandRunnerFunc - notifierExecutor *NotifierExecutor - notifierHandler NotifierHandler - - Message string - IsAuthChannel bool - Platform config.CommPlatformIntegration - + cfg config.Config + filterEngine filterengine.FilterEngine + log logrus.FieldLogger analyticsReporter AnalyticsReporter + runCmdFn CommandRunnerFunc + notifierExecutor *NotifierExecutor + notifierHandler NotifierHandler + bindings []string + message string + isAuthChannel bool + platform config.CommPlatformIntegration + conversationID string kubectlExecutor *Kubectl } @@ -138,9 +135,9 @@ func (action FiltersAction) String() string { } // Execute executes commands and returns output -func (e *DefaultExecutor) Execute(channelBindings []string) string { +func (e *DefaultExecutor) Execute() string { // Remove hyperlink if it got added automatically - command := utils.RemoveHyperlink(e.Message) + command := utils.RemoveHyperlink(e.message) var ( clusterName = e.cfg.Settings.ClusterName @@ -148,8 +145,8 @@ func (e *DefaultExecutor) Execute(channelBindings []string) string { args = strings.Fields(strings.TrimSpace(command)) ) if len(args) == 0 { - if e.IsAuthChannel { - return e.printDefaultMsg(e.Platform) + if e.isAuthChannel { + return e.printDefaultMsg(e.platform) } return "" // this prevents all bots on all clusters to answer something } @@ -162,17 +159,17 @@ func (e *DefaultExecutor) Execute(channelBindings []string) string { return "" // user specified different target cluster } - if e.kubectlExecutor.CanHandle(channelBindings, args) { + if e.kubectlExecutor.CanHandle(e.bindings, args) { // Currently the verb is always at the first place of `args`, and, in a result, `finalArgs`. // The length of the slice was already checked before // See the DefaultExecutor.Execute() logic. verb := args[0] - err := e.analyticsReporter.ReportCommand(e.Platform, verb) + err := e.analyticsReporter.ReportCommand(e.platform, verb) if err != nil { // TODO: Return error when the DefaultExecutor is refactored as a part of https://github.com/kubeshop/botkube/issues/589 e.log.Errorf("while reporting executed command: %s", err.Error()) } - out, err := e.kubectlExecutor.Execute(channelBindings, e.Message, e.IsAuthChannel) + out, err := e.kubectlExecutor.Execute(e.bindings, e.message, e.isAuthChannel) if err != nil { // TODO: Return error when the DefaultExecutor is refactored as a part of https://github.com/kubeshop/botkube/issues/589 e.log.Errorf("while executing kubectl: %s", err.Error()) @@ -180,21 +177,17 @@ func (e *DefaultExecutor) Execute(channelBindings []string) string { } return out } - if validNotifierCommand[args[0]] { - if !e.IsAuthChannel { - // TODO: Return error when the DefaultExecutor is refactored as a part of https://github.com/kubeshop/botkube/issues/589 - return "" - } - - res, err := e.notifierExecutor.Do(args, e.Platform, clusterName, e.notifierHandler) + if e.notifierExecutor.CanHandle(args) { + res, err := e.notifierExecutor.Do(args, e.platform, e.conversationID, clusterName, e.notifierHandler) if err != nil { - if err == errInvalidNotifierCommand { + if errors.Is(err, errInvalidNotifierCommand) { return incompleteCmdMsg } - if err == errUnsupportedCommand { + if errors.Is(err, errUnsupportedCommand) { return unsupportedCmdMsg } + // TODO: Return error when the DefaultExecutor is refactored as a part of https://github.com/kubeshop/botkube/issues/589 e.log.Errorf("while executing notifier command: %s", err.Error()) } @@ -213,16 +206,16 @@ func (e *DefaultExecutor) Execute(channelBindings []string) string { } // Check if filter command if validFilterCommand[args[0]] { - return e.runFilterCommand(args, clusterName, e.IsAuthChannel) + return e.runFilterCommand(args, clusterName, e.isAuthChannel) } //Check if info command if validInfoCommand[args[0]] { - return e.runInfoCommand(args, e.IsAuthChannel) + return e.runInfoCommand(args, e.isAuthChannel) } - if e.IsAuthChannel { - return e.printDefaultMsg(e.Platform) + if e.isAuthChannel { + return e.printDefaultMsg(e.platform) } return "" } @@ -247,7 +240,7 @@ func (e *DefaultExecutor) runFilterCommand(args []string, clusterName string, is var cmdVerb = args[1] defer func() { cmdToReport := fmt.Sprintf("%s %s", args[0], cmdVerb) - err := e.analyticsReporter.ReportCommand(e.Platform, cmdToReport) + err := e.analyticsReporter.ReportCommand(e.platform, cmdToReport) if err != nil { // TODO: Return error when the DefaultExecutor is refactored as a part of https://github.com/kubeshop/botkube/issues/589 e.log.Errorf("while reporting filter command: %s", err.Error()) @@ -283,7 +276,7 @@ func (e *DefaultExecutor) runFilterCommand(args []string, clusterName string, is } cmdVerb = anonymizedInvalidVerb // prevent passing any personal information - return e.printDefaultMsg(e.Platform) + return e.printDefaultMsg(e.platform) } //runInfoCommand to list allowed commands @@ -295,7 +288,7 @@ func (e *DefaultExecutor) runInfoCommand(args []string, isAuthChannel bool) stri return incompleteCmdMsg } - err := e.analyticsReporter.ReportCommand(e.Platform, strings.Join(args, " ")) + err := e.analyticsReporter.ReportCommand(e.platform, strings.Join(args, " ")) if err != nil { // TODO: Return error when the DefaultExecutor is refactored as a part of https://github.com/kubeshop/botkube/issues/589 e.log.Errorf("while reporting info command: %s", err.Error()) @@ -347,7 +340,7 @@ func (e *DefaultExecutor) findBotKubeVersion() (versions string) { } func (e *DefaultExecutor) runVersionCommand(args []string, clusterName string) string { - err := e.analyticsReporter.ReportCommand(e.Platform, args[0]) + err := e.analyticsReporter.ReportCommand(e.platform, args[0]) if err != nil { // TODO: Return error when the DefaultExecutor is refactored as a part of https://github.com/kubeshop/botkube/issues/589 e.log.Errorf("while reporting version command: %s", err.Error()) diff --git a/pkg/execute/factory.go b/pkg/execute/factory.go index 9e7d79cd7..8fca2f605 100644 --- a/pkg/execute/factory.go +++ b/pkg/execute/factory.go @@ -21,7 +21,7 @@ type DefaultExecutorFactory struct { // Executor is an interface for processes to execute commands type Executor interface { - Execute(bindings []string) string + Execute() string } // AnalyticsReporter defines a reporter that collects analytics data. @@ -54,7 +54,7 @@ func NewExecutorFactory(log logrus.FieldLogger, runCmdFn CommandRunnerFunc, cfg } // NewDefault creates new Default Executor. -func (f *DefaultExecutorFactory) NewDefault(platform config.CommPlatformIntegration, notifierHandler NotifierHandler, isAuthChannel bool, message string) Executor { +func (f *DefaultExecutorFactory) NewDefault(platform config.CommPlatformIntegration, notifierHandler NotifierHandler, isAuthChannel bool, conversationID string, bindings []string, message string) Executor { return &DefaultExecutor{ log: f.log, runCmdFn: f.runCmdFn, @@ -65,8 +65,10 @@ func (f *DefaultExecutorFactory) NewDefault(platform config.CommPlatformIntegrat filterEngine: f.filterEngine, notifierHandler: notifierHandler, - IsAuthChannel: isAuthChannel, - Message: message, - Platform: platform, + isAuthChannel: isAuthChannel, + bindings: bindings, + message: message, + platform: platform, + conversationID: conversationID, } } diff --git a/pkg/execute/notifier.go b/pkg/execute/notifier.go index 87545ef6f..b5577a3b5 100644 --- a/pkg/execute/notifier.go +++ b/pkg/execute/notifier.go @@ -11,23 +11,28 @@ import ( ) const ( - notifierStartMsgFmt = "Brace yourselves, incoming notifications from cluster %q." - notifierStopMsgFmt = "Sure! I won't send you notifications from cluster %q anymore." - notifierStatusMsgFmt = "Notifications from cluster %q are %s." + notifierStartMsgFmt = "Brace yourselves, incoming notifications from cluster '%s'." + notifierStopMsgFmt = "Sure! I won't send you notifications from cluster '%s' here." + notifierStatusMsgFmt = "Notifications from cluster '%s' are %s here." + notifierNotConfiguredMsgFmt = "I'm not configured to send notifications here ('%s') from cluster '%s', so you cannot turn them on or off." + + notifierCmdFirstArg = "notifier" ) // NotifierHandler handles disabling and enabling notifications for a given communication platform. type NotifierHandler interface { - // NotificationsEnabled returns current notification status. - NotificationsEnabled() bool + // NotificationsEnabled returns current notification status for a given conversation ID. + NotificationsEnabled(conversationID string) bool - // SetNotificationsEnabled sets a new notification status. - SetNotificationsEnabled(enabled bool) error + // SetNotificationsEnabled sets a new notification status for a given conversation ID. + SetNotificationsEnabled(conversationID string, enabled bool) error } var ( errInvalidNotifierCommand = errors.New("invalid notifier command") errUnsupportedCommand = errors.New("unsupported command") + // ErrNotificationsNotConfigured describes an error when user wants to toggle on/off the notifications for not configured channel. + ErrNotificationsNotConfigured = errors.New("notifications not configured for this channel") ) // NotifierExecutor executes all commands that are related to notifications. @@ -44,8 +49,21 @@ func NewNotifierExecutor(log logrus.FieldLogger, cfg config.Config, analyticsRep return &NotifierExecutor{log: log, cfg: cfg, analyticsReporter: analyticsReporter} } +// CanHandle returns true if the arguments can be handled by this executor. +func (e *NotifierExecutor) CanHandle(args []string) bool { + if len(args) == 0 { + return false + } + + if args[0] != notifierCmdFirstArg { + return false + } + + return true +} + // Do executes a given Notifier command based on args. -func (e *NotifierExecutor) Do(args []string, platform config.CommPlatformIntegration, clusterName string, handler NotifierHandler) (string, error) { +func (e *NotifierExecutor) Do(args []string, platform config.CommPlatformIntegration, conversationID string, clusterName string, handler NotifierHandler) (string, error) { if len(args) != 2 { return "", errInvalidNotifierCommand } @@ -66,21 +84,29 @@ func (e *NotifierExecutor) Do(args []string, platform config.CommPlatformIntegra switch NotifierAction(cmdVerb) { case Start: - err := handler.SetNotificationsEnabled(true) + err := handler.SetNotificationsEnabled(conversationID, true) if err != nil { + if errors.Is(err, ErrNotificationsNotConfigured) { + return fmt.Sprintf(notifierNotConfiguredMsgFmt, conversationID, clusterName), nil + } + return "", fmt.Errorf("while setting notifications to true: %w", err) } return fmt.Sprintf(notifierStartMsgFmt, clusterName), nil case Stop: - err := handler.SetNotificationsEnabled(false) + err := handler.SetNotificationsEnabled(conversationID, false) if err != nil { + if errors.Is(err, ErrNotificationsNotConfigured) { + return fmt.Sprintf(notifierNotConfiguredMsgFmt, conversationID, clusterName), nil + } + return "", fmt.Errorf("while setting notifications to false: %w", err) } return fmt.Sprintf(notifierStopMsgFmt, clusterName), nil case Status: - enabled := handler.NotificationsEnabled() + enabled := handler.NotificationsEnabled(conversationID) enabledStr := "enabled" if !enabled { diff --git a/pkg/execute/notifier_test.go b/pkg/execute/notifier_test.go index 4318a5f8a..864c22257 100644 --- a/pkg/execute/notifier_test.go +++ b/pkg/execute/notifier_test.go @@ -27,28 +27,56 @@ func TestNotifierExecutor_Do_Success(t *testing.T) { Name string InputArgs []string InputNotifierHandler NotifierHandler + ConversationID string ExpectedResult string ExpectedStatusAfter string ExpectedErrorMessage string }{ { - Name: "Start", - InputArgs: []string{"notifier", "start"}, - InputNotifierHandler: &fakeNotifierHandler{enabled: false}, - ExpectedResult: `Brace yourselves, incoming notifications from cluster "cluster-name".`, - ExpectedStatusAfter: `Notifications from cluster "cluster-name" are enabled.`, + Name: "Start", + InputArgs: []string{"notifier", "start"}, + ConversationID: "conv-id", + InputNotifierHandler: &fakeNotifierHandler{ + conf: map[string]bool{"conv-id": false}, + }, + ExpectedResult: `Brace yourselves, incoming notifications from cluster 'cluster-name'.`, + ExpectedStatusAfter: `Notifications from cluster 'cluster-name' are enabled here.`, }, { - Name: "Stop", - InputArgs: []string{"notifier", "stop"}, - InputNotifierHandler: &fakeNotifierHandler{enabled: true}, - ExpectedResult: `Sure! I won't send you notifications from cluster "cluster-name" anymore.`, - ExpectedStatusAfter: `Notifications from cluster "cluster-name" are disabled.`, + Name: "Start for non-configured channel", + InputArgs: []string{"notifier", "start"}, + ConversationID: "non-existing", + InputNotifierHandler: &fakeNotifierHandler{ + conf: map[string]bool{"conv-id": false}, + }, + ExpectedResult: `I'm not configured to send notifications here ('non-existing') from cluster 'cluster-name', so you cannot turn them on or off.`, + ExpectedStatusAfter: `Notifications from cluster 'cluster-name' are disabled here.`, + }, + { + Name: "Stop", + ConversationID: "conv-id", + InputArgs: []string{"notifier", "stop"}, + InputNotifierHandler: &fakeNotifierHandler{ + conf: map[string]bool{"conv-id": true}, + }, + ExpectedResult: `Sure! I won't send you notifications from cluster 'cluster-name' here.`, + ExpectedStatusAfter: `Notifications from cluster 'cluster-name' are disabled here.`, + }, + { + Name: "Stop for non-configured channel", + ConversationID: "non-existing", + InputArgs: []string{"notifier", "stop"}, + InputNotifierHandler: &fakeNotifierHandler{conf: map[string]bool{ + "conv-id": true, + }}, + ExpectedResult: `I'm not configured to send notifications here ('non-existing') from cluster 'cluster-name', so you cannot turn them on or off.`, + ExpectedStatusAfter: `Notifications from cluster 'cluster-name' are disabled here.`, }, { Name: "Show config", + ConversationID: "conv-id", InputArgs: []string{"notifier", "showconfig"}, - InputNotifierHandler: &fakeNotifierHandler{enabled: false}, + InputNotifierHandler: &fakeNotifierHandler{}, ExpectedResult: heredoc.Doc(` Showing config for cluster "cluster-name": @@ -69,7 +97,7 @@ func TestNotifierExecutor_Do_Success(t *testing.T) { informersResyncPeriod: 0s kubeconfig: "" `), - ExpectedStatusAfter: `Notifications from cluster "cluster-name" are disabled.`, + ExpectedStatusAfter: `Notifications from cluster 'cluster-name' are disabled here.`, }, { Name: "Invalid verb", @@ -95,7 +123,7 @@ func TestNotifierExecutor_Do_Success(t *testing.T) { // execute command // when - actual, err := e.Do(tc.InputArgs, platform, clusterName, tc.InputNotifierHandler) + actual, err := e.Do(tc.InputArgs, platform, tc.ConversationID, clusterName, tc.InputNotifierHandler) // then @@ -113,7 +141,7 @@ func TestNotifierExecutor_Do_Success(t *testing.T) { // get status after executing a given command // when - actual, err = e.Do(statusArgs, platform, clusterName, tc.InputNotifierHandler) + actual, err = e.Do(statusArgs, platform, tc.ConversationID, clusterName, tc.InputNotifierHandler) // then require.Nil(t, err) assert.Equal(t, tc.ExpectedStatusAfter, actual) @@ -122,15 +150,25 @@ func TestNotifierExecutor_Do_Success(t *testing.T) { } type fakeNotifierHandler struct { - enabled bool + conf map[string]bool } -func (f *fakeNotifierHandler) NotificationsEnabled() bool { - return f.enabled +func (f *fakeNotifierHandler) NotificationsEnabled(convID string) bool { + enabled, exists := f.conf[convID] + if !exists { + return false + } + + return enabled } -func (f *fakeNotifierHandler) SetNotificationsEnabled(enabled bool) error { - f.enabled = enabled +func (f *fakeNotifierHandler) SetNotificationsEnabled(convID string, enabled bool) error { + _, exists := f.conf[convID] + if !exists { + return ErrNotificationsNotConfigured + } + + f.conf[convID] = enabled return nil } diff --git a/pkg/sink/elasticsearch.go b/pkg/sink/elasticsearch.go index 25fec06da..887c36078 100644 --- a/pkg/sink/elasticsearch.go +++ b/pkg/sink/elasticsearch.go @@ -20,6 +20,7 @@ import ( "github.com/kubeshop/botkube/pkg/config" "github.com/kubeshop/botkube/pkg/events" + "github.com/kubeshop/botkube/pkg/multierror" ) var _ Sink = &Elasticsearch{} @@ -40,14 +41,8 @@ const ( type Elasticsearch struct { log logrus.FieldLogger reporter AnalyticsReporter - - ELSClient *elastic.Client - Server string - SkipTLSVerify bool - Index string - Shards int - Replicas int - IndexType string + client *elastic.Client + indices map[string]config.ELSIndex } // NewElasticsearch creates a new Elasticsearch instance. @@ -113,13 +108,10 @@ func NewElasticsearch(log logrus.FieldLogger, c config.Elasticsearch, reporter A } esNotifier := &Elasticsearch{ - log: log, - reporter: reporter, - ELSClient: elsClient, - Index: c.Indices.GetFirst().Name, - IndexType: c.Indices.GetFirst().Type, - Shards: c.Indices.GetFirst().Shards, - Replicas: c.Indices.GetFirst().Replicas, + log: log, + reporter: reporter, + client: elsClient, + indices: c.Indices, } err = reporter.ReportSinkEnabled(esNotifier.IntegrationName()) @@ -142,11 +134,11 @@ type index struct { Replicas int `json:"number_of_replicas"` } -func (e *Elasticsearch) flushIndex(ctx context.Context, event interface{}) error { +func (e *Elasticsearch) flushIndex(ctx context.Context, indexCfg config.ELSIndex, event interface{}) error { // Construct the ELS Index Name with timestamp suffix - indexName := e.Index + "-" + time.Now().Format(indexSuffixFormat) + indexName := indexCfg.Name + "-" + time.Now().Format(indexSuffixFormat) // Create index if not exists - exists, err := e.ELSClient.IndexExists(indexName).Do(ctx) + exists, err := e.client.IndexExists(indexName).Do(ctx) if err != nil { return fmt.Errorf("while getting index: %w", err) } @@ -155,23 +147,23 @@ func (e *Elasticsearch) flushIndex(ctx context.Context, event interface{}) error mapping := mapping{ Settings: settings{ index{ - Shards: e.Shards, - Replicas: e.Replicas, + Shards: indexCfg.Shards, + Replicas: indexCfg.Replicas, }, }, } - _, err := e.ELSClient.CreateIndex(indexName).BodyJson(mapping).Do(ctx) + _, err := e.client.CreateIndex(indexName).BodyJson(mapping).Do(ctx) if err != nil { return fmt.Errorf("while creating index: %w", err) } } // Send event to els - _, err = e.ELSClient.Index().Index(indexName).Type(e.IndexType).BodyJson(event).Do(ctx) + _, err = e.client.Index().Index(indexName).Type(indexCfg.Type).BodyJson(event).Do(ctx) if err != nil { return fmt.Errorf("while posting data to ELS: %w", err) } - _, err = e.ELSClient.Flush().Index(indexName).Do(ctx) + _, err = e.client.Flush().Index(indexName).Do(ctx) if err != nil { return fmt.Errorf("while flushing data in ELS: %w", err) } @@ -183,12 +175,19 @@ func (e *Elasticsearch) flushIndex(ctx context.Context, event interface{}) error func (e *Elasticsearch) SendEvent(ctx context.Context, event events.Event) (err error) { e.log.Debugf(">> Sending to Elasticsearch: %+v", event) - // Create index if not exists - if err := e.flushIndex(ctx, event); err != nil { - return fmt.Errorf("while sending event to Elasticsearch: %w", err) + errs := multierror.New() + // TODO(https://github.com/kubeshop/botkube/issues/596): Support source bindings - filter events here or at source level and pass it every time via event property? + for _, indexCfg := range e.indices { + err := e.flushIndex(ctx, indexCfg, event) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("while sending event to Elasticsearch index %q: %w", indexCfg.Name, err)) + continue + } + + e.log.Debugf("Event successfully sent to Elasticsearch index %q", indexCfg.Name) } - return nil + return errs.ErrorOrNil() } // SendMessage is no-op diff --git a/test/README.md b/test/README.md index a136652a1..b4150624d 100644 --- a/test/README.md +++ b/test/README.md @@ -71,7 +71,7 @@ This directory contains E2E tests which are run against BotKube installed on Kub ```bash helm install botkube --namespace botkube ./helm/botkube --wait --create-namespace \ -f ./helm/botkube/e2e-test-values.yaml \ - --set communications.slack.token="${SLACK_BOT_TOKEN}" \ + --set communications.default-group.slack.token="${SLACK_BOT_TOKEN}" \ --set image.registry="${IMAGE_REGISTRY}" \ --set image.repository="${IMAGE_REPOSITORY}" \ --set image.tag="${IMAGE_TAG}" \ diff --git a/test/e2e/k8s_helpers_test.go b/test/e2e/k8s_helpers_test.go index 2e39bf0d9..310f78f96 100644 --- a/test/e2e/k8s_helpers_test.go +++ b/test/e2e/k8s_helpers_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/slack-go/slack" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -19,11 +20,10 @@ import ( deploymentutil "k8s.io/kubectl/pkg/util/deployment" ) -func setTestEnvsForDeploy(t *testing.T, appCfg Config, deployNsCli appsv1cli.DeploymentInterface, channelName string) func(t *testing.T) { +func setTestEnvsForDeploy(t *testing.T, appCfg Config, deployNsCli appsv1cli.DeploymentInterface, channels map[string]*slack.Channel) func(t *testing.T) { t.Helper() slackEnabledEnvName := appCfg.Deployment.Envs.SlackEnabledName - slackChannelIDEnvName := appCfg.Deployment.Envs.SlackChannelIDName deployment, err := deployNsCli.Get(context.Background(), appCfg.Deployment.Name, metav1.GetOptions{}) require.NoError(t, err) @@ -52,12 +52,16 @@ func setTestEnvsForDeploy(t *testing.T, appCfg Config, deployNsCli appsv1cli.Dep require.NoError(t, err) } + newEnvs := []v1.EnvVar{ + {Name: slackEnabledEnvName, Value: strconv.FormatBool(true)}, + } + for envName, slackChannel := range channels { + newEnvs = append(newEnvs, v1.EnvVar{Name: envName, Value: slackChannel.Name}) + } + deployment.Spec.Template.Spec.Containers[containerIdx].Env = updateEnv( envs, - []v1.EnvVar{ - {Name: slackEnabledEnvName, Value: strconv.FormatBool(true)}, - {Name: slackChannelIDEnvName, Value: channelName}, - }, + newEnvs, nil, ) diff --git a/test/e2e/slack_test.go b/test/e2e/slack_test.go index d1dcfdcf1..78c61da45 100644 --- a/test/e2e/slack_test.go +++ b/test/e2e/slack_test.go @@ -31,8 +31,9 @@ type Config struct { ContainerName string `envconfig:"default=botkube"` WaitTimeout time.Duration `envconfig:"default=3m"` Envs struct { - SlackEnabledName string `envconfig:"default=BOTKUBE_COMMUNICATIONS_DEFAULT-GROUP_SLACK_ENABLED"` - SlackChannelIDName string `envconfig:"default=BOTKUBE_COMMUNICATIONS_DEFAULT-GROUP_SLACK_CHANNELS_DEFAULT_NAME"` + SlackEnabledName string `envconfig:"default=BOTKUBE_COMMUNICATIONS_DEFAULT-GROUP_SLACK_ENABLED"` + DefaultSlackChannelIDName string `envconfig:"default=BOTKUBE_COMMUNICATIONS_DEFAULT-GROUP_SLACK_CHANNELS_DEFAULT_NAME"` + SecondarySlackChannelIDName string `envconfig:"default=BOTKUBE_COMMUNICATIONS_DEFAULT-GROUP_SLACK_CHANNELS_SECONDARY_NAME"` } } ClusterName string `envconfig:"default=sample"` @@ -70,17 +71,26 @@ func TestSlack(t *testing.T) { require.NoError(t, err) t.Log("Setting up test Slack setup...") + botUserID := slackTester.FindUserIDForBot(t) + testerUserID := slackTester.FindUserIDForTester(t) + channel, cleanupChannelFn := slackTester.CreateChannel(t) t.Cleanup(func() { cleanupChannelFn(t) }) + secondChannel, cleanupSecondChannelFn := slackTester.CreateChannel(t) + t.Cleanup(func() { cleanupSecondChannelFn(t) }) - slackTester.PostInitialMessage(t, channel.Name) - botUserID := slackTester.FindUserIDForBot(t) - testerUserID := slackTester.FindUserIDForTester(t) - slackTester.InviteBotToChannel(t, botUserID, channel.ID) + channels := map[string]*slack.Channel{ + appCfg.Deployment.Envs.DefaultSlackChannelIDName: channel, + appCfg.Deployment.Envs.SecondarySlackChannelIDName: secondChannel, + } + for _, currentChannel := range channels { + slackTester.PostInitialMessage(t, currentChannel.Name) + slackTester.InviteBotToChannel(t, botUserID, currentChannel.ID) + } t.Log("Patching Deployment with test env variables...") deployNsCli := k8sCli.AppsV1().Deployments(appCfg.Deployment.Namespace) - revertDeployFn := setTestEnvsForDeploy(t, appCfg, deployNsCli, channel.Name) + revertDeployFn := setTestEnvsForDeploy(t, appCfg, deployNsCli, channels) t.Cleanup(func() { revertDeployFn(t) }) t.Log("Waiting for Deployment") @@ -317,8 +327,12 @@ func TestSlack(t *testing.T) { }) }) - t.Run("Notifications", func(t *testing.T) { + t.Run("Multi-channel notifications", func(t *testing.T) { cfgMapCli := k8sCli.CoreV1().ConfigMaps(appCfg.Deployment.Namespace) + var channelIDs []string + for _, channel := range channels { + channelIDs = append(channelIDs, channel.ID) + } t.Log("Creating ConfigMap...") var cfgMapAlreadyDeleted bool @@ -342,7 +356,7 @@ func TestSlack(t *testing.T) { fmt.Sprintf("ConfigMap *%s/%s* has been created in *%s* cluster", cfgMap.Namespace, cfgMap.Name, appCfg.ClusterName), ) } - err = slackTester.WaitForMessagePosted(botUserID, channel.ID, 1, assertionFn) + err = slackTester.WaitForMessagesPostedOnChannels(botUserID, channelIDs, 1, assertionFn) require.NoError(t, err) t.Log("Updating ConfigMap...") @@ -361,17 +375,31 @@ func TestSlack(t *testing.T) { fmt.Sprintf("ConfigMap *%s/%s* has been updated in *%s* cluster", cfgMap.Namespace, cfgMap.Name, appCfg.ClusterName), ) } - err = slackTester.WaitForMessagePosted(botUserID, channel.ID, 1, assertionFn) + err = slackTester.WaitForMessagesPostedOnChannels(botUserID, channelIDs, 1, assertionFn) require.NoError(t, err) t.Log("Stopping notifier...") command := "notifier stop" - expectedMessage := codeBlock(fmt.Sprintf("Sure! I won't send you notifications from cluster %q anymore.", appCfg.ClusterName)) + expectedMessage := codeBlock(fmt.Sprintf("Sure! I won't send you notifications from cluster '%s' here.", appCfg.ClusterName)) slackTester.PostMessageToBot(t, channel.Name, command) err = slackTester.WaitForLastMessageEqual(botUserID, channel.ID, expectedMessage) assert.NoError(t, err) + t.Log("Getting notifier status from second channel...") + command = "notifier status" + expectedMessage = codeBlock(fmt.Sprintf("Notifications from cluster '%s' are enabled here.", appCfg.ClusterName)) + slackTester.PostMessageToBot(t, secondChannel.Name, command) + err = slackTester.WaitForLastMessageEqual(botUserID, secondChannel.ID, expectedMessage) + assert.NoError(t, err) + + t.Log("Getting notifier status from first channel...") + command = "notifier status" + expectedMessage = codeBlock(fmt.Sprintf("Notifications from cluster '%s' are disabled here.", appCfg.ClusterName)) + slackTester.PostMessageToBot(t, channel.Name, command) + err = slackTester.WaitForLastMessageEqual(botUserID, channel.ID, expectedMessage) + assert.NoError(t, err) + t.Log("Updating ConfigMap once again...") cfgMap.Data = map[string]string{ "operation": "update-second", @@ -379,18 +407,29 @@ func TestSlack(t *testing.T) { _, err = cfgMapCli.Update(context.Background(), cfgMap, metav1.UpdateOptions{}) require.NoError(t, err) - t.Log("Ensuring bot didn't write anything new...") + t.Log("Ensuring bot didn't write anything new on first channel...") time.Sleep(appCfg.Slack.MessageWaitTimeout) // Same expected message as before err = slackTester.WaitForLastMessageEqual(botUserID, channel.ID, expectedMessage) require.NoError(t, err) + t.Log("Expecting bot message on second channel...") + assertionFn = func(msg slack.Message) bool { + return doesMessageContainExactlyOneAttachment( + msg, + "v1/configmaps updated", + "daa038", + fmt.Sprintf("ConfigMap *%s/%s* has been updated in *%s* cluster", cfgMap.Namespace, cfgMap.Name, appCfg.ClusterName), + ) + } + err = slackTester.WaitForMessagePosted(botUserID, secondChannel.ID, 1, assertionFn) + t.Log("Starting notifier") command = "notifier start" - expectedMessage = codeBlock(fmt.Sprintf("Brace yourselves, incoming notifications from cluster %q.", appCfg.ClusterName)) - + expectedMessage = codeBlock(fmt.Sprintf("Brace yourselves, incoming notifications from cluster '%s'.", appCfg.ClusterName)) slackTester.PostMessageToBot(t, channel.Name, command) err = slackTester.WaitForLastMessageEqual(botUserID, channel.ID, expectedMessage) + require.NoError(t, err) t.Log("Creating and deleting ignored ConfigMap") ignoredCfgMap := &v1.ConfigMap{ @@ -426,7 +465,7 @@ func TestSlack(t *testing.T) { fmt.Sprintf("ConfigMap *%s/%s* has been deleted in *%s* cluster", cfgMap.Namespace, cfgMap.Name, appCfg.ClusterName), ) } - err = slackTester.WaitForMessagePosted(botUserID, channel.ID, 1, assertionFn) + err = slackTester.WaitForMessagesPostedOnChannels(botUserID, channelIDs, 1, assertionFn) require.NoError(t, err) }) diff --git a/test/e2e/slack_tester_test.go b/test/e2e/slack_tester_test.go index 81050957b..84631f2b5 100644 --- a/test/e2e/slack_tester_test.go +++ b/test/e2e/slack_tester_test.go @@ -15,6 +15,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/wait" + + "github.com/kubeshop/botkube/pkg/multierror" ) const recentMessagesLimit = 5 @@ -135,6 +137,12 @@ func (s *slackTester) WaitForLastMessageEqual(userID, channelID string, expected }) } +func (s *slackTester) WaitForLastMessageEqualOnChannels(userID string, channelIDs []string, expectedMsg string) error { + return s.WaitForMessagesPostedOnChannels(userID, channelIDs, 1, func(msg slack.Message) bool { + return msg.Text == expectedMsg + }) +} + func (s *slackTester) WaitForMessagePosted(userID, channelID string, limitMessages int, msgAssertFn func(msg slack.Message) bool) error { var fetchedMessages []slack.Message var lastErr error @@ -175,3 +183,12 @@ func (s *slackTester) WaitForMessagePosted(userID, channelID string, limitMessag return nil } + +func (s *slackTester) WaitForMessagesPostedOnChannels(userID string, channelIDs []string, limitMessages int, msgAssertFn func(msg slack.Message) bool) error { + errs := multierror.New() + for _, channelID := range channelIDs { + errs = multierror.Append(errs, s.WaitForMessagePosted(userID, channelID, limitMessages, msgAssertFn)) + } + + return errs.ErrorOrNil() +}