Skip to content

Commit

Permalink
[STRMCMP-1133] Fix for handling nil deployments (#219)
Browse files Browse the repository at this point in the history
  • Loading branch information
premsantosh authored Oct 16, 2020
1 parent 236fe19 commit e71e249
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/controller/flinkapplication/flink_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,9 @@ func (s *FlinkStateMachine) updateGenericService(ctx context.Context, app *v1bet
}

deployments, err := s.flinkController.GetDeploymentsForHash(ctx, app, newHash)
if deployments == nil {
return errors.New("Could not find deployments for service " + service.Name)
}
if err != nil {
return err
}
Expand Down
93 changes: 93 additions & 0 deletions pkg/controller/flinkapplication/flink_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,99 @@ func TestSubmittingToRunning(t *testing.T) {
assert.Equal(t, 2, statusUpdateCount)
}

func TestHandleNilDeployments(t *testing.T) {
jobID := "j1"

app := v1beta1.FlinkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "flink",
},
Spec: v1beta1.FlinkApplicationSpec{
JarName: "job.jar",
Parallelism: 5,
EntryClass: "com.my.Class",
ProgramArgs: "--test",
},
Status: v1beta1.FlinkApplicationStatus{
Phase: v1beta1.FlinkApplicationSubmittingJob,
DeployHash: "old-hash",
},
}

stateMachineForTest := getTestStateMachine()
mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
mockFlinkController.IsServiceReadyFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (bool, error) {
return true, nil
}

mockFlinkController.GetJobForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) {
return &client.FlinkJobOverview{
JobID: jobID,
State: client.Running,
}, nil
}

startCount := 0
mockFlinkController.StartFlinkJobFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string,
jarName string, parallelism int32, entryClass string, programArgs string, allowNonRestoredState bool, savepointPath string) (string, error) {
startCount++
return jobID, nil
}

mockFlinkController.GetJobsForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) ([]client.FlinkJob, error) {
if startCount > 0 {
return []client.FlinkJob{
{
JobID: jobID,
Status: client.Running,
},
}, nil
}
return nil, nil
}

mockFlinkController.GetDeploymentsForHashFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (deployment *common.FlinkDeployment, err error) {
return nil, nil
}

mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster)

getServiceCount := 0
mockK8Cluster.GetServiceFunc = func(ctx context.Context, namespace string, name string, version string) (*v1.Service, error) {
assert.Equal(t, "flink", namespace)
assert.Equal(t, "test-app", name)

getServiceCount++
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-app",
Namespace: "flink",
},
Spec: v1.ServiceSpec{
Selector: map[string]string{
"pod-deployment-selector": "blah",
},
},
}, nil
}

updateCount := 0
mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error {
updateCount++
return nil
}

statusUpdateCount := 0
mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error {
statusUpdateCount++
return nil
}

err := stateMachineForTest.Handle(context.Background(), &app)
assert.Error(t, err)
}

func TestHandleApplicationRunning(t *testing.T) {
stateMachineForTest := getTestStateMachine()
mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
Expand Down

0 comments on commit e71e249

Please sign in to comment.