
Commit

zero three save bug fix
jstzwj committed Jun 10, 2024
1 parent 881e718 commit 617adb2
Showing 7 changed files with 69 additions and 29 deletions.
@@ -1,9 +1,16 @@
 {
     "conv_format": "openbuddy",
-    "end_of_conversation": 151643,
+    "end_of_conversation": 2,
     "data_path": [
-        "/data/wangjun/github/cangjie/",
-        "Vtuber-plan/sharegpt-cleaned"
+        {
+            "path": "/data/wangjun/github/cangjie_data/",
+            "sample": 1.0
+        },
+        {
+            "path": "Vtuber-plan/sharegpt-cleaned",
+            "sample": 0.01
+        }
+
     ],
     "data_output_path": "./tmp/data_files/",
     "model_name_or_path": "/data/wangjun/models/CodeQwen1.5-7B",
@@ -12,7 +19,7 @@
     "per_device_eval_batch_size": 8,
     "accumulate_grad_batches": 64,
     "max_seq_len": 1024,
-    "checkpoint_every_n_train_steps": 100,
+    "checkpoint_every_n_train_steps": 1,
     "log_every_n_steps": 1,
     "val_check_interval": 0.25,
     "limit_val_batches": 0.1,
@@ -30,5 +37,18 @@
     "weight_decay": 0.0,
     "gradient_clip_algorithm": "norm",
     "gradient_clip_val": 1.0,
-    "strategy": null
+    "strategy": "deepspeed",
+    "strategy_params": {
+        "zero_stage": 3,
+        "remote_device": null,
+        "offload_optimizer": true,
+        "offload_optimizer_device": "cpu",
+        "offload_parameters": false,
+        "offload_params_device": "cpu",
+        "cpu_checkpointing": true,
+        "nvme_path": "./nvme_offload",
+        "params_buffer_count": 5,
+        "params_buffer_size": 1000000000,
+        "contiguous_memory_optimization": false
+    }
 }
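The strategy_params block added above closely mirrors the keyword arguments of PyTorch Lightning's DeepSpeedStrategy. The sketch below shows one plausible way such a config could be turned into a strategy object; the build_strategy helper and the zero_stage-to-stage rename are assumptions for illustration, not katheryne's actual trainer code.

# Minimal sketch (assumption): mapping the JSON "strategy_params" above onto
# pytorch_lightning's DeepSpeedStrategy. Apart from renaming "zero_stage" to
# "stage", the keys appear to match the strategy's keyword arguments.
from pytorch_lightning.strategies import DeepSpeedStrategy

def build_strategy(strategy_params: dict) -> DeepSpeedStrategy:
    params = dict(strategy_params)       # copy so the hparams dict is untouched
    stage = params.pop("zero_stage", 2)  # DeepSpeedStrategy calls this "stage"
    return DeepSpeedStrategy(stage=stage, **params)

# e.g. pass build_strategy(hparams["strategy_params"]) to pl.Trainer(strategy=...)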
hparams/hparams_chat_qwen2_7b_deepspeed_cj.json (8 changes: 4 additions & 4 deletions)
@@ -17,8 +17,8 @@
     "atten_class": "eager",
     "per_device_train_batch_size": 2,
     "per_device_eval_batch_size": 8,
-    "accumulate_grad_batches": 128,
-    "max_seq_len": 4096,
+    "accumulate_grad_batches": 64,
+    "max_seq_len": 1024,
     "checkpoint_every_n_train_steps": 100,
     "log_every_n_steps": 1,
     "val_check_interval": 0.25,
@@ -41,9 +41,9 @@
     "strategy_params": {
         "zero_stage": 3,
         "remote_device": null,
-        "offload_optimizer": true,
+        "offload_optimizer": false,
         "offload_optimizer_device": "cpu",
-        "offload_parameters": false,
+        "offload_parameters": true,
         "offload_params_device": "cpu",
         "cpu_checkpointing": true,
         "nvme_path": "./nvme_offload",
katheryne/light_modules/models/chat_model.py (10 changes: 5 additions & 5 deletions)
@@ -14,7 +14,7 @@
 
 from katheryne.utils.hparams import HParams
 from katheryne.utils.model.model_utils import save_hf_format
-from katheryne.utils.utils import get_optimizer_grouped_parameters, optimizer_to, save_zero_three_model
+from katheryne.utils.utils import get_optimizer_grouped_parameters, optimizer_to, save_zero_three_hf_model
 
 class ChatLanguageModel(pl.LightningModule):
     def __init__(self, model: PreTrainedModel, params: HParams) -> None:
@@ -27,7 +27,7 @@ def __init__(self, model: PreTrainedModel, params: HParams) -> None:
 
         self.deepspeed = self.params.get("strategy", None) == "deepspeed"
         self.strategy_params = self.params.get("strategy_params", dict())
-        self.offload = self.strategy_params.get("offload_optimizer", False) or self.strategy_params.get("offload_parameters", False)
+        self.offload = self.strategy_params.get("offload_optimizer", False)
 
         self.save_hyperparameters(ignore=["model"])
 
@@ -58,7 +58,7 @@ def training_step(self, batch, batch_idx: int):
         loss = lm_output[0]
 
         # Logging to TensorBoard by default
-        self.log('train_loss', loss, on_step=True, on_epoch=True, sync_dist=False)
+        self.log('train_loss', loss, on_step=True, on_epoch=True, sync_dist=True)
         return loss
 
     def validation_step(self, batch, batch_idx):
@@ -74,13 +74,13 @@ def validation_step(self, batch, batch_idx):
         lm_output = self.forward(tokens=source_tokens)
         loss = lm_output.loss
 
-        self.log('val_loss', loss, on_step=True, on_epoch=True, sync_dist=False)
+        self.log('val_loss', loss, on_step=True, on_epoch=True, sync_dist=True)
 
     def on_save_checkpoint(self, checkpoint):
         save_path = f"{self.trainer.logger.log_dir}/huggingface_format"
         if self.deepspeed and self.strategy_params.get("zero_stage", 0) == 3:
             # For zero stage 3, each gpu only has a part of the model, so we need a special save function
-            save_zero_three_model(self.model, self.global_rank,
+            save_zero_three_hf_model(self.model, self.global_rank,
                 os.path.join(save_path, f"checkpoint-step-{self.global_step}"),
                 zero_stage=3
             )
katheryne/light_modules/models/instruction_model.py (11 changes: 3 additions & 8 deletions)
@@ -13,7 +13,7 @@
 
 from katheryne.utils.hparams import HParams
 from katheryne.utils.model.model_utils import save_hf_format
-from katheryne.utils.utils import get_optimizer_grouped_parameters, optimizer_to, save_zero_three_model
+from katheryne.utils.utils import get_optimizer_grouped_parameters, optimizer_to, save_zero_three_hf_model, save_zero_three_model
 
 class InstructionLanguageModel(pl.LightningModule):
     def __init__(self, model: PreTrainedModel, params: HParams) -> None:
@@ -26,7 +26,7 @@ def __init__(self, model: PreTrainedModel, params: HParams) -> None:
 
         self.deepspeed = self.params.get("strategy", None) == "deepspeed"
         self.strategy_params = self.params.get("strategy_params", dict())
-        self.offload = self.strategy_params.get("offload_optimizer", False) or self.strategy_params.get("offload_parameters", False)
+        self.offload = self.strategy_params.get("offload_optimizer", False)
 
         self.save_hyperparameters(ignore=["model"])
 
@@ -76,12 +76,9 @@ def validation_step(self, batch, batch_idx):
         self.log('val_loss', loss, on_step=True, on_epoch=True, sync_dist=True, rank_zero_only=True)
 
     def on_save_checkpoint(self, checkpoint):
-        if self.hparams.params.get("lora_dim", 0) > 0:
-            fuse_linear_layer(self.model, self.deepspeed)
-
         if self.deepspeed and self.strategy_params.get("zero_stage", 0) == 3:
             # For zero stage 3, each gpu only has a part of the model, so we need a special save function
-            save_zero_three_model(self.model, self.global_rank,
+            save_zero_three_hf_model(self.model, self.global_rank,
                 os.path.join("./lightning_logs/huggingface_format", f"checkpoint-step-{self.global_step}"),
                 zero_stage=3
             )
@@ -94,8 +91,6 @@ def on_save_checkpoint(self, checkpoint):
                 sub_folder=f"checkpoint-step-{self.global_step}",
                 peft_merge=self.hparams.get("peft_merge", False),
             )
-        if self.hparams.params.get("lora_dim", 0) > 0:
-            unfuse_linear_layer(self.model, self.deepspeed)
 
     def configure_optimizers(self):
         optimizer_grouped_parameters = get_optimizer_grouped_parameters(self.trainer.model, self.hparams.params.get("weight_decay", 0.0))
katheryne/light_modules/models/pretrain_model.py (10 changes: 5 additions & 5 deletions)
@@ -15,7 +15,7 @@
 
 from katheryne.utils.hparams import HParams
 from katheryne.utils.model.model_utils import save_hf_format
-from katheryne.utils.utils import save_zero_three_model
+from katheryne.utils.utils import save_zero_three_hf_model, save_zero_three_model
 
 class PretrainLanguageModel(pl.LightningModule):
     def __init__(self, model: PreTrainedModel, params: HParams) -> None:
@@ -28,7 +28,7 @@ def __init__(self, model: PreTrainedModel, params: HParams) -> None:
 
         self.deepspeed = self.params.get("strategy", None) == "deepspeed"
         self.strategy_params = self.params.get("strategy_params", dict())
-        self.offload = self.strategy_params.get("offload_optimizer", False) or self.strategy_params.get("offload_parameters", False)
+        self.offload = self.strategy_params.get("offload_optimizer", False)
 
         self.save_hyperparameters(ignore=["model"])
 
@@ -58,7 +58,7 @@ def training_step(self, batch, batch_idx: int):
         loss = lm_output[0]
 
         # Logging to TensorBoard by default
-        self.log('train_loss', loss, on_step=True, on_epoch=True, sync_dist=False)
+        self.log('train_loss', loss, on_step=True, on_epoch=True, sync_dist=True)
         return loss
 
     def validation_step(self, batch, batch_idx):
@@ -73,15 +73,15 @@ def validation_step(self, batch, batch_idx):
         lm_output = self.forward(tokens=source_tokens)
         loss = lm_output.loss
 
-        self.log('val_loss', loss, on_step=True, on_epoch=True, sync_dist=False)
+        self.log('val_loss', loss, on_step=True, on_epoch=True, sync_dist=True)
 
     def on_save_checkpoint(self, checkpoint):
         if self.trainer.logger is None:
             return
         save_path = f"{self.trainer.logger.log_dir}/huggingface_format"
         if self.deepspeed and self.strategy_params.get("zero_stage", 0) == 3:
             # For zero stage 3, each gpu only has a part of the model, so we need a special save function
-            save_zero_three_model(self.model, self.global_rank,
+            save_zero_three_hf_model(self.model, self.global_rank,
                 os.path.join(save_path, f"checkpoint-step-{self.global_step}"),
                 zero_stage=3
             )
katheryne/utils/model/model_utils.py (4 changes: 3 additions & 1 deletion)
@@ -54,8 +54,10 @@ def save_hf_format(model, tokenizer=None, output_dir: str="./", sub_folder: str=
         else:
             merged_model = model
         merged_model.save_pretrained(output_dir)
-    else:
+    elif isinstance(model, PreTrainedModel):
         model.save_pretrained(output_dir)
+    else:
+        raise Exception("Unsupported model to save.")
 
 
 def save_hf_format_native(model, tokenizer=None, output_dir: str="./", sub_folder: str=""):
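With the change above, save_hf_format now raises for anything that is neither a PEFT adapter nor a plain PreTrainedModel instead of falling through silently. A hedged usage sketch follows; the checkpoint id and output path are placeholders, and only the parameters visible in the signature above are used.

# Usage sketch (hypothetical checkpoint id and output path): a plain
# PreTrainedModel is saved directly; an object that matches none of the
# branches above now raises Exception("Unsupported model to save.").
from transformers import AutoModelForCausalLM, AutoTokenizer
from katheryne.utils.model.model_utils import save_hf_format

model = AutoModelForCausalLM.from_pretrained("Qwen/CodeQwen1.5-7B")
tokenizer = AutoTokenizer.from_pretrained("Qwen/CodeQwen1.5-7B")
save_hf_format(model, tokenizer, output_dir="./out", sub_folder="checkpoint-step-0")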
katheryne/utils/utils.py (25 changes: 24 additions & 1 deletion)
@@ -7,7 +7,7 @@
 import torch
 import random
 import numpy as np
-from transformers import set_seed, AutoTokenizer
+from transformers import set_seed, PreTrainedModel
 import json
 try:
     import deepspeed
@@ -136,6 +136,29 @@ def save_zero_three_model(model_ema, global_rank: int, save_dir: str, zero_stage
             torch.save(output_state_dict, output_model_file)
         del output_state_dict
 
+def save_zero_three_hf_model(model_ema, global_rank: int, save_dir: str, zero_stage: int=0):
+    zero_stage_3 = (zero_stage == 3)
+    os.makedirs(save_dir, exist_ok=True)
+
+    model_to_save: PreTrainedModel = model_ema.module if hasattr(model_ema, 'module') else model_ema
+    if not zero_stage_3:
+        if global_rank == 0:
+            model_to_save.save_pretrained(save_dir)
+    else:
+        output_state_dict = {}
+        for k, v in model_to_save.named_parameters():
+
+            if hasattr(v, 'ds_id'):
+                with deepspeed.zero.GatheredParameters(_z3_params_to_fetch([v]), enabled=zero_stage_3):
+                    v_p = v.data.cpu()
+            else:
+                v_p = v.cpu()
+            if global_rank == 0:
+                output_state_dict[k] = v_p
+        if global_rank == 0:
+            model_to_save.save_pretrained(save_dir, state_dict=output_state_dict)
+        del output_state_dict
+
 def optimizer_to(optim, device):
     for param in optim.state.values():
         # Not sure there are any global tensors in the state dict
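Because the ZeRO-3 gather in save_zero_three_hf_model is a collective operation, every rank has to call the function, as the on_save_checkpoint hooks above already do; only rank 0 writes to disk. The sketch below shows the payoff, under the assumption of a causal LM and a hypothetical checkpoint path: the directory written by save_pretrained is an ordinary Hugging Face checkpoint and reloads without any DeepSpeed conversion step.

# Reload sketch (hypothetical path): the output of save_zero_three_hf_model is
# a standard Hugging Face model directory, so transformers can load it directly.
from transformers import AutoModelForCausalLM

ckpt_dir = "./lightning_logs/version_0/huggingface_format/checkpoint-step-100"
model = AutoModelForCausalLM.from_pretrained(ckpt_dir)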
