Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions abis/RecurringCollector.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
{ "indexed": true, "name": "payer", "type": "address" },
{ "indexed": true, "name": "serviceProvider", "type": "address" },
{ "indexed": false, "name": "agreementId", "type": "bytes16" },
{ "indexed": false, "name": "acceptedAt", "type": "uint64" },
{ "indexed": false, "name": "endsAt", "type": "uint64" },
{ "indexed": false, "name": "maxInitialTokens", "type": "uint256" },
{ "indexed": false, "name": "maxOngoingTokensPerSecond", "type": "uint256" },
Expand All @@ -23,7 +22,6 @@
{ "indexed": true, "name": "payer", "type": "address" },
{ "indexed": true, "name": "serviceProvider", "type": "address" },
{ "indexed": false, "name": "agreementId", "type": "bytes16" },
{ "indexed": false, "name": "canceledAt", "type": "uint64" },
{ "indexed": false, "name": "canceledBy", "type": "uint8" }
],
"name": "AgreementCanceled",
Expand All @@ -36,7 +34,6 @@
{ "indexed": true, "name": "payer", "type": "address" },
{ "indexed": true, "name": "serviceProvider", "type": "address" },
{ "indexed": false, "name": "agreementId", "type": "bytes16" },
{ "indexed": false, "name": "updatedAt", "type": "uint64" },
{ "indexed": false, "name": "endsAt", "type": "uint64" },
{ "indexed": false, "name": "maxInitialTokens", "type": "uint256" },
{ "indexed": false, "name": "maxOngoingTokensPerSecond", "type": "uint256" },
Expand All @@ -59,5 +56,26 @@
],
"name": "RCACollected",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{ "indexed": true, "name": "agreementId", "type": "bytes16" },
{ "indexed": true, "name": "payer", "type": "address" },
{ "indexed": true, "name": "offerType", "type": "uint8" },
{ "indexed": false, "name": "offerHash", "type": "bytes32" }
],
"name": "OfferStored",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{ "indexed": true, "name": "caller", "type": "address" },
{ "indexed": true, "name": "agreementId", "type": "bytes16" },
{ "indexed": true, "name": "hash", "type": "bytes32" }
],
"name": "OfferCancelled",
"type": "event"
}
]
29 changes: 29 additions & 0 deletions schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@ type IndexingAgreement @entity(immutable: false) {
lastUpdatedAt: BigInt!
"Timestamp when agreement was canceled (0 if not canceled)"
canceledAt: BigInt!
"Address that initiated the cancel (zero address if not canceled). Taken from SubgraphService.IndexingAgreementCanceled.canceledOnBehalfOf so operator-initiated cancels are captured correctly."
canceledBy: Bytes!
"Total tokens collected over lifetime"
tokensCollected: BigInt!
"Block number of the latest state change on this agreement (Accepted / Updated / Canceled / RCACollected). Consumers that reconcile state diffs poll with `lastStateChangeBlock_gt` since last seen block."
lastStateChangeBlock: BigInt!
"Fee collection history"
collections: [IndexingFeeCollection!]! @derivedFrom(field: "agreement")
}
Expand Down Expand Up @@ -71,3 +75,28 @@ type IndexerDeploymentLatest @entity(immutable: false) {
blockNumber: BigInt!
blockTimestamp: BigInt!
}

