Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ export class Message {
getPartitionKey(): string;
getOrderingKey(): string;
getProducerName(): string;
isReplicated(): boolean;
getReplicatedFrom(): string;
getEncryptionContext(): EncryptionContext | null;
}

Expand Down
25 changes: 25 additions & 0 deletions src/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ Napi::Object Message::Init(Napi::Env env, Napi::Object exports) {
InstanceMethod("getPartitionKey", &Message::GetPartitionKey),
InstanceMethod("getOrderingKey", &Message::GetOrderingKey),
InstanceMethod("getProducerName", &Message::GetProducerName),
InstanceMethod("isReplicated", &Message::GetIsReplicated),
InstanceMethod("getReplicatedFrom", &Message::GetReplicatedFrom),
InstanceMethod("getEncryptionContext", &Message::GetEncryptionContext)});

constructor = Napi::Persistent(func);
Expand Down Expand Up @@ -172,6 +174,29 @@ Napi::Value Message::GetProducerName(const Napi::CallbackInfo &info) {
return Napi::String::New(env, pulsar_message_get_producer_name(this->cMessage.get()));
}

Napi::Value Message::GetIsReplicated(const Napi::CallbackInfo &info) {
Comment thread
BewareMyPower marked this conversation as resolved.
Outdated
Napi::Env env = info.Env();
if (!ValidateCMessage(env)) {
return env.Null();
}

const char *replicatedFrom = pulsar_message_get_replicated_from(this->cMessage.get());
return Napi::Boolean::New(env, replicatedFrom && replicatedFrom[0] != '\0');
}

Napi::Value Message::GetReplicatedFrom(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
if (!ValidateCMessage(env)) {
return env.Null();
}

const char *replicatedFrom = pulsar_message_get_replicated_from(this->cMessage.get());
if (!replicatedFrom) {
return Napi::String::New(env, "");
}
return Napi::String::New(env, replicatedFrom);
}

Napi::Value Message::GetEncryptionContext(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
if (!ValidateCMessage(env)) {
Expand Down
2 changes: 2 additions & 0 deletions src/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class Message : public Napi::ObjectWrap<Message> {
Napi::Value GetPartitionKey(const Napi::CallbackInfo &info);
Napi::Value GetOrderingKey(const Napi::CallbackInfo &info);
Napi::Value GetProducerName(const Napi::CallbackInfo &info);
Napi::Value GetIsReplicated(const Napi::CallbackInfo &info);
Napi::Value GetReplicatedFrom(const Napi::CallbackInfo &info);
Napi::Value GetRedeliveryCount(const Napi::CallbackInfo &info);
Napi::Value GetEncryptionContext(const Napi::CallbackInfo &info);
bool ValidateCMessage(Napi::Env env);
Expand Down
2 changes: 2 additions & 0 deletions tests/end_to_end.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ const Pulsar = require('../index');
const msg = await consumer.receive();
consumer.acknowledge(msg);
expect(msg.getProducerName()).toBe(producerName);
expect(msg.isReplicated()).toBe(false);
expect(msg.getReplicatedFrom()).toBe('');
results.push(msg.getData().toString());
}
expect(lodash.difference(messages, results)).toEqual([]);
Expand Down
2 changes: 2 additions & 0 deletions tstest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ import Pulsar = require('./index');
const publishTime: number = message1.getPublishTimestamp();
const eventTime: number = message1.getEventTimestamp();
const redeliveryCount: number = message1.getRedeliveryCount();
const isReplicated: boolean = message1.isReplicated();
const replicatedFrom: string = message1.getReplicatedFrom();
const partitionKey: string = message1.getPartitionKey();

const message3: Pulsar.Message = await reader1.readNext();
Expand Down
Loading