Skip to content

Commit

Permalink
[*] prettify code
Browse files Browse the repository at this point in the history
  • Loading branch information
pashagolub committed May 20, 2024
1 parent bb6abf1 commit b6be453
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions src/sinks/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
var (
cacheLimit = 512
highLoadTimeout = time.Second * 5
deleterDelay = time.Hour
)

func NewPostgresWriter(ctx context.Context, connstr string, opts *config.MeasurementOpts, metricDefs *metrics.Metrics) (pgw *PostgresWriter, err error) {
Expand Down Expand Up @@ -60,8 +61,8 @@ func NewWriterFromPostgresConn(ctx context.Context, conn db.PgxPoolIface, opts *
if err = pgw.EnsureBuiltinMetricDummies(); err != nil {
return
}
go pgw.OldPostgresMetricsDeleter()
go pgw.UniqueDbnamesListingMaintainer()
go pgw.deleteOldPartitions(deleterDelay)
go pgw.maintainUniqueSources()
go pgw.poll()
return
}
Expand Down Expand Up @@ -484,7 +485,8 @@ func (pgw *PostgresWriter) EnsureMetricDbnameTime(metricDbnamePartBounds map[str
return nil
}

func (pgw *PostgresWriter) OldPostgresMetricsDeleter() {
// deleteOldPartitions is a background task that deletes old partitions from the measurements DB
func (pgw *PostgresWriter) deleteOldPartitions(delay time.Duration) {
metricAgeDaysThreshold := pgw.opts.Retention
if metricAgeDaysThreshold <= 0 {
return
Expand All @@ -493,7 +495,7 @@ func (pgw *PostgresWriter) OldPostgresMetricsDeleter() {
select {
case <-pgw.сtx.Done():
return
case <-time.After(time.Hour):
case <-time.After(delay):
// to reduce distracting log messages at startup
}

Expand All @@ -515,7 +517,7 @@ func (pgw *PostgresWriter) OldPostgresMetricsDeleter() {
if len(partsToDrop) > 0 {
logger.Infof("Dropping %d old metric partitions one by one...", len(partsToDrop))
for _, toDrop := range partsToDrop {
sqlDropTable := `DROP TABLE IF EXISTS ` + pgx.Identifier{toDrop}.Sanitize()
sqlDropTable := `DROP TABLE IF EXISTS ` + toDrop
logger.Debugf("Dropping old metric data partition: %s", toDrop)

if _, err := pgw.sinkDb.Exec(pgw.сtx, sqlDropTable); err != nil {
Expand All @@ -537,7 +539,9 @@ func (pgw *PostgresWriter) OldPostgresMetricsDeleter() {
}
}

func (pgw *PostgresWriter) UniqueDbnamesListingMaintainer() {
// maintainUniqueSources is a background task that maintains a listing of unique sources for each metric.
// This is used to avoid listing the same source multiple times in Grafana dropdowns.
func (pgw *PostgresWriter) maintainUniqueSources() {
logger := log.GetLogger(pgw.сtx)
// due to metrics deletion the listing can go out of sync (a trigger not really wanted)
sqlGetAdvisoryLock := `SELECT pg_try_advisory_lock(1571543679778230000) AS have_lock` // 1571543679778230000 is just a random bigint
Expand All @@ -548,8 +552,8 @@ func (pgw *PostgresWriter) UniqueDbnamesListingMaintainer() {
UNION
SELECT (SELECT MIN(dbname) FROM %s WHERE dbname > t.dbname) FROM t )
SELECT dbname FROM t WHERE dbname NOTNULL ORDER BY 1`
sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2 RETURNING *`
sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1 RETURNING *`
sqlDelete := `DELETE FROM admin.all_distinct_dbname_metrics WHERE NOT dbname = ANY($1) and metric = $2`
sqlDeleteAll := `DELETE FROM admin.all_distinct_dbname_metrics WHERE metric = $1`
sqlAdd := `
INSERT INTO admin.all_distinct_dbname_metrics SELECT u, $2 FROM (select unnest($1::text[]) as u) x
WHERE NOT EXISTS (select * from admin.all_distinct_dbname_metrics where dbname = u and metric = $2)
Expand Down

0 comments on commit b6be453

Please sign in to comment.