diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTaskExecutor.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTaskExecutor.java new file mode 100644 index 0000000000000..463f53ffbf20e --- /dev/null +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpDownloaderTaskExecutor.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest.geoip; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.persistent.PersistentTaskState; +import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.threadpool.ThreadPool; + +import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER; + +public class EnterpriseGeoIpDownloaderTaskExecutor extends PersistentTasksExecutor { + private static final Logger logger = LogManager.getLogger(GeoIpDownloader.class); + + protected EnterpriseGeoIpDownloaderTaskExecutor(ThreadPool threadPool) { + super(ENTERPRISE_GEOIP_DOWNLOADER, threadPool.generic()); + } + + @Override + protected void nodeOperation(AllocatedPersistentTask task, PersistentTaskParams params, PersistentTaskState state) { + // TODO so we'd want to override createTask and have our own AllocatedPersistentTask and its associated state, + // but this is enough to prove the principle in the meantime. + logger.info("Running enterprise downloader, state was [{}]", state); + } +} diff --git a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java index 9d0f9848d97b6..03638ace419c1 100644 --- a/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java +++ b/modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.ingest.EnterpriseGeoIpTask.EnterpriseGeoIpTaskParams; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStats; @@ -57,6 +58,7 @@ import java.util.function.Supplier; import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; +import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER; import static org.elasticsearch.ingest.IngestService.INGEST_ORIGIN; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX; import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX_PATTERN; @@ -78,6 +80,7 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemInd private final SetOnce ingestService = new SetOnce<>(); private final SetOnce databaseRegistry = new SetOnce<>(); private GeoIpDownloaderTaskExecutor geoIpDownloaderTaskExecutor; + private EnterpriseGeoIpDownloaderTaskExecutor enterpriseGeoIpDownloaderTaskExecutor; @Override public List> getSettings() { @@ -123,7 +126,10 @@ public Collection createComponents(PluginServices services) { services.threadPool() ); geoIpDownloaderTaskExecutor.init(); - return List.of(databaseRegistry.get(), geoIpDownloaderTaskExecutor); + + enterpriseGeoIpDownloaderTaskExecutor = new EnterpriseGeoIpDownloaderTaskExecutor(services.threadPool()); + + return List.of(databaseRegistry.get(), geoIpDownloaderTaskExecutor, enterpriseGeoIpDownloaderTaskExecutor); } @Override @@ -139,7 +145,7 @@ public List> getPersistentTasksExecutor( SettingsModule settingsModule, IndexNameExpressionResolver expressionResolver ) { - return List.of(geoIpDownloaderTaskExecutor); + return List.of(geoIpDownloaderTaskExecutor, enterpriseGeoIpDownloaderTaskExecutor); } @Override @@ -166,7 +172,12 @@ public List getRestHandlers( public List getNamedXContent() { return List.of( new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(GEOIP_DOWNLOADER), GeoIpTaskParams::fromXContent), - new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(GEOIP_DOWNLOADER), GeoIpTaskState::fromXContent) + new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(GEOIP_DOWNLOADER), GeoIpTaskState::fromXContent), + new NamedXContentRegistry.Entry( + PersistentTaskParams.class, + new ParseField(ENTERPRISE_GEOIP_DOWNLOADER), + EnterpriseGeoIpTaskParams::fromXContent + ) ); } @@ -175,6 +186,7 @@ public List getNamedWriteables() { return List.of( new NamedWriteableRegistry.Entry(PersistentTaskState.class, GEOIP_DOWNLOADER, GeoIpTaskState::new), new NamedWriteableRegistry.Entry(PersistentTaskParams.class, GEOIP_DOWNLOADER, GeoIpTaskParams::new), + new NamedWriteableRegistry.Entry(PersistentTaskParams.class, ENTERPRISE_GEOIP_DOWNLOADER, EnterpriseGeoIpTaskParams::new), new NamedWriteableRegistry.Entry(Task.Status.class, GEOIP_DOWNLOADER, GeoIpDownloaderStats::new) ); } diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 0a75ccfbbedf3..d57fdadd4862b 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -204,6 +204,7 @@ static TransportVersion def(int id) { public static final TransportVersion EVENT_INGESTED_RANGE_IN_CLUSTER_STATE = def(8_695_00_0); public static final TransportVersion ESQL_ADD_AGGREGATE_TYPE = def(8_696_00_0); public static final TransportVersion SECURITY_MIGRATIONS_MIGRATION_NEEDED_ADDED = def(8_697_00_0); + public static final TransportVersion ENTERPRISE_GEOIP_DOWNLOADER = def(8_698_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/ingest/EnterpriseGeoIpTask.java b/server/src/main/java/org/elasticsearch/ingest/EnterpriseGeoIpTask.java new file mode 100644 index 0000000000000..c457c6a5770fd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/EnterpriseGeoIpTask.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.persistent.PersistentTaskParams; +import org.elasticsearch.xcontent.ObjectParser; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; + +/** + * As a relatively minor hack, this class holds the string constant that defines both the id + * and the name of the task for the new ip geolocation database downloader feature. It also provides the + * PersistentTaskParams that are necessary to start the task and to run it. + *

+ * Defining this in Elasticsearch itself gives us a reasonably tidy version of things where we don't + * end up with strange inter-module dependencies. It's not ideal, but it works fine. + */ +public final class EnterpriseGeoIpTask { + + private EnterpriseGeoIpTask() { + // utility class + } + + public static final String ENTERPRISE_GEOIP_DOWNLOADER = "enterprise-geoip-downloader"; + + public static class EnterpriseGeoIpTaskParams implements PersistentTaskParams { + + public static final ObjectParser PARSER = new ObjectParser<>( + ENTERPRISE_GEOIP_DOWNLOADER, + true, + EnterpriseGeoIpTaskParams::new + ); + + public EnterpriseGeoIpTaskParams() {} + + public EnterpriseGeoIpTaskParams(StreamInput in) {} + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.endObject(); + return builder; + } + + @Override + public String getWriteableName() { + return ENTERPRISE_GEOIP_DOWNLOADER; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.ENTERPRISE_GEOIP_DOWNLOADER; + } + + @Override + public void writeTo(StreamOutput out) {} + + public static EnterpriseGeoIpTaskParams fromXContent(XContentParser parser) { + return PARSER.apply(parser, null); + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean equals(Object obj) { + return obj instanceof EnterpriseGeoIpTaskParams; + } + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java index 801ef2c463e95..42af2f905c5ee 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/XPackField.java @@ -87,6 +87,7 @@ public final class XPackField { /** Name constant for the redact processor feature. */ public static final String REDACT_PROCESSOR = "redact_processor"; + public static final String ENTERPRISE_GEOIP_DOWNLOADER = "enterprise_geoip_downloader"; /* Name for Universal Profiling. */ public static final String UNIVERSAL_PROFILING = "universal_profiling"; diff --git a/x-pack/plugin/geoip-enterprise-downloader/build.gradle b/x-pack/plugin/geoip-enterprise-downloader/build.gradle new file mode 100644 index 0000000000000..ab16609ac7aad --- /dev/null +++ b/x-pack/plugin/geoip-enterprise-downloader/build.gradle @@ -0,0 +1,19 @@ +apply plugin: 'elasticsearch.internal-es-plugin' +apply plugin: 'elasticsearch.internal-yaml-rest-test' +apply plugin: 'elasticsearch.internal-cluster-test' +esplugin { + name 'x-pack-geoip-enterprise-downloader' + description 'Elasticsearch Expanded Pack Plugin - Geoip Enterprise Downloader' + classname 'org.elasticsearch.xpack.geoip.EnterpriseDownloaderPlugin' + extendedPlugins = ['x-pack-core'] +} +base { + archivesName = 'x-pack-geoip-enterprise-downloader' +} + +dependencies { + compileOnly project(path: xpackModule('core')) + testImplementation(testArtifact(project(xpackModule('core')))) +} + +addQaCheckDependencies(project) diff --git a/x-pack/plugin/geoip-enterprise-downloader/src/main/java/org/elasticsearch/xpack/geoip/EnterpriseDownloaderPlugin.java b/x-pack/plugin/geoip-enterprise-downloader/src/main/java/org/elasticsearch/xpack/geoip/EnterpriseDownloaderPlugin.java new file mode 100644 index 0000000000000..9790fcd4eac6e --- /dev/null +++ b/x-pack/plugin/geoip-enterprise-downloader/src/main/java/org/elasticsearch/xpack/geoip/EnterpriseDownloaderPlugin.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.geoip; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.xpack.core.XPackPlugin; + +import java.util.Collection; +import java.util.List; + +public class EnterpriseDownloaderPlugin extends Plugin { + + private final Settings settings; + private EnterpriseGeoIpDownloaderLicenseListener enterpriseGeoIpDownloaderTaskExecutor; + + public EnterpriseDownloaderPlugin(final Settings settings) { + this.settings = settings; + } + + protected XPackLicenseState getLicenseState() { + return XPackPlugin.getSharedLicenseState(); + } + + @Override + public Collection createComponents(PluginServices services) { + enterpriseGeoIpDownloaderTaskExecutor = new EnterpriseGeoIpDownloaderLicenseListener( + services.client(), + services.clusterService(), + services.threadPool(), + getLicenseState() + ); + enterpriseGeoIpDownloaderTaskExecutor.init(); + // TODO do we even need to return this? is there a benefit or cost to doing or not doing it? + return List.of(enterpriseGeoIpDownloaderTaskExecutor); + } +} diff --git a/x-pack/plugin/geoip-enterprise-downloader/src/main/java/org/elasticsearch/xpack/geoip/EnterpriseGeoIpDownloaderLicenseListener.java b/x-pack/plugin/geoip-enterprise-downloader/src/main/java/org/elasticsearch/xpack/geoip/EnterpriseGeoIpDownloaderLicenseListener.java new file mode 100644 index 0000000000000..9e430f4c8f0d2 --- /dev/null +++ b/x-pack/plugin/geoip-enterprise-downloader/src/main/java/org/elasticsearch/xpack/geoip/EnterpriseGeoIpDownloaderLicenseListener.java @@ -0,0 +1,115 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.geoip; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.UpdateForV9; +import org.elasticsearch.ingest.EnterpriseGeoIpTask.EnterpriseGeoIpTaskParams; +import org.elasticsearch.license.License; +import org.elasticsearch.license.LicenseStateListener; +import org.elasticsearch.license.LicensedFeature; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.persistent.PersistentTasksService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.xpack.core.XPackField; + +import static org.elasticsearch.ingest.EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER; + +public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateListener { + private static final Logger logger = LogManager.getLogger(EnterpriseGeoIpDownloaderLicenseListener.class); + + private final PersistentTasksService persistentTasksService; + private final ClusterService clusterService; + private final XPackLicenseState licenseState; + private final LicensedFeature.Momentary feature; + + protected EnterpriseGeoIpDownloaderLicenseListener( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + XPackLicenseState licenseState + ) { + this.persistentTasksService = new PersistentTasksService(clusterService, threadPool, client); + this.clusterService = clusterService; + // TODO maybe a static feature is more conventional? i dunno! + this.feature = LicensedFeature.momentary(null, XPackField.ENTERPRISE_GEOIP_DOWNLOADER, License.OperationMode.PLATINUM); + this.licenseState = licenseState; + } + + @UpdateForV9 // use MINUS_ONE once that means no timeout + private static final TimeValue MASTER_TIMEOUT = TimeValue.MAX_VALUE; + private volatile boolean licenseStateListenerRegistered; + + public void init() { + // TODO alternatively we could have the equivalent of this code in EnterpriseDownloaderPlugin itself... :shrug: + listenForLicenseStateChanges(); + } + + void listenForLicenseStateChanges() { + assert licenseStateListenerRegistered == false; + licenseState.addListener(this); + licenseStateListenerRegistered = true; + } + + @Override + public void licenseStateChanged() { + if (clusterService.state().nodes().isLocalNodeElectedMaster() == false) { + // we should only start/stop task from single node, master is the best as it will go through it anyway + return; + } + assert licenseStateListenerRegistered; + // TODO remove dev-time only logging + // TODO we send a start even if it was already started... + // TODO we send a stop even if there was never a start... + if (feature.checkWithoutTracking(licenseState)) { + logger.debug("License is valid, ensuring enterprise geoip downloader is started"); + startTask(); + } else { + logger.debug("License is not valid, ensuring enterprise geoip downloader is stopped"); + stopTask(); + } + } + + private void startTask() { + persistentTasksService.sendStartRequest( + ENTERPRISE_GEOIP_DOWNLOADER, + ENTERPRISE_GEOIP_DOWNLOADER, + new EnterpriseGeoIpTaskParams(), + MASTER_TIMEOUT, + ActionListener.wrap(r -> logger.debug("Started geoip downloader task"), e -> { + Throwable t = e instanceof RemoteTransportException ? ExceptionsHelper.unwrapCause(e) : e; + if (t instanceof ResourceAlreadyExistsException == false) { + logger.error("failed to create geoip downloader task", e); + } + }) + ); + } + + private void stopTask() { + ActionListener> listener = ActionListener.wrap( + r -> logger.debug("Stopped geoip downloader task"), + e -> { + Throwable t = e instanceof RemoteTransportException ? ExceptionsHelper.unwrapCause(e) : e; + if (t instanceof ResourceNotFoundException == false) { + logger.error("failed to remove geoip downloader task", e); + } + } + ); + persistentTasksService.sendRemoveRequest(ENTERPRISE_GEOIP_DOWNLOADER, MASTER_TIMEOUT, listener); + } +}