Skip to content

Commit

Permalink
sgm-gharchive - misc tweaks and nits ahead of KubeCon 2024
Browse files Browse the repository at this point in the history
  • Loading branch information
halcyondude committed Dec 8, 2024
1 parent 81f55e4 commit 97daad3
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 30 deletions.
15 changes: 11 additions & 4 deletions db/sgm-gharchive/cncf-consolidate-gz.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@ set -euox pipefail
# -d, --dry-run Perform a dry run without actual concatenation
# -v, --verbose Enable verbose output
# -h, --help Display this help text
./consolidate-gz.sh --source ~/gharchive-cncf/debug.cncf.all \
--target ~/gharchive-cncf/debug.cncf.byrepo \
--verbose
# --dry-run

./consolidate-gz.sh --source /p/gha-parquet-daily/2024 \
--target /p/gha-parquet-cncf \
--verbose | tee consolidate-gz-2024.log



# ./consolidate-gz.sh --source ~/gharchive-cncf/debug.cncf.all \
# --target ~/gharchive-cncf/debug.cncf.byrepo \
# --verbose
# --dry-run
5 changes: 3 additions & 2 deletions db/sgm-gharchive/gharchive-gz-to-parquet.bat
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ set "orgfile=%~3"
REM Set optional arguments with defaults
if "%~4"=="" (
for %%I in ("%source%") do set "sourcename=%%~nxI"
set "logs=%target%\gharchive-gz-hour2day-%sourcename%.csv"
set "logs=%target%\gharchive-gz-to-parquet-%sourcename%.csv"
) else (
set "logs=%~4"
)
Expand Down Expand Up @@ -54,6 +54,7 @@ if not exist "%target%" (
)

REM Remove the redirection to allow console output while still logging
echo python gharchive-gz-to-parquet.py --source %source% --target %target% --org-file %orgfile% --log-results %logs% --verbose --workers %workers%
python gharchive-gz-to-parquet.py --source %source% --target %target% --org-file %orgfile% --log-results %logs% --verbose --workers %workers%

REM Calculate duration
Expand Down Expand Up @@ -86,7 +87,7 @@ echo (default: target\gharchive-gz-hour2day-pylog-YYYYMMDD.log)
echo workers Number of parallel workers (default: 55)
echo.
echo Example:
echo %~nx0 "p:\gha-raw-daily\2024" "p:\gha-parquet-daily\2024" "C:\data\gharchive\orgs.csv"
echo %~nx0 "p:\gha-raw-daily\2024" "p:\gha-parquet-daily\2024" "org-list-cncf.txt"

REM exit /b 1 returns error code 1 to calling process to indicate help was shown
REM due to missing required arguments or explicit help request
Expand Down
70 changes: 53 additions & 17 deletions db/sgm-gharchive/gharchive-gz-to-parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,26 +232,38 @@ def print_categoricals(df: pd.DataFrame):
ic({col}, df[col].cat.categories, df[col].value_counts())

def discover_days(path_to_hourly_archives: str) -> list[str]:
'''Enumerate all files, extracting unique YYYY-MM-DD from YY-MM-DD-hh.json.gz (hack: just checks for the last hour of the day!)'''
'''Enumerate all files, extracting unique YYYY-MM-DD from YYYY-MM-DD-HH.json.gz'''
if not os.path.exists(path_to_hourly_archives):
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path_to_hourly_archives)

days = list[str]()
days = set() # Use a set to collect unique days

# the last archive of the day is YYYY-MM-DD-23.json.gz
for fn in glob.glob(f"{path_to_hourly_archives}/*-23.json.gz"):
file = os.path.split(fn)
day = file[1].removesuffix('-23.json.gz')

# day := YYYY-MM-DD
days.append(day)

days.sort()
# Match files with pattern YYYY-MM-DD-HH.json.gz
for fn in glob.glob(f"{path_to_hourly_archives}/*.json.gz"):
file = os.path.split(fn)[1]
# Extract YYYY-MM-DD from YYYY-MM-DD-HH.json.gz
try:
day = file.rsplit('-', 1)[0] # Split on last hyphen and take first part
# Validate the date format
datetime.strptime(day, '%Y-%m-%d')
days.add(day)
except (ValueError, IndexError) as e:
logging.warning(f"Skipping malformed filename: {file} - {str(e)}")
continue

if not len(days):
raise Exception(f'ERROR! No days found at: {path_to_hourly_archives}')
if not days:
raise Exception(f'ERROR! No days found at: {path_to_hourly_archives}')

return days
# Convert set to sorted list
days_list = sorted(list(days))

# Log summary at INFO level
logging.info(f"Discovered {len(days_list)} days:")
for day in days_list:
logging.info(f" {day}")
logging.info(f"Date range: {days_list[0]} to {days_list[-1]}")

return days_list

