Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: parallel read inner table and build hash table. #7544

Merged
merged 12 commits into from
Sep 4, 2018
Merged
63 changes: 46 additions & 17 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,17 +259,37 @@ func (e *HashJoinExec) wait4Inner() (finished bool, err error) {

// fetchInnerRows fetches all rows from inner executor,
// and append them to e.innerResult.
func (e *HashJoinExec) fetchInnerRows(ctx context.Context) (err error) {
func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh chan struct{}) {
defer func() {
close(chkCh)
if r := recover(); r != nil {
buf := make([]byte, 4096)
stackSize := runtime.Stack(buf, false)
buf = buf[:stackSize]
log.Errorf("hash join inner fetcher panic stack is:\n%s", buf)
}
}()
e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.maxChunkSize)
e.innerResult.GetMemTracker().AttachTo(e.memTracker)
e.innerResult.GetMemTracker().SetLabel("innerResult")
var err error
for {
chk := e.children[e.innerIdx].newChunk()
err = e.innerExec.Next(ctx, chk)
if err != nil || chk.NumRows() == 0 {
return errors.Trace(err)
select {
case <-doneCh:
return
default:
chk := e.children[e.innerIdx].newChunk()
err = e.innerExec.Next(ctx, chk)
if err != nil {
e.innerFinished <- errors.Trace(err)
return
}
if chk.NumRows() == 0 {
return
}
chkCh <- chk
e.innerResult.Add(chk)
}
e.innerResult.Add(chk)
}
}

Expand Down Expand Up @@ -495,6 +515,8 @@ func (e *HashJoinExec) Next(ctx context.Context, chk *chunk.Chunk) (err error) {
}

func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) {
chkCh := make(chan *chunk.Chunk, e.concurrency)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. move these two definitions to after the deferred func.
  2. add a comment for chkCh. (s/ chkCh/ innerResultCh ?)

doneCh := make(chan struct{})
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 4096)
Expand All @@ -506,21 +528,25 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) {
close(e.innerFinished)
}()

if err := e.fetchInnerRows(ctx); err != nil {
e.innerFinished <- errors.Trace(err)
return
}
go e.fetchInnerRows(ctx, chkCh, doneCh)

if err := e.buildHashTableForList(); err != nil {
// TODO: Parallel build hash table. Currently not support because `mvmap` is not thread-safe.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this TODO can be removed? @XuHuaiyu what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's fine either to keep it or to remove it.

err := e.buildHashTableForList(chkCh)
if err != nil {
e.innerFinished <- errors.Trace(err)
return
close(doneCh)
// Func fetchInnerRows may blocked by this channel, so read from the channel to unblock it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. remove Func
  2. s/ blocked/ be blocked

select {
case <-chkCh:
default:
}
}
}

// buildHashTableForList builds hash table from `list`.
// key of hash table: hash value of key columns
// value of hash table: RowPtr of the corresponded row
func (e *HashJoinExec) buildHashTableForList() error {
func (e *HashJoinExec) buildHashTableForList(chkCh chan *chunk.Chunk) error {
e.hashTable = mvmap.NewMVMap()
e.innerKeyColIdx = make([]int, len(e.innerKeys))
for i := range e.innerKeys {
Expand All @@ -532,20 +558,23 @@ func (e *HashJoinExec) buildHashTableForList() error {
keyBuf = make([]byte, 0, 64)
valBuf = make([]byte, 8)
)
for i := 0; i < e.innerResult.NumChunks(); i++ {
chk := e.innerResult.GetChunk(i)
for j := 0; j < chk.NumRows(); j++ {

numChks := 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/ numChks/ chkIdx := uint32(0)

for chk := range chkCh {
numRows := chk.NumRows()
for j := 0; j < numRows; j++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

j < chk.NumRows()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// BenchmarkFor1 times an inner loop whose condition calls chk.NumRows() on
// every iteration, i.e. the form suggested in review (`j < chk.NumRows()`).
// getChks is defined outside this snippet; the accompanying note says the
// chunk it returns has 1024 rows.
func BenchmarkFor1(b *testing.B) {
	chk := getChks()
	for i := 0; i < b.N; i++ {
		// NumRows() is re-evaluated for each value of j.
		for j := 0; j < chk.NumRows(); j++ {
			// Empty body: only the loop overhead itself is being measured.
			_ = j
		}
	}
}
// BenchmarkFor2 times the same inner loop as BenchmarkFor1, except that
// chk.NumRows() is called once per outer iteration and cached in numRows,
// which is the form used in the pull request.
// getChks is defined outside this snippet; the accompanying note says the
// chunk it returns has 1024 rows.
func BenchmarkFor2(b *testing.B) {
	chk := getChks()
	for i := 0; i < b.N; i++ {
		// Hoist the row count out of the loop condition.
		numRows := chk.NumRows()
		for j := 0; j < numRows; j++ {
			// Empty body: only the loop overhead itself is being measured.
			_ = j
		}
	}
}

chk.NumRows is 1024.

BenchmarkFor1    5000000              1115 ns/op
BenchmarkFor1    5000000              1119 ns/op
BenchmarkFor1    5000000              1106 ns/op
BenchmarkFor2   10000000               557 ns/op
BenchmarkFor2   10000000               577 ns/op
BenchmarkFor2   10000000               587 ns/op

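So I think the original (caching `chk.NumRows()` in `numRows` before the loop) is better: it is roughly twice as fast in this benchmark.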

hasNull, keyBuf, err = e.getJoinKeyFromChkRow(false, chk.GetRow(j), keyBuf)
if err != nil {
return errors.Trace(err)
}
if hasNull {
continue
}
rowPtr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)}
rowPtr := chunk.RowPtr{ChkIdx: uint32(numChks), RowIdx: uint32(j)}
*(*chunk.RowPtr)(unsafe.Pointer(&valBuf[0])) = rowPtr
e.hashTable.Put(keyBuf, valBuf)
}
numChks++
}
return nil
}
Expand Down