Even though S3 has no concept of directories, we tend to use / as a delimiter in object keys and think of objects that share a key prefix as files in the same directory. After all, when we open the S3 web interface, it looks like a file system with directories.

Because of that, removing files with a common prefix is an everyday use case, as it is the S3 equivalent of removing a directory.

This operation is not trivial for two reasons:

  • We have to list all objects in the “directory” first, because we need the exact keys to remove anything
  • We cannot remove more than 1000 keys in a single request (both limits come straight from the S3 API, as the boto3 sketch below shows)
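
Here is a rough sketch of those raw API calls using plain boto3, just to show where the limits come from (bucket_name and key_prefix are placeholders): list_objects_v2 returns at most 1000 keys per page, and delete_objects accepts at most 1000 keys per request.

import boto3

s3 = boto3.client('s3')

# listing is paginated: every page holds at most 1000 keys
paginator = s3.get_paginator('list_objects_v2')
keys = []
for page in paginator.paginate(Bucket=bucket_name, Prefix=key_prefix):
    keys.extend(obj['Key'] for obj in page.get('Contents', []))

# deleting is capped: a single delete_objects call takes at most 1000 keys
s3.delete_objects(
    Bucket=bucket_name,
    Delete={'Objects': [{'Key': key} for key in keys[:1000]]},
)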

I will handle this in three stages. First, I retrieve all object keys that begin with the given prefix. Note that the S3Hook paginates internally while listing, so we get every matching key without any extra work:

from airflow.hooks.S3_hook import S3Hook
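# in Airflow 2+ the same hook lives in the Amazon provider package:
# from airflow.providers.amazon.aws.hooks.s3 import S3Hook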

s3_hook = S3Hook('s3_connection_id')

object_keys = s3_hook.list_keys(bucket_name=bucket_name, prefix=key_prefix)

I cannot pass all of the keys directly into the delete_objects function because the object_keys list may contain more than 1000 entries. To solve that problem, I define a chunks function that splits a list into sublists of at most n elements each:

def chunks(lst, n):
    """Yield successive slices of lst with at most n elements each."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
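
A quick sanity check of what chunks produces (the 2500 dummy elements are just an illustration):

batches = list(chunks(list(range(2500)), 1000))
print([len(batch) for batch in batches])  # prints [1000, 1000, 500]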

Now, I use chunks to split the keys into batches and remove every batch using the delete_objects function:

if object_keys:
    batches = chunks(object_keys, 1000)
    for batch in batches:
        s3_hook.delete_objects(bucket=bucket_name, keys=batch)
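
For completeness, here is how the whole thing could run as a regular Airflow task. This is only a sketch: the delete_s3_prefix function name, the connection id, the bucket and prefix values, and the dag object are placeholders to adapt to your own pipeline, and it reuses the chunks helper defined above.

from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator


def delete_s3_prefix(bucket_name, key_prefix):
    # same logic as above: list every key under the prefix, then delete in batches of 1000
    # chunks() is the helper defined earlier in this post
    s3_hook = S3Hook('s3_connection_id')
    object_keys = s3_hook.list_keys(bucket_name=bucket_name, prefix=key_prefix)
    if object_keys:
        for batch in chunks(object_keys, 1000):
            s3_hook.delete_objects(bucket=bucket_name, keys=batch)


delete_prefix = PythonOperator(
    task_id='delete_s3_prefix',
    python_callable=delete_s3_prefix,
    op_kwargs={'bucket_name': 'my-bucket', 'key_prefix': 'reports/2021/'},
    dag=dag,  # an existing DAG object
)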
