Package e2eeftp

Changelog: changelog.html

e2eeftp: A prototype for end-to-end encrypted file transfers.

This package contains the core client and server logic for a secure file transfer application using a Diffie-Hellman key exchange.

Sub-modules

e2eeftp.auth

This is the authentication module for the server and the client.

e2eeftp.cli

Command-line interface for E2EEFTP …

e2eeftp.client

Client package for E2EEFTP …

e2eeftp.server

Server package for E2EEFTP …

Functions

def generate_keys()
Expand source code
def generate_keys():
    """
    Generates and saves Ed25519 key pairs for server and client authentication.

    This function creates cryptographic key pairs for both the server and client
    components of E2EEFTP. It generates:

    - Server private key (server_id.key) - kept secret on the server
    - Server public key (known_server.pub) - shared with clients
    - Client private key (client_id.key) - kept secret on the client
    - Client public key - added to authorized_clients.pub for server authorization

    The keys are saved in PEM format for private keys and appropriate formats
    for public keys. The function provides user feedback about where to place
    the generated files.

    Note:
        This function appends to authorized_clients.pub if it exists, allowing
        multiple client keys to be authorized.
    """
    # --- Generate Server Keys ---
    print("--- Generating Server Keys ---")
    server_priv_key = ed25519.Ed25519PrivateKey.generate()
    server_pub_key = server_priv_key.public_key()

    # Save server private key in PEM format
    with open("server_id.key", "wb") as f:
        f.write(server_priv_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        ))
    print("Saved 'server_id.key' [blue](private)[/blue]. Place this in your server's root directory.")

    # Save server public key in PEM format (for client's known_server.pub)
    with open("known_server.pub", "wb") as f:
        f.write(server_pub_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        ))
    print("Saved 'known_server.pub' [blue](public)[/blue]. Copy this to your client's directory.")

    # --- Generate Client Keys ---
    print("\n--- Generating Client Keys ---")
    client_priv_key = ed25519.Ed25519PrivateKey.generate()
    client_pub_key = client_priv_key.public_key()

    # Save client private key in PEM format
    with open("client_id.key", "wb") as f:
        f.write(client_priv_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        ))
    print("Saved 'client_id.key' [blue](private)[/blue]. Place this in your client's directory.")

    # Get client public key in raw format, then base64 encode it for authorized_clients.pub
    client_pub_key_raw_b64 = base64.b64encode(client_pub_key.public_bytes(
        encoding=serialization.Encoding.Raw,
        format=serialization.PublicFormat.Raw
    ))

    # Create or append to the authorized_clients.pub file
    with open("authorized_clients.pub", "a") as f:
        f.write(client_pub_key_raw_b64.decode() + '\n')
    
    print("\n--- Authorization ---")
    print("The client's public key has been added to 'authorized_clients.pub'.")
    print("Place this file in your server's root directory.")
    print(f"Key added: [yellow]{client_pub_key_raw_b64.decode()}[/yellow]")
    print("------------------------")

Generates and saves Ed25519 key pairs for server and client authentication.

This function creates cryptographic key pairs for both the server and client components of E2EEFTP. It generates:

  • Server private key (server_id.key) - kept secret on the server
  • Server public key (known_server.pub) - shared with clients
  • Client private key (client_id.key) - kept secret on the client
  • Client public key - added to authorized_clients.pub for server authorization

The keys are saved in PEM format for private keys and appropriate formats for public keys. The function provides user feedback about where to place the generated files.

Note

This function appends to authorized_clients.pub if it exists, allowing multiple client keys to be authorized.

Classes

class AESCipher (encryption_key: bytes, authentication_key: bytes)
Expand source code
class AESCipher:
    """A custom cipher class that implements AES-256-CBC encryption with
    HMAC-SHA256 authentication, mimicking the primitives used in Signal/WhatsApp.

    The encrypted payload is structured as: IV || Ciphertext || HMAC Tag
    """
    def __init__(self, encryption_key: bytes, authentication_key: bytes) -> None:
        """Initializes the cipher with encryption and authentication keys.

        Args:
            encryption_key (bytes): The key used for AES-256-CBC encryption.
            authentication_key (bytes): The key used for HMAC-SHA256 authentication.
        """
        self.encryption_key = encryption_key
        self.authentication_key = authentication_key
        self.iv_size = 16  # AES-CBC uses a 128-bit (16-byte) IV
        self.tag_size = 32 # HMAC-SHA256 produces a 256-bit (32-byte) tag

    def encrypt(self, plaintext: bytes) -> bytes:
        """
        Encrypts data with AES-256-CBC and signs it with HMAC-SHA256.
        
        Args:
            plaintext (bytes): The data to encrypt.

        Returns:
            bytes: The encrypted payload.
        """
        iv = os.urandom(self.iv_size)
        
        # Pad plaintext to AES block size
        padder = padding.PKCS7(algorithms.AES.block_size).padder()
        padded_data = padder.update(plaintext) + padder.finalize()

        # Encrypt
        cipher = Cipher(algorithms.AES(self.encryption_key), modes.CBC(iv))
        encryptor = cipher.encryptor()
        ciphertext = encryptor.update(padded_data) + encryptor.finalize()

        # Sign the IV and ciphertext
        mac = hmac.HMAC(self.authentication_key, hashes.SHA256())
        mac.update(iv + ciphertext)
        tag = mac.finalize()

        return iv + ciphertext + tag

    def decrypt(self, payload: bytes) -> bytes:
        """
        Verifies HMAC and decrypts AES-256-CBC ciphertext.
        
        Args:
            payload (bytes): The encrypted payload.

        Returns:
            bytes: The decrypted data.
        """
        # Extract components from payload
        iv = payload[:self.iv_size]
        tag = payload[-self.tag_size:]
        ciphertext = payload[self.iv_size:-self.tag_size]

        # Verify the HMAC tag first
        mac = hmac.HMAC(self.authentication_key, hashes.SHA256())
        mac.update(iv + ciphertext)
        mac.verify(tag) # Raises InvalidSignature on failure

        # Decrypt
        cipher = Cipher(algorithms.AES(self.encryption_key), modes.CBC(iv))
        decryptor = cipher.decryptor()
        padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()

        # Unpad
        unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
        plaintext = unpadder.update(padded_plaintext) + unpadder.finalize()

        return plaintext

