Skip to content

Commit

Permalink
Revert "Enforce DOCUMENT Replication for AD Indices and Adjust Primar…
Browse files Browse the repository at this point in the history
…y Shards (#948)"

This reverts commit bc16499.
  • Loading branch information
jackiehanyang committed Aug 28, 2023
1 parent 4a09b74 commit 8b47679
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 41 deletions.
15 changes: 3 additions & 12 deletions src/main/java/org/opensearch/ad/indices/ADIndexManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_DETECTION_STATE_INDEX_MAPPING_FILE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_RESULTS_INDEX_MAPPING_FILE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.CHECKPOINT_INDEX_MAPPING_FILE;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.indices.replication.common.ReplicationType.DOCUMENT;

import java.io.IOException;
import java.util.EnumMap;
Expand Down Expand Up @@ -66,7 +64,7 @@ public class ADIndexManagement extends IndexManagement<ADIndex> {
* @param settings OS cluster setting
* @param nodeFilter Used to filter eligible nodes to host AD indices
* @param maxUpdateRunningTimes max number of retries to update index mapping and setting
* @throws IOException when failing to get mapping file
* @throws IOException
*/
public ADIndexManagement(
Client client,
Expand Down Expand Up @@ -197,10 +195,7 @@ public void initDefaultResultIndexDirectly(ActionListener<CreateIndexResponse> a
@Override
public void initStateIndex(ActionListener<CreateIndexResponse> actionListener) {
try {
// AD indices need RAW (e.g., we want users to be able to consume AD results as soon as possible and send out an alert if
// anomalies found).
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
CreateIndexRequest request = new CreateIndexRequest(ADCommonName.DETECTION_STATE_INDEX, replicationSettings)
CreateIndexRequest request = new CreateIndexRequest(ADCommonName.DETECTION_STATE_INDEX)
.mapping(getStateMappings(), XContentType.JSON)
.settings(settings);
adminClient.indices().create(request, markMappingUpToDate(ADIndex.STATE, actionListener));
Expand All @@ -224,11 +219,7 @@ public void initCheckpointIndex(ActionListener<CreateIndexResponse> actionListen
} catch (IOException e) {
throw new EndRunException("", "Cannot find checkpoint mapping file", true);
}
// AD indices need RAW (e.g., we want users to be able to consume AD results as soon as possible and send out an alert if anomalies
// found).
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
CreateIndexRequest request = new CreateIndexRequest(ADCommonName.CHECKPOINT_INDEX_NAME, replicationSettings)
.mapping(mapping, XContentType.JSON);
CreateIndexRequest request = new CreateIndexRequest(ADCommonName.CHECKPOINT_INDEX_NAME).mapping(mapping, XContentType.JSON);
choosePrimaryShards(request, true);
adminClient.indices().create(request, markMappingUpToDate(ADIndex.CHECKPOINT, actionListener));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package org.opensearch.forecast.indices;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.forecast.constant.ForecastCommonName.DUMMY_FORECAST_RESULT_ID;
import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_CHECKPOINT_INDEX_MAPPING_FILE;
import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_MAX_PRIMARY_SHARDS;
Expand All @@ -20,7 +19,6 @@
import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_RESULT_HISTORY_RETENTION_PERIOD;
import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_RESULT_HISTORY_ROLLOVER_PERIOD;
import static org.opensearch.forecast.settings.ForecastSettings.FORECAST_STATE_INDEX_MAPPING_FILE;
import static org.opensearch.indices.replication.common.ReplicationType.DOCUMENT;

import java.io.IOException;
import java.util.EnumMap;
Expand Down Expand Up @@ -63,7 +61,7 @@ public class ForecastIndexManagement extends IndexManagement<ForecastIndex> {
* @param settings OS cluster setting
* @param nodeFilter Used to filter eligible nodes to host forecast indices
* @param maxUpdateRunningTimes max number of retries to update index mapping and setting
* @throws IOException when failing to get mapping file
* @throws IOException
*/
public ForecastIndexManagement(
Client client,
Expand Down Expand Up @@ -179,8 +177,7 @@ public boolean doesCheckpointIndexExist() {
@Override
public void initStateIndex(ActionListener<CreateIndexResponse> actionListener) {
try {
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
CreateIndexRequest request = new CreateIndexRequest(ForecastCommonName.FORECAST_STATE_INDEX, replicationSettings)
CreateIndexRequest request = new CreateIndexRequest(ForecastCommonName.FORECAST_STATE_INDEX)
.mapping(getStateMappings(), XContentType.JSON)
.settings(settings);
adminClient.indices().create(request, markMappingUpToDate(ForecastIndex.STATE, actionListener));
Expand All @@ -204,10 +201,7 @@ public void initCheckpointIndex(ActionListener<CreateIndexResponse> actionListen
} catch (IOException e) {
throw new EndRunException("", "Cannot find checkpoint mapping file", true);
}
// forecast indices need RAW (e.g., we want users to be able to consume forecast results as soon as
// possible and send out an alert if a threshold is breached).
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
CreateIndexRequest request = new CreateIndexRequest(ForecastCommonName.FORECAST_CHECKPOINT_INDEX_NAME, replicationSettings)
CreateIndexRequest request = new CreateIndexRequest(ForecastCommonName.FORECAST_CHECKPOINT_INDEX_NAME)
.mapping(mapping, XContentType.JSON);
choosePrimaryShards(request, true);
adminClient.indices().create(request, markMappingUpToDate(ForecastIndex.CHECKPOINT, actionListener));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public final class ForecastSettings {

// max number of primary shards of a forecast index
public static final Setting<Integer> FORECAST_MAX_PRIMARY_SHARDS = Setting
.intSetting("plugins.forecast.max_primary_shards", 20, 0, 200, Setting.Property.NodeScope, Setting.Property.Dynamic);
.intSetting("plugins.forecast.max_primary_shards", 10, 0, 200, Setting.Property.NodeScope, Setting.Property.Dynamic);

// saving checkpoint every 12 hours.
// To support 1 million entities in 36 data nodes, each node has roughly 28K models.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@

package org.opensearch.timeseries.indices;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
import static org.opensearch.indices.replication.common.ReplicationType.DOCUMENT;
import static org.opensearch.timeseries.constant.CommonMessages.CAN_NOT_FIND_RESULT_INDEX;

import java.io.IOException;
Expand Down Expand Up @@ -428,10 +426,7 @@ public void initConfigIndexIfAbsent(ActionListener<CreateIndexResponse> actionLi
* @throws IOException IOException from {@link IndexManagement#getConfigMappings}
*/
public void initConfigIndex(ActionListener<CreateIndexResponse> actionListener) throws IOException {
// time series indices need RAW (e.g., we want users to be able to consume AD results as soon as possible
// and send out an alert if anomalies found).
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
CreateIndexRequest request = new CreateIndexRequest(CommonName.CONFIG_INDEX, replicationSettings)
CreateIndexRequest request = new CreateIndexRequest(CommonName.CONFIG_INDEX)
.mapping(getConfigMappings(), XContentType.JSON)
.settings(settings);
adminClient.indices().create(request, actionListener);
Expand Down Expand Up @@ -482,11 +477,7 @@ public static String getJobMappings() throws IOException {
*/
public void initJobIndex(ActionListener<CreateIndexResponse> actionListener) {
try {
// time series indices need RAW (e.g., we want users to be able to consume AD results as soon as
// possible and send out an alert if anomalies found).
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
CreateIndexRequest request = new CreateIndexRequest(CommonName.JOB_INDEX, replicationSettings)
.mapping(getJobMappings(), XContentType.JSON);
CreateIndexRequest request = new CreateIndexRequest(CommonName.JOB_INDEX).mapping(getJobMappings(), XContentType.JSON);
request
.settings(
Settings
Expand Down Expand Up @@ -937,10 +928,7 @@ protected void rolloverAndDeleteHistoryIndex(

CreateIndexRequest createRequest = rollOverRequest.getCreateIndexRequest();

// time series indices need RAW (e.g., we want users to be able to consume AD results as soon as possible
// and send out an alert if anomalies found).
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
createRequest.index(rolloverIndexPattern).settings(replicationSettings).mapping(resultMapping, XContentType.JSON);
createRequest.index(rolloverIndexPattern).mapping(resultMapping, XContentType.JSON);

choosePrimaryShards(createRequest, true);

Expand All @@ -965,10 +953,7 @@ protected void initResultIndexDirectly(
IndexType resultIndex,
ActionListener<CreateIndexResponse> actionListener
) {
// time series indices need RAW (e.g., we want users to be able to consume AD results as soon as possible
// and send out an alert if anomalies found).
Settings replicationSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, DOCUMENT.name()).build();
CreateIndexRequest request = new CreateIndexRequest(resultIndexName, replicationSettings).mapping(resultMapping, XContentType.JSON);
CreateIndexRequest request = new CreateIndexRequest(resultIndexName).mapping(resultMapping, XContentType.JSON);
if (alias != null) {
request.alias(new Alias(alias));
}
Expand Down

0 comments on commit 8b47679

Please sign in to comment.