Our Airflow DAG failed again. Nobody noticed for hours because the email notification got lost between hundreds of emails from GitHub, Jenkins, and a few other applications that send way too many messages. What can we do about it? Of course, the best option is to limit the overall number of emails, but what if we are not allowed to do that?
Table of Contents
Can we get a Slack notification from Airflow? Sure! That is easy!
Slack webhook
The first thing we need is a configured Slack incoming webhook. It is an URL that receives POST requests and forwards every request to the pre-configured Slack channel. Because of that, we must keep that URL secret! Anyone who has access to it can send messages to our Slack!
We can configure the incoming hook using the steps described in the Slack documentation: https://api.slack.com/messaging/webhooks#getting_started.
Want to build AI systems that actually work?
Download my expert-crafted GenAI Transformation Guide for Data Teams and discover how to properly measure AI performance, set up guardrails, and continuously improve your AI solutions like the pros.
Airflow callback
After defining a webhook, we must create a callback function in Airflow. The function gets an Airflow DAG context as the parameter and does not return anything. Inside this function, we will build the message and send it to the Slack webhook. To communicate with Slack, we will use the SlackWebhookOperator because it encapsulates the HTTP connection and we don’t need to worry about that code:
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
def alert_slack_channel(context):
webhook = 'put here the webhook URL or read it from configuration'
msg = 'here is the message' # we will change it in the next step
SlackWebhookOperator(
task_id='notify_slack_channel',
http_conn_id=webhook,
message=msg,
).execute(context=None)
After defining the callback function, we have to use it as the “on failure callback” in the Airflow DAG:
dag = DAG(
...
default_args={
'on_failure_callback': alert_slack_channel
}
)
Customize the message
The last step is message customization. We would like to include the name of the Airflow task that failed, a link to the log, and the error message.
First, let’s extract the task name from the DAG context:
last_task: Optional[TaskInstance] = context.get('task_instance')
task_name = last_task.task_id
After that, we create a link to the log:
log_link = f"<{last_task.log_url}|{task_name}>"
The last thing we need is the error message or the failure reason:
error_message = context.get('exception') or context.get('reason')
When we send a message using the webhook, we can use the same markup language as the one available in the Slack application. We can even use emoticons!
execution_date = context.get('execution_date')
title = f':red_circle: {task_name} has failed!'
msg_parts = {
'Execution date': execution_date,
'Log': log_link,
'Error': error_message
}
msg = "\\n".join([title,
*[f"*{key}*: {value}" for key, value in msg_parts.items()]
]).strip()
As a result, Slack displays a message which looks like this:
