bugfix: postgresql exporter delete task (#119)
tzaffi committed Jul 21, 2023
1 parent f2711fa commit 98efe5a
Showing 20 changed files with 345 additions and 438 deletions.
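In short: createIndexerDB used to unmarshal the exporter config into a local variable and discard it, so Init was left checking a zero-valued exp.cfg and the delete task (data pruning) never started. The fix has createIndexerDB return the parsed ExporterConfig so Init can store it before deciding whether to launch the pruning goroutine. The remaining changes add debug logging around that decision, an e2e scenario that exercises the delete task against a real postgres container, and assorted import and formatting cleanups.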
2 changes: 1 addition & 1 deletion conduit/data/config.go
@@ -7,7 +7,7 @@ import (
 	"time"
 
 	log "github.com/sirupsen/logrus"
-	"gopkg.in/yaml.v3"
+	yaml "gopkg.in/yaml.v3"
 )
 
 // DefaultConfigBaseName is the default conduit configuration filename without the extension.
2 changes: 1 addition & 1 deletion conduit/pipeline/pipeline.go
@@ -14,7 +14,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 	log "github.com/sirupsen/logrus"
-	"gopkg.in/yaml.v3"
+	yaml "gopkg.in/yaml.v3"
 
 	sdk "github.com/algorand/go-algorand-sdk/v2/types"
 
23 changes: 12 additions & 11 deletions conduit/pipeline/pipeline_test.go
@@ -10,6 +10,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"regexp"
 	"strings"
 	"sync"
 	"testing"
@@ -914,32 +915,32 @@ func TestMetrics(t *testing.T) {
 		if strings.HasSuffix(*stat.Name, metrics.BlockImportTimeName) {
 			found++
 			// 1 hour in seconds
-			assert.Contains(t, stat.String(), "sample_count:1 sample_sum:3600")
+			assert.Regexp(t, regexp.MustCompile("sample_count:1 +sample_sum:3600"), stat.String())
 		}
 		if strings.HasSuffix(*stat.Name, metrics.ImportedRoundGaugeName) {
 			found++
 			assert.Contains(t, stat.String(), "value:1234")
 		}
 		if strings.HasSuffix(*stat.Name, metrics.ImportedTxnsPerBlockName) {
 			found++
-			assert.Contains(t, stat.String(), "sample_count:1 sample_sum:14")
+			assert.Regexp(t, regexp.MustCompile("sample_count:1 +sample_sum:14"), stat.String())
 		}
 		if strings.HasSuffix(*stat.Name, metrics.ImportedTxnsName) {
 			found++
 			str := stat.String()
 			// the 6 single txns
-			assert.Contains(t, str, `label:<name:"txn_type" value:"acfg" > gauge:<value:1 >`)
-			assert.Contains(t, str, `label:<name:"txn_type" value:"afrz" > gauge:<value:1 >`)
-			assert.Contains(t, str, `label:<name:"txn_type" value:"axfer" > gauge:<value:1 >`)
-			assert.Contains(t, str, `label:<name:"txn_type" value:"keyreg" > gauge:<value:1 >`)
-			assert.Contains(t, str, `label:<name:"txn_type" value:"pay" > gauge:<value:1 >`)
-			assert.Contains(t, str, `label:<name:"txn_type" value:"stpf" > gauge:<value:1 >`)
+			assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"acfg". +gauge:.value:1.`), str)
+			assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"afrz". +gauge:.value:1.`), str)
+			assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"axfer". +gauge:.value:1.`), str)
+			assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"keyreg". +gauge:.value:1.`), str)
+			assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"pay". +gauge:.value:1.`), str)
+			assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"stpf". +gauge:.value:1.`), str)
 
 			// 2 app call txns
-			assert.Contains(t, str, `label:<name:"txn_type" value:"appl" > gauge:<value:2 >`)
+			assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"appl". +gauge:.value:2.`), str)
 
 			// 1 app had 6 inner txns
-			assert.Contains(t, str, `label:<name:"txn_type" value:"inner" > gauge:<value:6 >`)
+			assert.Regexp(t, regexp.MustCompile(`label:.name:"txn_type" +value:"inner". +gauge:.value:6.`), str)
 		}
 	}
 	assert.Equal(t, 4, found)
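A note on the test changes above: matching the exact metric string with assert.Contains is brittle, presumably because the protobuf text rendering of a metric is not stable across library versions (delimiters vary between <...> and {...}, and inter-field spacing can vary). The relaxed patterns use `.` for the delimiter and ` +` for the spacing. A minimal standalone sketch of the idea — the example strings are illustrative, not captured from the test:

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// "." stands in for the message delimiter ("<", ">", "{", "}") and
	// " +" for one-or-more spaces, so either rendering satisfies the check.
	re := regexp.MustCompile(`label:.name:"txn_type" +value:"pay". +gauge:.value:1.`)

	older := `label:<name:"txn_type" value:"pay">  gauge:<value:1>`
	newer := `label:{name:"txn_type"  value:"pay"}  gauge:{value:1}`

	fmt.Println(re.MatchString(older)) // true
	fmt.Println(re.MatchString(newer)) // true
}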
2 changes: 1 addition & 1 deletion conduit/plugins/config.go
@@ -1,6 +1,6 @@
 package plugins
 
-import "gopkg.in/yaml.v3"
+import yaml "gopkg.in/yaml.v3"
 
 // PluginConfig is a generic string which can be deserialized by each individual Plugin
 type PluginConfig struct {
21 changes: 12 additions & 9 deletions conduit/plugins/exporters/postgresql/postgresql_exporter.go
@@ -54,11 +54,12 @@ func (exp *postgresqlExporter) Metadata() plugins.Metadata {
 }
 
 // createIndexerDB common code for creating the IndexerDb instance.
-func createIndexerDB(logger *logrus.Logger, readonly bool, cfg plugins.PluginConfig) (idb.IndexerDb, chan struct{}, error) {
+func createIndexerDB(logger *logrus.Logger, readonly bool, cfg plugins.PluginConfig) (idb.IndexerDb, chan struct{}, ExporterConfig, error) {
 	var eCfg ExporterConfig
 	if err := cfg.UnmarshalConfig(&eCfg); err != nil {
-		return nil, nil, fmt.Errorf("connect failure in unmarshalConfig: %v", err)
+		return nil, nil, eCfg, fmt.Errorf("connect failure in unmarshalConfig: %v", err)
 	}
+	logger.Debugf("createIndexerDB: eCfg.Delete=%+v", eCfg.Delete)
 
 	// Inject a dummy db for unit testing
 	dbName := "postgres"
@@ -73,14 +74,14 @@ func createIndexerDB(logger *logrus.Logger, readonly bool, cfg plugins.PluginCon
 	// connecting to a local instance that's running.
 	// this behavior can be reproduced in TestConnectDbFailure.
 	if !eCfg.Test && eCfg.ConnectionString == "" {
-		return nil, nil, fmt.Errorf("connection string is empty for %s", dbName)
+		return nil, nil, eCfg, fmt.Errorf("connection string is empty for %s", dbName)
 	}
 	db, ready, err := idb.IndexerDbByName(dbName, eCfg.ConnectionString, opts, logger)
 	if err != nil {
-		return nil, nil, fmt.Errorf("connect failure constructing db, %s: %v", dbName, err)
+		return nil, nil, eCfg, fmt.Errorf("connect failure constructing db, %s: %v", dbName, err)
 	}
 
-	return db, ready, nil
+	return db, ready, eCfg, nil
 }
 
 // RoundRequest connects to the database, queries the round, and closes the
@@ -90,7 +91,7 @@ func (exp *postgresqlExporter) RoundRequest(cfg plugins.PluginConfig) (uint64, e
 	nullLogger := logrus.New()
 	nullLogger.Out = io.Discard // no logging
 
-	db, _, err := createIndexerDB(nullLogger, true, cfg)
+	db, _, _, err := createIndexerDB(nullLogger, true, cfg)
 	if err != nil {
 		// Assume the error is related to an uninitialized DB.
 		// If it is something more serious, the failure will be detected during Init.
@@ -112,10 +113,11 @@ func (exp *postgresqlExporter) Init(ctx context.Context, initProvider data.InitP
 	exp.ctx, exp.cf = context.WithCancel(ctx)
 	exp.logger = logger
 
-	db, ready, err := createIndexerDB(exp.logger, false, cfg)
+	db, ready, exporterConfig, err := createIndexerDB(exp.logger, false, cfg)
 	if err != nil {
 		return fmt.Errorf("db create error: %v", err)
 	}
+	exp.cfg = exporterConfig
 	<-ready
 
 	exp.db = db
@@ -132,8 +134,9 @@ func (exp *postgresqlExporter) Init(ctx context.Context, initProvider data.InitP
 	}
 	exp.round = uint64(initProvider.NextDBRound())
 
-	// if data pruning is enabled
-	if !exp.cfg.Test && exp.cfg.Delete.Rounds > 0 {
+	dataPruningEnabled := !exp.cfg.Test && exp.cfg.Delete.Rounds > 0
+	exp.logger.Debugf("postgresql exporter Init(): data pruning enabled: %t; exp.cfg.Delete: %+v", dataPruningEnabled, exp.cfg.Delete)
+	if dataPruningEnabled {
 		exp.dm = util.MakeDataManager(exp.ctx, &exp.cfg.Delete, exp.db, logger)
 		exp.wg.Add(1)
 		go exp.dm.DeleteLoop(&exp.wg, &exp.round)
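The createIndexerDB change above is the actual bugfix: previously the parsed ExporterConfig lived and died inside createIndexerDB, so Init consulted a zero-valued exp.cfg and the pruning condition exp.cfg.Delete.Rounds > 0 could never hold. A trimmed-down sketch of the repaired flow, with hypothetical stand-in types (the real plugin's fields and YAML tags may differ):

package main

import (
	"fmt"

	yaml "gopkg.in/yaml.v3"
)

// Stand-ins for the plugin's config types (illustrative, not the real definitions).
type DeleteTask struct {
	Rounds   uint64 `yaml:"rounds"`
	Interval int64  `yaml:"interval"`
}

type ExporterConfig struct {
	ConnectionString string     `yaml:"connection-string"`
	Delete           DeleteTask `yaml:"delete-task"`
}

// parseConfig mirrors the fixed createIndexerDB shape: the parsed config is
// returned to the caller instead of being discarded.
func parseConfig(raw string) (ExporterConfig, error) {
	var eCfg ExporterConfig
	if err := yaml.Unmarshal([]byte(raw), &eCfg); err != nil {
		return eCfg, fmt.Errorf("unmarshal: %w", err)
	}
	return eCfg, nil
}

func main() {
	raw := "connection-string: host=localhost dbname=e2e_db\ndelete-task:\n  rounds: 100000\n  interval: 1\n"
	eCfg, err := parseConfig(raw)
	if err != nil {
		panic(err)
	}
	// With the config retained by the caller, Init's pruning check can now pass.
	fmt.Println("pruning enabled:", eCfg.Delete.Rounds > 0) // true
}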
2 changes: 2 additions & 0 deletions conduit/plugins/exporters/postgresql/util/prune.go
@@ -59,6 +59,8 @@ func MakeDataManager(ctx context.Context, cfg *PruneConfigurations, db idb.Index
 func (p *postgresql) DeleteLoop(wg *sync.WaitGroup, nextRound *uint64) {
 
 	defer wg.Done()
+
+	p.logger.Debugf("DeleteLoop(): starting delete loop")
 	// If the interval is disabled
 	if p.config.Interval == disabled {
 		// A helpful warning to say that despite a number of rounds being above 0
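The new Debugf line gives positive confirmation in the logs that the loop actually launched; the absence of any such signal is what made the original failure, a loop that silently never started, hard to spot.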
2 changes: 1 addition & 1 deletion conduit/telemetry/telemetry.go
@@ -10,7 +10,7 @@ import (
 	"time"
 
 	"github.com/google/uuid"
-	"github.com/opensearch-project/opensearch-go/v2"
+	opensearch "github.com/opensearch-project/opensearch-go/v2"
 	"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
 
 	"github.com/algorand/conduit/version"
2 changes: 1 addition & 1 deletion conduit/telemetry/telemetryCommon.go
@@ -3,7 +3,7 @@ package telemetry
 import (
 	"time"
 
-	"github.com/opensearch-project/opensearch-go/v2"
+	opensearch "github.com/opensearch-project/opensearch-go/v2"
 )
 
 // Config represents the configuration of Telemetry logging
1 change: 1 addition & 0 deletions e2e_tests/pyproject.toml
@@ -10,6 +10,7 @@ requires-python = ">=3.8"
 dependencies = [
     "boto3==1.24.71",
     "msgpack==1.0.4",
+    "psycopg2==2.9.6",
     "py-algorand-sdk==1.17.0",
     "pytest==6.2.5",
     "PyYAML==6.0",
46 changes: 46 additions & 0 deletions e2e_tests/src/e2e_common/indexer_db.py
@@ -0,0 +1,46 @@
+import psycopg2
+
+
+class IndexerDB:
+    def __init__(self, host, port, user, password, dbname):
+        self.host = host
+        self.port = port
+        self.user = user
+        self.password = password
+        self.dbname = dbname
+
+    @classmethod
+    def from_connection_string(cls, connection_string):
+        init_args = {
+            keyval.split("=")[0]: keyval.split("=")[1]
+            for keyval in connection_string.split()
+        }
+        return cls(
+            host=init_args["host"],
+            port=init_args["port"],
+            user=init_args["user"],
+            password=init_args["password"],
+            dbname=init_args["dbname"],
+        )
+
+    def select_one(self, query) -> tuple:
+        with psycopg2.connect(
+            host=self.host,
+            port=self.port,
+            user=self.user,
+            password=self.password,
+            dbname=self.dbname,
+        ) as connection:
+            with connection.cursor() as cursor:
+                cursor.execute(query)
+                return cursor.fetchone()  # type: ignore
+
+    def get_txn_min_max_round(self):
+        min_round, max_round = self.select_one("SELECT min(round), max(round) FROM txn")
+        return min_round, max_round
+
+    def get_table_row_count(self, table_name):
+        return self.select_one(f"SELECT count(*) FROM {table_name}")[0]
+
+    def get_block_header_final_round(self):
+        return self.select_one("SELECT max(round) FROM block_header")[0]
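A usage sketch for the new helper (connection values are illustrative): an e2e check can compare the surviving round range in txn against the chain tip recorded in block_header to confirm that pruning took effect.

db = IndexerDB.from_connection_string(
    "host=localhost port=45432 user=algorand password=algorand dbname=e2e_db"
)
min_round, max_round = db.get_txn_min_max_round()
final_round = db.get_block_header_final_round()
row_count = db.get_table_row_count("txn")
# After the delete task has run, min_round should have advanced past 0
# while max_round stays near final_round.
print(min_round, max_round, final_round, row_count)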
31 changes: 20 additions & 11 deletions e2e_tests/src/e2e_conduit/e2econduit.py
@@ -1,19 +1,20 @@
 #!/usr/bin/env python3
-#
 
 import argparse
 import logging
 import os
 import sys
 
-from e2e_common.util import find_binary
-import e2e_conduit.fixtures.importers as importers
-import e2e_conduit.fixtures.processors as processors
-import e2e_conduit.fixtures.exporters as exporters
 from e2e_conduit.runner import ConduitE2ETestRunner
 from e2e_conduit.scenarios import scenarios
-from e2e_conduit.scenarios.follower_indexer_scenario import follower_indexer_scenario
-from e2e_conduit.scenarios.filter_scenario import app_filter_indexer_scenario, pay_filter_indexer_scenario
+from e2e_conduit.scenarios.follower_indexer_scenario import (
+    FollowerIndexerScenario,
+    FollowerIndexerScenarioWithDeleteTask,
+)
+from e2e_conduit.scenarios.filter_scenario import (
+    app_filter_indexer_scenario,
+    pay_filter_indexer_scenario,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -43,21 +44,29 @@ def main():
     else:
         logging.basicConfig(level=logging.INFO)
     sourcenet = args.source_net
-    source_is_tar = False
     if not sourcenet:
         e2edata = os.getenv("E2EDATA")
         sourcenet = e2edata and os.path.join(e2edata, "net")
     importer_source = sourcenet if sourcenet else args.s3_source_net
     if importer_source:
-        scenarios.append(follower_indexer_scenario(importer_source))
-        scenarios.append(app_filter_indexer_scenario(importer_source))
-        scenarios.append(pay_filter_indexer_scenario(importer_source))
+        scenarios.extend(
+            [
+                FollowerIndexerScenario(importer_source),
+                FollowerIndexerScenarioWithDeleteTask(importer_source),
+                app_filter_indexer_scenario(importer_source),
+                pay_filter_indexer_scenario(importer_source),
+            ]
+        )
 
     runner = ConduitE2ETestRunner(args.conduit_bin, keep_temps=args.keep_temps)
 
     success = True
     for scenario in scenarios:
         runner.setup_scenario(scenario)
+        if scenario.exporter.name == "postgresql":
+            print(
+                f"postgresql exporter with connect info: {scenario.exporter.config_input['connection-string']}"
+            )
         if runner.run_scenario(scenario) != 0:
             success = False
     return 0 if success else 1
14 changes: 12 additions & 2 deletions e2e_tests/src/e2e_conduit/fixtures/exporters/postgresql.py
@@ -1,3 +1,4 @@
+from dataclasses import dataclass, asdict
 import logging
 import random
 import string
@@ -9,16 +10,24 @@
 
 logger = logging.getLogger(__name__)
 
+CONFIG_DELETE_TASK = "delete-task"
+
+
+@dataclass
+class DeleteTask:
+    interval: int
+    rounds: int
+
+
 class PostgresqlExporter(PluginFixture):
-    def __init__(self, max_conn=0):
+    def __init__(self, max_conn=0, delete_interval=0, delete_rounds=0):
         self.user = "algorand"
         self.password = "algorand"
         self.db_name = "e2e_db"
         # Should we have a random port here so that we can run multiple of these in parallel?
         self.port = "45432"
         self.container_name = ""
         self.max_conn = max_conn
+        self.delete_task = DeleteTask(delete_interval, delete_rounds)
         super().__init__()
@property
@@ -27,7 +36,7 @@ def name(self):
 
     def setup(self, _):
         self.container_name = "".join(
-            random.choice(string.ascii_lowercase) for i in range(10)
+            random.choice(string.ascii_lowercase) for _ in range(10)
        )
         self.port = f"{random.randint(1150, 65535)}"
         try:
@@ -60,4 +69,5 @@ def resolve_config_input(self):
         self.config_input = {
             "connection-string": f"host=localhost port={self.port} user={self.user} password={self.password} dbname={self.db_name} sslmode=disable",
             "max-conn": self.max_conn,
+            CONFIG_DELETE_TASK: asdict(self.delete_task),
         }
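A sketch of how the new constructor arguments flow through to the plugin config (values illustrative; this assumes resolve_config_input is called with the default port chosen before setup):

exporter = PostgresqlExporter(max_conn=4, delete_interval=1, delete_rounds=100000)
exporter.resolve_config_input()
print(exporter.config_input[CONFIG_DELETE_TASK])
# -> {'interval': 1, 'rounds': 100000}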
18 changes: 7 additions & 11 deletions e2e_tests/src/e2e_conduit/fixtures/importers/follower_algod.py
@@ -1,15 +1,11 @@
import atexit
import glob
import json
import logging
import os
import shutil
import tempfile

import boto3
from botocore.config import Config
from botocore import UNSIGNED

from e2e_common.util import hassuffix, xrun, firstFromS3Prefix, countblocks, atexitrun
from e2e_conduit.fixtures.importers.importer_plugin import ImporterPlugin

@@ -67,8 +63,8 @@ def setup(self, accumulated_config):
         if "/" in tarname:
             cmhash_tarnme = tarname.split("/")
             cmhash = cmhash_tarnme[0]
-            tarname =cmhash_tarnme[1]
-            prefix+="/"+cmhash
+            tarname = cmhash_tarnme[1]
+            prefix += "/" + cmhash
             tarpath = os.path.join(conduit_dir, tarname)
         else:
             tarpath = os.path.join(conduit_dir, tarname)
@@ -90,14 +86,14 @@ def setup(self, accumulated_config):
         self.last = countblocks(blockfiles[0])
         # Reset the secondary node, and enable follow mode.
         # This is what conduit will connect to for data access.
-        for root, dirs, files in os.walk(os.path.join(tempnet, 'Node', 'tbd-v1')):
-            for f in files:
-                if ".sqlite" in f:
-                    os.remove(os.path.join(root, f))
+        for root, _, files in os.walk(os.path.join(tempnet, "Node", "tbd-v1")):
+            for f in files:
+                if ".sqlite" in f:
+                    os.remove(os.path.join(root, f))
         cf = {}
         with open(os.path.join(tempnet, "Node", "config.json"), "r") as config_file:
             cf = json.load(config_file)
-        cf['EnableFollowMode'] = True
+        cf["EnableFollowMode"] = True
         with open(os.path.join(tempnet, "Node", "config.json"), "w") as config_file:
             config_file.write(json.dumps(cf))
         try:
(diff truncated: remaining changed files not shown)
