Create Job API
Signed-off-by: Vamsi Manohar <[email protected]>
vamsi-amazon committed Sep 18, 2023
1 parent a7af359 commit d19c862
Showing 45 changed files with 1,642 additions and 60 deletions.
4 changes: 2 additions & 2 deletions common/build.gradle
@@ -39,8 +39,8 @@ dependencies {
api group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
api group: 'com.squareup.okhttp3', name: 'okhttp', version: '4.9.3'
implementation 'com.github.babbel:okhttp-aws-signer:1.0.2'
-api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.1'
-api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.1'
+api group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.12.545'
+api group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.12.545'
implementation "com.github.seancfoley:ipaddress:5.4.0"

testImplementation group: 'junit', name: 'junit', version: '4.13.2'

@@ -35,7 +35,7 @@ public enum Key {

METRICS_ROLLING_WINDOW("plugins.query.metrics.rolling_window"),
METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"),

SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name");

@Getter private final String keyValue;

@@ -39,6 +39,15 @@ public interface DataSourceService {
*/
DataSourceMetadata getDataSourceMetadata(String name);

/**
* Returns the {@link DataSourceMetadata} object with the given name. The returned object
* contains all the metadata information without any filtering.
*
* @param name name of the {@link DataSource}.
* @return {@link DataSourceMetadata} object.
*/
DataSourceMetadata getRawDataSourceMetadata(String name);

/**
* Register {@link DataSource} defined by {@link DataSourceMetadata}.
*

@@ -208,6 +208,11 @@ public DataSourceMetadata getDataSourceMetadata(String name) {
return null;
}

@Override
public DataSourceMetadata getRawDataSourceMetadata(String name) {
return null;
}

@Override
public void createDataSource(DataSourceMetadata metadata) {
throw new UnsupportedOperationException("unsupported operation");

@@ -64,29 +64,17 @@ public Set<DataSourceMetadata> getDataSourceMetadata(boolean isDefaultDataSource
}

@Override
-  public DataSourceMetadata getDataSourceMetadata(String datasourceName) {
-    Optional<DataSourceMetadata> dataSourceMetadataOptional =
-        getDataSourceMetadataFromName(datasourceName);
-    if (dataSourceMetadataOptional.isEmpty()) {
-      throw new IllegalArgumentException(
-          "DataSource with name: " + datasourceName + " doesn't exist.");
-    }
-    removeAuthInfo(dataSourceMetadataOptional.get());
-    return dataSourceMetadataOptional.get();
+  public DataSourceMetadata getDataSourceMetadata(String dataSourceName) {
+    DataSourceMetadata dataSourceMetadata = getRawDataSourceMetadata(dataSourceName);
+    removeAuthInfo(dataSourceMetadata);
+    return dataSourceMetadata;
}

@Override
public DataSource getDataSource(String dataSourceName) {
-    Optional<DataSourceMetadata> dataSourceMetadataOptional =
-        getDataSourceMetadataFromName(dataSourceName);
-    if (dataSourceMetadataOptional.isEmpty()) {
-      throw new DataSourceNotFoundException(
-          String.format("DataSource with name %s doesn't exist.", dataSourceName));
-    } else {
-      DataSourceMetadata dataSourceMetadata = dataSourceMetadataOptional.get();
-      this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
-      return dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
-    }
+    DataSourceMetadata dataSourceMetadata = getRawDataSourceMetadata(dataSourceName);
+    this.dataSourceUserAuthorizationHelper.authorizeDataSource(dataSourceMetadata);
+    return dataSourceLoaderCache.getOrLoadDataSource(dataSourceMetadata);
}

@Override
@@ -146,11 +134,20 @@ private void validateDataSourceMetaData(DataSourceMetadata metadata) {
+ " Properties are required parameters.");
}

-  private Optional<DataSourceMetadata> getDataSourceMetadataFromName(String dataSourceName) {
+  @Override
+  public DataSourceMetadata getRawDataSourceMetadata(String dataSourceName) {
    if (dataSourceName.equals(DEFAULT_DATASOURCE_NAME)) {
-      return Optional.of(DataSourceMetadata.defaultOpenSearchDataSourceMetadata());
+      return DataSourceMetadata.defaultOpenSearchDataSourceMetadata();
    } else {
-      return this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName);
+      Optional<DataSourceMetadata> dataSourceMetadataOptional =
+          this.dataSourceMetadataStorage.getDataSourceMetadata(dataSourceName);
+      if (dataSourceMetadataOptional.isEmpty()) {
+        throw new DataSourceNotFoundException(
+            String.format("DataSource with name %s doesn't exist.", dataSourceName));
+      } else {
+        return dataSourceMetadataOptional.get();
+      }
}
}
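
The split between the two accessors is the core of this refactor: getDataSourceMetadata strips credentials via removeAuthInfo before returning, while getRawDataSourceMetadata preserves every stored property for trusted internal callers such as the Spark query dispatcher. A minimal sketch of the contrast (import paths are assumed from the diff; this snippet is not part of the commit):

import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;

public class MetadataAccessSketch {
  // Sketch: contrasts the filtered and raw accessors introduced in this commit.
  static void printAuthVisibility(DataSourceService service, String name) {
    // Filtered view: removeAuthInfo(...) has stripped credential properties,
    // so the object is safe to expose through user-facing APIs.
    DataSourceMetadata filtered = service.getDataSourceMetadata(name);

    // Raw view: every stored property survives, including auth settings.
    DataSourceMetadata raw = service.getRawDataSourceMetadata(name);

    // Both paths now throw DataSourceNotFoundException for unknown names.
    System.out.println(filtered.getProperties().containsKey("prometheus.auth.password")); // false
    System.out.println(raw.getProperties().containsKey("prometheus.auth.password"));      // true
  }
}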


@@ -359,11 +359,11 @@ void testRemovalOfAuthorizationInfo() {
@Test
void testGetDataSourceMetadataForNonExistingDataSource() {
when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")).thenReturn(Optional.empty());
-    IllegalArgumentException exception =
+    DataSourceNotFoundException exception =
        assertThrows(
-            IllegalArgumentException.class,
+            DataSourceNotFoundException.class,
            () -> dataSourceService.getDataSourceMetadata("testDS"));
-    assertEquals("DataSource with name: testDS doesn't exist.", exception.getMessage());
+    assertEquals("DataSource with name testDS doesn't exist.", exception.getMessage());
}

@Test
@@ -385,4 +385,28 @@ void testGetDataSourceMetadataForSpecificDataSourceName() {
assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.password"));
verify(dataSourceMetadataStorage, times(1)).getDataSourceMetadata("testDS");
}

@Test
void testGetRawDataSourceMetadata() {
HashMap<String, String> properties = new HashMap<>();
properties.put("prometheus.uri", "https://localhost:9090");
properties.put("prometheus.auth.type", "basicauth");
properties.put("prometheus.auth.username", "username");
properties.put("prometheus.auth.password", "password");
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testDS",
DataSourceType.PROMETHEUS,
Collections.singletonList("prometheus_access"),
properties);
when(dataSourceMetadataStorage.getDataSourceMetadata("testDS"))
.thenReturn(Optional.of(dataSourceMetadata));

DataSourceMetadata dataSourceMetadata1 = dataSourceService.getRawDataSourceMetadata("testDS");
assertEquals("testDS", dataSourceMetadata1.getName());
assertEquals(DataSourceType.PROMETHEUS, dataSourceMetadata1.getConnector());
assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.type"));
assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.username"));
assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.password"));
}
}
18 changes: 18 additions & 0 deletions docs/user/interfaces/jobinterface.rst
@@ -0,0 +1,18 @@
.. highlight:: sh

========
Endpoint
========

.. rubric:: Table of contents

.. contents::
:local:
:depth: 1


Introduction
============

To support the s3Glue and CloudWatch data sources, we have introduced a new execution engine on top of Apache Spark.
All queries destined for the Spark execution engine are exposed through the Job APIs.
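
A hypothetical submission through the Job API, sketched with the OpenSearch low-level Java REST client; the route and request body below are illustrative assumptions, not the confirmed contract (``RestJobManagementAction`` defines the actual endpoint):

.. code-block:: java

    import org.apache.http.HttpHost;
    import org.opensearch.client.Request;
    import org.opensearch.client.Response;
    import org.opensearch.client.RestClient;

    public class JobApiExample {
      public static void main(String[] args) throws Exception {
        try (RestClient client =
            RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
          // Assumed route and body; see RestJobManagementAction for the real endpoint.
          Request request = new Request("POST", "/_plugins/_query/_jobs");
          request.setJsonEntity("{\"query\": \"select 1\"}");
          Response response = client.performRequest(request);
          System.out.println(response.getStatusLine());
        }
      }
    }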
1 change: 1 addition & 0 deletions integ-test/build.gradle
@@ -162,6 +162,7 @@ configurations.all {
resolutionStrategy.force "org.jetbrains.kotlin:kotlin-stdlib-jdk7:1.5.31"
resolutionStrategy.force "joda-time:joda-time:2.10.12"
resolutionStrategy.force "org.slf4j:slf4j-api:1.7.36"
resolutionStrategy.force "com.amazonaws:aws-java-sdk-core:1.12.545"
}

configurations {

@@ -129,6 +129,12 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<String> SPARK_EXECUTION_ENGINE_CONFIG =
Setting.simpleString(
Key.SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue(),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
@@ -193,6 +199,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.DATASOURCES_URI_HOSTS_DENY_LIST,
DATASOURCE_URI_HOSTS_DENY_LIST,
new Updater(Key.DATASOURCES_URI_HOSTS_DENY_LIST));
register(
settingBuilder,
clusterSettings,
Key.SPARK_EXECUTION_ENGINE_CONFIG,
SPARK_EXECUTION_ENGINE_CONFIG,
new Updater(Key.SPARK_EXECUTION_ENGINE_CONFIG));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
@@ -257,6 +269,7 @@ public static List<Setting<?>> pluginSettings() {
.add(METRICS_ROLLING_WINDOW_SETTING)
.add(METRICS_ROLLING_INTERVAL_SETTING)
.add(DATASOURCE_URI_HOSTS_DENY_LIST)
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.build();
}

4 changes: 2 additions & 2 deletions plugin/build.gradle
@@ -152,8 +152,8 @@ dependencies {

testImplementation group: 'net.bytebuddy', name: 'byte-buddy-agent', version: '1.12.13'
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
-testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.4.0'
-testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.4.0'
+testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.5.0'
+testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.5.0'
testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2'
}

49 changes: 48 additions & 1 deletion plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -7,8 +7,13 @@

import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.AWSEMRServerlessClientBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -83,6 +88,15 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.client.EmrServerlessClient;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.jobs.JobExecutorService;
import org.opensearch.sql.spark.jobs.JobExecutorServiceImpl;
import org.opensearch.sql.spark.jobs.JobMetadataStorageService;
import org.opensearch.sql.spark.jobs.OpensearchJobMetadataStorageService;
import org.opensearch.sql.spark.response.JobExecutionResponseReader;
import org.opensearch.sql.spark.rest.RestJobManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCreateJobRequestAction;
@@ -110,6 +124,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin {

private NodeClient client;
private DataSourceServiceImpl dataSourceService;
private JobExecutorService jobExecutorService;
private Injector injector;

public String name() {
@@ -202,6 +217,7 @@ public Collection<Object> createComponents(
dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata());
LocalClusterState.state().setClusterService(clusterService);
LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);
this.jobExecutorService = createJobManagementService();

ModulesBuilder modules = new ModulesBuilder();
modules.add(new OpenSearchPluginModule());
@@ -213,7 +229,7 @@
});

injector = modules.createInjector();
-    return ImmutableList.of(dataSourceService);
+    return ImmutableList.of(dataSourceService, jobExecutorService);
}

@Override
@@ -270,4 +286,35 @@ private DataSourceServiceImpl createDataSourceService() {
dataSourceMetadataStorage,
dataSourceUserAuthorizationHelper);
}

private JobExecutorService createJobManagementService() {
JobMetadataStorageService jobMetadataStorageService =
new OpensearchJobMetadataStorageService(client, clusterService);
EmrServerlessClient emrServerlessClient = createEMRServerlessClient();
JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client);
SparkQueryDispatcher sparkQueryDispatcher =
new SparkQueryDispatcher(
emrServerlessClient, this.dataSourceService, jobExecutionResponseReader);
return new JobExecutorServiceImpl(
jobMetadataStorageService, sparkQueryDispatcher, pluginSettings);
}

private EmrServerlessClient createEMRServerlessClient() {
String sparkExecutionEngineConfigString =
this.pluginSettings.getSettingValue(
org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG);
return AccessController.doPrivileged(
(PrivilegedAction<EmrServerlessClient>)
() -> {
SparkExecutionEngineConfig sparkExecutionEngineConfig =
SparkExecutionEngineConfig.toSparkExecutionEngineConfig(
sparkExecutionEngineConfigString);
AWSEMRServerless awsemrServerless =
AWSEMRServerlessClientBuilder.standard()
.withRegion(sparkExecutionEngineConfig.getRegion())
.withCredentials(new DefaultAWSCredentialsProviderChain())
.build();
return new EmrServerlessClientImpl(awsemrServerless);
});
}
}
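
createEMRServerlessClient treats the plugins.query.executionengine.spark.config setting as an opaque string and delegates parsing to SparkExecutionEngineConfig.toSparkExecutionEngineConfig. A plausible shape for that string, assuming a JSON encoding; only getRegion() is confirmed by the diff, so the field names below are guesses:

import com.google.gson.Gson;

public class SparkConfigSketch {
  // Hypothetical mirror of SparkExecutionEngineConfig; actual field names may differ.
  static class Config {
    String applicationId;
    String executionRoleARN;
    String region;
  }

  public static void main(String[] args) {
    // Assumed JSON encoding of the plugins.query.executionengine.spark.config setting.
    String setting =
        "{\"applicationId\":\"00example123\","
            + "\"executionRoleARN\":\"arn:aws:iam::123456789012:role/emr-job-role\","
            + "\"region\":\"us-east-1\"}";
    Config config = new Gson().fromJson(setting, Config.class);
    System.out.println(config.region); // prints: us-east-1
  }
}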
9 changes: 9 additions & 0 deletions plugin/src/main/plugin-metadata/plugin-security.policy
@@ -15,4 +15,13 @@ grant {

// ml-commons client
permission java.lang.RuntimePermission "setContextClassLoader";

// aws credentials
permission java.io.FilePermission "${user.home}${/}.aws${/}*", "read";

// Permissions for the AWS EMR Serverless SDK
permission javax.management.MBeanServerPermission "createMBeanServer";
permission javax.management.MBeanServerPermission "findMBeanServer";
permission javax.management.MBeanPermission "com.amazonaws.metrics.*", "*";
permission javax.management.MBeanTrustPermission "register";
};
10 changes: 8 additions & 2 deletions spark/build.gradle
@@ -15,11 +15,14 @@ repositories {

dependencies {
api project(':core')
implementation project(':protocol')
implementation project(':datasources')

implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
implementation group: 'org.json', name: 'json', version: '20230227'
-implementation group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.1'
+api group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.545'
+api group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545'
implementation group: 'commons-io', name: 'commons-io', version: '2.8.0'

testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0'
@@ -56,7 +59,10 @@ jacocoTestCoverageVerification {
excludes = [
'org.opensearch.sql.spark.data.constants.*',
'org.opensearch.sql.spark.rest.*',
-        'org.opensearch.sql.spark.transport.model.*'
+        'org.opensearch.sql.spark.transport.model.*',
+        'org.opensearch.sql.spark.jobs.model.*',
+        'org.opensearch.sql.spark.jobs.config.*',
+        'org.opensearch.sql.spark.jobs.execution.*'
]
limit {
counter = 'LINE'

@@ -5,7 +5,7 @@

package org.opensearch.sql.spark.client;

-import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_INDEX_NAME;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR;

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
@@ -74,7 +74,7 @@ void runEmrApplication(String query) {
flint.getFlintIntegrationJar(),
sparkApplicationJar,
query,
-        SPARK_INDEX_NAME,
+        SPARK_RESPONSE_BUFFER_INDEX_NAME,
flint.getFlintHost(),
flint.getFlintPort(),
flint.getFlintScheme(),
@@ -0,0 +1,15 @@
package org.opensearch.sql.spark.client;

import com.amazonaws.services.emrserverless.model.GetJobRunResult;

public interface EmrServerlessClient {

String startJobRun(
String query,
String jobName,
String applicationId,
String executionRoleArn,
String sparkSubmitParams);

GetJobRunResult getJobRunResult(String applicationId, String jobId);
}
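
A sketch of how EmrServerlessClientImpl might satisfy this interface using the aws-java-sdk-emrserverless classes added to spark/build.gradle. The SDK calls exist as written, but the wiring is an assumption, not the commit's actual implementation (which this page does not show):

package org.opensearch.sql.spark.client;

import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.model.GetJobRunRequest;
import com.amazonaws.services.emrserverless.model.GetJobRunResult;
import com.amazonaws.services.emrserverless.model.JobDriver;
import com.amazonaws.services.emrserverless.model.SparkSubmit;
import com.amazonaws.services.emrserverless.model.StartJobRunRequest;

// Sketch only: the real EmrServerlessClientImpl is not visible in this diff.
public class EmrServerlessClientSketch implements EmrServerlessClient {

  private final AWSEMRServerless client;

  public EmrServerlessClientSketch(AWSEMRServerless client) {
    this.client = client;
  }

  @Override
  public String startJobRun(
      String query,
      String jobName,
      String applicationId,
      String executionRoleArn,
      String sparkSubmitParams) {
    StartJobRunRequest request =
        new StartJobRunRequest()
            .withName(jobName)
            .withApplicationId(applicationId)
            .withExecutionRoleArn(executionRoleArn)
            .withJobDriver(
                new JobDriver()
                    .withSparkSubmit(
                        new SparkSubmit()
                            // Hypothetical jar location; the commit presumably points
                            // this at the Spark SQL application jar.
                            .withEntryPoint("s3://my-bucket/sql-job.jar")
                            .withEntryPointArguments(query)
                            .withSparkSubmitParameters(sparkSubmitParams)));
    return client.startJobRun(request).getJobRunId();
  }

  @Override
  public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
    return client.getJobRun(
        new GetJobRunRequest().withApplicationId(applicationId).withJobRunId(jobId));
  }
}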