forked from streamingfast/firehose-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream_factory.go
149 lines (125 loc) · 4.12 KB
/
stream_factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package firecore
import (
"context"
"fmt"
"github.com/streamingfast/dmetering"
"github.com/streamingfast/firehose-core/metering"
"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/hub"
"github.com/streamingfast/bstream/stream"
"github.com/streamingfast/bstream/transform"
"github.com/streamingfast/dauth"
"github.com/streamingfast/dstore"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// StreamMergedBlocksPreprocThreads is the value handed as the second argument
// to stream.WithPreprocessFunc when a request's transforms produce a preprocess
// function. It is an exported variable, so other packages can override it
// before streams are created.
//
// NOTE(review): presumably this is the number of threads used to preprocess
// merged blocks; confirm against the bstream/stream package.
var StreamMergedBlocksPreprocThreads = 25
// StreamFactory bundles the dependencies shared by every stream it creates:
// the block stores, the forkable hub and the transform registry. Use
// NewStreamFactory to build one and New to create a stream for a request.
type StreamFactory struct {
	// mergedBlocksStore and forkedBlocksStore are the base stores handed to
	// each stream; when a store implements dstore.Clonable, New clones it
	// per request before use.
	mergedBlocksStore, forkedBlocksStore dstore.Store

	// hub is passed as-is to every stream created by New.
	hub *hub.ForkableHub

	// transformRegistry turns the request's transforms into a preprocess
	// function, a block index provider and a description.
	transformRegistry *transform.Registry
}
// NewStreamFactory returns a StreamFactory holding the given merged blocks
// store, forked blocks store, forkable hub and transform registry. The
// arguments are stored as received; nothing is validated or copied here.
func NewStreamFactory(
	mergedBlocksStore dstore.Store,
	forkedBlocksStore dstore.Store,
	hub *hub.ForkableHub,
	transformRegistry *transform.Registry,
) *StreamFactory {
	factory := new(StreamFactory)
	factory.mergedBlocksStore = mergedBlocksStore
	factory.forkedBlocksStore = forkedBlocksStore
	factory.hub = hub
	factory.transformRegistry = transformRegistry
	return factory
}
// New builds the *stream.Stream that serves a single Firehose request.
//
// Stream options are appended in this order: stop block, preprocess function
// (if the transforms produce one), logger, block index provider (if any),
// final-blocks-only flag, cursor (if the request carries one) and finally
// extraOpts, so caller-supplied options always come last.
//
// Both block stores are cloned per request when they implement
// dstore.Clonable, so that bytes read are metered against the meter found in
// ctx. Stores that are not clonable are used as-is.
//
// Parameters:
//   - ctx: request context; read for auth information and the bytes meter.
//   - handler: receives the blocks emitted by the stream.
//   - request: the Firehose request; must be non-nil.
//   - logger: base logger; request-specific fields are added to a derived
//     logger used only for the messages emitted by this function.
//   - extraOpts: additional stream options appended after the built-in ones.
//
// Errors:
//   - a wrapped error when the request transforms cannot be built;
//   - a gRPC InvalidArgument status when the request cursor cannot be decoded;
//   - a wrapped error when cloning either block store fails.
func (sf *StreamFactory) New(
	ctx context.Context,
	handler bstream.Handler,
	request *pbfirehose.Request,
	logger *zap.Logger,
	extraOpts ...stream.Option) (*stream.Stream, error) {
	reqLogger := logger.With(
		zap.Int64("req_start_block", request.StartBlockNum),
		zap.String("req_cursor", request.Cursor),
		zap.Uint64("req_stop_block", request.StopBlockNum),
		zap.Bool("final_blocks_only", request.FinalBlocksOnly),
	)

	options := []stream.Option{
		stream.WithStopBlock(request.StopBlockNum),
	}

	preprocFunc, blockIndexProvider, desc, err := sf.transformRegistry.BuildFromTransforms(request.Transforms)
	if err != nil {
		reqLogger.Error("cannot process incoming blocks request transforms", zap.Error(err))
		return nil, fmt.Errorf("building from transforms: %w", err)
	}

	if preprocFunc != nil {
		options = append(options, stream.WithPreprocessFunc(preprocFunc, StreamMergedBlocksPreprocThreads))
	}
	if blockIndexProvider != nil {
		reqLogger = reqLogger.With(zap.Bool("with_index_provider", true))
	}
	if desc != "" {
		reqLogger = reqLogger.With(zap.String("transform_desc", desc))
	}

	// The stream gets the base logger, not reqLogger: use the traceID to
	// connect the stream's logs with the request logs.
	options = append(options, stream.WithLogger(logger))

	if blockIndexProvider != nil {
		options = append(options, stream.WithBlockIndexProvider(blockIndexProvider))
	}
	if request.FinalBlocksOnly {
		options = append(options, stream.WithFinalBlocksOnly())
	}

	// Attach caller identity to the "processing" log line when the context
	// carries auth information.
	var fields []zap.Field
	auth := dauth.FromContext(ctx)
	if auth != nil {
		fields = append(fields,
			zap.String("api_key_id", auth.APIKeyID()),
			zap.String("user_id", auth.UserID()),
			zap.String("real_ip", auth.RealIP()),
		)
		if auth["x-deployment-id"] != "" {
			fields = append(fields, zap.String("deployment_id", auth["x-deployment-id"]))
		}
	}
	reqLogger.Info("processing incoming blocks request", fields...)

	if request.Cursor != "" {
		cur, err := bstream.CursorFromOpaque(request.Cursor)
		if err != nil {
			return nil, status.Errorf(codes.InvalidArgument, "invalid start cursor %q: %s", request.Cursor, err)
		}
		options = append(options, stream.WithCursor(cur))
	}

	// Read the meter once; the same context is used for every lookup below.
	bytesMeter := dmetering.GetBytesMeter(ctx)

	forkedBlocksStore := sf.forkedBlocksStore
	if clonable, ok := forkedBlocksStore.(dstore.Clonable); ok {
		forkedBlocksStore, err = clonable.Clone(ctx, metering.WithForkedBlockBytesReadMeteringOptions(bytesMeter, logger)...)
		if err != nil {
			return nil, fmt.Errorf("cloning forked blocks store: %w", err)
		}

		// TODO: (deprecated) remove this
		forkedBlocksStore.SetMeter(bytesMeter)
	}

	mergedBlocksStore := sf.mergedBlocksStore
	if clonable, ok := mergedBlocksStore.(dstore.Clonable); ok {
		mergedBlocksStore, err = clonable.Clone(ctx, metering.WithBlockBytesReadMeteringOptions(bytesMeter, logger)...)
		if err != nil {
			return nil, fmt.Errorf("cloning merged blocks store: %w", err)
		}

		// TODO: (deprecated) remove this
		mergedBlocksStore.SetMeter(bytesMeter)
	}

	// Caller-supplied options go last.
	options = append(options, extraOpts...)

	str := stream.New(
		forkedBlocksStore,
		mergedBlocksStore,
		sf.hub,
		request.StartBlockNum,
		handler,
		options...)

	return str, nil
}