-
Notifications
You must be signed in to change notification settings - Fork 1
/
runsend.go
82 lines (67 loc) · 2.28 KB
/
runsend.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// runsend.go
package crdt
import (
"context"
"io"
"github.com/dgraph-io/badger"
stan "github.com/nats-io/stan.go"
"github.com/pkg/errors"
)
// runSendWithReader assembles the send pipeline for the JSON stream read
// from r and blocks until that pipeline has finished.
//
// The stages are chained in the following order, each one consuming the
// output channel of its predecessor:
//
//	jsonReaderSource -> objectClassifier -> updateCRDT -> publishCRDT -> saveCRDT
//
// Going by the stage names (the stages themselves are defined elsewhere in
// the package): the stream is split into JSON objects, each object is
// classified using userId and topicName, reconciled against any existing
// local CRDT in sdb, published through the streaming connection sc (NATS in
// the case of N3) and finally saved.
//
// auditLevel is accepted but is not consulted by this function.
//
// If a stage cannot be constructed, the stage's error is returned wrapped
// with a message naming the failing component. Otherwise the function
// returns whatever WaitForPipeline reports for the stages' error channels.
//
// NOTE: an earlier signature additionally took a *badger.WriteBatch (swb)
// that was handed to saveCRDT; the save stage now receives sdb instead.
func runSendWithReader(sdb *badger.DB, userId string, topicName string, sc stan.Conn, r io.Reader, auditLevel string) error {
	// One cancellable context is shared by every stage; it is cancelled on
	// every return path from this function.
	pipelineCtx, stop := context.WithCancel(context.Background())
	defer stop()

	// Stage 1: separate the incoming stream into JSON objects.
	objects, sourceErrs, err := jsonReaderSource(pipelineCtx, r)
	if err != nil {
		return errors.Wrap(err, "Error: cannot create json-reader source component: ")
	}

	// Stage 2: classify the JSON data and attach meta-data.
	classified, classifyErrs, err := objectClassifier(pipelineCtx, userId, topicName, objects)
	if err != nil {
		return errors.Wrap(err, "Error: cannot create object-classifier component: ")
	}

	// Stage 3: look up / update the existing CRDT.
	updated, updateErrs, err := updateCRDT(pipelineCtx, sdb, classified)
	if err != nil {
		return errors.Wrap(err, "Error: cannot create find-crdt component: ")
	}

	// Stage 4: publish the CRDT.
	published, publishErrs, err := publishCRDT(pipelineCtx, sc, updated)
	if err != nil {
		return errors.Wrap(err, "Error: cannot create publish-crdt component: ")
	}

	// Stage 5: save the CRDT.
	saved, saveErrs, err := saveCRDT(pipelineCtx, sdb, published)
	if err != nil {
		return errors.Wrap(err, "Error: cannot create save-crdt component: ")
	}

	// Drain the final stage until its output channel is closed.
	for range saved {
		// Intentionally empty: per-item audit output is currently disabled,
		// so received items are simply discarded.
	}

	// Collect the outcome from the error channel of every stage.
	return WaitForPipeline(sourceErrs, classifyErrs, updateErrs, publishErrs, saveErrs)
}