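"""mongodb_pipeline.py

dlt pipelines that load MongoDB collections (stores, products, paymentMethods,
employees and sales) into a Postgres dataset, enriching each store document with
a reverse-geocoded governorate obtained via geopy's Nominatim geocoder.
"""
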
from typing import Optional

import dlt
from dlt.common.pipeline import LoadInfo
from dlt.common.typing import TDataItems
from dlt.pipeline.pipeline import Pipeline
from geopy.geocoders import Nominatim

geolocator = Nominatim(user_agent="makhbazti")
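# Note: the public Nominatim service rate-limits clients (roughly one request per
# second), and stores() below makes one reverse-geocoding call per row; for larger
# collections consider wrapping the call in geopy.extra.rate_limiter.RateLimiter
# or caching results.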

# As this pipeline can be run as a standalone script or as part of the tests,
# we need to handle the import differently.
try:
    from .mongodb import mongodb, mongodb_collection  # type: ignore
except ImportError:
    from mongodb import mongodb, mongodb_collection


def stores() -> TDataItems:
    """Load the stores collection, adding a gouvernorat column derived from each store's coordinates."""
    # By default the mongo source reflects all collections in the database;
    # narrow it down to the stores collection only.
    source = mongodb().with_resources("stores")
    source.resources["stores"].apply_hints(columns={"location": {"data_type": "complex"}})
    for row in source:
        # The coordinate pair is assumed to be stored as [latitude, longitude].
        coordinates = row.get("location", {}).get("coordinates", [])
        if len(coordinates) < 2:
            # No usable coordinates: keep the row but leave the governorate empty.
            row["gouvernorat"] = ""
            yield row
            continue
        latitude, longitude = coordinates[0], coordinates[1]
        # Reverse-geocode the point and keep only the state/governorate name.
        location = geolocator.reverse(f"{latitude},{longitude}")
        address = location.raw.get("address", {}) if location else {}
        row["gouvernorat"] = address.get("state", "")
        yield row


def stores_loading(data: TDataItems, pipeline: Optional[Pipeline] = None) -> LoadInfo:
    """Use the mongo source to load the stores collection."""
    if pipeline is None:
        # Create a pipeline
        pipeline = dlt.pipeline(
            import_schema_path="schemas/import",
            export_schema_path="schemas/export",
            pipeline_name="LOADING STORES",
            destination="postgres",
            dataset_name="public",
            progress="log",
        )
    # Merge on _id; when several versions of a row arrive, keep the latest one
    # according to updated_at.
    hints = {
        "write_disposition": "merge",
        "primary_key": "_id",
        "columns": {"updated_at": {"dedup_sort": "desc"}},
    }
    # Run the pipeline. For a large database this may take a while.
    info = pipeline.run(data, **hints)
    return info


def load_select_collection_db(pipeline: Optional[Pipeline] = None) -> LoadInfo:
    """Use the mongodb source to reflect the database schema and load the
    products, paymentMethods and employees collections from it."""
    if pipeline is None:
        # Create a pipeline
        pipeline = dlt.pipeline(
            import_schema_path="schemas/import",
            export_schema_path="schemas/export",
            pipeline_name="LOADING PRODUCTS_PROMOTIONS_PAYMENTMETHODS_EMPLOYEES",
            destination="postgres",
            dataset_name="public",
            progress="log",
        )
    source = mongodb().with_resources("products", "paymentMethods", "employees")
    source.resources["products"].apply_hints(columns={"price": {"data_type": "double", "nullable": True}})
    hints = {
        "write_disposition": "merge",
        "primary_key": "_id",
        "columns": {"updated_at": {"dedup_sort": "desc"}},
    }
    # Run the pipeline. The merge write disposition merges existing rows in the destination by primary key.
    info = pipeline.run(source, **hints, refresh="drop_resources")
    return info


def load_sales_collection(pipeline: Optional[Pipeline] = None) -> LoadInfo:
    """Load the sales collection incrementally, using createdAt as the cursor field."""
    if pipeline is None:
        # Create a pipeline
        pipeline = dlt.pipeline(
            import_schema_path="schemas/import",
            export_schema_path="schemas/export",
            pipeline_name="LOADING SALES",
            destination="postgres",
            dataset_name="public",
            progress="log",
        )
    # Load the collection incrementally with the append write disposition.
    # This is a good fit when rows are only ever inserted, never updated.
    sales_collection = mongodb_collection(
        collection="sales",
        incremental=dlt.sources.incremental("createdAt"),
    )
    info = pipeline.run(sales_collection, write_disposition="append", refresh="drop_resources")
    return info


def load_products_collection(pipeline: Optional[Pipeline] = None) -> LoadInfo:
    """Load only the products collection, merging on _id."""
    if pipeline is None:
        # Create a pipeline
        pipeline = dlt.pipeline(
            import_schema_path="schemas/import",
            export_schema_path="schemas/export",
            pipeline_name="LOADING PRODUCTS",
            destination="postgres",
            dataset_name="public",
            progress="log",
        )
    # Configure the source to load only the products collection.
    source = mongodb().with_resources("products")
    hints = {
        "write_disposition": "merge",
        "primary_key": "_id",
        "columns": {"updated_at": {"dedup_sort": "desc"}},
    }
    # Run the pipeline. The merge write disposition merges existing rows in the destination by primary key.
    info = pipeline.run(source, **hints)
    return info


if __name__ == "__main__":
    # Load the selected collections with their respective settings.
    print(stores_loading(stores()))
    print(load_sales_collection())
    print(load_select_collection_db())
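
# A minimal sketch of calling one of the loaders with a pre-configured pipeline
# instead of the default Postgres one (the "stores_local" names and the duckdb
# destination below are illustrative, not part of this project):
#
#     local_pipeline = dlt.pipeline(
#         pipeline_name="stores_local",
#         destination="duckdb",
#         dataset_name="stores_local",
#     )
#     print(stores_loading(stores(), pipeline=local_pipeline))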