Skip to content

Commit 18281f4

Browse files
committed
de-dupe unique balance updates
1 parent 42b01ca commit 18281f4

File tree

2 files changed

+80
-71
lines changed

2 files changed

+80
-71
lines changed

src/scripts/migrate-to-bank-denom-balance.ts

Lines changed: 62 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Op, QueryTypes } from 'sequelize'
22

33
import { BankBalance, BankDenomBalance, Block, loadDb } from '@/db'
44
import { DbType } from '@/types'
5+
import { batch } from '@/utils'
56

67
async function main() {
78
console.log('Starting bank denom balance migration...')
@@ -16,10 +17,10 @@ async function main() {
1617
`Found ${initialBankBalanceCount.toLocaleString()} bank balances to migrate`
1718
)
1819

19-
// Migrate in batches of 3000.
20+
// Migrate in batches of 1,000.
21+
2022
let processed = 0
2123
let updated = 0
22-
const batchSize = 3000
2324
let lastAddress: string | undefined = undefined
2425

2526
while (processed < initialBankBalanceCount) {
@@ -30,7 +31,7 @@ async function main() {
3031
...(lastAddress ? { address: { [Op.gt]: lastAddress } } : {}),
3132
},
3233
order: [['address', 'ASC']],
33-
limit: batchSize,
34+
limit: 1_000,
3435
})
3536

3637
// Build values for bulk insert with timestamps
@@ -71,74 +72,66 @@ async function main() {
7172
}))
7273
)
7374

