Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7bd7ae6
* allow to save the computed optical flows in two video motion score …
HYLcool Nov 19, 2025
0d4f88a
* update according to gemini's comments
HYLcool Nov 19, 2025
4cc2347
* fix two test cases
HYLcool Nov 19, 2025
4b4ac2b
+ add video_motion_score_filter
HYLcool Nov 20, 2025
bf43b8e
* update uv.lock
HYLcool Nov 20, 2025
0a971ef
* update test cases
HYLcool Nov 20, 2025
2ef7897
Merge branch 'refs/heads/main' into feat/opt_flow_saving
HYLcool Nov 26, 2025
a06c332
Merge branch 'refs/heads/main' into feat/opt_flow_saving
HYLcool Dec 15, 2025
384047f
* update uv.lock
HYLcool Dec 15, 2025
3fc8685
* update build_op_doc hook: check the op num table as well
HYLcool Dec 15, 2025
7775a51
Merge branch 'main' into feat/opt_flow_saving
HYLcool Dec 16, 2025
fc4d0f5
Merge branch 'refs/heads/main' into feat/opt_flow_saving
HYLcool Jan 4, 2026
25472c6
Merge branch 'refs/heads/main' into feat/opt_flow_saving
HYLcool Jan 6, 2026
11c9294
* update uv.lock
HYLcool Jan 6, 2026
8d89423
* limit timm to v1.0.22 and update uv.lock
HYLcool Jan 6, 2026
606aa87
* use customized repos from org instead of personal
HYLcool Jan 6, 2026
0740aba
* fix cython building
HYLcool Jan 8, 2026
97549d1
* merge from main
HYLcool Jan 8, 2026
0fb1137
* update cuda version of the base layer
HYLcool Jan 8, 2026
decf446
* use "with" instead of decorator on function to avoid import torch w…
HYLcool Jan 9, 2026
d5364ab
Merge branch 'refs/heads/main' into feat/opt_flow_saving
HYLcool Jan 9, 2026
e443f98
* update optical flows saving to align the latest impl.
HYLcool Jan 9, 2026
10589f3
* replace uv path
HYLcool Jan 9, 2026
b626d64
+ add new arg in the subclass of video_motion_score_filter
HYLcool Jan 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/unit-test-partial.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
- name: Install data-juicer
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head bash -c '/root/.local/bin/uv pip install --system -e .\[all\]'
docker compose exec ray-head bash -c 'UV_HTTP_TIMEOUT=3600 /root/.local/bin/uv pip install --system -e .\[all\]'

- name: Print Pip Dependency Tree
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
Expand Down Expand Up @@ -90,8 +90,8 @@ jobs:
- name: Install data-juicer
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head bash -c '/root/.local/bin/uv pip install --system -e .\[all\]'
docker compose exec ray-worker bash -c '/root/.local/bin/uv pip install --system -e .\[all\]'
docker compose exec ray-head bash -c 'UV_HTTP_TIMEOUT=3600 /root/.local/bin/uv pip install --system -e .\[all\]'
docker compose exec ray-worker bash -c 'UV_HTTP_TIMEOUT=3600 /root/.local/bin/uv pip install --system -e .\[all\]'

- name: Clean dataset cache
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
Expand Down
32 changes: 26 additions & 6 deletions .pre-commit-hooks/build_op_doc.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,14 @@ def get_op_list_from_code():
# get docs for formatters first
op_record_list = get_op_list_from_code_for_formatter()
# get docs for other ops
op_num_dict = {}
for type in os.listdir(OP_CODE_PREFIX):
if type in OP_EXCLUDE:
continue
type_dir = os.path.join(OP_CODE_PREFIX, type)
if os.path.isfile(type_dir):
continue
op_num_dict[type] = 0
for op in os.listdir(type_dir):
if op in OP_EXCLUDE:
continue
Expand All @@ -369,8 +371,9 @@ def get_op_list_from_code():
ref=ref_link(op.replace(".py", "")),
)
)
op_num_dict[type] += 1
op_record_list.sort(key=lambda record: (record.type, record.name))
return op_record_list
return op_record_list, op_num_dict


