-
Notifications
You must be signed in to change notification settings - Fork 0
/
csv2json.py
85 lines (75 loc) · 3.15 KB
/
csv2json.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import csv
import json
import itertools
def load(fname):
"""as geojson"""
with open(fname) as f:
reader = csv.DictReader(f, delimiter=';')
for d in reader:
geom = json.loads(d.pop('geometry'))
yield {'type': 'feature', 'geometry': geom, 'properties': d}
def to_screen(features, l, b, r, t, width, height):
"as {styles, coords} list"
tx = width / (r - l)
ty = height / (t - b)
features_by_layers = sorted((k, list(g)) for k, g in\
itertools.groupby(features,
lambda x: (int(x['properties']['layer']), int(x['properties']['is_tunnel']))))
#features_by_layers.reverse()
lines = []
tunnels = []
for (layer, is_tunnel), features in features_by_layers:
inner = []
casing = []
dash = []
for feature in features:
inner_style = {'type': 'line', 'lineJoin': 'round', 'lineCap': 'round'}
casing_style = {'type': 'line', 'lineJoin': 'round'}
dash_style = {'type': 'dashline'}
geom = feature['geometry']
coords = list(map(int, ((x - l) * tx, (y - b) * ty)) for x, y in geom['coordinates'])
# filter coords
coords = list((x, y) for x, y in coords if 0 <= x < 65536 and 0 <= y <= 65536)
#print coords
# if not any(((0 < x < 1200) and (0 < y < 1200)) for x, y in coords):
# continue
# if not feature['properties']['type'] == 'primary':
# continue
properties = feature['properties']
highway = properties['type']
klass = properties['class']
if highway == 'primary':
inner.append(dict(line=coords, stroke=0, **inner_style))
casing.append(dict(line=coords, stroke=1, **casing_style))
elif highway == 'motorway':
inner.append(dict(line=coords, stroke=2, **inner_style))
casing.append(dict(line=coords, stroke=3, **casing_style))
elif klass == 'railway':
# use dash symbolizer
inner.append(dict(line=coords, stroke=6, **inner_style))
casing.append(dict(line=coords, stroke=7, **casing_style))
dash.append(dict(line=coords, stroke=0, **dash_style))
else:
inner.append(dict(line=coords, stroke=4, **inner_style))
casing.append(dict(line=coords, stroke=5, **casing_style))
if is_tunnel:
tunnels.extend(casing)
tunnels.extend(inner)
tunnels.extend(dash)
else:
lines.extend(casing)
lines.extend(inner)
lines.extend(dash)
return tunnels + lines
def packjson(fname, out, screen):
features = load(fname)
lines = list(to_screen(features, *screen))
#lines.reverse()
print("len(lines): %s" % len(lines))
data = json.dumps(lines, separators=(',',':'))
with open(out, 'wb') as f:
f.write('var ROADS = ')
f.write(data)
f.write(';')
if __name__ == '__main__':
packjson('roads.csv', 'roads.js', [-7910500.0, 5212900.0, -7909900.0, 5213500.0, 600.0, 600.0])