Skip to content

Commit

Permalink
Added transaction management functions
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexR2D2 committed Nov 10, 2024
1 parent 31f3308 commit d6484a6
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 47 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

0.3.8
- Added transaction managing functions: begin_transaction, commit, rollback, set_auto_commit, is_auto_commit, has_active_transaction

0.3.7
- DuckDB 1.1.3 bugfix release.
- Added release(resource) function.
Expand Down
96 changes: 96 additions & 0 deletions c_src/nif.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,96 @@ execute_statement(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
return nif::make_ok_tuple(env, resource_builder.make_and_release_resource(env));
}

static ERL_NIF_TERM
begin_transaction(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
if (argc != 1)
return enif_make_badarg(env);

auto connres = get_resource<duckdb::Connection>(env, argv[0]);
if (!connres)
return enif_make_badarg(env);

duckdb::unique_ptr<duckdb::QueryResult> result = connres->data->Query("BEGIN TRANSACTION");
if (result->HasError())
return nif::make_error_tuple(env, result->GetErrorObject().Message());

return nif::make_atom(env, "ok");
}

static ERL_NIF_TERM
commit(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
if (argc != 1)
return enif_make_badarg(env);

auto connres = get_resource<duckdb::Connection>(env, argv[0]);
if (!connres)
return enif_make_badarg(env);

duckdb::unique_ptr<duckdb::QueryResult> result = connres->data->Query("COMMIT");
if (result->HasError())
return nif::make_error_tuple(env, result->GetErrorObject().Message());

return nif::make_atom(env, "ok");
}

static ERL_NIF_TERM
rollback(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
if (argc != 1)
return enif_make_badarg(env);

auto connres = get_resource<duckdb::Connection>(env, argv[0]);
if (!connres)
return enif_make_badarg(env);

duckdb::unique_ptr<duckdb::QueryResult> result = connres->data->Query("ROLLBACK");
if (result->HasError())
return nif::make_error_tuple(env, result->GetErrorObject().Message());

return nif::make_atom(env, "ok");
}

static ERL_NIF_TERM
set_auto_commit(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
if (argc != 2)
return enif_make_badarg(env);

auto connres = get_resource<duckdb::Connection>(env, argv[0]);
if (!connres)
return enif_make_badarg(env);

duckdb::Value boolean;
if (!nif::term_to_boolean(env, argv[1], boolean))
return enif_make_badarg(env);

connres->data->SetAutoCommit(duckdb::BooleanValue::Get(boolean));

return nif::make_atom(env, "ok");
}

static ERL_NIF_TERM
is_auto_commit(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
if (argc != 1)
return enif_make_badarg(env);

auto connres = get_resource<duckdb::Connection>(env, argv[0]);
if (!connres)
return enif_make_badarg(env);

return nif::make_ok_tuple(env, nif::make_atom(env, connres->data->IsAutoCommit() ? "true" : "false"));
}

static ERL_NIF_TERM
has_active_transaction(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
if (argc != 1)
return enif_make_badarg(env);

auto connres = get_resource<duckdb::Connection>(env, argv[0]);
if (!connres)
return enif_make_badarg(env);

return nif::make_ok_tuple(env, nif::make_atom(env, connres->data->HasActiveTransaction() ? "true" : "false"));
}

