Skip to content

Commit

Permalink
Make streams work similar to Ecto
Browse files Browse the repository at this point in the history
  • Loading branch information
cichacz committed Dec 8, 2023
1 parent 5011205 commit cf5d1a9
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 47 deletions.
3 changes: 0 additions & 3 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ import Config

config :neo4ex, Neo4ex.Neo4jConnection,
hostname: "localhost",
principal: "neo4j",
credentials: "letmein",
pool_size: 1,
show_sensitive_data_on_connection_error: true

config :neo4ex, Neo4ex.Connector.Socket, transport_module: Neo4ex.Connector.SocketMock
22 changes: 12 additions & 10 deletions example_app/lib/example_app.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,19 @@ defmodule ExampleApp do
LIMIT 10
"""

%Cypher.Query{query: query, params: %{company: "Davenport Inc"}}
|> Connector.stream(fn msg, acc ->
case msg do
[%Node{properties: properties}] ->
properties = Map.new(properties, fn {k, v} -> {String.to_atom(k), v} end)
{:cont, [struct(Customer, properties) | acc]}
Connector.transaction(fn ->
%Cypher.Query{query: query, params: %{company: "Davenport Inc"}}
|> Connector.stream()
|> Enum.reduce_while([], fn msg, acc ->
case msg do
[%Node{properties: properties}] ->
properties = Map.new(properties, fn {k, v} -> {String.to_atom(k), v} end)
{:cont, [struct(Customer, properties) | acc]}

_ ->
{:halt, acc}
end
_ ->
{:halt, acc}
end
end)
end)
|> Enum.to_list()
end
end
119 changes: 93 additions & 26 deletions lib/neo4ex/connector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,51 +52,49 @@ defmodule Neo4ex.Connector do
If you need more control, use `stream/2`.
"""
def run(%Cypher.Query{} = query, opts \\ []) do
case prepare_query(query, opts) do
{:ok, args} -> apply(DBConnection, :execute, args)
other -> other
with {:ok, args} <- prepare_query(query, opts) do
apply(DBConnection, :execute, args)
end
end

@doc """
Executes Cypher query on the database and returns results.
This function uses reducer to read data from stream.
Reducer may finish prematurely. In that case, remaining part of the stream will be thrown away.
Returns lazy enumerable that emits all results matching given query.
It has to be ran inside transaction to read the data:
```
stream = Connector.stream(query)
Connector.transaction(fn ->
Enum.to_list(stream)
end)
```
"""
def stream(%Cypher.Query{} = query, reducer, opts \\ []) when is_function(reducer, 2) do
with(
{:ok, [conn | args]} <- prepare_query(query, opts),
{:ok, stream} <-
DBConnection.transaction(conn, fn conn ->
# we have to consume stream within transaction
DBConnection
|> apply(:stream, [conn | args])
|> Stream.reject(&is_nil/1)
|> Enum.reduce_while([], reducer)
end)
) do
stream
end
def stream(%Cypher.Query{} = query, opts \\ []) do
pool = connection_pool!()
%Cypher.Stream{pool: pool, query: query}
end

def transaction(func) when is_function(func, 0) do
pool = connection_pool!()
Neo4ex.Connector.transaction(pool, [], fn _ -> func.() end)
end

def transaction(func) when is_function(func, 1) do
conn = connection_pool!()
DBConnection.transaction(conn, func)
pool = connection_pool!()
Neo4ex.Connector.transaction(pool, [], func)
end

defp prepare_query(query, opts) do
conn = connection_pool!()
pool = connection_pool!()
opts = Keyword.merge([debug: @debug_queries], opts)

case DBConnection.prepare(conn, query, opts) do
{:ok, query} -> {:ok, [conn, query, opts]}
case DBConnection.prepare(pool, query, opts) do
{:ok, query} -> {:ok, [pool, query, opts]}
other -> other
end
end

defp connection_pool!() do
case Supervisor.which_children(__MODULE__) do
[{_, conn, _, [DBConnection]} | _] -> conn
[{_, pool, _, [DBConnection]} | _] -> pool
[] -> raise "Please add #{__MODULE__} to application Supervision tree"
end
end
Expand All @@ -113,6 +111,37 @@ defmodule Neo4ex.Connector do
end)
end

