-
Notifications
You must be signed in to change notification settings - Fork 0
/
__init__.py
327 lines (236 loc) · 8.75 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
from collections import namedtuple
from datetime import datetime
import sys
import calendar
from enum import Enum
# One price bar plus the indicator values load_candles() derives for it.
Candle = namedtuple(
    'Candle',
    ('time', 'open', 'high', 'low', 'close', 'vol', 'emas', 'emal', 'trend'),
)

# Look-back lengths of the two EMAs (stored in Candle.emas / Candle.emal) and
# the matching smoothing factors 2 / (N + 1) applied by load_candles().
EMAS = 9
EMAL = 18
EMAMULTS = 2. / (EMAS + 1.)
EMAMULTL = 2. / (EMAL + 1.)
# Stride used by load_candles() when sampling earlier candles for the EMAs.
TIMEMULT = 1

# Labels of the 3-hour intraday buckets, written as 'start-end' hours.
ID_0003 = '00-03'
ID_0306 = '03-06'
ID_0609 = '06-09'
ID_0912 = '09-12'
ID_1215 = '12-15'
ID_1518 = '15-18'
ID_1821 = '18-21'
ID_2100 = '21-24'
# Labels of the 6-hour intraday buckets.
ID_0006 = '00-06'
ID_0612 = '06-12'
ID_1218 = '12-18'
ID_1800 = '18-24'
ID_3hs = [
    ID_0003, ID_0306, ID_0609, ID_0912,
    ID_1215, ID_1518, ID_1821, ID_2100,
]
ID_6hs = [ID_0006, ID_0612, ID_1218, ID_1800]

# Bucket keys, as strings, in the order the summaries are printed.
WEEKS_YR = list(map(str, range(1, 54)))
DAYS_MTH = list(map(str, range(1, 32)))
# NOTE: only 2013..2019 are listed, so the yearly summary prints no other year.
YEARS = list(map(str, range(2013, 2020)))

# Instrument symbols (presumably currency pairs); not referenced in this file's
# visible code.
PAIRS = [
    'AUDCAD', 'AUDCHF', 'AUDJPY', 'AUDNZD', 'AUDUSD',
    'CADCHF', 'CADJPY', 'CHFJPY',
    'EURAUD', 'EURCAD', 'EURCHF', 'EURGBP', 'EURJPY', 'EURNZD', 'EURUSD',
    'GBPAUD', 'GBPCAD', 'GBPCHF', 'GBPJPY', 'GBPNZD', 'GBPUSD',
    'NZDCAD', 'NZDCHF', 'NZDJPY', 'NZDUSD',
    'USDCAD', 'USDCHF', 'USDJPY',
]
class BearBull(Enum):
    """Trend state that load_candles() assigns to every candle.

    The state is derived from the short EMA, the long EMA and the change in
    the distance between them since the previous candle.
    """

    # Long EMA above short EMA, distance between them not shrinking.
    BEARISH = 0
    # Short EMA above long EMA, distance between them not shrinking.
    BULLISH = 1
    # Long EMA above short EMA, distance shrinking.
    COMPRESSBEAR = 2
    # Short EMA above long EMA, distance shrinking.
    COMPRESSBULL = 3
    # Both EMAs are equal.
    COMPRESS = 4


# str() of every member, in definition order; these are the bucket names
# produced by get_bearbull_summary().
BEARBULL_KEYS = list(map(str, BearBull))
class Summaries(Enum):
    """Report sections that print_complete_summary() can be asked to print."""

    HOURS = 0      # intraday buckets
    DAY = 1        # day of the month
    DAYOFWEEK = 2  # weekday name
    WEEKS = 3      # ISO week number
    MONTHS = 4     # month name
    YEARS = 5      # calendar year
def _classify_trend(emas, emal, emas_1, emal_1):
    """Return the BearBull state for the current and previous EMA pair."""
    gap = abs(emas - emal)
    gap_1 = abs(emas_1 - emal_1)
    # A directional trend only counts while the EMAs are not converging.
    widening = gap >= gap_1 and gap > 0
    if emas > emal:
        return BearBull.BULLISH if widening else BearBull.COMPRESSBULL
    if emal > emas:
        return BearBull.BEARISH if widening else BearBull.COMPRESSBEAR
    return BearBull.COMPRESS