A custom cipher class that implements AES-256-CBC encryption with HMAC-SHA256 authentication, mimicking the primitives used in Signal/WhatsApp.

The encrypted payload is structured as: IV || Ciphertext || HMAC Tag

Initializes the cipher with encryption and authentication keys.

Args

encryption_key : bytes
The key used for AES-256-CBC encryption.
authentication_key : bytes
The key used for HMAC-SHA256 authentication.

Methods

def decrypt(self, payload: bytes) ‑> bytes
Expand source code
def decrypt(self, payload: bytes) -> bytes:
    """
    Verifies HMAC and decrypts AES-256-CBC ciphertext.
    
    Args:
        payload (bytes): The encrypted payload.

    Returns:
        bytes: The decrypted data.
    """
    # Extract components from payload
    iv = payload[:self.iv_size]
    tag = payload[-self.tag_size:]
    ciphertext = payload[self.iv_size:-self.tag_size]

    # Verify the HMAC tag first
    mac = hmac.HMAC(self.authentication_key, hashes.SHA256())
    mac.update(iv + ciphertext)
    mac.verify(tag) # Raises InvalidSignature on failure

    # Decrypt
    cipher = Cipher(algorithms.AES(self.encryption_key), modes.CBC(iv))
    decryptor = cipher.decryptor()
    padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize()

    # Unpad
    unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
    plaintext = unpadder.update(padded_plaintext) + unpadder.finalize()

    return plaintext

Verifies HMAC and decrypts AES-256-CBC ciphertext.

Args

payload : bytes
The encrypted payload.

Returns

bytes
The decrypted data.
def encrypt(self, plaintext: bytes) ‑> bytes
Expand source code
def encrypt(self, plaintext: bytes) -> bytes:
    """
    Encrypts data with AES-256-CBC and signs it with HMAC-SHA256.
    
    Args:
        plaintext (bytes): The data to encrypt.

    Returns:
        bytes: The encrypted payload.
    """
    iv = os.urandom(self.iv_size)
    
    # Pad plaintext to AES block size
    padder = padding.PKCS7(algorithms.AES.block_size).padder()
    padded_data = padder.update(plaintext) + padder.finalize()

    # Encrypt
    cipher = Cipher(algorithms.AES(self.encryption_key), modes.CBC(iv))
    encryptor = cipher.encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()

    # Sign the IV and ciphertext
    mac = hmac.HMAC(self.authentication_key, hashes.SHA256())
    mac.update(iv + ciphertext)
    tag = mac.finalize()

    return iv + ciphertext + tag

Encrypts data with AES-256-CBC and signs it with HMAC-SHA256.

Args

plaintext : bytes
The data to encrypt.

Returns

