Skip to content

Commit

Permalink
update go mod
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy committed Jan 19, 2022
1 parent f0dc68e commit 0775b69
Show file tree
Hide file tree
Showing 42 changed files with 400 additions and 228 deletions.
2 changes: 1 addition & 1 deletion cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/regionspan"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

Expand Down
2 changes: 1 addition & 1 deletion cdc/entry/schema_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/tikv/client-go/v2/oracle"
)

type schemaSuite struct{}
Expand Down
2 changes: 1 addition & 1 deletion cdc/entry/schema_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
timeta "github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/testkit"
"github.com/tikv/client-go/v2/oracle"
)

// SchemaTestHelper is a test helper for schema which creates an internal tidb instance to generate DDL jobs with meta information
Expand Down
2 changes: 1 addition & 1 deletion cdc/http_api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

Expand Down
2 changes: 1 addition & 1 deletion cdc/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/tikv/client-go/v2/oracle"
"go.etcd.io/etcd/clientv3/concurrency"
"go.uber.org/zap"
)
Expand Down
6 changes: 3 additions & 3 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/store/tikv"
tidbkv "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand All @@ -40,6 +38,8 @@ import (
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
"github.com/prometheus/client_golang/prometheus"
tidbkv "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -1079,7 +1079,7 @@ func (s *eventFeedSession) handleError(ctx context.Context, errInfo regionErrorI
if notLeader := innerErr.GetNotLeader(); notLeader != nil {
metricFeedNotLeaderCounter.Inc()
// TODO: Handle the case that notleader.GetLeader() is nil.
s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader().GetStoreId(), errInfo.rpcCtx.AccessIdx)
s.regionCache.UpdateLeader(errInfo.verID, notLeader.GetLeader(), errInfo.rpcCtx.AccessIdx)
} else if innerErr.GetEpochNotMatch() != nil {
// TODO: If only confver is updated, we don't need to reload the region from region cache.
metricFeedEpochNotMatchCounter.Inc()
Expand Down
10 changes: 5 additions & 5 deletions cdc/kv/client_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import (
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/store/mockstore/mockcopr"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/regionspan"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/txnutil"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -140,7 +140,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
}()
}

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
if err != nil {
b.Error(err)
}
Expand Down Expand Up @@ -253,7 +253,7 @@ func prepareBench(b *testing.B, regionNum int) (
server1.Stop()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
if err != nil {
b.Error(err)
}
Expand Down
67 changes: 33 additions & 34 deletions cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/mockstore/mockcopr"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand All @@ -47,6 +44,9 @@ import (
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
Expand All @@ -69,16 +69,15 @@ var _ = check.Suite(&clientSuite{})

func (s *clientSuite) TestNewClose(c *check.C) {
defer testleak.AfterTest(c)()
store := mocktikv.MustNewMVCCStore()
defer store.Close() //nolint:errcheck
cluster := mocktikv.NewCluster(store)
pdCli := mocktikv.NewPDClient(cluster)
defer pdCli.Close() //nolint:errcheck
rpcClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
c.Assert(err, check.IsNil)
defer pdClient.Close()
defer rpcClient.Close()

grpcPool := NewGrpcPoolImpl(context.Background(), &security.Credential{})
defer grpcPool.Close()
cli := NewCDCClient(context.Background(), pdCli, nil, grpcPool, "")
err := cli.Close()
cli := NewCDCClient(context.Background(), pdClient, nil, grpcPool, "")
err = cli.Close()
c.Assert(err, check.IsNil)
}

Expand Down Expand Up @@ -331,7 +330,7 @@ func (s *etcdSuite) TestConnectOfflineTiKV(c *check.C) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -431,7 +430,7 @@ func (s *etcdSuite) TestRecvLargeMessageSize(c *check.C) {
// Cancel first, and then close the server.
defer cancel()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
defer pdClient.Close() //nolint:errcheck
Expand Down Expand Up @@ -522,7 +521,7 @@ func (s *etcdSuite) TestHandleError(c *check.C) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -687,7 +686,7 @@ func (s *etcdSuite) TestCompatibilityWithSameConn(c *check.C) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -749,7 +748,7 @@ func (s *etcdSuite) testHandleFeedEvent(c *check.C) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -1196,7 +1195,7 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) {
}
}

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -1302,7 +1301,7 @@ func (s *etcdSuite) testStreamRecvWithError(c *check.C, failpointStr string) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -1436,7 +1435,7 @@ func (s *etcdSuite) TestStreamRecvWithErrorAndResolvedGoBack(c *check.C) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -1641,7 +1640,7 @@ func (s *etcdSuite) TestIncompatibleTiKV(c *check.C) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: gen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -1722,7 +1721,7 @@ func (s *etcdSuite) TestNoPendingRegionError(c *check.C) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -1798,7 +1797,7 @@ func (s *etcdSuite) TestDropStaleRequest(c *check.C) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -1902,7 +1901,7 @@ func (s *etcdSuite) TestResolveLock(c *check.C) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -1996,7 +1995,7 @@ func (s *etcdSuite) testEventCommitTsFallback(c *check.C, events []*cdcpb.Change
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -2145,7 +2144,7 @@ func (s *etcdSuite) testEventAfterFeedStop(c *check.C) {
}
}

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -2332,7 +2331,7 @@ func (s *etcdSuite) TestOutOfRegionRangeEvent(c *check.C) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -2543,7 +2542,7 @@ func (s *etcdSuite) TestResolveLockNoCandidate(c *check.C) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -2631,7 +2630,7 @@ func (s *etcdSuite) TestFailRegionReentrant(c *check.C) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -2711,7 +2710,7 @@ func (s *etcdSuite) TestClientV1UnlockRangeReentrant(c *check.C) {
srv1 := newMockChangeDataService(c, ch1)
server1, addr1 := newMockService(ctx, c, srv1, wg)

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -2774,7 +2773,7 @@ func (s *etcdSuite) testClientErrNoPendingRegion(c *check.C) {
srv1 := newMockChangeDataService(c, ch1)
server1, addr1 := newMockService(ctx, c, srv1, wg)

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -2855,7 +2854,7 @@ func (s *etcdSuite) testKVClientForceReconnect(c *check.C) {
}
}

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -3007,7 +3006,7 @@ func (s *etcdSuite) TestConcurrentProcessRangeRequest(c *check.C) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -3121,7 +3120,7 @@ func (s *etcdSuite) TestEvTimeUpdate(c *check.C) {
wg.Wait()
}()

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -3248,7 +3247,7 @@ func (s *etcdSuite) TestRegionWorkerExitWhenIsIdle(c *check.C) {
}
}

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down Expand Up @@ -3337,7 +3336,7 @@ func (s *etcdSuite) TestPrewriteNotMatchError(c *check.C) {
}
}

rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("", mockcopr.NewCoprRPCHandler())
rpcClient, cluster, pdClient, err := testutils.NewMockTiKV("", mockcopr.NewCoprRPCHandler())
c.Assert(err, check.IsNil)
pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen}
tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0)
Expand Down
2 changes: 1 addition & 1 deletion cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/regionspan"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/workerpool"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand Down
Loading

0 comments on commit 0775b69

Please sign in to comment.