Skip to content

Commit

Permalink
Support versionType and pipeline parameters in reindex request (#3087)
Browse files Browse the repository at this point in the history
* Add "externalgt" and "external_gt" cases

* Support versionType parameter in reindex request

* Support pipeline parameter in reindex request
  • Loading branch information
Philippus committed Jun 17, 2024
1 parent e8ef111 commit a20926f
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.sksamuel.elastic4s.requests.reindex

import com.sksamuel.elastic4s.handlers.reindex.ReindexBuilderFn
import com.sksamuel.elastic4s.requests.common.VersionType.ExternalGte
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

class ReindexBuilderFnTest extends AnyFunSuite with Matchers {
import com.sksamuel.elastic4s.ElasticApi._

test("reindex content builder should support version type") {
val req = reindex("source", "target").versionType(ExternalGte)

ReindexBuilderFn(req).string shouldBe
"""{"source":{"index":["source"]},"dest":{"index":"target","version_type":"external_gte"}}""".stripMargin
}

test("reindex content builder should support pipeline") {
val req = reindex("source", "target").pipeline("my_pipeline")

ReindexBuilderFn(req).string shouldBe
"""{"source":{"index":["source"]},"dest":{"index":"target","pipeline":"my_pipeline"}}""".stripMargin
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ sealed trait VersionType
object VersionType {

def valueOf(str: String): VersionType = str.toLowerCase match {
case "external" => VersionType.External
case "externalgte" | "external_gte" => VersionType.ExternalGte
case _ => VersionType.Internal
case "external" | "externalgt" | "external_gt" => VersionType.External
case "externalgte" | "external_gte" => VersionType.ExternalGte
case _ => VersionType.Internal
}

case object External extends VersionType
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package com.sksamuel.elastic4s.requests.reindex

import com.sksamuel.elastic4s.ext.OptionImplicits._
import com.sksamuel.elastic4s.requests.common.{RefreshPolicy, Slice}
import com.sksamuel.elastic4s.requests.common.{RefreshPolicy, Slice, VersionType}
import com.sksamuel.elastic4s.requests.script.Script
import com.sksamuel.elastic4s.requests.searches.queries.Query
import com.sksamuel.elastic4s.{Index, Indexes}

import scala.concurrent.duration.FiniteDuration

case class ReindexRequest(sourceIndexes: Indexes,
Expand All @@ -30,7 +29,9 @@ case class ReindexRequest(sourceIndexes: Indexes,
size: Option[Int] = None,
createOnly: Option[Boolean] = None,
slices: Option[Int] = None,
slice: Option[Slice] = None) {
slice: Option[Slice] = None,
versionType: Option[VersionType] = None,
pipeline: Option[String] = None) {

def remote(uri: String): ReindexRequest = copy(remoteHost = Option(uri))
def remote(uri: String, user: String, pass: String): ReindexRequest =
Expand Down Expand Up @@ -73,4 +74,9 @@ case class ReindexRequest(sourceIndexes: Indexes,
def createOnly(createOnly: Boolean): ReindexRequest = copy(createOnly = createOnly.some)
def slice(slice: Slice): ReindexRequest = copy(slice = slice.some)
def slices(slices: Int): ReindexRequest = copy(slices = slices.some)

def versionType(versionType: String): ReindexRequest = this.versionType(VersionType.valueOf(versionType))
def versionType(versionType: VersionType): ReindexRequest = copy(versionType = versionType.some)

def pipeline(pipeline: String): ReindexRequest = copy(pipeline = pipeline.some)
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,15 @@ object ReindexBuilderFn {
builder.startObject("dest")
builder.field("index", request.targetIndex.name)

request.versionType.foreach(versionType => builder.field("version_type", handlers.VersionTypeHttpString(versionType)))

request.createOnly.foreach {
case true => builder.field("op_type", "create")
case false => builder.field("op_type", "index")
}

request.pipeline.foreach(builder.field("pipeline", _))

// end dest
builder.endObject()
}
Expand Down

0 comments on commit a20926f

Please sign in to comment.