Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt for cleaner implementation #1

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions src/distributing_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ pub struct DistributingIterator<T, ID> {
pos: usize,
original_size: usize,
spread: usize,
queue_per_id: Option<FnvHashMap<ID, VecDeque<T>>>,
queue_per_id: FnvHashMap<ID, VecDeque<T>>,
last_pos: IndexMap<ID, usize>,
iterator_reached_end: bool,
id_func: Box<dyn Fn(&T) -> ID + Send>,
}

impl<T, ID> DistributingIterator<T, ID>
where
ID: Eq + std::hash::Hash,
ID: Eq + std::hash::Hash + Clone,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not please

{
pub fn new(
data: VecDeque<T>,
Expand All @@ -59,7 +59,7 @@ where
spread,
pos: 0,
id_func: Box::new(id_func),
queue_per_id: Some(FnvHashMap::default()),
queue_per_id: FnvHashMap::default(),
last_pos: IndexMap::default(),
iterator_reached_end: false,
}
Expand All @@ -80,17 +80,17 @@ where
}

fn get_next_item(&mut self) -> Option<T> {
let mut queue_per_id = self.queue_per_id.take().unwrap();
let mut adjust_spread = false;

let result = loop {
let mut result = None;
for id in self.sorted_spreadable_ids() {
match queue_per_id.get_mut(id) {
let ids: Vec<ID> = self.sorted_spreadable_ids().map(|id| id.clone()).collect();
for id in ids.iter() {
match self.queue_per_id.get_mut(id) {
Some(queue) => {
if let Some(item) = queue.pop_front() {
if self.iterator_reached_end && queue.is_empty() {
queue_per_id.remove(id);
self.queue_per_id.remove(id);
adjust_spread = true
}
result = Some(item);
Expand All @@ -105,7 +105,7 @@ where
}

if self.iterator_reached_end {
if queue_per_id.values().flatten().any(|_| true) {
if self.queue_per_id.values().flatten().any(|_| true) {
panic!(
"Nothing can be returned even though the queue is not empty. This is a bug"
);
Expand All @@ -121,22 +121,21 @@ where
break Some(item);
} else {
// queue_per_id.entry(id).or_default().push_back(item);
queue_per_id
self.queue_per_id
.entry(id)
.or_insert_with(|| VecDeque::with_capacity(100))
.push_back(item);
}
}
None => {
self.spread = Self::calculate_spread(&queue_per_id);
self.spread = Self::calculate_spread(&self.queue_per_id);
self.iterator_reached_end = true;
}
}
};
if adjust_spread {
self.spread = Self::calculate_spread(&queue_per_id);
self.spread = Self::calculate_spread(&self.queue_per_id);
}
self.queue_per_id = Some(queue_per_id);
result
}

Expand All @@ -158,7 +157,7 @@ where
impl<T, ID> Iterator for DistributingIterator<T, ID>
where
T: std::fmt::Debug,
ID: Eq + std::hash::Hash + std::fmt::Debug,
ID: Eq + std::hash::Hash + std::fmt::Debug + Clone,
{
type Item = T;

Expand Down Expand Up @@ -223,4 +222,12 @@ mod tests {
]
);
}

#[test]
fn test_empty_array() {
let data: Vec<Item> = vec![];
let iterator = DistributingIterator::new(data.into(), 3, |item| item.id);
let data: Vec<_> = iterator.collect();
assert_eq!(data, vec![]);
}
}
Loading