diff --git a/src/handler.js b/src/handler.js index f9e7326d..26432cef 100644 --- a/src/handler.js +++ b/src/handler.js @@ -56,7 +56,7 @@ function handle ({ context, logger, batchSize = config.blocksBatchSize }) { const normalizedWantlist = getNormalizedWantlist(wantlist, context, logger) // Set state of processing blocks in canceled state - updateCanceledState(normalizedWantlist, context) + updateCanceledState(normalizedWantlist, context, logger) process.nextTick(async () => { // catch async error in libp2p connection @@ -186,8 +186,20 @@ async function batchResponse ({ blocks, context, logger }) { let message = new Message() for (let i = 0; i < blocks.length; i++) { const block = blocks[i] + // console.log('block key', block.key) const canceledItem = context.canceled.get(block.key) - if (!canceledItem || canceledItem !== block.type) { + logger.info({ keyList: context.canceled.keyList }, 'check keyList') + logger.info({ key: block.key, type: block.type, wantType: block.wantType, block: block }, 'check') + logger.info({ canceled: canceledItem }, 'canceled') + + if (canceledItem === block.type) { + const size = messageSize[block.type](block) + telemetry.increaseLabelCount('bitswap-block-success-cancel', [block.type]) + telemetry.increaseLabelCount('bitswap-cancel-size', [block.type], size) + + logger.info({ key: block.key }, 'delete') + context.canceled.delete(block.key) + } else { const size = messageSize[block.type](block) // maxMessageSize MUST BE larger than a single block info/data @@ -198,12 +210,6 @@ async function batchResponse ({ blocks, context, logger }) { message.push(block, size, context.protocol) sentMetrics[block.type](block, size) - } else { - const size = messageSize[block.type](block) - telemetry.increaseLabelCount('bitswap-block-success-cancel', [block.type]) - telemetry.increaseLabelCount('bitswap-cancel-size', [block.type], size) - - context.canceled.delete(block.key) } } @@ -217,22 +223,27 @@ async function batchResponse ({ blocks, context, logger }) { } } -function updateCanceledState (wantList, context) { +function updateCanceledState (wantList, context, logger) { const { wantedBlocks, wantedHave, canceled } = wantList // Removed previous canceled blocks wantedBlocks.forEach(block => { + logger.info({ key: block.key }, 'remove block from canceled') context.canceled.delete(block.key) }) wantedHave.forEach(block => { + logger.info({ key: block.key }, 'remove wanted block from canceled') context.canceled.delete(block.key) }) // Add new canceled blocks canceled.forEach(block => { + logger.info({ key: block.key }, 'add block to canceled') context.canceled.set(block.key, block.wantType) }) + + logger.info({ keyList: context.canceled.keyList }, 'check keyList') } // end response, close connection diff --git a/src/service.js b/src/service.js index ba56066f..f0f111a1 100644 --- a/src/service.js +++ b/src/service.js @@ -82,6 +82,7 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec autoDialInterval: connectionConfig.p2p.autoDialInterval } }) + const cancelsPerPeer = new Map() const handlerOptions = { maxInboundStreams: connectionConfig.handler.maxInboundStreams, @@ -109,7 +110,9 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec service.handle(protocol, async ({ connection: dial, stream }) => { try { const connection = new Connection(stream) - const canceled = new LRU({ max: 200 }) + const canceled = cancelsPerPeer.get(dial.remotePeer.toString()) + + !canceled && console.log('canceled') const hrTime = process.hrtime() const connectionId = hrTime[0] * 1000000000 + hrTime[1] @@ -146,12 +149,13 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec // another multiplexed stream. connection.on('end:receive', () => { // GC canceled LRU on finish - canceled.clear() + logger.info({}, 'end:receive') connection.close() }) connection.on('error', err => { // GC canceled LRU on error + logger.info({}, 'error') canceled.clear() logger.error({ err, dial, stream, protocol }, 'Connection error') }) @@ -163,20 +167,27 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec // TODO move to networking service.connectionManager.addEventListener('peer:connect', connection => { + cancelsPerPeer.set( + connection.detail.remotePeer.toString(), + new LRU({ max: 200 }) + ) try { telemetry.increaseCount('bitswap-connections') telemetry.increaseGauge('bitswap-active-connections') } catch (err) { - logger.warn({ err, remotePeer: connection.remotePeer }, 'Error while peer connecting') + logger.warn({ err, remotePeer: connection.detail.remotePeer.toString() }, 'Error while peer connecting') } }) // TODO move to networking service.connectionManager.addEventListener('peer:disconnect', connection => { + cancelsPerPeer.delete( + connection.detail.remotePeer.toString() + ) try { telemetry.decreaseGauge('bitswap-active-connections') } catch (err) { - logger.warn({ err, remotePeer: connection.remotePeer }, 'Error while peer disconnecting') + logger.warn({ err, remotePeer: connection.detail.remotePeer.toString() }, 'Error while peer disconnecting') } })