Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions internal/pkg/object/command/postgres/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package postgres

import (
"fmt"
"strings"
"sync"

"github.com/patterninc/heimdall/internal/pkg/database"
pkgcontext "github.com/patterninc/heimdall/pkg/context"
"github.com/patterninc/heimdall/pkg/object/cluster"
"github.com/patterninc/heimdall/pkg/object/job"
"github.com/patterninc/heimdall/pkg/plugin"
"github.com/patterninc/heimdall/pkg/result"
"github.com/patterninc/heimdall/pkg/result/column"
)

// postgresJobContext represents the context for a PostgreSQL job
type postgresJobContext struct {
Query string `yaml:"query,omitempty" json:"query,omitempty"`
ReturnResult bool `yaml:"return_result,omitempty" json:"return_result,omitempty"`
}

type postgresClusterContext struct {
ConnectionString string `yaml:"connection_string,omitempty" json:"connection_string,omitempty"`
}

type postgresCommandContext struct {
mu sync.Mutex
}

// New creates a new PostgreSQL plugin handler.
func New(_ *pkgcontext.Context) (plugin.Handler, error) {
p := &postgresCommandContext{}
return p.handler, nil
}

// Handler for the PostgreSQL query execution.
func (p *postgresCommandContext) handler(r *plugin.Runtime, j *job.Job, c *cluster.Cluster) error {
jobContext := &postgresJobContext{}
if j.Context != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe let's move it to 2 functions

  • create and validate job context
  • Create an validate cluster context

In that case method vill be a little bit clearer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is done added separate functions for job and cluster context validation

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also added the test results from local to PR description

if err := j.Context.Unmarshal(jobContext); err != nil {
return fmt.Errorf("failed to unmarshal job context: %w", err)
}
}
if jobContext.Query == "" {
return fmt.Errorf("query is required in job context")
}

clusterContext := &postgresClusterContext{}
if c.Context != nil {
if err := c.Context.Unmarshal(clusterContext); err != nil {
return fmt.Errorf("failed to unmarshal cluster context: %w", err)
}
}
if clusterContext.ConnectionString == "" {
return fmt.Errorf("connection_string is required in cluster context")
}

db := &database.Database{ConnectionString: clusterContext.ConnectionString}

if jobContext.ReturnResult {
return executeSyncQuery(db, jobContext.Query, j)
}
return p.executeAsyncQueries(db, jobContext.Query, j)
}

func executeSyncQuery(db *database.Database, query string, j *job.Job) error {
// Allow a single query, even if it ends with a semicolon
queries := splitAndTrimQueries(query)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does postres not allow to call query with semicolon at the end?

Copy link
Contributor Author

@sanketjadhavSF sanketjadhavSF Jul 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this splitAndTrimQueries() function is used to parse the query param and find if there are multiple queries based on ; separator and route the query execution to different function executeSyncQuery() or executeAsyncQueries()
executeSyncQuery - this function is executed when return_result is set to true and only single query is executed.
executeAsyncQueries - this is used to run .sql scripts with multiple sql statements. this will return only errors if any and not results are returned.

if len(queries) != 1 {
return fmt.Errorf("multiple queries are not allowed when return_result is true")
}

sess, err := db.NewSession(false)
if err != nil {
return fmt.Errorf("failed to open PostgreSQL connection: %w", err)
}
defer sess.Close()

rows, err := sess.Query(queries[0])
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Executing raw SQL queries without prepared statements or input validation could expose the system to SQL injection attacks. Consider using parameterized queries or implementing input validation for the query parameter.

Copilot uses AI. Check for mistakes.
if err != nil {
return fmt.Errorf("PostgreSQL query execution failed: %w", err)
}
defer rows.Close()

rowsResult, err := result.FromRows(rows)
if err != nil {
return fmt.Errorf("failed to process PostgreSQL query results: %w", err)
}

j.Result = rowsResult
return nil
}

