Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/valkey-cache-backend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@browserbasehq/stagehand": minor
---

Add Valkey as an optional cache backend via iovalkey. Configure with `valkeyHost` (and optional `valkeyPort`, `valkeyTls`, `valkeyPassword`, `valkeyUsername`, `cacheTtl`, `valkeyKeyPrefix`, `valkeyRequestTimeout`, `valkeyMaxCacheValueBytes`) to store act/agent cache entries in Valkey instead of the local filesystem. Gracefully falls back to disabled caching if the connection fails.
9 changes: 9 additions & 0 deletions packages/core/lib/v3/cache/AgentCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ export class AgentCache {
path,
} = await this.storage.readJson<CachedAgentEntry>(
`agent-${context.cacheKey}.json`,
// NOTE: The `category` parameter provides act/agent namespace isolation only for
// the Valkey backend (via key prefix). File and in-memory backends ignore it and
// store entries at `agent-${cacheKey}.json` — collisions don't occur in practice
// because act and agent hash different payloads, producing distinct SHA-256 keys,
// but the `agent-` prefix provides an additional layer of separation on disk.
"agent",
);
if (error && path) {
this.logger({
Expand Down Expand Up @@ -365,6 +371,7 @@ export class AgentCache {
const { error, path } = await this.storage.writeJson(
`agent-${context.cacheKey}.json`,
entry,
"agent",
);
if (error && path) {
this.logger({
Expand Down Expand Up @@ -415,6 +422,7 @@ export class AgentCache {
const { error, path } = await this.storage.writeJson(
`agent-${payload.cacheKey}.json`,
entry,
"agent",
);
if (error && path) {
this.logger({
Expand Down Expand Up @@ -880,6 +888,7 @@ export class AgentCache {
const { error, path } = await this.storage.writeJson(
`agent-${context.cacheKey}.json`,
updatedEntry,
"agent",
);
if (error && path) {
this.logger({
Expand Down
278 changes: 274 additions & 4 deletions packages/core/lib/v3/cache/CacheStorage.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import fs from "fs";
import path from "path";
import type { Logger } from "../types/public/index.js";
import { ReadJsonResult, WriteJsonResult } from "../types/private/index.js";
import {
CacheCategory,
ReadJsonResult,
WriteJsonResult,
} from "../types/private/index.js";

const jsonClone = <T>(value: T): T => {
const serialized = JSON.stringify(value);
Expand All @@ -11,11 +15,76 @@ const jsonClone = <T>(value: T): T => {
return JSON.parse(serialized) as T;
};

/**
* Configuration for the Valkey cache backend.
*/
export interface ValkeyCacheOptions {
/** Valkey host address. */
host: string;
/** Valkey port (default: 6379). */
port?: number;
/** Enable TLS for the connection. */
useTls?: boolean;
/** Authentication password (IAM token or static auth token). */
password?: string;
/** Authentication username (for ACL-enabled instances). */
username?: string;
/** Default TTL in seconds for cache entries. Omit for no expiry. */
cacheTtl?: number;
/** Key prefix namespace (default: "stagehand"). */
keyPrefix?: string;
/** Request timeout in ms (default: 5000). */
requestTimeout?: number;
/** Max allowed cache value size in bytes (default: 5MB). Writes exceeding this are skipped. */
maxCacheValueBytes?: number;
}

/**
* Options shape for ValkeyClientLike.set(), matching GLIDE's expiry API.
*/
interface ValkeySetOptions {
expiry?: { type: "EX" | "PX" | "EXAT" | "PXAT"; count: number };
}

/**
* Minimal interface matching the subset of Valkey client methods used by
* CacheStorage. This avoids a hard compile-time dependency on iovalkey
* for users who don't need the Valkey backend.
*/
interface ValkeyClientLike {
get(key: string): Promise<string | null>;
set(
key: string,
value: string,
options?: ValkeySetOptions,
): Promise<string | null>;
del(keys: string[]): Promise<number>;
close(): Promise<void>;
}

/**
* Minimal type shape for the dynamically imported iovalkey module.
*/
interface IovalkeyModule {
default: new (options: Record<string, unknown>) => IovalkeyClient;
}

interface IovalkeyClient {
connect(): Promise<void>;
get(key: string): Promise<string | null>;
set(key: string, value: string, ...args: unknown[]): Promise<string | null>;
del(...keys: string[]): Promise<number>;
quit(): Promise<string>;
disconnect(): void;
}

export class CacheStorage {
private constructor(
private readonly logger: Logger,
private readonly dir?: string,
private readonly memoryStore?: Map<string, unknown>,
private readonly valkeyClient?: ValkeyClientLike,
private readonly valkeyOptions?: ValkeyCacheOptions,
) {}

static create(
Expand Down Expand Up @@ -49,20 +118,179 @@ export class CacheStorage {
return new CacheStorage(logger, undefined, new Map());
}

/**
* Create a CacheStorage backed by Valkey via iovalkey.
* Requires `iovalkey` to be installed as an optional dependency.
* Returns a disabled CacheStorage if the connection fails.
*/
static async createValkey(
options: ValkeyCacheOptions,
logger: Logger,
): Promise<CacheStorage> {
try {
const mod = (await import(
/* webpackIgnore: true */ /* @vite-ignore */ "iovalkey"
)) as unknown as IovalkeyModule;
const Valkey = mod.default;

if (options.username && !options.password) {
throw new Error(
"Valkey cache: username was provided without a password. " +
"Supply both username and password, or omit both.",
);
}

// Default TLS on when credentials are present to avoid plaintext transit.
const useTLS = options.useTls ?? !!options.password;
const port = options.port ?? 6379;

const iovalkeyOpts: Record<string, unknown> = {
host: options.host,
port,
...(options.password ? { password: options.password } : {}),
...(options.username ? { username: options.username } : {}),
...(useTLS ? { tls: {} } : {}),
commandTimeout: options.requestTimeout ?? 5000,
maxRetriesPerRequest: 3,
retryStrategy: (times: number): number | null =>
times > 5 ? null : Math.min(times * 500, 5000),
connectionName: "stagehand-cache",
lazyConnect: true,
};

const rawClient = new Valkey(iovalkeyOpts);
await rawClient.connect();

// Adapt iovalkey's API to ValkeyClientLike
const client: ValkeyClientLike = {
get: (key) => rawClient.get(key),
set: (key, value, setOpts?) => {
if (setOpts?.expiry) {
return rawClient.set(
key,
value,
setOpts.expiry.type,
setOpts.expiry.count,
);
}
return rawClient.set(key, value);
},
del: (keys) =>
keys.length > 0 ? rawClient.del(...keys) : Promise.resolve(0),
close: (): Promise<void> => rawClient.quit().then((): void => {}),
};

logger({
category: "cache",
message: `valkey cache connected to ${options.host}:${port}`,
level: 1,
});

return new CacheStorage(logger, undefined, undefined, client, options);
} catch (err) {
const safeMessage = err instanceof Error ? err.message : "unknown error";
logger({
category: "cache",
message: `unable to initialize valkey cache: ${safeMessage}`,
level: 1,
auxiliary: {
error: { value: safeMessage, type: "string" },
},
});
return new CacheStorage(logger);
}
}

get directory(): string | undefined {
return this.dir;
}

get enabled(): boolean {
return !!this.dir || !!this.memoryStore;
return !!this.dir || !!this.memoryStore || !!this.valkeyClient;
}

/**
* Close the underlying Valkey client connection, if any.
* Safe to call multiple times or when no Valkey client is attached.
*/
async close(): Promise<void> {
if (this.valkeyClient) {
try {
await this.valkeyClient.close();
} catch (err) {
this.logger({
category: "cache",
message: `valkey close error (best-effort): ${err instanceof Error ? err.message : "unknown"}`,
level: 2,
});
}
}
}

private resolvePath(fileName: string): string | null {
if (!this.dir) return null;
return path.join(this.dir, fileName);
}

async readJson<T>(fileName: string): Promise<ReadJsonResult<T>> {
/**
* Derive the Valkey key from a cache fileName and explicit category.
* Strips any redundant category prefix from the fileName (e.g. "agent-")
* since the category is already encoded in the key namespace.
*/
private toValkeyKey(fileName: string, category: CacheCategory): string {
const prefix = this.valkeyOptions?.keyPrefix ?? "stagehand";
const base = fileName.replace(/\.json$/, "").replace(/^agent-/, "");
return `${prefix}:${category}:${base}`;
}

async readJson<T>(
fileName: string,
category: CacheCategory = "act",
): Promise<ReadJsonResult<T>> {
if (this.valkeyClient) {
const key = this.toValkeyKey(fileName, category);
try {
const raw = await this.valkeyClient.get(key);
if (raw === null) {
return { value: null };
}
try {
return { value: JSON.parse(raw) as T };
} catch (parseErr) {
// Corrupt data — delete the poisoned key so subsequent reads don't
// keep failing until TTL expiry.
this.logger({
category: "cache",
message: `valkey key ${key} contains corrupt JSON; deleting`,
level: 1,
auxiliary: {
error: { value: String(parseErr), type: "string" },
},
});
try {
await this.valkeyClient.del([key]);
} catch (delErr) {
this.logger({
category: "cache",
message: `valkey del error for corrupt key ${key} (best-effort): ${delErr instanceof Error ? delErr.message : "unknown"}`,
level: 2,
});
}
return { value: null, error: parseErr, path: key };
}
} catch (err) {
this.logger({
category: "cache",
message: `valkey read error for key ${key}`,
level: 1,
auxiliary: {
error: { value: String(err), type: "string" },
},
});
return { value: null, error: err, path: key };
}
}

if (this.memoryStore) {
if (!this.memoryStore.has(fileName)) {
return { value: null };
Expand All @@ -88,7 +316,49 @@ export class CacheStorage {
}
}

async writeJson(fileName: string, data: unknown): Promise<WriteJsonResult> {
async writeJson(
fileName: string,
data: unknown,
category: CacheCategory = "act",
): Promise<WriteJsonResult> {
if (this.valkeyClient) {
const key = this.toValkeyKey(fileName, category);
try {
const serialized = JSON.stringify(data);
const maxBytes = this.valkeyOptions?.maxCacheValueBytes ?? 5_242_880;
if (Buffer.byteLength(serialized, "utf8") > maxBytes) {
this.logger({
category: "cache",
message: `valkey write skipped: payload exceeds ${maxBytes} byte limit`,
level: 1,
});
return {
error: new Error("cache value exceeds size limit"),
path: key,
};
}
const ttl = this.valkeyOptions?.cacheTtl;
if (ttl !== undefined && ttl > 0) {
await this.valkeyClient.set(key, serialized, {
expiry: { type: "EX", count: ttl },
});
} else {
await this.valkeyClient.set(key, serialized);
}
return {};
} catch (err) {
this.logger({
category: "cache",
message: `valkey write error for key ${key}`,
level: 1,
auxiliary: {
error: { value: String(err), type: "string" },
},
});
return { error: err, path: key };
}
}

if (this.memoryStore) {
this.memoryStore.set(fileName, jsonClone(data));
return {};
Expand Down
2 changes: 2 additions & 0 deletions packages/core/lib/v3/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import {

export { V3 } from "./v3.js";
export { V3 as Stagehand } from "./v3.js";
export { mapV3OptsToValkeyConfig } from "./v3.js";
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.

export * from "./types/public/index.js";
export { AnnotatedScreenshotText, LLMClient } from "./llm/LLMClient.js";
Expand Down Expand Up @@ -119,6 +120,7 @@ export { getAISDKLanguageModel } from "./llm/LLMProvider.js";
export { __internalCreateInMemoryAgentCacheHandle } from "./cache/serverAgentCache.js";
export { maybeRunShutdownSupervisorFromArgv as __internalMaybeRunShutdownSupervisorFromArgv } from "./shutdown/supervisor.js";
export type { ServerAgentCacheHandle } from "./cache/serverAgentCache.js";
export type { ValkeyCacheOptions } from "./cache/CacheStorage.js";

export type {
ChatMessage,
Expand Down
6 changes: 6 additions & 0 deletions packages/core/lib/v3/types/private/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ export type WriteJsonResult = {
error?: unknown;
};

/**
* Discriminator for the Valkey key namespace.
* Passed explicitly to readJson/writeJson to avoid fragile filename parsing.
*/
export type CacheCategory = "act" | "agent";

export interface CachedActEntry {
version: 1;
instruction: string;
Expand Down
Loading
Loading