Commit

Refactor OP & Dataset (from #336) (#359)
* Refactor OP & Dataset (#336)

* modelscope-sora news (#323)

* News/modelscope sora (#327)

* modelscope-sora news

* remove empower

* debug for gpu rank for analyser (#329)

* debug for gpu rank for analyser

* spec_numprocs -> num_proc

* Add more unittests (#304)

* add unittest env with gpu

* fix unittest yml

* add environment for unittest

* update workflow trigger

* update install step

* fix install command

* update working dir

* update container

* update working dir

* change working directory

* change working directory

* change working directory

* change working directory

* change unittest

* use test tag

* finish tag support

* support run op with different executor

* fix pre-commit

* add hf mirror

* add hf mirror

* run all tests in standalone mode by default

* ignore image face ratio

* update tags

* add ray testcase

* add ray test in workflow

* update ray unittest workflow

* delete old unittest

---------

Co-authored-by: root <panxuchen>

* Add source tag (#317)

* add source tag for some mapper op

* fix no attribute 'current_tag' when executing local tests

* move op process logic from executor to base op

* fix typo

* move export outside op

* init refactor

* update analyser

* fix format

* clean up

* bring back batch mapper

* Improve fault tolerance & Fix Ray executor

* fix wrapper

* fix batched filter

* Remove use_actor as it is not compatible with the refactored OP class, unless the dataset class is refactored

* make wrappers work with unittests

* Compatible with unit tests and works with ray

* fix unittest

* fix wrappers with ray, map, filter

* unify unittests

* wrap deduplicators

* Compatible with non-batched calls

* Class-level wrappers

- compatible with dataset.filter
- bring back nested wrappers

* Instance-level wrappers

* Refined instance-level wrappers

- Remove incomplete dataset.filter wrappers
- Simplify code
- Stack wrappers

* fix use_cuda

* Refactor dataset (#348)

* refactor dataset

* update unittest with DJDataset

* fix unittest

* update ray data load

* add test

* ray read json

* update docker image version

* actor is no longer supported

* Regress filter's stats export logic

---------

Co-authored-by: BeachWang <[email protected]>
Co-authored-by: Xuchen Pan <[email protected]>
Co-authored-by: chenhesen <[email protected]>
Co-authored-by: garyzhang99 <[email protected]>

* minor fix

* fix num_proc default None

---------

Co-authored-by: Ce Ge (戈策) <[email protected]>
Co-authored-by: BeachWang <[email protected]>
Co-authored-by: Xuchen Pan <[email protected]>
Co-authored-by: chenhesen <[email protected]>
Co-authored-by: garyzhang99 <[email protected]>
Co-authored-by: null <[email protected]>
7 people authored Jul 18, 2024
1 parent 166b4ed commit 9f97231
Showing 96 changed files with 642 additions and 530 deletions.
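
The thrust of the refactor, visible in the diffs below, is that the executor no longer branches on operator type; each OP applies itself, and the dataset drives the loop through `dataset.process(ops)`. A minimal, hypothetical sketch of that control flow follows (`SimpleDataset` and `UppercaseOp` are invented stand-ins, not data-juicer APIs):

from time import time


class SimpleDataset:
    """Toy stand-in for the NestedDataset.process() shown in data.py below."""

    def __init__(self, samples):
        self.samples = samples

    def __len__(self):
        return len(self.samples)

    def process(self, operators, *, exporter=None, checkpointer=None,
                tracer=None):
        if operators is None:
            return self
        if not isinstance(operators, list):
            operators = [operators]
        dataset = self
        for op in operators:
            start = time()
            dataset = op(dataset)  # each OP applies itself
            if checkpointer is not None:
                checkpointer.record(op.name)
            print(f'OP [{op.name}] done in {time() - start:.3f}s. '
                  f'Left {len(dataset)} samples.')
        return dataset


class UppercaseOp:
    """Invented toy OP; real OPs subclass Mapper/Filter and are callable."""

    name = 'uppercase_mapper'

    def __call__(self, dataset):
        dataset.samples = [s.upper() for s in dataset.samples]
        return dataset


if __name__ == '__main__':
    ds = SimpleDataset(['hello', 'world']).process([UppercaseOp()])
    print(ds.samples)  # ['HELLO', 'WORLD']
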
4 changes: 2 additions & 2 deletions .github/workflows/docker/docker-compose.yml
@@ -1,7 +1,7 @@
version: '3'
services:
ray-head:
image: data-juicer-unittest:0.2.0
image: data-juicer-unittest:0.2.1
pull_policy: never
command: ray start --head --dashboard-host 0.0.0.0 --include-dashboard true --block
environment:
@@ -30,7 +30,7 @@ services:
capabilities: [gpu]

ray-worker:
image: data-juicer-unittest:0.2.0
image: data-juicer-unittest:0.2.1
pull_policy: never
command: ray start --address=ray-head:6379 --block
environment:
2 changes: 0 additions & 2 deletions README_ZH.md
@@ -38,11 +38,9 @@ Data-Juicer is being actively updated and maintained; we will periodically enhance and add more
- [2024-02-20] We are actively maintaining an *awesome list* of LLM-Data resources; you are welcome to [visit](docs/awesome_llm_data.md) it and contribute!
- [2024-02-05] Our paper was accepted by the SIGMOD'24 industrial track!
- [2024-01-10] A new horizon for "data mixing": the second Data-Juicer LLM Data Challenge has officially started! Visit the [competition website](https://tianchi.aliyun.com/competition/entrance/532174) for details.

- [2024-01-05] **Data-Juicer v0.1.3** has been released.
In this new version, we support **more Python versions** (3.8-3.10) as well as the [conversion](tools/multimodal/README_ZH.md) and [processing](docs/Operators_ZH.md) of **multimodal** datasets (including text, image, and audio; more modalities will be supported later)!
Our paper has also been updated to the [third version](https://arxiv.org/abs/2309.02033).

- [2023-10-13] Our first data-centric LLM competition has started!
Please visit the competition website, FT-Data Ranker ([1B track](https://tianchi.aliyun.com/competition/entrance/532157), [7B track](https://tianchi.aliyun.com/competition/entrance/532158)), for more information.

36 changes: 6 additions & 30 deletions data_juicer/core/analyzer.py
@@ -5,12 +5,9 @@
from data_juicer.analysis import ColumnWiseAnalysis, OverallAnalysis
from data_juicer.config import init_configs
from data_juicer.format import load_formatter
from data_juicer.ops import UNFORKABLE, Filter, load_ops
from data_juicer.ops import Filter, load_ops
from data_juicer.utils import cache_utils
from data_juicer.utils.constant import Fields
from data_juicer.utils.process_utils import calculate_np, setup_mp

from .data import add_same_content_to_new_column
from .exporter import Exporter


@@ -84,37 +81,16 @@ def run(self, load_data_np=None, skip_export=False):
logger.info('Preparing process operators...')
self.cfg.process, self.ops = load_ops(self.cfg.process,
self.cfg.op_fusion)
unforkable_op_list = set(UNFORKABLE.modules.keys())

# 2. stats precompute only for filter ops
logger.info('Computing the stats of dataset...')
stats_collected = False
for op_cfg, op in zip(self.cfg.process, self.ops):
op_name = list(op_cfg.keys())[0]
with_rank = op.use_cuda()
if op.num_proc != 0:
op_proc = op.num_proc
logger.info(f'Op [{op_name}] running with sepcified '
f'number of procs:{op.num_proc}')
else:
op_proc = calculate_np(self.cfg.np, op, op_name)
mp_method = ['forkserver', 'spawn'
] if op_name in unforkable_op_list else None
setup_mp(mp_method)
for op in self.ops:
if isinstance(op, Filter):
if Fields.stats not in dataset.features:
# only add stats when calling filter op
dataset = dataset.map(add_same_content_to_new_column,
fn_kwargs={
'new_column_name': Fields.stats,
'initial_value': {}
},
num_proc=self.cfg.np,
desc='Adding new column for stats')
dataset = dataset.map(op.compute_stats,
num_proc=op_proc,
with_rank=with_rank,
desc=op_name + '_compute_stats')
original_process = op.process
op.process = None
dataset = dataset.process(op)
op.process = original_process
stats_collected = True
if not stats_collected:
logger.warning('No stats collected. Please add some Filter ops to '
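
With this change, the Analyzer reuses the same `dataset.process(op)` path but temporarily clears `op.process` so that only `compute_stats` runs for each Filter. A hedged sketch of that swap is below; the try/finally is an addition for safety, while the hunk above restores the attribute directly:

def compute_stats_only(dataset, op):
    """Run only the stats-computation step of a Filter-style OP.

    Assumes `dataset` exposes process(op) and `op` carries compute_stats and
    process, as in the Analyzer hunk above.
    """
    original_process = op.process
    try:
        op.process = None              # disable the filtering step
        dataset = dataset.process(op)  # only compute_stats is executed
    finally:
        op.process = original_process  # restore the OP for later reuse
    return dataset
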
73 changes: 64 additions & 9 deletions data_juicer/core/data.py
@@ -1,17 +1,38 @@
from __future__ import annotations

import copy
import inspect
from abc import ABC, abstractmethod
from functools import wraps
from time import time
from typing import Union

from datasets import Dataset, DatasetDict, is_caching_enabled
from datasets.formatting.formatting import LazyBatch
from loguru import logger

from data_juicer.ops import UNFORKABLE
from data_juicer.utils import cache_utils
from data_juicer.utils.compress import (CompressionOff,
cleanup_compressed_cache_files,
compress, decompress)
from data_juicer.utils.fingerprint_utils import generate_fingerprint
from data_juicer.utils.process_utils import setup_mp


class DJDataset(ABC):
"""Base dataset of DJ"""

@abstractmethod
def process(
self,
operators, # TODO: add type hint
*,
exporter=None,
checkpointer=None,
tracer=None) -> DJDataset:
"""process a list of operators on the dataset."""
pass


def wrap_func_with_nested_access(f):
@@ -116,7 +137,7 @@ def map(self, **args):
return super().map(**args)


class NestedDataset(Dataset):
class NestedDataset(Dataset, DJDataset):
"""Enhanced HuggingFace-Dataset for better usability and efficiency."""

def __init__(self, *args, **kargs):
@@ -139,6 +160,40 @@ def __getitem__(self, key):
res = super().__getitem__(key)
return nested_obj_factory(res)

def process(self,
operators,
*,
exporter=None,
checkpointer=None,
tracer=None):
if operators is None:
return self

if not isinstance(operators, list):
operators = [operators]
unforkable_operators = set(UNFORKABLE.modules.keys())

dataset = self
for op in operators:
mp_context = ['forkserver', 'spawn'] if (
op.use_cuda() or op._name in unforkable_operators) else None
setup_mp(mp_context)

start = time()
# run single op
dataset = op(dataset,
exporter=exporter,
checkpointer=checkpointer,
tracer=tracer)
# record processed ops
if checkpointer is not None:
checkpointer.record(op._name,
list(op._process_kwargs.values())[0])
end = time()
logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. '
f'Left {len(dataset)} samples.')
return dataset

def map(self, *args, **kargs):
"""Override the map func, which is called by most common operations,
such that the processed samples can be accessed by nested manner."""
@@ -158,16 +213,16 @@ def map(self, *args, **kargs):
kargs['function'])
called_func = kargs['function']

# For wrapped function, try to get its original unwrapped method
while hasattr(called_func, '__wrapped__'):
# For wrapped function, try to get its unwrapped (bound) method
while not inspect.ismethod(called_func) and hasattr(
called_func, '__wrapped__'):
called_func = called_func.__wrapped__
# Does the called function belong to a batched OP?
if inspect.ismethod(called_func) \
and 'is_batched_op' in dir(called_func.__self__) \
and callable(getattr(called_func.__self__, 'is_batched_op')) \
and called_func.__self__.is_batched_op():

# Batched is always required for fault tolerance
if inspect.ismethod(called_func):
kargs['batched'] = True
kargs['batch_size'] = 1
kargs['batch_size'] = kargs.pop(
'batch_size', 1) if called_func.__self__.is_batched_op() else 1

if 'new_fingerprint' not in kargs or kargs['new_fingerprint'] is None:
new_fingerprint = generate_fingerprint(self, *args, **kargs)
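
`NestedDataset.process` now selects the multiprocessing start method per OP: `forkserver` or `spawn` when the OP uses CUDA or is registered as unforkable, and the platform default otherwise. A standalone illustration of that choice, using only the standard library (data-juicer's `setup_mp` helper and the real OP attributes are not reproduced here):

import multiprocessing as mp


def pick_context(uses_cuda, unforkable):
    """Mirror the ['forkserver', 'spawn'] preference from the hunk above."""
    if uses_cuda or unforkable:
        for method in ('forkserver', 'spawn'):
            if method in mp.get_all_start_methods():
                return mp.get_context(method)
    return mp.get_context()  # platform default (fork on Linux)


def square(x):
    return x * x


if __name__ == '__main__':
    # CUDA handles and some library state do not survive fork(), so a CUDA OP
    # gets a forkserver/spawn pool; a plain OP keeps the cheaper default.
    ctx = pick_context(uses_cuda=True, unforkable=False)
    with ctx.Pool(processes=2) as pool:
        print(pool.map(square, [1, 2, 3, 4]))  # [1, 4, 9, 16]
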
113 changes: 10 additions & 103 deletions data_juicer/core/executor.py
@@ -1,4 +1,5 @@
import os
import traceback
from time import time

from loguru import logger
@@ -7,18 +8,14 @@
from data_juicer.core.data import Dataset
from data_juicer.format.load import load_formatter
from data_juicer.format.mixture_formatter import MixtureFormatter
from data_juicer.ops import (OPERATORS, UNFORKABLE, Deduplicator, Filter,
Mapper, Selector, load_ops)
from data_juicer.ops import OPERATORS, load_ops
from data_juicer.utils import cache_utils
from data_juicer.utils.ckpt_utils import CheckpointManager
from data_juicer.utils.constant import Fields
from data_juicer.utils.process_utils import calculate_np, setup_mp

from ..ops.selector.frequency_specified_field_selector import \
FrequencySpecifiedFieldSelector
from ..ops.selector.topk_specified_field_selector import \
TopkSpecifiedFieldSelector
from .data import add_same_content_to_new_column
from .exporter import Exporter
from .tracer import Tracer

@@ -42,6 +39,8 @@ def __init__(self, cfg=None):
self.work_dir = self.cfg.work_dir

self.ops = None
self.tracer = None
self.ckpt_manager = None

# only enable it when using cache
if self.cfg.use_cache:
@@ -158,109 +157,18 @@ def run(self, load_data_np=None):
logger.info('Preparing process operators...')
self.process_list, self.ops = load_ops(self.cfg.process,
self.cfg.op_fusion)
unforkable_op_list = set(UNFORKABLE.modules.keys())

# 3. data process
# - If tracer is open, trace each op after it's processed
# - If checkpoint is open, clean the cache files after each process
logger.info('Processing data...')
start = time()
tstart = start
for op_cfg, op in zip(self.process_list, self.ops):
op_name, op_args = list(op_cfg.items())[0]
prev = dataset # record last dataset
with_rank = op.use_cuda()
if op.use_cuda() or op_name in unforkable_op_list:
setup_mp(['forkserver', 'spawn'])
else:
setup_mp()
if op.num_proc != 0:
op_proc = op.num_proc
logger.info(f'Op [{op_name}] running with sepcified '
f'number of procs:{op.num_proc}')
else:
op_proc = calculate_np(self.cfg.np, op, op_name)
try:
if isinstance(op, Mapper):
tmp = dataset.map(function=op.process,
num_proc=op_proc,
with_rank=with_rank,
desc=op_name + '_process')
if self.open_tracer and \
op_name in self.op_list_to_trace:
if op.is_batched_op():
self.tracer.trace_batch_mapper(
op_name, dataset, tmp, op.text_key)
else:
self.tracer.trace_mapper(op_name, dataset, tmp,
op.text_key)
elif isinstance(op, Filter):
if Fields.stats not in dataset.features:
# only add stats when calling filter op
dataset = dataset.map(
add_same_content_to_new_column,
fn_kwargs={
'new_column_name': Fields.stats,
'initial_value': {}
},
num_proc=self.cfg.np,
desc='Adding new column for stats')
if self.cfg.use_checkpoint:
prev = dataset
dataset = dataset.map(op.compute_stats,
num_proc=op_proc,
with_rank=with_rank,
desc=op_name + '_compute_stats')
if self.cfg.use_checkpoint:
prev = dataset
if op.stats_export_path is not None:
self.exporter.export_compute_stats(
dataset, op.stats_export_path)
tmp = dataset.filter(op.process,
num_proc=self.cfg.np,
desc=op_name + '_process')
if self.open_tracer and op_name in self.op_list_to_trace:
self.tracer.trace_filter(op_name, dataset, tmp)
elif isinstance(op, Selector):
tmp = op.process(dataset)
if self.open_tracer and op_name in self.op_list_to_trace:
self.tracer.trace_filter(op_name, dataset, tmp)
elif isinstance(op, Deduplicator):
dataset = dataset.map(op.compute_hash,
num_proc=op_proc,
with_rank=with_rank,
desc=op_name + '_compute_hash')
if self.cfg.use_checkpoint:
prev = dataset
tmp, dup_pairs = op.process(
dataset, self.tracer.show_num if self.open_tracer
and op_name in self.op_list_to_trace else 0)
if self.open_tracer and op_name in self.op_list_to_trace:
self.tracer.trace_deduplicator(op_name, dup_pairs)
else:
raise NotImplementedError
dataset = tmp
except: # noqa: E722
logger.error(f'An error occurred during Op [{op_name}].')
import traceback
traceback.print_exc()
if self.cfg.use_checkpoint:
logger.info('Writing checkpoint of dataset processed by '
'last op...')
prev.cleanup_cache_files()
self.ckpt_manager.save_ckpt(prev)
exit(1)

# clean up cache files and record processed ops
if self.cfg.use_checkpoint:
self.ckpt_manager.record(op_name, op_args)

end = time()
logger.info(f'Op [{op_name}] Done in {"%.3f" % (end - start)}(s). '
f'Left {len(dataset)} samples.')
start = end
tstart = time()
dataset = dataset.process(self.ops,
exporter=self.exporter,
checkpointer=self.ckpt_manager,
tracer=self.tracer)
tend = time()
logger.info(f'All Ops are done in {"%.3f" % (tend - tstart)}(s).')
logger.info(f'All OPs are done in {tend - tstart:.3f}s.')

# 4. data export
logger.info('Exporting dataset to disk...')
@@ -269,7 +177,6 @@ def run(self, load_data_np=None):
except: # noqa: E722
logger.error('An error occurred during exporting the processed '
'dataset.')
import traceback
traceback.print_exc()
if self.cfg.use_checkpoint:
logger.info('Writing checkpoint of dataset processed by '
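
The long per-type branch removed above (Mapper, Filter, Selector, Deduplicator) collapses into a single `dataset.process(...)` call, and its checkpoint-on-failure behaviour now lives in the OP wrappers. The general pattern the old loop implemented, sketched in isolation (`ckpt_manager` and the callable OPs are assumptions for illustration):

import sys
import traceback

from loguru import logger


def process_with_checkpoint(dataset, ops, ckpt_manager=None):
    """Apply ops in order; on failure, checkpoint the last good dataset."""
    for op in ops:
        prev = dataset  # last dataset known to be good
        name = getattr(op, '_name', op.__class__.__name__)
        try:
            dataset = op(dataset)  # each OP applies itself, as in the new API
        except Exception:
            logger.error(f'An error occurred during Op [{name}].')
            traceback.print_exc()
            if ckpt_manager is not None:
                logger.info('Writing checkpoint of dataset processed by '
                            'last op...')
                ckpt_manager.save_ckpt(prev)
            sys.exit(1)
        if ckpt_manager is not None:
            ckpt_manager.record(name, {})  # the real code records the op args
    return dataset
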
(Diffs for the remaining changed files are not shown.)

