Skip to content

Commit

Permalink
Merge branch 'master' into fix-4166
Browse files Browse the repository at this point in the history
  • Loading branch information
dsdashun authored Jan 25, 2022
2 parents 61fd8ae + 43a1d78 commit 9607554
Show file tree
Hide file tree
Showing 19 changed files with 1,011 additions and 637 deletions.
72 changes: 72 additions & 0 deletions cdc/api/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"net/http"
"time"

"github.com/gin-gonic/gin"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"go.uber.org/zap"
)

func logMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
start := time.Now()
path := c.Request.URL.Path
query := c.Request.URL.RawQuery
c.Next()

cost := time.Since(start)

err := c.Errors.Last()
var stdErr error
if err != nil {
stdErr = err.Err
}

log.Info(path,
zap.Int("status", c.Writer.Status()),
zap.String("method", c.Request.Method),
zap.String("path", path),
zap.String("query", query),
zap.String("ip", c.ClientIP()),
zap.String("user-agent", c.Request.UserAgent()),
zap.Error(stdErr),
zap.Duration("duration", cost),
)
}
}

func errorHandleMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
c.Next()
// because we will return immediately after an error occurs in http_handler
// there wil be only one error in c.Errors
lastError := c.Errors.Last()
if lastError != nil {
err := lastError.Err
// put the error into response
if IsHTTPBadRequestError(err) {
c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err))
} else {
c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err))
}
c.Abort()
return
}
}
}
115 changes: 66 additions & 49 deletions cdc/api/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,41 +49,62 @@ const (
// openAPI provides capture APIs.
type openAPI struct {
capture *capture.Capture
// use for unit test only
testStatusProvider owner.StatusProvider
}

// RegisterOpoenAPIRoutes registers routes for OpenAPI
func RegisterOpoenAPIRoutes(router *gin.Engine, capture *capture.Capture) {
openAPI := openAPI{capture: capture}
func NewOpenAPI(c *capture.Capture) openAPI {
return openAPI{capture: c}
}

// NewOpenAPI4Test return a openAPI for test
func NewOpenAPI4Test(c *capture.Capture, p owner.StatusProvider) openAPI {
return openAPI{capture: c, testStatusProvider: p}
}

func (h *openAPI) statusProvider() owner.StatusProvider {
if h.testStatusProvider != nil {
return h.testStatusProvider
}
return h.capture.StatusProvider()
}

// RegisterOpenAPIRoutes registers routes for OpenAPI
func RegisterOpenAPIRoutes(router *gin.Engine, api openAPI) {
v1 := router.Group("/api/v1")

v1.Use(logMiddleware())
v1.Use(errorHandleMiddleware())

// common API
router.GET("/api/v1/status", openAPI.ServerStatus)
router.GET("/api/v1/health", openAPI.Health)
router.POST("/api/v1/log", SetLogLevel)
v1.GET("/status", api.ServerStatus)
v1.GET("/health", api.Health)
v1.POST("/log", SetLogLevel)

// changefeed API
changefeedGroup := router.Group("/api/v1/changefeeds")
changefeedGroup.GET("", openAPI.ListChangefeed)
changefeedGroup.GET("/:changefeed_id", openAPI.GetChangefeed)
changefeedGroup.POST("", openAPI.CreateChangefeed)
changefeedGroup.PUT("/:changefeed_id", openAPI.UpdateChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", openAPI.PauseChangefeed)
changefeedGroup.POST("/:changefeed_id/resume", openAPI.ResumeChangefeed)
changefeedGroup.DELETE("/:changefeed_id", openAPI.RemoveChangefeed)
changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", openAPI.RebalanceTable)
changefeedGroup.POST("/:changefeed_id/tables/move_table", openAPI.MoveTable)
changefeedGroup := v1.Group("/changefeeds")
changefeedGroup.GET("", api.ListChangefeed)
changefeedGroup.GET("/:changefeed_id", api.GetChangefeed)
changefeedGroup.POST("", api.CreateChangefeed)
changefeedGroup.PUT("/:changefeed_id", api.UpdateChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", api.PauseChangefeed)
changefeedGroup.POST("/:changefeed_id/resume", api.ResumeChangefeed)
changefeedGroup.DELETE("/:changefeed_id", api.RemoveChangefeed)
changefeedGroup.POST("/:changefeed_id/tables/rebalance_table", api.RebalanceTable)
changefeedGroup.POST("/:changefeed_id/tables/move_table", api.MoveTable)

// owner API
ownerGroup := router.Group("/api/v1/owner")
ownerGroup.POST("/resign", openAPI.ResignOwner)
ownerGroup := v1.Group("/owner")
ownerGroup.POST("/resign", api.ResignOwner)

// processor API
processorGroup := router.Group("/api/v1/processors")
processorGroup.GET("", openAPI.ListProcessor)
processorGroup.GET("/:changefeed_id/:capture_id", openAPI.GetProcessor)
processorGroup := v1.Group("/processors")
processorGroup.GET("", api.ListProcessor)
processorGroup.GET("/:changefeed_id/:capture_id", api.GetProcessor)

// capture API
captureGroup := router.Group("/api/v1/captures")
captureGroup.GET("", openAPI.ListCapture)
captureGroup := v1.Group("/captures")
captureGroup.GET("", api.ListCapture)
}

// ListChangefeed lists all changgefeeds in cdc cluster
Expand All @@ -101,17 +122,17 @@ func (h *openAPI) ListChangefeed(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()
state := c.Query(apiOpVarChangefeedState)
// get all changefeed status
statuses, err := statusProvider.GetAllChangeFeedStatuses(ctx)
statuses, err := h.statusProvider().GetAllChangeFeedStatuses(ctx)
if err != nil {
_ = c.Error(err)
return
}
// get all changefeed infos
infos, err := statusProvider.GetAllChangeFeedInfo(ctx)
infos, err := h.statusProvider().GetAllChangeFeedInfo(ctx)
if err != nil {
// this call will return a parsedError generated by the error we passed in
// so it is no need to check the parsedError
Expand Down Expand Up @@ -166,7 +187,6 @@ func (h *openAPI) GetChangefeed(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
Expand All @@ -175,19 +195,19 @@ func (h *openAPI) GetChangefeed(c *gin.Context) {
return
}

info, err := statusProvider.GetChangeFeedInfo(ctx, changefeedID)
info, err := h.statusProvider().GetChangeFeedInfo(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

status, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
status, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

processorInfos, err := statusProvider.GetAllTaskStatuses(ctx, changefeedID)
processorInfos, err := h.statusProvider().GetAllTaskStatuses(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -279,7 +299,7 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()

changefeedID := c.Param(apiOpVarChangefeedID)
Expand All @@ -288,7 +308,7 @@ func (h *openAPI) PauseChangefeed(c *gin.Context) {
return
}
// check if the changefeed exists
_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
_, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -322,15 +342,15 @@ func (h *openAPI) ResumeChangefeed(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
return
}
// check if the changefeed exists
_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
_, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -370,15 +390,15 @@ func (h *openAPI) UpdateChangefeed(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)

if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
return
}
info, err := statusProvider.GetChangeFeedInfo(ctx, changefeedID)
info, err := h.statusProvider().GetChangeFeedInfo(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -426,15 +446,15 @@ func (h *openAPI) RemoveChangefeed(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
return
}
// check if the changefeed exists
_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
_, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -468,7 +488,7 @@ func (h *openAPI) RebalanceTable(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)

Expand All @@ -477,7 +497,7 @@ func (h *openAPI) RebalanceTable(c *gin.Context) {
return
}
// check if the changefeed exists
_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
_, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -508,15 +528,15 @@ func (h *openAPI) MoveTable(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()
changefeedID := c.Param(apiOpVarChangefeedID)
if err := model.ValidateChangefeedID(changefeedID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))
return
}
// check if the changefeed exists
_, err := statusProvider.GetChangeFeedStatus(ctx, changefeedID)
_, err := h.statusProvider().GetChangeFeedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -582,7 +602,6 @@ func (h *openAPI) GetProcessor(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()

Expand All @@ -598,7 +617,7 @@ func (h *openAPI) GetProcessor(c *gin.Context) {
return
}

statuses, err := statusProvider.GetAllTaskStatuses(ctx, changefeedID)
statuses, err := h.statusProvider().GetAllTaskStatuses(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand All @@ -609,7 +628,7 @@ func (h *openAPI) GetProcessor(c *gin.Context) {
return
}

positions, err := statusProvider.GetTaskPositions(ctx, changefeedID)
positions, err := h.statusProvider().GetTaskPositions(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -648,10 +667,9 @@ func (h *openAPI) ListProcessor(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()
infos, err := statusProvider.GetProcessors(ctx)
infos, err := h.statusProvider().GetProcessors(ctx)
if err != nil {
_ = c.Error(err)
return
Expand All @@ -678,10 +696,9 @@ func (h *openAPI) ListCapture(c *gin.Context) {
h.forwardToOwner(c)
return
}
statusProvider := h.capture.StatusProvider()

ctx := c.Request.Context()
captureInfos, err := statusProvider.GetCaptures(ctx)
captureInfos, err := h.statusProvider().GetCaptures(ctx)
if err != nil {
_ = c.Error(err)
return
Expand Down
Loading

0 comments on commit 9607554

Please sign in to comment.