Skip to content

Commit

Permalink
wip: new rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
edmundnoble committed Mar 5, 2025
1 parent 80a188b commit 2f93e35
Show file tree
Hide file tree
Showing 12 changed files with 527 additions and 146 deletions.
6 changes: 6 additions & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ package yet-another-logger
--
-- nix-prefetch-git --url <location> --rev <tag>

source-repository-package
type: git
location: https://github.com/alephcloud/hs-configuration-tools.git
tag: 2bf1a000a5566d61d7129efe7685ec19ec48c066
--sha256: ...

source-repository-package
type: git
location: https://github.com/kadena-io/pact.git
Expand Down
6 changes: 5 additions & 1 deletion chainweb.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ library
, Chainweb.Utils.RequestLog
, Chainweb.Utils.Rule
, Chainweb.Utils.Serialization
, Chainweb.Utils.Throttle
, Chainweb.Utils.TokenLimiting
, Chainweb.VerifierPlugin
, Chainweb.VerifierPlugin.Allow
, Chainweb.VerifierPlugin.Hyperlane.Announcement
Expand Down Expand Up @@ -380,6 +382,7 @@ library
, base64-bytestring-kadena == 0.1
, binary >= 0.8
, bytestring >= 0.10.12
, cache >= 0.1.1.2
, case-insensitive >= 1.2
, cassava >= 0.5.1
, chainweb-storage >= 0.1
Expand Down Expand Up @@ -455,7 +458,7 @@ library
, time >= 1.12.2
, tls >=2.1.4
, tls-session-manager >= 0.0
, token-bucket >= 0.1
, token-limiter >= 0.1
, transformers >= 0.5
, trifecta >= 2.1
, unliftio >= 0.2
Expand Down Expand Up @@ -665,6 +668,7 @@ test-suite chainweb-tests
Chainweb.Test.SPV
Chainweb.Test.SPV.EventProof
Chainweb.Test.Sync.WebBlockHeaderStore
Chainweb.Test.Throttle
Chainweb.Test.TreeDB
Chainweb.Test.TreeDB.RemoteDB
Chainweb.Test.Version
Expand Down
5 changes: 5 additions & 0 deletions notes
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
- Data.Cache's pruning is slow
- Not sure how well the exception throwing behaves in the throttler, especially after the response has happened
- Take some comments and/or tests from lars/new-throttler, figure out why it wasn't merged
- How can we penalize peers in a way that prevents us from grabbing or prioritizing cuts from them? Should we grab some tokens from the bucket each time we do something in a P2P session with them too?
- TODO: try the hashable random-initial-seed flag!
116 changes: 40 additions & 76 deletions src/Chainweb/Chainweb.hs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ module Chainweb.Chainweb
, chainwebPeer
, chainwebPayloadDb
, chainwebPactData
, chainwebThrottler
, chainwebPutPeerThrottler
, chainwebMempoolThrottler
, chainwebConfig
, chainwebServiceSocket
, chainwebBackup
Expand All @@ -79,15 +76,7 @@ module Chainweb.Chainweb
, runChainweb

-- * Throttler
, mkGenericThrottler
, mkPutPeerThrottler
, checkPathPrefix
, mkThrottler

, ThrottlingConfig(..)
, throttlingRate
, throttlingPeerRate
, defaultThrottlingConfig

-- * Cut Config
, CutConfig(..)
Expand Down Expand Up @@ -128,11 +117,9 @@ import Network.Wai
import Network.Wai.Handler.Warp hiding (Port)
import Network.Wai.Handler.WarpTLS (WarpTLSException(..))
import Network.Wai.Middleware.RequestSizeLimit
import Network.Wai.Middleware.Throttle

import Prelude hiding (log)

import System.Clock
import System.LogLevel

-- internal modules
Expand Down Expand Up @@ -184,6 +171,7 @@ import P2P.Peer

import qualified Pact.Types.ChainMeta as P
import qualified Pact.Types.Command as P
import qualified Chainweb.Utils.Throttle as Throttle

-- -------------------------------------------------------------------------- --
-- Chainweb Resources
Expand All @@ -199,9 +187,6 @@ data Chainweb logger tbl = Chainweb
, _chainwebPayloadDb :: !(PayloadDb tbl)
, _chainwebManager :: !HTTP.Manager
, _chainwebPactData :: ![(ChainId, PactServerData logger tbl)]
, _chainwebThrottler :: !(Throttle Address)
, _chainwebPutPeerThrottler :: !(Throttle Address)
, _chainwebMempoolThrottler :: !(Throttle Address)
, _chainwebConfig :: !ChainwebConfiguration
, _chainwebServiceSocket :: !(Port, Socket)
, _chainwebBackup :: !(BackupEnv logger)
Expand Down Expand Up @@ -488,13 +473,6 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re
let !mLogger = setComponent "miner" logger
!mConf = _configMining conf
!mCutDb = _cutResCutDb cuts
!throt = _configThrottling conf

-- initialize throttler
throttler <- mkGenericThrottler $ _throttlingRate throt
putPeerThrottler <- mkPutPeerThrottler $ _throttlingPeerRate throt
mempoolThrottler <- mkMempoolThrottler $ _throttlingMempoolRate throt
logg Debug "initialized throttlers"

