Skip to content

Commit

Permalink
Added: dirty bot implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
capcom6 committed Dec 27, 2023
1 parent 26ccc5f commit 73fde8b
Show file tree
Hide file tree
Showing 16 changed files with 282 additions and 101 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ monitor:
CONFIG_PATH=./configs/monitor.yml go run ./cmd/monitor/main.go

bot:
go run ./cmd/bot/main.go
CONFIG_PATH=./configs/bot.yml go run ./cmd/bot/main.go

init-dev:
go mod download \
Expand Down
106 changes: 102 additions & 4 deletions internal/botx/app.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,116 @@
package botx

import (
"fmt"
"context"
"sync"

"github.com/capcom6/go-infra-fx/logger"
"github.com/capcom6/service-monitor-tgbot/internal/botx/config"
"github.com/capcom6/service-monitor-tgbot/internal/botx/telegram"
"github.com/capcom6/service-monitor-tgbot/pkg/eventbus"
"github.com/capcom6/service-monitor-tgbot/pkg/events"
"go.uber.org/fx"
"go.uber.org/fx/fxevent"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func Run() {
module := fx.Module(
"bot",
fx.Invoke(func() {
fmt.Println("Bot")
logger.Module,
config.Module,
telegram.Module,
eventbus.Module,
fx.Provide(NewMessages),
fx.Invoke(func(eventbus eventbus.EventBus, telegram *telegram.TelegramBot, messages *Messages, logger *zap.Logger, lc fx.Lifecycle) error {
ctx, cancel := context.WithCancel(context.Background())

wg := &sync.WaitGroup{}

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
cancel()
wg.Wait()
return nil
},
})

ch, err := eventbus.Receive(ctx)
if err != nil {
return err
}

wg.Add(1)
go func() {
defer wg.Done()

logger.Debug("start receive events")
defer logger.Debug("stop receive events")

event := events.Event[events.ServiceStateChanged]{}
for payload := range ch {
if err := event.Decode(payload); err != nil {
continue
}
if event.Name != events.EventNameServiceStateChanged {
continue
}

v := event.Payload

msg := ""
if v.State == events.ServiceStateOffline {
context := OfflineContext{
OnlineContext: OnlineContext{
Name: telegram.EscapeText(v.Name),
},
Error: telegram.EscapeText(v.Error),
}
msg, err = messages.Render(TemplateOffline, context)
} else {
context := OnlineContext{
Name: telegram.EscapeText(v.Name),
}
msg, err = messages.Render(TemplateOnline, context)
}

if err != nil {
logger.Error("can't render template", zap.Error(err))
continue
}

if err := telegram.SendMessage(msg); err != nil {
logger.Error("can't send message", zap.Error(err))
}
}
// for {
// select {
// case payload := <-ch:
// if err := event.Decode(payload); err != nil {
// continue
// }
// if event.Name != events.EventNameServiceStateChanged {
// continue
// }

// logger.Info("service state changed", zap.Any("event", event.Payload))
// case <-ctx.Done():
// return
// }
// }
}()

return nil
}),
)

fx.New(module).Run()
fx.New(
module,
fx.WithLogger(func(logger *zap.Logger) fxevent.Logger {
logOption := fxevent.ZapLogger{Logger: logger}
logOption.UseLogLevel(zapcore.DebugLevel)
return &logOption
}),
).Run()
}
5 changes: 5 additions & 0 deletions internal/botx/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

type Config struct {
Telegram Telegram `yaml:"telegram"`
EventBus EventBus `yaml:"eventBus"`
}

type Telegram struct {
Expand All @@ -13,3 +14,7 @@ type Telegram struct {
}

type TelegramMessages map[string]string

type EventBus struct {
DSN string `yaml:"dsn" envconfig:"EVENTBUS__DSN" validate:"required"`
}
3 changes: 3 additions & 0 deletions internal/botx/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,8 @@ var (

defaultConfig Config = Config{
Telegram: Telegram{Token: "", ChatID: 0, WebhookURL: "", Debug: false, Messages: defaultTelegramMessages},
EventBus: EventBus{
DSN: "redis://localhost:6379/0?channel=events",
},
}
)
6 changes: 6 additions & 0 deletions internal/botx/config/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"github.com/capcom6/go-infra-fx/config"
"github.com/capcom6/service-monitor-tgbot/pkg/eventbus"
"go.uber.org/fx"
"go.uber.org/zap"
)
Expand All @@ -20,4 +21,9 @@ var Module = fx.Module(
fx.Provide(func(cfg Config) TelegramMessages {
return cfg.Telegram.Messages
}),
fx.Provide(func(cfg Config) eventbus.Config {
return eventbus.Config{
DSN: cfg.EventBus.DSN,
}
}),
)
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package infrastructure
package telegram

import (
"go.uber.org/fx"
"go.uber.org/zap"
)

