
Update logic around number and healthy task managers in status (#37)
anandswaminathan committed Jun 27, 2019
1 parent 6d92339 commit cde4f38
Showing 3 changed files with 88 additions and 25 deletions.
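
Before the file-by-file diff, here is a minimal sketch (not part of the commit) of the status flow this change moves to: health starts out Red, the TaskManager count now comes from the TaskManager deployment's available replicas rather than the JobManager's cluster overview, any error talking to the JobManager leaves the status Red, and otherwise the cluster is Green only when the healthy TaskManager count matches the deployment's replica count. The type names and the helper below are simplified stand-ins, not the operator's actual v1alpha1 API.

package main

import "fmt"

// HealthStatus is a simplified stand-in for the operator's cluster health enum.
type HealthStatus string

const (
    Green  HealthStatus = "Green"
    Yellow HealthStatus = "Yellow"
    Red    HealthStatus = "Red"
)

// deriveClusterHealth mirrors the new CompareAndUpdateClusterStatus flow:
// any error while querying the JobManager leaves the health Red (the real
// code sets Red up front and returns early), and otherwise the healthy
// TaskManager count is compared against the TaskManager deployment's
// replica count to choose Green or Yellow.
func deriveClusterHealth(overviewErr, taskManagerErr error, healthyTaskManagers, deploymentReplicas int32) HealthStatus {
    if overviewErr != nil || taskManagerErr != nil {
        return Red
    }
    if healthyTaskManagers == deploymentReplicas {
        return Green
    }
    return Yellow
}

func main() {
    fmt.Println(deriveClusterHealth(nil, nil, 2, 2))                              // Green: all replicas healthy
    fmt.Println(deriveClusterHealth(nil, nil, 1, 2))                              // Yellow: some TaskManagers unhealthy
    fmt.Println(deriveClusterHealth(fmt.Errorf("connection refused"), nil, 0, 2)) // Red: cluster overview failed
}
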
57 changes: 35 additions & 22 deletions pkg/controller/flink/flink.go
@@ -156,6 +156,17 @@ func GetActiveFlinkJob(jobs []client.FlinkJob) *client.FlinkJob {
 	return nil
 }
 
+// returns the deployments for the application
+func (f *Controller) getDeploymentsForApplication(ctx context.Context, application *v1alpha1.FlinkApplication) (*v1.DeploymentList, error) {
+	labelMap := GetAppHashSelector(application)
+	depList, err := f.k8Cluster.GetDeploymentsWithLabel(ctx, application.Namespace, labelMap)
+	if err != nil {
+		logger.Warnf(ctx, "Failed to get deployments for label map %v", labelMap)
+		return nil, err
+	}
+	return depList, nil
+}
+
 // returns true iff the deployment exactly matches the flink application
 func (f *Controller) deploymentMatches(ctx context.Context, deployment *v1.Deployment, application *v1alpha1.FlinkApplication) bool {
 	if DeploymentIsTaskmanager(deployment) {
@@ -258,15 +269,12 @@ func (f *Controller) GetSavepointStatus(ctx context.Context, application *v1alph
 }
 
 func (f *Controller) IsClusterReady(ctx context.Context, application *v1alpha1.FlinkApplication) (bool, error) {
-	labelMap := GetAppHashSelector(application)
-
-	deploymentList, err := f.k8Cluster.GetDeploymentsWithLabel(ctx, application.Namespace, labelMap)
+	deploymentList, err := f.getDeploymentsForApplication(ctx, application)
 	if err != nil {
-		logger.Warnf(ctx, "Failed to get deployments for label map %v", labelMap)
 		return false, err
 	}
 	if deploymentList == nil || len(deploymentList.Items) == 0 {
-		logger.Infof(ctx, "No deployments present for label map %v", labelMap)
+		logger.Infof(ctx, "No deployments present for application")
 		return false, nil
 	}
 
@@ -440,36 +448,41 @@ func (f *Controller) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplicatio
 
 // Gets and updates the cluster status
 func (f *Controller) CompareAndUpdateClusterStatus(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (bool, error) {
+	// Error retrieving cluster / taskmanagers overview (after startup/readiness) --> Red
+	// If there is an error this loop will return with Health set to Red
 	oldClusterStatus := application.Status.ClusterStatus
-	clusterErrors := ""
+	application.Status.ClusterStatus.Health = v1alpha1.Red
+
+	deploymentList, err := f.getDeploymentsForApplication(ctx, application)
+	if err != nil || deploymentList == nil {
+		return false, err
+	}
+	tmDeployment := getTaskManagerDeployment(deploymentList.Items, application)
+	if tmDeployment == nil {
+		logger.Error(ctx, "Unable to find task manager deployment")
+		return false, nil
+	}
+	application.Status.ClusterStatus.NumberOfTaskManagers = tmDeployment.Status.AvailableReplicas
 	// Get Cluster overview
 	response, err := f.flinkClient.GetClusterOverview(ctx, getURLFromApp(application, hash))
 
 	if err != nil {
-		clusterErrors = err.Error()
-	} else {
-		// Update cluster overview
-		application.Status.ClusterStatus.NumberOfTaskManagers = response.TaskManagerCount
-		application.Status.ClusterStatus.AvailableTaskSlots = response.SlotsAvailable
-		application.Status.ClusterStatus.NumberOfTaskSlots = response.NumberOfTaskSlots
+		return false, err
 	}
+	// Update cluster overview
+	application.Status.ClusterStatus.AvailableTaskSlots = response.SlotsAvailable
+	application.Status.ClusterStatus.NumberOfTaskSlots = response.NumberOfTaskSlots
 
 	// Get Healthy Taskmanagers
 	tmResponse, tmErr := f.flinkClient.GetTaskManagers(ctx, getURLFromApp(application, hash))
 	if tmErr != nil {
-		clusterErrors += tmErr.Error()
-	} else {
-		application.Status.ClusterStatus.HealthyTaskManagers = getHealthyTaskManagerCount(tmResponse)
+		return false, tmErr
 	}
+	application.Status.ClusterStatus.HealthyTaskManagers = getHealthyTaskManagerCount(tmResponse)
 
 	// Determine Health of the cluster.
-	// Error retrieving cluster / taskmanagers overview (after startup/readiness) --> Red
 	// Healthy TaskManagers == Number of taskmanagers --> Green
 	// Else --> Yellow
-	if clusterErrors != "" && (application.Status.Phase != v1alpha1.FlinkApplicationClusterStarting &&
-		application.Status.Phase != v1alpha1.FlinkApplicationSubmittingJob) {
-		application.Status.ClusterStatus.Health = v1alpha1.Red
-		return false, errors.New(clusterErrors)
-	} else if application.Status.ClusterStatus.HealthyTaskManagers == application.Status.ClusterStatus.NumberOfTaskManagers {
+	if application.Status.ClusterStatus.HealthyTaskManagers == tmDeployment.Status.Replicas {
 		application.Status.ClusterStatus.Health = v1alpha1.Green
 	} else {
 		application.Status.ClusterStatus.Health = v1alpha1.Yellow
50 changes: 47 additions & 3 deletions pkg/controller/flink/flink_test.go
@@ -79,10 +79,10 @@ func TestFlinkIsClusterReady(t *testing.T) {
 	mockK8Cluster.GetDeploymentsWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) {
 		assert.Equal(t, testNamespace, namespace)
 		assert.Equal(t, labelMapVal, labelMap)
-		jmDeployment := FetchTaskMangerDeploymentCreateObj(&flinkApp, testAppHash)
+		jmDeployment := FetchJobMangerDeploymentCreateObj(&flinkApp, testAppHash)
 		jmDeployment.Status.AvailableReplicas = 1
 
-		tmDeployment := FetchJobMangerDeploymentCreateObj(&flinkApp, testAppHash)
+		tmDeployment := FetchTaskMangerDeploymentCreateObj(&flinkApp, testAppHash)
 		tmDeployment.Status.AvailableReplicas = *tmDeployment.Spec.Replicas
 		return &v1.DeploymentList{
 			Items: []v1.Deployment{
@@ -562,6 +562,25 @@ func TestClusterStatusUpdated(t *testing.T) {
 	flinkControllerForTest := getTestFlinkController()
 	flinkApp := getFlinkTestApp()
 
+	labelMapVal := map[string]string{
+		"flink-app-hash": testAppHash,
+	}
+	mockK8Cluster := flinkControllerForTest.k8Cluster.(*k8mock.K8Cluster)
+	mockK8Cluster.GetDeploymentsWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) {
+		assert.Equal(t, testNamespace, namespace)
+		assert.Equal(t, labelMapVal, labelMap)
+
+		tmDeployment := FetchTaskMangerDeploymentCreateObj(&flinkApp, testAppHash)
+		tmDeployment.Status.AvailableReplicas = *tmDeployment.Spec.Replicas
+		tmDeployment.Status.Replicas = *tmDeployment.Spec.Replicas
+
+		return &v1.DeploymentList{
+			Items: []v1.Deployment{
+				*tmDeployment,
+			},
+		}, nil
+	}
+
 	mockJmClient := flinkControllerForTest.flinkClient.(*clientMock.JobManagerClient)
 	mockJmClient.GetClusterOverviewFunc = func(ctx context.Context, url string) (*client.ClusterOverviewResponse, error) {
 		assert.Equal(t, url, "http://app-name-hash.ns:8081")
@@ -602,6 +621,19 @@ func TestNoClusterStatusChange(t *testing.T) {
 	flinkApp.Status.ClusterStatus.HealthyTaskManagers = int32(1)
 	flinkApp.Status.ClusterStatus.Health = v1alpha1.Green
 	flinkApp.Status.ClusterStatus.NumberOfTaskManagers = int32(1)
+	mockK8Cluster := flinkControllerForTest.k8Cluster.(*k8mock.K8Cluster)
+	mockK8Cluster.GetDeploymentsWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) {
+		tmDeployment := FetchTaskMangerDeploymentCreateObj(&flinkApp, testAppHash)
+		tmDeployment.Status.AvailableReplicas = 1
+		tmDeployment.Status.Replicas = 1
+
+		return &v1.DeploymentList{
+			Items: []v1.Deployment{
+				*tmDeployment,
+			},
+		}, nil
+	}
+
 	mockJmClient := flinkControllerForTest.flinkClient.(*clientMock.JobManagerClient)
 	mockJmClient.GetClusterOverviewFunc = func(ctx context.Context, url string) (*client.ClusterOverviewResponse, error) {
 		assert.Equal(t, url, "http://app-name-hash.ns:8081")
@@ -624,7 +656,6 @@ func TestNoClusterStatusChange(t *testing.T) {
 			},
 		}, nil
 	}
-
 	hasClusterStatusChanged, err := flinkControllerForTest.CompareAndUpdateClusterStatus(context.Background(), &flinkApp, "hash")
 	assert.Nil(t, err)
 	assert.False(t, hasClusterStatusChanged)
@@ -634,6 +665,19 @@ func TestHealthyTaskmanagers(t *testing.T) {
 	flinkControllerForTest := getTestFlinkController()
 	flinkApp := getFlinkTestApp()
 
+	mockK8Cluster := flinkControllerForTest.k8Cluster.(*k8mock.K8Cluster)
+	mockK8Cluster.GetDeploymentsWithLabelFunc = func(ctx context.Context, namespace string, labelMap map[string]string) (*v1.DeploymentList, error) {
+		tmDeployment := FetchTaskMangerDeploymentCreateObj(&flinkApp, testAppHash)
+		tmDeployment.Status.AvailableReplicas = 1
+		tmDeployment.Status.Replicas = 1
+
+		return &v1.DeploymentList{
+			Items: []v1.Deployment{
+				*tmDeployment,
+			},
+		}, nil
+	}
+
 	mockJmClient := flinkControllerForTest.flinkClient.(*clientMock.JobManagerClient)
 
 	mockJmClient.GetClusterOverviewFunc = func(ctx context.Context, url string) (*client.ClusterOverviewResponse, error) {
6 changes: 6 additions & 0 deletions pkg/controller/flinkapplication/flink_state_machine.go
@@ -342,6 +342,12 @@ func (s *FlinkStateMachine) handleSubmittingJob(ctx context.Context, app *v1alph
 		return err
 	}
 
+	// Update status of the cluster
+	_, clusterErr := s.flinkController.CompareAndUpdateClusterStatus(ctx, app, hash)
+	if clusterErr != nil {
+		logger.Errorf(ctx, "Updating cluster status failed with error: %v", clusterErr)
+	}
+
 	activeJob, err := s.submitJobIfNeeded(ctx, app, hash,
 		app.Spec.JarName, app.Spec.Parallelism, app.Spec.EntryClass, app.Spec.ProgramArgs)
 	if err != nil {
