Skip to content

Commit

Permalink
waku_store protocol: better logs to track time and new store metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status committed Sep 3, 2024
1 parent 7766e54 commit 06a9b01
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 9 deletions.
34 changes: 25 additions & 9 deletions waku/waku_store/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{.push raises: [].}

import
std/options,
std/[options, times],
results,
chronicles,
chronos,
Expand Down Expand Up @@ -36,9 +36,11 @@ type WakuStore* = ref object of LPProtocol

## Protocol

type StoreResp = tuple[resp: seq[byte], requestId: string]

proc handleQueryRequest(
self: WakuStore, requestor: PeerId, raw_request: seq[byte]
): Future[seq[byte]] {.async.} =
): Future[StoreResp] {.async.} =
var res = StoreQueryResponse()

let req = StoreQueryRequest.decode(raw_request).valueOr:
Expand All @@ -48,7 +50,7 @@ proc handleQueryRequest(
res.statusCode = uint32(ErrorCode.BAD_REQUEST)
res.statusDesc = "decoding rpc failed: " & $error

return res.encode().buffer
return (res.encode().buffer, "not_parsed_requestId")

let requestId = req.requestId

Expand All @@ -65,7 +67,7 @@ proc handleQueryRequest(
res.statusCode = uint32(error.kind)
res.statusDesc = $error

return res.encode().buffer
return (res.encode().buffer, "not_parsed_requestId")

res.requestId = requestId
res.statusCode = 200
Expand All @@ -74,7 +76,7 @@ proc handleQueryRequest(
info "sending store query response",
peerId = requestor, requestId = requestId, messages = res.messages.len

return res.encode().buffer
return (res.encode().buffer, requestId)

proc initProtocolHandler(self: WakuStore) =
let rejectReposnseBuffer = StoreQueryResponse(
Expand All @@ -87,7 +89,8 @@ proc initProtocolHandler(self: WakuStore) =
).encode().buffer

proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var resBuf: seq[byte]
var successfulQuery = false ## only consider the correct queries in metrics
var resBuf: StoreResp
self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn):
let readRes = catch:
await conn.readLp(DefaultMaxRpcSize.int)
Expand All @@ -100,21 +103,34 @@ proc initProtocolHandler(self: WakuStore) =
amount = reqBuf.len().int64, labelValues = [WakuStoreCodec, "in"]
)

let queryStartTime = getTime().toUnixFloat()

resBuf = await self.handleQueryRequest(conn.peerId, reqBuf)

let queryDuration = getTime().toUnixFloat() - queryStartTime
waku_store_time_seconds.inc(amount = queryDuration, labelValues = ["query-db"])
successfulQuery = true
do:
debug "store query request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $self.requestRateLimiter.setting
resBuf = rejectReposnseBuffer
resBuf = (rejectReposnseBuffer, "rejected")

let writeRespStartTime = getTime().toUnixFloat()

let writeRes = catch:
await conn.writeLp(resBuf)
await conn.writeLp(resBuf.resp)

if writeRes.isErr():
error "Connection write error", error = writeRes.error.msg
return

debug "after sending response", requestId = resBuf.requestId
if successfulQuery:
let writeDuration = getTime().toUnixFloat() - writeRespStartTime
waku_store_time_seconds.inc(amount = writeDuration, labelValues = ["send-resp"])

waku_service_network_bytes.inc(
amount = resBuf.len().int64, labelValues = [WakuStoreCodec, "out"]
amount = resBuf.resp.len().int64, labelValues = [WakuStoreCodec, "out"]
)

self.handler = handler
Expand Down
5 changes: 5 additions & 0 deletions waku/waku_store/protocol_metrics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import metrics
declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"]
declarePublicGauge waku_store_queries, "number of store queries received"

## f.e., we have the "query" phase, where the node performs the query to the database,
## and the "libp2p" phase, where the node writes the store response to the libp2p stream.
declarePublicGauge waku_store_time_seconds,
"Time in seconds spent by each store phase", labels = ["phase"]

# Error types (metric label values)
const
dialFailure* = "dial_failure"
Expand Down

0 comments on commit 06a9b01

Please sign in to comment.