-
Notifications
You must be signed in to change notification settings - Fork 0
/
data-stream-test.py
51 lines (35 loc) · 1.33 KB
/
data-stream-test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import json
import math
from unittest.mock import MagicMock
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from time import sleep
data_stream_module = __import__("data-stream")
topic = 'test_topic'

# Three trades for the same symbol/timestamp; their mean price is
# (10000 + 12000 + 20000) / 3 = 42000 / 3 = 14000.
# NOTE(review): the key is spelled 'Timestap' -- presumably it must match the
# key the data-stream module reads; verify there before "fixing" the spelling.
test_input = [
    json.dumps({'Timestap': '1526900000001', 'Symbol': 'BTC-USD', 'LastTradePrice': last_price})
    for last_price in ('10000', '12000', '20000')
]
def _make_dstream_helper(sc, ssc, test_input):
    """Wrap ``test_input`` in a queue-backed DStream.

    All records go into one RDD with a single partition, and that RDD is the
    only element of the queue handed to ``ssc.queueStream``.

    Args:
        sc: SparkContext used to parallelize the records.
        ssc: StreamingContext that owns the resulting DStream.
        test_input: Records to place in the RDD.

    Returns:
        Whatever ``ssc.queueStream`` returns for that one-RDD queue.
    """
    single_partition_rdd = sc.parallelize(test_input, 1)
    return ssc.queueStream([single_partition_rdd])
def test_data_stream(sc, ssc, topic):
    """Run ``process_stream`` over a canned batch and check the published average.

    Feeds the module-level ``test_input`` records through a queue-backed
    DStream, lets the streaming context run for a fixed wait, then inspects the
    single message handed to a ``MagicMock`` standing in for the Kafka producer.

    Args:
        sc: SparkContext used to build the input RDD.
        ssc: StreamingContext that drives the stream. It is always stopped
            before the assertions run, even if starting or waiting fails.
        topic: Topic name forwarded to ``process_stream``.

    Raises:
        AssertionError: if ``send`` was not called exactly once, or the
            ``Average`` field of the sent ``value`` is not 14000.0.
    """
    input_stream = _make_dstream_helper(sc, ssc, test_input)
    mock_kafka_producer = MagicMock()
    data_stream_module.process_stream(input_stream, mock_kafka_producer, topic)

    # Stop the streaming context no matter what happens while it runs
    # (start() failing, Ctrl-C during the wait); an unstopped context can
    # leave Spark's streaming threads running.
    try:
        ssc.start()
        # Fixed wait; assumes the single queued batch is processed within 5s.
        sleep(5)
    finally:
        ssc.stop()

    mock_kafka_producer.send.assert_called_once()
    _args, kwargs = mock_kafka_producer.send.call_args
    payload = json.loads(kwargs['value'])
    # Mean of 10000, 12000 and 20000.
    assert math.isclose(payload['Average'], 14000.0, rel_tol=1e-10), (
        f"unexpected payload sent to producer: {kwargs!r}"
    )
    print('test_data_stream passed!')
if __name__ == '__main__':
    # Local Spark master with 2 worker threads; the streaming context uses a
    # 1-second batch interval.
    sc = SparkContext('local[2]', 'local-testing')
    test_data_stream(sc, StreamingContext(sc, 1), topic)