Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Merkle clock heads cleanup #918

Merged
merged 14 commits into from
Oct 27, 2022
8 changes: 4 additions & 4 deletions merkle/clock/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewMerkleClock(
return &MerkleClock{
headstore: headstore,
dagstore: dagstore,
headset: newHeadset(headstore, namespace),
headset: NewHeadSet(headstore, namespace),
crdt: crdt,
}
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func (mc *MerkleClock) ProcessNode(
}
if !hasHeads { // reached the bottom, at a leaf
log.Debug(ctx, "No heads found")
err := mc.headset.Add(ctx, root, rootPrio)
err := mc.headset.Write(ctx, root, rootPrio)
if err != nil {
return nil, errors.Wrap(fmt.Sprintf("error adding head (when reached the bottom) %s ", root), err)
}
Expand All @@ -165,7 +165,7 @@ func (mc *MerkleClock) ProcessNode(
for _, l := range links {
child := l.Cid
log.Debug(ctx, "Scanning for replacement heads", logging.NewKV("Child", child))
isHead, _, err := mc.headset.IsHead(ctx, child)
isHead, err := mc.headset.IsHead(ctx, child)
if err != nil {
return nil, errors.Wrap(fmt.Sprintf("error checking if %s is head ", child), err)
}
Expand All @@ -190,7 +190,7 @@ func (mc *MerkleClock) ProcessNode(
// we reached a non-head node in the known tree.
// This means our root block is a new head
log.Debug(ctx, "Adding head")
err := mc.headset.Add(ctx, root, rootPrio)
err := mc.headset.Write(ctx, root, rootPrio)
if err != nil {
log.ErrorE(
ctx,
Expand Down
12 changes: 0 additions & 12 deletions merkle/clock/clock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,6 @@ func TestMerkleClockAddDAGNodeWithHeads(t *testing.T) {
)
}

// check if lww state is correct (val is test2)
// check if head/blockstore state is correct (one head, two blocks)
nHeads, err := clk.headset.Len(ctx)
if err != nil {
t.Error("Error getting MerkleClock heads size:", err)
return
}
if nHeads != 1 {
t.Errorf("Incorrect number of heads of current clock state, have %v, want %v", nHeads, 1)
return
}

numBlocks := 0
cids, err := clk.dagstore.AllKeysChan(ctx)
if err != nil {
Expand Down
80 changes: 10 additions & 70 deletions merkle/clock/heads.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"sort"

cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"

"github.com/sourcenetwork/defradb/core"
Expand All @@ -33,109 +32,50 @@ type heads struct {
}

func NewHeadSet(store datastore.DSReaderWriter, namespace core.HeadStoreKey) *heads {
return newHeadset(store, namespace)
}

func newHeadset(store datastore.DSReaderWriter, namespace core.HeadStoreKey) *heads {
return &heads{
store: store,
namespace: namespace,
}
}

func (hh *heads) key(c cid.Cid) core.HeadStoreKey {
// /<namespace>/<cid>
return hh.namespace.WithCid(c)
}

func (hh *heads) load(ctx context.Context, c cid.Cid) (uint64, error) {
v, err := hh.store.Get(ctx, hh.key(c).ToDS())
if err != nil {
return 0, err
}
height, n := binary.Uvarint(v)
if n <= 0 {
return 0, errors.New("error decoding height")
}
return height, nil
}

func (hh *heads) write(ctx context.Context, store ds.Write, c cid.Cid, height uint64) error {
func (hh *heads) Write(ctx context.Context, c cid.Cid, height uint64) error {
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(buf, height)
if n == 0 {
return errors.New("error encoding height")
}
return store.Put(ctx, hh.key(c).ToDS(), buf[0:n])
}

func (hh *heads) delete(ctx context.Context, store ds.Write, c cid.Cid) error {
err := store.Delete(ctx, hh.key(c).ToDS())
if errors.Is(err, ds.ErrNotFound) {
return nil
}
return err
return hh.store.Put(ctx, hh.key(c).ToDS(), buf[0:n])
}

// IsHead returns if a given cid is among the current heads.
func (hh *heads) IsHead(ctx context.Context, c cid.Cid) (bool, uint64, error) {
height, err := hh.load(ctx, c)
if errors.Is(err, ds.ErrNotFound) {
return false, 0, nil
}
return err == nil, height, err
}

func (hh *heads) Len(ctx context.Context) (int, error) {
list, _, err := hh.List(ctx)
return len(list), err
func (hh *heads) IsHead(ctx context.Context, c cid.Cid) (bool, error) {
return hh.store.Has(ctx, hh.key(c).ToDS())
fredcarle marked this conversation as resolved.
Show resolved Hide resolved
}

// Replace replaces a head with a new CID.
func (hh *heads) Replace(ctx context.Context, h, c cid.Cid, height uint64) error {
func (hh *heads) Replace(ctx context.Context, old cid.Cid, new cid.Cid, height uint64) error {
fredcarle marked this conversation as resolved.
Show resolved Hide resolved
log.Info(
ctx,
"Replacing DAG head",
logging.NewKV("Old", h),
logging.NewKV("CID", c),
logging.NewKV("Old", old),
logging.NewKV("CID", new),
logging.NewKV("Height", height))
var store ds.Write = hh.store
var err error

// batchingDs, batching := store.(ds.Batching)
// if batching {
// store, err = batchingDs.Batch()
// if err != nil {
// return err
// }
// }

err = hh.delete(ctx, store, h)

err := hh.store.Delete(ctx, hh.key(old).ToDS())
if err != nil {
return err
}

err = hh.write(ctx, store, c, height)
err = hh.Write(ctx, new, height)
if err != nil {
return err
}

// if batching {
// err := store.(ds.Batch).Commit()
// if err != nil {
// return err
// }
// }
return nil
}

func (hh *heads) Add(ctx context.Context, c cid.Cid, height uint64) error {
log.Debug(ctx, "Adding new DAG head",
logging.NewKV("CID", c),
logging.NewKV("Height", height))
return hh.write(ctx, hh.store, c, height)
}

// List returns the list of current heads plus the max height.
// @todo Document Heads.List function
func (hh *heads) List(ctx context.Context) ([]cid.Cid, uint64, error) {
Expand Down
148 changes: 6 additions & 142 deletions merkle/clock/heads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@ import (
"testing"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
mh "github.com/multiformats/go-multihash"

"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/errors"
)

func newRandomCID() cid.Cid {
Expand All @@ -52,7 +50,7 @@ func newRandomCID() cid.Cid {
func newHeadSet() *heads {
s := newDS()

return newHeadset(
return NewHeadSet(
datastore.AsDSReaderWriter(s),
core.HeadStoreKey{}.WithDocKey("mydockey").WithFieldId("1"),
)
Expand All @@ -62,125 +60,13 @@ func TestHeadsWrite(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
c := newRandomCID()
err := heads.write(ctx, heads.store, c, uint64(1))
err := heads.Write(ctx, c, uint64(1))
if err != nil {
t.Error("Failed to write to head set:", err)
return
}
}

func TestHeadsLoad(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
c := newRandomCID()
err := heads.write(ctx, heads.store, c, uint64(1))
if err != nil {
t.Error("Failed to write to head set:", err)
return
}

h, err := heads.load(ctx, c)
if err != nil {
t.Error("failed to load from head set:", err)
return
}

if h != uint64(1) {
t.Errorf("Incorrect value from head set load(), have %v, want %v", h, uint64(1))
return
}
}

func TestHeadsDelete(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
c := newRandomCID()
err := heads.write(ctx, heads.store, c, uint64(1))
if err != nil {
t.Error("Failed to write to head set:", err)
return
}

err = heads.delete(ctx, heads.store, c)
if err != nil {
t.Error("Failed to delete from head set:", err)
return
}

_, err = heads.load(ctx, c)
if !errors.Is(err, ds.ErrNotFound) {
t.Error("failed to delete from head set, value still set")
return
}
}

func TestHeadsIsHead(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
c := newRandomCID()
err := heads.write(ctx, heads.store, c, uint64(1))
if err != nil {
t.Error("Failed to write to head set:", err)
return
}

ishead, h, err := heads.IsHead(ctx, c)
if err != nil {
t.Error("Failedd to check isHead:", err)
return
}

if ishead == false {
t.Error("Expected isHead to return true, instead false")
return
}

if h != uint64(1) {
t.Errorf("Incorrect height value from isHead, have %v, want %v", h, uint64(1))
return
}
}

func TestHeadsLen(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
c := newRandomCID()
err := heads.write(ctx, heads.store, c, uint64(1))
if err != nil {
t.Error("Failed to write to head set:", err)
return
}

l, err := heads.Len(ctx)
if err != nil {
t.Error("Failed to get head set length:", err)
return
}

if l != 1 {
t.Errorf("Incorrect length for head set, have %v, want %v", l, 1)
return
}

c = newRandomCID()
err = heads.write(ctx, heads.store, c, uint64(1))
if err != nil {
t.Error("Failed to write to head set:", err)
return
}

l, err = heads.Len(ctx)
if err != nil {
t.Error("Failed to get head set length (second call):", err)
return
}

if l != 2 {
t.Errorf("Incorrect length for head set, have %v, want %v", l, 2)
return
}
}

func TestHeadsReplaceEmpty(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
Expand All @@ -191,24 +77,13 @@ func TestHeadsReplaceEmpty(t *testing.T) {
t.Error("Failed to Replace items in head set:", err)
return
}

h, err := heads.load(ctx, c2)
if err != nil {
t.Error("Failed to load items in head set:", err)
return
}

if h != uint64(3) {
t.Errorf("Invalid value for replaced head element, have %v, want %v", h, uint64(3))
return
}
}

func TestHeadsReplaceNonEmpty(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
c1 := newRandomCID()
err := heads.write(ctx, heads.store, c1, uint64(1))
err := heads.Write(ctx, c1, uint64(1))
if err != nil {
t.Error("Failed to write to head set:", err)
return
Expand All @@ -220,17 +95,6 @@ func TestHeadsReplaceNonEmpty(t *testing.T) {
t.Error("Failed to Replace items in head set:", err)
return
}

h, err := heads.load(ctx, c2)
if err != nil {
t.Error("Failed to load items in head set:", err)
return
}

if h != uint64(3) {
t.Errorf("Invalid value for replaced head element, have %v, want %v", h, uint64(3))
return
}
}

// this test is largely unneeded from a functional point of view
Expand All @@ -241,7 +105,7 @@ func TestHeadsAdd(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
c1 := newRandomCID()
err := heads.Add(ctx, c1, uint64(1))
err := heads.Write(ctx, c1, uint64(1))
if err != nil {
t.Error("Failed to Add element to head set:", err)
return
Expand All @@ -253,8 +117,8 @@ func TestHeaddsList(t *testing.T) {
heads := newHeadSet()
c1 := newRandomCID()
c2 := newRandomCID()
heads.Add(ctx, c1, uint64(1))
heads.Add(ctx, c2, uint64(2))
heads.Write(ctx, c1, uint64(1))
heads.Write(ctx, c2, uint64(2))

list, h, err := heads.List(ctx)
if err != nil {
Expand Down
Loading