diff --git a/pkg/controller/flink/flink.go b/pkg/controller/flink/flink.go index 8025eb7f..ab61b9c5 100644 --- a/pkg/controller/flink/flink.go +++ b/pkg/controller/flink/flink.go @@ -156,6 +156,17 @@ func GetActiveFlinkJob(jobs []client.FlinkJob) *client.FlinkJob { return nil } +// returns the deployments for the application +func (f *Controller) getDeploymentsForApplication(ctx context.Context, application *v1alpha1.FlinkApplication) (*v1.DeploymentList, error) { + labelMap := GetAppHashSelector(application) + depList, err := f.k8Cluster.GetDeploymentsWithLabel(ctx, application.Namespace, labelMap) + if err != nil { + logger.Warnf(ctx, "Failed to get deployments for label map %v", labelMap) + return nil, err + } + return depList, nil +} + // returns true iff the deployment exactly matches the flink application func (f *Controller) deploymentMatches(ctx context.Context, deployment *v1.Deployment, application *v1alpha1.FlinkApplication) bool { if DeploymentIsTaskmanager(deployment) { @@ -258,15 +269,12 @@ func (f *Controller) GetSavepointStatus(ctx context.Context, application *v1alph } func (f *Controller) IsClusterReady(ctx context.Context, application *v1alpha1.FlinkApplication) (bool, error) { - labelMap := GetAppHashSelector(application) - - deploymentList, err := f.k8Cluster.GetDeploymentsWithLabel(ctx, application.Namespace, labelMap) + deploymentList, err := f.getDeploymentsForApplication(ctx, application) if err != nil { - logger.Warnf(ctx, "Failed to get deployments for label map %v", labelMap) return false, err } if deploymentList == nil || len(deploymentList.Items) == 0 { - logger.Infof(ctx, "No deployments present for label map %v", labelMap) + logger.Infof(ctx, "No deployments present for application") return false, nil } @@ -440,36 +448,41 @@ func (f *Controller) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplicatio // Gets and updates the cluster status func (f *Controller) CompareAndUpdateClusterStatus(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (bool, error) { + // Error retrieving cluster / taskmanagers overview (after startup/readiness) --> Red + // If there is an error this loop will return with Health set to Red oldClusterStatus := application.Status.ClusterStatus - clusterErrors := "" + application.Status.ClusterStatus.Health = v1alpha1.Red + + deploymentList, err := f.getDeploymentsForApplication(ctx, application) + if err != nil || deploymentList == nil { + return false, err + } + tmDeployment := getTaskManagerDeployment(deploymentList.Items, application) + if tmDeployment == nil { + logger.Error(ctx, "Unable to find task manager deployment") + return false, nil + } + application.Status.ClusterStatus.NumberOfTaskManagers = tmDeployment.Status.AvailableReplicas // Get Cluster overview response, err := f.flinkClient.GetClusterOverview(ctx, getURLFromApp(application, hash)) - if err != nil { - clusterErrors = err.Error() - } else { - // Update cluster overview - application.Status.ClusterStatus.NumberOfTaskManagers = response.TaskManagerCount - application.Status.ClusterStatus.AvailableTaskSlots = response.SlotsAvailable - application.Status.ClusterStatus.NumberOfTaskSlots = response.NumberOfTaskSlots + return false, err } + // Update cluster overview + application.Status.ClusterStatus.AvailableTaskSlots = response.SlotsAvailable + application.Status.ClusterStatus.NumberOfTaskSlots = response.NumberOfTaskSlots // Get Healthy Taskmanagers tmResponse, tmErr := f.flinkClient.GetTaskManagers(ctx, getURLFromApp(application, hash)) if tmErr != nil { - clusterErrors += tmErr.Error() - } else { - application.Status.ClusterStatus.HealthyTaskManagers = getHealthyTaskManagerCount(tmResponse) + return false, tmErr } + application.Status.ClusterStatus.HealthyTaskManagers = getHealthyTaskManagerCount(tmResponse) + // Determine Health of the cluster. - // Error retrieving cluster / taskmanagers overview (after startup/readiness) --> Red // Healthy TaskManagers == Number of taskmanagers --> Green // Else --> Yellow - if clusterErrors != "" && (application.Status.Phase != v1alpha1.FlinkApplicationClusterStarting && - application.Status.Phase != v1alpha1.FlinkApplicationSubmittingJob) { - application.Status.ClusterStatus.Health = v1alpha1.Red - return false, errors.New(clusterErrors) - } else if application.Status.ClusterStatus.HealthyTaskManagers == application.Status.ClusterStatus.NumberOfTaskManagers { + if application.Status.ClusterStatus.HealthyTaskManagers == tmDeployment.Status.Replicas { application.Status.ClusterStatus.Health = v1alpha1.Green } else { application.Status.ClusterStatus.Health = v1alpha1.Yellow diff --git a/pkg/controller/flink/flink_test.go b/pkg/controller/flink/flink_test.go index 9f3c4e08..267ffd0f 100644 --- a/pkg/controller/flink/flink_test.go +++ b/pkg/controller/flink/flink_test.go @@ -79,10 +79,10 @@ func TestFlinkIsClusterReady(t *testing.T) { mockK8Cluster.GetDeploymentsWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) { assert.Equal(t, testNamespace, namespace) assert.Equal(t, labelMapVal, labelMap) - jmDeployment := FetchTaskMangerDeploymentCreateObj(&flinkApp, testAppHash) + jmDeployment := FetchJobMangerDeploymentCreateObj(&flinkApp, testAppHash) jmDeployment.Status.AvailableReplicas = 1 - tmDeployment := FetchJobMangerDeploymentCreateObj(&flinkApp, testAppHash) + tmDeployment := FetchTaskMangerDeploymentCreateObj(&flinkApp, testAppHash) tmDeployment.Status.AvailableReplicas = *tmDeployment.Spec.Replicas return &v1.DeploymentList{ Items: []v1.Deployment{ @@ -562,6 +562,25 @@ func TestClusterStatusUpdated(t *testing.T) { flinkControllerForTest := getTestFlinkController() flinkApp := getFlinkTestApp() + labelMapVal := map[string]string{ + "flink-app-hash": testAppHash, + } + mockK8Cluster := flinkControllerForTest.k8Cluster.(*k8mock.K8Cluster) + mockK8Cluster.GetDeploymentsWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) { + assert.Equal(t, testNamespace, namespace) + assert.Equal(t, labelMapVal, labelMap) + + tmDeployment := FetchTaskMangerDeploymentCreateObj(&flinkApp, testAppHash) + tmDeployment.Status.AvailableReplicas = *tmDeployment.Spec.Replicas + tmDeployment.Status.Replicas = *tmDeployment.Spec.Replicas + + return &v1.DeploymentList{ + Items: []v1.Deployment{ + *tmDeployment, + }, + }, nil + } + mockJmClient := flinkControllerForTest.flinkClient.(*clientMock.JobManagerClient) mockJmClient.GetClusterOverviewFunc = func(ctx context.Context, url string) (*client.ClusterOverviewResponse, error) { assert.Equal(t, url, "http://app-name-hash.ns:8081") @@ -602,6 +621,19 @@ func TestNoClusterStatusChange(t *testing.T) { flinkApp.Status.ClusterStatus.HealthyTaskManagers = int32(1) flinkApp.Status.ClusterStatus.Health = v1alpha1.Green flinkApp.Status.ClusterStatus.NumberOfTaskManagers = int32(1) + mockK8Cluster := flinkControllerForTest.k8Cluster.(*k8mock.K8Cluster) + mockK8Cluster.GetDeploymentsWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) { + tmDeployment := FetchTaskMangerDeploymentCreateObj(&flinkApp, testAppHash) + tmDeployment.Status.AvailableReplicas = 1 + tmDeployment.Status.Replicas = 1 + + return &v1.DeploymentList{ + Items: []v1.Deployment{ + *tmDeployment, + }, + }, nil + } + mockJmClient := flinkControllerForTest.flinkClient.(*clientMock.JobManagerClient) mockJmClient.GetClusterOverviewFunc = func(ctx context.Context, url string) (*client.ClusterOverviewResponse, error) { assert.Equal(t, url, "http://app-name-hash.ns:8081") @@ -624,7 +656,6 @@ func TestNoClusterStatusChange(t *testing.T) { }, }, nil } - hasClusterStatusChanged, err := flinkControllerForTest.CompareAndUpdateClusterStatus(context.Background(), &flinkApp, "hash") assert.Nil(t, err) assert.False(t, hasClusterStatusChanged) @@ -634,6 +665,19 @@ func TestHealthyTaskmanagers(t *testing.T) { flinkControllerForTest := getTestFlinkController() flinkApp := getFlinkTestApp() + mockK8Cluster := flinkControllerForTest.k8Cluster.(*k8mock.K8Cluster) + mockK8Cluster.GetDeploymentsWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) { + tmDeployment := FetchTaskMangerDeploymentCreateObj(&flinkApp, testAppHash) + tmDeployment.Status.AvailableReplicas = 1 + tmDeployment.Status.Replicas = 1 + + return &v1.DeploymentList{ + Items: []v1.Deployment{ + *tmDeployment, + }, + }, nil + } + mockJmClient := flinkControllerForTest.flinkClient.(*clientMock.JobManagerClient) mockJmClient.GetClusterOverviewFunc = func(ctx context.Context, url string) (*client.ClusterOverviewResponse, error) { diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go index b1f22c8a..d9c9076c 100644 --- a/pkg/controller/flinkapplication/flink_state_machine.go +++ b/pkg/controller/flinkapplication/flink_state_machine.go @@ -342,6 +342,12 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1alph return err } + // Update status of the cluster + _, clusterErr := s.flinkController.CompareAndUpdateClusterStatus(ctx, app, hash) + if clusterErr != nil { + logger.Errorf(ctx, "Updating cluster status failed with error: %v", clusterErr) + } + activeJob, err := s.submitJobIfNeeded(ctx, app, hash, app.Spec.JarName, app.Spec.Parallelism, app.Spec.EntryClass, app.Spec.ProgramArgs) if err != nil {