TensorFlow: Database loading for distributed learning of a model
This page describes the management of Datasets and DataGenerators for the distributed learning of a TensorFlow model. It develops the issues introduced in the main page on data loading.
It covers the usage of:
- Datasets (customised and pre-defined)
- Transformations of input data (“map” function)
The page concludes with a complete example of optimised data loading and its implementation on Jean Zay via a Jupyter Notebook.
Preliminary comment: This documentation is limited to the TensorFlow 2.x release.
Datasets
In TensorFlow, database manipulation is managed by an object of type tf.data.Dataset.
Pre-defined datasets in TensorFlow
TensorFlow offers a group of ready-to-use databases in the tensorflow-datasets module. Some of them are already available on Jean Zay, in the tensorflow_datasets directory of the $DSDIR common space.
When loading, the split option makes it possible to separate the data dedicated to training from the data dedicated to validation or testing. For example, for the ImageNet2012 database:
    import os

    import tensorflow as tf
    import tensorflow_datasets as tfds

    # load imagenet2012 dataset from DSDIR
    tfds_dir = os.environ['DSDIR'] + '/tensorflow_datasets/'

    # load data for training
    dataset = tfds.load('imagenet2012', data_dir=tfds_dir, split='train')

    # load data for testing
    dataset_test = tfds.load('imagenet2012', data_dir=tfds_dir, split='test')
Each loading function then offers functionalities specific to the databases (data quality, partial data extraction, etc). For more information, please consult the official documentation.
Comments:
- The tensorflow-datasets database collection is in the TFRecord format.
- Certain functions offer to download the databases online by using the download=True argument. Please note that the Jean Zay compute nodes do not have internet access; such operations must be done from a front end or a pre-/post-processing node.
- The $DSDIR/tensorflow_datasets directory can be extended upon request to the IDRIS support team (assist@idris.fr).
Customised datasets
There are different ways to generate a tf.data.Dataset object from input files. The most commonly used are:
tf.data.Dataset.list_files() for reading standard formats:

    import os

    import tensorflow as tf

    # locate Places365 dataset in DSDIR
    places365_dir = os.environ['DSDIR'] + '/Places365-Standard/data_256/'

    # extract images of places beginning with 'a'
    dataset = tf.data.Dataset.list_files(places365_dir + 'a/*.jpg')
By default, the list_files function shuffles the files randomly. To deactivate this behaviour, you can specify the shuffle=False argument.
Comment: The list_files function applies the Python glob function to its input parameter. If you already have the list of paths to the input files (from a glob, for example), it is preferable to build the Dataset with the tf.data.Dataset.from_tensor_slices(glob_list) function. To shuffle the files randomly, the list of paths must be shuffled beforehand, for example with random.Random(seed).shuffle(glob_list). If the .shard() function is called (see below), the value of seed must be the same on each worker.
tf.data.TFRecordDataset() for reading files in the TFRecord format:

    import os

    import tensorflow as tf

    # locate imagenet dataset in DSDIR
    imagenet_path = os.environ['DSDIR'] + '/imagenet/'

    # TFRecordDataset expects file names, not a pattern: expand the pattern first
    train_files = tf.io.gfile.glob(imagenet_path + 'train*')

    # extract training data from dataset
    dataset = tf.data.TFRecordDataset(train_files)
Comments:
- You can create your own files in the TFRecord format from a database by following this tutorial. The $DSDIR directory can also be enriched with Datasets in the TFRecord format upon request to the IDRIS support team (assist@idris.fr).
- It is not possible to know the size of a Dataset loaded from a TFRecord by using the len(dataset) command (this returns an error). This information must be recovered from the source database which was used to create the TFRecord.
- If your Dataset is partitioned into a number of TFRecords, it is advised to use the interleave() function when reading the Dataset. The partitioned TFRecords are then read concurrently, and therefore more rapidly. This also avoids read redundancy between the different processes when using data parallelism.

    dataset = tf.data.Dataset.list_files(pattern)
    dataset = dataset.interleave(tf.data.TFRecordDataset, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)
Moreover, the num_parallel_calls=tf.data.AUTOTUNE option activates multithreading. The number of CPU cores used is determined automatically according to the available CPUs. The deterministic=False option improves processing performance when the order of operations is not important.
Transformations
The map function
Input data can be transformed by using the tf.data.Dataset.map function. The transformation function is defined by the user and then passed to the map function. For example, to resize the images of the ImageNet2012 database:
    # define resizing function
    def img_resize(x):
        return tf.image.resize(x, [300, 300])

    # apply transformation to dataset
    dataset = dataset.map(img_resize, num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)
Comments:
- The num_parallel_calls=tf.data.AUTOTUNE option activates multithreading for the transformation step. The number of CPU cores used is determined automatically according to the available CPUs.
- The deterministic=False option improves processing performance when the order of operations is not important.
The cache function
Certain transformation operations are deterministic (data reading, normalization, resizing, …), that is, they generate the same transformed data each time, whereas others are linked to Data Augmentation and are randomized. Deterministic transformations generally represent heavy processing which generates low-volume data. These transformed data can be stored in a memory cache so that the transformation is not unnecessarily repeated at each epoch. For this, we use the cache() function:
    # define random transformation
    def random_trans(x):
        return ...

    # define deterministic transformation
    def determ_trans(x):
        return ...

    # apply deterministic transformation to dataset and store them in cache
    dataset = dataset.map(determ_trans, num_parallel_calls=tf.data.AUTOTUNE).cache()

    # apply random transformation to dataset at each epoch
    dataset = dataset.map(random_trans, num_parallel_calls=tf.data.AUTOTUNE)
By default, the cache memory used is the RAM of the CPU node. It is also possible to store the transformed data in an external file. For this, you just need to specify the path to the storage file in the cache() function call: .cache(path/to/file).
Comment:
- The cache() function is useful if the RAM of the CPU node can hold the whole database. The time gained with a .cache(path/to/file) to a storage file is limited, except in the extreme case where the data loading and the deterministic transformations are a major bottleneck in the learning loop.
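The benefit of caching can be illustrated with a small plain-Python emulation (no TensorFlow required). This is only a sketch: the CachedMap class and the two stand-in transformations are hypothetical, and the cache is filled in one go on the first pass, which simplifies what a real data pipeline does.

```python
import random

calls = {"deterministic": 0, "random": 0}

def determ_trans(x):
    """Stand-in for a heavy deterministic step (decoding, resizing, ...)."""
    calls["deterministic"] += 1
    return x * 2

def random_trans(x, rng):
    """Stand-in for a random augmentation, redone at every epoch."""
    calls["random"] += 1
    return x + rng.random()

class CachedMap:
    """Apply func on the first pass, then replay the stored results."""

    def __init__(self, func, source):
        self.func = func
        self.source = source
        self.cache = None

    def __iter__(self):
        if self.cache is None:
            # First epoch: compute and store every transformed element
            self.cache = [self.func(x) for x in self.source]
        return iter(self.cache)

rng = random.Random(0)
cached = CachedMap(determ_trans, range(100))
num_epochs = 3
for epoch in range(num_epochs):
    epoch_data = [random_trans(x, rng) for x in cached]

print(calls)
```

Over three epochs, the deterministic step runs once per element, whereas the random step runs once per element and per epoch. This is why the cache is placed between the two transformations.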
Configuration and optimisation of the preprocessing steps
Random processing of input data
The shuffle() function activates the random shuffling of data at each epoch during the learning. This function takes as input argument buffer_size, which sets the size of the memory buffer used for this operation. Ideally, the buffer should be able to contain the whole database in order to avoid any bias in the randomization process. In practice, a database is often too large and a buffer of reduced size (1,000 or 10,000 pointers) saves memory.
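The bias introduced by a small buffer can be made visible with a plain-Python emulation of a buffer-based shuffle (no TensorFlow required). This sketch assumes the usual scheme: the buffer is filled with the first buffer_size elements, then each output is drawn at random from the buffer and replaced by the next input element. The function buffered_shuffle is hypothetical and is not the TensorFlow implementation.

```python
import random

def buffered_shuffle(items, buffer_size, seed=0):
    """Plain-Python emulation of a shuffle using a bounded buffer."""
    rng = random.Random(seed)
    iterator = iter(items)
    buffer = []
    # Fill the buffer with the first buffer_size items
    for item in iterator:
        buffer.append(item)
        if len(buffer) == buffer_size:
            break
    # Draw one item at random and replace it with the next input item
    for item in iterator:
        pos = rng.randrange(len(buffer))
        yield buffer[pos]
        buffer[pos] = item
    # Input exhausted: empty the buffer in random order
    rng.shuffle(buffer)
    yield from buffer

data = list(range(1000))
small = list(buffered_shuffle(data, buffer_size=10))
full = list(buffered_shuffle(data, buffer_size=len(data)))

# How far ahead of its original position an item can be emitted
max_advance_small = max(value - k for k, value in enumerate(small))
max_advance_full = max(value - k for k, value in enumerate(full))
print("buffer of 10:", max_advance_small, "| full buffer:", max_advance_full)
```

With a buffer of 10, the element emitted at position k always comes from the first k + 10 inputs, so elements located at the end of the database can never appear at the beginning of an epoch. Only a buffer as large as the database removes this constraint.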
Data distribution on more than one process for a distributed learning
To distribute data over more than one process (or worker) for distributed learning, as must be done with Horovod, you need to use the shard() function. This function takes the number of workers and the global index of the worker as input arguments. These values can be recovered from the Slurm computing configuration as follows:
    import tensorflow as tf
    import idr_tf  # IDRIS package available in all TensorFlow modules

    # load dataset
    dataset = ...

    # get number of processes/workers
    num_workers = idr_tf.size   # or hvd.size() if Horovod is used
    worker_index = idr_tf.rank  # or hvd.rank() if Horovod is used

    # distribute dataset
    dataset = dataset.shard(num_workers, worker_index)
Here we are using the idr_tf library to recover the information relative to the Slurm computing environment. This library, developed by IDRIS, is present in all of the TensorFlow modules on Jean Zay. If Horovod is used, the same information can be obtained from Horovod.
If you use the tf.distribute.MultiWorkerMirroredStrategy distribution strategy, Autosharding is set by default. You should therefore not use .shard(). If you would like to disable Autosharding, please read the TensorFlow documentation.
Comments:
- By calling the shard function, each worker only reads a part of the database.
- In a distributed configuration, it is important to carry out the shuffling step after the distribution step. Otherwise, the database will be entirely read by each worker and performance will be poorer. In general, the shard function must be applied as early as possible.
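The cost of sharding too late can be sketched in plain Python with generators (no TensorFlow required). The helpers make_loader and shard are hypothetical stand-ins: the loader counts how many times an expensive read or decoding step is executed on one worker.

```python
def make_loader(counter):
    """Return a stand-in for an expensive read/decode step that counts its calls."""
    def load(path):
        counter["loads"] += 1
        return f"decoded({path})"
    return load

def shard(items, num_workers, worker_index):
    """Plain-Python stand-in for a shard, applied lazily to any iterable."""
    for index, item in enumerate(items):
        if index % num_workers == worker_index:
            yield item

paths = [f"file_{i}" for i in range(1000)]
num_workers, worker_index = 4, 0

# Shard first, then load: only the share of this worker is read
early = {"loads": 0}
load_early = make_loader(early)
early_result = [load_early(p) for p in shard(paths, num_workers, worker_index)]

# Load first, then shard: every file is read, then most results are discarded
late = {"loads": 0}
load_late = make_loader(late)
late_result = list(shard((load_late(p) for p in paths), num_workers, worker_index))

print("loads with early shard:", early["loads"])
print("loads with late shard:", late["loads"])
```

Both orderings give the worker the same elements, but the late shard makes each worker pay for reading the whole database.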
Optimisation of resource usage during the learning
The batch size is defined by the batch() function. A batch size is optimal if it enables good usage of the computing resources; that is, if the memory of each GPU is used to the maximum and the workload is divided equally between the GPUs. The drop_remainder=True option drops the last batch if it is smaller than the requested batch size. The batch function is always called after the shuffle function so that the batches are composed differently at each epoch.
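Since the size of a Dataset is not always available from the Dataset itself, the number of batches per epoch on each worker can be estimated by hand. The following sketch assumes a shard which keeps one element out of every num_workers. The sample count and the helper names are hypothetical.

```python
import math

def shard_size(num_samples, num_workers, worker_index):
    """Number of samples kept by one worker after a strided shard."""
    return max(0, math.ceil((num_samples - worker_index) / num_workers))

def steps_per_epoch(num_samples, batch_size, drop_remainder=True):
    """Number of batches produced from num_samples samples."""
    if drop_remainder:
        return num_samples // batch_size
    return math.ceil(num_samples / batch_size)

# Hypothetical configuration
num_samples = 10_000
num_workers = 4
batch_size = 64  # per worker; the global batch is batch_size * num_workers

local_samples = shard_size(num_samples, num_workers, worker_index=0)
steps_drop = steps_per_epoch(local_samples, batch_size, drop_remainder=True)
steps_keep = steps_per_epoch(local_samples, batch_size, drop_remainder=False)

print(local_samples, "samples per worker")
print(steps_drop, "batches with drop_remainder=True,", steps_keep, "without")
```

With drop_remainder=True, every worker processes full batches only, which keeps the workload identical across GPUs at the price of ignoring a few samples at each epoch.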
Transfer/computation overlapping
It is possible to optimise the batch transfers from CPU to GPU by overlapping transfer and computation. The prefetch() function pre-loads the next batches to be processed during the training. The number of pre-loaded batches is controlled by the buffer_size argument. To have buffer_size set automatically, specify buffer_size=tf.data.AUTOTUNE.
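The principle of this overlapping can be sketched in plain Python with a background thread and a bounded queue (no TensorFlow required). This is an illustration of the idea only, not the TensorFlow implementation: the prefetch function below is hypothetical, and error handling and early termination are left out.

```python
import queue
import threading

_END = object()  # sentinel marking the end of the input

def prefetch(iterable, buffer_size):
    """Yield items while a background thread prepares the next ones."""
    buffer = queue.Queue(maxsize=buffer_size)

    def producer():
        for item in iterable:
            buffer.put(item)  # blocks while buffer_size items are waiting
        buffer.put(_END)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buffer.get()
        if item is _END:
            return
        yield item

# Hypothetical batches of 4 samples, prepared lazily by the producer thread
batches = (list(range(start, start + 4)) for start in range(0, 20, 4))

# The consumer works on one batch while the next ones are being prepared
results = [sum(batch) for batch in prefetch(batches, buffer_size=2)]
print(results)
```

The order of the batches is unchanged. The only difference is that the preparation of up to buffer_size batches takes place while the consumer is busy.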
Notes
- The .repeat() function repeats a Dataset indefinitely in a Data Generator, or for a given number of epochs with .repeat(num_epoch); it is optional. The absence of this function corresponds to a .repeat(1). It can be useful if we wish to parameterize a number of learning iterations rather than a number of epochs. It can be called anywhere in the chain and the result will be the same.
- If the database contains corrupted data which generate an error, it is possible to ignore the errors by using the dataset = dataset.apply(tf.data.experimental.ignore_errors()) command, applied after the .map().
Complete example of optimised data loading
The following is a complete example of optimised loading of the Places365 database for distributed learning on Jean Zay:

    import glob
    import os
    import random

    import tensorflow as tf
    import idr_tf  # IDRIS package available in all TensorFlow modules

    IMG_WIDTH = 320
    IMG_HEIGHT = 320

    def decode_img(file_path):
        # parse label
        label = tf.strings.split(file_path, sep='/')[-2]
        # read input file
        img = tf.io.read_file(file_path)
        # decode jpeg format (channels=3 for RGB, channels=1 for grayscale)
        img = tf.image.decode_jpeg(img, channels=3)
        # convert to [0,1] format for TensorFlow compatibility
        img = tf.image.convert_image_dtype(img, tf.float32)
        # resize image (the size is given as [height, width])
        return label, tf.image.resize(img, [IMG_HEIGHT, IMG_WIDTH])

    # Create a generator
    rng = tf.random.Generator.from_seed(123, alg='philox')

    def randomized_preprocessing(label, img):
        # randomly adjust image contrast - Data Augmentation
        # (a TensorFlow op is used so that a new factor is drawn for each image)
        contrast_factor = tf.random.uniform([], minval=1.0, maxval=2.0)
        img = tf.image.adjust_contrast(img, contrast_factor=contrast_factor)
        img = tf.image.stateless_random_flip_left_right(img, rng.make_seeds(2)[0])
        return label, img

    # configuration
    num_epochs = 3
    batch_size = 64
    shuffling_buffer_size = 5000
    num_parallel_calls = tf.data.AUTOTUNE
    prefetch_factor = tf.data.AUTOTUNE

    # locate Places365 dataset in DSDIR and list the files
    places365_path = glob.glob(os.environ['DSDIR'] + "/Places365-Standard/data_256/**/*.jpg", recursive=True)
    random.Random(123).shuffle(places365_path)

    # create a dataset object from path
    dataset = tf.data.Dataset.from_tensor_slices(places365_path)

    # distribute dataset
    num_workers = idr_tf.size
    worker_index = idr_tf.rank
    dataset = dataset.shard(num_workers, worker_index).shuffle(shuffling_buffer_size)

    # deterministic transformation
    dataset = dataset.map(decode_img, num_parallel_calls=num_parallel_calls, deterministic=False)

    # cache function only relevant for small to medium datasets
    dataset = dataset.cache()

    # random transformations
    dataset = dataset.map(randomized_preprocessing, num_parallel_calls=num_parallel_calls, deterministic=False)

    dataset = dataset.batch(batch_size, drop_remainder=True).prefetch(prefetch_factor)

    ## Repeat dataset num_epochs times
    #dataset = dataset.repeat(num_epochs)
    #for label, img in dataset:
    #    train_step(label, img)
    ## equivalent to (train_step being the user-defined training step):
    for epoch in range(num_epochs):
        for label, img in dataset:
            train_step(label, img)
The following is an example of the loading chain when loading a global TFRecord file:
    dataset = tf.data.TFRecordDataset(input_file)
    dataset = dataset.shard(num_workers, worker_index)
    dataset = dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.map(parser_fn, num_parallel_calls=num_map_threads)
    dataset = dataset.batch(batch_size, drop_remainder=True).prefetch(prefetch_factor)
    dataset = dataset.repeat(num_epochs)
Lastly, here is an example of the loading chain when loading a Dataset partitioned into a number of TFRecords:
    dataset = tf.data.Dataset.list_files(pattern)
    dataset = dataset.shard(num_workers, worker_index)
    dataset = dataset.shuffle(shuffle_buffer_size)
    dataset = dataset.interleave(tf.data.TFRecordDataset, cycle_length=num_readers, block_length=1)
    dataset = dataset.map(parser_fn, num_parallel_calls=num_map_threads)
    dataset = dataset.batch(batch_size, drop_remainder=True).prefetch(prefetch_factor)
    dataset = dataset.repeat(num_epochs)
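The role of the cycle_length and block_length arguments of interleave() in the chain above can be illustrated with a plain-Python emulation of the sequential case (no TensorFlow required). It assumes that cycle_length sources are open at the same time, that block_length consecutive records are taken from each source in turn, and that an exhausted source is replaced by the next pending one. The function emulate_interleave and the shard contents are hypothetical; parallel, non-deterministic reading is not modelled.

```python
from itertools import islice

def emulate_interleave(sources, open_fn, cycle_length, block_length=1):
    """Plain-Python emulation of a sequential, deterministic interleave."""
    pending = iter(sources)
    # Open the first cycle_length sources
    slots = [iter(open_fn(src)) for src in islice(pending, cycle_length)]
    while slots:
        next_slots = []
        for reader in slots:
            block = list(islice(reader, block_length))
            yield from block
            if len(block) == block_length:
                # The reader may still hold records: keep it in the cycle
                next_slots.append(reader)
            else:
                # Reader exhausted: replace it with the next pending source
                for src in islice(pending, 1):
                    next_slots.append(iter(open_fn(src)))
        slots = next_slots

# Hypothetical TFRecord shards and their records
shards = {
    "part-0": ["a0", "a1", "a2"],
    "part-1": ["b0", "b1", "b2"],
    "part-2": ["c0", "c1"],
}

records = list(emulate_interleave(shards, shards.get, cycle_length=2, block_length=1))
print(records)
```

With cycle_length=2 and block_length=1, the records of the first two shards alternate one by one, and the third shard is only opened once a slot is freed. A larger cycle_length mixes more files together, which is also what allows them to be read concurrently.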
Implementation on Jean Zay
In order to put the above documentation into practice and get an idea of the gains brought by each of the functionalities offered by the TensorFlow Dataset, you can recover the Jupyter Notebook notebook_data_preprocessing_tensorflow-eng.ipynb from the DSDIR. For example, to copy it into your WORK:
$ cp $DSDIR/examples_IA/Tensorflow_parallel/notebook_data_preprocessing_tensorflow-eng.ipynb $WORK
You can also download the Notebook here.
Then you can open and execute the Notebook from the IDRIS jupyterhub platform. To use jupyterhub, please refer to the corresponding documentation: Jean Zay: Access to JupyterHub and https://jupyterhub.idris.fr/services/documentation/.
Creation of TFRecord files and their usage are also addressed here.