@doc false
def transaction(pool, opts, callback) when is_function(callback, 1) do
checkout_or_transaction(:transaction, pool, opts, callback)
end

@doc false
def reduce(pool, query, params, opts, acc, fun) do
case get_conn(pool) do
%DBConnection{conn_mode: :transaction} = conn ->
DBConnection
|> apply(:stream, [conn, query, params, opts])
|> Enumerable.reduce(acc, fun)

_ ->
raise "cannot reduce stream outside of transaction"
end
end

@doc false
def into(pool, query, params, opts) do
case get_conn(pool) do
%DBConnection{conn_mode: :transaction} = conn ->
DBConnection
|> apply(:stream, [conn, query, params, opts])
|> Collectable.into()

_ ->
raise "cannot collect into stream outside of transaction"
end
end

def send_noop(%Socket{sock: sock}), do: Socket.send(sock, @noop)

def send(message, %Socket{sock: sock, bolt_version: bolt_version}) do
Expand Down Expand Up @@ -172,4 +201,42 @@ defmodule Neo4ex.Connector do
raise DBConnection.ConnectionError.exception(inspect(error))
end
end

## Connection helpers

defp checkout_or_transaction(fun, pool, opts, callback) do
callback = fn conn ->
previous_conn = put_conn(pool, conn)

try do
callback.(conn)
after
reset_conn(pool, previous_conn)
end
end

apply(DBConnection, fun, [get_conn_or_pool(pool), callback, opts])
end

defp get_conn_or_pool(pool) do
Process.get(key(pool), pool)
end

defp get_conn(pool) do
Process.get(key(pool))
end

defp put_conn(pool, conn) do
Process.put(key(pool), conn)
end

defp reset_conn(pool, conn) do
if conn do
put_conn(pool, conn)
else
Process.delete(key(pool))
end
end

defp key(pool), do: {__MODULE__, pool}
end
47 changes: 47 additions & 0 deletions lib/neo4ex/cypher/stream.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
defmodule Neo4ex.Cypher.Stream do

Check warning on line 1 in lib/neo4ex/cypher/stream.ex

View workflow job for this annotation

GitHub Actions / Build and test (1.14.3, 25.2)

Modules should have a @moduledoc tag.
alias Neo4ex.Cypher.Query

defstruct pool: nil, query: nil

defimpl Enumerable do
def count(_), do: {:error, __MODULE__}

def member?(_, _), do: {:error, __MODULE__}

def slice(_), do: {:error, __MODULE__}

def reduce(
%Neo4ex.Cypher.Stream{
pool: pool,
query: %Query{params: params, opts: opts} = query
},
acc,
fun
) do
Neo4ex.Connector.reduce(pool, query, params, opts, acc, fun)
end
end

defimpl Collectable do
def into(
%Neo4ex.Cypher.Stream{
pool: pool,
query: %Query{params: params, opts: opts} = query
} = stream
) do
{state, fun} = Neo4ex.Connector.into(pool, query, params, opts)
{state, make_into(fun, stream)}
end

defp make_into(fun, stream) do
fn
state, :done ->
fun.(state, :done)
stream

state, acc ->
fun.(state, acc)
end
end
end
end
18 changes: 10 additions & 8 deletions test/neo4ex_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,15 @@ defmodule Neo4exTest do
|> expect(:send, fn _, _ -> :ok end)
|> expect_message(encoded_success_message)

q = %Query{}

assert ["hello", "hello", "goodbye"] ==
Neo4ex.stream(q, fn x, acc ->
# this will stop stream in the middle
if acc == x, do: {:halt, [hd(x) | acc]}, else: {:cont, acc ++ x}
end)
stream = Neo4ex.stream(%Query{})

Neo4ex.transaction(fn ->
assert ["hello", "hello", "goodbye"] ==
Enum.reduce_while(stream, [], fn x, acc ->
# this will stop stream in the middle
if acc == x, do: {:halt, [hd(x) | acc]}, else: {:cont, acc ++ x}
end)
end)
end
end

Expand All @@ -151,7 +153,7 @@ defmodule Neo4exTest do
|> expect(:send, fn _, ^chunk -> :ok end)
|> expect_message(encoded_success_message)

Neo4ex.transaction(fn _ ->
Neo4ex.transaction(fn ->
:ok
end)
end
Expand Down

0 comments on commit cf5d1a9

Please sign in to comment.