-
Notifications
You must be signed in to change notification settings - Fork 1
/
operators.go
112 lines (102 loc) · 2.38 KB
/
operators.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package rxgo
import (
"time"
)
// Delay returns an operator that pauses for d before forwarding each value
// received from the source observable.
//
// The pause is a time.Sleep performed inside the value handler, so it blocks
// whoever invokes that handler for the full duration. Errors and completion
// are forwarded to the downstream channels without any additional sleep.
func Delay(d time.Duration) OperatorFunc {
	return func(source Observable) Observable {
		return Create(func(next ValueChan, fail ErrChan, finish CompleteChan) TeardownFunc {
			// Sleep first, then hand the value downstream.
			forwardLater := func(val Value) {
				time.Sleep(d)
				next.Next(val)
			}
			observer := OnNext(forwardLater).OnErr(fail.Error).OnComplete(finish.Complete)
			subscription := source.Subscribe(observer)
			return subscription.Unsubscribe
		})
	}
}
// Debounce returns an operator that lets at most one source value through per
// interval d.
//
// A worker goroutine repeatedly waits for d and then forwards the next value
// the source emits. Values that arrive while the worker is still waiting (or
// is busy forwarding) are dropped, because the source-side send never blocks.
//
// The worker is stopped when the source completes, when it errors, or when
// the returned teardown runs, so it never outlives the subscription.
// Completion is signalled downstream exactly once.
func Debounce(d time.Duration) OperatorFunc {
	return func(o Observable) Observable {
		return Create(func(v ValueChan, e ErrChan, c CompleteChan) TeardownFunc {
			debounce := make(chan Value)

			// done has capacity 1 and is only written with a non-blocking
			// send, so stop can be called any number of times, from any
			// callback, without blocking and without a double-close panic.
			done := make(chan struct{}, 1)
			stop := func() {
				select {
				case done <- struct{}{}:
				default:
				}
			}

			go func() {
				timer := time.NewTimer(d)
				defer timer.Stop()
				for {
					// Quiet interval: ignore the source until d has elapsed.
					select {
					case <-timer.C:
					case <-done:
						return
					}
					// Forward the next value, but stay interruptible so a
					// source that terminates does not strand this goroutine.
					select {
					case val := <-debounce:
						v.Next(val)
					case <-done:
						return
					}
					// timer.C was drained above, so Reset is safe here.
					timer.Reset(d)
				}
			}()

			unsubscribe := o.Subscribe(OnNext(func(val Value) {
				// Non-blocking hand-off: drop the value when the worker is
				// not currently ready to receive one.
				select {
				case debounce <- val:
				default:
				}
			}).OnErr(func(err error) {
				stop()
				e.Error(err)
			}).OnComplete(func() {
				stop()
				c.Complete()
			})).Unsubscribe

			// NOTE(review): assumes TeardownFunc has underlying type func();
			// confirm against its declaration.
			return func() {
				stop()
				unsubscribe()
			}
		})
	}
}
// Buffer returns an operator that groups source values into slices of num
// values. It delegates to BufferWithCount with the buffer size and the
// start-every step both set to num, and therefore panics when num is less
// than 1 (BufferWithCount rejects a step below 1).
func Buffer(num int) OperatorFunc {
	size, every := num, num
	return BufferWithCount(size, every)
}
// BufferWithCount returns an operator that emits []Value windows of num
// consecutive source values, starting a new window every count values.
//
// With count == num the windows are back to back; with count < num they
// overlap; with count > num the values between windows are skipped. For
// example, num=3 and count=2 over a,b,c,d,e emits [a b c] and then [c d e].
// A trailing partial window is not emitted when the source completes.
//
// Each emitted slice is a fresh copy, so subscribers may keep or modify it
// without affecting later windows.
//
// It panics if count is less than 1 or if num is negative.
func BufferWithCount(num, count int) OperatorFunc {
	if count < 1 {
		panic("count must be greater than 0")
	}
	if num < 0 {
		// Fail at construction instead of with a slice-bounds panic on the
		// first emission.
		panic("num must not be negative")
	}
	return func(o Observable) Observable {
		return Create(func(v ValueChan, e ErrChan, c CompleteChan) TeardownFunc {
			var window []Value // the most recent values, at most num of them
			seen := 0          // total number of values received so far
			return o.Subscribe(OnNext(func(val Value) {
				seen++
				window = append(window, val)
				if len(window) > num {
					window = window[len(window)-num:]
				}
				// The first window closes after num values, and every later
				// one closes count values after the previous. Counting all
				// values seen (not the trimmed window length) keeps the
				// step right even when num is not a multiple of count.
				if seen < num || (seen-num)%count != 0 {
					return
				}
				out := make([]Value, len(window))
				copy(out, window)
				v.Next(out)
			}).OnErr(e.Error).OnComplete(c.Complete)).Unsubscribe
		})
	}
}
// ToArray is an alias of ToSlice: it returns exactly the operator that
// ToSlice builds.
func ToArray() OperatorFunc {
	toSlice := ToSlice()
	return toSlice
}
// ToSlice returns an operator that pipes the source through Reduce, appending
// every value to a []Value accumulator that starts out as a nil slice.
func ToSlice() OperatorFunc {
	// collect is the reducer: the accumulator always holds a []Value.
	collect := func(acc, val Value) Value {
		items := acc.([]Value)
		return append(items, val)
	}
	return func(source Observable) Observable {
		var seed []Value
		return source.Pipe(Reduce(collect, seed))
	}
}
// PairWise returns an operator that groups the source into windows of two
// values, advancing one value at a time. It is BufferWithCount(2, 1).
func PairWise() OperatorFunc {
	const (
		pairSize = 2 // values per emitted window
		step     = 1 // a new window starts on every value
	)
	return BufferWithCount(pairSize, step)
}
// First returns an operator built from Take(1). When predicate is non-nil the
// source is first passed through Filter(predicate) and that result is piped
// into Take(1); when predicate is nil, Take(1) is returned on its own.
func First(predicate func(v Value) bool) OperatorFunc {
	if predicate != nil {
		matching := Filter(predicate)
		return matching.Pipe(Take(1))
	}
	return Take(1)
}
// Last returns an operator that remembers the most recent value from the
// source and emits it once the source terminates.
//
// On completion the remembered value is emitted and then completion is
// forwarded. On error the remembered value is emitted first and then the
// error is forwarded. If the source terminates without ever emitting, no
// value is emitted; only the completion or error is passed on.
func Last() OperatorFunc {
	return func(o Observable) Observable {
		return Create(func(v ValueChan, e ErrChan, c CompleteChan) TeardownFunc {
			var (
				lastValue Value
				seen      bool // distinguishes "no value yet" from a nil Value
			)
			// flush emits the remembered value, if the source produced one.
			flush := func() {
				if seen {
					v.Next(lastValue)
				}
			}
			return o.Subscribe(OnNext(func(val Value) {
				lastValue, seen = val, true
			}).OnErr(func(err error) {
				flush()
				e.Error(err)
			}).OnComplete(func() {
				flush()
				c.Complete()
			})).Unsubscribe
		})
	}
}