Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions handwritten/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@
"@google-cloud/precise-date": "^5.0.0",
"@google-cloud/projectify": "^5.0.0",
"@google-cloud/promisify": "^5.0.0",
"@opentelemetry/api": "~1.9.0",
"@opentelemetry/core": "^1.30.1",
"@opentelemetry/semantic-conventions": "~1.39.0",
"@opentelemetry/api": "^1.9.1",
"@opentelemetry/core": "^2.8.0",
"@opentelemetry/semantic-conventions": "^1.41.1",
"arrify": "^2.0.0",
"extend": "^3.0.2",
"google-auth-library": "^10.5.0",
Expand All @@ -70,7 +70,7 @@
},
"devDependencies": {
"@grpc/proto-loader": "^0.8.0",
"@opentelemetry/sdk-trace-base": "^1.17.0",
"@opentelemetry/sdk-trace-base": "^2.8.0",
"@types/duplexify": "^3.6.4",
"@types/extend": "^3.0.4",
"@types/lodash.snakecase": "^4.1.9",
Expand Down
384 changes: 192 additions & 192 deletions handwritten/pubsub/protos/protos.js

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions handwritten/pubsub/src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,16 @@ import {logs as baseLogs, LoggingFunction} from './logs';
* @private
*/
export const logs = {
callbackDelivery: baseLogs.pubsub.sublog('callback-delivery') as LoggingFunction,
callbackExceptions: baseLogs.pubsub.sublog('callback-exceptions') as LoggingFunction,
callbackDelivery: baseLogs.pubsub.sublog(
'callback-delivery',
) as LoggingFunction,
callbackExceptions: baseLogs.pubsub.sublog(
'callback-exceptions',
) as LoggingFunction,
expiry: baseLogs.pubsub.sublog('expiry') as LoggingFunction,
subscriberFlowControl: baseLogs.pubsub.sublog('subscriber-flow-control') as LoggingFunction,
subscriberFlowControl: baseLogs.pubsub.sublog(
'subscriber-flow-control',
) as LoggingFunction,
};

export interface FlowControlOptions {
Expand Down
4 changes: 3 additions & 1 deletion handwritten/pubsub/src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ import {logs as baseLogs, LoggingFunction} from './logs';
* @private
*/
export const logs = {
subscriberStreams: baseLogs.pubsub.sublog('subscriber-streams') as LoggingFunction,
subscriberStreams: baseLogs.pubsub.sublog(
'subscriber-streams',
) as LoggingFunction,
};

/*!
Expand Down
2 changes: 1 addition & 1 deletion handwritten/pubsub/src/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import {CallOptions} from 'google-gax';

import {google} from '../protos/protos';
import {PubSub} from './pubsub';
import {
PubSub,
EmptyCallback,
EmptyResponse,
RequestCallback,
Expand Down
66 changes: 50 additions & 16 deletions handwritten/pubsub/src/telemetry-tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ import {
Link,
} from '@opentelemetry/api';
import {W3CTraceContextPropagator} from '@opentelemetry/core';
import {ATTR_CODE_FUNCTION_NAME} from '@opentelemetry/semantic-conventions';
import {
ATTR_MESSAGING_SYSTEM,
MESSAGING_SYSTEM_VALUE_GCP_PUBSUB,
ATTR_MESSAGING_DESTINATION_NAME,
ATTR_MESSAGING_BATCH_MESSAGE_COUNT,
ATTR_MESSAGING_MESSAGE_ENVELOPE_SIZE,
ATTR_MESSAGING_OPERATION,
ATTR_MESSAGING_OPERATION_TYPE,
ATTR_MESSAGING_OPERATION_NAME,
ATTR_MESSAGING_GCP_PUBSUB_MESSAGE_ACK_ID,
ATTR_MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY,
ATTR_MESSAGING_GCP_PUBSUB_MESSAGE_ACK_DEADLINE,
ATTR_CODE_FUNCTION,
} from '@opentelemetry/semantic-conventions/incubating';
import {Attributes, PubsubMessage} from './publisher/pubsub-message';
import {Duration} from './temporal';

Expand Down Expand Up @@ -325,35 +340,46 @@ export class PubsubSpans {
const spanAttributes = {
// Add Opentelemetry semantic convention attributes to the span, based on:
// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/messaging/messaging-spans.md
['messaging.system']: 'gcp_pubsub',
['messaging.destination.name']: destinationId ?? destinationName,
[ATTR_MESSAGING_SYSTEM]: MESSAGING_SYSTEM_VALUE_GCP_PUBSUB,
[ATTR_MESSAGING_DESTINATION_NAME]: destinationId ?? destinationName,
['gcp.project_id']: projectId,
['code.function']: caller ?? 'unknown',
[ATTR_CODE_FUNCTION]: caller ?? 'unknown',
[ATTR_CODE_FUNCTION_NAME]: caller ?? 'unknown',
} as SpanAttributes;

if (operation) {
spanAttributes[ATTR_MESSAGING_OPERATION] = operation;

// Populate new messaging.operation.type and messaging.operation.name attributes
if (operation === 'create') {
spanAttributes[ATTR_MESSAGING_OPERATION_TYPE] = 'send';
spanAttributes[ATTR_MESSAGING_OPERATION_NAME] = 'publish';
} else if (operation === 'receive') {
spanAttributes[ATTR_MESSAGING_OPERATION_TYPE] = 'receive';
}
}

if (message) {
if (message.calculatedSize) {
spanAttributes['messaging.message.envelope.size'] =
spanAttributes[ATTR_MESSAGING_MESSAGE_ENVELOPE_SIZE] =
message.calculatedSize;
} else {
if (message.data?.length) {
spanAttributes['messaging.message.envelope.size'] =
spanAttributes[ATTR_MESSAGING_MESSAGE_ENVELOPE_SIZE] =
message.data?.length;
}
}
if (message.orderingKey) {
spanAttributes['messaging.gcp_pubsub.message.ordering_key'] =
spanAttributes[ATTR_MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY] =
message.orderingKey;
}
if (message.isExactlyOnceDelivery) {
spanAttributes['messaging.gcp_pubsub.message.exactly_once_delivery'] =
message.isExactlyOnceDelivery;
}
if (message.ackId) {
spanAttributes['messaging.gcp_pubsub.message.ack_id'] = message.ackId;
}
if (operation) {
spanAttributes['messaging.operation'] = operation;
spanAttributes[ATTR_MESSAGING_GCP_PUBSUB_MESSAGE_ACK_ID] =
message.ackId;
}
}

Expand Down Expand Up @@ -381,7 +407,7 @@ export class PubsubSpans {
});
if (topicInfo.topicId) {
span.updateName(`${topicInfo.topicId} create`);
span.setAttribute('messaging.destination.name', topicInfo.topicId);
span.setAttribute(ATTR_MESSAGING_DESTINATION_NAME, topicInfo.topicId);
}

return span;
Expand All @@ -391,7 +417,7 @@ export class PubsubSpans {
const topicInfo = getTopicInfo(topicName);
if (topicInfo.topicId) {
span.updateName(`${topicInfo.topicId} create`);
span.setAttribute('messaging.destination.name', topicInfo.topicId);
span.setAttribute(ATTR_MESSAGING_DESTINATION_NAME, topicInfo.topicId);
} else {
span.updateName(`${topicName} create`);
}
Expand Down Expand Up @@ -419,7 +445,7 @@ export class PubsubSpans {
'receive',
);
if (subInfo.subId) {
attributes['messaging.destination.name'] = subInfo.subId;
attributes[ATTR_MESSAGING_DESTINATION_NAME] = subInfo.subId;
}

if (context) {
Expand Down Expand Up @@ -500,7 +526,7 @@ export class PubsubSpans {
},
ROOT_CONTEXT,
);
span?.setAttribute('messaging.batch.message_count', messages.length);
span?.setAttribute(ATTR_MESSAGING_BATCH_MESSAGE_COUNT, messages.length);
if (span) {
// Also attempt to link from message spans back to the publish RPC span.
messages.forEach(m => {
Expand Down Expand Up @@ -544,7 +570,9 @@ export class PubsubSpans {
ROOT_CONTEXT,
);

span?.setAttribute('messaging.batch.message_count', messageSpans.length);
span?.setAttribute(ATTR_MESSAGING_BATCH_MESSAGE_COUNT, messageSpans.length);
span?.setAttribute(ATTR_MESSAGING_OPERATION_TYPE, 'settle');
span?.setAttribute(ATTR_MESSAGING_OPERATION_NAME, 'ack');

if (span) {
// Also attempt to link from the subscribe span(s) back to the publish RPC span.
Expand Down Expand Up @@ -592,7 +620,9 @@ export class PubsubSpans {
ROOT_CONTEXT,
);

span?.setAttribute('messaging.batch.message_count', messageSpans.length);
span?.setAttribute(ATTR_MESSAGING_BATCH_MESSAGE_COUNT, messageSpans.length);
span?.setAttribute(ATTR_MESSAGING_OPERATION_TYPE, 'settle');
span?.setAttribute(ATTR_MESSAGING_OPERATION_NAME, type);

if (span) {
// Also attempt to link from the subscribe span(s) back to the publish RPC span.
Expand All @@ -608,6 +638,10 @@ export class PubsubSpans {
'messaging.gcp_pubsub.message.ack_deadline_seconds',
deadline.totalOf('second'),
);
span?.setAttribute(
ATTR_MESSAGING_GCP_PUBSUB_MESSAGE_ACK_DEADLINE,
deadline.totalOf('second'),
);
}

if (isInitial !== undefined) {
Expand Down
12 changes: 12 additions & 0 deletions handwritten/pubsub/test/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,18 @@ describe('Publisher', () => {
createdSpan.attributes['messaging.destination.name'],
topicId,
);
assert.strictEqual(
createdSpan.attributes['messaging.operation.type'],
'send',
);
assert.strictEqual(
createdSpan.attributes['messaging.operation.name'],
'publish',
);
assert.strictEqual(
createdSpan.attributes['code.function.name'],
'Publisher.publishMessage',
);
assert.strictEqual(createdSpan.name, `${topicId} create`);
assert.strictEqual(
createdSpan.kind,
Expand Down
5 changes: 4 additions & 1 deletion handwritten/pubsub/test/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,10 @@ describe('Subscriber', () => {
assert.strictEqual(spans[0].events.length, 2);
const firstSpan = spans.pop();
assert.ok(firstSpan);
assert.strictEqual(firstSpan.parentSpanId, parentSpanContext.spanId);
assert.strictEqual(
firstSpan.parentSpanContext?.spanId,
parentSpanContext.spanId,
);
assert.strictEqual(
firstSpan.name,
`${subId} subscribe`,
Expand Down
Loading
Loading