Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ to override the context values with configured ones.

You can start an erl with:

$ erl -pa ebin -pa deps/*/ebin -s inets -s crypto -s ssl -s lhttpc -config development -s kinetic
$ erl -pa ebin -pa deps/*/ebin -s inets -s crypto -s ssl -s ehttpc -config development -s kinetic
Erlang R16B03-1 (erts-5.10.4) [source] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Eshell V5.10.4 (abort with ^G)
Expand Down
1 change: 0 additions & 1 deletion include/kinetic.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@
date :: undefined | string(),
host :: undefined | string(),
url :: undefined | string(),
lhttpc_opts = [] :: [any()],
timeout :: undefined | pos_integer(),
aws_credentials}).
8 changes: 4 additions & 4 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
{cover_print_enabled, true}.

{deps,
[{lhttpc, "1.4.0", {pkg, nextroll_lhttpc}},
{jiffy, "1.1.1"},
[{jiffy, "1.1.1"},
{erliam, "1.0.1"},
{b64fast, "0.2.3"}]}.
{b64fast, "0.2.3"},
{ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.7.1"}}}]}.

{xref_checks,
[undefined_function_calls,
Expand All @@ -39,7 +39,7 @@

{spellcheck,
[{ignore_regex,
"(eunit|~>|<-|//|=|[|]|[.]hrl|\\d[.]\\d|<<[\"]|[a-z][a-z][-][a-z]|[?][A-Z])"},
"(eunit|~>|<-|//|=|[|]|[.]hrl|\\d[.]\\d|<<[\"]|[a-z][a-z][-][a-z]|[?][A-Z]|\/)"},
{files, ["src/*"]},
{additional_dictionaries, ["nextroll.dict", "kinesis.dict"]}]}.

Expand Down
25 changes: 20 additions & 5 deletions rebar.lock
Original file line number Diff line number Diff line change
@@ -1,17 +1,32 @@
{"1.2.0",
[{<<"b64fast">>,{pkg,<<"b64fast">>,<<"0.2.3">>},0},
{<<"cowlib">>,{pkg,<<"cowlib">>,<<"2.13.0">>},2},
{<<"ehttpc">>,
{git,"https://github.com/emqx/ehttpc",
{ref,"c18e5efacde5556bb8bf86b1bc1e35a85a67928d"}},
0},
{<<"erliam">>,{pkg,<<"erliam">>,<<"1.0.1">>},0},
{<<"gproc">>,
{git,"https://github.com/emqx/gproc",
{ref,"21a5995812498969bb5e47b520b47ea7c514f16b"}},
1},
{<<"gun">>,{pkg,<<"gun">>,<<"2.1.0">>},1},
{<<"jiffy">>,{pkg,<<"jiffy">>,<<"1.1.1">>},0},
{<<"lhttpc">>,{pkg,<<"nextroll_lhttpc">>,<<"1.4.0">>},0}]}.
{<<"snabbkaffe">>,
{git,"https://github.com/kafka4beam/snabbkaffe.git",
{ref,"b59298334ed349556f63405d1353184c63c66534"}},
1}]}.
[
{pkg_hash,[
{<<"b64fast">>, <<"07649CF971A0ED088DEFC4F75767A52E08C468CC1D448693F4FB3051092AB987">>},
{<<"cowlib">>, <<"DB8F7505D8332D98EF50A3EF34B34C1AFDDEC7506E4EE4DD4A3A266285D282CA">>},
{<<"erliam">>, <<"20E1ECB876AFDEEC2DE07483E2D174B1E3DB38848ED981145DAB9A889E7B55F9">>},
{<<"jiffy">>, <<"ACA10F47AA91697BF24AB9582C74E00E8E95474C7EF9F76D4F1A338D0F5DE21B">>},
{<<"lhttpc">>, <<"45282FF22BC55E6AE751CF87AC42C261DC4FAAFADD9C034E127ECED74E672FAB">>}]},
{<<"gun">>, <<"B4E4CBBF3026D21981C447E9E7CA856766046EFF693720BA43114D7F5DE36E87">>},
{<<"jiffy">>, <<"ACA10F47AA91697BF24AB9582C74E00E8E95474C7EF9F76D4F1A338D0F5DE21B">>}]},
{pkg_hash_ext,[
{<<"b64fast">>, <<"DE874B5302D607840B787E93785628035FF55334EBCFAFB6413B735983913D23">>},
{<<"cowlib">>, <<"E1E1284DC3FC030A64B1AD0D8382AE7E99DA46C3246B815318A4B848873800A4">>},
{<<"erliam">>, <<"2EE375544AC36711BEEB5EC56DB060488447CECC308763BC8B4A4FEE894AAF76">>},
{<<"jiffy">>, <<"62E1F0581C3C19C33A725C781DFA88410D8BFF1BBAFC3885A2552286B4785C4C">>},
{<<"lhttpc">>, <<"57BA3D5720FBD17C75D8563169394B5F6CD160161D64A8A9F96F7E829221C648">>}]}
{<<"gun">>, <<"52FC7FC246BFC3B00E01AEA1C2854C70A366348574AB50C57DFE796D24A0101D">>},
{<<"jiffy">>, <<"62E1F0581C3C19C33A725C781DFA88410D8BFF1BBAFC3885A2552286B4785C4C">>}]}
].
2 changes: 1 addition & 1 deletion src/kinetic.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{vsn, git},
{modules, []},
{registered, [kinetic_config]},
{applications, [kernel, stdlib, inets, crypto, ssl, jiffy, lhttpc, erliam]},
{applications, [kernel, stdlib, inets, crypto, ssl, jiffy, ehttpc, erliam]},
{env, [{args, []}]},
{mod, {kinetic, []}},
{licenses, ["BSD-3-Clause"]}]}.
58 changes: 40 additions & 18 deletions src/kinetic.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,21 @@ stop() ->
application:stop(kinetic).

