Skip to content

Commit

Permalink
fix: handle firehose subscription error reconnect (Close bluesky-soci…
Browse files Browse the repository at this point in the history
  • Loading branch information
yuna0x0 authored Jun 15, 2023
1 parent 6889ac1 commit 0e056c5
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 12 deletions.
5 changes: 4 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ FEEDGEN_HOSTNAME="example.com"
FEEDGEN_PUBLISHER_DID="did:plc:abcde...."

# Only use this if you want a service did different from did:web
# FEEDGEN_SERVICE_DID="did:plc:abcde..."
# FEEDGEN_SERVICE_DID="did:plc:abcde..."

# Delay between reconnect attempts to the firehose subscription endpoint (in milliseconds)
FEEDGEN_SUBSCRIPTION_RECONNECT_DELAY=3000
1 change: 1 addition & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ export type Config = {
subscriptionEndpoint: string
serviceDid: string
publisherDid: string
subscriptionReconnectDelay: number
}
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const run = async () => {
'wss://bsky.social',
publisherDid:
maybeStr(process.env.FEEDGEN_PUBLISHER_DID) ?? 'did:example:alice',
subscriptionReconnectDelay:
maybeInt(process.env.FEEDGEN_SUBSCRIPTION_RECONNECT_DELAY) ?? 3000,
hostname,
serviceDid,
})
Expand Down
2 changes: 1 addition & 1 deletion src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export class FeedGenerator {

async start(): Promise<http.Server> {
await migrateToLatest(this.db)
this.firehose.run()
this.firehose.run(this.cfg.subscriptionReconnectDelay)
this.server = this.app.listen(this.cfg.port, this.cfg.listenhost)
await events.once(this.server, 'listening')
return this.server
Expand Down
25 changes: 15 additions & 10 deletions src/util/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,22 @@ export abstract class FirehoseSubscriptionBase {

abstract handleEvent(evt: RepoEvent): Promise<void>

async run() {
for await (const evt of this.sub) {
try {
await this.handleEvent(evt)
} catch (err) {
console.error('repo subscription could not handle message', err)
}
// update stored cursor every 20 events or so
if (isCommit(evt) && evt.seq % 20 === 0) {
await this.updateCursor(evt.seq)
async run(subscriptionReconnectDelay: number) {
try {
for await (const evt of this.sub) {
try {
await this.handleEvent(evt)
} catch (err) {
console.error('repo subscription could not handle message', err)
}
// update stored cursor every 20 events or so
if (isCommit(evt) && evt.seq % 20 === 0) {
await this.updateCursor(evt.seq)
}
}
} catch (err) {
console.error('repo subscription errored', err)
setTimeout(() => this.run(subscriptionReconnectDelay), subscriptionReconnectDelay)
}
}

Expand Down

0 comments on commit 0e056c5

Please sign in to comment.