Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add core logic to support access token in postgres scaler #5589

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
6354167
add core logic to support access token in postgres scaler
Ferdinanddb Mar 11, 2024
b2855ec
minor fix
Ferdinanddb Mar 11, 2024
f3760a5
run make build to fmt code
Ferdinanddb Mar 11, 2024
bee72e1
make regexp password pattern global
Ferdinanddb Mar 13, 2024
de5b4ae
adapt to use placeholder for regexp
Ferdinanddb Mar 13, 2024
1a4e372
add missing authPodIdentity variable
Ferdinanddb Mar 13, 2024
fc778ca
Merge branch 'main' into feature/support-access-token-postgres-scaler
Ferdinanddb Mar 13, 2024
d3b0456
lint code using gci write... command
Ferdinanddb Mar 13, 2024
1885608
lint import + add 2 unite tests
Ferdinanddb Mar 14, 2024
ce1c2d1
lint with make fmt
Ferdinanddb Mar 14, 2024
c70e464
Merge branch 'main' into feature/support-access-token-postgres-scaler
Ferdinanddb May 18, 2024
1034b6c
remove podIdentityAzure references (but keep AzureWorkload ones)
Ferdinanddb May 18, 2024
5d9bcee
replace switch by if statements + fix error when comparing + close co…
Ferdinanddb May 19, 2024
acd9e28
generate a new token if the current one has expired + add log info st…
Ferdinanddb May 21, 2024
ff31521
Merge branch 'main' into feature/support-access-token-postgres-scaler
Ferdinanddb May 21, 2024
239b8e2
minor change + add entry in CHANGELOG.md
Ferdinanddb May 23, 2024
6abe03b
Merge branch 'main' into feature/support-access-token-postgres-scaler
Ferdinanddb May 27, 2024
f407153
Merge branch 'main' into feature/support-access-token-postgres-scaler
Ferdinanddb May 31, 2024
6fd7dfa
Add first draft of an e2e test
Ferdinanddb Jun 2, 2024
9ed7d39
Add comment and change package name
Ferdinanddb Jun 2, 2024
432c280
fix golanci lint
Ferdinanddb Jun 2, 2024
3783d1c
use identity 1 in e2e tests
Ferdinanddb Jun 4, 2024
b358784
Merge branch 'main' into feature/support-access-token-postgres-scaler
Ferdinanddb Jun 4, 2024
01471e6
Merge branch 'main' into feature/support-access-token-postgres-scaler
Ferdinanddb Jun 4, 2024
cc5636c
Merge branch 'main' into feature/support-access-token-postgres-scaler
Ferdinanddb Jun 8, 2024
718d7de
fix e2e tests after testing it + change .env file
Ferdinanddb Jun 8, 2024
d0a0455
go fmt
Ferdinanddb Jun 8, 2024
9bc3a6e
remove entries in .env file
Ferdinanddb Jun 10, 2024
85543de
Merge branch 'main' into feature/support-access-token-postgres-scaler
Ferdinanddb Jun 22, 2024
0fb1288
Add Postgres env variables
Ferdinanddb Jun 22, 2024
6fa97f9
Merge branch 'main' into feature/support-access-token-postgres-scaler
Ferdinanddb Jun 24, 2024
548e6d3
remove useless variables
Ferdinanddb Jun 24, 2024
84e9252
Update e2e test to reset all the task using a query
JorTurFer Jul 24, 2024
f2ef6af
Merge branch 'main' into feature/support-access-token-postgres-scaler
JorTurFer Jul 24, 2024
ea13133
missing changes after rebase
JorTurFer Jul 24, 2024
1133382
fix typo in the query
JorTurFer Jul 24, 2024
130bf5a
remove the load
JorTurFer Jul 24, 2024
21b8e3c
fix style
JorTurFer Jul 24, 2024
1a17069
Merge branch 'main' into feature/support-access-token-postgres-scaler
Ferdinanddb Jul 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Here is an overview of all new **experimental** features:
- **IBM MQ Scaler**: Add TLS support for IBM MQ scaler ([#5974](https://github.com/kedacore/keda/issues/5974))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **MYSQL Scaler**: Add support to fetch username from env ([#5883](https://github.com/kedacore/keda/issues/5883))
- **Postgres Scaler**: Add support for access token authentication to an Azure Postgres Flexible Server ([#5823](https://github.com/kedacore/keda/issues/5823))

### Fixes

Expand Down
13 changes: 6 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1930,16 +1930,15 @@ github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brv
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-hclog v1.6.2 h1:NOtoftovWkDheyUM/8JW3QMiXyxJK3uHRK7wV04nD2I=
github.com/hashicorp/go-hclog v1.6.2/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k=
github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT9yvm0e+Nd5M=
github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU=
github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5Oi2viEzc=
github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8=
Expand Down Expand Up @@ -2023,6 +2022,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jstemmer/go-junit-report/v2 v2.1.0 h1:X3+hPYlSczH9IMIpSC9CQSZA0L+BipYafciZUWHEmsc=
github.com/jstemmer/go-junit-report/v2 v2.1.0/go.mod h1:mgHVr7VUo5Tn8OLVr1cKnLuEy0M92wdRntM99h7RkgQ=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
Expand Down Expand Up @@ -2074,7 +2075,6 @@ github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVc
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
Expand Down Expand Up @@ -2284,7 +2284,6 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
Expand Down
210 changes: 151 additions & 59 deletions pkg/scalers/postgresql_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,41 @@ import (
"context"
"database/sql"
"fmt"
"regexp"
"strconv"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/go-logr/logr"
// PostreSQL drive required for this scaler
_ "github.com/jackc/pgx/v5/stdlib"
_ "github.com/jackc/pgx/v5/stdlib" // PostreSQL drive required for this scaler
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scalers/azure"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

const (
// Azure AD resource ID for Azure Database for PostgreSQL is https://ossrdbms-aad.database.windows.net
// https://learn.microsoft.com/en-us/azure/postgresql/single-server/how-to-connect-with-managed-identity
azureDatabasePostgresResource = "https://ossrdbms-aad.database.windows.net/.default"
)

var (
passwordConnPattern = regexp.MustCompile(`%PASSWORD%`)
)

type postgreSQLScaler struct {
metricType v2.MetricTargetType
metadata *postgreSQLMetadata
connection *sql.DB
logger logr.Logger
metricType v2.MetricTargetType
metadata *postgreSQLMetadata
connection *sql.DB
podIdentity kedav1alpha1.AuthPodIdentity
logger logr.Logger
}

type postgreSQLMetadata struct {
Expand All @@ -30,120 +47,167 @@ type postgreSQLMetadata struct {
connection string
query string
triggerIndex int
azureAuthContext azureAuthContext
}

type azureAuthContext struct {
cred *azidentity.ChainedTokenCredential
token *azcore.AccessToken
}

// NewPostgreSQLScaler creates a new postgreSQL scaler
func NewPostgreSQLScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
func NewPostgreSQLScaler(ctx context.Context, config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

logger := InitializeLogger(config, "postgresql_scaler")

meta, err := parsePostgreSQLMetadata(config)
meta, podIdentity, err := parsePostgreSQLMetadata(logger, config)
if err != nil {
return nil, fmt.Errorf("error parsing postgreSQL metadata: %w", err)
}

conn, err := getConnection(meta, logger)
conn, err := getConnection(ctx, meta, podIdentity, logger)
if err != nil {
return nil, fmt.Errorf("error establishing postgreSQL connection: %w", err)
}
return &postgreSQLScaler{
metricType: metricType,
metadata: meta,
connection: conn,
logger: logger,
metricType: metricType,
metadata: meta,
connection: conn,
podIdentity: podIdentity,
logger: logger,
}, nil
}

func parsePostgreSQLMetadata(config *scalersconfig.ScalerConfig) (*postgreSQLMetadata, error) {
func parsePostgreSQLMetadata(logger logr.Logger, config *scalersconfig.ScalerConfig) (*postgreSQLMetadata, kedav1alpha1.AuthPodIdentity, error) {
meta := postgreSQLMetadata{}

authPodIdentity := kedav1alpha1.AuthPodIdentity{}

if val, ok := config.TriggerMetadata["query"]; ok {
meta.query = val
} else {
return nil, fmt.Errorf("no query given")
return nil, authPodIdentity, fmt.Errorf("no query given")
}

if val, ok := config.TriggerMetadata["targetQueryValue"]; ok {
targetQueryValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("queryValue parsing error %w", err)
return nil, authPodIdentity, fmt.Errorf("queryValue parsing error %w", err)
}
meta.targetQueryValue = targetQueryValue
} else {
if config.AsMetricSource {
meta.targetQueryValue = 0
} else {
return nil, fmt.Errorf("no targetQueryValue given")
return nil, authPodIdentity, fmt.Errorf("no targetQueryValue given")
}
}

meta.activationTargetQueryValue = 0
if val, ok := config.TriggerMetadata["activationTargetQueryValue"]; ok {
activationTargetQueryValue, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("activationTargetQueryValue parsing error %w", err)
return nil, authPodIdentity, fmt.Errorf("activationTargetQueryValue parsing error %w", err)
}
meta.activationTargetQueryValue = activationTargetQueryValue
}

switch {
case config.AuthParams["connection"] != "":
meta.connection = config.AuthParams["connection"]
case config.TriggerMetadata["connectionFromEnv"] != "":
meta.connection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]]
default:
host, err := GetFromAuthOrMeta(config, "host")
if err != nil {
return nil, err
}
switch config.PodIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
switch {
case config.AuthParams["connection"] != "":
meta.connection = config.AuthParams["connection"]
case config.TriggerMetadata["connectionFromEnv"] != "":
meta.connection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]]
default:
params, err := buildConnArray(config)
if err != nil {
return nil, authPodIdentity, fmt.Errorf("failed to parse fields related to the connection")
}

port, err := GetFromAuthOrMeta(config, "port")
if err != nil {
return nil, err
var password string
if config.AuthParams["password"] != "" {
password = config.AuthParams["password"]
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}
params = append(params, "password="+escapePostgreConnectionParameter(password))
meta.connection = strings.Join(params, " ")
}

userName, err := GetFromAuthOrMeta(config, "userName")
case kedav1alpha1.PodIdentityProviderAzureWorkload:
params, err := buildConnArray(config)
if err != nil {
return nil, err
return nil, authPodIdentity, fmt.Errorf("failed to parse fields related to the connection")
}

dbName, err := GetFromAuthOrMeta(config, "dbName")
cred, err := azure.NewChainedCredential(logger, config.PodIdentity)
if err != nil {
return nil, err
return nil, authPodIdentity, err
}
meta.azureAuthContext.cred = cred
authPodIdentity = kedav1alpha1.AuthPodIdentity{Provider: config.PodIdentity.Provider}

sslmode, err := GetFromAuthOrMeta(config, "sslmode")
if err != nil {
return nil, err
}

var password string
if config.AuthParams["password"] != "" {
password = config.AuthParams["password"]
} else if config.TriggerMetadata["passwordFromEnv"] != "" {
password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
}

// Build connection str
var params []string
params = append(params, "host="+escapePostgreConnectionParameter(host))
params = append(params, "port="+escapePostgreConnectionParameter(port))
params = append(params, "user="+escapePostgreConnectionParameter(userName))
params = append(params, "dbname="+escapePostgreConnectionParameter(dbName))
params = append(params, "sslmode="+escapePostgreConnectionParameter(sslmode))
params = append(params, "password="+escapePostgreConnectionParameter(password))
params = append(params, "%PASSWORD%")
meta.connection = strings.Join(params, " ")
}
meta.triggerIndex = config.TriggerIndex
return &meta, nil

return &meta, authPodIdentity, nil
}

func buildConnArray(config *scalersconfig.ScalerConfig) ([]string, error) {
var params []string

host, err := GetFromAuthOrMeta(config, "host")
if err != nil {
return nil, err
}

port, err := GetFromAuthOrMeta(config, "port")
if err != nil {
return nil, err
}

userName, err := GetFromAuthOrMeta(config, "userName")
if err != nil {
return nil, err
}

dbName, err := GetFromAuthOrMeta(config, "dbName")
if err != nil {
return nil, err
}

sslmode, err := GetFromAuthOrMeta(config, "sslmode")
if err != nil {
return nil, err
}
params = append(params, "host="+escapePostgreConnectionParameter(host))
params = append(params, "port="+escapePostgreConnectionParameter(port))
params = append(params, "user="+escapePostgreConnectionParameter(userName))
params = append(params, "dbname="+escapePostgreConnectionParameter(dbName))
params = append(params, "sslmode="+escapePostgreConnectionParameter(sslmode))

return params, nil
}

func getConnection(meta *postgreSQLMetadata, logger logr.Logger) (*sql.DB, error) {
db, err := sql.Open("pgx", meta.connection)
func getConnection(ctx context.Context, meta *postgreSQLMetadata, podIdentity kedav1alpha1.AuthPodIdentity, logger logr.Logger) (*sql.DB, error) {
connectionString := meta.connection

if podIdentity.Provider == kedav1alpha1.PodIdentityProviderAzureWorkload {
accessToken, err := getAzureAccessToken(ctx, meta, azureDatabasePostgresResource)
if err != nil {
return nil, err
}
newPasswordField := "password=" + escapePostgreConnectionParameter(accessToken)
connectionString = passwordConnPattern.ReplaceAllString(meta.connection, newPasswordField)
}

db, err := sql.Open("pgx", connectionString)
if err != nil {
logger.Error(err, fmt.Sprintf("Found error opening postgreSQL: %s", err))
return nil, err
Expand All @@ -168,6 +232,19 @@ func (s *postgreSQLScaler) Close(context.Context) error {

func (s *postgreSQLScaler) getActiveNumber(ctx context.Context) (float64, error) {
var id float64

if s.podIdentity.Provider == kedav1alpha1.PodIdentityProviderAzureWorkload {
if s.metadata.azureAuthContext.token.ExpiresOn.Before(time.Now()) {
s.logger.Info("The Azure Access Token expired, retrieving a new Azure Access Token and instantiating a new Postgres connection object.")
s.connection.Close()
newConnection, err := getConnection(ctx, s.metadata, s.podIdentity, s.logger)
if err != nil {
return 0, fmt.Errorf("error establishing postgreSQL connection: %w", err)
}
s.connection = newConnection
}
}

err := s.connection.QueryRowContext(ctx, s.metadata.query).Scan(&id)
if err != nil {
s.logger.Error(err, fmt.Sprintf("could not query postgreSQL: %s", err))
Expand Down Expand Up @@ -210,3 +287,18 @@ func escapePostgreConnectionParameter(str string) string {
str = strings.ReplaceAll(str, "'", "\\'")
return fmt.Sprintf("'%s'", str)
}

func getAzureAccessToken(ctx context.Context, metadata *postgreSQLMetadata, scope string) (string, error) {
accessToken, err := metadata.azureAuthContext.cred.GetToken(ctx, policy.TokenRequestOptions{
Scopes: []string{
scope,
},
})
if err != nil {
return "", err
}

metadata.azureAuthContext.token = &accessToken

return metadata.azureAuthContext.token.Token, nil
}
Loading
Loading