
simplify the method of stream forward in dispatch_stream #206

Open
wants to merge 4 commits into base: master
Conversation

@xmh0511 commented Dec 6, 2023

No description provided.

match res {
    Poll::Ready(Ok(count)) => {
        *a_to_b = TransferState::ShuttingDown(count);
        match tokio::time::timeout(a_to_b_timeout_duration, a_reader.read(&mut buf[..])).await {
Member:
have you tested this?

does this not break long-running connections?

Author:

As exposed by the signature of copy_buf_bidirectional_with_timeout, the timeout is an explicit parameter, and this function is only called from dispatch_stream in https://github.com/Watfaq/clash-rs/blob/master/clash_lib/src/app/dispatcher/dispatcher.rs#L138, which passes Duration::from_secs(10), so this is not meant for long-running connections. The current implementation does work for me.

Member:

i think you are confused about the timeout. it's meant to be the connection idle timeout.

Member:

can you test if your change can successfully request https://httpbin.yba.dev/drip?duration=11 ?

Author (@xmh0511), Dec 6, 2023:

I am not confused about the timeout. tokio::time::timeout means the read must produce data within the specified duration; in other words, if the read future stays idle longer than the specified time, the read future is cancelled.
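The per-read reset behaviour being argued about here can be sketched outside tokio. Below is a hypothetical, synchronous stand-in (forward_with_idle_timeout is an invented name, not code from the PR) that uses std's mpsc::Receiver::recv_timeout in place of tokio::time::timeout around each read: the deadline applies to each individual read, so steady traffic never trips it, while a quiet period longer than the timeout ends the loop.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for the per-read timeout loop: recv_timeout plays
// the role of tokio::time::timeout(duration, read). The deadline applies
// to EACH receive, so it resets whenever traffic arrives; only a quiet
// period longer than `idle` ends the loop.
fn forward_with_idle_timeout(rx: mpsc::Receiver<u8>, idle: Duration) -> usize {
    let mut total = 0;
    loop {
        match rx.recv_timeout(idle) {
            Ok(_byte) => total += 1, // traffic arrived; the next recv gets a fresh deadline
            Err(mpsc::RecvTimeoutError::Timeout) => break, // idle too long: give up
            Err(mpsc::RecvTimeoutError::Disconnected) => break, // peer closed cleanly
        }
    }
    total
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let sender = thread::spawn(move || {
        // 5 bytes spaced 20ms apart: the whole transfer takes ~100ms,
        // longer than the 50ms idle timeout, yet never idles past it.
        for b in 0..5u8 {
            tx.send(b).unwrap();
            thread::sleep(Duration::from_millis(20));
        }
        // Dropping tx simulates the peer closing the connection.
    });
    let n = forward_with_idle_timeout(rx, Duration::from_millis(50));
    sender.join().unwrap();
    println!("forwarded {} bytes", n); // prints "forwarded 5 bytes"
}
```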

Member:

add a numbytes param to the url and you'll see more *s: https://httpbin.yba.dev/drip?duration=20&numbytes=20

Member:

ok i finished the other stuff. now i'm getting back to this.

as you mentioned in the link, this timeout needs to be the duration during which no traffic is sent, not the entire time it takes to finish the connection.

think about needing to download a large file: it takes 2hrs, but during those 2hrs there's always data being transferred, so you can't just give it a 10s timeout or whatever.

Author (@xmh0511), Dec 7, 2023:

> think about you need to download a large file, it take 2hrs, but in the 2hrs, there's always data being transfered, you can't just give it a 10s timeout or whatever.

Consider this case: the peer sends one byte every 1hr or 12hrs. Is that permitted? In this implementation, the timeout just requires that the peer send some traffic within every specified interval. In other words, if the peer occupies the connection and does not send any traffic within the specified time, the connection will be closed. Any traffic sent within the window resets the timeout (i.e. the clock is recalculated).

I think the purpose of the timeout is to prevent a "Slow HTTP Attack"/"Slowloris". If I didn't understand correctly, please tell me what logic is needed here, and I will try to implement it in this PR.

Author (@xmh0511), Dec 7, 2023:

After reviewing the original implementation carefully, the timeout logic is: only when one direction completes does it set a timeout for the other direction. In other words, if both directions are pending (i.e. no traffic can be accepted), no timeout is imposed on either of them. Is this really what we want here?

Author (@xmh0511), Dec 7, 2023:

To clarify what the #1 implementation in this PR does, I need to say that the implementation does not close long-running transfers.

> think about you need to download a large file, it take 2hrs, but in the 2hrs, there's always data being transfered, you can't just give it a 10s timeout or whatever.

The specified timeout of 10 does not mean the whole data transfer must complete within 10s; that is a misunderstanding of the implementation. The #1 implementation means that when we read in each loop iteration, some data must arrive within 10s, and on the next iteration the timeout is recalculated for the next read.

Consider this scenario: after A and B are connected, B writes some bytes to A but does not finish; then B sends no traffic to A for 2hrs, and after 2hrs B continues sending data to A. Isn't that 2hr gap an idle connection?

@ibigbug ibigbug closed this Dec 6, 2023
@ibigbug ibigbug reopened this Dec 6, 2023
@xmh0511 (Author) commented Dec 7, 2023

@ibigbug I have implemented the original stream-forwarding logic 1:1 in a more concise way. Could you take a look at the latest implementation so we can make further progress?

@ibigbug (Member) commented Dec 7, 2023

what exactly are you trying to do in your patch? could you add a description: what's the issue, and why is your change useful?

@xmh0511 (Author) commented Dec 8, 2023

> what are you exactly trying to do in your patch? could you add a description, what's the issue and why your change is useful?

I'm trying to implement the original logic concisely and effectively. The original logic is a hand-written state machine that mixes a_to_b and b_to_a into a single state machine. As a result, whenever one task is woken, the other side is polled again as well. In particular, when both sides are in the TransferState::Running state and one side's poll_read is woken, the other side's poll_read is called again even though it previously returned Poll::Pending and the current call was not triggered by its own waker, which is not a correct use of poll_read.

In this patch, I split a_to_b and b_to_a into separate tasks, so they do not influence each other when one of them is woken, which is efficient and maintainable.
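The "two independent tasks" shape described here can be sketched with std threads and channels standing in for tokio::spawn and the two stream halves (run_direction and the channel setup are invented for illustration, not code from the PR): each direction blocks only on its own channel, so activity on one direction never causes the other to be polled.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical sketch: each direction is its own thread that blocks only
// on its own channel, the way the patch's spawned tasks each await only
// their own stream half.
fn run_direction(rx: mpsc::Receiver<&'static str>) -> thread::JoinHandle<usize> {
    // Count messages until the sender hangs up (this direction's "EOF").
    thread::spawn(move || rx.iter().count())
}

fn main() {
    let (a_tx, a_rx) = mpsc::channel();
    let (b_tx, b_rx) = mpsc::channel();

    let a_to_b = run_direction(a_rx); // client -> server direction
    let b_to_a = run_direction(b_rx); // server -> client direction

    for msg in ["hello", "world"] {
        a_tx.send(msg).unwrap(); // activity here wakes only a_to_b
    }
    b_tx.send("ack").unwrap();
    drop(a_tx); // close both directions
    drop(b_tx);

    // Waiting on both handles mirrors awaiting the two spawned tasks.
    println!("a_to_b={} b_to_a={}", a_to_b.join().unwrap(), b_to_a.join().unwrap());
    // prints "a_to_b=2 b_to_a=1"
}
```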

@ibigbug (Member) commented Dec 8, 2023

have you checked the tokio implementation? https://docs.rs/tokio/latest/src/tokio/io/util/copy_bidirectional.rs.html#72-91

does it have the same issue that you are mentioning here?

i think i still don't understand how your change is different. i see that you are creating two tokio spawns and awaiting them in sequence; what's the difference from ready!-ing two futures? isn't it the same thing?

@xmh0511 (Author) commented Dec 9, 2023

> what's the difference of ready! two futures? isn't it the same thing.

When one future is woken, the other future is polled again even though it was not notified by its own ctx.wake(). Specifically, when a_to_b's poll_read is woken by its ctx.wake(), that means it has something to read; however, that wakeup also causes b_to_a's poll_read to be invoked again, and it may still return Poll::Pending.

In this patch, the implementation ensures that each future is woken only by its own ctx.wake(). Furthermore, the patch eliminates the complex hand-written state machine in the source code.

@ibigbug (Member) commented Dec 10, 2023

i think this line is problematic: the await blocks, and in the case the 2nd future errors/finishes before the 1st, it'd never know https://github.com/Watfaq/clash-rs/pull/206/files#diff-85611fef80536aedcb4665af1a4b566df2341c0e3b6cdf60b28eaf9a1d81ba92R433

this may not be a problem.

@ibigbug (Member) commented Dec 10, 2023

> When a future is wakened, the other future will be polled again even though it is not notified by its ctx.wake().

i think this is not a big issue https://rust-lang.github.io/async-book/02_execution/02_future.html

it's how Rust tells people to run 2 futures (the Join example)

@xmh0511 (Author) commented Dec 11, 2023

I think the main goals of this patch are the following:

  1. Implement the forwarding stream more briefly: the original impl needs 281 lines while this patch needs only 118.
  2. Eliminate the complex hand-written state machine, making the implementation more readable and maintainable.
  3. This patch may also perform better, since it has no meaningless wakeups: a_to_b and b_to_a run separately on their own tasks.


2 participants