This repository has been archived by the owner on Sep 28, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
transformer.go
407 lines (387 loc) · 14.5 KB
/
transformer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package eth
import (
"fmt"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
node "github.com/ipfs/go-ipld-format"
"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
"github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs/ipld"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
"github.com/vulcanize/ipld-eth-indexer/pkg/prom"
"github.com/vulcanize/ipld-eth-indexer/pkg/shared"
)
// Transformer interface to allow substitution of mocks for testing
type Transformer interface {
	// Transform processes a single statediff.Payload on behalf of the given
	// worker, returning the block height that was processed and any error.
	Transform(workerID int, payload statediff.Payload) (uint64, error)
}
// StateDiffTransformer satisfies the Transformer interface for ethereum statediff objects
type StateDiffTransformer struct {
	chainConfig *params.ChainConfig // chain config, used for receipt field derivation and tx signer construction
	indexer     *CIDIndexer         // performs the Postgres indexing of published CIDs
}
// NewStateDiffTransformer creates a pointer to a new StateDiffTransformer, which satisfies the Transformer interface
func NewStateDiffTransformer(chainConfig *params.ChainConfig, db *postgres.DB) *StateDiffTransformer {
	return &StateDiffTransformer{
		chainConfig: chainConfig,
		indexer:     NewCIDIndexer(db),
	}
}
// Transform method is used to process statediff.Payload objects.
// It decodes the block, receipts, and state diff, generates the block IPLDs,
// and persists and indexes everything in Postgres within a single db transaction.
// It returns the processed block height.
//
// Results are named so that the deferred commit/rollback handler can both
// observe the error from any processing step and propagate a commit failure
// back to the caller (with unnamed results, `err = tx.Commit()` inside the
// defer would be silently dropped).
func (sdt *StateDiffTransformer) Transform(workerID int, payload statediff.Payload) (height uint64, err error) {
	start, t := time.Now(), time.Now()
	// Unpack block rlp to access fields
	block := new(types.Block)
	if err = rlp.DecodeBytes(payload.BlockRlp, block); err != nil {
		return 0, fmt.Errorf("error decoding payload block rlp: %s", err.Error())
	}
	blockHash := block.Hash()
	blockHashStr := blockHash.String()
	height = block.NumberU64()
	traceMsg := fmt.Sprintf("worker %d transformer stats for payload at %d with hash %s:\r\n", workerID, height, blockHashStr)
	transactions := block.Transactions()
	// Decode receipts for this block
	receipts := make(types.Receipts, 0)
	if err = rlp.DecodeBytes(payload.ReceiptsRlp, &receipts); err != nil {
		return 0, fmt.Errorf("error decoding payload receipts rlp: %s", err.Error())
	}
	// Decode state diff rlp for this block
	stateDiff := new(statediff.StateObject)
	if err = rlp.DecodeBytes(payload.StateObjectRlp, stateDiff); err != nil {
		return 0, fmt.Errorf("error decoding payload state object rlp: %s", err.Error())
	}
	// Derive any missing fields
	if err = receipts.DeriveFields(sdt.chainConfig, blockHash, height, transactions); err != nil {
		return 0, err
	}
	// Generate the block iplds
	headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(block, receipts)
	if err != nil {
		return 0, err
	}
	// BUGFIX: this previously used `&&`, which only fired when every count
	// mismatched simultaneously; any single mismatch is invalid, so use `||`.
	if len(txNodes) != len(txTrieNodes) || len(rctNodes) != len(rctTrieNodes) || len(txNodes) != len(rctNodes) {
		return 0, fmt.Errorf("expected number of transactions (%d), transaction trie nodes (%d), receipts (%d), and receipt trie nodes (%d) to be equal", len(txNodes), len(txTrieNodes), len(rctNodes), len(rctTrieNodes))
	}
	// Calculate reward
	reward := CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
	tDiff := time.Since(t)
	prom.SetTimeMetric("t_payload_decode", tDiff)
	traceMsg += fmt.Sprintf("payload decoding time: %s\r\n", tDiff.String())
	t = time.Now()
	// Begin new db tx for everything
	tx, err := sdt.indexer.db.Beginx()
	if err != nil {
		return 0, err
	}
	// defer to handle transaction commit or rollback for any return case
	defer func() {
		if p := recover(); p != nil {
			shared.Rollback(tx)
			panic(p)
		} else if err != nil {
			shared.Rollback(tx)
		} else {
			// a commit failure is surfaced to the caller via the named result
			err = tx.Commit()
			tDiff := time.Since(t)
			prom.SetTimeMetric("t_postgres_commit", tDiff)
			traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
		}
		traceMsg += fmt.Sprintf(" TOTAL PROCESSING TIME: %s\r\n", time.Since(start).String())
		logrus.Trace(traceMsg)
	}()
	tDiff = time.Since(t)
	prom.SetTimeMetric("t_free_postgres", tDiff)
	traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
	t = time.Now()
	// Publish and index header, collect headerID
	headerID, err := sdt.processHeader(tx, block.Header(), headerNode, reward, payload.TotalDifficulty)
	if err != nil {
		return 0, err
	}
	tDiff = time.Since(t)
	prom.SetTimeMetric("t_header_processing", tDiff)
	traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
	t = time.Now()
	// Publish and index uncles
	// BUGFIX: these steps previously used `if err := ...`, shadowing the outer
	// err — the deferred handler then saw nil and COMMITTED the tx even though
	// processing had failed. Assign to the outer err so the rollback fires.
	if err = sdt.processUncles(tx, headerID, height, uncleNodes); err != nil {
		return 0, err
	}
	tDiff = time.Since(t)
	prom.SetTimeMetric("t_uncle_processing", tDiff)
	traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
	t = time.Now()
	// Publish and index receipts and txs
	if err = sdt.processReceiptsAndTxs(tx, processArgs{
		headerID:     headerID,
		blockNumber:  block.Number(),
		receipts:     receipts,
		txs:          transactions,
		rctNodes:     rctNodes,
		rctTrieNodes: rctTrieNodes,
		txNodes:      txNodes,
		txTrieNodes:  txTrieNodes,
	}); err != nil {
		return 0, err
	}
	tDiff = time.Since(t)
	prom.SetTimeMetric("t_tx_receipt_processing", tDiff)
	traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
	t = time.Now()
	// Publish and index state and storage nodes
	if err = sdt.processStateAndStorage(tx, headerID, stateDiff); err != nil {
		return 0, err
	}
	tDiff = time.Since(t)
	prom.SetTimeMetric("t_state_store_processing", tDiff)
	traceMsg += fmt.Sprintf("state and storage processing time: %s\r\n", tDiff.String())
	t = time.Now()
	// Publish and index code/codehash pairs
	if err = sdt.processCodeAndCodeHashes(tx, stateDiff.CodeAndCodeHashes); err != nil {
		return 0, err
	}
	tDiff = time.Since(t)
	prom.SetTimeMetric("t_code_codehash_processing", tDiff)
	traceMsg += fmt.Sprintf("code and codehash processing time: %s\r\n", tDiff.String())
	t = time.Now()
	return height, err // named results: the defer may still overwrite err with a commit failure
}
// processHeader publishes the header IPLD and indexes its CID in Postgres.
// It returns the row id (headerID) of the indexed header so related records
// (uncles, txs, state nodes) can reference it by foreign key.
func (sdt *StateDiffTransformer) processHeader(tx *sqlx.Tx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) {
	// store the raw header IPLD block first
	if err := shared.PublishIPLD(tx, headerNode); err != nil {
		return 0, err
	}
	headerCID := headerNode.Cid()
	model := HeaderModel{
		CID:             headerCID.String(),
		MhKey:           shared.MultihashKeyFromCID(headerCID),
		ParentHash:      header.ParentHash.String(),
		BlockNumber:     header.Number.String(),
		BlockHash:       header.Hash().String(),
		TotalDifficulty: td.String(),
		Reward:          reward.String(),
		Bloom:           header.Bloom.Bytes(),
		StateRoot:       header.Root.String(),
		RctRoot:         header.ReceiptHash.String(),
		TxRoot:          header.TxHash.String(),
		UncleRoot:       header.UncleHash.String(),
		Timestamp:       header.Time,
	}
	// index the header metadata and hand back its row id
	return sdt.indexer.indexHeaderCID(tx, model)
}
// processUncles publishes each uncle header IPLD and indexes its CID in
// Postgres, linking each uncle row to the containing header via headerID.
func (sdt *StateDiffTransformer) processUncles(tx *sqlx.Tx, headerID int64, blockNumber uint64, uncleNodes []*ipld.EthHeader) error {
	for _, u := range uncleNodes {
		// store the raw uncle IPLD block
		if err := shared.PublishIPLD(tx, u); err != nil {
			return err
		}
		// the uncle miner reward depends on both the block and uncle heights
		reward := CalcUncleMinerReward(blockNumber, u.Number.Uint64())
		model := UncleModel{
			CID:        u.Cid().String(),
			MhKey:      shared.MultihashKeyFromCID(u.Cid()),
			ParentHash: u.ParentHash.String(),
			BlockHash:  u.Hash().String(),
			Reward:     reward.String(),
		}
		if err := sdt.indexer.indexUncleCID(tx, model, headerID); err != nil {
			return err
		}
	}
	return nil
}
// processArgs bundles arguments to processReceiptsAndTxs
type processArgs struct {
	headerID     int64              // FK of the header row these txs/receipts belong to
	blockNumber  *big.Int           // block number, needed to construct the tx signer
	receipts     types.Receipts     // receipts for the block
	txs          types.Transactions // transactions for the block, parallel to receipts
	rctNodes     []*ipld.EthReceipt // receipt IPLD nodes, parallel to receipts
	rctTrieNodes []*ipld.EthRctTrie // receipt trie IPLD nodes, parallel to rctNodes
	txNodes      []*ipld.EthTx      // transaction IPLD nodes, parallel to txs
	txTrieNodes  []*ipld.EthTxTrie  // transaction trie IPLD nodes, parallel to txNodes
}
// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres.
// Transactions and receipts are processed pairwise; each tx is indexed before its
// receipt so the receipt row can reference the tx row by foreign key.
func (sdt *StateDiffTransformer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs) error {
	// signer used to recover each transaction's sender address
	signer := types.MakeSigner(sdt.chainConfig, args.blockNumber)
	for idx, rct := range args.receipts {
		trx := args.txs[idx] // tx that corresponds with this receipt
		from, err := types.Sender(signer, trx)
		if err != nil {
			return err
		}
		// publish the trie nodes first; these aren't indexed directly
		if err := shared.PublishIPLD(tx, args.txTrieNodes[idx]); err != nil {
			return err
		}
		if err := shared.PublishIPLD(tx, args.rctTrieNodes[idx]); err != nil {
			return err
		}
		// then publish the tx and receipt IPLDs themselves
		txNode, rctNode := args.txNodes[idx], args.rctNodes[idx]
		if err := shared.PublishIPLD(tx, txNode); err != nil {
			return err
		}
		if err := shared.PublishIPLD(tx, rctNode); err != nil {
			return err
		}
		// collect log topics by position and the set of contract addresses
		// seen in the logs (a map is used to deduplicate addresses)
		topicSets := make([][]string, 4)
		seenContracts := make(map[string]bool)
		for _, l := range rct.Logs {
			for pos, topic := range l.Topics {
				topicSets[pos] = append(topicSets[pos], topic.Hex())
			}
			seenContracts[l.Address.String()] = true
		}
		logContracts := make([]string, 0, len(seenContracts))
		for addr := range seenContracts {
			logContracts = append(logContracts, addr)
		}
		// contract is non-empty only when this receipt is for a contract creation tx
		contract := shared.HandleZeroAddr(rct.ContractAddress)
		var contractHash string
		if contract != "" {
			contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
		}
		// index tx first so that the receipt can reference it by FK
		txModel := TxModel{
			Dst:    shared.HandleZeroAddrPointer(trx.To()),
			Src:    shared.HandleZeroAddr(from),
			TxHash: trx.Hash().String(),
			Index:  int64(idx),
			Data:   trx.Data(),
			CID:    txNode.Cid().String(),
			MhKey:  shared.MultihashKeyFromCID(txNode.Cid()),
		}
		txID, err := sdt.indexer.indexTransactionCID(tx, txModel, args.headerID)
		if err != nil {
			return err
		}
		// index the receipt
		rctModel := ReceiptModel{
			Topic0s:      topicSets[0],
			Topic1s:      topicSets[1],
			Topic2s:      topicSets[2],
			Topic3s:      topicSets[3],
			Contract:     contract,
			ContractHash: contractHash,
			LogContracts: logContracts,
			CID:          rctNode.Cid().String(),
			MhKey:        shared.MultihashKeyFromCID(rctNode.Cid()),
		}
		// a receipt carries either a status code or a post-state root
		if len(rct.PostState) == 0 {
			rctModel.PostStatus = rct.Status
		} else {
			rctModel.PostState = common.Bytes2Hex(rct.PostState)
		}
		if err := sdt.indexer.indexReceiptCID(tx, rctModel, txID); err != nil {
			return err
		}
	}
	return nil
}
// processStateAndStorage publishes and indexes state and storage trie nodes in Postgres.
// For leaf state nodes it additionally decodes and indexes the account data, and any
// storage nodes attached to a state node are indexed against that state node's row id.
func (sdt *StateDiffTransformer) processStateAndStorage(tx *sqlx.Tx, headerID int64, stateDiff *statediff.StateObject) error {
	for _, stateNode := range stateDiff.Nodes {
		// publish the state node
		stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
		if err != nil {
			return err
		}
		// BUGFIX: this error was previously discarded with `_`; surface it so a
		// row is never indexed with an empty multihash key
		mhKey, err := shared.MultihashKeyFromCIDString(stateCIDStr)
		if err != nil {
			return err
		}
		stateModel := StateNodeModel{
			Path:     stateNode.Path,
			StateKey: common.BytesToHash(stateNode.LeafKey).String(),
			CID:      stateCIDStr,
			MhKey:    mhKey,
			NodeType: ResolveFromNodeType(stateNode.NodeType),
		}
		// index the state node, collect the stateID to reference by FK
		stateID, err := sdt.indexer.indexStateCID(tx, stateModel, headerID)
		if err != nil {
			return err
		}
		// if we have a leaf, decode and index the account data
		if stateNode.NodeType == sdtypes.Leaf {
			var i []interface{}
			if err := rlp.DecodeBytes(stateNode.NodeValue, &i); err != nil {
				return fmt.Errorf("error decoding state leaf node rlp: %s", err.Error())
			}
			if len(i) != 2 {
				return fmt.Errorf("eth IPLDPublisher expected state leaf node rlp to decode into two elements")
			}
			// BUGFIX: use a comma-ok assertion so a malformed leaf value returns
			// an error instead of panicking the worker
			accountRLP, ok := i[1].([]byte)
			if !ok {
				return fmt.Errorf("eth IPLDPublisher expected state leaf node rlp value to be a byte slice, got %T", i[1])
			}
			var account state.Account
			if err := rlp.DecodeBytes(accountRLP, &account); err != nil {
				return fmt.Errorf("error decoding state account rlp: %s", err.Error())
			}
			accountModel := StateAccountModel{
				Balance:     account.Balance.String(),
				Nonce:       account.Nonce,
				CodeHash:    account.CodeHash,
				StorageRoot: account.Root.String(),
			}
			if err := sdt.indexer.indexStateAccount(tx, accountModel, stateID); err != nil {
				return err
			}
		}
		// if there are any storage nodes associated with this node, publish and index them
		for _, storageNode := range stateNode.StorageNodes {
			storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
			if err != nil {
				return err
			}
			// BUGFIX: likewise surface the previously-ignored error here
			storageMhKey, err := shared.MultihashKeyFromCIDString(storageCIDStr)
			if err != nil {
				return err
			}
			storageModel := StorageNodeModel{
				Path:       storageNode.Path,
				StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
				CID:        storageCIDStr,
				MhKey:      storageMhKey,
				NodeType:   ResolveFromNodeType(storageNode.NodeType),
			}
			if err := sdt.indexer.indexStorageCID(tx, storageModel, stateID); err != nil {
				return err
			}
		}
	}
	return nil
}
// processCodeAndCodeHashes publishes each contract code/codehash pair directly
// to the ipld database, keyed by a multihash derived from the code hash.
func (sdt *StateDiffTransformer) processCodeAndCodeHashes(tx *sqlx.Tx, codeAndCodeHashes []sdtypes.CodeAndCodeHash) error {
	for _, pair := range codeAndCodeHashes {
		// codec doesn't matter since db key is multihash-based
		key, err := shared.MultihashKeyFromKeccak256(pair.Hash)
		if err != nil {
			return err
		}
		if err = shared.PublishDirect(tx, key, pair.Code); err != nil {
			return err
		}
	}
	return nil
}