From 29711f96dbb2c9abc1181b933a8b334eed63ca66 Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Tue, 12 Dec 2023 08:32:53 -0800 Subject: [PATCH] Add an AD transport client (#1110) Signed-off-by: Tyler Ohlsen --- .../ad/client/AnomalyDetectionClient.java | 54 ++++++++ .../ad/client/AnomalyDetectionNodeClient.java | 35 +++++ .../client/AnomalyDetectionClientTests.java | 53 ++++++++ .../AnomalyDetectionNodeClientTests.java | 127 ++++++++++++++++++ 4 files changed, 269 insertions(+) create mode 100644 src/main/java/org/opensearch/ad/client/AnomalyDetectionClient.java create mode 100644 src/main/java/org/opensearch/ad/client/AnomalyDetectionNodeClient.java create mode 100644 src/test/java/org/opensearch/ad/client/AnomalyDetectionClientTests.java create mode 100644 src/test/java/org/opensearch/ad/client/AnomalyDetectionNodeClientTests.java diff --git a/src/main/java/org/opensearch/ad/client/AnomalyDetectionClient.java b/src/main/java/org/opensearch/ad/client/AnomalyDetectionClient.java new file mode 100644 index 000000000..c4fdb6644 --- /dev/null +++ b/src/main/java/org/opensearch/ad/client/AnomalyDetectionClient.java @@ -0,0 +1,54 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ad.client; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.core.action.ActionListener; + +/** + * A client to provide interfaces for anomaly detection functionality. This will be used by other plugins. + */ +public interface AnomalyDetectionClient { + /** + * Search anomaly detectors - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#search-detector + * @param searchRequest search request to search the anomaly detectors + * @return ActionFuture of SearchResponse + */ + default ActionFuture searchAnomalyDetectors(SearchRequest searchRequest) { + PlainActionFuture actionFuture = PlainActionFuture.newFuture(); + searchAnomalyDetectors(searchRequest, actionFuture); + return actionFuture; + } + + /** + * Search anomaly detectors - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#search-detector + * @param searchRequest search request to search the anomaly detectors + * @param listener a listener to be notified of the result + */ + void searchAnomalyDetectors(SearchRequest searchRequest, ActionListener listener); + + /** + * Search anomaly results - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#search-detector-result + * @param searchRequest search request to search the anomaly results + * @return ActionFuture of SearchResponse + */ + default ActionFuture searchAnomalyResults(SearchRequest searchRequest) { + PlainActionFuture actionFuture = PlainActionFuture.newFuture(); + searchAnomalyDetectors(searchRequest, actionFuture); + return actionFuture; + } + + /** + * Search anomaly results - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#search-detector-result + * @param searchRequest search request to search the anomaly results + * @param listener a listener to be notified of the result + */ + void searchAnomalyResults(SearchRequest searchRequest, ActionListener listener); + +} diff --git a/src/main/java/org/opensearch/ad/client/AnomalyDetectionNodeClient.java b/src/main/java/org/opensearch/ad/client/AnomalyDetectionNodeClient.java new file mode 100644 index 000000000..a9d690ca1 --- /dev/null +++ b/src/main/java/org/opensearch/ad/client/AnomalyDetectionNodeClient.java @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ad.client; + +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.ad.transport.SearchAnomalyDetectorAction; +import org.opensearch.ad.transport.SearchAnomalyResultAction; +import org.opensearch.client.Client; +import org.opensearch.core.action.ActionListener; + +public class AnomalyDetectionNodeClient implements AnomalyDetectionClient { + private final Client client; + + public AnomalyDetectionNodeClient(Client client) { + this.client = client; + } + + @Override + public void searchAnomalyDetectors(SearchRequest searchRequest, ActionListener listener) { + this.client.execute(SearchAnomalyDetectorAction.INSTANCE, searchRequest, ActionListener.wrap(searchResponse -> { + listener.onResponse(searchResponse); + }, listener::onFailure)); + } + + @Override + public void searchAnomalyResults(SearchRequest searchRequest, ActionListener listener) { + this.client.execute(SearchAnomalyResultAction.INSTANCE, searchRequest, ActionListener.wrap(searchResponse -> { + listener.onResponse(searchResponse); + }, listener::onFailure)); + } +} diff --git a/src/test/java/org/opensearch/ad/client/AnomalyDetectionClientTests.java b/src/test/java/org/opensearch/ad/client/AnomalyDetectionClientTests.java new file mode 100644 index 000000000..2fe3e976f --- /dev/null +++ b/src/test/java/org/opensearch/ad/client/AnomalyDetectionClientTests.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ad.client; + +import static org.junit.Assert.assertEquals; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.core.action.ActionListener; + +public class AnomalyDetectionClientTests { + + AnomalyDetectionClient anomalyDetectionClient; + + @Mock + SearchResponse searchResponse; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + // Implementing req'd methods of the interface. These methods are all called internally by the + // default methods that we test below. + anomalyDetectionClient = new AnomalyDetectionClient() { + @Override + public void searchAnomalyDetectors(SearchRequest searchRequest, ActionListener listener) { + listener.onResponse(searchResponse); + } + + @Override + public void searchAnomalyResults(SearchRequest searchRequest, ActionListener listener) { + listener.onResponse(searchResponse); + } + }; + } + + @Test + public void searchAnomalyDetectors() { + assertEquals(searchResponse, anomalyDetectionClient.searchAnomalyDetectors(new SearchRequest()).actionGet()); + } + + @Test + public void searchAnomalyResults() { + assertEquals(searchResponse, anomalyDetectionClient.searchAnomalyResults(new SearchRequest()).actionGet()); + } + +} diff --git a/src/test/java/org/opensearch/ad/client/AnomalyDetectionNodeClientTests.java b/src/test/java/org/opensearch/ad/client/AnomalyDetectionNodeClientTests.java new file mode 100644 index 000000000..e8a64c92a --- /dev/null +++ b/src/test/java/org/opensearch/ad/client/AnomalyDetectionNodeClientTests.java @@ -0,0 +1,127 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ad.client; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.opensearch.ad.indices.ADIndexManagement.ALL_AD_RESULTS_INDEX_PATTERN; +import static org.opensearch.ad.model.AnomalyDetector.DETECTOR_TYPE_FIELD; +import static org.opensearch.timeseries.TestHelpers.matchAllRequest; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +import org.junit.Before; +import org.junit.Test; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.ad.HistoricalAnalysisIntegTestCase; +import org.opensearch.ad.constant.ADCommonName; +import org.opensearch.ad.model.AnomalyDetector; +import org.opensearch.ad.model.AnomalyDetectorType; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.timeseries.TestHelpers; + +import com.google.common.collect.ImmutableList; + +// These tests are intended to ensure the underlying transport actions of the client methods +// are being exercised and returning expected results, covering some of the basic use cases. +// The exhaustive set of transport action scenarios are within the respective transport action +// test suites themselves. We do not want to unnecessarily duplicate all of those tests here. +public class AnomalyDetectionNodeClientTests extends HistoricalAnalysisIntegTestCase { + + private String indexName = "test-data"; + private Instant startTime = Instant.now().minus(2, ChronoUnit.DAYS); + private AnomalyDetectionNodeClient adClient; + private PlainActionFuture future; + + @Before + public void setup() { + adClient = new AnomalyDetectionNodeClient(client()); + } + + @Test + public void testSearchAnomalyDetectors_NoIndices() { + deleteIndexIfExists(ADCommonName.ANOMALY_RESULT_INDEX_ALIAS); + + SearchResponse searchResponse = adClient.searchAnomalyDetectors(matchAllRequest()).actionGet(10000); + assertEquals(0, searchResponse.getInternalResponse().hits().getTotalHits().value); + } + + @Test + public void testSearchAnomalyDetectors_Empty() throws IOException { + deleteIndexIfExists(ADCommonName.ANOMALY_RESULT_INDEX_ALIAS); + createDetectorIndex(); + + SearchResponse searchResponse = adClient.searchAnomalyDetectors(matchAllRequest()).actionGet(10000); + assertEquals(0, searchResponse.getInternalResponse().hits().getTotalHits().value); + } + + @Test + public void searchAnomalyDetectors_Populated() throws IOException { + ingestTestData(indexName, startTime, 1, "test", 3000); + String detectorType = AnomalyDetectorType.SINGLE_ENTITY.name(); + AnomalyDetector detector = TestHelpers + .randomAnomalyDetector( + ImmutableList.of(indexName), + ImmutableList.of(TestHelpers.randomFeature(true)), + null, + Instant.now(), + 1, + false, + null + ); + createDetectorIndex(); + String detectorId = createDetector(detector); + + BoolQueryBuilder query = new BoolQueryBuilder().filter(new TermQueryBuilder(DETECTOR_TYPE_FIELD, detectorType)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query); + SearchRequest request = new SearchRequest().source(searchSourceBuilder); + + SearchResponse searchResponse = adClient.searchAnomalyDetectors(request).actionGet(10000); + assertEquals(1, searchResponse.getInternalResponse().hits().getTotalHits().value); + assertEquals(detectorId, searchResponse.getInternalResponse().hits().getAt(0).getId()); + } + + @Test + public void testSearchAnomalyResults_NoIndices() { + future = mock(PlainActionFuture.class); + SearchRequest request = new SearchRequest().indices(new String[] {}); + + adClient.searchAnomalyResults(request, future); + verify(future).onFailure(any(IllegalArgumentException.class)); + } + + @Test + public void testSearchAnomalyResults_Empty() throws IOException { + createADResultIndex(); + SearchResponse searchResponse = adClient + .searchAnomalyResults(matchAllRequest().indices(ALL_AD_RESULTS_INDEX_PATTERN)) + .actionGet(10000); + assertEquals(0, searchResponse.getInternalResponse().hits().getTotalHits().value); + } + + @Test + public void testSearchAnomalyResults_Populated() throws IOException { + deleteIndexIfExists(ALL_AD_RESULTS_INDEX_PATTERN); + createADResultIndex(); + String adResultId = createADResult(TestHelpers.randomAnomalyDetectResult()); + + SearchResponse searchResponse = adClient + .searchAnomalyResults(matchAllRequest().indices(ALL_AD_RESULTS_INDEX_PATTERN)) + .actionGet(10000); + assertEquals(1, searchResponse.getInternalResponse().hits().getTotalHits().value); + + assertEquals(adResultId, searchResponse.getInternalResponse().hits().getAt(0).getId()); + } + +}