Skip to content

Commit

Permalink
api(ticdc): forward changefeed related api to changefeed owner capture (
Browse files Browse the repository at this point in the history
#9412)

close #9411
  • Loading branch information
sdojjy authored Jul 20, 2023
1 parent 094ffad commit 1a67111
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 37 deletions.
52 changes: 38 additions & 14 deletions cdc/api/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
cerror "github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -99,21 +100,13 @@ func ForwardToControllerMiddleware(p capture.Capture) gin.HandlerFunc {
// ForwardToChangefeedOwnerMiddleware forward a request to controller if current server
// is not the changefeed owner, or handle it locally.
func ForwardToChangefeedOwnerMiddleware(p capture.Capture,
changefeedID func(ctx *gin.Context) model.ChangeFeedID,
changefeedIDFunc func(ctx *gin.Context) model.ChangeFeedID,
) gin.HandlerFunc {
return func(ctx *gin.Context) {
// currently not only controller capture has the owner, remove this check in the future
if p.StatusProvider() != nil {
ok, err := p.StatusProvider().IsChangefeedOwner(ctx, changefeedID(ctx))
if err != nil {
_ = ctx.Error(err)
return
}
// this capture is the changefeed owner's capture, handle this request directly
if ok {
ctx.Next()
return
}
changefeedID := changefeedIDFunc(ctx)
// check if this capture is the changefeed owner
if handleRequestIfIsChnagefeedOwner(ctx, p, changefeedID) {
return
}

// forward to the controller to find the changefeed owner capture
Expand All @@ -131,17 +124,48 @@ func ForwardToChangefeedOwnerMiddleware(p capture.Capture,
_ = ctx.Error(err)
return
}
// controller check if the changefeed is exists, so we don't need to forward again
ok, err := controller.IsChangefeedExists(ctx, changefeedID)
if err != nil {
_ = ctx.Error(err)
return
}
if !ok {
_ = ctx.Error(cerror.ErrChangeFeedNotExists.GenWithStackByArgs(changefeedID))
return
}

info, err := p.Info()
if err != nil {
_ = ctx.Error(err)
return
}
changefeedCaptureOwner := controller.GetChangefeedOwnerCaptureInfo(model.ChangeFeedID{})
changefeedCaptureOwner := controller.GetChangefeedOwnerCaptureInfo(changefeedID)
if changefeedCaptureOwner.ID == info.ID {
return
}
api.ForwardToCapture(ctx, info.ID, changefeedCaptureOwner.AdvertiseAddr)
ctx.Abort()
}
}

func handleRequestIfIsChnagefeedOwner(ctx *gin.Context, p capture.Capture, changefeedID model.ChangeFeedID) bool {
// currently not only controller capture has the owner, remove this check in the future
if p.StatusProvider() != nil {
ok, err := p.StatusProvider().IsChangefeedOwner(ctx, changefeedID)
if err != nil {
_ = ctx.Error(err)
return true
}
// this capture is the changefeed owner's capture, handle this request directly
if ok {
ctx.Next()
return true
}
}
return false
}

// CheckServerReadyMiddleware checks if the server is ready
func CheckServerReadyMiddleware(capture capture.Capture) gin.HandlerFunc {
return func(c *gin.Context) {
Expand Down
11 changes: 7 additions & 4 deletions cdc/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,16 @@ func ForwardToCapture(c *gin.Context, fromID, toAddr string) {
req.Header.Add(k, vv)
}
}
c.Header(forwardFromCapture, fromID)
lastForwardTimes++
c.Header(forwardTimes, strconv.Itoa(int(lastForwardTimes)))
log.Info("forwarding request to capture",
zap.String("url", c.Request.RequestURI),
zap.String("method", c.Request.Method),
zap.String("fromID", fromID),
zap.String("toAddr", toAddr))
zap.String("toAddr", toAddr),
zap.String("forwardTimes", timeStr))

req.Header.Add(forwardFromCapture, fromID)
lastForwardTimes++
req.Header.Add(forwardTimes, strconv.Itoa(int(lastForwardTimes)))
// forward toAddr owner
cli, err := httputil.NewClient(security)
if err != nil {
Expand Down
35 changes: 18 additions & 17 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,45 +48,46 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
v2.GET("status", api.serverStatus)
v2.POST("log", api.setLogLevel)

controllerMiddleware := middleware.ForwardToControllerMiddleware(api.capture)
changefeedOwnerMiddleware := middleware.ForwardToChangefeedOwnerMiddleware(api.capture, GetChangefeedFromRequest)

// changefeed apis
changefeedGroup := v2.Group("/changefeeds")
changefeedGroup.Use(middleware.ForwardToControllerMiddleware(api.capture))
changefeedGroup.GET("/:changefeed_id", api.getChangeFeed)
changefeedGroup.POST("", api.createChangefeed)
changefeedGroup.GET("", api.listChangeFeeds)
changefeedGroup.PUT("/:changefeed_id", api.updateChangefeed)
changefeedGroup.DELETE("/:changefeed_id", api.deleteChangefeed)
changefeedGroup.GET("/:changefeed_id/meta_info", api.getChangeFeedMetaInfo)
changefeedGroup.POST("/:changefeed_id/resume", api.resumeChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", api.pauseChangefeed)
changefeedGroup.GET("/:changefeed_id/status", api.status)
changefeedGroup.GET("/:changefeed_id", changefeedOwnerMiddleware, api.getChangeFeed)
changefeedGroup.POST("", controllerMiddleware, api.createChangefeed)
changefeedGroup.GET("", controllerMiddleware, api.listChangeFeeds)
changefeedGroup.PUT("/:changefeed_id", controllerMiddleware, api.updateChangefeed)
changefeedGroup.DELETE("/:changefeed_id", changefeedOwnerMiddleware, api.deleteChangefeed)
changefeedGroup.GET("/:changefeed_id/meta_info", controllerMiddleware, api.getChangeFeedMetaInfo)
changefeedGroup.POST("/:changefeed_id/resume", changefeedOwnerMiddleware, api.resumeChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", changefeedOwnerMiddleware, api.pauseChangefeed)
changefeedGroup.GET("/:changefeed_id/status", changefeedOwnerMiddleware, api.status)

// capture apis
captureGroup := v2.Group("/captures")
captureGroup.Use(middleware.ForwardToControllerMiddleware(api.capture))
captureGroup.Use(controllerMiddleware)
captureGroup.POST("/:capture_id/drain", api.drainCapture)
captureGroup.GET("", api.listCaptures)

// processor apis
processorGroup := v2.Group("/processors")
processorGroup.Use(middleware.ForwardToControllerMiddleware(api.capture))
processorGroup.GET("/:changefeed_id/:capture_id", api.getProcessor)
processorGroup.GET("", api.listProcessors)
processorGroup.GET("/:changefeed_id/:capture_id", changefeedOwnerMiddleware, api.getProcessor)
processorGroup.GET("", controllerMiddleware, api.listProcessors)

verifyTableGroup := v2.Group("/verify_table")
verifyTableGroup.Use(middleware.ForwardToControllerMiddleware(api.capture))
verifyTableGroup.Use(controllerMiddleware)
verifyTableGroup.POST("", api.verifyTable)

// unsafe apis
unsafeGroup := v2.Group("/unsafe")
unsafeGroup.Use(middleware.ForwardToControllerMiddleware(api.capture))
unsafeGroup.Use(controllerMiddleware)
unsafeGroup.GET("/metadata", api.CDCMetaData)
unsafeGroup.POST("/resolve_lock", api.ResolveLock)
unsafeGroup.DELETE("/service_gc_safepoint", api.DeleteServiceGcSafePoint)

// owner apis
ownerGroup := v2.Group("/owner")
unsafeGroup.Use(middleware.ForwardToControllerMiddleware(api.capture))
unsafeGroup.Use(controllerMiddleware)
ownerGroup.POST("/resign", api.resignController)

// common APIs
Expand Down
4 changes: 4 additions & 0 deletions cdc/api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,7 @@ func (m *mockStatusProvider) GetAllChangeFeedStatuses(_ context.Context) (
) {
return m.changefeedStatuses, m.err
}

func (m *mockStatusProvider) IsChangefeedOwner(_ context.Context, id model.ChangeFeedID) (bool, error) {
return true, nil
}
3 changes: 3 additions & 0 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ func TestGetChangeFeed(t *testing.T) {
statusProvider := &mockStatusProvider{}
cp := mock_capture.NewMockCapture(gomock.NewController(t))
cp.EXPECT().IsReady().Return(true).AnyTimes()
cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes()
cp.EXPECT().IsController().Return(true).AnyTimes()

apiV2 := NewOpenAPIV2ForTest(cp, APIV2HelpersImpl{})
Expand Down Expand Up @@ -813,6 +814,8 @@ func TestDeleteChangefeed(t *testing.T) {
etcdClient := mock_etcd.NewMockCDCEtcdClient(gomock.NewController(t))
mockUpManager := upstream.NewManager4Test(pdClient)
statusProvider := mock_owner.NewMockStatusProvider(gomock.NewController(t))
statusProvider.EXPECT().IsChangefeedOwner(gomock.Any(), gomock.Any()).
Return(true, nil).AnyTimes()

etcdClient.EXPECT().
GetEnsureGCServiceID(gomock.Any()).
Expand Down
13 changes: 11 additions & 2 deletions cdc/api/v2/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/golang/mock/gomock"
mock_capture "github.com/pingcap/tiflow/cdc/capture/mock"
"github.com/pingcap/tiflow/cdc/model"
mock_owner "github.com/pingcap/tiflow/cdc/owner/mock"
"github.com/stretchr/testify/require"
)

Expand All @@ -36,8 +37,12 @@ func TestGetProcessor(t *testing.T) {

// case 1: invalid changefeed id.
{
cp := mock_capture.NewMockCapture(gomock.NewController(t))
ctl := gomock.NewController(t)
cp := mock_capture.NewMockCapture(ctl)
cp.EXPECT().IsReady().Return(true).AnyTimes()
statusProvider := mock_owner.NewMockStatusProvider(ctl)
statusProvider.EXPECT().IsChangefeedOwner(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes()
cp.EXPECT().IsController().Return(true).AnyTimes()

apiV2 := NewOpenAPIV2ForTest(cp, APIV2HelpersImpl{})
Expand All @@ -64,8 +69,12 @@ func TestGetProcessor(t *testing.T) {

// case 2: invalid capture id.
{
cp := mock_capture.NewMockCapture(gomock.NewController(t))
ctl := gomock.NewController(t)
cp := mock_capture.NewMockCapture(ctl)
cp.EXPECT().IsReady().Return(true).AnyTimes()
statusProvider := mock_owner.NewMockStatusProvider(ctl)
statusProvider.EXPECT().IsChangefeedOwner(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes()
cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes()
cp.EXPECT().IsController().Return(true).AnyTimes()

apiV2 := NewOpenAPIV2ForTest(cp, APIV2HelpersImpl{})
Expand Down
1 change: 1 addition & 0 deletions cdc/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Controller interface {
map[model.ChangeFeedID]uint64, error,
)
GetCaptures(ctx context.Context) ([]*model.CaptureInfo, error)
IsChangefeedExists(ctx context.Context, id model.ChangeFeedID) (bool, error)
}

type controllerImpl struct {
Expand Down
15 changes: 15 additions & 0 deletions cdc/controller/mock/controller_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions cdc/controller/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
QueryAllChangeFeedInfo
// QueryCaptures is the type of query captures info.
QueryCaptures
// QueryExists is the type of query check if a changefeed is exist
QueryExists
)

// Query wraps query command and return results.
Expand All @@ -55,6 +57,7 @@ func (o *controllerImpl) GetCaptures(ctx context.Context) ([]*model.CaptureInfo,
return query.Data.([]*model.CaptureInfo), nil
}

// GetAllChangeFeedInfo returns all changefeed infos
func (o *controllerImpl) GetAllChangeFeedInfo(ctx context.Context) (
map[model.ChangeFeedID]*model.ChangeFeedInfo, error,
) {
Expand All @@ -67,6 +70,7 @@ func (o *controllerImpl) GetAllChangeFeedInfo(ctx context.Context) (
return query.Data.(map[model.ChangeFeedID]*model.ChangeFeedInfo), nil
}

// GetAllChangeFeedCheckpointTs returns all changefeed checkpoints
func (o *controllerImpl) GetAllChangeFeedCheckpointTs(ctx context.Context) (
map[model.ChangeFeedID]uint64, error,
) {
Expand All @@ -79,6 +83,18 @@ func (o *controllerImpl) GetAllChangeFeedCheckpointTs(ctx context.Context) (
return query.Data.(map[model.ChangeFeedID]uint64), nil
}

// IsChangefeedExists returns true if a changefeed is exits
func (o *controllerImpl) IsChangefeedExists(ctx context.Context, id model.ChangeFeedID) (bool, error) {
query := &Query{
Tp: QueryExists,
ChangeFeedID: id,
}
if err := o.sendQueryToController(ctx, query); err != nil {
return false, errors.Trace(err)
}
return query.Data.(bool), nil
}

// Query queries controller internal information.
func (o *controllerImpl) Query(query *Query, done chan<- error) {
o.pushControllerJob(&controllerJob{
Expand Down Expand Up @@ -178,6 +194,9 @@ func (o *controllerImpl) handleQueries(query *Query) error {
})
}
query.Data = ret
case QueryExists:
_, ok := o.changefeeds[query.ChangeFeedID]
query.Data = ok
}
return nil
}

0 comments on commit 1a67111

Please sign in to comment.