Skip to content

Commit

Permalink
Convert AMQP 1.0 props and app props to AMQP 0.9.1 props and headers
Browse files Browse the repository at this point in the history
- Timestamps are milliseconds in AMQP 1.0, but in AMQP 0.9.1 it is seconds.
  Fixed by multiplying the timestamp by 1 000.
- Shovel crashed if user_id was set in the message because the encoding
  was as utf8 while it should be a byte array.
- Negative integers were encoded as integers - therefore leading to
  incorrect positive values.
- Float values were not supported by the client.
- Fixed priority header encoding in AMQP 1.0. It was set as uint but it
  should be ubyte.
- Priority of the message is now in the Headers instead of Application
  Properties. This is potentially a breaking change.

Fixes: #7508
(cherry picked from commit 8e954ff)

# Conflicts:
#	deps/amqp10_client/src/amqp10_msg.erl
#	deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl
  • Loading branch information
olikasg authored and mergify[bot] committed Jul 16, 2024
1 parent 2679649 commit 78ec03a
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 5 deletions.
16 changes: 16 additions & 0 deletions deps/amqp10_client/src/amqp10_msg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,13 @@ set_properties(Props, #amqp10_msg{properties = Current} = Msg) ->
(message_id, V, Acc) when is_binary(V) ->
%% backward compat clause
Acc#'v1_0.properties'{message_id = utf8(V)};
<<<<<<< HEAD
(user_id, V, Acc) when is_binary(V) ->
Acc#'v1_0.properties'{user_id = {binary, V}};
=======
(user_id, V, Acc) when is_binary(V) orelse is_list(V) ->
Acc#'v1_0.properties'{user_id = binary(V)};
>>>>>>> 8e954ff366 (Convert AMQP 1.0 props and app props to AMQP 0.9.1 props and headers)
(to, V, Acc) ->
Acc#'v1_0.properties'{to = utf8(V)};
(subject, V, Acc) ->
Expand Down Expand Up @@ -422,6 +427,15 @@ wrap_ap_value(true) ->
{boolean, true};
wrap_ap_value(false) ->
{boolean, false};
<<<<<<< HEAD
=======
wrap_ap_value(V) when is_integer(V) andalso V >= 0 ->
{uint, V};
wrap_ap_value(V) when is_integer(V) andalso V < 0 ->
{int, V};
wrap_ap_value(F) when is_float(F) ->
{double, F};
>>>>>>> 8e954ff366 (Convert AMQP 1.0 props and app props to AMQP 0.9.1 props and headers)
wrap_ap_value(V) when is_binary(V) ->
utf8(V);
wrap_ap_value(V) when is_list(V) ->
Expand Down Expand Up @@ -472,6 +486,8 @@ utf8(V) -> amqp10_client_types:utf8(V).
sym(B) when is_list(B) -> {symbol, list_to_binary(B)};
sym(B) when is_binary(B) -> {symbol, B}.
uint(B) -> {uint, B}.
binary(B) when is_binary(B) -> {binary, B};
binary(B) when is_list(B) -> {binary, erlang:list_to_binary(B)}.

has_value(undefined) -> false;
has_value(_) -> true.
74 changes: 70 additions & 4 deletions deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ dest_endpoint(#{shovel_type := dynamic,
handle_source({amqp10_msg, _LinkRef, Msg}, State) ->
Tag = amqp10_msg:delivery_id(Msg),
Payload = amqp10_msg:body_bin(Msg),
rabbit_shovel_behaviour:forward(Tag, #{}, Payload, State);
Props = props_to_map(Msg),
rabbit_shovel_behaviour:forward(Tag, Props, Payload, State);
handle_source({amqp10_event, {connection, Conn, opened}},
State = #{source := #{current := #{conn := Conn}}}) ->
State;
Expand Down Expand Up @@ -380,17 +381,22 @@ add_forward_headers(_, Msg) -> Msg.
set_message_properties(Props, Msg) ->
%% this is effectively special handling properties from amqp 0.9.1
maps:fold(
fun(content_type, Ct, M) ->
fun(_Key, undefined, M) ->
M;
(content_type, Ct, M) ->
amqp10_msg:set_properties(
#{content_type => to_binary(Ct)}, M);
(content_encoding, Ct, M) ->
amqp10_msg:set_properties(
#{content_encoding => to_binary(Ct)}, M);
(delivery_mode, 2, M) ->
amqp10_msg:set_headers(#{durable => true}, M);
<<<<<<< HEAD
(delivery_mode, 1, M) ->
% by default the durable flag is false
M;
=======
>>>>>>> 8e954ff366 (Convert AMQP 1.0 props and app props to AMQP 0.9.1 props and headers)
(priority, P, M) when is_integer(P) ->
amqp10_msg:set_headers(#{priority => P}, M);
(correlation_id, Ct, M) ->
Expand All @@ -400,8 +406,8 @@ set_message_properties(Props, Msg) ->
(message_id, Ct, M) ->
amqp10_msg:set_properties(#{message_id => to_binary(Ct)}, M);
(timestamp, Ct, M) ->
amqp10_msg:set_properties(#{creation_time => Ct}, M);
(user_id, Ct, M) ->
amqp10_msg:set_properties(#{creation_time => timestamp_091_to_10(Ct)}, M);
(user_id, Ct, M) when Ct =/= undefined ->
amqp10_msg:set_properties(#{user_id => Ct}, M);
(headers, Headers0, M) when is_list(Headers0) ->
%% AMPQ 0.9.1 are added as applicatin properties
Expand Down Expand Up @@ -443,3 +449,63 @@ is_amqp10_compat(T) ->
%% TODO: not all lists are compatible
is_list(T) orelse
is_boolean(T).

to_amqp091_compatible_value(Key, Value) when is_binary(Value) ->
{Key, longstr, Value};
to_amqp091_compatible_value(Key, Value) when is_integer(Value) ->
{Key, long, Value};
to_amqp091_compatible_value(Key, Value) when is_float(Value) ->
{Key, double, Value};
to_amqp091_compatible_value(Key, true) ->
{Key, bool, true};
to_amqp091_compatible_value(Key, false) ->
{Key, bool, false};
to_amqp091_compatible_value(_Key, _Value) ->
undefined.

delivery_mode(Headers) ->
case maps:get(durable, Headers, undefined) of
undefined -> undefined;
true -> 2;
false -> 1
end.

timestamp_10_to_091(T) when is_integer(T) ->
trunc(T / 1000);
timestamp_10_to_091(_) ->
undefined.

timestamp_091_to_10(T) when is_integer(T) ->
T * 1000;
timestamp_091_to_10(_Value) ->
undefined.

ttl(T) when is_integer(T) ->
erlang:integer_to_binary(T);
ttl(_T) -> undefined.

props_to_map(Msg) ->
AppProps = amqp10_msg:application_properties(Msg),
AppProps091Headers = lists:filtermap(fun({K, V}) ->
case to_amqp091_compatible_value(K, V) of
undefined ->
false;
Value ->
{true, Value}
end
end, maps:to_list(AppProps)),
InProps = amqp10_msg:properties(Msg),
Headers = amqp10_msg:headers(Msg),
#{
headers => AppProps091Headers,
content_type => maps:get(content_type, InProps, undefined),
content_encoding => maps:get(content_encoding, InProps, undefined),
delivery_mode => delivery_mode(Headers),
priority => maps:get(priority, Headers, undefined),
correlation_id => maps:get(correlation_id, InProps, undefined),
reply_to => maps:get(reply_to, InProps, undefined),
expiration => ttl(maps:get(ttl, Headers, undefined)),
message_id => maps:get(message_id, InProps, undefined),
timestamp => timestamp_10_to_091(maps:get(creation_time, InProps, undefined)),
user_id => maps:get(user_id, InProps, undefined)
}.
166 changes: 165 additions & 1 deletion deps/rabbitmq_shovel/test/amqp10_dynamic_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ groups() ->
autodelete_amqp091_dest_on_confirm,
autodelete_amqp091_dest_on_publish,
simple_amqp10_dest,
simple_amqp10_src
simple_amqp10_src,
message_prop_conversion,
message_prop_conversion_no_props
]},
{with_map_config, [], [
simple,
Expand Down Expand Up @@ -169,6 +171,168 @@ simple_amqp10_src(Config) ->
ok
end).

message_prop_conversion(Config) ->
MapConfig = ?config(map_config, Config),
Src = ?config(srcq, Config),
Dest = ?config(destq, Config),
with_session(Config,
fun (Sess) ->
shovel_test_utils:set_param(
Config,
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
{<<"src-address">>, Src},
{<<"dest-protocol">>, <<"amqp091">>},
{<<"dest-queue">>, Dest},
{<<"add-forward-headers">>, true},
{<<"dest-add-timestamp-header">>, true},
{<<"publish-properties">>,
case MapConfig of
true -> #{<<"cluster_id">> => <<"x">>};
_ -> [{<<"cluster_id">>, <<"x">>}]
end}
]),
LinkName = <<"dynamic-sender-", Dest/binary>>,
Tag = <<"tag1">>,
Payload = <<"payload">>,
{ok, Sender} = amqp10_client:attach_sender_link(Sess, LinkName, Src,
unsettled, unsettled_state),
ok = await_amqp10_event(link, Sender, attached),
Headers = #{durable => true, priority => 3, ttl => 180000},
Msg = amqp10_msg:set_headers(Headers,
amqp10_msg:new(Tag, Payload, false)),
Msg2 = amqp10_msg:set_properties(#{
message_id => <<"message-id">>,
user_id => <<"guest">>,
to => <<"to">>,
subject => <<"subject">>,
reply_to => <<"reply-to">>,
correlation_id => <<"correlation-id">>,
content_type => <<"content-type">>,
content_encoding => <<"content-encoding">>,
%absolute_expiry_time => 123456789,
creation_time => 123456789,
group_id => <<"group-id">>,
group_sequence => 123,
reply_to_group_id => <<"reply-to-group-id">>
}, Msg),
Msg3 = amqp10_msg:set_application_properties(#{
<<"x-binary">> => <<"binary">>,
<<"x-int">> => 33,
<<"x-negative-int">> => -33,
<<"x-float">> => 1.3,
<<"x-true">> => true,
<<"x-false">> => false
}, Msg2),
ok = amqp10_client:send_msg(Sender, Msg3),
receive
{amqp10_disposition, {accepted, Tag}} -> ok
after 3000 ->
exit(publish_disposition_not_received)
end,
amqp10_client:detach_link(Sender),
Channel = rabbit_ct_client_helpers:open_channel(Config),
{#'basic.get_ok'{}, #amqp_msg{payload = Payload, props = #'P_basic'{
content_type = ReceivedContentType,
content_encoding = ReceivedContentEncoding,
headers = Headers2,
delivery_mode = ReceivedDeliveryMode,
priority = ReceivedPriority,
correlation_id = ReceivedCorrelationId,
reply_to = ReceivedReplyTo,
expiration = ReceivedExpiration,
message_id = ReceivedMessageId,
timestamp = ReceivedTimestamp,
type = _ReceivedType,
user_id = ReceivedUserId,
app_id = _ReceivedAppId,
cluster_id = _ReceivedClusterId
}}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}),

?assertEqual(<<"payload">>, Payload),
?assertEqual(2, ReceivedDeliveryMode),
?assertEqual({longstr, <<"binary">>}, rabbit_misc:table_lookup(Headers2, <<"x-binary">>)),
?assertEqual({long, 33}, rabbit_misc:table_lookup(Headers2, <<"x-int">>)),
?assertEqual({long, -33}, rabbit_misc:table_lookup(Headers2, <<"x-negative-int">>)),
?assertEqual({double, 1.3}, rabbit_misc:table_lookup(Headers2, <<"x-float">>)),
?assertEqual({bool, true}, rabbit_misc:table_lookup(Headers2, <<"x-true">>)),
?assertEqual({bool, false}, rabbit_misc:table_lookup(Headers2, <<"x-false">>)),

?assertEqual(<<"content-type">>, ReceivedContentType),
?assertEqual(<<"content-encoding">>, ReceivedContentEncoding),

?assertEqual(3, ReceivedPriority),
?assertEqual(<<"correlation-id">>, ReceivedCorrelationId),
?assertEqual(<<"reply-to">>, ReceivedReplyTo),
?assertEqual(<<"180000">>, ReceivedExpiration),
?assertEqual(<<"message-id">>, ReceivedMessageId),
?assertEqual(123456, ReceivedTimestamp), % timestamp is divided by 1 000
?assertEqual(<<"guest">>, ReceivedUserId),
ok
end).

message_prop_conversion_no_props(Config) ->
MapConfig = ?config(map_config, Config),
Src = ?config(srcq, Config),
Dest = ?config(destq, Config),
with_session(Config,
fun (Sess) ->
shovel_test_utils:set_param(
Config,
<<"test">>, [{<<"src-protocol">>, <<"amqp10">>},
{<<"src-address">>, Src},
{<<"dest-protocol">>, <<"amqp091">>},
{<<"dest-queue">>, Dest},
{<<"add-forward-headers">>, true},
{<<"dest-add-timestamp-header">>, true},
{<<"publish-properties">>,
case MapConfig of
true -> #{<<"cluster_id">> => <<"x">>};
_ -> [{<<"cluster_id">>, <<"x">>}]
end}
]),
LinkName = <<"dynamic-sender-", Dest/binary>>,
Tag = <<"tag1">>,
Payload = <<"payload">>,
{ok, Sender} = amqp10_client:attach_sender_link(Sess, LinkName, Src,
unsettled, unsettled_state),
ok = await_amqp10_event(link, Sender, attached),
Msg = amqp10_msg:new(Tag, Payload, false),
ok = amqp10_client:send_msg(Sender, Msg),
receive
{amqp10_disposition, {accepted, Tag}} -> ok
after 3000 ->
exit(publish_disposition_not_received)
end,
amqp10_client:detach_link(Sender),
Channel = rabbit_ct_client_helpers:open_channel(Config),
{#'basic.get_ok'{}, #amqp_msg{payload = ReceivedPayload, props = #'P_basic'{
content_type = undefined,
content_encoding = undefined,
headers = ReceivedHeaders,
delivery_mode = ReceivedDeliveryMode,
priority = ReceivedPriority,
correlation_id = undefined,
reply_to = undefined,
expiration = undefined,
message_id = undefined,
timestamp = undefined,
type = undefined,
user_id = undefined,
app_id = undefined,
cluster_id = ReceivedClusterId
}}} = amqp_channel:call(Channel, #'basic.get'{queue = Dest, no_ack = true}),

?assertEqual(<<"payload">>, ReceivedPayload),
?assertEqual(1, ReceivedDeliveryMode),
?assertEqual(<<"x">>, ReceivedClusterId),
?assertEqual(4, ReceivedPriority),

?assertNotEqual(undefined, rabbit_misc:table_lookup(ReceivedHeaders, <<"x-shovelled">>)),

ok
end).


change_definition(Config) ->
Src = ?config(srcq, Config),
Dest = ?config(destq, Config),
Expand Down

0 comments on commit 78ec03a

Please sign in to comment.