Occasionally, when we use Airflow, we have a DAG which always works on the most recent snapshot of data even if we run a backfill for days in the past. It happens when at least one of the tasks downloads a snapshot of the current state from an external API, uploads data into another service (using an API, FTP upload, sending events to a message queue, or anything else), or pushes some information to users.


The last case is the worst. Certainly, we don’t want to send old versions of newsletters to all of the subscribers just because we had to backfill some values in a report.

How do we make sure that such time-sensitive tasks are executed only in the most recent DAG run and don’t start when we run a backfill? We can do that by using the LatestOnlyOperator in Airflow!

Using it is simple. All we need to do is import the operator, create a new instance, and add it to the DAG:

# Airflow 1.x import path; in Airflow 2.x the module is airflow.operators.latest_only
from airflow.operators.latest_only_operator import LatestOnlyOperator

is_this_latest_dag_run = LatestOnlyOperator(task_id="dont_send_newsletters_during_backfills", dag=dag)

upstream_task >> is_this_latest_dag_run >> send_newsletters >> other_downstream_task

When the operator runs, it checks whether the current time falls between the execution time of the current DAG run and the next scheduled execution time. If yes, it lets the downstream tasks run. If no, it means that we are running a backfill, and the downstream tasks get skipped. Note that manually (externally) triggered DAG runs are not skipped, so triggering the DAG by hand still executes the whole pipeline.
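To make the check concrete, here is a minimal sketch of that time-window comparison in plain Python. This is not Airflow's actual implementation; `is_latest_run` is a hypothetical helper that mirrors the logic described above, with the schedule windows passed in explicitly.

```python
from datetime import datetime, timedelta

# Hypothetical helper mirroring the check LatestOnlyOperator performs;
# the real operator derives these windows from the DAG's schedule.
def is_latest_run(execution_date, next_execution_date, now=None):
    """Return True when `now` falls inside this run's schedule window,
    i.e. this is the most recent DAG run rather than a backfill."""
    now = now or datetime.utcnow()
    return execution_date < now <= next_execution_date

# A daily schedule: the run for 2023-01-10 covers (2023-01-10, 2023-01-11].
run_date = datetime(2023, 1, 10)
next_run_date = run_date + timedelta(days=1)

# "Now" inside the window -> this is the latest run, downstream tasks proceed.
assert is_latest_run(run_date, next_run_date, now=datetime(2023, 1, 10, 12))

# "Now" long past the window -> a backfill, downstream tasks get skipped.
assert not is_latest_run(run_date, next_run_date, now=datetime(2023, 3, 1))
```

For a backfill of a day in the past, "now" is far beyond that run's window, so the check fails and everything downstream of the operator is marked as skipped.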


