Working xtermjs!

This commit is contained in:
EthanShoeDev
2025-09-18 00:17:35 -04:00
parent 7c448e2ec3
commit 808f476bac
4 changed files with 55 additions and 22 deletions

View File

@@ -255,6 +255,7 @@ pub struct ShellSession {
listener_tasks: Arc<Mutex<HashMap<u64, tokio::task::JoinHandle<()>>>>,
next_listener_id: AtomicU64,
default_coalesce_ms: AtomicU64,
rt_handle: tokio::runtime::Handle,
}
impl fmt::Debug for SSHConnection {
@@ -324,7 +325,20 @@ impl SSHConnection {
let channel_id: u32 = ch.id().into();
// Request PTY & shell.
ch.request_pty(true, pty.as_ssh_name(), 80, 24, 0, 0, &[]).await?;
// Request a PTY with basic sane defaults: enable ECHO and set speeds.
// RFC4254 terminal mode opcodes: 53=ECHO, 128=TTY_OP_ISPEED, 129=TTY_OP_OSPEED
let modes: &[(russh::Pty, u32)] = &[
(russh::Pty::ECHO, 1),
(russh::Pty::ECHOK, 1),
(russh::Pty::ECHOE, 1),
(russh::Pty::ICANON, 1),
(russh::Pty::ISIG, 1),
(russh::Pty::ICRNL, 1),
(russh::Pty::ONLCR, 1),
(russh::Pty::TTY_OP_ISPEED, 38400),
(russh::Pty::TTY_OP_OSPEED, 38400),
];
ch.request_pty(true, pty.as_ssh_name(), 80, 24, 0, 0, modes).await?;
ch.request_shell(true).await?;
// Split for read/write; spawn reader.
@@ -413,6 +427,7 @@ impl SSHConnection {
listener_tasks: Arc::new(Mutex::new(HashMap::new())),
next_listener_id: AtomicU64::new(1),
default_coalesce_ms,
rt_handle: tokio::runtime::Handle::current(),
});
*self.shell.lock().await = Some(session.clone());
@@ -471,8 +486,8 @@ impl ShellSession {
/// Buffer statistics snapshot.
pub fn buffer_stats(&self) -> BufferStats {
let used = *self.used_bytes.lock().unwrap() as u64;
let chunks = self.ring.lock().map(|q| q.len() as u64).unwrap_or(0);
let used = *self.used_bytes.lock().unwrap_or_else(|p| p.into_inner()) as u64;
let chunks = match self.ring.lock() { Ok(q) => q.len() as u64, Err(p) => p.into_inner().len() as u64 };
BufferStats {
ring_bytes: self.ring_bytes_capacity.load(Ordering::Relaxed) as u64,
used_bytes: used,
@@ -496,7 +511,7 @@ impl ShellSession {
// Lock ring to determine start and collect arcs, then drop lock.
let (_start_idx_unused, _start_seq, arcs): (usize, u64, Vec<Arc<Chunk>>) = {
let ring = self.ring.lock().unwrap();
let ring = match self.ring.lock() { Ok(g) => g, Err(p) => p.into_inner() };
let (start_seq, idx) = match cursor {
Cursor::Head => (head_seq_now, 0usize),
Cursor::Seq { seq: mut s } => {
@@ -540,19 +555,25 @@ impl ShellSession {
}
/// Add a listener with optional replay and live follow.
pub fn add_listener(&self, listener: Arc<dyn ShellListener>, opts: ListenerOptions) -> u64 {
// Synchronous replay phase
pub fn add_listener(&self, listener: Arc<dyn ShellListener>, opts: ListenerOptions) -> Result<u64, SshError> {
// Snapshot for replay; emit from task to avoid re-entrant callbacks during FFI.
let replay = self.read_buffer(opts.cursor.clone(), None);
if let Some(dr) = replay.dropped.as_ref() { listener.on_event(ShellEvent::Dropped { from_seq: dr.from_seq, to_seq: dr.to_seq }); }
for ch in replay.chunks { listener.on_event(ShellEvent::Chunk(ch)); }
// Live phase
let mut rx = self.sender.subscribe();
let id = self.next_listener_id.fetch_add(1, Ordering::Relaxed);
eprintln!("ShellSession.add_listener -> id={id}");
let default_coalesce_ms = self.default_coalesce_ms.load(Ordering::Relaxed) as u32;
let coalesce_ms = opts.coalesce_ms.unwrap_or(default_coalesce_ms);
let handle = tokio::spawn(async move {
let rt = self.rt_handle.clone();
let handle = rt.spawn(async move {
// Emit replay first
if let Some(dr) = replay.dropped.as_ref() {
listener.on_event(ShellEvent::Dropped { from_seq: dr.from_seq, to_seq: dr.to_seq });
}
for ch in replay.chunks.into_iter() {
listener.on_event(ShellEvent::Chunk(ch));
}
let mut last_seq_seen: u64 = replay.next_seq.saturating_sub(1);
let mut acc: Vec<u8> = Vec::new();
let mut acc_stream: Option<StreamKind>;
@@ -608,7 +629,7 @@ impl ShellSession {
}
});
if let Ok(mut map) = self.listener_tasks.lock() { map.insert(id, handle); }
id
Ok(id)
}
pub fn remove_listener(&self, id: u64) {
@@ -639,8 +660,8 @@ impl ShellSession {
fn evict_if_needed(&self) {
let cap = self.ring_bytes_capacity.load(Ordering::Relaxed);
let mut ring = self.ring.lock().unwrap();
let mut used = self.used_bytes.lock().unwrap();
let mut ring = match self.ring.lock() { Ok(g) => g, Err(p) => p.into_inner() };
let mut used = self.used_bytes.lock().unwrap_or_else(|p| p.into_inner());
while *used > cap {
if let Some(front) = ring.pop_front() {
*used -= front.bytes.len();
@@ -750,17 +771,17 @@ fn append_and_broadcast(
let chunk = Arc::new(Chunk { seq, t_ms, stream, bytes: Bytes::copy_from_slice(slice) });
// push to ring
{
let mut q = ring.lock().unwrap();
let mut q = match ring.lock() { Ok(g) => g, Err(p) => p.into_inner() };
q.push_back(chunk.clone());
}
{
let mut used = used_bytes.lock().unwrap();
let mut used = used_bytes.lock().unwrap_or_else(|p| p.into_inner());
*used += slice.len();
tail_seq.store(seq, Ordering::Relaxed);
// evict if needed
let cap = ring_bytes_capacity.load(Ordering::Relaxed);
if *used > cap {
let mut q = ring.lock().unwrap();
let mut q = match ring.lock() { Ok(g) => g, Err(p) => p.into_inner() };
while *used > cap {
if let Some(front) = q.pop_front() {
*used -= front.bytes.len();

View File

@@ -245,8 +245,15 @@ function wrapShellSession(shell: GeneratedRussh.ShellSessionInterface): SshShell
}
} satisfies GeneratedRussh.ShellListener;
const id = shell.addListener(listener, { cursor: cursorToGenerated(opts.cursor), coalesceMs: opts.coalesceMs });
return BigInt(id);
try {
const id = shell.addListener(listener, { cursor: cursorToGenerated(opts.cursor), coalesceMs: opts.coalesceMs });
if (id === 0n) {
throw new Error('Failed to attach shell listener (id=0)');
}
return BigInt(id);
} catch (e) {
throw new Error(`addListener failed: ${String((e as any)?.message ?? e)}`);
}
};
return {