Skip to content

Python Version Build Status License Coverage Downloads PyPI Version Version

Logo Sumeh DQ

Unified Data Quality Validation Framework

One API. Fifty-plus rules. Fourteen engines. Zero compromise.

Documentation · PyPI · Changelog


Why Sumeh?

Every data quality framework eventually forces you to make a choice:

  • Great Expectations gives you a rich ecosystem, but with suites, checkpoints, and a data context to manage — it's a platform, not a library.
  • Soda pushes you toward SodaCL and SodaCloud; the open-source layer is progressively thinner.
  • pandera is excellent for schema and type validation, but stops there — no aggregation rules, no SQL engines, no row tagging.
  • cuallee introduced the clean API that inspired Sumeh, but covers fewer engines and has no SQL generation, no profiler, and no OpenMetadata integration.

Sumeh's position is different: it is a library, not a platform. It does one thing — run validation rules against data — and it does it across every major engine with the same API and the same return type.

Sumeh Great Expectations Soda Core pandera cuallee
Unified API across engines
Engines 14 ~8 ~6 2 ~7
SQL AST generation (no string concat) N/A
Row-level bifurcation
Open SQL mode (generate without executing) N/A
Data profiler built-in
OpenMetadata export (no SDK)
PySpark without .collect() N/A
Zero mandatory config files
pip install sumeh and go

Comparison reflects public documentation as of mid-2025. Corrections welcome.


What is Sumeh?

Data quality validation is a solved problem — until you have to run the same checks on Pandas today, migrate to PySpark next quarter, and push results to BigQuery in production. Every engine has its own API, its own quirks, and its own way of breaking.

Sumeh provides a single, consistent interface that compiles to whatever engine is underneath. You define rules once. You run them everywhere.

from sumeh import pandas, polars, duckdb, bigquery
from sumeh.core.rules.rule_model import RuleDefinition

rules = [
    RuleDefinition(field="user_id",  check_type="is_unique",      threshold=1.0),
    RuleDefinition(field="email",    check_type="is_complete",     threshold=1.0),
    RuleDefinition(field="email",    check_type="has_pattern",     value=r"^[\w.-]+@[\w.-]+\.[a-zA-Z]{2,}$"),
    RuleDefinition(field="age",      check_type="is_between",      min_value=18, max_value=120),
    RuleDefinition(field="status",   check_type="is_contained_in", allowed_values=["active","inactive","pending"]),
    RuleDefinition(field="revenue",  check_type="has_mean",        value=50_000.0, threshold=0.1),
]

# These are interchangeable — same rules, same report, different engine underneath
report = pandas.validate(df, rules)
report = polars.validate(df, rules)
report = duckdb.validate(con=con, df="orders", rules=rules)
report = bigquery.validate(client=bq_client, table="project.dataset.orders", rules=rules)

print(f"Pass rate: {report.pass_rate:.2%}")  # 83.33%
print(f"Failed:    {len(report.failed)} / {report.total_rules} rules")

good_df, bad_df = report.split()  # bifurcate clean and quarantine data

What Changed in v2.0

v2.0 is a complete rewrite. The architecture is different, the API is different, and there is no dependency on cuallee.

v1.x v2.0
API style validate.pandas(df, rules) from sumeh import pandas; pandas.validate(df, rules)
Return type (df_errors, violations, table_summary) tuple ValidationReport object
Rule class RuleDef RuleDefinition
Engines 6 14
SQL generation String concatenation SQLGlot AST — zero injection risk
PySpark bifurcation .collect() on driver fail_condition Column expressions — never collects
cuallee dependency Required Removed
Open SQL mode ✅ Generate SQL without executing
Profiler ✅ Column-level statistics
OpenMetadata exporter ✅ Zero-SDK payload generation

Migrating from v1.x? See the Migration Guide at the bottom.


Table of Contents


Installation

# Core (Pandas is included by default)
pip install sumeh

# Batch DataFrame Engines
pip install sumeh[polars]       # Polars
pip install sumeh[pyspark]      # Apache Spark
pip install sumeh[dask]         # Dask

# SQL Engines
pip install sumeh[duckdb]       # DuckDB
pip install sumeh[bigquery]     # Google BigQuery
pip install sumeh[snowflake]    # Snowflake
pip install sumeh[redshift]     # Amazon Redshift
pip install sumeh[athena]       # Amazon Athena
pip install sumeh[trino]        # Trino
pip install sumeh[doris]        # Apache Doris

