Send important executor logs to task logs #40468
base: main
Conversation
@@ -57,7 +88,7 @@ def __init__(self, component_name: str, call_site_logger: Logger | None = None):
     def _should_enable(self) -> bool:
         if not conf.getboolean("logging", "enable_task_context_logger"):
             return False
-        if not getattr(self.task_handler, "supports_task_context_logging", False):
+        if not self.task_handler:
This is just a simplification
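For reference, the whole method after this change would read roughly as follows (a sketch: only the two conditions visible in the hunk come from the PR; the final return is an assumption):

    def _should_enable(self) -> bool:
        # Global feature flag for task context logging.
        if not conf.getboolean("logging", "enable_task_context_logger"):
            return False
        # Simplified check: any configured task handler now qualifies, instead
        # of requiring an explicit supports_task_context_logging attribute.
        if not self.task_handler:
            return False
        return True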
Left a few comments. Also what about the Batch executor? Do you plan on fast following with that after this PR?
@@ -130,6 +131,10 @@ def __init__(self, parallelism: int = PARALLELISM):
         self.running: set[TaskInstanceKey] = set()
         self.event_buffer: dict[TaskInstanceKey, EventBufferValueType] = {}
         self.attempts: dict[TaskInstanceKey, RunningRetryAttemptType] = defaultdict(RunningRetryAttemptType)
+        self.task_context_logger: TaskContextLogger = TaskContextLogger(
+            component_name="Executor",
Should each executor create one of these loggers? In a world of hybrid executors you may have multiple, and this name will be shared amongst them?
I hesitated. I don't see a concern with having one logger shared among multiple executors. If you prefer having multiple ones, I can also do that. I don't feel very opinionated on this one.
I'm fine either way, I'm not really sure what all the implications of sharing that logger are. So if you've tested it and it's working well then no worries!
Yep, my plan is to first do it with one executor, receive feedback, address it, etc. Then, when the direction is set, I'll create another PR for the Batch executor.
Had a look and I don't have anything extra to add; I like the direction this is going. I'd need to see some examples to have any real opinion on the question of one logger or one per executor, but otherwise I think it looks good after that one change to the session creation that Niko mentioned.
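If per-executor naming ever becomes the preference, one hypothetical option (not part of this PR) would be to derive the component name from the executor class:

    # Hypothetical alternative: one logger per executor subclass, so hybrid
    # executor setups can tell apart which executor wrote a given log line.
    self.task_context_logger: TaskContextLogger = TaskContextLogger(
        component_name=self.__class__.__name__,
    )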
-            self.log.error(
-                "could not queue task %s (still running after %d attempts)", key, attempt.total_tries
+            self.task_context_logger.error(
+                "could not queue task %s (still running after %d attempts)",
I think it's confusing for a user to see that a task couldn't be queued because it's currently running. Is there any way to make this log message more useful/intuitive?
Do you have any suggestions?
No, but I've run into this when troubleshooting and it always confuses me 😅
You're the perfect candidate to create a meaningful error message then :)
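For what it's worth, one purely illustrative rewording that spells out why queueing was abandoned (assuming a TaskInstance ti is available at the call site, since TaskContextLogger methods route on a ti keyword):

    # Illustrative only: make explicit that the executor gave up re-queueing
    # because an earlier attempt for the same key never reported completion.
    self.task_context_logger.error(
        "gave up queueing task %s: a previous attempt is still marked as "
        "running after %d re-queue tries, so it may be stuck or its "
        "completion event may have been lost",
        key,
        attempt.total_tries,
        ti=ti,
    )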
@@ -386,14 +385,16 @@ def attempt_task_runs(self):
                 )
                 self.pending_tasks.append(ecs_task)
             else:
-                self.log.error(
@syedahsn You should have a review of this if you get a chance. The failure reason handling has been modified here.
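The replacement presumably mirrors the earlier change, i.e. routing the failure reason through the task context logger. A sketch only: the message text and the task_key, attempt_number, reasons, and ti names are assumptions, since the hunk shows just the removed self.log.error line:

    # Sketch: the same give-up message as before, but also routed into the
    # task's own log so the failure reason is visible in the UI even though
    # the task never started.
    self.task_context_logger.error(
        "ECS task %s could not be started after %s attempts: %s",
        task_key,
        attempt_number,
        reasons,
        ti=ti,
    )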
If the executor fails to start a task, the user will not see any logs in the UI because the task never started. This PR leverages the TaskContextLogger implemented in #32646: when an executor fails to execute a task, the important error messages are forwarded to the task logs.

cc @o-nikolas
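A minimal usage sketch of the mechanism (the constructor signature matches the one visible in the first hunk above; the ti keyword, which routes the record into that task instance's log, follows the #32646 API):

    from airflow.utils.log.task_context_logger import TaskContextLogger

    task_context_logger = TaskContextLogger(component_name="Executor")

    # Given a TaskInstance `ti` that the executor failed to launch, this call
    # lands both in the executor's log and in ti's own task log, so the error
    # becomes visible in the UI.
    task_context_logger.error("could not start task %s", ti.key, ti=ti)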
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes, please leave a note in a newsfragment file named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.