
Commit 2e248a3
Merge remote-tracking branch 'pingcap/master' into coalesce-partition
mjonss committed Apr 16, 2023
2 parents: b4e6d3a + 9a02bbc
Showing 319 changed files with 18,793 additions and 18,504 deletions.
1 change: 1 addition & 0 deletions .github/licenserc.yml
@@ -6,6 +6,7 @@ header:
- "docs/"
- "br/"
- ".gitignore"
- ".gitmodules"
- ".dockerignore"
- ".gitattributes"
- ".cilinter.yaml"
3 changes: 3 additions & 0 deletions .gitmodules
@@ -0,0 +1,3 @@
[submodule "extension/enterprise"]
path = extension/enterprise
url = git@github.com:pingcap-inc/enterprise-extensions.git
39 changes: 39 additions & 0 deletions Dockerfile.enterprise
@@ -0,0 +1,39 @@
# Copyright 2023 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Builder image
FROM rockylinux:9 as builder

ENV GOLANG_VERSION 1.19.3
ENV ARCH amd64
ENV GOLANG_DOWNLOAD_URL https://dl.google.com/go/go$GOLANG_VERSION.linux-$ARCH.tar.gz
ENV GOPATH /go
ENV GOROOT /usr/local/go
ENV PATH $GOPATH/bin:$GOROOT/bin:$PATH
RUN yum update -y && yum groupinstall 'Development Tools' -y \
&& curl -fsSL "$GOLANG_DOWNLOAD_URL" -o golang.tar.gz \
&& tar -C /usr/local -xzf golang.tar.gz \
&& rm golang.tar.gz

COPY . /tidb
ARG GOPROXY
RUN export GOPROXY=${GOPROXY} && cd /tidb && make enterprise-server-build

FROM rockylinux:9-minimal

COPY --from=builder /tidb/bin/tidb-server /tidb-server

WORKDIR /
EXPOSE 4000
ENTRYPOINT ["/tidb-server"]
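For context, this image is what the new enterprise-docker Make target (introduced in the Makefile change below) builds. A minimal manual invocation might look like the following sketch; the image tag is an illustrative assumption, and the extension submodule is assumed to have been prepared first:

docker build -f Dockerfile.enterprise --build-arg "GOPROXY=$(go env GOPROXY)" -t tidb-enterprise:latest .   # assumes extension/enterprise is already populated
docker run -p 4000:4000 tidb-enterprise:latest   # port 4000 per the EXPOSE line above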
22 changes: 18 additions & 4 deletions Makefile
@@ -159,6 +159,24 @@ else
CGO_ENABLED=1 $(GOBUILD) -gcflags="all=-N -l" $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG)' -o '$(TARGET)' ./tidb-server
endif

enterprise-prepare:
git submodule init && git submodule update && cd extension/enterprise/generate && $(GO) generate -run genfile main.go

enterprise-clear:
cd extension/enterprise/generate && $(GO) generate -run clear main.go

enterprise-docker: enterprise-prepare
docker build -t "$(DOCKERPREFIX)tidb:latest" --build-arg 'GOPROXY=$(shell go env GOPROXY),' -f Dockerfile.enterprise .

enterprise-server-build:
ifeq ($(TARGET), "")
CGO_ENABLED=1 $(GOBUILD) -tags enterprise $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG)' -o bin/tidb-server tidb-server/main.go
else
CGO_ENABLED=1 $(GOBUILD) -tags enterprise $(RACE_FLAG) -ldflags '$(LDFLAGS) $(CHECK_FLAG)' -o '$(TARGET)' tidb-server/main.go
endif

enterprise-server: enterprise-prepare enterprise-server-build

server_check:
ifeq ($(TARGET), "")
$(GOBUILD) $(RACE_FLAG) -ldflags '$(CHECK_LDFLAGS)' -o bin/tidb-server ./tidb-server
@@ -424,10 +442,6 @@ bazel_coverage_test: check-bazel-prepare failpoint-enable bazel_ci_prepare
--@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...
bazel $(BAZEL_GLOBAL_CONFIG) coverage $(BAZEL_CMD_CONFIG) --build_tests_only --test_keep_going=false \
--@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=deadlock,intest,disttask \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test -//tests/realtikvtest/...

bazel_build: bazel_ci_prepare
mkdir -p bin
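Read together, the new Makefile targets compose as follows: enterprise-prepare checks out the extension submodule and runs its generator, enterprise-server-build compiles tidb-server with the enterprise build tag, and enterprise-server and enterprise-docker chain them. A rough usage sketch (the DOCKERPREFIX value here is an assumption for illustration):

make enterprise-server   # enterprise-prepare, then a -tags enterprise build into bin/tidb-server
DOCKERPREFIX=example.registry/tidb/ make enterprise-docker   # builds "$(DOCKERPREFIX)tidb:latest" from Dockerfile.enterprise
make enterprise-clear   # runs the generator's clear step to undo the generated files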
3 changes: 2 additions & 1 deletion br/cmd/br/debug.go
@@ -385,7 +385,8 @@ func setPDConfigCommand() *cobra.Command {
return errors.Trace(err)
}

mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg), cfg.CheckRequirements, false, conn.NormalVersionChecker)
mgr, err := task.NewMgr(ctx, tidbGlue, cfg.PD, cfg.TLS, task.GetKeepalive(&cfg),
cfg.CheckRequirements, false, conn.NormalVersionChecker)
if err != nil {
return errors.Trace(err)
}
7 changes: 4 additions & 3 deletions br/cmd/br/stream.go
@@ -161,9 +161,10 @@ func newStreamCheckCommand() *cobra.Command {

func newStreamAdvancerCommand() *cobra.Command {
command := &cobra.Command{
Use: "advancer",
Short: "Start a central worker for advancing the checkpoint. (only for debuging, this subcommand should be integrated to TiDB)",
Args: cobra.NoArgs,
Use: "advancer",
Short: "Start a central worker for advancing the checkpoint. " +
"(only for debuging, this subcommand should be integrated to TiDB)",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
return streamCommand(cmd, task.StreamCtl)
},
1 change: 1 addition & 0 deletions br/pkg/checkpoint/BUILD.bazel
@@ -26,6 +26,7 @@ go_test(
srcs = ["checkpoint_test.go"],
flaky = True,
race = "on",
shard_count = 3,
deps = [
":checkpoint",
"//br/pkg/rtree",
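The shard_count attribute added here (and to the other go_test targets below) lets Bazel split the test into parallel shards. A hedged example invocation, with the target label inferred from the //br/pkg/task:task_test convention used elsewhere in this diff rather than stated in this hunk:

bazel test //br/pkg/checkpoint:checkpoint_test   # with shard_count = 3, Bazel runs the test cases across 3 shards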
27 changes: 18 additions & 9 deletions br/pkg/checkpoint/checkpoint.go
@@ -245,7 +245,8 @@ type CheckpointRunner struct {
}

// only for test
func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, tick time.Duration, timer GlobalTimer) (*CheckpointRunner, error) {
func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalStorage,
cipher *backuppb.CipherInfo, tick time.Duration, timer GlobalTimer) (*CheckpointRunner, error) {
runner := &CheckpointRunner{
meta: make(map[string]*RangeGroups),

@@ -269,7 +270,8 @@ func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalS
return runner, nil
}

func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, timer GlobalTimer) (*CheckpointRunner, error) {
func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage,
cipher *backuppb.CipherInfo, timer GlobalTimer) (*CheckpointRunner, error) {
runner := &CheckpointRunner{
meta: make(map[string]*RangeGroups),

@@ -293,7 +295,8 @@ func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage,
return runner, nil
}

func (r *CheckpointRunner) FlushChecksum(ctx context.Context, tableID int64, crc64xor uint64, totalKvs uint64, totalBytes uint64, timeCost float64) error {
func (r *CheckpointRunner) FlushChecksum(ctx context.Context, tableID int64, crc64xor uint64,
totalKvs uint64, totalBytes uint64, timeCost float64) error {
return r.checksumRunner.FlushChecksum(ctx, r.storage, tableID, crc64xor, totalKvs, totalBytes, timeCost)
}

@@ -405,7 +408,8 @@ func (r *CheckpointRunner) sendError(err error) {
r.checksumRunner.RecordError(err)
}

func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDurationForFlush, tickDurationForLock time.Duration) {
func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDurationForFlush,
tickDurationForLock time.Duration) {
r.wg.Add(1)
checkpointLoop := func(ctx context.Context) {
defer r.wg.Done()
@@ -557,7 +561,8 @@ func (r *CheckpointRunner) flushLock(ctx context.Context, p int64) error {
LockId: r.lockId,
ExpireAt: p + lockTimeToLive.Milliseconds(),
}
log.Info("start to flush the checkpoint lock", zap.Int64("lock-at", p), zap.Int64("expire-at", lock.ExpireAt))
log.Info("start to flush the checkpoint lock", zap.Int64("lock-at", p),
zap.Int64("expire-at", lock.ExpireAt))
data, err := json.Marshal(lock)
if err != nil {
return errors.Trace(err)
@@ -584,12 +589,15 @@ func (r *CheckpointRunner) checkLockFile(ctx context.Context, now int64) error {
"Please check whether the BR is running. If not, you can retry.", lock.LockId, r.lockId)
}
if lock.LockId == r.lockId {
log.Warn("The lock has expired.", zap.Int64("expire-at(ms)", lock.ExpireAt), zap.Int64("now(ms)", now))
log.Warn("The lock has expired.",
zap.Int64("expire-at(ms)", lock.ExpireAt), zap.Int64("now(ms)", now))
}
} else if lock.LockId != r.lockId {
return errors.Errorf("The existing lock will expire in %d seconds. "+
"There may be another BR(%d) running. If not, you can wait for the lock to expire, or delete the file `%s%s` manually.",
(lock.ExpireAt-now)/1000, lock.LockId, strings.TrimRight(r.storage.URI(), "/"), CheckpointLockPath)
"There may be another BR(%d) running. If not, you can wait for the lock to expire, "+
"or delete the file `%s%s` manually.",
(lock.ExpireAt-now)/1000, lock.LockId,
strings.TrimRight(r.storage.URI(), "/"), CheckpointLockPath)
}

return nil
@@ -635,7 +643,8 @@ func (r *CheckpointRunner) initialLock(ctx context.Context) error {

// walk the whole checkpoint range files and retrieve the metadatat of backed up ranges
// and return the total time cost in the past executions
func WalkCheckpointFile(ctx context.Context, s storage.ExternalStorage, cipher *backuppb.CipherInfo, fn func(groupKey string, rg *rtree.Range)) (time.Duration, error) {
func WalkCheckpointFile(ctx context.Context, s storage.ExternalStorage, cipher *backuppb.CipherInfo,
fn func(groupKey string, rg *rtree.Range)) (time.Duration, error) {
// records the total time cost in the past executions
var pastDureTime time.Duration = 0
err := s.WalkDir(ctx, &storage.WalkOption{SubDir: CheckpointDataDir}, func(path string, size int64) error {
9 changes: 6 additions & 3 deletions br/pkg/checksum/executor.go
@@ -229,8 +229,10 @@ func buildIndexRequest(
var rule *tipb.ChecksumRewriteRule
if oldIndexInfo != nil {
rule = &tipb.ChecksumRewriteRule{
OldPrefix: append(append([]byte{}, oldKeyspace...), tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID)...),
NewPrefix: append(append([]byte{}, newKeyspace...), tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID)...),
OldPrefix: append(append([]byte{}, oldKeyspace...),
tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexInfo.ID)...),
NewPrefix: append(append([]byte{}, newKeyspace...),
tablecodec.EncodeTableIndexPrefix(tableID, indexInfo.ID)...),
}
}
checksum := &tipb.ChecksumRequest{
@@ -332,7 +334,8 @@ func (exec *Executor) Execute(
updateFn func(),
) (*tipb.ChecksumResponse, error) {
checksumResp := &tipb.ChecksumResponse{}
checksumBackoffer := utils.InitialRetryState(utils.ChecksumRetryTime, utils.ChecksumWaitInterval, utils.ChecksumMaxWaitInterval)
checksumBackoffer := utils.InitialRetryState(utils.ChecksumRetryTime,
utils.ChecksumWaitInterval, utils.ChecksumMaxWaitInterval)
for _, req := range exec.reqs {
// Pointer to SessionVars.Killed
// Killed is a flag to indicate that this query is killed.
1 change: 1 addition & 0 deletions br/pkg/conn/BUILD.bazel
@@ -45,6 +45,7 @@ go_test(
],
embed = [":conn"],
flaky = True,
shard_count = 7,
deps = [
"//br/pkg/conn/util",
"//br/pkg/pdutil",
3 changes: 2 additions & 1 deletion br/pkg/conn/conn.go
@@ -281,7 +281,8 @@ func (mgr *Mgr) GetTS(ctx context.Context) (uint64, error) {
return oracle.ComposeTS(p, l), nil
}

// GetMergeRegionSizeAndCount returns the tikv config `coprocessor.region-split-size` and `coprocessor.region-split-key`.
// GetMergeRegionSizeAndCount returns the tikv config
// `coprocessor.region-split-size` and `coprocessor.region-split-key`.
// returns the default config when failed.
func (mgr *Mgr) GetMergeRegionSizeAndCount(ctx context.Context, client *http.Client) (uint64, uint64) {
regionSplitSize := DefaultMergeRegionSizeBytes
2 changes: 1 addition & 1 deletion br/pkg/errors/errors.go
@@ -36,7 +36,7 @@ var (
ErrUndefinedRestoreDbOrTable = errors.Normalize("undefined restore databases or tables", errors.RFCCodeText("BR:Common:ErrUndefinedDbOrTable"))
ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("BR:Common:ErrVersionMismatch"))
ErrFailedToConnect = errors.Normalize("failed to make gRPC channels", errors.RFCCodeText("BR:Common:ErrFailedToConnect"))
ErrInvalidMetaFile = errors.Normalize("invalid metafile", errors.RFCCodeText("BR:Common:ErrInvalidMetaFile"))
ErrInvalidMetaFile = errors.Normalize("invalid metafile: %s", errors.RFCCodeText("BR:Common:ErrInvalidMetaFile"))
ErrEnvNotSpecified = errors.Normalize("environment variable not found", errors.RFCCodeText("BR:Common:ErrEnvNotSpecified"))
ErrUnsupportedOperation = errors.Normalize("the operation is not supported", errors.RFCCodeText("BR:Common:ErrUnsupportedOperation"))

6 changes: 4 additions & 2 deletions br/pkg/glue/glue.go
@@ -43,7 +43,8 @@ type Session interface {
Execute(ctx context.Context, sql string) error
ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error
CreateDatabase(ctx context.Context, schema *model.DBInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo,
cs ...ddl.CreateTableWithInfoConfigurier) error
CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error
Close()
GetGlobalVariable(name string) (string, error)
@@ -52,7 +53,8 @@

// BatchCreateTableSession is an interface to batch create table parallelly
type BatchCreateTableSession interface {
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo, cs ...ddl.CreateTableWithInfoConfigurier) error
CreateTables(ctx context.Context, tables map[string][]*model.TableInfo,
cs ...ddl.CreateTableWithInfoConfigurier) error
}

// Progress is an interface recording the current execution progress.
20 changes: 13 additions & 7 deletions br/pkg/glue/progressing.go
@@ -111,11 +111,13 @@ func (ops ConsoleOperations) StartProgressBar(title string, total int, extraFiel
return ops.startProgressBarOverTTY(title, total, extraFields...)
}

func (ops ConsoleOperations) startProgressBarOverDummy(title string, total int, extraFields ...ExtraField) ProgressWaiter {
func (ops ConsoleOperations) startProgressBarOverDummy(title string, total int,
extraFields ...ExtraField) ProgressWaiter {
return noOPWaiter{utils.StartProgress(context.TODO(), title, int64(total), true, nil)}
}

func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int, extraFields ...ExtraField) ProgressWaiter {
func (ops ConsoleOperations) startProgressBarOverTTY(title string, total int,
extraFields ...ExtraField) ProgressWaiter {
pb := mpb.New(mpb.WithOutput(ops.Out()), mpb.WithRefreshRate(400*time.Millisecond))
bar := adjustTotal(pb, title, total, extraFields...)

@@ -142,7 +144,8 @@ func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ...
greenTitle := color.GreenString(title)
return pb.New(int64(total),
// Play as if the old BR style.
mpb.BarStyle().Lbound("<").Filler("-").Padding(".").Rbound(">").Tip("-", "\\", "|", "/", "-").TipOnComplete("-"),
mpb.BarStyle().Lbound("<").Filler("-").Padding(".").Rbound(">").
Tip("-", "\\", "|", "/", "-").TipOnComplete("-"),
mpb.BarFillerMiddleware(func(bf mpb.BarFiller) mpb.BarFiller {
return mpb.BarFillerFunc(func(w io.Writer, reqWidth int, stat decor.Statistics) {
if stat.Aborted || stat.Completed {
@@ -151,19 +154,22 @@ func buildProgressBar(pb *mpb.Progress, title string, total int, extraFields ...
bf.Fill(w, reqWidth, stat)
})
}),
mpb.PrependDecorators(decor.OnAbort(decor.OnComplete(decor.Name(greenTitle), fmt.Sprintf("%s ::", title)), fmt.Sprintf("%s ::", title))),
mpb.AppendDecorators(decor.OnAbort(decor.Any(cbOnComplete(decor.NewPercentage("%02.2f"), printFinalMessage(extraFields))), color.RedString("ABORTED"))),
mpb.PrependDecorators(decor.OnAbort(decor.OnComplete(decor.Name(greenTitle),
fmt.Sprintf("%s ::", title)), fmt.Sprintf("%s ::", title))),
mpb.AppendDecorators(decor.OnAbort(decor.Any(cbOnComplete(decor.NewPercentage("%02.2f"),
printFinalMessage(extraFields))), color.RedString("ABORTED"))),
)
}

var (
spinnerDoneText string = fmt.Sprintf("... %s", color.GreenString("DONE"))
spinnerDoneText = fmt.Sprintf("... %s", color.GreenString("DONE"))
)

func buildOneTaskBar(pb *mpb.Progress, title string, total int) *mpb.Bar {
return pb.New(int64(total),
mpb.NopStyle(),
mpb.PrependDecorators(decor.Name(title)),
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText), color.RedString("ABORTED"))),
mpb.AppendDecorators(decor.OnAbort(decor.OnComplete(decor.Spinner(spinnerText), spinnerDoneText),
color.RedString("ABORTED"))),
)
}
1 change: 1 addition & 0 deletions br/pkg/gluetidb/BUILD.bazel
@@ -32,6 +32,7 @@ go_test(
srcs = ["glue_test.go"],
embed = [":gluetidb"],
flaky = True,
shard_count = 4,
deps = [
"//br/pkg/glue",
"//ddl",
(The remaining changed files are not shown here.)
