-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbuild-epss-matrix.py
More file actions
75 lines (62 loc) · 2.05 KB
/
Copy pathbuild-epss-matrix.py
File metadata and controls
75 lines (62 loc) · 2.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import os
import glob
import pandas as pd
import duckdb
import re
# ---------------------------------------------------------------------------
# Paths used by the script
# ---------------------------------------------------------------------------

# Input: folder containing the compressed CSV files (*.csv.gz).
csv_folder = './data/epss-csv/'

# Intermediate: folder that receives one Parquet file per input CSV.
parquet_folder = './data/parquet/'

# Final outputs: the wide matrix, written once as Parquet and once as CSV.
output_parquet = './data/epss_matrix.parquet'
output_csv = './data/epss_matrix.csv'

# Make sure the intermediate folder exists before anything is written to it;
# an already-existing folder is not an error.
os.makedirs(parquet_folder, exist_ok=True)
# Convert every compressed CSV in csv_folder into its own Parquet file in
# parquet_folder, named after the date found in the CSV's file name.
print("Converting CSVs to Parquet...")

# Compiled once outside the loop; matches the first YYYY-MM-DD-shaped run of
# digits in a file name.
date_pattern = re.compile(r'(\d{4}-\d{2}-\d{2})')

for csv_path in sorted(glob.glob(os.path.join(csv_folder, '*.csv.gz'))):
    filename = os.path.basename(csv_path)

    # The date embedded in the file name labels every row of this file and
    # names the Parquet output; files without one are skipped.
    match = date_pattern.search(filename)
    if not match:
        print(f"Skipping {filename}, no valid date found.")
        continue
    date = match.group(1)

    # comment='#' makes pandas ignore everything from a '#' to the end of the
    # line. Only 'cve' and 'epss' are parsed: any other column (such as
    # 'percentile') is never used below, so it is not loaded at all.
    df = pd.read_csv(
        csv_path,
        compression='infer',
        comment='#',
        usecols=['cve', 'epss'],
        dtype={'cve': str, 'epss': float},
    )
    df['date'] = date

    # Pin the column order of the Parquet output.
    df = df[['cve', 'epss', 'date']]

    parquet_path = os.path.join(parquet_folder, f"{date}.parquet")
    df.to_parquet(parquet_path, index=False)
# Pivot all per-date Parquet files into one wide table with one column per
# date, then save that table as both Parquet and CSV.
print("Building wide matrix using DuckDB...")

# One glob pattern is shared by the file listing and the DuckDB reader so
# both see exactly the same set of files.
parquet_glob = os.path.join(parquet_folder, '*.parquet')

# Each Parquet file's name without its extension becomes one pivot column.
all_dates = sorted(
    os.path.splitext(os.path.basename(p))[0]
    for p in glob.glob(parquet_glob)
)

# With no Parquet files there is nothing to read and the pivot's IN (...) list
# would be empty, so stop with a clear message instead of a DuckDB error.
if not all_dates:
    raise SystemExit(
        f"No Parquet files found in {parquet_folder}; nothing to pivot."
    )

pivot_columns = ',\n            '.join(f"'{d}'" for d in all_dates)

# SQL query for DuckDB. PIVOT groups rows by the columns that are neither
# pivoted nor aggregated (cve, for files written by the conversion step);
# MAX(epss) fills each date column for that group.
query = f"""
WITH all_data AS (
    SELECT * FROM read_parquet('{parquet_glob}')
),
pivoted AS (
    SELECT *
    FROM all_data
    PIVOT (
        MAX(epss) FOR date IN (
            {pivot_columns}
        )
    )
)
SELECT * FROM pivoted
"""

con = duckdb.connect(database=':memory:')
try:
    result_df = con.execute(query).fetchdf()
finally:
    # Release the in-memory database even if the query fails.
    con.close()

print(f"Saving Parquet matrix to {output_parquet}")
result_df.to_parquet(output_parquet, index=False)

print(f"Saving CSV to {output_csv}")
result_df.to_csv(output_csv, index=False)

print("Done!")