Callbacks necessary for AWX usage #8

Open
7 tasks
AlanCoding opened this issue Oct 22, 2024 · 1 comment
Labels: awx (needed for AWX), enhancement (New feature or request), MVP (minimum set of issues to fix)

Comments

@AlanCoding

This issue is to document all the custom code in AWX that is integrated into the dispatcher, and track implementation of callbacks that would be able to support these. I'm taking the approach of a code walkthrough.

I'll list checkboxes for all the items, which I tried to keep as separate topics.

  • Main Process Before Loop
  • Run Task on Startup
  • Task schedule with Main Process Stats
  • Broker self-check and recycle
  • Corner case of AWX exception handling
  • Worker Process Startup
  • Worker Process Shutdown

Main Process Startup

Obviously this is all synchronous code... which we could call before starting the async loop.

https://github.com/ansible/awx/blob/764dcbf94b51f9b55b467824208e4c8b9dc15786/awx/main/management/commands/run_dispatcher.py#L66

        DispatcherMetricsServer().start()

It also runs logic to load the settings. So if the dispatcher is not started from AWX code, this would need some custom callback.
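
A minimal sketch of such a hook, assuming a hypothetical `run_dispatcher_main` entry point with an `on_main_startup` parameter (neither name comes from AWX or the dispatcher). The synchronous callbacks run first, and only then does the async loop start:

```python
import asyncio
from typing import Callable, Iterable

events = []  # records call order, for illustration only


def start_metrics_server() -> None:
    # Stand-in for AWX calling DispatcherMetricsServer().start().
    events.append("metrics_started")


async def main_loop() -> None:
    # Stand-in for the dispatcher's real async main loop.
    events.append("loop_running")


def run_dispatcher_main(on_main_startup: Iterable[Callable[[], None]] = ()) -> None:
    # Hypothetical entry point: run synchronous hooks before the event loop exists.
    for hook in on_main_startup:
        hook()
    asyncio.run(main_loop())


run_dispatcher_main(on_main_startup=[start_metrics_server])
```

Running the hooks before `asyncio.run` keeps blocking setup code out of the event loop entirely.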

Main Process Startup (Run Task on Startup)

We already have a callback on the worker side.

https://github.com/ansible/awx/blob/764dcbf94b51f9b55b467824208e4c8b9dc15786/awx/main/dispatch/worker/task.py#L141

    def on_start(self):
        dispatch_startup()

    def on_stop(self):
        inform_cluster_of_shutdown()

However, it is called by the main process:

https://github.com/ansible/awx/blob/764dcbf94b51f9b55b467824208e4c8b9dc15786/awx/main/dispatch/worker/base.py#L237

It is confusing that this lives with the worker code, because it actually runs in the main process. There appears to be a distinction between this task and the other startup task, because this one happens after we have a database connection. Also, running it in the main process isn't how we would want to do this.

Since this runs general / misc. logic, I would suggest this becomes a "run task on startup" callback, meaning that it would run in a worker, after all system checks.
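
One way to picture "run task on startup" is a list of task names that the main process submits to the workers once its checks pass. This is only a sketch: the `startup_tasks` argument, the `system_checks_pass` helper, the message shape, and the task name `myapp.tasks.dispatch_startup` are all assumptions for illustration.

```python
import asyncio


async def system_checks_pass() -> bool:
    # Placeholder for real checks (broker reachable, database reachable, ...).
    return True


async def main(startup_tasks):
    queue: asyncio.Queue = asyncio.Queue()  # stand-in for the main->worker queue
    if not await system_checks_pass():
        raise RuntimeError("system checks failed; not dispatching startup tasks")
    # Startup tasks are queued like any other task, so they run in a worker.
    for task_name in startup_tasks:
        await queue.put({"task": task_name, "args": [], "kwargs": {}})
    return [queue.get_nowait() for _ in range(queue.qsize())]


dispatched = asyncio.run(main(["myapp.tasks.dispatch_startup"]))
```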

Task Schedule with Main Process Stats

This logic would be best moved into the worker. The difficulty is getting the stats from the main process into the worker, but we would still strongly prefer that the connection to redis be managed in the worker.

https://github.com/ansible/awx/blob/764dcbf94b51f9b55b467824208e4c8b9dc15786/awx/main/dispatch/worker/base.py#L202

            self.record_statistics()  # maintains time buffer in method

The need for main->worker communication is described in #6

Main Process Periodic

The health of the postgres connection is also tracked periodically. This would require a new, ground-up solution in the main async code.
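
As a rough idea of what a periodic hook in the async main code might look like, here is a sketch built only on `asyncio`. The `run_periodically` helper and the `check_database_health` callback are hypothetical names, and the real check would be supplied by the application:

```python
import asyncio


async def run_periodically(check, interval: float, stop: asyncio.Event) -> None:
    """Call `check` every `interval` seconds until `stop` is set."""
    while not stop.is_set():
        await check()
        try:
            # Sleep for the interval, but wake up early if shutdown is requested.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass


async def demo() -> int:
    stop = asyncio.Event()
    calls = 0

    async def check_database_health() -> None:
        # Hypothetical stand-in for an application-supplied health check.
        nonlocal calls
        calls += 1
        if calls == 3:
            stop.set()  # end the demo after three checks

    await run_periodically(check_database_health, interval=0.01, stop=stop)
    return calls


health_check_calls = asyncio.run(demo())
```

Waiting on the stop event instead of a plain `asyncio.sleep` lets shutdown interrupt the wait immediately.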

Worker Process Startup

The signal handling and other stuff becomes internal to the dispatcher (the logic is moved here), but the following is highly AWX-specific.

https://github.com/ansible/awx/blob/764dcbf94b51f9b55b467824208e4c8b9dc15786/awx/main/dispatch/worker/base.py#L295

        set_connection_name('worker')  # set application_name to distinguish from other dispatcher processes

This would require a callback called on the worker process startup.

Worker Process Shutdown

In a completely different location we have this on worker process shutdown.

https://github.com/ansible/awx/blob/764dcbf94b51f9b55b467824208e4c8b9dc15786/awx/main/dispatch/worker/task.py#L127-L133

            # It's frustrating that we have to do this, but the python k8s
            # client leaves behind cacert files in /tmp, so we must clean up
            # the tmpdir per-dispatcher process every time a new task comes in
            try:
                kube_config._cleanup_temp_files()
            except Exception:
                logger.exception('failed to cleanup k8s client tmp files')

This would be a fairly straightforward callback, just like the worker startup callback.
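
The worker startup and shutdown callbacks could share one shape. The sketch below runs a simplified worker loop in-process; `worker_loop`, `on_worker_start`, and `on_worker_stop` are hypothetical names, and the AWX calls are only mentioned in comments rather than invoked:

```python
import queue
from typing import Callable, Optional

log = []  # records what happened, for illustration only


def worker_loop(
    work_queue: "queue.Queue",
    on_worker_start: Optional[Callable[[], None]] = None,
    on_worker_stop: Optional[Callable[[], None]] = None,
) -> None:
    if on_worker_start:
        on_worker_start()
    try:
        while True:
            message = work_queue.get()
            if message == "stop":
                break
            log.append(f"ran {message}")
    finally:
        # Runs on normal exit and on unexpected errors alike.
        if on_worker_stop:
            on_worker_stop()


def awx_worker_start() -> None:
    # AWX would call set_connection_name('worker') here.
    log.append("start")


def awx_worker_stop() -> None:
    # AWX would call kube_config._cleanup_temp_files() here.
    log.append("stop")


q = queue.Queue()
q.put("task-1")
q.put("stop")
worker_loop(q, awx_worker_start, awx_worker_stop)
```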

Worker exception handling

Worker exception logic does:

                if getattr(exc, 'is_awx_task_error', False):
                    # Error caused by user / tracked in job output
                    logger.warning("{}".format(exc))
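
This corner case could be covered by an optional exception callback that reports whether it handled the error. A sketch under that assumption follows; `run_task`, `on_exception`, and `AwxTaskError` are illustrative names, not existing dispatcher or AWX API:

```python
import logging

logger = logging.getLogger("dispatcher.worker")


class AwxTaskError(Exception):
    # Marker attribute mirroring the check in the AWX snippet above.
    is_awx_task_error = True


def awx_exception_handler(exc: Exception) -> bool:
    """Return True if the exception was handled by the application."""
    if getattr(exc, "is_awx_task_error", False):
        # Error caused by user / tracked in job output: no traceback needed.
        logger.warning("%s", exc)
        return True
    return False


def run_task(func, on_exception=None):
    try:
        return func()
    except Exception as exc:
        # Fall back to a full traceback unless the callback claims the error.
        if on_exception is None or not on_exception(exc):
            logger.exception("Worker failed to run task")
        return None


def failing_task():
    raise AwxTaskError("bad inventory")


result = run_task(failing_task, on_exception=awx_exception_handler)
```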

Worker callbacks, errbacks

This is an exception to the rest of the list: these are task-level callbacks.

And

            for callback in body.get('errbacks', []) or []:
                callback['uuid'] = body['uuid']
                self.perform_work(callback)

And

        for callback in body.get('callbacks', []) or []:
            callback['uuid'] = body['uuid']
            self.perform_work(callback)

These are submitted as a part of the .apply_async call, and the confusing thing here is that these are implemented in the worker. My belief is that this is a hold-over from Celery that never got fully implemented as one would expect. So I would suggest the options are:

  • Support callbacks & errbacks as Celery did, which means they are handled in the main process
  • Hack the specific case for job running, which is fairly trivial
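
For the first option, the main process would turn a finished task into follow-up messages instead of the worker running them inline. A minimal sketch, assuming the main process learns whether the task succeeded (`follow_up_messages` is a hypothetical helper; the message keys match the AWX snippets above):

```python
def follow_up_messages(body: dict, succeeded: bool) -> list:
    """Build the callback or errback messages to dispatch after a task ends."""
    key = "callbacks" if succeeded else "errbacks"
    messages = []
    for callback in body.get(key, []) or []:
        message = dict(callback)  # copy so the original body is not mutated
        message["uuid"] = body["uuid"]
        messages.append(message)
    return messages


body = {
    "uuid": "abc",
    "task": "run_job",
    "callbacks": [{"task": "handle_success"}],
    "errbacks": None,  # a None value is tolerated, as in the original code
}
on_success = follow_up_messages(body, succeeded=True)
on_failure = follow_up_messages(body, succeeded=False)
```

The returned messages would then go through the normal dispatch path, so a callback can land on any free worker.
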
@AlanCoding

Another one:

A callback that runs before publishing, to add additional data to the JSON message that is sent. AWX would use this to include the guid for logging.
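
A sketch of such a pre-publish hook, assuming hypothetical names (`build_message`, `pre_publish_hooks`, `add_guid`) and a placeholder guid value; the real guid would come from the AWX request context:

```python
import json
from typing import Callable, Iterable


def build_message(
    task: str,
    args: list,
    pre_publish_hooks: Iterable[Callable[[dict], None]] = (),
) -> str:
    body = {"task": task, "args": list(args)}
    # Each hook may add keys to the body before it is serialized.
    for hook in pre_publish_hooks:
        hook(body)
    return json.dumps(body)


def add_guid(body: dict) -> None:
    # Placeholder value; AWX would look up the current request guid here.
    body["guid"] = "hypothetical-request-guid"


payload = build_message("run_job", [42], pre_publish_hooks=[add_guid])
```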

AlanCoding added the awx (needed for AWX) and MVP (minimum set of issues to fix) labels on Jan 8, 2025
AlanCoding added the enhancement (New feature or request) label on Jan 16, 2025