Integration Testing for your Databricks CI/CD Data Pipelines with Microsoft Nutter

In this blog post we continue our journey of testing data pipelines. If you haven't read the first post, make sure you do. As discussed there, writing unit tests in PySpark can be challenging, and setting up the testing infrastructure gets harder still once we move to the world of notebooks. Data scientists and data engineers in your organization will most often prefer to develop their data pipelines in a dedicated environment such as Databricks or JupyterLab. To enable cross-team collaboration and peer review, and to enforce good testing practices, the notebooks have to be version controlled. In Databricks, for example, the preferred way to work on notebooks is to use Databricks Repos.

However, because Databricks doesn't have support for pytest, you cannot simply write a separate notebook for the tests. Instead, the data engineer writes pytest tests in an IDE as part of the notebook pull request, and the CI/CD pipeline executes them in a mocked Spark environment. This ensures that our code is deterministic and our functions are correct, but it does not necessarily translate to correct behavior in a production-like environment.

Microsoft Nutter - Testing Framework for Databricks

To overcome the limitations described above, we can use Nutter, a popular testing framework open-sourced by Microsoft. The library has two main components: the runner and the CLI. In this blog post we explore only the runner, leaving the CLI as an exercise for the reader. The runner is a module that you can use in your notebook once the library is installed. In Databricks, the following command installs nutter together with the hypothesis and tinsel libraries, which we use to generate our datasets:

%pip install -U nutter hypothesis tinsel

As an example, we will test the same function as in the first post of this series, defined in a separate notebook:

from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def test_dataframe_transformations(df: DataFrame) -> DataFrame:
    temp_df = df.select('*', F.explode_outer('order_items'))\
        .select(F.col('created_at'), F.col('col.item_value').alias('item_value'))\
        .withColumn('day', F.to_date('created_at'))\
        .withColumn('week', F.weekofyear('created_at'))

    weekly_revenue_df = temp_df.groupBy('week').agg(
        F.sum('item_value').alias('weekly_revenue')
    )

    return temp_df.groupBy('week', 'day').agg(
        F.sum('item_value').alias('daily_revenue')
    ).join(weekly_revenue_df, on='week', how='inner')
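To make the expected output of this function concrete, the same aggregation can be sketched in plain Python on a tiny hand-written dataset. This is only an illustration and not a replacement for the Spark code: the sample orders are hypothetical, and the ISO week number from `isocalendar()` is assumed to match what `F.weekofyear` returns.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical sample orders: (created_at, [item values])
orders = [
    (datetime(2024, 1, 1, 9, 30), [10.0, 5.0]),
    (datetime(2024, 1, 1, 17, 0), [2.5]),
    (datetime(2024, 1, 3, 12, 0), [20.0]),
]

daily_revenue = defaultdict(float)   # keyed by (week, day)
weekly_revenue = defaultdict(float)  # keyed by week

for created_at, item_values in orders:
    week = created_at.isocalendar()[1]  # ISO week number
    day = created_at.date()
    for value in item_values:  # one entry per exploded order item
        daily_revenue[(week, day)] += value
        weekly_revenue[week] += value

# Equivalent of the inner join on 'week': one row per (week, day)
rows = [
    {
        "week": week,
        "day": day,
        "daily_revenue": revenue,
        "weekly_revenue": weekly_revenue[week],
    }
    for (week, day), revenue in sorted(daily_revenue.items())
]

for row in rows:
    print(row)
```

Each output row carries both the revenue of its day and the revenue of the whole week it belongs to, which is why a weekly value smaller than a daily value would indicate a bug in the transformation.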

Now, in our testing notebook, we can import the needed functions with the following Databricks command:

%run /{notebook_to_test_path}

To set up our test datasets, we will use the same setup as in the previous blog post:

from datetime import datetime, timedelta
from typing import List, NamedTuple

import hypothesis.strategies as st
from hypothesis import given, settings, HealthCheck
from tinsel import struct, transform

settings.register_profile(
    "my_profile",
    max_examples=50,
    deadline=60 * 1000,  # Allow 1 min per example (deadline is specified in milliseconds)
    suppress_health_check=(HealthCheck.too_slow, HealthCheck.data_too_large),
)

@struct
class OrderItem(NamedTuple):
    item_id: str
    item_value: float

@struct
class Order(NamedTuple):
    order_id: str
    created_at: datetime
    updated_at: datetime
    customer_id: str
    order_items: List[OrderItem]

# Bounds of the time window from which order timestamps are drawn
week_sunday = (datetime.now() - timedelta(days=datetime.now().weekday() + 1))
time_window = (datetime.now() - timedelta(days=datetime.now().weekday() + 7))

def order_strategy():
    # Lists of at least 5 orders, each with at least 1 order item
    return st.lists(
        st.builds(
            Order,
            order_id=st.text(min_size=5),
            created_at=st.datetimes(min_value=time_window, max_value=week_sunday),
            updated_at=st.datetimes(min_value=time_window, max_value=week_sunday),
            customer_id=st.text(min_size=5),
            order_items=st.lists(
                st.builds(
                    OrderItem,
                    item_id=st.text(min_size=5),
                    item_value=st.floats(min_value=0, max_value=100),
                ),
                min_size=1,
            ),
        ),
        min_size=5,
    )
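It can help to see what the two date bounds evaluate to. The sketch below repeats the same arithmetic for a fixed reference date instead of `datetime.now()`; the helper name `previous_week_window` and the reference date are hypothetical and only serve the illustration.

```python
from datetime import datetime, timedelta

def previous_week_window(now: datetime):
    """Return (time_window, week_sunday) using the same arithmetic as the strategy setup."""
    week_sunday = now - timedelta(days=now.weekday() + 1)
    time_window = now - timedelta(days=now.weekday() + 7)
    return time_window, week_sunday

# Hypothetical reference date: Wednesday, 10 January 2024, at noon
start, end = previous_week_window(datetime(2024, 1, 10, 12, 0))

print(start, start.weekday())  # weekday() is 0 for Monday
print(end, end.weekday())      # weekday() is 6 for Sunday
print(start.isocalendar()[1] == end.isocalendar()[1])  # same ISO week?
```

For this reference date the window runs from the Monday to the Sunday of the previous week, so both bounds fall in the same ISO week.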

We are now ready to create our first tests with Microsoft Nutter. The framework relies on the following naming conventions for the methods of a test fixture:

before_(testname) (optional) - if provided, runs before the 'run_' method. This method can be used to set up any test preconditions

run_(testname) (optional) - if provided, runs after 'before_' if it was provided, otherwise runs first. This method is typically used to run the notebook under test

assertion_(testname) (required) - runs after 'run_', if it was provided. This method typically contains the test assertions

after_(testname) (optional) - if provided, runs after 'assertion_'. This method is typically used to clean up any test data used by the test
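The ordering described above can be sketched with a few lines of plain Python. This is not how Nutter is implemented: `LifecycleSketch` and `RevenueChecks` are hypothetical classes that only mimic the naming convention, so that the order of the calls is visible without a Databricks cluster.

```python
class LifecycleSketch:
    """Hypothetical stand-in that mimics the method naming convention."""

    PHASES = ("before_", "run_", "assertion_", "after_")

    def execute(self):
        # A test exists for every assertion_<name> method on the fixture
        names = sorted(
            attr[len("assertion_"):]
            for attr in dir(self)
            if attr.startswith("assertion_")
        )
        calls = []
        for name in names:
            for phase in self.PHASES:
                method = getattr(self, phase + name, None)
                if callable(method):  # optional phases are simply skipped
                    method()
                    calls.append(phase + name)
        return calls


class RevenueChecks(LifecycleSketch):
    def before_totals(self):
        self.values = [10.0, 5.0]  # set up test data

    def assertion_totals(self):
        assert sum(self.values) == 15.0

    def after_totals(self):
        self.values = None  # clean up


calls = RevenueChecks().execute()
print(calls)  # ['before_totals', 'assertion_totals', 'after_totals']
```

Since no `run_totals` method is defined, that phase is skipped, which mirrors the optional nature of every method except the assertion.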

When we want to test full notebooks, we can, for example, set up a static dataframe from a file in the before method, run the ETL transformations in the run method, assert that the resulting Delta table matches our expectations, and use the after method to clean up all the test resources we might have spun up. In this blog post, however, we mainly rely on the assertion method, as we are generating the data on the fly. Our test fixture then looks like this:

from runtime.nutterfixture import NutterFixture, tag

default_timeout = 600

class Test1Fixture(NutterFixture):
    def __init__(self):
        NutterFixture.__init__(self)

    # Property-based check: hypothesis generates the orders for every example
    @given(data=order_strategy())
    @settings(settings.get_profile("my_profile"))
    def test_samples(self, data):
        df = spark.createDataFrame(data=data, schema=transform(Order), verifySchema=False)
        collected_list = test_dataframe_transformations(df).collect()
        for row in collected_list:
            assert row['weekly_revenue'] >= row['daily_revenue']

    def assertion_test_transform_data(self):
        self.test_samples()

    def after_test_transform_data(self):
        print('done')

# Execute the fixture and print the results in the notebook output
result = Test1Fixture().execute_tests()
print(result.to_string())

We can now run our test notebook, and the results will look like this:

Notebook exited:
Notebook: N/A - Lifecycle State: N/A, Result: N/A
Run Page URL: N/A

PASSING TESTS

test_transform_data (31.414987 seconds)

This notebook can be added to your CI/CD pipeline, and we will explore how in the next blog post. With Nutter we can write and run unit, integration and end-to-end tests, and export the results as CSV or JUnit. Happy coding!

References

Microsoft Nutter
