diff --git a/documentation/src/main/asciidoc/reference/introduction.adoc b/documentation/src/main/asciidoc/reference/introduction.adoc index 4f2237edb..e43170185 100644 --- a/documentation/src/main/asciidoc/reference/introduction.adoc +++ b/documentation/src/main/asciidoc/reference/introduction.adoc @@ -1070,6 +1070,20 @@ sessionFactory.withTransaction( (session, tx) -> session.persist(book) ) This is probably the most convenient thing to use most of the time. +=== Calling the Vert.x SQL client directly + +The `withConnection()` method makes it possible to call the Vert.x `SqlConnection` directly, for example: + +[source,java] +---- +session.withConnection((SqlConnection connection) + -> connection.query("select title from book").execute() + .toCompletionStage() ) + .invoke( results -> results.forEach( row -> ... ) ) +---- + +Most of the time, it's easier to just use a <<_queries,native query>>. + == Integrating with Vert.x :vertx-context-introduction: https://vertx.io/blog/an-introduction-to-the-vert-x-context-object/ diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/common/ConnectionConsumer.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/common/ConnectionConsumer.java new file mode 100644 index 000000000..3fa7c900a --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/common/ConnectionConsumer.java @@ -0,0 +1,24 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.common; + +import org.hibernate.Incubating; + +import java.util.concurrent.CompletionStage; + +/** + * An operation which makes direct use of a database connection. + * + * @param <C> the connection type, usually + * {@link io.vertx.sqlclient.SqlConnection} + * @param <R> the result type of the operation, or {@link Void} + * + * @author Gavin King + */ +@Incubating @FunctionalInterface +public interface ConnectionConsumer<C, R> { + CompletionStage<R> accept(C connection); +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java index 59edbb875..f211333e3 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java @@ -28,6 +28,7 @@ import org.hibernate.query.criteria.HibernateCriteriaBuilder; import org.hibernate.query.criteria.JpaCriteriaInsert; import org.hibernate.reactive.common.AffectedEntities; +import org.hibernate.reactive.common.ConnectionConsumer; import org.hibernate.reactive.common.Identifier; import org.hibernate.reactive.common.ResultSetMapping; import org.hibernate.reactive.logging.impl.Log; @@ -1525,6 +1526,18 @@ default Session setCacheRetrieveMode(CacheRetrieveMode cacheRetrieveMode) { * The {@link SessionFactory} which created this session. */ SessionFactory getFactory(); + + /** + * Execute the given operation using the connection underlying the reactive session. + * + * @param consumer the operation to be executed + * @return the result of the operation via a {@link Uni} + * + * @param <C> the connection type, usually + * {@link io.vertx.sqlclient.SqlConnection} + * @param <R> the result type of the operation, or {@link Void} + */ + <C,R> Uni<R> withConnection(ConnectionConsumer<C,R> consumer); } /** @@ -2104,6 +2117,18 @@ default Uni<Void> refresh(Object entity, LockModeType lockModeType) { * The {@link SessionFactory} which created this session. */ SessionFactory getFactory(); + + /** + * Execute the given operation using the connection underlying the reactive session. + * + * @param consumer the operation to be executed + * @return the result of the operation via a {@link Uni} + * + * @param <C> the connection type, usually + * {@link io.vertx.sqlclient.SqlConnection} + * @param <R> the result type of the operation, or {@link Void} + */ + <C,R> Uni<R> withConnection(ConnectionConsumer<C,R> consumer); } /** diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java index 929987b55..61cff4316 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java @@ -24,6 +24,7 @@ import org.hibernate.graph.RootGraph; import org.hibernate.query.criteria.JpaCriteriaInsert; import org.hibernate.reactive.common.AffectedEntities; +import org.hibernate.reactive.common.ConnectionConsumer; import org.hibernate.reactive.common.Identifier; import org.hibernate.reactive.common.ResultSetMapping; import org.hibernate.reactive.engine.spi.ReactiveSharedSessionContractImplementor; @@ -606,4 +607,8 @@ else if ( ReactiveSharedSessionContractImplementor.class.isAssignableFrom( clazz throw new PersistenceException( "Cannot unwrap type " + clazz ); } + @Override + public <C, T> Uni<T> withConnection(ConnectionConsumer<C, T> consumer) { + return uni( () -> delegate.getReactiveConnection().withConnection( consumer ) ); + } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinyStatelessSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinyStatelessSessionImpl.java index d6e8e984e..88627581b 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinyStatelessSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinyStatelessSessionImpl.java @@ -12,6 +12,7 @@ import jakarta.persistence.criteria.CriteriaUpdate; import org.hibernate.LockMode; import org.hibernate.graph.spi.RootGraphImplementor; +import org.hibernate.reactive.common.ConnectionConsumer; import org.hibernate.reactive.common.ResultSetMapping; import org.hibernate.reactive.mutiny.Mutiny; import org.hibernate.reactive.mutiny.Mutiny.Query; @@ -370,4 +371,9 @@ public <T> EntityGraph<T> createEntityGraph(Class<T> rootType) { public <T> EntityGraph<T> createEntityGraph(Class<T> rootType, String graphName) { return delegate.createEntityGraph( rootType, graphName ); } + + @Override + public <C, T> Uni<T> withConnection(ConnectionConsumer<C, T> consumer) { + return uni( () -> delegate.getReactiveConnection().withConnection( consumer ) ); + } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/BatchingConnection.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/BatchingConnection.java index ce7ee013d..0eed8b8f4 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/BatchingConnection.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/BatchingConnection.java @@ -14,6 +14,7 @@ import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor; import io.vertx.sqlclient.spi.DatabaseMetadata; +import org.hibernate.reactive.common.ConnectionConsumer; import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; @@ -214,4 +215,9 @@ public CompletionStage<Void> rollbackTransaction() { public CompletionStage<Void> close() { return delegate.close(); } + + @Override + public <C, R> CompletionStage<R> withConnection(ConnectionConsumer<C, R> consumer) { + return delegate.withConnection( consumer ); + } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnection.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnection.java index 10e304116..31251baa2 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnection.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnection.java @@ -13,6 +13,7 @@ import org.hibernate.Incubating; import io.vertx.sqlclient.spi.DatabaseMetadata; +import org.hibernate.reactive.common.ConnectionConsumer; /** * Abstracts over reactive database connections, defining @@ -80,4 +81,6 @@ interface Result extends Iterator<Object[]> { CompletionStage<Void> executeBatch(); CompletionStage<Void> close(); + + <C,R> CompletionStage<R> withConnection(ConnectionConsumer<C,R> consumer); } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java index 3e8647f6c..9d8eb8756 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java @@ -18,6 +18,7 @@ import org.hibernate.engine.jdbc.spi.SqlStatementLogger; import org.hibernate.reactive.adaptor.impl.JdbcNull; import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor; +import org.hibernate.reactive.common.ConnectionConsumer; import org.hibernate.reactive.logging.impl.Log; import org.hibernate.reactive.logging.impl.LoggerFactory; import org.hibernate.reactive.pool.BatchingConnection; @@ -325,6 +326,11 @@ public CompletionStage<Void> close() { .toCompletionStage(); } + @Override @SuppressWarnings("unchecked") + public <C,R> CompletionStage<R> withConnection(ConnectionConsumer<C,R> consumer) { + return consumer.accept( (C) client() ); + } + @SuppressWarnings("unchecked") private static <T> T getLastInsertedId(RowSet<Row> rows, Class<T> idClass, String idColumnName) { final Long mySqlId = rows.property( MYSQL_LAST_INSERTED_ID ); diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java index b490f2fb3..055f92748 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java @@ -29,6 +29,7 @@ import org.hibernate.query.criteria.HibernateCriteriaBuilder; import org.hibernate.query.criteria.JpaCriteriaInsert; import org.hibernate.reactive.common.AffectedEntities; +import org.hibernate.reactive.common.ConnectionConsumer; import org.hibernate.reactive.common.Identifier; import org.hibernate.reactive.common.ResultSetMapping; import org.hibernate.reactive.logging.impl.Log; @@ -1557,6 +1558,18 @@ default Session setCacheRetrieveMode(CacheRetrieveMode cacheRetrieveMode) { * The {@link SessionFactory} which created this session. */ SessionFactory getFactory(); + + /** + * Execute the given operation using the connection underlying the reactive session. + * + * @param consumer the operation to be executed + * @return the result of the operation via a {@link CompletionStage} + * + * @param <C> the connection type, usually + * {@link io.vertx.sqlclient.SqlConnection} + * @param <R> the result type of the operation, or {@link Void} + */ + <C,R> CompletionStage<R> withConnection(ConnectionConsumer<C,R> consumer); } /** @@ -2155,6 +2168,18 @@ default CompletionStage<Void> refresh(Object entity, LockModeType lockModeType) * The {@link SessionFactory} which created this session. */ SessionFactory getFactory(); + + /** + * Execute the given operation using the connection underlying the reactive session. + * + * @param consumer the operation to be executed + * @return the result of the operation via a {@link CompletionStage} + * + * @param <C> the connection type, usually + * {@link io.vertx.sqlclient.SqlConnection} + * @param <R> the result type of the operation, or {@link Void} + */ + <C,R> CompletionStage<R> withConnection(ConnectionConsumer<C,R> consumer); } /** diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java index 78ce3c891..819c55b9a 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java @@ -24,6 +24,7 @@ import org.hibernate.jpa.internal.util.LockModeTypeHelper; import org.hibernate.query.criteria.JpaCriteriaInsert; import org.hibernate.reactive.common.AffectedEntities; +import org.hibernate.reactive.common.ConnectionConsumer; import org.hibernate.reactive.common.Identifier; import org.hibernate.reactive.common.ResultSetMapping; import org.hibernate.reactive.engine.ReactiveActionQueue; @@ -609,4 +610,9 @@ else if ( ReactiveSharedSessionContractImplementor.class.isAssignableFrom( clazz } throw new PersistenceException( "Cannot unwrap type " + clazz ); } + + @Override + public <C, T> CompletionStage<T> withConnection(ConnectionConsumer<C, T> consumer) { + return delegate.getReactiveConnection().withConnection( consumer ); + } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageStatelessSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageStatelessSessionImpl.java index aadd76285..376e2b11a 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageStatelessSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageStatelessSessionImpl.java @@ -8,6 +8,7 @@ import org.hibernate.LockMode; import org.hibernate.graph.spi.RootGraphImplementor; import org.hibernate.query.criteria.JpaCriteriaInsert; +import org.hibernate.reactive.common.ConnectionConsumer; import org.hibernate.reactive.common.ResultSetMapping; import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.session.ReactiveStatelessSession; @@ -373,4 +374,9 @@ public <T> EntityGraph<T> createEntityGraph(Class<T> rootType) { public <T> EntityGraph<T> createEntityGraph(Class<T> rootType, String graphName) { return delegate.createEntityGraph( rootType, graphName ); } + + @Override + public <C, T> CompletionStage<T> withConnection(ConnectionConsumer<C, T> consumer) { + return delegate.getReactiveConnection().withConnection( consumer ); + } } diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MutinySessionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MutinySessionTest.java index fb8c7e1f1..0b2a5d5ce 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MutinySessionTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MutinySessionTest.java @@ -10,6 +10,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; +import io.vertx.sqlclient.SqlConnection; import org.hibernate.LockMode; import org.hibernate.reactive.mutiny.Mutiny; @@ -668,6 +669,22 @@ public void testForceFlushWithDelete(VertxTestContext context) { ); } + @Test + public void reactiveWithConnection(VertxTestContext context) { + test( + context, + getMutinySessionFactory() + .withTransaction( (s,t) -> s.withConnection((SqlConnection c) + -> c.query("select name from Pig").execute().toCompletionStage() ) ) + .invoke( res -> res.forEach( row -> { + assertEquals(1, row.size() ); + assertEquals("Aloi", row.getString("name") ); + }) ) + ); + } + + + private void assertThatPigsAreEqual(GuineaPig expected, GuineaPig actual) { assertNotNull( actual ); assertEquals( expected.getId(), actual.getId() ); diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java index 78494bd1c..41ce31075 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java @@ -10,6 +10,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; +import io.vertx.sqlclient.SqlConnection; import org.hibernate.LockMode; import org.hibernate.reactive.common.AffectedEntities; import org.hibernate.reactive.stage.Stage; @@ -969,6 +970,20 @@ context, openSession() ); } + @Test + public void reactiveWithConnection(VertxTestContext context) { + test( + context, + getMutinySessionFactory() + .withTransaction( (s,t) -> s.withConnection((SqlConnection c) + -> c.query("select name from Pig").execute().toCompletionStage() ) ) + .invoke( res -> res.forEach( row -> { + assertEquals(1, row.size() ); + assertEquals("Aloi", row.getString("name") ); + }) ) + ); + } + private void assertThatPigsAreEqual(GuineaPig expected, GuineaPig actual) { assertNotNull( actual ); assertEquals( expected.getId(), actual.getId() );