-
Notifications
You must be signed in to change notification settings - Fork 2
/
kinesumer_test.go
97 lines (88 loc) · 2.79 KB
/
kinesumer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package kinesumer
import (
"math/rand"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/remind101/kinesumer/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
// makeTestKinesumer builds a Kinesumer backed by freshly allocated mock
// Kinesis, Checkpointer and Provisioner values and returns all four, so the
// calling test can register expectations on the mocks.
//
// New is invoked with a random source seeded with 0, the stream name
// "TestStream", a nil sixth argument and a zero duration. If New returns an
// error the calling test is stopped right away with t.Fatal: carrying on would
// hand back a Kinesumer that New did not successfully construct, and callers
// dereference it immediately.
func makeTestKinesumer(t *testing.T) (*Kinesumer, *mocks.Kinesis, *mocks.Checkpointer,
	*mocks.Provisioner) {
	kin := new(mocks.Kinesis)
	sssm := new(mocks.Checkpointer)
	prov := new(mocks.Provisioner)
	k, err := New(
		kin,
		sssm,
		prov,
		rand.NewSource(0),
		"TestStream",
		nil,
		time.Duration(0),
	)
	if err != nil {
		// Fatal rather than Error: the returned Kinesumer is unusable here.
		t.Fatal(err)
	}
	return k, kin, sssm, prov
}
// TestKinesumerGetStreams checks that GetStreams makes exactly one
// ListStreamsPages call on the mocked Kinesis client, returns no error, and
// yields three stream names whose third entry is "c".
//
// NOTE(review): the stream names presumably come from the mock's
// ListStreamsPages implementation, which is not visible here — confirm.
func TestKinesumerGetStreams(t *testing.T) {
	k, kin, _, _ := makeTestKinesumer(t)
	kin.On("ListStreamsPages", mock.Anything, mock.Anything).Return(nil)
	streams, err := k.GetStreams()
	assert.Nil(t, err)
	kin.AssertNumberOfCalls(t, "ListStreamsPages", 1)
	// assert.* does not stop the test, so only index into streams after the
	// length check has passed; a short result must fail, not panic.
	if assert.Equal(t, 3, len(streams)) {
		// testify takes (expected, actual); keep that order so a failure
		// message labels the two values correctly.
		assert.Equal(t, "c", streams[2])
	}
}
// TestKinesumerStreamExists points the Kinesumer at the stream "c" and
// expects StreamExists to report true without an error, after exactly one
// ListStreamsPages call on the mocked Kinesis client.
func TestKinesumerStreamExists(t *testing.T) {
	consumer, kinesisMock, _, _ := makeTestKinesumer(t)
	consumer.Stream = "c"
	kinesisMock.On("ListStreamsPages", mock.Anything, mock.Anything).Return(nil)

	exists, err := consumer.StreamExists()

	assert.Nil(t, err)
	kinesisMock.AssertNumberOfCalls(t, "ListStreamsPages", 1)
	assert.True(t, exists)
}
// TestKinesumerGetShards points the Kinesumer at the stream "c" and checks
// that GetShards makes exactly one DescribeStreamPages call on the mocked
// Kinesis client, returns no error, and yields two shards whose second entry
// has the ID "shard1".
//
// NOTE(review): the shard list presumably comes from the mock's
// DescribeStreamPages implementation, which is not visible here — confirm.
func TestKinesumerGetShards(t *testing.T) {
	k, kin, _, _ := makeTestKinesumer(t)
	k.Stream = "c"
	kin.On("DescribeStreamPages", mock.Anything, mock.Anything).Return(nil)
	shards, err := k.GetShards()
	assert.Nil(t, err)
	kin.AssertNumberOfCalls(t, "DescribeStreamPages", 1)
	// assert.* does not stop the test, so only index into shards after the
	// length check has passed; a short result must fail, not panic.
	if assert.Equal(t, 2, len(shards)) {
		// aws.StringValue yields "" for a nil pointer, so a missing ShardId
		// shows up as an assertion failure rather than a nil dereference.
		assert.Equal(t, "shard1", aws.StringValue(shards[1].ShardId))
	}
}
// TestKinesumerBeginEnd drives Begin twice against mocked collaborators and
// then calls End.
//
// The first Begin must return an error, because the only DescribeStreamPages
// expectation registered at that point returns an awserr error (once). The
// remaining expectations are then registered so that the second Begin
// succeeds, after which nRunning is expected to be 2.
//
// The order in which expectations are registered is significant (several use
// Once), so it is kept exactly as written.
func TestKinesumerBeginEnd(t *testing.T) {
	consumer, kinesisMock, checkpointer, provisioner := makeTestKinesumer(t)
	consumer.Stream = "c"

	// Phase 1: describing the stream fails, so Begin must fail too.
	describeFailure := awserr.New("bad", "bad", nil)
	kinesisMock.On("DescribeStreamPages", mock.Anything, mock.Anything).Return(describeFailure).Once()
	_, err := consumer.Begin()
	assert.Error(t, err)

	// Phase 2: every collaborator call succeeds.
	var noAWSErr awserr.Error // nil interface value, same as awserr.Error(nil)

	provisioner.On("TTL").Return(time.Millisecond * 10)
	provisioner.On("TryAcquire", mock.Anything).Return(nil)
	provisioner.On("Heartbeat", mock.Anything).Return(nil)
	provisioner.On("Release", mock.Anything).Return(nil)

	kinesisMock.On("DescribeStreamPages", mock.Anything, mock.Anything).Return(noAWSErr)

	checkpointer.On("Begin", mock.Anything).Return(nil)
	// First lookup yields "0"; every later lookup yields the empty string.
	checkpointer.On("GetStartSequence", mock.Anything).Return("0").Once()
	checkpointer.On("GetStartSequence", mock.Anything).Return("")
	checkpointer.On("TryAcquire", mock.Anything).Return(nil)

	iteratorOut := &kinesis.GetShardIteratorOutput{
		ShardIterator: aws.String("0"),
	}
	kinesisMock.On("GetShardIterator", mock.Anything).Return(iteratorOut, noAWSErr)

	// Canned GetRecords reply: no records, zero lag, and a next iterator.
	recordsOut := &kinesis.GetRecordsOutput{
		MillisBehindLatest: aws.Int64(0),
		NextShardIterator:  aws.String("AAAAA"),
		Records:            []*kinesis.Record{},
	}
	kinesisMock.On("GetRecords", mock.Anything).Return(recordsOut, noAWSErr)

	checkpointer.On("End").Return()

	_, err = consumer.Begin()
	assert.Nil(t, err)
	assert.Equal(t, 2, consumer.nRunning)

	consumer.End()
}