-
Notifications
You must be signed in to change notification settings - Fork 2
/
process.go
139 lines (112 loc) · 3.24 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package subee
import (
"context"
"sync"
"time"
"github.com/pkg/errors"
)
// process encapsulates a subscribing routine and a message-consuming routine.
// newProcess returns the implementation used by this package (processImpl).
type process interface {
// Start runs the process with the given context and reports the error, if
// any, that the underlying subscription returned.
Start(ctx context.Context) error
}
// processImpl is the process implementation built by newProcess.
// It embeds *Engine, so the engine's fields (Logger, Consumer, BatchConsumer,
// StatsHandler, ...) are accessed directly through the receiver.
type processImpl struct {
*Engine
// wg counts the goroutines started by this process (the batch consuming
// loop and one goroutine per handled message); Start waits on it before
// returning. A sync.WaitGroup must not be copied after first use, which is
// why every method uses a pointer receiver.
wg sync.WaitGroup
}
// newProcess builds a process backed by the given Engine.
// The returned value is a *processImpl with a zero-valued WaitGroup.
func newProcess(e *Engine) process {
	impl := new(processImpl)
	impl.Engine = e
	return impl
}
// Start runs the consuming process selected by the engine configuration and
// blocks until it finishes.
//
// When Consumer is set, messages are consumed one by one; otherwise, when
// BatchConsumer is set, they are consumed in batches. If neither is set the
// method panics with "unreachable".
//
// Deferred calls run in reverse order: the WaitGroup is drained first, so all
// in-flight handler goroutines complete, and only then is "Finish process"
// logged.
func (p *processImpl) Start(ctx context.Context) error {
	p.Logger.Print("Start process")
	defer p.Logger.Print("Finish process")
	defer p.wg.Wait() // To wait for consuming all received messages.

	if p.Consumer != nil {
		err := p.startConsumingProcess(ctx)
		return errors.WithStack(err)
	}

	if p.BatchConsumer != nil {
		err := p.startBatchConsumingProcess(ctx)
		return errors.WithStack(err)
	}

	panic("unreachable")
}
// startConsumingProcess subscribes to messages and hands each one to the
// single-message Consumer, wrapped with the configured ConsumerInterceptors.
//
// Every received message gets its own consuming context and is processed via
// handleMessage. The returned error is whatever subscribe returned, wrapped
// with a stack trace.
func (p *processImpl) startConsumingProcess(ctx context.Context) error {
	chained := chainConsumerInterceptors(p.Consumer, p.ConsumerInterceptors...)

	// onMessage is the callback passed to the subscriber.
	onMessage := func(msg Message) {
		consume := func(ctx context.Context) error {
			return errors.WithStack(chained.Consume(ctx, msg))
		}
		p.handleMessage(p.createConsumingContext(), &singleMessage{Message: msg}, consume)
	}

	return errors.WithStack(p.subscribe(ctx, onMessage))
}
// startBatchConsumingProcess subscribes to messages and consumes them in
// batches through the BatchConsumer, wrapped with the configured
// BatchConsumerInterceptors.
//
// Received messages are pushed into a buffered queue created with ChunkSize
// and FlushInterval. A background goroutine, tracked by the WaitGroup, reads
// batches from the queue's output and passes each one to handleMessage. The
// queue input is closed when this function returns.
// NOTE(review): presumably createBufferedQueue closes outCh once inCh is closed
// and drained, which is what ends the background goroutine — confirm.
func (p *processImpl) startBatchConsumingProcess(ctx context.Context) error {
	inCh, outCh := createBufferedQueue(p.createConsumingContext, p.ChunkSize, p.FlushInterval)
	defer close(inCh)

	// consumeBatches drains outCh until it is closed.
	consumeBatches := func() {
		defer p.wg.Done()

		chained := chainBatchConsumerInterceptors(p.BatchConsumer, p.BatchConsumerInterceptors...)

		p.Logger.Print("Start batch consuming process")
		defer p.Logger.Print("Finish batch consuming process")

		for batch := range outCh {
			// Copy the slice header into a per-iteration local so the closure
			// below never observes a later batch.
			msgs := batch.Msgs
			consume := func(ctx context.Context) error {
				return errors.WithStack(chained.BatchConsume(ctx, msgs))
			}
			p.handleMessage(batch.Ctx, batch, consume)
		}
	}

	p.wg.Add(1)
	go consumeBatches()

	enqueue := func(msg Message) { inCh <- msg }
	return errors.WithStack(p.subscribe(ctx, enqueue))
}
// subscribe runs the engine's subscriber with f as the per-message callback,
// logging when the subscription starts and finishes. The subscriber's error is
// returned wrapped with a stack trace.
func (p *processImpl) subscribe(ctx context.Context, f func(msg Message)) error {
	p.Logger.Print("Start subscribing messages")
	defer p.Logger.Print("Finish subscribing messages")

	err := p.subscriber.Subscribe(ctx, f)
	return errors.WithStack(err)
}
// createConsumingContext returns a fresh context for one message (or batch).
//
// Starting from context.Background, it is tagged through the StatsHandler with
// BeginTag and then EnqueueTag, and finally stamped with the current time as
// the enqueue timestamp.
//
// Note: the timestamp is taken with time.Now().UTC(); Time.UTC strips the
// monotonic clock reading, so durations later derived from it are based on
// wall-clock time.
func (p *processImpl) createConsumingContext() context.Context {
	stats := p.StatsHandler

	beganCtx := stats.TagProcess(context.Background(), &BeginTag{})
	enqueuedCtx := stats.TagProcess(beganCtx, &EnqueueTag{})

	return setEnqueuedAt(enqueuedCtx, time.Now().UTC())
}
// handleMessage processes m asynchronously in a new goroutine tracked by the
// WaitGroup, so the caller is not blocked while handle runs.
//
// Inside the goroutine, in order:
//  1. If AckImmediately is set, m is acked before handling.
//  2. A Dequeue stat is reported, spanning from the enqueue timestamp stored
//     in ctx until now.
//  3. The context is tagged with ConsumeBeginTag and handle is called with it.
//  4. If AckImmediately is not set, m is nacked when handle failed and acked
//     otherwise.
//  5. ConsumeEnd (handle duration and error) and End (whole lifetime since
//     enqueue, with the message count) stats are reported.
func (p *processImpl) handleMessage(ctx context.Context, m queuedMessage, handle func(context.Context) error) {
	p.wg.Add(1)

	run := func() {
		defer p.wg.Done()

		p.Logger.Printf("Start consuming %d messages", m.Count())
		// The count argument is evaluated here, when the defer is registered.
		defer p.Logger.Printf("Finish consuming %d messages", m.Count())

		if p.AckImmediately {
			m.Ack()
		}

		enqueuedAt := getEnqueuedAt(ctx)
		p.StatsHandler.HandleProcess(ctx, &Dequeue{
			BeginTime: enqueuedAt,
			EndTime:   time.Now(),
		})

		consumeBegan := time.Now()
		consumeCtx := p.StatsHandler.TagProcess(ctx, &ConsumeBeginTag{})
		err := handle(consumeCtx)

		switch {
		case p.AckImmediately:
			// Already acked before handling; nothing more to do.
		case err != nil:
			m.Nack()
		default:
			m.Ack()
		}

		p.StatsHandler.HandleProcess(consumeCtx, &ConsumeEnd{
			BeginTime: consumeBegan,
			EndTime:   time.Now(),
			Error:     err,
		})
		p.StatsHandler.HandleProcess(consumeCtx, &End{
			MsgCount:  m.Count(),
			BeginTime: enqueuedAt,
			EndTime:   time.Now(),
		})
	}

	go run()
}