Skip to content

Commit

Permalink
Merge pull request #56 from aarani/inheritStream
Browse files Browse the repository at this point in the history
Network: TorStream should inherit IO.Stream
  • Loading branch information
aarani authored Apr 25, 2023
2 parents 737ac26 + 8a60fd1 commit 6069c42
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 66 deletions.
27 changes: 25 additions & 2 deletions NOnion.Tests/HiddenServicesTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
using System;
using System.IO;
using System.Linq;
using System.Net.Security;
using System.Security.Authentication;
using System.Security.Cryptography;
using System.Threading.Tasks;

Expand Down Expand Up @@ -86,7 +88,7 @@ private async Task<int> ReadExact(TorStream stream, byte[] buffer, int off, int
{
if (len - off <= 0) return 0;

var bytesRead = await stream.ReceiveAsync(buffer, off, len - off);
var bytesRead = await stream.ReadAsync(buffer, off, len - off);

if (bytesRead == 0 || bytesRead == -1)
throw new Exception("Not enough data");
Expand All @@ -110,6 +112,27 @@ public void CanBrowseFacebookOverHS()
Assert.ThrowsAsync(typeof(UnsuccessfulHttpRequestException), BrowseFacebookOverHS);
}

public async Task BrowseFacebookOverHSWithTLS()
{
TorDirectory directory = await TorDirectory.BootstrapAsync(FallbackDirectorySelector.GetRandomFallbackDirectory(), new DirectoryInfo(Path.GetTempPath()));

var client = await TorServiceClient.ConnectAsync(directory, "facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd.onion:443");

var sslStream = new SslStream(client.GetStream(), true, (sender, cert, chain, sslPolicyErrors) => true);
await sslStream.AuthenticateAsClientAsync(string.Empty, null, SslProtocols.Tls12, false);

var httpClientOverSslStream = new TorHttpClient(sslStream, "www.facebookwkhpilnemxj7asaniu7vnjjbiltxjqhye3mhbshg7kx5tfyd.onion");
var facebookResponse = await httpClientOverSslStream.GetAsStringAsync("/", false);
Assert.That(facebookResponse.Contains("<html"), "Response from facebook was invalid.");
}

[Test]
[Retry(TestsRetryCount)]
public void CanBrowseFacebookOverHSWithTLS()
{
Assert.DoesNotThrowAsync(BrowseFacebookOverHSWithTLS);
}

public async Task EstablishAndCommunicateOverHSConnectionOnionStyle()
{
int descriptorUploadRetryLimit = 2;
Expand All @@ -134,7 +157,7 @@ public async Task EstablishAndCommunicateOverHSConnectionOnionStyle()
Task.Run(async () => {
var stream = await host.AcceptClientAsync();
var bytesToSendWithLength = BitConverter.GetBytes(dataToSendAndReceive.Length).Concat(dataToSendAndReceive).ToArray();
await stream.SendDataAsync(bytesToSendWithLength);
await stream.WriteAsync(bytesToSendWithLength, 0, bytesToSendWithLength.Length);
await stream.EndAsync();
});

Expand Down
2 changes: 0 additions & 2 deletions NOnion/Constants.fs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ module Constants =
// Time limit used for receving data in stream
let internal StreamReceiveTimeout = TimeSpan.FromSeconds 1.

let internal HttpClientBufferSize = 1024

let internal DefaultHttpHost = "127.0.0.1"

// NTor Handshake Constants
Expand Down
8 changes: 4 additions & 4 deletions NOnion/Directory/TorDirectory.fs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type TorDirectory =
)

let circuit = TorCircuit(guard)
let stream = TorStream(circuit)
use stream = new TorStream(circuit)

(*
* We always use FastCreate authentication because privacy is not important for mono-hop
Expand Down Expand Up @@ -216,7 +216,7 @@ type TorDirectory =
)

let circuit = TorCircuit(guard)
let stream = TorStream(circuit)
use stream = new TorStream(circuit)

(*
* We always use FastCreate authentication because privacy is not important for mono-hop
Expand Down Expand Up @@ -252,7 +252,7 @@ type TorDirectory =
circuit.Create CircuitNodeDetail.FastCreate
|> Async.Ignore

let consensusStream = TorStream circuit
use consensusStream = new TorStream(circuit)
do! consensusStream.ConnectToDirectory() |> Async.Ignore

let consensusHttpClient =
Expand Down Expand Up @@ -340,7 +340,7 @@ type TorDirectory =
(digestsChunk: array<string>)
=
async {
let descriptorsStream = TorStream circuit
use descriptorsStream = new TorStream(circuit)

do!
descriptorsStream.ConnectToDirectory()
Expand Down
35 changes: 12 additions & 23 deletions NOnion/Http/TorHttpClient.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,11 @@ open NOnion
open NOnion.Network
open NOnion.Utility

type TorHttpClient(stream: TorStream, host: string) =
type TorHttpClient(stream: Stream, host: string) =

// Receives all the data stream until it reaches EOF (until stream receive a RELAY_END)
let rec ReceiveAll(memStream: MemoryStream) =
async {
let buffer = Array.zeroCreate Constants.HttpClientBufferSize

// Try to fill the buffer
let! bytesRead =
stream.Receive buffer 0 Constants.HttpClientBufferSize

if bytesRead > 0 then
memStream.Write(buffer, 0, bytesRead)
return! ReceiveAll memStream
}
let ReceiveAll memStream =
stream.CopyToAsync memStream |> Async.AwaitTask

member __.GetAsString (path: string) (forceUncompressed: bool) =
async {
Expand All @@ -42,10 +32,11 @@ type TorHttpClient(stream: TorStream, host: string) =
|> List.map(fun (k, v) -> sprintf "%s: %s\r\n" k v)
|> String.concat String.Empty

do!
let buffer =
sprintf "GET %s HTTP/1.0\r\n%s\r\n" path headers
|> Encoding.UTF8.GetBytes
|> stream.SendData
|> Encoding.ASCII.GetBytes

do! stream.AsyncWrite(buffer, 0, buffer.Length)

use memStream = new MemoryStream()

Expand Down Expand Up @@ -99,9 +90,7 @@ type TorHttpClient(stream: TorStream, host: string) =
|> Map.ofArray

match headersMap.TryGetValue "Content-Encoding" with
| false, _ ->
return
failwith "GetAsString: Content-Encoding header is missing"
| false, _
| true, "identity" -> return body |> Encoding.UTF8.GetString
| true, "deflate" ->
// DeflateStream needs the zlib header to be chopped off first
Expand Down Expand Up @@ -136,10 +125,11 @@ type TorHttpClient(stream: TorStream, host: string) =
|> List.map(fun (k, v) -> sprintf "%s: %s\r\n" k v)
|> String.concat String.Empty

do!
let buffer =
sprintf "POST %s HTTP/1.0\r\n%s\r\n%s" path headers payload
|> Encoding.ASCII.GetBytes
|> stream.SendData

do! stream.AsyncWrite(buffer, 0, buffer.Length)

use memStream = new MemoryStream()

Expand Down Expand Up @@ -188,8 +178,7 @@ type TorHttpClient(stream: TorStream, host: string) =

match headersMap.TryGetValue "Content-Encoding" with
| false, _ when body.Length = 0 -> return String.Empty
| false, _ ->
return failwith "PostString: Content-Encoding header is missing"
| false, _
| true, "identity" -> return body |> Encoding.UTF8.GetString
| true, "deflate" ->
// DeflateStream needs the zlib header to be chopped off first
Expand Down
114 changes: 89 additions & 25 deletions NOnion/Network/TorStream.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
namespace NOnion.Network

open System
open System.IO
open System.Threading
open System.Threading.Tasks
open System.Threading.Tasks.Dataflow

Expand All @@ -22,7 +24,9 @@ type private StreamReceiveMessage =
type private StreamControlMessage =
| End of replyChannel: AsyncReplyChannel<OperationResult<unit>>
| Send of
array<byte> *
data: array<byte> *
offset: int *
length: int *
replyChannel: AsyncReplyChannel<OperationResult<unit>>
| StartServiceConnectionProcess of
port: int *
Expand All @@ -44,6 +48,7 @@ type private StreamControlMessage =
| SendSendMe of replyChannel: AsyncReplyChannel<OperationResult<unit>>

type TorStream(circuit: TorCircuit) =
inherit Stream()

let mutable streamState: StreamState = StreamState.Initialized

Expand Down Expand Up @@ -79,12 +84,15 @@ type TorStream(circuit: TorCircuit) =
| _ -> failwith "Unexpected state when trying to end the stream"
}

let safeSend(data: array<byte>) =
let safeSend (data: array<byte>) (offset: int) (length: int) =
async {
match streamState with
| Connected streamId ->
let dataChunks =
SeqUtils.Chunk Constants.MaximumRelayPayloadLength data
data
|> Seq.skip offset
|> Seq.take length
|> Seq.chunkBySize Constants.MaximumRelayPayloadLength

let rec sendChunks dataChunks =
async {
Expand All @@ -98,7 +106,7 @@ type TorStream(circuit: TorCircuit) =
circuit.SendRelayCell
streamId
(head
|> Array.ofSeq
|> Seq.toArray
|> RelayData.RelayData)
None

Expand Down Expand Up @@ -224,9 +232,9 @@ type TorStream(circuit: TorCircuit) =
match command with
| End replyChannel ->
do! safeEnd() |> TryExecuteAsyncAndReplyAsResult replyChannel
| Send(data, replyChannel) ->
| Send(data, offset, length, replyChannel) ->
do!
safeSend data
safeSend data offset length
|> TryExecuteAsyncAndReplyAsResult replyChannel
| StartServiceConnectionProcess(port, streamObj, replyChannel) ->
do!
Expand Down Expand Up @@ -348,7 +356,7 @@ type TorStream(circuit: TorCircuit) =
do! refillBufferIfNeeded()

if isEOF then
return -1
return 0
else
let rec tryRead bytesRead bytesRemaining =
async {
Expand Down Expand Up @@ -401,9 +409,31 @@ type TorStream(circuit: TorCircuit) =
let streamReceiveMailBox =
MailboxProcessor.Start StreamReceiveMailBoxProcessor

override _.CanRead = not isEOF
override _.CanWrite = not isEOF

override _.CanSeek = false

override _.Length = failwith "Length is not supported"

override _.SetLength _ =
failwith "SetLength is not supported"

override _.Position
with get () = failwith "No seek, GetPosition is not supported"
and set _position = failwith "No seek, SetPosition is not supported"

override _.Seek(_, _) =
failwith "No seek, Seek is not supported"

override _.Flush() =
()

static member Accept (streamId: uint16) (circuit: TorCircuit) =
async {
let stream = TorStream circuit
// We can't use the "use" keyword since this stream needs
// to outlive this function.
let stream = new TorStream(circuit)
do! stream.RegisterIncomingStream streamId

do! circuit.SendRelayCell streamId (RelayConnected Array.empty) None
Expand All @@ -428,20 +458,6 @@ type TorStream(circuit: TorCircuit) =
member self.EndAsync() =
self.End() |> Async.StartAsTask


member __.SendData(data: array<byte>) =
async {
let! sendResult =
streamControlMailBox.PostAndAsyncReply(fun replyChannel ->
StreamControlMessage.Send(data, replyChannel)
)

return UnwrapResult sendResult
}

member self.SendDataAsync data =
self.SendData data |> Async.StartAsTask

member self.ConnectToService(port: int) =
async {
let! completionTaskRes =
Expand Down Expand Up @@ -500,7 +516,34 @@ type TorStream(circuit: TorCircuit) =
return UnwrapResult registerationResult
}

member self.Receive (buffer: array<byte>) (offset: int) (length: int) =
override _.Read(buffer: array<byte>, offset: int, length: int) =
let receiveResult =
streamReceiveMailBox.PostAndReply(fun replyChannel ->
{
StreamBuffer = buffer
BufferOffset = offset
BufferLength = length
ReplyChannel = replyChannel
}
)

UnwrapResult receiveResult

override _.Write(buffer: array<byte>, offset: int, length: int) =
let sendResult =
streamControlMailBox.PostAndReply(fun replyChannel ->
StreamControlMessage.Send(buffer, offset, length, replyChannel)
)

UnwrapResult sendResult

override _.ReadAsync
(
buffer: array<byte>,
offset: int,
length: int,
_cancelToken: CancellationToken
) =
async {
let! receiveResult =
streamReceiveMailBox.PostAndAsyncReply(fun replyChannel ->
Expand All @@ -514,9 +557,30 @@ type TorStream(circuit: TorCircuit) =

return UnwrapResult receiveResult
}
|> Async.StartAsTask

override _.WriteAsync
(
buffer: array<byte>,
offset: int,
length: int,
_cancelToken: CancellationToken
) =
async {
let! sendResult =
streamControlMailBox.PostAndAsyncReply(fun replyChannel ->
StreamControlMessage.Send(
buffer,
offset,
length,
replyChannel
)
)

member self.ReceiveAsync(buffer: array<byte>, offset: int, length: int) =
self.Receive buffer offset length |> Async.StartAsTask
return UnwrapResult sendResult
}
|> Async.StartAsTask
:> Task

interface ITorStream with
member __.HandleDestroyedCircuit() =
Expand Down
6 changes: 4 additions & 2 deletions NOnion/Services/TorServiceClient.fs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type TorServiceClient =
circuit.Extend hsDirectoryNode
|> Async.Ignore

let dirStream = TorStream circuit
use dirStream = new TorStream(circuit)

do!
dirStream.ConnectToDirectory()
Expand Down Expand Up @@ -493,7 +493,9 @@ type TorServiceClient =
Async.Parallel [ introduceJob; rendezvousJoin ]
|> Async.Ignore

let serviceStream = TorStream rendezvousCircuit
// We can't use the "use" keyword since this stream needs
// to outlive this function.
let serviceStream = new TorStream(rendezvousCircuit)
do! serviceStream.ConnectToService port |> Async.Ignore

return
Expand Down
Loading

0 comments on commit 6069c42

Please sign in to comment.