NLP Python PyTorch 資料科學

[PyTorch] 使用 torch.distributed 在單機多 GPU 上進行分散式訓練

turned on computer monitor displaying text

Finetune 語言模型所需要的 GPU memory 比較多,往往會需要我們能夠平行利用到多顆 GPU 的資源。今天這篇文章會說明 DataParallelDistributedDataParallel + DistributedSampler 兩種進行模型分散式訓練的方式。

方法比較

官方更推薦使用 DistributedDataParallel,原因如下

比較DataParallelDistributedDataParallel
實現方式multi-threadmulti-process
參數更新方式所有 thread 的梯度先匯總到 GPU: 0 (指定的第一個 device)進行反向傳播更新完參數,再 broadcast 參數到其他 GPU各 process 上的梯度計算並匯總後,由 rank: 0 process 將每個 process 的梯度平均 broadcast 到所有 process,每個 process 再獨立進行參數更新
使用門檻較簡易,只要一行指令較繁複,需搭配 DistributedSampler 及 local_rank 參數使用
採用理由.好上手.程式會為每個 GPU 建立一個 process,避免了 multi-thread 時 Python 內部的 Global Interpreter Lock 造成的效能開銷
.相比 DataParallel ,GPU 之間的數據傳輸量較小,因此效率更高

DataParallel

使用的方式很簡單,只要將模型 wrap 起來

import os
import torch
from torch import nn
from transformers import AutoModelForCasualLM, AutoTokenizer

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
device = "cuda" if torch.cuda.is_available() else "cpu"
model = AutoModelForCasualLM.from_pretrained("bert-base-cased")
model = nn.DataParallel(model, device_ids=[0,1,2,3])
model = model.to(device)

nn.DataParallel 參數說明:

  • device_ids : 指定要使用哪些邏輯上的 GPU id
    • 假設 CUDA_VISIBLE_DEVICES 指定為 “1,2,3”,device_ids 指定 [0,1] ,則實際會用到的是 GPU: 1, GPU: 2 這兩張

DistributedDataParallel + DistributedSampler

會使用到以下幾個功能:

  • torchrun: torch.distributed.launch 的 console script,用來啟動分散式訓練,透過指令參數設定機器數 (node) 和 GPU 數量
  • nn.DistributedDataParallel: 將模型轉換為分散式訓練的 class
  • DistributedSampler: 使得 dataloader 可以被分散到多個 GPU

首先載入需要的模組

#!/usr/bin/python
# -*- coding: utf-8 -*-
# run.py

import os
import pandas as pd
import torch
import torch.distributed as dist
from torch.optim import AdamW
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer, set_seed

接下來從環境變數取得 RANK, LOCAL_RANK, WORLD_SIZE 這些參數

  • RANK: 表示全域間的 process 序號,用於 process 間的通訊,當前的序號為優先執行的 process
  • LOCAL_RANK: 表示 process 內的 GPU 編號
  • WORLD_SIZE: 表示全域的 GPU 總數
def check_distributed():
    if "RANK" in os.environ:
        rank = int(os.environ["RANK"])
        local_rank = int(os.environ["LOCAL_RANK"])
        world_size = int(os.environ["WORLD_SIZE"])
    else:
        rank = local_rank = world_size = -1
        is_distributed = world_size != -1
    return rank, local_rank, world_size, is_distributed

建一個自訂的 Dataset object,用來把原始資料轉成訓練用格式

class CustomDataset(Dataset):
    def __init__(self, data, model_name, length=1024):
        self.data = data
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        tokens = self.tokenizer(self.data[idx])
        return (
            torch.tensor(tokens["input_ids"]),
            torch.tensor(tokens["attention_mask"])
        )

待會程式執行使用 torchrun

  • nproc_per_node: 使用的 GPU 數量
  • nnodes: 使用的機器數
~$ torchrun --standalone --nnodes=1 --nproc_per_node=4 run.py

在程式的 entry point 要設定 CUDA_VISIBLE_DEVICES 環境變數:

os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2,3"
seed = 2023
set_seed(seed)
rank, local_rank, world_size, is_distributed = check_distributed()

if is_distributed:
    # set current device
    torch.cuda.set_device(local_rank)
    device = torch.device("cuda", local_rank)
    # initialize process group and set the communication backend betweend GPUs
    dist.init_process_group("nccl")
else:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

載入模型、創建資料集物件

remote_model_name = "bigscience/bloomz-1b1"
tokenizer = AutoTokenizer.from_pretrained(remote_model_name)
model = AutoModelForCausalLM.from_pretrained(remote_model_name).to(device)

if is_distributed:
    model = torch.nn.parallel.DistributedDataParallel(
        model, device_ids=[local_rank], output_device=local_rank
    )

fake_data = pd.read_json("./data/train.ndjson", lines=True)
train_dataset = CustomDataset(data=fake_data, model_name=remote_model_name)
train_sampler = DistributedSampler(train_dataset, shuffle=True, seed=seed)
train_loader = DataLoader(train_dataset, batch_size=32, sampler=train_sampler)

接下來的訓練程式都和單 GPU 訓練一樣:(這邊用個簡單範例)

optimizer = AdamW(model.parameters(), lr=0.1, weight_decay=0)
epochs = 20
for epoch in range(epochs):
    model.train()
    loss_sum = 0.0
    for batch in tqdm(train_loader):
        X, M = [tensor.to(device) for tensor in batch]
        labels = X.masked_fill(~M, -100)
        output = model(X, attention_mask=M, labels=labels)
        loss = output.loss
        loss_sum += loss.item()
        loss.backward()
        optimizer.step()
        model.zero_grad()

    loss_avg = loss_sum / len(train_loader)
    lr = optimizer.param_groups[0]["lr"]
    print(f"Epoch {epoch} | loss: {loss_avg}")

以上你也可以再把想要轉成 console arguments 的變數透過 argparse.ArgumentParser() 設定。

Terminal output

nvidia-smi

其他推薦閱讀:

未來會再介紹其他其他減少模型佔用 GPU 記憶體的方法,例如 AutoCast, 8-bit Optimizer 等,讓更多人知道怎麼使用有限的資源訓練大型語言模型。


DataAgent 致力於資料技術、資料產品開發經驗以及轉職心得分享

歡迎讀者用以下方式來回饋創作者:)
  • 如果你覺得這篇文章對你有幫助,歡迎幫我在下方多按幾顆 LikeCoin !
  • 訂閱 DataAgent Youtube 頻道 / 追蹤 Facebook 粉專,也可以加我 FB, LinkedIn 跟我分享你在做的事情
  • 看過很多文章覺得有幫助 -> 來 BuyMeACoffee 贊助我一杯咖啡,就是給我最好的回饋!

%d 位部落客按了讚: