Skip to content

Commit

Permalink
Added an optional config in the CR to specify Flink application servi…
Browse files Browse the repository at this point in the history
…ce account (#83)

* Added support for specifying service accounts for JM and TM deployments
  • Loading branch information
yuchaoran2011 authored and anandswaminathan committed Aug 22, 2019
1 parent 94725bf commit 2d8a52e
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 4 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.12.7-alpine3.10 as builder
FROM golang:1.12.9-alpine3.10 as builder
RUN apk add git openssh-client make curl bash

COPY boilerplate/lyft/golang_test_targets/dep_install.sh /go/src/github.com/lyft/flinkk8soperator/
Expand All @@ -23,6 +23,6 @@ RUN make linux_compile
ENV PATH="/artifacts:${PATH}"

# This will eventually move to centurylink/ca-certs:latest for minimum possible image size
FROM alpine:3.9
FROM alpine:3.10
COPY --from=builder /artifacts /bin
CMD ["flinkoperator"]
2 changes: 2 additions & 0 deletions deploy/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ spec:
properties:
name:
type: string
serviceAccountName:
type: string
jarName:
type: string
programArgs:
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/app/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type FlinkApplicationSpec struct {
Image string `json:"image,omitempty" protobuf:"bytes,2,opt,name=image"`
ImagePullPolicy apiv1.PullPolicy `json:"imagePullPolicy,omitempty" protobuf:"bytes,14,opt,name=imagePullPolicy,casttype=PullPolicy"`
ImagePullSecrets []apiv1.LocalObjectReference `json:"imagePullSecrets,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,15,rep,name=imagePullSecrets"`
ServiceAccountName string `json:"serviceAccountName,omitempty"`
FlinkConfig FlinkConfig `json:"flinkConfig"`
FlinkVersion string `json:"flinkVersion"`
TaskManagerConfig TaskManagerConfig `json:"taskManagerConfig,omitempty"`
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/flink/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func getJobmanagerReplicas(app *v1beta1.FlinkApplication) int32 {
return firstNonNil(app.Spec.JobManagerConfig.Replicas, JobManagerDefaultReplicaCount)
}

func getServiceAccountName(app *v1beta1.FlinkApplication) string {
return app.Spec.ServiceAccountName
}

func getRPCPort(app *v1beta1.FlinkApplication) int32 {
return firstNonNil(app.Spec.RPCPort, RPCDefaultPort)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/controller/flink/job_manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func jobmanagerTemplate(app *v1beta1.FlinkApplication) *v1.Deployment {
replicas := getJobmanagerReplicas(app)
jobManagerContainer := FetchJobManagerContainerObj(app)

return &v1.Deployment{
deployment := &v1.Deployment{
TypeMeta: metaV1.TypeMeta{
APIVersion: v1.SchemeGroupVersion.String(),
Kind: k8.Deployment,
Expand Down Expand Up @@ -335,6 +335,12 @@ func jobmanagerTemplate(app *v1beta1.FlinkApplication) *v1.Deployment {
},
},
}

serviceAccountName := getServiceAccountName(app)
if serviceAccountName != "" {
deployment.Spec.Template.Spec.ServiceAccountName = serviceAccountName
}
return deployment
}

func FetchJobMangerDeploymentCreateObj(app *v1beta1.FlinkApplication, hash string) *v1.Deployment {
Expand Down
9 changes: 8 additions & 1 deletion pkg/controller/flink/task_manager_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ func taskmanagerTemplate(app *v1beta1.FlinkApplication) *v1.Deployment {
taskContainer := FetchTaskManagerContainerObj(app)

replicas := computeTaskManagerReplicas(app)
return &v1.Deployment{

deployment := &v1.Deployment{
TypeMeta: metaV1.TypeMeta{
APIVersion: v1.SchemeGroupVersion.String(),
Kind: k8.Deployment,
Expand Down Expand Up @@ -212,6 +213,12 @@ func taskmanagerTemplate(app *v1beta1.FlinkApplication) *v1.Deployment {
},
},
}

serviceAccountName := getServiceAccountName(app)
if serviceAccountName != "" {
deployment.Spec.Template.Spec.ServiceAccountName = serviceAccountName
}
return deployment
}

func FetchTaskMangerDeploymentCreateObj(app *v1beta1.FlinkApplication, hash string) *v1.Deployment {
Expand Down

0 comments on commit 2d8a52e

Please sign in to comment.