-
Notifications
You must be signed in to change notification settings - Fork 1.3k
fix(tui): reduce large transcript stalls #3259
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
3bcbf15
e5de11c
e65b8e4
6bc0e56
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,16 +7,11 @@ | |
| * compositing into the live transcript's scrollback. It renders a parked | ||
| * subagent / advisor / collab-guest transcript that has no live in-view session. | ||
| * | ||
| * The transcript is rebuilt from scratch on every refresh ({@link ChatTranscriptBuilder.rebuild}) | ||
| * rather than synced incrementally, so a growing file-backed transcript (the | ||
| * advisor appends while you watch) can never duplicate or misorder rows. Scroll | ||
| * is owned end-to-end by a single {@link ScrollView}; the viewer follows the tail | ||
| * until the reader scrolls up. | ||
| * | ||
| * Local agents re-read the whole session file whenever its size or mtime changes | ||
| * (covering SessionManager's in-place rewrites, not just appends). Collab guests | ||
| * keep the incremental byte cursor the host's capped `readTranscript` requires | ||
| * and rebuild components from the accumulated entries. | ||
| * Local transcripts tail append-only growth: unchanged file identity plus stable | ||
| * sentinels means only newly appended JSONL is parsed and rendered. Rewrites, | ||
| * truncation, rotation, or sentinel drift fall back to a full rebuild so changed | ||
| * historical entries cannot leave stale components behind. Collab guests use the | ||
| * same append path over the host's byte-capped transcript reads. | ||
| */ | ||
| import * as fs from "node:fs"; | ||
| import type { AgentTool } from "@oh-my-pi/pi-agent-core"; | ||
|
|
@@ -64,6 +59,56 @@ export interface AgentTranscriptViewerDeps { | |
| /** How often to re-stat a file-backed transcript for growth (advisor/live tail). */ | ||
| const POLL_MS = 250; | ||
|
|
||
| const SENTINEL_BYTES = 4096; | ||
|
|
||
| interface LocalTranscriptSentinel { | ||
| offset: number; | ||
| bytes: Buffer; | ||
| } | ||
|
|
||
| interface LocalTranscriptState { | ||
| path: string; | ||
| dev: number; | ||
| ino: number; | ||
| size: number; | ||
| mtimeMs: number; | ||
| offset: number; | ||
| pending: string; | ||
| sentinels: LocalTranscriptSentinel[]; | ||
| } | ||
|
|
||
| function readFileRangeSync(file: string, offset: number, length: number): Buffer { | ||
| if (length <= 0) return Buffer.alloc(0); | ||
| const fd = fs.openSync(file, "r"); | ||
| try { | ||
| const buffer = Buffer.alloc(length); | ||
| const bytesRead = fs.readSync(fd, buffer, 0, length, offset); | ||
| return bytesRead === length ? buffer : buffer.subarray(0, bytesRead); | ||
| } finally { | ||
| fs.closeSync(fd); | ||
| } | ||
| } | ||
|
|
||
| function sentinelOffsets(size: number): number[] { | ||
| if (size <= 0) return []; | ||
| const length = Math.min(SENTINEL_BYTES, size); | ||
| return [...new Set([0, Math.max(0, Math.floor((size - length) / 2)), Math.max(0, size - length)])]; | ||
| } | ||
|
|
||
| function sentinelsFromBuffer(buffer: Buffer): LocalTranscriptSentinel[] { | ||
| const size = buffer.byteLength; | ||
| const length = Math.min(SENTINEL_BYTES, size); | ||
| return sentinelOffsets(size).map(offset => ({ | ||
| offset, | ||
| bytes: Buffer.from(buffer.subarray(offset, offset + length)), | ||
| })); | ||
| } | ||
|
|
||
| function sentinelsFromFile(file: string, size: number): LocalTranscriptSentinel[] { | ||
| const length = Math.min(SENTINEL_BYTES, size); | ||
| return sentinelOffsets(size).map(offset => ({ offset, bytes: readFileRangeSync(file, offset, length) })); | ||
| } | ||
|
|
||
| function statusBadge(status: AgentStatus): string { | ||
| switch (status) { | ||
| case "running": | ||
|
|
@@ -85,10 +130,9 @@ export class AgentTranscriptViewer implements Component { | |
| #notice: string | undefined; | ||
| #expanded = false; | ||
|
|
||
| // Local file transcript state: re-read when the file size or mtime changes. | ||
| #lastSignature = ""; | ||
| #localState: LocalTranscriptState | undefined; | ||
| #localUnavailable = ""; | ||
| // Remote transcript state (incremental; the host caps each read). | ||
| #remoteEntries: SessionMessageEntry[] = []; | ||
| #remoteBytes = 0; | ||
| #remoteFetchInFlight = false; | ||
| #remoteToken = 0; | ||
|
|
@@ -145,7 +189,7 @@ export class AgentTranscriptViewer implements Component { | |
| // Transcript loading | ||
| // ======================================================================== | ||
|
|
||
| /** Re-read the transcript and rebuild components when it changed. */ | ||
| /** Refresh the transcript from a local file or remote host. */ | ||
| #refresh(): void { | ||
| if (this.#disposed) return; | ||
| if (this.deps.remote) { | ||
|
|
@@ -154,39 +198,114 @@ export class AgentTranscriptViewer implements Component { | |
| } | ||
| const sessionFile = this.deps.registry.get(this.deps.agentId)?.sessionFile; | ||
| if (!sessionFile) { | ||
| if (this.#lastSignature !== "none") { | ||
| this.#lastSignature = "none"; | ||
| this.#rebuild([]); | ||
| } | ||
| this.#clearLocal("none"); | ||
| return; | ||
| } | ||
| let signature: string; | ||
| let stat: fs.Stats; | ||
| try { | ||
| const stat = fs.statSync(sessionFile); | ||
| // Include the path: a different file with the same size/mtime must not alias. | ||
| signature = `${sessionFile}:${stat.size}:${stat.mtimeMs}`; | ||
| stat = fs.statSync(sessionFile); | ||
| } catch { | ||
| // File deleted/rotated while open (e.g. the owning session was dropped): | ||
| // clear stale content once instead of freezing on it forever. | ||
| if (this.#lastSignature !== "missing") { | ||
| this.#lastSignature = "missing"; | ||
| this.#model = undefined; | ||
| this.#rebuild([]); | ||
| } | ||
| this.#clearLocal("missing"); | ||
| return; | ||
| } | ||
| if (signature === this.#lastSignature) return; | ||
| let text: string; | ||
| const state = this.#localState; | ||
| if (state && this.#canAppendLocal(sessionFile, stat, state)) { | ||
| if (stat.size === state.size && stat.mtimeMs === state.mtimeMs) return; | ||
| if (stat.size > state.size) { | ||
| this.#appendLocal(sessionFile, stat, state); | ||
| return; | ||
| } | ||
| } | ||
| this.#loadLocalFull(sessionFile, stat); | ||
| } | ||
|
|
||
| #clearLocal(reason: string): void { | ||
| if (!this.#localState && this.#localUnavailable === reason) return; | ||
| this.#localState = undefined; | ||
| this.#localUnavailable = reason; | ||
| this.#model = undefined; | ||
| this.#rebuild([]); | ||
| } | ||
|
|
||
| #canAppendLocal(sessionFile: string, stat: fs.Stats, state: LocalTranscriptState): boolean { | ||
| if (state.path !== sessionFile || state.dev !== stat.dev || state.ino !== stat.ino || stat.size < state.size) | ||
| return false; | ||
| for (const sentinel of state.sentinels) { | ||
| const current = readFileRangeSync(sessionFile, sentinel.offset, sentinel.bytes.byteLength); | ||
| if (!current.equals(sentinel.bytes)) return false; | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| #loadLocalFull(sessionFile: string, stat: fs.Stats): void { | ||
| let data: Buffer; | ||
| try { | ||
| text = fs.readFileSync(sessionFile, "utf-8"); | ||
| data = fs.readFileSync(sessionFile); | ||
| } catch (err) { | ||
| // Leave #lastSignature unchanged so a transient read error retries next poll. | ||
| // Leave #localState unchanged so a transient read error retries next poll. | ||
| logger.debug("transcript viewer: read failed", { err: String(err) }); | ||
| return; | ||
| } | ||
| this.#lastSignature = signature; | ||
| // The file may have grown between the earlier `statSync` and this read. | ||
| // Anchor the tail cursor to what we actually consumed so the next poll's | ||
| // `#appendLocal` never re-renders bytes already in the rebuilt transcript; | ||
| // re-stat for mtime/identity so the post-read clock matches what's on disk. | ||
| let post: fs.Stats; | ||
| try { | ||
| post = fs.statSync(sessionFile); | ||
| } catch { | ||
| post = stat; | ||
| } | ||
| // A reader that opens the file mid-append sees a trailing partial line | ||
| // (no terminating newline). Carry those bytes as `pending` so the next | ||
| // poll's `#appendLocal` joins them with the completion bytes instead of | ||
| // parsing a headless line fragment and dropping the entry. | ||
| const text = data.toString("utf-8"); | ||
| const lastNewline = text.lastIndexOf("\n"); | ||
| const complete = lastNewline >= 0 ? text.slice(0, lastNewline + 1) : ""; | ||
| const pending = lastNewline >= 0 ? text.slice(lastNewline + 1) : text; | ||
| this.#localUnavailable = ""; | ||
| this.#localState = { | ||
| path: sessionFile, | ||
| dev: post.dev, | ||
| ino: post.ino, | ||
| size: data.byteLength, | ||
| mtimeMs: post.mtimeMs, | ||
| offset: data.byteLength, | ||
| pending, | ||
| sentinels: sentinelsFromBuffer(data), | ||
| }; | ||
| this.#model = undefined; | ||
| this.#rebuild(this.#extractMessages(parseSessionEntries(text))); | ||
| this.#rebuild(this.#extractMessages(parseSessionEntries(complete))); | ||
| } | ||
|
|
||
| #appendLocal(sessionFile: string, stat: fs.Stats, state: LocalTranscriptState): void { | ||
| let chunk: string; | ||
| try { | ||
| chunk = readFileRangeSync(sessionFile, state.offset, stat.size - state.offset).toString("utf-8"); | ||
| } catch (err) { | ||
| logger.debug("transcript viewer: tail read failed", { err: String(err) }); | ||
| this.#loadLocalFull(sessionFile, stat); | ||
| return; | ||
| } | ||
| const combined = state.pending + chunk; | ||
| const lastNewline = combined.lastIndexOf("\n"); | ||
| const complete = lastNewline >= 0 ? combined.slice(0, lastNewline + 1) : ""; | ||
| const previousModel = this.#model; | ||
| const parsed = complete ? this.#extractMessages(parseSessionEntries(complete)) : []; | ||
| this.#localState = { | ||
| ...state, | ||
| size: stat.size, | ||
| mtimeMs: stat.mtimeMs, | ||
| offset: stat.size, | ||
| pending: lastNewline >= 0 ? combined.slice(lastNewline + 1) : combined, | ||
| sentinels: sentinelsFromFile(sessionFile, stat.size), | ||
| }; | ||
| if (parsed.length > 0) { | ||
| this.#append(parsed); | ||
| } else if (this.#model !== previousModel) { | ||
| this.deps.requestRender(); | ||
| } | ||
| } | ||
|
|
||
| #fetchRemote(): void { | ||
|
|
@@ -209,9 +328,13 @@ export class AgentTranscriptViewer implements Component { | |
| return; | ||
| } | ||
| if (result.newSize < fromByte) { | ||
| // Host transcript rotated/truncated — restart from 0. | ||
| // Host transcript rotated/truncated — drop the stale rendered rows | ||
| // before restarting; otherwise the post-rotation fetch would stack | ||
| // new content under the pre-rotation history. | ||
| this.#remoteBytes = 0; | ||
| this.#remoteEntries = []; | ||
| this.#hasRemoteData = false; | ||
| this.#model = undefined; | ||
| this.#rebuild([]); | ||
| this.#fetchRemote(); | ||
|
Comment on lines
330
to
338
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In the collab-guest path, a host-side transcript truncation/rotation only resets the byte cursor before fetching from byte 0 again. Because this change now appends parsed remote entries directly into the existing Useful? React with 👍 / 👎. |
||
| return; | ||
| } | ||
|
|
@@ -222,10 +345,14 @@ export class AgentTranscriptViewer implements Component { | |
| if (lastNewline >= 0) { | ||
| const completeChunk = result.text.slice(0, lastNewline + 1); | ||
| this.#remoteBytes = fromByte + Buffer.byteLength(completeChunk, "utf-8"); | ||
| const previousModel = this.#model; | ||
| const parsed = this.#extractMessages(parseSessionEntries(completeChunk)); | ||
| if (parsed.length > 0) { | ||
| this.#remoteEntries.push(...parsed); | ||
| this.#rebuild(this.#remoteEntries); | ||
| this.#append(parsed); | ||
| return; | ||
| } | ||
| if (this.#model !== previousModel) { | ||
| this.deps.requestRender(); | ||
| return; | ||
| } | ||
| } | ||
|
|
@@ -257,6 +384,11 @@ export class AgentTranscriptViewer implements Component { | |
| this.deps.requestRender(); | ||
| } | ||
|
|
||
| #append(entries: SessionMessageEntry[]): void { | ||
| this.#builder.append(entries); | ||
| this.deps.requestRender(); | ||
| } | ||
|
|
||
| // ======================================================================== | ||
| // Input | ||
| // ======================================================================== | ||
|
|
@@ -455,8 +587,10 @@ export class AgentTranscriptViewer implements Component { | |
| } | ||
|
|
||
| #placeholder(): string { | ||
| if (this.deps.remote && this.#remoteUnavailable) return "Transcript lives on the host — not available."; | ||
| if (this.deps.remote && !this.#hasRemoteData) return "Loading transcript from host…"; | ||
| if (this.deps.remote) { | ||
| if (this.#remoteUnavailable) return "Transcript lives on the host — not available."; | ||
| return this.#hasRemoteData ? "No messages yet." : "Loading transcript from host…"; | ||
| } | ||
| if (!this.deps.registry.get(this.deps.agentId)?.sessionFile) return "No session file available yet."; | ||
| return "No messages yet."; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On a local viewer, after
statSyncsucceeds,#canAppendLocalopens the file again for each sentinel without any catch. If the session file is deleted or rotated in that small window,fs.openSyncthrows out of#refresh/the poll timer instead of clearing or rebuilding the transcript, which can take down the TUI while an agent/session is being removed. Treat sentinel read failures as non-appendable or missing rather than letting them escape.Useful? React with 👍 / 👎.