Incremental stats sitewide #3114

Merged: 27 commits, Jan 22, 2025
Changes from 1 commit

Commits (27)
169a3f6  interim checkin (amCap1712, Dec 30, 2024)
006b367  fix table use (amCap1712, Dec 30, 2024)
4805bd2  fix combined table (amCap1712, Dec 30, 2024)
11c8042  fix partial df use (amCap1712, Dec 30, 2024)
ca2e1fb  add per user limit for sitewide stats (amCap1712, Dec 30, 2024)
bb62d95  testing more scenarios (amCap1712, Dec 31, 2024)
d64f7ea  refactor incremental sitewide stats (amCap1712, Dec 31, 2024)
570e3ce  fix import (amCap1712, Dec 31, 2024)
6fae52c  add all time incremental stats for other entities (amCap1712, Dec 31, 2024)
4e037c2  Delete partial sitewide aggregates on import of full dump (amCap1712, Jan 2, 2025)
85ea8a4  Add bookkeeping for using aggregates of any stats_range (amCap1712, Jan 2, 2025)
0d64068  fix imports (amCap1712, Jan 2, 2025)
1fe0397  fix metadata path (amCap1712, Jan 2, 2025)
da7fce5  add logging to debug (amCap1712, Jan 2, 2025)
28551bd  fix existing agg usable check (amCap1712, Jan 2, 2025)
6b97954  add schema to json read (amCap1712, Jan 2, 2025)
b9fd965  fix skip_trash arg in dump upload (amCap1712, Jan 6, 2025)
dd9ec00  Refactor SitewideEntity for sharing with other stats (amCap1712, Jan 7, 2025)
1b9df3a  Fix constructors (amCap1712, Jan 7, 2025)
f1af83c  Fix call to generate_stats (amCap1712, Jan 7, 2025)
a9a62ce  Fix call to generate_stats - 2 (amCap1712, Jan 7, 2025)
aeb4385  Fix aggregates cleanup and remove outdated tests (amCap1712, Jan 8, 2025)
53a72f9  add missing path (amCap1712, Jan 8, 2025)
e60295f  make sitewide listening activity incremental (amCap1712, Jan 8, 2025)
7dca0d0  simply generating stats if incremental dump doesn't exist (amCap1712, Jan 8, 2025)
836236d  Refactor create messages and stats validation into class (amCap1712, Jan 9, 2025)
9295ddc  add comment on bookkeeping schema (amCap1712, Jan 22, 2025)

add logging to debug
amCap1712 committed Jan 7, 2025
commit da7fce5c188d21bc4084b4f3803950a1a25a7811
10 changes: 7 additions & 3 deletions listenbrainz_spark/stats/incremental/sitewide/entity.py
@@ -1,4 +1,5 @@
 import abc
+import logging
 from datetime import datetime
 from pathlib import Path
 from typing import List
@@ -16,6 +17,7 @@
 from listenbrainz_spark.utils import read_files_from_HDFS, get_listens_from_dump


+logger = logging.getLogger(__name__)
 BOOKKEEPING_SCHEMA = StructType([
     StructField('from_date', TimestampType(), nullable=False),
     StructField('to_date', TimestampType(), nullable=False),
@@ -71,21 +73,23 @@ def generate_stats(self, stats_range: str, from_date: datetime,
             read_files_from_HDFS(df_path).createOrReplaceTempView(df_name)

         metadata_path = self.get_bookkeeping_path(stats_range)
-        existing_aggregate_usable = False
         try:
             metadata = listenbrainz_spark.session.read.json(f"{HDFS_CLUSTER_URI}{metadata_path}").collect()[0]
             existing_from_date, existing_to_date = metadata["from_date"], metadata["to_date"]
             existing_aggregate_usable = existing_from_date == from_date
+            logger.info("Existing from date: %s, new from date: %s", existing_from_date, from_date)
         except AnalysisException:
-            pass
+            existing_aggregate_usable = False
+            logger.info("Existing partial aggregate not found!")

         prefix = f"sitewide_{self.entity}_{stats_range}"
         existing_aggregate_path = self.get_existing_aggregate_path(stats_range)

         if not hdfs_connection.client.status(existing_aggregate_path, strict=False) or not existing_aggregate_usable:
             table = f"{prefix}_full_listens"
-            get_listens_from_dump(from_date, to_date).createOrReplaceTempView(table)
+            get_listens_from_dump(from_date, to_date, include_incremental=False).createOrReplaceTempView(table)

+            logger.info("Creating partial aggregate from full dump listens")
             hdfs_connection.client.makedirs(Path(existing_aggregate_path).parent)
             full_df = self.aggregate(table, cache_tables, user_listen_count_limit)
             full_df.write.mode("overwrite").parquet(existing_aggregate_path)
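
The metadata read in this hunk is the bookkeeping record that decides whether an existing partial aggregate can be reused. Below is a minimal, self-contained sketch (not the PR's code) of that round trip using a local Spark session: write a from_date/to_date record as JSON, then read it back with an explicit schema and compare the start date. The path and session setup are illustrative assumptions; the PR stores this metadata on HDFS at the path returned by get_bookkeeping_path(stats_range).

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, TimestampType

spark = SparkSession.builder.master("local[1]").appName("bookkeeping-sketch").getOrCreate()

BOOKKEEPING_SCHEMA = StructType([
    StructField("from_date", TimestampType(), nullable=False),
    StructField("to_date", TimestampType(), nullable=False),
])

# Hypothetical local path for this sketch; the PR uses an HDFS path instead.
metadata_path = "/tmp/bookkeeping_sketch"

# Record the listen range covered by the partial aggregate.
spark.createDataFrame(
    [(datetime(2024, 1, 1), datetime(2024, 12, 31))],
    schema=BOOKKEEPING_SCHEMA,
).write.mode("overwrite").json(metadata_path)

# A later run reads the record back (with an explicit schema, mirroring the
# "add schema to json read" commit) and reuses the aggregate only when the
# start of the requested range matches what was aggregated before.
row = spark.read.schema(BOOKKEEPING_SCHEMA).json(metadata_path).collect()[0]
existing_aggregate_usable = row["from_date"] == datetime(2024, 1, 1)
print(existing_aggregate_usable)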