Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7bd7ae6
* allow to save the computed optical flows in two video motion score …
HYLcool Nov 19, 2025
0d4f88a
* update according to gemini's comments
HYLcool Nov 19, 2025
4cc2347
* fix two test cases
HYLcool Nov 19, 2025
4b4ac2b
+ add video_motion_score_filter
HYLcool Nov 20, 2025
bf43b8e
* update uv.lock
HYLcool Nov 20, 2025
0a971ef
* update test cases
HYLcool Nov 20, 2025
2ef7897
Merge branch 'refs/heads/main' into feat/opt_flow_saving
HYLcool Nov 26, 2025
a06c332
Merge branch 'refs/heads/main' into feat/opt_flow_saving
HYLcool Dec 15, 2025
384047f
* update uv.lock
HYLcool Dec 15, 2025
3fc8685
* update build_op_doc hook: check the op num table as well
HYLcool Dec 15, 2025
7775a51
Merge branch 'main' into feat/opt_flow_saving
HYLcool Dec 16, 2025
fc4d0f5
Merge branch 'refs/heads/main' into feat/opt_flow_saving
HYLcool Jan 4, 2026
25472c6
Merge branch 'refs/heads/main' into feat/opt_flow_saving
HYLcool Jan 6, 2026
11c9294
* update uv.lock
HYLcool Jan 6, 2026
8d89423
* limit timm to v1.0.22 and update uv.lock
HYLcool Jan 6, 2026
606aa87
* use customized repos from org instead of personal
HYLcool Jan 6, 2026
0740aba
* fix cython building
HYLcool Jan 8, 2026
97549d1
* merge from main
HYLcool Jan 8, 2026
0fb1137
* update cuda version of the base layer
HYLcool Jan 8, 2026
decf446
* use "with" instead of decorator on function to avoid import torch w…
HYLcool Jan 9, 2026
d5364ab
Merge branch 'refs/heads/main' into feat/opt_flow_saving
HYLcool Jan 9, 2026
e443f98
* update optical flows saving to align the latest impl.
HYLcool Jan 9, 2026
10589f3
* replace uv path
HYLcool Jan 9, 2026
b626d64
+ add new arg in the subclass of video_motion_score_filter
HYLcool Jan 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/perf-bench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- name: Install data-juicer
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head bash -c '/root/.local/bin/uv pip install --system -e .\[all\]'
docker compose exec ray-head bash -c 'uv pip install --system -e .\[all\]'
- name: Clean dataset cache
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/unit-test-partial.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ jobs:
- name: Install data-juicer
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head bash -c '/root/.local/bin/uv pip install --system -e .\[all\]'
docker compose exec ray-head bash -c 'UV_HTTP_TIMEOUT=3600 uv pip install --system -e .\[all\]'
- name: Print Pip Dependency Tree
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head bash -c '/root/.local/bin/uv pip install --system pipdeptree'
docker compose exec ray-head bash -c 'uv pip install --system pipdeptree'
docker compose exec ray-head bash -c 'pipdeptree'
- name: Clean dataset cache
Expand Down Expand Up @@ -90,8 +90,8 @@ jobs:
- name: Install data-juicer
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head bash -c '/root/.local/bin/uv pip install --system -e .\[all\]'
docker compose exec ray-worker bash -c '/root/.local/bin/uv pip install --system -e .\[all\]'
docker compose exec ray-head bash -c 'UV_HTTP_TIMEOUT=3600 uv pip install --system -e .\[all\]'
docker compose exec ray-worker bash -c 'UV_HTTP_TIMEOUT=3600 uv pip install --system -e .\[all\]'
- name: Clean dataset cache
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
Expand Down Expand Up @@ -140,7 +140,7 @@ jobs:
- name: Install coverage
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head bash -c '/root/.local/bin/uv pip install --system coverage'
docker compose exec ray-head bash -c 'uv pip install --system coverage'
- name: Download Coverage Report Standalone
uses: actions/download-artifact@v4
Expand Down
12 changes: 6 additions & 6 deletions .github/workflows/unit-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ jobs:
- name: Install data-juicer
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head bash -c '/root/.local/bin/uv pip install --system -e .\[all\]'
docker compose exec ray-worker bash -c '/root/.local/bin/uv pip install --system -e .\[all\]'
docker compose exec ray-head bash -c 'uv pip install --system -e .\[all\]'
docker compose exec ray-worker bash -c 'uv pip install --system -e .\[all\]'
- name: Print Pip Dependency Tree
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head bash -c '/root/.local/bin/uv pip install --system pipdeptree'
docker compose exec ray-head bash -c 'uv pip install --system pipdeptree'
docker compose exec ray-head bash -c 'pipdeptree'
- name: Clean dataset cache
Expand Down Expand Up @@ -87,8 +87,8 @@ jobs:
- name: Install data-juicer
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head bash -c '/root/.local/bin/uv pip install --system -e .\[all\]'
docker compose exec ray-worker bash -c '/root/.local/bin/uv pip install --system -e .\[all\]'
docker compose exec ray-head bash -c 'uv pip install --system -e .\[all\]'
docker compose exec ray-worker bash -c 'uv pip install --system -e .\[all\]'
- name: Clean dataset cache
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
Expand Down Expand Up @@ -139,7 +139,7 @@ jobs:
- name: Install coverage
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head bash -c '/root/.local/bin/uv pip install --system coverage'
docker compose exec ray-head bash -c 'uv pip install --system coverage'
- name: Download Coverage Report Standalone
uses: actions/download-artifact@v4
Expand Down
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,8 @@ perf_bench_data/

