Skip to content

Commit

Permalink
Merge branch 'master' into flatten_trace_output
Browse files Browse the repository at this point in the history
  • Loading branch information
Yisaer authored Dec 23, 2021
2 parents 18aef85 + 7121bf0 commit 2d94321
Show file tree
Hide file tree
Showing 151 changed files with 5,419 additions and 613 deletions.
10 changes: 10 additions & 0 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ PR Title Format:
-->

### What problem does this PR solve?
<!--
Please create an issue first to describe the problem.
There MUST be one line starting with "Issue Number: " and
linking the relevant issues via the "close" or "ref".
For more info, check https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/contribute-code.html#referring-to-an-issue.
-->

Issue Number: close #xxx

Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ coverage.out
*.iml
*.swp
*.log
*.test.bin
tags
profile.coverprofile
explain_test
Expand Down
17 changes: 16 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dev: checklist check explaintest devgotest gogenerate br_unit_test test_part_par
# Install the check tools.
check-setup:tools/bin/revive tools/bin/goword

check: fmt unconvert lint tidy testSuite check-static vet errdoc
check: fmt check-parallel unconvert lint tidy testSuite check-static vet errdoc

fmt:
@echo "gofmt (simplify)"
Expand Down Expand Up @@ -75,6 +75,13 @@ testSuite:
@echo "testSuite"
./tools/check/check_testSuite.sh

check-parallel:
# Make sure no tests are run in parallel to prevent possible unstable tests.
# See https://github.com/pingcap/tidb/pull/30692.
@! find . -name "*_test.go" -not -path "./vendor/*" -print0 | \
xargs -0 grep -F -n "t.Parallel()" || \
! echo "Error: all the go tests should be run in serial."

clean: failpoint-disable
$(GO) clean -i ./...

Expand Down Expand Up @@ -119,6 +126,10 @@ devgotest: failpoint-enable
$(GOTEST) -ldflags '$(TEST_LDFLAGS)' $(EXTRA_TEST_ARGS) -cover $(PACKAGES_TIDB_TESTS) -check.p true > gotest.log || { $(FAILPOINT_DISABLE); grep -v '^\([[]20\|PASS:\|ok \)' 'gotest.log'; exit 1; }
@$(FAILPOINT_DISABLE)

ut: failpoint-enable tools/bin/ut
tools/bin/ut $(X);
@$(FAILPOINT_DISABLE)

gotest: failpoint-enable
@echo "Running in native mode."
@export log_level=info; export TZ='Asia/Shanghai'; \
Expand Down Expand Up @@ -213,6 +224,10 @@ failpoint-disable: tools/bin/failpoint-ctl
# Restoring gofail failpoints...
@$(FAILPOINT_DISABLE)

tools/bin/ut: tools/check/ut.go
cd tools/check; \
$(GO) build -o ../bin/ut ut.go

tools/bin/megacheck: tools/check/go.mod
cd tools/check; \
$(GO) build -o ../bin/megacheck honnef.co/go/tools/cmd/megacheck
Expand Down
8 changes: 7 additions & 1 deletion br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,13 @@ func (bc *Client) BackupRanges(
elctx := logutil.ContextWithField(ectx, logutil.RedactAny("range-sn", id))
err := bc.BackupRange(elctx, sk, ek, req, metaWriter, progressCallBack)
if err != nil {
return errors.Trace(err)
// The error due to context cancel, stack trace is meaningless, the stack shall be suspended (also clear)
if errors.Cause(err) == context.Canceled {
return errors.SuspendStack(err)
} else {
return errors.Trace(err)
}

}
return nil
})
Expand Down
15 changes: 9 additions & 6 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ type local struct {
duplicateDetection bool
duplicateDB *pebble.DB
errorMgr *errormanager.ErrorManager
}

var bufferPool = membuf.NewPool(1024, manual.Allocator{})
bufferPool *membuf.Pool
}