start(Opts) when is_list(Opts) ->
start_pool(),
kinetic_sup:start_link(Opts).

-spec start(normal | {takeover, node()} | {failover, node()}, any()) -> {ok, pid()}.
start(_, Opts) ->
start_pool(),
kinetic_sup:start_link(Opts).

-spec stop(any()) -> ok.
stop(_) ->
lists:foreach(fun(Region) ->
PoolName = kinetic_utils:pool_name(Region),
ok = ehttpc_sup:stop_pool(PoolName)
end,
kinetic_utils:regions()),
ok.

% Public API
Expand Down Expand Up @@ -252,9 +259,7 @@ execute(Operation, Payload, Opts) ->
#kinetic_arguments{aws_credentials = AwsCreds,
region = Region,
date = Date,
url = Url,
host = Host,
lhttpc_opts = LHttpcOpts,
timeout = Timeout} =
kinetic_config:merge_args(Args, Opts),
case kinetic_utils:encode({Payload}) of
Expand All @@ -267,23 +272,26 @@ execute(Operation, Payload, Opts) ->
#{"content-type" => "application/x-amz-json-1.1",
"connection" => "keep-alive"},
Headers =
awsv4:headers(AwsCreds,
#{service => "kinesis",
target_api => Target,
method => "POST",
region => Region,
host => Host,
signed_headers => SignedHeaders,
aws_date => Date},
iolist_to_binary(Body)),

case lhttpc:request(Url, post, Headers, Body, Timeout, LHttpcOpts) of
{ok, {{200, _}, _ResponseHeaders, ResponseBody}} ->
[{Key, iolist_to_binary(Value)}
|| {Key, Value}
<- awsv4:headers(AwsCreds,
#{service => "kinesis",
target_api => Target,
method => "POST",
region => Region,
host => Host,
signed_headers => SignedHeaders,
aws_date => Date},
iolist_to_binary(Body))],
PoolName = kinetic_utils:pool_name(Region),
Worker = ehttpc_pool:pick_worker(PoolName),
case ehttpc:request(Worker, post, {"/", Headers, Body}, Timeout) of
{ok, 200, _, ResponseBody} ->
{ok, kinetic_utils:decode(ResponseBody)};
{ok, {{Code, _}, ResponseHeaders, ResponseBody}} ->
{error, {Code, ResponseHeaders, ResponseBody}};
{error, E} ->
{error, E}
{ok, Code, RespHeaders, ResponseBody} ->
{error, {Code, RespHeaders, ResponseBody}};
{error, Error} ->
{error, Error}
end
end
end.
Expand All @@ -299,3 +307,17 @@ record_status(Record) ->
get_value(Key, TupleList) ->
{Key, Value} = lists:keyfind(Key, 1, TupleList),
Value.

start_pool() ->
PoolSize = application:get_env(?MODULE, pool_size, 100),
GunOpts = application:get_env(?MODULE, gun_opts, []),
lists:foreach(fun(Region) ->
PoolName = kinetic_utils:pool_name(Region),
Endpoint = kinetic_utils:endpoint(Region),
ehttpc_sup:start_pool(PoolName,
[{host, Endpoint},
{port, 443},
{pool_size, PoolSize},
{gun_opts, GunOpts}])
end,
kinetic_utils:regions()).
21 changes: 3 additions & 18 deletions src/kinetic_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,32 +85,18 @@ handle_info(_Info, State) ->

