New rust lib passing lint

This commit is contained in:
EthanShoeDev
2025-09-17 22:06:54 -04:00
parent beb3b5fc6c
commit 0fa28b2134
2 changed files with 580 additions and 279 deletions

View File

@@ -7,35 +7,22 @@
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex, Weak};
use std::time::{SystemTime, UNIX_EPOCH};
use std::sync::{atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, Mutex, Weak};
use std::time::{SystemTime, UNIX_EPOCH, Duration};
use rand::rngs::OsRng;
use thiserror::Error;
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::{broadcast, Mutex as AsyncMutex};
use russh::{self, client, ChannelMsg, Disconnect};
use russh::client::{Config as ClientConfig, Handle as ClientHandle};
use russh_keys::{Algorithm as KeyAlgorithm, EcdsaCurve, PrivateKey};
use russh_keys::ssh_key::{self, LineEnding};
use once_cell::sync::Lazy;
use bytes::Bytes;
uniffi::setup_scaffolding!();
// Simpler aliases to satisfy clippy type-complexity.
type ListenerEntry = (u64, Arc<dyn ChannelListener>);
type ListenerList = Vec<ListenerEntry>;
// Type aliases to keep static types simple and satisfy clippy.
type ConnectionId = String;
type ChannelId = u32;
type ShellKey = (ConnectionId, ChannelId);
type ConnMap = HashMap<ConnectionId, Arc<SSHConnection>>;
type ShellMap = HashMap<ShellKey, Arc<ShellSession>>;
// ---------- Global registries (strong references; lifecycle managed explicitly) ----------
static CONNECTIONS: Lazy<Mutex<ConnMap>> = Lazy::new(|| Mutex::new(HashMap::new()));
static SHELLS: Lazy<Mutex<ShellMap>> = Lazy::new(|| Mutex::new(HashMap::new()));
// No global registries; handles are the only access points.
/// ---------- Types ----------
@@ -130,10 +117,30 @@ pub trait StatusListener: Send + Sync {
fn on_change(&self, status: SSHConnectionStatus);
}
/// Channel data callback (stdout/stderr unified)
// Stream kind for terminal output
/// Which output stream of the remote process a chunk came from
/// (`Data` frames map to `Stdout`, `ExtendedData` to `Stderr`).
#[derive(Debug, Clone, Copy, PartialEq, uniffi::Enum)]
pub enum StreamKind { Stdout, Stderr }
/// A single chunk of terminal output, as exposed over FFI.
#[derive(Debug, Clone, PartialEq, uniffi::Record)]
pub struct TerminalChunk {
    /// Monotonically increasing sequence number (starts at 1 per session).
    pub seq: u64,
    /// Capture timestamp, milliseconds since the Unix epoch.
    pub t_ms: f64,
    /// Whether the bytes came from stdout or stderr.
    pub stream: StreamKind,
    /// Raw output bytes; producers may split large writes and listeners may
    /// receive coalesced runs, so chunk boundaries are not message boundaries.
    pub bytes: Vec<u8>,
}
/// Inclusive range of sequence numbers that were lost (ring eviction or
/// broadcast lag) before they could be delivered.
#[derive(Debug, Clone, PartialEq, uniffi::Record)]
pub struct DroppedRange { pub from_seq: u64, pub to_seq: u64 }
/// Event delivered to a `ShellListener`: either a chunk of output, or a
/// notification that an inclusive range of sequence numbers was dropped.
#[derive(Debug, Clone, PartialEq, uniffi::Enum)]
pub enum ShellEvent {
    Chunk(TerminalChunk),
    Dropped { from_seq: u64, to_seq: u64 },
}
#[uniffi::export(with_foreign)]
pub trait ChannelListener: Send + Sync {
fn on_data(&self, data: Vec<u8>);
pub trait ShellListener: Send + Sync {
fn on_event(&self, ev: ShellEvent);
}
/// Key types for generation
@@ -152,6 +159,38 @@ pub struct StartShellOptions {
pub on_status_change: Option<Arc<dyn StatusListener>>,
}
/// Where a read or listener starts within a session's output history.
#[derive(Debug, Clone, PartialEq, uniffi::Enum)]
pub enum Cursor {
    /// Oldest data still held in the ring buffer.
    Head,
    /// Start approximately `bytes` bytes back from the newest data.
    TailBytes { bytes: u64 },
    /// Start at an explicit sequence number (clamped to the ring head).
    Seq { seq: u64 },
    /// Start at the first chunk with timestamp >= `t_ms` (ms since epoch).
    TimeMs { t_ms: f64 },
    /// Skip history entirely; deliver only new data.
    Live,
}
/// Options for `ShellSession::add_listener`.
#[derive(Debug, Clone, PartialEq, uniffi::Record)]
pub struct ListenerOptions {
    /// Replay starting point before the listener goes live.
    pub cursor: Cursor,
    /// Coalescing window in milliseconds; `None` uses the session default.
    pub coalesce_ms: Option<u32>,
}
/// Result of `ShellSession::read_buffer`.
#[derive(Debug, Clone, PartialEq, uniffi::Record)]
pub struct BufferReadResult {
    /// Chunks read, oldest first.
    pub chunks: Vec<TerminalChunk>,
    /// Sequence number to resume from on the next read.
    pub next_seq: u64,
    /// Range already evicted before the requested cursor, if any.
    pub dropped: Option<DroppedRange>,
}
/// Snapshot of a session's ring-buffer accounting.
#[derive(Debug, Clone, PartialEq, uniffi::Record)]
pub struct BufferStats {
    /// Configured ring capacity in bytes.
    pub ring_bytes: u64,
    /// Bytes currently held in the ring.
    pub used_bytes: u64,
    /// Number of chunks currently held in the ring.
    pub chunks: u64,
    /// Oldest sequence number still in the ring.
    pub head_seq: u64,
    /// Newest sequence number appended so far.
    pub tail_seq: u64,
    /// Total bytes evicted from the ring over the session lifetime.
    pub dropped_bytes_total: u64,
}
/// Snapshot of current connection info for property-like access in TS.
#[derive(Debug, Clone, PartialEq, uniffi::Record)]
pub struct SshConnectionInfo {
@@ -186,10 +225,6 @@ pub struct SSHConnection {
// Weak self for child sessions to refer back without cycles.
self_weak: AsyncMutex<Weak<SSHConnection>>,
// Data listeners for whatever shell is active. We track by id for removal.
listeners: Arc<Mutex<ListenerList>>,
next_listener_id: Arc<Mutex<u64>>, // simple counter guarded by same kind of mutex
}
#[derive(uniffi::Object)]
@@ -204,20 +239,43 @@ pub struct ShellSession {
shell_status_listener: Option<Arc<dyn StatusListener>>,
created_at_ms: f64,
pty: PtyType,
// Ring buffer
ring: Arc<Mutex<std::collections::VecDeque<Arc<Chunk>>>>,
ring_bytes_capacity: Arc<AtomicUsize>,
used_bytes: Arc<Mutex<usize>>,
dropped_bytes_total: Arc<AtomicU64>,
head_seq: Arc<AtomicU64>,
tail_seq: Arc<AtomicU64>,
// Live broadcast
sender: broadcast::Sender<Arc<Chunk>>,
// Listener tasks management
listener_tasks: Arc<Mutex<HashMap<u64, tokio::task::JoinHandle<()>>>>,
next_listener_id: AtomicU64,
default_coalesce_ms: AtomicU64,
}
// NOTE(review): this Debug impl reads `self.listeners`, a field this change
// appears to remove from `SSHConnection` elsewhere — confirm the impl was
// updated (or deleted) alongside the struct.
impl fmt::Debug for SSHConnection {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Lenient on a poisoned lock: report 0 listeners rather than panic.
        let listeners_len = self.listeners.lock().map(|v| v.len()).unwrap_or(0);
        f.debug_struct("SSHConnection")
            .field("connection_details", &self.connection_details)
            .field("created_at_ms", &self.created_at_ms)
            .field("tcp_established_at_ms", &self.tcp_established_at_ms)
            .field("listeners_len", &listeners_len)
            .finish()
    }
}
// Internal chunk type kept in ring/broadcast
/// Shared between the ring buffer and the broadcast channel behind `Arc`, so
/// a chunk's payload is stored once regardless of how many listeners exist.
#[derive(Debug)]
struct Chunk {
    seq: u64,
    t_ms: f64,
    stream: StreamKind,
    bytes: Bytes,
}
/// Minimal client::Handler.
struct NoopHandler;
impl client::Handler for NoopHandler {
@@ -247,21 +305,6 @@ impl SSHConnection {
}
}
/// Add a channel listener and get an id you can later remove with.
pub fn add_channel_listener(&self, listener: Arc<dyn ChannelListener>) -> u64 {
let mut guard = self.listeners.lock().unwrap();
let mut id_guard = self.next_listener_id.lock().unwrap();
let id = *id_guard + 1;
*id_guard = id;
guard.push((id, listener));
id
}
pub fn remove_channel_listener(&self, id: u64) {
if let Ok(mut v) = self.listeners.lock() {
v.retain(|(lid, _)| *lid != id);
}
}
/// Start a shell with the given PTY. Emits only Shell* statuses via options.on_status_change.
pub async fn start_shell(&self, opts: StartShellOptions) -> Result<Arc<ShellSession>, SshError> {
// Prevent double-start (safe default).
@@ -286,24 +329,60 @@ impl SSHConnection {
// Split for read/write; spawn reader.
let (mut reader, writer) = ch.split();
let listeners = self.listeners.clone();
// Setup ring + broadcast for this session
let (tx, _rx) = broadcast::channel::<Arc<Chunk>>(1024);
let ring = Arc::new(Mutex::new(std::collections::VecDeque::<Arc<Chunk>>::new()));
let used_bytes = Arc::new(Mutex::new(0usize));
let next_seq = Arc::new(AtomicU64::new(1));
let head_seq = Arc::new(AtomicU64::new(1));
let tail_seq = Arc::new(AtomicU64::new(0));
let dropped_bytes_total = Arc::new(AtomicU64::new(0));
let ring_bytes_capacity = Arc::new(AtomicUsize::new(2 * 1024 * 1024)); // default 2MiB
let default_coalesce_ms = AtomicU64::new(16); // default 16ms
let ring_clone = ring.clone();
let used_bytes_clone = used_bytes.clone();
let tx_clone = tx.clone();
let ring_bytes_capacity_c = ring_bytes_capacity.clone();
let dropped_bytes_total_c = dropped_bytes_total.clone();
let head_seq_c = head_seq.clone();
let tail_seq_c = tail_seq.clone();
let next_seq_c = next_seq.clone();
let shell_listener_for_task = shell_status_listener.clone();
let reader_task = tokio::spawn(async move {
let max_chunk = 16 * 1024; // 16KB
loop {
match reader.wait().await {
Some(ChannelMsg::Data { data }) => {
if let Ok(cl) = listeners.lock() {
let snapshot = cl.clone();
let buf = data.to_vec();
for (_, l) in snapshot { l.on_data(buf.clone()); }
}
append_and_broadcast(
&data,
StreamKind::Stdout,
&ring_clone,
&used_bytes_clone,
&ring_bytes_capacity_c,
&dropped_bytes_total_c,
&head_seq_c,
&tail_seq_c,
&next_seq_c,
&tx_clone,
max_chunk,
);
}
Some(ChannelMsg::ExtendedData { data, .. }) => {
if let Ok(cl) = listeners.lock() {
let snapshot = cl.clone();
let buf = data.to_vec();
for (_, l) in snapshot { l.on_data(buf.clone()); }
}
append_and_broadcast(
&data,
StreamKind::Stderr,
&ring_clone,
&used_bytes_clone,
&ring_bytes_capacity_c,
&dropped_bytes_total_c,
&head_seq_c,
&tail_seq_c,
&next_seq_c,
&tx_clone,
max_chunk,
);
}
Some(ChannelMsg::Close) | None => {
if let Some(sl) = shell_listener_for_task.as_ref() {
@@ -311,7 +390,7 @@ impl SSHConnection {
}
break;
}
_ => { /* ignore others */ }
_ => {}
}
}
});
@@ -324,16 +403,20 @@ impl SSHConnection {
shell_status_listener,
created_at_ms: now_ms(),
pty,
ring,
ring_bytes_capacity,
used_bytes,
dropped_bytes_total,
head_seq,
tail_seq,
sender: tx,
listener_tasks: Arc::new(Mutex::new(HashMap::new())),
next_listener_id: AtomicU64::new(1),
default_coalesce_ms,
});
*self.shell.lock().await = Some(session.clone());
// Register shell in global registry
if let Some(parent) = self.self_weak.lock().await.upgrade() {
let key = (parent.connection_id.clone(), channel_id);
if let Ok(mut map) = SHELLS.lock() { map.insert(key, session.clone()); }
}
// Report ShellConnected.
if let Some(sl) = session.shell_status_listener.as_ref() {
sl.on_change(SSHConnectionStatus::ShellConnected);
@@ -342,12 +425,7 @@ impl SSHConnection {
Ok(session)
}
/// Send bytes to the active shell (stdin).
pub async fn send_data(&self, data: Vec<u8>) -> Result<(), SshError> {
let guard = self.shell.lock().await;
let session = guard.as_ref().ok_or(SshError::Disconnected)?;
session.send_data(data).await
}
// Note: send_data now lives on ShellSession
// No exported close_shell: shell closure is handled via ShellSession::close()
@@ -360,8 +438,6 @@ impl SSHConnection {
let h = self.handle.lock().await;
h.disconnect(Disconnect::ByApplication, "bye", "").await?;
// Remove from registry after disconnect
if let Ok(mut map) = CONNECTIONS.lock() { map.remove(&self.connection_id); }
Ok(())
}
}
@@ -386,6 +462,160 @@ impl ShellSession {
/// Close the associated shell channel and stop its reader task.
/// Thin exported wrapper; the actual teardown lives in `close_internal`.
pub async fn close(&self) -> Result<(), SshError> { self.close_internal().await }
/// Configure ring buffer policy.
///
/// `ring_bytes` sets the ring capacity in bytes (trimming immediately if the
/// new capacity is smaller than what is currently buffered); `coalesce_ms`
/// sets the default listener coalescing window. A `None` leaves the
/// corresponding setting untouched.
pub async fn set_buffer_policy(&self, ring_bytes: Option<u64>, coalesce_ms: Option<u32>) {
    if let Some(capacity) = ring_bytes {
        self.ring_bytes_capacity.store(capacity as usize, Ordering::Relaxed);
        // Shrinking may leave the ring over budget; trim eagerly.
        self.evict_if_needed();
    }
    if let Some(window_ms) = coalesce_ms {
        self.default_coalesce_ms.store(window_ms as u64, Ordering::Relaxed);
    }
}
/// Buffer statistics snapshot.
///
/// Returns the configured capacity, current usage, chunk count, head/tail
/// sequence numbers and lifetime eviction total. Both mutex reads are
/// lenient on a poisoned lock (reporting 0) instead of panicking across the
/// FFI boundary — previously `used_bytes` would panic while the `ring` read
/// in the same function did not.
pub fn buffer_stats(&self) -> BufferStats {
    // Lenient on poison: an exported getter should not abort the host app.
    let used = self.used_bytes.lock().map(|u| *u as u64).unwrap_or(0);
    let chunks = self.ring.lock().map(|q| q.len() as u64).unwrap_or(0);
    BufferStats {
        ring_bytes: self.ring_bytes_capacity.load(Ordering::Relaxed) as u64,
        used_bytes: used,
        chunks,
        head_seq: self.head_seq.load(Ordering::Relaxed),
        tail_seq: self.tail_seq.load(Ordering::Relaxed),
        dropped_bytes_total: self.dropped_bytes_total.load(Ordering::Relaxed),
    }
}
/// Sequence number that the next appended chunk will receive
/// (one past the current tail, saturating at `u64::MAX`).
pub fn current_seq(&self) -> u64 {
    let tail = self.tail_seq.load(Ordering::Relaxed);
    tail.saturating_add(1)
}
/// Read the ring buffer from a cursor.
///
/// Returns up to `max_bytes` (default 512 KiB) of buffered chunks starting
/// at `cursor`, the sequence number to resume from, and — for `Cursor::Seq`
/// requests older than the ring head — the range that was already evicted.
pub fn read_buffer(&self, cursor: Cursor, max_bytes: Option<u64>) -> BufferReadResult {
    let max_total = max_bytes.unwrap_or(512 * 1024) as usize; // default 512KB
    let mut out_chunks: Vec<TerminalChunk> = Vec::new();
    let mut dropped: Option<DroppedRange> = None;
    let head_seq_now = self.head_seq.load(Ordering::Relaxed);
    let tail_seq_now = self.tail_seq.load(Ordering::Relaxed);
    // Lock ring to determine start and collect arcs, then drop lock.
    let (_start_idx_unused, _start_seq, arcs): (usize, u64, Vec<Arc<Chunk>>) = {
        let ring = self.ring.lock().unwrap();
        let (start_seq, idx) = match cursor {
            Cursor::Head => (head_seq_now, 0usize),
            Cursor::Seq { seq: mut s } => {
                // Requests older than the ring head report the evicted range
                // and clamp to the oldest data still buffered.
                if s < head_seq_now { dropped = Some(DroppedRange { from_seq: s, to_seq: head_seq_now - 1 }); s = head_seq_now; }
                let idx = s.saturating_sub(head_seq_now) as usize;
                (s, idx.min(ring.len()))
            }
            Cursor::TimeMs { t_ms: t } => {
                // linear scan to find first chunk with t_ms >= t
                // NOTE(review): if no chunk matches, idx stays 0 and the whole
                // ring is replayed — confirm that fallback is intentional.
                let mut idx = 0usize; let mut s = head_seq_now;
                for (i, ch) in ring.iter().enumerate() { if ch.t_ms >= t { idx = i; s = ch.seq; break; } }
                (s, idx)
            }
            Cursor::TailBytes { bytes: n } => {
                // Walk from tail backwards until approx n bytes, then forward.
                let mut bytes = 0usize; let mut idx = ring.len();
                for i in (0..ring.len()).rev() {
                    let b = ring[i].bytes.len();
                    if bytes >= n as usize { idx = i + 1; break; }
                    bytes += b; idx = i;
                }
                // Empty walk (or n larger than the ring) starts past the tail.
                let s = if idx < ring.len() { ring[idx].seq } else { tail_seq_now.saturating_add(1) };
                (s, idx)
            }
            Cursor::Live => (tail_seq_now.saturating_add(1), ring.len()),
        };
        let arcs: Vec<Arc<Chunk>> = ring.iter().skip(idx).cloned().collect();
        (idx, start_seq, arcs)
    };
    // Build output respecting max_bytes
    let mut total = 0usize;
    for ch in arcs {
        let len = ch.bytes.len();
        // Stop before exceeding the budget; chunks are never split here.
        if total + len > max_total { break; }
        out_chunks.push(TerminalChunk { seq: ch.seq, t_ms: ch.t_ms, stream: ch.stream, bytes: ch.bytes.clone().to_vec() });
        total += len;
    }
    // Resume point: one past the last returned chunk, or one past the tail
    // when nothing was returned.
    let next_seq = if let Some(last) = out_chunks.last() { last.seq + 1 } else { tail_seq_now.saturating_add(1) };
    BufferReadResult { chunks: out_chunks, next_seq, dropped }
}
/// Add a listener with optional replay and live follow.
///
/// First synchronously replays buffered history from `opts.cursor` (reporting
/// any already-evicted range), then spawns a tokio task that follows the live
/// broadcast, coalescing consecutive same-stream chunks within a time window.
/// Returns an id usable with `remove_listener`.
pub fn add_listener(&self, listener: Arc<dyn ShellListener>, opts: ListenerOptions) -> u64 {
    // Synchronous replay phase
    let replay = self.read_buffer(opts.cursor.clone(), None);
    if let Some(dr) = replay.dropped.as_ref() { listener.on_event(ShellEvent::Dropped { from_seq: dr.from_seq, to_seq: dr.to_seq }); }
    for ch in replay.chunks { listener.on_event(ShellEvent::Chunk(ch)); }
    // Live phase
    let mut rx = self.sender.subscribe();
    let id = self.next_listener_id.fetch_add(1, Ordering::Relaxed);
    let default_coalesce_ms = self.default_coalesce_ms.load(Ordering::Relaxed) as u32;
    let coalesce_ms = opts.coalesce_ms.unwrap_or(default_coalesce_ms);
    let handle = tokio::spawn(async move {
        // Highest sequence number delivered so far (replay included).
        let mut last_seq_seen: u64 = replay.next_seq.saturating_sub(1);
        // Accumulator state for the chunk currently being coalesced.
        let mut acc: Vec<u8> = Vec::new();
        let mut acc_stream: Option<StreamKind>;
        let mut acc_last_seq: u64;
        let mut acc_last_t: f64;
        let window = Duration::from_millis(coalesce_ms as u64);
        // Set when the broadcast channel lagged; reported before next data.
        let mut pending_drop_from: Option<u64> = None;
        loop {
            // First receive an item
            let first = match rx.recv().await {
                Ok(c) => c,
                Err(broadcast::error::RecvError::Lagged(_n)) => { pending_drop_from = Some(last_seq_seen.saturating_add(1)); continue; }
                Err(broadcast::error::RecvError::Closed) => break,
            };
            // Report any lag-induced gap before delivering new data.
            if let Some(from) = pending_drop_from.take() {
                if from <= first.seq.saturating_sub(1) {
                    listener.on_event(ShellEvent::Dropped { from_seq: from, to_seq: first.seq - 1 });
                }
            }
            // Start accumulating
            acc.clear(); acc_stream = Some(first.stream); acc_last_seq = first.seq; acc_last_t = first.t_ms; acc.extend_from_slice(&first.bytes);
            last_seq_seen = first.seq;
            // Drain within window while same stream
            let mut deadline = tokio::time::Instant::now() + window;
            loop {
                let timeout = tokio::time::sleep_until(deadline);
                tokio::pin!(timeout);
                tokio::select! {
                    _ = &mut timeout => break,
                    msg = rx.recv() => {
                        match msg {
                            Ok(c) => {
                                // Same stream: keep accumulating under the current deadline.
                                if Some(c.stream) == acc_stream { acc.extend_from_slice(&c.bytes); acc_last_seq = c.seq; acc_last_t = c.t_ms; last_seq_seen = c.seq; }
                                else { // flush and start new
                                    let chunk = TerminalChunk { seq: acc_last_seq, t_ms: acc_last_t, stream: acc_stream.unwrap_or(StreamKind::Stdout), bytes: std::mem::take(&mut acc) };
                                    listener.on_event(ShellEvent::Chunk(chunk));
                                    // Stream switched: restart the coalescing window.
                                    acc_stream = Some(c.stream); acc_last_seq = c.seq; acc_last_t = c.t_ms; acc.extend_from_slice(&c.bytes); last_seq_seen = c.seq;
                                    deadline = tokio::time::Instant::now() + window;
                                }
                            }
                            Err(broadcast::error::RecvError::Lagged(_n)) => { pending_drop_from = Some(last_seq_seen.saturating_add(1)); break; }
                            Err(broadcast::error::RecvError::Closed) => { break; }
                        }
                    }
                }
            }
            // Flush whatever accumulated during this window.
            if let Some(s) = acc_stream.take() {
                let chunk = TerminalChunk { seq: acc_last_seq, t_ms: acc_last_t, stream: s, bytes: std::mem::take(&mut acc) };
                listener.on_event(ShellEvent::Chunk(chunk));
            }
        }
    });
    // Track the task so remove_listener can abort it later.
    if let Ok(mut map) = self.listener_tasks.lock() { map.insert(id, handle); }
    id
}
/// Stop and forget the live-follow task registered under `id`.
///
/// Unknown ids are ignored; a poisoned task map is treated as
/// "nothing to remove".
pub fn remove_listener(&self, id: u64) {
    let Ok(mut tasks) = self.listener_tasks.lock() else { return };
    if let Some(task) = tasks.remove(&id) {
        task.abort();
    }
}
}
// Internal lifecycle helpers (not exported via UniFFI)
@@ -403,13 +633,22 @@ impl ShellSession {
if let Some(current) = guard.as_ref() {
if current.channel_id == self.channel_id { *guard = None; }
}
// Remove from registry
if let Ok(mut map) = SHELLS.lock() {
map.remove(&(parent.connection_id.clone(), self.channel_id));
}
}
Ok(())
}
/// Pop chunks from the front of the ring until `used_bytes` fits within the
/// configured capacity, charging evicted bytes to `dropped_bytes_total` and
/// advancing `head_seq` past each evicted chunk.
fn evict_if_needed(&self) {
    let cap = self.ring_bytes_capacity.load(Ordering::Relaxed);
    // Lock order: ring first, then used_bytes.
    let mut ring = self.ring.lock().unwrap();
    let mut used = self.used_bytes.lock().unwrap();
    while *used > cap {
        let Some(front) = ring.pop_front() else { break };
        let evicted = front.bytes.len();
        *used -= evicted;
        self.dropped_bytes_total.fetch_add(evicted as u64, Ordering::Relaxed);
        self.head_seq.store(front.seq.saturating_add(1), Ordering::Relaxed);
    }
}
}
/// ---------- Top-level API ----------
@@ -459,57 +698,12 @@ pub async fn connect(options: ConnectOptions) -> Result<Arc<SSHConnection>, SshE
handle: AsyncMutex::new(handle),
shell: AsyncMutex::new(None),
self_weak: AsyncMutex::new(Weak::new()),
listeners: Arc::new(Mutex::new(Vec::new())),
next_listener_id: Arc::new(Mutex::new(0)),
});
// Initialize weak self reference.
*conn.self_weak.lock().await = Arc::downgrade(&conn);
// Register connection in global registry (strong ref; explicit lifecycle)
if let Ok(mut map) = CONNECTIONS.lock() { map.insert(conn.connection_id.clone(), conn.clone()); }
Ok(conn)
}
/// ---------- Registry/listing API ----------
#[uniffi::export]
pub fn list_ssh_connections() -> Vec<SshConnectionInfo> {
// Collect clones outside the lock to avoid holding a MutexGuard across await
let conns: Vec<Arc<SSHConnection>> = CONNECTIONS
.lock()
.map(|map| map.values().cloned().collect())
.unwrap_or_default();
let mut out = Vec::with_capacity(conns.len());
for conn in conns { out.push(conn.info()); }
out
}
#[uniffi::export]
pub fn list_ssh_shells() -> Vec<ShellSessionInfo> {
// Collect shells outside the lock to avoid holding a MutexGuard across await
let shells: Vec<Arc<ShellSession>> = SHELLS
.lock()
.map(|map| map.values().cloned().collect())
.unwrap_or_default();
let mut out = Vec::with_capacity(shells.len());
for shell in shells { out.push(shell.info()); }
out
}
#[uniffi::export]
pub fn get_ssh_connection(id: String) -> Result<Arc<SSHConnection>, SshError> {
if let Ok(map) = CONNECTIONS.lock() { if let Some(conn) = map.get(&id) { return Ok(conn.clone()); } }
Err(SshError::Disconnected)
}
// list_ssh_shells_for_connection removed; derive in JS from list_ssh_connections + get_ssh_shell
#[uniffi::export]
pub fn get_ssh_shell(connection_id: String, channel_id: u32) -> Result<Arc<ShellSession>, SshError> {
let key = (connection_id, channel_id);
if let Ok(map) = SHELLS.lock() { if let Some(shell) = map.get(&key) { return Ok(shell.clone()); } }
Err(SshError::Disconnected)
}
#[uniffi::export(async_runtime = "tokio")]
pub async fn generate_key_pair(key_type: KeyType) -> Result<String, SshError> {
let mut rng = OsRng;
@@ -532,3 +726,53 @@ fn now_ms() -> f64 {
.unwrap_or_default();
d.as_millis() as f64
}
/// Split `data` into chunks of at most `max_chunk` bytes, append each to the
/// session ring (evicting oldest chunks once `ring_bytes_capacity` is
/// exceeded) and broadcast it to live listeners.
///
/// Sequence numbers come from `next_seq`; `tail_seq` tracks the newest
/// appended chunk and `head_seq` the oldest chunk still buffered. Evicted
/// bytes are charged to `dropped_bytes_total`.
///
/// Locking: the push, accounting and eviction happen in one critical section
/// acquiring `ring` first and `used_bytes` second — the same order as
/// `ShellSession::evict_if_needed`. The previous version released `ring`,
/// took `used_bytes`, then re-locked `ring` for eviction: a lock-order
/// inversion that could deadlock against a concurrent `set_buffer_policy`,
/// and a window where a chunk was visible in the ring before
/// `used_bytes`/`tail_seq` were updated. Both are fixed here.
#[allow(clippy::too_many_arguments)]
fn append_and_broadcast(
    data: &[u8],
    stream: StreamKind,
    ring: &Arc<Mutex<std::collections::VecDeque<Arc<Chunk>>>>,
    used_bytes: &Arc<Mutex<usize>>,
    ring_bytes_capacity: &Arc<AtomicUsize>,
    dropped_bytes_total: &Arc<AtomicU64>,
    head_seq: &Arc<AtomicU64>,
    tail_seq: &Arc<AtomicU64>,
    next_seq: &Arc<AtomicU64>,
    sender: &broadcast::Sender<Arc<Chunk>>,
    max_chunk: usize,
) {
    // Guard against a zero chunk size, which would loop forever below.
    let max_chunk = max_chunk.max(1);
    let mut offset = 0usize;
    while offset < data.len() {
        let end = (offset + max_chunk).min(data.len());
        let slice = &data[offset..end];
        let seq = next_seq.fetch_add(1, Ordering::Relaxed);
        let t_ms = now_ms();
        let chunk = Arc::new(Chunk { seq, t_ms, stream, bytes: Bytes::copy_from_slice(slice) });
        {
            // Single critical section, consistent lock order: ring, then used.
            let mut q = ring.lock().unwrap();
            let mut used = used_bytes.lock().unwrap();
            q.push_back(chunk.clone());
            *used += slice.len();
            tail_seq.store(seq, Ordering::Relaxed);
            // Evict oldest chunks until we are back under capacity.
            let cap = ring_bytes_capacity.load(Ordering::Relaxed);
            while *used > cap {
                if let Some(front) = q.pop_front() {
                    *used -= front.bytes.len();
                    dropped_bytes_total.fetch_add(front.bytes.len() as u64, Ordering::Relaxed);
                    head_seq.store(front.seq.saturating_add(1), Ordering::Relaxed);
                } else {
                    break;
                }
            }
        }
        // Broadcast outside the locks; a send error only means no live
        // subscribers, which is fine — the ring still has the data.
        let _ = sender.send(chunk);
        offset = end;
    }
}