func (p *postgresCommandContext) executeAsyncQueries(db *database.Database, query string, j *job.Job) error {
sess, err := db.NewSession(false)
if err != nil {
j.Result = &result.Result{
Columns: []*column.Column{{
Name: "error",
Type: column.Type("string"),
}},
Data: [][]any{{fmt.Sprintf("Async PostgreSQL connection error: %v", err)}},
}
return fmt.Errorf("Async PostgreSQL connection error: %v", err)
}
defer sess.Close()

_, err = sess.Exec(query)
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Executing raw SQL queries without prepared statements or input validation could expose the system to SQL injection attacks. Consider using parameterized queries or implementing input validation for the query parameter.

Copilot uses AI. Check for mistakes.
if err != nil {
j.Result = &result.Result{
Columns: []*column.Column{{
Name: "error",
Type: column.Type("string"),
}},
Data: [][]any{{fmt.Sprintf("Async PostgreSQL query execution error: %v", err)}},
}
Comment on lines +111 to +133
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error result construction is duplicated in both error handling blocks (lines 114-120 and 127-133). Consider extracting this into a helper function to reduce code duplication and improve maintainability.

Suggested change
func (p *postgresCommandContext) executeAsyncQueries(db *database.Database, query string, j *job.Job) error {
sess, err := db.NewSession(false)
if err != nil {
j.Result = &result.Result{
Columns: []*column.Column{{
Name: "error",
Type: column.Type("string"),
}},
Data: [][]any{{fmt.Sprintf("Async PostgreSQL connection error: %v", err)}},
}
return fmt.Errorf("Async PostgreSQL connection error: %v", err)
}
defer sess.Close()
_, err = sess.Exec(query)
if err != nil {
j.Result = &result.Result{
Columns: []*column.Column{{
Name: "error",
Type: column.Type("string"),
}},
Data: [][]any{{fmt.Sprintf("Async PostgreSQL query execution error: %v", err)}},
}
// errorResult constructs a result.Result containing a single error message.
func errorResult(msg string) *result.Result {
return &result.Result{
Columns: []*column.Column{{
Name: "error",
Type: column.Type("string"),
}},
Data: [][]any{{msg}},
}
}
func (p *postgresCommandContext) executeAsyncQueries(db *database.Database, query string, j *job.Job) error {
sess, err := db.NewSession(false)
if err != nil {
j.Result = errorResult(fmt.Sprintf("Async PostgreSQL connection error: %v", err))
return fmt.Errorf("Async PostgreSQL connection error: %v", err)
}
defer sess.Close()
_, err = sess.Exec(query)
if err != nil {
j.Result = errorResult(fmt.Sprintf("Async PostgreSQL query execution error: %v", err))

Copilot uses AI. Check for mistakes.
return fmt.Errorf("Async PostgreSQL query execution error: %v", err)
}

j.Result = &result.Result{
Columns: []*column.Column{{
Name: "message",
Type: column.Type("string"),
}},
Data: [][]any{{"All queries executed successfully"}},
}
return nil
}

func splitAndTrimQueries(query string) []string {
queries := []string{}
for _, q := range strings.Split(query, ";") {
Copy link

Copilot AI Aug 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query splitting logic using semicolon delimiter is naive and could incorrectly split queries that contain semicolons within string literals, comments, or function definitions. This could break valid SQL statements or create security vulnerabilities.

Copilot uses AI. Check for mistakes.
q = strings.TrimSpace(q)
if q != "" {
queries = append(queries, q)
}
}
return queries
}
73 changes: 73 additions & 0 deletions plugins/postgres/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# PostgreSQL Plugin

This plugin provides an interface to PostgreSQL databases with support for direct SQL queries, SQL files, and batch execution.

## Features

- Execute single or multiple SQL statements
- Support for both direct query strings and SQL file execution
- Synchronous (return_result: true) and asynchronous (return_result: false) execution modes
- Error reporting with line number and query for batch execution
- Transaction support for batch execution

## Configuration

### Cluster Context

```yaml
connection_string: postgresql://user:password@host:port/database # Required
```

### Job Context

```yaml
query: SELECT * FROM my_table # Required - SQL query to execute or path to .sql file
return_result: true # Optional - Whether to return query results (default: false)
```

#### Execution Modes

1. **Return Results Mode** (`return_result: true`):
- Only a single query is allowed (trailing semicolon is fine)
- Returns query results as structured data
- Fails if multiple queries are provided

2. **Execute Only Mode** (`return_result: false`):
- Multiple queries separated by `;` are allowed
- Executes all queries in order, stops and fails on first error
- Returns error message with line number and query if any query fails
- Returns a success message if all queries succeed

## Usage

```yaml
# Example 1: SELECT query with results
- name: postgresql-select
description: Execute a SELECT query and return results
command: postgres
cluster: postgres-cluster
context:
query: SELECT * FROM my_table WHERE status = 'active'
return_result: true

# Example 2: Execute an UPDATE statement
- name: postgresql-update
description: Execute an UPDATE statement
command: postgres
cluster: postgres-cluster
context:
query: UPDATE my_table SET status = 'inactive' WHERE last_active < '2025-01-01'
return_result: false


# Example 3: Execute multiple statements in a batch
- name: postgresql-batch
description: Execute multiple SQL statements in a batch
command: postgres
cluster: postgres-cluster
context:
query: |
INSERT INTO orders (id, customer_id, amount) VALUES ('ORD001', 'CUST123', 299.99);
UPDATE inventory SET stock = stock - 1 WHERE product_id = 'PROD456';
return_result: false
```
12 changes: 12 additions & 0 deletions plugins/postgres/postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package main

import (
"github.com/patterninc/heimdall/internal/pkg/object/command/postgres"
"github.com/patterninc/heimdall/pkg/context"
"github.com/patterninc/heimdall/pkg/plugin"
)

// New creates a new instance of the postgres plugin.
func New(c *context.Context) (plugin.Handler, error) {
return postgres.New(c)
}