
Record: SP8192 + 3-Layer Recurrence + Parallel Residuals + QK-Gain 5.25 + Legal TTT + Asynchronous Data Loader - val_bpb 1.0803#1532

Open
nogakeren wants to merge 9 commits into openai:main from nogakeren:final-sysopt-on-pr-1493-multithreaded

Conversation


@nogakeren nogakeren commented Apr 11, 2026

This submission introduces two system optimizations, both to ShuffledSequenceLoader:

  1. Migrated all next_batch logic to numpy to prevent redundant copies. By calling torch.from_numpy only at the final return, we reduced aten::copy_ overhead by 50% on 1xH100 benchmarks.
  2. Implemented a multi-threaded producer-consumer queue for batch loading. Worker threads pre-fetch and pin memory to the GPU device asynchronously. This hides the latency of CPU-to-GPU data movement and eliminates compute-starvation during the next_batch call.
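As a minimal, standalone sketch of the first point (using synthetic tokens, not the PR's shard format): the (input, target) windows are built entirely in numpy, and torch.from_numpy is deferred to the very end, where it shares the array's buffer on CPU rather than copying.

```python
import numpy as np

seq_len = 8
tokens = np.arange(100, dtype=np.int64)  # stand-in for a memory-mapped shard

# Slice one (input, target) window purely in numpy; the real loader does this
# per batch row and calls torch.from_numpy once on the finished batch.
start = 3
window = tokens[start : start + seq_len + 1]
x, y = window[:-1], window[1:]

print(x.tolist())  # [3, 4, 5, 6, 7, 8, 9, 10]
print(y.tolist())  # [4, 5, 6, 7, 8, 9, 10, 11]
```

Since `window[:-1]` and `window[1:]` are views, no token data is copied until the batch arrays are filled.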

Using these optimizations yielded a ~1.5% improvement in throughput (70-80 more steps).

Attaching the relevant code here (uncompressed, not lzma) for anyone who'd like to use it :)

import glob
import os
import queue
import threading
from pathlib import Path

import numpy as np
import torch

def _e(x, y):
    return os.environ.get(x, y)

class ShuffledSequenceLoader:
    def __init__(self, h, device, prefetch_size=100):
        self.device = device
        self.world_size = h.world_size
        self.seq_len = h.train_seq_len
        self.global_tokens = int(_e('TRAIN_BATCH_TOKENS', 786432))
        world_size_env = int(_e('WORLD_SIZE', '1'))
        self.grad_accum_steps = 8 // world_size_env
        self.device_tokens = self.global_tokens // (self.world_size * self.grad_accum_steps)
        self.device_batch_size = self.device_tokens // self.seq_len
        all_files = [Path(p) for p in sorted(glob.glob(h.train_files))]
        if not all_files:
            raise FileNotFoundError(f"No files found for pattern: {h.train_files}")
        self.files = all_files[h.rank::h.world_size]
        self.rng = np.random.Generator(np.random.PCG64(h.rank))
        # _read_num_tokens / _get_shard_memmap are helpers defined elsewhere in train_gpt.py
        self.num_tokens = [_read_num_tokens(f) for f in self.files]
        self.start_inds = [[] for _ in self.files]
        for si in range(len(self.files)):
            self._reset_shard(si)
        # daemon worker thread pre-fetches batches into a bounded queue
        self.batch_queue = queue.Queue(maxsize=prefetch_size)
        self.stop_event = threading.Event()
        self.loader_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self.loader_thread.start()

    def _reset_shard(self, si):
        # random phase offset, then a shuffled sequence order within the shard
        max_phase = min(self.seq_len - 1, max(0, self.num_tokens[si] - self.seq_len - 1))
        phase = int(self.rng.integers(max_phase + 1)) if max_phase > 0 else 0
        num_sequences = (self.num_tokens[si] - 1 - phase) // self.seq_len
        sequence_order = self.rng.permutation(num_sequences)
        self.start_inds[si] = (phase + sequence_order * self.seq_len).tolist()

    def _produce_and_upload_batch(self):
        # assemble the whole batch in numpy; torch.from_numpy is called once at the end
        remaining = np.array([len(s) for s in self.start_inds], dtype=np.float64)
        x_np = np.empty((self.device_batch_size, self.seq_len), dtype=np.int64)
        y_np = np.empty((self.device_batch_size, self.seq_len), dtype=np.int64)
        for bi in range(self.device_batch_size):
            total = remaining.sum()
            if total <= 0:  # all shards exhausted: reshuffle for a new epoch
                for si in range(len(self.files)):
                    self._reset_shard(si)
                remaining = np.array([len(s) for s in self.start_inds], dtype=np.float64)
                total = remaining.sum()
            # sample a shard with probability proportional to its remaining sequences
            probs = remaining / total
            si = int(self.rng.choice(len(self.files), p=probs))
            start_ind = self.start_inds[si].pop()
            remaining[si] -= 1
            mm = _get_shard_memmap(self.files[si])
            window = mm[start_ind : start_ind + self.seq_len + 1]
            x_np[bi] = window[:-1]
            y_np[bi] = window[1:]
        x_gpu = torch.from_numpy(x_np).to(self.device, non_blocking=True)
        y_gpu = torch.from_numpy(y_np).to(self.device, non_blocking=True)
        return x_gpu, y_gpu

    def _worker_loop(self):
        while not self.stop_event.is_set():
            self.batch_queue.put(self._produce_and_upload_batch())

    def next_batch(self):
        return self.batch_queue.get()

    def stop(self):
        self.stop_event.set()
        try:
            self.batch_queue.get_nowait()  # unblock a worker stuck on put()
        except queue.Empty:
            pass
        self.loader_thread.join()
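The producer-consumer shutdown logic can be exercised in isolation with a stdlib-only sketch; PrefetchLoader and its counter workload below are hypothetical stand-ins, not part of the PR:

```python
import queue
import threading

class PrefetchLoader:
    """Stdlib-only sketch of a bounded producer-consumer prefetcher."""
    def __init__(self, produce, prefetch_size=4):
        self._produce = produce
        self._queue = queue.Queue(maxsize=prefetch_size)
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        # Worker keeps the bounded queue full; put() blocks when it is.
        while not self._stop.is_set():
            self._queue.put(self._produce())

    def next_batch(self):
        return self._queue.get()

    def stop(self):
        self._stop.set()
        try:
            self._queue.get_nowait()  # unblock a worker stuck on put()
        except queue.Empty:
            pass
        self._thread.join()

counter = iter(range(1000))
loader = PrefetchLoader(lambda: next(counter))
first = loader.next_batch()
loader.stop()
print(first)  # 0
```

The get_nowait in stop() matters: without it, a worker blocked on put() against a full queue would never observe the stop event and join() would hang.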

@nogakeren nogakeren changed the title Record: SP8192 + 3-Layer Recurrence + Parallel Residuals + QK-Gain 5.25 + Legal TTT + Asynchronous Data Loader Record: SP8192 + 3-Layer Recurrence + Parallel Residuals + QK-Gain 5.25 + Legal TTT + Asynchronous Data Loader - val_bpb 1.0803 Apr 11, 2026
@nogakeren nogakeren force-pushed the final-sysopt-on-pr-1493-multithreaded branch from 5695e8f to 2ac8fa9 on April 11, 2026 01:15

MatoTeziTanka commented Apr 11, 2026

Community Review — Record: SP8192 + 3-Layer Recurrence + Parallel Residuals + QK-Gain 5.25 + Legal TTT + Asynchronous Data Loader - val_bpb 1.0803

Compliance: NEEDS AUTHOR ACTION — train_gpt.py fails to import on CT2038 (Python 3.10 / torch 2.10.0+cpu)

What I found: The CPU smoke test on CT2038 (proteus-engine, 128 GB RAM, Triton 3.6.0, flash_attn stub, cutlass_evt_fusion stub) failed at the import step with:

SyntaxError: f-string: expecting '}' (line 289)

This class of error recurred across several submissions in the 2026-04-11 sweep.

Recommendation: Could you run python3 -c "import py_compile; py_compile.compile('train_gpt.py')" on your records-folder train_gpt.py under Python 3.10 specifically? The eval image is Python 3.10 per Issue #17 / the README, so any parse error on 3.10 blocks the submission at import time before any of the scored-eval logic runs.
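The recommended check can be wrapped in a few lines that surface the parse error explicitly; a sketch below uses a deliberately broken stand-in file rather than the real train_gpt.py:

```python
import os
import py_compile
import tempfile

# Stand-in for train_gpt.py: an f-string with an unbalanced brace, which
# fails to parse (e.g. "f-string: expecting '}'" on Python 3.10).
src = "x = f'{'\n"
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
    f.write(src)
    path = f.name

try:
    py_compile.compile(path, doraise=True)
    result = "ok"
except py_compile.PyCompileError:
    result = "compile failed"
finally:
    os.remove(path)

print(result)  # compile failed
```

With doraise=True, py_compile raises PyCompileError instead of printing to stderr, so the check can gate a CI step or script.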

Once the parse/import issue is fixed, I'll re-run the compliance audit through the normal pipeline. No other flags identified yet because the audit halts at the import step.


Reviewed by @MatoTeziTanka (The Agora). CPU smoke test (CT2038 proteus-engine, 2026-04-11): IMPORT_FAIL — SyntaxError: f-string: expecting '}' (line 289). Classification via classify_prs.py AST-based classifier; full compliance audit deferred until the import issue is resolved. Auto-drafted from a template and spot-checked before posting.
