Module e2eeftp.auth.e2ee
End-to-End Encryption implementation for E2EEFTP.
This module provides the cryptographic primitives and handshake protocols for establishing secure, authenticated connections between E2EEFTP clients and servers. It implements the Signal protocol-inspired end-to-end encryption using:
- X25519 for Elliptic Curve Diffie-Hellman key exchange
- Ed25519 for identity verification and signatures
- AES-256-CBC for symmetric encryption
- HMAC-SHA256 for message authentication
- HKDF for key derivation
The module includes classes for symmetric encryption (AESCipher) and the full E2EE handshake process (E2EE), ensuring forward secrecy and protection against man-in-the-middle attacks.
Classes
class AESCipher (encryption_key: bytes, authentication_key: bytes)-
Expand source code
class AESCipher: """A custom cipher class that implements AES-256-CBC encryption with HMAC-SHA256 authentication, mimicking the primitives used in Signal/WhatsApp. The encrypted payload is structured as: IV || Ciphertext || HMAC Tag """ def __init__(self, encryption_key: bytes, authentication_key: bytes) -> None: """Initializes the cipher with encryption and authentication keys. Args: encryption_key (bytes): The key used for AES-256-CBC encryption. authentication_key (bytes): The key used for HMAC-SHA256 authentication. """ self.encryption_key = encryption_key self.authentication_key = authentication_key self.iv_size = 16 # AES-CBC uses a 128-bit (16-byte) IV self.tag_size = 32 # HMAC-SHA256 produces a 256-bit (32-byte) tag def encrypt(self, plaintext: bytes) -> bytes: """ Encrypts data with AES-256-CBC and signs it with HMAC-SHA256. Args: plaintext (bytes): The data to encrypt. Returns: bytes: The encrypted payload. """ iv = os.urandom(self.iv_size) # Pad plaintext to AES block size padder = padding.PKCS7(algorithms.AES.block_size).padder() padded_data = padder.update(plaintext) + padder.finalize() # Encrypt cipher = Cipher(algorithms.AES(self.encryption_key), modes.CBC(iv)) encryptor = cipher.encryptor() ciphertext = encryptor.update(padded_data) + encryptor.finalize() # Sign the IV and ciphertext mac = hmac.HMAC(self.authentication_key, hashes.SHA256()) mac.update(iv + ciphertext) tag = mac.finalize() return iv + ciphertext + tag def decrypt(self, payload: bytes) -> bytes: """ Verifies HMAC and decrypts AES-256-CBC ciphertext. Args: payload (bytes): The encrypted payload. Returns: bytes: The decrypted data. """ # Extract components from payload iv = payload[:self.iv_size] tag = payload[-self.tag_size:] ciphertext = payload[self.iv_size:-self.tag_size] # Verify the HMAC tag first mac = hmac.HMAC(self.authentication_key, hashes.SHA256()) mac.update(iv + ciphertext) mac.verify(tag) # Raises InvalidSignature on failure # Decrypt cipher = Cipher(algorithms.AES(self.encryption_key), modes.CBC(iv)) decryptor = cipher.decryptor() padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize() # Unpad unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder() plaintext = unpadder.update(padded_plaintext) + unpadder.finalize() return plaintextA custom cipher class that implements AES-256-CBC encryption with HMAC-SHA256 authentication, mimicking the primitives used in Signal/WhatsApp.
The encrypted payload is structured as: IV || Ciphertext || HMAC Tag
Initializes the cipher with encryption and authentication keys.
Args
encryption_key:bytes- The key used for AES-256-CBC encryption.
authentication_key:bytes- The key used for HMAC-SHA256 authentication.
Methods
def decrypt(self, payload: bytes) ‑> bytes-
Expand source code
def decrypt(self, payload: bytes) -> bytes: """ Verifies HMAC and decrypts AES-256-CBC ciphertext. Args: payload (bytes): The encrypted payload. Returns: bytes: The decrypted data. """ # Extract components from payload iv = payload[:self.iv_size] tag = payload[-self.tag_size:] ciphertext = payload[self.iv_size:-self.tag_size] # Verify the HMAC tag first mac = hmac.HMAC(self.authentication_key, hashes.SHA256()) mac.update(iv + ciphertext) mac.verify(tag) # Raises InvalidSignature on failure # Decrypt cipher = Cipher(algorithms.AES(self.encryption_key), modes.CBC(iv)) decryptor = cipher.decryptor() padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize() # Unpad unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder() plaintext = unpadder.update(padded_plaintext) + unpadder.finalize() return plaintextVerifies HMAC and decrypts AES-256-CBC ciphertext.
Args
payload:bytes- The encrypted payload.
Returns
bytes- The decrypted data.
def encrypt(self, plaintext: bytes) ‑> bytes-
Expand source code
def encrypt(self, plaintext: bytes) -> bytes: """ Encrypts data with AES-256-CBC and signs it with HMAC-SHA256. Args: plaintext (bytes): The data to encrypt. Returns: bytes: The encrypted payload. """ iv = os.urandom(self.iv_size) # Pad plaintext to AES block size padder = padding.PKCS7(algorithms.AES.block_size).padder() padded_data = padder.update(plaintext) + padder.finalize() # Encrypt cipher = Cipher(algorithms.AES(self.encryption_key), modes.CBC(iv)) encryptor = cipher.encryptor() ciphertext = encryptor.update(padded_data) + encryptor.finalize() # Sign the IV and ciphertext mac = hmac.HMAC(self.authentication_key, hashes.SHA256()) mac.update(iv + ciphertext) tag = mac.finalize() return iv + ciphertext + tagEncrypts data with AES-256-CBC and signs it with HMAC-SHA256.
Args
plaintext:bytes- The data to encrypt.
Returns
bytes- The encrypted payload.
class E2EE-
Expand source code
class E2EE: """ Manages the End-to-End Encryption handshake using Elliptic Curve Diffie-Hellman (ECDH) to establish a secure, ephemeral session key. """ def __init__(self) -> None: """ Initializes the E2EE object by generating an ephemeral private/public key pair for this session. """ # Generate an ephemeral private key for this session # Use X25519, the curve used by Signal/WhatsApp self._private_key = x25519.X25519PrivateKey.generate() self.public_key_bytes = self._private_key.public_key().public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ) def _derive_cipher(self, peer_public_key_bytes: bytes) -> AESCipher: """ Derives a shared secret and creates a custom AESCipher object. Using the peer's public key and our own private key, this method computes a shared secret via X25519. It then uses HKDF to derive keys for AES-256 encryption and HMAC-SHA256 authentication. Args: peer_public_key_bytes (bytes): The raw public key from the other party. Returns: AESCipher: The symmetric cipher object for this session. """ peer_public_key = x25519.X25519PublicKey.from_public_bytes(peer_public_key_bytes) shared_secret = self._private_key.exchange(peer_public_key) # Derive 64 bytes: 32 for AES-256 key, 32 for HMAC-SHA256 key derived_key = HKDF( algorithm=hashes.SHA256(), length=64, salt=None, info=b'file-transfer-e2ee', ).derive(shared_secret) encryption_key = derived_key[:32] authentication_key = derived_key[32:] return AESCipher(encryption_key, authentication_key) def client_handshake(self, sock: socket.socket, client_id_priv_key: ed25519.Ed25519PrivateKey, known_server_pub_key: ed25519.Ed25519PublicKey) -> AESCipher: """ Performs the authenticated client-side handshake. This extends the ECDH handshake with a signature-based authentication step to verify both the client's and the server's identity, preventing Man-in-the-Middle (MitM) attacks. Args: sock (socket.socket): The connected client socket. client_id_priv_key (ed25519.Ed25519PrivateKey): The client's long-term private identity key. known_server_pub_key (ed25519.Ed25519PublicKey): The server's expected long-term public identity key. Returns: AESCipher: The derived symmetric cipher for this session. Raises: ConnectionError: If the server's identity cannot be verified. """ # 1. Sign our ephemeral public key with our long-term identity key. client_signature = client_id_priv_key.sign(self.public_key_bytes) client_id_pub_key_bytes = client_id_priv_key.public_key().public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ) # 2. Send our ephemeral key, identity key, and signature. payload = self.public_key_bytes + client_id_pub_key_bytes + client_signature sock.sendall(len(payload).to_bytes(4, 'big')) sock.sendall(payload) # 3. Receive the server's response. size_bytes = _recv_all(sock, 4) server_payload_size = int.from_bytes(size_bytes, 'big') server_payload = _recv_all(sock, server_payload_size) # 4. Unpack and verify the server's identity and signature. server_eph_pub_bytes = server_payload[:32] server_id_pub_bytes = server_payload[32:64] server_signature = server_payload[64:] # Verify server identity against our known key known_server_pub_key_bytes = known_server_pub_key.public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ) if server_id_pub_bytes != known_server_pub_key_bytes: raise ConnectionError("Server identity verification failed! Mismatched public key.") # Verify server's signature try: known_server_pub_key.verify(server_signature, server_eph_pub_bytes) except InvalidSignature: raise ConnectionError("Server identity verification failed! Invalid signature.") # 5. If all checks pass, derive the shared secret. return self._derive_cipher(server_eph_pub_bytes) def server_handshake(self, sock: socket.socket, server_id_priv_key: ed25519.Ed25519PrivateKey, authorized_client_keys: list[ed25519.Ed25519PublicKey]) -> AESCipher: """ Performs the authenticated server-side handshake. This authenticates the client against a list of authorized public keys and proves the server's identity to the client. Args: sock (socket.socket): The connected client socket. server_id_priv_key (ed25519.Ed25519PrivateKey): The server's long-term private identity key. authorized_client_keys (list[ed25519.Ed25519PublicKey]): A list of authorized client public keys. Returns: AESCipher: The derived symmetric cipher for this session. Raises: ConnectionError: If the client is not authorized or sends an invalid signature. """ # 1. Receive the client's payload. size_bytes = _recv_all(sock, 4) client_payload_size = int.from_bytes(size_bytes, 'big') client_payload = _recv_all(sock, client_payload_size) # 2. Unpack and verify the client's identity and signature. client_eph_pub_bytes = client_payload[:32] client_id_pub_bytes = client_payload[32:64] client_signature = client_payload[64:] # Check if client is authorized authorized_raw_keys = [key.public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw) for key in authorized_client_keys] if client_id_pub_bytes not in authorized_raw_keys: raise ConnectionError(f"Client authentication failed. Unknown public key.") # Verify client's signature try: client_id_pub_key = ed25519.Ed25519PublicKey.from_public_bytes(client_id_pub_bytes) client_id_pub_key.verify(client_signature, client_eph_pub_bytes) except InvalidSignature: raise ConnectionError("Client authentication failed. Invalid signature.") # 3. Client is authenticated. Now, prove our identity to the client. server_signature = server_id_priv_key.sign(self.public_key_bytes) server_id_pub_key_bytes = server_id_priv_key.public_key().public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw) payload = self.public_key_bytes + server_id_pub_key_bytes + server_signature sock.sendall(len(payload).to_bytes(4, 'big')) sock.sendall(payload) # 4. If all checks pass, derive the shared secret. return self._derive_cipher(client_eph_pub_bytes)Manages the End-to-End Encryption handshake using Elliptic Curve Diffie-Hellman (ECDH) to establish a secure, ephemeral session key.
Initializes the E2EE object by generating an ephemeral private/public key pair for this session.
Methods
def client_handshake(self,
sock: socket.socket,
client_id_priv_key: cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey,
known_server_pub_key: cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey) ‑> AESCipher-
Expand source code
def client_handshake(self, sock: socket.socket, client_id_priv_key: ed25519.Ed25519PrivateKey, known_server_pub_key: ed25519.Ed25519PublicKey) -> AESCipher: """ Performs the authenticated client-side handshake. This extends the ECDH handshake with a signature-based authentication step to verify both the client's and the server's identity, preventing Man-in-the-Middle (MitM) attacks. Args: sock (socket.socket): The connected client socket. client_id_priv_key (ed25519.Ed25519PrivateKey): The client's long-term private identity key. known_server_pub_key (ed25519.Ed25519PublicKey): The server's expected long-term public identity key. Returns: AESCipher: The derived symmetric cipher for this session. Raises: ConnectionError: If the server's identity cannot be verified. """ # 1. Sign our ephemeral public key with our long-term identity key. client_signature = client_id_priv_key.sign(self.public_key_bytes) client_id_pub_key_bytes = client_id_priv_key.public_key().public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ) # 2. Send our ephemeral key, identity key, and signature. payload = self.public_key_bytes + client_id_pub_key_bytes + client_signature sock.sendall(len(payload).to_bytes(4, 'big')) sock.sendall(payload) # 3. Receive the server's response. size_bytes = _recv_all(sock, 4) server_payload_size = int.from_bytes(size_bytes, 'big') server_payload = _recv_all(sock, server_payload_size) # 4. Unpack and verify the server's identity and signature. server_eph_pub_bytes = server_payload[:32] server_id_pub_bytes = server_payload[32:64] server_signature = server_payload[64:] # Verify server identity against our known key known_server_pub_key_bytes = known_server_pub_key.public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ) if server_id_pub_bytes != known_server_pub_key_bytes: raise ConnectionError("Server identity verification failed! Mismatched public key.") # Verify server's signature try: known_server_pub_key.verify(server_signature, server_eph_pub_bytes) except InvalidSignature: raise ConnectionError("Server identity verification failed! Invalid signature.") # 5. If all checks pass, derive the shared secret. return self._derive_cipher(server_eph_pub_bytes)Performs the authenticated client-side handshake.
This extends the ECDH handshake with a signature-based authentication step to verify both the client's and the server's identity, preventing Man-in-the-Middle (MitM) attacks.
Args
sock:socket.socket- The connected client socket.
client_id_priv_key:ed25519.Ed25519PrivateKey- The client's long-term private identity key.
known_server_pub_key:ed25519.Ed25519PublicKey- The server's expected long-term public identity key.
Returns
AESCipher- The derived symmetric cipher for this session.
Raises
ConnectionError- If the server's identity cannot be verified.
def server_handshake(self,
sock: socket.socket,
server_id_priv_key: cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey,
authorized_client_keys: list[cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey]) ‑> AESCipher-
Expand source code
def server_handshake(self, sock: socket.socket, server_id_priv_key: ed25519.Ed25519PrivateKey, authorized_client_keys: list[ed25519.Ed25519PublicKey]) -> AESCipher: """ Performs the authenticated server-side handshake. This authenticates the client against a list of authorized public keys and proves the server's identity to the client. Args: sock (socket.socket): The connected client socket. server_id_priv_key (ed25519.Ed25519PrivateKey): The server's long-term private identity key. authorized_client_keys (list[ed25519.Ed25519PublicKey]): A list of authorized client public keys. Returns: AESCipher: The derived symmetric cipher for this session. Raises: ConnectionError: If the client is not authorized or sends an invalid signature. """ # 1. Receive the client's payload. size_bytes = _recv_all(sock, 4) client_payload_size = int.from_bytes(size_bytes, 'big') client_payload = _recv_all(sock, client_payload_size) # 2. Unpack and verify the client's identity and signature. client_eph_pub_bytes = client_payload[:32] client_id_pub_bytes = client_payload[32:64] client_signature = client_payload[64:] # Check if client is authorized authorized_raw_keys = [key.public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw) for key in authorized_client_keys] if client_id_pub_bytes not in authorized_raw_keys: raise ConnectionError(f"Client authentication failed. Unknown public key.") # Verify client's signature try: client_id_pub_key = ed25519.Ed25519PublicKey.from_public_bytes(client_id_pub_bytes) client_id_pub_key.verify(client_signature, client_eph_pub_bytes) except InvalidSignature: raise ConnectionError("Client authentication failed. Invalid signature.") # 3. Client is authenticated. Now, prove our identity to the client. server_signature = server_id_priv_key.sign(self.public_key_bytes) server_id_pub_key_bytes = server_id_priv_key.public_key().public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw) payload = self.public_key_bytes + server_id_pub_key_bytes + server_signature sock.sendall(len(payload).to_bytes(4, 'big')) sock.sendall(payload) # 4. If all checks pass, derive the shared secret. return self._derive_cipher(client_eph_pub_bytes)Performs the authenticated server-side handshake.
This authenticates the client against a list of authorized public keys and proves the server's identity to the client.
Args
sock:socket.socket- The connected client socket.
server_id_priv_key:ed25519.Ed25519PrivateKey- The server's long-term private identity key.
authorized_client_keys:list[ed25519.Ed25519PublicKey]- A list of authorized client public keys.
Returns
AESCipher- The derived symmetric cipher for this session.
Raises
ConnectionError- If the client is not authorized or sends an invalid signature.