Skip to content

Commit

Permalink
api(ticdc): Check min service gc safepoint when resume changefeed (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 22, 2024
1 parent f5e2fd9 commit d49cb0b
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 18 deletions.
16 changes: 8 additions & 8 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type APIV2Helpers interface {
pdClient pd.Client,
gcServiceID string,
changefeedID model.ChangeFeedID,
checkpointTs uint64,
overrideCheckpointTs uint64,
) error

// getPDClient returns a PDClient given the PD cluster addresses and a credential
Expand Down Expand Up @@ -387,24 +387,24 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
return newInfo, newUpInfo, nil
}

// verifyResumeChangefeedConfig verifies the changefeed config before resuming a changefeed
// overrideCheckpointTs is the checkpointTs of the changefeed that specified by the user.
// or it is the checkpointTs of the changefeed before it is paused.
// we need to check weather the resuming changefeed is gc safe or not.
func (APIV2HelpersImpl) verifyResumeChangefeedConfig(ctx context.Context,
pdClient pd.Client,
gcServiceID string,
changefeedID model.ChangeFeedID,
checkpointTs uint64,
overrideCheckpointTs uint64,
) error {
if checkpointTs == 0 {
return nil
}

// 1h is enough for resuming a changefeed.
gcTTL := int64(60 * 60)
err := gc.EnsureChangefeedStartTsSafety(
ctx,
pdClient,
gcServiceID,
model.DefaultChangeFeedID(changefeedID.ID),
gcTTL, checkpointTs)
changefeedID,
gcTTL, overrideCheckpointTs)
if err != nil {
if !cerror.ErrStartTsBeforeGC.Equal(err) {
return cerror.ErrPDEtcdAPIError.Wrap(err)
Expand Down
8 changes: 4 additions & 4 deletions cdc/api/v2/api_helpers_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 12 additions & 5 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,11 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
status, err := h.capture.StatusProvider().GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

var pdClient pd.Client
// if PDAddrs is empty, use the default pdClient
Expand All @@ -694,13 +699,17 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
}
defer pdClient.Close()
}

// If there is no overrideCheckpointTs, then check whether the currentCheckpointTs is smaller than gc safepoint or not.
newCheckpointTs := status.CheckpointTs
if cfg.OverwriteCheckpointTs != 0 {
newCheckpointTs = cfg.OverwriteCheckpointTs
}
if err := h.helpers.verifyResumeChangefeedConfig(
ctx,
pdClient,
h.capture.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceResuming),
changefeedID,
cfg.OverwriteCheckpointTs); err != nil {
newCheckpointTs); err != nil {
_ = c.Error(err)
return
}
Expand Down Expand Up @@ -728,9 +737,7 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
}

if err := api.HandleOwnerJob(ctx, h.capture, job); err != nil {
if cfg.OverwriteCheckpointTs > 0 {
needRemoveGCSafePoint = true
}
needRemoveGCSafePoint = true
_ = c.Error(err)
return
}
Expand Down
4 changes: 3 additions & 1 deletion cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,9 @@ func TestResumeChangefeed(t *testing.T) {
pdClient := &mockPDClient{}
etcdClient := mock_etcd.NewMockCDCEtcdClient(gomock.NewController(t))
mockUpManager := upstream.NewManager4Test(pdClient)
statusProvider := &mockStatusProvider{}
statusProvider := &mockStatusProvider{
changefeedStatus: &model.ChangeFeedStatusForAPI{},
}

etcdClient.EXPECT().
GetEnsureGCServiceID(gomock.Any()).
Expand Down

0 comments on commit d49cb0b

Please sign in to comment.