bytes
The encrypted payload.
class E2EE
Expand source code
class E2EE:
    """
    Manages the End-to-End Encryption handshake using Elliptic Curve
    Diffie-Hellman (ECDH) to establish a secure, ephemeral session key.
    """
    def __init__(self) -> None:
        """
        Initializes the E2EE object by generating an ephemeral private/public
        key pair for this session.
        """
        # Generate an ephemeral private key for this session
        # Use X25519, the curve used by Signal/WhatsApp
        self._private_key = x25519.X25519PrivateKey.generate()
        self.public_key_bytes = self._private_key.public_key().public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw
        )

    def _derive_cipher(self, peer_public_key_bytes: bytes) -> AESCipher:
        """
        Derives a shared secret and creates a custom AESCipher object.

        Using the peer's public key and our own private key, this method
        computes a shared secret via X25519. It then uses HKDF to derive
        keys for AES-256 encryption and HMAC-SHA256 authentication.

        Args:
            peer_public_key_bytes (bytes): The raw public key from the
                                           other party.

        Returns:
            AESCipher: The symmetric cipher object for this session.
        """
        peer_public_key = x25519.X25519PublicKey.from_public_bytes(peer_public_key_bytes)
        shared_secret = self._private_key.exchange(peer_public_key)
        
        # Derive 64 bytes: 32 for AES-256 key, 32 for HMAC-SHA256 key
        derived_key = HKDF(
            algorithm=hashes.SHA256(),
            length=64,
            salt=None,
            info=b'file-transfer-e2ee',
        ).derive(shared_secret)

        encryption_key = derived_key[:32]
        authentication_key = derived_key[32:]
        return AESCipher(encryption_key, authentication_key)

    def client_handshake(self, sock: socket.socket, client_id_priv_key: ed25519.Ed25519PrivateKey, known_server_pub_key: ed25519.Ed25519PublicKey) -> AESCipher:
        """
        Performs the authenticated client-side handshake.

        This extends the ECDH handshake with a signature-based authentication
        step to verify both the client's and the server's identity, preventing
        Man-in-the-Middle (MitM) attacks.

        Args:
            sock (socket.socket): The connected client socket.
            client_id_priv_key (ed25519.Ed25519PrivateKey): The client's long-term private identity key.
            known_server_pub_key (ed25519.Ed25519PublicKey): The server's expected long-term public identity key.

        Returns:
            AESCipher: The derived symmetric cipher for this session.
        
        Raises:
            ConnectionError: If the server's identity cannot be verified.
        """
        # 1. Sign our ephemeral public key with our long-term identity key.
        client_signature = client_id_priv_key.sign(self.public_key_bytes)
        client_id_pub_key_bytes = client_id_priv_key.public_key().public_bytes(
            encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw
        )

        # 2. Send our ephemeral key, identity key, and signature.
        payload = self.public_key_bytes + client_id_pub_key_bytes + client_signature
        sock.sendall(len(payload).to_bytes(4, 'big'))
        sock.sendall(payload)

        # 3. Receive the server's response.
        size_bytes = _recv_all(sock, 4)
        server_payload_size = int.from_bytes(size_bytes, 'big')
        server_payload = _recv_all(sock, server_payload_size)

        # 4. Unpack and verify the server's identity and signature.
        server_eph_pub_bytes = server_payload[:32]
        server_id_pub_bytes = server_payload[32:64]
        server_signature = server_payload[64:]

        # Verify server identity against our known key
        known_server_pub_key_bytes = known_server_pub_key.public_bytes(
            encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw
        )
        if server_id_pub_bytes != known_server_pub_key_bytes:
            raise ConnectionError("Server identity verification failed! Mismatched public key.")

        # Verify server's signature
        try:
            known_server_pub_key.verify(server_signature, server_eph_pub_bytes)
        except InvalidSignature:
            raise ConnectionError("Server identity verification failed! Invalid signature.")

        # 5. If all checks pass, derive the shared secret.
        return self._derive_cipher(server_eph_pub_bytes)

    def server_handshake(self, sock: socket.socket, server_id_priv_key: ed25519.Ed25519PrivateKey, authorized_client_keys: list[ed25519.Ed25519PublicKey]) -> AESCipher:
        """
        Performs the authenticated server-side handshake.

        This authenticates the client against a list of authorized public keys
        and proves the server's identity to the client.

        Args:
            sock (socket.socket): The connected client socket.
            server_id_priv_key (ed25519.Ed25519PrivateKey): The server's long-term private identity key.
            authorized_client_keys (list[ed25519.Ed25519PublicKey]): A list of authorized client public keys.

        Returns:
            AESCipher: The derived symmetric cipher for this session.
        
        Raises:
            ConnectionError: If the client is not authorized or sends an invalid signature.
        """
        # 1. Receive the client's payload.
        size_bytes = _recv_all(sock, 4)
        client_payload_size = int.from_bytes(size_bytes, 'big')
        client_payload = _recv_all(sock, client_payload_size)

        # 2. Unpack and verify the client's identity and signature.
        client_eph_pub_bytes = client_payload[:32]
        client_id_pub_bytes = client_payload[32:64]
        client_signature = client_payload[64:]

        # Check if client is authorized
        authorized_raw_keys = [key.public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw) for key in authorized_client_keys]
        if client_id_pub_bytes not in authorized_raw_keys:
            raise ConnectionError(f"Client authentication failed. Unknown public key.")

        # Verify client's signature
        try:
            client_id_pub_key = ed25519.Ed25519PublicKey.from_public_bytes(client_id_pub_bytes)
            client_id_pub_key.verify(client_signature, client_eph_pub_bytes)
        except InvalidSignature:
            raise ConnectionError("Client authentication failed. Invalid signature.")

        # 3. Client is authenticated. Now, prove our identity to the client.
        server_signature = server_id_priv_key.sign(self.public_key_bytes)
        server_id_pub_key_bytes = server_id_priv_key.public_key().public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw)
        payload = self.public_key_bytes + server_id_pub_key_bytes + server_signature
        sock.sendall(len(payload).to_bytes(4, 'big'))
        sock.sendall(payload)

        # 4. If all checks pass, derive the shared secret.
        return self._derive_cipher(client_eph_pub_bytes)

