Module linux_plus_plus.apps.ssh_daemon

Classes

class SSHDaemon (shell: Shell)
Expand source code
class SSHDaemon:
    """
    A real SSH server built on paramiko's Transport layer.

    Features:
      - RSA host key (auto-generated, saved to ~/.linuxpp/ssh/host_rsa_key)
      - Password auth via OS database (PAM → spwd → single-user fallback)
      - Public key auth via ~/.ssh/authorized_keys
      - Each client gets an isolated linux++ shell session
      - Runs in a background daemon thread — non-blocking
      - Multiple concurrent clients supported

    Commands:
      sshd start [port]   start (default port 2222)
      sshd stop           stop and disconnect all clients
      sshd status         show port and connected clients
      sshd keygen         regenerate the host key

    Requires: pip install paramiko
    """

    KEY_DIR       = os.path.join(os.path.expanduser("~"), ".linuxpp", "ssh")
    HOST_KEY_PATH = os.path.join(KEY_DIR, "host_rsa_key")
    DEFAULT_PORT  = 2222
    BANNER        = b"linux++ sshd\r\n"

    def __init__(self, shell: "Shell"):
        self._shell       = shell
        self._port        = self.DEFAULT_PORT
        self._server_sock: Optional[socket.socket] = None
        self._thread:      Optional[threading.Thread] = None
        self._running     = False
        self._clients:    list[dict] = []
        self._host_key    = None
        self._lock        = threading.Lock()

    # ------------------------------------------------------------------
    # Public commands
    # ------------------------------------------------------------------

    def start(self, port: int = DEFAULT_PORT) -> int:
        if self._running:
            IOManager.error(f"sshd: already running on port {self._port}")
            return 1
        try:
            import paramiko  # type: ignore
        except ImportError:
            IOManager.error(
                "sshd: paramiko is required.\n"
                "      pip install paramiko"
            )
            return 1

        self._port     = port
        self._host_key = self._load_or_generate_key(paramiko)
        if self._host_key is None:
            return 1

        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind(("0.0.0.0", port))
            sock.listen(10)
            sock.settimeout(1.0)
            self._server_sock = sock
        except OSError as e:
            IOManager.error(f"sshd: cannot bind port {port}: {e}")
            return 1

        self._running = True
        self._thread  = threading.Thread(
            target=self._accept_loop,
            args=(paramiko,),
            daemon=True,
            name="sshd-accept",
        )
        self._thread.start()

        IOManager.write(
            f"sshd: listening on 0.0.0.0:{port}\n"
            f"sshd: host key fingerprint: {self._fingerprint(self._host_key)}\n"
            f"sshd: connect with:  ssh {EnvManager.username()}@localhost -p {port}"
        )
        return 0

    def stop(self) -> int:
        if not self._running:
            IOManager.error("sshd: not running")
            return 1
        self._running = False
        if self._server_sock:
            try:
                self._server_sock.close()
            except Exception:
                pass
            self._server_sock = None
        with self._lock:
            for c in self._clients:
                try:
                    c["transport"].close()
                except Exception:
                    pass
            self._clients.clear()
        IOManager.write("sshd: stopped")
        return 0

    def status(self) -> int:
        if not self._running:
            IOManager.write("sshd: not running")
            return 0
        with self._lock:
            clients = list(self._clients)
        IOManager.write(
            f"sshd: running on port {self._port}\n"
            f"sshd: {len(clients)} client(s) connected"
        )
        for c in clients:
            addr = c.get("addr", ("?", 0))
            user = c.get("user", "?")
            IOManager.write(f"       {user}@{addr[0]}:{addr[1]}")
        return 0

    def keygen(self) -> int:
        try:
            import paramiko  # type: ignore
        except ImportError:
            IOManager.error("sshd: paramiko required — pip install paramiko")
            return 1
        os.makedirs(self.KEY_DIR, exist_ok=True)
        key = paramiko.RSAKey.generate(2048)
        key.write_private_key_file(self.HOST_KEY_PATH)
        if not IS_WINDOWS:
            os.chmod(self.HOST_KEY_PATH, 0o600)
        IOManager.write(
            f"sshd: new host key → {self.HOST_KEY_PATH}\n"
            f"sshd: fingerprint:   {self._fingerprint(key)}"
        )
        return 0

    # ------------------------------------------------------------------
    # Accept loop
    # ------------------------------------------------------------------

    def _accept_loop(self, paramiko) -> None:
        while self._running:
            try:
                conn, addr = self._server_sock.accept()
            except socket.timeout:
                continue
            except OSError:
                break
            threading.Thread(
                target=self._handle_client,
                args=(paramiko, conn, addr),
                daemon=True,
                name=f"sshd-{addr[0]}:{addr[1]}",
            ).start()

    # ------------------------------------------------------------------
    # Per-client handler
    # ------------------------------------------------------------------

    def _handle_client(self, paramiko, conn: socket.socket, addr: tuple) -> None:
        transport = None
        iface     = _SSHServerInterface()
        entry     = {"addr": addr, "transport": None, "user": "?"}

        try:
            transport = paramiko.Transport(conn)
            transport.add_server_key(self._host_key)
            transport.start_server(server=iface)

            entry["transport"] = transport
            with self._lock:
                self._clients.append(entry)

            chan = transport.accept(30)
            if chan is None:
                return

            iface.shell_event.wait(10)
            entry["user"] = iface.username or "?"

            if iface.exec_command:
                self._run_command(chan, iface.exec_command)
            else:
                self._run_shell(chan)

        except Exception:
            pass
        finally:
            if transport:
                try:
                    transport.close()
                except Exception:
                    pass
            try:
                conn.close()
            except Exception:
                pass
            with self._lock:
                self._clients[:] = [c for c in self._clients if c is not entry]

    def _run_command(self, chan, command: str) -> None:
        """Execute one command in a fresh shell and send output back."""
        import io as _io
        from ..kernel import Kernel
        from ..shell  import Shell as _Shell

        k  = Kernel()
        sh = _Shell(k)
        buf = _io.StringIO()
        old_out, old_err = sys.stdout, sys.stderr
        sys.stdout = sys.stderr = buf
        try:
            rc = sh.execute_line(command)
        finally:
            sys.stdout, sys.stderr = old_out, old_err

        output = buf.getvalue().replace("\n", "\r\n")
        try:
            if output:
                chan.send(output.encode("utf-8", errors="replace"))
            chan.send_exit_status(rc)
        except Exception:
            pass
        finally:
            chan.close()

    def _run_shell(self, chan) -> None:
        """Interactive shell session over the SSH channel."""
        import io as _io
        import re as _re
        from ..kernel import Kernel
        from ..shell  import Shell as _Shell

        k  = Kernel()
        sh = _Shell(k)

        chan.send(self.BANNER)

        try:
            while sh.running:
                # build and strip ANSI from prompt (client renders its own)
                raw_prompt = sh._prompt()
                plain_prompt = _re.sub(r'\033\[[0-9;]*[mGKHF]', '', raw_prompt)
                # also strip readline \001..\002 markers
                plain_prompt = plain_prompt.replace("\001", "").replace("\002", "")
                chan.send(plain_prompt.encode("utf-8", errors="replace"))

                # read one line char-by-char
                line = ""
                while True:
                    try:
                        data = chan.recv(1)
                    except Exception:
                        return
                    if not data:
                        return
                    ch = data.decode("utf-8", errors="replace")

                    if ch in ("\r", "\n"):
                        chan.send(b"\r\n")
                        break
                    elif ch == "\x03":       # Ctrl+C
                        chan.send(b"^C\r\n")
                        line = ""
                        break
                    elif ch == "\x04":       # Ctrl+D / EOF
                        chan.send(b"logout\r\n")
                        return
                    elif ch in ("\x7f", "\x08"):  # Backspace
                        if line:
                            line = line[:-1]
                            chan.send(b"\x08 \x08")
                    elif ord(ch) >= 32:
                        line += ch
                        chan.send(ch.encode())

                if not line.strip():
                    continue

                # capture output
                buf = _io.StringIO()
                old_out, old_err = sys.stdout, sys.stderr
                sys.stdout = sys.stderr = buf
                try:
                    sh.execute_line(line)
                finally:
                    sys.stdout, sys.stderr = old_out, old_err

                out = buf.getvalue()
                if out:
                    # convert bare \n to \r\n for SSH terminals
                    out = out.replace("\n", "\r\n")
                    chan.send(out.encode("utf-8", errors="replace"))

        except Exception:
            pass
        finally:
            try:
                chan.close()
            except Exception:
                pass

    # ------------------------------------------------------------------
    # Key management
    # ------------------------------------------------------------------

    def _load_or_generate_key(self, paramiko):
        os.makedirs(self.KEY_DIR, exist_ok=True)
        if os.path.isfile(self.HOST_KEY_PATH):
            try:
                key = paramiko.RSAKey(filename=self.HOST_KEY_PATH)
                IOManager.write(f"sshd: loaded host key from {self.HOST_KEY_PATH}")
                return key
            except Exception as e:
                IOManager.write(f"sshd: could not load host key ({e}), regenerating...")

        IOManager.write("sshd: generating RSA host key ...")
        try:
            key = paramiko.RSAKey.generate(2048)
            key.write_private_key_file(self.HOST_KEY_PATH)
            if not IS_WINDOWS:
                os.chmod(self.HOST_KEY_PATH, 0o600)
            IOManager.write(f"sshd: host key saved to {self.HOST_KEY_PATH}")
            return key
        except Exception as e:
            IOManager.error(f"sshd: failed to generate host key: {e}")
            return None

    @staticmethod
    def _fingerprint(key) -> str:
        data   = key.asbytes()
        digest = hashlib.md5(data).hexdigest()
        return ":".join(digest[i:i+2] for i in range(0, 32, 2))

