Skip to content

Commit

Permalink
Merge pull request #153 from cgerling/add-support-to-unix-sockets
Browse files Browse the repository at this point in the history
Add support to unix sockets
  • Loading branch information
sneako authored Oct 16, 2021
2 parents da2adfc + 976f6f8 commit ebbe8f0
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 18 deletions.
26 changes: 19 additions & 7 deletions lib/finch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ defmodule Finch do
* `:name` - The name of your Finch instance. This field is required.
* `:pools` - A map specifying the configuration for your pools. The keys should be URLs
provided as binaries, or the atom `:default` to provide a catch-all configuration to be used
for any unspecified URLs. See "Pool Configuration Options" below for details on the possible
map values. Default value is `%{default: [size: #{@default_pool_size}, count: #{@default_pool_count}]}`.
provided as binaries, a tuple `{scheme, {:local, unix_socket}}` where `unix_socket` is the path for
the socket, or the atom `:default` to provide a catch-all configuration to be used for any
unspecified URLs. See "Pool Configuration Options" below for details on the possible map
values. Default value is `%{default: [size: #{@default_pool_size}, count: #{@default_pool_count}]}`.
### Pool Configuration Options
Expand Down Expand Up @@ -133,6 +134,9 @@ defmodule Finch do
:default ->
{:ok, destination}

{scheme, {:local, path}} when is_atom(scheme) and is_binary(path) ->
{:ok, {scheme, {:local, path}, 0}}

url when is_binary(url) ->
cast_binary_destination(url)

Expand Down Expand Up @@ -198,7 +202,7 @@ defmodule Finch do
where `body_stream` is a `Stream`. This feature is not yet supported for HTTP/2 requests.
"""
@spec build(Request.method(), Request.url(), Request.headers(), Request.body()) :: Request.t()
defdelegate build(method, url, headers \\ [], body \\ nil), to: Request
defdelegate build(method, url, headers \\ [], body \\ nil, opts \\ []), to: Request

@doc """
Streams an HTTP request and returns the accumulator.
Expand Down Expand Up @@ -227,11 +231,19 @@ defmodule Finch do
{:ok, acc} | {:error, Exception.t()}
when acc: term()
def stream(%Request{} = req, name, acc, fun, opts \\ []) when is_function(fun, 2) do
%{scheme: scheme, host: host, port: port} = req
{pool, pool_mod} = PoolManager.get_pool(name, {scheme, host, port})
shp = build_shp(req)
{pool, pool_mod} = PoolManager.get_pool(name, shp)
pool_mod.request(pool, req, acc, fun, opts)
end

defp build_shp(%Request{scheme: scheme, unix_socket: unix_socket}) when is_binary(unix_socket) do
{scheme, {:local, unix_socket}, 0}
end

defp build_shp(%Request{scheme: scheme, host: host, port: port}) do
{scheme, host, port}
end

