Skip to content

Commit

Permalink
union streams and cursors (#10900)
Browse files Browse the repository at this point in the history
closes #10764

Co-authored-by: Ilya Miheev <[email protected]>
  • Loading branch information
JkLondon and Ilya Miheev authored Jun 25, 2024
1 parent 12bfd43 commit 60943b2
Showing 1 changed file with 24 additions and 33 deletions.
57 changes: 24 additions & 33 deletions erigon-lib/kv/mdbx/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,11 +830,8 @@ type MdbxTx struct {
readOnly bool
ctx context.Context

cursors map[uint64]*mdbx.Cursor
cursorID uint64

streams map[int]kv.Closer
streamID int
toCloseMap map[uint64]kv.Closer
ID uint64
}

type MdbxCursor struct {
Expand Down Expand Up @@ -1165,18 +1162,12 @@ func (tx *MdbxTx) PrintDebugInfo() {
}

func (tx *MdbxTx) closeCursors() {
for _, c := range tx.cursors {
if c != nil {
c.Close()
}
}
tx.cursors = nil
for _, c := range tx.streams {
for _, c := range tx.toCloseMap {
if c != nil {
c.Close()
}
}
tx.streams = nil
tx.toCloseMap = nil
tx.statelessCursors = nil
}

Expand Down Expand Up @@ -1339,8 +1330,8 @@ func (tx *MdbxTx) Cursor(bucket string) (kv.Cursor, error) {

func (tx *MdbxTx) stdCursor(bucket string) (kv.RwCursor, error) {
b := tx.db.buckets[bucket]
c := &MdbxCursor{bucketName: bucket, tx: tx, bucketCfg: b, dbi: mdbx.DBI(tx.db.buckets[bucket].DBI), id: tx.cursorID}
tx.cursorID++
c := &MdbxCursor{bucketName: bucket, tx: tx, bucketCfg: b, dbi: mdbx.DBI(tx.db.buckets[bucket].DBI), id: tx.ID}
tx.ID++

var err error
c.c, err = tx.tx.OpenCursor(c.dbi)
Expand All @@ -1349,10 +1340,10 @@ func (tx *MdbxTx) stdCursor(bucket string) (kv.RwCursor, error) {
}

// add to auto-cleanup on end of transactions
if tx.cursors == nil {
tx.cursors = map[uint64]*mdbx.Cursor{}
if tx.toCloseMap == nil {
tx.toCloseMap = make(map[uint64]kv.Closer)
}
tx.cursors[c.id] = c.c
tx.toCloseMap[c.id] = c.c
return c, nil
}

Expand Down Expand Up @@ -1761,7 +1752,7 @@ func (c *MdbxCursor) Append(k []byte, v []byte) error {
func (c *MdbxCursor) Close() {
if c.c != nil {
c.c.Close()
delete(c.tx.cursors, c.id)
delete(c.tx.toCloseMap, c.id)
c.c = nil
}
}
Expand Down Expand Up @@ -1985,7 +1976,7 @@ func (tx *MdbxTx) RangeDescend(table string, fromPrefix, toPrefix []byte, limit

type cursor2iter struct {
c kv.Cursor
id int
id uint64
tx *MdbxTx

fromPrefix, toPrefix, nextK, nextV []byte
Expand All @@ -1995,12 +1986,12 @@ type cursor2iter struct {
}

func (tx *MdbxTx) rangeOrderLimit(table string, fromPrefix, toPrefix []byte, orderAscend order.By, limit int) (*cursor2iter, error) {
s := &cursor2iter{ctx: tx.ctx, tx: tx, fromPrefix: fromPrefix, toPrefix: toPrefix, orderAscend: orderAscend, limit: int64(limit), id: tx.streamID}
tx.streamID++
if tx.streams == nil {
tx.streams = map[int]kv.Closer{}
s := &cursor2iter{ctx: tx.ctx, tx: tx, fromPrefix: fromPrefix, toPrefix: toPrefix, orderAscend: orderAscend, limit: int64(limit), id: tx.ID}
tx.ID++
if tx.toCloseMap == nil {
tx.toCloseMap = make(map[uint64]kv.Closer)
}
tx.streams[s.id] = s
tx.toCloseMap[s.id] = s
if err := s.init(table, tx); err != nil {
s.Close() //it's responsibility of constructor (our) to close resource on error
return nil, err
Expand Down Expand Up @@ -2109,7 +2100,7 @@ func (s *cursor2iter) Close() {
}
if s.c != nil {
s.c.Close()
delete(s.tx.streams, s.id)
delete(s.tx.toCloseMap, s.id)
s.c = nil
}
}
Expand Down Expand Up @@ -2146,12 +2137,12 @@ func (s *cursor2iter) Next() (k, v []byte, err error) {
}

func (tx *MdbxTx) RangeDupSort(table string, key []byte, fromPrefix, toPrefix []byte, asc order.By, limit int) (iter.KV, error) {
s := &cursorDup2iter{ctx: tx.ctx, tx: tx, key: key, fromPrefix: fromPrefix, toPrefix: toPrefix, orderAscend: bool(asc), limit: int64(limit), id: tx.streamID}
tx.streamID++
if tx.streams == nil {
tx.streams = map[int]kv.Closer{}
s := &cursorDup2iter{ctx: tx.ctx, tx: tx, key: key, fromPrefix: fromPrefix, toPrefix: toPrefix, orderAscend: bool(asc), limit: int64(limit), id: tx.ID}
tx.ID++
if tx.toCloseMap == nil {
tx.toCloseMap = make(map[uint64]kv.Closer)
}
tx.streams[s.id] = s
tx.toCloseMap[s.id] = s
if err := s.init(table, tx); err != nil {
s.Close() //it's responsibility of constructor (our) to close resource on error
return nil, err
Expand All @@ -2161,7 +2152,7 @@ func (tx *MdbxTx) RangeDupSort(table string, key []byte, fromPrefix, toPrefix []

type cursorDup2iter struct {
c kv.CursorDupSort
id int
id uint64
tx *MdbxTx

key []byte
Expand Down Expand Up @@ -2272,7 +2263,7 @@ func (s *cursorDup2iter) Close() {
}
if s.c != nil {
s.c.Close()
delete(s.tx.streams, s.id)
delete(s.tx.toCloseMap, s.id)
s.c = nil
}
}
Expand Down

0 comments on commit 60943b2

Please sign in to comment.