Skip to content

Commit

Permalink
basic alerting, for internal use primarily
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Dec 7, 2023
1 parent 92e50b3 commit 8b2c0f0
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 21 deletions.
2 changes: 1 addition & 1 deletion docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ services:
dockerfile: stacks/flow.Dockerfile
target: flow-snapshot-worker
environment:
<<: [*flow-worker-env]
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ services:
container_name: flow-snapshot-worker
image: ghcr.io/peerdb-io/flow-snapshot-worker:latest-dev
environment:
<<: [*flow-worker-env]
<<: [*catalog-config, *flow-worker-env]
depends_on:
temporal-admin-tools:
condition: service_healthy
Expand Down
3 changes: 0 additions & 3 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ type CDCPullConnector interface {
// PullFlowCleanup drops both the Postgres publication and replication slot, as a part of DROP MIRROR
PullFlowCleanup(jobName string) error

// SendWALHeartbeat allows for activity to progress restart_lsn on postgres.
SendWALHeartbeat() error

// GetSlotInfo returns the WAL (or equivalent) info of a slot for the connector.
GetSlotInfo(slotName string) ([]*protos.SlotInfo, error)
}
Expand Down
28 changes: 12 additions & 16 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/PeerDB-io/peer-flow/utils/evervigil"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
Expand All @@ -29,6 +30,7 @@ type PostgresConnector struct {
tableSchemaMapping map[string]*protos.TableSchema
customTypesMapping map[uint32]string
metadataSchema string
vigil *evervigil.EverVigil
}

// NewPostgresConnector creates a new instance of PostgresConnector.
Expand Down Expand Up @@ -79,6 +81,11 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
metadataSchema = *pgConfig.MetadataSchema
}

vigil, err := evervigil.NewVigil()
if err != nil {
return nil, fmt.Errorf("failed to initialize vigil: %w", err)
}

return &PostgresConnector{
connStr: connectionString,
ctx: ctx,
Expand All @@ -87,6 +94,7 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig)
replPool: replPool,
customTypesMapping: customTypeMap,
metadataSchema: metadataSchema,
vigil: vigil,
}, nil
}

Expand All @@ -100,6 +108,10 @@ func (c *PostgresConnector) Close() error {
c.replPool.Close()
}

if c.vigil != nil {
c.vigil.Close()
}

return nil
}

Expand Down Expand Up @@ -883,19 +895,3 @@ func (c *PostgresConnector) SyncFlowCleanup(jobName string) error {
}
return nil
}

func (c *PostgresConnector) SendWALHeartbeat() error {
command := `
BEGIN;
DROP aggregate IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP aggregate PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;
`
_, err := c.pool.Exec(c.ctx, command)
if err != nil {
return fmt.Errorf("error bumping wal position: %w", err)
}

return nil
}
3 changes: 3 additions & 0 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,18 @@ require (
github.com/cockroachdb/redact v1.1.5 // indirect
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect
github.com/getsentry/sentry-go v0.18.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/nikoksr/notify v0.41.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.12.0 // indirect
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/slack-go/slack v0.12.2 // indirect
)

