Merge branch 'main' into parse-command
YANG-DB committed Aug 20, 2024
2 parents 16d7cc0 + 6fc2010 commit 57e0c4a
Showing 106 changed files with 3,317 additions and 1,008 deletions.
1 change: 1 addition & 0 deletions DEVELOPER_GUIDE.md
@@ -27,6 +27,7 @@ If you get integration test failures with error message "Previous attempts to fi
The `aws-integration` folder contains tests for cloud server providers. For instance, to test against an AWS OpenSearch domain, configure the following settings. The client will use the default credential provider to access the AWS OpenSearch domain.
```
export AWS_OPENSEARCH_HOST=search-xxx.us-west-2.on.aws
+export AWS_OPENSEARCH_SERVERLESS_HOST=xxx.us-west-2.aoss.amazonaws.com
export AWS_REGION=us-west-2
export AWS_EMRS_APPID=xxx
export AWS_EMRS_EXECUTION_ROLE=xxx
```
22 changes: 14 additions & 8 deletions README.md
@@ -22,7 +22,8 @@ Version compatibility:
| 0.2.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ |
| 0.3.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ |
| 0.4.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ |
-| 0.5.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ |
+| 0.5.0 | 11+ | 3.5.1 | 2.12.14 | 2.13+ |
+| 0.6.0 | 11+ | 3.5.1 | 2.12.14 | 2.13+ |

## Flint Extension Usage

@@ -42,31 +43,36 @@ spark-sql --conf "spark.sql.extensions=org.opensearch.flint.spark.FlintPPLSparkE

### Running With Both Extensions
```
-spark-sql --conf "spark.sql.extensions='org.opensearch.flint.spark.FlintPPLSparkExtensions, org.opensearch.flint.spark.FlintSparkExtensions'"
+spark-sql --conf "spark.sql.extensions=org.opensearch.flint.spark.FlintPPLSparkExtensions,org.opensearch.flint.spark.FlintSparkExtensions"
```

## Build

-To build and run this application with Spark, you can run:
+To build and run this application with Spark, you can run (requires Java 11):

```
sbt clean standaloneCosmetic/publishM2
```
-then add org.opensearch:opensearch-spark_2.12 when run spark application, for example,
+then add org.opensearch:opensearch-spark-standalone_2.12 when running the Spark application, for example:
```
bin/spark-shell --packages "org.opensearch:opensearch-spark_2.12:0.5.0-SNAPSHOT"
bin/spark-shell --packages "org.opensearch:opensearch-spark-standalone_2.12:0.6.0-SNAPSHOT" \
--conf "spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions" \
--conf "spark.sql.catalog.dev=org.apache.spark.opensearch.catalog.OpenSearchCatalog"
```

### PPL Build & Run

-To build and run this PPL in Spark, you can run:
+To build and run this PPL in Spark, you can run (requires Java 11):

```
sbt clean sparkPPLCosmetic/publishM2
```
-then add org.opensearch:opensearch-spark_2.12 when run spark application, for example,
+then add org.opensearch:opensearch-spark-ppl_2.12 when running the Spark application, for example:
```
bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.5.0-SNAPSHOT"
bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.6.0-SNAPSHOT" \
--conf "spark.sql.extensions=org.opensearch.flint.spark.FlintPPLSparkExtensions" \
--conf "spark.sql.catalog.dev=org.apache.spark.opensearch.catalog.OpenSearchCatalog"
```

## Code of Conduct
12 changes: 4 additions & 8 deletions build.sbt
@@ -5,10 +5,10 @@
import Dependencies._

lazy val scala212 = "2.12.14"
lazy val sparkVersion = "3.3.2"
// Spark jackson version. Spark jackson-module-scala strictly check the jackson-databind version hould compatbile
lazy val sparkVersion = "3.5.1"
// Spark jackson version. Spark jackson-module-scala strictly check the jackson-databind version should compatible
// https://github.com/FasterXML/jackson-module-scala/blob/2.18/src/main/scala/com/fasterxml/jackson/module/scala/JacksonModule.scala#L59
lazy val jacksonVersion = "2.13.4"
lazy val jacksonVersion = "2.15.2"

// The transitive opensearch jackson-databind dependency version should align with Spark jackson databind dependency version.
// Issue: https://github.com/opensearch-project/opensearch-spark/issues/442
@@ -20,7 +20,7 @@ val sparkMinorVersion = sparkVersion.split("\\.").take(2).mkString(".")

ThisBuild / organization := "org.opensearch"

ThisBuild / version := "0.5.0-SNAPSHOT"
ThisBuild / version := "0.6.0-SNAPSHOT"

ThisBuild / scalaVersion := scala212

@@ -210,10 +210,6 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
},
-assembly / assemblyExcludedJars := {
-  val cp = (assembly / fullClasspath).value
-  cp filter { file => file.data.getName.contains("LogsConnectorSpark")}
-},
assembly / test := (Test / test).value)

lazy val IntegrationTest = config("it") extend Test
5 changes: 5 additions & 0 deletions docs/PPL-Correlation-command.md
@@ -1,5 +1,8 @@
## PPL Correlation Command

+> This is an experimental command - it may be removed in future versions

## Overview

In the past year, the OpenSearch Observability & Security teams have been busy with many aspects of improving data monitoring and visibility.
@@ -262,6 +265,8 @@ The new correlation command is actually a ‘hidden’ join command therefore th

The Catalyst engine will optimize this query according to the most efficient join ordering.

+> This is an experimental command - it may be removed in future versions
* * *

## Appendix
4 changes: 2 additions & 2 deletions docs/PPL-on-Spark.md
@@ -34,7 +34,7 @@ sbt clean sparkPPLCosmetic/publishM2
then add org.opensearch:opensearch-spark_2.12 when running the Spark application, for example:
```
-bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.5.0-SNAPSHOT"
+bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.6.0-SNAPSHOT"
```

### PPL Extension Usage
@@ -46,7 +46,7 @@ spark-sql --conf "spark.sql.extensions=org.opensearch.flint.spark.FlintPPLSparkE

### Running With both Flint & PPL Extensions
-In order to make use of both flint and ppl extension, one can simply add both jars (`org.opensearch:opensearch-spark-ppl_2.12:0.5.0-SNAPSHOT`,`org.opensearch:opensearch-spark_2.12:0.5.0-SNAPSHOT`) to the cluster's
+In order to make use of both the Flint and PPL extensions, one can simply add both jars (`org.opensearch:opensearch-spark-ppl_2.12:0.6.0-SNAPSHOT`,`org.opensearch:opensearch-spark_2.12:0.6.0-SNAPSHOT`) to the cluster's
classpath.

Next, you need to configure both extensions:
11 changes: 7 additions & 4 deletions docs/index.md
@@ -60,7 +60,7 @@ Currently, Flint metadata is only static configuration without version control a

```json
{
"version": "0.5.0",
"version": "0.6.0",
"name": "...",
"kind": "skipping",
"source": "...",
```
@@ -521,12 +521,15 @@ In the index mapping, the `_meta` and `properties` field stores meta and schema i
- `spark.datasource.flint.auth.username`: basic auth username.
- `spark.datasource.flint.auth.password`: basic auth password.
- `spark.datasource.flint.region`: default is us-west-2. Only used when auth=sigv4.
- `spark.datasource.flint.customAWSCredentialsProvider`: default is empty.
- `spark.datasource.flint.customFlintMetadataLogServiceClass`: default is empty.
- `spark.datasource.flint.customFlintIndexMetadataServiceClass`: default is empty.
- `spark.datasource.flint.write.id_name`: no default value.
- `spark.datasource.flint.ignore.id_column` : default value is true.
- `spark.datasource.flint.write.batch_size`: The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
- `spark.datasource.flint.write.batch_bytes`: The approximate amount of data in bytes written to Flint in a single batch request. The actual data written to OpenSearch may exceed this. Default value is 1mb. The writing process checks after each document whether the total number of documents (docCount) has reached batch_size or the buffer size has surpassed batch_bytes. If either condition is met, the current batch is flushed and the document count resets to zero.
- `spark.datasource.flint.write.refresh_policy`: default value is false. Valid values: [NONE(false), IMMEDIATE(true), WAIT_UNTIL(wait_for)].
- `spark.datasource.flint.write.bulkRequestRateLimitPerNode`: [Experimental] Rate limit (requests/sec) for bulk requests per worker node. Only accepts integer values. To reduce traffic below 1 req/sec, reduce batch_bytes or batch_size. Default value is 0, which disables the rate limit.
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. Scroll context keep-alive duration.
- `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. Default value is 3. Use 0 to disable retry.
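
For illustration, these data source options can be supplied like any other Spark configuration. A minimal Scala sketch, with placeholder credentials and sizes; the `spark.datasource.flint.auth` key is an assumption inferred from the sigv4 note above:

```scala
import org.apache.spark.sql.SparkSession

// A sketch of setting some of the Flint data source options listed above.
// All values are illustrative placeholders, not recommendations.
val spark = SparkSession
  .builder()
  .appName("flint-options-example")
  .config("spark.datasource.flint.auth", "basic") // assumed key; see the sigv4 note above
  .config("spark.datasource.flint.auth.username", "admin")
  .config("spark.datasource.flint.auth.password", "admin")
  .config("spark.datasource.flint.write.batch_size", "1000")
  .config("spark.datasource.flint.write.refresh_policy", "wait_for")
  .config("spark.datasource.flint.retry.max_retries", "3")
  .getOrCreate()
```

The same keys can equally be passed as `--conf` flags to spark-shell or spark-submit, as in the EMR examples below.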
@@ -689,7 +692,7 @@ For now, only single or conjunct conditions (conditions connected by AND) in WHE
### AWS EMR Spark Integration - Using execution role
Flint uses [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). When running on EMR Spark, Flint uses the execution role credentials.
```
---conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.5.0-SNAPSHOT \
+--conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.6.0-SNAPSHOT \
--conf spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots \
--conf spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
```
@@ -731,7 +734,7 @@ Flint use [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJa
3. Set the spark.datasource.flint.customAWSCredentialsProvider property to com.amazonaws.emr.AssumeRoleAWSCredentialsProvider. Set the environment variable ASSUME_ROLE_CREDENTIALS_ROLE_ARN to the ARN of CrossAccountRoleB.
```
---conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.5.0-SNAPSHOT \
+--conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.6.0-SNAPSHOT \
--conf spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots \
--conf spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
```
@@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

-package org.opensearch.flint.core
+package org.opensearch.flint.common

/**
* Flint version.
@@ -19,6 +19,7 @@ object FlintVersion {
val V_0_3_0: FlintVersion = FlintVersion("0.3.0")
val V_0_4_0: FlintVersion = FlintVersion("0.4.0")
val V_0_5_0: FlintVersion = FlintVersion("0.5.0")
+val V_0_6_0: FlintVersion = FlintVersion("0.6.0")

-def current(): FlintVersion = V_0_5_0
+def current(): FlintVersion = V_0_6_0
}
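
As a small usage sketch (assuming `FlintVersion` is a case class, so equality is structural; presumably this is the same version string that appears in the metadata `"version"` field above):

```scala
import org.opensearch.flint.common.FlintVersion

// current() now reports 0.6.0, the version recorded for newly created
// Flint index metadata (see the "version" field in docs/index.md above).
val v: FlintVersion = FlintVersion.current()
assert(v == FlintVersion.V_0_6_0)
```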
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.common.metadata;

import java.util.Map;

/**
* Flint index metadata service provides API for index metadata related operations on a Flint index
* regardless of underlying storage.
* <p>
* Custom implementations of this interface are expected to provide a public constructor with
* the signature {@code public MyCustomService(SparkConf sparkConf)} to be instantiated by
* the FlintIndexMetadataServiceBuilder.
*/
public interface FlintIndexMetadataService {

/**
* Retrieve metadata for a Flint index.
*
* @param indexName index name
* @return index metadata
*/
FlintMetadata getIndexMetadata(String indexName);

/**
* Retrieve all metadata for Flint index whose name matches the given pattern.
*
* @param indexNamePattern index name pattern
* @return map where the keys are the matched index names, and the values are
* corresponding index metadata
*/
Map<String, FlintMetadata> getAllIndexMetadata(String... indexNamePattern);

/**
* Update metadata for a Flint index.
*
* @param indexName index name
* @param metadata index metadata to update
*/
void updateIndexMetadata(String indexName, FlintMetadata metadata);

/**
* Delete metadata for a Flint index.
*
* @param indexName index name
*/
void deleteIndexMetadata(String indexName);
}
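
To make the constructor contract above concrete, here is a hypothetical sketch in Scala; the package and class names are invented, and a transient in-memory map stands in for real storage:

```scala
package com.example.flint // hypothetical package, not part of the project

import java.util.{Map => JMap}

import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap

import org.apache.spark.SparkConf
import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}

/**
 * Hypothetical in-memory metadata service. The single public SparkConf
 * constructor is what FlintIndexMetadataServiceBuilder is documented to expect.
 */
class InMemoryIndexMetadataService(sparkConf: SparkConf) extends FlintIndexMetadataService {

  private val store = TrieMap.empty[String, FlintMetadata]

  override def getIndexMetadata(indexName: String): FlintMetadata =
    store(indexName) // throws NoSuchElementException if absent; acceptable for a sketch

  override def getAllIndexMetadata(indexNamePattern: String*): JMap[String, FlintMetadata] = {
    // Naive wildcard handling: '*' in a pattern matches any character sequence.
    val regexes = indexNamePattern.map(p => p.replace("*", ".*").r)
    store
      .filter { case (name, _) => regexes.exists(_.pattern.matcher(name).matches()) }
      .toMap
      .asJava
  }

  override def updateIndexMetadata(indexName: String, metadata: FlintMetadata): Unit =
    store.put(indexName, metadata)

  override def deleteIndexMetadata(indexName: String): Unit =
    store.remove(indexName)
}
```

Such a class would then be registered through `spark.datasource.flint.customFlintIndexMetadataServiceClass`, as listed in the docs/index.md options above.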
