-
Notifications
You must be signed in to change notification settings - Fork 202
ETCD: initial heartbeat thread #1010
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
👋 Hi tstamler! Thank you for contributing to ai-dynamo/nixl. Your PR reviewers will review your contribution then trigger the CI to test your changes. 🚀 |
|
/ok to test c2065fd |
|
/build |
aranadive
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
|
/ok to test 4e8f87d |
|
/build |
|
@tstamler I added the fixes we talked about. |
@tstamler, there was an error processing your request: See the following link for more information: https://docs.gha-runners.nvidia.com/cpr/e/2/ |
|
/ok to test 5220637 |
|
/build |
|
/build |
1 similar comment
|
/build |
|
/ok to test 73195df |
|
/ok to test 62eea59 |
|
/ok to test 179ecbd |
|
/build |
| // Get current index to watch from | ||
| etcd::Response response = etcd->get(metadata_key); | ||
| int64_t watch_index = response.index(); | ||
| int64_t watch_index = response.index() + 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please explain this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Setting a watcher for the current index can "replay" events from that index.
For example, an agent is removed and re-added, as can happen in the an elastic use case.
The created watcher gets the delete event and closes.
Setting +1 for the index it is created on, where it failed to fetch, will notify it of changes that happens from this point on.
| void | ||
| startHeartbeatThread(uint64_t lease_id) { | ||
| while (!heartbeat_thread_stop) { | ||
| // keep alive for twice the heartbeat interval |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please explain why 2x?
| etcd::Response response = etcd->leasegrant((heartbeat.count()) * 2); | ||
|
|
||
| if (response.is_ok()) { | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Empty line?
|
|
||
| NIXL_DEBUG << "Using etcd namespace for agents: " << namespace_prefix; | ||
|
|
||
| etcd::Response response = etcd->leasegrant((heartbeat.count()) * 2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why 2x?
| This posts metadata to etcd and then is killed. | ||
| """ | ||
| logger.info("Target subprocess started") | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we please reduce the empty lines in this file?
| * @var Heartbeat interval in seconds | ||
| * Interval for heartbeat that keeps remote metadata valid. | ||
| */ | ||
| std::chrono::seconds heartbeatInterval; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, to be able to review this PR, there needs to be a detailed description that explains the why & how as per template.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we reuse commWorker thread instead of creating a new one?
Or, if it is not possible, use etcd builtin etcd::KeepAlive keepalive which is sipposed to do that internally
What?
Add a heartbeat mechanism to NIXL ETCD metadata to invalidate stale metadata if an agent dies.
When an agent uploads metadata to ETCD, it will only lease that data and not store permanently. Then, a heartbeat thread will be spawned to continue renewing the lease on the metadata as long as the agent exists. If the lease is not renewed, the metadata will be removed, which will automatically tell any NIXL agents watching to remove that agent.
Why?
If a NIXL agent has uploaded metadata to ETCD and is unexpectedly killed (or user exits application without invalidating), the metadata it had uploaded will remain in ETCD indefinitely and can confuse applications that rely on ETCD for agent coordination.