require (
Expand Down
9 changes: 9 additions & 0 deletions flow/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
Expand Down Expand Up @@ -273,6 +274,7 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down Expand Up @@ -300,6 +302,9 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas=
github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/pyroscope-go v1.0.4 h1:oyQX0BOkL+iARXzHuCdIF5TQ7/sRSel1YFViMHC7Bm0=
github.com/grafana/pyroscope-go v1.0.4/go.mod h1:0d7ftwSMBV/Awm7CCiYmHQEG8Y44Ma3YSjt+nWcWztY=
github.com/grafana/pyroscope-go/godeltaprof v0.1.5 h1:gkFVqihFRL1Nro2FCC0u6mW47jclef96Zu8I/ykq+4E=
Expand Down Expand Up @@ -402,6 +407,8 @@ github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ib
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nikoksr/notify v0.41.0 h1:4LGE41GpWdHX5M3Xo6DlWRwS2WLDbOq1Rk7IzY4vjmQ=
github.com/nikoksr/notify v0.41.0/go.mod h1:FoE0UVPeopz1Vy5nm9vQZ+JVmYjEIjQgbFstbkw+cRE=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/orcaman/concurrent-map/v2 v2.0.1 h1:jOJ5Pg2w1oeB6PeDurIYf6k9PQ+aTITr/6lP/L/zp6c=
github.com/orcaman/concurrent-map/v2 v2.0.1/go.mod h1:9Eq3TG2oBe5FirmYWQfYO5iH1q0Jv47PLaNK++uCdOM=
Expand Down Expand Up @@ -457,6 +464,8 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/slack-go/slack v0.12.2 h1:x3OppyMyGIbbiyFhsBmpf9pwkUzMhthJMRNmNlA4LaQ=
github.com/slack-go/slack v0.12.2/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw=
github.com/snowflakedb/gosnowflake v1.6.25 h1:o5zUmxTOo0Eo9AdkEj8blCeiMuILrQJ+rjUMAeZhcRE=
github.com/snowflakedb/gosnowflake v1.6.25/go.mod h1:KfO4F7bk+aXPUIvBqYxvPhxLlu2/w4TtSC8Rw/yr5Mg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
99 changes: 99 additions & 0 deletions flow/utils/evervigil/ever_vigil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package evervigil

import (
"context"
"encoding/json"
"fmt"
"time"

catalog "github.com/PeerDB-io/peer-flow/connectors/utils/catalog"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/nikoksr/notify"
"github.com/nikoksr/notify/service/slack"
"github.com/sirupsen/logrus"
)

// alerting service, cool name
type EverVigil struct {
notifier *notify.Notify
catalogPool *pgxpool.Pool
}

type slackServiceConfig struct {
AuthToken string `json:"auth_token"`
ChannelIDs []string `json:"channel_ids"`
}

func NewVigil() (*EverVigil, error) {
catalogPool, catalogErr := catalog.GetCatalogConnectionPoolFromEnv()
if catalogErr != nil {
return nil, fmt.Errorf("error getting catalog connection pool: %w", catalogErr)
}

notifier := notify.New()

rows, err := catalogPool.Query(context.Background(),
"SELECT service_type,service_config FROM peerdb_stats.alerting_config")
if err != nil {
return nil, fmt.Errorf("failed to read everVigil config from catalog: %w", err)
}

var serviceType, serviceConfig string
_, err = pgx.ForEachRow(rows, []any{&serviceType, &serviceConfig}, func() error {
switch serviceType {
case "slack":
var slackServiceConfig slackServiceConfig
err = json.Unmarshal([]byte(serviceConfig), &slackServiceConfig)
if err != nil {
return fmt.Errorf("failed to unmarshal Slack service config: %w", err)
}

slackService := slack.New(slackServiceConfig.AuthToken)
slackService.AddReceivers(slackServiceConfig.ChannelIDs...)
notifier.UseServices(slackService)
default:
return fmt.Errorf("unknown service type: %s", serviceType)
}
return nil
})

return &EverVigil{
notifier: notifier,
catalogPool: catalogPool,
}, nil
}

func (ev *EverVigil) Close() {
if ev.catalogPool != nil {
ev.catalogPool.Close()
}
}

// Only raises an alert if another alert with the same key hasn't been raised
// in the past 15 minutes
func (ev *EverVigil) AlertIf(alertKey string, alertMessage string) {
row := ev.catalogPool.QueryRow(context.Background(),
`SELECT created_timestamp FROM peerdb_stats.alerts_v1 WHERE alert_key=$1
ORDER BY created_timestamp DESC LIMIT 1`,
alertKey)
var createdTimestamp time.Time
err := row.Scan(&createdTimestamp)
if err != nil && err != pgx.ErrNoRows {
logrus.Warnf("failed to send alert: %v", err)
return
}

if time.Since(createdTimestamp) >= 15*time.Minute {
err = ev.notifier.Send(context.Background(),
fmt.Sprintf(":rotating_light: *Alert Alert* :rotating_light:: %s since %s", alertKey,
time.Now().Format("2006-01-02 15:04:05.999999")), alertMessage)
if err != nil {
logrus.Warnf("failed to send alert: %v", err)
return
}
_, _ = ev.catalogPool.Exec(context.Background(),
"INSERT INTO peerdb_stats.alerts_v1(alert_key,alert_message) VALUES($1,$2)",
alertKey, alertMessage)
}
}
13 changes: 13 additions & 0 deletions nexus/catalog/migrations/V12__alerting_config_init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
CREATE TABLE IF NOT EXISTS peerdb_stats.alerting_config (
id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
service_type TEXT NOT NULL CHECK (service_type IN ('slack')),
service_config JSONB NOT NULL
);

CREATE TABLE IF NOT EXISTS peerdb_stats.alerts_v1 (
id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
alert_key TEXT NOT NULL,
alert_level TEXT NOT NULL CHECK (alert_level IN ('critical')) DEFAULT 'critical',
alert_message TEXT NOT NULL,
created_timestamp TIMESTAMP DEFAULT now()
);

0 comments on commit 8b2c0f0

Please sign in to comment.