Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enterprise GeoIp Task experiment #110207

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
1 change: 1 addition & 0 deletions modules/ingest-geoip/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@
requires com.maxmind.db;

exports org.elasticsearch.ingest.geoip.stats to org.elasticsearch.server;
exports org.elasticsearch.ingest.geoip.enterprise;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some to [...] that could go here that would work?

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.geoip;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.threadpool.ThreadPool;

import static org.elasticsearch.ingest.geoip.enterprise.EnterpriseGeoIpTaskParams.ENTERPRISE_GEOIP_DOWNLOADER;

public class EnterpriseGeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<PersistentTaskParams> {
private static final Logger logger = LogManager.getLogger(GeoIpDownloader.class);

private final PersistentTasksService persistentTasksService;

protected EnterpriseGeoIpDownloaderTaskExecutor(
Client client,
HttpClient httpClient,
ClusterService clusterService,
ThreadPool threadPool
) {
super(ENTERPRISE_GEOIP_DOWNLOADER, threadPool.generic());
persistentTasksService = new PersistentTasksService(clusterService, threadPool, client);
}

@Override
protected void nodeOperation(AllocatedPersistentTask task, PersistentTaskParams params, PersistentTaskState state) {
logger.info("Running enterprise downloader");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.geoip.enterprise.EnterpriseGeoIpTaskParams;
import org.elasticsearch.ingest.geoip.stats.GeoIpDownloaderStats;
import org.elasticsearch.ingest.geoip.stats.GeoIpStatsAction;
import org.elasticsearch.ingest.geoip.stats.GeoIpStatsTransportAction;
Expand Down Expand Up @@ -61,6 +62,7 @@
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX_PATTERN;
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
import static org.elasticsearch.ingest.geoip.enterprise.EnterpriseGeoIpTaskParams.ENTERPRISE_GEOIP_DOWNLOADER;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;

public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemIndexPlugin, Closeable, PersistentTaskPlugin, ActionPlugin {
Expand All @@ -78,6 +80,7 @@ public class IngestGeoIpPlugin extends Plugin implements IngestPlugin, SystemInd
private final SetOnce<IngestService> ingestService = new SetOnce<>();
private final SetOnce<DatabaseNodeService> databaseRegistry = new SetOnce<>();
private GeoIpDownloaderTaskExecutor geoIpDownloaderTaskExecutor;
private EnterpriseGeoIpDownloaderTaskExecutor enterpriseGeoIpDownloaderTaskExecutor;

@Override
public List<Setting<?>> getSettings() {
Expand All @@ -90,23 +93,6 @@ public List<Setting<?>> getSettings() {
);
}

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
ingestService.set(parameters.ingestService);

long cacheSize = CACHE_SIZE.get(parameters.env.settings());
GeoIpCache geoIpCache = new GeoIpCache(cacheSize);
DatabaseNodeService registry = new DatabaseNodeService(
parameters.env,
parameters.client,
geoIpCache,
parameters.genericExecutor,
parameters.ingestService.getClusterService()
);
databaseRegistry.set(registry);
return Map.of(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(registry));
}

@Override
public Collection<?> createComponents(PluginServices services) {
try {
Expand All @@ -123,7 +109,32 @@ public Collection<?> createComponents(PluginServices services) {
services.threadPool()
);
geoIpDownloaderTaskExecutor.init();
return List.of(databaseRegistry.get(), geoIpDownloaderTaskExecutor);

enterpriseGeoIpDownloaderTaskExecutor = new EnterpriseGeoIpDownloaderTaskExecutor(
services.client(),
new HttpClient(),
services.clusterService(),
services.threadPool()
);

return List.of(databaseRegistry.get(), geoIpDownloaderTaskExecutor, enterpriseGeoIpDownloaderTaskExecutor);
}

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
ingestService.set(parameters.ingestService);

long cacheSize = CACHE_SIZE.get(parameters.env.settings());
GeoIpCache geoIpCache = new GeoIpCache(cacheSize);
DatabaseNodeService registry = new DatabaseNodeService(
parameters.env,
parameters.client,
geoIpCache,
parameters.genericExecutor,
parameters.ingestService.getClusterService()
);
databaseRegistry.set(registry);
return Map.of(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(registry));
}

@Override
Expand All @@ -139,7 +150,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
SettingsModule settingsModule,
IndexNameExpressionResolver expressionResolver
) {
return List.of(geoIpDownloaderTaskExecutor);
return List.of(geoIpDownloaderTaskExecutor, enterpriseGeoIpDownloaderTaskExecutor);
}

@Override
Expand All @@ -166,7 +177,12 @@ public List<RestHandler> getRestHandlers(
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return List.of(
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(GEOIP_DOWNLOADER), GeoIpTaskParams::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(GEOIP_DOWNLOADER), GeoIpTaskState::fromXContent)
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(GEOIP_DOWNLOADER), GeoIpTaskState::fromXContent),
new NamedXContentRegistry.Entry(
PersistentTaskParams.class,
new ParseField(ENTERPRISE_GEOIP_DOWNLOADER),
EnterpriseGeoIpTaskParams::fromXContent
)
);
}

