Skip to content

Commit

Permalink
feat(extractor-plugins): add crawler specifications (#478)
Browse files Browse the repository at this point in the history
* feat(mongodb-extractor): add exclude resource list

* chore(mongo): update check for default collections

* feat(couchdb): add exclude databases  list

* feat(mysql): add exclude resources list

* feat(mssql): add exclude resource list

* feat(mariadb): add exclude resources list

* feat(snowflake): add exclude resource list

* feat(gcs): add excluded buckets list

* feat(grafana): add exclude resources list

* feat(cassandra): add exclude resources list

* feat(clickhouse): add exclude resource list

* chore(snowflake): revert fields reading from API response
  • Loading branch information
Chief-Rishab authored Apr 19, 2023
1 parent 4c18210 commit dac373f
Show file tree
Hide file tree
Showing 20 changed files with 478 additions and 227 deletions.
39 changes: 22 additions & 17 deletions plugins/extractors/cassandra/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,39 @@ source:
password: 1234
host: localhost
port: 9042
exclude:
keyspaces: [mykeyspace]
tables: [mykeyspace_2.tableName_1]
```
## Inputs
| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :- |
| `user_id` | `string` | `admin` | User ID to access the cassandra server| *required* |
| `password` | `string` | `1234` | Password for the cassandra Server | *required* |
| `host` | `string` | `127.0.0.1` | The Host address at which server is running | *required* |
| `port` | `int` | `9042` | The Port number at which server is running | *required* |
| Key | Value | Example | Description | |
| :------------------ | :--------- | :---------------------- | :--------------------------------------------- | :--------- |
| `user_id` | `string` | `admin` | User ID to access the cassandra server | _required_ |
| `password` | `string` | `1234` | Password for the cassandra Server | _required_ |
| `host` | `string` | `127.0.0.1` | The Host address at which server is running | _required_ |
| `port` | `int` | `9042` | The Port number at which server is running | _required_ |
| `exclude.keyspaces` | `[]string` | `[keyspace1,keyspace2]` | List of keyspaces to be excluded from crawling | _optional_ |
| `exclude.tables` | `[]string` | `[keyspace3.table1]` | List of tables to be excluded from crawling | _optional_ |

## Outputs

| Field | Sample Value |
| :---- | :---- |
| `resource.urn` | `my_keyspace.my_table` |
| `resource.name` | `my_table` |
| `resource.service` | `cassandra` |
| `description` | `table description` |
| `profile.total_rows` | `2100` |
| `schema` | [][Column](#column) |
| Field | Sample Value |
| :------------------- | :--------------------- |
| `resource.urn` | `my_keyspace.my_table` |
| `resource.name` | `my_table` |
| `resource.service` | `cassandra` |
| `description` | `table description` |
| `profile.total_rows` | `2100` |
| `schema` | [][column](#column) |

### Column

| Field | Sample Value |
| :---- | :---- |
| Field | Sample Value |
| :----- | :------------ |
| `name` | `total_price` |
| `type` | `text` |
| `type` | `text` |

## Contributing

Expand Down
29 changes: 23 additions & 6 deletions plugins/extractors/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/gocql/gocql"
"github.com/odpf/meteor/models"
_ "github.com/odpf/meteor/models"
v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
"github.com/odpf/meteor/plugins/sqlutil"

Expand All @@ -37,10 +36,16 @@ const (

// Config holds the set of configuration for the cassandra extractor
type Config struct {
UserID string `json:"user_id" yaml:"user_id" mapstructure:"user_id" validate:"required"`
Password string `json:"password" yaml:"password" mapstructure:"password" validate:"required"`
Host string `json:"host" yaml:"host" mapstructure:"host" validate:"required"`
Port int `json:"port" yaml:"port" mapstructure:"port" validate:"required"`
UserID string `json:"user_id" yaml:"user_id" mapstructure:"user_id" validate:"required"`
Password string `json:"password" yaml:"password" mapstructure:"password" validate:"required"`
Host string `json:"host" yaml:"host" mapstructure:"host" validate:"required"`
Port int `json:"port" yaml:"port" mapstructure:"port" validate:"required"`
Exclude Exclude `json:"exclude" yaml:"exclude" mapstructure:"exclude"`
}

// Exclude lists the cassandra resources that the extractor skips while crawling.
type Exclude struct {
// Keyspaces holds keyspace names; Init adds them to the default excluded keyspace list.
Keyspaces []string `json:"keyspaces" yaml:"keyspaces" mapstructure:"keyspaces"`
// Tables holds table names to skip; entries are matched against "<keyspace>.<table>".
Tables []string `json:"tables" yaml:"tables" mapstructure:"tables"`
}

var sampleConfig = `
Expand All @@ -61,6 +66,7 @@ var info = plugins.Info{
type Extractor struct {
plugins.BaseExtractor
excludedKeyspaces map[string]bool
excludeTables map[string]bool
logger log.Logger
config Config
session *gocql.Session
Expand All @@ -84,7 +90,9 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) (err error)
}

// build excluded database list
e.excludedKeyspaces = sqlutil.BuildBoolMap(defaultKeyspaceList)
excludedKeyspacesList := append(defaultKeyspaceList, e.config.Exclude.Keyspaces...)
e.excludedKeyspaces = sqlutil.BuildBoolMap(excludedKeyspacesList)
e.excludeTables = sqlutil.BuildBoolMap(e.config.Exclude.Tables)

// connect to cassandra
cluster := gocql.NewCluster(e.config.Host)
Expand Down Expand Up @@ -144,6 +152,9 @@ func (e *Extractor) extractTables(keyspace string) (err error) {
if err = scanner.Scan(&tableName); err != nil {
return errors.Wrapf(err, "failed to iterate over %s", tableName)
}
if e.isExcludedTable(keyspace, tableName) {
continue
}
if err = e.processTable(keyspace, tableName); err != nil {
return errors.Wrap(err, "failed to process table")
}
Expand Down Expand Up @@ -212,6 +223,12 @@ func (e *Extractor) isExcludedKeyspace(keyspace string) bool {
return ok
}

// isExcludedTable reports whether the table, qualified as "<keyspace>.<table>",
// is present as a key in the extractor's excluded-tables map.
func (e *Extractor) isExcludedTable(keyspace, table string) bool {
	qualified := keyspace + "." + table
	_, excluded := e.excludeTables[qualified]
	return excluded
}

// init register the extractor to the catalog
func init() {
if err := registry.Extractors.Register("cassandra", func() plugins.Extractor {
Expand Down
37 changes: 21 additions & 16 deletions plugins/extractors/clickhouse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,38 @@
source:
name: clickhouse
config:
connection_url: tcp://localhost:3306?username=admin&password=pass123&debug=true
connection_url: clickhouse://username:password@clickhouse-server:9000
exclude:
databases: [database_a, database_b]
tables: [database_c.table_a]
```
## Inputs
| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :- |
| `connection_url` | `string` | `tcp://localhost:3306?username=admin&password=pass123&debug=true` | URL to access the clickhouse server | *required* |
| Key | Value | Example | Description | |
| :------------------ | :--------- | :---------------------------------------------------------------- | :---------------------------------- | :--------- |
| `connection_url` | `string` | `tcp://localhost:3306?username=admin&password=pass123&debug=true` | URL to access the clickhouse server | _required_ |
| `exclude.databases` | `[]string` | `[database_a, database_b]` | List of databases to be excluded | _optional_ |
| `exclude.tables` | `[]string` | `[database_c.table_a, database_c.table_b]` | List of tables to be excluded | _optional_ |

## Outputs

| Field | Sample Value |
| :---- | :---- |
| `resource.urn` | `my_database.my_table` |
| `resource.name` | `my_table` |
| `resource.service` | `clickhouse` |
| `description` | `table description` |
| `profile.total_rows` | `2100` |
| `schema` | [][Column](#column) |
| Field | Sample Value |
| :------------------- | :--------------------- |
| `resource.urn` | `my_database.my_table` |
| `resource.name` | `my_table` |
| `resource.service` | `clickhouse` |
| `description` | `table description` |
| `profile.total_rows` | `2100` |
| `schema` | [][column](#column) |

### Column

| Field | Sample Value |
| :---- | :---- |
| `name` | `total_price` |
| Field | Sample Value |
| :------------ | :------------------- |
| `name` | `total_price` |
| `description` | `item's total price` |
| `data_type` | `String` |
| `data_type` | `String` |

## Contributing

Expand Down
31 changes: 26 additions & 5 deletions plugins/extractors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/odpf/meteor/models"
v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/plugins/sqlutil"
"github.com/odpf/meteor/registry"
"github.com/odpf/salt/log"
)
Expand All @@ -22,10 +23,19 @@ var summary string

// Config holds the connection URL for the extractor
type Config struct {
ConnectionURL string `json:"connection_url" yaml:"connection_url" mapstructure:"connection_url" validate:"required"`
ConnectionURL string `json:"connection_url" yaml:"connection_url" mapstructure:"connection_url" validate:"required"`
Exclude Exclude `json:"exclude" yaml:"exclude" mapstructure:"exclude"`
}

var sampleConfig = `connection_url: "tcp://localhost:3306?username=admin&password=pass123&debug=true"`
// Exclude lists the clickhouse resources that the extractor skips while crawling.
type Exclude struct {
// Databases holds database names; every table in a listed database is skipped.
Databases []string `json:"databases" yaml:"databases" mapstructure:"databases"`
// Tables holds table names to skip; entries are matched against "<database>.<table>".
Tables []string `json:"tables" yaml:"tables" mapstructure:"tables"`
}

var sampleConfig = `connection_url: "tcp://localhost:3306?username=admin&password=pass123&debug=true"
exclude:
databases: [database_a, database_b]
tables: [dataset_c.table_a]`

var info = plugins.Info{
Description: "Column-oriented DBMS for online analytical processing.",
Expand All @@ -38,9 +48,11 @@ var info = plugins.Info{
// and logger interface for the extractor
type Extractor struct {
plugins.BaseExtractor
config Config
logger log.Logger
db *sql.DB
config Config
logger log.Logger
excludedDBs map[string]bool
excludedTbl map[string]bool
db *sql.DB
}

// New returns a pointer to an initialized Extractor Object
Expand All @@ -59,6 +71,10 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) (err error)
return err
}

// initialize excluded databases and tables
e.excludedDBs = sqlutil.BuildBoolMap(e.config.Exclude.Databases)
e.excludedTbl = sqlutil.BuildBoolMap(e.config.Exclude.Tables)

if e.db, err = sql.Open("clickhouse", e.config.ConnectionURL); err != nil {
return errors.Wrap(err, "failed to create a client")
}
Expand Down Expand Up @@ -91,6 +107,11 @@ func (e *Extractor) extractTables(emit plugins.Emit) (err error) {
return
}

// skip excluded databases and tables
if e.excludedDBs[dbName] || e.excludedTbl[fmt.Sprintf("%s.%s", dbName, tableName)] {
continue
}

var columns []*v1beta2.Column
columns, err = e.getColumnsInfo(dbName, tableName)
if err != nil {
Expand Down
30 changes: 16 additions & 14 deletions plugins/extractors/couchdb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,33 @@ source:
name: couchdb
config:
connection_url: http://admin:pass123@localhost:3306/
exclude: database_a,database_b
```
## Inputs
| Key | Value | Example | Description | |
| :-- | :---- | :------ | :---------- | :- |
| `connection_url` | `string` | `http://admin:pass123@localhost:3306/` | URL to access the couchdb server | *required* |
| Key | Value | Example | Description | |
| :--------------- | :------- | :------------------------------------- | :--------------------------------------------------------- | :--------- |
| `connection_url` | `string` | `http://admin:pass123@localhost:3306/` | URL to access the couchdb server | _required_ |
| `exclude` | `string` | `primaryDB,secondaryDB` | Comma separated database list to be excluded from crawling | _optional_ |

## Outputs

| Field | Sample Value |
| :---- | :---- |
| `resource.urn` | `database_name.docID` |
| `resource.name` | `docID` |
| `resource.service` | `couchdb` |
| `schema` | [][Column](#column) |
| Field | Sample Value |
| :----------------- | :-------------------- |
| `resource.urn` | `database_name.docID` |
| `resource.name` | `docID` |
| `resource.service` | `couchdb` |
| `schema` | [][column](#column) |

### Column

| Field | Sample Value |
| :---- | :---- |
| `name` | `field1` |
| Field | Sample Value |
| :------------ | :------------------------- |
| `name` | `field1` |
| `description` | `rev for revision history` |
| `data_type` | `float64` |
| `length` | `` |
| `data_type` | `float64` |
| `length` | `` |

## Contributing

Expand Down
26 changes: 11 additions & 15 deletions plugins/extractors/couchdb/couchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import (
_ "embed" // used to print the embedded assets
"fmt"
"reflect"
"strings"

_ "github.com/go-kivik/couchdb"
"github.com/go-kivik/kivik"
"github.com/odpf/meteor/models"
v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/plugins/sqlutil"
"github.com/odpf/meteor/registry"
"github.com/odpf/salt/log"
"google.golang.org/protobuf/types/known/anypb"
Expand All @@ -28,9 +30,12 @@ var defaultDBList = []string{
// Config holds the connection URL and the database exclusion list for the extractor
type Config struct {
ConnectionURL string `json:"connection_url" yaml:"connection_url" mapstructure:"connection_url" validate:"required"`
// Exclude is a comma-separated list of database names to skip. Init splits it
// on "," and passes the pieces, untrimmed, to sqlutil.BuildBoolMap together
// with the default database list.
Exclude string `json:"exclude" yaml:"exclude" mapstructure:"exclude"`
}

var sampleConfig = `connection_url: http://admin:pass123@localhost:3306/`
var sampleConfig = `
connection_url: http://admin:pass123@localhost:3306/
exclude: database_a,database_b`

var info = plugins.Info{
Description: "Table metadata from CouchDB server,",
Expand Down Expand Up @@ -67,7 +72,8 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) (err error)
}

// build excluded database list
e.buildExcludedDBs()
excludedList := append(defaultDBList, strings.Split(e.config.Exclude, ",")...)
e.excludedDbs = sqlutil.BuildBoolMap(excludedList)

// create client
e.client, err = kivik.New("couch", e.config.ConnectionURL)
Expand All @@ -90,6 +96,9 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)
}

for _, dbName := range res {
if e.isExcludedDB(dbName) {
continue
}
if err := e.extractTables(ctx, dbName); err != nil {
return err
}
Expand All @@ -99,10 +108,6 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)

// Extract tables from a given database
func (e *Extractor) extractTables(ctx context.Context, dbName string) (err error) {
// skip if database is default
if e.isExcludedDB(dbName) {
return
}
e.db = e.client.DB(ctx, dbName)

// extract documents
Expand Down Expand Up @@ -176,15 +181,6 @@ func (e *Extractor) extractColumns(ctx context.Context, docID string) (columns [
return
}

// buildExcludedDBs fills e.excludedDbs with one true entry per name in
// defaultDBList; isExcludedDB consults this map.
func (e *Extractor) buildExcludedDBs() {
	skip := make(map[string]bool)
	for i := range defaultDBList {
		skip[defaultDBList[i]] = true
	}
	e.excludedDbs = skip
}

func (e *Extractor) isExcludedDB(database string) bool {
_, ok := e.excludedDbs[database]
return ok
Expand Down
Loading

0 comments on commit dac373f

Please sign in to comment.