Skip to content

Commit

Permalink
Add an AD transport client (#1110)
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Ohlsen <[email protected]>
  • Loading branch information
ohltyler authored Dec 12, 2023
1 parent 353dcae commit 29711f9
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 0 deletions.
54 changes: 54 additions & 0 deletions src/main/java/org/opensearch/ad/client/AnomalyDetectionClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ad.client;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.core.action.ActionListener;

/**
* A client to provide interfaces for anomaly detection functionality. This will be used by other plugins.
*/
public interface AnomalyDetectionClient {
/**
* Search anomaly detectors - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#search-detector
* @param searchRequest search request to search the anomaly detectors
* @return ActionFuture of SearchResponse
*/
default ActionFuture<SearchResponse> searchAnomalyDetectors(SearchRequest searchRequest) {
PlainActionFuture<SearchResponse> actionFuture = PlainActionFuture.newFuture();
searchAnomalyDetectors(searchRequest, actionFuture);
return actionFuture;
}

/**
* Search anomaly detectors - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#search-detector
* @param searchRequest search request to search the anomaly detectors
* @param listener a listener to be notified of the result
*/
void searchAnomalyDetectors(SearchRequest searchRequest, ActionListener<SearchResponse> listener);

/**
* Search anomaly results - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#search-detector-result
* @param searchRequest search request to search the anomaly results
* @return ActionFuture of SearchResponse
*/
default ActionFuture<SearchResponse> searchAnomalyResults(SearchRequest searchRequest) {
PlainActionFuture<SearchResponse> actionFuture = PlainActionFuture.newFuture();
searchAnomalyDetectors(searchRequest, actionFuture);
return actionFuture;
}

/**
* Search anomaly results - refer to https://opensearch.org/docs/latest/observing-your-data/ad/api/#search-detector-result
* @param searchRequest search request to search the anomaly results
* @param listener a listener to be notified of the result
*/
void searchAnomalyResults(SearchRequest searchRequest, ActionListener<SearchResponse> listener);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ad.client;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.transport.SearchAnomalyDetectorAction;
import org.opensearch.ad.transport.SearchAnomalyResultAction;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;

public class AnomalyDetectionNodeClient implements AnomalyDetectionClient {
private final Client client;

public AnomalyDetectionNodeClient(Client client) {
this.client = client;
}

@Override
public void searchAnomalyDetectors(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
this.client.execute(SearchAnomalyDetectorAction.INSTANCE, searchRequest, ActionListener.wrap(searchResponse -> {
listener.onResponse(searchResponse);
}, listener::onFailure));
}

@Override
public void searchAnomalyResults(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
this.client.execute(SearchAnomalyResultAction.INSTANCE, searchRequest, ActionListener.wrap(searchResponse -> {
listener.onResponse(searchResponse);
}, listener::onFailure));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ad.client;

import static org.junit.Assert.assertEquals;

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.action.ActionListener;

public class AnomalyDetectionClientTests {

AnomalyDetectionClient anomalyDetectionClient;

@Mock
SearchResponse searchResponse;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
// Implementing req'd methods of the interface. These methods are all called internally by the
// default methods that we test below.
anomalyDetectionClient = new AnomalyDetectionClient() {
@Override
public void searchAnomalyDetectors(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
listener.onResponse(searchResponse);
}

@Override
public void searchAnomalyResults(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
listener.onResponse(searchResponse);
}
};
}

@Test
public void searchAnomalyDetectors() {
assertEquals(searchResponse, anomalyDetectionClient.searchAnomalyDetectors(new SearchRequest()).actionGet());
}

@Test
public void searchAnomalyResults() {
assertEquals(searchResponse, anomalyDetectionClient.searchAnomalyResults(new SearchRequest()).actionGet());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ad.client;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.opensearch.ad.indices.ADIndexManagement.ALL_AD_RESULTS_INDEX_PATTERN;
import static org.opensearch.ad.model.AnomalyDetector.DETECTOR_TYPE_FIELD;
import static org.opensearch.timeseries.TestHelpers.matchAllRequest;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

import org.junit.Before;
import org.junit.Test;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.ad.HistoricalAnalysisIntegTestCase;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorType;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.TestHelpers;

import com.google.common.collect.ImmutableList;

// These tests are intended to ensure the underlying transport actions of the client methods
// are being exercised and returning expected results, covering some of the basic use cases.
// The exhaustive set of transport action scenarios are within the respective transport action
// test suites themselves. We do not want to unnecessarily duplicate all of those tests here.
public class AnomalyDetectionNodeClientTests extends HistoricalAnalysisIntegTestCase {

private String indexName = "test-data";
private Instant startTime = Instant.now().minus(2, ChronoUnit.DAYS);
private AnomalyDetectionNodeClient adClient;
private PlainActionFuture<SearchResponse> future;

@Before
public void setup() {
adClient = new AnomalyDetectionNodeClient(client());
}

@Test
public void testSearchAnomalyDetectors_NoIndices() {
deleteIndexIfExists(ADCommonName.ANOMALY_RESULT_INDEX_ALIAS);

SearchResponse searchResponse = adClient.searchAnomalyDetectors(matchAllRequest()).actionGet(10000);
assertEquals(0, searchResponse.getInternalResponse().hits().getTotalHits().value);
}

@Test
public void testSearchAnomalyDetectors_Empty() throws IOException {
deleteIndexIfExists(ADCommonName.ANOMALY_RESULT_INDEX_ALIAS);
createDetectorIndex();

SearchResponse searchResponse = adClient.searchAnomalyDetectors(matchAllRequest()).actionGet(10000);
assertEquals(0, searchResponse.getInternalResponse().hits().getTotalHits().value);
}

@Test
public void searchAnomalyDetectors_Populated() throws IOException {
ingestTestData(indexName, startTime, 1, "test", 3000);
String detectorType = AnomalyDetectorType.SINGLE_ENTITY.name();
AnomalyDetector detector = TestHelpers
.randomAnomalyDetector(
ImmutableList.of(indexName),
ImmutableList.of(TestHelpers.randomFeature(true)),
null,
Instant.now(),
1,
false,
null
);
createDetectorIndex();
String detectorId = createDetector(detector);

BoolQueryBuilder query = new BoolQueryBuilder().filter(new TermQueryBuilder(DETECTOR_TYPE_FIELD, detectorType));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query);
SearchRequest request = new SearchRequest().source(searchSourceBuilder);

SearchResponse searchResponse = adClient.searchAnomalyDetectors(request).actionGet(10000);
assertEquals(1, searchResponse.getInternalResponse().hits().getTotalHits().value);
assertEquals(detectorId, searchResponse.getInternalResponse().hits().getAt(0).getId());
}

@Test
public void testSearchAnomalyResults_NoIndices() {
future = mock(PlainActionFuture.class);
SearchRequest request = new SearchRequest().indices(new String[] {});

adClient.searchAnomalyResults(request, future);
verify(future).onFailure(any(IllegalArgumentException.class));
}

@Test
public void testSearchAnomalyResults_Empty() throws IOException {
createADResultIndex();
SearchResponse searchResponse = adClient
.searchAnomalyResults(matchAllRequest().indices(ALL_AD_RESULTS_INDEX_PATTERN))
.actionGet(10000);
assertEquals(0, searchResponse.getInternalResponse().hits().getTotalHits().value);
}

@Test
public void testSearchAnomalyResults_Populated() throws IOException {
deleteIndexIfExists(ALL_AD_RESULTS_INDEX_PATTERN);
createADResultIndex();
String adResultId = createADResult(TestHelpers.randomAnomalyDetectResult());

SearchResponse searchResponse = adClient
.searchAnomalyResults(matchAllRequest().indices(ALL_AD_RESULTS_INDEX_PATTERN))
.actionGet(10000);
assertEquals(1, searchResponse.getInternalResponse().hits().getTotalHits().value);

assertEquals(adResultId, searchResponse.getInternalResponse().hits().getAt(0).getId());
}

}

0 comments on commit 29711f9

Please sign in to comment.