-
Notifications
You must be signed in to change notification settings - Fork 2
/
invalidblockrequest.py
executable file
·143 lines (117 loc) · 4.97 KB
/
invalidblockrequest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python3
# Copyright (c) 2015-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
from test_framework.test_framework import ComparisonTestFramework
from test_framework.util import *
from test_framework.comptool import TestManager, TestInstance, RejectResult
from test_framework.blocktools import *
import copy
import time
'''
In this test we connect to one node over p2p, and test block requests:
1) Valid blocks should be requested and become chain tip.
2) Invalid block with duplicated transaction should be re-requested.
3) Invalid block with bad coinbase value should be rejected and not
re-requested.
'''
# Use the ComparisonTestFramework with 1 node: only use --testbinary.
class InvalidBlockRequestTest(ComparisonTestFramework):
    '''Test block requests against one node connected over p2p.

    Can either run this test as 1 node with expected answers, or two and
    compare them. Change the "outcome" variable from each TestInstance
    object to only do the comparison.
    '''

    def set_test_params(self):
        '''Use a single node that starts from a clean chain.'''
        self.num_nodes = 1
        self.setup_clean_chain = True

    def run_test(self):
        '''Connect a TestManager to the node(s) and run it.'''
        test = TestManager(self, self.options.tmpdir)
        test.add_all_connections(self.nodes)
        # get_tests() lazily initialises the tip from the node when it is None.
        self.tip = None
        self.block_time = None
        # NOTE(review): self.nodes already exists at this point, so this
        # assignment presumably has no effect on the running node - confirm
        # whether it belongs in set_test_params() instead.
        self.extra_args = [["--whitelist=127.0.0.1"]]
        NetworkThread().start()  # Start up network handling in another thread
        test.run()

    def _next_block(self, height):
        '''Create (but do not solve) a block on self.tip and advance block_time.'''
        block = create_block(
            self.tip, create_coinbase(height), self.block_time)
        self.block_time += 1
        return block

    @staticmethod
    def _order_transactions(block):
        '''Keep the coinbase first and sort the other transactions by get_id().'''
        block.vtx = [block.vtx[0]] + \
            sorted(block.vtx[1:], key=lambda tx: tx.get_id())

    def get_tests(self):
        '''Yield the TestInstance objects making up the test, in order.

        1. A valid block, expected to be accepted.
        2. 100 further valid blocks so the first coinbase can be spent.
        3. A block with a duplicated transaction (same header as the valid
           block), expected to be rejected with 'bad-txns-duplicate',
           followed by the unmutated block, expected to be accepted.
        4. A block containing a transaction with a duplicated input,
           expected to be rejected with 'bad-txns-inputs-duplicate'.
        5. A block whose coinbase pays too much, expected to be rejected
           with 'bad-cb-amount'.
        '''
        if self.tip is None:
            self.tip = int(self.nodes[0].getbestblockhash(), 16)
        self.block_time = int(time.time()) + 1

        # Create a new block with an anyone-can-spend coinbase.
        height = 1
        block = self._next_block(height)
        block.solve()
        self.tip = block.sha256
        height += 1
        yield TestInstance([[block, True]])
        # Save the coinbase for later.
        block1 = block

        # Now we need that block to mature so we can spend the coinbase.
        test = TestInstance(sync_every_block=False)
        for _ in range(100):
            block = self._next_block(height)
            block.solve()
            self.tip = block.sha256
            test.blocks_and_transactions.append([block, True])
            height += 1
        yield test
        # The last block sent must now be the node's best block.
        assert_equal(block.sha256, int(self.nodes[0].getbestblockhash(), 16))

        # Now we use merkle-root malleability to generate an invalid block
        # with the same blockheader.
        # Manufacture a block with 3 transactions (coinbase, spend of prior
        # coinbase, spend of that spend). Duplicate the 3rd transaction to
        # leave merkle root and blockheader unchanged but invalidate the
        # block.
        block2 = self._next_block(height)

        # b'\x51' is OP_TRUE
        tx1 = create_transaction(block1.vtx[0], 0, b'', 50 * COIN)
        tx2 = create_transaction(tx1, 0, b'\x51', 50 * COIN)
        block2.vtx.extend([tx1, tx2])
        self._order_transactions(block2)
        block2.hashMerkleRoot = block2.calc_merkle_root()
        block2.rehash()
        block2.solve()
        orig_hash = block2.sha256
        block2_orig = copy.deepcopy(block2)

        # Mutate block 2: the appended duplicate must change neither the
        # merkle root nor the block hash, only the transaction list.
        block2.vtx.append(block2.vtx[2])
        assert_equal(block2.hashMerkleRoot, block2.calc_merkle_root())
        assert_equal(orig_hash, block2.rehash())
        assert block2_orig.vtx != block2.vtx

        self.tip = block2.sha256
        yield TestInstance([[block2, RejectResult(16, b'bad-txns-duplicate')]])
        yield TestInstance([[block2_orig, True]])
        height += 1

        # Check transactions for duplicate inputs
        self.log.info("Test duplicate input block.")

        block2_orig.vtx[2].vin.append(block2_orig.vtx[2].vin[0])
        # Rehash the modified transaction first, then re-order the block
        # that is actually sent (block2_orig, not the discarded block2), so
        # the ordering reflects the transaction's new id.
        block2_orig.vtx[2].rehash()
        self._order_transactions(block2_orig)
        block2_orig.hashMerkleRoot = block2_orig.calc_merkle_root()
        block2_orig.rehash()
        block2_orig.solve()
        yield TestInstance([[block2_orig, RejectResult(16, b'bad-txns-inputs-duplicate')]])

        # Make sure that a totally screwed up block is not valid.
        block3 = self._next_block(height)
        block3.vtx[0].vout[0].nValue = 51 * COIN  # Too high!
        # Clear the cached hash so calc_sha256() recomputes it for the
        # modified coinbase.
        block3.vtx[0].sha256 = None
        block3.vtx[0].calc_sha256()
        block3.hashMerkleRoot = block3.calc_merkle_root()
        block3.rehash()
        block3.solve()

        yield TestInstance([[block3, RejectResult(16, b'bad-cb-amount')]])
if __name__ == '__main__':
    # Only run the test when this file is executed directly as a script.
    invalid_block_request_test = InvalidBlockRequestTest()
    invalid_block_request_test.main()