Skip to content

Commit a9c193c

Browse files
authored
Merge pull request #458 from hairyhum/kpro-connection-timeout
Propagate connect_timeout argument to kpro API functions
2 parents 6410360 + 3e4f590 commit a9c193c

File tree

3 files changed

+33
-13
lines changed

3 files changed

+33
-13
lines changed

changelog.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
* 3.16.2
2+
* Propagate `connect_timeout` config to `kpro` API functions as `timeout` arg
3+
affected APIs: connect_group_coordinator, create_topics, delete_topics,
4+
resolve_offset, fetch, fold, fetch_committed_offsets
15
* 3.16.1
26
* Fix `brod` script in `brod-cli` in release.
37
* Support `rebalance_timeout` consumer group option

src/brod.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -938,7 +938,8 @@ fetch(Hosts, Topic, Partition, Offset,
938938
-spec connect_leader([endpoint()], topic(), partition(),
939939
conn_config()) -> {ok, pid()}.
940940
connect_leader(Hosts, Topic, Partition, ConnConfig) ->
941-
kpro:connect_partition_leader(Hosts, ConnConfig, Topic, Partition).
941+
KproOptions = brod_utils:kpro_connection_options(ConnConfig),
942+
kpro:connect_partition_leader(Hosts, ConnConfig, Topic, Partition, KproOptions).
942943

943944
%% @doc List ALL consumer groups in the given kafka cluster.
944945
%% NOTE: Exception if failed to connect any of the coordinator brokers.
@@ -975,7 +976,8 @@ describe_groups(CoordinatorEndpoint, ConnCfg, IDs) ->
975976
-spec connect_group_coordinator([endpoint()], conn_config(), group_id()) ->
976977
{ok, pid()} | {error, any()}.
977978
connect_group_coordinator(BootstrapEndpoints, ConnCfg, GroupId) ->
978-
Args = #{type => group, id => GroupId},
979+
KproOptions = brod_utils:kpro_connection_options(ConnCfg),
980+
Args = maps:merge(KproOptions, #{type => group, id => GroupId}),
979981
kpro:connect_coordinator(BootstrapEndpoints, ConnCfg, Args).
980982

981983
%% @doc Fetch committed offsets for ALL topics in the given consumer group.

src/brod_utils.erl

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
, resolve_offset/4
5959
, resolve_offset/5
6060
, resolve_offset/6
61+
, kpro_connection_options/1
6162
]).
6263

6364
-include("brod_int.hrl").
@@ -93,7 +94,8 @@ create_topics(Hosts, TopicConfigs, RequestConfigs) ->
9394
validate_only => boolean()}, conn_config()) ->
9495
{ok, kpro:struct()} | {error, any()} | ok.
9596
create_topics(Hosts, TopicConfigs, RequestConfigs, ConnCfg) ->
96-
with_conn(kpro:connect_controller(Hosts, nolink(ConnCfg)),
97+
KproOpts = kpro_connection_options(ConnCfg),
98+
with_conn(kpro:connect_controller(Hosts, nolink(ConnCfg), KproOpts),
9799
fun(Pid) ->
98100
Request = brod_kafka_request:create_topics(
99101
Pid, TopicConfigs, RequestConfigs),
@@ -110,7 +112,8 @@ delete_topics(Hosts, Topics, Timeout) ->
110112
-spec delete_topics([endpoint()], [topic()], pos_integer(), conn_config()) ->
111113
{ok, kpro:struct()} | {error, any()}.
112114
delete_topics(Hosts, Topics, Timeout, ConnCfg) ->
113-
with_conn(kpro:connect_controller(Hosts, nolink(ConnCfg)),
115+
KproOpts = kpro_connection_options(ConnCfg),
116+
with_conn(kpro:connect_controller(Hosts, nolink(ConnCfg), KproOpts),
114117
fun(Pid) ->
115118
Request = brod_kafka_request:delete_topics(
116119
Pid, Topics, Timeout),
@@ -148,20 +151,18 @@ get_metadata(Hosts, Topics, ConnCfg) ->
148151
offset_time(), conn_config()) ->
149152
{ok, offset()} | {error, any()}.
150153
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg) ->
151-
Timeout =
152-
proplists:get_value(connect_timeout, ConnCfg, ?BROD_DEFAULT_TIMEOUT),
153-
Opts = #{timeout => Timeout},
154-
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, Opts).
154+
KproOpts = kpro_connection_options(ConnCfg),
155+
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, KproOpts).
155156

