Skip to content

Commit

Permalink
variable: add txn_source into kv.context (#39159)
Browse files Browse the repository at this point in the history
ref #38587
  • Loading branch information
xiongjiwei authored Nov 25, 2022
1 parent 05d616c commit 5775995
Show file tree
Hide file tree
Showing 14 changed files with 51 additions and 14 deletions.
8 changes: 4 additions & 4 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2915,8 +2915,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:HyWSOT/drBEtfXK2HLkWWR8dCO+rcf7OiRDRhBxAfU4=",
version = "v0.0.0-20221114102356-3debb6820e46",
sum = "h1:Ywk7n+4zm6W6T9XSyAwihBWdxXR2ALQzswQMEOglHkM=",
version = "v0.0.0-20221117075110-51120697d051",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -3519,8 +3519,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:nFVdyTXcQYZwQQCdSJcFI1vBFyzG1hVuZ39MAK6wqK4=",
version = "v2.0.3-0.20221108030801-9c0835c80eba",
sum = "h1:5df3qAcxvdGAffe0aBVFYhwQwAvl3VrF/xSX+J8ueyI=",
version = "v2.0.3-0.20221121025013-e9db9e6a8a94",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge
return resp, nil
}

func (f *fakeStore) SubscribeFlushEvent(ctx context.Context, in *logbackup.SubscribeFlushEventRequest, opts ...grpc.CallOption) (logbackup.LogBackup_SubscribeFlushEventClient, error) {
return nil, nil
}

// RegionScan gets a list of regions, starts from the region that contains key.
// Limit limits the maximum number of regions returned.
func (f *fakeCluster) RegionScan(ctx context.Context, key []byte, endKey []byte, limit int) ([]streamhelper.RegionWithLeader, error) {
Expand Down
1 change: 0 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,6 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
// Instead, we merge all the jobs into one pending job.
return appendToSubJobs(mci, job)
}

