-
Notifications
You must be signed in to change notification settings - Fork 0
/
blocky.py
196 lines (159 loc) · 7.14 KB
/
blocky.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import os
from Crypto import Random
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
import six
import datetime
from operator import itemgetter
from pytz import timezone
import json
import base64
# Filesystem layout: everything is kept below ~/.blocky.
home_directory = os.path.expanduser('~')
base_directory = os.path.join(home_directory, '.blocky')

# Sub-directories for block files, foreign public keys and the local key pair.
chain_directory, foreignPublicKeysDirectory, selfkeysDirectory = (
    os.path.join(base_directory, sub_dir)
    for sub_dir in ('chains', 'fpkeys', 'skeys')
)

# Files holding the local RSA key pair.
privatKeyPath, publicKeyPath = (
    os.path.join(selfkeysDirectory, key_file)
    for key_file in ('private.key', 'public.key')
)
def setup():
    """Create the blocky directory tree.

    Every directory is printed as it is checked; directories that already
    exist are left alone.
    """
    required = (
        base_directory,
        chain_directory,
        foreignPublicKeysDirectory,
        selfkeysDirectory,
    )
    for path in required:
        print('dir: {}'.format(path))
        if os.path.isdir(path):
            continue
        os.makedirs(path)
def create_self_keys(bits=2048):
    """Generate a new RSA key pair and store it in the self-keys directory.

    WARNING: any key pair already stored at ``privatKeyPath`` /
    ``publicKeyPath`` is overwritten.

    Args:
        bits: size of the RSA modulus in bits. Defaults to 2048; the
            previously hard-coded 1024 bits is below the size recommended
            for new keys.
    """
    key = RSA.generate(bits, Random.new().read)
    private, public = key.exportKey(), key.publickey().exportKey()
    # exportKey() returns bytes, hence the binary write mode.
    with open(privatKeyPath, 'wb') as private_file:
        private_file.write(private)
    with open(publicKeyPath, 'wb') as public_file:
        public_file.write(public)
def to_format_for_encrypt(value):
    """Convert *value* into the byte string handed to the RSA cipher.

    Args:
        value: bytes, text or an integer.

    Returns:
        ``value`` unchanged if it is already bytes, its UTF-8 encoding if it
        is text, or the UTF-8 encoding of its decimal representation if it is
        an integer (``5`` -> ``b'5'``).

    Raises:
        TypeError: if *value* is none of the supported types.
    """
    if isinstance(value, bytes):
        return value
    if isinstance(value, int):
        # bytes(5) on Python 3 is five NUL bytes, not b'5'; go through str().
        return str(value).encode('utf8')
    text_type = type(u'')  # the text type of the running interpreter
    if isinstance(value, text_type):
        return value.encode('utf8')
    raise TypeError(
        'cannot convert {} to bytes for encryption'.format(type(value).__name__)
    )
def encrypt(message, recipient=None, auto=False):
    """Encrypt *message* with an RSA public key using PKCS#1 OAEP.

    Args:
        message: value to encrypt; converted with ``to_format_for_encrypt``.
        recipient: name of the key file (without ``.key``) inside the foreign
            public keys directory. Ignored when *auto* is true.
        auto: when true, encrypt with the local public key instead.

    Returns:
        The ciphertext produced by the OAEP cipher.
    """
    if auto:
        key_path = publicKeyPath
    else:
        key_path = os.path.join(foreignPublicKeysDirectory, '{}.key'.format(recipient))
    with open(key_path) as key_file:
        pem_text = key_file.read()
    oaep = PKCS1_OAEP.new(RSA.importKey(pem_text))
    payload = to_format_for_encrypt(message)
    return oaep.encrypt(payload)
def decrypt(message):
    """Decrypt *message* with the local private key and return it as text.

    The ciphertext is decrypted with PKCS#1 OAEP and the resulting bytes are
    decoded as UTF-8.
    """
    # Load the private key from disk on every call.
    with open(privatKeyPath) as key_file:
        pem_text = key_file.read()
    oaep = PKCS1_OAEP.new(RSA.importKey(pem_text))
    plaintext = oaep.decrypt(message)
    return six.text_type(plaintext, encoding='utf8')
