In this article, I am going to show you three things:
- how to retrieve the EMR cluster id using the cluster name
- how to add an EMR step to an existing EMR cluster using the AwsHook in Airflow
- how to define an EmrStepSensor to wait until the EMR step finishes processing
First, we have to import the AwsHook and create a new instance of it. In this step, I assume that the AWS connection configuration has already been added to Airflow:
from airflow.contrib.hooks.aws_hook import AwsHook
aws_hook = AwsHook(aws_conn_id=AWS_CONNECTION_ID)
emr_client = aws_hook.get_client_type('emr')
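Throughout these snippets, I use two constants, AWS_CONNECTION_ID and EMR_CLUSTER_NAME. The values below are only illustrative placeholders; adjust them to match your Airflow connection and the name of your cluster:
AWS_CONNECTION_ID = 'aws_default'  # the Airflow connection that stores the AWS credentials
EMR_CLUSTER_NAME = 'my-emr-cluster'  # the name of the running EMR cluster (placeholder)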
Get cluster id by name
Now, I can list all active EMR clusters, find the one that I need using its name, and store its identifier in a variable:
response = emr_client.list_clusters(ClusterStates=['RUNNING', 'WAITING'])
matching_clusters = list(
    filter(lambda cluster: cluster['Name'] == EMR_CLUSTER_NAME, response['Clusters'])
)

if len(matching_clusters) == 1:
    cluster_id = matching_clusters[0]['Id']
else:
    # handle the error, e.g., fail the task if the cluster cannot be identified unambiguously
    raise ValueError(f'Expected one cluster named {EMR_CLUSTER_NAME}, found {len(matching_clusters)}')
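Note that list_clusters returns paginated results, so if the account runs many clusters, the one you need may not be on the first page. Here is a minimal sketch that uses boto3's built-in paginator instead (the constants are the same as above):
clusters = []
paginator = emr_client.get_paginator('list_clusters')
for page in paginator.paginate(ClusterStates=['RUNNING', 'WAITING']):
    clusters.extend(page['Clusters'])

matching_clusters = [cluster for cluster in clusters if cluster['Name'] == EMR_CLUSTER_NAME]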
Add a step to the EMR cluster
Now, I am going to define a Python function that adds a new step to the EMR cluster. The function will return the cluster id and the step id, so it is possible to use it in a PythonOperator. In this case, it will automatically store the output in XCom, so we can use it in the EmrStepSensor later.
Before I begin, I must define the command I want to run. In this example, let's assume that I want to submit a Spark job stored in a jar file on S3. Remember that the EMR step arguments cannot be longer than 10280 characters!
def run_spark_job():
    # here is the code that retrieves the cluster id

    step_args = 'spark-submit --master yarn --deploy-mode client ' \
                '--class name.mikulskibartosz.SparkJob ' \
                's3://bucket_name/spark_job.jar ' \
                '--job_parameter something'
After that, I use the step_args in the Args part of an EMR step definition. Because Args expects a list of strings rather than a single string, I split the command on whitespace:
step_configuration = {
    'Name': 'step_name',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': step_args.split()
    }
}
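Splitting on whitespace works for this command, but it would break if any argument contained a space. As a design alternative (a sketch using the same hypothetical job parameters), you can define the arguments as a list from the start and assign it to Args directly, without calling split:
step_args = [
    'spark-submit', '--master', 'yarn', '--deploy-mode', 'client',
    '--class', 'name.mikulskibartosz.SparkJob',
    's3://bucket_name/spark_job.jar',
    '--job_parameter', 'something'
]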
Finally, I use the add_job_flow_steps function to add the step to the cluster. Note that I can add multiple steps at once because the function accepts a list of steps. In this example, however, I am going to define only one step:
step_ids = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step_configuration])
In the last line, I extract the step id from the step_ids response and return a tuple that contains both the cluster id and the step id:
return cluster_id, step_ids['StepIds'][0]
Wait for the result
Let’s assume that I ran the function using a PythonOperator, so the returned values are available in XCom. In this case, I can define an EmrStepSensor that pauses the DAG until the EMR step finishes processing.
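For reference, such a task could look like the sketch below. The task id run_emr_step_task matches the one used in the sensor, and I assume the dag object is already defined:
from airflow.operators.python_operator import PythonOperator

run_emr_step_task = PythonOperator(
    task_id='run_emr_step_task',
    python_callable=run_spark_job,  # the function defined above; its return value is pushed to XCom
    dag=dag
)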
If the task that executes the run_spark_job function is called run_emr_step_task, I can retrieve the cluster id by requesting the return_value of that task and selecting the first element of the tuple:
task_instance.xcom_pull('run_emr_step_task', key='return_value')[0]
Similarly, the step id is in the second element of the same tuple. I can use an Airflow template to get both of them:
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

wait_for_it = EmrStepSensor(
    task_id='wait_for_it',
    job_flow_id="{{ task_instance.xcom_pull('run_emr_step_task', key='return_value')[0] }}",
    step_id="{{ task_instance.xcom_pull('run_emr_step_task', key='return_value')[1] }}",
    aws_conn_id=AWS_CONNECTION_ID,  # reuse the same AWS connection as before
    dag=dag
)
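The last thing is to make sure the sensor runs after the task that adds the EMR step, so the XCom values exist when the sensor's templates are rendered. Assuming the task variables defined above:
run_emr_step_task >> wait_for_it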