diff --git a/README.md b/README.md index b4ff925..dcfb17c 100644 --- a/README.md +++ b/README.md @@ -94,7 +94,7 @@ Heimdall supports a growing set of pluggable command types: | `shell` | [Shell command execution](https://github.com/patterninc/heimdall/blob/main/plugins/shell/README.md) | Sync or Async | | `glue` | [Pulling Iceberg table metadata](https://github.com/patterninc/heimdall/blob/main/plugins/glue/README.md) | Sync or Async | | `dynamo` | [DynamoDB read operation](https://github.com/patterninc/heimdall/blob/main/plugins/dynamo/README.md) | Sync or Async | -| `snowflake` | Query execution in Snowflake | Async | +| `snowflake` | [Query execution in Snowflake](https://github.com/patterninc/heimdall/blob/main/plugins/snowflake/README.md) | Async | | `spark` | [SparkSQL query execution on EMR on EKS](https://github.com/patterninc/heimdall/blob/main/plugins/spark/README.md) | Async | --- diff --git a/internal/pkg/object/command/snowflake/snowflake.go b/internal/pkg/object/command/snowflake/snowflake.go index f577366..b5c8d64 100644 --- a/internal/pkg/object/command/snowflake/snowflake.go +++ b/internal/pkg/object/command/snowflake/snowflake.go @@ -27,17 +27,18 @@ var ( ErrInvalidKeyType = fmt.Errorf(`invalida key type`) ) -type snowflakeCommandContext struct { +type snowflakeCommandContext struct{} + +type snowflakeJobContext struct { + Query string `yaml:"query,omitempty" json:"query,omitempty"` +} + +type snowflakeClusterContext struct { Account string `yaml:"account,omitempty" json:"account,omitempty"` User string `yaml:"user,omitempty" json:"user,omitempty"` Database string `yaml:"database,omitempty" json:"database,omitempty"` Warehouse string `yaml:"warehouse,omitempty" json:"warehouse,omitempty"` PrivateKey string `yaml:"private_key,omitempty" json:"private_key,omitempty"` - dsn string -} - -type snowflakeJobContext struct { - Query string `yaml:"query,omitempty" json:"query,omitempty"` } func parsePrivateKey(privateKeyBytes []byte) (*rsa.PrivateKey, 
error) { @@ -71,53 +72,52 @@ func New(_ *context.Context) (plugin.Handler, error) { func (s *snowflakeCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster.Cluster) error { - // do we have dsn defined? - if s.dsn == `` { - if c.Context != nil { - if err := c.Context.Unmarshal(s); err != nil { - return err - } - } - - // prepare snowflake config - privateKeyBytes, err := os.ReadFile(s.PrivateKey) - if err != nil { + clusterContext := &snowflakeClusterContext{} + if c.Context != nil { + if err := c.Context.Unmarshal(clusterContext); err != nil { return err } + } - // Parse the private key - privateKey, err := parsePrivateKey(privateKeyBytes) - if err != nil { - return err - } + // let's unmarshal job context + jobContext := &snowflakeJobContext{} + if err := j.Context.Unmarshal(jobContext); err != nil { + return err + } - if s.dsn, err = sf.DSN(&sf.Config{ - Account: s.Account, - User: s.User, - Database: s.Database, - Warehouse: s.Warehouse, - Authenticator: sf.AuthTypeJwt, - PrivateKey: privateKey, - Application: r.UserAgent, - }); err != nil { - return err - } + // prepare snowflake config + privateKeyBytes, err := os.ReadFile(clusterContext.PrivateKey) + if err != nil { + return err } - // let's unmarshal job context - sc := &snowflakeJobContext{} - if err := j.Context.Unmarshal(sc); err != nil { + // Parse the private key + privateKey, err := parsePrivateKey(privateKeyBytes) + if err != nil { + return err + } + + dsn, err := sf.DSN(&sf.Config{ + Account: clusterContext.Account, + User: clusterContext.User, + Database: clusterContext.Database, + Warehouse: clusterContext.Warehouse, + Authenticator: sf.AuthTypeJwt, + PrivateKey: privateKey, + Application: r.UserAgent, + }) + if err != nil { return err } // open connection - db, err := sql.Open(snowflakeDriverName, s.dsn) + db, err := sql.Open(snowflakeDriverName, dsn) if err != nil { return err } defer db.Close() - rows, err := db.Query(sc.Query) + rows, err := db.Query(jobContext.Query) if err != nil 
{ return err } diff --git a/plugins/snowflake/README.md b/plugins/snowflake/README.md new file mode 100644 index 0000000..0ccf9c6 --- /dev/null +++ b/plugins/snowflake/README.md @@ -0,0 +1,117 @@ +# ❄️ Snowflake Plugin + +The **Snowflake Plugin** enables Heimdall to execute **SQL queries** against a configured Snowflake instance using **key-pair authentication**. This allows seamless integration of Snowflake into your orchestration flows. + +⚠️ **Note:** The plugin supports **a single SQL statement per job**. +Multi-statement execution is ***currently not supported***, but is planned for future updates. + +--- + +## 🧩 Plugin Overview + +* **Plugin Name:** `snowflake` +* **Execution Mode:** Async +* **Use Case:** Executing data queries or transformations on Snowflake + +--- + +## ⚙️ Defining a Snowflake Command + +You must define both a **command** and a **cluster**. The command represents the logical query job, while the cluster provides Snowflake connection details. + +### 🔹 Command Configuration + +```yaml +- name: snowflake-0.0.1 + status: active + plugin: snowflake + version: 0.0.1 + description: Query user metrics from Snowflake + tags: + - type:snowflake + cluster_tags: + - type:snowflake +``` + +### 🔸 Cluster Configuration + +```yaml +- name: snowflake-prod-cluster + status: active + version: 0.0.1 + description: Production Snowflake cluster + context: + account: myorg-account-id + user: my-snowflake-user + database: MY_DB + warehouse: MY_WAREHOUSE + private_key: /etc/keys/snowflake-private-key.p8 + tags: + - type:snowflake + - data:prod +``` + +> The `private_key` field must point to a valid **PKCS#8** PEM-formatted file accessible from the execution environment. + +--- + +## 🚀 Submitting a Snowflake Job + +Jobs must include a single SQL statement via the `context.query` field. 
+ +```json +{ + "name": "country-count-report", + "version": "0.0.1", + "command_criteria": ["type:snowflake"], + "cluster_criteria": ["type:snowflake", "data:prod"], + "context": { + "query": "SELECT country, COUNT(*) AS users FROM user_data GROUP BY country" + } +} +``` + +🔹 The plugin will execute this query using the Snowflake configuration defined in the matched cluster. + +> Avoid semicolon-separated or batch queries—only **a single statement per job** is supported at this time. + +--- + +## 📊 Returning Query Results + +If your query returns data, Heimdall captures it as structured output accessible via: + +``` +GET /api/v1/job/{job_id}/result +``` + +✅ Example format: + +```json +{ + "columns": [ + {"name": "country", "type": "string"}, + {"name": "users", "type": "int"} + ], + "data": [ + ["USA", 1500], + ["Canada", 340] + ] +} +``` + +--- + +## 🔐 Authentication & Security + +* Auth is performed using Snowflake's **key-pair authentication**. +* Ensure private keys are stored securely and only readable by the runtime environment. + +--- + +## 🧠 Best Practices + +* Write simple, single-statement queries for now. +* Keep secrets (like private keys) out of job context—store them securely in the cluster configuration. +* Use cluster and command tags to enforce environment separation (e.g., `prod`, `dev`, etc.). +* Plan for upcoming multi-statement support by structuring your queries modularly where possible.