From 97daad34650aa09e7b0df1a8f7984bbb6dfdaf5d Mon Sep 17 00:00:00 2001
From: Matt Young
Date: Wed, 13 Nov 2024 04:21:35 -0500
Subject: [PATCH] sgm-gharchive - misc tweaks and nits ahead of KubeCon 2024

---
 db/sgm-gharchive/cncf-consolidate-gz.sh      | 15 ++--
 db/sgm-gharchive/gharchive-gz-to-parquet.bat |  5 +-
 db/sgm-gharchive/gharchive-gz-to-parquet.py  | 70 ++++++++++++++-----
 .../json2parquet-consolidated-events.sh      | 10 +--
 4 files changed, 70 insertions(+), 30 deletions(-)

diff --git a/db/sgm-gharchive/cncf-consolidate-gz.sh b/db/sgm-gharchive/cncf-consolidate-gz.sh
index 2795273..140a5d9 100755
--- a/db/sgm-gharchive/cncf-consolidate-gz.sh
+++ b/db/sgm-gharchive/cncf-consolidate-gz.sh
@@ -19,7 +19,14 @@ set -euox pipefail
 # -d, --dry-run    Perform a dry run without actual concatenation
 # -v, --verbose    Enable verbose output
 # -h, --help       Display this help text
-./consolidate-gz.sh --source ~/gharchive-cncf/debug.cncf.all \
-  --target ~/gharchive-cncf/debug.cncf.byrepo \
-  --verbose
-  # --dry-run
\ No newline at end of file
+
+./consolidate-gz.sh --source /p/gha-parquet-daily/2024 \
+  --target /p/gha-parquet-cncf \
+  --verbose | tee consolidate-gz-2024.log
+
+
+
+# ./consolidate-gz.sh --source ~/gharchive-cncf/debug.cncf.all \
+#   --target ~/gharchive-cncf/debug.cncf.byrepo \
+#   --verbose
+#   --dry-run
\ No newline at end of file
diff --git a/db/sgm-gharchive/gharchive-gz-to-parquet.bat b/db/sgm-gharchive/gharchive-gz-to-parquet.bat
index 214395f..6d5d4e0 100644
--- a/db/sgm-gharchive/gharchive-gz-to-parquet.bat
+++ b/db/sgm-gharchive/gharchive-gz-to-parquet.bat
@@ -20,7 +20,7 @@ set "orgfile=%~3"
 REM Set optional arguments with defaults
 if "%~4"=="" (
     for %%I in ("%source%") do set "sourcename=%%~nxI"
-    set "logs=%target%\gharchive-gz-hour2day-%sourcename%.csv"
+    set "logs=%target%\gharchive-gz-to-parquet-%sourcename%.csv"
 ) else (
     set "logs=%~4"
 )
@@ -54,6 +54,7 @@ if not exist "%target%" (
 )
 
 REM Remove the redirection to allow console output while still logging
+echo python gharchive-gz-to-parquet.py --source %source% --target %target% --org-file %orgfile% --log-results %logs% --verbose --workers %workers%
 python gharchive-gz-to-parquet.py --source %source% --target %target% --org-file %orgfile% --log-results %logs% --verbose --workers %workers%
 
 REM Calculate duration
@@ -86,7 +87,7 @@ echo   (default: target\gharchive-gz-hour2day-pylog-YYYYMMDD.log)
 echo   workers    Number of parallel workers (default: 55)
 echo.
 echo Example:
-echo %~nx0 "p:\gha-raw-daily\2024" "p:\gha-parquet-daily\2024" "C:\data\gharchive\orgs.csv"
+echo %~nx0 "p:\gha-raw-daily\2024" "p:\gha-parquet-daily\2024" "org-list-cncf.txt"
 
 REM exit /b 1 returns error code 1 to calling process to indicate help was shown
 REM due to missing required arguments or explicit help request
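
A note for context on the first hunk: consolidate-gz.sh concatenates hourly .gz archives into consolidated files, which works because concatenated gzip members still form a valid gzip stream. A minimal Python sketch of the same idea (the helper name and path layout are illustrative, not the script's actual interface):

    import glob
    import shutil

    def consolidate_day(src_dir: str, day: str, out_path: str) -> None:
        """Concatenate one day's hourly .json.gz archives into a single .gz file."""
        # gzip members concatenate byte-for-byte into a valid multi-member stream,
        # so no decompress/recompress round trip is needed
        with open(out_path, "wb") as out:
            for fn in sorted(glob.glob(f"{src_dir}/{day}-*.json.gz")):
                with open(fn, "rb") as part:
                    shutil.copyfileobj(part, out)
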
diff --git a/db/sgm-gharchive/gharchive-gz-to-parquet.py b/db/sgm-gharchive/gharchive-gz-to-parquet.py
index 0b100c2..291429e 100644
--- a/db/sgm-gharchive/gharchive-gz-to-parquet.py
+++ b/db/sgm-gharchive/gharchive-gz-to-parquet.py
@@ -232,26 +232,38 @@ def print_categoricals(df: pd.DataFrame):
         ic({col}, df[col].cat.categories, df[col].value_counts())
 
 def discover_days(path_to_hourly_archives: str) -> list[str]:
-    '''Enumerate all files, extracting unique YYYY-MM-DD from YY-MM-DD-hh.json.gz (hack: just checks for the last hour of the day!)'''
+    '''Enumerate all files, extracting unique YYYY-MM-DD from YYYY-MM-DD-HH.json.gz'''
     if not os.path.exists(path_to_hourly_archives):
         raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path_to_hourly_archives)
 
-    days = list[str]()
+    days = set()  # Use a set to collect unique days
 
-    # the last archive of the day is YYYY-MM-DD-23.json.gz
-    for fn in glob.glob(f"{path_to_hourly_archives}/*-23.json.gz"):
-        file = os.path.split(fn)
-        day = file[1].removesuffix('-23.json.gz')
-
-        # day := YYYY-MM-DD
-        days.append(day)
-
-    days.sort()
+    # Match files with pattern YYYY-MM-DD-HH.json.gz
+    for fn in glob.glob(f"{path_to_hourly_archives}/*.json.gz"):
+        file = os.path.split(fn)[1]
+        # Extract YYYY-MM-DD from YYYY-MM-DD-HH.json.gz
+        try:
+            day = file.rsplit('-', 1)[0]  # Split on last hyphen and take first part
+            # Validate the date format
+            datetime.strptime(day, '%Y-%m-%d')
+            days.add(day)
+        except (ValueError, IndexError) as e:
+            logging.warning(f"Skipping malformed filename: {file} - {str(e)}")
+            continue
 
-    if not len(days):
-        raise Exception(f'ERROR! No days found at: {path_to_hourly_archives}')
+    if not days:
+        raise Exception(f'ERROR! No days found at: {path_to_hourly_archives}')
 
-    return days
+    # Convert set to sorted list
+    days_list = sorted(list(days))
+
+    # Log summary at INFO level
+    logging.info(f"Discovered {len(days_list)} days:")
+    for day in days_list:
+        logging.info(f"  {day}")
+    logging.info(f"Date range: {days_list[0]} to {days_list[-1]}")
+
+    return days_list
 
 def generate_markdown_documentation(df: pd.DataFrame, file_path: Optional[str] = None) -> str:
     """
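
The rewritten discover_days hinges on one parsing rule: drop the hour segment with a single rsplit, then let strptime reject anything that is not a date. A standalone illustration of that rule (the sample filenames are made up):

    from datetime import datetime

    for name in ["2024-01-01-0.json.gz", "2024-01-01-23.json.gz", "notes.json.gz"]:
        day = name.rsplit('-', 1)[0]  # "2024-01-01-23.json.gz" -> "2024-01-01"
        try:
            datetime.strptime(day, '%Y-%m-%d')  # raises ValueError on non-dates
            print(f"{name}: keep day {day}")
        except ValueError:
            print(f"{name}: skipped as malformed")
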
@@ -1054,7 +1066,7 @@ def create_common_columns(df: pd.DataFrame, column_dtypes: dict[str, str]) -> pd
     # defaults: DataFrame.convert_dtypes(infer_objects=True, convert_string=True, convert_integer=True, convert_boolean=True, convert_floating=True, dtype_backend='numpy_nullable'
     # https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.convert_dtypes.html#pandas.DataFrame.convert_dtypes
-    # df_arrow_backed = df.convert_dtypes(dtype_backend='pyarrow')
+    # df_arrow_backed = df.convert_dtypes(infer_objects=True, convert_string=True, convert_integer=True, convert_boolean=True, convert_floating=True, dtype_backend='numpy_nullable')
 
     # # both int64 and Int64 result in int64[pyarrow] (the nullable type)
     # df_delta = compare_dataframes(df, df_arrow_backed, generate_viz=False)
 
@@ -1264,7 +1276,16 @@ def process_day(context: ProcessingContext, day: str):
 
 #@timing_decorator
 def process_days(context: ProcessingContext, days: list[str]):
-    """process_days Create multiprocessing Pool, Manager, and create one task per day to process."""
+    """process_days Create a multiprocessing Pool and Manager, and create one task per day to process.
+    One worker processes one day at a time, and the results are collected.
+    starmap() allows passing multiple arguments to a function by unpacking each tuple in an iterable:
+    https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap
+    1. process_day(context, '2020-01-01')
+    2. process_day(context, '2020-01-02')
+    3. process_day(context, '2020-01-03')
+    ...
+    n. process_day(context, '2020-12-31')
+    """
     ic()
 
     log_dict = {}  # Use a regular dictionary instead of g_manager.dict()
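
The docstring above describes the fan-out that the next hunk implements with Pool.starmap: each (context, day) tuple becomes one process_day(context, day) call, one day per worker task. A self-contained toy version of the pattern (the dict context and hard-coded days are stand-ins for the real ProcessingContext):

    from multiprocessing import Pool

    def process_day(context: dict, day: str) -> str:
        # stand-in worker: "processes" one day and reports where the output went
        return f"{context['target']}/{day}.parquet"

    if __name__ == "__main__":
        context = {"target": "/tmp/gha-parquet"}
        days = ["2020-01-01", "2020-01-02", "2020-01-03"]
        with Pool(processes=2) as pool:
            # starmap unpacks each tuple into positional arguments
            results = pool.starmap(process_day, [(context, day) for day in days])
        print(results)  # one result per day, in input order
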
@@ -1275,8 +1296,9 @@ def process_days(context: ProcessingContext, days: list[str]):
 
     try:
         logging.info(f"Processing {len(days)} days using {context.workers} workers")
-        # ... rest of the function
 
+        # process all of the days using workers in parallel
+        results = g_pool.starmap(process_day, [(context, day) for day in days])
     except KeyboardInterrupt:
         print("Caught KeyboardInterrupt, terminating workers")
         g_pool.terminate()
@@ -1318,6 +1340,8 @@ def parse_args():
     parser.add_argument("--verbose", action='store_true', help="Enable verbose output")
     parser.add_argument("--workers", type=int, default=multiprocessing.cpu_count(), help="Number of workers in the pool manager")
     parser.add_argument("--log-results", default=f'{os.path.splitext(os.path.basename(__file__))[0]}-worker-results.csv', help="Path to save the log_dict file")
+    parser.add_argument("--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+                        help="Set the logging level (default: INFO)")
 
     args = parser.parse_args()
 
@@ -1328,9 +1352,21 @@ def parse_args():
     # Environment variables override command line arguments
     args.verbose = bool(os.getenv('GHA_VERBOSE', args.verbose))
     args.workers = int(os.getenv('GHA_WORKERS', args.workers))
     args.log_results = os.getenv('GHA_LOG_RESULTS', args.log_results)
+    args.log_level = os.getenv('GHA_LOG_LEVEL', args.log_level)
 
     args.log_results = os.path.join(args.target_dir, args.log_results)
 
+    # Set up logging configuration
+    log_file = os.path.join(args.target_dir, f'{os.path.splitext(os.path.basename(__file__))[0]}-pylog.log')
+    logging.basicConfig(
+        level=getattr(logging, args.log_level.upper()),
+        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+        handlers=[
+            logging.FileHandler(log_file),
+            logging.StreamHandler()
+        ]
+    )
+
     if args.verbose:
         ic(args)
diff --git a/db/sgm-gharchive/json2parquet-consolidated-events.sh b/db/sgm-gharchive/json2parquet-consolidated-events.sh
index 31868c1..6db3359 100755
--- a/db/sgm-gharchive/json2parquet-consolidated-events.sh
+++ b/db/sgm-gharchive/json2parquet-consolidated-events.sh
@@ -16,11 +16,6 @@ start_dstat() {
     dstat -cdngy --output "$1/dstat.log" &
 }
 
-# # Usage: graph_dstat
-# graph_dstat() {
-#     dstat-to-graphite < "$1/dstat.log" > "$1/dstat.png"
-# }
-
 display_banner() {
     echo "===================================="
     echo "      gharchive: JSON to Parquet    "
@@ -195,7 +190,9 @@ process_file() {
 
     echo "[decompress] $gz_file --> $json_file"
 
-    pigz -v -d -k "$gz_file" -c > "$json_file" || { echo "Decompression failed for $gz_file"; return 1; }
+    # pigz -v -d -k "$gz_file" -c > "$json_file" || { echo "Decompression failed for $gz_file"; return 1; }
+
+    gzip -v -d -k "$gz_file" -c > "$json_file" || { echo "Decompression failed for $gz_file"; return 1; }
 
     echo "[json2parquet] $json_file --> $parquet_file, $schema_file"
     echo json2parquet "${json_file}" "${parquet_file}" -c gzip --dictionary --statistics page 2> "${schema_file}"
@@ -219,7 +216,6 @@ export -f process_file
 
 #
 parallelArgs=(--verbose --progress --tag --linebuffer --color \
     --jobs ${#consolidated_files[@]} \
-#    --jobs 5 \
     --joblog "$jobLogfile" \
     --results "$resultsLogfile")
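
A closing note on the logging changes in gharchive-gz-to-parquet.py: the new --log-level flag (overridable via GHA_LOG_LEVEL) feeds a standard dual-handler basicConfig, so every message lands in both the per-run log file and the console. The pattern in isolation (log file name shortened for the example):

    import logging
    import os

    log_level = os.getenv('GHA_LOG_LEVEL', 'INFO')
    logging.basicConfig(
        level=getattr(logging, log_level.upper()),
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('pylog.log'),  # persistent log file
            logging.StreamHandler(),           # mirror to the console
        ],
    )
    logging.info("visible in both pylog.log and the console")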