Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: parallel read inner table and build hash table. #7544

Merged
merged 12 commits into from
Sep 4, 2018
Merged
72 changes: 50 additions & 22 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,20 +263,40 @@ func (e *HashJoinExec) wait4Inner() (finished bool, err error) {

// fetchInnerRows fetches all rows from inner executor,
// and append them to e.innerResult.
func (e *HashJoinExec) fetchInnerRows(ctx context.Context) (err error) {
func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh chan struct{}) {
defer func() {
close(chkCh)
if r := recover(); r != nil {
buf := make([]byte, 4096)
stackSize := runtime.Stack(buf, false)
buf = buf[:stackSize]
log.Errorf("hash join inner fetcher panic stack is:\n%s", buf)
}
}()
e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.maxChunkSize)
e.innerResult.GetMemTracker().AttachTo(e.memTracker)
e.innerResult.GetMemTracker().SetLabel("innerResult")
var err error
for {
if e.finished.Load().(bool) {
return nil
}
chk := e.children[e.innerIdx].newChunk()
err = e.innerExec.Next(ctx, chk)
if err != nil || chk.NumRows() == 0 {
return errors.Trace(err)
select {
case <-doneCh:
return
default:
if e.finished.Load().(bool) {
return
}
chk := e.children[e.innerIdx].newChunk()
err = e.innerExec.Next(ctx, chk)
if err != nil {
e.innerFinished <- errors.Trace(err)
return
}
if chk.NumRows() == 0 {
return
}
chkCh <- chk
e.innerResult.Add(chk)
}
e.innerResult.Add(chk)
}
}

Expand Down Expand Up @@ -512,26 +532,31 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) {
}
close(e.innerFinished)
}()

if err := e.fetchInnerRows(ctx); err != nil {
e.innerFinished <- errors.Trace(err)
return
}
// innerResultCh transfer inner result chunk from inner fetch to build hash table.
innerResultCh := make(chan *chunk.Chunk, e.concurrency)
doneCh := make(chan struct{})
go e.fetchInnerRows(ctx, innerResultCh, doneCh)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about don't always fork new groutine in here and keep fetchInnerRows logic in "primary" goroutine, and base on fetch result to choose whether or not to fork new groutine to buildHashTableForList.(e.g. Next 2 times but still has data?)

IMHO, The idea is that maybe common small innser case it's too heavy to fork goroutine but some case fork is worth, and fetch with single goutine is required, so make choosen in buildTable?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@crazycs520 We can bench the scenario that the inner child for the hash join operator is a simple table reader and the number of output records of that child is very small, for example, 1.

Copy link
Contributor Author

@crazycs520 crazycs520 Sep 4, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lysu @zz-jason here is a benchmark:
select t1.* from t1 inner join tid1 where t1.id=tid1.id and tid1.id < 1;
table t1 and tid1 both have 10 rows. and the join result is 1 row;
master:

goos: linux
goarch: amd64
pkg: sql_script
BenchmarkSelect 	    5000	   3474578 ns/op
PASS
ok  	sql_script	17.852s

this branch

goos: linux
goarch: amd64
pkg: sql_script
BenchmarkSelect 	    5000	   3486767 ns/op
PASS
ok  	sql_script	17.904s


if e.finished.Load().(bool) {
return
}

if err := e.buildHashTableForList(); err != nil {
// TODO: Parallel build hash table. Currently not support because `mvmap` is not thread-safe.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this TODO can be removed? @XuHuaiyu how do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's both ok to keep it or remove it.

err := e.buildHashTableForList(innerResultCh)
if err != nil {
e.innerFinished <- errors.Trace(err)
return
close(doneCh)
// fetchInnerRows may be blocked by this channel, so read from the channel to unblock it.
select {
case <-innerResultCh:
default:
}
}
}

// buildHashTableForList builds hash table from `list`.
// key of hash table: hash value of key columns
// value of hash table: RowPtr of the corresponded row
func (e *HashJoinExec) buildHashTableForList() error {
func (e *HashJoinExec) buildHashTableForList(innerResultCh chan *chunk.Chunk) error {
e.hashTable = mvmap.NewMVMap()
e.innerKeyColIdx = make([]int, len(e.innerKeys))
for i := range e.innerKeys {
Expand All @@ -543,23 +568,26 @@ func (e *HashJoinExec) buildHashTableForList() error {
keyBuf = make([]byte, 0, 64)
valBuf = make([]byte, 8)
)
for i := 0; i < e.innerResult.NumChunks(); i++ {

chkIdx := uint32(0)
for chk := range innerResultCh {
if e.finished.Load().(bool) {
return nil
}
chk := e.innerResult.GetChunk(i)
for j := 0; j < chk.NumRows(); j++ {
numRows := chk.NumRows()
for j := 0; j < numRows; j++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

j < chk.NumRows()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func BenchmarkFor1(b *testing.B) {
	chk := getChks()
	for i := 0; i < b.N; i++ {
		for j := 0; j < chk.NumRows(); j++ {
			_ = j
		}
	}
}
func BenchmarkFor2(b *testing.B) {
	chk := getChks()
	for i := 0; i < b.N; i++ {
		numRows := chk.NumRows()
		for j := 0; j < numRows; j++ {
			_ = j
		}
	}
}

chk.NumRows is 1024.

BenchmarkFor1    5000000              1115 ns/op
BenchmarkFor1    5000000              1119 ns/op
BenchmarkFor1    5000000              1106 ns/op
BenchmarkFor2   10000000               557 ns/op
BenchmarkFor2   10000000               577 ns/op
BenchmarkFor2   10000000               587 ns/op

so, I think the original is better.

hasNull, keyBuf, err = e.getJoinKeyFromChkRow(false, chk.GetRow(j), keyBuf)
if err != nil {
return errors.Trace(err)
}
if hasNull {
continue
}
rowPtr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)}
rowPtr := chunk.RowPtr{ChkIdx: chkIdx, RowIdx: uint32(j)}
*(*chunk.RowPtr)(unsafe.Pointer(&valBuf[0])) = rowPtr
e.hashTable.Put(keyBuf, valBuf)
}
chkIdx++
}
return nil
}
Expand Down