How do you run a Python script as part of your data pipeline? We can use the PythonOperator in Airflow to run the script. It is a decent solution, but it blocks an Airflow worker for as long as the script runs. What if we want to run the Python script in an external system?
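For comparison, the in-process approach looks roughly like this (a minimal sketch; the dag object and the script's entry function are assumed to be defined elsewhere):
from airflow.operators.python_operator import PythonOperator

def run_my_script():
    # the script's logic executes inside the Airflow worker process
    # and occupies a worker slot until it finishes
    ...

run_script = PythonOperator(
    task_id='run_script',
    python_callable=run_my_script,
    dag=dag
)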
I have seen projects where such scripts were running on the master node of an EMR cluster. That is not a terrible idea either: the coordinating node of a cluster usually has little to do, so we can use it to run additional code.
However, the best solution is to use a specialized environment to run the Python scripts, for example, AWS Batch.
Packaging the script as a Docker image
To run a Python script in AWS Batch, we have to generate a Docker image that contains the script and the entire runtime environment.
Let’s assume that I have my script in the main.py file inside a separate directory, which also contains the requirements.txt file. To generate a Docker image, I have to add a Dockerfile:
FROM python:3.8
WORKDIR /script
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY main.py .
ENTRYPOINT ["python", "/script/main.py"]
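For reference, the main.py copied into the image could be as simple as this hypothetical sketch: a script that takes its parameters from the command line (the same parameters we will later pass through AWS Batch):
import sys

def main(args):
    # replace this with the actual processing logic
    print(f'Running with parameters: {args}')

if __name__ == '__main__':
    main(sys.argv[1:])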
In a deployment script, I have to build the Docker image:
docker build -t docker_image_name .
Now, I can use the Amazon Elastic Container Registry (ECR) to store the generated Docker image. Before I push the image, I have to log in to the container registry and tag the image. Note that you must replace the <container registry id> placeholder with the id of your container registry (your AWS account id):
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <container registry id>.dkr.ecr.us-east-1.amazonaws.com
docker tag docker_image_name:latest <container registry id>.dkr.ecr.us-east-1.amazonaws.com/docker_image_name:latest
After that, I can push the image to the repository:
docker push <container registry id>.dkr.ecr.us-east-1.amazonaws.com/docker_image_name:latest
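With the image in the registry, AWS Batch can pull it when a job starts, provided that the instance role used by the compute environment has permission to read from ECR.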
Configuring an AWS Batch job
To configure the AWS Batch job, I will use Terraform. In the first step, I have to define the compute environment:
resource "aws_batch_compute_environment" "python_comp_env" {
compute_environment_name = "python_comp_env"
compute_resources {
instance_role = "arn_of_an_instance_role"
allocation_strategy = "BEST_FIT"
instance_type = [
"optimal"
]
max_vcpus = 4
min_vcpus = 0
security_group_ids = [
aws_security_group.some_security_group.id,
]
subnets = [
aws_subnet.some_subnet.id,
]
type = "EC2"
}
service_role = "arn_of_aws_batch_service_role"
type = "MANAGED"
}
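Because min_vcpus is set to 0, the compute environment scales down to zero EC2 instances when there are no jobs to run, so it does not generate costs while idle.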
To schedule jobs, we’ll add them to a job queue, so in the next step, we must define the queue:
resource "aws_batch_job_queue" "python_scripts_queue" {
name = "python_scripts_queue"
state = "ENABLED"
priority = 1
compute_environments = [aws_batch_compute_environment.python_comp_env.arn]
}
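The priority value matters only when multiple queues share the same compute environment; jobs from the queue with the higher number are scheduled first.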
Finally, we create a job definition to tell AWS Batch which Docker image to run in the compute environment:
esource "aws_batch_job_definition" "the_script_to_run" {
name = "the_script_to_run"
type = "container"
container_properties = <<CONTAINER_PROPERTIES
{
"command": [],
"image": "<container registry id>.dkr.ecr.us-east-1.amazonaws.com/docker_image_name:latest",
"memory": 2048,
"vcpus": 2,
"jobRoleArn": "arn_role_with_required_permissions",
"volumes": [],
"environment": [],
"mountPoints": [],
"ulimits": []
}
CONTAINER_PROPERTIES
}
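Before wiring the job into an Airflow pipeline, it may be worth submitting one manually to confirm that everything works, for example with boto3 (a sketch; the queue and job definition names come from the Terraform resources above, and the example parameters are made up):
import boto3

batch = boto3.client('batch', region_name='us-east-1')

response = batch.submit_job(
    jobName='the_script_to_run_manual_test',
    jobQueue='python_scripts_queue',
    jobDefinition='the_script_to_run',
    containerOverrides={
        # these values become the container command, i.e. the
        # command-line arguments passed to main.py
        'command': ['--date', '2021-01-01']
    }
)
print(response['jobId'])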
Running an AWS Batch job in Airflow
When the Docker image, the compute environment, and the job definition are ready, I can add the AWS Batch job to an Airflow pipeline using a built-in operator:
from airflow.contrib.operators.awsbatch_operator import AWSBatchOperator

batch_params = ["", ""]

run_aws_batch = AWSBatchOperator(
    task_id='run_aws_batch',
    aws_conn_id='aws_conn_id',
    job_name='aws_batch_job_name',
    job_definition='aws_batch_job_definition',
    job_queue='batch_job_queue',
    overrides={'command': batch_params},
    dag=dag
)
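The overrides parameter replaces the empty command list from the job definition, so batch_params is where the command-line arguments for main.py go.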