def generate_markdown_documentation(df: pd.DataFrame, file_path: Optional[str] = None) -> str:
"""
Expand Down Expand Up @@ -1054,7 +1066,7 @@ def create_common_columns(df: pd.DataFrame, column_dtypes: dict[str, str]) -> pd

# defaults: DataFrame.convert_dtypes(infer_objects=True, convert_string=True, convert_integer=True, convert_boolean=True, convert_floating=True, dtype_backend='numpy_nullable'
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.convert_dtypes.html#pandas.DataFrame.convert_dtypes
# df_arrow_backed = df.convert_dtypes(dtype_backend='pyarrow')
# df_arrow_backed = df.convert_dtypes(infer_objects=True, convert_string=True, convert_integer=True, convert_boolean=True, convert_floating=True, dtype_backend='numpy_nullable')
#
# both int64 and Int64 result in int64[pyarrow] (the nullable type)
# df_delta = compare_dataframes(df, df_arrow_backed, generate_viz=False)
Expand Down Expand Up @@ -1264,7 +1276,16 @@ def process_day(context: ProcessingContext, day: str):

#@timing_decorator
def process_days(context: ProcessingContext, days: list[str]):
"""process_days Create multiprocessing Pool, Manager, and create one task per day to process."""
"""process_days Create multiprocessing Pool, Manager, and create one task per day to process.
one worker processes one day at a time, and the results are collected.
starmap() allows you to pass multiple arguments to a function by unpacking an iterable
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap
1. process_day(context, '2020-01-01')
2. process_day(context, '2020-01-02')
3. process_day(context, '2020-01-03')
...
n. process_day(context, '2020-12-31')
"""

ic()
log_dict = {} # Use a regular dictionary instead of g_manager.dict()
Expand All @@ -1275,8 +1296,9 @@ def process_days(context: ProcessingContext, days: list[str]):

try:
logging.info(f"Processing {len(days)} days using {context.workers} workers")
# ... rest of the function

# process all of the days using workers in parallel
results = g_pool.starmap(process_day, [(context, day) for day in days])
except KeyboardInterrupt:
print("Caught KeyboardInterrupt, terminating workers")
g_pool.terminate()
Expand Down Expand Up @@ -1318,6 +1340,8 @@ def parse_args():
parser.add_argument("--verbose", action='store_true', help="Enable verbose output")
parser.add_argument("--workers", type=int, default=multiprocessing.cpu_count(), help="Number of workers in the pool manager")
parser.add_argument("--log-results", default=f'{os.path.splitext(os.path.basename(__file__))[0]}-worker-results.csv', help="Path to save the log_dict file")
parser.add_argument("--log-level", default="INFO", choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
help="Set the logging level (default: INFO)")
args = parser.parse_args()

# Environment variables override command line arguments
Expand All @@ -1328,9 +1352,21 @@ def parse_args():
args.verbose = bool(os.getenv('GHA_VERBOSE', args.verbose))
args.workers = int(os.getenv('GHA_WORKERS', args.workers))
args.log_results = os.getenv('GHA_LOG_RESULTS', args.log_results)
args.log_level = os.getenv('GHA_LOG_LEVEL', args.log_level)

args.log_results = os.path.join(args.target_dir, args.log_results)

# Set up logging configuration
log_file = os.path.join(args.target_dir, f'{os.path.splitext(os.path.basename(__file__))[0]}-pylog.log')
logging.basicConfig(
level=getattr(logging, args.log_level.upper()),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)

if args.verbose:
ic(args)

Expand Down
10 changes: 3 additions & 7 deletions db/sgm-gharchive/json2parquet-consolidated-events.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,6 @@ start_dstat() {
dstat -cdngy --output "$1/dstat.log" &
}

# # Usage: graph_dstat <log_dir>
# graph_dstat() {
# dstat-to-graphite < "$1/dstat.log" > "$1/dstat.png"
# }

display_banner() {
echo "===================================="
echo " gharchive: JSON to Parquet "
Expand Down Expand Up @@ -195,7 +190,9 @@ process_file() {


echo "[decompress] $gzfile --> $json_file"
pigz -v -d -k "$gz_file" -c > "$json_file" || { echo "Decompression failed for $gz_file"; return 1; }
# pigz -v -d -k "$gz_file" -c > "$json_file" || { echo "Decompression failed for $gz_file"; return 1; }

gzip -v -d -k "$gz_file" -c > "$json_file" || { echo "Decompression failed for $gz_file"; return 1; }

echo "[json2parquet] $json_file --> $parquet_file, $schema_file"
echo json2parquet "${json_file}" "${parquet_file}" -c gzip --dictionary --statistics page 2> "${schema_file}"
Expand All @@ -219,7 +216,6 @@ export -f process_file
#
parallelArgs=(--verbose --progress --tag --linebuffer --color \
--jobs ${#consolidated_files[@]} \
# --jobs 5 \
--joblog "$jobLogfile" \
--results "$resultsLogfile")

Expand Down

0 comments on commit 97daad3

Please sign in to comment.