Skip to content

Commit

Permalink
use upstream pd client in api v2
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen committed Nov 15, 2023
1 parent 1e2f277 commit dd2265b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 28 deletions.
64 changes: 36 additions & 28 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package v2

import (
"context"
"fmt"
"net/http"
"sort"
"strings"
Expand All @@ -23,16 +24,17 @@ import (
"github.com/gin-gonic/gin"
"github.com/pingcap/errors"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -344,26 +346,30 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
zap.String("changefeedInfo", oldCfInfo.String()),
zap.Any("upstreamInfo", OldUpInfo))

var pdAddrs []string
var credentials *security.Credential
if OldUpInfo != nil {
pdAddrs = strings.Split(OldUpInfo.PDEndpoints, ",")
credentials = &security.Credential{
CAPath: OldUpInfo.CAPath,
CertPath: OldUpInfo.CertPath,
KeyPath: OldUpInfo.KeyPath,
CertAllowedCN: OldUpInfo.CertAllowedCN,
}
upManager, err := h.capture.GetUpstreamManager()
if err != nil {
_ = c.Error(err)
return
}
if len(updateCfConfig.PDAddrs) != 0 {
pdAddrs = updateCfConfig.PDAddrs
credentials = updateCfConfig.PDConfig.toCredential()
var storage tidbkv.Storage
// if PDAddrs is not empty, use it to create a new kvstore
// Note: upManager is nil in some unit test cases
if len(updateCfConfig.PDAddrs) != 0 || upManager == nil {
pdAddrs := updateCfConfig.PDAddrs
credentials := updateCfConfig.PDConfig.toCredential()
storage, err = h.helpers.createTiStore(pdAddrs, credentials)
if err != nil {
_ = c.Error(errors.Trace(err))
}
} else { // get the upstream of the changefeed to get the kvstore
up, ok := upManager.Get(oldCfInfo.UpstreamID)
if !ok {
_ = c.Error(errors.New(fmt.Sprintf("upstream %d not found", oldCfInfo.UpstreamID)))
return
}
storage = up.KVStorage
}

storage, err := h.helpers.createTiStore(pdAddrs, credentials)
if err != nil {
_ = c.Error(errors.Trace(err))
}
newCfInfo, newUpInfo, err := h.helpers.verifyUpdateChangefeedConfig(ctx,
updateCfConfig, oldCfInfo, OldUpInfo, storage, cfStatus.CheckpointTs)
if err != nil {
Expand Down Expand Up @@ -586,22 +592,24 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
return
}

var pdClient pd.Client
// if PDAddrs is empty, use the default pdClien
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()

timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
pdClient = up.PDClient
} else {
credential := cfg.PDConfig.toCredential()
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
}
defer pdClient.Close()

Expand Down
4 changes: 4 additions & 0 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ func TestUpdateChangefeed(t *testing.T) {
createTiStore(gomock.Any(), gomock.Any()).
Return(nil, nil).
AnyTimes()
cp.EXPECT().GetUpstreamManager().Return(nil, nil).AnyTimes()
helpers.EXPECT().
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, cerrors.ErrChangefeedUpdateRefused).
Expand All @@ -445,6 +446,7 @@ func TestUpdateChangefeed(t *testing.T) {
require.Equal(t, http.StatusBadRequest, w.Code)

// case 7: update transaction failed
cp.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
helpers.EXPECT().
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil).
Expand All @@ -468,6 +470,7 @@ func TestUpdateChangefeed(t *testing.T) {
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(oldCfInfo, &model.UpstreamInfo{}, nil).
Times(1)
cp.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)
Expand All @@ -484,6 +487,7 @@ func TestUpdateChangefeed(t *testing.T) {
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(oldCfInfo, &model.UpstreamInfo{}, nil).
Times(1)
cp.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)
Expand Down

0 comments on commit dd2265b

Please sign in to comment.