---
title: "How to add an EMR step in Airflow and wait until it finishes running"
description: "How to use AwsHook and EmrStepSensor to add an EMR step and wait until it finishes running"
author: "Bartosz Mikulski"
author_bio: "Principal AI Engineer & MLOps Architect. I bridge the gap between \"it works in a notebook\" and \"it works for 200 million users.\""
author_url: https://mikulskibartosz.name
author_linkedin: https://www.linkedin.com/in/mikulskibartosz/
author_github: https://github.com/mikulskibartosz
canonical_url: https://mikulskibartosz.name/add-emr-step-in-airflow-and-wait-until-it-finishes-running
---

In this article, I am going to show you three things:

* how to retrieve the EMR cluster id using the cluster name
* how to add an EMR step to an existing EMR cluster using the AwsHook in Airflow
* how to define an `EmrStepSensor` to wait until the EMR step finishes processing

First, we have to import the `AwsHook` and create a new instance of it. In this step, I assume that the AWS connection configuration has already been added to Airflow and that `AWS_CONNECTION_ID` contains its connection id:

```python
from airflow.contrib.hooks.aws_hook import AwsHook

aws_hook = AwsHook(aws_conn_id=AWS_CONNECTION_ID)
emr_client = aws_hook.get_client_type('emr')
```

## Get cluster id by name

Now, I can list all active EMR clusters, find the one that I need using its name, and store its identifier in a variable:

```python
response = emr_client.list_clusters(ClusterStates=['RUNNING', 'WAITING'])

matching_clusters = list(
    filter(lambda cluster: cluster['Name'] == EMR_CLUSTER_NAME, response['Clusters'])
)

if len(matching_clusters) == 1:
    cluster_id = matching_clusters[0]['Id']
else:
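    # fail fast when the name matches no active cluster or more than one
    raise ValueError(f'Expected exactly one active cluster named {EMR_CLUSTER_NAME}, found {len(matching_clusters)}')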
```
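Note that `list_clusters` returns its results in pages, so a single call may not include every cluster when the account has many of them. Below is a minimal sketch of the same lookup that walks through all pages using the boto3 paginator. The function name `find_cluster_id` and its parameters are my own, hypothetical choices, and I assume that `emr_client` is the boto3 EMR client created earlier:

```python
def find_cluster_id(emr_client, cluster_name):
    """Return the id of the single active cluster called `cluster_name`."""
    # the paginator follows the pagination markers for us
    paginator = emr_client.get_paginator('list_clusters')
    pages = paginator.paginate(ClusterStates=['RUNNING', 'WAITING'])

    matching_ids = [
        cluster['Id']
        for page in pages
        for cluster in page['Clusters']
        if cluster['Name'] == cluster_name
    ]

    # zero matches and multiple matches are both treated as errors
    if len(matching_ids) != 1:
        raise ValueError(
            f'Expected exactly one active cluster named {cluster_name!r}, '
            f'found {len(matching_ids)}'
        )
    return matching_ids[0]
```

With this helper, the lookup becomes a single call: `cluster_id = find_cluster_id(emr_client, EMR_CLUSTER_NAME)`.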

## Add a step to the EMR cluster

Now, I am going to define a Python function that adds a new step to the EMR cluster. The function returns the cluster id and the step id. When I run it in a `PythonOperator`, Airflow automatically stores the returned value in XCom, so I can use it in the `EmrStepSensor` later.

Before I begin, I must define the command I want to run. In this example, I pretend that I want to submit a Spark job stored in a jar file on S3. Remember that the EMR step arguments cannot be longer than 10280 characters!

```python
def run_spark_job():
    # here is the code that retrieves the cluster id

    # boto3 expects Args to be a list of strings, one element per argument
    step_args = [
        'spark-submit', '--master', 'yarn', '--deploy-mode', 'client',
        '--class', 'name.mikulskibartosz.SparkJob',
        's3://bucket_name/spark_job.jar',
        '--job_parameter', 'something',
    ]
```
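The `Args` field of an EMR step has to be a list of strings. If the command is kept as a single string (for example, because it comes from a configuration file), it must be split into separate arguments first. Here is a minimal sketch that does the split with `shlex` and checks the length limit mentioned above. The helper name `to_step_args` and the constant are hypothetical, and measuring the length of the arguments joined with spaces is my assumption about how the limit is counted:

```python
import shlex

# the limit quoted in this article
MAX_STEP_ARGS_LENGTH = 10280


def to_step_args(command):
    """Split a shell-style command into a list of arguments and check its length."""
    # shlex respects quotes, so "--name 'my job'" yields two arguments, not three
    args = shlex.split(command)

    # assumption: the limit applies to the arguments joined with single spaces
    total_length = len(' '.join(args))
    if total_length > MAX_STEP_ARGS_LENGTH:
        raise ValueError(
            f'Step arguments are {total_length} characters long, '
            f'the limit is {MAX_STEP_ARGS_LENGTH}'
        )
    return args
```

Failing inside the Airflow task gives a clearer error message than a request rejected by EMR.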

After that, I use the `step_args` in the `Args` part of an EMR step definition:

```python
step_configuration = {
    'Name': 'step_name',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': step_args
    }
}
```

Finally, I use the `add_job_flow_steps` function to add the step to the cluster. Note that I can add multiple steps at once because the function accepts a list of steps. In this example, however, I am going to define only one step:

```python
step_ids = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step_configuration])
```

In the last line of the function, I extract the step id from the `StepIds` list in the response (stored in the `step_ids` variable) and return a tuple that contains both the cluster id and the step id:

```python
return cluster_id, step_ids['StepIds'][0]
```
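Putting the pieces of this section together, the step submission can be sketched as one function. This is only an illustration: the name `add_spark_step` and its parameters are hypothetical, and I pass the EMR client and the cluster id as arguments instead of creating them inside the function, so the code is easier to test:

```python
def add_spark_step(emr_client, cluster_id, step_name, step_args):
    """Add a single step to the cluster and return (cluster_id, step_id)."""
    step_configuration = {
        'Name': step_name,
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            # a list of strings, one element per command-line argument
            'Args': list(step_args),
        },
    }

    response = emr_client.add_job_flow_steps(
        JobFlowId=cluster_id,
        Steps=[step_configuration],
    )

    # one step was submitted, so the response contains exactly one step id
    return cluster_id, response['StepIds'][0]
```

A function like `run_spark_job` could create the client, look up the cluster id, and then return the result of `add_spark_step`, which the `PythonOperator` stores in XCom.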

## Wait for the result

Let's assume that I ran the function using a `PythonOperator`, so the returned values are available in XCom. In this case, I can define an `EmrStepSensor` that pauses the DAG until the EMR step finishes processing.

If the task that executes the `run_spark_job` function is called `run_emr_step_task`, I can retrieve the cluster id by requesting the `return_value` of that task and selecting the first element of the tuple:

```python
task_instance.xcom_pull('run_emr_step_task', key='return_value')[0]
```

Similarly, the step id is in the second element of the same tuple. I can use an Airflow template to get both of them:

```python
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

wait_for_it = EmrStepSensor(
    task_id='wait_for_it',
    {% raw %}job_flow_id="{{ task_instance.xcom_pull('run_emr_step_task', key='return_value')[0] }}",{% endraw %}
    {% raw %}step_id="{{ task_instance.xcom_pull('run_emr_step_task', key='return_value')[1] }}",{% endraw %}
    dag=dag
)
```
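To show what waiting for a step involves, here is a minimal sketch of a polling loop built on the boto3 `describe_step` call. It is not the sensor's actual implementation, only an illustration of the idea. The function name `wait_for_step`, the `poke_interval` and `sleep` parameters, and the choice of exceptions are my own assumptions:

```python
import time

# step states after which the step will not make any more progress
FAILED_STATES = {'CANCELLED', 'FAILED', 'INTERRUPTED'}


def wait_for_step(emr_client, cluster_id, step_id, poke_interval=60, sleep=time.sleep):
    """Block until the EMR step completes; raise an error if it fails."""
    while True:
        response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = response['Step']['Status']['State']

        if state == 'COMPLETED':
            return state
        if state in FAILED_STATES:
            raise RuntimeError(f'EMR step {step_id} ended in state {state}')

        # the step is still pending or running, check again later
        sleep(poke_interval)
```

In a DAG, the sensor is the better choice because Airflow handles the polling interval, the timeout, and the task state for me.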

