This article will show you how to run batch machine learning inference on a dataset stored in S3. We'll use Sagemaker Batch Transform Jobs and a pre-trained machine learning model.
I assume you have already trained the model, pushed the Docker image to ECR, and registered the model in Sagemaker. If you don’t know how to do it, take a look at one of my previous articles:
- How to deploy a Transformer-based model with custom preprocessing code to Sagemaker Endpoints using BentoML
- How to deploy a Tensorflow model using Sagemaker Endpoints and AWS Code Pipeline
- Multimodel deployment in Sagemaker Endpoints
Preprocessing the data
We'll need the input data in CSV files or text files containing JSON objects. In the case of JSON, the file should contain one JSON object per line - the jsonl format. Sagemaker Batch Transform Jobs can read uncompressed data as well as files using gzip compression.
Our example model classifies text and does the entire data preprocessing internally in the Docker container. We send raw data to the endpoint, and the Docker container cleans the input, tokenizes it, and passes the tokenization result to the model. If the model requires two input parameters called feature_A and feature_B, we create jsonl files that look like this:
{"feature_A": "This is a text", "feature_B": "Some other text"}
In the case of CSV files, we CAN’T use headers, so the Docker container must recognize parameters using their position in the input array instead of feature names.
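If you need to produce such a jsonl file and upload it to S3 yourself, a minimal sketch could look like this (the bucket name, key, and example records below are placeholders, not part of the original setup):

import json
import boto3

# Hypothetical example records with the two features the model expects
records = [
    {'feature_A': 'This is a text', 'feature_B': 'Some other text'},
    {'feature_A': 'Another text', 'feature_B': 'Yet another text'},
]

# Build the jsonl payload: one JSON object per line
payload = '\n'.join(json.dumps(record) for record in records)

# Upload the file to the input location in S3 (placeholder bucket and key)
boto3.client('s3').put_object(
    Bucket='bucket_name',
    Key='input_prefix/part-0001.jsonl',
    Body=payload.encode('utf-8'),
)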
Running the batch inference
To run the batch inference, we need the identifier of the Sagemaker model we want to use and the location of the input data. We’ll also need to decide where Sagemaker will store the output.
First, we have to configure a Transformer. We'll use the assemble-with-line mode to combine the output with the input, which makes it easier to process the results. However, it duplicates the input data in the output files, so you may want to use a different mode if you need to limit the cost of S3 storage.
We also specify the output data format application/json (the accept parameter) and instruct Sagemaker to process data points one by one (the SingleRecord strategy). Alternatively, we could use batching and send multiple data points to the Docker container at once, but our code (in the Docker container) must support such an input format. In this example, I reuse the model I was using for real-time inference, so SingleRecord is my only option.
from sagemaker.transformer import Transformer

transformer = Transformer(
    model_name='model_id',
    instance_count=1,
    instance_type='ml.m5.large',
    strategy='SingleRecord',  # process one data point per request
    output_path='s3://bucket_name/key_prefix',
    base_transform_job_name='transform_job_name_prefix',
    accept='application/json',  # output data format
    assemble_with='Line'  # append every output record as a separate line
)
When we have the Transformer, we can start a new batch transform job. Unfortunately, the API requires parameter duplication, so we specify the content_type and the join_source again, even though application/json and Line are the only options we can use with our Transformer (because of the parameters we passed to its constructor).
We can pass the input location in two ways: either an object key or a prefix matching multiple object keys, as in the example below, or the object key of a ManifestFile that lists the exact S3 keys of the files we want to process.
transformer.transform(
    's3://bucket/input_file_key_or_prefix',
    content_type='application/json',  # input data format
    split_type='Line',  # one record per line
    join_source='Input',  # join every prediction with its input record
    wait=False
)
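For reference, a ManifestFile is a JSON document stored in S3. As far as I know, it lists a common prefix followed by the relative keys of the files to process (the bucket and file names below are placeholders):

[
    {"prefix": "s3://bucket_name/input_prefix/"},
    "part-0001.jsonl",
    "part-0002.jsonl"
]

To use it, we would pass the manifest's object key as the input location together with data_type='ManifestFile' in the transform call instead of the default S3Prefix.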
The transform method doesn't return the job's identifier, so to check whether the output is ready, we need to find the running job using boto3 and check its status. We can do it like this:
import boto3
client = boto3.client('sagemaker')
client.list_transform_jobs(NameContains='transform_job_name_prefix')
It'll find the jobs with the given prefix. It's the same prefix as the base_transform_job_name we used in the Transformer constructor. Alternatively, we could set the wait parameter to True and wait for the result.
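If we prefer to poll for completion in code, a minimal sketch could look like this (it assumes the most recently created job matching our prefix is the one we just started):

import time
import boto3

client = boto3.client('sagemaker')

# Take the most recently created job matching our prefix
job_name = client.list_transform_jobs(
    NameContains='transform_job_name_prefix',
    SortBy='CreationTime',
    SortOrder='Descending'
)['TransformJobSummaries'][0]['TransformJobName']

# Poll the job status until it reaches a terminal state
while True:
    status = client.describe_transform_job(TransformJobName=job_name)['TransformJobStatus']
    if status in ('Completed', 'Failed', 'Stopped'):
        break
    time.sleep(30)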
Speeding up the processing
We have only one instance running, so processing the entire file may take some time. We can speed it up by increasing the number of instances with the instance_count parameter. We can also send multiple requests to the Docker container simultaneously. To configure concurrent transformations, we use the max_concurrent_transforms parameter. However, when we use this option, we must make sure the instance type we selected is powerful enough to handle multiple requests at the same time. It may be an issue when we run a large language processing model.
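For example, a scaled-up Transformer could look like the sketch below. The instance count, instance type, and concurrency values are placeholders you should tune to your model; everything else matches the constructor we used above:

transformer = Transformer(
    model_name='model_id',
    instance_count=4,  # run the job on several instances in parallel
    instance_type='ml.m5.xlarge',  # a bigger instance to handle concurrent requests
    max_concurrent_transforms=2,  # parallel requests sent to every container
    strategy='SingleRecord',
    output_path='s3://bucket_name/key_prefix',
    base_transform_job_name='transform_job_name_prefix',
    accept='application/json',
    assemble_with='Line'
)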
Processing the output
Finally, we need to access the output. We'll find the output files in the location specified in the Transformer constructor. Every line contains the prediction and the input parameters. For example:
{"SageMakerOutput":{"predictions": [[0.76]]}, "feature_A": "This is a text", "feature_B": "Some other text"}
If you didn't configure the join of the output with the input, the file would contain only the predictions. You can then join the output with the input files using the line numbers (or the index after loading both datasets into Pandas).
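To load the joined output into a Pandas DataFrame, a minimal sketch could look like this (it assumes s3fs is installed so Pandas can read directly from S3, and the output file name is a placeholder - Batch Transform typically names it after the input file with an .out suffix):

import pandas as pd

# Each output line is a JSON object, so we read the file as jsonl
df = pd.read_json(
    's3://bucket_name/key_prefix/part-0001.jsonl.out',
    lines=True
)

# Extract the prediction score from the nested SageMakerOutput column
df['score'] = df['SageMakerOutput'].apply(lambda x: x['predictions'][0][0])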