Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.
Real-time collaboration for Jupyter Notebooks, Linux Terminals, LaTeX, VS Code, R IDE, and more,
all in one place. Commercial Alternative to JupyterHub.
Path: blob/main/sagemaker/07_tensorflow_distributed_training_data_parallelism/sagemaker-notebook.ipynb
Views: 2542
Huggingface Sagemaker-sdk - Distributed Training Demo for TensorFlow
Distributed Data Parallelism with transformers
and tensorflow
Introduction
Welcome to our distributed end-to-end binary Text-Classification example. In this demo, we will use the Hugging Faces transformers
and datasets
library together with a custom Amazon sagemaker-sdk extension to fine-tune a pre-trained transformer on binary text classification. In particular, the pre-trained model will be fine-tuned using the imdb
dataset. To speed upload Training we are going to use SageMaker distributed Data Parallel library to run our training distributed across multiple gpus. To get started, we need to set up the environment with a few prerequisite steps, for permissions, configurations, and so on.
NOTE: You can run this demo in Sagemaker Studio, your local machine or Sagemaker Notebook Instances
Development Environment and Permissions
Installation
Note: we only install the required libraries from Hugging Face and AWS. You also need PyTorch or Tensorflow, if you haven´t it installed
Development environment
upgrade ipywidgets for datasets
library and restart kernel, only needed when prerpocessing is done in the notebook
Permissions
If you are going to use Sagemaker in a local environment. You need access to an IAM Role with the required permissions for Sagemaker. You can find here more about it.
Preprocessing
In this example the preproccsing will be done in the train.py
when executing the script. You could also move the preprocessing
outside of the script and upload the data to s3 and pass it into it.
Fine-tuning & starting Sagemaker Training Job
In order to create a sagemaker training job we need an HuggingFace
Estimator. The Estimator handles end-to-end Amazon SageMaker training and deployment tasks. In a Estimator we define, which fine-tuning script should be used as entry_point
, which instance_type
should be used, which hyperparameters
are passed in .....
When we create a SageMaker training job, SageMaker takes care of starting and managing all the required ec2 instances for us with the huggingface
container, uploads the provided fine-tuning script train.py
and downloads the data from our sagemaker_session_bucket
into the container at /opt/ml/input/data
. Then, it starts the training job by running.
The hyperparameters
you define in the HuggingFace
estimator are passed in as named arguments.
Sagemaker is providing useful properties about the training environment through various environment variables, including the following:
SM_MODEL_DIR
: A string that represents the path where the training job writes the model artifacts to. After training, artifacts in this directory are uploaded to S3 for model hosting.SM_NUM_GPUS
: An integer representing the number of GPUs available to the host.SM_CHANNEL_XXXX:
A string that represents the path to the directory that contains the input data for the specified channel. For example, if you specify two input channels in the HuggingFace estimator’s fit call, namedtrain
andtest
, the environment variablesSM_CHANNEL_TRAIN
andSM_CHANNEL_TEST
are set.
To run your training job locally you can define instance_type='local'
or instance_type='local_gpu'
for gpu usage. Note: this does not working within SageMaker Studio
import argparse
import logging
import os
import sys
import tensorflow as tf
from datasets import load_dataset
from tqdm import tqdm
from transformers import AutoTokenizer, TFAutoModelForSequenceClassification
from transformers.file_utils import is_sagemaker_distributed_available
if os.environ.get("SDP_ENABLED") or is_sagemaker_distributed_available():
SDP_ENABLED = True
os.environ["SAGEMAKER_INSTANCE_TYPE"] = "p3dn.24xlarge"
import smdistributed.dataparallel.tensorflow as sdp
else:
SDP_ENABLED = False
def fit(model, loss, opt, train_dataset, epochs, train_batch_size, max_steps=None):
pbar = tqdm(train_dataset)
for i, batch in enumerate(pbar):
with tf.GradientTape() as tape:
inputs, targets = batch
outputs = model(batch)
loss_value = loss(targets, outputs.logits)
if SDP_ENABLED:
tape = sdp.DistributedGradientTape(tape, sparse_as_dense=True)
grads = tape.gradient(loss_value, model.trainable_variables)
opt.apply_gradients(zip(grads, model.trainable_variables))
pbar.set_description(f"Loss: {loss_value:.4f}")
if SDP_ENABLED:
if i == 0:
sdp.broadcast_variables(model.variables, root_rank=0)
sdp.broadcast_variables(opt.variables(), root_rank=0)
first_batch = False
if max_steps and i >= max_steps:
break
train_results = {"loss": loss_value.numpy()}
return train_results
def get_datasets():
# Load dataset
train_dataset, test_dataset = load_dataset("imdb", split=["train", "test"])
# Preprocess train dataset
train_dataset = train_dataset.map(
lambda e: tokenizer(e["text"], truncation=True, padding="max_length"), batched=True
)
train_dataset.set_format(type="tensorflow", columns=["input_ids", "attention_mask", "label"])
train_features = {
x: train_dataset[x].to_tensor(default_value=0, shape=[None, tokenizer.model_max_length])
for x in ["input_ids", "attention_mask"]
}
tf_train_dataset = tf.data.Dataset.from_tensor_slices((train_features, train_dataset["label"]))
# Preprocess test dataset
test_dataset = test_dataset.map(
lambda e: tokenizer(e["text"], truncation=True, padding="max_length"), batched=True
)
test_dataset.set_format(type="tensorflow", columns=["input_ids", "attention_mask", "label"])
test_features = {
x: test_dataset[x].to_tensor(default_value=0, shape=[None, tokenizer.model_max_length])
for x in ["input_ids", "attention_mask"]
}
tf_test_dataset = tf.data.Dataset.from_tensor_slices((test_features, test_dataset["label"]))
if SDP_ENABLED:
tf_train_dataset = tf_train_dataset.shard(sdp.size(), sdp.rank())
tf_test_dataset = tf_test_dataset.shard(sdp.size(), sdp.rank())
tf_train_dataset = tf_train_dataset.batch(args.train_batch_size, drop_remainder=True)
tf_test_dataset = tf_test_dataset.batch(args.eval_batch_size, drop_remainder=True)
return tf_train_dataset, tf_test_dataset
if __name__ == "__main__":
parser = argparse.ArgumentParser()
# Hyperparameters sent by the client are passed as command-line arguments to the script.
parser.add_argument("--epochs", type=int, default=3)
parser.add_argument("--train-batch-size", type=int, default=16)
parser.add_argument("--eval-batch-size", type=int, default=8)
parser.add_argument("--model_name", type=str)
parser.add_argument("--learning_rate", type=str, default=5e-5)
parser.add_argument("--do_train", type=bool, default=True)
parser.add_argument("--do_eval", type=bool, default=True)
# Data, model, and output directories
parser.add_argument("--output_data_dir", type=str, default=os.environ["SM_OUTPUT_DATA_DIR"])
parser.add_argument("--model_dir", type=str, default=os.environ["SM_MODEL_DIR"])
parser.add_argument("--n_gpus", type=str, default=os.environ["SM_NUM_GPUS"])
args, _ = parser.parse_known_args()
# Set up logging
logger = logging.getLogger(__name__)
logging.basicConfig(
level=logging.getLevelName("INFO"),
handlers=[logging.StreamHandler(sys.stdout)],
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
if SDP_ENABLED:
sdp.init()
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[sdp.local_rank()], "GPU")
# Load model and tokenizer
model = TFAutoModelForSequenceClassification.from_pretrained(args.model_name)
tokenizer = AutoTokenizer.from_pretrained(args.model_name)
# get datasets
tf_train_dataset, tf_test_dataset = get_datasets()
# fine optimizer and loss
optimizer = tf.keras.optimizers.Adam(learning_rate=args.learning_rate)
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
metrics = [tf.keras.metrics.SparseCategoricalAccuracy()]
model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
# Training
if args.do_train:
# train_results = model.fit(tf_train_dataset, epochs=args.epochs, batch_size=args.train_batch_size)
train_results = fit(
model, loss, optimizer, tf_train_dataset, args.epochs, args.train_batch_size, max_steps=None
)
logger.info("*** Train ***")
output_eval_file = os.path.join(args.output_data_dir, "train_results.txt")
if not SDP_ENABLED or sdp.rank() == 0:
with open(output_eval_file, "w") as writer:
logger.info("***** Train results *****")
logger.info(train_results)
for key, value in train_results.items():
logger.info(" %s = %s", key, value)
writer.write("%s = %s\n" % (key, value))
# Evaluation
if args.do_eval and (not SDP_ENABLED or sdp.rank() == 0):
result = model.evaluate(tf_test_dataset, batch_size=args.eval_batch_size, return_dict=True)
logger.info("*** Evaluate ***")
output_eval_file = os.path.join(args.output_data_dir, "eval_results.txt")
with open(output_eval_file, "w") as writer:
logger.info("***** Eval results *****")
logger.info(result)
for key, value in result.items():
logger.info(" %s = %s", key, value)
writer.write("%s = %s\n" % (key, value))
# Save result
if SDP_ENABLED:
if sdp.rank() == 0:
model.save_pretrained(args.model_dir)
tokenizer.save_pretrained(args.model_dir)
else:
model.save_pretrained(args.model_dir)
tokenizer.save_pretrained(args.model_dir)
Creating an Estimator and start a training job
Deploying the endpoint
To deploy our endpoint, we call deploy()
on our HuggingFace estimator object, passing in our desired number of instances and instance type.
Then, we use the returned predictor object to call the endpoint.
Finally, we delete the endpoint again.
Extras
Estimator Parameters
Attach to old training job to an estimator
In Sagemaker you can attach an old training job to an estimator to continue training, get results etc..