@doc """
Sends an HTTP request and returns a `Finch.Response` struct.
Expand Down Expand Up @@ -274,7 +286,7 @@ defmodule Finch do
end

def request(name, method, url, headers, body \\ nil, opts \\ []) do
IO.warn("Finch.request/6 is deprecated, use Finch.build/4 + Finch.request/3 instead")
IO.warn("Finch.request/6 is deprecated, use Finch.build/5 + Finch.request/3 instead")

build(method, url, headers, body)
|> request(name, opts)
Expand Down
25 changes: 19 additions & 6 deletions lib/finch/pool_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule Finch.PoolManager do
:versions
]

@default_conn_hostname "localhost"

def start_link(config) do
GenServer.start_link(__MODULE__, config, name: config.manager_name)
end
Expand Down Expand Up @@ -75,15 +77,15 @@ defmodule Finch.PoolManager do
end

defp pool_config(%{pools: config, default_pool_config: default}, shp) do
case Map.get(config, shp, config[:default]) do
nil -> maybe_drop_tls_options(shp, default)
config -> config
end
config
|> Map.get(shp, default)
|> maybe_drop_tls_options(shp)
|> maybe_add_hostname(shp)
end

# Drop TLS options from :conn_opts for default pools with :http scheme,
# otherwise you will get :badarg error from :gen_tcp
defp maybe_drop_tls_options({:http, _, _} = _shp, config) when is_map(config) do
defp maybe_drop_tls_options(config, {:http, _, _} = _shp) when is_map(config) do
with conn_opts when is_list(conn_opts) <- config[:conn_opts],
trns_opts when is_list(trns_opts) <- conn_opts[:transport_opts] do
trns_opts = Keyword.drop(trns_opts, @mint_tls_opts)
Expand All @@ -94,7 +96,18 @@ defmodule Finch.PoolManager do
end
end

defp maybe_drop_tls_options(_, config), do: config
defp maybe_drop_tls_options(config, _), do: config

# Hostname is required when the address is not a url (binary) so we need to specify
# a default value in case the configuration does not specify one.
defp maybe_add_hostname(config, {_scheme, {:local, _path}, _port} = _shp) when is_map(config) do
conn_opts =
config |> Map.get(:conn_opts, []) |> Keyword.put_new(:hostname, @default_conn_hostname)

Map.put(config, :conn_opts, conn_opts)
end

defp maybe_add_hostname(config, _), do: config

defp pool_mod(:http1), do: Finch.HTTP1.Pool
defp pool_mod(:http2), do: Finch.HTTP2.Pool
Expand Down
8 changes: 5 additions & 3 deletions lib/finch/request.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Finch.Request do
"""

@enforce_keys [:scheme, :host, :port, :method, :path, :headers, :body, :query]
defstruct [:scheme, :host, :port, :method, :path, :headers, :body, :query]
defstruct [:scheme, :host, :port, :method, :path, :headers, :body, :query, :unix_socket]

