-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathaberdeenshire_council_scraper.py
156 lines (130 loc) · 4.68 KB
/
aberdeenshire_council_scraper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from urllib.request import Request, urlopen
from bs4 import BeautifulSoup
import datefinder
import csv
import math
# https://stackoverflow.com/a/14822210/13940304
def convert_size(size_bytes):
    """Convert a raw byte count into a scaled value and a unit suffix.

    Args:
        size_bytes (int): raw size in bytes; must not be negative.

    Returns:
        tuple: ``(value, unit)`` where ``value`` is the scaled size as a
        string and ``unit`` is one of "B", "KB", "MB", ... "YB".

    Raises:
        ValueError: if ``size_bytes`` is negative.
    """
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    if size_bytes == 0:
        # log(0) is undefined, so handle zero up front. Keep the
        # (value, unit) tuple shape so callers can always unpack two fields.
        return ("0", size_name[0])
    if size_bytes < 0:
        raise ValueError("size_bytes must not be negative")
    i = int(math.floor(math.log(size_bytes, 1024)))
    # Clamp so fractional sizes (negative exponent) stay in bytes and
    # sizes beyond the largest unit do not index past the table.
    i = max(0, min(i, len(size_name) - 1))
    p = math.pow(1024, i)
    s = round(size_bytes / p, 2)
    return ("%s" % (s), size_name[i])
def get_last_updated(link_text: str) -> str:
    """Extract the first date found in a link's text as ``dd/mm/YYYY``.

    Args:
        link_text: text of the download link to search for a date.

    Returns:
        The first date found, formatted with ``%d/%m/%Y``, or ``"NULL"``
        when the text contains no date.
    """
    # Strict to prevent numbers in dataset name returning false dates
    matches = datefinder.find_dates(link_text, strict=True)
    # A link without any date must not abort the whole scrape with
    # StopIteration; fall back to the "NULL" placeholder used for
    # unknown CSV fields elsewhere in this file.
    date = next(matches, None)
    if date is None:
        return "NULL"
    return date.strftime("%d/%m/%Y")
def get_feeds(soup):
    """Get feeds and construct dictionaries with all the values that will be used for the output csv

    Each feed dictionary has a "title" (text of the row's first cell) and a
    "files" mapping from filename to a dictionary with the keys "link",
    "filesize" ({"value", "unit"}), "last-updated" and "filetype".

    Args:
        soup (BeautifulSoup): Beautifulsoup object

    Returns:
        list: list containing all feed dictionaries; empty when the page
        contains no table.
    """
    table = soup.find("table")
    if table is None:
        # No table on the page: nothing to scrape.
        return []

    feeds = []
    # Skip the first row of the table (treated as the header row).
    for row in table.find_all("tr")[1:]:
        tds = row.find_all("td")
        if len(tds) < 2:
            # Rows without both a title cell and a files cell cannot be parsed.
            continue

        title = tds[0].get_text()
        print(title)
        feed = {"title": title, "files": {}}

        # Add files with their links, last updated and filesize.
        for anchor in tds[1].find_all("a", href=True):
            link = anchor.get("href")
            if not link.endswith((".kmz", ".csv", ".zip")):
                continue
            filename = link.rsplit("/", 1)[-1]
            last_updated = get_last_updated(anchor.text)
            # File size lookup (urlopen(link).length) is currently disabled,
            # so every file is recorded with a size of 0.
            filesize = 0
            formatted_fs, unit = convert_size(filesize)
            feed["files"][filename] = {
                "link": link,
                "filesize": {"value": formatted_fs, "unit": unit},
                "last-updated": last_updated,
                "filetype": filename[-3:].upper(),
            }
        feeds.append(feed)
    return feeds
def parse_feeds(feeds):
    """Flatten feed dictionaries into one CSV row per data file.

    Args:
        feeds (list): feed dictionaries, each with a "title" and a "files"
            mapping whose values hold "link", "last-updated", "filesize"
            ({"value", "unit"}) and "filetype".

    Returns:
        list: one 14-item list per file; fields with no scraped value are
        filled with the string "NULL".
    """
    rows = []
    for feed in feeds:
        for details in feed["files"].values():
            rows.append(
                [
                    feed["title"],
                    "Aberdeenshire Council",
                    # NOTE(review): this page URL differs from the URL
                    # requested in main() -- confirm which one is intended.
                    "https://www.aberdeenshire.gov.uk/online/open-data/",
                    details["link"],
                    "NULL",
                    details["last-updated"],
                    details["filesize"]["value"],
                    details["filesize"]["unit"],
                    details["filetype"],
                    "NULL",
                    "NULL",
                    "NULL",
                    "Open Government",
                    "NULL",
                ]
            )
    return rows
def output(parsed):
    """Write the header and the parsed rows to the Aberdeenshire results CSV.

    The file is written to ``data/scraped-results/aberdeenshire.csv``
    relative to the current working directory, replacing any existing file.

    Args:
        parsed (list): rows to write, one sequence of field values per row.
    """
    header = [
        "Title",
        "Owner",
        "PageURL",
        "AssetURL",
        "DateCreated",
        "DateUpdated",
        "FileSize",
        "FileSizeUnit",
        "FileType",
        "NumRecords",
        "OriginalTags",
        "ManualTags",
        "License",
        "Description",
    ]
    # newline="" hands line-ending control to the csv module; without it,
    # platforms that translate "\n" emit "\r\r\n" and readers see blank rows.
    with open(
        "data/scraped-results/aberdeenshire.csv",
        "w",
        encoding="UTF8",
        newline="",
    ) as f:
        writer = csv.writer(f)
        # write the header
        writer.writerow(header)
        # write the data
        writer.writerows(parsed)
def main():
    """Scrape the Aberdeenshire open data page and write the results CSV.

    Downloads the page, extracts the feeds from its table, flattens them
    into rows and writes them out through ``output``.
    """
    # The request is sent with a browser-like User-Agent header.
    req = Request(
        "https://www.aberdeenshire.gov.uk/data/open-data/",
        headers={"User-Agent": "Mozilla/5.0"},
    )
    # Close the HTTP response as soon as the body has been read.
    with urlopen(req) as response:
        page = response.read()
    soup = BeautifulSoup(page, "html.parser")

    # construct array of feed objects
    feeds = get_feeds(soup)
    parsed = parse_feeds(feeds)

    # make csv file
    output(parsed)


# Run the scraper only when executed as a script, not when imported.
if __name__ == "__main__":
    main()