var Module = fx.Module(
"infrastructure",
"telegram",
fx.Decorate(func(log *zap.Logger) *zap.Logger {
return log.Named("infrastructure")
return log.Named("telegram")
}),
fx.Provide(NewTelegramBot),
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package infrastructure
package telegram

import (
"github.com/capcom6/service-monitor-tgbot/internal/botx/config"
Expand Down Expand Up @@ -42,35 +42,3 @@ func (b *TelegramBot) SendMessage(text string) error {
func (b *TelegramBot) EscapeText(text string) string {
return tgbotapi.EscapeText(tgbotapi.ModeMarkdownV2, text)
}

// func (b *TelegramBot) Api() (*tgbotapi.BotAPI, error) {
// b.mux.Lock()
// defer b.mux.Unlock()

// if b.api != nil {
// return b.api, nil
// }

// api, err := tgbotapi.NewBotAPI(b.Config.Token)
// if err != nil {
// return nil, err
// }

// api.Debug = b.Config.Debug
// b.api = api

// return api, nil
// }

// func (b *TelegramBot) Listen(ctx context.Context) (tgbotapi.UpdatesChannel, error) {
// u := tgbotapi.NewUpdate(0)
// u.Timeout = 60
// u.AllowedUpdates = []string{"message", "callback_query"}

// go func() {
// <-ctx.Done()
// b.api.StopReceivingUpdates()
// }()

// return b.api.GetUpdatesChan(u), nil
// }
14 changes: 11 additions & 3 deletions internal/monitorx/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (

"github.com/capcom6/go-infra-fx/logger"
"github.com/capcom6/service-monitor-tgbot/internal/monitorx/config"
"github.com/capcom6/service-monitor-tgbot/internal/monitorx/eventbus"
"github.com/capcom6/service-monitor-tgbot/internal/monitorx/monitor"
"github.com/capcom6/service-monitor-tgbot/internal/monitorx/storage"
"github.com/capcom6/service-monitor-tgbot/pkg/eventbus"
"github.com/capcom6/service-monitor-tgbot/pkg/events"
"go.uber.org/fx"
"go.uber.org/fx/fxevent"
"go.uber.org/zap"
Expand All @@ -18,7 +19,7 @@ import (

func Run() {
module := fx.Module(
"bot",
"monitor",
logger.Module,
config.Module,
monitor.Module,
Expand Down Expand Up @@ -51,8 +52,15 @@ func Run() {
for v := range ch {
logger.Debug("probe", zap.Any("state", v))

event := events.NewServiceStateChangedEvent(v.Id, v.Name, string(v.State), v.Error)
payload, err := event.Encode()
if err != nil {
logger.Error("failed to encode event", zap.Error(err))
continue
}

ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
if err := eventbus.Send(ctx, v); err != nil {
if err := eventbus.Send(ctx, payload); err != nil {
logger.Error("failed to send event", zap.Error(err))
}
cancel()
Expand Down
4 changes: 2 additions & 2 deletions internal/monitorx/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ type Config struct {
}

type Storage struct {
DSN string `yaml:"dsn"`
DSN string `yaml:"dsn" envconfig:"STORAGE__DSN" validate:"required"`
}

type EventBus struct {
DSN string `yaml:"dsn"`
DSN string `yaml:"dsn" envconfig:"EVENTBUS__DSN" validate:"required"`
}
5 changes: 3 additions & 2 deletions internal/monitorx/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package config

var (
defaultConfig Config = Config{
Storage: Storage{
DSN: "file://./config.yaml",
Storage: Storage{DSN: "file://./config.yaml"},
EventBus: EventBus{
DSN: "redis://localhost:6379/0?channel=events",
},
}
)
2 changes: 1 addition & 1 deletion internal/monitorx/config/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package config

import (
"github.com/capcom6/go-infra-fx/config"
"github.com/capcom6/service-monitor-tgbot/internal/monitorx/eventbus"
"github.com/capcom6/service-monitor-tgbot/internal/monitorx/storage"
"github.com/capcom6/service-monitor-tgbot/pkg/eventbus"
"go.uber.org/fx"
"go.uber.org/zap"
)
Expand Down
47 changes: 0 additions & 47 deletions internal/monitorx/eventbus/redis.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
)

type EventBus interface {
Send(ctx context.Context, event interface{}) error
Send(ctx context.Context, event string) error
Receive(ctx context.Context) (<-chan string, error)
}

type Params struct {
Expand All @@ -22,20 +23,23 @@ type Params struct {
}

var Module = fx.Module(
"storage",
"eventbus",
fx.Decorate(func(log *zap.Logger) *zap.Logger {
return log.Named("eventbus")
}),
fx.Provide(func(p Params) (EventBus, error) {
return New(p.Config)
return New(p.Config, p.Logger)
}),
)

func New(cfg Config) (EventBus, error) {
func New(cfg Config, logger *zap.Logger) (EventBus, error) {
u, err := url.Parse(cfg.DSN)
if err != nil {
return nil, fmt.Errorf("invalid dsn: %w", err)
}

if u.Scheme == "redis" || u.Scheme == "rediss" {
return newRedisBus(u)
return newRedisBus(u, logger)
}

return nil, errors.New("unknown scheme")
Expand Down
Loading

0 comments on commit 73fde8b

Please sign in to comment.