# Streaming & ML Engines
pip install sumeh[pyflink]      # Apache Flink
pip install sumeh[ray]          # Ray Data

# Example: Installing multiple engines at once
pip install sumeh[polars,duckdb,bigquery]

Requirements: Python 3.10+


Quickstart

from sumeh import pandas as sumeh_pandas
from sumeh.core.rules.rule_model import RuleDefinition
import pandas as pd

df = pd.read_csv("customers.csv")

rules = [
    RuleDefinition(field="customer_id", check_type="is_unique",      threshold=1.0),
    RuleDefinition(field="email",       check_type="is_complete",     threshold=1.0),
    RuleDefinition(field="age",         check_type="is_positive",     threshold=0.99),
    RuleDefinition(field="country",     check_type="is_contained_in", allowed_values=["BR","US","DE","FR"]),
    RuleDefinition(field="revenue",     check_type="has_mean",        value=3_500.0, threshold=0.15),
]

report = sumeh_pandas.validate(df, rules)

# Summary
print(f"Pass rate: {report.pass_rate:.2%}")
for r in report.failed:
    print(f"  ✗ [{r.check_type}] {r.field}{r.message}")

# Annotated DataFrame (_dq_errors column added per row)
annotated = report.df

# Split into clean and quarantine
good_df, bad_df = report.split()
bad_df.to_parquet("quarantine/customers.parquet")

Engines

Sumeh supports fourteen engines across three tiers. Every engine exposes the same validate() function and returns the same ValidationReport.

Batch DataFrame Engines

Engine Import Bifurcation Notes
Pandas from sumeh import pandas Boolean mask bifurcation
Polars from sumeh import polars Rust-powered; list.len() bifurcation
PySpark from sumeh import pyspark fail_condition Column expressions — no .collect()
Dask from sumeh import dask Out-of-core parallel computing

SQL Engines

All SQL engines share the sql_core compiler. Queries are built as SQLGlot AST and compiled to the target dialect at call time.

Engine Import Bifurcation Notes
DuckDB from sumeh import duckdb Embedded; in-process SQL bifurcation
BigQuery from sumeh import bigquery Pushes compiled SQL to BQ
Snowflake from sumeh import snowflake Aggregation mode
Redshift from sumeh import redshift Aggregation mode
Athena from sumeh import athena Serverless S3 queries
Trino from sumeh import trino Distributed SQL federation
Apache Doris from sumeh import doris Real-time OLAP
Generic SQL from sumeh import sql_core Query generation without execution

Streaming & ML Engines

Engine Import Notes
PyFlink from sumeh import pyflink Unbounded streams; row-level rules only
Ray Data from sumeh import ray_data ML/AI pipelines; GPU acceleration

Streaming note: Table-level aggregation rules (has_mean, has_cardinality, etc.) require a full dataset. They are not compatible with unbounded streaming sources.


Validation Rules

Sumeh ships 54 rules across 9 categories. The single source of truth is sumeh/core/rules/manifest.json; the tables below are generated from it via scripts/generate_rule_docs.py.

Engine coverage:

Group Engines Row rules Table rules
Core Pandas · PySpark · Polars · DuckDB · Dask · BigQuery
SQL Snowflake · Redshift · Athena · Trino · Doris
Streaming PyFlink · Ray Data ✅ row-level only

Table-level aggregation rules (has_mean, has_cardinality, etc.) require a complete dataset scan and cannot run on unbounded streaming sources.

Completeness

Rule Description Level Engine coverage
is_complete Checks that field has no null values ROW core ✅ · SQL ✅ · stream ✅
are_complete Checks that multiple fields have no null values ROW core ✅ · SQL ✅ · stream ✅

Uniqueness

Rule Description Level Engine coverage
is_unique Checks that field values are unique ROW core ✅ · SQL ✅ · stream ✅
are_unique Checks that combination of fields is unique ROW core ✅ · SQL ✅ · stream ✅
is_primary_key Alias for is_unique (alias: is_unique) ROW core ✅ · SQL ✅ · stream ✅
is_composite_key Alias for are_unique (alias: are_unique) ROW core ✅ · SQL ✅ · stream ✅

