Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Host] Consumer pipeline revamp #347

Open
EtherZa opened this issue Nov 28, 2024 · 8 comments
Open

[Host] Consumer pipeline revamp #347

EtherZa opened this issue Nov 28, 2024 · 8 comments
Milestone

Comments

@EtherZa
Copy link
Contributor

EtherZa commented Nov 28, 2024

When per message scoping is enabled and an exception is thrown in the messaging pipeline; if a custom imeplementation of IConsumerErrorHandler is available, both the instnace of IConsumerErrorHandler and the retry function supplied to IConsumerErrorHandler.OnHandleErorr() are created under the original scope. This limits the functionality of the retry mechanism when transactions/scoped dependencies are in use.

An use case is when using a scope registered DbContext that experiences a transient error while in a transaction. Any subsequent retry will reuse the existing DbContent connection which is now in a failed state.

It would be ideal to only create the message scope when retry()/DoHandleInternal() is called, but the consumerInvoker instance that is made available to both IConsumerErrorHandler and IConsumerContext is dependent on the scope.

@EtherZa
Copy link
Contributor Author

EtherZa commented Nov 28, 2024

IConsumerContext is also dependent on a scoped instance of IMessasgeBus (MessageBusProxy).

@zarusz
Copy link
Owner

zarusz commented Nov 28, 2024

Are you planning a PR improve this?
If this something that is a priority for your project?

@EtherZa
Copy link
Contributor Author

EtherZa commented Nov 29, 2024

To be honest, there's not a lot that I can do to improve it without introducing breaking changes.

The basic flow at the moment is:

  1. create a IMessageScope
  2. create MessageBusProxy using scope
  3. create consumerInstance using scope
  4. create IConsumerContext using scope with MesageBusProxy + consumerInstance
  5. call DoHandleInternal(), passing the scoped IConsumerContext and MessageBusProxy
  6. on exception: call DoHandleError() passing int the same MessageBusProxy, IConsumerContext and IMessageScope
  7. Check for an isntance of IConsumerErrorHandler and recreate the steps in DoHandleInternal() to be co-ordinated by the IConsumerErrorHandler instance.

Due to this implementation being wrapped by a single IMessageScope, the IConsumerErrorHandler and retry() instances executes under the same scope as the original execution.

Possible solutions:

  1. Run through steps 1 to 5 as above and on an error check if there is an instance of IConsumerErrorHandler available. If so, create a new scope and execute OnHandleError() on the instance. IConsumerErrorHandler allows for multiple retry executions but due to the wrapped scope, the exception will just be deferred to the second exexcution.
  2. Remove the references to MessageBusProxy and Consumer from IConsumerContext and modify downstream to get the instances elsewhere. Move message scoping to happen in DoHandleInternal()/retry(). Scope is recreated as needed, but contracts are broken and contextual information may be missing.
  3. Do not pass an instance of IConsumerContext to IConsumerErrorHandler. Move message scoping to DoHandleInternal()/retry(). Allows for multiple retry attempts as in 3; but breaking change on IConsumerErrorHandler with loss of context.
  4. Extract the retry() functionality from IConsumerErrorHandler and have the handler return a state indicating that a retry is required** (the handler could then implement its own jitter/back-off implementation. Breaking change on IConsumerErrorHandler.
  5. Reimagine the pipeline where Resillience/Exception handling/Scopes/Request Response are just middleware. Allow for re-execution of child middleware. This is definitely the most extensible pattern, but requires a big refactor and has lots of breaking changes.

None of these options are ideal unfortunately. Do you have any other ideas/suggestions?

** it would be nice to optionally have an "abandon" state too, where a message can be sent directly to the DLQ instead of being served up by the service bus again (for non-transient exceptions).

@zarusz
Copy link
Owner

zarusz commented Nov 29, 2024

When the interceptors where introduced they were running intentionally on the same scope as the message handling scope (I wanted the interceptors to have access to the ongoing/current message scope).

Later, when Error Handler interceptors were introduced I had the same thought if not to create a whole new scope and debated this some at the time, and came to the conclusion these consumer error handlers are like a try-catch wrapper with the option of simply repeating the try-catch block. So a retry would re-use the existing scope, and it felt that the created message scope dependencies could be re-used (in most cases) and its more efficient to re-use them. Therefore I had not went down the road to re-create the message scope upon retry(). Of course this could be an additional option, but first need debate about this and weigh the prons/cons and effort.

Now, in your case I do understand the current design might not work (DbContext), and you really want to re-run the whole message processing (from the point of creating the scope) without letting it count towards retry count on the ASB side, correct?

To solve your case (as another option):

  • Option 6) We could let the ASB re-run the message at the transport level. In the ErrorHandler you could get some transport level extensions like we have for RabbitMq to return the message without incrementing the retry counter. Conversely, with this approach, you could have the ability to decide to DLQ the message without further ASB retries. I do understand there would be a delay related with re-scheduling the message on the transport level (not ideal).
  • I do like the idea of 5) that you've proposed, but that would require more breaking changes and I wonder about the memory overhead that such a design adds (currently interceptor pipeline runs as an option if you have interceptors for a given consumer).
  • The other options mentioned 1-4 that you mention would fall under let's recreate the scope (and dependencies) which perhaps could be done as an option (to recreate the scope or not, with default to not recreate the scope).

