Merged
2 changes: 1 addition & 1 deletion README.md

@@ -94,7 +94,7 @@ Heimdall supports a growing set of pluggable command types:
 | `shell` | [Shell command execution](https://github.com/patterninc/heimdall/blob/main/plugins/shell/README.md) | Sync or Async |
 | `glue` | [Pulling Iceberg table metadata](https://github.com/patterninc/heimdall/blob/main/plugins/glue/README.md) | Sync or Async |
 | `dynamo` | [DynamoDB read operation](https://github.com/patterninc/heimdall/blob/main/plugins/dynamo/README.md) | Sync or Async |
-| `snowflake` | Query execution in Snowflake | Async |
+| `snowflake` | [Query execution in Snowflake](https://github.com/patterninc/heimdall/blob/main/plugins/snowflake/README.md) | Async |
 | `spark` | [SparkSQL query execution on EMR on EKS](https://github.com/patterninc/heimdall/blob/main/plugins/spark/README.md) | Async |
 
 ---
76 changes: 38 additions & 38 deletions internal/pkg/object/command/snowflake/snowflake.go

@@ -27,17 +27,18 @@ var (
 	ErrInvalidKeyType = fmt.Errorf(`invalida key type`)
 )
 
-type snowflakeCommandContext struct {
+type snowflakeCommandContext struct{}
+
+type snowflakeJobContext struct {
+	Query string `yaml:"query,omitempty" json:"query,omitempty"`
+}
+
+type snowflakeClusterContext struct {
 	Account    string `yaml:"account,omitempty" json:"account,omitempty"`
 	User       string `yaml:"user,omitempty" json:"user,omitempty"`
 	Database   string `yaml:"database,omitempty" json:"database,omitempty"`
 	Warehouse  string `yaml:"warehouse,omitempty" json:"warehouse,omitempty"`
 	PrivateKey string `yaml:"private_key,omitempty" json:"private_key,omitempty"`
-	dsn        string
-}
-
-type snowflakeJobContext struct {
-	Query string `yaml:"query,omitempty" json:"query,omitempty"`
 }
 
 func parsePrivateKey(privateKeyBytes []byte) (*rsa.PrivateKey, error) {
@@ -71,53 +71,52 @@ func New(_ *context.Context) (plugin.Handler, error) {
 
 func (s *snowflakeCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster.Cluster) error {
 
-	// do we have dsn defined?
-	if s.dsn == `` {
-		if c.Context != nil {
-			if err := c.Context.Unmarshal(s); err != nil {
-				return err
-			}
-		}
-
-		// prepare snowflake config
-		privateKeyBytes, err := os.ReadFile(s.PrivateKey)
-		if err != nil {
-			return err
-		}
-
-		// Parse the private key
-		privateKey, err := parsePrivateKey(privateKeyBytes)
-		if err != nil {
-			return err
-		}
-
-		if s.dsn, err = sf.DSN(&sf.Config{
-			Account:       s.Account,
-			User:          s.User,
-			Database:      s.Database,
-			Warehouse:     s.Warehouse,
-			Authenticator: sf.AuthTypeJwt,
-			PrivateKey:    privateKey,
-			Application:   r.UserAgent,
-		}); err != nil {
-			return err
-		}
+	clusterContext := &snowflakeClusterContext{}
+	if c.Context != nil {
+		if err := c.Context.Unmarshal(clusterContext); err != nil {
+			return err
+		}
 	}
 
 	// let's unmarshal job context
-	sc := &snowflakeJobContext{}
-	if err := j.Context.Unmarshal(sc); err != nil {
+	jobContext := &snowflakeJobContext{}
+	if err := j.Context.Unmarshal(jobContext); err != nil {
 		return err
 	}
 
+	// prepare snowflake config
+	privateKeyBytes, err := os.ReadFile(clusterContext.PrivateKey)
+	if err != nil {
+		return err
+	}
+
+	// Parse the private key
+	privateKey, err := parsePrivateKey(privateKeyBytes)
+	if err != nil {
+		return err
+	}
+
+	dsn, err := sf.DSN(&sf.Config{
+		Account:       clusterContext.Account,
+		User:          clusterContext.User,
+		Database:      clusterContext.Database,
+		Warehouse:     clusterContext.Warehouse,
+		Authenticator: sf.AuthTypeJwt,
+		PrivateKey:    privateKey,
+		Application:   r.UserAgent,
+	})
+	if err != nil {
+		return err
+	}
+
 	// open connection
-	db, err := sql.Open(snowflakeDriverName, s.dsn)
+	db, err := sql.Open(snowflakeDriverName, dsn)
 	if err != nil {
 		return err
 	}
 	defer db.Close()
 
-	rows, err := db.Query(sc.Query)
+	rows, err := db.Query(jobContext.Query)
 	if err != nil {
 		return err
 	}
117 changes: 117 additions & 0 deletions plugins/snowflake/README.md
@@ -0,0 +1,117 @@
# ❄️ Snowflake Plugin

The **Snowflake Plugin** enables Heimdall to execute **SQL queries** against a configured Snowflake instance using **key-pair authentication**. This allows seamless integration of Snowflake into your orchestration flows.

⚠️ **Note:** The plugin supports **a single SQL statement per job**.
Multi-statement execution is ***currently not supported***, but is planned for future updates.

---

## 🧩 Plugin Overview

* **Plugin Name:** `snowflake`
* **Execution Mode:** Async
* **Use Case:** Executing data queries or transformations on Snowflake

---

## ⚙️ Defining a Snowflake Command

You must define both a **command** and a **cluster**. The command represents the logical query job, while the cluster provides Snowflake connection details.

### 🔹 Command Configuration

```yaml
- name: snowflake-0.0.1
  status: active
  plugin: snowflake
  version: 0.0.1
  description: Query user metrics from Snowflake
  tags:
    - type:snowflake
  cluster_tags:
    - type:snowflake
```

### 🔸 Cluster Configuration

```yaml
- name: snowflake-prod-cluster
  status: active
  version: 0.0.1
  description: Production Snowflake cluster
  context:
    account: myorg-account-id
    user: my-snowflake-user
    database: MY_DB
    warehouse: MY_WAREHOUSE
    private_key: /etc/keys/snowflake-private-key.p8
  tags:
    - type:snowflake
    - data:prod
```

> The `private_key` field must point to a valid **PKCS#8** PEM-formatted file accessible from the execution environment.
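As a sketch, a compatible key pair can be generated with OpenSSL. This assumes an unencrypted private key is acceptable in your environment, and the file names are illustrative:

```shell
# Generate a 2048-bit RSA key and convert it to an unencrypted PKCS#8 PEM file
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -nocrypt -out snowflake-private-key.p8

# Derive the matching public key; its body is what you register on the
# Snowflake user via: ALTER USER my_snowflake_user SET RSA_PUBLIC_KEY='...';
openssl rsa -in snowflake-private-key.p8 -pubout -out snowflake-public-key.pub
```

Point the cluster's `private_key` field at the resulting `.p8` file.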

---

## 🚀 Submitting a Snowflake Job

Jobs must include a single SQL statement via the `context.query` field.

```json
{
  "name": "country-count-report",
  "version": "0.0.1",
  "command_criteria": ["type:snowflake"],
  "cluster_criteria": ["type:snowflake", "data:prod"],
  "context": {
    "query": "SELECT country, COUNT(*) AS users FROM user_data GROUP BY country"
  }
}
```

🔹 The plugin will execute this query using the Snowflake configuration defined in the matched cluster.

> Avoid semicolon-separated or batch queries—only **a single statement per job** is supported at this time.
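A hypothetical client-side guard (not part of Heimdall itself) can reject obvious batches before submitting a job. The check below is deliberately naive: it does not account for semicolons inside string literals or comments.

```go
package main

import (
	"fmt"
	"strings"
)

// validateSingleStatement rejects queries that appear to contain more than
// one SQL statement. A single trailing semicolon is tolerated and stripped.
func validateSingleStatement(query string) error {
	q := strings.TrimSuffix(strings.TrimSpace(query), ";")
	if strings.Contains(q, ";") {
		return fmt.Errorf("query appears to contain multiple SQL statements")
	}
	return nil
}

func main() {
	fmt.Println(validateSingleStatement("SELECT 1;"))           // <nil>
	fmt.Println(validateSingleStatement("SELECT 1; SELECT 2;")) // error
}
```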

---

## 📊 Returning Query Results

If your query returns data, Heimdall captures it as structured output accessible via:

```
GET /api/v1/job/<job_id>/result
```

✅ Example format:

```json
{
  "columns": [
    {"name": "country", "type": "string"},
    {"name": "users", "type": "int"}
  ],
  "data": [
    ["USA", 1500],
    ["Canada", 340]
  ]
}
```

---

## 🔐 Authentication & Security

* Auth is performed using Snowflake's **key-pair authentication**.
* Ensure private keys are stored securely and only readable by the runtime environment.

---

## 🧠 Best Practices

* Write simple, single-statement queries for now.
* Keep secrets (like private keys) out of job context—store them securely in the cluster configuration.
* Use cluster and command tags to enforce environment separation (e.g., `prod`, `dev`, etc.).
* Plan for upcoming multi-statement support by structuring your queries modularly where possible.