Numeric & Comparison

Rule Description Level Engine coverage
is_equal Checks if value equals specified threshold ROW core ✅ · SQL ✅ · stream ✅
is_equal_than Checks if value equals another column ROW core ✅ · SQL ✅ · stream ✅
is_between Checks if value is within range ROW core ✅ · SQL ✅ · stream ✅
is_greater_than Checks if value > threshold ROW core ✅ · SQL ✅ · stream ✅
is_less_than Checks if value < threshold ROW core ✅ · SQL ✅ · stream ✅
is_greater_or_equal_than Checks if value >= threshold ROW core ✅ · SQL ✅ · stream ✅
is_less_or_equal_than Checks if value <= threshold ROW core ✅ · SQL ✅ · stream ✅
is_positive Checks if value > 0 ROW core ✅ · SQL ✅ · stream ✅
is_negative Checks if value < 0 ROW core ✅ · SQL ✅ · stream ✅
is_in_millions Checks if value >= 1,000,000 ROW core ✅ · SQL ✅ · stream ✅
is_in_billions Checks if value >= 1,000,000,000 ROW core ✅ · SQL ✅ · stream ✅

Membership

Rule Description Level Engine coverage
is_contained_in Checks if value in allowed list ROW core ✅ · SQL ✅ · stream ✅
not_contained_in Checks if value not in disallowed list ROW core ✅ · SQL ✅ · stream ✅
is_in Alias for is_contained_in (alias: is_contained_in) ROW core ✅ · SQL ✅ · stream ✅
not_in Alias for not_contained_in (alias: not_contained_in) ROW core ✅ · SQL ✅ · stream ✅

Pattern

Rule Description Level Engine coverage
has_pattern Checks if value matches regex pattern ROW core ✅ · SQL ✅ · stream ✅
is_legit Checks if value is non-null and non-whitespace ROW core ✅ · SQL ✅ · stream ✅

Date

Rule Description Level Engine coverage
is_today Checks if date equals today ROW core ✅ · SQL ✅ · stream ✅
is_t_minus_1 Checks if date equals yesterday ROW core ✅ · SQL ✅ · stream ✅
is_t_minus_2 Checks if date equals 2 days ago ROW core ✅ · SQL ✅ · stream ✅
is_t_minus_3 Checks if date equals 3 days ago ROW core ✅ · SQL ✅ · stream ✅
is_yesterday Alias for is_t_minus_1 (alias: is_t_minus_1) ROW core ✅ · SQL ✅ · stream ✅
is_past_date Checks if date is before today ROW core ✅ · SQL ✅ · stream ✅
is_future_date Checks if date is after today ROW core ✅ · SQL ✅ · stream ✅
is_date_between Checks if date is within range ROW core ✅ · SQL ✅ · stream ✅
is_date_after Checks if date is after specified date ROW core ✅ · SQL ✅ · stream ✅
is_date_before Checks if date is before specified date ROW core ✅ · SQL ✅ · stream ✅
is_on_weekday Checks if date falls on Monday-Friday ROW core ✅ · SQL ✅ · stream ✅
is_on_weekend Checks if date falls on Saturday-Sunday ROW core ✅ · SQL ✅ · stream ✅
is_on_monday Checks if date is Monday ROW core ✅ · SQL ✅ · stream ✅
is_on_tuesday Checks if date is Tuesday ROW core ✅ · SQL ✅ · stream ✅
is_on_wednesday Checks if date is Wednesday ROW core ✅ · SQL ✅ · stream ✅
is_on_thursday Checks if date is Thursday ROW core ✅ · SQL ✅ · stream ✅
is_on_friday Checks if date is Friday ROW core ✅ · SQL ✅ · stream ✅
is_on_saturday Checks if date is Saturday ROW core ✅ · SQL ✅ · stream ✅
is_on_sunday Checks if date is Sunday ROW core ✅ · SQL ✅ · stream ✅
validate_date_format Checks if date string matches expected format ROW core ✅ · SQL ✅ · stream ✅
all_date_checks Runs comprehensive date validation suite ROW core ✅ · SQL ✅ · stream ✅

Custom SQL

Rule Description Level Engine coverage
satisfies Validates custom SQL condition ROW core ✅ · SQL ✅ · stream ✅