# Latest stored offer per agreementId, keyed by bytes16 agreement ID.
# Dipper queries this entity as an idempotency gate -- avoids re-submitting
# an offer after a crashed-mid-flight restart where the on-chain tx landed
# but dipper lost track of it.
#
# Mutable because the contract overwrites the stored offer in two cases:
# (1) an OFFER_TYPE_UPDATE event refreshes the hash and type to the latest
# RCAU terms; (2) an OfferCancelled event stamps `canceledAt` so dipper
# treats `canceledAt > 0` as "no live offer; safe to re-submit." Storage
# stays bounded because there is at most one Offer entity per agreementId.
type Offer @entity(immutable: false) {
id: Bytes!
payer: Bytes!
"Latest offer type observed (1 = OFFER_TYPE_NEW, 2 = OFFER_TYPE_UPDATE)."
offerType: Int!
"Latest stored offer hash; tracks the on-chain rcaOffers / rcauOffers entry."
offerHash: Bytes!
"Timestamp when the offer was canceled on-chain (0 if currently live)."
canceledAt: BigInt!
"Block/timestamp/tx of the first OFFER_TYPE_NEW for this agreement id."
createdAtBlock: BigInt!
createdAtTimestamp: BigInt!
createdAtTx: Bytes!
}
8 changes: 7 additions & 1 deletion src/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BigInt, Bytes } from '@graphprotocol/graph-ts'
import { Address, BigInt, Bytes } from '@graphprotocol/graph-ts'
import { IndexingAgreement } from '../generated/schema'

export const BIGINT_ZERO = BigInt.fromI32(0)
Expand All @@ -23,7 +23,13 @@ export function createOrLoadIndexingAgreement(agreementId: Bytes): IndexingAgree
agreement.maxSecondsPerCollection = 0
agreement.lastUpdatedAt = BIGINT_ZERO
agreement.canceledAt = BIGINT_ZERO
// Default to 20-byte zero address rather than Bytes.empty(). Graph-node
// serializes empty Bytes on non-nullable fields with unpredictable
// padding (observed as "0x00000000" in practice), which breaks strict
// 20-byte-address parsers on the consumer side.
agreement.canceledBy = Address.zero() as Bytes
agreement.tokensCollected = BIGINT_ZERO
agreement.lastStateChangeBlock = BIGINT_ZERO
}
return agreement
}
74 changes: 65 additions & 9 deletions src/recurringCollector.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,41 @@
import { IndexingAgreement } from '../generated/schema'
import { IndexingAgreement, Offer } from '../generated/schema'
import {
AgreementAccepted,
AgreementCanceled,
AgreementUpdated,
RCACollected,
OfferStored as OfferStoredEvent,
OfferCancelled as OfferCancelledEvent,
} from '../generated/RecurringCollector/RecurringCollector'
import { createOrLoadIndexingAgreement, BIGINT_ZERO } from './helpers'

// CancelAgreementBy enum from IRecurringCollector.sol:
// 0 = ServiceProvider, 1 = Payer, 2 = ThirdParty
// The contract treats anything that isn't Payer as ServiceProvider when
// emitting the SubgraphService-side IndexingAgreementCanceled event, so we
// mirror that mapping here. ThirdParty (2) is currently unreachable from
// SubgraphService — adding the explicit branch documents the contract's
// intent and keeps the mapping correct if a future data service surfaces it.
const CANCEL_BY_PAYER: i32 = 1

