
Translator restart if disconnected from upstream #1001

Conversation

lorbax
Collaborator

@lorbax lorbax commented Jun 24, 2024

When the JDC changes upstream, it disconnects the TProxy.
This PR addresses the issue by making the TProxy reconnect to the JDC.

Removed the part related to the MG test.

How to test this PR:
The idea is to have a pool that rejects a share. This can be done by tweaking the part of the code that checks the target, forcing the pool to compute a wrong hash for a share.
Following @GitGab19's suggestion, the part of the code responsible for that is roles_logic_sv2::channel_logic::channel_factory::on_submit_shares_extended(...). Since other roles check shares using the same function, modifying it directly would mean that no share at all is ever sent to the pool. If I understood correctly, the way around this is to apply @GitGab19's suggestion to a clone of on_submit_shares_extended, which we can call on_submit_shares_extended_pool. After that, modify the implementation of the handle_submit_shares_extended trait function of the ParseDownstreamMiningMessages trait in the following way:

fn handle_submit_shares_extended(
        &mut self,
        m: SubmitSharesExtended,
    ) -> Result<SendTo<()>, Error> {
        let res = self
            .channel_factory
            //.safe_lock(|cf| cf.on_submit_shares_extended(m.clone()))
            .safe_lock(|cf| cf.on_submit_shares_extended_pool(m.clone()))
            .map_err(|e| roles_logic_sv2::Error::PoisonLock(e.to_string()))?;
        ...
    }
  • run a "good" pool on port 34254, this is the pool that the jdc will fallback to.
  • run a "bad" pool with the modification above on port 44254, this pool will refuse the first share submitted to the upstream (recall to adjust the channel hashrate in the tproxy config)
  • run a jdc client with these upstreams in the config
[[upstreams]]
authority_pubkey = "9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72"
pool_address = "127.0.0.1:44254"
jd_address = "127.0.0.1:34264"
pool_signature = "Stratum v2 SRI Pool"
[[upstreams]]
authority_pubkey = "9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72"
pool_address = "127.0.0.1:34254"
jd_address = "127.0.0.1:34264"
pool_signature = "Stratum v2 SRI Pool"
  • run the downstreams (usually tProxy + SV1 miner).

@lorbax lorbax changed the base branch from main to dev on June 24, 2024 15:56
@lorbax lorbax force-pushed the translator-restart-if-disconnected-from-upstream branch from 6094b69 to b092514 on June 24, 2024 18:04
Contributor

github-actions bot commented Jun 24, 2024

🐰Bencher

Report: Tue, August 13, 2024 at 09:44:14 UTC
Project: Stratum v2 (SRI)
Branch: translator-restart-if-disconnected-from-upstream
Testbed: sv2

🚨 1 ALERT: Threshold Boundary Limit exceeded!
Benchmark | Measure (units) | Value (Δ%) | Upper Boundary (%)
🚨 client_sv2_mining_message_submit_standard | Latency (ns) | 14.70 (+0.31%) | 14.69 (100.03%)

All benchmark results
Benchmark | Latency: ns (Δ%) | Latency Upper Boundary: ns (%)
client_sv2_handle_message_common✅ (view plot)44.53 (-0.09%)45.22 (98.47%)
client_sv2_handle_message_mining✅ (view plot)73.34 (+0.24%)80.47 (91.15%)
client_sv2_mining_message_submit_standard🚨 (view plot | view alert)14.70 (+0.31%)14.69 (100.03%)
client_sv2_mining_message_submit_standard_serialize✅ (view plot)268.86 (+1.81%)283.15 (94.95%)
client_sv2_mining_message_submit_standard_serialize_deserialize✅ (view plot)607.61 (+2.15%)626.47 (96.99%)
client_sv2_open_channel✅ (view plot)160.87 (-2.95%)173.28 (92.84%)
client_sv2_open_channel_serialize✅ (view plot)271.91 (-3.42%)294.17 (92.43%)
client_sv2_open_channel_serialize_deserialize✅ (view plot)373.93 (-0.96%)419.47 (89.14%)
client_sv2_setup_connection✅ (view plot)166.61 (+1.41%)174.70 (95.37%)
client_sv2_setup_connection_serialize✅ (view plot)451.38 (-4.29%)507.43 (88.95%)
client_sv2_setup_connection_serialize_deserialize✅ (view plot)991.03 (+2.04%)1,038.14 (95.46%)


Contributor

github-actions bot commented Jun 24, 2024

🐰Bencher

Report: Tue, August 13, 2024 at 09:44:11 UTC
Project: Stratum v2 (SRI)
Branch: 1001/merge
Testbed: sv1

🚨 2 ALERTS: Threshold Boundary Limits exceeded!
Benchmark | Measure (units) | Value (Δ%) | Upper Boundary (%)
🚨 client-sv1-authorize-serialize/client-sv1-authorize-serialize | Latency (ns) | 334.35 (+33.09%) | 280.39 (119.25%)
🚨 client-sv1-get-subscribe/client-sv1-get-subscribe | Latency (ns) | 303.27 (+8.68%) | 292.72 (103.60%)

All benchmark results
Benchmark | Latency: ns (Δ%) | Latency Upper Boundary: ns (%)
client-submit-serialize✅ (view plot)6,412.60 (-6.17%)7,368.25 (87.03%)
client-submit-serialize-deserialize✅ (view plot)7,402.30 (-4.54%)8,330.19 (88.86%)
client-submit-serialize-deserialize-handle/client-submit-serialize-deserialize-handle✅ (view plot)7,998.40 (-4.19%)8,849.63 (90.38%)
client-sv1-authorize-serialize-deserialize-handle/client-sv1-authorize-serialize-deserialize-handle✅ (view plot)901.66 (+0.20%)927.66 (97.20%)
client-sv1-authorize-serialize-deserialize/client-sv1-authorize-serialize-deserialize✅ (view plot)701.82 (+0.45%)719.55 (97.54%)
client-sv1-authorize-serialize/client-sv1-authorize-serialize🚨 (view plot | view alert)334.35 (+33.09%)280.39 (119.25%)
client-sv1-get-authorize/client-sv1-get-authorize✅ (view plot)156.52 (-0.48%)162.56 (96.28%)
client-sv1-get-submit✅ (view plot)6,268.50 (-5.18%)7,144.01 (87.74%)
client-sv1-get-subscribe/client-sv1-get-subscribe🚨 (view plot | view alert)303.27 (+8.68%)292.72 (103.60%)
client-sv1-subscribe-serialize-deserialize-handle/client-sv1-subscribe-serialize-deserialize-handle✅ (view plot)764.22 (+1.57%)786.89 (97.12%)
client-sv1-subscribe-serialize-deserialize/client-sv1-subscribe-serialize-deserialize✅ (view plot)636.22 (+3.34%)640.63 (99.31%)
client-sv1-subscribe-serialize/client-sv1-subscribe-serialize✅ (view plot)201.14 (-2.29%)218.08 (92.23%)


Contributor

github-actions bot commented Jun 24, 2024

🐰Bencher

Report: Tue, August 13, 2024 at 09:44:11 UTC
Project: Stratum v2 (SRI)
Branch: translator-restart-if-disconnected-from-upstream
Testbed: sv2

🚨 3 ALERTS: Threshold Boundary Limits exceeded!
Benchmark | Measure (units) | Value (Δ%) | Upper Boundary (%)
🚨 client_sv2_handle_message_common | Estimated Cycles | 2,191.00 (+6.36%) | 2,151.57 (101.83%)
🚨 client_sv2_handle_message_common | L2 Accesses | 13.00 (+68.08%) | 12.91 (100.66%)
🚨 client_sv2_handle_message_common | RAM Accesses | 40.00 (+8.57%) | 38.98 (102.62%)

All benchmark results
Benchmark | Estimated Cycles | Instructions | L1 Accesses | L2 Accesses | RAM Accesses
(each measure: value (Δ%) and upper boundary (%))
client_sv2_handle_message_common🚨 (view plot | view alert)2,191.00 (+6.36%)2,151.57 (101.83%)✅ (view plot)473.00 (+0.39%)485.72 (97.38%)✅ (view plot)726.00 (-0.80%)753.15 (96.39%)🚨 (view plot | view alert)13.00 (+68.08%)12.91 (100.66%)🚨 (view plot | view alert)40.00 (+8.57%)38.98 (102.62%)
client_sv2_handle_message_mining✅ (view plot)8,239.00 (+0.46%)8,330.53 (98.90%)✅ (view plot)2,143.00 (+0.64%)2,171.03 (98.71%)✅ (view plot)3,169.00 (+0.68%)3,214.83 (98.57%)✅ (view plot)34.00 (-11.36%)43.42 (78.30%)✅ (view plot)140.00 (+0.79%)141.80 (98.73%)
client_sv2_mining_message_submit_standard✅ (view plot)6,366.00 (+1.37%)6,388.11 (99.65%)✅ (view plot)1,756.00 (+0.33%)1,763.40 (99.58%)✅ (view plot)2,556.00 (+0.09%)2,574.15 (99.30%)✅ (view plot)20.00 (+8.20%)25.27 (79.15%)✅ (view plot)106.00 (+2.09%)106.76 (99.29%)
client_sv2_mining_message_submit_standard_serialize✅ (view plot)14,873.00 (+0.69%)15,021.58 (99.01%)✅ (view plot)4,700.00 (+0.12%)4,707.40 (99.84%)✅ (view plot)6,758.00 (+0.05%)6,773.88 (99.77%)✅ (view plot)48.00 (-0.10%)53.97 (88.94%)✅ (view plot)225.00 (+1.27%)229.59 (98.00%)
client_sv2_mining_message_submit_standard_serialize_deserialize✅ (view plot)27,655.00 (+0.61%)27,833.27 (99.36%)✅ (view plot)10,591.00 (+0.39%)10,591.87 (99.99%)✅ (view plot)15,405.00 (+0.35%)15,408.53 (99.98%)✅ (view plot)84.00 (-0.04%)89.68 (93.67%)✅ (view plot)338.00 (+0.97%)344.58 (98.09%)
client_sv2_open_channel✅ (view plot)4,349.00 (-2.72%)4,623.74 (94.06%)✅ (view plot)1,461.00 (+0.04%)1,473.54 (99.15%)✅ (view plot)2,159.00 (+0.24%)2,172.96 (99.36%)✅ (view plot)11.00 (-7.00%)14.96 (73.51%)✅ (view plot)61.00 (-5.43%)68.71 (88.78%)
client_sv2_open_channel_serialize✅ (view plot)14,054.00 (-0.93%)14,453.56 (97.24%)✅ (view plot)5,064.00 (+0.01%)5,076.54 (99.75%)✅ (view plot)7,324.00 (+0.07%)7,338.82 (99.80%)✅ (view plot)37.00 (-1.78%)42.98 (86.09%)✅ (view plot)187.00 (-2.00%)198.81 (94.06%)
client_sv2_open_channel_serialize_deserialize✅ (view plot)22,685.00 (+0.22%)22,989.23 (98.68%)✅ (view plot)8,027.00 (+0.45%)8,031.29 (99.95%)✅ (view plot)11,670.00 (+0.39%)11,678.40 (99.93%)✅ (view plot)82.00 (+9.84%)84.92 (96.56%)✅ (view plot)303.00 (-0.30%)314.02 (96.49%)
client_sv2_setup_connection✅ (view plot)4,715.00 (+0.44%)4,759.91 (99.06%)✅ (view plot)1,502.00 (+0.04%)1,514.54 (99.17%)✅ (view plot)2,275.00 (-0.07%)2,297.55 (99.02%)✅ (view plot)12.00 (+21.33%)15.50 (77.41%)✅ (view plot)68.00 (+0.48%)69.64 (97.65%)
client_sv2_setup_connection_serialize✅ (view plot)16,134.00 (-0.58%)16,510.61 (97.72%)✅ (view plot)5,963.00 (+0.01%)5,975.54 (99.79%)✅ (view plot)8,659.00 (+0.04%)8,677.14 (99.79%)✅ (view plot)46.00 (+1.66%)50.22 (91.59%)✅ (view plot)207.00 (-1.37%)218.32 (94.82%)
client_sv2_setup_connection_serialize_deserialize✅ (view plot)35,572.00 (+0.13%)35,734.42 (99.55%)✅ (view plot)14,855.00 (+0.25%)14,859.22 (99.97%)✅ (view plot)21,817.00 (+0.25%)21,818.37 (99.99%)✅ (view plot)98.00 (-2.44%)113.53 (86.32%)✅ (view plot)379.00 (+0.02%)384.86 (98.48%)


Contributor

github-actions bot commented Jun 24, 2024

🐰Bencher

Report: Tue, August 13, 2024 at 09:44:14 UTC
Project: Stratum v2 (SRI)
Branch: translator-restart-if-disconnected-from-upstream
Testbed: sv1

🚨 3 ALERTS: Threshold Boundary Limits exceeded!
Benchmark | Measure (units) | Value (Δ%) | Upper Boundary (%)
🚨 serialize_deserialize_subscribe | RAM Accesses | 322.00 (+1.05%) | 321.59 (100.13%)
🚨 serialize_submit | RAM Accesses | 330.00 (+1.63%) | 329.22 (100.24%)
🚨 serialize_subscribe | RAM Accesses | 160.00 (+3.10%) | 159.70 (100.19%)

All benchmark results
Benchmark | Estimated Cycles | Instructions | L1 Accesses | L2 Accesses | RAM Accesses
(each measure: value (Δ%) and upper boundary (%))
get_authorize✅ (view plot)8,553.00 (+1.25%)8,719.31 (98.09%)✅ (view plot)3,772.00 (+0.78%)3,849.29 (97.99%)✅ (view plot)5,293.00 (+0.86%)5,393.33 (98.14%)✅ (view plot)8.00 (+2.56%)10.24 (78.09%)✅ (view plot)92.00 (+1.87%)94.25 (97.61%)
get_submit✅ (view plot)95,702.00 (+0.16%)96,092.95 (99.59%)✅ (view plot)59,522.00 (+0.09%)59,753.45 (99.61%)✅ (view plot)85,502.00 (+0.11%)85,802.81 (99.65%)✅ (view plot)45.00 (-16.60%)62.94 (71.49%)✅ (view plot)285.00 (+1.01%)287.36 (99.18%)
get_subscribe✅ (view plot)8,072.00 (+1.19%)8,248.17 (97.86%)✅ (view plot)2,848.00 (+0.55%)2,934.35 (97.06%)✅ (view plot)3,982.00 (+0.61%)4,093.55 (97.28%)✅ (view plot)13.00 (-16.67%)20.06 (64.82%)✅ (view plot)115.00 (+2.13%)116.64 (98.59%)
serialize_authorize✅ (view plot)12,396.00 (+1.46%)12,498.97 (99.18%)✅ (view plot)5,343.00 (+0.55%)5,420.29 (98.57%)✅ (view plot)7,456.00 (+0.62%)7,556.69 (98.67%)✅ (view plot)8.00 (-24.42%)13.28 (60.22%)✅ (view plot)140.00 (+3.06%)140.43 (99.70%)
serialize_deserialize_authorize✅ (view plot)24,614.00 (+0.53%)24,715.16 (99.59%)✅ (view plot)9,950.00 (+0.49%)10,019.52 (99.31%)✅ (view plot)14,049.00 (+0.57%)14,141.70 (99.34%)✅ (view plot)34.00 (-5.56%)41.48 (81.97%)✅ (view plot)297.00 (+0.58%)298.81 (99.39%)
serialize_deserialize_handle_authorize✅ (view plot)30,256.00 (+0.28%)30,386.55 (99.57%)✅ (view plot)12,127.00 (+0.24%)12,204.29 (99.37%)✅ (view plot)17,161.00 (+0.24%)17,270.28 (99.37%)✅ (view plot)64.00 (+8.42%)64.84 (98.70%)✅ (view plot)365.00 (+0.15%)368.83 (98.96%)
serialize_deserialize_handle_submit✅ (view plot)126,625.00 (+0.16%)126,992.57 (99.71%)✅ (view plot)73,307.00 (+0.08%)73,590.18 (99.62%)✅ (view plot)105,090.00 (+0.10%)105,470.23 (99.64%)✅ (view plot)114.00 (-5.15%)130.39 (87.43%)✅ (view plot)599.00 (+0.65%)599.47 (99.92%)
serialize_deserialize_handle_subscribe✅ (view plot)27,562.00 (+0.36%)27,604.96 (99.84%)✅ (view plot)9,650.00 (+0.16%)9,736.35 (99.11%)✅ (view plot)13,647.00 (+0.15%)13,768.27 (99.12%)✅ (view plot)67.00 (+2.37%)73.25 (91.47%)✅ (view plot)388.00 (+0.52%)388.75 (99.81%)
serialize_deserialize_submit✅ (view plot)115,333.00 (+0.22%)115,616.21 (99.76%)✅ (view plot)68,167.00 (+0.17%)68,376.69 (99.69%)✅ (view plot)97,843.00 (+0.20%)98,115.40 (99.72%)✅ (view plot)61.00 (-11.04%)75.56 (80.73%)✅ (view plot)491.00 (+0.55%)492.40 (99.72%)
serialize_deserialize_subscribe✅ (view plot)23,016.00 (+0.59%)23,101.58 (99.63%)✅ (view plot)8,209.00 (+0.25%)8,291.83 (99.00%)✅ (view plot)11,566.00 (+0.29%)11,675.74 (99.06%)✅ (view plot)36.00 (-7.18%)44.09 (81.64%)🚨 (view plot | view alert)322.00 (+1.05%)321.59 (100.13%)
serialize_submit✅ (view plot)100,113.00 (+0.23%)100,426.30 (99.69%)✅ (view plot)61,566.00 (+0.09%)61,802.36 (99.62%)✅ (view plot)88,348.00 (+0.11%)88,655.73 (99.65%)✅ (view plot)43.00 (-20.96%)63.42 (67.80%)🚨 (view plot | view alert)330.00 (+1.63%)329.22 (100.24%)
serialize_subscribe✅ (view plot)11,507.00 (+1.59%)11,591.56 (99.27%)✅ (view plot)4,195.00 (+0.37%)4,281.35 (97.98%)✅ (view plot)5,837.00 (+0.37%)5,952.25 (98.06%)✅ (view plot)14.00 (-12.67%)19.02 (73.62%)🚨 (view plot | view alert)160.00 (+3.10%)159.70 (100.19%)


Contributor

@Shourya742 Shourya742 left a comment

Concept ACK
This approach can be optimized. Currently, we are spawning a new task immediately following the task we intend to terminate. A more efficient method would involve incorporating the receiver as one of the branches in the select! block within the task we wish to terminate. This adjustment would eliminate the need for an additional asynchronous task, thereby facilitating a more graceful shutdown rather than an abrupt one.
A possible code snippet for one such task can be:

let socket_reader_task = tokio::task::spawn(async move {
    let reader = BufReader::new(&*socket_reader);
    let mut messages = FramedRead::new(
        async_compat::Compat::new(reader),
        LinesCodec::new_with_max_length(MAX_LINE_LENGTH),
    );

    loop {
        tokio::select! {
            res = messages.next().fuse() => {
                match res {
                    Some(Ok(incoming)) => {
                        debug!("Receiving from Mining Device {}: {:?}", &host_, &incoming);
                        let incoming: json_rpc::Message = handle_result!(tx_status_reader, serde_json::from_str(&incoming));

                        if let v1::Message::StandardRequest(standard_req) = incoming {
                            if let Ok(Submit{..}) = standard_req.try_into() {
                                handle_result!(tx_status_reader, Self::save_share(self_.clone()));
                            }
                        }

                        let res = Self::handle_incoming_sv1(self_.clone(), incoming).await;
                        handle_result!(tx_status_reader, res);
                    },
                    Some(Err(_)) => {
                        handle_result!(tx_status_reader, Err(Error::Sv1MessageTooLong));
                    },
                    None => {
                        handle_result!(tx_status_reader, Err(
                            std::io::Error::new(
                                std::io::ErrorKind::ConnectionAborted,
                                "Connection closed by client"
                            )
                        ));
                        break;                    
                      }
                }
            },
            _ = cancellation_token_mining_device.cancelled().fuse() => {
                warn!("Cancellation token triggered: Initiating shutdown of sv1 downstream reader");
                break; 
            },
            _ = rx_shutdown_clone.recv().fuse() => {
                warn!("Shutdown signal received: Initiating shutdown of sv1 downstream reader");
                break;
            }
        }
    }
    warn!("Downstream: Reader task is terminating. Performing cleanup.");
    kill(&tx_shutdown_clone).await; 
});

Review comments (outdated, resolved) on:
roles/translator/src/lib/downstream_sv1/downstream.rs (×4)
roles/translator/src/lib/proxy/bridge.rs (×2)
roles/translator/src/lib/upstream_sv2/upstream.rs (×3)
roles/translator/src/main.rs
@lorbax
Collaborator Author

lorbax commented Jun 25, 2024

(Quoting @Shourya742's suggestion above.)

That works for that particular task, but how would you handle this task? I could not find a better solution.

        let task = tokio::task::spawn(async move {
            loop {
                let msg = handle_result!(tx_status, rx_sv1_downstream.clone().recv().await);

                match msg {
                    DownstreamMessages::SubmitShares(share) => {
                        handle_result!(
                            tx_status,
                            Self::handle_submit_shares(self_.clone(), share).await
                        );
                    }
                    DownstreamMessages::SetDownstreamTarget(new_target) => {
                        handle_result!(
                            tx_status,
                            Self::handle_update_downstream_target(self_.clone(), new_target)
                        );
                    }
                };
            }
        });
        tokio::task::spawn(async move {
            tokio::select! {
                _ = cancellation_token_handle_downstream.cancelled() => {
                    task.abort();
                    warn!("Shutting down handle_result task");
                },
            }
        });

And what do you think about using a mutex as a `JoinHandle` collector for the spawned tasks and closing all of them in a task in main?
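
For illustration only, a hedged sketch (not the PR's actual code) of how that second task could fold the cancellation token into the same select! loop, mirroring the reader task quoted above; the names rx_sv1_downstream, tx_status, cancellation_token_handle_downstream and the handle_result! macro are taken from the snippets in this thread, everything else is an assumption:

let task = tokio::task::spawn(async move {
    loop {
        tokio::select! {
            // Same message handling as before, but now inside a select! branch.
            res = rx_sv1_downstream.recv() => {
                let msg = handle_result!(tx_status, res);
                match msg {
                    DownstreamMessages::SubmitShares(share) => {
                        handle_result!(
                            tx_status,
                            Self::handle_submit_shares(self_.clone(), share).await
                        );
                    }
                    DownstreamMessages::SetDownstreamTarget(new_target) => {
                        handle_result!(
                            tx_status,
                            Self::handle_update_downstream_target(self_.clone(), new_target)
                        );
                    }
                };
            },
            // Cancellation is observed inside the task itself, so no extra abort task is needed.
            _ = cancellation_token_handle_downstream.cancelled() => {
                warn!("Cancellation token triggered: shutting down handle_downstream task");
                break;
            }
        }
    }
});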

@Shourya742
Contributor

Shourya742 commented Jun 25, 2024

(Quoting @lorbax's reply above.)

Ah, I missed it! In that case, aggregating JoinHandles with simultaneous termination would make more sense.

@lorbax lorbax force-pushed the translator-restart-if-disconnected-from-upstream branch from b092514 to c8d8472 on June 25, 2024 15:00
@lorbax
Collaborator Author

lorbax commented Jun 25, 2024

Applied the changes suggested by @Shourya742

@lorbax
Collaborator Author

lorbax commented Jun 25, 2024

(Quoting the exchange above, including @Shourya742's reply.)

We can discuss it on the call. When I realized I could use a mutex as a `JoinHandle` collector for the tasks, I liked it, but I do not want to change it if it is not necessary.

@pavlenex pavlenex added this to the 1.0.2 milestone Jun 25, 2024
@plebhash plebhash linked an issue Jun 25, 2024 that may be closed by this pull request
@lorbax lorbax force-pushed the translator-restart-if-disconnected-from-upstream branch 5 times, most recently from b158a03 to d4800cd on July 11, 2024 10:05
@GitGab19
Collaborator

@lorbax is it ready for a final review?
Did you complete the changes we discussed last week during dev call?

@lorbax
Collaborator Author

lorbax commented Jul 15, 2024

@lorbax is it ready for a final review? Did you complete the changes we discussed last week during dev call?

yes!

lorbax added a commit to lorbax/stratum that referenced this pull request Jul 16, 2024
This PR fixes the fallback to solo mining in the case where the upstream sends a `SubmitShareError` on a valid share AND there are no other available upstreams in the JDC config. To check this PR you should follow the same procedure as described in the commit messages of the branch of this PR (which deals with a similar problem):
stratum-mining#1001
The only exception is that the JDC should use this config:
test/config/change-upstream/jdc-config-local-example-change-upstream-solo-fallback.toml

panics:
2024-07-11T15:57:25.769622Z  INFO roles_logic_sv2::handlers::template_distribution: Received NewTemplate with id: 694, is future: true
thread 'tokio-runtime-worker' panicked at jd-client/src/lib/downstream.rs:379:74:
called `Option::unwrap()` on a `None` value
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
2024-07-11T15:57:35.319832Z  INFO roles_logic_sv2::handlers::mining: Received UpdateChannel->Extended message
thread 'tokio-runtime-worker' panicked at jd-client/src/lib/downstream.rs:525:13:
not yet implemented
2024-07-11T15:57:41.633138Z ERROR network_helpers_sv2::noise_connection_tokio: Disconnected from client while reading : early eof - 127.0.0.1:56278
^C2024-07-11T15:57:57.023091Z  INFO jd_client: Interrupt received
@lorbax lorbax mentioned this pull request Jul 19, 2024
@lorbax lorbax force-pushed the translator-restart-if-disconnected-from-upstream branch from d4800cd to 47fd420 on July 31, 2024 14:25
@plebhash
Collaborator

plebhash commented Aug 1, 2024

yesterday we had a PR review club, where we dived into this PR

we found some issues (UpstreamShutdown(UpstreamIncoming(UnexpectedMessage(1)))), and after further investigation I reported it in #1079 and fixed it via #1080

after that, I was able to manually reproduce #1004, but only with the trick described by @GitGab19 here: #844 (comment)

basically I launched one pool with this patch (listening on port 44254), and another pool without any patches (listening on port 34254)

then I modified JDC config so it would have two [[upstreams]] sections:

[[upstreams]]
authority_pubkey = "9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72"
pool_address = "127.0.0.1:44254"
jd_address = "127.0.0.1:34264"
# Pool signature (string to be included in coinbase tx)
pool_signature = "Stratum v2 SRI Pool"

[[upstreams]]
authority_pubkey = "9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72"
pool_address = "127.0.0.1:34254"
jd_address = "127.0.0.1:34264"
# Pool signature (string to be included in coinbase tx)
pool_signature = "Stratum v2 SRI Pool"

finally, I was able to verify that this PR allows the tProxy to restart after the patched pool rejects a valid share


unfortunately the MG mock for a bad pool isn't working reliably... I tried following the instructions from 3d8793a multiple times but the behavior isn't deterministic

given that:

I think we can probably drop 3d8793a and only use this PR to fix the bug

we are keeping track of this bug on #1077 and it will eventually be added to CI (via MG or integration tests, to be defined)

@lorbax
Collaborator Author

lorbax commented Aug 2, 2024

(Quoting the JDC config with two [[upstreams]] sections from the comment above.)

I had already included a JDC config for testing this PR:
https://github.com/lorbax/stratum/blob/translator-restart-if-disconnected-from-upstream/test/config/change-upstream/jdc-config-local-example-change-upstream.toml
so you do not have to modify it. The PR description says that all the configs to be used are in test/config/change-upstream/.

unfortunately the MG mock for a bad pool isn't working reliably... I tried following the instructions from 3d8793a multiple times but the behavior isn't deterministic

The only thing is that MG would need a new feature to work flawlessly, but I used the MG test yesterday to reproduce the issue and to test the fix (also after the review club), and it does the job. Can you provide more information about the difficulties or inconsistencies you encountered using that test? "isn't working reliably" is a bit vague.

Collaborator

@GitGab19 GitGab19 left a comment

LGTM.
Since it seems unreliable (without the patch you mentioned to us some days ago, @lorbax), and since we decided to move those integration tests elsewhere, I would just remove the MG test and the related configs from this PR.
In the next few days I will properly test it and review it again.

Review comment (outdated, resolved) on: roles/translator/src/main.rs
@lorbax lorbax force-pushed the translator-restart-if-disconnected-from-upstream branch 2 times, most recently from 8228883 to 9a696b3 on August 5, 2024 07:35
@lorbax
Collaborator Author

lorbax commented Aug 5, 2024

Connection refused (os error 61)

This is the first time I have seen this error. Usually the channel ID is fixed for the entire duration of the connection and starts from 1, but in the MG test the SubmitShareError message also has channel id 1.

lorbax added a commit to lorbax/stratum that referenced this pull request Aug 5, 2024
(Same commit message and panic log as the commit referenced above.)
lorbax added a commit to lorbax/stratum that referenced this pull request Aug 5, 2024
(Same commit message and panic log as the commit referenced above.)
@lorbax lorbax force-pushed the translator-restart-if-disconnected-from-upstream branch from 9a696b3 to a88ebed on August 5, 2024 09:00
Contributor

@jbesraa jbesraa left a comment

NACK. I don't think the translator should look like that; the start function is far from anything we can work with.
Please look here: https://github.com/stratum-mining/stratum/blob/fbb846450a3053f7e57c9bdf90d407d4e10449da/roles/translator/src/lib/mod.rs. I think this should be more or less the approach.
If you need to persist data, then we should add write/read (to disk?) abilities, save the data, and recover it when we need it later.

Review comments (resolved) on: roles/translator/src/main.rs (×2)
Collaborator

@plebhash plebhash left a comment

I approved this earlier but then I noticed two MG tests are failing deterministically, so unfortunately I have to revert my approval.

Basically, now we're making tProxy attempt to reconnect to Upstream every time there's a State::UpstreamShutdown, regardless of what caused it.

pool-sri-test-close-channel

To be honest, I'm not sure what this MG tries to achieve in the first place.

There's nothing in the specs stating that a proxy must close the entire connection when it receives a CloseChannel from Upstream.

This feels like a naive test. I would lean towards completely removing this test from the repo (ideally on a separate PR, to be merged before this one), but it would be good to get feedback from others.

translation-proxy-broke-pool

This MG test asserts that the tProxy closes the connection when it receives a bad extranonce from Upstream.

That feels like reasonable behavior, but it somewhat conflicts with the current approach of this PR, because a bad extranonce also triggers a State::UpstreamShutdown, which now causes tProxy to attempt to reconnect.

So if Upstream sends a bad extranonce, should tProxy really keep trying to reconnect forever?

I'm not sure how to proceed here.


@GitGab19 @lorbax please let me know your thoughts.

@plebhash
Collaborator

plebhash commented Aug 5, 2024

after further investigating the MG tests described above, I came up with a solution: plebhash@e63c249

basically, tProxy should only try to reconnect if the error coming from Upstream is Error::ChannelErrorReceiver(_) or Error::TokioChannelErrorRecv(_), which indicates an abrupt interruption on the channel managed by network_helpers_sv2

with this approach, both MG tests listed above are passing

I still think that pool-sri-test-close-channel is a bit weird and deserves a deeper investigation, but we can leave that for #1081

@lorbax please have a look at the solution on the suggested commit... if you agree, feel free to just port it to your branch and squash it into the single commit of this PR, no need to worry about credit/authorship
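
For clarity, a minimal sketch of the behavior described above (this is a reading of the description, not the literal contents of plebhash@e63c249; Status, State and tx follow the translator snippets quoted later in this thread):

match e {
    // Abrupt interruption of the channel managed by network_helpers_sv2: try to reconnect.
    Error::ChannelErrorReceiver(_) | Error::TokioChannelErrorRecv(_) => {
        tx.send(Status {
            state: State::UpstreamTryReconnect(e),
        })
        .await
        .unwrap_or(());
    }
    // Any other upstream error keeps the previous shutdown behavior.
    _ => {
        tx.send(Status {
            state: State::UpstreamShutdown(e),
        })
        .await
        .unwrap_or(());
    }
}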

@lorbax
Collaborator Author

lorbax commented Aug 6, 2024

(Quoting @plebhash's comment above.)

looks good

@lorbax
Collaborator Author

lorbax commented Aug 12, 2024

(Quoting @plebhash's comment above.)

Hi!
I had a closer look at your proposal. When the Translator gets disconnected, it receives

translator_sv2::error::Error::ChannelErrorReceiver

but you suggest sending UpstreamTryReconnect for TokioChannelErrorRecv as well. Did you have an actual case in which the JDC disconnects the Translator and you get

translator_sv2::error::Error::TokioChannelErrorRecv

?
Why isn't the following enough?

            match e {
                Error::ChannelErrorReceiver(_) => {
                    tx.send(Status {
                        state: State::UpstreamTryReconnect(e),
                    })
                        .await
                        .unwrap_or(());
                },
                _ => {
                    tx.send(Status {
                        state: State::UpstreamShutdown(e),
                    })
                        .await
                        .unwrap_or(());
                },
            }

BTW, it appears to me that the mixed use of async_std channels and tokio channels is a red flag. What do you think?

@plebhash
Collaborator

@lorbax network_helpers_sv2 offers two flavors of channels: async_std and tokio: https://github.com/stratum-mining/stratum/tree/dev/roles/roles-utils/network-helpers/src

so I just tried to cover for both flavors

but feel free to only do ChannelErrorReceiver if you think that's overkill

@lorbax
Collaborator Author

lorbax commented Aug 12, 2024

@lorbax network_helpers_sv2 offers two flavors of channels: async_std and tokio: https://github.com/stratum-mining/stratum/tree/dev/roles/roles-utils/network-helpers/src

so I just tried to cover for both flavors

but feel free to only do ChannelErrorReceiver if you think that's overkill

my idea is to introduce changes only when strictly needed, in order to avoid unexpected behavior. What do you think?
Again, I stress that we are using two types of channels, from tokio and from async_std. It seems like a red flag to me; what do you think about this as well?

@lorbax lorbax force-pushed the translator-restart-if-disconnected-from-upstream branch from a88ebed to 63d37dc on August 12, 2024 15:07
@plebhash
Collaborator

(Quoting the exchange above.)

yes, those are very good points and I agree it's kind of a red flag... I'm not sure what the historical motivations are for having two different flavors of channels, but I think that when we start making deeper refactors into the application layer we should discuss the possibility of getting rid of one of the two kinds of channels in favor of standardization

but that's a discussion for the future

for now, in the context of this PR, I agree we should move forward covering only ChannelErrorReceiver in the match arms and just ignore TokioChannelErrorRecv, since tokio channels are not even enabled in the network_helpers_sv2 feature flags:

network_helpers_sv2 = { version = "2.0.0", path = "../roles-utils/network-helpers", features=["async_std", "with_buffer_pool"] }

Collaborator

@GitGab19 GitGab19 left a comment

Tested it twice and it works as expected, good work @lorbax!
Just a few minor nits and it's ready to go.

Review comments (outdated, resolved) on: roles/translator/src/main.rs (×2)
- Add a `start` function and put the starting logic there.
- The AbortHandle of each task is stored in a collector, which is behind a mutex.
- Added a `kill_tasks` function that takes this mutex as input, accesses it, pops each handle, and kills the corresponding task.

When an UpstreamShutdown is received, the translator does the following:
1. waits a random amount of time s, 0 < s < 3 secs (otherwise, if there are 100,000 TProxies, all 100,000 would reconnect again at the same time);
2. calls `kill_tasks`;
3. calls `start`.

Use tokio::task in favor of async_std::task.
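
A small, self-contained sketch of the flow this commit message describes (the function names, the tokio::sync::Mutex collector and the rand/tokio dependencies are assumptions for illustration, not the PR's exact code):

use std::sync::Arc;

use rand::Rng;
use tokio::{
    sync::Mutex,
    task::AbortHandle,
    time::{sleep, Duration},
};

// Abort every task whose AbortHandle was stored in the shared collector.
async fn kill_tasks(collector: Arc<Mutex<Vec<AbortHandle>>>) {
    let mut handles = collector.lock().await;
    while let Some(handle) = handles.pop() {
        handle.abort();
    }
}

// What the status loop would do on an UpstreamShutdown / UpstreamTryReconnect.
async fn restart(collector: Arc<Mutex<Vec<AbortHandle>>>) {
    // Random 0-3 s wait so that thousands of translators do not reconnect at the same time.
    let wait_ms = rand::thread_rng().gen_range(0..3_000u64);
    sleep(Duration::from_millis(wait_ms)).await;
    kill_tasks(collector.clone()).await;
    // start(collector).await; // re-run the translator's starting logic
}

#[tokio::main]
async fn main() {
    let collector: Arc<Mutex<Vec<AbortHandle>>> = Arc::new(Mutex::new(Vec::new()));
    // Stand-in for one of the translator's spawned tasks.
    let task = tokio::spawn(async {
        loop {
            sleep(Duration::from_secs(1)).await;
        }
    });
    collector.lock().await.push(task.abort_handle());
    restart(collector).await;
}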
@lorbax lorbax force-pushed the translator-restart-if-disconnected-from-upstream branch from 63d37dc to a74262a on August 13, 2024 09:40
@lorbax
Collaborator Author

lorbax commented Aug 13, 2024

(Quoting the exchange above.)

By channels I mean channels that allow one task to send data to another task, like mpsc channels.
In the translator we are using async_std channels:
https://docs.rs/async-std/latest/async_std/channel/fn.bounded.html
but the error variant also takes into consideration the mpmc (broadcast) channel in tokio:
https://docs.rs/tokio/latest/tokio/sync/broadcast/index.html

These are not things that are exported by network-helpers (or at least it appears that way to me). Perhaps it is a good idea to make this uniform and substitute the usage of async_std::channel with tokio::sync::mpsc::channel. But in that case we must remember to change it also in the send_status function.
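
As a side note, a tiny self-contained sketch of the swap being weighed here (the Status struct is a stand-in, not the translator's real status type, and both crates are assumed as dependencies; the real change would also have to touch send_status, as noted above):

#[derive(Debug)]
struct Status(&'static str);

#[tokio::main]
async fn main() {
    // async_std flavor (what the translator currently uses):
    let (tx_a, rx_a) = async_std::channel::bounded::<Status>(10);
    tx_a.send(Status("upstream shutdown")).await.unwrap_or(());
    println!("{:?}", rx_a.recv().await.unwrap());

    // tokio mpsc flavor (the proposed replacement):
    let (tx_t, mut rx_t) = tokio::sync::mpsc::channel::<Status>(10);
    tx_t.send(Status("upstream try reconnect")).await.unwrap_or(());
    println!("{:?}", rx_t.recv().await.unwrap());
}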

@jbesraa
Contributor

jbesraa commented Aug 13, 2024

@GitGab19 let's not merge unless we have all CI checks green, please.

@plebhash plebhash mentioned this pull request Aug 13, 2024
@pavlenex pavlenex merged commit ee63dc5 into stratum-mining:dev Aug 13, 2024
33 checks passed
@plebhash plebhash mentioned this pull request Aug 18, 2024
Linked issue that may be closed by this pull request: Translator restarts if disconnected from upstream