A real SSH server built on paramiko's Transport layer.

Features

  • RSA host key (auto-generated, saved to ~/.linuxpp/ssh/host_rsa_key)
  • Password auth via OS database (PAM → spwd → single-user fallback)
  • Public key auth via ~/.ssh/authorized_keys
  • Each client gets an isolated linux++ shell session
  • Runs in a background daemon thread — non-blocking
  • Multiple concurrent clients supported

Commands

sshd start [port] start (default port 2222) sshd stop stop and disconnect all clients sshd status show port and connected clients sshd keygen regenerate the host key

Requires: pip install paramiko

Class variables

var BANNER

The type of the None singleton.

var DEFAULT_PORT

The type of the None singleton.

var HOST_KEY_PATH

The type of the None singleton.

var KEY_DIR

The type of the None singleton.

Methods

def keygen(self) ‑> int
Expand source code
def keygen(self) -> int:
    try:
        import paramiko  # type: ignore
    except ImportError:
        IOManager.error("sshd: paramiko required — pip install paramiko")
        return 1
    os.makedirs(self.KEY_DIR, exist_ok=True)
    key = paramiko.RSAKey.generate(2048)
    key.write_private_key_file(self.HOST_KEY_PATH)
    if not IS_WINDOWS:
        os.chmod(self.HOST_KEY_PATH, 0o600)
    IOManager.write(
        f"sshd: new host key → {self.HOST_KEY_PATH}\n"
        f"sshd: fingerprint:   {self._fingerprint(key)}"
    )
    return 0
