Skip to content

Commit

Permalink
prevent concurrent map access
Browse files Browse the repository at this point in the history
  • Loading branch information
tangenta committed Nov 25, 2022
1 parent 009058d commit 1399840
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type copReqSenderPool struct {

idxBufPool sync.Pool // []*indexRecord
srcChkPool sync.Pool // *chunk.Chunk
binding map[*[]*indexRecord]*chunk.Chunk
binding generic.SyncMap[*[]*indexRecord, *chunk.Chunk]
}

type copReqSender struct {
Expand Down Expand Up @@ -160,7 +160,7 @@ func newCopReqSenderPool(ctx context.Context, copCtx *copContext, startTS uint64
return chunk.NewChunkWithCapacity(copCtx.fieldTps, int(variable.GetDDLReorgBatchSize())*copReadBatchFactor)
},
},
binding: make(map[*[]*indexRecord]*chunk.Chunk),
binding: generic.NewSyncMap[*[]*indexRecord, *chunk.Chunk](4),
}
}

Expand Down Expand Up @@ -206,7 +206,7 @@ func (c *copReqSenderPool) getIndexRecordsAndChunks() ([]*indexRecord, *chunk.Ch
chk = chunk.NewChunkWithCapacity(c.copCtx.fieldTps, newCap)
}
chk.Reset()
c.binding[&ir] = chk
c.binding.Store(&ir, chk)
return ir[:0], chk
}

Expand All @@ -216,7 +216,9 @@ func (c *copReqSenderPool) recycleIdxRecords(idxRecs []*indexRecord) {
return
}
c.idxBufPool.Put(idxRecs[:0])
c.srcChkPool.Put(c.binding[&idxRecs])
if bindingChunk, ok := c.binding.Load(&idxRecs); ok {
c.srcChkPool.Put(bindingChunk)
}
}

// copContext contains the information that is needed when building a coprocessor request.
Expand Down

0 comments on commit 1399840

Please sign in to comment.