From 36ea1b40bb2244fe2ee36df00509488617d8fedd Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Thu, 23 Apr 2026 12:18:47 +0800 Subject: [PATCH 1/3] feat(offer): add Offer entity for dipper's offer idempotency gate Add an Offer entity keyed by agreementId, populated from RecurringCollector.OfferStored. Dipper queries this entity to avoid re-submitting an offer after a crashed-mid-flight restart where the on-chain tx landed but dipper lost track of it. Co-Authored-By: Claude Opus 4.7 (1M context) --- abis/RecurringCollector.json | 11 ++++++ schema.graphql | 20 ++++++++++ src/recurringCollector.ts | 24 +++++++++++- subgraph.template.yaml | 3 ++ tests/recurringCollector.test.ts | 67 ++++++++++++++++++++++++++++++++ 5 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 tests/recurringCollector.test.ts diff --git a/abis/RecurringCollector.json b/abis/RecurringCollector.json index 702cba3..7ba7df2 100644 --- a/abis/RecurringCollector.json +++ b/abis/RecurringCollector.json @@ -59,5 +59,16 @@ ], "name": "RCACollected", "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { "indexed": true, "name": "agreementId", "type": "bytes16" }, + { "indexed": true, "name": "payer", "type": "address" }, + { "indexed": true, "name": "offerType", "type": "uint8" }, + { "indexed": false, "name": "offerHash", "type": "bytes32" } + ], + "name": "OfferStored", + "type": "event" } ] diff --git a/schema.graphql b/schema.graphql index 2e88a21..1179435 100644 --- a/schema.graphql +++ b/schema.graphql @@ -71,3 +71,23 @@ type IndexerDeploymentLatest @entity(immutable: false) { blockNumber: BigInt! blockTimestamp: BigInt! } + +# First stored offer per agreementId, keyed by bytes16 agreement ID. +# Dipper queries this entity as an idempotency gate -- avoids re-submitting +# an offer after a crashed-mid-flight restart where the on-chain tx landed +# but dipper lost track of it. +# +# Declared immutable because, for a given agreementId, the RCA identifying +# fields (payer, dataService, serviceProvider, deadline, nonce) are fixed by +# the id derivation, so any duplicate OfferStored event for the same id would +# carry the same offerHash. The handler enforces this by returning early on +# the second event instead of attempting to overwrite. +type Offer @entity(immutable: true) { + id: Bytes! + payer: Bytes! + offerType: Int! + offerHash: Bytes! + createdAtBlock: BigInt! + createdAtTimestamp: BigInt! + createdAtTx: Bytes! +} diff --git a/src/recurringCollector.ts b/src/recurringCollector.ts index 32348d2..5dc67ed 100644 --- a/src/recurringCollector.ts +++ b/src/recurringCollector.ts @@ -1,9 +1,10 @@ -import { IndexingAgreement } from '../generated/schema' +import { IndexingAgreement, Offer } from '../generated/schema' import { AgreementAccepted, AgreementCanceled, AgreementUpdated, RCACollected, + OfferStored as OfferStoredEvent, } from '../generated/RecurringCollector/RecurringCollector' import { createOrLoadIndexingAgreement, BIGINT_ZERO } from './helpers' @@ -61,3 +62,24 @@ export function handleRCACollected(event: RCACollected): void { agreement.tokensCollected = agreement.tokensCollected.plus(event.params.tokens) agreement.save() } + +export function handleOfferStored(event: OfferStoredEvent): void { + // First-offer entity keyed by agreementId (bytes16). Immutable: if an + // entity already exists, a duplicate OfferStored event for the same + // agreement id (e.g. dipper crashed and re-submitted, or a chain reorg + // re-emitted) carries the same offerHash by construction and we return + // early. Writing to an immutable entity a second time is a graph-node + // error that would halt the subgraph, so the guard is load-bearing. + let existing = Offer.load(event.params.agreementId) + if (existing != null) { + return + } + let offer = new Offer(event.params.agreementId) + offer.payer = event.params.payer + offer.offerType = event.params.offerType + offer.offerHash = event.params.offerHash + offer.createdAtBlock = event.block.number + offer.createdAtTimestamp = event.block.timestamp + offer.createdAtTx = event.transaction.hash + offer.save() +} diff --git a/subgraph.template.yaml b/subgraph.template.yaml index 1d7f146..43263f8 100644 --- a/subgraph.template.yaml +++ b/subgraph.template.yaml @@ -43,6 +43,7 @@ dataSources: language: wasm/assemblyscript entities: - IndexingAgreement + - Offer abis: - name: RecurringCollector file: ./abis/RecurringCollector.json @@ -59,4 +60,6 @@ dataSources: - event: RCACollected(indexed address,indexed address,indexed address,bytes16,bytes32,uint256,uint256) handler: handleRCACollected topic1: ["{{subgraphServiceAddress}}"] + - event: OfferStored(indexed bytes16,indexed address,indexed uint8,bytes32) + handler: handleOfferStored file: ./src/recurringCollector.ts diff --git a/tests/recurringCollector.test.ts b/tests/recurringCollector.test.ts new file mode 100644 index 0000000..aecd44c --- /dev/null +++ b/tests/recurringCollector.test.ts @@ -0,0 +1,67 @@ +import { assert, describe, test, clearStore, afterEach, newMockEvent } from 'matchstick-as' +import { Address, Bytes, BigInt, ethereum } from '@graphprotocol/graph-ts' +import { handleOfferStored } from '../src/recurringCollector' +import { OfferStored as OfferStoredEvent } from '../generated/RecurringCollector/RecurringCollector' + +const PAYER = Address.fromString('0x0000000000000000000000000000000000000002') +const AGREEMENT_ID = Bytes.fromHexString('0x0102030405060708090a0b0c0d0e0f10') + +function createOfferStoredEvent( + agreementId: Bytes, + offerType: i32, + offerHash: Bytes, +): OfferStoredEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(PAYER))) + event.parameters.push( + new ethereum.EventParam( + 'offerType', + ethereum.Value.fromUnsignedBigInt(BigInt.fromI32(offerType)), + ), + ) + event.parameters.push( + new ethereum.EventParam('offerHash', ethereum.Value.fromFixedBytes(offerHash)), + ) + + return event +} + +describe('handleOfferStored', () => { + afterEach(() => { + clearStore() + }) + + test('first event creates Offer entity', () => { + let offerHash = Bytes.fromHexString('0x' + 'aa'.repeat(32)) + let event = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash) + handleOfferStored(event) + + assert.entityCount('Offer', 1) + + let id = AGREEMENT_ID.toHexString() + assert.fieldEquals('Offer', id, 'payer', PAYER.toHexString()) + assert.fieldEquals('Offer', id, 'offerType', '0') + assert.fieldEquals('Offer', id, 'offerHash', offerHash.toHexString()) + }) + + test('duplicate event for same agreementId is a no-op (idempotency guard)', () => { + let offerHash = Bytes.fromHexString('0x' + 'aa'.repeat(32)) + + let event1 = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash) + handleOfferStored(event1) + + // Second event for same agreementId must not halt on immutable re-write. + let event2 = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash) + event2.transaction.hash = Bytes.fromHexString( + '0x1111111111111111111111111111111111111111111111111111111111111111', + ) as Bytes + handleOfferStored(event2) + + assert.entityCount('Offer', 1) + }) +}) From ad182dd3bcad9a09486f3f3e002d4dfb1aaf0d5b Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Thu, 23 Apr 2026 12:49:34 +0800 Subject: [PATCH 2/3] feat(agreement): track state-change block and canceler Add `lastStateChangeBlock` so consumers can poll for agreements that moved since a given block, and `canceledBy` (taken from the SubgraphService event's canceledOnBehalfOf) so operator-initiated cancels capture the real signer, not just the enum. Co-Authored-By: Claude Opus 4.7 (1M context) --- schema.graphql | 4 ++ src/helpers.ts | 8 +++- src/recurringCollector.ts | 9 ++++- src/subgraphService.ts | 14 +++++++ subgraph.template.yaml | 2 + tests/subgraphService.test.ts | 69 +++++++++++++++++++++++++++++++++++ 6 files changed, 104 insertions(+), 2 deletions(-) diff --git a/schema.graphql b/schema.graphql index 1179435..c1ae5b9 100644 --- a/schema.graphql +++ b/schema.graphql @@ -42,8 +42,12 @@ type IndexingAgreement @entity(immutable: false) { lastUpdatedAt: BigInt! "Timestamp when agreement was canceled (0 if not canceled)" canceledAt: BigInt! + "Address that initiated the cancel (zero address if not canceled). Taken from SubgraphService.IndexingAgreementCanceled.canceledOnBehalfOf so operator-initiated cancels are captured correctly." + canceledBy: Bytes! "Total tokens collected over lifetime" tokensCollected: BigInt! + "Block number of the latest state change on this agreement (Accepted / Updated / Canceled / RCACollected). Consumers that reconcile state diffs poll with `lastStateChangeBlock_gt` since last seen block." + lastStateChangeBlock: BigInt! "Fee collection history" collections: [IndexingFeeCollection!]! @derivedFrom(field: "agreement") } diff --git a/src/helpers.ts b/src/helpers.ts index 4f9cf23..dbec73c 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -1,4 +1,4 @@ -import { BigInt, Bytes } from '@graphprotocol/graph-ts' +import { Address, BigInt, Bytes } from '@graphprotocol/graph-ts' import { IndexingAgreement } from '../generated/schema' export const BIGINT_ZERO = BigInt.fromI32(0) @@ -23,7 +23,13 @@ export function createOrLoadIndexingAgreement(agreementId: Bytes): IndexingAgree agreement.maxSecondsPerCollection = 0 agreement.lastUpdatedAt = BIGINT_ZERO agreement.canceledAt = BIGINT_ZERO + // Default to 20-byte zero address rather than Bytes.empty(). Graph-node + // serializes empty Bytes on non-nullable fields with unpredictable + // padding (observed as "0x00000000" in practice), which breaks strict + // 20-byte-address parsers on the consumer side. + agreement.canceledBy = Address.zero() as Bytes agreement.tokensCollected = BIGINT_ZERO + agreement.lastStateChangeBlock = BIGINT_ZERO } return agreement } diff --git a/src/recurringCollector.ts b/src/recurringCollector.ts index 5dc67ed..4ce32ab 100644 --- a/src/recurringCollector.ts +++ b/src/recurringCollector.ts @@ -23,6 +23,7 @@ export function handleAgreementAccepted(event: AgreementAccepted): void { agreement.maxSecondsPerCollection = event.params.maxSecondsPerCollection.toI32() agreement.canceledAt = BIGINT_ZERO agreement.tokensCollected = BIGINT_ZERO + agreement.lastStateChangeBlock = event.block.number agreement.save() } @@ -31,13 +32,17 @@ export function handleAgreementCanceled(event: AgreementCanceled): void { let agreement = IndexingAgreement.load(event.params.agreementId) if (agreement == null) return - // canceledBy enum: 0=ServiceProvider, 1=Payer + // canceledBy enum: 0=ServiceProvider, 1=Payer. The actual canceler address + // is written by subgraphService.handleIndexingAgreementCanceled, which + // fires in the same transaction and reads the SubgraphService event's + // canceledOnBehalfOf parameter. if (event.params.canceledBy == 0) { agreement.state = 'CanceledByServiceProvider' } else { agreement.state = 'CanceledByPayer' } agreement.canceledAt = event.params.canceledAt + agreement.lastStateChangeBlock = event.block.number agreement.save() } @@ -51,6 +56,7 @@ export function handleAgreementUpdated(event: AgreementUpdated): void { agreement.maxOngoingTokensPerSecond = event.params.maxOngoingTokensPerSecond agreement.minSecondsPerCollection = event.params.minSecondsPerCollection.toI32() agreement.maxSecondsPerCollection = event.params.maxSecondsPerCollection.toI32() + agreement.lastStateChangeBlock = event.block.number agreement.save() } @@ -60,6 +66,7 @@ export function handleRCACollected(event: RCACollected): void { agreement.lastCollectionAt = event.block.timestamp agreement.tokensCollected = agreement.tokensCollected.plus(event.params.tokens) + agreement.lastStateChangeBlock = event.block.number agreement.save() } diff --git a/src/subgraphService.ts b/src/subgraphService.ts index b1a00b9..6f550af 100644 --- a/src/subgraphService.ts +++ b/src/subgraphService.ts @@ -1,6 +1,7 @@ import { ethereum } from '@graphprotocol/graph-ts' import { IndexingAgreementAccepted as AcceptedEvent, + IndexingAgreementCanceled as CanceledEvent, IndexingAgreementUpdated as UpdatedEvent, IndexingFeesCollectedV1 as FeesCollectedEvent, } from '../generated/SubgraphService/SubgraphService' @@ -19,6 +20,18 @@ export function handleIndexingAgreementAccepted(event: AcceptedEvent): void { agreement.tokensPerEntityPerSecond = terms[1].toBigInt() } + agreement.lastStateChangeBlock = event.block.number + agreement.save() +} + +export function handleIndexingAgreementCanceled(event: CanceledEvent): void { + let agreement = createOrLoadIndexingAgreement(event.params.agreementId) + // canceledOnBehalfOf is the actual signer that initiated the cancel. For + // operator-initiated cancels this is the operator, not the payer/indexer + // directly. Dipper's chain_listener compares this to its own signer + // address to decide CanceledByRequester vs CanceledByIndexer. + agreement.canceledBy = event.params.canceledOnBehalfOf + agreement.lastStateChangeBlock = event.block.number agreement.save() } @@ -33,6 +46,7 @@ export function handleIndexingAgreementUpdated(event: UpdatedEvent): void { agreement.tokensPerEntityPerSecond = terms[1].toBigInt() } + agreement.lastStateChangeBlock = event.block.number agreement.save() } diff --git a/subgraph.template.yaml b/subgraph.template.yaml index 43263f8..1903840 100644 --- a/subgraph.template.yaml +++ b/subgraph.template.yaml @@ -25,6 +25,8 @@ dataSources: eventHandlers: - event: IndexingAgreementAccepted(indexed address,indexed address,indexed bytes16,address,bytes32,uint8,bytes) handler: handleIndexingAgreementAccepted + - event: IndexingAgreementCanceled(indexed address,indexed address,indexed bytes16,address) + handler: handleIndexingAgreementCanceled - event: IndexingAgreementUpdated(indexed address,indexed address,indexed bytes16,address,uint8,bytes) handler: handleIndexingAgreementUpdated - event: IndexingFeesCollectedV1(indexed address,indexed address,indexed bytes16,address,bytes32,uint256,uint256,uint256,bytes32,uint256,bytes) diff --git a/tests/subgraphService.test.ts b/tests/subgraphService.test.ts index d43b67f..aa9e5f5 100644 --- a/tests/subgraphService.test.ts +++ b/tests/subgraphService.test.ts @@ -2,11 +2,13 @@ import { assert, describe, test, clearStore, afterEach } from 'matchstick-as' import { Address, Bytes, BigInt, ethereum } from '@graphprotocol/graph-ts' import { handleIndexingAgreementAccepted, + handleIndexingAgreementCanceled, handleIndexingAgreementUpdated, handleIndexingFeesCollectedV1, } from '../src/subgraphService' import { IndexingAgreementAccepted as AcceptedEvent, + IndexingAgreementCanceled as CanceledEvent, IndexingAgreementUpdated as UpdatedEvent, IndexingFeesCollectedV1 as FeesCollectedEvent, } from '../generated/SubgraphService/SubgraphService' @@ -46,6 +48,27 @@ function createAcceptedEvent( return event } +function createCanceledEvent( + indexer: Address, + payer: Address, + agreementId: Bytes, + canceledOnBehalfOf: Address, +): CanceledEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push(new ethereum.EventParam('indexer', ethereum.Value.fromAddress(indexer))) + event.parameters.push(new ethereum.EventParam('payer', ethereum.Value.fromAddress(payer))) + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push( + new ethereum.EventParam('canceledOnBehalfOf', ethereum.Value.fromAddress(canceledOnBehalfOf)), + ) + + return event +} + function createUpdatedEvent( indexer: Address, payer: Address, @@ -153,6 +176,7 @@ describe('handleIndexingAgreementAccepted', () => { 1, versionTerms, ) + event.block.number = BigInt.fromI32(100) handleIndexingAgreementAccepted(event) assert.entityCount('IndexingAgreement', 1) @@ -175,11 +199,49 @@ describe('handleIndexingAgreementAccepted', () => { 'tokensPerEntityPerSecond', '50', ) + assert.fieldEquals( + 'IndexingAgreement', + agreementId.toHexString(), + 'lastStateChangeBlock', + '100', + ) // State remains NotAccepted until RC handler fires assert.fieldEquals('IndexingAgreement', agreementId.toHexString(), 'state', 'NotAccepted') }) }) +describe('handleIndexingAgreementCanceled', () => { + afterEach(() => { + clearStore() + }) + + test('sets canceledBy to canceledOnBehalfOf and stamps lastStateChangeBlock', () => { + let indexer = Address.fromString('0x0000000000000000000000000000000000000001') + let payer = Address.fromString('0x0000000000000000000000000000000000000002') + let agreementId = Bytes.fromHexString('0x0102030405060708090a0b0c0d0e0f10') + // Operator address, distinct from payer/indexer, to prove the handler + // captures whoever actually initiated the cancel rather than inferring it. + let operator = Address.fromString('0x000000000000000000000000000000000000000a') + + let event = createCanceledEvent(indexer, payer, agreementId, operator) + event.block.number = BigInt.fromI32(200) + handleIndexingAgreementCanceled(event) + + assert.fieldEquals( + 'IndexingAgreement', + agreementId.toHexString(), + 'canceledBy', + operator.toHexString(), + ) + assert.fieldEquals( + 'IndexingAgreement', + agreementId.toHexString(), + 'lastStateChangeBlock', + '200', + ) + }) +}) + describe('handleIndexingAgreementUpdated', () => { afterEach(() => { clearStore() @@ -216,6 +278,7 @@ describe('handleIndexingAgreementUpdated', () => { 1, newVersionTerms, ) + updateEvent.block.number = BigInt.fromI32(300) handleIndexingAgreementUpdated(updateEvent) assert.entityCount('IndexingAgreement', 1) @@ -232,6 +295,12 @@ describe('handleIndexingAgreementUpdated', () => { 'tokensPerEntityPerSecond', '100', ) + assert.fieldEquals( + 'IndexingAgreement', + agreementId.toHexString(), + 'lastStateChangeBlock', + '300', + ) }) }) From df0b0c16e11484f7f1957deac843e992a6df24af Mon Sep 17 00:00:00 2001 From: MoonBoi9001 Date: Mon, 11 May 2026 16:31:54 +0800 Subject: [PATCH 3/3] fix(events): catch up to audited RecurringCollector event surface The audit dropped redundant timestamps and added OfferCancelled; the subgraph still used pre-audit signatures so every handler stayed silent. Catch up the ABI/yaml/mappings, subscribe to OfferCancelled, and refresh the Offer on every OfferStored. Co-Authored-By: Claude Opus 4.7 (1M context) --- abis/RecurringCollector.json | 13 +++-- schema.graphql | 19 ++++--- src/recurringCollector.ts | 75 +++++++++++++++++++--------- subgraph.template.yaml | 8 +-- tests/recurringCollector.test.ts | 86 ++++++++++++++++++++++++++------ 5 files changed, 150 insertions(+), 51 deletions(-) diff --git a/abis/RecurringCollector.json b/abis/RecurringCollector.json index 7ba7df2..4dafa1e 100644 --- a/abis/RecurringCollector.json +++ b/abis/RecurringCollector.json @@ -6,7 +6,6 @@ { "indexed": true, "name": "payer", "type": "address" }, { "indexed": true, "name": "serviceProvider", "type": "address" }, { "indexed": false, "name": "agreementId", "type": "bytes16" }, - { "indexed": false, "name": "acceptedAt", "type": "uint64" }, { "indexed": false, "name": "endsAt", "type": "uint64" }, { "indexed": false, "name": "maxInitialTokens", "type": "uint256" }, { "indexed": false, "name": "maxOngoingTokensPerSecond", "type": "uint256" }, @@ -23,7 +22,6 @@ { "indexed": true, "name": "payer", "type": "address" }, { "indexed": true, "name": "serviceProvider", "type": "address" }, { "indexed": false, "name": "agreementId", "type": "bytes16" }, - { "indexed": false, "name": "canceledAt", "type": "uint64" }, { "indexed": false, "name": "canceledBy", "type": "uint8" } ], "name": "AgreementCanceled", @@ -36,7 +34,6 @@ { "indexed": true, "name": "payer", "type": "address" }, { "indexed": true, "name": "serviceProvider", "type": "address" }, { "indexed": false, "name": "agreementId", "type": "bytes16" }, - { "indexed": false, "name": "updatedAt", "type": "uint64" }, { "indexed": false, "name": "endsAt", "type": "uint64" }, { "indexed": false, "name": "maxInitialTokens", "type": "uint256" }, { "indexed": false, "name": "maxOngoingTokensPerSecond", "type": "uint256" }, @@ -70,5 +67,15 @@ ], "name": "OfferStored", "type": "event" + }, + { + "anonymous": false, + "inputs": [ + { "indexed": true, "name": "caller", "type": "address" }, + { "indexed": true, "name": "agreementId", "type": "bytes16" }, + { "indexed": true, "name": "hash", "type": "bytes32" } + ], + "name": "OfferCancelled", + "type": "event" } ] diff --git a/schema.graphql b/schema.graphql index c1ae5b9..3a17c03 100644 --- a/schema.graphql +++ b/schema.graphql @@ -76,21 +76,26 @@ type IndexerDeploymentLatest @entity(immutable: false) { blockTimestamp: BigInt! } -# First stored offer per agreementId, keyed by bytes16 agreement ID. +# Latest stored offer per agreementId, keyed by bytes16 agreement ID. # Dipper queries this entity as an idempotency gate -- avoids re-submitting # an offer after a crashed-mid-flight restart where the on-chain tx landed # but dipper lost track of it. # -# Declared immutable because, for a given agreementId, the RCA identifying -# fields (payer, dataService, serviceProvider, deadline, nonce) are fixed by -# the id derivation, so any duplicate OfferStored event for the same id would -# carry the same offerHash. The handler enforces this by returning early on -# the second event instead of attempting to overwrite. -type Offer @entity(immutable: true) { +# Mutable because the contract overwrites the stored offer in two cases: +# (1) an OFFER_TYPE_UPDATE event refreshes the hash and type to the latest +# RCAU terms; (2) an OfferCancelled event stamps `canceledAt` so dipper +# treats `canceledAt > 0` as "no live offer; safe to re-submit." Storage +# stays bounded because there is at most one Offer entity per agreementId. +type Offer @entity(immutable: false) { id: Bytes! payer: Bytes! + "Latest offer type observed (1 = OFFER_TYPE_NEW, 2 = OFFER_TYPE_UPDATE)." offerType: Int! + "Latest stored offer hash; tracks the on-chain rcaOffers / rcauOffers entry." offerHash: Bytes! + "Timestamp when the offer was canceled on-chain (0 if currently live)." + canceledAt: BigInt! + "Block/timestamp/tx of the first OFFER_TYPE_NEW for this agreement id." createdAtBlock: BigInt! createdAtTimestamp: BigInt! createdAtTx: Bytes! diff --git a/src/recurringCollector.ts b/src/recurringCollector.ts index 4ce32ab..88ab0ec 100644 --- a/src/recurringCollector.ts +++ b/src/recurringCollector.ts @@ -5,17 +5,29 @@ import { AgreementUpdated, RCACollected, OfferStored as OfferStoredEvent, + OfferCancelled as OfferCancelledEvent, } from '../generated/RecurringCollector/RecurringCollector' import { createOrLoadIndexingAgreement, BIGINT_ZERO } from './helpers' +// CancelAgreementBy enum from IRecurringCollector.sol: +// 0 = ServiceProvider, 1 = Payer, 2 = ThirdParty +// The contract treats anything that isn't Payer as ServiceProvider when +// emitting the SubgraphService-side IndexingAgreementCanceled event, so we +// mirror that mapping here. ThirdParty (2) is currently unreachable from +// SubgraphService — adding the explicit branch documents the contract's +// intent and keeps the mapping correct if a future data service surfaces it. +const CANCEL_BY_PAYER: i32 = 1 + export function handleAgreementAccepted(event: AgreementAccepted): void { let agreement = createOrLoadIndexingAgreement(event.params.agreementId) + // The contract sets `agreement.acceptedAt = uint64(block.timestamp)` inside + // accept(), so the event's block timestamp is the canonical value. agreement.payer = event.params.payer agreement.indexer = event.params.serviceProvider agreement.state = 'Accepted' - agreement.acceptedAt = event.params.acceptedAt - agreement.lastCollectionAt = event.params.acceptedAt + agreement.acceptedAt = event.block.timestamp + agreement.lastCollectionAt = event.block.timestamp agreement.endsAt = event.params.endsAt agreement.maxInitialTokens = event.params.maxInitialTokens agreement.maxOngoingTokensPerSecond = event.params.maxOngoingTokensPerSecond @@ -32,16 +44,18 @@ export function handleAgreementCanceled(event: AgreementCanceled): void { let agreement = IndexingAgreement.load(event.params.agreementId) if (agreement == null) return - // canceledBy enum: 0=ServiceProvider, 1=Payer. The actual canceler address - // is written by subgraphService.handleIndexingAgreementCanceled, which - // fires in the same transaction and reads the SubgraphService event's - // canceledOnBehalfOf parameter. - if (event.params.canceledBy == 0) { - agreement.state = 'CanceledByServiceProvider' - } else { + // The actual canceler address is written by + // subgraphService.handleIndexingAgreementCanceled, which fires in the + // same transaction and reads the SubgraphService event's + // canceledOnBehalfOf parameter. The contract sets + // `agreement.canceledAt = uint64(block.timestamp)` inside cancel(), so + // the event's block timestamp is the canonical value. + if (event.params.canceledBy == CANCEL_BY_PAYER) { agreement.state = 'CanceledByPayer' + } else { + agreement.state = 'CanceledByServiceProvider' } - agreement.canceledAt = event.params.canceledAt + agreement.canceledAt = event.block.timestamp agreement.lastStateChangeBlock = event.block.number agreement.save() } @@ -50,7 +64,7 @@ export function handleAgreementUpdated(event: AgreementUpdated): void { let agreement = IndexingAgreement.load(event.params.agreementId) if (agreement == null) return - agreement.lastUpdatedAt = event.params.updatedAt + agreement.lastUpdatedAt = event.block.timestamp agreement.endsAt = event.params.endsAt agreement.maxInitialTokens = event.params.maxInitialTokens agreement.maxOngoingTokensPerSecond = event.params.maxOngoingTokensPerSecond @@ -71,22 +85,35 @@ export function handleRCACollected(event: RCACollected): void { } export function handleOfferStored(event: OfferStoredEvent): void { - // First-offer entity keyed by agreementId (bytes16). Immutable: if an - // entity already exists, a duplicate OfferStored event for the same - // agreement id (e.g. dipper crashed and re-submitted, or a chain reorg - // re-emitted) carries the same offerHash by construction and we return - // early. Writing to an immutable entity a second time is a graph-node - // error that would halt the subgraph, so the guard is load-bearing. - let existing = Offer.load(event.params.agreementId) - if (existing != null) { - return + // OfferStored fires once per agreementId for OFFER_TYPE_NEW and again + // for each OFFER_TYPE_UPDATE that changes the stored offer hash. The + // contract overwrites $.rcaOffers / $.rcauOffers in-place, so dipper's + // idempotency gate has to see the latest terms — keep the entity + // mutable and refresh offerType / offerHash on every event. `createdAt` + // fields stay pinned to the first OFFER_TYPE_NEW so consumers can + // distinguish initial offer from subsequent updates. + let offer = Offer.load(event.params.agreementId) + if (offer == null) { + offer = new Offer(event.params.agreementId) + offer.createdAtBlock = event.block.number + offer.createdAtTimestamp = event.block.timestamp + offer.createdAtTx = event.transaction.hash } - let offer = new Offer(event.params.agreementId) offer.payer = event.params.payer offer.offerType = event.params.offerType offer.offerHash = event.params.offerHash - offer.createdAtBlock = event.block.number - offer.createdAtTimestamp = event.block.timestamp - offer.createdAtTx = event.transaction.hash + offer.canceledAt = BIGINT_ZERO + offer.save() +} + +export function handleOfferCancelled(event: OfferCancelledEvent): void { + // OfferCancelled fires when a payer (or any signer at SCOPE_SIGNED) + // cancels a stored RCA/RCAU offer. The contract deletes the on-chain + // entry, so dipper's idempotency gate must treat the Offer as no longer + // live. Set canceledAt to the event's block timestamp; consumers query + // `canceledAt > 0` to decide "safe to re-submit". + let offer = Offer.load(event.params.agreementId) + if (offer == null) return + offer.canceledAt = event.block.timestamp offer.save() } diff --git a/subgraph.template.yaml b/subgraph.template.yaml index 1903840..272473e 100644 --- a/subgraph.template.yaml +++ b/subgraph.template.yaml @@ -50,13 +50,13 @@ dataSources: - name: RecurringCollector file: ./abis/RecurringCollector.json eventHandlers: - - event: AgreementAccepted(indexed address,indexed address,indexed address,bytes16,uint64,uint64,uint256,uint256,uint32,uint32) + - event: AgreementAccepted(indexed address,indexed address,indexed address,bytes16,uint64,uint256,uint256,uint32,uint32) handler: handleAgreementAccepted topic1: ["{{subgraphServiceAddress}}"] - - event: AgreementCanceled(indexed address,indexed address,indexed address,bytes16,uint64,uint8) + - event: AgreementCanceled(indexed address,indexed address,indexed address,bytes16,uint8) handler: handleAgreementCanceled topic1: ["{{subgraphServiceAddress}}"] - - event: AgreementUpdated(indexed address,indexed address,indexed address,bytes16,uint64,uint64,uint256,uint256,uint32,uint32) + - event: AgreementUpdated(indexed address,indexed address,indexed address,bytes16,uint64,uint256,uint256,uint32,uint32) handler: handleAgreementUpdated topic1: ["{{subgraphServiceAddress}}"] - event: RCACollected(indexed address,indexed address,indexed address,bytes16,bytes32,uint256,uint256) @@ -64,4 +64,6 @@ dataSources: topic1: ["{{subgraphServiceAddress}}"] - event: OfferStored(indexed bytes16,indexed address,indexed uint8,bytes32) handler: handleOfferStored + - event: OfferCancelled(indexed address,indexed bytes16,indexed bytes32) + handler: handleOfferCancelled file: ./src/recurringCollector.ts diff --git a/tests/recurringCollector.test.ts b/tests/recurringCollector.test.ts index aecd44c..23faf52 100644 --- a/tests/recurringCollector.test.ts +++ b/tests/recurringCollector.test.ts @@ -1,11 +1,21 @@ import { assert, describe, test, clearStore, afterEach, newMockEvent } from 'matchstick-as' import { Address, Bytes, BigInt, ethereum } from '@graphprotocol/graph-ts' -import { handleOfferStored } from '../src/recurringCollector' -import { OfferStored as OfferStoredEvent } from '../generated/RecurringCollector/RecurringCollector' +import { handleOfferStored, handleOfferCancelled } from '../src/recurringCollector' +import { + OfferStored as OfferStoredEvent, + OfferCancelled as OfferCancelledEvent, +} from '../generated/RecurringCollector/RecurringCollector' const PAYER = Address.fromString('0x0000000000000000000000000000000000000002') +const CALLER = Address.fromString('0x0000000000000000000000000000000000000003') const AGREEMENT_ID = Bytes.fromHexString('0x0102030405060708090a0b0c0d0e0f10') +// OFFER_TYPE_NEW from IAgreementCollector.sol after the audit reshuffle +// (NONE=0, NEW=1, UPDATE=2). Tests construct events with the live value so +// the stored entity matches what production indexers would see. +const OFFER_TYPE_NEW: i32 = 1 +const OFFER_TYPE_UPDATE: i32 = 2 + function createOfferStoredEvent( agreementId: Bytes, offerType: i32, @@ -31,6 +41,23 @@ function createOfferStoredEvent( return event } +function createOfferCancelledEvent( + caller: Address, + agreementId: Bytes, + hash: Bytes, +): OfferCancelledEvent { + let event = changetype(newMockEvent()) + + event.parameters = new Array() + event.parameters.push(new ethereum.EventParam('caller', ethereum.Value.fromAddress(caller))) + event.parameters.push( + new ethereum.EventParam('agreementId', ethereum.Value.fromFixedBytes(agreementId)), + ) + event.parameters.push(new ethereum.EventParam('hash', ethereum.Value.fromFixedBytes(hash))) + + return event +} + describe('handleOfferStored', () => { afterEach(() => { clearStore() @@ -38,30 +65,61 @@ describe('handleOfferStored', () => { test('first event creates Offer entity', () => { let offerHash = Bytes.fromHexString('0x' + 'aa'.repeat(32)) - let event = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash) + let event = createOfferStoredEvent(AGREEMENT_ID, OFFER_TYPE_NEW, offerHash) handleOfferStored(event) assert.entityCount('Offer', 1) let id = AGREEMENT_ID.toHexString() assert.fieldEquals('Offer', id, 'payer', PAYER.toHexString()) - assert.fieldEquals('Offer', id, 'offerType', '0') + assert.fieldEquals('Offer', id, 'offerType', OFFER_TYPE_NEW.toString()) assert.fieldEquals('Offer', id, 'offerHash', offerHash.toHexString()) + assert.fieldEquals('Offer', id, 'canceledAt', '0') }) - test('duplicate event for same agreementId is a no-op (idempotency guard)', () => { - let offerHash = Bytes.fromHexString('0x' + 'aa'.repeat(32)) + test('OFFER_TYPE_UPDATE overwrites offerType and offerHash; createdAt stays pinned', () => { + let initialHash = Bytes.fromHexString('0x' + 'aa'.repeat(32)) + let updatedHash = Bytes.fromHexString('0x' + 'bb'.repeat(32)) + let id = AGREEMENT_ID.toHexString() - let event1 = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash) - handleOfferStored(event1) + let initial = createOfferStoredEvent(AGREEMENT_ID, OFFER_TYPE_NEW, initialHash) + initial.block.number = BigInt.fromI32(100) + initial.block.timestamp = BigInt.fromI32(1000) + handleOfferStored(initial) + assert.fieldEquals('Offer', id, 'offerType', OFFER_TYPE_NEW.toString()) + assert.fieldEquals('Offer', id, 'offerHash', initialHash.toHexString()) + assert.fieldEquals('Offer', id, 'createdAtBlock', '100') + assert.fieldEquals('Offer', id, 'createdAtTimestamp', '1000') - // Second event for same agreementId must not halt on immutable re-write. - let event2 = createOfferStoredEvent(AGREEMENT_ID, 0, offerHash) - event2.transaction.hash = Bytes.fromHexString( - '0x1111111111111111111111111111111111111111111111111111111111111111', - ) as Bytes - handleOfferStored(event2) + let update = createOfferStoredEvent(AGREEMENT_ID, OFFER_TYPE_UPDATE, updatedHash) + update.block.number = BigInt.fromI32(200) + update.block.timestamp = BigInt.fromI32(2000) + handleOfferStored(update) assert.entityCount('Offer', 1) + // Latest terms reflect the UPDATE event... + assert.fieldEquals('Offer', id, 'offerType', OFFER_TYPE_UPDATE.toString()) + assert.fieldEquals('Offer', id, 'offerHash', updatedHash.toHexString()) + // ...but createdAt stays pinned to the initial OFFER_TYPE_NEW. + assert.fieldEquals('Offer', id, 'createdAtBlock', '100') + assert.fieldEquals('Offer', id, 'createdAtTimestamp', '1000') + }) + + test('OfferCancelled stamps canceledAt; OfferStored after that clears it', () => { + let offerHash = Bytes.fromHexString('0x' + 'aa'.repeat(32)) + let id = AGREEMENT_ID.toHexString() + + handleOfferStored(createOfferStoredEvent(AGREEMENT_ID, OFFER_TYPE_NEW, offerHash)) + assert.fieldEquals('Offer', id, 'canceledAt', '0') + + let cancelEvent = createOfferCancelledEvent(CALLER, AGREEMENT_ID, offerHash) + cancelEvent.block.timestamp = BigInt.fromI32(12345) + handleOfferCancelled(cancelEvent) + assert.fieldEquals('Offer', id, 'canceledAt', '12345') + + // A fresh OfferStored for the same agreement id should reset canceledAt + // so the idempotency gate sees the entity as live again. + handleOfferStored(createOfferStoredEvent(AGREEMENT_ID, OFFER_TYPE_NEW, offerHash)) + assert.fieldEquals('Offer', id, 'canceledAt', '0') }) })