export function handleAgreementAccepted(event: AgreementAccepted): void {
let agreement = createOrLoadIndexingAgreement(event.params.agreementId)

// The contract sets `agreement.acceptedAt = uint64(block.timestamp)` inside
// accept(), so the event's block timestamp is the canonical value.
agreement.payer = event.params.payer
agreement.indexer = event.params.serviceProvider
agreement.state = 'Accepted'
agreement.acceptedAt = event.params.acceptedAt
agreement.lastCollectionAt = event.params.acceptedAt
agreement.acceptedAt = event.block.timestamp
agreement.lastCollectionAt = event.block.timestamp
agreement.endsAt = event.params.endsAt
agreement.maxInitialTokens = event.params.maxInitialTokens
agreement.maxOngoingTokensPerSecond = event.params.maxOngoingTokensPerSecond
agreement.minSecondsPerCollection = event.params.minSecondsPerCollection.toI32()
agreement.maxSecondsPerCollection = event.params.maxSecondsPerCollection.toI32()
agreement.canceledAt = BIGINT_ZERO
agreement.tokensCollected = BIGINT_ZERO
agreement.lastStateChangeBlock = event.block.number

agreement.save()
}
Expand All @@ -30,26 +44,33 @@ export function handleAgreementCanceled(event: AgreementCanceled): void {
let agreement = IndexingAgreement.load(event.params.agreementId)
if (agreement == null) return

// canceledBy enum: 0=ServiceProvider, 1=Payer
if (event.params.canceledBy == 0) {
agreement.state = 'CanceledByServiceProvider'
} else {
// The actual canceler address is written by
// subgraphService.handleIndexingAgreementCanceled, which fires in the
// same transaction and reads the SubgraphService event's
// canceledOnBehalfOf parameter. The contract sets
// `agreement.canceledAt = uint64(block.timestamp)` inside cancel(), so
// the event's block timestamp is the canonical value.
if (event.params.canceledBy == CANCEL_BY_PAYER) {
agreement.state = 'CanceledByPayer'
} else {
agreement.state = 'CanceledByServiceProvider'
}
agreement.canceledAt = event.params.canceledAt
agreement.canceledAt = event.block.timestamp
agreement.lastStateChangeBlock = event.block.number
agreement.save()
}

export function handleAgreementUpdated(event: AgreementUpdated): void {
let agreement = IndexingAgreement.load(event.params.agreementId)
if (agreement == null) return

agreement.lastUpdatedAt = event.params.updatedAt
agreement.lastUpdatedAt = event.block.timestamp
agreement.endsAt = event.params.endsAt
agreement.maxInitialTokens = event.params.maxInitialTokens
agreement.maxOngoingTokensPerSecond = event.params.maxOngoingTokensPerSecond
agreement.minSecondsPerCollection = event.params.minSecondsPerCollection.toI32()
agreement.maxSecondsPerCollection = event.params.maxSecondsPerCollection.toI32()
agreement.lastStateChangeBlock = event.block.number
agreement.save()
}

Expand All @@ -59,5 +80,40 @@ export function handleRCACollected(event: RCACollected): void {

agreement.lastCollectionAt = event.block.timestamp
agreement.tokensCollected = agreement.tokensCollected.plus(event.params.tokens)
agreement.lastStateChangeBlock = event.block.number
agreement.save()
}

export function handleOfferStored(event: OfferStoredEvent): void {
// OfferStored fires once per agreementId for OFFER_TYPE_NEW and again
// for each OFFER_TYPE_UPDATE that changes the stored offer hash. The
// contract overwrites $.rcaOffers / $.rcauOffers in-place, so dipper's
// idempotency gate has to see the latest terms — keep the entity
// mutable and refresh offerType / offerHash on every event. `createdAt`
// fields stay pinned to the first OFFER_TYPE_NEW so consumers can
// distinguish initial offer from subsequent updates.
let offer = Offer.load(event.params.agreementId)
if (offer == null) {
offer = new Offer(event.params.agreementId)
offer.createdAtBlock = event.block.number
offer.createdAtTimestamp = event.block.timestamp
offer.createdAtTx = event.transaction.hash
}
offer.payer = event.params.payer
offer.offerType = event.params.offerType
offer.offerHash = event.params.offerHash
offer.canceledAt = BIGINT_ZERO
offer.save()
}

export function handleOfferCancelled(event: OfferCancelledEvent): void {
// OfferCancelled fires when a payer (or any signer at SCOPE_SIGNED)
// cancels a stored RCA/RCAU offer. The contract deletes the on-chain
// entry, so dipper's idempotency gate must treat the Offer as no longer
// live. Set canceledAt to the event's block timestamp; consumers query
// `canceledAt > 0` to decide "safe to re-submit".
let offer = Offer.load(event.params.agreementId)
if (offer == null) return
offer.canceledAt = event.block.timestamp
offer.save()
}
14 changes: 14 additions & 0 deletions src/subgraphService.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ethereum } from '@graphprotocol/graph-ts'
import {
IndexingAgreementAccepted as AcceptedEvent,
IndexingAgreementCanceled as CanceledEvent,
IndexingAgreementUpdated as UpdatedEvent,
IndexingFeesCollectedV1 as FeesCollectedEvent,
} from '../generated/SubgraphService/SubgraphService'
Expand All @@ -19,6 +20,18 @@ export function handleIndexingAgreementAccepted(event: AcceptedEvent): void {
agreement.tokensPerEntityPerSecond = terms[1].toBigInt()
}

