# spark_manage.py
import os
import sys
import click
import logging
import time

import listenbrainz_spark

from listenbrainz_spark import path
from listenbrainz_spark import utils
from listenbrainz_spark import config
from listenbrainz_spark import hdfs_connection

from hdfs.util import HdfsError
from py4j.protocol import Py4JJavaError

app = utils.create_app(debug=True)
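
# Note: this script is normally submitted to Spark with the project packaged as
# a zip archive (see remove_zip below, which deletes it after the command runs).
# Illustrative invocation, with cluster-specific flags omitted:
#   spark-submit --py-files /rec/listenbrainz_spark.zip spark_manage.py <command>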


@click.group()
def cli():
    pass


@cli.command(name='init_dir')
@click.option('--rm', is_flag=True, help='Delete existing directories from HDFS.')
@click.option('--recursive', is_flag=True, help='Delete existing directories from HDFS recursively.')
@click.option('--create_dir', is_flag=True, help='Create directories in HDFS.')
def init_dir(rm, recursive, create_dir):
    """ Create directories in HDFS to run the recommendation engine.
    """
    try:
        listenbrainz_spark.init_spark_session('Manage Directories')
    except Py4JJavaError as err:
        logging.error('{}\n{}\nAborting...'.format(str(err), err.java_exception))
        sys.exit(-1)

    hdfs_connection.init_hdfs(config.HDFS_HTTP_URI)

    if rm:
        try:
            utils.delete_dir(path.RECOMMENDATION_PARENT_DIR)
            utils.delete_dir(path.CHECKPOINT_DIR)
            logging.info('Successfully deleted directories.')
        except HdfsError as err:
            logging.error('{}: Some/all directories are non-empty. Try "--recursive" to delete recursively.'.format(
                type(err).__name__))
            logging.warning('Deleting a directory recursively will delete all the recommendation data.')
            sys.exit(-1)

    if recursive:
        try:
            utils.delete_dir(path.RECOMMENDATION_PARENT_DIR, recursive=True)
            utils.delete_dir(path.CHECKPOINT_DIR, recursive=True)
            logging.info('Successfully deleted directories recursively.')
        except HdfsError as err:
            logging.error('{}: An error occurred while deleting directories recursively.\n{}\nAborting...'.format(
                type(err).__name__, str(err)))
            sys.exit(-1)

    if create_dir:
        try:
            logging.info('Creating directory to store dataframes...')
            utils.create_dir(path.DATAFRAME_DIR)

            logging.info('Creating directory to store models...')
            utils.create_dir(path.MODEL_DIR)

            logging.info('Creating directory to store candidate sets...')
            utils.create_dir(path.CANDIDATE_SET_DIR)

            logging.info('Creating directory to store RDD checkpoints...')
            utils.create_dir(path.CHECKPOINT_DIR)

            print('Done!')
        except HdfsError as err:
            logging.error('{}: An error occurred while creating some/all directories.\n{}\nAborting...'.format(
                type(err).__name__, str(err)))
            sys.exit(-1)
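
# Example invocations for init_dir (illustrative):
#   python spark_manage.py init_dir --create_dir   # create the HDFS directory tree
#   python spark_manage.py init_dir --recursive    # delete existing directories recursively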


@cli.command(name='upload_mapping')
@click.option("--force", "-f", is_flag=True, help="Deletes existing mapping.")
def upload_mapping(force):
    """ Invoke script to upload mapping to HDFS.
    """
    from listenbrainz_spark.ftp.download import ListenbrainzDataDownloader
    from listenbrainz_spark.hdfs.upload import ListenbrainzDataUploader

    with app.app_context():
        downloader_obj = ListenbrainzDataDownloader()
        src = downloader_obj.download_msid_mbid_mapping(path.FTP_FILES_PATH)

        uploader_obj = ListenbrainzDataUploader()
        uploader_obj.upload_mapping(src, force=force)
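
# Example (illustrative): download the msid-mbid mapping from the FTP server and
# push it to HDFS, replacing any existing copy:
#   python spark_manage.py upload_mapping --force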


@cli.command(name='upload_listens')
@click.option('--incremental', '-i', is_flag=True, default=False, help="Use a smaller, incremental dump (mostly for testing purposes).")
@click.option("--force", "-f", is_flag=True, help="Deletes existing listens.")
@click.option("--id", default=None, type=int, help="Get a specific dump based on index.")
def upload_listens(force, incremental, id):
    """ Invoke script to upload listens to HDFS.
    """
    from listenbrainz_spark.ftp.download import ListenbrainzDataDownloader
    from listenbrainz_spark.hdfs.upload import ListenbrainzDataUploader

    with app.app_context():
        downloader_obj = ListenbrainzDataDownloader()
        dump_type = 'incremental' if incremental else 'full'
        src, _ = downloader_obj.download_listens(directory=path.FTP_FILES_PATH, listens_dump_id=id, dump_type=dump_type)

        uploader_obj = ListenbrainzDataUploader()
        uploader_obj.upload_listens(src, force=force)
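
# Example invocations for upload_listens (illustrative; 123 is a hypothetical dump id):
#   python spark_manage.py upload_listens              # latest full dump
#   python spark_manage.py upload_listens -i           # latest incremental dump
#   python spark_manage.py upload_listens --id 123 -f  # a specific dump, replacing existing listens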


@cli.command(name='upload_artist_relation')
@click.option("--force", "-f", is_flag=True, help="Deletes existing artist relation.")
def upload_artist_relation(force):
    """ Invoke script to upload artist relation to HDFS.
    """
    from listenbrainz_spark.ftp.download import ListenbrainzDataDownloader
    from listenbrainz_spark.hdfs.upload import ListenbrainzDataUploader

    with app.app_context():
        downloader_obj = ListenbrainzDataDownloader()
        src = downloader_obj.download_artist_relation(path.FTP_FILES_PATH)

        uploader_obj = ListenbrainzDataUploader()
        uploader_obj.upload_artist_relation(src, force=force)
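
# Example (illustrative): fetch the artist relation and overwrite the copy in HDFS:
#   python spark_manage.py upload_artist_relation --force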


@cli.command(name='dataframe')
def dataframes():
    """ Invoke script responsible for pre-processing data.
    """
    from listenbrainz_spark.recommendations import create_dataframes

    with app.app_context():
        create_dataframes.main()


@cli.command(name='model')
def model():
    """ Invoke script responsible for training the model.
    """
    from listenbrainz_spark.recommendations import train_models

    with app.app_context():
        train_models.main()


@cli.command(name='candidate')
def candidate():
    """ Invoke script responsible for generating candidate sets.
    """
    from listenbrainz_spark.recommendations import candidate_sets

    with app.app_context():
        candidate_sets.main()


@cli.command(name='recommend')
def recommend():
    """ Invoke script responsible for generating recommendations.
    """
    from listenbrainz_spark.recommendations import recommend

    with app.app_context():
        recommend.main()
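
# The four commands above form the recommendation pipeline and are typically run
# in this order: dataframe -> model -> candidate -> recommend.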


@cli.command(name='user')
def user():
    """ Invoke script responsible for calculating user statistics.
    """
    from listenbrainz_spark.stats import user

    with app.app_context():
        user.main()


@cli.command(name='request_consumer')
def request_consumer():
    """ Invoke script responsible for the request consumer.
    """
    from listenbrainz_spark.request_consumer.request_consumer import main

    with app.app_context():
        main('request-consumer-%s' % str(int(time.time())))


@cli.resultcallback()
def remove_zip(result, **kwargs):
    """ Remove zip created by spark-submit.
    """
    os.remove(os.path.join('/', 'rec', 'listenbrainz_spark.zip'))


if __name__ == '__main__':
    # The root logger defaults to WARNING; raise it to INFO so the progress
    # messages logged by the commands above are visible.
    logging.getLogger().setLevel(logging.INFO)
    cli()