Skip to content

Commit fa2b0ea

Browse files
authored
Check for task cancelation before fetching exception (#2905)
1 parent e850bdf commit fa2b0ea

1 file changed

Lines changed: 10 additions & 2 deletions

File tree

connectors/es/sink.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,15 +1052,23 @@ async def async_bulk(
10521052
self._sink_task.add_done_callback(functools.partial(self.sink_task_callback))
10531053

10541054
def sink_task_callback(self, task):
1055-
if task.exception():
1055+
if task.cancelled():
1056+
self._logger.warning(
1057+
f"{type(self._sink).__name__}: {task.get_name()} was cancelled before completion"
1058+
)
1059+
elif task.exception():
10561060
self._logger.error(
10571061
f"Encountered an error in the sync's {type(self._sink).__name__}: {task.get_name()}",
10581062
exc_info=task.exception(),
10591063
)
10601064
self.error = task.exception()
10611065

10621066
def extractor_task_callback(self, task):
1063-
if task.exception():
1067+
if task.cancelled():
1068+
self._logger.warning(
1069+
f"{type(self._extractor).__name__}: {task.get_name()} was cancelled before completion"
1070+
)
1071+
elif task.exception():
10641072
self._logger.error(
10651073
f"Encountered an error in the sync's {type(self._extractor).__name__}: {task.get_name()}",
10661074
exc_info=task.exception(),

0 commit comments

Comments
 (0)