service · executable file · 252 lines (227 loc) · 8.66 KB

#!/usr/bin/python
import re
import time
import json
import urllib3
import requests
import pytz
import traceback
import sys
from os.path import isfile, join
from os import remove, walk, environ
from datetime import datetime, timedelta, date
from urlparse import urlparse
from hosted import config

session = requests.Session()
# Attach the default headers to the shared session so that every request
# (timeline fetches as well as image downloads) actually sends them.
session.headers.update({
    'User-Agent': 'info-beamer mastodon poller serial {}'.format(environ.get('SERIAL', 'unknown')),
    'Accept': 'application/json',
})
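
# Download an image into the working directory, named after the last path
# component of its URL, and return the local file name (or None if the
# download failed). Files that already exist locally are reused.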
def cache_image(url):
    image_url = urlparse(url)
    image_filename = image_url.path.split('/')[-1]
    if not isfile(image_filename):
        try:
            # Fetch first, write afterwards: opening the file before the
            # download succeeds would leave an empty file behind on errors
            # and keep the image from ever being re-fetched.
            r = session.get(url)
            r.raise_for_status()
            with open(image_filename, 'wb') as f:
                f.write(r.content)
        except Exception as e:
            sys.stderr.write("[mastodon] exception while caching image {} - {}\n".format(url, e))
            return None
    return image_filename
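
# Thin wrapper around a single status ("toot") as returned by the Mastodon
# API, with helpers for caching its images, converting the HTML content to
# plain text and deciding whether the post should be shown at all.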
class MastodonPost:
def __init__(self, post):
self.post = post
self.profile_image = None
self.attachment_image = None
def _log(self, text):
sys.stderr.write("[mastodon] [toot {}] {}\n".format(self, text))
def __str__(self):
        # Mastodon status ids are strings; make sure __str__ returns one even
        # for the fallback value.
        return str(self.post.get('id', 0))
def cache_images(self):
self.profile_image = cache_image(self.post['account']['avatar_static'])
for attachment in self.post.get('media_attachments', set()):
if attachment['type'] == "image":
if attachment['meta']['original']['width'] > 2048 or attachment['meta']['original']['height'] > 2048:
self.attachment_image = cache_image(attachment['preview_url'])
else:
self.attachment_image = cache_image(attachment['url'])
break
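
    # Plain-text version of the HTML content: decode the HTML entities
    # Mastodon emits, turn paragraph and line breaks into spaces and strip
    # any remaining tags.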
@property
def text(self):
string = self.post["content"]
# I know, this could be done using HTMLParser - but this opens a can
# of worms because we're dealing with python2.7 and unicode.
        for source, replacement in {
            '&nbsp;': ' ',
            '&#160;': ' ',
            '&amp;': '&',
            '&#38;': '&',
            '&gt;': '>',
            '&#62;': '>',
            '&lt;': '<',
            '&#60;': '<',
            '&quot;': '"',
            '&#34;': '"',
            '&apos;': '\'',
            '&#39;': '\'',
            '<br>': ' ',
            '<br />': ' ',
            '</p><p>': ' ',
        }.items():
string = string.replace(source, replacement)
string = re.sub('<[^>]+>', '', string)
return string
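
    # Fully qualified account name: accounts local to the configured
    # instance come back without a host part, so the instance's host is
    # appended.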
@property
def account_name(self):
acct = self.post['account']['acct']
if '@' not in acct:
baseurl = urlparse(config.baseurl)
return "{}@{}".format(acct, baseurl.netloc)
return acct
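
    # Accounts can be blocked via an optional blocked.txt asset (one account
    # per line, optional reason after a '#') or via the comma separated
    # config.filter_accounts setting. Hypothetical blocked.txt example:
    #   spammer@example.social # posts ads
    #   some-bot@example.net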
@property
def is_from_blocked_account(self):
try:
blocked = {}
with open('blocked.txt') as f:
for line in f.read().strip().splitlines():
line = line.strip()
if '#' in line:
acct, reason = line.split('#', 1)
else:
acct, reason = line, 'unknown'
blocked[acct.strip()] = reason.strip()
for acct in config.filter_accounts.split(','):
blocked[acct.strip()] = 'specified manually'
if self.account_name in blocked:
self._log("account is blocked: {}".format(blocked[self.account_name]))
return True
except Exception as e:
self._log("Exception while checking for blocked accounts: {}".format(repr(e)))
return False
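
    # created_at is an ISO 8601 UTC timestamp ending in 'Z'; the trailing
    # 'Z' is stripped before parsing. time.mktime then interprets the value
    # in the local timezone, which matches how the not_before cutoff is
    # derived, so the age comparison stays internally consistent.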
@property
def created_timestamp(self):
date_time_obj = datetime.strptime(self.post['created_at'][0:-1], '%Y-%m-%dT%H:%M:%S.%f')
return int(time.mktime(date_time_obj.timetuple()))
@property
def ignore_post(self):
if 'content' not in self.post:
self._log("Garbage: no content")
return True
if config.filter_garbage and ('@twitter.com' in self.post['content'] or "x.com/" in self.post["content"] or 'nitter.net/' in self.post['content']):
self._log("Filtered: from Twitter")
return True
if config.filter_spoilertexttoots and self.post['spoiler_text']:
self._log("Filtered: has spoiler")
return True
if self.is_from_blocked_account:
self._log("Filtered: from blocked account")
return True
return False
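
# Polls the configured hashtag timelines, filters and de-duplicates the
# results, caches the referenced images and writes tootlist.json.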
class MastodonFetcher:
def __init__(self):
self.session = requests.Session()
self.session.headers = {
'User-Agent': 'info-beamer mastodon poller serial {}'.format(environ.get('SERIAL', 'unknown')),
'Accept': 'application/json'
}
self.not_before = int(time.mktime(datetime.strptime(config.not_before, "%Y-%m-%d").timetuple()))
self.posts = []
def _log(self, something, tag=None):
if not isinstance(something, str):
something = repr(something)
if tag is not None:
something = "[#{}] {}".format(tag, something)
sys.stderr.write("[mastodon] {}\n".format(something))
def _fetch(self, tag):
self._log("starting _fetch()", tag=tag)
url = "{baseurl}/api/v1/timelines/tag/{tag}?limit={limit}".format(
baseurl=config.baseurl,
tag=tag,
limit=config.count,
)
        # Use the fetcher's own session, which carries the default headers.
        r = self.session.get(url)
self._log("http status code was {}".format(r.status_code), tag=tag)
r.raise_for_status()
return r.json()
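
    # Fetch one tag timeline and keep every post that is not a duplicate,
    # is not filtered by MastodonPost.ignore_post and is not older than the
    # configured not_before date. Errors are logged and never fatal.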
def fetch_and_parse(self, tag):
try:
toots = self._fetch(tag)
for post in toots:
toot = MastodonPost(post)
                # Compare by status id: MastodonPost defines no __eq__, so a
                # plain "toot in self.posts" check would only ever compare
                # object identity and never detect cross-tag duplicates.
                if any(str(other) == str(toot) for other in self.posts):
                    self._log("ignoring {} because duplicate".format(toot), tag=tag)
                    continue
if toot.ignore_post:
# reason already printed by MastodonPost
continue
                if toot.created_timestamp < self.not_before:
                    self._log("ignoring {} because it was created too long ago (is {}, want {})".format(toot, toot.created_timestamp, self.not_before), tag=tag)
                    continue
                self.posts.append(toot)
except Exception as e:
self._log(e, tag=tag)
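
    # Download avatar and attachment images for all collected posts, then
    # delete any image file in the working directory that is no longer
    # referenced. The initial set lists bundled package assets that must
    # never be removed.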
def cache_and_cleanup_images(self):
images = {
'mastodon-logo.png',
'node.png',
'package.png',
'package-header.jpg'
}
to_delete = set()
for post in self.posts:
post.cache_images()
for path in (post.profile_image, post.attachment_image):
if path is not None:
images.add(path)
for root, dirs, files in walk('./'):
for name in files:
                if name.endswith(('.png', '.jpg', '.jpeg', '.gif')) and name not in images:
                    to_delete.add(name)
for path in to_delete:
self._log("deleting image {} because it's not used anywhere".format(path))
remove(path)
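
    # Flatten the collected posts into the structure written to
    # tootlist.json (presumably read by the display side). Nothing is
    # written when no posts were collected, so a failed poll does not
    # clobber the previous list.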
def dump_json(self):
out = []
for post in self.posts:
out.append({
'created_at': post.created_timestamp,
'content': post.text,
'account': {
'acct': post.account_name,
'display_name': post.post['account']['display_name'],
'avatar_static': post.profile_image,
},
'media_attachment': post.attachment_image or '',
})
self._log("have gotten {} posts in total".format(len(out)))
if len(out) > 0:
with open('tootlist.json', 'wb') as f:
f.write(json.dumps(out, ensure_ascii=False).encode('utf8'))
self._log("wrote {} posts to tootlist.json".format(len(out)))
def main():
config.restart_on_update()
    if config.poll_interval == 0:
        sys.stderr.write("[mastodon] polling disabled, waiting for a config change\n")
        while True:
            time.sleep(100000)
    while True:
try:
poller = MastodonFetcher()
for tag in config.tag.split(','):
poller.fetch_and_parse(tag.strip())
poller.cache_and_cleanup_images()
poller.dump_json()
        except Exception:
traceback.print_exc()
time.sleep(60)
else:
time.sleep(60 * config.poll_interval)
if __name__ == "__main__":
main()