Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport][2.x]Add ignoreFailure and pipelineContext (#152) #158

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public List<SearchExtSpec<?>> getSearchExts() {
}

@Override
public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Processor.Parameters parameters) {
public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProcessors(Parameters parameters) {
return Map.of(PersonalizeRankingResponseProcessor.TYPE, new PersonalizeRankingResponseProcessor.Factory(PersonalizeClientSettings.getClientSettings(parameters.env.settings())),
KendraRankingResponseProcessor.TYPE, new KendraRankingResponseProcessor.Factory(this.kendraClientSettings));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,30 @@

public class ConfigurationUtils {

/**
* Get result transformer configurations from Search Request
* @param settings all index settings configured for this plugin
* @return ordered and validated list of result transformers, empty list if not specified
*/
public static List<ResultTransformerConfiguration> getResultTransformersFromIndexConfiguration(Settings settings,
Map<String, ResultTransformer> resultTransformerMap) {
List<ResultTransformerConfiguration> indexLevelConfigs = new ArrayList<>();
/**
* Get result transformer configurations from Search Request
*
* @param settings all index settings configured for this plugin
* @param resultTransformerMap map of transformed results
* @return ordered and validated list of result transformers, empty list if not specified
*/
public static List<ResultTransformerConfiguration> getResultTransformersFromIndexConfiguration(Settings settings,
Map<String, ResultTransformer> resultTransformerMap) {
List<ResultTransformerConfiguration> indexLevelConfigs = new ArrayList<>();

if (settings != null) {
if (settings.getGroups(RESULT_TRANSFORMER_SETTING_PREFIX) != null) {
for (Map.Entry<String, Settings> transformerSettings : settings.getGroups(RESULT_TRANSFORMER_SETTING_PREFIX).entrySet()) {
if (resultTransformerMap.containsKey(transformerSettings.getKey())) {
ResultTransformer transformer = resultTransformerMap.get(transformerSettings.getKey());
indexLevelConfigs.add(transformer.getConfigurationFactory().configure(transformerSettings.getValue()));
if (settings != null) {
if (settings.getGroups(RESULT_TRANSFORMER_SETTING_PREFIX) != null) {
for (Map.Entry<String, Settings> transformerSettings : settings.getGroups(RESULT_TRANSFORMER_SETTING_PREFIX).entrySet()) {
if (resultTransformerMap.containsKey(transformerSettings.getKey())) {
ResultTransformer transformer = resultTransformerMap.get(transformerSettings.getKey());
indexLevelConfigs.add(transformer.getConfigurationFactory().configure(transformerSettings.getValue()));
}
}
}
}
}

return reorderAndValidateConfigs(indexLevelConfigs);
}
return reorderAndValidateConfigs(indexLevelConfigs);
}

/**
* Get result transformer configurations from Search Request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;
import org.opensearch.search.profile.SearchProfileShardResults;
Expand All @@ -35,7 +36,7 @@
/**
* This is a {@link SearchResponseProcessor} that applies kendra intelligence ranking
*/
public class KendraRankingResponseProcessor implements SearchResponseProcessor {
public class KendraRankingResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {
/**
* key to reference this processor type from a search pipeline
*/
Expand All @@ -54,13 +55,14 @@ public class KendraRankingResponseProcessor implements SearchResponseProcessor {
*
* @param tag processor tag
* @param description processor description
* @param ignoreFailure processor ignoreFailure config
* @param titleField titleField applied to kendra re-ranking
* @param bodyField bodyField applied to kendra re-ranking
* @param inputDocLimit docLimit applied to kendra re-ranking
* @param kendraClient kendraClient to connect with kendra
*/
public KendraRankingResponseProcessor(String tag, String description, List<String> titleField, List<String> bodyField, Integer inputDocLimit, KendraHttpClient kendraClient) {
super();
public KendraRankingResponseProcessor(String tag, String description, boolean ignoreFailure, List<String> titleField, List<String> bodyField, Integer inputDocLimit, KendraHttpClient kendraClient) {
super(tag, description, ignoreFailure);
this.titleField = titleField;
this.bodyField = bodyField;
this.tag = tag;
Expand Down Expand Up @@ -99,6 +101,7 @@ public String getDescription() {
return description;
}


/**
* Transform the response hit and apply kendra re-ranking logic
*/
Expand Down Expand Up @@ -156,7 +159,9 @@ public KendraRankingResponseProcessor create(
Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
String tag,
String description,
Map<String, Object> config
boolean ignoreFailure,
Map<String, Object> config,
PipelineContext pipelineContext
) throws Exception {
List<String> titleField = Collections.singletonList(ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "title_field"));
List<String> bodyField = Collections.singletonList(ConfigurationUtils.readStringProperty(TYPE, tag, config, "body_field"));
Expand All @@ -168,7 +173,7 @@ public KendraRankingResponseProcessor create(
} else {
docLimit = Integer.parseInt(inputDocLimit);
}
return new KendraRankingResponseProcessor(tag, description, titleField, bodyField, docLimit, kendraClient);
return new KendraRankingResponseProcessor(tag, description, ignoreFailure, titleField, bodyField, docLimit, kendraClient);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.search.SearchHits;
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;
import org.opensearch.search.profile.SearchProfileShardResults;
Expand All @@ -36,7 +37,7 @@
/**
* This is a {@link SearchResponseProcessor} that applies Personalized intelligent ranking
*/
public class PersonalizeRankingResponseProcessor implements SearchResponseProcessor {
public class PersonalizeRankingResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {

private static final Logger logger = LogManager.getLogger(PersonalizeRankingResponseProcessor.class);

Expand All @@ -51,14 +52,16 @@ public class PersonalizeRankingResponseProcessor implements SearchResponseProces
*
* @param tag processor tag
* @param description processor description
* @param ignoreFailure processor ignoreFailure config
* @param rankerConfig personalize ranker config
* @param client personalize client
*/
public PersonalizeRankingResponseProcessor(String tag,
String description,
boolean ignoreFailure,
PersonalizeIntelligentRankerConfiguration rankerConfig,
PersonalizeClient client) {
super();
super(tag, description, ignoreFailure);
this.tag = tag;
this.description = description;
this.rankerConfig = rankerConfig;
Expand Down Expand Up @@ -150,7 +153,7 @@ public Factory(PersonalizeClientSettings settings) {
}

@Override
public PersonalizeRankingResponseProcessor create(Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories, String tag, String description, Map<String, Object> config) throws Exception {
public PersonalizeRankingResponseProcessor create(Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories, String tag, String description, boolean ignoreFailure, Map<String, Object> config, PipelineContext pipelineContext) throws Exception {
String personalizeCampaign = ConfigurationUtils.readStringProperty(TYPE, tag, config, CAMPAIGN_ARN_CONFIG_NAME);
String iamRoleArn = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, IAM_ROLE_ARN_CONFIG_NAME);
String recipe = ConfigurationUtils.readStringProperty(TYPE, tag, config, RECIPE_CONFIG_NAME);
Expand All @@ -162,7 +165,7 @@ public PersonalizeRankingResponseProcessor create(Map<String, Processor.Factory<
new PersonalizeIntelligentRankerConfiguration(personalizeCampaign, iamRoleArn, recipe, itemIdField, awsRegion, weight);
AWSCredentialsProvider credentialsProvider = PersonalizeCredentialsProviderFactory.getCredentialsProvider(personalizeClientSettings, iamRoleArn, awsRegion);
PersonalizeClient personalizeClient = clientBuilder.apply(credentialsProvider, awsRegion);
return new PersonalizeRankingResponseProcessor(tag, description, rankerConfig, personalizeClient);
return new PersonalizeRankingResponseProcessor(tag, description, ignoreFailure, rankerConfig, personalizeClient);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public void testFactory() throws Exception {
Collections.emptyMap(),
null,
null,
Collections.emptyMap()
false,
Collections.emptyMap(),
null
));

//test create with all fields
Expand All @@ -76,15 +78,15 @@ public void testFactory() throws Exception {
configuration.put("title_field","field");
configuration.put("body_field","body");
configuration.put("doc_limit","500");
KendraRankingResponseProcessor processorWithAllFields = factory.create(Collections.emptyMap(),"tmp0","testingAllFields", configuration);
KendraRankingResponseProcessor processorWithAllFields = factory.create(Collections.emptyMap(),"tmp0","testingAllFields", false, configuration,null);
assertEquals(TYPE, processorWithAllFields.getType());
assertEquals("tmp0", processorWithAllFields.getTag());
assertEquals("testingAllFields", processorWithAllFields.getDescription());

//test create with required field
Map<String,Object> shortConfiguration = new HashMap<>();
shortConfiguration.put("body_field","body");
KendraRankingResponseProcessor processorWithOneFields = factory.create(Collections.emptyMap(),"tmp1","testingBodyField", shortConfiguration);
KendraRankingResponseProcessor processorWithOneFields = factory.create(Collections.emptyMap(),"tmp1","testingBodyField", false, shortConfiguration, null);
assertEquals(TYPE, processorWithOneFields.getType());
assertEquals("tmp1", processorWithOneFields.getTag());
assertEquals("testingBodyField", processorWithOneFields.getDescription());
Expand All @@ -93,7 +95,7 @@ public void testFactory() throws Exception {
Map<String,Object> nullDocLimitConfiguration = new HashMap<>();
nullDocLimitConfiguration.put("body_field","body");
nullDocLimitConfiguration.put("doc_limit",null);
KendraRankingResponseProcessor processorWithNullDocLimit = factory.create(Collections.emptyMap(),"tmp2","testingNullDocLimit", nullDocLimitConfiguration);
KendraRankingResponseProcessor processorWithNullDocLimit = factory.create(Collections.emptyMap(),"tmp2","testingNullDocLimit", false, nullDocLimitConfiguration, null );
assertEquals(TYPE, processorWithNullDocLimit.getType());
assertEquals("tmp2", processorWithNullDocLimit.getTag());
assertEquals("testingNullDocLimit", processorWithNullDocLimit.getDescription());
Expand All @@ -102,7 +104,7 @@ public void testFactory() throws Exception {
Map<String,Object> nullTitleConfiguration = new HashMap<>();
nullTitleConfiguration.put("body_field","body");
nullTitleConfiguration.put("title_field",null);
KendraRankingResponseProcessor processorWithNullTitleField = factory.create(Collections.emptyMap(),"tmp3","testingNullTitleField", nullTitleConfiguration);
KendraRankingResponseProcessor processorWithNullTitleField = factory.create(Collections.emptyMap(),"tmp3","testingNullTitleField", false, nullTitleConfiguration, null);
assertEquals(TYPE, processorWithNullTitleField.getType());
assertEquals("tmp3", processorWithNullTitleField.getTag());
assertEquals("testingNullTitleField", processorWithNullTitleField.getDescription());
Expand All @@ -116,18 +118,18 @@ public void testRankingResponse() throws Exception {
bodyField.add("body");

//test response with titleField, bodyField and docLimit
KendraRankingResponseProcessor processorWtOptionalConfig = new KendraRankingResponseProcessor(null,null,titleField,bodyField,500,kendraClient);
KendraRankingResponseProcessor processorWtOptionalConfig = new KendraRankingResponseProcessor(null,null,false, titleField,bodyField,500,kendraClient);
int size = 5;
SearchResponse reRankedResponse0 = processorWtOptionalConfig.processResponse(createRequest(),createResponse(size));
assertEquals(size,reRankedResponse0.getHits().getHits().length);

//test response with null doc limit
KendraRankingResponseProcessor processorWtTwoConfig = new KendraRankingResponseProcessor(null,null,titleField,bodyField,null,kendraClient);
KendraRankingResponseProcessor processorWtTwoConfig = new KendraRankingResponseProcessor(null,null,false, titleField,bodyField,null,kendraClient);
SearchResponse reRankedResponse1 = processorWtTwoConfig.processResponse(createRequest(),createResponse(size));
assertEquals(size,reRankedResponse1.getHits().getHits().length);

//test response with null doc limit and null title field
KendraRankingResponseProcessor processorWtOneConfig = new KendraRankingResponseProcessor(null,null,null,bodyField,null,kendraClient);
KendraRankingResponseProcessor processorWtOneConfig = new KendraRankingResponseProcessor(null,null,false,null,bodyField,null,kendraClient);
SearchResponse reRankedResponse2 = processorWtOneConfig.processResponse(createRequest(),createResponse(size));
assertEquals(size,reRankedResponse2.getHits().getHits().length);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ public void testCreateFactoryThrowsExceptionWithEmptyConfig() {
Collections.emptyMap(),
null,
null,
Collections.emptyMap()
false,
Collections.emptyMap(),
null
));
}

Expand All @@ -72,7 +74,7 @@ public void testCreateFactoryWithAllPersonalizeConfig() throws Exception {
configuration.put("aws_region", region);

PersonalizeRankingResponseProcessor personalizeResponseProcessor =
factory.create(Collections.emptyMap(), "testTag", "testingAllFields", configuration);
factory.create(Collections.emptyMap(), "testTag", "testingAllFields", false, configuration, null);

assertEquals(TYPE, personalizeResponseProcessor.getType());
assertEquals("testTag", personalizeResponseProcessor.getTag());
Expand All @@ -94,7 +96,7 @@ public void testProcessorWithNoHits() throws Exception {
configuration.put("aws_region", region);

PersonalizeRankingResponseProcessor personalizeResponseProcessor =
factory.create(Collections.emptyMap(), "testTag", "testingAllFields", configuration);
factory.create(Collections.emptyMap(), "testTag", "testingAllFields", false, configuration, null);
SearchRequest searchRequest = new SearchRequest();
SearchHits hits = new SearchHits(new SearchHit[0], new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0.0f);
SearchResponseSections searchResponseSections = new SearchResponseSections(hits, null, null, false, false, null, 0);
Expand All @@ -118,7 +120,7 @@ public void testProcessorWithHits() throws Exception {
configuration.put("aws_region", region);

PersonalizeRankingResponseProcessor personalizeResponseProcessor =
factory.create(Collections.emptyMap(), "testTag", "testingAllFields", configuration);
factory.create(Collections.emptyMap(), "testTag", "testingAllFields", false, configuration, null);
SearchRequest searchRequest = new SearchRequest();
SearchHit[] searchHits = new SearchHit[10];
for (int i = 0; i < searchHits.length; i++) {
Expand Down Expand Up @@ -147,7 +149,7 @@ public void testProcessorWithHitsAndSearchProcessorExt() throws Exception {
configuration.put("aws_region", region);

PersonalizeRankingResponseProcessor personalizeResponseProcessor =
factory.create(Collections.emptyMap(), "testTag", "testingAllFields", configuration);
factory.create(Collections.emptyMap(), "testTag", "testingAllFields", false, configuration, null);

Map<String, Object> personalizeContext = new HashMap<>();
personalizeContext.put("contextKey2", "contextValue2");
Expand Down Expand Up @@ -186,7 +188,7 @@ public void testProcessorWithHitsWithInvalidPersonalizeContext() throws Exceptio
configuration.put("aws_region", region);

PersonalizeRankingResponseProcessor personalizeResponseProcessor =
factory.create(Collections.emptyMap(), "testTag", "testingAllFields", configuration);
factory.create(Collections.emptyMap(), "testTag", "testingAllFields", false, configuration,null);

Map<String, Object> personalizeContext = new HashMap<>();
personalizeContext.put("contextKey2", 5);
Expand Down
Loading