Manages the End-to-End Encryption handshake using Elliptic Curve Diffie-Hellman (ECDH) to establish a secure, ephemeral session key.

Initializes the E2EE object by generating an ephemeral private/public key pair for this session.

Methods

def client_handshake(self,
sock: socket.socket,
client_id_priv_key: cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey,
known_server_pub_key: cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey) ‑> AESCipher
Expand source code
def client_handshake(self, sock: socket.socket, client_id_priv_key: ed25519.Ed25519PrivateKey, known_server_pub_key: ed25519.Ed25519PublicKey) -> AESCipher:
    """
    Performs the authenticated client-side handshake.

    This extends the ECDH handshake with a signature-based authentication
    step to verify both the client's and the server's identity, preventing
    Man-in-the-Middle (MitM) attacks.

    Args:
        sock (socket.socket): The connected client socket.
        client_id_priv_key (ed25519.Ed25519PrivateKey): The client's long-term private identity key.
        known_server_pub_key (ed25519.Ed25519PublicKey): The server's expected long-term public identity key.

    Returns:
        AESCipher: The derived symmetric cipher for this session.
    
    Raises:
        ConnectionError: If the server's identity cannot be verified.
    """
    # 1. Sign our ephemeral public key with our long-term identity key.
    client_signature = client_id_priv_key.sign(self.public_key_bytes)
    client_id_pub_key_bytes = client_id_priv_key.public_key().public_bytes(
        encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw
    )

    # 2. Send our ephemeral key, identity key, and signature.
    payload = self.public_key_bytes + client_id_pub_key_bytes + client_signature
    sock.sendall(len(payload).to_bytes(4, 'big'))
    sock.sendall(payload)

    # 3. Receive the server's response.
    size_bytes = _recv_all(sock, 4)
    server_payload_size = int.from_bytes(size_bytes, 'big')
    server_payload = _recv_all(sock, server_payload_size)

    # 4. Unpack and verify the server's identity and signature.
    server_eph_pub_bytes = server_payload[:32]
    server_id_pub_bytes = server_payload[32:64]
    server_signature = server_payload[64:]

    # Verify server identity against our known key
    known_server_pub_key_bytes = known_server_pub_key.public_bytes(
        encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw
    )
    if server_id_pub_bytes != known_server_pub_key_bytes:
        raise ConnectionError("Server identity verification failed! Mismatched public key.")

    # Verify server's signature
    try:
        known_server_pub_key.verify(server_signature, server_eph_pub_bytes)
    except InvalidSignature:
        raise ConnectionError("Server identity verification failed! Invalid signature.")

    # 5. If all checks pass, derive the shared secret.
    return self._derive_cipher(server_eph_pub_bytes)

Performs the authenticated client-side handshake.

This extends the ECDH handshake with a signature-based authentication step to verify both the client's and the server's identity, preventing Man-in-the-Middle (MitM) attacks.

Args

sock : socket.socket
The connected client socket.
client_id_priv_key : ed25519.Ed25519PrivateKey
The client's long-term private identity key.
known_server_pub_key : ed25519.Ed25519PublicKey
The server's expected long-term public identity key.

Returns

AESCipher
The derived symmetric cipher for this session.

Raises

ConnectionError
If the server's identity cannot be verified.
def server_handshake(self,
sock: socket.socket,
server_id_priv_key: cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey,
authorized_client_keys: list[cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey]) ‑> AESCipher
Expand source code
def server_handshake(self, sock: socket.socket, server_id_priv_key: ed25519.Ed25519PrivateKey, authorized_client_keys: list[ed25519.Ed25519PublicKey]) -> AESCipher:
    """
    Performs the authenticated server-side handshake.

    This authenticates the client against a list of authorized public keys
    and proves the server's identity to the client.

    Args:
        sock (socket.socket): The connected client socket.
        server_id_priv_key (ed25519.Ed25519PrivateKey): The server's long-term private identity key.
        authorized_client_keys (list[ed25519.Ed25519PublicKey]): A list of authorized client public keys.

    Returns:
        AESCipher: The derived symmetric cipher for this session.
    
    Raises:
        ConnectionError: If the client is not authorized or sends an invalid signature.
    """
    # 1. Receive the client's payload.
    size_bytes = _recv_all(sock, 4)
    client_payload_size = int.from_bytes(size_bytes, 'big')
    client_payload = _recv_all(sock, client_payload_size)

    # 2. Unpack and verify the client's identity and signature.
    client_eph_pub_bytes = client_payload[:32]
    client_id_pub_bytes = client_payload[32:64]
    client_signature = client_payload[64:]

    # Check if client is authorized
    authorized_raw_keys = [key.public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw) for key in authorized_client_keys]
    if client_id_pub_bytes not in authorized_raw_keys:
        raise ConnectionError(f"Client authentication failed. Unknown public key.")

    # Verify client's signature
    try:
        client_id_pub_key = ed25519.Ed25519PublicKey.from_public_bytes(client_id_pub_bytes)
        client_id_pub_key.verify(client_signature, client_eph_pub_bytes)
    except InvalidSignature:
        raise ConnectionError("Client authentication failed. Invalid signature.")

    # 3. Client is authenticated. Now, prove our identity to the client.
    server_signature = server_id_priv_key.sign(self.public_key_bytes)
    server_id_pub_key_bytes = server_id_priv_key.public_key().public_bytes(encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw)
    payload = self.public_key_bytes + server_id_pub_key_bytes + server_signature
    sock.sendall(len(payload).to_bytes(4, 'big'))
    sock.sendall(payload)

    # 4. If all checks pass, derive the shared secret.
    return self._derive_cipher(client_eph_pub_bytes)

