diff --git a/apps/mobile/src/app/(tabs)/shell/detail.tsx b/apps/mobile/src/app/(tabs)/shell/detail.tsx index 8915c54..a28a0e8 100644 --- a/apps/mobile/src/app/(tabs)/shell/detail.tsx +++ b/apps/mobile/src/app/(tabs)/shell/detail.tsx @@ -148,6 +148,7 @@ function ShellDetail() { onMessage={(m) => { console.log('received msg', m); if (m.type === 'initialized') { + if (terminalReadyRef.current) return; terminalReadyRef.current = true; // Replay from head, then attach live listener @@ -173,8 +174,9 @@ function ShellDetail() { const chunk = ev as TerminalChunk; xtermRef.current?.write(chunk.bytes); }, - { cursor: { mode: 'live' } }, + { cursor: { mode: 'seq', seq: res.nextSeq } }, ); + console.log('shell listener attached', id.toString()); listenerIdRef.current = id; })(); } @@ -185,7 +187,10 @@ function ShellDetail() { } if (m.type === 'data') { console.log('xterm->SSH', { len: m.data.length }); - void shell?.sendData(m.data.buffer as ArrayBuffer); + // Ensure we send the exact slice; send CR only for Enter. + const { buffer, byteOffset, byteLength } = m.data; + const ab = buffer.slice(byteOffset, byteOffset + byteLength); + void shell?.sendData(ab as ArrayBuffer); return; } if (m.type === 'debug') { diff --git a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs index a72febf..710f06c 100644 --- a/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs +++ b/packages/react-native-uniffi-russh/rust/uniffi-russh/src/lib.rs @@ -255,6 +255,7 @@ pub struct ShellSession { listener_tasks: Arc>>>, next_listener_id: AtomicU64, default_coalesce_ms: AtomicU64, + rt_handle: tokio::runtime::Handle, } impl fmt::Debug for SSHConnection { @@ -324,7 +325,20 @@ impl SSHConnection { let channel_id: u32 = ch.id().into(); // Request PTY & shell. - ch.request_pty(true, pty.as_ssh_name(), 80, 24, 0, 0, &[]).await?; + // Request a PTY with basic sane defaults: enable ECHO and set speeds. + // RFC4254 terminal mode opcodes: 53=ECHO, 128=TTY_OP_ISPEED, 129=TTY_OP_OSPEED + let modes: &[(russh::Pty, u32)] = &[ + (russh::Pty::ECHO, 1), + (russh::Pty::ECHOK, 1), + (russh::Pty::ECHOE, 1), + (russh::Pty::ICANON, 1), + (russh::Pty::ISIG, 1), + (russh::Pty::ICRNL, 1), + (russh::Pty::ONLCR, 1), + (russh::Pty::TTY_OP_ISPEED, 38400), + (russh::Pty::TTY_OP_OSPEED, 38400), + ]; + ch.request_pty(true, pty.as_ssh_name(), 80, 24, 0, 0, modes).await?; ch.request_shell(true).await?; // Split for read/write; spawn reader. @@ -413,6 +427,7 @@ impl SSHConnection { listener_tasks: Arc::new(Mutex::new(HashMap::new())), next_listener_id: AtomicU64::new(1), default_coalesce_ms, + rt_handle: tokio::runtime::Handle::current(), }); *self.shell.lock().await = Some(session.clone()); @@ -471,8 +486,8 @@ impl ShellSession { /// Buffer statistics snapshot. pub fn buffer_stats(&self) -> BufferStats { - let used = *self.used_bytes.lock().unwrap() as u64; - let chunks = self.ring.lock().map(|q| q.len() as u64).unwrap_or(0); + let used = *self.used_bytes.lock().unwrap_or_else(|p| p.into_inner()) as u64; + let chunks = match self.ring.lock() { Ok(q) => q.len() as u64, Err(p) => p.into_inner().len() as u64 }; BufferStats { ring_bytes: self.ring_bytes_capacity.load(Ordering::Relaxed) as u64, used_bytes: used, @@ -496,7 +511,7 @@ impl ShellSession { // Lock ring to determine start and collect arcs, then drop lock. let (_start_idx_unused, _start_seq, arcs): (usize, u64, Vec>) = { - let ring = self.ring.lock().unwrap(); + let ring = match self.ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; let (start_seq, idx) = match cursor { Cursor::Head => (head_seq_now, 0usize), Cursor::Seq { seq: mut s } => { @@ -540,19 +555,25 @@ impl ShellSession { } /// Add a listener with optional replay and live follow. - pub fn add_listener(&self, listener: Arc, opts: ListenerOptions) -> u64 { - // Synchronous replay phase + pub fn add_listener(&self, listener: Arc, opts: ListenerOptions) -> Result { + // Snapshot for replay; emit from task to avoid re-entrant callbacks during FFI. let replay = self.read_buffer(opts.cursor.clone(), None); - if let Some(dr) = replay.dropped.as_ref() { listener.on_event(ShellEvent::Dropped { from_seq: dr.from_seq, to_seq: dr.to_seq }); } - for ch in replay.chunks { listener.on_event(ShellEvent::Chunk(ch)); } - - // Live phase let mut rx = self.sender.subscribe(); let id = self.next_listener_id.fetch_add(1, Ordering::Relaxed); + eprintln!("ShellSession.add_listener -> id={id}"); let default_coalesce_ms = self.default_coalesce_ms.load(Ordering::Relaxed) as u32; let coalesce_ms = opts.coalesce_ms.unwrap_or(default_coalesce_ms); - let handle = tokio::spawn(async move { + let rt = self.rt_handle.clone(); + let handle = rt.spawn(async move { + // Emit replay first + if let Some(dr) = replay.dropped.as_ref() { + listener.on_event(ShellEvent::Dropped { from_seq: dr.from_seq, to_seq: dr.to_seq }); + } + for ch in replay.chunks.into_iter() { + listener.on_event(ShellEvent::Chunk(ch)); + } + let mut last_seq_seen: u64 = replay.next_seq.saturating_sub(1); let mut acc: Vec = Vec::new(); let mut acc_stream: Option; @@ -608,7 +629,7 @@ impl ShellSession { } }); if let Ok(mut map) = self.listener_tasks.lock() { map.insert(id, handle); } - id + Ok(id) } pub fn remove_listener(&self, id: u64) { @@ -639,8 +660,8 @@ impl ShellSession { fn evict_if_needed(&self) { let cap = self.ring_bytes_capacity.load(Ordering::Relaxed); - let mut ring = self.ring.lock().unwrap(); - let mut used = self.used_bytes.lock().unwrap(); + let mut ring = match self.ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; + let mut used = self.used_bytes.lock().unwrap_or_else(|p| p.into_inner()); while *used > cap { if let Some(front) = ring.pop_front() { *used -= front.bytes.len(); @@ -750,17 +771,17 @@ fn append_and_broadcast( let chunk = Arc::new(Chunk { seq, t_ms, stream, bytes: Bytes::copy_from_slice(slice) }); // push to ring { - let mut q = ring.lock().unwrap(); + let mut q = match ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; q.push_back(chunk.clone()); } { - let mut used = used_bytes.lock().unwrap(); + let mut used = used_bytes.lock().unwrap_or_else(|p| p.into_inner()); *used += slice.len(); tail_seq.store(seq, Ordering::Relaxed); // evict if needed let cap = ring_bytes_capacity.load(Ordering::Relaxed); if *used > cap { - let mut q = ring.lock().unwrap(); + let mut q = match ring.lock() { Ok(g) => g, Err(p) => p.into_inner() }; while *used > cap { if let Some(front) = q.pop_front() { *used -= front.bytes.len(); diff --git a/packages/react-native-uniffi-russh/src/api.ts b/packages/react-native-uniffi-russh/src/api.ts index 34befea..97f5385 100644 --- a/packages/react-native-uniffi-russh/src/api.ts +++ b/packages/react-native-uniffi-russh/src/api.ts @@ -245,8 +245,15 @@ function wrapShellSession(shell: GeneratedRussh.ShellSessionInterface): SshShell } } satisfies GeneratedRussh.ShellListener; - const id = shell.addListener(listener, { cursor: cursorToGenerated(opts.cursor), coalesceMs: opts.coalesceMs }); - return BigInt(id); + try { + const id = shell.addListener(listener, { cursor: cursorToGenerated(opts.cursor), coalesceMs: opts.coalesceMs }); + if (id === 0n) { + throw new Error('Failed to attach shell listener (id=0)'); + } + return BigInt(id); + } catch (e) { + throw new Error(`addListener failed: ${String((e as any)?.message ?? e)}`); + } }; return { diff --git a/packages/react-native-xtermjs-webview/src-internal/main.tsx b/packages/react-native-xtermjs-webview/src-internal/main.tsx index 7d5272e..7b0b51e 100644 --- a/packages/react-native-xtermjs-webview/src-internal/main.tsx +++ b/packages/react-native-xtermjs-webview/src-internal/main.tsx @@ -54,7 +54,7 @@ if (window.__FRESSH_XTERM_BRIDGE__) { const enc = new TextEncoder(); // Initial handshake (send once) - setTimeout(() => post({ type: 'initialized' }), 8_000); + setTimeout(() => post({ type: 'initialized' }), 500); // User input from xterm -> RN (SSH) as UTF-8 bytes (Base64) term.onData((data /* string */) => {