# env file
.env

# cython outputs
/data_juicer/ops/deduplicator/minhash.cpython-*
/data_juicer/ops/deduplicator/tokenize.c
/data_juicer/ops/deduplicator/tokenize.cpython-*
32 changes: 26 additions & 6 deletions .pre-commit-hooks/build_op_doc.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,14 @@ def get_op_list_from_code():
# get docs for formatters first
op_record_list = get_op_list_from_code_for_formatter()
# get docs for other ops
op_num_dict = {}
for type in os.listdir(OP_CODE_PREFIX):
if type in OP_EXCLUDE:
continue
type_dir = os.path.join(OP_CODE_PREFIX, type)
if os.path.isfile(type_dir):
continue
op_num_dict[type] = 0
for op in os.listdir(type_dir):
if op in OP_EXCLUDE:
continue
Expand All @@ -369,8 +371,9 @@ def get_op_list_from_code():
ref=ref_link(op.replace(".py", "")),
)
)
op_num_dict[type] += 1
op_record_list.sort(key=lambda record: (record.type, record.name))
return op_record_list
return op_record_list, op_num_dict


def generate_new_doc(op_record_list, old_op_record_list):
Expand Down Expand Up @@ -516,6 +519,22 @@ def get_op_desc_in_en_zh_batched(descs):
return zhs


def parse_op_num_from_doc(doc_content):
    """
    Parse the number of OPs of each type from the overview table of the
    OP doc.

    Only the text between the "## Overview 概览" heading and the next "##"
    heading is scanned. Each 3-column table row found there contributes one
    entry, keyed by the link text of its first cell.

    :param doc_content: the full text content of the OP doc.
    :return: a dict mapping OP type name to the OP number recorded in the
        doc. The table header and the "formatter" row are skipped. Rows
        whose type cell holds no markdown link or whose number cell is not
        an integer are skipped as well. An empty dict is returned when the
        doc has no overview section.
    """
    pattern = r"\| +(.*?) +\| +(.*?) +\| +(.*?) +\|"
    link_pattern = r"\[(.*?)\]\(.*\)"

    # A doc without the overview heading simply has no recorded numbers;
    # report that as "nothing recorded" instead of raising IndexError.
    _, heading, after_heading = doc_content.partition("## Overview 概览")
    if not heading:
        return {}
    overview_section = after_heading.split("##")[0]

    num_dict = {}
    for type_cell, num_cell, _desc in re.findall(pattern, overview_section):
        # skip table header
        if type_cell == "Type 类型":
            continue
        links = re.findall(link_pattern, type_cell)
        if not links:
            # not a "[type](#anchor)" cell, so there is no type name to record
            continue
        op_type = links[0]
        # NOTE(review): formatter is presumably counted separately from the
        # per-directory OP numbers on the code side -- confirm against
        # get_op_list_from_code.
        if op_type == "formatter":
            continue
        try:
            num_dict[op_type] = int(num_cell)
        except ValueError:
            # A malformed count is left out, so the result differs from the
            # code-side numbers rather than crashing the parser.
            continue
    return num_dict