def load_candles(candle_file):
    """Parse a candle file into a list of Candle tuples.

    The first line of the file is discarded. Every other non-blank line must
    consist of three whitespace-separated tokens shaped like
    ``DD.MM.YYYY HH:MM:00.000 GMT+HHMM,open,high,low,close,volume``.
    Blank lines are skipped (they no longer end parsing early), and candles
    whose timestamp falls on a Saturday are dropped.

    For every kept candle a short and a long EMA of the close are computed
    over the previously kept candles sampled with stride TIMEMULT (assumed to
    be >= 1): until EMAS / EMAL samples exist the EMA equals the close, at
    exactly that many samples it is seeded with their mean close, and
    afterwards it is updated recursively from the last sampled candle.

    Args:
        candle_file: path of the text file to read.

    Returns:
        The candles in file order.

    Raises:
        ValueError: if a timestamp or a price field cannot be parsed.
        IndexError: if a line has fewer tokens or fields than expected.
    """
    timestr = '%d.%m.%Y %H:%M:00.000 GMT%z'
    candles = []
    with open(candle_file, 'r') as candles_:
        candles_.readline()  # discard the first line
        for raw_line in candles_:
            line = raw_line.split()
            if not line:
                continue
            # The third token carries the UTC offset followed by the numbers.
            fields = line[2].split(',')
            line[2] = fields[0]
            ohlc = fields[1:]
            time = datetime.strptime(" ".join(line), timestr)
            if time.weekday() == 5:  # ignore saturday candles
                continue
            open_ = float(ohlc[0])
            high = float(ohlc[1])
            low = float(ohlc[2])
            close = float(ohlc[3])
            vol = float(ohlc[4])

            # Size and last index of the sample candles[::TIMEMULT], computed
            # arithmetically so the list is not copied for every input row.
            n_sampled = (len(candles) + TIMEMULT - 1) // TIMEMULT
            last_sampled = (len(candles) - 1) // TIMEMULT * TIMEMULT

            emas = emal = emas_1 = emal_1 = close
            if n_sampled == EMAS:
                # Seed: the sample holds exactly EMAS candles, average them.
                emas = sum(c.close for c in candles[::TIMEMULT]) / EMAS
            elif n_sampled > EMAS:
                emas_1 = candles[last_sampled].emas
                emas = (close - emas_1) * EMAMULTS + emas_1
            if n_sampled == EMAL:
                emal = sum(c.close for c in candles[::TIMEMULT]) / EMAL
            elif n_sampled > EMAL:
                emal_1 = candles[last_sampled].emal
                emal = (close - emal_1) * EMAMULTL + emal_1

            trend = _classify_trend(emas, emal, emas_1, emal_1)
            candles.append(
                Candle(time, open_, high, low, close, vol, emas, emal, trend))
    return candles
def _time(item):
    """Return the timestamp of a Candle, or `item` itself for anything else."""
    if isinstance(item, Candle):
        return item.time
    return item
def dayofweek(item):
    """Return the weekday name (from calendar.day_name) of a candle or time."""
    weekday_index = _time(item).weekday()
    return calendar.day_name[weekday_index]
def filter_hr(item, hours):
    """Tell whether the item's hour of day is contained in `hours`."""
    hour_of_day = _time(item).hour
    return hour_of_day in hours
def filter_day(item, days):
    """Tell whether the item's day of the month is contained in `days`."""
    day_of_month = _time(item).day
    return day_of_month in days
def filter_dayofweek(item, days):
    """Tell whether the item's weekday name is contained in `days`."""
    weekday_name = dayofweek(item)
    return weekday_name in days
def filter_week(item, weeks):
    """Tell whether the item's ISO week number, as a string, is in `weeks`."""
    iso_week = _time(item).isocalendar()[1]
    return str(iso_week) in weeks
def filter_month(item, months):
    """Tell whether the item's month name (calendar.month_name) is in `months`."""
    month_name = calendar.month_name[_time(item).month]
    return month_name in months
def filter_weekday(item, _inlist):
    """Tell whether the item falls on Monday..Friday.

    The second argument is ignored; it only exists so the function matches
    the two-argument predicate signature expected by filter().
    """
    weekday_index = _time(item).weekday()
    return weekday_index < 5
def filter_year(item, years):
    """Tell whether the item's year (an int) is contained in `years`."""
    year = _time(item).year
    return year in years
def filter_bearish(item, compress):
    """Tell whether a candle is bearish.

    A BEARISH trend always matches. When `compress` is truthy a COMPRESSBEAR
    trend matches as well; when it is falsy, `compress` itself is returned
    for non-bearish candles.
    """
    if item.trend == BearBull.BEARISH:
        return True
    return compress and item.trend == BearBull.COMPRESSBEAR
