How to use one SparkSession to run all Pytest tests

When we test a PySpark application, we run into a problem of passing the SparkSession into the tests. Of course, we can instantiate a separate session in every test function, but that is going to slow down the tests significantly. Such a solution may be acceptable when we have only one or two PySpark tests inside a larger application, but what if we want to run a few hundreds of tests and every one of them uses PySpark?

In this situation, we should instantiate the SparkSession once and pass it to every test as a parameter. In PyTest, we can do it using the fixtures. The fixtures are supposed to configure the test environment and clean up after the tests.

To configure a fixture, we must create a new file in the tests directory and implement a function that returns the value of that fixture. The function must be decorated using the pytest.fixture decorator. Inside the function, we can also define a finalizer that is supposed to release the resources allocated by the fixture.

To reuse the same SparkSession in all of the tests, we must specify the scope of the fixture and set its value to “session”.

The following example demonstrates the complete way of defining a SparkSession fixture. Note that the name of the function is going to be used as the fixture name.

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session(request):
    spark_session = SparkSession.builder \
        .master("local[*]") \
        .appName("some-app-name") \
        .getOrCreate()

    request.addfinalizer(lambda: spark_session.sparkContext.stop())

    return spark_session

In the tests, we must declare which fixture we want to use inside the test file. The function that creates a SparkSession is called spark_session, so we use the same name to declare the fixture.

pytestmark = pytest.mark.usefixtures("spark_session")

Now, we can add the spark_session parameter to every test function that needs a SparkSession.

def test_name(spark_session):
    ...
Older post

How to send AWS CloudWatch Alerts to a Slack channel using Terraform

How to use Terraform to configure a CloudWatch alert and send the message to a Slack channel.

Newer post

How to send a customized Slack notification when an Airflow task fails

How to customize a Slack notification before sending it to the Slack incoming webhook.

Are you looking for an experienced AI consultant? Do you need assistance with your RAG or Agentic Workflow?
Book a Quick Consultation, send me a message on LinkedIn. Book a Quick Consultation or send me a message on LinkedIn

>