-
Notifications
You must be signed in to change notification settings - Fork 40
/
merklecrdt.go
127 lines (106 loc) · 3.26 KB
/
merklecrdt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// Copyright 2022 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package crdt
import (
"context"
"time"
"github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
"github.com/sourcenetwork/defradb/core"
corenet "github.com/sourcenetwork/defradb/core/net"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/logging"
)
var (
log = logging.MustNewLogger("defra.merklecrdt")
)
const broadcasterTimeout = time.Second
// MerkleCRDT is the implementation of a Merkle Clock along with a
// CRDT payload. It implements the ReplicatedData interface
// so it can be merged with any given semantics.
type MerkleCRDT interface {
core.ReplicatedData
Clock() core.MerkleClock
}
var (
// defaultMerkleCRDTs = make(map[Type]MerkleCRDTFactory)
_ core.ReplicatedData = (*baseMerkleCRDT)(nil)
)
// baseMerkleCRDT handles the MerkleCRDT overhead functions that aren't CRDT specific like the mutations and state
// retrieval functions. It handles creating and publishing the CRDT DAG with the help of the MerkleClock.
type baseMerkleCRDT struct {
clock core.MerkleClock
crdt core.ReplicatedData
broadcaster corenet.Broadcaster
}
func (base *baseMerkleCRDT) Clock() core.MerkleClock {
return base.clock
}
func (base *baseMerkleCRDT) Merge(ctx context.Context, other core.Delta, id string) error {
return base.crdt.Merge(ctx, other, id)
}
func (base *baseMerkleCRDT) DeltaDecode(node ipld.Node) (core.Delta, error) {
return base.crdt.DeltaDecode(node)
}
func (base *baseMerkleCRDT) Value(ctx context.Context) ([]byte, error) {
return base.crdt.Value(ctx)
}
func (base *baseMerkleCRDT) ID() string {
return base.crdt.ID()
}
// Publishes the delta to state.
func (base *baseMerkleCRDT) Publish(
ctx context.Context,
delta core.Delta,
) (cid.Cid, ipld.Node, error) {
log.Debug(ctx, "Processing CRDT state", logging.NewKV("DocKey", base.crdt.ID()))
c, nd, err := base.clock.AddDAGNode(ctx, delta)
if err != nil {
return cid.Undef, nil, err
}
return c, nd, nil
}
func (base *baseMerkleCRDT) Broadcast(ctx context.Context, nd ipld.Node, delta core.Delta) error {
if base.broadcaster == nil {
return nil // just skip if we dont have a broadcaster set
}
dockey := core.NewDataStoreKey(base.crdt.ID()).DocKey
c := nd.Cid()
netdelta, ok := delta.(core.NetDelta)
if !ok {
return errors.New("Can't broadcast a delta payload that doesn't implement core.NetDelta")
}
log.Debug(
ctx,
"Broadcasting new DAG node",
logging.NewKV("DocKey", dockey),
logging.NewKV("CID", c),
)
// we dont want to wait around for the broadcast
go func() {
lg := core.Log{
DocKey: dockey,
Cid: c,
SchemaID: netdelta.GetSchemaID(),
Block: nd,
Priority: netdelta.GetPriority(),
}
if err := base.broadcaster.SendWithTimeout(lg, broadcasterTimeout); err != nil {
log.ErrorE(
ctx,
"Failed to broadcast MerkleCRDT update",
err,
logging.NewKV("DocKey", dockey),
logging.NewKV("CID", c),
)
}
}()
return nil
}