Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/spicy-sessions-pin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@browserbasehq/stagehand-server-v3": patch
---

Fix long-running requests (e.g. `agentExecute`) sometimes failing with "Stagehand session was closed" even though the action had already completed.
72 changes: 56 additions & 16 deletions packages/server-v3/src/lib/InMemorySessionStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ interface LruNode {
stagehand: V3 | null;
loggerRef: { current?: (message: LogLine) => void };
expiry: number;
/** Number of in-flight requests using this session's V3 instance. */
inUse: number;
prev: LruNode | null;
next: LruNode | null;
}
Expand Down Expand Up @@ -91,7 +93,8 @@ export class InMemorySessionStore implements SessionStore {
const expiredIds: string[] = [];

for (const [sessionId, node] of this.items.entries()) {
if (this.ttlMs > 0 && node.expiry <= now) {
// Never expire a session that is actively serving a request.
if (this.ttlMs > 0 && node.expiry <= now && node.inUse === 0) {
expiredIds.push(sessionId);
}
}
Expand Down Expand Up @@ -129,13 +132,18 @@ export class InMemorySessionStore implements SessionStore {
}

/**
* Evict the least recently used session
* Evict the least recently used session that is not actively serving a
* request. If every cached session is in use, skip eviction rather than
* tear down a live request's browser context.
*/
private async evictLru(): Promise<void> {
const lruNode = this.first;
if (!lruNode) return;
let node = this.first;
while (node && node.inUse > 0) {
node = node.next;
}
if (!node) return;

await this.deleteSession(lruNode.sessionId);
await this.deleteSession(node.sessionId);
}

async startSession(params: CreateSessionParams): Promise<SessionStartResult> {
Expand All @@ -160,8 +168,8 @@ export class InMemorySessionStore implements SessionStore {
const node = this.items.get(sessionId);
if (!node) return false;

// Check if expired
if (this.ttlMs > 0 && node.expiry <= Date.now()) {
// Check if expired, but don't expire a session that is actively serving a request
if (this.ttlMs > 0 && node.expiry <= Date.now() && node.inUse === 0) {
await this.deleteSession(sessionId);
return false;
}
Expand All @@ -180,7 +188,7 @@ export class InMemorySessionStore implements SessionStore {
}

// Check if expired
if (this.ttlMs > 0 && node.expiry <= Date.now()) {
if (this.ttlMs > 0 && node.expiry <= Date.now() && node.inUse === 0) {
await this.deleteSession(sessionId);
throw new Error(`Session expired: ${sessionId}`);
}
Expand All @@ -193,6 +201,9 @@ export class InMemorySessionStore implements SessionStore {
node.loggerRef.current = ctx.logger;
}

// Pin before any await so the node can't be evicted or TTL-expired during a lazy init()
node.inUse += 1;

// If V3 instance exists, return it
if (node.stagehand) {
return node.stagehand;
Expand All @@ -204,6 +215,9 @@ export class InMemorySessionStore implements SessionStore {
try {
await stagehand.init();
} catch (error) {
// Undo the pin taken above; the node stays (stagehand still null) so a
// later request can retry init.
node.inUse -= 1;
try {
await stagehand.close();
} catch {
Expand All @@ -215,6 +229,21 @@ export class InMemorySessionStore implements SessionStore {
return stagehand;
}

async releaseSession(sessionId: string): Promise<void> {
const node = this.items.get(sessionId);
if (!node) return;

// Ignore unmatched/double releases: never go negative, and don't refresh
// the TTL of an already-idle session
if (node.inUse === 0) return;

node.inUse -= 1;

if (this.ttlMs > 0) {
node.expiry = Date.now() + this.ttlMs;
}
}

/**
* Build V3Options from stored params and request context
*/
Expand Down Expand Up @@ -286,6 +315,7 @@ export class InMemorySessionStore implements SessionStore {
stagehand: null, // Lazy initialization
loggerRef: {},
expiry: this.ttlMs > 0 ? Date.now() + this.ttlMs : Infinity,
inUse: 0,
prev: this.last,
next: null,
};
Expand Down Expand Up @@ -342,16 +372,26 @@ export class InMemorySessionStore implements SessionStore {
throw new Error("Max capacity must be greater than 0");
}
const previousCapacity = this.maxCapacity;
this.maxCapacity = config.maxCapacity;

// Evict excess if new capacity is smaller
if (this.maxCapacity < previousCapacity) {
const excess = this.items.size - this.maxCapacity;
for (let i = 0; i < excess; i++) {
// Fire and forget - don't await to match cloud behavior
this.evictLru().catch(console.error);
}
// Evict before lowering capacity. Pinned (in-use) sessions can't be
// evicted, so the cache may briefly exceed the new capacity and converge
// as those requests finish — that's expected, not an error.
if (config.maxCapacity < previousCapacity) {
const excess = this.items.size - config.maxCapacity;
// Evict sequentially: deleteSession removes the node only after awaiting
// close, so firing these concurrently would make every call target the
// same LRU node. The batch stays fire-and-forget to match cloud behavior.
void (async () => {
for (let i = 0; i < excess; i++) {
try {
await this.evictLru();
} catch (err) {
console.error(err);
}
}
})();
}
this.maxCapacity = config.maxCapacity;
}

if (config.ttlMs !== undefined) {
Expand Down
19 changes: 19 additions & 0 deletions packages/server-v3/src/lib/SessionStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,32 @@ export interface SessionStore {
* - On cache miss: loading config, creating V3, caching it
* - Updating the logger reference for streaming
*
* Acquire/release contract: this call PINS the session. Implementations MUST
* NOT evict or TTL-expire a session between a getOrCreateStagehand and its
* matching releaseSession — otherwise a long-running request (e.g. an
* agentExecute that spends tens of seconds in agent "think time" with no CDP
* traffic) can have its browser context torn out mid-flight. Always use this
* via the withSession() wrapper, which guarantees the matching release.
*
* @param sessionId - The session identifier
* @param ctx - Request-time context containing values from headers
* @returns The V3 instance ready for use
* @throws Error if session not found
*/
getOrCreateStagehand(sessionId: string, ctx: RequestContext): Promise<V3>;

/**
* Release a session previously pinned by getOrCreateStagehand.
*
* Decrements the in-use count so the session becomes eligible for eviction/
* TTL again once no requests hold it. MUST clamp at zero (a release without a
* matching acquire is a no-op, never a negative count). Prefer calling this
* via withSession() rather than directly.
*
* @param sessionId - The session identifier
*/
releaseSession(sessionId: string): void | Promise<void>;

/**
* Create a new session with the given parameters.
* Lower-level than startSession - just stores the config.
Expand Down
37 changes: 9 additions & 28 deletions packages/server-v3/src/lib/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
import { error, success } from "./response.js";
import { getSessionStore } from "./sessionStoreManager.js";
import type { RequestContext } from "./SessionStore.js";
import { withSession } from "./withSession.js";

interface StreamingResponseOptions<TV3> {
sessionId: string;
Expand Down Expand Up @@ -180,38 +181,18 @@ export async function createStreamingResponse<TV3>({
: undefined,
};

let stagehand: V3Stagehand;
try {
stagehand = (await sessionStore.getOrCreateStagehand(
sessionId,
requestContext,
)) as V3Stagehand;
} catch (err) {
const loadError = err instanceof Error ? err : new Error(String(err));

sendData("error", "system", { status: "error", error: loadError.message });

if (shouldStreamResponse) {
reply.raw.end();
return reply;
}

return error(
reply,
loadError.message,
loadError instanceof AppError
? loadError.statusCode
: StatusCodes.INTERNAL_SERVER_ERROR,
);
}

sendData("connected", "system", { status: "connected" });

let result: Awaited<ReturnType<typeof handler>> | null = null;
let handlerError: Error | null = null;

// withSession pins the session for the whole handler so it can't be evicted
// or TTL-expired mid-request, and releases once the handler settles. Acquire
// failures (session not found/expired) surface here too and are rendered as
// an error response below, same as handler failures.
try {
result = await handler({ stagehand, data: parsedData });
result = await withSession(sessionId, requestContext, (stagehand) => {
sendData("connected", "system", { status: "connected" });
return handler({ stagehand, data: parsedData });
});
} catch (err) {
handlerError = err instanceof Error ? err : new Error("Unknown error");
request.log.error(
Expand Down
54 changes: 54 additions & 0 deletions packages/server-v3/src/lib/withSession.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import type { Stagehand as V3Stagehand } from "@browserbasehq/stagehand";

import { getSessionStore } from "./sessionStoreManager.js";
import type { RequestContext } from "./SessionStore.js";

/**
* Acquire a session's V3 instance, run `fn`, and release exactly once when
* `fn` settles.
*
* The session is pinned — excluded from LRU eviction and TTL expiry — for the
* full duration of `fn`, including agent "think time" when no CDP traffic
* flows. Release happens only in the `finally`, i.e. strictly AFTER `fn`
* settles, so the session can never be evicted while the handler is still
* using its Stagehand instance.
*
* Note: we intentionally do NOT release on client disconnect. If the client
* goes away, the handler keeps running server-side (and may still be driving
* the browser — e.g. completing a payment); releasing then would let the
* session be evicted mid-operation, the exact bug this pinning prevents. The
* handler is bounded by its own step/timeout limits, so the `finally` always
* runs and the pin is released when the work actually finishes.
*
* This is the only supported way to use a session's V3 instance for a request:
* callers must never hold a stagehand reference past the end of `fn`.
*
* Acquire failures propagate to the caller before any pin is taken.
*/
export async function withSession<T>(
sessionId: string,
ctx: RequestContext,
fn: (stagehand: V3Stagehand) => Promise<T>,
): Promise<T> {
const sessionStore = getSessionStore();
const stagehand = (await sessionStore.getOrCreateStagehand(
sessionId,
ctx,
)) as V3Stagehand;

try {
return await fn(stagehand);
} finally {
try {
await sessionStore.releaseSession(sessionId);
} catch (err) {
// A failed release leaves the session pinned (inUse not decremented),
// which leaks capacity. Don't rethrow (that would clobber the handler's
// result/error in a finally) — record it so the leak is detectable.
console.error(
`Failed to release session ${sessionId}; it may remain pinned:`,
err,
);
}
}
}
5 changes: 3 additions & 2 deletions packages/server-v3/src/routes/v1/sessions/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
} from "../../../lib/header.js";
import { error, success } from "../../../lib/response.js";
import { getSessionStore } from "../../../lib/sessionStoreManager.js";
import { withSession } from "../../../lib/withSession.js";
import { AISDK_PROVIDERS } from "../../../types/model.js";

// Extended schema with custom refinement for local browser validation
Expand Down Expand Up @@ -234,11 +235,11 @@ const startRouteHandler: RouteHandler = withErrorHandling(
let finalCdpUrl = connectUrl ?? session.cdpUrl ?? "";
if (localBrowserLaunchOptions) {
try {
const stagehand = await sessionStore.getOrCreateStagehand(
finalCdpUrl = await withSession(
session.sessionId,
{ modelApiKey: localBrowserModelApiKey },
(stagehand) => Promise.resolve(stagehand.connectURL()),
);
finalCdpUrl = stagehand.connectURL();
} catch (err) {
request.log.error(
{
Expand Down
Loading
Loading