Also, I will give this some more thought and review the code as currently I have other things in my focus.

Please let me know.

@EtherZa
Copy link
Contributor Author

EtherZa commented Nov 30, 2024

My primary application is for applying retry policies (with back-off and jitter) on Azure infrastrucure after a transient failure. With this, it's not the incremented delivery count that is an issue, but rather that messages are sent back to the queue when they fail/lock on mass and end up being processed again in unison -locking once more. I could add an interceptor to apply a delay before returning it to the queue but that just doesn't feel quite right.

With regards to 5; I was intrigued, so I threw up a quick spike on implementation and to have a look at what the overhead would look like. The spike has various configurations, but the simple flow is to pass a message into a runner which steps through registered middleware with ad hoc dependency resolution. Each middleware instance passes the context on the next which allows for manipulation as/when required. This makes the implementation pretty flexible in terms of what could be supported (resillience, transactions, scopes, hybrid deserialization, message lifecycle, etc).

ScopedPipeline
SImulates a repeated execution with a full rescope..

NoScopePipeline
No resilience, no scoping but does include a transaction scope middleware.

OneThousandInterationNoopPipeline
1000 iterations of a no-op interceptor before calling the consumer (stack overhead of many interceptors).

MinPipeline
No middleware other than the consumer host.

NoPipeline
Same as MinPipeline but does not use a runner. A non-interceptor implementation.

Method Mean Error StdDev Median Allocated
ScopedPipeline 181.13 us 3.620 us 5.074 us 181.90 us 25.95 KB
NoScopePipeline 38.10 us 2.117 us 6.174 us 37.40 us 13.91 KB
OneThousandInterationNoopPipeline 301.53 us 5.041 us 5.394 us 301.30 us 114.41 KB
MinPipeline 29.03 us 1.284 us 3.726 us 27.90 us 5.91 KB
NoPipeline 27.36 us 1.588 us 4.633 us 26.90 us 4 KB

The samples are very rudamentory, and certainly would need refinement and bus integration, but do flesh out the idea. Of course, due to the breaking nature of the change, it may very well be too much too chew. The major causalty would be IConsumerErrorHandler but interceptors in general would be impacted if the full approach were to be taken (context manipulation).

@zarusz
Copy link
Owner

zarusz commented Dec 1, 2024

@EtherZa could you update the spike link provided? Its currently broken.

I like the middleware approach 5), as an option the middleware could decide if to redo the scope (default = false).
From your benchmark, there is overhead. The refined implementation could perhaps be more lean and perfomant.
As to the breaking changes, I am on the fence if still this could go into the v3 as I had the intent to release this after some more features (it has been in the making for some time now).

Let me think about your requirements some more and get back to you.

As a developer, I want to have the ability to inject my custom transient error handling logic and decide if to retry the whole message processing pipeline with the option to re-create the message scope.

@EtherZa
Copy link
Contributor Author

EtherZa commented Dec 1, 2024

@zarusz sorry, the repo was marked as private. The link should be good to go now.

It is definitely a fundamental shift and while the additional stack does add overhead, I'll leave it to you to decide if the benefits outweigh it or not. The spike is quick and dirty and definitely could be optimised. The boxing/unboxing is just one area that would need to be made more efficient, but the principal is there.

EtherZa added a commit to EtherZa/SlimMessageBus that referenced this issue Dec 29, 2024
EtherZa added a commit to EtherZa/SlimMessageBus that referenced this issue Dec 29, 2024
…e instead of supplying a 'retry' delegate

Signed-off-by: Richard Pringle <[email protected]>
@zarusz zarusz added this to the 4.0.0 milestone Dec 29, 2024
@zarusz
Copy link
Owner

zarusz commented Dec 29, 2024

Given the scope re-create feature is handled as part of #354, I suggest we leave the pipeline revamp exploration to another bigger milestone 4.0.0, once the 3.0.0 is released.

@zarusz zarusz changed the title [Host] IConsumerErrorHandler usage limited when using per message scopes [Host] Consumer pipeline revamp Dec 29, 2024
EtherZa added a commit to EtherZa/SlimMessageBus that referenced this issue Dec 29, 2024
…e instead of supplying a 'retry' delegate

Signed-off-by: Richard Pringle <[email protected]>
zarusz pushed a commit that referenced this issue Dec 29, 2024
…ead of supplying a 'retry' delegate

Signed-off-by: Richard Pringle <[email protected]>
EtherZa added a commit to EtherZa/SlimMessageBus that referenced this issue Dec 29, 2024
Signed-off-by: Richard Pringle <[email protected]>
zarusz pushed a commit that referenced this issue Dec 29, 2024
Signed-off-by: Richard Pringle <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants