Skip to content
Open
Show file tree
Hide file tree
Changes from 128 commits
Commits
Show all changes
154 commits
Select commit Hold shift + click to select a range
bd5e115
docs and utility code
cyruszhang Jul 16, 2025
db3f470
add checkpointing strategy support
cyruszhang Jul 23, 2025
d6aa8c3
use enum for strategy; use every_ops for default
cyruszhang Jul 23, 2025
98eba4b
support event_log and checkpoint directories, with proper naming and …
cyruszhang Jul 23, 2025
8668ae9
add ray_partitioned mode in process_data
cyruszhang Jul 24, 2025
d7a1275
add necessary configs for partition/checkpoint
cyruszhang Jul 24, 2025
86c6522
update event_loggin_mixin for proper formatting
cyruszhang Jul 24, 2025
20e146c
add README.md
cyruszhang Jul 24, 2025
17e0d2c
update demo yaml
cyruszhang Jul 24, 2025
c277435
add parition and intermediate_storage related config logic
cyruszhang Jul 24, 2025
20b1c06
remove export_shard_size for single file output
cyruszhang Jul 24, 2025
bda3033
fix export/logging logic in PartitionedRayExecutor
cyruszhang Jul 24, 2025
f98d7f8
add atuo partition size logic; ignore F541
cyruszhang Jul 24, 2025
116ff0e
switch back to ray_partitioned mode
cyruszhang Jul 24, 2025
0c9373a
Update data_juicer/core/executor/partition_size_optimizer.py
cyruszhang Jul 24, 2025
552527d
remove duplcate code
cyruszhang Jul 24, 2025
9d20177
rename demo config
cyruszhang Jul 24, 2025
488126c
add partition_dir; resolution logic of job_id and work_dir
cyruszhang Jul 24, 2025
3796c20
consolidate direcotry resolution logic
cyruszhang Jul 25, 2025
6685a3b
add demo code; consolidate docs
cyruszhang Jul 25, 2025
8b7d0be
restore accidentally deleted images
cyruszhang Jul 25, 2025
56e562b
use every_op for checkpointing
cyruszhang Jul 25, 2025
ab3a27e
use every_op for checkpointing
cyruszhang Jul 25, 2025
e5be57a
complete event logs
cyruszhang Jul 25, 2025
b7e731c
add utility for counting different file formats; for debugging
cyruszhang Jul 26, 2025
6cfcbf9
add support for tallying directories
cyruszhang Jul 26, 2025
d51e7d8
bugfix: handle last partition
cyruszhang Jul 26, 2025
d21b690
fix parquet writing error during re-partitioning
cyruszhang Jul 28, 2025
ad0ecd5
make arrow intermediate work
cyruszhang Jul 28, 2025
f8d5138
parquet intermediate storage size fix; get rid of fallback config logic
cyruszhang Jul 28, 2025
2930f44
defaults to 10K for parquet file size
cyruszhang Jul 28, 2025
2da9183
remove dead arrow configs; use compression
cyruszhang Jul 28, 2025
3d92ad7
add job monitoring utility
cyruszhang Jul 29, 2025
f970d1d
update count_rows utility; auto detect between directory or file
cyruszhang Jul 29, 2025
5796eaf
add monitor and stop; update event log and executor for proper event …
cyruszhang Jul 29, 2025
4b40898
update README for job monitor/stopper
cyruszhang Jul 29, 2025
4b86ce1
remove info/warn from event logs
cyruszhang Jul 30, 2025
71ddd15
remove performance/resources related event logs
cyruszhang Jul 30, 2025
205ed70
backup config logic; don't resume when config don't match
cyruszhang Jul 30, 2025
cc18176
no resumption without job_id arg
cyruszhang Jul 30, 2025
f70aca7
add re-partition log entries
cyruszhang Jul 30, 2025
5e8526e
fix event_id issue
cyruszhang Jul 31, 2025
f076708
fix: jsonl with .json ext
cyruszhang Jul 31, 2025
fdf692f
use ray_exporter for data exporting
cyruszhang Jul 31, 2025
2ecdf02
bugfix: tracer related config
cyruszhang Jul 31, 2025
bd55dba
proper handling of already done job
cyruszhang Jul 31, 2025
474ebf9
add ast support
cyruszhang Aug 1, 2025
98cc115
add dag support
cyruszhang Aug 1, 2025
83ae339
enforce job_id as last part of work path
cyruszhang Aug 4, 2025
c923357
add test case for dir resolutions
cyruszhang Aug 4, 2025
089499c
dag_execution_plan location fix
cyruszhang Aug 4, 2025
b9f7e5e
dag related events
cyruszhang Aug 4, 2025
4365f06
enable DAG in ray_executor_partitioned; update partition sizing logic
cyruszhang Aug 4, 2025
bc72ba5
enable dag and event pipeline in DefaultExecutor
cyruszhang Aug 4, 2025
e2b5720
enable DAG/event-logging in ray executor
cyruszhang Aug 4, 2025
b276c90
add test for dag
cyruszhang Aug 4, 2025
bd849bb
bugfix: missing checkpoints in manual mode
cyruszhang Aug 4, 2025
7a350bc
update config
cyruszhang Aug 4, 2025
0938676
add restart related events
cyruszhang Aug 5, 2025
39c76bb
remove enable_fault_tolerance config
cyruszhang Aug 8, 2025
30b8122
remove retries and backoff strategy configs
cyruszhang Aug 8, 2025
a2ea06e
visualize dj job
cyruszhang Aug 8, 2025
9889cab
complete resource awareness logic; fix logs and events
cyruszhang Aug 9, 2025
1a5209b
remove visualization
cyruszhang Aug 9, 2025
8491be4
add job snapshot utility
cyruszhang Aug 9, 2025
4bed148
update documentations and demos
cyruszhang Aug 9, 2025
ba11d08
update readme
cyruszhang Aug 10, 2025
2c556c0
use ray_dataset.take instead of to_pandas.to_dict for more efficiency
cyruszhang Sep 2, 2025
7b82c13
bugfix: process by ops and proper checkpointing
cyruszhang Sep 2, 2025
0b08bcd
use ray dataset repartitioning
cyruszhang Sep 2, 2025
4a2b0bf
update config for partition and resource_optimization
cyruszhang Sep 5, 2025
a7fb5e2
handle config logic
cyruszhang Sep 5, 2025
414ee63
use split for partitioning data; support size_in_mb and rows options …
cyruszhang Sep 5, 2025
d483245
more changes
cyruszhang Sep 15, 2025
b977017
temp fix for filter_with_union_find stuck
cyruszhang Sep 16, 2025
a5579f1
Add materialize prior to global op convergence to avoid high backpres…
cyruszhang Sep 16, 2025
5c33e26
remove final checkpoint
cyruszhang Sep 16, 2025
aed499d
back to barebone for deadlock issue resolution
cyruszhang Sep 17, 2025
e283461
checkpointing support with auto-resumption
cyruszhang Sep 17, 2025
4720be6
job resumption logic
cyruszhang Sep 17, 2025
fb29fa7
add manual/auto partition sizing mode; fix subtle bug with job resump…
cyruszhang Sep 17, 2025
e195d98
merge master
cyruszhang Sep 18, 2025
469f10a
fix config bug
cyruszhang Sep 18, 2025
24463e2
re-enable DAG
cyruszhang Sep 18, 2025
40bcba2
add tests
cyruszhang Sep 18, 2025
2742b24
enable DAG monitoring
cyruszhang Sep 18, 2025
1f6ae68
temp directory cleanup
cyruszhang Sep 19, 2025
3ede5d7
temp directory logic
cyruszhang Sep 22, 2025
ff09274
Merge branch 'main' into feat/cyrusz/partition
cyruszhang Sep 22, 2025
939f5c4
fix config for artifically introduced namespace fields
cyruszhang Oct 1, 2025
b055cf1
fix factory return types
cyruszhang Oct 2, 2025
3e28558
Merge branch 'main' into feat/cyrusz/partition
cyruszhang Oct 24, 2025
1f5cd3f
add perf_bench_data to gitignore
cyruszhang Oct 30, 2025
3e2e7ed
DataJuicer -> Data-Juicer in docs
cyruszhang Oct 30, 2025
8cf3dfb
job_id logic fix up
cyruszhang Oct 30, 2025
ad19a31
use work_dir instead of job_dir
cyruszhang Nov 3, 2025
071eeb9
add .env to .gitignore
cyruszhang Nov 10, 2025
c9acb27
remove unnecessary version
cyruszhang Nov 10, 2025
54cb96c
fix test file naming; remove unused logger
cyruszhang Nov 10, 2025
1946cd7
support get_dag_node_id
cyruszhang Nov 10, 2025
fe9f6d8
Merge branch 'main' into feat/cyrusz/partition
cyruszhang Nov 12, 2025
4779678
merge main
cyruszhang Nov 20, 2025
76744d6
get rid of obsolete diagram generation script
cyruszhang Nov 20, 2025
4e96910
fix config related test cases and bugs
cyruszhang Nov 20, 2025
0b4d6cb
use ExecutorBase
cyruszhang Nov 20, 2025
a3ffa32
remove adapter from ray executor
cyruszhang Nov 20, 2025
2b9049d
consolidate op dependency
cyruszhang Nov 21, 2025
37a04a7
consolidate get_node_status
cyruszhang Nov 21, 2025
dc818ad
remove duplicate tracer code
cyruszhang Nov 21, 2025
35ab302
ckpt_dir duplicate
cyruszhang Nov 21, 2025
9b04891
refactor checkpoint manager
cyruszhang Nov 21, 2025
e3bb2a2
handle dataset/datasetpath
cyruszhang Nov 21, 2025
42ca4c7
import ray only when necessary
cyruszhang Nov 21, 2025
9ec1e67
consolidate logging logic into mixin
cyruszhang Nov 21, 2025
0022f2f
remove AST logic
cyruszhang Nov 21, 2025
9b86c9f
Merge branch 'main' into feat/cyrusz/partition
cyruszhang Nov 21, 2025
5bd0e36
return proper dataset
cyruszhang Nov 21, 2025
f7c2f44
proper handling of returned datasets; enclose with logging
cyruszhang Nov 21, 2025
60ad34b
use proper keys
cyruszhang Nov 21, 2025
4987c08
update rayexporter to support s3 and extra configs
cyruszhang Nov 21, 2025
afd4dbb
remove duplicate code
cyruszhang Nov 21, 2025
d74e34f
fix is_global_operation
cyruszhang Nov 21, 2025
4bf01a7
move documentation
cyruszhang Nov 21, 2025
27b5157
run demo from root
cyruszhang Nov 21, 2025
5129e92
log -> logs
cyruszhang Nov 21, 2025
e40665b
remove AST tests
cyruszhang Nov 22, 2025
9ca5ba5
update control config
cyruszhang Nov 22, 2025
91fbaae
adopt user defined dirs
cyruszhang Nov 22, 2025
9cf179d
merge main
cyruszhang Jan 6, 2026
e227a8d
fix: align job utilities with timestamped event logs and improve CLI …
cyruszhang Jan 6, 2026
550b3f1
Refactored PartitionedDAGStrategy
cyruszhang Jan 6, 2026
b5fba7e
remove old global op logic in DAG
cyruszhang Jan 6, 2026
fbb467a
Replaced all hasattr() checks with getattr() patterns across DAG files
cyruszhang Jan 6, 2026
34dc5dd
refactor(optimizer): improve partition size optimizer accuracy and fl…
cyruszhang Jan 6, 2026
a59a1d3
fix: Use RayDataset.get() for sampling instead of take()
cyruszhang Jan 6, 2026
de855cb
fix DAG initialization order; fix metrics issue for partitions
cyruszhang Jan 7, 2026
8f547c5
refactor: Clean up metrics logging to remove meaningless zeros
cyruszhang Jan 7, 2026
cf85aff
fix: Only show row counts for first/last ops in checkpoint group
cyruszhang Jan 7, 2026
ea2bd4d
Fix: Add real metrics tracking to non-partitioned RayExecutor; Update…
cyruszhang Jan 7, 2026
ee5e75d
fix: use .get() for optional metrics in DAG monitoring
cyruszhang Jan 7, 2026
c7c2a34
Feat: Add configurable partition target size
cyruszhang Jan 7, 2026
ce379e2
minor: move demo configs
cyruszhang Jan 7, 2026
d37c35c
Update test_ray_executor_partitioned.py for current API
cyruszhang Jan 7, 2026
d1db0f5
fix hatch_build.py for macos compatibility
cyruszhang Jan 7, 2026
6e3d1a8
Fix: Update test_dag.py to use existing operators
cyruszhang Jan 7, 2026
837d8ad
Fix: Handle empty datasets in Ray processing and export
cyruszhang Jan 7, 2026
0316659
graceful failing when EGL lib not available
cyruszhang Jan 8, 2026
8d65029
merge main
cyruszhang Jan 12, 2026
cb76a43
Merge branch 'main' into feat/cyrusz/partition
cyruszhang Jan 14, 2026
12a789c
Merge branch 'main' into feat/cyrusz/partition
cyruszhang Jan 15, 2026
3a0ff19
add pytest config
cyruszhang Jan 15, 2026
97837dc
add boto3 lib lazy loading details
cyruszhang Jan 15, 2026
53a3e95
add cuda tagging for test
cyruszhang Jan 15, 2026
d391c33
merge master
cyruszhang Jan 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ Besides, our paper is also updated to [v3](https://arxiv.org/abs/2309.02033).
- [How-to Guide for Developers](docs/DeveloperGuide.md)
- [Distributed Data Processing in Data-Juicer](docs/Distributed.md)
- [Sandbox](docs/Sandbox.md)
- [Job Management & Monitoring](docs/JobManagement.md)
- [Data-Juicer Agent](docs/DJ_agent.md)
- Demos
- [demos](demos/README.md)
Expand All @@ -141,6 +142,10 @@ Besides, our paper is also updated to [v3](https://arxiv.org/abs/2309.02033).
- [Postprocess Tools](tools/postprocess/README.md)
- [Preprocess Tools](tools/preprocess/README.md)
- [Data Scoring](tools/quality_classifier/README.md)
- Job Management & Monitoring
- [Processing Snapshot Utility](data_juicer/utils/job/snapshot.py) - Comprehensive job status analysis with JSON output
- [Job Management Tools](data_juicer/utils/job/) - Monitor and manage Data-Juicer processing jobs
- [Resource-Aware Partitioning](data_juicer/core/executor/partition_size_optimizer.py) - Automatic resource optimization for distributed processing
- Third-party
- [LLM Ecosystems](thirdparty/LLM_ecosystems/README.md)
- [Third-party Model Library](thirdparty/models/README.md)
Expand Down
5 changes: 5 additions & 0 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ Data-Juicer 现采用 AI 自动重写和优化算子的 docstring,并生成详
- [开发者指南](docs/DeveloperGuide_ZH.md)
- [Data-Juicer分布式数据处理](docs/Distributed_ZH.md)
- [沙盒实验室](docs/Sandbox_ZH.md)
- [作业管理与监控](docs/JobManagement_ZH.md)
- [Data-Juicer Agent](docs/DJ_agent_ZH.md)
- Demos
- [演示](demos/README_ZH.md)
Expand All @@ -136,6 +137,10 @@ Data-Juicer 现采用 AI 自动重写和优化算子的 docstring,并生成详
- [后处理工具](tools/postprocess/README_ZH.md)
- [预处理工具](tools/preprocess/README_ZH.md)
- [给数据打分](tools/quality_classifier/README_ZH.md)
- 作业管理与监控
- [处理快照工具](data_juicer/utils/job/snapshot.py) - 提供JSON格式的全面作业状态分析
- [作业管理工具](data_juicer/utils/job/) - 监控和管理Data-Juicer处理作业
- [资源感知分区](data_juicer/core/executor/partition_size_optimizer.py) - 分布式处理的自动资源优化
- 第三方
- [大语言模型生态](thirdparty/LLM_ecosystems/README_ZH.md)
- [第三方模型库](thirdparty/models/README_ZH.md)
Expand Down
89 changes: 89 additions & 0 deletions configs/demo/partition-checkpoint-eventlog-control.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# =============================================================================
# CONTROL CONFIG FOR partition-checkpoint-eventlog.yaml
# =============================================================================
# This is a control configuration file for partition-checkpoint-eventlog.yaml
# that uses the non-partitioned Ray executor (executor_type: "ray") instead of
# the partitioned executor (executor_type: "ray_partitioned").
#
# This config is useful for:
# 1. Comparing performance between partitioned and non-partitioned executors
# 2. Testing DAG execution without partitioning
# 3. Simpler execution flow without partition management
#
# Key differences from partition-checkpoint-eventlog.yaml:
# - executor_type: "ray" (instead of "ray_partitioned")
# - No partition configuration needed
# - Simpler execution model (no partition splitting/merging)
# =============================================================================

dataset_path: './demos/data/demo-dataset.jsonl'

work_dir: "./outputs/partition-checkpoint-eventlog/{job_id}"
export_path: '{work_dir}/processed.jsonl'
np: 8

executor_type: "ray" # Non-partitioned Ray executor (control config)
ray_address: "auto"

# Process pipeline with real DataJuicer operations
process:
# Text cleaning operations
- clean_links_mapper:
text_key: "text"
min_links: 0
max_links: 10

- clean_email_mapper:
text_key: "text"
min_emails: 0
max_emails: 5

- whitespace_normalization_mapper:
text_key: "text"

- fix_unicode_mapper:
text_key: "text"

# Text filtering operations
- text_length_filter:
text_key: "text"
min_len: 5
max_len: 10000

- alphanumeric_filter:
text_key: "text"
min_ratio: 0.1

# Quality filtering
- character_repetition_filter:
text_key: "text"
min_ratio: 0.0
max_ratio: 0.5

- word_repetition_filter:
text_key: "text"
min_ratio: 0.0
max_ratio: 0.5

- ray_bts_minhash_deduplicator:
tokenization: 'character'
lowercase: true
union_find_parallel_num: 2

# Export configuration
export_in_parallel: true
keep_stats_in_res_ds: true
keep_hashes_in_res_ds: true

# =============================================================================
# USAGE:
# =============================================================================
# This control config uses the non-partitioned Ray executor for comparison.
# To use this config:
#
# dj-process --config configs/demo/partition-checkpoint-eventlog-control.yaml
#
# For the partitioned executor version, use:
# dj-process --config configs/demo/partition-checkpoint-eventlog.yaml
#
# =============================================================================
155 changes: 155 additions & 0 deletions configs/demo/partition-checkpoint-eventlog.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# =============================================================================
# COMPREHENSIVE DATAJUICER DEMO: Checkpointing, Event Logging & Job Management
# =============================================================================
# This demo showcases:
# 1. Configurable checkpointing strategies
# 2. Event logging with job-specific directories
# 3. Flexible storage architecture
# 4. Job resumption capabilities
# 5. Real DataJuicer operations
# =============================================================================

# Data location configuration (Mandatory)
dataset_path: './demos/data/demo-dataset.jsonl'

# Work directory configuration
# IMPORTANT: If using {job_id} placeholder, it MUST be the last part of the path
# Examples:
# ✅ work_dir: "./outputs/my_project/{job_id}" # Valid
# ✅ work_dir: "/data/experiments/{job_id}" # Valid
# ❌ work_dir: "./outputs/{job_id}/results" # Invalid - {job_id} not at end
# ❌ work_dir: "./{job_id}/outputs/data" # Invalid - {job_id} not at end
#
# If no {job_id} is specified, job_id will be automatically appended:
# work_dir: "./outputs/my_project" → job_dir: "./outputs/my_project/20250804_143022_abc123"
work_dir: "./outputs/partition-checkpoint-eventlog/{job_id}"
export_path: '{work_dir}/processed.jsonl'

# Executor configuration
executor_type: "ray_partitioned" # Use our enhanced partitioned executor
ray_address: "auto"
# np will be auto-configured based on available cluster resources when partition.auto_configure: true
# np: 2 # Number of Ray workers (auto-configured when partition.auto_configure: true)

# Separate storage configuration
# Partition directory (Optional) is used to store the partitions of the dataset if using ray_partitioned executor
partition_dir: "{work_dir}/partitions"

# Event logs: Fast storage (SSD, local disk) - small files, frequent writes (Optional)
event_log_dir: "{work_dir}/event_logs" # Optional: separate fast storage for event logs

# Checkpoints: Large storage (HDD, network storage) - large files, infrequent writes (Optional)
checkpoint_dir: "{work_dir}/checkpoints" # Optional: separate large storage for checkpoints


# Partition configuration
partition:
mode: "manual" # Auto partition mode - optimal partitioning
num_of_partitions: 4 # Number of partitions to create


# Checkpoint configuration
checkpoint:
enabled: false
strategy: "every_n_ops"
n_ops: 3
# strategy: "every_op" # every_op, every_partition, every_n_ops, manual, disabled
# n_ops: 1 # Number of operations between checkpoints (for every_n_ops strategy)
# op_names: [] # Specific operation names to checkpoint after (for manual strategy)

# Intermediate storage configuration (includes file lifecycle management)
intermediate_storage:
format: "parquet" # parquet, arrow, jsonl; defaults to parquet
write_partitions: false

# Event logging configuration
event_logging:
enabled: true

# Process pipeline with real DataJuicer operations
process:
# Text cleaning operations
- clean_links_mapper:
text_key: "text"
min_links: 0
max_links: 10

- clean_email_mapper:
text_key: "text"
min_emails: 0
max_emails: 5

- whitespace_normalization_mapper:
text_key: "text"

- fix_unicode_mapper:
text_key: "text"

# Text filtering operations
- text_length_filter:
text_key: "text"
min_len: 5
max_len: 10000

- alphanumeric_filter:
text_key: "text"
min_ratio: 0.1

# Quality filtering
- character_repetition_filter:
text_key: "text"
min_ratio: 0.0
max_ratio: 0.5

- word_repetition_filter:
text_key: "text"
min_ratio: 0.0
max_ratio: 0.5

- ray_bts_minhash_deduplicator:
tokenization: 'character'
lowercase: true
union_find_parallel_num: 2

# Export configuration
export_in_parallel: true
keep_stats_in_res_ds: true
keep_hashes_in_res_ds: true


# =============================================================================
# COMPLETE USER EXPERIENCE:
# =============================================================================
# 1. Start job:
# dj-process --config configs/demo/partition-checkpoint-eventlog.yaml
# # Output shows: Job ID (timestamp_configname_suffix), job directory, resumption command
# # Example: 20241201_143022_partition-checkpoint-eventlog_abc123
#
# 2. If job fails, resume with:
# dj-process --config configs/demo/partition-checkpoint-eventlog.yaml --job_id <job_id>
# # System validates job_id and shows previous status
#
# 3. Directory structure (flexible storage):
# outputs/partition-checkpoint-eventlog/{job_id}/
# ├── partitions/ # Dataset partitions (large files)
# ├── checkpoints/ # Operation checkpoints (large files)
# ├── event_logs/ # Event logs (small files, frequent writes)
# ├── metadata/ # Job metadata and mapping
# ├── results/ # Final processed dataset
# └── processed.jsonl # Final output file
#
# 4. Resource Optimization:
# - partition.mode: "auto" automatically optimizes:
# * Partition size based on data characteristics and available memory
# * Number of partitions based on dataset size and optimal partition size
# * Worker count (np) based on available CPU cores
# * Processing efficiency based on data modality (text, image, audio, video)
# - No manual tuning required - system adapts to your hardware and data
#
# 5. Monitoring and Debugging:
# - Real-time event logs in event_logs/ directory
# - Processing summary with statistics and timing
# - Checkpoint recovery for fault tolerance
# - Detailed resource utilization analysis
#
# =============================================================================
6 changes: 6 additions & 0 deletions data_juicer/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
merge_config,
prepare_cfgs_for_export,
prepare_side_configs,
resolve_job_directories,
resolve_job_id,
update_op_attr,
validate_work_dir_config,
)

__all__ = [
Expand All @@ -18,4 +21,7 @@
"get_default_cfg",
"prepare_cfgs_for_export",
"update_op_attr",
"validate_work_dir_config",
"resolve_job_id",
"resolve_job_directories",
]
Loading
Loading