Skip to content

Commit

Permalink
Add primary first preference to all search requests (#912)
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenlan-amzn authored Sep 1, 2023
1 parent a0f611d commit f62f0c2
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.ClusterStateListener
import org.opensearch.cluster.block.ClusterBlockException
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.lifecycle.LifecycleListener
import org.opensearch.common.regex.Regex
Expand Down Expand Up @@ -427,6 +428,7 @@ class ManagedIndexCoordinator(
).size(MAX_HITS)
)
.indices(INDEX_MANAGEMENT_INDEX)
.preference(Preference.PRIMARY_FIRST.type())

return try {
val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.util.concurrent.ThreadContext
Expand Down Expand Up @@ -177,6 +178,7 @@ class TransportExplainAction @Inject constructor(
return SearchRequest()
.indices(INDEX_MANAGEMENT_INDEX)
.source(searchSourceBuilder)
.preference(Preference.PRIMARY_FIRST.type())
}

private fun searchForMetadata(searchRequest: SearchRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.client.Client
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -90,6 +91,7 @@ class TransportGetPoliciesAction @Inject constructor(
val searchRequest = SearchRequest()
.source(searchSourceBuilder)
.indices(INDEX_MANAGEMENT_INDEX)
.preference(Preference.PRIMARY_FIRST.type())

client.threadPool().threadContext.stashContext().use {
client.search(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.AutoExpandReplicas
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.ValidationException
Expand Down Expand Up @@ -180,6 +181,7 @@ class TransportIndexPolicyAction @Inject constructor(
).size(MAX_HITS)
)
.indices(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)
.preference(Preference.PRIMARY_FIRST.type())

client.search(
searchRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.opensearch.action.support.WriteRequest
import org.opensearch.action.update.UpdateRequest
// import org.opensearch.alerting.destination.message.BaseMessage
import org.opensearch.client.Client
import org.opensearch.cluster.routing.Preference
import org.opensearch.core.common.unit.ByteSizeValue
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
Expand Down Expand Up @@ -199,6 +200,7 @@ fun getSweptManagedIndexSearchRequest(scroll: Boolean = false, size: Int = Manag
.fetchSource(emptyArray(), emptyArray())
.query(boolQueryBuilder)
)
.preference(Preference.PRIMARY_FIRST.type())
if (scroll) req.scroll(TimeValue.timeValueMinutes(1))
return req
}
Expand Down

0 comments on commit f62f0c2

Please sign in to comment.