Aggregations (Table-level)

These check a single metric across the full column. A rule passes when the measured value is within threshold percent of value. Not compatible with unbounded streaming sources (PyFlink, Ray Data).

Rule Description Level Engine coverage
has_min Validates column minimum value TABLE core ✅ · SQL ⚠️ table-level · stream —
has_max Validates column maximum value TABLE core ✅ · SQL ⚠️ table-level · stream —
has_sum Validates column sum TABLE core ✅ · SQL ⚠️ table-level · stream —
has_mean Validates column average/mean TABLE core ✅ · SQL ⚠️ table-level · stream —
has_std Validates column standard deviation TABLE core ✅ · SQL ⚠️ table-level · stream —
has_cardinality Validates number of distinct values TABLE core ✅ · SQL ⚠️ table-level · stream —
has_entropy Validates column entropy TABLE core ✅ · SQL ⚠️ table-level · stream —
has_infogain Validates information gain metric TABLE core ✅ · SQL ⚠️ table-level · stream —

Schema

Rule Description Level Engine coverage
validate_schema Validates DataFrame schema structure TABLE core ✅ · SQL ⚠️ table-level · stream —

Defining Rules

Rules are RuleDefinition dataclasses.

Field Type Description
field str \| list[str] Column(s) to validate. Use a list for multi-column rules.
check_type str Rule identifier from the manifest
threshold float For row-level rules: minimum pass rate (0.0–1.0). For aggregations: maximum allowed relative deviation from value. Default 1.0.
value Any Expected value for aggregation and pattern rules
min_value / max_value Any Bounds for is_between and range rules
allowed_values list Allowed set for membership rules
execute bool False to skip without removing the rule
from sumeh.core.rules.rule_model import RuleDefinition

# Row-level: threshold = minimum pass rate across all rows
RuleDefinition(field="email",        check_type="is_complete",     threshold=1.0)
RuleDefinition(field=["name","dob"], check_type="are_complete",    threshold=0.95)
RuleDefinition(field="user_id",      check_type="is_unique",       threshold=1.0)
RuleDefinition(field=["id","date"],  check_type="are_unique",      threshold=1.0)
RuleDefinition(field="age",          check_type="is_positive",     threshold=0.99)
RuleDefinition(field="score",        check_type="is_between",      min_value=0, max_value=100)
RuleDefinition(field="status",       check_type="is_contained_in", allowed_values=["A","B","C"])
RuleDefinition(field="email",        check_type="has_pattern",     value=r"^[\w.-]+@[\w.-]+\.[a-zA-Z]{2,}$")
RuleDefinition(field="created_at",   check_type="is_past_date",    threshold=1.0)
RuleDefinition(field="*",            check_type="satisfies",       value="age >= 18 AND status != 'banned'")

# Table-level: threshold = allowed relative deviation from expected value
# threshold=0.1 → actual metric must be within ±10% of value
RuleDefinition(field="age",          check_type="has_mean",        value=35.0,    threshold=0.10)
RuleDefinition(field="salary",       check_type="has_min",         value=1_000.0, threshold=0.05)
RuleDefinition(field="category",     check_type="has_cardinality", value=5,       threshold=0.0)

Loading from CSV

field,check_type,threshold,value,execute
customer_id,is_unique,1.0,,true
email,is_complete,1.0,,true
email,has_pattern,1.0,"^[\w\.-]+@[\w\.-]+\.[a-zA-Z]{2,}$",true
age,is_between,1.0,"[18, 120]",true
status,is_contained_in,1.0,"['active','inactive','pending']",true
"[first_name,last_name]",are_complete,0.95,,true
salary,has_mean,0.1,50000,true
from sumeh.config.csv import load_rules_csv
rules = load_rules_csv("rules.csv")

Values are automatically parsed to the correct Python type (int, float, list, regex, date range). Multi-column fields use the "[col1,col2]" notation.


The ValidationReport

Every validate() call returns a ValidationReport. This is the core object of v2.0.

report = pandas.validate(df, rules)

# Aggregate metrics
report.pass_rate           # float — fraction of rules that passed
report.total_rules         # int
report.passed              # list[ValidationResult]
report.failed              # list[ValidationResult]

