Skip to content

Commit

Permalink
Merge branch 'master' into handle-data-race
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei committed Apr 8, 2022
2 parents e870a77 + b8d65ce commit b26e6a1
Show file tree
Hide file tree
Showing 24 changed files with 8,784 additions and 8,324 deletions.
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,11 @@ gotest_unstable_in_verify_ci: tools/bin/xprog tools/bin/ut failpoint-enable
@$(CLEAN_UT_BINARY)

race: failpoint-enable
@export log_level=debug; \
$(GOTEST) -timeout 25m -race $(PACKAGES) || { $(FAILPOINT_DISABLE); exit 1; }
@mkdir -p $(TEST_COVERAGE_DIR)
@export TZ='Asia/Shanghai'; \
tools/bin/ut --race --junitfile "$(TEST_COVERAGE_DIR)/tidb-junit-report.xml" --coverprofile "$(TEST_COVERAGE_DIR)/tidb_cov.unit_test" --except unstable.txt || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)
@$(CLEAN_UT_BINARY)

leak: failpoint-enable
@export log_level=debug; \
Expand Down
16 changes: 13 additions & 3 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,15 @@ func getSizeProperties(logger log.Logger, db *pebble.DB, keyAdapter KeyAdapter)
}

func (e *Engine) getEngineFileSize() backend.EngineFileSize {
metrics := e.db.Metrics()
total := metrics.Total()
e.mutex.RLock()
db := e.db
e.mutex.RUnlock()

var total pebble.LevelMetrics
if db != nil {
metrics := db.Metrics()
total = metrics.Total()
}
var memSize int64
e.localWriters.Range(func(k, v interface{}) bool {
w := k.(*Writer)
Expand Down Expand Up @@ -524,7 +531,6 @@ func (e *Engine) ingestSSTLoop() {
for i := 0; i < concurrency; i++ {
e.wg.Add(1)
go func() {
defer e.wg.Done()
defer func() {
if e.ingestErr.Get() != nil {
seqLock.Lock()
Expand All @@ -534,6 +540,7 @@ func (e *Engine) ingestSSTLoop() {
flushQueue = flushQueue[:0]
seqLock.Unlock()
}
e.wg.Done()
}()
for {
select {
Expand Down Expand Up @@ -1471,5 +1478,8 @@ func (i dbSSTIngester) ingest(metas []*sstMeta) error {
for _, m := range metas {
paths = append(paths, m.path)
}
if i.e.db == nil {
return errorEngineClosed
}
return i.e.db.Ingest(paths)
}
85 changes: 85 additions & 0 deletions br/pkg/lightning/backend/local/engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
"context"
"fmt"
"math"
"os"
"path"
"path/filepath"
"testing"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/sstable"
"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/pingcap/tidb/br/pkg/lightning/backend"
)

func TestIngestSSTWithClosedEngine(t *testing.T) {
dir := t.TempDir()
opt := &pebble.Options{
MemTableSize: 1024 * 1024,
MaxConcurrentCompactions: 16,
L0CompactionThreshold: math.MaxInt32, // set to max try to disable compaction
L0StopWritesThreshold: math.MaxInt32, // set to max try to disable compaction
DisableWAL: true,
ReadOnly: false,
}
db, err := pebble.Open(filepath.Join(dir, "test"), opt)
require.NoError(t, err)
tmpPath := filepath.Join(dir, "test.sst")
err = os.Mkdir(tmpPath, 0o755)
require.NoError(t, err)

_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel := context.WithCancel(context.Background())
f := &Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
cancel: cancel,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: noopKeyAdapter{},
}
f.sstIngester = dbSSTIngester{e: f}
sstPath := path.Join(tmpPath, uuid.New().String()+".sst")
file, err := os.Create(sstPath)
require.NoError(t, err)
w := sstable.NewWriter(file, sstable.WriterOptions{})
for i := 0; i < 10; i++ {
require.NoError(t, w.Add(sstable.InternalKey{
Trailer: uint64(sstable.InternalKeyKindSet),
UserKey: []byte(fmt.Sprintf("key%d", i)),
}, nil))
}
require.NoError(t, w.Close())

require.NoError(t, f.ingestSSTs([]*sstMeta{
{
path: sstPath,
},
}))
require.NoError(t, f.Close())
require.ErrorIs(t, f.ingestSSTs([]*sstMeta{
{
path: sstPath,
},
}), errorEngineClosed)
}
120 changes: 120 additions & 0 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -1991,3 +1992,122 @@ func (s *stateChangeSuite) TestRestrainDropColumnWithIndex() {
tk.MustExec("alter table t drop column a;")
tk.MustExec("drop table if exists t;")
}

func TestParallelRenameTable(t *testing.T) {
store, d, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create database test2")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int default 0, b int default 0, key idx((b+1)))")
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec("use test")

var concurrentDDLQueryPre string
var concurrentDDLQuery string
firstDDL := true

var wg sync.WaitGroup
var checkErr error
d2 := d.DDL()
originalCallback := d2.GetHook()
defer d2.SetHook(originalCallback)
callback := &ddl.TestDDLCallback{Do: d}
callback.OnJobRunBeforeExported = func(job *model.Job) {
switch job.SchemaState {
case model.StateNone:
if firstDDL {
firstDDL = false
} else {
return
}
wg.Add(1)
go func() {
if concurrentDDLQueryPre != "" {
wg.Add(1)
go func() {
// We assume that no error, we don't want to test it.
tk3.MustExec(concurrentDDLQueryPre)
wg.Done()
}()
time.Sleep(10 * time.Millisecond)
}
_, err := tk1.Exec(concurrentDDLQuery)
if err != nil {
checkErr = err
}
wg.Done()
}()
time.Sleep(10 * time.Millisecond)
}
}

d2.SetHook(callback)

// rename then add column
concurrentDDLQuery = "alter table t add column g int"
tk.MustExec("rename table t to t1")
wg.Wait()
require.Error(t, checkErr)
require.True(t, strings.Contains(checkErr.Error(), "Table 'test.t' doesn't exist"), checkErr.Error())
tk.MustExec("rename table t1 to t")
checkErr = nil

// rename then add column, but rename to other database
concurrentDDLQuery = "alter table t add column g int"
firstDDL = true
tk.MustExec("rename table t to test2.t1")
wg.Wait()
require.Error(t, checkErr)
// [schema:1146]Table '(Schema ID 1).(Table ID 65)' doesn't exist
require.True(t, strings.Contains(checkErr.Error(), "doesn't exist"), checkErr.Error())
tk.MustExec("rename table test2.t1 to test.t")
checkErr = nil

// rename then add column, but rename to other database and create same name table
concurrentDDLQuery = "alter table t add column g int"
firstDDL = true
tk.MustExec("rename table t to test2.t1")
concurrentDDLQueryPre = "create table t(a int)"
wg.Wait()
require.Error(t, checkErr)
// [schema:1146]Table '(Schema ID 1).(Table ID 65)' doesn't exist
require.True(t, strings.Contains(checkErr.Error(), "doesn't exist"), checkErr.Error())
tk.MustExec("rename table test2.t1 to test.t")
concurrentDDLQueryPre = ""
checkErr = nil

// rename then rename
concurrentDDLQuery = "rename table t to t2"
firstDDL = true
tk.MustExec("rename table t to t1")
wg.Wait()
require.Error(t, checkErr)
require.True(t, strings.Contains(checkErr.Error(), "Table 'test.t' doesn't exist"), checkErr.Error())
tk.MustExec("rename table t1 to t")
checkErr = nil

// rename then rename, but rename to other database
concurrentDDLQuery = "rename table t to t2"
firstDDL = true
tk.MustExec("rename table t to test2.t1")
wg.Wait()
require.Error(t, checkErr)
require.True(t, strings.Contains(checkErr.Error(), "doesn't exist"), checkErr.Error())
tk.MustExec("rename table test2.t1 to test.t")
checkErr = nil

// renames then add index on one table
tk.MustExec("create table t2(a int)")
tk.MustExec("create table t3(a int)")
concurrentDDLQuery = "alter table t add index(a)"
firstDDL = true
tk.MustExec("rename table t to tt, t2 to tt2, t3 to tt3")
wg.Wait()
require.Error(t, checkErr)
require.True(t, strings.Contains(checkErr.Error(), "Table 'test.t' doesn't exist"), checkErr.Error())
tk.MustExec("rename table tt to t")
}
42 changes: 42 additions & 0 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2031,6 +2031,48 @@ func TestDefaultValueIsString(t *testing.T) {
require.Equal(t, "1", tbl.Meta().Columns[0].DefaultValue)
}

func TestDefaultColumnWithRand(t *testing.T) {
// Related issue: https://github.com/pingcap/tidb/issues/10377
store, clean := testkit.CreateMockStoreWithSchemaLease(t, testLease)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t, t1, t2")

// create table
tk.MustExec("create table t (c int(10), c1 int default (rand()))")
tk.MustExec("create table t1 (c int, c1 double default (rand()))")
tk.MustExec("create table t2 (c int, c1 double default (rand(1)))")

// add column with default rand() for table t is forbidden in MySQL 8.0
tk.MustGetErrCode("alter table t add column c2 double default (rand(2))", errno.ErrBinlogUnsafeSystemFunction)
tk.MustGetErrCode("alter table t add column c3 int default ((rand()))", errno.ErrBinlogUnsafeSystemFunction)
tk.MustGetErrCode("alter table t add column c4 int default (((rand(3))))", errno.ErrBinlogUnsafeSystemFunction)

// insert records
tk.MustExec("insert into t(c) values (1),(2),(3)")
tk.MustExec("insert into t1(c) values (1),(2),(3)")
tk.MustExec("insert into t2(c) values (1),(2),(3)")

queryStmts := []string{
"SELECT c1 from t",
"SELECT c1 from t1",
"SELECT c1 from t2",
}
for _, queryStmt := range queryStmts {
r := tk.MustQuery(queryStmt).Rows()
for _, row := range r {
d, ok := row[0].(float64)
if ok {
require.True(t, 0.0 <= d && d < 1.0, "rand() return a random floating-point value in the range 0 <= v < 1.0.")
}
}
}

// use a non-existent function name
tk.MustGetErrCode("CREATE TABLE t3 (c int, c1 int default a_function_not_supported_yet());", errno.ErrDefValGeneratedNamedFunctionIsNotAllowed)
}

func TestChangingDBCharset(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down
5 changes: 0 additions & 5 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,6 @@ func TestIssue22819(t *testing.T) {

tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("use test;")
tk1.MustExec("drop table if exists t1;")
defer func() {
tk1.MustExec("drop table if exists t1;")
}()

tk1.MustExec("create table t1 (v int) partition by hash (v) partitions 2")
tk1.MustExec("insert into t1 values (1)")

Expand Down
Loading

0 comments on commit b26e6a1

Please sign in to comment.