def parse_op_record_from_current_doc():
"""
Parse the old-version OP records from the existing OP doc.
Expand All @@ -527,6 +546,7 @@ def parse_op_record_from_current_doc():
op_record_list = []
with open(DOC_PATH, "r", encoding="utf-8") as fin:
content = fin.read()
op_num_dict = parse_op_num_from_doc(content)
res = re.findall(tab_pattern, content)
for name, tags, desc, info, ref in res:
# skip table header
Expand All @@ -553,9 +573,9 @@ def parse_op_record_from_current_doc():
)
)
op_record_list.sort(key=lambda record: (record.type, record.name))
return op_record_list
return op_record_list, op_num_dict
else:
return []
return [], {}


def check_and_update_op_record(old_op_record_list, new_op_record_list):
Expand Down Expand Up @@ -620,11 +640,11 @@ def check_and_update_op_record(old_op_record_list, new_op_record_list):


def main():
old_op_record_list = parse_op_record_from_current_doc()
new_op_record_list = get_op_list_from_code()
old_op_record_list, old_op_num_dict = parse_op_record_from_current_doc()
new_op_record_list, new_op_num_dict = get_op_list_from_code()
updated_op_record_list = check_and_update_op_record(old_op_record_list, new_op_record_list)
# if the doc is changed, exit with non-zero value
if old_op_record_list == updated_op_record_list:
if new_op_num_dict == old_op_num_dict and old_op_record_list == updated_op_record_list:
exit(0)
else:
generate_new_doc(updated_op_record_list, old_op_record_list)
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# The data-juicer image includes all open-source contents of data-juicer,
# and it will be installed in editable mode.

FROM nvidia/cuda:12.4.1-cudnn-devel-ubuntu22.04
FROM nvidia/cuda:12.6.3-cudnn-devel-ubuntu24.04

# change to aliyun source
RUN sed -i 's/archive.ubuntu.com/mirrors.aliyun.com/g' /etc/apt/sources.list \
Expand Down
2 changes: 2 additions & 0 deletions data_juicer/ops/filter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from .video_duration_filter import VideoDurationFilter
from .video_frames_text_similarity_filter import VideoFramesTextSimilarityFilter
from .video_motion_score_filter import VideoMotionScoreFilter
from .video_motion_score_ptlflow_filter import VideoMotionScorePtlflowFilter
from .video_motion_score_raft_filter import VideoMotionScoreRaftFilter
from .video_nsfw_filter import VideoNSFWFilter
from .video_ocr_area_ratio_filter import VideoOcrAreaRatioFilter
Expand Down Expand Up @@ -103,6 +104,7 @@
"VideoDurationFilter",
"VideoFramesTextSimilarityFilter",
"VideoMotionScoreFilter",
"VideoMotionScorePtlflowFilter",
"VideoMotionScoreRaftFilter",
"VideoNSFWFilter",
"VideoOcrAreaRatioFilter",
Expand Down
54 changes: 27 additions & 27 deletions data_juicer/ops/filter/llm_perplexity_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,34 +65,34 @@ def __init__(
self.model_params = model_params
self.model_key = prepare_model(model_type="huggingface", pretrained_model_name_or_path=hf_model, **model_params)

@torch.no_grad()
def _loss(self, example, pre_example=None, rank=None):
model, tokenizer = get_model(self.model_key, rank, self.use_cuda())
model.eval()
tokenizer.pad_token = tokenizer.pad_token or tokenizer.eos_token
tokenizer.padding_side = "left"
tokenizer.truncation_side = "left"

pre_msgs = pre_example["messages"] if pre_example is not None else []
msgs = pre_msgs + example["messages"]
# TODO: chat template
full_text = " ".join([msg["content"] for msg in msgs]).strip()
response_text = msgs[-1]["content"].strip()
max_length = self.model_params.get("max_length", None)
full_tokenized = tokenizer(full_text, max_length=max_length, truncation=True, return_tensors="pt")
input_ids = full_tokenized["input_ids"]
response_ids = tokenizer(response_text, max_length=max_length, truncation=True, return_tensors="pt")[
"input_ids"
][0]
response_len = len(response_ids) - int(tokenizer.bos_token_id is not None)
labels = input_ids.clone()
labels[0, :-response_len] = -100

input_ids = input_ids.to(model.device)
labels = labels.to(model.device)
loss = model(input_ids=input_ids, labels=labels).loss.item()

return loss
with torch.no_grad():
model, tokenizer = get_model(self.model_key, rank, self.use_cuda())
model.eval()
tokenizer.pad_token = tokenizer.pad_token or tokenizer.eos_token
tokenizer.padding_side = "left"
tokenizer.truncation_side = "left"

pre_msgs = pre_example["messages"] if pre_example is not None else []
msgs = pre_msgs + example["messages"]
# TODO: chat template
full_text = " ".join([msg["content"] for msg in msgs]).strip()
response_text = msgs[-1]["content"].strip()
max_length = self.model_params.get("max_length", None)
full_tokenized = tokenizer(full_text, max_length=max_length, truncation=True, return_tensors="pt")
input_ids = full_tokenized["input_ids"]
response_ids = tokenizer(response_text, max_length=max_length, truncation=True, return_tensors="pt")[
"input_ids"
][0]
response_len = len(response_ids) - int(tokenizer.bos_token_id is not None)
labels = input_ids.clone()
labels[0, :-response_len] = -100

input_ids = input_ids.to(model.device)
labels = labels.to(model.device)
loss = model(input_ids=input_ids, labels=labels).loss.item()

return loss

def sample_with_messages(self, sample, system_prompt=None):
if "messages" in sample:
Expand Down
45 changes: 39 additions & 6 deletions data_juicer/ops/filter/video_motion_score_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import numpy as np
from pydantic import PositiveFloat, PositiveInt

from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.constant import Fields, MetaKeys, StatsKeys
from data_juicer.utils.lazy_loader import LazyLoader
from data_juicer.utils.mm_utils import calculate_resized_dimensions

Expand All @@ -28,7 +28,7 @@ def VideoCapture(*args, **kwargs):
@UNFORKABLE.register_module(OP_NAME)
@OPERATORS.register_module(OP_NAME)
class VideoMotionScoreFilter(Filter):
"""Filter to keep samples with video motion scores within a specific range.
"""Filter to keep samples with video motion scores from OpenCV within a specific range.

The operator uses Farneback's algorithm from OpenCV to compute dense optical flow. It
calculates the average motion score for each video and retains samples based on the
Expand Down Expand Up @@ -58,6 +58,8 @@ def __init__(
divisible: PositiveInt = 1,
relative: bool = False,
any_or_all: str = "any",
if_output_optical_flow: bool = False,
optical_flow_key: str = MetaKeys.video_optical_flow,
*args,
**kwargs,
):
Expand Down Expand Up @@ -87,6 +89,11 @@ def __init__(
all videos. 'any': keep this sample if any videos meet the
condition. 'all': keep this sample only if all videos meet the
condition.
:param if_output_optical_flow: Determines whether to output
the computed optical flows into the metas. The optical flows for each
video will be stored in the shape of (num_frame, H, W, 2)
:param optical_flow_key: The field name to store the optical flows. It's
"video_optical_flow" by default.
:param args: extra args
:param kwargs: extra args
"""
Expand Down Expand Up @@ -117,6 +124,12 @@ def __init__(
raise ValueError(f"Keep strategy [{any_or_all}] is not supported. " f'Can only be one of ["any", "all"].')
self.any = any_or_all == "any"

self.if_output_optical_flow = if_output_optical_flow
self.optical_flow_key = optical_flow_key

# setup model
self.model = None

def setup_model(self, rank=None):
self.model = cv2.calcOpticalFlowFarneback

Expand Down Expand Up @@ -150,29 +163,42 @@ def compute_stats_single(self, sample, rank=None, context=False):
all_videos_frames = sample[self.frame_field]
num_videos = len(all_videos_frames)
unique_motion_scores = {}
video_optical_flows = {}
for video_idx in range(num_videos):
unique_motion_scores[video_idx] = self._compute_motion_scores_from_frames(all_videos_frames[video_idx])
unique_motion_scores[video_idx], video_optical_flows[video_idx] = (
self._compute_motion_scores_from_frames(all_videos_frames[video_idx])
)

sample[Fields.stats][StatsKeys.video_motion_score] = [
unique_motion_scores.get(idx, -1) for idx in range(num_videos)
]
if self.if_output_optical_flow:
sample[Fields.meta][self.optical_flow_key] = [
video_optical_flows.get(idx, -1) for idx in range(num_videos)
]
else:
# Read videos and compute motion scores
loaded_video_keys = sample[self.video_key]
unique_motion_scores = {}
video_optical_flows = {}
for video_key in loaded_video_keys:
# skip duplicate videos
if video_key in unique_motion_scores:
continue
unique_motion_scores[video_key] = self._compute_motion_scores_from_video(video_key)
unique_motion_scores[video_key], video_optical_flows[video_key] = (
self._compute_motion_scores_from_video(video_key)
)

sample[Fields.stats][StatsKeys.video_motion_score] = [
unique_motion_scores.get(key, -1) for key in sample[self.video_key]
]
if self.if_output_optical_flow:
sample[Fields.meta][self.optical_flow_key] = [video_optical_flows[key] for key in loaded_video_keys]
return sample

def _compute_motion_scores_from_frames(self, frames):
video_motion_scores = []
optical_flows = []
prev_frame = None
for frame in frames:
if isinstance(frame, bytes):
Expand All @@ -190,16 +216,20 @@ def _compute_motion_scores_from_frames(self, frames):
flow, prev_frame = self.compute_flow(prev_frame, frame)
if flow is None:
continue
optical_flows.append(flow)
mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
frame_motion_score = np.mean(mag)
if self.relative:
frame_motion_score /= np.hypot(*frame.shape[:2])
video_motion_scores.append(float(frame_motion_score))

return np.mean(video_motion_scores or [-1])
res_optical_flow = np.stack(optical_flows).tolist() if optical_flows else []

return np.mean(video_motion_scores or [-1]), res_optical_flow

def _compute_motion_scores_from_video(self, video_key):
video_motion_scores = []
optical_flows = []
with VideoCapture(video_key) as cap:
if cap.isOpened():
fps = cap.get(cv2.CAP_PROP_FPS)
Expand Down Expand Up @@ -229,6 +259,7 @@ def _compute_motion_scores_from_video(self, video_key):
flow, prev_frame = self.compute_flow(prev_frame, frame)
if flow is None:
continue
optical_flows.append(flow)
mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
frame_motion_score = np.mean(mag)
if self.relative:
Expand All @@ -239,7 +270,9 @@ def _compute_motion_scores_from_video(self, video_key):
frame_count += sampling_step
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_count)

return np.mean(video_motion_scores or [-1])
res_optical_flow = np.stack(optical_flows).tolist() if optical_flows else []

return np.mean(video_motion_scores or [-1]), res_optical_flow

def process_single(self, sample):
video_motion_scores = sample[Fields.stats][StatsKeys.video_motion_score]
Expand Down
Loading