Table des matières
PyTorch : Parallélisme de données multi-GPU et multi-nœuds
Cette page explique comment distribuer un modèle neuronal artificiel implémenté dans un code PyTorch, selon la méthode du parallélisme de données.
Nous documentons ici la solution intégrée DistributedDataParallel
, qui est la plus performante selon la documentation PyTorch. Il s'agit d'un parallélisme multi-process qui fonctionne aussi bien en mono-nœud qu'en multi-nœuds.
Configuration multi-process avec SLURM
Pour le multi-nœuds, il est nécessaire d'utiliser le multi-processing géré par SLURM (exécution via la commande SLURM srun
). Pour le mono-nœud il est possible d'utiliser, comme la documentation PyTorch l'indique, torch.multiprocessing.spawn
. Cependant il est possible et plus pratique d'utiliser le multi-processing SLURM dans tous les cas, en mono-nœud ou en multi-nœuds. C'est ce que nous documentons dans cette page.
Dans SLURM, lorsqu'on lance un script avec la commande srun
, le script est automatiquement distribué sur toutes les tâches prédéfinies. Par exemple, si nous réservons 4 nœuds octo-GPU en demandant 3 GPU par nœud, nous obtenons :
- 4 nœuds, indexés de 0 à 3
- 3 GPU/nœud indexés de 0 à 2 sur chaque nœud,
- 4 x 3 = 12 processus au total permettant d'exécuter 12 tâches avec les rangs de 0 à 11
Les communications collectives inter-nœuds sont gérées par la librairie NCCL.
Voici deux exemples de script SLURM pour Jean-Zay :
- pour une réservation de N nœuds quadri-GPU V100 via la partition GPU par défaut :
#!/bin/bash #SBATCH --job-name=torch-multi-gpu #SBATCH --nodes=N # nombre total de noeuds (N à définir) #SBATCH --ntasks-per-node=4 # nombre de tache par noeud (ici 4 taches soit 1 tache par GPU) #SBATCH --gres=gpu:4 # nombre de GPU reserve par noeud (ici 4 soit tous les GPU) #SBATCH --cpus-per-task=10 # nombre de coeurs par tache (donc 4x10 = 40 coeurs soit tous les coeurs) #SBATCH --hint=nomultithread #SBATCH --time=40:00:00 #SBATCH --output=torch-multi-gpu%j.out ##SBATCH --account=abc@v100 module load pytorch-gpu/py3/1.11.0 srun python myscript.py
Remarque : ici, les nœuds sont réservés en exclusivité. En particulier, cela nous donne accès à toute la mémoire de chaque nœud.
- pour une réservation de N nœuds octo-GPU A100 :
#!/bin/bash #SBATCH --job-name=torch-multi-gpu #SBATCH --nodes=N # nombre total de noeuds (N à définir) #SBATCH --ntasks-per-node=8 # nombre de tache par noeud (ici 8 taches soit 1 tache par GPU) #SBATCH --gres=gpu:8 # nombre de GPU reserve par noeud (ici 8 soit tous les GPU) #SBATCH --cpus-per-task=8 # nombre de coeurs par tache (donc 8x8 = 64 coeurs soit tous les coeurs) #SBATCH --hint=nomultithread #SBATCH --time=40:00:00 #SBATCH --output=torch-multi-gpu%j.out #SBATCH -C a100 ##SBATCH --account=abc@a100 module load cpuarch/amd module load pytorch-gpu/py3/1.11.0 srun python myscript.py
Remarque : ici, les nœuds sont réservés en exclusivité. En particulier, cela nous donne accès à toute la mémoire de chaque nœud.
Implémentation de la solution DistributedDataParallel
Pour implémenter la solution DistributedDataParallel
en PyTorch, il faut:
- Définir les variables d'environnement liées au nœud maître.
MASTER_ADD
, l'adresse IP ou le hostname du nœud correspondant à la tâche 0 (le premier de la liste des nœuds). Si on est en mono-nœud, la valeurlocalhost
suffit.MASTER_PORT
, un numéro de port aléatoire. Pour éviter les conflits et par convention on utilisera un numéro de port compris entre10001
et20000
(par exemple12345
).- Sur Jean Zay, une librairie développée par l'IDRIS a été incluse dans les modules Pytorch pour définir automatiquement les variables
MASTER_ADD
etMASTER_PORT
. Il suffit de l'importer dans votre script :import idr_torch
Cette commande suffit à créer les variables. Pour votre information, la bibliothèque appelée est disponible ici.
Remarque : Le module idr_torch
récupérant les valeurs de variables d'environnement, vous pouvez les réutiliser dans votre script en appelant idr_torch.rank
, idr_torch.local_rank
, idr_torch.size
et/ou idr_torch.cpus_per_task
.
- Initialiser le process group (i.e. le nombre de processus, le protocole de communications collectives ou backend, …). Les backends possibles sont
NCCL
,GLOO
etMPI
.NCCL
est doublement conseillé pour la performance et la garantie de bon fonctionnement sur Jean Zay.import torch.distributed as dist from torch.nn.parallel import DistributedDataParallel as DDP dist.init_process_group(backend='nccl', init_method='env://', world_size=idr_torch.size, rank=idr_torch.rank)
- Envoyer le modèle sur le GPU. Notez que
local_rank
(numérotation 0, 1, 2, … pour chaque nœud) sert d'identifiant GPU.torch.cuda.set_device(idr_torch.local_rank) gpu = torch.device("cuda") model = model.to(gpu)
- Transformer le modèle en modèle distribué associé à un GPU.
ddp_model = DDP(model, device_ids=[idr_torch.local_rank])
- Envoyer les sous-batches et les labels vers le GPU dédié, lors de l'apprentissage.
for (images, labels) in train_loader: images = images.to(gpu, non_blocking=True) labels = labels.to(gpu, non_blocking=True)
Remarque : ici, l'option
non_blocking=True
est nécessaire si le DataLoader utilise la fonctionnalitépin memory
pour accélérer le chargement des entrées.
Le code ci-dessous illustre l'utilisation du DataLoader avec un sampler adapté au parallélisme de données.batch_size = args.batch_size batch_size_per_gpu = batch_size // idr_torch.size # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss() optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4) # Data loading code train_dataset = torchvision.datasets.MNIST(root=os.environ['DSDIR'], train=True, transform=transforms.ToTensor(), download=False) train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset, num_replicas=idr_torch.size, rank=idr_torch.rank, shuffle=True) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size_per_gpu, shuffle=False, num_workers=0, pin_memory=True, sampler=train_sampler)
Attention, le shuffling est délégué au DistributedSampler
. De plus, pour que la seed de shuffling varie à chaque époque, il faudra rajouter l'instruction train_sampler.set_epoch(epoch)
au début de chaque époque.
Sauvegarde et chargement de checkpoints
Il est possible de mettre en place des checkpoints lors d'un apprentissage distribué sur des GPU.
Sauvegarde
Le modèle étant répliqué sur chaque GPU, la sauvegarde de checkpoints peut être réalisée par un seul GPU pour limiter les opérations d'écriture. Par convention, on sollicite le GPU de rang 0 :
if idr_torch.rank == 0: torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
Ainsi, le checkpoint contiendra l'information issue du GPU de rang 0 qui est alors sauvegardée dans un format spécifique aux modèles distribués.
Chargement
Au début d'un apprentissage, le chargement d'un checkpoint est d'abord opéré par le CPU, puis l'information est envoyée sur le GPU.
Par défaut et par convention, cet envoi se fait vers l'emplacement mémoire qui a été utilisé lors de l'étape de sauvegarde : avec notre exemple, seul le GPU 0 chargerait le modèle en mémoire.
Pour que l'information soit communiquée à l'ensemble des GPUs, il faut utiliser l'argument map_location
de la fonction de chargement torch.load
pour rediriger le stockage en mémoire.
Dans l'exemple ci-dessous, l'argument map_location
ordonne une redirection du stockage en mémoire vers le GPU de rang local. Cette fonction étant appelée par l'ensemble des GPU, chaque GPU charge bien le checkpoint dans sa propre mémoire :
map_location = {'cuda:%d' % 0: 'cuda:%d' % idr_torch.local_rank} # remap storage from GPU 0 to local GPU ddp_model.load_state_dict(torch.load(CHECKPOINT_PATH), map_location=map_location)) # load checkpoint
Remarque : si, comme dans le tutoriel PyTorch, un checkpoint est chargé juste après une sauvegarde, il est nécessaire d'appeler la méthode dist.barrier()
avant le chargement. L'appel à dist.barrier()
permet de synchroniser les GPUs, garantissant ainsi que la sauvegarde du checkpoint par le GPU de rang 0 est bien achevée avant que les autres GPUs tentent de le charger.
Validation distribuée
L'étape de validation exécutée après chaque epoch ou après un nombre fixé d'itérations d'apprentissage peut se distribuer sur tous les GPU engagés dans l'apprentissage du modèle. Lorsque le parallélisme de données est utilisé et que l'ensemble de données de validation est conséquent, cette solution de validation distribuée sur les GPU semble être la plus efficace et la plus rapide.
Ici, l'enjeu est de calculer les métriques (loss, accuracy, etc…) par batch et par GPU, puis de les pondérer et de les moyenner sur l'ensemble des données de validation.
Pour cela, il faut:
- charger les données de validation de la même manière que les données d'apprentissage, mais sans les transformations aléatoires comme la data augmentation ou le shuffling (voir la documentation sur le chargement de bases de données en PyTorch) :
# validation dataset loading (imagenet for example) val_dataset = torchvision.datasets.ImageNet(root=root,split='val',transform=val_transform) # define distributed sampler for validation val_sampler = torch.utils.data.distributed.DistributedSampler(val_dataset, num_replicas=idr_torch.size, rank=idr_torch.rank, shuffle=False) # define dataloader for validation val_loader = torch.utils.data.DataLoader(dataset=val_dataset, batch_size=batch_size_per_gpu, shuffle=False, num_workers=4, pin_memory=True, sampler=val_sampler, prefetch_factor=2)
- basculer du mode “apprentissage” au mode “validation” pour désactiver certaines fonctionnalités propres à l'entraînement qui sont coûteuse et ici inutiles :
model.eval()
pour basculer le modèle en mode “validation” et désactiver la gestion des dropout, des batchnorm, etc.with torch.no_grad()
pour ignorer le calcul du gradient- optionnellement,
with autocast()
pour utiliser l'AMP (mixed precision)
- évaluer le modèle et calculer la métrique par batch de la manière habituelle (ici, nous prenons l'exemple du calcul de la loss, cela sera la même chose pour d'autres métriques) :
outputs = model(val_images)
suivi deloss = criterion(outputs, val_labels)
- pondérer et cumuler la métrique par GPU :
val_loss += loss * val_images.size(0) / N
avecval_images.size(0)
la taille du batch etN
la taille globale du dataset de validation. Sachant que les batches n'ont pas nécessairement la même taille (dernier batch parfois plus petit), il est préférable d'utiliser ici la valeurval_images.size(0)
.
- sommer les moyennes pondérées de la métrique sur l'ensemble des GPU :
dist.all_reduce(val_loss, op=dist.ReduceOp.SUM)
pour sommer les valeurs de la métrique calculées par GPU et communiquer le résultat à l'ensemble des GPU. Cette opération entraîne des communications inter-GPU.
Exemple après chargement des données de validation :
model.eval() # switch into validation mode val_loss = torch.Tensor([0.]).to(gpu) # initialize val_loss value N = len(val_dataset) # get validation dataset length for val_images, val_labels in val_loader: # loop over validation batches val_images = val_images.to(gpu, non_blocking=True) # transfer images and labels to GPUs val_labels = val_labels.to(gpu, non_blocking=True) with torch.no_grad(): # deactivate gradient computation with autocast(): # activate AMP outputs = model(val_images) # evaluate model loss = criterion(outputs, val_labels) # compute loss val_loss += loss * val_images.size(0) / N # cumulate weighted mean per GPU dist.all_reduce(val_loss, op=dist.ReduceOp.SUM) # sum weighted means and broadcast value to each GPU model.train() # switch again into training mode
Exemple d'application
Exécution multi-GPU, multi-nœuds avec "DistributedDataParallel"
Un exemple se trouve dans $DSDIR/examples_IA/Torch_parallel/Example_DataParallelism_Pytorch.ipynb
sur Jean-Zay, il utilise la base de données MNIST et un réseau dense simple. L'exemple est un Notebook qui permet de créer un script d'exécution.
Vous pouvez aussi télécharger le notebook en cliquant sur ce lien.
Il est à copier sur votre espace personnel (idéalement sur votre $WORK
).
$ cp $DSDIR/examples_IA/Torch_parallel/Example_DataParallelism_PyTorch.ipynb $WORK
Vous devez ensuite exécuter le Notebook à partir d'une machine frontale de Jean Zay en chargeant préalablement un module PyTorch (voir notre documentation sur l'accès à JupyterHub pour en savoir plus sur l'usage des Notebooks sur Jean Zay).
Documentation et sources
Annexes
Sur Jean Zay, pour un model resnet 101, en fixant une taille de mini batch fixe (la taille globale du batch augmente avec le nombre de GPU impliqués), on obtient les vitesses d'apprentissage suivantes qui croîssent avec le nombre de GPU impliqués. Le protocole de communication NCCL est toujours plus performant que GLOO. La communication entre Octo-GPU apparaît plus lente qu'entre quadri-GPU.
Pour NCCL, voici les temps moyens d'une itération de batch pour un certain nombre de GPU impliqué dans la distribution. Les différences de temps correspondent au temps de synchronisation entre GPU.