-
Notifications
You must be signed in to change notification settings - Fork 1
/
phos.go
206 lines (188 loc) · 4.43 KB
/
phos.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// Copyright 2023 BINARY Members
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except In compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to In writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package phos
import (
"context"
"slices"
"sync"
"time"
)
// Phos short for Phosphophyllite
// PHOS is a channel with internal handler chain
type Phos[T any] struct {
In chan<- T
Out <-chan Result[T]
handlers []Handler[T]
options *Options
once sync.Once
mu sync.RWMutex
wg sync.WaitGroup
appendC chan Handler[T]
deleteC chan int
receiveC chan Result[T]
closeC chan struct{}
}
// Handler handles the data of PHOS channel
type Handler[T any] func(ctx context.Context, input T) (output T, err error)
// Result PHOS output result
type Result[T any] struct {
Data T
// Note: You should use the OK of Result rather than the second return value of PHOS Out channel
OK bool
Err *Error
}
// New PHOS channel
// TODO: support buffer channel
// TODO: remove timeout, context
// TODO: keep simple
func New[T any](opts ...Option) *Phos[T] {
options := newOptions(opts...)
in := make(chan T, 1)
out := make(chan Result[T], 1)
ph := &Phos[T]{
options: options,
In: in,
Out: out,
handlers: make([]Handler[T], 0),
appendC: make(chan Handler[T]),
deleteC: make(chan int),
receiveC: make(chan Result[T]),
closeC: make(chan struct{}),
}
go ph.handle(in, out)
return ph
}
// Close PHOS channel
// Note: You should not close In channel manually before or after calling Close
func (ph *Phos[T]) Close() {
ph.once.Do(func() {
close(ph.In)
close(ph.appendC)
close(ph.deleteC)
<-ph.closeC
close(ph.receiveC)
})
}
// Len return the number of handlers
func (ph *Phos[T]) Len() int {
ph.mu.RLock()
defer ph.mu.RUnlock()
return len(ph.handlers)
}
// Append add handler for PHOS to execute
func (ph *Phos[T]) Append(handlers ...Handler[T]) {
for _, handler := range handlers {
ph.appendC <- handler
}
}
// Delete handler according to the index
func (ph *Phos[T]) Delete(index int) {
ph.deleteC <- index
}
// Remove handler from PHOS
// Deprecated: use Delete instead
func (ph *Phos[T]) Remove(index int) {
ph.Delete(index)
}
func (ph *Phos[T]) handle(in chan T, out chan Result[T]) {
defer close(ph.closeC)
ctx := ph.options.Ctx
LOOP:
for {
select {
case handler, ok := <-ph.appendC:
if !ok {
break LOOP
}
ph.handlers = append(ph.handlers, handler)
case index, ok := <-ph.deleteC:
if !ok {
break LOOP
}
if index < 0 || index > len(ph.handlers)-1 {
continue
}
ph.handlers = slices.Delete(ph.handlers, index, index+1)
case data, ok := <-in:
if !ok {
out <- ph.result(data, false, nil)
break LOOP
}
timer := time.NewTimer(ph.options.Timeout)
now := time.Now()
go ph.doHandle(ctx, data, now)
select {
case <-timer.C:
timer.Stop()
if ph.options.ErrTimeoutFunc != nil {
data = ph.options.ErrTimeoutFunc(ctx, data).(T)
}
out <- ph.result(data, true, timeoutError())
case res, ok := <-ph.receiveC:
timer.Stop()
if !ok {
break LOOP
}
out <- res
case <-ctx.Done():
timer.Stop()
if ph.options.ErrDoneFunc != nil {
data = ph.options.ErrDoneFunc(ctx, data, ctx.Err()).(T)
}
out <- ph.result(data, true, ctxError(ctx.Err()))
}
}
}
ph.wg.Wait()
}
func (ph *Phos[T]) doHandle(ctx context.Context, data T, past time.Time) {
launch := func(err *Error) {
if time.Now().After(past.Add(ph.options.Timeout)) {
return
}
select {
case ph.receiveC <- ph.result(data, true, err):
default:
}
}
ph.wg.Add(1)
defer ph.wg.Done()
var err error
for _, handler := range ph.handlers {
data, err = handler(ctx, data)
if err != nil {
if ph.options.ErrHandleFunc != nil {
data = ph.options.ErrHandleFunc(ctx, data, err).(T)
}
launch(handlerError(err))
return
}
}
launch(nil)
}
func (ph *Phos[T]) result(data T, ok bool, err *Error) Result[T] {
if ph.options.Zero && err != nil {
return Result[T]{
Data: *new(T),
OK: ok,
Err: err,
}
}
return Result[T]{
Data: data,
OK: ok,
Err: err,
}
}