Home
#rxjava-jdbc rewrite for RxJava 2
These are some of the aims of the rewrite:
- leverage RxJava 2 features including stronger type system (`Completable`, `Single`, `Maybe`, `Flowable`, `Observable`), new creation methods, better performance, improved API.
- aim support at `Flowable` because backpressure can be applied in database interactions (for example result set paging)
- support transactions across asynchronous boundaries
- support returning generated keys
- support batching
- reactive connection pools: connection pools such as c3p0 or Hikari provide blocking semaphore access to database connections. We can make management of our threads and connections more efficient by using a non-blocking reactive pool of connections (draft already written, tests required).
To achieve a reactive pool of objects:
- when an object becomes available for use all registered consumers race (non-blocking) to allocate the object to themselves. If a consuming stream wins the race then the object is emitted for processing through the reactive stream associated with the consumer.
- when an object is released, the object will be emitted to the winning consumer on the same thread that the release occurred on.
- a health check may be run on an object before it is raced on
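
The hand-off described above can be sketched with plain JDK classes. This is only an illustration and not the draft pool mentioned in this page: the class name `NonBlockingPoolSketch` and its methods are hypothetical, and instead of a true race between consumers it uses a serialized drain loop that serves waiting consumers in arrival order. No caller ever blocks, and when there is no contention the object is handed over on the thread that called `release`.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of a non-blocking object pool (not the rxjava-jdbc implementation).
final class NonBlockingPoolSketch<T> {
    private final Queue<T> available = new ConcurrentLinkedQueue<>();
    private final Queue<Consumer<T>> waiting = new ConcurrentLinkedQueue<>();
    // work-in-progress counter: only the caller that moves it from 0 runs the drain loop
    private final AtomicInteger wip = new AtomicInteger();
    private final Predicate<T> healthCheck;

    NonBlockingPoolSketch(Predicate<T> healthCheck) {
        this.healthCheck = healthCheck;
    }

    // Register a consumer; it is called back once an object is available.
    void request(Consumer<T> consumer) {
        waiting.add(consumer);
        drain();
    }

    // Return an object to the pool (also used to add new objects).
    void release(T object) {
        available.add(object);
        drain();
    }

    private void drain() {
        if (wip.getAndIncrement() != 0) {
            return; // another thread is draining and will see our addition
        }
        int missed = 1;
        do {
            while (!waiting.isEmpty() && !available.isEmpty()) {
                T object = available.poll();
                if (!healthCheck.test(object)) {
                    continue; // drop unhealthy objects before handing them out
                }
                waiting.poll().accept(object);
            }
            missed = wip.addAndGet(-missed);
        } while (missed != 0);
    }
}
```

A consumer registered with `request` before any object exists simply waits in the queue; the next healthy object passed to `release` is delivered to it without any thread parking on a semaphore.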
rxjava-jdbc supported transactions but relied on synchronous processing and `ThreadLocal`s to hold context. This meant that a bunch of operators could not be applied within the stream for fear of stuffing up access to the `ThreadLocal`. The rewrite will provide support for using Transactions across asynchronous boundaries at the expense of a slightly more verbose API.
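
To see why a `ThreadLocal` context does not survive an asynchronous boundary, here is a small JDK-only demonstration. The names `ThreadLocalContextDemo` and `CURRENT_TX` are made up for illustration and do not come from rxjava-jdbc.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo: a value stored in a ThreadLocal is invisible to other threads.
final class ThreadLocalContextDemo {
    // stands in for a per-thread transaction context
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Reads CURRENT_TX from a freshly created worker thread.
    static String readOnAnotherThread() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> read = CURRENT_TX::get;
            return executor.submit(read).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        CURRENT_TX.set("tx-1");
        System.out.println(CURRENT_TX.get());      // prints "tx-1"
        System.out.println(readOnAnotherThread()); // prints "null"
    }
}
```

Any operator that moves work to another thread has the same effect as `readOnAnotherThread` here, which is why the rewrite passes the transaction along explicitly in the stream.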
###Example
Across asynchronous boundaries we need to pass a `Tx` object. A select may not return any results but we still may want to have access to the transaction so we can for instance do another call within the same transaction. `Tx` contains an atomic counter so we know when we can finalize the transaction with commit or rollback.

`Tx` is like a `Notification` object:
```java
import java.sql.Connection;

public interface Tx<T> {
    boolean isValue();
    boolean isComplete();
    boolean isError();
    T value();
    Throwable throwable();
    Connection connection();
    void commit();
    void rollback();
}
```
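
The atomic counter mentioned above might work along these lines. This is a guess at the mechanism and not the actual `Tx` implementation: `TxCounterSketch`, `retain`, `release` and `markFailed` are hypothetical names, and commit and rollback are passed in as plain `Runnable`s so the sketch stays self-contained.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: count the streams using a transaction and
// finalize it when the last one terminates.
final class TxCounterSketch {
    // starts at 1 for the stream that opened the transaction
    private final AtomicInteger active = new AtomicInteger(1);
    private final AtomicBoolean failed = new AtomicBoolean();
    private final Runnable commit;
    private final Runnable rollback;

    TxCounterSketch(Runnable commit, Runnable rollback) {
        this.commit = commit;
        this.rollback = rollback;
    }

    // Called when another stream starts using the transaction.
    void retain() {
        active.incrementAndGet();
    }

    // Called when any participating stream signals an error.
    void markFailed() {
        failed.set(true);
    }

    // Called when a participating stream terminates.
    void release() {
        if (active.decrementAndGet() == 0) {
            if (failed.get()) {
                rollback.run();
            } else {
                commit.run();
            }
        }
    }
}
```

Because the decision is made by whichever thread brings the counter to zero, it does not matter on which thread each stream terminates.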
Example usage:
```java
// use a new transaction for the select/update
db.transacted()
    .select("select name from person where score > 1")
    // filter out valueless terminal Tx<String> emissions
    .valuesOnly()
    // return the query results as Flowable<Tx<String>>
    .getAs(String.class)
    // for each value
    .flatMap(tx ->
        db.in(tx)
            .select("select name, score from person where name=:name")
            .param("name", tx.value())
            .getAs(String.class, Number.class)
            // not going to pass the transaction on further
            // so let's make life simple by dealing only with
            // the values from the select query
            .flatMap(Tx.TO_VALUE)
            .doOnNext(System.out::println)
            // on termination will decrement Tx counter
            // cross an asynchronous boundary at subscribe time!
            .subscribeOn(Schedulers.io())
    )
    // note that on termination will decrement Tx counter
    // and if 0 will commit/rollback as appropriate
    .subscribe();
```
Please add your comments/requests to this wiki page to aid development of the new API!