static ERL_NIF_TERM
columns(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
if (argc != 1)
Expand Down Expand Up @@ -637,6 +727,12 @@ static ErlNifFunc nif_funcs[] = {
{"prepare_statement", 2, prepare_statement, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"execute_statement", 1, execute_statement, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"execute_statement", 2, execute_statement, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"begin_transaction", 1, begin_transaction, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"commit", 1, commit, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"rollback", 1, rollback, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"set_auto_commit", 2, set_auto_commit, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"is_auto_commit", 1, is_auto_commit, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"has_active_transaction", 1, has_active_transaction, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"columns", 1, columns, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"fetch_chunk", 1, fetch_chunk, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"fetch_all", 1, fetch_all, ERL_NIF_DIRTY_JOB_IO_BOUND},
Expand Down
122 changes: 101 additions & 21 deletions lib/duckdbex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,26 @@ defmodule Duckdbex do
@type query_result() :: reference()
@type appender :: reference()

@doc """
Release resource (db, connection, stmt, query_result)
Will cause destruction and automatic closing the releasing resource in the calling process on dirty schedulers. The released resource cannot be used after this point.
If you do not release some resource you have the resource will be automatically released by the Erlang garbage collector (GC) when you lose last ref to the resource, but the GC may run resource releasing in the main schedulers which may affect the performance of your main system perfomance.
## Examples
iex> {:ok, db} = Duckdbex.open("my_database.duckdb", %Duckdbex.Config{})
iex> {:ok, conn} = Duckdbex.connection(db)
iex> {:ok, res} = Duckdbex.query(conn, "SELECT 1 WHERE $1 = 1;", [1])
iex> :ok = Duckdbex.release(res)
iex> :ok = Duckdbex.release(conn)
iex> :ok = Duckdbex.release(db)
"""
@spec release(db() | connection() | statement() | query_result() | appender()) :: :ok
def release(resource) when is_reference(resource),
do: Duckdbex.NIF.release(resource)

@doc """
Opens database in the specified file.
Expand Down Expand Up @@ -48,7 +68,6 @@ defmodule Duckdbex do
## Examples
iex> {:ok, _db} = Duckdbex.open()
"""
@spec open() :: {:ok, db()} | {:error, reason()}
def open(),
Expand Down Expand Up @@ -139,6 +158,87 @@ defmodule Duckdbex do
def execute_statement(statement, args) when is_reference(statement) and is_list(args),
do: Duckdbex.NIF.execute_statement(statement, args)

@doc """
Begin a transaction
## Examples
iex> {:ok, db} = Duckdbex.open()
iex> {:ok, conn} = Duckdbex.connection(db)
iex> :ok = Duckdbex.begin_transaction(conn)
"""
@spec begin_transaction(connection()) :: :ok | {:error, reason()}
def begin_transaction(connection) when is_reference(connection),
do: Duckdbex.NIF.begin_transaction(connection)

@doc """
Commit the transaction
## Examples
iex> {:ok, db} = Duckdbex.open()
iex> {:ok, conn} = Duckdbex.connection(db)
iex> :ok = Duckdbex.begin_transaction(conn)
iex> :ok = Duckdbex.commit(conn)
"""
@spec commit(connection()) :: :ok | {:error, reason()}
def commit(connection) when is_reference(connection),
do: Duckdbex.NIF.commit(connection)

@doc """
Rollback the transaction
## Examples
iex> {:ok, db} = Duckdbex.open()
iex> {:ok, conn} = Duckdbex.connection(db)
iex> :ok = Duckdbex.begin_transaction(conn)
iex> :ok = Duckdbex.rollback(conn)
"""
@spec rollback(connection()) :: :ok | {:error, reason()}
def rollback(connection) when is_reference(connection),
do: Duckdbex.NIF.rollback(connection)

@doc """
Set transaction autocommit for connection
## Examples
iex> {:ok, db} = Duckdbex.open()
iex> {:ok, conn} = Duckdbex.connection(db)
iex> :ok = Duckdbex.set_auto_commit(conn, true)
"""
@spec set_auto_commit(connection(), boolean()) :: :ok | {:error, reason()}
def set_auto_commit(connection, auto_commit?)
when is_reference(connection) and is_boolean(auto_commit?),
do: Duckdbex.NIF.set_auto_commit(connection, auto_commit?)

@doc """
Check if transaction autocommit is set for connection
## Examples
iex> {:ok, db} = Duckdbex.open()
iex> {:ok, conn} = Duckdbex.connection(db)
iex> {:ok, true} = Duckdbex.is_auto_commit(conn)
"""
@spec is_auto_commit(connection()) :: {:ok, boolean()} | {:error, reason()}
def is_auto_commit(connection) when is_reference(connection),
do: Duckdbex.NIF.is_auto_commit(connection)

@doc """
Checks if there is an active transaction for connection
## Examples
iex> {:ok, db} = Duckdbex.open()
iex> {:ok, conn} = Duckdbex.connection(db)
iex> {:ok, false} = Duckdbex.has_active_transaction(conn)
"""
@spec has_active_transaction(connection()) :: {:ok, boolean()} | {:error, reason()}
def has_active_transaction(connection) when is_reference(connection),
do: Duckdbex.NIF.has_active_transaction(connection)

@doc """
Returns columns names from the query result.
Expand Down Expand Up @@ -295,26 +395,6 @@ defmodule Duckdbex do
def appender_close(appender) when is_reference(appender),
do: Duckdbex.NIF.appender_close(appender)

@doc """
Release resource (db, connection, stmt, query_result)
Will cause destruction and automatic closing the releasing resource in the calling process on dirty schedulers. The released resource cannot be used after this point.
If you do not release some resource you have the resource will be automatically released by the Erlang garbage collector (GC) when you lose last ref to the resource, but the GC may run resource releasing in the main schedulers which may affect the performance of your main system perfomance.
## Examples
iex> {:ok, db} = Duckdbex.open("my_database.duckdb", %Duckdbex.Config{})
iex> {:ok, conn} = Duckdbex.connection(db)
iex> {:ok, res} = Duckdbex.query(conn, "SELECT 1 WHERE $1 = 1;", [1])
iex> :ok = Duckdbex.release(res)
iex> :ok = Duckdbex.release(conn)
iex> :ok = Duckdbex.release(db)
"""
@spec release(db() | connection() | statement() | query_result() | appender()) :: :ok
def release(resource) when is_reference(resource),
do: Duckdbex.NIF.release(resource)

@doc """
Returns the version of the linked DuckDB, with a version postfix for dev versions
Expand Down
24 changes: 21 additions & 3 deletions lib/nif.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ defmodule Duckdbex.NIF do
def init(),
do: :ok = :erlang.load_nif(Path.join(:code.priv_dir(:duckdbex), "duckdb_nif"), 0)

@spec release(db() | connection() | statement() | query_result() | appender()) :: :ok
def release(_resource), do: :erlang.nif_error(:not_loaded)

@spec open(binary(), Duckdbex.Config.t()) :: {:ok, db()} | {:error, reason()}
def open(_path, _config), do: :erlang.nif_error(:not_loaded)

Expand All @@ -34,6 +37,24 @@ defmodule Duckdbex.NIF do
@spec execute_statement(statement(), list()) :: {:ok, query_result()} | {:error, reason()}
def execute_statement(_statement, _args), do: :erlang.nif_error(:not_loaded)

@spec begin_transaction(connection()) :: :ok | {:error, reason()}
def begin_transaction(_conn), do: :erlang.nif_error(:not_loaded)

@spec commit(connection()) :: :ok | {:error, reason()}
def commit(_conn), do: :erlang.nif_error(:not_loaded)

@spec rollback(connection()) :: :ok | {:error, reason()}
def rollback(_conn), do: :erlang.nif_error(:not_loaded)

@spec set_auto_commit(connection(), boolean()) :: :ok | {:error, reason()}
def set_auto_commit(_conn, _auto_commit?), do: :erlang.nif_error(:not_loaded)

@spec is_auto_commit(connection()) :: {:ok, boolean()} | {:error, reason()}
def is_auto_commit(_conn), do: :erlang.nif_error(:not_loaded)

@spec has_active_transaction(connection()) :: {:ok, boolean()} | {:error, reason()}
def has_active_transaction(_conn), do: :erlang.nif_error(:not_loaded)

@spec columns(query_result()) :: list(binary()) | {:error, reason()}
def columns(_query_result), do: :erlang.nif_error(:not_loaded)

Expand Down Expand Up @@ -61,9 +82,6 @@ defmodule Duckdbex.NIF do
@spec appender_flush(appender()) :: :ok | {:error, reason()}
def appender_close(_appender), do: :erlang.nif_error(:not_loaded)

@spec release(db() | connection() | statement() | query_result() | appender()) :: :ok
def release(_resource), do: :erlang.nif_error(:not_loaded)

@spec library_version() :: binary()
def library_version(), do: :erlang.nif_error(:not_loaded)

Expand Down
4 changes: 2 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Duckdbex.MixProject do
use Mix.Project

@version "0.3.7"
@version "0.3.8"
@duckdb_version "1.1.3"

def project do
Expand Down Expand Up @@ -48,7 +48,7 @@ defmodule Duckdbex.MixProject do
[
{:elixir_make, "~> 0.8", runtime: false},
{:cc_precompiler, "~> 0.1", runtime: false},
{:ex_doc, "~> 0.24", only: :dev, runtime: false}
{:ex_doc, "~> 0.34", only: :dev, runtime: false}
]
end

Expand Down
60 changes: 60 additions & 0 deletions test/duckdbex_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,66 @@ defmodule DuckdbexTest do
assert {:ok, _res} = Duckdbex.query(conn, "SELECT 1 WHERE 1 = $1;", [1])
end

test "prepare/execute statement" do
assert {:ok, db} = Duckdbex.open()
assert {:ok, conn} = Duckdbex.connection(db)
assert {:ok, stmt} = Duckdbex.prepare_statement(conn, "SELECT 1;")
assert {:ok, res} = Duckdbex.execute_statement(stmt)
assert [[1]] = Duckdbex.fetch_all(res)
end

test "prepare/execute statement with params" do
assert {:ok, db} = Duckdbex.open()
assert {:ok, conn} = Duckdbex.connection(db)
assert {:ok, stmt} = Duckdbex.prepare_statement(conn, "SELECT 1 WHERE $1 = 1;")
assert {:ok, res} = Duckdbex.execute_statement(stmt, [1])
assert [[1]] = Duckdbex.fetch_all(res)
end

test "begin_transaction/1" do
assert {:ok, db} = Duckdbex.open()
assert {:ok, conn} = Duckdbex.connection(db)
assert :ok = Duckdbex.begin_transaction(conn)
end

test "commit/1" do
assert {:ok, db} = Duckdbex.open()
assert {:ok, conn} = Duckdbex.connection(db)
assert :ok = Duckdbex.begin_transaction(conn)
assert :ok = Duckdbex.commit(conn)
end

test "rollback/1" do
assert {:ok, db} = Duckdbex.open()
assert {:ok, conn} = Duckdbex.connection(db)
assert :ok = Duckdbex.begin_transaction(conn)
assert :ok = Duckdbex.rollback(conn)
end

test "set_auto_commit/2" do
assert {:ok, db} = Duckdbex.open()
assert {:ok, conn} = Duckdbex.connection(db)
assert :ok = Duckdbex.set_auto_commit(conn, true)
end

test "is_auto_commit/1" do
assert {:ok, db} = Duckdbex.open()
assert {:ok, conn} = Duckdbex.connection(db)
assert {:ok, true} = Duckdbex.is_auto_commit(conn)
assert :ok = Duckdbex.set_auto_commit(conn, false)
assert {:ok, false} = Duckdbex.is_auto_commit(conn)
end

test "has_active_transaction/1" do
assert {:ok, db} = Duckdbex.open()
assert {:ok, conn} = Duckdbex.connection(db)
assert {:ok, false} = Duckdbex.has_active_transaction(conn)
assert :ok = Duckdbex.begin_transaction(conn)
assert {:ok, true} = Duckdbex.has_active_transaction(conn)
assert :ok = Duckdbex.rollback(conn)
assert {:ok, false} = Duckdbex.has_active_transaction(conn)
end

test "columns/1" do
assert {:ok, db} = Duckdbex.open()
assert {:ok, conn} = Duckdbex.connection(db)
Expand Down
Loading

0 comments on commit d6484a6

Please sign in to comment.