# Per-rule results
for result in report.results:
    result.check_type      # "is_complete"
    result.field           # "email"
    result.status          # ValidationStatus.PASS | FAIL
    result.pass_rate       # 0.973 — 97.3% of rows passed
    result.actual_value    # measured metric
    result.expected_value  # expected metric
    result.message         # "27 null values found in 1000 rows"

# Annotated DataFrame: adds _dq_errors column to each row
annotated_df = report.df

Bifurcation

Bifurcation splits the validated dataset into clean rows and quarantine rows in a single pass. No double scanning, no extra joins.

report = pandas.validate(df, rules)
good_df, bad_df = report.split()

# good_df — original columns, _dq_errors removed
# bad_df  — original columns + _dq_errors (list of failed rule names per row)

bad_df.to_parquet("quarantine/2024-06-01.parquet")

The same pattern works across all bifurcation-capable engines:

# Polars
good_df, bad_df = polars.validate(df, rules).split()
# → pl.DataFrame, pl.DataFrame

# PySpark — fully lazy, no .collect(), no driver OOM
report = pyspark.validate(spark, df, rules)
good_df, bad_df = report.split()
good_df.write.parquet("s3://bucket/clean/")
bad_df.write.parquet("s3://bucket/quarantine/")

# Dask
good_df, bad_df = dask.validate(df, rules).split()
# → dask.DataFrame, dask.DataFrame

# DuckDB — bifurcation at the SQL layer
report = duckdb.validate(con=con, df="stg_orders", rules=rules, bifurcate=True)
good_rel, bad_rel = report.split()
# → DuckDBPyRelation, DuckDBPyRelation
good_rel.write_parquet("clean.parquet")

Aggregation rules (has_mean, has_cardinality, etc.) are table-level checks — they are evaluated and reported, but they have no row-level counterpart and do not affect which rows end up in bad_df.


Open SQL Mode

Generate the full validation SQL for any dialect without executing it. Useful for auditing, CI dry-runs, or submitting to an external scheduler.

from sumeh import sql_core
from sumeh.core.rules.rule_model import RuleDefinition

rules = [
    RuleDefinition(field="user_id", check_type="is_unique",   threshold=1.0),
    RuleDefinition(field="email",   check_type="is_complete", threshold=1.0),
    RuleDefinition(field="age",     check_type="has_mean",    value=35.0, threshold=0.1),
]

sql = sql_core.get_validation_sql(
    table="bronze.stg_transactions",
    rules=rules,
    dialect="bigquery",  # "snowflake", "duckdb", "trino", "spark", "postgres", ...
)

print(sql)
# SELECT
#   CAST(COUNT(user_id) AS FLOAT64) / COUNT(*) AS is_unique__user_id,
#   CAST(COUNT(email) AS FLOAT64) / COUNT(*) AS is_complete__email,
#   AVG(age) AS has_mean__age
# FROM bronze.stg_transactions

All SQL is built through SQLGlot's AST — no string concatenation, no injection surface, full dialect normalization.


Schema Validation

Validate the structure of a DataFrame against a schema stored in any supported backend.

Schema Registry DDL

CREATE TABLE schema_registry (
    id            INTEGER PRIMARY KEY,
    environment   VARCHAR(50),     -- 'prod', 'staging', 'dev'
    source_type   VARCHAR(50),     -- 'bigquery', 'mysql', etc.
    database_name VARCHAR(100),
    catalog_name  VARCHAR(100),    -- Databricks Unity Catalog
    schema_name   VARCHAR(100),    -- PostgreSQL schema
    table_name    VARCHAR(100),
    field         VARCHAR(100),
    data_type     VARCHAR(50),
    nullable      BOOLEAN,
    max_length    INTEGER,
    comment       TEXT,
    created_at    TIMESTAMP,
    updated_at    TIMESTAMP
);

Extract and Validate

from sumeh import extract_schema, validate_schema, get_schema_config
import pandas as pd

df = pd.read_csv("users.csv")

# Extract what the DataFrame actually has
actual_schema = extract_schema.pandas(df)

# Load what it should have
expected_schema = get_schema_config.csv("schema_registry.csv", table="users")
# or: get_schema_config.bigquery(project_id=..., dataset_id=..., table_id="users")
# or: get_schema_config.postgresql(host=..., schema="public", table="users")

