From 1456182c3cf38e3c2fba2427dad69da52fc34f9b Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Mon, 17 Jun 2019 16:05:35 -0700 Subject: [PATCH] [STRMCMP-509] Make old cluster clean-up idempotent (#27) --- pkg/controller/flink/container_utils.go | 2 +- pkg/controller/flink/flink.go | 161 +++++++++--------- pkg/controller/flink/flink_test.go | 158 +++++++---------- .../flink/job_manager_controller.go | 32 +--- pkg/controller/flink/mock/mock_flink.go | 46 ++--- .../flink/task_manager_controller.go | 13 -- .../flinkapplication/flink_state_machine.go | 13 +- .../flink_state_machine_test.go | 12 +- pkg/controller/k8/cluster.go | 27 +++ pkg/controller/k8/mock/mock_k8.go | 9 + 10 files changed, 222 insertions(+), 251 deletions(-) diff --git a/pkg/controller/flink/container_utils.go b/pkg/controller/flink/container_utils.go index 2330fc24..508512cb 100644 --- a/pkg/controller/flink/container_utils.go +++ b/pkg/controller/flink/container_utils.go @@ -172,7 +172,7 @@ func InjectHashesIntoConfig(deployment *appsv1.Deployment, app *v1alpha1.FlinkAp for _, env := range container.Env { if env.Name == OperatorFlinkConfig { env.Value = fmt.Sprintf("%s\nhigh-availability.cluster-id: %s-%s\n", env.Value, app.Name, hash) - env.Value = fmt.Sprintf("%sjobmanager.rpc.address: %s\n", env.Value, VersionedJobManagerService(app, hash)) + env.Value = fmt.Sprintf("%sjobmanager.rpc.address: %s\n", env.Value, VersionedJobManagerServiceName(app, hash)) } newEnv = append(newEnv, env) } diff --git a/pkg/controller/flink/flink.go b/pkg/controller/flink/flink.go index 55c906c1..b373c0fb 100644 --- a/pkg/controller/flink/flink.go +++ b/pkg/controller/flink/flink.go @@ -7,6 +7,8 @@ import ( "fmt" "time" + "k8s.io/apimachinery/pkg/runtime" + "github.com/lyft/flinkk8soperator/pkg/controller/common" "github.com/lyft/flinkk8soperator/pkg/controller/config" @@ -45,9 +47,6 @@ type ControllerInterface interface { // Creates a Flink cluster with necessary Job Manager, Task Managers and services for UI 
CreateCluster(ctx context.Context, application *v1alpha1.FlinkApplication) error - // Deletes a Flink cluster based on the hash - DeleteCluster(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) error - // Cancels the running/active jobs in the Cluster for the Application after savepoint is created CancelWithSavepoint(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (string, error) @@ -72,8 +71,11 @@ type ControllerInterface interface { // Returns the list of Jobs running on the Flink Cluster for the Application GetJobsForApplication(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) ([]client.FlinkJob, error) - // For the application, a deployment corresponds to an image. This returns the current and older deployments for the app. - GetCurrentAndOldDeploymentsForApp(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, []common.FlinkDeployment, error) + // Returns the pair of deployments (tm/jm) for the current version of the application + GetCurrentDeploymentsForApp(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, error) + + // Deletes all old resources (deployments and services) for the app + DeleteOldResourcesForApp(ctx context.Context, app *v1alpha1.FlinkApplication) error // Attempts to find an externalized checkpoint for the job. This can be used to recover an application that is not // able to savepoint for some reason. 
@@ -105,18 +107,18 @@ func NewController(k8sCluster k8.ClusterInterface, config config.RuntimeConfig) func newControllerMetrics(scope promutils.Scope) *controllerMetrics { flinkControllerScope := scope.NewSubScope("flink_controller") return &controllerMetrics{ - scope: scope, - deleteClusterSuccessCounter: labeled.NewCounter("delete_cluster_success", "Flink cluster deleted successfully", flinkControllerScope), - deleteClusterFailedCounter: labeled.NewCounter("delete_cluster_failure", "Flink cluster deletion failed", flinkControllerScope), - applicationChangedCounter: labeled.NewCounter("app_changed_counter", "Flink application has changed", flinkControllerScope), + scope: scope, + deleteResourceSuccessCounter: labeled.NewCounter("delete_resource_success", "Flink resource deleted successfully", flinkControllerScope), + deleteResourceFailedCounter: labeled.NewCounter("delete_resource_failure", "Flink resource deletion failed", flinkControllerScope), + applicationChangedCounter: labeled.NewCounter("app_changed_counter", "Flink application has changed", flinkControllerScope), } } type controllerMetrics struct { - scope promutils.Scope - deleteClusterSuccessCounter labeled.Counter - deleteClusterFailedCounter labeled.Counter - applicationChangedCounter labeled.Counter + scope promutils.Scope + deleteResourceSuccessCounter labeled.Counter + deleteResourceFailedCounter labeled.Counter + applicationChangedCounter labeled.Counter } type Controller struct { @@ -128,7 +130,7 @@ type Controller struct { } func getURLFromApp(application *v1alpha1.FlinkApplication, hash string) string { - service := VersionedJobManagerService(application, hash) + service := VersionedJobManagerServiceName(application, hash) cfg := config.GetConfig() if cfg.UseProxy { return fmt.Sprintf(proxyURL, cfg.ProxyPort.Port, application.Namespace, service) @@ -251,39 +253,6 @@ func (f *Controller) GetSavepointStatus(ctx context.Context, application *v1alph return f.flinkClient.CheckSavepointStatus(ctx, 
getURLFromApp(application, hash), jobID, application.Spec.SavepointInfo.TriggerID) } -func (f *Controller) DeleteCluster(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) error { - if hash == "" { - return errors.New("invalid hash: must not be empty") - } - - jmDeployment := FetchJobMangerDeploymentDeleteObj(application, hash) - err := f.k8Cluster.DeleteK8Object(ctx, jmDeployment) - if err != nil { - f.metrics.deleteClusterFailedCounter.Inc(ctx) - logger.Warnf(ctx, "Failed to delete jobmanager deployment") - return err - } - - tmDeployment := FetchTaskMangerDeploymentDeleteObj(application, hash) - err = f.k8Cluster.DeleteK8Object(ctx, tmDeployment) - if err != nil { - f.metrics.deleteClusterFailedCounter.Inc(ctx) - logger.Warnf(ctx, "Failed to delete taskmanager deployment") - return err - } - - versionedJobService := FetchVersionedJobManagerServiceDeleteObj(application, hash) - err = f.k8Cluster.DeleteK8Object(ctx, versionedJobService) - if err != nil { - f.metrics.deleteClusterFailedCounter.Inc(ctx) - logger.Warnf(ctx, "Failed to delete versioned service") - return err - } - - f.metrics.deleteClusterSuccessCounter.Inc(ctx) - return nil -} - func (f *Controller) IsClusterReady(ctx context.Context, application *v1alpha1.FlinkApplication) (bool, error) { labelMap := GetAppHashSelector(application) @@ -348,52 +317,90 @@ func listToFlinkDeployment(ds []v1.Deployment, hash string) *common.FlinkDeploym return &fd } +func getCurrentHash(app *v1alpha1.FlinkApplication) string { + appHash := HashForApplication(app) + + if appHash == app.Status.FailedDeployHash { + return app.Status.DeployHash + } + return appHash +} + // Gets the current deployment and any other deployments for the application. The current deployment will be the one // that matches the FlinkApplication, unless the FailedDeployHash is set, in which case it will be the one with that // hash. 
-func (f *Controller) GetCurrentAndOldDeploymentsForApp(ctx context.Context, - application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, []common.FlinkDeployment, error) { - appLabels := k8.GetAppLabel(application.Name) - deployments, err := f.k8Cluster.GetDeploymentsWithLabel(ctx, application.Namespace, appLabels) +func (f *Controller) GetCurrentDeploymentsForApp(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, error) { + labels := k8.GetAppLabel(application.Name) + curHash := getCurrentHash(application) + labels[FlinkAppHash] = curHash + + deployments, err := f.k8Cluster.GetDeploymentsWithLabel(ctx, application.Namespace, labels) if err != nil { - return nil, nil, err + return nil, err } - byHash := map[string][]v1.Deployment{} - for _, deployment := range deployments.Items { - byHash[deployment.Labels[FlinkAppHash]] = append(byHash[deployment.Labels[FlinkAppHash]], deployment) + cur := listToFlinkDeployment(deployments.Items, curHash) + if cur != nil && application.Status.FailedDeployHash == "" && + (!f.deploymentMatches(ctx, cur.Jobmanager, application) || !f.deploymentMatches(ctx, cur.Taskmanager, application)) { + // we had a hash collision (i.e., the previous application has the same hash as the new one) + // this is *very* unlikely to occur (1/2^32) + return nil, errors.New("found hash collision for deployment, you must do a clean deploy") } - appHash := HashForApplication(application) - var curHash string + return cur, nil +} - if appHash == application.Status.FailedDeployHash { - curHash = application.Status.DeployHash - } else { - curHash = appHash +func (f *Controller) DeleteOldResourcesForApp(ctx context.Context, app *v1alpha1.FlinkApplication) error { + curHash := getCurrentHash(app) + + appLabel := k8.GetAppLabel(app.Name) + deployments, err := f.k8Cluster.GetDeploymentsWithLabel(ctx, app.Namespace, appLabel) + if err != nil { + return err } - cur := listToFlinkDeployment(byHash[curHash], curHash) - if 
cur != nil && application.Status.FailedDeployHash == "" && - (!f.deploymentMatches(ctx, cur.Jobmanager, application) || !f.deploymentMatches(ctx, cur.Taskmanager, application)) { - // we had a hash collision (i.e., the previous application has the same hash as the new one) - // this is *very* unlikely to occur (1/2^32) - return nil, nil, errors.New("found hash collision for deployment, you must do a clean deploy") + oldObjects := make([]metav1.Object, 0) + + for _, d := range deployments.Items { + if d.Labels[FlinkAppHash] != "" && + d.Labels[FlinkAppHash] != curHash && + // verify that this deployment matches the jobmanager or taskmanager naming format + (d.Name == fmt.Sprintf(JobManagerNameFormat, app.Name, d.Labels[FlinkAppHash]) || + d.Name == fmt.Sprintf(TaskManagerNameFormat, app.Name, d.Labels[FlinkAppHash])) { + oldObjects = append(oldObjects, d.DeepCopy()) + } } - old := make([]common.FlinkDeployment, 0) - for hash, ds := range byHash { - if hash != curHash { - fd := listToFlinkDeployment(ds, hash) - if fd != nil { - old = append(old, *fd) - } else { - logger.Warn(ctx, "Found deployments that do not have one JM and TM: %v", ds) - } + services, err := f.k8Cluster.GetServicesWithLabel(ctx, app.Namespace, appLabel) + if err != nil { + return err + } + + for _, d := range services.Items { + if d.Labels[FlinkAppHash] != "" && + d.Labels[FlinkAppHash] != curHash && + d.Name == VersionedJobManagerServiceName(app, d.Labels[FlinkAppHash]) { + oldObjects = append(oldObjects, d.DeepCopy()) } } - return cur, old, nil + deletedHashes := make(map[string]bool) + + for _, resource := range oldObjects { + err := f.k8Cluster.DeleteK8Object(ctx, resource.(runtime.Object)) + if err != nil { + f.metrics.deleteResourceFailedCounter.Inc(ctx) + return err + } + f.metrics.deleteResourceSuccessCounter.Inc(ctx) + deletedHashes[resource.GetLabels()[FlinkAppHash]] = true + } + + for k := range deletedHashes { + f.LogEvent(ctx, app, "", corev1.EventTypeNormal, fmt.Sprintf("Deleted old 
cluster with hash %s", k)) + } + + return nil } func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (string, error) { diff --git a/pkg/controller/flink/flink_test.go b/pkg/controller/flink/flink_test.go index 7d88f04f..2fcc1365 100644 --- a/pkg/controller/flink/flink_test.go +++ b/pkg/controller/flink/flink_test.go @@ -17,6 +17,7 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -91,44 +92,11 @@ func TestFlinkIsClusterReady(t *testing.T) { assert.Nil(t, err) } -func TestFlinkApplicationChangedReplicas(t *testing.T) { - flinkControllerForTest := getTestFlinkController() - labelMapVal := map[string]string{ - "flink-app": testAppName, - } - - flinkApp := getFlinkTestApp() - taskSlots := int32(16) - flinkApp.Spec.TaskManagerConfig.TaskSlots = &taskSlots - flinkApp.Spec.Parallelism = 8 - - mockK8Cluster := flinkControllerForTest.k8Cluster.(*k8mock.K8Cluster) - mockK8Cluster.GetDeploymentsWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) { - assert.Equal(t, testNamespace, namespace) - assert.Equal(t, labelMapVal, labelMap) - - newApp := flinkApp.DeepCopy() - newApp.Spec.Parallelism = 10 - hash := HashForApplication(newApp) - tm := *FetchTaskMangerDeploymentCreateObj(newApp, hash) - jm := *FetchJobMangerDeploymentCreateObj(newApp, hash) - - return &v1.DeploymentList{ - Items: []v1.Deployment{tm, jm}, - }, nil - } - - cur, _, err := flinkControllerForTest.GetCurrentAndOldDeploymentsForApp( - context.Background(), &flinkApp, - ) - assert.True(t, cur == nil) - assert.Nil(t, err) -} - func TestFlinkApplicationNotChanged(t *testing.T) { flinkControllerForTest := getTestFlinkController() labelMapVal := map[string]string{ - "flink-app": testAppName, + "flink-app": testAppName, + 
"flink-app-hash": testAppHash, } flinkApp := getFlinkTestApp() mockK8Cluster := flinkControllerForTest.k8Cluster.(*k8mock.K8Cluster) @@ -142,7 +110,7 @@ func TestFlinkApplicationNotChanged(t *testing.T) { }, }, nil } - cur, _, err := flinkControllerForTest.GetCurrentAndOldDeploymentsForApp( + cur, err := flinkControllerForTest.GetCurrentDeploymentsForApp( context.Background(), &flinkApp, ) assert.Nil(t, err) @@ -152,7 +120,8 @@ func TestFlinkApplicationNotChanged(t *testing.T) { func TestFlinkApplicationChanged(t *testing.T) { flinkControllerForTest := getTestFlinkController() labelMapVal := map[string]string{ - "flink-app": testAppName, + "flink-app": testAppName, + "flink-app-hash": testAppHash, } mockK8Cluster := flinkControllerForTest.k8Cluster.(*k8mock.K8Cluster) mockK8Cluster.GetDeploymentsWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) { @@ -161,7 +130,7 @@ func TestFlinkApplicationChanged(t *testing.T) { return &v1.DeploymentList{}, nil } flinkApp := getFlinkTestApp() - cur, _, err := flinkControllerForTest.GetCurrentAndOldDeploymentsForApp( + cur, err := flinkControllerForTest.GetCurrentDeploymentsForApp( context.Background(), &flinkApp, ) assert.True(t, cur == nil) @@ -175,25 +144,27 @@ func testJobPropTriggersChange(t *testing.T, changeFun func(application *v1alpha mockK8Cluster := flinkControllerForTest.k8Cluster.(*k8mock.K8Cluster) mockK8Cluster.GetDeploymentsWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) { assert.Equal(t, testNamespace, namespace) - if val, ok := labelMap["flink-app-hash"]; ok { - assert.Equal(t, testAppHash, val) - } if val, ok := labelMap["flink-app"]; ok { assert.Equal(t, testAppName, val) } + hash := HashForApplication(&flinkApp) - tm := FetchTaskMangerDeploymentCreateObj(&flinkApp, hash) - jm := FetchJobMangerDeploymentCreateObj(&flinkApp, hash) - return &v1.DeploymentList{ - Items: 
[]v1.Deployment{ - *tm, *jm, - }, - }, nil + if hash == labelMap[FlinkAppHash] { + tm := FetchTaskMangerDeploymentCreateObj(&flinkApp, hash) + jm := FetchJobMangerDeploymentCreateObj(&flinkApp, hash) + return &v1.DeploymentList{ + Items: []v1.Deployment{ + *tm, *jm, + }, + }, nil + } + + return &v1.DeploymentList{}, nil } newApp := flinkApp.DeepCopy() changeFun(newApp) - cur, _, err := flinkControllerForTest.GetCurrentAndOldDeploymentsForApp( + cur, err := flinkControllerForTest.GetCurrentDeploymentsForApp( context.Background(), newApp, ) assert.True(t, cur == nil) @@ -218,42 +189,6 @@ func TestFlinkApplicationChangedJobProps(t *testing.T) { }) } -func TestFlinkApplicationNeedsUpdate(t *testing.T) { - flinkControllerForTest := getTestFlinkController() - flinkApp := getFlinkTestApp() - - mockK8Cluster := flinkControllerForTest.k8Cluster.(*k8mock.K8Cluster) - mockK8Cluster.GetDeploymentsWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) { - assert.Equal(t, testNamespace, namespace) - if val, ok := labelMap["flink-app-hash"]; ok { - assert.Equal(t, testAppHash, val) - } - if val, ok := labelMap["flink-app"]; ok { - assert.Equal(t, testAppName, val) - } - - app := getFlinkTestApp() - jm := FetchJobMangerDeploymentCreateObj(&app, testAppHash) - tm := FetchTaskMangerDeploymentCreateObj(&app, testAppHash) - - return &v1.DeploymentList{ - Items: []v1.Deployment{ - *jm, *tm, - }, - }, nil - } - - numberOfTaskManagers := int32(2) - taskSlots := int32(2) - flinkApp.Spec.TaskManagerConfig.TaskSlots = &taskSlots - flinkApp.Spec.Parallelism = taskSlots*numberOfTaskManagers + 1 - cur, _, err := flinkControllerForTest.GetCurrentAndOldDeploymentsForApp( - context.Background(), &flinkApp, - ) - assert.True(t, cur == nil) - assert.Nil(t, err) -} - func TestFlinkIsServiceReady(t *testing.T) { flinkControllerForTest := getTestFlinkController() flinkApp := getFlinkTestApp() @@ -368,29 +303,62 @@ func 
TestGetActiveJobEmpty(t *testing.T) { assert.Nil(t, activeJob) } -func TestDeleteCluster(t *testing.T) { +func TestDeleteOldResources(t *testing.T) { flinkControllerForTest := getTestFlinkController() - flinkApp := getFlinkTestApp() - jmDeployment := FetchJobMangerDeploymentDeleteObj(&flinkApp, "hash") - tmDeployment := FetchTaskMangerDeploymentDeleteObj(&flinkApp, "hash") - service := FetchVersionedJobManagerServiceDeleteObj(&flinkApp, "hash") + app := getFlinkTestApp() + + jmDeployment := FetchTaskMangerDeploymentCreateObj(&app, "oldhash") + tmDeployment := FetchJobMangerDeploymentCreateObj(&app, "oldhash") + service := FetchJobManagerServiceCreateObj(&app, "oldhash") + service.Labels[FlinkAppHash] = "oldhash" + service.Name = VersionedJobManagerServiceName(&app, "oldhash") - ctr := 0 mockK8Cluster := flinkControllerForTest.k8Cluster.(*k8mock.K8Cluster) + + mockK8Cluster.GetDeploymentsWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) { + curJobmanager := FetchJobMangerDeploymentCreateObj(&app, testAppHash) + curTaskmanager := FetchTaskMangerDeploymentCreateObj(&app, testAppHash) + return &v1.DeploymentList{ + Items: []v1.Deployment{ + *jmDeployment, + *tmDeployment, + *curJobmanager, + *curTaskmanager, + }, + }, nil + } + + mockK8Cluster.GetServicesWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*corev1.ServiceList, error) { + curService := FetchJobManagerServiceCreateObj(&app, testAppHash) + curService.Labels[FlinkAppHash] = testAppHash + curService.Name = VersionedJobManagerServiceName(&app, testAppHash) + + generic := FetchJobManagerServiceCreateObj(&app, testAppHash) + return &corev1.ServiceList{ + Items: []corev1.Service{ + *service, + *curService, + *generic, + }, + }, nil + } + + ctr := 0 mockK8Cluster.DeleteK8ObjectFunc = func(ctx context.Context, object runtime.Object) error { ctr++ switch ctr { case 1: - assert.Equal(t, object, jmDeployment) + 
assert.Equal(t, jmDeployment, object) case 2: - assert.Equal(t, object, tmDeployment) + assert.Equal(t, tmDeployment, object) case 3: - assert.Equal(t, object, service) + assert.Equal(t, service, object) } return nil } - err := flinkControllerForTest.DeleteCluster(context.Background(), &flinkApp, "hash") + err := flinkControllerForTest.DeleteOldResourcesForApp(context.Background(), &app) + assert.Equal(t, 3, ctr) assert.Nil(t, err) } diff --git a/pkg/controller/flink/job_manager_controller.go b/pkg/controller/flink/job_manager_controller.go index 7f9f0c78..69684ff4 100644 --- a/pkg/controller/flink/job_manager_controller.go +++ b/pkg/controller/flink/job_manager_controller.go @@ -40,7 +40,7 @@ const ( FlinkInternalMetricPortName = "metrics" ) -func VersionedJobManagerService(app *v1alpha1.FlinkApplication, hash string) string { +func VersionedJobManagerServiceName(app *v1alpha1.FlinkApplication, hash string) string { return fmt.Sprintf("%s-%s", app.Name, hash) } @@ -121,7 +121,8 @@ func (j *JobManagerController) CreateIfNotExist(ctx context.Context, application // create the service for _this_ version of the flink application // this gives us a stable and reliable way to target a particular cluster during upgrades versionedJobManagerService := FetchJobManagerServiceCreateObj(application, hash) - versionedJobManagerService.Name = VersionedJobManagerService(application, hash) + versionedJobManagerService.Name = VersionedJobManagerServiceName(application, hash) + versionedJobManagerService.Labels[FlinkAppHash] = hash err = j.k8Cluster.CreateK8Object(ctx, versionedJobManagerService) if err != nil { @@ -174,19 +175,6 @@ func getJobManagerName(application *v1alpha1.FlinkApplication, hash string) stri return fmt.Sprintf(JobManagerNameFormat, applicationName, hash) } -func FetchVersionedJobManagerServiceDeleteObj(app *v1alpha1.FlinkApplication, hash string) *coreV1.Service { - return &coreV1.Service{ - TypeMeta: metaV1.TypeMeta{ - APIVersion: 
coreV1.SchemeGroupVersion.String(), - Kind: k8.Service, - }, - ObjectMeta: metaV1.ObjectMeta{ - Name: VersionedJobManagerService(app, hash), - Namespace: app.Namespace, - }, - } -} - func FetchJobManagerServiceCreateObj(app *v1alpha1.FlinkApplication, hash string) *coreV1.Service { jmServiceName := app.Name serviceLabels := getCommonAppLabels(app) @@ -204,6 +192,7 @@ func FetchJobManagerServiceCreateObj(app *v1alpha1.FlinkApplication, hash string OwnerReferences: []metaV1.OwnerReference{ *metaV1.NewControllerRef(app, app.GroupVersionKind()), }, + Labels: getCommonAppLabels(app), }, Spec: coreV1.ServiceSpec{ Ports: getJobManagerServicePorts(app), @@ -290,19 +279,6 @@ func DeploymentIsJobmanager(deployment *v1.Deployment) bool { return deployment.Labels[FlinkDeploymentType] == FlinkDeploymentTypeJobmanager } -func FetchJobMangerDeploymentDeleteObj(app *v1alpha1.FlinkApplication, hash string) *v1.Deployment { - return &v1.Deployment{ - TypeMeta: metaV1.TypeMeta{ - APIVersion: v1.SchemeGroupVersion.String(), - Kind: k8.Deployment, - }, - ObjectMeta: metaV1.ObjectMeta{ - Namespace: app.Namespace, - Name: getJobManagerName(app, hash), - }, - } -} - // Translates a FlinkApplication into a JobManager deployment. Changes to this function must be // made very carefully. 
Any new version v' that causes DeploymentsEqual(v(x), v'(x)) to be false // will cause redeployments for all applications, and should be considered a breaking change that diff --git a/pkg/controller/flink/mock/mock_flink.go b/pkg/controller/flink/mock/mock_flink.go index f11d6f4d..bc3d060c 100644 --- a/pkg/controller/flink/mock/mock_flink.go +++ b/pkg/controller/flink/mock/mock_flink.go @@ -11,7 +11,7 @@ import ( ) type CreateClusterFunc func(ctx context.Context, application *v1alpha1.FlinkApplication) error -type DeleteClusterFunc func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) error +type DeleteOldResourcesForApp func(ctx context.Context, application *v1alpha1.FlinkApplication) error type CancelWithSavepointFunc func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (string, error) type ForceCancelFunc func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) error type StartFlinkJobFunc func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string, @@ -20,38 +20,38 @@ type GetSavepointStatusFunc func(ctx context.Context, application *v1alpha1.Flin type IsClusterReadyFunc func(ctx context.Context, application *v1alpha1.FlinkApplication) (bool, error) type IsServiceReadyFunc func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (bool, error) type GetJobsForApplicationFunc func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) ([]client.FlinkJob, error) -type GetCurrentAndOldDeploymentsForAppFunc func(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, []common.FlinkDeployment, error) +type GetCurrentDeploymentsForAppFunc func(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, error) type FindExternalizedCheckpointFunc func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (string, error) type CompareAndUpdateClusterStatusFunc 
func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (bool, error) type CompareAndUpdateJobStatusFunc func(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (bool, error) type FlinkController struct { - CreateClusterFunc CreateClusterFunc - DeleteClusterFunc DeleteClusterFunc - CancelWithSavepointFunc CancelWithSavepointFunc - ForceCancelFunc ForceCancelFunc - StartFlinkJobFunc StartFlinkJobFunc - GetSavepointStatusFunc GetSavepointStatusFunc - IsClusterReadyFunc IsClusterReadyFunc - IsServiceReadyFunc IsServiceReadyFunc - GetJobsForApplicationFunc GetJobsForApplicationFunc - GetCurrentAndOldDeploymentsForAppFunc GetCurrentAndOldDeploymentsForAppFunc - FindExternalizedCheckpointFunc FindExternalizedCheckpointFunc - Events []corev1.Event - CompareAndUpdateClusterStatusFunc CompareAndUpdateClusterStatusFunc - CompareAndUpdateJobStatusFunc CompareAndUpdateJobStatusFunc + CreateClusterFunc CreateClusterFunc + DeleteOldResourcesForAppFunc DeleteOldResourcesForApp + CancelWithSavepointFunc CancelWithSavepointFunc + ForceCancelFunc ForceCancelFunc + StartFlinkJobFunc StartFlinkJobFunc + GetSavepointStatusFunc GetSavepointStatusFunc + IsClusterReadyFunc IsClusterReadyFunc + IsServiceReadyFunc IsServiceReadyFunc + GetJobsForApplicationFunc GetJobsForApplicationFunc + GetCurrentDeploymentsForAppFunc GetCurrentDeploymentsForAppFunc + FindExternalizedCheckpointFunc FindExternalizedCheckpointFunc + Events []corev1.Event + CompareAndUpdateClusterStatusFunc CompareAndUpdateClusterStatusFunc + CompareAndUpdateJobStatusFunc CompareAndUpdateJobStatusFunc } -func (m *FlinkController) GetCurrentAndOldDeploymentsForApp(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, []common.FlinkDeployment, error) { - if m.GetCurrentAndOldDeploymentsForAppFunc != nil { - return m.GetCurrentAndOldDeploymentsForAppFunc(ctx, application) +func (m *FlinkController) GetCurrentDeploymentsForApp(ctx context.Context, 
application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, error) { + if m.GetCurrentDeploymentsForAppFunc != nil { + return m.GetCurrentDeploymentsForAppFunc(ctx, application) } - return nil, nil, nil + return nil, nil } -func (m *FlinkController) DeleteCluster(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) error { - if m.DeleteClusterFunc != nil { - return m.DeleteClusterFunc(ctx, application, hash) +func (m *FlinkController) DeleteOldResourcesForApp(ctx context.Context, application *v1alpha1.FlinkApplication) error { + if m.DeleteOldResourcesForAppFunc != nil { + return m.DeleteOldResourcesForAppFunc(ctx, application) } return nil } diff --git a/pkg/controller/flink/task_manager_controller.go b/pkg/controller/flink/task_manager_controller.go index e73cb3a0..bbcbdb32 100644 --- a/pkg/controller/flink/task_manager_controller.go +++ b/pkg/controller/flink/task_manager_controller.go @@ -178,19 +178,6 @@ func DeploymentIsTaskmanager(deployment *v1.Deployment) bool { return deployment.Labels[FlinkDeploymentType] == FlinkDeploymentTypeTaskmanager } -func FetchTaskMangerDeploymentDeleteObj(app *v1alpha1.FlinkApplication, hash string) *v1.Deployment { - return &v1.Deployment{ - TypeMeta: metaV1.TypeMeta{ - APIVersion: v1.SchemeGroupVersion.String(), - Kind: k8.Deployment, - }, - ObjectMeta: metaV1.ObjectMeta{ - Namespace: app.Namespace, - Name: getTaskManagerName(app, hash), - }, - } -} - // Translates a FlinkApplication into a TaskManager deployment. Changes to this function must be // made very carefully. 
Any new version v' that causes DeploymentsEqual(v(x), v'(x)) to be false // will cause redeployments for all applications, and should be considered a breaking change that diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index b97099ed..1f9c8219 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -428,7 +428,7 @@ func (s *FlinkStateMachine) handleApplicationRunning(ctx context.Context, applic logger.Debugf(ctx, "Application running with job %v", activeJob) - cur, old, err := s.flinkController.GetCurrentAndOldDeploymentsForApp(ctx, application) + cur, err := s.flinkController.GetCurrentDeploymentsForApp(ctx, application) if err != nil { return err } @@ -441,13 +441,10 @@ func (s *FlinkStateMachine) handleApplicationRunning(ctx context.Context, applic return s.updateApplicationPhase(ctx, application, v1alpha1.FlinkApplicationUpdating) } - // If there are old deployments left-over from a previous version, clean them up - for _, fd := range old { - s.flinkController.LogEvent(ctx, application, "", corev1.EventTypeNormal, fmt.Sprintf("Deleting old cluster with hash %s", fd.Hash)) - err := s.flinkController.DeleteCluster(ctx, application, fd.Hash) - if err != nil { - return err - } + // If there are old resources left-over from a previous version, clean them up + err = s.flinkController.DeleteOldResourcesForApp(ctx, application) + if err != nil { + logger.Warnf(ctx, "Failed to clean up old resources: %v", err) } // Update status of the cluster diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go index a10b2a2e..c807ee7d 100644 --- a/pkg/controller/flinkapplication/flink_state_machine_test.go +++ b/pkg/controller/flinkapplication/flink_state_machine_test.go @@ -90,11 +90,11 @@ func TestHandleStartingDual(t *testing.T) { return true, nil } -
mockFlinkController.GetCurrentAndOldDeploymentsForAppFunc = func(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, []common.FlinkDeployment, error) { + mockFlinkController.GetCurrentDeploymentsForAppFunc = func(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, error) { fd := testFlinkDeployment(application) fd.Taskmanager.Status.AvailableReplicas = 2 fd.Jobmanager.Status.AvailableReplicas = 1 - return &fd, nil, nil + return &fd, nil } mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) @@ -444,9 +444,9 @@ func TestHandleApplicationNotReady(t *testing.T) { func TestHandleApplicationRunning(t *testing.T) { stateMachineForTest := getTestStateMachine() mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) - mockFlinkController.GetCurrentAndOldDeploymentsForAppFunc = func(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, []common.FlinkDeployment, error) { + mockFlinkController.GetCurrentDeploymentsForAppFunc = func(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, error) { fd := testFlinkDeployment(application) - return &fd, nil, nil + return &fd, nil } mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) @@ -466,8 +466,8 @@ func TestRunningToClusterStarting(t *testing.T) { updateInvoked := false stateMachineForTest := getTestStateMachine() mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController) - mockFlinkController.GetCurrentAndOldDeploymentsForAppFunc = func(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, []common.FlinkDeployment, error) { - return nil, []common.FlinkDeployment{testFlinkDeployment(application)}, nil + mockFlinkController.GetCurrentDeploymentsForAppFunc = func(ctx context.Context, application *v1alpha1.FlinkApplication) (*common.FlinkDeployment, error) { + return nil, nil } 
mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster) diff --git a/pkg/controller/k8/cluster.go b/pkg/controller/k8/cluster.go index a3af3794..a4f4a811 100644 --- a/pkg/controller/k8/cluster.go +++ b/pkg/controller/k8/cluster.go @@ -29,6 +29,7 @@ type ClusterInterface interface { // Tries to fetch the value from the controller runtime manager cache, if it does not exist, call API server GetService(ctx context.Context, namespace string, name string) (*coreV1.Service, error) + GetServicesWithLabel(ctx context.Context, namespace string, labelMap map[string]string) (*coreV1.ServiceList, error) CreateK8Object(ctx context.Context, object runtime.Object) error UpdateK8Object(ctx context.Context, object runtime.Object) error @@ -99,6 +100,32 @@ func (k *Cluster) GetDeploymentsWithLabel(ctx context.Context, namespace string, return deploymentList, nil } +func (k *Cluster) GetServicesWithLabel(ctx context.Context, namespace string, labelMap map[string]string) (*coreV1.ServiceList, error) { + serviceList := &coreV1.ServiceList{ + TypeMeta: metav1.TypeMeta{ + APIVersion: coreV1.SchemeGroupVersion.String(), + Kind: Service, + }, + } + labelSelector := labels.SelectorFromSet(labelMap) + options := &client.ListOptions{ + LabelSelector: labelSelector, + } + err := k.cache.List(ctx, options, serviceList) + if err != nil { + if !IsK8sObjectDoesNotExist(err) { + logger.Warnf(ctx, "Failed to list services from cache %v", err) + return nil, err + } + err = k.client.List(ctx, options, serviceList) + if err != nil { + logger.Warnf(ctx, "Failed to list services %v", err) + return nil, err + } + } + return serviceList, nil +} + func (k *Cluster) CreateK8Object(ctx context.Context, object runtime.Object) error { objCreate := object.DeepCopyObject() return k.client.Create(ctx, objCreate) diff --git a/pkg/controller/k8/mock/mock_k8.go b/pkg/controller/k8/mock/mock_k8.go index cb43f207..66196d7b 100644 --- a/pkg/controller/k8/mock/mock_k8.go +++ b/pkg/controller/k8/mock/mock_k8.go
@@ -11,12 +11,14 @@ import ( type GetDeploymentsWithLabelFunc func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) type CreateK8ObjectFunc func(ctx context.Context, object runtime.Object) error type GetServiceFunc func(ctx context.Context, namespace string, name string) (*corev1.Service, error) +type GetServiceWithLabelFunc func(ctx context.Context, namespace string, labelMap map[string]string) (*corev1.ServiceList, error) type UpdateK8ObjectFunc func(ctx context.Context, object runtime.Object) error type DeleteK8ObjectFunc func(ctx context.Context, object runtime.Object) error type K8Cluster struct { GetDeploymentsWithLabelFunc GetDeploymentsWithLabelFunc GetServiceFunc GetServiceFunc + GetServicesWithLabelFunc GetServiceWithLabelFunc CreateK8ObjectFunc CreateK8ObjectFunc UpdateK8ObjectFunc UpdateK8ObjectFunc DeleteK8ObjectFunc DeleteK8ObjectFunc @@ -29,6 +31,13 @@ func (m *K8Cluster) GetDeploymentsWithLabel(ctx context.Context, namespace strin return nil, nil } +func (m *K8Cluster) GetServicesWithLabel(ctx context.Context, namespace string, labelMap map[string]string) (*corev1.ServiceList, error) { + if m.GetServicesWithLabelFunc != nil { + return m.GetServicesWithLabelFunc(ctx, namespace, labelMap) + } + return nil, nil +} + func (m *K8Cluster) GetService(ctx context.Context, namespace string, name string) (*corev1.Service, error) { if m.GetServiceFunc != nil { return m.GetServiceFunc(ctx, namespace, name)