-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmediaInfoService.py
113 lines (87 loc) · 3.2 KB
/
mediaInfoService.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import os.path
import json
import datetime
import falcon
# Hachoir ----------------------------------------------------------------------
from hachoir.parser import createParser
from hachoir.metadata import extractMetadata
def hachoir_metadata_dict(filename):
    """
    Extract the metadata hachoir can read from ``filename`` as a plain dict.

    Each key hachoir reports a value for is mapped to the value returned by
    ``Metadata.get(key)``; keys without a value are omitted.

    Args:
        filename: path of the media file to inspect.

    Returns:
        dict mapping metadata key to the value hachoir returns for it.

    Raises:
        ValueError: if hachoir has no parser for the file, or no metadata
            could be extracted from it.
    """
    parser = createParser(filename)
    if parser is None:
        # Unsupported/unrecognised file: fail with a readable reason instead
        # of an AttributeError on None further down.
        raise ValueError('no parser available for {!r}'.format(filename))
    # The parser keeps the input file open; leaving the `with` block closes it.
    with parser:
        _metadata = extractMetadata(parser)
    if _metadata is None:
        raise ValueError('no metadata could be extracted from {!r}'.format(filename))
    return {
        key: _metadata.get(key)
        # NOTE: reaches into the name-mangled private dict of Metadata to
        # enumerate the known keys (kept from the original implementation).
        for key in _metadata._Metadata__data
        if _metadata.has(key)
    }
# JSON
def json_encode_field(obj):
    """
    Convert a single metadata value into something JSON can serialise.

    - ``datetime.datetime`` / ``datetime.date`` / ``datetime.time`` become
      ISO-8601 strings.
    - ``datetime.timedelta`` becomes a number of seconds (float).
    - ``set`` / ``frozenset`` become a tuple whose elements are themselves
      encoded with this function.
    - Anything else is returned unchanged.

    Args:
        obj: the value to encode.

    Returns:
        The encoded value, or ``obj`` itself when no conversion applies.
    """
    # datetime.datetime is a subclass of datetime.date, so one check covers both.
    if isinstance(obj, (datetime.date, datetime.time)):
        return obj.isoformat()
    if isinstance(obj, datetime.timedelta):
        return obj.total_seconds()
    if isinstance(obj, (set, frozenset)):
        # Encode the members too, so e.g. a set of dates is serialisable.
        return tuple(json_encode_field(item) for item in obj)
    return obj
# Request Handler --------------------------------------------------------------
class MediaInfo():
    """
    Falcon request handler that serves directory listings and media
    metadata for files below a root directory.
    """

    def __init__(self, path):
        """
        Args:
            path: root directory whose content is exposed.

        Raises:
            NotADirectoryError: if ``path`` is not an existing directory.
        """
        # Explicit check rather than `assert`, which is stripped under `-O`.
        if not os.path.isdir(path):
            raise NotADirectoryError('media path is not a directory: {!r}'.format(path))
        self.path = path

    def _resolve(self, url_path):
        """
        Map a request path onto a filesystem path below ``self.path``.

        Returns:
            The absolute, normalised filesystem path, or ``None`` when the
            request path would escape the root (e.g. via ``..``).
        """
        try:
            root = os.path.abspath(self.path)
            target = os.path.abspath(os.path.join(root, url_path.strip('/')))
            if os.path.commonpath((root, target)) != root:
                return None
        except ValueError:
            # Raised for paths that cannot be compared/normalised
            # (e.g. different drives on Windows).
            return None
        return target

    def on_get(self, request, response):
        """
        Respond with a JSON document describing the requested path.

        - directory: list of entry names (200)
        - file: metadata dict produced by ``hachoir_metadata_dict``, with
          each value passed through ``json_encode_field`` (200)
        - missing path, or path outside the root: ``file not found`` (404)
        - metadata extraction failure: error and reason (500)
        """
        filename = self._resolve(request.path)
        if filename is None or not os.path.exists(filename):
            response.media = {'error': 'file not found'}
            response.status = falcon.HTTP_404
            return
        if os.path.isdir(filename):
            response.media = os.listdir(filename)
            response.status = falcon.HTTP_200
            return
        if not os.path.isfile(filename):
            response.media = {'error': 'file not found'}
            response.status = falcon.HTTP_404
            return
        try:
            # `filename` already includes the root; joining it with
            # self.path again would break relative roots.
            metadata = hachoir_metadata_dict(filename)
        except Exception as ex:
            response.media = {'error': 'unable to extract metadata', 'reason': str(ex)}
            response.status = falcon.HTTP_500
            return
        response.media = {k: json_encode_field(v) for k, v in metadata.items()}
        response.status = falcon.HTTP_200
# Setup App -------------------------------------------------------------------
def create_wsgi_app(media_path='./', **kwargs):
    """
    Build the WSGI application that serves media metadata.

    Args:
        media_path: root directory handed to ``MediaInfo``.
        **kwargs: accepted and ignored, so the full command line argument
            dict can be passed straight through.

    Returns:
        A falcon application with ``MediaInfo.on_get`` registered as the
        sink for every path.
    """
    media = MediaInfo(media_path)
    # `falcon.API` is the deprecated name of `falcon.App`; prefer the new
    # name and fall back for Falcon versions that predate it.
    app_class = getattr(falcon, 'App', None) or falcon.API
    app = app_class()
    app.add_sink(media.on_get, prefix='/')
    return app
# Commandline Args ------------------------------------------------------------
def get_args():
    """
    Parse the command line arguments.

    Returns:
        dict with the keys ``media_path`` (default ``'./'``), ``host``
        (default ``'0.0.0.0'``) and ``port`` (int, default ``8331``).
    """
    import argparse
    parser = argparse.ArgumentParser(
        prog=__name__,
        description='''
        Provide a URL endpoint to return metadata of media
        ''',
    )
    # nargs='?' makes the positional optional; without it argparse requires
    # the argument and the declared default is never used.
    parser.add_argument('media_path', action='store', nargs='?', default='./', help='')
    parser.add_argument('--host', action='store', default='0.0.0.0', help='')
    parser.add_argument('--port', action='store', default=8331, type=int, help='')
    kwargs = vars(parser.parse_args())
    return kwargs
def init_sigterm_handler():
    """
    Install a SIGTERM handler that raises KeyboardInterrupt.

    Meant for termination under Docker, see
    https://lemanchet.fr/articles/gracefully-stop-python-docker-container.html
    """
    from signal import SIGTERM, signal as install_handler

    def handle_sigterm(*_unused):
        # Whatever arguments the handler is called with are irrelevant.
        raise KeyboardInterrupt

    install_handler(SIGTERM, handle_sigterm)
# Main ------------------------------------------------------------------------
if __name__ == '__main__':
    # Script entry point: serve the WSGI app with wsgiref until interrupted.
    init_sigterm_handler()
    kwargs = get_args()
    from wsgiref import simple_server
    httpd = simple_server.make_server(kwargs['host'], kwargs['port'], create_wsgi_app(**kwargs))
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C, or SIGTERM translated by init_sigterm_handler: normal shutdown.
        pass
    finally:
        # Release the listening socket explicitly on the way out.
        httpd.server_close()