diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java index 0c95fc92..16aba0e1 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java @@ -40,6 +40,17 @@ public class Ip2GeoSettings { Setting.Property.Dynamic ); + /** + * Bulk size for indexing GeoIP data + */ + public static final Setting BATCH_SIZE = Setting.intSetting( + "plugins.geospatial.ip2geo.datasource.batch_size", + 10000, + 1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + /** * Timeout value for Ip2Geo processor */ @@ -67,7 +78,7 @@ public class Ip2GeoSettings { * @return a list of all settings for Ip2Geo feature */ public static final List> settings() { - return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, TIMEOUT, CACHE_SIZE); + return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, BATCH_SIZE, TIMEOUT, CACHE_SIZE); } /** diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDao.java b/src/main/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDao.java index ec8e6f81..fd6ad995 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDao.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDao.java @@ -38,6 +38,7 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.client.Client; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.settings.ClusterSettings; @@ -156,6 +157,7 @@ private IndexRequest toIndexRequest(Datasource datasource) { indexRequest.index(DatasourceExtension.JOB_INDEX_NAME); indexRequest.id(datasource.getName()); indexRequest.opType(DocWriteRequest.OpType.INDEX); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); indexRequest.source(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)); return indexRequest; } catch (IOException e) { @@ -215,7 +217,7 @@ public void deleteDatasource(final Datasource datasource) { * @throws IOException exception */ public Datasource getDatasource(final String name) throws IOException { - GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name); + GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name).preference(Preference.PRIMARY.type()); GetResponse response; try { response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))); @@ -242,7 +244,7 @@ public Datasource getDatasource(final String name) throws IOException { * @param actionListener the action listener */ public void getDatasource(final String name, final ActionListener actionListener) { - GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name); + GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name).preference(Preference.PRIMARY.type()); StashedThreadContext.run(client, () -> client.get(request, new ActionListener<>() { @Override public void onResponse(final GetResponse response) { @@ -280,6 +282,7 @@ public void getDatasources(final String[] names, final ActionListener client.prepareMultiGet() .add(DatasourceExtension.JOB_INDEX_NAME, names) + .setPreference(Preference.PRIMARY.type()) .execute(createGetDataSourceQueryActionLister(MultiGetResponse.class, actionListener)) ); } @@ -293,6 +296,7 @@ public void getAllDatasources(final ActionListener> actionListe client, () -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME) .setQuery(QueryBuilders.matchAllQuery()) + .setPreference(Preference.PRIMARY.type()) .setSize(MAX_SIZE) .execute(createGetDataSourceQueryActionLister(SearchResponse.class, actionListener)) ); @@ -306,6 +310,7 @@ public List getAllDatasources() { client, () -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME) .setQuery(QueryBuilders.matchAllQuery()) + .setPreference(Preference.PRIMARY.type()) .setSize(MAX_SIZE) .execute() .actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java b/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java index a930781c..a538e813 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java @@ -47,6 +47,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.client.Requests; +import org.opensearch.cluster.routing.Preference; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.settings.ClusterSettings; @@ -68,7 +69,6 @@ */ @Log4j2 public class GeoIpDataDao { - public static final int BUNDLE_SIZE = 128; private static final String IP_RANGE_FIELD_NAME = "_cidr"; private static final String DATA_FIELD_NAME = "_data"; private static final Map INDEX_SETTING_TO_CREATE = Map.of( @@ -248,7 +248,7 @@ public Map getGeoIpData(final String indexName, final String ip) () -> client.prepareSearch(indexName) .setSize(1) .setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)) - .setPreference("_local") + .setPreference(Preference.LOCAL.type()) .setRequestCache(true) .get(clusterSettings.get(Ip2GeoSettings.TIMEOUT)) ); @@ -277,9 +277,10 @@ public void putGeoIpData( @NonNull final Runnable renewLock ) throws IOException { TimeValue timeout = clusterSettings.get(Ip2GeoSettings.TIMEOUT); + Integer batchSize = clusterSettings.get(Ip2GeoSettings.BATCH_SIZE); final BulkRequest bulkRequest = new BulkRequest(); Queue requests = new LinkedList<>(); - for (int i = 0; i < BUNDLE_SIZE; i++) { + for (int i = 0; i < batchSize; i++) { requests.add(Requests.indexRequest(indexName)); } while (iterator.hasNext()) { @@ -289,7 +290,7 @@ public void putGeoIpData( indexRequest.source(document); indexRequest.id(record.get(0)); bulkRequest.add(indexRequest); - if (iterator.hasNext() == false || bulkRequest.requests().size() == BUNDLE_SIZE) { + if (iterator.hasNext() == false || bulkRequest.requests().size() == batchSize) { BulkResponse response = StashedThreadContext.run(client, () -> client.bulk(bulkRequest).actionGet(timeout)); if (response.hasFailures()) { throw new OpenSearchException( @@ -304,6 +305,7 @@ public void putGeoIpData( renewLock.run(); } freezeIndex(indexName); + } public void deleteIp2GeoDataIndex(final String index) { diff --git a/src/main/resources/mappings/ip2geo_datasource.json b/src/main/resources/mappings/ip2geo_datasource.json index 3f3d5aa1..567052d6 100644 --- a/src/main/resources/mappings/ip2geo_datasource.json +++ b/src/main/resources/mappings/ip2geo_datasource.json @@ -1,75 +1,130 @@ { - "properties" : { - "database" : { - "properties" : { - "fields" : { - "type" : "text" + "properties": { + "database": { + "properties": { + "fields": { + "type": "text" }, - "sha256_hash" : { - "type" : "text" + "provider": { + "type": "text" }, - "provider" : { - "type" : "text" + "sha256_hash": { + "type": "text" }, - "updated_at_in_epoch_millis" : { - "type" : "long" + "updated_at_in_epoch_millis": { + "type": "long" }, - "valid_for_in_days" : { - "type" : "long" + "valid_for_in_days": { + "type": "long" } } }, - "enabled_time" : { - "type" : "long" + "enabled_time": { + "type": "long" }, - "endpoint" : { - "type" : "text" + "endpoint": { + "type": "text" }, - "name" : { - "type" : "text" + "indices": { + "type": "text" }, - "indices" : { - "type" : "text" + "last_update_time": { + "type": "long" }, - "last_update_time" : { - "type" : "long" + "name": { + "type": "text" }, - "schedule" : { - "properties" : { - "interval" : { - "properties" : { - "period" : { - "type" : "long" + "schedule": { + "properties": { + "interval": { + "properties": { + "period": { + "type": "long" }, - "start_time" : { - "type" : "long" + "start_time": { + "type": "long" }, - "unit" : { - "type" : "text" + "unit": { + "type": "text" } } } } }, - "state" : { - "type" : "text" + "state": { + "type": "text" }, - "update_enabled" : { - "type" : "boolean" + "system_schedule": { + "properties": { + "interval": { + "properties": { + "period": { + "type": "long" + }, + "start_time": { + "type": "long" + }, + "unit": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + } + }, + "task": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } }, - "update_stats" : { - "properties" : { - "last_failed_at_in_epoch_millis" : { - "type" : "long" + "update_enabled": { + "type": "boolean" + }, + "update_stats": { + "properties": { + "last_failed_at_in_epoch_millis": { + "type": "long" }, - "last_processing_time_in_millis" : { - "type" : "long" + "last_processing_time_in_millis": { + "type": "long" }, - "last_skipped_at_in_epoch_millis" : { - "type" : "long" + "last_skipped_at_in_epoch_millis": { + "type": "long" }, - "last_succeeded_at_in_epoch_millis" : { - "type" : "long" + "last_succeeded_at_in_epoch_millis": { + "type": "long" + } + } + }, + "user_schedule": { + "properties": { + "interval": { + "properties": { + "period": { + "type": "long" + }, + "start_time": { + "type": "long" + }, + "unit": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } } } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDaoTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDaoTests.java index 09e2dd46..5bcdbbd0 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDaoTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDaoTests.java @@ -39,6 +39,7 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.WriteRequest; +import org.opensearch.cluster.routing.Preference; import org.opensearch.common.Randomness; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.xcontent.json.JsonXContent; @@ -205,6 +206,7 @@ private Datasource setupClientForGetRequest(final boolean isExist, final Runtime GetRequest request = (GetRequest) actionRequest; assertEquals(datasource.getName(), request.id()); assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index()); + assertEquals(Preference.PRIMARY.type(), request.preference()); GetResponse response = getMockedGetResponse(isExist ? datasource : null); if (exception != null) { throw exception; @@ -262,6 +264,7 @@ public void testGetDatasources_whenValidInput_thenSucceed() { assertTrue(actionRequest instanceof MultiGetRequest); MultiGetRequest request = (MultiGetRequest) actionRequest; assertEquals(2, request.getItems().size()); + assertEquals(Preference.PRIMARY.type(), request.preference()); for (MultiGetRequest.Item item : request.getItems()) { assertEquals(DatasourceExtension.JOB_INDEX_NAME, item.index()); assertTrue(datasources.stream().filter(datasource -> datasource.getName().equals(item.id())).findAny().isPresent()); @@ -295,6 +298,7 @@ public void testGetAllDatasources_whenAsynchronous_thenSucceed() { assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.indices()[0]); assertEquals(QueryBuilders.matchAllQuery(), request.source().query()); assertEquals(1000, request.source().size()); + assertEquals(Preference.PRIMARY.type(), request.preference()); SearchResponse response = mock(SearchResponse.class); when(response.getHits()).thenReturn(searchHits); @@ -322,6 +326,7 @@ public void testGetAllDatasources_whenSynchronous_thenSucceed() { assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.indices()[0]); assertEquals(QueryBuilders.matchAllQuery(), request.source().query()); assertEquals(1000, request.source().size()); + assertEquals(Preference.PRIMARY.type(), request.preference()); SearchResponse response = mock(SearchResponse.class); when(response.getHits()).thenReturn(searchHits); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java index 45380aa3..0caa18d0 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java @@ -41,6 +41,7 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.cluster.routing.Preference; import org.opensearch.common.Strings; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.bytes.BytesReference; @@ -236,7 +237,7 @@ public void testGetGeoIpData_whenDataExist_thenReturnTheData() { verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { assert actionRequest instanceof SearchRequest; SearchRequest request = (SearchRequest) actionRequest; - assertEquals("_local", request.preference()); + assertEquals(Preference.LOCAL.type(), request.preference()); assertEquals(1, request.source().size()); assertEquals(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip), request.source().query()); @@ -269,7 +270,7 @@ public void testGetGeoIpData_whenNoData_thenReturnEmpty() { verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { assert actionRequest instanceof SearchRequest; SearchRequest request = (SearchRequest) actionRequest; - assertEquals("_local", request.preference()); + assertEquals(Preference.LOCAL.type(), request.preference()); assertEquals(1, request.source().size()); assertEquals(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip), request.source().query());