@@ -12,6 +12,8 @@ services:
- DEFAULT_BUCKET_NAME=video-summary
- DB_COLLECTION=video-rag-dev
- DETECTION_MODEL_DIR=${YOLOX_MODELS_MOUNT_PATH:-/app/models/yolox}
- EMBEDDING_BATCH_SIZE=${EMBEDDING_BATCH_SIZE:-32}
- MAX_PARALLEL_WORKERS=${MAX_PARALLEL_WORKERS:-}
volumes:
- "..:/app"
- "${YOLOX_MODELS_VOLUME_NAME:-vdms-yolox-models}:${YOLOX_MODELS_MOUNT_PATH:-/app/models/yolox}"
@@ -67,6 +67,8 @@ services:
- SDK_USE_OPENVINO=${SDK_USE_OPENVINO:-true}
- VDMS_DATAPREP_DEVICE=${VDMS_DATAPREP_DEVICE:-CPU}
- OV_MODELS_DIR=${OV_MODELS_DIR:-/app/ov_models}
- EMBEDDING_BATCH_SIZE=${EMBEDDING_BATCH_SIZE:-32}
- MAX_PARALLEL_WORKERS=${MAX_PARALLEL_WORKERS:-}

# Frame processing settings
- FRAME_INTERVAL=${FRAME_INTERVAL:-15}
@@ -67,6 +67,8 @@ services:
- SDK_USE_OPENVINO=${SDK_USE_OPENVINO:-true}
- VDMS_DATAPREP_DEVICE=${VDMS_DATAPREP_DEVICE:-CPU}
- OV_MODELS_DIR=${OV_MODELS_DIR:-/app/ov_models}
- EMBEDDING_BATCH_SIZE=${EMBEDDING_BATCH_SIZE:-32}
- MAX_PARALLEL_WORKERS=${MAX_PARALLEL_WORKERS:-}

# Frame processing settings
- FRAME_INTERVAL=${FRAME_INTERVAL:-15}
@@ -187,6 +187,16 @@ curl -L "http://localhost:6007/v1/dataprep/videos/download?video_id=traffic_cam_
curl -X DELETE "http://localhost:6007/v1/dataprep/videos?video_id=traffic_cam_2024_10_21"
```

### Review Processing Telemetry

The telemetry endpoint captures per-request wall-clock timings, stage durations, throughput, and batch-level stats. Query the most recent entries directly from the DataPrep service (or via the pipeline-manager proxy) with:

```bash
curl --location 'http://localhost:6016/telemetry?limit=5'
```

See the [Telemetry Metrics](telemetry-metrics.md) reference for a complete breakdown of every field and how each value is calculated.

## Validate Services

1. Call `GET /v1/dataprep/health` – expect `status: ok`, the active embedding mode, and the OpenVINO flag when SDK mode is selected.
@@ -0,0 +1,158 @@
# Telemetry Metrics

This note explains what the `/telemetry` endpoint returns, how each metric is computed, and how to interpret the numbers when tuning the VDMS DataPrep microservice.

## Endpoint recap

- **Path:** `GET /telemetry`
- **Query parameters:**
- `limit` (default `10`, max `100`) – number of most recent records to return (capped by the server-side retention window).
- `source` – optional filter that matches the request path that produced the entry (for example `/videos/upload`).
- **Response shape:** `{count, items}`, where `items` holds the most recent `TelemetryRecord` objects.

Sample response:

```json
{
"count": 1,
"items": [
{
"request_id": "a2e00af4-3d62-4d3b-b9e2-5a08743b21b7",
"source": "/videos/minio",
"processing_mode": "sdk",
"timestamps": {
"requested_at": "2025-12-15T05:18:19.320075Z",
"completed_at": "2025-12-15T05:18:53.187766Z",
"wall_time_seconds": 33.42496871948242
},
"video": {
"bucket_name": "video-summary",
"video_id": "a0ee04eb-5dc2-450b-a4b7-16a230a1c282",
"filename": "sample.mp4",
"frame_interval": 20,
"fps": 30.0,
"total_frames": 17973,
"video_duration_seconds": 599.1,
"tags": [],
"video_url": "http://vdms-dataprep:8000/v1/dataprep/videos/download?video_id=a0ee04eb-5dc2-450b-a4b7-16a230a1c282&bucket_name=video-summary",
"video_rel_url": "/v1/dataprep/videos/download?video_id=a0ee04eb-5dc2-450b-a4b7-16a230a1c282&bucket_name=video-summary",
"processing_mode": "sdk"
},
"config": {
"embedding_mode": "sdk",
"object_detection_enabled": true,
"detection_confidence": 0.85,
"sdk_parallel_workers": 60,
"sdk_batch_size": 32
},
"counts": {
"frames_extracted": 899,
"items_after_detection": 2370,
"embeddings_stored": 2370
},
"stages": [
{
"name": "extraction",
"seconds": 14.225327968597412,
"percent_of_total": 42.55898663057204
},
{
"name": "detection",
"seconds": 388.91471695899963,
"percent_of_total": 48.282008923343355
},
{
"name": "embedding",
"seconds": 59.44792938232422,
"percent_of_total": 7.380192447729496
},
{
"name": "storage",
"seconds": 14.32844614982605,
"percent_of_total": 1.7788119983550996
}
],
"throughput": {
"embeddings_per_second": 960.7483555108118,
"wall_time_embeddings_per_second": 70.9559383300526,
"embedding_stage_embeddings_per_second": 39.86682168117158,
"frames_per_second": 63.19713696475425
},
"batches": [
{
"batch_index": 29,
"input_frames": 3,
"items_after_detection": 7,
"detection_seconds": 0.8262088298797607,
"embedding_seconds": 0.25011181831359863,
"storage_seconds": 0.06559395790100098,
"total_seconds": 1.1423156261444092,
"embeddings_stored": 7
},
<other batch details omitted for brevity>
]
}
]
}
```

Each `TelemetryRecord` is stored in JSONL under `data/telemetry/telemetry.jsonl` (or the configured path) and is served verbatim after lightweight normalization so that older float timestamps are converted to UTC ISO-8601 strings.
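As a sketch of that read path (file layout and field names are assumed from the sample above, and the helper names are illustrative, not the service's actual code), the JSONL store can be read and legacy float timestamps normalized like this:

```python
import json
from datetime import datetime, timezone


def normalize_record(record: dict) -> dict:
    """Convert legacy float (epoch) timestamps to UTC ISO-8601 strings in place."""
    ts = record.get("timestamps", {})
    for key in ("requested_at", "completed_at"):
        value = ts.get(key)
        if isinstance(value, (int, float)):
            # Format as YYYY-MM-DDTHH:MM:SS.sssZ (millisecond precision).
            ts[key] = (
                datetime.fromtimestamp(value, tz=timezone.utc)
                .strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
            )
    return record


def load_telemetry(path: str, limit: int = 10) -> list[dict]:
    """Read the newest `limit` records from a JSONL telemetry file."""
    with open(path) as fh:
        records = [json.loads(line) for line in fh if line.strip()]
    return [normalize_record(r) for r in records[-limit:]]
```

The normalization is idempotent: records that already carry ISO-8601 strings pass through untouched.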

## Metric derivations

### Timestamps

| Field | Description | Calculation |
| --- | --- | --- |
| `requested_at` | When the pipeline accepted the request. | Captured at the start of processing and emitted as a UTC string (`YYYY-MM-DDTHH:MM:SS.sssZ`). |
| `completed_at` | When the final artifact (embeddings + manifests) was written. | Same formatting as `requested_at`, recorded after storage finishes. |
| `wall_time_seconds` | End-to-end time the request spent in the pipeline. | Difference between the completion and request timestamps (falls back to `0` if either timestamp is missing). |

### Video metadata

This block mirrors the request that was processed:

- `bucket_name`, `video_id`, `filename`, and `frame_interval` are copied from the active job. Numerical fields (`fps`, `total_frames`, `video_duration_seconds`) come straight from the frame extractor.
- `video_url` and `video_rel_url` point to the download endpoint for the processed video or stitched preview.
- `processing_mode` echoes the embedding execution path (`sdk` or `api`).

### Processing config

Fields such as `embedding_mode`, `object_detection_enabled`, `detection_confidence`, `sdk_parallel_workers`, and `sdk_batch_size` are captured from the resolved runtime configuration. They reflect the **effective** configuration (after environment variables, CLI args, and defaults are merged) so operators can correlate telemetry with tuning changes.

### Aggregate counts

| Field | Description |
| --- | --- |
| `frames_extracted` | Number of keyframes pulled from the source video before detection. |
| `items_after_detection` | Crops + frames that survived object detection filters. |
| `embeddings_stored` | Items that were successfully embedded and written to VDMS. This value should match the `embeddings` counter in the service logs for the same request. |

### Stage timings

Stage timing objects follow the schema `{name, seconds, percent_of_total}` and are produced by `_build_stage_timings`:

1. `seconds` equals the summed time spent in the stage per the pipeline stats.
2. Percentages always add up to `100` even when stages overlap:
   - Extraction runs before anything else, so its percentage is `100 × frame_extraction_seconds / wall_time_seconds`.
- Detection, embedding, and storage often overlap when the parallel pipeline is enabled. Their raw seconds are normalized against the **parallel budget**, computed as `(wall_time_seconds - extraction_seconds)`. Each stage receives a share of that budget proportional to its measured seconds. This highlights relative pressure inside the concurrently running stages without double-counting wall time.
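The normalization above can be sketched as follows (function and argument names are illustrative, not the service's actual `_build_stage_timings` signature); feeding in the seconds from the sample record reproduces its `percent_of_total` values:

```python
def build_stage_timings(wall: float, extraction: float,
                        detection: float, embedding: float, storage: float) -> list[dict]:
    """Score extraction against wall time; split the remaining "parallel
    budget" across the overlapping stages in proportion to their raw seconds."""
    extraction_pct = 100.0 * extraction / wall if wall else 0.0
    parallel_budget_pct = 100.0 - extraction_pct
    raw = {"detection": detection, "embedding": embedding, "storage": storage}
    raw_total = sum(raw.values())
    stages = [{"name": "extraction", "seconds": extraction,
               "percent_of_total": extraction_pct}]
    for name, seconds in raw.items():
        share = parallel_budget_pct * seconds / raw_total if raw_total else 0.0
        stages.append({"name": name, "seconds": seconds, "percent_of_total": share})
    return stages
```

By construction the four percentages sum to 100 even though the raw detection, embedding, and storage seconds can far exceed the wall time when many workers run in parallel.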

### Throughput metrics

| Field | Description | Formula |
| --- | --- | --- |
| `embeddings_per_second` | Effective throughput for the entire request. Accounts for overlapping stages. | `embeddings_stored / effective_embedding_seconds`, where `effective_embedding_seconds = wall_time_seconds * (embedding_stage_percent / 100)`; when the embedding stage percent is `0`, the divisor falls back to `wall_time_seconds`. |
| `embedding_stage_embeddings_per_second` | Raw throughput during the embedding stage only. Useful for spotting model-level slowdowns. | `embeddings_stored / embedding_seconds_total`. |
| `wall_time_embeddings_per_second` | Wall-clock throughput that ignores stage overlap. | `embeddings_stored / wall_time_seconds`. |
| `frames_per_second` | Frame extraction throughput. | `frames_extracted / frame_extraction_seconds` (or `/ wall_time_seconds` if extraction time is unknown). |
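The formulas in the table can be sketched as a single helper (names are illustrative); plugging in the counts and timings from the sample record reproduces its throughput block:

```python
def throughput_metrics(embeddings_stored: int, frames_extracted: int,
                       wall: float, embedding_stage_pct: float,
                       embedding_seconds: float, extraction_seconds: float) -> dict:
    """Derive the four throughput metrics from counts and stage timings."""
    # Effective embedding window: the slice of wall time attributed to embedding.
    effective = wall * (embedding_stage_pct / 100.0) if embedding_stage_pct else wall
    return {
        "embeddings_per_second": embeddings_stored / effective if effective else 0.0,
        "embedding_stage_embeddings_per_second": (
            embeddings_stored / embedding_seconds if embedding_seconds else 0.0
        ),
        "wall_time_embeddings_per_second": (
            embeddings_stored / wall if wall else 0.0
        ),
        "frames_per_second": (
            frames_extracted / extraction_seconds if extraction_seconds
            else (frames_extracted / wall if wall else 0.0)
        ),
    }
```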

### Batch breakdown

When SDK mode runs with batching enabled, each batch reports:

- `batch_index` – sequential identifier (starting at `1`).
- `input_frames` and `items_after_detection` – how many frames/crops were submitted for that batch.
- `detection_seconds`, `embedding_seconds`, `storage_seconds`, `total_seconds` – stage timing for the batch, captured before threading overhead is applied.
- `embeddings_stored` – how many embeddings survived all downstream filters.

These entries make it easy to identify skewed batches (for example, ones with large detection times because of busy scenes).
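As an illustration, a small helper (hypothetical, not part of the service) can flag such skewed batches from a record's `batches` array:

```python
def skewed_batches(batches: list[dict], factor: float = 2.0) -> list[int]:
    """Return batch_index values whose detection time exceeds `factor` x the mean."""
    if not batches:
        return []
    mean_detection = sum(b["detection_seconds"] for b in batches) / len(batches)
    return [
        b["batch_index"]
        for b in batches
        if b["detection_seconds"] > factor * mean_detection
    ]
```

Running it over the telemetry output for a busy video quickly surfaces the batches worth inspecting frame by frame.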
@@ -35,6 +35,9 @@ export ENABLE_OBJECT_DETECTION=${ENABLE_OBJECT_DETECTION:-true}
export DETECTION_CONFIDENCE=${DETECTION_CONFIDENCE:-0.85}
export FRAMES_TEMP_DIR=${FRAMES_TEMP_DIR:-"/tmp/dataprep"}
export VDMS_DATAPREP_LOG_LEVEL=${VDMS_DATAPREP_LOG_LEVEL:-INFO}
# Optional hard cap for SDK parallel workers (auto when unset)
export MAX_PARALLEL_WORKERS=${MAX_PARALLEL_WORKERS:-""}
export EMBEDDING_BATCH_SIZE=${EMBEDDING_BATCH_SIZE:-32}

# Embedding microservice configuration -------------------------------
export EMBEDDING_SERVER_PORT=${EMBEDDING_SERVER_PORT:-9777}
@@ -180,11 +183,18 @@ elif [ "$1" = "--down" ] && [ "$#" -eq 1 ]; then

# Build dataprep image
elif [ "$1" = "--build" ] && ([ "$#" -eq 1 ] || [ "$#" -eq 2 ]); then
tag=${2:-${REGISTRY}vdms-dataprep:${TAG:-latest}}
docker build -t $tag -f docker/Dockerfile .
if [ $? = 0 ]; then
docker images | grep $tag
echo "Image ${tag} was successfully built."
default_image="${REGISTRY}vdms-dataprep:${TAG:-latest}"
if ./build.sh; then
docker images | grep "${default_image}"
echo "Image ${default_image} was successfully built."

if [ $# -eq 2 ]; then
custom_tag="$2"
docker tag "${default_image}" "${custom_tag}"
echo "Tagged image ${default_image} as ${custom_tag}."
fi
else
echo -e "${RED}ERROR: build.sh failed. Please check the build logs for details.${NC}"
fi

# Build dataprep dev image
@@ -255,7 +265,12 @@ elif [ "$1" = "--nd" ] && [ "$#" -eq 1 ]; then

# Spin-up prod version of all services in daemon mode
elif [ "$#" -eq 0 ]; then
docker compose -f docker/compose.yaml up -d --build
if ! ./build.sh; then
echo -e "${RED}ERROR: build.sh failed. Please inspect the build logs.${NC}"
exit 1
fi

docker compose -f docker/compose.yaml up -d --no-build
if [ $? = 0 ]; then
docker ps | grep "${PROJECT_NAME}"
echo "Prod environment is up!"
@@ -157,30 +157,101 @@ class VideoSummaryRequest(BaseModel):
bucket_name: Annotated[
str, Field(description="The Minio bucket name where the referenced video is stored")
]
video_id: Annotated[
str,
Field(
description="The video ID (directory in Minio bucket) containing the referenced video"
),
]
video_summary: Annotated[
str, Field(description="The summary text for the video to be embedded")
]
video_start_time: Annotated[
float,
Field(
ge=0,
description="The start timestamp in seconds for the video or video chunk",
),
]
video_end_time: Annotated[
float,
Field(description="The end timestamp in seconds for the video or video chunk"),
]
tags: Annotated[
Optional[List[str]],
Field(
default_factory=list,
description="List of tags to be associated with the video. Useful for filtering the search.",
),
]


class TelemetryStageTiming(BaseModel):
    """Timing and percentage contribution of a single processing stage."""

name: str = Field(description="Stage label (extraction, detection, embedding, storage)")
seconds: float = Field(ge=0.0, description="Summed duration for the stage")
percent_of_total: float = Field(
ge=0.0,
le=100.0,
description="Percentage of overall pipeline wall time",
)


class TelemetryBatchDetail(BaseModel):
"""Timing details for a single batch in SDK mode."""

batch_index: int = Field(ge=1)
input_frames: int = Field(ge=0)
items_after_detection: int = Field(ge=0)
detection_seconds: float = Field(ge=0.0)
embedding_seconds: float = Field(ge=0.0)
storage_seconds: float = Field(ge=0.0)
total_seconds: float = Field(ge=0.0)
embeddings_stored: int = Field(ge=0)


class TelemetryCounts(BaseModel):
"""Aggregate frame and embedding counts."""

frames_extracted: int = Field(ge=0)
items_after_detection: int = Field(ge=0)
embeddings_stored: int = Field(ge=0)


class TelemetryThroughput(BaseModel):
"""Derived throughput metrics."""

embeddings_per_second: float = Field(ge=0.0)
embedding_stage_embeddings_per_second: float = Field(ge=0.0)
wall_time_embeddings_per_second: float = Field(ge=0.0)
frames_per_second: float = Field(ge=0.0)


class TelemetryVideoMetadata(BaseModel):
"""Snapshot of the processed video's metadata."""

bucket_name: str
video_id: str
filename: str
frame_interval: int
fps: Optional[float] = None
total_frames: Optional[int] = None
video_duration_seconds: Optional[float] = None
tags: List[str] = Field(default_factory=list)
video_url: Optional[str] = None
video_rel_url: Optional[str] = None
processing_mode: Optional[str] = None


class TelemetryProcessingConfig(BaseModel):
"""Processing configuration persisted with telemetry."""

embedding_mode: str
object_detection_enabled: bool
detection_confidence: Optional[float] = None
sdk_parallel_workers: Optional[int] = None
sdk_batch_size: Optional[int] = None


class TelemetryTimestamps(BaseModel):
"""Request lifecycle timestamps."""

requested_at: str = Field(description="UTC timestamp when processing started")
completed_at: str = Field(description="UTC timestamp when processing finished")
wall_time_seconds: float = Field(ge=0.0)


class TelemetryRecord(BaseModel):
"""Stored telemetry entry served via /telemetry endpoint."""

request_id: str
source: str
processing_mode: str
timestamps: TelemetryTimestamps
video: TelemetryVideoMetadata
config: TelemetryProcessingConfig
counts: TelemetryCounts
stages: List[TelemetryStageTiming]
throughput: TelemetryThroughput
batches: List[TelemetryBatchDetail] = Field(default_factory=list)


class TelemetryResponse(BaseModel):
"""Response payload for /telemetry endpoint."""

count: int
items: List[TelemetryRecord]