How do you run a Python script as part of your data pipeline? We can use the PythonOperator in Airflow to run the script. It is a decent solution, but it blocks an Airflow worker for the entire runtime of the script. What if we want to run the Python script in an external system?
Table of Contents
- Packing the script as a Docker image
- Configuring an AWS Batch job
- Running an AWS Batch job in Airflow
I have seen projects where such scripts ran on the master node of an EMR cluster. It is not a terrible idea either; the coordinating node of a cluster usually has little to do anyway, so we can use it to run additional code.
However, the best solution is to use a specialized environment to run the Python scripts, for example, AWS Batch.
Packing the script as a Docker image
To run a Python script in AWS Batch, we have to build a Docker image that contains the script and its entire runtime environment.
Let’s assume that I have my script in the main.py file inside a separate directory, which also contains the requirements.txt file. To generate a Docker image, I have to add a Dockerfile:
FROM python:3.8
WORKDIR /script
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY main.py .
ENTRYPOINT [ "python", "/main.py"]
In a deployment script, I have to build the Docker image:
docker build -t docker_image_name .
Now, I can use the AWS Elastic Container Registry (ECR) to store the generated Docker image. Before I push the image, I have to log in to the container registry and tag the image. Note that you must replace the <container registry id> placeholder in the commands below with the ID of your own registry (the AWS account ID that owns the ECR repository) and adjust the region if you are not using us-east-1:
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <container registry id>.dkr.ecr.us-east-1.amazonaws.com
docker tag docker_image_name:latest <container registry id>.dkr.ecr.us-east-1.amazonaws.com/docker_image_name:latest
Finally, I can push the image to the repository:
docker push <container registry id>.dkr.ecr.us-east-1.amazonaws.com/docker_image_name:latest
Configuring an AWS Batch job
To configure the AWS Batch job, I will use Terraform. In the first step, I have to define the compute environment:
resource "aws_batch_compute_environment" "python_comp_env" {
compute_environment_name = "python_comp_env"
compute_resources {
instance_role = "arn_of_an_instance_role"
allocation_strategy = "BEST_FIT"
instance_type = [
"optimal"
]
max_vcpus = 4
min_vcpus = 0
security_group_ids = [
aws_security_group.some_security_group.id,
]
subnets = [
aws_subnet.some_subnet.id,
]
type = "EC2"
}
service_role = "arn_of_aws_batch_service_role"
type = "MANAGED"
}
To schedule jobs, we’ll add them to a job queue, so in the next step, we must define the queue:
resource "aws_batch_job_queue" "python_scripts_queue" {
name = "python_scripts_queue"
state = "ENABLED"
priority = 1
compute_environments = [aws_batch_compute_environment.python_comp_env.arn]
}
Finally, we create a job definition to tell AWS Batch which Docker image to run in the compute environment:
esource "aws_batch_job_definition" "the_script_to_run" {
name = "the_script_to_run"
type = "container"
container_properties = <<CONTAINER_PROPERTIES
{
"command": [],
"image": "<container registry id>.dkr.ecr.us-east-1.amazonaws.com/docker_image_name:latest",
"memory": 2048,
"vcpus": 2,
"jobRoleArn": "arn_role_with_required_permissions",
"volumes": [],
"environment": [],
"mountPoints": [],
"ulimits": []
}
CONTAINER_PROPERTIES
}
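Before adding the job to Airflow, it may be worth submitting it once by hand to verify that the compute environment, the queue, and the job definition work together. A quick sketch using boto3 (the job name and the command arguments are placeholders; the queue and job definition names match the Terraform resources above):
import boto3

# Submit a single test job to the queue created by Terraform.
batch = boto3.client("batch", region_name="us-east-1")

response = batch.submit_job(
    jobName="manual-test-run",
    jobQueue="python_scripts_queue",
    jobDefinition="the_script_to_run",
    containerOverrides={
        # Passed to the script's entrypoint as command-line arguments.
        "command": ["--input-path", "s3://bucket/input", "--output-path", "s3://bucket/output"],
    },
)
print(response["jobId"])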
Running an AWS Batch job in Airflow
When the Docker image, the compute environment, the job queue, and the job definition are ready, I can add the AWS Batch job to an Airflow pipeline using a built-in operator:
# Airflow 1.10.x import path (contrib operators)
from airflow.contrib.operators.awsbatch_operator import AWSBatchOperator

# Arguments appended to the container command; they end up as the
# command-line parameters of main.py (left as empty placeholders here).
batch_params = ["", ""]

run_aws_batch = AWSBatchOperator(
    task_id='run_aws_batch',
    aws_conn_id='aws_conn_id',
    job_name='aws_batch_job_name',
    job_definition='the_script_to_run',
    job_queue='python_scripts_queue',
    overrides={'command': batch_params},
    dag=dag
)
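The snippet references a dag object that has to be defined earlier in the file. A minimal sketch of that DAG, with a placeholder schedule and start date:
from datetime import datetime

from airflow import DAG

# Minimal DAG wrapping the AWS Batch task;
# adjust the schedule and start date to your pipeline.
dag = DAG(
    dag_id='python_script_on_aws_batch',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    catchup=False,
)
If the script depends on other tasks in the pipeline, wire it in as usual, for example some_upstream_task >> run_aws_batch.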