Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove system kafka config from db and api #178

Merged
merged 4 commits into from
Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions api/turing/api/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ func (r CreateOrUpdateRouterRequest) BuildRouterVersion(
Brokers: r.Config.LogConfig.KafkaConfig.Brokers,
Topic: r.Config.LogConfig.KafkaConfig.Topic,
SerializationFormat: r.Config.LogConfig.KafkaConfig.SerializationFormat,
MaxMessageBytes: defaults.KafkaConfig.MaxMessageBytes,
CompressionType: defaults.KafkaConfig.CompressionType,
}
}
if rv.ExperimentEngine.Type != models.ExperimentEngineTypeNop {
Expand Down
2 changes: 0 additions & 2 deletions api/turing/api/request/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,6 @@ func TestRequestBuildRouterVersionLoggerConfiguration(t *testing.T) {
Brokers: "10:11",
Topic: "2222",
SerializationFormat: "json",
MaxMessageBytes: 1110,
CompressionType: "gzip",
},
BigQueryConfig: nil,
},
Expand Down
17 changes: 8 additions & 9 deletions api/turing/cluster/servicebuilder/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
fiberconfig "github.com/gojek/fiber/config"
mlp "github.com/gojek/mlp/api/client"
"github.com/gojek/turing/api/turing/cluster"
"github.com/gojek/turing/api/turing/config"
"github.com/gojek/turing/api/turing/models"
"github.com/gojek/turing/api/turing/utils"
"github.com/gojek/turing/engines/router/missionctl/fiberapi"
Expand Down Expand Up @@ -85,8 +86,7 @@ func (sb *clusterSvcBuilder) NewRouterService(
envType string,
secretName string,
experimentConfig json.RawMessage,
fluentdTag string,
jaegerCollectorEndpoint string,
routerDefault *config.RouterDefaults,
leonlnj marked this conversation as resolved.
Show resolved Hide resolved
sentryEnabled bool,
sentryDSN string,
knativeTargetConcurrency int,
Expand All @@ -106,7 +106,7 @@ func (sb *clusterSvcBuilder) NewRouterService(
volumes, volumeMounts := buildRouterVolumes(routerVersion, configMap.Name, secretName)

// Build env vars
envs, err := sb.buildRouterEnvs(namespace, envType, fluentdTag, jaegerCollectorEndpoint,
envs, err := sb.buildRouterEnvs(namespace, envType, routerDefault,
sentryEnabled, sentryDSN, secretName, routerVersion)
if err != nil {
return nil, err
Expand Down Expand Up @@ -174,8 +174,7 @@ func (sb *clusterSvcBuilder) GetRouterServiceName(routerVersion *models.RouterVe
func (sb *clusterSvcBuilder) buildRouterEnvs(
namespace string,
environmentType string,
fluentdTag string,
jaegerCollectorEndpoint string,
routerDefault *config.RouterDefaults,
sentryEnabled bool,
sentryDSN string,
secretName string,
Expand All @@ -186,7 +185,7 @@ func (sb *clusterSvcBuilder) buildRouterEnvs(
{Name: envAppName, Value: fmt.Sprintf("%s-%d.%s", ver.Router.Name, ver.Version, namespace)},
{Name: envAppEnvironment, Value: environmentType},
{Name: envRouterTimeout, Value: ver.Timeout},
{Name: envJaegerEndpoint, Value: jaegerCollectorEndpoint},
{Name: envJaegerEndpoint, Value: routerDefault.JaegerCollectorEndpoint},
{Name: envRouterConfigFile, Value: routerConfigMapMountPath + routerConfigFileName},
{Name: envSentryEnabled, Value: strconv.FormatBool(sentryEnabled)},
{Name: envSentryDSN, Value: sentryDSN},
Expand Down Expand Up @@ -246,16 +245,16 @@ func (sb *clusterSvcBuilder) buildRouterEnvs(
envs = append(envs, []corev1.EnvVar{
{Name: envFluentdHost, Value: buildFluentdHost(ver, namespace)},
{Name: envFluentdPort, Value: strconv.Itoa(fluentdPort)},
{Name: envFluentdTag, Value: fluentdTag},
{Name: envFluentdTag, Value: routerDefault.FluentdConfig.Tag},
}...)
}
case models.KafkaLogger:
envs = append(envs, []corev1.EnvVar{
{Name: envKafkaBrokers, Value: logConfig.KafkaConfig.Brokers},
{Name: envKafkaTopic, Value: logConfig.KafkaConfig.Topic},
{Name: envKafkaSerializationFormat, Value: string(logConfig.KafkaConfig.SerializationFormat)},
{Name: envKafkaMaxMessageBytes, Value: strconv.Itoa(logConfig.KafkaConfig.MaxMessageBytes)},
{Name: envKafkaCompressionType, Value: logConfig.KafkaConfig.CompressionType},
{Name: envKafkaMaxMessageBytes, Value: strconv.Itoa(routerDefault.KafkaConfig.MaxMessageBytes)},
{Name: envKafkaCompressionType, Value: routerDefault.KafkaConfig.CompressionType},
}...)
}

Expand Down
58 changes: 37 additions & 21 deletions api/turing/cluster/servicebuilder/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

mlp "github.com/gojek/mlp/api/client"
"github.com/gojek/turing/api/turing/cluster"
"github.com/gojek/turing/api/turing/config"
tu "github.com/gojek/turing/api/turing/internal/testutils"
"github.com/gojek/turing/api/turing/models"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -501,8 +502,21 @@ func TestNewRouterService(t *testing.T) {
Stream: "test-stream",
Team: "test-team",
}
svc, err := sb.NewRouterService(&routerVersion, project, "test-env", "service-account",
data.expRawConfig, "fluentd-tag", "jaeger-endpoint", true, "sentry-dsn", 1, 20, 1.5)
svc, err := sb.NewRouterService(
&routerVersion,
project,
"test-env",
"service-account",
data.expRawConfig,
&config.RouterDefaults{
JaegerCollectorEndpoint: "jaeger-endpoint",
FluentdConfig: &config.FluentdConfig{Tag: "fluentd-tag"},
},
true,
"sentry-dsn",
1,
20,
1.5)

if data.err == "" {
require.NoError(t, err)
Expand Down Expand Up @@ -556,14 +570,13 @@ func TestNewRouterEndpoint(t *testing.T) {

func TestBuildRouterEnvsResultLogger(t *testing.T) {
type args struct {
namespace string
environmentType string
fluentdTag string
jaegerCollectorEndpoint string
sentryEnabled bool
sentryDSN string
secretName string
ver *models.RouterVersion
namespace string
environmentType string
routerDefault *config.RouterDefaults
sentryEnabled bool
sentryDSN string
secretName string
ver *models.RouterVersion
}
namespace := "testnamespace"
tests := []struct {
Expand All @@ -574,13 +587,19 @@ func TestBuildRouterEnvsResultLogger(t *testing.T) {
{
name: "KafkaLogger",
args: args{
namespace: "testnamespace",
environmentType: "dev",
fluentdTag: "",
jaegerCollectorEndpoint: "",
sentryEnabled: false,
sentryDSN: "",
secretName: "",
namespace: "testnamespace",
environmentType: "dev",
routerDefault: &config.RouterDefaults{
JaegerCollectorEndpoint: "",
FluentdConfig: &config.FluentdConfig{Tag: ""},
KafkaConfig: &config.KafkaConfig{
MaxMessageBytes: 123,
CompressionType: "gzip",
},
},
sentryEnabled: false,
sentryDSN: "",
secretName: "",
ver: &models.RouterVersion{
Router: &models.Router{Name: "test1"},
Version: 1,
Expand All @@ -595,8 +614,6 @@ func TestBuildRouterEnvsResultLogger(t *testing.T) {
Brokers: "1.1.1.1:1111",
Topic: "kafkatopic",
SerializationFormat: "protobuf",
MaxMessageBytes: 123,
CompressionType: "gzip",
},
},
},
Expand Down Expand Up @@ -631,8 +648,7 @@ func TestBuildRouterEnvsResultLogger(t *testing.T) {
got, _ := sb.buildRouterEnvs(
namespace,
tt.args.environmentType,
tt.args.fluentdTag,
tt.args.jaegerCollectorEndpoint,
tt.args.routerDefault,
tt.args.sentryEnabled,
tt.args.sentryDSN,
tt.args.secretName,
Expand Down
3 changes: 1 addition & 2 deletions api/turing/cluster/servicebuilder/service_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ type ClusterServiceBuilder interface {
envType string,
secretName string,
experimentConfig json.RawMessage,
fluentdTag string,
jaegerEndpoint string,
routerDefault *config.RouterDefaults,
sentryEnabled bool,
sentryDSN string,
knativeTargetConcurrency int,
Expand Down
4 changes: 0 additions & 4 deletions api/turing/models/log_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ type KafkaConfig struct {
Topic string `json:"topic"`
// Serialization Format used for the messages
SerializationFormat SerializationFormat `json:"serialization_format"`
// Producer Config - Max message byte to send to broker
MaxMessageBytes int `json:"-"`
// Producer Config - Compression Type of message
CompressionType string `json:"-"`
}

// LogConfig contains all log configuration necessary for a deployment
Expand Down
2 changes: 0 additions & 2 deletions api/turing/models/log_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ func TestLogConfigValue(t *testing.T) {
Brokers: "test-brokers",
Topic: "test-topic",
SerializationFormat: "test-serialization",
MaxMessageBytes: 10000,
CompressionType: "none",
},
},
expected: string(`{
Expand Down
8 changes: 4 additions & 4 deletions api/turing/service/alert_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestGitlabOpsAlertServiceSaveShouldRevertGitWhenDbFail(t *testing.T) {
mockSQL.ExpectBegin()
mockSQL.
ExpectQuery(`INSERT INTO "alerts"`).
WillReturnError(errors.New("insertion error"))
WillReturnError(errors.New("test mocked error - insertion error"))

service, err := NewGitlabOpsAlertService(
mockDb,
Expand Down Expand Up @@ -260,7 +260,7 @@ func TestGitlabOpsAlertServiceFindByIDShouldReturnErrWhenNotFound(t *testing.T)
mockSQL.
ExpectQuery(`SELECT (.+) FROM "alerts"`).
WithArgs(1).
WillReturnError(errors.New("select not found"))
WillReturnError(errors.New("test mocked error - select not found"))

service, err := NewGitlabOpsAlertService(
mockDb,
Expand Down Expand Up @@ -364,7 +364,7 @@ func TestGitlabOpsAlertServiceUpdateShouldRevertGitWhenDbFail(t *testing.T) {
mockSQL.ExpectBegin()
mockSQL.
ExpectExec(`UPDATE "alerts"`).
WillReturnError(errors.New("update error"))
WillReturnError(errors.New("test mocked error - update error"))

service, err := NewGitlabOpsAlertService(
mockDb,
Expand Down Expand Up @@ -477,7 +477,7 @@ func TestGitlabOpsAlertSeviceDeleteShouldRevertGitWhenDbFail(t *testing.T) {
mockSQL.ExpectBegin()
mockSQL.
ExpectExec(`DELETE FROM "alerts"`).
WillReturnError(errors.New("delete error"))
WillReturnError(errors.New("test mocked error - delete error"))

service, err := NewGitlabOpsAlertService(
mockDb,
Expand Down
2 changes: 1 addition & 1 deletion api/turing/service/mocks/router_deployment_service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 10 additions & 14 deletions api/turing/service/router_deployment_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,9 @@ type deploymentService struct {
environmentType string

// Router configs
fluentdConfig *config.FluentdConfig
jaegerCollectorEndpoint string
sentryEnabled bool
sentryDSN string
sentryEnabled bool
sentryDSN string
routerDefault *config.RouterDefaults
leonlnj marked this conversation as resolved.
Show resolved Hide resolved

// Knative service configs
knativeServiceConfig *config.KnativeServiceDefaults
Expand Down Expand Up @@ -85,8 +84,7 @@ func NewDeploymentService(
deploymentTimeout: cfg.DeployConfig.Timeout,
deploymentDeletionTimeout: cfg.DeployConfig.DeletionTimeout,
environmentType: cfg.DeployConfig.EnvironmentType,
fluentdConfig: cfg.RouterDefaults.FluentdConfig,
jaegerCollectorEndpoint: cfg.RouterDefaults.JaegerCollectorEndpoint,
routerDefault: cfg.RouterDefaults,
knativeServiceConfig: cfg.KnativeServiceDefaults,
ensemblerServiceImageBuilder: ensemblerServiceImageBuilder,
sentryEnabled: cfg.Sentry.Enabled,
Expand Down Expand Up @@ -155,7 +153,7 @@ func (ds *deploymentService) DeployRouterVersion(
// Deploy fluentd if enabled
if routerVersion.LogConfig.ResultLoggerType == models.BigQueryLogger {
fluentdService := ds.svcBuilder.NewFluentdService(routerVersion, project,
ds.environmentType, secretName, ds.fluentdConfig)
ds.environmentType, secretName, ds.routerDefault.FluentdConfig)
// Create pvc
err = createPVC(ctx, controller, project.Name, fluentdService.PersistentVolumeClaim)
if err != nil {
Expand Down Expand Up @@ -191,7 +189,7 @@ func (ds *deploymentService) DeployRouterVersion(
// Construct service objects for each of the components and deploy
services, err := ds.createServices(
routerVersion, project, ds.environmentType, secretName, experimentConfig,
ds.fluentdConfig.Tag, ds.jaegerCollectorEndpoint, ds.sentryEnabled, ds.sentryDSN,
ds.routerDefault, ds.sentryEnabled, ds.sentryDSN,
ds.knativeServiceConfig.TargetConcurrency, ds.knativeServiceConfig.QueueProxyResourcePercentage,
ds.knativeServiceConfig.UserContainerLimitRequestFactor,
)
Expand Down Expand Up @@ -245,7 +243,7 @@ func (ds *deploymentService) UndeployRouterVersion(
// Construct service objects for each of the components to be deleted
services, err := ds.createServices(
routerVersion, project, ds.environmentType, "", nil,
ds.fluentdConfig.Tag, ds.jaegerCollectorEndpoint, ds.sentryEnabled, ds.sentryDSN,
ds.routerDefault, ds.sentryEnabled, ds.sentryDSN,
ds.knativeServiceConfig.TargetConcurrency, ds.knativeServiceConfig.QueueProxyResourcePercentage,
ds.knativeServiceConfig.UserContainerLimitRequestFactor,
)
Expand All @@ -265,7 +263,7 @@ func (ds *deploymentService) UndeployRouterVersion(
// Delete fluentd if required
if routerVersion.LogConfig.ResultLoggerType == models.BigQueryLogger {
fluentdService := ds.svcBuilder.NewFluentdService(routerVersion,
project, ds.environmentType, "", ds.fluentdConfig)
project, ds.environmentType, "", ds.routerDefault.FluentdConfig)
err = deleteK8sService(controller, fluentdService, ds.deploymentTimeout)
if err != nil {
errs = append(errs, err.Error())
Expand Down Expand Up @@ -340,8 +338,7 @@ func (ds *deploymentService) createServices(
envType string,
secretName string,
experimentConfig json.RawMessage,
fluentdTag string,
jaegerCollectorEndpoint string,
routerDefault *config.RouterDefaults,
sentryEnabled bool,
sentryDSN string,
knativeTargetConcurrency int,
Expand Down Expand Up @@ -391,8 +388,7 @@ func (ds *deploymentService) createServices(
envType,
secretName,
experimentConfig,
fluentdTag,
jaegerCollectorEndpoint,
routerDefault,
sentryEnabled,
sentryDSN,
knativeTargetConcurrency,
Expand Down
23 changes: 11 additions & 12 deletions api/turing/service/router_deployment_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ func (msb *mockClusterServiceBuilder) NewRouterService(
envType string,
secretName string,
expConfig json.RawMessage,
fluentTag string,
jaegerEndpoint string,
routerDefault *config.RouterDefaults,
sentryEnabled bool,
sentryDSN string,
targetConcurrency int,
Expand All @@ -137,8 +136,8 @@ func (msb *mockClusterServiceBuilder) NewRouterService(
Name: fmt.Sprintf("%s-router-%d", rv.Router.Name, rv.Version),
Namespace: project.Name,
Envs: []corev1.EnvVar{
{Name: "JAEGER_EP", Value: jaegerEndpoint},
{Name: "FLUENTD_TAG", Value: fluentTag},
{Name: "JAEGER_EP", Value: routerDefault.JaegerCollectorEndpoint},
{Name: "FLUENTD_TAG", Value: routerDefault.FluentdConfig.Tag},
{Name: "ENVIRONMENT", Value: envType},
{Name: "SENTRY_ENABLED", Value: strconv.FormatBool(sentryEnabled)},
{Name: "SENTRY_DSN", Value: sentryDSN},
Expand Down Expand Up @@ -211,10 +210,10 @@ func TestDeployEndpoint(t *testing.T) {

// Create test endpoint service with mock controller and service builder
ds := &deploymentService{
fluentdConfig: &config.FluentdConfig{
Tag: "fluentd-tag",
routerDefault: &config.RouterDefaults{
JaegerCollectorEndpoint: "jaeger-endpoint",
FluentdConfig: &config.FluentdConfig{Tag: "fluentd-tag"},
},
jaegerCollectorEndpoint: "jaeger-endpoint",
deploymentTimeout: time.Second * 5,
deploymentDeletionTimeout: time.Second * 5,
environmentType: envType,
Expand Down Expand Up @@ -307,8 +306,8 @@ func TestDeployEndpoint(t *testing.T) {
Name: fmt.Sprintf("%s-router-%d", routerVersion.Router.Name, routerVersion.Version),
Namespace: testNamespace,
Envs: []corev1.EnvVar{
{Name: "JAEGER_EP", Value: ds.jaegerCollectorEndpoint},
{Name: "FLUENTD_TAG", Value: ds.fluentdConfig.Tag},
{Name: "JAEGER_EP", Value: ds.routerDefault.JaegerCollectorEndpoint},
{Name: "FLUENTD_TAG", Value: ds.routerDefault.FluentdConfig.Tag},
{Name: "ENVIRONMENT", Value: envType},
{Name: "SENTRY_ENABLED", Value: "true"},
{Name: "SENTRY_DSN", Value: "test:dsn"},
Expand Down Expand Up @@ -373,10 +372,10 @@ func TestDeleteEndpoint(t *testing.T) {

// Create test endpoint service with mock controller and service builder
ds := &deploymentService{
fluentdConfig: &config.FluentdConfig{
Tag: "fluentd-tag",
routerDefault: &config.RouterDefaults{
JaegerCollectorEndpoint: "jaeger-endpoint",
FluentdConfig: &config.FluentdConfig{Tag: "fluentd-tag"},
},
jaegerCollectorEndpoint: "jaeger-endpoint",
deploymentTimeout: timeout,
deploymentDeletionTimeout: timeout,
knativeServiceConfig: &config.KnativeServiceDefaults{
Expand Down