func openDuplicateDB(storeDir string) (*pebble.DB, error) {
dbPath := filepath.Join(storeDir, duplicateDBName)
Expand Down Expand Up @@ -244,6 +244,8 @@ func NewLocalBackend(
checkTiKVAvaliable: cfg.App.CheckRequirements,
duplicateDB: duplicateDB,
errorMgr: errorMgr,

bufferPool: membuf.NewPool(membuf.WithAllocator(manual.Allocator{})),
}
local.conns = common.NewGRPCConns()
if err = local.checkMultiIngestSupport(ctx); err != nil {
Expand Down Expand Up @@ -423,6 +425,7 @@ func (local *local) Close() {
engine.unlock()
}
local.conns.Close()
local.bufferPool.Destroy()

if local.duplicateDB != nil {
// Check whether there are duplicates.
Expand Down Expand Up @@ -776,7 +779,7 @@ func (local *local) WriteToTiKV(
requests = append(requests, req)
}

bytesBuf := bufferPool.NewBuffer()
bytesBuf := local.bufferPool.NewBuffer()
defer bytesBuf.Destroy()
pairs := make([]*sst.Pair, 0, local.batchWriteKVPairs)
count := 0
Expand Down Expand Up @@ -1664,14 +1667,14 @@ func (local *local) LocalWriter(ctx context.Context, cfg *backend.LocalWriterCon
return nil, errors.Errorf("could not find engine for %s", engineUUID.String())
}
engine := e.(*Engine)
return openLocalWriter(cfg, engine, local.localWriterMemCacheSize)
return openLocalWriter(cfg, engine, local.localWriterMemCacheSize, local.bufferPool.NewBuffer())
}

func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64) (*Writer, error) {
func openLocalWriter(cfg *backend.LocalWriterConfig, engine *Engine, cacheSize int64, kvBuffer *membuf.Buffer) (*Writer, error) {
w := &Writer{
engine: engine,
memtableSizeLimit: cacheSize,
kvBuffer: bufferPool.NewBuffer(),
kvBuffer: kvBuffer,
isKVSorted: cfg.IsKVSorted,
isWriteBatchSorted: true,
}
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/restore"
Expand Down Expand Up @@ -357,7 +358,10 @@ func testLocalWriter(c *C, needSort bool, partitialSort bool) {
f.wg.Add(1)
go f.ingestSSTLoop()
sorted := needSort && !partitialSort
w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024)
pool := membuf.NewPool()
defer pool.Destroy()
kvBuffer := pool.NewBuffer()
w, err := openLocalWriter(&backend.LocalWriterConfig{IsKVSorted: sorted}, f, 1024, kvBuffer)
c.Assert(err, IsNil)

ctx := context.Background()
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ func newTestClient(
}
}

// ScatterRegions scatters regions in a batch.
func (c *testClient) ScatterRegions(ctx context.Context, regionInfo []*restore.RegionInfo) error {
return nil
}

func (c *testClient) GetAllRegions() map[uint64]*restore.RegionInfo {
c.mu.RLock()
defer c.mu.RUnlock()
Expand Down
7 changes: 6 additions & 1 deletion br/pkg/lightning/common/storage_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@ import (
"syscall"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"golang.org/x/sys/unix"
)

// GetStorageSize gets storage's capacity and available size
func GetStorageSize(dir string) (size StorageSize, err error) {
var stat unix.Statfs_t
failpoint.Inject("GetStorageSize", func(val failpoint.Value) {
injectedSize := val.(int)
failpoint.Return(StorageSize{Capacity: uint64(injectedSize), Available: uint64(injectedSize)}, nil)
})

var stat unix.Statfs_t
err = unix.Statfs(dir, &stat)
if err != nil {
return size, errors.Annotatef(err, "cannot get disk capacity at %s", dir)
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/common/storage_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
)

var (
Expand All @@ -33,6 +34,10 @@ var (

// GetStorageSize gets storage's capacity and available size
func GetStorageSize(dir string) (size StorageSize, err error) {
failpoint.Inject("GetStorageSize", func(val failpoint.Value) {
injectedSize := val.(int)
failpoint.Return(StorageSize{Capacity: uint64(injectedSize), Available: uint64(injectedSize)}, nil)
})
r, _, e := getDiskFreeSpaceExW.Call(
uintptr(unsafe.Pointer(syscall.StringToUTF16Ptr(dir))),
uintptr(unsafe.Pointer(&size.Available)),
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ type TikvImporter struct {
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`
DuplicateResolution DuplicateResolutionAlgorithm `toml:"duplicate-resolution" json:"duplicate-resolution"`
IncrementalImport bool `toml:"incremental-import" json:"incremental-import"`

EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"`
LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"`
Expand Down
127 changes: 105 additions & 22 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@ package restore
import (
"bytes"
"context"
"database/sql"
"fmt"
"io"
"path/filepath"
"reflect"
"sort"
"strconv"
"strings"
"sync"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"modernc.org/mathutil"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
Expand All @@ -38,16 +44,14 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
"github.com/tikv/pd/server/api"
pdconfig "github.com/tikv/pd/server/config"

"go.uber.org/zap"
"modernc.org/mathutil"
)

const (
Expand Down Expand Up @@ -464,33 +468,31 @@ func (rc *Controller) localResource(sourceSize int64) error {
if err != nil {
return errors.Trace(err)
}
localAvailable := storageSize.Available
localAvailable := int64(storageSize.Available)

var message string
var passed bool
switch {
case localAvailable > uint64(sourceSize):
case localAvailable > sourceSize:
message = fmt.Sprintf("local disk resources are rich, estimate sorted data size %s, local available is %s",
units.BytesSize(float64(sourceSize)), units.BytesSize(float64(localAvailable)))
passed = true
case int64(rc.cfg.TikvImporter.DiskQuota) > localAvailable:
message = fmt.Sprintf("local disk space may not enough to finish import, estimate sorted data size is %s,"+
" but local available is %s, please set `tikv-importer.disk-quota` to a smaller value than %s"+
" or change `mydumper.sorted-kv-dir` to another disk with enough space to finish imports",
units.BytesSize(float64(sourceSize)),
units.BytesSize(float64(localAvailable)), units.BytesSize(float64(localAvailable)))
passed = false
log.L().Error(message)
default:
if int64(rc.cfg.TikvImporter.DiskQuota) > int64(localAvailable) {
message = fmt.Sprintf("local disk space may not enough to finish import"+
"estimate sorted data size is %s, but local available is %s,"+
"you need a smaller number for tikv-importer.disk-quota (%s) to finish imports",
units.BytesSize(float64(sourceSize)),
units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota)))
passed = false
log.L().Error(message)
} else {
message = fmt.Sprintf("local disk space may not enough to finish import, "+
"estimate sorted data size is %s, but local available is %s,"+
"we will use disk-quota (size: %s) to finish imports, which may slow down import",
units.BytesSize(float64(sourceSize)),
units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota)))
passed = true
log.L().Warn(message)
}
message = fmt.Sprintf("local disk space may not enough to finish import, "+
"estimate sorted data size is %s, but local available is %s,"+
"we will use disk-quota (size: %s) to finish imports, which may slow down import",
units.BytesSize(float64(sourceSize)),
units.BytesSize(float64(localAvailable)), units.BytesSize(float64(rc.cfg.TikvImporter.DiskQuota)))
passed = true
log.L().Warn(message)
}
rc.checkTemplate.Collect(Critical, passed, message)
return nil
Expand Down Expand Up @@ -1055,3 +1057,84 @@ outloop:
log.L().Info("Sample source data", zap.String("table", tableMeta.Name), zap.Float64("IndexRatio", tableMeta.IndexRatio), zap.Bool("IsSourceOrder", tableMeta.IsRowOrdered))
return nil
}

func (rc *Controller) checkTableEmpty(ctx context.Context) error {
if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.TikvImporter.IncrementalImport {
return nil
}
db, _ := rc.tidbGlue.GetDB()

tableCount := 0
for _, db := range rc.dbMetas {
tableCount += len(db.Tables)
}

var lock sync.Mutex
tableNames := make([]string, 0)
concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency)
ch := make(chan string, concurrency)
eg, gCtx := errgroup.WithContext(ctx)
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
for tblName := range ch {
// skip tables that have checkpoint
if rc.cfg.Checkpoint.Enable {
_, err := rc.checkpointsDB.Get(gCtx, tblName)
switch {
case err == nil:
continue
case errors.IsNotFound(err):
default:
return errors.Trace(err)
}
}

hasData, err1 := tableContainsData(gCtx, db, tblName)
if err1 != nil {
return err1
}
if hasData {
lock.Lock()
tableNames = append(tableNames, tblName)
lock.Unlock()
}
}
return nil
})
}
for _, db := range rc.dbMetas {
for _, tbl := range db.Tables {
ch <- common.UniqueTable(tbl.DB, tbl.Name)
}
}
close(ch)
if err := eg.Wait(); err != nil {
if common.IsContextCanceledError(err) {
return nil
}
return errors.Trace(err)
}

if len(tableNames) > 0 {
// sort the failed names
sort.Strings(tableNames)
msg := fmt.Sprintf("table(s) [%s] are not empty", strings.Join(tableNames, ", "))
rc.checkTemplate.Collect(Critical, false, msg)
}
return nil
}

func tableContainsData(ctx context.Context, db utils.QueryExecutor, tableName string) (bool, error) {
query := "select 1 from " + tableName + " limit 1"
var dump int
err := db.QueryRowContext(ctx, query).Scan(&dump)

switch {
case err == sql.ErrNoRows:
return false, nil
case err != nil:
return false, errors.Trace(err)
default:
return true, nil
}
}
Loading

0 comments on commit 2d94321

Please sign in to comment.