A year ago, I wrote a [blog post](https://www.mikulskibartosz.name/measuring-data-quality-using-aws-deequ/) about using AWS Deequ to validate a dataset at the beginning of a data pipeline. I had to write the code in Scala because it was the only supported language at the time. When I was running it in production, I had to rewrite our custom EMR-controlling script to support a Scala job. Now, I wouldn’t have to do it because Deequ is available in Python.
Let’s take a look at the Python version of the library.
Importing Deequ
First, we have to import the libraries and create a Spark session. Note that we pass the Deequ Maven coordinates to Spark because the Python version is a wrapper around the Scala library.
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
import pydeequ
from pydeequ.analyzers import *
from pydeequ.checks import *
from pydeequ.verification import *
spark = (SparkSession
.builder
.config("spark.jars.packages", pydeequ.deequ_maven_coord)
.config("spark.jars.excludes", pydeequ.f2j_maven_coord)
.getOrCreate())
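One detail that is easy to miss: recent pydeequ releases pick the matching Deequ Maven artifact based on a `SPARK_VERSION` environment variable, so it may need to be set before `pydeequ` is imported. A minimal sketch (the version `"3.3"` below is just an example; use the Spark version you actually run):

```python
import os

# pydeequ reads SPARK_VERSION to choose the Deequ artifact that matches
# your Spark installation; set it before `import pydeequ`.
os.environ["SPARK_VERSION"] = "3.3"
```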
Using the analyzer
Before we start writing validation rules, we can run analyzers to figure out what may be wrong with the dataset. To do it, we first have to load the file into a Spark DataFrame:
schema = StructType([ \
StructField("order_id", IntegerType(), True), \
StructField("product_name", StringType(), True), \
StructField("pieces_sold", IntegerType(), False), \
StructField("price_per_item", DoubleType(), True), \
StructField("order_date", DateType(), True), \
StructField("shop_id", StringType(), False) \
])
df = spark.read.csv("MOCK_DATA.csv", header=True, schema=schema)
In the next step, we specify the analyses, run them, and display the results:
analysisResult = AnalysisRunner(spark) \
.onData(df) \
.addAnalyzer(Size()) \
.addAnalyzer(Completeness("product_name")) \
.addAnalyzer(Completeness("pieces_sold")) \
.run()
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
In the console, we are going to see something like this:
+-------+------------+------------+------+
| entity| instance| name| value|
+-------+------------+------------+------+
|Dataset| *| Size|1000.0|
| Column|product_name|Completeness| 1.0|
| Column| pieces_sold|Completeness| 0.958|
+-------+------------+------------+------+
We have 1000 data rows in the file, and all of them have a not-null product_name. However, 4.2% of pieces_sold values are missing.
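As a quick sanity check, the Completeness metric translates directly into a count of missing rows (the numbers below come from the analyzer output above):

```python
# Convert the Completeness metric into an absolute number of missing rows.
total_rows = 1000        # the Size metric
completeness = 0.958     # Completeness of pieces_sold

missing_rows = round(total_rows * (1 - completeness))
print(missing_rows)  # 42 rows have a null pieces_sold
```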
Running the validation
Analyzing the dataset is handy, but we use Deequ primarily to validate the data. Let’s do it now.
In the code below, I verify the following conditions:
- there are at least 3 data rows in the dataset
- the minimum value of price_per_item is zero
- pieces_sold has no null values
- product_name contains unique values
- shop_id has one of the three values: shop_1, shop_2, shop_3
- price_per_item has no negative values (which overlaps with the second rule, but I wanted to show it anyway)
check = Check(spark, CheckLevel.Warning, "Review Check")
checkResult = VerificationSuite(spark) \
.onData(df) \
.addCheck(
check.hasSize(lambda x: x >= 3) \
.hasMin("price_per_item", lambda x: x == 0) \
.isComplete("pieces_sold") \
.isUnique("product_name") \
.isContainedIn("shop_id", ["shop_1", "shop_2", "shop_3"]) \
.isNonNegative("price_per_item")) \
.run()
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()
In the case of my test data, the checks produce the following results:
+------------+-----------+------------+--------------------+-----------------+--------------------+
| check|check_level|check_status| constraint|constraint_status| constraint_message|
+------------+-----------+------------+--------------------+-----------------+--------------------+
|Review Check| Warning| Warning|SizeConstraint(Si...| Success| |
|Review Check| Warning| Warning|MinimumConstraint...| Failure|Value: 6.57 does ...|
|Review Check| Warning| Warning|CompletenessConst...| Failure|Value: 0.958 does...|
|Review Check| Warning| Warning|UniquenessConstra...| Failure|Value: 0.673 does...|
|Review Check| Warning| Warning|ComplianceConstra...| Failure|Value: 0.754 does...|
|Review Check| Warning| Warning|ComplianceConstra...| Success| |
+------------+-----------+------------+--------------------+-----------------+--------------------+
We see which checks failed and, in the constraint_message column, the metric value (for example, the fraction of compliant rows) that violated each constraint.
What can we do with invalid values?
In one of my projects, we decided that an invalid value detected by Deequ was not a big deal, but we wanted to know about it. Because of that, we did not stop the pipeline; instead, a message was sent to a Slack channel. This didn’t work as intended: we quickly started ignoring the messages. That’s why I now suggest running such checks only for the most critical data and stopping processing (with an error) if Deequ detects a problem.
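The fail-fast approach can be sketched as a small gate function. In a real job, the `results` list would come from `checkResult_df.collect()`; here, plain dicts stand in for the collected rows so the logic is easy to follow in isolation (the function name and sample data are illustrative, not part of pydeequ):

```python
def fail_on_deequ_errors(results):
    """Raise an error if any Deequ constraint did not succeed."""
    failures = [r for r in results if r["constraint_status"] != "Success"]
    if failures:
        details = "; ".join(
            f"{r['constraint']}: {r['constraint_message']}" for r in failures
        )
        raise RuntimeError(f"Data quality check failed: {details}")

# Sample rows mimicking the verification output shown above.
results = [
    {"constraint": "SizeConstraint", "constraint_status": "Success",
     "constraint_message": ""},
    {"constraint": "CompletenessConstraint", "constraint_status": "Failure",
     "constraint_message": "Value: 0.958 does not meet the requirement"},
]

try:
    fail_on_deequ_errors(results)
except RuntimeError as err:
    print(err)  # the pipeline would stop here instead
```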