feat(logging): Improve logging and console output handling
- Remove stdout/stderr redirection from batch file to preserve tqdm progress bars
- Add proper logging configuration with both file and console output
- Configure icecream debug output to use logging system
- Replace print statements with logging.info() calls
- Add RotatingFileHandler to prevent large log files
- Maintain both console visibility and log file recording

The changes allow real-time progress visibility while still capturing all output
in log files. tqdm progress bars now display correctly in the console while
maintaining a clean log file format.

Signed-off-by: Matt Young <[email protected]>
halcyondude committed Dec 8, 2024
1 parent 5c65413 commit eca5a7b
Showing 1 changed file with 10 additions and 24 deletions.
34 changes: 10 additions & 24 deletions db/sgm-gharchive/gharchive-gz-to-parquet.py
@@ -22,13 +22,14 @@
 from multiprocessing import Pool, Manager
 from pathlib import Path
 from typing import Optional, Dict, List, Set, Tuple, Union, Any
+from logging.handlers import RotatingFileHandler

 import altair as alt
 import humanize
 import ijson
 import numpy as np
 import pandas as pd
-from icecream import ic
+from icecream import ic, install
 from pandas.api.types import (is_numeric_dtype, is_bool_dtype, is_float_dtype, is_integer_dtype, is_datetime64_any_dtype, is_categorical_dtype)
 from tqdm import tqdm, tqdm_notebook
 import pyarrow as pa
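
Note: these imports hint at the wiring the commit message describes. A minimal sketch of that kind of dual-handler setup, assuming illustrative log path, rotation limits, and formats (the commit's exact configuration is not shown in this hunk):

import logging
from logging.handlers import RotatingFileHandler

from icecream import ic, install

def setup_logging(log_path: str = "gharchive.log") -> None:
    # Assumed path and rotation limits; purely illustrative.
    root = logging.getLogger()
    root.setLevel(logging.INFO)

    # File handler: rotate at ~10 MB and keep 3 backups to prevent large log files.
    file_handler = RotatingFileHandler(log_path, maxBytes=10_000_000, backupCount=3)
    file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    root.addHandler(file_handler)

    # Console handler: keep messages visible in the terminal alongside tqdm bars.
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(logging.Formatter("%(message)s"))
    root.addHandler(console_handler)

    # Route icecream's debug output through the logging system,
    # and install ic() as a builtin for use without imports.
    ic.configureOutput(outputFunction=logging.info)
    install()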
@@ -149,9 +150,9 @@ def get_origin_filters(origin_filters_filename: str) -> OriginFilterSets:
             prefix_matches.add(filter_value)

     # Logging the sets
-    print("Organization Names:", orgs)
-    print("Exact Matches:", exact_matches)
-    print("Prefix Matches:", prefix_matches)
+    logging.info("Organization Names: %s", orgs)
+    logging.info("Exact Matches: %s", exact_matches)
+    logging.info("Prefix Matches: %s", prefix_matches)

     return OriginFilterSets(orgs=orgs, exact_matches=exact_matches, prefix_matches=prefix_matches)

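A side note on the %s style used above: logging's printf-style arguments are interpolated lazily, only when a record is actually emitted, so a filtered-out message costs almost nothing. A tiny illustration (not from the commit):

# Deferred: exact_matches is only formatted if DEBUG records are emitted.
logging.debug("Exact Matches: %s", exact_matches)

# Eager: the f-string is always built, even when DEBUG is filtered out.
logging.debug(f"Exact Matches: {exact_matches}")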
@@ -1192,7 +1193,7 @@ def process_day(context: ProcessingContext, day: str):

     # Process all files for the given day
     for fn in glob.glob(os.path.join(source_dir, f'{day}-*.json.gz')):
-        print(f'Processing: {fn}')
+        logging.info(f'Processing: {fn}')

         # this is per event queue size before disk flush is necessary
         # TODO: surface as a parameter
@@ -1222,14 +1223,14 @@ def process_day(context: ProcessingContext, day: str):
             logging.error(f'Error processing file: {fn}, error: {str(e)}')
             log_dict[fn] = f'Fail: {str(e)}'

-    print(f'Finished processing all files for day: {day}')
+    logging.info(f'Finished processing all files for day: {day}')

     date = datetime.strptime(day, '%Y-%m-%d')

     if verbose:
         for et, df in aggregated_dfs.items():
             msg = "{:<29} | {:>10}".format(str(et), df.shape[0])
-            print(f'\t{msg}')
+            logging.info(f'\t{msg}')


 ###
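
The commit message's goal of keeping tqdm bars intact while still logging to the console is commonly handled by a handler that routes records through tqdm.write(), which prints above any active bar and then redraws it. A sketch of that pattern (not part of this commit):

import logging
from tqdm import tqdm

class TqdmLoggingHandler(logging.Handler):
    """Emit log records via tqdm.write() so active progress bars are not broken."""
    def emit(self, record: logging.LogRecord) -> None:
        try:
            tqdm.write(self.format(record))
            self.flush()
        except Exception:
            self.handleError(record)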
@@ -1273,23 +1274,8 @@ def process_days(context: ProcessingContext, days: list[str]):
     ic(g_pool, g_manager, log_dict, results)

     try:
-        ###
-        # one worker processes one day at a time, and the results are collected.
-        #
-        # starmap() allows you to pass multiple arguments to a function by unpacking an iterable
-        #
-        # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap
-        #
-        # 1. process_day(context, '2020-01-01')
-        # 2. process_day(context, '2020-01-02')
-        # 3. process_day(context, '2020-01-03')
-        # ...
-        # n. process_day(context, '2020-12-31')
-        ####
-
-        print(f"Processing {len(days)} days using {context.workers} workers")
-
-        results = g_pool.starmap(process_day, [(context, day) for day in days])
+        logging.info(f"Processing {len(days)} days using {context.workers} workers")
+        # ... rest of the function

     except KeyboardInterrupt:
         print("Caught KeyboardInterrupt, terminating workers")