Crypt



In [ ]:
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes

from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import padding

from cryptography.hazmat.primitives.ciphers.aead import AESGCM as aes

from cryptography.hazmat.backends import default_backend

from cryptography.exceptions import InvalidSignature, InvalidTag

from os import urandom

from zlib import compress, decompress

RSA

Key generation


In [ ]:
def GenRSAKeyPair(key_size=3072):
    """Generate a fresh RSA private key.

    The matching public key can be obtained from the result with
    ``private_key.public_key()``.

    Args:
        key_size: Modulus length in bits. The default is 3072. A 512-bit
            modulus (the former default) is far too small to be secure, and
            it cannot even be used with the SHA-512 based OAEP padding used
            elsewhere in this file: OAEP needs 2 * 64 + 2 = 130 bytes of
            modulus before a single message byte fits.

    Returns:
        The private key object returned by ``rsa.generate_private_key``.
    """
    return rsa.generate_private_key(public_exponent=65537,
                                    key_size=key_size,
                                    backend=default_backend())

Saving and Loading Keys (PEM files)


In [ ]:
def WriteRSAKeys(private_key, password,
                 private_key_filename="private_key",
                 public_key_filename="public_key"):
    """Store an RSA key pair in two PEM files.

    The private key is serialized as PKCS8 and encrypted with ``password``
    (encoded as UTF-8). The public key is derived from ``private_key`` and
    written as SubjectPublicKeyInfo. ".pem" is appended to any filename that
    does not already end with it.

    Args:
        private_key: RSA private key object to serialize.
        password: Text password protecting the private key file.
        private_key_filename: Target path for the private key.
        public_key_filename: Target path for the public key.
    """
    def _pem_name(name):
        # Leave names that already carry the extension untouched.
        return name if name.endswith(".pem") else name + ".pem"

    passphrase = bytes(password, encoding="utf-8")
    protection = serialization.BestAvailableEncryption(passphrase)
    private_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=protection)
    with open(_pem_name(private_key_filename), "wb") as private_out:
        private_out.write(private_pem)

    public_pem = private_key.public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo)
    with open(_pem_name(public_key_filename), "wb") as public_out:
        public_out.write(public_pem)

def ReadRSAPrivateKey(password,
                      private_key_filename="private_key"):
    """Load a password-protected RSA private key from a PEM file.

    Args:
        password: Text password; it is encoded as UTF-8 before use.
        private_key_filename: Path of the key file. ".pem" is appended when
            the name does not already end with it.

    Returns:
        The key object produced by ``serialization.load_pem_private_key``.
    """
    passphrase = bytes(password, encoding="utf-8")
    path = private_key_filename
    if not path.endswith(".pem"):
        path += ".pem"
    with open(path, "rb") as pem_file:
        pem_data = pem_file.read()
    return serialization.load_pem_private_key(
        data=pem_data,
        password=passphrase,
        backend=default_backend())

def ReadRSAPublicKey(public_key_filename="public_key"):
    """Load an RSA public key from a PEM file.

    Args:
        public_key_filename: Path of the key file. ".pem" is appended when
            the name does not already end with it.

    Returns:
        The key object produced by ``serialization.load_pem_public_key``.
    """
    path = public_key_filename
    if not path.endswith(".pem"):
        path += ".pem"
    with open(path, "rb") as pem_file:
        pem_data = pem_file.read()
    return serialization.load_pem_public_key(
        data=pem_data,
        backend=default_backend())

Signing and Verification


In [ ]:
def SignUsingRSA(message, private_key):
    """Sign ``message`` with RSA-PSS over SHA-512.

    The PSS padding uses MGF1 with SHA-512 and the maximum salt length.

    Args:
        message: Bytes to sign.
        private_key: RSA private key used for signing.

    Returns:
        The signature as returned by ``private_key.sign``.
    """
    pss = padding.PSS(
        mgf=padding.MGF1(hashes.SHA512()),
        salt_length=padding.PSS.MAX_LENGTH)
    return private_key.sign(data=message,
                            padding=pss,
                            algorithm=hashes.SHA512())

def VerifyUsingRSA(signature, message, public_key):
    """Check an RSA-PSS/SHA-512 signature.

    Uses the same padding parameters as ``SignUsingRSA`` (MGF1 with SHA-512,
    maximum salt length).

    Args:
        signature: Signature bytes to check.
        message: Bytes the signature is expected to cover.
        public_key: RSA public key of the signer.

    Returns:
        True when ``public_key.verify`` accepts the signature, False when it
        raises ``InvalidSignature``.
    """
    pss = padding.PSS(
        mgf=padding.MGF1(hashes.SHA512()),
        salt_length=padding.PSS.MAX_LENGTH)
    try:
        public_key.verify(signature=signature,
                          data=message,
                          padding=pss,
                          algorithm=hashes.SHA512())
    except InvalidSignature:
        return False
    return True

Encryption and Decryption


