Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into fix-default-usage
Browse files Browse the repository at this point in the history
  • Loading branch information
morgo committed Dec 9, 2021
2 parents a20e69c + fbcf757 commit 5d05315
Show file tree
Hide file tree
Showing 59 changed files with 2,269 additions and 1,406 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ gotest_in_verify_ci_part_2: failpoint-enable tools/bin/gotestsum tools/bin/gocov

race: failpoint-enable
@export log_level=debug; \
$(GOTEST) -timeout 20m -race $(PACKAGES) || { $(FAILPOINT_DISABLE); exit 1; }
$(GOTEST) -timeout 25m -race $(PACKAGES) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

leak: failpoint-enable
Expand Down
29 changes: 21 additions & 8 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,27 +552,37 @@ type Security struct {
KeyPath string `toml:"key-path" json:"key-path"`
// RedactInfoLog indicates that whether enabling redact log
RedactInfoLog bool `toml:"redact-info-log" json:"redact-info-log"`

// TLSConfigName is used to set tls config for lightning in DM, so we don't expose this field to user
// DM may running many lightning instances at same time, so we need to set different tls config name for each lightning
TLSConfigName string `toml:"-" json:"-"`
}

// RegistersMySQL registers (or deregisters) the TLS config with name "cluster"
// RegisterMySQL registers the TLS config with name "cluster" or security.TLSConfigName
// for use in `sql.Open()`. This method is goroutine-safe.
func (sec *Security) RegisterMySQL() error {
if sec == nil {
return nil
}
tlsConfig, err := common.ToTLSConfig(sec.CAPath, sec.CertPath, sec.KeyPath)
switch {
case err != nil:
if err != nil {
return errors.Trace(err)
case tlsConfig != nil:
}
if tlsConfig != nil {
// error happens only when the key coincides with the built-in names.
_ = gomysql.RegisterTLSConfig("cluster", tlsConfig)
default:
gomysql.DeregisterTLSConfig("cluster")
_ = gomysql.RegisterTLSConfig(sec.TLSConfigName, tlsConfig)
}
return nil
}

// DeregisterMySQL deregisters the TLS config with security.TLSConfigName
func (sec *Security) DeregisterMySQL() {
if sec == nil || len(sec.CAPath) == 0 {
return
}
gomysql.DeregisterTLSConfig(sec.TLSConfigName)
}

// A duration which can be deserialized from a TOML string.
// Implemented as https://github.com/BurntSushi/toml#using-the-encodingtextunmarshaler-interface
type Duration struct {
Expand Down Expand Up @@ -1124,7 +1134,10 @@ func (cfg *Config) CheckAndAdjustSecurity() error {
switch cfg.TiDB.TLS {
case "":
if len(cfg.TiDB.Security.CAPath) > 0 {
cfg.TiDB.TLS = "cluster"
if cfg.TiDB.Security.TLSConfigName == "" {
cfg.TiDB.Security.TLSConfigName = "cluster" // adjust this the default value
}
cfg.TiDB.TLS = cfg.TiDB.Security.TLSConfigName
} else {
cfg.TiDB.TLS = "false"
}
Expand Down
7 changes: 7 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,13 @@ func (s *configTestSuite) TestAdjustSecuritySection(c *C) {
c.Assert(cfg.TiDB.Security.CAPath, Equals, tc.expectedCA, comment)
c.Assert(cfg.TiDB.TLS, Equals, tc.expectedTLS, comment)
}
// test different tls config name
cfg := config.NewConfig()
assignMinimalLegalValue(cfg)
cfg.Security.CAPath = "/path/to/ca.pem"
cfg.Security.TLSConfigName = "tidb-tls"
c.Assert(cfg.Adjust(context.Background()), IsNil)
c.Assert(cfg.TiDB.Security.TLSConfigName, Equals, cfg.TiDB.TLS)
}

func (s *configTestSuite) TestInvalidCSV(c *C) {
Expand Down
5 changes: 1 addition & 4 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,7 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, g glue.
if taskCfg.TiDB.Security == nil {
return
}
taskCfg.TiDB.Security.CAPath = ""
if err := taskCfg.TiDB.Security.RegisterMySQL(); err != nil {
log.L().Warn("failed to deregister TLS config", log.ShortError(err))
}
taskCfg.TiDB.Security.DeregisterMySQL()
}()

// initiation of default glue should be after RegisterMySQL, which is ready to be called after taskCfg.Adjust
Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,7 @@ func (rc *Controller) restoreSchema(ctx context.Context) error {
os.Exit(0)
})

rc.checkpointsWg.Add(1) // checkpointsWg will be done in `rc.listenCheckpointUpdates`
go rc.listenCheckpointUpdates()

sysVars := ObtainImportantVariables(ctx, rc.tidbGlue.GetSQLExecutor(), !rc.isTiDBBackend())
Expand Down Expand Up @@ -993,7 +994,7 @@ func (rc *Controller) saveStatusCheckpoint(ctx context.Context, tableName string

// listenCheckpointUpdates will combine several checkpoints together to reduce database load.
func (rc *Controller) listenCheckpointUpdates() {
rc.checkpointsWg.Add(1)
defer rc.checkpointsWg.Done()

var lock sync.Mutex
coalesed := make(map[string]*checkpoints.TableCheckpointDiff)
Expand Down Expand Up @@ -1082,7 +1083,6 @@ func (rc *Controller) listenCheckpointUpdates() {
}
})
}
rc.checkpointsWg.Done()
}