@atom_methods [
:get,
Expand Down Expand Up @@ -57,7 +57,8 @@ defmodule Finch.Request do
def request_path(%{path: path, query: query}), do: "#{path}?#{query}"

@doc false
def build(method, url, headers, body) do
def build(method, url, headers, body, opts) do
unix_socket = Keyword.get(opts, :unix_socket)
{scheme, host, port, path, query} = parse_url(url)

%Finch.Request{
Expand All @@ -68,7 +69,8 @@ defmodule Finch.Request do
path: path,
headers: headers,
body: body,
query: query
query: query,
unix_socket: unix_socket
}
end

Expand Down
38 changes: 36 additions & 2 deletions test/finch_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule FinchTest do
doctest Finch

alias Finch.Response
alias Finch.MockSocketServer

setup do
{:ok, bypass: Bypass.open()}
Expand Down Expand Up @@ -75,18 +76,23 @@ defmodule FinchTest do
%{bypass: bypass} do
other_bypass = Bypass.open()
default_bypass = Bypass.open()
unix_socket = {:local, "/my/unix/socket"}

start_supervised!(
{Finch,
name: MyFinch,
pools: %{
endpoint(bypass, "/some-path") => [count: 5, size: 5],
endpoint(other_bypass, "/some-other-path") => [count: 10, size: 10]
endpoint(other_bypass, "/some-other-path") => [count: 10, size: 10],
{:http, unix_socket} => [count: 5, size: 5],
{:https, unix_socket} => [count: 10, size: 10]
}}
)

assert get_pools(MyFinch, shp(bypass)) |> length() == 5
assert get_pools(MyFinch, shp(other_bypass)) |> length() == 10
assert get_pools(MyFinch, shp({:http, unix_socket})) |> length() == 5
assert get_pools(MyFinch, shp({:https, unix_socket})) |> length() == 10

# no pool has been started for this unconfigured shp
assert get_pools(MyFinch, shp(default_bypass)) |> length() == 0
Expand Down Expand Up @@ -145,7 +151,7 @@ defmodule FinchTest do
end
end

describe "build/4" do
describe "build/5" do
test "raises if unsupported atom request method provided", %{bypass: bypass} do
assert_raise ArgumentError, ~r/got unsupported atom method :gimme/, fn ->
Finch.build(:gimme, endpoint(bypass))
Expand Down Expand Up @@ -255,6 +261,33 @@ defmodule FinchTest do
assert {:ok, %{status: 200}} = Finch.build(:get, uri) |> Finch.request(MyFinch)
end

test "successful get request to a unix socket" do
{:ok, {:local, socket_path}} = MockSocketServer.start()

start_supervised!({Finch, name: MyFinch})

assert {:ok, %Response{status: 200}} =
Finch.build(:get, "http://localhost/", [], nil, unix_socket: socket_path)
|> Finch.request(MyFinch)
end

@tag :capture_log
test "successful get request to a unix socket with tls" do
{:ok, socket_address = {:local, socket_path}} = MockSocketServer.start(ssl?: true)

start_supervised!(
{Finch,
name: MyFinch,
pools: %{
{:https, socket_address} => [conn_opts: [transport_opts: [verify: :verify_none]]]
}}
)

assert {:ok, %Response{status: 200}} =
Finch.build(:get, "https://localhost/", [], nil, unix_socket: socket_path)
|> Finch.request(MyFinch)
end

test "properly handles connection: close", %{bypass: bypass} do
start_supervised!({Finch, name: MyFinch})

Expand Down Expand Up @@ -742,6 +775,7 @@ defmodule FinchTest do
defp endpoint(%{port: port}, path \\ "/"), do: "http://localhost:#{port}#{path}"

defp shp(%{port: port}), do: {:http, "localhost", port}
defp shp({scheme, {:local, unix_socket}}), do: {scheme, {:local, unix_socket}, 0}

defp expect_any(bypass) do
Bypass.expect(bypass, fn conn -> Plug.Conn.send_resp(conn, 200, "OK") end)
Expand Down
99 changes: 99 additions & 0 deletions test/support/mock_socket_server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
defmodule Finch.MockSocketServer do
@moduledoc false

@fixtures_dir Path.expand("../fixtures", __DIR__)

@socket_opts [
active: false,
mode: :binary,
packet: :raw
]

@ssl_opts [
reuseaddr: true,
nodelay: true,
certfile: Path.join([@fixtures_dir, "selfsigned.pem"]),
keyfile: Path.join([@fixtures_dir, "selfsigned_key.pem"])
]

@http_response "HTTP/1.1 200 OK\r\n\r\n"

def start(opts \\ []) do
ssl? = Keyword.get(opts, :ssl?, false)

socket_address = build_socket_address()
delete_existing_sockets(socket_address)

{:ok, socket} = listen(socket_address, ssl?)

spawn_link(fn ->
{:ok, client} = accept(socket, ssl?)

serve(client, ssl?)
end)

{:ok, socket_address}
end

defp build_socket_address do
name = "finch_mock_socket_server.sock"
socket_path = System.tmp_dir!() |> Path.join(name)

{:local, socket_path}
end

defp delete_existing_sockets({:local, socket_path}) do
File.rm(socket_path)
end

defp listen(socket_address, false = _ssl?) do
opts = [{:ifaddr, socket_address} | @socket_opts]

:gen_tcp.listen(0, opts)
end

defp listen(socket_address, true = _ssl?) do
base_opts = @socket_opts ++ @ssl_opts
opts = [{:ifaddr, socket_address} | base_opts]

:ssl.listen(0, opts)
end

defp accept(socket, false = _ssl?) do
{:ok, _client} = :gen_tcp.accept(socket)
end

defp accept(socket, true = _ssl?) do
{:ok, client} = :ssl.transport_accept(socket)

if function_exported?(:ssl, :handshake, 1) do
{:ok, _} = apply(:ssl, :handshake, [client])
else
:ok = apply(:ssl, :ssl_accept, [client])
end

{:ok, client}
end

defp serve(client, false = ssl?) do
case :gen_tcp.recv(client, 0) do
{:ok, _data} ->
:gen_tcp.send(client, @http_response)
:gen_tcp.close(client)

_ ->
serve(client, ssl?)
end
end

defp serve(client, true = ssl?) do
case :ssl.recv(client, 0) do
{:ok, _data} ->
:ssl.send(client, @http_response)
:ssl.close(client)

_ ->
serve(client, ssl?)
end
end
end

0 comments on commit ebbe8f0

Please sign in to comment.