Skip to content

Commit

Permalink
Merge branch 'master' into supportOuterJoinLeadingHint
Browse files Browse the repository at this point in the history
  • Loading branch information
qw4990 authored Jun 22, 2022
2 parents 7bc62e4 + 38ac198 commit 96d796d
Show file tree
Hide file tree
Showing 264 changed files with 22,975 additions and 14,142 deletions.
2 changes: 1 addition & 1 deletion .github/licenserc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ header:
- '**/*.result'
- '**/*.example'
- '**/*.patch'
- 'DEPS.bzl'
- '**/*.bzl'
- '.codecov.yml'
- 'Jenkinsfile'
- '.editorconfig'
Expand Down
8 changes: 4 additions & 4 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ def go_deps():
name = "com_github_blacktear23_go_proxyprotocol",
build_file_proto_mode = "disable_global",
importpath = "github.com/blacktear23/go-proxyprotocol",
sum = "h1:rQlvB2AYWme2bIB18r/SipGiMEVJYE9U0z+MGoU/LtQ=",
version = "v0.0.0-20180807104634-af7a81e8dd0d",
sum = "h1:WmMmtZanGEfIHnJN9N3A4Pl6mM69D+GxEph2eOaCf7g=",
version = "v1.0.0",
)
go_repository(
name = "com_github_burntsushi_toml",
Expand Down Expand Up @@ -1914,8 +1914,8 @@ def go_deps():
name = "com_github_pingcap_tipb",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/tipb",
sum = "h1:+46isFI9fR9R+nJVDMI55tCC/TCwp+bvVA4HLGEv1rY=",
version = "v0.0.0-20220314125451-bfb5c2c55188",
sum = "h1:L4nZwfYSrIsWPAZR8zMwHaNQJy0Rjy3Od6Smj5mlOms=",
version = "v0.0.0-20220602075447-4847c5d68e73",
)
go_repository(
name = "com_github_pkg_browser",
Expand Down
14 changes: 11 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ buildsucc:

all: dev server benchkv

parser:
@echo "remove this command later, when our CI script doesn't call it"

dev: checklist check explaintest gogenerate br_unit_test test_part_parser_dev ut
@>&2 echo "Great, all tests passed."

Expand Down Expand Up @@ -96,6 +93,9 @@ test_part_parser: parser_yacc test_part_parser_dev

test_part_parser_dev: parser_fmt parser_unit_test

parser:
@cd parser && make parser

parser_yacc:
@cd parser && mv parser.go parser.go.committed && make parser && diff -u parser.go.committed parser.go && rm parser.go.committed

Expand Down Expand Up @@ -327,6 +327,14 @@ build_for_br_integration_test:
) || (make failpoint-disable && exit 1)
@make failpoint-disable

build_for_lightning_test:
@make failpoint-enable
$(GOTEST) -c -cover -covermode=count \
-coverpkg=github.com/pingcap/tidb/br/... \
-o $(LIGHTNING_BIN).test \
github.com/pingcap/tidb/br/cmd/tidb-lightning
@make failpoint-disable

br_unit_test: export ARGS=$$($(BR_PACKAGES))
br_unit_test:
@make failpoint-enable
Expand Down
10 changes: 8 additions & 2 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@ http_archive(
)

load("@io_bazel_rules_go//go:deps.bzl", "go_register_toolchains", "go_rules_dependencies")

load("//:DEPS.bzl", "go_deps")
load("//build:lint.bzl", "nogo_deps")

# gazelle:repository_macro DEPS.bzl%go_deps
go_deps()

nogo_deps()

load("@bazel_gazelle//:deps.bzl", "gazelle_dependencies", "go_repository")

go_rules_dependencies()

go_register_toolchains(version = "1.18.3")
go_register_toolchains(
nogo = "@//build:tidb_nogo",
version = "1.18.3",
)

gazelle_dependencies()

Expand Down
16 changes: 8 additions & 8 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ func makeTag(tableName string, engineID int32) string {
return fmt.Sprintf("%s:%d", tableName, engineID)
}

