Automate all your PySpark Unit Tests with Hypothesis!
Unit testing is often regarded as a main pillar of testing your software applications: it usually involves
testing a single unit or component and making sure it covers all the edge cases the developer can think of.
The functional programming world, in particular Haskell with QuickCheck and Scala with ScalaCheck, introduced the
idea of testing against property specifications with automatically generated test data, as a complement to
the traditional unit testing approach. The idea is that we define a property that specifies the program's behaviour
and let the library generate the input data for us, covering far more scenarios than hand-written examples, while
shrinking reduces any failing input to a minimal case that is easy to verify.
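To make the idea concrete before we move to Spark, here is a minimal, self-contained Hypothesis example (not taken from the pipeline below) that checks the classic property that reversing a list twice yields the original list:
import hypothesis.strategies as st
from hypothesis import given

@given(st.lists(st.integers()))
def test_reverse_twice_is_identity(xs):
    # Property: reversing a list twice must give back the original list.
    assert list(reversed(list(reversed(xs)))) == xs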
PySpark property-based testing
Testing your PySpark notebooks can be quite challenging by itself: data infrastructure often has limited support
for pytest, and adding such tests to your existing data pipelines introduces extra complexity.
Additionally, testing your ETL transformation functions can be quite cumbersome, as it involves generating test data
and maintaining it, or simply skipping unit tests in favour of basic integrity checks. In this blog post we will see
how we can easily generate data from dataclasses and think of unit testing in terms of properties rather than data
integrity. We will start with Hypothesis, an excellent Python library built for property-based testing.
We can begin by specifying some simple dataclasses we will be using in order to showcase the potential of the library:
from datetime import datetime, timedelta
from typing import List, NamedTuple
from tinsel import struct, transform
@struct
class OrderItem(NamedTuple):
    item_id: str
    item_value: float


@struct
class Order(NamedTuple):
    order_id: str
    created_at: datetime
    updated_at: datetime
    customer_id: str
    order_items: List[OrderItem]
Our transaction dataset is composed of orders, each with a set of order items. The classes are defined
as NamedTuples because PySpark has limited support for dataclasses. We will be working with a small companion library
called Tinsel, which lets us convert our NamedTuple definitions into a Spark DataFrame schema.
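Because the unit test further down builds its DataFrame from a schema object, we can derive that Spark schema from the Order class right away with Tinsel's transform function (imported above); a short sketch:
# Convert the annotated Order NamedTuple into a Spark StructType schema.
schema = transform(Order)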
At this point we can leverage Hypothesis to define the generation strategies we will be using for our dataclasses:
import hypothesis.strategies as st
from hypothesis import given, settings, HealthCheck
# Timestamp bounds for generated orders: the previous calendar week,
# from its Monday (time_window) up to its Sunday (week_sunday).
week_sunday = datetime.now() - timedelta(days=datetime.now().weekday() + 1)
time_window = datetime.now() - timedelta(days=datetime.now().weekday() + 7)
def order_strategy():
    # Generates a list of at least five orders, each with at least one order item.
    return st.lists(st.builds(Order,
        order_id=st.text(min_size=5),
        created_at=st.datetimes(min_value=time_window, max_value=week_sunday),
        updated_at=st.datetimes(min_value=time_window, max_value=week_sunday),
        customer_id=st.text(min_size=5),
        order_items=st.lists(st.builds(OrderItem,
            item_id=st.text(min_size=5),
            item_value=st.floats(min_value=0, max_value=100)), min_size=1)), min_size=5)
In the code snippet above I have specified lower and upper bounds for the datetimes in my test dataset
(the previous calendar week), and similar constraints for the strings, floats and lists. For additional custom
generator strategies, consult the excellent Hypothesis documentation.
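To get a feel for the generated data you can draw a sample directly from the strategy; Hypothesis's example() method is meant exactly for this kind of interactive exploration (not for use inside tests):
# Draw one randomly generated dataset from the strategy and inspect it.
sample_orders = order_strategy().example()
print(len(sample_orders), sample_orders[0])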
Now that we have our custom generators, we can define an example transformation function that we would like to test:
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
def test_dataframe_transformations(df: DataFrame) -> DataFrame:
    # Explode the order items and derive the day and ISO week of each order.
    temp_df = df.select('*', F.explode_outer('order_items'))\
        .select(F.col('created_at'), F.col('col.item_value').alias('item_value'))\
        .withColumn('day', F.to_date('created_at'))\
        .withColumn('week', F.weekofyear('created_at'))
    # Total revenue per week...
    weekly_revenue_df = temp_df.groupBy('week').agg(
        F.sum('item_value').alias('weekly_revenue')
    )
    # ...joined onto the total revenue per day.
    return temp_df.groupBy('week', 'day').agg(
        F.sum('item_value').alias('daily_revenue')
    ).join(weekly_revenue_df, on='week', how='inner')
The property I am interested in testing in the code snippet above is the revenue itself: logically, the daily revenue
should always be less than or equal to the weekly revenue, unless I have an undetected bug in my code.
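As a toy illustration of that invariant in plain Python (no Spark involved), with two hypothetical daily revenues inside a single week:
daily = {'monday': 10.0, 'tuesday': 5.0}   # hypothetical daily revenues within one week
weekly = sum(daily.values())               # 15.0
assert all(day_revenue <= weekly for day_revenue in daily.values())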
Now, in order to use PySpark in your unit tests, you need to set up a global SparkSession:
import unittest
from pyspark.sql import SparkSession
class PySparkTestCase(unittest.TestCase):
    """Set-up of a global test SparkSession"""

    @classmethod
    def setUpClass(cls):
        cls.spark = (SparkSession
                     .builder
                     .master("local[1]")
                     .appName("PySpark unit test")
                     .getOrCreate())

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()
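As a small optional tweak that is not part of the original setup, local test runs are usually noticeably faster if you lower the number of shuffle partitions, since Spark's default of 200 is overkill for the tiny generated datasets:
cls.spark = (SparkSession
             .builder
             .master("local[1]")
             .appName("PySpark unit test")
             .config("spark.sql.shuffle.partitions", "1")  # fewer partitions => faster local aggregations
             .getOrCreate())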
We can also set up different profiles for our Hypothesis runs, for example to control the number of examples or the per-example deadline:
settings.register_profile(
    "my_profile",
    max_examples=50,
    deadline=60 * 1000,  # Allow 1 min per example (deadline is specified in milliseconds)
    suppress_health_check=(HealthCheck.too_slow, HealthCheck.data_too_large),
)
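Note that registering a profile does not apply it by itself: it either has to be loaded globally or passed to the @settings decorator, as done in the test below. A common pattern for the global variant, with an environment variable name of our own choosing, looks like this:
import os

# Activate the registered profile; the HYPOTHESIS_PROFILE variable lets CI select another one.
settings.load_profile(os.getenv("HYPOTHESIS_PROFILE", "my_profile"))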
Using our generators, settings and SparkSession we can now write our first unit test:
class SimpleTestCase(PySparkTestCase):

    def test_spark(self):
        @given(data=order_strategy())
        @settings(settings.get_profile("my_profile"))
        def test_samples(data):
            # Build a Spark DataFrame from the generated orders using the Tinsel-derived schema.
            df = self.spark.createDataFrame(data=data, schema=schema, verifySchema=False)
            collected_list = test_dataframe_transformations(df).collect()
            # The property under test: no day can exceed the revenue of its week.
            for row in collected_list:
                assert row['weekly_revenue'] >= row['daily_revenue']

        test_samples()
This snippet feeds each generated dataset to the transformation function as a Spark DataFrame, collects
the results and checks the property we were after all along.
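Assuming everything above lives in a single test module, a standard unittest entry point at the bottom of the file lets you execute the suite directly (you can equally run it through pytest):
if __name__ == '__main__':
    unittest.main()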
In the next blog post we will tackle the orchestration of these unit tests and how to add them to your Databricks
CI/CD pipeline using Microsoft's Nutter. Stay tuned!
The code is available on GitHub.
References
Hypothesis Docs
Tinsel PySpark schema converter
Property Based Testing with QuickCheck