From 4cfb37b89791a8ee266464df80bbad86e03cacff Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?=
Date: Fri, 12 May 2023 22:58:22 +0200
Subject: [PATCH] fix: minor improvements

---
 notebooks/Dashboard.ipynb              | 23 +++++++++++++++--------
 src/quantifiedme/derived/screentime.py | 23 ++++++++++++++++-------
 2 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/notebooks/Dashboard.ipynb b/notebooks/Dashboard.ipynb
index 6afcc21..69d07a6 100644
--- a/notebooks/Dashboard.ipynb
+++ b/notebooks/Dashboard.ipynb
@@ -208,8 +208,20 @@
    },
    "outputs": [],
    "source": [
-    "# Using my syncing testing instance to get multi-device data\n",
-    "events = load_screentime(since, datasources=None, hostnames=None, personal=personal, cache=cache)\n",
+    "import pickle\n",
+    "\n",
+    "cachefile = Path(\"events.pickle\")\n",
+    "if cache and (load_pickled := False) and cachefile.exists() and cachefile.stat().st_mtime > datetime.now().timestamp() - 60*60*24:\n",
+    "    print(\"Loading cached events\")\n",
+    "    with cachefile.open('rb') as f:\n",
+    "        events = pickle.load(f)\n",
+    "else:\n",
+    "    # Uses my syncing testing instance to get multi-device data\n",
+    "    events = load_screentime(since, datasources=None, hostnames=None, personal=personal, cache=cache)\n",
+    "    # Save the events to a file so we can load them faster next time\n",
+    "    with cachefile.open('wb') as f:\n",
+    "        pickle.dump(events, f)\n",
+    "\n",
     "print(f\"First: {events[0].timestamp}\")\n",
     "print(f\"Last: {events[-1].timestamp}\")"
    ]
@@ -219,12 +231,7 @@
    "execution_count": null,
    "metadata": {},
    "outputs": [],
-   "source": [
-    "# Save the events to a file so we can load them faster next time\n",
-    "import pickle\n",
-    "with open('events.pickle', 'wb') as f:\n",
-    "    pickle.dump(events, f)"
-   ]
+   "source": []
   },
   {
    "cell_type": "markdown",
diff --git a/src/quantifiedme/derived/screentime.py b/src/quantifiedme/derived/screentime.py
index 8d3dc15..3c67048 100644
--- a/src/quantifiedme/derived/screentime.py
+++ b/src/quantifiedme/derived/screentime.py
@@ -1,6 +1,7 @@
 import logging
 from datetime import datetime, timezone, timedelta
 from pathlib import Path
+from typing import Literal
 
 import click
 import pandas as pd
@@ -32,10 +33,12 @@ def _get_aw_client(testing: bool) -> ActivityWatchClient:
     port = sec_aw.get("port", 5600 if not testing else 5666)
     return ActivityWatchClient(port=port, testing=testing)
 
+DatasourceType = Literal["activitywatch", "smartertime_buckets", "fake", "toggl"]
+
 
 def load_screentime(
     since: datetime | None,
-    datasources: list[str] | None,
+    datasources: list[DatasourceType] | None,
     hostnames: list[str] | None,
     personal: bool,
     cache: bool = True,
@@ -63,7 +66,7 @@
 
     # Check for invalid sources
     for source in datasources:
-        assert source in ["activitywatch", "smartertime", "fake", "toggl"]
+        assert source in ["activitywatch", "smartertime_buckets", "fake", "toggl"], f"Invalid source: {source}"
 
     # Load hostnames from config if not specified
     hostnames = hostnames or config["data"]["activitywatch"]["hostnames"]
@@ -89,7 +92,7 @@
             e.data["$source"] = "activitywatch"
         events = _join_events(events, events_aw, f"activitywatch {hostname}")
 
-    if "smartertime" in datasources:
+    if "smartertime_buckets" in datasources:
         events_smartertime = load_events_smartertime(since)
         events = _join_events(events, events_smartertime, "smartertime")
 
@@ -167,13 +170,15 @@ def load_category_df(events: list[Event]) -> pd.DataFrame:
             raise
     df = pd.DataFrame(tss)
     df = df.replace(np.nan, 0)
+    df["All_cols"] = df.sum(axis=1)
+    df["All_events"] = sum([e.duration / 60 / 60 for e in events], timedelta(0))
     return df
 
 
 @click.command()
-def screentime():
+@click.option("--csv", is_flag=True, help="Print as CSV")
+def screentime(csv: bool):
     """Load all screentime and print total duration"""
-    # TODO: Get awc parameters from config
     hostnames = load_config()["data"]["activitywatch"]["hostnames"]
     events = load_screentime(
         since=datetime.now(tz=timezone.utc) - timedelta(days=90),
@@ -181,9 +186,13 @@
         hostnames=hostnames,
         personal=True,
     )
-    print(f"Total duration: {sum((e.duration for e in events), timedelta(0))}")
+    logger.info(f"Total duration: {sum((e.duration for e in events), timedelta(0))}")
 
-    print(load_category_df(events))
+    df = load_category_df(events)
+    if csv:
+        print(df.to_csv())
+    else:
+        print(df)
 
 
 if __name__ == "__main__":