Performs the authenticated server-side handshake.

This authenticates the client against a list of authorized public keys and proves the server's identity to the client.

Args

sock : socket.socket
The connected client socket.
server_id_priv_key : ed25519.Ed25519PrivateKey
The server's long-term private identity key.
authorized_client_keys : list[ed25519.Ed25519PublicKey]
A list of authorized client public keys.

Returns

AESCipher
The derived symmetric cipher for this session.

Raises

ConnectionError
If the client is not authorized or sends an invalid signature.
class E2EEFTPRequestHandler (request, client_address, server)
Expand source code
class E2EEFTPRequestHandler(socketserver.BaseRequestHandler):
    """
    Request handler for each client connection, instantiated by the socketserver.

    This class manages the entire lifecycle of a client connection. It is
    responsible for performing the secure handshake, parsing client commands,
    and dispatching to the appropriate handler methods (e.g., for sending or
    receiving files).

    The command dispatch approach is intentionally flexible: subclasses can
    override `command_handlers` to add or modify supported commands without
    changing _handle_request logic.
    """

    command_handlers: dict[str, str] = {
        "SEND": "_receive_file",
        "GET": "_send_file",
        "LIST": "_send_list",
        "DELETE": "_delete_file",
        "HLIST": "_hlist"
    }

    req: dict[str, Comm] = {}

    def setup(self) -> None:
        """
        Called before handle() to perform any initialization actions.
        Runs exactly once per connection/instance.
        """
        self.update_command_handlers()
        super().setup()

    def handle(self):
        """
        The main entry point for handling a new client connection.

        This method orchestrates the session:
        1.  Performs the E2EE handshake to establish a secure channel.
        2.  Waits for and parses a command from the client.
        3.  Calls the internal method corresponding to the command.
        4.  Handles any exceptions during the session.
        5.  Ensures the connection is logged and closed cleanly.
        """
        address = self.client_address
        log.info(f"Accepted connection from {address}")
        try:
            server_key_path = "server_id.key"
            auth_keys_path = "authorized_clients.pub"

            log.info("Loading server identity and authorized keys...")
            
            if not os.path.exists(server_key_path):
                log.error(f"Server identity key '{server_key_path}' not found. Cannot secure connection.")
                return

            with open(server_key_path, "rb") as f:
                server_id_priv_key = serialization.load_pem_private_key(f.read(), password=None)
            
            authorized_client_keys = []
            if os.path.exists(auth_keys_path):
                with open(auth_keys_path, "r") as f:
                    for line in f:
                        line = line.strip()
                        if line and not line.startswith('#'):
                            try:
                                key_bytes = base64.b64decode(line)
                                authorized_client_keys.append(ed25519.Ed25519PublicKey.from_public_bytes(key_bytes))
                            except Exception as e:
                                log.warning(f"Skipping invalid key in {auth_keys_path}: {e}")
            else:
                log.warning(f"'{auth_keys_path}' not found. No clients will be authorized.")
                log.warning("Run 'generate_keys.py' to create client keys and the authorization file.")

            e2ee = E2EE()
            log.info(f"Performing secure handshake with {address}...")
            cipher = e2ee.server_handshake(self.request, server_id_priv_key, authorized_client_keys)
            log.info(f"Secure handshake with {address} complete.")
            self._handle_request(cipher)
        except FileNotFoundError as e:
            log.error(f"Identity key file not found: {e}. Please generate keys and authorize clients.")
        except ConnectionError as e:
            log.error(f"Handshake failed with {address}: {e}")
        except Exception as e:
            log.error(f"Error during session with {address}: {e}")
        finally:
            log.info(f"Connection with {address} closed.")

    def _recv_until(self, delimiter: bytes) -> bytes: 
        """ 
        Receives data from the socket until a specific delimiter is found. 

        This is a helper method to read data from the stream-based TCP socket 
        in a message-oriented way. It reads one byte at a time until the 
        `delimiter` is encountered.

        Args:
            delimiter (bytes): The byte sequence that marks the end of a message.

        Returns:
            bytes: The data received from the socket, including the delimiter.
                   Returns an empty bytestring if the client disconnects before
                   sending any data.
        """
        data = b''
        while not data.endswith(delimiter):
            chunk = self.request.recv(1)
            if not chunk: break
            data += chunk
        return data

    def _handle_request(self, cipher: AESCipher) -> None:
        """
        Parses the client's command and dispatches to the correct handler.

        Uses a dispatch map so subclasses can modify/add commands by overriding
        `command_handlers`.
        """
        header_data = self._recv_until(b'\n')
        if not header_data:
            return

        try:
            parts = header_data.decode().strip().split("|")
            command = parts[0].upper()

            handler_name = self.command_handlers.get(command)
            if handler_name is None:
                self.request.sendall(b"400|Invalid Command\n")
                log.warning(f"Invalid command from {self.client_address}: {command}")
                return

            handler = getattr(self, handler_name, None)
            if not callable(handler):
                self.request.sendall(b"500|Server Misconfigured\n")
                log.error(f"Handler {handler_name} for command {command} not implemented")
                return

            handler(*self._arg_paser(parts, cipher)) 

        except (IndexError, ValueError) as e:
            log.error(f"Malformed request from {self.client_address}: {header_data.strip()!r} - {e}")
            self.request.sendall(b"400|Malformed request\n")

    def _arg_paser(self, request_parts: list[str], cipher: AESCipher) -> tuple:
        """_summary_

        Args:
            request_parts (list[str]): The list of parts from the client's command header in the form of a list.

        Returns:
            tuple: The arguments to be passed to the command handler method, based on the command type. The mapping is as follows:
        """
        cmd_args: list = []
        command = request_parts[0].upper()
        if command == "SEND":
            cmd_args = [request_parts[1], int(request_parts[2]), cipher]
        elif command == "GET":
            cmd_args = [request_parts[1], cipher]
        elif command == "LIST" or command == "HLIST":
            cmd_args = []
        elif command == "DELETE":
            cmd_args = [request_parts[1]]
        return tuple(cmd_args) 

    def _hlist(self) -> None:
        """
        Sends a text file of all available commands that the server supports to the client.
        
        The server responds with a header `200|<content_length>` followed by a newline-separated string of commands.
        **Protocol**:
        1.  Sends header: `b"200|<size>"`
        2.  Sends body: A string of commands.
        """
        self.req["HLIST"] = Hlist(commands=list(self.command_handlers.keys()), request=self.request, log=log, )
        self.req["HLIST"].__hlist__ = self._hlist.__name__
        self.req["HLIST"].run()

    def _send_list(self) -> None:
        """
        Sends a list of available files in the 'received' directory to the client.

        The server responds with a header `200|<content_length>` followed by a
        newline-separated string of filenames.

        **Protocol**:
        1.  Sends header: `b"200|<size>"`
        2.  Sends body: A string of filenames.
        """
        self.req["LIST"] = List(request=self.request, log=log)
        self.req["LIST"].set_hlist(self._send_list.__name__)
        self.req["LIST"].run()

    def _delete_file(self, filename: str) -> None:
        """
        Deletes a specified file from the server's 'received' directory.

        Args:
            filename (str): The name of the file to delete.

        **Responses**:
        - On success: `b"200|File deleted\\n"`
        - If file not found: `b"404|File not found\\n"`
        """
        self.req["DELETE"] = Delete(filename=filename, request=self.request, log=log)
        self.req["DELETE"].set_hlist(self._delete_file.__name__)
        self.req["DELETE"].run()

    def _receive_file(self, filename: str, filesize: int, cipher: AESCipher) -> None:
        """
        Receives, decrypts, and saves a file sent by the client.

        This method reads a specified number of bytes (`filesize`) from the socket,
        which contains the encrypted file data. It then attempts to decrypt this
        data using the session's cipher and saves it to the `received` directory.

        Args:
            filename (str): The name to save the file as.
            filesize (int): The exact size of the incoming encrypted data buffer.
            cipher (AESCipher): The cipher instance for this session.

        **Responses**:
        - On success: `b"226|Transfer Complete\\n"`
        - On decryption failure: `b"500|Decryption Failed\\n"`
        """
        self.req["SEND"] = Send(filename=filename, filesize=filesize, cipher=cipher, request=self.request, log=log)
        self.req["SEND"].set_hlist(self._receive_file.__name__)
        self.req["SEND"].run()

    def _send_file(self, filename: str, cipher: AESCipher) -> None:
        """
        Encrypts and sends a requested file to the client.

        If the file exists in the 'received' directory, it is read, encrypted
        with the session cipher, and sent over the socket.

        Args:
            filename (str): The name of the file to send.
            cipher (AESCipher): The cipher instance for this session.

        **Protocol & Responses**:
        - If file found:
            1. Sends header: `b"200|<encrypted_size>\\n"`
            2. Sends body: The encrypted file data.
        - If file not found: `b"404|File not found: {filename}\\n"`
        - On server-side error: `b"500|Server Read Error\\n"`
        """
        self.req["GET"] = Get(filename=filename, cipher=cipher, request=self.request, log=log)
        self.req["GET"].set_hlist(self._send_file.__name__)
        self.req["GET"].run()

    def update_command_handlers(self):
        """
        Update the command handlers dictionary with dynamic handlers from request objects.

        This method iterates through the req dictionary (containing command objects)
        and updates the command_handlers mapping to include the __hlist__ method
        names for each command. This allows for dynamic command handler registration
        based on the current request state.

        The updated handlers are logged at debug level for troubleshooting.
        """
        self.command_handlers.update({comm_name: comm.__hlist__ for comm_name, comm in self.req.items()})
        log.debug(self.command_handlers)

