Commit 5ebf63a

Merge pull request #942 from roboflow/feature/adjust-batch-processing-commands

Add changes required to effectively index content of batch processing
PawelPeczek-Roboflow authored Jan 14, 2025
2 parents ad79281 + 8a99e2b commit 5ebf63a
Showing 8 changed files with 329 additions and 99 deletions.
2 changes: 1 addition & 1 deletion inference/core/version.py
@@ -1,4 +1,4 @@
__version__ = "0.33.0"
__version__ = "0.34.0rc1"


if __name__ == "__main__":
91 changes: 83 additions & 8 deletions inference_cli/lib/workflows/common.py
@@ -1,6 +1,8 @@
import json
import os.path
import re
+from collections import defaultdict
+from copy import copy
from datetime import datetime
from functools import lru_cache
from threading import Lock
@@ -14,7 +16,13 @@
from rich.progress import track

from inference_cli.lib.utils import dump_json, dump_jsonl, read_json
-from inference_cli.lib.workflows.entities import OutputFileType
+from inference_cli.lib.workflows.entities import (
+    ImagePath,
+    ImageResultsIndexEntry,
+    OutputFileType,
+    WorkflowExecutionMetadataResultPath,
+    WorkflowOutputField,
+)

BASE64_DATA_TYPE_PATTERN = re.compile(r"^data:image\/[a-z]+;base64,")

@@ -74,10 +82,10 @@ def get_progress_log_path(output_directory: str) -> str:

def dump_image_processing_results(
result: Dict[str, Any],
-    image_path: str,
+    image_path: ImagePath,
output_directory: str,
save_image_outputs: bool,
-) -> None:
+) -> ImageResultsIndexEntry:
images_in_result = []
if save_image_outputs:
images_in_result = extract_images_from_result(result=result)
@@ -92,21 +100,33 @@ def dump_image_processing_results(
path=structured_results_path,
content=structured_content,
)
-    dump_images_outputs(
+    image_outputs = dump_images_outputs(
        image_results_dir=image_results_dir,
        images_in_result=images_in_result,
    )
+    return ImageResultsIndexEntry(
+        metadata_output_path=structured_results_path,
+        image_outputs=image_outputs,
+    )


def dump_images_outputs(
image_results_dir: str,
images_in_result: List[Tuple[str, np.ndarray]],
-) -> None:
+) -> Dict[WorkflowOutputField, List[ImagePath]]:
+    result = defaultdict(list)
    for image_key, image in images_in_result:
        target_path = os.path.join(image_results_dir, f"{image_key}.jpg")
        target_path_dir = os.path.dirname(target_path)
        os.makedirs(target_path_dir, exist_ok=True)
        cv2.imwrite(target_path, image)
+        workflow_field = _extract_workflow_field_from_image_key(image_key=image_key)
+        result[workflow_field].append(target_path)
+    return result


+def _extract_workflow_field_from_image_key(image_key: str) -> WorkflowOutputField:
+    return image_key.split("/")[0]


def construct_image_output_dir_path(image_path: str, output_directory: str) -> str:
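Note: the new helper implies a naming convention for image keys — nested image outputs are saved under "<output_field>/<sub-path>.jpg", so the workflow output field is recovered as everything before the first slash. A quick illustration (the key values here are hypothetical):

```python
# Keys for nested image outputs look like "<output_field>/<index>";
# a key without a slash is itself the field name.
assert _extract_workflow_field_from_image_key(image_key="visualization/0") == "visualization"
assert _extract_workflow_field_from_image_key(image_key="crops") == "crops"
```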
@@ -203,7 +223,7 @@ def _is_file_system_case_sensitive() -> bool:

def report_failed_files(
failed_files: List[Tuple[str, str]], output_directory: str
-) -> None:
+) -> Optional[str]:
if not failed_files:
return None
os.makedirs(output_directory, exist_ok=True)
@@ -216,12 +236,13 @@
print(
f"Detected {len(failed_files)} processing failures. Details saved under: {failed_files_path}"
)
+    return failed_files_path


def aggregate_batch_processing_results(
output_directory: str,
aggregation_format: OutputFileType,
-) -> None:
+) -> str:
file_descriptor, all_processed_files = open_progress_log(
output_directory=output_directory
)
@@ -247,7 +268,7 @@
decoded_content, description="Dumping aggregated results to JSONL..."
),
)
-        return None
+        return aggregated_results_path
dumped_results = []
for decoded_result in track(
decoded_content, description="Dumping aggregated results to CSV..."
@@ -258,6 +279,7 @@
data_frame = pd.DataFrame(dumped_results)
aggregated_results_path = os.path.join(output_directory, "aggregated_results.csv")
data_frame.to_csv(aggregated_results_path, index=False)
+    return aggregated_results_path


def dump_objects_to_json(value: Any) -> Any:
@@ -266,3 +288,56 @@
if isinstance(value, list) or isinstance(value, dict) or isinstance(value, set):
return json.dumps(value)
return value


+class WorkflowsImagesProcessingIndex:
+
+    @classmethod
+    def init(cls) -> "WorkflowsImagesProcessingIndex":
+        return cls(index_content={}, registered_output_images=set())
+
+    def __init__(
+        self,
+        index_content: Dict[ImagePath, ImageResultsIndexEntry],
+        registered_output_images: Set[WorkflowOutputField],
+    ):
+        self._index_content = index_content
+        self._registered_output_images = registered_output_images
+
+    @property
+    def registered_output_images(self) -> Set[WorkflowOutputField]:
+        return copy(self._registered_output_images)
+
+    def collect_entry(
+        self, image_path: ImagePath, entry: ImageResultsIndexEntry
+    ) -> None:
+        self._index_content[image_path] = entry
+        for image_output_name in entry.image_outputs.keys():
+            self._registered_output_images.add(image_output_name)
+
+    def export_metadata(
+        self,
+    ) -> List[Tuple[ImagePath, WorkflowExecutionMetadataResultPath]]:
+        return [
+            (image_path, index_entry.metadata_output_path)
+            for image_path, index_entry in self._index_content.items()
+        ]
+
+    def export_images(
+        self,
+    ) -> Dict[WorkflowOutputField, List[Tuple[ImagePath, List[ImagePath]]]]:
+        result = {}
+        for field_name in self._registered_output_images:
+            result[field_name] = self.export_images_for_field(field_name=field_name)
+        return result
+
+    def export_images_for_field(
+        self, field_name: WorkflowOutputField
+    ) -> List[Tuple[ImagePath, List[ImagePath]]]:
+        results = []
+        for image_path, index_entry in self._index_content.items():
+            if field_name not in index_entry.image_outputs:
+                continue
+            registered_images = index_entry.image_outputs[field_name]
+            results.append((image_path, registered_images))
+        return results
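
Taken together, the new pieces form a simple indexing flow: each call to dump_image_processing_results now returns an ImageResultsIndexEntry, which the caller collects into a WorkflowsImagesProcessingIndex and exports once the whole directory is processed. A minimal usage sketch (the input paths and the run_workflow stand-in are hypothetical; only the class and functions above come from this diff):

```python
index = WorkflowsImagesProcessingIndex.init()
for image_path in ["./images/a.jpg", "./images/b.jpg"]:  # hypothetical inputs
    result = run_workflow(image_path)  # hypothetical stand-in for workflow execution
    entry = dump_image_processing_results(
        result=result,
        image_path=image_path,
        output_directory="./output",
        save_image_outputs=True,
    )
    index.collect_entry(image_path=image_path, entry=entry)

metadata_paths = index.export_metadata()  # [(image path, metadata JSON path), ...]
images_paths = index.export_images()      # {output field: [(image path, [saved images])]}
```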
4 changes: 2 additions & 2 deletions inference_cli/lib/workflows/core.py
@@ -122,7 +122,7 @@ def process_images_directory_with_workflow(
process_image_directory_with_workflow_using_inference_package,
)

-        process_image_directory_with_workflow_using_inference_package(
+        _ = process_image_directory_with_workflow_using_inference_package(
input_directory=input_directory,
output_directory=output_directory,
workflow_specification=workflow_specification,
@@ -138,7 +138,7 @@ def process_images_directory_with_workflow(
debug_mode=debug_mode,
)
return None
-    process_image_directory_with_workflow_using_api(
+    _ = process_image_directory_with_workflow_using_api(
input_directory=input_directory,
output_directory=output_directory,
workflow_specification=workflow_specification,
25 changes: 25 additions & 0 deletions inference_cli/lib/workflows/entities.py
@@ -1,4 +1,10 @@
+from dataclasses import dataclass, field
from enum import Enum
+from typing import Dict, List, Optional, Tuple

+ImagePath = str
+WorkflowExecutionMetadataResultPath = str
+WorkflowOutputField = str


class OutputFileType(str, Enum):
@@ -9,3 +15,22 @@ class OutputFileType(str, Enum):
class ProcessingTarget(str, Enum):
API = "api"
INFERENCE_PACKAGE = "inference_package"


+@dataclass(frozen=True)
+class ImageResultsIndexEntry:
+    metadata_output_path: WorkflowExecutionMetadataResultPath
+    image_outputs: Dict[WorkflowOutputField, List[ImagePath]]
+
+
+@dataclass(frozen=True)
+class ImagesDirectoryProcessingDetails:
+    output_directory: str
+    processed_images: int
+    failures: int
+    result_metadata_paths: List[Tuple[ImagePath, WorkflowExecutionMetadataResultPath]]
+    result_images_paths: Dict[
+        WorkflowOutputField, List[Tuple[ImagePath, List[ImagePath]]]
+    ]
+    aggregated_results_path: Optional[str] = field(default=None)
+    failures_report_path: Optional[str] = field(default=None)
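
For reference, a minimal sketch of how the new entities compose into the processing summary (all concrete values below are hypothetical):

```python
entry = ImageResultsIndexEntry(
    metadata_output_path="./output/a_jpg/results.json",  # hypothetical metadata path
    image_outputs={"visualization": ["./output/a_jpg/visualization/0.jpg"]},
)
details = ImagesDirectoryProcessingDetails(
    output_directory="./output",
    processed_images=1,
    failures=0,
    result_metadata_paths=[("./images/a.jpg", entry.metadata_output_path)],
    result_images_paths={
        "visualization": [("./images/a.jpg", entry.image_outputs["visualization"])]
    },
    aggregated_results_path="./output/aggregated_results.csv",  # as produced by aggregate_batch_processing_results
    failures_report_path=None,  # set when report_failed_files(...) writes a report
)
```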