def start(self, port: int = 2222) ‑> int
Expand source code
def start(self, port: int = DEFAULT_PORT) -> int:
    if self._running:
        IOManager.error(f"sshd: already running on port {self._port}")
        return 1
    try:
        import paramiko  # type: ignore
    except ImportError:
        IOManager.error(
            "sshd: paramiko is required.\n"
            "      pip install paramiko"
        )
        return 1

    self._port     = port
    self._host_key = self._load_or_generate_key(paramiko)
    if self._host_key is None:
        return 1

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(("0.0.0.0", port))
        sock.listen(10)
        sock.settimeout(1.0)
        self._server_sock = sock
    except OSError as e:
        IOManager.error(f"sshd: cannot bind port {port}: {e}")
        return 1

    self._running = True
    self._thread  = threading.Thread(
        target=self._accept_loop,
        args=(paramiko,),
        daemon=True,
        name="sshd-accept",
    )
    self._thread.start()

    IOManager.write(
        f"sshd: listening on 0.0.0.0:{port}\n"
        f"sshd: host key fingerprint: {self._fingerprint(self._host_key)}\n"
        f"sshd: connect with:  ssh {EnvManager.username()}@localhost -p {port}"
    )
    return 0
def status(self) ‑> int
Expand source code
def status(self) -> int:
    if not self._running:
        IOManager.write("sshd: not running")
        return 0
    with self._lock:
        clients = list(self._clients)
    IOManager.write(
        f"sshd: running on port {self._port}\n"
        f"sshd: {len(clients)} client(s) connected"
    )
    for c in clients:
        addr = c.get("addr", ("?", 0))
        user = c.get("user", "?")
        IOManager.write(f"       {user}@{addr[0]}:{addr[1]}")
    return 0
def stop(self) ‑> int
Expand source code
def stop(self) -> int:
    if not self._running:
        IOManager.error("sshd: not running")
        return 1
    self._running = False
    if self._server_sock:
        try:
            self._server_sock.close()
        except Exception:
            pass
        self._server_sock = None
    with self._lock:
        for c in self._clients:
            try:
                c["transport"].close()
            except Exception:
                pass
        self._clients.clear()
    IOManager.write("sshd: stopped")
    return 0