Skip to content

Commit

Permalink
Merge branch 'master' into not_exists_anti
Browse files Browse the repository at this point in the history
  • Loading branch information
eurekaka authored Oct 9, 2018
2 parents 5f1a585 + 75d6a3e commit d1046f0
Show file tree
Hide file tree
Showing 14 changed files with 1,709 additions and 0 deletions.
9 changes: 9 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package executor

import (
"math"
"sync"
"sync/atomic"
"unsafe"
Expand Down Expand Up @@ -71,6 +72,12 @@ type HashJoinExec struct {
hashTableValBufs [][][]byte

memTracker *memory.Tracker // track memory usage.

// radixBits is the number of bits used for radix partitioning. The inner
// relation is split into 2^radixBits sub-relations before the hash tables
// are built. If the whole inner relation fits in the L2 cache, radixBits
// is 0 and the partition phase can be skipped.
radixBits int
}

// outerChkResource stores the result of the join outer fetch worker,
Expand Down Expand Up @@ -269,6 +276,7 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.C
return
}
if chk.NumRows() == 0 {
e.evalRadixBitNum()
return
}
chkCh <- chk
Expand All @@ -277,6 +285,23 @@ func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.C
}
}

// evalRadixBitNum evaluates the number of radix bits used to partition the
// inner relation and stores it in e.radixBits.
//
// It does nothing when the session variable EnableRadixJoin is off. Otherwise
// e.radixBits is set to the smallest non-negative n such that splitting the
// inner relation into 2^n partitions keeps every partition within the cache
// budget, assuming uniformly distributed input. A result of 0 means the
// partition phase can be skipped.
func (e *HashJoinExec) evalRadixBitNum() {
	sv := e.ctx.GetSessionVars()
	if !sv.EnableRadixJoin {
		return
	}

	// Memory consumed by the inner relation, as reported by its tracker.
	innerResultSize := float64(e.innerResult.GetMemTracker().BytesConsumed())
	// To ensure that one partition of inner relation, one hash table and
	// one partition of outer relation fit into the L2 cache, we suppose
	// every sub-partition of inner relation uses three quarters of the
	// L2 cache size.
	l2CacheSize := float64(sv.L2CacheSize) * 3 / 4

	// Default to "no partitioning"; only a strictly positive bit count
	// computed below overrides it.
	e.radixBits = 0

	// Guard the degenerate inputs: Log2 of zero, a negative ratio or a
	// division by zero yields -Inf/NaN/+Inf, and converting those to int is
	// implementation-defined.
	// NOTE(review): L2CacheSize is presumably non-positive when the CPU's L2
	// size could not be detected; confirm against the cpuid package.
	if innerResultSize <= 0 || l2CacheSize <= 0 {
		return
	}

	// Round up so that each of the 2^bits partitions does not exceed the
	// budget; truncating would leave partitions up to twice as large.
	// A ratio <= 1 (inner relation already fits) gives bits <= 0, which
	// keeps radixBits at 0 instead of going negative.
	if bits := math.Ceil(math.Log2(innerResultSize / l2CacheSize)); bits > 0 {
		e.radixBits = int(bits)
	}
}

func (e *HashJoinExec) initializeForProbe() {
// e.outerResultChs is for transmitting the chunks which store the data of outerExec,
// it'll be written by outer worker goroutine, and read by join workers.
Expand Down
12 changes: 12 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync/atomic"
"time"

"github.com/klauspost/cpuid"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
Expand Down Expand Up @@ -294,6 +295,13 @@ type SessionVars struct {
EnableStreaming bool

writeStmtBufs WriteStmtBufs

// L2CacheSize indicates the size of CPU L2 cache, using byte as unit.
L2CacheSize int

// EnableRadixJoin indicates whether to use radix hash join to execute
// HashJoin.
EnableRadixJoin bool
}

// NewSessionVars creates a session vars object.
Expand All @@ -315,6 +323,8 @@ func NewSessionVars() *SessionVars {
RetryLimit: DefTiDBRetryLimit,
DisableTxnAutoRetry: DefTiDBDisableTxnAutoRetry,
DDLReorgPriority: kv.PriorityLow,
EnableRadixJoin: false,
L2CacheSize: cpuid.CPU.Cache.L2,
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -578,6 +588,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.setDDLReorgPriority(val)
case TiDBForcePriority:
atomic.StoreInt32(&ForcePriority, int32(mysql.Str2Priority(val)))
case TiDBEnableRadixJoin:
s.EnableRadixJoin = TiDBOptOn(val)
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,7 @@ var defaultSysVars = []*SysVar{
{ScopeGlobal | ScopeSession, TiDBDDLReorgWorkerCount, strconv.Itoa(DefTiDBDDLReorgWorkerCount)},
{ScopeSession, TiDBDDLReorgPriority, "PRIORITY_LOW"},
{ScopeSession, TiDBForcePriority, mysql.Priority2Str[DefTiDBForcePriority]},
{ScopeSession, TiDBEnableRadixJoin, boolToIntStr(DefTiDBUseRadixJoin)},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
5 changes: 5 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ const (
// tidb_force_priority defines the operations priority of all statements.
// It can be "NO_PRIORITY", "LOW_PRIORITY", "HIGH_PRIORITY", "DELAYED"
TiDBForcePriority = "tidb_force_priority"

// tidb_enable_radix_join indicates whether to use the radix hash join
// algorithm to execute HashJoin.
TiDBEnableRadixJoin = "tidb_enable_radix_join"
)

// Default TiDB system variable values.
Expand Down Expand Up @@ -231,6 +235,7 @@ const (
DefTiDBHashAggPartialConcurrency = 4
DefTiDBHashAggFinalConcurrency = 4
DefTiDBForcePriority = mysql.NoPriority
DefTiDBUseRadixJoin = false
)

// Process global variables.
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (s *testVarsutilSuite) TestNewSessionVars(c *C) {
c.Assert(vars.MemQuotaIndexLookupReader, Equals, int64(DefTiDBMemQuotaIndexLookupReader))
c.Assert(vars.MemQuotaIndexLookupJoin, Equals, int64(DefTiDBMemQuotaIndexLookupJoin))
c.Assert(vars.MemQuotaNestedLoopApply, Equals, int64(DefTiDBMemQuotaNestedLoopApply))
c.Assert(vars.EnableRadixJoin, Equals, DefTiDBUseRadixJoin)

assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.Concurrency))
assertFieldsGreaterThanZero(c, reflect.ValueOf(vars.MemQuota))
Expand Down
22 changes: 22 additions & 0 deletions vendor/github.com/klauspost/cpuid/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d1046f0

Please sign in to comment.