Request handler for each client connection, instantiated by the socketserver.

This class manages the entire lifecycle of a client connection. It is responsible for performing the secure handshake, parsing client commands, and dispatching to the appropriate handler methods (e.g., for sending or receiving files).

The command dispatch approach is intentionally flexible: subclasses can override command_handlers to add or modify supported commands without changing _handle_request logic.

Ancestors

  • socketserver.BaseRequestHandler

Class variables

var command_handlers : dict[str, str]

The type of the None singleton.

var req : dict[str, Comm]

The type of the None singleton.

Methods

def handle(self)
Expand source code
def handle(self):
    """
    The main entry point for handling a new client connection.

    This method orchestrates the session:
    1.  Performs the E2EE handshake to establish a secure channel.
    2.  Waits for and parses a command from the client.
    3.  Calls the internal method corresponding to the command.
    4.  Handles any exceptions during the session.
    5.  Ensures the connection is logged and closed cleanly.
    """
    address = self.client_address
    log.info(f"Accepted connection from {address}")
    try:
        server_key_path = "server_id.key"
        auth_keys_path = "authorized_clients.pub"

        log.info("Loading server identity and authorized keys...")
        
        if not os.path.exists(server_key_path):
            log.error(f"Server identity key '{server_key_path}' not found. Cannot secure connection.")
            return

        with open(server_key_path, "rb") as f:
            server_id_priv_key = serialization.load_pem_private_key(f.read(), password=None)
        
        authorized_client_keys = []
        if os.path.exists(auth_keys_path):
            with open(auth_keys_path, "r") as f:
                for line in f:
                    line = line.strip()
                    if line and not line.startswith('#'):
                        try:
                            key_bytes = base64.b64decode(line)
                            authorized_client_keys.append(ed25519.Ed25519PublicKey.from_public_bytes(key_bytes))
                        except Exception as e:
                            log.warning(f"Skipping invalid key in {auth_keys_path}: {e}")
        else:
            log.warning(f"'{auth_keys_path}' not found. No clients will be authorized.")
            log.warning("Run 'generate_keys.py' to create client keys and the authorization file.")

        e2ee = E2EE()
        log.info(f"Performing secure handshake with {address}...")
        cipher = e2ee.server_handshake(self.request, server_id_priv_key, authorized_client_keys)
        log.info(f"Secure handshake with {address} complete.")
        self._handle_request(cipher)
    except FileNotFoundError as e:
        log.error(f"Identity key file not found: {e}. Please generate keys and authorize clients.")
    except ConnectionError as e:
        log.error(f"Handshake failed with {address}: {e}")
    except Exception as e:
        log.error(f"Error during session with {address}: {e}")
    finally:
        log.info(f"Connection with {address} closed.")

