Changes from all commits
48 commits
c3bb3c8
WIP
0xideas Aug 15, 2025
4faac4e
Change
0xideas Aug 15, 2025
4c5c954
Change
0xideas Aug 15, 2025
43741e7
Fix device target
0xideas Aug 15, 2025
13073bd
Add valid_sampler
0xideas Aug 15, 2025
1835fa9
Distribute validation loss calc
0xideas Aug 15, 2025
b105559
refactor dataset
0xideas Aug 15, 2025
25ecc5c
use concat_list
0xideas Aug 15, 2025
be58827
Fix multi gpu training
0xideas Aug 15, 2025
ce0e1ac
Adapt logging
0xideas Aug 15, 2025
cee0695
Preprocess into separate files
0xideas Aug 18, 2025
5539028
multi file only for pt
0xideas Aug 18, 2025
c406d68
Rename to SequifierDatasetFromFile
0xideas Aug 18, 2025
5b34ac2
WIP
0xideas Aug 18, 2025
65853cf
Filter on selected columns
0xideas Aug 18, 2025
4228c98
Fix lru_cache location
0xideas Aug 18, 2025
e32e5cb
preprocess into batches
0xideas Aug 18, 2025
bdcc253
Preload data in SequifierDatasetFromFolder
0xideas Aug 18, 2025
b2ebd0d
Add torch.compile
0xideas Aug 18, 2025
3a0963f
Share memory
0xideas Aug 18, 2025
0a57217
add compare_preprocessing
0xideas Aug 19, 2025
bc98f7b
check
0xideas Aug 19, 2025
7636bbe
Fix check
0xideas Aug 20, 2025
b78a6bb
Add grouped random samplers
0xideas Aug 20, 2025
469a0d1
Add lazy loading
0xideas Aug 20, 2025
3480c86
Fix dependency & type hint
0xideas Aug 21, 2025
af03674
Revert to old loading and retrieval logic in SequifierDatasetFromFile
0xideas Aug 21, 2025
1d51cbd
Fix dependency
0xideas Aug 21, 2025
61143b1
change
0xideas Aug 21, 2025
b195ad4
WIP
0xideas Aug 21, 2025
0636a20
separate dataloaders
0xideas Aug 21, 2025
c2432c1
worker aware Sequif
0xideas Aug 21, 2025
425fa20
Improve from file training
0xideas Aug 21, 2025
aa08db2
Set non_blocking=True
0xideas Aug 21, 2025
447241b
WIP multi file preprocessing input
0xideas Aug 22, 2025
c5d6379
WIP multi file preprocessing input
0xideas Aug 22, 2025
8ca31cc
Implement multi file processing
0xideas Aug 22, 2025
9bf4e51
Move cleanup call
0xideas Aug 22, 2025
253d03f
Fix column processing and file overwriting
0xideas Aug 22, 2025
1397f4e
Improve parallelism
0xideas Aug 22, 2025
d8a8f4d
Fix spawn and add prints
0xideas Aug 25, 2025
affd379
Remove threading lock
0xideas Aug 27, 2025
c13e20c
Fix numpy_to_pytorch pass
0xideas Aug 27, 2025
beab81a
move n_cores
0xideas Aug 27, 2025
11041db
cosmetic
0xideas Aug 27, 2025
683d199
Improve preprocessing
0xideas Aug 27, 2025
78d2688
WIP
0xideas Aug 28, 2025
d51c2b9
hopefully fix multiprocessing
0xideas Aug 28, 2025
132 changes: 132 additions & 0 deletions dev/snippets/compare_preprocessing.py
@@ -0,0 +1,132 @@
import os

import numpy as np
import polars as pl
import torch


def reconstruct_data(contents, cols):
rows = []

vals = np.array([])
for file in contents.keys():
col_vals = {}
for col in cols:
vals = np.array(contents[file][0][col])
target_vals = np.array(contents[file][1][col])

assert np.all(vals[:, 1:] == target_vals[:, :-1])

# print(f"{file = }, {col = }")
vals = np.concatenate([vals, target_vals[:, -1:]], axis=1)
col_vals[col] = vals

for i in range(vals.shape[0]):
for col in cols:
rows.append([col] + list(np.array(col_vals[col][i, :]).flatten()))

assert vals is not None
schema = ["inputCol"] + [str(i) for i in range(vals.shape[1] - 1, -1, -1)]
# print(f"{len(schema) = }, {vals.shape[1] = }")

data = pl.DataFrame(rows, schema=schema, orient="row")

return data


def apply_ddconfig_to_raw_data(data, ddconfig):
for col, id_map in ddconfig["id_maps"].items():
data = data.with_columns(pl.col(col).replace(id_map))

for col, stats in ddconfig["selected_columns_statistics"].items():
std, mean = stats["std"], stats["mean"]
data = data.with_columns(((pl.col(col) - mean) / (std + 1e-9)).alias(col))

return data


def find_sequence_in_raw_data(data, col, sequence):
n = len(sequence)
found_indices = []
for i in range(data.shape[0] - n):
test = True
for j, s in enumerate(sequence):
if test and data[i + j, col] != s:
test = False

if test:
found_indices.append(i)
print(i)
return found_indices


def equal(a, b):
return a == b


def find_sequence_in_preprocessed_data(data, col, sequence, fn=equal):
n = len(sequence)
col_index = np.where(np.array(list(data.columns)) == "inputCol")[0][0]
found_indices = []
for i, row in enumerate(data.iter_rows(named=False)):
if row[col_index] == col:
test = True
for j, e in enumerate(row[col_index + 1 :]):
if test and j < n:
# print(f"{e = }, {sequence[j] = }, {fn(e, sequence[j]) = }")
if not fn(e, sequence[j]):
test = False
if test:
found_indices.append(i)
return found_indices


def compare_preprocessed_data(data1, data2, col, fn, n_cols=4):
col_index = np.where(np.array(list(data1.columns)) == "inputCol")[0][0]

start_sequence_cols = list(data1.columns)[col_index + 1 : col_index + 1 + n_cols]
# print(start_sequence_cols)

for i in [int(ii) for ii in np.where(data1["inputCol"].to_numpy() == col)[0]]:
target_sequence = list(data1[i, start_sequence_cols].to_numpy().flatten())
# print(f"{target_sequence = }")
found_indices = find_sequence_in_preprocessed_data(
data2, col, target_sequence, fn
)

assert len(found_indices) >= 1, f"{found_indices = }"


def load_pt_data(path, cols):
contents = {}
for root, dirs, files in os.walk(path):
for file in files:
# print(file)
try:
contents[file] = torch.load(
os.path.join(root, file), map_location="cpu"
)
except Exception as e:
print(f"failed: {file}: {e}")

data = reconstruct_data(contents, cols)

return data


def load_and_compare_pt_and_parquet(parquet_path, pt_path, n_cols):
data1 = pl.read_parquet(parquet_path)
ci = [
int(ii)
for ii in np.where(data1["inputCol"].to_numpy() == data1[0, "inputCol"])[0]
]
cols = data1["inputCol"].to_numpy()[: ci[1]]
data2 = load_pt_data(pt_path, cols)

for col in cols:
compare_preprocessed_data(
data1, data2, col, lambda a, b: np.abs(a - b) < 0.001, n_cols=n_cols
)
compare_preprocessed_data(
data2, data1, col, lambda a, b: np.abs(a - b) < 0.001, n_cols=n_cols
)
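The new dev/snippets/compare_preprocessing.py cross-checks the batched .pt output against the combined parquet output by reconstructing each column's sequences from the .pt files and searching for them in the other representation, in both directions. A usage sketch, not part of the diff: the paths are hypothetical, and n_cols sets how many leading sequence positions are matched.

from compare_preprocessing import load_and_compare_pt_and_parquet

# Hypothetical paths: the combined parquet split from one preprocessing run and
# the folder of .pt batch files from the other.
load_and_compare_pt_and_parquet(
    parquet_path="data/example-split0.parquet",
    pt_path="data/example_pt/split0",
    n_cols=4,
)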
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "sequifier"
version = "0.6.2.8"
version = "0.6.5.7"
authors = [
{ name = "Leon Luithlen", email = "[email protected]" },
]
@@ -32,6 +32,7 @@ dependencies = [
"pyarrow>=15.0,<16.0",
"fastparquet>=2024.2.0,<2025.0.0",
"beartype>=0.18.5,<0.19.0",
"psutil>=7.0.0,<8.0.0"
]

[project.urls]
23 changes: 22 additions & 1 deletion src/sequifier/config/preprocess_config.py
@@ -38,6 +38,7 @@ class PreprocessorModel(BaseModel):
data_path: str
read_format: str = "csv"
write_format: str = "parquet"
combine_into_single_file: bool = True
selected_columns: Optional[list[str]]

group_proportions: list[float]
@@ -46,6 +47,8 @@
max_rows: Optional[int]
seed: int
n_cores: Optional[int]
batches_per_file: int = 1024
process_by_file: bool = True

@validator("data_path", always=True)
def validate_data_path(cls, v: str) -> str:
@@ -55,13 +58,25 @@ def validate_data_path(cls, v: str) -> str:

@validator("read_format", "write_format", always=True)
def validate_format(cls, v: str) -> str:
supported_formats = ["csv", "parquet"]
supported_formats = ["csv", "parquet", "pt"]
if v not in supported_formats:
raise ValueError(
f"Currently only {', '.join(supported_formats)} are supported"
)
return v

@validator("combine_into_single_file", always=True)
def validate_format2(cls, v: bool, values: dict):
if v is False and values["write_format"] != "pt":
raise ValueError(
"Only with write_format 'pt' can combine_into_single_file be set to False"
)
if values["write_format"] == "pt" and v is True:
raise ValueError(
"With write_format 'pt', combine_into_single_file must be set to False"
)
return v

@validator("group_proportions")
def validate_proportions_sum(cls, v: list[float]) -> list[float]:
if not np.isclose(np.sum(v), 1.0):
@@ -90,6 +105,12 @@ def validate_step_sizes(cls, v: Optional[list[int]], values: dict) -> list[int]:
raise ValueError(f"All seq_step_sizes must be positive integers: {v}")
return v

@validator("batches_per_file")
def validate_batches_per_file(cls, v: int) -> int:
if v < 1:
raise ValueError("batches_per_file must be a positive integer")
return v

def __init__(self, **kwargs):
default_seq_step_size = [kwargs["seq_length"]] * len(
kwargs["group_proportions"]
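Taken together, the new preprocessor options work like this: 'pt' joins csv and parquet as an accepted format, combine_into_single_file must be False when write_format is 'pt', and may only be False in that case, and batches_per_file has to be a positive integer. A standalone sketch of that coupling, using plain functions rather than the pydantic validators themselves:

# Sketch of the constraint enforced by validate_format2: "pt" output is always
# written as multiple batch files, every other format as a single file.
def check_output_settings(write_format: str, combine_into_single_file: bool) -> None:
    if not combine_into_single_file and write_format != "pt":
        raise ValueError(
            "Only with write_format 'pt' can combine_into_single_file be set to False"
        )
    if write_format == "pt" and combine_into_single_file:
        raise ValueError(
            "With write_format 'pt', combine_into_single_file must be set to False"
        )

check_output_settings("pt", False)      # ok: batched .pt files
check_output_settings("parquet", True)  # ok: one combined parquet file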
21 changes: 18 additions & 3 deletions src/sequifier/config/train_config.py
@@ -127,6 +127,10 @@ def load_train_config(
) as f:
dd_config = json.loads(f.read())

split_paths = dd_config["split_paths"]
if config_values["read_format"] == "pt":
split_paths = [path.replace(".pt", "") for path in split_paths]

config_values["column_types"] = config_values.get(
"column_types", dd_config["column_types"]
)
@@ -154,13 +158,13 @@
"n_classes", dd_config["n_classes"]
)
config_values["training_data_path"] = normalize_path(
config_values.get("training_data_path", dd_config["split_paths"][0]),
config_values.get("training_data_path", split_paths[0]),
config_values["project_path"],
)
config_values["validation_data_path"] = normalize_path(
config_values.get(
"validation_data_path",
dd_config["split_paths"][min(1, len(dd_config["split_paths"]) - 1)],
split_paths[min(1, len(split_paths) - 1)],
),
config_values["project_path"],
)
@@ -205,6 +209,11 @@ class TrainingSpecModel(BaseModel):
)
)
continue_training: bool = True
distributed: bool = False
load_full_data_to_ram: bool = True
world_size: int = 1
num_workers: int = 0
backend: str = "nccl"

def __init__(self, **kwargs):
super().__init__(
@@ -303,14 +312,20 @@ def validate_read_format(cls, v):
assert v in [
"csv",
"parquet",
], "Currently only 'csv' and 'parquet' are supported"
"pt",
], "Currently only 'csv', 'parquet' and 'pt' are supported"
return v

@validator("training_spec")
def validate_training_spec(cls, v, values):
assert set(values["target_columns"]) == set(
v.criterion.keys()
), "target_columns and criterion must contain the same values/keys"

if v.distributed:
assert (
values["read_format"] == "pt"
), "If distributed is set to 'true', the format has to be 'pt'"
return v

@validator("column_types")
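load_train_config now derives the default training and validation data paths from the ddconfig split_paths, stripping the '.pt' suffix when read_format is 'pt', and TrainingSpecModel gains distributed-training fields (distributed, load_full_data_to_ram, world_size, num_workers, backend), with distributed=True only allowed for the 'pt' format. A small sketch of the path handling, using hypothetical paths:

# Hypothetical split paths as recorded by preprocessing in the ddconfig file.
split_paths = ["data/example-split0.pt", "data/example-split1.pt"]
read_format = "pt"

# With "pt" the ".pt" suffix is stripped, so the defaults presumably point at
# folders of batch files rather than single tensor files.
if read_format == "pt":
    split_paths = [path.replace(".pt", "") for path in split_paths]

training_data_path = split_paths[0]
validation_data_path = split_paths[min(1, len(split_paths) - 1)]
print(training_data_path, validation_data_path)
# data/example-split0 data/example-split1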
67 changes: 27 additions & 40 deletions src/sequifier/helpers.py
@@ -90,59 +90,45 @@ def subset_to_selected_columns(
def numpy_to_pytorch(
data: pl.DataFrame,
column_types: dict[str, torch.dtype],
selected_columns: list[str],
target_columns: list[str],
all_columns: list[str], # Changed from selected_columns, target_columns
seq_length: int,
device: str,
to_device: bool,
) -> tuple[dict[str, Tensor], dict[str, Tensor]]:
"""
Converts data from a Polars DataFrame to PyTorch Tensors based on specified columns.

This function processes a DataFrame that is structured in a "long" format,
where one column ('inputCol') identifies the feature or target type, and other
columns ('0', '1', '2', ...) contain the sequence data.
"""
targets = {}
# Define the column names for the target sequences, e.g., ['29', '28', ..., '0']
) -> dict[str, Tensor]: # Now returns a single dictionary
# Define both input and target sequence column names
input_seq_cols = [str(c) for c in range(seq_length, 0, -1)]
target_seq_cols = [str(c) for c in range(seq_length - 1, -1, -1)]

for target_column in target_columns:
# Filter for the target, select sequence columns, and convert to a tensor
target_tensor = torch.tensor(
data.filter(pl.col("inputCol") == target_column)
.select(target_seq_cols)
# We will create a unified dictionary
unified_tensors = {}

for col_name in all_columns:
# Create the input sequence tensor (e.g., from t=1 to t=L)
input_tensor = torch.tensor(
data.filter(pl.col("inputCol") == col_name)
.select(input_seq_cols)
.to_numpy(),
dtype=column_types[target_column],
dtype=column_types[col_name],
)
unified_tensors[col_name] = input_tensor

if to_device:
target_tensor = target_tensor.to(device)
targets[target_column] = target_tensor

sequence = {}
# Define the column names for the input sequences, e.g., ['30', '29', ..., '1']
input_seq_cols = [str(c) for c in range(seq_length, 0, -1)]

for col in selected_columns:
# Filter for the feature, select sequence columns, and convert to a tensor
feature_tensor = torch.tensor(
data.filter(pl.col("inputCol") == col).select(input_seq_cols).to_numpy(),
dtype=column_types[col],
# Create the target sequence tensor (e.g., from t=0 to t=L-1)
# We'll store it with a "_target" suffix to distinguish it
target_tensor = torch.tensor(
data.filter(pl.col("inputCol") == col_name)
.select(target_seq_cols)
.to_numpy(),
dtype=column_types[col_name],
)
unified_tensors[f"{col_name}_target"] = target_tensor

if to_device:
feature_tensor = feature_tensor.to(device)
sequence[col] = feature_tensor

return sequence, targets
return unified_tensors


class LogFile:
"""A class for logging to multiple files with different levels."""

@beartype
def __init__(self, path: str, open_mode: str):
def __init__(self, path: str, open_mode: str, rank: Optional[int] = None):
self.rank = rank
self.levels = [2, 3]
self._files = {
level: open(path.replace("[NUMBER]", str(level)), open_mode)
@@ -158,7 +144,8 @@ def write(self, string: str, level: int = 3) -> None:
self._files[level2].write(f"{string}\n")
self._files[level2].flush()
if level >= 3:
print(string)
if self.rank is None or self.rank == 0:
print(string)

@beartype
def close(self) -> None:
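numpy_to_pytorch now returns a single dictionary instead of a (sequence, targets) tuple: each column contributes its input sequence under the column name and its one-step-shifted target sequence under a '_target' suffix, and LogFile takes an optional rank so that only rank 0 prints. A minimal sketch mirroring the updated four-argument call in infer.py; the column name, values, and seq_length are illustrative:

import polars as pl
import torch

from sequifier.helpers import numpy_to_pytorch

# One sequence for an illustrative column "itemId", in the long format the helper
# expects: an "inputCol" label plus sequence columns "3".."0".
data = pl.DataFrame({"inputCol": ["itemId"], "3": [5], "2": [6], "1": [7], "0": [8]})

tensors = numpy_to_pytorch(data, {"itemId": torch.int64}, ["itemId"], 3)
print(tensors["itemId"])         # input positions "3".."1"  -> tensor([[5, 6, 7]])
print(tensors["itemId_target"])  # shifted positions "2".."0" -> tensor([[6, 7, 8]])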
12 changes: 3 additions & 9 deletions src/sequifier/infer.py
@@ -244,15 +244,9 @@ def get_probs_preds(
column_types: dict[str, torch.dtype],
apply_normalization_inversion: bool = True,
) -> tuple[Optional[dict[str, np.ndarray]], dict[str, np.ndarray]]:
X, _ = numpy_to_pytorch(
data,
column_types,
config.selected_columns,
config.target_columns,
config.seq_length,
config.device,
to_device=False,
)
all_columns = sorted(list(set(config.selected_columns + config.target_columns)))

X = numpy_to_pytorch(data, column_types, all_columns, config.seq_length)
X = {col: X_col.numpy() for col, X_col in X.items()}
del data

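In get_probs_preds, the separate selected and target column arguments are collapsed into one sorted, deduplicated list that feeds the unified numpy_to_pytorch. A short sketch with illustrative column names:

# Illustrative column names; each entry yields both an input tensor and a
# "<column>_target" tensor in the dictionary returned by numpy_to_pytorch.
selected_columns = ["itemId", "price"]
target_columns = ["itemId"]
all_columns = sorted(list(set(selected_columns + target_columns)))
print(all_columns)  # ['itemId', 'price']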