Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ability to perform octet streaming #1012

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion lib/ex_aws.ex
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,13 @@ defmodule ExAws do
@impl ExAws.Behaviour
@spec stream!(ExAws.Operation.t(), keyword) :: Enumerable.t()
def stream!(op, config_overrides \\ []) do
ExAws.Operation.stream!(op, ExAws.Config.new(op.service, config_overrides))
case ExAws.Operation.stream!(op, ExAws.Config.new(op.service, config_overrides)) do
{:ok, result} ->
result

result ->
result
end
end

@doc false
Expand Down
4 changes: 2 additions & 2 deletions lib/ex_aws/behaviour.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ defmodule ExAws.Behaviour do
@callback request!(ExAws.Operation.t(), Keyword.t()) :: term | no_return

@doc "See `ExAws.stream!/2`."
@callback stream!(ExAws.Operation.t()) :: Enumerable.t()
@callback stream!(ExAws.Operation.t()) :: Enumerable.t() | {:ok, term} | {:error, term}

@doc "See `ExAws.stream!/2`."
@callback stream!(ExAws.Operation.t(), Keyword.t()) :: Enumerable.t()
@callback stream!(ExAws.Operation.t(), Keyword.t()) :: Enumerable.t() | {:ok, term} | {:error, term}
end
26 changes: 21 additions & 5 deletions lib/ex_aws/operation/s3.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,26 @@ defmodule ExAws.Operation.S3 do

defimpl ExAws.Operation do
def perform(operation, config) do
{operation, config, url, body, headers, http_method} =
build_request_params(operation, config)

ExAws.Request.request(http_method, url, body, headers, config, operation.service)
|> ExAws.Request.default_aws_error()
|> operation.parser.()
end

def stream!(%{stream_builder: :octet_stream} = operation, config) do
{operation, config, url, body, headers, http_method} =
build_request_params(operation, config)

ExAws.Request.request(http_method, url, body, headers, config, operation.service, true)
|> ExAws.Request.default_aws_error()
|> operation.parser.()
end

def stream!(%{stream_builder: fun}, config), do: fun.(config)

def build_request_params(operation, config) do
body = operation.body
headers = operation.headers
http_method = operation.http_method
Expand All @@ -37,13 +57,9 @@ defmodule ExAws.Operation.S3 do
|> put_content_length_header(body, http_method)
|> Map.to_list()

ExAws.Request.request(http_method, url, body, headers, config, operation.service)
|> ExAws.Request.default_aws_error()
|> operation.parser.()
{operation, config, url, body, headers, http_method}
end

def stream!(%{stream_builder: fun}, config), do: fun.(config)

defp put_content_length_header(headers, "", :get), do: headers

defp put_content_length_header(headers, body, _) do
Expand Down
44 changes: 35 additions & 9 deletions lib/ex_aws/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,39 @@ defmodule ExAws.Request do
@type error_t :: {:error, {:http_error, http_status, binary}}
@type response_t :: success_t | error_t

def request(http_method, url, data, headers, config, service) do
def request(http_method, url, data, headers, config, service, stream \\ false) do
body =
case data do
[] -> "{}"
d when is_binary(d) -> d
_ -> config[:json_codec].encode!(data)
end

request_and_retry(http_method, url, service, config, headers, body, {:attempt, 1})
request_and_retry(http_method, url, service, config, headers, body, stream, {:attempt, 1})
end

def request_and_retry(_method, _url, _service, _config, _headers, _req_body, {:error, reason}),
do: {:error, reason}

def request_and_retry(method, url, service, config, headers, req_body, {:attempt, attempt}) do
def request_and_retry(
_method,
_url,
_service,
_config,
_headers,
_req_body,
_stream,
{:error, reason}
),
do: {:error, reason}

def request_and_retry(
method,
url,
service,
config,
headers,
req_body,
stream,
{:attempt, attempt}
) do
full_headers = ExAws.Auth.headers(method, url, service, config, headers, req_body)

