diff --git a/packages/coding-agent/CHANGELOG.md b/packages/coding-agent/CHANGELOG.md index cde28978fe..1b69514cea 100644 --- a/packages/coding-agent/CHANGELOG.md +++ b/packages/coding-agent/CHANGELOG.md @@ -34,6 +34,9 @@ - Fixed configured model discovery caches to refresh when `models.yml`/`models.json` is newer than the cached row, so updated local model metadata is not shadowed by fresh `models.db` entries. ([#3242](https://github.com/can1357/oh-my-pi/issues/3242)) - Fixed hide-secrets handling so advisor session updates are redacted before the advisor model sees them and opaque assistant thinking blocks are no longer deobfuscated. - Filtered alias definitions brush's whitespace-only expander cannot execute (`(`, `)`, `|`, `&`, `;`, `<`, `>`, `` ` ``) from the bash-tool shell snapshot, so user rc-files containing compound aliases like Fedora's default `which='(alias; declare -f) | /usr/bin/which …'` no longer poison the brush session with `error: command not found: (alias;` ([#3234](https://github.com/can1357/oh-my-pi/issues/3234)). +### Fixed + +- Fixed large parked-agent/advisor transcript viewers and live chat rebuilds stalling on long sessions by tailing appended session JSONL, preserving partial writes, and collapsing compacted display history for hot TUI surfaces. ## [16.1.14] - 2026-06-22 diff --git a/packages/coding-agent/src/modes/components/agent-transcript-viewer.ts b/packages/coding-agent/src/modes/components/agent-transcript-viewer.ts index 5ac0b01701..795378cfe3 100644 --- a/packages/coding-agent/src/modes/components/agent-transcript-viewer.ts +++ b/packages/coding-agent/src/modes/components/agent-transcript-viewer.ts @@ -7,26 +7,26 @@ * compositing into the live transcript's scrollback. It renders a parked * subagent / advisor / collab-guest transcript that has no live in-view session. * - * The transcript is rebuilt from scratch on every refresh ({@link ChatTranscriptBuilder.rebuild}) - * rather than synced incrementally, so a growing file-backed transcript (the - * advisor appends while you watch) can never duplicate or misorder rows. Scroll - * is owned end-to-end by a single {@link ScrollView}; the viewer follows the tail - * until the reader scrolls up. + * The viewer tails append-only JSONL when possible and falls back to a full + * compaction-aware rebuild when file identity changes, content is replaced, or a + * structural session entry arrives. Scroll is owned end-to-end by a single + * {@link ScrollView}; the viewer follows the tail until the reader scrolls up. * - * Local agents re-read the whole session file whenever its size or mtime changes - * (covering SessionManager's in-place rewrites, not just appends). Collab guests - * keep the incremental byte cursor the host's capped `readTranscript` requires - * and rebuild components from the accumulated entries. + * Local agents read only newly appended bytes for normal writes while preserving + * an incomplete trailing JSONL line across polls. Collab guests keep the + * incremental byte cursor the host's capped `readTranscript` requires and clear + * stale rows when the host reports rotation. */ import * as fs from "node:fs"; -import type { AgentTool } from "@oh-my-pi/pi-agent-core"; +import type { AgentMessage, AgentTool } from "@oh-my-pi/pi-agent-core"; import { type Component, Editor, matchesKey, parseSgrMouse, ScrollView, type TUI } from "@oh-my-pi/pi-tui"; import { formatDuration, formatNumber, logger } from "@oh-my-pi/pi-utils"; import type { KeyId } from "../../config/keybindings"; import type { MessageRenderer } from "../../extensibility/extensions/types"; import type { AgentLifecycleManager } from "../../registry/agent-lifecycle"; import type { AgentRegistry, AgentStatus } from "../../registry/agent-registry"; -import type { FileEntry, SessionMessageEntry } from "../../session/session-entries"; +import { buildSessionContext } from "../../session/session-context"; +import type { FileEntry, SessionEntry } from "../../session/session-entries"; import { parseSessionEntries } from "../../session/session-loader"; import type { ObservableSession, SessionObserverRegistry } from "../session-observer-registry"; import { getEditorTheme, theme } from "../theme/theme"; @@ -64,6 +64,27 @@ export interface AgentTranscriptViewerDeps { /** How often to re-stat a file-backed transcript for growth (advisor/live tail). */ const POLL_MS = 250; +type LocalTailState = { + path: string; + dev: number; + ino: number; + size: number; + mtimeMs: number; + ctimeMs: number; + offset: number; + pending: string; +}; + +function splitCompleteJsonl(text: string): { complete: string; pending: string } { + const lastNewline = text.lastIndexOf("\n"); + if (lastNewline < 0) return { complete: "", pending: text }; + return { complete: text.slice(0, lastNewline + 1), pending: text.slice(lastNewline + 1) }; +} + +function isSessionEntry(entry: FileEntry): entry is SessionEntry { + return entry.type !== "session"; +} + function statusBadge(status: AgentStatus): string { switch (status) { case "running": @@ -85,10 +106,12 @@ export class AgentTranscriptViewer implements Component { #notice: string | undefined; #expanded = false; - // Local file transcript state: re-read when the file size or mtime changes. - #lastSignature = ""; + // Local file transcript state: append-tail same-inode growth; rebuild on replacement. + #localState: LocalTailState | undefined; + #localEmptyReason: "none" | "missing" | undefined; // Remote transcript state (incremental; the host caps each read). - #remoteEntries: SessionMessageEntry[] = []; + #remoteEntries: FileEntry[] = []; + #remotePending = ""; #remoteBytes = 0; #remoteFetchInFlight = false; #remoteToken = 0; @@ -145,7 +168,7 @@ export class AgentTranscriptViewer implements Component { // Transcript loading // ======================================================================== - /** Re-read the transcript and rebuild components when it changed. */ + /** Tail the transcript and rebuild components only when necessary. */ #refresh(): void { if (this.#disposed) return; if (this.deps.remote) { @@ -154,39 +177,99 @@ export class AgentTranscriptViewer implements Component { } const sessionFile = this.deps.registry.get(this.deps.agentId)?.sessionFile; if (!sessionFile) { - if (this.#lastSignature !== "none") { - this.#lastSignature = "none"; - this.#rebuild([]); + if (this.#localEmptyReason !== "none") { + this.#localState = undefined; + this.#localEmptyReason = "none"; + this.#model = undefined; + this.#rebuildMessages([]); } return; } - let signature: string; + let stat: fs.Stats; try { - const stat = fs.statSync(sessionFile); - // Include the path: a different file with the same size/mtime must not alias. - signature = `${sessionFile}:${stat.size}:${stat.mtimeMs}`; + stat = fs.statSync(sessionFile); } catch { - // File deleted/rotated while open (e.g. the owning session was dropped): - // clear stale content once instead of freezing on it forever. - if (this.#lastSignature !== "missing") { - this.#lastSignature = "missing"; + if (this.#localEmptyReason !== "missing") { + this.#localState = undefined; + this.#localEmptyReason = "missing"; this.#model = undefined; - this.#rebuild([]); + this.#rebuildMessages([]); } return; } - if (signature === this.#lastSignature) return; - let text: string; + this.#localEmptyReason = undefined; + const state = this.#localState; + const identityChanged = !state || state.path !== sessionFile || state.dev !== stat.dev || state.ino !== stat.ino; + const contentReplaced = + state && + state.path === sessionFile && + state.dev === stat.dev && + state.ino === stat.ino && + stat.size === state.size && + (stat.mtimeMs !== state.mtimeMs || stat.ctimeMs !== state.ctimeMs); + if (identityChanged || stat.size < (state?.offset ?? 0) || contentReplaced) { + this.#loadLocalFull(sessionFile, stat); + return; + } + if (!state || stat.size === state.offset) return; + let fd: number | undefined; + try { + fd = fs.openSync(sessionFile, "r"); + const length = stat.size - state.offset; + const buffer = Buffer.allocUnsafe(length); + fs.readSync(fd, buffer, 0, length, state.offset); + const { complete, pending } = splitCompleteJsonl(state.pending + buffer.toString("utf-8")); + const entries = complete ? parseSessionEntries(complete) : []; + this.#localState = { + ...state, + size: stat.size, + mtimeMs: stat.mtimeMs, + ctimeMs: stat.ctimeMs, + offset: stat.size, + pending, + }; + const incremental = this.#incrementalMessages(entries); + if (incremental) { + this.#appendMessages(incremental); + } else { + this.#loadLocalFull(sessionFile, stat); + } + } catch (err) { + logger.debug("transcript viewer: append read failed", { err: String(err) }); + } finally { + if (fd !== undefined) fs.closeSync(fd); + } + } + + #loadLocalFull(sessionFile: string, stat: fs.Stats): void { + let data: Buffer; try { - text = fs.readFileSync(sessionFile, "utf-8"); + data = fs.readFileSync(sessionFile); } catch (err) { - // Leave #lastSignature unchanged so a transient read error retries next poll. logger.debug("transcript viewer: read failed", { err: String(err) }); return; } - this.#lastSignature = signature; + const { complete, pending } = splitCompleteJsonl(data.toString("utf-8")); + const entries = complete ? parseSessionEntries(complete) : []; this.#model = undefined; - this.#rebuild(this.#extractMessages(parseSessionEntries(text))); + this.#scanModel(entries); + this.#rebuildMessages(this.#messagesFromEntries(entries)); + let nextStat = stat; + try { + nextStat = fs.statSync(sessionFile); + } catch { + nextStat = stat; + } + this.#localState = { + path: sessionFile, + dev: nextStat.dev, + ino: nextStat.ino, + size: data.byteLength, + mtimeMs: nextStat.mtimeMs, + ctimeMs: nextStat.ctimeMs, + offset: data.byteLength, + pending, + }; } #fetchRemote(): void { @@ -209,27 +292,39 @@ export class AgentTranscriptViewer implements Component { return; } if (result.newSize < fromByte) { - // Host transcript rotated/truncated — restart from 0. + // Host transcript rotated/truncated — clear stale rows and restart from 0. this.#remoteBytes = 0; + this.#remotePending = ""; this.#remoteEntries = []; + this.#hasRemoteData = false; + this.#rebuildMessages([]); this.#fetchRemote(); return; } this.#remoteUnavailable = false; const firstData = !this.#hasRemoteData; this.#hasRemoteData = true; - const lastNewline = result.text.lastIndexOf("\n"); - if (lastNewline >= 0) { - const completeChunk = result.text.slice(0, lastNewline + 1); - this.#remoteBytes = fromByte + Buffer.byteLength(completeChunk, "utf-8"); - const parsed = this.#extractMessages(parseSessionEntries(completeChunk)); - if (parsed.length > 0) { - this.#remoteEntries.push(...parsed); - this.#rebuild(this.#remoteEntries); - return; + const { complete, pending } = splitCompleteJsonl(this.#remotePending + result.text); + const parsed = complete ? parseSessionEntries(complete) : []; + this.#remotePending = pending; + this.#remoteBytes = result.newSize; + if (parsed.length > 0) { + this.#remoteEntries.push(...parsed); + const incremental = this.#incrementalMessages(parsed); + if (incremental) { + if (incremental.length > 0) { + this.#appendMessages(incremental); + } else if (firstData) { + this.deps.requestRender(); + } + } else { + this.#model = undefined; + this.#scanModel(this.#remoteEntries); + this.#rebuildMessages(this.#messagesFromEntries(this.#remoteEntries)); } + return; } - // First completed fetch (even empty) clears the "Loading…" placeholder. + // First completed fetch (even empty/header-only) clears the "Loading…" placeholder. if (firstData) this.deps.requestRender(); }) .catch((error: unknown) => { @@ -238,22 +333,60 @@ export class AgentTranscriptViewer implements Component { }); } - /** Filter to message entries, tracking the model from the first assistant / a model_change. */ - #extractMessages(entries: FileEntry[]): SessionMessageEntry[] { - const messages: SessionMessageEntry[] = []; + #messagesFromEntries(entries: readonly FileEntry[]): AgentMessage[] { + const sessionEntries = entries.filter(isSessionEntry); + // Display semantics differ from LLM context: a parked/advisor transcript must + // keep pending (resultless) tool calls visible. buildSessionContext strips + // dangling tool_use blocks, so only route through it when a compaction is + // present (where collapse is required); otherwise map messages verbatim. + if (!sessionEntries.some(entry => entry.type === "compaction")) { + const messages: AgentMessage[] = []; + for (const entry of sessionEntries) { + if (entry.type === "message") messages.push(entry.message); + } + return messages; + } + return buildSessionContext(sessionEntries, undefined, undefined, { + transcript: true, + collapseCompactedHistory: true, + }).messages; + } + + /** Return appendable messages, or undefined when a structural entry requires rebuild. */ + #incrementalMessages(entries: readonly FileEntry[]): AgentMessage[] | undefined { + const messages: AgentMessage[] = []; for (const entry of entries) { + if (entry.type === "session") continue; if (entry.type === "message") { - messages.push(entry); + messages.push(entry.message); if (!this.#model && entry.message.role === "assistant") this.#model = entry.message.model; } else if (entry.type === "model_change") { this.#model = entry.model; + } else { + return undefined; } } return messages; } - #rebuild(entries: SessionMessageEntry[]): void { - this.#builder.rebuild(entries); + #scanModel(entries: readonly FileEntry[]): void { + for (const entry of entries) { + if (entry.type === "message" && !this.#model && entry.message.role === "assistant") { + this.#model = entry.message.model; + } else if (entry.type === "model_change") { + this.#model = entry.model; + } + } + } + + #rebuildMessages(messages: readonly AgentMessage[]): void { + this.#builder.rebuildMessages(messages); + this.deps.requestRender(); + } + + #appendMessages(messages: readonly AgentMessage[]): void { + if (messages.length === 0) return; + this.#builder.appendMessages(messages); this.deps.requestRender(); } @@ -457,6 +590,7 @@ export class AgentTranscriptViewer implements Component { #placeholder(): string { if (this.deps.remote && this.#remoteUnavailable) return "Transcript lives on the host — not available."; if (this.deps.remote && !this.#hasRemoteData) return "Loading transcript from host…"; + if (this.deps.remote) return "No messages yet."; if (!this.deps.registry.get(this.deps.agentId)?.sessionFile) return "No session file available yet."; return "No messages yet."; } diff --git a/packages/coding-agent/src/modes/components/chat-transcript-builder.ts b/packages/coding-agent/src/modes/components/chat-transcript-builder.ts index 9a854d71f4..8e15a425e0 100644 --- a/packages/coding-agent/src/modes/components/chat-transcript-builder.ts +++ b/packages/coding-agent/src/modes/components/chat-transcript-builder.ts @@ -5,11 +5,9 @@ * viewer ({@link AgentTranscriptViewer}) to render a parked subagent / advisor / * collab-guest transcript that has no live session. * - * Unlike the old incremental hub sync, {@link ChatTranscriptBuilder.rebuild} - * always discards prior components and rebuilds the whole transcript from the - * supplied entries. Re-rendering a growing transcript is therefore O(n) in the - * entry count, but it cannot duplicate or misorder rows the way incremental - * component reuse could. + * {@link ChatTranscriptBuilder.append} tails new messages through the same + * per-message path used by full rebuilds. Identity changes still rebuild from + * scratch; append-only refreshes avoid re-rendering old rows. */ import type { AgentMessage, AgentTool } from "@oh-my-pi/pi-agent-core"; import type { Usage } from "@oh-my-pi/pi-ai"; @@ -94,9 +92,25 @@ export class ChatTranscriptBuilder { } /** Discard all components and rebuild the whole transcript from `entries`. */ - rebuild(entries: SessionMessageEntry[]): void { + rebuild(entries: readonly SessionMessageEntry[]): void { this.reset(); - for (const entry of entries) this.#appendChatMessage(entry.message); + this.append(entries); + } + + /** Append persisted session entries to the existing transcript. */ + append(entries: readonly SessionMessageEntry[]): void { + this.appendMessages(entries.map(entry => entry.message)); + } + + /** Discard all components and rebuild the whole transcript from messages. */ + rebuildMessages(messages: readonly AgentMessage[]): void { + this.reset(); + this.appendMessages(messages); + } + + /** Append messages to the existing transcript using the normal render path. */ + appendMessages(messages: readonly AgentMessage[]): void { + for (const message of messages) this.#appendChatMessage(message); // Flush the trailing turn's usage row only once its tools are materialized // (a read whose result has not arrived stays pending); otherwise the row // would sit above its tools. The drain happens here at the end of the pass. diff --git a/packages/coding-agent/src/modes/interactive-mode.ts b/packages/coding-agent/src/modes/interactive-mode.ts index 4533052985..89dc1b3b78 100644 --- a/packages/coding-agent/src/modes/interactive-mode.ts +++ b/packages/coding-agent/src/modes/interactive-mode.ts @@ -1443,7 +1443,7 @@ export class InteractiveMode implements InteractiveModeContext { this.chatContainer.clear(); // Full-history transcript: compactions render as inline dividers instead // of restarting the visible conversation (the LLM context still resets). - const context = this.viewSession.buildTranscriptSessionContext(); + const context = this.viewSession.buildTranscriptSessionContext({ collapseCompactedHistory: true }); this.renderSessionContext(context); // During the pre-streaming window — after `startPendingSubmission` has // optimistically rendered the user's message but before the user diff --git a/packages/coding-agent/src/modes/utils/ui-helpers.ts b/packages/coding-agent/src/modes/utils/ui-helpers.ts index 2be0c90f47..48b8887501 100644 --- a/packages/coding-agent/src/modes/utils/ui-helpers.ts +++ b/packages/coding-agent/src/modes/utils/ui-helpers.ts @@ -519,7 +519,7 @@ export class UiHelpers { // Display always uses the full-history transcript: compactions show as // inline dividers instead of restarting the visible conversation. - const context = this.ctx.viewSession.buildTranscriptSessionContext(); + const context = this.ctx.viewSession.buildTranscriptSessionContext({ collapseCompactedHistory: true }); this.ctx.renderSessionContext(context, { updateFooter: true, populateHistory: !this.ctx.focusedAgentId, diff --git a/packages/coding-agent/src/session/agent-session.ts b/packages/coding-agent/src/session/agent-session.ts index 2b1f21597b..6a5dd39ffa 100644 --- a/packages/coding-agent/src/session/agent-session.ts +++ b/packages/coding-agent/src/session/agent-session.ts @@ -1115,6 +1115,10 @@ function toRestoredQueuedMessage(message: AgentMessage): RestoredQueuedMessage { return { text: queueChipText(message), images: queuedImageContent(message) }; } +export interface TranscriptSessionContextOptions { + collapseCompactedHistory?: boolean; +} + export class AgentSession { readonly agent: Agent; readonly sessionManager: SessionManager; @@ -5233,8 +5237,14 @@ export class AgentSession { * fired (instead of replacing prior history). Display-only — NEVER feed * the result to `agent.replaceMessages` or a provider. */ - buildTranscriptSessionContext(): SessionContext { - return deobfuscateSessionContext(this.sessionManager.buildSessionContext({ transcript: true }), this.#obfuscator); + buildTranscriptSessionContext(options: TranscriptSessionContextOptions = {}): SessionContext { + return deobfuscateSessionContext( + this.sessionManager.buildSessionContext({ + transcript: true, + collapseCompactedHistory: options.collapseCompactedHistory === true, + }), + this.#obfuscator, + ); } #obfuscateTextForProvider(text: string | undefined): string | undefined { diff --git a/packages/coding-agent/src/session/session-context.ts b/packages/coding-agent/src/session/session-context.ts index 2d9fa39322..0b0e99a209 100644 --- a/packages/coding-agent/src/session/session-context.ts +++ b/packages/coding-agent/src/session/session-context.ts @@ -69,6 +69,12 @@ export interface BuildSessionContextOptions { * result to a provider. */ transcript?: boolean; + /** + * Display-only transcript optimization honored only when `transcript: true`: + * collapse history before the latest compaction while keeping display-visible + * turns from `firstKeptEntryId` onward. + */ + collapseCompactedHistory?: boolean; } /** @@ -255,59 +261,73 @@ export function buildSessionContext( } }; + const providerPayloadForCompaction = (entry: CompactionEntry): ProviderPayload | undefined => { + const candidate = entry.preserveData?.openaiRemoteCompaction; + if (!candidate || typeof candidate !== "object") return undefined; + const remote = candidate as { provider?: unknown; replacementHistory?: unknown }; + if (typeof remote.provider !== "string" || remote.provider.length === 0) return undefined; + if (!Array.isArray(remote.replacementHistory)) return undefined; + return { + type: "openaiResponsesHistory", + provider: remote.provider, + items: remote.replacementHistory as Array>, + }; + }; + + const pushCompactionSummary = (entry: CompactionEntry) => { + const snapcompactArchive = snapcompact.getPreservedArchive(entry.preserveData); + pushMessage( + createCompactionSummaryMessage( + entry.summary, + entry.tokensBefore, + entry.timestamp, + entry.shortSummary, + providerPayloadForCompaction(entry), + undefined, + snapcompactArchive ? snapcompact.historyBlocks(snapcompactArchive) : undefined, + ), + ); + }; + if (options?.transcript) { - // Display transcript: every entry in chronological order. Compactions do - // not erase prior history here — each renders inline (as a divider in the - // TUI) at the point it fired, with any snapcompact frames re-attached so - // the component can report them. - for (const entry of path) { - handleEntryResetTracking(entry); - if (entry.type === "compaction") { - const snapcompactArchive = snapcompact.getPreservedArchive(entry.preserveData); - pushMessage( - createCompactionSummaryMessage( - entry.summary, - entry.tokensBefore, - entry.timestamp, - entry.shortSummary, - undefined, - undefined, - snapcompactArchive ? snapcompact.historyBlocks(snapcompactArchive) : undefined, - ), - ); - } else { - appendMessage(entry); + // Display transcript: every entry in chronological order by default. + // Compactions render inline (as dividers in the TUI) instead of erasing + // prior history. Hot live surfaces may request a display-only collapse + // around the latest compaction to avoid rebuilding very old rows. + if (options.collapseCompactedHistory === true && compaction) { + handleEntryResetTracking(compaction); + pushCompactionSummary(compaction); + const compactionIdx = path.findIndex(e => e.type === "compaction" && e.id === compaction.id); + let foundFirstKept = false; + for (let i = 0; i < compactionIdx; i++) { + const entry = path[i]; + if (entry.id === compaction.firstKeptEntryId) { + foundFirstKept = true; + } + if (foundFirstKept) { + appendMessage(entry); + } + } + for (let i = compactionIdx + 1; i < path.length; i++) { + appendMessage(path[i]); + } + } else { + for (const entry of path) { + handleEntryResetTracking(entry); + if (entry.type === "compaction") { + pushCompactionSummary(entry); + } else { + appendMessage(entry); + } } } } else if (compaction) { - const providerPayload: ProviderPayload | undefined = (() => { - const candidate = compaction.preserveData?.openaiRemoteCompaction; - if (!candidate || typeof candidate !== "object") return undefined; - const remote = candidate as { provider?: unknown; replacementHistory?: unknown }; - if (typeof remote.provider !== "string" || remote.provider.length === 0) return undefined; - if (!Array.isArray(remote.replacementHistory)) return undefined; - return { - type: "openaiResponsesHistory", - provider: remote.provider, - items: remote.replacementHistory as Array>, - }; - })(); + const providerPayload = providerPayloadForCompaction(compaction); const remoteReplacementHistory = providerPayload?.items; // Emit summary first; re-attach any archived snapcompact frames so the // model can keep reading the archived history after every context rebuild. - const snapcompactArchive = snapcompact.getPreservedArchive(compaction.preserveData); - pushMessage( - createCompactionSummaryMessage( - compaction.summary, - compaction.tokensBefore, - compaction.timestamp, - compaction.shortSummary, - providerPayload, - undefined, - snapcompactArchive ? snapcompact.historyBlocks(snapcompactArchive) : undefined, - ), - ); + pushCompactionSummary(compaction); // Find compaction index in path const compactionIdx = path.findIndex(e => e.type === "compaction" && e.id === compaction.id); diff --git a/packages/coding-agent/src/session/session-storage.ts b/packages/coding-agent/src/session/session-storage.ts index 557fc20a8e..6507705517 100644 --- a/packages/coding-agent/src/session/session-storage.ts +++ b/packages/coding-agent/src/session/session-storage.ts @@ -137,8 +137,97 @@ export class FileSessionStorage implements SessionStorage { } writeTextSync(fpath: string, content: string): void { - this.ensureDirSync(path.dirname(fpath)); - fs.writeFileSync(fpath, content); + const dir = path.dirname(fpath); + this.ensureDirSync(dir); + const tempPath = path.join(dir, `.${path.basename(fpath)}.${Snowflake.next()}.tmp`); + try { + fs.writeFileSync(tempPath, content); + try { + fs.renameSync(tempPath, fpath); + } catch (err) { + if (hasFsCode(err, "EPERM")) { + this.#replaceSessionFileAfterEpermSync(tempPath, fpath, err); + return; + } + try { + fs.rmSync(tempPath, { force: true }); + } catch (cleanupErr) { + if (!isEnoent(cleanupErr)) { + logger.warn("Failed to remove session rewrite temp file", { + sessionFile: fpath, + tempPath, + error: toError(cleanupErr).message, + }); + } + } + throw toError(err); + } + } catch (err) { + try { + fs.rmSync(tempPath, { force: true }); + } catch (cleanupErr) { + if (!isEnoent(cleanupErr)) { + logger.warn("Failed to remove session rewrite temp file", { + sessionFile: fpath, + tempPath, + error: toError(cleanupErr).message, + }); + } + } + throw toError(err); + } + } + + #replaceSessionFileAfterEpermSync(tempPath: string, targetPath: string, renameError: unknown): void { + const dir = path.dirname(targetPath); + const backupPath = path.join(dir, `${path.basename(targetPath)}.${Snowflake.next()}.bak`); + try { + fs.renameSync(targetPath, backupPath); + } catch (moveAsideError) { + if (isEnoent(moveAsideError)) { + fs.renameSync(tempPath, targetPath); + return; + } + try { + fs.rmSync(tempPath, { force: true }); + } catch (cleanupErr) { + if (!isEnoent(cleanupErr)) { + logger.warn("Failed to remove session rewrite temp file", { + sessionFile: targetPath, + tempPath, + error: toError(cleanupErr).message, + }); + } + } + throw toError(renameError); + } + try { + fs.renameSync(tempPath, targetPath); + } catch (replaceError) { + try { + fs.renameSync(backupPath, targetPath); + } catch (rollbackErr) { + const rollbackError = toError(rollbackErr); + throw new Error( + `Failed to replace session file after EPERM (original: ${toError(renameError).message}; retry: ${ + toError(replaceError).message + }; rollback: ${rollbackError.message})`, + { cause: toError(renameError) }, + ); + } + throw toError(replaceError); + } + try { + fs.rmSync(backupPath, { force: true }); + } catch (err) { + if (!isEnoent(err)) { + logger.warn("Failed to remove session rewrite backup", { + sessionFile: targetPath, + backupPath, + error: toError(err).message, + }); + } + } } statSync(path: string): SessionStorageStat { diff --git a/packages/coding-agent/test/agent-hub-advisor-scroll.test.ts b/packages/coding-agent/test/agent-hub-advisor-scroll.test.ts index 2aea8989ad..438935d320 100644 --- a/packages/coding-agent/test/agent-hub-advisor-scroll.test.ts +++ b/packages/coding-agent/test/agent-hub-advisor-scroll.test.ts @@ -11,58 +11,93 @@ import * as fs from "node:fs"; import * as os from "node:os"; import * as path from "node:path"; import { resetSettingsForTest, Settings } from "@oh-my-pi/pi-coding-agent/config/settings"; +import type { AgentHubRemote } from "@oh-my-pi/pi-coding-agent/modes/components/agent-hub"; import { AgentTranscriptViewer } from "@oh-my-pi/pi-coding-agent/modes/components/agent-transcript-viewer"; import { initTheme } from "@oh-my-pi/pi-coding-agent/modes/theme/theme"; import { AgentRegistry } from "@oh-my-pi/pi-coding-agent/registry/agent-registry"; import { CURRENT_SESSION_VERSION } from "@oh-my-pi/pi-coding-agent/session/session-entries"; const TS = new Date().toISOString(); +const usage = { + input: 1, + output: 1, + cacheRead: 0, + cacheWrite: 0, + totalTokens: 2, + cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, +}; + +function sessionHeader(id = "adv"): string { + return JSON.stringify({ type: "session", version: CURRENT_SESSION_VERSION, id, timestamp: TS, cwd: "/tmp" }); +} + +function userLine(id: string, text: string, parentId: string | null = null): string { + return JSON.stringify({ + type: "message", + id, + parentId, + timestamp: TS, + message: { role: "user", synthetic: true, attribution: "agent", content: text, timestamp: 0 }, + }); +} + +function assistantLine(id: string, text: string, parentId: string | null = null): string { + return JSON.stringify({ + type: "message", + id, + parentId, + timestamp: TS, + message: { + role: "assistant", + content: [{ type: "text", text }], + api: "anthropic-messages", + provider: "anthropic", + model: "gpt-5.5", + usage, + stopReason: "stop", + timestamp: 1, + }, + }); +} + +function jsonl(lines: string[]): string { + return `${lines.join("\n")}\n`; +} + +function renderedBody(viewer: AgentTranscriptViewer): string { + return viewer + .render(80) + .map(l => Bun.stripANSI(l)) + .join("\n"); +} + +function countOccurrences(text: string, marker: string): number { + return text.split(marker).length - 1; +} + +async function waitForBody(viewer: AgentTranscriptViewer, predicate: (body: string) => boolean): Promise { + const deadline = Date.now() + 5000; + let body = renderedBody(viewer); + while (!predicate(body) && Date.now() < deadline) { + await Bun.sleep(50); + body = renderedBody(viewer); + } + return body; +} function buildJsonl(): string { - const usage = { - input: 1, - output: 1, - cacheRead: 0, - cacheWrite: 0, - totalTokens: 2, - cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 }, - }; - const lines = [ - JSON.stringify({ type: "session", version: CURRENT_SESSION_VERSION, id: "adv", timestamp: TS, cwd: "/tmp" }), - ]; - lines.push( - JSON.stringify({ - type: "message", - id: "u0", - parentId: null, - timestamp: TS, - message: { role: "user", synthetic: true, attribution: "agent", content: "PROMPTMARKER", timestamp: 0 }, - }), - ); + const lines = [sessionHeader()]; + let parentId = "u0"; + lines.push(userLine(parentId, "PROMPTMARKER")); for (let i = 0; i < 40; i++) { - lines.push( - JSON.stringify({ - type: "message", - id: `a${i}`, - parentId: null, - timestamp: TS, - message: { - role: "assistant", - content: [{ type: "text", text: `Reviewing step ${i}.` }], - api: "anthropic-messages", - provider: "anthropic", - model: "gpt-5.5", - usage, - stopReason: "stop", - timestamp: i, - }, - }), - ); + const id = `a${i}`; + lines.push(assistantLine(id, `Reviewing step ${i}.`, parentId)); + parentId = id; } - return `${lines.join("\n")}\n`; + return jsonl(lines); } -function makeViewer(file: string) { +function makeViewer(file: string | null, remote?: AgentHubRemote, requestRender: () => void = () => {}) { const agents = new AgentRegistry(); agents.register({ id: "Main/advisor", @@ -76,11 +111,12 @@ function makeViewer(file: string) { return new AgentTranscriptViewer({ agentId: "Main/advisor", registry: agents, + remote, ui: { requestRender: () => {}, requestComponentRender: () => {} } as never, cwd: "/tmp", expandKeys: ["ctrl+o"], hubKeys: ["ctrl+s"], - requestRender: () => {}, + requestRender, onClose: () => {}, onHubClose: () => {}, }); @@ -153,6 +189,112 @@ describe("AgentTranscriptViewer", () => { }); }); + it("tails complete local appends without duplicating old transcript rows", async () => { + const dir = fs.mkdtempSync(path.join(os.tmpdir(), "adv-view-")); + const file = path.join(dir, "__advisor.jsonl"); + fs.writeFileSync(file, jsonl([sessionHeader(), userLine("old", "LOCAL-OLD")])); + const viewer = makeViewer(file); + try { + viewer.render(80); + fs.appendFileSync(file, `${userLine("new", "LOCAL-NEW", "old")}\n`); + const body = await waitForBody(viewer, text => text.includes("LOCAL-NEW")); + expect(body).toContain("LOCAL-NEW"); + expect(countOccurrences(body, "LOCAL-OLD")).toBe(1); + } finally { + viewer.dispose(); + fs.rmSync(dir, { recursive: true, force: true }); + } + }); + + it("buffers partial trailing local JSONL until the newline arrives", async () => { + const dir = fs.mkdtempSync(path.join(os.tmpdir(), "adv-view-")); + const file = path.join(dir, "__advisor.jsonl"); + fs.writeFileSync(file, jsonl([sessionHeader(), userLine("old", "LOCAL-OLD")])); + const viewer = makeViewer(file); + const partial = userLine("partial", "LOCAL-PARTIAL", "old"); + try { + viewer.render(80); + fs.appendFileSync(file, partial.slice(0, -2)); + const withoutPartial = await waitForBody(viewer, text => text.includes("LOCAL-OLD")); + expect(withoutPartial).not.toContain("LOCAL-PARTIAL"); + + fs.appendFileSync(file, `${partial.slice(-2)}\n`); + const body = await waitForBody(viewer, text => text.includes("LOCAL-PARTIAL")); + expect(countOccurrences(body, "LOCAL-PARTIAL")).toBe(1); + } finally { + viewer.dispose(); + fs.rmSync(dir, { recursive: true, force: true }); + } + }); + + it("fully rebuilds local content when the transcript file is replaced", async () => { + const dir = fs.mkdtempSync(path.join(os.tmpdir(), "adv-view-")); + const file = path.join(dir, "__advisor.jsonl"); + fs.writeFileSync(file, jsonl([sessionHeader(), userLine("old", "LOCAL-OLD")])); + const viewer = makeViewer(file); + try { + viewer.render(80); + const replacement = path.join(dir, "replacement.jsonl"); + fs.writeFileSync(replacement, jsonl([sessionHeader(), userLine("replaced", "LOCAL-REPLACED")])); + fs.renameSync(replacement, file); + const body = await waitForBody(viewer, text => text.includes("LOCAL-REPLACED") && !text.includes("LOCAL-OLD")); + expect(body).toContain("LOCAL-REPLACED"); + expect(body).not.toContain("LOCAL-OLD"); + } finally { + viewer.dispose(); + fs.rmSync(dir, { recursive: true, force: true }); + } + }); + + it("clears remote placeholders, appends later messages, and drops stale rows after rotation", async () => { + const firstHeader = jsonl([sessionHeader("remote")]); + const laterMessage = userLine("remote-message", "REMOTE-LATER"); + const rotatedHeader = jsonl([sessionHeader("remote-rotated")]); + const laterChunk = `${laterMessage}\n`; + const responses = [ + { text: firstHeader, newSize: Buffer.byteLength(firstHeader) }, + { text: laterChunk, newSize: Buffer.byteLength(firstHeader) + Buffer.byteLength(laterChunk) }, + { text: "", newSize: 0 }, + { text: rotatedHeader, newSize: Buffer.byteLength(rotatedHeader) }, + ]; + const gates = [Promise.withResolvers(), Promise.withResolvers(), Promise.withResolvers()]; + let calls = 0; + const remote: AgentHubRemote = { + chat: () => {}, + kill: () => {}, + revive: () => {}, + readTranscript: async () => { + const index = calls++; + if (index > 0) await gates[index - 1]?.promise; + return responses[index] ?? { text: "", newSize: Buffer.byteLength(rotatedHeader) }; + }, + }; + let renderRequests = 0; + const viewer = makeViewer(null, remote, () => { + renderRequests++; + }); + try { + const emptyBody = await waitForBody(viewer, text => text.includes("No messages yet.")); + expect(emptyBody).toContain("No messages yet."); + expect(emptyBody).not.toContain("Loading transcript from host"); + expect(renderRequests).toBeGreaterThan(0); + gates[0].resolve(); + + const messageBody = await waitForBody(viewer, text => text.includes("REMOTE-LATER")); + expect(messageBody).toContain("REMOTE-LATER"); + gates[1].resolve(); + gates[2].resolve(); + + const rotatedBody = await waitForBody( + viewer, + text => !text.includes("REMOTE-LATER") && text.includes("No messages yet."), + ); + expect(rotatedBody).not.toContain("REMOTE-LATER"); + expect(rotatedBody).toContain("No messages yet."); + } finally { + viewer.dispose(); + } + }, 10000); it("clears stale content when the transcript file is deleted while open", async () => { const dir = fs.mkdtempSync(path.join(os.tmpdir(), "adv-view-")); const file = path.join(dir, "__advisor.jsonl"); diff --git a/packages/coding-agent/test/modes/utils/render-initial-messages.test.ts b/packages/coding-agent/test/modes/utils/render-initial-messages.test.ts index 63c97b1f23..2306940f98 100644 --- a/packages/coding-agent/test/modes/utils/render-initial-messages.test.ts +++ b/packages/coding-agent/test/modes/utils/render-initial-messages.test.ts @@ -52,7 +52,7 @@ function makeEmptyContext(): SessionContext { /** Build a minimal InteractiveModeContext mock, returning spies for assertions. */ function makeCtx(): { ctx: InteractiveModeContext; - transcriptSpy: Mock<() => SessionContext>; + transcriptSpy: Mock<(options?: { collapseCompactedHistory?: boolean }) => SessionContext>; llmContextSpy: Mock<() => SessionContext>; renderSessionContextSpy: Mock<(...args: unknown[]) => void>; } { @@ -190,6 +190,7 @@ describe("UiHelpers.renderInitialMessages — transcript source", () => { new UiHelpers(ctx).renderInitialMessages(); expect(transcriptSpy).toHaveBeenCalledTimes(1); + expect(transcriptSpy).toHaveBeenCalledWith({ collapseCompactedHistory: true }); expect(llmContextSpy).not.toHaveBeenCalled(); expect(renderSessionContextSpy).toHaveBeenCalledWith(transcript, { updateFooter: true, diff --git a/packages/coding-agent/test/session-manager/build-context.test.ts b/packages/coding-agent/test/session-manager/build-context.test.ts index bc3d94ffbb..2bc37d4fa0 100644 --- a/packages/coding-agent/test/session-manager/build-context.test.ts +++ b/packages/coding-agent/test/session-manager/build-context.test.ts @@ -181,6 +181,87 @@ describe("buildSessionContext", () => { expect((ctx.messages[4] as any).content[0].text).toBe("response3"); }); + it("collapses display transcript history to latest compaction while full transcript keeps inline history", () => { + const entries: SessionEntry[] = [ + msg("1", null, "user", "first"), + msg("2", "1", "assistant", "response1"), + msg("3", "2", "user", "second"), + msg("4", "3", "assistant", "response2"), + compaction("5", "4", "Summary of first two turns", "3"), + msg("6", "5", "user", "third"), + msg("7", "6", "assistant", "response3"), + ]; + + const collapsed = buildSessionContext(entries, undefined, undefined, { + transcript: true, + collapseCompactedHistory: true, + }); + expect(collapsed.messages.map(message => message.role)).toEqual([ + "compactionSummary", + "user", + "assistant", + "user", + "assistant", + ]); + expect((collapsed.messages[0] as any).summary).toContain("Summary of first two turns"); + expect((collapsed.messages[1] as any).content).toBe("second"); + expect((collapsed.messages[2] as any).content[0].text).toBe("response2"); + expect((collapsed.messages[3] as any).content).toBe("third"); + expect((collapsed.messages[4] as any).content[0].text).toBe("response3"); + + const fullTranscript = buildSessionContext(entries, undefined, undefined, { transcript: true }); + expect(fullTranscript.messages.map(message => message.role)).toEqual([ + "user", + "assistant", + "user", + "assistant", + "compactionSummary", + "user", + "assistant", + ]); + }); + + it("collapses OpenAI remote display transcripts with raw kept turns and preserved provider payload", () => { + const remoteCompaction: CompactionEntry = { + ...compaction("3", "2", "Remote summary", "1"), + preserveData: { + openaiRemoteCompaction: { + provider: "openai", + replacementHistory: [ + { type: "message", role: "user", content: [{ type: "input_text", text: "Preserved user" }] }, + { type: "compaction", encrypted_content: "enc_123" }, + ], + compactionItem: { type: "compaction", encrypted_content: "enc_123" }, + }, + }, + }; + const entries: SessionEntry[] = [ + msg("1", null, "user", "first"), + msg("2", "1", "assistant", "response"), + remoteCompaction, + msg("4", "3", "user", "after compact"), + ]; + + const ctx = buildSessionContext(entries, undefined, undefined, { + transcript: true, + collapseCompactedHistory: true, + }); + expect(ctx.messages.map(message => message.role)).toEqual(["compactionSummary", "user", "assistant", "user"]); + expect(ctx.messages[0]?.role).toBe("compactionSummary"); + if (ctx.messages[0]?.role !== "compactionSummary") throw new Error("Expected compaction summary message"); + expect(ctx.messages[0].providerPayload).toEqual({ + type: "openaiResponsesHistory", + provider: "openai", + items: [ + { type: "message", role: "user", content: [{ type: "input_text", text: "Preserved user" }] }, + { type: "compaction", encrypted_content: "enc_123" }, + ], + }); + expect((ctx.messages[1] as { content: string }).content).toBe("first"); + expect((ctx.messages[2] as { content: Array<{ text: string }> }).content[0]?.text).toBe("response"); + expect((ctx.messages[3] as { content: string }).content).toBe("after compact"); + }); + it("handles compaction keeping from first message", () => { const entries: SessionEntry[] = [ msg("1", null, "user", "first"), diff --git a/packages/coding-agent/test/session-storage.test.ts b/packages/coding-agent/test/session-storage.test.ts index 5ac043d035..ade5c0b99b 100644 --- a/packages/coding-agent/test/session-storage.test.ts +++ b/packages/coding-agent/test/session-storage.test.ts @@ -3,14 +3,14 @@ import * as fs from "node:fs"; import * as fsp from "node:fs/promises"; import * as os from "node:os"; import * as path from "node:path"; +import { FileSessionStorage } from "@oh-my-pi/pi-coding-agent/session/session-storage"; describe("FileSessionStorage.deleteSessionWithArtifacts", () => { let tempDir: string; - let storage: { deleteSessionWithArtifacts(sessionPath: string): Promise }; + let storage: FileSessionStorage; beforeEach(async () => { tempDir = await fsp.mkdtemp(path.join(os.tmpdir(), "omp-session-storage-")); - const { FileSessionStorage } = await import("@oh-my-pi/pi-coding-agent/session/session-storage"); storage = new FileSessionStorage(); }); @@ -56,4 +56,18 @@ describe("FileSessionStorage.deleteSessionWithArtifacts", () => { expect(fs.existsSync(sessionPath)).toBe(false); expect(fs.existsSync(artifactsDir)).toBe(true); }); + + it("replaces synchronous writes with a new file identity on POSIX", async () => { + const sessionPath = path.join(tempDir, "rewrite.jsonl"); + storage.writeTextSync(sessionPath, "first\n"); + const before = await fsp.stat(sessionPath); + + storage.writeTextSync(sessionPath, "second\n"); + const after = await fsp.stat(sessionPath); + + expect(await Bun.file(sessionPath).text()).toBe("second\n"); + if (process.platform !== "win32") { + expect(`${after.dev}:${after.ino}`).not.toBe(`${before.dev}:${before.ino}`); + } + }); });