-
Notifications
You must be signed in to change notification settings - Fork 0
/
steam_statistics_luigi_ETL.py
83 lines (65 loc) · 2.46 KB
/
steam_statistics_luigi_ETL.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from random import uniform, randint
from luigi import run
from Steam_statistics_ETL.Steam_statistics_tasks.AllSteamProductsData_luigi_task import AllSteamProductsDataTask
from Steam_statistics_ETL.Steam_statistics_tasks.GetSteamProductsDataInfo_luigi_task import GetSteamProductsDataInfoTask
from Steam_statistics_ETL.Steam_statistics_tasks.SteamProductsInfoCSVJoiner_luigi_task import \
SteamProductsInfoInfoCSVJoinerTask
from Steam_statistics_ETL.Steam_statistics_tasks.CreateDiagrams_luigi_task import CreateDiagramsSteamStatisticsTask
"""
Steam statistics Luigi ETL.
"""
class AllSteamProductsData(AllSteamProductsDataTask):
    """
    Fetches the list of products from the Steam API.

    All of the work is inherited from ``AllSteamProductsDataTask``; this
    subclass only pins the scheduling settings below and declares no
    upstream tasks of its own in this file.
    """
    # Task settings:
    priority: int = 300
    task_namespace: str = 'AllSteamProductsData'
class GetSteamProductsDataInfo(GetSteamProductsDataInfoTask):
    """
    Parses and scrapes the products listed on Steam.

    The parsing/scraping logic is inherited from
    ``GetSteamProductsDataInfoTask``; this subclass supplies the settings
    and declares ``AllSteamProductsData`` as its upstream task.
    """
    # Task settings:
    task_namespace: str = 'GetSteamProductsDataInfo'
    retry_count: int = 30
    priority: int = 5000

    # Wait settings:
    # NOTE(review): randint() is evaluated once, when this class body is
    # executed, so the value is drawn a single time per process rather than
    # re-drawn on each use - confirm that this is what the parent task expects.
    time_wait: int = randint(1, 3)
    # Alternative (sub-unit, float) setting kept for reference:
    # time_wait: float = uniform(0.1, 0.3)

    def requires(self) -> dict:
        """Return the upstream task, keyed by its name."""
        upstream = AllSteamProductsData()
        return {'AllSteamProductsData': upstream}
class SteamAppsInfoCSVJoiner(SteamProductsInfoInfoCSVJoinerTask):
    """
    Joins the raw CSV tables for Steam Apps into a single MasterData table.

    The joining logic is inherited from
    ``SteamProductsInfoInfoCSVJoinerTask``; this subclass points it at the
    ``Apps_info`` directory and makes it depend on
    ``GetSteamProductsDataInfo``.
    """
    # Task settings:
    task_namespace: str = 'SteamProductsInfo'
    directory_for_csv_join: str = 'Apps_info'
    retry_count: int = 2
    priority: int = 100

    def requires(self) -> dict:
        """Return the upstream task, keyed by its name."""
        upstream = GetSteamProductsDataInfo()
        return {'GetSteamProductsDataInfo': upstream}
class SteamDLCInfoCSVJoiner(SteamProductsInfoInfoCSVJoinerTask):
    """
    Joins the raw CSV tables for Steam DLC into a single MasterData table.

    The joining logic is inherited from
    ``SteamProductsInfoInfoCSVJoinerTask``; this subclass points it at the
    ``DLC_info`` directory and makes it depend on
    ``GetSteamProductsDataInfo``.
    """
    # Task settings:
    task_namespace: str = 'SteamProductsInfo'
    directory_for_csv_join: str = 'DLC_info'
    retry_count: int = 2
    priority: int = 100

    def requires(self) -> dict:
        """Return the upstream task, keyed by its name."""
        upstream = GetSteamProductsDataInfo()
        return {'GetSteamProductsDataInfo': upstream}
class CreateDiagramsSteamStatistics(CreateDiagramsSteamStatisticsTask):
    """
    Creates the diagrams for the report.

    The plotting logic is inherited from
    ``CreateDiagramsSteamStatisticsTask``; this subclass supplies the
    settings and depends on both CSV joiner tasks (Apps and DLC).
    """
    # Task settings:
    priority: int = 200
    task_namespace: str = 'CreateDiagramsSteamStatistics'

    def requires(self) -> dict:
        """Return both CSV joiner tasks, keyed by name."""
        # NOTE(review): the first key reads "SteamAppInfoCSVJoiner" (singular
        # "App") although the class is SteamAppsInfoCSVJoiner. It is kept
        # unchanged because the parent task may look its inputs up by this
        # exact key - verify before renaming.
        apps_joiner = SteamAppsInfoCSVJoiner()
        dlc_joiner = SteamDLCInfoCSVJoiner()
        return {
            'SteamAppInfoCSVJoiner': apps_joiner,
            'SteamDLCInfoCSVJoiner': dlc_joiner,
        }
if __name__ == "__main__":
    # Hand control to Luigi's command-line runner, which selects the task
    # to execute from the process arguments.
    # For a programmatic run use instead:
    #     luigi.build([task], local_scheduler=True)
    #
    # run() returns a truthy value when scheduling succeeded. Propagate it as
    # the process exit status so that shells, cron jobs and CI can detect a
    # failed pipeline instead of always seeing exit code 0.
    raise SystemExit(0 if run() else 1)