From 06a9b0196575a7d2da1a383aeb5aca52c62d04aa Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Wed, 4 Sep 2024 01:57:23 +0200 Subject: [PATCH] waku_store protocol: better logs to track time and new store metrics --- waku/waku_store/protocol.nim | 34 ++++++++++++++++++++-------- waku/waku_store/protocol_metrics.nim | 5 ++++ 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim index 2f47cc6c8c..4e94d4c489 100644 --- a/waku/waku_store/protocol.nim +++ b/waku/waku_store/protocol.nim @@ -4,7 +4,7 @@ {.push raises: [].} import - std/options, + std/[options, times], results, chronicles, chronos, @@ -36,9 +36,11 @@ type WakuStore* = ref object of LPProtocol ## Protocol +type StoreResp = tuple[resp: seq[byte], requestId: string] + proc handleQueryRequest( self: WakuStore, requestor: PeerId, raw_request: seq[byte] -): Future[seq[byte]] {.async.} = +): Future[StoreResp] {.async.} = var res = StoreQueryResponse() let req = StoreQueryRequest.decode(raw_request).valueOr: @@ -48,7 +50,7 @@ proc handleQueryRequest( res.statusCode = uint32(ErrorCode.BAD_REQUEST) res.statusDesc = "decoding rpc failed: " & $error - return res.encode().buffer + return (res.encode().buffer, "not_parsed_requestId") let requestId = req.requestId @@ -65,7 +67,7 @@ proc handleQueryRequest( res.statusCode = uint32(error.kind) res.statusDesc = $error - return res.encode().buffer + return (res.encode().buffer, "not_parsed_requestId") res.requestId = requestId res.statusCode = 200 @@ -74,7 +76,7 @@ proc handleQueryRequest( info "sending store query response", peerId = requestor, requestId = requestId, messages = res.messages.len - return res.encode().buffer + return (res.encode().buffer, requestId) proc initProtocolHandler(self: WakuStore) = let rejectReposnseBuffer = StoreQueryResponse( @@ -87,7 +89,8 @@ proc initProtocolHandler(self: WakuStore) = ).encode().buffer proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = - var resBuf: seq[byte] + var successfulQuery = false ## only consider the correct queries in metrics + var resBuf: StoreResp self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn): let readRes = catch: await conn.readLp(DefaultMaxRpcSize.int) @@ -100,21 +103,34 @@ proc initProtocolHandler(self: WakuStore) = amount = reqBuf.len().int64, labelValues = [WakuStoreCodec, "in"] ) + let queryStartTime = getTime().toUnixFloat() + resBuf = await self.handleQueryRequest(conn.peerId, reqBuf) + + let queryDuration = getTime().toUnixFloat() - queryStartTime + waku_store_time_seconds.inc(amount = queryDuration, labelValues = ["query-db"]) + successfulQuery = true do: debug "store query request rejected due rate limit exceeded", peerId = conn.peerId, limit = $self.requestRateLimiter.setting - resBuf = rejectReposnseBuffer + resBuf = (rejectReposnseBuffer, "rejected") + + let writeRespStartTime = getTime().toUnixFloat() let writeRes = catch: - await conn.writeLp(resBuf) + await conn.writeLp(resBuf.resp) if writeRes.isErr(): error "Connection write error", error = writeRes.error.msg return + debug "after sending response", requestId = resBuf.requestId + if successfulQuery: + let writeDuration = getTime().toUnixFloat() - writeRespStartTime + waku_store_time_seconds.inc(amount = writeDuration, labelValues = ["send-resp"]) + waku_service_network_bytes.inc( - amount = resBuf.len().int64, labelValues = [WakuStoreCodec, "out"] + amount = resBuf.resp.len().int64, labelValues = [WakuStoreCodec, "out"] ) self.handler = handler diff --git a/waku/waku_store/protocol_metrics.nim b/waku/waku_store/protocol_metrics.nim index d413c0a678..851670cdba 100644 --- a/waku/waku_store/protocol_metrics.nim +++ b/waku/waku_store/protocol_metrics.nim @@ -5,6 +5,11 @@ import metrics declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"] declarePublicGauge waku_store_queries, "number of store queries received" +## f.e., we have the "query" phase, where the node performs the query to the database, +## and the "libp2p" phase, where the node writes the store response to the libp2p stream. +declarePublicGauge waku_store_time_seconds, + "Time in seconds spent by each store phase", labels = ["phase"] + # Error types (metric label values) const dialFailure* = "dial_failure"