// Get a global job ID and put the DDL job in the queue.
setDDLJobQuery(ctx, job)
task := &limitJobTask{job, make(chan error)}
Expand Down
10 changes: 7 additions & 3 deletions domain/globalconfigsync/globalconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,23 @@ func TestStoreGlobalConfig(t *testing.T) {

_, err = se.Execute(context.Background(), "set @@global.tidb_enable_top_sql=1;")
require.NoError(t, err)
_, err = se.Execute(context.Background(), "set @@global.tidb_source_id=2;")
require.NoError(t, err)
for i := 0; i < 20; i++ {
time.Sleep(100 * time.Millisecond)
client :=
store.(kv.StorageWithPD).GetPDClient()
// enable top sql will be translated to enable_resource_metering
items, err := client.LoadGlobalConfig(context.Background(), []string{"enable_resource_metering"})
items, err := client.LoadGlobalConfig(context.Background(), []string{"enable_resource_metering", "source_id"})
require.NoError(t, err)
if len(items) == 1 && items[0].Value == "" {
if len(items) == 2 && items[0].Value == "" {
continue
}
require.Len(t, items, 1)
require.Len(t, items, 2)
require.Equal(t, items[0].Name, "/global/config/enable_resource_metering")
require.Equal(t, items[0].Value, "true")
require.Equal(t, items[1].Name, "/global/config/source_id")
require.Equal(t, items[1].Value, "2")
return
}
require.Fail(t, "timeout for waiting global config synced")
Expand Down
9 changes: 9 additions & 0 deletions executor/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,15 @@ func TestSetVar(t *testing.T) {
tk.MustQuery("SELECT @@GLOBAL.validate_password.length").Check(testkit.Rows("4"))
tk.MustExec("SET GLOBAL validate_password.mixed_case_count = 2")
tk.MustQuery("SELECT @@GLOBAL.validate_password.length").Check(testkit.Rows("6"))

// test tidb_cdc_write_source
require.Equal(t, uint64(0), tk.Session().GetSessionVars().CDCWriteSource)
tk.MustQuery("select @@tidb_cdc_write_source").Check(testkit.Rows("0"))
tk.MustExec("set @@session.tidb_cdc_write_source = 2")
tk.MustQuery("select @@tidb_cdc_write_source").Check(testkit.Rows("2"))
require.Equal(t, uint64(2), tk.Session().GetSessionVars().CDCWriteSource)
tk.MustExec("set @@session.tidb_cdc_write_source = 0")
require.Equal(t, uint64(0), tk.Session().GetSessionVars().CDCWriteSource)
}

func TestGetSetNoopVars(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46
github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051
github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c
github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
Expand All @@ -86,7 +86,7 @@ require (
github.com/stretchr/testify v1.8.0
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba
github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -778,8 +778,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46 h1:HyWSOT/drBEtfXK2HLkWWR8dCO+rcf7OiRDRhBxAfU4=
github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051 h1:Ywk7n+4zm6W6T9XSyAwihBWdxXR2ALQzswQMEOglHkM=
github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
Expand Down Expand Up @@ -928,8 +928,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba h1:nFVdyTXcQYZwQQCdSJcFI1vBFyzG1hVuZ39MAK6wqK4=
github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba/go.mod h1:X9s4ct/MLk1sFqe5mU79KClKegLFDTa/FCx3hzexGtk=
github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94 h1:5df3qAcxvdGAffe0aBVFYhwQwAvl3VrF/xSX+J8ueyI=
github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94/go.mod h1:mQQhAIZ2uJwWXOG2UEz9s9oLGRcNKGGGtDOk4b13Bos=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
2 changes: 2 additions & 0 deletions kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ const (
ReplicaReadAdjuster
// ScanBatchSize set the iter scan batch size.
ScanBatchSize
// TxnSource set the source of this transaction.
TxnSource
)

// ReplicaReadType is the type of replica to read data from
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ func (s *session) doCommit(ctx context.Context) error {
if tables := sessVars.TxnCtx.TemporaryTables; len(tables) > 0 {
s.txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables))
}
s.txn.SetOption(kv.TxnSource, sessVars.CDCWriteSource)
if tables := sessVars.TxnCtx.CachedTables; len(tables) > 0 {
c := cachedTableRenewLease{tables: tables}
now := time.Now()
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,10 @@ type SessionVars struct {

// MetricSchemaStep indicates the step when query metric schema.
MetricSchemaStep int64

// CDCWriteSource indicates the following data is written by TiCDC if it is not 0.
CDCWriteSource uint64

// MetricSchemaRangeDuration indicates the step when query metric schema.
MetricSchemaRangeDuration int64

Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ var defaultSysVars = []*SysVar{
s.MetricSchemaStep = TidbOptInt64(val, DefTiDBMetricSchemaStep)
return nil
}},
{Scope: ScopeSession, Name: TiDBCDCWriteSource, Value: "0", Type: TypeInt, MinValue: 0, MaxValue: 15, SetSession: func(s *SessionVars, val string) error {
s.CDCWriteSource = uint64(TidbOptInt(val, 0))
return nil
}},
{Scope: ScopeSession, Name: TiDBMetricSchemaRangeDuration, Value: strconv.Itoa(DefTiDBMetricSchemaRangeDuration), skipInit: true, Type: TypeUnsigned, MinValue: 10, MaxValue: 60 * 60 * 60, SetSession: func(s *SessionVars, val string) error {
s.MetricSchemaRangeDuration = TidbOptInt64(val, DefTiDBMetricSchemaRangeDuration)
return nil
Expand Down Expand Up @@ -776,6 +780,7 @@ var defaultSysVars = []*SysVar{
// TopSQL enable only be controlled by TopSQL pub/sub sinker.
// This global variable only uses to update the global config which store in PD(ETCD).
{Scope: ScopeGlobal, Name: TiDBEnableTopSQL, Value: BoolToOnOff(topsqlstate.DefTiDBTopSQLEnable), Type: TypeBool, AllowEmpty: true, GlobalConfigName: GlobalConfigEnableTopSQL},
{Scope: ScopeGlobal, Name: TiDBSourceID, Value: "1", Type: TypeInt, MinValue: 1, MaxValue: 15, GlobalConfigName: GlobalConfigSourceID},
{Scope: ScopeGlobal, Name: TiDBTopSQLMaxTimeSeriesCount, Value: strconv.Itoa(topsqlstate.DefTiDBTopSQLMaxTimeSeriesCount), Type: TypeInt, MinValue: 1, MaxValue: 5000, GetGlobal: func(_ context.Context, s *SessionVars) (string, error) {
return strconv.FormatInt(topsqlstate.GlobalState.MaxStatementCount.Load(), 10), nil
}, SetGlobal: func(_ context.Context, vars *SessionVars, s string) error {
Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,9 @@ const (
// TiDBMetricSchemaStep indicates the step when query metric schema.
TiDBMetricSchemaStep = "tidb_metric_query_step"

// TiDBCDCWriteSource indicates the following data is written by TiCDC if it is not 0.
TiDBCDCWriteSource = "tidb_cdc_write_source"

// TiDBMetricSchemaRangeDuration indicates the range duration when query metric schema.
TiDBMetricSchemaRangeDuration = "tidb_metric_query_range_duration"

Expand Down Expand Up @@ -627,6 +630,9 @@ const (
// TiDBEnableTopSQL indicates whether the top SQL is enabled.
TiDBEnableTopSQL = "tidb_enable_top_sql"

// TiDBSourceID indicates the source ID of the TiDB server.
TiDBSourceID = "tidb_source_id"

// TiDBTopSQLMaxTimeSeriesCount indicates the max number of statements been collected in each time series.
TiDBTopSQLMaxTimeSeriesCount = "tidb_top_sql_max_time_series_count"

Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/variable.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
// Global config name list.
const (
GlobalConfigEnableTopSQL = "enable_resource_metering"
GlobalConfigSourceID = "source_id"
)

func (s ScopeFlag) String() string {
Expand Down
2 changes: 2 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn.KVTxn.SetRequestSourceType(val.(string))
case kv.ReplicaReadAdjuster:
txn.KVTxn.GetSnapshot().SetReplicaReadAdjuster(val.(txnkv.ReplicaReadAdjuster))
case kv.TxnSource:
txn.KVTxn.SetTxnSource(val.(uint64))
}
}

Expand Down

0 comments on commit 5775995

Please sign in to comment.