diff --git a/merkle/clock/clock.go b/merkle/clock/clock.go index 31a2edec5e..aa07e79603 100644 --- a/merkle/clock/clock.go +++ b/merkle/clock/clock.go @@ -46,7 +46,7 @@ func NewMerkleClock( return &MerkleClock{ headstore: headstore, dagstore: dagstore, - headset: newHeadset(headstore, namespace), + headset: NewHeadSet(headstore, namespace), crdt: crdt, } } @@ -154,7 +154,7 @@ func (mc *MerkleClock) ProcessNode( } if !hasHeads { // reached the bottom, at a leaf log.Debug(ctx, "No heads found") - err := mc.headset.Add(ctx, root, rootPrio) + err := mc.headset.Write(ctx, root, rootPrio) if err != nil { return nil, errors.Wrap(fmt.Sprintf("error adding head (when reached the bottom) %s ", root), err) } @@ -165,7 +165,7 @@ func (mc *MerkleClock) ProcessNode( for _, l := range links { child := l.Cid log.Debug(ctx, "Scanning for replacement heads", logging.NewKV("Child", child)) - isHead, _, err := mc.headset.IsHead(ctx, child) + isHead, err := mc.headset.IsHead(ctx, child) if err != nil { return nil, errors.Wrap(fmt.Sprintf("error checking if %s is head ", child), err) } @@ -190,7 +190,7 @@ func (mc *MerkleClock) ProcessNode( // we reached a non-head node in the known tree. // This means our root block is a new head log.Debug(ctx, "Adding head") - err := mc.headset.Add(ctx, root, rootPrio) + err := mc.headset.Write(ctx, root, rootPrio) if err != nil { log.ErrorE( ctx, diff --git a/merkle/clock/clock_test.go b/merkle/clock/clock_test.go index 378961aa5d..8a22fc1ece 100644 --- a/merkle/clock/clock_test.go +++ b/merkle/clock/clock_test.go @@ -151,18 +151,6 @@ func TestMerkleClockAddDAGNodeWithHeads(t *testing.T) { ) } - // check if lww state is correct (val is test2) - // check if head/blockstore state is correct (one head, two blocks) - nHeads, err := clk.headset.Len(ctx) - if err != nil { - t.Error("Error getting MerkleClock heads size:", err) - return - } - if nHeads != 1 { - t.Errorf("Incorrect number of heads of current clock state, have %v, want %v", nHeads, 1) - return - } - numBlocks := 0 cids, err := clk.dagstore.AllKeysChan(ctx) if err != nil { diff --git a/merkle/clock/heads.go b/merkle/clock/heads.go index 7aec511231..afe388f9cf 100644 --- a/merkle/clock/heads.go +++ b/merkle/clock/heads.go @@ -17,7 +17,6 @@ import ( "sort" cid "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" "github.com/sourcenetwork/defradb/core" @@ -33,10 +32,6 @@ type heads struct { } func NewHeadSet(store datastore.DSReaderWriter, namespace core.HeadStoreKey) *heads { - return newHeadset(store, namespace) -} - -func newHeadset(store datastore.DSReaderWriter, namespace core.HeadStoreKey) *heads { return &heads{ store: store, namespace: namespace, @@ -44,98 +39,43 @@ func newHeadset(store datastore.DSReaderWriter, namespace core.HeadStoreKey) *he } func (hh *heads) key(c cid.Cid) core.HeadStoreKey { - // // return hh.namespace.WithCid(c) } -func (hh *heads) load(ctx context.Context, c cid.Cid) (uint64, error) { - v, err := hh.store.Get(ctx, hh.key(c).ToDS()) - if err != nil { - return 0, err - } - height, n := binary.Uvarint(v) - if n <= 0 { - return 0, errors.New("error decoding height") - } - return height, nil -} - -func (hh *heads) write(ctx context.Context, store ds.Write, c cid.Cid, height uint64) error { +func (hh *heads) Write(ctx context.Context, c cid.Cid, height uint64) error { buf := make([]byte, binary.MaxVarintLen64) n := binary.PutUvarint(buf, height) - if n == 0 { - return errors.New("error encoding height") - } - return store.Put(ctx, hh.key(c).ToDS(), buf[0:n]) -} -func (hh *heads) delete(ctx context.Context, store ds.Write, c cid.Cid) error { - err := store.Delete(ctx, hh.key(c).ToDS()) - if errors.Is(err, ds.ErrNotFound) { - return nil - } - return err + return hh.store.Put(ctx, hh.key(c).ToDS(), buf[0:n]) } // IsHead returns if a given cid is among the current heads. -func (hh *heads) IsHead(ctx context.Context, c cid.Cid) (bool, uint64, error) { - height, err := hh.load(ctx, c) - if errors.Is(err, ds.ErrNotFound) { - return false, 0, nil - } - return err == nil, height, err -} - -func (hh *heads) Len(ctx context.Context) (int, error) { - list, _, err := hh.List(ctx) - return len(list), err +func (hh *heads) IsHead(ctx context.Context, c cid.Cid) (bool, error) { + return hh.store.Has(ctx, hh.key(c).ToDS()) } // Replace replaces a head with a new CID. -func (hh *heads) Replace(ctx context.Context, h, c cid.Cid, height uint64) error { +func (hh *heads) Replace(ctx context.Context, old cid.Cid, new cid.Cid, height uint64) error { log.Info( ctx, "Replacing DAG head", - logging.NewKV("Old", h), - logging.NewKV("CID", c), + logging.NewKV("Old", old), + logging.NewKV("CID", new), logging.NewKV("Height", height)) - var store ds.Write = hh.store - var err error - - // batchingDs, batching := store.(ds.Batching) - // if batching { - // store, err = batchingDs.Batch() - // if err != nil { - // return err - // } - // } - - err = hh.delete(ctx, store, h) + + err := hh.store.Delete(ctx, hh.key(old).ToDS()) if err != nil { return err } - err = hh.write(ctx, store, c, height) + err = hh.Write(ctx, new, height) if err != nil { return err } - // if batching { - // err := store.(ds.Batch).Commit() - // if err != nil { - // return err - // } - // } return nil } -func (hh *heads) Add(ctx context.Context, c cid.Cid, height uint64) error { - log.Debug(ctx, "Adding new DAG head", - logging.NewKV("CID", c), - logging.NewKV("Height", height)) - return hh.write(ctx, hh.store, c, height) -} - // List returns the list of current heads plus the max height. // @todo Document Heads.List function func (hh *heads) List(ctx context.Context) ([]cid.Cid, uint64, error) { diff --git a/merkle/clock/heads_test.go b/merkle/clock/heads_test.go index cb955fdc8d..c9c6212c5c 100644 --- a/merkle/clock/heads_test.go +++ b/merkle/clock/heads_test.go @@ -20,12 +20,10 @@ import ( "testing" "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" mh "github.com/multiformats/go-multihash" "github.com/sourcenetwork/defradb/core" "github.com/sourcenetwork/defradb/datastore" - "github.com/sourcenetwork/defradb/errors" ) func newRandomCID() cid.Cid { @@ -52,7 +50,7 @@ func newRandomCID() cid.Cid { func newHeadSet() *heads { s := newDS() - return newHeadset( + return NewHeadSet( datastore.AsDSReaderWriter(s), core.HeadStoreKey{}.WithDocKey("mydockey").WithFieldId("1"), ) @@ -62,125 +60,13 @@ func TestHeadsWrite(t *testing.T) { ctx := context.Background() heads := newHeadSet() c := newRandomCID() - err := heads.write(ctx, heads.store, c, uint64(1)) + err := heads.Write(ctx, c, uint64(1)) if err != nil { t.Error("Failed to write to head set:", err) return } } -func TestHeadsLoad(t *testing.T) { - ctx := context.Background() - heads := newHeadSet() - c := newRandomCID() - err := heads.write(ctx, heads.store, c, uint64(1)) - if err != nil { - t.Error("Failed to write to head set:", err) - return - } - - h, err := heads.load(ctx, c) - if err != nil { - t.Error("failed to load from head set:", err) - return - } - - if h != uint64(1) { - t.Errorf("Incorrect value from head set load(), have %v, want %v", h, uint64(1)) - return - } -} - -func TestHeadsDelete(t *testing.T) { - ctx := context.Background() - heads := newHeadSet() - c := newRandomCID() - err := heads.write(ctx, heads.store, c, uint64(1)) - if err != nil { - t.Error("Failed to write to head set:", err) - return - } - - err = heads.delete(ctx, heads.store, c) - if err != nil { - t.Error("Failed to delete from head set:", err) - return - } - - _, err = heads.load(ctx, c) - if !errors.Is(err, ds.ErrNotFound) { - t.Error("failed to delete from head set, value still set") - return - } -} - -func TestHeadsIsHead(t *testing.T) { - ctx := context.Background() - heads := newHeadSet() - c := newRandomCID() - err := heads.write(ctx, heads.store, c, uint64(1)) - if err != nil { - t.Error("Failed to write to head set:", err) - return - } - - ishead, h, err := heads.IsHead(ctx, c) - if err != nil { - t.Error("Failedd to check isHead:", err) - return - } - - if ishead == false { - t.Error("Expected isHead to return true, instead false") - return - } - - if h != uint64(1) { - t.Errorf("Incorrect height value from isHead, have %v, want %v", h, uint64(1)) - return - } -} - -func TestHeadsLen(t *testing.T) { - ctx := context.Background() - heads := newHeadSet() - c := newRandomCID() - err := heads.write(ctx, heads.store, c, uint64(1)) - if err != nil { - t.Error("Failed to write to head set:", err) - return - } - - l, err := heads.Len(ctx) - if err != nil { - t.Error("Failed to get head set length:", err) - return - } - - if l != 1 { - t.Errorf("Incorrect length for head set, have %v, want %v", l, 1) - return - } - - c = newRandomCID() - err = heads.write(ctx, heads.store, c, uint64(1)) - if err != nil { - t.Error("Failed to write to head set:", err) - return - } - - l, err = heads.Len(ctx) - if err != nil { - t.Error("Failed to get head set length (second call):", err) - return - } - - if l != 2 { - t.Errorf("Incorrect length for head set, have %v, want %v", l, 2) - return - } -} - func TestHeadsReplaceEmpty(t *testing.T) { ctx := context.Background() heads := newHeadSet() @@ -191,24 +77,13 @@ func TestHeadsReplaceEmpty(t *testing.T) { t.Error("Failed to Replace items in head set:", err) return } - - h, err := heads.load(ctx, c2) - if err != nil { - t.Error("Failed to load items in head set:", err) - return - } - - if h != uint64(3) { - t.Errorf("Invalid value for replaced head element, have %v, want %v", h, uint64(3)) - return - } } func TestHeadsReplaceNonEmpty(t *testing.T) { ctx := context.Background() heads := newHeadSet() c1 := newRandomCID() - err := heads.write(ctx, heads.store, c1, uint64(1)) + err := heads.Write(ctx, c1, uint64(1)) if err != nil { t.Error("Failed to write to head set:", err) return @@ -220,17 +95,6 @@ func TestHeadsReplaceNonEmpty(t *testing.T) { t.Error("Failed to Replace items in head set:", err) return } - - h, err := heads.load(ctx, c2) - if err != nil { - t.Error("Failed to load items in head set:", err) - return - } - - if h != uint64(3) { - t.Errorf("Invalid value for replaced head element, have %v, want %v", h, uint64(3)) - return - } } // this test is largely unneeded from a functional point of view @@ -241,7 +105,7 @@ func TestHeadsAdd(t *testing.T) { ctx := context.Background() heads := newHeadSet() c1 := newRandomCID() - err := heads.Add(ctx, c1, uint64(1)) + err := heads.Write(ctx, c1, uint64(1)) if err != nil { t.Error("Failed to Add element to head set:", err) return @@ -253,8 +117,8 @@ func TestHeaddsList(t *testing.T) { heads := newHeadSet() c1 := newRandomCID() c2 := newRandomCID() - heads.Add(ctx, c1, uint64(1)) - heads.Add(ctx, c2, uint64(2)) + heads.Write(ctx, c1, uint64(1)) + heads.Write(ctx, c2, uint64(2)) list, h, err := heads.List(ctx) if err != nil { diff --git a/planner/commit.go b/planner/commit.go index d8ac18dcf5..7e971129ae 100644 --- a/planner/commit.go +++ b/planner/commit.go @@ -165,22 +165,6 @@ func (n *dagScanNode) Next() (bool, error) { if len(n.queuedCids) > 0 { currentCid = n.queuedCids[0] n.queuedCids = n.queuedCids[1:(len(n.queuedCids))] - } else if n.parsed.Cid.HasValue() && !n.parsed.DocKey.HasValue() { - if n.visitedNodes[n.parsed.Cid.Value()] { - // If the requested cid has been visited, we are done and should return false - return false, nil - } - - cid, err := cid.Decode(n.parsed.Cid.Value()) - if err != nil { - return false, err - } - - if hasCid, err := store.Has(n.p.ctx, cid); !hasCid || err != nil { - return false, err - } - - currentCid = &cid } else { cid, err := n.fetcher.FetchNext() if err != nil || cid == nil { diff --git a/tests/integration/query/commits/with_cid_test.go b/tests/integration/query/commits/with_cid_test.go index 0ac6b53815..71e30c17c9 100644 --- a/tests/integration/query/commits/with_cid_test.go +++ b/tests/integration/query/commits/with_cid_test.go @@ -82,8 +82,6 @@ func TestQueryCommitsWithCidForFieldCommit(t *testing.T) { executeTestCase(t, test) } -// This test is for documentation reasons only. This is not -// desired behaviour (error message could be better, or empty result). func TestQueryCommitsWithInvalidCid(t *testing.T) { test := testUtils.QueryTestCase{ Description: "query for a single block by invalid CID", @@ -102,14 +100,12 @@ func TestQueryCommitsWithInvalidCid(t *testing.T) { }`, }, }, - ExpectedError: "encoding/hex: invalid byte:", + Results: []map[string]any{}, } executeTestCase(t, test) } -// This test is for documentation reasons only. This is not -// desired behaviour (error message could be better, or empty result). func TestQueryCommitsWithInvalidShortCid(t *testing.T) { test := testUtils.QueryTestCase{ Description: "query for a single block by invalid, short CID", @@ -128,7 +124,7 @@ func TestQueryCommitsWithInvalidShortCid(t *testing.T) { }`, }, }, - ExpectedError: "length greater than remaining number of bytes in buffer", + Results: []map[string]any{}, } executeTestCase(t, test)