# Compare
is_valid, errors = validate_schema.pandas(df, expected_schema)

if not is_valid:
    for field, error in errors:
        print(f"  ✗ {field}: {error}")

Available for all engines: extract_schema.{engine}() and validate_schema.{engine}() for pandas, polars, pyspark, and duckdb.


Data Profiling

Generate column-level statistics — without writing any validation rules.

from sumeh.core.services.profiler import profile

stats = profile(df)

print(stats["table_stats"])
# { "total_rows": 10000, "columns_count": 12 }

for col, s in stats["column_profiles"].items():
    print(f"{col}: nulls={s['null_count']}, distinct={s['distinct_count']}, mean={s.get('mean')}")

The profiler output is directly consumable by the OpenMetadata exporter — see below.


OpenMetadata Integration

Export validation results and profiling statistics to OpenMetadata without the openmetadata-ingestion SDK.

from sumeh.exporters.openmetadata import OpenMetadataExport
from sumeh.core.services.profiler import profile
import requests

exporter = OpenMetadataExport(table_fqn="iceberg.bronze.stg_transactions")

# --- Validation payloads ---
payload = exporter.validation(report)

# payload["definitions"] → list of CreateTestCaseRequest dicts
for definition in payload["definitions"]:
    requests.post(
        f"{om_url}/api/v1/dataQuality/testCases",
        json=definition, headers=auth_headers
    )

# payload["results"] → list of TestCaseResult dicts
for result in payload["results"]:
    fqn = result["test_case_fqn"]
    requests.post(
        f"{om_url}/api/v1/dataQuality/testCases/{fqn}/testCaseResult",
        json=result["payload"], headers=auth_headers
    )

# --- Profiling payload ---
stats = profile(df)
profile_payload = exporter.profile(stats)
requests.put(
    f"{om_url}/api/v1/tables/{table_id}/tableProfile",
    json=profile_payload, headers=auth_headers
)

The exporter is pure Python with zero I/O. It generates dicts. You own the HTTP calls, the auth, and the retry logic. Nothing is ever sent without your explicit call.


SQL DDL Generator

Generate rules and schema_registry table DDL for 17+ SQL dialects.

from sumeh.generators import SQLGenerator

# PostgreSQL
print(SQLGenerator.generate(table="rules", dialect="postgres", schema="public"))

# BigQuery with partitioning and clustering
print(SQLGenerator.generate(
    table="schema_registry",
    dialect="bigquery",
    schema="my_dataset",
    partition_by="DATE(created_at)",
    cluster_by=["table_name", "environment"]
))

# Snowflake with clustering key
print(SQLGenerator.generate(
    table="rules",
    dialect="snowflake",
    cluster_by=["environment", "table_name"]
))

# Redshift with distribution and sort keys
print(SQLGenerator.generate(
    table="rules",
    dialect="redshift",
    distkey="table_name",
    sortkey=["created_at", "environment"]
))

# Transpile SQL between dialects
transpiled = SQLGenerator.transpile(
    "SELECT * FROM users WHERE created_at >= CURRENT_DATE - 7",
    from_dialect="postgres",
    to_dialect="bigquery"
)

# Introspect
print(SQLGenerator.list_dialects())  # ['athena', 'bigquery', 'databricks', 'duckdb', ...]
print(SQLGenerator.list_tables())    # ['rules', 'schema_registry']

CLI

# Validate a file — defaults to Pandas engine
sumeh validate data.csv rules.csv

# Choose engine
sumeh validate data.parquet rules.csv --engine polars
sumeh validate data.csv rules.csv     --engine duckdb --format json

# Save clean and quarantine splits
sumeh validate data.csv rules.csv \
  --output     clean/data.csv     \
  --quarantine quarantine/data.csv

# CI/CD gate — exits with code 1 if any rule fails
sumeh validate data.csv rules.csv --fail-on-error

# Generate validation SQL without executing it
sumeh sql rules.csv --table bronze.orders --dialect bigquery
sumeh sql rules.csv --table bronze.orders --dialect snowflake

# Schema operations
sumeh schema extract  --data data.csv --output schema.json
sumeh schema validate --data data.csv --registry schema_registry.csv

# DDL generation
sumeh ddl generate --table rules           --dialect postgres
sumeh ddl generate --table schema_registry --dialect bigquery

