diff --git a/packages/react-native-uniffi-russh/rust/uniffi-russh/justfile b/packages/react-native-uniffi-russh/rust/uniffi-russh/justfile index da0e4f4..9e9f555 100644 --- a/packages/react-native-uniffi-russh/rust/uniffi-russh/justfile +++ b/packages/react-native-uniffi-russh/rust/uniffi-russh/justfile @@ -13,3 +13,7 @@ update-deps: # Run crate tests test: cargo test + + +fmt: + cargo fmt \ No newline at end of file diff --git a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs index acb9fcb..f1c268d 100644 --- a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs +++ b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs @@ -5,26 +5,9 @@ //! - https://jhugman.github.io/uniffi-bindgen-react-native/idioms/callback-interfaces.html //! - https://jhugman.github.io/uniffi-bindgen-react-native/idioms/async-callbacks.html -use std::collections::HashMap; -use std::fmt; -use std::sync::{atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, Weak}; -use std::time::{SystemTime, UNIX_EPOCH, Duration}; - -use rand::rngs::OsRng; -use thiserror::Error; -use tokio::sync::{broadcast, Mutex as AsyncMutex}; - -use russh::{self, client, ChannelMsg, Disconnect}; -use russh::client::{Config, Handle as ClientHandle}; -use russh_keys::{Algorithm, EcdsaCurve}; -use russh::keys::PrivateKeyWithHashAlg; -use russh_keys::ssh_key::{self, LineEnding}; -// Alias the internal ssh_key re-export used by russh for type compatibility -use russh::keys::ssh_key as russh_ssh_key; -use russh_keys::ssh_key::{private::{Ed25519Keypair, KeypairData}}; -use bytes::Bytes; -use base64::Engine as _; -use ed25519_dalek::SigningKey; +pub mod private_key; +pub mod ssh_connection; +pub mod ssh_shell; +pub mod utils; uniffi::setup_scaffolding!(); - diff --git a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/private_key.rs b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/private_key.rs index 717c0dd..f255da3 100644 --- a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/private_key.rs +++ b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/private_key.rs @@ -1,5 +1,13 @@ +use rand::rngs::OsRng; +use russh::keys::ssh_key::{ + self, + private::{Ed25519Keypair, KeypairData}, +}; +use russh_keys::{Algorithm, EcdsaCurve}; +use crate::utils::SshError; +use base64::Engine as _; #[derive(Debug, Clone, Copy, PartialEq, uniffi::Enum)] pub enum KeyType { @@ -9,7 +17,6 @@ pub enum KeyType { Ed448, } - #[uniffi::export] pub fn validate_private_key(private_key_content: String) -> Result { // Normalize and parse once; return canonical OpenSSH string. @@ -22,26 +29,31 @@ pub fn generate_key_pair(key_type: KeyType) -> Result { let mut rng = OsRng; let key = match key_type { KeyType::Rsa => russh_keys::PrivateKey::random(&mut rng, Algorithm::Rsa { hash: None })?, - KeyType::Ecdsa => russh_keys::PrivateKey::random(&mut rng, Algorithm::Ecdsa { curve: EcdsaCurve::NistP256 })?, + KeyType::Ecdsa => russh_keys::PrivateKey::random( + &mut rng, + Algorithm::Ecdsa { + curve: EcdsaCurve::NistP256, + }, + )?, KeyType::Ed25519 => russh_keys::PrivateKey::random(&mut rng, Algorithm::Ed25519)?, KeyType::Ed448 => return Err(SshError::UnsupportedKeyType), }; - Ok(key.to_openssh(LineEnding::LF)?.to_string()) + Ok(key + .to_openssh(russh_keys::ssh_key::LineEnding::LF)? + .to_string()) } - - // Best-effort fix for OpenSSH ed25519 keys that store only a 32-byte seed in // the private section (instead of 64 bytes consisting of seed || public). // If the input matches an unencrypted OpenSSH ed25519 key with a 32-byte // private field, this function returns a normalized PEM string with the // correct 64-byte private field (seed || public). Otherwise, returns None. -fn normalize_openssh_ed25519_seed_key( +pub(crate) fn normalize_openssh_ed25519_seed_key( input: &str, -) -> Result<(String, russh::keys::PrivateKey), russh_ssh_key::Error> { +) -> Result<(String, russh::keys::PrivateKey), russh::keys::ssh_key::Error> { // If it already parses, return canonical string and parsed key. if let Ok(parsed) = russh::keys::PrivateKey::from_openssh(input) { - let canonical = parsed.to_openssh(russh_ssh_key::LineEnding::LF)?.to_string(); + let canonical = parsed.to_openssh(ssh_key::LineEnding::LF)?.to_string(); return Ok((canonical, parsed)); } @@ -69,18 +81,24 @@ fn normalize_openssh_ed25519_seed_key( let mut idx = 0usize; let magic = b"openssh-key-v1\0"; - if raw.len() < magic.len() || &raw[..magic.len()] != magic { return None; } + if raw.len() < magic.len() || &raw[..magic.len()] != magic { + return None; + } idx += magic.len(); fn read_u32(buf: &[u8], idx: &mut usize) -> Option { - if *idx + 4 > buf.len() { return None; } + if *idx + 4 > buf.len() { + return None; + } let v = u32::from_be_bytes([buf[*idx], buf[*idx + 1], buf[*idx + 2], buf[*idx + 3]]); *idx += 4; Some(v) } fn read_string<'a>(buf: &'a [u8], idx: &mut usize) -> Option<&'a [u8]> { let n = read_u32(buf, idx)? as usize; - if *idx + n > buf.len() { return None; } + if *idx + n > buf.len() { + return None; + } let s = &buf[*idx..*idx + n]; *idx += n; Some(s) @@ -89,7 +107,9 @@ fn normalize_openssh_ed25519_seed_key( let ciphername = read_string(&raw, &mut idx)?; let kdfname = read_string(&raw, &mut idx)?; let _kdfopts = read_string(&raw, &mut idx)?; - if ciphername != b"none" || kdfname != b"none" { return None; } + if ciphername != b"none" || kdfname != b"none" { + return None; + } let nkeys = read_u32(&raw, &mut idx)? as usize; for _ in 0..nkeys { @@ -100,10 +120,14 @@ fn normalize_openssh_ed25519_seed_key( let mut pidx = 0usize; let check1 = read_u32(private_block, &mut pidx)?; let check2 = read_u32(private_block, &mut pidx)?; - if check1 != check2 { return None; } + if check1 != check2 { + return None; + } let alg = read_string(private_block, &mut pidx)?; - if alg != b"ssh-ed25519" { return None; } + if alg != b"ssh-ed25519" { + return None; + } let _pubkey = read_string(private_block, &mut pidx)?; let privkey = read_string(private_block, &mut pidx)?; let comment_bytes = read_string(private_block, &mut pidx)?; @@ -111,8 +135,11 @@ fn normalize_openssh_ed25519_seed_key( // Build canonical keypair bytes let mut keypair_bytes = [0u8; 64]; if privkey.len() == 32 { - let seed: [u8; 32] = match privkey.try_into() { Ok(a) => a, Err(_) => return None }; - let sk = SigningKey::from_bytes(&seed); + let seed: [u8; 32] = match privkey.try_into() { + Ok(a) => a, + Err(_) => return None, + }; + let sk = ed25519_dalek::SigningKey::from_bytes(&seed); let vk = sk.verifying_key(); let pub_bytes = vk.to_bytes(); keypair_bytes[..32].copy_from_slice(&seed); @@ -122,20 +149,28 @@ fn normalize_openssh_ed25519_seed_key( } else { return None; } - let ed_kp = match Ed25519Keypair::from_bytes(&keypair_bytes) { Ok(k) => k, Err(_) => return None }; + let ed_kp = match Ed25519Keypair::from_bytes(&keypair_bytes) { + Ok(k) => k, + Err(_) => return None, + }; let comment = String::from_utf8(comment_bytes.to_vec()).unwrap_or_default(); let key_data = KeypairData::from(ed_kp); - let private = match ssh_key::PrivateKey::new(key_data, comment) { Ok(p) => p, Err(_) => return None }; - match private.to_openssh(LineEnding::LF) { Ok(s) => Some(s.to_string()), Err(_) => None } + let private = match ssh_key::PrivateKey::new(key_data, comment) { + Ok(p) => p, + Err(_) => return None, + }; + match private.to_openssh(ssh_key::LineEnding::LF) { + Ok(s) => Some(s.to_string()), + Err(_) => None, + } } let candidate = try_fix_seed_only_ed25519(input).unwrap_or_else(|| input.to_string()); let parsed = russh::keys::PrivateKey::from_openssh(&candidate)?; - let canonical = parsed.to_openssh(russh_ssh_key::LineEnding::LF)?.to_string(); + let canonical = parsed.to_openssh(ssh_key::LineEnding::LF)?.to_string(); Ok((canonical, parsed)) } - #[cfg(test)] mod tests { use super::*; @@ -147,7 +182,6 @@ mod tests { assert!(result.is_err(), "Expected Err for invalid key content"); } - #[test] fn validate_private_key_accepts_1() { // Generated with: ssh-keygen -t ed25519 -C "test-ed25519@fressh.com" -f ./ed25519-with-comment @@ -158,7 +192,8 @@ LAAAAAtzc2gtZWQyNTUxOQAAACC7PhmC0yS0Q8LcUkRnoYCxpb4gkCjJhadvvf+TDlRBJw AAAEBmrg8TL0+2xypHjVpFeuQmgQf3Qn/A45Jz+zCwVgoBt7s+GYLTJLRDwtxSRGehgLGl viCQKMmFp2+9/5MOVEEnAAAAF3Rlc3QtZWQyNTUxOUBmcmVzc2guY29tAQIDBAUG -----END OPENSSH PRIVATE KEY----- -".to_string(); +" + .to_string(); let result = validate_private_key(valid_key); assert!(result.is_ok(), "Expected Ok for valid key content"); } @@ -172,7 +207,8 @@ zQAAAAtzc2gtZWQyNTUxOQAAACD/icJYduvcR9JPKw9g/bPWpsgS0IAaJxlYL5yeuOaNMg AAAEDYE6BYf7QlpAaJCfaxA/HN487NM9iIF7VGue/iefZIyP+Jwlh269xH0k8rD2D9s9am yBLQgBonGVgvnJ645o0yAAAADmV0aGFuQEV0aGFuLVBDAQIDBAUGBw== -----END OPENSSH PRIVATE KEY----- -".to_string(); +" + .to_string(); let result = validate_private_key(valid_key); assert!(result.is_ok(), "Expected Ok for valid key content"); } @@ -186,13 +222,14 @@ hwAAAAtzc2gtZWQyNTUxOQAAACDt2ZcFrEhB8/B4uu30mPIi3BWWEa/wE//IUXLeL9YevA AAAEBMtZWpjpVnzDhYKR3V09SLohGqkW7HgMXoF8f0zf+/Pu3ZlwWsSEHz8Hi67fSY8iLc FZYRr/AT/8hRct4v1h68AAAAAAECAwQF -----END OPENSSH PRIVATE KEY----- -".to_string(); +" + .to_string(); let result = validate_private_key(valid_key); assert!(result.is_ok(), "Expected Ok for valid key content"); } #[test] fn validate_private_key_accepts_4() { - // Generated with juicessh + // Generated with juicessh let valid_key = "-----BEGIN OPENSSH PRIVATE KEY----- b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZWQyNTUxOQAAACCh5IbLI9ypdFzNW8WvezgBrzJT/2mT9BKSdZScB4EYoQAAAJB8YyoafGMqGgAAAAtzc2gtZWQyNTUxOQAAACCh5IbLI9ypdFzNW8WvezgBrzJT/2mT9BKSdZScB4EYoQAAAECpYzHTSiKC2iehjck1n8GAp5mdGuB2J5vV+9U3MAvthKHkhssj3Kl0XM1bxa97OAGvMlP/aZP0EpJ1lJwHgRihAAAAAAECAwQFBgcICQoLDA0= -----END OPENSSH PRIVATE KEY----- diff --git a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_connection.rs b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_connection.rs index 10e4410..6e273ae 100644 --- a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_connection.rs +++ b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_connection.rs @@ -1,8 +1,27 @@ +use std::fmt; +use std::sync::{Arc, Weak}; +use tokio::sync::{broadcast, Mutex as AsyncMutex}; +use russh::client::{Config, Handle as ClientHandle}; +use russh::keys::PrivateKeyWithHashAlg; +use russh::{self, ChannelMsg, Disconnect}; +use crate::private_key::normalize_openssh_ed25519_seed_key; +use crate::ssh_shell::{ + append_and_broadcast, Chunk, NoopHandler, ShellSession, ShellSessionInfo, StartShellOptions, + StreamKind, DEFAULT_BROADCAST_CHUNK_CAPACITY, DEFAULT_MAX_CHUNK_SIZE, + DEFAULT_SHELL_RING_BUFFER_CAPACITY, DEFAULT_TERMINAL_MODES, DEFAULT_TERM_COALESCE_MS, + DEFAULT_TERM_COL_WIDTH, DEFAULT_TERM_PIXEL_HEIGHT, DEFAULT_TERM_PIXEL_WIDTH, + DEFAULT_TERM_ROW_HEIGHT, +}; +use crate::utils::{now_ms, SshError}; +use std::sync::atomic::AtomicUsize; - +use std::{ + collections::HashMap, + sync::{atomic::AtomicU64, Mutex}, +}; #[derive(Debug, Clone, PartialEq, uniffi::Enum)] pub enum Security { @@ -34,6 +53,13 @@ pub enum SshConnectionProgressEvent { // After promise resolves, assume: Connected } +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct SshConnectionInfoProgressTimings { + // TODO: We should have a field for each SshConnectionProgressEvent. Would be great if this were enforced by the compiler. + pub tcp_established_at_ms: f64, + pub ssh_handshake_at_ms: f64, +} + #[uniffi::export(with_foreign)] pub trait ConnectProgressCallback: Send + Sync { fn on_change(&self, status: SshConnectionProgressEvent); @@ -44,24 +70,28 @@ pub trait ConnectionDisconnectedCallback: Send + Sync { fn on_change(&self, connection_id: String); } - +/// Snapshot of current connection info for property-like access in TS. +#[derive(Debug, Clone, PartialEq, uniffi::Record)] +pub struct SshConnectionInfo { + pub connection_id: String, + pub connection_details: ConnectionDetails, + pub created_at_ms: f64, + pub connected_at_ms: f64, + pub progress_timings: SshConnectionInfoProgressTimings, +} #[derive(uniffi::Object)] pub struct SshConnection { - info: SshConnectionInfo, - on_disconnected_callback: Option>, - client_handle: AsyncMutex>, + pub(crate) info: SshConnectionInfo, + pub(crate) on_disconnected_callback: Option>, + pub(crate) client_handle: AsyncMutex>, - shells: AsyncMutex>>, + pub(crate) shells: AsyncMutex>>, // Weak self for child sessions to refer back without cycles. - self_weak: AsyncMutex>, + pub(crate) self_weak: AsyncMutex>, } - - - - impl fmt::Debug for SshConnection { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("SshConnectionHandle") @@ -72,9 +102,6 @@ impl fmt::Debug for SshConnection { } } - - - #[uniffi::export(async_runtime = "tokio")] impl SshConnection { /// Convenience snapshot for property-like access in TS. @@ -82,8 +109,10 @@ impl SshConnection { self.info.clone() } - pub async fn start_shell(&self, opts: StartShellOptions) -> Result, SshError> { - + pub async fn start_shell( + &self, + opts: StartShellOptions, + ) -> Result, SshError> { let started_at_ms = now_ms(); let term = opts.term; @@ -107,12 +136,37 @@ impl SshConnection { } } - let row_height = opts.terminal_size.as_ref().and_then(|s| s.row_height).unwrap_or(DEFAULT_TERM_ROW_HEIGHT); - let col_width = opts.terminal_size.as_ref().and_then(|s| s.col_width).unwrap_or(DEFAULT_TERM_COL_WIDTH); - let pixel_width = opts.terminal_pixel_size.as_ref().and_then(|s| s.pixel_width).unwrap_or(DEFAULT_TERM_PIXEL_WIDTH); - let pixel_height= opts.terminal_pixel_size.as_ref().and_then(|s| s.pixel_height).unwrap_or(DEFAULT_TERM_PIXEL_HEIGHT); + let row_height = opts + .terminal_size + .as_ref() + .and_then(|s| s.row_height) + .unwrap_or(DEFAULT_TERM_ROW_HEIGHT); + let col_width = opts + .terminal_size + .as_ref() + .and_then(|s| s.col_width) + .unwrap_or(DEFAULT_TERM_COL_WIDTH); + let pixel_width = opts + .terminal_pixel_size + .as_ref() + .and_then(|s| s.pixel_width) + .unwrap_or(DEFAULT_TERM_PIXEL_WIDTH); + let pixel_height = opts + .terminal_pixel_size + .as_ref() + .and_then(|s| s.pixel_height) + .unwrap_or(DEFAULT_TERM_PIXEL_HEIGHT); - ch.request_pty(true, term.as_ssh_name(), col_width, row_height, pixel_width, pixel_height, &modes).await?; + ch.request_pty( + true, + term.as_ssh_name(), + col_width, + row_height, + pixel_width, + pixel_height, + &modes, + ) + .await?; ch.request_shell(true).await?; // Split for read/write; spawn reader. @@ -195,10 +249,10 @@ impl SshConnection { }, on_closed_callback, parent: self.self_weak.lock().await.clone(), - + writer: AsyncMutex::new(writer), reader_task, - + // Ring buffer ring, ring_bytes_capacity, @@ -206,7 +260,7 @@ impl SshConnection { dropped_bytes_total, head_seq, tail_seq, - + // Listener tasks management sender: tx, listener_tasks: Arc::new(Mutex::new(HashMap::new())), @@ -217,11 +271,9 @@ impl SshConnection { self.shells.lock().await.insert(channel_id, session.clone()); - Ok(session) } - pub async fn disconnect(&self) -> Result<(), SshError> { // TODO: Check if we need to close all these if we are about to disconnect? let sessions: Vec> = { @@ -243,7 +295,6 @@ impl SshConnection { } } - #[uniffi::export(async_runtime = "tokio")] pub async fn connect(options: ConnectOptions) -> Result, SshError> { let started_at_ms = now_ms(); @@ -253,7 +304,6 @@ pub async fn connect(options: ConnectOptions) -> Result, SshE username: options.connection_details.username.clone(), security: options.connection_details.security.clone(), }; - // TCP let addr = format!("{}:{}", details.host, details.port); @@ -265,32 +315,47 @@ pub async fn connect(options: ConnectOptions) -> Result, SshE sl.on_change(SshConnectionProgressEvent::TcpConnected); } let cfg = Arc::new(Config::default()); - let mut handle: ClientHandle = russh::client::connect_stream(cfg, socket, NoopHandler).await?; + let mut handle: ClientHandle = + russh::client::connect_stream(cfg, socket, NoopHandler).await?; let ssh_handshake_at_ms = now_ms(); if let Some(sl) = options.on_connection_progress_callback.as_ref() { sl.on_change(SshConnectionProgressEvent::SshHandshake); } - let auth_result = match &details.security { + let auth_result = match &details.security { Security::Password { password } => { - handle.authenticate_password(details.username.clone(), password.clone()).await? + handle + .authenticate_password(details.username.clone(), password.clone()) + .await? } - Security::Key { private_key_content } => { + Security::Key { + private_key_content, + } => { // Normalize and parse using shared helper so RN-validated keys match runtime parsing. let (_canonical, parsed) = normalize_openssh_ed25519_seed_key(private_key_content)?; let pk_with_hash = PrivateKeyWithHashAlg::new(Arc::new(parsed), None); - handle.authenticate_publickey(details.username.clone(), pk_with_hash).await? + handle + .authenticate_publickey(details.username.clone(), pk_with_hash) + .await? } }; - if !matches!(auth_result, russh::client::AuthResult::Success) { return Err(auth_result.into()); } + if !matches!(auth_result, russh::client::AuthResult::Success) { + return Err(auth_result.into()); + } - let connection_id = format!("{}@{}:{}:{}", details.username, details.host, details.port, local_port); + let connection_id = format!( + "{}@{}:{}:{}", + details.username, details.host, details.port, local_port + ); let conn = Arc::new(SshConnection { info: SshConnectionInfo { connection_id, connection_details: details, created_at_ms: started_at_ms, connected_at_ms: now_ms(), - progress_timings: SshConnectionInfoProgressTimings { tcp_established_at_ms, ssh_handshake_at_ms }, + progress_timings: SshConnectionInfoProgressTimings { + tcp_established_at_ms, + ssh_handshake_at_ms, + }, }, client_handle: AsyncMutex::new(handle), shells: AsyncMutex::new(HashMap::new()), diff --git a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_shell.rs b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_shell.rs index 02923ac..f5e695a 100644 --- a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_shell.rs +++ b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/ssh_shell.rs @@ -1,6 +1,20 @@ +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicU64, AtomicUsize, Ordering}, + Arc, Mutex, + }, + time::Duration, +}; +use bytes::Bytes; - +use crate::{ + ssh_connection::SshConnection, + utils::{now_ms, SshError}, +}; +use russh::{self, client}; +use tokio::sync::{broadcast, Mutex as AsyncMutex}; // Note: russh accepts an untyped string for the terminal type #[derive(Debug, Clone, Copy, PartialEq, uniffi::Enum)] @@ -14,7 +28,7 @@ pub enum TerminalType { Xterm256, } impl TerminalType { - fn as_ssh_name(self) -> &'static str { + pub(crate) fn as_ssh_name(self) -> &'static str { match self { TerminalType::Vanilla => "vanilla", TerminalType::Vt100 => "vt100", @@ -27,10 +41,11 @@ impl TerminalType { } } - - #[derive(Debug, Clone, Copy, PartialEq, uniffi::Enum)] -pub enum StreamKind { Stdout, Stderr } +pub enum StreamKind { + Stdout, + Stderr, +} #[derive(Debug, Clone, PartialEq, uniffi::Record)] pub struct TerminalChunk { @@ -41,7 +56,10 @@ pub struct TerminalChunk { } #[derive(Debug, Clone, PartialEq, uniffi::Record)] -pub struct DroppedRange { pub from_seq: u64, pub to_seq: u64 } +pub struct DroppedRange { + pub from_seq: u64, + pub to_seq: u64, +} #[derive(Debug, Clone, PartialEq, uniffi::Enum)] pub enum ShellEvent { @@ -54,17 +72,12 @@ pub trait ShellListener: Send + Sync { fn on_event(&self, ev: ShellEvent); } - - #[derive(Debug, Clone, Copy, PartialEq, uniffi::Record)] pub struct TerminalMode { - pub opcode: u8, // PTY opcode (matches russh::Pty discriminants) + pub opcode: u8, // PTY opcode (matches russh::Pty discriminants) pub value: u32, } - - - #[derive(Debug, Clone, Copy, PartialEq, uniffi::Record)] pub struct TerminalSize { pub row_height: Option, @@ -91,23 +104,6 @@ pub trait ShellClosedCallback: Send + Sync { fn on_change(&self, channel_id: u32); } -#[derive(Debug, Clone, PartialEq, uniffi::Record)] -pub struct SshConnectionInfoProgressTimings { - // TODO: We should have a field for each SshConnectionProgressEvent. Would be great if this were enforced by the compiler. - pub tcp_established_at_ms: f64, - pub ssh_handshake_at_ms: f64, -} - -/// Snapshot of current connection info for property-like access in TS. -#[derive(Debug, Clone, PartialEq, uniffi::Record)] -pub struct SshConnectionInfo { - pub connection_id: String, - pub connection_details: ConnectionDetails, - pub created_at_ms: f64, - pub connected_at_ms: f64, - pub progress_timings: SshConnectionInfoProgressTimings, -} - /// Snapshot of shell session info for property-like access in TS. #[derive(Debug, Clone, PartialEq, uniffi::Record)] pub struct ShellSessionInfo { @@ -118,45 +114,43 @@ pub struct ShellSessionInfo { pub connection_id: String, } - #[derive(uniffi::Object)] pub struct ShellSession { - info: ShellSessionInfo, - on_closed_callback: Option>, + pub(crate) info: ShellSessionInfo, + pub(crate) on_closed_callback: Option>, // Weak backref; avoid retain cycle. - parent: std::sync::Weak, + pub(crate) parent: std::sync::Weak, - writer: AsyncMutex>, + pub(crate) writer: AsyncMutex>, // We keep the reader task to allow cancellation on close. - reader_task: tokio::task::JoinHandle<()>, - + pub(crate) reader_task: tokio::task::JoinHandle<()>, + // Ring buffer - ring: Arc>>>, - ring_bytes_capacity: Arc, - used_bytes: Arc>, - dropped_bytes_total: Arc, - head_seq: Arc, - tail_seq: Arc, + pub(crate) ring: Arc>>>, + pub(crate) ring_bytes_capacity: Arc, + pub(crate) used_bytes: Arc>, + pub(crate) dropped_bytes_total: Arc, + pub(crate) head_seq: Arc, + pub(crate) tail_seq: Arc, // Live broadcast - sender: broadcast::Sender>, + pub(crate) sender: broadcast::Sender>, // Listener tasks management - listener_tasks: Arc>>>, - next_listener_id: AtomicU64, - default_coalesce_ms: AtomicU64, - rt_handle: tokio::runtime::Handle, + pub(crate) listener_tasks: Arc>>>, + pub(crate) next_listener_id: AtomicU64, + pub(crate) default_coalesce_ms: AtomicU64, + pub(crate) rt_handle: tokio::runtime::Handle, } - #[derive(Debug, Clone, PartialEq, uniffi::Enum)] pub enum Cursor { - Head, // start from the beginning + Head, // start from the beginning TailBytes { bytes: u64 }, // start from the end of the last N bytes - Seq { seq: u64 }, // start from the given sequence number - TimeMs { t_ms: f64 }, // start from the given time in milliseconds - Live, // start from the live stream + Seq { seq: u64 }, // start from the given sequence number + TimeMs { t_ms: f64 }, // start from the given time in milliseconds + Live, // start from the live stream } #[derive(Debug, Clone, PartialEq, uniffi::Record)] @@ -183,20 +177,18 @@ pub struct BufferStats { pub chunks_count: u64, } - - // Internal chunk type kept in ring/broadcast #[derive(Debug)] -struct Chunk { // TODO: This is very similar to TerminalChunk. The only difference is the bytes field +pub(crate) struct Chunk { + // TODO: This is very similar to TerminalChunk. The only difference is the bytes field seq: u64, t_ms: f64, stream: StreamKind, bytes: Bytes, } - /// Minimal client::Handler. -struct NoopHandler; +pub(crate) struct NoopHandler; impl client::Handler for NoopHandler { type Error = SshError; // Accept any server key for now so dev UX isn't blocked. @@ -205,17 +197,18 @@ impl client::Handler for NoopHandler { fn check_server_key( &mut self, _server_public_key: &russh::keys::PublicKey, - ) -> impl std::future::Future::Error>> + std::marker::Send { + ) -> impl std::future::Future< + Output = std::result::Result::Error>, + > + std::marker::Send { std::future::ready(Ok(true)) } } - /// ---------- Methods ---------- -static DEFAULT_TERMINAL_MODES: &[(russh::Pty, u32)] = &[ +pub(crate) static DEFAULT_TERMINAL_MODES: &[(russh::Pty, u32)] = &[ (russh::Pty::ECHO, 1), // This will cause the terminal to echo the characters back to the client. (russh::Pty::ECHOK, 1), // After the line-kill character (often Ctrl+U), echo a newline. - (russh::Pty::ECHOE, 1), // Visually erase on backspace (erase using BS-SP-BS sequence). + (russh::Pty::ECHOE, 1), // Visually erase on backspace (erase using BS-SP-BS sequence). (russh::Pty::ICANON, 1), // Canonical (cooked) mode: line editing; input delivered line-by-line. (russh::Pty::ISIG, 1), // Generate signals on special chars (e.g., Ctrl+C -> SIGINT, Ctrl+Z -> SIGTSTP). (russh::Pty::ICRNL, 1), // Convert carriage return (CR, \r) to newline (NL, \n) on input. @@ -224,30 +217,28 @@ static DEFAULT_TERMINAL_MODES: &[(russh::Pty, u32)] = &[ (russh::Pty::TTY_OP_OSPEED, 38400), // Set output baud rate (here 38400). The baud rate is the number of characters per second. ]; -static DEFAULT_TERM_ROW_HEIGHT: u32 = 24; -static DEFAULT_TERM_COL_WIDTH: u32 = 80; -static DEFAULT_TERM_PIXEL_WIDTH: u32 = 0; -static DEFAULT_TERM_PIXEL_HEIGHT: u32 = 0; -static DEFAULT_TERM_COALESCE_MS: u64 = 16; +pub(crate) static DEFAULT_TERM_ROW_HEIGHT: u32 = 24; +pub(crate) static DEFAULT_TERM_COL_WIDTH: u32 = 80; +pub(crate) static DEFAULT_TERM_PIXEL_WIDTH: u32 = 0; +pub(crate) static DEFAULT_TERM_PIXEL_HEIGHT: u32 = 0; +pub(crate) static DEFAULT_TERM_COALESCE_MS: u64 = 16; // Number of recent live chunks retained by the broadcast channel for each // subscriber. If a subscriber falls behind this many messages, they will get a // Lagged error and skip to the latest. Tune to: peak_chunks_per_sec × max_pause_sec. -static DEFAULT_BROADCAST_CHUNK_CAPACITY: usize = 1024; +pub(crate) static DEFAULT_BROADCAST_CHUNK_CAPACITY: usize = 1024; // Byte budget for the on-heap replay/history ring buffer. When the total bytes // of stored chunks exceed this, oldest chunks are evicted. Increase for a // longer replay window at the cost of memory. -static DEFAULT_SHELL_RING_BUFFER_CAPACITY: usize = 2 * 1024 * 1024; // default 2MiB +pub(crate) static DEFAULT_SHELL_RING_BUFFER_CAPACITY: usize = 2 * 1024 * 1024; // default 2MiB // Upper bound for the size of a single appended/broadcast chunk. Incoming data // is split into slices no larger than this. Smaller values reduce latency and // loss impact; larger values reduce per-message overhead. -static DEFAULT_MAX_CHUNK_SIZE: usize = 16 * 1024; // 16KB - -static DEFAULT_READ_BUFFER_MAX_BYTES: u64 = 512 * 1024; // 512KB - +pub(crate) static DEFAULT_MAX_CHUNK_SIZE: usize = 16 * 1024; // 16KB +pub(crate) static DEFAULT_READ_BUFFER_MAX_BYTES: u64 = 512 * 1024; // 512KB #[uniffi::export(async_runtime = "tokio")] impl ShellSession { @@ -263,12 +254,17 @@ impl ShellSession { } /// Close the associated shell channel and stop its reader task. - pub async fn close(&self) -> Result<(), SshError> { self.close_internal().await } + pub async fn close(&self) -> Result<(), SshError> { + self.close_internal().await + } /// Buffer statistics snapshot. pub fn buffer_stats(&self) -> BufferStats { let used = *self.used_bytes.lock().unwrap_or_else(|p| p.into_inner()) as u64; - let chunks_count = match self.ring.lock() { Ok(q) => q.len() as u64, Err(p) => p.into_inner().len() as u64 }; + let chunks_count = match self.ring.lock() { + Ok(q) => q.len() as u64, + Err(p) => p.into_inner().len() as u64, + }; BufferStats { ring_bytes_count: self.ring_bytes_capacity.load(Ordering::Relaxed) as u64, used_bytes: used, @@ -280,7 +276,9 @@ impl ShellSession { } /// Current next sequence number. - pub fn current_seq(&self) -> u64 { self.tail_seq.load(Ordering::Relaxed).saturating_add(1) } + pub fn current_seq(&self) -> u64 { + self.tail_seq.load(Ordering::Relaxed).saturating_add(1) + } /// Read the ring buffer from a cursor. pub fn read_buffer(&self, cursor: Cursor, max_bytes: Option) -> BufferReadResult { @@ -292,29 +290,54 @@ impl ShellSession { // Lock ring to determine start and collect arcs, then drop lock. let (_start_idx_unused, _start_seq, arcs): (usize, u64, Vec>) = { - let ring = match self.ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; + let ring = match self.ring.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; let (start_seq, idx) = match cursor { Cursor::Head => (head_seq_now, 0usize), Cursor::Seq { seq: mut s } => { - if s < head_seq_now { dropped = Some(DroppedRange { from_seq: s, to_seq: head_seq_now - 1 }); s = head_seq_now; } + if s < head_seq_now { + dropped = Some(DroppedRange { + from_seq: s, + to_seq: head_seq_now - 1, + }); + s = head_seq_now; + } let idx = s.saturating_sub(head_seq_now) as usize; (s, idx.min(ring.len())) } Cursor::TimeMs { t_ms: t } => { // linear scan to find first chunk with t_ms >= t - let mut idx = 0usize; let mut s = head_seq_now; - for (i, ch) in ring.iter().enumerate() { if ch.t_ms >= t { idx = i; s = ch.seq; break; } } + let mut idx = 0usize; + let mut s = head_seq_now; + for (i, ch) in ring.iter().enumerate() { + if ch.t_ms >= t { + idx = i; + s = ch.seq; + break; + } + } (s, idx) } Cursor::TailBytes { bytes: n } => { // Walk from tail backwards until approx n bytes, then forward. - let mut bytes = 0usize; let mut idx = ring.len(); + let mut bytes = 0usize; + let mut idx = ring.len(); for i in (0..ring.len()).rev() { let b = ring[i].bytes.len(); - if bytes >= n as usize { idx = i + 1; break; } - bytes += b; idx = i; + if bytes >= n as usize { + idx = i + 1; + break; + } + bytes += b; + idx = i; } - let s = if idx < ring.len() { ring[idx].seq } else { tail_seq_now.saturating_add(1) }; + let s = if idx < ring.len() { + ring[idx].seq + } else { + tail_seq_now.saturating_add(1) + }; (s, idx) } Cursor::Live => (tail_seq_now.saturating_add(1), ring.len()), @@ -327,16 +350,35 @@ impl ShellSession { let mut total = 0usize; for ch in arcs { let len = ch.bytes.len(); - if total + len > max_total { break; } - out_chunks.push(TerminalChunk { seq: ch.seq, t_ms: ch.t_ms, stream: ch.stream, bytes: ch.bytes.clone().to_vec() }); + if total + len > max_total { + break; + } + out_chunks.push(TerminalChunk { + seq: ch.seq, + t_ms: ch.t_ms, + stream: ch.stream, + bytes: ch.bytes.clone().to_vec(), + }); total += len; } - let next_seq = if let Some(last) = out_chunks.last() { last.seq + 1 } else { tail_seq_now.saturating_add(1) }; - BufferReadResult { chunks: out_chunks, next_seq, dropped } + let next_seq = if let Some(last) = out_chunks.last() { + last.seq + 1 + } else { + tail_seq_now.saturating_add(1) + }; + BufferReadResult { + chunks: out_chunks, + next_seq, + dropped, + } } /// Add a listener with optional replay and live follow. - pub fn add_listener(&self, listener: Arc, opts: ListenerOptions) -> Result { + pub fn add_listener( + &self, + listener: Arc, + opts: ListenerOptions, + ) -> Result { // Snapshot for replay; emit from task to avoid re-entrant callbacks during FFI. let replay = self.read_buffer(opts.cursor.clone(), None); let mut rx = self.sender.subscribe(); @@ -408,18 +450,21 @@ impl ShellSession { } } }); - if let Ok(mut map) = self.listener_tasks.lock() { map.insert(id, handle); } + if let Ok(mut map) = self.listener_tasks.lock() { + map.insert(id, handle); + } Ok(id) } pub fn remove_listener(&self, id: u64) { if let Ok(mut map) = self.listener_tasks.lock() { - if let Some(h) = map.remove(&id) { h.abort(); } + if let Some(h) = map.remove(&id) { + h.abort(); + } } } } - // Internal lifecycle helpers (not exported via UniFFI) impl ShellSession { async fn close_internal(&self) -> Result<(), SshError> { @@ -456,9 +501,8 @@ impl ShellSession { // } } - #[allow(clippy::too_many_arguments)] -fn append_and_broadcast( +pub(crate) fn append_and_broadcast( data: &[u8], stream: StreamKind, ring: &Arc>>>, @@ -477,10 +521,18 @@ fn append_and_broadcast( let slice = &data[offset..end]; let seq = next_seq.fetch_add(1, Ordering::Relaxed); let t_ms = now_ms(); - let chunk = Arc::new(Chunk { seq, t_ms, stream, bytes: Bytes::copy_from_slice(slice) }); + let chunk = Arc::new(Chunk { + seq, + t_ms, + stream, + bytes: Bytes::copy_from_slice(slice), + }); // push to ring { - let mut q = match ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; + let mut q = match ring.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; q.push_back(chunk.clone()); } { @@ -490,13 +542,18 @@ fn append_and_broadcast( // evict if needed let cap = ring_bytes_capacity.load(Ordering::Relaxed); if *used > cap { - let mut q = match ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; + let mut q = match ring.lock() { + Ok(g) => g, + Err(p) => p.into_inner(), + }; while *used > cap { if let Some(front) = q.pop_front() { *used -= front.bytes.len(); dropped_bytes_total.fetch_add(front.bytes.len() as u64, Ordering::Relaxed); head_seq.store(front.seq.saturating_add(1), Ordering::Relaxed); - } else { break; } + } else { + break; + } } } } diff --git a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/utils.rs b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/utils.rs index 4e70b15..362b010 100644 --- a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/utils.rs +++ b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/utils.rs @@ -1,16 +1,13 @@ +use std::time::{SystemTime, UNIX_EPOCH}; +use thiserror::Error; - - -fn now_ms() -> f64 { +pub(crate) fn now_ms() -> f64 { let d = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default(); d.as_millis() as f64 } - - - // TODO: Split this into different errors for each public function #[derive(Debug, Error, uniffi::Error)] pub enum SshError { @@ -28,19 +25,29 @@ pub enum SshError { RusshKeys(String), } impl From for SshError { - fn from(e: russh::Error) -> Self { SshError::Russh(e.to_string()) } + fn from(e: russh::Error) -> Self { + SshError::Russh(e.to_string()) + } } impl From for SshError { - fn from(e: russh_keys::Error) -> Self { SshError::RusshKeys(e.to_string()) } + fn from(e: russh_keys::Error) -> Self { + SshError::RusshKeys(e.to_string()) + } } -impl From for SshError { - fn from(e: ssh_key::Error) -> Self { SshError::RusshKeys(e.to_string()) } +impl From for SshError { + fn from(e: russh_keys::ssh_key::Error) -> Self { + SshError::RusshKeys(e.to_string()) + } } -impl From for SshError { - fn from(e: russh_ssh_key::Error) -> Self { SshError::RusshKeys(e.to_string()) } +impl From for SshError { + fn from(e: russh::keys::ssh_key::Error) -> Self { + SshError::RusshKeys(e.to_string()) + } } impl From for SshError { - fn from(e: std::io::Error) -> Self { SshError::Russh(e.to_string()) } + fn from(e: std::io::Error) -> Self { + SshError::Russh(e.to_string()) + } } impl From for SshError { fn from(a: russh::client::AuthResult) -> Self {