Skip to content

Provide an EventId to correlate raw messages and parsed records#348

Closed
alnkesq wants to merge 1 commit intodrasticactions:developfrom
alnkesq:eventid
Closed

Provide an EventId to correlate raw messages and parsed records#348
alnkesq wants to merge 1 commit intodrasticactions:developfrom
alnkesq:eventid

Conversation

@alnkesq
Copy link
Copy Markdown
Contributor

@alnkesq alnkesq commented Mar 20, 2026

Because firehose records are parsed in parallel on a threadpool, results can arrive out-of-order.

This PR adds a way of linking them back to their original raw messages, so that a consumer can:

  • Decide to process them in-order, and, more importantly:
  • Be able to confidently checkpoint the last definitely processed event (firehose cursor resume)

Note: this PR only handles JetStreams, not ATProto sockets.
If you are ok with this change, I will implement the same for ATProto sockets (either here or in a separate PR)

For context

The way I currently solve the cursor checkpointing problem in AppViewLite is an ugly hack that involves "pausing" the threadpool until it drains and I can then capture the last definitely processed cursor. However this hack is prone to deadlocks and unnecessary slowdowns.

By contrast, this is an example of how the new EventIds can be used to reliably keep track of the last definitely processed record: OutOfOrderCompletionTracker.cs

@drasticactions
Copy link
Copy Markdown
Owner

drasticactions commented Mar 24, 2026

You can add it to this PR, but as a heads-up, I'm getting to the point where I'm going to stop work on this repo. I'm nearly at the point with a fully source gen implementation that I'm happy with. You should probably check that out and see if you can use that instead of(and implement whatever changes you want there). I think it can basically meet your needs (Although it needs an XRPC generator to be "complete" with this).

@alnkesq
Copy link
Copy Markdown
Contributor Author

alnkesq commented Mar 24, 2026

You can add it to this PR

By "it" do you mean OutOfOrderCompletionTracker?

Also I've just noticed that
ATJetStream.OnRawMessageReceived is EventHandler<JetStreamRawMessageEventArgs>, however
ATWebSocketProtocol.OnMessageReceived is EventHandler<ReadOnlySequence<byte>>

So I cannot add an EventId property to the latter without introducing breaking changes.

If you're ok with adding a OutOfOrderCompletionTracker-like mechanism directly to FishyFlip, I can expose a higher-level API such as

ATJetStream.LastDefinitelyProcessedCursor and
ATWebSocketProtocol.LastDefinitelyProcessedCursor

@drasticactions
Copy link
Copy Markdown
Owner

drasticactions commented Mar 24, 2026

Yeah, go for it. Given that you're the only one actively using those events AFAIK, and that this is a relatively minor breaking change, it's fine. Do what you need to do.

e. To be clear, again, though, I'm planning on stopping development on this. Once I merge this, I'll probably do one more bump of the lexicon bindings, release that as stable, and then stop making any changes to this repo and move any future work to Carpanet. This codebase is still a good base for what you need, though, so if you don't want to use my newer library (which I would totally understand, especially in your case), you should probably fork this outright or add it directly to AppViewLite.

@alnkesq
Copy link
Copy Markdown
Contributor Author

alnkesq commented Mar 24, 2026

Superseded by #349

@alnkesq alnkesq closed this Mar 25, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants