Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add SplitResponseProcessor to Search Pipelines #14863

Merged
merged 1 commit into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750))
- Add Plugin interface for loading application based configuration templates ([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659))
- Add prefix mode verification setting for repository verification ([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790))
- Add SplitResponseProcessor to Search Pipelines ([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800))
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([#14749](https://github.com/opensearch-project/OpenSearch/pull/14749))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ public Map<String, Processor.Factory<SearchResponseProcessor>> getResponseProces
TruncateHitsResponseProcessor.TYPE,
new TruncateHitsResponseProcessor.Factory(),
CollapseResponseProcessor.TYPE,
new CollapseResponseProcessor.Factory()
new CollapseResponseProcessor.Factory(),
SplitResponseProcessor.TYPE,
new SplitResponseProcessor.Factory()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.pipeline.common;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.document.DocumentField;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.search.SearchHit;
import org.opensearch.search.pipeline.AbstractProcessor;
import org.opensearch.search.pipeline.Processor;
import org.opensearch.search.pipeline.SearchResponseProcessor;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;

/**
 * Search response processor that splits a string field into a list of strings.
 * <p>
 * For every hit, the configured field is looked up both in the hit's document fields and in its
 * source. A string value is split around matches of the configured separator (a regular
 * expression) and the resulting list is stored under the target field, which defaults to the
 * field being split.
 * <p>
 * Throws an exception if the value found in the hit's document fields is not a string. A value
 * in the source that is not a string is left untouched.
 */
public class SplitResponseProcessor extends AbstractProcessor implements SearchResponseProcessor {
    /** Key to reference this processor type from a search pipeline. */
    public static final String TYPE = "split";
    /** Key defining the string field to be split. */
    public static final String SPLIT_FIELD = "field";
    /** Key defining the delimiter used to split the string. This can be a regular expression pattern. */
    public static final String SEPARATOR = "separator";
    /** Optional key for handling empty trailing fields. */
    public static final String PRESERVE_TRAILING = "preserve_trailing";
    /** Optional key to put the split values in a different field. */
    public static final String TARGET_FIELD = "target_field";

    private final String splitField;
    private final String separator;
    /** Compiled form of {@link #separator}; built once so it is not re-compiled for every hit. */
    private final Pattern separatorPattern;
    private final boolean preserveTrailing;
    private final String targetField;

    /**
     * Creates a split processor.
     *
     * @param tag              processor tag
     * @param description      processor description
     * @param ignoreFailure    whether failures of this processor should be ignored
     * @param splitField       name of the field to split, must not be null
     * @param separator        regular expression to split around, must not be null
     * @param preserveTrailing whether empty trailing strings are kept in the result
     * @param targetField      field receiving the split values; {@code null} means {@code splitField}
     * @throws NullPointerException if {@code splitField} or {@code separator} is null
     * @throws java.util.regex.PatternSyntaxException if {@code separator} is not a valid regular expression
     */
    SplitResponseProcessor(
        String tag,
        String description,
        boolean ignoreFailure,
        String splitField,
        String separator,
        boolean preserveTrailing,
        String targetField
    ) {
        super(tag, description, ignoreFailure);
        this.splitField = Objects.requireNonNull(splitField);
        this.separator = Objects.requireNonNull(separator);
        // Compiling here validates the expression when the processor is created instead of
        // failing later on the first search response that contains the field.
        this.separatorPattern = Pattern.compile(separator);
        this.preserveTrailing = preserveTrailing;
        this.targetField = targetField == null ? splitField : targetField;
    }

    /**
     * Getter function for splitField
     * @return splitField
     */
    public String getSplitField() {
        return splitField;
    }

    /**
     * Getter function for separator
     * @return separator
     */
    public String getSeparator() {
        return separator;
    }

    /**
     * Getter function for preserveTrailing
     * @return preserveTrailing
     */
    public boolean isPreserveTrailing() {
        return preserveTrailing;
    }

    /**
     * Getter function for targetField
     * @return targetField
     */
    public String getTargetField() {
        return targetField;
    }

    @Override
    public String getType() {
        return TYPE;
    }

    /**
     * Splits the configured field in every hit of the response, first in the hit's document
     * fields and then in its source. Hits are modified in place.
     *
     * @param request  the search request (not used)
     * @param response the search response whose hits are processed
     * @return the same {@code response} instance
     * @throws IllegalArgumentException if the document field is present but null or not a string
     * @throws Exception if the modified source cannot be rebuilt
     */
    @Override
    public SearchResponse processResponse(SearchRequest request, SearchResponse response) throws Exception {
        for (SearchHit hit : response.getHits().getHits()) {
            splitDocumentField(hit);
            splitSourceField(hit);
        }
        return response;
    }

    /** Splits the configured field in the hit's document fields, if present, and stores it under the target field. */
    private void splitDocumentField(SearchHit hit) {
        Map<String, DocumentField> fields = hit.getFields();
        if (!fields.containsKey(splitField)) {
            return;
        }
        // Reuse the map fetched above rather than asking the hit for its fields a second time.
        DocumentField docField = fields.get(splitField);
        if (docField == null) {
            throw new IllegalArgumentException("field [" + splitField + "] is null, cannot split.");
        }
        Object val = docField.getValue();
        if (!(val instanceof String)) {
            throw new IllegalArgumentException("field [" + splitField + "] is not a string, cannot split");
        }
        hit.setDocumentField(targetField, new DocumentField(targetField, split((String) val)));
    }

    /** Splits the configured field in the hit's source, if present as a string, and rewrites the source. */
    private void splitSourceField(SearchHit hit) throws IOException {
        if (!hit.hasSource()) {
            return;
        }
        Tuple<? extends MediaType, Map<String, Object>> typeAndSourceMap = XContentHelper.convertToMap(
            hit.getSourceRef(),
            false,
            (MediaType) null
        );
        Map<String, Object> sourceAsMap = typeAndSourceMap.v2();
        Object val = sourceAsMap.get(splitField);
        if (!(val instanceof String)) {
            // Field is absent or not a string: nothing changed, so keep the existing source as is.
            return;
        }
        sourceAsMap.put(targetField, split((String) val));
        XContentBuilder builder = XContentBuilder.builder(typeAndSourceMap.v1().xContent());
        builder.map(sourceAsMap);
        hit.sourceRef(BytesReference.bytes(builder));
    }

    /** Splits {@code value} around the separator; a limit of -1 keeps trailing empty strings, 0 drops them. */
    private List<Object> split(String value) {
        Object[] parts = separatorPattern.split(value, preserveTrailing ? -1 : 0);
        return Arrays.asList(parts);
    }

    /**
     * Factory that builds a {@link SplitResponseProcessor} from a pipeline configuration map.
     */
    static class Factory implements Processor.Factory<SearchResponseProcessor> {

        /**
         * Reads {@code field}, {@code separator}, the optional {@code preserve_trailing}
         * (default {@code false}) and the optional {@code target_field} (default: the value of
         * {@code field}) from {@code config} and creates the processor.
         */
        @Override
        public SplitResponseProcessor create(
            Map<String, Processor.Factory<SearchResponseProcessor>> processorFactories,
            String tag,
            String description,
            boolean ignoreFailure,
            Map<String, Object> config,
            PipelineContext pipelineContext
        ) {
            String splitField = ConfigurationUtils.readStringProperty(TYPE, tag, config, SPLIT_FIELD);
            String separator = ConfigurationUtils.readStringProperty(TYPE, tag, config, SEPARATOR);
            boolean preserveTrailing = ConfigurationUtils.readBooleanProperty(TYPE, tag, config, PRESERVE_TRAILING, false);
            String targetField = ConfigurationUtils.readStringProperty(TYPE, tag, config, TARGET_FIELD, splitField);
            return new SplitResponseProcessor(tag, description, ignoreFailure, splitField, separator, preserveTrailing, targetField);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void testAllowlistNotSpecified() throws IOException {
try (SearchPipelineCommonModulePlugin plugin = new SearchPipelineCommonModulePlugin()) {
assertEquals(Set.of("oversample", "filter_query", "script"), plugin.getRequestProcessors(createParameters(settings)).keySet());
assertEquals(
Set.of("rename_field", "truncate_hits", "collapse"),
Set.of("rename_field", "truncate_hits", "collapse", "split"),
plugin.getResponseProcessors(createParameters(settings)).keySet()
);
assertEquals(Set.of(), plugin.getSearchPhaseResultsProcessors(createParameters(settings)).keySet());
Expand Down
Loading
Loading