4 changes: 4 additions & 0 deletions data_juicer/core/data/dj_dataset.py
@@ -36,6 +36,10 @@ class DJDataset(ABC):
def process(self, operators, *, exporter=None, checkpointer=None, tracer=None) -> DJDataset: # TODO: add type hint
"""process a list of operators on the dataset."""

@abstractmethod
def process_parallel(self, operators, *, exporter=None, checkpointer=None, tracer=None) -> DJDataset:
"""Implementing op parallel data processing based on Ray Actor"""

@abstractmethod
def schema(self) -> Schema:
"""Get dataset schema.
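Note: both entry points share the same keyword-only services and return a DJDataset. A minimal call-site sketch, assuming `ds` and `ops` come from the existing dataset builder and op-loading machinery:

    # Hedged sketch of the caller-side contract; `ds` and `ops` are assumed to
    # come from the usual dataset builder and op loader.
    ds = ds.process(ops, exporter=None, checkpointer=None, tracer=None)
    # or, routing the same ops through the new Ray Actor path:
    ds = ds.process_parallel(ops, exporter=None, checkpointer=None, tracer=None)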
518 changes: 514 additions & 4 deletions data_juicer/core/data/ray_dataset.py

Large diffs are not rendered by default.
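Since this diff is collapsed, the actor wiring is not visible here. A hedged sketch of the shape `RayDataset.process_parallel` plausibly takes, based only on the `Actor` API added in `ray_actor.py` below (one long-lived actor per op, samples chained through them); this is illustrative, not the PR's actual code:

    # Illustrative only; not the PR's RayDataset.process_parallel.
    import ray

    from data_juicer.core.ray_actor import Actor
    from data_juicer.ops.base_op import Filter, Mapper


    def process_parallel_sketch(samples, operators):
        actors = [Actor.remote(op) for op in operators]
        futures = []
        for sample in samples:
            ref = sample
            for op, actor in zip(operators, actors):
                if isinstance(op, Mapper):
                    ref = actor.mapper_cpu.remote(ref)
                elif isinstance(op, Filter):
                    ref = actor.filter_cpu_single.remote(ref)
            futures.append(ref)
        # The filter paths return None for dropped samples; a real
        # implementation must short-circuit those before the next op.
        return [s for s in ray.get(futures) if s is not None]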

11 changes: 10 additions & 1 deletion data_juicer/core/executor/ray_executor.py
@@ -57,7 +57,9 @@ def __init__(self, cfg: Optional[Namespace] = None):

# init ray
logger.info("Initializing Ray ...")

ray.init(self.cfg.ray_address, ignore_reinit_error=True)

self.tmp_dir = os.path.join(self.work_dir, ".tmp", ray.get_runtime_context().get_job_id())

# absolute path resolution logic
@@ -73,6 +75,8 @@ def __init__(self, cfg: Optional[Namespace] = None):
keep_hashes_in_res_ds=self.cfg.keep_hashes_in_res_ds,
**self.cfg.export_extra_args,
)
self.op_enable_parallel = True  # TODO: make this configurable rather than hard-coded

def run(self, load_data_np: Optional[PositiveInt] = None, skip_return=False):
"""
@@ -84,7 +88,9 @@ def run(self, load_data_np: Optional[PositiveInt] = None, skip_return=False):
"""
# 1. load data
logger.info("Loading dataset with Ray...")
dstart = time.time()
dataset = self.datasetbuilder.load_dataset(num_proc=load_data_np)
logger.info(f"Data loading in {time.time() - dstart:.3f}")
columns = dataset.schema().columns

# 2. extract processes
@@ -99,7 +105,10 @@ def run(self, load_data_np: Optional[PositiveInt] = None, skip_return=False):
# 3. data process
logger.info("Processing data...")
tstart = time.time()
if self.op_enable_parallel:
dataset.process_parallel(ops)
else:
dataset.process(ops)

# 4. data export
logger.info("Exporting dataset to disk...")
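The parallel path is toggled by a hard-coded attribute above. A hedged sketch of a config-driven toggle (the `op_enable_parallel` key is hypothetical, not an existing data-juicer option):

    # Hypothetical: read the toggle from the job config with a safe default.
    self.op_enable_parallel = getattr(self.cfg, "op_enable_parallel", True)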
120 changes: 120 additions & 0 deletions data_juicer/core/ray_actor.py
@@ -0,0 +1,120 @@
from functools import partial
import ray
import pyarrow

from data_juicer.ops.base_op import Filter, Mapper
from loguru import logger



def filter_batch(batch, filter_func):
mask = pyarrow.array(filter_func(batch.to_pydict()))
return batch.filter(mask)

@ray.remote(num_gpus=0.0)
class Actor:
def __init__(self, op, rank=None):

self.op = op
self._model_loaded = False  # flag marking whether the model has been loaded
self.rank = rank
self.model = None
self.processor = None

def load_model(self):

if self.op.use_cuda() and not self._model_loaded:

self.model, self.processor = self.op.load_model(rank=self.rank)
self._model_loaded = True

def mapper_cuda(self, data):
if not self._model_loaded:
self.load_model()  # ensure the model is loaded before the call
data = self.op.process_single(data, self.model, self.processor)
return data

def mapper_cuda_batched(self, data):
if not self._model_loaded:
self.load_model()  # ensure the model is loaded before the call
data = self.op.process_batched_actor(data, self.model, self.processor)
return data

def mapper_cpu(self, data):
# process the data
processed_data = self.op.process_single(data)
return processed_data

def filter_cuda_single(self, data):
if not self._model_loaded:
self.load_model()
data = self.op.compute_stats_single_actor(data, self.model, self.processor)
keep = self.op.process_single(data)

if keep:
return data
else:
return None

def filter_cuda_batched(self, data):
if not self._model_loaded:
self.load_model()
# data = self.op.compute_stats_batched(data, self.model, self.processor)
data = self.op.compute_stats_batched(data)
keep_mask = list(self.op.process_batched(data))  # materialize the map object into a list

# if nothing is kept, return None
if not any(keep_mask):
return None

# filter the data with the keep mask
if isinstance(data, dict):
# data is a dict of columns (each key maps to a list)
filtered_data = {
key: [value for value, keep in zip(values, keep_mask) if keep]
for key, values in data.items()
}
elif isinstance(data, list):
# data is a list of samples
filtered_data = [item for item, keep in zip(data, keep_mask) if keep]
else:
# any other type (e.g. an unexpected Ray Dataset batch format)
raise ValueError("Unsupported data type for batch filtering")

return filtered_data


def filter_cpu_single(self, data):
data = self.op.compute_stats_single(data)
keep = self.op.process_single(data)
if keep:
return data
else:
return None

def filter_cpu_batched(self, data):
# data = self.op.compute_stats_batched(data, self.model, self.processor)
data = self.op.compute_stats_batched(data)
keep_mask = list(self.op.process_batched(data))  # materialize the map object into a list

# if nothing is kept, return None
if not any(keep_mask):
return None

# filter the data with the keep mask
if isinstance(data, dict):
# data is a dict of columns (each key maps to a list)
filtered_data = {
key: [value for value, keep in zip(values, keep_mask) if keep]
for key, values in data.items()
}
elif isinstance(data, list):
# data is a list of samples
filtered_data = [item for item, keep in zip(data, keep_mask) if keep]
else:
# any other type (e.g. an unexpected Ray Dataset batch format)
raise ValueError("Unsupported data type for batch filtering")

return filtered_data
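A minimal runnable usage of the new Actor, with a toy stand-in for a data-juicer op (the `ToyOp` class is hypothetical; real ops come from `data_juicer.ops`):

    import ray

    from data_juicer.core.ray_actor import Actor


    class ToyOp:
        # Hypothetical stand-in for a CPU Mapper-style op.
        def use_cuda(self):
            return False

        def process_single(self, sample):
            sample["text"] = sample["text"].strip()
            return sample


    ray.init(ignore_reinit_error=True)
    actor = Actor.remote(ToyOp())
    out = ray.get(actor.mapper_cpu.remote({"text": "  hello  "}))
    print(out)  # {'text': 'hello'}

The module-level `filter_batch` follows the same idea for pyarrow batches: the filter function sees the batch as a dict of columns and returns a boolean mask used to drop rows.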


23 changes: 21 additions & 2 deletions data_juicer/ops/base_op.py
@@ -1,5 +1,6 @@
import copy
from functools import wraps
import time

import numpy as np
import pyarrow as pa
@@ -8,7 +9,7 @@
from data_juicer import is_cuda_available
from data_juicer.utils.constant import Fields
from data_juicer.utils.mm_utils import size_to_bytes
from data_juicer.utils.model_utils import free_models, get_model
from data_juicer.utils.process_utils import calculate_np
from data_juicer.utils.registry import Registry

@@ -19,6 +20,10 @@
ATTRIBUTION_FILTERS = Registry("Attribution Filters")


import pytz
from datetime import datetime
beijing_tz = pytz.timezone('Asia/Shanghai')  # UTC+8; 'Asia/Shanghai' is the Beijing time zone

def convert_list_dict_to_dict_list(samples):
# reconstruct samples from "list of dicts" to "dict of lists"
keys = samples[0].keys()
@@ -191,6 +196,7 @@ def __init__(self, *args, **kwargs):
self.num_proc = kwargs.get("num_proc", None)
self.cpu_required = kwargs.get("cpu_required", 1)
self.mem_required = kwargs.get("mem_required", 0)
self.gpu_required = kwargs.get("gpu_required", 1)
if isinstance(self.mem_required, str):
self.mem_required = size_to_bytes(self.mem_required) / 1024**3

@@ -285,7 +291,20 @@ def add_index(sample, idx):
def empty_history(self):
return np.empty((0, 0), dtype=str)


def load_model(self, rank=None):
start = time.time()
start_time = datetime.fromtimestamp(start, pytz.utc).astimezone(beijing_tz)
model, processor = get_model(self.model_key, rank=rank, use_cuda=self.use_cuda())
end = time.time()
end_time = datetime.fromtimestamp(end, pytz.utc).astimezone(beijing_tz)
print(
f"[Actor] {self._name} Model loaded in {end - start:.3f} seconds "
f"from {start_time.strftime('%Y-%m-%d %H:%M:%S')} "
f"to {end_time.strftime('%Y-%m-%d %H:%M:%S')}"
)
return model, processor

class Mapper(OP):
def __init__(self, *args, **kwargs):
"""
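The new `OP.load_model` centralizes the `get_model` call so a long-lived worker can load a model once and reuse it. A hedged sketch of the intended pattern (`op` is assumed to be a CUDA-capable data-juicer op with a valid `model_key`):

    # Hedged sketch: load once per worker, reuse across calls.
    if op.use_cuda():
        model, processor = op.load_model(rank=0)  # wraps get_model and logs load time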
68 changes: 66 additions & 2 deletions data_juicer/ops/filter/video_aesthetics_filter.py
@@ -20,7 +20,6 @@

OP_NAME = "video_aesthetics_filter"


@OPERATORS.register_module(OP_NAME)
@LOADED_VIDEOS.register_module(OP_NAME)
@INTER_SAMPLED_FRAMES.register_module(OP_NAME)
@@ -116,11 +115,76 @@ def __init__(
"" if frame_sampling_method == "all_keyframes" else f"-{frame_num}"
)

def compute_stats_single_actor(self, sample, model, processor, rank=None, context=False):
# check if it's computed already
if StatsKeys.video_frames_aesthetics_score in sample[Fields.stats]:
return sample
# there is no video in this sample
if self.video_key not in sample or not sample[self.video_key]:
sample[Fields.stats][StatsKeys.video_frames_aesthetics_score] = np.array([], dtype=np.float64)
return sample

# load videos
loaded_video_keys = sample[self.video_key]
sample, videos = load_data_with_context(sample, context, loaded_video_keys, load_video)

aesthetics_scores = []
for key, video in videos.items():
sampled_frames_key = key + self.sampled_frames_key_suffix
if video is None:
continue
elif context and sampled_frames_key in sample[Fields.context]:
# sampled frames can be found in the context
frames = sample[Fields.context][sampled_frames_key]
else:
# extract frame images
if self.frame_sampling_method == "all_keyframes":
frames = extract_key_frames(video)
elif self.frame_sampling_method == "uniform":
frames = extract_video_frames_uniformly(video, self.frame_num)
else:
frames = []

# store the sampled frames in the context
if context:
sample[Fields.context][sampled_frames_key] = frames
frame_images = [frame.to_image() for frame in frames]

if len(frame_images) > 0:
# compute aesthetics_scores
# model, processor = get_model(self.model_key, rank=rank, use_cuda=self.use_cuda())
inputs = processor(images=frame_images, return_tensors="pt").to(model.device)
with torch.no_grad():
outputs = model(**inputs)
if self.need_normalized_by_ten:
aesthetics_score = outputs.logits / 10.0
else:
aesthetics_score = outputs.logits

if self.reduce_mode == "avg":
aesthetics_score = float(aesthetics_score.mean())
elif self.reduce_mode == "max":
aesthetics_score = float(aesthetics_score.max())
else:
aesthetics_score = float(aesthetics_score.min())
else:
aesthetics_score = 0.0

aesthetics_scores.append(aesthetics_score)

logger.debug(f"aesthetics_score: {aesthetics_scores}")

sample[Fields.stats][StatsKeys.video_frames_aesthetics_score] = aesthetics_scores

if not context:
for vid_key in videos:
close_video(videos[vid_key])

return sample

def compute_stats_single(self, sample, rank=None, context=False):
# check if it's computed already
if StatsKeys.video_frames_aesthetics_score in sample[Fields.stats]:
return sample
# there is no video in this sample
if self.video_key not in sample or not sample[self.video_key]:
sample[Fields.stats][StatsKeys.video_frames_aesthetics_score] = np.array([], dtype=np.float64)
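The reduce step collapses the per-frame scores into a single number per video. A minimal runnable illustration of the three modes on dummy values:

    import torch

    # Dummy per-frame aesthetics logits, already divided by 10 (illustrative values).
    scores = torch.tensor([[0.42], [0.68], [0.51]])

    avg_score = float(scores.mean())  # reduce_mode == "avg"
    max_score = float(scores.max())   # reduce_mode == "max"
    min_score = float(scores.min())   # the fallback ("min") branch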
64 changes: 63 additions & 1 deletion data_juicer/ops/filter/video_watermark_filter.py
@@ -19,7 +19,6 @@

OP_NAME = "video_watermark_filter"


@OPERATORS.register_module(OP_NAME)
@LOADED_VIDEOS.register_module(OP_NAME)
@INTER_SAMPLED_FRAMES.register_module(OP_NAME)
@@ -104,6 +103,69 @@ def __init__(
"" if frame_sampling_method == "all_keyframes" else f"-{frame_num}"
)

def compute_stats_single_actor(self, sample, model, processor, rank=None, context=False):
# check if it's computed already
if StatsKeys.video_watermark_prob in sample[Fields.stats]:
return sample

# there are no videos in this sample
if self.video_key not in sample or not sample[self.video_key]:
sample[Fields.stats][StatsKeys.video_watermark_prob] = np.array([], dtype=np.float64)
return sample

# load videos
loaded_video_keys = sample[self.video_key]
sample, videos = load_data_with_context(sample, context, loaded_video_keys, load_video)

watermark_probs = []
# model, processor = get_model(self.model_key, rank, self.use_cuda())

for video_key, video in videos.items():
sampled_frames_key = video_key + self.sampled_frames_key_suffix

# extract frame images
if context and sampled_frames_key in sample[Fields.context]:
frames = sample[Fields.context][sampled_frames_key]
else:
if self.frame_sampling_method == "all_keyframes":
frames = extract_key_frames(video)
elif self.frame_sampling_method == "uniform":
frames = extract_video_frames_uniformly(video, self.frame_num)
else:
frames = []

# store the sampled frames in the context
if context:
sample[Fields.context][sampled_frames_key] = frames

frame_images = [frame.to_image() for frame in frames]

if len(frame_images) > 0:
inputs = processor(images=frame_images, return_tensors="pt")
inputs = inputs.to(model.device)
with torch.no_grad():
    outputs = model(**inputs)
logits = outputs.logits
cur_probs = [probs[1] for probs in torch.softmax(logits, dim=-1)]
cur_probs = torch.Tensor(cur_probs)

if self.reduce_mode == "avg":
cur_prob = cur_probs.mean()
elif self.reduce_mode == "max":
cur_prob = cur_probs.max()
else:
cur_prob = cur_probs.min()
else:
cur_prob = 0.0
watermark_probs.append(float(cur_prob))

sample[Fields.stats][StatsKeys.video_watermark_prob] = watermark_probs

if not context:
for vid_key in videos:
close_video(videos[vid_key])

return sample

def compute_stats_single(self, sample, rank=None, context=False):
# check if it's computed already
if StatsKeys.video_watermark_prob in sample[Fields.stats]:
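For the watermark path, the per-frame probability of the watermark class comes from a softmax over two logits. A minimal runnable, vectorized equivalent of the per-frame list comprehension above:

    import torch

    # Dummy per-frame logits over [no-watermark, watermark] classes.
    logits = torch.tensor([[2.0, 0.5], [0.1, 1.2]])

    probs = torch.softmax(logits, dim=-1)[:, 1]  # watermark-class probability per frame
    cur_prob = float(probs.mean())               # reduce_mode == "avg"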