-
Notifications
You must be signed in to change notification settings - Fork 11
Broker interceptor #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
| val clientId = request.header.clientId | ||
|
|
||
| var result = clientId.split('_'); | ||
| var eventPattern = result(1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Kafka 0.11, you can use Record Header to store eventPattern info.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that is my previous plan, but it need additional work to parse it from header. I think this is not the highest priority issue we need to resolve, we could left hack way here and make code work firstly.
| } | ||
| } | ||
|
|
||
| while (batchs.hasNext()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reset the batchs iterator first?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, should be. But I test this two separately and add them together, so this is not the issue cause get out key failed.
| var eventPatternRegex = eventPattern.r; | ||
| var records = logReadInfo.records.batches().iterator(); | ||
| if (!eventPatternRegex.findFirstIn(key.toString).isDefined) { | ||
| records.remove(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need we store the final records to the result?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to double check and confirm this thing after resolve get out key failed issue.
…pache#7305) A partition log in initialized in following steps: 1. Fetch log config from ZK 2. Call LogManager.getOrCreateLog which creates the Log object, then 3. Registers the Log object Step #3 enables Configuration update thread to deliver configuration updates to the log. But if any update arrives between step #1 and #3 then that update is missed. It breaks following use case: 1. Create a topic with default configuration, and immediately after that 2. Update the configuration of topic There is a race condition here and in random cases update made in second step will get dropped. This change fixes it by tracking updates arriving between step #1 and #3 Once a Partition is done initializing log, it checks if it has missed any update. If yes, then the configuration is read from ZK again. Added unit tests to make sure a dirty configuration is refreshed. Tested on local cluster to make sure that topic configuration and updates are handled correctly. Reviewers: Jason Gustafson <[email protected]>
This is a draft code change, there are a lot of hard code and bug inside.