Skip to content

Commit

Permalink
Implement new index type that also includes mutltihash code
Browse files Browse the repository at this point in the history
Implement a new CARv2 index that contains enough information to
reconstruct the multihashes of the data payload, since `CarIndexSorted`
only includes multihash digests. Note, this index intentionally ignores
any given record with `multihash.IDENTITY` CID hash.

Add a test that asserts offsets for the same CID across sorted index and
new multihash sorted index are consistent.

Add tests that assert marshal unmarshalling of the new index type is as
expected, and it does not load records with `multihash.IDENTITY` digest.

Note, there is a need for a multicodec to be defined for the new index
type. For now TODOs are left since it requires coordination across
repos.

Relates to:
- multiformats/multicodec#227

Fixes:
- #214
  • Loading branch information
masih committed Sep 2, 2021
1 parent 1bac13d commit ac2b95a
Show file tree
Hide file tree
Showing 4 changed files with 389 additions and 15 deletions.
3 changes: 3 additions & 0 deletions v2/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ func New(codec multicodec.Code) (Index, error) {
switch codec {
case multicodec.CarIndexSorted:
return newSorted(), nil
// TODO replace with proper multicodec once defined.
case CarMultihashIndexSorted:
return newMultihashSorted(), nil
default:
return nil, fmt.Errorf("unknwon index codec: %v", codec)
}
Expand Down
174 changes: 174 additions & 0 deletions v2/index/mhindexsorted.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package index

import (
"encoding/binary"
"io"
"sort"

"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
)

// TODO replace with propper multicodec once defined.
const CarMultihashIndexSorted = multicodec.Code(0x0401)

// multiWidthCodedIndex maps multihash code (i.e. hashing algorithm) to singleWidthIndex.
// This index type is implemented with the underlying assumption that all digests generated by the
// same multihash code are of the same length.
// This index ignores any Record with multihash.IDENTITY.
type multiWidthCodedIndex map[uint64]*singleWidthCodedIndex

type singleWidthCodedIndex struct {
singleWidthIndex
code uint64
}

func (m *singleWidthCodedIndex) Marshal(w io.Writer) error {
if err := binary.Write(w, binary.LittleEndian, m.code); err != nil {
return err
}
return m.singleWidthIndex.Marshal(w)
}

func (m *singleWidthCodedIndex) Unmarshal(r io.Reader) error {
if err := binary.Read(r, binary.LittleEndian, &m.code); err != nil {
return err
}
return m.singleWidthIndex.Unmarshal(r)
}

func (m *multiWidthCodedIndex) Codec() multicodec.Code {
// TODO introduce this codec to mutlicodec table once finalized.
return CarMultihashIndexSorted
}

func (m *multiWidthCodedIndex) Marshal(w io.Writer) error {
if err := binary.Write(w, binary.LittleEndian, int32(len(*m))); err != nil {
return err
}
// The codes are unique, but ranging over a map isn't deterministic.
// As per the CARv2 spec, we must order buckets by digest length.
// TODO update CARv2 spec to reflect this for the new index type.
codes := m.sortedKeys()

for _, code := range codes {
swci := (*m)[code]
if err := swci.Marshal(w); err != nil {
return err
}
}
return nil
}

func (m *multiWidthCodedIndex) sortedKeys() []uint64 {
codes := make([]uint64, 0, len(*m))
for code := range *m {
codes = append(codes, code)
}
sort.Slice(codes, func(i, j int) bool {
return codes[i] < codes[j]
})
return codes
}

func (m *multiWidthCodedIndex) Unmarshal(r io.Reader) error {
var l int32
if err := binary.Read(r, binary.LittleEndian, &l); err != nil {
return err
}
for i := 0; i < int(l); i++ {
swci := &singleWidthCodedIndex{}
if err := swci.Unmarshal(r); err != nil {
return err
}
m.put(swci)
}
return nil
}

func (m *multiWidthCodedIndex) put(swci *singleWidthCodedIndex) {
(*m)[swci.code] = swci
}

func (m *multiWidthCodedIndex) Load(records []Record) error {
// Split cids on their digest length
byCode := make(map[uint64][]digestRecord)
for _, item := range records {
dmh, err := multihash.Decode(item.Hash())
if err != nil {
return err
}

code := dmh.Code

// Ignore IDENTITY multihashes in the index.
if code == multihash.IDENTITY {
continue
}
digest := dmh.Digest
swi, ok := byCode[code]
if !ok {
swi = make([]digestRecord, 0)
byCode[code] = swi
}

byCode[code] = append(swi, digestRecord{digest, item.Offset})
}

// Sort each list. then write to compact form.
for code, lst := range byCode {
sort.Sort(recordSet(lst))

// None of the lists can possibly be empty at this point; so we grab the first one
width := len(lst[0].digest)

// TODO: refactor compaction as a receiver on singleWidthIndex
swci := newSingleWidthCodedIndex(width, lst, code)
m.put(swci)
}
return nil
}

func newSingleWidthCodedIndex(width int, lst []digestRecord, code uint64) *singleWidthCodedIndex {
// TODO refactor duplicate compaction code in singleWidthIndex type
rcrdWdth := width + 8
compact := make([]byte, rcrdWdth*len(lst))
for off, itm := range lst {
itm.write(compact[off*rcrdWdth : (off+1)*rcrdWdth])
}
swci := &singleWidthCodedIndex{
singleWidthIndex: singleWidthIndex{
width: uint32(rcrdWdth),
len: uint64(len(lst)),
index: compact,
},
code: code,
}
return swci
}

func (m *multiWidthCodedIndex) GetAll(cid cid.Cid, f func(uint64) bool) error {
hash := cid.Hash()
dmh, err := multihash.Decode(hash)
if err != nil {
return err
}
swci, err := m.get(dmh)
if err != nil {
return err
}
return swci.getAll(dmh.Digest, f)
}

func (m *multiWidthCodedIndex) get(dmh *multihash.DecodedMultihash) (*singleWidthCodedIndex, error) {
if codedIdx, ok := (*m)[dmh.Code]; ok {
return codedIdx, nil
}
return nil, ErrNotFound
}

func newMultihashSorted() Index {
index := make(multiWidthCodedIndex)
return &index
}
99 changes: 99 additions & 0 deletions v2/index/mhindexsorted_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package index_test

import (
"bytes"
"fmt"
"math/rand"
"testing"

"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2/index"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
)

func TestMultiWidthCodedIndex_LoadDoesNotLoadIdentityMultihash(t *testing.T) {
rng := rand.New(rand.NewSource(1413))
identityRecords := generateIndexRecords(t, multihash.IDENTITY, rng)
nonIdentityRecords := generateIndexRecords(t, multihash.SHA2_256, rng)
records := append(identityRecords, nonIdentityRecords...)

subject, err := index.New(index.CarMultihashIndexSorted)
require.NoError(t, err)
err = subject.Load(records)
require.NoError(t, err)

// Assert index does not contain any records with IDENTITY multihash code.
for _, r := range identityRecords {
wantCid := r.Cid
err = subject.GetAll(wantCid, func(o uint64) bool {
require.Fail(t, "subject should not contain any records with IDENTITY multihash code")
return false
})
require.Equal(t, index.ErrNotFound, err)
}

// Assert however, index does contain the non IDENTITY records.
requireContainsAll(t, subject, nonIdentityRecords)
}

func TestMultiWidthCodedIndex_MarshalUnmarshal(t *testing.T) {
rng := rand.New(rand.NewSource(1413))
records := generateIndexRecords(t, multihash.SHA2_256, rng)

// Create a new mh sorted index and load randomly generated records into it.
subject, err := index.New(index.CarMultihashIndexSorted)
require.NoError(t, err)
err = subject.Load(records)
require.NoError(t, err)

// Marshal the index.
buf := new(bytes.Buffer)
err = subject.Marshal(buf)
require.NoError(t, err)

// Unmarshal it back to another instance of mh sorted index.
umSubject, err := index.New(index.CarMultihashIndexSorted)
require.NoError(t, err)
err = umSubject.Unmarshal(buf)
require.NoError(t, err)

// Assert original records are present in both index instances with expected offset.
requireContainsAll(t, subject, records)
requireContainsAll(t, umSubject, records)
}

func generateIndexRecords(t *testing.T, hasherCode uint64, rng *rand.Rand) []index.Record {
var records []index.Record
recordCount := rng.Intn(99) + 1 // Up to 100 records
for i := 0; i < recordCount; i++ {
records = append(records, index.Record{
Cid: generateCidV1(t, hasherCode, rng),
Offset: rng.Uint64(),
})
}
return records
}

func generateCidV1(t *testing.T, hasherCode uint64, rng *rand.Rand) cid.Cid {
data := []byte(fmt.Sprintf("🌊d-%d", rng.Uint64()))
mh, err := multihash.Sum(data, hasherCode, -1)
require.NoError(t, err)
return cid.NewCidV1(cid.Raw, mh)
}

func requireContainsAll(t *testing.T, subject index.Index, nonIdentityRecords []index.Record) {
for _, r := range nonIdentityRecords {
wantCid := r.Cid
wantOffset := r.Offset

var gotOffsets []uint64
err := subject.GetAll(wantCid, func(o uint64) bool {
gotOffsets = append(gotOffsets, o)
return false
})
require.NoError(t, err)
require.Equal(t, 1, len(gotOffsets))
require.Equal(t, wantOffset, gotOffsets[0])
}
}
Loading

0 comments on commit ac2b95a

Please sign in to comment.