diff --git a/README.md b/README.md index 84d286b..26ec731 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ to override the context values with configured ones. You can start an erl with: - $ erl -pa ebin -pa deps/*/ebin -s inets -s crypto -s ssl -s lhttpc -config development -s kinetic + $ erl -pa ebin -pa deps/*/ebin -s inets -s crypto -s ssl -s ehttpc -config development -s kinetic Erlang R16B03-1 (erts-5.10.4) [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace] Eshell V5.10.4 (abort with ^G) diff --git a/include/kinetic.hrl b/include/kinetic.hrl index d1ba9df..59f1f71 100644 --- a/include/kinetic.hrl +++ b/include/kinetic.hrl @@ -7,6 +7,5 @@ date :: undefined | string(), host :: undefined | string(), url :: undefined | string(), - lhttpc_opts = [] :: [any()], timeout :: undefined | pos_integer(), aws_credentials}). diff --git a/rebar.config b/rebar.config index c697423..391ffaa 100644 --- a/rebar.config +++ b/rebar.config @@ -16,10 +16,10 @@ {cover_print_enabled, true}. {deps, - [{lhttpc, "1.4.0", {pkg, nextroll_lhttpc}}, - {jiffy, "1.1.1"}, + [{jiffy, "1.1.1"}, {erliam, "1.0.1"}, - {b64fast, "0.2.3"}]}. + {b64fast, "0.2.3"}, + {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.7.1"}}}]}. {xref_checks, [undefined_function_calls, @@ -39,7 +39,7 @@ {spellcheck, [{ignore_regex, - "(eunit|~>|<-|//|=|[|]|[.]hrl|\\d[.]\\d|<<[\"]|[a-z][a-z][-][a-z]|[?][A-Z])"}, + "(eunit|~>|<-|//|=|[|]|[.]hrl|\\d[.]\\d|<<[\"]|[a-z][a-z][-][a-z]|[?][A-Z]|\/)"}, {files, ["src/*"]}, {additional_dictionaries, ["nextroll.dict", "kinesis.dict"]}]}. diff --git a/rebar.lock b/rebar.lock index 9d43cec..10385b0 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,17 +1,32 @@ {"1.2.0", [{<<"b64fast">>,{pkg,<<"b64fast">>,<<"0.2.3">>},0}, + {<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.13.0">>},2}, + {<<"ehttpc">>, + {git,"https://github.com/emqx/ehttpc", + {ref,"c18e5efacde5556bb8bf86b1bc1e35a85a67928d"}}, + 0}, {<<"erliam">>,{pkg,<<"erliam">>,<<"1.0.1">>},0}, + {<<"gproc">>, + {git,"https://github.com/emqx/gproc", + {ref,"21a5995812498969bb5e47b520b47ea7c514f16b"}}, + 1}, + {<<"gun">>,{pkg,<<"gun">>,<<"2.1.0">>},1}, {<<"jiffy">>,{pkg,<<"jiffy">>,<<"1.1.1">>},0}, - {<<"lhttpc">>,{pkg,<<"nextroll_lhttpc">>,<<"1.4.0">>},0}]}. + {<<"snabbkaffe">>, + {git,"https://github.com/kafka4beam/snabbkaffe.git", + {ref,"b59298334ed349556f63405d1353184c63c66534"}}, + 1}]}. [ {pkg_hash,[ {<<"b64fast">>, <<"07649CF971A0ED088DEFC4F75767A52E08C468CC1D448693F4FB3051092AB987">>}, + {<<"cowlib">>, <<"DB8F7505D8332D98EF50A3EF34B34C1AFDDEC7506E4EE4DD4A3A266285D282CA">>}, {<<"erliam">>, <<"20E1ECB876AFDEEC2DE07483E2D174B1E3DB38848ED981145DAB9A889E7B55F9">>}, - {<<"jiffy">>, <<"ACA10F47AA91697BF24AB9582C74E00E8E95474C7EF9F76D4F1A338D0F5DE21B">>}, - {<<"lhttpc">>, <<"45282FF22BC55E6AE751CF87AC42C261DC4FAAFADD9C034E127ECED74E672FAB">>}]}, + {<<"gun">>, <<"B4E4CBBF3026D21981C447E9E7CA856766046EFF693720BA43114D7F5DE36E87">>}, + {<<"jiffy">>, <<"ACA10F47AA91697BF24AB9582C74E00E8E95474C7EF9F76D4F1A338D0F5DE21B">>}]}, {pkg_hash_ext,[ {<<"b64fast">>, <<"DE874B5302D607840B787E93785628035FF55334EBCFAFB6413B735983913D23">>}, + {<<"cowlib">>, <<"E1E1284DC3FC030A64B1AD0D8382AE7E99DA46C3246B815318A4B848873800A4">>}, {<<"erliam">>, <<"2EE375544AC36711BEEB5EC56DB060488447CECC308763BC8B4A4FEE894AAF76">>}, - {<<"jiffy">>, <<"62E1F0581C3C19C33A725C781DFA88410D8BFF1BBAFC3885A2552286B4785C4C">>}, - {<<"lhttpc">>, <<"57BA3D5720FBD17C75D8563169394B5F6CD160161D64A8A9F96F7E829221C648">>}]} + {<<"gun">>, <<"52FC7FC246BFC3B00E01AEA1C2854C70A366348574AB50C57DFE796D24A0101D">>}, + {<<"jiffy">>, <<"62E1F0581C3C19C33A725C781DFA88410D8BFF1BBAFC3885A2552286B4785C4C">>}]} ]. diff --git a/src/kinetic.app.src b/src/kinetic.app.src index 1afe6f5..27c6b3c 100644 --- a/src/kinetic.app.src +++ b/src/kinetic.app.src @@ -4,7 +4,7 @@ {vsn, git}, {modules, []}, {registered, [kinetic_config]}, - {applications, [kernel, stdlib, inets, crypto, ssl, jiffy, lhttpc, erliam]}, + {applications, [kernel, stdlib, inets, crypto, ssl, jiffy, ehttpc, erliam]}, {env, [{args, []}]}, {mod, {kinetic, []}}, {licenses, ["BSD-3-Clause"]}]}. diff --git a/src/kinetic.erl b/src/kinetic.erl index 5431cfd..bb116d2 100644 --- a/src/kinetic.erl +++ b/src/kinetic.erl @@ -32,14 +32,21 @@ stop() -> application:stop(kinetic). start(Opts) when is_list(Opts) -> + start_pool(), kinetic_sup:start_link(Opts). -spec start(normal | {takeover, node()} | {failover, node()}, any()) -> {ok, pid()}. start(_, Opts) -> + start_pool(), kinetic_sup:start_link(Opts). -spec stop(any()) -> ok. stop(_) -> + lists:foreach(fun(Region) -> + PoolName = kinetic_utils:pool_name(Region), + ok = ehttpc_sup:stop_pool(PoolName) + end, + kinetic_utils:regions()), ok. % Public API @@ -252,9 +259,7 @@ execute(Operation, Payload, Opts) -> #kinetic_arguments{aws_credentials = AwsCreds, region = Region, date = Date, - url = Url, host = Host, - lhttpc_opts = LHttpcOpts, timeout = Timeout} = kinetic_config:merge_args(Args, Opts), case kinetic_utils:encode({Payload}) of @@ -267,23 +272,26 @@ execute(Operation, Payload, Opts) -> #{"content-type" => "application/x-amz-json-1.1", "connection" => "keep-alive"}, Headers = - awsv4:headers(AwsCreds, - #{service => "kinesis", - target_api => Target, - method => "POST", - region => Region, - host => Host, - signed_headers => SignedHeaders, - aws_date => Date}, - iolist_to_binary(Body)), - - case lhttpc:request(Url, post, Headers, Body, Timeout, LHttpcOpts) of - {ok, {{200, _}, _ResponseHeaders, ResponseBody}} -> + [{Key, iolist_to_binary(Value)} + || {Key, Value} + <- awsv4:headers(AwsCreds, + #{service => "kinesis", + target_api => Target, + method => "POST", + region => Region, + host => Host, + signed_headers => SignedHeaders, + aws_date => Date}, + iolist_to_binary(Body))], + PoolName = kinetic_utils:pool_name(Region), + Worker = ehttpc_pool:pick_worker(PoolName), + case ehttpc:request(Worker, post, {"/", Headers, Body}, Timeout) of + {ok, 200, _, ResponseBody} -> {ok, kinetic_utils:decode(ResponseBody)}; - {ok, {{Code, _}, ResponseHeaders, ResponseBody}} -> - {error, {Code, ResponseHeaders, ResponseBody}}; - {error, E} -> - {error, E} + {ok, Code, RespHeaders, ResponseBody} -> + {error, {Code, RespHeaders, ResponseBody}}; + {error, Error} -> + {error, Error} end end end. @@ -299,3 +307,17 @@ record_status(Record) -> get_value(Key, TupleList) -> {Key, Value} = lists:keyfind(Key, 1, TupleList), Value. + +start_pool() -> + PoolSize = application:get_env(?MODULE, pool_size, 100), + GunOpts = application:get_env(?MODULE, gun_opts, []), + lists:foreach(fun(Region) -> + PoolName = kinetic_utils:pool_name(Region), + Endpoint = kinetic_utils:endpoint(Region), + ehttpc_sup:start_pool(PoolName, + [{host, Endpoint}, + {port, 443}, + {pool_size, PoolSize}, + {gun_opts, GunOpts}]) + end, + kinetic_utils:regions()). diff --git a/src/kinetic_config.erl b/src/kinetic_config.erl index f9b433e..ded230c 100644 --- a/src/kinetic_config.erl +++ b/src/kinetic_config.erl @@ -85,32 +85,18 @@ handle_info(_Info, State) -> % Internal implementation -region("us-east-1" ++ _R) -> - "us-east-1"; -region("us-west-1" ++ _R) -> - "us-west-1"; -region("us-west-2" ++ _R) -> - "us-west-2"; -region("ap-northeast-1" ++ _R) -> - "ap-northeast-1"; -region("ap-southeast-1" ++ _R) -> - "ap-southeast-1"; -region("eu-west-1" ++ _R) -> - "eu-west-1". - new_args(Opts) -> Region = case proplists:get_value(region, Opts, undefined) of undefined -> {ok, Zone} = imds:zone(), - region(Zone); + kinetic_utils:region(Zone); R -> R end, - LHttpcOpts = proplists:get_value(lhttpc_opts, Opts, []), DefaultTimeout = proplists:get_value(timeout, Opts, 5000), - Host = kinetic_utils:endpoint("kinesis", Region), + Host = kinetic_utils:endpoint(Region), Url = "https://" ++ Host, %% erliam should support named profiles for using specific roles or preconfigured @@ -133,7 +119,6 @@ new_args(Opts) -> date = awsv4:isonow(), host = Host, url = Url, - lhttpc_opts = LHttpcOpts, timeout = DefaultTimeout, aws_credentials = erliam:credentials()}. @@ -143,7 +128,7 @@ new_args(Opts) -> merge_args(Args, []) -> Args; merge_args(Args, [{region, Region} | Rest]) -> - Host = kinetic_utils:endpoint("kinesis", Region), + Host = kinetic_utils:endpoint(Region), Url = "https://" ++ Host, merge_args(Args#kinetic_arguments{region = Region, host = Host, diff --git a/src/kinetic_utils.erl b/src/kinetic_utils.erl index 5e8556a..862cb0b 100644 --- a/src/kinetic_utils.erl +++ b/src/kinetic_utils.erl @@ -1,18 +1,35 @@ -module(kinetic_utils). --export([endpoint/2, decode/1, encode/1]). +-export([pool_name/1, regions/0, region/1, endpoint/1, decode/1, encode/1]). -endpoint("kinesis", "us-east-1") -> +pool_name("us-east-1" ++ _R) -> + 'kinetic_pool_us-east-1'; +pool_name("us-west-2" ++ _R) -> + 'kinetic_pool_us-west-2'; +pool_name("ap-southeast-1" ++ _R) -> + 'kinetic_pool_ap-southeast-1'; +pool_name("eu-west-1" ++ _R) -> + 'kinetic_pool_eu-west-1'. + +regions() -> + ["us-east-1", "us-west-2", "ap-southeast-1", "eu-west-1"]. + +region("us-east-1" ++ _R) -> + "us-east-1"; +region("us-west-2" ++ _R) -> + "us-west-2"; +region("ap-southeast-1" ++ _R) -> + "ap-southeast-1"; +region("eu-west-1" ++ _R) -> + "eu-west-1". + +endpoint("us-east-1") -> "kinesis.us-east-1.amazonaws.com"; -endpoint("kinesis", "us-west-1") -> - "kinesis.us-west-1.amazonaws.com"; -endpoint("kinesis", "us-west-2") -> +endpoint("us-west-2") -> "kinesis.us-west-2.amazonaws.com"; -endpoint("kinesis", "eu-west-1") -> +endpoint("eu-west-1") -> "kinesis.eu-west-1.amazonaws.com"; -endpoint("kinesis", "ap-northeast-1") -> - "kinesis.ap-northeast-1.amazonaws.com"; -endpoint("kinesis", "ap-southeast-1") -> +endpoint("ap-southeast-1") -> "kinesis.ap-southeast-1.amazonaws.com". decode(<<"">>) -> diff --git a/test/kinetic_config_tests.erl b/test/kinetic_config_tests.erl index 11470f2..999b0fc 100644 --- a/test/kinetic_config_tests.erl +++ b/test/kinetic_config_tests.erl @@ -69,17 +69,11 @@ test_passed_metadata() -> kinetic_config:start_link([{aws_access_key_id, "whatever"}, {aws_secret_access_key, "secret"}]), ?assert(ets:info(?KINETIC_STREAM) =/= undefined), - {ok, - #kinetic_arguments{aws_credentials = fake_creds, - region = "us-east-1", - lhttpc_opts = []}} = + {ok, #kinetic_arguments{aws_credentials = fake_creds, region = "us-east-1"}} = kinetic_config:get_args(), kinetic_config:update_data([{aws_access_key_id, "whatever"}, {aws_secret_access_key, "secret"}]), - {ok, - #kinetic_arguments{aws_credentials = fake_creds, - region = "us-east-1", - lhttpc_opts = []}} = + {ok, #kinetic_arguments{aws_credentials = fake_creds, region = "us-east-1"}} = kinetic_config:get_args(), kinetic_config:stop(), {error, _} = kinetic_config:get_args(), diff --git a/test/kinetic_sup_tests.erl b/test/kinetic_sup_tests.erl index ae8d0e6..e561ccf 100644 --- a/test/kinetic_sup_tests.erl +++ b/test/kinetic_sup_tests.erl @@ -25,10 +25,7 @@ test_supervisor() -> {ok, Pid} = kinetic_sup:start_link([{aws_access_key_id, "whatever"}, {aws_secret_access_key, "secret"}]), - {ok, - #kinetic_arguments{aws_credentials = fake_creds, - region = "us-east-1", - lhttpc_opts = []}} = + {ok, #kinetic_arguments{aws_credentials = fake_creds, region = "us-east-1"}} = kinetic_config:get_args(), kinetic_sup:stop(Pid), diff --git a/test/kinetic_tests.erl b/test/kinetic_tests.erl index c0e3e28..120f64d 100644 --- a/test/kinetic_tests.erl +++ b/test/kinetic_tests.erl @@ -24,13 +24,12 @@ test_arg_setup(Opts) -> {ok, _args} = kinetic_config:update_data(Opts), - meck:new(lhttpc), - meck:expect(lhttpc, + meck:new(ehttpc), + meck:expect(ehttpc_pool, pick_worker, fun(_PoolName) -> worker end), + meck:expect(ehttpc, request, - fun (_Url, post, _Headers, _Body, _Timeout, error) -> - {ok, {{400, bla}, headers, body}}; - (_Url, post, _Headers, _Body, _Timeout, _Opts) -> - {ok, {{200, bla}, headers, <<"{\"hello\": \"world\"}">>}} + fun(_Worker, post, {_Url, _Headers, _Body}, _Timeout) -> + {ok, 200, headers, <<"{\"hello\": \"world\"}">>} end). test_setup() -> @@ -38,16 +37,13 @@ test_setup() -> test_arg_setup(Opts). test_error_setup() -> - Opts = - [{aws_access_key_id, "whatever"}, - {aws_secret_access_key, "secret"}, - {lhttpc_opts, error}], + Opts = [{aws_access_key_id, "whatever"}, {aws_secret_access_key, "secret"}], test_arg_setup(Opts). test_teardown(_) -> ets:delete(?KINETIC_DATA), meck:unload(imds), - meck:unload(lhttpc), + meck:unload(ehttpc), meck:unload(erliam), application:stop(ssl). @@ -94,23 +90,6 @@ test_normal_functions() -> split_shard]). test_error_functions() -> - {ok, _args} = - kinetic_config:update_data([{aws_access_key_id, "whatever"}, - {aws_secret_access_key, "secret"}, - {lhttpc_opts, error}]), - lists:foreach(fun(F) -> - [{error, {400, headers, body}} = erlang:apply(kinetic, F, Args) - || Args <- sample_arglists([])] - end, - [create_stream, - delete_stream, - describe_stream, - get_records, - get_shard_iterator, - list_streams, - merge_shards, - put_record, - split_shard]), ets:delete_all_objects(?KINETIC_DATA), lists:foreach(fun(F) -> [{error, missing_args} = erlang:apply(kinetic, F, Args) @@ -130,18 +109,17 @@ put_records_test_() -> {setup, fun test_setup/0, fun test_teardown/1, fun test_put_records/0}. test_put_records() -> - meck:expect(lhttpc, + meck:expect(ehttpc_pool, pick_worker, fun(_PoolName) -> worker end), + meck:expect(ehttpc, request, - fun (_Url, post, _Headers, _Body, _Timeout, error) -> - {ok, {{400, bla}, headers, body}}; - (_Url, post, _Headers, _Body, _Timeout, _Opts) -> - {ok, - {{200, bla}, - headers, - <<"{\"FailedRecordCount\": 1,\n \"Records\":\n " - " [{\"SequenceNumber\": \"10\", \"ShardId\": " - "\"5\" },\n {\"ErrorCode\": \"404\", " - "\"ErrorMessage\": \"Not found\"}]}">>}} + fun(_Worker, post, {_Url, _Headers, _Body}, _Timeout) -> + {ok, + 200, + headers, + <<"{\"FailedRecordCount\": 1,\n \"Records\":\n " + " [{\"SequenceNumber\": \"10\", \"ShardId\": " + "\"5\" },\n {\"ErrorCode\": \"404\", " + "\"ErrorMessage\": \"Not found\"}]}">>} end), {ok, [Result1, Result2]} = erlang:apply(kinetic, put_records, [[]]), diff --git a/test/kinetic_utils_tests.erl b/test/kinetic_utils_tests.erl index da4cdeb..609aa99 100644 --- a/test/kinetic_utils_tests.erl +++ b/test/kinetic_utils_tests.erl @@ -19,10 +19,9 @@ test_json_encoding_decoding() -> test_endpoint() -> Service = "kinesis", - Regions = - ["us-east-1", "us-west-1", "us-west-2", "eu-west-1", "ap-northeast-1", "ap-southeast-1"], + Regions = ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"], lists:foreach(fun(Region) -> Url = Service ++ "." ++ Region ++ ".amazonaws.com", - Url = kinetic_utils:endpoint(Service, Region) + Url = kinetic_utils:endpoint(Region) end, Regions).