Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: Replace token-bucket with token-limiter #2033

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion chainweb.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ library
, Chainweb.Utils.RequestLog
, Chainweb.Utils.Rule
, Chainweb.Utils.Serialization
, Chainweb.Utils.Throttling
, Chainweb.Utils.TokenLimiting
, Chainweb.VerifierPlugin
, Chainweb.VerifierPlugin.Allow
, Chainweb.VerifierPlugin.Hyperlane.Announcement
Expand Down Expand Up @@ -361,6 +363,7 @@ library
, base64-bytestring-kadena == 0.1
, binary >= 0.8
, bytestring >= 0.10.12
, cache >= 0.1.1.2
, case-insensitive >= 1.2
, cassava >= 0.5.1
, chainweb-storage >= 0.1
Expand Down Expand Up @@ -434,7 +437,7 @@ library
, time >= 1.12.2
, tls >=1.9
, tls-session-manager >= 0.0
, token-bucket >= 0.1
, token-limiter >= 0.1
, transformers >= 0.5
, trifecta >= 2.1
, unliftio >= 0.2
Expand Down
62 changes: 40 additions & 22 deletions src/Chainweb/Chainweb.hs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ import P2P.Peer

import qualified Pact.Types.ChainMeta as P
import qualified Pact.Types.Command as P
import qualified Chainweb.Utils.Throttling as Throttling

-- -------------------------------------------------------------------------- --
-- Chainweb Resources
Expand Down Expand Up @@ -718,28 +719,29 @@ runChainweb cw nowServing = do
logg Warn $ "OpenAPI spec validation enabled on service API, make sure this is what you want"
mkValidationMiddleware
else return id

concurrentlies_

-- 1. Start serving Rest API
[ (if tls then serve else servePlain)
$ httpLog
. throttle (_chainwebPutPeerThrottler cw)
. throttle (_chainwebMempoolThrottler cw)
. throttle (_chainwebThrottler cw)
. p2pRequestSizeLimit
. p2pValidationMiddleware

-- 2. Start Clients (with a delay of 500ms)
, threadDelay 500000 >> clients

-- 3. Start serving local API
, threadDelay 500000 >> do
serveServiceApi
$ serviceHttpLog
. serviceRequestSizeLimit
. serviceApiValidationMiddleware
]
Throttling.throttleMiddleware (logFunction $ _chainwebLogger cw) "p2p" p2pThrottleEconomy $ \p2pThrottler ->
Throttling.throttleMiddleware (logFunction $ _chainwebLogger cw) "service" serviceThrottleEconomy $ \serviceThrottler ->

concurrentlies_

-- 1. Start serving Rest API
[ (if tls then serve else servePlain)
$ httpLog
. p2pRequestSizeLimit
. p2pThrottler
. p2pValidationMiddleware

-- 2. Start Clients (with a delay of 500ms)
, threadDelay 500000 >> clients

-- 3. Start serving local API
, threadDelay 500000 >> do
serveServiceApi
$ serviceHttpLog
. serviceRequestSizeLimit
. serviceThrottler
. serviceApiValidationMiddleware
]

where

Expand Down Expand Up @@ -864,6 +866,22 @@ runChainweb cw nowServing = do
setMaxLengthForRequest (\_req -> pure $ Just $ 2 * 1024 * 1024) -- 2MB
defaultRequestSizeLimitSettings

p2pThrottleEconomy = Throttling.ThrottleEconomy
{ Throttling.requestCost = 10
, Throttling.requestBody100ByteCost = 1
, Throttling.responseBody100ByteCost = 2
, Throttling.maxBudget = 35_000
, Throttling.freeRate = 35_000
}

serviceThrottleEconomy = Throttling.ThrottleEconomy
{ Throttling.requestCost = 10
, Throttling.requestBody100ByteCost = 1
, Throttling.responseBody100ByteCost = 2
, Throttling.maxBudget = 50_000
, Throttling.freeRate = 50_000
}

