---
title: "How to check whether a YARN application has finished"
description: "How to use Airflow PythonSensor to check whether a YARN application finished running"
author: "Bartosz Mikulski"
author_bio: "Principal AI Engineer & MLOps Architect. I bridge the gap between \"it works in a notebook\" and \"it works for 200 million users.\""
author_url: https://mikulskibartosz.name
author_linkedin: https://www.linkedin.com/in/mikulskibartosz/
author_github: https://github.com/mikulskibartosz
canonical_url: https://mikulskibartosz.name/check-whether-yarn-application-finished
---

It is relatively easy to start a new YARN application in Airflow, but how can we check whether it has finished running? In this tutorial, I show how to configure the `PythonSensor` to access the YARN cluster using SSH, list all the running applications, and check whether our application is still among them.

First, we have to define a `PythonSensor`. In this example, I use an Airflow template to pass the YARN command to the Python function through `op_kwargs`. Later, I will check whether the list of YARN applications contains that string:

```python
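# Airflow 2 import path; in Airflow 1.10 use airflow.contrib.sensors.python_sensor
from airflow.sensors.python import PythonSensor

wait_for_yarn = PythonSensor(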
    task_id='wait_for_yarn',
    python_callable=the_python_function,
    op_kwargs={
        'yarn_app': 'here is the YARN command'
    },
    timeout=YARN_TIMEOUT,
    dag=dag
)
```

Next, I have to define `the_python_function`, which should return `True` when the application has finished running or `False` if it is still running:

```python
def the_python_function(yarn_app: str):
    ...
```

In the function, we have to connect to the cluster (an EMR cluster in this example) using SSH and call the `yarn application --list` command:

```python
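# Airflow 2 import path; in Airflow 1.10 use airflow.contrib.hooks.ssh_hook
from airflow.providers.ssh.hooks.ssh import SSHHook


def the_python_function(yarn_app: str):
    ssh = SSHHook(ssh_conn_id=EMR_CONNECTION_ID)
    ssh_client = None
    try:
        ssh_client = ssh.get_conn()
        ssh_client.load_system_host_keys()
        stdin, stdout, stderr = ssh_client.exec_command('yarn application --list')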
        output_lines = stdout.readlines()

        ... # here we will process the output_lines

    finally:
        if ssh_client:
            ssh_client.close()
```
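
One caveat with the snippet above: if the `yarn` command itself fails (for example, because the binary is not on the `PATH` of the SSH user), `stdout` is empty, and an empty list looks exactly like "no applications are running". Below is a minimal sketch of a guard against that. The helper name `run_command` is hypothetical and not part of the original code; it relies on `stdout.channel.recv_exit_status()`, which Paramiko exposes on the file objects returned by `exec_command`:

```python
def run_command(ssh_client, command: str) -> list:
    """Run a command over SSH and return its stdout lines.

    Hypothetical helper: raises instead of silently returning
    an empty list when the remote command exits with a non-zero status.
    """
    stdin, stdout, stderr = ssh_client.exec_command(command)
    # Read stdout first, then stderr, and only then ask for the exit status
    output_lines = stdout.readlines()
    error_output = stderr.read().decode(errors='replace')
    exit_status = stdout.channel.recv_exit_status()
    if exit_status != 0:
        raise RuntimeError(
            f'"{command}" exited with status {exit_status}: {error_output}'
        )
    return output_lines
```

Inside the sensor function, such a helper could replace the direct `exec_command` call, so a broken SSH command fails the task instead of being mistaken for a finished application.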

By default, the `yarn application --list` command returns only the applications that are still in progress (in the SUBMITTED, ACCEPTED, or RUNNING state), so finished, failed, and killed applications are not included. Therefore, I iterate over the `output_lines` to check whether any of them contains the `yarn_app`. If such a line exists, the YARN application is still running, and the sensor function returns False:

```python
# put this between "output_lines = stdout.readlines()" and "finally"

for line in output_lines:
    if yarn_app in line:
        return False
```
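
A plain substring check can match more than intended, for example when one application name is a prefix of another. A stricter variant is sketched below. It assumes that `yarn_app` holds the application name, that report rows are tab-separated, and that the name is in the second column; check this against the output of your own cluster before relying on it. The function name `is_app_listed` is hypothetical:

```python
def is_app_listed(output_lines, app_name: str) -> bool:
    """Return True if a report row has exactly this application name."""
    for line in output_lines:
        columns = [column.strip() for column in line.split('\t')]
        # Data rows start with an application id; skip headers and log lines
        if len(columns) < 2 or not columns[0].startswith('application_'):
            continue
        # Assumption: the second column is the application name
        if columns[1] == app_name:
            return True
    return False
```

With this helper, the loop above becomes `if is_app_listed(output_lines, yarn_app): return False`.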

In most cases, we want to distinguish between applications that finished successfully and those that failed or were killed. Because of that, we must call the `yarn application` command again, this time listing only the failed and killed applications, and check whether that list contains the `yarn_app`. If it does, the function raises an exception and fails the Airflow task:

```python
# put this after the previous code snippet but before "finally"

stdin, stdout, stderr = ssh_client.exec_command('yarn application --list -appStates FAILED,KILLED')
output_lines = stdout.readlines()

for line in output_lines:
    if yarn_app in line:
        raise Exception(f'{yarn_app} has failed')
```

If the function has neither returned False nor raised an exception by this point, the application finished successfully, and we must return True:

```python
# this can be the last line of the function, outside the try...finally block

return True
```
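
The three outcomes can also be pulled out into a pure function, which makes the decision logic testable without an SSH connection. This is only a sketch that uses the same substring checks as the snippets above; the name `sensor_result` and its parameters are hypothetical. Unlike the step-by-step version, it expects both listings to be fetched up front:

```python
def sensor_result(active_lines, failed_lines, yarn_app: str) -> bool:
    """Decide the sensor outcome from two `yarn application` listings.

    active_lines: output of the default listing (applications in progress)
    failed_lines: output of the listing limited to FAILED,KILLED
    """
    if any(yarn_app in line for line in active_lines):
        return False  # still running, so the sensor pokes again later
    if any(yarn_app in line for line in failed_lines):
        raise Exception(f'{yarn_app} has failed')
    return True  # neither running nor failed: treated as finished
```

The sensor function then only has to collect the two outputs over SSH and return `sensor_result(active_lines, failed_lines, yarn_app)`.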

