Skip to content

Commit

Permalink
[chore] mechanical cleanup
Browse files Browse the repository at this point in the history
- Move loglet mod to follow new file-style module as it contains type defs and logic
- Removal of dead_code attribute and cleanup of unused types/structs/vars/etc.

```
// left intentionally to make sapling+github happy
```
  • Loading branch information
AhmedSoliman committed Dec 17, 2024
1 parent 3da5356 commit 6d4c66b
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,10 @@ pub trait LogletReadStream: Stream<Item = Result<LogEntry<LogletOffset>, Operati

pub type SendableLogletReadStream = Pin<Box<dyn LogletReadStream + Send>>;

#[allow(dead_code)]
pub(crate) struct LogletCommitResolver {
pub struct LogletCommitResolver {
tx: oneshot::Sender<Result<LogletOffset, AppendError>>,
}

#[allow(dead_code)]
impl LogletCommitResolver {
pub fn sealed(self) {
let _ = self.tx.send(Err(AppendError::Sealed));
Expand All @@ -179,20 +177,19 @@ pub struct LogletCommit {
}

impl LogletCommit {
pub(crate) fn sealed() -> Self {
pub fn sealed() -> Self {
let (tx, rx) = oneshot::channel();
let _ = tx.send(Err(AppendError::Sealed));
Self { rx }
}

pub(crate) fn resolved(offset: LogletOffset) -> Self {
pub fn resolved(offset: LogletOffset) -> Self {
let (tx, rx) = oneshot::channel();
let _ = tx.send(Ok(offset));
Self { rx }
}

#[allow(dead_code)]
pub(crate) fn deferred() -> (Self, LogletCommitResolver) {
pub fn deferred() -> (Self, LogletCommitResolver) {
let (tx, rx) = oneshot::channel();
(Self { rx }, LogletCommitResolver { tx })
}
Expand Down
2 changes: 0 additions & 2 deletions crates/bifrost/src/providers/replicated_loglet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ pub(crate) mod metric_definitions;
mod network;
mod provider;
mod read_path;
#[allow(dead_code)]
mod remote_sequencer;
pub mod replication;
mod rpc_routers;
#[allow(dead_code)]
pub mod sequencer;
mod tasks;
#[cfg(any(test, feature = "test-util"))]
Expand Down
54 changes: 10 additions & 44 deletions crates/bifrost/src/providers/replicated_loglet/remote_sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,27 +401,6 @@ impl RemoteSequencerConnection {
}
}

struct MaybeTerminalAppendError {
pub original: AppendError,
pub terminal: bool,
}

impl MaybeTerminalAppendError {
fn terminal(error: AppendError) -> Self {
Self {
original: error,
terminal: true,
}
}

fn non_terminal(error: AppendError) -> Self {
Self {
original: error,
terminal: false,
}
}
}

pub(crate) struct RemoteInflightAppend {
rpc_token: RpcToken<Appended>,
commit_resolver: LogletCommitResolver,
Expand Down Expand Up @@ -493,7 +472,7 @@ mod test {

use restate_core::{
network::{Incoming, MessageHandler, MockConnector},
TestCoreEnv, TestCoreEnvBuilder,
TestCoreEnvBuilder,
};
use restate_types::{
logs::{LogId, LogletOffset, Record, SequenceNumber, TailState},
Expand Down Expand Up @@ -552,15 +531,10 @@ mod test {
}
}

struct TestEnv {
pub core_env: TestCoreEnv<MockConnector>,
pub remote_sequencer: RemoteSequencer<MockConnector>,
}

async fn setup<F, O>(sequencer: SequencerMockHandler, test: F)
where
O: Future<Output = ()>,
F: FnOnce(TestEnv) -> O,
F: FnOnce(RemoteSequencer<MockConnector>) -> O,
{
let (connector, _receiver) = MockConnector::new(100);
let connector = Arc::new(connector);
Expand All @@ -587,30 +561,24 @@ mod test {
sequencer_rpc,
);

let core_env = builder.build().await;
let env = TestEnv {
core_env,
remote_sequencer,
};
test(env).await;
let _env = builder.build().await;
test(remote_sequencer).await;
}

#[restate_core::test]
async fn test_remote_stream_ok() {
let handler = SequencerMockHandler::default();

setup(handler, |test_env| async move {
setup(handler, |remote_sequencer| async move {
let records: Vec<Record> =
vec!["record 1".into(), "record 2".into(), "record 3".into()];

let commit_1 = test_env
.remote_sequencer
let commit_1 = remote_sequencer
.append(records.clone().into())
.await
.unwrap();

let commit_2 = test_env
.remote_sequencer
let commit_2 = remote_sequencer
.append(records.clone().into())
.await
.unwrap();
Expand All @@ -627,18 +595,16 @@ mod test {
async fn test_remote_stream_sealed() {
let handler = SequencerMockHandler::with_reply_status(SequencerStatus::Sealed);

setup(handler, |test_env| async move {
setup(handler, |remote_sequencer| async move {
let records: Vec<Record> =
vec!["record 1".into(), "record 2".into(), "record 3".into()];

let commit_1 = test_env
.remote_sequencer
let commit_1 = remote_sequencer
.append(records.clone().into())
.await
.unwrap();

let commit_2 = test_env
.remote_sequencer
let commit_2 = remote_sequencer
.append(records.clone().into())
.await
.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl<T: TransportConnect> SequencerAppender<T> {
tracing::debug!(%peer, error=%err, "Failed to send batch to node");
continue;
}
StoreTaskStatus::Sealed(_) => {
StoreTaskStatus::Sealed => {
tracing::debug!(%peer, "Store task cancelled, the node is sealed");
checker.set_attribute(peer, NodeAttributes::sealed());
continue;
Expand Down Expand Up @@ -439,7 +439,7 @@ impl NodeAttributes {

#[derive(Debug)]
enum StoreTaskStatus {
Sealed(LogletOffset),
Sealed,
Stored(Stored),
Error(NetworkError),
}
Expand Down Expand Up @@ -529,7 +529,7 @@ impl<'a, T: TransportConnect> LogServerStoreTask<'a, T> {
};

match tail_state {
TailState::Sealed(offset) => return Ok(StoreTaskStatus::Sealed(offset)),
TailState::Sealed(_) => return Ok(StoreTaskStatus::Sealed),
TailState::Open(offset) => {
match offset.cmp(&self.first_offset) {
Ordering::Equal | Ordering::Greater => {
Expand Down Expand Up @@ -558,7 +558,7 @@ impl<'a, T: TransportConnect> LogServerStoreTask<'a, T> {
match incoming.body().status {
Status::Sealing | Status::Sealed => {
server.local_tail().notify_seal();
return Ok(StoreTaskStatus::Sealed(incoming.body().header.local_tail));
return Ok(StoreTaskStatus::Sealed);
}
_ => {
// all other status types are handled by the caller
Expand Down

0 comments on commit 6d4c66b

Please sign in to comment.