% Internal implementation

region("us-east-1" ++ _R) ->
"us-east-1";
region("us-west-1" ++ _R) ->
"us-west-1";
region("us-west-2" ++ _R) ->
"us-west-2";
region("ap-northeast-1" ++ _R) ->
"ap-northeast-1";
region("ap-southeast-1" ++ _R) ->
"ap-southeast-1";
region("eu-west-1" ++ _R) ->
"eu-west-1".

new_args(Opts) ->
Region =
case proplists:get_value(region, Opts, undefined) of
undefined ->
{ok, Zone} = imds:zone(),
region(Zone);
kinetic_utils:region(Zone);
R ->
R
end,

LHttpcOpts = proplists:get_value(lhttpc_opts, Opts, []),
DefaultTimeout = proplists:get_value(timeout, Opts, 5000),
Host = kinetic_utils:endpoint("kinesis", Region),
Host = kinetic_utils:endpoint(Region),
Url = "https://" ++ Host,

%% erliam should support named profiles for using specific roles or preconfigured
Expand All @@ -133,7 +119,6 @@ new_args(Opts) ->
date = awsv4:isonow(),
host = Host,
url = Url,
lhttpc_opts = LHttpcOpts,
timeout = DefaultTimeout,
aws_credentials = erliam:credentials()}.

Expand All @@ -143,7 +128,7 @@ new_args(Opts) ->
merge_args(Args, []) ->
Args;
merge_args(Args, [{region, Region} | Rest]) ->
Host = kinetic_utils:endpoint("kinesis", Region),
Host = kinetic_utils:endpoint(Region),
Url = "https://" ++ Host,
merge_args(Args#kinetic_arguments{region = Region,
host = Host,
Expand Down
35 changes: 26 additions & 9 deletions src/kinetic_utils.erl
Original file line number Diff line number Diff line change
@@ -1,18 +1,35 @@
-module(kinetic_utils).

-export([endpoint/2, decode/1, encode/1]).
-export([pool_name/1, regions/0, region/1, endpoint/1, decode/1, encode/1]).

endpoint("kinesis", "us-east-1") ->
pool_name("us-east-1" ++ _R) ->
'kinetic_pool_us-east-1';
pool_name("us-west-2" ++ _R) ->
'kinetic_pool_us-west-2';
pool_name("ap-southeast-1" ++ _R) ->
'kinetic_pool_ap-southeast-1';
pool_name("eu-west-1" ++ _R) ->
'kinetic_pool_eu-west-1'.

regions() ->
["us-east-1", "us-west-2", "ap-southeast-1", "eu-west-1"].

region("us-east-1" ++ _R) ->
"us-east-1";
region("us-west-2" ++ _R) ->
"us-west-2";
region("ap-southeast-1" ++ _R) ->
"ap-southeast-1";
region("eu-west-1" ++ _R) ->
"eu-west-1".

endpoint("us-east-1") ->
"kinesis.us-east-1.amazonaws.com";
endpoint("kinesis", "us-west-1") ->
"kinesis.us-west-1.amazonaws.com";
endpoint("kinesis", "us-west-2") ->
endpoint("us-west-2") ->
"kinesis.us-west-2.amazonaws.com";
endpoint("kinesis", "eu-west-1") ->
endpoint("eu-west-1") ->
"kinesis.eu-west-1.amazonaws.com";
endpoint("kinesis", "ap-northeast-1") ->
"kinesis.ap-northeast-1.amazonaws.com";
endpoint("kinesis", "ap-southeast-1") ->
endpoint("ap-southeast-1") ->
"kinesis.ap-southeast-1.amazonaws.com".

decode(<<"">>) ->
Expand Down
10 changes: 2 additions & 8 deletions test/kinetic_config_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,11 @@ test_passed_metadata() ->
kinetic_config:start_link([{aws_access_key_id, "whatever"},
{aws_secret_access_key, "secret"}]),
?assert(ets:info(?KINETIC_STREAM) =/= undefined),
{ok,
#kinetic_arguments{aws_credentials = fake_creds,
region = "us-east-1",
lhttpc_opts = []}} =
{ok, #kinetic_arguments{aws_credentials = fake_creds, region = "us-east-1"}} =
kinetic_config:get_args(),
kinetic_config:update_data([{aws_access_key_id, "whatever"},
{aws_secret_access_key, "secret"}]),
{ok,
#kinetic_arguments{aws_credentials = fake_creds,
region = "us-east-1",
lhttpc_opts = []}} =
{ok, #kinetic_arguments{aws_credentials = fake_creds, region = "us-east-1"}} =
kinetic_config:get_args(),
kinetic_config:stop(),
{error, _} = kinetic_config:get_args(),
Expand Down
5 changes: 1 addition & 4 deletions test/kinetic_sup_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ test_supervisor() ->
{ok, Pid} =
kinetic_sup:start_link([{aws_access_key_id, "whatever"},
{aws_secret_access_key, "secret"}]),
{ok,
#kinetic_arguments{aws_credentials = fake_creds,
region = "us-east-1",
lhttpc_opts = []}} =
{ok, #kinetic_arguments{aws_credentials = fake_creds, region = "us-east-1"}} =
kinetic_config:get_args(),

kinetic_sup:stop(Pid),
Expand Down
56 changes: 17 additions & 39 deletions test/kinetic_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,26 @@ test_arg_setup(Opts) ->

{ok, _args} = kinetic_config:update_data(Opts),

meck:new(lhttpc),
meck:expect(lhttpc,
meck:new(ehttpc),
meck:expect(ehttpc_pool, pick_worker, fun(_PoolName) -> worker end),
meck:expect(ehttpc,
request,
fun (_Url, post, _Headers, _Body, _Timeout, error) ->
{ok, {{400, bla}, headers, body}};
(_Url, post, _Headers, _Body, _Timeout, _Opts) ->
{ok, {{200, bla}, headers, <<"{\"hello\": \"world\"}">>}}
fun(_Worker, post, {_Url, _Headers, _Body}, _Timeout) ->
{ok, 200, headers, <<"{\"hello\": \"world\"}">>}
end).

test_setup() ->
Opts = [{aws_access_key_id, "whatever"}, {aws_secret_access_key, "secret"}],
test_arg_setup(Opts).

test_error_setup() ->
Opts =
[{aws_access_key_id, "whatever"},
{aws_secret_access_key, "secret"},
{lhttpc_opts, error}],
Opts = [{aws_access_key_id, "whatever"}, {aws_secret_access_key, "secret"}],
test_arg_setup(Opts).

test_teardown(_) ->
ets:delete(?KINETIC_DATA),
meck:unload(imds),
meck:unload(lhttpc),
meck:unload(ehttpc),
meck:unload(erliam),
application:stop(ssl).

Expand Down Expand Up @@ -94,23 +90,6 @@ test_normal_functions() ->
split_shard]).