-- Request size limit for the P2P API
--
-- NOTE: this may need to have to be adjusted if the p2p limits for batch
Expand Down
12 changes: 8 additions & 4 deletions src/Chainweb/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ import Configuration.Utils hiding (Error, Lens)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Concurrent.TokenBucket
import Control.Concurrent.TokenLimiter
import Control.DeepSeq
import Control.Exception (SomeAsyncException(..), evaluate)
import Control.Lens hiding ((.=))
Expand Down Expand Up @@ -970,9 +970,13 @@ runForeverThrottled
-> IO ()
-> IO ()
runForeverThrottled logfun name burst rate a = mask $ \umask -> do
tokenBucket <- newTokenBucket
let config = defaultLimitConfig
{ maxBucketTokens = fromIntegral burst
, bucketRefillTokensPerSecond = fromIntegral rate
}
tokenBucket <- newRateLimiter config
logfun Debug $ "start " <> name
let runThrottled = tokenBucketWait tokenBucket burst rate >> a
let runThrottled = waitDebit config tokenBucket 1 >> a
go = do
forever (umask runThrottled) `catchAllSynchronous` \e ->
logfun Error $ name <> " failed: " <> sshow e <> ". Restarting ..."
Expand Down Expand Up @@ -1494,4 +1498,4 @@ unsafeHead msg = \case
unsafeTail :: HasCallStack => String -> [a] -> [a]
unsafeTail msg = \case
_ : xs -> xs
[] -> error $ "unsafeTail: empty list: " <> msg
[] -> error $ "unsafeTail: empty list: " <> msg
145 changes: 145 additions & 0 deletions src/Chainweb/Utils/Throttling.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

module Chainweb.Utils.Throttling
( ThrottleEconomy(..)
, ThrottledException(..)
, throttleMiddleware
) where

import Data.LogMessage
import Data.Text (Text)
import qualified Network.Wai as Wai
import qualified Network.Wai.Internal as Wai.Internal
import Chainweb.Utils.TokenLimiting
import Control.Exception.Safe
import Network.HTTP.Types.Status

Check failure on line 21 in src/Chainweb/Utils/Throttling.hs

View workflow job for this annotation

GitHub Actions / Build (9.8.2, 3.12, macos-latest, true)

The import of ‘Network.HTTP.Types.Status’ is redundant

Check failure on line 21 in src/Chainweb/Utils/Throttling.hs

View workflow job for this annotation

GitHub Actions / Build (9.8.2, 3.12, ubuntu-22.04, false)

The import of ‘Network.HTTP.Types.Status’ is redundant

Check failure on line 21 in src/Chainweb/Utils/Throttling.hs

View workflow job for this annotation

GitHub Actions / Build (9.10.1, 3.12, ubuntu-22.04, false)

The import of ‘Network.HTTP.Types.Status’ is redundant

Check failure on line 21 in src/Chainweb/Utils/Throttling.hs

View workflow job for this annotation

GitHub Actions / Build (9.6.6, 3.12, ubuntu-22.04, false)

The import of ‘Network.HTTP.Types.Status’ is redundant

Check failure on line 21 in src/Chainweb/Utils/Throttling.hs

View workflow job for this annotation

GitHub Actions / Build (9.8.2, 3.12, ubuntu-22.04, true)

The import of ‘Network.HTTP.Types.Status’ is redundant
import qualified Data.ByteString as BS
import qualified Data.Text as T
import Data.Hashable
import Network.Socket (SockAddr(..))
import qualified Data.ByteString.Builder as BSB
import System.IO.Unsafe (unsafeInterleaveIO)
import qualified Data.ByteString.Lazy as LBS

data ThrottleEconomy = ThrottleEconomy
{ requestCost :: Int
, requestBody100ByteCost :: Int
, responseBody100ByteCost :: Int
, maxBudget :: Int
, freeRate :: Int
}

data ThrottledException = ThrottledException Text
deriving (Show, Exception)

hashWithSalt' :: Hashable a => a -> Int -> Int
hashWithSalt' = flip hashWithSalt

newtype HashableSockAddr = HashableSockAddr SockAddr
deriving newtype Eq
instance Hashable HashableSockAddr where
hashWithSalt salt (HashableSockAddr sockAddr) = case sockAddr of
SockAddrInet port hostAddr ->
-- constructor tag
hashWithSalt' (1 :: Word)
. hashWithSalt' (fromIntegral port :: Word)
. hashWithSalt' hostAddr
$ salt
SockAddrInet6 port flowInfo hostAddr scopeId ->
hashWithSalt' (2 :: Word)
. hashWithSalt' (fromIntegral port :: Word)
. hashWithSalt' flowInfo
. hashWithSalt' hostAddr
. hashWithSalt' scopeId
$ salt
SockAddrUnix str ->
hashWithSalt' (3 :: Word)
. hashWithSalt' str
$ salt

