diff --git a/build.sbt b/build.sbt index 2e7266e0f..d236edc22 100644 --- a/build.sbt +++ b/build.sbt @@ -198,16 +198,18 @@ lazy val cats_effect_2 = (project in file("elastic4s-effect-cats-2")) .settings(libraryDependencies += cats2) lazy val zio_1 = (project in file("elastic4s-effect-zio-1")) - .dependsOn(core, testkit % "test") + .dependsOn(core, clientsttp % "test", testkit % "test") .settings(name := "elastic4s-effect-zio-1") .settings(scala3Settings) .settings(libraryDependencies ++= Dependencies.zio1) + .settings(libraryDependencies ++= Seq(Dependencies.sttpZio1Backend % "test")) lazy val zio = (project in file("elastic4s-effect-zio")) - .dependsOn(core, testkit % "test") + .dependsOn(core, clientsttp % "test", testkit % "test") .settings(name := "elastic4s-effect-zio") .settings(scala3Settings) .settings(libraryDependencies ++= Dependencies.zio) + .settings(libraryDependencies ++= Seq(Dependencies.sttpZioBackend % "test")) lazy val scalaz = (project in file("elastic4s-effect-scalaz")) .dependsOn(core) @@ -302,7 +304,7 @@ lazy val clientsttp = (project in file("elastic4s-client-sttp")) .dependsOn(core, testkit % "test") .settings(name := "elastic4s-client-sttp") .settings(scala3Settings) - .settings(libraryDependencies ++= Seq(sttp, asyncHttpClientBackendFuture)) + .settings(libraryDependencies ++= Seq(sttp, sttpFutureBackend)) lazy val clientakka = (project in file("elastic4s-client-akka")) .dependsOn(core, testkit % "test") diff --git a/elastic4s-client-akka/src/main/scala/com/sksamuel/elastic4s/akka/AkkaHttpClient.scala b/elastic4s-client-akka/src/main/scala/com/sksamuel/elastic4s/akka/AkkaHttpClient.scala index 7a8f41b36..f600ef0c4 100644 --- a/elastic4s-client-akka/src/main/scala/com/sksamuel/elastic4s/akka/AkkaHttpClient.scala +++ b/elastic4s-client-akka/src/main/scala/com/sksamuel/elastic4s/akka/AkkaHttpClient.scala @@ -23,7 +23,7 @@ class AkkaHttpClient private[akka] ( blacklist: Blacklist, httpPoolFactory: HttpPoolFactory )(implicit system: ActorSystem) - extends ElasticHttpClient { + extends ElasticHttpClient[Future] { import AkkaHttpClient._ import system.dispatcher @@ -200,30 +200,16 @@ class AkkaHttpClient private[akka] ( } } - private[akka] def sendAsync( + override def send( request: ElasticRequest ): Future[ElasticHttpResponse] = { queueRequestWithRetry(request) } - override def send( - request: ElasticRequest, - callback: Either[Throwable, ElasticHttpResponse] => Unit - ): Unit = { - sendAsync(request).onComplete { - case Success(r) => callback(Right(r)) - case Failure(e) => callback(Left(e)) - } - } - - def shutdown(): Future[Unit] = { + override def close(): Future[Unit] = { httpPoolFactory.shutdown() } - override def close(): Unit = { - shutdown() - } - private def toRequest(request: ElasticRequest, host: String): Try[HttpRequest] = Try { val httpRequest = HttpRequest( diff --git a/elastic4s-client-akka/src/test/scala/com/sksamuel/elastic4s/akka/AkkaHttpClientMockTest.scala b/elastic4s-client-akka/src/test/scala/com/sksamuel/elastic4s/akka/AkkaHttpClientMockTest.scala index d21319320..8d861d05a 100644 --- a/elastic4s-client-akka/src/test/scala/com/sksamuel/elastic4s/akka/AkkaHttpClientMockTest.scala +++ b/elastic4s-client-akka/src/test/scala/com/sksamuel/elastic4s/akka/AkkaHttpClientMockTest.scala @@ -68,7 +68,7 @@ class AkkaHttpClientMockTest .thenReturn(Success(HttpResponse().withEntity("ok"))) client - .sendAsync(ElasticRequest("GET", "/test")) + .send(ElasticRequest("GET", "/test")) .futureValue shouldBe ElasticResponse( 200, Some(ElasticEntity.StringEntity("ok", None)), @@ -102,7 +102,7 @@ class AkkaHttpClientMockTest .thenReturn(Success(HttpResponse(StatusCodes.BadGateway))) client - .sendAsync(ElasticRequest("GET", "/test")) + .send(ElasticRequest("GET", "/test")) .futureValue shouldBe ElasticResponse( 502, Some(ElasticEntity.StringEntity("", None)), @@ -141,7 +141,7 @@ class AkkaHttpClientMockTest .thenReturn(Success(HttpResponse().withEntity("host2"))) client - .sendAsync(ElasticRequest("GET", "/test")) + .send(ElasticRequest("GET", "/test")) .futureValue } @@ -177,7 +177,7 @@ class AkkaHttpClientMockTest .thenReturn(Success(HttpResponse().withEntity("host2"))) client - .sendAsync(ElasticRequest("GET", "/test")) + .send(ElasticRequest("GET", "/test")) .futureValue } @@ -207,7 +207,7 @@ class AkkaHttpClientMockTest .thenReturn(Success(HttpResponse().withEntity("host2"))) client - .sendAsync(ElasticRequest("GET", "/test")) + .send(ElasticRequest("GET", "/test")) .futureValue shouldBe ElasticResponse( 200, Some(ElasticEntity.StringEntity("host2", None)), diff --git a/elastic4s-client-akka/src/test/scala/com/sksamuel/elastic4s/akka/AkkaHttpClientTest.scala b/elastic4s-client-akka/src/test/scala/com/sksamuel/elastic4s/akka/AkkaHttpClientTest.scala index ee86f4806..56bff89cc 100644 --- a/elastic4s-client-akka/src/test/scala/com/sksamuel/elastic4s/akka/AkkaHttpClientTest.scala +++ b/elastic4s-client-akka/src/test/scala/com/sksamuel/elastic4s/akka/AkkaHttpClientTest.scala @@ -31,14 +31,16 @@ class AkkaHttpClientTest extends AnyFlatSpec with Matchers with DockerTests with deleteIndex("testindex") }.await - akkaClient.shutdown().await + akkaClient.close().await system.terminate().await } } private lazy val akkaClient = AkkaHttpClient(AkkaHttpClientSettings(List(s"$elasticHost:$elasticPort"))) - override val client = ElasticClient(akkaClient) + def mkAkkaBasedClient(implicit executor: Executor[Future]): ElasticClient[Future] = ElasticClient(akkaClient) + + override lazy val client = mkAkkaBasedClient "AkkaHttpClient" should "support utf-8" in { @@ -106,14 +108,12 @@ class AkkaHttpClientTest extends AnyFlatSpec with Matchers with DockerTests with } it should "propagate headers if included" in { - implicit val executor: Executor[Future] = new Executor[Future] { - override def exec(client: HttpClient, request: ElasticRequest): Future[HttpResponse] = { - val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8)) - Executor.FutureExecutor.exec(client, request.copy(headers = Map("Authorization" -> s"Basic $cred"))) - } + implicit val executor: Executor[Future] = (client: HttpClient[Future], request: ElasticRequest) => { + val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8)) + client.send(request.copy(headers = Map("Authorization" -> s"Basic $cred"))) } - client.execute { + mkAkkaBasedClient.execute { catHealth() }.await.result.status shouldBe "401" } diff --git a/elastic4s-client-esjava/src/main/scala/com/sksamuel/elastic4s/http/JavaClient.scala b/elastic4s-client-esjava/src/main/scala/com/sksamuel/elastic4s/http/JavaClient.scala index d02140e99..faf1ad0e7 100644 --- a/elastic4s-client-esjava/src/main/scala/com/sksamuel/elastic4s/http/JavaClient.scala +++ b/elastic4s-client-esjava/src/main/scala/com/sksamuel/elastic4s/http/JavaClient.scala @@ -12,6 +12,7 @@ import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, Req import org.elasticsearch.client.{Request, ResponseException, ResponseListener, RestClient} import org.slf4j.{Logger, LoggerFactory} +import scala.concurrent.{ExecutionContext, Future, Promise} import scala.io.{Codec, Source} import scala.language.higherKinds @@ -20,7 +21,7 @@ case class JavaClientExceptionWrapper(t: Throwable) extends RuntimeException(t) /** * An implementation of HttpClient that wraps the Elasticsearch Java Rest Client */ -class JavaClient(client: RestClient) extends HttpClient { +class JavaClient(client: RestClient)(implicit ec: ExecutionContext) extends HttpClient[Future] { def apacheEntity(entity: HttpEntity): AbstractHttpEntity = entity match { case e: HttpEntity.StringEntity => @@ -58,16 +59,18 @@ class JavaClient(client: RestClient) extends HttpClient { HttpResponse(r.getStatusLine.getStatusCode, entity, headers) } - override def send(req: ElasticRequest, callback: Either[Throwable, HttpResponse] => Unit): Unit = { + override def send(req: ElasticRequest): Future[HttpResponse] = { if (logger.isDebugEnabled) { logger.debug("Executing elastic request {}", Show[ElasticRequest].show(req)) } + val promise = Promise[HttpResponse]() + val l = new ResponseListener { - override def onSuccess(r: org.elasticsearch.client.Response): Unit = callback(Right(fromResponse(r))) + override def onSuccess(r: org.elasticsearch.client.Response): Unit = promise.success(fromResponse(r)) override def onFailure(e: Exception): Unit = e match { - case re: ResponseException => callback(Right(fromResponse(re.getResponse))) - case t => callback(Left(JavaClientExceptionWrapper(t))) + case re: ResponseException => promise.success(fromResponse(re.getResponse)) + case t => promise.failure(JavaClientExceptionWrapper(t)) } } @@ -78,9 +81,11 @@ class JavaClient(client: RestClient) extends HttpClient { req.headers.foreach((optBuilder.addHeader _).tupled) request.setOptions(optBuilder) client.performRequestAsync(request, l) + + promise.future } - override def close(): Unit = client.close() + override def close(): Future[Unit] = Future(client.close()) private def isEntityGziped(entity: org.apache.http.HttpEntity): Boolean = { Option(entity.getContentEncoding).flatMap(x => Option(x.getValue)).contains("gzip") @@ -97,27 +102,33 @@ object JavaClient { * @param client the Java client to wrap * @return newly created Scala client */ - def fromRestClient(client: RestClient): JavaClient = new JavaClient(client) + def fromRestClient(client: RestClient)(implicit ec: ExecutionContext): JavaClient = new JavaClient(client) /** * Creates a new [[ElasticClient]] using the elasticsearch Java API rest client * as the underlying client. Optional callbacks can be passed in to configure the client. */ - def apply(props: ElasticProperties): JavaClient = + def apply(props: ElasticProperties)(implicit ec: ExecutionContext): JavaClient = apply(props, NoOpRequestConfigCallback, NoOpHttpClientConfigCallback) /** * Creates a new [[ElasticClient]] using the elasticsearch Java API rest client * as the underlying client. Optional callbacks can be passed in to configure the client. */ - def apply(props: ElasticProperties, requestConfigCallback: RequestConfigCallback): JavaClient = + def apply( + props: ElasticProperties, + requestConfigCallback: RequestConfigCallback + )(implicit ec: ExecutionContext): JavaClient = apply(props, requestConfigCallback, NoOpHttpClientConfigCallback) /** * Creates a new [[ElasticClient]] using the elasticsearch Java API rest client * as the underlying client. Optional callbacks can be passed in to configure the client. */ - def apply(props: ElasticProperties, httpClientConfigCallback: HttpClientConfigCallback): JavaClient = + def apply( + props: ElasticProperties, + httpClientConfigCallback: HttpClientConfigCallback + )(implicit ec: ExecutionContext): JavaClient = apply(props, NoOpRequestConfigCallback, httpClientConfigCallback) /** @@ -126,7 +137,7 @@ object JavaClient { */ def apply(props: ElasticProperties, requestConfigCallback: RequestConfigCallback, - httpClientConfigCallback: HttpClientConfigCallback): JavaClient = { + httpClientConfigCallback: HttpClientConfigCallback)(implicit ec: ExecutionContext): JavaClient = { val hosts = props.endpoints.map { case ElasticNodeEndpoint(protocol, host, port, _) => new HttpHost(host, port, protocol) } diff --git a/elastic4s-client-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/PekkoHttpClient.scala b/elastic4s-client-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/PekkoHttpClient.scala index b95aeb18c..756a4e524 100644 --- a/elastic4s-client-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/PekkoHttpClient.scala +++ b/elastic4s-client-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/PekkoHttpClient.scala @@ -23,7 +23,7 @@ class PekkoHttpClient private[pekko]( blacklist: Blacklist, httpPoolFactory: HttpPoolFactory )(implicit system: ActorSystem) - extends ElasticHttpClient { + extends ElasticHttpClient[Future] { import PekkoHttpClient._ import system.dispatcher @@ -200,30 +200,14 @@ class PekkoHttpClient private[pekko]( } } - private[pekko] def sendAsync( - request: ElasticRequest - ): Future[ElasticHttpResponse] = { + override def send(request: ElasticRequest): Future[ElasticHttpResponse] = { queueRequestWithRetry(request) } - override def send( - request: ElasticRequest, - callback: Either[Throwable, ElasticHttpResponse] => Unit - ): Unit = { - sendAsync(request).onComplete { - case Success(r) => callback(Right(r)) - case Failure(e) => callback(Left(e)) - } - } - - def shutdown(): Future[Unit] = { + override def close(): Future[Unit] = { httpPoolFactory.shutdown() } - override def close(): Unit = { - shutdown() - } - private def toRequest(request: ElasticRequest, host: String): Try[HttpRequest] = Try { val httpRequest = HttpRequest( diff --git a/elastic4s-client-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/PekkoHttpClientMockTest.scala b/elastic4s-client-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/PekkoHttpClientMockTest.scala index 76a29fe97..0dc950d3d 100644 --- a/elastic4s-client-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/PekkoHttpClientMockTest.scala +++ b/elastic4s-client-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/PekkoHttpClientMockTest.scala @@ -68,7 +68,7 @@ class PekkoHttpClientMockTest .thenReturn(Success(HttpResponse().withEntity("ok"))) client - .sendAsync(ElasticRequest("GET", "/test")) + .send(ElasticRequest("GET", "/test")) .futureValue shouldBe ElasticResponse( 200, Some(ElasticEntity.StringEntity("ok", None)), @@ -102,7 +102,7 @@ class PekkoHttpClientMockTest .thenReturn(Success(HttpResponse(StatusCodes.BadGateway))) client - .sendAsync(ElasticRequest("GET", "/test")) + .send(ElasticRequest("GET", "/test")) .futureValue shouldBe ElasticResponse( 502, Some(ElasticEntity.StringEntity("", None)), @@ -141,7 +141,7 @@ class PekkoHttpClientMockTest .thenReturn(Success(HttpResponse().withEntity("host2"))) client - .sendAsync(ElasticRequest("GET", "/test")) + .send(ElasticRequest("GET", "/test")) .futureValue } @@ -177,7 +177,7 @@ class PekkoHttpClientMockTest .thenReturn(Success(HttpResponse().withEntity("host2"))) client - .sendAsync(ElasticRequest("GET", "/test")) + .send(ElasticRequest("GET", "/test")) .futureValue } @@ -207,7 +207,7 @@ class PekkoHttpClientMockTest .thenReturn(Success(HttpResponse().withEntity("host2"))) client - .sendAsync(ElasticRequest("GET", "/test")) + .send(ElasticRequest("GET", "/test")) .futureValue shouldBe ElasticResponse( 200, Some(ElasticEntity.StringEntity("host2", None)), diff --git a/elastic4s-client-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/PekkoHttpClientTest.scala b/elastic4s-client-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/PekkoHttpClientTest.scala index 4393b8a9c..4c0fe71bf 100644 --- a/elastic4s-client-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/PekkoHttpClientTest.scala +++ b/elastic4s-client-pekko/src/test/scala/com/sksamuel/elastic4s/pekko/PekkoHttpClientTest.scala @@ -31,14 +31,17 @@ class PekkoHttpClientTest extends AnyFlatSpec with Matchers with DockerTests wit deleteIndex("testindex") }.await - pekkoClient.shutdown().await + pekkoClient.close().await system.terminate().await } } private lazy val pekkoClient = PekkoHttpClient(PekkoHttpClientSettings(List(s"$elasticHost:$elasticPort"))) - override val client = ElasticClient(pekkoClient) + def mkPekkoBasedClient(implicit executor: Executor[Future]): ElasticClient[Future] = + ElasticClient(pekkoClient) + + override lazy val client = mkPekkoBasedClient "PekkoHttpClient" should "support utf-8" in { @@ -106,14 +109,12 @@ class PekkoHttpClientTest extends AnyFlatSpec with Matchers with DockerTests wit } it should "propagate headers if included" in { - implicit val executor: Executor[Future] = new Executor[Future] { - override def exec(client: HttpClient, request: ElasticRequest): Future[HttpResponse] = { - val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8)) - Executor.FutureExecutor.exec(client, request.copy(headers = Map("Authorization" -> s"Basic $cred"))) - } + implicit val executor: Executor[Future] = (client: HttpClient[Future], request: ElasticRequest) => { + val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8)) + client.send(request.copy(headers = Map("Authorization" -> s"Basic $cred"))) } - client.execute { + mkPekkoBasedClient.execute { catHealth() }.await.result.status shouldBe "401" } diff --git a/elastic4s-client-sniffed/src/main/scala/com/sksamuel/elastic4s/http/JavaClientSniffed.scala b/elastic4s-client-sniffed/src/main/scala/com/sksamuel/elastic4s/http/JavaClientSniffed.scala index 348842821..1f4b0caeb 100644 --- a/elastic4s-client-sniffed/src/main/scala/com/sksamuel/elastic4s/http/JavaClientSniffed.scala +++ b/elastic4s-client-sniffed/src/main/scala/com/sksamuel/elastic4s/http/JavaClientSniffed.scala @@ -8,6 +8,7 @@ import org.elasticsearch.client.RestClientBuilder.{HttpClientConfigCallback, Req import org.elasticsearch.client.sniff.{NodesSniffer, SniffOnFailureListener, Sniffer} import org.slf4j.{Logger, LoggerFactory} +import scala.concurrent.ExecutionContext import scala.concurrent.duration.{FiniteDuration, _} /** @@ -35,7 +36,7 @@ object JavaClientSniffed { * as the underlying client. Optional callbacks can be passed in to configure the client. * Sniffing is added by the [[SniffingConfiguration]] */ - def apply(props: ElasticProperties, sniffingConfiguration: SniffingConfiguration): JavaClient = + def apply(props: ElasticProperties, sniffingConfiguration: SniffingConfiguration)(implicit ec: ExecutionContext): JavaClient = apply(props, NoOpRequestConfigCallback, NoOpHttpClientConfigCallback, sniffingConfiguration) /** @@ -46,7 +47,7 @@ object JavaClientSniffed { def apply(props: ElasticProperties, requestConfigCallback: RequestConfigCallback, httpClientConfigCallback: HttpClientConfigCallback, - sniffingConfiguration: SniffingConfiguration): JavaClient = { + sniffingConfiguration: SniffingConfiguration)(implicit ec: ExecutionContext): JavaClient = { val hosts = props.endpoints.map { case ElasticNodeEndpoint(protocol, host, port, _) => new HttpHost(host, port, protocol) } diff --git a/elastic4s-client-sttp/src/main/scala/com/sksamuel/elastic4s/sttp/SttpRequestHttpClient.scala b/elastic4s-client-sttp/src/main/scala/com/sksamuel/elastic4s/sttp/SttpRequestHttpClient.scala index e2898bf80..cd270e6e4 100644 --- a/elastic4s-client-sttp/src/main/scala/com/sksamuel/elastic4s/sttp/SttpRequestHttpClient.scala +++ b/elastic4s-client-sttp/src/main/scala/com/sksamuel/elastic4s/sttp/SttpRequestHttpClient.scala @@ -2,27 +2,24 @@ package com.sksamuel.elastic4s.sttp import java.io._ import java.nio.file.Files - import com.sksamuel.elastic4s.HttpEntity.{ByteArrayEntity, FileEntity, InputStreamEntity, StringEntity} import com.sksamuel.elastic4s.{ElasticNodeEndpoint, ElasticRequest, ElasticsearchClientUri, HttpClient, HttpEntity, HttpResponse} import com.sksamuel.elastic4s.ext.OptionImplicits._ + import scala.concurrent.{ExecutionContext, Future} import scala.io.Source -import scala.util.{Failure, Success} import sttp.client3._ import sttp.client3.asynchttpclient.future.AsyncHttpClientFutureBackend import sttp.model.Uri import sttp.model.Uri.{PathSegments, QuerySegment} +import sttp.monad.{FutureMonad, MonadError} +import sttp.monad.syntax.MonadErrorOps -class SttpRequestHttpClient(nodeEndpoint: ElasticNodeEndpoint)( - implicit ec: ExecutionContext, sttpBackend: SttpBackend[Future, Any]) extends HttpClient { +import scala.language.higherKinds - /** Alternative constructor for backwards compatibility. */ - @deprecated("Use the constructor which takes an ElasticNodeEndpoint", "7.3.2") - def this(clientUri: ElasticsearchClientUri) = { - this(ElasticNodeEndpoint("http", clientUri.hosts.head._1, clientUri.hosts.head._2, None))( - SttpRequestHttpClient.defaultEc, SttpRequestHttpClient.defaultSttpBackend) - } +class SttpRequestHttpClient[F[_] : MonadError]( + nodeEndpoint: ElasticNodeEndpoint +)(implicit sttpBackend: SttpBackend[F, Any]) extends HttpClient[F] { private def request(method: String, endpoint: String, params: Map[String, Any], headers: Map[String, String]): Request[String, Any] = { val url = new Uri( @@ -68,35 +65,44 @@ class SttpRequestHttpClient(nodeEndpoint: ElasticNodeEndpoint)( } } - override def close(): Unit = sttpBackend.close() + override def close(): F[Unit] = sttpBackend.close() /** - * Sends the given request to elasticsearch. - * - * Implementations should invoke the callback function once the response is known. - * - * The callback function should be invoked with a HttpResponse for all requests that received - * a response, including 4xx and 5xx responses. The callback function should only be invoked - * with an exception if the client failed. - */ - override def send(request: ElasticRequest, callback: Either[Throwable, HttpResponse] => Unit): Unit = { + * Sends the given request to elasticsearch. + * + * Implementations should invoke the callback function once the response is known. + * + * The callback function should be invoked with a HttpResponse for all requests that received + * a response, including 4xx and 5xx responses. The callback function should only be invoked + * with an exception if the client failed. + */ + override def send(request: ElasticRequest): F[HttpResponse] = { val f = request.entity match { case Some(entity) => async(request.method, request.endpoint, request.params, request.headers, entity).send(sttpBackend) case None => async(request.method, request.endpoint, request.params, request.headers).send(sttpBackend) } - f.onComplete { - case Success(resp) => callback(Right(processResponse(resp))) - case Failure(t) => callback(Left(t)) - } + + f.map(processResponse) } } object SttpRequestHttpClient { - private def defaultEc: ExecutionContext = ExecutionContext.global - private def defaultSttpBackend: SttpBackend[Future, Any] = AsyncHttpClientFutureBackend() + private implicit def futureMonad(implicit ec: ExecutionContext): MonadError[Future] = new FutureMonad() + + private implicit def defaultSttpBackend: SttpBackend[Future, Any] = AsyncHttpClientFutureBackend() + + /** Alternative constructor for backwards compatibility. */ + @deprecated("Use the constructor which takes an ElasticNodeEndpoint", "7.3.2") + def apply(clientUri: ElasticsearchClientUri)(implicit ec: ExecutionContext): SttpRequestHttpClient[Future] = { + val endpoint = ElasticNodeEndpoint("http", clientUri.hosts.head._1, clientUri.hosts.head._2, None) + implicit val futureMonad: MonadError[Future] = SttpRequestHttpClient.futureMonad + + new SttpRequestHttpClient[Future](endpoint) + } /** Instantiate an [[SttpRequestHttpClient]] with reasonable defaults for the implicit parameters. */ - def apply(nodeEndpoint: ElasticNodeEndpoint): SttpRequestHttpClient = new SttpRequestHttpClient(nodeEndpoint)( - defaultEc, defaultSttpBackend) + def apply(nodeEndpoint: ElasticNodeEndpoint)(implicit ec: ExecutionContext): SttpRequestHttpClient[Future] = { + new SttpRequestHttpClient(nodeEndpoint) + } } diff --git a/elastic4s-client-sttp/src/test/scala/com/sksamuel/elastic4s/sttp/SttpRequestHttpClientTest.scala b/elastic4s-client-sttp/src/test/scala/com/sksamuel/elastic4s/sttp/SttpRequestHttpClientTest.scala index e75244f52..e66ee3667 100644 --- a/elastic4s-client-sttp/src/test/scala/com/sksamuel/elastic4s/sttp/SttpRequestHttpClientTest.scala +++ b/elastic4s-client-sttp/src/test/scala/com/sksamuel/elastic4s/sttp/SttpRequestHttpClientTest.scala @@ -7,20 +7,18 @@ import org.scalatest.matchers.should.Matchers import java.nio.charset.StandardCharsets import java.util.Base64 +import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future class SttpRequestHttpClientTest extends AnyFlatSpec with Matchers with DockerTests { + implicit val executor: Executor[Future] = (client: HttpClient[Future], request: ElasticRequest) => { + val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8)) + client.send(request.copy(headers = Map("Authorization" -> s"Basic $cred"))) + } private lazy val sttpClient = SttpRequestHttpClient(ElasticNodeEndpoint("http", elasticHost, elasticPort.toInt, None)) - override val client = ElasticClient(sttpClient) + override lazy val client = ElasticClient(sttpClient) "SttpRequestHttpClient" should "propagate headers if included" in { - implicit val executor: Executor[Future] = new Executor[Future] { - override def exec(client: HttpClient, request: ElasticRequest): Future[HttpResponse] = { - val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8)) - Executor.FutureExecutor.exec(client, request.copy(headers = Map("Authorization" -> s"Basic $cred"))) - } - } - client.execute { catHealth() }.await.result.status shouldBe "401" diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/ElasticClient.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/ElasticClient.scala index cf2888cfe..38bb25238 100644 --- a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/ElasticClient.scala +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/ElasticClient.scala @@ -1,6 +1,5 @@ package com.sksamuel.elastic4s -import com.fasterxml.jackson.module.scala.JavaTypeable import org.slf4j.{Logger, LoggerFactory} import scala.concurrent.duration.{Duration, _} @@ -16,7 +15,7 @@ import scala.language.higherKinds * * @param client the HTTP client library to use **/ -case class ElasticClient(client: HttpClient) extends AutoCloseable { +case class ElasticClient[F[_] : Executor : Functor](client: HttpClient[F]) { protected val logger: Logger = LoggerFactory.getLogger(getClass.getName) @@ -29,12 +28,7 @@ case class ElasticClient(client: HttpClient) extends AutoCloseable { // Executes the given request type T, and returns an effect of Response[U] // where U is particular to the request type. // For example a search request will return a Response[SearchResponse]. - def execute[T, U, F[_]](t: T)(implicit - executor: Executor[F], - functor: Functor[F], - handler: Handler[T, U], - javaTypeable: JavaTypeable[U], - options: CommonRequestOptions): F[Response[U]] = { + def execute[T, U](t: T)(implicit handler: Handler[T, U], options: CommonRequestOptions): F[Response[U]] = { val request = handler.build(t) val request2 = if (options.timeout.toMillis > 0) { @@ -51,8 +45,8 @@ case class ElasticClient(client: HttpClient) extends AutoCloseable { val request4 = options.headers.foldLeft(request3){ case (acc, (key, value)) => acc.addHeader(key, value) } - val f = executor.exec(client, request4) - functor.map(f) { resp => + val f = Executor[F].exec(client, request4) + Functor[F].map(f) { resp => handler.responseHandler.handle(resp) match { case Right(u) => RequestSuccess(resp.statusCode, resp.entity.map(_.content), resp.headers, u) case Left(error) => RequestFailure(resp.statusCode, resp.entity.map(_.content), resp.headers, error) @@ -61,7 +55,7 @@ case class ElasticClient(client: HttpClient) extends AutoCloseable { } - def close(): Unit = client.close() + def close(): F[Unit] = client.close() } case class CommonRequestOptions(timeout: Duration, masterNodeTimeout: Duration, headers: Map[String, String] = Map.empty) diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/Executor.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/Executor.scala index abf1c6169..23c5a1f39 100644 --- a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/Executor.scala +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/Executor.scala @@ -1,26 +1,14 @@ package com.sksamuel.elastic4s -import scala.concurrent.{ExecutionContext, Future, Promise} import scala.language.higherKinds trait Executor[F[_]] { - def exec(client: HttpClient, request: ElasticRequest): F[HttpResponse] + def exec(client: HttpClient[F], request: ElasticRequest): F[HttpResponse] } object Executor { + def apply[F[_]](implicit ev: Executor[F]): Executor[F] = ev - def apply[F[_]: Executor](): Executor[F] = implicitly[Executor[F]] - - implicit def FutureExecutor(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): Executor[Future] = - new Executor[Future] { - override def exec(client: HttpClient, request: ElasticRequest): Future[HttpResponse] = { - val promise = Promise[HttpResponse]() - val callback: Either[Throwable, HttpResponse] => Unit = { - case Left(t) => promise.tryFailure(t) - case Right(r) => promise.trySuccess(r) - } - client.send(request, callback) - promise.future - } - } + implicit def defaultExecutor[F[_]]: Executor[F] = + (client: HttpClient[F], request: ElasticRequest) => client.send(request) } diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/Functor.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/Functor.scala index b496d6f12..abdafa068 100644 --- a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/Functor.scala +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/Functor.scala @@ -9,7 +9,7 @@ trait Functor[F[_]] { object Functor { - def apply[F[_]: Functor](): Functor[F] = implicitly[Functor[F]] + def apply[F[_]](implicit f: Functor[F]): Functor[F] = f implicit def FutureFunctor(implicit ec: ExecutionContext = ExecutionContext.Implicits.global): Functor[Future] = new Functor[Future] { diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/HttpClient.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/HttpClient.scala index 127d1e8a6..18f4a5619 100644 --- a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/HttpClient.scala +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/HttpClient.scala @@ -2,6 +2,8 @@ package com.sksamuel.elastic4s import org.slf4j.{Logger, LoggerFactory} +import scala.language.higherKinds + /** * A typeclass that an underlying http client can implement, so that it can be used * by the [[ElasticClient]] implementation by elastic4s. @@ -10,7 +12,7 @@ import org.slf4j.{Logger, LoggerFactory} * Akka HTTP client, STTP or whatever can be used with elasticsearch. * The wrapped client can then be passed into the ElasticClient. */ -trait HttpClient extends AutoCloseable { +trait HttpClient[F[_]] { protected val logger: Logger = LoggerFactory.getLogger(getClass.getName) @@ -23,13 +25,14 @@ trait HttpClient extends AutoCloseable { * a response, including 4xx and 5xx responses. The callback function should only be invoked * with an exception if the client could not complete the request. */ - def send(request: ElasticRequest, callback: Either[Throwable, HttpResponse] => Unit): Unit + def send(request: ElasticRequest): F[HttpResponse] /** * Closes the underlying http client. Can be a no-op if the underlying client does not have * state that needs to be closed. */ - def close(): Unit + def close(): F[Unit] + } diff --git a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchIterator.scala b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchIterator.scala index a03f7a225..9ef23dde6 100644 --- a/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchIterator.scala +++ b/elastic4s-core/src/main/scala/com/sksamuel/elastic4s/requests/searches/SearchIterator.scala @@ -2,7 +2,7 @@ package com.sksamuel.elastic4s.requests.searches import com.sksamuel.elastic4s.{ElasticClient, HitReader, RequestFailure, RequestSuccess} -import scala.concurrent.Await +import scala.concurrent.{Await, Future} import scala.concurrent.duration.Duration import scala.language.higherKinds @@ -13,16 +13,12 @@ import scala.language.higherKinds * Each time the iterator needs to request more data, the iterator will block until the request * returns. If you require a completely lazy style iterator, consider using reactive streams. */ -trait Awaitable[F[_]] { - def result[U](f: F[U], timeout: Duration): U -} - object SearchIterator { /** * Creates a new Iterator for instances of SearchHit by wrapping the given HTTP client. */ - def hits(client: ElasticClient, searchreq: SearchRequest)(implicit timeout: Duration): Iterator[SearchHit] = + def hits(client: ElasticClient[Future], searchreq: SearchRequest)(implicit timeout: Duration): Iterator[SearchHit] = new Iterator[SearchHit] { require(searchreq.keepAlive.isDefined, "Search request must define keep alive value") @@ -64,7 +60,7 @@ object SearchIterator { * A typeclass HitReader[T] must be provided for marshalling of the search * responses into instances of type T. */ - def iterate[T](client: ElasticClient, searchreq: SearchRequest)(implicit + def iterate[T](client: ElasticClient[Future], searchreq: SearchRequest)(implicit reader: HitReader[T], timeout: Duration): Iterator[T] = hits(client, searchreq).map(_.to[T]) diff --git a/elastic4s-effect-cats-2/src/main/scala/com/sksamuel/elastic4s/cats/effect/CatsEffectExecutor.scala b/elastic4s-effect-cats-2/src/main/scala/com/sksamuel/elastic4s/cats/effect/CatsEffectExecutor.scala deleted file mode 100644 index 90028c564..000000000 --- a/elastic4s-effect-cats-2/src/main/scala/com/sksamuel/elastic4s/cats/effect/CatsEffectExecutor.scala +++ /dev/null @@ -1,12 +0,0 @@ -package com.sksamuel.elastic4s.cats.effect - -import cats.effect.{Async, IO} -import com.sksamuel.elastic4s.{ElasticRequest, Executor, HttpClient, HttpResponse} - -class CatsEffectExecutor[F[_]: Async] extends Executor[F] { - override def exec(client: HttpClient, request: ElasticRequest): F[HttpResponse] = - Async[F].async[HttpResponse](k => client.send(request, k)) -} - -@deprecated("Use CatsEffectExecutor[IO] instead") -class IOExecutor extends CatsEffectExecutor[IO] diff --git a/elastic4s-effect-cats-2/src/main/scala/com/sksamuel/elastic4s/cats/effect/instances/CatsEffectInstances.scala b/elastic4s-effect-cats-2/src/main/scala/com/sksamuel/elastic4s/cats/effect/instances/CatsEffectInstances.scala index 67125cc63..6232730b9 100644 --- a/elastic4s-effect-cats-2/src/main/scala/com/sksamuel/elastic4s/cats/effect/instances/CatsEffectInstances.scala +++ b/elastic4s-effect-cats-2/src/main/scala/com/sksamuel/elastic4s/cats/effect/instances/CatsEffectInstances.scala @@ -1,20 +1,14 @@ package com.sksamuel.elastic4s.cats.effect.instances -import cats.effect.{Async, IO} import cats.{Functor => CatsFunctor} -import com.sksamuel.elastic4s.cats.effect.CatsEffectExecutor -import com.sksamuel.elastic4s.{Executor, Functor} +import com.sksamuel.elastic4s.Functor + +import scala.language.higherKinds trait CatsEffectInstances { implicit def catsFunctor[F[_]: CatsFunctor]: Functor[F] = new Functor[F] { override def map[A, B](fa: F[A])(f: A => B): F[B] = CatsFunctor[F].map(fa)(f) } - - implicit def catsEffectExecutor[F[_]: Async]: Executor[F] = - new CatsEffectExecutor[F] - - //this needs to be at the bottom - implicit val ioExecutor: Executor[IO] = new CatsEffectExecutor[IO] } @deprecated("Use CatsEffectInstances instead") diff --git a/elastic4s-effect-cats-2/src/test/scala/com.sksamuel.elastic4s.cats.effect/CatsEffectTest.scala b/elastic4s-effect-cats-2/src/test/scala/com.sksamuel.elastic4s.cats.effect/CatsEffectTest.scala index 0ab10951b..52f08b64e 100644 --- a/elastic4s-effect-cats-2/src/test/scala/com.sksamuel.elastic4s.cats.effect/CatsEffectTest.scala +++ b/elastic4s-effect-cats-2/src/test/scala/com.sksamuel.elastic4s.cats.effect/CatsEffectTest.scala @@ -1,7 +1,7 @@ package com.sksamuel.elastic4s.cats.effect import cats.data.OptionT -import cats.effect.IO +import cats.effect.{ContextShift, IO} import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties} import com.sksamuel.elastic4s.ElasticDsl._ import org.scalatest.flatspec.AnyFlatSpec @@ -16,32 +16,34 @@ import scala.concurrent.ExecutionContext.Implicits.global class CatsEffectTest extends AnyFlatSpec { type OptionIO[A] = OptionT[IO, A] - val client: ElasticClient = ElasticClient(JavaClient(ElasticProperties("http://dummy"))) + val client: ElasticClient[Future] = ElasticClient(JavaClient(ElasticProperties("http://dummy"))) val index = "index" "ElasticClient#execute" should "compile and infer effect type as `IO`" in { + implicit val cs: ContextShift[IO] = IO.contextShift(global) for { - r1 <- client.execute(createIndex(index)) + r1 <- IO.fromFuture(IO(client.execute(createIndex(index)))) _ <- IO(println(r1)) - r2 <- client.execute(deleteIndex(index)) + r2 <- IO.fromFuture(IO(client.execute(deleteIndex(index)))) _ <- IO(println(r2)) } yield (r1, r2) } - it should "still compile with other Cats `Async` instances with explicit type annotations" in { - for { - r1 <- client.execute[CreateIndexRequest, CreateIndexResponse, OptionIO](createIndex(index)) - _ <- IO(println(r1)).to[OptionIO] - r2 <- client.execute[DeleteIndexRequest, DeleteIndexResponse, OptionIO](deleteIndex(index)) - _ <- IO(println(r2)).to[OptionIO] - } yield (r1, r2) - } + // TODO: for this to work, http4s client needs to be merged which can work with attributary effects + // it should "still compile with other Cats `Async` instances with explicit type annotations" in { + // for { + // r1 <- client.execute[CreateIndexRequest, CreateIndexResponse, OptionIO](createIndex(index)) + // _ <- IO(println(r1)).to[OptionIO] + // r2 <- client.execute[DeleteIndexRequest, DeleteIndexResponse, OptionIO](deleteIndex(index)) + // _ <- IO(println(r2)).to[OptionIO] + // } yield (r1, r2) + // } it should "still compile with `Future` with explicit type annotations" in { for { - r1 <- client.execute[CreateIndexRequest, CreateIndexResponse, Future](createIndex(index)) + r1 <- client.execute(createIndex(index)) _ <- Future(println(r1)) - r2 <- client.execute[DeleteIndexRequest, DeleteIndexResponse, Future](deleteIndex(index)) + r2 <- client.execute(deleteIndex(index)) _ <- Future(println(r2)) } yield (r1, r2) } diff --git a/elastic4s-effect-cats/src/main/scala/com/sksamuel/elastic4s/cats/effect/CatsEffectExecutor.scala b/elastic4s-effect-cats/src/main/scala/com/sksamuel/elastic4s/cats/effect/CatsEffectExecutor.scala deleted file mode 100644 index b2249700b..000000000 --- a/elastic4s-effect-cats/src/main/scala/com/sksamuel/elastic4s/cats/effect/CatsEffectExecutor.scala +++ /dev/null @@ -1,15 +0,0 @@ -package com.sksamuel.elastic4s.cats.effect - -import cats.effect.{Async, IO} -import com.sksamuel.elastic4s.{ElasticRequest, Executor, HttpClient, HttpResponse} - -class CatsEffectExecutor[F[_]: Async] extends Executor[F] { - override def exec(client: HttpClient, request: ElasticRequest): F[HttpResponse] = - Async[F].async[HttpResponse] { k => - client.send(request, k) - Async[F].pure(Some(Async[F].unit)) - } -} - -@deprecated("Use CatsEffectExecutor[IO] instead") -class IOExecutor extends CatsEffectExecutor[IO] diff --git a/elastic4s-effect-cats/src/main/scala/com/sksamuel/elastic4s/cats/effect/instances/CatsEffectInstances.scala b/elastic4s-effect-cats/src/main/scala/com/sksamuel/elastic4s/cats/effect/instances/CatsEffectInstances.scala index 67125cc63..6232730b9 100644 --- a/elastic4s-effect-cats/src/main/scala/com/sksamuel/elastic4s/cats/effect/instances/CatsEffectInstances.scala +++ b/elastic4s-effect-cats/src/main/scala/com/sksamuel/elastic4s/cats/effect/instances/CatsEffectInstances.scala @@ -1,20 +1,14 @@ package com.sksamuel.elastic4s.cats.effect.instances -import cats.effect.{Async, IO} import cats.{Functor => CatsFunctor} -import com.sksamuel.elastic4s.cats.effect.CatsEffectExecutor -import com.sksamuel.elastic4s.{Executor, Functor} +import com.sksamuel.elastic4s.Functor + +import scala.language.higherKinds trait CatsEffectInstances { implicit def catsFunctor[F[_]: CatsFunctor]: Functor[F] = new Functor[F] { override def map[A, B](fa: F[A])(f: A => B): F[B] = CatsFunctor[F].map(fa)(f) } - - implicit def catsEffectExecutor[F[_]: Async]: Executor[F] = - new CatsEffectExecutor[F] - - //this needs to be at the bottom - implicit val ioExecutor: Executor[IO] = new CatsEffectExecutor[IO] } @deprecated("Use CatsEffectInstances instead") diff --git a/elastic4s-effect-cats/src/test/scala/com.sksamuel.elastic4s.cats.effect/CatsEffectTest.scala b/elastic4s-effect-cats/src/test/scala/com.sksamuel.elastic4s.cats.effect/CatsEffectTest.scala index 0ab10951b..36c6b61a4 100644 --- a/elastic4s-effect-cats/src/test/scala/com.sksamuel.elastic4s.cats.effect/CatsEffectTest.scala +++ b/elastic4s-effect-cats/src/test/scala/com.sksamuel.elastic4s.cats.effect/CatsEffectTest.scala @@ -16,32 +16,33 @@ import scala.concurrent.ExecutionContext.Implicits.global class CatsEffectTest extends AnyFlatSpec { type OptionIO[A] = OptionT[IO, A] - val client: ElasticClient = ElasticClient(JavaClient(ElasticProperties("http://dummy"))) + val client: ElasticClient[Future] = ElasticClient(JavaClient(ElasticProperties("http://dummy"))) val index = "index" "ElasticClient#execute" should "compile and infer effect type as `IO`" in { for { - r1 <- client.execute(createIndex(index)) - _ <- IO(println(r1)) - r2 <- client.execute(deleteIndex(index)) - _ <- IO(println(r2)) + r1 <- IO.fromFuture(IO(client.execute(createIndex(index)))) + _ <- IO.println(r1) + r2 <- IO.fromFuture(IO(client.execute(deleteIndex(index)))) + _ <- IO.println(r2) } yield (r1, r2) } - it should "still compile with other Cats `Async` instances with explicit type annotations" in { - for { - r1 <- client.execute[CreateIndexRequest, CreateIndexResponse, OptionIO](createIndex(index)) - _ <- IO(println(r1)).to[OptionIO] - r2 <- client.execute[DeleteIndexRequest, DeleteIndexResponse, OptionIO](deleteIndex(index)) - _ <- IO(println(r2)).to[OptionIO] - } yield (r1, r2) - } + // TODO: for this to work, http4s client needs to be merged which can work with attributary effects + // it should "still compile with other Cats `Async` instances with explicit type annotations" in { + // for { + // r1 <- client.execute[CreateIndexRequest, CreateIndexResponse, OptionIO](createIndex(index)) + // _ <- IO.println(r1).to[OptionIO] + // r2 <- client.execute[DeleteIndexRequest, DeleteIndexResponse, OptionIO](deleteIndex(index)) + // _ <- IO.println(r2).to[OptionIO] + // } yield (r1, r2) + // } it should "still compile with `Future` with explicit type annotations" in { for { - r1 <- client.execute[CreateIndexRequest, CreateIndexResponse, Future](createIndex(index)) + r1 <- client.execute(createIndex(index)) _ <- Future(println(r1)) - r2 <- client.execute[DeleteIndexRequest, DeleteIndexResponse, Future](deleteIndex(index)) + r2 <- client.execute(deleteIndex(index)) _ <- Future(println(r2)) } yield (r1, r2) } diff --git a/elastic4s-effect-monix/src/main/scala/com/sksamuel/elastic4s/monix/TaskExecutor.scala b/elastic4s-effect-monix/src/main/scala/com/sksamuel/elastic4s/monix/TaskExecutor.scala deleted file mode 100644 index e2cf2041e..000000000 --- a/elastic4s-effect-monix/src/main/scala/com/sksamuel/elastic4s/monix/TaskExecutor.scala +++ /dev/null @@ -1,9 +0,0 @@ -package com.sksamuel.elastic4s.monix - -import com.sksamuel.elastic4s.{ElasticRequest, Executor, HttpClient, HttpResponse} -import monix.eval.Task - -class TaskExecutor extends Executor[Task] { - override def exec(client: HttpClient, request: ElasticRequest): Task[HttpResponse] = - Task.async (k => client.send(request, k)) -} diff --git a/elastic4s-effect-monix/src/main/scala/com/sksamuel/elastic4s/monix/instances/TaskInstances.scala b/elastic4s-effect-monix/src/main/scala/com/sksamuel/elastic4s/monix/instances/TaskInstances.scala index 57b184d7b..8161c820a 100644 --- a/elastic4s-effect-monix/src/main/scala/com/sksamuel/elastic4s/monix/instances/TaskInstances.scala +++ b/elastic4s-effect-monix/src/main/scala/com/sksamuel/elastic4s/monix/instances/TaskInstances.scala @@ -1,13 +1,10 @@ package com.sksamuel.elastic4s.monix.instances import com.sksamuel.elastic4s.Functor -import com.sksamuel.elastic4s.monix.TaskExecutor import monix.eval.Task trait TaskInstances { implicit val taskFunctor: Functor[Task] = new Functor[Task] { override def map[A, B](fa: Task[A])(f: A => B): Task[B] = fa.map(f) } - - implicit val taskExecutor: TaskExecutor = new TaskExecutor } diff --git a/elastic4s-effect-scalaz/src/main/scala/com/sksamuel/elastic4s/scalaz/TaskExecutor.scala b/elastic4s-effect-scalaz/src/main/scala/com/sksamuel/elastic4s/scalaz/TaskExecutor.scala deleted file mode 100644 index b35a6794f..000000000 --- a/elastic4s-effect-scalaz/src/main/scala/com/sksamuel/elastic4s/scalaz/TaskExecutor.scala +++ /dev/null @@ -1,14 +0,0 @@ -package com.sksamuel.elastic4s.scalaz - -import com.sksamuel.elastic4s.{ElasticRequest, Executor, HttpClient, HttpResponse} -import scalaz.\/ -import scalaz.concurrent.Task - -class TaskExecutor extends Executor[Task] { - override def exec(client: HttpClient, request: ElasticRequest): Task[HttpResponse] = - Task.async { k => - client.send(request, { j => - k(\/.fromEither(j)) - }) - } -} diff --git a/elastic4s-effect-scalaz/src/main/scala/com/sksamuel/elastic4s/scalaz/instances/TaskInstances.scala b/elastic4s-effect-scalaz/src/main/scala/com/sksamuel/elastic4s/scalaz/instances/TaskInstances.scala index f4026fc89..acd370678 100644 --- a/elastic4s-effect-scalaz/src/main/scala/com/sksamuel/elastic4s/scalaz/instances/TaskInstances.scala +++ b/elastic4s-effect-scalaz/src/main/scala/com/sksamuel/elastic4s/scalaz/instances/TaskInstances.scala @@ -1,13 +1,10 @@ package com.sksamuel.elastic4s.scalaz.instances import com.sksamuel.elastic4s.Functor -import com.sksamuel.elastic4s.scalaz.TaskExecutor import scalaz.concurrent.Task trait TaskInstances { implicit val taskFunctor: Functor[Task] = new Functor[Task] { override def map[A, B](fa: Task[A])(f: A => B): Task[B] = fa.map(f) } - - implicit val taskExecutor: TaskExecutor = new TaskExecutor } diff --git a/elastic4s-effect-zio-1/src/main/scala/com/sksamuel/elastic4s/zio/instances/TaskInstances.scala b/elastic4s-effect-zio-1/src/main/scala/com/sksamuel/elastic4s/zio/instances/TaskInstances.scala index e7d31f526..02438d8d3 100644 --- a/elastic4s-effect-zio-1/src/main/scala/com/sksamuel/elastic4s/zio/instances/TaskInstances.scala +++ b/elastic4s-effect-zio-1/src/main/scala/com/sksamuel/elastic4s/zio/instances/TaskInstances.scala @@ -1,17 +1,10 @@ package com.sksamuel.elastic4s.zio.instances -import com.sksamuel.elastic4s.{ElasticRequest, Executor, Functor, HttpClient, HttpResponse} +import com.sksamuel.elastic4s.Functor import zio.Task trait TaskInstances { implicit val taskFunctor: Functor[Task] = new Functor[Task] { override def map[A, B](fa: Task[A])(f: A => B): Task[B] = fa.map(f) } - - implicit val taskExecutor: Executor[Task] = new Executor[Task] { - override def exec(client: HttpClient, request: ElasticRequest): Task[HttpResponse] = - Task.effectAsyncM { cb => - Task.effect(client.send(request, v => cb(Task.fromEither(v)))) - } - } } diff --git a/elastic4s-effect-zio-1/src/test/scala/com/sksamuel/elastic4s/zio/ZIOTaskTest.scala b/elastic4s-effect-zio-1/src/test/scala/com/sksamuel/elastic4s/zio/ZIOTaskTest.scala index 5e596e3dc..740c4ded1 100644 --- a/elastic4s-effect-zio-1/src/test/scala/com/sksamuel/elastic4s/zio/ZIOTaskTest.scala +++ b/elastic4s-effect-zio-1/src/test/scala/com/sksamuel/elastic4s/zio/ZIOTaskTest.scala @@ -1,13 +1,27 @@ package com.sksamuel.elastic4s.zio +import com.sksamuel.elastic4s.{ElasticClient, ElasticNodeEndpoint} +import com.sksamuel.elastic4s.sttp.SttpRequestHttpClient import com.sksamuel.elastic4s.testkit.DockerTests import com.sksamuel.elastic4s.zio.instances._ import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import sttp.client3.SttpBackend +import sttp.client3.httpclient.zio.HttpClientZioBackend +import sttp.client3.impl.zio.RIOMonadAsyncError +import sttp.monad.MonadError import zio.Task class ZIOTaskTest extends AnyFlatSpec with Matchers with DockerTests with BeforeAndAfterAll { + private implicit val sttpZioMonadError: MonadError[Task] = new RIOMonadAsyncError() + private implicit val sttpBackend: SttpBackend[Task, Any] = + HttpClientZioBackend().unsafeRun.fold(throw _, identity) + + private lazy val sttpClient: SttpRequestHttpClient[Task] = + new SttpRequestHttpClient(ElasticNodeEndpoint("http", elasticHost, elasticPort.toInt, None)) + + val zioClient: ElasticClient[Task] = ElasticClient(sttpClient) implicit class RichZIO[A](task: Task[A]) { def unsafeRun: Either[Throwable, A] = { @@ -16,19 +30,19 @@ class ZIOTaskTest extends AnyFlatSpec with Matchers with DockerTests with Before } override def beforeAll(): Unit = { - client.execute { + zioClient.execute { deleteIndex("testindex") }.unsafeRun } override def afterAll(): Unit = { - client.execute { + zioClient.execute { deleteIndex("testindex") }.unsafeRun } it should "index doc successfully" in { - val r = client.execute { + val r = zioClient.execute { indexInto("testindex").doc("""{ "text":"Buna ziua!" }""") }.unsafeRun r shouldBe Symbol("right") diff --git a/elastic4s-effect-zio/src/main/scala/com/sksamuel/elastic4s/zio/instances/TaskInstances.scala b/elastic4s-effect-zio/src/main/scala/com/sksamuel/elastic4s/zio/instances/TaskInstances.scala index b7daa87d4..02438d8d3 100644 --- a/elastic4s-effect-zio/src/main/scala/com/sksamuel/elastic4s/zio/instances/TaskInstances.scala +++ b/elastic4s-effect-zio/src/main/scala/com/sksamuel/elastic4s/zio/instances/TaskInstances.scala @@ -1,17 +1,10 @@ package com.sksamuel.elastic4s.zio.instances -import com.sksamuel.elastic4s.{ElasticRequest, Executor, Functor, HttpClient, HttpResponse} -import zio.{Task, ZIO} +import com.sksamuel.elastic4s.Functor +import zio.Task trait TaskInstances { implicit val taskFunctor: Functor[Task] = new Functor[Task] { override def map[A, B](fa: Task[A])(f: A => B): Task[B] = fa.map(f) } - - implicit val taskExecutor: Executor[Task] = new Executor[Task] { - override def exec(client: HttpClient, request: ElasticRequest): Task[HttpResponse] = - ZIO.asyncZIO { cb => - ZIO.attempt(client.send(request, v => cb(ZIO.fromEither(v)))) - } - } } diff --git a/elastic4s-effect-zio/src/test/scala/com/sksamuel/elastic4s/zio/ZIOTaskTest.scala b/elastic4s-effect-zio/src/test/scala/com/sksamuel/elastic4s/zio/ZIOTaskTest.scala index 1b1cec7cf..816ddf686 100644 --- a/elastic4s-effect-zio/src/test/scala/com/sksamuel/elastic4s/zio/ZIOTaskTest.scala +++ b/elastic4s-effect-zio/src/test/scala/com/sksamuel/elastic4s/zio/ZIOTaskTest.scala @@ -1,13 +1,27 @@ package com.sksamuel.elastic4s.zio +import com.sksamuel.elastic4s.{ElasticClient, ElasticNodeEndpoint} +import com.sksamuel.elastic4s.sttp.SttpRequestHttpClient import com.sksamuel.elastic4s.testkit.DockerTests import com.sksamuel.elastic4s.zio.instances._ import org.scalatest.BeforeAndAfterAll import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import sttp.client3.SttpBackend +import sttp.client3.httpclient.zio.HttpClientZioBackend +import sttp.client3.impl.zio.RIOMonadAsyncError +import sttp.monad.MonadError import zio.{Task, Unsafe} class ZIOTaskTest extends AnyFlatSpec with Matchers with DockerTests with BeforeAndAfterAll { + private implicit val sttpZioMonadError: MonadError[Task] = new RIOMonadAsyncError() + private implicit val sttpBackend: SttpBackend[Task, Any] = + HttpClientZioBackend().unsafeRun.fold(throw _, identity) + + private lazy val sttpClient: SttpRequestHttpClient[Task] = + new SttpRequestHttpClient(ElasticNodeEndpoint("http", elasticHost, elasticPort.toInt, None)) + + val zioClient: ElasticClient[Task] = ElasticClient(sttpClient) implicit class RichZIO[A](task: Task[A]) { def unsafeRun: Either[Throwable, A] = { @@ -18,19 +32,19 @@ class ZIOTaskTest extends AnyFlatSpec with Matchers with DockerTests with Before } override def beforeAll(): Unit = { - client.execute { + zioClient.execute { deleteIndex("testindex") }.unsafeRun } override def afterAll(): Unit = { - client.execute { + zioClient.execute { deleteIndex("testindex") }.unsafeRun } it should "index doc successfully" in { - val r = client.execute { + val r = zioClient.execute { indexInto("testindex").doc("""{ "text":"Buna ziua!" }""") }.unsafeRun r shouldBe Symbol("right") diff --git a/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/BulkIndexingSubscriber.scala b/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/BulkIndexingSubscriber.scala index 00464e049..bbd8dbcf3 100644 --- a/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/BulkIndexingSubscriber.scala +++ b/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/BulkIndexingSubscriber.scala @@ -7,6 +7,7 @@ import com.sksamuel.elastic4s.{ElasticClient, RequestFailure, RequestSuccess} import org.reactivestreams.{Subscriber, Subscription} import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Future import scala.concurrent.duration._ import scala.language.higherKinds import scala.util.{Failure, Success} @@ -24,7 +25,7 @@ import scala.util.{Failure, Success} * @tparam T the type of element provided by the publisher this subscriber will subscribe with */ class BulkIndexingSubscriber[T] private[streams] ( - client: ElasticClient, + client: ElasticClient[Future], builder: RequestBuilder[T], config: SubscriberConfig[T] )(implicit actorRefFactory: ActorRefFactory) @@ -78,7 +79,7 @@ object BulkActor { } -class BulkActor[T](client: ElasticClient, +class BulkActor[T](client: ElasticClient[Future], subscription: Subscription, builder: RequestBuilder[T], config: SubscriberConfig[T]) diff --git a/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ReactiveElastic.scala b/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ReactiveElastic.scala index 18351273e..d38613b0a 100644 --- a/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ReactiveElastic.scala +++ b/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ReactiveElastic.scala @@ -4,12 +4,13 @@ import akka.actor.ActorRefFactory import com.sksamuel.elastic4s.requests.searches.SearchRequest import com.sksamuel.elastic4s.{ElasticClient, IndexesAndTypes} +import scala.concurrent.Future import scala.concurrent.duration._ import scala.language.implicitConversions object ReactiveElastic { - implicit class ReactiveElastic(client: ElasticClient) { + implicit class ReactiveElastic(client: ElasticClient[Future]) { import com.sksamuel.elastic4s.ElasticDsl._ diff --git a/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ScrollPublisher.scala b/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ScrollPublisher.scala index a21e10da0..79379e1fb 100644 --- a/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ScrollPublisher.scala +++ b/elastic4s-http-streams/src/main/scala/com/sksamuel/elastic4s/streams/ScrollPublisher.scala @@ -9,6 +9,7 @@ import org.reactivestreams.{Publisher, Subscriber, Subscription} import org.slf4j.{Logger, LoggerFactory} import scala.collection.mutable +import scala.concurrent.Future import scala.util.{Failure, Success} /** @@ -21,7 +22,7 @@ import scala.util.{Failure, Success} * @param maxItems the maximum number of elements to return * @param actorRefFactory an Actor reference factory required by the publisher */ -class ScrollPublisher private[streams] (client: ElasticClient, search: SearchRequest, maxItems: Long)( +class ScrollPublisher private[streams] (client: ElasticClient[Future], search: SearchRequest, maxItems: Long)( implicit actorRefFactory: ActorRefFactory ) extends Publisher[SearchHit] { require(search.keepAlive.isDefined, "Search Definition must have a scroll to be used as Publisher") @@ -38,7 +39,7 @@ class ScrollPublisher private[streams] (client: ElasticClient, search: SearchReq } } -class ScrollSubscription(client: ElasticClient, query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long)( +class ScrollSubscription(client: ElasticClient[Future], query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long)( implicit actorRefFactory: ActorRefFactory ) extends Subscription { @@ -66,7 +67,7 @@ object PublishActor { case class Request(n: Long) } -class PublishActor(client: ElasticClient, query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long) +class PublishActor(client: ElasticClient[Future], query: SearchRequest, s: Subscriber[_ >: SearchHit], max: Long) extends Actor with Stash { diff --git a/elastic4s-streams-akka/src/main/scala/com/sksamuel/elastic4s/akka/streams/BatchElasticSink.scala b/elastic4s-streams-akka/src/main/scala/com/sksamuel/elastic4s/akka/streams/BatchElasticSink.scala index fbb10b6e8..fed3e9425 100644 --- a/elastic4s-streams-akka/src/main/scala/com/sksamuel/elastic4s/akka/streams/BatchElasticSink.scala +++ b/elastic4s-streams-akka/src/main/scala/com/sksamuel/elastic4s/akka/streams/BatchElasticSink.scala @@ -11,7 +11,7 @@ import scala.util.{ Failure, Success, Try } case class SinkSettings(refreshAfterOp: Boolean = false) -class BatchElasticSink[T](client: ElasticClient, settings: SinkSettings)(implicit +class BatchElasticSink[T](client: ElasticClient[Future], settings: SinkSettings)(implicit ec: ExecutionContext, builder: RequestBuilder[T]) extends GraphStage[SinkShape[Seq[T]]] { @@ -20,8 +20,6 @@ class BatchElasticSink[T](client: ElasticClient, settings: SinkSettings)(implici override val shape: SinkShape[Seq[T]] = SinkShape.of(in) private implicit val bulkHandler: BulkHandlers.BulkHandler.type = BulkHandlers.BulkHandler - private implicit val executor: Executor[Future] = Executor.FutureExecutor - private implicit val functor: Functor[Future] = Functor.FutureFunctor override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { diff --git a/elastic4s-streams-akka/src/main/scala/com/sksamuel/elastic4s/akka/streams/ElasticSource.scala b/elastic4s-streams-akka/src/main/scala/com/sksamuel/elastic4s/akka/streams/ElasticSource.scala index 4fd986990..29a01a6fe 100644 --- a/elastic4s-streams-akka/src/main/scala/com/sksamuel/elastic4s/akka/streams/ElasticSource.scala +++ b/elastic4s-streams-akka/src/main/scala/com/sksamuel/elastic4s/akka/streams/ElasticSource.scala @@ -18,7 +18,7 @@ import scala.util.{Failure, Success, Try} * @param client a client for the cluster * @param settings settings for how documents are queried */ -class ElasticSource(client: ElasticClient, settings: SourceSettings) +class ElasticSource(client: ElasticClient[Future], settings: SourceSettings) (implicit ec: ExecutionContext) extends GraphStage[SourceShape[SearchHit]] { require(settings.search.keepAlive.isDefined, "The SearchRequest must have a scroll defined (a keep alive time)") @@ -28,8 +28,6 @@ class ElasticSource(client: ElasticClient, settings: SourceSettings) private implicit val searchHandler: Handler[SearchRequest, SearchResponse] = SearchHandlers.SearchHandler private implicit val scrollHandler: Handler[SearchScrollRequest, SearchResponse] = SearchScrollHandlers.SearchScrollHandler private implicit val clearScrollHandler: Handler[ClearScrollRequest, ClearScrollResponse] = SearchScrollHandlers.ClearScrollHandler - private implicit val executor: Executor[Future] = Executor.FutureExecutor - private implicit val functor: Functor[Future] = Functor.FutureFunctor override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { diff --git a/elastic4s-streams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/streams/BatchElasticSink.scala b/elastic4s-streams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/streams/BatchElasticSink.scala index ba0a73f59..366fb8cfa 100644 --- a/elastic4s-streams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/streams/BatchElasticSink.scala +++ b/elastic4s-streams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/streams/BatchElasticSink.scala @@ -12,7 +12,7 @@ import scala.util.{Failure, Success, Try} case class SinkSettings(refreshAfterOp: Boolean = false) -class BatchElasticSink[T](client: ElasticClient, settings: SinkSettings)(implicit +class BatchElasticSink[T](client: ElasticClient[Future], settings: SinkSettings)(implicit ec: ExecutionContext, builder: RequestBuilder[T]) extends GraphStage[SinkShape[Seq[T]]] { @@ -21,8 +21,6 @@ class BatchElasticSink[T](client: ElasticClient, settings: SinkSettings)(implici override val shape: SinkShape[Seq[T]] = SinkShape.of(in) private implicit val bulkHandler: BulkHandlers.BulkHandler.type = BulkHandlers.BulkHandler - private implicit val executor: Executor[Future] = Executor.FutureExecutor - private implicit val functor: Functor[Future] = Functor.FutureFunctor override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { diff --git a/elastic4s-streams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/streams/ElasticSource.scala b/elastic4s-streams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/streams/ElasticSource.scala index be0d494a0..f036c8c34 100644 --- a/elastic4s-streams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/streams/ElasticSource.scala +++ b/elastic4s-streams-pekko/src/main/scala/com/sksamuel/elastic4s/pekko/streams/ElasticSource.scala @@ -18,7 +18,7 @@ import scala.util.{Failure, Success, Try} * @param client a client for the cluster * @param settings settings for how documents are queried */ -class ElasticSource(client: ElasticClient, settings: SourceSettings) +class ElasticSource(client: ElasticClient[Future], settings: SourceSettings) (implicit ec: ExecutionContext) extends GraphStage[SourceShape[SearchHit]] { require(settings.search.keepAlive.isDefined, "The SearchRequest must have a scroll defined (a keep alive time)") @@ -28,8 +28,6 @@ class ElasticSource(client: ElasticClient, settings: SourceSettings) private implicit val searchHandler: Handler[SearchRequest, SearchResponse] = SearchHandlers.SearchHandler private implicit val scrollHandler: Handler[SearchScrollRequest, SearchResponse] = SearchScrollHandlers.SearchScrollHandler private implicit val clearScrollHandler: Handler[ClearScrollRequest, ClearScrollResponse] = SearchScrollHandlers.ClearScrollHandler - private implicit val executor: Executor[Future] = Executor.FutureExecutor - private implicit val functor: Functor[Future] = Functor.FutureFunctor override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { diff --git a/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/ClientProvider.scala b/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/ClientProvider.scala index e616a9f1b..22dc73b0d 100644 --- a/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/ClientProvider.scala +++ b/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/ClientProvider.scala @@ -2,6 +2,8 @@ package com.sksamuel.elastic4s.testkit import com.sksamuel.elastic4s.ElasticClient +import scala.concurrent.Future + trait ClientProvider { - def client: ElasticClient + def client: ElasticClient[Future] } diff --git a/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/DockerTests.scala b/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/DockerTests.scala index 0800722fc..13d14febe 100644 --- a/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/DockerTests.scala +++ b/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/DockerTests.scala @@ -1,8 +1,10 @@ package com.sksamuel.elastic4s.testkit import com.sksamuel.elastic4s.http.JavaClient -import com.sksamuel.elastic4s.{ElasticClient, ElasticDsl, ElasticProperties} +import com.sksamuel.elastic4s.{ElasticClient, ElasticDsl, ElasticProperties, Executor} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future import scala.util.Try trait DockerTests extends ElasticDsl with ClientProvider { @@ -12,7 +14,12 @@ trait DockerTests extends ElasticDsl with ClientProvider { // use obscure ports for the tests to reduce the risk of interfering with existing elastic installations/containers "39227" ) - val client = ElasticClient(JavaClient(ElasticProperties(s"http://$elasticHost:$elasticPort"))) + + def mkJavaBasedClient(implicit executor: Executor[Future]): ElasticClient[Future] = + ElasticClient(JavaClient(ElasticProperties(s"http://$elasticHost:$elasticPort"))) + + // TODO: client is not closed, consider using beforeAll/afterAll to close it + lazy val client: ElasticClient[Future] = mkJavaBasedClient protected def deleteIdx(indexName: String): Unit = { Try { diff --git a/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/IndexMatchers.scala b/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/IndexMatchers.scala index 568ee02c0..a1a2722be 100644 --- a/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/IndexMatchers.scala +++ b/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/IndexMatchers.scala @@ -4,13 +4,15 @@ import com.sksamuel.elastic4s.{ElasticClient, HitReader} import org.scalatest.matchers.should.Matchers import org.scalatest.matchers.{MatchResult, Matcher} +import scala.concurrent.Future + trait IndexMatchers extends Matchers { import com.sksamuel.elastic4s.ElasticDsl._ import scala.concurrent.duration._ - def haveCount(expectedCount: Int)(implicit client: ElasticClient, + def haveCount(expectedCount: Int)(implicit client: ElasticClient[Future], timeout: FiniteDuration = 10.seconds): Matcher[String] = new Matcher[String] { @@ -25,7 +27,7 @@ trait IndexMatchers extends Matchers { } def haveDocument[T](expectedId: String, expectedDocument: T)( - implicit client: ElasticClient, + implicit client: ElasticClient[Future], hitReader: HitReader[T], timeout: FiniteDuration = 10.seconds ): Matcher[String] = { @@ -45,7 +47,7 @@ trait IndexMatchers extends Matchers { } } - def containDoc(expectedId: Any)(implicit client: ElasticClient, + def containDoc(expectedId: Any)(implicit client: ElasticClient[Future], timeout: FiniteDuration = 10.seconds): Matcher[String] = new Matcher[String] { @@ -59,7 +61,7 @@ trait IndexMatchers extends Matchers { } } - def beCreated(implicit client: ElasticClient, timeout: FiniteDuration = 10.seconds): Matcher[String] = + def beCreated(implicit client: ElasticClient[Future], timeout: FiniteDuration = 10.seconds): Matcher[String] = new Matcher[String] { override def apply(left: String): MatchResult = { @@ -72,7 +74,7 @@ trait IndexMatchers extends Matchers { } } - def beEmpty(implicit client: ElasticClient, timeout: FiniteDuration = 10.seconds): Matcher[String] = + def beEmpty(implicit client: ElasticClient[Future], timeout: FiniteDuration = 10.seconds): Matcher[String] = new Matcher[String] { override def apply(left: String): MatchResult = { diff --git a/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/SearchMatchers.scala b/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/SearchMatchers.scala index ead944d99..36d1b7aa1 100644 --- a/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/SearchMatchers.scala +++ b/elastic4s-testkit/src/main/scala/com/sksamuel/elastic4s/testkit/SearchMatchers.scala @@ -5,14 +5,14 @@ import com.sksamuel.elastic4s.requests.searches.SearchRequest import org.scalatest.matchers.should.Matchers import org.scalatest.matchers.{MatchResult, Matcher} +import scala.concurrent.Future import scala.concurrent.duration._ -import scala.language.higherKinds trait SearchMatchers extends Matchers { import com.sksamuel.elastic4s.ElasticDsl._ - def containId(expectedId: Any)(implicit client: ElasticClient, + def containId(expectedId: Any)(implicit client: ElasticClient[Future], timeout: FiniteDuration = 10.seconds): Matcher[SearchRequest] = new Matcher[SearchRequest] { override def apply(left: SearchRequest): MatchResult = { @@ -26,7 +26,7 @@ trait SearchMatchers extends Matchers { } } - def haveFieldValue(value: String)(implicit client: ElasticClient, + def haveFieldValue(value: String)(implicit client: ElasticClient[Future], timeout: FiniteDuration = 10.seconds): Matcher[SearchRequest] = new Matcher[SearchRequest] { override def apply(left: SearchRequest): MatchResult = { @@ -40,7 +40,7 @@ trait SearchMatchers extends Matchers { } } - def haveSourceField(value: String)(implicit client: ElasticClient, + def haveSourceField(value: String)(implicit client: ElasticClient[Future], timeout: FiniteDuration = 10.seconds): Matcher[SearchRequest] = new Matcher[SearchRequest] { override def apply(left: SearchRequest): MatchResult = { @@ -55,7 +55,7 @@ trait SearchMatchers extends Matchers { } def haveSourceFieldValue(field: String, value: String)( - implicit client: ElasticClient, + implicit client: ElasticClient[Future], timeout: FiniteDuration = 10.seconds ): Matcher[SearchRequest] = new Matcher[SearchRequest] { override def apply(left: SearchRequest): MatchResult = { @@ -69,7 +69,7 @@ trait SearchMatchers extends Matchers { } } - def haveTotalHits(expectedCount: Int)(implicit client: ElasticClient, + def haveTotalHits(expectedCount: Int)(implicit client: ElasticClient[Future], timeout: FiniteDuration = 10.seconds): Matcher[SearchRequest] = new Matcher[SearchRequest] { override def apply(left: SearchRequest): MatchResult = { @@ -83,7 +83,7 @@ trait SearchMatchers extends Matchers { } } - def haveHits(expectedCount: Int)(implicit client: ElasticClient, + def haveHits(expectedCount: Int)(implicit client: ElasticClient[Future], timeout: FiniteDuration = 10.seconds): Matcher[SearchRequest] = new Matcher[SearchRequest] { override def apply(left: SearchRequest): MatchResult = { @@ -97,7 +97,7 @@ trait SearchMatchers extends Matchers { } } - def haveNoHits(implicit client: ElasticClient, timeout: FiniteDuration = 10.seconds): Matcher[SearchRequest] = + def haveNoHits(implicit client: ElasticClient[Future], timeout: FiniteDuration = 10.seconds): Matcher[SearchRequest] = new Matcher[SearchRequest] { override def apply(left: SearchRequest): MatchResult = { val resp = client.execute(left).await(timeout).result diff --git a/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/http/ElasticClientTests.scala b/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/http/ElasticClientTests.scala index 45f583713..359f1c90d 100644 --- a/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/http/ElasticClientTests.scala +++ b/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/http/ElasticClientTests.scala @@ -25,14 +25,12 @@ class ElasticClientTests extends AnyFlatSpec with Matchers with DockerTests { } it should "propagate headers if included" in { - implicit val executor: Executor[Future] = new Executor[Future] { - override def exec(client: HttpClient, request: ElasticRequest): Future[HttpResponse] = { - val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8)) - Executor.FutureExecutor.exec(client, request.copy(headers = Map("Authorization" -> s"Basic $cred"))) - } + implicit val executor: Executor[Future] = (client: HttpClient[Future], request: ElasticRequest) => { + val cred = Base64.getEncoder.encodeToString("user123:pass123".getBytes(StandardCharsets.UTF_8)) + client.send(request.copy(headers = Map("Authorization" -> s"Basic $cred"))) } - client.execute { + mkJavaBasedClient.execute { catHealth() }.await.result.status shouldBe "401" } diff --git a/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryTest.scala b/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryTest.scala index 8382f2ea9..11c91b21a 100644 --- a/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryTest.scala +++ b/elastic4s-tests/src/test/scala/com/sksamuel/elastic4s/requests/update/UpdateByQueryTest.scala @@ -6,7 +6,6 @@ import org.scalatest.OptionValues import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import scala.concurrent.ExecutionContext.Implicits.global import scala.util.Try class UpdateByQueryTest diff --git a/project/Dependencies.scala b/project/Dependencies.scala index cb08eeedd..c7f4ff17c 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -61,7 +61,9 @@ object Dependencies { lazy val akkaActor = "com.typesafe.akka" %% "akka-actor" % AkkaVersion lazy val akkaHTTP = "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion lazy val akkaStream = "com.typesafe.akka" %% "akka-stream" % AkkaVersion - lazy val asyncHttpClientBackendFuture = "com.softwaremill.sttp.client3" %% "async-http-client-backend-future" % SttpVersion + lazy val sttpFutureBackend = "com.softwaremill.sttp.client3" %% "async-http-client-backend-future" % SttpVersion + lazy val sttpZioBackend = "com.softwaremill.sttp.client3" %% "zio" % SttpVersion + lazy val sttpZio1Backend = "com.softwaremill.sttp.client3" %% "zio1" % SttpVersion lazy val cats = "org.typelevel" %% "cats-effect" % CatsEffectVersion lazy val cats2 = "org.typelevel" %% "cats-effect" % CatsEffect2Version lazy val elasticsearchRestClient = "org.elasticsearch.client" % "elasticsearch-rest-client" % ElasticsearchVersion