diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument-with-pii.mjs b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument-with-pii.mjs index 7a021162097c..a5a385355c79 100644 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument-with-pii.mjs +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument-with-pii.mjs @@ -1,6 +1,10 @@ import * as Sentry from '@sentry/node'; import { loggingTransport } from '@sentry-internal/node-integration-tests'; +if (process.env.USE_ORCHESTRION) { + Sentry.experimentalUseDiagnosticsChannelInjection(); +} + Sentry.init({ dsn: 'https://public@dsn.ingest.sentry.io/1337', release: '1.0', diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument.mjs b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument.mjs index 46a27dd03b74..ce1e1b394c66 100644 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument.mjs +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/instrument.mjs @@ -1,6 +1,10 @@ import * as Sentry from '@sentry/node'; import { loggingTransport } from '@sentry-internal/node-integration-tests'; +if (process.env.USE_ORCHESTRION) { + Sentry.experimentalUseDiagnosticsChannelInjection(); +} + Sentry.init({ dsn: 'https://public@dsn.ingest.sentry.io/1337', release: '1.0', diff --git a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts index bb579a8f9443..5811020ee218 100644 --- a/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/vercelai/v6_v7/test.ts @@ -24,9 +24,11 @@ import { import { cleanupChildProcesses, createEsmAndCjsTests, createEsmTests } from '../../../../utils/runner'; describe.each([ - ['6', '^6.0.0'], - ['7', '7.0.0-beta.179'], -])('Vercel AI integration (version %s)', (version, vercelAiVersion) => { + ['6', {}, '^6.0.0'], + ['6', { USE_ORCHESTRION: 'true' }, '^6.0.0'], + ['7', {}, '7.0.0-beta.179'], + ['7', { USE_ORCHESTRION: 'true' }, '7.0.0-beta.179'], +])('Vercel AI integration (version %s, env %o)', (version, env: Record, vercelAiVersion) => { afterAll(() => { cleanupChildProcesses(); }); @@ -36,9 +38,12 @@ describe.each([ const nodeVersion = NODE_VERSION.major; const failsOnCjs = version === '7' && nodeVersion === 18; - // v6 is instrumented via the OTel processor, v7 via the `ai:telemetry` tracing-channel subscriber, - // so the span origin differs by version. - const expectedOrigin = version === '7' ? 'auto.vercelai.channel' : 'auto.vercelai.otel'; + const useOrchestrion = env.USE_ORCHESTRION === 'true'; + const usesChannels = version === '7' || useOrchestrion; + + // in v7 and orchestrion mode, we use the channel-based integration + // else, we use the OTel processor + const expectedOrigin = usesChannels ? 'auto.vercelai.channel' : 'auto.vercelai.otel'; // We only run this in ESM and CJS to verify full support // Other suites we only run in ESM to simplify the test setup @@ -49,6 +54,7 @@ describe.each([ (createRunner, test) => { test('creates ai spans for dataCollection defaults', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { @@ -174,6 +180,7 @@ describe.each([ (createRunner, test) => { test('creates ai spans when dataCollection.genAi has inputs and outputs disabled', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { @@ -233,10 +240,10 @@ describe.each([ // On v6, vercel AI natively defaults to recording inputs and outputs by default when telemetry is enabled // On v7, we do not have access to this, so this defaults to false in this case expect(secondInvokeAgentSpan.attributes?.[GEN_AI_INPUT_MESSAGES_ATTRIBUTE]?.value).toEqual( - version === '6' ? '[{"role":"user","content":"Where is the second span?"}]' : undefined, + !usesChannels ? '[{"role":"user","content":"Where is the second span?"}]' : undefined, ); expect(secondInvokeAgentSpan.attributes?.[GEN_AI_OUTPUT_MESSAGES_ATTRIBUTE]?.value).toEqual( - version === '6' + !usesChannels ? '[{"role":"assistant","parts":[{"type":"text","content":"Second span here!"}],"finish_reason":"stop"}]' : undefined, ); @@ -300,6 +307,7 @@ describe.each([ let errorEvent: Event | undefined; await createRunner() + .withEnv(env) .expect({ transaction: transaction => { transactionEvent = transaction; @@ -369,6 +377,7 @@ describe.each([ (createRunner, test) => { test('creates ai related spans', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { @@ -411,6 +420,7 @@ describe.each([ (createRunner, test) => { test('creates spans for ToolLoopAgent with tool calls', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { @@ -474,6 +484,7 @@ describe.each([ (createRunner, test) => { test('parents concurrent calls that share one model instance correctly', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { @@ -529,6 +540,7 @@ describe.each([ 'creates streamText spans with the model call parented to invoke_agent', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { @@ -569,6 +581,7 @@ describe.each([ (createRunner, test) => { test('finishes spans with an error status when the operation rejects', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { @@ -609,6 +622,7 @@ describe.each([ (createRunner, test) => { test('derives provider-metadata token breakdown, conversation id and system instructions', async () => { await createRunner() + .withEnv(env) .expect({ transaction: { transaction: 'main' } }) .expect({ span: container => { diff --git a/packages/core/src/asyncContext/index.ts b/packages/core/src/asyncContext/index.ts index ea5f1db67ac0..dd8eab259313 100644 --- a/packages/core/src/asyncContext/index.ts +++ b/packages/core/src/asyncContext/index.ts @@ -33,10 +33,28 @@ export function getAsyncContextStrategy(carrier: Carrier): AsyncContextStrategy } /** - * Get the runtime binding needed to connect tracing channels to async context. + * Execute a callback whenever the tracing channel binding is available. + * If it is not available after retry, the callback is not executed. */ -export function getTracingChannelBinding(): TracingChannelBinding | undefined { - return getAsyncContextStrategy(getMainCarrier()).getTracingChannelBinding?.(); +export function waitForTracingChannelBinding(callback: () => void, retries = 1): void { + const binding = getAsyncContextStrategy(getMainCarrier()).getTracingChannelBinding?.(); + + if (binding) { + callback(); + return; + } + + if (!retries) { + return; + } + + // It is possible that the binding is not available yet when this is initially called + // This happens when users use a custom OTEL setup + // In this case, we wait for a tick and try again afterwards + // If it still fails, we bail and do nothing + setTimeout(() => { + waitForTracingChannelBinding(callback, retries - 1); + }, 1); } /** diff --git a/packages/core/src/shared-exports.ts b/packages/core/src/shared-exports.ts index 3ee80a85ec17..6cc65bca20f3 100644 --- a/packages/core/src/shared-exports.ts +++ b/packages/core/src/shared-exports.ts @@ -51,7 +51,8 @@ export { export { getDefaultCurrentScope, getDefaultIsolationScope } from './defaultScopes'; export { setAsyncContextStrategy, - getTracingChannelBinding as _INTERNAL_getTracingChannelBinding, + getAsyncContextStrategy, + waitForTracingChannelBinding, _INTERNAL_createTracingChannelBinding, } from './asyncContext'; export { getGlobalSingleton, getMainCarrier } from './carrier'; diff --git a/packages/node-core/src/sdk/index.ts b/packages/node-core/src/sdk/index.ts index 31493a273d4a..dd245d85ad08 100644 --- a/packages/node-core/src/sdk/index.ts +++ b/packages/node-core/src/sdk/index.ts @@ -114,7 +114,7 @@ function _init( initializeEsmLoader(); } - setOpenTelemetryContextAsyncContextStrategy(); + setOpenTelemetryContextAsyncContextStrategy(options); const scope = getCurrentScope(); scope.update(options.initialScope); diff --git a/packages/node/src/integrations/tracing/redis/index.ts b/packages/node/src/integrations/tracing/redis/index.ts index 001024a0bd23..1636ade095a8 100644 --- a/packages/node/src/integrations/tracing/redis/index.ts +++ b/packages/node/src/integrations/tracing/redis/index.ts @@ -7,6 +7,7 @@ import { SEMANTIC_ATTRIBUTE_SENTRY_OP, spanToJSON, truncate, + waitForTracingChannelBinding, } from '@sentry/core'; import * as dc from 'node:diagnostics_channel'; import { subscribeRedisDiagnosticChannels, type RedisTracingChannelFactory } from '@sentry/server-utils'; @@ -128,9 +129,9 @@ export const instrumentRedis = Object.assign( // so defer to the next tick. // Check this here to ensure this does not fail at runtime for Node <= 18.18.0 if (dc.tracingChannel) { - void Promise.resolve().then(() => - subscribeRedisDiagnosticChannels(dc.tracingChannel as RedisTracingChannelFactory, cacheResponseHook), - ); + waitForTracingChannelBinding(() => { + subscribeRedisDiagnosticChannels(dc.tracingChannel as RedisTracingChannelFactory, cacheResponseHook); + }); } // todo: implement them gradually diff --git a/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts b/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts index d51f2d86a610..c5b62f908b9e 100644 --- a/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts +++ b/packages/node/src/sdk/experimentalUseDiagnosticsChannelInjection.ts @@ -1,4 +1,8 @@ -import { mysqlChannelIntegration, detectOrchestrionSetup } from '@sentry/server-utils/orchestrion'; +import { + mysqlChannelIntegration, + vercelAiChannelIntegration, + detectOrchestrionSetup, +} from '@sentry/server-utils/orchestrion'; import { registerDiagnosticsChannelInjection } from '@sentry/server-utils/orchestrion/register'; import type { DiagnosticsChannelInjection } from './diagnosticsChannelInjection'; import { setDiagnosticsChannelInjectionLoader } from './diagnosticsChannelInjection'; @@ -38,8 +42,8 @@ import { setDiagnosticsChannelInjectionLoader } from './diagnosticsChannelInject export function experimentalUseDiagnosticsChannelInjection(): void { setDiagnosticsChannelInjectionLoader( (): DiagnosticsChannelInjection => ({ - integrations: [mysqlChannelIntegration()], - replacedOtelIntegrationNames: ['Mysql'], + integrations: [mysqlChannelIntegration(), vercelAiChannelIntegration()], + replacedOtelIntegrationNames: ['Mysql', 'VercelAI'], register: registerDiagnosticsChannelInjection, detect: detectOrchestrionSetup, }), diff --git a/packages/opentelemetry/src/asyncContextStrategy.ts b/packages/opentelemetry/src/asyncContextStrategy.ts index b9ad711c299c..c8c6deb7d9ef 100644 --- a/packages/opentelemetry/src/asyncContextStrategy.ts +++ b/packages/opentelemetry/src/asyncContextStrategy.ts @@ -1,5 +1,5 @@ import * as api from '@opentelemetry/api'; -import type { Scope, Span, withActiveSpan as defaultWithActiveSpan } from '@sentry/core'; +import type { Scope, withActiveSpan as defaultWithActiveSpan } from '@sentry/core'; import { getDefaultCurrentScope, getDefaultIsolationScope, setAsyncContextStrategy } from '@sentry/core'; import { SENTRY_FORK_ISOLATION_SCOPE_CONTEXT_KEY, @@ -12,20 +12,23 @@ import { getContextFromScope, getScopesFromContext } from './utils/contextData'; import { getActiveSpan } from './utils/getActiveSpan'; import { getTraceData } from './utils/getTraceData'; import { suppressTracing } from './utils/suppressTracing'; +import { getAsyncLocalStorage } from './asyncLocalStorageContextManager'; interface ContextApi { - _getContextManager(): { - getAsyncLocalStorageLookup(): { - asyncLocalStorage: unknown; - }; - }; + _getContextManager(): + | undefined + | { + getAsyncLocalStorageLookup(): { + asyncLocalStorage: unknown; + }; + }; } /** * Sets the async context strategy to use follow the OTEL context under the hood. * We handle forking a hub inside of our custom OTEL Context Manager (./otelContextManager.ts) */ -export function setOpenTelemetryContextAsyncContextStrategy(): void { +export function setOpenTelemetryContextAsyncContextStrategy(options?: { skipOpenTelemetrySetup?: boolean }): void { function getScopes(): CurrentScopes { const ctx = api.context.active(); const scopes = getScopesFromContext(ctx); @@ -117,13 +120,27 @@ export function setOpenTelemetryContextAsyncContextStrategy(): void { // than the OTEL one - but this is OK for here, as we now we'll only have OTEL spans passed around withActiveSpan: withActiveSpan as typeof defaultWithActiveSpan, getTracingChannelBinding: () => { + // Default case: by default we can just access the async local storage instance here + // this will work no matter if this called before or after the Otel ContextManager was setup + if (!options?.skipOpenTelemetrySetup) { + const asyncLocalStorage = getAsyncLocalStorage(); + + return { + asyncLocalStorage, + getStoreWithActiveSpan: span => api.trace.setSpan(api.context.active(), span), + }; + } + + // Else, if we have a custom context manager, we need to access it via the context manager + // this may not be available yet, if this is called before the Otel ContextManager was setup + // in this case, we need to return undefined and retry later, hoping that the setup works by then try { const contextManager = (api.context as unknown as ContextApi)._getContextManager(); - const lookup = contextManager.getAsyncLocalStorageLookup(); + const asyncLocalStorage = contextManager?.getAsyncLocalStorageLookup().asyncLocalStorage; return { - asyncLocalStorage: lookup.asyncLocalStorage, - getStoreWithActiveSpan: (span: Span) => api.trace.setSpan(api.context.active(), span as api.Span), + asyncLocalStorage, + getStoreWithActiveSpan: span => api.trace.setSpan(api.context.active(), span as api.Span), }; } catch { return undefined; diff --git a/packages/opentelemetry/src/asyncLocalStorageContextManager.ts b/packages/opentelemetry/src/asyncLocalStorageContextManager.ts index e1a7db98e527..948d7eef92e8 100644 --- a/packages/opentelemetry/src/asyncLocalStorageContextManager.ts +++ b/packages/opentelemetry/src/asyncLocalStorageContextManager.ts @@ -39,18 +39,29 @@ type PatchMap = Record>; const ADD_LISTENER_METHODS = ['addListener', 'on', 'once', 'prependListener', 'prependOnceListener'] as const; +let _asyncLocalStorage: AsyncLocalStorage | undefined; + +/** Get hold of the async local storage instance. */ +export function getAsyncLocalStorage(): AsyncLocalStorage { + if (!_asyncLocalStorage) { + _asyncLocalStorage = new AsyncLocalStorage(); + } + return _asyncLocalStorage; +} + /** * OpenTelemetry-compatible context manager using Node.js `AsyncLocalStorage`. * Semantics match `@opentelemetry/context-async-hooks` (function `bind` + `EventEmitter` patching). */ export class SentryAsyncLocalStorageContextManager implements ContextManager { - protected readonly _asyncLocalStorage = new AsyncLocalStorage(); + protected readonly _asyncLocalStorage; private readonly _kOtListeners = Symbol('OtListeners'); private _wrapped = false; public constructor() { setIsSetup('SentryContextManager'); + this._asyncLocalStorage = getAsyncLocalStorage(); } public active(): Context { diff --git a/packages/server-utils/src/integrations/tracing-channel/vercel-ai.ts b/packages/server-utils/src/integrations/tracing-channel/vercel-ai.ts new file mode 100644 index 000000000000..7227b3bdf1f8 --- /dev/null +++ b/packages/server-utils/src/integrations/tracing-channel/vercel-ai.ts @@ -0,0 +1,49 @@ +import type { IntegrationFn } from '@sentry/core'; +import { defineIntegration, waitForTracingChannelBinding } from '@sentry/core'; +import { vercelAiIntegration as baseVercelAiIntegration } from '../../vercel-ai'; +import * as dc from 'node:diagnostics_channel'; +import { subscribeVercelAiOrchestrionChannels } from '../../vercel-ai/vercel-ai-orchestrion-v6-subscriber'; + +type VercelAiOptions = Parameters[0]; + +// In channel-based (orchestrion) mode we emit our own `gen_ai.*` spans from the +// diagnostics channels. The `ai` SDK still emits its own native OpenTelemetry +// spans whenever the user enables `experimental_telemetry`, which would be +// duplicates. Every native `ai` span carries an `ai.operationId` attribute +// (e.g. `ai.generateText`, `ai.generateText.doGenerate`, `ai.toolCall`) at span +// start, whereas our channel spans use `vercel.ai.operationId` — so we drop the +// native ones up front via `ignoreSpans`, before any vercel-ai processing runs. +const NATIVE_VERCEL_AI_SPANS = { attributes: { 'ai.operationId': /^ai\./ } }; + +const _vercelAiChannelIntegration = ((options: VercelAiOptions = {}) => { + const parentIntegration = baseVercelAiIntegration(options); + + return { + name: 'VercelAI' as const, + beforeSetup(client) { + // Ensure we drop spans emitted by ai v6 or below + // To avoid double-instrumentation - in this scenario, we only want to rely on our own spans + const options = client.getOptions(); + options.ignoreSpans = [...(options.ignoreSpans || []), NATIVE_VERCEL_AI_SPANS]; + }, + setupOnce() { + parentIntegration?.setupOnce?.(); + + // Bail if this is not available + if (!dc.tracingChannel) { + return; + } + + waitForTracingChannelBinding(() => { + subscribeVercelAiOrchestrionChannels(dc.tracingChannel, options); + }); + }, + }; +}) satisfies IntegrationFn; + +/** + * Auto-instrument the `ai` SDK. Supported are: + * - v7 via native `ai:telemetry` tracing channel + * - v6 via orchestrion `orchestrion:ai:*` channels + */ +export const vercelAiChannelIntegration = defineIntegration(_vercelAiChannelIntegration); diff --git a/packages/server-utils/src/orchestrion/channels.ts b/packages/server-utils/src/orchestrion/channels.ts index 28dcf0c33468..fd30b7433fd3 100644 --- a/packages/server-utils/src/orchestrion/channels.ts +++ b/packages/server-utils/src/orchestrion/channels.ts @@ -13,6 +13,18 @@ */ export const CHANNELS = { MYSQL_QUERY: 'orchestrion:mysql:query', + // Vercel AI (`ai`) v6: orchestrion injects these so the same channel-based + // integration that consumes `ai`'s native `ai:telemetry` channel (v7) can + // also instrument v6. Each maps to a top-level function in `ai`'s bundle. + VERCEL_AI_GENERATE_TEXT: 'orchestrion:ai:generateText', + VERCEL_AI_STREAM_TEXT: 'orchestrion:ai:streamText', + VERCEL_AI_EMBED: 'orchestrion:ai:embed', + VERCEL_AI_EXECUTE_TOOL_CALL: 'orchestrion:ai:executeToolCall', + // `resolveLanguageModel` is the single chokepoint every model call flows + // through; we wrap it to monkey-patch `doGenerate`/`doStream` on the returned + // model (the model-call site itself is an inline call with no injectable + // definition). + VERCEL_AI_RESOLVE_LANGUAGE_MODEL: 'orchestrion:ai:resolveLanguageModel', } as const; export type ChannelName = (typeof CHANNELS)[keyof typeof CHANNELS]; diff --git a/packages/server-utils/src/orchestrion/config.ts b/packages/server-utils/src/orchestrion/config.ts index 35b326fb8eb1..04bfede5fce0 100644 --- a/packages/server-utils/src/orchestrion/config.ts +++ b/packages/server-utils/src/orchestrion/config.ts @@ -11,6 +11,19 @@ import type { InstrumentationConfig } from '@apm-js-collab/code-transformer'; * `channelName` here is the unprefixed suffix; the actual diagnostics_channel * name is `orchestrion:${module.name}:${channelName}` (see `channels.ts`). */ +/** + * `ai` ships a single bundled entry per module system, so each instrumented + * function needs one config entry per file (the app loads whichever matches its + * module system). This expands a single target into both. + */ +function vercelAiV6Entries(channelName: string, functionName: string, kind: 'Async' | 'Sync'): InstrumentationConfig[] { + return ['dist/index.js', 'dist/index.mjs'].map(filePath => ({ + channelName, + module: { name: 'ai', versionRange: '>=6.0.0 <7.0.0', filePath }, + functionQuery: { functionName, kind }, + })); +} + export const SENTRY_INSTRUMENTATIONS: InstrumentationConfig[] = [ { channelName: 'query', @@ -32,6 +45,19 @@ export const SENTRY_INSTRUMENTATIONS: InstrumentationConfig[] = [ // attach `'end'`/`'error'` listeners that finish the span. functionQuery: { expressionName: 'query', kind: 'Auto' }, }, + // Vercel AI v6: mirror the v7 native `ai:telemetry` channel by injecting + // channels into the top-level entry points. `resolveLanguageModel` is wrapped + // not to span it, but so the subscriber can monkey-patch `doGenerate`/ + // `doStream` on the returned model (the only way to span the model call, + // which is an inline call with no injectable definition in `ai`). + // `streamText` returns its result synchronously (streaming is lazy), so it's + // `Sync`; the subscriber binds the span via `bindTracingChannelToSpan`, which + // ends it when the (synchronous) call returns. + ...vercelAiV6Entries('generateText', 'generateText', 'Async'), + ...vercelAiV6Entries('streamText', 'streamText', 'Sync'), + ...vercelAiV6Entries('embed', 'embed', 'Async'), + ...vercelAiV6Entries('executeToolCall', 'executeToolCall', 'Async'), + ...vercelAiV6Entries('resolveLanguageModel', 'resolveLanguageModel', 'Sync'), ]; /** diff --git a/packages/server-utils/src/orchestrion/index.ts b/packages/server-utils/src/orchestrion/index.ts index dd3ecd0f8f19..467eb3777134 100644 --- a/packages/server-utils/src/orchestrion/index.ts +++ b/packages/server-utils/src/orchestrion/index.ts @@ -1,2 +1,3 @@ export { detectOrchestrionSetup } from './detect'; export { mysqlChannelIntegration } from '../integrations/tracing-channel/mysql'; +export { vercelAiChannelIntegration } from '../integrations/tracing-channel/vercel-ai'; diff --git a/packages/server-utils/src/tracing-channel.ts b/packages/server-utils/src/tracing-channel.ts index a0d449eb1489..b00a977cd74f 100644 --- a/packages/server-utils/src/tracing-channel.ts +++ b/packages/server-utils/src/tracing-channel.ts @@ -1,7 +1,7 @@ import type { TracingChannel, TracingChannelSubscribers } from 'node:diagnostics_channel'; import type { AsyncLocalStorage } from 'node:async_hooks'; import type { ExclusiveEventHintOrCaptureContext, Span } from '@sentry/core'; -import { _INTERNAL_getTracingChannelBinding, debug, captureException, SPAN_STATUS_ERROR } from '@sentry/core'; +import { debug, captureException, SPAN_STATUS_ERROR, getAsyncContextStrategy, getMainCarrier } from '@sentry/core'; import { DEBUG_BUILD } from './debug-build'; export type TracingChannelPayloadWithSpan = TData & { @@ -133,7 +133,7 @@ function bindSpanToChannelStore( getSpan: (data: TracingChannelPayloadWithSpan) => Span | undefined, ): TracingChannelBindingHandle { // Grabs the tracing channel binding defined by the AsyncContext strategy implementation - const binding = _INTERNAL_getTracingChannelBinding(); + const binding = getAsyncContextStrategy(getMainCarrier()).getTracingChannelBinding?.(); // If no binding, then either the implementer doesn't support tracing channels or there is no active strategy // Failure mode here means we would still access the channel and potentially subscribe to it, but parenting will be off. diff --git a/packages/server-utils/src/vercel-ai/index.ts b/packages/server-utils/src/vercel-ai/index.ts index 18e9dccf3ef5..251ba91519c5 100644 --- a/packages/server-utils/src/vercel-ai/index.ts +++ b/packages/server-utils/src/vercel-ai/index.ts @@ -1,4 +1,4 @@ -import { defineIntegration, type IntegrationFn } from '@sentry/core'; +import { defineIntegration, waitForTracingChannelBinding, type IntegrationFn } from '@sentry/core'; import { subscribeVercelAiTracingChannel } from './vercel-ai-dc-subscriber'; import * as dc from 'node:diagnostics_channel'; @@ -35,9 +35,9 @@ const _vercelAiIntegration = ((options: VercelAiOptions = {}) => { // Subscribe to the `ai` SDK's native telemetry tracing channel (ai >= 7). // This is a no-op on versions that don't publish to the channel, so it is always safe to call. - // The factory needs the Sentry OTel context manager, which `initOpenTelemetry()` registers after `setupOnce`, so defer a tick. - // Options are passed in here rather than read back off the integration per event. - void Promise.resolve().then(() => subscribeVercelAiTracingChannel(dc.tracingChannel, options)); + waitForTracingChannelBinding(() => { + subscribeVercelAiTracingChannel(dc.tracingChannel, options); + }); }, }; }) satisfies IntegrationFn; diff --git a/packages/server-utils/src/vercel-ai/vercel-ai-dc-subscriber.ts b/packages/server-utils/src/vercel-ai/vercel-ai-dc-subscriber.ts index f2b4b0debff9..74ff4ad6cc9c 100644 --- a/packages/server-utils/src/vercel-ai/vercel-ai-dc-subscriber.ts +++ b/packages/server-utils/src/vercel-ai/vercel-ai-dc-subscriber.ts @@ -95,11 +95,21 @@ export function clearOperationId(data: VercelAiChannelMessage): void { } const callId = asString(data.event.callId); if (callId) { - operationIdByCallId.delete(callId); - toolDescriptionsByCallId.delete(callId); + clearOperationCallId(callId); } } +/** + * Drop the per-operation `callId` maps for a single id. The v6 orchestrion adapter uses this to clear a + * `streamText` operation only after its lazily-run model call settles — the operation's own span ends + * synchronously (when `streamText` returns) but the model call runs later as the stream is consumed, and + * it still needs the operation's `operationId`/`isStream` entry to name itself `ai.streamText.doStream`. + */ +export function clearOperationCallId(callId: string): void { + operationIdByCallId.delete(callId); + toolDescriptionsByCallId.delete(callId); +} + /** Record tool name → description from an event's `tools`, so tool spans can backfill the description. */ function recordToolDescriptions(callId: string | undefined, tools: unknown): void { if (!callId || !Array.isArray(tools)) { @@ -172,10 +182,10 @@ export interface VercelAiChannelMessage { * nested AI SDK operations (model calls, tool calls) become children of the enclosing span without * any manual parent bookkeeping here. */ -type VercelAiTracingChannelFactory = (name: string) => TracingChannel; +export type VercelAiTracingChannelFactory = (name: string) => TracingChannel; /** Integration-level recording options, pinned at subscribe time so we never look the integration up per event. */ -interface VercelAiChannelOptions { +export interface VercelAiChannelOptions { recordInputs?: boolean; recordOutputs?: boolean; enableTruncation?: boolean; diff --git a/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts b/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts new file mode 100644 index 000000000000..dc374314544b --- /dev/null +++ b/packages/server-utils/src/vercel-ai/vercel-ai-orchestrion-v6-subscriber.ts @@ -0,0 +1,411 @@ +import { AsyncLocalStorage } from 'node:async_hooks'; +import type { Span } from '@sentry/core'; +import { debug, getActiveSpan, SPAN_STATUS_ERROR, withActiveSpan } from '@sentry/core'; +import { DEBUG_BUILD } from '../debug-build'; +import { CHANNELS } from '../orchestrion/channels'; +import { bindTracingChannelToSpan, type TracingChannelPayloadWithSpan } from '../tracing-channel'; +import { + clearOperationCallId, + clearOperationId, + createSpanFromMessage, + enrichSpanOnEnd, + type VercelAiChannelMessage, + type VercelAiChannelOptions, + type VercelAiTracingChannelFactory, +} from './vercel-ai-dc-subscriber'; + +/** + * v6 channel adapter for the Vercel AI (`ai`) SDK. + * + * `ai` >= 7 publishes a normalized `ai:telemetry` tracing channel natively + * (consumed by `subscribeVercelAiTracingChannel`). v6 has no such channel, so + * orchestrion injects `orchestrion:ai:*` channels around the top-level + * functions (see `orchestrion/config.ts`). The injected channels carry only the + * wrapped call's `{ arguments, result, error }` — NOT v7's normalized `event` + * object — so this adapter reconstructs an equivalent {@link VercelAiChannelMessage} + * from v6's argument/result shapes and delegates to the SAME span-building core + * (`createSpanFromMessage` / `enrichSpanOnEnd`) the v7 subscriber uses, so the + * emitted spans are identical between v6 and v7. + * + * Like the v7 subscriber, each operation channel is wired up via + * {@link bindTracingChannelToSpan}, which binds the opened span into the runtime's + * async context for the duration of the traced call and ends it when the call + * settles. That binding is what lets the model call below find its enclosing + * `invoke_agent` span via the active context (see {@link resolveModelCallParent}). + * + * The model call (`languageModelCall` / `generate_content` span) has no + * injectable definition in `ai`, so we instead wrap `resolveLanguageModel` (the + * single chokepoint every model call flows through) and monkey-patch + * `doGenerate`/`doStream` on the returned model. + */ + +/** Shape orchestrion's transform attaches to the tracing-channel context. */ +interface OrchestrionContext { + arguments: unknown[]; + result?: unknown; + error?: unknown; +} + +/** Builds the normalized message for a channel from the wrapped call's first-arg options. */ +type MessageBuilder = (options: Record, telemetry: Record) => VercelAiChannelMessage; + +/** A resolved `ai` language model — has `doGenerate`/`doStream` and identity fields. */ +interface ResolvedModel { + modelId?: string; + provider?: string; + doGenerate?: (...args: unknown[]) => Promise; + doStream?: (...args: unknown[]) => Promise; +} + +const PATCHED = Symbol('SentryVercelAiModelPatched'); +const PARENT = Symbol('SentryVercelAiModelParent'); + +/** A resolved model with our patch bookkeeping (idempotency flag + captured parent span). */ +type PatchableModel = ResolvedModel & { [PATCHED]?: boolean; [PARENT]?: Span }; + +// Per-operation correlation id. No Date/random (unavailable / non-deterministic) — a counter is enough. +let callIdCounter = 0; +function nextCallId(): string { + return `v6-${++callIdCounter}`; +} + +// The message built on `start` for each operation, keyed by the (stable-identity) channel context, so +// the `beforeSpanEnd` handler can enrich the span from the settled result and clear the `callId` maps. +const messages = new WeakMap(); +// The spans we opened for top-level operations, and each one's `callId`. A model call resolves its +// parent against this set (so it never mis-attributes to the enclosing `main`/user span) and reads the +// parent's `callId` so its span can be named after the operation (e.g. `ai.streamText.doStream`). +const operationSpans = new WeakSet(); +const callIdBySpan = new WeakMap(); +// Carries the enclosing operation span down to the patched `doGenerate`/`doStream`, where the active +// span is the `ai` SDK's own (ignored) model-call span rather than our operation span. It's bound onto +// the operation channel via `bindStore` (see `bindOperation`), so it's scoped per traced operation and +// propagates across the awaits inside it — which is what lets concurrent operations sharing a single +// model instance each resolve their own parent (the `[PARENT]` slot on the shared model cannot). +const operationParentStore = new AsyncLocalStorage(); + +let subscribed = false; + +/** + * Subscribe the v6 orchestrion channel adapter. Safe to always call: inert on + * `ai` >= 7 (those channels are never published) and when orchestrion injection + * isn't active. Idempotent. + * + * `tracingChannel` is the platform-provided factory (the same one passed to + * `subscribeVercelAiTracingChannel`); `options` pins the recording settings at + * subscribe time so we never look the integration up per event. + */ +export function subscribeVercelAiOrchestrionChannels( + tracingChannel: VercelAiTracingChannelFactory, + options: VercelAiChannelOptions = {}, +): void { + if (subscribed) { + return; + } + subscribed = true; + + try { + bindOperation(tracingChannel, CHANNELS.VERCEL_AI_GENERATE_TEXT, buildTextMessage('generateText'), options); + bindOperation(tracingChannel, CHANNELS.VERCEL_AI_STREAM_TEXT, buildTextMessage('streamText'), options); + bindOperation( + tracingChannel, + CHANNELS.VERCEL_AI_EMBED, + (callOptions, telemetry) => ({ + type: 'embed', + event: { + callId: nextCallId(), + ...modelFields(callOptions.model), + maxRetries: callOptions.maxRetries, + value: callOptions.value, + ...recording(telemetry), + }, + }), + options, + ); + bindOperation( + tracingChannel, + CHANNELS.VERCEL_AI_EXECUTE_TOOL_CALL, + (callOptions, telemetry) => ({ + type: 'executeTool', + // v6 carries the tool definitions on the executeToolCall args (a record keyed by name); + // the shared core reads the matching tool's `description` for the span. + event: { + callId: nextCallId(), + toolCall: callOptions.toolCall, + tools: callOptions.tools, + ...recording(telemetry), + }, + }), + options, + ); + subscribeResolveLanguageModel(tracingChannel, CHANNELS.VERCEL_AI_RESOLVE_LANGUAGE_MODEL, options); + } catch { + DEBUG_BUILD && debug.log('Vercel AI orchestrion channel subscription failed.'); + } +} + +/** + * Bind one operation channel: `getSpan` opens a span from the message reconstructed out of the wrapped + * call's first argument; `beforeSpanEnd` enriches it from the settled result (tokens, output messages, + * finish reasons, …) before the helper ends the span. + * + * An operation whose `experimental_telemetry.isEnabled` is explicitly `false` is skipped entirely (no + * span): the orchestrion channel fires regardless of that flag, whereas v7's native `ai:telemetry` + * channel is simply not published in that case — so we reproduce v7's "no telemetry → no span". + */ +function bindOperation( + tracingChannel: VercelAiTracingChannelFactory, + channelName: string, + build: MessageBuilder, + options: VercelAiChannelOptions, +): void { + const channel = tracingChannel(channelName); + + // Bind the operation span into our own async-context store. We bind it BEFORE `bindTracingChannelToSpan` + // so that — bound stores run last-in-first-out — this transform runs AFTER the helper's producer has + // stashed the span on `data._sentrySpan`. `bindStore` activates the store via `runStores` for the + // traced operation, and that propagates across the awaits inside it, so a model call awaited within the + // operation reads ITS operation's span — no leak across sequential calls, no clobbering across + // concurrent ones (which a single mutable slot on the shared model instance cannot achieve). + // `bindStore`'s store type is the channel's data type; our store value is the operation span, so cast + // (the runtime treats the store value opaquely — same as `bindTracingChannelToSpan` does internally). + channel.start.bindStore(operationParentStore as unknown as AsyncLocalStorage, data => { + return (data as TracingChannelPayloadWithSpan)._sentrySpan as unknown as OrchestrionContext; + }); + + bindTracingChannelToSpan( + channel, + (data: TracingChannelPayloadWithSpan) => { + const callOptions = isRecord(data.arguments[0]) ? data.arguments[0] : {}; + const telemetry = isRecord(callOptions.experimental_telemetry) ? callOptions.experimental_telemetry : {}; + if (telemetry.isEnabled === false) { + return undefined; + } + const message = build(callOptions, telemetry); + const span = createSpanFromMessage(message, options); + if (span) { + messages.set(data, message); + operationSpans.add(span); + const callId = asString(message.event.callId); + if (callId) { + callIdBySpan.set(span, callId); + } + } + return span; + }, + { + beforeSpanEnd: (span, data) => { + const message = messages.get(data); + if (!message) { + return; + } + // The helper's `error` handler already set the span status; only enrich from a successful result. + if (!('error' in data)) { + // v6's `executeToolCall` returns the tool result/error object directly, whereas the shared core + // (matching v7) expects it nested under `output`; wrap it so tool-error detection works. + message.result = message.type === 'executeTool' ? { output: data.result } : data.result; + enrichSpanOnEnd(span, message, options); + } + // A `streamText` model call runs lazily, after this (synchronously-returning) operation's span has + // already ended, so its `callId` entry must outlive the operation — it's cleared once the model + // call settles (see `patchModelMethod`). Every other operation can clear here. + if (message.type !== 'streamText') { + clearOperationId(message); + } + messages.delete(data); + }, + }, + ); +} + +/** + * `resolveLanguageModel` returns the model every call flows through. We don't span it — on `end` we + * monkey-patch `doGenerate`/`doStream` on the returned model so each invocation produces a + * `languageModelCall` span parented to the enclosing invoke_agent span. + */ +function subscribeResolveLanguageModel( + tracingChannel: VercelAiTracingChannelFactory, + channelName: string, + options: VercelAiChannelOptions, +): void { + tracingChannel(channelName).subscribe({ + end(rawCtx) { + const ctx = rawCtx as OrchestrionContext; + if (!isRecord(ctx.result)) { + return; + } + const model = ctx.result as PatchableModel; + // `resolveLanguageModel` runs synchronously inside the operation body, where the operation span is + // the active span. `generateText`/`embed` recover that parent from `operationParentStore` at model + // call time, but `streamText` runs its model call lazily — after the operation's async context (and + // thus `operationParentStore`) has unwound — so stash the operation span on the model as its + // fallback parent here. + const active = getActiveSpan(); + if (active && operationSpans.has(active)) { + model[PARENT] = active; + } + if (!model[PATCHED]) { + model[PATCHED] = true; + patchModelMethod(model, 'doGenerate', options); + patchModelMethod(model, 'doStream', options); + } + }, + start() { + /* no-op */ + }, + asyncStart() { + /* no-op */ + }, + asyncEnd() { + /* no-op */ + }, + error() { + /* no-op */ + }, + }); +} + +/** + * Pick the invoke_agent span a model call should hang under. + * + * Prefer the active span when it is an operation span: for `generateText`/`embed` the model call is + * awaited inside the operation body, so the async context still carries the right operation span — and + * crucially this disambiguates concurrent calls that share one model instance (where the captured + * `model[PARENT]` would be whichever operation resolved the model last). Fall back to the captured + * parent for `streamText`, whose model call runs after the operation returned and the bound context has + * unwound. Returns `undefined` when neither is an operation span — e.g. telemetry was disabled for the + * enclosing call — so the model call is skipped too. + */ +function resolveModelCallParent(model: PatchableModel): Span | undefined { + // The model call is awaited inside the operation body (`generateText`/`embed`), so the operation span + // bound onto `operationParentStore` for that operation is still in scope — and is per-operation, so it + // stays correct even when concurrent operations share one model instance. + const fromContext = operationParentStore.getStore(); + if (fromContext && operationSpans.has(fromContext)) { + return fromContext; + } + // Fallback for `streamText`, whose `doStream` runs after the operation's async context has unwound: + // the parent captured on the model at resolve time. + const captured = model[PARENT]; + return captured && operationSpans.has(captured) ? captured : undefined; +} + +function patchModelMethod( + model: PatchableModel, + method: 'doGenerate' | 'doStream', + options: VercelAiChannelOptions, +): void { + const original = model[method]; + if (typeof original !== 'function') { + return; + } + model[method] = function (this: unknown, ...args: unknown[]): Promise { + const parent = resolveModelCallParent(model); + // No enclosing operation span (e.g. telemetry disabled for the call) → don't open a model-call span. + if (!parent) { + return Promise.resolve(original.apply(this, args)); + } + + const callArgs = isRecord(args[0]) ? args[0] : {}; + // Carry the operation's `callId` so the shared core can name the span after it + // (`ai.generateText.doGenerate` / `ai.streamText.doStream`). + const callId = callIdBySpan.get(parent); + const message: VercelAiChannelMessage = { + type: 'languageModelCall', + event: { + callId, + provider: model.provider, + modelId: model.modelId, + tools: callArgs.tools, + messages: callArgs.prompt, + }, + }; + const span = withActiveSpan(parent, () => createSpanFromMessage(message, options)); + // `languageModelCall` always opens a span; the guard just keeps the wrapper safe if that changes. + if (!span) { + return Promise.resolve(original.apply(this, args)); + } + + // `streamText` ends its operation span synchronously, so its `callId` entry was deliberately left in + // place for this (lazy) model call; drop it now that we've used it. + const clearStreamCallId = (): void => { + if (method === 'doStream' && callId) { + clearOperationCallId(callId); + } + }; + + let result: Promise; + try { + result = Promise.resolve(original.apply(this, args)); + } catch (error) { + span.setStatus({ code: SPAN_STATUS_ERROR, message: error instanceof Error ? error.message : 'unknown_error' }); + span.end(); + clearStreamCallId(); + throw error; + } + // `doStream` resolves to `{ stream, ... }` before the stream is consumed; we end here (start/end + // bracket the call) to match the channel timing. + return result.then( + value => { + message.result = value; + enrichSpanOnEnd(span, message, options); + span.end(); + clearStreamCallId(); + return value; + }, + error => { + span.setStatus({ code: SPAN_STATUS_ERROR, message: error instanceof Error ? error.message : 'unknown_error' }); + span.end(); + clearStreamCallId(); + throw error; + }, + ); + }; +} + +function buildTextMessage(type: 'generateText' | 'streamText'): MessageBuilder { + return (options, telemetry) => ({ + type, + event: { + callId: nextCallId(), + operationId: type === 'streamText' ? 'ai.streamText' : 'ai.generateText', + functionId: asString(telemetry.functionId), + ...modelFields(options.model), + maxRetries: options.maxRetries, + // Normalize to the message-array shape the shared core (and v7's channel) expects: a bare string + // `prompt` becomes a single user message, matching the SDK's own normalization. + messages: normalizePromptMessages(options), + ...recording(telemetry), + }, + }); +} + +function normalizePromptMessages(options: Record): unknown { + if (Array.isArray(options.messages)) { + return options.messages; + } + if (typeof options.prompt === 'string') { + return [{ role: 'user', content: options.prompt }]; + } + return options.messages ?? options.prompt; +} + +function recording(telemetry: Record): { recordInputs: unknown; recordOutputs: unknown } { + return { recordInputs: telemetry.recordInputs, recordOutputs: telemetry.recordOutputs }; +} + +function modelFields(model: unknown): { provider?: string; modelId?: string } { + return { provider: modelField(model, 'provider'), modelId: modelField(model, 'modelId') }; +} + +function modelField(model: unknown, field: 'modelId' | 'provider'): string | undefined { + return isRecord(model) ? asString(model[field]) : undefined; +} + +function asString(value: unknown): string | undefined { + return typeof value === 'string' ? value : undefined; +} + +function isRecord(value: unknown): value is Record { + return typeof value === 'object' && value !== null; +} diff --git a/packages/vercel-edge/src/sdk.ts b/packages/vercel-edge/src/sdk.ts index be5ef0ea9403..97d622824c44 100644 --- a/packages/vercel-edge/src/sdk.ts +++ b/packages/vercel-edge/src/sdk.ts @@ -63,7 +63,7 @@ export function getDefaultIntegrations(_options: Options): Integration[] { /** Inits the Sentry NextJS SDK on the Edge Runtime. */ export function init(options: VercelEdgeOptions = {}): Client | undefined { - setOpenTelemetryContextAsyncContextStrategy(); + setOpenTelemetryContextAsyncContextStrategy(options); const scope = getCurrentScope(); scope.update(options.initialScope);