# Rule introspection
sumeh rules list
sumeh rules info is_complete
sumeh rules search "date"
sumeh rules template


# System info
sumeh info

Architecture

sumeh/
├── core/
│   ├── base/
│   │   └── protocols.py        # IDataFrameValidator, IExporter — engine contracts
│   ├── models/
│   │   ├── validation.py       # ValidationReport, ValidationResult, ValidationStatus
│   │   └── metrics.py          # MetricResult
│   ├── rules/
│   │   ├── manifest.json       # 50+ rule definitions — single source of truth
│   │   └── rule_model.py       # RuleDefinition dataclass
│   ├── logic/
│   │   └── comparators.py      # Constraint classes per category
│   ├── services/
│   │   └── profiler/           # Column-level statistics
│   └── io.py                   # load_data / save_data helpers
├── engines/
│   ├── sql_core/               # Shared SQL compilation layer
│   │   ├── analyzers.py        # check_type → SQLGlot AST expression
│   │   ├── compiler.py         # Assembles SELECT from a rule list
│   │   ├── validator.py        # Maps SQL result row → ValidationResult
│   │   └── registry.py         # check_type → (Analyzer, Constraint)
│   │
│   ├── pandas/                 # Boolean mask bifurcation
│   ├── polars/                 # list.len() bifurcation
│   ├── pyspark/                # fail_condition Column expressions — no .collect()
│   ├── dask/                   # Out-of-core parallel
│   ├── duckdb/                 # sql_core + in-process SQL bifurcation
│   ├── bigquery/               # sql_core + BigQuery client
│   ├── snowflake/              # sql_core + Snowflake connector
│   ├── redshift/               # sql_core + Redshift
│   ├── athena/                 # sql_core + Athena
│   ├── trino/                  # sql_core + Trino
│   ├── doris/                  # sql_core + Apache Doris
│   ├── pyflink/                # PyFlink streaming UDF engine
│   └── ray_data/               # Ray Data ML engine
├── config/                     # Rule loading backends
├── exporters/
│   └── openmetadata.py         # Zero-SDK OpenMetadata payload generator
├── generators/
│   ├── ddl.py                  # SQL DDL for 17+ dialects
│   └── transpiler.py           # SQLGlot-based dialect transpiler
└── cli/
    └── commands/               # validate, sql, ddl, schema, rules

Design Decisions

Namespace-first API. from sumeh import pandas; pandas.validate(df, rules) — not validate(df, rules, engine="pandas"). The engine is resolved at import time. Errors are immediate and specific. IDEs understand the full call signature. There is no internal string dispatcher routing at runtime.

Analyzer / Constraint separation. An Analyzer knows how to measure a metric (compute the null rate as a SQLGlot expression, or as a vectorized Pandas operation). A Constraint knows how to decide whether that metric satisfies the rule. Both are small, independently testable, and replaceable. Adding a new check type means writing one of each.

SQLGlot AST for all SQL. No SQL string concatenation in the compiler or engine layers. Every expression is a typed SQLGlot AST node compiled to the target dialect at call time. This eliminates escape sequence bugs and the silent drift that comes from dialect-specific string templates. Note: the satisfies rule accepts arbitrary SQL from the caller — input sanitization is the caller's responsibility, as with any parameterized query layer.

Fail-condition pattern for PySpark. Analyzers return Column expressions applied lazily across the cluster. .collect() is never called — not even for sampling. Calling .collect() on a dataset of any real scale is a driver OOM waiting to happen. The full validation result is computed in a single distributed aggregation pass.

Single-pass bifurcation. Validation and the good/bad row split happen in one scan. The _dq_errors array column is populated per row during the same pass that computes per-rule metrics. The dataset is never read twice.


Migrating from v1.x

Import pattern

# v1.x
from sumeh import validate, summarize
df_errors, violations, table_summary = validate.pandas(df, rules)

# v2.0
from sumeh import pandas
report = pandas.validate(df, rules)

Rule class

# v1.x
from sumeh.core.rules import RuleDef
rule = RuleDef(field="email", check_type="is_complete", threshold=0.99)

# v2.0
from sumeh.core.rules.rule_model import RuleDefinition
rule = RuleDefinition(field="email", check_type="is_complete", threshold=0.99)

Loading Rules from Databases

