-
Notifications
You must be signed in to change notification settings - Fork 4.7k
Description
import argparse
import os
import deepspeed
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from deepspeed.accelerator import get_accelerator
from deepspeed.moe.utils import split_params_into_different_moe_groups_for_optimizer
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from PIL import Image
import io
from torchvision import transforms
def add_argument():
parser = argparse.ArgumentParser(description="CIFAR")
# For train.
parser.add_argument(
"-e",
"--epochs",
default=30,
type=int,
help="number of total epochs (default: 30)",
)
parser.add_argument(
"--local_rank",
type=int,
default=-1,
help="local rank passed from distributed launcher",
)
parser.add_argument(
"--log-interval",
type=int,
default=2000,
help="output logging information at a given interval",
)
# For mixed precision training.
parser.add_argument(
"--dtype",
default="fp16",
type=str,
choices=["bf16", "fp16", "fp32"],
help="Datatype used for training",
)
# For ZeRO Optimization.
parser.add_argument(
"--stage",
default=2,
type=int,
choices=[0, 1, 2, 3],
help="Datatype used for training",
)
# For MoE (Mixture of Experts).
parser.add_argument(
"--moe",
default=False,
action="store_true",
help="use deepspeed mixture of experts (moe)",
)
parser.add_argument(
"--ep-world-size", default=1, type=int, help="(moe) expert parallel world size"
)
parser.add_argument(
"--num-experts",
type=int,
nargs="+",
default=[
1,
],
help="number of experts list, MoE related.",
)
parser.add_argument(
"--mlp-type",
type=str,
default="standard",
help="Only applicable when num-experts > 1, accepts [standard, residual]",
)
parser.add_argument(
"--top-k", default=1, type=int, help="(moe) gating top 1 and 2 supported"
)
parser.add_argument(
"--min-capacity",
default=0,
type=int,
help="(moe) minimum capacity of an expert regardless of the capacity_factor",
)
parser.add_argument(
"--noisy-gate-policy",
default=None,
type=str,
help="(moe) noisy gating (only supported with top-1). Valid values are None, RSample, and Jitter",
)
parser.add_argument(
"--moe-param-group",
default=False,
action="store_true",
help="(moe) create separate moe param groups, required when using ZeRO w. MoE",
)
# Include DeepSpeed configuration arguments.
parser = deepspeed.add_config_arguments(parser)
args = parser.parse_args()
return args
def create_moe_param_groups(model):
"""Create separate parameter groups for each expert."""
parameters = {"params": [p for p in model.parameters()], "name": "parameters"}
return split_params_into_different_moe_groups_for_optimizer(parameters)
def get_ds_config(args):
"""Get the DeepSpeed configuration dictionary."""
print(args.stage)
ds_config = {
"train_batch_size": 16,
"steps_per_print": 2000,
"optimizer": {
"type": "Adam",
"params": {
"lr": 0.001,
"betas": [0.8, 0.999],
"eps": 1e-8,
"weight_decay": 3e-7,
},
},
"scheduler": {
"type": "WarmupLR",
"params": {
"warmup_min_lr": 0,
"warmup_max_lr": 0.001,
"warmup_num_steps": 1000,
},
},
"gradient_clipping": 1.0,
"prescale_gradients": False,
"bf16": {"enabled": args.dtype == "bf16"},
"fp16": {
"enabled": args.dtype == "fp16",
"fp16_master_weights_and_grads": False,
"loss_scale": 0,
"loss_scale_window": 500,
"hysteresis": 2,
"min_loss_scale": 1,
"initial_scale_power": 15,
},
"wall_clock_breakdown": False,
"zero_optimization": {
"stage": args.stage,
"allgather_partitions": True,
"reduce_scatter": True,
"allgather_bucket_size": 50000000,
"reduce_bucket_size": 50000000,
"overlap_comm": True,
"contiguous_gradients": True,
"cpu_offload": False,
},
}
return ds_config
class Net(nn.Module):
def init(self, args):
super(Net, self).init()
self.conv1 = nn.Conv2d(3, 6, 5)
self.pool = nn.MaxPool2d(2, 2)
self.conv2 = nn.Conv2d(6, 16, 5)
self.fc1 = nn.Linear(16 * 5 * 5, 120)
self.fc2 = nn.Linear(120, 84)
self.moe = args.moe
if self.moe:
fc3 = nn.Linear(84, 84)
self.moe_layer_list = []
for n_e in args.num_experts:
# Create moe layers based on the number of experts.
self.moe_layer_list.append(
deepspeed.moe.layer.MoE(
hidden_size=84,
expert=fc3,
num_experts=n_e,
ep_size=args.ep_world_size,
use_residual=args.mlp_type == "residual",
k=args.top_k,
min_capacity=args.min_capacity,
noisy_gate_policy=args.noisy_gate_policy,
)
)
self.moe_layer_list = nn.ModuleList(self.moe_layer_list)
self.fc4 = nn.Linear(84, 10)
else:
self.fc3 = nn.Linear(84, 10)
def forward(self, x):
x = self.pool(F.relu(self.conv1(x)))
x = self.pool(F.relu(self.conv2(x)))
x = x.view(-1, 16 * 5 * 5)
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
if self.moe:
for layer in self.moe_layer_list:
x, _, _ = layer(x)
x = self.fc4(x)
else:
x = self.fc3(x)
return x
def test(model_engine, testset, local_device, target_dtype, test_batch_size=4):
"""Test the network on the test data.
Args:
model_engine (deepspeed.runtime.engine.DeepSpeedEngine): the DeepSpeed engine.
testset (torch.utils.data.Dataset): the test dataset.
local_device (str): the local device name.
target_dtype (torch.dtype): the target datatype for the test data.
test_batch_size (int): the test batch size.
"""
# The 10 classes for CIFAR10.
classes = (
"plane",
"car",
"bird",
"cat",
"deer",
"dog",
"frog",
"horse",
"ship",
"truck",
)
# Define the test dataloader.
testloader = torch.utils.data.DataLoader(
testset, batch_size=test_batch_size, shuffle=False, num_workers=0
)
# For total accuracy.
correct, total = 0, 0
# For accuracy per class.
class_correct = list(0.0 for i in range(10))
class_total = list(0.0 for i in range(10))
# Start testing.
model_engine.eval()
with torch.no_grad():
for data in testloader:
images, labels = data
if target_dtype != None:
images = images.to(target_dtype)
outputs = model_engine(images.to(local_device))
_, predicted = torch.max(outputs.data, 1)
# Count the total accuracy.
total += labels.size(0)
correct += (predicted == labels.to(local_device)).sum().item()
# Count the accuracy per class.
batch_correct = (predicted == labels.to(local_device)).squeeze()
for i in range(test_batch_size):
label = labels[i]
class_correct[label] += batch_correct[i].item()
class_total[label] += 1
if model_engine.local_rank == 0:
print(
f"Accuracy of the network on the {total} test images: {100 * correct / total : .0f} %"
)
# For all classes, print the accuracy.
for i in range(10):
print(
f"Accuracy of {classes[i] : >5s} : {100 * class_correct[i] / class_total[i] : 2.0f} %"
)
自定义数据集类
class ParquetDataset(Dataset):
def init(self, parquet_file, transform=None):
# 读取 parquet 文件
self.data = pd.read_parquet(parquet_file)
self.transform = transform
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
# 获取图像数据和标签
# print(self.data.iloc[idx])
img_data = self.data.iloc[idx]['img']['bytes'] # 假设 image 存储为字节数据或路径
label = self.data.iloc[idx]['label']
# 将图像数据转换为 PIL 图片(如果是字节数据)
if isinstance(img_data, bytes):
# 使用 BytesIO 将字节数据转换为图像
img = Image.open(io.BytesIO(img_data)) # 使用 BytesIO 将字节数据转换为图像
else:
# 如果 img_data 是其他格式,直接使用
img = img_data
# 应用转换
if self.transform:
img = self.transform(img)
return img, label
def main(args):
# Initialize DeepSpeed distributed backend.
deepspeed.init_distributed()
_local_rank = int(os.environ.get("LOCAL_RANK"))
get_accelerator().set_device(_local_rank)
########################################################################
# Step1. Data Preparation.
#
# The output of torchvision datasets are PILImage images of range [0, 1].
# We transform them to Tensors of normalized range [-1, 1].
#
# Note:
# If running on Windows and you get a BrokenPipeError, try setting
# the num_worker of torch.utils.data.DataLoader() to 0.
########################################################################
transform = transforms.Compose(
[transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)
if torch.distributed.get_rank() != 0:
# Might be downloading cifar data, let rank 0 download first.
torch.distributed.barrier()
# Load or download cifar data.
# trainset = torchvision.datasets.CIFAR10(
# root="./data", train=True, download=True, transform=transform
# )
# testset = torchvision.datasets.CIFAR10(
# root="./data", train=False, download=True, transform=transform
# )
trainset = ParquetDataset(parquet_file="ref_data/plain_text/train-00000-of-00001.parquet", transform=transform)
testset = ParquetDataset(parquet_file="ref_data/plain_text/test-00000-of-00001.parquet", transform=transform)
if torch.distributed.get_rank() == 0:
# Cifar data is downloaded, indicate other ranks can proceed.
torch.distributed.barrier()
########################################################################
# Step 2. Define the network with DeepSpeed.
#
# First, we define a Convolution Neural Network.
# Then, we define the DeepSpeed configuration dictionary and use it to
# initialize the DeepSpeed engine.
########################################################################
net = Net(args)
# Get list of parameters that require gradients.
parameters = filter(lambda p: p.requires_grad, net.parameters())
# If using MoE, create separate param groups for each expert.
if args.moe_param_group:
parameters = create_moe_param_groups(net)
# Initialize DeepSpeed to use the following features.
# 1) Distributed model.
# 2) Distributed data loader.
# 3) DeepSpeed optimizer.
ds_config = get_ds_config(args)
for name, param in net.named_parameters():
try:
print(f"model_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
except:
print(f"model_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")
model_engine, optimizer, trainloader, __ = deepspeed.initialize(
args=args,
model=net,
model_parameters=parameters,
training_data=trainset,
config=ds_config,
)
# trainloader = DataLoader(trainset, batch_size=16, shuffle=True)
# Get the local device name (str) and local rank (int).
local_device = get_accelerator().device_name(model_engine.local_rank)
local_rank = model_engine.local_rank
# For float32, target_dtype will be None so no datatype conversion needed.
target_dtype = None
if model_engine.bfloat16_enabled():
target_dtype = torch.bfloat16
elif model_engine.fp16_enabled():
target_dtype = torch.half
for name, param in model_engine.named_parameters():
try:
print(f"model_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
except:
print(f"model_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")
# Define the Classification Cross-Entropy loss function.
criterion = nn.CrossEntropyLoss()
########################################################################
# Step 3. Train the network.
#
# This is when things start to get interesting.
# We simply have to loop over our data iterator, and feed the inputs to the
# network and optimize. (DeepSpeed handles the distributed details for us!)
########################################################################
for epoch in range(args.epochs): # loop over the dataset multiple times
running_loss = 0.0
for i, data in enumerate(trainloader):
# Get the inputs. ``data`` is a list of [inputs, labels].
inputs, labels = data[0].to(local_device), data[1].to(local_device)
print(f'## inputs: {inputs}, labels: {labels}')
# Try to convert to target_dtype if needed.
if target_dtype != None:
inputs = inputs.to(target_dtype)
outputs = model_engine(inputs)
loss = criterion(outputs, labels)
model_engine.backward(loss)
model_engine.step()
for name, param in model_engine.named_parameters():
try:
print(f"model_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
except:
print(f"model_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")
for name, param in net.named_parameters():
try:
print(f"netmodel_enginenew{name}: grad norm = {param.grad.norm().item()} ===param.requires_grad: {param.requires_grad} ")
except:
print(f"netmodel_enginenew**{name}: grad is None ===param.requires_grad: {param.requires_grad} ")
# Print statistics
running_loss += loss.item()
if local_rank == 0 and i % args.log_interval == (
args.log_interval - 1
): # Print every log_interval mini-batches.
print(
f"[{epoch + 1 : d}, {i + 1 : 5d}] loss: {running_loss / args.log_interval : .3f}"
)
running_loss = 0.0
print("Finished Training")
########################################################################
# Step 4. Test the network on the test data.
########################################################################
test(model_engine, testset, local_device, target_dtype)
if name == "main":
args = add_argument()
main(args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
bash:
deepspeed --num_gpus 4 --num_nodes 1 --hostfile /etc/aistudio/hostfile --master_addr $MASTER_ADDR --ssh_port 20023 ref.py --stage 3
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
model_enginenewmodule.conv1.bias: grad is None ===param.requires_grad: True
model_enginenewmodule.conv2.weight: grad is None ===param.requires_grad: True
model_enginenewmodule.conv2.weight: grad is None ===param.requires_grad: True model_enginenewmodule.conv2.bias: grad is None ===param.requires_grad: True
model_enginenewmodule.conv2.weight: grad is None ===param.requires_grad: True model_enginenewmodule.conv2.bias: grad is None ===param.requires_grad: True
model_enginenewmodule.fc1.weight: grad is None ===param.requires_grad: True model_enginenewmodule.conv2.bias: grad is None ===param.requires_grad: True
model_enginenewmodule.fc1.weight: grad is None ===param.requires_grad: True
model_enginenewmodule.fc1.bias: grad is None ===param.requires_grad: True
model_enginenewmodule.conv1.weight: grad is None ===param.requires_grad: True model_enginenewmodule.fc1.weight: grad is None ===param.requires_grad: True model_enginenew**module.fc1.bias: grad is None ===param.requires_grad: True
model_enginenew**module.fc2.weight: grad is None ===param.requires_grad: True
model_enginenewmodule.fc1.bias: grad is None ===param.requires_grad: True
model_enginenewmodule.fc2.bias: grad is None ===param.requires_grad: True model_enginenew**module.conv1.bias: grad is None ===param.requires_grad: True
model_enginenewmodule.fc2.weight: grad is None ===param.requires_grad: True model_enginenewmodule.fc2.weight: grad is None ===param.requires_grad: True
model_enginenewmodule.fc3.weight: grad is None ===param.requires_grad: True
model_enginenewmodule.fc2.bias: grad is None ===param.requires_grad: True model_enginenewmodule.fc2.bias: grad is None ===param.requires_grad: True model_enginenewmodule.fc3.bias: grad is None ===param.requires_grad: True
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
How to fix