How to add an EMR step in Airflow and wait until it finishes running

In this article, I am going to show you three things:

Table of Contents

  1. Get cluster id by name
  2. Add a step to the EMR cluster
  3. Wait for the result

  • how to retrieve the EMR cluster id using the cluster name
  • how to add an EMR step to an existing EMR cluster using the AwsHook in Airflow
  • how to define an EmrStepSensor to wait until the EMR finishes processing

First, we have to import the AwsHook and create a new instance of it. In this step, I assume that the AWS connection configuration has been already added to Airflow:

from airflow.contrib.hooks.aws_hook import AwsHook

aws_hook = AwsHook(aws_conn_id=AWS_CONNECTION_ID)
emr_client = aws_hook.get_client_type('emr')

Get cluster id by name

Now, I can list all active EMR cluster, find the one that I need using its name and store its identifier in a variable:

response = emr_client.list_clusters(ClusterStates=['RUNNING', 'WAITING'])

matching_clusters = list(
    filter(lambda cluster: cluster['Name'] == EMR_CLUSTER_NAME, response['Clusters'])
)

if len(matching_clusters) == 1:
    cluster_id = matching_clusters[0]['Id']
else:
    # handle the error

Add a step to the EMR cluster

Now, I am going to define a Python function that adds a new step to the EMR cluster. The function will return the cluster id and the step id, so it is possible to use it in a PythonOperator. In this case, it will automatically store the output in XCOM, so we can use it in the EmrStepSensor later.

Before I begin, I must define the command I want to run. In this example, I pretend that I want to submit a Spark job stored in a jar file on S3. Remember that the EMR step arguments cannot be longer than 10280 characters!

def run_spark_job():
    # here is the code that retrieves the cluster id

    step_args = f'spark-submit --master yarn --deploy-mode client ' \
        f'--class name.mikulskibartosz.SparkJob ' \
        f's3://bucket_name/spark_job.jar ' \
        f'--job_parameter something'

After that, I use the step_args in the Args part of a EMR step definition:

step_configuration = {
    'Name': 'step_name',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': step_args
    }
}

In the end, I use the add_job_flow_steps function to add the step to the cluster. Note that I can add multiple steps at once because the function accepts a list of steps. In this example, however, I am going to define only one step:

step_ids = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step_configuration])

In the last line, I extract the step id from the step_ids and return a tuple that contains both the cluster id and the step id:

return cluster_id, step_ids['StepIds'][0]

Wait for the result

Let’s assume that I ran the function using a PythonOperator, so the returned values are available in XCOM. In this case, I can define an EmrStepSensor that pauses the DAG until the EMR step finishes processing.

If the task that executes the run_spark_job function is called run_emr_step_task I can retrieve the cluster id by requesting the return_value of that task and selecting the first element of the tuple:

task_instance.xcom_pull('run_emr_step_task', key='return_value')[0]

Similarly, the step id is in the second element of the same tuple. I can use an Airflow template to get both of them:

wait_for_it = EmrStepSensor(
    task_id='wait_for_it',
    job_flow_id="{{ task_instance.xcom_pull('run_emr_step_task', key='return_value')[0] }}" 
    step_id="{{ task_instance.xcom_pull('run_emr_step_task', key='return_value')[1] }}" 
    dag=dag
)
Older post

How to use Virtualenv to prepare a separate environment for Python function running in Airflow

How to use the PythonVirtualenvOperator in Airflow

Newer post

Use HttpSensor to pause an Airflow DAG until a website is available

Pause an Airflow DAG until an HTTP endpoint returns 200 OK

Are you looking for an experienced AI consultant? Do you need assistance with your RAG or Agentic Workflow?
Book a Quick Consultation, send me a message on LinkedIn. Book a Quick Consultation or send me a message on LinkedIn

>