diff --git a/Dockerfile b/Dockerfile index 748b45b4..1912435a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.12.7-alpine3.10 as builder +FROM golang:1.12.9-alpine3.10 as builder RUN apk add git openssh-client make curl bash COPY boilerplate/lyft/golang_test_targets/dep_install.sh /go/src/github.com/lyft/flinkk8soperator/ @@ -23,6 +23,6 @@ RUN make linux_compile ENV PATH="/artifacts:${PATH}" # This will eventually move to centurylink/ca-certs:latest for minimum possible image size -FROM alpine:3.9 +FROM alpine:3.10 COPY --from=builder /artifacts /bin CMD ["flinkoperator"] diff --git a/deploy/crd.yaml b/deploy/crd.yaml index 1f2ea7ce..ed120683 100644 --- a/deploy/crd.yaml +++ b/deploy/crd.yaml @@ -40,6 +40,8 @@ spec: properties: name: type: string + serviceAccountName: + type: string jarName: type: string programArgs: diff --git a/pkg/apis/app/v1beta1/types.go b/pkg/apis/app/v1beta1/types.go index 60b51be1..0c9bd244 100644 --- a/pkg/apis/app/v1beta1/types.go +++ b/pkg/apis/app/v1beta1/types.go @@ -33,6 +33,7 @@ type FlinkApplicationSpec struct { Image string `json:"image,omitempty" protobuf:"bytes,2,opt,name=image"` ImagePullPolicy apiv1.PullPolicy `json:"imagePullPolicy,omitempty" protobuf:"bytes,14,opt,name=imagePullPolicy,casttype=PullPolicy"` ImagePullSecrets []apiv1.LocalObjectReference `json:"imagePullSecrets,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,15,rep,name=imagePullSecrets"` + ServiceAccountName string `json:"serviceAccountName,omitempty"` FlinkConfig FlinkConfig `json:"flinkConfig"` FlinkVersion string `json:"flinkVersion"` TaskManagerConfig TaskManagerConfig `json:"taskManagerConfig,omitempty"` diff --git a/pkg/controller/flink/config.go b/pkg/controller/flink/config.go index efa31498..17993444 100644 --- a/pkg/controller/flink/config.go +++ b/pkg/controller/flink/config.go @@ -42,6 +42,10 @@ func getJobmanagerReplicas(app *v1beta1.FlinkApplication) int32 { return firstNonNil(app.Spec.JobManagerConfig.Replicas, JobManagerDefaultReplicaCount) } +func getServiceAccountName(app *v1beta1.FlinkApplication) string { + return app.Spec.ServiceAccountName +} + func getRPCPort(app *v1beta1.FlinkApplication) int32 { return firstNonNil(app.Spec.RPCPort, RPCDefaultPort) } diff --git a/pkg/controller/flink/job_manager_controller.go b/pkg/controller/flink/job_manager_controller.go index 1fc35180..c7295de2 100644 --- a/pkg/controller/flink/job_manager_controller.go +++ b/pkg/controller/flink/job_manager_controller.go @@ -299,7 +299,7 @@ func jobmanagerTemplate(app *v1beta1.FlinkApplication) *v1.Deployment { replicas := getJobmanagerReplicas(app) jobManagerContainer := FetchJobManagerContainerObj(app) - return &v1.Deployment{ + deployment := &v1.Deployment{ TypeMeta: metaV1.TypeMeta{ APIVersion: v1.SchemeGroupVersion.String(), Kind: k8.Deployment, @@ -335,6 +335,12 @@ func jobmanagerTemplate(app *v1beta1.FlinkApplication) *v1.Deployment { }, }, } + + serviceAccountName := getServiceAccountName(app) + if serviceAccountName != "" { + deployment.Spec.Template.Spec.ServiceAccountName = serviceAccountName + } + return deployment } func FetchJobMangerDeploymentCreateObj(app *v1beta1.FlinkApplication, hash string) *v1.Deployment { diff --git a/pkg/controller/flink/task_manager_controller.go b/pkg/controller/flink/task_manager_controller.go index cfa93928..757afbc1 100644 --- a/pkg/controller/flink/task_manager_controller.go +++ b/pkg/controller/flink/task_manager_controller.go @@ -176,7 +176,8 @@ func taskmanagerTemplate(app *v1beta1.FlinkApplication) *v1.Deployment { taskContainer := FetchTaskManagerContainerObj(app) replicas := computeTaskManagerReplicas(app) - return &v1.Deployment{ + + deployment := &v1.Deployment{ TypeMeta: metaV1.TypeMeta{ APIVersion: v1.SchemeGroupVersion.String(), Kind: k8.Deployment, @@ -212,6 +213,12 @@ func taskmanagerTemplate(app *v1beta1.FlinkApplication) *v1.Deployment { }, }, } + + serviceAccountName := getServiceAccountName(app) + if serviceAccountName != "" { + deployment.Spec.Template.Spec.ServiceAccountName = serviceAccountName + } + return deployment } func FetchTaskMangerDeploymentCreateObj(app *v1beta1.FlinkApplication, hash string) *v1.Deployment {