forked from Chavithra/degiro-connector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
realtime_poller.py
72 lines (57 loc) · 1.68 KB
/
realtime_poller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# IMPORTS
import json
import logging
from degiro_connector.quotecast.api import API as QuotecastAPI
from degiro_connector.quotecast.models.quotecast_parser import QuotecastParser
from degiro_connector.quotecast.models.quotecast_pb2 import Quotecast
# Example script: connect to the quotecast API with the token stored in
# config/config.json, subscribe to a handful of metrics for one product,
# then fetch a quotecast and print it in several representations.

# SETUP LOGGING
logging.basicConfig(level=logging.INFO)

# SETUP CONFIG DICT
# JSON text is UTF-8 by specification, so the encoding is pinned here
# instead of relying on the platform's locale-dependent default.
with open("config/config.json", encoding="utf-8") as config_file:
    config_dict = json.load(config_file)

# SETUP CREDENTIALS
# NOTE: `.get` yields None when the "user_token" key is absent from the config.
user_token = config_dict.get("user_token")  # HERE GOES YOUR USER_TOKEN

# SETUP API
quotecast_api = QuotecastAPI(user_token=user_token)

# CONNECTION
quotecast_api.connect()

# SUBSCRIBE TO METRICS
# The subscription key is the product identifier; the list names the
# metrics requested for that product.
request = Quotecast.Request()
request.subscriptions["AAPL.BATS,E"].extend(
    [
        "LastDate",
        "LastTime",
        "LastPrice",
        "LastVolume",
        "AskPrice",
        "BidPrice",
    ]
)
quotecast_api.subscribe(request=request)

# SETUP JSON PARSER
quotecast_parser = QuotecastParser()

# As written, every path through the loop body ends in `break`, so exactly
# one fetch is attempted. Remove the first `break` to keep polling.
while True:
    try:
        # FETCH DATA
        quotecast = quotecast_api.fetch_data()

        # DISPLAY RAW JSON
        print(quotecast.json_data)

        # DISPLAY TICKER (PROTOBUF/GRPC OBJECT)
        quotecast_parser.put_quotecast(quotecast=quotecast)
        ticker = quotecast_parser.ticker
        print(ticker)

        # DISPLAY DICT
        ticker_dict = quotecast_parser.ticker_dict
        print(ticker_dict)

        # DISPLAY PANDAS.DATAFRAME
        ticker_df = quotecast_parser.ticker_df
        print(ticker_df)

        # REMOVE THIS LINE TO RUN IT IN LOOP
        # USE : CTRL+C TO QUIT
        break
    except Exception as e:
        # Any failure while fetching or parsing is reported and ends the loop.
        print(e)
        break
    except KeyboardInterrupt:
        # KeyboardInterrupt is not a subclass of Exception, so CTRL+C is
        # handled by this separate clause.
        print("KeyboardInterrupt")
        break