It is not difficult to turn your Python environment into a mess. Soon, the libraries become incompatible with one another, start producing weird results or suddenly crash in the middle of a computation.

Table of Contents

  1. Get Weekly AI Implementation Insights
  2. Use PythonVirtualenvOperator

Fortunately, we can create separate environments using Virtualenv or Conda. This feature is also available in Airflow, but in this case, we have access only to Virtualenv (unless you add a custom operator).

First, we have to define a Python function we want to run. Note that we must define ALL imports inside the function, and it cannot reference anything defined outside. Even if it is a global variable. We must pass all such variables as arguments of the PythonVirtualenvOperator.

def some_python_function():
    import pandas as pd

    # do something with Pandas

    return "some value"

The returned value is available in the Airflow XCOM, and we can reference it in the subsequent tasks.

There is one issue concerning returned values (and input parameters). If the Python version used in the Virtualenv environment differs from the Python version used by Airflow, we cannot pass parameters and return values. In this case, we can use only the string_args parameter.

Get Weekly AI Implementation Insights

Join engineering leaders who receive my analysis of common AI production failures and how to prevent them. No fluff, just actionable techniques.

Use PythonVirtualenvOperator

Now, I can configure the Airflow operator. I pass the required libraries as the requirements parameter. It supports the same syntax as the requirements.txt file, so I can also define a version:

virtualenv_task = PythonVirtualenvOperator(
    task_id="virtualenv_pandas",
    python_callable=some_python_function,
    requirements=["pandas"],
    system_site_packages=False,
    dag=dag,
)

Get Weekly AI Implementation Insights

Join engineering leaders who receive my analysis of common AI production failures and how to prevent them. No fluff, just actionable techniques.

Older post

Remove a directory from S3 using Airflow S3Hook

How to remove files with a common prefix from S3

Newer post

How to add an EMR step in Airflow and wait until it finishes running

How to use AwsHook and EmrStepSensor to add an EMR step and wait until it finishes running

Engineering leaders: Is your AI failing in production? Take the 10-minute assessment
>