Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 40 additions & 8 deletions deps/rabbit/src/rabbit_logger_exchange_h.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,16 @@ filter_config(Config) ->
log(#{meta := #{mfa := {?MODULE, _, _}}}, _) ->
ok;
log(LogEvent, Config) ->
%% Publishing the log message to an exchange might trigger more logging,
%% triggering an infinite logging loop. To prevent that, we make use the
%% process dictionary to record the fact that this logger was already
%% entered. If that's the case when this function is called, we just drop
%% the log event.
Key = ?MODULE,
ReEntered = erlang:get(Key) =/= undefined,
case rabbit_boot_state:get() of
ready ->
ready when not ReEntered ->
erlang:put(Key, ?FUNCTION_NAME),
try
do_log(LogEvent, Config)
catch
Expand All @@ -53,22 +61,30 @@ log(LogEvent, Config) ->
%% removes the logger_exchange handler, which in
%% turn deletes the log exchange and its bindings
erlang:display({?MODULE, crashed, {C, R, S}})
after
erlang:erase(Key)
end,
ok;
_ -> ok
end.

do_log(LogEvent, #{config := #{exchange := Exchange}} = Config) ->
do_log(
LogEvent,
#{config := #{exchange := Exchange,
setup_proc := Pid}} = Config) ->
RoutingKey = make_routing_key(LogEvent, Config),
PBasic = log_event_to_amqp_msg(LogEvent, Config),
Body = try_format_body(LogEvent, Config),
Content = rabbit_basic:build_content(PBasic, Body),
case mc_amqpl:message(Exchange, RoutingKey, Content) of
{ok, Msg} ->
case rabbit_queue_type:publish_at_most_once(Exchange, Msg) of
ok -> ok;
{error, not_found} -> ok
end;
%% Publishing a message might involve a Erlang process, like a Ra
%% server process, to log something and call itself. We need to
%% publish the message asynchronously from a separate process and
%% ignore the fate of that publish, to not block an Erlang
%% process.
Pid ! {publish, Msg},
ok;
{error, _Reason} ->
%% it would be good to log this error but can we?
ok
Expand Down Expand Up @@ -164,12 +180,19 @@ wait_for_initial_pass(N) ->
end.

setup_proc(
#{config := #{exchange := Exchange}} = Config) ->
#{id := Id,
config := #{exchange := Exchange}} = Config) ->
%% We register this process using the logger handler ID. It makes
%% debugging convenient but it's not critical. That's why we catch any
%% exceptions and ignore the return value.
_ = catch erlang:register(Id, self()),

case declare_exchange(Config) of
ok ->
?LOG_INFO(
"Logging to ~ts ready", [rabbit_misc:rs(Exchange)],
#{domain => ?RMQLOG_DOMAIN_GLOBAL});
#{domain => ?RMQLOG_DOMAIN_GLOBAL}),
loop(Config);
error ->
?LOG_DEBUG(
"Logging to ~ts not ready, trying again in ~b second(s)",
Expand All @@ -182,6 +205,15 @@ setup_proc(
end
end.

loop(#{config := #{exchange := Exchange}} = Config) ->
receive
{publish, Msg} ->
_ = rabbit_queue_type:publish_at_most_once(Exchange, Msg),
loop(Config);
stop ->
ok
end.

declare_exchange(#{config := #{exchange := Exchange}}) ->
try rabbit_exchange:declare(
Exchange, topic, true, false, true, [], ?INTERNAL_USER) of
Expand Down
34 changes: 27 additions & 7 deletions deps/rabbit/test/logging_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@

logging_to_exchange_works/1,
update_log_exchange_config/1,
use_exchange_logger_when_enabling_khepri_db/1,

logging_to_syslog_works/1]).

Expand Down Expand Up @@ -99,7 +100,8 @@ groups() ->

{exchange_output, [],
[logging_to_exchange_works,
update_log_exchange_config]},
update_log_exchange_config,
use_exchange_logger_when_enabling_khepri_db]},

{syslog_output, [],
[logging_to_syslog_works]}
Expand Down Expand Up @@ -150,17 +152,27 @@ init_per_testcase(Testcase, Config) ->
%% group will run in the context of that RabbitMQ node.
exchange_output ->
ExchProps = [{enabled, true},
{level, debug}] ,
{level, debug}],
Config1 = rabbit_ct_helpers:set_config(
Config,
[{rmq_nodes_count, 1},
{rmq_nodename_suffix, Testcase}]),
Config2 = rabbit_ct_helpers:merge_app_env(
Config1,
[{rmq_nodename_suffix, Testcase}]),
Config2 = case Testcase of
use_exchange_logger_when_enabling_khepri_db ->
rabbit_ct_helpers:set_config(
Config1,
[{rmq_nodes_count, 3},
{metadata_store, mnesia}]);
_ ->
rabbit_ct_helpers:set_config(
Config1,
[{rmq_nodes_count, 1}])
end,
Config3 = rabbit_ct_helpers:merge_app_env(
Config2,
{rabbit, [{log, [{exchange, ExchProps},
{file, [{level, debug}]}]}]}),
rabbit_ct_helpers:run_steps(
Config2,
Config3,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps());

Expand Down Expand Up @@ -1102,6 +1114,14 @@ update_log_exchange_config(Config) ->
?assertEqual(HandlerConfig1, HandlerConfig2),
ok.

use_exchange_logger_when_enabling_khepri_db(Config) ->
?assertNot(rabbit_ct_broker_helpers:rpc(
Config, 0,
rabbit_feature_flags, is_enabled, [khepri_db])),
?assertEqual(
ok,
rabbit_ct_broker_helpers:enable_feature_flag(Config, khepri_db)).

logging_to_syslog_works(Config) ->
Context = default_context(Config),
ok = application:set_env(
Expand Down
Loading