Skip to content

Commit

Permalink
Merge branch 'pingcap:master' into duptable
Browse files Browse the repository at this point in the history
  • Loading branch information
RidRisR authored Aug 18, 2024
2 parents 3d08410 + 60b96b4 commit a76c067
Show file tree
Hide file tree
Showing 460 changed files with 19,904 additions and 14,748 deletions.
168 changes: 84 additions & 84 deletions DEPS.bzl

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ bazel_coverage_test: failpoint-enable bazel_ci_simple_prepare
bazel_build:
mkdir -p bin
bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \
//... --//build:with_nogo_flag=$(NOGO_FLAG)
//... --//build:with_nogo_flag=$(NOGO_FLAG) --subcommands
bazel $(BAZEL_GLOBAL_CONFIG) build $(BAZEL_CMD_CONFIG) \
//cmd/importer:importer //cmd/tidb-server:tidb-server //cmd/tidb-server:tidb-server-check --//build:with_nogo_flag=$(NOGO_FLAG)
cp bazel-out/k8-fastbuild/bin/cmd/tidb-server/tidb-server_/tidb-server ./bin
Expand Down
2 changes: 1 addition & 1 deletion OWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ approvers:
- guo-shaoge
- hanfei1991
- hawkingrei
- hi-rustin
- hicqu
- holys
- hongyunyan
Expand Down Expand Up @@ -80,6 +79,7 @@ approvers:
- qw4990
- rebelice
- Reminiscent
- Rustin170506
- sdojjy
- shenli
- siddontang
Expand Down
15 changes: 7 additions & 8 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ http_archive(

http_archive(
name = "bazel_features",
sha256 = "d7787da289a7fb497352211ad200ec9f698822a9e0757a4976fd9f713ff372b3",
strip_prefix = "bazel_features-1.9.1",
url = "https://github.com/bazel-contrib/bazel_features/releases/download/v1.9.1/bazel_features-v1.9.1.tar.gz",
sha256 = "ba1282c1aa1d1fffdcf994ab32131d7c7551a9bc960fbf05f42d55a1b930cbfb",
strip_prefix = "bazel_features-1.15.0",
url = "https://github.com/bazel-contrib/bazel_features/releases/download/v1.15.0/bazel_features-v1.15.0.tar.gz",
)

load("@bazel_features//:deps.bzl", "bazel_features_deps")
Expand Down Expand Up @@ -48,12 +48,11 @@ http_archive(

http_archive(
name = "bazel_gazelle",
sha256 = "d76bf7a60fd8b050444090dfa2837a4eaf9829e1165618ee35dceca5cbdf58d5",
sha256 = "8ad77552825b078a10ad960bec6ef77d2ff8ec70faef2fd038db713f410f5d87",
urls = [
"http://bazel-cache.pingcap.net:8080/bazelbuild/bazel-gazelle/releases/download/v0.37.0/bazel-gazelle-v0.37.0.tar.gz",
"https://mirror.bazel.build/github.com/bazelbuild/bazel-gazelle/releases/download/v0.37.0/bazel-gazelle-v0.37.0.tar.gz",
"https://github.com/bazelbuild/bazel-gazelle/releases/download/v0.37.0/bazel-gazelle-v0.37.0.tar.gz",
"http://ats.apps.svc/bazelbuild/bazel-gazelle/releases/download/v0.37.0/bazel-gazelle-v0.37.0.tar.gz",
"http://bazel-cache.pingcap.net:8080/bazelbuild/bazel-gazelle/releases/download/v0.38.0/bazel-gazelle-v0.38.0.tar.gz",
"https://github.com/bazelbuild/bazel-gazelle/releases/download/v0.38.0/bazel-gazelle-v0.38.0.tar.gz",
"http://ats.apps.svc/bazelbuild/bazel-gazelle/releases/download/v0.38.0/bazel-gazelle-v0.38.0.tar.gz",
],
)

Expand Down
4 changes: 3 additions & 1 deletion br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ go_test(
"client_test.go",
"main_test.go",
"schema_test.go",
"store_test.go",
],
embed = [":backup"],
flaky = True,
race = "on",
shard_count = 12,
shard_count = 14,
deps = [
"//br/pkg/conn",
"//br/pkg/gluetidb/mock",
Expand All @@ -92,6 +93,7 @@ go_test(
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_pd_client//:client",
"@io_opencensus_go//stats/view",
"@org_golang_google_grpc//:grpc",
"@org_uber_go_goleak//:goleak",
],
)
65 changes: 64 additions & 1 deletion br/pkg/backup/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"io"
"os"
"sync"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -57,12 +58,73 @@ func (r ResponseAndStore) GetStoreID() uint64 {
return r.StoreID
}

// timeoutRecv cancel the context if `Refresh()` is not called within the specified time `timeout`.
type timeoutRecv struct {
wg sync.WaitGroup
parentCtx context.Context
cancel context.CancelCauseFunc

refresh chan struct{}
}

// Refresh the timeout ticker
func (trecv *timeoutRecv) Refresh() {
select {
case <-trecv.parentCtx.Done():
case trecv.refresh <- struct{}{}:
}
}

// Stop the timeout ticker
func (trecv *timeoutRecv) Stop() {
close(trecv.refresh)
trecv.wg.Wait()
}

var TimeoutOneResponse = time.Hour

func (trecv *timeoutRecv) loop(timeout time.Duration) {
defer trecv.wg.Done()
ticker := time.NewTicker(timeout)
defer ticker.Stop()
for {
ticker.Reset(timeout)
select {
case <-trecv.parentCtx.Done():
return
case _, ok := <-trecv.refresh:
if !ok {
return
}
case <-ticker.C:
log.Warn("receive a backup response timeout")
trecv.cancel(errors.Errorf("receive a backup response timeout"))
}
}
}

func StartTimeoutRecv(ctx context.Context, timeout time.Duration) (context.Context, *timeoutRecv) {
cctx, cancel := context.WithCancelCause(ctx)
trecv := &timeoutRecv{
parentCtx: ctx,
cancel: cancel,
refresh: make(chan struct{}),
}
trecv.wg.Add(1)
go trecv.loop(timeout)
return cctx, trecv
}

func doSendBackup(
ctx context.Context,
pctx context.Context,
client backuppb.BackupClient,
req backuppb.BackupRequest,
respFn func(*backuppb.BackupResponse) error,
) error {
// Backup might be stuck on GRPC `waitonHeader`, so start a timeout ticker to
// terminate the backup if it does not receive any new response for a long time.
ctx, timerecv := StartTimeoutRecv(pctx, TimeoutOneResponse)
defer timerecv.Stop()
failpoint.Inject("hint-backup-start", func(v failpoint.Value) {
logutil.CL(ctx).Info("failpoint hint-backup-start injected, " +
"process will notify the shell.")
Expand Down Expand Up @@ -107,6 +169,7 @@ func doSendBackup(

for {
resp, err := bCli.Recv()
timerecv.Refresh()
if err != nil {
if errors.Cause(err) == io.EOF { // nolint:errorlint
logutil.CL(ctx).Debug("backup streaming finish",
Expand Down
98 changes: 98 additions & 0 deletions br/pkg/backup/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backup

import (
"context"
"io"
"testing"
"time"

backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

type MockBackupClient struct {
backuppb.BackupClient

recvFunc func(context.Context) (*backuppb.BackupResponse, error)
}

func (mbc *MockBackupClient) Backup(ctx context.Context, _ *backuppb.BackupRequest, _ ...grpc.CallOption) (backuppb.Backup_BackupClient, error) {
return &MockBackupBackupClient{ctx: ctx, recvFunc: mbc.recvFunc}, nil
}

type MockBackupBackupClient struct {
backuppb.Backup_BackupClient

ctx context.Context
recvFunc func(context.Context) (*backuppb.BackupResponse, error)
}

func (mbbc *MockBackupBackupClient) CloseSend() error {
return nil
}

func (mbbc *MockBackupBackupClient) Recv() (*backuppb.BackupResponse, error) {
if mbbc.recvFunc != nil {
return mbbc.recvFunc(mbbc.ctx)
}
return &backuppb.BackupResponse{}, nil
}

func TestTimeoutRecv(t *testing.T) {
ctx := context.Background()
TimeoutOneResponse = time.Millisecond * 800
// Just Timeout Once
{
err := doSendBackup(ctx, &MockBackupClient{
recvFunc: func(ctx context.Context) (*backuppb.BackupResponse, error) {
time.Sleep(time.Second)
require.Error(t, ctx.Err())
return nil, io.EOF
},
}, backuppb.BackupRequest{}, func(br *backuppb.BackupResponse) error { return nil })
require.NoError(t, err)
}

// Timeout Not At First
{
count := 0
err := doSendBackup(ctx, &MockBackupClient{
recvFunc: func(ctx context.Context) (*backuppb.BackupResponse, error) {
require.NoError(t, ctx.Err())
if count == 15 {
time.Sleep(time.Second)
require.Error(t, ctx.Err())
return nil, io.EOF
}
count += 1
time.Sleep(time.Millisecond * 80)
return &backuppb.BackupResponse{}, nil
},
}, backuppb.BackupRequest{}, func(br *backuppb.BackupResponse) error { return nil })
require.NoError(t, err)
}
}

func TestTimeoutRecvCancel(t *testing.T) {
ctx := context.Background()
cctx, cancel := context.WithCancel(ctx)

_, trecv := StartTimeoutRecv(cctx, time.Hour)
cancel()
trecv.wg.Wait()
}
1 change: 0 additions & 1 deletion br/pkg/mock/mock_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func NewCluster() (*Cluster, error) {
}
cluster.Storage = storage

session.SetSchemaLease(0)
session.DisableStats4Test()
dom, err := session.BootstrapSession(storage)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1577,7 +1577,7 @@ func (rc *LogClient) FailpointDoChecksumForLogRestore(
reidRules[downstreamID] = upstreamID
}
for upstreamID, downstreamID := range idrules {
newTable, ok := infoSchema.TableByID(downstreamID)
newTable, ok := infoSchema.TableByID(ctx, downstreamID)
if !ok {
// a dropped table
continue
Expand Down
5 changes: 3 additions & 2 deletions br/pkg/restore/tiflashrec/tiflash_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tiflashrec

import (
"bytes"
"context"
"fmt"

"github.com/pingcap/log"
Expand Down Expand Up @@ -90,7 +91,7 @@ func (r *TiFlashRecorder) Rewrite(oldID int64, newID int64) {
func (r *TiFlashRecorder) GenerateResetAlterTableDDLs(info infoschema.InfoSchema) []string {
items := make([]string, 0, len(r.items))
r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) {
table, ok := info.TableByID(id)
table, ok := info.TableByID(context.Background(), id)
if !ok {
log.Warn("Table do not exist, skipping", zap.Int64("id", id))
return
Expand Down Expand Up @@ -130,7 +131,7 @@ func (r *TiFlashRecorder) GenerateResetAlterTableDDLs(info infoschema.InfoSchema
func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []string {
items := make([]string, 0, len(r.items))
r.Iterate(func(id int64, replica model.TiFlashReplicaInfo) {
table, ok := info.TableByID(id)
table, ok := info.TableByID(context.Background(), id)
if !ok {
log.Warn("Table do not exist, skipping", zap.Int64("id", id))
return
Expand Down
1 change: 1 addition & 0 deletions br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_library(
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
"@org_golang_x_term//:term",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
Expand Down
18 changes: 17 additions & 1 deletion br/pkg/utils/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"io"
"os"
"sync"
"sync/atomic"
"time"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
"golang.org/x/term"
)

type logFunc func(msg string, fields ...zap.Field)
Expand Down Expand Up @@ -74,14 +76,27 @@ func (pp *ProgressPrinter) Close() {
}
}

// getTerminalOutput try to use os.Stderr as terminal output
func getTerminalOutput() io.Writer {
output := os.Stdout
if term.IsTerminal(int(output.Fd())) {
return output
}
return nil
}

// goPrintProgress starts a gorouinte and prints progress.
func (pp *ProgressPrinter) goPrintProgress(
ctx context.Context,
logFuncImpl logFunc,
testWriter io.Writer, // Only for tests
) {
var terminalOutput io.Writer
if !pp.redirectLog && testWriter == nil {
terminalOutput = getTerminalOutput()
}
bar := pb.New64(pp.total)
if pp.redirectLog || testWriter != nil {
if terminalOutput == nil {
tmpl := `{"P":"{{percent .}}","C":"{{counters . }}","E":"{{etime .}}","R":"{{rtime .}}","S":"{{speed .}}"}`
bar.SetTemplateString(tmpl)
bar.SetRefreshRate(2 * time.Minute)
Expand All @@ -98,6 +113,7 @@ func (pp *ProgressPrinter) goPrintProgress(
tmpl := `{{string . "barName" | green}} {{ bar . "<" "-" (cycle . "-" "\\" "|" "/" ) "." ">"}} {{percent .}}`
bar.SetTemplateString(tmpl)
bar.Set("barName", pp.name)
bar.SetWriter(terminalOutput)
}
if testWriter != nil {
bar.SetWriter(testWriter)
Expand Down
1 change: 1 addition & 0 deletions build/linter/allrevive/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//build/linter/util",
"@com_github_hashicorp_go_version//:go-version",
"@com_github_mgechev_revive//config",
"@com_github_mgechev_revive//lint",
"@com_github_mgechev_revive//rule",
Expand Down
Loading

0 comments on commit a76c067

Please sign in to comment.