
Module sumeh.engine.polars_engine

This module provides a set of data quality validation functions built on the Polars library, including checks for completeness, uniqueness, numeric ranges, date logic, pattern matching, and schema validation.

Functions:

Name Description
is_positive

Filters rows where the specified field is less than zero.

is_negative

Filters rows where the specified field is greater than or equal to zero.

is_complete

Filters rows where the specified field is null.

is_unique

Filters rows with duplicate values in the specified field.

are_complete

Filters rows where any of the specified fields are null.

are_unique

Filters rows with duplicate combinations of the specified fields.

is_greater_than

Filters rows where the specified field is less than or equal to the given value.

is_greater_or_equal_than

Filters rows where the specified field is less than the given value.

is_less_than

Filters rows where the specified field is greater than or equal to the given value.

is_less_or_equal_than

Filters rows where the specified field is greater than the given value.

is_equal

Filters rows where the specified field is not equal to the given value.

is_equal_than

Alias for is_equal.

is_in_millions

Retains rows where the field value is less than 1,000,000 and flags them with dq_status.

is_in_billions

Retains rows where the field value is less than 1,000,000,000 and flags them with dq_status.

is_t_minus_1

Retains rows where the date field does not equal yesterday (T-1) and flags them with dq_status.

is_t_minus_2

Retains rows where the date field does not equal two days ago (T-2) and flags them with dq_status.

is_t_minus_3

Retains rows where the date field does not equal three days ago (T-3) and flags them with dq_status.

is_today

Retains rows where the date field does not equal today and flags them with dq_status.

is_yesterday

Retains rows where the date field does not equal yesterday and flags them with dq_status.

is_on_weekday

Retains rows where the date field does not fall on a weekday (Mon-Fri) and flags them with dq_status.

is_on_weekend

Retains rows where the date field is not on a weekend (Sat-Sun) and flags them with dq_status.

is_on_monday

Retains rows where the date field is not on Monday and flags them with dq_status.

is_on_tuesday

Retains rows where the date field is not on Tuesday and flags them with dq_status.

is_on_wednesday

Retains rows where the date field is not on Wednesday and flags them with dq_status.

is_on_thursday

Retains rows where the date field is not on Thursday and flags them with dq_status.

is_on_friday

Retains rows where the date field is not on Friday and flags them with dq_status.

is_on_saturday

Retains rows where the date field is not on Saturday and flags them with dq_status.

is_on_sunday

Retains rows where the date field is not on Sunday and flags them with dq_status.

is_contained_in

Filters rows where the specified field is not in the given list of values.

not_contained_in

Filters rows where the specified field is in the given list of values.

is_between

Filters rows where the specified field is not within the given range.

has_pattern

Filters rows where the specified field does not match the given regex pattern.

is_legit

Filters rows where the specified field is null or contains whitespace.

has_max

Filters rows where the specified field exceeds the given maximum value.

has_min

Filters rows where the specified field is below the given minimum value.

has_std

Checks if the standard deviation of the specified field exceeds the given value.

has_mean

Checks if the mean of the specified field exceeds the given value.

has_sum

Checks if the sum of the specified field exceeds the given value.

has_cardinality

Checks if the cardinality (number of unique values) of the specified field exceeds the given value.

has_infogain

Placeholder for information gain validation (currently uses cardinality).

has_entropy

Placeholder for entropy validation (currently uses cardinality).

satisfies

Filters rows that do not satisfy the given SQL condition.

validate_date_format

Filters rows where the specified field does not match the expected date format or is null.

is_future_date

Filters rows where the specified date field is after today.

is_past_date

Filters rows where the specified date field is before today.

is_date_between

Filters rows where the specified date field is not within the given [start,end] range.

is_date_after

Filters rows where the specified date field is before the given date.

is_date_before

Filters rows where the specified date field is after the given date.

all_date_checks

Alias for is_past_date (checks date against today).

validate

Validates a DataFrame against a list of rules and returns the original DataFrame with data quality status and a DataFrame of violations.

__build_rules_df

Converts a list of rules into a Polars DataFrame for summarization.

summarize

Summarizes the results of data quality checks, including pass rates and statuses.

__polars_schema_to_list

Converts a Polars DataFrame schema into a list of dictionaries.

validate_schema

Validates the schema of a DataFrame against an expected schema and returns a boolean result and a list of errors.

__build_rules_df(rules)

Builds a Polars DataFrame from a list of rule dictionaries.

This function processes a list of rule dictionaries, filters out rules that are not marked for execution, and constructs a DataFrame with the relevant rule information. It ensures uniqueness of rows based on specific columns and casts the data to appropriate types.

Parameters:

Name Type Description Default
rules list[dict]

A list of dictionaries, where each dictionary represents a rule. Each rule dictionary may contain the following keys: - "field" (str or list): The column(s) the rule applies to. - "check_type" (str): The type of rule or check. - "threshold" (float, optional): The pass threshold for the rule. Defaults to 1.0. - "value" (any, optional): Additional value associated with the rule. - "execute" (bool, optional): Whether the rule should be executed. Defaults to True.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A Polars DataFrame containing the processed rules with the following columns: - "column" (str): The column(s) the rule applies to, joined by commas if multiple. - "rule" (str): The type of rule or check. - "pass_threshold" (float): The pass threshold for the rule. - "value" (str): The value associated with the rule, or an empty string if not provided.

Source code in sumeh/engine/polars_engine.py
def __build_rules_df(rules: list[dict]) -> pl.DataFrame:
    """
    Builds a Polars DataFrame from a list of rule dictionaries.

    This function processes a list of rule dictionaries, filters out rules
    that are not marked for execution, and constructs a DataFrame with the
    relevant rule information. It ensures uniqueness of rows based on
    specific columns and casts the data to appropriate types.

    Args:
        rules (list[dict]): A list of dictionaries, where each dictionary
            represents a rule. Each rule dictionary may contain the following keys:
            - "field" (str or list): The column(s) the rule applies to.
            - "check_type" (str): The type of rule or check.
            - "threshold" (float, optional): The pass threshold for the rule. Defaults to 1.0.
            - "value" (any, optional): Additional value associated with the rule.
            - "execute" (bool, optional): Whether the rule should be executed. Defaults to True.

    Returns:
        pl.DataFrame: A Polars DataFrame containing the processed rules with the following columns:
            - "column" (str): The column(s) the rule applies to, joined by commas if multiple.
            - "rule" (str): The type of rule or check.
            - "pass_threshold" (float): The pass threshold for the rule.
            - "value" (str): The value associated with the rule, or an empty string if not provided.
    """
    rules_df = (
        pl.DataFrame(
            [
                {
                    "column": (
                        ",".join(r["field"])
                        if isinstance(r["field"], list)
                        else r["field"]
                    ),
                    "rule": r["check_type"],
                    "pass_threshold": float(r.get("threshold") or 1.0),
                    "value": r.get("value"),
                }
                for r in rules
                if r.get("execute", True)
            ]
        )
        .unique(subset=["column", "rule", "value"])
        .with_columns(
            [
                pl.col("column").cast(str),
                pl.col("rule").cast(str),
                pl.col("value").cast(str),
            ]
        )
    ).with_columns(pl.col("value").fill_null("").alias("value"))

    return rules_df
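
For orientation, here is a minimal sketch of the rule dictionaries this helper consumes and the rules DataFrame it yields. The field names and checks are illustrative, and the transformation simply repeats the steps shown in the source above.

import polars as pl

rules = [
    {"field": "amount", "check_type": "is_positive", "threshold": 0.95},
    {"field": ["customer_id", "order_id"], "check_type": "are_unique"},
    {"field": "status", "check_type": "is_contained_in", "value": "[open,closed]", "execute": False},
]

# Mirroring the steps above: skip rules with execute=False, join list fields
# with commas, default the threshold to 1.0, and blank out missing values.
rules_df = (
    pl.DataFrame(
        [
            {
                "column": ",".join(r["field"]) if isinstance(r["field"], list) else r["field"],
                "rule": r["check_type"],
                "pass_threshold": float(r.get("threshold") or 1.0),
                "value": r.get("value"),
            }
            for r in rules
            if r.get("execute", True)
        ]
    )
    .unique(subset=["column", "rule", "value"])
    .with_columns([pl.col("column").cast(str), pl.col("rule").cast(str), pl.col("value").cast(str)])
    .with_columns(pl.col("value").fill_null(""))
)
# rules_df has columns: column, rule, pass_threshold, value (two rows here).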

__compare_schemas(actual, expected)

Compare two lists of schema definitions and identify discrepancies.

Parameters:

Name Type Description Default
actual List[SchemaDef]

The list of actual schema definitions.

required
expected List[SchemaDef]

The list of expected schema definitions.

required

Returns:

Type Description
Tuple[bool, List[Tuple[str, str]]]

A tuple where the first element is a boolean indicating whether the schemas match (True if they match, False otherwise), and the second element is a list of tuples describing the discrepancies. Each tuple contains: - The field name (str). - A description of the discrepancy (str), such as "missing", "type mismatch", "nullable but expected non-nullable", or "extra column".

Notes
  • A field is considered "missing" if it exists in the expected schema but not in the actual schema.
  • A "type mismatch" occurs if the data type of a field in the actual schema does not match the expected data type.
  • A field is considered "nullable but expected non-nullable" if it is nullable in the actual schema but not nullable in the expected schema.
  • An "extra column" is a field that exists in the actual schema but not in the expected schema.
Source code in sumeh/services/utils.py
def __compare_schemas(
    actual: List[SchemaDef],
    expected: List[SchemaDef],
) -> Tuple[bool, List[Tuple[str, str]]]:
    """
    Compare two lists of schema definitions and identify discrepancies.

    Args:
        actual (List[SchemaDef]): The list of actual schema definitions.
        expected (List[SchemaDef]): The list of expected schema definitions.

    Returns:
        Tuple[bool, List[Tuple[str, str]]]: A tuple where the first element is a boolean indicating
        whether the schemas match (True if they match, False otherwise), and the second element
        is a list of tuples describing the discrepancies. Each tuple contains:
            - The field name (str).
            - A description of the discrepancy (str), such as "missing", "type mismatch",
              "nullable but expected non-nullable", or "extra column".

    Notes:
        - A field is considered "missing" if it exists in the expected schema but not in the actual schema.
        - A "type mismatch" occurs if the data type of a field in the actual schema does not match
          the expected data type.
        - A field is considered "nullable but expected non-nullable" if it is nullable in the actual
          schema but not nullable in the expected schema.
        - An "extra column" is a field that exists in the actual schema but not in the expected schema.
    """

    exp_map = {c["field"]: c for c in expected}
    act_map = {c["field"]: c for c in actual}

    erros: List[Tuple[str, str]] = []

    for fld, exp in exp_map.items():
        if fld not in act_map:
            erros.append((fld, "missing"))
            continue
        act = act_map[fld]
        if act["data_type"] != exp["data_type"]:
            erros.append(
                (
                    fld,
                    f"type mismatch (got {act['data_type']}, expected {exp['data_type']})",
                )
            )

        if act["nullable"] and not exp["nullable"]:
            erros.append((fld, "nullable but expected non-nullable"))

        if exp.get("max_length") is not None:
            pass

    # 2. extra columns (optional)
    extras = set(act_map) - set(exp_map)
    for fld in extras:
        erros.append((fld, "extra column"))

    return len(erros) == 0, erros
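
To make the discrepancy reporting concrete, here is a small sketch of the two SchemaDef lists the comparison receives and the tuple it would return, following the logic shown above; the field names are illustrative.

expected = [
    {"field": "id", "data_type": "int64", "nullable": False, "max_length": None},
    {"field": "email", "data_type": "utf8", "nullable": True, "max_length": None},
]
actual = [
    {"field": "id", "data_type": "utf8", "nullable": True, "max_length": None},
    {"field": "extra", "data_type": "utf8", "nullable": True, "max_length": None},
]

# Walking the expected fields first and then the extras, the comparison would return:
#   (False, [("id", "type mismatch (got utf8, expected int64)"),
#            ("id", "nullable but expected non-nullable"),
#            ("email", "missing"),
#            ("extra", "extra column")])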

__convert_value(value)

Converts the provided value to the appropriate type (date, float, or int).

Depending on the format of the input value, it will be converted to a datetime object, a floating-point number (float), or an integer (int).

Parameters:

Name Type Description Default
value str

The value to be converted, represented as a string.

required

Returns:

Type Description

Union[datetime, float, int]: The converted value, which can be a datetime object, float, or int.

Raises:

Type Description
ValueError

If the value does not match an expected format.

Source code in sumeh/services/utils.py
def __convert_value(value):
    """
    Converts the provided value to the appropriate type (date, float, or int).

    Depending on the format of the input value, it will be converted to a datetime object,
    a floating-point number (float), or an integer (int).

    Args:
        value (str): The value to be converted, represented as a string.

    Returns:
        Union[datetime, float, int]: The converted value, which can be a datetime object, float, or int.

    Raises:
        ValueError: If the value does not match an expected format.
    """
    from datetime import datetime

    value = value.strip()
    try:
        if "-" in value:
            return datetime.strptime(value, "%Y-%m-%d")
        else:
            return datetime.strptime(value, "%d/%m/%Y")
    except ValueError:
        if "." in value:
            return float(value)
        return int(value)
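
A minimal, self-contained sketch of the conversion branches shown above, using a local copy of the logic so the behaviour can be checked directly (the sample strings are illustrative):

from datetime import datetime

def convert_value(value: str):
    # Same branching as the helper above: ISO date, then day/month/year,
    # then float, then int.
    value = value.strip()
    try:
        if "-" in value:
            return datetime.strptime(value, "%Y-%m-%d")
        return datetime.strptime(value, "%d/%m/%Y")
    except ValueError:
        if "." in value:
            return float(value)
        return int(value)

assert convert_value("2024-03-01") == datetime(2024, 3, 1)
assert convert_value("01/03/2024") == datetime(2024, 3, 1)
assert convert_value("3.14") == 3.14
assert convert_value("42") == 42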

__extract_params(rule)

Source code in sumeh/services/utils.py
def __extract_params(rule: dict) -> tuple:
    rule_name = rule["check_type"]
    field = rule["field"]
    raw_value = rule.get("value")
    if isinstance(raw_value, str) and raw_value not in (None, "", "NULL"):
        try:
            value = __convert_value(raw_value)
        except ValueError:
            value = raw_value
    else:
        value = raw_value
    value = value if value not in (None, "", "NULL") else ""
    return field, rule_name, value

__polars_schema_to_list(df)

Converts the schema of a Polars DataFrame into a list of dictionaries, where each dictionary represents a field in the schema.

Parameters:

Name Type Description Default
df DataFrame

The Polars DataFrame whose schema is to be converted.

required

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: A list of dictionaries, each containing the following keys: - "field" (str): The name of the field. - "data_type" (str): The data type of the field, converted to lowercase. - "nullable" (bool): Always set to True, as Polars does not expose nullability in the schema. - "max_length" (None): Always set to None, as max length is not applicable.

Source code in sumeh/engine/polars_engine.py
def __polars_schema_to_list(df: pl.DataFrame) -> List[Dict[str, Any]]:
    """
    Converts the schema of a Polars DataFrame into a list of dictionaries,
    where each dictionary represents a field in the schema.

    Args:
        df (pl.DataFrame): The Polars DataFrame whose schema is to be converted.

    Returns:
        List[Dict[str, Any]]: A list of dictionaries, each containing the following keys:
            - "field" (str): The name of the field.
            - "data_type" (str): The data type of the field, converted to lowercase.
            - "nullable" (bool): Always set to True, as Polars does not expose nullability in the schema.
            - "max_length" (None): Always set to None, as max length is not applicable.
    """
    return [
        {
            "field": name,
            "data_type": str(dtype).lower(),
            "nullable": True,  # Polars does not expose nullability in the schema
            "max_length": None,
        }
        for name, dtype in df.schema.items()
    ]

__transform_date_format_in_pattern(date_format)

Source code in sumeh/services/utils.py
def __transform_date_format_in_pattern(date_format):
    date_patterns = {
        "DD": "(0[1-9]|[12][0-9]|3[01])",
        "MM": "(0[1-9]|1[012])",
        "YYYY": "(19|20)\\d\\d",
        "YY": "\\d\\d",
        " ": "\\s",
        ".": "\\.",
    }

    date_pattern = date_format
    for single_format, pattern in date_patterns.items():
        date_pattern = date_pattern.replace(single_format, pattern)

    return date_pattern
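
A short sketch of what the substitution yields for a common format string; the "DD/MM/YYYY" input is illustrative and the mapping is copied from the source above.

import re

# Mapping copied from the source above.
date_patterns = {
    "DD": "(0[1-9]|[12][0-9]|3[01])",
    "MM": "(0[1-9]|1[012])",
    "YYYY": "(19|20)\\d\\d",
    "YY": "\\d\\d",
    " ": "\\s",
    ".": "\\.",
}

pattern = "DD/MM/YYYY"
for token, regex in date_patterns.items():
    pattern = pattern.replace(token, regex)

# pattern is now "(0[1-9]|[12][0-9]|3[01])/(0[1-9]|1[012])/(19|20)\d\d"
assert re.fullmatch(pattern, "29/02/2024")
assert not re.fullmatch(pattern, "31/13/2024")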

_day_of_week(df, rule, dow)

Filters a Polars DataFrame to include only rows where the day of the week of a specified date column matches the given day of the week (dow). Adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame.

required
rule dict

A dictionary containing the rule parameters. The rule should include the field name, check type, and value.

required
dow int

The target day of the week (0 = Monday, 6 = Sunday).

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame filtered by the specified day of the week and with an additional "dq_status" column indicating the rule applied.

Source code in sumeh/engine/polars_engine.py
def _day_of_week(df: pl.DataFrame, rule: dict, dow: int) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the day of the week
    of a specified date column matches the given day of the week (dow). Adds
    a new column indicating the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame.
        rule (dict): A dictionary containing the rule parameters. The rule
            should include the field name, check type, and value.
        dow (int): The target day of the week (0 = Monday, 6 = Sunday).

    Returns:
        pl.DataFrame: A new DataFrame filtered by the specified day of the week
        and with an additional "dq_status" column indicating the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(
        pl.col(field).str.strptime(pl.Date, "%Y-%m-%d").dt.weekday() == dow
    ).with_columns(pl.lit(f"{field}:{check}:{value}").alias("dq_status"))

all_date_checks(df, rule)

Applies all date-related validation checks on the given DataFrame based on the specified rule.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame to validate.

required
rule dict

A dictionary containing the validation rules to apply.

required

Returns:

Type Description
DataFrame

pl.DataFrame: The DataFrame after applying the date validation checks.

Source code in sumeh/engine/polars_engine.py
def all_date_checks(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Applies all date-related validation checks on the given DataFrame based on the specified rule.

    Args:
        df (pl.DataFrame): The input DataFrame to validate.
        rule (dict): A dictionary containing the validation rules to apply.

    Returns:
        pl.DataFrame: The DataFrame after applying the date validation checks.
    """
    return is_past_date(df, rule)

are_complete(df, rule)

Filters a Polars DataFrame to identify rows where specified fields contain null values and tags them with a data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be checked.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'fields': A list of column names to check for null values. - 'check': A string representing the type of check (e.g., "is_null"). - 'value': A value associated with the check (not used in this function).

required

Returns:

Type Description
DataFrame

pl.DataFrame: A filtered DataFrame containing only rows where at least one of the specified fields is null, with an additional column "dq_status" indicating the data quality status.

Source code in sumeh/engine/polars_engine.py
def are_complete(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to identify rows where specified fields contain null values
    and tags them with a data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'fields': A list of column names to check for null values.
            - 'check': A string representing the type of check (e.g., "is_null").
            - 'value': A value associated with the check (not used in this function).

    Returns:
        pl.DataFrame: A filtered DataFrame containing only rows where at least one of the
        specified fields is null, with an additional column "dq_status" indicating the
        data quality status.
    """
    fields, check, value = __extract_params(rule)
    cond = reduce(operator.or_, [pl.col(f).is_null() for f in fields])

    tag = f"{fields}:{check}:{value}"
    return df.filter(cond).with_columns(pl.lit(tag).alias("dq_status"))
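
A usage sketch, assuming the module is imported under the path documented here; the DataFrame contents and column names are illustrative, and the rule keys follow the dictionaries used throughout this module.

import polars as pl
from sumeh.engine.polars_engine import are_complete

df = pl.DataFrame({"name": ["Ana", None, "Bia"], "age": [30, 25, None]})
rule = {"field": ["name", "age"], "check_type": "are_complete", "value": None}

violations = are_complete(df, rule)
# The second and third rows come back (each has a null in one of the fields),
# tagged with a dq_status string built from the field list and the check name.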

are_unique(df, rule)

Checks for duplicate combinations of specified fields in a Polars DataFrame and returns a DataFrame containing the rows with duplicates along with a data quality status column.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to check for duplicates.

required
rule dict

A dictionary containing the rule parameters. It is expected to include the following keys: - 'fields': A list of column names to check for uniqueness. - 'check': A string representing the type of check (e.g., "unique"). - 'value': A value associated with the check (e.g., "True").

required

Returns:

Type Description
DataFrame

pl.DataFrame: A DataFrame containing rows with duplicate combinations of the specified fields. An additional column, "dq_status", is added to indicate the data quality status in the format "{fields}:{check}:{value}".

Source code in sumeh/engine/polars_engine.py
def are_unique(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Checks for duplicate combinations of specified fields in a Polars DataFrame
    and returns a DataFrame containing the rows with duplicates along with a
    data quality status column.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to check for duplicates.
        rule (dict): A dictionary containing the rule parameters. It is expected
                     to include the following keys:
                     - 'fields': A list of column names to check for uniqueness.
                     - 'check': A string representing the type of check (e.g., "unique").
                     - 'value': A value associated with the check (e.g., "True").

    Returns:
        pl.DataFrame: A DataFrame containing rows with duplicate combinations of
                      the specified fields. An additional column, "dq_status",
                      is added to indicate the data quality status in the format
                      "{fields}:{check}:{value}".
    """
    fields, check, value = __extract_params(rule)
    combo = df.with_columns(
        pl.concat_str([pl.col(f).cast(str) for f in fields], separator="|").alias(
            "_combo"
        )
    )
    dupes = (
        combo.group_by("_combo")
        .agg(pl.len().alias("cnt"))
        .filter(pl.col("cnt") > 1)
        .select("_combo")
        .to_series()
        .to_list()
    )
    return (
        combo.filter(pl.col("_combo").is_in(dupes))
        .drop("_combo")
        .with_columns(pl.lit(f"{fields}:{check}:{value}").alias("dq_status"))
    )
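
A usage sketch with illustrative data, showing how duplicate field combinations are surfaced.

import polars as pl
from sumeh.engine.polars_engine import are_unique

df = pl.DataFrame(
    {"customer_id": [1, 1, 2], "order_date": ["2024-01-01", "2024-01-01", "2024-01-02"]}
)
rule = {"field": ["customer_id", "order_date"], "check_type": "are_unique", "value": None}

dupes = are_unique(df, rule)
# Only the two rows sharing the same (customer_id, order_date) combination are
# returned, with dq_status describing the violated rule.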

has_cardinality(df, rule)

Checks if the cardinality (number of unique values) of a specified field in the given DataFrame satisfies a condition defined in the rule. If the cardinality exceeds the specified value, a new column "dq_status" is added to the DataFrame with a string indicating the rule violation. Otherwise, an empty DataFrame is returned.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame to evaluate.

required
rule dict

A dictionary containing the rule parameters. It should include: - "field" (str): The column name to check. - "check" (str): The type of check (e.g., "greater_than"). - "value" (int): The threshold value for the cardinality.

required

Returns:

Type Description
DataFrame

pl.DataFrame: The original DataFrame with an added "dq_status" column if the rule is violated, or an empty DataFrame if the rule is not violated.

Source code in sumeh/engine/polars_engine.py
def has_cardinality(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Checks if the cardinality (number of unique values) of a specified field in the given DataFrame
    satisfies a condition defined in the rule. If the cardinality exceeds the specified value,
    a new column "dq_status" is added to the DataFrame with a string indicating the rule violation.
    Otherwise, an empty DataFrame is returned.

    Args:
        df (pl.DataFrame): The input DataFrame to evaluate.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - "field" (str): The column name to check.
            - "check" (str): The type of check (e.g., "greater_than").
            - "value" (int): The threshold value for the cardinality.

    Returns:
        pl.DataFrame: The original DataFrame with an added "dq_status" column if the rule is violated,
                      or an empty DataFrame if the rule is not violated.
    """
    field, check, value = __extract_params(rule)
    card = df.select(pl.col(field).n_unique()).to_numpy()[0] or 0
    if card > value:
        return df.with_columns(pl.lit(f"{field}:{check}:{value}").alias("dq_status"))
    return df.head(0)
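
A usage sketch with illustrative data; note that the rule's value is the cardinality threshold the column is compared against.

import polars as pl
from sumeh.engine.polars_engine import has_cardinality

df = pl.DataFrame({"country": ["BR", "US", "AR", "BR"]})
rule = {"field": "country", "check_type": "has_cardinality", "value": 2}

flagged = has_cardinality(df, rule)
# "country" has 3 distinct values, which exceeds 2, so the whole DataFrame is
# returned tagged with "country:has_cardinality:2"; otherwise an empty frame
# with the same schema would come back.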

has_entropy(df, rule)

Evaluates the entropy of a specified field in a Polars DataFrame based on a given rule.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to evaluate.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The column name in the DataFrame to evaluate. - 'check' (str): The type of check to perform (not used directly in this function). - 'value' (float): The threshold value for entropy comparison.

required

Returns:

Type Description
DataFrame

pl.DataFrame: - If the entropy of the specified field exceeds the given threshold (value), returns the original DataFrame with an additional column dq_status indicating the rule that was applied. - If the entropy does not exceed the threshold, returns an empty DataFrame with the same schema as the input DataFrame.

Notes
  • The entropy is calculated as the number of unique values in the specified field.
  • The dq_status column contains a string in the format "{field}:{check}:{value}".
Source code in sumeh/engine/polars_engine.py
def has_entropy(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Evaluates the entropy of a specified field in a Polars DataFrame based on a given rule.

    Parameters:
        df (pl.DataFrame): The input Polars DataFrame to evaluate.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The column name in the DataFrame to evaluate.
            - 'check' (str): The type of check to perform (not used directly in this function).
            - 'value' (float): The threshold value for entropy comparison.

    Returns:
        pl.DataFrame:
            - If the entropy of the specified field exceeds the given threshold (`value`),
              returns the original DataFrame with an additional column `dq_status` indicating
              the rule that was applied.
            - If the entropy does not exceed the threshold, returns an empty DataFrame with
              the same schema as the input DataFrame.

    Notes:
        - The entropy is calculated as the number of unique values in the specified field.
        - The `dq_status` column contains a string in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    ent = df.select(pl.col(field).n_unique()).to_numpy()[0] or 0.0
    if ent > value:
        return df.with_columns(pl.lit(f"{field}:{check}:{value}").alias("dq_status"))
    return df.head(0)

has_infogain(df, rule)

Evaluates whether a given DataFrame satisfies an information gain condition based on a specified rule. If the condition is met, a new column indicating the rule is added; otherwise, an empty DataFrame is returned.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame to evaluate.

required
rule dict

A dictionary containing the rule parameters. It should include the following keys: - 'field': The column name to evaluate. - 'check': The type of check to perform (not used directly in this function). - 'value': The threshold value for the information gain.

required

Returns:

Type Description
DataFrame

pl.DataFrame: The original DataFrame with an additional column named "dq_status" if the condition is met, or an empty DataFrame if the condition is not met.

Source code in sumeh/engine/polars_engine.py
def has_infogain(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Evaluates whether a given DataFrame satisfies an information gain condition
    based on a specified rule. If the condition is met, a new column indicating
    the rule is added; otherwise, an empty DataFrame is returned.

    Args:
        df (pl.DataFrame): The input DataFrame to evaluate.
        rule (dict): A dictionary containing the rule parameters. It should
            include the following keys:
            - 'field': The column name to evaluate.
            - 'check': The type of check to perform (not used directly in this function).
            - 'value': The threshold value for the information gain.

    Returns:
        pl.DataFrame: The original DataFrame with an additional column named
        "dq_status" if the condition is met, or an empty DataFrame if the
        condition is not met.
    """
    field, check, value = __extract_params(rule)
    ig = df.select(pl.col(field).n_unique()).to_numpy()[0] or 0.0
    if ig > value:
        return df.with_columns(pl.lit(f"{field}:{check}:{value}").alias("dq_status"))
    return df.head(0)

has_max(df, rule)

Filters a Polars DataFrame to include only rows where the value in a specified column exceeds a given threshold, and adds a new column indicating the rule applied.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The column name to apply the filter on. - 'check' (str): The type of check being performed (e.g., "max"). - 'value' (numeric): The threshold value to compare against.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame containing only the rows that satisfy the condition, with an additional column named "dq_status" that describes the applied rule.

Source code in sumeh/engine/polars_engine.py
def has_max(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the value in a specified
    column exceeds a given threshold, and adds a new column indicating the rule applied.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The column name to apply the filter on.
            - 'check' (str): The type of check being performed (e.g., "max").
            - 'value' (numeric): The threshold value to compare against.

    Returns:
        pl.DataFrame: A new DataFrame containing only the rows that satisfy the condition,
        with an additional column named "dq_status" that describes the applied rule.
    """
    field, check, value = __extract_params(rule)
    return df.filter(pl.col(field) > value).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )
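
A usage sketch with illustrative data, flagging values above the configured maximum.

import polars as pl
from sumeh.engine.polars_engine import has_max

df = pl.DataFrame({"amount": [50, 150, 99]})
rule = {"field": "amount", "check_type": "has_max", "value": 100}

over_limit = has_max(df, rule)
# Only the row with amount 150 is returned, tagged "amount:has_max:100".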

has_mean(df, rule)

Checks if the mean value of a specified column in a Polars DataFrame satisfies a given condition.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The name of the column to calculate the mean for. - 'check' (str): The condition to check (e.g., 'greater than'). - 'value' (float): The threshold value to compare the mean against.

required

Returns:

Type Description
DataFrame

pl.DataFrame: - If the mean value of the specified column is greater than the threshold value, returns the original DataFrame with an additional column "dq_status" containing a string in the format "{field}:{check}:{value}". - If the condition is not met, returns an empty DataFrame with the same schema as the input.

Source code in sumeh/engine/polars_engine.py
def has_mean(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Checks if the mean value of a specified column in a Polars DataFrame satisfies a given condition.

    Args:
        df (pl.DataFrame): The input Polars DataFrame.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column to calculate the mean for.
            - 'check' (str): The condition to check (e.g., 'greater than').
            - 'value' (float): The threshold value to compare the mean against.

    Returns:
        pl.DataFrame:
            - If the mean value of the specified column is greater than the threshold value,
              returns the original DataFrame with an additional column "dq_status" containing
              a string in the format "{field}:{check}:{value}".
            - If the condition is not met, returns an empty DataFrame with the same schema as the input.
    """
    field, check, value = __extract_params(rule)
    mean_val = df.select(pl.col(field).mean()).to_numpy()[0] or 0.0
    if mean_val > value:
        return df.with_columns(pl.lit(f"{field}:{check}:{value}").alias("dq_status"))
    return df.head(0)

has_min(df, rule)

Filters a Polars DataFrame to include only rows where the value of a specified column is less than a given threshold and adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The name of the column to apply the filter on. - 'check': A string representing the type of check (e.g., 'min'). - 'value': The threshold value for the filter.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame containing only the rows that satisfy the condition, with an additional column named "dq_status" indicating the applied rule in the format "field:check:value".

Source code in sumeh/engine/polars_engine.py
def has_min(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the value of a specified
    column is less than a given threshold and adds a new column indicating the
    data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to apply the filter on.
            - 'check': A string representing the type of check (e.g., 'min').
            - 'value': The threshold value for the filter.

    Returns:
        pl.DataFrame: A new Polars DataFrame containing only the rows that satisfy
        the condition, with an additional column named "dq_status" indicating the
        applied rule in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    return df.filter(pl.col(field) < value).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )

has_pattern(df, rule)

Filters a Polars DataFrame based on a pattern-matching rule and adds a data quality status column.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The column name in the DataFrame to apply the pattern check. - 'check': A descriptive label for the check being performed. - 'pattern': The regex pattern to match against the column values.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame with rows not matching the pattern removed and an additional column named "dq_status" indicating the rule applied in the format "field:check:pattern".

Source code in sumeh/engine/polars_engine.py
def has_pattern(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame based on a pattern-matching rule and adds a data quality status column.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The column name in the DataFrame to apply the pattern check.
            - 'check': A descriptive label for the check being performed.
            - 'pattern': The regex pattern to match against the column values.

    Returns:
        pl.DataFrame: A new DataFrame with rows not matching the pattern removed and an additional
        column named "dq_status" indicating the rule applied in the format "field:check:pattern".
    """
    field, check, pattern = __extract_params(rule)
    return df.filter(~pl.col(field).str.contains(pattern, literal=False)).with_columns(
        pl.lit(f"{field}:{check}:{pattern}").alias("dq_status")
    )
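
A usage sketch with an illustrative regex; rows whose values do not match the pattern are returned as violations.

import polars as pl
from sumeh.engine.polars_engine import has_pattern

df = pl.DataFrame({"sku": ["ABC-123", "abc-123", "XYZ-999"]})
rule = {"field": "sku", "check_type": "has_pattern", "value": r"^[A-Z]{3}-\d{3}$"}

bad_skus = has_pattern(df, rule)
# "abc-123" does not match the pattern, so that row is returned with dq_status.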

has_std(df, rule)

Evaluates whether the standard deviation of a specified column in a Polars DataFrame exceeds a given threshold and returns a modified DataFrame accordingly.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to evaluate.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The name of the column to calculate the standard deviation for. - 'check' (str): A descriptive label for the check being performed. - 'value' (float): The threshold value for the standard deviation.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A modified DataFrame. If the standard deviation of the specified column exceeds the threshold, the DataFrame will include a new column dq_status with a descriptive string. Otherwise, an empty DataFrame with the dq_status column is returned.

Source code in sumeh/engine/polars_engine.py
def has_std(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Evaluates whether the standard deviation of a specified column in a Polars DataFrame
    exceeds a given threshold and returns a modified DataFrame accordingly.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to evaluate.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column to calculate the standard deviation for.
            - 'check' (str): A descriptive label for the check being performed.
            - 'value' (float): The threshold value for the standard deviation.

    Returns:
        pl.DataFrame: A modified DataFrame. If the standard deviation of the specified column
        exceeds the threshold, the DataFrame will include a new column `dq_status` with a
        descriptive string. Otherwise, an empty DataFrame with the `dq_status` column is returned.
    """
    field, check, value = __extract_params(rule)
    std_val = df.select(pl.col(field).std()).to_numpy()[0] or 0.0
    if std_val > value:
        return df.with_columns(pl.lit(f"{field}:{check}:{value}").alias("dq_status"))
    return df.head(0).with_columns(pl.lit("dq_status").alias("dq_status")).head(0)

has_sum(df, rule)

Checks if the sum of a specified column in a Polars DataFrame exceeds a given value.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The name of the column to sum. - 'check': A string representing the check type (not used in this function). - 'value': The threshold value to compare the sum against.

required

Returns:

Type Description
DataFrame

pl.DataFrame: If the sum of the specified column exceeds the given value, returns the original DataFrame with an additional column dq_status containing a string in the format "{field}:{check}:{value}". Otherwise, returns an empty DataFrame.

Source code in sumeh/engine/polars_engine.py
def has_sum(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Checks if the sum of a specified column in a Polars DataFrame exceeds a given value.

    Args:
        df (pl.DataFrame): The input Polars DataFrame.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to sum.
            - 'check': A string representing the check type (not used in this function).
            - 'value': The threshold value to compare the sum against.

    Returns:
        pl.DataFrame: If the sum of the specified column exceeds the given value,
        returns the original DataFrame with an additional column `dq_status` containing
        a string in the format "{field}:{check}:{value}". Otherwise, returns an empty DataFrame.
    """
    field, check, value = __extract_params(rule)
    sum_val = df.select(pl.col(field).sum()).to_numpy()[0] or 0.0
    if sum_val > value:
        return df.with_columns(pl.lit(f"{field}:{check}:{value}").alias("dq_status"))
    return df.head(0)

is_between(df, rule)

Filters a Polars DataFrame to exclude rows where the specified field's value falls within a given range, and adds a column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to filter.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The name of the column to check. - 'check': The type of check being performed (e.g., "is_between"). - 'value': A string representing the range in the format "[lo,hi]".

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame with rows outside the specified range and an additional column named "dq_status" indicating the rule applied.

Raises:

Type Description
ValueError

If the 'value' parameter is not in the expected format "[lo,hi]".

Source code in sumeh/engine/polars_engine.py
def is_between(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to exclude rows where the specified field's value
    falls within a given range, and adds a column indicating the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to check.
            - 'check': The type of check being performed (e.g., "is_between").
            - 'value': A string representing the range in the format "[lo,hi]".

    Returns:
        pl.DataFrame: A new Polars DataFrame with rows outside the specified range
        and an additional column named "dq_status" indicating the rule applied.

    Raises:
        ValueError: If the 'value' parameter is not in the expected format "[lo,hi]".
    """
    field, check, value = __extract_params(rule)
    lo, hi = value.strip("[]").split(",")
    lo, hi = __convert_value(lo), __convert_value(hi)
    return df.filter(~pl.col(field).is_between(lo, hi)).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )
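
A usage sketch with illustrative data; note the "[lo,hi]" string format expected in the rule's value.

import polars as pl
from sumeh.engine.polars_engine import is_between

df = pl.DataFrame({"age": [15, 30, 70]})
rule = {"field": "age", "check_type": "is_between", "value": "[18,65]"}

out_of_range = is_between(df, rule)
# Rows with age 15 and 70 fall outside [18, 65] and are returned with dq_status.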

is_complete(df, rule)

Filters a Polars DataFrame to include only rows where the specified field is not null and appends a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be filtered and modified.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The name of the column to check for non-null values. - 'check' (str): A descriptive string for the type of check being performed. - 'value' (str): A value associated with the rule for status annotation.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame with rows filtered based on the rule and an additional column named "dq_status" containing the data quality status.

Source code in sumeh/engine/polars_engine.py
def is_complete(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified field is not null
    and appends a new column indicating the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be filtered and modified.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column to check for non-null values.
            - 'check' (str): A descriptive string for the type of check being performed.
            - 'value' (str): A value associated with the rule for status annotation.

    Returns:
        pl.DataFrame: A new Polars DataFrame with rows filtered based on the rule and
        an additional column named "dq_status" containing the data quality status.
    """
    field, check, value = __extract_params(rule)
    return df.filter(pl.col(field).is_not_null()).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )

is_composite_key(df, rule)

Determines if the given DataFrame satisfies the composite key condition based on the provided rule.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame to evaluate.

required
rule dict

A dictionary defining the rule to check for composite key uniqueness.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A DataFrame indicating whether the composite key condition is met.

Source code in sumeh/engine/polars_engine.py
def is_composite_key(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Determines if the given DataFrame satisfies the composite key condition based on the provided rule.

    Args:
        df (pl.DataFrame): The input DataFrame to evaluate.
        rule (dict): A dictionary defining the rule to check for composite key uniqueness.

    Returns:
        pl.DataFrame: A DataFrame indicating whether the composite key condition is met.
    """
    return are_unique(df, rule)

is_contained_in(df, rule)

Filters a Polars DataFrame to exclude rows where the specified field's value is contained in a given list of values, and adds a new column indicating the rule applied.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The column name to check. - 'check': The type of check being performed (e.g., "is_contained_in"). - 'value': A string representation of a list of values to check against, e.g., "[value1, value2, value3]".

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame with rows filtered based on the rule and an additional column "dq_status" indicating the rule applied.

Source code in sumeh/engine/polars_engine.py
def is_contained_in(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to exclude rows where the specified field's value is
    contained in a given list of values, and adds a new column indicating the rule applied.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The column name to check.
            - 'check': The type of check being performed (e.g., "is_contained_in").
            - 'value': A string representation of a list of values to check against,
              e.g., "[value1, value2, value3]".

    Returns:
        pl.DataFrame: A new DataFrame with rows filtered based on the rule and an
        additional column "dq_status" indicating the rule applied.
    """
    field, check, value = __extract_params(rule)
    lst = [v.strip() for v in value.strip("[]").split(",")]
    return df.filter(~pl.col(field).is_in(lst)).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )
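
A usage sketch with illustrative data; the rule's value is a bracketed, comma-separated list of allowed values.

import polars as pl
from sumeh.engine.polars_engine import is_contained_in

df = pl.DataFrame({"status": ["open", "closed", "archived"]})
rule = {"field": "status", "check_type": "is_contained_in", "value": "[open,closed]"}

unexpected = is_contained_in(df, rule)
# "archived" is not in the allowed list, so that row is returned with dq_status.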

is_date_after(df, rule)

Filters a Polars DataFrame to include only rows where the specified date field is earlier than a given date, and adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to filter.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The name of the column containing date strings. - 'check' (str): A descriptive label for the check being performed. - 'date_str' (str): The date string in the format "%Y-%m-%d" to compare against.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame with rows filtered based on the date condition and an additional column named "dq_status" indicating the applied rule.

Source code in sumeh/engine/polars_engine.py
def is_date_after(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified date field
    is earlier than a given date, and adds a new column indicating the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column containing date strings.
            - 'check' (str): A descriptive label for the check being performed.
            - 'date_str' (str): The date string in the format "%Y-%m-%d" to compare against.

    Returns:
        pl.DataFrame: A new Polars DataFrame with rows filtered based on the date condition
        and an additional column named "dq_status" indicating the applied rule.
    """
    field, check, date_str = __extract_params(rule)
    return df.filter(
        pl.col(field).str.strptime(pl.Date, "%Y-%m-%d") < date_str
    ).with_columns(pl.lit(f"{field}:{check}:{date_str}").alias("dq_status"))

is_date_before(df, rule)

Filters a Polars DataFrame to include only rows where the specified date field is after a given date, and adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to filter.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The name of the column to check. - 'check' (str): A descriptive label for the check being performed. - 'date_str' (str): The date string in the format "%Y-%m-%d" to compare against.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame with rows filtered based on the date condition and an additional column named "dq_status" indicating the applied rule.

Source code in sumeh/engine/polars_engine.py
def is_date_before(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified date field
    is after a given date, and adds a new column indicating the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column to check.
            - 'check' (str): A descriptive label for the check being performed.
            - 'date_str' (str): The date string in the format "%Y-%m-%d" to compare against.

    Returns:
        pl.DataFrame: A new Polars DataFrame with rows filtered based on the date condition
        and an additional column named "dq_status" indicating the applied rule.
    """
    field, check, date_str = __extract_params(rule)
    return df.filter(
        pl.col(field).str.strptime(pl.Date, "%Y-%m-%d") > date_str
    ).with_columns(pl.lit(f"{field}:{check}:{date_str}").alias("dq_status"))

is_date_between(df, rule)

Filters a Polars DataFrame to exclude rows where the specified date field is within a given range.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to filter.

required
rule dict

A dictionary containing the filtering rule. It should include: - 'field': The name of the column to check. - 'check': A string representing the type of check (e.g., "is_date_between"). - 'value': A string representing the date range in the format "[YYYY-MM-DD,YYYY-MM-DD]".

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame excluding rows where the date in the specified field falls within the given inclusive range, with an additional column "dq_status" indicating the rule applied.

Source code in sumeh/engine/polars_engine.py
def is_date_between(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to exclude rows where the specified date field is within a given range.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to filter.
        rule (dict): A dictionary containing the filtering rule. It should include:
            - 'field': The name of the column to check.
            - 'check': A string representing the type of check (e.g., "is_date_between").
            - 'value': A string representing the date range in the format "[YYYY-MM-DD,YYYY-MM-DD]".

    Returns:
        pl.DataFrame: A new DataFrame excluding rows where the date in the specified field
                      falls within the given inclusive range, with an additional column
                      "dq_status" indicating the rule applied.
    """
    field, check, raw = __extract_params(rule)
    start_str, end_str = [s.strip() for s in raw.strip("[]").split(",")]

    # build literal date expressions
    start_expr = pl.lit(start_str).str.strptime(pl.Date, "%Y-%m-%d")
    end_expr = pl.lit(end_str).str.strptime(pl.Date, "%Y-%m-%d")

    return df.filter(
        ~pl.col(field)
        .str.strptime(pl.Date, "%Y-%m-%d")
        .is_between(start_expr, end_expr)
    ).with_columns(pl.lit(f"{field}:{check}:{raw}").alias("dq_status"))
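
A short usage sketch with invented data, assuming the rule's 'value' carries the inclusive range in the documented "[YYYY-MM-DD,YYYY-MM-DD]" form:

import polars as pl
from sumeh.engine.polars_engine import is_date_between

df = pl.DataFrame({"order_date": ["2023-12-31", "2024-06-15"]})
rule = {"field": "order_date", "check": "is_date_between", "value": "[2024-01-01,2024-12-31]"}

# Rows outside the inclusive 2024 range are returned, flagged in "dq_status".
out_of_range = is_date_between(df, rule)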

is_equal(df, rule)

Filters rows in a Polars DataFrame that do not match a specified equality condition and adds a column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The column name to apply the equality check on. - 'check': The type of check (expected to be 'eq' for equality). - 'value': The value to compare against.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame with rows filtered based on the rule and an additional column named "dq_status" indicating the rule applied.

Source code in sumeh/engine/polars_engine.py
def is_equal(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters rows in a Polars DataFrame that do not match a specified equality condition
    and adds a column indicating the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The column name to apply the equality check on.
            - 'check': The type of check (expected to be 'eq' for equality).
            - 'value': The value to compare against.

    Returns:
        pl.DataFrame: A new DataFrame with rows filtered based on the rule and an
        additional column named "dq_status" indicating the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(~pl.col(field).eq(value)).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )
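
For illustration, with a hypothetical column and value (assuming the rule dict uses the 'field', 'check', and 'value' keys):

import polars as pl
from sumeh.engine.polars_engine import is_equal

df = pl.DataFrame({"status": ["active", "inactive", "active"]})
rule = {"field": "status", "check": "is_equal", "value": "active"}

# The "inactive" row fails the equality check and is returned with dq_status "status:is_equal:active".
violations = is_equal(df, rule)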

is_equal_than(df, rule)

Filters rows in a Polars DataFrame where the specified field is not equal to a given value and adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The name of the column to check. - 'check': The type of check (expected to be 'equal' for this function). - 'value': The value to compare against.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame with rows filtered based on the rule and an additional column named "dq_status" indicating the applied rule.

Source code in sumeh/engine/polars_engine.py
def is_equal_than(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters rows in a Polars DataFrame where the specified field is not equal to a given value
    and adds a new column indicating the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to check.
            - 'check': The type of check (expected to be 'equal' for this function).
            - 'value': The value to compare against.

    Returns:
        pl.DataFrame: A new Polars DataFrame with rows filtered based on the rule and an
        additional column named "dq_status" indicating the applied rule.
    """
    field, check, value = __extract_params(rule)
    return df.filter(~pl.col(field).eq(value)).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )

is_future_date(df, rule)

Filters a Polars DataFrame to include only rows where the specified date field contains a future date, based on the current date.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to filter.

required
rule dict

A dictionary containing the rule parameters. It is expected to include the field name to check, the check type, and additional parameters (ignored in this function).

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame containing only rows where the specified date field is in the future. An additional column "dq_status" is added to indicate the field, check type, and today's date in the format "field:check:today".

Source code in sumeh/engine/polars_engine.py
def is_future_date(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified date field
    contains a future date, based on the current date.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It is expected
            to include the field name to check, the check type, and additional
            parameters (ignored in this function).

    Returns:
        pl.DataFrame: A new DataFrame containing only rows where the specified
        date field is in the future. An additional column "dq_status" is added
        to indicate the field, check type, and today's date in the format
        "field:check:today".
    """
    field, check, _ = __extract_params(rule)
    today = _dt.today().isoformat()
    return df.filter(
        pl.col(field).str.strptime(pl.Date, "%Y-%m-%d") > pl.lit(today).cast(pl.Date)
    ).with_columns(pl.lit(f"{field}:{check}:{today}").alias("dq_status"))
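
A quick sketch with invented data; per the strptime call above, the date column is expected to hold "%Y-%m-%d" strings:

import polars as pl
from sumeh.engine.polars_engine import is_future_date

df = pl.DataFrame({"due_date": ["2020-01-01", "2999-12-31"]})
rule = {"field": "due_date", "check": "is_future_date", "value": ""}

# Only the far-future row is returned, flagged with today's date in "dq_status".
future_rows = is_future_date(df, rule)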

is_greater_or_equal_than(df, rule)

Filters a Polars DataFrame to include only rows where the specified field is greater than or equal to a given value, and adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be filtered.

required
rule dict

A dictionary containing the filtering rule. It should include the following keys: - 'field': The name of the column to be checked. - 'check': The type of check being performed (e.g., "greater_or_equal"). - 'value': The threshold value for the comparison.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame with rows filtered based on the specified rule and an additional column named "dq_status" indicating the data quality status in the format "field:check:value".

Source code in sumeh/engine/polars_engine.py
def is_greater_or_equal_than(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified field
    is greater than or equal to a given value, and adds a new column indicating
    the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be filtered.
        rule (dict): A dictionary containing the filtering rule. It should
            include the following keys:
            - 'field': The name of the column to be checked.
            - 'check': The type of check being performed (e.g., "greater_or_equal").
            - 'value': The threshold value for the comparison.

    Returns:
        pl.DataFrame: A new Polars DataFrame with rows filtered based on the
        specified rule and an additional column named "dq_status" indicating
        the data quality status in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    return df.filter(pl.col(field) < value).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )

is_greater_than(df, rule)

Filters a Polars DataFrame to include only rows where the specified field's value is less than or equal to a given value, and adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The Polars DataFrame to filter.

required
rule dict

A dictionary containing the filtering rule. It should include: - 'field': The name of the column to apply the filter on. - 'check': A string describing the check (e.g., "greater_than"). - 'value': The value to compare against.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame with rows filtered based on the rule and an additional column named "dq_status" indicating the applied rule.

Source code in sumeh/engine/polars_engine.py
def is_greater_than(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified field's value
    is less than or equal to a given value, and adds a new column indicating the
    data quality status.

    Args:
        df (pl.DataFrame): The Polars DataFrame to filter.
        rule (dict): A dictionary containing the filtering rule. It should include:
            - 'field': The name of the column to apply the filter on.
            - 'check': A string describing the check (e.g., "greater_than").
            - 'value': The value to compare against.

    Returns:
        pl.DataFrame: A new DataFrame with rows filtered based on the rule and an
        additional column named "dq_status" indicating the applied rule.
    """
    field, check, value = __extract_params(rule)
    return df.filter(pl.col(field) <= value).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )
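
An illustrative call with invented data, assuming a numeric 'value' so the comparison against the column works:

import polars as pl
from sumeh.engine.polars_engine import is_greater_than

df = pl.DataFrame({"quantity": [5, 150, 200]})
rule = {"field": "quantity", "check": "is_greater_than", "value": 100}

# Rows with quantity <= 100 violate the check and are returned flagged in "dq_status".
violations = is_greater_than(df, rule)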

is_in(df, rule)

Checks if the rows in the given DataFrame satisfy the conditions specified in the rule.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame to evaluate.

required
rule dict

A dictionary specifying the conditions to check against the DataFrame.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A DataFrame containing rows that satisfy the specified conditions.

Source code in sumeh/engine/polars_engine.py
def is_in(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Checks if the rows in the given DataFrame satisfy the conditions specified in the rule.

    Args:
        df (pl.DataFrame): The input DataFrame to evaluate.
        rule (dict): A dictionary specifying the conditions to check against the DataFrame.

    Returns:
        pl.DataFrame: A DataFrame containing rows that satisfy the specified conditions.
    """
    return is_contained_in(df, rule)

is_in_billions(df, rule)

Filters a Polars DataFrame to include only rows where the specified field's value is less than one billion and adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - field (str): The name of the column to check. - check (str): The type of check being performed (e.g., "less_than"). - value (any): The value associated with the rule (not used in this function).

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame with rows filtered based on the rule and an additional column named "dq_status" containing a string in the format "{field}:{check}:{value}".

Source code in sumeh/engine/polars_engine.py
def is_in_billions(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified field's value
    is less than one billion and adds a new column indicating the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - field (str): The name of the column to check.
            - check (str): The type of check being performed (e.g., "less_than").
            - value (any): The value associated with the rule (not used in this function).

    Returns:
        pl.DataFrame: A new DataFrame with rows filtered based on the rule and an
        additional column named "dq_status" containing a string in the format
        "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    return df.filter(pl.col(field) < 1_000_000_000).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )
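
A small sketch with invented figures; per the filter above, rows below one billion are the ones reported:

import polars as pl
from sumeh.engine.polars_engine import is_in_billions

df = pl.DataFrame({"revenue": [2_500_000_000, 750_000_000]})
rule = {"field": "revenue", "check": "is_in_billions", "value": ""}

# The 750M row is below the threshold, so it is returned with a "dq_status" flag.
flagged = is_in_billions(df, rule)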

is_in_millions(df, rule)

Filters a Polars DataFrame to include only rows where the specified field's value is less than one million and adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The name of the column to check. - 'check': A string describing the check being performed. - 'value': A value associated with the rule (used for status annotation).

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame with rows filtered based on the rule and an additional column named "dq_status" containing the data quality status.

Source code in sumeh/engine/polars_engine.py
def is_in_millions(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified field's value
    is less than one million and adds a new column indicating the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to check.
            - 'check': A string describing the check being performed.
            - 'value': A value associated with the rule (used for status annotation).

    Returns:
        pl.DataFrame: A new Polars DataFrame with rows filtered based on the rule and
        an additional column named "dq_status" containing the data quality status.
    """
    field, check, value = __extract_params(rule)

    return df.filter(pl.col(field) < 1_000_000).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )

is_legit(df, rule)

Filters a Polars DataFrame based on a validation rule and appends a data quality status column.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to validate.

required
rule dict

A dictionary containing the validation rule. It should include: - 'field': The name of the column to validate. - 'check': The type of validation check (e.g., regex, condition). - 'value': The value or pattern to validate against.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame containing rows that failed the validation, with an additional column 'dq_status' indicating the validation rule applied.

Source code in sumeh/engine/polars_engine.py
def is_legit(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame based on a validation rule and appends a data quality status column.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to validate.
        rule (dict): A dictionary containing the validation rule. It should include:
            - 'field': The name of the column to validate.
            - 'check': The type of validation check (e.g., regex, condition).
            - 'value': The value or pattern to validate against.

    Returns:
        pl.DataFrame: A new DataFrame containing rows that failed the validation,
        with an additional column 'dq_status' indicating the validation rule applied.
    """
    field, check, value = __extract_params(rule)
    mask = pl.col(field).is_not_null() & pl.col(field).str.contains(r"^\S+$")
    return df.filter(~mask).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )
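
Example of what fails the mask above, with invented values: nulls, empty strings, and values containing whitespace are returned as violations.

import polars as pl
from sumeh.engine.polars_engine import is_legit

df = pl.DataFrame({"code": ["ABC123", None, "has space", ""]})
rule = {"field": "code", "check": "is_legit", "value": ""}

# Returns the null, the whitespace-containing value, and the empty string, each flagged in "dq_status".
violations = is_legit(df, rule)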

is_less_or_equal_than(df, rule)

Filters a Polars DataFrame to include only rows where the specified field's value is greater than the given value, and adds a new column indicating the rule applied.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The name of the column to apply the filter on. - 'check': The type of check being performed (e.g., 'less_or_equal_than'). - 'value': The value to compare against.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame with rows filtered based on the rule and an additional column named "dq_status" indicating the rule applied.

Source code in sumeh/engine/polars_engine.py
def is_less_or_equal_than(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified field's value
    is greater than the given value, and adds a new column indicating the rule applied.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to apply the filter on.
            - 'check': The type of check being performed (e.g., 'less_or_equal_than').
            - 'value': The value to compare against.

    Returns:
        pl.DataFrame: A new DataFrame with rows filtered based on the rule and an
        additional column named "dq_status" indicating the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(pl.col(field) > value).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )

is_less_than(df, rule)

Filters a Polars DataFrame to include only rows where the specified field is greater than or equal to a given value. Adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to filter.

required
rule dict

A dictionary containing the filtering rule. It should include the following keys: - 'field': The name of the column to apply the filter on. - 'check': A string representing the type of check (not used in logic). - 'value': The threshold value for the filter.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame with rows filtered based on the condition and an additional column named "dq_status" containing the rule description in the format "field:check:value".

Source code in sumeh/engine/polars_engine.py
def is_less_than(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified field
    is greater than or equal to a given value. Adds a new column indicating
    the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to filter.
        rule (dict): A dictionary containing the filtering rule. It should
            include the following keys:
            - 'field': The name of the column to apply the filter on.
            - 'check': A string representing the type of check (not used in logic).
            - 'value': The threshold value for the filter.

    Returns:
        pl.DataFrame: A new Polars DataFrame with rows filtered based on the
        condition and an additional column named "dq_status" containing the
        rule description in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    return df.filter(pl.col(field) >= value).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )

is_negative(df, rule)

Filters a Polars DataFrame to exclude rows where the specified field is negative and adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The name of the column to check. - 'check': The type of check being performed (e.g., "is_negative"). - 'value': The value associated with the rule (not used in this function).

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame with rows where the specified field is non-negative and an additional column named "dq_status" containing the rule details.

Source code in sumeh/engine/polars_engine.py
def is_negative(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to exclude rows where the specified field is negative
    and adds a new column indicating the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to check.
            - 'check': The type of check being performed (e.g., "is_negative").
            - 'value': The value associated with the rule (not used in this function).

    Returns:
        pl.DataFrame: A new DataFrame with rows where the specified field is non-negative
        and an additional column named "dq_status" containing the rule details.
    """
    field, check, value = __extract_params(rule)
    return df.filter(pl.col(field) >= 0).with_columns(
        [pl.lit(f"{field}:{check}:{value}").alias("dq_status")]
    )
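
A worked example with invented values; per the filter above, rows greater than or equal to zero are the ones flagged:

import polars as pl
from sumeh.engine.polars_engine import is_negative

df = pl.DataFrame({"delta": [-5, 0, 3]})
rule = {"field": "delta", "check": "is_negative", "value": ""}

# Returns the rows with delta 0 and 3, i.e. the values that are not negative, flagged in "dq_status".
violations = is_negative(df, rule)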

is_on_friday(df, rule)

Filters a Polars DataFrame to include only rows where the date corresponds to a Friday.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame containing the data to filter.

required
rule dict

A dictionary containing filtering rules or parameters.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame containing only the rows where the date is a Friday.

Source code in sumeh/engine/polars_engine.py
def is_on_friday(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the date corresponds to a Friday.

    Args:
        df (pl.DataFrame): The input Polars DataFrame containing the data to filter.
        rule (dict): A dictionary containing filtering rules or parameters.

    Returns:
        pl.DataFrame: A new Polars DataFrame containing only the rows where the date is a Friday.
    """
    return _day_of_week(df, rule, 4)

is_on_monday(df, rule)

Filters the given DataFrame to include only rows where the date corresponds to a Monday.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame to filter.

required
rule dict

A dictionary containing rules or parameters for filtering.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame containing only the rows where the date is a Monday.

Source code in sumeh/engine/polars_engine.py
def is_on_monday(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters the given DataFrame to include only rows where the date corresponds to a Monday.

    Args:
        df (pl.DataFrame): The input DataFrame to filter.
        rule (dict): A dictionary containing rules or parameters for filtering.

    Returns:
        pl.DataFrame: A new DataFrame containing only the rows where the date is a Monday.
    """
    return _day_of_week(df, rule, 0)

is_on_saturday(df, rule)

Determines if the dates in the given DataFrame fall on a Saturday.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame containing date information.

required
rule dict

A dictionary containing rules or parameters for the operation.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A DataFrame with the result of the operation, indicating whether each date is on a Saturday.

Source code in sumeh/engine/polars_engine.py
def is_on_saturday(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Determines if the dates in the given DataFrame fall on a Saturday.

    Args:
        df (pl.DataFrame): The input DataFrame containing date information.
        rule (dict): A dictionary containing rules or parameters for the operation.

    Returns:
        pl.DataFrame: A DataFrame with the result of the operation, indicating whether each date is on a Saturday.
    """
    return _day_of_week(df, rule, 5)

is_on_sunday(df, rule)

Filters the given DataFrame to include only rows where the date corresponds to Sunday.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame containing date-related data.

required
rule dict

A dictionary containing rules or parameters for filtering.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A filtered DataFrame containing only rows where the date is a Sunday.

Source code in sumeh/engine/polars_engine.py
def is_on_sunday(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters the given DataFrame to include only rows where the date corresponds to Sunday.

    Args:
        df (pl.DataFrame): The input DataFrame containing date-related data.
        rule (dict): A dictionary containing rules or parameters for filtering.

    Returns:
        pl.DataFrame: A filtered DataFrame containing only rows where the date is a Sunday.
    """
    return _day_of_week(df, rule, 6)

is_on_thursday(df, rule)

Filters a Polars DataFrame to include only rows where the date corresponds to a Thursday.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame containing the data to filter.

required
rule dict

A dictionary containing filtering rules or parameters.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame containing only the rows where the date is a Thursday.

Source code in sumeh/engine/polars_engine.py
def is_on_thursday(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the date corresponds to a Thursday.

    Args:
        df (pl.DataFrame): The input Polars DataFrame containing the data to filter.
        rule (dict): A dictionary containing filtering rules or parameters.

    Returns:
        pl.DataFrame: A new Polars DataFrame containing only the rows where the date is a Thursday.
    """
    return _day_of_week(df, rule, 3)

is_on_tuesday(df, rule)

Filters the given DataFrame to include only rows where the day of the week matches Tuesday.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame to filter.

required
rule dict

A dictionary containing rules or parameters for filtering.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame containing only rows where the day of the week is Tuesday.

Source code in sumeh/engine/polars_engine.py
def is_on_tuesday(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters the given DataFrame to include only rows where the day of the week matches Tuesday.

    Args:
        df (pl.DataFrame): The input DataFrame to filter.
        rule (dict): A dictionary containing rules or parameters for filtering.

    Returns:
        pl.DataFrame: A new DataFrame containing only rows where the day of the week is Tuesday.
    """
    return _day_of_week(df, rule, 1)

is_on_wednesday(df, rule)

Filters the given DataFrame to include only rows where the day of the week matches Wednesday.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame to filter.

required
rule dict

A dictionary containing rules or parameters for filtering.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A filtered DataFrame containing only rows corresponding to Wednesday.

Source code in sumeh/engine/polars_engine.py
def is_on_wednesday(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters the given DataFrame to include only rows where the day of the week matches Wednesday.

    Args:
        df (pl.DataFrame): The input DataFrame to filter.
        rule (dict): A dictionary containing rules or parameters for filtering.

    Returns:
        pl.DataFrame: A filtered DataFrame containing only rows corresponding to Wednesday.
    """
    return _day_of_week(df, rule, 2)

is_on_weekday(df, rule)

Filters a Polars DataFrame to include only rows where the specified date field falls on a weekday (Monday to Friday). Adds a new column indicating the rule applied.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame.

required
rule dict

A dictionary containing the rule parameters. It is expected to have keys that can be extracted using the __extract_params function.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame filtered to include only rows where the date field falls on a weekday, with an additional column named "dq_status" indicating the applied rule in the format "field:check:value".

Source code in sumeh/engine/polars_engine.py
def is_on_weekday(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified date field
    falls on a weekday (Monday to Friday). Adds a new column indicating the rule applied.

    Args:
        df (pl.DataFrame): The input Polars DataFrame.
        rule (dict): A dictionary containing the rule parameters. It is expected to have
                     keys that can be extracted using the `__extract_params` function.

    Returns:
        pl.DataFrame: A new DataFrame filtered to include only rows where the date field
                      falls on a weekday, with an additional column named "dq_status"
                      indicating the applied rule in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    return df.filter(
        pl.col(field).str.strptime(pl.Date, "%Y-%m-%d").dt.weekday() < 5
    ).with_columns(pl.lit(f"{field}:{check}:{value}").alias("dq_status"))
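
Usage sketch with hypothetical "%Y-%m-%d" dates; rows are kept when the parsed date satisfies dt.weekday() < 5, per the expression above:

import polars as pl
from sumeh.engine.polars_engine import is_on_weekday

df = pl.DataFrame({"event_date": ["2024-06-03", "2024-06-08"]})  # a Monday and a Saturday
rule = {"field": "event_date", "check": "is_on_weekday", "value": ""}

# Keeps and flags the Monday row; the Saturday row is dropped.
result = is_on_weekday(df, rule)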

is_on_weekend(df, rule)

Filters a Polars DataFrame to include only rows where the specified date field falls on a weekend (Saturday or Sunday). Adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame.

required
rule dict

A dictionary containing the rule parameters. It is expected to include the following keys: - 'field': The name of the column containing date strings. - 'check': A string representing the type of check being performed. - 'value': A value associated with the rule (not used in the logic).

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame filtered to include only rows where the specified date field falls on a weekend. The resulting DataFrame also includes an additional column named "dq_status" with a string indicating the rule applied.

Source code in sumeh/engine/polars_engine.py
def is_on_weekend(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified date field
    falls on a weekend (Saturday or Sunday). Adds a new column indicating the
    data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame.
        rule (dict): A dictionary containing the rule parameters. It is expected
            to include the following keys:
            - 'field': The name of the column containing date strings.
            - 'check': A string representing the type of check being performed.
            - 'value': A value associated with the rule (not used in the logic).

    Returns:
        pl.DataFrame: A new Polars DataFrame filtered to include only rows where
        the specified date field falls on a weekend. The resulting DataFrame also
        includes an additional column named "dq_status" with a string indicating
        the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(
        pl.col(field).str.strptime(pl.Date, "%Y-%m-%d").dt.weekday() >= 5
    ).with_columns(pl.lit(f"{field}:{check}:{value}").alias("dq_status"))

is_past_date(df, rule)

Filters a Polars DataFrame to include only rows where the specified date field contains a date earlier than today. Adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to filter.

required
rule dict

A dictionary containing the rule parameters. It is expected to include the field name to check, a check identifier, and additional parameters.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame containing only rows where the specified date field is in the past, with an additional column named "dq_status" that contains a string in the format "{field}:{check}:{today}".

Source code in sumeh/engine/polars_engine.py
def is_past_date(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified date field
    contains a date earlier than today. Adds a new column indicating the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It is expected to include
                     the field name to check, a check identifier, and additional parameters.

    Returns:
        pl.DataFrame: A new DataFrame containing only rows where the specified date field
                      is in the past, with an additional column named "dq_status" that
                      contains a string in the format "{field}:{check}:{today}".
    """
    field, check, _ = __extract_params(rule)
    today = _dt.today().isoformat()
    return df.filter(
        pl.col(field).str.strptime(pl.Date, "%Y-%m-%d") < pl.lit(today).cast(pl.Date)
    ).with_columns(pl.lit(f"{field}:{check}:{today}").alias("dq_status"))

is_positive(df, rule)

Filters a Polars DataFrame to identify rows where the specified field contains negative values and appends a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It is expected to include the following keys: - 'field': The name of the column to check. - 'check': The type of check being performed (e.g., "is_positive"). - 'value': The reference value for the check.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame containing only the rows where the specified field has negative values, with an additional column named "dq_status" that describes the rule applied.

Source code in sumeh/engine/polars_engine.py
def is_positive(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to identify rows where the specified field
    contains negative values and appends a new column indicating the data
    quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It is
            expected to include the following keys:
            - 'field': The name of the column to check.
            - 'check': The type of check being performed (e.g., "is_positive").
            - 'value': The reference value for the check.

    Returns:
        pl.DataFrame: A new Polars DataFrame containing only the rows where
        the specified field has negative values, with an additional column
        named "dq_status" that describes the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(pl.col(field) < 0).with_columns(
        [pl.lit(f"{field}:{check}:{value}").alias("dq_status")]
    )

is_primary_key(df, rule)

Checks if the specified rule identifies a primary key in the given DataFrame.

A primary key is a set of columns in a DataFrame that uniquely identifies each row. This function delegates the check to the is_unique function.

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to check for primary key uniqueness.

required
rule dict

A dictionary specifying the rule or criteria to determine the primary key.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A DataFrame indicating whether the rule satisfies the primary key condition.

Source code in sumeh/engine/polars_engine.py
def is_primary_key(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Checks if the specified rule identifies a primary key in the given DataFrame.

    A primary key is a set of columns in a DataFrame that uniquely identifies each row.
    This function delegates the check to the `is_unique` function.

    Args:
        df (pl.DataFrame): The DataFrame to check for primary key uniqueness.
        rule (dict): A dictionary specifying the rule or criteria to determine the primary key.

    Returns:
        pl.DataFrame: A DataFrame indicating whether the rule satisfies the primary key condition.
    """
    return is_unique(df, rule)

is_t_minus_1(df, rule)

Filters a Polars DataFrame to include only rows where the specified field matches the date of "yesterday" (T-1) and appends a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to filter.

required
rule dict

A dictionary containing the rule parameters. It is expected to include the following keys: - 'field': The name of the column to check. - 'check': A string representing the type of check (used for metadata). - 'value': A value associated with the check (used for metadata).

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame filtered to include only rows where the specified field matches the date of yesterday (T-1). The resulting DataFrame also includes an additional column named "dq_status" that contains metadata about the rule applied.

Source code in sumeh/engine/polars_engine.py
def is_t_minus_1(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified field
    matches the date of "yesterday" (T-1) and appends a new column indicating
    the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It is expected
            to include the following keys:
            - 'field': The name of the column to check.
            - 'check': A string representing the type of check (used for metadata).
            - 'value': A value associated with the check (used for metadata).

    Returns:
        pl.DataFrame: A new Polars DataFrame filtered to include only rows where
        the specified field matches the date of yesterday (T-1). The resulting
        DataFrame also includes an additional column named "dq_status" that
        contains metadata about the rule applied.
    """
    field, check, value = __extract_params(rule)
    target = (_dt.today() - timedelta(days=1)).isoformat()
    return df.filter(
        pl.col(field).str.strptime(pl.Date, "%Y-%m-%d") == pl.lit(target).cast(pl.Date)
    ).with_columns(pl.lit(f"{field}:{check}:{value}").alias("dq_status"))
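
A minimal sketch with constructed data; the yesterday value is computed at run time to match the function's own comparison against T-1:

import polars as pl
from datetime import date, timedelta
from sumeh.engine.polars_engine import is_t_minus_1

yesterday = (date.today() - timedelta(days=1)).isoformat()
df = pl.DataFrame({"load_date": [yesterday, "2020-01-01"]})
rule = {"field": "load_date", "check": "is_t_minus_1", "value": ""}

# Only the row dated yesterday is retained, flagged in "dq_status".
result = is_t_minus_1(df, rule)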

is_t_minus_2(df, rule)

Filters a Polars DataFrame to include only rows where the specified date field matches the date two days prior to the current date. Adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to filter.

required
rule dict

A dictionary containing the rule parameters. It is expected to include the following keys: - 'field': The name of the date field to check. - 'check': A string representing the type of check (not used in filtering). - 'value': A value associated with the rule (not used in filtering).

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame filtered to include only rows where the specified date field matches the date two days ago. The resulting DataFrame includes an additional column named "dq_status" with a string indicating the rule applied.

Source code in sumeh/engine/polars_engine.py
def is_t_minus_2(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified date field
    matches the date two days prior to the current date. Adds a new column indicating
    the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It is expected to
            include the following keys:
            - 'field': The name of the date field to check.
            - 'check': A string representing the type of check (not used in filtering).
            - 'value': A value associated with the rule (not used in filtering).

    Returns:
        pl.DataFrame: A new Polars DataFrame filtered to include only rows where the
        specified date field matches the date two days ago. The resulting DataFrame
        includes an additional column named "dq_status" with a string indicating the
        rule applied.
    """
    field, check, value = __extract_params(rule)
    target = (_dt.today() - timedelta(days=2)).isoformat()
    return df.filter(
        pl.col(field).str.strptime(pl.Date, "%Y-%m-%d") == pl.lit(target).cast(pl.Date)
    ).with_columns(pl.lit(f"{field}:{check}:{value}").alias("dq_status"))

is_t_minus_3(df, rule)

Filters a Polars DataFrame to include only rows where the specified date field matches the date three days prior to the current date. Additionally, adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to filter.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The name of the date column to check. - 'check': A string representing the type of check (used for status annotation). - 'value': A value associated with the rule (used for status annotation).

required

Returns:

Type Description
DataFrame

pl.DataFrame: A filtered Polars DataFrame with an additional column named "dq_status" that contains a string in the format "{field}:{check}:{value}".

Source code in sumeh/engine/polars_engine.py
def is_t_minus_3(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified date field
    matches the date three days prior to the current date. Additionally, adds a
    new column indicating the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the date column to check.
            - 'check': A string representing the type of check (used for status annotation).
            - 'value': A value associated with the rule (used for status annotation).

    Returns:
        pl.DataFrame: A filtered Polars DataFrame with an additional column named
        "dq_status" that contains a string in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    target = (_dt.today() - timedelta(days=3)).isoformat()
    return df.filter(
        pl.col(field).str.strptime(pl.Date, "%Y-%m-%d") == pl.lit(target).cast(pl.Date)
    ).with_columns(pl.lit(f"{field}:{check}:{value}").alias("dq_status"))

is_today(df, rule)

Filters a Polars DataFrame to include only rows where the specified date field matches today's date. Additionally, adds a new column "dq_status" with a formatted string indicating the rule applied.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to filter.

required
rule dict

A dictionary containing the rule parameters. It is expected to have the following keys: - field (str): The name of the column to check. - check (str): A descriptive string for the type of check (used in the "dq_status" column). - value (str): A value associated with the rule (used in the "dq_status" column).

required

Returns:

Type Description
DataFrame

pl.DataFrame: A filtered Polars DataFrame with rows matching today's date in the specified field and an additional "dq_status" column describing the rule applied.

Raises:

Type Description
ValueError

If the rule dictionary does not contain the required keys or if the date parsing fails.

Source code in sumeh/engine/polars_engine.py
def is_today(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified date field matches today's date.
    Additionally, adds a new column "dq_status" with a formatted string indicating the rule applied.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It is expected to have the following keys:
            - field (str): The name of the column to check.
            - check (str): A descriptive string for the type of check (used in the "dq_status" column).
            - value (str): A value associated with the rule (used in the "dq_status" column).

    Returns:
        pl.DataFrame: A filtered Polars DataFrame with rows matching today's date in the specified field
        and an additional "dq_status" column describing the rule applied.

    Raises:
        ValueError: If the rule dictionary does not contain the required keys or if the date parsing fails.
    """
    field, check, value = __extract_params(rule)
    today = _dt.today().isoformat()
    return df.filter(
        pl.col(field).str.strptime(pl.Date, "%Y-%m-%d") == pl.lit(today).cast(pl.Date)
    ).with_columns(pl.lit(f"{field}:{check}:{value}").alias("dq_status"))

is_unique(df, rule)

Checks for duplicate values in a specified field of a Polars DataFrame and returns a filtered DataFrame containing only the rows with duplicate values. Additionally, it adds a new column 'dq_status' with a formatted string indicating the field, check type, and value.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to check for duplicates.

required
rule dict

A dictionary containing the rule parameters. It is expected to have keys that allow extraction of the field to check, the type of check, and a value.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A filtered DataFrame containing rows with duplicate values in the specified field, along with an additional column 'dq_status' describing the rule applied.

Source code in sumeh/engine/polars_engine.py
def is_unique(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Checks for duplicate values in a specified field of a Polars DataFrame and
    returns a filtered DataFrame containing only the rows with duplicate values.
    Additionally, it adds a new column 'dq_status' with a formatted string
    indicating the field, check type, and value.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to check for duplicates.
        rule (dict): A dictionary containing the rule parameters. It is expected
                     to have keys that allow extraction of the field to check,
                     the type of check, and a value.

    Returns:
        pl.DataFrame: A filtered DataFrame containing rows with duplicate values
                      in the specified field, along with an additional column
                      'dq_status' describing the rule applied.
    """
    field, check, value = __extract_params(rule)
    dup_vals = (
        df.group_by(field)
        .agg(pl.len().alias("cnt"))
        .filter(pl.col("cnt") > 1)
        .select(field)
        .to_series()
        .to_list()
    )
    return df.filter(pl.col(field).is_in(dup_vals)).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )
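
An illustration with duplicated keys (hypothetical data); both copies of the duplicated value come back flagged:

import polars as pl
from sumeh.engine.polars_engine import is_unique

df = pl.DataFrame({"id": [1, 2, 2, 3]})
rule = {"field": "id", "check": "is_unique", "value": ""}

# Returns the two rows with id == 2, each flagged in "dq_status".
duplicates = is_unique(df, rule)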

not_contained_in(df, rule)

Filters a Polars DataFrame to include only rows where the specified field's value is in a given list, and adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to filter.

required
rule dict

A dictionary containing the filtering rule. It should include: - 'field': The column name to apply the filter on. - 'check': A string representing the type of check (not used in logic). - 'value': A string representation of a list of values (e.g., "[value1, value2]").

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new Polars DataFrame with rows filtered based on the rule and an additional column "dq_status" indicating the applied rule.

Source code in sumeh/engine/polars_engine.py
def not_contained_in(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame to include only rows where the specified field's value
    is in a given list, and adds a new column indicating the data quality status.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to filter.
        rule (dict): A dictionary containing the filtering rule. It should include:
            - 'field': The column name to apply the filter on.
            - 'check': A string representing the type of check (not used in logic).
            - 'value': A string representation of a list of values (e.g., "[value1, value2]").

    Returns:
        pl.DataFrame: A new Polars DataFrame with rows filtered based on the rule and
        an additional column "dq_status" indicating the applied rule.
    """
    field, check, value = __extract_params(rule)
    lst = [v.strip() for v in value.strip("[]").split(",")]
    return df.filter(pl.col(field).is_in(lst)).with_columns(
        pl.lit(f"{field}:{check}:{value}").alias("dq_status")
    )
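
Usage sketch with an invented deny-list; note the value is the bracketed string form that the function parses itself:

import polars as pl
from sumeh.engine.polars_engine import not_contained_in

df = pl.DataFrame({"status": ["active", "banned", "inactive"]})
rule = {"field": "status", "check": "not_contained_in", "value": "[banned, inactive]"}

# Rows whose status appears in the forbidden list are returned flagged in "dq_status".
violations = not_contained_in(df, rule)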

not_in(df, rule)

Filters a Polars DataFrame by excluding rows where the specified rule applies.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to filter.

required
rule dict

A dictionary specifying the filtering rule. The structure and expected keys of this dictionary depend on the implementation of the not_contained_in function.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame with rows excluded based on the given rule.

Source code in sumeh/engine/polars_engine.py
def not_in(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Filters a Polars DataFrame by excluding rows where the specified rule applies.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to filter.
        rule (dict): A dictionary specifying the filtering rule. The structure and
            expected keys of this dictionary depend on the implementation of the
            `not_contained_in` function.

    Returns:
        pl.DataFrame: A new DataFrame with rows excluded based on the given rule.
    """
    return not_contained_in(df, rule)

satisfies(df, rule)

Evaluates a given rule against a Polars DataFrame and returns rows that do not satisfy the rule.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to be evaluated.

required
rule dict

A dictionary containing the rule to be applied. The rule should include the following keys: - 'field': The column name in the DataFrame to be checked. - 'check': The type of check or condition to be applied. - 'value': The value or expression to validate against.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A DataFrame containing rows that do not satisfy the rule, with an additional column dq_status indicating the rule that was violated in the format "field:check:value".

Example

rule = {"field": "age", "check": ">", "value": "18"}
result = satisfies(df, rule)

Source code in sumeh/engine/polars_engine.py
def satisfies(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Evaluates a given rule against a Polars DataFrame and returns rows that do not satisfy the rule.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to be evaluated.
        rule (dict): A dictionary containing the rule to be applied. The rule should include
                     the following keys:
                     - 'field': The column name in the DataFrame to be checked.
                     - 'check': The type of check or condition to be applied.
                     - 'value': The value or expression to validate against.

    Returns:
        pl.DataFrame: A DataFrame containing rows that do not satisfy the rule, with an additional
                      column `dq_status` indicating the rule that was violated in the format
                      "field:check:value".

    Example:
        rule = {"field": "age", "check": ">", "value": "18"}
        result = satisfies(df, rule)
    """
    field, check, value = __extract_params(rule)
    ctx = pl.SQLContext(sumeh=df)
    viol = ctx.execute(
        f"""
        SELECT *
        FROM sumeh
        WHERE NOT ({value})
        """,
        eager=True,
    )
    return viol.with_columns(pl.lit(f"{field}:{check}:{value}").alias("dq_status"))
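
Because the rule's value is interpolated into the SQL predicate (WHERE NOT (...)), the value should be a full boolean expression over the DataFrame's columns. A hedged sketch with invented data:

import polars as pl
from sumeh.engine.polars_engine import satisfies

df = pl.DataFrame({"age": [25, 15, 40]})
rule = {"field": "age", "check": "satisfies", "value": "age > 18"}

# Returns the row with age 15, the only one that does not satisfy the predicate, flagged in "dq_status".
violations = satisfies(df, rule)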

summarize(qc_df, rules, total_rows)

Summarizes quality check results by processing a DataFrame containing data quality statuses and comparing them against defined rules.

Parameters:

Name Type Description Default
qc_df DataFrame

A Polars DataFrame containing a column dq_status with semicolon-separated strings representing data quality statuses in the format "column:rule:value".

required
rules list[dict]

A list of dictionaries where each dictionary defines a rule with keys such as "column", "rule", "value", and "pass_threshold".

required
total_rows int

The total number of rows in the original dataset, used to calculate the pass rate.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A summarized DataFrame containing the following columns: - id: A unique identifier for each rule. - timestamp: The timestamp when the summary was generated. - check: A label indicating the type of check (e.g., "Quality Check"). - level: The severity level of the check (e.g., "WARNING"). - column: The column name associated with the rule. - rule: The rule being evaluated. - value: The specific value associated with the rule. - rows: The total number of rows in the dataset. - violations: The number of rows that violated the rule. - pass_rate: The proportion of rows that passed the rule. - pass_threshold: The threshold for passing the rule. - status: The status of the rule evaluation ("PASS" or "FAIL").

Source code in sumeh/engine/polars_engine.py
def summarize(qc_df: pl.DataFrame, rules: list[dict], total_rows: int) -> pl.DataFrame:
    """
    Summarizes quality check results by processing a DataFrame containing
    data quality statuses and comparing them against defined rules.

    Args:
        qc_df (pl.DataFrame): A Polars DataFrame containing a column `dq_status`
            with semicolon-separated strings representing data quality statuses
            in the format "column:rule:value".
        rules (list[dict]): A list of dictionaries where each dictionary defines
            a rule with keys such as "column", "rule", "value", and "pass_threshold".
        total_rows (int): The total number of rows in the original dataset, used
            to calculate the pass rate.

    Returns:
        pl.DataFrame: A summarized DataFrame containing the following columns:
            - id: A unique identifier for each rule.
            - timestamp: The timestamp when the summary was generated.
            - check: A label indicating the type of check (e.g., "Quality Check").
            - level: The severity level of the check (e.g., "WARNING").
            - column: The column name associated with the rule.
            - rule: The rule being evaluated.
            - value: The specific value associated with the rule.
            - rows: The total number of rows in the dataset.
            - violations: The number of rows that violated the rule.
            - pass_rate: The proportion of rows that passed the rule.
            - pass_threshold: The threshold for passing the rule.
            - status: The status of the rule evaluation ("PASS" or "FAIL").
    """
    exploded = (
        qc_df.select(
            pl.col("dq_status").str.split(";").list.explode().alias("dq_status")
        )
        .filter(pl.col("dq_status") != "")
        .with_columns(
            [
                pl.col("dq_status").str.split(":").list.get(0).alias("column"),
                pl.col("dq_status").str.split(":").list.get(1).alias("rule"),
                pl.col("dq_status").str.split(":").list.get(2).alias("value"),
            ]
        )
    ).drop("dq_status")
    viol_count = exploded.group_by(["column", "rule", "value"]).agg(
        pl.len().alias("violations")
    )

    rules_df = __build_rules_df(rules)

    viol_count2 = viol_count.with_columns(pl.col("value").fill_null("").alias("value"))

    step1 = rules_df.join(
        viol_count2,
        on=["column", "rule", "value"],
        how="left",
    )

    step2 = step1.with_columns([pl.col("violations").fill_null(0).alias("violations")])

    step3 = step2.with_columns(
        [
            ((pl.lit(total_rows) - pl.col("violations")) / pl.lit(total_rows)).alias(
                "pass_rate"
            )
        ]
    )

    now = datetime.now().replace(second=0, microsecond=0)
    step4 = step3.with_columns(
        [
            pl.lit(total_rows).alias("rows"),
            pl.when(pl.col("pass_rate") >= pl.col("pass_threshold"))
            .then(pl.lit("PASS"))
            .otherwise(pl.lit("FAIL"))
            .alias("status"),
            pl.lit(now).alias("timestamp"),
            pl.lit("Quality Check").alias("check"),
            pl.lit("WARNING").alias("level"),
        ]
    )

    uuids = np.array([uuid.uuid4() for _ in range(len(step4))], dtype="object")

    summary = step4.with_columns(pl.Series(uuids).alias("id")).select(
        [
            "id",
            "timestamp",
            "check",
            "level",
            "column",
            "rule",
            "value",
            "rows",
            "violations",
            "pass_rate",
            "pass_threshold",
            "status",
        ]
    )

    return summary
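
A minimal usage sketch. The qc_df here is hand-built to stand in for the dq_status-tagged frame produced upstream (e.g., by validate); the exact rule keys consumed by the private __build_rules_df helper are not shown on this page, so the dictionaries below follow the documented "column"/"rule"/"value"/"pass_threshold" keys and are illustrative.

import polars as pl
from sumeh.engine.polars_engine import summarize

# One ";"-separated dq_status string per row, entries in "column:rule:value" format.
qc_df = pl.DataFrame(
    {"dq_status": ["age:is_complete:", "age:is_complete:;name:is_unique:"]}
)

rules = [
    {"column": "age", "rule": "is_complete", "value": "", "pass_threshold": 0.95},
    {"column": "name", "rule": "is_unique", "value": "", "pass_threshold": 1.0},
]

report = summarize(qc_df, rules, total_rows=100)
# One row per rule: violation count, pass_rate, and a PASS/FAIL status
# computed against each rule's pass_threshold.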

validate(df, rules)

Validates a Polars DataFrame against a set of rules and returns the updated DataFrame with validation statuses and a DataFrame containing the validation violations.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to validate.

required
rules list[dict]

A list of dictionaries representing validation rules. Each rule should contain the following keys:
  • "check_type" (str): The type of validation to perform (e.g., "is_primary_key", "is_composite_key", "has_pattern", etc.).
  • "value" (optional): The value to validate against, depending on the rule type.
  • "execute" (bool, optional): Whether to execute the rule. Defaults to True.

required

Returns:

Type Description
Tuple[DataFrame, DataFrame]

Tuple[pl.DataFrame, pl.DataFrame]: A tuple containing:
  • The original DataFrame with an additional "dq_status" column indicating the validation status for each row.
  • A DataFrame containing rows that violated the validation rules, including details of the violations.

Notes
  • The function dynamically resolves validation functions based on the "check_type" specified in the rules.
  • If a rule's "check_type" is unknown, a warning is issued, and the rule is skipped.
  • The "__id" column is temporarily added to the DataFrame for internal processing and is removed in the final output.
Source code in sumeh/engine/polars_engine.py
def validate(df: pl.DataFrame, rules: list[dict]) -> Tuple[pl.DataFrame, pl.DataFrame]:
    """
    Validates a Polars DataFrame against a set of rules and returns the updated DataFrame
    with validation statuses and a DataFrame containing the validation violations.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to validate.
        rules (list[dict]): A list of dictionaries representing validation rules. Each rule
            should contain the following keys:
            - "check_type" (str): The type of validation to perform (e.g., "is_primary_key",
                "is_composite_key", "has_pattern", etc.).
            - "value" (optional): The value to validate against, depending on the rule type.
            - "execute" (bool, optional): Whether to execute the rule. Defaults to True.

    Returns:
        Tuple[pl.DataFrame, pl.DataFrame]: A tuple containing:
            - The original DataFrame with an additional "dq_status" column indicating the
                validation status for each row.
            - A DataFrame containing rows that violated the validation rules, including
                details of the violations.

    Notes:
        - The function dynamically resolves validation functions based on the "check_type"
            specified in the rules.
        - If a rule's "check_type" is unknown, a warning is issued, and the rule is skipped.
        - The "__id" column is temporarily added to the DataFrame for internal processing
            and is removed in the final output.
    """
    df = df.with_columns(pl.arange(0, pl.len()).alias("__id"))
    df_with_dq = df.with_columns(pl.lit("").alias("dq_status"))
    result = df_with_dq.head(0)
    for rule in rules:
        if not rule.get("execute", True):
            continue
        rule_name = rule["check_type"]
        if rule_name == "is_primary_key":
            rule_name = "is_unique"
        elif rule_name == "is_composite_key":
            rule_name = "are_unique"

        func = globals().get(rule_name)
        if func is None:
            warnings.warn(f"Unknown rule: {rule_name}")
            continue

        raw_value = rule.get("value")
        if rule_name in ("has_pattern", "satisfies"):
            value = raw_value
        else:
            try:
                value = (
                    __convert_value(raw_value)
                    if isinstance(raw_value, str) and raw_value not in ("", "NULL")
                    else raw_value
                )
            except ValueError:
                value = raw_value

        viol = func(df_with_dq, rule)
        result = pl.concat([result, viol]) if not result.is_empty() else viol

    summary = (
        result.group_by("__id", maintain_order=True)
        .agg("dq_status")
        .with_columns(pl.col("dq_status").list.join(";").alias("dq_status"))
    )
    out = df.join(summary, on="__id", how="left").drop("__id")

    return out, result
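
A minimal end-to-end sketch, assuming each rule also carries a "field" key consumed by the individual check functions; the exact key names expected by the private __extract_params helper are not shown on this page, so the dictionaries below are illustrative.

import polars as pl
from sumeh.engine.polars_engine import validate

df = pl.DataFrame({"id": [1, 2, 2], "age": [25, None, 40]})

rules = [
    # "is_primary_key" is internally rewritten to "is_unique".
    {"field": "id", "check_type": "is_primary_key", "value": "", "execute": True},
    {"field": "age", "check_type": "is_complete", "value": "", "execute": True},
]

checked, violations = validate(df, rules)
# 'checked' is df plus a dq_status column listing the violated rules per row
# (";"-joined; null for rows with no violations).
# 'violations' stacks the per-rule violation frames.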

validate_date_format(df, rule)

Validates the date format of a specified field in a Polars DataFrame based on a given rule.

Parameters:

Name Type Description Default
df DataFrame

The input Polars DataFrame to validate.

required
rule dict

A dictionary containing the validation rule. It should include:
  • field (str): The name of the column to validate.
  • check (str): The name of the validation check.
  • fmt (str): The expected date format to validate against.

required

Returns:

Type Description
DataFrame

pl.DataFrame: A new DataFrame containing only the rows where the specified field does not match the expected date format or is null. An additional column "dq_status" is added to indicate the validation status in the format "{field}:{check}:{fmt}".

Source code in sumeh/engine/polars_engine.py
def validate_date_format(df: pl.DataFrame, rule: dict) -> pl.DataFrame:
    """
    Validates the date format of a specified field in a Polars DataFrame based on a given rule.

    Args:
        df (pl.DataFrame): The input Polars DataFrame to validate.
        rule (dict): A dictionary containing the validation rule. It should include:
            - field (str): The name of the column to validate.
            - check (str): The name of the validation check.
            - fmt (str): The expected date format to validate against.

    Returns:
        pl.DataFrame: A new DataFrame containing only the rows where the specified field
        does not match the expected date format or is null. An additional column
        "dq_status" is added to indicate the validation status in the format
        "{field}:{check}:{fmt}".
    """
    field, check, fmt = __extract_params(rule)
    regex = __transform_date_format_in_pattern(fmt)
    return df.filter(
        ~pl.col(field).str.contains(regex, literal=False) | pl.col(field).is_null()
    ).with_columns(pl.lit(f"{field}:{check}:{fmt}").alias("dq_status"))
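
A small illustrative sketch. The format tokens accepted for fmt depend on the private __transform_date_format_in_pattern helper, which is not shown here, so the "yyyy-mm-dd" token, column name, and sample data below are assumptions.

import polars as pl
from sumeh.engine.polars_engine import validate_date_format

df = pl.DataFrame({"signup_date": ["2024-01-31", "31/01/2024", None]})

# The format is supplied in the rule's value slot; "yyyy-mm-dd" is assumed here.
rule = {"field": "signup_date", "check": "validate_date_format", "value": "yyyy-mm-dd"}

bad_rows = validate_date_format(df, rule)
# Rows whose signup_date is null or does not match the derived regex are returned,
# tagged with dq_status = "signup_date:validate_date_format:yyyy-mm-dd".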

validate_schema(df, expected)

Validates the schema of a given DataFrame against an expected schema.

Parameters:

Name Type Description Default
df

The DataFrame whose schema needs to be validated.

required
expected

The expected schema, represented as a list of tuples where each tuple contains the column name and its data type.

required

Returns:

Type Description
Tuple[bool, List[Tuple[str, str]]]

Tuple[bool, List[Tuple[str, str]]]: A tuple containing:
  • A boolean indicating whether the schema matches the expected schema.
  • A list of tuples representing the errors, where each tuple contains the column name and a description of the mismatch.

Source code in sumeh/engine/polars_engine.py
def validate_schema(df, expected) -> Tuple[bool, List[Tuple[str, str]]]:
    """
    Validates the schema of a given DataFrame against an expected schema.

    Args:
        df: The DataFrame whose schema needs to be validated.
        expected: The expected schema, represented as a list of tuples where each tuple
                  contains the column name and its data type.

    Returns:
        Tuple[bool, List[Tuple[str, str]]]: A tuple containing:
            - A boolean indicating whether the schema matches the expected schema.
            - A list of tuples representing the errors, where each tuple contains
              the column name and a description of the mismatch.
    """
    actual = __polars_schema_to_list(df)
    result, errors = __compare_schemas(actual, expected)
    return result, errors
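
A minimal sketch, assuming the expected schema is passed as (column_name, dtype) tuples; the exact dtype representation depends on the private __polars_schema_to_list and __compare_schemas helpers, which are not shown here, so the dtype strings below are assumptions.

import polars as pl
from sumeh.engine.polars_engine import validate_schema

df = pl.DataFrame({"id": [1, 2], "name": ["a", "b"]})

# Dtype labels are illustrative; match them to whatever __polars_schema_to_list emits.
expected = [("id", "Int64"), ("name", "String")]

ok, errors = validate_schema(df, expected)
# ok is True when every expected column is present with a matching dtype;
# otherwise errors lists (column, mismatch description) tuples.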