-- synchronize pact dbs with latest cut before we start the server
-- and clients and begin mining.
Expand Down Expand Up @@ -588,9 +566,6 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re
, _chainwebPayloadDb = view cutDbPayloadDb $ _cutResCutDb cuts
, _chainwebManager = mgr
, _chainwebPactData = pactData
, _chainwebThrottler = throttler
, _chainwebPutPeerThrottler = putPeerThrottler
, _chainwebMempoolThrottler = mempoolThrottler
, _chainwebConfig = conf
, _chainwebServiceSocket = serviceSock
, _chainwebBackup = BackupEnv
Expand Down Expand Up @@ -650,41 +625,13 @@ withChainwebInternal conf logger peer serviceSock rocksDb pactDbDir backupDir re
-- -------------------------------------------------------------------------- --
-- Throttling

mkGenericThrottler :: Double -> IO (Throttle Address)
mkGenericThrottler rate = mkThrottler 30 rate (const True)

mkPutPeerThrottler :: Double -> IO (Throttle Address)
mkPutPeerThrottler rate = mkThrottler 30 rate $ \r ->
elem "peer" (pathInfo r) && requestMethod r == "PUT"

mkMempoolThrottler :: Double -> IO (Throttle Address)
mkMempoolThrottler rate = mkThrottler 30 rate $ \r ->
elem "mempool" (pathInfo r)

checkPathPrefix
:: [T.Text]
-- ^ the base rate granted to users of the endpoing
-> Request
-> Bool
checkPathPrefix endpoint r = endpoint `isPrefixOf` drop 3 (pathInfo r)

-- | The period is 1 second. Burst is 2*rate.
--
mkThrottler
:: Double
-- ^ expiration of a stall bucket in seconds
-> Double
-- ^ the base rate granted to users of the endpoint (requests per second)
-> (Request -> Bool)
-- ^ Predicate to select requests that are throttled
-> IO (Throttle Address)
mkThrottler e rate c = initThrottler (defaultThrottleSettings $ TimeSpec (ceiling e) 0) -- expiration
{ throttleSettingsRate = rate -- number of allowed requests per period
, throttleSettingsPeriod = 1_000_000 -- 1 second
, throttleSettingsBurst = 4 * ceiling rate
, throttleSettingsIsThrottled = c
}

-- -------------------------------------------------------------------------- --
-- Run Chainweb

Expand Down Expand Up @@ -720,28 +667,40 @@ runChainweb cw nowServing = do
logg Warn $ "OpenAPI spec validation enabled on service API, make sure this is what you want"
mkValidationMiddleware
else return id

concurrentlies_

-- 1. Start serving Rest API
[ (if tls then serve else servePlain)
$ httpLog
. throttle (_chainwebPutPeerThrottler cw)
. throttle (_chainwebMempoolThrottler cw)
. throttle (_chainwebThrottler cw)
. p2pRequestSizeLimit
. p2pValidationMiddleware

-- 2. Start Clients (with a delay of 500ms)
, threadDelay 500000 >> clients

-- 3. Start serving local API
, threadDelay 500000 >> do
serveServiceApi
$ serviceHttpLog
. serviceRequestSizeLimit
. serviceApiValidationMiddleware
]
let theP2pThrottleConfig = cw ^. chainwebConfig . configP2p . p2pConfigThrottleConfig
let theServiceApiThrottleConfig = cw ^. chainwebConfig . configServiceApi . serviceApiConfigThrottleConfig
let withP2pThrottleMiddleware =
if _enableConfigEnabled theP2pThrottleConfig
then Throttle.throttleMiddleware (logFunction $ _chainwebLogger cw) "p2p" (_enableConfigConfig theP2pThrottleConfig)
else \k -> k id
let withServiceApiThrottleMiddleware =
if _enableConfigEnabled theServiceApiThrottleConfig
then Throttle.throttleMiddleware (logFunction $ _chainwebLogger cw) "p2p" (_enableConfigConfig theServiceApiThrottleConfig)
else \k -> k id

withP2pThrottleMiddleware $ \p2pThrottler ->
withServiceApiThrottleMiddleware $ \serviceThrottler ->

concurrentlies_

-- 1. Start serving Rest API
[ (if tls then serve else servePlain)
$ httpLog
. p2pRequestSizeLimit
. p2pThrottler
. p2pValidationMiddleware

-- 2. Start Clients (with a delay of 500ms)
, threadDelay 500000 >> clients

-- 3. Start serving local API
, threadDelay 500000 >> do
serveServiceApi
$ serviceHttpLog
. serviceRequestSizeLimit
. serviceThrottler
. serviceApiValidationMiddleware
]

where

Expand Down Expand Up @@ -805,12 +764,16 @@ runChainweb cw nowServing = do
when (defaultShouldDisplayException e) $
logg Debug $ loggServerError msg r e

onExceptionResponse ex =
fromMaybe (defaultOnExceptionResponse ex) (Throttle.throttledResponse ex)

-- P2P Server

serverSettings :: Counter "clientClosedConnections" -> Settings
serverSettings clientClosedConnectionsCounter =
peerServerSettings (_peerResPeer $ _chainwebPeer cw)
& setOnException (logWarpException "P2P API" clientClosedConnectionsCounter)
& setOnExceptionResponse onExceptionResponse
& setBeforeMainLoop (nowServing (nowServingP2PAPI .~ True))

monitorConnectionsClosedByClient :: Counter "clientClosedConnections" -> IO ()
Expand Down Expand Up @@ -893,6 +856,7 @@ runChainweb cw nowServing = do
& setHost interface
& setOnException
(logWarpException "Service API" clientClosedConnectionsCounter)
& setOnExceptionResponse onExceptionResponse
& setBeforeMainLoop (nowServing (nowServingServiceAPI .~ True))
& setServerName "Chainweb Service API"

Expand Down
Loading

0 comments on commit 2f93e35

Please sign in to comment.