Accelerate 是一个库,只需添加四行代码,就可以在任何分布式 configuration 中运行相同的 PyTorch 代码:
x+ from accelerate import Accelerator+ accelerator = Accelerator()
+ model, optimizer, training_dataloader, scheduler = accelerator.prepare(+ model, optimizer, training_dataloader, scheduler+ )
for batch in training_dataloader: optimizer.zero_grad() inputs, targets = batch inputs = inputs.to(device) targets = targets.to(device) outputs = model(inputs) loss = loss_function(outputs, targets)+ accelerator.backward(loss) optimizer.step() scheduler.step()上述代码可以通过 Accelerate 的 CLI 接口在任何系统上启动:
xxxxxxxxxxaccelerate launch {my_script.py}安装:
xxxxxxxxxxpip install accelerateconda install -c conda-forge acceleratepip install git+https://github.com/huggingface/accelerate
配置:
xxxxxxxxxxaccelerate config然后 accelerate 会向你询问一些问题从而生成配置。
检查配置:
xxxxxxxxxxaccelerate env为了使用 Accelerate,你只需要修改四件事:
首先,导入 Accelerator 并创建一个 accelerator 对象:
xxxxxxxxxxfrom accelerate import Acceleratoraccelerator = Accelerator()然后,移除针对你的模型和输入数据的所有 .to(device) 或 .cuda() 的调用。accelerator 将会为你正确处理这个问题,并为你把所有这些对象放在正确的设备上。
如果你知道你在做什么,你可以保留那些 .to(device) 的调用,但你应该使用 accelerator 对象提供的设备: accelerator.device 。
要完全停用自动的 device placement ,在初始化 Accelerator 时传递 device_placement=False 。
接着,将所有与训练有关的对象(optimizer, model, training dataloader, learning rate scheduler )传递给accelerator.prepare() 方法。这将确保一切都为训练做好准备。
xxxxxxxxxxmodel, optimizer, train_dataloader, lr_scheduler = accelerator.prepare( model, optimizer, train_dataloader, lr_scheduler)具体而言,training dataloader 将被分片到所有可用的 GPU/TPU 核心上,这样每个设备都能看到训练数据集的不同部分。此外,所有进程的随机数状态将在每次迭代开始时通过 dataloader 进行同步,以确保数据以相同的方式被混洗(如果你决定使用shuffle=True 或任何类型的 random sampler )。
训练的实际 batch size 将是使用的设备数量乘以你在脚本中设置的 batch size 。另外,你可以在创建 accelerator 对象时使用 split_batches=True 参半,此时无论你在多少个 GPU 上运行你的脚本,实际 batch size 都会保持不变。
你需要再开始实际的 training loop 之前执行 accelerator.prepare() 。
只有当 scheduler 需要再每个 optimizer step 中被 stepped 时,才需要把 learning rate scheduler 传递给 prepare() 。
任何获取 training dataloader length 的方法(例如,你需要记录 total training step)都应该在 accelerator.prepare() 之后进行。
你可能想、也可能不想把你的validation dataloader 发送到 prepare() ,这取决于你是否想运行分布式评估。
最后,用 accelerator.backward(loss) 替换 loss.backward() 。
现在,你的脚本将在你的本地机器上运行,也可以在多个 GPU 或 TPU 上运行。你可以使用你喜欢的工具来启动分布式训练,或者你可以使用 Accelerate launcher 启动。
分布式评估:
可以进行常规评估,此时你需要将 validation dataloader 保持在 accelerator.prepare() 之外。并且,你需要手动将 input 数据放在 accelerator.device 上。
也可以进行分布式评估,此时你需要将 validation dataloader 放置在 accelerator.prepare() 之内:
xxxxxxxxxxvalidation_dataloader = accelerator.prepare(validation_dataloader)就像 training dataloader,这意味着在分布式评估时,每个设备将仅看到部分 evaluation 数据。这意味着你需要把 predictions 进行 group 。可以通过 accelerator.gather_for_metrics() 方法来实现:
xxxxxxxxxxfor inputs, targets in validation_dataloader: predictions = model(inputs) # Gather all predictions and targets all_predictions, all_targets = accelerator.gather_for_metrics((predictions, targets)) # Example of use with a Datasets.Metric metric.add_batch(all_predictions, all_targets)类似 training dataloader ,把 validation dataloader 传入 prepare() 可能会改变该 dataloader :如果你在 GPU 上运行,则它的长度将被除以 batch size 将被乘以 split_batches=True 。
任何获取 validation dataloader length 的方法都应该在 accelerator.prepare() 之后进行。
数据集末尾的一些数据可能是重复的,所以这个 batch 的数据可以平均分配给所有的工作者。因此,应该通过gather_for_metrics() 方法计算指标,以便在收集时自动删除重复的数据。如果出于某种原因,你不希望自动完成这项工作,可以用 accelerator.gather() 来收集所有进程的数据,然后手动完成。
gather() 和 gather_for_metrics() 要求每个进程上的张量是相同尺寸的。如果你在每个进程上有不同尺寸的张量(例如,当动态填充到一个 batch 的最大长度时),你应该使用 accelerator.gather.pad_across_processes() 方法将张量填充到跨进程的最大尺寸。
启动分布式脚本:你可以使用常规命令来启动你的分布式训练(如 PyTorch 的 torch.distributed.launch ),它们与 Accelerate 完全兼容。这里唯一需要注意的是: Accelerate 使用 environment 来确定所有有用的信息,所以 torch.distributed.launch 应与标志 --use_env 一起使用。
Accelerate 还提供了一个 CLI 工具,它统一了所有的 launcher ,所以你只需要记住一个命令:
xxxxxxxxxxaccelerate config你需要回答问题,然后 Accelerate 将在你的 cache folder 创建一个 default_config.yaml 文件。这个缓存目录是(根据优先级递减):
HF_HOME 的内容,以 accelerate 为后缀。XDG_CACHE_HOME 的内容,以 huggingface/accelerate 为后缀。~/.cache/huggingface/accelerate 。你也可以通过标志 --config_file 来指定你要保存的文件的位置。
然后,你可以通过运行来测试你的设置是否一切顺利:
xxxxxxxxxxaccelerate test这将启动一个简短的脚本,测试分布式环境。你也可以在测试期间指定配置文件的位置:
xxxxxxxxxxaccelerate test --config_file path_to_config.yaml如果测试通过,你可以通过如下的命令来执行你的脚本:
xxxxxxxxxxaccelerate launch path_to_script.py --args_for_the_script也可以指定配置文件的位置:
xxxxxxxxxxaccelerate launch --config_file path_to_config.yaml path_to_script.py --args_for_the_script从 notebook 中启动:在 Accelerate 0.3.0 中引入了一个 notebook_launcher() 从而帮助你在 notebook 上启动训练。
只要在 notebook 的一个 cell 中定义一个负责整个 train and/or evaluation 的函数,然后用以下代码执行一个 cell :
xxxxxxxxxxfrom accelerate import notebook_launcher
notebook_launcher(training_function)注意:你的 Accelerator 对象应该只在 training_function 中定义,这是因为初始化应该只在 launcher 内完成。
在 TPU 上训练:如果你想在 TPU 上启动你的脚本,有一些注意事项是你应该注意的。在幕后,TPU 将为你的 training step (前向传播、反向传播、以及 optimizer step )中发生的所有操作创建一个 graph 。这就是为什么你的第一个训练步总是非常长,因为建立和编译这个 graph 需要一些时间。
好消息是,这个编译将被缓存,所以第二步和所有后续的 step 将更快。坏消息是,这只适用于你的所有 step 做完全相同的操作,这意味着:
batch 必须有用相同的张量尺寸。step 中存在循环,那么循环次数在每个 step 必须相同)。如果上述任何一项在两个 step 之间发生变化,都会触发新的编译,这将再次花费大量时间。在实践中,这意味着:你必须特别注意让你的输入中的所有张量具有相同的形状(所以没有动态填充),并且不应该使用具有 for 循环的层,其中 for 循环的根据 input 的不同而具有不同长度(如 LSTM)。否则,训练会慢得令人难受。
可以针对 TPU 执行一些特殊的代码:
xxxxxxxxxxfrom accelerate import DistributedType
if accelerator.distributed_type == DistributedType.TPU: # do something of static shapeelse: # go crazy and be dynamic最后要注意的是:如果你的模型有 tied weight (比如语言模型将 embedding matrix 的权重与 decoder 的权重绑定),将这个模型移动到 TPU (无论是你自己移动、还是由 prepare() 移动)会破坏绑定。你将需要在之后重新绑定权重。
在单个进程上执行的语句:有些语句只需要在特定的进程上执行而无需在所有进程上执行,如数据下载、记录日志、以及打印进度条。此时可以执行:
xxxxxxxxxxif accelerator.is_local_main_process: # Is executed once per server from tqdm.auto import tqdmprogress_bar = tqdm(range(args.max_train_steps), disable=not accelerator.is_local_main_process)local 意思是每台机器上运行:如果你在两台服务器上训练,其中每台服务器有几个 GPU ,则代码将在每台服务器上执行一次。
如果你希望对所有进程仅执行一次(如,上传模型到 model hub),则可以执行:
xxxxxxxxxxif accelerator.is_main_process: # Is executed once only对于 print 语句,你希望在每台机器上执行一次,则可以用 accelerator.print 代替 print 函数。
延迟执行:当你运行你的常规脚本时,指令是按顺序执行的。使用 Accelerate 在几个 GPU 上同时部署你的脚本会带来一个复杂的问题:虽然每个进程都是按顺序执行所有指令,但有些可能比其他的快。
你可能需要等待所有进程达到一定程度后再执行某条指令。例如,在确定每个进程都完成了训练之前,你不应该保存一个模型。要做到这一点,可以执行:
xxxxxxxxxxaccelerator.wait_for_everyone()这条指令将阻塞所有先到的进程,直到所有其他进程都到达该点(如果你只在一个 GPU 或 CPU 上运行你的脚本,这不会有任何作用)。
保存/加载模型:保存训练好的模型可能需要一些调整:
首先,你应该等待所有的进程到达脚本中的 “延迟执行” 所描述的那个点。
然后,你应该在保存模型之前 unwrap 你的模型。这是因为在通过 prepare() 方法时,你的模型可能被 wrap 从而用于分布式训练。如:
xxxxxxxxxxaccelerator.wait_for_everyone()unwrapped_model = accelerator.unwrap_model(model)accelerator.save(unwrapped_model.state_dict(), filename)如果你的脚本包含加载 checkpoint 的逻辑,我们也建议你在 unwrapped model 中加载你的权重(这只在 prepare() 后使用加载函数时有用)。如:
xxxxxxxxxxunwrapped_model = accelerator.unwrap_model(model)unwrapped_model.load_state_dict(torch.load(filename))保存/加载整个状态:当训练你的模型时,你可能想保存模型、优化器、随机数生成器、以及潜在的 LR scheduler 的当前状态,以便在同一个脚本中恢复训练。你可以分别使用 save_state() 和 load_state() 来做到这一点,只需简单地传入一个保存位置。
如果你通过 register_for_checkpointing() 注册了任何其他需要存储的 stateful item ,它们也会被保存和/或加载。
示例:
xxxxxxxxxxfrom accelerate import Acceleratorimport torch
accelerator = Accelerator()
my_scheduler = torch.optim.lr_scheduler.StepLR(my_optimizer, step_size=1, gamma=0.99)my_model, my_optimizer, my_training_dataloader = accelerate.prepare(my_model, my_optimizer, my_training_dataloader)
# Register the LR scheduleraccelerate.register_for_checkpointing(my_scheduler)
# Save the starting stateaccelerate.save_state("my/save/path")
device = accelerator.devicemy_model.to(device)
# Perform trainingfor epoch in range(num_epochs): for batch in my_training_dataloader: my_optimizer.zero_grad() inputs, targets = batch inputs = inputs.to(device) targets = targets.to(device) outputs = my_model(inputs) loss = my_loss_function(outputs, targets) accelerator.backward(loss) my_optimizer.step() my_scheduler.step()
# Restore previous stateaccelerate.load_state("my/save/path")梯度裁剪:如果你在脚本中使用梯度剪裁,你应该把对 torch.nn.utils.clip_grad_norm_ 或 torch.nn.utils.clip_grad_value_ 的调用分别替换为 accelerator.clipgrad_norm() 和 accelerator.clipgrad_value() 。
混合精度训练:如果你用 Accelerate 在混合精度下训练,那么模型内的计算将以混合精度进行,而模型外的每一次计算都将以 full precision 执行。例如,loss 的计算通常在模型外,且涉及 softmax 。然而,你可能想把你的 loss 计算放在 accelerator.autocast上下文管理器中:
xxxxxxxxxxwith accelerator.autocast(): loss = complex_loss_function(outputs, target)混合精度训练的另一个注意事项是:梯度会在开始时跳过一些更新,有时在训练过程中也会跳过。这是因为动态损失缩放 dynamic loss scaling 策略,在训练过程中会有一些时刻,梯度已经溢出,loss scaling factor 会减少,从而避免在下一步再次发生这种情况。
这意味着你可能会在没有梯度更新的时候就更新你的 learning rate scheduler 。这在一般情况下是没有问题的,但是当你的训练数据非常少,或者你的 scheduler 的第一个学习率值非常重要时,可能会有影响。在这种情况下,你可以跳过 learning rate scheduler 的更新:
xxxxxxxxxxif not accelerator.optimizer_step_was_skipped: lr_scheduler.step()梯度累积:要执行梯度累积,请使用 accumulate() 并指定 gradient_accumulation_steps 。在多设备训练时,这也会自动确保梯度同步或不同步,检查是否真的应该执行该 step ,并自动计算损失:
xxxxxxxxxxaccelerator = Accelerator(gradient_accumulation_steps=2)model, optimizer, training_dataloader = accelerator.prepare(model, optimizer, training_dataloader)
for input, label in training_dataloader: with accelerator.accumulate(model): predictions = model(input) loss = loss_function(predictions, label) accelerator.backward(loss) optimizer.step() scheduler.step() optimizer.zero_grad()相比之下,传统的梯度累加方法会用更冗长的代码:
xxxxxxxxxx+ from accelerate import Accelerator+ accelerator = Accelerator()
+ model, optimizer, training_dataloader, scheduler = accelerator.prepare(+ model, optimizer, training_dataloader, scheduler+ )
for index, batch in enumerate(training_dataloader): inputs, targets = batch- inputs = inputs.to(device)- targets = targets.to(device) outputs = model(inputs) loss = loss_function(outputs, targets) loss = loss / gradient_accumulation_steps+ accelerator.backward(loss) if (index+1) % gradient_accumulation_steps == 0: optimizer.step() scheduler.step() optimizer.zero_grad()DeepSpeed:DeepSpeed 支持是实验性的,所以底层 API 将在不久的将来发展,可能会有一些轻微的破坏性变化。具体而言, Accelerate 还不支持你自己编写的 DeepSpeed 配置,这将在下一个版本中添加。
使用 accelerate launch:
xxxxxxxxxxaccelerate launch {script_name.py} --arg1 --arg2 ...指定单个 GPU:
xxxxxxxxxxCUDA_VISIBLE_DEVICES="0" accelerate launch {script_name.py} --arg1 --arg2 ...在两个 GPU 上混合精度训练:
xxxxxxxxxxaccelerate launch --multi_gpu --mixed_precision=fp16 --num_processes=2 {script_name.py} {--arg1} {--arg2} ...建议总是在 accelerate launch 之前执行 accelerate config ,这样就无需再 accelerate launch 中指定各种配置。
在 notebook 中 launch:
CUDA 的代码在一个函数中,该函数被传递给 notebook_launcher() 。num_processes 为训练的设备数量(如,GPU, CPU, TPU 数量)。TPU ,在 training loop 函数之外声明你的模型。如:
xxxxxxxxxxfrom accelerate import notebook_launcherargs = ("fp16", 42, 64)notebook_launcher(training_loop, args, num_processes=2)对于 TPU:
xxxxxxxxxxmodel = create_model("resnet50d", pretrained=True, num_classes=len(label_to_id))
args = (model, "fp16", 42, 64)notebook_launcher(training_loop, args, num_processes=8)启用 FSDP:
首先进行配置:accelerate config。FSDP 配置的一个例子:
xxxxxxxxxxcompute_environment: LOCAL_MACHINEdeepspeed_config: {}distributed_type: FSDPdowncast_bf16: 'no'fsdp_config: fsdp_auto_wrap_policy: TRANSFORMER_BASED_WRAP fsdp_backward_prefetch_policy: BACKWARD_PRE fsdp_offload_params: false fsdp_sharding_strategy: 1 fsdp_state_dict_type: FULL_STATE_DICT fsdp_transformer_layer_cls_to_wrap: GPT2Blockmachine_rank: 0main_process_ip: nullmain_process_port: nullmain_training_function: mainmixed_precision: 'no'num_machines: 1num_processes: 2use_cpu: false然后开始训练:
xxxxxxxxxxaccelerate launch examples/nlp_example.py这些配置参数的含义为:
Sharding Strategy:
FULL_SHARD:对 optimizer states, gradients, parameters 都进行分片。SHARD_GRAD_OP:仅对 optimizer states, gradients 进行分片。NO_SHARD:不进行分片。Offload Params:一个布尔值,指定是否将 parameters 和 gradients 卸载到 CPU 。
Auto Wrap Policy:可以为 TRANSFORMER_BASED_WRAP, SIZE_BASED_WRAP, NO_WRAP 。
Transformer Layer Class to Wrap:当使用 TRANSFORMER_BASED_WRAP 时,指定特定的 transformer layer class name (大小写敏感)从而执行 wrap 。如 BertLayer, GPTJBlock, T5Block,... 。
Min Num Params:使用 SIZE_BASED_WRAP 的最小参数数量。
Backward Prefetch:可以为 BACKWARD_PRE, BACKWARD_POST, NO_PREFETCH。
State Dict Type:可以为 FULL_STATE_DICT, LOCAL_STATE_DICT, SHARDED_STATE_DICT 。
有几个需要注意的地方:
PyTorch FSDP 会自动 wrap 子模块,对参数进行扁平化处理,并将参数分片。由于这个原因,任何在 model wrapping 之前创建的 optimizer 都会被破坏,并占用更多的内存。因此,强烈建议在创建 optimizer 之前准备好模型,这也是很有效的。Accelerate 将自动 wrap 模型,并在单个模型的情况下为你创建一个优化器,并发出警告信息:
xxxxxxxxxxFSDP Warning: When using FSDP, it is efficient and recommended to call prepare for the model before creating the optimizer.
下面是使用 FSDP 时准备模型和优化器的推荐方法:
xxxxxxxxxxmodel = AutoModelForSequenceClassification.from_pretrained("bert-base-cased", return_dict=True)+ model = accelerator.prepare(model)
optimizer = torch.optim.AdamW(params=model.parameters(), lr=lr)
- model, optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(- model, optimizer, train_dataloader, eval_dataloader, lr_scheduler- )
+ optimizer, train_dataloader, eval_dataloader, lr_scheduler = accelerator.prepare(+ optimizer, train_dataloader, eval_dataloader, lr_scheduler+ )在单个模型的情况下,如果你用多个 parameter groups创建了优化器,并且用它们一起调用 prepare ,那么 parameter groups 将被丢失,并显示以下警告:
xxxxxxxxxxFSDP Warning: When using FSDP, several parameter groups will be conflated into a single one due to nested module wrapping and parameter flattening.
这是因为,由于嵌套的 FSDP 模块的参数扁平化为一维数组,在 wrapping 前创建的 parameter groups 在 wrapping 后将没有意义。
在有多个模型的情况下,有必要在创建优化器之前准备好模型,否则会抛出一个错误。然后将优化器以与相应模型相同的顺序传递给 prepare() 方法,否则 accelerator.save_state() 和 accelerator.load_state() 将导致错误/意外的行为。
这个功能与 Transformers library 的 run_translation.py 脚本中的 --predict_with_generate 不兼容。
对于更多的控制,用户可以利用 FullyShardedDataParallelPlugin 。在创建这个类的实例后,用户可以把它传递给 Accelerator 类的实例。
启用 DeepSpeed:DeepSpeed 实现了 ZeRO 论文中描述的一切。目前,它提供了如下的支持:Optimizer state partitioning(ZeRO stage 1)、Gradient partitioning(ZeRO stage 2)、Parameter partitioning(ZeRO stage 3)、Custom mixed precision training handling、一系列基于 CUDA 扩展的快速优化器、ZeRO-Offload 到 CPU 和 Disk/NVMe 。
DeepSpeed ZeRO-2 主要只用于训练,因为它的功能对推理没有用处。DeepSpeed ZeRO-3 也可以用于推理,因为它允许在多个 GPU 上加载巨大的模型。
Accelerate 通过两种方式集成 DeepSpeed:
deepspeed 配置文件来集成。它支持 DeepSpeed 的所有核心功能,并为用户提供了很大的灵活性。用户可能需要根据配置来改变几行代码。deepspeed_plugin 来集成。这支持 DeepSpeed 功能的子集,并对其余的配置使用默认选项。用户不需要改变任何代码。什么被集成了?
训练:DeepSpeed ZeRO 训练支持完整的 ZeRO stages 1, 2 and 3、以及 optimizer states, gradients and parameters 的 CPU/Disk offload 。
Stage 1:将 optimizer states 分片到数据并行 workers/GPUs 上。Stage 2:将 optimizer states + gradients 分片到数据并行 workers/GPUs 上。Stage 3:将 optimizer states + gradients + model parameters 分片到数据并行 workers/GPUs 上。Optimizer Offload:将 optimizer states + gradients 卸载到 CPU/Disk ,建立在 ZERO Stage 2 之上。Param Offload:将 model parameters 卸载到 CPU/Disk ,建立在 ZERO Stage 3 之上。注意:关于 Disk Offload ,磁盘应该是 NVME 的,以便有好的速度,但技术上可以在任何磁盘上工作。
推断:DeepSpeed ZeRO Inference 支持 ZeRO Stage 3 与 ZeRO-Infinity 。它使用与训练相同的 ZeRO 协议,但它不使用优化器和 lr scheduler 。
如何工作:
首先安装 DeepSpeed version >=0.6.5 。
然后配置:accelerate config 。一个配置的例子:
xxxxxxxxxxcompute_environment: LOCAL_MACHINEdeepspeed_config: gradient_accumulation_steps: 1 gradient_clipping: 1.0 offload_optimizer_device: none offload_param_device: none zero3_init_flag: true zero_stage: 2distributed_type: DEEPSPEEDfsdp_config: {}machine_rank: 0main_process_ip: nullmain_process_port: nullmain_training_function: mainmixed_precision: fp16num_machines: 1num_processes: 2use_cpu: false最后执行训练:accelerate launch examples/nlp_example.py 。
配置参数的含义:
zero_stage:0 表示禁用,1 表示 optimizer state partitioning,2 表示 optimizer+gradient state partitioning ,3 表示 optimizer+gradient+parameter partitioning 。gradient_accumulation_steps:一个整数,表示在 averaging 和 applying 这些梯度之前,积累梯度的 training steps 数量。gradient_clipping:一个浮点数,指定启用梯度剪裁的值。offload_optimizer_device:none 表示禁用 optimizer offloading,cpu 表示 offload optimizer 到 CPU,nvme 表示offload optimizer 到 NVMe SSD 。仅适用于 ZeRO >= Stage-2 。offload_param_device:none 表示禁用 parameter offloading,cpu 表示 offload parameter 到 CPU,nvme 表示offload parameter 到 NVMe SSD 。仅适用于 ZeRO Stage-3 。zero3_init_flag:决定是否启用 deepspeed.zero.Init 来构建大规模模型。只适用于 ZeRO Stage-3 。zero3_save_16bit_model:决定是否在使用 ZeRO Stage-3 时保存 16 位模型权重。mixed_precision:no 用于 FP32 训练,fp16 用于 FP16 混合精度训练,bf16用于 BF16 混合精度训练。当使用配置文件时,需要修改一些代码:
DeepSpeed Optimizers and Schedulers:
如果是 DeepSpeed Optim + DeepSpeed Scheduler :用户必须使用 enhance.utils.DummyOptim 和enhance.utils.DummyScheduler 来取代他们代码中的 PyTorch/Custom 优化器和调度器:
xxxxxxxxxx# Creates Dummy Optimizer if `optimizer` was spcified in the config file else creates Adam Optimizer optimizer_cls = ( torch.optim.AdamW if accelerator.state.deepspeed_plugin is None or "optimizer" not in accelerator.state.deepspeed_plugin.deepspeed_config else DummyOptim ) optimizer = optimizer_cls(optimizer_grouped_parameters, lr=args.learning_rate)
# Creates Dummy Scheduler if `scheduler` was spcified in the config file else creates `args.lr_scheduler_type` Scheduler if ( accelerator.state.deepspeed_plugin is None or "scheduler" not in accelerator.state.deepspeed_plugin.deepspeed_config ): lr_scheduler = get_scheduler( name=args.lr_scheduler_type, optimizer=optimizer, num_warmup_steps=args.num_warmup_steps, num_training_steps=args.max_train_steps, ) else: lr_scheduler = DummyScheduler( optimizer, total_num_steps=args.max_train_steps, warmup_num_steps=args.num_warmup_steps )Custom Optim + Custom Scheduler:当 DeepSpeed 配置文件中没有 optimizer key 和 scheduler key 的情况。在这种情况下,不需要用户修改代码,通过 DeepSpeed Plugin 使用集成时就是这种情况。
Custom Optim + DeepSpeed Scheduler :这种情况下,用户必须使用accelerate.utils.DummyScheduler 来替换代码中的 PyTorch/Custom scheduler 。
DeepSpeed Optim + Custom Scheduler:这将导致一个错误,因为当使用 DeepSpeed Optim 时必须使用 DeepSpeed Scheduler 。
DeepSpeed 配置文件中存在一些 "auto" 值,这些值是由 prepare 方法根据所提供的模型、dataloaders 、dummy optimizer 和 dummy schedulers 自动处理的。那些不是 "auto" 的字段必须由用户明确指定。如 zero_stage2_config.json 文件:
xxxxxxxxxx{ "fp16": { "enabled": true, "loss_scale": 0, "loss_scale_window": 1000, "initial_scale_power": 16, "hysteresis": 2, "min_loss_scale": 1 }, "optimizer": { "type": "AdamW", "params": { "lr": "auto", "weight_decay": "auto", "torch_adam": true, "adam_w_mode": true } }, "scheduler": { "type": "WarmupDecayLR", "params": { "warmup_min_lr": "auto", "warmup_max_lr": "auto", "warmup_num_steps": "auto", "total_num_steps": "auto" } }, "zero_optimization": { "stage": 2, "allgather_partitions": true, "allgather_bucket_size": 2e8, "overlap_comm": true, "reduce_scatter": true, "reduce_bucket_size": "auto", "contiguous_gradients": true }, "gradient_accumulation_steps": 1, "gradient_clipping": "auto", "steps_per_print": 2000, "train_batch_size": "auto", "train_micro_batch_size_per_gpu": "auto", "wall_clock_breakdown": false}保存和加载:
对于 ZeRO Stage-1 和 ZeRO Stage-2,模型的保存和加载不需要改动。
对于 ZeRO Stage-3 ,state_dict 仅只包含占位符,因为模型的权重被分片到多个 GPU 。ZeRO Stage-3 有两个选项:
保存整个 16 位的模型权重,然后使用 model.load_state_dict(torch.load(pytorch_model.bin)) 来直接加载。为此,要么在 DeepSpeed 配置文件中把 zero_optimization.stage3_gather_16bit_weights_on_model_save 设为True ,要么在 DeepSpeed Plugin 中把 zero3_save_16bit_model 设为 True 。
请注意,这个选项需要在一个 GPU 上整合权重,这可能会很慢,而且对内存要求很高,所以只有在需要时才使用这个功能。
示例:
xxxxxxxxxxunwrapped_model = accelerator.unwrap_model(model)
# New Code ## Saves the whole/unpartitioned fp16 model when in ZeRO Stage-3 to the output directory if# `stage3_gather_16bit_weights_on_model_save` is True in DeepSpeed Config file or# `zero3_save_16bit_model` is True in DeepSpeed Plugin.# For Zero Stages 1 and 2, models are saved as usual in the output directory.# The model name saved is `pytorch_model.bin`unwrapped_model.save_pretrained( args.output_dir, is_main_process=accelerator.is_main_process, save_function=accelerator.save, state_dict=accelerator.get_state_dict(model),)为了获得 32 位的权重,首先使用 model.save_checkpoint() 保存模型:
xxxxxxxxxxsuccess = model.save_checkpoint(PATH, ckpt_id, checkpoint_state_dict)status_msg = "checkpointing: PATH={}, ckpt_id={}".format(PATH, ckpt_id)if success: logging.info(f"Success {status_msg}")else: logging.warning(f"Failure {status_msg}")这将在 checkpoint 目录下创建 ZeRO model 和 optimizer 的 partitions 以及 zero_to_fp32.py 脚本。你可以使用这个脚本来做离线整合,这不需要配置文件或 GPU 。如:
xxxxxxxxxxcd /path/to/checkpoint_dir./zero_to_fp32.py . pytorch_model.bin# Processing zero checkpoint at global_step1# Detected checkpoint of type zero stage 3, world_size: 2# Saving fp32 state dict to pytorch_model.bin (total_numel=60506624)要想加载 32 位的模型,做法如下:
xxxxxxxxxxfrom deepspeed.utils.zero_to_fp32 import load_state_dict_from_zero_checkpoint
unwrapped_model = accelerator.unwrap_model(model)fp32_model = load_state_dict_from_zero_checkpoint(unwrapped_model, checkpoint_dir)如果你仅仅想得到 state_dict,做法如下:
xxxxxxxxxxfrom deepspeed.utils.zero_to_fp32 import get_fp32_state_dict_from_zero_checkpoint
state_dict = get_fp32_state_dict_from_zero_checkpoint(checkpoint_dir)注意,加载时需要大约 2 倍于 final checkpoint 大小的内存。
ZeRO Inference:DeepSpeed ZeRO Inference 支持 ZeRO stage 3 。通过 accelerate 的集成,你只需要 prepare 模型和 dataloader ,如下:
xxxxxxxxxxmodel, eval_dataloader = accelerator.prepare(model, eval_dataloader)注意事项:
DeepSpeed 的 Pipeline Parallelism 。mpu ,限制了 Megatron-LM 中支持的张量并行。目前 Accelerate 支持如下的 tracker:TensorBoard, WandB, CometML, MLFlow ,如:
xxxxxxxxxxfrom accelerate import Acceleratorfrom accelerate.utils import LoggerType
accelerator = Accelerator(log_with="all") # For all available trackers in the environmentaccelerator = Accelerator(log_with="wandb")accelerator = Accelerator(log_with=["wandb", LoggerType.TENSORBOARD])然后需要初始化 tracker:
xxxxxxxxxxhps = {"num_iterations": 5, "learning_rate": 1e-2}accelerator.init_trackers("my_project", config=hps)然后记录日志:
xxxxxxxxxxaccelerator.log({"train_loss": 1.12, "valid_loss": 0.8}, step=1)最后在训练结束时调用:accelerator.end_training() 。
你也可以通过 accelerator.get_tracker 来获取内置的 tracker 对象:
xxxxxxxxxxwandb_tracker = accelerator.get_tracker("wandb")if accelerator.is_main_process: wandb_run.log_artifact(some_artifact_to_log)处理大模型:常规的加载预训练模型的方式:
xxxxxxxxxximport torch
my_model = ModelClass(...) # step 1state_dict = torch.load(checkpoint_file) # step 2my_model.load_state_dict(state_dict) # step 3这对于常规大小的模型而言很有效,但是无法处理大型模型:在 step 1 我们在 RAM 中加载一个完整版本的模型,并花一些时间随机初始化权重(这将在 step 3 被丢弃);在 step 2 ,我们在 RAM 中加载另一个完整版本的模型,并使用预训练的权重。
Accelerate 提供一些工具来帮助处理大模型(这些 API 是实验性质的,未来可能会发生改变):
init_empty_weights 上下文管理器:初始化一个模型而无需使用任何内存。这依赖于 PyTorch 1.9 中引入的 meta device 。
xxxxxxxxxxfrom accelerate import init_empty_weights
with init_empty_weights(): my_model = ModelClass(...)在该上下文管理器中,每当有一个 parameter 被创建时,它就被立即移动到 meta device 。
sharded checkpoints:有可能你的模型太大从而无法装入内存,这并不意味着它不能被加载:如果你有一个或几个 GPU ,这就有更多的内存可用于存储你的模型。此时需要你的 checkpoint 被拆分为几个小文件,即 checkpoint shards 。
Accelerate 将处理 checkpoint shards ,但是要满足如下格式:你的 checkpoint shards 应该放在一个文件夹中,并且有几个包含部分 state dict 的文件、以及一个 index.json 文件(将 parameter name 映射到包含该 parameter weights 的文件)。如:
xxxxxxxxxxfirst_state_dict.binindex.jsonsecond_state_dict.bin其中 index.json 内容为:
xxxxxxxxxx{ "linear1.weight": "first_state_dict.bin", "linear1.bias": "first_state_dict.bin", "linear2.weight": "second_state_dict.bin", "linear2.bias": "second_state_dict.bin"}load_checkpoint_and_dispatch:在 empty model 中加载一个 checkpoint 。它支持 full checkpoints (包含整个 state dict 的单一文件)以及 sharded checkpoints 。它还会在你可用的设备( GPU 、CPU )上自动分配这些权重,所以如果你正在加载一个 sharded checkpoints ,最大的RAM 用量将是最大分片的大小。
例如:
xxxxxxxxxxgit clone https://huggingface.co/sgugger/sharded-gpt-j-6Bcd sharded-gpt-j-6Bgit-lfs installgit pull初始化模型:
xxxxxxxxxxfrom accelerate import init_empty_weightsfrom transformers import AutoConfig, AutoModelForCausalLM
checkpoint = "EleutherAI/gpt-j-6B"config = AutoConfig.from_pretrained(checkpoint)
with init_empty_weights(): model = AutoModelForCausalLM.from_config(config)加载权重:
xxxxxxxxxxfrom accelerate import load_checkpoint_and_dispatch
model = load_checkpoint_and_dispatch( model, "sharded-gpt-j-6B", device_map="auto", no_split_module_classes=["GPTJBlock"])通过 device_map="auto",Accelerate 根据可用资源自动决定将模型的每一层放在哪里:
GPU 上的最大可用空间。CPU 上。RAM ,我们将剩余的权重作为内存映射的张量存储在硬盘上。no_split_module_classes=["GPTJBlock"] 表示属于 GPTJBlock 的模块不应该在不同的设备上分割。你应该在这里设置所有包括某种残差连接的 block 。
可以通过 model.hf_device_map 查看模型的权重的设备。
分布式训练的复现:
设置随机数种子:
xxxxxxxxxxfrom accelerate import set_seedset_seed(42)它在内部设置了五种随机数种子:
xxxxxxxxxx random.seed(seed) np.random.seed(seed) torch.manual_seed(seed) torch.cuda.manual_seed_all(seed) # ^^ safe to call this function even if cuda is not available if is_tpu_available(): xm.set_rng_state(seed)设置 batch size:当使用 Accelerate 训练时,传递给 dataloader 的 batch size 是 batch size/GPU ,因此 final batch size 是 batch size * device num 。
设置学习率:学习率应该和 device num 成正比,如:
xxxxxxxxxxlearning_rate = 1e-3accelerator = Accelerator()learning_rate *= accelerator.num_processes
optimizer = AdamW(params=model.parameters(), lr=learning_rate)梯度同步:在 DDP 中,PyTorch 在一些特定的点上进行进程间通信。然而在梯度累积时,你会累积 n 个 loss 并跳过 .backward() 。这可能会导致明显的减速,因为所有的进程都需要与它们进行更多次的通信。
可以通过 no_sync 上下文管理器来避免:
xxxxxxxxxx ddp_model, dataloader = accelerator.prepare(model, dataloader)
for index, batch in enumerate(dataloader): inputs, targets = batch # Trigger gradient synchronization on the last batch if index != (len(dataloader)-1):- with ddp_model.no_sync():+ with accelerator.no_sync(model): # Gradients only accumulate outputs = ddp_model(inputs) loss = loss_func(outputs, targets) accelerator.backward(loss) else: # Gradients finally sync outputs = ddp_model(inputs) loss = loss_func(outputs) accelerator.backward(loss)或者直接使用 accelerator.accumulate:
xxxxxxxxxxddp_model, dataloader = accelerator.prepare(model, dataloader)
for batch in dataloader: with accelerator.accumulate(model): optimizer.zero_grad() inputs, targets = batch outputs = model(inputs) loss = loss_function(outputs, targets) accelerator.backward(loss)进程间同步:
xxxxxxxxxxaccelerator.wait_for_everyone()这将阻塞所有进程直到所有进程都达到该点。
用途:
加载数据集:
xxxxxxxxxxwith accelerator.main_process_first(): datasets = load_dataset("glue", "mrpc")这等价于:
xxxxxxxxxx# First do something on the main processif accelerator.is_main_process: datasets = load_dataset("glue", "mrpc")else: accelerator.wait_for_everyone()
# And then send it to the rest of themif not accelerator.is_main_process: datasets = load_dataset("glue", "mrpc")else: accelerator.wait_for_everyone()存取 state_dict:
xxxxxxxxxxif accelerator.is_main_process: model = accelerator.unwrap_model(model) torch.save(model.state_dict(), "weights.pth") with accelerator.main_process_first(): state = torch.load("weights.pth") model.load_state_dict(state)在 global main 进程上 tokenizing,然后传播到每个 worker :
xxxxxxxxxxdatasets = load_dataset("glue", "mrpc")
with accelerator.main_process_first(): tokenized_datasets = datasets.map( tokenize_function, batched=True, remove_columns=["idx", "sentence1", "sentence2"], )
Accelerator 最佳实践:
print 语句应该由accelerator.print() 代替,每个进程打印一次。
xxxxxxxxxx- print("My thing I want to print!")+ accelerator.print("My thing I want to print!")每个 server 执行一次的语句,应该使用 accelerator.is_local_main_process:
xxxxxxxxxxif accelerator.is_local_main_process: do_thing_once_per_server()或者使用 accelerator.on_local_main_process() 装饰器:
xxxxxxxxxx.on_local_main_processdef do_my_thing(): "Something done once per server" do_thing_once_per_server()所有 server 中仅执行一次的语句,应该使用 accelerator.is_main_process:
xxxxxxxxxxif accelerator.is_main_process: do_thing_once()或者使用 accelerator.on_main_process() 装饰器:
xxxxxxxxxx.on_main_processdef do_my_thing(): "Something done once per server" do_thing_once()在指定的进程(局部编号或全局编号)上执行的语句,也可以使用如下的装饰器:
xxxxxxxxxx.on_local_process(local_process_idx=0)def do_my_thing(): "Something done on process index 0 on each server" do_thing_on_index_zero_on_each_server() .on_process(process_index=0)def do_my_thing(): "Something done on process index 0" do_thing_on_index_zero()同步控制:使用 accelerator.wait_for_everyone() 来确保所有进程在继续之前,先到达该点。例如,在模型保存之前很有用。
保存和加载:使用 accelerator.unwrap_model() 来删除所有在分布式过程中添加的特殊的 model wrapper 。
xxxxxxxxxxmodel = MyModel()model = accelerator.prepare(model)# Unwrapmodel = accelerator.unwrap_model(model)使用 accelerator.save() 而不是 torch.save():
xxxxxxxxxx state_dict = model.state_dict()- torch.save(state_dict, "my_state.pkl")+ accelerator.save(state_dict, "my_state.pkl")使用 accelerator.clipgrad_norm() 而不是 torch.nn.utils.clip_grad_norm_() ;使用 accelerator.clipgrad_value() 而不是torch.nn.utils.clip_grad_value() 。
梯度累积:要执行梯度累积,请使用 accelerator.accumulate() 并指定 gradient_accumulation_steps 。即使在多设备训练时,它也会自动处理。
xxxxxxxxxx- accelerator = Accelerator()+ accelerator = Accelerator(gradient_accumulation_steps=2)
for (input, label) in training_dataloader:+ with accelerator.accumulate(model): predictions = model(input) loss = loss_function(predictions, labels) accelerator.backward(loss) optimizer.step() scheduler.step() optimizer.zero_grad()class accelerate.Accelerator:Accelerator 类,用于分布式训练或混合精度训练。
xxxxxxxxxxclass accelerate.Accelerator( device_placement: bool = Trues, plit_batches: bool = False, fp16: bool = None, mixed_precision: typing.Union[accelerate.utils.dataclasses.PrecisionType, str] = None, gradient_accumulation_steps: int = 1, cpu: bool = False, deepspeed_plugin: DeepSpeedPlugin = None, fsdp_plugin: FullyShardedDataParallelPlugin = None, megatron_lm_plugin: MegatronLMPlugin = None, rng_types: typing.Union[typing.List[typing.Union[str, accelerate.utils.dataclasses.RNGType]], NoneType] = None, log_with: typing.Union[typing.List[typing.Union[str, accelerate.utils.dataclasses.LoggerType, accelerate.tracking.GeneralTracker]], NoneType] = None, logging_dir: typing.Union[str, os.PathLike, NoneType] = None, dispatch_batches: typing.Optional[bool] = None, even_batches: bool = True, step_scheduler_with_optimizer: bool = True, kwargs_handlers: typing.Optional[typing.List[accelerate.utils.dataclasses.KwargsHandler]] = None, dynamo_backend: typing.Union[accelerate.utils.dataclasses.DynamoBackend, str] = None )参数:
device_placement:一个布尔值,默认为 True,指定 accelerator 是否应该将对象放在 device 上(由 dataloader, model 等等产生的张量)。
split_batches:一个布尔值,默认为 False,指定 accelerator 是否应该将 dataloaders 产生的 batches 在设备上进行分割。
True ,实际使用的 batch size 在任何类型的分布式进程中都是一样的,但它必须是你使用的 num_processes (即,进程数量)的整数倍。False ,实际使用的 batch size 将是你脚本中设置的 batch size 乘以进程数。mixed_precision:一个字符串,指定是否使用混合精度训练(fp16 或 bfloat16)。可以为 'no', 'fp16', 'bf16' 。默认为环境变量 ACCELERATE_MIXED_PRECISION 中的值,或者通过 accelerate.launch 传入的选项。
'fp16' 要求 pytorch 1.6 及其以上版本,'bf16' 要求 pytorch 1.10 及其以上版本。
gradient_accumulation_steps:一个整数,指定在累积梯度之前应该经过多少个 step。默认为 1 ,表示没有梯度累积。一个大于 1 的数字应该与 Accelerator.accumulate 相结合。
cpu:一个布尔值,指定是否强制脚本在 CPU 上执行。如果设置为 True ,将忽略 GPU 的可用性,并且仅强制在一个进程上执行。默认为 False 。
deepspeed_plugin:一个 DeepSpeedPlugin ,用于调整 DeepSpeed 的相关的参数。也可以通过 accelerate config 来直接调整 DeepSpeed 。
fsdp_plugin:一个 FullyShardedDataParallelPlugin ,用于调整 FSDP 的相关的参数。也可以通过 accelerate config 来直接调整 FSDP 。
megatron_lm_plugin:一个 MegatronLMPlugin ,用于调整 FSDPMegatronLM 的相关的参数。也可以通过 accelerate config 来直接调整 MegatronLM 。
rng_types:一个关于字符串或 RNGType 的列表,它指定了一个关于随机数生成器的列表,用于在 dataloaders 的每个 iteration 开始时进行同步。应该是如下的一个或几个:
"torch":基本的 torch 的随机数生成器。"cuda":CUDA 随机数生成器(仅限于 GPU)。"xla":XLA随机数生成器(仅咸鱼 TPU )。"generator":sampler (或 batch sampler)的 torch.Generator 、或 iterable dataset 的 torch.Generator 。如果 PyTorch 版本 <=1.5.1 ,将默认为 ["torch"] ;如果 PyTorch 版本 >=1.6 ,则默认为 ["generator"] 。
log_with:一个关于字符串、LoggerType、GeneralTracker 的列表,指定 loggers 。可以为如下的一个或几个:"all", "tensorboard", "wandb", "comet_ml"。
如果选择了 "all" ,就会接收环境中所有可用的 trackers 并初始化它们。也可以接受用于自定义 tracker 的 GeneralTracker 的实现,并且可以与 "all" 结合使用。
logging_dir:一个字符串或 os.PathLike,指定用于日志的目录的路径。
dispatch_batches:一个布尔值,如果为 "True" ,Accelerator 准备的 dataloader 只在 global main 进程上进行迭代,然后将 batch 分割并广播给每个 worker 进程。对于底层数据集是 IterableDataset 的 DataLoader ,默认为 True ,否则为False 。
even_batches:一个布尔值,如果设置为 True ,在所有进程的 total batch size 不能完全分割数据集的情况下,数据集开头的样本将被重复,这样 batch 就可以在所有 worker 之间平均分配。默认为 True 。
step_scheduler_with_optimizer:一个布尔值,如果学习率 scheduler 与优化器同时 step ,则设置为 True ;否则设置为 False。默认为 True 。
kwargs_handlers:一个关于 KwargHandler 的列表,用于自定义如何创建与分布式训练或混合精度相关的对象。
dynamo_backend:一个字符串或 DynamoBackend,设置一个 dynamo 后端从而利用 Torch dynamo 优化你的训练。默认为 'no' 。
属性:
device:一个 Torch.device 对象,表示要使用的设备。distributed_type:一个 DistributedType 对象,表示分布式训练配置。local_process_index:一个整数,表示当前机器上的进程编号。mixed_precision:一个字符串,表示配置好的混合精度模式。num_processes :一个整数,表示用于训练的进程总数。optimizer_step_was_skipped:一个布尔值,表示当学习率不应该被改变的情况下,优化器的更新是否被跳过(因为混合精度中的梯度溢出)。process_index:一个整数,表示当前进程在所有进程中的总编号。state:一个 AcceleratorState,表示分布式的 setup state 。sync_gradients:一个布尔值,表示目前梯度是否在所有进程中被同步。use_distributed:一个布尔值,表示当前配置是否用于分布式训练。方法:
accumulate(model):一个上下文管理器,它 wrap 模型并自动进行梯度累积。
参数:model:一个 torch.nn.Module 对象,它是被 Accelerator.prepare 准备好之后的模型。
autocast():如果启用的话,将在这个上下文管理器中的代码块内应用自动混合精度。否则不会发生任何变化。
backward(loss, **kwargs):根据 Accelerator.gradient_accumulation_steps 对梯度进行调整,并根据配置来调用正确的backward() 。应该用来代替 loss.backward() 。
clear():是 Accelerate.free_memory 的别名,释放所有内部对象的引用并调用垃圾收集器。你应该在两个不同 models/optimizers 训练之间调用这个方法。
clip_grad_norm_(parameters, max_norm, norm_type = 2 ) -> torch.Tensor:参数梯度的总范数(将所有参数视为单个向量来看待)的范数截断。应该用来代替 torch.nn.utils.clip_grad_norm_ 。
参数:
parameters:待截断梯度的参数列表。max_norm:梯度阈值。norm_type:范数类型。clip_grad_value_(parameters, clip_value ) -> torch.Tensor:参数梯度的数值截断(绝对值)。应该用来代替 torch.nn.utils.clip_grad_value_ 。
参数:
parameters :待截断取值的参数列表。clip_value:参数阈值(绝对值)。end_training():运行任何特殊的 end training behavior ,比如只在 global main 进程上停止 tracker 。如果使用实验跟踪,应始终在脚本的最后调用 end_training() 。
free_memory():参考 clear() 。
gather(tensor) -> torch.Tensor, or a nested tuple/list/dictionary of torch.Tensor :跨所有进程收集 tensor 的取值,并在第一个维度上将其拼接起来。在进行评估时,对所有进程的预测进行 regroup 是很有用的。
注意:这种收集发生在所有进程中。
参数:tensor:一个张量或张量的集合,表示需要跨所有进程来收集取值的张量。
返回:返回收集后的结果,类型与 tensor 相同。
gather_for_metrics(tensor):与 gather() 作用相同,但是 gather_for_metrics 可能会丢弃 last batch 中重复的数组。它经常被用于收集 inputs 和 targets 来计算指标。
参数和返回值:参考 gather() 。
get_state_dict( model, unwrap = True ):以 full precision 来返回一个模型的 state_dict ,这个模型是被 accelerator.prepare() 处理过的。
参数:
model:一个 PyTorch 模型,它被 accelerator.prepare() 处理过。unwrap:一个布尔值,指定是否返回原始的 state_dict 。如果为 False,则返回 wrapped state_dict 。默认为 True 。get_tracker(name: str):基于 name 从 self.trackers 中返回一个 tracker,仅在 global main 进程上有效。
参数:name:一个字符串,指定 tracker 的名字。
init_trackers(project_name: str, config: Optional[dict] = None, init_kwargs: Optional[dict] = {}) :为存储在 self.log_with 中的所有 trackers 执行初始化。
参数:
project_name:一个字符串,指定 project 的名字。config:一个字典,指定 starting configuration 。init_kwargs:一个字典,它将被传递给 tracker 的初始化方法。join_uneven_inputs(joinables, even_batches = None ):一个上下文管理器,用于在 uneven 的输入上进行分布式训练或分布式评估。它作为 torch.distributed.algorithms.join 的 wrapper ,当 total batch size 无法整除 dataset length 时很有用。
仅支持多 GPU 上的 Distributed Data Parallel training。对于其它配置,则没有效果。
参数:
joinables:关于 torch.distributed.algorithms.Joinable 的列表,它为 torch.distributed.algorithms.Joinable 所子类化的模型或优化器,如 Accelerator.prepare 所准备的 PyTorch Module 。
even_batches:一个布尔值,它覆盖 Accelerator 中设置的 even_batches 的值。如果未提供,则默认使用 Accelerator 中的 even_batches 的值。
对于 iterable-style 的 dataloader,该参数不生效。
示例:
xxxxxxxxxxfrom accelerate import Accelerator
accelerator = Accelerator(even_batches=True)ddp_model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)
with accelerator.join_uneven_inputs([ddp_model], even_batches=False): for input, output in dataloader: outputs = model(input) loss = loss_func(outputs) loss.backward() optimizer.step() optimizer.zero_grad()load_state(input_dir: str):加载 model, optimizer, scaler, RNG generators, registered objects 的当前状态。必须与 accelerator.save_state() 配合工作。
参数:input_dir :一个字符串或 os.PathLike 对象,指定存放 state 的目录。
local_main_process_first():让 local main 进程先进入一个 with block ,其它进程将在 local main 进程退出后进入 with block 。
log(values: dict, step: Optional[int] = None, log_kwargs: Optional[dict] = {}):记录 values 到 self.trackers 中的所有 trackers,仅在 global main 进程上生效。
参数:
values:一个字典,仅包含 int/float/str 数据类型,表示待记录的值。step:一个整数,指定 run step 。log_kwargs:一个字典,它将被传递给 tracker 的 log 函数。main_process_first():让 global main 进程先进入一个 with block ,其它进程将在global main 进程退出后进入 with block 。
no_sync(model):一个上下文管理器,它通过调用 torch.nn.parallel.DistributedDataParallel.no_sync 来禁用跨DDP 进程的梯度同步。如果模型不在 DDP 中,这个上下文管理器不做任何事情。
参数:model:一个 torch.nn.Module 对象,它被 accelerator.prepare() 处理过。
示例:
xxxxxxxxxxfrom accelerate import Accelerator
accelerator = Accelerator()dataloader, model, optimizer = accelerator.prepare(dataloader, model, optimizer)input_a = next(iter(dataloader))input_b = next(iter(dataloader))
with accelerator.no_sync(): outputs = model(input_a) loss = loss_func(outputs) accelerator.backward(loss) # No synchronization across processes, only accumulate gradients outputs = model(input_b)loss = loss_func(outputs)accelerator.backward(loss)# Synchronization across all processesoptimizer.step()optimizer.zero_grad()on_last_process(func):一个装饰器,将仅仅在最后一个进程上运行被装饰的函数。
on_local_main_process(func):一个装饰器,将仅仅在 local main 进程上运行被装饰的函数。
on_local_process(local_process_idx):一个装饰器,将仅仅在 local process index 进程上运行被装饰的函数。
on_main_process(func):一个装饰器,将仅仅在 global main 进程上运行被装饰的函数。
on_process(process_idx):一个装饰器,将仅仅在 global process index 进程上运行被装饰的函数。
pad_across_processes(tensor, dim = 0, pad_index = 0, pad_first = False ):递归地将所有设备的张量(位于嵌套的 list/tuple/dict 中)填充到相同的 size,以便它们可以安全地被收集起来。
参数:
tensor:torch.Tensor 的 list/tuple/dict ,指定被收集的数据。dim:一个整数,指定填充哪一维。默认为 0 。pad_index:一个整数,指定用什么值来填充。pad_first:一个布尔值,指定是否从开始填充。默认为 False,表示从尾部填充。prepare(*args, device_placement = None ):为分布式训练和混合精度准备 args 中传递的所有对象,然后以相同的顺序返回。
如果你使用一个 model 来用于推断,并且没有任何形式的混合精度,那么你不需要 prepare 该 model 。
参数:
args:一个列表,可以包含如下类型的对象:torch.utils.data.DataLoader、torch.nn.Module、torch.optim.Optimizer、torch.optim.lr_scheduler.LRScheduler。device_placement:一个关于布尔值的列表,要求长度与 args 相同,分别指定每个被 prepare 的对象是否 automatic device placement 。prepare_data_loader(data_loader: DataLoaderde, dvice_placement = None ):准备一个 PyTorch DataLoader 用于分布式训练。推荐使用 prepare() 函数。
参数:参考 prepare() 函数。
prepare_model(model: Module, dvice_placement = None ):准备一个 PyTorch model 用于分布式训练。推荐使用 prepare() 函数。
参数:参考 prepare() 函数。
prepare_optimizer(optimizer: Optimizer, dvice_placement = None ):准备一个 PyTorch Optimizer 用于分布式训练。推荐使用 prepare() 函数。
参数:参考 prepare() 函数。
prepare_scheduler(scheduler: _LRScheduler, dvice_placement = None ):准备一个 PyTorch Scheduler 用于分布式训练。推荐使用 prepare() 函数。
参数:参考 prepare() 函数。
print(*args, **kwargs ):用于替代 python 内置的 print() 函数从而仅在每个 server 上打印一次。
reduce(tensor, reduction = 'sum' ) -> torch.Tensor, or a nested tuple/list/dictionary of torch.Tensor:跨所有进程来 reduce 指定的张量。注意,所有的进程都将得到被 reduce 之后的值。
参数:
tensor:一个 Tensor 或 Tensor 的集合,指定要被 reduce 的张量。reduction:一个字符串,指定 reduce 方式,可以为 'sum', 'mean', 'none' 。如果为 'none',则不做任何操作。默认为 'sum' 。返回:与 tensor 类型相同,表示被 reduce 之后的值。
register_for_checkpointing(*objects):注册对象,从而在 save_state/load_state 将该对象保存或加载。
注意,该方法应该在同一脚本中加载或保存 state 时利用。它不是被设计用来在不同的脚本中使用的。
参数:objects:被注册的对象,每个对象必须有一个 load_state_dict 方法和一个 state_dict 方法。
save(obj, f):在每台机器上保存 obj 一次,用于替代 torch.save 。
参数:
obj:被保存的对象。f:一个字符串或 os.PathLike 对象,指定存储路径。save_state(output_dir: str):保存 model, optimizer, scaler, RNG generator, registered object 等对象的当前状态。
参数:output_dir:一个字符串或 os.PathLike 对象,指定存储路径。
unscale_gradients(optimizer = None ):在使用 AMP 的混合精度训练中 unscale 梯度。在所有其他 setting 中,该方法不做任何事情。
参数:optimizer:一个 Optimizer 或一组 Optimizer,指定哪些 optimizer 需要 unscale 梯度。如果未设置,则默认为传递给 prepare() 方法的所有 optimizers 。
unwrap_model(model, keep_fp32_wrapper: bool = False ):unwrap model,其中 model 是经过 prepare() 处理过的。该方法常用于保存 model 。
参数:
model:一个 torch.nn.Module,指定需要被 unwrap 的模型。keep_fp32_wrapper:一个布尔值,指定是否需要移除 mixed precision hook(如果有的话)。默认为 False 。wait_for_everyone():将停止当前进程的执行,直到其他所有进程都达到该点(因此,当脚本只在一个进程中运行时,这没有任何作用)。在保存模型前执行该方法很有用。
class accelerate.state.AcceleratorState:单例类,它存储当前训练环境的信息。该类是不可变的,在第一次初始化之后就保存不变。
xxxxxxxxxxclass accelerate.state.AcceleratorState( mixed_precision: str = None, cpu: bool = False, dynamo_backend = None, deepspeed_plugin = None, fsdp_plugin = None, megatron_lm_plugin = None, _from_accelerator: bool = False, **kwargs)参数参考 Accelerator 。
属性:
device:一个 torch.device,表示要使用的设备。distributed_type:一个 DistributedType,表示当前使用的分布式环境的类型。initialized:一个布尔值,表示 AcceleratorState 是否已经从 Accelerator 得到初始化。local_process_index:一个整数,表示当前进程在当前 server 上的索引。mixed_precision:一个字符串,表示当前脚本是否会使用混合精度,如果是的话,正在执行的混合精度的类型。num_processes:一个整数,表示当前并行启动的进程的数量。process_index :一个整数,表示当前进程的 global index 。class accelerate.state.GradientState():单例类,它存储梯度同步相关的信息从而用于梯度累积。该类是不可变的,在第一次初始化之后就保存不变。
属性:
end_of_dataloader:一个布尔值,表示我们是否已经到达了当前 dataloader 的结束。remainder:一个整数,表示填充 dataloader 所需要增加的额外样本的数量。sync_gradients:一个布尔值,表示梯度是否应该在所有设备上同步。accelerate config 命令:启动一系列提示,为你的训练系统创建并保存 default_config.yml 配置文件。该命令应该总是最先执行。
xxxxxxxxxxaccelerate config [arguments] # 或者 accelerate-config [arguments]命令参数:
--config_file CONFIG_FILE (str):配置文件的存储路径。默认为 default_config.yaml 文件名,存放在 cache location 。-h, --help (bool):展示帮助信息。accelerate config default 命令:启动一系列提示,为你的训练系统创建并保存 default_config.yml 配置文件,但是会在命令行中配置一些参数,如 --mixed_precision 等等。
xxxxxxxxxxaccelerate config default [arguments] # 或者 accelerate-config default [arguments]用法参考 accelerate config 。
accelerate config update:命令:用一组新的参数更新已有的配置文件中的对应项。
xxxxxxxxxxaccelerate config update [arguments] # 或者 accelerate-config update [arguments]用法参考 accelerate config 。
accelerate env:列出配置文件的内容。
xxxxxxxxxxaccelerate env [arguments] # 或者 accelerate-env [arguments]用法参考 accelerate config 。
accelerate launch:launch 一个脚本。
xxxxxxxxxxaccelerate launch [arguments] {training_script} --{training_script-argument-1} --{training_script-argument-2} ...位置参数:
{training_script}:脚本的完整路径。--{training_script-argument-1}:脚本的参数。可选参数:
-h, --help (bool):展示帮助信息。--config_file CONFIG_FILE (str):替换默认的配置文件。-m, --module (bool):将 launch script 解释为一个 Python 模块,即通过 python -m 执行。--no_python (bool):跳过在脚本前加上 "python" ,直接执行它。当脚本不是 Python 脚本时很有用。--debug (bool):当发生故障时,是否打印出 torch.distributed stack trace 。-q, --quiet (bool):将子进程的错误信息从 launch stack trace 切换到仅展示相关的信息。仅用于 DeepSpeed 和单进程。下面的参数可以通过 accelerate config 来配置。也可以在 launch 时配置(或更新):
硬件选择参数:
--cpu (bool):是否强制在 CPU 上进行训练。--multi_gpu (bool):是否应该启动分布式 GPU 训练。--mps (bool):否应该在 MacOS 机器上使用支持 MPS 的 GPU 设备。--tpu (bool) :是否应该启动 TPU 训练。资源选择参数:
--mixed_precision {no,fp16,bf16} (str):是否使用混合精度训练。 BF16 训练仅在 Nvidia Ampere GPU 和 PyTorch 1.10 或更高版本上支持。--num_processes NUM_PROCESSES (int):要并行启动的进程总数。--num_machines NUM_MACHINES (int):本次训练中使用的机器总数。--num_cpu_threads_per_process NUM_CPU_THREADS_PER_PROCESS (int):每个进程的 CPU 线程数。可以进行调优以获得最佳性能。训练方式选择参数:
--use_deepspeed (bool):是否使用 DeepSpeed 进行训练。--use_fsdp (bool):是否使用 FullyShardedDataParallel 进行训练。--use_megatron_lm (bool):是否使用 Megatron-LM 进行训练。分布式 GPU 参数:以下参数只有在传递了 multi_gpu 或者通过 accelerate config 配置了 multi-gpu training 时才有用。
--gpu_ids (str):在这台机器上应该使用哪些 GPU (通过 id 指定)进行训练,以逗号分隔的方式列出。--same_network (bool):用于多节点训练的所有机器是否存在于同一个 local network。--machine_rank MACHINE_RANK (int):启动这个脚本的机器的 rank (即,编号)。--main_process_ip MAIN_PROCESS_IP (str):rank 0 的机器的 IP 地址。--main_process_port MAIN_PROCESS_PORT (int):与 rank 0 的机器通信的端口。--rdzv_conf (str):额外的 rendezvous 配置(<key1>=<value1>,<key2>=<value2>,…) 。--max_restarts (int):worker group 最多重启多少次(之后不再重启而是失败)。--monitor_interval (float):监控 worker 状态的时间间隔,单位是秒。TPU 参数:以下参数只有在传递了 tpu 或者通过 accelerate config 配置了 tpu training 时才有用。
--main_training_function MAIN_TRAINING_FUNCTION (str):脚本中要执行的主函数的名称。--downcast_bf16 (bool):当在 TPU 上使用 bf16 精度时,是否 float 和 double 张量都被类型转换到 bfloat16 ,还是 double 张量仍为 float32 。DeepSpeed 参数:以下参数只有在传递了 use_deepspeed 或者通过 accelerate config 配置了 deepspeed 时才有用。
--deepspeed_config_file (str):DeepSpeed 配置文件。--zero_stage (int):DeepSpeed 的 ZeRO 优化阶段。--offload_optimizer_device (str):决定在哪里(none|cpu|nvme)卸载优化器状态。--offload_param_device (str):决定在哪里(none|cpu|nvme)卸载 parameters。--gradient_accumulation_steps (int):训练脚本中使用的gradient_accumulation_steps 的数量。--gradient_clipping (float):训练脚本中使用的梯度剪裁值。--zero3_init_flag (str):决定是否( true|false )启用 deepspeed.zero.Init来构建大规模模型。只适用于DeepSpeed ZeRO Stage-3 。--zero3_save_16bit_model (str):决定在使用 ZeRO Stage-3 时是否(true|false)保存 16 位模型权重。只适用于DeepSpeed ZeRO Stage-3 。--deepspeed_hostfile (str):用于配置多节点计算资源的DeepSpeed hostfile。--deepspeed_exclusion_filter (str):当使用多节点配置时,DeepSpeed exclusion filter 字符串。--deepspeed_inclusion_filter (str):当使用多节点配置时,DeepSpeed inclusionfilter 字符串。--deepspeed_multinode_launcher (str):要使用的 DeepSpeed 多节点 launcher。Fully Sharded Data Parallelism 参数:以下参数只有在传递了 use_fdsp 或者通过 accelerate config 配置了 Fully Sharded Data Parallelism 时才有用。
--fsdp_offload_params (str): 决定是否( true|false )将 parameters 和梯度卸载到 CPU 。--fsdp_min_num_params (int):FSDP 默认的 Default Auto Wrapping 的 parameters 的最少数量。--fsdp_sharding_strategy (int):FSDP 的分片策略。--fsdp_auto_wrap_policy (str):FSDP 的 auto wrap policy 。--fsdp_transformer_layer_cls_to_wrap (str):要 wrap 的 Transformer layer class name (区分大小写),例如:BertLayer, GPTJBlock, T5Block ... 。--fsdp_backward_prefetch_policy (str):FSDP 的 backward prefetch policy 。--fsdp_state_dict_type (str):FSDP 的 state dict 类型。Megatron-LM 参数:以下参数只有在传递了 use_megatron_lm 或者通过 accelerate config 配置了 Megatron-LM 时才有用。
--megatron_lm_tp_degree (''):Megatron-LM 的张量并行(Tensor Parallelism: TP )度。--megatron_lm_pp_degree (''):Megatron-LM 的管道平行(Pipeline Parallelism: PP )度。--megatron_lm_num_micro_batches (''):当管道并行度大于 1 时,Megatron-LM 的 micro batch 数量。--megatron_lm_sequence_parallelism (''):当张量并行度大于 1 时,决定是否(true|false )启用序列并行 Sequence Parallelism 。--megatron_lm_recompute_activations (''):决定是否(true|false )启用 Selective Activation Recomputation 。--megatron_lm_use_distributed_optimizer ('') :决定是否(true|false )使用分布式优化器,将优化器状态和梯度分片到 Data Pralellel: DP 的 ranks 。--megatron_lm_gradient_clipping (''):Megatron-LM 基于全局 L2 范数的梯度裁剪值( 0 表示禁用)。AWS SageMaker 参数:以下参数仅当在 SageMake 中训练时才有用。
--aws_access_key_id AWS_ACCESS_KEY_ID (str):用于启动 Amazon SageMaker 训练工作的 AWS_ACCESS_KEY_ID 。--aws_secret_access_key AWS_SECRET_ACCESS_KEY (str):用于启动 Amazon SageMaker 训练工作的AWS_SECRET_ACCESS_KEY 。accelerate tpu-config:配置 tpu 训练。
xxxxxxxxxxaccelerate tpu-config [arguments]可选参数:
-h, --help (bool):展示帮助信息。配置参数:下面参数也可以通过 accelerate config 来配置:
--config_file (str):accelerate 配置文件的路径。--tpu_name (str):要使用的 TPU 的名称。如果没有指定,将使用配置文件中指定的 TPU 。--tpu_zone (str):要使用的 TPU 的 zone。如果没有指定,将使用配置文件中指定的 zone。TPU 参数:下面的参数用于配置 TPU。
--command_file (str):一个文件的路径,该文件包含启动时在 pod 上运行的命令。--command (str):要在 pod 上运行的命令。可以传递多次。--install_accelerate (bool):是否在 pod 上安装 accelerate 。默认为 False 。--accelerate_version (str):在 pod 上安装 accelerate 的版本。如果不指定,将使用最新的 pypi 版本。指定 'dev' 可以从 GitHub 安装。--debug (bool):如果设置,将打印将运行的命令,而不是运行它。accelerate test:执行 accelerate/test_utils/test_script.py 从而确保 Accelerate 被正确地配置。
xxxxxxxxxxaccelerate test [arguments] # 或 accelerate-test [arguments]可选参数:
--config_file CONFIG_FILE (str):配置文件的存储路径。默认为 default_config.yaml 文件名,存放在 cache location 。-h, --help (bool):展示帮助信息。class accelerate.tracking.GeneralTracker():所有 Tracker 的基类。
方法(每个方法都应该接受 **kwargs 参数):
finish():应该运行位于 tracking API 中的任何 finalizing function 。如果API 中没有这类 finalizing function ,则不要重写 finish() 方法。
log(values: dict, step: typing.Optional[int], **kwargs ):记录当前 run 的日志。
参数:
values:一个字典,指定 key-value 的要被记录的内容。注意,key 为字符串,而value 必须是字符串、浮点数、或整数类型。step:一个整数,指定当前的 run step。store_init_configuration(values: dict ):将 values 记录为超参数。
参数:参考 log() 。
class accelerate.tracking.TensorBoardTracker:Tensorboard Tracker 。应该在你的脚本开始的地方就被初始化。
xxxxxxxxxxclass accelerate.tracking.TensorBoardTracker( run_name: str, logging_dir: typing.Union[str, os.PathLike, NoneType] = None, **kwargs)参数:
run_name:一个字符串,指定当前 experiment run 的名字。logging_dir:一个字符串或 os.PathLike,指定 TensorBoard logs 存储的位置。kwargs:关键字参数,传递给 tensorboard.SummaryWriter.__init__ 方法。class accelerate.tracking.WandBTracker(run_name: str, **kwargs ):WandB Tracker。应该在你的脚本开始的地方就被初始化。
参数:
run_name:一个字符串,指定当前 experiment run 的名字。kwargs:关键字参数,传递给 wandb.init 方法。class accelerate.tracking.CometMLTracker(run_name: str, **kwargs ):comet_ml Tracker 。应该在你的脚本开始的地方就被初始化。API key 必须存储在 Comet 配置文件中。
参数:
run_name:一个字符串,指定当前 experiment run 的名字。kwargs:关键字参数,传递给 Experiment.__init__ 方法。accelerate.notebook_launcher( function, args = (), num_processes = None, mixed_precision = 'no', use_port = '29500'):启动一个训练函数。如果当前环境中允许的话(如,具有多核的 TPU ),使用几个进程。
要使用这个 notebook_launcher ,在调用之前,notebook session 中必须对 CUDA 设备没有任何调用。如果有任何调用,你将需要重启 notebook ,并确保没有 cell 使用任何 CUDA 设备。
参数:
function:一个可调用对象,指定要执行的训练函数。如果它接受参数,第一个参数应该是运行进程的 index。args:一个元组,指定传递给函数的参数的元组(函数将接收到 *args)。num_processes:一个整数,指定训练时使用的进程的数量。如果有 TPU,则在 Colab/Kaggle 中默认为 8,否则为可用的GPU 数量。mixed_precision:一个字符串,指定混合精度训练。默认为 'no' 。use_port:一个字符串,指定启动多 GPU 训练时用于进程间通信的端口。默认为 '29500' 。示例:
xxxxxxxxxx# Assume this is defined in a Jupyter Notebook on an instance with two GPUsfrom accelerate import notebook_launcher
def train(*args): # Your training function here ...
notebook_launcher(train, args=(arg1, arg2), num_processes=2, mixed_precision="fp16")accelerate.debug_launcher(function, args = (), num_processes = 2):在 CPU 上使用几个进程启动一个训练函数从而用于调试。
debug_launcher 仅用于调试,不应该用于真实的训练。它将仅使用 CPU 。
参数:参考 notebook_launcher 。
accelerate 有自己的 logging 工具从而用于分布式系统。使用方法为:用 accelerate.logging 代替 Python logging 。如:
xxxxxxxxxx- import logging+ from accelerate.logging import get_logger- logger = logging.getLogger(__name__)+ logger = get_logger(__name__)accelerate.logging.get_logger(name: str, log_level: str = None ):返回一个 logging.Logger ,它可以用于多进程环境。
参数:
name:一个字符串,指定 logger 名字。log_level:一个字符串,指定 log level 。默认为 LOG_LEVEL 环境变量指定的。如果没有 LOG_LEVEL 环境变量,则默认为 INFO 。如果一个 log 应该在所有进程上都记录,那么使用 main_process_only=False;否则仅在全局主进程上记录。
xxxxxxxxxxfrom accelerate.logging import get_logger
logger = get_logger(__name__)
logger.info("My log", main_process_only=False) # 所有进程都记录logger.debug("My log", main_process_only=True) # 仅全局主进程记录
logger = get_logger(__name__, accelerate_log_level="DEBUG")logger.info("My log") # level 太低,被过滤logger.debug("My second log") # level 符合条件,记录accelerate.init_empty_weights(include_buffers: bool = False):一个上下文管理器,在这个管理器下,模型被初始化为所有 parameters 都在 meta device 上,因此创建一个空模型。当仅仅初始化模型就会耗尽可用的内存时,这很有用。
参数:include_buffers:一个布尔值,指定在模型初始化时是否将所有的 buffers 也放在 meta device 上。
示例:
xxxxxxxxxximport torch.nn as nnfrom accelerate import init_empty_weights
# Initialize a model with 100 billions parameters in no time and without using any RAM.with init_empty_weights(): tst = nn.Sequential(*[nn.Linear(10000, 10000) for _ in range(1000)])accelerate.cpu_offload( model: Module, execution_device: typing.Optional[torch.device] = None, offload_buffers: bool = False, state_dict: typing.Union[typing.Dict[str, torch.Tensor], NoneType] = None, preload_module_classes: typing.Optional[typing.List[str]] = None):将模型的所有 parameters 卸载到 CPU上。此时,仅在 CPU 中保留一份 state dict (GPU 中没有了)。在前向传播过程中,parameters 将从 state dict 中提取,并在需要时放在 execution_device 上,然后再次卸载。
xxxxxxxxxxaccelerate.cpu_offload( model: Module, execution_device: typing.Optional[torch.device] = None, offload_buffers: bool = False, state_dict: typing.Union[typing.Dict[str, torch.Tensor], NoneType] = None, preload_module_classes: typing.Optional[typing.List[str]] = None)参数:
model:一个 torch.nn.Module,指定被卸载的模型。execution_device:一个 torch.device,指定执行模型前向传播的设备(应该是 GPU )。将默认为模型的第一个 parameter device 。offload_buffers:一个布尔值,指定是否也同时卸载模型的 buffer 。state_dict:一个字典,指定模型的 state dict(它将被保持在 CPU )。preload_module_classes:一个关于字符串的列表,它们的实例应该在前向传播开始时就加载权重(包括实例中包含的子模块)。accelerate.disk_offload():将模型的所有 parameters 卸载到磁盘上。此时,模型的所有 parameter 将作为内存映射 array 被卸载到一个给定的目录中。在前向传播过程中,parameters 将从该目录中访问,并在需要时放在 execution_device 上,然后再次卸载。
xxxxxxxxxxaccelerate.disk_offload( model: Module, offload_dir: typing.Union[str, os.PathLike], execution_device: typing.Optional[torch.device] = None, offload_buffers: bool = False, preload_module_classes: typing.Optional[typing.List[str]] = None)参数:
offload_dir:一个字符串或 os.PathLike,指定卸载权重到哪个目录。model/execution_device/offload_buffers/preload_module_classes:参考 cpu_offload() 。accelerate.dispatch_model():根据一个给定的设备映射来 dispatch 一个模型。模型的各层可能分布在 GPU 上,也可能卸载在CPU 甚至磁盘上。
xxxxxxxxxxaccelerate.dispatch_model( model: Module,device_map: typing.Dict[str, typing.Union[str, int, torch.device]], main_device: typing.Optional[torch.device] = None, state_dict: typing.Union[typing.Dict[str, torch.Tensor], NoneType] = None, offload_dir: typing.Union[str, os.PathLike, NoneType] = None, offload_index: typing.Union[typing.Dict[str, str], NoneType] = None, offload_buffers: bool = False, preload_module_classes: typing.Optional[typing.List[str]] = None)参数:
model:一个 torch.nn.Module,指定要 dispatch 的模型。
device_map:一个字典,将模型 state_dict 中的模块名称映射到它们应该放置的设备。注意,设备名称可以是 "disk",即使它不是 torch.device 的正确值。
它不需要细化到每个 parameter/buffer 名称,一旦一个给定的模块名称在里面,它的每个子模块都将被发送到同一个设备。要让 Accelerate 自动计算出最优化的设备映射,请设置 device_map="auto" 。
main_device:一个字符串、整数、或 torch.device,指定主执行设备。默认为 device_map 中不同于 "cpu" 或 "disk" 的第一个设备。
state_dict:一个字典,指定模型哪些部分的 state dict 被保留在 CPU 上。
offload_dir:一个字符串或 os.PathLike,指定卸载模型权重的文件夹(或者模型权重已经被卸载的地方)。
offload_index:一个字典,从权重名称到权重信息( dtype/shape 或 safetensors 文件名)。默认为 save_folder 中保存的 index 。
offload_buffers/preload_module_classes:参考 cpu_offload() 。
accelerate.load_checkpoint_and_dispatch():加载一个 checkpoint(可能是被分片后的)到模型。可能在加载时将权重发送到一个给定的设备上,并添加各种 hooks ,这些 hooks 这个模型正常运行(即使在设备间分割)。
xxxxxxxxxxaccelerate.load_checkpoint_and_dispatch( model: Module, checkpoint: typing.Union[str, os.PathLike], device_map: typing.Union[str, typing.Dict[str, typing.Union[str, int, torch.device]], NoneType] = None, max_memory: typing.Union[typing.Dict[typing.Union[int, str], typing.Union[int, str]], NoneType] = None, no_split_module_classes: typing.Optional[typing.List[str]] = None, offload_folder: typing.Union[str, os.PathLike, NoneType] = None, offload_buffers: bool = False, dtype: typing.Union[str, torch.dtype, NoneType] = None, offload_state_dict: typing.Optional[bool] = None, preload_module_classes: typing.Optional[typing.List[str]] = None )参数:
model:一个 torch.nn.Module,指定需要加载 checkpoint 的模型。checkpoint:一个字符串或 os.PathLike 对象,指定 checkpoint 的路径名。可以为:包含整个模型 state dict 的文件的路径、一个 .json 文件的路径(该 .json 文件包含 sharded checkpoint 的索引)、一个路径包含唯一的 .index.json 文件和 shards checkpoint 。max_memory:一个字典,指定每个设备的最大内存。如果不设置,将默认为每个 GPU 的最大内存,以及可用的 CPU RAM 。no_split_module_classes:一个关于字符串的列表,指定哪些 layer 不能跨设备分割(如,包含残差连接的层)。dtype:一个字符串或 torch.dtype,指定权重在加载时被转换为什么类型。offload_folder:一个字符串或 os.PathLike 对象,如果 device_map 包含 "disk" ,那么 offload_folder 指定卸载权重的目录。offload_state_dict:一个布尔值,如果为 True,则临时卸载 CPU state dict 到硬盘上从而防止 CPU out of memory 。如果 device map 包含 "disk",则默认为 True 。device_map/offload_buffers/preload_module_classes:参考 cpu_offload() 。示例:
xxxxxxxxxxfrom accelerate import init_empty_weights, load_checkpoint_and_dispatchfrom huggingface_hub import hf_hub_downloadfrom transformers import AutoConfig, AutoModelForCausalLM
# Download the Weightscheckpoint = "EleutherAI/gpt-j-6B"weights_location = hf_hub_download(checkpoint, "pytorch_model.bin")
# Create a model and initialize it with empty weightsconfig = AutoConfig.from_pretrained(checkpoint)with init_empty_weights(): model = AutoModelForCausalLM.from_config(config)
# Load the checkpoint and dispatch it to the right devicesmodel = load_checkpoint_and_dispatch( model, weights_location, device_map="auto", no_split_module_classes=["GPTJBlock"])class accelerate.hooks.ModelHook():hook 包含一些回调函数,这些回调函数在模块的前向传播之前或之后被调用。
与 PyTorch 现有的 hook 不同的是,这里的 hook 会沿着 kwargs 传递。
属性:
no_grad(bool, optional, defaults to False):是否在 torch.no_grad() 上下文管理器中执行实际的前向传播过程。方法:
detach_hook(module):当 hook 从一个模块中 detach 时执行该方法。
参数:module:一个 torch.nn.Module 对象,指定模块。
init_hook(module):当 hook attach 到一个模块时执行该方法。
参数:module:一个 torch.nn.Module 对象,指定模块。
post_forward(module, output) -> any:在模块的前向传播之后立即执行。
参数:
module:一个 torch.nn.Module 对象,指定模块。output:任何对象,表示模块前向传播的结果。返回:后处理的 output 。
pre_forward(module, *args, **kwargs) -> Tuple[Tuple[Any], Dict[Str, Any]]:在模块的前向传播之前执行。
参数:
module:一个 torch.nn.Module 对象,指定模块。args, kwargs:位置参数和关键字参数,被传递给 module 。返回值:处理过的 args 和 kwargs 的元组。
class accelerate.hooks.AlignDevicesHook():一个通用的 ModelHook ,确保 inputs 和模型权重在相关模块的前向传播中处于同一设备上,可能在前向传播之后卸载权重。
xxxxxxxxxxclass accelerate.hooks.AlignDevicesHook( execution_device: typing.Union[int, str, torch.device, NoneType] = None, offload: bool = False, io_same_device: bool = False, weights_map: typing.Optional[typing.Mapping] = None, offload_buffers: bool = False, place_submodules: bool = False)参数:
execution_device:一个 torch.device,指定在前向传播之前,inputs 和模型权重应该放在哪个设备上。offload:一个布尔值,指定权重是否应该在前向传播后被卸载。io_same_device:一个布尔值,指定 outputs 是否应放在与 inputs 相同的设备上。weights_map:一个 Mapping (可能是惰性的),指定当模型权重被卸载时,从参数名到张量值的映射。offload_buffers:一个布尔值,指定卸载时是否也包含模块的 buffers 。place_submodules:一个布尔值,指定在 init_hook 事件中是否将子模块放在 execution_device 上。class accelerate.hooks.SequentialHook(*hooks):一个 hook ,可以包含几个 hook ,并在每个事件中对这些子 hook 进行迭代。
accelerate.hooks.add_hook_to_module( module: Module, hook: ModelHook, append: bool = False) -> torch.nn.Module:在一个给定的模块中添加一个 hook 。
这将重写模块的 forward() 方法,使其包含 hook 。如果要删除这一行为并恢复原来的 forward() 方法,请使用remove_hook_from_module() 。
module:一个 torch.nn.Module 对象,指定需要 attach hook 的模块。hook:一个 ModelHook,指定 hook 。append:一个布尔值,指定当前的 hook 是否与 module 上已有的 hook 串起来(即,hook list)、或者替代已有的 hook 。默认为替代行为(False )。返回相同的 module,但是已经 attach 了 hook 。注意,module 已经被原地修改了。
accelerate.hooks.attach_execution_device_hook( module: Module, execution_device: typing.Union[int, str, torch.device], preload_module_classes: typing.Optional[typing.List[str]] = None):递归地将 AlignDevicesHook 附加到一个给定模型的所有子模块,以确保它们有正确的执行设备。
参数:
module:一个 torch.nn.Module 对象,指定需要 attach hook 的模块。execution_device:一个整数、字符串、或 torch.device,指定前向传播之前,inputs 和模型权重应该放到哪个设备。preload_module_classes:参考 cpu_offload() 。accelerate.hooks.attach_align_device_hook( module: Module, execution_device: typing.Optional[torch.device] = None, offload: bool = False, weights_map: typing.Optional[typing.Mapping] = None, offload_buffers: bool = False, module_name: str = '', preload_module_classes: typing.Optional[typing.List[str]] = None):递归地将 AlignDevicesHook 附加到一个给定模型的所有子模块。
参数:
module/execution_device/preload_module_classes:参考 attach_execution_device_hook() 。offload/weights_map/offload_buffers:参考 AlignDevicesHook() 初始化方法。module_name:一个字符串,指定模块的名字。accelerate.hooks.attach_align_device_hook_on_blocks(module: Module, execution_device: typing.Union[torch.device, typing.Dict[str, torch.device], NoneType] = None, offload: typing.Union[bool, typing.Dict[str, bool]] = False, weights_map: typing.Mapping = None, offload_buffers: bool = False, module_name: str = '', preload_module_classes: typing.Optional[typing.List[str]] = None):根据需要将AlignDevicesHook 附加到一个给定模型的所有 blocks 上。
参数:参考 attach_align_device_hook() 。
accelerate.hooks.remove_hook_from_module(module: Module, recurse = False ) -> torch.nn.Module:移除模块上附加的任何 hook 。
参数:
module:一个 torch.nn.Module 对象,指定需要 detach hook 的模块。recurse:一个布尔值,指定是否递归地移除。返回相同的 module,但是已经 detach 了 hook 。注意,module 已经被原地修改了。
accelerate.hooks.remove_hook_from_submodules(module: Module):递归地删除一个给定模型的子模块上的所有 hook 。
参数:module:一个 torch.nn.Module 对象,指定需要 detach hook 的模块。
class accelerate.DeepSpeedPlugin:用于集成 DeepSpeed 的插件。
xxxxxxxxxxclass accelerate.DeepSpeedPlugin( hf_ds_config: typing.Any = None, gradient_accumulation_steps: int = None, gradient_clipping: float = None, zero_stage: int = None, is_train_batch_min: str = True, offload_optimizer_device: bool = None, offload_param_device: bool = None, zero3_init_flag: bool = Nonezero3_save_16bit_model: bool = None)方法:
deepspeed_config_process( prefix = '', mismatches = None, config = None, must_match = True, **kwargs ):用 kwargs 的内容来处理 DeepSpeed config 。class accelerate.utils.DummyOptim(params, lr = 0.001, weight_decay = 0, **kwargs):Dummy optimizer。 当在 deepspeed 配置文件中指定 optimizer config 时,如果遵循常规的训练循环则用该 Dummy optimizer 。
参数:
lr:一个浮点数,指定学习率。params:一个可迭代对象或字典,指定 parameters 或 parameter group 。weight_decay:一个浮点数,指定权重衰减。kwargs:关键字参数。class accelerate.utils.DummyScheduler( optimizer, total_num_steps = None, warmup_num_steps = 0, **kwargs ):Dummy scheduler 。当在 deepspeed 配置文件中指定 scheduler config 时,如果遵循常规的训练循环则用该 Dummy scheduler 。
参数:
optimizer:一个 torch.optim.optimizer.Optimizer:指定优化器。total_num_steps:一个整数,指定总的 step 数。warmup_num_steps:一个整数,指定 warmup 的 step 数。kwargs:关键字参数。class accelerate.utils.DeepSpeedEngineWrappe(engine):用于 deepspeed.runtime.engine.DeepSpeedEngine 的 wrapper。它用于常规的训练循环。
参数:engine:一个 deepspeed.runtime.engine.DeepSpeedEngine,指定被 wrap 的 deepspeed engine 。
class accelerate.utils.DeepSpeedOptimizerWrapper(optimizer):用于 deepspeed optimizer 的 wrapper 。
参数:optimizer:一个 torch.optim.optimizer.Optimizer,指定被 wrap 的优化器。
class accelerate.utils.DeepSpeedSchedulerWrapper(scheduler, optimizers):用于 deepspeed scheduler 的 wrapper 。
参数:
scheduler:一个 torch.optim.lr_scheduler.LambdaLR,指定被 wrap 的 scheduler 。optimizers:一个torch.optim.Optimizer 或它的列表,指定被 wrap 的 optimizer 。class accelerate.utils.MegatronLMPlugin:Megatron-LM的插件,用于实现张量并行、管道并行、序列并行和数据并行。还可以启用 selective activation recomputation 和 optimized fused kernel 。
xxxxxxxxxxclass accelerate.utils.MegatronLMPlugin( tp_degree: int = None, pp_degree: int = None, num_micro_batches: int = None, gradient_clipping: float = None, sequence_parallelism: bool = None, recompute_activation: bool = None, use_distributed_optimizer: bool = None, pipeline_model_parallel_split_rank: int = None, num_layers_per_virtual_pipeline_stage: int = None, is_train_batch_min: str = True, train_iters: int = None, train_samples: int = None, weight_decay_incr_style: str = 'constant', start_weight_decay: float = None, end_weight_decay: float = None, lr_decay_style: str = 'linear', lr_decay_iters: int = None, lr_decay_samples: int = None, lr_warmup_iters: int = None, lr_warmup_samples: int = None, lr_warmup_fraction: float = None, min_lr: float = 0, consumed_samples: typing.List[int] = None, no_wd_decay_cond: typing.Optional[typing.Callable] = None, scale_lr_cond: typing.Optional[typing.Callable] = None, lr_mult: float = 1.0, megatron_dataset_flag: bool = False, seq_length: int = None, encoder_seq_length: int = None, decoder_seq_length: int = None, tensorboard_dir: str = None, set_all_logging_options: bool = False, eval_iters: int = 100, eval_interval: int = 1000, return_logits: bool = False, custom_train_step_class: typing.Optional[typing.Any] = None, custom_train_step_kwargs: typing.Union[typing.Dict[str, typing.Any], NoneType] = None, custom_model_provider_function: typing.Optional[typing.Callable] = None, custom_prepare_model_function: typing.Optional[typing.Callable] = None, other_megatron_args: typing.Union[typing.Dict[str, typing.Any], NoneType] = None)class accelerate.utils.MegatronLMDummyScheduler(optimizer, total_num_steps = None, warmup_num_steps = 0, **kwargs ):MegatronLM Dummy scheduler 。当在 megatron 配置文件中指定 scheduler config 时,如果遵循常规的训练循环则用该 Dummy scheduler 。
参数:参考 DummyScheduler 。
class accelerate.utils.MegatronLMDummyDataLoader(**dataset_kwargs):Dummy DataLoader,仅用于常规的训练循环。
class accelerate.utils.AbstractTrainStep(name):用于 batching 的抽象类。
class accelerate.utils.GPTTrainStep(args):GPT train step 类。
参数:args:指定 Megatron-LM 的参数。
class accelerate.utils.BertTrainStep(args):Bert train step 类。
参数:args:指定 Megatron-LM 的参数。
class accelerate.utils.T5TrainStep(args):T5 train step 类。
参数:args:指定 Megatron-LM 的参数。
accelerate.utils.avg_losses_across_data_parallel_group(losses):跨 data parallel group 来对损失函数值取平均。
参数:losses:张量的列表,指定哪些 loss 需要跨 data parallel group 取平均。