From f5b1f4eafdc799b321e4f542591044f81cd7de5c Mon Sep 17 00:00:00 2001 From: Gabor Olah Date: Mon, 4 Dec 2023 10:51:03 +0000 Subject: [PATCH 1/4] Convert AMQP 1.0 props and app props to AMQP 0.9.1 props and headers - Timestamps are milliseconds in AMQP 1.0, but in AMQP 0.9.1 it is seconds. Fixed by multiplying the timestamp by 1 000. - Shovel crashed if user_id was set in the message because the encoding was as utf8 while it should be a byte array. - Negative integers were encoded as integers - therefore leading to incorrect positive values. - Float values were not supported by the client. - Fixed priority header encoding in AMQP 1.0. It was set as uint but it should be ubyte. - Priority of the message is now in the Headers instead of Application Properties. This is potentially a breaking change. Fixes: #7508 (cherry picked from commit 8e954ff366db988ab08cdc194fe804b16420c3cb) --- deps/amqp10_client/src/amqp10_msg.erl | 6 + .../src/rabbit_amqp10_shovel.erl | 71 +++++++- .../test/amqp10_dynamic_SUITE.erl | 166 +++++++++++++++++- 3 files changed, 238 insertions(+), 5 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_msg.erl b/deps/amqp10_client/src/amqp10_msg.erl index 91a7efebe329..0e512ec6b1b1 100644 --- a/deps/amqp10_client/src/amqp10_msg.erl +++ b/deps/amqp10_client/src/amqp10_msg.erl @@ -342,6 +342,8 @@ set_properties(Props, #amqp10_msg{properties = Current} = Msg) -> Acc#'v1_0.properties'{message_id = utf8(V)}; (user_id, V, Acc) when is_binary(V) -> Acc#'v1_0.properties'{user_id = {binary, V}}; + (user_id, V, Acc) when is_binary(V) orelse is_list(V) -> + Acc#'v1_0.properties'{user_id = binary(V)}; (to, V, Acc) -> Acc#'v1_0.properties'{to = utf8(V)}; (subject, V, Acc) -> @@ -422,6 +424,8 @@ wrap_ap_value(true) -> {boolean, true}; wrap_ap_value(false) -> {boolean, false}; +wrap_ap_value(F) when is_float(F) -> + {double, F}; wrap_ap_value(V) when is_binary(V) -> utf8(V); wrap_ap_value(V) when is_list(V) -> @@ -472,6 +476,8 @@ utf8(V) -> amqp10_client_types:utf8(V). sym(B) when is_list(B) -> {symbol, list_to_binary(B)}; sym(B) when is_binary(B) -> {symbol, B}. uint(B) -> {uint, B}. +binary(B) when is_binary(B) -> {binary, B}; +binary(B) when is_list(B) -> {binary, erlang:list_to_binary(B)}. has_value(undefined) -> false; has_value(_) -> true. diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl index eafe5e15a1ff..6a5daadcd73d 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl @@ -182,7 +182,8 @@ dest_endpoint(#{shovel_type := dynamic, handle_source({amqp10_msg, _LinkRef, Msg}, State) -> Tag = amqp10_msg:delivery_id(Msg), Payload = amqp10_msg:body_bin(Msg), - rabbit_shovel_behaviour:forward(Tag, #{}, Payload, State); + Props = props_to_map(Msg), + rabbit_shovel_behaviour:forward(Tag, Props, Payload, State); handle_source({amqp10_event, {connection, Conn, opened}}, State = #{source := #{current := #{conn := Conn}}}) -> State; @@ -380,7 +381,9 @@ add_forward_headers(_, Msg) -> Msg. set_message_properties(Props, Msg) -> %% this is effectively special handling properties from amqp 0.9.1 maps:fold( - fun(content_type, Ct, M) -> + fun(_Key, undefined, M) -> + M; + (content_type, Ct, M) -> amqp10_msg:set_properties( #{content_type => to_binary(Ct)}, M); (content_encoding, Ct, M) -> @@ -400,8 +403,8 @@ set_message_properties(Props, Msg) -> (message_id, Ct, M) -> amqp10_msg:set_properties(#{message_id => to_binary(Ct)}, M); (timestamp, Ct, M) -> - amqp10_msg:set_properties(#{creation_time => Ct}, M); - (user_id, Ct, M) -> + amqp10_msg:set_properties(#{creation_time => timestamp_091_to_10(Ct)}, M); + (user_id, Ct, M) when Ct =/= undefined -> amqp10_msg:set_properties(#{user_id => Ct}, M); (headers, Headers0, M) when is_list(Headers0) -> %% AMPQ 0.9.1 are added as applicatin properties @@ -443,3 +446,63 @@ is_amqp10_compat(T) -> %% TODO: not all lists are compatible is_list(T) orelse is_boolean(T). + +to_amqp091_compatible_value(Key, Value) when is_binary(Value) -> + {Key, longstr, Value}; +to_amqp091_compatible_value(Key, Value) when is_integer(Value) -> + {Key, long, Value}; +to_amqp091_compatible_value(Key, Value) when is_float(Value) -> + {Key, double, Value}; +to_amqp091_compatible_value(Key, true) -> + {Key, bool, true}; +to_amqp091_compatible_value(Key, false) -> + {Key, bool, false}; +to_amqp091_compatible_value(_Key, _Value) -> + undefined. + +delivery_mode(Headers) -> + case maps:get(durable, Headers, undefined) of + undefined -> undefined; + true -> 2; + false -> 1 + end. + +timestamp_10_to_091(T) when is_integer(T) -> + trunc(T / 1000); +timestamp_10_to_091(_) -> + undefined. + +timestamp_091_to_10(T) when is_integer(T) -> + T * 1000; +timestamp_091_to_10(_Value) -> + undefined. + +ttl(T) when is_integer(T) -> + erlang:integer_to_binary(T); +ttl(_T) -> undefined. + +props_to_map(Msg) -> + AppProps = amqp10_msg:application_properties(Msg), + AppProps091Headers = lists:filtermap(fun({K, V}) -> + case to_amqp091_compatible_value(K, V) of + undefined -> + false; + Value -> + {true, Value} + end + end, maps:to_list(AppProps)), + InProps = amqp10_msg:properties(Msg), + Headers = amqp10_msg:headers(Msg), + #{ + headers => AppProps091Headers, + content_type => maps:get(content_type, InProps, undefined), + content_encoding => maps:get(content_encoding, InProps, undefined), + delivery_mode => delivery_mode(Headers), + priority => maps:get(priority, Headers, undefined), + correlation_id => maps:get(correlation_id, InProps, undefined), + reply_to => maps:get(reply_to, InProps, undefined), + expiration => ttl(maps:get(ttl, Headers, undefined)), + message_id => maps:get(message_id, InProps, undefined), + timestamp => timestamp_10_to_091(maps:get(creation_time, InProps, undefined)), + user_id => maps:get(user_id, InProps, undefined) + }. diff --git a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl index 18b5ef3595e6..1198b81bb3db 100644 --- a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl @@ -27,7 +27,9 @@ groups() -> autodelete_amqp091_dest_on_confirm, autodelete_amqp091_dest_on_publish, simple_amqp10_dest, - simple_amqp10_src + simple_amqp10_src, + message_prop_conversion, + message_prop_conversion_no_props ]}, {with_map_config, [], [ simple, @@ -169,6 +171,168 @@ simple_amqp10_src(Config) -> ok end). +message_prop_conversion(Config) -> + MapConfig = ?config(map_config, Config), + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_session(Config, + fun (Sess) -> + shovel_test_utils:set_param( + Config, + <<"test">>, [{<<"src-protocol">>, <<"amqp10">>}, + {<<"src-address">>, Src}, + {<<"dest-protocol">>, <<"amqp091">>}, + {<<"dest-queue">>, Dest}, + {<<"add-forward-headers">>, true}, + {<<"dest-add-timestamp-header">>, true}, + {<<"publish-properties">>, + case MapConfig of + true -> #{<<"cluster_id">> => <<"x">>}; + _ -> [{<<"cluster_id">>, <<"x">>}] + end} + ]), + LinkName = <<"dynamic-sender-", Dest/binary>>, + Tag = <<"tag1">>, + Payload = <<"payload">>, + {ok, Sender} = amqp10_client:attach_sender_link(Sess, LinkName, Src, + unsettled, unsettled_state), + ok = await_amqp10_event(link, Sender, attached), + Headers = #{durable => true, priority => 3, ttl => 180000}, + Msg = amqp10_msg:set_headers(Headers, + amqp10_msg:new(Tag, Payload, false)), + Msg2 = amqp10_msg:set_properties(#{ + message_id => <<"message-id">>, + user_id => <<"guest">>, + to => <<"to">>, + subject => <<"subject">>, + reply_to => <<"reply-to">>, + correlation_id => <<"correlation-id">>, + content_type => <<"content-type">>, + content_encoding => <<"content-encoding">>, + %absolute_expiry_time => 123456789, + creation_time => 123456789, + group_id => <<"group-id">>, + group_sequence => 123, + reply_to_group_id => <<"reply-to-group-id">> + }, Msg), + Msg3 = amqp10_msg:set_application_properties(#{ + <<"x-binary">> => <<"binary">>, + <<"x-int">> => 33, + <<"x-negative-int">> => -33, + <<"x-float">> => 1.3, + <<"x-true">> => true, + <<"x-false">> => false + }, Msg2), + ok = amqp10_client:send_msg(Sender, Msg3), + receive + {amqp10_disposition, {accepted, Tag}} -> ok + after 3000 -> + exit(publish_disposition_not_received) + end, + amqp10_client:detach_link(Sender), + Channel = rabbit_ct_client_helpers:open_channel(Config), + {#'basic.get_ok'{}, #amqp_msg{payload = Payload, props = #'P_basic'{ + content_type = ReceivedContentType, + content_encoding = ReceivedContentEncoding, + headers = Headers2, + delivery_mode = ReceivedDeliveryMode, + priority = ReceivedPriority, + correlation_id = ReceivedCorrelationId, + reply_to = ReceivedReplyTo, + expiration = ReceivedExpiration, + message_id = ReceivedMessageId, + timestamp = ReceivedTimestamp, + type = _ReceivedType, + user_id = ReceivedUserId, + app_id = _ReceivedAppId, + cluster_id = _ReceivedClusterId + }}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}), + + ?assertEqual(<<"payload">>, Payload), + ?assertEqual(2, ReceivedDeliveryMode), + ?assertEqual({longstr, <<"binary">>}, rabbit_misc:table_lookup(Headers2, <<"x-binary">>)), + ?assertEqual({long, 33}, rabbit_misc:table_lookup(Headers2, <<"x-int">>)), + ?assertEqual({long, -33}, rabbit_misc:table_lookup(Headers2, <<"x-negative-int">>)), + ?assertEqual({double, 1.3}, rabbit_misc:table_lookup(Headers2, <<"x-float">>)), + ?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers2, <<"x-true">>)), + ?assertEqual({bool, false}, rabbit_misc:table_lookup(Headers2, <<"x-false">>)), + + ?assertEqual(<<"content-type">>, ReceivedContentType), + ?assertEqual(<<"content-encoding">>, ReceivedContentEncoding), + + ?assertEqual(3, ReceivedPriority), + ?assertEqual(<<"correlation-id">>, ReceivedCorrelationId), + ?assertEqual(<<"reply-to">>, ReceivedReplyTo), + ?assertEqual(<<"180000">>, ReceivedExpiration), + ?assertEqual(<<"message-id">>, ReceivedMessageId), + ?assertEqual(123456, ReceivedTimestamp), % timestamp is divided by 1 000 + ?assertEqual(<<"guest">>, ReceivedUserId), + ok + end). + +message_prop_conversion_no_props(Config) -> + MapConfig = ?config(map_config, Config), + Src = ?config(srcq, Config), + Dest = ?config(destq, Config), + with_session(Config, + fun (Sess) -> + shovel_test_utils:set_param( + Config, + <<"test">>, [{<<"src-protocol">>, <<"amqp10">>}, + {<<"src-address">>, Src}, + {<<"dest-protocol">>, <<"amqp091">>}, + {<<"dest-queue">>, Dest}, + {<<"add-forward-headers">>, true}, + {<<"dest-add-timestamp-header">>, true}, + {<<"publish-properties">>, + case MapConfig of + true -> #{<<"cluster_id">> => <<"x">>}; + _ -> [{<<"cluster_id">>, <<"x">>}] + end} + ]), + LinkName = <<"dynamic-sender-", Dest/binary>>, + Tag = <<"tag1">>, + Payload = <<"payload">>, + {ok, Sender} = amqp10_client:attach_sender_link(Sess, LinkName, Src, + unsettled, unsettled_state), + ok = await_amqp10_event(link, Sender, attached), + Msg = amqp10_msg:new(Tag, Payload, false), + ok = amqp10_client:send_msg(Sender, Msg), + receive + {amqp10_disposition, {accepted, Tag}} -> ok + after 3000 -> + exit(publish_disposition_not_received) + end, + amqp10_client:detach_link(Sender), + Channel = rabbit_ct_client_helpers:open_channel(Config), + {#'basic.get_ok'{}, #amqp_msg{payload = ReceivedPayload, props = #'P_basic'{ + content_type = undefined, + content_encoding = undefined, + headers = ReceivedHeaders, + delivery_mode = ReceivedDeliveryMode, + priority = ReceivedPriority, + correlation_id = undefined, + reply_to = undefined, + expiration = undefined, + message_id = undefined, + timestamp = undefined, + type = undefined, + user_id = undefined, + app_id = undefined, + cluster_id = ReceivedClusterId + }}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}), + + ?assertEqual(<<"payload">>, ReceivedPayload), + ?assertEqual(1, ReceivedDeliveryMode), + ?assertEqual(<<"x">>, ReceivedClusterId), + ?assertEqual(4, ReceivedPriority), + + ?assertNotEqual(undefined, rabbit_misc:table_lookup(ReceivedHeaders, <<"x-shovelled">>)), + + ok + end). + + change_definition(Config) -> Src = ?config(srcq, Config), Dest = ?config(destq, Config), From cad6143c62979d944ec1bc5eac5c384f59a75a76 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Tue, 16 Jul 2024 13:45:26 -0400 Subject: [PATCH 2/4] AMQP 1.0 Erlang client: drop a redundant match head --- deps/amqp10_client/src/amqp10_msg.erl | 2 -- 1 file changed, 2 deletions(-) diff --git a/deps/amqp10_client/src/amqp10_msg.erl b/deps/amqp10_client/src/amqp10_msg.erl index 0e512ec6b1b1..ad3159e45aaa 100644 --- a/deps/amqp10_client/src/amqp10_msg.erl +++ b/deps/amqp10_client/src/amqp10_msg.erl @@ -340,8 +340,6 @@ set_properties(Props, #amqp10_msg{properties = Current} = Msg) -> (message_id, V, Acc) when is_binary(V) -> %% backward compat clause Acc#'v1_0.properties'{message_id = utf8(V)}; - (user_id, V, Acc) when is_binary(V) -> - Acc#'v1_0.properties'{user_id = {binary, V}}; (user_id, V, Acc) when is_binary(V) orelse is_list(V) -> Acc#'v1_0.properties'{user_id = binary(V)}; (to, V, Acc) -> From 3c3ccb8bc1d05e6a789683c1d19021180cb758cc Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 17 Jul 2024 11:20:55 +0100 Subject: [PATCH 3/4] Shovel: fix some test assertions. In line with internal RabbitMQ behaviour AMQP defaults to durable messages. --- .../test/amqp10_dynamic_SUITE.erl | 45 +++++++++++-------- deps/rabbitmq_shovel/test/dynamic_SUITE.erl | 1 + 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl index 1198b81bb3db..a9ad497a3c21 100644 --- a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl @@ -9,6 +9,9 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(nowarn_export_all). -compile(export_all). all() -> @@ -305,29 +308,35 @@ message_prop_conversion_no_props(Config) -> end, amqp10_client:detach_link(Sender), Channel = rabbit_ct_client_helpers:open_channel(Config), - {#'basic.get_ok'{}, #amqp_msg{payload = ReceivedPayload, props = #'P_basic'{ - content_type = undefined, - content_encoding = undefined, - headers = ReceivedHeaders, - delivery_mode = ReceivedDeliveryMode, - priority = ReceivedPriority, - correlation_id = undefined, - reply_to = undefined, - expiration = undefined, - message_id = undefined, - timestamp = undefined, - type = undefined, - user_id = undefined, - app_id = undefined, - cluster_id = ReceivedClusterId - }}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}), + {#'basic.get_ok'{}, + #amqp_msg{payload = ReceivedPayload, + props = #'P_basic'{ + content_type = undefined, + content_encoding = undefined, + headers = ReceivedHeaders, + delivery_mode = ReceivedDeliveryMode, + priority = ReceivedPriority, + correlation_id = undefined, + reply_to = undefined, + expiration = undefined, + message_id = undefined, + timestamp = undefined, + type = undefined, + user_id = undefined, + app_id = undefined, + cluster_id = ReceivedClusterId + }}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, + no_ack = true}), ?assertEqual(<<"payload">>, ReceivedPayload), - ?assertEqual(1, ReceivedDeliveryMode), + %% in 4.0 the default durability is durable=true for AMQP + %% messages + ?assertEqual(2, ReceivedDeliveryMode), ?assertEqual(<<"x">>, ReceivedClusterId), ?assertEqual(4, ReceivedPriority), - ?assertNotEqual(undefined, rabbit_misc:table_lookup(ReceivedHeaders, <<"x-shovelled">>)), + ?assertNotEqual(undefined, rabbit_misc:table_lookup(ReceivedHeaders, + <<"x-shovelled">>)), ok end). diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl index 6c7846c44d24..be68840cb70d 100644 --- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-compile(nowarn_export_all). -compile(export_all). -export([spawn_suspender_proc/1]). From eb29656d6f30e150dc6a36b3d54eb71c329dd531 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 17 Jul 2024 06:45:39 -0400 Subject: [PATCH 4/4] Shovel amqp10_dynamic_SUITE: Add a missing header --- deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl | 2 -- 1 file changed, 2 deletions(-) diff --git a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl index a9ad497a3c21..f074ae3b79f6 100644 --- a/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl @@ -10,8 +10,6 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). - --compile(nowarn_export_all). -compile(export_all). all() ->