def filter_bullish(item, compress):
    """Tell whether a candle is bullish.

    A BULLISH trend always matches. When `compress` is truthy a COMPRESSBULL
    trend matches as well; when it is falsy, `compress` itself is returned
    for non-bullish candles.
    """
    if item.trend == BearBull.BULLISH:
        return True
    return compress and item.trend == BearBull.COMPRESSBULL
def filter(data_, filter_fun, inlist=None):
    """Return the items of `data_` for which `filter_fun(item, inlist)` is truthy.

    Note that this name shadows the builtin ``filter`` inside this module.

    Args:
        data_: iterable of items to test.
        filter_fun: two-argument predicate called as ``filter_fun(item, inlist)``.
        inlist: extra argument handed unchanged to every predicate call.

    Returns:
        A new list with the accepted items in their original order.
    """
    return [item for item in data_ if filter_fun(item, inlist)]
def filter_thisyr(data):
    """Return the items of `data` dated in the current year (local clock)."""
    current_year = datetime.now().year
    return filter(data, filter_year, [current_year])
def filter_thismth(data):
    """Return the items of `data` dated in the current month of the current year."""
    today = datetime.now()
    from_this_year = filter(data, filter_year, [today.year])
    current_month = calendar.month_name[today.month]
    return filter(from_this_year, filter_month, [current_month])
def classify_hr(time, periods):
    """Return the label of the intraday period that contains `time`.

    The hour is shifted by 12 (modulo 24) before it is compared with the
    period bounds, so hour 0 of `time` lands in the period containing 12.
    NOTE(review): the reason for the 12-hour shift is not visible here --
    presumably a time-zone adjustment; confirm before changing it.

    Args:
        time: object with an integer ``hour`` attribute.
        periods: iterable of 'start-end' labels; start is inclusive and end
            is exclusive.

    Returns:
        The first matching label, or None when no period matches.
    """
    # The shifted hour does not depend on the period, so compute it once.
    hour = (time.hour + 12) % 24
    for period_ in periods:
        bounds = period_.split('-')
        if int(bounds[0]) <= hour < int(bounds[1]):
            return period_
    return None
def classify_day(time, _periods):
    """Return the day of the month of `time` as a string; `_periods` is unused."""
    day_of_month = time.day
    return str(day_of_month)
def classify_dayofweek(time, _periods):
    """Return the weekday name of `time` (calendar.day_name); `_periods` is unused."""
    weekday_index = time.weekday()
    return calendar.day_name[weekday_index]
def classify_week(time, _periods):
    """Return the ISO week number of `time` as a string; `_periods` is unused."""
    iso_week = time.isocalendar()[1]
    return str(iso_week)
def classify_month(time, _periods):
    """Return the month name of `time` (calendar.month_name); `_periods` is unused."""
    month_number = time.month
    return calendar.month_name[month_number]
def classify_year(time, _periods):
    """Return the year of `time` as a string; `_periods` is unused."""
    year = time.year
    return str(year)
def classify(item, buckets, classify_fun, periods=None):
    """Append `item` to the bucket that `classify_fun` selects for its time.

    Args:
        item: a Candle, or a bare time object.
        buckets: dict mapping a bucket key to a list of items; updated in place.
        classify_fun: called as ``classify_fun(time, periods)`` to get the key.
        periods: passed unchanged to `classify_fun`.
    """
    key = classify_fun(_time(item), periods)
    buckets.setdefault(key, []).append(item)
def first_fun_hour(item):
    """Tell whether the item's hour of day is 0."""
    hour_of_day = _time(item).hour
    return hour_of_day == 0
def first_fun_day(item):
    """Tell whether the item falls on the 1st day of a month."""
    day_of_month = _time(item).day
    return day_of_month == 1
def first_fun_week(item):
    """Tell whether the item falls in ISO week 1."""
    iso_week = _time(item).isocalendar()[1]
    return iso_week == 1
def first_fun_weekday(item):
    """Tell whether the item falls on a Monday (weekday index 0)."""
    weekday_index = _time(item).weekday()
    return weekday_index == 0
def first_fun_month(item):
    """Tell whether the item falls in January (month number 1)."""
    month_number = _time(item).month
    return month_number == 1
