Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Translator restart if disconnected from upstream #1001

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion roles/translator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ error_handling = { version = "1.0.0", path = "../../utils/error-handling" }
key-utils = { version = "^1.0.0", path = "../../utils/key-utils" }
tokio-util = { version = "0.7.10", features = ["codec"] }
async-compat = "0.2.1"
rand = "0.8.4"


[dev-dependencies]
rand = "0.8.4"
sha2 = "0.10.6"

[features]
Expand Down
40 changes: 35 additions & 5 deletions roles/translator/src/lib/downstream_sv1/downstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use async_std::{
};
use error_handling::handle_result;
use futures::FutureExt;
use tokio::sync::broadcast;
use tokio::{sync::broadcast, task::AbortHandle};

use super::{kill, DownstreamMessages, SubmitShareWithChannelId, SUBSCRIBE_TIMEOUT_SECS};

Expand Down Expand Up @@ -110,6 +110,7 @@ impl Downstream {
host: String,
difficulty_config: DownstreamDifficultyConfig,
upstream_difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
) {
let stream = std::sync::Arc::new(stream);

Expand Down Expand Up @@ -150,11 +151,12 @@ impl Downstream {
let rx_shutdown_clone = rx_shutdown.clone();
let tx_shutdown_clone = tx_shutdown.clone();
let tx_status_reader = tx_status.clone();
let task_collector_mining_device = task_collector.clone();
// Task to read from SV1 Mining Device Client socket via `socket_reader`. Depending on the
// SV1 message received, a message response is sent directly back to the SV1 Downstream
// role, or the message is sent upwards to the Bridge for translation into a SV2 message
// and then sent to the SV2 Upstream role.
let _socket_reader_task = task::spawn(async move {
let socket_reader_task = tokio::task::spawn(async move {
let reader = BufReader::new(&*socket_reader);
let mut messages = FramedRead::new(
async_compat::Compat::new(reader),
Expand Down Expand Up @@ -205,15 +207,22 @@ impl Downstream {
kill(&tx_shutdown_clone).await;
warn!("Downstream: Shutting down sv1 downstream reader");
});
let _ = task_collector_mining_device.safe_lock(|a| {
a.push((
socket_reader_task.abort_handle(),
"socket_reader_task".to_string(),
))
});

let rx_shutdown_clone = rx_shutdown.clone();
let tx_shutdown_clone = tx_shutdown.clone();
let tx_status_writer = tx_status.clone();
let host_ = host.clone();

let task_collector_new_sv1_message_no_transl = task_collector.clone();
// Task to receive SV1 message responses to SV1 messages that do NOT need translation.
// These response messages are sent directly to the SV1 Downstream role.
let _socket_writer_task = task::spawn(async move {
let socket_writer_task = tokio::task::spawn(async move {
loop {
select! {
res = receiver_outgoing.recv().fuse() => {
Expand Down Expand Up @@ -242,11 +251,18 @@ impl Downstream {
&host_
);
});
let _ = task_collector_new_sv1_message_no_transl.safe_lock(|a| {
a.push((
socket_writer_task.abort_handle(),
"socket_writer_task".to_string(),
))
GitGab19 marked this conversation as resolved.
Show resolved Hide resolved
});

let tx_status_notify = tx_status;
let self_ = downstream.clone();

let _notify_task = task::spawn(async move {
let task_collector_notify_task = task_collector.clone();
let notify_task = tokio::task::spawn(async move {
let timeout_timer = std::time::Instant::now();
let mut first_sent = false;
loop {
Expand Down Expand Up @@ -329,10 +345,14 @@ impl Downstream {
&host
);
});

let _ = task_collector_notify_task
.safe_lock(|a| a.push((notify_task.abort_handle(), "notify_task".to_string())));
}

/// Accept connections from one or more SV1 Downstream roles (SV1 Mining Devices) and create a
/// new `Downstream` for each connection.
#[allow(clippy::too_many_arguments)]
pub fn accept_connections(
downstream_addr: SocketAddr,
tx_sv1_submit: Sender<DownstreamMessages>,
Expand All @@ -341,8 +361,11 @@ impl Downstream {
bridge: Arc<Mutex<crate::proxy::Bridge>>,
downstream_difficulty_config: DownstreamDifficultyConfig,
upstream_difficulty_config: Arc<Mutex<UpstreamDifficultyConfig>>,
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
) {
task::spawn(async move {
let task_collector_downstream = task_collector.clone();

let accept_connections = tokio::task::spawn(async move {
let downstream_listener = TcpListener::bind(downstream_addr).await.unwrap();
let mut downstream_incoming = downstream_listener.incoming();

Expand All @@ -369,6 +392,7 @@ impl Downstream {
host,
downstream_difficulty_config.clone(),
upstream_difficulty_config.clone(),
task_collector_downstream.clone(),
)
.await;
}
Expand All @@ -378,6 +402,12 @@ impl Downstream {
}
}
});
let _ = task_collector.safe_lock(|a| {
a.push((
accept_connections.abort_handle(),
"accept_connections".to_string(),
))
});
}

/// As SV1 messages come in, determines if the message response needs to be translated to SV2
Expand Down
38 changes: 33 additions & 5 deletions roles/translator/src/lib/proxy/bridge.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use async_channel::{Receiver, Sender};
use async_std::task;
use roles_logic_sv2::{
channel_logic::channel_factory::{ExtendedChannelKind, ProxyExtendedChannelFactory, Share},
mining_sv2::{
Expand All @@ -9,7 +8,7 @@ use roles_logic_sv2::{
utils::{GroupId, Mutex},
};
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::{sync::broadcast, task::AbortHandle};
use v1::{client_to_server::Submit, server_to_client, utils::HexU32Be};

use super::super::{
Expand Down Expand Up @@ -64,6 +63,7 @@ pub struct Bridge {
last_p_hash: Option<SetNewPrevHash<'static>>,
target: Arc<Mutex<Vec<u8>>>,
last_job_id: u32,
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
}

impl Bridge {
Expand All @@ -79,6 +79,7 @@ impl Bridge {
extranonces: ExtendedExtranonce,
target: Arc<Mutex<Vec<u8>>>,
up_id: u32,
task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>,
) -> Arc<Mutex<Self>> {
let ids = Arc::new(Mutex::new(GroupId::new()));
let share_per_min = 1.0;
Expand Down Expand Up @@ -107,6 +108,7 @@ impl Bridge {
last_p_hash: None,
target,
last_job_id: 0,
task_collector,
}))
}

Expand Down Expand Up @@ -162,10 +164,12 @@ impl Bridge {
/// Receives a `DownstreamMessages` message from the `Downstream`, handles based on the
/// variant received.
fn handle_downstream_messages(self_: Arc<Mutex<Self>>) {
let task_collector_handle_downstream =
self_.safe_lock(|b| b.task_collector.clone()).unwrap();
let (rx_sv1_downstream, tx_status) = self_
.safe_lock(|s| (s.rx_sv1_downstream.clone(), s.tx_status.clone()))
.unwrap();
task::spawn(async move {
let handle_downstream = tokio::task::spawn(async move {
loop {
let msg = handle_result!(tx_status, rx_sv1_downstream.clone().recv().await);

Expand All @@ -185,6 +189,12 @@ impl Bridge {
};
}
});
let _ = task_collector_handle_downstream.safe_lock(|a| {
a.push((
handle_downstream.abort_handle(),
"handle_downstream_message".to_string(),
))
});
}
/// receives a `SetDownstreamTarget` and updates the downstream target for the channel
#[allow(clippy::result_large_err)]
Expand Down Expand Up @@ -367,6 +377,8 @@ impl Bridge {
/// corresponding `job_id` has already been received. If this is not the case, an error has
/// occurred on the Upstream pool role and the connection will close.
fn handle_new_prev_hash(self_: Arc<Mutex<Self>>) {
let task_collector_handle_new_prev_hash =
self_.safe_lock(|b| b.task_collector.clone()).unwrap();
let (tx_sv1_notify, rx_sv2_set_new_prev_hash, tx_status) = self_
.safe_lock(|s| {
(
Expand All @@ -377,7 +389,7 @@ impl Bridge {
})
.unwrap();
debug!("Starting handle_new_prev_hash task");
task::spawn(async move {
let handle_new_prev_hash = tokio::task::spawn(async move {
loop {
// Receive `SetNewPrevHash` from `Upstream`
let sv2_set_new_prev_hash: SetNewPrevHash =
Expand All @@ -397,6 +409,12 @@ impl Bridge {
)
}
});
let _ = task_collector_handle_new_prev_hash.safe_lock(|a| {
a.push((
handle_new_prev_hash.abort_handle(),
"handle_new_prev_hash".to_string(),
))
});
}

async fn handle_new_extended_mining_job_(
Expand Down Expand Up @@ -460,6 +478,8 @@ impl Bridge {
/// `SetNewPrevHash` `job_id`, an error has occurred on the Upstream pool role and the
/// connection will close.
fn handle_new_extended_mining_job(self_: Arc<Mutex<Self>>) {
let task_collector_new_extended_mining_job =
self_.safe_lock(|b| b.task_collector.clone()).unwrap();
let (tx_sv1_notify, rx_sv2_new_ext_mining_job, tx_status) = self_
.safe_lock(|s| {
(
Expand All @@ -470,7 +490,7 @@ impl Bridge {
})
.unwrap();
debug!("Starting handle_new_extended_mining_job task");
task::spawn(async move {
let handle_new_extended_mining_job = tokio::task::spawn(async move {
loop {
// Receive `NewExtendedMiningJob` from `Upstream`
let sv2_new_extended_mining_job: NewExtendedMiningJob = handle_result!(
Expand All @@ -494,6 +514,12 @@ impl Bridge {
.store(true, std::sync::atomic::Ordering::SeqCst);
}
});
let _ = task_collector_new_extended_mining_job.safe_lock(|a| {
a.push((
handle_new_extended_mining_job.abort_handle(),
"handle_new_extended_mining_job".to_string(),
))
});
}
}
pub struct OpenSv1Downstream {
Expand Down Expand Up @@ -543,6 +569,7 @@ mod test {
rx_sv1_notify,
};

let task_collector = Arc::new(Mutex::new(vec![]));
let b = Bridge::new(
rx_sv1_submit,
tx_sv2_submit_shares_ext,
Expand All @@ -553,6 +580,7 @@ mod test {
extranonces,
Arc::new(Mutex::new(upstream_target)),
1,
task_collector,
);
(b, interface)
}
Expand Down
24 changes: 17 additions & 7 deletions roles/translator/src/lib/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub enum State<'a> {
DownstreamShutdown(Error<'a>),
BridgeShutdown(Error<'a>),
UpstreamShutdown(Error<'a>),
UpstreamTryReconnect(Error<'a>),
Healthy(String),
}

Expand Down Expand Up @@ -83,13 +84,22 @@ async fn send_status(
.await
.unwrap_or(());
}
Sender::Upstream(tx) => {
tx.send(Status {
state: State::UpstreamShutdown(e),
})
.await
.unwrap_or(());
}
Sender::Upstream(tx) => match e {
Error::ChannelErrorReceiver(_) => {
tx.send(Status {
state: State::UpstreamTryReconnect(e),
})
.await
.unwrap_or(());
}
_ => {
tx.send(Status {
state: State::UpstreamShutdown(e),
})
.await
.unwrap_or(());
}
},
Sender::TemplateReceiver(tx) => {
tx.send(Status {
state: State::UpstreamShutdown(e),
Expand Down
Loading
Loading