Merge pull request #1001 from lorbax/translator-restart-if-disconnected-from-upstream

Translator restart if disconnected from upstream
pavlenex authored Aug 13, 2024
2 parents 2541703 + a74262a commit ee63dc5
Showing 6 changed files with 238 additions and 91 deletions.
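
This PR threads a shared task collector, an Arc<Mutex<Vec<(AbortHandle, String)>>>, through the translator's downstream, bridge, and upstream components so that every spawned task registers a labelled abort handle. When the upstream connection is lost, the collected tasks can all be aborted and the translator restarted instead of shut down. The sketch below illustrates only the collect-then-abort pattern: it uses std::sync::Mutex with plain lock() in place of the repository's safe_lock wrapper, and the task bodies, names, and the abort_all helper are placeholders rather than code from this diff.

use std::sync::{Arc, Mutex};
use tokio::task::AbortHandle;

// Shared collector: each spawned task registers its AbortHandle plus a label.
type TaskCollector = Arc<Mutex<Vec<(AbortHandle, String)>>>;

fn spawn_and_register(collector: &TaskCollector, name: &str) {
    // Placeholder task body; the real tasks are the translator's reader,
    // writer, notify, and handler loops.
    let task = tokio::task::spawn(async {
        loop {
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    });
    collector
        .lock()
        .unwrap()
        .push((task.abort_handle(), name.to_string()));
}

// On an upstream disconnect, abort everything that was registered so the
// translator can be restarted with a fresh set of tasks.
fn abort_all(collector: &TaskCollector) {
    for (handle, name) in collector.lock().unwrap().drain(..) {
        eprintln!("aborting task: {name}");
        handle.abort();
    }
}

#[tokio::main]
async fn main() {
    let collector: TaskCollector = Arc::new(Mutex::new(Vec::new()));
    spawn_and_register(&collector, "socket_reader_task");
    spawn_and_register(&collector, "notify_task");
    abort_all(&collector);
}
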
2 changes: 1 addition & 1 deletion roles/translator/Cargo.toml
@@ -34,10 +34,10 @@ error_handling = { version = "1.0.0", path = "../../utils/error-handling" }
key-utils = { version = "^1.0.0", path = "../../utils/key-utils" }
tokio-util = { version = "0.7.10", features = ["codec"] }
async-compat = "0.2.1"
rand = "0.8.4"


[dev-dependencies]
rand = "0.8.4"
sha2 = "0.10.6"

[features]
40 changes: 35 additions & 5 deletions roles/translator/src/lib/downstream_sv1/downstream.rs
@@ -13,7 +13,7 @@ use async_std::{
};
use error_handling::handle_result;
use futures::FutureExt;
use tokio::sync::broadcast;
use tokio::{sync::broadcast, task::AbortHandle};

use super::{kill, DownstreamMessages, SubmitShareWithChannelId, SUBSCRIBE_TIMEOUT_SECS};

@@ -110,6 +110,7 @@ impl Downstream {
host: String,
difficulty_config: DownstreamDifficultyConfig,
upstream_difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
) {
let stream = std::sync::Arc::new(stream);

@@ -150,11 +151,12 @@ impl Downstream {
let rx_shutdown_clone = rx_shutdown.clone();
let tx_shutdown_clone = tx_shutdown.clone();
let tx_status_reader = tx_status.clone();
let task_collector_mining_device = task_collector.clone();
// Task to read from SV1 Mining Device Client socket via `socket_reader`. Depending on the
// SV1 message received, a message response is sent directly back to the SV1 Downstream
// role, or the message is sent upwards to the Bridge for translation into a SV2 message
// and then sent to the SV2 Upstream role.
let _socket_reader_task = task::spawn(async move {
let socket_reader_task = tokio::task::spawn(async move {
let reader = BufReader::new(&*socket_reader);
let mut messages = FramedRead::new(
async_compat::Compat::new(reader),
@@ -205,15 +207,22 @@
kill(&tx_shutdown_clone).await;
warn!("Downstream: Shutting down sv1 downstream reader");
});
let _ = task_collector_mining_device.safe_lock(|a| {
a.push((
socket_reader_task.abort_handle(),
"socket_reader_task".to_string(),
))
});

let rx_shutdown_clone = rx_shutdown.clone();
let tx_shutdown_clone = tx_shutdown.clone();
let tx_status_writer = tx_status.clone();
let host_ = host.clone();

let task_collector_new_sv1_message_no_transl = task_collector.clone();
// Task to receive SV1 message responses to SV1 messages that do NOT need translation.
// These response messages are sent directly to the SV1 Downstream role.
let _socket_writer_task = task::spawn(async move {
let socket_writer_task = tokio::task::spawn(async move {
loop {
select! {
res = receiver_outgoing.recv().fuse() => {
@@ -242,11 +251,18 @@
&host_
);
});
let _ = task_collector_new_sv1_message_no_transl.safe_lock(|a| {
a.push((
socket_writer_task.abort_handle(),
"socket_writer_task".to_string(),
))
});

let tx_status_notify = tx_status;
let self_ = downstream.clone();

let _notify_task = task::spawn(async move {
let task_collector_notify_task = task_collector.clone();
let notify_task = tokio::task::spawn(async move {
let timeout_timer = std::time::Instant::now();
let mut first_sent = false;
loop {
@@ -329,10 +345,14 @@
&host
);
});

let _ = task_collector_notify_task
.safe_lock(|a| a.push((notify_task.abort_handle(), "notify_task".to_string())));
}

/// Accept connections from one or more SV1 Downstream roles (SV1 Mining Devices) and create a
/// new `Downstream` for each connection.
#[allow(clippy::too_many_arguments)]
pub fn accept_connections(
downstream_addr: SocketAddr,
tx_sv1_submit: Sender<DownstreamMessages>,
@@ -341,8 +361,11 @@
bridge: Arc<Mutex<crate::proxy::Bridge>>,
downstream_difficulty_config: DownstreamDifficultyConfig,
upstream_difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
) {
task::spawn(async move {
let task_collector_downstream = task_collector.clone();

let accept_connections = tokio::task::spawn(async move {
let downstream_listener = TcpListener::bind(downstream_addr).await.unwrap();
let mut downstream_incoming = downstream_listener.incoming();

@@ -369,6 +392,7 @@
host,
downstream_difficulty_config.clone(),
upstream_difficulty_config.clone(),
task_collector_downstream.clone(),
)
.await;
}
@@ -378,6 +402,12 @@
}
}
});
let _ = task_collector.safe_lock(|a| {
a.push((
accept_connections.abort_handle(),
"accept_connections".to_string(),
))
});
}

/// As SV1 messages come in, determines if the message response needs to be translated to SV2
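
In downstream.rs, each task is now spawned with tokio::task::spawn and its abort handle is pushed into the collector under a descriptive label (socket_reader_task, socket_writer_task, notify_task, accept_connections). A rough sketch of the per-connection registration follows, under the same assumptions as above (std Mutex instead of safe_lock, placeholder connection handling):

use std::sync::{Arc, Mutex};
use tokio::{net::TcpListener, task::AbortHandle};

type TaskCollector = Arc<Mutex<Vec<(AbortHandle, String)>>>;

// Hypothetical accept loop: the listener task and every per-connection task
// register themselves in the shared collector. Connection handling is a
// placeholder; the real code spawns the reader, writer, and notify tasks here.
async fn accept_connections(addr: String, task_collector: TaskCollector) {
    let collector_for_listener = task_collector.clone();
    let listener_task = tokio::task::spawn(async move {
        let listener = TcpListener::bind(addr.as_str()).await.unwrap();
        loop {
            let (stream, _peer) = listener.accept().await.unwrap();
            let reader_task = tokio::task::spawn(async move {
                // ... read SV1 messages from `stream`, forward to the bridge ...
                drop(stream);
            });
            collector_for_listener.lock().unwrap().push((
                reader_task.abort_handle(),
                "socket_reader_task".to_string(),
            ));
        }
    });
    task_collector.lock().unwrap().push((
        listener_task.abort_handle(),
        "accept_connections".to_string(),
    ));
}
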
38 changes: 33 additions & 5 deletions roles/translator/src/lib/proxy/bridge.rs
@@ -1,5 +1,4 @@
use async_channel::{Receiver, Sender};
use async_std::task;
use roles_logic_sv2::{
channel_logic::channel_factory::{ExtendedChannelKind, ProxyExtendedChannelFactory, Share},
mining_sv2::{
@@ -9,7 +8,7 @@ use roles_logic_sv2::{
utils::{GroupId, Mutex},
};
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::{sync::broadcast, task::AbortHandle};
use v1::{client_to_server::Submit, server_to_client, utils::HexU32Be};

use super::super::{
@@ -64,6 +63,7 @@ pub struct Bridge {
last_p_hash: Option<SetNewPrevHash<'static>>,
target: Arc<Mutex<Vec<u8>>>,
last_job_id: u32,
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
}

impl Bridge {
@@ -79,6 +79,7 @@ impl Bridge {
extranonces: ExtendedExtranonce,
target: Arc<Mutex<Vec<u8>>>,
up_id: u32,
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
) -> Arc<Mutex<Self>> {
let ids = Arc::new(Mutex::new(GroupId::new()));
let share_per_min = 1.0;
@@ -107,6 +108,7 @@
last_p_hash: None,
target,
last_job_id: 0,
task_collector,
}))
}

@@ -162,10 +164,12 @@ impl Bridge {
/// Receives a `DownstreamMessages` message from the `Downstream`, handles based on the
/// variant received.
fn handle_downstream_messages(self_: Arc<Mutex<Self>>) {
let task_collector_handle_downstream =
self_.safe_lock(|b| b.task_collector.clone()).unwrap();
let (rx_sv1_downstream, tx_status) = self_
.safe_lock(|s| (s.rx_sv1_downstream.clone(), s.tx_status.clone()))
.unwrap();
task::spawn(async move {
let handle_downstream = tokio::task::spawn(async move {
loop {
let msg = handle_result!(tx_status, rx_sv1_downstream.clone().recv().await);

@@ -185,6 +189,12 @@
};
}
});
let _ = task_collector_handle_downstream.safe_lock(|a| {
a.push((
handle_downstream.abort_handle(),
"handle_downstream_message".to_string(),
))
});
}
/// receives a `SetDownstreamTarget` and updates the downstream target for the channel
#[allow(clippy::result_large_err)]
@@ -367,6 +377,8 @@ impl Bridge {
/// corresponding `job_id` has already been received. If this is not the case, an error has
/// occurred on the Upstream pool role and the connection will close.
fn handle_new_prev_hash(self_: Arc<Mutex<Self>>) {
let task_collector_handle_new_prev_hash =
self_.safe_lock(|b| b.task_collector.clone()).unwrap();
let (tx_sv1_notify, rx_sv2_set_new_prev_hash, tx_status) = self_
.safe_lock(|s| {
(
@@ -377,7 +389,7 @@
})
.unwrap();
debug!("Starting handle_new_prev_hash task");
task::spawn(async move {
let handle_new_prev_hash = tokio::task::spawn(async move {
loop {
// Receive `SetNewPrevHash` from `Upstream`
let sv2_set_new_prev_hash: SetNewPrevHash =
@@ -397,6 +409,12 @@
)
}
});
let _ = task_collector_handle_new_prev_hash.safe_lock(|a| {
a.push((
handle_new_prev_hash.abort_handle(),
"handle_new_prev_hash".to_string(),
))
});
}

async fn handle_new_extended_mining_job_(
@@ -460,6 +478,8 @@
/// `SetNewPrevHash` `job_id`, an error has occurred on the Upstream pool role and the
/// connection will close.
fn handle_new_extended_mining_job(self_: Arc<Mutex<Self>>) {
let task_collector_new_extended_mining_job =
self_.safe_lock(|b| b.task_collector.clone()).unwrap();
let (tx_sv1_notify, rx_sv2_new_ext_mining_job, tx_status) = self_
.safe_lock(|s| {
(
@@ -470,7 +490,7 @@
})
.unwrap();
debug!("Starting handle_new_extended_mining_job task");
task::spawn(async move {
let handle_new_extended_mining_job = tokio::task::spawn(async move {
loop {
// Receive `NewExtendedMiningJob` from `Upstream`
let sv2_new_extended_mining_job: NewExtendedMiningJob = handle_result!(
@@ -494,6 +514,12 @@
.store(true, std::sync::atomic::Ordering::SeqCst);
}
});
let _ = task_collector_new_extended_mining_job.safe_lock(|a| {
a.push((
handle_new_extended_mining_job.abort_handle(),
"handle_new_extended_mining_job".to_string(),
))
});
}
}
pub struct OpenSv1Downstream {
@@ -543,6 +569,7 @@ mod test {
rx_sv1_notify,
};

let task_collector = Arc::new(Mutex::new(vec![]));
let b = Bridge::new(
rx_sv1_submit,
tx_sv2_submit_shares_ext,
@@ -553,6 +580,7 @@
extranonces,
Arc::new(Mutex::new(upstream_target)),
1,
task_collector,
);
(b, interface)
}
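
In bridge.rs, the Bridge struct now stores the task collector, so each handler loop (handle_downstream_messages, handle_new_prev_hash, handle_new_extended_mining_job) registers itself when spawned. Below is a hypothetical helper expressing that idea; the Handlers struct and spawn_registered method are illustrative and not part of this diff:

use std::future::Future;
use std::sync::{Arc, Mutex};
use tokio::task::AbortHandle;

type TaskCollector = Arc<Mutex<Vec<(AbortHandle, String)>>>;

// A struct that owns the collector (as Bridge now does) and registers every
// handler loop it spawns under a descriptive label.
struct Handlers {
    task_collector: TaskCollector,
}

impl Handlers {
    fn spawn_registered<F>(&self, name: &str, fut: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        let handle = tokio::task::spawn(fut);
        self.task_collector
            .lock()
            .unwrap()
            .push((handle.abort_handle(), name.to_string()));
    }
}

#[tokio::main]
async fn main() {
    let handlers = Handlers {
        task_collector: Arc::new(Mutex::new(Vec::new())),
    };
    // Register loops under the same labels the Bridge uses, with placeholder bodies.
    handlers.spawn_registered("handle_downstream_message", async { /* recv loop */ });
    handlers.spawn_registered("handle_new_prev_hash", async { /* recv loop */ });
    handlers.spawn_registered("handle_new_extended_mining_job", async { /* recv loop */ });
}
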
24 changes: 17 additions & 7 deletions roles/translator/src/lib/status.rs
@@ -48,6 +48,7 @@ pub enum State<'a> {
DownstreamShutdown(Error<'a>),
BridgeShutdown(Error<'a>),
UpstreamShutdown(Error<'a>),
UpstreamTryReconnect(Error<'a>),
Healthy(String),
}

@@ -83,13 +84,22 @@ async fn send_status(
.await
.unwrap_or(());
}
Sender::Upstream(tx) => {
tx.send(Status {
state: State::UpstreamShutdown(e),
})
.await
.unwrap_or(());
}
Sender::Upstream(tx) => match e {
Error::ChannelErrorReceiver(_) => {
tx.send(Status {
state: State::UpstreamTryReconnect(e),
})
.await
.unwrap_or(());
}
_ => {
tx.send(Status {
state: State::UpstreamShutdown(e),
})
.await
.unwrap_or(());
}
},
Sender::TemplateReceiver(tx) => {
tx.send(Status {
state: State::UpstreamShutdown(e),
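
In status.rs, send_status now distinguishes a failed receive channel from other upstream errors: Error::ChannelErrorReceiver is reported as the new State::UpstreamTryReconnect variant, while everything else still maps to State::UpstreamShutdown. A supervising loop outside this diff can then treat the first case as "abort the collected tasks and reconnect" rather than "shut down". The sketch below is an assumed illustration of such a reaction with simplified stand-in types; it is not the translator's actual status handling:

use std::sync::{Arc, Mutex};
use tokio::task::AbortHandle;

type TaskCollector = Arc<Mutex<Vec<(AbortHandle, String)>>>;

// Simplified stand-ins for the status variants touched by this diff.
enum State {
    UpstreamShutdown(String),
    UpstreamTryReconnect(String),
    Healthy(String),
}

// Hypothetical supervisor reaction: a reconnectable error aborts the collected
// tasks so the translator can be restarted; everything else shuts down.
// Returns true when the caller should re-run the start-up routine.
fn on_status(state: State, task_collector: &TaskCollector) -> bool {
    match state {
        State::UpstreamTryReconnect(e) => {
            eprintln!("upstream error, restarting translator: {e}");
            for (handle, name) in task_collector.lock().unwrap().drain(..) {
                eprintln!("aborting {name}");
                handle.abort();
            }
            true
        }
        State::UpstreamShutdown(e) => {
            eprintln!("upstream error, shutting down: {e}");
            false
        }
        State::Healthy(msg) => {
            eprintln!("healthy: {msg}");
            true
        }
    }
}

fn main() {
    let collector: TaskCollector = Arc::new(Mutex::new(Vec::new()));
    let keep_running = on_status(
        State::UpstreamTryReconnect("connection dropped".to_string()),
        &collector,
    );
    assert!(keep_running);
}
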
(Diffs for the remaining 2 changed files did not load and are not shown.)
