Skip to content

Commit

Permalink
Remove system kafka config from db and api (#178)
Browse files Browse the repository at this point in the history
* update kafka config to not be saved in db

* update kafka config to not be saved in db - sort imports

* use plural for routerDefaults var

* use plural for routerDefaults var

Co-authored-by: leonlnj <[email protected]>
  • Loading branch information
leonlnj and leonlnj authored Mar 11, 2022
1 parent 332f41b commit e6a6dfa
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 73 deletions.
2 changes: 0 additions & 2 deletions api/turing/api/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ func (r CreateOrUpdateRouterRequest) BuildRouterVersion(
Brokers: r.Config.LogConfig.KafkaConfig.Brokers,
Topic: r.Config.LogConfig.KafkaConfig.Topic,
SerializationFormat: r.Config.LogConfig.KafkaConfig.SerializationFormat,
MaxMessageBytes: defaults.KafkaConfig.MaxMessageBytes,
CompressionType: defaults.KafkaConfig.CompressionType,
}
}
if rv.ExperimentEngine.Type != models.ExperimentEngineTypeNop {
Expand Down
2 changes: 0 additions & 2 deletions api/turing/api/request/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,6 @@ func TestRequestBuildRouterVersionLoggerConfiguration(t *testing.T) {
Brokers: "10:11",
Topic: "2222",
SerializationFormat: "json",
MaxMessageBytes: 1110,
CompressionType: "gzip",
},
BigQueryConfig: nil,
},
Expand Down
17 changes: 8 additions & 9 deletions api/turing/cluster/servicebuilder/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
fiberconfig "github.com/gojek/fiber/config"
mlp "github.com/gojek/mlp/api/client"
"github.com/gojek/turing/api/turing/cluster"
"github.com/gojek/turing/api/turing/config"
"github.com/gojek/turing/api/turing/models"
"github.com/gojek/turing/api/turing/utils"
"github.com/gojek/turing/engines/router/missionctl/fiberapi"
Expand Down Expand Up @@ -85,8 +86,7 @@ func (sb *clusterSvcBuilder) NewRouterService(
envType string,
secretName string,
experimentConfig json.RawMessage,
fluentdTag string,
jaegerCollectorEndpoint string,
routerDefaults *config.RouterDefaults,
sentryEnabled bool,
sentryDSN string,
knativeTargetConcurrency int,
Expand All @@ -106,7 +106,7 @@ func (sb *clusterSvcBuilder) NewRouterService(
volumes, volumeMounts := buildRouterVolumes(routerVersion, configMap.Name, secretName)

// Build env vars
envs, err := sb.buildRouterEnvs(namespace, envType, fluentdTag, jaegerCollectorEndpoint,
envs, err := sb.buildRouterEnvs(namespace, envType, routerDefaults,
sentryEnabled, sentryDSN, secretName, routerVersion)
if err != nil {
return nil, err
Expand Down Expand Up @@ -174,8 +174,7 @@ func (sb *clusterSvcBuilder) GetRouterServiceName(routerVersion *models.RouterVe
func (sb *clusterSvcBuilder) buildRouterEnvs(
namespace string,
environmentType string,
fluentdTag string,
jaegerCollectorEndpoint string,
routerDefaults *config.RouterDefaults,
sentryEnabled bool,
sentryDSN string,
secretName string,
Expand All @@ -186,7 +185,7 @@ func (sb *clusterSvcBuilder) buildRouterEnvs(
{Name: envAppName, Value: fmt.Sprintf("%s-%d.%s", ver.Router.Name, ver.Version, namespace)},
{Name: envAppEnvironment, Value: environmentType},
{Name: envRouterTimeout, Value: ver.Timeout},
{Name: envJaegerEndpoint, Value: jaegerCollectorEndpoint},
{Name: envJaegerEndpoint, Value: routerDefaults.JaegerCollectorEndpoint},
{Name: envRouterConfigFile, Value: routerConfigMapMountPath + routerConfigFileName},
{Name: envSentryEnabled, Value: strconv.FormatBool(sentryEnabled)},
{Name: envSentryDSN, Value: sentryDSN},
Expand Down Expand Up @@ -246,16 +245,16 @@ func (sb *clusterSvcBuilder) buildRouterEnvs(
envs = append(envs, []corev1.EnvVar{
{Name: envFluentdHost, Value: buildFluentdHost(ver, namespace)},
{Name: envFluentdPort, Value: strconv.Itoa(fluentdPort)},
{Name: envFluentdTag, Value: fluentdTag},
{Name: envFluentdTag, Value: routerDefaults.FluentdConfig.Tag},
}...)
}
case models.KafkaLogger:
envs = append(envs, []corev1.EnvVar{
{Name: envKafkaBrokers, Value: logConfig.KafkaConfig.Brokers},
{Name: envKafkaTopic, Value: logConfig.KafkaConfig.Topic},
{Name: envKafkaSerializationFormat, Value: string(logConfig.KafkaConfig.SerializationFormat)},
{Name: envKafkaMaxMessageBytes, Value: strconv.Itoa(logConfig.KafkaConfig.MaxMessageBytes)},
{Name: envKafkaCompressionType, Value: logConfig.KafkaConfig.CompressionType},
{Name: envKafkaMaxMessageBytes, Value: strconv.Itoa(routerDefaults.KafkaConfig.MaxMessageBytes)},
{Name: envKafkaCompressionType, Value: routerDefaults.KafkaConfig.CompressionType},
}...)
}

Expand Down
58 changes: 37 additions & 21 deletions api/turing/cluster/servicebuilder/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

mlp "github.com/gojek/mlp/api/client"
"github.com/gojek/turing/api/turing/cluster"
"github.com/gojek/turing/api/turing/config"
tu "github.com/gojek/turing/api/turing/internal/testutils"
"github.com/gojek/turing/api/turing/models"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -501,8 +502,21 @@ func TestNewRouterService(t *testing.T) {
Stream: "test-stream",
Team: "test-team",
}
svc, err := sb.NewRouterService(&routerVersion, project, "test-env", "service-account",
data.expRawConfig, "fluentd-tag", "jaeger-endpoint", true, "sentry-dsn", 1, 20, 1.5)
svc, err := sb.NewRouterService(
&routerVersion,
project,
"test-env",
"service-account",
data.expRawConfig,
&config.RouterDefaults{
JaegerCollectorEndpoint: "jaeger-endpoint",
FluentdConfig: &config.FluentdConfig{Tag: "fluentd-tag"},
},
true,
"sentry-dsn",
1,
20,
1.5)

if data.err == "" {
require.NoError(t, err)
Expand Down Expand Up @@ -556,14 +570,13 @@ func TestNewRouterEndpoint(t *testing.T) {

func TestBuildRouterEnvsResultLogger(t *testing.T) {
type args struct {
namespace string
environmentType string
fluentdTag string
jaegerCollectorEndpoint string
sentryEnabled bool
sentryDSN string
secretName string
ver *models.RouterVersion
namespace string
environmentType string
routerDefaults *config.RouterDefaults
sentryEnabled bool
sentryDSN string
secretName string
ver *models.RouterVersion
}
namespace := "testnamespace"
tests := []struct {
Expand All @@ -574,13 +587,19 @@ func TestBuildRouterEnvsResultLogger(t *testing.T) {
{
name: "KafkaLogger",
args: args{
namespace: "testnamespace",
environmentType: "dev",
fluentdTag: "",
jaegerCollectorEndpoint: "",
sentryEnabled: false,
sentryDSN: "",
secretName: "",
namespace: "testnamespace",
environmentType: "dev",
routerDefaults: &config.RouterDefaults{
JaegerCollectorEndpoint: "",
FluentdConfig: &config.FluentdConfig{Tag: ""},
KafkaConfig: &config.KafkaConfig{
MaxMessageBytes: 123,
CompressionType: "gzip",
},
},
sentryEnabled: false,
sentryDSN: "",
secretName: "",
ver: &models.RouterVersion{
Router: &models.Router{Name: "test1"},
Version: 1,
Expand All @@ -595,8 +614,6 @@ func TestBuildRouterEnvsResultLogger(t *testing.T) {
Brokers: "1.1.1.1:1111",
Topic: "kafkatopic",
SerializationFormat: "protobuf",
MaxMessageBytes: 123,
CompressionType: "gzip",
},
},
},
Expand Down Expand Up @@ -631,8 +648,7 @@ func TestBuildRouterEnvsResultLogger(t *testing.T) {
got, _ := sb.buildRouterEnvs(
namespace,
tt.args.environmentType,
tt.args.fluentdTag,
tt.args.jaegerCollectorEndpoint,
tt.args.routerDefaults,
tt.args.sentryEnabled,
tt.args.sentryDSN,
tt.args.secretName,
Expand Down
3 changes: 1 addition & 2 deletions api/turing/cluster/servicebuilder/service_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ type ClusterServiceBuilder interface {
envType string,
secretName string,
experimentConfig json.RawMessage,
fluentdTag string,
jaegerEndpoint string,
routerDefaults *config.RouterDefaults,
sentryEnabled bool,
sentryDSN string,
knativeTargetConcurrency int,
Expand Down
4 changes: 0 additions & 4 deletions api/turing/models/log_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ type KafkaConfig struct {
Topic string `json:"topic"`
// Serialization Format used for the messages
SerializationFormat SerializationFormat `json:"serialization_format"`
// Producer Config - Max message byte to send to broker
MaxMessageBytes int `json:"-"`
// Producer Config - Compression Type of message
CompressionType string `json:"-"`
}

// LogConfig contains all log configuration necessary for a deployment
Expand Down
2 changes: 0 additions & 2 deletions api/turing/models/log_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ func TestLogConfigValue(t *testing.T) {
Brokers: "test-brokers",
Topic: "test-topic",
SerializationFormat: "test-serialization",
MaxMessageBytes: 10000,
CompressionType: "none",
},
},
expected: string(`{
Expand Down
8 changes: 4 additions & 4 deletions api/turing/service/alert_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestGitlabOpsAlertServiceSaveShouldRevertGitWhenDbFail(t *testing.T) {
mockSQL.ExpectBegin()
mockSQL.
ExpectQuery(`INSERT INTO "alerts"`).
WillReturnError(errors.New("insertion error"))
WillReturnError(errors.New("test mocked error - insertion error"))

service, err := NewGitlabOpsAlertService(
mockDb,
Expand Down Expand Up @@ -260,7 +260,7 @@ func TestGitlabOpsAlertServiceFindByIDShouldReturnErrWhenNotFound(t *testing.T)
mockSQL.
ExpectQuery(`SELECT (.+) FROM "alerts"`).
WithArgs(1).
WillReturnError(errors.New("select not found"))
WillReturnError(errors.New("test mocked error - select not found"))

service, err := NewGitlabOpsAlertService(
mockDb,
Expand Down Expand Up @@ -364,7 +364,7 @@ func TestGitlabOpsAlertServiceUpdateShouldRevertGitWhenDbFail(t *testing.T) {
mockSQL.ExpectBegin()
mockSQL.
ExpectExec(`UPDATE "alerts"`).
WillReturnError(errors.New("update error"))
WillReturnError(errors.New("test mocked error - update error"))

service, err := NewGitlabOpsAlertService(
mockDb,
Expand Down Expand Up @@ -477,7 +477,7 @@ func TestGitlabOpsAlertSeviceDeleteShouldRevertGitWhenDbFail(t *testing.T) {
mockSQL.ExpectBegin()
mockSQL.
ExpectExec(`DELETE FROM "alerts"`).
WillReturnError(errors.New("delete error"))
WillReturnError(errors.New("test mocked error - delete error"))

service, err := NewGitlabOpsAlertService(
mockDb,
Expand Down
2 changes: 1 addition & 1 deletion api/turing/service/mocks/router_deployment_service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 10 additions & 14 deletions api/turing/service/router_deployment_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ type deploymentService struct {
environmentType string

// Router configs
fluentdConfig *config.FluentdConfig
jaegerCollectorEndpoint string
sentryEnabled bool
sentryDSN string
sentryEnabled bool
sentryDSN string
routerDefaults *config.RouterDefaults

// Knative service configs
knativeServiceConfig *config.KnativeServiceDefaults
Expand Down Expand Up @@ -85,8 +84,7 @@ func NewDeploymentService(
deploymentTimeout: cfg.DeployConfig.Timeout,
deploymentDeletionTimeout: cfg.DeployConfig.DeletionTimeout,
environmentType: cfg.DeployConfig.EnvironmentType,
fluentdConfig: cfg.RouterDefaults.FluentdConfig,
jaegerCollectorEndpoint: cfg.RouterDefaults.JaegerCollectorEndpoint,
routerDefaults: cfg.RouterDefaults,
knativeServiceConfig: cfg.KnativeServiceDefaults,
ensemblerServiceImageBuilder: ensemblerServiceImageBuilder,
sentryEnabled: cfg.Sentry.Enabled,
Expand Down Expand Up @@ -155,7 +153,7 @@ func (ds *deploymentService) DeployRouterVersion(
// Deploy fluentd if enabled
if routerVersion.LogConfig.ResultLoggerType == models.BigQueryLogger {
fluentdService := ds.svcBuilder.NewFluentdService(routerVersion, project,
ds.environmentType, secretName, ds.fluentdConfig)
ds.environmentType, secretName, ds.routerDefaults.FluentdConfig)
// Create pvc
err = createPVC(ctx, controller, project.Name, fluentdService.PersistentVolumeClaim)
if err != nil {
Expand Down Expand Up @@ -191,7 +189,7 @@ func (ds *deploymentService) DeployRouterVersion(
// Construct service objects for each of the components and deploy
services, err := ds.createServices(
routerVersion, project, ds.environmentType, secretName, experimentConfig,
ds.fluentdConfig.Tag, ds.jaegerCollectorEndpoint, ds.sentryEnabled, ds.sentryDSN,
ds.routerDefaults, ds.sentryEnabled, ds.sentryDSN,
ds.knativeServiceConfig.TargetConcurrency, ds.knativeServiceConfig.QueueProxyResourcePercentage,
ds.knativeServiceConfig.UserContainerLimitRequestFactor,
)
Expand Down Expand Up @@ -245,7 +243,7 @@ func (ds *deploymentService) UndeployRouterVersion(
// Construct service objects for each of the components to be deleted
services, err := ds.createServices(
routerVersion, project, ds.environmentType, "", nil,
ds.fluentdConfig.Tag, ds.jaegerCollectorEndpoint, ds.sentryEnabled, ds.sentryDSN,
ds.routerDefaults, ds.sentryEnabled, ds.sentryDSN,
ds.knativeServiceConfig.TargetConcurrency, ds.knativeServiceConfig.QueueProxyResourcePercentage,
ds.knativeServiceConfig.UserContainerLimitRequestFactor,
)
Expand All @@ -265,7 +263,7 @@ func (ds *deploymentService) UndeployRouterVersion(
// Delete fluentd if required
if routerVersion.LogConfig.ResultLoggerType == models.BigQueryLogger {
fluentdService := ds.svcBuilder.NewFluentdService(routerVersion,
project, ds.environmentType, "", ds.fluentdConfig)
project, ds.environmentType, "", ds.routerDefaults.FluentdConfig)
err = deleteK8sService(controller, fluentdService, ds.deploymentTimeout)
if err != nil {
errs = append(errs, err.Error())
Expand Down Expand Up @@ -340,8 +338,7 @@ func (ds *deploymentService) createServices(
envType string,
secretName string,
experimentConfig json.RawMessage,
fluentdTag string,
jaegerCollectorEndpoint string,
routerDefaults *config.RouterDefaults,
sentryEnabled bool,
sentryDSN string,
knativeTargetConcurrency int,
Expand Down Expand Up @@ -391,8 +388,7 @@ func (ds *deploymentService) createServices(
envType,
secretName,
experimentConfig,
fluentdTag,
jaegerCollectorEndpoint,
routerDefaults,
sentryEnabled,
sentryDSN,
knativeTargetConcurrency,
Expand Down
23 changes: 11 additions & 12 deletions api/turing/service/router_deployment_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ func (msb *mockClusterServiceBuilder) NewRouterService(
envType string,
secretName string,
expConfig json.RawMessage,
fluentTag string,
jaegerEndpoint string,
routerDefaults *config.RouterDefaults,
sentryEnabled bool,
sentryDSN string,
targetConcurrency int,
Expand All @@ -137,8 +136,8 @@ func (msb *mockClusterServiceBuilder) NewRouterService(
Name: fmt.Sprintf("%s-router-%d", rv.Router.Name, rv.Version),
Namespace: project.Name,
Envs: []corev1.EnvVar{
{Name: "JAEGER_EP", Value: jaegerEndpoint},
{Name: "FLUENTD_TAG", Value: fluentTag},
{Name: "JAEGER_EP", Value: routerDefaults.JaegerCollectorEndpoint},
{Name: "FLUENTD_TAG", Value: routerDefaults.FluentdConfig.Tag},
{Name: "ENVIRONMENT", Value: envType},
{Name: "SENTRY_ENABLED", Value: strconv.FormatBool(sentryEnabled)},
{Name: "SENTRY_DSN", Value: sentryDSN},
Expand Down Expand Up @@ -211,10 +210,10 @@ func TestDeployEndpoint(t *testing.T) {

// Create test endpoint service with mock controller and service builder
ds := &deploymentService{
fluentdConfig: &config.FluentdConfig{
Tag: "fluentd-tag",
routerDefaults: &config.RouterDefaults{
JaegerCollectorEndpoint: "jaeger-endpoint",
FluentdConfig: &config.FluentdConfig{Tag: "fluentd-tag"},
},
jaegerCollectorEndpoint: "jaeger-endpoint",
deploymentTimeout: time.Second * 5,
deploymentDeletionTimeout: time.Second * 5,
environmentType: envType,
Expand Down Expand Up @@ -307,8 +306,8 @@ func TestDeployEndpoint(t *testing.T) {
Name: fmt.Sprintf("%s-router-%d", routerVersion.Router.Name, routerVersion.Version),
Namespace: testNamespace,
Envs: []corev1.EnvVar{
{Name: "JAEGER_EP", Value: ds.jaegerCollectorEndpoint},
{Name: "FLUENTD_TAG", Value: ds.fluentdConfig.Tag},
{Name: "JAEGER_EP", Value: ds.routerDefaults.JaegerCollectorEndpoint},
{Name: "FLUENTD_TAG", Value: ds.routerDefaults.FluentdConfig.Tag},
{Name: "ENVIRONMENT", Value: envType},
{Name: "SENTRY_ENABLED", Value: "true"},
{Name: "SENTRY_DSN", Value: "test:dsn"},
Expand Down Expand Up @@ -373,10 +372,10 @@ func TestDeleteEndpoint(t *testing.T) {

// Create test endpoint service with mock controller and service builder
ds := &deploymentService{
fluentdConfig: &config.FluentdConfig{
Tag: "fluentd-tag",
routerDefaults: &config.RouterDefaults{
JaegerCollectorEndpoint: "jaeger-endpoint",
FluentdConfig: &config.FluentdConfig{Tag: "fluentd-tag"},
},
jaegerCollectorEndpoint: "jaeger-endpoint",
deploymentTimeout: timeout,
deploymentDeletionTimeout: timeout,
knativeServiceConfig: &config.KnativeServiceDefaults{
Expand Down

0 comments on commit e6a6dfa

Please sign in to comment.