Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/upgrading/upgrading_v4.md
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,10 @@ The high-level storage classes (`Dataset`, `KeyValueStore`, `RequestQueue`) now

`timeoutSecs` and `doNotRetryTimeouts` were removed from `RecordOptions` (used by `KeyValueStore.setValue`). Only `contentType` remains.

### `maybeStringify` is removed

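The `maybeStringify` helper exported from `@crawlee/core` has been removed. Value (de)serialization now lives entirely in the `KeyValueStore` frontend: writing serializes the value (when no content type is given, the value is stringified to JSON and the content type defaults to `application/json; charset=utf-8`), reading parses it back according to the record's content type, and the storage client is a plain byte transport. If you imported `maybeStringify` directly, use the `serializeValue` / `parseValue` functions exported from `@crawlee/core` instead. Note that, unlike `maybeStringify`, which wrote the resolved content type back into the options object you passed in, `serializeValue(value, contentType)` returns a `{ value, contentType }` object.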

### `KeyValueStoreIteratorOptions` simplified

`exclusiveStartKey` and `collection` were removed. Only `prefix` remains.
Expand Down
1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"@crawlee/utils": "workspace:*",
"@sapphire/async-queue": "^1.5.5",
"@vladfrangu/async_event_emitter": "^2.4.6",
"content-type": "^1.0.5",
"csv-stringify": "^6.5.2",
"json5": "^2.2.3",
"minimatch": "^10.0.1",
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/storages/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from './dataset.js';
export * from './key_value_store.js';
export * from './key_value_store_codec.js';
export * from './request_list.js';
export type * from './request_loader.js';
export type * from './request_manager.js';
Expand Down
47 changes: 8 additions & 39 deletions packages/core/src/storages/key_value_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import JSON5 from 'json5';
import ow, { ArgumentError } from 'ow';

import { KEY_VALUE_STORE_KEY_REGEX } from '@apify/consts';
import { jsonStringifyExtended } from '@apify/utilities';

import { Configuration } from '../configuration.js';
import { serviceLocator } from '../service_locator.js';
import type { Awaitable } from '../typedefs.js';
import { checkStorageAccess } from './access_checking.js';
import { parseValue, serializeValue } from './key_value_store_codec.js';
import type { StorageIdentifier } from './storage_instance_manager.js';
import type { StorageOpenOptions } from './utils.js';
import { resolveStorageIdentifier } from './storage_instance_manager.js';
Expand All @@ -20,39 +20,6 @@ import { createDualIterable, purgeDefaultStorages } from './utils.js';
/** @internal */
const KVS_KEYS_DEFAULT_LIMIT = 1000;

/**
 * Stringifies `value` to JSON unless `options.contentType` is already set.
 *
 * When `options.contentType` is `null` or `undefined`, it is overwritten in place with
 * `application/json; charset=utf-8` (the passed `options` object is mutated) and the
 * pretty-printed JSON string is returned. Otherwise the value is returned untouched.
 *
 * Throws an `Error` when the value cannot be stringified, or when stringification
 * produces `undefined`.
 *
 * @ignore
 */
export const maybeStringify = <T>(value: T, options: { contentType?: string }) => {
    // An explicit content type means the caller has already prepared the value.
    if (options.contentType != null) {
        return value;
    }

    // No content type was given, so the record is stored as JSON.
    options.contentType = 'application/json; charset=utf-8';

    try {
        // Pretty-print the JSON to simplify debugging; the overhead is negligible once compressed.
        value = jsonStringifyExtended(value as Dictionary, null, 2) as unknown as T;
    } catch (e) {
        const error = e as Error;
        // Replace the engine's cryptic message with a more meaningful one.
        if (error.message?.includes('Invalid string length')) {
            error.message = 'Object is too large';
        }
        throw new Error(`The "value" parameter cannot be stringified to JSON: ${error.message}`);
    }

    if (value === undefined) {
        throw new Error(
            'The "value" parameter was stringified to JSON and returned undefined. ' +
                "Make sure you're not trying to stringify an undefined value.",
        );
    }

    return value;
};

/**
* The `KeyValueStore` class represents a key-value store, a simple data storage that is used
* for saving and reading data records or files. Each data record is
Expand Down Expand Up @@ -232,7 +199,9 @@ export class KeyValueStore {
ow(key, ow.string.nonEmpty);
const record = await this.client.getValue(key);

return (record?.value as T) ?? defaultValue ?? null;
const parsed = record ? parseValue(record.value, record.contentType ?? '') : undefined;
Comment thread
janbuchar marked this conversation as resolved.
Outdated

return (parsed as T) ?? defaultValue ?? null;
}

/**
Expand Down Expand Up @@ -301,7 +270,7 @@ export class KeyValueStore {
const results: T[] = [];
for (const item of page) {
const record = await this.client.getValue(item.key);
if (record) results.push(mapRecord(item.key, record.value));
if (record) results.push(mapRecord(item.key, parseValue(record.value, record.contentType ?? '')));
}
yield results;
}
Expand Down Expand Up @@ -417,12 +386,12 @@ export class KeyValueStore {
// In this case delete the record.
if (value === null) return this.client.deleteValue(key);

value = maybeStringify(value, optionsCopy);
const serialized = serializeValue(value, optionsCopy.contentType);

return this.client.setValue({
key,
value,
contentType: optionsCopy.contentType,
value: serialized.value,
contentType: serialized.contentType,
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,57 @@
import type { Dictionary } from '@crawlee/types';
import contentTypeParser from 'content-type';
import JSON5 from 'json5';

import { jsonStringifyExtended } from '@apify/utilities';

const CONTENT_TYPE_JSON = 'application/json';
const STRINGIFIABLE_CONTENT_TYPE_RXS = [new RegExp(`^${CONTENT_TYPE_JSON}$`, 'i'), /^application\/.*xml$/i, /^text\//i];

/**
 * Prepares a value for writing into a key-value store record.
 *
 * - If `contentType` is provided (anything other than `null` / `undefined`), the value is passed
 *   through unchanged together with that content type. It is up to the caller to supply a
 *   string, Buffer or stream in that case.
 * - If `contentType` is absent, the value is stringified to pretty-printed JSON and the content
 *   type is set to `application/json; charset=utf-8`.
 *
 * Streams are never read here; they are returned as-is.
 *
 * @param value The value to store.
 * @param contentType Optional explicit content type of `value`.
 * @returns The value to hand over to the storage client and the content type describing it.
 * @throws Error if the value cannot be stringified to JSON, or if stringification yields `undefined`.
 */
export function serializeValue(
    value: unknown,
    contentType?: string,
): { value: Buffer | string | NodeJS.ReadableStream; contentType: string } {
    // Explicit content type: hand the value over untouched.
    if (contentType != null) {
        return { value: value as Buffer | string | NodeJS.ReadableStream, contentType };
    }

    let json: string;
    try {
        // Pretty-print the JSON to simplify debugging; the overhead is negligible once compressed.
        json = jsonStringifyExtended(value as Dictionary, null, 2);
    } catch (e) {
        const error = e as Error;
        // Replace the engine's cryptic message with a more meaningful one.
        if (error.message?.includes('Invalid string length')) {
            error.message = 'Object is too large';
        }
        throw new Error(`The "value" parameter cannot be stringified to JSON: ${error.message}`);
    }

    // Stringifying e.g. `undefined` produces `undefined` rather than a string.
    if (json === undefined) {
        throw new Error(
            'The "value" parameter was stringified to JSON and returned undefined. ' +
                "Make sure you're not trying to stringify an undefined value.",
        );
    }

    return { value: json, contentType: 'application/json; charset=utf-8' };
}

/**
* Parses a Buffer or ArrayBuffer using the provided content type header.
*
Expand All @@ -13,8 +61,10 @@ const STRINGIFIABLE_CONTENT_TYPE_RXS = [new RegExp(`^${CONTENT_TYPE_JSON}$`, 'i'
*
* If the header includes a charset, the body will be stringified only
* if the charset represents a known encoding to Node.js or Browser.
*
* Backend-independent — this is the canonical read path for the {@apilink KeyValueStore} frontend.
*/
export function maybeParseBody(
export function parseValue(
body: Buffer | ArrayBuffer,
contentTypeHeader: string,
): string | Buffer | ArrayBuffer | Record<string, unknown> {
Expand Down
37 changes: 9 additions & 28 deletions packages/memory-storage/src/resource-clients/key-value-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ import { randomUUID } from 'node:crypto';
import type * as storage from '@crawlee/types';
import { s } from '@sapphire/shapeshift';

import { maybeParseBody } from '../body-parser.js';
import type { MemoryStorageClient } from '../index.js';
import { isBuffer, isStream } from '../utils.js';
import { isStream } from '../utils.js';
import { BaseClient } from './common/base-client.js';
import mime from 'mime-types';

Expand Down Expand Up @@ -168,15 +167,14 @@ export class KeyValueStoreClient extends BaseClient implements storage.KeyValueS
return undefined;
}

// Return raw bytes + verbatim content type. Parsing is the frontend's job (see the
// KeyValueStore codec). The mime fallback reconstructs the content type for on-disk records.
const record: storage.KeyValueStoreRecord = {
key: entry.key,
value: entry.value,
contentType: entry.contentType ?? (mime.contentType(entry.extension) as string),
};

// Auto-parse the body (JSON → object, text → string, etc.)
record.value = maybeParseBody(record.value, record.contentType!);

this.updateTimestamps(false);

return record;
Expand All @@ -199,32 +197,15 @@ export class KeyValueStoreClient extends BaseClient implements storage.KeyValueS
}).parse(record);

const { key } = record;
let { value, contentType } = record;

const valueIsStream = isStream(value);

const isValueStreamOrBuffer = valueIsStream || isBuffer(value);
// To allow saving Objects to JSON without providing content type
if (!contentType) {
if (isValueStreamOrBuffer) contentType = 'application/octet-stream';
else if (typeof value === 'string') contentType = 'text/plain; charset=utf-8';
else contentType = 'application/json; charset=utf-8';
}
let { value } = record;
// The frontend (KeyValueStore codec) serializes the value and resolves its content type
// before it reaches the client. We only need it here for on-disk extension bookkeeping.
const contentType = record.contentType ?? 'application/octet-stream';

const extension = mime.extension(contentType) || DEFAULT_LOCAL_FILE_EXTENSION;

const isContentTypeJson = extension === 'json';

if (isContentTypeJson && !isValueStreamOrBuffer && typeof value !== 'string') {
try {
value = JSON.stringify(value, null, 2);
} catch (err: any) {
const msg = `The record value cannot be stringified to JSON. Please provide other content type.\nCause: ${err.message}`;
throw new Error(msg);
}
}

if (valueIsStream) {
// Draining a stream into a Buffer for storage is the client's responsibility.
if (isStream(value)) {
const chunks = [];
for await (const chunk of value) {
chunks.push(chunk);
Expand Down
7 changes: 6 additions & 1 deletion packages/memory-storage/test/async-iteration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ describe('Async iteration support', () => {
kvStore = await storage.createKeyValueStoreClient({ name: 'async-iteration-kvs' });

for (const key of keys) {
await kvStore.setValue({ key, value: { data: key } });
// The client is a byte-transport: pass serialized bytes + content type.
await kvStore.setValue({
key,
value: JSON.stringify({ data: key }),
contentType: 'application/json; charset=utf-8',
});
}
});

Expand Down
22 changes: 17 additions & 5 deletions packages/memory-storage/test/key-value-store/purge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,22 @@ describe('MemoryStorageClient.purge preserves the default key-value store input'
const storage = new MemoryStorageClient();
const store: KeyValueStoreClient = await storage.createKeyValueStoreClient({ name: 'default' });

await store.setValue({ key: 'INPUT', value: { hello: 'world' } });
await store.setValue({ key: 'some-other-key', value: { foo: 'bar' } });
await store.setValue({
key: 'INPUT',
value: JSON.stringify({ hello: 'world' }),
contentType: 'application/json; charset=utf-8',
});
await store.setValue({
key: 'some-other-key',
value: JSON.stringify({ foo: 'bar' }),
contentType: 'application/json; charset=utf-8',
});

await storage.purge();

// INPUT must survive the purge (parity with FileSystemStorageClient)...
const input = await store.getValue('INPUT');
expect(input?.value).toEqual({ hello: 'world' });
expect(input?.value.toString()).toBe(JSON.stringify({ hello: 'world' }));

// ...while every other record is removed.
expect(await store.getValue('some-other-key')).toBeUndefined();
Expand All @@ -25,11 +33,15 @@ describe('MemoryStorageClient.purge preserves the default key-value store input'
const storage = new MemoryStorageClient();
const store: KeyValueStoreClient = await storage.createKeyValueStoreClient({ name: 'not-default' });

await store.setValue({ key: 'INPUT', value: { hello: 'world' } });
await store.setValue({
key: 'INPUT',
value: JSON.stringify({ hello: 'world' }),
contentType: 'application/json; charset=utf-8',
});

// `purge` on the storage client only touches default storages, so a named store keeps its data.
await storage.purge();
expect((await store.getValue('INPUT'))?.value).toEqual({ hello: 'world' });
expect((await store.getValue('INPUT'))?.value.toString()).toBe(JSON.stringify({ hello: 'world' }));

// Purging the store directly clears everything, including INPUT.
await store.purge();
Expand Down
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 8 additions & 6 deletions test/core/crawlers/adaptive_playwright_crawler.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Server } from 'node:http';
import type { AddressInfo } from 'node:net';

import { Configuration, type Dictionary, EventType, KeyValueStore, serviceLocator } from '@crawlee/core';
import { type Dictionary, EventType, KeyValueStore, parseValue, serviceLocator } from '@crawlee/core';
import type {
AdaptivePlaywrightCrawlerContext,
AdaptivePlaywrightCrawlerOptions,
Expand Down Expand Up @@ -384,8 +384,9 @@ describe('AdaptivePlaywrightCrawler', () => {
);

await crawler.run();
// The client returns raw bytes now; parse them as the frontend would.
const state = await localStorageEmulator.getState();
expect(state!.value).toEqual({ count: 3 });
expect(parseValue(state!.value, state!.contentType!)).toEqual({ count: 3 });
});

test('should return deeply equal but not identical state objects across handler runs', async () => {
Expand Down Expand Up @@ -456,11 +457,12 @@ describe('AdaptivePlaywrightCrawler', () => {
);

await crawler.run();
const store = await localStorageEmulator.getKeyValueStore();

expect((await store.getValue('1'))!.value).toEqual({ content: 42 });
expect((await store.getValue('2'))!.value).toEqual({ content: 42 });
expect((await store.getValue('3'))!.value).toEqual({ content: 42 });
const store = await KeyValueStore.open();

await expect(store.getValue('1')).resolves.toEqual({ content: 42 });
await expect(store.getValue('2')).resolves.toEqual({ content: 42 });
await expect(store.getValue('3')).resolves.toEqual({ content: 42 });
});

test('should not allow direct key-value store manipulation', async () => {
Expand Down
21 changes: 3 additions & 18 deletions test/core/storages/key_value_store.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { PassThrough } from 'node:stream';

import { KeyValueStore, maybeStringify, serviceLocator } from '@crawlee/core';
import { KeyValueStore, serviceLocator } from '@crawlee/core';
import type { Dictionary } from '@crawlee/utils';
import { MemoryStorageEmulator } from '../../shared/MemoryStorageEmulator.js';

Expand Down Expand Up @@ -52,7 +52,8 @@ describe('KeyValueStore', () => {
.spyOn(store.client, 'getValue')
.mockResolvedValueOnce({
key: 'key-1',
value: record,
// The client now returns raw bytes; the frontend parses them.
value: Buffer.from(recordStr),
contentType: 'application/json; charset=utf-8',
});

Expand Down Expand Up @@ -371,22 +372,6 @@ describe('KeyValueStore', () => {
// });
// });

describe('maybeStringify()', () => {
test('should work', () => {
expect(maybeStringify({ foo: 'bar' }, { contentType: null as any })).toBe('{\n "foo": "bar"\n}');
expect(maybeStringify({ foo: 'bar' }, { contentType: undefined })).toBe('{\n "foo": "bar"\n}');

expect(maybeStringify('xxx', { contentType: undefined })).toBe('"xxx"');
expect(maybeStringify('xxx', { contentType: 'something' })).toBe('xxx');

const obj = {} as Dictionary;
obj.self = obj;
expect(() => maybeStringify(obj, { contentType: null as any })).toThrow(
'The "value" parameter cannot be stringified to JSON: Converting circular structure to JSON',
);
});
});

describe('getFileNameRegexp()', () => {
const getFileNameRegexp = (key: string) => {
const safeKey = key.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
Expand Down
Loading
Loading