Skip to content

Commit

Permalink
Merge pull request #983 from roboflow/fix/prevent-pipeline-manager-from-entering-infinite-loop-if-workflow-could-not-be-parsed
Browse files Browse the repository at this point in the history

Prevent pipeline manager from entering infinite loop if workflow could not be parsed
  • Loading branch information
grzegorz-roboflow authored Jan 28, 2025
2 parents f36c249 + 450127c commit 2bd69ad
Showing 1 changed file with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
init_rtc_peer_connection,
)
from inference.core.utils.async_utils import Queue as SyncAsyncQueue
from inference.core.workflows.errors import WorkflowSyntaxError
from inference.core.workflows.execution_engine.entities.base import WorkflowImageData


Expand Down Expand Up @@ -167,8 +168,8 @@ def _handle_command(self, request_id: str, payload: dict) -> None:

def _initialise_pipeline(self, request_id: str, payload: dict) -> None:
try:
self._watchdog = BasePipelineWatchDog()
parsed_payload = InitialisePipelinePayload.model_validate(payload)
watchdog = BasePipelineWatchDog()
buffer_sink = InMemoryBufferSink.init(
queue_size=parsed_payload.sink_configuration.results_buffer_size,
)
Expand All @@ -183,7 +184,7 @@ def _initialise_pipeline(self, request_id: str, payload: dict) -> None:
workflows_parameters=parsed_payload.processing_configuration.workflows_parameters,
on_prediction=self._buffer_sink.on_prediction,
max_fps=parsed_payload.video_configuration.max_fps,
watchdog=watchdog,
watchdog=self._watchdog,
source_buffer_filling_strategy=parsed_payload.video_configuration.source_buffer_filling_strategy,
source_buffer_consumption_strategy=parsed_payload.video_configuration.source_buffer_consumption_strategy,
video_source_properties=parsed_payload.video_configuration.video_source_properties,
Expand All @@ -192,7 +193,6 @@ def _initialise_pipeline(self, request_id: str, payload: dict) -> None:
video_metadata_input_name=parsed_payload.processing_configuration.video_metadata_input_name,
batch_collection_timeout=parsed_payload.video_configuration.batch_collection_timeout,
)
self._watchdog = watchdog
self._consumption_timeout = parsed_payload.consumption_timeout
self._last_consume_time = time.monotonic()
self._inference_pipeline.start(use_main_thread=False)
Expand Down Expand Up @@ -228,11 +228,18 @@ def _initialise_pipeline(self, request_id: str, payload: dict) -> None:
"wrong API key used.",
error_type=ErrorType.NOT_FOUND,
)
except WorkflowSyntaxError as error:
self._handle_error(
request_id=request_id,
error=error,
public_error_message="Provided workflow configuration is not valid.",
error_type=ErrorType.INVALID_PAYLOAD,
)

def _start_webrtc(self, request_id: str, payload: dict):
try:
self._watchdog = BasePipelineWatchDog()
parsed_payload = InitialiseWebRTCPipelinePayload.model_validate(payload)
watchdog = BasePipelineWatchDog()

def start_loop(loop: asyncio.AbstractEventLoop):
asyncio.set_event_loop(loop)
Expand Down Expand Up @@ -330,7 +337,7 @@ def webrtc_sink(
workflows_parameters=parsed_payload.processing_configuration.workflows_parameters,
on_prediction=chained_sink,
max_fps=parsed_payload.video_configuration.max_fps,
watchdog=watchdog,
watchdog=self._watchdog,
source_buffer_filling_strategy=parsed_payload.video_configuration.source_buffer_filling_strategy,
source_buffer_consumption_strategy=parsed_payload.video_configuration.source_buffer_consumption_strategy,
video_source_properties=parsed_payload.video_configuration.video_source_properties,
Expand All @@ -339,7 +346,6 @@ def webrtc_sink(
video_metadata_input_name=parsed_payload.processing_configuration.video_metadata_input_name,
batch_collection_timeout=parsed_payload.video_configuration.batch_collection_timeout,
)
self._watchdog = watchdog
self._inference_pipeline.start(use_main_thread=False)
self._responses_queue.put(
(
Expand Down Expand Up @@ -380,6 +386,13 @@ def webrtc_sink(
"wrong API key used.",
error_type=ErrorType.NOT_FOUND,
)
except WorkflowSyntaxError as error:
self._handle_error(
request_id=request_id,
error=error,
public_error_message="Provided workflow configuration is not valid.",
error_type=ErrorType.INVALID_PAYLOAD,
)

def _terminate_pipeline(self, request_id: str) -> None:
if self._inference_pipeline is None:
Expand Down

0 comments on commit 2bd69ad

Please sign in to comment.