diff --git a/Dockerfile b/Dockerfile
index e1d44393..1801117d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM golang:1.13.0-alpine3.10 as builder
+FROM golang:1.13.4-alpine3.10 as builder
RUN apk add git openssh-client make curl bash
COPY boilerplate/lyft/golang_test_targets/dep_install.sh /go/src/github.com/lyft/flinkk8soperator/
diff --git a/examples/wordcount/Dockerfile b/examples/wordcount/Dockerfile
index 3f7e2669..41b28814 100644
--- a/examples/wordcount/Dockerfile
+++ b/examples/wordcount/Dockerfile
@@ -1,4 +1,4 @@
-FROM flink:1.8.2-scala_2.12
+FROM flink:1.9.1-scala_2.12
# Prepare environment
ENV MAVEN_HOME=/opt/maven
diff --git a/examples/wordcount/pom.xml b/examples/wordcount/pom.xml
index 0b827617..cabfb75e 100644
--- a/examples/wordcount/pom.xml
+++ b/examples/wordcount/pom.xml
@@ -19,12 +19,12 @@
org.apache.flink
flink-java
- 1.8.1
+ 1.9.1
org.apache.flink
flink-streaming-java_2.11
- 1.8.1
+ 1.9.1
diff --git a/integ/utils/utils.go b/integ/utils/utils.go
index ae21e142..46c50f42 100644
--- a/integ/utils/utils.go
+++ b/integ/utils/utils.go
@@ -14,6 +14,7 @@ import (
"github.com/go-resty/resty"
flinkapp "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
+ clientset "github.com/lyft/flinkk8soperator/pkg/client/clientset/versioned"
client "github.com/lyft/flinkk8soperator/pkg/client/clientset/versioned/typed/app/v1beta1"
"github.com/prometheus/common/log"
appsv1 "k8s.io/api/apps/v1"
@@ -25,8 +26,6 @@ import (
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
-
- clientset "github.com/lyft/flinkk8soperator/pkg/client/clientset/versioned"
)
type TestUtil struct {
diff --git a/pkg/apis/app/v1alpha1/types.go b/pkg/apis/app/v1alpha1/types.go
index f17c955e..72d723dd 100644
--- a/pkg/apis/app/v1alpha1/types.go
+++ b/pkg/apis/app/v1alpha1/types.go
@@ -127,15 +127,15 @@ type SavepointInfo struct {
type FlinkClusterStatus struct {
Health HealthStatus `json:"health,omitempty"`
NumberOfTaskManagers int32 `json:"numberOfTaskManagers,omitempty"`
- HealthyTaskManagers int32 `json:"healthyTaskManagers,omitepty"`
+ HealthyTaskManagers int32 `json:"healthyTaskManagers,omitempty"`
NumberOfTaskSlots int32 `json:"numberOfTaskSlots,omitempty"`
AvailableTaskSlots int32 `json:"availableTaskSlots"`
}
type FlinkJobStatus struct {
- JobID string `json:"jobID,omitEmpty"`
- Health HealthStatus `json:"health,omitEmpty"`
- State JobState `json:"state,omitEmpty"`
+ JobID string `json:"jobID,omitempty"`
+ Health HealthStatus `json:"health,omitempty"`
+ State JobState `json:"state,omitempty"`
JarName string `json:"jarName"`
Parallelism int32 `json:"parallelism"`
@@ -143,14 +143,14 @@ type FlinkJobStatus struct {
ProgramArgs string `json:"programArgs,omitempty"`
AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"`
- StartTime *metav1.Time `json:"startTime,omitEmpty"`
- JobRestartCount int32 `json:"jobRestartCount,omitEmpty"`
- CompletedCheckpointCount int32 `json:"completedCheckpointCount,omitEmpty"`
- FailedCheckpointCount int32 `json:"failedCheckpointCount,omitEmpty"`
- LastCheckpointTime *metav1.Time `json:"lastCheckpointTime,omitEmpty"`
- RestorePath string `json:"restorePath,omitEmpty"`
- RestoreTime *metav1.Time `json:"restoreTime,omitEmpty"`
- LastFailingTime *metav1.Time `json:"lastFailingTime,omitEmpty"`
+ StartTime *metav1.Time `json:"startTime,omitempty"`
+ JobRestartCount int32 `json:"jobRestartCount,omitempty"`
+ CompletedCheckpointCount int32 `json:"completedCheckpointCount,omitempty"`
+ FailedCheckpointCount int32 `json:"failedCheckpointCount,omitempty"`
+ LastCheckpointTime *metav1.Time `json:"lastCheckpointTime,omitempty"`
+ RestorePath string `json:"restorePath,omitempty"`
+ RestoreTime *metav1.Time `json:"restoreTime,omitempty"`
+ LastFailingTime *metav1.Time `json:"lastFailingTime,omitempty"`
}
type FlinkApplicationStatus struct {
@@ -160,11 +160,11 @@ type FlinkApplicationStatus struct {
Reason string `json:"reason,omitempty"`
ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"`
JobStatus FlinkJobStatus `json:"jobStatus"`
- FailedDeployHash string `json:"failedDeployHash,omitEmpty"`
- RollbackHash string `json:"rollbackHash,omitEmpty"`
+ FailedDeployHash string `json:"failedDeployHash,omitempty"`
+ RollbackHash string `json:"rollbackHash,omitempty"`
DeployHash string `json:"deployHash"`
- RetryCount int32 `json:"retryCount,omitEmpty"`
- LastSeenError FlinkApplicationError `json:"lastSeenError,omitEmpty"`
+ RetryCount int32 `json:"retryCount,omitempty"`
+ LastSeenError FlinkApplicationError `json:"lastSeenError,omitempty"`
}
func (in *FlinkApplicationStatus) GetPhase() FlinkApplicationPhase {
diff --git a/pkg/apis/app/v1beta1/types.go b/pkg/apis/app/v1beta1/types.go
index 7f2705e4..e87dcdba 100644
--- a/pkg/apis/app/v1beta1/types.go
+++ b/pkg/apis/app/v1beta1/types.go
@@ -130,16 +130,16 @@ type FlinkClusterStatus struct {
ClusterOverviewURL string `json:"clusterOverviewURL,omitempty"`
Health HealthStatus `json:"health,omitempty"`
NumberOfTaskManagers int32 `json:"numberOfTaskManagers,omitempty"`
- HealthyTaskManagers int32 `json:"healthyTaskManagers,omitepty"`
+ HealthyTaskManagers int32 `json:"healthyTaskManagers,omitempty"`
NumberOfTaskSlots int32 `json:"numberOfTaskSlots,omitempty"`
AvailableTaskSlots int32 `json:"availableTaskSlots"`
}
type FlinkJobStatus struct {
JobOverviewURL string `json:"jobOverviewURL,omitempty"`
- JobID string `json:"jobID,omitEmpty"`
- Health HealthStatus `json:"health,omitEmpty"`
- State JobState `json:"state,omitEmpty"`
+ JobID string `json:"jobID,omitempty"`
+ Health HealthStatus `json:"health,omitempty"`
+ State JobState `json:"state,omitempty"`
JarName string `json:"jarName"`
Parallelism int32 `json:"parallelism"`
@@ -147,14 +147,14 @@ type FlinkJobStatus struct {
ProgramArgs string `json:"programArgs,omitempty"`
AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"`
- StartTime *metav1.Time `json:"startTime,omitEmpty"`
- JobRestartCount int32 `json:"jobRestartCount,omitEmpty"`
- CompletedCheckpointCount int32 `json:"completedCheckpointCount,omitEmpty"`
- FailedCheckpointCount int32 `json:"failedCheckpointCount,omitEmpty"`
- LastCheckpointTime *metav1.Time `json:"lastCheckpointTime,omitEmpty"`
- RestorePath string `json:"restorePath,omitEmpty"`
- RestoreTime *metav1.Time `json:"restoreTime,omitEmpty"`
- LastFailingTime *metav1.Time `json:"lastFailingTime,omitEmpty"`
+ StartTime *metav1.Time `json:"startTime,omitempty"`
+ JobRestartCount int32 `json:"jobRestartCount,omitempty"`
+ CompletedCheckpointCount int32 `json:"completedCheckpointCount,omitempty"`
+ FailedCheckpointCount int32 `json:"failedCheckpointCount,omitempty"`
+ LastCheckpointTime *metav1.Time `json:"lastCheckpointTime,omitempty"`
+ RestorePath string `json:"restorePath,omitempty"`
+ RestoreTime *metav1.Time `json:"restoreTime,omitempty"`
+ LastFailingTime *metav1.Time `json:"lastFailingTime,omitempty"`
}
type FlinkApplicationStatus struct {
@@ -164,13 +164,13 @@ type FlinkApplicationStatus struct {
Reason string `json:"reason,omitempty"`
ClusterStatus FlinkClusterStatus `json:"clusterStatus,omitempty"`
JobStatus FlinkJobStatus `json:"jobStatus"`
- FailedDeployHash string `json:"failedDeployHash,omitEmpty"`
- RollbackHash string `json:"rollbackHash,omitEmpty"`
+ FailedDeployHash string `json:"failedDeployHash,omitempty"`
+ RollbackHash string `json:"rollbackHash,omitempty"`
DeployHash string `json:"deployHash"`
SavepointTriggerID string `json:"savepointTriggerId,omitempty"`
SavepointPath string `json:"savepointPath,omitempty"`
- RetryCount int32 `json:"retryCount,omitEmpty"`
- LastSeenError *FlinkApplicationError `json:"lastSeenError,omitEmpty"`
+ RetryCount int32 `json:"retryCount,omitempty"`
+ LastSeenError *FlinkApplicationError `json:"lastSeenError,omitempty"`
}
func (in *FlinkApplicationStatus) GetPhase() FlinkApplicationPhase {
diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go
index b56f7a16..78055dad 100644
--- a/pkg/controller/config/config.go
+++ b/pkg/controller/config/config.go
@@ -15,10 +15,10 @@ type Config struct {
ResyncPeriod config.Duration `json:"resyncPeriod" pflag:"\"30s\",Determines the resync period for all watchers."`
LimitNamespace string `json:"limitNamespace" pflag:"\"\",Namespaces to watch for by flink operator"`
MetricsPrefix string `json:"metricsPrefix" pflag:"\"flinkk8soperator\",Prefix for metrics propagated to prometheus"`
- ProfilerPort config.Port `json:"prof-port" pflag:"\"10254\",Profiler port"`
+ ProfilerPort config.Port `json:"profilerPort" pflag:"\"10254\",Profiler port"`
FlinkIngressURLFormat string `json:"ingressUrlFormat"`
UseProxy bool `json:"useKubectlProxy"`
- ProxyPort config.Port `json:"ProxyPort" pflag:"\"8001\",The port at which flink cluster runs locally"`
+ ProxyPort config.Port `json:"proxyPort" pflag:"\"8001\",The port at which flink cluster runs locally"`
ContainerNameFormat string `json:"containerNameFormat"`
Workers int `json:"workers" pflag:"4,Number of routines to process custom resource"`
BaseBackoffDuration config.Duration `json:"baseBackoffDuration" pflag:"\"100ms\",Determines the base backoff for exponential retries."`
diff --git a/pkg/controller/config/config_flags.go b/pkg/controller/config/config_flags.go
index 8395f535..b2dc7c65 100755
--- a/pkg/controller/config/config_flags.go
+++ b/pkg/controller/config/config_flags.go
@@ -44,10 +44,10 @@ func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet {
cmdFlags.String(fmt.Sprintf("%v%v", prefix, "resyncPeriod"), "30s", "Determines the resync period for all watchers.")
cmdFlags.String(fmt.Sprintf("%v%v", prefix, "limitNamespace"), "", "Namespaces to watch for by flink operator")
cmdFlags.String(fmt.Sprintf("%v%v", prefix, "metricsPrefix"), "flinkk8soperator", "Prefix for metrics propagated to prometheus")
- cmdFlags.String(fmt.Sprintf("%v%v", prefix, "prof-port"), "10254", "Profiler port")
+ cmdFlags.String(fmt.Sprintf("%v%v", prefix, "profilerPort"), "10254", "Profiler port")
cmdFlags.String(fmt.Sprintf("%v%v", prefix, "ingressUrlFormat"), *new(string), "")
cmdFlags.Bool(fmt.Sprintf("%v%v", prefix, "useKubectlProxy"), *new(bool), "")
- cmdFlags.String(fmt.Sprintf("%v%v", prefix, "ProxyPort"), "8001", "The port at which flink cluster runs locally")
+ cmdFlags.String(fmt.Sprintf("%v%v", prefix, "proxyPort"), "8001", "The port at which flink cluster runs locally")
cmdFlags.String(fmt.Sprintf("%v%v", prefix, "containerNameFormat"), *new(string), "")
cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "workers"), 4, "Number of routines to process custom resource")
cmdFlags.String(fmt.Sprintf("%v%v", prefix, "baseBackoffDuration"), "100ms", "Determines the base backoff for exponential retries.")
diff --git a/pkg/controller/config/config_flags_test.go b/pkg/controller/config/config_flags_test.go
index bfae1ed7..cf8273f2 100755
--- a/pkg/controller/config/config_flags_test.go
+++ b/pkg/controller/config/config_flags_test.go
@@ -165,10 +165,10 @@ func TestConfig_SetFlags(t *testing.T) {
}
})
})
- t.Run("Test_prof-port", func(t *testing.T) {
+ t.Run("Test_profilerPort", func(t *testing.T) {
t.Run("DefaultValue", func(t *testing.T) {
// Test that default value is set properly
- if vString, err := cmdFlags.GetString("prof-port"); err == nil {
+ if vString, err := cmdFlags.GetString("profilerPort"); err == nil {
assert.Equal(t, string("10254"), vString)
} else {
assert.FailNow(t, err.Error())
@@ -178,8 +178,8 @@ func TestConfig_SetFlags(t *testing.T) {
t.Run("Override", func(t *testing.T) {
testValue := "10254"
- cmdFlags.Set("prof-port", testValue)
- if vString, err := cmdFlags.GetString("prof-port"); err == nil {
+ cmdFlags.Set("profilerPort", testValue)
+ if vString, err := cmdFlags.GetString("profilerPort"); err == nil {
testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.ProfilerPort)
} else {
@@ -231,10 +231,10 @@ func TestConfig_SetFlags(t *testing.T) {
}
})
})
- t.Run("Test_ProxyPort", func(t *testing.T) {
+ t.Run("Test_proxyPort", func(t *testing.T) {
t.Run("DefaultValue", func(t *testing.T) {
// Test that default value is set properly
- if vString, err := cmdFlags.GetString("ProxyPort"); err == nil {
+ if vString, err := cmdFlags.GetString("proxyPort"); err == nil {
assert.Equal(t, string("8001"), vString)
} else {
assert.FailNow(t, err.Error())
@@ -244,8 +244,8 @@ func TestConfig_SetFlags(t *testing.T) {
t.Run("Override", func(t *testing.T) {
testValue := "8001"
- cmdFlags.Set("ProxyPort", testValue)
- if vString, err := cmdFlags.GetString("ProxyPort"); err == nil {
+ cmdFlags.Set("proxyPort", testValue)
+ if vString, err := cmdFlags.GetString("proxyPort"); err == nil {
testDecodeJson_Config(t, fmt.Sprintf("%v", vString), &actual.ProxyPort)
} else {