Dave Moten edited this page May 26, 2017 · 29 revisions

rxjava-jdbc rewrite for RxJava 2

These are some of the aims of the rewrite:

  • leverage RxJava 2 features, including a stronger type system (Completable, Single, Maybe, Flowable, Observable), new creation methods, better performance and an improved API
  • target Flowable, because backpressure is meaningful in database interactions (for example, result set paging)
  • support transactions across asynchronous boundaries
  • support returning generated keys
  • support batching
  • reactive connection pools - connection pools such as c3p0 or Hikari provide blocking, semaphore-style access to database connections. We can manage our threads and connections more efficiently by using a non-blocking reactive pool of connections (done)
  • support CallableStatement and cursors
  • improve error handling (throw meaningful error messages rather than a bare NPE or similar)
  • add a Database.test() method to make it easier to play with and test the library
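
To illustrate the non-blocking pool idea from the list above, here is a minimal sketch. It is not the pool used by the rewrite: the class name NonBlockingPool and its methods are hypothetical, and it uses the JDK's CompletableFuture instead of RxJava types so that it has no dependencies. The point is only that a checkout never parks the calling thread; it returns a future that completes when a member is released.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of a non-blocking pool; not part of the rxjava-jdbc API. */
final class NonBlockingPool<T> {
    private final Queue<T> idle = new ArrayDeque<>();
    private final Queue<CompletableFuture<T>> waiters = new ArrayDeque<>();

    NonBlockingPool(Iterable<T> members) {
        for (T m : members) {
            idle.add(m);
        }
    }

    /** Never waits for a member: the future completes now or on a later release. */
    CompletableFuture<T> checkout() {
        synchronized (this) {
            T member = idle.poll();
            if (member != null) {
                return CompletableFuture.completedFuture(member);
            }
            // No idle member: queue the request instead of blocking the caller.
            CompletableFuture<T> waiter = new CompletableFuture<>();
            waiters.add(waiter);
            return waiter;
        }
    }

    /** Hands the member to the oldest live waiter, or returns it to the idle queue. */
    void release(T member) {
        while (true) {
            CompletableFuture<T> waiter;
            synchronized (this) {
                waiter = waiters.poll();
                if (waiter == null) {
                    idle.add(member);
                    return;
                }
            }
            // Complete outside the lock; skip waiters that were already cancelled.
            if (waiter.complete(member)) {
                return;
            }
        }
    }
}
```

With a blocking pool the second caller would hold a thread while it waits; here it holds only a pending future, so a small number of threads can serve many outstanding requests.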

API ideas

Transaction support

rxjava-jdbc supported transactions but relied on synchronous processing and ThreadLocals to hold context. This meant that a number of operators could not be applied within the stream for fear of breaking access to the ThreadLocal. The rewrite will support using transactions across asynchronous boundaries at the expense of a slightly more verbose API.

Example

Across asynchronous boundaries we need to pass a Tx object. A select may not return any results, but we may still want access to the transaction so that we can, for instance, make another call within the same transaction. Tx contains an atomic counter so we know when we can finalize the transaction with commit or rollback.
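
The counter idea can be sketched roughly as follows. This is an assumption about how such a counter might work, not the library's implementation: the class TxCounter and its methods queryStarted and queryTerminated are hypothetical names. The counter starts at 1 for the initial query, is incremented for each further query started in the same transaction, and is decremented when a query terminates. Whoever brings it to zero finalizes the transaction.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of the transaction counter; not part of the rxjava-jdbc API. */
final class TxCounter {
    // Starts at 1 to account for the query that opened the transaction.
    private final AtomicInteger active = new AtomicInteger(1);
    private final AtomicBoolean failed = new AtomicBoolean(false);
    private final Runnable commit;
    private final Runnable rollback;

    TxCounter(Runnable commit, Runnable rollback) {
        this.commit = commit;
        this.rollback = rollback;
    }

    /** Call when another query is started within the same transaction. */
    void queryStarted() {
        active.incrementAndGet();
    }

    /** Call when a query terminates; pass null for normal completion. */
    void queryTerminated(Throwable error) {
        if (error != null) {
            failed.set(true);
        }
        // Only the caller that brings the count to zero finalizes the transaction.
        if (active.decrementAndGet() == 0) {
            if (failed.get()) {
                rollback.run();
            } else {
                commit.run();
            }
        }
    }
}
```

Because the increment and decrement are atomic, the queries may terminate on different threads and the transaction is still finalized exactly once. The sketch assumes that each nested query is registered before its parent query terminates.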

Tx is like a Notification object:

public interface Tx<T> {
   boolean isValue();
   boolean isComplete();
   boolean isError();
   T value();
   Throwable throwable();
   Connection connection();
   void commit();
   void rollback();
   SelectBuilder select(String sql);
}

Example usage:

    // use a new transaction for the select/update
    db.transacted()
      .select("select name from person where score > 1")
      // filter out valueless terminal Tx<String> emissions
      .valuesOnly()
      // return the query results as Flowable<Tx<String>>
      .getAs(String.class)
      // for each value
      .flatMap(tx -> 
          tx
            .select("select name, score from person where name=:name")
            .parameter("name", tx.value())
            .getAs(String.class, Number.class)
            // not going to pass the transaction on further
            // so let's make life simple by dealing only with 
            // the values from the select query
            .flatMap(Tx.TO_VALUE)
            .doOnNext(System.out::println)
            // cross an asynchronous boundary at subscribe time;
            // on termination this inner stream decrements the Tx counter
            .subscribeOn(Schedulers.io())
      )
      // on termination the outer stream also decrements the Tx counter
      // and, if the counter reaches 0, commits or rolls back as appropriate
      .subscribe();

Please add your comments/requests to this wiki page to aid development of new API!
