Skip to content

Commit

Permalink
refactor: Merkle clock heads cleanup (#918)
Browse files Browse the repository at this point in the history
* Remove commented out code

* Remove dead code (Len)

* Remove unnessecary var declaration

* Remove unessecary func param

* Remove extra Add func

Add is a poor name, and adds an extra layer of misdirection.

* Remove extra delete func

Is only called from here, and the caller to this function knows that this row exists, so we will never actually hit the isNotFound error allowing it to be safely dropped.

* Rename func params

Old names unhelpful and the distinction was easily missed resulting in a bug (caught by tests).

* Unfactor IsHead

Is only called once, and the unfactoring allows for future refactorings

* Remove unused return param

* Use Has instead of Get+IsError

* Remove private constructor

* Remove incorrect error

Error would have been way up the callstack where ever height was declared, and it is on those funcs to make sure it fits whatever constraints they may have.

* Remove unhelpful comment

Is also potentially misleading as the cid's location in the key is dependent on the index format, not the code in this file/package

* Remove cid-based shortcut

This optimization should be handled by the index, not the scan code.  It also produces undesirable errors which have been corrected in this commit.
  • Loading branch information
AndrewSisley authored Oct 27, 2022
1 parent 3b53193 commit dbc8cd0
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 250 deletions.
8 changes: 4 additions & 4 deletions merkle/clock/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewMerkleClock(
return &MerkleClock{
headstore: headstore,
dagstore: dagstore,
headset: newHeadset(headstore, namespace),
headset: NewHeadSet(headstore, namespace),
crdt: crdt,
}
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func (mc *MerkleClock) ProcessNode(
}
if !hasHeads { // reached the bottom, at a leaf
log.Debug(ctx, "No heads found")
err := mc.headset.Add(ctx, root, rootPrio)
err := mc.headset.Write(ctx, root, rootPrio)
if err != nil {
return nil, errors.Wrap(fmt.Sprintf("error adding head (when reached the bottom) %s ", root), err)
}
Expand All @@ -165,7 +165,7 @@ func (mc *MerkleClock) ProcessNode(
for _, l := range links {
child := l.Cid
log.Debug(ctx, "Scanning for replacement heads", logging.NewKV("Child", child))
isHead, _, err := mc.headset.IsHead(ctx, child)
isHead, err := mc.headset.IsHead(ctx, child)
if err != nil {
return nil, errors.Wrap(fmt.Sprintf("error checking if %s is head ", child), err)
}
Expand All @@ -190,7 +190,7 @@ func (mc *MerkleClock) ProcessNode(
// we reached a non-head node in the known tree.
// This means our root block is a new head
log.Debug(ctx, "Adding head")
err := mc.headset.Add(ctx, root, rootPrio)
err := mc.headset.Write(ctx, root, rootPrio)
if err != nil {
log.ErrorE(
ctx,
Expand Down
12 changes: 0 additions & 12 deletions merkle/clock/clock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,6 @@ func TestMerkleClockAddDAGNodeWithHeads(t *testing.T) {
)
}

// check if lww state is correct (val is test2)
// check if head/blockstore state is correct (one head, two blocks)
nHeads, err := clk.headset.Len(ctx)
if err != nil {
t.Error("Error getting MerkleClock heads size:", err)
return
}
if nHeads != 1 {
t.Errorf("Incorrect number of heads of current clock state, have %v, want %v", nHeads, 1)
return
}

numBlocks := 0
cids, err := clk.dagstore.AllKeysChan(ctx)
if err != nil {
Expand Down
80 changes: 10 additions & 70 deletions merkle/clock/heads.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"sort"

cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"

"github.com/sourcenetwork/defradb/core"
Expand All @@ -33,109 +32,50 @@ type heads struct {
}

func NewHeadSet(store datastore.DSReaderWriter, namespace core.HeadStoreKey) *heads {
return newHeadset(store, namespace)
}

func newHeadset(store datastore.DSReaderWriter, namespace core.HeadStoreKey) *heads {
return &heads{
store: store,
namespace: namespace,
}
}

func (hh *heads) key(c cid.Cid) core.HeadStoreKey {
// /<namespace>/<cid>
return hh.namespace.WithCid(c)
}

func (hh *heads) load(ctx context.Context, c cid.Cid) (uint64, error) {
v, err := hh.store.Get(ctx, hh.key(c).ToDS())
if err != nil {
return 0, err
}
height, n := binary.Uvarint(v)
if n <= 0 {
return 0, errors.New("error decoding height")
}
return height, nil
}

func (hh *heads) write(ctx context.Context, store ds.Write, c cid.Cid, height uint64) error {
func (hh *heads) Write(ctx context.Context, c cid.Cid, height uint64) error {
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(buf, height)
if n == 0 {
return errors.New("error encoding height")
}
return store.Put(ctx, hh.key(c).ToDS(), buf[0:n])
}

func (hh *heads) delete(ctx context.Context, store ds.Write, c cid.Cid) error {
err := store.Delete(ctx, hh.key(c).ToDS())
if errors.Is(err, ds.ErrNotFound) {
return nil
}
return err
return hh.store.Put(ctx, hh.key(c).ToDS(), buf[0:n])
}

// IsHead returns if a given cid is among the current heads.
func (hh *heads) IsHead(ctx context.Context, c cid.Cid) (bool, uint64, error) {
height, err := hh.load(ctx, c)
if errors.Is(err, ds.ErrNotFound) {
return false, 0, nil
}
return err == nil, height, err
}

func (hh *heads) Len(ctx context.Context) (int, error) {
list, _, err := hh.List(ctx)
return len(list), err
func (hh *heads) IsHead(ctx context.Context, c cid.Cid) (bool, error) {
return hh.store.Has(ctx, hh.key(c).ToDS())
}

// Replace replaces a head with a new CID.
func (hh *heads) Replace(ctx context.Context, h, c cid.Cid, height uint64) error {
func (hh *heads) Replace(ctx context.Context, old cid.Cid, new cid.Cid, height uint64) error {
log.Info(
ctx,
"Replacing DAG head",
logging.NewKV("Old", h),
logging.NewKV("CID", c),
logging.NewKV("Old", old),
logging.NewKV("CID", new),
logging.NewKV("Height", height))
var store ds.Write = hh.store
var err error

// batchingDs, batching := store.(ds.Batching)
// if batching {
// store, err = batchingDs.Batch()
// if err != nil {
// return err
// }
// }

err = hh.delete(ctx, store, h)

err := hh.store.Delete(ctx, hh.key(old).ToDS())
if err != nil {
return err
}

err = hh.write(ctx, store, c, height)
err = hh.Write(ctx, new, height)
if err != nil {
return err
}

// if batching {
// err := store.(ds.Batch).Commit()
// if err != nil {
// return err
// }
// }
return nil
}

func (hh *heads) Add(ctx context.Context, c cid.Cid, height uint64) error {
log.Debug(ctx, "Adding new DAG head",
logging.NewKV("CID", c),
logging.NewKV("Height", height))
return hh.write(ctx, hh.store, c, height)
}

// List returns the list of current heads plus the max height.
// @todo Document Heads.List function
func (hh *heads) List(ctx context.Context) ([]cid.Cid, uint64, error) {
Expand Down
148 changes: 6 additions & 142 deletions merkle/clock/heads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@ import (
"testing"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
mh "github.com/multiformats/go-multihash"

"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/errors"
)

func newRandomCID() cid.Cid {
Expand All @@ -52,7 +50,7 @@ func newRandomCID() cid.Cid {
func newHeadSet() *heads {
s := newDS()

return newHeadset(
return NewHeadSet(
datastore.AsDSReaderWriter(s),
core.HeadStoreKey{}.WithDocKey("mydockey").WithFieldId("1"),
)
Expand All @@ -62,125 +60,13 @@ func TestHeadsWrite(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
c := newRandomCID()
err := heads.write(ctx, heads.store, c, uint64(1))
err := heads.Write(ctx, c, uint64(1))
if err != nil {
t.Error("Failed to write to head set:", err)
return
}
}

func TestHeadsLoad(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
c := newRandomCID()
err := heads.write(ctx, heads.store, c, uint64(1))
if err != nil {
t.Error("Failed to write to head set:", err)
return
}

h, err := heads.load(ctx, c)
if err != nil {
t.Error("failed to load from head set:", err)
return
}

if h != uint64(1) {
t.Errorf("Incorrect value from head set load(), have %v, want %v", h, uint64(1))
return
}
}

func TestHeadsDelete(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
c := newRandomCID()
err := heads.write(ctx, heads.store, c, uint64(1))
if err != nil {
t.Error("Failed to write to head set:", err)
return
}

err = heads.delete(ctx, heads.store, c)
if err != nil {
t.Error("Failed to delete from head set:", err)
return
}

_, err = heads.load(ctx, c)
if !errors.Is(err, ds.ErrNotFound) {
t.Error("failed to delete from head set, value still set")
return
}
}

func TestHeadsIsHead(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
c := newRandomCID()
err := heads.write(ctx, heads.store, c, uint64(1))
if err != nil {
t.Error("Failed to write to head set:", err)
return
}

ishead, h, err := heads.IsHead(ctx, c)
if err != nil {
t.Error("Failedd to check isHead:", err)
return
}

if ishead == false {
t.Error("Expected isHead to return true, instead false")
return
}

if h != uint64(1) {
t.Errorf("Incorrect height value from isHead, have %v, want %v", h, uint64(1))
return
}
}

func TestHeadsLen(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
c := newRandomCID()
err := heads.write(ctx, heads.store, c, uint64(1))
if err != nil {
t.Error("Failed to write to head set:", err)
return
}

l, err := heads.Len(ctx)
if err != nil {
t.Error("Failed to get head set length:", err)
return
}

if l != 1 {
t.Errorf("Incorrect length for head set, have %v, want %v", l, 1)
return
}

c = newRandomCID()
err = heads.write(ctx, heads.store, c, uint64(1))
if err != nil {
t.Error("Failed to write to head set:", err)
return
}

l, err = heads.Len(ctx)
if err != nil {
t.Error("Failed to get head set length (second call):", err)
return
}

if l != 2 {
t.Errorf("Incorrect length for head set, have %v, want %v", l, 2)
return
}
}

func TestHeadsReplaceEmpty(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
Expand All @@ -191,24 +77,13 @@ func TestHeadsReplaceEmpty(t *testing.T) {
t.Error("Failed to Replace items in head set:", err)
return
}

h, err := heads.load(ctx, c2)
if err != nil {
t.Error("Failed to load items in head set:", err)
return
}

if h != uint64(3) {
t.Errorf("Invalid value for replaced head element, have %v, want %v", h, uint64(3))
return
}
}

func TestHeadsReplaceNonEmpty(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
c1 := newRandomCID()
err := heads.write(ctx, heads.store, c1, uint64(1))
err := heads.Write(ctx, c1, uint64(1))
if err != nil {
t.Error("Failed to write to head set:", err)
return
Expand All @@ -220,17 +95,6 @@ func TestHeadsReplaceNonEmpty(t *testing.T) {
t.Error("Failed to Replace items in head set:", err)
return
}

h, err := heads.load(ctx, c2)
if err != nil {
t.Error("Failed to load items in head set:", err)
return
}

if h != uint64(3) {
t.Errorf("Invalid value for replaced head element, have %v, want %v", h, uint64(3))
return
}
}

// this test is largely unneeded from a functional point of view
Expand All @@ -241,7 +105,7 @@ func TestHeadsAdd(t *testing.T) {
ctx := context.Background()
heads := newHeadSet()
c1 := newRandomCID()
err := heads.Add(ctx, c1, uint64(1))
err := heads.Write(ctx, c1, uint64(1))
if err != nil {
t.Error("Failed to Add element to head set:", err)
return
Expand All @@ -253,8 +117,8 @@ func TestHeaddsList(t *testing.T) {
heads := newHeadSet()
c1 := newRandomCID()
c2 := newRandomCID()
heads.Add(ctx, c1, uint64(1))
heads.Add(ctx, c2, uint64(2))
heads.Write(ctx, c1, uint64(1))
heads.Write(ctx, c2, uint64(2))

list, h, err := heads.List(ctx)
if err != nil {
Expand Down
Loading

0 comments on commit dbc8cd0

Please sign in to comment.