In Sumeh v2.0, built-in proprietary database connectors (get_rules_config) have been removed to keep the core library lightweight and give you full control over connection management.

You now load rule configurations using standard Python data libraries (like Pandas, SQLAlchemy, or DuckDB) and map the results directly to the RuleDefinition dataclass.

PostgreSQL / MySQL (via Pandas & SQLAlchemy)

import pandas as pd
from sqlalchemy import create_engine
from sumeh.core.rules.rule_model import RuleDefinition

# 1. Connect and query your rules table
engine = create_engine("postgresql://user:secret@localhost:5432/mydb")
query = "SELECT * FROM public.dq_rules WHERE execute = true"
df_rules = pd.read_sql(query, engine)

# 2. Map DataFrame rows to RuleDefinition objects
rules = [RuleDefinition(**row) for row in df_rules.to_dict(orient="records")]

BigQuery (via Google Cloud Client)

from google.cloud import bigquery
from sumeh.core.rules.rule_model import RuleDefinition

client = bigquery.Client(project="my-project")
query = "SELECT * FROM `my-project.my_dataset.dq_rules` WHERE execute = TRUE"

# Fetch results and convert directly
records = client.query(query).result()
rules = [RuleDefinition(**dict(row)) for row in records]

DuckDB (Native)

import duckdb
from sumeh.core.rules.rule_model import RuleDefinition

conn = duckdb.connect("warehouse.db")

# Fetch directly as a Pandas DataFrame
df_rules = conn.execute("SELECT * FROM dq_rules WHERE execute = true").df()

# Map to dataclass
rules = [RuleDefinition(**row) for row in df_rules.to_dict(orient="records")]

Passing Rules to the Engine

Once your rules list is populated, you pass it to any engine exactly the same way:

from sumeh import pyspark

# Validate using the loaded database rules
report = pyspark.validate(spark_session, df, rules)

Rules table DDL

CREATE TABLE dq_rules (
    id            INTEGER PRIMARY KEY,
    environment   VARCHAR(50)  NOT NULL,
    source_type   VARCHAR(50)  NOT NULL,
    database_name VARCHAR(255) NOT NULL,
    catalog_name  VARCHAR(255),
    schema_name   VARCHAR(255),
    table_name    VARCHAR(255) NOT NULL,
    field         VARCHAR(255) NOT NULL,
    level         VARCHAR(100) NOT NULL,   -- 'ROW' or 'TABLE'
    category      VARCHAR(100) NOT NULL,   -- 'completeness', 'uniqueness', ...
    check_type    VARCHAR(100) NOT NULL,
    value         TEXT,
    threshold     FLOAT        DEFAULT 1.0,
    execute       BOOLEAN      DEFAULT TRUE,
    created_at    TIMESTAMP    DEFAULT CURRENT_TIMESTAMP,
    updated_at    TIMESTAMP
);

Generate this DDL for any dialect with sumeh sql generate --table rules --dialect <dialect>.


Working with results

# v1.x — unpack 3-tuple
df_errors, violations, table_summary = validate.pandas(df, rules)
summary = summarize.pandas((df_errors, violations, table_summary), rules, len(df))

# v2.0 — everything lives on the report
report   = pandas.validate(df, rules)
summary  = report.summary()          # dict
good_df, bad_df = report.split()     # bifurcation
annotated = report.df                # DataFrame with _dq_errors column

cuallee

The cuallee backend has been removed. v2.0 has its own validation engine built from scratch.


Contributing

git clone https://github.com/maltzsama/sumeh.git
cd sumeh
git checkout develop
poetry install --with dev

# All tests
poetry run pytest

# Engine-specific
poetry run pytest tests/engines/test_pandas.py  -v
poetry run pytest tests/engines/test_polars.py  -v
poetry run pytest tests/engines/test_duckdb.py  -v
poetry run pytest tests/engines/test_pyspark.py -v

To add a new rule:

  1. Add the definition to sumeh/core/rules/manifest.json
  2. Implement an Analyzer in the target engine's analyzers.py
  3. Register it in the engine's registry.py
  4. Write tests

Releases are automated via semantic-release. Merging to main with a conventional commit triggers versioning, changelog generation, and PyPI publishing via Trusted Publishers.


License

Licensed under the Apache License 2.0.


Built by @maltzsama