Skip to content

Commit

Permalink
added helper function to rewrite request
Browse files Browse the repository at this point in the history
Signed-off-by: Ronnak Saxena <[email protected]>
  • Loading branch information
ronnaksaxena committed Aug 9, 2023
1 parent 4de0392 commit 24fcda0
Showing 1 changed file with 62 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.index.query.QueryStringQueryBuilder
import org.opensearch.index.query.RangeQueryBuilder
import org.opensearch.index.query.TermQueryBuilder
import org.opensearch.index.query.TermsQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.search.MatchQuery
import org.opensearch.indexmanagement.common.model.dimension.Dimension
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
Expand All @@ -52,12 +53,14 @@ import org.opensearch.search.aggregations.metrics.MaxAggregationBuilder
import org.opensearch.search.aggregations.metrics.MinAggregationBuilder
import org.opensearch.search.aggregations.metrics.SumAggregationBuilder
import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.internal.ShardSearchRequest
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportChannel
import org.opensearch.transport.TransportInterceptor
import org.opensearch.transport.TransportRequest
import org.opensearch.transport.TransportRequestHandler
import java.util.Date

private val logger = LogManager.getLogger(RollupInterceptor::class.java)
class RollupInterceptor(
Expand Down Expand Up @@ -87,6 +90,53 @@ class RollupInterceptor(
val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
logger.error("ronsax response is $response")
}
/*
Returns tuple (isOverlap, min(Live Data Date)) to filter out overlapping data in rollup query
*/
suspend fun isOverlap(rollupIndex: String, liveIndex: String, job: Rollup): Pair<Boolean, Date> {
/*
1. Get the date_histogram source_field & target field from the rollup job
2. Query rollup index to find the maxRolledDate
3. Query live index to find the minLiveDate
4. Check for intersection and return values
*/
var dateSourceField: String = ""
var dateTargetField: String = ""
for (dim in job.dimensions) {
if (dim.type == Dimension.Type.DATE_HISTOGRAM) {
dateSourceField = dim.sourceField
dateTargetField = dim.targetField
break
}
}
/*
Query to find minLiveDate
{
"size": 0,
"aggs": {
"minDate": {
"min": {
"date_histogram": "tpep_pickup_datetime"
}
}
}
}
*/
val queryJson ="{\"size\":0,\"aggs\":{\"minDate\":{\"min\":{\"field\":$dateSourceField}}}}"
val searchSourceBuilder = SearchSourceBuilder()
.query(QueryBuilders.wrapperQuery(queryJson))

val minLiveDateRequest = SearchRequest()
.source(searchSourceBuilder)
.indices(liveIndex)

val minLiveDateResponse: SearchResponse = client.suspendUntil { search(minLiveDateRequest, it) }
logger.error("ronsax: aggs are ${minLiveDateResponse.aggregations}")
var minLiveDate = Date()
var maxRolledDate = Date()
logger.error("ronsax in helper $rollupIndex $dateTargetField $minLiveDate $maxRolledDate")
return Pair(false, Date())
}

@Suppress("SpreadOperator")
override fun <T : TransportRequest> interceptHandler(
Expand All @@ -103,22 +153,30 @@ class RollupInterceptor(
// if (request.source().size() != 0) {
// throw IllegalArgumentException("Rollup search must have size explicitly set to 0, but found ${request.source().size()}")
// }
CoroutineScope(Dispatchers.IO).launch {
logIndex("nyc-taxi-data")
}
val indices = request.indices().map { it.toString() }.toTypedArray()
val concreteIndices = indexNameExpressionResolver
.concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices)
val concreteRolledIndexNames = mutableListOf<String>()
var liveIndexTest = ""
var rollupIndexTest = ""
for (indexName in concreteIndices) {
if (isRollupIndex(indexName, clusterService.state())) {
concreteRolledIndexNames.add(indexName)
liveIndexTest = indexName
}
else {
rollupIndexTest = indexName
}
}
val filteredConcreteIndices = concreteRolledIndexNames.toTypedArray()
// To extract fields from QueryStringQueryBuilder we need concrete source index name.
val rollupJob = clusterService.state().metadata.index(index).getRollupJobs()?.get(0)
?: throw IllegalArgumentException("No rollup job associated with target_index")
CoroutineScope(Dispatchers.IO).launch {
val (isOverlap, minLiveDataDate) = isOverlap(rollupIndexTest, liveIndexTest, rollupJob)
logger.error("ronsax in intercept $isOverlap $minLiveDataDate")
}
// To extract fields from QueryStringQueryBuilder we need concrete source index name.
// TODO add prefix to query if there is overlap
val queryFieldMappings = getQueryMetadata(
request.source().query(),
getConcreteSourceIndex(rollupJob.sourceIndex, indexNameExpressionResolver, clusterService.state())
Expand Down

0 comments on commit 24fcda0

Please sign in to comment.