Add parameter to drop fields #70

Open: wants to merge 15 commits into base: master
96 changes: 91 additions & 5 deletions README.md
@@ -1,6 +1,7 @@
# tap-mongodb

This is a [Singer](https://singer.io) tap that produces JSON-formatted data following the [Singer spec](https://github.com/singer-io/getting-started/blob/master/SPEC.md) from a MongoDB source.
This is a [Singer](https://singer.io) tap that produces JSON-formatted data following the [Singer spec](https://github.com/singer-io/getting-started/blob/master/SPEC.md) from a MongoDB source.
This plugin extends the original [implementation](https://github.com/singer-io/tap-mongodb) by introducing support for **srv** mode.

## Set up Virtual Environment
```
@@ -20,17 +21,17 @@ Create json file called `config.json`, with the following contents:
{
"password": "<password>",
"user": "<username>",
"host": "<host ip address>",
"port": "<port>",
"database": "<database name>"
"host": "<host ip address>"
}
```
The following parameters are optional for your config file:

| Name | Type | Description |
| -----|------|------------ |
| `replica_set` | string | name of replica set |
| `port` | integer | port number (default is 27017) |
| `database` | string | name of the database |
|`ssl` | Boolean | can be set to true to connect using ssl |
|`srv` | Boolean | use DNS Seed List connection string - [link](https://docs.mongodb.com/manual/reference/connection-string/) |
| `include_schema_in_destination_stream_name` | Boolean | forces the stream names to take the form `<database_name>_<collection_name>` instead of `<collection_name>`|

All of the above attributes are required by the tap to connect to your mongo instance.
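The table lists `ssl` and `srv` as Boolean, while the tap code in this pull request compares the configured values against the string `'true'`. The following is a minimal sketch of a helper that accepts both forms. The name `as_bool` is hypothetical and is not part of the tap.

```python
def as_bool(value, default=False):
    """Interpret a config value as a boolean.

    Accepts real booleans (as parsed from JSON) and strings such as 'true'.
    Hypothetical helper for illustration; the tap itself does not define it.
    """
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() == 'true'


# Example config using a mix of JSON booleans and strings.
config = {'ssl': 'true', 'srv': True}
use_ssl = as_bool(config.get('ssl'))
use_srv = as_bool(config.get('srv'))
verify_mode = as_bool(config.get('verify_mode'), default=True)
```

With a helper like this, `"srv": true` and `"srv": "true"` in `config.json` would be treated the same way.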
@@ -133,6 +134,91 @@ The tap will write bookmarks to stdout which can be captured and passed as an op
### Local MongoDB Setup
If you haven't yet set up a local mongodb client, follow [these instructions](https://github.com/singer-io/tap-mongodb/blob/master/spikes/local_mongo_setup.md)

## Use as a Meltano plugin
To use this tap in Meltano, extend the `meltano.yml` configuration file with a **custom plugin** definition as follows:

```
plugins:
extractors:
- name: tap-mongodb
label: MongoDB
description: General purpose, document-based, distributed database
namespace: tap_mongodb
variants:
- name: strv
repo: https://github.com/strvcom/tap-mongodb
pip_url: git+https://github.com/strvcom/tap-mongodb.git
executable: tap-mongodb
capabilities:
- catalog
- discover
- state
settings_group_validation:
- [ 'host', 'user', 'password']
settings:
- name: host
label: Host URL
value: localhost
- name: port
kind: integer
value: 27017
- name: user
- name: password
kind: password
- name: database
label: Database Name
- name: replica_set
- name: ssl
kind: boolean
value: false
value_post_processor: stringify
label: SSL
- name: verify_mode
kind: boolean
value: true
value_post_processor: stringify
description: SSL Verify Mode
- name: srv
kind: boolean
value: true
description: Use DNS Seed List connection string
- name: include_schemas_in_destination_stream_name
kind: boolean
value: false
description: Forces the stream names to take the form `<database_name>_<collection_name>` instead of `<collection_name>`
```

To configure the plugin, there is an [official guide](https://hub.meltano.com/extractors/mongodb.html) to follow.
The official guide and plugin do not support **srv** mode. To run this plugin in srv mode, extend the configuration file with the following:

```
plugins:
extractors:
- name: tap-mongodb
...
config:
host: <host ip address>
user: <username>
srv: true
```

The passwords should be set as described in the original guide.

**Note**: An extractor configuration needs to be defined for this plugin to work. The configuration is described in the [official guide](https://meltano.com/docs/getting-started.html#add-an-extractor-to-pull-data-from-a-source).
A simple example of such additional configuration is the following extension of the configuration file:

```
plugins:
extractors:
- name: tap-mongodb
...
config:
...
metadata:
'*':
replication-method: FULL_TABLE
```

---

Copyright &copy; 2019 Stitch
47 changes: 24 additions & 23 deletions setup.py
@@ -2,30 +2,31 @@

from setuptools import setup

setup(name='tap-mongodb',
version='2.0.1',
description='Singer.io tap for extracting data from MongoDB',
author='Stitch',
url='https://singer.io',
classifiers=['Programming Language :: Python :: 3 :: Only'],
py_modules=['tap_mongodb'],
install_requires=[
'singer-python==5.8.0',
'pymongo==3.8.0',
'tzlocal==2.0.0',
'terminaltables==3.1.0',
],
extras_require={
'dev': [
'pylint',
'nose',
'ipdb'
]
},
entry_points='''
setup(
name='tap-mongodb',
version='2.0.1',
description='Singer.io tap for extracting data from MongoDB',
author='Stitch',
url='https://singer.io',
classifiers=['Programming Language :: Python :: 3 :: Only'],
py_modules=['tap_mongodb'],
install_requires=[
'singer-python==5.8.0',
'pymongo==3.12.2',
'tzlocal==2.0.0',
'terminaltables==3.1.0',
'dnspython==2.1.0' # to support srv
],
extras_require={
'dev': [
'pylint',
'nose',
'ipdb'
]
},
entry_points='''
[console_scripts]
tap-mongodb=tap_mongodb:main
''',
packages=['tap_mongodb', 'tap_mongodb.sync_strategies'],

packages=['tap_mongodb', 'tap_mongodb.sync_strategies'],
)
96 changes: 57 additions & 39 deletions tap_mongodb/__init__.py
@@ -3,27 +3,22 @@
import json
import ssl
import sys
import time
import pymongo
from bson import timestamp

import pymongo
import singer
from singer import metadata, metrics, utils

import tap_mongodb.sync_strategies.common as common
import tap_mongodb.sync_strategies.full_table as full_table
import tap_mongodb.sync_strategies.oplog as oplog
import tap_mongodb.sync_strategies.incremental as incremental

import tap_mongodb.sync_strategies.oplog as oplog

LOGGER = singer.get_logger()

REQUIRED_CONFIG_KEYS = [
'host',
'port',
'user',
'password',
'database'
'password'
]

IGNORE_DBS = ['system', 'local', 'config']
@@ -103,16 +98,22 @@ def get_roles(client, config):
roles.append(sub_role)
return roles

def get_databases(client, config):
roles = get_roles(client, config)
LOGGER.info('Roles: %s', roles)

can_read_all = len([r for r in roles if r['role'] in ROLES_WITH_ALL_DB_FIND_PRIVILEGES]) > 0

if can_read_all:
def get_databases(client, config):
if 'database' not in config:
# handle the case when no database is provided - read all databases
LOGGER.info('No Roles loaded')
db_names = [d for d in client.list_database_names() if d not in IGNORE_DBS]
else:
db_names = [r['db'] for r in roles if r['db'] not in IGNORE_DBS]
# take roles into consideration
roles = get_roles(client, config)
LOGGER.info('Roles: %s', roles)
can_read_all = any([r['role'] in ROLES_WITH_ALL_DB_FIND_PRIVILEGES for r in roles])
if can_read_all:
db_names = [d for d in client.list_database_names() if d not in IGNORE_DBS]
else:
db_names = [r['db'] for r in roles if r['db'] not in IGNORE_DBS]

db_names = list(set(db_names)) # Make sure each db is only in the list once
LOGGER.info('Databases: %s', db_names)
return db_names
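The branching in the new `get_databases` can be sketched as a pure function that takes the database names and roles as plain data, which makes it easy to check without a MongoDB connection. This is a sketch under assumptions: `select_databases` and its parameters are hypothetical names, the set of "read all" roles is passed in because `ROLES_WITH_ALL_DB_FIND_PRIVILEGES` is not shown in this diff, and the result is sorted for determinism whereas the tap returns `list(set(...))`.

```python
IGNORE_DBS = ['system', 'local', 'config']


def select_databases(all_db_names, roles, has_database_key, read_all_roles):
    """Pick the databases to discover, mirroring the branching in get_databases.

    all_db_names     -- names as returned by client.list_database_names()
    roles            -- list of {'role': ..., 'db': ...} dicts
    has_database_key -- whether 'database' is present in the config
    read_all_roles   -- role names that allow reading every database (assumed input)
    """
    if not has_database_key:
        # No database configured: read every database that is not ignored.
        names = [d for d in all_db_names if d not in IGNORE_DBS]
    elif any(r['role'] in read_all_roles for r in roles):
        # A role with global read access also sees every database.
        names = [d for d in all_db_names if d not in IGNORE_DBS]
    else:
        # Otherwise only the databases the roles are scoped to.
        names = [r['db'] for r in roles if r['db'] not in IGNORE_DBS]
    # De-duplicate; sorted here only to make the result deterministic.
    return sorted(set(names))


example = select_databases(
    ['admin', 'local', 'shop'],
    [{'role': 'read', 'db': 'shop'}, {'role': 'read', 'db': 'shop'}],
    has_database_key=True,
    read_all_roles={'readAnyDatabase'},
)
```

In this example only `shop` is selected, because neither role grants global read access and duplicates are collapsed.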
@@ -179,7 +180,7 @@ def do_discover(client, config):
db_name, collection_name)
streams.append(produce_collection_schema(collection))

json.dump({'streams' : streams}, sys.stdout, indent=2)
json.dump({'streams': streams}, sys.stdout, indent=2)


def is_stream_selected(stream):
@@ -191,7 +192,6 @@ def is_stream_selected(stream):


def get_streams_to_sync(streams, state):

# get selected streams
selected_streams = [s for s in streams if is_stream_selected(s)]
# prioritize streams that have not been processed
@@ -214,7 +214,7 @@ def get_streams_to_sync(streams, state):
lambda s: s['tap_stream_id'] == currently_syncing,
ordered_streams))
non_currently_syncing_streams = list(filter(lambda s: s['tap_stream_id']
!= currently_syncing,
!= currently_syncing,
ordered_streams))

streams_to_sync = currently_syncing_stream + non_currently_syncing_streams
@@ -230,6 +230,7 @@ def write_schema_message(stream):
schema=stream['schema'],
key_properties=['_id']))


def load_stream_projection(stream):
md_map = metadata.to_map(stream['metadata'])
stream_projection = metadata.get(md_map, (), 'tap-mongodb.projection')
@@ -246,10 +247,11 @@ def load_stream_projection(stream):
if stream_projection and stream_projection.get('_id') == 0:
raise common.InvalidProjectionException(
"Projection blacklists key property id for collection {}" \
.format(stream['tap_stream_id']))
.format(stream['tap_stream_id']))

return stream_projection


def clear_state_on_replication_change(stream, state):
md_map = metadata.to_map(stream['metadata'])
tap_stream_id = stream['tap_stream_id']
@@ -276,15 +278,15 @@ def clear_state_on_replication_change(stream, state):

return state

def sync_stream(client, stream, state):

def sync_stream(client, stream, state, fields_to_drop):
tap_stream_id = stream['tap_stream_id']

common.COUNTS[tap_stream_id] = 0
common.TIMES[tap_stream_id] = 0
common.SCHEMA_COUNT[tap_stream_id] = 0
common.SCHEMA_TIMES[tap_stream_id] = 0


md_map = metadata.to_map(stream['metadata'])
replication_method = metadata.get(md_map, (), 'replication-method')
database_name = metadata.get(md_map, (), 'database-name')
@@ -321,15 +323,15 @@ def sync_stream(client, stream, state):
collection_oplog_ts = oplog.get_latest_ts(client)
oplog.update_bookmarks(state, tap_stream_id, collection_oplog_ts)

full_table.sync_collection(client, stream, state, stream_projection)
full_table.sync_collection(client, stream, state, stream_projection, fields_to_drop)

oplog.sync_collection(client, stream, state, stream_projection)

elif replication_method == 'FULL_TABLE':
full_table.sync_collection(client, stream, state, stream_projection)
full_table.sync_collection(client, stream, state, stream_projection, fields_to_drop)

elif replication_method == 'INCREMENTAL':
incremental.sync_collection(client, stream, state, stream_projection)
incremental.sync_collection(client, stream, state, stream_projection, fields_to_drop)
else:
raise Exception(
"only FULL_TABLE, LOG_BASED, and INCREMENTAL replication \
@@ -340,12 +342,12 @@ def sync_stream(client, stream, state):
singer.write_message(singer.StateMessage(value=copy.deepcopy(state)))


def do_sync(client, catalog, state):
def do_sync(client, catalog, state, fields_to_drop):
all_streams = catalog['streams']
streams_to_sync = get_streams_to_sync(all_streams, state)

for stream in streams_to_sync:
sync_stream(client, stream, state)
sync_stream(client, stream, state, fields_to_drop)

LOGGER.info(common.get_sync_summary(catalog))

@@ -358,20 +360,35 @@ def main_impl():
verify_mode = config.get('verify_mode', 'true') == 'true'
use_ssl = config.get('ssl') == 'true'

connection_params = {"host": config['host'],
"port": int(config['port']),
"username": config.get('user', None),
"password": config.get('password', None),
"authSource": config['database'],
"ssl": use_ssl,
"replicaset": config.get('replica_set', None),
"readPreference": 'secondaryPreferred'}
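# Use DNS Seed List; accept a JSON boolean as well as the string 'true'
srv = str(config.get('srv', 'false')).lower() == 'true'

# if no fields to drop are specified, default to an empty list
if 'fields_to_drop' not in config:
config['fields_to_drop'] = []

# create the connection: standard parameters unless srv mode is requested
if not srv: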
connection_params = {
"host": config['host'],
"port": int(config.get('port', 27017)),
"username": config.get('user', None),
"password": config.get('password', None),
"authSource": config.get('database', None),
"ssl": use_ssl,
"replicaset": config.get('replica_set', None),
"readPreference": 'secondaryPreferred'
}

# NB: "ssl_cert_reqs" must ONLY be supplied if `SSL` is true.
if not verify_mode and use_ssl:
connection_params["ssl_cert_reqs"] = ssl.CERT_NONE
# NB: "ssl_cert_reqs" must ONLY be supplied if `SSL` is true.
if not verify_mode and use_ssl:
connection_params["ssl_cert_reqs"] = ssl.CERT_NONE

client = pymongo.MongoClient(**connection_params)
client = pymongo.MongoClient(**connection_params)
else:
# TODO: only user, password and host are used here; the other parameters (ssl, replica_set, ...) are ignored
connection_url = "mongodb+srv://{}:{}@{}".format(config['user'], config['password'], config['host'])
client = pymongo.MongoClient(connection_url)
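Review note: the `mongodb+srv://` URL above interpolates the user name and password directly. Credentials containing characters such as `@`, `:` or `/` need to be percent-escaped before they are placed in a MongoDB URI. A minimal sketch using the standard library follows; `build_srv_uri` is a hypothetical helper name and the host is a placeholder.

```python
from urllib.parse import quote_plus


def build_srv_uri(user, password, host):
    """Build a mongodb+srv URI with percent-escaped credentials.

    Hypothetical helper for illustration; it covers the same three
    parameters as the connection string built in this diff.
    """
    return "mongodb+srv://{}:{}@{}".format(
        quote_plus(user),      # escape reserved characters in the user name
        quote_plus(password),  # escape reserved characters in the password
        host,
    )


uri = build_srv_uri('app', 'p@ss:w/rd', 'cluster0.example.net')
```

The resulting string could then be passed to `pymongo.MongoClient(uri)` in place of the unescaped URL.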

LOGGER.info('Connected to MongoDB host: %s, version: %s',
config['host'],
@@ -384,7 +401,8 @@ def main_impl():
do_discover(client, config)
elif args.catalog:
state = args.state or {}
do_sync(client, args.catalog.to_dict(), state)
do_sync(client, args.catalog.to_dict(), state, config['fields_to_drop'])
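Review note: `fields_to_drop` is threaded from the config through `do_sync` and `sync_stream` into the sync strategies, whose changes are not shown in this part of the diff. As an illustration only, dropping top-level fields from a document before it is emitted could look like the sketch below. `drop_fields` is a hypothetical name, and the actual implementation in `tap_mongodb.sync_strategies` may differ (for example, it may handle nested fields).

```python
def drop_fields(document, fields_to_drop):
    """Return a shallow copy of `document` without the listed top-level keys.

    Hypothetical sketch; not the implementation used by the tap.
    """
    blocked = set(fields_to_drop)
    return {key: value for key, value in document.items() if key not in blocked}


row = {'_id': 1, 'email': 'user@example.com', 'name': 'Ada'}
cleaned = drop_fields(row, ['email'])
unchanged = drop_fields(row, [])
```

An empty `fields_to_drop` list, which is the default set in `main_impl`, leaves the document unchanged.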


def main():
try: