From 0fa28b2134a14740413a4e6c8e9502a5b1ef8ef0 Mon Sep 17 00:00:00 2001 From: EthanShoeDev <13422990+EthanShoeDev@users.noreply.github.com> Date: Wed, 17 Sep 2025 22:06:54 -0400 Subject: [PATCH] New rust lib passing lint --- .../rust/uniffi-russh/src/lib.rs | 478 +++++++++++++----- packages/react-native-uniffi-russh/src/api.ts | 381 ++++++++------ 2 files changed, 580 insertions(+), 279 deletions(-) diff --git a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs index 0653ae9..a72febf 100644 --- a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs +++ b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs @@ -7,35 +7,22 @@ use std::collections::HashMap; use std::fmt; -use std::sync::{Arc, Mutex, Weak}; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::sync::{atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, Weak}; +use std::time::{SystemTime, UNIX_EPOCH, Duration}; use rand::rngs::OsRng; use thiserror::Error; -use tokio::sync::Mutex as AsyncMutex; +use tokio::sync::{broadcast, Mutex as AsyncMutex}; use russh::{self, client, ChannelMsg, Disconnect}; use russh::client::{Config as ClientConfig, Handle as ClientHandle}; use russh_keys::{Algorithm as KeyAlgorithm, EcdsaCurve, PrivateKey}; use russh_keys::ssh_key::{self, LineEnding}; -use once_cell::sync::Lazy; +use bytes::Bytes; uniffi::setup_scaffolding!(); -// Simpler aliases to satisfy clippy type-complexity. -type ListenerEntry = (u64, Arc); -type ListenerList = Vec; - -// Type aliases to keep static types simple and satisfy clippy. -type ConnectionId = String; -type ChannelId = u32; -type ShellKey = (ConnectionId, ChannelId); -type ConnMap = HashMap>; -type ShellMap = HashMap>; - -// ---------- Global registries (strong references; lifecycle managed explicitly) ---------- -static CONNECTIONS: Lazy> = Lazy::new(|| Mutex::new(HashMap::new())); -static SHELLS: Lazy> = Lazy::new(|| Mutex::new(HashMap::new())); +// No global registries; handles are the only access points. /// ---------- Types ---------- @@ -130,10 +117,30 @@ pub trait StatusListener: Send + Sync { fn on_change(&self, status: SSHConnectionStatus); } -/// Channel data callback (stdout/stderr unified) +// Stream kind for terminal output +#[derive(Debug, Clone, Copy, PartialEq, uniffi::Enum)] +pub enum StreamKind { Stdout, Stderr } + +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct TerminalChunk { + pub seq: u64, + pub t_ms: f64, + pub stream: StreamKind, + pub bytes: Vec, +} + +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct DroppedRange { pub from_seq: u64, pub to_seq: u64 } + +#[derive(Debug, Clone, PartialEq, uniffi::Enum)] +pub enum ShellEvent { + Chunk(TerminalChunk), + Dropped { from_seq: u64, to_seq: u64 }, +} + #[uniffi::export(with_foreign)] -pub trait ChannelListener: Send + Sync { - fn on_data(&self, data: Vec); +pub trait ShellListener: Send + Sync { + fn on_event(&self, ev: ShellEvent); } /// Key types for generation @@ -152,6 +159,38 @@ pub struct StartShellOptions { pub on_status_change: Option>, } +#[derive(Debug, Clone, PartialEq, uniffi::Enum)] +pub enum Cursor { + Head, + TailBytes { bytes: u64 }, + Seq { seq: u64 }, + TimeMs { t_ms: f64 }, + Live, +} + +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct ListenerOptions { + pub cursor: Cursor, + pub coalesce_ms: Option, +} + +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct BufferReadResult { + pub chunks: Vec, + pub next_seq: u64, + pub dropped: Option, +} + +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct BufferStats { + pub ring_bytes: u64, + pub used_bytes: u64, + pub chunks: u64, + pub head_seq: u64, + pub tail_seq: u64, + pub dropped_bytes_total: u64, +} + /// Snapshot of current connection info for property-like access in TS. #[derive(Debug, Clone, PartialEq, uniffi::Record)] pub struct SshConnectionInfo { @@ -186,10 +225,6 @@ pub struct SSHConnection { // Weak self for child sessions to refer back without cycles. self_weak: AsyncMutex>, - - // Data listeners for whatever shell is active. We track by id for removal. - listeners: Arc>, - next_listener_id: Arc>, // simple counter guarded by same kind of mutex } #[derive(uniffi::Object)] @@ -204,20 +239,43 @@ pub struct ShellSession { shell_status_listener: Option>, created_at_ms: f64, pty: PtyType, + + // Ring buffer + ring: Arc>>>, + ring_bytes_capacity: Arc, + used_bytes: Arc>, + dropped_bytes_total: Arc, + head_seq: Arc, + tail_seq: Arc, + + // Live broadcast + sender: broadcast::Sender>, + + // Listener tasks management + listener_tasks: Arc>>>, + next_listener_id: AtomicU64, + default_coalesce_ms: AtomicU64, } impl fmt::Debug for SSHConnection { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let listeners_len = self.listeners.lock().map(|v| v.len()).unwrap_or(0); f.debug_struct("SSHConnection") .field("connection_details", &self.connection_details) .field("created_at_ms", &self.created_at_ms) .field("tcp_established_at_ms", &self.tcp_established_at_ms) - .field("listeners_len", &listeners_len) .finish() } } +// Internal chunk type kept in ring/broadcast +#[derive(Debug)] +struct Chunk { + seq: u64, + t_ms: f64, + stream: StreamKind, + bytes: Bytes, +} + /// Minimal client::Handler. struct NoopHandler; impl client::Handler for NoopHandler { @@ -247,21 +305,6 @@ impl SSHConnection { } } - /// Add a channel listener and get an id you can later remove with. - pub fn add_channel_listener(&self, listener: Arc) -> u64 { - let mut guard = self.listeners.lock().unwrap(); - let mut id_guard = self.next_listener_id.lock().unwrap(); - let id = *id_guard + 1; - *id_guard = id; - guard.push((id, listener)); - id - } - pub fn remove_channel_listener(&self, id: u64) { - if let Ok(mut v) = self.listeners.lock() { - v.retain(|(lid, _)| *lid != id); - } - } - /// Start a shell with the given PTY. Emits only Shell* statuses via options.on_status_change. pub async fn start_shell(&self, opts: StartShellOptions) -> Result, SshError> { // Prevent double-start (safe default). @@ -286,24 +329,60 @@ impl SSHConnection { // Split for read/write; spawn reader. let (mut reader, writer) = ch.split(); - let listeners = self.listeners.clone(); + + // Setup ring + broadcast for this session + let (tx, _rx) = broadcast::channel::>(1024); + let ring = Arc::new(Mutex::new(std::collections::VecDeque::>::new())); + let used_bytes = Arc::new(Mutex::new(0usize)); + let next_seq = Arc::new(AtomicU64::new(1)); + let head_seq = Arc::new(AtomicU64::new(1)); + let tail_seq = Arc::new(AtomicU64::new(0)); + let dropped_bytes_total = Arc::new(AtomicU64::new(0)); + let ring_bytes_capacity = Arc::new(AtomicUsize::new(2 * 1024 * 1024)); // default 2MiB + let default_coalesce_ms = AtomicU64::new(16); // default 16ms + + let ring_clone = ring.clone(); + let used_bytes_clone = used_bytes.clone(); + let tx_clone = tx.clone(); + let ring_bytes_capacity_c = ring_bytes_capacity.clone(); + let dropped_bytes_total_c = dropped_bytes_total.clone(); + let head_seq_c = head_seq.clone(); + let tail_seq_c = tail_seq.clone(); + let next_seq_c = next_seq.clone(); let shell_listener_for_task = shell_status_listener.clone(); let reader_task = tokio::spawn(async move { + let max_chunk = 16 * 1024; // 16KB loop { match reader.wait().await { Some(ChannelMsg::Data { data }) => { - if let Ok(cl) = listeners.lock() { - let snapshot = cl.clone(); - let buf = data.to_vec(); - for (_, l) in snapshot { l.on_data(buf.clone()); } - } + append_and_broadcast( + &data, + StreamKind::Stdout, + &ring_clone, + &used_bytes_clone, + &ring_bytes_capacity_c, + &dropped_bytes_total_c, + &head_seq_c, + &tail_seq_c, + &next_seq_c, + &tx_clone, + max_chunk, + ); } Some(ChannelMsg::ExtendedData { data, .. }) => { - if let Ok(cl) = listeners.lock() { - let snapshot = cl.clone(); - let buf = data.to_vec(); - for (_, l) in snapshot { l.on_data(buf.clone()); } - } + append_and_broadcast( + &data, + StreamKind::Stderr, + &ring_clone, + &used_bytes_clone, + &ring_bytes_capacity_c, + &dropped_bytes_total_c, + &head_seq_c, + &tail_seq_c, + &next_seq_c, + &tx_clone, + max_chunk, + ); } Some(ChannelMsg::Close) | None => { if let Some(sl) = shell_listener_for_task.as_ref() { @@ -311,7 +390,7 @@ impl SSHConnection { } break; } - _ => { /* ignore others */ } + _ => {} } } }); @@ -324,16 +403,20 @@ impl SSHConnection { shell_status_listener, created_at_ms: now_ms(), pty, + ring, + ring_bytes_capacity, + used_bytes, + dropped_bytes_total, + head_seq, + tail_seq, + sender: tx, + listener_tasks: Arc::new(Mutex::new(HashMap::new())), + next_listener_id: AtomicU64::new(1), + default_coalesce_ms, }); *self.shell.lock().await = Some(session.clone()); - // Register shell in global registry - if let Some(parent) = self.self_weak.lock().await.upgrade() { - let key = (parent.connection_id.clone(), channel_id); - if let Ok(mut map) = SHELLS.lock() { map.insert(key, session.clone()); } - } - // Report ShellConnected. if let Some(sl) = session.shell_status_listener.as_ref() { sl.on_change(SSHConnectionStatus::ShellConnected); @@ -342,12 +425,7 @@ impl SSHConnection { Ok(session) } - /// Send bytes to the active shell (stdin). - pub async fn send_data(&self, data: Vec) -> Result<(), SshError> { - let guard = self.shell.lock().await; - let session = guard.as_ref().ok_or(SshError::Disconnected)?; - session.send_data(data).await - } + // Note: send_data now lives on ShellSession // No exported close_shell: shell closure is handled via ShellSession::close() @@ -360,8 +438,6 @@ impl SSHConnection { let h = self.handle.lock().await; h.disconnect(Disconnect::ByApplication, "bye", "").await?; - // Remove from registry after disconnect - if let Ok(mut map) = CONNECTIONS.lock() { map.remove(&self.connection_id); } Ok(()) } } @@ -386,6 +462,160 @@ impl ShellSession { /// Close the associated shell channel and stop its reader task. pub async fn close(&self) -> Result<(), SshError> { self.close_internal().await } + + /// Configure ring buffer policy. + pub async fn set_buffer_policy(&self, ring_bytes: Option, coalesce_ms: Option) { + if let Some(rb) = ring_bytes { self.ring_bytes_capacity.store(rb as usize, Ordering::Relaxed); self.evict_if_needed(); } + if let Some(cm) = coalesce_ms { self.default_coalesce_ms.store(cm as u64, Ordering::Relaxed); } + } + + /// Buffer statistics snapshot. + pub fn buffer_stats(&self) -> BufferStats { + let used = *self.used_bytes.lock().unwrap() as u64; + let chunks = self.ring.lock().map(|q| q.len() as u64).unwrap_or(0); + BufferStats { + ring_bytes: self.ring_bytes_capacity.load(Ordering::Relaxed) as u64, + used_bytes: used, + chunks, + head_seq: self.head_seq.load(Ordering::Relaxed), + tail_seq: self.tail_seq.load(Ordering::Relaxed), + dropped_bytes_total: self.dropped_bytes_total.load(Ordering::Relaxed), + } + } + + /// Current next sequence number. + pub fn current_seq(&self) -> u64 { self.tail_seq.load(Ordering::Relaxed).saturating_add(1) } + + /// Read the ring buffer from a cursor. + pub fn read_buffer(&self, cursor: Cursor, max_bytes: Option) -> BufferReadResult { + let max_total = max_bytes.unwrap_or(512 * 1024) as usize; // default 512KB + let mut out_chunks: Vec = Vec::new(); + let mut dropped: Option = None; + let head_seq_now = self.head_seq.load(Ordering::Relaxed); + let tail_seq_now = self.tail_seq.load(Ordering::Relaxed); + + // Lock ring to determine start and collect arcs, then drop lock. + let (_start_idx_unused, _start_seq, arcs): (usize, u64, Vec>) = { + let ring = self.ring.lock().unwrap(); + let (start_seq, idx) = match cursor { + Cursor::Head => (head_seq_now, 0usize), + Cursor::Seq { seq: mut s } => { + if s < head_seq_now { dropped = Some(DroppedRange { from_seq: s, to_seq: head_seq_now - 1 }); s = head_seq_now; } + let idx = s.saturating_sub(head_seq_now) as usize; + (s, idx.min(ring.len())) + } + Cursor::TimeMs { t_ms: t } => { + // linear scan to find first chunk with t_ms >= t + let mut idx = 0usize; let mut s = head_seq_now; + for (i, ch) in ring.iter().enumerate() { if ch.t_ms >= t { idx = i; s = ch.seq; break; } } + (s, idx) + } + Cursor::TailBytes { bytes: n } => { + // Walk from tail backwards until approx n bytes, then forward. + let mut bytes = 0usize; let mut idx = ring.len(); + for i in (0..ring.len()).rev() { + let b = ring[i].bytes.len(); + if bytes >= n as usize { idx = i + 1; break; } + bytes += b; idx = i; + } + let s = if idx < ring.len() { ring[idx].seq } else { tail_seq_now.saturating_add(1) }; + (s, idx) + } + Cursor::Live => (tail_seq_now.saturating_add(1), ring.len()), + }; + let arcs: Vec> = ring.iter().skip(idx).cloned().collect(); + (idx, start_seq, arcs) + }; + + // Build output respecting max_bytes + let mut total = 0usize; + for ch in arcs { + let len = ch.bytes.len(); + if total + len > max_total { break; } + out_chunks.push(TerminalChunk { seq: ch.seq, t_ms: ch.t_ms, stream: ch.stream, bytes: ch.bytes.clone().to_vec() }); + total += len; + } + let next_seq = if let Some(last) = out_chunks.last() { last.seq + 1 } else { tail_seq_now.saturating_add(1) }; + BufferReadResult { chunks: out_chunks, next_seq, dropped } + } + + /// Add a listener with optional replay and live follow. + pub fn add_listener(&self, listener: Arc, opts: ListenerOptions) -> u64 { + // Synchronous replay phase + let replay = self.read_buffer(opts.cursor.clone(), None); + if let Some(dr) = replay.dropped.as_ref() { listener.on_event(ShellEvent::Dropped { from_seq: dr.from_seq, to_seq: dr.to_seq }); } + for ch in replay.chunks { listener.on_event(ShellEvent::Chunk(ch)); } + + // Live phase + let mut rx = self.sender.subscribe(); + let id = self.next_listener_id.fetch_add(1, Ordering::Relaxed); + let default_coalesce_ms = self.default_coalesce_ms.load(Ordering::Relaxed) as u32; + let coalesce_ms = opts.coalesce_ms.unwrap_or(default_coalesce_ms); + + let handle = tokio::spawn(async move { + let mut last_seq_seen: u64 = replay.next_seq.saturating_sub(1); + let mut acc: Vec = Vec::new(); + let mut acc_stream: Option; + let mut acc_last_seq: u64; + let mut acc_last_t: f64; + let window = Duration::from_millis(coalesce_ms as u64); + let mut pending_drop_from: Option = None; + + loop { + // First receive an item + let first = match rx.recv().await { + Ok(c) => c, + Err(broadcast::error::RecvError::Lagged(_n)) => { pending_drop_from = Some(last_seq_seen.saturating_add(1)); continue; } + Err(broadcast::error::RecvError::Closed) => break, + }; + if let Some(from) = pending_drop_from.take() { + if from <= first.seq.saturating_sub(1) { + listener.on_event(ShellEvent::Dropped { from_seq: from, to_seq: first.seq - 1 }); + } + } + // Start accumulating + acc.clear(); acc_stream = Some(first.stream); acc_last_seq = first.seq; acc_last_t = first.t_ms; acc.extend_from_slice(&first.bytes); + last_seq_seen = first.seq; + + // Drain within window while same stream + let mut deadline = tokio::time::Instant::now() + window; + loop { + let timeout = tokio::time::sleep_until(deadline); + tokio::pin!(timeout); + tokio::select! { + _ = &mut timeout => break, + msg = rx.recv() => { + match msg { + Ok(c) => { + if Some(c.stream) == acc_stream { acc.extend_from_slice(&c.bytes); acc_last_seq = c.seq; acc_last_t = c.t_ms; last_seq_seen = c.seq; } + else { // flush and start new + let chunk = TerminalChunk { seq: acc_last_seq, t_ms: acc_last_t, stream: acc_stream.unwrap_or(StreamKind::Stdout), bytes: std::mem::take(&mut acc) }; + listener.on_event(ShellEvent::Chunk(chunk)); + acc_stream = Some(c.stream); acc_last_seq = c.seq; acc_last_t = c.t_ms; acc.extend_from_slice(&c.bytes); last_seq_seen = c.seq; + deadline = tokio::time::Instant::now() + window; + } + } + Err(broadcast::error::RecvError::Lagged(_n)) => { pending_drop_from = Some(last_seq_seen.saturating_add(1)); break; } + Err(broadcast::error::RecvError::Closed) => { break; } + } + } + } + } + if let Some(s) = acc_stream.take() { + let chunk = TerminalChunk { seq: acc_last_seq, t_ms: acc_last_t, stream: s, bytes: std::mem::take(&mut acc) }; + listener.on_event(ShellEvent::Chunk(chunk)); + } + } + }); + if let Ok(mut map) = self.listener_tasks.lock() { map.insert(id, handle); } + id + } + + pub fn remove_listener(&self, id: u64) { + if let Ok(mut map) = self.listener_tasks.lock() { + if let Some(h) = map.remove(&id) { h.abort(); } + } + } } // Internal lifecycle helpers (not exported via UniFFI) @@ -403,13 +633,22 @@ impl ShellSession { if let Some(current) = guard.as_ref() { if current.channel_id == self.channel_id { *guard = None; } } - // Remove from registry - if let Ok(mut map) = SHELLS.lock() { - map.remove(&(parent.connection_id.clone(), self.channel_id)); - } } Ok(()) } + + fn evict_if_needed(&self) { + let cap = self.ring_bytes_capacity.load(Ordering::Relaxed); + let mut ring = self.ring.lock().unwrap(); + let mut used = self.used_bytes.lock().unwrap(); + while *used > cap { + if let Some(front) = ring.pop_front() { + *used -= front.bytes.len(); + self.dropped_bytes_total.fetch_add(front.bytes.len() as u64, Ordering::Relaxed); + self.head_seq.store(front.seq.saturating_add(1), Ordering::Relaxed); + } else { break; } + } + } } /// ---------- Top-level API ---------- @@ -459,57 +698,12 @@ pub async fn connect(options: ConnectOptions) -> Result, SshE handle: AsyncMutex::new(handle), shell: AsyncMutex::new(None), self_weak: AsyncMutex::new(Weak::new()), - listeners: Arc::new(Mutex::new(Vec::new())), - next_listener_id: Arc::new(Mutex::new(0)), }); // Initialize weak self reference. *conn.self_weak.lock().await = Arc::downgrade(&conn); - // Register connection in global registry (strong ref; explicit lifecycle) - if let Ok(mut map) = CONNECTIONS.lock() { map.insert(conn.connection_id.clone(), conn.clone()); } Ok(conn) } -/// ---------- Registry/listing API ---------- - -#[uniffi::export] -pub fn list_ssh_connections() -> Vec { - // Collect clones outside the lock to avoid holding a MutexGuard across await - let conns: Vec> = CONNECTIONS - .lock() - .map(|map| map.values().cloned().collect()) - .unwrap_or_default(); - let mut out = Vec::with_capacity(conns.len()); - for conn in conns { out.push(conn.info()); } - out -} - -#[uniffi::export] -pub fn list_ssh_shells() -> Vec { - // Collect shells outside the lock to avoid holding a MutexGuard across await - let shells: Vec> = SHELLS - .lock() - .map(|map| map.values().cloned().collect()) - .unwrap_or_default(); - let mut out = Vec::with_capacity(shells.len()); - for shell in shells { out.push(shell.info()); } - out -} - -#[uniffi::export] -pub fn get_ssh_connection(id: String) -> Result, SshError> { - if let Ok(map) = CONNECTIONS.lock() { if let Some(conn) = map.get(&id) { return Ok(conn.clone()); } } - Err(SshError::Disconnected) -} - -// list_ssh_shells_for_connection removed; derive in JS from list_ssh_connections + get_ssh_shell - -#[uniffi::export] -pub fn get_ssh_shell(connection_id: String, channel_id: u32) -> Result, SshError> { - let key = (connection_id, channel_id); - if let Ok(map) = SHELLS.lock() { if let Some(shell) = map.get(&key) { return Ok(shell.clone()); } } - Err(SshError::Disconnected) -} - #[uniffi::export(async_runtime = "tokio")] pub async fn generate_key_pair(key_type: KeyType) -> Result { let mut rng = OsRng; @@ -532,3 +726,53 @@ fn now_ms() -> f64 { .unwrap_or_default(); d.as_millis() as f64 } + +#[allow(clippy::too_many_arguments)] +fn append_and_broadcast( + data: &[u8], + stream: StreamKind, + ring: &Arc>>>, + used_bytes: &Arc>, + ring_bytes_capacity: &Arc, + dropped_bytes_total: &Arc, + head_seq: &Arc, + tail_seq: &Arc, + next_seq: &Arc, + sender: &broadcast::Sender>, + max_chunk: usize, +) { + let mut offset = 0usize; + while offset < data.len() { + let end = (offset + max_chunk).min(data.len()); + let slice = &data[offset..end]; + let seq = next_seq.fetch_add(1, Ordering::Relaxed); + let t_ms = now_ms(); + let chunk = Arc::new(Chunk { seq, t_ms, stream, bytes: Bytes::copy_from_slice(slice) }); + // push to ring + { + let mut q = ring.lock().unwrap(); + q.push_back(chunk.clone()); + } + { + let mut used = used_bytes.lock().unwrap(); + *used += slice.len(); + tail_seq.store(seq, Ordering::Relaxed); + // evict if needed + let cap = ring_bytes_capacity.load(Ordering::Relaxed); + if *used > cap { + let mut q = ring.lock().unwrap(); + while *used > cap { + if let Some(front) = q.pop_front() { + *used -= front.bytes.len(); + dropped_bytes_total.fetch_add(front.bytes.len() as u64, Ordering::Relaxed); + head_seq.store(front.seq.saturating_add(1), Ordering::Relaxed); + } else { break; } + } + } + } + // broadcast + let _ = sender.send(chunk); + + offset = end; + } +} diff --git a/packages/react-native-uniffi-russh/src/api.ts b/packages/react-native-uniffi-russh/src/api.ts index ab6923c..34befea 100644 --- a/packages/react-native-uniffi-russh/src/api.ts +++ b/packages/react-native-uniffi-russh/src/api.ts @@ -1,7 +1,14 @@ /** * We cannot make the generated code match this API exactly because uniffi * - Doesn't support ts literals for rust enums - * - Doesn't support passing a js object with methods and properties to rust + * - Doesn't support passing a js object with methods and properties to or from rust. + * + * The second issue is much harder to get around than the first. + * In practice it means that if you want to pass an object with callbacks and props to rust, it need to be in seperate args. + * If you want to pass an object with callbacks and props from rust to js (like ssh handles), you need to instead only pass an object with callbacks + * just make one of the callbacks a sync info() callback. + * + * Then in this api wrapper we can smooth over those rough edges. * See: - https://jhugman.github.io/uniffi-bindgen-react-native/idioms/callback-interfaces.html */ import * as GeneratedRussh from './index'; @@ -9,6 +16,10 @@ import * as GeneratedRussh from './index'; // #region Ideal API +// ───────────────────────────────────────────────────────────────────────────── +// Core types +// ───────────────────────────────────────────────────────────────────────────── + export type ConnectionDetails = { host: string; port: number; @@ -18,6 +29,17 @@ export type ConnectionDetails = { | { type: 'key'; privateKey: string }; }; +export type SshConnectionStatus = + | 'tcpConnecting' + | 'tcpConnected' + | 'tcpDisconnected' + | 'shellConnecting' + | 'shellConnected' + | 'shellDisconnected'; + +export type PtyType = + | 'Vanilla' | 'Vt100' | 'Vt102' | 'Vt220' | 'Ansi' | 'Xterm' | 'Xterm256'; + export type ConnectOptions = ConnectionDetails & { onStatusChange?: (status: SshConnectionStatus) => void; abortSignal?: AbortSignal; @@ -27,56 +49,97 @@ export type StartShellOptions = { pty: PtyType; onStatusChange?: (status: SshConnectionStatus) => void; abortSignal?: AbortSignal; -} +}; + +export type StreamKind = 'stdout' | 'stderr'; + +export type TerminalChunk = { + /** Monotonic sequence number from the shell start (Rust u64; JS uses number). */ + seq: number; + /** Milliseconds since UNIX epoch (double). */ + tMs: number; + stream: StreamKind; + bytes: Uint8Array; +}; + +export type DropNotice = { kind: 'dropped'; fromSeq: number; toSeq: number }; +export type ListenerEvent = TerminalChunk | DropNotice; + +export type Cursor = + | { mode: 'head' } // earliest available in ring + | { mode: 'tailBytes'; bytes: number } // last N bytes (best-effort) + | { mode: 'seq'; seq: number } // from a given sequence + | { mode: 'time'; tMs: number } // from timestamp + | { mode: 'live' }; // no replay, live only + +export type ListenerOptions = { + cursor: Cursor; + /** Optional per-listener coalescing window in ms (e.g., 10–25). */ + coalesceMs?: number; +}; + +export type BufferStats = { + ringBytes: number; // configured capacity + usedBytes: number; // current usage + chunks: number; // chunks kept + headSeq: number; // oldest seq retained + tailSeq: number; // newest seq retained + droppedBytesTotal: number; // cumulative eviction +}; + +export type BufferReadResult = { + chunks: TerminalChunk[]; + nextSeq: number; + dropped?: { fromSeq: number; toSeq: number }; +}; + +// ───────────────────────────────────────────────────────────────────────────── +// Handles +// ───────────────────────────────────────────────────────────────────────────── + export type SshConnection = { - connectionId: string; + readonly connectionId: string; readonly createdAtMs: number; readonly tcpEstablishedAtMs: number; readonly connectionDetails: ConnectionDetails; - startShell: (params: StartShellOptions) => Promise; - addChannelListener: (listener: (data: ArrayBuffer) => void) => bigint; - removeChannelListener: (id: bigint) => void; - disconnect: (params?: { signal: AbortSignal }) => Promise; + + startShell: (opts: StartShellOptions) => Promise; + disconnect: (opts?: { signal?: AbortSignal }) => Promise; }; -export type SshShellSession = { +export type SshShell = { readonly channelId: number; readonly createdAtMs: number; - readonly pty: GeneratedRussh.PtyType; + readonly pty: PtyType; readonly connectionId: string; - sendData: ( - data: ArrayBuffer, - options?: { signal: AbortSignal } - ) => Promise; - close: (params?: { signal: AbortSignal }) => Promise; + + // I/O + sendData: (data: ArrayBuffer, opts?: { signal?: AbortSignal }) => Promise; + close: (opts?: { signal?: AbortSignal }) => Promise; + + // Buffer policy & stats + setBufferPolicy: (policy: { ringBytes?: number; coalesceMs?: number }) => Promise; + bufferStats: () => Promise; + currentSeq: () => Promise; + + // Replay + live + readBuffer: (cursor: Cursor, maxBytes?: number) => Promise; + addListener: ( + cb: (ev: ListenerEvent) => void, + opts: ListenerOptions + ) => bigint; + removeListener: (id: bigint) => void; }; - type RusshApi = { - connect: (options: ConnectOptions) => Promise; - - getSshConnection: (id: string) => SshConnection | undefined; - getSshShell: (connectionId: string, channelId: number) => SshShellSession | undefined; - listSshConnections: () => SshConnection[]; - listSshShells: () => SshShellSession[]; - listSshConnectionsWithShells: () => (SshConnection & { shells: SshShellSession[] })[]; - - generateKeyPair: (type: PrivateKeyType) => Promise; - uniffiInitAsync: () => Promise; -} + connect: (opts: ConnectOptions) => Promise; + generateKeyPair: (type: 'rsa' | 'ecdsa' | 'ed25519') => Promise; +}; // #endregion -// #region Weird stuff we have to do to get uniffi to have that ideal API - -const privateKeyTypeLiteralToEnum = { - rsa: GeneratedRussh.KeyType.Rsa, - ecdsa: GeneratedRussh.KeyType.Ecdsa, - ed25519: GeneratedRussh.KeyType.Ed25519, -} as const satisfies Record; -export type PrivateKeyType = keyof typeof privateKeyTypeLiteralToEnum; - +// #region Wrapper to match the ideal API const ptyTypeLiteralToEnum = { Vanilla: GeneratedRussh.PtyType.Vanilla, @@ -87,8 +150,16 @@ const ptyTypeLiteralToEnum = { Xterm: GeneratedRussh.PtyType.Xterm, Xterm256: GeneratedRussh.PtyType.Xterm256, } as const satisfies Record; -export type PtyType = keyof typeof ptyTypeLiteralToEnum; +const ptyEnumToLiteral: Record = { + [GeneratedRussh.PtyType.Vanilla]: 'Vanilla', + [GeneratedRussh.PtyType.Vt100]: 'Vt100', + [GeneratedRussh.PtyType.Vt102]: 'Vt102', + [GeneratedRussh.PtyType.Vt220]: 'Vt220', + [GeneratedRussh.PtyType.Ansi]: 'Ansi', + [GeneratedRussh.PtyType.Xterm]: 'Xterm', + [GeneratedRussh.PtyType.Xterm256]: 'Xterm256', +}; const sshConnStatusEnumToLiteral = { [GeneratedRussh.SshConnectionStatus.TcpConnecting]: 'tcpConnecting', @@ -97,160 +168,151 @@ const sshConnStatusEnumToLiteral = { [GeneratedRussh.SshConnectionStatus.ShellConnecting]: 'shellConnecting', [GeneratedRussh.SshConnectionStatus.ShellConnected]: 'shellConnected', [GeneratedRussh.SshConnectionStatus.ShellDisconnected]: 'shellDisconnected', -} as const satisfies Record; -export type SshConnectionStatus = (typeof sshConnStatusEnumToLiteral)[keyof typeof sshConnStatusEnumToLiteral]; +} as const satisfies Record; +const streamEnumToLiteral = { + [GeneratedRussh.StreamKind.Stdout]: 'stdout', + [GeneratedRussh.StreamKind.Stderr]: 'stderr', +} as const satisfies Record; function generatedConnDetailsToIdeal(details: GeneratedRussh.ConnectionDetails): ConnectionDetails { + const security: ConnectionDetails['security'] = details.security instanceof GeneratedRussh.Security.Password + ? { type: 'password', password: details.security.inner.password } + : { type: 'key', privateKey: details.security.inner.keyId }; + return { host: details.host, port: details.port, username: details.username, security }; +} + +function cursorToGenerated(cursor: Cursor): GeneratedRussh.Cursor { + switch (cursor.mode) { + case 'head': + return new GeneratedRussh.Cursor.Head(); + case 'tailBytes': + return new GeneratedRussh.Cursor.TailBytes({ bytes: BigInt(cursor.bytes) }); + case 'seq': + return new GeneratedRussh.Cursor.Seq({ seq: BigInt(cursor.seq) }); + case 'time': + return new GeneratedRussh.Cursor.TimeMs({ tMs: cursor.tMs }); + case 'live': + return new GeneratedRussh.Cursor.Live(); + } +} + +function toTerminalChunk(ch: GeneratedRussh.TerminalChunk): TerminalChunk { return { - host: details.host, - port: details.port, - username: details.username, - security: details.security instanceof GeneratedRussh.Security.Password ? { type: 'password', password: details.security.inner.password } : { type: 'key', privateKey: details.security.inner.keyId }, + seq: Number(ch.seq), + tMs: ch.tMs, + stream: streamEnumToLiteral[ch.stream], + bytes: new Uint8Array(ch.bytes as any), }; } -function wrapConnection(conn: GeneratedRussh.SshConnectionInterface): SshConnection { - // Wrap startShell in-place to preserve the UniFFI object's internal pointer. - const originalStartShell = conn.startShell.bind(conn); - const betterStartShell = async (params: StartShellOptions) => { - const shell = await originalStartShell( - { - pty: ptyTypeLiteralToEnum[params.pty], - onStatusChange: params.onStatusChange - ? { onChange: (statusEnum) => params.onStatusChange?.(sshConnStatusEnumToLiteral[statusEnum]!) } - : undefined, - }, - params.abortSignal ? { signal: params.abortSignal } : undefined, - ); - return wrapShellSession(shell); - }; - - // Accept a function for onData and adapt to the generated listener object. - const originalAddChannelListener = conn.addChannelListener.bind(conn); - const betterAddChannelListener = (listener: (data: ArrayBuffer) => void) => - originalAddChannelListener({ onData: listener }); - - const connInfo = conn.info(); - return { - connectionId: connInfo.connectionId, - connectionDetails: generatedConnDetailsToIdeal(connInfo.connectionDetails), - createdAtMs: connInfo.createdAtMs, - tcpEstablishedAtMs: connInfo.tcpEstablishedAtMs, - startShell: betterStartShell, - addChannelListener: betterAddChannelListener, - removeChannelListener: conn.removeChannelListener.bind(conn), - disconnect: conn.disconnect.bind(conn), - }; -} - -function wrapShellSession(shell: GeneratedRussh.ShellSessionInterface): SshShellSession { +function wrapShellSession(shell: GeneratedRussh.ShellSessionInterface): SshShell { const info = shell.info(); + const setBufferPolicy: SshShell['setBufferPolicy'] = async (policy) => { + await shell.setBufferPolicy(policy.ringBytes != null ? BigInt(policy.ringBytes) : undefined, policy.coalesceMs); + }; + + const bufferStats: SshShell['bufferStats'] = async () => { + const s = shell.bufferStats(); + return { + ringBytes: Number(s.ringBytes), + usedBytes: Number(s.usedBytes), + chunks: Number(s.chunks), + headSeq: Number(s.headSeq), + tailSeq: Number(s.tailSeq), + droppedBytesTotal: Number(s.droppedBytesTotal), + }; + }; + + const readBuffer: SshShell['readBuffer'] = async (cursor, maxBytes) => { + const res = shell.readBuffer(cursorToGenerated(cursor), maxBytes != null ? BigInt(maxBytes) : undefined); + return { + chunks: res.chunks.map(toTerminalChunk), + nextSeq: Number(res.nextSeq), + dropped: res.dropped ? { fromSeq: Number(res.dropped.fromSeq), toSeq: Number(res.dropped.toSeq) } : undefined, + } satisfies BufferReadResult; + }; + + const addListener: SshShell['addListener'] = (cb, opts) => { + const listener = { + onEvent: (ev: GeneratedRussh.ShellEvent) => { + if (ev instanceof GeneratedRussh.ShellEvent.Chunk) { + cb(toTerminalChunk(ev.inner[0]!)); + } else if (ev instanceof GeneratedRussh.ShellEvent.Dropped) { + cb({ kind: 'dropped', fromSeq: Number(ev.inner.fromSeq), toSeq: Number(ev.inner.toSeq) }); + } + } + } satisfies GeneratedRussh.ShellListener; + + const id = shell.addListener(listener, { cursor: cursorToGenerated(opts.cursor), coalesceMs: opts.coalesceMs }); + return BigInt(id); + }; + return { channelId: info.channelId, createdAtMs: info.createdAtMs, - pty: info.pty, + pty: ptyEnumToLiteral[info.pty], connectionId: info.connectionId, - sendData: shell.sendData.bind(shell), - close: shell.close.bind(shell) + sendData: (data, o) => shell.sendData(data, o?.signal ? { signal: o.signal } : undefined), + close: (o) => shell.close(o?.signal ? { signal: o.signal } : undefined), + setBufferPolicy, + bufferStats, + currentSeq: async () => Number(shell.currentSeq()), + readBuffer, + addListener, + removeListener: (id) => shell.removeListener(id), + }; +} + +function wrapConnection(conn: GeneratedRussh.SshConnectionInterface): SshConnection { + const inf = conn.info(); + return { + connectionId: inf.connectionId, + connectionDetails: generatedConnDetailsToIdeal(inf.connectionDetails), + createdAtMs: inf.createdAtMs, + tcpEstablishedAtMs: inf.tcpEstablishedAtMs, + startShell: async (params) => { + const shell = await conn.startShell( + { + pty: ptyTypeLiteralToEnum[params.pty], + onStatusChange: params.onStatusChange + ? { onChange: (statusEnum) => params.onStatusChange!(sshConnStatusEnumToLiteral[statusEnum]) } + : undefined, + }, + params.abortSignal ? { signal: params.abortSignal } : undefined, + ); + return wrapShellSession(shell); + }, + disconnect: (opts) => conn.disconnect(opts?.signal ? { signal: opts.signal } : undefined), }; } async function connect(options: ConnectOptions): Promise { const security = options.security.type === 'password' - ? new GeneratedRussh.Security.Password({ - password: options.security.password, - }) + ? new GeneratedRussh.Security.Password({ password: options.security.password }) : new GeneratedRussh.Security.Key({ keyId: options.security.privateKey }); - const sshConnectionInterface = await GeneratedRussh.connect( + const sshConnection = await GeneratedRussh.connect( { host: options.host, port: options.port, username: options.username, security, onStatusChange: options.onStatusChange ? { - onChange: (statusEnum) => { - const tsLiteral = sshConnStatusEnumToLiteral[statusEnum]; - if (!tsLiteral) throw new Error(`Invalid status enum: ${statusEnum}`); - options.onStatusChange?.(tsLiteral); - }, + onChange: (statusEnum) => options.onStatusChange!(sshConnStatusEnumToLiteral[statusEnum]) } : undefined, }, - options.abortSignal - ? { - signal: options.abortSignal, - } - : undefined + options.abortSignal ? { signal: options.abortSignal } : undefined, ); - return wrapConnection(sshConnectionInterface); + return wrapConnection(sshConnection); } -// Optional registry lookups: return undefined if not found/disconnected -function getSshConnection(id: string): SshConnection | undefined { - try { - const conn = GeneratedRussh.getSshConnection(id); - return wrapConnection(conn); - } catch { - return undefined; - } +async function generateKeyPair(type: 'rsa' | 'ecdsa' | 'ed25519') { + const map = { rsa: GeneratedRussh.KeyType.Rsa, ecdsa: GeneratedRussh.KeyType.Ecdsa, ed25519: GeneratedRussh.KeyType.Ed25519 } as const; + return GeneratedRussh.generateKeyPair(map[type]); } -function getSshShell(connectionId: string, channelId: number): SshShellSession | undefined { - try { - const shell = GeneratedRussh.getSshShell(connectionId, channelId); - return wrapShellSession(shell); - } catch { - return undefined; - } -} - -function listSshConnections(): SshConnection[] { - const infos = GeneratedRussh.listSshConnections(); - const out: SshConnection[] = []; - for (const info of infos) { - try { - const conn = GeneratedRussh.getSshConnection(info.connectionId); - out.push(wrapConnection(conn)); - } catch { - // ignore entries that no longer exist between snapshot and lookup - } - } - return out; -} - -function listSshShells(): SshShellSession[] { - const infos = GeneratedRussh.listSshShells(); - const out: SshShellSession[] = []; - for (const info of infos) { - try { - const shell = GeneratedRussh.getSshShell(info.connectionId, info.channelId); - out.push(wrapShellSession(shell)); - } catch { - // ignore entries that no longer exist between snapshot and lookup - } - } - return out; -} - -/** - * TODO: This feels a bit hacky. It is probably more effecient to do this join in rust and send - * the joined result to the app. - */ -function listSshConnectionsWithShells(): (SshConnection & { shells: SshShellSession[] })[] { - const connections = listSshConnections(); - const shells = listSshShells(); - return connections.map(connection => ({ - ...connection, - shells: shells.filter(shell => shell.connectionId === connection.connectionId), - })); -} - - -async function generateKeyPair(type: PrivateKeyType) { - return GeneratedRussh.generateKeyPair(privateKeyTypeLiteralToEnum[type]); -} // #endregion @@ -258,9 +320,4 @@ export const RnRussh = { uniffiInitAsync: GeneratedRussh.uniffiInitAsync, connect, generateKeyPair, - getSshConnection, - listSshConnections, - listSshShells, - listSshConnectionsWithShells, - getSshShell, } satisfies RusshApi;