Skip to content

Commit db25654

Browse files
michaelklishinansd
authored andcommitted
4.1: Avoid an exception when an AMQP 0-9-1-originating message with expiration set is converted for an MQTT consumer (#12710)
* MQTT: avoid an exception when an AMQP 0-9-1 publisher publishes a message that has expiration set. Stack trace was contributed in #12707 by @rdsilio. * mc_mqtt_SUITE test for #12707 #12710 * MQTT protocol_interop_SUITE: new test for #12710 #12707 * Simplify tests --------- Co-authored-by: David Ansari <[email protected]> (cherry picked from commit c78bc8a)
1 parent 707a0ff commit db25654

File tree

3 files changed

+42
-3
lines changed

3 files changed

+42
-3
lines changed

deps/rabbitmq_mqtt/src/mc_mqtt.erl

+1-1
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ protocol_state(Msg = #mqtt_msg{props = Props0,
426426
undefined ->
427427
Props2;
428428
Ttl ->
429-
case maps:get(?ANN_TIMESTAMP, Anns) of
429+
case maps:get(?ANN_TIMESTAMP, Anns, undefined) of
430430
undefined ->
431431
Props2;
432432
Timestamp ->

deps/rabbitmq_mqtt/test/mc_mqtt_SUITE.erl

+14-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ groups() ->
3333
mqtt_amqp,
3434
mqtt_amqp_alt,
3535
amqp_mqtt,
36-
is_persistent
36+
is_persistent,
37+
amqpl_to_mqtt_gh_12707
3738
]}
3839
].
3940

@@ -156,6 +157,18 @@ roundtrip_amqpl(_Config) ->
156157
ExpectedUserProperty = lists:keysort(1, UserProperty),
157158
?assertMatch(#{'User-Property' := ExpectedUserProperty}, Props).
158159

160+
amqpl_to_mqtt_gh_12707(_Config) ->
161+
Props = #'P_basic'{expiration = <<"12707">>},
162+
Payload = [<<"gh_12707">>],
163+
Content = #content{properties = Props,
164+
payload_fragments_rev = Payload},
165+
Anns = #{?ANN_EXCHANGE => <<"amq.topic">>,
166+
?ANN_ROUTING_KEYS => [<<"dummy">>]},
167+
OriginalMsg = mc:init(mc_amqpl, Content, Anns),
168+
Converted = mc:convert(mc_mqtt, OriginalMsg),
169+
?assertMatch(#mqtt_msg{}, mc:protocol_state(Converted)),
170+
?assertEqual(12707, mc:get_annotation(ttl, Converted)).
171+
159172
%% Non-UTF-8 Correlation Data should also be converted (via AMQP 0.9.1 header x-correlation-id).
160173
roundtrip_amqpl_correlation(_Config) ->
161174
Msg0 = mqtt_msg(),

deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl

+27-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ groups() ->
3535
[{cluster_size_1, [shuffle],
3636
[
3737
mqtt_amqpl_mqtt,
38+
amqpl_mqtt_gh_12707,
3839
mqtt_amqp_mqtt,
3940
amqp_mqtt_amqp,
4041
mqtt_stomp_mqtt,
@@ -104,7 +105,6 @@ mqtt_amqpl_mqtt(Config) ->
104105
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = Q,
105106
exchange = <<"amq.topic">>,
106107
routing_key = <<"my.topic">>}),
107-
%% MQTT 5.0 to AMQP 0.9.1
108108
C = connect(ClientId, Config),
109109
MqttResponseTopic = <<"response/topic">>,
110110
{ok, _, [1]} = emqtt:subscribe(C, #{'Subscription-Identifier' => 999}, [{MqttResponseTopic, [{qos, 1}]}]),
@@ -169,6 +169,32 @@ mqtt_amqpl_mqtt(Config) ->
169169

170170
ok = emqtt:disconnect(C).
171171

172+
amqpl_mqtt_gh_12707(Config) ->
173+
ClientId = atom_to_binary(?FUNCTION_NAME),
174+
Topic = Payload = <<"gh_12707">>,
175+
C = connect(ClientId, Config),
176+
{ok, _, [1]} = emqtt:subscribe(C, Topic, qos1),
177+
178+
Ch = rabbit_ct_client_helpers:open_channel(Config),
179+
amqp_channel:call(Ch,
180+
#'basic.publish'{exchange = <<"amq.topic">>,
181+
routing_key = Topic},
182+
#amqp_msg{payload = Payload,
183+
props = #'P_basic'{expiration = <<"12707">>,
184+
headers = []}}),
185+
186+
receive {publish,
187+
#{topic := MqttTopic,
188+
payload := MqttPayload}} ->
189+
?assertEqual(Topic, MqttTopic),
190+
?assertEqual(Payload, MqttPayload)
191+
after 5000 ->
192+
ct:fail("did not receive a delivery")
193+
end,
194+
195+
ok = rabbit_ct_client_helpers:close_channel(Ch),
196+
ok = emqtt:disconnect(C).
197+
172198
mqtt_amqp_mqtt(Config) ->
173199
Host = ?config(rmq_hostname, Config),
174200
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),

0 commit comments

Comments
 (0)