74-
// Cap total updates to 7,000, stopping before the first update that
75-
// would exceed the limit, unless there is only one update.
76-
let totalUpdates = 0
77-
for (let i = 0; i < updates.length; i++) {
78-
const update = updates[i]
79-
totalUpdates += update.updates.length
80-
if (totalUpdates > 7000 && i > 0) {
81-
// Skip the current and remaining updates.
82-
updates = updates.slice(0, i)
83-
break
84-
}
85-
}
86-
87-
processed += updates.length
88-
lastAddress =
89-
updates.length > 0
90-
? updates[updates.length - 1].bankBalance.address
91-
: undefined
92-
9375
if (updates.length > 0) {
94-
const values = updates.flatMap((update) => update.updates).flat()
95-
const valueStrings = updates
96-
.flatMap((update) => update.updates)
97-
.map(
98-
(_, index) =>
99-
`($${index * 6 + 1}, $${index * 6 + 2}, $${index * 6 + 3}, $${
100-
index * 6 + 4
101-
}, $${index * 6 + 5}, $${index * 6 + 6}, NOW(), NOW())`
102-
)
103-
.join(', ')
104-
105-
const query = `
106-
INSERT INTO "${BankDenomBalance.tableName}" (
107-
"address", "denom", "balance", "blockHeight",
108-
"blockTimeUnixMs", "blockTimestamp", "createdAt", "updatedAt"
109-
)
110-
VALUES ${valueStrings}
111-
ON CONFLICT ("address", "denom")
112-
DO UPDATE SET
113-
"balance" = CASE
114-
WHEN EXCLUDED."blockHeight"::bigint > "BankDenomBalances"."blockHeight"::bigint
115-
THEN EXCLUDED."balance"
116-
ELSE "BankDenomBalances"."balance"
117-
END,
118-
"blockHeight" = CASE
119-
WHEN EXCLUDED."blockHeight"::bigint > "BankDenomBalances"."blockHeight"::bigint
120-
THEN EXCLUDED."blockHeight"
121-
ELSE "BankDenomBalances"."blockHeight"
122-
END,
123-
"blockTimeUnixMs" = CASE
124-
WHEN EXCLUDED."blockHeight"::bigint > "BankDenomBalances"."blockHeight"::bigint
125-
THEN EXCLUDED."blockTimeUnixMs"
126-
ELSE "BankDenomBalances"."blockTimeUnixMs"
127-
END,
128-
"blockTimestamp" = CASE
129-
WHEN EXCLUDED."blockHeight"::bigint > "BankDenomBalances"."blockHeight"::bigint
130-
THEN EXCLUDED."blockTimestamp"
131-
ELSE "BankDenomBalances"."blockTimestamp"
132-
END,
133-
"updatedAt" = NOW()
134-
RETURNING id;
135-
`
136-
137-
const results = await BankDenomBalance.sequelize!.query(query, {
138-
bind: values,
139-
type: QueryTypes.SELECT,
76+
// Update in batches of 5,000.
77+
await batch({
78+
list: updates.flatMap((update) => update.updates),
79+
batchSize: 5_000,
80+
grouped: true,
81+
task: async (updates) => {
82+
const values = updates.flat()
83+
const valueStrings = updates.map(
84+
(_, index) =>
85+
`($${index * 6 + 1}, $${index * 6 + 2}, $${index * 6 + 3}, $${
86+
index * 6 + 4
87+
}, $${index * 6 + 5}, $${index * 6 + 6}, NOW(), NOW())`
88+
)
89+
90+
const query = `
91+
INSERT INTO "${BankDenomBalance.tableName}" (
92+
"address", "denom", "balance", "blockHeight",
93+
"blockTimeUnixMs", "blockTimestamp", "createdAt", "updatedAt"
94+
)
95+
VALUES ${valueStrings}
96+
ON CONFLICT ("address", "denom")
97+
DO UPDATE SET
98+
"balance" = CASE
99+
WHEN EXCLUDED."blockHeight"::bigint > "BankDenomBalances"."blockHeight"::bigint
100+
THEN EXCLUDED."balance"
101+
ELSE "BankDenomBalances"."balance"
102+
END,
103+
"blockHeight" = CASE
104+
WHEN EXCLUDED."blockHeight"::bigint > "BankDenomBalances"."blockHeight"::bigint
105+
THEN EXCLUDED."blockHeight"
106+
ELSE "BankDenomBalances"."blockHeight"
107+
END,
108+
"blockTimeUnixMs" = CASE
109+
WHEN EXCLUDED."blockHeight"::bigint > "BankDenomBalances"."blockHeight"::bigint
110+
THEN EXCLUDED."blockTimeUnixMs"
111+
ELSE "BankDenomBalances"."blockTimeUnixMs"
112+
END,
113+
"blockTimestamp" = CASE
114+
WHEN EXCLUDED."blockHeight"::bigint > "BankDenomBalances"."blockHeight"::bigint
115+
THEN EXCLUDED."blockTimestamp"
116+
ELSE "BankDenomBalances"."blockTimestamp"
117+
END,
118+
"updatedAt" = NOW()
119+
RETURNING id;
120+
`
121+
122+
await BankDenomBalance.sequelize!.query(query, {
123+
bind: values,
124+
type: QueryTypes.SELECT,
125+
})
126+
},
140127
})
141128

129+
processed += updates.length
130+
lastAddress =
131+
updates.length > 0
132+
? updates[updates.length - 1].bankBalance.address
133+
: undefined
134+
142135
// Delete BankBalances that were updated.
143136
await BankBalance.destroy({
144137
where: {
@@ -149,7 +142,7 @@ async function main() {
149142
})
150143

151144
console.log(
152-
`Updated ${results.length.toLocaleString()} bank denom balances`
145+
`Updated ${updates.length.toLocaleString()} bank denom balances`
153146
)
154147

155148
updated += updates.length

src/tracer/handlers/bank.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,24 @@ export const bank: HandlerMaker<ParsedBankStateEvent> = async ({
174174
: [],
175175
// Custom upsert logic to only update when blockHeight is newer
176176
(async () => {
177+
// Get latest event for each address/denom. The bulk insert cannot
178+
// affect the same row twice, and there may be duplicates in the same
179+
// block.
180+
const deduplicatedEvents = Object.values(
181+
events.reduce((acc, event) => {
182+
const key = `${event.address}:${event.denom}`
183+
if (
184+
!acc[key] ||
185+
BigInt(event.blockHeight) > BigInt(acc[key].blockHeight)
186+
) {
187+
acc[key] = event
188+
}
189+
return acc
190+
}, {} as Record<string, ParsedBankStateEvent>)
191+
)
192+
177193
// Build values for bulk insert with timestamps
178-
const valueStrings = events
194+
const valueStrings = deduplicatedEvents
179195
.map(
180196
(_, index) =>
181197
`($${index * 6 + 1}, $${index * 6 + 2}, $${index * 6 + 3}, $${
@@ -185,7 +201,7 @@ export const bank: HandlerMaker<ParsedBankStateEvent> = async ({
185201
.join(', ')
186202

187203
// Flatten all event values for parameterized query
188-
const values = events.flatMap((event) => [
204+
const values = deduplicatedEvents.flatMap((event) => [
189205
event.address,
190206
event.denom,
191207
event.balance,

0 commit comments

Comments
 (0)