agreement.lastStateChangeBlock = event.block.number
agreement.save()
}

export function handleIndexingAgreementCanceled(event: CanceledEvent): void {
let agreement = createOrLoadIndexingAgreement(event.params.agreementId)
// canceledOnBehalfOf is the actual signer that initiated the cancel. For
// operator-initiated cancels this is the operator, not the payer/indexer
// directly. Dipper's chain_listener compares this to its own signer
// address to decide CanceledByRequester vs CanceledByIndexer.
agreement.canceledBy = event.params.canceledOnBehalfOf
agreement.lastStateChangeBlock = event.block.number
agreement.save()
}

Expand All @@ -33,6 +46,7 @@ export function handleIndexingAgreementUpdated(event: UpdatedEvent): void {
agreement.tokensPerEntityPerSecond = terms[1].toBigInt()
}

agreement.lastStateChangeBlock = event.block.number
agreement.save()
}

Expand Down
13 changes: 10 additions & 3 deletions subgraph.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ dataSources:
eventHandlers:
- event: IndexingAgreementAccepted(indexed address,indexed address,indexed bytes16,address,bytes32,uint8,bytes)
handler: handleIndexingAgreementAccepted
- event: IndexingAgreementCanceled(indexed address,indexed address,indexed bytes16,address)
handler: handleIndexingAgreementCanceled
- event: IndexingAgreementUpdated(indexed address,indexed address,indexed bytes16,address,uint8,bytes)
handler: handleIndexingAgreementUpdated
- event: IndexingFeesCollectedV1(indexed address,indexed address,indexed bytes16,address,bytes32,uint256,uint256,uint256,bytes32,uint256,bytes)
Expand All @@ -43,20 +45,25 @@ dataSources:
language: wasm/assemblyscript
entities:
- IndexingAgreement
- Offer
abis:
- name: RecurringCollector
file: ./abis/RecurringCollector.json
eventHandlers:
- event: AgreementAccepted(indexed address,indexed address,indexed address,bytes16,uint64,uint64,uint256,uint256,uint32,uint32)
- event: AgreementAccepted(indexed address,indexed address,indexed address,bytes16,uint64,uint256,uint256,uint32,uint32)
handler: handleAgreementAccepted
topic1: ["{{subgraphServiceAddress}}"]
- event: AgreementCanceled(indexed address,indexed address,indexed address,bytes16,uint64,uint8)
- event: AgreementCanceled(indexed address,indexed address,indexed address,bytes16,uint8)
handler: handleAgreementCanceled
topic1: ["{{subgraphServiceAddress}}"]
- event: AgreementUpdated(indexed address,indexed address,indexed address,bytes16,uint64,uint64,uint256,uint256,uint32,uint32)
- event: AgreementUpdated(indexed address,indexed address,indexed address,bytes16,uint64,uint256,uint256,uint32,uint32)
handler: handleAgreementUpdated
topic1: ["{{subgraphServiceAddress}}"]
- event: RCACollected(indexed address,indexed address,indexed address,bytes16,bytes32,uint256,uint256)
handler: handleRCACollected
topic1: ["{{subgraphServiceAddress}}"]
- event: OfferStored(indexed bytes16,indexed address,indexed uint8,bytes32)
handler: handleOfferStored
- event: OfferCancelled(indexed address,indexed bytes16,indexed bytes32)
handler: handleOfferCancelled
file: ./src/recurringCollector.ts
Loading