Skip to content

Commit

Permalink
[STRMCMP-437] Making the checkpoint restoration age configurable (#151)
Browse files Browse the repository at this point in the history
* [STRMCMP-437] Making the checkpoint restoration age configurable

* [STRMCMP-437] Fixing lint issues

* [STRMCMP-437] Updating docs and adding unit test

* [STRMCMP-437] Updating the docs on maxCheckpointRestoreAgeSeconds
  • Loading branch information
premsantosh authored Jan 3, 2020
1 parent 2a5baaf commit 7fc2230
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 27 deletions.
3 changes: 3 additions & 0 deletions deploy/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ spec:
type: string
savepointPath:
type: string
maxCheckpointRestoreAgeSeconds:
type: integer
minimum: 1
jobManagerConfig:
type: object
properties:
Expand Down
5 changes: 5 additions & 0 deletions docs/crd.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,8 @@ Below is the list of fields in the custom resource and their description
Can be set to true to force rollback a deploy/update. The rollback is **not** performed when the application is in a **RUNNING** phase.
If an application is successfully rolled back, it is moved to a *DeployFailed* phase. Un-setting or setting `ForceRollback` to `False` will allow updates to progress normally.

* **maxCheckpointRestoreAgeSeconds** `type:int32`
Used to prevent the application from restoring state from a checkpoint whose age (in seconds) is greater than the value set. It defaults to 1 hour (3600 seconds). This config
is used during the operator update workflow. This default exists only
to protect one from accidentally restarting the application using a very old checkpoint (which might put your application
under huge load). **Note:** this doesn't affect the flink application's checkpointing mechanism in anyway.
29 changes: 15 additions & 14 deletions pkg/apis/app/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,21 @@ type FlinkApplicationSpec struct {
EntryClass string `json:"entryClass,omitempty"`
ProgramArgs string `json:"programArgs,omitempty"`
// Deprecated: use SavepointPath instead
SavepointInfo SavepointInfo `json:"savepointInfo,omitempty"`
SavepointPath string `json:"savepointPath,omitempty"`
DeploymentMode DeploymentMode `json:"deploymentMode,omitempty"`
RPCPort *int32 `json:"rpcPort,omitempty"`
BlobPort *int32 `json:"blobPort,omitempty"`
QueryPort *int32 `json:"queryPort,omitempty"`
UIPort *int32 `json:"uiPort,omitempty"`
MetricsQueryPort *int32 `json:"metricsQueryPort,omitempty"`
Volumes []apiv1.Volume `json:"volumes,omitempty"`
VolumeMounts []apiv1.VolumeMount `json:"volumeMounts,omitempty"`
RestartNonce string `json:"restartNonce"`
DeleteMode DeleteMode `json:"deleteMode,omitempty"`
AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"`
ForceRollback bool `json:"forceRollback"`
SavepointInfo SavepointInfo `json:"savepointInfo,omitempty"`
SavepointPath string `json:"savepointPath,omitempty"`
DeploymentMode DeploymentMode `json:"deploymentMode,omitempty"`
RPCPort *int32 `json:"rpcPort,omitempty"`
BlobPort *int32 `json:"blobPort,omitempty"`
QueryPort *int32 `json:"queryPort,omitempty"`
UIPort *int32 `json:"uiPort,omitempty"`
MetricsQueryPort *int32 `json:"metricsQueryPort,omitempty"`
Volumes []apiv1.Volume `json:"volumes,omitempty"`
VolumeMounts []apiv1.VolumeMount `json:"volumeMounts,omitempty"`
RestartNonce string `json:"restartNonce"`
DeleteMode DeleteMode `json:"deleteMode,omitempty"`
AllowNonRestoredState bool `json:"allowNonRestoredState,omitempty"`
ForceRollback bool `json:"forceRollback"`
MaxCheckpointRestoreAgeSeconds *int32 `json:"maxCheckpointRestoreAgeSeconds,omitempty"`
}

type FlinkConfig map[string]interface{}
Expand Down
23 changes: 14 additions & 9 deletions pkg/controller/flink/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import (
)

const (
JobManagerDefaultReplicaCount = 1
TaskManagerDefaultSlots = 16
RPCDefaultPort = 6123
QueryDefaultPort = 6124
BlobDefaultPort = 6125
UIDefaultPort = 8081
MetricsQueryDefaultPort = 50101
OffHeapMemoryDefaultFraction = 0.5
HighAvailabilityKey = "high-availability"
JobManagerDefaultReplicaCount = 1
TaskManagerDefaultSlots = 16
RPCDefaultPort = 6123
QueryDefaultPort = 6124
BlobDefaultPort = 6125
UIDefaultPort = 8081
MetricsQueryDefaultPort = 50101
OffHeapMemoryDefaultFraction = 0.5
HighAvailabilityKey = "high-availability"
MaxCheckpointRestoreAgeSeconds = 3600
)

func firstNonNil(x *int32, y int32) int32 {
Expand Down Expand Up @@ -67,6 +68,10 @@ func getInternalMetricsQueryPort(app *v1beta1.FlinkApplication) int32 {
return firstNonNil(app.Spec.MetricsQueryPort, MetricsQueryDefaultPort)
}

func getMaxCheckpointRestoreAgeSeconds(app *v1beta1.FlinkApplication) int32 {
return firstNonNil(app.Spec.MaxCheckpointRestoreAgeSeconds, MaxCheckpointRestoreAgeSeconds)
}

func getTaskManagerMemory(application *v1beta1.FlinkApplication) int64 {
tmResources := application.Spec.TaskManagerConfig.Resources
if tmResources == nil {
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/flink/flink.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ import (
const proxyURL = "http://localhost:%d/api/v1/namespaces/%s/services/%s:8081/proxy"
const port = 8081

// Maximum age of an externalized checkpoint that we will attempt to restore
const maxRestoreCheckpointAge = 24 * time.Hour

// If the last hearbeat from a taskmanager was more than taskManagerHeartbeatThreshold, the task
// manager is considered unhealthy.
const taskManagerHeartbeatThreshold = 2 * time.Minute
Expand Down Expand Up @@ -457,14 +454,18 @@ func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application
return "", nil
}

if time.Since(time.Unix(checkpoint.TriggerTimestamp, 0)) > maxRestoreCheckpointAge {
if isCheckpointOldToRecover(checkpoint.TriggerTimestamp, getMaxCheckpointRestoreAgeSeconds(application)) {
logger.Info(ctx, "Found checkpoint to restore from, but was too old")
return "", nil
}

return checkpoint.ExternalPath, nil
}

func isCheckpointOldToRecover(checkpointTime int64, maxCheckpointRecoveryAgeSec int32) bool {
return time.Since(time.Unix(checkpointTime, 0)) > (time.Duration(maxCheckpointRecoveryAgeSec) * time.Second)
}

func (f *Controller) LogEvent(ctx context.Context, app *v1beta1.FlinkApplication, eventType string, reason string, message string) {
f.eventRecorder.Event(app, eventType, reason, message)
logger.Infof(ctx, "Logged %s event: %s: %s", eventType, reason, message)
Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/flink/flink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,3 +920,11 @@ func TestGetAndUpdateJobStatusHealth(t *testing.T) {
assert.Equal(t, app1.Status.JobStatus.Health, v1beta1.Red)

}

func TestMaxCheckpointRestoreAge(t *testing.T) {
// Test invalid checkpoint that cannot be recovered from. Recovery age is 10 minutes
assert.True(t, isCheckpointOldToRecover(time.Now().Unix()-700, 600))

// Test valid checkpoint that can be recovered. Recovery age is 10 minutes
assert.False(t, isCheckpointOldToRecover(time.Now().Unix()-100, 600))
}

0 comments on commit 7fc2230

Please sign in to comment.