Add Azure support (close #1294)
pondzix authored and spenes committed Jul 20, 2023
1 parent 07a2923 commit 7241296
Showing 54 changed files with 2,656 additions and 236 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
@@ -44,6 +44,7 @@ jobs:
- databricksLoader
- transformerKinesis
- transformerPubsub
- transformerKafka
steps:
- name: Checkout Github
uses: actions/checkout@v2
@@ -164,7 +165,8 @@ jobs:
'project databricksLoader; assembly' \
'project transformerBatch; assembly' \
'project transformerKinesis; assembly' \
'project transformerPubsub; assembly'
'project transformerPubsub; assembly' \
'project transformerKafka; assembly'
- name: Get current version
id: ver
run: echo "::set-output name=project_version::${GITHUB_REF#refs/tags/}"
@@ -182,5 +184,6 @@
modules/transformer-batch/target/scala-2.12/snowplow-transformer-batch-${{ steps.ver.outputs.project_version }}.jar
modules/transformer-kinesis/target/scala-2.12/snowplow-transformer-kinesis-${{ steps.ver.outputs.project_version }}.jar
modules/transformer-pubsub/target/scala-2.12/snowplow-transformer-pubsub-${{ steps.ver.outputs.project_version }}.jar
modules/transformer-kafka/target/scala-2.12/snowplow-transformer-kafka-${{ steps.ver.outputs.project_version }}.jar
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
35 changes: 34 additions & 1 deletion build.sbt
@@ -29,6 +29,7 @@ lazy val root = project.in(file("."))
.aggregate(
aws,
gcp,
azure,
common,
commonTransformerStream,
loader,
@@ -38,6 +39,7 @@ lazy val root = project.in(file("."))
transformerBatch,
transformerKinesis,
transformerPubsub,
transformerKafka
)

lazy val common: Project = project
@@ -60,6 +62,13 @@ lazy val gcp: Project = project
.dependsOn(common % "compile->compile;test->test")
.enablePlugins(BuildInfoPlugin)

lazy val azure: Project = project
.in(file("modules/azure"))
.settings(BuildSettings.azureBuildSettings)
.settings(libraryDependencies ++= Dependencies.azureDependencies)
.dependsOn(common % "compile->compile;test->test")
.enablePlugins(BuildInfoPlugin)

lazy val commonTransformerStream = project
.in(file("modules/common-transformer-stream"))
.settings(BuildSettings.commonStreamTransformerBuildSettings)
@@ -74,7 +83,11 @@ lazy val loader = project
.settings(BuildSettings.loaderBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.loaderDependencies)
.dependsOn(aws % "compile->compile;test->test;runtime->runtime", gcp % "compile->compile;test->test")
.dependsOn(
aws % "compile->compile;test->test;runtime->runtime",
gcp % "compile->compile;test->test",
azure % "compile->compile;test->test"
)

lazy val redshiftLoader = project
.in(file("modules/redshift-loader"))
@@ -169,3 +182,23 @@ lazy val transformerPubsubDistroless = project
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", gcp % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)

lazy val transformerKafka = project
.in(file("modules/transformer-kafka"))
.settings(BuildSettings.transformerKafkaBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.transformerKafkaDependencies)
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", azure % "compile->compile;test->test;runtime->runtime")
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)


lazy val transformerKafkaDistroless = project
.in(file("modules/distroless/transformer-kafka"))
.settings(sourceDirectory := (transformerKafka / sourceDirectory).value)
.settings(BuildSettings.transformerKafkaBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.transformerKafkaDependencies)
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", azure % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
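For context, the `Dependencies.azureDependencies` and `BuildSettings.azureBuildSettings` values referenced above live in the project's build definition files (presumably project/Dependencies.scala and project/BuildSettings.scala), which are not part of this excerpt. Below is a minimal sketch of how the new Azure dependency list could look, assuming it wraps the official Azure SDK clients plus an fs2-kafka client; the artifact selection and version numbers are illustrative assumptions, not taken from this commit.

// project/Dependencies.scala (illustrative sketch only; not the file changed by this commit)
import sbt._

object DependenciesSketch {
  object V {
    val azureIdentity    = "1.9.0"   // assumed version
    val azureStorageBlob = "12.22.0" // assumed version
    val fs2Kafka         = "3.0.1"   // assumed version
  }

  // Azure auth + Blob Storage access for the azure module, Kafka client for the message queue
  val azureDependencies: Seq[ModuleID] = Seq(
    "com.azure"       %  "azure-identity"     % V.azureIdentity,
    "com.azure"       %  "azure-storage-blob" % V.azureStorageBlob,
    "com.github.fd4s" %% "fs2-kafka"          % V.fs2Kafka
  )
}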
15 changes: 15 additions & 0 deletions config/loader/azure/databricks.config.minimal.hocon
@@ -0,0 +1,15 @@
{
"blobStorageEndpoint": "https://accountName.blob.core.windows.net/container-name"
"messageQueue": {
"type": "kafka"
"bootstrapServers": "localhost:9092"
"topicName": "loaderTopic"
},
"storage" : {
"host": "abc.cloud.databricks.com"
"password": "Supersecret1"
"schema": "atomic",
"port": 443,
"httpPath": "/databricks/http/path",
}
}
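The minimal config above can be sanity-checked before it is handed to the loader. Here is a small sketch using the Typesafe Config library, which parses HOCON; the file path and the check itself are illustrative and are not how the loader parses its configuration internally.

// Illustrative HOCON sanity check (not the loader's own config parsing)
import java.io.File
import com.typesafe.config.ConfigFactory

object CheckMinimalConfig {
  def main(args: Array[String]): Unit = {
    // adjust the path to wherever your copy of the file lives
    val conf = ConfigFactory
      .parseFile(new File("config/loader/azure/databricks.config.minimal.hocon"))
      .resolve()

    // a few of the keys the Databricks loader needs
    println(conf.getString("blobStorageEndpoint"))
    println(conf.getString("messageQueue.bootstrapServers"))
    println(conf.getString("messageQueue.topicName"))
    println(conf.getString("storage.host"))
    println(conf.getInt("storage.port"))
  }
}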
274 changes: 274 additions & 0 deletions config/loader/azure/databricks.config.reference.hocon
@@ -0,0 +1,274 @@
{

# Azure Blob Storage endpoint; it should contain the container with the transformer's output
"blobStorageEndpoint": "https://accountName.blob.core.windows.net/container-name"

# Name of the Azure Key Vault where application secrets are stored.
# Required if a secret store is used in the storage.password field.
"azureVaultName": "azure-vault"

# Kafka topic used by Transformer and Loader to communicate
"messageQueue": {
"type": "kafka"

# Name of the Kafka topic to read from
"topicName": "loaderTopic"

# A list of host:port pairs to use for establishing the initial connection to the Kafka cluster
# This list should be in the form host1:port1,host2:port2,...
"bootstrapServers": "localhost:9092"

# Optional, Kafka Consumer configuration
# See https://kafka.apache.org/documentation/#consumerconfigs for all properties
"consumerConf": {
"enable.auto.commit": "false"
"auto.offset.reset" : "latest"
"group.id": "loader"
"allow.auto.create.topics": "false"
}
},

# Warehouse connection details
"storage" : {
# Hostname of Databricks cluster
"host": "abc.cloud.databricks.com",
# DB password
"password": {
# The password can be stored in Azure Key Vault or provided as plain text
"secretStore": {
"parameterName": "snowplow.databricks.password"
}
},
# Optional. Override the Databricks default catalog, e.g. with a Unity catalog name.
"catalog": "hive_metastore",
# DB schema
"schema": "atomic",
# Database port
"port": 443,
# Http Path of Databricks cluster
"httpPath": "/databricks/http/path",
# User agent name for Databricks connection. Optional, default value "snowplow-rdbloader-oss"
"userAgent": "snowplow-rdbloader-oss"

# Optimize period per table, which will be used as a predicate for the OPTIMIZE command.
"eventsOptimizePeriod": "2 days"

# Optional, default method is 'NoCreds'
# Specifies the auth method to use with the 'COPY INTO' statement.
"loadAuthMethod": {
# With 'NoCreds', no credentials will be passed to the 'COPY INTO' statement.
# The Databricks cluster needs permission to access the transformer's
# output Azure Blob Storage container. More information can be found here:
# https://docs.databricks.com/storage/azure-storage.html
"type": "NoCreds"
}
#"loadAuthMethod": {
# # With 'TempCreds', temporary credentials will be created for every
# # load operation and passed to the 'COPY INTO' statement. This way,
# # the Databricks cluster doesn't need permission to access the transformer
# # output Azure Blob Storage container; access is provided by the
# # temporary credentials.
# "type": "TempCreds"
#
# # If 'TempCreds' load auth method is used, this value will be used as a session duration
# # of temporary credentials used for loading data and folder monitoring.
# # Optional, default value "1 hour"
# "credentialsTtl": "1 hour"
#}
},

"schedules": {
# Periodic schedules to stop loading, e.g. for a Databricks maintenance window
# Any number of schedules is supported, but it is recommended not to overlap them
# Schedules use the machine's local timezone (UTC is recommended)
"noOperation": [
{
# Human-readable name of the no-op window
"name": "Maintenance window",
# Cron expression with second granularity
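# (fields: second minute hour day-of-month month day-of-week;
# e.g. "0 0 12 * * ?" below fires every day at 12:00:00)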
"when": "0 0 12 * * ?",
# For how long the loader should be paused
"duration": "1 hour"
}
],
# The loader runs periodic OPTIMIZE statements to prevent a growing number of files behind Delta tables.
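# ("0 0 0 ? * *" runs daily at midnight, "0 0 5 ? * *" runs daily at 05:00)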
"optimizeEvents": "0 0 0 ? * *",
"optimizeManifest": "0 0 5 ? * *"
}

# Observability and reporting options
"monitoring": {
# Snowplow tracking (optional)
"snowplow": {
"appId": "databricks-loader",
"collector": "snplow.acme.com",
},

# An endpoint for alerts and informational events
# Everything sent to the Snowplow collector (as properly formed self-describing events)
# will also be sent to the webhook as POST payloads with self-describing JSONs
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
},

# Optional, for tracking runtime exceptions
"sentry": {
"dsn": "http://sentry.acme.com"
},

# Optional, configure how metrics are reported
"metrics": {
# Optional, send metrics to StatsD server
"statsd": {
"hostname": "localhost",
"port": 8125,
# Any key-value pairs to be tagged on every StatsD metric
"tags": {
"app": "rdb-loader"
}
# Optional, override the default metric prefix
# "prefix": "snowplow.rdbloader."
},

# Optional, print metrics on stdout (with slf4j)
"stdout": {
# Optional, override the default metric prefix
# "prefix": "snowplow.rdbloader."
}

# Optional, period for metrics emitted periodically
# Default value 5 minutes
# There is only one periodic metric at the moment.
# This metric is minimum_age_of_loaded_data.
# It specifies how old the latest event in the warehouse is.
"period": "5 minutes"
},

# Optional, configuration for periodic unloaded/corrupted folders checks
"folders": {
# Path where the loader can store auxiliary logs
# The loader should be able to write here, and Databricks should be able to load from here
"staging": "https://accountName.blob.core.windows.net/staging/",
# How often to check
"period": "1 hour"
# Specifies how far back in time folder monitoring will check
"since": "14 days"
# Specifies the most recent point in time up to which folder monitoring will check
"until": "7 days"
# Path to transformer archive (must be same as Transformer's `output.path`)
"transformerOutput": "https://accountName.blob.core.windows.net/transformed/"
# How many times the check can fail before generating an alarm instead of a warning
"failBeforeAlarm": 3
},

# Periodic DB health check, raising a warning if the DB hasn't responded to `SELECT 1`
"healthCheck": {
# How often to query the DB
"frequency": "20 minutes",
# How long to wait for a response
"timeout": "15 seconds"
}
},

# Immediate retries configuration
# Unlike retryQueue, these retries happen immediately, without proceeding to another message
"retries": {
# Starting backoff period
"backoff": "30 seconds"
# A strategy to use when deciding on next backoff
"strategy": "EXPONENTIAL"
# How many attempts to make before sending the message into the retry queue
# If missing, the loader will keep retrying until the cumulative bound
"attempts": 3,
# When backoff reaches this delay, the loader will stop retrying
# If both cumulativeBound and attempts are missing, the loader will retry indefinitely
"cumulativeBound": "1 hour"
},

# Check the target destination to make sure it is ready.
# Retry the check until the target is ready, blocking the application in the meantime
"readyCheck": {
# Starting backoff period
"backoff": "15 seconds"
# A strategy to use when deciding on next backoff
"strategy": "CONSTANT"
# When backoff reaches this delay, the loader will stop retrying
"cumulativeBound": "10 minutes"
},

# Retry configuration for the initialization block
# It will retry on all exceptions thrown there
"initRetries": {
# Starting backoff period
"backoff": "30 seconds"
# A strategy to use when deciding on next backoff
"strategy": "EXPONENTIAL"
# How many attempts to make before sending the message into the retry queue
# If missing, the loader will keep retrying until the cumulative bound
"attempts": 3,
# When backoff reaches this delay, the loader will stop retrying
# If both cumulativeBound and attempts are missing, the loader will retry indefinitely
"cumulativeBound": "1 hour"
},

# Additional backlog of recently failed folders that can be automatically retried
# The retry queue saves a failed folder and then re-reads the info from the shredding_complete file
"retryQueue": {
# How often batch of failed folders should be pulled into a discovery queue
"period": "30 minutes",
# How many failures should be kept in memory
# After the limit is reached new failures are dropped
"size": 64,
# How many attempts to make for each folder
# After the limit is reached new failures are dropped
"maxAttempts": 3,
# Artificial pause after each failed folder is added to the queue
"interval": "5 seconds"
},

"timeouts": {
# How long loading (actual COPY statements) can take before considering Databricks unhealthy
# Without any progress (i.e. a different subfolder) within this period, the loader
# will abort the transaction
"loading": "45 minutes",

# How long non-loading steps (such as ALTER TABLE or metadata queries) can take
# before considering Databricks unhealthy
"nonLoading": "10 minutes"
}

# Optional. Configure telemetry
# All the fields are optional
"telemetry": {
# Set to true to disable telemetry
"disable": false
# Interval for the heartbeat event
"interval": 15 minutes
# HTTP method used to send the heartbeat event
"method": "POST"
# URI of the collector receiving the heartbeat event
"collectorUri": "collector-g.snowplowanalytics.com"
# Port of the collector receiving the heartbeat event
"collectorPort": 443
# Whether to use https or not
"secure": true
# Identifier intended to tie events together across modules,
# infrastructure and apps when used consistently
"userProvidedId": "my_pipeline"
# ID automatically generated upon running a modules deployment script
# Intended to identify each independent module, and the infrastructure it controls
"autoGeneratedId": "hfy67e5ydhtrd"
# Unique identifier for the VM instance
# Unique for each instance of the app running within a module
"instanceId": "665bhft5u6udjf"
# Name of the terraform module that deployed the app
"moduleName": "rdb-loader-ce"
# Version of the terraform module that deployed the app
"moduleVersion": "1.0.0"
}
}
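Whichever loadAuthMethod is chosen, something must be able to read the container behind blobStorageEndpoint: the Databricks cluster itself for 'NoCreds', or the loader when it issues 'TempCreds'. The sketch below only illustrates that access with the Azure SDK, assuming DefaultAzureCredential-based auth and placeholder account/container names; it is not the loader's implementation.

// Illustrative Azure Blob access check (not code from this commit)
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.storage.blob.BlobContainerClientBuilder
import scala.jdk.CollectionConverters._

object ListTransformedFolders {
  def main(args: Array[String]): Unit = {
    // mirrors "blobStorageEndpoint" above; account and container names are placeholders
    val container = new BlobContainerClientBuilder()
      .endpoint("https://accountName.blob.core.windows.net")
      .containerName("container-name")
      .credential(new DefaultAzureCredentialBuilder().build())
      .buildClient()

    // list a few of the run folders the transformer has written
    container.listBlobs().asScala.take(10).foreach(item => println(item.getName))
  }
}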