Source code for eventsourcing.utils.cipher.aes

from Crypto.Cipher import AES

from eventsourcing.exceptions import DataIntegrityError
from eventsourcing.utils.random import random_bytes


[docs]class AESCipher(object): """ Cipher strategy that uses Crypto library AES cipher in GCM mode. """
[docs] def __init__(self, cipher_key: bytes): """ Initialises AES cipher strategy with ``cipher_key``. :param cipher_key: 16, 24, or 32 random bytes """ assert len(cipher_key) in [16, 24, 32] self.cipher_key = cipher_key
[docs] def encrypt(self, plaintext: bytes) -> bytes: """Return ciphertext for given plaintext.""" # Construct AES-GCM cipher, with 96-bit nonce. cipher = AES.new(self.cipher_key, AES.MODE_GCM, nonce=random_bytes(12)) # Encrypt and digest. encrypted, tag = cipher.encrypt_and_digest(plaintext) # type: ignore # Combine with nonce. ciphertext = cipher.nonce + tag + encrypted # type: ignore # Return ciphertext. return ciphertext
[docs] def decrypt(self, ciphertext: bytes) -> bytes: """Return plaintext for given ciphertext.""" # Split out the nonce, tag, and encrypted data. nonce = ciphertext[:12] if len(nonce) != 12: raise DataIntegrityError("Cipher text is damaged: invalid nonce length") tag = ciphertext[12:28] if len(tag) != 16: raise DataIntegrityError("Cipher text is damaged: invalid tag length") encrypted = ciphertext[28:] # Construct AES cipher, with old nonce. cipher = AES.new(self.cipher_key, AES.MODE_GCM, nonce) # Decrypt and verify. try: plaintext = cipher.decrypt_and_verify(encrypted, tag) # type: ignore except ValueError as e: raise DataIntegrityError("Cipher text is damaged: {}".format(e)) return plaintext