In [ ]:
def EncryptUsingRSA(message, public_key):
    """Encrypt ``message`` with RSA-OAEP (SHA-512, MGF1 with SHA-512, no label).

    Args:
        message: Plaintext bytes.
        public_key: RSA public key of the recipient.

    Returns:
        The ciphertext returned by ``public_key.encrypt``.
    """
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA512()),
        algorithm=hashes.SHA512(),
        label=None)
    return public_key.encrypt(plaintext=message, padding=oaep)

def DecryptUsingRSA(cipher, private_key):
    """Decrypt an RSA-OAEP ciphertext (SHA-512, MGF1 with SHA-512, no label).

    Args:
        cipher: Ciphertext bytes produced with the matching public key.
        private_key: RSA private key of the recipient.

    Returns:
        The plaintext returned by ``private_key.decrypt``.
    """
    oaep = padding.OAEP(
        mgf=padding.MGF1(algorithm=hashes.SHA512()),
        algorithm=hashes.SHA512(),
        label=None)
    return private_key.decrypt(ciphertext=cipher, padding=oaep)

AES

Key generation, Encryption, Signing and Decryption, Verification


In [ ]:
def GenerateAESKeyAndNonce(key_size=256, nonce_size=96):
    """Create a random AES-GCM key and nonce.

    Both sizes are given in bits. ``os.urandom`` takes a byte count, so the
    nonce size is converted before the call; passing the bit count straight
    through produced a 96-byte nonce instead of a 96-bit one.

    Args:
        key_size: Key length in bits, forwarded to ``AESGCM.generate_key``.
        nonce_size: Nonce length in bits; must be a positive multiple of 8.

    Returns:
        A ``(key, nonce)`` tuple of bytes objects.

    Raises:
        ValueError: If ``nonce_size`` is not a positive multiple of 8.
    """
    if nonce_size <= 0 or nonce_size % 8 != 0:
        raise ValueError("nonce_size must be a positive multiple of 8 bits")
    key = aes.generate_key(bit_length=key_size)
    nonce = urandom(nonce_size // 8)  # bits -> bytes
    return key, nonce

def EncryptAndSignUsingAES(message, key, nonce):
    """Encrypt ``message`` with AES-GCM, without associated data.

    Args:
        message: Plaintext bytes.
        key: AES key bytes.
        nonce: Nonce bytes for this encryption.

    Returns:
        The value returned by ``AESGCM.encrypt``.
    """
    return aes(key).encrypt(nonce, message, None)

def DecryptAndVerifyUsingAES(cipher, key, nonce):
    """Decrypt an AES-GCM ciphertext, without associated data.

    Args:
        cipher: Bytes produced by ``EncryptAndSignUsingAES``.
        key: AES key bytes.
        nonce: Nonce bytes used for the encryption.

    Returns:
        ``(True, plaintext)`` on success, or ``(False, b"")`` when decryption
        raises ``InvalidTag``.
    """
    gcm = aes(key)
    try:
        plaintext = gcm.decrypt(nonce, cipher, None)
    except InvalidTag:
        return False, b""
    return True, plaintext

Interface


In [ ]:
def SignAndEncryptMessage(sender_private_key, receiver_public_key, text=None, filename=None):
    """Pack a text and/or a file into one signed, encrypted datagram.

    Steps:
      1. Build the payload: three 4-byte big-endian lengths (text, filename,
         file contents) followed by the three byte strings themselves.
      2. Compress the payload with zlib and encrypt it with a fresh AES key
         and nonce.
      3. Bundle the AES key and nonce (each preceded by a 2-byte big-endian
         length), encrypt the bundle with the receiver's RSA public key and
         sign that ciphertext with the sender's RSA private key.
      4. Return: 2-byte length of the encrypted bundle, 2-byte length of the
         signature, the encrypted bundle, the signature, the AES ciphertext.

    Args:
        sender_private_key: RSA private key used for signing.
        receiver_public_key: RSA public key used to wrap the AES key bundle.
        text: Optional text message (encoded as UTF-8).
        filename: Optional path of a file to embed; the path string itself is
            embedded as well.

    Returns:
        The datagram as bytes.
    """
    assert(text is not None or filename is not None)

    def _u16(value):
        # Two-byte big-endian encoding.
        assert(value < 256 ** 2)
        return bytes(divmod(value, 256))

    def _u32(value):
        # Four-byte big-endian encoding.
        assert(value < 256 ** 4)
        return bytes((value >> 24, (value >> 16) & 0xFF, (value >> 8) & 0xFF, value & 0xFF))

    text_blob = bytes(text, "utf-8") if text is not None else b""

    name_blob, file_blob = b"", b""
    if filename is not None:
        name_blob = bytes(filename, "utf-8")
        with open(filename, "rb") as source:
            file_blob = source.read()

    sections = (text_blob, name_blob, file_blob)
    payload = b"".join(_u32(len(section)) for section in sections) + b"".join(sections)
    packed_payload = compress(payload)
    session_key, session_nonce = GenerateAESKeyAndNonce()
    sealed_payload = EncryptAndSignUsingAES(packed_payload, session_key, session_nonce)

    key_bundle = b"".join((_u16(len(session_key)), _u16(len(session_nonce)),
                           session_key, session_nonce))
    wrapped_bundle = EncryptUsingRSA(key_bundle, receiver_public_key)
    bundle_signature = SignUsingRSA(wrapped_bundle, sender_private_key)

    header = _u16(len(wrapped_bundle)) + _u16(len(bundle_signature))
    return header + wrapped_bundle + bundle_signature + sealed_payload

def DecryptAndVerifyMessage(datagram, sender_public_key, receiver_private_key):
    """Verify, decrypt and unpack a datagram built by ``SignAndEncryptMessage``.

    Datagram layout (all lengths big-endian):
        2 bytes  length of the RSA-encrypted AES key bundle
        2 bytes  length of the RSA signature over that encrypted bundle
        ...      encrypted bundle, signature, AES ciphertext

    The signature is checked before the RSA private key touches the bundle,
    so unauthenticated ciphertext never reaches the decryption step, and a
    tampered datagram yields ``(False, "")`` instead of an exception from
    RSA decryption.

    Args:
        datagram: Bytes produced by ``SignAndEncryptMessage``.
        sender_public_key: RSA public key used to check the signature.
        receiver_private_key: RSA private key used to unwrap the AES bundle.

    Returns:
        ``(True, text)`` when every check passes (``text`` is "" if the
        payload carried no text); ``(False, "")`` when the signature is
        invalid, RSA decryption raises ``ValueError``, or the AES step
        reports a failed verification.

    Side effects:
        When the payload carries a filename, the embedded file bytes are
        written to that path.
    """
    def _read_len(chunk):
        # Big-endian unsigned integer; an empty/short chunk decodes without
        # raising, so truncated input ends in a failed verification instead.
        return int.from_bytes(chunk, "big")

    wrapped_len = _read_len(datagram[:2])
    signature_len = _read_len(datagram[2:4])
    wrapped_end = 4 + wrapped_len
    signature_end = wrapped_end + signature_len
    wrapped_key_info = datagram[4:wrapped_end]
    key_info_signature = datagram[wrapped_end:signature_end]
    sealed_payload = datagram[signature_end:]

    # Authenticate first: do not run RSA decryption on data that was not
    # signed by the expected sender.
    if not VerifyUsingRSA(key_info_signature, wrapped_key_info, sender_public_key):
        return False, ""

    try:
        key_info = DecryptUsingRSA(wrapped_key_info, receiver_private_key)
    except ValueError:
        # Decryption failure (e.g. bundle not encrypted for this key).
        return False, ""

    key_len = _read_len(key_info[:2])
    nonce_len = _read_len(key_info[2:4])
    key_end = 4 + key_len
    aes_key = key_info[4:key_end]
    aes_nonce = key_info[key_end:key_end + nonce_len]

    payload_ok, compressed_message = DecryptAndVerifyUsingAES(sealed_payload, aes_key, aes_nonce)
    if not payload_ok:
        return False, ""

    message = decompress(compressed_message)
    text_len = _read_len(message[:4])
    name_len = _read_len(message[4:8])
    # Bytes 8..12 hold the file length; the file is simply the remainder.
    text_end = 12 + text_len
    name_end = text_end + name_len
    bytes_text = message[12:text_end]
    bytes_filename = message[text_end:name_end]
    bytes_file = message[name_end:]

    text = bytes_text.decode("utf-8") if text_len != 0 else ""
    if name_len != 0:
        # NOTE(review): the output path is chosen by the sender and is used
        # as-is, so a sender can overwrite any file this process may write.
        # Consider restricting it to a dedicated download directory.
        filename = bytes_filename.decode("utf-8")
        with open(filename, "wb") as file:
            file.write(bytes_file)
    return True, text

Simple Test


In [ ]:
# End-to-end smoke test: create a key pair for each party, build a datagram
# from the sender to the receiver, then verify and decrypt it again.
# NOTE(review): generating two 8192-bit RSA keys can take a long time;
# a smaller size may be enough for a quick check - confirm what is required.
sender_private_key = GenRSAKeyPair(key_size=8192)
sender_public_key = sender_private_key.public_key()

receiver_private_key = GenRSAKeyPair(key_size=8192)
receiver_public_key = receiver_private_key.public_key()

# "23.jpg" is opened for reading by SignAndEncryptMessage, so the file must
# exist relative to the current working directory.
datagram = SignAndEncryptMessage(sender_private_key, receiver_public_key, text="Просто пример текста", filename="23.jpg")

# DecryptAndVerifyMessage writes the embedded file bytes to the embedded
# filename, i.e. "23.jpg" is rewritten here.
verification, text = DecryptAndVerifyMessage(datagram, sender_public_key, receiver_private_key)

# Prints the verification flag followed by the recovered text.
print(verification, text)