Building Framework on top of Great Expectations
Data quality has mostly been overlooked by teams and companies; they miss how bad data can lead to bad decisions and how important it is to validate data as it moves through the pipeline. The great news is that, with the recent rise of the term "Data Contracts", data validation and quality have gained a lot of attention, pushing teams to adopt the right approach to solve data quality and consistency issues. There are numerous tools now that solve this problem, from open source libraries to SaaS products.
It is very important, and best practice, to have at least some level of validation. Data quality can be achieved in many ways:
For basic one-off checks, writing checks from scratch is fine.
For advanced data quality, leveraging open source is great.
For advanced data quality with custom features and reusability, building a framework/abstraction is ideal.
In this article, I will share how to leverage an open source data quality library to build a robust framework for PySpark based data pipelines, making sure bad data is not ingested silently, where it could impact decision making. You can take inspiration from it to build one for your specific use case on top of any OSS.
Great Expectations [GE]
Great Expectations has been around for many years; it offers an open source data quality library as well as a commercial cloud product built on top of the OSS. The open source library also supports PySpark directly, which is what I will use to build a custom, easy to use framework.
Some features of the Great Expectations PySpark package (a minimal direct-usage sketch follows the list):
Great Expectations supports Spark 3.0+
Great Expectations has built-in metrics and validations which return useful metadata
Great Expectations can be linked up with static HTML pages [not part of this framework]
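For context, a minimal sketch of using GE's Spark support directly might look like the following. The GE API varies considerably between versions; this sketch assumes the older SparkDFDataset interface, so treat the exact import path and calls as version dependent rather than definitive.
from pyspark.sql import SparkSession
from great_expectations.dataset import SparkDFDataset  # older GE API; newer versions expose a different entry point

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("yes", 1), ("no", 2), ("maybe", 3)], ["decision", "id"])

# Wrap the Spark DataFrame so the expectation methods become available on it
ge_df = SparkDFDataset(df)
result = ge_df.expect_column_values_to_be_in_set("decision", ["yes", "no"])
print(result.success)  # False for this sample, with useful metadata in result.result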
On top of GE, I built custom functionality that brings many benefits through abstraction, along with additional features:
Ability to generate a full set of validations and metrics as a DataFrame which can be pushed to a data warehouse as a table to perform historical analysis
Ability to fail pipelines when one of the data validations has failed
Ability to add optional metadata columns, e.g. run_id from the orchestrator
Ability to add optional pipeline arguments along with metadata, e.g. batch_date
Ability to derive validations through configurations (Dict, YAML, JSON)
Ability to generate a default set of metrics
Ability to let users implement metrics and validations using the lower level API
If you are looking for anything from the above list, then this could be the right fit for you.
Why GE?
The above items might be available through other libraries I am unaware of, e.g. PyDeequ, which I looked into first; however, at the time it did not support Spark 3.0. With PyDeequ data profiling, you could already get a few of the above benefits for free. Beyond that, I have not seen many Spark based libraries that are flexible enough to be used in a similar fashion.
Standardization
GE already provides a simple to use library that returns a lot of information out of the box; however, in order to support the additional use cases mentioned earlier, I added another layer of abstraction to have more control and keep things simple for end users.
Metrics
Each metric returns a custom MetricsSuiteResult, which is standardized through a decorator into a Row. These rows, together with the optional metadata, form a DataFrame that is returned to the caller. This enables users to push it to a data warehouse and run analysis on top of it.
For example: comparing the row counts of the last ten pipeline runs.
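As a concrete illustration, a minimal sketch of persisting that DataFrame and running the row-count comparison could look like the following. It assumes spark is an active SparkSession, metrics_df is the DataFrame returned by run(), and the table and column names (dq.pipeline_metrics, metric_name, value, dag_run_id, created_at) are placeholders rather than the package's actual output schema.
# Append the metrics DataFrame returned by run() to a warehouse table
# (table and column names below are illustrative placeholders)
metrics_df.write.mode("append").saveAsTable("dq.pipeline_metrics")

# Compare the row counts of the last ten runs of this pipeline
spark.sql("""
    SELECT dag_run_id, value AS row_count
    FROM dq.pipeline_metrics
    WHERE metric_name = 'row_count'  -- whichever name the row-count metric has in your output
    ORDER BY created_at DESC
    LIMIT 10
""").show()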
Validations
Each validation returns a GE object, ExpectationSuiteValidationResult, which is standardized through a decorator into a Row. These rows, together with the optional metadata, form a DataFrame that is returned to the caller. This enables users to push it to a data warehouse and run analysis on top of it.
For example: finding how many times a pipeline failed in the last month due to validation X.
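A similar query works for the validations table; again, the table and column names (dq.pipeline_validations, expectation_type, success, created_at) are placeholders to adapt to the actual output schema.
# Count how many times a specific validation failed in the last month
spark.sql("""
    SELECT COUNT(*) AS failures
    FROM dq.pipeline_validations
    WHERE expectation_type = 'expect_column_values_to_not_be_null'
      AND success = false
      AND created_at >= date_sub(current_date(), 30)
""").show()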
Currently, the framework supports the standard metrics and validations, and it can be extended easily.
Config Driven Validations
On top of standardization, I built a config driven validation application.
A config is expected to be a Dict; however, upstream it can ideally be sourced from YAML or JSON, which keeps it decoupled from the application code and makes things easier to change, especially when validations (Data Contracts) are driven by a non technical team. A sketch of parsing such a file into the expected Dict follows the sample below.
A sample YAML config:
validations:
  decision:
    - expect_column_to_exist
    - expect_column_values_to_be_of_type: "StringType"
    - expect_column_values_to_be_in_set: ["yes", "no"]
    - expect_column_value_to_exist: ["random"]
  id:
    - expect_column_values_to_not_be_null
  random_column:
    - expect_column_to_exist
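Since the application expects a Dict and config parsing is left to the caller, here is a minimal sketch of loading such a file, assuming it is saved as validations.yaml and PyYAML is available:
import yaml  # PyYAML

# Parse the YAML file into the Dict structure shown above
with open("validations.yaml") as f:
    config = yaml.safe_load(f)

print(config["validations"]["decision"])  # the expectations configured for the "decision" column
The resulting Dict can then be passed as the config argument shown in the Usage section below.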
Default Metrics
Similar to the config driven app, having default metrics is very useful: users don't always know what to calculate, so the simplest approach is to just generate the defaults.
This can be computationally expensive, as you may end up generating metrics that are useless; in that case, using the lower level API is better [discussed later].
Default metrics run all the basic metrics depending on the column type, e.g.
For Numeric columns:
- get_column_max
- get_column_median
- ...
For Non Numeric columns:
- get_column_unique_count
- get_column_values_count
- ...
Usage
The package can be used in a few ways, depending on the use case.
Downloading from PyPI:
pip install data-kalite
Clone and integrate it into your codebase
Clone, integrate and build your own Python package:
pip install . (.egg) or python setup.py bdist_wheel (.whl)
Lower Level API
Accessing the functions directly by implementing the metrics or validations function.
from typing import List
from kalite.data_classes.metrics_data import MetricsData
from kalite.functions.metrics import GeMetrics, Metrics
class TempM(Metrics):
    def metrics(self) -> List[MetricsData]:
        return GeMetrics(self.source_data).\
            get_column_unique_count("col_1").\
            get_column_max("col_2").\
            result()
source_data = "<set_dataframe>"
metadata = {
    "dag_id": "<set>",
    "dag_task_id": "<set>",
    "dag_run_id": "<set>",
    "pipeline_args": {"<set>": "<set>"},  # OPTIONAL
}
# run() returns a DataFrame
metrics = TempM(source_data=source_data, metadata=metadata).run()
metrics.show()
Similarly for validations, example in the README.md.
Config
Creating a config (as shown in the previous section) and using the Config class to run validations.
from kalite.application.config_driven_validations import ConfigDrivenValidations
source_data = "<set_dataframe>"
metadata = {
    "dag_id": "<set>",
    "dag_task_id": "<set>",
    "dag_run_id": "<set>",
    "pipeline_args": {"<set>": "<set>"},  # OPTIONAL
}
config = "<set_dict>"  # Set the dict directly or source it from YAML/JSON
# run() returns a DataFrame
validations = ConfigDrivenValidations(source_data=source_data, metadata=metadata, config=config).run()
validations.show()
Default Metrics
Default Metrics runs all the standard metrics depending on the column type. This can be compute heavy, so you may want to consider using the first option instead.
from kalite.application.default_metrics import DefaultMetrics
source_data = "<set_dataframe>"
metadata = {
    "dag_id": "<set>",
    "dag_task_id": "<set>",
    "dag_run_id": "<set>",
    "pipeline_args": {"<set>": "<set>"},  # OPTIONAL
}
# run() returns a DataFrame
metrics = DefaultMetrics(source_data=source_data, metadata=metadata).run()
metrics.show()
Validator
Using the Validator class to fail the pipeline.
from kalite.functions.validator import Validator
# dataframe that was generated through validations
validation_data = "<set_dataframe>"
Validator.validate(validation_data)
The best way to understand usage is through the unit tests.
For detailed examples and samples of the Metrics and Validations output tables, please check here.
Use case
I have been using Data-Kalite for more than a year now in PySpark based Airflow DAGs. Below is a high level architecture of a very simple, single task DAG.
In the Spark app, the flow is as follows (a rough sketch follows this list):
Loads data from S3
Generates and writes metrics to S3
Generates and writes validations to S3
Performs validation
If validation fails:
Throws an exception, kills the process, fails the DAG and alerts
If validation passes:
Writes the data to S3
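Roughly, the Spark application for that single task might look like the sketch below. It reuses the classes from the Usage section; the S3 paths, config and metadata values are placeholders, and error handling is intentionally left out since the exception raised by Validator is what fails the DAG.
from pyspark.sql import SparkSession
from kalite.application.config_driven_validations import ConfigDrivenValidations
from kalite.application.default_metrics import DefaultMetrics
from kalite.functions.validator import Validator

spark = SparkSession.builder.getOrCreate()

# 1. Load data from S3 (path is a placeholder)
source_data = spark.read.parquet("s3://<bucket>/input/")

metadata = {
    "dag_id": "<set>",
    "dag_task_id": "<set>",
    "dag_run_id": "<set>",
}
config = "<set_dict>"  # validations config sourced from YAML/JSON

# 2. Generate and write metrics to S3
metrics = DefaultMetrics(source_data=source_data, metadata=metadata).run()
metrics.write.mode("append").parquet("s3://<bucket>/dq/metrics/")

# 3. Generate and write validations to S3
validations = ConfigDrivenValidations(source_data=source_data, metadata=metadata, config=config).run()
validations.write.mode("append").parquet("s3://<bucket>/dq/validations/")

# 4. Validate: throws an exception (failing the DAG) if any validation failed
Validator.validate(validations)

# 5. Only reached when all validations pass: write the data to S3
source_data.write.mode("overwrite").parquet("s3://<bucket>/output/")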
Contribution
The package is good enough to be part of a production pipeline; however, it still needs a lot of work. If you are interested, please read the basic guide to learn how to contribute or make changes to your own version of Data-Kalite. I am also open to new suggestions and feedback.
Adding a new metric or validation is super easy: just follow one of the existing examples, add a unit test, and that's pretty much it. This package is supposed to be lightweight.
Repository: Data-Kalite
Future
There is a lot of room for improvement; depending on need, I might add the following features in the future.
Adding new metrics and validations
Adding multi column metrics and validations
Improving the default metrics
Ability to build a plan then execute metrics and validations
Improving the metadata input
Deriving metrics from config (similar to validations)
Providing external config parsing functionality
I would love to hear feedback and suggestions for any missing functionality that could be helpful.