From 11eec0131daa9d4d8bfe97780203225da443843c Mon Sep 17 00:00:00 2001 From: Chaoran Yu Date: Fri, 13 Dec 2019 16:36:39 -0500 Subject: [PATCH] Various minor fixes (#145) * Minor fixes * Reverted Travis Go version change --- Dockerfile | 2 +- examples/wordcount/Dockerfile | 2 +- examples/wordcount/pom.xml | 4 +-- integ/utils/utils.go | 3 +- pkg/apis/app/v1alpha1/types.go | 32 +++++++++++----------- pkg/apis/app/v1beta1/types.go | 32 +++++++++++----------- pkg/controller/config/config.go | 4 +-- pkg/controller/config/config_flags.go | 4 +-- pkg/controller/config/config_flags_test.go | 16 +++++------ 9 files changed, 49 insertions(+), 50 deletions(-) diff --git a/Dockerfile b/Dockerfile index e1d44393..1801117d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.13.0-alpine3.10 as builder +FROM golang:1.13.4-alpine3.10 as builder RUN apk add git openssh-client make curl bash COPY boilerplate/lyft/golang_test_targets/dep_install.sh /go/src/github.com/lyft/flinkk8soperator/ diff --git a/examples/wordcount/Dockerfile b/examples/wordcount/Dockerfile index 3f7e2669..41b28814 100644 --- a/examples/wordcount/Dockerfile +++ b/examples/wordcount/Dockerfile @@ -1,4 +1,4 @@ -FROM flink:1.8.2-scala_2.12 +FROM flink:1.9.1-scala_2.12 # Prepare environment ENV MAVEN_HOME=/opt/maven diff --git a/examples/wordcount/pom.xml b/examples/wordcount/pom.xml index 0b827617..cabfb75e 100644 --- a/examples/wordcount/pom.xml +++ b/examples/wordcount/pom.xml @@ -19,12 +19,12 @@ org.apache.flink flink-java - 1.8.1 + 1.9.1 org.apache.flink flink-streaming-java_2.11 - 1.8.1 + 1.9.1 diff --git a/integ/utils/utils.go b/integ/utils/utils.go index ae21e142..46c50f42 100644 --- a/integ/utils/utils.go +++ b/integ/utils/utils.go @@ -14,6 +14,7 @@ import ( "github.com/go-resty/resty" flinkapp "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1" + clientset "github.com/lyft/flinkk8soperator/pkg/client/clientset/versioned" client "github.com/lyft/flinkk8soperator/pkg/client/clientset/versioned/typed/app/v1beta1" "github.com/prometheus/common/log" appsv1 "k8s.io/api/apps/v1" @@ -25,8 +26,6 @@ import ( "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" - - clientset "github.com/lyft/flinkk8soperator/pkg/client/clientset/versioned" ) type TestUtil struct { diff --git a/pkg/apis/app/v1alpha1/types.go b/pkg/apis/app/v1alpha1/types.go index f17c955e..72d723dd 100644 --- a/pkg/apis/app/v1alpha1/types.go +++ b/pkg/apis/app/v1alpha1/types.go @@ -127,15 +127,15 @@ type SavepointInfo struct { type FlinkClusterStatus struct { Health HealthStatus `json:"health,omitempty"` NumberOfTaskManagers int32 `json:"numberOfTaskManagers,omitempty"` - HealthyTaskManagers int32 `json:"healthyTaskManagers,omitepty"` + HealthyTaskManagers int32 `json:"healthyTaskManagers,omitempty"` NumberOfTaskSlots int32 `json:"numberOfTaskSlots,omitempty"` AvailableTaskSlots int32 `json:"availableTaskSlots"` } type FlinkJobStatus struct { - JobID string `json:"jobID,omitEmpty"` - Health HealthStatus `json:"health,omitEmpty"` - State JobState `json:"state,omitEmpty"` + JobID string `json:"jobID,omitempty"` + Health HealthStatus `json:"health,omitempty"` + State JobState `json:"state,omitempty"` JarName string `json:"jarName"` Parallelism int32 `json:"parallelism"` @@ -143,14 +143,14 @@ type FlinkJobStatus struct { ProgramArgs string `json:"programArgs,omitempty"` AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"` - StartTime *metav1.Time `json:"startTime,omitEmpty"` - JobRestartCount int32 `json:"jobRestartCount,omitEmpty"` - CompletedCheckpointCount int32 `json:"completedCheckpointCount,omitEmpty"` - FailedCheckpointCount int32 `json:"failedCheckpointCount,omitEmpty"` - LastCheckpointTime *metav1.Time `json:"lastCheckpointTime,omitEmpty"` - RestorePath string `json:"restorePath,omitEmpty"` - RestoreTime *metav1.Time `json:"restoreTime,omitEmpty"` - LastFailingTime *metav1.Time `json:"lastFailingTime,omitEmpty"` + StartTime *metav1.Time `json:"startTime,omitempty"` + JobRestartCount int32 `json:"jobRestartCount,omitempty"` + CompletedCheckpointCount int32 `json:"completedCheckpointCount,omitempty"` + FailedCheckpointCount int32 `json:"failedCheckpointCount,omitempty"` + LastCheckpointTime *metav1.Time `json:"lastCheckpointTime,omitempty"` + RestorePath string `json:"restorePath,omitempty"` + RestoreTime *metav1.Time `json:"restoreTime,omitempty"` + LastFailingTime *metav1.Time `json:"lastFailingTime,omitempty"` } type FlinkApplicationStatus struct { @@ -160,11 +160,11 @@ type FlinkApplicationStatus struct { Reason string `json:"reason,omitempty"` ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"` JobStatus FlinkJobStatus `json:"jobStatus"` - FailedDeployHash string `json:"failedDeployHash,omitEmpty"` - RollbackHash string `json:"rollbackHash,omitEmpty"` + FailedDeployHash string `json:"failedDeployHash,omitempty"` + RollbackHash string `json:"rollbackHash,omitempty"` DeployHash string `json:"deployHash"` - RetryCount int32 `json:"retryCount,omitEmpty"` - LastSeenError FlinkApplicationError `json:"lastSeenError,omitEmpty"` + RetryCount int32 `json:"retryCount,omitempty"` + LastSeenError FlinkApplicationError `json:"lastSeenError,omitempty"` } func (in *FlinkApplicationStatus) GetPhase() FlinkApplicationPhase { diff --git a/pkg/apis/app/v1beta1/types.go b/pkg/apis/app/v1beta1/types.go index 7f2705e4..e87dcdba 100644 --- a/pkg/apis/app/v1beta1/types.go +++ b/pkg/apis/app/v1beta1/types.go @@ -130,16 +130,16 @@ type FlinkClusterStatus struct { ClusterOverviewURL string `json:"clusterOverviewURL,omitempty"` Health HealthStatus `json:"health,omitempty"` NumberOfTaskManagers int32 `json:"numberOfTaskManagers,omitempty"` - HealthyTaskManagers int32 `json:"healthyTaskManagers,omitepty"` + HealthyTaskManagers int32 `json:"healthyTaskManagers,omitempty"` NumberOfTaskSlots int32 `json:"numberOfTaskSlots,omitempty"` AvailableTaskSlots int32 `json:"availableTaskSlots"` } type FlinkJobStatus struct { JobOverviewURL string `json:"jobOverviewURL,omitempty"` - JobID string `json:"jobID,omitEmpty"` - Health HealthStatus `json:"health,omitEmpty"` - State JobState `json:"state,omitEmpty"` + JobID string `json:"jobID,omitempty"` + Health HealthStatus `json:"health,omitempty"` + State JobState `json:"state,omitempty"` JarName string `json:"jarName"` Parallelism int32 `json:"parallelism"` @@ -147,14 +147,14 @@ type FlinkJobStatus struct { ProgramArgs string `json:"programArgs,omitempty"` AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"` - StartTime *metav1.Time `json:"startTime,omitEmpty"` - JobRestartCount int32 `json:"jobRestartCount,omitEmpty"` - CompletedCheckpointCount int32 `json:"completedCheckpointCount,omitEmpty"` - FailedCheckpointCount int32 `json:"failedCheckpointCount,omitEmpty"` - LastCheckpointTime *metav1.Time `json:"lastCheckpointTime,omitEmpty"` - RestorePath string `json:"restorePath,omitEmpty"` - RestoreTime *metav1.Time `json:"restoreTime,omitEmpty"` - LastFailingTime *metav1.Time `json:"lastFailingTime,omitEmpty"` + StartTime *metav1.Time `json:"startTime,omitempty"` + JobRestartCount int32 `json:"jobRestartCount,omitempty"` + CompletedCheckpointCount int32 `json:"completedCheckpointCount,omitempty"` + FailedCheckpointCount int32 `json:"failedCheckpointCount,omitempty"` + LastCheckpointTime *metav1.Time `json:"lastCheckpointTime,omitempty"` + RestorePath string `json:"restorePath,omitempty"` + RestoreTime *metav1.Time `json:"restoreTime,omitempty"` + LastFailingTime *metav1.Time `json:"lastFailingTime,omitempty"` } type FlinkApplicationStatus struct { @@ -164,13 +164,13 @@ type FlinkApplicationStatus struct { Reason string `json:"reason,omitempty"` ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"` JobStatus FlinkJobStatus `json:"jobStatus"` - FailedDeployHash string `json:"failedDeployHash,omitEmpty"` - RollbackHash string `json:"rollbackHash,omitEmpty"` + FailedDeployHash string `json:"failedDeployHash,omitempty"` + RollbackHash string `json:"rollbackHash,omitempty"` DeployHash string `json:"deployHash"` SavepointTriggerID string `json:"savepointTriggerId,omitempty"` SavepointPath string `json:"savepointPath,omitempty"` - RetryCount int32 `json:"retryCount,omitEmpty"` - LastSeenError *FlinkApplicationError `json:"lastSeenError,omitEmpty"` + RetryCount int32 `json:"retryCount,omitempty"` + LastSeenError *FlinkApplicationError `json:"lastSeenError,omitempty"` } func (in *FlinkApplicationStatus) GetPhase() FlinkApplicationPhase { diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go index b56f7a16..78055dad 100644 --- a/pkg/controller/config/config.go +++ b/pkg/controller/config/config.go @@ -15,10 +15,10 @@ type Config struct { ResyncPeriod config.Duration `json:"resyncPeriod" pflag:"\"30s\",Determines the resync period for all watchers."` LimitNamespace string `json:"limitNamespace" pflag:"\"\",Namespaces to watch for by flink operator"` MetricsPrefix string `json:"metricsPrefix" pflag:"\"flinkk8soperator\",Prefix for metrics propagated to prometheus"` - ProfilerPort config.Port `json:"prof-port" pflag:"\"10254\",Profiler port"` + ProfilerPort config.Port `json:"profilerPort" pflag:"\"10254\",Profiler port"` FlinkIngressURLFormat string `json:"ingressUrlFormat"` UseProxy bool `json:"useKubectlProxy"` - ProxyPort config.Port `json:"ProxyPort" pflag:"\"8001\",The port at which flink cluster runs locally"` + ProxyPort config.Port `json:"proxyPort" pflag:"\"8001\",The port at which flink cluster runs locally"` ContainerNameFormat string `json:"containerNameFormat"` Workers int `json:"workers" pflag:"4,Number of routines to process custom resource"` BaseBackoffDuration config.Duration `json:"baseBackoffDuration" pflag:"\"100ms\",Determines the base backoff for exponential retries."` diff --git a/pkg/controller/config/config_flags.go b/pkg/controller/config/config_flags.go index 8395f535..b2dc7c65 100755 --- a/pkg/controller/config/config_flags.go +++ b/pkg/controller/config/config_flags.go @@ -44,10 +44,10 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet { cmdFlags.String(fmt.Sprintf("%v%v", prefix, "resyncPeriod"), "30s", "Determines the resync period for all watchers.") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "limitNamespace"), "", "Namespaces to watch for by flink operator") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "metricsPrefix"), "flinkk8soperator", "Prefix for metrics propagated to prometheus") - cmdFlags.String(fmt.Sprintf("%v%v", prefix, "prof-port"), "10254", "Profiler port") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "profilerPort"), "10254", "Profiler port") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "ingressUrlFormat"), *new(string), "") cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "useKubectlProxy"), *new(bool), "") - cmdFlags.String(fmt.Sprintf("%v%v", prefix, "ProxyPort"), "8001", "The port at which flink cluster runs locally") + cmdFlags.String(fmt.Sprintf("%v%v", prefix, "proxyPort"), "8001", "The port at which flink cluster runs locally") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "containerNameFormat"), *new(string), "") cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "workers"), 4, "Number of routines to process custom resource") cmdFlags.String(fmt.Sprintf("%v%v", prefix, "baseBackoffDuration"), "100ms", "Determines the base backoff for exponential retries.") diff --git a/pkg/controller/config/config_flags_test.go b/pkg/controller/config/config_flags_test.go index bfae1ed7..cf8273f2 100755 --- a/pkg/controller/config/config_flags_test.go +++ b/pkg/controller/config/config_flags_test.go @@ -165,10 +165,10 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_prof-port", func(t *testing.T) { + t.Run("Test_profilerPort", func(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly - if vString, err := cmdFlags.GetString("prof-port"); err == nil { + if vString, err := cmdFlags.GetString("profilerPort"); err == nil { assert.Equal(t, string("10254"), vString) } else { assert.FailNow(t, err.Error()) @@ -178,8 +178,8 @@ func TestConfig_SetFlags(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := "10254" - cmdFlags.Set("prof-port", testValue) - if vString, err := cmdFlags.GetString("prof-port"); err == nil { + cmdFlags.Set("profilerPort", testValue) + if vString, err := cmdFlags.GetString("profilerPort"); err == nil { testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.ProfilerPort) } else { @@ -231,10 +231,10 @@ func TestConfig_SetFlags(t *testing.T) { } }) }) - t.Run("Test_ProxyPort", func(t *testing.T) { + t.Run("Test_proxyPort", func(t *testing.T) { t.Run("DefaultValue", func(t *testing.T) { // Test that default value is set properly - if vString, err := cmdFlags.GetString("ProxyPort"); err == nil { + if vString, err := cmdFlags.GetString("proxyPort"); err == nil { assert.Equal(t, string("8001"), vString) } else { assert.FailNow(t, err.Error()) @@ -244,8 +244,8 @@ func TestConfig_SetFlags(t *testing.T) { t.Run("Override", func(t *testing.T) { testValue := "8001" - cmdFlags.Set("ProxyPort", testValue) - if vString, err := cmdFlags.GetString("ProxyPort"); err == nil { + cmdFlags.Set("proxyPort", testValue) + if vString, err := cmdFlags.GetString("proxyPort"); err == nil { testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.ProxyPort) } else {