From 0a19afe893bc4d481d4c2c30d6373a4d902008b8 Mon Sep 17 00:00:00 2001 From: Afshin Arani Date: Fri, 14 Oct 2022 17:10:38 +0330 Subject: [PATCH 1/3] Network,Utility: remove usage of SemaphoreLocker Locking mutable shared variables has caused weird bugs which were exteremly hard to find/detect, this commit removes the usage of SemaphoreLocker in the network layer of NOnion and replaces it by using MailBoxes which allows implementation of multi-writer, single-reader access pattern. Co-authored-by: Parham Saremi --- NOnion/Constants.fs | 3 + NOnion/NOnion.fsproj | 2 + NOnion/Network/StreamState.fs | 4 +- NOnion/Network/TorCircuit.fs | 1628 +++++++++++++++++++-------------- NOnion/Network/TorGuard.fs | 129 ++- NOnion/Network/TorStream.fs | 746 +++++++++------ NOnion/Utility/MailboxUtil.fs | 40 + NOnion/Utility/ResultUtil.fs | 19 + 8 files changed, 1510 insertions(+), 1061 deletions(-) create mode 100644 NOnion/Utility/MailboxUtil.fs create mode 100644 NOnion/Utility/ResultUtil.fs diff --git a/NOnion/Constants.fs b/NOnion/Constants.fs index 9be2f7bb..767b32e6 100644 --- a/NOnion/Constants.fs +++ b/NOnion/Constants.fs @@ -82,6 +82,9 @@ module Constants = // Time limit used for Create and Extend operations let internal CircuitOperationTimeout = TimeSpan.FromSeconds 10. + // Time limit used for RendezvousJoin operation + let internal CircuitRendezvousTimeout = TimeSpan.FromMinutes 2. + // Time limit used for StreamBegin operation let internal StreamCreationTimeout = TimeSpan.FromSeconds 10. diff --git a/NOnion/NOnion.fsproj b/NOnion/NOnion.fsproj index c9f3d75b..d62a61d1 100644 --- a/NOnion/NOnion.fsproj +++ b/NOnion/NOnion.fsproj @@ -19,6 +19,8 @@ + + diff --git a/NOnion/Network/StreamState.fs b/NOnion/Network/StreamState.fs index ab2b012a..ab3507be 100644 --- a/NOnion/Network/StreamState.fs +++ b/NOnion/Network/StreamState.fs @@ -10,11 +10,9 @@ type StreamState = streamId: uint16 * completionTask: TaskCompletionSource | Connected of streamId: uint16 - | Ended of streamId: uint16 * reason: EndReason member self.Id = match self with | Connecting(streamId, _) - | Connected streamId - | Ended(streamId, _) -> string streamId + | Connected streamId -> string streamId | Initialized -> "TBD" diff --git a/NOnion/Network/TorCircuit.fs b/NOnion/Network/TorCircuit.fs index 0b81f7e4..6f1c1de4 100644 --- a/NOnion/Network/TorCircuit.fs +++ b/NOnion/Network/TorCircuit.fs @@ -4,6 +4,7 @@ open System open System.Security.Cryptography open System.Threading.Tasks open System.Net +open System.Net.Sockets open Org.BouncyCastle.Crypto open Org.BouncyCastle.Crypto.Parameters @@ -18,6 +19,7 @@ open NOnion.Crypto open NOnion.Crypto.Kdf open NOnion.TorHandshakes open NOnion.Utility +open MailboxResultUtil type CircuitNodeDetail = | FastCreate @@ -26,194 +28,210 @@ type CircuitNodeDetail = NTorOnionKey: array * IdentityKey: array -type TorCircuit +type private CircuitIdTaskResult = OperationResult> + +type private RequestSendIntroduceResult = + OperationResult> + +type private TaskResult = OperationResult + +type private UnitResult = OperationResult + +[] +type private CircuitOperation = + | GetCircuitLastNode of + replyChannel: AsyncReplyChannel> + | SendRelayCell of + streamId: uint16 * + relayData: RelayData * + customDestinationOpt: Option * + replyChannel: AsyncReplyChannel + | HandleIncomingCell of + circuit: TorCircuit * + cell: ICell * + replyChannel: AsyncReplyChannel + | Create of + circuitObj: ITorCircuit * + guardDetailInfo: CircuitNodeDetail * + replyChannel: AsyncReplyChannel + | Extend of + guardDetailInfo: CircuitNodeDetail * + replyChannel: AsyncReplyChannel + | RegisterAsIntroductionPoint of + authKeyPairOpt: Option * + callback: (RelayIntroduce -> Async) * + replyChannel: AsyncReplyChannel + | RegisterAsRendezvousPoint of + cookie: array * + replyChannel: AsyncReplyChannel + | SendIntroduceRequest of + introduceMsg: RelayIntroduce * + replyChannel: AsyncReplyChannel + | WaitForRendezvous of + clientRandomPrivateKey: X25519PrivateKeyParameters * + clientRandomPublicKey: X25519PublicKeyParameters * + introAuthPublicKey: Ed25519PublicKeyParameters * + introEncPublicKey: X25519PublicKeyParameters * + replyChannel: AsyncReplyChannel + | SendRendezvousRequest of + cookie: array * + clientRandomKey: X25519PublicKeyParameters * + introAuthPublicKey: Ed25519PublicKeyParameters * + introEncPrivateKey: X25519PrivateKeyParameters * + introEncPublicKey: X25519PublicKeyParameters * + replyChannel: AsyncReplyChannel + +and TorCircuit private ( guard: TorGuard, serviceStreamCallback: Option TorCircuit -> Async> ) = let mutable circuitState: CircuitState = CircuitState.Initialized - let controlLock: SemaphoreLocker = SemaphoreLocker() let mutable streamsCount: int = 1 let mutable streamsMap: Map = Map.empty // Prevents two stream setup happening at once (to prevent race condition on writing to StreamIds list) let streamSetupLock: obj = obj() - //Client-only - new(guard: TorGuard) = TorCircuit(guard, None) - //Client-Server F# version - new(guard: TorGuard, - serviceStreamCallback: uint16 -> TorCircuit -> Async) = - TorCircuit(guard, Some serviceStreamCallback) - //Client-Server C# version - new(guard: TorGuard, serviceStreamCallback: Func) = - let callback streamId circuit = + let rec CircuitMailBoxProcessor(inbox: MailboxProcessor) = + let getCircuitLastNode() = + match circuitState with + | Ready(_, nodesStates) -> + let lastNodeOpt = nodesStates |> List.rev |> List.tryHead + + match lastNodeOpt with + | None -> failwith "BUG: circuit has no nodes" + | Some lastNode -> lastNode + | _ -> + failwith + "Unexpected state when trying to find the last circuit node" + + let internalSendRelayCell + (streamId: uint16) + (relayData: RelayData) + (customDestinationOpt: Option) + (early: bool) + = async { - return! - serviceStreamCallback.Invoke(streamId, circuit) - |> Async.AwaitTask - } + match circuitState with + | Ready(circuitId, nodesStates) + | Extending(circuitId, _, nodesStates, _) + | RegisteringAsIntroductionPoint + ( + circuitId, nodesStates, _, _, _, _ + ) + | WaitingForIntroduceAcknowledge(circuitId, nodesStates, _) + | RegisteringAsRendezvousPoint(circuitId, nodesStates, _) -> + let onionList, destination = + match customDestinationOpt with + | None -> + let reversedNodesList = nodesStates |> Seq.rev - TorCircuit(guard, Some callback) + let destinationNodeOpt = + reversedNodesList |> Seq.tryHead - member __.Id = - match circuitState with - | Creating(circuitId, _, _) - | Extending(circuitId, _, _, _) - | RegisteringAsIntroductionPoint(circuitId, _, _, _, _, _) - | RegisteringAsRendezvousPoint(circuitId, _, _) - | WaitingForIntroduceAcknowledge(circuitId, _, _) - | WaitingForRendezvousRequest(circuitId, _, _, _, _, _, _) - | Ready(circuitId, _) - | ReadyAsIntroductionPoint(circuitId, _, _, _, _) - | ReadyAsRendezvousPoint(circuitId, _) - | Destroyed(circuitId, _) - | Truncated(circuitId, _) -> circuitId - | _ -> failwith "should not happen!" + match destinationNodeOpt with + | None -> + failwith + "Circuit has no nodes, can't relay data" + | Some destinationNode -> + reversedNodesList, destinationNode + | Some destination -> + nodesStates + |> Seq.takeWhile(fun node -> node <> destination) + |> Seq.append(Seq.singleton destination) + |> Seq.rev, + destination + + let plainRelayCell = + CellPlainRelay.Create + streamId + relayData + (Array.zeroCreate Constants.RelayDigestLength) - member __.LastNode = - match circuitState with - | Ready(_, nodesStates) -> - let lastNodeOpt = nodesStates |> List.rev |> List.tryHead + let relayPlainBytes = plainRelayCell.ToBytes true - match lastNodeOpt with - | None -> failwith "BUG: circuit has no nodes" - | Some lastNode -> lastNode - | _ -> - failwith - "Unexpected state when trying to find the last circuit node" + destination.CryptoState.ForwardDigest.Update + relayPlainBytes + 0 + relayPlainBytes.Length - member private self.UnsafeSendRelayCell - (streamId: uint16) - (relayData: RelayData) - (customDestinationOpt: Option) - (early: bool) - = - async { - match circuitState with - | Ready(circuitId, nodesStates) - | Extending(circuitId, _, nodesStates, _) - | RegisteringAsIntroductionPoint(circuitId, nodesStates, _, _, _, _) - | WaitingForIntroduceAcknowledge(circuitId, nodesStates, _) - | RegisteringAsRendezvousPoint(circuitId, nodesStates, _) -> - let onionList, destination = - match customDestinationOpt with - | None -> - let reversedNodesList = nodesStates |> Seq.rev - - let destinationNodeOpt = - reversedNodesList |> Seq.tryHead - - match destinationNodeOpt with - | None -> - failwith "Circuit has no nodes, can't relay data" - | Some destinationNode -> - reversedNodesList, destinationNode - | Some destination -> - nodesStates - |> Seq.takeWhile(fun node -> node <> destination) - |> Seq.append(Seq.singleton destination) - |> Seq.rev, - destination - - let plainRelayCell = - CellPlainRelay.Create - streamId - relayData - (Array.zeroCreate Constants.RelayDigestLength) - - let relayPlainBytes = plainRelayCell.ToBytes true - - destination.CryptoState.ForwardDigest.Update - relayPlainBytes - 0 - relayPlainBytes.Length - - let digest = - destination.CryptoState.ForwardDigest.GetDigestBytes() - |> Array.take Constants.RelayDigestLength - - let plainRelayCell = - { plainRelayCell with - Digest = digest - } - - let rec encryptMessage - (nodes: seq) - (message: array) - = - match Seq.tryHeadTail nodes with - | Some(node, nextNodes) -> - encryptMessage - nextNodes - (node.CryptoState.ForwardCipher.Encrypt message) - | None -> message + let digest = + destination.CryptoState.ForwardDigest.GetDigestBytes() + |> Array.take Constants.RelayDigestLength - do! - { - CellEncryptedRelay.EncryptedData = - plainRelayCell.ToBytes false - |> encryptMessage onionList - Early = early - } - |> guard.Send circuitId - - match relayData with - | RelayData _ - | RelaySendMe _ -> - // too many to log - () - | _ -> - TorLogger.Log( - sprintf - "TorCircuit[%i,%s]: Sent relay cell %A over circuit" - self.Id - circuitState.Name - relayData - ) - | _ -> - failwithf - "Can't relay cell over circuit, %s state" - (circuitState.ToString()) - } + let plainRelayCell = + { plainRelayCell with + Digest = digest + } - member self.SendRelayCell - (streamId: uint16) - (relayData: RelayData) - (customDestinationOpt: Option) - = - async { - let safeSend() = - async { - match circuitState with - | Ready _ -> - return! - self.UnsafeSendRelayCell - streamId - relayData - customDestinationOpt - false + let rec encryptMessage + (nodes: seq) + (message: array) + = + match Seq.tryHeadTail nodes with + | Some(node, nextNodes) -> + encryptMessage + nextNodes + (node.CryptoState.ForwardCipher.Encrypt message) + | None -> message + + do! + { + CellEncryptedRelay.EncryptedData = + plainRelayCell.ToBytes false + |> encryptMessage onionList + Early = early + } + |> guard.Send circuitId + + match relayData with + | RelayData _ + | RelaySendMe _ -> + // too many to log + () | _ -> + TorLogger.Log( + sprintf + "TorCircuit[%i,%s]: Sent relay cell %A over circuit" + circuitId + circuitState.Name + relayData + ) + | _ -> + failwithf + "Can't relay cell over circuit, %s state" + (circuitState.ToString()) + } + + let sendRelayCell streamId relayData customDestinationOpt = + async { + match circuitState with + | Ready _ -> + return! + internalSendRelayCell + streamId + relayData + customDestinationOpt + false + | _ -> + return failwithf "Can't relay cell over circuit, %s state" (circuitState.ToString()) - } - - return! controlLock.RunAsyncWithSemaphore safeSend - } + } - member private self.DecryptCell(encryptedRelayCell: CellEncryptedRelay) = - let safeDecryptCell() = + let decryptCell(encryptedRelayCell: CellEncryptedRelay) = match circuitState with - | Ready(_circuitId, nodes) - | ReadyAsIntroductionPoint(_circuitId, nodes, _, _, _) - | ReadyAsRendezvousPoint(_circuitId, nodes) - | RegisteringAsIntroductionPoint(_circuitId, nodes, _, _, _, _) - | RegisteringAsRendezvousPoint(_circuitId, nodes, _) - | WaitingForIntroduceAcknowledge(_circuitId, nodes, _) - | WaitingForRendezvousRequest(_circuitId, nodes, _, _, _, _, _) - | Extending(_circuitId, _, nodes, _) -> + | Ready(circuitId, nodes) + | ReadyAsIntroductionPoint(circuitId, nodes, _, _, _) + | ReadyAsRendezvousPoint(circuitId, nodes) + | RegisteringAsIntroductionPoint(circuitId, nodes, _, _, _, _) + | RegisteringAsRendezvousPoint(circuitId, nodes, _) + | WaitingForIntroduceAcknowledge(circuitId, nodes, _) + | WaitingForRendezvousRequest(circuitId, nodes, _, _, _, _, _) + | Extending(circuitId, _, nodes, _) -> let rec decryptMessage (message: array) (nodes: List) @@ -274,7 +292,7 @@ type TorCircuit TorLogger.Log( sprintf "TorCircuit[%i,%s]: decrypted relay cell %A over circuit" - self.Id + circuitId circuitState.Name decryptedRelayCell.Data ) @@ -287,222 +305,434 @@ type TorCircuit decryptMessage encryptedRelayCell.EncryptedData nodes | _ -> failwith "Unexpected state when receiving relay cell" - controlLock.RunSyncWithSemaphore safeDecryptCell - - member self.Create(guardDetailOpt: CircuitNodeDetail) = - async { - let create() = - async { + let handleIncomingCell (circuitObj: TorCircuit) (cell: ICell) = + async { + //TODO: Handle circuit-level cells like destroy/truncate etc.. + match cell with + | :? ICreatedCell as createdMsg -> match circuitState with - | CircuitState.Initialized -> - let circuitId = guard.RegisterCircuit self - let connectionCompletionSource = TaskCompletionSource() - - let handshakeState, handshakeCell = - match guardDetailOpt with - | FastCreate -> - let state = FastHandshake.Create() :> IHandshake - - state, - { - CellCreateFast.X = - state.GenerateClientMaterial() - } - :> ICell - | Create(_, onionKey, identityKey) -> - let state = - NTorHandshake.Create identityKey onionKey - :> IHandshake - - state, - { - CellCreate2.HandshakeType = - HandshakeType.NTor - HandshakeData = - state.GenerateClientMaterial() - } - :> ICell + | Creating(circuitId, handshakeState, tcs) -> + let kdfResult = + handshakeState.GenerateKdfResult createdMsg circuitState <- - Creating( + Ready( circuitId, - handshakeState, - connectionCompletionSource + List.singleton + { + TorCircuitNode.CryptoState = + TorCryptoState.FromKdfResult + kdfResult + false + Window = + TorWindow + Constants.DefaultCircuitLevelWindowParams + } ) - do! guard.Send circuitId handshakeCell - - TorLogger.Log( - sprintf - "TorCircuit[%i,%s]: sending create cell" - self.Id - circuitState.Name - ) - - return connectionCompletionSource.Task - | _ -> return invalidOp "Circuit is already created" - } + tcs.SetResult circuitId + | _ -> + failwith + "Unexpected circuit state when receiving CreatedFast cell" + | :? CellEncryptedRelay as enRelay -> + let streamId, relayData, fromNode = decryptCell enRelay - let! completionTask = controlLock.RunAsyncWithSemaphore create + match relayData with + | RelayData.RelayData _ -> + fromNode.Window.DeliverDecrease() - return! - completionTask - |> Async.AwaitTask - |> FSharpUtil.WithTimeout Constants.CircuitOperationTimeout - } + if fromNode.Window.NeedSendme() then + do! + sendRelayCell + Constants.DefaultStreamId + RelayData.RelaySendMe + (Some fromNode) + | RelayData.RelayExtended2 extended2 -> + match circuitState with + | Extending(circuitId, handshakeState, nodes, tcs) -> + let kdfResult = + handshakeState.GenerateKdfResult extended2 - member self.Extend(nodeDetail: CircuitNodeDetail) = - async { - let extend() = - async { - match circuitState with - | CircuitState.Ready(circuitId, nodes) -> - return! - match nodeDetail with - | FastCreate -> - async { - return - failwith - "Only first hop can be created using CREATE_FAST" - } - | Create(address, onionKey, identityKey) -> - async { - let connectionCompletionSource = - TaskCompletionSource() - - let handshakeState, handshakeCell = - let state = - NTorHandshake.Create - identityKey - onionKey - :> IHandshake - - state, + circuitState <- + Ready( + circuitId, + nodes + @ List.singleton { - RelayExtend2.LinkSpecifiers = - [ - LinkSpecifier.CreateFromEndPoint - address - { - LinkSpecifier.Type = - LinkSpecifierType.LegacyIdentity - Data = identityKey - } - ] - HandshakeType = HandshakeType.NTor - HandshakeData = - state.GenerateClientMaterial() + TorCircuitNode.CryptoState = + TorCryptoState.FromKdfResult + kdfResult + false + Window = + TorWindow + Constants.DefaultCircuitLevelWindowParams } - |> RelayData.RelayExtend2 - - circuitState <- - Extending( - circuitId, - handshakeState, - nodes, - connectionCompletionSource - ) - - do! - self.UnsafeSendRelayCell - Constants.DefaultStreamId - handshakeCell - None - true - - return connectionCompletionSource.Task - } - | _ -> - return - invalidOp - "Circuit is not in a state suitable for extending" - } + ) - let! completionTask = controlLock.RunAsyncWithSemaphore extend + tcs.SetResult circuitId + | _ -> + failwith + "Unexpected circuit state when receiving Extended cell" + | RelayData.RelayEstablishedIntro _ -> + match circuitState with + | RegisteringAsIntroductionPoint + ( + circuitId, + nodes, + privateKey, + publicKey, + tcs, + callback + ) -> + circuitState <- + ReadyAsIntroductionPoint( + circuitId, + nodes, + privateKey, + publicKey, + callback + ) - return! - completionTask - |> Async.AwaitTask - |> FSharpUtil.WithTimeout Constants.CircuitOperationTimeout + tcs.SetResult() + | _ -> + failwith + "Unexpected circuit state when receiving ESTABLISHED_INTRO cell" + | RelayData.RelayEstablishedRendezvous -> + match circuitState with + | RegisteringAsRendezvousPoint(circuitId, nodes, tcs) -> + circuitState <- + ReadyAsRendezvousPoint(circuitId, nodes) - } + tcs.SetResult() + | _ -> + failwith + "Unexpected circuit state when receiving RENDEZVOUS_ESTABLISHED cell" + | RelaySendMe _ when streamId = Constants.DefaultStreamId -> + fromNode.Window.PackageIncrease() + | RelayIntroduce2 introduceMsg -> + match circuitState with + | ReadyAsIntroductionPoint(_, _, _, _, callback) -> + do! callback(introduceMsg) + | _ -> + return + failwith + "Received introduce2 cell over non-introduction-circuit huh?" + | RelayTruncated reason -> + match circuitState with + | CircuitState.Initialized -> + // Circuit isn't created yet! + () + | Creating(circuitId, _, tcs) + | Extending(circuitId, _, _, tcs) -> + circuitState <- Truncated(circuitId, reason) - member self.RegisterAsIntroductionPoint - (authKeyPairOpt: Option) - callback - = - let registerAsIntroduction() = - async { - match circuitState with - | Ready(circuitId, nodes) -> - let lastNode = self.LastNode + tcs.SetException(CircuitTruncatedException reason) + | WaitingForIntroduceAcknowledge(circuitId, _, tcs) -> + circuitState <- Truncated(circuitId, reason) - let authPrivateKey, authPublicKey = - let authKeyPair = - match authKeyPairOpt with - | Some authKeyPair -> authKeyPair - | None -> - let kpGen = Ed25519KeyPairGenerator() - let random = SecureRandom() + tcs.SetException(CircuitTruncatedException reason) + | RegisteringAsRendezvousPoint(circuitId, _, tcs) + | RegisteringAsIntroductionPoint + ( + circuitId, _, _, _, tcs, _ + ) + | WaitingForRendezvousRequest + ( + circuitId, _, _, _, _, _, tcs + ) -> + circuitState <- Truncated(circuitId, reason) - kpGen.Init( - Ed25519KeyGenerationParameters random - ) + tcs.SetException(CircuitTruncatedException reason) + //FIXME: how can we tell the user that circuit is destroyed? if we throw here the listening thread with throw and user never finds out why + | Ready(circuitId, _) + | ReadyAsIntroductionPoint(circuitId, _, _, _, _) + | ReadyAsRendezvousPoint(circuitId, _) + // The circuit was already dead in our eyes, so we don't care about it being destroyed, just update the state to new destroyed state + | Destroyed(circuitId, _) + | Truncated(circuitId, _) -> + circuitState <- Truncated(circuitId, reason) + | RelayData.RelayIntroduceAck ackMsg -> + match circuitState with + | WaitingForIntroduceAcknowledge(circuitId, nodes, tcs) -> + circuitState <- Ready(circuitId, nodes) - kpGen.GenerateKeyPair() + tcs.SetResult ackMsg + | _ -> + failwith + "Unexpected circuit state when receiving RelayIntroduceAck cell" + | RelayData.RelayRendezvous2 rendMsg -> + match circuitState with + | WaitingForRendezvousRequest + ( + circuitId, + nodes, + clientRandomPrivateKey, + clientRandomPublicKey, + introAuthPublicKey, + introEncPublicKey, + tcs + ) -> - authKeyPair.Private :?> Ed25519PrivateKeyParameters, - authKeyPair.Public :?> Ed25519PublicKeyParameters + let serverPublicKey = + rendMsg.HandshakeData + |> Array.take Constants.KeyS256Length - let establishIntroCell = - RelayEstablishIntro.Create - authPrivateKey - authPublicKey - lastNode.CryptoState.KeyHandshake - |> RelayData.RelayEstablishIntro + let ntorKeySeed, mac = + HiddenServicesCipher.CalculateClientRendezvousKeys + (X25519PublicKeyParameters( + serverPublicKey, + 0 + )) + clientRandomPublicKey + clientRandomPrivateKey + introAuthPublicKey + introEncPublicKey - let connectionCompletionSource = TaskCompletionSource() + if mac + <> (rendMsg.HandshakeData + |> Array.skip Constants.KeyS256Length + |> Array.take Constants.Digest256Length) then + failwith "Invalid handshake data" - circuitState <- - CircuitState.RegisteringAsIntroductionPoint( - circuitId, - nodes, - authPrivateKey, - authPublicKey, - connectionCompletionSource, - callback - ) - do! - self.UnsafeSendRelayCell - Constants.DefaultStreamId + circuitState <- + Ready( + circuitId, + nodes + @ List.singleton + { + TorCircuitNode.CryptoState = + TorCryptoState.FromKdfResult + (Kdf.ComputeHSKdf + ntorKeySeed) + false + Window = + TorWindow + Constants.DefaultCircuitLevelWindowParams + } + ) + + tcs.SetResult() + | _ -> + failwith + "Unexpected circuit state when receiving Rendevzous2 cell" + | RelayBegin beginRequest -> + if beginRequest.Address.Split(':').[0] = String.Empty + && serviceStreamCallback.IsSome then + do! serviceStreamCallback.Value streamId circuitObj + | _ -> () + + if streamId <> Constants.DefaultStreamId then + match (streamsMap.TryFind streamId, relayData) with + | (Some stream, _) -> + do! stream.HandleIncomingData relayData + | (None, RelayBegin _) -> () + | (None, _) -> failwith "Unknown stream" + | :? CellDestroy as destroyCell -> + match circuitState with + | CircuitState.Initialized -> + // Circuit isn't created yet! + () + | Creating(circuitId, _, tcs) + | Extending(circuitId, _, _, tcs) -> + + circuitState <- Destroyed(circuitId, destroyCell.Reason) + + tcs.SetException( + CircuitDestroyedException destroyCell.Reason + ) + | RegisteringAsRendezvousPoint(circuitId, _, tcs) + | RegisteringAsIntroductionPoint(circuitId, _, _, _, tcs, _) + | WaitingForRendezvousRequest(circuitId, _, _, _, _, _, tcs) -> + + circuitState <- Destroyed(circuitId, destroyCell.Reason) + + tcs.SetException( + CircuitDestroyedException destroyCell.Reason + ) + | WaitingForIntroduceAcknowledge(circuitId, _, tcs) -> + circuitState <- Destroyed(circuitId, destroyCell.Reason) + + tcs.SetException( + CircuitDestroyedException destroyCell.Reason + ) + //FIXME: how can we tell the user that circuit is destroyed? if we throw here the listening thread will throw and user never finds out why + | Ready(circuitId, _) + | ReadyAsIntroductionPoint(circuitId, _, _, _, _) + | ReadyAsRendezvousPoint(circuitId, _) + // The circuit was already dead in our eyes, so we don't care about it being destroyed, just update the state to new destroyed state + | Destroyed(circuitId, _) + | Truncated(circuitId, _) -> + circuitState <- Destroyed(circuitId, destroyCell.Reason) + | _ -> () + } + + let requestCreation circuitObj guardDetailOpt = + async { + match circuitState with + | CircuitState.Initialized -> + let circuitId = guard.RegisterCircuit circuitObj + let connectionCompletionSource = TaskCompletionSource() + + let handshakeState, handshakeCell = + match guardDetailOpt with + | FastCreate -> + let state = FastHandshake.Create() :> IHandshake + + state, + { + CellCreateFast.X = + state.GenerateClientMaterial() + } + :> ICell + | Create(_, onionKey, identityKey) -> + let state = + NTorHandshake.Create identityKey onionKey + :> IHandshake + + state, + { + CellCreate2.HandshakeType = HandshakeType.NTor + HandshakeData = state.GenerateClientMaterial() + } + :> ICell + + circuitState <- + Creating( + circuitId, + handshakeState, + connectionCompletionSource + ) + + do! guard.Send circuitId handshakeCell + + TorLogger.Log( + sprintf + "TorCircuit[%i,%s]: sending create cell" + circuitId + circuitState.Name + ) + + return connectionCompletionSource.Task + | _ -> return invalidOp "Circuit is already created" + } + + let requestExtension nodeDetail = + async { + match circuitState with + | CircuitState.Ready(circuitId, nodes) -> + match nodeDetail with + | FastCreate -> + return + invalidOp + "Only first hop can be created using CREATE_FAST" + | Create(address, onionKey, identityKey) -> + let connectionCompletionSource = TaskCompletionSource() + + let handshakeState, handshakeCell = + let state = + NTorHandshake.Create identityKey onionKey + :> IHandshake + + state, + { + RelayExtend2.LinkSpecifiers = + [ + LinkSpecifier.CreateFromEndPoint address + { + LinkSpecifier.Type = + LinkSpecifierType.LegacyIdentity + Data = identityKey + } + ] + HandshakeType = HandshakeType.NTor + HandshakeData = state.GenerateClientMaterial() + } + |> RelayData.RelayExtend2 + + circuitState <- + Extending( + circuitId, + handshakeState, + nodes, + connectionCompletionSource + ) + + do! + internalSendRelayCell + Constants.DefaultStreamId + handshakeCell + None + true + + return connectionCompletionSource.Task + | _ -> + return + invalidOp + "Circuit is not in a state suitable for extending" + } + + let registerAsIntroductionPoint authKeyPairOpt callback = + async { + match circuitState with + | Ready(circuitId, nodes) -> + let lastNode = getCircuitLastNode() + + let authPrivateKey, authPublicKey = + let authKeyPair = + match authKeyPairOpt with + | Some authKeyPair -> authKeyPair + | None -> + let kpGen = Ed25519KeyPairGenerator() + let random = SecureRandom() + + kpGen.Init( + Ed25519KeyGenerationParameters random + ) + + kpGen.GenerateKeyPair() + + authKeyPair.Private :?> Ed25519PrivateKeyParameters, + authKeyPair.Public :?> Ed25519PublicKeyParameters + + let establishIntroCell = + RelayEstablishIntro.Create + authPrivateKey + authPublicKey + lastNode.CryptoState.KeyHandshake + |> RelayData.RelayEstablishIntro + + let connectionCompletionSource = TaskCompletionSource() + + circuitState <- + CircuitState.RegisteringAsIntroductionPoint( + circuitId, + nodes, + authPrivateKey, + authPublicKey, + connectionCompletionSource, + callback + ) + + do! + internalSendRelayCell + Constants.DefaultStreamId establishIntroCell (Some lastNode) false - return connectionCompletionSource.Task + return connectionCompletionSource.Task :> Task | _ -> return failwith "Unexpected state for registering as introduction point" } - async { - let! completionTask = - controlLock.RunAsyncWithSemaphore registerAsIntroduction - - return! - completionTask - |> Async.AwaitTask - |> FSharpUtil.WithTimeout Constants.CircuitOperationTimeout - } - - member self.RegisterAsRendezvousPoint(cookie: array) = - let registerAsRendezvousPoint() = + let registerAsRendezvousPoint(cookie: array) = async { match circuitState with | Ready(circuitId, nodes) -> - let lastNode = self.LastNode + let lastNode = getCircuitLastNode() let establishRendezvousCell = RelayData.RelayEstablishRendezvous cookie @@ -517,37 +747,20 @@ type TorCircuit ) do! - self.UnsafeSendRelayCell + internalSendRelayCell Constants.DefaultStreamId establishRendezvousCell (Some lastNode) false - return connectionCompletionSource.Task + return connectionCompletionSource.Task :> Task | _ -> return failwith "Unexpected state for registering as rendezvous point" } - async { - let! completionTask = - controlLock.RunAsyncWithSemaphore registerAsRendezvousPoint - - return! - completionTask - |> Async.AwaitTask - |> FSharpUtil.WithTimeout Constants.CircuitOperationTimeout - } - - member self.ExtendAsync nodeDetail = - self.Extend nodeDetail |> Async.StartAsTask - - member self.CreateAsync guardDetailOpt = - self.Create guardDetailOpt |> Async.StartAsTask - - member self.Introduce(introduceMsg: RelayIntroduce) = - let sendIntroduceCell() = + let sendIntroduceRequest(introduceMsg: RelayIntroduce) = async { match circuitState with | Ready(circuitId, nodes) -> @@ -562,7 +775,7 @@ type TorCircuit ) do! - self.UnsafeSendRelayCell + internalSendRelayCell 0us (RelayIntroduce1 introduceMsg) None @@ -574,23 +787,12 @@ type TorCircuit failwith "Unexpected state when sending introduce msg" } - async { - let! completionTask = - controlLock.RunAsyncWithSemaphore sendIntroduceCell - - return! - completionTask - |> Async.AwaitTask - |> FSharpUtil.WithTimeout Constants.CircuitOperationTimeout - } - - member self.WaitingForRendezvousJoin - (clientRandomPrivateKey: X25519PrivateKeyParameters) - (clientRandomPublicKey: X25519PublicKeyParameters) - (introAuthPublicKey: Ed25519PublicKeyParameters) - (introEncPublicKey: X25519PublicKeyParameters) - = - let waitingForRendezvous() = + let waitForRendezvous + (clientRandomPrivateKey: X25519PrivateKeyParameters) + (clientRandomPublicKey: X25519PublicKeyParameters) + (introAuthPublicKey: Ed25519PublicKeyParameters) + (introEncPublicKey: X25519PublicKeyParameters) + = async { match circuitState with | ReadyAsRendezvousPoint(circuitId, nodes) -> @@ -607,31 +809,20 @@ type TorCircuit connectionCompletionSource ) - return connectionCompletionSource.Task + return connectionCompletionSource.Task :> Task | _ -> return failwith "Unexpected state when waiting for rendezvous join" } - async { - let! completionTask = - controlLock.RunAsyncWithSemaphore waitingForRendezvous - - return! - completionTask - |> Async.AwaitTask - |> FSharpUtil.WithTimeout(TimeSpan.FromMinutes 2.) - } - - member self.Rendezvous - (cookie: array) - (clientRandomKey: X25519PublicKeyParameters) - (introAuthPublicKey: Ed25519PublicKeyParameters) - (introEncPrivateKey: X25519PrivateKeyParameters) - (introEncPublicKey: X25519PublicKeyParameters) - = - let sendRendezvousCell() = + let sendRendezvousCell + (cookie: array) + (clientRandomKey: X25519PublicKeyParameters) + (introAuthPublicKey: Ed25519PublicKeyParameters) + (introEncPrivateKey: X25519PrivateKeyParameters) + (introEncPublicKey: X25519PublicKeyParameters) + = async { match circuitState with | Ready(circuitId, nodes) -> @@ -687,18 +878,327 @@ type TorCircuit ) do! - self.UnsafeSendRelayCell + internalSendRelayCell 0us rendezvousCell (List.tryLast nodes) false - | _ -> return failwith "Unexpected state when sending rendezvous msg" } - async { do! controlLock.RunAsyncWithSemaphore sendRendezvousCell } + async { + let! cancelToken = Async.CancellationToken + cancelToken.ThrowIfCancellationRequested() + + let! op = inbox.Receive() + + match op with + | CircuitOperation.GetCircuitLastNode replyChannel -> + TryExecuteAndReplyAsResult replyChannel getCircuitLastNode + | CircuitOperation.SendRelayCell + ( + streamId, relayData, customDestinationOpt, replyChannel + ) -> + do! + sendRelayCell streamId relayData customDestinationOpt + |> TryExecuteAsyncAndReplyAsResult replyChannel + | CircuitOperation.HandleIncomingCell + ( + circuitObj, cell, replyChannel + ) -> + do! + handleIncomingCell circuitObj cell + |> TryExecuteAsyncAndReplyAsResult replyChannel + | CircuitOperation.Create(circuitObj, guardDetailOpt, replyChannel) -> + do! + requestCreation circuitObj guardDetailOpt + |> TryExecuteAsyncAndReplyAsResult replyChannel + | CircuitOperation.Extend(nodeDetail, replyChannel) -> + do! + requestExtension nodeDetail + |> TryExecuteAsyncAndReplyAsResult replyChannel + | CircuitOperation.RegisterAsIntroductionPoint + ( + authKeyPairOpt, callback, replyChannel + ) -> + do! + registerAsIntroductionPoint authKeyPairOpt callback + |> TryExecuteAsyncAndReplyAsResult replyChannel + | CircuitOperation.RegisterAsRendezvousPoint(cookie, replyChannel) -> + do! + registerAsRendezvousPoint cookie + |> TryExecuteAsyncAndReplyAsResult replyChannel + | CircuitOperation.SendIntroduceRequest(msg, replyChannel) -> + do! + sendIntroduceRequest msg + |> TryExecuteAsyncAndReplyAsResult replyChannel + | CircuitOperation.WaitForRendezvous + ( + clientRandomPrivateKey, + clientRandomPublicKey, + introAuthPublicKey, + introEncPublicKey, + replyChannel + ) -> + do! + waitForRendezvous + clientRandomPrivateKey + clientRandomPublicKey + introAuthPublicKey + introEncPublicKey + |> TryExecuteAsyncAndReplyAsResult replyChannel + | CircuitOperation.SendRendezvousRequest + ( + cookie: array, + clientRandomKey, + introAuthPublicKey, + introEncPrivateKey, + introEncPublicKey, + replyChannel + ) -> + do! + sendRendezvousCell + cookie + clientRandomKey + introAuthPublicKey + introEncPrivateKey + introEncPublicKey + |> TryExecuteAsyncAndReplyAsResult replyChannel + + return! CircuitMailBoxProcessor inbox + } + + let circuitOperationsMailBox = + MailboxProcessor.Start CircuitMailBoxProcessor + + //Client-only + new(guard: TorGuard) = TorCircuit(guard, None) + //Client-Server F# version + new(guard: TorGuard, + serviceStreamCallback: uint16 -> TorCircuit -> Async) = + TorCircuit(guard, Some serviceStreamCallback) + //Client-Server C# version + new(guard: TorGuard, serviceStreamCallback: Func) = + let callback streamId circuit = + async { + return! + serviceStreamCallback.Invoke(streamId, circuit) + |> Async.AwaitTask + } + + TorCircuit(guard, Some callback) + + member __.Id = + match circuitState with + | Creating(circuitId, _, _) + | Extending(circuitId, _, _, _) + | RegisteringAsIntroductionPoint(circuitId, _, _, _, _, _) + | RegisteringAsRendezvousPoint(circuitId, _, _) + | WaitingForIntroduceAcknowledge(circuitId, _, _) + | WaitingForRendezvousRequest(circuitId, _, _, _, _, _, _) + | Ready(circuitId, _) + | ReadyAsIntroductionPoint(circuitId, _, _, _, _) + | ReadyAsRendezvousPoint(circuitId, _) + | Destroyed(circuitId, _) + | Truncated(circuitId, _) -> circuitId + | CircuitState.Initialized -> + failwith + "Should not happen: can't get circuitId for non-initialized circuit." + + member __.GetLastNode() = + async { + let! lastNodeResult = + circuitOperationsMailBox.PostAndAsyncReply + CircuitOperation.GetCircuitLastNode + + return UnwrapResult lastNodeResult + } + + member __.SendRelayCell + (streamId: uint16) + (relayData: RelayData) + (customDestinationOpt: Option) + = + async { + let! sendResult = + circuitOperationsMailBox.PostAndAsyncReply(fun replyChannel -> + CircuitOperation.SendRelayCell( + streamId, + relayData, + customDestinationOpt, + replyChannel + ) + ) + + return UnwrapResult sendResult + } + + member self.Create(guardDetailOpt: CircuitNodeDetail) = + async { + let! completionTaskRes = + circuitOperationsMailBox.PostAndAsyncReply( + (fun replyChannel -> + CircuitOperation.Create( + self, + guardDetailOpt, + replyChannel + ) + ), + Constants.CircuitOperationTimeout.TotalMilliseconds |> int + ) + + return! + completionTaskRes + |> UnwrapResult + |> Async.AwaitTask + |> FSharpUtil.WithTimeout Constants.CircuitOperationTimeout + } + + member __.Extend(nodeDetail: CircuitNodeDetail) = + async { + let! completionTaskRes = + circuitOperationsMailBox.PostAndAsyncReply( + (fun replyChannel -> + CircuitOperation.Extend(nodeDetail, replyChannel) + ), + Constants.CircuitOperationTimeout.TotalMilliseconds |> int + ) + + return! + completionTaskRes + |> UnwrapResult + |> Async.AwaitTask + |> FSharpUtil.WithTimeout Constants.CircuitOperationTimeout + } + + member __.RegisterAsIntroductionPoint + (authKeyPairOpt: Option) + callback + = + async { + let! completionTaskRes = + circuitOperationsMailBox.PostAndAsyncReply( + (fun replyChannel -> + CircuitOperation.RegisterAsIntroductionPoint( + authKeyPairOpt, + callback, + replyChannel + ) + ), + Constants.CircuitOperationTimeout.TotalMilliseconds |> int + ) + + return! + completionTaskRes + |> UnwrapResult + |> Async.AwaitTask + |> FSharpUtil.WithTimeout Constants.CircuitOperationTimeout + } + + member __.RegisterAsRendezvousPoint(cookie: array) = + async { + let! completionTaskRes = + circuitOperationsMailBox.PostAndAsyncReply( + (fun replyChannel -> + CircuitOperation.RegisterAsRendezvousPoint( + cookie, + replyChannel + ) + ), + Constants.CircuitOperationTimeout.TotalMilliseconds |> int + ) + + + return! + completionTaskRes + |> UnwrapResult + |> Async.AwaitTask + |> FSharpUtil.WithTimeout Constants.CircuitOperationTimeout + } + + member self.ExtendAsync nodeDetail = + self.Extend nodeDetail |> Async.StartAsTask + + member self.CreateAsync guardDetailOpt = + self.Create guardDetailOpt |> Async.StartAsTask + + member __.Introduce(introduceMsg: RelayIntroduce) = + async { + let! completionTaskRes = + circuitOperationsMailBox.PostAndAsyncReply( + (fun replyChannel -> + CircuitOperation.SendIntroduceRequest( + introduceMsg, + replyChannel + ) + ), + Constants.CircuitOperationTimeout.TotalMilliseconds |> int + ) + + return! + completionTaskRes + |> UnwrapResult + |> Async.AwaitTask + |> FSharpUtil.WithTimeout Constants.CircuitOperationTimeout + } + + member __.WaitingForRendezvousJoin + (clientRandomPrivateKey: X25519PrivateKeyParameters) + (clientRandomPublicKey: X25519PublicKeyParameters) + (introAuthPublicKey: Ed25519PublicKeyParameters) + (introEncPublicKey: X25519PublicKeyParameters) + = + async { + let! completionTaskRes = + circuitOperationsMailBox.PostAndAsyncReply( + (fun replyChannel -> + CircuitOperation.WaitForRendezvous( + clientRandomPrivateKey, + clientRandomPublicKey, + introAuthPublicKey, + introEncPublicKey, + replyChannel + ) + ), + Constants.CircuitRendezvousTimeout.TotalMilliseconds |> int + ) + + return! + completionTaskRes + |> UnwrapResult + |> Async.AwaitTask + |> FSharpUtil.WithTimeout Constants.CircuitRendezvousTimeout + + } + + member __.Rendezvous + (cookie: array) + (clientRandomKey: X25519PublicKeyParameters) + (introAuthPublicKey: Ed25519PublicKeyParameters) + (introEncPrivateKey: X25519PrivateKeyParameters) + (introEncPublicKey: X25519PublicKeyParameters) + = + + async { + let! completionRes = + circuitOperationsMailBox.PostAndAsyncReply( + (fun replyChannel -> + CircuitOperation.SendRendezvousRequest( + cookie, + clientRandomKey, + introAuthPublicKey, + introEncPrivateKey, + introEncPublicKey, + replyChannel + ) + ), + Constants.CircuitOperationTimeout.TotalMilliseconds |> int + ) + + return UnwrapResult completionRes + } member self.RegisterAsIntroductionPointAsync (authKeyPairOpt: Option) @@ -732,315 +1232,19 @@ type TorCircuit interface ITorCircuit with member self.HandleIncomingCell(cell: ICell) = async { - //TODO: Handle circuit-level cells like destroy/truncate etc.. - match cell with - | :? ICreatedCell as createdMsg -> - let handleCreated() = - match circuitState with - | Creating(circuitId, handshakeState, tcs) -> - let kdfResult = - handshakeState.GenerateKdfResult createdMsg - - circuitState <- - Ready( - circuitId, - List.singleton - { - TorCircuitNode.CryptoState = - TorCryptoState.FromKdfResult - kdfResult - false - Window = - TorWindow - Constants.DefaultCircuitLevelWindowParams - } - ) - - tcs.SetResult circuitId - | _ -> - failwith - "Unexpected circuit state when receiving CreatedFast cell" - - controlLock.RunSyncWithSemaphore handleCreated - | :? CellEncryptedRelay as enRelay -> - let streamId, relayData, fromNode = self.DecryptCell enRelay - - match relayData with - | RelayData.RelayData _ -> - fromNode.Window.DeliverDecrease() - - if fromNode.Window.NeedSendme() then - do! - self.SendRelayCell - Constants.DefaultStreamId - RelayData.RelaySendMe - (Some fromNode) - | RelayData.RelayExtended2 extended2 -> - let handleExtended() = - match circuitState with - | Extending(circuitId, handshakeState, nodes, tcs) -> - let kdfResult = - handshakeState.GenerateKdfResult extended2 - - circuitState <- - Ready( - circuitId, - nodes - @ List.singleton - { - TorCircuitNode.CryptoState = - TorCryptoState.FromKdfResult - kdfResult - false - Window = - TorWindow - Constants.DefaultCircuitLevelWindowParams - } - ) - - tcs.SetResult circuitId - | _ -> - failwith - "Unexpected circuit state when receiving Extended cell" - - controlLock.RunSyncWithSemaphore handleExtended - - | RelayData.RelayEstablishedIntro _ -> - let handleEstablished() = - match circuitState with - | RegisteringAsIntroductionPoint - ( - circuitId, - nodes, - _privateKey, - _publicKey, - tcs, - _callback - ) -> - circuitState <- - ReadyAsIntroductionPoint( - circuitId, - nodes, - _privateKey, - _publicKey, - _callback - ) - - tcs.SetResult() - | _ -> - failwith - "Unexpected circuit state when receiving ESTABLISHED_INTRO cell" - - controlLock.RunSyncWithSemaphore handleEstablished - | RelayData.RelayEstablishedRendezvous -> - let handleEstablished() = - match circuitState with - | RegisteringAsRendezvousPoint - ( - circuitId, nodes, tcs - ) -> - circuitState <- - ReadyAsRendezvousPoint(circuitId, nodes) - - tcs.SetResult() - | _ -> - failwith - "Unexpected circuit state when receiving RENDEZVOUS_ESTABLISHED cell" - - controlLock.RunSyncWithSemaphore handleEstablished - | RelaySendMe _ when streamId = Constants.DefaultStreamId -> - fromNode.Window.PackageIncrease() - | RelayIntroduce2 introduceMsg -> - let handleIntroduce() = - async { - match circuitState with - | ReadyAsIntroductionPoint(_, _, _, _, callback) -> - do! callback(introduceMsg) - | _ -> - return - failwith - "Received introduce2 cell over non-introduction-circuit huh?" - } - - do! controlLock.RunAsyncWithSemaphore handleIntroduce - | RelayTruncated reason -> - let handleTruncated() = - match circuitState with - | CircuitState.Initialized -> - // Circuit isn't created yet! - () - | Creating(circuitId, _, tcs) - | Extending(circuitId, _, _, tcs) -> - circuitState <- Truncated(circuitId, reason) - - tcs.SetException( - CircuitTruncatedException reason - ) - | WaitingForIntroduceAcknowledge(circuitId, _, tcs) -> - circuitState <- Truncated(circuitId, reason) - - tcs.SetException( - CircuitTruncatedException reason - ) - | RegisteringAsRendezvousPoint(circuitId, _, tcs) - | RegisteringAsIntroductionPoint - ( - circuitId, _, _, _, tcs, _ - ) - | WaitingForRendezvousRequest - ( - circuitId, _, _, _, _, _, tcs - ) -> - circuitState <- Truncated(circuitId, reason) - - tcs.SetException( - CircuitTruncatedException reason - ) - //FIXME: how can we tell the user that circuit is destroyed? if we throw here the listening thread with throw and user never finds out why - | Ready(circuitId, _) - | ReadyAsIntroductionPoint(circuitId, _, _, _, _) - | ReadyAsRendezvousPoint(circuitId, _) - // The circuit was already dead in our eyes, so we don't care about it being destroyed, just update the state to new destroyed state - | Destroyed(circuitId, _) - | Truncated(circuitId, _) -> - circuitState <- Truncated(circuitId, reason) - - controlLock.RunSyncWithSemaphore handleTruncated - | RelayData.RelayIntroduceAck ackMsg -> - let handleIntroduceAck() = - match circuitState with - | WaitingForIntroduceAcknowledge - ( - circuitId, nodes, tcs - ) -> - circuitState <- Ready(circuitId, nodes) - - tcs.SetResult(ackMsg) - | _ -> - failwith - "Unexpected circuit state when receiving RelayIntroduceAck cell" - - controlLock.RunSyncWithSemaphore handleIntroduceAck - | RelayData.RelayRendezvous2 rendMsg -> - let handleRendezvous() = - match circuitState with - | WaitingForRendezvousRequest - ( - circuitId, - nodes, - clientRandomPrivateKey, - clientRandomPublicKey, - introAuthPublicKey, - introEncPublicKey, - tcs - ) -> - - let serverPublicKey = - rendMsg.HandshakeData - |> Array.take Constants.KeyS256Length - - let ntorKeySeed, mac = - HiddenServicesCipher.CalculateClientRendezvousKeys - (X25519PublicKeyParameters( - serverPublicKey, - 0 - )) - clientRandomPublicKey - clientRandomPrivateKey - introAuthPublicKey - introEncPublicKey - - if mac - <> (rendMsg.HandshakeData - |> Array.skip Constants.KeyS256Length - |> Array.take Constants.Digest256Length) then - failwith "Invalid handshake data" - - - circuitState <- - Ready( - circuitId, - nodes - @ List.singleton - { - TorCircuitNode.CryptoState = - TorCryptoState.FromKdfResult - (Kdf.ComputeHSKdf - ntorKeySeed) - false - Window = - TorWindow - Constants.DefaultCircuitLevelWindowParams - } - ) - - tcs.SetResult() - | _ -> - failwith - "Unexpected circuit state when receiving Rendevzous2 cell" - - controlLock.RunSyncWithSemaphore handleRendezvous - | RelayBegin beginRequest -> - if beginRequest.Address.Split(':').[0] = String.Empty - && serviceStreamCallback.IsSome then - do! serviceStreamCallback.Value streamId self - | _ -> () - - if streamId <> Constants.DefaultStreamId then - match (streamsMap.TryFind streamId, relayData) with - | (Some stream, _) -> - do! stream.HandleIncomingData relayData - | (None, RelayBegin _) -> () - | (None, _) -> failwith "Unknown stream" - | :? CellDestroy as destroyCell -> - let handleDestroyed() = - - match circuitState with - | CircuitState.Initialized -> - // Circuit isn't created yet! - () - | Creating(circuitId, _, tcs) - | Extending(circuitId, _, _, tcs) -> - - circuitState <- - Destroyed(circuitId, destroyCell.Reason) - - tcs.SetException( - CircuitDestroyedException destroyCell.Reason - ) - | RegisteringAsRendezvousPoint(circuitId, _, tcs) - | RegisteringAsIntroductionPoint - ( - circuitId, _, _, _, tcs, _ - ) - | WaitingForRendezvousRequest - ( - circuitId, _, _, _, _, _, tcs - ) -> - - circuitState <- - Destroyed(circuitId, destroyCell.Reason) - - tcs.SetException( - CircuitDestroyedException destroyCell.Reason + //FIXME: add exception handling to mailbox and remove reply from here? + let! handleRes = + circuitOperationsMailBox.PostAndAsyncReply( + (fun replyChannel -> + CircuitOperation.HandleIncomingCell( + self, + cell, + replyChannel ) - | WaitingForIntroduceAcknowledge(circuitId, _, tcs) -> - circuitState <- - Destroyed(circuitId, destroyCell.Reason) - - tcs.SetException( - CircuitDestroyedException destroyCell.Reason - ) - //FIXME: how can we tell the user that circuit is destroyed? if we throw here the listening thread will throw and user never finds out why - | Ready(circuitId, _) - | ReadyAsIntroductionPoint(circuitId, _, _, _, _) - | ReadyAsRendezvousPoint(circuitId, _) - // The circuit was already dead in our eyes, so we don't care about it being destroyed, just update the state to new destroyed state - | Destroyed(circuitId, _) - | Truncated(circuitId, _) -> - circuitState <- - Destroyed(circuitId, destroyCell.Reason) + ), + Constants.CircuitRendezvousTimeout.TotalMilliseconds + |> int + ) - controlLock.RunSyncWithSemaphore handleDestroyed - | _ -> () + return UnwrapResult handleRes } diff --git a/NOnion/Network/TorGuard.fs b/NOnion/Network/TorGuard.fs index 4abadc2d..13128db3 100644 --- a/NOnion/Network/TorGuard.fs +++ b/NOnion/Network/TorGuard.fs @@ -13,6 +13,13 @@ open NOnion open NOnion.Cells open NOnion.Utility +type internal GuardSendMessage = + { + CircuitId: uint16 + CellToSend: ICell + ReplyChannel: AsyncReplyChannel> + } + type TorGuard private (client: TcpClient, sslStream: SslStream) = let shutdownToken = new CancellationTokenSource() @@ -20,7 +27,69 @@ type TorGuard private (client: TcpClient, sslStream: SslStream) = // Prevents two circuit setup happening at once (to prevent race condition on writing to CircuitIds list) let circuitSetupLock: obj = obj() - let sendLock = SemaphoreLocker() + let rec SendMailBoxProcessor(inbox: MailboxProcessor) = + let innerSend circuitId (cellToSend: ICell) = + async { + use memStream = new MemoryStream(Constants.FixedPayloadLength) + + use writer = new BinaryWriter(memStream) + cellToSend.Serialize writer + + // Write circuitId and command for the cell + // (We assume every cell that is being sent here uses 0 as circuitId + // because we haven't completed the handshake yet to have a circuit + // up.) + do! + circuitId + |> IntegerSerialization.FromUInt16ToBigEndianByteArray + |> StreamUtil.Write sslStream + + do! + Array.singleton cellToSend.Command + |> StreamUtil.Write sslStream + + if Command.IsVariableLength cellToSend.Command then + do! + memStream.Length + |> uint16 + |> IntegerSerialization.FromUInt16ToBigEndianByteArray + |> StreamUtil.Write sslStream + else + Array.zeroCreate( + Constants.FixedPayloadLength - int memStream.Position + ) + |> writer.Write + + do! memStream.ToArray() |> StreamUtil.Write sslStream + } + + async { + let! cancellationToken = Async.CancellationToken + cancellationToken.ThrowIfCancellationRequested() + + let! { + CircuitId = circuitId + CellToSend = cellToSend + ReplyChannel = replyChannel + } = inbox.Receive() + + try + do! innerSend circuitId cellToSend + OperationResult.Ok() |> replyChannel.Reply + with + | exn -> + match FSharpUtil.FindException exn with + | Some socketExn -> + NOnionSocketException socketExn :> exn + |> OperationResult.Failure + |> replyChannel.Reply + | None -> OperationResult.Failure exn |> replyChannel.Reply + + return! SendMailBoxProcessor inbox + } + + let sendMailBox = + MailboxProcessor.Start(SendMailBoxProcessor, shutdownToken.Token) static member NewClient(ipEndpoint: IPEndPoint) = async { @@ -96,50 +165,22 @@ type TorGuard private (client: TcpClient, sslStream: SslStream) = static member NewClientAsync ipEndpoint = TorGuard.NewClient ipEndpoint |> Async.StartAsTask - member __.Send (circuidId: uint16) (cellToSend: ICell) = - let safeSend() = - async { - use memStream = new MemoryStream(Constants.FixedPayloadLength) - use writer = new BinaryWriter(memStream) - cellToSend.Serialize writer - - // Write circuitId and command for the cell - // (We assume every cell that is being sent here uses 0 as circuitId - // because we haven't completed the handshake yet to have a circuit - // up.) - try - do! - circuidId - |> IntegerSerialization.FromUInt16ToBigEndianByteArray - |> StreamUtil.Write sslStream - - do! - Array.singleton cellToSend.Command - |> StreamUtil.Write sslStream - - if Command.IsVariableLength cellToSend.Command then - do! - memStream.Length - |> uint16 - |> IntegerSerialization.FromUInt16ToBigEndianByteArray - |> StreamUtil.Write sslStream - else - Array.zeroCreate( - Constants.FixedPayloadLength - - int memStream.Position - ) - |> writer.Write - - do! memStream.ToArray() |> StreamUtil.Write sslStream - with - | exn -> - match FSharpUtil.FindException exn with - | Some socketExn -> - return raise <| NOnionSocketException socketExn - | None -> return raise exn - } + member __.Send (circuitId: uint16) (cellToSend: ICell) = + async { + let! sendResult = + sendMailBox.PostAndAsyncReply(fun replyChannel -> + { + CircuitId = circuitId + CellToSend = cellToSend + ReplyChannel = replyChannel + } + ) - sendLock.RunAsyncWithSemaphore safeSend + match sendResult with + | OperationResult.Ok _ -> return () + | OperationResult.Failure exn -> + return raise <| FSharpUtil.ReRaise exn + } member self.SendAsync (circuidId: uint16) (cellToSend: ICell) = self.Send circuidId cellToSend |> Async.StartAsTask diff --git a/NOnion/Network/TorStream.fs b/NOnion/Network/TorStream.fs index fa2203a9..0a971d3e 100644 --- a/NOnion/Network/TorStream.fs +++ b/NOnion/Network/TorStream.fs @@ -9,11 +9,43 @@ open FSharpx.Collections open NOnion open NOnion.Cells.Relay open NOnion.Utility +open MailboxResultUtil + +type private StreamReceiveMessage = + { + StreamBuffer: array + BufferOffset: int + BufferLength: int + ReplyChannel: AsyncReplyChannel> + } + +type private StreamControlMessage = + | End of replayChannel: AsyncReplyChannel> + | Send of + array * + replayChannel: AsyncReplyChannel> + | StartServiceConnectionProcess of + port: int * + streamObj: ITorStream * + replayChannel: AsyncReplyChannel>> + | StartDirectoryConnectionProcess of + streamObj: ITorStream * + replayChannel: AsyncReplyChannel>> + | RegisterStream of + streamObj: ITorStream * + streamId: uint16 * + replayChannel: AsyncReplyChannel> + | HandleRelayConnected of + replayChannel: AsyncReplyChannel> + | HandleRelayEnd of + message: RelayData * + reason: EndReason * + replayChannel: AsyncReplyChannel> + | SendSendMe of replayChannel: AsyncReplyChannel> type TorStream(circuit: TorCircuit) = let mutable streamState: StreamState = StreamState.Initialized - let controlLock: SemaphoreLocker = SemaphoreLocker() let window: TorWindow = TorWindow Constants.DefaultStreamLevelWindowParams @@ -25,99 +57,64 @@ type TorStream(circuit: TorCircuit) = let mutable isEOF: bool = false let incomingCells: BufferBlock = BufferBlock() - let receiveLock: SemaphoreLocker = SemaphoreLocker() - static member Accept (streamId: uint16) (circuit: TorCircuit) = - async { - let stream = TorStream circuit - stream.RegisterIncomingStream streamId - - do! circuit.SendRelayCell streamId (RelayConnected Array.empty) None - - sprintf - "TorStream[%i,%i]: incoming stream accepted" - streamId - circuit.Id - |> TorLogger.Log - - return stream - } - - member __.End() = - async { - let safeEnd() = - async { - match streamState with - | Connected streamId -> - do! - circuit.SendRelayCell - streamId - (RelayEnd EndReason.Done) - None - - sprintf - "TorStream[%i,%i]: sending stream end packet" + let rec StreamControlMailBoxProcessor + (inbox: MailboxProcessor) + = + let safeEnd() = + async { + match streamState with + | Connected streamId -> + do! + circuit.SendRelayCell streamId - circuit.Id - |> TorLogger.Log - | _ -> - failwith - "Unexpected state when trying to end the stream" - } - - return! controlLock.RunAsyncWithSemaphore safeEnd - } - - member self.EndAsync() = - self.End() |> Async.StartAsTask - - - member __.SendData(data: array) = - async { - let safeSend() = - async { - match streamState with - | Connected streamId -> - let dataChunks = - SeqUtils.Chunk - Constants.MaximumRelayPayloadLength - data - - let rec sendChunks dataChunks = - async { - match Seq.tryHeadTail dataChunks with - | None -> () - | Some(head, nextDataChunks) -> - circuit.LastNode.Window.PackageDecrease() - - do! - circuit.SendRelayCell - streamId - (head - |> Array.ofSeq - |> RelayData.RelayData) - None - - window.PackageDecrease() - do! nextDataChunks |> sendChunks - } - - do! sendChunks dataChunks - | _ -> - failwith - "Unexpected state when trying to send data over stream" - } + (RelayEnd EndReason.Done) + None - return! controlLock.RunAsyncWithSemaphore safeSend - } + sprintf + "TorStream[%i,%i]: sending stream end packet" + streamId + circuit.Id + |> TorLogger.Log + | _ -> failwith "Unexpected state when trying to end the stream" + } - member self.SendDataAsync data = - self.SendData data |> Async.StartAsTask + let safeSend(data: array) = + async { + match streamState with + | Connected streamId -> + let dataChunks = + SeqUtils.Chunk Constants.MaximumRelayPayloadLength data + + let rec sendChunks dataChunks = + async { + match Seq.tryHeadTail dataChunks with + | None -> () + | Some(head, nextDataChunks) -> + let! lastNode = circuit.GetLastNode() + lastNode.Window.PackageDecrease() + + do! + circuit.SendRelayCell + streamId + (head + |> Array.ofSeq + |> RelayData.RelayData) + None + + window.PackageDecrease() + do! nextDataChunks |> sendChunks + } + + do! sendChunks dataChunks + | _ -> + failwith + "Unexpected state when trying to send data over stream" + } - member self.ConnectToService(port: int) = - let startConnectionProcess() = + let startServiceConnectionProcess (port: int) (streamObj: ITorStream) = async { - let streamId = circuit.RegisterStream self None + let streamId = circuit.RegisterStream streamObj None let tcs = TaskCompletionSource() @@ -142,200 +139,377 @@ type TorStream(circuit: TorCircuit) = return tcs.Task } - async { - let! connectionProcessTcs = - controlLock.RunAsyncWithSemaphore startConnectionProcess + let startDirectoryConnectionProcess(streamObj: ITorStream) = + async { + let streamId = circuit.RegisterStream streamObj None - return! - connectionProcessTcs - |> Async.AwaitTask - |> FSharpUtil.WithTimeout Constants.StreamCreationTimeout - } + let tcs = TaskCompletionSource() - member self.ConnectToDirectory() = - async { - let startConnectionProcess() = - async { - let streamId = circuit.RegisterStream self None + streamState <- Connecting(streamId, tcs) + + sprintf + "TorStream[%i,%i]: creating a directory stream" + streamId + circuit.Id + |> TorLogger.Log + + do! + circuit.SendRelayCell + streamId + RelayData.RelayBeginDirectory + None + + return tcs.Task + } + + let registerProcess (streamObj: ITorStream) (streamId: uint16) = + streamState <- + circuit.RegisterStream streamObj (Some streamId) |> Connected - let tcs = TaskCompletionSource() + let handleRelayConnected() = + match streamState with + | Connecting(streamId, tcs) -> + streamState <- Connected streamId + tcs.SetResult streamId - streamState <- Connecting(streamId, tcs) + sprintf "TorStream[%i,%i]: connected!" streamId circuit.Id + |> TorLogger.Log + | _ -> + failwith "Unexpected state when receiving RelayConnected cell" + + let handleRelayEnd (message: RelayData) (reason: EndReason) = + match streamState with + | Connecting(streamId, tcs) -> + sprintf + "TorStream[%i,%i]: received end packet while connecting" + streamId + circuit.Id + |> TorLogger.Log + Failure( sprintf - "TorStream[%i,%i]: creating a directory stream" - streamId - circuit.Id - |> TorLogger.Log + "Stream connection process failed! Reason: %s" + (reason.ToString()) + ) + |> tcs.SetException + | Connected streamId -> + sprintf + "TorStream[%i,%i]: received end packet while connected" + streamId + circuit.Id + |> TorLogger.Log - do! + incomingCells.Post message |> ignore + | _ -> failwith "Unexpected state when receiving RelayEnd cell" + + let sendSendMe() = + async { + match streamState with + | Connected streamId -> + return! circuit.SendRelayCell streamId - RelayData.RelayBeginDirectory + RelayData.RelaySendMe None + | _ -> + failwith "Unexpected state when sending stream-level sendme" + } - return tcs.Task - } + async { + let! cancellationToken = Async.CancellationToken + cancellationToken.ThrowIfCancellationRequested() - let! connectionProcessTcs = - controlLock.RunAsyncWithSemaphore startConnectionProcess + let! command = inbox.Receive() - return! - connectionProcessTcs - |> Async.AwaitTask - |> FSharpUtil.WithTimeout Constants.StreamCreationTimeout + match command with + | End replyChannel -> + do! safeEnd() |> TryExecuteAsyncAndReplyAsResult replyChannel + | Send(data, replyChannel) -> + do! + safeSend data + |> TryExecuteAsyncAndReplyAsResult replyChannel + | StartServiceConnectionProcess(port, streamObj, replyChannel) -> + do! + startServiceConnectionProcess port streamObj + |> TryExecuteAsyncAndReplyAsResult replyChannel + | StartDirectoryConnectionProcess(streamObj, replyChannel) -> + do! + startDirectoryConnectionProcess(streamObj) + |> TryExecuteAsyncAndReplyAsResult replyChannel + | RegisterStream(streamObj, streamId, replyChannel) -> + TryExecuteAndReplyAsResult + replyChannel + (fun _ -> registerProcess streamObj streamId) + | HandleRelayConnected replyChannel -> + TryExecuteAndReplyAsResult replyChannel handleRelayConnected + | HandleRelayEnd(message, reason, replyChannel) -> + TryExecuteAndReplyAsResult + replyChannel + (fun _ -> handleRelayEnd message reason) + | SendSendMe replyChannel -> + do! sendSendMe() |> TryExecuteAsyncAndReplyAsResult replyChannel + + return! StreamControlMailBoxProcessor inbox } - member self.ConnectToDirectoryAsync() = - self.ConnectToDirectory() |> Async.StartAsTask + let streamControlMailBox = + MailboxProcessor.Start StreamControlMailBoxProcessor - member private self.RegisterIncomingStream(streamId: uint16) = - let registerProcess() = - streamState <- - circuit.RegisterStream self (Some streamId) |> Connected + let rec StreamReceiveMailBoxProcessor + (inbox: MailboxProcessor) + = + let currentBufferHasRemainingBytes() = + bufferLength > bufferOffset - controlLock.RunSyncWithSemaphore registerProcess + let currentBufferRemainingBytes() = + bufferLength - bufferOffset - member self.Receive (buffer: array) (offset: int) (length: int) = - async { - let currentBufferHasRemainingBytes() = - bufferLength > bufferOffset - - let currentBufferRemainingBytes() = - bufferLength - bufferOffset - - let readFromCurrentBuffer - (buffer: array) - (offset: int) - (len: int) - = - let readLength = min len (currentBufferRemainingBytes()) - Array.blit currentBuffer bufferOffset buffer offset readLength - bufferOffset <- bufferOffset + readLength - - readLength - - let processIncomingCell() = - async { - let! nextCell = - incomingCells.ReceiveAsync() |> Async.AwaitTask - - match nextCell with - | RelayData data -> - Array.blit data 0 currentBuffer 0 data.Length - bufferOffset <- 0 - bufferLength <- data.Length - - window.DeliverDecrease() - - if window.NeedSendme() then - let sendSendMe() = - async { - match streamState with - | Connected streamId -> - return! - circuit.SendRelayCell - streamId - RelayData.RelaySendMe - None - | _ -> - failwith - "Unexpected state when sending stream-level sendme" - } - - do! controlLock.RunAsyncWithSemaphore sendSendMe - - | RelayEnd reason when reason = EndReason.Done -> - TorLogger.Log( - sprintf - "TorStream[%s,%i]: pushed EOF to consumer" - streamState.Id - circuit.Id - ) + let readFromCurrentBuffer + (buffer: array) + (offset: int) + (len: int) + = + let readLength = min len (currentBufferRemainingBytes()) + Array.blit currentBuffer bufferOffset buffer offset readLength + bufferOffset <- bufferOffset + readLength + + readLength + + let processIncomingCell() = + async { + let! nextCell = incomingCells.ReceiveAsync() |> Async.AwaitTask + + match nextCell with + | RelayData data -> + Array.blit data 0 currentBuffer 0 data.Length + bufferOffset <- 0 + bufferLength <- data.Length + + window.DeliverDecrease() + + if window.NeedSendme() then + + let! sendResult = + streamControlMailBox.PostAndAsyncReply + StreamControlMessage.SendSendMe + + return UnwrapResult sendResult + + | RelayEnd reason when reason = EndReason.Done -> + TorLogger.Log( + sprintf + "TorStream[%i]: pushed EOF to consumer" + circuit.Id + ) + + currentBuffer <- Array.empty + bufferOffset <- 0 + bufferLength <- 0 + isEOF <- true - currentBuffer <- Array.empty - bufferOffset <- 0 - bufferLength <- 0 - isEOF <- true - - let markStreamAsEnded() = - match streamState with - | Connected streamId -> - streamState <- Ended(streamId, reason) - | _ -> () - - controlLock.RunSyncWithSemaphore markStreamAsEnded - | RelayEnd reason -> - return - failwithf - "Stream closed unexpectedly, reason = %s" - (reason.ToString()) - | _ -> - return - failwith - "IncomingCells should not keep unrelated cells" - } - - let rec fillBuffer() = - async { - do! processIncomingCell() - - if isEOF || currentBufferHasRemainingBytes() then + | RelayEnd reason -> + return + failwithf + "Stream closed unexpectedly, reason = %s" + (reason.ToString()) + | _ -> + return + failwith "IncomingCells should not keep unrelated cells" + } + + let rec fillBuffer() = + async { + do! processIncomingCell() + + if isEOF || currentBufferHasRemainingBytes() then + return () + else + return! fillBuffer() + } + + let refillBufferIfNeeded() = + async { + if not isEOF then + if currentBufferHasRemainingBytes() then return () else return! fillBuffer() - } - - let refillBufferIfNeeded() = - async { - if not isEOF then - if currentBufferHasRemainingBytes() then - return () - else - return! fillBuffer() - } - - - let safeReceive() = - async { - if length = 0 then - return 0 + } + + + let safeReceive(buffer: array, offset: int, length: int) = + async { + if length = 0 then + return 0 + else + do! refillBufferIfNeeded() + + if isEOF then + return -1 else - do! refillBufferIfNeeded() - - if isEOF then - return -1 - else - let rec tryRead bytesRead bytesRemaining = - async { - if bytesRemaining > 0 && not isEOF then - do! refillBufferIfNeeded() - - let newBytesRead = - bytesRead - + (readFromCurrentBuffer - buffer - (offset + bytesRead) - (length - bytesRead)) - - let newBytesRemaining = - length - newBytesRead - - if incomingCells.Count = 0 then - return newBytesRead - else - return! - tryRead - newBytesRead - newBytesRemaining + let rec tryRead bytesRead bytesRemaining = + async { + if bytesRemaining > 0 && not isEOF then + do! refillBufferIfNeeded() + + let newBytesRead = + bytesRead + + (readFromCurrentBuffer + buffer + (offset + bytesRead) + (length - bytesRead)) + + let newBytesRemaining = + length - newBytesRead + + if incomingCells.Count = 0 then + return newBytesRead else - return bytesRead - } + return! + tryRead + newBytesRead + newBytesRemaining + else + return bytesRead + } - return! tryRead 0 length - } + return! tryRead 0 length + } - return! receiveLock.RunAsyncWithSemaphore safeReceive + + async { + let! cancellationToken = Async.CancellationToken + cancellationToken.ThrowIfCancellationRequested() + + let! { + StreamBuffer = buffer + BufferOffset = offset + BufferLength = length + ReplyChannel = replyChannel + } = inbox.Receive() + + do! + safeReceive(buffer, offset, length) + |> TryExecuteAsyncAndReplyAsResult replyChannel + + return! StreamReceiveMailBoxProcessor inbox + } + + let streamReceiveMailBox = + MailboxProcessor.Start StreamReceiveMailBoxProcessor + + static member Accept (streamId: uint16) (circuit: TorCircuit) = + async { + let stream = TorStream circuit + do! stream.RegisterIncomingStream streamId + + do! circuit.SendRelayCell streamId (RelayConnected Array.empty) None + + sprintf + "TorStream[%i,%i]: incoming stream accepted" + streamId + circuit.Id + |> TorLogger.Log + + return stream + } + + member __.End() = + async { + let! sendResult = + streamControlMailBox.PostAndAsyncReply StreamControlMessage.End + + return UnwrapResult sendResult + } + + member self.EndAsync() = + self.End() |> Async.StartAsTask + + + member __.SendData(data: array) = + async { + let! sendResult = + streamControlMailBox.PostAndAsyncReply(fun replyChannel -> + StreamControlMessage.Send(data, replyChannel) + ) + + return UnwrapResult sendResult + } + + member self.SendDataAsync data = + self.SendData data |> Async.StartAsTask + + member self.ConnectToService(port: int) = + async { + let! completionTaskRes = + streamControlMailBox.PostAndAsyncReply( + (fun replyChannel -> + StreamControlMessage.StartServiceConnectionProcess( + port, + self, + replyChannel + ) + ), + Constants.StreamCreationTimeout.TotalMilliseconds |> int + ) + + return! + completionTaskRes + |> UnwrapResult + |> Async.AwaitTask + |> FSharpUtil.WithTimeout Constants.StreamCreationTimeout + } + + member self.ConnectToDirectory() = + async { + let! completionTaskResult = + streamControlMailBox.PostAndAsyncReply( + (fun replyChannel -> + StreamControlMessage.StartDirectoryConnectionProcess( + self, + replyChannel + ) + ), + Constants.StreamCreationTimeout.TotalMilliseconds |> int + ) + + return! + completionTaskResult + |> UnwrapResult + |> Async.AwaitTask + |> FSharpUtil.WithTimeout Constants.StreamCreationTimeout + } + + member self.ConnectToDirectoryAsync() = + self.ConnectToDirectory() |> Async.StartAsTask + + member private self.RegisterIncomingStream(streamId: uint16) = + async { + let! registerationResult = + streamControlMailBox.PostAndAsyncReply(fun replyChannel -> + StreamControlMessage.RegisterStream( + self, + streamId, + replyChannel + ) + ) + + return UnwrapResult registerationResult + } + + member self.Receive (buffer: array) (offset: int) (length: int) = + async { + let! receiveResult = + streamReceiveMailBox.PostAndAsyncReply(fun replyChannel -> + { + StreamBuffer = buffer + BufferOffset = offset + BufferLength = length + ReplyChannel = replyChannel + } + ) + + return UnwrapResult receiveResult } member self.ReceiveAsync(buffer: array, offset: int, length: int) = @@ -346,55 +520,23 @@ type TorStream(circuit: TorCircuit) = async { match message with | RelayConnected _ -> - let handleRelayConnected() = - match streamState with - | Connecting(streamId, tcs) -> - streamState <- Connected streamId - tcs.SetResult streamId - - sprintf - "TorStream[%i,%i]: connected!" - streamId - circuit.Id - |> TorLogger.Log - | _ -> - failwith - "Unexpected state when receiving RelayConnected cell" - - - controlLock.RunSyncWithSemaphore handleRelayConnected + let! handleConnectedResult = + streamControlMailBox.PostAndAsyncReply + StreamControlMessage.HandleRelayConnected + + return UnwrapResult handleConnectedResult | RelayData _ -> incomingCells.Post message |> ignore | RelaySendMe _ -> window.PackageIncrease() | RelayEnd reason -> - let handleRelayEnd() = - match streamState with - | Connecting(streamId, tcs) -> - sprintf - "TorStream[%i,%i]: received end packet while connecting" - streamId - circuit.Id - |> TorLogger.Log - - streamState <- Ended(streamId, reason) - - Failure( - sprintf - "Stream connection process failed! Reason: %s" - (reason.ToString()) + let! handleEndResult = + streamControlMailBox.PostAndAsyncReply(fun replyChannel -> + StreamControlMessage.HandleRelayEnd( + message, + reason, + replyChannel ) - |> tcs.SetException - | Connected streamId -> - sprintf - "TorStream[%i,%i]: received end packet while connected" - streamId - circuit.Id - |> TorLogger.Log - - incomingCells.Post message |> ignore - | _ -> - failwith - "Unexpected state when receiving RelayEnd cell" - - controlLock.RunSyncWithSemaphore handleRelayEnd + ) + + return UnwrapResult handleEndResult | _ -> () } diff --git a/NOnion/Utility/MailboxUtil.fs b/NOnion/Utility/MailboxUtil.fs new file mode 100644 index 00000000..7d70a547 --- /dev/null +++ b/NOnion/Utility/MailboxUtil.fs @@ -0,0 +1,40 @@ +namespace NOnion.Utility + +open System.Net.Sockets + +//FIXME: for some reason FSharpUtil is in NOnion namespace instead of NOnion.Utility +open NOnion + +module internal MailboxResultUtil = + let HandleError<'T> + exn + (replyChannel: AsyncReplyChannel>) + = + match FSharpUtil.FindException exn with + | Some socketExn -> + NOnionSocketException socketExn :> exn + |> OperationResult.Failure + |> replyChannel.Reply + | None -> OperationResult.Failure exn |> replyChannel.Reply + + let TryExecuteAsyncAndReplyAsResult<'T> + (replyChannel: AsyncReplyChannel>) + (job: Async<'T>) + = + async { + try + let! result = job + OperationResult.Ok result |> replyChannel.Reply + with + | exn -> HandleError exn replyChannel + } + + let TryExecuteAndReplyAsResult<'T> + (replyChannel: AsyncReplyChannel>) + (job: unit -> 'T) + = + try + let result = job() + OperationResult.Ok result |> replyChannel.Reply + with + | exn -> HandleError exn replyChannel diff --git a/NOnion/Utility/ResultUtil.fs b/NOnion/Utility/ResultUtil.fs new file mode 100644 index 00000000..e997ec7c --- /dev/null +++ b/NOnion/Utility/ResultUtil.fs @@ -0,0 +1,19 @@ +namespace NOnion.Utility + +//FIXME: for some reason FSharpUtil is in NOnion namespace instead of NOnion.Utility +open NOnion + +// RequireQualifiedAccess is needed to prevent collision with +// Failure function that creates general exceptions + +[] +type internal OperationResult<'T> = + | Ok of 'T + | Failure of exn + +[] +module internal ResultUtil = + let UnwrapResult<'T>(resultObj: OperationResult<'T>) = + match resultObj with + | OperationResult.Ok result -> result + | OperationResult.Failure ex -> raise <| FSharpUtil.ReRaise ex From b38804086eeb8a8f57c59879f0278894b76e4937 Mon Sep 17 00:00:00 2001 From: Afshin Arani Date: Fri, 21 Oct 2022 14:07:49 +0330 Subject: [PATCH 2/3] Utility: move FSharpUtil to correct namespace FSharpUtil was inside the NOnion namespace instead of NOnion.Utility, this commit fixes that. --- NOnion/Http/TorHttpClient.fs | 1 + NOnion/Utility/FSharpUtil.fs | 4 +++- NOnion/Utility/MailboxUtil.fs | 1 - 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/NOnion/Http/TorHttpClient.fs b/NOnion/Http/TorHttpClient.fs index 829fd6bb..d5e83914 100644 --- a/NOnion/Http/TorHttpClient.fs +++ b/NOnion/Http/TorHttpClient.fs @@ -7,6 +7,7 @@ open System.IO.Compression open NOnion open NOnion.Network +open NOnion.Utility type TorHttpClient(stream: TorStream, host: string) = diff --git a/NOnion/Utility/FSharpUtil.fs b/NOnion/Utility/FSharpUtil.fs index 5235fade..f3c42494 100644 --- a/NOnion/Utility/FSharpUtil.fs +++ b/NOnion/Utility/FSharpUtil.fs @@ -1,10 +1,12 @@ -namespace NOnion +namespace NOnion.Utility open System open System.Runtime.ExceptionServices open FSharpx.Collections +open NOnion + module FSharpUtil = //Implementation copied from https://github.com/nblockchain/geewallet/blob/master/src/GWallet.Backend/FSharpUtil.fs let ReRaise(ex: Exception) : Exception = diff --git a/NOnion/Utility/MailboxUtil.fs b/NOnion/Utility/MailboxUtil.fs index 7d70a547..6024f954 100644 --- a/NOnion/Utility/MailboxUtil.fs +++ b/NOnion/Utility/MailboxUtil.fs @@ -2,7 +2,6 @@ open System.Net.Sockets -//FIXME: for some reason FSharpUtil is in NOnion namespace instead of NOnion.Utility open NOnion module internal MailboxResultUtil = From 098a925b2ee369ac0e7d5ace385ac01decb45202 Mon Sep 17 00:00:00 2001 From: Afshin Arani Date: Fri, 21 Oct 2022 14:31:03 +0330 Subject: [PATCH 3/3] NOnion: fix problem in NOnion project file VS keeps trying to change the symbol `>` to `>` everytime it changes the project file (e.g. when adding/removing file) which is annoying and causes unrelated diff in commits, this commit fixes that. --- NOnion/NOnion.fsproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NOnion/NOnion.fsproj b/NOnion/NOnion.fsproj index d62a61d1..b7a72d39 100644 --- a/NOnion/NOnion.fsproj +++ b/NOnion/NOnion.fsproj @@ -99,7 +99,7 @@ - +