def generate_new_doc(op_record_list, old_op_record_list):
Expand Down Expand Up @@ -516,6 +519,22 @@ def get_op_desc_in_en_zh_batched(descs):
return zhs


def parse_op_num_from_doc(doc_content):
pattern = r"\| +(.*?) +\| +(.*?) +\| +(.*?) +\|"
link_pattern = r"\[(.*?)\]\(.*\)"
overview_section = doc_content.split("## Overview 概览")[1].split("##")[0]
res = re.findall(pattern, overview_section)
num_dict = {}
for type, num, desc in res:
if type == "Type 类型":
continue
type = re.findall(link_pattern, type)[0]
if type == "formatter":
continue
num_dict[type] = int(num)
return num_dict


def parse_op_record_from_current_doc():
"""
Parse the old-version OP records from the existing OP doc.
Expand All @@ -527,6 +546,7 @@ def parse_op_record_from_current_doc():
op_record_list = []
with open(DOC_PATH, "r", encoding="utf-8") as fin:
content = fin.read()
op_num_dict = parse_op_num_from_doc(content)
res = re.findall(tab_pattern, content)
for name, tags, desc, info, ref in res:
# skip table header
Expand All @@ -553,9 +573,9 @@ def parse_op_record_from_current_doc():
)
)
op_record_list.sort(key=lambda record: (record.type, record.name))
return op_record_list
return op_record_list, op_num_dict
else:
return []
return [], {}


def check_and_update_op_record(old_op_record_list, new_op_record_list):
Expand Down Expand Up @@ -620,11 +640,11 @@ def check_and_update_op_record(old_op_record_list, new_op_record_list):


def main():
old_op_record_list = parse_op_record_from_current_doc()
new_op_record_list = get_op_list_from_code()
old_op_record_list, old_op_num_dict = parse_op_record_from_current_doc()
new_op_record_list, new_op_num_dict = get_op_list_from_code()
updated_op_record_list = check_and_update_op_record(old_op_record_list, new_op_record_list)
# if the doc is changed, exit with non-zero value
if old_op_record_list == updated_op_record_list:
if new_op_num_dict == old_op_num_dict and old_op_record_list == updated_op_record_list:
exit(0)
else:
generate_new_doc(updated_op_record_list, old_op_record_list)
Expand Down
2 changes: 2 additions & 0 deletions data_juicer/ops/filter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from .video_duration_filter import VideoDurationFilter
from .video_frames_text_similarity_filter import VideoFramesTextSimilarityFilter
from .video_motion_score_filter import VideoMotionScoreFilter
from .video_motion_score_ptlflow_filter import VideoMotionScorePtlflowFilter
from .video_motion_score_raft_filter import VideoMotionScoreRaftFilter
from .video_nsfw_filter import VideoNSFWFilter
from .video_ocr_area_ratio_filter import VideoOcrAreaRatioFilter
Expand Down Expand Up @@ -101,6 +102,7 @@
"VideoDurationFilter",
"VideoFramesTextSimilarityFilter",
"VideoMotionScoreFilter",
"VideoMotionScorePtlflowFilter",
"VideoMotionScoreRaftFilter",
"VideoNSFWFilter",
"VideoOcrAreaRatioFilter",
Expand Down
22 changes: 21 additions & 1 deletion data_juicer/ops/filter/video_motion_score_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import numpy as np
from pydantic import PositiveFloat, PositiveInt

from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.constant import Fields, MetaKeys, StatsKeys
from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.mm_utils import calculate_resized_dimensions

Expand Down Expand Up @@ -57,6 +57,8 @@ def __init__(
divisible: PositiveInt = 1,
relative: bool = False,
any_or_all: str = "any",
if_output_optical_flow: bool = False,
optical_flow_key: str = MetaKeys.video_optical_flow,
*args,
**kwargs,
):
Expand Down Expand Up @@ -84,6 +86,11 @@ def __init__(
all videos. 'any': keep this sample if any videos meet the
condition. 'all': keep this sample only if all videos meet the
condition.
:param if_output_optical_flow: Determines whether to output
the computed optical flows into the metas. The optical flows for each
video will be stored in the shape of (num_frame, H, W, 2)
:param optical_flow_key: The field name to store the optical flows. It's
"video_optical_flow" in default.
:param args: extra args
:param kwargs: extra args
"""
Expand Down Expand Up @@ -113,6 +120,12 @@ def __init__(
raise ValueError(f"Keep strategy [{any_or_all}] is not supported. " f'Can only be one of ["any", "all"].')
self.any = any_or_all == "any"

self.if_output_optical_flow = if_output_optical_flow
self.optical_flow_key = optical_flow_key

# setup model
self.model = None

def setup_model(self, rank=None):
self.model = cv2.calcOpticalFlowFarneback

Expand Down Expand Up @@ -141,12 +154,14 @@ def compute_stats_single(self, sample, rank=None, context=False):
# load videos
loaded_video_keys = sample[self.video_key]
unique_motion_scores = {}
video_optical_flows = {}
for video_key in loaded_video_keys:
# skip duplicate videos
if video_key in unique_motion_scores:
continue

video_motion_scores = []
optical_flows = []
with VideoCapture(video_key) as cap:
if cap.isOpened():
fps = cap.get(cv2.CAP_PROP_FPS)
Expand Down Expand Up @@ -176,6 +191,7 @@ def compute_stats_single(self, sample, rank=None, context=False):
flow, prev_frame = self.compute_flow(prev_frame, frame)
if flow is None:
continue
optical_flows.append(flow)
mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
frame_motion_score = np.mean(mag)
if self.relative:
Expand All @@ -192,7 +208,11 @@ def compute_stats_single(self, sample, rank=None, context=False):
else:
unique_motion_scores[video_key] = np.mean(video_motion_scores or [-1])

video_optical_flows[video_key] = np.stack(optical_flows).tolist() if optical_flows else []

sample[Fields.stats][StatsKeys.video_motion_score] = [unique_motion_scores[key] for key in loaded_video_keys]
if self.if_output_optical_flow:
sample[Fields.meta][self.optical_flow_key] = [video_optical_flows[key] for key in loaded_video_keys]
return sample

def process_single(self, sample):
Expand Down
111 changes: 111 additions & 0 deletions data_juicer/ops/filter/video_motion_score_ptlflow_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import sys
from typing import Optional, Tuple, Union

from jsonargparse import dict_to_namespace
from pydantic import PositiveFloat, PositiveInt

from data_juicer.ops.filter.video_motion_score_filter import VideoMotionScoreFilter
from data_juicer.utils.constant import MetaKeys
from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.resource_utils import cuda_device_count

from ..base_op import OPERATORS, UNFORKABLE

torch = LazyLoader("torch")
tvm = LazyLoader("torchvision.models")
tvt = LazyLoader("torchvision.transforms")
ptlflow = LazyLoader("ptlflow")
ptlflow_io_adapter = LazyLoader("ptlflow.utils.io_adapter")

OP_NAME = "video_motion_score_ptlflow_filter"


@UNFORKABLE.register_module(OP_NAME)
@OPERATORS.register_module(OP_NAME)
class VideoMotionScorePtlflowFilter(VideoMotionScoreFilter):
"""Filter to keep samples with video motion scores within a specified range.
This operator utilizes the ptlflow library (https://github.com/hmorimitsu/ptlflow) to
predict optical flow between video frames. It keeps samples where the
video motion score is within the given min and max score range. The motion score is
computed based on the optical flow between frames, which is estimated using the models
supported in ptlflow. The operator can sample frames at a specified FPS and apply
transformations to the frames before computing the flow.
- The models in ptlflow is used to estimate the optical flow.
- Frames are preprocessed using a series of transformations including normalization and
color channel flipping.
- The motion score is calculated from the optical flow data.
- The operator can be configured to filter based on any or all frames in the video.
- The device for model inference (CPU or CUDA) is automatically detected and set.
For further details, refer to the official documentation:
https://ptlflow.readthedocs.io/
"""

_accelerator = "cuda"
_default_kwargs = {}

def __init__(
self,
min_score: float = 1.0,
max_score: float = sys.float_info.max,
model_name: str = "dpflow",
ckpt_path: Optional[str] = "things",
get_model_args: Optional[dict] = None,
sampling_fps: PositiveFloat = 2,
size: Union[PositiveInt, Tuple[PositiveInt], Tuple[PositiveInt, PositiveInt], None] = None,
max_size: Optional[PositiveInt] = None,
divisible: PositiveInt = 8,
relative: bool = False,
any_or_all: str = "any",
if_output_optical_flow: bool = False,
optical_flow_key: str = MetaKeys.video_optical_flow,
*args,
**kwargs,
):
super().__init__(
min_score,
max_score,
sampling_fps,
size,
max_size,
divisible,
relative,
any_or_all,
if_output_optical_flow,
optical_flow_key,
*args,
**kwargs,
)

self.model_name = model_name
self.ckpt_path = ckpt_path
if get_model_args is not None:
get_model_args = dict_to_namespace(get_model_args)
self.get_model_args = get_model_args

def setup_model(self, rank=None):
self.model = ptlflow.get_model(self.model_name, ckpt_path=self.ckpt_path, args=self.get_model_args)
if self.use_cuda():
rank = rank if rank is not None else 0
rank = rank % cuda_device_count()
self.device = f"cuda:{rank}"
else:
self.device = "cpu"
self.model.to(self.device)
self.model.eval()

def compute_flow(self, prev_frame, curr_frame):
if prev_frame is None:
flow = None
else:
io_adapter = ptlflow_io_adapter.IOAdapter(self.model, prev_frame.shape[:2])
frames = [prev_frame, curr_frame]
inputs = io_adapter.prepare_inputs(frames)
inputs = {key: value.to(self.device) for key, value in inputs.items()}
with torch.no_grad():
predictions = self.model(inputs)
flows = predictions.get("flows") # shape: (1, 1, 2, H, W)
flow = flows[-1][0].detach().cpu().numpy().transpose((1, 2, 0)) # 2, H, W -> H, W, 2
return flow, curr_frame
16 changes: 15 additions & 1 deletion data_juicer/ops/filter/video_motion_score_raft_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pydantic import PositiveFloat, PositiveInt

from data_juicer.ops.filter.video_motion_score_filter import VideoMotionScoreFilter
from data_juicer.utils.constant import MetaKeys
from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.resource_utils import cuda_device_count

Expand Down Expand Up @@ -55,11 +56,24 @@ def __init__(
divisible: PositiveInt = 8,
relative: bool = False,
any_or_all: str = "any",
if_output_optical_flow: bool = False,
optical_flow_key: str = MetaKeys.video_optical_flow,
*args,
**kwargs,
):
super().__init__(
min_score, max_score, sampling_fps, size, max_size, divisible, relative, any_or_all, *args, **kwargs
min_score,
max_score,
sampling_fps,
size,
max_size,
divisible,
relative,
any_or_all,
if_output_optical_flow,
optical_flow_key,
*args,
**kwargs,
)

def setup_model(self, rank=None):
Expand Down
2 changes: 2 additions & 0 deletions data_juicer/utils/constant.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class MetaKeys(object):
video_object_segment_tags = "video_object_segment_tags"
# # depth info in video
video_depth_tags = "video_depth_tags"
# # video optical flow
video_optical_flow = "video_optical_flow"
# # info extracted by VGGT
vggt_tags = "vggt_tags"
# # image tags
Expand Down
4 changes: 2 additions & 2 deletions data_juicer/utils/model_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@
aes_pred = LazyLoader("aesthetics_predictor", "simple-aesthetics-predictor")
vllm = LazyLoader("vllm")
diffusers = LazyLoader("diffusers")
ram = LazyLoader("ram", "git+https://github.com/HYLcool/recognize-anything.git")
ram = LazyLoader("ram", "git+https://github.com/datajuicer/recognize-anything.git")
cv2 = LazyLoader("cv2", "opencv-python")
openai = LazyLoader("openai")
ultralytics = LazyLoader("ultralytics")
tiktoken = LazyLoader("tiktoken")
dashscope = LazyLoader("dashscope")
qwen_vl_utils = LazyLoader("qwen_vl_utils", "qwen-vl-utils")
transformers_stream_generator = LazyLoader(
"transformers_stream_generator", "git+https://github.com/HYLcool/transformers-stream-generator.git"
"transformers_stream_generator", "git+https://github.com/datajuicer/transformers-stream-generator.git"
)

MODEL_ZOO = {}
Expand Down
3 changes: 2 additions & 1 deletion docs/Operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Data-Juicer 中的算子分为以下 8 种类型。
|------|:---------:|-------------|
| [aggregator](#aggregator) | 4 | Aggregate for batched samples, such as summary or conclusion. 对批量样本进行汇总,如得出总结或结论。 |
| [deduplicator](#deduplicator) | 10 | Detects and removes duplicate samples. 识别、删除重复样本。 |
| [filter](#filter) | 54 | Filters out low-quality samples. 过滤低质量样本。 |
| [filter](#filter) | 55 | Filters out low-quality samples. 过滤低质量样本。 |
| [formatter](#formatter) | 8 | Discovers, loads, and canonicalizes source data. 发现、加载、规范化原始数据。 |
| [grouper](#grouper) | 3 | Group samples to batched samples. 将样本分组,每一组组成一个批量样本。 |
| [mapper](#mapper) | 96 | Edits and transforms samples. 对数据样本进行编辑和转换。 |
Expand Down Expand Up @@ -144,6 +144,7 @@ All the specific operators are listed below, each featured with several capabili
| video_duration_filter | 🎬Video 💻CPU 🟢Stable | Keep data samples whose videos' durations are within a specified range. 保留视频持续时间在指定范围内的数据样本。 | [info](operators/filter/video_duration_filter.md) | - |
| video_frames_text_similarity_filter | 🔮Multimodal 🚀GPU 🧩HF 🟢Stable | Filter to keep samples based on the similarity between video frame images and text within a specific range. 根据视频帧图像和文本之间的相似性进行过滤,以保持样本在特定范围内。 | [info](operators/filter/video_frames_text_similarity_filter.md) | - |
| video_motion_score_filter | 🎬Video 💻CPU 🟢Stable | Filter to keep samples with video motion scores within a specific range. 过滤器将视频运动分数的样本保持在特定范围内。 | [info](operators/filter/video_motion_score_filter.md) | - |
| video_motion_score_ptlflow_filter | 🎬Video 🚀GPU 🟡Beta | Filter to keep samples with video motion scores within a specified range. 过滤器将视频运动分数的样本保持在指定范围内。 | - | - |
| video_motion_score_raft_filter | 🎬Video 🚀GPU 🟢Stable | Filter to keep samples with video motion scores within a specified range. 过滤器将视频运动分数的样本保持在指定范围内。 | [info](operators/filter/video_motion_score_raft_filter.md) | [RAFT](https://arxiv.org/abs/2003.12039) |
| video_nsfw_filter | 🎬Video 🚀GPU 🧩HF 🟢Stable | Filter to keep samples whose videos have nsfw scores in a specified range. 过滤器以保留其视频的nsfw分数在指定范围内的样本。 | [info](operators/filter/video_nsfw_filter.md) | - |
| video_ocr_area_ratio_filter | 🎬Video 🚀GPU 🟢Stable | Keep data samples whose detected text area ratios for specified frames in the video are within a specified range. 保留检测到的视频中指定帧的文本面积比率在指定范围内的数据样本。 | [info](operators/filter/video_ocr_area_ratio_filter.md) | - |
Expand Down
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ vision = [
"rembg", # Background removal
"decord", # video/audio handling
"qwen-vl-utils==0.0.14", # for Qwen-VL
"ptlflow==0.4.1", # optical flow models collection
"timm==1.0.22", # avoid importing issue in the latest v1.0.23
]

# Natural Language Processing
Expand Down
Loading