How to use AWS Batch to run a Python script

How to run a Python script as a part of your data pipeline? We can use the PythonOperator in Airflow to run the script. It is a decent solution, but we will block an Airflow worker. What if we want to run the Python script in an external system?

Table of Contents

  1. Packing the script as a Docker image
  2. Configuring an AWS Batch job
  3. Running an AWS Batch job in Airflow

I have seen projects where such scripts were running on the master node in the EMR cluster. It is not a terrible idea either. The coordinating node in a cluster usually has nothing to do anyway, so we can use it to run additional code.

However, the best solution is to use a specialized environment to run the Python scripts, for example, AWS Batch.

Packing the script as a Docker image

To run a Python script in AWS Batch, we have to generate a Docker image that contains the script and the entire runtime environment.

Let’s assume that I have my script in the main.py file inside a separate directory, which also contains the requirements.txt file. To generate a Docker image, I have to add a Dockerfile:

FROM python:3.8

WORKDIR /script

COPY requirements.txt .

RUN pip install -r requirements.txt

COPY main.py .

ENTRYPOINT [ "python", "/main.py"]

In a deployment script, I have to build the Docker container:

docker build -t docker_image_name .

Now, I can use the AWS Elastic Container Registry to store the generated Docker image. Before I push the image, I have to log in to the container registry and tag the image. Note that you must replace the with the identifier of your registry:

aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin <container registry id>.dkr.ecr.us-east-1.amazonaws.com

docker tag docker_image_name:latest <container registry id>.dkr.ecr.us-east-1.amazonaws.com/docker_image_name:latest

In the end, I can push the image to the repository:

docker push <container registry id>.dkr.ecr.us-east-1.amazonaws.com/docker_image_name:latest

Configuring an AWS Batch job

To configure the AWS Batch job, I will use Terraform. In the first step, I have to define the compute environment:

resource "aws_batch_compute_environment" "python_comp_env" {
  compute_environment_name = "python_comp_env"

  compute_resources {
    instance_role = "arn_of_an_instance_role"
    allocation_strategy = "BEST_FIT"
    instance_type = [
      "optimal"
    ]

    max_vcpus     = 4
    min_vcpus     = 0

    security_group_ids = [
      aws_security_group.some_security_group.id,
    ]

    subnets = [
      aws_subnet.some_subnet.id,
    ]

    type = "EC2"
  }

  service_role = "arn_of_aws_batch_service_role"
  type         = "MANAGED"
}

To schedule jobs, we’ll add them to a job queue, so in the next step, we must define the queue:

resource "aws_batch_job_queue" "python_scripts_queue" {
  name                 = "python_scripts_queue"
  state                = "ENABLED"
  priority             = 1
  compute_environments = [aws_batch_compute_environment.python_comp_env.arn]
}

Finally, we create a job definition to tell AWS Batch which Docker image to run in the compute environment:

esource "aws_batch_job_definition" "the_script_to_run" {
  name = "the_script_to_run"
  type = "container"

  container_properties = <<CONTAINER_PROPERTIES
{
    "command": [],
    "image": "<container registry id>.dkr.ecr.us-east-1.amazonaws.com/docker_image_name:latest",
    "memory": 2048,
    "vcpus": 2,
    "jobRoleArn": "arn_role_with_required_permissions",
    "volumes": [],
    "environment": [],
    "mountPoints": [],
    "ulimits": []
}
CONTAINER_PROPERTIES
}

Running an AWS Batch job in Airflow

When the Docker image, the computing environment, and the job definition are ready, I can add the AWS batch job to an Airflow pipeline using a built-in operator:

from airflow.contrib.operators.awsbatch_operator import AWSBatchOperator

batch_params = ["",""]

run_aws_batch = AWSBatchOperator(
    task_id='run_aws_batch',
    aws_conn_id='aws_conn_id',
    job_name='aws_batch_job_name',
    job_definition='aws_batch_job_definition',
    job_queue='batch_job_queue',
    overrides={'command': batch_params},
    dag=dag
)
Older post

Anomaly detection in Airflow DAG using Prophet library

How to detect problems in Airflow pipeline using Prophet for time series anomaly detection

Newer post

How to measure Spark performance and gather metrics about written data

How to track Spark metrics in AWS CloudWatch

Are you looking for an experienced AI consultant? Do you need assistance with your RAG or Agentic Workflow?
Book a Quick Consultation, send me a message on LinkedIn. Book a Quick Consultation or send me a message on LinkedIn

>