Skip to content

Commit

Permalink
Add topic argument to publish callback in request-response stream cli…
Browse files Browse the repository at this point in the history
…ent (#599)
  • Loading branch information
sfod authored Jan 29, 2025
1 parent b59d782 commit bd88f32
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 5 deletions.
1 change: 1 addition & 0 deletions lib/browser/mqtt_request_response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_

let streamingOperation = operation as StreamingOperation;
streamingOperation.operation.triggerIncomingPublishEvent({
topic: event.topic,
payload: event.payload
});
}
Expand Down
6 changes: 5 additions & 1 deletion lib/common/mqtt_request_response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ export interface SubscriptionStatusEvent {
*/
export interface IncomingPublishEvent {

/**
* MQTT Topic that the response was received on.
*/
topic: string

/**
* The payload of the incoming message.
*/
Expand Down Expand Up @@ -235,4 +240,3 @@ export interface IRequestResponseClient {
*/
submitRequest(requestOptions: RequestResponseOperationOptions): Promise<Response>;
}

21 changes: 19 additions & 2 deletions source/mqtt_request_response.c
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,7 @@ struct on_incoming_publish_user_data {
struct aws_allocator *allocator;

struct aws_request_response_streaming_operation_binding *binding_ref;
struct aws_byte_buf topic;
struct aws_byte_buf *payload;
};

Expand All @@ -1280,6 +1281,8 @@ static void s_on_incoming_publish_user_data_destroy(struct on_incoming_publish_u

user_data->binding_ref = s_aws_request_response_streaming_operation_binding_release(user_data->binding_ref);

aws_byte_buf_clean_up(&user_data->topic);

if (user_data->payload != NULL) {
aws_byte_buf_clean_up(user_data->payload);
aws_mem_release(user_data->allocator, user_data->payload);
Expand All @@ -1290,12 +1293,17 @@ static void s_on_incoming_publish_user_data_destroy(struct on_incoming_publish_u

static struct on_incoming_publish_user_data *s_on_incoming_publish_user_data_new(
struct aws_request_response_streaming_operation_binding *binding,
struct aws_byte_cursor topic,
struct aws_byte_cursor payload) {

struct on_incoming_publish_user_data *user_data =
aws_mem_calloc(binding->allocator, 1, sizeof(struct on_incoming_publish_user_data));
user_data->allocator = binding->allocator;

if (aws_byte_buf_init_copy_from_cursor(&user_data->topic, binding->allocator, topic)) {
goto error;
}

user_data->payload = aws_mem_calloc(binding->allocator, 1, sizeof(struct aws_byte_buf));
if (aws_byte_buf_init_copy_from_cursor(user_data->payload, binding->allocator, payload)) {
goto error;
Expand Down Expand Up @@ -1325,6 +1333,11 @@ static int s_aws_create_napi_value_from_incoming_publish_event(
AWS_NAPI_CALL(
env, napi_create_object(env, &napi_event), { return aws_raise_error(AWS_CRT_NODEJS_ERROR_NAPI_FAILURE); });

struct aws_byte_cursor topic_cursor = aws_byte_cursor_from_buf(&publish_event->topic);
if (aws_napi_attach_object_property_string(napi_event, env, AWS_NAPI_KEY_TOPIC, topic_cursor)) {
return AWS_OP_ERR;
}

if (aws_napi_attach_object_property_binary_as_finalizable_external(
napi_event, env, AWS_NAPI_KEY_PAYLOAD, publish_event->payload)) {
return AWS_OP_ERR;
Expand Down Expand Up @@ -1386,10 +1399,14 @@ static void s_napi_mqtt_streaming_operation_on_incoming_publish(
s_on_incoming_publish_user_data_destroy(publish_event);
}

static void s_mqtt_streaming_operation_on_incoming_publish(struct aws_byte_cursor payload, void *user_data) {
static void s_mqtt_streaming_operation_on_incoming_publish(
struct aws_byte_cursor payload,
struct aws_byte_cursor topic,
void *user_data) {
struct aws_request_response_streaming_operation_binding *binding = user_data;

struct on_incoming_publish_user_data *incoming_publish_ud = s_on_incoming_publish_user_data_new(binding, payload);
struct on_incoming_publish_user_data *incoming_publish_ud =
s_on_incoming_publish_user_data_new(binding, topic, payload);
if (incoming_publish_ud == NULL) {
return;
}
Expand Down
3 changes: 2 additions & 1 deletion test/mqtt_request_response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ export async function do_streaming_operation_incoming_publish_test(version: Prot

let incoming_publish : mqtt_request_response.IncomingPublishEvent = (await publish_received_promise)[0];

expect(incoming_publish.topic).toEqual(topic_filter);
expect(Buffer.from(incoming_publish.payload as ArrayBuffer)).toEqual(payload);

stream.close();
Expand Down Expand Up @@ -539,4 +540,4 @@ export async function do_invalid_streaming_operation_config_test(config: Streami
}).toThrow(expected_error);

await context.close();
}
}

0 comments on commit bd88f32

Please sign in to comment.