The main entry point for handling a new client connection.

This method orchestrates the session: 1. Performs the E2EE handshake to establish a secure channel. 2. Waits for and parses a command from the client. 3. Calls the internal method corresponding to the command. 4. Handles any exceptions during the session. 5. Ensures the connection is logged and closed cleanly.

def setup(self) ‑> None
Expand source code
def setup(self) -> None:
    """
    Called before handle() to perform any initialization actions.
    Runs exactly once per connection/instance.
    """
    self.update_command_handlers()
    super().setup()

Called before handle() to perform any initialization actions. Runs exactly once per connection/instance.

def update_command_handlers(self)
Expand source code
def update_command_handlers(self):
    """
    Update the command handlers dictionary with dynamic handlers from request objects.

    This method iterates through the req dictionary (containing command objects)
    and updates the command_handlers mapping to include the __hlist__ method
    names for each command. This allows for dynamic command handler registration
    based on the current request state.

    The updated handlers are logged at debug level for troubleshooting.
    """
    self.command_handlers.update({comm_name: comm.__hlist__ for comm_name, comm in self.req.items()})
    log.debug(self.command_handlers)

Update the command handlers dictionary with dynamic handlers from request objects.

This method iterates through the req dictionary (containing command objects) and updates the command_handlers mapping to include the hlist method names for each command. This allows for dynamic command handler registration based on the current request state.

The updated handlers are logged at debug level for troubleshooting.

