diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 6e119b630a82..b1ed8425b28b 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -197,17 +197,18 @@ rabbitmq_app( "syntax_tools", "xmerl", "crypto", + "horus", ], license_files = [":license_files"], priv = [":priv"], deps = [ "//deps/amqp10_common:erlang_app", + "//deps/khepri:erlang_app", + "//deps/khepri_mnesia_migration:erlang_app", "//deps/rabbit_common:erlang_app", "//deps/rabbitmq_prelaunch:erlang_app", "@cuttlefish//:erlang_app", "@gen_batch_server//:erlang_app", - "@khepri//:erlang_app", - "@khepri_mnesia_migration//:erlang_app", "@observer_cli//:erlang_app", "@osiris//:erlang_app", "@ra//:erlang_app", diff --git a/deps/rabbit/app.bzl b/deps/rabbit/app.bzl index 0b5f06685fda..80cee9da1080 100644 --- a/deps/rabbit/app.bzl +++ b/deps/rabbit/app.bzl @@ -248,9 +248,9 @@ def all_beam_files(name = "all_beam_files"): erlc_opts = "//:erlc_opts", deps = [ "//deps/amqp10_common:erlang_app", + "//deps/khepri:erlang_app", + "//deps/khepri_mnesia_migration:erlang_app", "//deps/rabbit_common:erlang_app", - "@khepri//:erlang_app", - "@khepri_mnesia_migration//:erlang_app", "@ra//:erlang_app", "@ranch//:erlang_app", "@stdout_formatter//:erlang_app", @@ -507,9 +507,9 @@ def all_test_beam_files(name = "all_test_beam_files"): erlc_opts = "//:test_erlc_opts", deps = [ "//deps/amqp10_common:erlang_app", + "//deps/khepri:erlang_app", + "//deps/khepri_mnesia_migration:erlang_app", "//deps/rabbit_common:erlang_app", - "@khepri//:erlang_app", - "@khepri_mnesia_migration//:erlang_app", "@ra//:erlang_app", "@ranch//:erlang_app", "@stdout_formatter//:erlang_app", @@ -2012,7 +2012,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/metadata_store_phase1_SUITE.beam"], app_name = "rabbit", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/rabbit_common:erlang_app", "@khepri//:erlang_app"], + deps = ["//deps/khepri:erlang_app", "//deps/rabbit_common:erlang_app"], ) erlang_bytecode( name = "mc_unit_SUITE_beam_files", diff --git a/deps/rabbitmq_consistent_hash_exchange/BUILD.bazel b/deps/rabbitmq_consistent_hash_exchange/BUILD.bazel index 182b31c0656f..a0ab36fa6d9c 100644 --- a/deps/rabbitmq_consistent_hash_exchange/BUILD.bazel +++ b/deps/rabbitmq_consistent_hash_exchange/BUILD.bazel @@ -39,10 +39,10 @@ rabbitmq_app( license_files = [":license_files"], priv = [":priv"], deps = [ + "//deps/khepri:erlang_app", + "//deps/khepri_mnesia_migration:erlang_app", "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", - "@khepri//:erlang_app", - "@khepri_mnesia_migration//:erlang_app", ], ) diff --git a/deps/rabbitmq_consistent_hash_exchange/app.bzl b/deps/rabbitmq_consistent_hash_exchange/app.bzl index e6a43a75079f..16aa9bc1838c 100644 --- a/deps/rabbitmq_consistent_hash_exchange/app.bzl +++ b/deps/rabbitmq_consistent_hash_exchange/app.bzl @@ -19,11 +19,11 @@ def all_beam_files(name = "all_beam_files"): dest = "ebin", erlc_opts = "//:erlc_opts", deps = [ + "//deps/khepri:erlang_app", + "//deps/khepri_mnesia_migration:erlang_app", "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", "//deps/rabbitmq_cli:erlang_app", - "@khepri//:erlang_app", - "@khepri_mnesia_migration//:erlang_app", ], ) @@ -47,11 +47,11 @@ def all_test_beam_files(name = "all_test_beam_files"): dest = "test", erlc_opts = "//:test_erlc_opts", deps = [ + "//deps/khepri:erlang_app", + "//deps/khepri_mnesia_migration:erlang_app", "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", "//deps/rabbitmq_cli:erlang_app", - "@khepri//:erlang_app", - "@khepri_mnesia_migration//:erlang_app", ], ) diff --git a/deps/rabbitmq_ct_client_helpers/BUILD.bazel b/deps/rabbitmq_ct_client_helpers/BUILD.bazel index 8fa9dfa34f41..1141dd990501 100644 --- a/deps/rabbitmq_ct_client_helpers/BUILD.bazel +++ b/deps/rabbitmq_ct_client_helpers/BUILD.bazel @@ -33,6 +33,7 @@ rabbitmq_app( hdrs = [":public_hdrs"], app_name = "rabbitmq_ct_client_helpers", beam_files = [":beam_files"], + extra_apps = ["rabbit_common"], license_files = [":license_files"], priv = [":priv"], deps = [ diff --git a/deps/rabbitmq_ct_client_helpers/MODULE.bazel b/deps/rabbitmq_ct_client_helpers/MODULE.bazel new file mode 100644 index 000000000000..00bb18361f7f --- /dev/null +++ b/deps/rabbitmq_ct_client_helpers/MODULE.bazel @@ -0,0 +1,6 @@ +############################################################################### +# Bazel now uses Bzlmod by default to manage external dependencies. +# Please consider migrating your external dependencies from WORKSPACE to MODULE.bazel. +# +# For more details, please check https://github.com/bazelbuild/bazel/issues/18958 +############################################################################### diff --git a/deps/rabbitmq_ct_helpers/BUILD.bazel b/deps/rabbitmq_ct_helpers/BUILD.bazel index b5167a076972..4c90b5e887de 100644 --- a/deps/rabbitmq_ct_helpers/BUILD.bazel +++ b/deps/rabbitmq_ct_helpers/BUILD.bazel @@ -39,6 +39,12 @@ rabbitmq_app( hdrs = [":public_hdrs"], app_name = "rabbitmq_ct_helpers", beam_files = [":beam_files"], + extra_apps = [ + "common_test", + "eunit", + "inet_tcp_proxy", + "inets", + ], license_files = [":license_files"], priv = [":priv"], deps = [ diff --git a/deps/rabbitmq_jms_topic_exchange/BUILD.bazel b/deps/rabbitmq_jms_topic_exchange/BUILD.bazel index e3e49612b060..2366725c8b7c 100644 --- a/deps/rabbitmq_jms_topic_exchange/BUILD.bazel +++ b/deps/rabbitmq_jms_topic_exchange/BUILD.bazel @@ -44,10 +44,10 @@ rabbitmq_app( license_files = [":license_files"], priv = [":priv"], deps = [ + "//deps/khepri:erlang_app", + "//deps/khepri_mnesia_migration:erlang_app", "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", - "@khepri//:erlang_app", - "@khepri_mnesia_migration//:erlang_app", ], ) diff --git a/deps/rabbitmq_jms_topic_exchange/app.bzl b/deps/rabbitmq_jms_topic_exchange/app.bzl index 5c73214ef386..879a8434e170 100644 --- a/deps/rabbitmq_jms_topic_exchange/app.bzl +++ b/deps/rabbitmq_jms_topic_exchange/app.bzl @@ -19,10 +19,10 @@ def all_beam_files(name = "all_beam_files"): dest = "ebin", erlc_opts = "//:erlc_opts", deps = [ + "//deps/khepri:erlang_app", + "//deps/khepri_mnesia_migration:erlang_app", "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", - "@khepri//:erlang_app", - "@khepri_mnesia_migration//:erlang_app", ], ) @@ -46,10 +46,10 @@ def all_test_beam_files(name = "all_test_beam_files"): dest = "test", erlc_opts = "//:test_erlc_opts", deps = [ + "//deps/khepri:erlang_app", + "//deps/khepri_mnesia_migration:erlang_app", "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", - "@khepri//:erlang_app", - "@khepri_mnesia_migration//:erlang_app", ], ) diff --git a/deps/rabbitmq_mqtt/BUILD.bazel b/deps/rabbitmq_mqtt/BUILD.bazel index 49853b99a788..978f9f35db05 100644 --- a/deps/rabbitmq_mqtt/BUILD.bazel +++ b/deps/rabbitmq_mqtt/BUILD.bazel @@ -34,6 +34,7 @@ APP_ENV = """[ {retained_message_store, rabbit_mqtt_retained_msg_store_dets}, %% only used by DETS store {retained_message_store_dets_sync_interval, 2000}, + {retained_message_store_max_retained_messages_count, 2000}, {prefetch, 10}, {ssl_listeners, []}, {tcp_listeners, [1883]}, diff --git a/deps/rabbitmq_mqtt/Makefile b/deps/rabbitmq_mqtt/Makefile index feb46e65b5c1..3605333cb5d1 100644 --- a/deps/rabbitmq_mqtt/Makefile +++ b/deps/rabbitmq_mqtt/Makefile @@ -12,6 +12,7 @@ define PROJECT_ENV {retained_message_store, rabbit_mqtt_retained_msg_store_dets}, %% only used by DETS store {retained_message_store_dets_sync_interval, 2000}, + {retained_message_store_max_retained_messages_count, 2000}, {prefetch, 10}, {ssl_listeners, []}, {tcp_listeners, [1883]}, @@ -94,7 +95,7 @@ define ct_master.erl halt(0) endef -PARALLEL_CT_SET_1_A = auth retainer +PARALLEL_CT_SET_1_A = auth rabbit_mqtt_retained_msg_store retainer PARALLEL_CT_SET_1_B = cluster command config config_schema mc_mqtt packet_prop \ processor protocol_interop proxy_protocol rabbit_mqtt_confirms reader util PARALLEL_CT_SET_1_C = java v5 diff --git a/deps/rabbitmq_mqtt/app.bzl b/deps/rabbitmq_mqtt/app.bzl index 86830f4f9c7a..925cf98c2b44 100644 --- a/deps/rabbitmq_mqtt/app.bzl +++ b/deps/rabbitmq_mqtt/app.bzl @@ -329,3 +329,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"): erlc_opts = "//:test_erlc_opts", deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_ct_helpers:erlang_app"], ) + erlang_bytecode( + name = "rabbit_mqtt_retained_msg_store_SUITE_beam_files", + testonly = True, + srcs = ["test/rabbit_mqtt_retained_msg_store_SUITE.erl"], + outs = ["test/rabbit_mqtt_retained_msg_store_SUITE.beam"], + hdrs = ["include/rabbit_mqtt_packet.hrl"], + app_name = "rabbitmq_mqtt", + erlc_opts = "//:test_erlc_opts", + ) diff --git a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema index b69e2b06075c..b28608be1068 100644 --- a/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema +++ b/deps/rabbitmq_mqtt/priv/schema/rabbitmq_mqtt.schema @@ -87,6 +87,10 @@ end}. {mapping, "mqtt.retained_message_store_dets_sync_interval", "rabbitmq_mqtt.retained_message_store_dets_sync_interval", [{datatype, integer}]}. +%% Limit how many messages are returned by MQTT plugin retained messages store +{mapping, "mqtt.retained_message_store_max_retained_messages_count", "rabbitmq_mqtt.retained_message_store_max_retained_messages_count", + [{datatype, integer}]}. + %% Whether or not to enable proxy protocol support. %% %% {proxy_protocol, false} diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 97e5edf83101..267fca63330c 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -956,44 +956,52 @@ send_retained_messages(Subscriptions, State) -> -spec send_retained_message(topic_filter(), qos(), state()) -> state(). send_retained_message(TopicFilter0, SubscribeQos, - State0 = #state{packet_id = PacketId0, - cfg = #cfg{retainer_pid = RPid}}) -> + State0 = #state{cfg = #cfg{retainer_pid = RPid}}) -> TopicFilter = amqp_to_mqtt(TopicFilter0), case rabbit_mqtt_retainer:fetch(RPid, TopicFilter) of undefined -> State0; - #mqtt_msg{qos = MsgQos, - retain = Retain, - payload = Payload, - props = Props0} -> - Qos = effective_qos(MsgQos, SubscribeQos), - %% Wildcards are currently not supported when fetching retained - %% messages. Therefore, TopicFilter must must be a topic name. - {Topic, Props, State1} = process_topic_alias_outbound(TopicFilter, Props0, State0), - {PacketId, State} = case Qos of - ?QOS_0 -> - {undefined, State1}; - ?QOS_1 -> - {PacketId0, - State1#state{packet_id = increment_packet_id(PacketId0)}} - end, - Packet = #mqtt_packet{ - fixed = #mqtt_packet_fixed{ - type = ?PUBLISH, - qos = Qos, - dup = false, - retain = Retain - }, - variable = #mqtt_packet_publish{ - packet_id = PacketId, - topic_name = Topic, - props = Props - }, - payload = Payload}, - _ = send(Packet, State), - State + Msgs when is_list(Msgs) -> + lists:foldl( + fun(Msg, S) -> + send_retained_message_to_client(Msg, TopicFilter, SubscribeQos, S) + end, State0, Msgs); + #mqtt_msg{} = SingleMsg -> + send_retained_message_to_client(SingleMsg, TopicFilter, SubscribeQos, State0) end. +send_retained_message_to_client(#mqtt_msg{qos = MsgQos, + retain = Retain, + payload = Payload, + props = Props0}, + TopicFilter, + SubscribeQos, + State0 = #state{packet_id = PacketId0}) -> + Qos = effective_qos(MsgQos, SubscribeQos), + {Topic, Props, State1} = process_topic_alias_outbound(TopicFilter, Props0, State0), + {PacketId, State} = case Qos of + ?QOS_0 -> + {undefined, State1}; + ?QOS_1 -> + {PacketId0, + State1#state{packet_id = increment_packet_id(PacketId0)}} + end, + Packet = #mqtt_packet{ + fixed = #mqtt_packet_fixed{ + type = ?PUBLISH, + qos = Qos, + dup = false, + retain = Retain + }, + variable = #mqtt_packet_publish{ + packet_id = PacketId, + topic_name = Topic, + props = Props + }, + payload = Payload}, + _ = send(Packet, State), + State. + clear_will_msg(#state{cfg = #cfg{vhost = Vhost, client_id = ClientId}} = State) -> QNameBin = rabbit_mqtt_util:queue_name_bin(ClientId, will), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl index db080b1a736b..b932d1cb23e1 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl @@ -20,103 +20,97 @@ -include("rabbit_mqtt.hrl"). -include("rabbit_mqtt_packet.hrl"). + -include_lib("kernel/include/logger.hrl"). + -export([start/1, insert/3, lookup/2, delete/2, terminate/1]). --export([expire/2]). +-export([expire/2, get_max_retained_messages_count/0]). + -export_type([state/0, expire/0]). -define(STATE, ?MODULE). --record(?STATE, {store_mod :: module(), - store_state :: term()}). --opaque state() :: #?STATE{}. - --type expire() :: #{topic() := - {InsertionTimestamp :: integer(), - MessageExpiryInterval :: pos_integer()}}. --callback new(Directory :: file:name_all(), rabbit_types:vhost()) -> - State :: any(). +-record(?STATE, {store_mod :: module(), store_state :: term()}). --callback recover(Directory :: file:name_all(), rabbit_types:vhost()) -> - {ok, State :: any(), expire()} | - {error, uninitialized}. - --callback insert(topic(), mqtt_msg(), State :: any()) -> - ok. - --callback lookup(topic(), State :: any()) -> - mqtt_msg() | mqtt_msg_v0() | undefined. +-opaque state() :: #?STATE{}. --callback delete(topic(), State :: any()) -> - ok. +-type expire() :: + #{topic() := {InsertionTimestamp :: integer(), MessageExpiryInterval :: pos_integer()}}. --callback terminate(State :: any()) -> - ok. +-callback new(Directory :: file:name_all(), rabbit_types:vhost()) -> State :: any(). +-callback recover(Directory :: file:name_all(), rabbit_types:vhost()) -> + {ok, State :: any(), expire()} | {error, uninitialized}. +-callback insert(topic(), mqtt_msg(), State :: any()) -> ok. +-callback lookup(topic(), State :: any()) -> [mqtt_msg()] | [mqtt_msg_v0()] | []. +-callback delete(topic(), State :: any()) -> ok. +-callback terminate(State :: any()) -> ok. -spec start(rabbit_types:vhost()) -> {state(), expire()}. start(VHost) -> {ok, Mod} = application:get_env(?APP_NAME, retained_message_store), Dir = rabbit:data_dir(), - ?LOG_INFO("Starting MQTT retained message store ~s for vhost '~ts'", - [Mod, VHost]), - {S, Expire} = case Mod:recover(Dir, VHost) of - {ok, StoreState, Expire0} -> - ?LOG_INFO("Recovered MQTT retained message store ~s for vhost '~ts'", - [Mod, VHost]), - {StoreState, Expire0}; - {error, uninitialized} -> - StoreState = Mod:new(Dir, VHost), - ?LOG_INFO("Initialized MQTT retained message store ~s for vhost '~ts'", - [Mod, VHost]), - {StoreState, #{}} - end, - {#?STATE{store_mod = Mod, - store_state = S}, Expire}. + ?LOG_INFO("Starting MQTT retained message store ~s for vhost '~ts'", [Mod, VHost]), + {S, Expire} = + case Mod:recover(Dir, VHost) of + {ok, StoreState, Expire0} -> + ?LOG_INFO("Recovered MQTT retained message store ~s for vhost '~ts'", [Mod, VHost]), + {StoreState, Expire0}; + {error, uninitialized} -> + StoreState = Mod:new(Dir, VHost), + ?LOG_INFO("Initialized MQTT retained message store ~s for vhost '~ts'", + [Mod, VHost]), + {StoreState, #{}} + end, + {#?STATE{store_mod = Mod, store_state = S}, Expire}. -spec insert(topic(), mqtt_msg(), state()) -> ok. -insert(Topic, Msg, #?STATE{store_mod = Mod, - store_state = StoreState}) -> +insert(Topic, Msg, #?STATE{store_mod = Mod, store_state = StoreState}) -> ok = Mod:insert(Topic, Msg, StoreState). --spec lookup(topic(), state()) -> - mqtt_msg() | undefined. -lookup(Topic, #?STATE{store_mod = Mod, - store_state = StoreState}) -> +-spec lookup(topic(), state()) -> [mqtt_msg()] | []. +lookup(Topic, #?STATE{store_mod = Mod, store_state = StoreState}) -> case Mod:lookup(Topic, StoreState) of - OldMsg when is_record(OldMsg, mqtt_msg, 7) -> - convert_mqtt_msg(OldMsg); - Other -> - Other + % Handle list of messages - convert any old format ones + Messages when is_list(Messages) -> + lists:map(fun (Msg) when is_record(Msg, mqtt_msg, 7) -> + convert_mqtt_msg(Msg); + (Msg) -> + Msg + end, + Messages); + undefined -> + []; + [] -> + [] end. -spec delete(topic(), state()) -> ok. -delete(Topic, #?STATE{store_mod = Mod, - store_state = StoreState}) -> +delete(Topic, #?STATE{store_mod = Mod, store_state = StoreState}) -> ok = Mod:delete(Topic, StoreState). -spec terminate(state()) -> ok. -terminate(#?STATE{store_mod = Mod, - store_state = StoreState}) -> +terminate(#?STATE{store_mod = Mod, store_state = StoreState}) -> ok = Mod:terminate(StoreState). -spec expire(ets | dets, ets:tid() | dets:tab_name()) -> expire(). expire(Mod, Tab) -> Now = os:system_time(second), - Mod:foldl( - fun(#retained_message{topic = Topic, - mqtt_msg = #mqtt_msg{props = #{'Message-Expiry-Interval' := Expiry}, - timestamp = Timestamp}}, Acc) - when is_integer(Expiry) andalso - is_integer(Timestamp) -> - if Now - Timestamp >= Expiry -> - Mod:delete(Tab, Topic), - Acc; - true -> - maps:put(Topic, {Timestamp, Expiry}, Acc) - end; - (_, Acc) -> - Acc - end, #{}, Tab). + ExpireMsg = + fun ({NodeId, + Topic, + #mqtt_msg{props = #{'Message-Expiry-Interval' := Expiry}, timestamp = Timestamp}}, + Acc) + when is_integer(Expiry) andalso is_integer(Timestamp) -> + if Now - Timestamp >= Expiry -> + Mod:delete(Tab, NodeId), + Acc; + true -> + maps:put(Topic, {Timestamp, Expiry}, Acc) + end; + (_, Acc) -> + Acc + end, + Mod:foldl(ExpireMsg, #{}, Tab). %% Retained messages written in 3.12 (or earlier) are converted when read in 3.13 (or later). -spec convert_mqtt_msg(mqtt_msg_v0()) -> mqtt_msg(). @@ -128,3 +122,9 @@ convert_mqtt_msg({mqtt_msg, Retain, Qos, Topic, Dup, PacketId, Payload}) -> packet_id = PacketId, payload = Payload, props = #{}}. + +-spec get_max_retained_messages_count() -> pos_integer(). +get_max_retained_messages_count() -> + rabbit_misc:get_env(rabbit_mqtt, + retained_message_store_max_retained_messages_count, + 2000). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl index 2941d8cf95a8..9f4b69ffa7be 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl @@ -10,60 +10,288 @@ -behaviour(rabbit_mqtt_retained_msg_store). -include("rabbit_mqtt_packet.hrl"). + -include_lib("kernel/include/logger.hrl"). -export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]). --record(store_state, {table :: dets:tab_name()}). +-record(store_state, + {node_table :: dets:tab_name(), % Stores {node_id, edge_count, is_topic} + edge_table :: dets:tab_name(), % Stores {{from_id, word}, to_id} + msg_table :: dets:tab_name(), % Stores {node_id, topic, mqtt_msg} + root_id :: binary(), % Root node ID + dir :: file:filename_all(), + vhost :: rabbit_types:vhost(), + max_retained_messages_count :: pos_integer()}). -type store_state() :: #store_state{}. -spec new(file:name_all(), rabbit_types:vhost()) -> store_state(). new(Dir, VHost) -> - {ok, TabName} = open_table(Dir, VHost), - #store_state{table = TabName}. + {ok, NodeTable} = open_table(Dir, VHost, <<"nodes">>, set), + {ok, EdgeTable} = open_table(Dir, VHost, <<"edges">>, set), + {ok, MsgTable} = open_table(Dir, VHost, <<"msgs">>, set), + {ok, RootId} = find_or_insert_root_node(NodeTable, EdgeTable), + MaxRetainedMessagesCount = + rabbit_mqtt_retained_msg_store:get_max_retained_messages_count(), + #store_state{node_table = NodeTable, + edge_table = EdgeTable, + msg_table = MsgTable, + root_id = RootId, + dir = Dir, + vhost = VHost, + max_retained_messages_count = MaxRetainedMessagesCount}. -spec recover(file:name_all(), rabbit_types:vhost()) -> - {ok, store_state(), rabbit_mqtt_retained_msg_store:expire()} | - {error, uninitialized}. + {ok, store_state(), rabbit_mqtt_retained_msg_store:expire()} | + {error, uninitialized}. recover(Dir, VHost) -> - case open_table(Dir, VHost) of - {ok, TabName} -> - {ok, - #store_state{table = TabName}, - rabbit_mqtt_retained_msg_store:expire(dets, TabName)}; - {error, Reason} -> - ?LOG_ERROR("~s failed to open table: ~p", [?MODULE, Reason]), - {error, uninitialized} - end. + try + {ok, MsgTable} = open_table(Dir, VHost, <<"msgs">>, set), + Expire = rabbit_mqtt_retained_msg_store:expire(dets, MsgTable), + {ok, NodeTable} = open_table(Dir, VHost, <<"nodes">>, set), + {ok, EdgeTable} = open_table(Dir, VHost, <<"edges">>, set), + {ok, RootId} = find_or_insert_root_node(NodeTable, EdgeTable), + MaxRetainedMessagesCount = + rabbit_mqtt_retained_msg_store:get_max_retained_messages_count(), + State = + #store_state{node_table = NodeTable, + edge_table = EdgeTable, + msg_table = MsgTable, + root_id = RootId, + dir = Dir, + vhost = VHost, + max_retained_messages_count = MaxRetainedMessagesCount}, + {ok, State, Expire} + catch + error:Reason -> + ?LOG_ERROR("Failed to recover MQTT retained message store: ~p", [Reason]), + {error, uninitialized} + end. -spec insert(topic(), mqtt_msg(), store_state()) -> ok. -insert(Topic, Msg, #store_state{table = T}) -> - ok = dets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}). - --spec lookup(topic(), store_state()) -> - mqtt_msg() | mqtt_msg_v0() | undefined. -lookup(Topic, #store_state{table = T}) -> - case dets:lookup(T, Topic) of - [] -> undefined; - [#retained_message{mqtt_msg = Msg}] -> Msg - end. +insert(Topic, Msg, #store_state{} = State) -> + Words = split_topic(Topic), + NodeId = follow_or_create_path(Words, State), + % Mark node as topic end and store message + update_node(NodeId, true, State), + dets:insert(State#store_state.msg_table, {NodeId, Topic, Msg}), + ok. + +-spec lookup(topic(), store_state()) -> [mqtt_msg()] | [mqtt_msg_v0()] | []. +lookup(Topic, + #store_state{root_id = RootId, + msg_table = MsgTable, + max_retained_messages_count = Limit} = + State) -> + Words = split_topic(Topic), + Matches = lists:sublist(match_pattern_words(Words, RootId, State, []), Limit), + Values = + lists:flatmap(fun(NodeId) -> + case dets:lookup(MsgTable, NodeId) of + [] -> []; + [{_NodeId, _Topic, Value} | _] -> [Value]; + {error, _Reason} -> + ?LOG_ERROR("Failed to lookup MQTT retained message for node ~p", [NodeId]), + [] + end + end, + Matches), + Values. -spec delete(topic(), store_state()) -> ok. -delete(Topic, #store_state{table = T}) -> - ok = dets:delete(T, Topic). +delete(Topic, State) -> + Words = split_topic(Topic), + case follow_path(Words, State) of + {ok, NodeId} -> + dets:match_delete(State#store_state.msg_table, {NodeId, Topic, '_'}), + case dets:lookup(State#store_state.msg_table, NodeId) of + [] -> + update_node(NodeId, false, State), + maybe_clean_path(NodeId, State); + _ -> + ok + end; + error -> + ok + end, + ok. -spec terminate(store_state()) -> ok. -terminate(#store_state{table = T}) -> - ok = dets:close(T). - -open_table(Dir, VHost) -> - Tab = rabbit_mqtt_util:vhost_name_to_table_name(VHost), - Path = rabbit_mqtt_util:path_for(Dir, VHost, ".dets"), - AutoSave = rabbit_misc:get_env(rabbit_mqtt, retained_message_store_dets_sync_interval, 2000), - dets:open_file(Tab, [{type, set}, - {keypos, #retained_message.topic}, - {file, Path}, - {ram_file, true}, - {repair, true}, - {auto_save, AutoSave}]). +terminate(#store_state{node_table = NodeTable, + edge_table = EdgeTable, + msg_table = MsgTable}) -> + ok = dets:close(NodeTable), + ok = dets:close(EdgeTable), + ok = dets:close(MsgTable), + ok. + +%% Internal functions + +split_topic(Topic) -> + binary:split(Topic, <<"/">>, [global]). + +make_node_id() -> + crypto:strong_rand_bytes(16). + +get_table_name(VHost, Type) -> + TableName = rabbit_mqtt_util:vhost_name_to_table_name(VHost), + Suffix = erlang:iolist_to_binary([<<"_">>, Type]), + erlang:list_to_atom(erlang:atom_to_list(TableName) ++ erlang:binary_to_list(Suffix)). + +get_table_path(Dir, VHost, Type) -> + % rabbit_mqtt_util:path_for(Dir, VHost, Type ++ ".dets"). + % Suffix = erlang:iolist_to_binary([VHost, <<"_">>, Type]), + rabbit_mqtt_util:path_for(Dir, erlang:iolist_to_binary([VHost, Type]), ".dets"). + +find_root_node(NodeTable, EdgeTable) -> + NodeIds = dets:match(NodeTable, {'$1', '_', '_'}), + DestNodeIds = dets:match(EdgeTable, {'_', '$1'}), + case lists:flatten(NodeIds) -- lists:flatten(DestNodeIds) of + [RootId] -> + {ok, RootId}; + [] -> + error; + _ -> + error % Multiple root nodes would indicate corruption + end. + +find_or_insert_root_node(NodeTable, EdgeTable) -> + case find_root_node(NodeTable, EdgeTable) of + {ok, Id} -> + {ok, Id}; + error -> + NewId = make_node_id(), + ok = dets:insert(NodeTable, {NewId, 0, false}), + {ok, NewId} + end. + +follow_or_create_path(Words, State) -> + follow_or_create_path(Words, State#store_state.root_id, State). + +follow_or_create_path([], NodeId, _State) -> + NodeId; +follow_or_create_path([Word | Rest], NodeId, State) -> + case find_edge(NodeId, Word, State) of + {ok, ChildId} -> + follow_or_create_path(Rest, ChildId, State); + error -> + ChildId = make_node_id(), + add_edge(NodeId, Word, ChildId, State), + follow_or_create_path(Rest, ChildId, State) + end. + +follow_path(Words, State) -> + follow_path(Words, State#store_state.root_id, State). + +follow_path([], NodeId, _State) -> + {ok, NodeId}; +follow_path([Word | Rest], NodeId, State) -> + case find_edge(NodeId, Word, State) of + {ok, ChildId} -> + follow_path(Rest, ChildId, State); + error -> + error + end. + +match_pattern_words([], NodeId, _State, Acc) -> + [NodeId | Acc]; +match_pattern_words([<<"+">> | RestWords], NodeId, State, Acc) -> + % + matches any single word + Edges = get_all_edges(NodeId, State), + lists:foldl(fun({_Key, ChildId}, EdgeAcc) -> + match_pattern_words(RestWords, ChildId, State, EdgeAcc) + end, + Acc, + Edges); +match_pattern_words([<<"#">> | _], NodeId, State, Acc) -> + % # matches zero or more words + collect_descendants(NodeId, State, [NodeId | Acc]); +match_pattern_words([Word | RestWords], NodeId, State, Acc) -> + case find_edge(NodeId, Word, State) of + {ok, ChildId} -> + match_pattern_words(RestWords, ChildId, State, Acc); + error -> + Acc + end. + +collect_descendants(NodeId, State, Acc) -> + Edges = get_all_edges(NodeId, State), + lists:foldl(fun({_Key, ChildId}, EdgeAcc) -> + collect_descendants(ChildId, State, [ChildId | EdgeAcc]) + end, + Acc, + Edges). + +find_edge(NodeId, Word, State) -> + Key = {NodeId, Word}, + case dets:lookup(State#store_state.edge_table, Key) of + [{_Key, ToNode}] -> + {ok, ToNode}; + [] -> + error + end. + +get_all_edges(NodeId, State) -> + Pattern = {{NodeId, '_'}, '_'}, + dets:match_object(State#store_state.edge_table, Pattern). + +add_edge(FromId, Word, ToId, State) -> + Key = {FromId, Word}, + EdgeEntry = {Key, ToId}, + ok = dets:insert(State#store_state.edge_table, EdgeEntry), + NodeEntry = {ToId, 0, false}, + ok = dets:insert(State#store_state.node_table, NodeEntry), + update_edge_count(FromId, +1, State). + +update_edge_count(NodeId, Delta, State) -> + case dets:lookup(State#store_state.node_table, NodeId) of + [{NodeId, EdgeCount, IsTopic}] -> + NewCount = EdgeCount + Delta, + ok = dets:insert(State#store_state.node_table, {NodeId, NewCount, IsTopic}); + [] -> + error + end. + +update_node(NodeId, IsTopic, State) -> + case dets:lookup(State#store_state.node_table, NodeId) of + [{NodeId, EdgeCount, _OldIsTopic}] -> + ok = dets:insert(State#store_state.node_table, {NodeId, EdgeCount, IsTopic}); + [] -> + error + end. + +maybe_clean_path(NodeId, State) -> + case dets:lookup(State#store_state.node_table, NodeId) of + [{NodeId, 0, false}] -> + Pattern = {'_', '_'}, + Edges = dets:match_object(State#store_state.edge_table, {Pattern, NodeId}), + case Edges of + [{{ParentId, Word}, NodeId}] -> + remove_edge(ParentId, Word, State), + ok = dets:delete(State#store_state.node_table, NodeId), + maybe_clean_path(ParentId, State); + [] -> + ok + end; + _ -> + ok + end. + +remove_edge(FromId, Word, State) -> + ok = dets:delete(State#store_state.edge_table, {FromId, Word}), + update_edge_count(FromId, -1, State). + +%% Setup functions + +open_table(Dir, VHost, Type, TableType) -> + Tab = get_table_name(VHost, Type), + Path = get_table_path(Dir, VHost, Type), + AutoSave = + rabbit_misc:get_env(rabbit_mqtt, retained_message_store_dets_sync_interval, 2000), + dets:open_file(Tab, + [{type, TableType}, + {file, Path}, + {ram_file, true}, + {repair, true}, + {auto_save, AutoSave}]). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl index 927939c45a3f..4fa8c2ffdd21 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl @@ -11,55 +11,321 @@ -include("rabbit_mqtt_packet.hrl"). +-include_lib("kernel/include/logger.hrl"). + -export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]). --record(store_state, { - table :: ets:tid(), - filename :: file:filename_all() - }). +-record(store_state, + {node_table :: ets:tid(), % Stores {node_id, edge_count, is_topic} + edge_table :: ets:tid(), % Stores {{from_id, word}, to_id} + msg_table :: ets:tid(), % Stores {node_id, topic, mqtt_msg} + root_id :: binary(), % Root node ID + dir :: file:filename_all(), + vhost :: rabbit_types:vhost(), + max_retained_messages_count :: pos_integer()}). -type store_state() :: #store_state{}. -spec new(file:name_all(), rabbit_types:vhost()) -> store_state(). new(Dir, VHost) -> - Path = rabbit_mqtt_util:path_for(Dir, VHost), - TableName = rabbit_mqtt_util:vhost_name_to_table_name(VHost), - _ = file:delete(Path), - Tid = ets:new(TableName, [set, public, {keypos, #retained_message.topic}]), - #store_state{table = Tid, filename = Path}. + delete_table_files(Dir, VHost), + + % Node table - will store tuples of {node_id, edge_count, is_topic} + NodeTable = ets:new(get_table_name(VHost, <<"nodes">>), [set, public]), + % Edge table - will store {{from_id, word}, to_id} + EdgeTable = ets:new(get_table_name(VHost, <<"edges">>), [ordered_set, public]), + % Topic table - will store {node_id, topic, value} + MsgTable = ets:new(get_table_name(VHost, <<"msgs">>), [set, public]), + + RootId = make_node_id(), + ets:insert(NodeTable, {RootId, 0, false}), + + MaxRetainedMessagesCount = + rabbit_mqtt_retained_msg_store:get_max_retained_messages_count(), + #store_state{node_table = NodeTable, + edge_table = EdgeTable, + msg_table = MsgTable, + root_id = RootId, + dir = Dir, + vhost = VHost, + max_retained_messages_count = MaxRetainedMessagesCount}. -spec recover(file:name_all(), rabbit_types:vhost()) -> - {ok, store_state(), rabbit_mqtt_retained_msg_store:expire()} | - {error, uninitialized}. + {ok, store_state(), rabbit_mqtt_retained_msg_store:expire()} | + {error, uninitialized}. recover(Dir, VHost) -> - Path = rabbit_mqtt_util:path_for(Dir, VHost), - case ets:file2tab(Path) of - {ok, Tid} -> - _ = file:delete(Path), - {ok, - #store_state{table = Tid, filename = Path}, - rabbit_mqtt_retained_msg_store:expire(ets, Tid)}; - {error, _} -> - {error, uninitialized} - end. + io:format("Recovering MQTT retained message store from ~s~n", [Dir]), + try + {ok, MsgTable} = recover_table(Dir, VHost, <<"msgs">>), + Expire = rabbit_mqtt_retained_msg_store:expire(ets, MsgTable), + {ok, NodeTable} = recover_table(Dir, VHost, <<"nodes">>), + {ok, EdgeTable} = recover_table(Dir, VHost, <<"edges">>), + + RootId = + case find_root_node(NodeTable, EdgeTable) of + {ok, Id} -> + io:format("Recovered existing RootId: ~p~n", [Id]), + Id; + error -> + NewId = make_node_id(), + io:format("Creating new RootId: ~p~n", [NewId]), + ets:insert(NodeTable, {NewId, 0, false}), + NewId + end, + MaxRetainedMessagesCount = + rabbit_mqtt_retained_msg_store:get_max_retained_messages_count(), + State = + #store_state{node_table = NodeTable, + edge_table = EdgeTable, + msg_table = MsgTable, + root_id = RootId, + dir = Dir, + vhost = VHost, + max_retained_messages_count = MaxRetainedMessagesCount}, + {ok, State, Expire} + catch + error:Reason -> + ?LOG_ERROR("~s failed to recover MQTT retained message store: ~p", [?MODULE, Reason]), + {error, uninitialized} + end. + +-spec terminate(store_state()) -> ok. +terminate(#store_state{node_table = NodeTable, + edge_table = EdgeTable, + msg_table = MsgTable, + dir = Dir, + vhost = VHost}) -> + ok = + ets:tab2file(NodeTable, + get_table_path(Dir, VHost, <<"nodes">>), + [{extended_info, [object_count]}]), + ok = + ets:tab2file(EdgeTable, + get_table_path(Dir, VHost, <<"edges">>), + [{extended_info, [object_count]}]), + ok = + ets:tab2file(MsgTable, + get_table_path(Dir, VHost, <<"msgs">>), + [{extended_info, [object_count]}]), + ok. -spec insert(topic(), mqtt_msg(), store_state()) -> ok. -insert(Topic, Msg, #store_state{table = T}) -> - true = ets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}), +insert(Topic, Msg, #store_state{} = State) -> + Words = split_topic(Topic), + NodeId = follow_or_create_path(Words, State), + % Mark node as topic end and store message + update_node(NodeId, true, State), + ets:insert(State#store_state.msg_table, {NodeId, Topic, Msg}), ok. --spec lookup(topic(), store_state()) -> mqtt_msg() | mqtt_msg_v0() | undefined. -lookup(Topic, #store_state{table = T}) -> - case ets:lookup(T, Topic) of - [] -> undefined; - [#retained_message{mqtt_msg = Msg}] -> Msg - end. +-spec lookup(topic(), store_state()) -> [mqtt_msg()] | [mqtt_msg_v0()] | []. +lookup(Topic, + #store_state{max_retained_messages_count = Limit, + msg_table = MsgTable, + root_id = RootId} = + State) -> + Words = split_topic(Topic), + % limiting the length of the list of matches to avoid performance issues + % it is simpler and in most cases more efficient to use sublist after the fact than to try to limit the number of matches in the match_pattern_words function + Matches = lists:sublist(match_pattern_words(Words, RootId, State, []), Limit), + lists:flatmap(fun(NodeId) -> + case ets:lookup(MsgTable, NodeId) of + [] -> []; + [{_NodeId, _Topic, Value} | _] -> [Value]; + {error, _Reason} -> + ?LOG_ERROR("Failed to lookup MQTT retained message for node ~p", [NodeId]), + [] + end + end, + Matches). -spec delete(topic(), store_state()) -> ok. -delete(Topic, #store_state{table = T}) -> - true = ets:delete(T, Topic), +delete(Topic, State) -> + Words = split_topic(Topic), + case follow_path(Words, State) of + {ok, NodeId} -> + ets:match_delete(State#store_state.msg_table, {NodeId, Topic, '_'}), + % If no more messages at this node, mark as non-topic + case ets:lookup(State#store_state.msg_table, NodeId) of + [] -> + update_node(NodeId, false, State), + % Clean up unused path + maybe_clean_path(NodeId, State); + _ -> + ok + end; + error -> + ok + end, ok. --spec terminate(store_state()) -> ok. -terminate(#store_state{table = T, filename = Path}) -> - ok = ets:tab2file(T, Path, [{extended_info, [object_count]}]). +%% Internal setup/teardown functions +-spec get_table_name(rabbit_types:vhost(), binary()) -> atom(). +get_table_name(VHost, Type) -> + TableName = rabbit_mqtt_util:vhost_name_to_table_name(VHost), + Suffix = erlang:iolist_to_binary([<<"_">>, Type]), + erlang:list_to_atom(erlang:atom_to_list(TableName) ++ erlang:binary_to_list(Suffix)). + +-spec get_table_path(file:name_all(), rabbit_types:vhost(), binary()) -> file:name_all(). +get_table_path(Dir, VHost, Type) -> + rabbit_mqtt_util:path_for(Dir, erlang:iolist_to_binary([VHost, Type]), ".ets"). + +-spec delete_table_files(file:name_all(), rabbit_types:vhost()) -> ok. +delete_table_files(Dir, VHost) -> + Types = ["nodes", "edges", "msgs"], + lists:foreach(fun(Type) -> delete_table(Dir, VHost, Type) end, Types), + ok. + +-spec delete_table(file:name_all(), rabbit_types:vhost(), binary()) -> ok. +delete_table(Dir, VHost, Type) -> + Path = get_table_path(Dir, VHost, Type), + file:delete(Path). + +-spec recover_table(file:name_all(), rabbit_types:vhost(), binary()) -> {ok, ets:tid()}. +recover_table(Dir, VHost, Type) -> + Path = get_table_path(Dir, VHost, Type), + case ets:file2tab(Path) of + {ok, Tid} -> + _ = file:delete(Path), + {ok, Tid} + end. + +% Internal trie methods +split_topic(Topic) -> + binary:split(Topic, <<"/">>, [global]). + +% This might not be the most efficient way to find the root node, but the following options: +% Store root ID separately need additional storage/persistence and could get out of sync +% First node in table, requires ordered_set which could bring performance hit during lookup +find_root_node(NodeTable, EdgeTable) -> + NodeIds = ets:match(NodeTable, {'$1', '_', '_'}), + DestNodeIds = ets:match(EdgeTable, {'_', '$1'}), + % Find the node that doesn't appear as a destination in any edge + case lists:flatten(NodeIds) -- lists:flatten(DestNodeIds) of + [RootId] -> + {ok, RootId}; + [] -> + error; + _ -> + error % Multiple root nodes would indicate corruption + end. + +follow_or_create_path(Words, State) -> + follow_or_create_path(Words, State#store_state.root_id, State). + +follow_or_create_path([], NodeId, _State) -> + NodeId; +follow_or_create_path([Word | Rest], NodeId, State) -> + case find_edge(NodeId, Word, State) of + {ok, ChildId} -> + follow_or_create_path(Rest, ChildId, State); + error -> + ChildId = make_node_id(), + add_edge(NodeId, Word, ChildId, State), + follow_or_create_path(Rest, ChildId, State) + end. + +follow_path(Words, State) -> + follow_path(Words, State#store_state.root_id, State). + +follow_path([], NodeId, _State) -> + {ok, NodeId}; +follow_path([Word | Rest], NodeId, State) -> + case find_edge(NodeId, Word, State) of + {ok, ChildId} -> + follow_path(Rest, ChildId, State); + error -> + error + end. + +match_pattern_words([], NodeId, _State, Acc) -> + [NodeId | Acc]; +match_pattern_words([<<"+">> | RestWords], NodeId, State, Acc) -> + % + matches any single word + Edges = get_all_edges(NodeId, State), + lists:foldl(fun({_Key, ChildId}, EdgeAcc) -> + match_pattern_words(RestWords, ChildId, State, EdgeAcc) + end, + Acc, + Edges); +match_pattern_words([<<"#">> | _], NodeId, State, Acc) -> + % # matches zero or more words + collect_descendants(NodeId, State, [NodeId | Acc]); +match_pattern_words([Word | RestWords], NodeId, State, Acc) -> + case find_edge(NodeId, Word, State) of + {ok, ChildId} -> + match_pattern_words(RestWords, ChildId, State, Acc); + error -> + Acc + end. + +collect_descendants(NodeId, State, Acc) -> + Edges = get_all_edges(NodeId, State), + lists:foldl(fun({_Key, ChildId}, EdgeAcc) -> + collect_descendants(ChildId, State, [ChildId | EdgeAcc]) + end, + Acc, + Edges). + +find_edge(NodeId, Word, State) -> + Key = {NodeId, Word}, + case ets:lookup(State#store_state.edge_table, Key) of + [{_Key, ToNode}] -> + {ok, ToNode}; + [] -> + error + end. + +get_all_edges(NodeId, State) -> + % Match all edges from this node + Pattern = {{NodeId, '_'}, '_'}, + ets:match_object(State#store_state.edge_table, Pattern). + +make_node_id() -> + crypto:strong_rand_bytes(16). + +add_edge(FromId, Word, ToId, State) -> + Key = {FromId, Word}, + EdgeEntry = {Key, ToId}, + ets:insert(State#store_state.edge_table, EdgeEntry), + NodeEntry = {ToId, 0, false}, + ets:insert(State#store_state.node_table, NodeEntry), + update_edge_count(FromId, +1, State). + +update_edge_count(NodeId, Delta, State) -> + case ets:lookup(State#store_state.node_table, NodeId) of + [{NodeId, EdgeCount, IsTopic}] -> + NewCount = EdgeCount + Delta, + ets:insert(State#store_state.node_table, {NodeId, NewCount, IsTopic}); + [] -> + error + end. + +update_node(NodeId, IsTopic, State) -> + case ets:lookup(State#store_state.node_table, NodeId) of + [{NodeId, EdgeCount, _OldIsTopic}] -> + ets:insert(State#store_state.node_table, {NodeId, EdgeCount, IsTopic}); + [] -> + error + end. + +maybe_clean_path(NodeId, State) -> + case ets:lookup(State#store_state.node_table, NodeId) of + [{NodeId, 0, false}] -> + Pattern = {'_', '_'}, + Edges = ets:match_object(State#store_state.edge_table, {Pattern, NodeId}), + case Edges of + [{{ParentId, Word}, NodeId}] -> + remove_edge(ParentId, Word, State), + ets:delete(State#store_state.node_table, NodeId), + maybe_clean_path(ParentId, State); + [] -> + ok + end; + _ -> + ok + end. + +remove_edge(FromId, Word, State) -> + ets:delete(State#store_state.edge_table, {FromId, Word}), + update_edge_count(FromId, -1, State). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl index b6cb6ebcd28e..968859bd9afe 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl @@ -21,7 +21,7 @@ insert(_Topic, _Msg, _State) -> ok. lookup(_Topic, _State) -> - undefined. + []. delete(_Topic, _State) -> ok. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl index 5a613091e4ab..9b717d6e7167 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl @@ -11,21 +11,19 @@ -behaviour(gen_server). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, start_link/1]). - +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, start_link/1]). -export([retain/3, fetch/2, clear/2]). -define(TIMEOUT, 30_000). - -define(STATE, ?MODULE). --record(?STATE, {store_state :: rabbit_mqtt_retained_msg_store:state(), - expire :: #{topic() := TimerRef :: reference()} - }). + +-record(?STATE, + {store_state :: rabbit_mqtt_retained_msg_store:state(), + expire :: #{topic() := TimerRef :: reference()}}). + -type state() :: #?STATE{}. --spec start_link(rabbit_types:vhost()) -> - gen_server:start_ret(). +-spec start_link(rabbit_types:vhost()) -> gen_server:start_ret(). start_link(VHost) -> gen_server:start_link(?MODULE, VHost, []). @@ -33,8 +31,7 @@ start_link(VHost) -> retain(Pid, Topic, Msg = #mqtt_msg{retain = true}) -> gen_server:cast(Pid, {retain, Topic, Msg}). --spec fetch(pid(), topic()) -> - undefined | mqtt_msg(). +-spec fetch(pid(), topic()) -> undefined | mqtt_msg() | [mqtt_msg()]. fetch(Pid, Topic) -> gen_server:call(Pid, {fetch, Topic}, ?TIMEOUT). @@ -42,74 +39,89 @@ fetch(Pid, Topic) -> clear(Pid, Topic) -> gen_server:cast(Pid, {clear, Topic}). --spec init(rabbit_types:vhost()) -> - {ok, state()}. +-spec init(rabbit_types:vhost()) -> {ok, state()}. init(VHost) -> process_flag(trap_exit, true), {StoreState, Expire0} = rabbit_mqtt_retained_msg_store:start(VHost), Now = os:system_time(second), - Expire = maps:map(fun(Topic, {Timestamp, Expiry}) -> - TimerSecs = max(0, Expiry - (Now - Timestamp)), - start_timer(TimerSecs, Topic) - end, Expire0), - {ok, #?STATE{store_state = StoreState, - expire = Expire}}. + Expire = + maps:map(fun(Topic, {Timestamp, Expiry}) -> + TimerSecs = max(0, Expiry - (Now - Timestamp)), + start_timer(TimerSecs, Topic) + end, + Expire0), + {ok, #?STATE{store_state = StoreState, expire = Expire}}. handle_cast({retain, Topic, Msg0 = #mqtt_msg{props = Props}}, - State = #?STATE{store_state = StoreState, - expire = Expire0}) -> - Expire2 = case maps:take(Topic, Expire0) of - {OldTimer, Expire1} -> - cancel_timer(OldTimer), - Expire1; - error -> - Expire0 - end, - {Msg, Expire} = case maps:find('Message-Expiry-Interval', Props) of - {ok, ExpirySeconds} -> - Timer = start_timer(ExpirySeconds, Topic), - {Msg0#mqtt_msg{timestamp = os:system_time(second)}, - maps:put(Topic, Timer, Expire2)}; - error -> - {Msg0, Expire2} - end, + State = #?STATE{store_state = StoreState, expire = Expire0}) -> + Expire2 = + case maps:take(Topic, Expire0) of + {OldTimer, Expire1} -> + cancel_timer(OldTimer), + Expire1; + error -> + Expire0 + end, + {Msg, Expire} = + case maps:find('Message-Expiry-Interval', Props) of + {ok, ExpirySeconds} -> + Timer = start_timer(ExpirySeconds, Topic), + {Msg0#mqtt_msg{timestamp = os:system_time(second)}, + maps:put(Topic, Timer, Expire2)}; + error -> + {Msg0, Expire2} + end, ok = rabbit_mqtt_retained_msg_store:insert(Topic, Msg, StoreState), {noreply, State#?STATE{expire = Expire}}; -handle_cast({clear, Topic}, State = #?STATE{store_state = StoreState, - expire = Expire0}) -> - Expire = case maps:take(Topic, Expire0) of - {OldTimer, Expire1} -> - cancel_timer(OldTimer), - Expire1; - error -> - Expire0 - end, +handle_cast({clear, Topic}, + State = #?STATE{store_state = StoreState, expire = Expire0}) -> + Expire = + case maps:take(Topic, Expire0) of + {OldTimer, Expire1} -> + cancel_timer(OldTimer), + Expire1; + error -> + Expire0 + end, ok = rabbit_mqtt_retained_msg_store:delete(Topic, StoreState), {noreply, State#?STATE{expire = Expire}}. handle_call({fetch, Topic}, _From, State = #?STATE{store_state = StoreState}) -> - Reply = case rabbit_mqtt_retained_msg_store:lookup(Topic, StoreState) of - #mqtt_msg{props = #{'Message-Expiry-Interval' := Expiry0} = Props, - timestamp = Timestamp} = MqttMsg -> - %% “The PUBLISH packet sent to a Client by the Server MUST contain a Message - %% Expiry Interval set to the received value minus the time that the - %% Application Message has been waiting in the Server [MQTT-3.3.2-6].” - Expiry = max(0, Expiry0 - (os:system_time(second) - Timestamp)), - MqttMsg#mqtt_msg{props = maps:put('Message-Expiry-Interval', Expiry, Props)}; - Other -> - Other - end, + Reply = + case rabbit_mqtt_retained_msg_store:lookup(Topic, StoreState) of + [] -> + undefined; + Messages when is_list(Messages) -> + lists:map(fun update_message_expiry/1, Messages); + SingleMessage = #mqtt_msg{} -> + update_message_expiry(SingleMessage); + Other -> + Other + end, {reply, Reply, State}. -handle_info({timeout, Timer, Topic}, State = #?STATE{store_state = StoreState, - expire = Expire0}) -> - Expire = case maps:take(Topic, Expire0) of - {Timer, Expire1} -> - ok = rabbit_mqtt_retained_msg_store:delete(Topic, StoreState), - Expire1; - _ -> - Expire0 - end, +-spec update_message_expiry(mqtt_msg()) -> mqtt_msg(). +update_message_expiry(#mqtt_msg{props = #{'Message-Expiry-Interval' := Expiry0} = Props, + timestamp = Timestamp} = + MqttMsg) -> + %% "The PUBLISH packet sent to a Client by the Server MUST contain a Message + %% Expiry Interval set to the received value minus the time that the + %% Application Message has been waiting in the Server [MQTT-3.3.2-6]." + Expiry = max(0, Expiry0 - (os:system_time(second) - Timestamp)), + MqttMsg#mqtt_msg{props = maps:put('Message-Expiry-Interval', Expiry, Props)}; +update_message_expiry(MqttMsg) -> + MqttMsg. + +handle_info({timeout, Timer, Topic}, + State = #?STATE{store_state = StoreState, expire = Expire0}) -> + Expire = + case maps:take(Topic, Expire0) of + {Timer, Expire1} -> + ok = rabbit_mqtt_retained_msg_store:delete(Topic, StoreState), + Expire1; + _ -> + Expire0 + end, {noreply, State#?STATE{expire = Expire}}; handle_info(stop, State) -> {stop, normal, State}; @@ -120,11 +132,10 @@ terminate(_Reason, #?STATE{store_state = StoreState}) -> rabbit_mqtt_retained_msg_store:terminate(StoreState). -spec start_timer(integer(), topic()) -> reference(). -start_timer(Seconds, Topic) - when is_binary(Topic) -> - erlang:start_timer(timer:seconds(Seconds), self(), Topic). +start_timer(Seconds, Topic) when is_binary(Topic) -> + erlang:start_timer( + timer:seconds(Seconds), self(), Topic). -spec cancel_timer(reference()) -> ok. cancel_timer(TimerRef) -> - ok = erlang:cancel_timer(TimerRef, [{async, true}, - {info, false}]). + ok = erlang:cancel_timer(TimerRef, [{async, true}, {info, false}]). diff --git a/deps/rabbitmq_mqtt/test/rabbit_mqtt_retained_msg_store_SUITE.erl b/deps/rabbitmq_mqtt/test/rabbit_mqtt_retained_msg_store_SUITE.erl new file mode 100644 index 000000000000..a1f4910442e1 --- /dev/null +++ b/deps/rabbitmq_mqtt/test/rabbit_mqtt_retained_msg_store_SUITE.erl @@ -0,0 +1,275 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% +-module(rabbit_mqtt_retained_msg_store_SUITE). + +-compile([export_all, nowarn_export_all]). + +-include_lib("rabbitmq_mqtt/include/rabbit_mqtt_packet.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +all() -> + [{group, ets}, {group, dets}]. + +groups() -> + [{dets, [parallel], tests()}, {ets, [parallel], tests()}]. + +tests() -> + [test_add_and_match, + test_delete, + test_plus_wildcard, + test_hash_wildcard, + test_combined_wildcards, + test_recovery]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(G, Config) -> + case G of + ets -> + Module = rabbit_mqtt_retained_msg_store_ets; + dets -> + Module = rabbit_mqtt_retained_msg_store_dets + end, + [{module, Module} | Config]. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + Mod = ?config(module, Config), + Dir = filename:join(["/tmp", "mqtt_test_" ++ erlang:integer_to_list(erlang:unique_integer())]), + ok = filelib:ensure_dir(Dir ++ "/"), + % VHost needs to be different for each test case to create a new table + VHost = erlang:iolist_to_binary([<<"test">>, erlang:integer_to_list(erlang:unique_integer())]), + State = Mod:new(Dir, VHost), + [{store_state, State}, {test_dir, Dir}, {vhost, VHost} | Config]. + +end_per_testcase(_TestCase, Config) -> + State = ?config(store_state, Config), + Mod = ?config(module, Config), + Dir = ?config(test_dir, Config), + Mod:terminate(State), + + timer:sleep(100), + case file:del_dir_r(Dir) of + ok -> + ok; + {error, enoent} -> + ok; + {error, Reason} -> + ct:pal("Failed to delete directory ~p: ~p", [Dir, Reason]), + os:cmd("rm -rf " ++ Dir) + end, + ok. + +%%---------------------------------------------------------------------------- +%% Test Cases +%%---------------------------------------------------------------------------- + +test_add_and_match(Config) -> + State = ?config(store_state, Config), + Mod = ?config(module, Config), + + Msg1 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/b/c">>, + dup = false, + payload = <<"msg1">>, + props = #{}, + timestamp = os:system_time(second)}, + Msg2 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/b/d">>, + dup = false, + payload = <<"msg2">>, + props = #{}, + timestamp = os:system_time(second)}, + ok = Mod:insert(<<"a/b/c">>, Msg1, State), + Matches1 = Mod:lookup(<<"a/b/c">>, State), + ok = Mod:insert(<<"a/b/d">>, Msg2, State), + Matches2 = Mod:lookup(<<"a/b/d">>, State), + NoMatches = Mod:lookup(<<"x/y/z">>, State), + + ?assertEqual([Msg1], Matches1), + ?assertEqual([Msg2], Matches2), + ?assertEqual([], NoMatches). + +test_delete(Config) -> + State = ?config(store_state, Config), + Mod = ?config(module, Config), + Msg1 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/b/c">>, + dup = false, + payload = <<"msg1">>, + props = #{}, + timestamp = os:system_time(second)}, + ok = Mod:insert(<<"a/b/c">>, Msg1, State), + ok = Mod:delete(<<"a/b/c">>, State), + Matches = Mod:lookup(<<"a/b/c">>, State), + + ?assertEqual([], Matches). + +test_plus_wildcard(Config) -> + State = ?config(store_state, Config), + Mod = ?config(module, Config), + Msg1 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/b/c">>, + dup = false, + payload = <<"msg1">>, + props = #{}, + timestamp = os:system_time(second)}, + Msg2 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/x/c">>, + dup = false, + payload = <<"msg2">>, + props = #{}, + timestamp = os:system_time(second)}, + ok = Mod:insert(<<"a/b/c">>, Msg1, State), + ok = Mod:insert(<<"a/x/c">>, Msg2, State), + Matches = Mod:lookup(<<"a/+/c">>, State), + + ?assertEqual(lists:sort([Msg1, Msg2]), lists:sort(Matches)). + +test_hash_wildcard(Config) -> + State = ?config(store_state, Config), + Mod = ?config(module, Config), + Msg1 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/b/c">>, + dup = false, + payload = <<"msg1">>, + props = #{}, + timestamp = os:system_time(second)}, + Msg2 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/b/c/d">>, + dup = false, + payload = <<"msg2">>, + props = #{}, + timestamp = os:system_time(second)}, + Msg3 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/b/x/y">>, + dup = false, + payload = <<"msg3">>, + props = #{}, + timestamp = os:system_time(second)}, + Msg4 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/q/x/y">>, + dup = false, + payload = <<"msg4">>, + props = #{}, + timestamp = os:system_time(second)}, + ok = Mod:insert(<<"a/b/c">>, Msg1, State), + ok = Mod:insert(<<"a/b/c/d">>, Msg2, State), + ok = Mod:insert(<<"a/b/x/y">>, Msg3, State), + ok = Mod:insert(<<"a/q/x/y">>, Msg4, State), + Matches = Mod:lookup(<<"a/b/#">>, State), + + ?assertEqual([Msg1, Msg2, Msg3], lists:sort(Matches)). + +test_combined_wildcards(Config) -> + State = ?config(store_state, Config), + Mod = ?config(module, Config), + + Msg1 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/b/c">>, + dup = false, + payload = <<"msg1">>, + props = #{}, + timestamp = os:system_time(second)}, + Msg2 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/b/d">>, + dup = false, + payload = <<"msg2">>, + props = #{}, + timestamp = os:system_time(second)}, + Msg3 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/x/c/e">>, + dup = false, + payload = <<"msg3">>, + props = #{}, + timestamp = os:system_time(second)}, + Msg4 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/y/c/f/g">>, + dup = false, + payload = <<"msg4">>, + props = #{}, + timestamp = os:system_time(second)}, + ok = Mod:insert(<<"a/b/c/d">>, Msg1, State), + ok = Mod:insert(<<"a/x/c/e">>, Msg2, State), + ok = Mod:insert(<<"a/y/c/f/g">>, Msg3, State), + ok = Mod:insert(<<"a/y/d/f/g">>, Msg4, State), + Matches = Mod:lookup(<<"a/+/c/#">>, State), + + ?assertEqual([Msg1, Msg2, Msg3], lists:sort(Matches)). + +test_recovery(Config) -> + State = ?config(store_state, Config), + Mod = ?config(module, Config), + + Msg1 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/b/c">>, + dup = false, + payload = <<"msg1">>, + props = #{}, + timestamp = os:system_time(second)}, + Msg2 = + #mqtt_msg{retain = true, + qos = 0, + topic = <<"a/b/d">>, + dup = false, + payload = <<"msg2">>, + props = #{}, + timestamp = os:system_time(second)}, + + ok = Mod:insert(<<"a/b/c">>, Msg1, State), + ok = Mod:insert(<<"a/b/d">>, Msg2, State), + Matches1 = Mod:lookup(<<"a/b/c">>, State), + Matches2 = Mod:lookup(<<"a/b/d">>, State), + NoMatches = Mod:lookup(<<"x/y/z">>, State), + ?assertEqual([Msg1], Matches1), + ?assertEqual([Msg2], Matches2), + ?assertEqual([], NoMatches), + % Recover the state + ok = Mod:terminate(State), + {ok, State2, _Expire} = Mod:recover(?config(test_dir, Config), ?config(vhost, Config)), + + Matches1 = Mod:lookup(<<"a/b/c">>, State2), + Matches2 = Mod:lookup(<<"a/b/d">>, State2), + NoMatches = Mod:lookup(<<"x/y/z">>, State2), + ?assertEqual([Msg1], Matches1), + ?assertEqual([Msg2], Matches2), + ?assertEqual([], NoMatches). diff --git a/deps/rabbitmq_mqtt/test/retainer_SUITE.erl b/deps/rabbitmq_mqtt/test/retainer_SUITE.erl index d455b3031967..9b68fcfdb747 100644 --- a/deps/rabbitmq_mqtt/test/retainer_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/retainer_SUITE.erl @@ -5,43 +5,34 @@ %% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -module(retainer_SUITE). + -compile([export_all, nowarn_export_all]). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). --import(util, [connect/2, connect/3, - expect_publishes/3, - assert_message_expiry_interval/2 - ]). + +-import(util, + [connect/2, connect/3, expect_publishes/3, assert_message_expiry_interval/2]). all() -> - [ - {group, v4}, - {group, v5} - ]. + [{group, v4}, {group, v5}]. groups() -> - [ - {v4, [], sub_groups()}, - {v5, [], sub_groups()} - ]. + [{v4, [], sub_groups()}, {v5, [], sub_groups()}]. sub_groups() -> - [ - {dets, [shuffle], tests()}, + [{dets, [shuffle], tests()}, {ets, [shuffle], tests()}, - {noop, [shuffle], [does_not_retain]} - ]. + {noop, [shuffle], [does_not_retain]}]. tests() -> - [ - coerce_configuration_data, + [coerce_configuration_data, should_translate_amqp2mqtt_on_publish, should_translate_amqp2mqtt_on_retention, should_translate_amqp2mqtt_on_retention_search, recover, - recover_with_message_expiry_interval - ]. + recover_with_message_expiry_interval, + retained_wildcard_single_level, retained_wildcard_multi_level, retained_wildcard_mixed]. suite() -> [{timetrap, {minutes, 2}}]. @@ -57,37 +48,29 @@ init_per_suite(Config) -> end_per_suite(Config) -> Config. -init_per_group(G, Config) - when G =:= v4; - G =:= v5 -> +init_per_group(G, Config) when G =:= v4; G =:= v5 -> rabbit_ct_helpers:set_config(Config, {mqtt_version, G}); init_per_group(Group, Config0) -> Suffix = rabbit_ct_helpers:testcase_absname(Config0, "", "-"), - Config = rabbit_ct_helpers:set_config( - Config0, {rmq_nodename_suffix, Suffix}), + Config = rabbit_ct_helpers:set_config(Config0, {rmq_nodename_suffix, Suffix}), Mod = list_to_atom("rabbit_mqtt_retained_msg_store_" ++ atom_to_list(Group)), Env = [{rabbitmq_mqtt, [{retained_message_store, Mod}]}, - {rabbit, [ - {default_user, "guest"}, - {default_pass, "guest"}, - {default_vhost, "/"}, - {default_permissions, [".*", ".*", ".*"]} - ]}], - rabbit_ct_helpers:run_setup_steps( - Config, - [fun(Conf) -> rabbit_ct_helpers:merge_app_env(Conf, Env) end] ++ - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). - -end_per_group(G, Config) - when G =:= v4; - G =:= v5 -> + {rabbit, + [{default_user, "guest"}, + {default_pass, "guest"}, + {default_vhost, "/"}, + {default_permissions, [".*", ".*", ".*"]}]}], + rabbit_ct_helpers:run_setup_steps(Config, + [fun(Conf) -> rabbit_ct_helpers:merge_app_env(Conf, Env) end] + ++ rabbit_ct_broker_helpers:setup_steps() + ++ rabbit_ct_client_helpers:setup_steps()). + +end_per_group(G, Config) when G =:= v4; G =:= v5 -> Config; end_per_group(_, Config) -> - rabbit_ct_helpers:run_teardown_steps( - Config, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()). + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_client_helpers:teardown_steps() + ++ rabbit_ct_broker_helpers:teardown_steps()). init_per_testcase(recover_with_message_expiry_interval = T, Config) -> case ?config(mqtt_version, Config) of @@ -102,7 +85,6 @@ init_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). - %% ------------------------------------------------------------------- %% Testsuite cases %% ------------------------------------------------------------------- @@ -125,7 +107,7 @@ should_translate_amqp2mqtt_on_publish(Config) -> C = connect(<<"simpleClientRetainer">>, Config, [{ack_timeout, 1}]), %% there's an active consumer {ok, _, _} = emqtt:subscribe(C, <<"TopicA/Device.Field">>, qos1), - ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]), + ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]), ok = expect_publishes(C, <<"TopicA/Device/Field">>, [<<"Payload">>]), ok = emqtt:disconnect(C). @@ -137,7 +119,7 @@ should_translate_amqp2mqtt_on_publish(Config) -> should_translate_amqp2mqtt_on_retention(Config) -> C = connect(<<"simpleClientRetainer">>, Config, [{ack_timeout, 1}]), %% publish with retain = true before a consumer comes around - ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]), + ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]), {ok, _, _} = emqtt:subscribe(C, <<"TopicA/Device.Field">>, qos1), ok = expect_publishes(C, <<"TopicA/Device/Field">>, [<<"Payload">>]), ok = emqtt:disconnect(C). @@ -149,28 +131,27 @@ should_translate_amqp2mqtt_on_retention(Config) -> %% ------------------------------------------------------------------- should_translate_amqp2mqtt_on_retention_search(Config) -> C = connect(<<"simpleClientRetainer">>, Config, [{ack_timeout, 1}]), - ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]), + ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]), {ok, _, _} = emqtt:subscribe(C, <<"TopicA/Device/Field">>, qos1), ok = expect_publishes(C, <<"TopicA/Device/Field">>, [<<"Payload">>]), ok = emqtt:disconnect(C). does_not_retain(Config) -> C = connect(<<"simpleClientRetainer">>, Config, [{ack_timeout, 1}]), - ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]), + ok = emqtt:publish(C, <<"TopicA/Device.Field">>, #{}, <<"Payload">>, [{retain, true}]), {ok, _, _} = emqtt:subscribe(C, <<"TopicA/Device.Field">>, qos1), receive Unexpected -> ct:fail("Unexpected message: ~p", [Unexpected]) after 1000 -> - ok + ok end, ok = emqtt:disconnect(C). recover(Config) -> Topic = Payload = ClientId = atom_to_binary(?FUNCTION_NAME), C1 = connect(ClientId, Config), - {ok, _} = emqtt:publish(C1, Topic, Payload, [{retain, true}, - {qos, 1}]), + {ok, _} = emqtt:publish(C1, Topic, Payload, [{retain, true}, {qos, 1}]), ok = emqtt:disconnect(C1), ok = rabbit_ct_broker_helpers:restart_node(Config, 0), C2 = connect(ClientId, Config), @@ -182,14 +163,25 @@ recover_with_message_expiry_interval(Config) -> ClientId = atom_to_binary(?FUNCTION_NAME), C1 = connect(ClientId, Config), Start = os:system_time(second), - {ok, _} = emqtt:publish(C1, <<"topic/1">>, - <<"m1">>, [{retain, true}, {qos, 1}]), - {ok, _} = emqtt:publish(C1, <<"topic/2">>, #{'Message-Expiry-Interval' => 100}, - <<"m2">>, [{retain, true}, {qos, 1}]), - {ok, _} = emqtt:publish(C1, <<"topic/3">>, #{'Message-Expiry-Interval' => 3}, - <<"m3">>, [{retain, true}, {qos, 1}]), - {ok, _} = emqtt:publish(C1, <<"topic/4">>, #{'Message-Expiry-Interval' => 15}, - <<"m4">>, [{retain, true}, {qos, 1}]), + {ok, _} = emqtt:publish(C1, <<"topic/1">>, <<"m1">>, [{retain, true}, {qos, 1}]), + {ok, _} = + emqtt:publish(C1, + <<"topic/2">>, + #{'Message-Expiry-Interval' => 100}, + <<"m2">>, + [{retain, true}, {qos, 1}]), + {ok, _} = + emqtt:publish(C1, + <<"topic/3">>, + #{'Message-Expiry-Interval' => 3}, + <<"m3">>, + [{retain, true}, {qos, 1}]), + {ok, _} = + emqtt:publish(C1, + <<"topic/4">>, + #{'Message-Expiry-Interval' => 15}, + <<"m4">>, + [{retain, true}, {qos, 1}]), ok = emqtt:disconnect(C1), %% Takes around 9 seconds on Linux. ok = rabbit_ct_broker_helpers:restart_node(Config, 0), @@ -203,30 +195,82 @@ recover_with_message_expiry_interval(Config) -> timer:sleep(SleepMs), ElapsedSeconds2 = os:system_time(second) - Start, - {ok, _, [1,1,1,1]} = emqtt:subscribe(C2, [{<<"topic/1">>, qos1}, - {<<"topic/2">>, qos1}, - {<<"topic/3">>, qos1}, - {<<"topic/4">>, qos1}]), - receive {publish, #{client_pid := C2, - retain := true, - topic := <<"topic/1">>, - payload := <<"m1">>, - properties := Props}} - when map_size(Props) =:= 0 -> ok - after 30_000 -> ct:fail("did not topic/1") + {ok, _, [1, 1, 1, 1]} = + emqtt:subscribe(C2, + [{<<"topic/1">>, qos1}, + {<<"topic/2">>, qos1}, + {<<"topic/3">>, qos1}, + {<<"topic/4">>, qos1}]), + receive + {publish, + #{client_pid := C2, + retain := true, + topic := <<"topic/1">>, + payload := <<"m1">>, + properties := Props}} + when map_size(Props) =:= 0 -> + ok + after 30_000 -> + ct:fail("did not topic/1") end, - receive {publish, #{client_pid := C2, - retain := true, - topic := <<"topic/2">>, - payload := <<"m2">>, - properties := #{'Message-Expiry-Interval' := MEI}}} -> - assert_message_expiry_interval(100 - ElapsedSeconds2, MEI) - after 30_000 -> ct:fail("did not topic/2") + receive + {publish, + #{client_pid := C2, + retain := true, + topic := <<"topic/2">>, + payload := <<"m2">>, + properties := #{'Message-Expiry-Interval' := MEI}}} -> + assert_message_expiry_interval(100 - ElapsedSeconds2, MEI) + after 30_000 -> + ct:fail("did not topic/2") end, - receive Unexpected -> ct:fail("Received unexpectedly: ~p", [Unexpected]) - after 0 -> ok + receive + Unexpected -> + ct:fail("Received unexpectedly: ~p", [Unexpected]) + after 0 -> + ok end, ok = emqtt:disconnect(C2). + +%% ------------------------------------------------------------------- +%% If a client publishes a retained message to devices/sensor1/temperature and another +%% client subscribes to devices/+/temperature the client should be +%% sent retained message for the translated topic (devices/sensor1/temperature) +%% ------------------------------------------------------------------- +%% +retained_wildcard_single_level(Config) -> + C = connect(<<"wildcardClientRetainer">>, Config, [{ack_timeout, 1}]), + ok = + emqtt:publish(C, <<"devices/sensor1/temperature">>, #{}, <<"23.5">>, [{retain, true}]), + {ok, _, _} = emqtt:subscribe(C, <<"devices/+/temperature">>, qos1), + ok = expect_publishes(C, <<"devices/+/temperature">>, [<<"23.5">>]), + ok = emqtt:disconnect(C). + +%% Test multi-level wildcard (#) +retained_wildcard_multi_level(Config) -> + C = connect(<<"wildcardClientRetainer">>, Config, [{ack_timeout, 1}]), + ok = + emqtt:publish(C, + <<"devices/sensor1/readings/temperature">>, + #{}, + <<"23.5">>, + [{retain, true}]), + {ok, _, _} = emqtt:subscribe(C, <<"devices/#">>, qos1), + ok = expect_publishes(C, <<"devices/#">>, [<<"23.5">>]), + ok = emqtt:disconnect(C). + +%% Test mixed wildcards (+/#) +retained_wildcard_mixed(Config) -> + C = connect(<<"wildcardClientRetainer">>, Config, [{ack_timeout, 1}]), + ok = + emqtt:publish(C, + <<"devices/sensor1/readings/temperature">>, + #{}, + <<"23.5">>, + [{retain, true}]), + {ok, _, _} = emqtt:subscribe(C, <<"devices/+/readings/#">>, qos1), + ok = expect_publishes(C, <<"devices/+/readings/#">>, [<<"23.5">>]), + ok = emqtt:disconnect(C). diff --git a/deps/rabbitmq_prelaunch/BUILD.bazel b/deps/rabbitmq_prelaunch/BUILD.bazel index f9cd5eda7280..bf80ecf11e63 100644 --- a/deps/rabbitmq_prelaunch/BUILD.bazel +++ b/deps/rabbitmq_prelaunch/BUILD.bazel @@ -42,6 +42,10 @@ rabbitmq_app( app_name = APP_NAME, app_version = APP_VERSION, beam_files = [":beam_files"], + extra_apps = [ + "osiris", + "systemd", + ], license_files = [":license_files"], priv = [":priv"], deps = [ diff --git a/deps/rabbitmq_recent_history_exchange/BUILD.bazel b/deps/rabbitmq_recent_history_exchange/BUILD.bazel index 73121ad44906..9954ab0e1573 100644 --- a/deps/rabbitmq_recent_history_exchange/BUILD.bazel +++ b/deps/rabbitmq_recent_history_exchange/BUILD.bazel @@ -39,10 +39,10 @@ rabbitmq_app( license_files = [":license_files"], priv = [":priv"], deps = [ + "//deps/khepri:erlang_app", + "//deps/khepri_mnesia_migration:erlang_app", "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", - "@khepri//:erlang_app", - "@khepri_mnesia_migration//:erlang_app", ], ) diff --git a/deps/rabbitmq_recent_history_exchange/app.bzl b/deps/rabbitmq_recent_history_exchange/app.bzl index 3bd05fe8ae54..606dabe4b310 100644 --- a/deps/rabbitmq_recent_history_exchange/app.bzl +++ b/deps/rabbitmq_recent_history_exchange/app.bzl @@ -18,10 +18,10 @@ def all_beam_files(name = "all_beam_files"): dest = "ebin", erlc_opts = "//:erlc_opts", deps = [ + "//deps/khepri:erlang_app", + "//deps/khepri_mnesia_migration:erlang_app", "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", - "@khepri//:erlang_app", - "@khepri_mnesia_migration//:erlang_app", ], ) @@ -44,10 +44,10 @@ def all_test_beam_files(name = "all_test_beam_files"): dest = "test", erlc_opts = "//:test_erlc_opts", deps = [ + "//deps/khepri:erlang_app", + "//deps/khepri_mnesia_migration:erlang_app", "//deps/rabbit:erlang_app", "//deps/rabbit_common:erlang_app", - "@khepri//:erlang_app", - "@khepri_mnesia_migration//:erlang_app", ], ) diff --git a/deps/rabbitmq_shovel/app.bzl b/deps/rabbitmq_shovel/app.bzl index 509242770a22..d3cb55095742 100644 --- a/deps/rabbitmq_shovel/app.bzl +++ b/deps/rabbitmq_shovel/app.bzl @@ -232,7 +232,7 @@ def test_suite_beam_files(name = "test_suite_beam_files"): outs = ["test/rolling_upgrade_SUITE.beam"], app_name = "rabbitmq_shovel", erlc_opts = "//:test_erlc_opts", - deps = ["//deps/amqp_client:erlang_app", "@khepri//:erlang_app"], + deps = ["//deps/amqp_client:erlang_app", "//deps/khepri:erlang_app"], ) erlang_bytecode( name = "shovel_status_command_SUITE_beam_files", diff --git a/deps/rabbitmq_stream_common/BUILD.bazel b/deps/rabbitmq_stream_common/BUILD.bazel index ec030f85a9ce..fe6d956b249e 100644 --- a/deps/rabbitmq_stream_common/BUILD.bazel +++ b/deps/rabbitmq_stream_common/BUILD.bazel @@ -35,6 +35,7 @@ rabbitmq_app( app_description = APP_DESCRIPTION, app_name = APP_NAME, beam_files = [":beam_files"], + extra_apps = ["osiris"], license_files = [":license_files"], priv = [":priv"], )