diff --git a/crt/aws-c-mqtt b/crt/aws-c-mqtt index 627c3334..9c093c40 160000 --- a/crt/aws-c-mqtt +++ b/crt/aws-c-mqtt @@ -1 +1 @@ -Subproject commit 627c3334e52021aa8d5772b6ca076884610f3219 +Subproject commit 9c093c4039478c04c7e55a1825c391dd4742fd61 diff --git a/lib/browser/mqtt_request_response.ts b/lib/browser/mqtt_request_response.ts index 7f1f8251..a69a5d84 100644 --- a/lib/browser/mqtt_request_response.ts +++ b/lib/browser/mqtt_request_response.ts @@ -808,6 +808,7 @@ export class RequestResponseClient extends BufferedEventEmitter implements mqtt_ let streamingOperation = operation as StreamingOperation; streamingOperation.operation.triggerIncomingPublishEvent({ + topic: event.topic, payload: event.payload }); } diff --git a/lib/common/mqtt_request_response.ts b/lib/common/mqtt_request_response.ts index 1f3c8832..c0949011 100644 --- a/lib/common/mqtt_request_response.ts +++ b/lib/common/mqtt_request_response.ts @@ -57,6 +57,11 @@ export interface SubscriptionStatusEvent { */ export interface IncomingPublishEvent { + /** + * MQTT Topic that the response was received on. + */ + topic: string + /** * The payload of the incoming message. */ @@ -235,4 +240,3 @@ export interface IRequestResponseClient { */ submitRequest(requestOptions: RequestResponseOperationOptions): Promise; } - diff --git a/source/mqtt_request_response.c b/source/mqtt_request_response.c index 85a6ccf4..fcef48f2 100644 --- a/source/mqtt_request_response.c +++ b/source/mqtt_request_response.c @@ -1270,6 +1270,7 @@ struct on_incoming_publish_user_data { struct aws_allocator *allocator; struct aws_request_response_streaming_operation_binding *binding_ref; + struct aws_byte_buf topic; struct aws_byte_buf *payload; }; @@ -1280,6 +1281,8 @@ static void s_on_incoming_publish_user_data_destroy(struct on_incoming_publish_u user_data->binding_ref = s_aws_request_response_streaming_operation_binding_release(user_data->binding_ref); + aws_byte_buf_clean_up(&user_data->topic); + if (user_data->payload != NULL) { aws_byte_buf_clean_up(user_data->payload); aws_mem_release(user_data->allocator, user_data->payload); @@ -1290,12 +1293,17 @@ static void s_on_incoming_publish_user_data_destroy(struct on_incoming_publish_u static struct on_incoming_publish_user_data *s_on_incoming_publish_user_data_new( struct aws_request_response_streaming_operation_binding *binding, + struct aws_byte_cursor topic, struct aws_byte_cursor payload) { struct on_incoming_publish_user_data *user_data = aws_mem_calloc(binding->allocator, 1, sizeof(struct on_incoming_publish_user_data)); user_data->allocator = binding->allocator; + if (aws_byte_buf_init_copy_from_cursor(&user_data->topic, binding->allocator, topic)) { + goto error; + } + user_data->payload = aws_mem_calloc(binding->allocator, 1, sizeof(struct aws_byte_buf)); if (aws_byte_buf_init_copy_from_cursor(user_data->payload, binding->allocator, payload)) { goto error; @@ -1325,6 +1333,11 @@ static int s_aws_create_napi_value_from_incoming_publish_event( AWS_NAPI_CALL( env, napi_create_object(env, &napi_event), { return aws_raise_error(AWS_CRT_NODEJS_ERROR_NAPI_FAILURE); }); + struct aws_byte_cursor topic_cursor = aws_byte_cursor_from_buf(&publish_event->topic); + if (aws_napi_attach_object_property_string(napi_event, env, AWS_NAPI_KEY_TOPIC, topic_cursor)) { + return AWS_OP_ERR; + } + if (aws_napi_attach_object_property_binary_as_finalizable_external( napi_event, env, AWS_NAPI_KEY_PAYLOAD, publish_event->payload)) { return AWS_OP_ERR; @@ -1386,10 +1399,14 @@ static void s_napi_mqtt_streaming_operation_on_incoming_publish( s_on_incoming_publish_user_data_destroy(publish_event); } -static void s_mqtt_streaming_operation_on_incoming_publish(struct aws_byte_cursor payload, void *user_data) { +static void s_mqtt_streaming_operation_on_incoming_publish( + struct aws_byte_cursor payload, + struct aws_byte_cursor topic, + void *user_data) { struct aws_request_response_streaming_operation_binding *binding = user_data; - struct on_incoming_publish_user_data *incoming_publish_ud = s_on_incoming_publish_user_data_new(binding, payload); + struct on_incoming_publish_user_data *incoming_publish_ud = + s_on_incoming_publish_user_data_new(binding, topic, payload); if (incoming_publish_ud == NULL) { return; } diff --git a/test/mqtt_request_response.ts b/test/mqtt_request_response.ts index 0470f27a..3a984f81 100644 --- a/test/mqtt_request_response.ts +++ b/test/mqtt_request_response.ts @@ -467,6 +467,7 @@ export async function do_streaming_operation_incoming_publish_test(version: Prot let incoming_publish : mqtt_request_response.IncomingPublishEvent = (await publish_received_promise)[0]; + expect(incoming_publish.topic).toEqual(topic_filter); expect(Buffer.from(incoming_publish.payload as ArrayBuffer)).toEqual(payload); stream.close(); @@ -539,4 +540,4 @@ export async function do_invalid_streaming_operation_config_test(config: Streami }).toThrow(expected_error); await context.close(); -} \ No newline at end of file +}