Skip to content

Commit

Permalink
idx: "less false positives" feature (erigontech#9506)
Browse files Browse the repository at this point in the history
for erigontech#9486
it will apply only to new .idx files. 
it doesn't require user to re-gen .idx files (backward compatible), but
if you need: delete `snapshots/*.idx` and run `erigon snapshots index`
  • Loading branch information
AskAlexSharov authored and mriccobene committed Mar 13, 2024
1 parent ed67295 commit 878b76d
Show file tree
Hide file tree
Showing 18 changed files with 214 additions and 59 deletions.
5 changes: 4 additions & 1 deletion cmd/hack/hack.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,7 +1301,10 @@ func iterate(filename string, prefix string) error {
txNum, _ := efIt.Next()
var txKey [8]byte
binary.BigEndian.PutUint64(txKey[:], txNum)
offset := r.Lookup2(txKey[:], key)
offset, ok := r.Lookup2(txKey[:], key)
if !ok {
continue
}
gv.Reset(offset)
v, _ := gv.Next(nil)
fmt.Printf(" %d", txNum)
Expand Down
54 changes: 48 additions & 6 deletions erigon-lib/recsplit/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,30 @@ type Features byte

const (
No Features = 0b0
// Enums - Whether to build two level index with perfect hash table pointing to enumeration and enumeration pointing to offsets

// Enums - To build 2-lvl index with perfect hash table pointing to enumeration and enumeration pointing to offsets
Enums Features = 0b1
//LessFalsePositives Features = 0b10 // example of adding new feature
// LessFalsePositives - Reduce false-positives to 1/256=0.4% at the cost of 1 byte per key
// Implementation:
// PerfectHashMap - produces false-positives if an unknown key is requested. But "false-positives themselves" are not the problem.
// The problem is the "nature of false-positives" - they are randomly scattered across .seg files.
// It makes .seg files "warm" - which is bad because they are big and
// data-locality of touches is bad (and it may be necessary to visit a lot of shards to find a key).
// We can add a built-in "existence filter" (like bloom/cuckoo/ribbon/xor-filter/fuse-filter): it will improve
// data-locality - filters are small enough and existence-checks will be co-located on disk.
// But there are additional properties we have in our data:
// "keys are known", "keys are hashed" (.idx works on murmur3), ".idx can calc key-number by key".
// It means: if we rely on these properties then we can do better than a general-purpose existence filter.
// It seems that just an "array of 1-st bytes of key-hashes" is a great alternative:
// general-purpose-filter: 9bits/key, 0.3% false-positives, 3 mem access
// first-bytes-array: 8bits/key, 1/256=0.4% false-positives, 1 mem access
//
// See also: https://github.com/ledgerwatch/erigon/issues/9486
LessFalsePositives Features = 0b10 //
)

// SupportedFeatures - if a feature not from this list is seen (likely after a downgrade), return IncompatibleErr and recommend that the user manually delete the file
var SupportedFeatures = []Features{Enums}
var SupportedFeatures = []Features{Enums, LessFalsePositives}
var IncompatibleErr = errors.New("incompatible. can re-build such files by command 'erigon snapshots index'")

// Index implements index lookup from the file created by the RecSplit
Expand Down Expand Up @@ -78,6 +95,9 @@ type Index struct {
primaryAggrBound uint16 // The lower bound for primary key aggregation (computed from leafSize)
enums bool

lessFalsePositives bool
existence []byte

readers *sync.Pool
}

Expand Down Expand Up @@ -153,11 +173,22 @@ func OpenIndex(indexFilePath string) (*Index, error) {
}

idx.enums = features&Enums != No
idx.lessFalsePositives = features&LessFalsePositives != No
offset++
if idx.enums && idx.keyCount > 0 {
var size int
idx.offsetEf, size = eliasfano32.ReadEliasFano(idx.data[offset:])
offset += size

if idx.lessFalsePositives {
arrSz := binary.BigEndian.Uint64(idx.data[offset:])
offset += 8
if arrSz != idx.keyCount {
return nil, fmt.Errorf("%w. size of existence filter %d != keys count %d", IncompatibleErr, arrSz, idx.keyCount)
}
idx.existence = idx.data[offset : offset+int(arrSz)]
offset += int(arrSz)
}
}
// Size of golomb rice params
golombParamSize := binary.BigEndian.Uint16(idx.data[offset:])
Expand Down Expand Up @@ -248,13 +279,13 @@ func (idx *Index) KeyCount() uint64 {
}

// Lookup is not thread-safe because it used id.hasher
func (idx *Index) Lookup(bucketHash, fingerprint uint64) uint64 {
func (idx *Index) Lookup(bucketHash, fingerprint uint64) (uint64, bool) {
if idx.keyCount == 0 {
_, fName := filepath.Split(idx.filePath)
panic("no Lookup should be done when keyCount==0, please use Empty function to guard " + fName)
}
if idx.keyCount == 1 {
return 0
return 0, true
}
var gr GolombRiceReader
gr.data = idx.grData
Expand Down Expand Up @@ -311,7 +342,11 @@ func (idx *Index) Lookup(bucketHash, fingerprint uint64) uint64 {
rec := int(cumKeys) + int(remap16(remix(fingerprint+idx.startSeed[level]+b), m))
pos := 1 + 8 + idx.bytesPerRec*(rec+1)

return binary.BigEndian.Uint64(idx.data[pos:]) & idx.recMask
found := binary.BigEndian.Uint64(idx.data[pos:]) & idx.recMask
if idx.lessFalsePositives {
return found, idx.existence[found] == byte(bucketHash)
}
return found, true
}

// OrdinalLookup returns the offset of i-th element in the index
Expand All @@ -321,6 +356,13 @@ func (idx *Index) OrdinalLookup(i uint64) uint64 {
return idx.offsetEf.Get(i)
}

// Has reports whether slot i of the existence filter holds the low byte of bucketHash.
// Indexes built without the LessFalsePositives feature carry no filter, so every
// query is answered with true (i.e. "possibly present").
// i is used directly as an index into idx.existence and is not bounds-checked here.
func (idx *Index) Has(bucketHash, i uint64) bool {
	if !idx.lessFalsePositives {
		return true
	}
	expected := byte(bucketHash)
	return idx.existence[i] == expected
}

func (idx *Index) ExtractOffsets() map[uint64]uint64 {
m := map[uint64]uint64{}
pos := 1 + 8 + idx.bytesPerRec
Expand Down
8 changes: 4 additions & 4 deletions erigon-lib/recsplit/index_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,20 @@ func (r *IndexReader) sum2(key1, key2 []byte) (uint64, uint64) {
}

// Lookup wraps index Lookup
func (r *IndexReader) Lookup(key []byte) uint64 {
func (r *IndexReader) Lookup(key []byte) (uint64, bool) {
bucketHash, fingerprint := r.sum(key)
if r.index != nil {
return r.index.Lookup(bucketHash, fingerprint)
}
return 0
return 0, true
}

func (r *IndexReader) Lookup2(key1, key2 []byte) uint64 {
func (r *IndexReader) Lookup2(key1, key2 []byte) (uint64, bool) {
bucketHash, fingerprint := r.sum2(key1, key2)
if r.index != nil {
return r.index.Lookup(bucketHash, fingerprint)
}
return 0
return 0, true
}

func (r *IndexReader) Empty() bool {
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/recsplit/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestReWriteIndex(t *testing.T) {
defer reidx.Close()
for i := 0; i < 100; i++ {
reader := NewIndexReader(reidx)
offset := reader.Lookup([]byte(fmt.Sprintf("key %d", i)))
offset, _ := reader.Lookup([]byte(fmt.Sprintf("key %d", i)))
if offset != uint64(i*3965) {
t.Errorf("expected offset: %d, looked up: %d", i*3965, offset)
}
Expand Down
72 changes: 64 additions & 8 deletions erigon-lib/recsplit/recsplit.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,15 @@ func remix(z uint64) uint64 {
type RecSplit struct {
hasher murmur3.Hash128 // Salted hash function to use for splitting into initial buckets and mapping to 64-bit fingerprints
offsetCollector *etl.Collector // Collector that sorts by offsets

indexW *bufio.Writer
indexF *os.File
offsetEf *eliasfano32.EliasFano // Elias Fano instance for encoding the offsets
bucketCollector *etl.Collector // Collector that sorts by buckets

existenceF *os.File
existenceW *bufio.Writer

indexFileName string
indexFile, tmpFilePath string

Expand Down Expand Up @@ -108,6 +112,7 @@ type RecSplit struct {
numBuf [8]byte
collision bool
enums bool // Whether to build two level index with perfect hash table pointing to enumeration and enumeration pointing to offsets
lessFalsePositives bool
built bool // Flag indicating that the hash function has been built and no more keys can be added
trace bool
logger log.Logger
Expand All @@ -119,7 +124,8 @@ type RecSplitArgs struct {
// Whether two level index needs to be built, where perfect hash map points to an enumeration, and enumeration points to offsets
// if Enum=false: can have unsorted and duplicated values
// if Enum=true: must have sorted values (can have duplicates) - monotonically growing sequence
Enums bool
Enums bool
LessFalsePositives bool

IndexFile string // File name where the index and the minimal perfect hash function will be written to
TmpDir string
Expand Down Expand Up @@ -174,6 +180,15 @@ func NewRecSplit(args RecSplitArgs, logger log.Logger) (*RecSplit, error) {
rs.offsetCollector = etl.NewCollector(RecSplitLogPrefix+" "+fname, rs.tmpDir, etl.NewSortableBuffer(rs.etlBufLimit), logger)
rs.offsetCollector.LogLvl(log.LvlDebug)
}
rs.lessFalsePositives = args.LessFalsePositives
if rs.enums && args.KeyCount > 0 && rs.lessFalsePositives {
bufferFile, err := os.CreateTemp(rs.tmpDir, "erigon-lfp-buf-")
if err != nil {
return nil, err
}
rs.existenceF = bufferFile
rs.existenceW = bufio.NewWriter(rs.existenceF)
}
rs.currentBucket = make([]uint64, 0, args.BucketSize)
rs.currentBucketOffs = make([]uint64, 0, args.BucketSize)
rs.maxOffset = 0
Expand All @@ -198,6 +213,9 @@ func (rs *RecSplit) Close() {
if rs.indexF != nil {
rs.indexF.Close()
}
if rs.existenceF != nil {
rs.existenceF.Close()
}
if rs.bucketCollector != nil {
rs.bucketCollector.Close()
}
Expand All @@ -214,8 +232,8 @@ func (rs *RecSplit) SetTrace(trace bool) {

// remap converts the number x which is assumed to be uniformly distributed over the range [0..2^64) to the number that is uniformly
// distributed over the range [0..n)
func remap(x uint64, n uint64) uint64 {
hi, _ := bits.Mul64(x, n)
func remap(x uint64, n uint64) (hi uint64) {
hi, _ = bits.Mul64(x, n)
return hi
}

Expand Down Expand Up @@ -264,6 +282,8 @@ func splitParams(m, leafSize, primaryAggrBound, secondaryAggrBound uint16) (fano
return
}

var golombBaseLog2 = -math.Log((math.Sqrt(5) + 1.0) / 2.0)

func computeGolombRice(m uint16, table []uint32, leafSize, primaryAggrBound, secondaryAggrBound uint16) {
fanout, unit := splitParams(m, leafSize, primaryAggrBound, secondaryAggrBound)
k := make([]uint16, fanout)
Expand All @@ -277,7 +297,7 @@ func computeGolombRice(m uint16, table []uint32, leafSize, primaryAggrBound, sec
sqrtProd *= math.Sqrt(float64(k[i]))
}
p := math.Sqrt(float64(m)) / (math.Pow(2*math.Pi, (float64(fanout)-1.)/2.0) * sqrtProd)
golombRiceLength := uint32(math.Ceil(math.Log2(-math.Log((math.Sqrt(5)+1.0)/2.0) / math.Log1p(-p)))) // log2 Golomb modulus
golombRiceLength := uint32(math.Ceil(math.Log2(golombBaseLog2 / math.Log1p(-p)))) // log2 Golomb modulus
if golombRiceLength > 0x1F {
panic("golombRiceLength > 0x1F")
}
Expand All @@ -303,8 +323,7 @@ func computeGolombRice(m uint16, table []uint32, leafSize, primaryAggrBound, sec
// salt for the part of the hash function separating m elements. It is based on
// calculations with assumptions that we draw hash functions at random
func (rs *RecSplit) golombParam(m uint16) int {
s := uint16(len(rs.golombRice))
for m >= s {
for s := uint16(len(rs.golombRice)); m >= s; s++ {
rs.golombRice = append(rs.golombRice, 0)
// For the case where bucket is larger than planned
if s == 0 {
Expand All @@ -314,7 +333,6 @@ func (rs *RecSplit) golombParam(m uint16) int {
} else {
computeGolombRice(s, rs.golombRice, rs.leafSize, rs.primaryAggrBound, rs.secondaryAggrBound)
}
s++
}
return int(rs.golombRice[m] >> 27)
}
Expand Down Expand Up @@ -350,6 +368,12 @@ func (rs *RecSplit) AddKey(key []byte, offset uint64) error {
if err := rs.bucketCollector.Collect(rs.bucketKeyBuf[:], rs.numBuf[:]); err != nil {
return err
}
if rs.lessFalsePositives {
//1 byte from each hashed key
if err := rs.existenceW.WriteByte(byte(hi)); err != nil {
return err
}
}
} else {
if err := rs.bucketCollector.Collect(rs.bucketKeyBuf[:], rs.numBuf[:]); err != nil {
return err
Expand Down Expand Up @@ -561,7 +585,7 @@ func (rs *RecSplit) Build(ctx context.Context) error {
return fmt.Errorf("create index file %s: %w", rs.indexFile, err)
}

rs.logger.Debug("[index] created", "file", rs.tmpFilePath, "fs", rs.indexF)
rs.logger.Debug("[index] created", "file", rs.tmpFilePath)

defer rs.indexF.Close()
rs.indexW = bufio.NewWriterSize(rs.indexF, etl.BufIOSize)
Expand Down Expand Up @@ -652,6 +676,9 @@ func (rs *RecSplit) Build(ctx context.Context) error {
var features Features
if rs.enums {
features |= Enums
if rs.lessFalsePositives {
features |= LessFalsePositives
}
}
if err := rs.indexW.WriteByte(byte(features)); err != nil {
return fmt.Errorf("writing enums = true: %w", err)
Expand All @@ -662,6 +689,10 @@ func (rs *RecSplit) Build(ctx context.Context) error {
return fmt.Errorf("writing elias fano for offsets: %w", err)
}
}
if err := rs.flushExistenceFilter(); err != nil {
return err
}

// Write out the size of golomb rice params
binary.BigEndian.PutUint16(rs.numBuf[:], uint16(len(rs.golombRice)))
if _, err := rs.indexW.Write(rs.numBuf[:4]); err != nil {
Expand Down Expand Up @@ -694,6 +725,31 @@ func (rs *RecSplit) Build(ctx context.Context) error {
return nil
}

// flushExistenceFilter appends the existence filter (LessFalsePositives feature) to the index being written.
// Layout written to rs.indexW: an 8-byte big-endian entry count (rs.keysAdded) followed by
// that many bytes copied from the temporary buffer file that AddKey filled.
// It is a no-op unless enums and lessFalsePositives are enabled and at least one key was added.
// The temporary buffer file is closed and deleted before returning, whether or not the copy succeeded.
func (rs *RecSplit) flushExistenceFilter() error {
	if !rs.enums || rs.keysAdded == 0 || !rs.lessFalsePositives {
		return nil
	}
	defer func() {
		// Best-effort cleanup: the buffer is scratch space created by os.CreateTemp, which never
		// deletes files on its own. Errors are ignored because the data is already copied
		// (or the build has already failed) by the time this runs.
		_ = rs.existenceF.Close()
		_ = os.Remove(rs.existenceF.Name())
	}()

	// Write len of array
	binary.BigEndian.PutUint64(rs.numBuf[:], rs.keysAdded)
	if _, err := rs.indexW.Write(rs.numBuf[:]); err != nil {
		return fmt.Errorf("writing existence filter size: %w", err)
	}

	// Flush bufio and rewind before copying; no reason to fsync the file - it is temporary.
	if err := rs.existenceW.Flush(); err != nil {
		return fmt.Errorf("flushing existence filter buffer: %w", err)
	}
	if _, err := rs.existenceF.Seek(0, io.SeekStart); err != nil {
		return fmt.Errorf("rewinding existence filter buffer: %w", err)
	}
	// CopyN fails (io.EOF) if the buffer holds fewer than keysAdded bytes, so a short filter is never written silently.
	if _, err := io.CopyN(rs.indexW, rs.existenceF, int64(rs.keysAdded)); err != nil {
		return fmt.Errorf("copying existence filter into index: %w", err)
	}
	return nil
}

func (rs *RecSplit) DisableFsync() { rs.noFsync = true }

// Fsync - other processes/goroutines must see only "fully-complete" (valid) files. No partial-writes.
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/recsplit/recsplit_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func FuzzRecSplit(f *testing.F) {
bits := make([]uint64, bitCount)
reader := NewIndexReader(idx)
for i = 0; i < len(in)-l; i += l {
off = reader.Lookup(in[i : i+l])
off, _ = reader.Lookup(in[i : i+l])
if int(off) >= count {
t.Errorf("off %d >= count %d", off, count)
}
Expand Down
20 changes: 11 additions & 9 deletions erigon-lib/recsplit/recsplit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestIndexLookup(t *testing.T) {
defer idx.Close()
for i := 0; i < 100; i++ {
reader := NewIndexReader(idx)
offset := reader.Lookup([]byte(fmt.Sprintf("key %d", i)))
offset, _ := reader.Lookup([]byte(fmt.Sprintf("key %d", i)))
if offset != uint64(i*17) {
t.Errorf("expected offset: %d, looked up: %d", i*17, offset)
}
Expand All @@ -138,14 +138,16 @@ func TestTwoLayerIndex(t *testing.T) {
logger := log.New()
tmpDir := t.TempDir()
indexFile := filepath.Join(tmpDir, "index")
salt := uint32(1)
rs, err := NewRecSplit(RecSplitArgs{
KeyCount: 100,
BucketSize: 10,
Salt: 0,
TmpDir: tmpDir,
IndexFile: indexFile,
LeafSize: 8,
Enums: true,
KeyCount: 100,
BucketSize: 10,
Salt: salt,
TmpDir: tmpDir,
IndexFile: indexFile,
LeafSize: 8,
Enums: true,
LessFalsePositives: true,
}, logger)
if err != nil {
t.Fatal(err)
Expand All @@ -163,7 +165,7 @@ func TestTwoLayerIndex(t *testing.T) {
defer idx.Close()
for i := 0; i < 100; i++ {
reader := NewIndexReader(idx)
e := reader.Lookup([]byte(fmt.Sprintf("key %d", i)))
e, _ := reader.Lookup([]byte(fmt.Sprintf("key %d", i)))
if e != uint64(i) {
t.Errorf("expected enumeration: %d, lookup up: %d", i, e)
}
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/state/aggregator_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func Benchmark_Recsplit_Find_ExternalFile(b *testing.B) {
for i := 0; i < b.N; i++ {
p := rnd.Intn(len(keys))

offset := idxr.Lookup(keys[p])
offset, _ := idxr.Lookup(keys[p])
getter.Reset(offset)

require.True(b, getter.HasNext())
Expand Down
Loading

0 comments on commit 878b76d

Please sign in to comment.