diff --git a/api/turing/api/request/request.go b/api/turing/api/request/request.go index d442a1513..93cbf1195 100644 --- a/api/turing/api/request/request.go +++ b/api/turing/api/request/request.go @@ -186,8 +186,6 @@ func (r CreateOrUpdateRouterRequest) BuildRouterVersion( Brokers: r.Config.LogConfig.KafkaConfig.Brokers, Topic: r.Config.LogConfig.KafkaConfig.Topic, SerializationFormat: r.Config.LogConfig.KafkaConfig.SerializationFormat, - MaxMessageBytes: defaults.KafkaConfig.MaxMessageBytes, - CompressionType: defaults.KafkaConfig.CompressionType, } } if rv.ExperimentEngine.Type != models.ExperimentEngineTypeNop { diff --git a/api/turing/api/request/request_test.go b/api/turing/api/request/request_test.go index 7579bf213..b46e53ac4 100644 --- a/api/turing/api/request/request_test.go +++ b/api/turing/api/request/request_test.go @@ -266,8 +266,6 @@ func TestRequestBuildRouterVersionLoggerConfiguration(t *testing.T) { Brokers: "10:11", Topic: "2222", SerializationFormat: "json", - MaxMessageBytes: 1110, - CompressionType: "gzip", }, BigQueryConfig: nil, }, diff --git a/api/turing/cluster/servicebuilder/router.go b/api/turing/cluster/servicebuilder/router.go index e47ae8e38..e3155dedc 100644 --- a/api/turing/cluster/servicebuilder/router.go +++ b/api/turing/cluster/servicebuilder/router.go @@ -12,6 +12,7 @@ import ( fiberconfig "github.com/gojek/fiber/config" mlp "github.com/gojek/mlp/api/client" "github.com/gojek/turing/api/turing/cluster" + "github.com/gojek/turing/api/turing/config" "github.com/gojek/turing/api/turing/models" "github.com/gojek/turing/api/turing/utils" "github.com/gojek/turing/engines/router/missionctl/fiberapi" @@ -85,8 +86,7 @@ func (sb *clusterSvcBuilder) NewRouterService( envType string, secretName string, experimentConfig json.RawMessage, - fluentdTag string, - jaegerCollectorEndpoint string, + routerDefaults *config.RouterDefaults, sentryEnabled bool, sentryDSN string, knativeTargetConcurrency int, @@ -106,7 +106,7 @@ func (sb *clusterSvcBuilder) NewRouterService( volumes, volumeMounts := buildRouterVolumes(routerVersion, configMap.Name, secretName) // Build env vars - envs, err := sb.buildRouterEnvs(namespace, envType, fluentdTag, jaegerCollectorEndpoint, + envs, err := sb.buildRouterEnvs(namespace, envType, routerDefaults, sentryEnabled, sentryDSN, secretName, routerVersion) if err != nil { return nil, err @@ -174,8 +174,7 @@ func (sb *clusterSvcBuilder) GetRouterServiceName(routerVersion *models.RouterVe func (sb *clusterSvcBuilder) buildRouterEnvs( namespace string, environmentType string, - fluentdTag string, - jaegerCollectorEndpoint string, + routerDefaults *config.RouterDefaults, sentryEnabled bool, sentryDSN string, secretName string, @@ -186,7 +185,7 @@ func (sb *clusterSvcBuilder) buildRouterEnvs( {Name: envAppName, Value: fmt.Sprintf("%s-%d.%s", ver.Router.Name, ver.Version, namespace)}, {Name: envAppEnvironment, Value: environmentType}, {Name: envRouterTimeout, Value: ver.Timeout}, - {Name: envJaegerEndpoint, Value: jaegerCollectorEndpoint}, + {Name: envJaegerEndpoint, Value: routerDefaults.JaegerCollectorEndpoint}, {Name: envRouterConfigFile, Value: routerConfigMapMountPath + routerConfigFileName}, {Name: envSentryEnabled, Value: strconv.FormatBool(sentryEnabled)}, {Name: envSentryDSN, Value: sentryDSN}, @@ -246,7 +245,7 @@ func (sb *clusterSvcBuilder) buildRouterEnvs( envs = append(envs, []corev1.EnvVar{ {Name: envFluentdHost, Value: buildFluentdHost(ver, namespace)}, {Name: envFluentdPort, Value: strconv.Itoa(fluentdPort)}, - {Name: envFluentdTag, Value: fluentdTag}, + {Name: envFluentdTag, Value: routerDefaults.FluentdConfig.Tag}, }...) } case models.KafkaLogger: @@ -254,8 +253,8 @@ func (sb *clusterSvcBuilder) buildRouterEnvs( {Name: envKafkaBrokers, Value: logConfig.KafkaConfig.Brokers}, {Name: envKafkaTopic, Value: logConfig.KafkaConfig.Topic}, {Name: envKafkaSerializationFormat, Value: string(logConfig.KafkaConfig.SerializationFormat)}, - {Name: envKafkaMaxMessageBytes, Value: strconv.Itoa(logConfig.KafkaConfig.MaxMessageBytes)}, - {Name: envKafkaCompressionType, Value: logConfig.KafkaConfig.CompressionType}, + {Name: envKafkaMaxMessageBytes, Value: strconv.Itoa(routerDefaults.KafkaConfig.MaxMessageBytes)}, + {Name: envKafkaCompressionType, Value: routerDefaults.KafkaConfig.CompressionType}, }...) } diff --git a/api/turing/cluster/servicebuilder/router_test.go b/api/turing/cluster/servicebuilder/router_test.go index 662d80d55..f6fee10fb 100644 --- a/api/turing/cluster/servicebuilder/router_test.go +++ b/api/turing/cluster/servicebuilder/router_test.go @@ -9,6 +9,7 @@ import ( mlp "github.com/gojek/mlp/api/client" "github.com/gojek/turing/api/turing/cluster" + "github.com/gojek/turing/api/turing/config" tu "github.com/gojek/turing/api/turing/internal/testutils" "github.com/gojek/turing/api/turing/models" "github.com/stretchr/testify/assert" @@ -501,8 +502,21 @@ func TestNewRouterService(t *testing.T) { Stream: "test-stream", Team: "test-team", } - svc, err := sb.NewRouterService(&routerVersion, project, "test-env", "service-account", - data.expRawConfig, "fluentd-tag", "jaeger-endpoint", true, "sentry-dsn", 1, 20, 1.5) + svc, err := sb.NewRouterService( + &routerVersion, + project, + "test-env", + "service-account", + data.expRawConfig, + &config.RouterDefaults{ + JaegerCollectorEndpoint: "jaeger-endpoint", + FluentdConfig: &config.FluentdConfig{Tag: "fluentd-tag"}, + }, + true, + "sentry-dsn", + 1, + 20, + 1.5) if data.err == "" { require.NoError(t, err) @@ -556,14 +570,13 @@ func TestNewRouterEndpoint(t *testing.T) { func TestBuildRouterEnvsResultLogger(t *testing.T) { type args struct { - namespace string - environmentType string - fluentdTag string - jaegerCollectorEndpoint string - sentryEnabled bool - sentryDSN string - secretName string - ver *models.RouterVersion + namespace string + environmentType string + routerDefaults *config.RouterDefaults + sentryEnabled bool + sentryDSN string + secretName string + ver *models.RouterVersion } namespace := "testnamespace" tests := []struct { @@ -574,13 +587,19 @@ func TestBuildRouterEnvsResultLogger(t *testing.T) { { name: "KafkaLogger", args: args{ - namespace: "testnamespace", - environmentType: "dev", - fluentdTag: "", - jaegerCollectorEndpoint: "", - sentryEnabled: false, - sentryDSN: "", - secretName: "", + namespace: "testnamespace", + environmentType: "dev", + routerDefaults: &config.RouterDefaults{ + JaegerCollectorEndpoint: "", + FluentdConfig: &config.FluentdConfig{Tag: ""}, + KafkaConfig: &config.KafkaConfig{ + MaxMessageBytes: 123, + CompressionType: "gzip", + }, + }, + sentryEnabled: false, + sentryDSN: "", + secretName: "", ver: &models.RouterVersion{ Router: &models.Router{Name: "test1"}, Version: 1, @@ -595,8 +614,6 @@ func TestBuildRouterEnvsResultLogger(t *testing.T) { Brokers: "1.1.1.1:1111", Topic: "kafkatopic", SerializationFormat: "protobuf", - MaxMessageBytes: 123, - CompressionType: "gzip", }, }, }, @@ -631,8 +648,7 @@ func TestBuildRouterEnvsResultLogger(t *testing.T) { got, _ := sb.buildRouterEnvs( namespace, tt.args.environmentType, - tt.args.fluentdTag, - tt.args.jaegerCollectorEndpoint, + tt.args.routerDefaults, tt.args.sentryEnabled, tt.args.sentryDSN, tt.args.secretName, diff --git a/api/turing/cluster/servicebuilder/service_builder.go b/api/turing/cluster/servicebuilder/service_builder.go index b3555289c..3e9c7077c 100644 --- a/api/turing/cluster/servicebuilder/service_builder.go +++ b/api/turing/cluster/servicebuilder/service_builder.go @@ -75,8 +75,7 @@ type ClusterServiceBuilder interface { envType string, secretName string, experimentConfig json.RawMessage, - fluentdTag string, - jaegerEndpoint string, + routerDefaults *config.RouterDefaults, sentryEnabled bool, sentryDSN string, knativeTargetConcurrency int, diff --git a/api/turing/models/log_config.go b/api/turing/models/log_config.go index 2cf5265e1..545a02f39 100644 --- a/api/turing/models/log_config.go +++ b/api/turing/models/log_config.go @@ -52,10 +52,6 @@ type KafkaConfig struct { Topic string `json:"topic"` // Serialization Format used for the messages SerializationFormat SerializationFormat `json:"serialization_format"` - // Producer Config - Max message byte to send to broker - MaxMessageBytes int `json:"-"` - // Producer Config - Compression Type of message - CompressionType string `json:"-"` } // LogConfig contains all log configuration necessary for a deployment diff --git a/api/turing/models/log_config_test.go b/api/turing/models/log_config_test.go index b26b65b64..53991b674 100644 --- a/api/turing/models/log_config_test.go +++ b/api/turing/models/log_config_test.go @@ -59,8 +59,6 @@ func TestLogConfigValue(t *testing.T) { Brokers: "test-brokers", Topic: "test-topic", SerializationFormat: "test-serialization", - MaxMessageBytes: 10000, - CompressionType: "none", }, }, expected: string(`{ diff --git a/api/turing/service/alert_service_test.go b/api/turing/service/alert_service_test.go index 897248f82..7899b6aa4 100644 --- a/api/turing/service/alert_service_test.go +++ b/api/turing/service/alert_service_test.go @@ -139,7 +139,7 @@ func TestGitlabOpsAlertServiceSaveShouldRevertGitWhenDbFail(t *testing.T) { mockSQL.ExpectBegin() mockSQL. ExpectQuery(`INSERT INTO "alerts"`). - WillReturnError(errors.New("insertion error")) + WillReturnError(errors.New("test mocked error - insertion error")) service, err := NewGitlabOpsAlertService( mockDb, @@ -260,7 +260,7 @@ func TestGitlabOpsAlertServiceFindByIDShouldReturnErrWhenNotFound(t *testing.T) mockSQL. ExpectQuery(`SELECT (.+) FROM "alerts"`). WithArgs(1). - WillReturnError(errors.New("select not found")) + WillReturnError(errors.New("test mocked error - select not found")) service, err := NewGitlabOpsAlertService( mockDb, @@ -364,7 +364,7 @@ func TestGitlabOpsAlertServiceUpdateShouldRevertGitWhenDbFail(t *testing.T) { mockSQL.ExpectBegin() mockSQL. ExpectExec(`UPDATE "alerts"`). - WillReturnError(errors.New("update error")) + WillReturnError(errors.New("test mocked error - update error")) service, err := NewGitlabOpsAlertService( mockDb, @@ -477,7 +477,7 @@ func TestGitlabOpsAlertSeviceDeleteShouldRevertGitWhenDbFail(t *testing.T) { mockSQL.ExpectBegin() mockSQL. ExpectExec(`DELETE FROM "alerts"`). - WillReturnError(errors.New("delete error")) + WillReturnError(errors.New("test mocked error - delete error")) service, err := NewGitlabOpsAlertService( mockDb, diff --git a/api/turing/service/mocks/router_deployment_service.go b/api/turing/service/mocks/router_deployment_service.go index 2bd0df645..89b105fba 100644 --- a/api/turing/service/mocks/router_deployment_service.go +++ b/api/turing/service/mocks/router_deployment_service.go @@ -47,7 +47,7 @@ func (_m *DeploymentService) DeployRouterVersion(project *client.Project, enviro } var r1 error - if rf, ok := ret.Get(1).(func(*client.Project, *merlinclient.Environment, *models.RouterVersion, string, string, string, *models.PyFuncEnsembler, json.RawMessage, *service.EventChannel) error); ok { + if rf, ok := ret.Get(1).(func(*client.Project, *merlinclient.Environment, *models.RouterVersion, string, string, string, *models.PyFuncEnsembler, json.RawMessage, *service.EventChannel) error); ok { r1 = rf(project, environment, routerVersion, routerServiceAccountKey, enricherServiceAccountKey, ensemblerServiceAccountKey, pyfuncEnsembler, experimentConfig, eventsCh) } else { r1 = ret.Error(1) diff --git a/api/turing/service/router_deployment_service.go b/api/turing/service/router_deployment_service.go index 64e33504a..15376be60 100644 --- a/api/turing/service/router_deployment_service.go +++ b/api/turing/service/router_deployment_service.go @@ -51,10 +51,9 @@ type deploymentService struct { environmentType string // Router configs - fluentdConfig *config.FluentdConfig - jaegerCollectorEndpoint string - sentryEnabled bool - sentryDSN string + sentryEnabled bool + sentryDSN string + routerDefaults *config.RouterDefaults // Knative service configs knativeServiceConfig *config.KnativeServiceDefaults @@ -85,8 +84,7 @@ func NewDeploymentService( deploymentTimeout: cfg.DeployConfig.Timeout, deploymentDeletionTimeout: cfg.DeployConfig.DeletionTimeout, environmentType: cfg.DeployConfig.EnvironmentType, - fluentdConfig: cfg.RouterDefaults.FluentdConfig, - jaegerCollectorEndpoint: cfg.RouterDefaults.JaegerCollectorEndpoint, + routerDefaults: cfg.RouterDefaults, knativeServiceConfig: cfg.KnativeServiceDefaults, ensemblerServiceImageBuilder: ensemblerServiceImageBuilder, sentryEnabled: cfg.Sentry.Enabled, @@ -155,7 +153,7 @@ func (ds *deploymentService) DeployRouterVersion( // Deploy fluentd if enabled if routerVersion.LogConfig.ResultLoggerType == models.BigQueryLogger { fluentdService := ds.svcBuilder.NewFluentdService(routerVersion, project, - ds.environmentType, secretName, ds.fluentdConfig) + ds.environmentType, secretName, ds.routerDefaults.FluentdConfig) // Create pvc err = createPVC(ctx, controller, project.Name, fluentdService.PersistentVolumeClaim) if err != nil { @@ -191,7 +189,7 @@ func (ds *deploymentService) DeployRouterVersion( // Construct service objects for each of the components and deploy services, err := ds.createServices( routerVersion, project, ds.environmentType, secretName, experimentConfig, - ds.fluentdConfig.Tag, ds.jaegerCollectorEndpoint, ds.sentryEnabled, ds.sentryDSN, + ds.routerDefaults, ds.sentryEnabled, ds.sentryDSN, ds.knativeServiceConfig.TargetConcurrency, ds.knativeServiceConfig.QueueProxyResourcePercentage, ds.knativeServiceConfig.UserContainerLimitRequestFactor, ) @@ -245,7 +243,7 @@ func (ds *deploymentService) UndeployRouterVersion( // Construct service objects for each of the components to be deleted services, err := ds.createServices( routerVersion, project, ds.environmentType, "", nil, - ds.fluentdConfig.Tag, ds.jaegerCollectorEndpoint, ds.sentryEnabled, ds.sentryDSN, + ds.routerDefaults, ds.sentryEnabled, ds.sentryDSN, ds.knativeServiceConfig.TargetConcurrency, ds.knativeServiceConfig.QueueProxyResourcePercentage, ds.knativeServiceConfig.UserContainerLimitRequestFactor, ) @@ -265,7 +263,7 @@ func (ds *deploymentService) UndeployRouterVersion( // Delete fluentd if required if routerVersion.LogConfig.ResultLoggerType == models.BigQueryLogger { fluentdService := ds.svcBuilder.NewFluentdService(routerVersion, - project, ds.environmentType, "", ds.fluentdConfig) + project, ds.environmentType, "", ds.routerDefaults.FluentdConfig) err = deleteK8sService(controller, fluentdService, ds.deploymentTimeout) if err != nil { errs = append(errs, err.Error()) @@ -340,8 +338,7 @@ func (ds *deploymentService) createServices( envType string, secretName string, experimentConfig json.RawMessage, - fluentdTag string, - jaegerCollectorEndpoint string, + routerDefaults *config.RouterDefaults, sentryEnabled bool, sentryDSN string, knativeTargetConcurrency int, @@ -391,8 +388,7 @@ func (ds *deploymentService) createServices( envType, secretName, experimentConfig, - fluentdTag, - jaegerCollectorEndpoint, + routerDefaults, sentryEnabled, sentryDSN, knativeTargetConcurrency, diff --git a/api/turing/service/router_deployment_service_test.go b/api/turing/service/router_deployment_service_test.go index f77238d6d..5d308106c 100644 --- a/api/turing/service/router_deployment_service_test.go +++ b/api/turing/service/router_deployment_service_test.go @@ -121,8 +121,7 @@ func (msb *mockClusterServiceBuilder) NewRouterService( envType string, secretName string, expConfig json.RawMessage, - fluentTag string, - jaegerEndpoint string, + routerDefaults *config.RouterDefaults, sentryEnabled bool, sentryDSN string, targetConcurrency int, @@ -137,8 +136,8 @@ func (msb *mockClusterServiceBuilder) NewRouterService( Name: fmt.Sprintf("%s-router-%d", rv.Router.Name, rv.Version), Namespace: project.Name, Envs: []corev1.EnvVar{ - {Name: "JAEGER_EP", Value: jaegerEndpoint}, - {Name: "FLUENTD_TAG", Value: fluentTag}, + {Name: "JAEGER_EP", Value: routerDefaults.JaegerCollectorEndpoint}, + {Name: "FLUENTD_TAG", Value: routerDefaults.FluentdConfig.Tag}, {Name: "ENVIRONMENT", Value: envType}, {Name: "SENTRY_ENABLED", Value: strconv.FormatBool(sentryEnabled)}, {Name: "SENTRY_DSN", Value: sentryDSN}, @@ -211,10 +210,10 @@ func TestDeployEndpoint(t *testing.T) { // Create test endpoint service with mock controller and service builder ds := &deploymentService{ - fluentdConfig: &config.FluentdConfig{ - Tag: "fluentd-tag", + routerDefaults: &config.RouterDefaults{ + JaegerCollectorEndpoint: "jaeger-endpoint", + FluentdConfig: &config.FluentdConfig{Tag: "fluentd-tag"}, }, - jaegerCollectorEndpoint: "jaeger-endpoint", deploymentTimeout: time.Second * 5, deploymentDeletionTimeout: time.Second * 5, environmentType: envType, @@ -307,8 +306,8 @@ func TestDeployEndpoint(t *testing.T) { Name: fmt.Sprintf("%s-router-%d", routerVersion.Router.Name, routerVersion.Version), Namespace: testNamespace, Envs: []corev1.EnvVar{ - {Name: "JAEGER_EP", Value: ds.jaegerCollectorEndpoint}, - {Name: "FLUENTD_TAG", Value: ds.fluentdConfig.Tag}, + {Name: "JAEGER_EP", Value: ds.routerDefaults.JaegerCollectorEndpoint}, + {Name: "FLUENTD_TAG", Value: ds.routerDefaults.FluentdConfig.Tag}, {Name: "ENVIRONMENT", Value: envType}, {Name: "SENTRY_ENABLED", Value: "true"}, {Name: "SENTRY_DSN", Value: "test:dsn"}, @@ -373,10 +372,10 @@ func TestDeleteEndpoint(t *testing.T) { // Create test endpoint service with mock controller and service builder ds := &deploymentService{ - fluentdConfig: &config.FluentdConfig{ - Tag: "fluentd-tag", + routerDefaults: &config.RouterDefaults{ + JaegerCollectorEndpoint: "jaeger-endpoint", + FluentdConfig: &config.FluentdConfig{Tag: "fluentd-tag"}, }, - jaegerCollectorEndpoint: "jaeger-endpoint", deploymentTimeout: timeout, deploymentDeletionTimeout: timeout, knativeServiceConfig: &config.KnativeServiceDefaults{