diff --git a/br/pkg/lightning/BUILD.bazel b/br/pkg/lightning/BUILD.bazel index 95aca448f786a..fc646195c6e2b 100644 --- a/br/pkg/lightning/BUILD.bazel +++ b/br/pkg/lightning/BUILD.bazel @@ -28,7 +28,10 @@ go_library( "//br/pkg/version/build", "//expression", "//planner/core", + "//util", "//util/promutil", + "@com_github_go_sql_driver_mysql//:mysql", + "@com_github_google_uuid//:uuid", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/import_sstpb", diff --git a/br/pkg/lightning/common/util.go b/br/pkg/lightning/common/util.go index 679ba6cc5d48b..e01e560239456 100644 --- a/br/pkg/lightning/common/util.go +++ b/br/pkg/lightning/common/util.go @@ -54,6 +54,7 @@ type MySQLConnectParam struct { SQLMode string MaxAllowedPacket uint64 TLS string + Net string Vars map[string]string } @@ -64,6 +65,9 @@ func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config { cfg.User = param.User cfg.Passwd = param.Password cfg.Net = "tcp" + if param.Net != "" { + cfg.Net = param.Net + } cfg.Addr = net.JoinHostPort(param.Host, strconv.Itoa(param.Port)) cfg.Params["charset"] = "utf8mb4" cfg.Params["sql_mode"] = fmt.Sprintf("'%s'", param.SQLMode) diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index fdbe962433806..56d9bc2e8d164 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -135,6 +135,9 @@ type DBStore struct { IndexSerialScanConcurrency int `toml:"index-serial-scan-concurrency" json:"index-serial-scan-concurrency"` ChecksumTableConcurrency int `toml:"checksum-table-concurrency" json:"checksum-table-concurrency"` Vars map[string]string `toml:"-" json:"vars"` + + IOTotalBytes *atomic.Uint64 `toml:"-" json:"-"` + UUID string `toml:"-" json:"-"` } type Config struct { diff --git a/br/pkg/lightning/lightning.go b/br/pkg/lightning/lightning.go index 2db76b1001078..87a3d2b7cf46c 100644 --- a/br/pkg/lightning/lightning.go +++ b/br/pkg/lightning/lightning.go @@ -33,6 +33,8 @@ import ( "sync" "time" + "github.com/go-sql-driver/mysql" + "github.com/google/uuid" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/import_sstpb" @@ -53,11 +55,13 @@ import ( "github.com/pingcap/tidb/br/pkg/version/build" _ "github.com/pingcap/tidb/expression" // get rid of `import cycle`: just init expression.RewriteAstExpr,and called at package `backend.kv`. _ "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/promutil" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/shurcooL/httpgzip" + "go.uber.org/atomic" "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/exp/slices" @@ -370,6 +374,36 @@ func (l *Lightning) RunOnceWithOptions(taskCtx context.Context, taskCfg *config. taskCfg.TaskID = int64(val.(int)) }) + failpoint.Inject("SetIOTotalBytes", func(_ failpoint.Value) { + o.logger.Info("set io total bytes") + taskCfg.TiDB.IOTotalBytes = atomic.NewUint64(0) + taskCfg.TiDB.UUID = uuid.New().String() + go func() { + for { + time.Sleep(time.Millisecond * 10) + log.L().Info("IOTotalBytes", zap.Uint64("IOTotalBytes", taskCfg.TiDB.IOTotalBytes.Load())) + } + }() + }) + if taskCfg.TiDB.IOTotalBytes != nil { + o.logger.Info("found IO total bytes counter") + mysql.RegisterDialContext(taskCfg.TiDB.UUID, func(ctx context.Context, addr string) (net.Conn, error) { + o.logger.Debug("connection with IO bytes counter") + d := &net.Dialer{} + conn, err := d.DialContext(ctx, "tcp", addr) + if err != nil { + return nil, err + } + tcpConn := conn.(*net.TCPConn) + // try https://github.com/go-sql-driver/mysql/blob/bcc459a906419e2890a50fc2c99ea6dd927a88f2/connector.go#L56-L64 + err = tcpConn.SetKeepAlive(true) + if err != nil { + o.logger.Warn("set TCP keep alive failed", zap.Error(err)) + } + return util.NewTCPConnWithIOCounter(tcpConn, taskCfg.TiDB.IOTotalBytes), nil + }) + } + return l.run(taskCtx, taskCfg, o) } diff --git a/br/pkg/lightning/restore/tidb.go b/br/pkg/lightning/restore/tidb.go index 0e114bc035a56..30f39185e05f2 100644 --- a/br/pkg/lightning/restore/tidb.go +++ b/br/pkg/lightning/restore/tidb.go @@ -73,6 +73,7 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) { SQLMode: dsn.StrSQLMode, MaxAllowedPacket: dsn.MaxAllowedPacket, TLS: dsn.TLS, + Net: dsn.UUID, } db, err := param.Connect() diff --git a/br/tests/lightning_record_network/config.toml b/br/tests/lightning_record_network/config.toml new file mode 100644 index 0000000000000..2de41a1f43dab --- /dev/null +++ b/br/tests/lightning_record_network/config.toml @@ -0,0 +1,2 @@ +[tikv-importer] +backend = 'tidb' diff --git a/br/tests/lightning_record_network/data/db-schema-create.sql b/br/tests/lightning_record_network/data/db-schema-create.sql new file mode 100644 index 0000000000000..c88b0e3150e76 --- /dev/null +++ b/br/tests/lightning_record_network/data/db-schema-create.sql @@ -0,0 +1 @@ +create database db; \ No newline at end of file diff --git a/br/tests/lightning_record_network/data/db.test-schema.sql b/br/tests/lightning_record_network/data/db.test-schema.sql new file mode 100644 index 0000000000000..7bee5f9ad639c --- /dev/null +++ b/br/tests/lightning_record_network/data/db.test-schema.sql @@ -0,0 +1 @@ +create table test ( id int primary key, a int, b int ); \ No newline at end of file diff --git a/br/tests/lightning_record_network/data/db.test.1.sql b/br/tests/lightning_record_network/data/db.test.1.sql new file mode 100644 index 0000000000000..3748d5fa91e80 --- /dev/null +++ b/br/tests/lightning_record_network/data/db.test.1.sql @@ -0,0 +1,21 @@ +insert into db.test values +(1,1,1), +(2,1,1), +(3,1,1), +(4,1,1), +(5,1,1), +(6,1,1), +(7,1,1), +(8,1,1), +(9,1,1), +(10,1,1), +(11,1,1), +(12,1,1), +(13,1,1), +(14,1,1), +(15,1,1), +(16,1,1), +(17,1,1), +(18,1,1), +(19,1,1), +(20,1,1); \ No newline at end of file diff --git a/br/tests/lightning_record_network/run.sh b/br/tests/lightning_record_network/run.sh new file mode 100644 index 0000000000000..e31f9d151d76a --- /dev/null +++ b/br/tests/lightning_record_network/run.sh @@ -0,0 +1,22 @@ +#!/bin/sh +# +# Copyright 2022 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euE + +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/SetIOTotalBytes=return(1)" +run_lightning + +grep 'IOTotal' "$TEST_DIR/lightning.log" | grep -v 'IOTotalBytes=0' diff --git a/util/BUILD.bazel b/util/BUILD.bazel index b63581d187f66..f0a137cb20431 100644 --- a/util/BUILD.bazel +++ b/util/BUILD.bazel @@ -46,6 +46,7 @@ go_library( "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_client_v3//concurrency", "@org_golang_google_grpc//:grpc", + "@org_uber_go_atomic//:atomic", "@org_uber_go_zap//:zap", ], ) diff --git a/util/util.go b/util/util.go index 5edb010ce6c52..0e862345f23bd 100644 --- a/util/util.go +++ b/util/util.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "io/ioutil" + "net" "net/http" "strconv" "strings" @@ -26,6 +27,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/parser" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -202,3 +204,30 @@ func FmtNonASCIIPrintableCharToHex(str string) string { } return b.String() } + +// TCPConnWithIOCounter is a wrapper of net.TCPConn with counter that accumulates +// the bytes this connection reads/writes. +type TCPConnWithIOCounter struct { + *net.TCPConn + c *atomic.Uint64 +} + +// NewTCPConnWithIOCounter creates a new TCPConnWithIOCounter. +func NewTCPConnWithIOCounter(conn *net.TCPConn, c *atomic.Uint64) net.Conn { + return &TCPConnWithIOCounter{ + TCPConn: conn, + c: c, + } +} + +func (t *TCPConnWithIOCounter) Read(b []byte) (n int, err error) { + n, err = t.TCPConn.Read(b) + t.c.Add(uint64(n)) + return n, err +} + +func (t *TCPConnWithIOCounter) Write(b []byte) (n int, err error) { + n, err = t.TCPConn.Write(b) + t.c.Add(uint64(n)) + return n, err +}