156157
%% @doc Resolve timestamp to real offset.
157158
-spec resolve_offset([endpoint()], topic(), partition(),
158159
offset_time(), conn_config(),
159160
#{timeout => kpro:int32()}) ->
160161
{ok, offset()} | {error, any()}.
161-
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, Opts) ->
162+
resolve_offset(Hosts, Topic, Partition, Time, ConnCfg, KproOpts) ->
162163
with_conn(
163164
kpro:connect_partition_leader(Hosts, nolink(ConnCfg),
164-
Topic, Partition, Opts),
165+
Topic, Partition, KproOpts),
165166
fun(Pid) -> resolve_offset(Pid, Topic, Partition, Time) end).
166167

167168
%% @doc Resolve timestamp or semantic offset to real offset.
@@ -284,8 +285,9 @@ flatten_batches(BeginOffset, Header, Batches0) ->
284285
fetch(Hosts, Topic, Partition, Offset, Opts) when is_list(Hosts) ->
285286
fetch({Hosts, []}, Topic, Partition, Offset, Opts);
286287
fetch({Hosts, ConnCfg}, Topic, Partition, Offset, Opts) ->
288+
KproOpts = kpro_connection_options(ConnCfg),
287289
with_conn(
288-
kpro:connect_partition_leader(Hosts, nolink(ConnCfg), Topic, Partition),
290+
kpro:connect_partition_leader(Hosts, nolink(ConnCfg), Topic, Partition, KproOpts),
289291
fun(Conn) -> fetch(Conn, Topic, Partition, Offset, Opts) end);
290292
fetch(Client, Topic, Partition, Offset, Opts) when is_atom(Client) ->
291293
case brod_client:get_leader_connection(Client, Topic, Partition) of
@@ -306,8 +308,9 @@ fold(Hosts, Topic, Partition, Offset, Opts,
306308
Acc, Fun, Limits) when is_list(Hosts) ->
307309
fold({Hosts, []}, Topic, Partition, Offset, Opts, Acc, Fun, Limits);
308310
fold({Hosts, ConnCfg}, Topic, Partition, Offset, Opts, Acc, Fun, Limits) ->
311+
KproOpts = kpro_connection_options(ConnCfg),
309312
case with_conn(
310-
kpro:connect_partition_leader(Hosts, nolink(ConnCfg), Topic, Partition),
313+
kpro:connect_partition_leader(Hosts, nolink(ConnCfg), Topic, Partition, KproOpts),
311314
fun(Conn) -> fold(Conn, Topic, Partition, Offset, Opts,
312315
Acc, Fun, Limits) end) of
313316
{error, Reason} ->
@@ -376,7 +379,8 @@ init_sasl_opt(Config) ->
376379
group_id(), [topic()]) ->
377380
{ok, [kpro:struct()]} | {error, any()}.
378381
fetch_committed_offsets(BootstrapEndpoints, ConnCfg, GroupId, Topics) ->
379-
Args = #{type => group, id => GroupId},
382+
KproOpts = kpro_connection_options(ConnCfg),
383+
Args = maps:merge(KproOpts, #{type => group, id => GroupId}),
380384
with_conn(
381385
kpro:connect_coordinator(BootstrapEndpoints, nolink(ConnCfg), Args),
382386
fun(Pid) -> do_fetch_committed_offsets(Pid, GroupId, Topics) end).
@@ -595,6 +599,16 @@ get_stable_offset(Header) ->
595599
%% handle the case when high_watermark < last_stable_offset
596600
min(StableOffset, HighWmOffset).
597601

602+
%% @doc get kpro connection options from brod connection config
603+
kpro_connection_options(ConnCfg) ->
604+
Timeout = case ConnCfg of
605+
List when is_list(List) ->
606+
proplists:get_value(connect_timeout, List, ?BROD_DEFAULT_TIMEOUT);
607+
Map when is_map(Map) ->
608+
maps:get(connect_timeout, Map, ?BROD_DEFAULT_TIMEOUT)
609+
end,
610+
#{timeout => Timeout}.
611+
598612
%%%_* Internal functions =======================================================
599613

600614
do_fold(Spawn, {Pid, Mref}, Offset, Acc, Fun, End, Count) ->

0 commit comments

Comments
 (0)