Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import * as Sentry from '@sentry/node';
import amqp from 'amqplib';

// Dedicated queue so the unconsumed confirm message doesn't leak into other scenarios sharing the broker.
const queueName = 'queue-confirm';
const amqpUsername = 'sentry';
const amqpPassword = 'sentry';

const AMQP_URL = `amqp://${amqpUsername}:${amqpPassword}@localhost:5672/`;

const QUEUE_OPTIONS = {
durable: true,
exclusive: false,
autoDelete: false,
arguments: {
'x-message-ttl': 30000,
'x-max-length': 1000,
},
};

(async () => {
const { connection, channel } = await connectToRabbitMQ();
await channel.assertQueue(queueName, QUEUE_OPTIONS);

await Sentry.startSpan({ name: 'root span' }, async () => {
await new Promise((resolve, reject) => {
// On a confirm channel, sendToQueue delegates to publish and registers a broker-confirm
// callback. The producer span ends when the broker confirms.
channel.sendToQueue(queueName, Buffer.from(JSON.stringify({ foo: 'bar01' })), {}, err => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
});

await channel.close();
await connection.close();
})();

async function connectToRabbitMQ() {
// Retry the connection: the broker can accept TCP before AMQP handshakes succeed, so the first
// connects may reject during the broker boot window. An unretried rejection would surface as
// an unhandled rejection and be reported as an error envelope instead of the expected transaction.
let lastError;
for (let attempt = 0; attempt < 5; attempt++) {
try {
const connection = await amqp.connect(AMQP_URL);
const channel = await connection.createConfirmChannel();
return { connection, channel };
} catch (err) {
lastError = err;
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
throw lastError;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import * as Sentry from '@sentry/node';
import amqp from 'amqplib';

// Dedicated queue to keep this scenario isolated from the others sharing the broker.
const queueName = 'queue-error';
const amqpUsername = 'sentry';
const amqpPassword = 'sentry';

const AMQP_URL = `amqp://${amqpUsername}:${amqpPassword}@localhost:5672/`;
const ACKNOWLEDGEMENT = { noAck: false };

const QUEUE_OPTIONS = {
durable: true,
exclusive: false,
autoDelete: false,
arguments: {
'x-message-ttl': 30000,
'x-max-length': 1000,
},
};

(async () => {
const { connection, channel } = await connectToRabbitMQ();
await createQueue(queueName, channel);

const consumeMessagePromise = consumeMessageFromQueue(queueName, channel);

await Sentry.startSpan({ name: 'root span' }, async () => {
sendMessageToQueue(queueName, channel, JSON.stringify({ foo: 'bar01' }));
});

await consumeMessagePromise;

await channel.close();
await connection.close();
})();

async function connectToRabbitMQ() {
// Retry the connection: the broker can accept TCP before AMQP handshakes succeed, so the first
// connects may reject during the broker boot window. An unretried rejection would surface as
// an unhandled rejection and be reported as an error envelope instead of the expected transaction.
let lastError;
for (let attempt = 0; attempt < 5; attempt++) {
try {
const connection = await amqp.connect(AMQP_URL);
const channel = await connection.createChannel();
return { connection, channel };
} catch (err) {
lastError = err;
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
throw lastError;
}

async function createQueue(queueName, channel) {
await channel.assertQueue(queueName, QUEUE_OPTIONS);
}

function sendMessageToQueue(queueName, channel, message) {
channel.sendToQueue(queueName, Buffer.from(message));
}

async function consumer(queueName, channel) {
return new Promise((resolve, reject) => {
channel
.consume(
queueName,
message => {
if (message) {
// Reject the message without requeue. This ends the consumer span with an error status.
channel.nack(message, false, false);
resolve();
} else {
reject(new Error('No message received'));
}
},
ACKNOWLEDGEMENT,
)
.catch(reject);
});
}

async function consumeMessageFromQueue(queueName, channel) {
await consumer(queueName, channel);
}
132 changes: 117 additions & 15 deletions dev-packages/node-integration-tests/suites/tracing/amqplib/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,26 @@ import type { TransactionEvent } from '@sentry/core';
import { afterAll, describe, expect } from 'vitest';
import { cleanupChildProcesses, createEsmAndCjsTests } from '../../../utils/runner';

const EXPECTED_MESSAGE_SPAN_PRODUCER = expect.objectContaining({
op: 'message',
data: expect.objectContaining({
'messaging.system': 'rabbitmq',
'otel.kind': 'PRODUCER',
'sentry.op': 'message',
'sentry.origin': 'auto.amqplib.otel.publisher',
}),
status: 'ok',
});
// Each scenario uses its own queue name to keep them isolated on the shared broker, so the
// expected producer span is parameterized by the routing key (queue name) it publishes to.
const expectedProducerSpan = (routingKey: string) =>
expect.objectContaining({
op: 'message',
data: expect.objectContaining({
'messaging.system': 'rabbitmq',
'messaging.rabbitmq.routing_key': routingKey,
'otel.kind': 'PRODUCER',
'sentry.op': 'message',
'sentry.origin': 'auto.amqplib.otel.publisher',
}),
status: 'ok',
});

const EXPECTED_MESSAGE_SPAN_CONSUMER = expect.objectContaining({
op: 'message',
data: expect.objectContaining({
'messaging.system': 'rabbitmq',
'messaging.rabbitmq.routing_key': 'queue1',
'otel.kind': 'CONSUMER',
'sentry.op': 'message',
'sentry.origin': 'auto.amqplib.otel.consumer',
Expand All @@ -32,7 +37,7 @@ describe('amqplib auto-instrumentation', () => {
describe.each([
['v1', { amqplib: '^1.0.0' }],
['v2', {}],
])('%s', (version, additionalDependencies) => {
])('%s', (_version, additionalDependencies) => {
createEsmAndCjsTests(
__dirname,
'scenario.mjs',
Expand All @@ -56,14 +61,26 @@ describe('amqplib auto-instrumentation', () => {
transaction: (transaction: TransactionEvent) => {
receivedTransactions.push(transaction);

const producer = receivedTransactions.find(t => t.transaction === 'root span');
const consumer = receivedTransactions.find(t => t.transaction === 'queue1 process');
// The producer span is a child of the manually-started 'root span' transaction, so we
// identify it by its origin rather than by transaction name. The consumer span is its
// own transaction, identified by the origin on its trace context.
const producer = receivedTransactions.find(t =>
t.spans?.some(s => s.data?.['sentry.origin'] === 'auto.amqplib.otel.publisher'),
);
const consumer = receivedTransactions.find(
t => t.contexts?.trace?.data?.['sentry.origin'] === 'auto.amqplib.otel.consumer',
);

expect(producer).toBeDefined();
expect(consumer).toBeDefined();

expect(producer!.spans?.length).toEqual(1);
expect(producer!.spans![0]).toMatchObject(EXPECTED_MESSAGE_SPAN_PRODUCER);
expect(producer!.transaction).toBe('root span');
expect(consumer!.transaction).toBe('queue1 process');

const producerSpan = producer!.spans?.find(
s => s.data?.['sentry.origin'] === 'auto.amqplib.otel.publisher',
);
expect(producerSpan).toMatchObject(expectedProducerSpan('queue1'));

expect(consumer!.contexts?.trace).toMatchObject(EXPECTED_MESSAGE_SPAN_CONSUMER);
},
Expand All @@ -74,5 +91,90 @@ describe('amqplib auto-instrumentation', () => {
},
{ additionalDependencies },
);

createEsmAndCjsTests(
__dirname,
'scenario-error.mjs',
'instrument.mjs',
(createTestRunner, test) => {
test('marks the consumer span as errored when the message is rejected', { timeout: 60_000 }, async () => {
// The error scenario emits the producer ('root span') and the rejected consumer
// ('queue1 process') transactions in any order, so we collect both and assert on the consumer.
const receivedTransactions: TransactionEvent[] = [];

await createTestRunner()
.withDockerCompose({
workingDirectory: [__dirname],
})
.expect({
transaction: (transaction: TransactionEvent) => {
receivedTransactions.push(transaction);
},
})
.expect({
transaction: (transaction: TransactionEvent) => {
receivedTransactions.push(transaction);

const consumer = receivedTransactions.find(
t => t.contexts?.trace?.data?.['sentry.origin'] === 'auto.amqplib.otel.consumer',
);

expect(consumer).toBeDefined();
expect(consumer!.transaction).toBe('queue-error process');
expect(consumer!.contexts?.trace).toMatchObject(
expect.objectContaining({
op: 'message',
status: 'internal_error',
data: expect.objectContaining({
'messaging.system': 'rabbitmq',
'otel.kind': 'CONSUMER',
'sentry.op': 'message',
'sentry.origin': 'auto.amqplib.otel.consumer',
}),
}),
);
},
})
.start()
.completed();
});
},
{ additionalDependencies },
);

createEsmAndCjsTests(
__dirname,
'scenario-confirm.mjs',
'instrument.mjs',
(createTestRunner, test) => {
test(
'creates exactly one producer span when publishing on a confirm channel',
{ timeout: 60_000 },
async () => {
await createTestRunner()
.withDockerCompose({
workingDirectory: [__dirname],
})
.expect({
transaction: (transaction: TransactionEvent) => {
expect(transaction.transaction).toBe('root span');

const producerSpans = transaction.spans?.filter(
s => s.data?.['sentry.origin'] === 'auto.amqplib.otel.publisher',
);

// The confirm channel internally calls the base publish; the instrumentation must not
// double-instrument, so we expect exactly one producer span.
expect(producerSpans?.length).toBe(1);
expect(producerSpans![0]).toMatchObject(expectedProducerSpan('queue-confirm'));
},
})
.start()
.completed();
},
);
},
{ additionalDependencies },
);
});
});
17 changes: 3 additions & 14 deletions packages/node/src/integrations/tracing/amqplib/index.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,11 @@
import type { Span } from '@opentelemetry/api';
import { AmqplibInstrumentation } from './vendored/amqplib';
import type { AmqplibInstrumentationConfig } from './vendored/types';
import type { IntegrationFn } from '@sentry/core';
import { defineIntegration } from '@sentry/core';
import { addOriginToSpan, generateInstrumentOnce } from '@sentry/node-core';
import { generateInstrumentOnce } from '@sentry/node-core';
import { AmqplibInstrumentation } from './vendored/instrumentation';

const INTEGRATION_NAME = 'Amqplib';

const config: AmqplibInstrumentationConfig = {
consumeEndHook: (span: Span) => {
addOriginToSpan(span, 'auto.amqplib.otel.consumer');
},
publishHook: (span: Span) => {
addOriginToSpan(span, 'auto.amqplib.otel.publisher');
},
};

export const instrumentAmqplib = generateInstrumentOnce(INTEGRATION_NAME, () => new AmqplibInstrumentation(config));
export const instrumentAmqplib = generateInstrumentOnce(INTEGRATION_NAME, () => new AmqplibInstrumentation());

const _amqplibIntegration = (() => {
return {
Expand Down
Loading
Loading