# encoding: utf-8
import logging
import time
from datetime import datetime, timedelta

from dbsession import session_maker
from helper import KeyValueStore
from models.TxAddrMapping import TxAddrMapping
from sqlalchemy import text

LIMIT = 1000
PRECONDITION_RETRIES = 2

_logger = logging.getLogger(__name__)

class TxAddrMappingUpdater(object):
    """Fills the tx_id_address_mapping table from transactions_inputs and
    transactions_outputs in batches of up to LIMIT rows per table."""

    def __init__(self):
        self.last_block_time = None
        self.id_counter_inputs = None
        self.id_counter_outputs = None

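    # Restore the last processed row ids from the key-value store so the
    # updater can resume where it left off after a restart.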
    def precondition(self):
        with session_maker() as s:
            self.id_counter_inputs = int(KeyValueStore.get("last_id_counter_inputs") or 0)
            self.id_counter_outputs = int(KeyValueStore.get("last_id_counter_outputs") or 0)

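    # Millisecond timestamp from one minute ago.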
    @staticmethod
    def minimum_timestamp():
        return round((datetime.now() - timedelta(minutes=1)).timestamp() * 1000)

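    # Main worker loop: processes up to LIMIT new rows per table and iteration,
    # persists the processed id ranges as checkpoints, retries transient errors
    # up to three times, and sleeps when both tables are caught up.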
    def loop(self):
        self.precondition()
        error_cnt = 0
        _logger.debug('Start TxAddrMappingUpdater')

        while True:
            # upper bound for this batch: either last id + LIMIT or the maximum id in the DB
            with session_maker() as s:
                max_in = min(self.id_counter_inputs + LIMIT,
                             s.execute(
                                 text(f"""SELECT id FROM transactions_inputs ORDER by id DESC LIMIT 1"""))
                             .scalar() or 0)

                max_out = min(self.id_counter_outputs + LIMIT,
                              s.execute(
                                  text(f"""SELECT id FROM transactions_outputs ORDER by id DESC LIMIT 1"""))
                              .scalar() or 0)

            try:
                count_outputs, new_last_block_time_outputs = self.update_outputs(self.id_counter_outputs,
                                                                                 max_out)
                count_inputs, new_last_block_time_inputs = self.update_inputs(self.id_counter_inputs,
                                                                              max_in)

                # save the ids of the last run in case of a restart
                KeyValueStore.set("last_id_counter_inputs", max_in)
                KeyValueStore.set("last_id_counter_outputs", max_out)
            except Exception:
                error_cnt += 1
                if error_cnt <= 3:
                    time.sleep(10)
                    continue
                raise

            if count_inputs > 0:
                _logger.info(f"Updated {count_inputs} input mappings.")

            if count_outputs > 0:
                _logger.info(f"Updated {count_outputs} output mappings.")

            last_id_counter_inputs = self.id_counter_inputs
            last_id_counter_outputs = self.id_counter_outputs

            # the next start id is the maximum of the last request
            self.id_counter_inputs = max_in
            self.id_counter_outputs = max_out

            # _logger.debug(f"Next TX-Input ID: {self.id_counter_inputs}.")
            # _logger.debug(f"Next TX-Output ID: {self.id_counter_outputs}.")

            # fewer than LIMIT new rows in both tables -> caught up, wait before polling again
            if last_id_counter_inputs + LIMIT > self.id_counter_inputs and \
                    last_id_counter_outputs + LIMIT > self.id_counter_outputs:
                time.sleep(10)

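    # Largest block_time among the next LIMIT transactions at or after
    # start_block_time, or start_block_time itself if there are none.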
    def get_last_block_time(self, start_block_time):
        with session_maker() as s:
            result = s.execute(text(f"""SELECT
                transactions.block_time
                FROM transactions
                WHERE transactions.block_time >= :blocktime
                ORDER by transactions.block_time ASC
                LIMIT {LIMIT}"""), {"blocktime": start_block_time}).all()

        try:
            return result[-1][0]
        except IndexError:
            # empty result: no newer transactions found, keep the previous block time
            return start_block_time

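    # Map each input to the address it spends from by joining it to the output it
    # references (previous_outpoint_hash / previous_outpoint_index), then insert the
    # (transaction_id, address, block_time) triples, ignoring duplicates.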
    def update_inputs(self, min_id: int, max_id: int):
        with session_maker() as s:
            result = s.execute(text(f"""INSERT INTO tx_id_address_mapping (transaction_id, address, block_time)
                SELECT DISTINCT * FROM (
                    SELECT transactions_inputs.transaction_id,
                           transactions_outputs.script_public_key_address,
                           transactions.block_time
                    FROM transactions_inputs
                    LEFT JOIN transactions_outputs ON
                        transactions_outputs.transaction_id = transactions_inputs.previous_outpoint_hash AND
                        transactions_outputs.index = transactions_inputs.previous_outpoint_index
                    LEFT JOIN transactions ON transactions.transaction_id = transactions_inputs.transaction_id
                    WHERE transactions_inputs.id > :minId AND transactions_inputs.id <= :maxId
                        AND transactions_outputs.script_public_key_address IS NOT NULL
                    ORDER by transactions_inputs.id
                ) as distinct_query
                ON CONFLICT DO NOTHING
                RETURNING block_time;"""), {"minId": min_id, "maxId": max_id})

            s.commit()

            try:
                result = result.all()
                return len(result), result[-1][0]
            except IndexError:
                return 0, None

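    # Map each new output to its receiving address and insert the
    # (transaction_id, address, block_time) triples, ignoring duplicates.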
    def update_outputs(self, min_id: int, max_id: int):
        with session_maker() as s:
            result = s.execute(text(f"""
                INSERT INTO tx_id_address_mapping (transaction_id, address, block_time)
                (SELECT sq.*, transactions.block_time FROM (
                    SELECT transaction_id, script_public_key_address
                    FROM transactions_outputs
                    WHERE transactions_outputs.id > :minId and transactions_outputs.id <= :maxId
                    ORDER by transactions_outputs.id DESC) as sq
                JOIN transactions ON transactions.transaction_id = sq.transaction_id)
                ON CONFLICT DO NOTHING
                RETURNING block_time;"""), {"minId": min_id, "maxId": max_id})

            s.commit()

            try:
                result = result.all()
                return len(result), result[-1][0]
            except IndexError:
                return 0, None
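

# Minimal usage sketch (an assumption, not part of the original module): the filler is
# expected to run this updater in its own process or thread by calling loop().
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    TxAddrMappingUpdater().loop()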