#rxjava-jdbc rewrite for RxJava 2

These are some of the aims of the rewrite:
- leverage RxJava 2 features including more explicit types (Completable, Single, Maybe, Flowable, Observable), new creation methods, better performance, improved API.
- aim support at `Flowable` because backpressure can be applied in database interactions (for example result set paging)
- support transactions across asynchronous boundaries
- support returning generated keys
- support batching
- reactive connection pools: connection pools such as c3p0 or Hikari provide blocking, semaphore-style access to groups of connections. Blocking for access to JDBC `Connection` objects is not in keeping with reactive style, so an rx connection pool is proposed (draft already written, tests required).
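
To make the last aim concrete, here is a minimal sketch of a pool whose checkout never blocks the caller. It is not the draft pool mentioned above. The class name `NonBlockingPool` and its methods are hypothetical, and it uses the JDK's `CompletableFuture` only so that it runs without dependencies. An rx pool would presumably expose an RxJava type instead.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration: callers get a future instead of waiting on a semaphore.
class NonBlockingPool<T> {
    private final Queue<T> idle = new ArrayDeque<>();
    private final Queue<CompletableFuture<T>> waiters = new ArrayDeque<>();

    NonBlockingPool(Iterable<T> members) {
        for (T member : members) {
            idle.add(member);
        }
    }

    // Returns immediately. The future is already complete if a member is idle,
    // otherwise it completes when another caller releases a member.
    synchronized CompletableFuture<T> checkout() {
        T member = idle.poll();
        if (member != null) {
            return CompletableFuture.completedFuture(member);
        }
        CompletableFuture<T> waiter = new CompletableFuture<>();
        waiters.add(waiter);
        return waiter;
    }

    // Hands the member to the oldest waiter, or parks it as idle if nobody waits.
    void release(T member) {
        CompletableFuture<T> waiter;
        synchronized (this) {
            waiter = waiters.poll();
            if (waiter == null) {
                idle.add(member);
                return;
            }
        }
        // Complete outside the lock so callbacks do not run while holding it.
        // If the waiter was cancelled, offer the member to the next one.
        if (!waiter.complete(member)) {
            release(member);
        }
    }

    public static void main(String[] args) {
        NonBlockingPool<String> pool = new NonBlockingPool<>(Arrays.asList("conn-1"));
        CompletableFuture<String> first = pool.checkout();
        CompletableFuture<String> second = pool.checkout();
        System.out.println(first.isDone());   // true
        System.out.println(second.isDone());  // false
        second.thenAccept(c -> System.out.println("second got " + c));
        pool.release(first.join());           // prints "second got conn-1"
    }
}
```

The short `synchronized` sections only guard the two queues. No thread ever waits in them for a connection to become available, which is the property the bullet above asks for.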
rxjava-jdbc supported transactions but relied on synchronous processing and `ThreadLocal`s to hold context. This meant that a bunch of operators could not be applied within the stream for fear of stuffing up access to the `ThreadLocal`. The rewrite will provide support for using transactions across asynchronous boundaries at the expense of a slightly more verbose API.
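
The `ThreadLocal` problem can be reproduced with the JDK alone. The sketch below is an illustration, not code from rxjava-jdbc. `CURRENT_TX` is a hypothetical stand-in for the per-thread transaction context, and the executor plays the role of an operator that moves work to another thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalContextDemo {
    // Hypothetical stand-in for a transaction context held per thread.
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Reads the context from a different thread than the caller's.
    static String readOnOtherThread() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> read = () -> CURRENT_TX.get();
            return executor.submit(read).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        CURRENT_TX.set("tx-1");
        System.out.println("same thread:  " + CURRENT_TX.get());      // tx-1
        System.out.println("other thread: " + readOnOtherThread());   // null
    }
}
```

Once work hops threads the context is simply gone, which is why the rewrite passes the transaction along the stream explicitly.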
###Example
Across asynchronous boundaries we need to pass a `Tx` object. A select may not return any results, but we may still want access to the transaction so that we can, for instance, make another call within the same transaction. `Tx` contains an atomic counter so we know when we can finalize the transaction with commit or rollback.

`Tx` is like a `Notification` object:
```java
import java.sql.Connection;

public interface Tx<T> {
    boolean isValue();
    boolean isComplete();
    boolean isError();
    T value();
    Throwable throwable();
    Connection connection();
    void commit();
    void rollback();
}
```
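
The atomic counter mentioned above could work roughly as follows. This is a sketch under assumptions, not the rewrite's implementation. `TxCounter`, `enter` and `exit` are hypothetical names, and the commit and rollback actions are passed in as `Runnable`s so the example runs without a database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: finalizes a transaction when its last user terminates.
class TxCounter {
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicBoolean failed = new AtomicBoolean();
    private final Runnable commit;
    private final Runnable rollback;

    TxCounter(Runnable commit, Runnable rollback) {
        this.commit = commit;
        this.rollback = rollback;
    }

    // Called when a query stream starts using the transaction.
    void enter() {
        active.incrementAndGet();
    }

    // Called when a query stream terminates; error is null on normal completion.
    void exit(Throwable error) {
        if (error != null) {
            failed.set(true);
        }
        if (active.decrementAndGet() == 0) {
            if (failed.get()) {
                rollback.run();
            } else {
                commit.run();
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        TxCounter tx = new TxCounter(() -> log.add("commit"), () -> log.add("rollback"));
        tx.enter();     // outer select
        tx.enter();     // nested select in the same transaction
        tx.exit(null);  // nested select completes, counter is 1, nothing happens
        tx.exit(null);  // outer select completes, counter is 0, commit runs
        System.out.println(log); // [commit]
    }
}
```

Any error reported before the counter reaches zero turns the final action into a rollback.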
Example usage:
```java
// use a new transaction for the select/update
db.transacted()
    .select("select name from person where score > 1")
    // don't care about valueless Completed Tx<String> emissions
    .hasValueOnly()
    // return the query results as Flowable<Tx<String>>
    .getAs(String.class)
    // for each value
    .flatMap(tx ->
        db.in(tx)
            .select("select name, score from person where name=:name")
            .param("name", tx.value())
            // column types in select order: name, then score
            .getAs(String.class, Double.class)
            // not going to pass the transaction on further
            // so let's make life simple by dealing only with
            // the values from the select query
            .flatMap(Tx.TO_VALUE)
            .doOnNext(System.out::println)
            // on termination will decrement Tx counter
    )
    // note that on termination will decrement Tx counter
    // and if 0 will commit/rollback as appropriate
    .subscribe();
```