-
Notifications
You must be signed in to change notification settings - Fork 1.3k
fix(coding-agent): tail appended session JSONL in large-session TUI #3281
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,26 +7,26 @@ | |
| * compositing into the live transcript's scrollback. It renders a parked | ||
| * subagent / advisor / collab-guest transcript that has no live in-view session. | ||
| * | ||
| * The transcript is rebuilt from scratch on every refresh ({@link ChatTranscriptBuilder.rebuild}) | ||
| * rather than synced incrementally, so a growing file-backed transcript (the | ||
| * advisor appends while you watch) can never duplicate or misorder rows. Scroll | ||
| * is owned end-to-end by a single {@link ScrollView}; the viewer follows the tail | ||
| * until the reader scrolls up. | ||
| * The viewer tails append-only JSONL when possible and falls back to a full | ||
| * compaction-aware rebuild when file identity changes, content is replaced, or a | ||
| * structural session entry arrives. Scroll is owned end-to-end by a single | ||
| * {@link ScrollView}; the viewer follows the tail until the reader scrolls up. | ||
| * | ||
| * Local agents re-read the whole session file whenever its size or mtime changes | ||
| * (covering SessionManager's in-place rewrites, not just appends). Collab guests | ||
| * keep the incremental byte cursor the host's capped `readTranscript` requires | ||
| * and rebuild components from the accumulated entries. | ||
| * Local agents read only newly appended bytes for normal writes while preserving | ||
| * an incomplete trailing JSONL line across polls. Collab guests keep the | ||
| * incremental byte cursor the host's capped `readTranscript` requires and clear | ||
| * stale rows when the host reports rotation. | ||
| */ | ||
| import * as fs from "node:fs"; | ||
| import type { AgentTool } from "@oh-my-pi/pi-agent-core"; | ||
| import type { AgentMessage, AgentTool } from "@oh-my-pi/pi-agent-core"; | ||
| import { type Component, Editor, matchesKey, parseSgrMouse, ScrollView, type TUI } from "@oh-my-pi/pi-tui"; | ||
| import { formatDuration, formatNumber, logger } from "@oh-my-pi/pi-utils"; | ||
| import type { KeyId } from "../../config/keybindings"; | ||
| import type { MessageRenderer } from "../../extensibility/extensions/types"; | ||
| import type { AgentLifecycleManager } from "../../registry/agent-lifecycle"; | ||
| import type { AgentRegistry, AgentStatus } from "../../registry/agent-registry"; | ||
| import type { FileEntry, SessionMessageEntry } from "../../session/session-entries"; | ||
| import { buildSessionContext } from "../../session/session-context"; | ||
| import type { FileEntry, SessionEntry } from "../../session/session-entries"; | ||
| import { parseSessionEntries } from "../../session/session-loader"; | ||
| import type { ObservableSession, SessionObserverRegistry } from "../session-observer-registry"; | ||
| import { getEditorTheme, theme } from "../theme/theme"; | ||
|
|
@@ -64,6 +64,27 @@ export interface AgentTranscriptViewerDeps { | |
| /** How often to re-stat a file-backed transcript for growth (advisor/live tail). */ | ||
| const POLL_MS = 250; | ||
|
|
||
| type LocalTailState = { | ||
| path: string; | ||
| dev: number; | ||
| ino: number; | ||
| size: number; | ||
| mtimeMs: number; | ||
| ctimeMs: number; | ||
| offset: number; | ||
| pending: string; | ||
| }; | ||
|
|
||
| function splitCompleteJsonl(text: string): { complete: string; pending: string } { | ||
| const lastNewline = text.lastIndexOf("\n"); | ||
| if (lastNewline < 0) return { complete: "", pending: text }; | ||
| return { complete: text.slice(0, lastNewline + 1), pending: text.slice(lastNewline + 1) }; | ||
| } | ||
|
|
||
| function isSessionEntry(entry: FileEntry): entry is SessionEntry { | ||
| return entry.type !== "session"; | ||
| } | ||
|
|
||
| function statusBadge(status: AgentStatus): string { | ||
| switch (status) { | ||
| case "running": | ||
|
|
@@ -85,10 +106,12 @@ export class AgentTranscriptViewer implements Component { | |
| #notice: string | undefined; | ||
| #expanded = false; | ||
|
|
||
| // Local file transcript state: re-read when the file size or mtime changes. | ||
| #lastSignature = ""; | ||
| // Local file transcript state: append-tail same-inode growth; rebuild on replacement. | ||
| #localState: LocalTailState | undefined; | ||
| #localEmptyReason: "none" | "missing" | undefined; | ||
| // Remote transcript state (incremental; the host caps each read). | ||
| #remoteEntries: SessionMessageEntry[] = []; | ||
| #remoteEntries: FileEntry[] = []; | ||
| #remotePending = ""; | ||
| #remoteBytes = 0; | ||
| #remoteFetchInFlight = false; | ||
| #remoteToken = 0; | ||
|
|
@@ -145,7 +168,7 @@ export class AgentTranscriptViewer implements Component { | |
| // Transcript loading | ||
| // ======================================================================== | ||
|
|
||
| /** Re-read the transcript and rebuild components when it changed. */ | ||
| /** Tail the transcript and rebuild components only when necessary. */ | ||
| #refresh(): void { | ||
| if (this.#disposed) return; | ||
| if (this.deps.remote) { | ||
|
|
@@ -154,39 +177,99 @@ export class AgentTranscriptViewer implements Component { | |
| } | ||
| const sessionFile = this.deps.registry.get(this.deps.agentId)?.sessionFile; | ||
| if (!sessionFile) { | ||
| if (this.#lastSignature !== "none") { | ||
| this.#lastSignature = "none"; | ||
| this.#rebuild([]); | ||
| if (this.#localEmptyReason !== "none") { | ||
| this.#localState = undefined; | ||
| this.#localEmptyReason = "none"; | ||
| this.#model = undefined; | ||
| this.#rebuildMessages([]); | ||
| } | ||
| return; | ||
| } | ||
| let signature: string; | ||
| let stat: fs.Stats; | ||
| try { | ||
| const stat = fs.statSync(sessionFile); | ||
| // Include the path: a different file with the same size/mtime must not alias. | ||
| signature = `${sessionFile}:${stat.size}:${stat.mtimeMs}`; | ||
| stat = fs.statSync(sessionFile); | ||
| } catch { | ||
| // File deleted/rotated while open (e.g. the owning session was dropped): | ||
| // clear stale content once instead of freezing on it forever. | ||
| if (this.#lastSignature !== "missing") { | ||
| this.#lastSignature = "missing"; | ||
| if (this.#localEmptyReason !== "missing") { | ||
| this.#localState = undefined; | ||
| this.#localEmptyReason = "missing"; | ||
| this.#model = undefined; | ||
| this.#rebuild([]); | ||
| this.#rebuildMessages([]); | ||
| } | ||
| return; | ||
| } | ||
| if (signature === this.#lastSignature) return; | ||
| let text: string; | ||
| this.#localEmptyReason = undefined; | ||
| const state = this.#localState; | ||
| const identityChanged = !state || state.path !== sessionFile || state.dev !== stat.dev || state.ino !== stat.ino; | ||
| const contentReplaced = | ||
| state && | ||
| state.path === sessionFile && | ||
| state.dev === stat.dev && | ||
| state.ino === stat.ino && | ||
| stat.size === state.size && | ||
| (stat.mtimeMs !== state.mtimeMs || stat.ctimeMs !== state.ctimeMs); | ||
| if (identityChanged || stat.size < (state?.offset ?? 0) || contentReplaced) { | ||
| this.#loadLocalFull(sessionFile, stat); | ||
| return; | ||
| } | ||
| if (!state || stat.size === state.offset) return; | ||
| let fd: number | undefined; | ||
| try { | ||
| fd = fs.openSync(sessionFile, "r"); | ||
| const length = stat.size - state.offset; | ||
| const buffer = Buffer.allocUnsafe(length); | ||
| fs.readSync(fd, buffer, 0, length, state.offset); | ||
| const { complete, pending } = splitCompleteJsonl(state.pending + buffer.toString("utf-8")); | ||
|
Comment on lines
+220
to
+221
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When the transcript is replaced or truncated between Useful? React with 👍 / 👎. |
||
| const entries = complete ? parseSessionEntries(complete) : []; | ||
| this.#localState = { | ||
| ...state, | ||
| size: stat.size, | ||
| mtimeMs: stat.mtimeMs, | ||
| ctimeMs: stat.ctimeMs, | ||
| offset: stat.size, | ||
| pending, | ||
| }; | ||
| const incremental = this.#incrementalMessages(entries); | ||
| if (incremental) { | ||
| this.#appendMessages(incremental); | ||
| } else { | ||
| this.#loadLocalFull(sessionFile, stat); | ||
| } | ||
| } catch (err) { | ||
| logger.debug("transcript viewer: append read failed", { err: String(err) }); | ||
| } finally { | ||
| if (fd !== undefined) fs.closeSync(fd); | ||
| } | ||
| } | ||
|
|
||
| #loadLocalFull(sessionFile: string, stat: fs.Stats): void { | ||
| let data: Buffer; | ||
| try { | ||
| text = fs.readFileSync(sessionFile, "utf-8"); | ||
| data = fs.readFileSync(sessionFile); | ||
| } catch (err) { | ||
| // Leave #lastSignature unchanged so a transient read error retries next poll. | ||
| logger.debug("transcript viewer: read failed", { err: String(err) }); | ||
| return; | ||
| } | ||
| this.#lastSignature = signature; | ||
| const { complete, pending } = splitCompleteJsonl(data.toString("utf-8")); | ||
| const entries = complete ? parseSessionEntries(complete) : []; | ||
| this.#model = undefined; | ||
| this.#rebuild(this.#extractMessages(parseSessionEntries(text))); | ||
| this.#scanModel(entries); | ||
| this.#rebuildMessages(this.#messagesFromEntries(entries)); | ||
| let nextStat = stat; | ||
| try { | ||
| nextStat = fs.statSync(sessionFile); | ||
| } catch { | ||
| nextStat = stat; | ||
| } | ||
| this.#localState = { | ||
| path: sessionFile, | ||
| dev: nextStat.dev, | ||
| ino: nextStat.ino, | ||
| size: data.byteLength, | ||
| mtimeMs: nextStat.mtimeMs, | ||
| ctimeMs: nextStat.ctimeMs, | ||
| offset: data.byteLength, | ||
| pending, | ||
| }; | ||
| } | ||
|
|
||
| #fetchRemote(): void { | ||
|
|
@@ -209,27 +292,39 @@ export class AgentTranscriptViewer implements Component { | |
| return; | ||
| } | ||
| if (result.newSize < fromByte) { | ||
| // Host transcript rotated/truncated — restart from 0. | ||
| // Host transcript rotated/truncated — clear stale rows and restart from 0. | ||
| this.#remoteBytes = 0; | ||
| this.#remotePending = ""; | ||
| this.#remoteEntries = []; | ||
| this.#hasRemoteData = false; | ||
| this.#rebuildMessages([]); | ||
| this.#fetchRemote(); | ||
| return; | ||
| } | ||
| this.#remoteUnavailable = false; | ||
| const firstData = !this.#hasRemoteData; | ||
| this.#hasRemoteData = true; | ||
| const lastNewline = result.text.lastIndexOf("\n"); | ||
| if (lastNewline >= 0) { | ||
| const completeChunk = result.text.slice(0, lastNewline + 1); | ||
| this.#remoteBytes = fromByte + Buffer.byteLength(completeChunk, "utf-8"); | ||
| const parsed = this.#extractMessages(parseSessionEntries(completeChunk)); | ||
| if (parsed.length > 0) { | ||
| this.#remoteEntries.push(...parsed); | ||
| this.#rebuild(this.#remoteEntries); | ||
| return; | ||
| const { complete, pending } = splitCompleteJsonl(this.#remotePending + result.text); | ||
| const parsed = complete ? parseSessionEntries(complete) : []; | ||
| this.#remotePending = pending; | ||
| this.#remoteBytes = result.newSize; | ||
| if (parsed.length > 0) { | ||
| this.#remoteEntries.push(...parsed); | ||
| const incremental = this.#incrementalMessages(parsed); | ||
| if (incremental) { | ||
| if (incremental.length > 0) { | ||
| this.#appendMessages(incremental); | ||
| } else if (firstData) { | ||
| this.deps.requestRender(); | ||
| } | ||
| } else { | ||
| this.#model = undefined; | ||
| this.#scanModel(this.#remoteEntries); | ||
| this.#rebuildMessages(this.#messagesFromEntries(this.#remoteEntries)); | ||
| } | ||
| return; | ||
| } | ||
| // First completed fetch (even empty) clears the "Loading…" placeholder. | ||
| // First completed fetch (even empty/header-only) clears the "Loading…" placeholder. | ||
| if (firstData) this.deps.requestRender(); | ||
| }) | ||
| .catch((error: unknown) => { | ||
|
|
@@ -238,22 +333,60 @@ export class AgentTranscriptViewer implements Component { | |
| }); | ||
| } | ||
|
|
||
| /** Filter to message entries, tracking the model from the first assistant / a model_change. */ | ||
| #extractMessages(entries: FileEntry[]): SessionMessageEntry[] { | ||
| const messages: SessionMessageEntry[] = []; | ||
| #messagesFromEntries(entries: readonly FileEntry[]): AgentMessage[] { | ||
| const sessionEntries = entries.filter(isSessionEntry); | ||
| // Display semantics differ from LLM context: a parked/advisor transcript must | ||
| // keep pending (resultless) tool calls visible. buildSessionContext strips | ||
| // dangling tool_use blocks, so only route through it when a compaction is | ||
| // present (where collapse is required); otherwise map messages verbatim. | ||
| if (!sessionEntries.some(entry => entry.type === "compaction")) { | ||
| const messages: AgentMessage[] = []; | ||
| for (const entry of sessionEntries) { | ||
| if (entry.type === "message") messages.push(entry.message); | ||
| } | ||
| return messages; | ||
| } | ||
| return buildSessionContext(sessionEntries, undefined, undefined, { | ||
| transcript: true, | ||
| collapseCompactedHistory: true, | ||
| }).messages; | ||
|
Comment on lines
+336
to
+352
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When Agent Hub opens a legacy transcript file (header has no version or version < 2), Useful? React with 👍 / 👎.
Comment on lines
+349
to
+352
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
For any transcript that already contains a compaction, this full-rebuild path routes the display through Useful? React with 👍 / 👎. |
||
| } | ||
|
|
||
| /** Return appendable messages, or undefined when a structural entry requires rebuild. */ | ||
| #incrementalMessages(entries: readonly FileEntry[]): AgentMessage[] | undefined { | ||
| const messages: AgentMessage[] = []; | ||
| for (const entry of entries) { | ||
| if (entry.type === "session") continue; | ||
| if (entry.type === "message") { | ||
| messages.push(entry); | ||
| messages.push(entry.message); | ||
| if (!this.#model && entry.message.role === "assistant") this.#model = entry.message.model; | ||
| } else if (entry.type === "model_change") { | ||
| this.#model = entry.model; | ||
|
Comment on lines
363
to
364
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should-fix: A |
||
| } else { | ||
| return undefined; | ||
| } | ||
| } | ||
| return messages; | ||
| } | ||
|
|
||
| #rebuild(entries: SessionMessageEntry[]): void { | ||
| this.#builder.rebuild(entries); | ||
| #scanModel(entries: readonly FileEntry[]): void { | ||
| for (const entry of entries) { | ||
| if (entry.type === "message" && !this.#model && entry.message.role === "assistant") { | ||
| this.#model = entry.message.model; | ||
| } else if (entry.type === "model_change") { | ||
| this.#model = entry.model; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #rebuildMessages(messages: readonly AgentMessage[]): void { | ||
| this.#builder.rebuildMessages(messages); | ||
| this.deps.requestRender(); | ||
| } | ||
|
|
||
| #appendMessages(messages: readonly AgentMessage[]): void { | ||
| if (messages.length === 0) return; | ||
| this.#builder.appendMessages(messages); | ||
| this.deps.requestRender(); | ||
| } | ||
|
|
||
|
|
@@ -457,6 +590,7 @@ export class AgentTranscriptViewer implements Component { | |
| #placeholder(): string { | ||
| if (this.deps.remote && this.#remoteUnavailable) return "Transcript lives on the host — not available."; | ||
| if (this.deps.remote && !this.#hasRemoteData) return "Loading transcript from host…"; | ||
| if (this.deps.remote) return "No messages yet."; | ||
| if (!this.deps.registry.get(this.deps.agentId)?.sessionFile) return "No session file available yet."; | ||
| return "No messages yet."; | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should-fix: This adds a second
### Fixedinside the already-released## [16.1.15]section. Repo convention requires new entries under## [Unreleased]and released sections are immutable; this entry should be moved up to the existing Unreleased### Fixedblock at lines 9-15.