From 965873c591368f84d43542dd9584f9c383a96370 Mon Sep 17 00:00:00 2001
From: Mark Rousskov <thismark@amazon.com>
Date: Wed, 22 Jan 2025 21:38:29 +0000
Subject: [PATCH 1/5] Swap receiver map to new data structure

A key ID flows through the following structures:

* max seen key ID
* 32-bit bitset
* variable-sized set (bitset or sorted list, tracking back up to 65k key IDs)
---
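Roughly, checking whether a key ID was already seen walks these tiers in
order. The sketch below is illustrative only (simplified names; the real
logic lives in post_authentication in receiver.rs):

    // Returns Some(true) if the ID was definitely seen, Some(false) if it
    // definitely was not, and None if it is too old for us to tell.
    fn check(max_seen: u64, bitset: u32, unseen_list: &[u64], id: u64) -> Option<bool> {
        if max_seen == u64::MAX || id > max_seen {
            // Nothing seen yet, or the ID is newer than anything seen so far.
            Some(false)
        } else if id == max_seen {
            Some(true)
        } else if max_seen - id <= 32 {
            // Bit 0 of the bitset corresponds to max_seen - 1.
            Some(bitset & (1u32 << (max_seen - id - 1)) != 0)
        } else if unseen_list.binary_search(&id).is_ok() {
            // The variable-sized set tracks *unseen* IDs within its window.
            Some(false)
        } else {
            None
        }
    }
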
 dc/s2n-quic-dc/Cargo.toml                     |   1 +
 dc/s2n-quic-dc/src/path/secret/receiver.rs    | 469 ++++++------------
 .../src/path/secret/receiver/tests.rs         | 169 ++-----
 3 files changed, 209 insertions(+), 430 deletions(-)

diff --git a/dc/s2n-quic-dc/Cargo.toml b/dc/s2n-quic-dc/Cargo.toml
index 8b4bad8e83..209f9e1171 100644
--- a/dc/s2n-quic-dc/Cargo.toml
+++ b/dc/s2n-quic-dc/Cargo.toml
@@ -44,6 +44,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = tr
 zerocopy = { version = "0.7", features = ["derive"] }
 zeroize = "1"
 parking_lot = "0.12"
+slab = "0.4"
 
 [dev-dependencies]
 bolero = "0.12"
diff --git a/dc/s2n-quic-dc/src/path/secret/receiver.rs b/dc/s2n-quic-dc/src/path/secret/receiver.rs
index 41e413dac2..4320ec4549 100644
--- a/dc/s2n-quic-dc/src/path/secret/receiver.rs
+++ b/dc/s2n-quic-dc/src/path/secret/receiver.rs
@@ -1,293 +1,202 @@
 // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::credentials::{Credentials, Id, KeyId};
-use s2n_quic_core::packet::number::{
-    PacketNumber, PacketNumberSpace, SlidingWindow, SlidingWindowError,
-};
-use std::{
-    cell::UnsafeCell,
-    ptr::NonNull,
-    sync::{
-        atomic::{AtomicU64, Ordering},
-        Arc, Mutex,
-    },
-};
+use std::sync::{Arc, Mutex};
 
-const SHARED_ENTRIES: usize = 1 << 20;
-// Maximum page size on current machines (macOS aarch64 has 16kb pages)
-//
-// mmap is documented as failing if we don't request a page boundary. Currently our sizes work out
-// such that rounding is useless, but this is good future proofing.
-const MAX_PAGE: usize = 16_384;
-const SHARED_ALLOCATION: usize = {
-    let element = std::mem::size_of::<SharedSlot>();
-    let size = element * SHARED_ENTRIES;
-    // TODO use `next_multiple_of` once MSRV is >=1.73
-    (size + MAX_PAGE - 1) / MAX_PAGE * MAX_PAGE
-};
+use crate::credentials::{Credentials, KeyId};
 
 #[derive(Debug)]
 pub struct Shared {
-    secret: u64,
-    backing: NonNull<SharedSlot>,
+    // FIXME: Improve scalability by avoiding the global mutex.
+    // Most likely strategy is something like fixed-size which (in principle) allows per-entry
+    // Mutex's. Likely means dropping the slab dependency.
+    entries: Mutex<slab::Slab<InnerState>>,
 }
 
-unsafe impl Send for Shared {}
-unsafe impl Sync for Shared {}
+impl Shared {
+    pub fn new() -> Arc<Shared> {
+        Arc::new(Shared {
+            entries: Mutex::new(slab::Slab::new()),
+        })
+    }
 
-impl Drop for Shared {
-    fn drop(&mut self) {
-        unsafe {
-            if libc::munmap(self.backing.as_ptr().cast(), SHARED_ALLOCATION) != 0 {
-                // Avoid panicking in a destructor, just let the memory leak while logging. We
-                // expect this to be essentially a global singleton in most production cases so
-                // likely we're exiting the process anyway.
-                eprintln!(
-                    "Failed to unmap memory: {:?}",
-                    std::io::Error::last_os_error()
-                );
-            }
+    pub fn new_receiver(self: Arc<Self>) -> State {
+        let mut guard = self.entries.lock().unwrap();
+        let key = guard.insert(InnerState::new());
+        State {
+            shared: self.clone(),
+            entry: key,
         }
     }
-}
-
-const fn assert_copy<T: Copy>() {}
 
-struct SharedSlot {
-    id: UnsafeCell<Id>,
-    key_id: AtomicU64,
+    fn remove(&self, entry: usize) {
+        let mut guard = self.entries.lock().unwrap_or_else(|e| e.into_inner());
+        guard.remove(entry);
+    }
 }
 
-impl SharedSlot {
-    fn try_lock(&self) -> Option<SharedSlotGuard<'_>> {
-        let current = self.key_id.load(Ordering::Relaxed);
-        if current & LOCK != 0 {
-            // If we are already locked, then give up.
-            // A concurrent thread updated this slot, any write we do would squash that thread's
-            // write. Doing so if that thread remove()d may make sense in the future but not right
-            // now.
-            return None;
-        }
-        let Ok(_) = self.key_id.compare_exchange(
-            current,
-            current | LOCK,
-            Ordering::Acquire,
-            Ordering::Relaxed,
-        ) else {
-            return None;
-        };
+#[derive(Debug)]
+pub struct State {
+    // FIXME: Avoid storing Shared pointer inside every path secret entry.
+    // Instead thread the pointer through all the methods.
+    shared: Arc<Shared>,
+    // FIXME: shrink to u32 index?
+    entry: usize,
+}
 
-        Some(SharedSlotGuard {
-            slot: self,
-            key_id: current,
-        })
+impl Drop for State {
+    fn drop(&mut self) {
+        self.shared.remove(self.entry);
     }
 }
 
-struct SharedSlotGuard<'a> {
-    slot: &'a SharedSlot,
-    key_id: u64,
+#[derive(Clone, Debug)]
+pub struct InnerState {
+    max_seen: u64,
+    bitset: u32,
+    // FIXME: simple, later move into shared memory.
+    list: Vec<u64>,
 }
 
-impl SharedSlotGuard<'_> {
-    fn write_id(&mut self, id: Id) {
-        // Store the new ID.
-        // SAFETY: We hold the lock since we are in the guard.
-        unsafe {
-            // Note: no destructor is run for the previously stored element, but Id is Copy.
-            // If we did want to run a destructor we'd have to ensure that we replaced a PRESENT
-            // entry.
-            assert_copy::<Id>();
-            std::ptr::write(self.slot.id.get(), id);
+impl InnerState {
+    fn new() -> Self {
+        Self {
+            max_seen: u64::MAX,
+            bitset: 0,
+            list: Vec::new(),
         }
     }
-
-    fn id(&self) -> Id {
-        // SAFETY: We hold the lock, so copying out the Id is safe.
-        unsafe { *self.slot.id.get() }
-    }
 }
 
-impl Drop for SharedSlotGuard<'_> {
-    fn drop(&mut self) {
-        self.slot.key_id.store(self.key_id, Ordering::Release);
+impl State {
+    pub fn without_shared() -> State {
+        let mut entries = Mutex::new(slab::Slab::with_capacity(1));
+        entries.get_mut().unwrap().insert(InnerState::new());
+        State {
+            shared: Arc::new(Shared { entries }),
+            entry: 0,
+        }
     }
-}
-
-const LOCK: u64 = 1 << 62;
-const PRESENT: u64 = 1 << 63;
 
-impl Shared {
-    pub fn new() -> Arc<Shared> {
-        let mut secret = [0; 8];
-        aws_lc_rs::rand::fill(&mut secret).expect("random is available");
-        let shared = Shared {
-            secret: u64::from_ne_bytes(secret),
-            backing: unsafe {
-                // Note: We rely on the zero-initialization provided by the kernel. That ensures
-                // that an entry in the map is not LOCK'd to begin with and is not PRESENT as well.
-                let ptr = libc::mmap(
-                    std::ptr::null_mut(),
-                    SHARED_ALLOCATION,
-                    libc::PROT_READ | libc::PROT_WRITE,
-                    libc::MAP_ANONYMOUS | libc::MAP_PRIVATE,
-                    0,
-                    0,
-                );
-                // -1
-                if ptr as usize == usize::MAX {
-                    panic!(
-                        "Failed to allocate backing allocation for shared: {:?}",
-                        std::io::Error::last_os_error()
-                    );
-                }
-                NonNull::new(ptr).unwrap().cast()
-            },
-        };
-
-        // We need to modify the slot to which an all-zero path secert ID and key ID map. Otherwise
-        // we'd return Err(AlreadyExists) for that entry which isn't correct - it has not been
-        // inserted or removed, so it should be Err(Unknown).
-        //
-        // This is the only slot that needs modification. All other slots are never used for lookup
-        // of this set of credentials and so containing this set of credentials is fine.
-        let slot = shared.slot(&Credentials {
-            id: Id::from([0; 16]),
-            key_id: KeyId::new(0).unwrap(),
-        });
-        // The max key ID is never used by senders (checked on the sending side), while avoiding
-        // taking a full bit out of the range of key IDs. We also statically return Unknown for it
-        // on removal to avoid a non-local invariant.
-        slot.key_id.store(KeyId::MAX.as_u64(), Ordering::Relaxed);
-
-        Arc::new(shared)
+    #[cfg(test)]
+    pub fn snapshot(&self) -> InnerState {
+        self.shared.entries.lock().unwrap()[self.entry].clone()
     }
 
-    pub fn new_receiver(self: Arc<Shared>) -> State {
-        State::with_shared(self)
+    pub fn with_shared(shared: Arc<Shared>) -> State {
+        shared.new_receiver()
     }
 
-    fn insert(&self, identity: &Credentials) {
-        let slot = self.slot(identity);
-        let Some(mut guard) = slot.try_lock() else {
-            return;
-        };
-        guard.write_id(identity.id);
-        guard.key_id = *identity.key_id | PRESENT;
+    pub fn minimum_unseen_key_id(&self) -> KeyId {
+        // wrapping_add maps our u64::MAX sentinel to zero, which is accurate (i.e., if we have
+        // not seen any keys, then we have not seen the zeroth key either).
+        KeyId::new(
+            self.shared.entries.lock().unwrap()[self.entry]
+                .max_seen
+                .wrapping_add(1),
+        )
+        .unwrap()
+    }
+
+    pub fn pre_authentication(&self, _credentials: &Credentials) -> Result<(), Error> {
+        // always pass for now
+        Ok(())
     }
 
-    fn remove(&self, identity: &Credentials) -> Result<(), Error> {
-        // See `new` for details.
-        if identity.key_id == KeyId::MAX.as_u64() {
-            return Err(Error::Unknown);
-        }
-
-        let slot = self.slot(identity);
-        let previous = slot.key_id.load(Ordering::Relaxed);
-        if previous & LOCK != 0 {
-            // If we are already locked, then give up.
-            // A concurrent thread updated this slot, any write we do would squash that thread's
-            // write. No concurrent thread could have inserted what we're looking for since
-            // both insert and remove for a single path secret ID run under a Mutex.
-            return Err(Error::Unknown);
-        }
-        if previous & (!PRESENT) != *identity.key_id {
-            // If the currently stored entry does not match our desired KeyId,
-            // then we don't know whether this key has been replayed or not.
-            return Err(Error::Unknown);
-        }
-
-        let Some(mut guard) = slot.try_lock() else {
-            // Don't try to win the race by spinning, let the other thread proceed.
-            return Err(Error::Unknown);
-        };
+    pub fn post_authentication(&self, credentials: &Credentials) -> Result<(), Error> {
+        let entry = &mut self.shared.entries.lock().unwrap()[self.entry];
 
-        // Check if the path secret ID matches.
-        if guard.id() != identity.id {
-            return Err(Error::Unknown);
-        }
+        if entry.max_seen == u64::MAX {
+            // no need to touch the bitset, we've not seen any of the previous entries.
+            entry.max_seen = *credentials.key_id;
 
-        // Ok, at this point we know that the key ID and the path secret ID both match.
+            // Iterate over the unseen IDs that were > previous max seen, and
+            // will not *become* tracked now (i.e., don't fall into the new bitset).
+            //
+            // The bitset tracks (max_seen-32)..=(max_seen-1)
+            let end = entry.max_seen.saturating_sub(u32::BITS as u64);
+            // Push start up so we don't push more than 65k elements, which is our list limit.
+            // This avoids a too-long loop if we jump forward too much.
+            let start = end.saturating_sub(u16::MAX as u64);
+            for id in start..end {
+                entry.list.push(id);
+            }
 
-        let ret = if guard.key_id & PRESENT != 0 {
             Ok(())
-        } else {
-            Err(Error::AlreadyExists)
-        };
-
-        // Release the lock, removing the PRESENT bit (which may already be missing).
-        guard.key_id = *identity.key_id;
-
-        ret
-    }
-
-    fn index(&self, identity: &Credentials) -> usize {
-        let hash = u64::from_ne_bytes(identity.id[..8].try_into().unwrap())
-            ^ *identity.key_id
-            ^ self.secret;
-        let index = hash & (SHARED_ENTRIES as u64 - 1);
-        index as usize
-    }
-
-    fn slot(&self, identity: &Credentials) -> &SharedSlot {
-        let index = self.index(identity);
-        // SAFETY: in-bounds -- the & above truncates such that we're always in the appropriate
-        // range that we allocated with mmap above.
-        //
-        // Casting to a reference is safe -- the Slot type has an UnsafeCell around all of the data
-        // (either inside the atomic or directly).
-        unsafe { self.backing.as_ptr().add(index).as_ref().unwrap_unchecked() }
-    }
-}
-
-#[derive(Debug)]
-pub struct State {
-    // Minimum that we're potentially willing to accept.
-    // This is lazily updated and so may be out of date.
-    min_key_id: AtomicU64,
-
-    // This is the maximum ID we've seen so far. This is sent to peers for when we cannot determine
-    // if the packet sent is replayed as it falls outside our replay window. Peers use this
-    // information to resynchronize on the latest state.
-    max_seen_key_id: AtomicU64,
+        } else if credentials.key_id > entry.max_seen {
+            let previous_max = entry.max_seen;
+            entry.max_seen = *credentials.key_id;
+            let delta = entry.max_seen - previous_max;
+
+            // This is the range that is going to get shifted out.
+            //
+            // Any bit not set means we haven't yet seen it, so we should add it to our list.
+            //
+            // If we shifted by 1, then the range we want is 31..=31 (1 bit, 1 << 31, top bit)
+            // If we shifted by 2, then the range we want is 30..=31 (2 bits)
+            // If we shifted by 30, then the range we want is 2..=31 (30 bits)
+            // If we shifted by 60, then the range we want is 0..=31 (all 32 bits)
+            for bit in (32u64.saturating_sub(delta)..=31).rev() {
+                // +1 since bit 0 is previous_max - 1
+                let Some(id) = previous_max.checked_sub(bit + 1) else {
+                    continue;
+                };
+                if entry.bitset & (1 << bit) == 0 {
+                    entry.list.push(id);
+                }
+            }
 
-    seen: Mutex<SlidingWindow>,
+            // Iterate over the unseen IDs that were > previous max seen, and
+            // will not *become* tracked now (i.e., don't fall into the new bitset).
+            //
+            // The bitset tracks (max_seen-32)..=(max_seen-1)
+            let end = entry.max_seen.saturating_sub(u32::BITS as u64);
+            // Push start up so we don't push more than 65k elements, which is our list limit.
+            // This avoids a too-long loop if we jump forward too much.
+            let start = (previous_max + 1).max(end.saturating_sub(u16::MAX as u64));
+            for id in start..end {
+                entry.list.push(id);
+            }
 
-    shared: Option<Arc<Shared>>,
-}
+            if delta <= u32::BITS as u64 {
+                // as u32 is safe since we checked delta is at most 32.
+                let delta = delta as u32;
+
+                // Shift the no longer fitting bits out
+                // 0s mean we have *not* seen the entry, so shifting those in for the middle part
+                entry.bitset = entry.bitset.checked_shl(delta).unwrap_or(0);
+                // Set the bit corresponding to previously max-seen.
+                entry.bitset |= 1 << (delta - 1);
+            } else {
+                entry.bitset = 0;
+            }
 
-impl super::map::SizeOf for Mutex<SlidingWindow> {
-    fn size(&self) -> usize {
-        // If we don't need drop, it's very likely that this type is fully contained in size_of
-        // Self. This simplifies implementing this trait for e.g. std types.
-        //
-        // Mutex on macOS (at least) has a more expensive, pthread-based impl that allocates. But
-        // on Linux there's no extra allocation.
-        if cfg!(target_os = "linux") {
-            assert!(
-                !std::mem::needs_drop::<Self>(),
-                "{:?} requires custom SizeOf impl",
-                std::any::type_name::<Self>()
-            );
+            // forward shift is always successful
+            Ok(())
+        } else if credentials.key_id == entry.max_seen {
+            Err(Error::AlreadyExists)
+        } else {
+            let delta = entry.max_seen - *credentials.key_id;
+            if delta <= u32::BITS as u64 {
+                // -1 for the transition from max seen to the bitset
+                if (entry.bitset & (1 << (delta - 1) as u32)) != 0 {
+                    Err(Error::AlreadyExists)
+                } else {
+                    entry.bitset |= 1 << (delta - 1) as u32;
+                    Ok(())
+                }
+            } else if let Ok(idx) = entry.list.binary_search(&*credentials.key_id) {
+                // FIXME: augment with bitset for fast removal
+                entry.list.remove(idx);
+                Ok(())
+            } else {
+                Err(Error::Unknown)
+            }
         }
-        std::mem::size_of::<Self>()
     }
 }
 
-impl super::map::SizeOf for State {
-    fn size(&self) -> usize {
-        let State {
-            min_key_id,
-            max_seen_key_id,
-            seen,
-            shared,
-        } = self;
-        // shared is shared across all State's (effectively) so we don't currently account for that
-        // allocation.
-        min_key_id.size() + max_seen_key_id.size() + seen.size() + std::mem::size_of_val(shared)
-    }
-}
+impl super::map::SizeOf for State {}
 
 #[derive(Clone, Copy, Debug, PartialEq, Eq, thiserror::Error)]
 pub enum Error {
@@ -300,73 +209,5 @@ pub enum Error {
     Unknown,
 }
 
-impl State {
-    pub fn without_shared() -> State {
-        State {
-            min_key_id: Default::default(),
-            max_seen_key_id: Default::default(),
-            seen: Default::default(),
-            shared: None,
-        }
-    }
-
-    pub fn with_shared(shared: Arc<Shared>) -> State {
-        State {
-            min_key_id: Default::default(),
-            max_seen_key_id: Default::default(),
-            seen: Default::default(),
-            shared: Some(shared),
-        }
-    }
-
-    pub fn pre_authentication(&self, identity: &Credentials) -> Result<(), Error> {
-        if self.min_key_id.load(Ordering::Relaxed) > *identity.key_id {
-            return Err(Error::Unknown);
-        }
-
-        Ok(())
-    }
-
-    pub fn minimum_unseen_key_id(&self) -> KeyId {
-        KeyId::try_from(self.max_seen_key_id.load(Ordering::Relaxed) + 1).unwrap()
-    }
-
-    /// Called after decryption has been performed
-    pub fn post_authentication(&self, identity: &Credentials) -> Result<(), Error> {
-        let key_id = identity.key_id;
-        self.max_seen_key_id.fetch_max(*key_id, Ordering::Relaxed);
-        let pn = PacketNumberSpace::Initial.new_packet_number(key_id);
-
-        // Note: intentionally retaining this lock across potential insertion into the shared map.
-        // This avoids the case where we have evicted an entry but cannot see it in the shared map
-        // yet from a concurrent thread. This should not be required for correctness but helps
-        // reasoning about the state of the world.
-        let mut seen = self.seen.lock().unwrap();
-        match seen.insert_with_evicted(pn) {
-            Ok(evicted) => {
-                if let Some(shared) = &self.shared {
-                    // FIXME: Consider bounding the number of evicted entries to insert or
-                    // otherwise optimizing? This can run for at most 128 entries today...
-                    for evicted in evicted {
-                        shared.insert(&Credentials {
-                            id: identity.id,
-                            key_id: PacketNumber::as_varint(evicted),
-                        });
-                    }
-                }
-                Ok(())
-            }
-            Err(SlidingWindowError::TooOld) => {
-                if let Some(shared) = &self.shared {
-                    shared.remove(identity)
-                } else {
-                    Err(Error::Unknown)
-                }
-            }
-            Err(SlidingWindowError::Duplicate) => Err(Error::AlreadyExists),
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests;
diff --git a/dc/s2n-quic-dc/src/path/secret/receiver/tests.rs b/dc/s2n-quic-dc/src/path/secret/receiver/tests.rs
index 8e49634a2b..67f5a5f96e 100644
--- a/dc/s2n-quic-dc/src/path/secret/receiver/tests.rs
+++ b/dc/s2n-quic-dc/src/path/secret/receiver/tests.rs
@@ -2,6 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use super::*;
+use crate::credentials::Id;
 use bolero::check;
 use rand::{seq::SliceRandom, Rng, SeedableRng};
 use std::collections::{binary_heap::PeekMut, BinaryHeap, HashSet};
@@ -90,61 +91,6 @@ fn check_ordered_u16() {
     });
 }
 
-#[test]
-fn shared() {
-    let subject = Shared::new();
-    let id1 = Id::from([0; 16]);
-    let mut id2 = Id::from([0; 16]);
-    // This is a part of the key ID not used for hashing.
-    id2[10] = 1;
-    let key1 = KeyId::new(0).unwrap();
-    let key2 = KeyId::new(1).unwrap();
-    subject.insert(&Credentials {
-        id: id1,
-        key_id: key1,
-    });
-    assert_eq!(
-        subject.remove(&Credentials {
-            id: id1,
-            key_id: key1,
-        }),
-        Ok(())
-    );
-    assert_eq!(
-        subject.remove(&Credentials {
-            id: id1,
-            key_id: key1,
-        }),
-        Err(Error::AlreadyExists)
-    );
-    subject.insert(&Credentials {
-        id: id2,
-        key_id: key1,
-    });
-    assert_eq!(
-        subject.remove(&Credentials {
-            id: id1,
-            key_id: key1,
-        }),
-        Err(Error::Unknown)
-    );
-    assert_eq!(
-        subject.remove(&Credentials {
-            id: id1,
-            key_id: key2,
-        }),
-        Err(Error::Unknown)
-    );
-    // Removal never taints an entry, so this is still fine.
-    assert_eq!(
-        subject.remove(&Credentials {
-            id: id2,
-            key_id: key1,
-        }),
-        Ok(())
-    );
-}
-
 // This test is not particularly interesting, it's mostly just the same as the random tests above
 // which insert ordered and unordered values. Mostly it tests that we continue to allow 129 IDs of
 // arbitrary reordering.
@@ -209,6 +155,9 @@ fn check_delayed_specific() {
 
 // delay represents the *minimum* delay a delayed entry sees. The maximum is up to SHARED_ENTRIES.
 fn check_delayed_inner(seed: u64, delay: u16) {
+    // FIXME: re-word
+    const SHARED_ENTRIES: usize = 65_000;
+
     // We expect that the shared map is always big enough to absorb our delay.
     // (This is statically true; u16::MAX < SHARED_ENTRIES).
     assert!((delay as usize) < SHARED_ENTRIES);
@@ -267,19 +216,22 @@ impl Model {
     fn insert(&mut self, op: u64) {
         let pid = Id::from([0; 16]);
         let id = KeyId::new(op).unwrap();
-        let expected = self.oracle.insert(op);
-        if expected {
+        let expected = match self.oracle.insert(op) {
+            true => Ok(()),
+            false => Err(Error::AlreadyExists),
+        };
+        if expected.is_ok() {
             self.insert_order.push(op);
         }
         let actual = self.subject.post_authentication(&Credentials {
             id: pid,
             key_id: id,
         });
-        if actual.is_ok() != expected {
+        if actual.is_ok() != expected.is_ok() {
             let mut oracle = self.oracle.iter().collect::<Vec<_>>();
             oracle.sort_unstable();
             panic!(
-                "Inserting {:?} failed, in oracle: {}, in subject: {:?}, inserted: {:?}",
+                "Inserting {:?} failed, in oracle: {:?}, in subject: {:?}, inserted: {:?}",
                 op, expected, actual, self.insert_order
             );
         }
@@ -287,66 +239,51 @@ impl Model {
 }
 
 #[test]
-fn shared_no_collisions() {
-    let mut seen = HashSet::new();
-    let shared = Shared::new();
-    for key_id in 0..SHARED_ENTRIES as u64 {
-        let index = shared.index(&Credentials {
-            id: Id::from([0; 16]),
-            key_id: KeyId::new(key_id).unwrap(),
-        });
-        assert!(seen.insert(index));
-    }
-
-    // The next entry should collide, since we will wrap around.
-    let index = shared.index(&Credentials {
-        id: Id::from([0; 16]),
-        key_id: KeyId::new(SHARED_ENTRIES as u64 + 1).unwrap(),
-    });
-    assert!(!seen.insert(index));
-}
+fn check_manual_state() {
+    let state = State::without_shared();
+    let pid = Id::from([0; 16]);
+    let creds = |id: u64| Credentials {
+        id: pid,
+        key_id: KeyId::new(id).unwrap(),
+    };
+    state.post_authentication(&creds(0)).unwrap();
+    assert_eq!(state.snapshot().max_seen, 0);
+    assert_eq!(state.snapshot().bitset, 0);
+    assert_eq!(state.snapshot().list, vec![]);
 
-#[test]
-fn shared_id_pair_no_collisions() {
-    let shared = Shared::new();
+    state.post_authentication(&creds(32)).unwrap();
+    assert_eq!(state.snapshot().max_seen, 32);
+    // bitset tracks 0..=31
+    assert_eq!(state.snapshot().bitset, 0x8000_0000);
+    assert_eq!(state.snapshot().list, vec![]);
 
-    // Two random IDs. Exact constants shouldn't matter much, we're mainly aiming to test overall
-    // quality of our mapping from Id + KeyId.
-    let id1 = Id::from(u128::to_ne_bytes(0x25add729cce683cd0cda41d35436bdc6));
-    let id2 = Id::from(u128::to_ne_bytes(0x2862115d0691fe180f2aeb26af3c2e5e));
+    state.post_authentication(&creds(33)).unwrap();
+    assert_eq!(state.snapshot().max_seen, 33);
+    // bitset tracks 1..=32
+    assert_eq!(state.snapshot().bitset, 0x0000_0001);
+    assert_eq!(state.snapshot().list, vec![]);
 
-    for key_id in 0..SHARED_ENTRIES as u64 {
-        let index1 = shared.index(&Credentials {
-            id: id1,
-            key_id: KeyId::new(key_id).unwrap(),
-        });
-        let index2 = shared.index(&Credentials {
-            id: id2,
-            key_id: KeyId::new(key_id).unwrap(),
-        });
+    state.post_authentication(&creds(35)).unwrap();
+    assert_eq!(state.snapshot().max_seen, 35);
+    // bitset tracks 3..=34
+    assert_eq!(state.snapshot().bitset, 0x0000_0006);
+    assert_eq!(state.snapshot().list, vec![1, 2]);
 
-        // Our path secret IDs are sufficiently different that we expect that for any given index
-        // we map to a different slot. This test is not *really* saying much since it's highly
-        // dependent on the exact values of the path secret IDs, but it prevents simple bugs like
-        // ignoring the IDs entirely.
-        assert_ne!(index1, index2);
-    }
-}
+    state.post_authentication(&creds(70)).unwrap();
+    assert_eq!(state.snapshot().max_seen, 70);
+    // bitset tracks 38..=69
+    assert_eq!(state.snapshot().bitset, 0x0000_0000);
+    assert_eq!(
+        state.snapshot().list,
+        (1..=37)
+            .filter(|v| ![32, 33, 35].contains(v))
+            .collect::<Vec<_>>()
+    );
 
-// Confirms that we start out without any entries present in the map.
-#[test]
-fn shared_no_entries() {
-    let shared = Shared::new();
-    // We have to check all slots to be sure. The index used for lookup is going to be shuffled due
-    // to the hashing in of the secret. We need to use an all-zero path secret ID since the entries
-    // in the map start out zero-initialized today.
-    for key_id in 0..SHARED_ENTRIES as u64 {
-        assert_eq!(
-            shared.remove(&Credentials {
-                id: Id::from([0; 16]),
-                key_id: KeyId::new(key_id).unwrap(),
-            }),
-            Err(Error::Unknown)
-        );
-    }
+    // zero has fallen out of tracking
+    assert_eq!(
+        state.post_authentication(&creds(0)).unwrap_err(),
+        // FIXME: this should be AlreadyExists as we have not evicted any unseen entries yet.
+        Error::Unknown
+    );
 }

From 313e9773d8410682125f7152bc04f580bc2bcf3e Mon Sep 17 00:00:00 2001
From: Mark Rousskov <thismark@amazon.com>
Date: Fri, 24 Jan 2025 21:21:55 +0000
Subject: [PATCH 2/5] Add custom allocator for pooled variable-sized allocation

---
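The allocator hands out stable integer handles rather than raw pointers; a
handle resolves to memory only while an AllocationGuard is held, which is
what allows the allocator to relocate entries behind callers' backs. Rough
usage, mirroring the new trivial_check test (sketch only; as_ptr is
crate-internal):

    use std::alloc::Layout;

    let allocator = Allocator::with_capacity(8192);
    let handle = allocator.allocate(Layout::new::<u32>());
    if let Some(guard) = allocator.read_allocation(handle) {
        // The pointer is only valid while the guard is held.
        let _ptr = guard.as_ptr();
    }
    // SAFETY: each handle is deallocated at most once.
    unsafe { allocator.deallocate(handle) };
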
 dc/s2n-quic-dc/src/path/secret/receiver.rs    | 107 ++--
 .../src/path/secret/receiver/allocator.rs     | 483 ++++++++++++++++++
 .../path/secret/receiver/allocator/test.rs    | 132 +++++
 3 files changed, 689 insertions(+), 33 deletions(-)
 create mode 100644 dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs
 create mode 100644 dc/s2n-quic-dc/src/path/secret/receiver/allocator/test.rs

diff --git a/dc/s2n-quic-dc/src/path/secret/receiver.rs b/dc/s2n-quic-dc/src/path/secret/receiver.rs
index 4320ec4549..79c43f489c 100644
--- a/dc/s2n-quic-dc/src/path/secret/receiver.rs
+++ b/dc/s2n-quic-dc/src/path/secret/receiver.rs
@@ -1,21 +1,34 @@
 // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 // SPDX-License-Identifier: Apache-2.0
 
+use self::allocator::Allocator;
+use crate::credentials::{Credentials, KeyId};
+use std::alloc::Layout;
 use std::sync::{Arc, Mutex};
 
-use crate::credentials::{Credentials, KeyId};
+mod allocator;
 
 #[derive(Debug)]
 pub struct Shared {
-    // FIXME: Improve scalability by avoiding the global mutex.
-    // Most likely strategy is something like fixed-size which (in principle) allows per-entry
-    // Mutex's. Likely means dropping the slab dependency.
+    alloc: Allocator,
     entries: Mutex<slab::Slab<InnerState>>,
 }
 
+unsafe impl Send for Shared {}
+unsafe impl Sync for Shared {}
+
 impl Shared {
+    pub fn without_region() -> Arc<Shared> {
+        Arc::new(Shared {
+            alloc: Allocator::with_capacity(0),
+            entries: Mutex::new(slab::Slab::new()),
+        })
+    }
+
     pub fn new() -> Arc<Shared> {
         Arc::new(Shared {
+            // ~20MB
+            alloc: Allocator::with_capacity(20 * 1024 * 1024),
             entries: Mutex::new(slab::Slab::new()),
         })
     }
@@ -29,9 +42,9 @@ impl Shared {
         }
     }
 
-    fn remove(&self, entry: usize) {
+    fn remove(&self, entry: usize) -> InnerState {
         let mut guard = self.entries.lock().unwrap_or_else(|e| e.into_inner());
-        guard.remove(entry);
+        guard.remove(entry)
     }
 }
 
@@ -46,15 +59,43 @@ pub struct State {
 
 impl Drop for State {
     fn drop(&mut self) {
-        self.shared.remove(self.entry);
+        let entry = self.shared.remove(self.entry);
+        if let Some(handle) = entry.shared {
+            // SAFETY: Entry is being dropped, so this is called at most once.
+            unsafe { self.shared.alloc.deallocate(handle as usize) };
+        }
     }
 }
 
+// KeyIDs move through two filters:
+//
+// * `max_seen` + bitset absorbs traffic with minimal reordering. Conceptually they are a single
+//   33-bit bitset ending at (inclusively) `max_seen`. 1-bits indicate seen entries. This is
+//   currently expected to be enough to absorb the vast majority (>99.99%) of traffic seen in
+//   practice. This space is always available to every Path Secret.
+// * If we don't see a key ID (i.e., we shift out a zero bit from the bitset) we insert into a list
+//   or bitset within the Shared state. This list tracks *only* unseen entries, so we expect it to
+//   generally be short. Currently the list can track entries within a region 2**16 wide. Note that
+//   this region is independent of `max_seen` and so only needs to potentially be changed if we
+//   evict a zero bit (which happens pretty rarely), and even then only if we still haven't caught
+//   a packet that's 2**16 old. See more details on `SortedListHeader` and `BitsetHeader`.
 #[derive(Clone, Debug)]
 pub struct InnerState {
     max_seen: u64,
+
+    // Any key ID > to this is either AlreadyExists or Ok.
+    // Note that == is Unknown, since += 1 is *not* a safe operation.
+    //
+    // This is updated when we evict from the list/bitset (i.e., drop a still-Ok value).
+    minimum_evicted: u64,
+
+    // Directly stored bitset.
     bitset: u32,
-    // FIXME: simple, later move into shared memory.
+
+    // Index into the shared allocator's entry array, if any.
+    shared: Option<u32>,
+
+    // FIXME: Move into shared allocation.
     list: Vec<u64>,
 }
 
@@ -62,20 +103,34 @@ impl InnerState {
     fn new() -> Self {
         Self {
             max_seen: u64::MAX,
+            minimum_evicted: u64::MAX,
             bitset: 0,
-            list: Vec::new(),
+            shared: None,
+
+            list: vec![],
         }
     }
+
+    // Iterate over the unseen IDs that were > previous max seen, and
+    // will not *become* tracked now (i.e., don't fall into the new bitset).
+    //
+    // The bitset tracks (max_seen-32)..=(max_seen-1)
+    fn skipped_bitset(&self, previous_max: Option<u64>) -> std::ops::Range<u64> {
+        let end = self.max_seen.saturating_sub(u32::BITS as u64);
+        // Push start up so we don't push more than 65k elements, which is our list limit.
+        // This avoids a too-long loop if we jump forward too much.
+        let start = match previous_max {
+            Some(previous_max) => (previous_max + 1).max(end.saturating_sub(u16::MAX as u64)),
+            None => end.saturating_sub(u16::MAX as u64),
+        };
+        start..end
+    }
 }
 
 impl State {
     pub fn without_shared() -> State {
-        let mut entries = Mutex::new(slab::Slab::with_capacity(1));
-        entries.get_mut().unwrap().insert(InnerState::new());
-        State {
-            shared: Arc::new(Shared { entries }),
-            entry: 0,
-        }
+        let shared = Shared::without_region();
+        shared.new_receiver()
     }
 
     #[cfg(test)]
@@ -110,15 +165,7 @@ impl State {
             // no need to touch the bitset, we've not seen any of the previous entries.
             entry.max_seen = *credentials.key_id;
 
-            // Iterate over the unseen IDs that were > previous max seen, and
-            // will not *become* tracked now (i.e., don't fall into the new bitset).
-            //
-            // The bitset tracks (max_seen-32)..=(max_seen-1)
-            let end = entry.max_seen.saturating_sub(u32::BITS as u64);
-            // Push start up so we don't push more than 65k elements, which is our list limit.
-            // This avoids a too-long loop if we jump forward too much.
-            let start = end.saturating_sub(u16::MAX as u64);
-            for id in start..end {
+            for id in entry.skipped_bitset(None) {
                 entry.list.push(id);
             }
 
@@ -146,15 +193,7 @@ impl State {
                 }
             }
 
-            // Iterate over the unseen IDs that were > previous max seen, and
-            // will not *become* tracked now (i.e., don't fall into the new bitset).
-            //
-            // The bitset tracks (max_seen-32)..=(max_seen-1)
-            let end = entry.max_seen.saturating_sub(u32::BITS as u64);
-            // Push start up so we don't push more than 65k elements, which is our list limit.
-            // This avoids a too-long loop if we jump forward too much.
-            let start = (previous_max + 1).max(end.saturating_sub(u16::MAX as u64));
-            for id in start..end {
+            for id in entry.skipped_bitset(Some(previous_max)) {
                 entry.list.push(id);
             }
 
@@ -189,6 +228,8 @@ impl State {
                 // FIXME: augment with bitset for fast removal
                 entry.list.remove(idx);
                 Ok(())
+            } else if *credentials.key_id > entry.minimum_evicted {
+                Err(Error::AlreadyExists)
             } else {
                 Err(Error::Unknown)
             }
diff --git a/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs b/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs
new file mode 100644
index 0000000000..053c00abb0
--- /dev/null
+++ b/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs
@@ -0,0 +1,483 @@
+//! Slab allocator drawing from a fixed arena.
+//!
+//! The arena is allocated at initialization time, providing a fixed memory region from which to
+//! allocate entries. We support allocating a compile-time fixed set of types; however, the
+//! internals are mostly indifferent to *what* that set is (including the sizes).
+//!
+//! The arena has three types of pages:
+//!
+//! * Free (empty)
+//! * Partially allocated
+//! * Fully initialized/allocated
+//!
+//! Initially, all pages start empty. When an allocation request is made, a page is moved into the
+//! partially allocated state. A u16 counter is placed at the top of the page for the # of entries
+//! allocated so far. A given size class always allocates from this page until the page is
+//! exhausted. A page, when moved into the partially allocated state, is also threaded into an
+//! intrusive doubly linked list of allocated pages for this size class. This list supports
+//! deallocation operations. A partially-empty page, if it exists, is always at the top of this
+//! list.
+//!
+//! Effectively, we have a `LinkedList<Vec<T>>` for each T, with at most one of the Vecs being
+//! non-fixed-size.
+//!
+//! On deallocation, we swap the entry being deallocated with one from the top of the page list.
+//! This ensures that at most one page for this type is not contiguously allocated, meaning that
+//! wasted memory due to fragmentation is bounded to a single page per allocatable type.
+
+#![allow(dead_code)]
+
+use std::alloc::Layout;
+use std::collections::{BTreeMap, HashMap, VecDeque};
+use std::ptr::NonNull;
+use std::sync::Mutex;
+
+#[derive(Debug)]
+pub struct Allocator {
+    inner: Mutex<AllocatorInner>,
+}
+
+#[derive(Debug)]
+struct AllocatorInner {
+    // layout and region are only used for Drop, otherwise we always manage through the other
+    // fields.
+    layout: Layout,
+    region: NonNull<u8>,
+
+    free_pages: Vec<NonNull<u8>>,
+
+    // This slab indirects into the allocator's internal memory, allowing us to move allocated
+    // entries ~transparently to callers.
+    //
+    // FIXME: Remove the indirection by moving the containing memory (path secret entries) into the
+    // allocator and/or guarantee that they don't move without invalidation of the child via `Pin`.
+    parents: slab::Slab<parking_lot::Mutex<Option<u32>>>,
+
+    // These are lists of *allocated* entries.
+    allocated_pages: BTreeMap<BySize, VecDeque<NonNull<u8>>>,
+
+    // For each size class, the page we are currently allocating from.
+    //
+    // When this runs out we can grab a new region from `free_pages`.
+    reserved_page: HashMap<Layout, Option<NonNull<u8>>>,
+}
+
+#[derive(Debug, PartialEq, Eq)]
+struct BySize(Layout);
+
+impl std::ops::Deref for BySize {
+    type Target = Layout;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl Ord for BySize {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.0
+            .size()
+            .cmp(&other.0.size())
+            .then(self.0.align().cmp(&other.0.align()))
+    }
+}
+
+impl PartialOrd for BySize {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+// Note: must contain at least one entry for every size class.
+// 2**13 is sufficient for our purposes.
+const PAGE_SIZE: usize = 1 << 13;
+
+impl Allocator {
+    pub fn with_capacity(capacity: usize) -> Allocator {
+        let layout =
+            Layout::from_size_align(capacity.next_multiple_of(PAGE_SIZE), PAGE_SIZE).unwrap();
+        let region = unsafe { NonNull::new(std::alloc::alloc(layout)).unwrap() };
+        // ensures step_by does the right thing.
+        assert_eq!(layout.size() % PAGE_SIZE, 0);
+        let free_pages: Vec<_> = (0..layout.size())
+            .step_by(PAGE_SIZE)
+            .map(|offset| unsafe { NonNull::new(region.as_ptr().add(offset)).unwrap() })
+            .collect();
+        let end = (region.as_ptr() as usize)
+            .checked_add(layout.size())
+            .unwrap();
+        for page in free_pages.iter().copied() {
+            let start = (page.as_ptr() as usize).checked_add(PAGE_SIZE).unwrap();
+            assert!(start <= end, "{:x} < {:x}", start, end);
+
+            let end = page.as_ptr().wrapping_add(PAGE_SIZE & !(PAGE_SIZE - 1));
+            assert_eq!(
+                page.as_ptr() as usize,
+                (end as usize - 1) & !(PAGE_SIZE - 1)
+            );
+        }
+        let inner = AllocatorInner {
+            layout,
+            region,
+            parents: slab::Slab::new(),
+            free_pages,
+            allocated_pages: Default::default(),
+            reserved_page: Default::default(),
+        };
+        Allocator {
+            inner: Mutex::new(inner),
+        }
+    }
+
+    /// Allocate `Layout`.
+    ///
+    /// Returns a handle which can be used to look up the allocation.
+    pub fn allocate(&self, layout: Layout) -> usize {
+        self.inner
+            .lock()
+            .unwrap_or_else(|e| e.into_inner())
+            .allocate(layout)
+    }
+
+    pub fn read_allocation(&self, handle: usize) -> Option<AllocationGuard<'_>> {
+        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
+
+        let guard = inner.parents[handle].lock();
+        let Some(offset) = *guard else {
+            return None;
+        };
+
+        parking_lot::MutexGuard::leak(guard);
+
+        Some(AllocationGuard {
+            this: self,
+            mutex: handle,
+            ptr: unsafe {
+                NonNull::new(inner.region.as_ptr().add(usize::try_from(offset).unwrap())).unwrap()
+            },
+        })
+    }
+
+    /// Must be called at most once per handle.
+    pub unsafe fn deallocate(&self, handle: usize) {
+        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
+
+        let entry = inner.parents.remove(handle);
+        // FIXME: ABA avoidance needed?
+        assert!(!entry.is_locked());
+        let Some(offset) = entry.into_inner() else {
+            // If already deallocated, nothing more to do.
+            return;
+        };
+        let deallocate_ptr = unsafe { inner.region.as_ptr().add(usize::try_from(offset).unwrap()) };
+        // Round the pointer we are deallocating down to the page size, giving us a pointer to the
+        // start of the page.
+        let deallocating_page = map_addr(deallocate_ptr, |addr| addr & !(PAGE_SIZE - 1));
+        let layout = deallocating_page
+            .add(std::mem::size_of::<u16>())
+            .cast::<Layout>()
+            .read_unaligned();
+
+        // Look up `layout` (after adjustment) in reserved_page, if possible, popping the last
+        // allocated entry and potentially freeing the page. If that's not possible, there is no
+        // reserved page for this layout, so fall through to the allocated pages below.
+        if let Some(Some(start)) = inner.reserved_page.get_mut(&layout) {
+            unsafe {
+                let page_start = start.as_ptr();
+
+                let mut page = PageWithLayout {
+                    start: page_start,
+                    layout,
+                };
+
+                page.pop_to(&mut inner, deallocate_ptr);
+
+                if page.count() == 0 {
+                    // remove this reserved page.
+                    assert!(inner.reserved_page.remove(&layout).is_some());
+
+                    // we already popped the page, so just put it back on the free pages list.
+                    inner.free_pages.push(NonNull::new(page_start).unwrap());
+                }
+
+                return;
+            }
+        }
+
+        // Lookup `layout` in allocated pages and pop an entry off of a page, moving that page into
+        // `reserved` and replacing the deallocated entry with the popped one.
+        //
+        // Note that no reserved page exists (since we didn't exit above).
+        if let Some(pages) = inner.allocated_pages.get_mut(&BySize(layout)) {
+            unsafe {
+                let page_start = pages.pop_back().unwrap().as_ptr();
+
+                let mut page = PageWithLayout {
+                    start: page_start,
+                    layout,
+                };
+
+                page.pop_to(&mut inner, deallocate_ptr);
+
+                // This is reachable if this is the only entry on the page.
+                if page.count() == 0 {
+                    // we already popped the page, so just put it back on the free pages list.
+                    inner.free_pages.push(NonNull::new(page_start).unwrap());
+                }
+
+                // OK, at this point `deallocated` is filled, and we have a page with N-1 entries
+                // that we add to reserved_page.
+                inner
+                    .reserved_page
+                    .insert(layout, Some(NonNull::new(page_start).unwrap()));
+
+                return;
+            }
+        }
+
+        // Pre-dealloc, this entry cannot be on a partially allocated page (that would be the
+        // reserved page, handled above) nor on a full page (the second case above). No other cases exist.
+        unreachable!()
+    }
+}
+
+struct PageWithLayout {
+    start: *mut u8,
+    layout: Layout,
+}
+
+impl PageWithLayout {
+    fn count(&self) -> u16 {
+        unsafe { self.start.cast::<u16>().read() }
+    }
+
+    fn add_count(&mut self, v: i16) {
+        unsafe {
+            *self.start.cast::<u16>() = self.count().checked_add_signed(v).unwrap();
+        }
+    }
+
+    /// Pop an entry from this page and move the contents into `dest`.
+    ///
+    /// `dest` is assumed to be an entry (possibly on this page) which is of size `layout`.
+    unsafe fn pop_to(&mut self, inner: &mut AllocatorInner, dest: *mut u8) {
+        let page_end = self.start.add(PAGE_SIZE & !(self.layout.align() - 1));
+
+        let last_allocated = page_end.sub(self.layout.size() * self.count() as usize);
+        // last_allocated.is_aligned_to(layout.align()), except stable.
+        assert!(last_allocated as usize & (self.layout.align() - 1) == 0);
+
+        self.add_count(-1);
+
+        // If we are deallocating the last entry on this page, no parent swapping is
+        // needed, we can just drop it.
+        if last_allocated != dest {
+            // Lock the parent pointer for the entry we're popping and replacing the
+            // deallocated entry with.
+            let src_parent_idx = last_allocated.cast::<u32>().read() as usize;
+            let mut src_parent = inner.parents[src_parent_idx].lock();
+            assert!(src_parent.is_some());
+
+            // Copy the data into the now-deallocated entry.
+            dest.copy_from_nonoverlapping(last_allocated, self.layout.size());
+
+            // Update parent pointer to point to new entry.
+            *src_parent =
+                Some(u32::try_from(dest as usize - inner.region.as_ptr() as usize).unwrap());
+            drop(src_parent);
+        }
+    }
+
+    /// Add an entry to this page.
+    ///
+    /// Returns Some(entry address) if this was successful.
+    fn push(&mut self) -> Option<*mut u8> {
+        unsafe {
+            let page_end = self.start.add(PAGE_SIZE & !(self.layout.align() - 1));
+
+            let last_allocated = page_end.sub(self.layout.size() * self.count() as usize);
+            // last_allocated.is_aligned_to(layout.align()), except stable.
+            assert!(last_allocated as usize & (self.layout.align() - 1) == 0);
+
+            let new_allocation = last_allocated.wrapping_sub(self.layout.size());
+
+            if new_allocation
+                >= self
+                    .start
+                    .add(std::mem::size_of::<u16>())
+                    .add(std::mem::size_of::<Layout>())
+            {
+                self.add_count(1);
+                Some(new_allocation)
+            } else {
+                None
+            }
+        }
+    }
+}
+
+pub struct AllocationGuard<'a> {
+    this: &'a Allocator,
+    mutex: usize,
+    ptr: NonNull<u8>,
+}
+
+impl AllocationGuard<'_> {
+    fn as_ptr(&self) -> NonNull<u8> {
+        self.ptr
+    }
+}
+
+impl Drop for AllocationGuard<'_> {
+    fn drop(&mut self) {
+        // creation leaked the guard, so now we force unlock.
+        unsafe {
+            self.this
+                .inner
+                .lock()
+                .unwrap_or_else(|e| e.into_inner())
+                .parents[self.mutex]
+                .force_unlock();
+        }
+    }
+}
+
+impl AllocatorInner {
+    fn allocate(&mut self, layout: Layout) -> usize {
+        // Add parent pointer field.
+        let (layout, _) = Layout::new::<u32>().extend(layout).unwrap();
+        let layout = layout.pad_to_align();
+
+        let reserved = self.reserved_page.entry(layout).or_insert_with(|| None);
+        let align = layout.align();
+        assert!(align.is_power_of_two());
+
+        let allocation = 'allocate: loop {
+            match reserved {
+                Some(page_start) => {
+                    let mut page = PageWithLayout {
+                        start: page_start.as_ptr(),
+                        layout,
+                    };
+                    if let Some(ptr) = page.push() {
+                        break ptr;
+                    } else {
+                        // this page is full, so move it to the allocated pages list
+                        self.allocated_pages
+                            .entry(BySize(layout))
+                            .or_default()
+                            .push_back(*page_start);
+                        // no more entries left...
+                        *reserved = None;
+                        // fallthrough
+                    }
+                }
+                None => {}
+            }
+
+            // Ok, we failed to pull from the reserved page, re-populate reserved.
+            if let Some(page) = self.free_pages.pop() {
+                unsafe {
+                    // Each page has a u16 counter at the front of allocated entries.
+                    // Initialize the counter.
+                    // It is discoverable later by aligning the end of the page down.
+                    page.as_ptr().cast::<u16>().write(0);
+                    page.as_ptr()
+                        .add(std::mem::size_of::<u16>())
+                        .cast::<Layout>()
+                        .write_unaligned(layout);
+                    *reserved = Some(page);
+
+                    // and loop around to allocate from the reserved page...
+                    continue;
+                }
+            }
+
+            // Ok, no free pages left either, we need to deallocate entries.
+            for (page_layout, pages) in self.allocated_pages.iter_mut().rev() {
+                let Some(page) = pages.pop_front() else {
+                    continue;
+                };
+
+                // OK, we are going to empty this page and return it to free pages, which will then
+                // move it into the reserved pages for `layout`.
+                //
+                // We need to deallocate all of the entries on this page (in theory we can try to
+                // move them into a reserved page if it's available, but for simplicity we're just
+                // deallocating right now).
+                //
+                // FIXME: This does mean that when we call allocate() to *grow* we might actually
+                // deallocate our own memory. That's not ideal, but seems like an OK failure mode -
+                // one workaround could be to pin the local page to avoid using it.
+
+                // SAFETY: `count` is protected by the mutex we're in and pages are initialized to
+                // a zero count when we move them out of free pages.
+                let count = unsafe { page.as_ptr().cast::<u16>().read() };
+
+                unsafe {
+                    let mut next = page.as_ptr().add(PAGE_SIZE);
+                    for _ in 0..count {
+                        next = map_addr(next, |v| {
+                            v.checked_sub(page_layout.size())
+                                .map(|v| v & !(align - 1))
+                                // should never overflow / underflow since we're iterating by `count`.
+                                .unwrap()
+                        });
+
+                        // Each allocation is prefixed with a u32 containing the parent index.
+                        let parent = next.cast::<u32>().read();
+
+                        // Mark the parent as deallocated.
+                        *self.parents[parent as usize].lock() = None;
+                    }
+                }
+
+                // All entries on the page have been deallocated and are no longer in use, so this
+                // page is now free.
+                self.free_pages.push(page);
+
+                // We only need one page freed, so retry the allocation.
+                continue 'allocate;
+            }
+
+            unreachable!("if no free pages must have at least some allocated pages")
+        };
+
+        // OK, we've allocated a block of memory (allocation). Now we need to initialize the parent
+        // pointer.
+
+        let parent_idx = self.parents.insert(parking_lot::Mutex::new(Some(
+            u32::try_from(allocation as usize - self.region.as_ptr() as usize).unwrap(),
+        )));
+        unsafe {
+            allocation
+                .cast::<u32>()
+                .write(u32::try_from(parent_idx).unwrap());
+        }
+
+        parent_idx
+    }
+}
+
+#[cfg(miri)]
+fn map_addr<T>(v: *mut T, mapper: impl FnOnce(usize) -> usize) -> *mut T {
+    v.map_addr(mapper)
+}
+
+// The cfg(miri) gate above really means "new enough Rust", i.e., Strict Provenance support.
+// Remove this fallback when we bump MSRV to 1.84.
+#[cfg(not(miri))]
+fn map_addr<T>(v: *mut T, mapper: impl FnOnce(usize) -> usize) -> *mut T {
+    mapper(v as usize) as *mut T
+}
+
+impl Drop for AllocatorInner {
+    fn drop(&mut self) {
+        unsafe {
+            std::alloc::dealloc(self.region.as_ptr(), self.layout);
+        }
+    }
+}
+
+#[cfg(test)]
+mod test;
diff --git a/dc/s2n-quic-dc/src/path/secret/receiver/allocator/test.rs b/dc/s2n-quic-dc/src/path/secret/receiver/allocator/test.rs
new file mode 100644
index 0000000000..856f7058d6
--- /dev/null
+++ b/dc/s2n-quic-dc/src/path/secret/receiver/allocator/test.rs
@@ -0,0 +1,132 @@
+use super::Allocator;
+use std::alloc::Layout;
+
+#[test]
+fn trivial_check() {
+    let allocator = Allocator::with_capacity(8192);
+    let handle1 = allocator.allocate(Layout::new::<u32>());
+    let handle2 = allocator.allocate(Layout::new::<u32>());
+    let ptr1 = allocator.read_allocation(handle1).unwrap();
+    let ptr2 = allocator.read_allocation(handle2).unwrap();
+    assert_ne!(ptr1.as_ptr(), ptr2.as_ptr());
+    drop(ptr1);
+    drop(ptr2);
+    unsafe {
+        allocator.deallocate(handle1);
+        allocator.deallocate(handle2);
+    }
+}
+
+#[test]
+fn fills_page() {
+    // 1 means we allocate a single page
+    let allocator = Allocator::with_capacity(1);
+    let mut handles = vec![];
+    for _ in 0..1021 {
+        handles.push(allocator.allocate(Layout::new::<u32>()));
+    }
+    let mut count = 0;
+    for handle in handles.iter() {
+        count += allocator.read_allocation(*handle).is_some() as usize;
+    }
+    assert_eq!(count, handles.len());
+}
+
+#[test]
+fn allocates_indefinitely() {
+    // 1 means we allocate a single page
+    let allocator = Allocator::with_capacity(1);
+    assert_eq!(allocator.inner.lock().unwrap().free_pages.len(), 1);
+    let layout = Layout::new::<u32>();
+    let mut handles = vec![];
+    for _ in 0..(1021 * if cfg!(miri) { 2 } else { 1000 }) {
+        handles.push(allocator.allocate(layout));
+    }
+    let mut count = 0;
+    for handle in handles {
+        count += allocator.read_allocation(handle).is_some() as usize;
+
+        unsafe {
+            allocator.deallocate(handle);
+        }
+    }
+    // Only the most recent page's worth of allocations (1021) is still live; earlier pages were
+    // reclaimed wholesale when the allocator ran out of space, so there is no fragmentation.
+    assert_eq!(count, 1021);
+}
+
+#[test]
+fn allocate_and_deallocate_multipage() {
+    let allocator = Allocator::with_capacity(super::PAGE_SIZE * 3);
+    assert_eq!(allocator.inner.lock().unwrap().free_pages.len(), 3);
+    let mut handles = vec![];
+    let layout = Layout::new::<u32>();
+    for _ in 0..3000 {
+        handles.push(allocator.allocate(layout));
+    }
+    let mut count = 0;
+    for handle in handles.iter() {
+        count += allocator.read_allocation(*handle).is_some() as usize;
+    }
+    assert_eq!(count, 3000);
+
+    for handle in handles {
+        unsafe {
+            allocator.deallocate(handle);
+        }
+    }
+}
+
+#[test]
+fn allocate_and_deallocate_multilayout() {
+    let allocator = Allocator::with_capacity(super::PAGE_SIZE * 10);
+    assert_eq!(allocator.inner.lock().unwrap().free_pages.len(), 10);
+    let mut handles = vec![];
+    let layout1 = Layout::new::<[u32; 1]>();
+    let layout2 = Layout::new::<[u32; 2]>();
+    let layout3 = Layout::new::<[u32; 3]>();
+    for _ in 0..1000 {
+        handles.push(allocator.allocate(layout1));
+    }
+    for _ in 0..1000 {
+        handles.push(allocator.allocate(layout2));
+    }
+    for _ in 0..1000 {
+        handles.push(allocator.allocate(layout3));
+    }
+    let mut count = 0;
+    for handle in handles.iter() {
+        count += allocator.read_allocation(*handle).is_some() as usize;
+    }
+    assert_eq!(count, 3000);
+
+    for handle in handles[..1000].iter() {
+        unsafe {
+            allocator.deallocate(*handle);
+        }
+    }
+    for handle in handles[1000..2000].iter() {
+        unsafe {
+            allocator.deallocate(*handle);
+        }
+    }
+    for handle in handles[2000..].iter() {
+        unsafe {
+            allocator.deallocate(*handle);
+        }
+    }
+}
+
+#[test]
+fn reuse_handle() {
+    let allocator = Allocator::with_capacity(1);
+    let handle1 = allocator.allocate(Layout::new::<u32>());
+    unsafe {
+        allocator.deallocate(handle1);
+    }
+    let handle2 = allocator.allocate(Layout::new::<u32>());
+    unsafe {
+        allocator.deallocate(handle2);
+    }
+    assert_eq!(handle1, handle2);
+}
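
A minimal usage sketch of the handle-based API these tests exercise, assuming the patch-2
signatures (`allocate` returns a plain `usize` handle) and module-internal visibility for the
crate-private accessors:

    use std::alloc::Layout;

    // Hypothetical helper inside the receiver::allocator module.
    fn round_trip(alloc: &Allocator) {
        // Reserve a 4-byte slot; only the small integer handle needs to be stored long-term.
        let handle = alloc.allocate(Layout::new::<u32>());

        // Resolve the handle to a pointer guard; `None` would mean the slot was already
        // reclaimed (for example, its page was evicted to satisfy another allocation).
        if let Some(guard) = alloc.read_allocation(handle) {
            let _ptr = guard.as_ptr();
        } // guard dropped here, releasing the per-entry lock

        // SAFETY: called at most once for this handle, with no guard outstanding.
        unsafe { alloc.deallocate(handle) };
    }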

From ea96553ef78df594138c269b15228b6785767d4c Mon Sep 17 00:00:00 2001
From: Mark Rousskov <thismark@amazon.com>
Date: Tue, 28 Jan 2025 15:12:26 +0000
Subject: [PATCH 3/5] Integrate list with allocator

---
 dc/s2n-quic-dc/src/path/secret/receiver.rs    | 97 ++++++++++++++++---
 .../src/path/secret/receiver/allocator.rs     | 38 +++++++-
 .../path/secret/receiver/allocator/test.rs    | 20 ++--
 3 files changed, 126 insertions(+), 29 deletions(-)

diff --git a/dc/s2n-quic-dc/src/path/secret/receiver.rs b/dc/s2n-quic-dc/src/path/secret/receiver.rs
index 79c43f489c..2fd25d57db 100644
--- a/dc/s2n-quic-dc/src/path/secret/receiver.rs
+++ b/dc/s2n-quic-dc/src/path/secret/receiver.rs
@@ -60,9 +60,9 @@ pub struct State {
 impl Drop for State {
     fn drop(&mut self) {
         let entry = self.shared.remove(self.entry);
-        if let Some(handle) = entry.shared {
+        if let SharedIndex::Bitset(handle) | SharedIndex::Array(handle) = entry.shared.unpack() {
             // SAFETY: Entry is being dropped, so this is called at most once.
-            unsafe { self.shared.alloc.deallocate(handle as usize) };
+            unsafe { self.shared.alloc.deallocate(handle) };
         }
     }
 }
@@ -83,29 +83,86 @@ impl Drop for State {
 pub struct InnerState {
     max_seen: u64,
 
+    // Directly stored bitset, adjacent to max_seen.
+    bitset: u32,
+
     // Any key ID > to this is either AlreadyExists or Ok.
     // Note that == is Unknown, since += 1 is *not* a safe operation.
     //
     // This is updated when we evict from the list/bitset (i.e., drop a still-Ok value).
+    // FIXME: not actually updated today; keeping this current requires threading the value
+    // through deallocation.
     minimum_evicted: u64,
 
-    // Directly stored bitset.
-    bitset: u32,
-
-    // Index into the shared allocator's entry array, if any.
-    shared: Option<u32>,
+    // Index into the shared allocator's parents/entry array, if any.
+    shared: SharedIndexMemory,
 
     // FIXME: Move into shared allocation.
     list: Vec<u64>,
 }
 
+// "u24" indices keep the in-memory size down.
+#[derive(Copy, Clone)]
+enum SharedIndexMemory {
+    None,
+    Array([u8; 3]),
+    Bitset([u8; 3]),
+}
+
+impl std::fmt::Debug for SharedIndexMemory {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        self.unpack().fmt(f)
+    }
+}
+
+#[derive(Debug, Copy, Clone)]
+enum SharedIndex {
+    None,
+    Array(usize),
+    Bitset(usize),
+}
+
+impl SharedIndexMemory {
+    fn unpack(self) -> SharedIndex {
+        match self {
+            SharedIndexMemory::None => SharedIndex::None,
+            SharedIndexMemory::Array([a, b, c]) => {
+                SharedIndex::Array(u32::from_le_bytes([0, a, b, c]) as usize)
+            }
+            SharedIndexMemory::Bitset([a, b, c]) => {
+                SharedIndex::Bitset(u32::from_le_bytes([0, a, b, c]) as usize)
+            }
+        }
+    }
+}
+
+impl SharedIndex {
+    fn pack(self) -> SharedIndexMemory {
+        match self {
+            SharedIndex::None => SharedIndexMemory::None,
+            SharedIndex::Array(i) => {
+                assert!(i < (1 << 24));
+                let [a, b, c, d] = (i as u32).to_le_bytes();
+                assert!(a == 0);
+                SharedIndexMemory::Array([b, c, d])
+            }
+            SharedIndex::Bitset(i) => {
+                assert!(i < (1 << 24));
+                let [a, b, c, d] = (i as u32).to_le_bytes();
+                assert!(a == 0);
+                SharedIndexMemory::Bitset([b, c, d])
+            }
+        }
+    }
+}
+
 impl InnerState {
     fn new() -> Self {
         Self {
             max_seen: u64::MAX,
             minimum_evicted: u64::MAX,
             bitset: 0,
-            shared: None,
+            shared: SharedIndexMemory::None,
 
             list: vec![],
         }
@@ -166,7 +223,7 @@ impl State {
             entry.max_seen = *credentials.key_id;
 
             for id in entry.skipped_bitset(None) {
-                entry.list.push(id);
+                self.push_list(entry, id);
             }
 
             Ok(())
@@ -189,12 +246,12 @@ impl State {
                     continue;
                 };
                 if entry.bitset & (1 << bit) == 0 {
-                    entry.list.push(id);
+                    self.push_list(entry, id);
                 }
             }
 
             for id in entry.skipped_bitset(Some(previous_max)) {
-                entry.list.push(id);
+                self.push_list(entry, id);
             }
 
             if delta <= u32::BITS as u64 {
@@ -224,9 +281,7 @@ impl State {
                     entry.bitset |= 1 << (delta - 1) as u32;
                     Ok(())
                 }
-            } else if let Ok(idx) = entry.list.binary_search(&*credentials.key_id) {
-                // FIXME: augment with bitset for fast removal
-                entry.list.remove(idx);
+            } else if let Ok(()) = self.try_remove_list(entry, *credentials.key_id) {
                 Ok(())
             } else if *credentials.key_id > entry.minimum_evicted {
                 Err(Error::AlreadyExists)
@@ -235,6 +290,20 @@ impl State {
             }
         }
     }
+
+    fn push_list(&self, entry: &mut InnerState, id: u64) {
+        entry.list.push(id);
+    }
+
+    fn try_remove_list(&self, entry: &mut InnerState, id: u64) -> Result<(), ()> {
+        if let Ok(idx) = entry.list.binary_search(&id) {
+            // FIXME: augment with bitset for fast removal
+            entry.list.remove(idx);
+            Ok(())
+        } else {
+            Err(())
+        }
+    }
 }
 
 impl super::map::SizeOf for State {}
diff --git a/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs b/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs
index 053c00abb0..a3fcdbccc4 100644
--- a/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs
+++ b/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs
@@ -132,11 +132,11 @@ impl Allocator {
     /// Allocate `Layout`.
     ///
     /// Returns a handle which can be used to lookup the allocation.
-    pub fn allocate(&self, layout: Layout) -> usize {
-        self.inner
-            .lock()
-            .unwrap_or_else(|e| e.into_inner())
-            .allocate(layout)
+    pub fn allocate(&self, layout: Layout) -> AllocationGuard<'_> {
+        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
+        let handle = inner.allocate(layout);
+        // this allocation cannot be freed yet as we didn't release the `inner` lock.
+        inner.read_allocation(self, handle).unwrap()
     }
 
     pub fn read_allocation(&self, handle: usize) -> Option<AllocationGuard<'_>> {
@@ -326,6 +326,10 @@ impl AllocationGuard<'_> {
     fn as_ptr(&self) -> NonNull<u8> {
         self.ptr
     }
+
+    fn handle(&self) -> usize {
+        self.mutex
+    }
 }
 
 impl Drop for AllocationGuard<'_> {
@@ -457,6 +461,30 @@ impl AllocatorInner {
 
         parent_idx
     }
+
+    fn read_allocation<'a>(
+        &self,
+        parent: &'a Allocator,
+        handle: usize,
+    ) -> Option<AllocationGuard<'a>> {
+        let guard = self.parents[handle].lock();
+        let Some(offset) = *guard else {
+            return None;
+        };
+
+        // FIXME: we leak this guard and then release the `inner` mutex lock. That is a bit
+        // problematic: a caller with `&mut` access could still reach `Mutex::get_mut()` on the
+        // leaked per-entry lock, so some safety condition is probably missing somewhere.
+        parking_lot::MutexGuard::leak(guard);
+
+        Some(AllocationGuard {
+            this: parent,
+            mutex: handle,
+            ptr: unsafe {
+                NonNull::new(self.region.as_ptr().add(usize::try_from(offset).unwrap())).unwrap()
+            },
+        })
+    }
 }
 
 #[cfg(miri)]
diff --git a/dc/s2n-quic-dc/src/path/secret/receiver/allocator/test.rs b/dc/s2n-quic-dc/src/path/secret/receiver/allocator/test.rs
index 856f7058d6..39b0a017d8 100644
--- a/dc/s2n-quic-dc/src/path/secret/receiver/allocator/test.rs
+++ b/dc/s2n-quic-dc/src/path/secret/receiver/allocator/test.rs
@@ -4,8 +4,8 @@ use std::alloc::Layout;
 #[test]
 fn trivial_check() {
     let allocator = Allocator::with_capacity(8192);
-    let handle1 = allocator.allocate(Layout::new::<u32>());
-    let handle2 = allocator.allocate(Layout::new::<u32>());
+    let handle1 = allocator.allocate(Layout::new::<u32>()).handle();
+    let handle2 = allocator.allocate(Layout::new::<u32>()).handle();
     let ptr1 = allocator.read_allocation(handle1).unwrap();
     let ptr2 = allocator.read_allocation(handle2).unwrap();
     assert_ne!(ptr1.as_ptr(), ptr2.as_ptr());
@@ -23,7 +23,7 @@ fn fills_page() {
     let allocator = Allocator::with_capacity(1);
     let mut handles = vec![];
     for _ in 0..1021 {
-        handles.push(allocator.allocate(Layout::new::<u32>()));
+        handles.push(allocator.allocate(Layout::new::<u32>()).handle());
     }
     let mut count = 0;
     for handle in handles.iter() {
@@ -40,7 +40,7 @@ fn allocates_indefinitely() {
     let layout = Layout::new::<u32>();
     let mut handles = vec![];
     for _ in 0..(1021 * if cfg!(miri) { 2 } else { 1000 }) {
-        handles.push(allocator.allocate(layout));
+        handles.push(allocator.allocate(layout).handle());
     }
     let mut count = 0;
     for handle in handles {
@@ -62,7 +62,7 @@ fn allocate_and_deallocate_multipage() {
     let mut handles = vec![];
     let layout = Layout::new::<u32>();
     for _ in 0..3000 {
-        handles.push(allocator.allocate(layout));
+        handles.push(allocator.allocate(layout).handle());
     }
     let mut count = 0;
     for handle in handles.iter() {
@@ -86,13 +86,13 @@ fn allocate_and_deallocate_multilayout() {
     let layout2 = Layout::new::<[u32; 2]>();
     let layout3 = Layout::new::<[u32; 3]>();
     for _ in 0..1000 {
-        handles.push(allocator.allocate(layout1));
+        handles.push(allocator.allocate(layout1).handle());
     }
     for _ in 0..1000 {
-        handles.push(allocator.allocate(layout2));
+        handles.push(allocator.allocate(layout2).handle());
     }
     for _ in 0..1000 {
-        handles.push(allocator.allocate(layout3));
+        handles.push(allocator.allocate(layout3).handle());
     }
     let mut count = 0;
     for handle in handles.iter() {
@@ -120,11 +120,11 @@ fn allocate_and_deallocate_multilayout() {
 #[test]
 fn reuse_handle() {
     let allocator = Allocator::with_capacity(1);
-    let handle1 = allocator.allocate(Layout::new::<u32>());
+    let handle1 = allocator.allocate(Layout::new::<u32>()).handle();
     unsafe {
         allocator.deallocate(handle1);
     }
-    let handle2 = allocator.allocate(Layout::new::<u32>());
+    let handle2 = allocator.allocate(Layout::new::<u32>()).handle();
     unsafe {
         allocator.deallocate(handle2);
     }
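
With this patch, `allocate` returns an `AllocationGuard` rather than a bare handle: the guard is
produced while the allocator's internal lock is still held, so a freshly allocated slot cannot be
reclaimed before the caller records its handle. A rough sketch of the calling pattern (hypothetical
module-internal caller, using a `u32` layout as in the tests):

    use std::alloc::Layout;

    fn allocate_and_remember(alloc: &Allocator) -> usize {
        // The guard both pins the new slot and exposes the stable index via `handle()`.
        let guard = alloc.allocate(Layout::new::<u32>());
        let handle = guard.handle();
        drop(guard); // release the per-entry lock; the allocation itself stays live

        // Later lookups go back through the handle; `None` means it was deallocated meanwhile.
        if let Some(slot) = alloc.read_allocation(handle) {
            let _ptr = slot.as_ptr();
        }
        handle
    }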

From 856d481a454f94b2cd021661e9249e1bbc3a0177 Mon Sep 17 00:00:00 2001
From: Mark Rousskov <thismark@amazon.com>
Date: Tue, 28 Jan 2025 16:14:16 +0000
Subject: [PATCH 4/5] partial progress on SortedList

---
 dc/s2n-quic-dc/src/path/secret/receiver.rs    | 375 +++++++++++++++++-
 .../src/path/secret/receiver/allocator.rs     |   6 +-
 2 files changed, 371 insertions(+), 10 deletions(-)

diff --git a/dc/s2n-quic-dc/src/path/secret/receiver.rs b/dc/s2n-quic-dc/src/path/secret/receiver.rs
index 2fd25d57db..521b2d8844 100644
--- a/dc/s2n-quic-dc/src/path/secret/receiver.rs
+++ b/dc/s2n-quic-dc/src/path/secret/receiver.rs
@@ -4,6 +4,8 @@
 use self::allocator::Allocator;
 use crate::credentials::{Credentials, KeyId};
 use std::alloc::Layout;
+use std::mem::MaybeUninit;
+use std::ptr::NonNull;
 use std::sync::{Arc, Mutex};
 
 mod allocator;
@@ -127,10 +129,10 @@ impl SharedIndexMemory {
         match self {
             SharedIndexMemory::None => SharedIndex::None,
             SharedIndexMemory::Array([a, b, c]) => {
-                SharedIndex::Array(u32::from_le_bytes([0, a, b, c]) as usize)
+                SharedIndex::Array(u32::from_le_bytes([a, b, c, 0]) as usize)
             }
             SharedIndexMemory::Bitset([a, b, c]) => {
-                SharedIndex::Bitset(u32::from_le_bytes([0, a, b, c]) as usize)
+                SharedIndex::Bitset(u32::from_le_bytes([a, b, c, 0]) as usize)
             }
         }
     }
@@ -143,14 +145,14 @@ impl SharedIndex {
             SharedIndex::Array(i) => {
                 assert!(i < (1 << 24));
                 let [a, b, c, d] = (i as u32).to_le_bytes();
-                assert!(a == 0);
-                SharedIndexMemory::Array([b, c, d])
+                assert!(d == 0);
+                SharedIndexMemory::Array([a, b, c])
             }
             SharedIndex::Bitset(i) => {
                 assert!(i < (1 << 24));
                 let [a, b, c, d] = (i as u32).to_le_bytes();
-                assert!(a == 0);
-                SharedIndexMemory::Bitset([b, c, d])
+                assert!(d == 0);
+                SharedIndexMemory::Bitset([a, b, c])
             }
         }
     }
@@ -291,8 +293,88 @@ impl State {
         }
     }
 
+    fn deallocate_shared(&self, entry: &mut InnerState) {
+        if let SharedIndex::Bitset(handle) | SharedIndex::Array(handle) = entry.shared.unpack() {
+            entry.shared = SharedIndexMemory::None;
+            // SAFETY: we've cleared the shared field, so won't get called again.
+            unsafe { self.shared.alloc.deallocate(handle) };
+        }
+    }
+
     fn push_list(&self, entry: &mut InnerState, id: u64) {
-        entry.list.push(id);
+        for _ in 0..2 {
+            match entry.shared.unpack() {
+                SharedIndex::None => {
+                    let guard = self.shared.alloc.allocate(SortedList::layout_for_cap(1));
+                    entry.shared = SharedIndex::Array(guard.handle()).pack();
+                    unsafe {
+                        let mut list = SortedList::initialize(guard.as_ptr(), 1);
+                        // Safe to unwrap because it can't need to grow -- we allocated with capacity
+                        // for 1 element and that element will get used up here.
+                        list.insert(id).unwrap();
+                    }
+
+                    // we're done, exit
+                    return;
+                }
+                SharedIndex::Array(handle) => {
+                    let Some(existing) = self.shared.alloc.read_allocation(handle) else {
+                        self.deallocate_shared(entry);
+                        // loop around to try again with a new allocation
+                        continue;
+                    };
+
+                    let mut list = unsafe { SortedList::from_existing(existing.as_ptr()) };
+                    let Err(err) = list.insert(id) else {
+                        // successfully inserted, done.
+                        return;
+                    };
+
+                    // drop the lock before we allocate, cannot hold entry lock across
+                    // allocation or we may deadlock.
+                    drop(existing);
+
+                    let (_new_guard, mut list) = match err {
+                        CapacityError::Array(cap) => {
+                            let guard = self.shared.alloc.allocate(SortedList::layout_for_cap(cap));
+                            entry.shared = SharedIndex::Array(guard.handle()).pack();
+                            let list = unsafe {
+                                SortedList::initialize(guard.as_ptr(), cap.try_into().unwrap())
+                            };
+                            (guard, list)
+                        }
+                        CapacityError::Bitset => {
+                            todo!()
+                        }
+                    };
+
+                    let previous = self.shared.alloc.read_allocation(handle);
+                    if let Some(previous) = previous {
+                        let mut prev_list = unsafe { SortedList::from_existing(previous.as_ptr()) };
+                        prev_list.copy_to(&mut list);
+                    }
+
+                    // Safe to unwrap because it can't need to grow -- we allocated with
+                    // capacity for at least one more element and that element will get used up
+                    // here. We haven't released the lock on this list since allocation so it's
+                    // impossible for some other thread to have used up the space.
+                    //
+                    // FIXME: that assumption is not true if we failed to copy, since we probably
+                    // need to *shrink* then. Maybe we should allocate a temporary buffer to copy
+                    // into?
+                    list.insert(id).unwrap();
+
+                    return;
+                }
+                SharedIndex::Bitset(_) => {
+                    todo!()
+                }
+            }
+        }
+
+        // Should be unreachable: we always exit the loop via `return` within at most two
+        // iterations.
+        unreachable!()
     }
 
     fn try_remove_list(&self, entry: &mut InnerState, id: u64) -> Result<(), ()> {
@@ -308,6 +390,285 @@ impl State {
 
 impl super::map::SizeOf for State {}
 
+#[derive(Copy, Clone)]
+struct SortedListHeader {
+    len: u16,
+    count: u16,
+    cap: u16,
+    minimum: u64,
+}
+
+struct SortedList {
+    p: NonNull<u8>,
+}
+
+impl SortedList {
+    unsafe fn initialize(ptr: NonNull<u8>, cap: u16) -> SortedList {
+        ptr.as_ptr()
+            .cast::<SortedListHeader>()
+            .write(SortedListHeader {
+                len: 0,
+                count: 0,
+                cap,
+                // Sentinel: `minimum` is set lazily by `to_offset` on the first insert.
+                minimum: u64::MAX,
+            });
+        SortedList { p: ptr.cast() }
+    }
+
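+    // Allocation layout produced here: `SortedListHeader`, then `cap` u16 offsets, then a
+    // tombstone bitset of `cap.div_ceil(8)` bytes marking removed entries.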
+    fn layout_for_cap(cap: usize) -> Layout {
+        Layout::new::<SortedListHeader>()
+            .extend(Layout::array::<u16>(cap).unwrap())
+            .unwrap()
+            .0
+            .extend(Layout::array::<u8>(cap.div_ceil(8)).unwrap())
+            .unwrap()
+            .0
+    }
+
+    fn bitset_offset(cap: usize) -> usize {
+        Layout::new::<SortedListHeader>()
+            .extend(Layout::array::<u16>(cap).unwrap())
+            .unwrap()
+            .0
+            .extend(Layout::array::<u8>(cap.div_ceil(8)).unwrap())
+            .unwrap()
+            .1
+    }
+
+    fn slice_offset(cap: usize) -> usize {
+        Layout::new::<SortedListHeader>()
+            .extend(Layout::array::<u16>(cap).unwrap())
+            .unwrap()
+            .1
+    }
+
+    fn minimum(&self) -> u64 {
+        // aligned to 8 bytes, so should be aligned.
+        unsafe { self.p.cast::<SortedListHeader>().as_ref().minimum }
+    }
+
+    fn set_minimum(&self, min: u64) {
+        unsafe {
+            self.p.cast::<SortedListHeader>().as_mut().minimum = min;
+        }
+    }
+
+    fn len(&self) -> usize {
+        unsafe { usize::from(self.p.cast::<SortedListHeader>().as_ref().len) }
+    }
+
+    fn set_len(&self, len: usize) {
+        unsafe {
+            self.p.cast::<SortedListHeader>().as_mut().len = len.try_into().unwrap();
+        }
+    }
+
+    fn capacity(&self) -> usize {
+        unsafe { usize::from(self.p.cast::<SortedListHeader>().as_ref().cap) }
+    }
+
+    fn set_capacity(&self, cap: usize) {
+        unsafe {
+            self.p.cast::<SortedListHeader>().as_mut().cap = cap.try_into().unwrap();
+        }
+    }
+
+    fn count(&self) -> usize {
+        unsafe { usize::from(self.p.cast::<SortedListHeader>().as_ref().count) }
+    }
+
+    fn set_count(&self, count: usize) {
+        unsafe {
+            self.p.cast::<SortedListHeader>().as_mut().count = count.try_into().unwrap();
+        }
+    }
+
+    #[inline(never)]
+    fn insert(&mut self, value: u64) -> Result<(), CapacityError> {
+        let value = match self.to_offset(value) {
+            Some(v) => v,
+            None => {
+                self.compact_ensuring(value);
+                self.to_offset(value).expect("compact ensuring guarantee")
+            }
+        };
+        if self.len() == self.capacity() {
+            // FIXME: might actually need to go to bitset or compact
+            return Err(CapacityError::Array(self.len() + 1));
+        }
+        unsafe {
+            // move past the header
+            self.p
+                .as_ptr()
+                .add(Self::slice_offset(self.capacity()))
+                .cast::<u16>()
+                .add(self.len())
+                .write(value);
+            self.set_len(self.len() + 1);
+            self.set_count(self.count() + 1);
+        }
+
+        Ok(())
+    }
+
+    #[inline(never)]
+    fn remove(&mut self, value: u64) -> Result<(), Error> {
+        let Some(value) = self.to_offset(value) else {
+            // If the value is >= minimum, but we can't compute an offset, we know for sure that it
+            // was not inserted into the array. As such it must have been received already.
+            return if value >= self.minimum() {
+                Err(Error::AlreadyExists)
+            } else {
+                Err(Error::Unknown)
+            };
+        };
+        let slice = unsafe {
+            std::slice::from_raw_parts::<u16>(
+                self.p
+                    .as_ptr()
+                    .add(Self::slice_offset(self.capacity()))
+                    .cast::<u16>(),
+                self.len(),
+            )
+        };
+
+        let Ok(idx) = slice.binary_search(&value) else {
+            return Err(Error::Unknown);
+        };
+        let bitset = unsafe {
+            std::slice::from_raw_parts_mut::<u8>(
+                self.p.as_ptr().add(Self::bitset_offset(self.capacity())),
+                self.len().div_ceil(8),
+            )
+        };
+        let pos = idx / 8;
+        let mask = 1 << (idx % 8);
+        if bitset[pos] & mask != 0 {
+            return Err(Error::AlreadyExists);
+        }
+        bitset[pos] |= mask;
+
+        self.set_count(self.count() - 1);
+
+        if self.count() * 2 < self.len() {
+            self.shrink();
+        }
+
+        Ok(())
+    }
+
+    //fn grow(&mut self) {
+    //    todo!()
+    //    let new_cap = (self.capacity() + 1)
+    //        .next_power_of_two()
+    //        .clamp(0, u16::MAX as usize);
+    //    self.reallocate_to(new_cap);
+    //}
+
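+    // Copies the header, tombstone bitset, and values into `new`'s allocation, zeroes the tail
+    // of the new bitset, then repoints `self` at the new region and fixes up its capacity.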
+    fn copy_to(&mut self, new: &mut SortedList) {
+        unsafe {
+            let new_cap = new.capacity();
+            let new = new.p;
+
+            // copy header
+            self.p
+                .as_ptr()
+                .copy_to_nonoverlapping(new.as_ptr(), std::mem::size_of::<SortedListHeader>());
+
+            // copy bitset
+            self.p
+                .as_ptr()
+                .add(Self::bitset_offset(self.capacity()))
+                .copy_to_nonoverlapping(
+                    new.as_ptr().add(Self::bitset_offset(new_cap)),
+                    self.capacity().div_ceil(8),
+                );
+
+            // Zero out tail of the new bitset (that didn't get init'd by the copy above).
+            std::slice::from_raw_parts_mut::<MaybeUninit<u8>>(
+                new.as_ptr().add(Self::bitset_offset(new_cap)).cast(),
+                new_cap.div_ceil(8),
+            )[self.capacity().div_ceil(8)..]
+                .fill(MaybeUninit::zeroed());
+
+            // Copy the actual values
+            self.p
+                .as_ptr()
+                .add(Self::slice_offset(self.capacity()))
+                .cast::<u16>()
+                .copy_to_nonoverlapping(
+                    new.as_ptr().add(Self::slice_offset(new_cap)).cast(),
+                    self.len(),
+                );
+
+            self.p = new;
+            self.set_capacity(new_cap);
+        }
+    }
+
+    // this also updates `minimum` to be best-possible given the data.
+    fn shrink(&mut self) {
+        todo!()
+        //let slice = unsafe {
+        //    std::slice::from_raw_parts::<u16>(
+        //        self.p
+        //            .as_ptr()
+        //            .add(Self::slice_offset(self.capacity()))
+        //            .cast::<u16>(),
+        //        self.len(),
+        //    )
+        //};
+        //let bitset = unsafe {
+        //    std::slice::from_raw_parts::<u8>(
+        //        self.p.as_ptr().add(Self::bitset_offset(self.capacity())),
+        //        self.len().div_ceil(8),
+        //    )
+        //};
+
+        //let mut new = Self::new();
+        //let mut cap = 0;
+        //while cap < self.count() {
+        //    // should match grow()'s impl
+        //    cap = (cap + 1).next_power_of_two().clamp(0, u16::MAX as usize);
+        //}
+        //new.reallocate_to(cap);
+        //for (idx, value) in slice.iter().copied().enumerate() {
+        //    let pos = idx / 8;
+        //    let mask = 1 << (idx % 8);
+        //    // not yet removed...
+        //    if bitset[pos] & mask == 0 {
+        //        new.insert(self.minimum() + value as u64);
+        //    }
+        //}
+        //*self = new;
+    }
+
+    fn to_offset(&mut self, value: u64) -> Option<u16> {
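+        // `minimum` is a lazily-set base (u64::MAX means unset): adopt the first value seen so
+        // that later values can be stored as small u16 offsets from it.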
+        if self.minimum() == u64::MAX {
+            self.set_minimum(value);
+        }
+        let value = value.checked_sub(self.minimum())?;
+        u16::try_from(value).ok()
+    }
+
+    unsafe fn from_existing(p: NonNull<u8>) -> SortedList {
+        SortedList { p }
+    }
+
+    /// Re-pack the sorted list, potentially dropping values, to ensure that `can_fit` fits into
+    /// the list.
+    fn compact_ensuring(&self, can_fit: u64) {
+        todo!()
+    }
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq, thiserror::Error)]
+enum CapacityError {
+    #[error("need to grow or shrink to an array with capacity {0}")]
+    Array(usize),
+    #[error("need to grow or shrink to a bitset")]
+    Bitset,
+}
+
 #[derive(Clone, Copy, Debug, PartialEq, Eq, thiserror::Error)]
 pub enum Error {
     /// This indicates that we know about this element and it *definitely* already exists.
diff --git a/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs b/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs
index a3fcdbccc4..878449e147 100644
--- a/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs
+++ b/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs
@@ -101,7 +101,7 @@ impl Allocator {
         assert_eq!(layout.size() % PAGE_SIZE, 0);
         let free_pages: Vec<_> = (0..layout.size())
             .step_by(PAGE_SIZE)
-            .map(|offset| unsafe { dbg!(NonNull::new(region.as_ptr().add(offset)).unwrap()) })
+            .map(|offset| unsafe { NonNull::new(region.as_ptr().add(offset)).unwrap() })
             .collect();
         let end = (region.as_ptr() as usize)
             .checked_add(layout.size())
@@ -323,11 +323,11 @@ pub struct AllocationGuard<'a> {
 }
 
 impl AllocationGuard<'_> {
-    fn as_ptr(&self) -> NonNull<u8> {
+    pub fn as_ptr(&self) -> NonNull<u8> {
         self.ptr
     }
 
-    fn handle(&self) -> usize {
+    pub fn handle(&self) -> usize {
         self.mutex
     }
 }
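
Patch 4 stores skipped-but-unseen key IDs as u16 offsets from a per-list `minimum`, with a
tombstone bitset marking entries that have since been received. Below is a simplified, heap-backed
model of that encoding for illustration only; the real list lives in raw allocator memory, reports
`CapacityError` instead of growing itself, and its compaction/shrink paths are still `todo!()` at
this point in the series.

    /// Toy model of the on-page SortedList: u16 offsets from `minimum` plus a tombstone bitset.
    struct ListModel {
        minimum: u64,      // base key ID; u64::MAX means "not set yet"
        offsets: Vec<u16>, // offsets of tracked IDs, appended in increasing order
        removed: Vec<u8>,  // one tombstone bit per slot
    }

    impl ListModel {
        fn new() -> Self {
            Self { minimum: u64::MAX, offsets: Vec::new(), removed: Vec::new() }
        }

        /// Track a skipped ID. Fails if it does not fit in the u16 window above `minimum`;
        /// the real code compacts or reallocates in that case.
        fn insert(&mut self, id: u64) -> Result<(), ()> {
            if self.minimum == u64::MAX {
                self.minimum = id;
            }
            let delta = id.checked_sub(self.minimum).ok_or(())?;
            let offset = u16::try_from(delta).map_err(|_| ())?;
            // The model assumes IDs are inserted in increasing order, keeping `offsets` sorted.
            self.offsets.push(offset);
            self.removed.resize(self.offsets.len().div_ceil(8), 0);
            Ok(())
        }

        /// Mark an ID as received. Returns false if it is unknown or was already removed.
        fn remove(&mut self, id: u64) -> bool {
            let Some(offset) = id
                .checked_sub(self.minimum)
                .and_then(|d| u16::try_from(d).ok())
            else {
                return false;
            };
            let Ok(idx) = self.offsets.binary_search(&offset) else { return false };
            let (pos, mask) = (idx / 8, 1u8 << (idx % 8));
            if self.removed[pos] & mask != 0 {
                return false; // already received (`AlreadyExists` in the real code)
            }
            self.removed[pos] |= mask;
            true
        }
    }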

From 895b7d2719094b942e8eff6995382ee58cda64ef Mon Sep 17 00:00:00 2001
From: Mark Rousskov <thismark@amazon.com>
Date: Tue, 28 Jan 2025 16:17:40 +0000
Subject: [PATCH 5/5] Add license headers

---
 dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs      | 3 +++
 dc/s2n-quic-dc/src/path/secret/receiver/allocator/test.rs | 3 +++
 2 files changed, 6 insertions(+)

diff --git a/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs b/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs
index 878449e147..5052b67bb0 100644
--- a/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs
+++ b/dc/s2n-quic-dc/src/path/secret/receiver/allocator.rs
@@ -1,3 +1,6 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
 //! Slab allocator drawing from a fixed arena.
 //!
 //! The arena is allocated at initialization time, providing a fixed memory region from which to
diff --git a/dc/s2n-quic-dc/src/path/secret/receiver/allocator/test.rs b/dc/s2n-quic-dc/src/path/secret/receiver/allocator/test.rs
index 39b0a017d8..aedaf51f43 100644
--- a/dc/s2n-quic-dc/src/path/secret/receiver/allocator/test.rs
+++ b/dc/s2n-quic-dc/src/path/secret/receiver/allocator/test.rs
@@ -1,3 +1,6 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
 use super::Allocator;
 use std::alloc::Layout;