def get_intraday_summary(data):
    """Group `data` by 3-hour and by 6-hour period of the day.

    Returns:
        A pair ``(buckets_3h, buckets_6h)`` of dicts that map a period label
        from ID_3hs / ID_6hs to the list of items classified into it.
    """
    by_3h, by_6h = {}, {}
    for entry in data:
        for target, labels in ((by_3h, ID_3hs), (by_6h, ID_6hs)):
            classify(entry, target, classify_hr, labels)
    return by_3h, by_6h
def get_interday_summary(data, classify_fun):
    """Group `data` into a dict keyed by whatever `classify_fun` returns.

    `classify_fun` is invoked through classify(), i.e. with the item's time
    and ``None`` as its period list.
    """
    grouped = {}
    for entry in data:
        classify(entry, grouped, classify_fun)
    return grouped
def get_bearbull_summary(data):
    """Group items by the string form of their ``trend`` attribute.

    Returns:
        A dict mapping ``str(item.trend)`` to the list of items carrying that
        trend, in input order.
    """
    grouped = {}
    for entry in data:
        grouped.setdefault(str(entry.trend), []).append(entry)
    return grouped
def print_buckets_summary(buckets, buckets_keys, samplesizes=None):
    """Print one line per non-empty bucket, followed by an empty line.

    Each line shows the bucket's item count and its share of all items found
    under `buckets_keys`. When `samplesizes` holds a non-zero reference size
    for the bucket, the share of that reference size is printed as well --
    except for the trivial case where both the count and the size are 1.
    Buckets whose reference size is missing or zero get the short line
    instead of raising KeyError / ZeroDivisionError.

    Args:
        buckets: dict mapping a bucket key to a list of items.
        buckets_keys: keys to report, in print order.
        samplesizes: optional dict mapping a bucket key to a reference count.
    """
    counts = [(key, len(buckets.get(key, []))) for key in buckets_keys]
    num_items = sum(count for _key, count in counts)
    for bucket, bucket_len in counts:
        if not bucket_len:
            continue
        percent_buckets = float(bucket_len * 100) / num_items
        samples = samplesizes.get(bucket) if samplesizes else None
        if samples and not (samples == 1 and bucket_len == 1):
            percent_samples = float(bucket_len * 100) / samples
            print("{bucket}: {len}/{num} {pcb:.2f}%, {len}/{samples} {pcs:.2f}%".format(
                bucket=bucket, len=bucket_len, num=num_items, pcb=percent_buckets,
                samples=samples, pcs=percent_samples))
        else:
            print("{bucket}: {len}/{num} {pcb:.2f}%".format(
                bucket=bucket, len=bucket_len, num=num_items, pcb=percent_buckets))
    print()
def print_complete_summary(data, which=tuple(Summaries), samplesizes=None):
    """Print the selected summaries of `data`, always in a fixed order.

    The order is: hours (3-hour buckets), day of month, day of week, week,
    month, year.

    Args:
        data: sequence of candles or time objects; it is iterated once per
            printed summary.
        which: container of Summaries members selecting the sections to
            print. Defaults to all of them (an immutable tuple, so the
            default cannot be altered between calls).
        samplesizes: optional reference counts handed unchanged to
            print_buckets_summary() for every section.
    """
    if Summaries.HOURS in which:
        # The helper also builds 6-hour buckets; they are not reported here.
        buckets_3h, _buckets_6h = get_intraday_summary(data)
        print_buckets_summary(buckets_3h, ID_3hs, samplesizes)

    # (section, classifier, bucket keys in print order)
    interday_sections = (
        (Summaries.DAY, classify_day, DAYS_MTH),
        (Summaries.DAYOFWEEK, classify_dayofweek, list(calendar.day_name)),
        (Summaries.WEEKS, classify_week, WEEKS_YR),
        (Summaries.MONTHS, classify_month, list(calendar.month_name)[1:]),
        (Summaries.YEARS, classify_year, YEARS),
    )
    for section, classify_fun, keys in interday_sections:
        if section in which:
            buckets = get_interday_summary(data, classify_fun)
            print_buckets_summary(buckets, keys, samplesizes)
def get_sample_sizes(refdata):
    """Return a dict mapping every key of `refdata` to the length of its value."""
    return {name: len(members) for name, members in refdata.items()}
if __name__ == '__main__':
    # Script entry point: load the candle file named by the first command-line
    # argument and print every summary for the current year's candles.
    candle_path = sys.argv[1]
    data = filter_thisyr(load_candles(candle_path))
    print_complete_summary(data)