Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to unix sockets #153

Merged
merged 11 commits into from
Oct 16, 2021
26 changes: 19 additions & 7 deletions lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ defmodule Finch do
* `:name` - The name of your Finch instance. This field is required.

* `:pools` - A map specifying the configuration for your pools. The keys should be URLs
provided as binaries, or the atom `:default` to provide a catch-all configuration to be used
for any unspecified URLs. See "Pool Configuration Options" below for details on the possible
map values. Default value is `%{default: [size: #{@default_pool_size}, count: #{@default_pool_count}]}`.
provided as binaries, a tuple `{scheme, {:local, unix_socket}}` where `unix_socket` is the path for
the socket, or the atom `:default` to provide a catch-all configuration to be used for any
unspecified URLs. See "Pool Configuration Options" below for details on the possible map
values. Default value is `%{default: [size: #{@default_pool_size}, count: #{@default_pool_count}]}`.

### Pool Configuration Options

Expand Down Expand Up @@ -133,6 +134,9 @@ defmodule Finch do
:default ->
{:ok, destination}

{scheme, {:local, path}} when is_atom(scheme) and is_binary(path) ->
{:ok, {scheme, {:local, path}, 0}}

url when is_binary(url) ->
cast_binary_destination(url)

Expand Down Expand Up @@ -198,7 +202,7 @@ defmodule Finch do
where `body_stream` is a `Stream`. This feature is not yet supported for HTTP/2 requests.
"""
@spec build(Request.method(), Request.url(), Request.headers(), Request.body()) :: Request.t()
defdelegate build(method, url, headers \\ [], body \\ nil), to: Request
defdelegate build(method, url, headers \\ [], body \\ nil, opts \\ []), to: Request

@doc """
Streams an HTTP request and returns the accumulator.
Expand Down Expand Up @@ -227,11 +231,19 @@ defmodule Finch do
{:ok, acc} | {:error, Exception.t()}
when acc: term()
def stream(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do
%{scheme: scheme, host: host, port: port} = req
{pool, pool_mod} = PoolManager.get_pool(name, {scheme, host, port})
shp = build_shp(req)
{pool, pool_mod} = PoolManager.get_pool(name, shp)
pool_mod.request(pool, req, acc, fun, opts)
end

defp build_shp(%Request{scheme: scheme, unix_socket: unix_socket}) when is_binary(unix_socket) do
{scheme, {:local, unix_socket}, 0}
end

defp build_shp(%Request{scheme: scheme, host: host, port: port}) do
{scheme, host, port}
end

@doc """
Sends an HTTP request and returns a `Finch.Response` struct.

Expand Down Expand Up @@ -274,7 +286,7 @@ defmodule Finch do
end

def request(name, method, url, headers, body \\ nil, opts \\ []) do
IO.warn("Finch.request/6 is deprecated, use Finch.build/4 + Finch.request/3 instead")
IO.warn("Finch.request/6 is deprecated, use Finch.build/5 + Finch.request/3 instead")

build(method, url, headers, body)
|> request(name, opts)
Expand Down
25 changes: 19 additions & 6 deletions lib/finch/pool_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule Finch.PoolManager do
:versions
]

@default_conn_hostname "localhost"

def start_link(config) do
GenServer.start_link(__MODULE__, config, name: config.manager_name)
end
Expand Down Expand Up @@ -75,15 +77,15 @@ defmodule Finch.PoolManager do
end

defp pool_config(%{pools: config, default_pool_config: default}, shp) do
case Map.get(config, shp, config[:default]) do
nil -> maybe_drop_tls_options(shp, default)
config -> config
end
config
|> Map.get(shp, default)
cgerling marked this conversation as resolved.
Show resolved Hide resolved
|> maybe_drop_tls_options(shp)
|> maybe_add_hostname(shp)
end

# Drop TLS options from :conn_opts for default pools with :http scheme,
# otherwise you will get :badarg error from :gen_tcp
defp maybe_drop_tls_options({:http, _, _} = _shp, config) when is_map(config) do
defp maybe_drop_tls_options(config, {:http, _, _} = _shp) when is_map(config) do
with conn_opts when is_list(conn_opts) <- config[:conn_opts],
trns_opts when is_list(trns_opts) <- conn_opts[:transport_opts] do
trns_opts = Keyword.drop(trns_opts, @mint_tls_opts)
Expand All @@ -94,7 +96,18 @@ defmodule Finch.PoolManager do
end
end

defp maybe_drop_tls_options(_, config), do: config
defp maybe_drop_tls_options(config, _), do: config

# Hostname is required when the address is not a url (binary) so we need to specify
# a default value in case the configuration does not specify one.
defp maybe_add_hostname(config, {_scheme, {:local, _path}, _port} = _shp) when is_map(config) do
conn_opts =
config |> Map.get(:conn_opts, []) |> Keyword.put_new(:hostname, @default_conn_hostname)

Map.put(config, :conn_opts, conn_opts)
end

defp maybe_add_hostname(config, _), do: config

defp pool_mod(:http1), do: Finch.HTTP1.Pool
defp pool_mod(:http2), do: Finch.HTTP2.Pool
Expand Down
8 changes: 5 additions & 3 deletions lib/finch/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Finch.Request do
"""

@enforce_keys [:scheme, :host, :port, :method, :path, :headers, :body, :query]
defstruct [:scheme, :host, :port, :method, :path, :headers, :body, :query]
defstruct [:scheme, :host, :port, :method, :path, :headers, :body, :query, :unix_socket]

@atom_methods [
:get,
Expand Down Expand Up @@ -57,7 +57,8 @@ defmodule Finch.Request do
def request_path(%{path: path, query: query}), do: "#{path}?#{query}"

@doc false
def build(method, url, headers, body) do
def build(method, url, headers, body, opts) do
unix_socket = Keyword.get(opts, :unix_socket)
{scheme, host, port, path, query} = parse_url(url)

%Finch.Request{
Expand All @@ -68,7 +69,8 @@ defmodule Finch.Request do
path: path,
headers: headers,
body: body,
query: query
query: query,
unix_socket: unix_socket
}
end

Expand Down
38 changes: 36 additions & 2 deletions test/finch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule FinchTest do
doctest Finch

alias Finch.Response
alias Finch.MockSocketServer

setup do
{:ok, bypass: Bypass.open()}
Expand Down Expand Up @@ -75,18 +76,23 @@ defmodule FinchTest do
%{bypass: bypass} do
other_bypass = Bypass.open()
default_bypass = Bypass.open()
unix_socket = {:local, "/my/unix/socket"}

start_supervised!(
{Finch,
name: MyFinch,
pools: %{
endpoint(bypass, "/some-path") => [count: 5, size: 5],
endpoint(other_bypass, "/some-other-path") => [count: 10, size: 10]
endpoint(other_bypass, "/some-other-path") => [count: 10, size: 10],
{:http, unix_socket} => [count: 5, size: 5],
{:https, unix_socket} => [count: 10, size: 10]
}}
)

assert get_pools(MyFinch, shp(bypass)) |> length() == 5
assert get_pools(MyFinch, shp(other_bypass)) |> length() == 10
assert get_pools(MyFinch, shp({:http, unix_socket})) |> length() == 5
assert get_pools(MyFinch, shp({:https, unix_socket})) |> length() == 10

# no pool has been started for this unconfigured shp
assert get_pools(MyFinch, shp(default_bypass)) |> length() == 0
Expand Down Expand Up @@ -145,7 +151,7 @@ defmodule FinchTest do
end
end

describe "build/4" do
describe "build/5" do
test "raises if unsupported atom request method provided", %{bypass: bypass} do
assert_raise ArgumentError, ~r/got unsupported atom method :gimme/, fn ->
Finch.build(:gimme, endpoint(bypass))
Expand Down Expand Up @@ -255,6 +261,33 @@ defmodule FinchTest do
assert {:ok, %{status: 200}} = Finch.build(:get, uri) |> Finch.request(MyFinch)
end

test "successful get request to a unix socket" do
{:ok, {:local, socket_path}} = MockSocketServer.start()

start_supervised!({Finch, name: MyFinch})

assert {:ok, %Response{status: 200}} =
Finch.build(:get, "http://localhost/", [], nil, unix_socket: socket_path)
|> Finch.request(MyFinch)
end

@tag :capture_log
test "successful get request to a unix socket with tls" do
{:ok, socket_address = {:local, socket_path}} = MockSocketServer.start(ssl?: true)

start_supervised!(
{Finch,
name: MyFinch,
pools: %{
{:https, socket_address} => [conn_opts: [transport_opts: [verify: :verify_none]]]
}}
)

assert {:ok, %Response{status: 200}} =
Finch.build(:get, "https://localhost/", [], nil, unix_socket: socket_path)
|> Finch.request(MyFinch)
end

test "properly handles connection: close", %{bypass: bypass} do
start_supervised!({Finch, name: MyFinch})

Expand Down Expand Up @@ -742,6 +775,7 @@ defmodule FinchTest do
defp endpoint(%{port: port}, path \\ "/"), do: "http://localhost:#{port}#{path}"

defp shp(%{port: port}), do: {:http, "localhost", port}
defp shp({scheme, {:local, unix_socket}}), do: {scheme, {:local, unix_socket}, 0}

defp expect_any(bypass) do
Bypass.expect(bypass, fn conn -> Plug.Conn.send_resp(conn, 200, "OK") end)
Expand Down
99 changes: 99 additions & 0 deletions test/support/mock_socket_server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
defmodule Finch.MockSocketServer do
@moduledoc false

@fixtures_dir Path.expand("../fixtures", __DIR__)

@socket_opts [
active: false,
mode: :binary,
packet: :raw
]

@ssl_opts [
reuseaddr: true,
nodelay: true,
certfile: Path.join([@fixtures_dir, "selfsigned.pem"]),
keyfile: Path.join([@fixtures_dir, "selfsigned_key.pem"])
]

@http_response "HTTP/1.1 200 OK\r\n\r\n"

def start(opts \\ []) do
ssl? = Keyword.get(opts, :ssl?, false)

socket_address = build_socket_address()
delete_existing_sockets(socket_address)

{:ok, socket} = listen(socket_address, ssl?)

spawn_link(fn ->
{:ok, client} = accept(socket, ssl?)

serve(client, ssl?)
end)

{:ok, socket_address}
end

defp build_socket_address do
name = "finch_mock_socket_server.sock"
socket_path = System.tmp_dir!() |> Path.join(name)

{:local, socket_path}
end

defp delete_existing_sockets({:local, socket_path}) do
File.rm(socket_path)
end

defp listen(socket_address, false = _ssl?) do
opts = [{:ifaddr, socket_address} | @socket_opts]

:gen_tcp.listen(0, opts)
end

defp listen(socket_address, true = _ssl?) do
base_opts = @socket_opts ++ @ssl_opts
opts = [{:ifaddr, socket_address} | base_opts]

:ssl.listen(0, opts)
end

defp accept(socket, false = _ssl?) do
{:ok, _client} = :gen_tcp.accept(socket)
end

defp accept(socket, true = _ssl?) do
{:ok, client} = :ssl.transport_accept(socket)

if function_exported?(:ssl, :handshake, 1) do
{:ok, _} = apply(:ssl, :handshake, [client])
else
:ok = apply(:ssl, :ssl_accept, [client])
end

{:ok, client}
end

defp serve(client, false = ssl?) do
case :gen_tcp.recv(client, 0) do
{:ok, _data} ->
:gen_tcp.send(client, @http_response)
:gen_tcp.close(client)

_ ->
serve(client, ssl?)
end
end

defp serve(client, true = ssl?) do
case :ssl.recv(client, 0) do
{:ok, _data} ->
:ssl.send(client, @http_response)
:ssl.close(client)

_ ->
serve(client, ssl?)
end
end
end