with {:ok, full_headers} <- full_headers do
Expand All @@ -35,7 +53,7 @@ defmodule ExAws.Request do
)
end

case do_request(config, method, safe_url, req_body, full_headers, attempt, service) do
case do_request(config, method, safe_url, req_body, full_headers, attempt, service, stream) do
{:ok, %{status_code: status} = resp} when status in 200..299 or status == 304 ->
{:ok, resp}

Expand All @@ -53,6 +71,7 @@ defmodule ExAws.Request do
config,
headers,
req_body,
stream,
attempt_again?(attempt, reason, config)
)

Expand All @@ -71,6 +90,7 @@ defmodule ExAws.Request do
config,
headers,
req_body,
stream,
attempt_again?(attempt, reason, config)
)

Expand All @@ -86,13 +106,14 @@ defmodule ExAws.Request do
config,
headers,
req_body,
stream,
attempt_again?(attempt, reason, config)
)
end
end
end

defp do_request(config, method, safe_url, req_body, full_headers, attempt, service) do
defp do_request(config, method, safe_url, req_body, full_headers, attempt, service, stream) do
telemetry_event = Map.get(config, :telemetry_event, [:ex_aws, :request])
telemetry_options = Map.get(config, :telemetry_options, [])

Expand All @@ -111,7 +132,8 @@ defmodule ExAws.Request do
safe_url,
req_body,
full_headers,
Map.get(config, :http_opts, [])
Map.get(config, :http_opts, []),
stream
)
|> maybe_transform_response()

Expand Down Expand Up @@ -211,6 +233,10 @@ defmodule ExAws.Request do
|> :timer.sleep()
end

def maybe_transform_response({:ok, %{status: status, stream: stream, headers: headers}}) do
{:ok, %{status_code: status, stream: stream, headers: headers}}
end

def maybe_transform_response({:ok, %{status: status, body: body, headers: headers}}) do
# Req and Finch use status (rather than status_code) as a key.
{:ok, %{status_code: status, body: body, headers: headers}}
Expand Down
39 changes: 36 additions & 3 deletions lib/ex_aws/request/hackney.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,52 @@ defmodule ExAws.Request.Hackney do

@default_opts [recv_timeout: 30_000]

def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do
opts = Application.get_env(:ex_aws, :hackney_opts, @default_opts)
opts = http_opts ++ [:with_body | opts]
def request(method, url, body \\ "", headers \\ [], http_opts \\ [], stream \\ false) do
opts = Application.get_env(:ex_aws, :hackney_opts, @default_opts) ++ http_opts

opts =
if stream do
opts
else
[:with_body | opts]
end

case :hackney.request(method, url, headers, body, opts) do
{:ok, status, headers} ->
{:ok, %{status_code: status, headers: headers}}

{:ok, status, headers, client} when is_reference(client) ->
stream =
Stream.resource(
fn -> client end,
&continue_stream/1,
&finish_stream/1
)

{:ok, %{status_code: status, headers: headers, stream: stream}}

{:ok, status, headers, body} ->
{:ok, %{status_code: status, headers: headers, body: body}}

{:error, reason} ->
{:error, %{reason: reason}}
end
end

defp continue_stream(client) do
case :hackney.stream_body(client) do
{:ok, data} ->
{[data], client}

:done ->
{:halt, client}

{:error, reason} ->
raise reason
end
end

defp finish_stream(client) do
:hackney.close(client)
end
end
1 change: 1 addition & 0 deletions lib/ex_aws/request/http_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,6 @@ defmodule ExAws.Request.HttpClient do
) ::
{:ok, %{status_code: pos_integer, headers: any}}
| {:ok, %{status_code: pos_integer, headers: any, body: binary}}
| {:ok, %{status_code: pos_integer, headers: any, stream: Enumerable.t()}}
| {:error, %{reason: any}}
end
Loading