From 7d9265f651ae93e4880ab039682388fb319008a7 Mon Sep 17 00:00:00 2001 From: XinyuYe-Intel Date: Fri, 6 Sep 2024 14:14:27 +0800 Subject: [PATCH] Support rerank model finetuning (#578) * support rerank model finetuning. Signed-off-by: Ye, Xinyu * adapt rerank model to transformers' scheme. Signed-off-by: Ye, Xinyu * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix typo. Signed-off-by: Ye, Xinyu * refined readme. Signed-off-by: Ye, Xinyu * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * modify command due to api change. Signed-off-by: Ye, Xinyu * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Signed-off-by: Ye, Xinyu Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: lkk <33276950+lkk12014402@users.noreply.github.com> --- comps/finetuning/README.md | 49 ++++-- comps/finetuning/finetune_config.py | 7 + .../llm_on_ray/finetune/data_process.py | 52 +++++++ .../llm_on_ray/finetune/finetune.py | 146 +++++++++++------- .../llm_on_ray/finetune/modeling.py | 51 ++++++ 5 files changed, 237 insertions(+), 68 deletions(-) create mode 100644 comps/finetuning/llm_on_ray/finetune/modeling.py diff --git a/comps/finetuning/README.md b/comps/finetuning/README.md index 11dd0d82e..3e25d9957 100644 --- a/comps/finetuning/README.md +++ b/comps/finetuning/README.md @@ -1,6 +1,6 @@ -# LLM Fine-tuning Microservice +# Fine-tuning Microservice -LLM Fine-tuning microservice involves adapting a base model to a specific task or dataset to improve its performance on that task. +Fine-tuning microservice involves adapting a model to a specific task or dataset to improve its performance on that task, we currently supported instruction tuning for LLMs, finetuning for reranking and embedding models. ## 🚀1. Start Microservice with Python (Optional 1) @@ -86,14 +86,22 @@ docker run --runtime=habana -e HABANA_VISIBLE_DEVICES=all -p 8015:8015 -e OMPI_M ## 🚀3. Consume Finetuning Service -### 3.1 Create fine-tuning job +## 3.1 Upload a training file -Assuming a training file `alpaca_data.json` is uploaded, it can be downloaded in [here](https://github.com/tatsu-lab/stanford_alpaca/blob/main/alpaca_data.json), the following script launches a finetuning job using `meta-llama/Llama-2-7b-chat-hf` as base model: +Download a training file, such as `alpaca_data.json` for instruction tuning and upload it to the server with below command, this file can be downloaded in [here](https://github.com/tatsu-lab/stanford_alpaca/blob/main/alpaca_data.json): ```bash # upload a training file curl http://${your_ip}:8015/v1/files -X POST -H "Content-Type: multipart/form-data" -F "file=@./alpaca_data.json" -F purpose="fine-tune" +``` + +For reranking and embedding models finetuning, the training file [toy_finetune_data.jsonl](https://github.com/FlagOpen/FlagEmbedding/blob/master/examples/finetune/toy_finetune_data.jsonl) is an toy example. + +## 3.2 Create fine-tuning job +After a training file like `alpaca_data.json` is uploaded, use the following command to launch a finetuning job using `meta-llama/Llama-2-7b-chat-hf` as base model: + +```bash # create a finetuning job curl http://${your_ip}:8015/v1/fine_tuning/jobs \ -X POST \ @@ -102,22 +110,41 @@ curl http://${your_ip}:8015/v1/fine_tuning/jobs \ "training_file": "alpaca_data.json", "model": "meta-llama/Llama-2-7b-chat-hf" }' +``` +Use the following command to launch a finetuning job for reranking model finetuning, such as `BAAI/bge-reranker-large`: + +```bash +# create a finetuning job +curl http://${your_ip}:8015/v1/fine_tuning/jobs \ + -X POST \ + -H "Content-Type: application/json" \ + -d '{ + "training_file": "toy_finetune_data.jsonl", + "model": "BAAI/bge-reranker-large", + "General":{ + "task":"rerank", + "lora_config":null + } + }' +``` + +## 3.3 Manage fine-tuning job + +Below commands show how to list finetuning jobs, retrieve a finetuning job, cancel a finetuning job and list checkpoints of a finetuning job. + +```bash # list finetuning jobs -curl http://${your_ip}:8015/v1/fine_tuning/jobs -X GET +curl http://${your_ip}:8015/v1/fine_tuning/jobs -X GET # retrieve one finetuning job -curl http://localhost:8015/v1/fine_tuning/jobs/retrieve -X POST -H "Content-Type: application/json" -d '{ - "fine_tuning_job_id": ${fine_tuning_job_id}}' +curl http://localhost:8015/v1/fine_tuning/jobs/retrieve -X POST -H "Content-Type: application/json" -d '{"fine_tuning_job_id": ${fine_tuning_job_id}}' # cancel one finetuning job -curl http://localhost:8015/v1/fine_tuning/jobs/cancel -X POST -H "Content-Type: application/json" -d '{ - "fine_tuning_job_id": ${fine_tuning_job_id}}' +curl http://localhost:8015/v1/fine_tuning/jobs/cancel -X POST -H "Content-Type: application/json" -d '{"fine_tuning_job_id": ${fine_tuning_job_id}}' # list checkpoints of a finetuning job curl http://${your_ip}:8015/v1/finetune/list_checkpoints -X POST -H "Content-Type: application/json" -d '{"fine_tuning_job_id": ${fine_tuning_job_id}}' - - ``` ## 🚀4. Descriptions for Finetuning parameters diff --git a/comps/finetuning/finetune_config.py b/comps/finetuning/finetune_config.py index b070281ff..6271b618d 100644 --- a/comps/finetuning/finetune_config.py +++ b/comps/finetuning/finetune_config.py @@ -48,12 +48,18 @@ class GeneralConfig(BaseModel): config: LoadConfig = LoadConfig() lora_config: Optional[LoraConfig] = LoraConfig() enable_gradient_checkpointing: bool = False + task: str = "instruction_tuning" @validator("report_to") def check_report_to(cls, v: str): assert v in ["none", "tensorboard"] return v + @validator("task") + def check_task(cls, v: str): + assert v in ["instruction_tuning", "rerank", "embedding"] + return v + class DatasetConfig(BaseModel): train_file: str = None @@ -74,6 +80,7 @@ class DatasetConfig(BaseModel): data_preprocess_type: str = "neural_chat" max_train_samples: int = 0 max_eval_samples: int = 0 + train_group_size: int = 8 class RayResourceConfig(BaseModel): diff --git a/comps/finetuning/llm_on_ray/finetune/data_process.py b/comps/finetuning/llm_on_ray/finetune/data_process.py index ab5efcc09..38455e878 100644 --- a/comps/finetuning/llm_on_ray/finetune/data_process.py +++ b/comps/finetuning/llm_on_ray/finetune/data_process.py @@ -4,10 +4,16 @@ # Copyright 2023 The LLM-on-Ray Authors. import copy +import math +import random import re +from dataclasses import dataclass from itertools import chain +from typing import Dict, List, Tuple import torch +from torch.utils.data import Dataset +from transformers import BatchEncoding, DataCollatorWithPadding IGNORE_INDEX = -100 @@ -194,3 +200,49 @@ def tokenize(self, examples): examples["labels"].append(labels) examples["attention_mask"].append(results["attention_mask"]) return examples + + +class TrainDatasetForCE(Dataset): + def __init__(self, dataset, args, tokenizer): + self.dataset = dataset + self.tokenizer = tokenizer + self.args = args + self.total_len = len(self.dataset) + + def create_one_example(self, qry_encoding: str, doc_encoding: str): + item = self.tokenizer.encode_plus( + qry_encoding, + doc_encoding, + truncation=True, + max_length=self.args.get("max_length", 512), + padding=False, + ) + return item + + def __len__(self): + return self.total_len + + def __getitem__(self, item) -> List[BatchEncoding]: + query = self.dataset[item]["query"] + pos = random.choice(self.dataset[item]["pos"]) + train_group_size = self.args.get("train_group_size", 8) + if len(self.dataset[item]["neg"]) < train_group_size - 1: + num = math.ceil((train_group_size - 1) / len(self.dataset[item]["neg"])) + negs = random.sample(self.dataset[item]["neg"] * num, train_group_size - 1) + else: + negs = random.sample(self.dataset[item]["neg"], train_group_size - 1) + + batch_data = [] + batch_data.append(self.create_one_example(query, pos)) + for neg in negs: + batch_data.append(self.create_one_example(query, neg)) + + return batch_data + + +@dataclass +class GroupCollator(DataCollatorWithPadding): + def __call__(self, features) -> Tuple[Dict[str, torch.Tensor], Dict[str, torch.Tensor]]: + if isinstance(features[0], list): + features = sum(features, []) + return super().__call__(features) diff --git a/comps/finetuning/llm_on_ray/finetune/finetune.py b/comps/finetuning/llm_on_ray/finetune/finetune.py index 2ccb20b73..2476e9638 100644 --- a/comps/finetuning/llm_on_ray/finetune/finetune.py +++ b/comps/finetuning/llm_on_ray/finetune/finetune.py @@ -22,11 +22,13 @@ from ray.air import FailureConfig, RunConfig from ray.air.config import ScalingConfig from ray.train.torch import TorchTrainer +from transformers import Trainer, TrainingArguments from comps import CustomLogger from comps.finetuning.finetune_config import FinetuneConfig from comps.finetuning.llm_on_ray import common -from comps.finetuning.llm_on_ray.finetune.data_process import DataProcessor +from comps.finetuning.llm_on_ray.finetune.data_process import DataProcessor, GroupCollator, TrainDatasetForCE +from comps.finetuning.llm_on_ray.finetune.modeling import CrossEncoder logger = CustomLogger("llm_on_ray/finetune") @@ -186,74 +188,106 @@ def local_load(name, **load_config): def tokenize_dataset(config: Dict, tokenizer, dataset): - group = config["Dataset"].get("group", True) - block_size = config["Dataset"].get("block_size", 512) - tokenizer.pad_token = tokenizer.eos_token - - processor = DataProcessor(config, tokenizer) - - for key in dataset: - prompts = processor.make_prompt(dataset[key]) - dataset[key] = datasets.Dataset.from_dict(prompts) - - column_names = list(dataset["train"].features) - tokenize_fn = ( - processor.tokenize_by_neural_chat - if config["Dataset"].get("data_preprocess_type", "") == "neural_chat" - else processor.tokenize - ) - - tokenized_dataset = dataset.map( - tokenize_fn, - remove_columns=column_names, - batched=True, - load_from_cache_file=False, - desc="Tokenize dataset", - ) - - if group: - - def group_texts(examples): - # Concatenate all texts. - concatenated_examples = {k: list(chain(*examples[k])) for k in examples.keys()} - total_length = len(concatenated_examples[list(examples.keys())[0]]) - # We drop the small remainder, we could add padding if the model supported it instead of this drop, you can - # customize this part to your needs. - if total_length >= block_size: - total_length = (total_length // block_size) * block_size - # Split by chunks of max_len. - result = { - k: [t[i : i + block_size] for i in range(0, total_length, block_size)] - for k, t in concatenated_examples.items() - } - return result + task = config["General"].get("task", "instruction_tuning") + if task == "instruction_tuning": + group = config["Dataset"].get("group", True) + block_size = config["Dataset"].get("block_size", 512) + tokenizer.pad_token = tokenizer.eos_token + + processor = DataProcessor(config, tokenizer) + + for key in dataset: + prompts = processor.make_prompt(dataset[key]) + dataset[key] = datasets.Dataset.from_dict(prompts) + + column_names = list(dataset["train"].features) + tokenize_fn = ( + processor.tokenize_by_neural_chat + if config["Dataset"].get("data_preprocess_type", "") == "neural_chat" + else processor.tokenize + ) - tokenized_dataset = tokenized_dataset.map( - group_texts, + tokenized_dataset = dataset.map( + tokenize_fn, + remove_columns=column_names, batched=True, load_from_cache_file=False, - desc=f"Grouping texts in chunks of {block_size}", + desc="Tokenize dataset", ) - return tokenized_dataset + if group: + + def group_texts(examples): + # Concatenate all texts. + concatenated_examples = {k: list(chain(*examples[k])) for k in examples.keys()} + total_length = len(concatenated_examples[list(examples.keys())[0]]) + # We drop the small remainder, we could add padding if the model supported it instead of this drop, you can + # customize this part to your needs. + if total_length >= block_size: + total_length = (total_length // block_size) * block_size + # Split by chunks of max_len. + result = { + k: [t[i : i + block_size] for i in range(0, total_length, block_size)] + for k, t in concatenated_examples.items() + } + return result + + tokenized_dataset = tokenized_dataset.map( + group_texts, + batched=True, + load_from_cache_file=False, + desc=f"Grouping texts in chunks of {block_size}", + ) + + return tokenized_dataset + elif task == "rerank": + dataset["train"] = TrainDatasetForCE(dataset["train"], config["Dataset"], tokenizer) + return dataset + elif task == "embedding": + pass + else: + raise NotImplementedError(f"Unsupported task {task}, only support instruction_tuning, rerank, embedding now.") def prepare_data_collator(config: Dict, tokenizer): - return transformers.DataCollatorForLanguageModeling( - tokenizer=tokenizer, mlm=False, return_tensors="pt", pad_to_multiple_of=8 - ) + task = config["General"].get("task", "instruction_tuning") + if task == "instruction_tuning": + return transformers.DataCollatorForLanguageModeling( + tokenizer=tokenizer, mlm=False, return_tensors="pt", pad_to_multiple_of=8 + ) + elif task == "rerank": + return GroupCollator(tokenizer) + elif task == "embedding": + pass + else: + raise NotImplementedError(f"Unsupported task {task}, only support instruction_tuning, rerank, embedding now.") def load_model(config: Dict): model_name = config["General"]["base_model"] model_dtype = convert_dtype(config["Training"].get("mixed_precision", "no")) model_config = config["General"].get("config", {}) - model = transformers.AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=model_dtype, **model_config) - - lora_config = config["General"].get("lora_config", None) - if lora_config: - peft_config = LoraConfig(**lora_config) - model = get_peft_model(model, peft_config) + task = config["General"].get("task", "instruction_tuning") + training_args = convert_to_training_args(TrainingArguments, config) + if task == "instruction_tuning": + model = transformers.AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=model_dtype, **model_config) + + lora_config = config["General"].get("lora_config", None) + if lora_config: + peft_config = LoraConfig(**lora_config) + model = get_peft_model(model, peft_config) + elif task == "rerank": + model = CrossEncoder.from_pretrained( + config["Dataset"], + training_args, + model_name, + from_tf=bool(".ckpt" in model_name), + config=model_config, + ) + elif task == "embedding": + pass + else: + raise NotImplementedError(f"Unsupported task {task}, only support instruction_tuning, rerank, embedding now.") egc = config["General"].get("enable_gradient_checkpointing", False) if egc: @@ -269,8 +303,6 @@ def load_model(config: Dict): def get_trainer(config: Dict, model, tokenizer, tokenized_dataset, data_collator): device = config["Training"]["device"] if device in ["cpu", "gpu"]: - from transformers import Trainer, TrainingArguments - training_args = convert_to_training_args(TrainingArguments, config) trainer = Trainer( model=model, diff --git a/comps/finetuning/llm_on_ray/finetune/modeling.py b/comps/finetuning/llm_on_ray/finetune/modeling.py new file mode 100644 index 000000000..0a7e37af4 --- /dev/null +++ b/comps/finetuning/llm_on_ray/finetune/modeling.py @@ -0,0 +1,51 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import torch +from torch import nn +from transformers import AutoModelForSequenceClassification, PreTrainedModel, TrainingArguments +from transformers.modeling_outputs import SequenceClassifierOutput + +from comps.finetuning.finetune_config import DatasetConfig + + +class CrossEncoder(PreTrainedModel): + def __init__(self, hf_model: PreTrainedModel, data_args: DatasetConfig, train_args: TrainingArguments): + super().__init__(hf_model.config) + self.hf_model = hf_model + self.train_args = train_args + self.data_args = data_args + + self.cross_entropy = nn.CrossEntropyLoss(reduction="mean") + + self.register_buffer("target_label", torch.zeros(self.train_args.per_device_train_batch_size, dtype=torch.long)) + + def gradient_checkpointing_enable(self, **kwargs): + self.hf_model.gradient_checkpointing_enable(**kwargs) + + def forward(self, **batch): + ranker_out: SequenceClassifierOutput = self.hf_model(**batch, return_dict=True) + logits = ranker_out.logits + + if self.training: + scores = logits.view(-1, self.data_args.get("train_group_size", 8)) + loss = self.cross_entropy(scores, self.target_label[: scores.shape[0]]) + + return SequenceClassifierOutput( + loss=loss, + **ranker_out, + ) + else: + return ranker_out + + @classmethod + def from_pretrained(cls, data_args: DatasetConfig, train_args: TrainingArguments, *args, **kwargs): + hf_model = AutoModelForSequenceClassification.from_pretrained(*args, **kwargs) + reranker = cls(hf_model, data_args, train_args) + return reranker + + def save_pretrained(self, output_dir: str, **kwargs): + state_dict = self.hf_model.state_dict() + state_dict = type(state_dict)({k: v.clone().cpu() for k, v in state_dict.items()}) + kwargs.pop("state_dict") + self.hf_model.save_pretrained(output_dir, state_dict=state_dict, **kwargs)