def sign(text):
    """Sign the SHA-256 digest of *text* with the local private key.

    Args:
        text: text to sign; it is UTF-8 encoded before hashing.

    Returns:
        Whatever the key object's ``sign`` method returns for the digest.
    """
    with open(privatKeyPath) as key_file:
        pem_text = key_file.read()
    signing_key = RSA.importKey(pem_text)
    digest = SHA256.new(text.encode('utf8')).digest()
    return signing_key.sign(digest, '')
def verify_signature(text, signature, author):
    """Check *signature* against the SHA-256 digest of *text*.

    Args:
        text: the signed text; it is UTF-8 encoded before hashing.
        signature: signature to check, as produced by ``sign``.
        author: name of the key file (without ``.key``) inside the foreign
            public keys directory.

    Returns:
        The result of the public key's ``verify`` call.
    """
    payload = text.encode('utf8')
    key_file_name = '{}.key'.format(author)
    with open(os.path.join(foreignPublicKeysDirectory, key_file_name)) as key_file:
        pem_text = key_file.read()
    author_key = RSA.importKey(pem_text)
    digest = SHA256.new(payload).digest()
    return author_key.verify(digest, signature)
def build_block(previous_block_hash, message, recipients, difficulty=0):
    """Build a signed block that chains onto *previous_block_hash*.

    The block hash is the SHA-256 of the concatenation, in this order, of:
    previous hash, timestamp, message hash, first element of the signature,
    difficulty, author id, every ``recipient + ciphertext`` pair (sorted by
    recipient) and, when ``difficulty > 0``, the nonce.

    Args:
        previous_block_hash: hash of the block this one follows.
        message: text to store; it is encrypted once for the author and once
            for every recipient.
        recipients: names of key files (without ``.key``) in the foreign
            public keys directory. The list is not modified.
        difficulty: number of leading ``'0'`` characters required of the hex
            block hash; ``0`` disables the nonce search.

    Returns:
        dict with the keys ``previous_block_hash``, ``timestamp``,
        ``author``, ``message_hash``, ``encryptions``, ``difficulty`` (as a
        string), ``nonce`` (``''`` when ``difficulty`` is 0), ``block_hash``
        and ``signature``.
    """
    # Current UTC time, rendered with second resolution and the zone name.
    timestamp = datetime.datetime.now(timezone('UTC')).strftime('%Y-%m-%d %H:%M:%S %Z')

    message_hash = SHA256.new(message.encode('utf8')).hexdigest()
    message_signature = sign(message_hash)

    # The author is identified by the SHA-256 of the local public key file.
    with open(publicKeyPath) as f:
        key_text = f.read()
    author = SHA256.new(key_text.encode('utf8')).hexdigest()

    parts = [
        str(previous_block_hash),
        timestamp,
        str(message_hash),
        str(message_signature[0]),
        str(difficulty),
        str(author),
    ]

    # NOTE(review): str() of the ciphertext stores its textual repr (e.g.
    # "b'...'" if encrypt() returns bytes on Python 3). It is kept because the
    # block hash, and verify_block_content(), are computed over exactly this
    # text; switching to base64 would change the block format.
    auto_encryption = str(encrypt(message, auto=True))

    # Work on a sorted copy so the caller's list is not mutated.
    all_recipients = sorted(list(recipients) + [author])
    message_encryptions = []
    for recipient in all_recipients:
        if recipient == author:
            encrypted_message = auto_encryption
        else:
            encrypted_message = str(encrypt(message, recipient))
        recipient = str(recipient)
        message_encryptions.append({'recipient': recipient, 'encrypted': encrypted_message})
        parts.append(recipient + encrypted_message)

    blockstring = ''.join(parts)

    if difficulty > 0:
        # Every candidate hash includes the nonce. Accepting the nonce-less
        # hash (when it happened to meet the target) produced a block whose
        # hash could not be reproduced by verify_block_content(), which
        # always appends str(nonce).
        target_prefix = '0' * difficulty
        nonce = 0
        while True:
            candidate = blockstring + str(nonce)
            block_hash = _sha256_hex(candidate)
            print('blockstring: {}'.format(candidate))
            if block_hash.startswith(target_prefix):
                break
            nonce += 1
    else:
        nonce = ''
        block_hash = _sha256_hex(blockstring)

    return {'previous_block_hash': previous_block_hash, 'timestamp': timestamp, 'author': author,
            'message_hash': message_hash, 'encryptions': message_encryptions, 'difficulty': str(difficulty),
            'nonce': nonce, 'block_hash': block_hash, 'signature': message_signature}


