From 8d8d06eef1d4ce0ae22a869cfa3879278c04419f Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Thu, 17 Aug 2023 23:28:12 -0700 Subject: [PATCH 1/2] added response interceptor Signed-off-by: Ronnak Saxena --- .../indexmanagement/IndexManagementPlugin.kt | 5 +- .../rollup/interceptor/ResponseInterceptor.kt | 74 +++++++++++++++++++ 2 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index bea66041f..b2d132674 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -113,6 +113,7 @@ import org.opensearch.indexmanagement.rollup.action.start.TransportStartRollupAc import org.opensearch.indexmanagement.rollup.action.stop.StopRollupAction import org.opensearch.indexmanagement.rollup.action.stop.TransportStopRollupAction import org.opensearch.indexmanagement.rollup.actionfilter.FieldCapsFilter +import org.opensearch.indexmanagement.rollup.interceptor.ResponseInterceptor import org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupMetadata @@ -208,6 +209,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin lateinit var clusterService: ClusterService lateinit var indexNameExpressionResolver: IndexNameExpressionResolver lateinit var rollupInterceptor: RollupInterceptor + lateinit var responseInterceptor: ResponseInterceptor lateinit var fieldCapsFilter: FieldCapsFilter lateinit var indexMetadataProvider: IndexMetadataProvider private val indexMetadataServices: MutableList> = mutableListOf() @@ -391,6 +393,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin environment ) rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver) + responseInterceptor = ResponseInterceptor(clusterService, settings, indexNameExpressionResolver) val jvmService = JvmService(environment.settings()) val transformRunner = TransformRunner.initialize( client, @@ -612,7 +615,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin } override fun getTransportInterceptors(namedWriteableRegistry: NamedWriteableRegistry, threadContext: ThreadContext): List { - return listOf(rollupInterceptor) + return listOf(rollupInterceptor, responseInterceptor) } override fun getActionFilters(): List { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt new file mode 100644 index 000000000..fb0fa0a5d --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt @@ -0,0 +1,74 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.rollup.interceptor + +import org.apache.logging.log4j.LogManager +import org.opensearch.cluster.metadata.IndexNameExpressionResolver +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.settings.Settings +import org.opensearch.transport.TransportInterceptor +import org.opensearch.transport.TransportResponse +import org.opensearch.transport.TransportRequest +import org.opensearch.transport.Transport +import org.opensearch.transport.TransportRequestOptions +import org.opensearch.transport.TransportResponseHandler +import org.opensearch.transport.TransportException + + +class ResponseInterceptor( + val clusterService: ClusterService, + val settings: Settings, + val indexNameExpressionResolver: IndexNameExpressionResolver +) : TransportInterceptor { + private val logger = LogManager.getLogger(javaClass) + + override fun interceptSender(sender: TransportInterceptor.AsyncSender): TransportInterceptor.AsyncSender { + return CustomAsyncSender(sender) + } + + private inner class CustomAsyncSender(private val originalSender: TransportInterceptor.AsyncSender) : TransportInterceptor.AsyncSender { + + override fun sendRequest( + connection: Transport.Connection?, + action: String?, + request: TransportRequest?, + options: TransportRequestOptions?, + handler: TransportResponseHandler? + ) { + val interceptedHandler = CustomResponseHandler(handler) + + originalSender.sendRequest(connection, action, request, options, interceptedHandler) + } + } + + private inner class CustomResponseHandler( + private val originalHandler: TransportResponseHandler? + ) : TransportResponseHandler { + + override fun read(inStream: StreamInput?): T { + val response = originalHandler?.read(inStream) + // Modify the response if necessary + return response!! + } + + override fun handleResponse(response: T?) { + // Handle the response or delegate to the original handler + logger.error("ronsax response interceptoed!! $response") + originalHandler?.handleResponse(response) + } + + override fun handleException(exp: TransportException?) { + // Handle exceptions or delegate to the original handler + originalHandler?.handleException(exp) + } + + override fun executor(): String { + return originalHandler?.executor() ?: "" + } + + } +} \ No newline at end of file From b320b1479fe06d46499fc59ceb05883a3fbc156b Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Thu, 17 Aug 2023 23:44:21 -0700 Subject: [PATCH 2/2] Base case: Query Live and Rollup data with no overlap Signed-off-by: Ronnak Saxena --- .../rollup/interceptor/RollupInterceptor.kt | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index ffd1e4bd7..1ccdf7995 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -85,13 +85,20 @@ class RollupInterceptor( val index = request.shardId().indexName val isRollupIndex = isRollupIndex(index, clusterService.state()) if (isRollupIndex) { - if (request.source().size() != 0) { - throw IllegalArgumentException("Rollup search must have size explicitly set to 0, but found ${request.source().size()}") - } +// if (request.source().size() != 0) { +// throw IllegalArgumentException("Rollup search must have size explicitly set to 0, but found ${request.source().size()}") +// } val indices = request.indices().map { it.toString() }.toTypedArray() val concreteIndices = indexNameExpressionResolver .concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices) + val concreteRolledIndexNames = mutableListOf() + for (indexName in concreteIndices) { + if (isRollupIndex(indexName, clusterService.state())) { + concreteRolledIndexNames.add(indexName) + } + } + val filteredConcreteIndices = concreteRolledIndexNames.toTypedArray() // To extract fields from QueryStringQueryBuilder we need concrete source index name. val rollupJob = clusterService.state().metadata.index(index).getRollupJobs()?.get(0) ?: throw IllegalArgumentException("No rollup job associated with target_index") @@ -102,7 +109,7 @@ class RollupInterceptor( val aggregationFieldMappings = getAggregationMetadata(request.source().aggregations()?.aggregatorFactories) val fieldMappings = queryFieldMappings + aggregationFieldMappings - val allMatchingRollupJobs = validateIndicies(concreteIndices, fieldMappings) + val allMatchingRollupJobs = validateIndicies(filteredConcreteIndices, fieldMappings) // only rebuild if there is necessity to rebuild if (fieldMappings.isNotEmpty()) { @@ -142,7 +149,7 @@ class RollupInterceptor( var allMatchingRollupJobs: Map> = mapOf() for (concreteIndex in concreteIndices) { val rollupJobs = clusterService.state().metadata.index(concreteIndex).getRollupJobs() - ?: throw IllegalArgumentException("Not all indices have rollup job") + ?: throw IllegalArgumentException("Not all indices have rollup job, missing on $concreteIndex") val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs) if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) {