From 94cf528c9ffbfae3fa62472fd60232900bab4609 Mon Sep 17 00:00:00 2001 From: EthanShoeDev <13422990+EthanShoeDev@users.noreply.github.com> Date: Mon, 22 Sep 2025 00:15:44 -0400 Subject: [PATCH] g --- .../rust/uniffi-russh/src/lib.rs | 1028 ----------------- .../rust/uniffi-russh/src/private_key.rs | 203 ++++ .../rust/uniffi-russh/src/ssh_connection.rs | 303 +++++ .../rust/uniffi-russh/src/ssh_shell.rs | 508 ++++++++ .../rust/uniffi-russh/src/utils.rs | 49 + 5 files changed, 1063 insertions(+), 1028 deletions(-) create mode 100644 packages/react-native-uniffi-russh/rust/uniffi-russh/src/private_key.rs create mode 100644 packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_connection.rs create mode 100644 packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_shell.rs create mode 100644 packages/react-native-uniffi-russh/rust/uniffi-russh/src/utils.rs diff --git a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs index 41799f2..acb9fcb 100644 --- a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs +++ b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs @@ -28,1031 +28,3 @@ use ed25519_dalek::SigningKey; uniffi::setup_scaffolding!(); - -#[derive(Debug, Clone, PartialEq, uniffi::Enum)] -pub enum Security { - Password { password: String }, - Key { private_key_content: String }, // (key-based auth can be wired later) -} - -#[derive(Debug, Clone, PartialEq, uniffi::Record)] -pub struct ConnectionDetails { - pub host: String, - pub port: u16, - pub username: String, - pub security: Security, -} - -#[derive(Clone, uniffi::Record)] -pub struct ConnectOptions { - pub connection_details: ConnectionDetails, - pub on_connection_progress_callback: Option>, - pub on_disconnected_callback: Option>, -} - -#[derive(Debug, Clone, Copy, PartialEq, uniffi::Enum)] -pub enum SshConnectionProgressEvent { - // Before any progress events, assume: TcpConnecting - TcpConnected, - SshHandshake, - // If promise has not resolved, assume: Authenticating - // After promise resolves, assume: Connected -} - -#[uniffi::export(with_foreign)] -pub trait ConnectProgressCallback: Send + Sync { - fn on_change(&self, status: SshConnectionProgressEvent); -} - -#[uniffi::export(with_foreign)] -pub trait ConnectionDisconnectedCallback: Send + Sync { - fn on_change(&self, connection_id: String); -} - -// Note: russh accepts an untyped string for the terminal type -#[derive(Debug, Clone, Copy, PartialEq, uniffi::Enum)] -pub enum TerminalType { - Vanilla, - Vt100, - Vt102, - Vt220, - Ansi, - Xterm, - Xterm256, -} -impl TerminalType { - fn as_ssh_name(self) -> &'static str { - match self { - TerminalType::Vanilla => "vanilla", - TerminalType::Vt100 => "vt100", - TerminalType::Vt102 => "vt102", - TerminalType::Vt220 => "vt220", - TerminalType::Ansi => "ansi", - TerminalType::Xterm => "xterm", - TerminalType::Xterm256 => "xterm-256color", - } - } -} - -#[derive(Debug, Clone, Copy, PartialEq, uniffi::Enum)] -pub enum StreamKind { Stdout, Stderr } - -#[derive(Debug, Clone, PartialEq, uniffi::Record)] -pub struct TerminalChunk { - pub seq: u64, - pub t_ms: f64, - pub stream: StreamKind, - pub bytes: Vec, -} - -#[derive(Debug, Clone, PartialEq, uniffi::Record)] -pub struct DroppedRange { pub from_seq: u64, pub to_seq: u64 } - -#[derive(Debug, Clone, PartialEq, uniffi::Enum)] -pub enum ShellEvent { - Chunk(TerminalChunk), - Dropped { from_seq: u64, to_seq: u64 }, -} - -#[uniffi::export(with_foreign)] -pub trait ShellListener: Send + Sync { - fn on_event(&self, ev: ShellEvent); -} - -/// Key types for generation -#[derive(Debug, Clone, Copy, PartialEq, uniffi::Enum)] -pub enum KeyType { - Rsa, - Ecdsa, - Ed25519, - Ed448, -} - -#[derive(Debug, Clone, Copy, PartialEq, uniffi::Record)] -pub struct TerminalMode { - pub opcode: u8, // PTY opcode (matches russh::Pty discriminants) - pub value: u32, -} - -#[derive(Debug, Clone, Copy, PartialEq, uniffi::Record)] -pub struct TerminalSize { - pub row_height: Option, - pub col_width: Option, -} - -#[derive(Debug, Clone, Copy, PartialEq, uniffi::Record)] -pub struct TerminalPixelSize { - pub pixel_width: Option, - pub pixel_height: Option, -} - -#[derive(Clone, uniffi::Record)] -pub struct StartShellOptions { - pub term: TerminalType, - pub terminal_mode: Option>, - pub terminal_size: Option, - pub terminal_pixel_size: Option, - pub on_closed_callback: Option>, -} - -#[uniffi::export(with_foreign)] -pub trait ShellClosedCallback: Send + Sync { - fn on_change(&self, channel_id: u32); -} - -#[derive(Debug, Clone, PartialEq, uniffi::Record)] -pub struct SshConnectionInfoProgressTimings { - // TODO: We should have a field for each SshConnectionProgressEvent. Would be great if this were enforced by the compiler. - pub tcp_established_at_ms: f64, - pub ssh_handshake_at_ms: f64, -} - -/// Snapshot of current connection info for property-like access in TS. -#[derive(Debug, Clone, PartialEq, uniffi::Record)] -pub struct SshConnectionInfo { - pub connection_id: String, - pub connection_details: ConnectionDetails, - pub created_at_ms: f64, - pub connected_at_ms: f64, - pub progress_timings: SshConnectionInfoProgressTimings, -} - -/// Snapshot of shell session info for property-like access in TS. -#[derive(Debug, Clone, PartialEq, uniffi::Record)] -pub struct ShellSessionInfo { - pub channel_id: u32, - pub created_at_ms: f64, - pub connected_at_ms: f64, - pub term: TerminalType, - pub connection_id: String, -} - - -#[derive(uniffi::Object)] -pub struct SshConnection { - info: SshConnectionInfo, - on_disconnected_callback: Option>, - client_handle: AsyncMutex>, - - shells: AsyncMutex>>, - - // Weak self for child sessions to refer back without cycles. - self_weak: AsyncMutex>, -} - -#[derive(uniffi::Object)] -pub struct ShellSession { - info: ShellSessionInfo, - on_closed_callback: Option>, - - // Weak backref; avoid retain cycle. - parent: std::sync::Weak, - - writer: AsyncMutex>, - // We keep the reader task to allow cancellation on close. - reader_task: tokio::task::JoinHandle<()>, - - // Ring buffer - ring: Arc>>>, - ring_bytes_capacity: Arc, - used_bytes: Arc>, - dropped_bytes_total: Arc, - head_seq: Arc, - tail_seq: Arc, - - // Live broadcast - sender: broadcast::Sender>, - - // Listener tasks management - listener_tasks: Arc>>>, - next_listener_id: AtomicU64, - default_coalesce_ms: AtomicU64, - rt_handle: tokio::runtime::Handle, -} - - -#[derive(Debug, Clone, PartialEq, uniffi::Enum)] -pub enum Cursor { - Head, // start from the beginning - TailBytes { bytes: u64 }, // start from the end of the last N bytes - Seq { seq: u64 }, // start from the given sequence number - TimeMs { t_ms: f64 }, // start from the given time in milliseconds - Live, // start from the live stream -} - -#[derive(Debug, Clone, PartialEq, uniffi::Record)] -pub struct ListenerOptions { - pub cursor: Cursor, - pub coalesce_ms: Option, // coalesce chunks into this many milliseconds -} - -#[derive(Debug, Clone, PartialEq, uniffi::Record)] -pub struct BufferReadResult { - pub chunks: Vec, - pub next_seq: u64, - pub dropped: Option, -} - -#[derive(Debug, Clone, PartialEq, uniffi::Record)] -pub struct BufferStats { - pub ring_bytes_count: u64, - pub used_bytes: u64, - pub head_seq: u64, - pub tail_seq: u64, - pub dropped_bytes_total: u64, - - pub chunks_count: u64, -} - - - -impl fmt::Debug for SshConnection { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("SshConnectionHandle") - .field("info.connection_details", &self.info.connection_details) - .field("info.created_at_ms", &self.info.created_at_ms) - .field("info.connected_at_ms", &self.info.connected_at_ms) - .finish() - } -} - -// Internal chunk type kept in ring/broadcast -#[derive(Debug)] -struct Chunk { // TODO: This is very similar to TerminalChunk. The only difference is the bytes field - seq: u64, - t_ms: f64, - stream: StreamKind, - bytes: Bytes, -} - -/// Minimal client::Handler. -struct NoopHandler; -impl client::Handler for NoopHandler { - type Error = SshError; - // Accept any server key for now so dev UX isn't blocked. - // TODO: Add known-hosts verification and surface API to control this. - #[allow(unused_variables)] - fn check_server_key( - &mut self, - _server_public_key: &russh::keys::PublicKey, - ) -> impl std::future::Future::Error>> + std::marker::Send { - std::future::ready(Ok(true)) - } -} - - -/// ---------- Methods ---------- -static DEFAULT_TERMINAL_MODES: &[(russh::Pty, u32)] = &[ - (russh::Pty::ECHO, 1), // This will cause the terminal to echo the characters back to the client. - (russh::Pty::ECHOK, 1), // After the line-kill character (often Ctrl+U), echo a newline. - (russh::Pty::ECHOE, 1), // Visually erase on backspace (erase using BS-SP-BS sequence). - (russh::Pty::ICANON, 1), // Canonical (cooked) mode: line editing; input delivered line-by-line. - (russh::Pty::ISIG, 1), // Generate signals on special chars (e.g., Ctrl+C -> SIGINT, Ctrl+Z -> SIGTSTP). - (russh::Pty::ICRNL, 1), // Convert carriage return (CR, \r) to newline (NL, \n) on input. - (russh::Pty::ONLCR, 1), // Convert newline (NL) to CR+NL on output (LF -> CRLF). - (russh::Pty::TTY_OP_ISPEED, 38400), // Set input baud rate (here 38400). The baud rate is the number of characters per second. - (russh::Pty::TTY_OP_OSPEED, 38400), // Set output baud rate (here 38400). The baud rate is the number of characters per second. -]; - -static DEFAULT_TERM_ROW_HEIGHT: u32 = 24; -static DEFAULT_TERM_COL_WIDTH: u32 = 80; -static DEFAULT_TERM_PIXEL_WIDTH: u32 = 0; -static DEFAULT_TERM_PIXEL_HEIGHT: u32 = 0; -static DEFAULT_TERM_COALESCE_MS: u64 = 16; - -// Number of recent live chunks retained by the broadcast channel for each -// subscriber. If a subscriber falls behind this many messages, they will get a -// Lagged error and skip to the latest. Tune to: peak_chunks_per_sec × max_pause_sec. -static DEFAULT_BROADCAST_CHUNK_CAPACITY: usize = 1024; - -// Byte budget for the on-heap replay/history ring buffer. When the total bytes -// of stored chunks exceed this, oldest chunks are evicted. Increase for a -// longer replay window at the cost of memory. -static DEFAULT_SHELL_RING_BUFFER_CAPACITY: usize = 2 * 1024 * 1024; // default 2MiB - -// Upper bound for the size of a single appended/broadcast chunk. Incoming data -// is split into slices no larger than this. Smaller values reduce latency and -// loss impact; larger values reduce per-message overhead. -static DEFAULT_MAX_CHUNK_SIZE: usize = 16 * 1024; // 16KB - -static DEFAULT_READ_BUFFER_MAX_BYTES: u64 = 512 * 1024; // 512KB - -#[uniffi::export(async_runtime = "tokio")] -impl SshConnection { - /// Convenience snapshot for property-like access in TS. - pub fn get_info(&self) -> SshConnectionInfo { - self.info.clone() - } - - pub async fn start_shell(&self, opts: StartShellOptions) -> Result, SshError> { - - let started_at_ms = now_ms(); - - let term = opts.term; - let on_closed_callback = opts.on_closed_callback.clone(); - - let client_handle = self.client_handle.lock().await; - - let ch = client_handle.channel_open_session().await?; - let channel_id: u32 = ch.id().into(); - - let mut modes: Vec<(russh::Pty, u32)> = DEFAULT_TERMINAL_MODES.to_vec(); - if let Some(terminal_mode_params) = &opts.terminal_mode { - for m in terminal_mode_params { - if let Some(pty) = russh::Pty::from_u8(m.opcode) { - if let Some(pos) = modes.iter().position(|(p, _)| *p as u8 == m.opcode) { - modes[pos].1 = m.value; // override - } else { - modes.push((pty, m.value)); // add - } - } - } - } - - let row_height = opts.terminal_size.as_ref().and_then(|s| s.row_height).unwrap_or(DEFAULT_TERM_ROW_HEIGHT); - let col_width = opts.terminal_size.as_ref().and_then(|s| s.col_width).unwrap_or(DEFAULT_TERM_COL_WIDTH); - let pixel_width = opts.terminal_pixel_size.as_ref().and_then(|s| s.pixel_width).unwrap_or(DEFAULT_TERM_PIXEL_WIDTH); - let pixel_height= opts.terminal_pixel_size.as_ref().and_then(|s| s.pixel_height).unwrap_or(DEFAULT_TERM_PIXEL_HEIGHT); - - ch.request_pty(true, term.as_ssh_name(), col_width, row_height, pixel_width, pixel_height, &modes).await?; - ch.request_shell(true).await?; - - // Split for read/write; spawn reader. - let (mut reader, writer) = ch.split(); - - // Setup ring + broadcast for this session - let (tx, _rx) = broadcast::channel::>(DEFAULT_BROADCAST_CHUNK_CAPACITY); - let ring = Arc::new(Mutex::new(std::collections::VecDeque::>::new())); - let used_bytes = Arc::new(Mutex::new(0usize)); - let next_seq = Arc::new(AtomicU64::new(1)); - let head_seq = Arc::new(AtomicU64::new(1)); - let tail_seq = Arc::new(AtomicU64::new(0)); - let dropped_bytes_total = Arc::new(AtomicU64::new(0)); - let ring_bytes_capacity = Arc::new(AtomicUsize::new(DEFAULT_SHELL_RING_BUFFER_CAPACITY)); - let default_coalesce_ms = AtomicU64::new(DEFAULT_TERM_COALESCE_MS); - - let ring_clone = ring.clone(); - let used_bytes_clone = used_bytes.clone(); - let tx_clone = tx.clone(); - let ring_bytes_capacity_c = ring_bytes_capacity.clone(); - let dropped_bytes_total_c = dropped_bytes_total.clone(); - let head_seq_c = head_seq.clone(); - let tail_seq_c = tail_seq.clone(); - let next_seq_c = next_seq.clone(); - - let on_closed_callback_for_reader = on_closed_callback.clone(); - - let reader_task = tokio::spawn(async move { - let max_chunk = DEFAULT_MAX_CHUNK_SIZE; - loop { - match reader.wait().await { - Some(ChannelMsg::Data { data }) => { - append_and_broadcast( - &data, - StreamKind::Stdout, - &ring_clone, - &used_bytes_clone, - &ring_bytes_capacity_c, - &dropped_bytes_total_c, - &head_seq_c, - &tail_seq_c, - &next_seq_c, - &tx_clone, - max_chunk, - ); - } - Some(ChannelMsg::ExtendedData { data, .. }) => { - append_and_broadcast( - &data, - StreamKind::Stderr, - &ring_clone, - &used_bytes_clone, - &ring_bytes_capacity_c, - &dropped_bytes_total_c, - &head_seq_c, - &tail_seq_c, - &next_seq_c, - &tx_clone, - max_chunk, - ); - } - Some(ChannelMsg::Close) | None => { - if let Some(sl) = on_closed_callback_for_reader.as_ref() { - sl.on_change(channel_id); - } - break; - } - _ => {} - } - } - }); - - let session = Arc::new(ShellSession { - info: ShellSessionInfo { - channel_id, - created_at_ms: started_at_ms, - connected_at_ms: now_ms(), - term, - connection_id: self.info.connection_id.clone(), - }, - on_closed_callback, - parent: self.self_weak.lock().await.clone(), - - writer: AsyncMutex::new(writer), - reader_task, - - // Ring buffer - ring, - ring_bytes_capacity, - used_bytes, - dropped_bytes_total, - head_seq, - tail_seq, - - // Listener tasks management - sender: tx, - listener_tasks: Arc::new(Mutex::new(HashMap::new())), - next_listener_id: AtomicU64::new(1), - default_coalesce_ms, - rt_handle: tokio::runtime::Handle::current(), - }); - - self.shells.lock().await.insert(channel_id, session.clone()); - - - Ok(session) - } - - - pub async fn disconnect(&self) -> Result<(), SshError> { - // TODO: Check if we need to close all these if we are about to disconnect? - let sessions: Vec> = { - let map = self.shells.lock().await; - map.values().cloned().collect() - }; - for s in sessions { - s.close().await?; - } - - let h = self.client_handle.lock().await; - h.disconnect(Disconnect::ByApplication, "bye", "").await?; - - if let Some(on_disconnected_callback) = self.on_disconnected_callback.as_ref() { - on_disconnected_callback.on_change(self.info.connection_id.clone()); - } - - Ok(()) - } -} - -#[uniffi::export(async_runtime = "tokio")] -impl ShellSession { - pub fn get_info(&self) -> ShellSessionInfo { - self.info.clone() - } - - /// Send bytes to the active shell (stdin). - pub async fn send_data(&self, data: Vec) -> Result<(), SshError> { - let w = self.writer.lock().await; - w.data(&data[..]).await?; - Ok(()) - } - - /// Close the associated shell channel and stop its reader task. - pub async fn close(&self) -> Result<(), SshError> { self.close_internal().await } - - /// Buffer statistics snapshot. - pub fn buffer_stats(&self) -> BufferStats { - let used = *self.used_bytes.lock().unwrap_or_else(|p| p.into_inner()) as u64; - let chunks_count = match self.ring.lock() { Ok(q) => q.len() as u64, Err(p) => p.into_inner().len() as u64 }; - BufferStats { - ring_bytes_count: self.ring_bytes_capacity.load(Ordering::Relaxed) as u64, - used_bytes: used, - chunks_count, - head_seq: self.head_seq.load(Ordering::Relaxed), - tail_seq: self.tail_seq.load(Ordering::Relaxed), - dropped_bytes_total: self.dropped_bytes_total.load(Ordering::Relaxed), - } - } - - /// Current next sequence number. - pub fn current_seq(&self) -> u64 { self.tail_seq.load(Ordering::Relaxed).saturating_add(1) } - - /// Read the ring buffer from a cursor. - pub fn read_buffer(&self, cursor: Cursor, max_bytes: Option) -> BufferReadResult { - let max_total = max_bytes.unwrap_or(DEFAULT_READ_BUFFER_MAX_BYTES) as usize; - let mut out_chunks: Vec = Vec::new(); - let mut dropped: Option = None; - let head_seq_now = self.head_seq.load(Ordering::Relaxed); - let tail_seq_now = self.tail_seq.load(Ordering::Relaxed); - - // Lock ring to determine start and collect arcs, then drop lock. - let (_start_idx_unused, _start_seq, arcs): (usize, u64, Vec>) = { - let ring = match self.ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; - let (start_seq, idx) = match cursor { - Cursor::Head => (head_seq_now, 0usize), - Cursor::Seq { seq: mut s } => { - if s < head_seq_now { dropped = Some(DroppedRange { from_seq: s, to_seq: head_seq_now - 1 }); s = head_seq_now; } - let idx = s.saturating_sub(head_seq_now) as usize; - (s, idx.min(ring.len())) - } - Cursor::TimeMs { t_ms: t } => { - // linear scan to find first chunk with t_ms >= t - let mut idx = 0usize; let mut s = head_seq_now; - for (i, ch) in ring.iter().enumerate() { if ch.t_ms >= t { idx = i; s = ch.seq; break; } } - (s, idx) - } - Cursor::TailBytes { bytes: n } => { - // Walk from tail backwards until approx n bytes, then forward. - let mut bytes = 0usize; let mut idx = ring.len(); - for i in (0..ring.len()).rev() { - let b = ring[i].bytes.len(); - if bytes >= n as usize { idx = i + 1; break; } - bytes += b; idx = i; - } - let s = if idx < ring.len() { ring[idx].seq } else { tail_seq_now.saturating_add(1) }; - (s, idx) - } - Cursor::Live => (tail_seq_now.saturating_add(1), ring.len()), - }; - let arcs: Vec> = ring.iter().skip(idx).cloned().collect(); - (idx, start_seq, arcs) - }; - - // Build output respecting max_bytes - let mut total = 0usize; - for ch in arcs { - let len = ch.bytes.len(); - if total + len > max_total { break; } - out_chunks.push(TerminalChunk { seq: ch.seq, t_ms: ch.t_ms, stream: ch.stream, bytes: ch.bytes.clone().to_vec() }); - total += len; - } - let next_seq = if let Some(last) = out_chunks.last() { last.seq + 1 } else { tail_seq_now.saturating_add(1) }; - BufferReadResult { chunks: out_chunks, next_seq, dropped } - } - - /// Add a listener with optional replay and live follow. - pub fn add_listener(&self, listener: Arc, opts: ListenerOptions) -> Result { - // Snapshot for replay; emit from task to avoid re-entrant callbacks during FFI. - let replay = self.read_buffer(opts.cursor.clone(), None); - let mut rx = self.sender.subscribe(); - let id = self.next_listener_id.fetch_add(1, Ordering::Relaxed); - let default_coalesce_ms = self.default_coalesce_ms.load(Ordering::Relaxed) as u32; - let coalesce_ms = opts.coalesce_ms.unwrap_or(default_coalesce_ms); - - let rt = self.rt_handle.clone(); - let handle = rt.spawn(async move { - // Emit replay first - if let Some(dr) = replay.dropped.as_ref() { - listener.on_event(ShellEvent::Dropped { from_seq: dr.from_seq, to_seq: dr.to_seq }); - } - for ch in replay.chunks.into_iter() { - listener.on_event(ShellEvent::Chunk(ch)); - } - - let mut last_seq_seen: u64 = replay.next_seq.saturating_sub(1); - let mut acc: Vec = Vec::new(); - let mut acc_stream: Option; - let mut acc_last_seq: u64; - let mut acc_last_t: f64; - let window = Duration::from_millis(coalesce_ms as u64); - let mut pending_drop_from: Option = None; - - loop { - // First receive an item - let first = match rx.recv().await { - Ok(c) => c, - Err(broadcast::error::RecvError::Lagged(_n)) => { pending_drop_from = Some(last_seq_seen.saturating_add(1)); continue; } - Err(broadcast::error::RecvError::Closed) => break, - }; - if let Some(from) = pending_drop_from.take() { - if from <= first.seq.saturating_sub(1) { - listener.on_event(ShellEvent::Dropped { from_seq: from, to_seq: first.seq - 1 }); - } - } - // Start accumulating - acc.clear(); acc_stream = Some(first.stream); acc_last_seq = first.seq; acc_last_t = first.t_ms; acc.extend_from_slice(&first.bytes); - last_seq_seen = first.seq; - - // Drain within window while same stream - let mut deadline = tokio::time::Instant::now() + window; - loop { - let timeout = tokio::time::sleep_until(deadline); - tokio::pin!(timeout); - tokio::select! { - _ = &mut timeout => break, - msg = rx.recv() => { - match msg { - Ok(c) => { - if Some(c.stream) == acc_stream { acc.extend_from_slice(&c.bytes); acc_last_seq = c.seq; acc_last_t = c.t_ms; last_seq_seen = c.seq; } - else { // flush and start new - let chunk = TerminalChunk { seq: acc_last_seq, t_ms: acc_last_t, stream: acc_stream.unwrap_or(StreamKind::Stdout), bytes: std::mem::take(&mut acc) }; - listener.on_event(ShellEvent::Chunk(chunk)); - acc_stream = Some(c.stream); acc_last_seq = c.seq; acc_last_t = c.t_ms; acc.extend_from_slice(&c.bytes); last_seq_seen = c.seq; - deadline = tokio::time::Instant::now() + window; - } - } - Err(broadcast::error::RecvError::Lagged(_n)) => { pending_drop_from = Some(last_seq_seen.saturating_add(1)); break; } - Err(broadcast::error::RecvError::Closed) => { break; } - } - } - } - } - if let Some(s) = acc_stream.take() { - let chunk = TerminalChunk { seq: acc_last_seq, t_ms: acc_last_t, stream: s, bytes: std::mem::take(&mut acc) }; - listener.on_event(ShellEvent::Chunk(chunk)); - } - } - }); - if let Ok(mut map) = self.listener_tasks.lock() { map.insert(id, handle); } - Ok(id) - } - - pub fn remove_listener(&self, id: u64) { - if let Ok(mut map) = self.listener_tasks.lock() { - if let Some(h) = map.remove(&id) { h.abort(); } - } - } -} - -// Internal lifecycle helpers (not exported via UniFFI) -impl ShellSession { - async fn close_internal(&self) -> Result<(), SshError> { - // Try to close channel gracefully; ignore error. - self.writer.lock().await.close().await.ok(); - self.reader_task.abort(); - if let Some(sl) = self.on_closed_callback.as_ref() { - sl.on_change(self.info.channel_id); - } - // Clear parent's notion of active shell if it matches us. - if let Some(parent) = self.parent.upgrade() { - parent.shells.lock().await.remove(&self.info.channel_id); - } - Ok(()) - } - - // /// This was on the public interface but I don't think we need it - // pub async fn set_buffer_policy(&self, ring_bytes: Option, coalesce_ms: Option) { - // if let Some(rb) = ring_bytes { self.ring_bytes_capacity.store(rb as usize, Ordering::Relaxed); self.evict_if_needed(); } - // if let Some(cm) = coalesce_ms { self.default_coalesce_ms.store(cm as u64, Ordering::Relaxed); } - // } - - // fn evict_if_needed(&self) { - // let cap = self.ring_bytes_capacity.load(Ordering::Relaxed); - // let mut ring = match self.ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; - // let mut used = self.used_bytes.lock().unwrap_or_else(|p| p.into_inner()); - // while *used > cap { - // if let Some(front) = ring.pop_front() { - // *used -= front.bytes.len(); - // self.dropped_bytes_total.fetch_add(front.bytes.len() as u64, Ordering::Relaxed); - // self.head_seq.store(front.seq.saturating_add(1), Ordering::Relaxed); - // } else { break; } - // } - // } -} - -/// ---------- Top-level API ---------- - -#[uniffi::export(async_runtime = "tokio")] -pub async fn connect(options: ConnectOptions) -> Result, SshError> { - let started_at_ms = now_ms(); - let details = ConnectionDetails { - host: options.connection_details.host.clone(), - port: options.connection_details.port, - username: options.connection_details.username.clone(), - security: options.connection_details.security.clone(), - }; - - - // TCP - let addr = format!("{}:{}", details.host, details.port); - let socket = tokio::net::TcpStream::connect(&addr).await?; - let local_port = socket.local_addr()?.port(); - - let tcp_established_at_ms = now_ms(); - if let Some(sl) = options.on_connection_progress_callback.as_ref() { - sl.on_change(SshConnectionProgressEvent::TcpConnected); - } - let cfg = Arc::new(Config::default()); - let mut handle: ClientHandle = russh::client::connect_stream(cfg, socket, NoopHandler).await?; - let ssh_handshake_at_ms = now_ms(); - if let Some(sl) = options.on_connection_progress_callback.as_ref() { - sl.on_change(SshConnectionProgressEvent::SshHandshake); - } - let auth_result = match &details.security { - Security::Password { password } => { - handle.authenticate_password(details.username.clone(), password.clone()).await? - } - Security::Key { private_key_content } => { - // Normalize and parse using shared helper so RN-validated keys match runtime parsing. - let (_canonical, parsed) = normalize_openssh_ed25519_seed_key(private_key_content)?; - let pk_with_hash = PrivateKeyWithHashAlg::new(Arc::new(parsed), None); - handle.authenticate_publickey(details.username.clone(), pk_with_hash).await? - } - }; - if !matches!(auth_result, russh::client::AuthResult::Success) { return Err(auth_result.into()); } - - let connection_id = format!("{}@{}:{}:{}", details.username, details.host, details.port, local_port); - let conn = Arc::new(SshConnection { - info: SshConnectionInfo { - connection_id, - connection_details: details, - created_at_ms: started_at_ms, - connected_at_ms: now_ms(), - progress_timings: SshConnectionInfoProgressTimings { tcp_established_at_ms, ssh_handshake_at_ms }, - }, - client_handle: AsyncMutex::new(handle), - shells: AsyncMutex::new(HashMap::new()), - self_weak: AsyncMutex::new(Weak::new()), - on_disconnected_callback: options.on_disconnected_callback.clone(), - }); - // Initialize weak self reference. - *conn.self_weak.lock().await = Arc::downgrade(&conn); - Ok(conn) -} - -#[uniffi::export] -pub fn validate_private_key(private_key_content: String) -> Result { - // Normalize and parse once; return canonical OpenSSH string. - let (canonical, _parsed) = normalize_openssh_ed25519_seed_key(&private_key_content)?; - Ok(canonical) -} - -#[uniffi::export] -pub fn generate_key_pair(key_type: KeyType) -> Result { - let mut rng = OsRng; - let key = match key_type { - KeyType::Rsa => russh_keys::PrivateKey::random(&mut rng, Algorithm::Rsa { hash: None })?, - KeyType::Ecdsa => russh_keys::PrivateKey::random(&mut rng, Algorithm::Ecdsa { curve: EcdsaCurve::NistP256 })?, - KeyType::Ed25519 => russh_keys::PrivateKey::random(&mut rng, Algorithm::Ed25519)?, - KeyType::Ed448 => return Err(SshError::UnsupportedKeyType), - }; - Ok(key.to_openssh(LineEnding::LF)?.to_string()) -} - -fn now_ms() -> f64 { - let d = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default(); - d.as_millis() as f64 -} - -#[allow(clippy::too_many_arguments)] -fn append_and_broadcast( - data: &[u8], - stream: StreamKind, - ring: &Arc>>>, - used_bytes: &Arc>, - ring_bytes_capacity: &Arc, - dropped_bytes_total: &Arc, - head_seq: &Arc, - tail_seq: &Arc, - next_seq: &Arc, - sender: &broadcast::Sender>, - max_chunk: usize, -) { - let mut offset = 0usize; - while offset < data.len() { - let end = (offset + max_chunk).min(data.len()); - let slice = &data[offset..end]; - let seq = next_seq.fetch_add(1, Ordering::Relaxed); - let t_ms = now_ms(); - let chunk = Arc::new(Chunk { seq, t_ms, stream, bytes: Bytes::copy_from_slice(slice) }); - // push to ring - { - let mut q = match ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; - q.push_back(chunk.clone()); - } - { - let mut used = used_bytes.lock().unwrap_or_else(|p| p.into_inner()); - *used += slice.len(); - tail_seq.store(seq, Ordering::Relaxed); - // evict if needed - let cap = ring_bytes_capacity.load(Ordering::Relaxed); - if *used > cap { - let mut q = match ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; - while *used > cap { - if let Some(front) = q.pop_front() { - *used -= front.bytes.len(); - dropped_bytes_total.fetch_add(front.bytes.len() as u64, Ordering::Relaxed); - head_seq.store(front.seq.saturating_add(1), Ordering::Relaxed); - } else { break; } - } - } - } - // broadcast - let _ = sender.send(chunk); - - offset = end; - } -} - - -// TODO: Split this into different errors for each public function -#[derive(Debug, Error, uniffi::Error)] -pub enum SshError { - #[error("Disconnected")] - Disconnected, - #[error("Unsupported key type")] - UnsupportedKeyType, - #[error("Auth failed: {0}")] - Auth(String), - #[error("Shell already running")] - ShellAlreadyRunning, - #[error("russh error: {0}")] - Russh(String), - #[error("russh-keys error: {0}")] - RusshKeys(String), -} -impl From for SshError { - fn from(e: russh::Error) -> Self { SshError::Russh(e.to_string()) } -} -impl From for SshError { - fn from(e: russh_keys::Error) -> Self { SshError::RusshKeys(e.to_string()) } -} -impl From for SshError { - fn from(e: ssh_key::Error) -> Self { SshError::RusshKeys(e.to_string()) } -} -impl From for SshError { - fn from(e: russh_ssh_key::Error) -> Self { SshError::RusshKeys(e.to_string()) } -} -impl From for SshError { - fn from(e: std::io::Error) -> Self { SshError::Russh(e.to_string()) } -} -impl From for SshError { - fn from(a: russh::client::AuthResult) -> Self { - SshError::Auth(format!("{a:?}")) - } -} - - -// Best-effort fix for OpenSSH ed25519 keys that store only a 32-byte seed in -// the private section (instead of 64 bytes consisting of seed || public). -// If the input matches an unencrypted OpenSSH ed25519 key with a 32-byte -// private field, this function returns a normalized PEM string with the -// correct 64-byte private field (seed || public). Otherwise, returns None. -fn normalize_openssh_ed25519_seed_key( - input: &str, -) -> Result<(String, russh::keys::PrivateKey), russh_ssh_key::Error> { - // If it already parses, return canonical string and parsed key. - if let Ok(parsed) = russh::keys::PrivateKey::from_openssh(input) { - let canonical = parsed.to_openssh(russh_ssh_key::LineEnding::LF)?.to_string(); - return Ok((canonical, parsed)); - } - - // Try to fix seed-only Ed25519 keys and re-parse. - fn try_fix_seed_only_ed25519(input: &str) -> Option { - // Minimal OpenSSH container parse to detect seed-only Ed25519 - const HEADER: &str = "-----BEGIN OPENSSH PRIVATE KEY-----"; - const FOOTER: &str = "-----END OPENSSH PRIVATE KEY-----"; - let (start, end) = match (input.find(HEADER), input.find(FOOTER)) { - (Some(h), Some(f)) => (h + HEADER.len(), f), - _ => return None, - }; - let body = &input[start..end]; - let b64: String = body - .lines() - .map(str::trim) - .filter(|l| !l.is_empty()) - .collect::>() - .join(""); - - let raw = match base64::engine::general_purpose::STANDARD.decode(b64.as_bytes()) { - Ok(v) => v, - Err(_) => return None, - }; - - let mut idx = 0usize; - let magic = b"openssh-key-v1\0"; - if raw.len() < magic.len() || &raw[..magic.len()] != magic { return None; } - idx += magic.len(); - - fn read_u32(buf: &[u8], idx: &mut usize) -> Option { - if *idx + 4 > buf.len() { return None; } - let v = u32::from_be_bytes([buf[*idx], buf[*idx + 1], buf[*idx + 2], buf[*idx + 3]]); - *idx += 4; - Some(v) - } - fn read_string<'a>(buf: &'a [u8], idx: &mut usize) -> Option<&'a [u8]> { - let n = read_u32(buf, idx)? as usize; - if *idx + n > buf.len() { return None; } - let s = &buf[*idx..*idx + n]; - *idx += n; - Some(s) - } - - let ciphername = read_string(&raw, &mut idx)?; - let kdfname = read_string(&raw, &mut idx)?; - let _kdfopts = read_string(&raw, &mut idx)?; - if ciphername != b"none" || kdfname != b"none" { return None; } - - let nkeys = read_u32(&raw, &mut idx)? as usize; - for _ in 0..nkeys { - let _ = read_string(&raw, &mut idx)?; - } - let private_block = read_string(&raw, &mut idx)?; - - let mut pidx = 0usize; - let check1 = read_u32(private_block, &mut pidx)?; - let check2 = read_u32(private_block, &mut pidx)?; - if check1 != check2 { return None; } - - let alg = read_string(private_block, &mut pidx)?; - if alg != b"ssh-ed25519" { return None; } - let _pubkey = read_string(private_block, &mut pidx)?; - let privkey = read_string(private_block, &mut pidx)?; - let comment_bytes = read_string(private_block, &mut pidx)?; - - // Build canonical keypair bytes - let mut keypair_bytes = [0u8; 64]; - if privkey.len() == 32 { - let seed: [u8; 32] = match privkey.try_into() { Ok(a) => a, Err(_) => return None }; - let sk = SigningKey::from_bytes(&seed); - let vk = sk.verifying_key(); - let pub_bytes = vk.to_bytes(); - keypair_bytes[..32].copy_from_slice(&seed); - keypair_bytes[32..].copy_from_slice(pub_bytes.as_ref()); - } else if privkey.len() == 64 { - keypair_bytes.copy_from_slice(privkey); - } else { - return None; - } - let ed_kp = match Ed25519Keypair::from_bytes(&keypair_bytes) { Ok(k) => k, Err(_) => return None }; - let comment = String::from_utf8(comment_bytes.to_vec()).unwrap_or_default(); - let key_data = KeypairData::from(ed_kp); - let private = match ssh_key::PrivateKey::new(key_data, comment) { Ok(p) => p, Err(_) => return None }; - match private.to_openssh(LineEnding::LF) { Ok(s) => Some(s.to_string()), Err(_) => None } - } - - let candidate = try_fix_seed_only_ed25519(input).unwrap_or_else(|| input.to_string()); - let parsed = russh::keys::PrivateKey::from_openssh(&candidate)?; - let canonical = parsed.to_openssh(russh_ssh_key::LineEnding::LF)?.to_string(); - Ok((canonical, parsed)) -} - -// ---------- Unit Tests ---------- -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn validate_private_key_rejects_invalid_constant() { - let invalid_key = "this is not a private key".to_string(); - let result = validate_private_key(invalid_key); - assert!(result.is_err(), "Expected Err for invalid key content"); - } - - - #[test] - fn validate_private_key_accepts_1() { - // Generated with: ssh-keygen -t ed25519 -C "test-ed25519@fressh.com" -f ./ed25519-with-comment - let valid_key = "-----BEGIN OPENSSH PRIVATE KEY----- -b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW -QyNTUxOQAAACC7PhmC0yS0Q8LcUkRnoYCxpb4gkCjJhadvvf+TDlRBJwAAAKCX5GEsl+Rh -LAAAAAtzc2gtZWQyNTUxOQAAACC7PhmC0yS0Q8LcUkRnoYCxpb4gkCjJhadvvf+TDlRBJw -AAAEBmrg8TL0+2xypHjVpFeuQmgQf3Qn/A45Jz+zCwVgoBt7s+GYLTJLRDwtxSRGehgLGl -viCQKMmFp2+9/5MOVEEnAAAAF3Rlc3QtZWQyNTUxOUBmcmVzc2guY29tAQIDBAUG ------END OPENSSH PRIVATE KEY----- -".to_string(); - let result = validate_private_key(valid_key); - assert!(result.is_ok(), "Expected Ok for valid key content"); - } - #[test] - fn validate_private_key_accepts_2() { - // Generated with: ssh-keygen -t ed25519 -f ./ed25519-wo-comment - let valid_key = "-----BEGIN OPENSSH PRIVATE KEY----- -b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW -QyNTUxOQAAACD/icJYduvcR9JPKw9g/bPWpsgS0IAaJxlYL5yeuOaNMgAAAJjDAt7NwwLe -zQAAAAtzc2gtZWQyNTUxOQAAACD/icJYduvcR9JPKw9g/bPWpsgS0IAaJxlYL5yeuOaNMg -AAAEDYE6BYf7QlpAaJCfaxA/HN487NM9iIF7VGue/iefZIyP+Jwlh269xH0k8rD2D9s9am -yBLQgBonGVgvnJ645o0yAAAADmV0aGFuQEV0aGFuLVBDAQIDBAUGBw== ------END OPENSSH PRIVATE KEY----- -".to_string(); - let result = validate_private_key(valid_key); - assert!(result.is_ok(), "Expected Ok for valid key content"); - } - #[test] - fn validate_private_key_accepts_3() { - // Generated with: ssh-keygen -t ed25519 -C "" -N "" -f ./ed25519-wo-comment-and-hostname - let valid_key = "-----BEGIN OPENSSH PRIVATE KEY----- -b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW -QyNTUxOQAAACDt2ZcFrEhB8/B4uu30mPIi3BWWEa/wE//IUXLeL9YevAAAAIg90nGHPdJx -hwAAAAtzc2gtZWQyNTUxOQAAACDt2ZcFrEhB8/B4uu30mPIi3BWWEa/wE//IUXLeL9YevA -AAAEBMtZWpjpVnzDhYKR3V09SLohGqkW7HgMXoF8f0zf+/Pu3ZlwWsSEHz8Hi67fSY8iLc -FZYRr/AT/8hRct4v1h68AAAAAAECAwQF ------END OPENSSH PRIVATE KEY----- -".to_string(); - let result = validate_private_key(valid_key); - assert!(result.is_ok(), "Expected Ok for valid key content"); - } - #[test] - fn validate_private_key_accepts_4() { - // Generated with juicessh - let valid_key = "-----BEGIN OPENSSH PRIVATE KEY----- -b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZWQyNTUxOQAAACCh5IbLI9ypdFzNW8WvezgBrzJT/2mT9BKSdZScB4EYoQAAAJB8YyoafGMqGgAAAAtzc2gtZWQyNTUxOQAAACCh5IbLI9ypdFzNW8WvezgBrzJT/2mT9BKSdZScB4EYoQAAAECpYzHTSiKC2iehjck1n8GAp5mdGuB2J5vV+9U3MAvthKHkhssj3Kl0XM1bxa97OAGvMlP/aZP0EpJ1lJwHgRihAAAAAAECAwQFBgcICQoLDA0= ------END OPENSSH PRIVATE KEY----- -".to_string(); - let result = validate_private_key(valid_key); - assert!(result.is_ok(), "Expected Ok for valid key content"); - } -} diff --git a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/private_key.rs b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/private_key.rs new file mode 100644 index 0000000..717c0dd --- /dev/null +++ b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/private_key.rs @@ -0,0 +1,203 @@ + + + +#[derive(Debug, Clone, Copy, PartialEq, uniffi::Enum)] +pub enum KeyType { + Rsa, + Ecdsa, + Ed25519, + Ed448, +} + + +#[uniffi::export] +pub fn validate_private_key(private_key_content: String) -> Result { + // Normalize and parse once; return canonical OpenSSH string. + let (canonical, _parsed) = normalize_openssh_ed25519_seed_key(&private_key_content)?; + Ok(canonical) +} + +#[uniffi::export] +pub fn generate_key_pair(key_type: KeyType) -> Result { + let mut rng = OsRng; + let key = match key_type { + KeyType::Rsa => russh_keys::PrivateKey::random(&mut rng, Algorithm::Rsa { hash: None })?, + KeyType::Ecdsa => russh_keys::PrivateKey::random(&mut rng, Algorithm::Ecdsa { curve: EcdsaCurve::NistP256 })?, + KeyType::Ed25519 => russh_keys::PrivateKey::random(&mut rng, Algorithm::Ed25519)?, + KeyType::Ed448 => return Err(SshError::UnsupportedKeyType), + }; + Ok(key.to_openssh(LineEnding::LF)?.to_string()) +} + + + +// Best-effort fix for OpenSSH ed25519 keys that store only a 32-byte seed in +// the private section (instead of 64 bytes consisting of seed || public). +// If the input matches an unencrypted OpenSSH ed25519 key with a 32-byte +// private field, this function returns a normalized PEM string with the +// correct 64-byte private field (seed || public). Otherwise, returns None. +fn normalize_openssh_ed25519_seed_key( + input: &str, +) -> Result<(String, russh::keys::PrivateKey), russh_ssh_key::Error> { + // If it already parses, return canonical string and parsed key. + if let Ok(parsed) = russh::keys::PrivateKey::from_openssh(input) { + let canonical = parsed.to_openssh(russh_ssh_key::LineEnding::LF)?.to_string(); + return Ok((canonical, parsed)); + } + + // Try to fix seed-only Ed25519 keys and re-parse. + fn try_fix_seed_only_ed25519(input: &str) -> Option { + // Minimal OpenSSH container parse to detect seed-only Ed25519 + const HEADER: &str = "-----BEGIN OPENSSH PRIVATE KEY-----"; + const FOOTER: &str = "-----END OPENSSH PRIVATE KEY-----"; + let (start, end) = match (input.find(HEADER), input.find(FOOTER)) { + (Some(h), Some(f)) => (h + HEADER.len(), f), + _ => return None, + }; + let body = &input[start..end]; + let b64: String = body + .lines() + .map(str::trim) + .filter(|l| !l.is_empty()) + .collect::>() + .join(""); + + let raw = match base64::engine::general_purpose::STANDARD.decode(b64.as_bytes()) { + Ok(v) => v, + Err(_) => return None, + }; + + let mut idx = 0usize; + let magic = b"openssh-key-v1\0"; + if raw.len() < magic.len() || &raw[..magic.len()] != magic { return None; } + idx += magic.len(); + + fn read_u32(buf: &[u8], idx: &mut usize) -> Option { + if *idx + 4 > buf.len() { return None; } + let v = u32::from_be_bytes([buf[*idx], buf[*idx + 1], buf[*idx + 2], buf[*idx + 3]]); + *idx += 4; + Some(v) + } + fn read_string<'a>(buf: &'a [u8], idx: &mut usize) -> Option<&'a [u8]> { + let n = read_u32(buf, idx)? as usize; + if *idx + n > buf.len() { return None; } + let s = &buf[*idx..*idx + n]; + *idx += n; + Some(s) + } + + let ciphername = read_string(&raw, &mut idx)?; + let kdfname = read_string(&raw, &mut idx)?; + let _kdfopts = read_string(&raw, &mut idx)?; + if ciphername != b"none" || kdfname != b"none" { return None; } + + let nkeys = read_u32(&raw, &mut idx)? as usize; + for _ in 0..nkeys { + let _ = read_string(&raw, &mut idx)?; + } + let private_block = read_string(&raw, &mut idx)?; + + let mut pidx = 0usize; + let check1 = read_u32(private_block, &mut pidx)?; + let check2 = read_u32(private_block, &mut pidx)?; + if check1 != check2 { return None; } + + let alg = read_string(private_block, &mut pidx)?; + if alg != b"ssh-ed25519" { return None; } + let _pubkey = read_string(private_block, &mut pidx)?; + let privkey = read_string(private_block, &mut pidx)?; + let comment_bytes = read_string(private_block, &mut pidx)?; + + // Build canonical keypair bytes + let mut keypair_bytes = [0u8; 64]; + if privkey.len() == 32 { + let seed: [u8; 32] = match privkey.try_into() { Ok(a) => a, Err(_) => return None }; + let sk = SigningKey::from_bytes(&seed); + let vk = sk.verifying_key(); + let pub_bytes = vk.to_bytes(); + keypair_bytes[..32].copy_from_slice(&seed); + keypair_bytes[32..].copy_from_slice(pub_bytes.as_ref()); + } else if privkey.len() == 64 { + keypair_bytes.copy_from_slice(privkey); + } else { + return None; + } + let ed_kp = match Ed25519Keypair::from_bytes(&keypair_bytes) { Ok(k) => k, Err(_) => return None }; + let comment = String::from_utf8(comment_bytes.to_vec()).unwrap_or_default(); + let key_data = KeypairData::from(ed_kp); + let private = match ssh_key::PrivateKey::new(key_data, comment) { Ok(p) => p, Err(_) => return None }; + match private.to_openssh(LineEnding::LF) { Ok(s) => Some(s.to_string()), Err(_) => None } + } + + let candidate = try_fix_seed_only_ed25519(input).unwrap_or_else(|| input.to_string()); + let parsed = russh::keys::PrivateKey::from_openssh(&candidate)?; + let canonical = parsed.to_openssh(russh_ssh_key::LineEnding::LF)?.to_string(); + Ok((canonical, parsed)) +} + + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn validate_private_key_rejects_invalid_constant() { + let invalid_key = "this is not a private key".to_string(); + let result = validate_private_key(invalid_key); + assert!(result.is_err(), "Expected Err for invalid key content"); + } + + + #[test] + fn validate_private_key_accepts_1() { + // Generated with: ssh-keygen -t ed25519 -C "test-ed25519@fressh.com" -f ./ed25519-with-comment + let valid_key = "-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW +QyNTUxOQAAACC7PhmC0yS0Q8LcUkRnoYCxpb4gkCjJhadvvf+TDlRBJwAAAKCX5GEsl+Rh +LAAAAAtzc2gtZWQyNTUxOQAAACC7PhmC0yS0Q8LcUkRnoYCxpb4gkCjJhadvvf+TDlRBJw +AAAEBmrg8TL0+2xypHjVpFeuQmgQf3Qn/A45Jz+zCwVgoBt7s+GYLTJLRDwtxSRGehgLGl +viCQKMmFp2+9/5MOVEEnAAAAF3Rlc3QtZWQyNTUxOUBmcmVzc2guY29tAQIDBAUG +-----END OPENSSH PRIVATE KEY----- +".to_string(); + let result = validate_private_key(valid_key); + assert!(result.is_ok(), "Expected Ok for valid key content"); + } + #[test] + fn validate_private_key_accepts_2() { + // Generated with: ssh-keygen -t ed25519 -f ./ed25519-wo-comment + let valid_key = "-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW +QyNTUxOQAAACD/icJYduvcR9JPKw9g/bPWpsgS0IAaJxlYL5yeuOaNMgAAAJjDAt7NwwLe +zQAAAAtzc2gtZWQyNTUxOQAAACD/icJYduvcR9JPKw9g/bPWpsgS0IAaJxlYL5yeuOaNMg +AAAEDYE6BYf7QlpAaJCfaxA/HN487NM9iIF7VGue/iefZIyP+Jwlh269xH0k8rD2D9s9am +yBLQgBonGVgvnJ645o0yAAAADmV0aGFuQEV0aGFuLVBDAQIDBAUGBw== +-----END OPENSSH PRIVATE KEY----- +".to_string(); + let result = validate_private_key(valid_key); + assert!(result.is_ok(), "Expected Ok for valid key content"); + } + #[test] + fn validate_private_key_accepts_3() { + // Generated with: ssh-keygen -t ed25519 -C "" -N "" -f ./ed25519-wo-comment-and-hostname + let valid_key = "-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW +QyNTUxOQAAACDt2ZcFrEhB8/B4uu30mPIi3BWWEa/wE//IUXLeL9YevAAAAIg90nGHPdJx +hwAAAAtzc2gtZWQyNTUxOQAAACDt2ZcFrEhB8/B4uu30mPIi3BWWEa/wE//IUXLeL9YevA +AAAEBMtZWpjpVnzDhYKR3V09SLohGqkW7HgMXoF8f0zf+/Pu3ZlwWsSEHz8Hi67fSY8iLc +FZYRr/AT/8hRct4v1h68AAAAAAECAwQF +-----END OPENSSH PRIVATE KEY----- +".to_string(); + let result = validate_private_key(valid_key); + assert!(result.is_ok(), "Expected Ok for valid key content"); + } + #[test] + fn validate_private_key_accepts_4() { + // Generated with juicessh + let valid_key = "-----BEGIN OPENSSH PRIVATE KEY----- +b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZWQyNTUxOQAAACCh5IbLI9ypdFzNW8WvezgBrzJT/2mT9BKSdZScB4EYoQAAAJB8YyoafGMqGgAAAAtzc2gtZWQyNTUxOQAAACCh5IbLI9ypdFzNW8WvezgBrzJT/2mT9BKSdZScB4EYoQAAAECpYzHTSiKC2iehjck1n8GAp5mdGuB2J5vV+9U3MAvthKHkhssj3Kl0XM1bxa97OAGvMlP/aZP0EpJ1lJwHgRihAAAAAAECAwQFBgcICQoLDA0= +-----END OPENSSH PRIVATE KEY----- +".to_string(); + let result = validate_private_key(valid_key); + assert!(result.is_ok(), "Expected Ok for valid key content"); + } +} diff --git a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_connection.rs b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_connection.rs new file mode 100644 index 0000000..10e4410 --- /dev/null +++ b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_connection.rs @@ -0,0 +1,303 @@ + + + + + + +#[derive(Debug, Clone, PartialEq, uniffi::Enum)] +pub enum Security { + Password { password: String }, + Key { private_key_content: String }, // (key-based auth can be wired later) +} + +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct ConnectionDetails { + pub host: String, + pub port: u16, + pub username: String, + pub security: Security, +} + +#[derive(Clone, uniffi::Record)] +pub struct ConnectOptions { + pub connection_details: ConnectionDetails, + pub on_connection_progress_callback: Option>, + pub on_disconnected_callback: Option>, +} + +#[derive(Debug, Clone, Copy, PartialEq, uniffi::Enum)] +pub enum SshConnectionProgressEvent { + // Before any progress events, assume: TcpConnecting + TcpConnected, + SshHandshake, + // If promise has not resolved, assume: Authenticating + // After promise resolves, assume: Connected +} + +#[uniffi::export(with_foreign)] +pub trait ConnectProgressCallback: Send + Sync { + fn on_change(&self, status: SshConnectionProgressEvent); +} + +#[uniffi::export(with_foreign)] +pub trait ConnectionDisconnectedCallback: Send + Sync { + fn on_change(&self, connection_id: String); +} + + + +#[derive(uniffi::Object)] +pub struct SshConnection { + info: SshConnectionInfo, + on_disconnected_callback: Option>, + client_handle: AsyncMutex>, + + shells: AsyncMutex>>, + + // Weak self for child sessions to refer back without cycles. + self_weak: AsyncMutex>, +} + + + + + +impl fmt::Debug for SshConnection { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SshConnectionHandle") + .field("info.connection_details", &self.info.connection_details) + .field("info.created_at_ms", &self.info.created_at_ms) + .field("info.connected_at_ms", &self.info.connected_at_ms) + .finish() + } +} + + + + +#[uniffi::export(async_runtime = "tokio")] +impl SshConnection { + /// Convenience snapshot for property-like access in TS. + pub fn get_info(&self) -> SshConnectionInfo { + self.info.clone() + } + + pub async fn start_shell(&self, opts: StartShellOptions) -> Result, SshError> { + + let started_at_ms = now_ms(); + + let term = opts.term; + let on_closed_callback = opts.on_closed_callback.clone(); + + let client_handle = self.client_handle.lock().await; + + let ch = client_handle.channel_open_session().await?; + let channel_id: u32 = ch.id().into(); + + let mut modes: Vec<(russh::Pty, u32)> = DEFAULT_TERMINAL_MODES.to_vec(); + if let Some(terminal_mode_params) = &opts.terminal_mode { + for m in terminal_mode_params { + if let Some(pty) = russh::Pty::from_u8(m.opcode) { + if let Some(pos) = modes.iter().position(|(p, _)| *p as u8 == m.opcode) { + modes[pos].1 = m.value; // override + } else { + modes.push((pty, m.value)); // add + } + } + } + } + + let row_height = opts.terminal_size.as_ref().and_then(|s| s.row_height).unwrap_or(DEFAULT_TERM_ROW_HEIGHT); + let col_width = opts.terminal_size.as_ref().and_then(|s| s.col_width).unwrap_or(DEFAULT_TERM_COL_WIDTH); + let pixel_width = opts.terminal_pixel_size.as_ref().and_then(|s| s.pixel_width).unwrap_or(DEFAULT_TERM_PIXEL_WIDTH); + let pixel_height= opts.terminal_pixel_size.as_ref().and_then(|s| s.pixel_height).unwrap_or(DEFAULT_TERM_PIXEL_HEIGHT); + + ch.request_pty(true, term.as_ssh_name(), col_width, row_height, pixel_width, pixel_height, &modes).await?; + ch.request_shell(true).await?; + + // Split for read/write; spawn reader. + let (mut reader, writer) = ch.split(); + + // Setup ring + broadcast for this session + let (tx, _rx) = broadcast::channel::>(DEFAULT_BROADCAST_CHUNK_CAPACITY); + let ring = Arc::new(Mutex::new(std::collections::VecDeque::>::new())); + let used_bytes = Arc::new(Mutex::new(0usize)); + let next_seq = Arc::new(AtomicU64::new(1)); + let head_seq = Arc::new(AtomicU64::new(1)); + let tail_seq = Arc::new(AtomicU64::new(0)); + let dropped_bytes_total = Arc::new(AtomicU64::new(0)); + let ring_bytes_capacity = Arc::new(AtomicUsize::new(DEFAULT_SHELL_RING_BUFFER_CAPACITY)); + let default_coalesce_ms = AtomicU64::new(DEFAULT_TERM_COALESCE_MS); + + let ring_clone = ring.clone(); + let used_bytes_clone = used_bytes.clone(); + let tx_clone = tx.clone(); + let ring_bytes_capacity_c = ring_bytes_capacity.clone(); + let dropped_bytes_total_c = dropped_bytes_total.clone(); + let head_seq_c = head_seq.clone(); + let tail_seq_c = tail_seq.clone(); + let next_seq_c = next_seq.clone(); + + let on_closed_callback_for_reader = on_closed_callback.clone(); + + let reader_task = tokio::spawn(async move { + let max_chunk = DEFAULT_MAX_CHUNK_SIZE; + loop { + match reader.wait().await { + Some(ChannelMsg::Data { data }) => { + append_and_broadcast( + &data, + StreamKind::Stdout, + &ring_clone, + &used_bytes_clone, + &ring_bytes_capacity_c, + &dropped_bytes_total_c, + &head_seq_c, + &tail_seq_c, + &next_seq_c, + &tx_clone, + max_chunk, + ); + } + Some(ChannelMsg::ExtendedData { data, .. }) => { + append_and_broadcast( + &data, + StreamKind::Stderr, + &ring_clone, + &used_bytes_clone, + &ring_bytes_capacity_c, + &dropped_bytes_total_c, + &head_seq_c, + &tail_seq_c, + &next_seq_c, + &tx_clone, + max_chunk, + ); + } + Some(ChannelMsg::Close) | None => { + if let Some(sl) = on_closed_callback_for_reader.as_ref() { + sl.on_change(channel_id); + } + break; + } + _ => {} + } + } + }); + + let session = Arc::new(ShellSession { + info: ShellSessionInfo { + channel_id, + created_at_ms: started_at_ms, + connected_at_ms: now_ms(), + term, + connection_id: self.info.connection_id.clone(), + }, + on_closed_callback, + parent: self.self_weak.lock().await.clone(), + + writer: AsyncMutex::new(writer), + reader_task, + + // Ring buffer + ring, + ring_bytes_capacity, + used_bytes, + dropped_bytes_total, + head_seq, + tail_seq, + + // Listener tasks management + sender: tx, + listener_tasks: Arc::new(Mutex::new(HashMap::new())), + next_listener_id: AtomicU64::new(1), + default_coalesce_ms, + rt_handle: tokio::runtime::Handle::current(), + }); + + self.shells.lock().await.insert(channel_id, session.clone()); + + + Ok(session) + } + + + pub async fn disconnect(&self) -> Result<(), SshError> { + // TODO: Check if we need to close all these if we are about to disconnect? + let sessions: Vec> = { + let map = self.shells.lock().await; + map.values().cloned().collect() + }; + for s in sessions { + s.close().await?; + } + + let h = self.client_handle.lock().await; + h.disconnect(Disconnect::ByApplication, "bye", "").await?; + + if let Some(on_disconnected_callback) = self.on_disconnected_callback.as_ref() { + on_disconnected_callback.on_change(self.info.connection_id.clone()); + } + + Ok(()) + } +} + + +#[uniffi::export(async_runtime = "tokio")] +pub async fn connect(options: ConnectOptions) -> Result, SshError> { + let started_at_ms = now_ms(); + let details = ConnectionDetails { + host: options.connection_details.host.clone(), + port: options.connection_details.port, + username: options.connection_details.username.clone(), + security: options.connection_details.security.clone(), + }; + + + // TCP + let addr = format!("{}:{}", details.host, details.port); + let socket = tokio::net::TcpStream::connect(&addr).await?; + let local_port = socket.local_addr()?.port(); + + let tcp_established_at_ms = now_ms(); + if let Some(sl) = options.on_connection_progress_callback.as_ref() { + sl.on_change(SshConnectionProgressEvent::TcpConnected); + } + let cfg = Arc::new(Config::default()); + let mut handle: ClientHandle = russh::client::connect_stream(cfg, socket, NoopHandler).await?; + let ssh_handshake_at_ms = now_ms(); + if let Some(sl) = options.on_connection_progress_callback.as_ref() { + sl.on_change(SshConnectionProgressEvent::SshHandshake); + } + let auth_result = match &details.security { + Security::Password { password } => { + handle.authenticate_password(details.username.clone(), password.clone()).await? + } + Security::Key { private_key_content } => { + // Normalize and parse using shared helper so RN-validated keys match runtime parsing. + let (_canonical, parsed) = normalize_openssh_ed25519_seed_key(private_key_content)?; + let pk_with_hash = PrivateKeyWithHashAlg::new(Arc::new(parsed), None); + handle.authenticate_publickey(details.username.clone(), pk_with_hash).await? + } + }; + if !matches!(auth_result, russh::client::AuthResult::Success) { return Err(auth_result.into()); } + + let connection_id = format!("{}@{}:{}:{}", details.username, details.host, details.port, local_port); + let conn = Arc::new(SshConnection { + info: SshConnectionInfo { + connection_id, + connection_details: details, + created_at_ms: started_at_ms, + connected_at_ms: now_ms(), + progress_timings: SshConnectionInfoProgressTimings { tcp_established_at_ms, ssh_handshake_at_ms }, + }, + client_handle: AsyncMutex::new(handle), + shells: AsyncMutex::new(HashMap::new()), + self_weak: AsyncMutex::new(Weak::new()), + on_disconnected_callback: options.on_disconnected_callback.clone(), + }); + // Initialize weak self reference. + *conn.self_weak.lock().await = Arc::downgrade(&conn); + Ok(conn) +} diff --git a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_shell.rs b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_shell.rs new file mode 100644 index 0000000..02923ac --- /dev/null +++ b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_shell.rs @@ -0,0 +1,508 @@ + + + + +// Note: russh accepts an untyped string for the terminal type +#[derive(Debug, Clone, Copy, PartialEq, uniffi::Enum)] +pub enum TerminalType { + Vanilla, + Vt100, + Vt102, + Vt220, + Ansi, + Xterm, + Xterm256, +} +impl TerminalType { + fn as_ssh_name(self) -> &'static str { + match self { + TerminalType::Vanilla => "vanilla", + TerminalType::Vt100 => "vt100", + TerminalType::Vt102 => "vt102", + TerminalType::Vt220 => "vt220", + TerminalType::Ansi => "ansi", + TerminalType::Xterm => "xterm", + TerminalType::Xterm256 => "xterm-256color", + } + } +} + + + +#[derive(Debug, Clone, Copy, PartialEq, uniffi::Enum)] +pub enum StreamKind { Stdout, Stderr } + +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct TerminalChunk { + pub seq: u64, + pub t_ms: f64, + pub stream: StreamKind, + pub bytes: Vec, +} + +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct DroppedRange { pub from_seq: u64, pub to_seq: u64 } + +#[derive(Debug, Clone, PartialEq, uniffi::Enum)] +pub enum ShellEvent { + Chunk(TerminalChunk), + Dropped { from_seq: u64, to_seq: u64 }, +} + +#[uniffi::export(with_foreign)] +pub trait ShellListener: Send + Sync { + fn on_event(&self, ev: ShellEvent); +} + + + +#[derive(Debug, Clone, Copy, PartialEq, uniffi::Record)] +pub struct TerminalMode { + pub opcode: u8, // PTY opcode (matches russh::Pty discriminants) + pub value: u32, +} + + + + +#[derive(Debug, Clone, Copy, PartialEq, uniffi::Record)] +pub struct TerminalSize { + pub row_height: Option, + pub col_width: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, uniffi::Record)] +pub struct TerminalPixelSize { + pub pixel_width: Option, + pub pixel_height: Option, +} + +#[derive(Clone, uniffi::Record)] +pub struct StartShellOptions { + pub term: TerminalType, + pub terminal_mode: Option>, + pub terminal_size: Option, + pub terminal_pixel_size: Option, + pub on_closed_callback: Option>, +} + +#[uniffi::export(with_foreign)] +pub trait ShellClosedCallback: Send + Sync { + fn on_change(&self, channel_id: u32); +} + +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct SshConnectionInfoProgressTimings { + // TODO: We should have a field for each SshConnectionProgressEvent. Would be great if this were enforced by the compiler. + pub tcp_established_at_ms: f64, + pub ssh_handshake_at_ms: f64, +} + +/// Snapshot of current connection info for property-like access in TS. +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct SshConnectionInfo { + pub connection_id: String, + pub connection_details: ConnectionDetails, + pub created_at_ms: f64, + pub connected_at_ms: f64, + pub progress_timings: SshConnectionInfoProgressTimings, +} + +/// Snapshot of shell session info for property-like access in TS. +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct ShellSessionInfo { + pub channel_id: u32, + pub created_at_ms: f64, + pub connected_at_ms: f64, + pub term: TerminalType, + pub connection_id: String, +} + + +#[derive(uniffi::Object)] +pub struct ShellSession { + info: ShellSessionInfo, + on_closed_callback: Option>, + + // Weak backref; avoid retain cycle. + parent: std::sync::Weak, + + writer: AsyncMutex>, + // We keep the reader task to allow cancellation on close. + reader_task: tokio::task::JoinHandle<()>, + + // Ring buffer + ring: Arc>>>, + ring_bytes_capacity: Arc, + used_bytes: Arc>, + dropped_bytes_total: Arc, + head_seq: Arc, + tail_seq: Arc, + + // Live broadcast + sender: broadcast::Sender>, + + // Listener tasks management + listener_tasks: Arc>>>, + next_listener_id: AtomicU64, + default_coalesce_ms: AtomicU64, + rt_handle: tokio::runtime::Handle, +} + + +#[derive(Debug, Clone, PartialEq, uniffi::Enum)] +pub enum Cursor { + Head, // start from the beginning + TailBytes { bytes: u64 }, // start from the end of the last N bytes + Seq { seq: u64 }, // start from the given sequence number + TimeMs { t_ms: f64 }, // start from the given time in milliseconds + Live, // start from the live stream +} + +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct ListenerOptions { + pub cursor: Cursor, + pub coalesce_ms: Option, // coalesce chunks into this many milliseconds +} + +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct BufferReadResult { + pub chunks: Vec, + pub next_seq: u64, + pub dropped: Option, +} + +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct BufferStats { + pub ring_bytes_count: u64, + pub used_bytes: u64, + pub head_seq: u64, + pub tail_seq: u64, + pub dropped_bytes_total: u64, + + pub chunks_count: u64, +} + + + +// Internal chunk type kept in ring/broadcast +#[derive(Debug)] +struct Chunk { // TODO: This is very similar to TerminalChunk. The only difference is the bytes field + seq: u64, + t_ms: f64, + stream: StreamKind, + bytes: Bytes, +} + + +/// Minimal client::Handler. +struct NoopHandler; +impl client::Handler for NoopHandler { + type Error = SshError; + // Accept any server key for now so dev UX isn't blocked. + // TODO: Add known-hosts verification and surface API to control this. + #[allow(unused_variables)] + fn check_server_key( + &mut self, + _server_public_key: &russh::keys::PublicKey, + ) -> impl std::future::Future::Error>> + std::marker::Send { + std::future::ready(Ok(true)) + } +} + + +/// ---------- Methods ---------- +static DEFAULT_TERMINAL_MODES: &[(russh::Pty, u32)] = &[ + (russh::Pty::ECHO, 1), // This will cause the terminal to echo the characters back to the client. + (russh::Pty::ECHOK, 1), // After the line-kill character (often Ctrl+U), echo a newline. + (russh::Pty::ECHOE, 1), // Visually erase on backspace (erase using BS-SP-BS sequence). + (russh::Pty::ICANON, 1), // Canonical (cooked) mode: line editing; input delivered line-by-line. + (russh::Pty::ISIG, 1), // Generate signals on special chars (e.g., Ctrl+C -> SIGINT, Ctrl+Z -> SIGTSTP). + (russh::Pty::ICRNL, 1), // Convert carriage return (CR, \r) to newline (NL, \n) on input. + (russh::Pty::ONLCR, 1), // Convert newline (NL) to CR+NL on output (LF -> CRLF). + (russh::Pty::TTY_OP_ISPEED, 38400), // Set input baud rate (here 38400). The baud rate is the number of characters per second. + (russh::Pty::TTY_OP_OSPEED, 38400), // Set output baud rate (here 38400). The baud rate is the number of characters per second. +]; + +static DEFAULT_TERM_ROW_HEIGHT: u32 = 24; +static DEFAULT_TERM_COL_WIDTH: u32 = 80; +static DEFAULT_TERM_PIXEL_WIDTH: u32 = 0; +static DEFAULT_TERM_PIXEL_HEIGHT: u32 = 0; +static DEFAULT_TERM_COALESCE_MS: u64 = 16; + +// Number of recent live chunks retained by the broadcast channel for each +// subscriber. If a subscriber falls behind this many messages, they will get a +// Lagged error and skip to the latest. Tune to: peak_chunks_per_sec × max_pause_sec. +static DEFAULT_BROADCAST_CHUNK_CAPACITY: usize = 1024; + +// Byte budget for the on-heap replay/history ring buffer. When the total bytes +// of stored chunks exceed this, oldest chunks are evicted. Increase for a +// longer replay window at the cost of memory. +static DEFAULT_SHELL_RING_BUFFER_CAPACITY: usize = 2 * 1024 * 1024; // default 2MiB + +// Upper bound for the size of a single appended/broadcast chunk. Incoming data +// is split into slices no larger than this. Smaller values reduce latency and +// loss impact; larger values reduce per-message overhead. +static DEFAULT_MAX_CHUNK_SIZE: usize = 16 * 1024; // 16KB + +static DEFAULT_READ_BUFFER_MAX_BYTES: u64 = 512 * 1024; // 512KB + + + +#[uniffi::export(async_runtime = "tokio")] +impl ShellSession { + pub fn get_info(&self) -> ShellSessionInfo { + self.info.clone() + } + + /// Send bytes to the active shell (stdin). + pub async fn send_data(&self, data: Vec) -> Result<(), SshError> { + let w = self.writer.lock().await; + w.data(&data[..]).await?; + Ok(()) + } + + /// Close the associated shell channel and stop its reader task. + pub async fn close(&self) -> Result<(), SshError> { self.close_internal().await } + + /// Buffer statistics snapshot. + pub fn buffer_stats(&self) -> BufferStats { + let used = *self.used_bytes.lock().unwrap_or_else(|p| p.into_inner()) as u64; + let chunks_count = match self.ring.lock() { Ok(q) => q.len() as u64, Err(p) => p.into_inner().len() as u64 }; + BufferStats { + ring_bytes_count: self.ring_bytes_capacity.load(Ordering::Relaxed) as u64, + used_bytes: used, + chunks_count, + head_seq: self.head_seq.load(Ordering::Relaxed), + tail_seq: self.tail_seq.load(Ordering::Relaxed), + dropped_bytes_total: self.dropped_bytes_total.load(Ordering::Relaxed), + } + } + + /// Current next sequence number. + pub fn current_seq(&self) -> u64 { self.tail_seq.load(Ordering::Relaxed).saturating_add(1) } + + /// Read the ring buffer from a cursor. + pub fn read_buffer(&self, cursor: Cursor, max_bytes: Option) -> BufferReadResult { + let max_total = max_bytes.unwrap_or(DEFAULT_READ_BUFFER_MAX_BYTES) as usize; + let mut out_chunks: Vec = Vec::new(); + let mut dropped: Option = None; + let head_seq_now = self.head_seq.load(Ordering::Relaxed); + let tail_seq_now = self.tail_seq.load(Ordering::Relaxed); + + // Lock ring to determine start and collect arcs, then drop lock. + let (_start_idx_unused, _start_seq, arcs): (usize, u64, Vec>) = { + let ring = match self.ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; + let (start_seq, idx) = match cursor { + Cursor::Head => (head_seq_now, 0usize), + Cursor::Seq { seq: mut s } => { + if s < head_seq_now { dropped = Some(DroppedRange { from_seq: s, to_seq: head_seq_now - 1 }); s = head_seq_now; } + let idx = s.saturating_sub(head_seq_now) as usize; + (s, idx.min(ring.len())) + } + Cursor::TimeMs { t_ms: t } => { + // linear scan to find first chunk with t_ms >= t + let mut idx = 0usize; let mut s = head_seq_now; + for (i, ch) in ring.iter().enumerate() { if ch.t_ms >= t { idx = i; s = ch.seq; break; } } + (s, idx) + } + Cursor::TailBytes { bytes: n } => { + // Walk from tail backwards until approx n bytes, then forward. + let mut bytes = 0usize; let mut idx = ring.len(); + for i in (0..ring.len()).rev() { + let b = ring[i].bytes.len(); + if bytes >= n as usize { idx = i + 1; break; } + bytes += b; idx = i; + } + let s = if idx < ring.len() { ring[idx].seq } else { tail_seq_now.saturating_add(1) }; + (s, idx) + } + Cursor::Live => (tail_seq_now.saturating_add(1), ring.len()), + }; + let arcs: Vec> = ring.iter().skip(idx).cloned().collect(); + (idx, start_seq, arcs) + }; + + // Build output respecting max_bytes + let mut total = 0usize; + for ch in arcs { + let len = ch.bytes.len(); + if total + len > max_total { break; } + out_chunks.push(TerminalChunk { seq: ch.seq, t_ms: ch.t_ms, stream: ch.stream, bytes: ch.bytes.clone().to_vec() }); + total += len; + } + let next_seq = if let Some(last) = out_chunks.last() { last.seq + 1 } else { tail_seq_now.saturating_add(1) }; + BufferReadResult { chunks: out_chunks, next_seq, dropped } + } + + /// Add a listener with optional replay and live follow. + pub fn add_listener(&self, listener: Arc, opts: ListenerOptions) -> Result { + // Snapshot for replay; emit from task to avoid re-entrant callbacks during FFI. + let replay = self.read_buffer(opts.cursor.clone(), None); + let mut rx = self.sender.subscribe(); + let id = self.next_listener_id.fetch_add(1, Ordering::Relaxed); + let default_coalesce_ms = self.default_coalesce_ms.load(Ordering::Relaxed) as u32; + let coalesce_ms = opts.coalesce_ms.unwrap_or(default_coalesce_ms); + + let rt = self.rt_handle.clone(); + let handle = rt.spawn(async move { + // Emit replay first + if let Some(dr) = replay.dropped.as_ref() { + listener.on_event(ShellEvent::Dropped { from_seq: dr.from_seq, to_seq: dr.to_seq }); + } + for ch in replay.chunks.into_iter() { + listener.on_event(ShellEvent::Chunk(ch)); + } + + let mut last_seq_seen: u64 = replay.next_seq.saturating_sub(1); + let mut acc: Vec = Vec::new(); + let mut acc_stream: Option; + let mut acc_last_seq: u64; + let mut acc_last_t: f64; + let window = Duration::from_millis(coalesce_ms as u64); + let mut pending_drop_from: Option = None; + + loop { + // First receive an item + let first = match rx.recv().await { + Ok(c) => c, + Err(broadcast::error::RecvError::Lagged(_n)) => { pending_drop_from = Some(last_seq_seen.saturating_add(1)); continue; } + Err(broadcast::error::RecvError::Closed) => break, + }; + if let Some(from) = pending_drop_from.take() { + if from <= first.seq.saturating_sub(1) { + listener.on_event(ShellEvent::Dropped { from_seq: from, to_seq: first.seq - 1 }); + } + } + // Start accumulating + acc.clear(); acc_stream = Some(first.stream); acc_last_seq = first.seq; acc_last_t = first.t_ms; acc.extend_from_slice(&first.bytes); + last_seq_seen = first.seq; + + // Drain within window while same stream + let mut deadline = tokio::time::Instant::now() + window; + loop { + let timeout = tokio::time::sleep_until(deadline); + tokio::pin!(timeout); + tokio::select! { + _ = &mut timeout => break, + msg = rx.recv() => { + match msg { + Ok(c) => { + if Some(c.stream) == acc_stream { acc.extend_from_slice(&c.bytes); acc_last_seq = c.seq; acc_last_t = c.t_ms; last_seq_seen = c.seq; } + else { // flush and start new + let chunk = TerminalChunk { seq: acc_last_seq, t_ms: acc_last_t, stream: acc_stream.unwrap_or(StreamKind::Stdout), bytes: std::mem::take(&mut acc) }; + listener.on_event(ShellEvent::Chunk(chunk)); + acc_stream = Some(c.stream); acc_last_seq = c.seq; acc_last_t = c.t_ms; acc.extend_from_slice(&c.bytes); last_seq_seen = c.seq; + deadline = tokio::time::Instant::now() + window; + } + } + Err(broadcast::error::RecvError::Lagged(_n)) => { pending_drop_from = Some(last_seq_seen.saturating_add(1)); break; } + Err(broadcast::error::RecvError::Closed) => { break; } + } + } + } + } + if let Some(s) = acc_stream.take() { + let chunk = TerminalChunk { seq: acc_last_seq, t_ms: acc_last_t, stream: s, bytes: std::mem::take(&mut acc) }; + listener.on_event(ShellEvent::Chunk(chunk)); + } + } + }); + if let Ok(mut map) = self.listener_tasks.lock() { map.insert(id, handle); } + Ok(id) + } + + pub fn remove_listener(&self, id: u64) { + if let Ok(mut map) = self.listener_tasks.lock() { + if let Some(h) = map.remove(&id) { h.abort(); } + } + } +} + + +// Internal lifecycle helpers (not exported via UniFFI) +impl ShellSession { + async fn close_internal(&self) -> Result<(), SshError> { + // Try to close channel gracefully; ignore error. + self.writer.lock().await.close().await.ok(); + self.reader_task.abort(); + if let Some(sl) = self.on_closed_callback.as_ref() { + sl.on_change(self.info.channel_id); + } + // Clear parent's notion of active shell if it matches us. + if let Some(parent) = self.parent.upgrade() { + parent.shells.lock().await.remove(&self.info.channel_id); + } + Ok(()) + } + + // /// This was on the public interface but I don't think we need it + // pub async fn set_buffer_policy(&self, ring_bytes: Option, coalesce_ms: Option) { + // if let Some(rb) = ring_bytes { self.ring_bytes_capacity.store(rb as usize, Ordering::Relaxed); self.evict_if_needed(); } + // if let Some(cm) = coalesce_ms { self.default_coalesce_ms.store(cm as u64, Ordering::Relaxed); } + // } + + // fn evict_if_needed(&self) { + // let cap = self.ring_bytes_capacity.load(Ordering::Relaxed); + // let mut ring = match self.ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; + // let mut used = self.used_bytes.lock().unwrap_or_else(|p| p.into_inner()); + // while *used > cap { + // if let Some(front) = ring.pop_front() { + // *used -= front.bytes.len(); + // self.dropped_bytes_total.fetch_add(front.bytes.len() as u64, Ordering::Relaxed); + // self.head_seq.store(front.seq.saturating_add(1), Ordering::Relaxed); + // } else { break; } + // } + // } +} + + +#[allow(clippy::too_many_arguments)] +fn append_and_broadcast( + data: &[u8], + stream: StreamKind, + ring: &Arc>>>, + used_bytes: &Arc>, + ring_bytes_capacity: &Arc, + dropped_bytes_total: &Arc, + head_seq: &Arc, + tail_seq: &Arc, + next_seq: &Arc, + sender: &broadcast::Sender>, + max_chunk: usize, +) { + let mut offset = 0usize; + while offset < data.len() { + let end = (offset + max_chunk).min(data.len()); + let slice = &data[offset..end]; + let seq = next_seq.fetch_add(1, Ordering::Relaxed); + let t_ms = now_ms(); + let chunk = Arc::new(Chunk { seq, t_ms, stream, bytes: Bytes::copy_from_slice(slice) }); + // push to ring + { + let mut q = match ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; + q.push_back(chunk.clone()); + } + { + let mut used = used_bytes.lock().unwrap_or_else(|p| p.into_inner()); + *used += slice.len(); + tail_seq.store(seq, Ordering::Relaxed); + // evict if needed + let cap = ring_bytes_capacity.load(Ordering::Relaxed); + if *used > cap { + let mut q = match ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; + while *used > cap { + if let Some(front) = q.pop_front() { + *used -= front.bytes.len(); + dropped_bytes_total.fetch_add(front.bytes.len() as u64, Ordering::Relaxed); + head_seq.store(front.seq.saturating_add(1), Ordering::Relaxed); + } else { break; } + } + } + } + // broadcast + let _ = sender.send(chunk); + + offset = end; + } +} diff --git a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/utils.rs b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/utils.rs new file mode 100644 index 0000000..4e70b15 --- /dev/null +++ b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/utils.rs @@ -0,0 +1,49 @@ + + + +fn now_ms() -> f64 { + let d = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default(); + d.as_millis() as f64 +} + + + + +// TODO: Split this into different errors for each public function +#[derive(Debug, Error, uniffi::Error)] +pub enum SshError { + #[error("Disconnected")] + Disconnected, + #[error("Unsupported key type")] + UnsupportedKeyType, + #[error("Auth failed: {0}")] + Auth(String), + #[error("Shell already running")] + ShellAlreadyRunning, + #[error("russh error: {0}")] + Russh(String), + #[error("russh-keys error: {0}")] + RusshKeys(String), +} +impl From for SshError { + fn from(e: russh::Error) -> Self { SshError::Russh(e.to_string()) } +} +impl From for SshError { + fn from(e: russh_keys::Error) -> Self { SshError::RusshKeys(e.to_string()) } +} +impl From for SshError { + fn from(e: ssh_key::Error) -> Self { SshError::RusshKeys(e.to_string()) } +} +impl From for SshError { + fn from(e: russh_ssh_key::Error) -> Self { SshError::RusshKeys(e.to_string()) } +} +impl From for SshError { + fn from(e: std::io::Error) -> Self { SshError::Russh(e.to_string()) } +} +impl From for SshError { + fn from(a: russh::client::AuthResult) -> Self { + SshError::Auth(format!("{a:?}")) + } +}