From 6d17c59cd6365d4716bdfb0e688b57a766b9bddf Mon Sep 17 00:00:00 2001 From: s1gr1d <32902192+s1gr1d@users.noreply.github.com> Date: Wed, 24 Jun 2026 13:57:52 +0200 Subject: [PATCH 1/2] streamline instrumentation --- .../src/integrations/tracing/amqplib/index.ts | 17 +- .../tracing/amqplib/vendored/amqplib.ts | 621 ------------------ .../amqplib/vendored/instrumentation.ts | 143 ++++ .../tracing/amqplib/vendored/patches.ts | 299 +++++++++ .../amqplib/vendored/semconv-obsolete.ts | 90 --- .../tracing/amqplib/vendored/semconv.ts | 83 ++- .../tracing/amqplib/vendored/types.ts | 109 +-- .../tracing/amqplib/vendored/utils.ts | 220 ++++--- 8 files changed, 603 insertions(+), 979 deletions(-) delete mode 100644 packages/node/src/integrations/tracing/amqplib/vendored/amqplib.ts create mode 100644 packages/node/src/integrations/tracing/amqplib/vendored/instrumentation.ts create mode 100644 packages/node/src/integrations/tracing/amqplib/vendored/patches.ts delete mode 100644 packages/node/src/integrations/tracing/amqplib/vendored/semconv-obsolete.ts diff --git a/packages/node/src/integrations/tracing/amqplib/index.ts b/packages/node/src/integrations/tracing/amqplib/index.ts index a9a1446ce4ee..5e04f8f6a137 100644 --- a/packages/node/src/integrations/tracing/amqplib/index.ts +++ b/packages/node/src/integrations/tracing/amqplib/index.ts @@ -1,22 +1,11 @@ -import type { Span } from '@opentelemetry/api'; -import { AmqplibInstrumentation } from './vendored/amqplib'; -import type { AmqplibInstrumentationConfig } from './vendored/types'; import type { IntegrationFn } from '@sentry/core'; import { defineIntegration } from '@sentry/core'; -import { addOriginToSpan, generateInstrumentOnce } from '@sentry/node-core'; +import { generateInstrumentOnce } from '@sentry/node-core'; +import { AmqplibInstrumentation } from './vendored/instrumentation'; const INTEGRATION_NAME = 'Amqplib'; -const config: AmqplibInstrumentationConfig = { - consumeEndHook: (span: Span) => { - addOriginToSpan(span, 'auto.amqplib.otel.consumer'); - }, - publishHook: (span: Span) => { - addOriginToSpan(span, 'auto.amqplib.otel.publisher'); - }, -}; - -export const instrumentAmqplib = generateInstrumentOnce(INTEGRATION_NAME, () => new AmqplibInstrumentation(config)); +export const instrumentAmqplib = generateInstrumentOnce(INTEGRATION_NAME, () => new AmqplibInstrumentation()); const _amqplibIntegration = (() => { return { diff --git a/packages/node/src/integrations/tracing/amqplib/vendored/amqplib.ts b/packages/node/src/integrations/tracing/amqplib/vendored/amqplib.ts deleted file mode 100644 index 4b65c9f3f3dc..000000000000 --- a/packages/node/src/integrations/tracing/amqplib/vendored/amqplib.ts +++ /dev/null @@ -1,621 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - * - * NOTICE from the Sentry authors: - * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-amqplib - * - Upstream version: @opentelemetry/instrumentation-amqplib@0.65.0 - */ -/* eslint-disable */ - -import { - context, - diag, - propagation, - trace, - Span, - SpanKind, - SpanStatusCode, - ROOT_CONTEXT, - Link, - Context, -} from '@opentelemetry/api'; -import { timestampInSeconds } from '@sentry/core'; -import { - InstrumentationBase, - InstrumentationNodeModuleDefinition, - isWrapped, - safeExecuteInTheMiddle, - SemconvStability, - semconvStabilityFromStr, -} from '@opentelemetry/instrumentation'; -import { InstrumentationNodeModuleFile } from '../../InstrumentationNodeModuleFile'; -import { ATTR_MESSAGING_OPERATION } from './semconv'; -import { - ATTR_MESSAGING_DESTINATION, - ATTR_MESSAGING_DESTINATION_KIND, - ATTR_MESSAGING_RABBITMQ_ROUTING_KEY, - MESSAGING_DESTINATION_KIND_VALUE_TOPIC, - MESSAGING_OPERATION_VALUE_PROCESS, - OLD_ATTR_MESSAGING_MESSAGE_ID, - ATTR_MESSAGING_CONVERSATION_ID, -} from './semconv-obsolete'; -import type { Connection, Options, Replies } from './amqplib-types'; -import { AmqplibInstrumentationConfig, DEFAULT_CONFIG, EndOperation, type ConsumeMessage, type Message } from './types'; -import { - CHANNEL_CONSUME_TIMEOUT_TIMER, - CHANNEL_SPANS_NOT_ENDED, - CONNECTION_ATTRIBUTES, - getConnectionAttributesFromServer, - getConnectionAttributesFromUrl, - InstrumentationConnection, - InstrumentationConsumeChannel, - InstrumentationConsumeMessage, - InstrumentationMessage, - InstrumentationPublishChannel, - isConfirmChannelTracing, - markConfirmChannelTracing, - MESSAGE_STORED_SPAN, - normalizeExchange, - unmarkConfirmChannelTracing, -} from './utils'; - -import { SDK_VERSION } from '@sentry/core'; - -const PACKAGE_NAME = '@sentry/instrumentation-amqplib'; -const supportedVersions = ['>=0.5.5 <2']; - -export class AmqplibInstrumentation extends InstrumentationBase { - private _netSemconvStability!: SemconvStability; - - constructor(config: AmqplibInstrumentationConfig = {}) { - super(PACKAGE_NAME, SDK_VERSION, { ...DEFAULT_CONFIG, ...config }); - this._setSemconvStabilityFromEnv(); - } - - // Used for testing. - private _setSemconvStabilityFromEnv() { - this._netSemconvStability = semconvStabilityFromStr('http', process.env.OTEL_SEMCONV_STABILITY_OPT_IN); - } - - override setConfig(config: AmqplibInstrumentationConfig = {}) { - super.setConfig({ ...DEFAULT_CONFIG, ...config }); - } - - protected init() { - const channelModelModuleFile = new InstrumentationNodeModuleFile( - 'amqplib/lib/channel_model.js', - supportedVersions, - this.patchChannelModel.bind(this), - this.unpatchChannelModel.bind(this), - ); - - const callbackModelModuleFile = new InstrumentationNodeModuleFile( - 'amqplib/lib/callback_model.js', - supportedVersions, - this.patchChannelModel.bind(this), - this.unpatchChannelModel.bind(this), - ); - - const connectModuleFile = new InstrumentationNodeModuleFile( - 'amqplib/lib/connect.js', - supportedVersions, - this.patchConnect.bind(this), - this.unpatchConnect.bind(this), - ); - - const module = new InstrumentationNodeModuleDefinition('amqplib', supportedVersions, undefined, undefined, [ - channelModelModuleFile, - connectModuleFile, - callbackModelModuleFile, - ]); - return module; - } - - private patchConnect(moduleExports: any) { - moduleExports = this.unpatchConnect(moduleExports); - if (!isWrapped(moduleExports.connect)) { - this._wrap(moduleExports, 'connect', this.getConnectPatch.bind(this)); - } - return moduleExports; - } - - private unpatchConnect(moduleExports: any) { - if (isWrapped(moduleExports.connect)) { - this._unwrap(moduleExports, 'connect'); - } - return moduleExports; - } - - private patchChannelModel(moduleExports: any, moduleVersion: string | undefined) { - if (!isWrapped(moduleExports.Channel.prototype.publish)) { - this._wrap(moduleExports.Channel.prototype, 'publish', this.getPublishPatch.bind(this, moduleVersion)); - } - if (!isWrapped(moduleExports.Channel.prototype.consume)) { - this._wrap(moduleExports.Channel.prototype, 'consume', this.getConsumePatch.bind(this, moduleVersion)); - } - if (!isWrapped(moduleExports.Channel.prototype.ack)) { - this._wrap(moduleExports.Channel.prototype, 'ack', this.getAckPatch.bind(this, false, EndOperation.Ack)); - } - if (!isWrapped(moduleExports.Channel.prototype.nack)) { - this._wrap(moduleExports.Channel.prototype, 'nack', this.getAckPatch.bind(this, true, EndOperation.Nack)); - } - if (!isWrapped(moduleExports.Channel.prototype.reject)) { - this._wrap(moduleExports.Channel.prototype, 'reject', this.getAckPatch.bind(this, true, EndOperation.Reject)); - } - if (!isWrapped(moduleExports.Channel.prototype.ackAll)) { - this._wrap(moduleExports.Channel.prototype, 'ackAll', this.getAckAllPatch.bind(this, false, EndOperation.AckAll)); - } - if (!isWrapped(moduleExports.Channel.prototype.nackAll)) { - this._wrap( - moduleExports.Channel.prototype, - 'nackAll', - this.getAckAllPatch.bind(this, true, EndOperation.NackAll), - ); - } - if (!isWrapped(moduleExports.Channel.prototype.emit)) { - this._wrap(moduleExports.Channel.prototype, 'emit', this.getChannelEmitPatch.bind(this)); - } - if (!isWrapped(moduleExports.ConfirmChannel.prototype.publish)) { - this._wrap( - moduleExports.ConfirmChannel.prototype, - 'publish', - this.getConfirmedPublishPatch.bind(this, moduleVersion), - ); - } - return moduleExports; - } - - private unpatchChannelModel(moduleExports: any) { - if (isWrapped(moduleExports.Channel.prototype.publish)) { - this._unwrap(moduleExports.Channel.prototype, 'publish'); - } - if (isWrapped(moduleExports.Channel.prototype.consume)) { - this._unwrap(moduleExports.Channel.prototype, 'consume'); - } - if (isWrapped(moduleExports.Channel.prototype.ack)) { - this._unwrap(moduleExports.Channel.prototype, 'ack'); - } - if (isWrapped(moduleExports.Channel.prototype.nack)) { - this._unwrap(moduleExports.Channel.prototype, 'nack'); - } - if (isWrapped(moduleExports.Channel.prototype.reject)) { - this._unwrap(moduleExports.Channel.prototype, 'reject'); - } - if (isWrapped(moduleExports.Channel.prototype.ackAll)) { - this._unwrap(moduleExports.Channel.prototype, 'ackAll'); - } - if (isWrapped(moduleExports.Channel.prototype.nackAll)) { - this._unwrap(moduleExports.Channel.prototype, 'nackAll'); - } - if (isWrapped(moduleExports.Channel.prototype.emit)) { - this._unwrap(moduleExports.Channel.prototype, 'emit'); - } - if (isWrapped(moduleExports.ConfirmChannel.prototype.publish)) { - this._unwrap(moduleExports.ConfirmChannel.prototype, 'publish'); - } - return moduleExports; - } - - private getConnectPatch( - original: ( - url: string | Options.Connect, - socketOptions: any, - openCallback: (err: any, connection: Connection) => void, - ) => Connection, - ) { - const self = this; - return function patchedConnect( - this: unknown, - url: string | Options.Connect, - socketOptions: any, - openCallback: Function, - ) { - return original.call( - this, - url, - socketOptions, - function (this: unknown, err: any, conn: InstrumentationConnection) { - if (err == null) { - const urlAttributes = getConnectionAttributesFromUrl(url, self._netSemconvStability); - const serverAttributes = getConnectionAttributesFromServer(conn); - conn[CONNECTION_ATTRIBUTES] = { - ...urlAttributes, - ...serverAttributes, - }; - } - openCallback.apply(this, arguments); - }, - ); - }; - } - - private getChannelEmitPatch(original: Function) { - const self = this; - return function emit(this: InstrumentationConsumeChannel, eventName: string) { - if (eventName === 'close') { - self.endAllSpansOnChannel(this, true, EndOperation.ChannelClosed, undefined); - const activeTimer = this[CHANNEL_CONSUME_TIMEOUT_TIMER]; - if (activeTimer) { - clearInterval(activeTimer); - } - this[CHANNEL_CONSUME_TIMEOUT_TIMER] = undefined; - } else if (eventName === 'error') { - self.endAllSpansOnChannel(this, true, EndOperation.ChannelError, undefined); - } - return original.apply(this, arguments); - }; - } - - private getAckAllPatch(isRejected: boolean, endOperation: EndOperation, original: Function) { - const self = this; - return function ackAll(this: InstrumentationConsumeChannel, requeueOrEmpty?: boolean): void { - self.endAllSpansOnChannel(this, isRejected, endOperation, requeueOrEmpty); - return original.apply(this, arguments); - }; - } - - private getAckPatch(isRejected: boolean, endOperation: EndOperation, original: Function) { - const self = this; - return function ack( - this: InstrumentationConsumeChannel, - message: Message, - allUpToOrRequeue?: boolean, - requeue?: boolean, - ): void { - const channel = this; - // we use this patch in reject function as well, but it has different signature - const requeueResolved = endOperation === EndOperation.Reject ? allUpToOrRequeue : requeue; - - const spansNotEnded: { msg: Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; - const msgIndex = spansNotEnded.findIndex(msgDetails => msgDetails.msg === message); - if (msgIndex < 0) { - // should not happen in happy flow - // but possible if user is calling the api function ack twice with same message - self.endConsumerSpan(message, isRejected, endOperation, requeueResolved); - } else if (endOperation !== EndOperation.Reject && allUpToOrRequeue) { - for (let i = 0; i <= msgIndex; i++) { - self.endConsumerSpan(spansNotEnded[i]!.msg, isRejected, endOperation, requeueResolved); - } - spansNotEnded.splice(0, msgIndex + 1); - } else { - self.endConsumerSpan(message, isRejected, endOperation, requeueResolved); - spansNotEnded.splice(msgIndex, 1); - } - return original.apply(this, arguments); - }; - } - - private getConsumePatch(moduleVersion: string | undefined, original: Function) { - const self = this; - return function consume( - this: InstrumentationConsumeChannel, - queue: string, - onMessage: (msg: ConsumeMessage | null) => void, - options?: Options.Consume, - ): Promise { - const channel = this; - if (!Object.prototype.hasOwnProperty.call(channel, CHANNEL_SPANS_NOT_ENDED)) { - const { consumeTimeoutMs } = self.getConfig(); - if (consumeTimeoutMs) { - const timer = setInterval(() => { - self.checkConsumeTimeoutOnChannel(channel); - }, consumeTimeoutMs); - timer.unref(); - channel[CHANNEL_CONSUME_TIMEOUT_TIMER] = timer; - } - channel[CHANNEL_SPANS_NOT_ENDED] = []; - } - - const patchedOnMessage = function (this: unknown, msg: InstrumentationConsumeMessage | null) { - // msg is expected to be null for signaling consumer cancel notification - // https://www.rabbitmq.com/consumer-cancel.html - // in this case, we do not start a span, as this is not a real message. - if (!msg) { - return onMessage.call(this, msg); - } - - const headers = msg.properties.headers ?? {}; - let parentContext: Context | undefined = propagation.extract(ROOT_CONTEXT, headers); - const exchange = msg.fields?.exchange; - let links: Link[] | undefined; - if (self._config.useLinksForConsume) { - const parentSpanContext = parentContext ? trace.getSpan(parentContext)?.spanContext() : undefined; - parentContext = undefined; - if (parentSpanContext) { - links = [ - { - context: parentSpanContext, - }, - ]; - } - } - const span = self.tracer.startSpan( - `${queue} process`, - { - kind: SpanKind.CONSUMER, - attributes: { - ...channel?.connection?.[CONNECTION_ATTRIBUTES], - [ATTR_MESSAGING_DESTINATION]: exchange, - [ATTR_MESSAGING_DESTINATION_KIND]: MESSAGING_DESTINATION_KIND_VALUE_TOPIC, - [ATTR_MESSAGING_RABBITMQ_ROUTING_KEY]: msg.fields?.routingKey, - [ATTR_MESSAGING_OPERATION]: MESSAGING_OPERATION_VALUE_PROCESS, - [OLD_ATTR_MESSAGING_MESSAGE_ID]: msg?.properties.messageId, - [ATTR_MESSAGING_CONVERSATION_ID]: msg?.properties.correlationId, - }, - links, - }, - parentContext, - ); - - const { consumeHook } = self.getConfig(); - if (consumeHook) { - safeExecuteInTheMiddle( - () => consumeHook(span, { moduleVersion, msg }), - e => { - if (e) { - diag.error('amqplib instrumentation: consumerHook error', e); - } - }, - true, - ); - } - - if (!options?.noAck) { - // store the message on the channel so we can close the span on ackAll etc - channel[CHANNEL_SPANS_NOT_ENDED]!.push({ - msg, - timeOfConsume: timestampInSeconds(), - }); - - // store the span on the message, so we can end it when user call 'ack' on it - msg[MESSAGE_STORED_SPAN] = span; - } - const setContext: Context = parentContext ? parentContext : ROOT_CONTEXT; - context.with(trace.setSpan(setContext, span), () => { - onMessage.call(this, msg); - }); - - if (options?.noAck) { - self.callConsumeEndHook(span, msg, false, EndOperation.AutoAck); - span.end(); - } - }; - arguments[1] = patchedOnMessage; - return original.apply(this, arguments); - }; - } - - private getConfirmedPublishPatch(moduleVersion: string | undefined, original: Function) { - const self = this; - return function confirmedPublish( - this: InstrumentationConsumeChannel, - exchange: string, - routingKey: string, - content: Buffer, - options?: Options.Publish, - callback?: (err: any, ok: Replies.Empty) => void, - ): boolean { - const channel = this; - const { span, modifiedOptions } = self.createPublishSpan(self, exchange, routingKey, channel, options); - - const { publishHook } = self.getConfig(); - if (publishHook) { - safeExecuteInTheMiddle( - () => - publishHook(span, { - moduleVersion, - exchange, - routingKey, - content, - options: modifiedOptions, - isConfirmChannel: true, - }), - e => { - if (e) { - diag.error('amqplib instrumentation: publishHook error', e); - } - }, - true, - ); - } - - const patchedOnConfirm = function (this: unknown, err: any, ok: Replies.Empty) { - try { - callback?.call(this, err, ok); - } finally { - const { publishConfirmHook } = self.getConfig(); - if (publishConfirmHook) { - safeExecuteInTheMiddle( - () => - publishConfirmHook(span, { - moduleVersion, - exchange, - routingKey, - content, - options, - isConfirmChannel: true, - confirmError: err, - }), - e => { - if (e) { - diag.error('amqplib instrumentation: publishConfirmHook error', e); - } - }, - true, - ); - } - - if (err) { - span.setStatus({ - code: SpanStatusCode.ERROR, - message: "message confirmation has been nack'ed", - }); - } - span.end(); - } - }; - - // calling confirm channel publish function is storing the message in queue and registering the callback for broker confirm. - // span ends in the patched callback. - const markedContext = markConfirmChannelTracing(context.active()); - const argumentsCopy = [...arguments]; - argumentsCopy[3] = modifiedOptions; - argumentsCopy[4] = context.bind( - unmarkConfirmChannelTracing(trace.setSpan(markedContext, span)), - patchedOnConfirm, - ); - return context.with(markedContext, original.bind(this, ...argumentsCopy)); - }; - } - - private getPublishPatch(moduleVersion: string | undefined, original: Function) { - const self = this; - return function publish( - this: InstrumentationPublishChannel, - exchange: string, - routingKey: string, - content: Buffer, - options?: Options.Publish, - ): boolean { - if (isConfirmChannelTracing(context.active())) { - // work already done - return original.apply(this, arguments); - } else { - const channel = this; - const { span, modifiedOptions } = self.createPublishSpan(self, exchange, routingKey, channel, options); - - const { publishHook } = self.getConfig(); - if (publishHook) { - safeExecuteInTheMiddle( - () => - publishHook(span, { - moduleVersion, - exchange, - routingKey, - content, - options: modifiedOptions, - isConfirmChannel: false, - }), - e => { - if (e) { - diag.error('amqplib instrumentation: publishHook error', e); - } - }, - true, - ); - } - - // calling normal channel publish function is only storing the message in queue. - // it does not send it and waits for an ack, so the span duration is expected to be very short. - const argumentsCopy = [...arguments]; - argumentsCopy[3] = modifiedOptions; - const originalRes = original.apply(this, argumentsCopy as any); - span.end(); - return originalRes; - } - }; - } - - private createPublishSpan( - self: this, - exchange: string, - routingKey: string, - channel: InstrumentationPublishChannel, - options?: Options.Publish, - ) { - const normalizedExchange = normalizeExchange(exchange); - - const span = self.tracer.startSpan(`publish ${normalizedExchange}`, { - kind: SpanKind.PRODUCER, - attributes: { - ...channel.connection[CONNECTION_ATTRIBUTES], - [ATTR_MESSAGING_DESTINATION]: exchange, - [ATTR_MESSAGING_DESTINATION_KIND]: MESSAGING_DESTINATION_KIND_VALUE_TOPIC, - - [ATTR_MESSAGING_RABBITMQ_ROUTING_KEY]: routingKey, - [OLD_ATTR_MESSAGING_MESSAGE_ID]: options?.messageId, - [ATTR_MESSAGING_CONVERSATION_ID]: options?.correlationId, - }, - }); - const modifiedOptions = options ?? {}; - modifiedOptions.headers = modifiedOptions.headers ?? {}; - - propagation.inject(trace.setSpan(context.active(), span), modifiedOptions.headers); - - return { span, modifiedOptions }; - } - - private endConsumerSpan( - message: InstrumentationMessage, - isRejected: boolean | null, - operation: EndOperation, - requeue: boolean | undefined, - ) { - const storedSpan: Span | undefined = message[MESSAGE_STORED_SPAN]; - if (!storedSpan) return; - if (isRejected !== false) { - storedSpan.setStatus({ - code: SpanStatusCode.ERROR, - message: - operation !== EndOperation.ChannelClosed && operation !== EndOperation.ChannelError - ? `${operation} called on message${ - requeue === true ? ' with requeue' : requeue === false ? ' without requeue' : '' - }` - : operation, - }); - } - this.callConsumeEndHook(storedSpan, message, isRejected, operation); - storedSpan.end(); - message[MESSAGE_STORED_SPAN] = undefined; - } - - private endAllSpansOnChannel( - channel: InstrumentationConsumeChannel, - isRejected: boolean, - operation: EndOperation, - requeue: boolean | undefined, - ) { - const spansNotEnded: { msg: Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; - spansNotEnded.forEach(msgDetails => { - this.endConsumerSpan(msgDetails.msg, isRejected, operation, requeue); - }); - channel[CHANNEL_SPANS_NOT_ENDED] = []; - } - - private callConsumeEndHook( - span: Span, - msg: InstrumentationMessage, - rejected: boolean | null, - endOperation: EndOperation, - ) { - const { consumeEndHook } = this.getConfig(); - if (!consumeEndHook) return; - - safeExecuteInTheMiddle( - () => consumeEndHook(span, { msg, rejected, endOperation }), - e => { - if (e) { - diag.error('amqplib instrumentation: consumerEndHook error', e); - } - }, - true, - ); - } - - private checkConsumeTimeoutOnChannel(channel: InstrumentationConsumeChannel) { - const currentTime = timestampInSeconds(); - const spansNotEnded = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; - let i: number; - const { consumeTimeoutMs } = this.getConfig(); - for (i = 0; i < spansNotEnded.length; i++) { - const currMessage = spansNotEnded[i]!; - const timeFromConsumeMs = (currentTime - currMessage.timeOfConsume) * 1000; - if (timeFromConsumeMs < consumeTimeoutMs!) { - break; - } - this.endConsumerSpan(currMessage.msg, null, EndOperation.InstrumentationTimeout, true); - } - spansNotEnded.splice(0, i); - } -} diff --git a/packages/node/src/integrations/tracing/amqplib/vendored/instrumentation.ts b/packages/node/src/integrations/tracing/amqplib/vendored/instrumentation.ts new file mode 100644 index 000000000000..e4a7d630e2f5 --- /dev/null +++ b/packages/node/src/integrations/tracing/amqplib/vendored/instrumentation.ts @@ -0,0 +1,143 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + * + * NOTICE from the Sentry authors: + * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-amqplib + * - Upstream version: @opentelemetry/instrumentation-amqplib@0.65.0 + * - Refactored to use Sentry's span APIs instead of OpenTelemetry tracing APIs; the channel/connection + * patches live in `patches.ts` and span creation in `utils.ts` + * - Dropped the instrumentation config and all hooks; origin is folded into span creation instead of + * `index.ts` hooks (see `patches.ts`/`utils.ts` for details) + */ + +import type { InstrumentationConfig } from '@opentelemetry/instrumentation'; +import { InstrumentationBase, InstrumentationNodeModuleDefinition, isWrapped } from '@opentelemetry/instrumentation'; +import { SDK_VERSION } from '@sentry/core'; +import { InstrumentationNodeModuleFile } from '../../InstrumentationNodeModuleFile'; +import { + getAckAllPatch, + getAckPatch, + getChannelEmitPatch, + getConfirmedPublishPatch, + getConnectPatch, + getConsumePatch, + getPublishPatch, +} from './patches'; +import { EndOperation } from './types'; + +const PACKAGE_NAME = '@sentry/instrumentation-amqplib'; +const supportedVersions = ['>=0.5.5 <2']; + +export class AmqplibInstrumentation extends InstrumentationBase { + public constructor(config: InstrumentationConfig = {}) { + super(PACKAGE_NAME, SDK_VERSION, config); + } + + protected init(): InstrumentationNodeModuleDefinition { + const channelModelModuleFile = new InstrumentationNodeModuleFile( + 'amqplib/lib/channel_model.js', + supportedVersions, + this.patchChannelModel.bind(this), + this.unpatchChannelModel.bind(this), + ); + + const callbackModelModuleFile = new InstrumentationNodeModuleFile( + 'amqplib/lib/callback_model.js', + supportedVersions, + this.patchChannelModel.bind(this), + this.unpatchChannelModel.bind(this), + ); + + const connectModuleFile = new InstrumentationNodeModuleFile( + 'amqplib/lib/connect.js', + supportedVersions, + this.patchConnect.bind(this), + this.unpatchConnect.bind(this), + ); + + const module = new InstrumentationNodeModuleDefinition('amqplib', supportedVersions, undefined, undefined, [ + channelModelModuleFile, + connectModuleFile, + callbackModelModuleFile, + ]); + return module; + } + + private patchConnect(moduleExports: any): any { + const unpatchedExports = this.unpatchConnect(moduleExports); + if (!isWrapped(unpatchedExports.connect)) { + this._wrap(unpatchedExports, 'connect', getConnectPatch); + } + return unpatchedExports; + } + + private unpatchConnect(moduleExports: any): any { + if (isWrapped(moduleExports.connect)) { + this._unwrap(moduleExports, 'connect'); + } + return moduleExports; + } + + private patchChannelModel(moduleExports: any): any { + if (!isWrapped(moduleExports.Channel.prototype.publish)) { + this._wrap(moduleExports.Channel.prototype, 'publish', getPublishPatch); + } + if (!isWrapped(moduleExports.Channel.prototype.consume)) { + this._wrap(moduleExports.Channel.prototype, 'consume', getConsumePatch); + } + if (!isWrapped(moduleExports.Channel.prototype.ack)) { + this._wrap(moduleExports.Channel.prototype, 'ack', getAckPatch(false, EndOperation.Ack)); + } + if (!isWrapped(moduleExports.Channel.prototype.nack)) { + this._wrap(moduleExports.Channel.prototype, 'nack', getAckPatch(true, EndOperation.Nack)); + } + if (!isWrapped(moduleExports.Channel.prototype.reject)) { + this._wrap(moduleExports.Channel.prototype, 'reject', getAckPatch(true, EndOperation.Reject)); + } + if (!isWrapped(moduleExports.Channel.prototype.ackAll)) { + this._wrap(moduleExports.Channel.prototype, 'ackAll', getAckAllPatch(false, EndOperation.AckAll)); + } + if (!isWrapped(moduleExports.Channel.prototype.nackAll)) { + this._wrap(moduleExports.Channel.prototype, 'nackAll', getAckAllPatch(true, EndOperation.NackAll)); + } + if (!isWrapped(moduleExports.Channel.prototype.emit)) { + this._wrap(moduleExports.Channel.prototype, 'emit', getChannelEmitPatch); + } + if (!isWrapped(moduleExports.ConfirmChannel.prototype.publish)) { + this._wrap(moduleExports.ConfirmChannel.prototype, 'publish', getConfirmedPublishPatch); + } + return moduleExports; + } + + private unpatchChannelModel(moduleExports: any): any { + if (isWrapped(moduleExports.Channel.prototype.publish)) { + this._unwrap(moduleExports.Channel.prototype, 'publish'); + } + if (isWrapped(moduleExports.Channel.prototype.consume)) { + this._unwrap(moduleExports.Channel.prototype, 'consume'); + } + if (isWrapped(moduleExports.Channel.prototype.ack)) { + this._unwrap(moduleExports.Channel.prototype, 'ack'); + } + if (isWrapped(moduleExports.Channel.prototype.nack)) { + this._unwrap(moduleExports.Channel.prototype, 'nack'); + } + if (isWrapped(moduleExports.Channel.prototype.reject)) { + this._unwrap(moduleExports.Channel.prototype, 'reject'); + } + if (isWrapped(moduleExports.Channel.prototype.ackAll)) { + this._unwrap(moduleExports.Channel.prototype, 'ackAll'); + } + if (isWrapped(moduleExports.Channel.prototype.nackAll)) { + this._unwrap(moduleExports.Channel.prototype, 'nackAll'); + } + if (isWrapped(moduleExports.Channel.prototype.emit)) { + this._unwrap(moduleExports.Channel.prototype, 'emit'); + } + if (isWrapped(moduleExports.ConfirmChannel.prototype.publish)) { + this._unwrap(moduleExports.ConfirmChannel.prototype, 'publish'); + } + return moduleExports; + } +} diff --git a/packages/node/src/integrations/tracing/amqplib/vendored/patches.ts b/packages/node/src/integrations/tracing/amqplib/vendored/patches.ts new file mode 100644 index 000000000000..cf1233f47bf7 --- /dev/null +++ b/packages/node/src/integrations/tracing/amqplib/vendored/patches.ts @@ -0,0 +1,299 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + * + * NOTICE from the Sentry authors: + * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-amqplib + * - Upstream version: @opentelemetry/instrumentation-amqplib@0.65.0 + * - The channel/connection patches were extracted from the instrumentation class into standalone factories + * and migrated to Sentry's span APIs; origin is folded into span creation instead of `index.ts` hooks + * - Cross-service trace propagation uses Sentry's `getTraceData`/`continueTrace` instead of the OTel propagator + * - Replaced the OTel context-key confirm-channel marker with a synchronous flag on the channel instance + * - Dropped the instrumentation config and all hooks (publish/publishConfirm/consume/consumeEnd) and the + * `useLinksForConsume` path; the SDK never used them + */ + +import { continueTrace, SPAN_STATUS_ERROR, timestampInSeconds, withActiveSpan } from '@sentry/core'; +import type { Connection, Options, Replies } from './amqplib-types'; +import { EndOperation, type ConsumeMessage, type Message } from './types'; +import type { + InstrumentationConnection, + InstrumentationConsumeChannel, + InstrumentationConsumeMessage, + InstrumentationMessage, + InstrumentationPublishChannel, +} from './utils'; +import { + CHANNEL_CONSUME_TIMEOUT_TIMER, + CHANNEL_IS_CONFIRM_PUBLISHING, + CHANNEL_SPANS_NOT_ENDED, + CONNECTION_ATTRIBUTES, + getConnectionAttributesFromServer, + getConnectionAttributesFromUrl, + getHeaderAsString, + MESSAGE_STORED_SPAN, + startConsumeSpan, + startPublishSpan, +} from './utils'; + +// To prevent reference leaks from un-acked messages, their spans are closed after this timeout. The +// upstream instrumentation exposed this as the `consumeTimeoutMs` option; the SDK always used the default. +const CONSUME_TIMEOUT_MS = 1000 * 60; // 1 minute + +function endConsumerSpan( + message: InstrumentationMessage, + isRejected: boolean | null, + operation: EndOperation, + requeue: boolean | undefined, +): void { + const storedSpan = message[MESSAGE_STORED_SPAN]; + if (!storedSpan) { + return; + } + if (isRejected !== false) { + storedSpan.setStatus({ + code: SPAN_STATUS_ERROR, + message: + operation !== EndOperation.ChannelClosed && operation !== EndOperation.ChannelError + ? `${operation} called on message${ + requeue === true ? ' with requeue' : requeue === false ? ' without requeue' : '' + }` + : operation, + }); + } + storedSpan.end(); + message[MESSAGE_STORED_SPAN] = undefined; +} + +function endAllSpansOnChannel( + channel: InstrumentationConsumeChannel, + isRejected: boolean, + operation: EndOperation, + requeue: boolean | undefined, +): void { + const spansNotEnded: { msg: Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; + spansNotEnded.forEach(msgDetails => { + endConsumerSpan(msgDetails.msg, isRejected, operation, requeue); + }); + channel[CHANNEL_SPANS_NOT_ENDED] = []; +} + +function checkConsumeTimeoutOnChannel(channel: InstrumentationConsumeChannel): void { + const currentTime = timestampInSeconds(); + const spansNotEnded = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; + let i: number; + for (i = 0; i < spansNotEnded.length; i++) { + const currMessage = spansNotEnded[i]!; + const timeFromConsumeMs = (currentTime - currMessage.timeOfConsume) * 1000; + if (timeFromConsumeMs < CONSUME_TIMEOUT_MS) { + break; + } + endConsumerSpan(currMessage.msg, null, EndOperation.InstrumentationTimeout, true); + } + spansNotEnded.splice(0, i); +} + +export function getConnectPatch( + original: ( + url: string | Options.Connect, + socketOptions: any, + openCallback: (err: any, connection: Connection) => void, + ) => Connection, +) { + return function patchedConnect( + this: unknown, + url: string | Options.Connect, + socketOptions: any, + openCallback: Function, + ): Connection { + return original.call(this, url, socketOptions, function (this: unknown, err: any, conn: InstrumentationConnection) { + if (err == null) { + const urlAttributes = getConnectionAttributesFromUrl(url); + const serverAttributes = getConnectionAttributesFromServer(conn); + conn[CONNECTION_ATTRIBUTES] = { + ...urlAttributes, + ...serverAttributes, + }; + } + openCallback.apply(this, arguments); + }); + }; +} + +export function getChannelEmitPatch(original: Function) { + return function emit(this: InstrumentationConsumeChannel, eventName: string): void { + if (eventName === 'close') { + endAllSpansOnChannel(this, true, EndOperation.ChannelClosed, undefined); + const activeTimer = this[CHANNEL_CONSUME_TIMEOUT_TIMER]; + if (activeTimer) { + clearInterval(activeTimer); + } + this[CHANNEL_CONSUME_TIMEOUT_TIMER] = undefined; + } else if (eventName === 'error') { + endAllSpansOnChannel(this, true, EndOperation.ChannelError, undefined); + } + return original.apply(this, arguments); + }; +} + +export function getAckAllPatch(isRejected: boolean, endOperation: EndOperation) { + return (original: Function) => + function ackAll(this: InstrumentationConsumeChannel, requeueOrEmpty?: boolean): void { + endAllSpansOnChannel(this, isRejected, endOperation, requeueOrEmpty); + return original.apply(this, arguments); + }; +} + +export function getAckPatch(isRejected: boolean, endOperation: EndOperation) { + return (original: Function) => + function ack( + this: InstrumentationConsumeChannel, + message: Message, + allUpToOrRequeue?: boolean, + requeue?: boolean, + ): void { + const channel = this; + // we use this patch in the reject function as well, but it has a different signature + const requeueResolved = endOperation === EndOperation.Reject ? allUpToOrRequeue : requeue; + + const spansNotEnded: { msg: Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; + const msgIndex = spansNotEnded.findIndex(msgDetails => msgDetails.msg === message); + if (msgIndex < 0) { + // should not happen in the happy flow, but possible if the user calls ack twice with the same message + endConsumerSpan(message, isRejected, endOperation, requeueResolved); + } else if (endOperation !== EndOperation.Reject && allUpToOrRequeue) { + for (let i = 0; i <= msgIndex; i++) { + endConsumerSpan(spansNotEnded[i]!.msg, isRejected, endOperation, requeueResolved); + } + spansNotEnded.splice(0, msgIndex + 1); + } else { + endConsumerSpan(message, isRejected, endOperation, requeueResolved); + spansNotEnded.splice(msgIndex, 1); + } + return original.apply(this, arguments); + }; +} + +export function getConsumePatch(original: Function) { + return function consume( + this: InstrumentationConsumeChannel, + queue: string, + onMessage: (msg: ConsumeMessage | null) => void, + options?: Options.Consume, + ): Promise { + const channel = this; + if (!Object.prototype.hasOwnProperty.call(channel, CHANNEL_SPANS_NOT_ENDED)) { + const timer = setInterval(() => { + checkConsumeTimeoutOnChannel(channel); + }, CONSUME_TIMEOUT_MS); + timer.unref(); + channel[CHANNEL_CONSUME_TIMEOUT_TIMER] = timer; + channel[CHANNEL_SPANS_NOT_ENDED] = []; + } + + const patchedOnMessage = function (this: unknown, msg: InstrumentationConsumeMessage | null): void { + // msg is expected to be null for a consumer cancel notification + // https://www.rabbitmq.com/consumer-cancel.html + // in this case, we do not start a span, as this is not a real message. + if (!msg) { + return onMessage.call(this, msg); + } + + const headers = msg.properties.headers ?? {}; + const sentryTrace = getHeaderAsString(headers, 'sentry-trace'); + const baggage = getHeaderAsString(headers, 'baggage'); + + // Continue the producer's trace so the consumer span is parented to the message's producer. + continueTrace({ sentryTrace, baggage }, () => { + const span = startConsumeSpan(queue, msg, channel); + + if (!options?.noAck) { + // store the message on the channel so we can close the span on ackAll etc + channel[CHANNEL_SPANS_NOT_ENDED]!.push({ msg, timeOfConsume: timestampInSeconds() }); + // store the span on the message so we can end it when the user calls 'ack' on it + msg[MESSAGE_STORED_SPAN] = span; + } + withActiveSpan(span, () => { + onMessage.call(this, msg); + }); + + if (options?.noAck) { + span.end(); + } + }); + }; + + // Copy `arguments` instead of mutating it: in CJS builds it's aliased to the named parameters, + // so `arguments[1] = ...` would also reassign `onMessage` to `patchedOnMessage`, making the wrapper + // call itself and recurse infinitely. + const callArgs = Array.prototype.slice.call(arguments); + callArgs[1] = patchedOnMessage; + return original.apply(this, callArgs); + }; +} + +export function getConfirmedPublishPatch(original: Function) { + return function confirmedPublish( + this: InstrumentationPublishChannel, + exchange: string, + routingKey: string, + content: Buffer, + options?: Options.Publish, + callback?: (err: any, ok: Replies.Empty) => void, + ): boolean { + const channel = this; + const { span, modifiedOptions } = startPublishSpan(exchange, routingKey, channel, options); + + const patchedOnConfirm = function (this: unknown, err: any, ok: Replies.Empty): void { + try { + withActiveSpan(span, () => { + callback?.call(this, err, ok); + }); + } finally { + if (err) { + span.setStatus({ code: SPAN_STATUS_ERROR, message: "message confirmation has been nack'ed" }); + } + span.end(); + } + }; + + // The confirm channel publish stores the message and registers a broker-confirm callback; the span + // ends in that callback. The confirm publish internally delegates to the base channel publish, so we + // flag the channel to stop the base publish patch from creating a second span. The inner call is + // synchronous, so a flag on the instance is enough and avoids the OTel context machinery. + const argumentsCopy = [...arguments]; + argumentsCopy[3] = modifiedOptions; + argumentsCopy[4] = patchedOnConfirm; + channel[CHANNEL_IS_CONFIRM_PUBLISHING] = true; + try { + return original.apply(this, argumentsCopy); + } finally { + channel[CHANNEL_IS_CONFIRM_PUBLISHING] = false; + } + }; +} + +export function getPublishPatch(original: Function) { + return function publish( + this: InstrumentationPublishChannel, + exchange: string, + routingKey: string, + content: Buffer, + options?: Options.Publish, + ): boolean { + if (this[CHANNEL_IS_CONFIRM_PUBLISHING]) { + // already instrumented by the confirm-channel publish patch + return original.apply(this, arguments); + } + const channel = this; + const { span, modifiedOptions } = startPublishSpan(exchange, routingKey, channel, options); + + // calling the normal channel publish function only stores the message in the queue; it does not send + // it and wait for an ack, so the span duration is expected to be very short. + const argumentsCopy = [...arguments]; + argumentsCopy[3] = modifiedOptions; + const originalRes = original.apply(this, argumentsCopy); + span.end(); + return originalRes; + }; +} diff --git a/packages/node/src/integrations/tracing/amqplib/vendored/semconv-obsolete.ts b/packages/node/src/integrations/tracing/amqplib/vendored/semconv-obsolete.ts deleted file mode 100644 index 9c9a10e68456..000000000000 --- a/packages/node/src/integrations/tracing/amqplib/vendored/semconv-obsolete.ts +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - * - * NOTICE from the Sentry authors: - * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-amqplib - * - Upstream version: @opentelemetry/instrumentation-amqplib@0.65.0 - */ -/* eslint-disable */ - -/* - * This file contains constants for values that where replaced/removed from - * Semantic Conventions long enough ago that they do not have `ATTR_*` - * constants in the `@opentelemetry/semantic-conventions` package. Eventually - * it is expected that this instrumention will be updated to emit telemetry - * using modern Semantic Conventions, dropping the need for the constants in - * this file. - */ - -/** - * The message destination name. This might be equal to the span name but is required nevertheless. - * - * @deprecated Use ATTR_MESSAGING_DESTINATION_NAME in [incubating entry-point]({@link https://github.com/open-telemetry/opentelemetry-js/blob/main/semantic-conventions/README.md#unstable-semconv}). - */ -export const ATTR_MESSAGING_DESTINATION = 'messaging.destination' as const; - -/** - * The kind of message destination. - * - * @deprecated Removed in semconv v1.20.0. - */ -export const ATTR_MESSAGING_DESTINATION_KIND = 'messaging.destination_kind' as const; - -/** - * RabbitMQ message routing key. - * - * @deprecated Use ATTR_MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY in [incubating entry-point]({@link https://github.com/open-telemetry/opentelemetry-js/blob/main/semantic-conventions/README.md#unstable-semconv}). - */ -export const ATTR_MESSAGING_RABBITMQ_ROUTING_KEY = 'messaging.rabbitmq.routing_key' as const; - -/** - * A string identifying the kind of message consumption as defined in the [Operation names](#operation-names) section above. If the operation is "send", this attribute MUST NOT be set, since the operation can be inferred from the span kind in that case. - * - * @deprecated Use MESSAGING_OPERATION_TYPE_VALUE_PROCESS in [incubating entry-point]({@link https://github.com/open-telemetry/opentelemetry-js/blob/main/semantic-conventions/README.md#unstable-semconv}). - */ -export const MESSAGING_OPERATION_VALUE_PROCESS = 'process' as const; - -/** - * The name of the transport protocol. - * - * @deprecated Use ATTR_NETWORK_PROTOCOL_NAME. - */ -export const ATTR_MESSAGING_PROTOCOL = 'messaging.protocol' as const; - -/** - * The version of the transport protocol. - * - * @deprecated Use ATTR_NETWORK_PROTOCOL_VERSION. - */ -export const ATTR_MESSAGING_PROTOCOL_VERSION = 'messaging.protocol_version' as const; - -/** - * Connection string. - * - * @deprecated Removed in semconv v1.17.0. - */ -export const ATTR_MESSAGING_URL = 'messaging.url' as const; - -/** - * The kind of message destination. - * - * @deprecated Removed in semconv v1.20.0. - */ -export const MESSAGING_DESTINATION_KIND_VALUE_TOPIC = 'topic' as const; - -/** - * A value used by the messaging system as an identifier for the message, represented as a string. - * - * @deprecated Use ATTR_MESSAGING_MESSAGE_ID in [incubating entry-point]({@link https://github.com/open-telemetry/opentelemetry-js/blob/main/semantic-conventions/README.md#unstable-semconv}). - * - * Note: changing to `ATTR_MESSAGING_MESSAGE_ID` means a change in value from `messaging.message_id` to `messaging.message.id`. - */ -export const OLD_ATTR_MESSAGING_MESSAGE_ID = 'messaging.message_id' as const; - -/** - * The [conversation ID](#conversations) identifying the conversation to which the message belongs, represented as a string. Sometimes called "Correlation ID". - * - * @deprecated Use ATTR_MESSAGING_MESSAGE_CONVERSATION_ID in [incubating entry-point]({@link https://github.com/open-telemetry/opentelemetry-js/blob/main/semantic-conventions/README.md#unstable-semconv}). - */ -export const ATTR_MESSAGING_CONVERSATION_ID = 'messaging.conversation_id' as const; diff --git a/packages/node/src/integrations/tracing/amqplib/vendored/semconv.ts b/packages/node/src/integrations/tracing/amqplib/vendored/semconv.ts index e72194c62ad3..32248dfbf215 100644 --- a/packages/node/src/integrations/tracing/amqplib/vendored/semconv.ts +++ b/packages/node/src/integrations/tracing/amqplib/vendored/semconv.ts @@ -5,55 +5,50 @@ * NOTICE from the Sentry authors: * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-amqplib * - Upstream version: @opentelemetry/instrumentation-amqplib@0.65.0 + * - Merged the upstream `semconv.ts` and `semconv-obsolete.ts` into a single file containing only the + * constants this instrumentation emits. These mirror the (now legacy) messaging semantic conventions + * that the SDK has always emitted; the `@deprecated` annotations were dropped since these vendored + * copies are intentionally the chosen output and we don't want to flag every usage site. */ -/* eslint-disable */ -/* - * This file contains a copy of unstable semantic convention definitions - * used by this package. - * @see https://github.com/open-telemetry/opentelemetry-js/tree/main/semantic-conventions#unstable-semconv - */ +/** The messaging system as identified by the client instrumentation, e.g. `rabbitmq`. */ +export const ATTR_MESSAGING_SYSTEM = 'messaging.system' as const; -/** - * Deprecated, use `messaging.operation.type` instead. - * - * @example publish - * @example create - * @example process - * - * @experimental This attribute is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. - * - * @deprecated Replaced by `messaging.operation.type`. - */ +/** A string identifying the kind of message consumption. */ export const ATTR_MESSAGING_OPERATION = 'messaging.operation' as const; -/** - * The messaging system as identified by the client instrumentation. - * - * @note The actual messaging system may differ from the one known by the client. For example, when using Kafka client libraries to communicate with Azure Event Hubs, the `messaging.system` is set to `kafka` based on the instrumentation's best knowledge. - * - * @experimental This attribute is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. - */ -export const ATTR_MESSAGING_SYSTEM = 'messaging.system' as const; - -/** - * Deprecated, use `server.address` on client spans and `client.address` on server spans. - * - * @example example.com - * - * @experimental This attribute is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. - * - * @deprecated Replaced by `server.address` on client spans and `client.address` on server spans. - */ +/** Host name of the message broker. */ export const ATTR_NET_PEER_NAME = 'net.peer.name' as const; -/** - * Deprecated, use `server.port` on client spans and `client.port` on server spans. - * - * @example 8080 - * - * @experimental This attribute is experimental and is subject to breaking changes in minor releases of `@opentelemetry/semantic-conventions`. - * - * @deprecated Replaced by `server.port` on client spans and `client.port` on server spans. - */ +/** Port of the message broker. */ export const ATTR_NET_PEER_PORT = 'net.peer.port' as const; + +/** The message destination name (the exchange for amqplib). */ +export const ATTR_MESSAGING_DESTINATION = 'messaging.destination' as const; + +/** The kind of message destination. */ +export const ATTR_MESSAGING_DESTINATION_KIND = 'messaging.destination_kind' as const; + +/** RabbitMQ message routing key. */ +export const ATTR_MESSAGING_RABBITMQ_ROUTING_KEY = 'messaging.rabbitmq.routing_key' as const; + +/** The name of the transport protocol. */ +export const ATTR_MESSAGING_PROTOCOL = 'messaging.protocol' as const; + +/** The version of the transport protocol. */ +export const ATTR_MESSAGING_PROTOCOL_VERSION = 'messaging.protocol_version' as const; + +/** Connection string. */ +export const ATTR_MESSAGING_URL = 'messaging.url' as const; + +/** A value used by the messaging system as an identifier for the message, represented as a string. */ +export const OLD_ATTR_MESSAGING_MESSAGE_ID = 'messaging.message_id' as const; + +/** The conversation ID (a.k.a. correlation ID) identifying the conversation the message belongs to. */ +export const ATTR_MESSAGING_CONVERSATION_ID = 'messaging.conversation_id' as const; + +/** Value for `messaging.destination_kind` when the destination is a topic. */ +export const MESSAGING_DESTINATION_KIND_VALUE_TOPIC = 'topic' as const; + +/** Value for `messaging.operation` when the message is being processed by a consumer. */ +export const MESSAGING_OPERATION_VALUE_PROCESS = 'process' as const; diff --git a/packages/node/src/integrations/tracing/amqplib/vendored/types.ts b/packages/node/src/integrations/tracing/amqplib/vendored/types.ts index 228ada2239fc..1b444d6723c1 100644 --- a/packages/node/src/integrations/tracing/amqplib/vendored/types.ts +++ b/packages/node/src/integrations/tracing/amqplib/vendored/types.ts @@ -5,51 +5,9 @@ * NOTICE from the Sentry authors: * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-amqplib * - Upstream version: @opentelemetry/instrumentation-amqplib@0.65.0 + * - Some types vendored from @types/amqplib with simplifications + * - Dropped the instrumentation config and all hooks; the SDK folds origin into span creation instead */ -/* eslint-disable */ - -import { Span } from '@opentelemetry/api'; -import { InstrumentationConfig } from '@opentelemetry/instrumentation'; - -export interface PublishInfo { - moduleVersion: string | undefined; - exchange: string; - routingKey: string; - content: Buffer; - options?: AmqplibPublishOptions; - isConfirmChannel?: boolean; -} - -export interface PublishConfirmedInfo extends PublishInfo { - confirmError?: any; -} - -export interface ConsumeInfo { - moduleVersion: string | undefined; - msg: ConsumeMessage; -} - -export interface ConsumeEndInfo { - msg: ConsumeMessage; - rejected: boolean | null; - endOperation: EndOperation; -} - -export interface AmqplibPublishCustomAttributeFunction { - (span: Span, publishInfo: PublishInfo): void; -} - -export interface AmqplibPublishConfirmCustomAttributeFunction { - (span: Span, publishConfirmedInto: PublishConfirmedInfo): void; -} - -export interface AmqplibConsumeCustomAttributeFunction { - (span: Span, consumeInfo: ConsumeInfo): void; -} - -export interface AmqplibConsumeEndCustomAttributeFunction { - (span: Span, consumeEndInfo: ConsumeEndInfo): void; -} export enum EndOperation { AutoAck = 'auto ack', @@ -63,71 +21,8 @@ export enum EndOperation { InstrumentationTimeout = 'instrumentation timeout', } -export interface AmqplibInstrumentationConfig extends InstrumentationConfig { - /** hook for adding custom attributes before publish message is sent */ - publishHook?: AmqplibPublishCustomAttributeFunction; - - /** hook for adding custom attributes after publish message is confirmed by the broker */ - publishConfirmHook?: AmqplibPublishConfirmCustomAttributeFunction; - - /** hook for adding custom attributes before consumer message is processed */ - consumeHook?: AmqplibConsumeCustomAttributeFunction; - - /** hook for adding custom attributes after consumer message is acked to server */ - consumeEndHook?: AmqplibConsumeEndCustomAttributeFunction; - - /** - * When user is setting up consume callback, it is user's responsibility to call - * ack/nack etc on the msg to resolve it in the server. - * If user is not calling the ack, the message will stay in the queue until - * channel is closed, or until server timeout expires (if configured). - * While we wait for the ack, a reference to the message is stored in plugin, which - * will never be garbage collected. - * To prevent memory leak, plugin has it's own configuration of timeout, which - * will close the span if user did not call ack after this timeout. - * If timeout is not big enough, span might be closed with 'InstrumentationTimeout', - * and then received valid ack from the user later which will not be instrumented. - * - * Default is 1 minute - */ - consumeTimeoutMs?: number; - - /** option to use a span link for the consume message instead of continuing a trace */ - useLinksForConsume?: boolean; -} - -export const DEFAULT_CONFIG: AmqplibInstrumentationConfig = { - consumeTimeoutMs: 1000 * 60, // 1 minute - useLinksForConsume: false, -}; - // The following types are vendored from `@types/amqplib@0.10.1` - commit SHA: 4205e03127692a40b4871709a7134fe4e2ed5510 -// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/properties.d.ts#L108 -// This exists in `@types/amqplib` as `Options.Publish`. We're renaming things -// here to avoid importing the whole Options namespace. -export interface AmqplibPublishOptions { - expiration?: string | number; - userId?: string; - CC?: string | string[]; - - mandatory?: boolean; - persistent?: boolean; - deliveryMode?: boolean | number; - BCC?: string | string[]; - - contentType?: string; - contentEncoding?: string; - headers?: any; - priority?: number; - correlationId?: string; - replyTo?: string; - messageId?: string; - timestamp?: number; - type?: string; - appId?: string; -} - // Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/properties.d.ts#L142 export interface Message { content: Buffer; diff --git a/packages/node/src/integrations/tracing/amqplib/vendored/utils.ts b/packages/node/src/integrations/tracing/amqplib/vendored/utils.ts index 8817b5750382..6edf577d03d8 100644 --- a/packages/node/src/integrations/tracing/amqplib/vendored/utils.ts +++ b/packages/node/src/integrations/tracing/amqplib/vendored/utils.ts @@ -6,29 +6,51 @@ * - Vendored from: https://github.com/open-telemetry/opentelemetry-js-contrib/tree/15ef7506553f631ea4181391e0c5725a56f0d082/packages/instrumentation-amqplib * - Upstream version: @opentelemetry/instrumentation-amqplib@0.65.0 * - Some types vendored from @types/amqplib with simplifications + * - Span creation extracted here and migrated to the @sentry/core API; origin folded into span creation + * - Cross-service trace propagation uses Sentry's `getTraceData` instead of the OTel propagator + * - Dropped the env-gated stable-semconv dual emission; only the (default) old semantic conventions are emitted + * - Replaced the OTel context-key confirm-channel marker with a synchronous flag on the channel instance */ -/* eslint-disable */ - -import { Context, createContextKey, diag, Span, Attributes, AttributeValue } from '@opentelemetry/api'; -import { SemconvStability } from '@opentelemetry/instrumentation'; -import { ATTR_SERVER_ADDRESS, ATTR_SERVER_PORT } from '@opentelemetry/semantic-conventions'; -import { ATTR_MESSAGING_SYSTEM, ATTR_NET_PEER_NAME, ATTR_NET_PEER_PORT } from './semconv'; -import { ATTR_MESSAGING_PROTOCOL, ATTR_MESSAGING_PROTOCOL_VERSION, ATTR_MESSAGING_URL } from './semconv-obsolete'; -import type { Connection, Channel, ConfirmChannel, Options } from './amqplib-types'; + +import { SpanKind } from '@opentelemetry/api'; +import type { Span, SpanAttributes } from '@sentry/core'; +import { getTraceData, SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN, startInactiveSpan } from '@sentry/core'; +import type { Channel, ConfirmChannel, Connection, Options } from './amqplib-types'; +import { + ATTR_MESSAGING_CONVERSATION_ID, + ATTR_MESSAGING_DESTINATION, + ATTR_MESSAGING_DESTINATION_KIND, + ATTR_MESSAGING_OPERATION, + ATTR_MESSAGING_PROTOCOL, + ATTR_MESSAGING_PROTOCOL_VERSION, + ATTR_MESSAGING_RABBITMQ_ROUTING_KEY, + ATTR_MESSAGING_SYSTEM, + ATTR_MESSAGING_URL, + ATTR_NET_PEER_NAME, + ATTR_NET_PEER_PORT, + MESSAGING_DESTINATION_KIND_VALUE_TOPIC, + MESSAGING_OPERATION_VALUE_PROCESS, + OLD_ATTR_MESSAGING_MESSAGE_ID, +} from './semconv'; import type { ConsumeMessage, Message } from './types'; +const PUBLISHER_ORIGIN = 'auto.amqplib.otel.publisher'; +const CONSUMER_ORIGIN = 'auto.amqplib.otel.consumer'; + export const MESSAGE_STORED_SPAN: unique symbol = Symbol('opentelemetry.amqplib.message.stored-span'); export const CHANNEL_SPANS_NOT_ENDED: unique symbol = Symbol('opentelemetry.amqplib.channel.spans-not-ended'); export const CHANNEL_CONSUME_TIMEOUT_TIMER: unique symbol = Symbol( 'opentelemetry.amqplib.channel.consumer-timeout-timer', ); export const CONNECTION_ATTRIBUTES: unique symbol = Symbol('opentelemetry.amqplib.connection.attributes'); +export const CHANNEL_IS_CONFIRM_PUBLISHING: unique symbol = Symbol('sentry.amqplib.channel.is-confirm-publishing'); export type InstrumentationConnection = Connection & { - [CONNECTION_ATTRIBUTES]?: Attributes; + [CONNECTION_ATTRIBUTES]?: SpanAttributes; }; export type InstrumentationPublishChannel = (Channel | ConfirmChannel) & { connection: InstrumentationConnection; + [CHANNEL_IS_CONFIRM_PUBLISHING]?: boolean; }; export type InstrumentationConsumeChannel = Channel & { connection: InstrumentationConnection; @@ -45,9 +67,7 @@ export type InstrumentationConsumeMessage = ConsumeMessage & { [MESSAGE_STORED_SPAN]?: Span; }; -const IS_CONFIRM_CHANNEL_CONTEXT_KEY: symbol = createContextKey('opentelemetry.amqplib.channel.is-confirm-channel'); - -export const normalizeExchange = (exchangeName: string) => (exchangeName !== '' ? exchangeName : ''); +export const normalizeExchange = (exchangeName: string): string => (exchangeName !== '' ? exchangeName : ''); const censorPassword = (url: string): string => { return url.replace(/:[^:@/]*@/, ':***@'); @@ -55,43 +75,27 @@ const censorPassword = (url: string): string => { const getPort = (portFromUrl: number | undefined, resolvedProtocol: string): number => { // we are using the resolved protocol which is upper case - // this code mimic the behavior of the amqplib which is used to set connection params + // this code mimics the behavior of amqplib which is used to set connection params return portFromUrl || (resolvedProtocol === 'AMQP' ? 5672 : 5671); }; const getProtocol = (protocolFromUrl: string | undefined): string => { const resolvedProtocol = protocolFromUrl || 'amqp'; - // the substring removed the ':' part of the protocol ('amqp:' -> 'amqp') + // the substring removes the ':' part of the protocol ('amqp:' -> 'amqp') const noEndingColon = resolvedProtocol.endsWith(':') ? resolvedProtocol.substring(0, resolvedProtocol.length - 1) : resolvedProtocol; - // upper cases to match spec + // upper case to match spec return noEndingColon.toUpperCase(); }; const getHostname = (hostnameFromUrl: string | undefined): string => { - // if user supplies empty hostname, it gets forwarded to 'net' package which default it to localhost. + // if user supplies empty hostname, it gets forwarded to 'net' package which defaults it to localhost. // https://nodejs.org/docs/latest-v12.x/api/net.html#net_socket_connect_options_connectlistener return hostnameFromUrl || 'localhost'; }; -const extractConnectionAttributeOrLog = ( - url: string | Options.Connect, - attributeKey: string, - attributeValue: AttributeValue, - nameForLog: string, -): Attributes => { - if (attributeValue) { - return { [attributeKey]: attributeValue }; - } else { - diag.error(`amqplib instrumentation: could not extract connection attribute ${nameForLog} from user supplied url`, { - url, - }); - return {}; - } -}; - -export const getConnectionAttributesFromServer = (conn: Connection): Attributes => { +export const getConnectionAttributesFromServer = (conn: Connection): SpanAttributes => { const product = conn.serverProperties.product?.toLowerCase?.(); if (product) { return { @@ -102,90 +106,100 @@ export const getConnectionAttributesFromServer = (conn: Connection): Attributes } }; -export const getConnectionAttributesFromUrl = ( - url: string | Options.Connect, - netSemconvStability: SemconvStability, -): Attributes => { - const attributes: Attributes = { +export const getConnectionAttributesFromUrl = (url: string | Options.Connect): SpanAttributes => { + const attributes: SpanAttributes = { [ATTR_MESSAGING_PROTOCOL_VERSION]: '0.9.1', // this is the only protocol supported by the instrumented library }; - url = url || 'amqp://localhost'; - if (typeof url === 'object') { - const connectOptions = url as Options.Connect; + const resolvedUrl = url || 'amqp://localhost'; + if (typeof resolvedUrl === 'object') { + const connectOptions = resolvedUrl; const protocol = getProtocol(connectOptions?.protocol); - Object.assign(attributes, { - ...extractConnectionAttributeOrLog(url, ATTR_MESSAGING_PROTOCOL, protocol, 'protocol'), - }); - - const hostname = getHostname(connectOptions?.hostname); - if (netSemconvStability & SemconvStability.OLD) { - Object.assign(attributes, { - ...extractConnectionAttributeOrLog(url, ATTR_NET_PEER_NAME, hostname, 'hostname'), - }); - } - if (netSemconvStability & SemconvStability.STABLE) { - Object.assign(attributes, { - ...extractConnectionAttributeOrLog(url, ATTR_SERVER_ADDRESS, hostname, 'hostname'), - }); - } - - const port = getPort(connectOptions.port, protocol); - if (netSemconvStability & SemconvStability.OLD) { - Object.assign(attributes, extractConnectionAttributeOrLog(url, ATTR_NET_PEER_PORT, port, 'port')); - } - if (netSemconvStability & SemconvStability.STABLE) { - Object.assign(attributes, extractConnectionAttributeOrLog(url, ATTR_SERVER_PORT, port, 'port')); - } + attributes[ATTR_MESSAGING_PROTOCOL] = protocol; + attributes[ATTR_NET_PEER_NAME] = getHostname(connectOptions?.hostname); + attributes[ATTR_NET_PEER_PORT] = getPort(connectOptions.port, protocol); } else { - const censoredUrl = censorPassword(url); + const censoredUrl = censorPassword(resolvedUrl); attributes[ATTR_MESSAGING_URL] = censoredUrl; try { const urlParts = new URL(censoredUrl); const protocol = getProtocol(urlParts.protocol); - Object.assign(attributes, { - ...extractConnectionAttributeOrLog(censoredUrl, ATTR_MESSAGING_PROTOCOL, protocol, 'protocol'), - }); - - const hostname = getHostname(urlParts.hostname); - if (netSemconvStability & SemconvStability.OLD) { - Object.assign(attributes, { - ...extractConnectionAttributeOrLog(censoredUrl, ATTR_NET_PEER_NAME, hostname, 'hostname'), - }); - } - if (netSemconvStability & SemconvStability.STABLE) { - Object.assign(attributes, { - ...extractConnectionAttributeOrLog(censoredUrl, ATTR_SERVER_ADDRESS, hostname, 'hostname'), - }); - } - - const port = getPort(urlParts.port ? parseInt(urlParts.port) : undefined, protocol); - if (netSemconvStability & SemconvStability.OLD) { - Object.assign(attributes, extractConnectionAttributeOrLog(censoredUrl, ATTR_NET_PEER_PORT, port, 'port')); - } - if (netSemconvStability & SemconvStability.STABLE) { - Object.assign(attributes, extractConnectionAttributeOrLog(censoredUrl, ATTR_SERVER_PORT, port, 'port')); - } - } catch (err) { - diag.error('amqplib instrumentation: error while extracting connection details from connection url', { - censoredUrl, - err, - }); + attributes[ATTR_MESSAGING_PROTOCOL] = protocol; + attributes[ATTR_NET_PEER_NAME] = getHostname(urlParts.hostname); + attributes[ATTR_NET_PEER_PORT] = getPort(urlParts.port ? parseInt(urlParts.port) : undefined, protocol); + } catch { + // best-effort: a malformed url simply yields fewer connection attributes } } return attributes; }; -export const markConfirmChannelTracing = (context: Context) => { - return context.setValue(IS_CONFIRM_CHANNEL_CONTEXT_KEY, true); -}; - -export const unmarkConfirmChannelTracing = (context: Context) => { - return context.deleteValue(IS_CONFIRM_CHANNEL_CONTEXT_KEY); -}; +/** Reads a propagation header value off an amqplib message as a string. */ +export function getHeaderAsString(headers: Record | undefined, key: string): string | undefined { + const value = headers?.[key]; + if (value == null) { + return undefined; + } + return Array.isArray(value) ? String(value[0]) : String(value); +} + +/** Starts an inactive producer span and propagates its trace into the publish `options.headers`. */ +export function startPublishSpan( + exchange: string, + routingKey: string, + channel: InstrumentationPublishChannel, + options?: Options.Publish, +): { span: Span; modifiedOptions: Options.Publish } { + const normalizedExchange = normalizeExchange(exchange); + + const span = startInactiveSpan({ + name: `publish ${normalizedExchange}`, + kind: SpanKind.PRODUCER, + attributes: { + ...channel.connection[CONNECTION_ATTRIBUTES], + [ATTR_MESSAGING_DESTINATION]: exchange, + [ATTR_MESSAGING_DESTINATION_KIND]: MESSAGING_DESTINATION_KIND_VALUE_TOPIC, + [ATTR_MESSAGING_RABBITMQ_ROUTING_KEY]: routingKey, + [OLD_ATTR_MESSAGING_MESSAGE_ID]: options?.messageId, + [ATTR_MESSAGING_CONVERSATION_ID]: options?.correlationId, + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: PUBLISHER_ORIGIN, + }, + }); + + const modifiedOptions = options ?? {}; + modifiedOptions.headers = modifiedOptions.headers ?? {}; + + const traceData = getTraceData({ span }); + if (traceData['sentry-trace']) { + modifiedOptions.headers['sentry-trace'] = traceData['sentry-trace']; + } + if (traceData.baggage) { + modifiedOptions.headers['baggage'] = traceData.baggage; + } -export const isConfirmChannelTracing = (context: Context) => { - return context.getValue(IS_CONFIRM_CHANNEL_CONTEXT_KEY) === true; -}; + return { span, modifiedOptions }; +} + +/** Starts an inactive consumer (process) span carrying the amqplib messaging attributes. */ +export function startConsumeSpan( + queue: string, + msg: InstrumentationConsumeMessage, + channel: InstrumentationConsumeChannel, +): Span { + return startInactiveSpan({ + name: `${queue} process`, + kind: SpanKind.CONSUMER, + attributes: { + ...channel?.connection?.[CONNECTION_ATTRIBUTES], + [ATTR_MESSAGING_DESTINATION]: msg.fields?.exchange, + [ATTR_MESSAGING_DESTINATION_KIND]: MESSAGING_DESTINATION_KIND_VALUE_TOPIC, + [ATTR_MESSAGING_RABBITMQ_ROUTING_KEY]: msg.fields?.routingKey, + [ATTR_MESSAGING_OPERATION]: MESSAGING_OPERATION_VALUE_PROCESS, + [OLD_ATTR_MESSAGING_MESSAGE_ID]: msg?.properties.messageId, + [ATTR_MESSAGING_CONVERSATION_ID]: msg?.properties.correlationId, + [SEMANTIC_ATTRIBUTE_SENTRY_ORIGIN]: CONSUMER_ORIGIN, + }, + }); +} From b509330e8a70dc781d9b647451bb507fc8285ce9 Mon Sep 17 00:00:00 2001 From: s1gr1d <32902192+s1gr1d@users.noreply.github.com> Date: Wed, 24 Jun 2026 14:48:21 +0200 Subject: [PATCH 2/2] ref(node): Streamline amqplib instrumentation --- .../tracing/amqplib/scenario-confirm.mjs | 59 ++++++++ .../suites/tracing/amqplib/scenario-error.mjs | 86 ++++++++++++ .../suites/tracing/amqplib/test.ts | 132 ++++++++++++++++-- 3 files changed, 262 insertions(+), 15 deletions(-) create mode 100644 dev-packages/node-integration-tests/suites/tracing/amqplib/scenario-confirm.mjs create mode 100644 dev-packages/node-integration-tests/suites/tracing/amqplib/scenario-error.mjs diff --git a/dev-packages/node-integration-tests/suites/tracing/amqplib/scenario-confirm.mjs b/dev-packages/node-integration-tests/suites/tracing/amqplib/scenario-confirm.mjs new file mode 100644 index 000000000000..d37d3109e062 --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/amqplib/scenario-confirm.mjs @@ -0,0 +1,59 @@ +import * as Sentry from '@sentry/node'; +import amqp from 'amqplib'; + +// Dedicated queue so the unconsumed confirm message doesn't leak into other scenarios sharing the broker. +const queueName = 'queue-confirm'; +const amqpUsername = 'sentry'; +const amqpPassword = 'sentry'; + +const AMQP_URL = `amqp://${amqpUsername}:${amqpPassword}@localhost:5672/`; + +const QUEUE_OPTIONS = { + durable: true, + exclusive: false, + autoDelete: false, + arguments: { + 'x-message-ttl': 30000, + 'x-max-length': 1000, + }, +}; + +(async () => { + const { connection, channel } = await connectToRabbitMQ(); + await channel.assertQueue(queueName, QUEUE_OPTIONS); + + await Sentry.startSpan({ name: 'root span' }, async () => { + await new Promise((resolve, reject) => { + // On a confirm channel, sendToQueue delegates to publish and registers a broker-confirm + // callback. The producer span ends when the broker confirms. + channel.sendToQueue(queueName, Buffer.from(JSON.stringify({ foo: 'bar01' })), {}, err => { + if (err) { + reject(err); + } else { + resolve(); + } + }); + }); + }); + + await channel.close(); + await connection.close(); +})(); + +async function connectToRabbitMQ() { + // Retry the connection: the broker can accept TCP before AMQP handshakes succeed, so the first + // connects may reject during the broker boot window. An unretried rejection would surface as + // an unhandled rejection and be reported as an error envelope instead of the expected transaction. + let lastError; + for (let attempt = 0; attempt < 5; attempt++) { + try { + const connection = await amqp.connect(AMQP_URL); + const channel = await connection.createConfirmChannel(); + return { connection, channel }; + } catch (err) { + lastError = err; + await new Promise(resolve => setTimeout(resolve, 1000)); + } + } + throw lastError; +} diff --git a/dev-packages/node-integration-tests/suites/tracing/amqplib/scenario-error.mjs b/dev-packages/node-integration-tests/suites/tracing/amqplib/scenario-error.mjs new file mode 100644 index 000000000000..5ab1e856d52f --- /dev/null +++ b/dev-packages/node-integration-tests/suites/tracing/amqplib/scenario-error.mjs @@ -0,0 +1,86 @@ +import * as Sentry from '@sentry/node'; +import amqp from 'amqplib'; + +// Dedicated queue to keep this scenario isolated from the others sharing the broker. +const queueName = 'queue-error'; +const amqpUsername = 'sentry'; +const amqpPassword = 'sentry'; + +const AMQP_URL = `amqp://${amqpUsername}:${amqpPassword}@localhost:5672/`; +const ACKNOWLEDGEMENT = { noAck: false }; + +const QUEUE_OPTIONS = { + durable: true, + exclusive: false, + autoDelete: false, + arguments: { + 'x-message-ttl': 30000, + 'x-max-length': 1000, + }, +}; + +(async () => { + const { connection, channel } = await connectToRabbitMQ(); + await createQueue(queueName, channel); + + const consumeMessagePromise = consumeMessageFromQueue(queueName, channel); + + await Sentry.startSpan({ name: 'root span' }, async () => { + sendMessageToQueue(queueName, channel, JSON.stringify({ foo: 'bar01' })); + }); + + await consumeMessagePromise; + + await channel.close(); + await connection.close(); +})(); + +async function connectToRabbitMQ() { + // Retry the connection: the broker can accept TCP before AMQP handshakes succeed, so the first + // connects may reject during the broker boot window. An unretried rejection would surface as + // an unhandled rejection and be reported as an error envelope instead of the expected transaction. + let lastError; + for (let attempt = 0; attempt < 5; attempt++) { + try { + const connection = await amqp.connect(AMQP_URL); + const channel = await connection.createChannel(); + return { connection, channel }; + } catch (err) { + lastError = err; + await new Promise(resolve => setTimeout(resolve, 1000)); + } + } + throw lastError; +} + +async function createQueue(queueName, channel) { + await channel.assertQueue(queueName, QUEUE_OPTIONS); +} + +function sendMessageToQueue(queueName, channel, message) { + channel.sendToQueue(queueName, Buffer.from(message)); +} + +async function consumer(queueName, channel) { + return new Promise((resolve, reject) => { + channel + .consume( + queueName, + message => { + if (message) { + // Reject the message without requeue. This ends the consumer span with an error status. + channel.nack(message, false, false); + resolve(); + } else { + reject(new Error('No message received')); + } + }, + ACKNOWLEDGEMENT, + ) + .catch(reject); + }); +} + +async function consumeMessageFromQueue(queueName, channel) { + await consumer(queueName, channel); +} diff --git a/dev-packages/node-integration-tests/suites/tracing/amqplib/test.ts b/dev-packages/node-integration-tests/suites/tracing/amqplib/test.ts index fd535a00bc74..f3381b2f5038 100644 --- a/dev-packages/node-integration-tests/suites/tracing/amqplib/test.ts +++ b/dev-packages/node-integration-tests/suites/tracing/amqplib/test.ts @@ -2,21 +2,26 @@ import type { TransactionEvent } from '@sentry/core'; import { afterAll, describe, expect } from 'vitest'; import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../utils/runner'; -const EXPECTED_MESSAGE_SPAN_PRODUCER = expect.objectContaining({ - op: 'message', - data: expect.objectContaining({ - 'messaging.system': 'rabbitmq', - 'otel.kind': 'PRODUCER', - 'sentry.op': 'message', - 'sentry.origin': 'auto.amqplib.otel.publisher', - }), - status: 'ok', -}); +// Each scenario uses its own queue name to keep them isolated on the shared broker, so the +// expected producer span is parameterized by the routing key (queue name) it publishes to. +const expectedProducerSpan = (routingKey: string) => + expect.objectContaining({ + op: 'message', + data: expect.objectContaining({ + 'messaging.system': 'rabbitmq', + 'messaging.rabbitmq.routing_key': routingKey, + 'otel.kind': 'PRODUCER', + 'sentry.op': 'message', + 'sentry.origin': 'auto.amqplib.otel.publisher', + }), + status: 'ok', + }); const EXPECTED_MESSAGE_SPAN_CONSUMER = expect.objectContaining({ op: 'message', data: expect.objectContaining({ 'messaging.system': 'rabbitmq', + 'messaging.rabbitmq.routing_key': 'queue1', 'otel.kind': 'CONSUMER', 'sentry.op': 'message', 'sentry.origin': 'auto.amqplib.otel.consumer', @@ -32,7 +37,7 @@ describe('amqplib auto-instrumentation', () => { describe.each([ ['v1', { amqplib: '^1.0.0' }], ['v2', {}], - ])('%s', (version, additionalDependencies) => { + ])('%s', (_version, additionalDependencies) => { createEsmAndCjsTests( __dirname, 'scenario.mjs', @@ -56,14 +61,26 @@ describe('amqplib auto-instrumentation', () => { transaction: (transaction: TransactionEvent) => { receivedTransactions.push(transaction); - const producer = receivedTransactions.find(t => t.transaction === 'root span'); - const consumer = receivedTransactions.find(t => t.transaction === 'queue1 process'); + // The producer span is a child of the manually-started 'root span' transaction, so we + // identify it by its origin rather than by transaction name. The consumer span is its + // own transaction, identified by the origin on its trace context. + const producer = receivedTransactions.find(t => + t.spans?.some(s => s.data?.['sentry.origin'] === 'auto.amqplib.otel.publisher'), + ); + const consumer = receivedTransactions.find( + t => t.contexts?.trace?.data?.['sentry.origin'] === 'auto.amqplib.otel.consumer', + ); expect(producer).toBeDefined(); expect(consumer).toBeDefined(); - expect(producer!.spans?.length).toEqual(1); - expect(producer!.spans![0]).toMatchObject(EXPECTED_MESSAGE_SPAN_PRODUCER); + expect(producer!.transaction).toBe('root span'); + expect(consumer!.transaction).toBe('queue1 process'); + + const producerSpan = producer!.spans?.find( + s => s.data?.['sentry.origin'] === 'auto.amqplib.otel.publisher', + ); + expect(producerSpan).toMatchObject(expectedProducerSpan('queue1')); expect(consumer!.contexts?.trace).toMatchObject(EXPECTED_MESSAGE_SPAN_CONSUMER); }, @@ -74,5 +91,90 @@ describe('amqplib auto-instrumentation', () => { }, { additionalDependencies }, ); + + createEsmAndCjsTests( + __dirname, + 'scenario-error.mjs', + 'instrument.mjs', + (createTestRunner, test) => { + test('marks the consumer span as errored when the message is rejected', { timeout: 60_000 }, async () => { + // The error scenario emits the producer ('root span') and the rejected consumer + // ('queue1 process') transactions in any order, so we collect both and assert on the consumer. + const receivedTransactions: TransactionEvent[] = []; + + await createTestRunner() + .withDockerCompose({ + workingDirectory: [__dirname], + }) + .expect({ + transaction: (transaction: TransactionEvent) => { + receivedTransactions.push(transaction); + }, + }) + .expect({ + transaction: (transaction: TransactionEvent) => { + receivedTransactions.push(transaction); + + const consumer = receivedTransactions.find( + t => t.contexts?.trace?.data?.['sentry.origin'] === 'auto.amqplib.otel.consumer', + ); + + expect(consumer).toBeDefined(); + expect(consumer!.transaction).toBe('queue-error process'); + expect(consumer!.contexts?.trace).toMatchObject( + expect.objectContaining({ + op: 'message', + status: 'internal_error', + data: expect.objectContaining({ + 'messaging.system': 'rabbitmq', + 'otel.kind': 'CONSUMER', + 'sentry.op': 'message', + 'sentry.origin': 'auto.amqplib.otel.consumer', + }), + }), + ); + }, + }) + .start() + .completed(); + }); + }, + { additionalDependencies }, + ); + + createEsmAndCjsTests( + __dirname, + 'scenario-confirm.mjs', + 'instrument.mjs', + (createTestRunner, test) => { + test( + 'creates exactly one producer span when publishing on a confirm channel', + { timeout: 60_000 }, + async () => { + await createTestRunner() + .withDockerCompose({ + workingDirectory: [__dirname], + }) + .expect({ + transaction: (transaction: TransactionEvent) => { + expect(transaction.transaction).toBe('root span'); + + const producerSpans = transaction.spans?.filter( + s => s.data?.['sentry.origin'] === 'auto.amqplib.otel.publisher', + ); + + // The confirm channel internally calls the base publish; the instrumentation must not + // double-instrument, so we expect exactly one producer span. + expect(producerSpans?.length).toBe(1); + expect(producerSpans![0]).toMatchObject(expectedProducerSpan('queue-confirm')); + }, + }) + .start() + .completed(); + }, + ); + }, + { additionalDependencies }, + ); }); });