func makeLogger(tag string, engineUUID uuid.UUID) log.Logger {
return log.With(
func makeLogger(logger log.Logger, tag string, engineUUID uuid.UUID) log.Logger {
return logger.With(
zap.String("engineTag", tag),
zap.Stringer("engineUUID", engineUUID),
)
Expand Down Expand Up @@ -143,7 +143,7 @@ type AbstractBackend interface {
ShouldPostProcess() bool

// NewEncoder creates an encoder of a TiDB table.
NewEncoder(tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error)
NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error)

OpenEngine(ctx context.Context, config *EngineConfig, engineUUID uuid.UUID) error

Expand Down Expand Up @@ -260,8 +260,8 @@ func (be Backend) MakeEmptyRows() kv.Rows {
return be.abstract.MakeEmptyRows()
}

func (be Backend) NewEncoder(tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
return be.abstract.NewEncoder(tbl, options)
func (be Backend) NewEncoder(ctx context.Context, tbl table.Table, options *kv.SessionOptions) (kv.Encoder, error) {
return be.abstract.NewEncoder(ctx, tbl, options)
}

func (be Backend) ShouldPostProcess() bool {
Expand Down Expand Up @@ -321,7 +321,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
closedEngine := ClosedEngine{
engine: engine{
backend: be.abstract,
logger: makeLogger("<import-and-reset>", engineUUID),
logger: makeLogger(log.FromContext(ctx), "<import-and-reset>", engineUUID),
uuid: engineUUID,
},
}
Expand All @@ -334,7 +334,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
// OpenEngine opens an engine with the given table name and engine ID.
func (be Backend) OpenEngine(ctx context.Context, config *EngineConfig, tableName string, engineID int32) (*OpenedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
logger := makeLogger(tag, engineUUID)
logger := makeLogger(log.FromContext(ctx), tag, engineUUID)

if err := be.abstract.OpenEngine(ctx, config, engineUUID); err != nil {
return nil, err
Expand Down Expand Up @@ -437,7 +437,7 @@ func (be Backend) UnsafeCloseEngine(ctx context.Context, cfg *EngineConfig, tabl
func (be Backend) UnsafeCloseEngineWithUUID(ctx context.Context, cfg *EngineConfig, tag string, engineUUID uuid.UUID) (*ClosedEngine, error) {
return engine{
backend: be.abstract,
logger: makeLogger(tag, engineUUID),
logger: makeLogger(log.FromContext(ctx), tag, engineUUID),
uuid: engineUUID,
}.unsafeClose(ctx, cfg)
}
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,9 @@ func TestNewEncoder(t *testing.T) {

encoder := mock.NewMockEncoder(s.controller)
options := &kv.SessionOptions{SQLMode: mysql.ModeANSIQuotes, Timestamp: 1234567890}
s.mockBackend.EXPECT().NewEncoder(nil, options).Return(encoder, nil)
s.mockBackend.EXPECT().NewEncoder(nil, nil, options).Return(encoder, nil)

realEncoder, err := s.mockBackend.NewEncoder(nil, options)
realEncoder, err := s.mockBackend.NewEncoder(nil, nil, options)
require.Equal(t, realEncoder, encoder)
require.NoError(t, err)
}
Expand Down
10 changes: 8 additions & 2 deletions br/pkg/lightning/backend/kv/kv2sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kv
import (
"fmt"

"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -104,8 +105,13 @@ func (t *TableKVDecoder) IterRawIndexKeys(h kv.Handle, rawRow []byte, fn func([]
return nil
}

func NewTableKVDecoder(tbl table.Table, tableName string, options *SessionOptions) (*TableKVDecoder, error) {
se := newSession(options)
func NewTableKVDecoder(
tbl table.Table,
tableName string,
options *SessionOptions,
logger log.Logger,
) (*TableKVDecoder, error) {
se := newSession(options, logger)
cols := tbl.Cols()
// Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord
recordCtx := tables.NewCommonAddRecordCtx(len(cols))
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,11 @@ type SessionOptions struct {
}

// NewSession creates a new trimmed down Session matching the options.
func NewSession(options *SessionOptions) sessionctx.Context {
return newSession(options)
func NewSession(options *SessionOptions, logger log.Logger) sessionctx.Context {
return newSession(options, logger)
}

func newSession(options *SessionOptions) *session {
func newSession(options *SessionOptions, logger log.Logger) *session {
sqlMode := options.SQLMode
vars := variable.NewSessionVars()
vars.SkipUTF8Check = true
Expand All @@ -265,15 +265,15 @@ func newSession(options *SessionOptions) *session {
if options.SysVars != nil {
for k, v := range options.SysVars {
if err := vars.SetSystemVar(k, v); err != nil {
log.L().DPanic("new session: failed to set system var",
logger.DPanic("new session: failed to set system var",
log.ShortError(err),
zap.String("key", k))
}
}
}
vars.StmtCtx.TimeZone = vars.Location()
if err := vars.SetSystemVar("timestamp", strconv.FormatInt(options.Timestamp, 10)); err != nil {
log.L().Warn("new session: failed to set timestamp",
logger.Warn("new session: failed to set timestamp",
log.ShortError(err))
}
vars.TxnCtx = nil
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/kv/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ package kv
import (
"testing"

"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
)

func TestSession(t *testing.T) {
session := newSession(&SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890})
session := newSession(&SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890}, log.L())
_, err := session.Txn(true)
require.NoError(t, err)
}
59 changes: 56 additions & 3 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,18 @@ type tableKVEncoder struct {
metrics *metric.Metrics
}

func NewTableKVEncoder(tbl table.Table, options *SessionOptions, metrics *metric.Metrics) (Encoder, error) {
func NewTableKVEncoder(
tbl table.Table,
options *SessionOptions,
metrics *metric.Metrics,
logger log.Logger,
) (Encoder, error) {
if metrics != nil {
metrics.KvEncoderCounter.WithLabelValues("open").Inc()
}
meta := tbl.Meta()
cols := tbl.Cols()
se := newSession(options)
se := newSession(options, logger)
// Set CommonAddRecordCtx to session to reuse the slices and BufStore in AddRecord
recordCtx := tables.NewCommonAddRecordCtx(len(cols))
tables.SetAddRecordCtx(se, recordCtx)
Expand Down Expand Up @@ -267,7 +272,7 @@ func logKVConvertFailed(logger log.Logger, row []types.Datum, j int, colInfo *mo
log.ShortError(err),
)

log.L().Error("failed to covert kv value", logutil.RedactAny("origVal", original.GetValue()),
logger.Error("failed to convert kv value", logutil.RedactAny("origVal", original.GetValue()),
zap.Stringer("fieldType", &colInfo.FieldType), zap.String("column", colInfo.Name.O),
zap.Int("columnID", j+1))
return errors.Annotatef(
Expand Down Expand Up @@ -445,6 +450,49 @@ func isPKCol(colInfo *model.ColumnInfo) bool {
return mysql.HasPriKeyFlag(colInfo.GetFlag())
}

func isRowIDOverflow(meta *model.ColumnInfo, rowID int64) bool {
isUnsigned := mysql.HasUnsignedFlag(meta.GetFlag())
switch meta.GetType() {
// MEDIUM INT
case mysql.TypeInt24:
if !isUnsigned {
return rowID > mysql.MaxInt24
}
return rowID > mysql.MaxUint24
// INT
case mysql.TypeLong:
if !isUnsigned {
return rowID > math.MaxInt32
}
return rowID > math.MaxUint32
// SMALLINT
case mysql.TypeShort:
if !isUnsigned {
return rowID > math.MaxInt16
}
return rowID > math.MaxUint16
// TINYINT
case mysql.TypeTiny:
if !isUnsigned {
return rowID > math.MaxInt8
}
return rowID > math.MaxUint8
// FLOAT
case mysql.TypeFloat:
if !isUnsigned {
return float32(rowID) > math.MaxFloat32
}
return float64(rowID) > math.MaxFloat32*2
// DOUBLE
case mysql.TypeDouble:
if !isUnsigned {
return float64(rowID) > math.MaxFloat64
}
// impossible for rowID exceeding MaxFloat64
}
return false
}

func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) {
var (
value types.Datum
Expand Down Expand Up @@ -472,6 +520,11 @@ func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDa
// handle special values
switch {
case isAutoIncCol(col.ToInfo()):
// rowID is going to auto-filled the omitted column,
// which should be checked before restore
if isRowIDOverflow(col.ToInfo(), rowID) {
return value, errors.Errorf("PK %d is out of range", rowID)
}
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false)
case isTableAutoRandom(tblMeta) && isPKCol(col.ToInfo()):
Expand Down
Loading

0 comments on commit 96d796d

Please sign in to comment.