diff --git a/data_sources/modules/google_analytics.py b/data_sources/modules/google_analytics.py index d75f28e..b64625f 100644 --- a/data_sources/modules/google_analytics.py +++ b/data_sources/modules/google_analytics.py @@ -18,10 +18,13 @@ ) from google.oauth2 import service_account + class GoogleAnalytics: """Google Analytics 4 data fetcher""" - def __init__(self, property_id: Optional[str] = None, credentials_path: Optional[str] = None): + def __init__( + self, property_id: Optional[str] = None, credentials_path: Optional[str] = None + ): """ Initialize GA4 client @@ -29,8 +32,8 @@ def __init__(self, property_id: Optional[str] = None, credentials_path: Optional property_id: GA4 property ID (defaults to env var GA4_PROPERTY_ID) credentials_path: Path to credentials JSON (defaults to env var) """ - self.property_id = property_id or os.getenv('GA4_PROPERTY_ID') - credentials_path = credentials_path or os.getenv('GA4_CREDENTIALS_PATH') + self.property_id = property_id or os.getenv("GA4_PROPERTY_ID") + credentials_path = credentials_path or os.getenv("GA4_CREDENTIALS_PATH") if not self.property_id: raise ValueError("GA4_PROPERTY_ID must be provided or set in environment") @@ -41,16 +44,13 @@ def __init__(self, property_id: Optional[str] = None, credentials_path: Optional # Initialize client with service account credentials = service_account.Credentials.from_service_account_file( credentials_path, - scopes=['https://www.googleapis.com/auth/analytics.readonly'] + scopes=["https://www.googleapis.com/auth/analytics.readonly"], ) self.client = BetaAnalyticsDataClient(credentials=credentials) def get_top_pages( - self, - days: int = 30, - limit: int = 20, - path_filter: Optional[str] = "/blog/" + self, days: int = 30, limit: int = 20, path_filter: Optional[str] = "/blog/" ) -> List[Dict[str, Any]]: """ Get top performing pages by pageviews @@ -65,10 +65,7 @@ def get_top_pages( """ request = RunReportRequest( property=f"properties/{self.property_id}", - 
def get_page_performance(
    self,
    url: str,
    days: int = 30,
) -> Dict[str, Any]:
    """Get performance metrics for a single page path.

    Delegates to ``get_top_pages`` with ``url`` as the path filter (that
    filter is a substring match, so several pages can come back) and then
    picks one row in this order of preference:

    1. the row whose ``path`` equals ``url`` exactly;
    2. the row whose ``path`` differs from ``url`` only by a trailing
       slash (``/blog/post`` vs ``/blog/post/``);
    3. the first row returned, kept for compatibility with existing
       callers that relied on always getting a result.

    Args:
        url: Page path to look up, e.g. ``/blog/my-post``.
        days: Number of days to look back.

    Returns:
        The page dict produced by ``get_top_pages`` for the selected row,
        or an empty dict when the filter matched nothing.
    """
    pages = self.get_top_pages(days=days, limit=100, path_filter=url)
    if not pages:
        return {}

    exact = next((page for page in pages if page.get("path") == url), None)
    if exact is not None:
        return exact

    # Without this tier a trailing-slash mismatch would fall straight
    # through to pages[0], which may be an unrelated, more popular page
    # that merely contains ``url``. Skipped for ""/"/" so the root path
    # keeps its previous behaviour.
    normalized = url.rstrip("/")
    if normalized:
        for page in pages:
            if str(page.get("path", "")).rstrip("/") == normalized:
                return page

    # Compatibility fallback: first filtered result.
    return pages[0]
if older_views > 0: trend_percent = ((recent_views - older_views) / older_views) * 100 else: trend_percent = 0 - trend_direction = "rising" if trend_percent > 10 else "declining" if trend_percent < -10 else "stable" + trend_direction = ( + "rising" + if trend_percent > 10 + else "declining" + if trend_percent < -10 + else "stable" + ) else: trend_percent = 0 trend_direction = "unknown" return { - 'url': url, - 'timeline': timeline, - 'trend_direction': trend_direction, - 'trend_percent': round(trend_percent, 2), - 'total_pageviews': sum(t['pageviews'] for t in timeline), + "url": url, + "timeline": timeline, + "trend_direction": trend_direction, + "trend_percent": round(trend_percent, 2), + "total_pageviews": sum(t["pageviews"] for t in timeline), } def get_conversions( - self, - days: int = 30, - path_filter: Optional[str] = "/blog/" + self, days: int = 30, path_filter: Optional[str] = "/blog/" ) -> List[Dict[str, Any]]: """ Get conversion data by page @@ -212,10 +227,7 @@ def get_conversions( """ request = RunReportRequest( property=f"properties/{self.property_id}", - date_ranges=[DateRange( - start_date=f"{days}daysAgo", - end_date="today" - )], + date_ranges=[DateRange(start_date=f"{days}daysAgo", end_date="today")], dimensions=[ Dimension(name="pagePath"), Dimension(name="pageTitle"), @@ -225,10 +237,7 @@ def get_conversions( Metric(name="conversions"), Metric(name="totalRevenue"), ], - order_bys=[{ - "metric": {"metric_name": "conversions"}, - "desc": True - }] + order_bys=[{"metric": {"metric_name": "conversions"}, "desc": True}], ) if path_filter: @@ -237,8 +246,8 @@ def get_conversions( field_name="pagePath", string_filter=Filter.StringFilter( match_type=Filter.StringFilter.MatchType.CONTAINS, - value=path_filter - ) + value=path_filter, + ), ) ) @@ -249,21 +258,23 @@ def get_conversions( pageviews = int(row.metric_values[0].value) conversions = float(row.metric_values[1].value) - results.append({ - 'path': row.dimension_values[0].value, - 'title': 
row.dimension_values[1].value, - 'pageviews': pageviews, - 'conversions': conversions, - 'conversion_rate': (conversions / pageviews * 100) if pageviews > 0 else 0, - 'revenue': float(row.metric_values[2].value), - }) + results.append( + { + "path": row.dimension_values[0].value, + "title": row.dimension_values[1].value, + "pageviews": pageviews, + "conversions": conversions, + "conversion_rate": (conversions / pageviews * 100) + if pageviews > 0 + else 0, + "revenue": float(row.metric_values[2].value), + } + ) return results def get_traffic_sources( - self, - url: Optional[str] = None, - days: int = 30 + self, url: Optional[str] = None, days: int = 30 ) -> List[Dict[str, Any]]: """ Get traffic source breakdown for a page or entire site @@ -277,10 +288,7 @@ def get_traffic_sources( """ request = RunReportRequest( property=f"properties/{self.property_id}", - date_ranges=[DateRange( - start_date=f"{days}daysAgo", - end_date="today" - )], + date_ranges=[DateRange(start_date=f"{days}daysAgo", end_date="today")], dimensions=[ Dimension(name="sessionDefaultChannelGroup"), ], @@ -289,10 +297,7 @@ def get_traffic_sources( Metric(name="screenPageViews"), Metric(name="engagementRate"), ], - order_bys=[{ - "metric": {"metric_name": "sessions"}, - "desc": True - }] + order_bys=[{"metric": {"metric_name": "sessions"}, "desc": True}], ) if url: @@ -300,9 +305,8 @@ def get_traffic_sources( filter=Filter( field_name="pagePath", string_filter=Filter.StringFilter( - match_type=Filter.StringFilter.MatchType.EXACT, - value=url - ) + match_type=Filter.StringFilter.MatchType.EXACT, value=url + ), ) ) @@ -310,12 +314,14 @@ def get_traffic_sources( results = [] for row in response.rows: - results.append({ - 'source': row.dimension_values[0].value, - 'sessions': int(row.metric_values[0].value), - 'pageviews': int(row.metric_values[1].value), - 'engagement_rate': float(row.metric_values[2].value), - }) + results.append( + { + "source": row.dimension_values[0].value, + "sessions": 
def get_declining_pages(
    self,
    comparison_days: int = 30,
    threshold_percent: float = -20.0,
    path_filter: str = "/blog/",
) -> List[Dict[str, Any]]:
    """Identify pages with declining traffic.

    Compares pageviews in the most recent ``comparison_days`` against the
    ``comparison_days`` immediately before that.

    Args:
        comparison_days: Length, in days, of each of the two periods.
        threshold_percent: A page is reported when its percentage change
            is strictly below this value (e.g. ``-20.0``).
        path_filter: Path filter forwarded to ``get_top_pages``.

    Returns:
        One dict per declining page: the recent-period page dict extended
        with ``previous_pageviews``, ``change_percent`` (rounded to two
        decimals) and ``priority`` (``"high"`` below -40%, otherwise
        ``"medium"``), sorted with the worst decline first.
    """
    # Recent window: the last N days.
    recent_pages = self.get_top_pages(
        days=comparison_days, limit=100, path_filter=path_filter
    )

    # Combined window: the last 2N days. Both ranges end at "today", so
    # this window CONTAINS the recent one.
    combined_pages = self.get_top_pages(
        days=comparison_days * 2, limit=100, path_filter=path_filter
    )
    combined_lookup = {p["path"]: p["pageviews"] for p in combined_pages}

    declining = []
    for page in recent_pages:
        recent_views = page["pageviews"]

        # Subtract the recent window to isolate the previous period;
        # comparing against the combined total would make every page with
        # flat traffic look like a 50% decline.
        previous_views = combined_lookup.get(page["path"], 0) - recent_views

        # No usable baseline (new page, or absent from the combined top
        # results): a percentage change is undefined, so skip it.
        if previous_views <= 0:
            continue

        change_percent = ((recent_views - previous_views) / previous_views) * 100
        if change_percent < threshold_percent:
            declining.append(
                {
                    **page,
                    "previous_pageviews": previous_views,
                    "change_percent": round(change_percent, 2),
                    "priority": "high" if change_percent < -40 else "medium",
                }
            )

    # Sort by worst decline
    declining.sort(key=lambda x: x["change_percent"])

    return declining
"""Compatibility tests for ``GoogleAnalytics.get_page_performance``.

The module under test is loaded straight from its file path with the Google
client libraries replaced by lightweight stand-ins, so the tests run without
those packages installed and without credentials.
"""

import importlib.util
import sys
import types
import unittest
from pathlib import Path


MODULE_PATH = (
    Path(__file__).resolve().parents[1]
    / "data_sources"
    / "modules"
    / "google_analytics.py"
)

# Marks "name was not in sys.modules". ``None`` cannot be used for this
# because ``None`` is itself a legal sys.modules value.
_MISSING = object()


def load_google_analytics_module():
    """Import ``google_analytics.py`` with the Google dependencies stubbed.

    Fake ``google.*`` modules are placed in ``sys.modules`` only while the
    file is executed; the previous entries are restored afterwards so other
    tests are unaffected.

    Returns:
        The freshly executed module object.

    Raises:
        RuntimeError: If no import spec/loader can be built for MODULE_PATH.
    """
    fake_google_analytics = types.ModuleType("google.analytics")
    fake_google_analytics_v1beta = types.ModuleType("google.analytics.data_v1beta")
    fake_google_analytics_types = types.ModuleType("google.analytics.data_v1beta.types")
    fake_google_oauth2 = types.ModuleType("google.oauth2")
    fake_service_account = types.SimpleNamespace(
        Credentials=types.SimpleNamespace(
            from_service_account_file=lambda *args, **kwargs: None
        )
    )

    # The names only need to be importable; the tests never build requests.
    fake_google_analytics_v1beta.BetaAnalyticsDataClient = object
    for name in [
        "DateRange",
        "Dimension",
        "Metric",
        "RunReportRequest",
        "FilterExpression",
        "Filter",
    ]:
        setattr(fake_google_analytics_types, name, object)
    fake_google_oauth2.service_account = fake_service_account

    modules = {
        "google.analytics": fake_google_analytics,
        "google.analytics.data_v1beta": fake_google_analytics_v1beta,
        "google.analytics.data_v1beta.types": fake_google_analytics_types,
        "google.oauth2": fake_google_oauth2,
    }

    previous = {name: sys.modules.get(name, _MISSING) for name in modules}
    sys.modules.update(modules)
    try:
        spec = importlib.util.spec_from_file_location(
            "google_analytics_under_test", MODULE_PATH
        )
        if spec is None or spec.loader is None:
            raise RuntimeError(f"Unable to load {MODULE_PATH}")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module
    finally:
        for name, value in previous.items():
            if value is _MISSING:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = value


class GoogleAnalyticsCompatTests(unittest.TestCase):
    """Behavioural checks for ``get_page_performance``."""

    def _make_client(self, pages):
        """Return a GoogleAnalytics instance whose get_top_pages yields *pages*.

        ``__init__`` is bypassed so no credentials or network are needed.
        """
        module = load_google_analytics_module()
        ga = object.__new__(module.GoogleAnalytics)
        ga.get_top_pages = lambda days=30, limit=20, path_filter="/blog/": pages
        return ga

    def test_get_page_performance_returns_exact_match_and_aliases_engagement(self):
        ga = self._make_client(
            [
                {
                    "path": "/blog/post-a",
                    "pageviews": 100,
                    "avg_session_duration": 12.0,
                    "avg_engagement_time": 12.0,
                    "bounce_rate": 0.25,
                },
                {
                    "path": "/blog/post-b",
                    "pageviews": 50,
                    "avg_session_duration": 6.0,
                    "avg_engagement_time": 6.0,
                    "bounce_rate": 0.10,
                },
            ]
        )

        page = ga.get_page_performance("/blog/post-b", days=45)

        self.assertEqual(page["path"], "/blog/post-b")
        self.assertEqual(page["pageviews"], 50)
        self.assertEqual(page["avg_engagement_time"], 6.0)

    def test_get_page_performance_falls_back_to_first_result(self):
        # No row matches "/blog/post" exactly, so the documented
        # compatibility fallback (first filtered row) must be returned.
        ga = self._make_client(
            [
                {"path": "/blog/post-a", "pageviews": 100},
                {"path": "/blog/post-b", "pageviews": 50},
            ]
        )

        page = ga.get_page_performance("/blog/post")

        self.assertEqual(page["path"], "/blog/post-a")

    def test_get_page_performance_returns_empty_dict_when_no_pages_match(self):
        ga = self._make_client([])

        self.assertEqual(ga.get_page_performance("/blog/missing"), {})


if __name__ == "__main__":
    unittest.main()