test_error_functions() ->
{ok, _args} =
kinetic_config:update_data([{aws_access_key_id, "whatever"},
{aws_secret_access_key, "secret"},
{lhttpc_opts, error}]),
lists:foreach(fun(F) ->
[{error, {400, headers, body}} = erlang:apply(kinetic, F, Args)
|| Args <- sample_arglists([])]
end,
[create_stream,
delete_stream,
describe_stream,
get_records,
get_shard_iterator,
list_streams,
merge_shards,
put_record,
split_shard]),
ets:delete_all_objects(?KINETIC_DATA),
lists:foreach(fun(F) ->
[{error, missing_args} = erlang:apply(kinetic, F, Args)
Expand All @@ -130,18 +109,17 @@ put_records_test_() ->
{setup, fun test_setup/0, fun test_teardown/1, fun test_put_records/0}.

test_put_records() ->
meck:expect(lhttpc,
meck:expect(ehttpc_pool, pick_worker, fun(_PoolName) -> worker end),
meck:expect(ehttpc,
request,
fun (_Url, post, _Headers, _Body, _Timeout, error) ->
{ok, {{400, bla}, headers, body}};
(_Url, post, _Headers, _Body, _Timeout, _Opts) ->
{ok,
{{200, bla},
headers,
<<"{\"FailedRecordCount\": 1,\n \"Records\":\n "
" [{\"SequenceNumber\": \"10\", \"ShardId\": "
"\"5\" },\n {\"ErrorCode\": \"404\", "
"\"ErrorMessage\": \"Not found\"}]}">>}}
fun(_Worker, post, {_Url, _Headers, _Body}, _Timeout) ->
{ok,
200,
headers,
<<"{\"FailedRecordCount\": 1,\n \"Records\":\n "
" [{\"SequenceNumber\": \"10\", \"ShardId\": "
"\"5\" },\n {\"ErrorCode\": \"404\", "
"\"ErrorMessage\": \"Not found\"}]}">>}
end),

{ok, [Result1, Result2]} = erlang:apply(kinetic, put_records, [[]]),
Expand Down
Loading