Merge pull request #30 from metabolicdata/feature/after_action
Feature/after action
margon8 authored Mar 19, 2024
2 parents 9d8cf6f + 7a655d9 commit 680f893
Showing 31 changed files with 504 additions and 235 deletions.
8 changes: 4 additions & 4 deletions build.sbt
@@ -5,16 +5,16 @@ version := "SNAPSHOT"
scalaVersion := "2.12.17"

/* Reusable versions */
val sparkVersion = "3.3.0"
val awsVersion = "1.12.401"
val sparkVersion = "3.3.2"
val awsVersion = "1.12.682"
val testContainersVersion = "0.40.12"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
"org.apache.kafka" % "kafka-clients" % "3.3.2",

"io.delta" %% "delta-core" % "2.1.1",
"io.delta" %% "delta-core" % "2.3.0",

"org.apache.logging.log4j" %% "log4j-api-scala" % "12.0",
"com.typesafe" % "config" % "1.4.0",
@@ -57,7 +57,7 @@ dependencyOverrides ++= {
libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.9" % Test


libraryDependencies += "com.holdenkarau" %% "spark-testing-base" % s"${sparkVersion}_1.4.0" % Test
libraryDependencies += "com.holdenkarau" %% "spark-testing-base" % s"${sparkVersion}_1.4.7" % Test
dependencyOverrides += "org.xerial.snappy" % "snappy-java" % "1.1.8.2" % Test

libraryDependencies += "com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersVersion % Test
14 changes: 14 additions & 0 deletions docu/fundamentals/operations.md
@@ -199,6 +199,20 @@ When using Airbyte with Datalakes (fileformat) , Airbytes uses structures to man

</details>

### **Watermark Op**

Constrains how late records from a source can arrive and still be considered.

```hocon
op.watermark {
onColumn: "created_at"
value: "60 seconds"
}
```

It is generally used to limit the scope of the source to a certain time window. This is useful for sources that are not append-only, such as a table that is updated with new data but also holds old data that is no longer relevant. It is a must in streaming mode, especially when joining with other sources, so that the join window does not grow too large and the job does not run out of memory waiting for a possibly delayed source.
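
As a point of reference, this is roughly what applying a watermark looks like in plain Spark Structured Streaming. The sketch below is illustrative only (the `withCreatedAtWatermark` helper is hypothetical and not part of Metabolic); it mirrors the `created_at` column and `60 seconds` threshold from the example above.

```scala
import org.apache.spark.sql.DataFrame

// Illustrative sketch: apply a 60-second watermark on the `created_at`
// event-time column. Records arriving more than 60 seconds behind the
// maximum observed event time are dropped from stateful operations such as
// streaming joins, which keeps the join state (and memory usage) bounded.
def withCreatedAtWatermark(df: DataFrame): DataFrame =
  df.withWatermark("created_at", "60 seconds")
```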


## Mapping Operations

Mapping Operations have all the context of the sources and can be executed before (preOps section) or after (postOps section) the SQL Mapping.
69 changes: 69 additions & 0 deletions examples/confs/log4j2.properties
@@ -0,0 +1,69 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
rootLogger.level = warn
rootLogger.appenderRef.stdout.ref = console

# In the pattern layout configuration below, we specify an explicit `%ex` conversion
# pattern for logging Throwables. If this was omitted, then (by default) Log4J would
# implicitly add an `%xEx` conversion pattern which logs stacktraces with additional
# class packaging information. That extra information can sometimes add a substantial
# performance overhead, so we disable it in our default logging config.
# For more information, see SPARK-39361.
appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

# Set the default spark-shell/spark-sql log level to WARN. When running the
# spark-shell/spark-sql, the log level for these classes is used to overwrite
# the root logger's log level, so that the user can have different defaults
# for the shell and regular Spark apps.
logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = warn

logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
logger.thriftserver.level = warn

# Settings to quiet third party logs that are too verbose
logger.jetty1.name = org.sparkproject.jetty
logger.jetty1.level = warn
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
logger.jetty2.level = error
logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper
logger.replexprTyper.level = info
logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
logger.replSparkILoopInterpreter.level = info
logger.parquet1.name = org.apache.parquet
logger.parquet1.level = error
logger.parquet2.name = parquet
logger.parquet2.level = error

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.RetryingHMSHandler.level = fatal
logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.FunctionRegistry.level = error

# For deploying Spark ThriftServer
# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
appender.console.filter.1.type = RegexFilter
appender.console.filter.1.regex = .*Thrift error occurred during processing of message.*
appender.console.filter.1.onMatch = deny
appender.console.filter.1.onMismatch = neutral
69 changes: 69 additions & 0 deletions examples/confs/log4j2.properties.template
@@ -0,0 +1,69 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
rootLogger.level = info
rootLogger.appenderRef.stdout.ref = console

# In the pattern layout configuration below, we specify an explicit `%ex` conversion
# pattern for logging Throwables. If this was omitted, then (by default) Log4J would
# implicitly add an `%xEx` conversion pattern which logs stacktraces with additional
# class packaging information. That extra information can sometimes add a substantial
# performance overhead, so we disable it in our default logging config.
# For more information, see SPARK-39361.
appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

# Set the default spark-shell/spark-sql log level to WARN. When running the
# spark-shell/spark-sql, the log level for these classes is used to overwrite
# the root logger's log level, so that the user can have different defaults
# for the shell and regular Spark apps.
logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = warn

logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
logger.thriftserver.level = warn

# Settings to quiet third party logs that are too verbose
logger.jetty1.name = org.sparkproject.jetty
logger.jetty1.level = warn
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
logger.jetty2.level = error
logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper
logger.replexprTyper.level = info
logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
logger.replSparkILoopInterpreter.level = info
logger.parquet1.name = org.apache.parquet
logger.parquet1.level = error
logger.parquet2.name = parquet
logger.parquet2.level = error

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.RetryingHMSHandler.level = fatal
logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.FunctionRegistry.level = error

# For deploying Spark ThriftServer
# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
appender.console.filter.1.type = RegexFilter
appender.console.filter.1.regex = .*Thrift error occurred during processing of message.*
appender.console.filter.1.onMatch = deny
appender.console.filter.1.onMismatch = neutral
19 changes: 19 additions & 0 deletions examples/confs/model.conf
@@ -0,0 +1,19 @@
entities: [
{
name: "My First Entity"
sources: [
{
name: fruits
inputPath: "./examples/data/in"
format: CSV
}
]
mapping: {
sql: "SELECT * FROM fruits"
}
sink: {
outputPath: "./examples/data/out"
format: DELTA
}
}
]
15 changes: 15 additions & 0 deletions examples/data/in/fruits.csv
@@ -0,0 +1,15 @@
FruitID,FruitName,SoldQuantity,PricePerUnit,DateSold
1,Apple,100,0.25,2023-04-01
2,Banana,150,0.15,2023-04-01
3,Grape,200,0.05,2023-04-02
4,Cherry,75,0.30,2023-04-02
4,Cherry,75,0.30,2023-04-02 # Duplicated row with no change
5,Orange,50,0.20,2023-04-03
6,Kiwi,120,0.50,2023-04-03
7,Mango,85,0.70,2023-04-04
8,Pineapple,60,0.60,2023-04-04
9,Strawberry,190,0.25,2023-04-05
10,Peach,95,0.35,2023-04-05
1,Apple,120,0.25,2023-04-05 # Updated row with increased SoldQuantity
4,Cherry,80,0.30,2023-04-06 # Updated row with increased SoldQuantity
7,Mango,100,0.80,2023-04-06 # Updated row with increased SoldQuantity and PricePerUnit
20 changes: 20 additions & 0 deletions local.sh
@@ -0,0 +1,20 @@
SPARK_COMMAND='spark-submit
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,io.delta:delta-core_2.12:2.3.0,com.amazonaws:aws-java-sdk-s3:1.12.401,com.amazonaws:aws-java-sdk-secretsmanager:1.12.401
--class MapperEntrypoint
--files examples/confs/log4j2.properties
--conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp -Dlog4j.configuration=log4j2.properties"
--conf spark.executor.extraJavaOptions="-Dlog4j.configuration=log4j2.properties"
--deploy-mode client target/scala-2.12/metabolic-core-assembly-SNAPSHOT.jar
--dp.crawl false
--dp.region "eu-central-1"
--dp.iamrole "random"
--dp.envPrefix "local/local"
--dp.database "local"
--dp.environment production
--dp.checkpointLocation examples/metadata
--dp.mysqldb factorial
--dp.stream true
--dp.historical true
--configFile'

eval $SPARK_COMMAND examples/confs/model.conf
14 changes: 7 additions & 7 deletions metabolic.sh
@@ -1,11 +1,11 @@
#!/bin/bash


# Set the path to your Spark installation
SPARK_HOME=/Users/margon/Code/Metabolic/spark-3.3.2-bin-hadoop3/bin/
# Optionally Set the path to your Spark installation
#SPARK_HOME=

# Set the path to the directory containing your application jar file
APP_DIR=s3a://factorial-metabolic-gitpod
APP_DIR=target/scala-2.12

# Set the name of your application jar file
APP_JAR=metabolic-core-assembly-SNAPSHOT.jar
@@ -73,14 +73,14 @@ fi
echo

# Submit the Spark application to the local cluster
$SPARK_HOME/spark-submit \
--packages com.amazonaws:aws-java-sdk-s3:1.12.401,com.amazonaws:aws-java-sdk-secretsmanager:1.12.401,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2
COMMAND='spark-submit \
--packages com.amazonaws:aws-java-sdk-s3:1.12.401,com.amazonaws:aws-java-sdk-secretsmanager:1.12.401,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.4
--class $MAIN_CLASS \
--conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
$SPARK_CONF \
$APP_DIR/$APP_JAR \
${HISTORICAL_FLAG:-"--dp.historical false"} ${STREAMING_FLAG:-"--dp.stream false"} $METABOLIC_CONF --configFile $CONFIG_FILE
${HISTORICAL_FLAG:-"--dp.historical false"} ${STREAMING_FLAG:-"--dp.stream false"} $METABOLIC_CONF --configFile $CONFIG_FILE'


echo
echo $COMMAND
#eval $COMMAND
@@ -1,7 +1,9 @@
package com.metabolic.data.core.domain

import com.amazonaws.regions.Regions
import com.metabolic.data.mapper.domain.io.EngineMode
import com.typesafe.config.ConfigFactory

abstract class CoreConfig(val defaults: Defaults = Defaults(ConfigFactory.load()),
val environment: Environment = Environment("", EngineMode.Batch, "", false, "","", Option.empty, Option.empty))
val environment: Environment = Environment("", EngineMode.Batch, "", false, "","",
Regions.fromName("eu-central-1"), Option.empty, Option.empty))
@@ -1,4 +1,5 @@
package com.metabolic.data.core.domain
import com.amazonaws.regions.Regions
import com.metabolic.data.mapper.domain.io.EngineMode.EngineMode

case class Environment(name: String,
@@ -7,6 +8,7 @@ case class Environment(name: String,
crawl: Boolean,
dbName: String,
iamRole: String,
region: Regions,
atlanToken: Option[String],
atlanBaseUrl: Option[String],
historical: Boolean = false,
@@ -0,0 +1,44 @@
package com.metabolic.data.core.services.athena

import com.metabolic.data.core.services.util.ConfigUtilsService
import com.metabolic.data.mapper.domain.Config
import com.metabolic.data.mapper.domain.io.{FileSink, IOFormat}
import com.metabolic.data.mapper.services.AfterAction
import org.apache.logging.log4j.scala.Logging

class AthenaAction extends AfterAction with Logging {

val name: String = "AthenaAction"

def run(config: Config): Unit = {
logger.info(f"Running After Action $name")

val options = config.environment

val region = options.region
val dbName = options.dbName
val tableName = ConfigUtilsService.getTableName(config)

val athena = new AthenaCatalogueService()(region)

config.sink match {
case sink: FileSink =>
sink.format match {
case IOFormat.DELTA =>

logger.info(f"After Action $name: Creating Delta Table for ${config.name}")

athena.dropView(dbName, tableName)

val s3Path = sink.path.replaceAll("version=\\d+", "")
athena.createDeltaTable(dbName, tableName, s3Path)

case _ =>
logger.warn(f"After Action: Skipping $name for ${config.name} as it is not a DeltaSink")
}
case _ =>
logger.warn(f"After Action: Skipping $name for ${config.name} as it is not a FileSink")
}

}
}
@@ -1,11 +1,10 @@
package com.metabolic.data.core.services.glue
package com.metabolic.data.core.services.athena

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.regions.Regions
import com.amazonaws.services.athena._
import com.amazonaws.services.athena.model.{GetQueryResultsRequest, QueryExecutionContext, ResultConfiguration, StartQueryExecutionRequest}
import com.amazonaws.services.athena.model.{GetQueryResultsRequest, QueryExecutionContext, StartQueryExecutionRequest}
import org.apache.logging.log4j.scala.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}


class AthenaCatalogueService(implicit val region: Regions) extends Logging {