def _sha256_hex(text):
    """Return the hex SHA-256 of *text* after stripping outer whitespace."""
    return str(SHA256.new(text.strip().encode('utf8')).hexdigest())
def verify_block_content(block):
    """Run all consistency checks on *block* and report the combined result.

    Four checks are made, each printing a warning when it fails: the message
    signature, the difficulty prefix of the stored hash, the recomputed block
    hash, and that the timestamp is earlier than the current UTC time.

    Args:
        block: block dictionary as produced by ``build_block``.

    Returns:
        The four check results combined with ``&``.
    """
    # --- signature ---------------------------------------------------------
    signature_ok = verify_signature(block['message_hash'], block['signature'], block['author'])
    if not signature_ok:
        print('WARNING: block signature not correct')

    # --- difficulty --------------------------------------------------------
    required_zeros = int(block['difficulty'])
    difficulty_ok = required_zeros <= 0 or block['block_hash'].startswith('0' * required_zeros)
    if not difficulty_ok:
        print('WARNING: block hash does not respect block difficulty')

    # --- block hash --------------------------------------------------------
    # Rebuild the hashed string in the same field order used to build a block.
    pieces = [
        str(block['previous_block_hash']),
        str(block['timestamp']),
        str(block['message_hash']),
        str(block['signature'][0]),
        str(block['difficulty']),
        str(block['author']),
    ]
    for entry in sorted(block['encryptions'], key=itemgetter('recipient')):
        pieces.append(str(entry['recipient']) + str(entry['encrypted']))
    pieces.append(str(block['nonce']))
    blockstring = ''.join(pieces)
    print('blockstring: {}'.format(blockstring))

    computed_hash = str(SHA256.new(blockstring.strip().encode('utf8')).hexdigest())
    print('block_hash computed: {}\nblock_hash gotten: {}'.format(computed_hash, block['block_hash']))
    block_hash_ok = computed_hash == block['block_hash']
    if not block_hash_ok:
        print('WARNING: block_hash is not valid for this block')

    # --- timestamp must not lie in the future ------------------------------
    claimed_time = datetime.datetime.strptime(block['timestamp'], '%Y-%m-%d %H:%M:%S %Z')
    current_utc = datetime.datetime.utcfromtimestamp(datetime.datetime.now().timestamp())
    timestamp_ok = claimed_time < current_utc
    if not timestamp_ok:
        print('WARNING: Block timestamp lies in the future')

    return signature_ok & difficulty_ok & block_hash_ok & timestamp_ok
def write_block(block):
    """Store *block* as JSON in the chain directory.

    The file is named ``<block_hash>.json`` after the block's own hash.
    """
    target = os.path.join(chain_directory, '{}.json'.format(block['block_hash']))
    with open(target, 'w') as out:
        json.dump(block, out)
def read_block(name):
    """Load the block stored as ``<name>.json`` in the chain directory.

    Args:
        name: file name without the ``.json`` suffix; ``write_block`` uses
            the block hash.

    Returns:
        The decoded JSON content of the file.
    """
    filename = os.path.join(chain_directory, '{}.json'.format(name))
    # Use a context manager so the file handle is closed deterministically.
    with open(filename) as f:
        return json.load(f)