Skip to content

Commit

Permalink
Improve encryption performance (#448)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Jul 31, 2023
1 parent 54d174d commit 20833bc
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions pyhap/hap_crypto.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""This module partially implements crypto for HAP."""
import logging
import struct

from functools import partial
from typing import List
from struct import Struct
from chacha20poly1305_reuseable import ChaCha20Poly1305Reusable as ChaCha20Poly1305
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
Expand All @@ -11,6 +13,9 @@

CRYPTO_BACKEND = default_backend()

PACK_NONCE = partial(Struct("<LQ").pack, 0)
PACK_LENGTH = Struct("H").pack


class HAP_CRYPTO:
HKDF_KEYLEN = 32 # bytes, length of expanded HKDF keys
Expand Down Expand Up @@ -74,51 +79,54 @@ def decrypt(self) -> bytes:
blocks are buffered locally.
"""
result = b""
crypt_in_buffer = self._crypt_in_buffer
length_length = self.LENGTH_LENGTH
tag_length = HAP_CRYPTO.TAG_LENGTH

while len(self._crypt_in_buffer) > self.MIN_BLOCK_LENGTH:
block_length_bytes = self._crypt_in_buffer[: self.LENGTH_LENGTH]
while len(crypt_in_buffer) > self.MIN_BLOCK_LENGTH:
block_length_bytes = crypt_in_buffer[:length_length]
block_size = struct.unpack("H", block_length_bytes)[0]
block_size_with_length = (
self.LENGTH_LENGTH + block_size + HAP_CRYPTO.TAG_LENGTH
)
block_size_with_length = length_length + block_size + tag_length

if len(self._crypt_in_buffer) < block_size_with_length:
if len(crypt_in_buffer) < block_size_with_length:
logger.debug("Incoming buffer does not have the full block")
return result

# Trim off the length
del self._crypt_in_buffer[: self.LENGTH_LENGTH]
del crypt_in_buffer[:length_length]

data_size = block_size + HAP_CRYPTO.TAG_LENGTH
nonce = pad_tls_nonce(struct.pack("Q", self._in_count))
data_size = block_size + tag_length
nonce = PACK_NONCE(self._in_count)

result += self._in_cipher.decrypt(
nonce,
bytes(self._crypt_in_buffer[:data_size]),
bytes(crypt_in_buffer[:data_size]),
bytes(block_length_bytes),
)

self._in_count += 1

# Now trim out the decrypted data
del self._crypt_in_buffer[:data_size]
del crypt_in_buffer[:data_size]

return result

def encrypt(self, data: bytes) -> bytes:
"""Encrypt and send the return bytes."""
result = b""
result: List[bytes] = []
offset = 0
total = len(data)
while offset < total:
length = min(total - offset, self.MAX_BLOCK_LENGTH)
length_bytes = struct.pack("H", length)
length_bytes = PACK_LENGTH(length)
block = bytes(data[offset : offset + length])
nonce = pad_tls_nonce(struct.pack("Q", self._out_count))
ciphertext = length_bytes + self._out_cipher.encrypt(
nonce, block, length_bytes
)
nonce = PACK_NONCE(self._out_count)
result.append(length_bytes)
result.append(self._out_cipher.encrypt(nonce, block, length_bytes))
offset += length
self._out_count += 1
result += ciphertext
return result

# Join the result once instead of concatenating each time
# as this is much faster than generating an new immutable
# byte string each time.
return b"".join(result)

0 comments on commit 20833bc

Please sign in to comment.