class e2eeftp (host: str = '127.0.0.1', port: int = 5001, logging: bool = True)
Expand source code
class e2eeftp(socketserver.ThreadingTCPServer):
    """
    A multi-threaded TCP server for secure file transfers.

    This server uses `socketserver.ThreadingTCPServer` to handle each incoming
    client connection in a separate thread. This allows for concurrent file
    transfer operations.

    Security is established on a per-connection basis using an Elliptic Curve
    Diffie-Hellman (ECDH) key exchange. This generates a unique, ephemeral
    session key for each client, providing forward secrecy. All file data
    transferred after the handshake is encrypted with this key.

    The server listens for commands like SEND, GET, LIST, and DELETE.

    Attributes:
        allow_reuse_address (bool): Allows the server to restart and bind to the
                                    same address quickly.
        host (str): The IP address the server is bound to.
        port (int): The port the server is listening on.

    Supported Status Codes:
        - 200: Success
        - 226: Transfer Complete
        - 400: Bad Request / Invalid Command
        - 404: File Not Found
        - 500: Internal Server Error (e.g., Decryption Failed)

    The header format is:
        - SEND: SEND|filename|encrypted_data
        - GET: GET|filename
        - LIST: LIST
        - DELETE: DELETE|filename
    """
    allow_reuse_address = True

    def __init__(self, host: str='127.0.0.1', port: int=5001, logging: bool=True) -> None:
        """
        Initializes the server and binds it to a host and port.

        Args:
            host (str): The network interface IP to bind to. Defaults to '127.0.0.1'
                        (localhost). Use '0.0.0.0' to listen on all interfaces.
            port (int): The port number to listen on. Defaults to 5001.
            enable_logging (bool): If False, disables server logging output.
        """
        super().__init__((host, port), E2EEFTPRequestHandler)
        self.host, self.port = host, port
        self.enable_logging = logging
        log.disabled = not self.enable_logging

    def _generate_server_keys_if_missing(self) -> None:
        """
        Generates and saves server key pair if it doesn't exist.
        """
        server_key_path = "server_id.key"
        if not os.path.exists(server_key_path):
            log.warning(f"Server identity key '{server_key_path}' not found. Generating a new one.")

            server_priv_key = ed25519.Ed25519PrivateKey.generate()
            server_pub_key = server_priv_key.public_key()

            # Save server private key in PEM format
            with open(server_key_path, "wb") as f:
                f.write(server_priv_key.private_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PrivateFormat.PKCS8,
                    encryption_algorithm=serialization.NoEncryption()
                ))
            log.info(f"Saved '{server_key_path}' (private). This is your server's permanent identity.")

            # Save server public key in PEM format (for client's known_server.pub)
            with open("known_server.pub", "wb") as f:
                f.write(server_pub_key.public_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PublicFormat.SubjectPublicKeyInfo
                ))
            log.info("Saved 'known_server.pub' (public). Copy this file to your client's directory.")
            log.warning("You must still generate client keys and add their public keys to 'authorized_clients.pub' for them to connect.")

    def run(self) -> None:
        """
        Starts the server's main loop to listen for and handle connections.

        This method calls `serve_forever()`, which blocks and waits for incoming
        connections. Each connection is then passed to an instance of
        `E2EEFTPRequestHandler` for processing in a new thread.
        """
        self._generate_server_keys_if_missing()
        log.info(f"host: {self.host}, port: {self.port}")
        log.info("press ctrl+c to exit")
        log.info(f"Server listening on {self.host}:{self.port}")
        try:
            self.serve_forever()
        except KeyboardInterrupt:
            log.warning("Shutting down...")
        finally:
            self.server_close()

A multi-threaded TCP server for secure file transfers.

This server uses socketserver.ThreadingTCPServer to handle each incoming client connection in a separate thread. This allows for concurrent file transfer operations.

Security is established on a per-connection basis using an Elliptic Curve Diffie-Hellman (ECDH) key exchange. This generates a unique, ephemeral session key for each client, providing forward secrecy. All file data transferred after the handshake is encrypted with this key.

The server listens for commands like SEND, GET, LIST, and DELETE.

Attributes

allow_reuse_address : bool
Allows the server to restart and bind to the same address quickly.
host : str
The IP address the server is bound to.
port : int
The port the server is listening on.

Supported Status Codes: - 200: Success - 226: Transfer Complete - 400: Bad Request / Invalid Command - 404: File Not Found - 500: Internal Server Error (e.g., Decryption Failed)

The header format is: - SEND: SEND|filename|encrypted_data - GET: GET|filename - LIST: LIST - DELETE: DELETE|filename

Initializes the server and binds it to a host and port.

Args

host : str
The network interface IP to bind to. Defaults to '127.0.0.1' (localhost). Use '0.0.0.0' to listen on all interfaces.
port : int
The port number to listen on. Defaults to 5001.
enable_logging : bool
If False, disables server logging output.

Ancestors

  • socketserver.ThreadingTCPServer
  • socketserver.ThreadingMixIn
  • socketserver.TCPServer
  • socketserver.BaseServer

Class variables

var allow_reuse_address

The type of the None singleton.

Methods

def run(self) ‑> None
Expand source code
def run(self) -> None:
    """
    Starts the server's main loop to listen for and handle connections.

    This method calls `serve_forever()`, which blocks and waits for incoming
    connections. Each connection is then passed to an instance of
    `E2EEFTPRequestHandler` for processing in a new thread.
    """
    self._generate_server_keys_if_missing()
    log.info(f"host: {self.host}, port: {self.port}")
    log.info("press ctrl+c to exit")
    log.info(f"Server listening on {self.host}:{self.port}")
    try:
        self.serve_forever()
    except KeyboardInterrupt:
        log.warning("Shutting down...")
    finally:
        self.server_close()

Starts the server's main loop to listen for and handle connections.

This method calls serve_forever(), which blocks and waits for incoming connections. Each connection is then passed to an instance of E2EEFTPRequestHandler for processing in a new thread.