diff --git a/config/test.exs b/config/test.exs index a43d1e9..5ad2ad5 100644 --- a/config/test.exs +++ b/config/test.exs @@ -2,9 +2,6 @@ import Config config :neo4ex, Neo4ex.Neo4jConnection, hostname: "localhost", - principal: "neo4j", - credentials: "letmein", - pool_size: 1, show_sensitive_data_on_connection_error: true config :neo4ex, Neo4ex.Connector.Socket, transport_module: Neo4ex.Connector.SocketMock diff --git a/example_app/lib/example_app.ex b/example_app/lib/example_app.ex index 2ef3b2c..d8fe6b4 100644 --- a/example_app/lib/example_app.ex +++ b/example_app/lib/example_app.ex @@ -34,17 +34,19 @@ defmodule ExampleApp do LIMIT 10 """ - %Cypher.Query{query: query, params: %{company: "Davenport Inc"}} - |> Connector.stream(fn msg, acc -> - case msg do - [%Node{properties: properties}] -> - properties = Map.new(properties, fn {k, v} -> {String.to_atom(k), v} end) - {:cont, [struct(Customer, properties) | acc]} + Connector.transaction(fn -> + %Cypher.Query{query: query, params: %{company: "Davenport Inc"}} + |> Connector.stream() + |> Enum.reduce_while([], fn msg, acc -> + case msg do + [%Node{properties: properties}] -> + properties = Map.new(properties, fn {k, v} -> {String.to_atom(k), v} end) + {:cont, [struct(Customer, properties) | acc]} - _ -> - {:halt, acc} - end + _ -> + {:halt, acc} + end + end) end) - |> Enum.to_list() end end diff --git a/lib/neo4ex/connector.ex b/lib/neo4ex/connector.ex index 90434a9..65c191e 100644 --- a/lib/neo4ex/connector.ex +++ b/lib/neo4ex/connector.ex @@ -52,51 +52,49 @@ defmodule Neo4ex.Connector do If you need more control, use `stream/2`. """ def run(%Cypher.Query{} = query, opts \\ []) do - case prepare_query(query, opts) do - {:ok, args} -> apply(DBConnection, :execute, args) - other -> other + with {:ok, args} <- prepare_query(query, opts) do + apply(DBConnection, :execute, args) end end @doc """ - Executes Cypher query on the database and returns results. - This function uses reducer to read data from stream. - Reducer may finish prematurely. In that case, remaining part of the stream will be thrown away. + Returns lazy enumerable that emits all results matching given query. + It has to be ran inside transaction to read the data: + ``` + stream = Connector.stream(query) + Connector.transaction(fn -> + Enum.to_list(stream) + end) + ``` """ - def stream(%Cypher.Query{} = query, reducer, opts \\ []) when is_function(reducer, 2) do - with( - {:ok, [conn | args]} <- prepare_query(query, opts), - {:ok, stream} <- - DBConnection.transaction(conn, fn conn -> - # we have to consume stream within transaction - DBConnection - |> apply(:stream, [conn | args]) - |> Stream.reject(&is_nil/1) - |> Enum.reduce_while([], reducer) - end) - ) do - stream - end + def stream(%Cypher.Query{} = query, opts \\ []) do + pool = connection_pool!() + %Cypher.Stream{pool: pool, query: query} + end + + def transaction(func) when is_function(func, 0) do + pool = connection_pool!() + Neo4ex.Connector.transaction(pool, [], fn _ -> func.() end) end def transaction(func) when is_function(func, 1) do - conn = connection_pool!() - DBConnection.transaction(conn, func) + pool = connection_pool!() + Neo4ex.Connector.transaction(pool, [], func) end defp prepare_query(query, opts) do - conn = connection_pool!() + pool = connection_pool!() opts = Keyword.merge([debug: @debug_queries], opts) - case DBConnection.prepare(conn, query, opts) do - {:ok, query} -> {:ok, [conn, query, opts]} + case DBConnection.prepare(pool, query, opts) do + {:ok, query} -> {:ok, [pool, query, opts]} other -> other end end defp connection_pool!() do case Supervisor.which_children(__MODULE__) do - [{_, conn, _, [DBConnection]} | _] -> conn + [{_, pool, _, [DBConnection]} | _] -> pool [] -> raise "Please add #{__MODULE__} to application Supervision tree" end end @@ -113,6 +111,37 @@ defmodule Neo4ex.Connector do end) end + @doc false + def transaction(pool, opts, callback) when is_function(callback, 1) do + checkout_or_transaction(:transaction, pool, opts, callback) + end + + @doc false + def reduce(pool, query, params, opts, acc, fun) do + case get_conn(pool) do + %DBConnection{conn_mode: :transaction} = conn -> + DBConnection + |> apply(:stream, [conn, query, params, opts]) + |> Enumerable.reduce(acc, fun) + + _ -> + raise "cannot reduce stream outside of transaction" + end + end + + @doc false + def into(pool, query, params, opts) do + case get_conn(pool) do + %DBConnection{conn_mode: :transaction} = conn -> + DBConnection + |> apply(:stream, [conn, query, params, opts]) + |> Collectable.into() + + _ -> + raise "cannot collect into stream outside of transaction" + end + end + def send_noop(%Socket{sock: sock}), do: Socket.send(sock, @noop) def send(message, %Socket{sock: sock, bolt_version: bolt_version}) do @@ -172,4 +201,42 @@ defmodule Neo4ex.Connector do raise DBConnection.ConnectionError.exception(inspect(error)) end end + + ## Connection helpers + + defp checkout_or_transaction(fun, pool, opts, callback) do + callback = fn conn -> + previous_conn = put_conn(pool, conn) + + try do + callback.(conn) + after + reset_conn(pool, previous_conn) + end + end + + apply(DBConnection, fun, [get_conn_or_pool(pool), callback, opts]) + end + + defp get_conn_or_pool(pool) do + Process.get(key(pool), pool) + end + + defp get_conn(pool) do + Process.get(key(pool)) + end + + defp put_conn(pool, conn) do + Process.put(key(pool), conn) + end + + defp reset_conn(pool, conn) do + if conn do + put_conn(pool, conn) + else + Process.delete(key(pool)) + end + end + + defp key(pool), do: {__MODULE__, pool} end diff --git a/lib/neo4ex/cypher/stream.ex b/lib/neo4ex/cypher/stream.ex new file mode 100644 index 0000000..9784931 --- /dev/null +++ b/lib/neo4ex/cypher/stream.ex @@ -0,0 +1,47 @@ +defmodule Neo4ex.Cypher.Stream do + alias Neo4ex.Cypher.Query + + defstruct pool: nil, query: nil + + defimpl Enumerable do + def count(_), do: {:error, __MODULE__} + + def member?(_, _), do: {:error, __MODULE__} + + def slice(_), do: {:error, __MODULE__} + + def reduce( + %Neo4ex.Cypher.Stream{ + pool: pool, + query: %Query{params: params, opts: opts} = query + }, + acc, + fun + ) do + Neo4ex.Connector.reduce(pool, query, params, opts, acc, fun) + end + end + + defimpl Collectable do + def into( + %Neo4ex.Cypher.Stream{ + pool: pool, + query: %Query{params: params, opts: opts} = query + } = stream + ) do + {state, fun} = Neo4ex.Connector.into(pool, query, params, opts) + {state, make_into(fun, stream)} + end + + defp make_into(fun, stream) do + fn + state, :done -> + fun.(state, :done) + stream + + state, acc -> + fun.(state, acc) + end + end + end +end diff --git a/test/neo4ex_test.exs b/test/neo4ex_test.exs index e89d447..0c40fa2 100644 --- a/test/neo4ex_test.exs +++ b/test/neo4ex_test.exs @@ -127,13 +127,15 @@ defmodule Neo4exTest do |> expect(:send, fn _, _ -> :ok end) |> expect_message(encoded_success_message) - q = %Query{} - - assert ["hello", "hello", "goodbye"] == - Neo4ex.stream(q, fn x, acc -> - # this will stop stream in the middle - if acc == x, do: {:halt, [hd(x) | acc]}, else: {:cont, acc ++ x} - end) + stream = Neo4ex.stream(%Query{}) + + Neo4ex.transaction(fn -> + assert ["hello", "hello", "goodbye"] == + Enum.reduce_while(stream, [], fn x, acc -> + # this will stop stream in the middle + if acc == x, do: {:halt, [hd(x) | acc]}, else: {:cont, acc ++ x} + end) + end) end end @@ -151,7 +153,7 @@ defmodule Neo4exTest do |> expect(:send, fn _, ^chunk -> :ok end) |> expect_message(encoded_success_message) - Neo4ex.transaction(fn _ -> + Neo4ex.transaction(fn -> :ok end) end