Skip to content

Commit

Permalink
lightning: refactor checksum to reuse (#43082)
Browse files Browse the repository at this point in the history
ref #42930
  • Loading branch information
D3Hunter authored Apr 18, 2023
1 parent b71af03 commit ab086ae
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 171 deletions.
13 changes: 12 additions & 1 deletion br/pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "local",
srcs = [
"checksum.go",
"compress.go",
"disk_quota.go",
"duplicate.go",
Expand All @@ -20,6 +21,7 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/lightning/backend/local",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/checksum",
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
Expand Down Expand Up @@ -66,6 +68,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
Expand All @@ -90,6 +93,7 @@ go_test(
name = "local_test",
timeout = "short",
srcs = [
"checksum_test.go",
"compress_test.go",
"disk_quota_test.go",
"duplicate_test.go",
Expand All @@ -104,11 +108,12 @@ go_test(
embed = [":local"],
flaky = True,
race = "on",
shard_count = 49,
shard_count = 50,
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/encode",
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/lightning/log",
Expand All @@ -119,16 +124,19 @@ go_test(
"//br/pkg/restore/split",
"//br/pkg/utils",
"//ddl",
"//errno",
"//keyspace",
"//kv",
"//parser",
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//sessionctx/stmtctx",
"//store/pdtypes",
"//table/tables",
"//tablecodec",
"//types",
"//util",
"//util/codec",
"//util/engine",
"//util/hack",
Expand All @@ -138,6 +146,7 @@ go_test(
"@com_github_coreos_go_semver//semver",
"@com_github_data_dog_go_sqlmock//:go-sqlmock",
"@com_github_docker_go_units//:go-units",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_golang_mock//gomock",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
Expand All @@ -146,7 +155,9 @@ go_test(
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_pd_client//:client",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package importer
package local

import (
"container/heap"
Expand All @@ -28,10 +28,8 @@ import (
"github.com/pingcap/tidb/br/pkg/checksum"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tipb/go-tipb"
Expand All @@ -43,8 +41,8 @@ import (

const (
preUpdateServiceSafePointFactor = 3

maxErrorRetryCount = 3
maxErrorRetryCount = 3
defaultGCLifeTime = 100 * time.Hour
)

var (
Expand All @@ -67,42 +65,16 @@ type ChecksumManager interface {
Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error)
}

// newChecksumManager picks the ChecksumManager implementation for the given
// controller.
//
// It returns (nil, nil) when the configured backend is the TiDB backend or
// when post-restore checksum is switched off. Otherwise it fetches the PD
// version: for a major version below 4 it returns the SQL-based executor
// built from rc.db, and for 4 or above it creates a PD client and returns the
// TiKV-based manager. Errors from fetching the PD version or creating the PD
// client are returned wrapped with errors.Trace.
func newChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (ChecksumManager, error) {
	// Nothing to build when checksum is not going to be performed.
	backendIsTiDB := rc.cfg.TikvImporter.Backend == config.BackendTiDB
	checksumOff := rc.cfg.PostRestore.Checksum == config.OpLevelOff
	if backendIsTiDB || checksumOff {
		return nil, nil
	}

	addr := rc.cfg.TiDB.PdAddr
	version, err := pdutil.FetchPDVersion(ctx, rc.tls, addr)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Clusters older than v4.0.0 fall back to the SQL-based executor.
	if version.Major < 4 {
		return newTiDBChecksumExecutor(rc.db), nil
	}

	// v4.0.0 or newer: the GC TTL API can be used, so talk to PD directly.
	pdCli, err := pd.NewClientWithContext(ctx, []string{addr}, rc.tls.ToPDSecurityOption())
	if err != nil {
		return nil, errors.Trace(err)
	}

	concurrency := uint(rc.cfg.TiDB.DistSQLScanConcurrency)
	return newTiKVChecksumManager(store.GetClient(), pdCli, concurrency), nil
}

// tidbChecksumExecutor fetches checksums through the TiDB SQL client.
// It pairs a SQL database handle with a gcLifeTimeManager and is asserted
// to satisfy the ChecksumManager interface.
type tidbChecksumExecutor struct {
// db is the SQL database handle supplied to the constructor.
db *sql.DB
// manager is the gcLifeTimeManager created together with this executor.
manager *gcLifeTimeManager
}

func newTiDBChecksumExecutor(db *sql.DB) *tidbChecksumExecutor {
var _ ChecksumManager = (*tidbChecksumExecutor)(nil)

// NewTiDBChecksumExecutor creates a new tidb checksum executor.
func NewTiDBChecksumExecutor(db *sql.DB) ChecksumManager {
return &tidbChecksumExecutor{
db: db,
manager: newGCLifeTimeManager(),
Expand Down Expand Up @@ -144,26 +116,6 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *checkpoi
return &cs, nil
}

// DoChecksum computes the remote checksum of the given table.
//
// The ChecksumManager is looked up in ctx under checksumManagerKey; an error
// is returned when none is present. The call is logged as a "remote checksum"
// task tagged with the table name, and its duration is recorded in the
// checksum histogram when a metrics object is attached to ctx. The checksum
// and error produced by the manager are returned unchanged.
func DoChecksum(ctx context.Context, table *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
	manager, found := ctx.Value(&checksumManagerKey).(ChecksumManager)
	if !found {
		return nil, errors.New("No gcLifeTimeManager found in context, check context initialization")
	}

	logger := log.FromContext(ctx).With(zap.String("table", table.Name))
	task := logger.Begin(zap.InfoLevel, "remote checksum")

	remote, err := manager.Checksum(ctx, table)

	// End the task first so the measured duration covers only the checksum call.
	elapsed := task.End(zap.ErrorLevel, err)
	if metrics, ok := metric.FromContext(ctx); ok {
		metrics.ChecksumSecondsHistogram.Observe(elapsed.Seconds())
	}

	return remote, err
}

type gcLifeTimeManager struct {
runningJobsLock sync.Mutex
runningJobs int
Expand All @@ -184,7 +136,7 @@ func (m *gcLifeTimeManager) addOneJob(ctx context.Context, db *sql.DB) error {
defer m.runningJobsLock.Unlock()

if m.runningJobs == 0 {
oriGCLifeTime, err := ObtainGCLifeTime(ctx, db)
oriGCLifeTime, err := obtainGCLifeTime(ctx, db)
if err != nil {
return err
}
Expand All @@ -208,7 +160,7 @@ func (m *gcLifeTimeManager) removeOneJob(ctx context.Context, db *sql.DB) {

m.runningJobs--
if m.runningJobs == 0 {
err := UpdateGCLifeTime(ctx, db, m.oriGCLifeTime)
err := updateGCLifeTime(ctx, db, m.oriGCLifeTime)
if err != nil {
query := fmt.Sprintf(
"UPDATE mysql.tidb SET VARIABLE_VALUE = '%s' WHERE VARIABLE_NAME = 'tikv_gc_life_time'",
Expand Down Expand Up @@ -239,7 +191,7 @@ func increaseGCLifeTime(ctx context.Context, manager *gcLifeTimeManager, db *sql
}

if increaseGCLifeTime {
err = UpdateGCLifeTime(ctx, db, defaultGCLifeTime.String())
err = updateGCLifeTime(ctx, db, defaultGCLifeTime.String())
if err != nil {
return err
}
Expand All @@ -250,22 +202,49 @@ func increaseGCLifeTime(ctx context.Context, manager *gcLifeTimeManager, db *sql
return nil
}

type tikvChecksumManager struct {
// obtainGCLifeTime reads the value of the 'tikv_gc_life_time' row from the
// mysql.tidb table through common.SQLWithRetry and returns it as a string,
// together with any error reported by the query.
func obtainGCLifeTime(ctx context.Context, db *sql.DB) (string, error) {
	const query = "SELECT VARIABLE_VALUE FROM mysql.tidb WHERE VARIABLE_NAME = 'tikv_gc_life_time'"

	querier := common.SQLWithRetry{
		DB:     db,
		Logger: log.FromContext(ctx),
	}

	var lifeTime string
	err := querier.QueryRow(ctx, "obtain GC lifetime", query, &lifeTime)
	return lifeTime, err
}

// updateGCLifeTime updates the current GC lifetime.
func updateGCLifeTime(ctx context.Context, db *sql.DB, gcLifeTime string) error {
sql := common.SQLWithRetry{
DB: db,
Logger: log.FromContext(ctx).With(zap.String("gcLifeTime", gcLifeTime)),
}
return sql.Exec(ctx, "update GC lifetime",
"UPDATE mysql.tidb SET VARIABLE_VALUE = ? WHERE VARIABLE_NAME = 'tikv_gc_life_time'",
gcLifeTime,
)
}

// TiKVChecksumManager is a manager that can compute checksum of a table using TiKV.
// It is asserted to satisfy the ChecksumManager interface.
type TiKVChecksumManager struct {
// client is the kv.Client supplied to the constructor.
client kv.Client
// manager is the gcTTLManager built from the PD client; its pdClient is
// used to obtain the timestamp at the start of Checksum.
manager gcTTLManager
// distSQLScanConcurrency is handed to the checksum executor builder via
// SetConcurrency.
distSQLScanConcurrency uint
}

// newTiKVChecksumManager return a new tikv checksum manager
func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint) *tikvChecksumManager {
return &tikvChecksumManager{
var _ ChecksumManager = &TiKVChecksumManager{}

// NewTiKVChecksumManager returns a new TiKV checksum manager.
func NewTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint) *TiKVChecksumManager {
return &TiKVChecksumManager{
client: client,
manager: newGCTTLManager(pdClient),
distSQLScanConcurrency: distSQLScanConcurrency,
}
}

func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) {
func (e *TiKVChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpoints.TidbTableInfo, ts uint64) (*RemoteChecksum, error) {
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, ts).
SetConcurrency(e.distSQLScanConcurrency).
Build()
Expand Down Expand Up @@ -307,7 +286,8 @@ func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *checkpo
return nil, err
}

func (e *tikvChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
// Checksum implements the ChecksumManager interface.
func (e *TiKVChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error) {
tbl := common.UniqueTable(tableInfo.DB, tableInfo.Name)
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
Expand Down
Loading

0 comments on commit ab086ae

Please sign in to comment.