Expand All @@ -175,6 +191,7 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return List.of(
new NamedWriteableRegistry.Entry(PersistentTaskState.class, GEOIP_DOWNLOADER, GeoIpTaskState::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, GEOIP_DOWNLOADER, GeoIpTaskParams::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, ENTERPRISE_GEOIP_DOWNLOADER, EnterpriseGeoIpTaskParams::new),
new NamedWriteableRegistry.Entry(Task.Status.class, GEOIP_DOWNLOADER, GeoIpDownloaderStats::new)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.ingest.geoip.enterprise;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;

import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;

public class EnterpriseGeoIpTaskParams implements PersistentTaskParams {

public static final String ENTERPRISE_GEOIP_DOWNLOADER = "enterprise-" + GEOIP_DOWNLOADER;

public static final ObjectParser<EnterpriseGeoIpTaskParams, Void> PARSER = new ObjectParser<>(
ENTERPRISE_GEOIP_DOWNLOADER,
true,
EnterpriseGeoIpTaskParams::new
);

public EnterpriseGeoIpTaskParams() {}

public EnterpriseGeoIpTaskParams(StreamInput in) {}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.endObject();
return builder;
}

@Override
public String getWriteableName() {
return ENTERPRISE_GEOIP_DOWNLOADER;
}

@Override
public TransportVersion getMinimalSupportedVersion() {
return TransportVersions.ENTERPRISE_GEOIP_DOWNLOADER;
}

@Override
public void writeTo(StreamOutput out) {}

public static EnterpriseGeoIpTaskParams fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

@Override
public int hashCode() {
return 0;
}

@Override
public boolean equals(Object obj) {
return obj instanceof EnterpriseGeoIpTaskParams;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_ANTHROPIC_INTEGRATION_ADDED = def(8_693_00_0);
public static final TransportVersion ML_INFERENCE_GOOGLE_VERTEX_AI_EMBEDDINGS_ADDED = def(8_694_00_0);
public static final TransportVersion EVENT_INGESTED_RANGE_IN_CLUSTER_STATE = def(8_695_00_0);
public static final TransportVersion ENTERPRISE_GEOIP_DOWNLOADER = def(8_696_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,4 @@
/**
* Parameters used to start persistent task
*/
public interface PersistentTaskParams extends VersionedNamedWriteable, ToXContentObject {

}
public interface PersistentTaskParams extends VersionedNamedWriteable, ToXContentObject {}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public final class XPackField {

/** Name constant for the redact processor feature. */
public static final String REDACT_PROCESSOR = "redact_processor";
public static final String ENTERPRISE_GEOIP_DOWNLOADER = "enterprise_geoip_downloader";
/* Name for Universal Profiling. */
public static final String UNIVERSAL_PROFILING = "universal_profiling";

Expand Down
20 changes: 20 additions & 0 deletions x-pack/plugin/geoip-enterprise-downloader/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apply plugin: 'elasticsearch.internal-es-plugin'
apply plugin: 'elasticsearch.internal-yaml-rest-test'
apply plugin: 'elasticsearch.internal-cluster-test'
esplugin {
name 'x-pack-geoip-enterprise-downloader'
description 'Elasticsearch Expanded Pack Plugin - Geoip Enterprise Downloader'
classname 'org.elasticsearch.xpack.geoip.EnterpriseDownloaderPlugin'
extendedPlugins = ['x-pack-core']
}
base {
archivesName = 'x-pack-geoip-enterprise-downloader'
}

dependencies {
compileOnly project(path: xpackModule('core'))
implementation project(':modules:ingest-geoip')
testImplementation(testArtifact(project(xpackModule('core'))))
}

addQaCheckDependencies(project)
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.geoip;

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackPlugin;

import java.util.Collection;
import java.util.List;

public class EnterpriseDownloaderPlugin extends Plugin implements IngestPlugin, PersistentTaskPlugin {

private final Settings settings;
private EnterpriseGeoIpDownloaderLicenseListener enterpriseGeoIpDownloaderTaskExecutor;

public EnterpriseDownloaderPlugin(final Settings settings) {
this.settings = settings;
}

protected XPackLicenseState getLicenseState() {
return XPackPlugin.getSharedLicenseState();
}

@Override
public Collection<?> createComponents(PluginServices services) {
enterpriseGeoIpDownloaderTaskExecutor = new EnterpriseGeoIpDownloaderLicenseListener(
services.client(),
services.clusterService(),
services.threadPool(),
getLicenseState()
);
enterpriseGeoIpDownloaderTaskExecutor.init();
return List.of(enterpriseGeoIpDownloaderTaskExecutor);
}

@Override
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
ClusterService clusterService,
ThreadPool threadPool,
Client client,
SettingsModule settingsModule,
IndexNameExpressionResolver expressionResolver
) {
return List.of();
}
}
Loading