// buildRunPeriodicActionAndCancelFunc build the runPeriodicAction func and a cancel func
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,7 @@ func (s *tableRestoreSuite) TestSaveStatusCheckpoint(c *C) {
saveCpCh: saveCpCh,
checkpointsDB: checkpoints.NewNullCheckpointsDB(),
}
rc.checkpointsWg.Add(1)
go rc.listenCheckpointUpdates()

start := time.Now()
Expand Down
11 changes: 10 additions & 1 deletion br/pkg/restore/split_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,11 @@ func (c *pdClient) getMaxReplica(ctx context.Context) (int, error) {
if err != nil {
return 0, errors.Trace(err)
}
defer func() {
if err = res.Body.Close(); err != nil {
log.Error("Response fail to close", zap.Error(err))
}
}()
var conf config.Config
if err := json.NewDecoder(res.Body).Decode(&conf); err != nil {
return 0, errors.Trace(err)
Expand Down Expand Up @@ -482,11 +487,15 @@ func (c *pdClient) GetPlacementRule(ctx context.Context, groupID, ruleID string)
if err != nil {
return rule, errors.Trace(err)
}
defer func() {
if err = res.Body.Close(); err != nil {
log.Error("Response fail to close", zap.Error(err))
}
}()
b, err := io.ReadAll(res.Body)
if err != nil {
return rule, errors.Trace(err)
}
res.Body.Close()
err = json.Unmarshal(b, &rule)
if err != nil {
return rule, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (cfg *Config) parseCipherInfo(flags *pflag.FlagSet) error {
}

if !checkCipherKeyMatch(&cfg.CipherInfo) {
return errors.Annotate(err, "Cipher type and key not match")
return errors.Annotate(berrors.ErrInvalidArgument, "crypter method and key length not match")
}

return nil
Expand Down
99 changes: 99 additions & 0 deletions br/pkg/task/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
package task

import (
"encoding/hex"
"fmt"

. "github.com/pingcap/check"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/tidb/config"
"github.com/spf13/pflag"
)
Expand Down Expand Up @@ -62,3 +65,99 @@ func (s *testCommonSuite) TestStripingPDURL(c *C) {
c.Assert(err, IsNil)
c.Assert(noChange, Equals, "127.0.0.1:2379")
}

func (s *testCommonSuite) TestCheckCipherKeyMatch(c *C) {
testCases := []struct {
CipherType encryptionpb.EncryptionMethod
CipherKey string
ok bool
}{
{
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
ok: true,
},
{
CipherType: encryptionpb.EncryptionMethod_UNKNOWN,
ok: false,
},
{
CipherType: encryptionpb.EncryptionMethod_AES128_CTR,
CipherKey: "0123456789abcdef0123456789abcdef",
ok: true,
},
{
CipherType: encryptionpb.EncryptionMethod_AES128_CTR,
CipherKey: "0123456789abcdef0123456789abcd",
ok: false,
},
{
CipherType: encryptionpb.EncryptionMethod_AES192_CTR,
CipherKey: "0123456789abcdef0123456789abcdef0123456789abcdef",
ok: true,
},
{
CipherType: encryptionpb.EncryptionMethod_AES192_CTR,
CipherKey: "0123456789abcdef0123456789abcdef0123456789abcdefff",
ok: false,
},
{
CipherType: encryptionpb.EncryptionMethod_AES256_CTR,
CipherKey: "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
ok: true,
},
{
CipherType: encryptionpb.EncryptionMethod_AES256_CTR,
CipherKey: "",
ok: false,
},
}

for _, t := range testCases {
cipherKey, err := hex.DecodeString(t.CipherKey)
c.Assert(err, IsNil)

r := checkCipherKeyMatch(&backuppb.CipherInfo{
CipherType: t.CipherType,
CipherKey: cipherKey,
})
c.Assert(r, Equals, t.ok)
}
}

func (s *testCommonSuite) TestCheckCipherKey(c *C) {
cases := []struct {
cipherKey string
keyFile string
ok bool
}{
{
cipherKey: "0123456789abcdef0123456789abcdef",
keyFile: "",
ok: true,
},
{
cipherKey: "0123456789abcdef0123456789abcdef",
keyFile: "/tmp/abc",
ok: false,
},
{
cipherKey: "",
keyFile: "/tmp/abc",
ok: true,
},
{
cipherKey: "",
keyFile: "",
ok: false,
},
}

for _, t := range cases {
err := checkCipherKey(t.cipherKey, t.keyFile)
if t.ok {
c.Assert(err, IsNil)
} else {
c.Assert(err, NotNil)
}
}
}
2 changes: 0 additions & 2 deletions br/tests/lightning_error_summary/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

set -eux

# skip for temporary due to checksum for table a,c succeed, but expect to fail.
exit 0
# Check that error summary are written at the bottom of import.
run_sql 'DROP DATABASE IF EXISTS tidb_lightning_checkpoint_error_summary;'

Expand Down
46 changes: 46 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import (
"github.com/pingcap/tidb/util/domainutil"
"github.com/pingcap/tidb/util/israce"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
"github.com/tikv/client-go/v2/testutils"
Expand Down Expand Up @@ -7290,6 +7291,51 @@ func (s *testSerialDBSuite) TestJsonUnmarshalErrWhenPanicInCancellingPath(c *C)
c.Assert(err.Error(), Equals, "[kv:1062]Duplicate entry '0' for key 'cc'")
}

// Close issue #24172.
// See https://github.com/pingcap/tidb/issues/24172
func (s *testSerialDBSuite) TestCancelJobWriteConflict(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int)")

var cancelErr error
var rs []sqlexec.RecordSet
hook := &ddl.TestDDLCallback{}
d := s.dom.DDL()
originalHook := d.GetHook()
d.(ddl.DDLForTest).SetHook(hook)
defer d.(ddl.DDLForTest).SetHook(originalHook)

// Test when cancelling cannot be retried and adding index succeeds.
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization {
stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`), IsNil)
defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn"), IsNil) }()
rs, cancelErr = tk1.Se.Execute(context.Background(), stmt)
}
}
tk.MustExec("alter table t add index (id)")
c.Assert(cancelErr.Error(), Equals, "mock commit error")

// Test when cancelling is retried only once and adding index is cancelled in the end.
var jobID int64
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionAddIndex && job.State == model.JobStateRunning && job.SchemaState == model.StateWriteReorganization {
jobID = job.ID
stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("retry_once")`), IsNil)
defer func() { c.Assert(failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn"), IsNil) }()
rs, cancelErr = tk1.Se.Execute(context.Background(), stmt)
}
}
tk.MustGetErrCode("alter table t add index (id)", errno.ErrCancelledDDLJob)
c.Assert(cancelErr, IsNil)
result := tk1.ResultSetToResultWithCtx(context.Background(), rs[0], Commentf("cancel ddl job fails"))
result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID)))
}

// For Close issue #24288
// see https://github.com/pingcap/tidb/issues/24288
func (s *testDBSuite8) TestDdlMaxLimitOfIdentifier(c *C) {
Expand Down
Loading

0 comments on commit 5d05315

Please sign in to comment.