forked from OpenEnergyPlatform/saio
-
Notifications
You must be signed in to change notification settings - Fork 0
/
saio.py
208 lines (158 loc) · 6.31 KB
/
saio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
"""
SQLAlchemyIO (saio): Module hack for autoloading table definitions
## Usage
After
```python
import saio
saio.register_schema("model_draft", engine)
````
one can import table declarations easily using
```python
from saio.model_draft import lis_charging_poi as LisChargingPoi
```
Note that `ipython` and Jupyter Notebook, allow using `<TAB>` to auto-complete
table names.
## Implementation details
`saio.register_schema` instantiates a declarative base using
```python
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base(bind=engine)
# The Base can be imported using from saio.model_draft import Base
```
and then whenever one imports any table from `saio.model_draft`, ie. by calling
`from saio.model_draft import lis_charging_poi as LisChargingPoi`, saio does
approximately
```python
class LisChargingPoi(Base):
__tablename__ = 'lis_charging_poi'
__table_args__ = {'schema': 'model_draft', 'autoload': True}
```
"""
__copyright__ = "© Reiner Lemoine Institut"
__license__ = "MIT"
__url__ = "https://github.com/coroa/saio/blob/master/saio.py"
__author__ = "coroa"
import sqlalchemy as sa
import sqlalchemy.ext.declarative
import sqlalchemy.engine.reflection
import warnings
from types import ModuleType
import logging
logger = logging.getLogger(__name__)
def memoize(func):
cache = {}
def memoizer(*args):
if args not in cache:
cache[args] = func(*args)
return cache[args]
return memoizer
class SchemaInspectorModule(ModuleType):
__all__ = () # to make help() happy
__package__ = __name__
__path__ = []
__file__ = __file__
def __init__(self, modulepath, moduledoc, schema, engine):
super().__init__(modulepath, moduledoc)
assert isinstance(engine, sa.engine.Engine)
self.schema = schema
self.engine = engine
self.Base = sa.ext.declarative.declarative_base(bind=engine)
self.Meta = self.Base.metadata
@memoize
def __dir__(self):
insp = sa.engine.reflection.Inspector.from_engine(self.engine)
return insp.get_table_names(self.schema) + insp.get_view_names(self.schema)
@memoize
def __getattr__(self, name):
# IPython queries several strange attributes, which should not become tables
# if tblname.startswith("_ipython") or tblname.startswith("_repr"):
# raise AttributeError(tblname)
# Idea for treating views from https://hultner.se/quickbits/2017-10-23-postgresql-reflection-views-python-sqlalchemy.html
tab = sa.Table(name, self.Meta, autoload=True, schema=self.schema)
attrs = {'__module__': self.__name__,
'__table__': tab}
if not tab.primary_key:
first_col = next(iter(tab.columns))
logger.warning(f"Reflection was unable to determine primary key (normal for views), assuming: {first_col}")
attrs['__mapper_args__'] = {'primary_key': [first_col]}
return type(name, (self.Base,), attrs)
del ModuleType
del memoize
def register_schema(schema, engine):
import sys
module = SchemaInspectorModule(f"{__name__}.{schema}", "", schema, engine)
# Make `from saio.{schema} import {table}` work
sys.modules[module.__name__] = module
# We could register the schema module in the saio module, as well
#setattr(sys.modules[__name__], schema, module)
try:
import pandas as pd
has_pandas = True
except ImportError:
has_pandas = False
try:
import geopandas as gpd
import shapely.wkb
import shapely.geos
has_geopandas = True
except ImportError:
has_geopandas = False
def as_pandas(query, index_col=None, coerce_float=True, params=None,
geometry='geom', crs=None, hex_encoded=True):
"""
Import a query into a pandas DataFrame or a geopandas GeoDataFrame
Arguments
---------
query : sqlalchemy.orm.query.Query
index_col : string or list of strings, optional, default: None
Column(s) to set as index(MultiIndex).
coerce_float : boolean, default: True
Attempts to convert values of non-string, non-numeric objects (like
decimal.Decimal) to floating point. Useful for SQL result sets.
params : list, tuple or dict, optional, default: None
List of parameters to pass to execute method. The syntax used
to pass parameters is database driver dependent. Check your
database driver documentation for which of the five syntax styles,
described in PEP 249's paramstyle, is supported.
Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}
geometry : string, default: "geom"
Column name to convert to shapely geometries
crs : dict|string|None, default: None
CRS to use for the returned GeoDataFrame; if None, CRS is determined
from the SRID of the first geometry.
hex_encoded : bool, optional, default: True
Whether the geometry is in a hex-encoded string. Default is True,
standard for postGIS. Use hex_encoded=False for sqlite databases.
Note
----
Adapted from geopandas' read_postgis function.
Usage
-----
import saio
saio.register_schema("boundaries", engine)
from saio.boundaries import bkg_vg250_2_lan as BkgLan
df = saio.as_pandas(session.query(BkgLan.geom))
df.plot()
"""
assert has_pandas, "Pandas failed to import. Please check your installation!"
df = pd.read_sql_query(query.statement, query.session.bind,
index_col=index_col, coerce_float=coerce_float,
params=params)
if geometry not in df:
return df
if not has_geopandas:
warnings.warn("GeoPandas failed to import. Geometry column is left as WKB.")
return df
if len(df) > 0:
obj = df[geometry].iloc[0]
if isinstance(obj, bytes):
load_geom = lambda s: shapely.wkb.loads(s, hex=hex_encoded)
else:
load_geom = lambda s: shapely.wkb.loads(str(s), hex=hex_encoded)
srid = getattr(obj, 'srid', -1)
if srid == -1:
srid = shapely.geos.lgeos.GEOSGetSRID(load_geom(obj)._geom)
if crs is None and srid != 0:
crs = dict(init="epsg:{}".format(srid))
df[geometry] = df[geometry].map(load_geom)
return gpd.GeoDataFrame(df, crs=crs, geometry=geometry)