7 changes: 6 additions & 1 deletion Cargo.toml
@@ -22,7 +22,12 @@ hf-hub = { version = "0.3.1", features = ["tokio"] }
[profile.release]
debug = 1
incremental = true
panic = "abort"

[profile.release-opt]
inherits = "release"
debug = 0
incremental = false
lto = "fat"
opt-level = 3
codegen-units = 1
panic = "abort"
2 changes: 1 addition & 1 deletion Dockerfile
@@ -33,7 +33,7 @@ COPY proto proto
COPY benchmark benchmark
COPY router router
COPY launcher launcher
RUN cargo build --release
RUN cargo build --profile release-opt

# Python builder
# Adapted from: https://github.com/pytorch/pytorch/blob/master/Dockerfile
2 changes: 1 addition & 1 deletion Dockerfile_amd
@@ -33,7 +33,7 @@ COPY proto proto
COPY benchmark benchmark
COPY router router
COPY launcher launcher
RUN cargo build --release
RUN cargo build --profile release-opt

# Text Generation Inference base image for RoCm
FROM rocm/dev-ubuntu-22.04:6.1.1_hip_update as base
2 changes: 1 addition & 1 deletion Dockerfile_intel
@@ -32,7 +32,7 @@ COPY proto proto
COPY benchmark benchmark
COPY router router
COPY launcher launcher
RUN cargo build --release
RUN cargo build --profile release-opt


# Text Generation Inference base image for Intel
3 changes: 3 additions & 0 deletions benchmark/src/generation.rs
@@ -155,6 +155,8 @@ async fn prefill(
ignore_eos_token: true, // Will not stop even if an EOS token is generated
}),
top_n_tokens: top_n_tokens.unwrap_or(0),
blocks: vec![],
slots: vec![],
})
.collect();

@@ -163,6 +165,7 @@ async fn prefill(
requests,
size: batch_size,
max_tokens: batch_size * (sequence_length + decode_length),
max_blocks: 0,
};

// Run prefill
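The benchmark passes empty blocks and slots and max_blocks: 0; as the client comment below notes, blocks and slots are filled in on the server side when paged attention is used.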
6 changes: 6 additions & 0 deletions proto/v3/generate.proto
@@ -130,6 +130,10 @@ message Request {
bool prefill_logprobs = 6;
/// Return most likely n tokens
uint32 top_n_tokens = 7;
/// Paged attention blocks
repeated uint32 blocks = 9;
/// Paged attention slots
repeated uint32 slots = 10;
}

message Batch {
@@ -141,6 +145,8 @@
uint32 size = 3;
/// Maximum number of tokens this batch will grow to
uint32 max_tokens = 4;
/// Maximum number of Paged Attention blocks
uint32 max_blocks = 5;
}

message CachedBatch {
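To make the new proto fields concrete: judging from the allocator introduced below, block i covers the contiguous slot range [i * block_size, (i + 1) * block_size). A minimal sketch of that expansion, assuming a block size of 16 (suggested by the health-check request that pairs block 0 with slots 0..16); slots_for_blocks is a hypothetical helper, not part of this PR:

// Hypothetical helper (not in this PR) showing the block -> slot expansion
// performed by block_allocator_task; block_size = 16 is an assumption taken
// from the health-check request (block 0, slots 0..16).
fn slots_for_blocks(blocks: &[u32], block_size: u32) -> Vec<u32> {
    blocks
        .iter()
        .flat_map(|&b| (b * block_size)..((b + 1) * block_size))
        .collect()
}

fn main() {
    // Blocks 2 and 7 expand to slots 32..48 and 112..128.
    let slots = slots_for_blocks(&[2, 7], 16);
    assert_eq!(slots.len(), 32);
    assert_eq!(slots[0], 32);
    assert_eq!(slots[16], 112);
}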
6 changes: 5 additions & 1 deletion router/client/src/v3/client.rs
@@ -153,6 +153,9 @@ impl Client {
}),
// We truncate the input on the server side to be sure that it has the correct size
truncate,
// Blocks and slots will be set on the server side if we use paged attention
blocks: vec![],
slots: vec![],
// Set sampling parameters so that these ops are also taken into account in the max memory
parameters: Some(NextTokenChooserParameters {
temperature: 0.9,
@@ -187,7 +190,8 @@ impl Client {
id: 0,
size: requests.len() as u32,
requests,
max_tokens: 0,
max_tokens: max_input_length,
max_blocks: 0,
};

let request = tonic::Request::new(WarmupRequest {
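Note the warmup batch now advertises max_tokens: max_input_length instead of 0, presumably so the memory profiled during warmup reflects a realistic token budget.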
4 changes: 4 additions & 0 deletions router/client/src/v3/sharded_client.rs
@@ -241,12 +241,16 @@ impl Health for ShardedClient {
ignore_eos_token: false,
}),
top_n_tokens: 0,
// Block 0 is reserved for health checks
blocks: vec![0],
slots: (0..16).collect(),
};
let batch = Batch {
id: u64::MAX,
requests: vec![liveness_request],
size: 1,
max_tokens: 2,
max_blocks: 1,
};
self.clone().prefill(batch).await?;
Ok(())
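The liveness request claims block 0 and one block's worth of slots (0..16); this matches the allocator below, which hands out blocks starting at 1 so that block 0 stays reserved for health checks.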
136 changes: 136 additions & 0 deletions router/src/infer/v3/block_allocator.rs
@@ -0,0 +1,136 @@
use std::cmp::min;
use tokio::sync::{mpsc, oneshot};

#[derive(Debug, Clone)]
pub(crate) struct BlockAllocation {
pub blocks: Vec<u32>,
pub slots: Vec<u32>,
block_allocator: BlockAllocator,
}

impl Drop for BlockAllocation {
fn drop(&mut self) {
self.block_allocator.free(self.blocks.clone())
}
}

#[derive(Debug, Clone)]
pub(crate) struct BlockAllocator {
/// Channel to communicate with the background task
block_allocator: mpsc::UnboundedSender<BlockAllocatorCommand>,
}

impl BlockAllocator {
pub(crate) fn new(
max_batch_total_tokens: u32,
block_size: u32,
window_size: Option<u32>,
) -> Self {
// Create channel
let (sender, receiver) = mpsc::unbounded_channel();

// Launch background queue task
tokio::spawn(block_allocator_task(
max_batch_total_tokens / block_size,
block_size,
window_size,
receiver,
));

Self {
block_allocator: sender,
}
}

pub(crate) async fn allocate(&self, tokens: u32) -> Option<BlockAllocation> {
let (response_sender, response_receiver) = oneshot::channel();
self.block_allocator
.send(BlockAllocatorCommand::Allocate {
tokens,
response_sender,
})
.unwrap();

response_receiver
.await
.unwrap()
.map(|(blocks, slots)| BlockAllocation {
blocks,
slots,
block_allocator: self.clone(),
})
}

pub(crate) fn free(&self, blocks: Vec<u32>) {
self.block_allocator
.send(BlockAllocatorCommand::Free { blocks })
.unwrap();
}
}

async fn block_allocator_task(
blocks: u32,
block_size: u32,
window_size: Option<u32>,
mut receiver: mpsc::UnboundedReceiver<BlockAllocatorCommand>,
) {
// Block 0 is reserved for health checks
let mut free_blocks: Vec<u32> = (1..blocks).collect();
while let Some(cmd) = receiver.recv().await {
match cmd {
BlockAllocatorCommand::Free { blocks } => free_blocks.extend(blocks),
BlockAllocatorCommand::Allocate {
tokens,
response_sender,
} => {
// Apply window size
let (required_blocks, repeats) = {
let (tokens, repeats) = match window_size {
None => (tokens, 1),
Some(window_size) => {
let repeats = (tokens + window_size - 1) / window_size;
let tokens = min(tokens, window_size);
(tokens, repeats as usize)
}
};
// Pad to a multiple of block size
let required_blocks = (tokens + block_size - 1) / block_size;
(required_blocks, repeats)
};

let tokens = tokens as usize;
let allocation = if required_blocks > free_blocks.len() as u32 {
None
} else {
let blocks =
free_blocks.split_off(free_blocks.len() - required_blocks as usize);
let mut slots = Vec::with_capacity(
(required_blocks * block_size * repeats as u32) as usize,
);

'slots: for block_id in blocks.repeat(repeats).iter() {
for s in (block_id * block_size)..((block_id + 1) * block_size) {
slots.push(s);
}
if slots.len() == tokens {
break 'slots;
}
}
Some((blocks, slots))
};
response_sender.send(allocation).unwrap();
}
}
}
}

#[derive(Debug)]
enum BlockAllocatorCommand {
Free {
blocks: Vec<u32>,
},
Allocate {
tokens: u32,
response_sender: oneshot::Sender<Option<(Vec<u32>, Vec<u32>)>>,
},
}
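A minimal sketch of the sizing arithmetic above, under assumed numbers (block_size = 16, window_size = 256, tokens = 600); required_blocks here is a hypothetical standalone helper, not part of this PR. With a sliding window, only min(tokens, window_size) positions need distinct cache slots, and the allocated blocks are repeated ceil(tokens / window_size) times when the slot list is emitted. Note also that BlockAllocation returns its blocks to the background task automatically via its Drop impl.

fn required_blocks(tokens: u32, block_size: u32, window_size: Option<u32>) -> (u32, usize) {
    // Mirrors the match in block_allocator_task: cap tokens at the window,
    // count how many times the window wraps, then ceil-divide by block size.
    let (tokens, repeats) = match window_size {
        None => (tokens, 1),
        Some(w) => (tokens.min(w), ((tokens + w - 1) / w) as usize),
    };
    ((tokens + block_size - 1) / block_size, repeats)
}

fn main() {
    // 600 tokens with a 256-token window: 256 distinct positions -> 16 blocks, emitted 3 times.
    assert_eq!(required_blocks(600, 16, Some(256)), (16, 3));
    // No window: ceil(600 / 16) = 38 blocks, used once.
    assert_eq!(required_blocks(600, 16, None), (38, 1));
}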
1 change: 1 addition & 0 deletions router/src/infer/v3/mod.rs
@@ -1,3 +1,4 @@
mod block_allocator;
mod queue;
mod scheduler;