debitOrDie :: Hashable k => TokenLimitMap k -> (Text, k) -> Int -> IO ()
debitOrDie tokenLimitMap (name, k) cost = do
tryDebit cost k tokenLimitMap >>= \case
True -> return ()
False -> throwIO (ThrottledException name)

throttleMiddleware :: LogFunction -> Text -> ThrottleEconomy -> (Wai.Middleware -> IO r) -> IO r
throttleMiddleware logfun name ThrottleEconomy{..} k =
withTokenLimitMap logfun ("request-throttler-" <> name) limitCachePolicy limitConfig $ \tokenLimitMap -> do
k $ middleware tokenLimitMap
where
middleware tokenLimitMap app request respond = do
debitOrDie' requestCost
meteredRequest <- meterRequest debitOrDie' request
app meteredRequest (meterResponse debitOrDie' respond)
where
host = HashableSockAddr $ Wai.remoteHost request
hostText = T.pack $ show (Wai.remoteHost request)
debitOrDie' = debitOrDie tokenLimitMap (hostText, host)

limitCachePolicy = TokenLimitCachePolicy 30
limitConfig = defaultLimitConfig
{ maxBucketTokens = maxBudget
, initialBucketTokens = maxBudget
, bucketRefillTokensPerSecond = freeRate
}

meterRequest debit request
| requestBody100ByteCost == 0 = return request
| otherwise = case Wai.requestBodyLength request of
Wai.KnownLength requestBodyLen -> do
() <- debit $ (requestBody100ByteCost * fromIntegral requestBodyLen) `div` 100
return request
Wai.ChunkedBody ->
return (Wai.setRequestBodyChunks (getMeteredRequestBodyChunk debit request) request)

getMeteredRequestBodyChunk debit request = do
chunk <- Wai.getRequestBodyChunk request
-- charge *after* receiving a request body chunk
() <- debit $ (requestBody100ByteCost * BS.length chunk) `div` 100
return chunk

-- the only way to match on responses without using internal API is via
-- responseToStream, which converts any response into a streaming response.
-- unfortunately:
-- * all of the responses produced by servant are builder responses,
-- not streaming responses
-- * streaming responses are not supported by http2; we try to use http2
-- (see https://hackage.haskell.org/package/http2-5.3.5/docs/src/Network.HTTP2.Server.Run.html#runIO)
-- * a streaming response body may be less efficient than a builder
-- response body, in particular because it needs to use a chunked
-- encoding
--
meterResponse
:: (Int -> IO ())
-> (Wai.Response -> IO a) -> Wai.Response -> IO a
meterResponse _ respond response
| responseBody100ByteCost == 0 = respond response
meterResponse debit respond (Wai.Internal.ResponseStream status headers responseBody) = do
respond
$ Wai.responseStream status headers
$ meterStreamingResponseBody debit responseBody
meterResponse debit respond (Wai.Internal.ResponseBuilder status headers responseBody) = do
respond
<$> Wai.responseLBS status headers . LBS.fromChunks
=<< meterBuilderResponseBody debit (LBS.toChunks $ BSB.toLazyByteString responseBody)
meterResponse _ _ _ = error "unrecognized response type"

meterStreamingResponseBody debit responseBody send flush = responseBody
(\chunkBSBuilder -> do
let chunkBS = BS.toStrict (BSB.toLazyByteString chunkBSBuilder)
() <- debit $ (responseBody100ByteCost * BS.length chunkBS) `div` 100
-- charger *before* sending a response body chunk
send (BSB.byteString chunkBS)
)
flush
meterBuilderResponseBody debit (chunk:chunks) = unsafeInterleaveIO $ do
() <- debit $ (responseBody100ByteCost * BS.length chunk) `div` 100
(chunk:) <$> meterBuilderResponseBody debit chunks
meterBuilderResponseBody _ [] = return []
Loading
Loading