
Module sumeh.engine.pyspark_engine

This module provides a set of functions for performing data quality checks on PySpark DataFrames. It includes various validation rules, schema validation, and summarization utilities.

Functions:

Name Description
is_positive

Filters rows where the specified field is negative and adds a data quality status column.

is_negative

Filters rows where the specified field is non-negative and adds a data quality status column.

is_in_millions

Retains rows where the field value is at least 1,000,000 and flags them with dq_status.

is_in_billions

Retains rows where the field value is at least 1,000,000,000 and flags them with dq_status.

is_t_minus_1

Retains rows where the date field equals yesterday (T-1) and flags them with dq_status.

is_t_minus_2

Retains rows where the date field equals two days ago (T-2) and flags them with dq_status.

is_t_minus_3

Retains rows where the date field equals three days ago (T-3) and flags them with dq_status.

is_today

Retains rows where the date field equals today and flags them with dq_status.

is_yesterday

Retains rows where the date field equals yesterday and flags them with dq_status.

is_on_weekday

Retains rows where the date field falls on a weekday (Mon-Fri) and flags them with dq_status.

is_on_weekend

Retains rows where the date field is on a weekend (Sat-Sun) and flags them with dq_status.

is_on_monday

Retains rows where the date field is on Monday and flags them with dq_status.

is_on_tuesday

Retains rows where the date field is on Tuesday and flags them with dq_status.

is_on_wednesday

Retains rows where the date field is on Wednesday and flags them with dq_status.

is_on_thursday

Retains rows where the date field is on Thursday and flags them with dq_status.

is_on_friday

Retains rows where the date field is on Friday and flags them with dq_status.

is_on_saturday

Retains rows where the date field is on Saturday and flags them with dq_status.

is_on_sunday

Retains rows where the date field is on Sunday and flags them with dq_status.

is_complete

Filters rows where the specified field is null and adds a data quality status column.

is_unique

Identifies duplicate rows based on the specified field and adds a data quality status column.

are_complete

Filters rows where any of the specified fields are null and adds a data quality status column.

are_unique

Identifies duplicate rows based on a combination of specified fields and adds a data quality status column.

is_greater_than

Filters rows where the specified field is less than or equal to the given value.

is_greater_or_equal_than

Filters rows where the specified field is less than the given value.

is_less_than

Filters rows where the specified field is greater than or equal to the given value.

is_less_or_equal_than

Filters rows where the specified field is greater than the given value.

is_equal

Filters rows where the specified field is not equal to the given value.

is_equal_than

Alias for is_equal.

is_contained_in

Filters rows where the specified field is not in the given list of values.

not_contained_in

Filters rows where the specified field is in the given list of values.

is_between

Filters rows where the specified field is not within the given range.

has_pattern

Filters rows where the specified field does not match the given regex pattern.

is_legit

Filters rows where the specified field is null or does not match a non-whitespace pattern.

is_primary_key

Checks whether the specified field uniquely identifies each row (single-column uniqueness check).

is_composite_key

Checks whether the combination of specified fields uniquely identifies each row; delegates to are_unique.

has_max

Filters rows where the specified field exceeds the given maximum value.

has_min

Filters rows where the specified field is below the given minimum value.

has_std

Checks if the standard deviation of the specified field exceeds the given value.

has_mean

Checks if the mean of the specified field exceeds the given value.

has_sum

Checks if the sum of the specified field exceeds the given value.

has_cardinality

Checks if the cardinality (distinct count) of the specified field exceeds the given value.

has_infogain

Checks if the information gain (distinct count) of the specified field exceeds the given value.

has_entropy

Checks if the entropy (distinct count) of the specified field exceeds the given value.

all_date_checks

Filters rows where the specified date field is earlier than the current date.

satisfies

Filters rows where the specified field matches the given regex pattern.

validate

Applies a list of validation rules to the DataFrame and returns the results.

summarize

Summarizes the results of data quality checks, including pass rates and violations.

validate_schema

Validates the schema of the DataFrame against the expected schema.

__rules_to_df

Converts a list of rules into a DataFrame for further processing.

__pyspark_schema_to_list

Converts the schema of a DataFrame into a list of dictionaries for comparison.

__compare_schemas(actual, expected)

Compare two lists of schema definitions and identify discrepancies.

Parameters:

Name Type Description Default
actual List[SchemaDef]

The list of actual schema definitions.

required
expected List[SchemaDef]

The list of expected schema definitions.

required

Returns:

Type Description
Tuple[bool, List[Tuple[str, str]]]

A tuple where the first element is a boolean indicating whether the schemas match (True if they match, False otherwise), and the second element is a list of tuples describing the discrepancies. Each tuple contains the field name (str) and a description of the discrepancy (str), such as "missing", "type mismatch", "nullable but expected non-nullable", or "extra column".

Notes
  • A field is considered "missing" if it exists in the expected schema but not in the actual schema.
  • A "type mismatch" occurs if the data type of a field in the actual schema does not match the expected data type.
  • A field is considered "nullable but expected non-nullable" if it is nullable in the actual schema but not nullable in the expected schema.
  • An "extra column" is a field that exists in the actual schema but not in the expected schema.
Source code in sumeh/services/utils.py
def __compare_schemas(
    actual: List[SchemaDef],
    expected: List[SchemaDef],
) -> Tuple[bool, List[Tuple[str, str]]]:
    """
    Compare two lists of schema definitions and identify discrepancies.

    Args:
        actual (List[SchemaDef]): The list of actual schema definitions.
        expected (List[SchemaDef]): The list of expected schema definitions.

    Returns:
        Tuple[bool, List[Tuple[str, str]]]: A tuple where the first element is a boolean indicating
        whether the schemas match (True if they match, False otherwise), and the second element
        is a list of tuples describing the discrepancies. Each tuple contains:
            - The field name (str).
            - A description of the discrepancy (str), such as "missing", "type mismatch",
              "nullable but expected non-nullable", or "extra column".

    Notes:
        - A field is considered "missing" if it exists in the expected schema but not in the actual schema.
        - A "type mismatch" occurs if the data type of a field in the actual schema does not match
          the expected data type.
        - A field is considered "nullable but expected non-nullable" if it is nullable in the actual
          schema but not nullable in the expected schema.
        - An "extra column" is a field that exists in the actual schema but not in the expected schema.
    """

    exp_map = {c["field"]: c for c in expected}
    act_map = {c["field"]: c for c in actual}

    erros: List[Tuple[str, str]] = []

    for fld, exp in exp_map.items():
        if fld not in act_map:
            erros.append((fld, "missing"))
            continue
        act = act_map[fld]
        if act["data_type"] != exp["data_type"]:
            erros.append(
                (
                    fld,
                    f"type mismatch (got {act['data_type']}, expected {exp['data_type']})",
                )
            )

        if act["nullable"] and not exp["nullable"]:
            erros.append((fld, "nullable but expected non-nullable"))

        if exp.get("max_length") is not None:
            pass

    # 2. extra columns (optional)
    extras = set(act_map) - set(exp_map)
    for fld in extras:
        erros.append((fld, "extra column"))

    return len(erros) == 0, erros
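
For illustration, a minimal sketch of the inputs and output this helper works with. The schema dictionaries follow the shape produced by __pyspark_schema_to_list; the concrete field names below are hypothetical:

expected = [
    {"field": "id",   "data_type": "int",    "nullable": False, "max_length": None},
    {"field": "name", "data_type": "string", "nullable": True,  "max_length": None},
]
actual = [
    {"field": "id",    "data_type": "bigint", "nullable": False, "max_length": None},
    {"field": "extra", "data_type": "string", "nullable": True,  "max_length": None},
]
# Comparing these lists would yield:
#   (False, [("id", "type mismatch (got bigint, expected int)"),
#            ("name", "missing"),
#            ("extra", "extra column")])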

__convert_value(value)

Converts the provided value to the appropriate type (date, float, or int).

Depending on the format of the input value, it will be converted to a datetime object, a floating-point number (float), or an integer (int).

Parameters:

Name Type Description Default
value str

The value to be converted, represented as a string.

required

Returns:

Type Description

Union[datetime, float, int]: The converted value, which can be a datetime object, float, or int.

Raises:

Type Description
ValueError

If the value does not match an expected format.

Source code in sumeh/services/utils.py
def __convert_value(value):
    """
    Converts the provided value to the appropriate type (date, float, or int).

    Depending on the format of the input value, it will be converted to a datetime object,
    a floating-point number (float), or an integer (int).

    Args:
        value (str): The value to be converted, represented as a string.

    Returns:
        Union[datetime, float, int]: The converted value, which can be a datetime object, float, or int.

    Raises:
        ValueError: If the value does not match an expected format.
    """
    from datetime import datetime

    value = value.strip()
    try:
        if "-" in value:
            return datetime.strptime(value, "%Y-%m-%d")
        else:
            return datetime.strptime(value, "%d/%m/%Y")
    except ValueError:
        if "." in value:
            return float(value)
        return int(value)

__extract_params(rule)

Source code in sumeh/services/utils.py
def __extract_params(rule: dict) -> tuple:
    rule_name = rule["check_type"]
    field = rule["field"]
    raw_value = rule.get("value")
    if isinstance(raw_value, str) and raw_value not in (None, "", "NULL"):
        try:
            value = __convert_value(raw_value)
        except ValueError:
            value = raw_value
    else:
        value = raw_value
    value = value if value not in (None, "", "NULL") else ""
    return field, rule_name, value
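
As a sketch, a typical rule dictionary and the tuple this helper would extract from it (the field name and threshold below are hypothetical):

rule = {"field": "amount", "check_type": "is_greater_than", "value": "100", "threshold": 0.95}
# __extract_params(rule) would return:
#   ("amount", "is_greater_than", 100)   # "100" is converted to int by __convert_value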

__pyspark_schema_to_list(df)

Convert the schema of a PySpark DataFrame into a list of dictionaries.

Each dictionary in the output list represents a field in the DataFrame schema and contains the following keys: - "field": The name of the field. - "data_type": The data type of the field as a lowercase string. - "nullable": A boolean indicating whether the field allows null values. - "max_length": Always set to None (reserved for future use).

Parameters:

Name Type Description Default
df DataFrame

The PySpark DataFrame whose schema is to be converted.

required

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: A list of dictionaries representing the schema of the DataFrame.

Source code in sumeh/engine/pyspark_engine.py
def __pyspark_schema_to_list(df: DataFrame) -> List[Dict[str, Any]]:
    """
    Convert the schema of a PySpark DataFrame into a list of dictionaries.

    Each dictionary in the output list represents a field in the DataFrame schema
    and contains the following keys:
        - "field": The name of the field.
        - "data_type": The data type of the field as a lowercase string.
        - "nullable": A boolean indicating whether the field allows null values.
        - "max_length": Always set to None (reserved for future use).

    Args:
        df (DataFrame): The PySpark DataFrame whose schema is to be converted.

    Returns:
        List[Dict[str, Any]]: A list of dictionaries representing the schema of the DataFrame.
    """
    out: List[Dict[str, Any]] = []
    for f in df.schema.fields:
        out.append(
            {
                "field": f.name,
                "data_type": f.dataType.simpleString().lower(),
                "nullable": f.nullable,
                "max_length": None,
            }
        )
    return out
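
A small sketch of the output shape, assuming a toy DataFrame with an integer and a string column (column names are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "alice")], ["id", "name"])
# The helper would produce something like:
#   [{"field": "id",   "data_type": "bigint", "nullable": True, "max_length": None},
#    {"field": "name", "data_type": "string", "nullable": True, "max_length": None}]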

__rules_to_df(rules)

Converts a list of rule dictionaries into a PySpark DataFrame.

Parameters:

Name Type Description Default
rules List[Dict]

A list of dictionaries where each dictionary represents a rule. Each rule dictionary should contain the following keys: - "field" (str or list): The name of the field or a list of field names. - "check_type" (str): The type of rule or check to be applied. - "threshold" (float, optional): The threshold value for the rule. Defaults to 1.0 if not provided. - "value" (str, optional): The value associated with the rule. Defaults to "N/A" if not provided. - "execute" (bool, optional): A flag indicating whether the rule should be executed. Defaults to True.

required

Returns:

Name Type Description
DataFrame DataFrame

A PySpark DataFrame containing the following columns: - "column" (str): The name of the field. - "rule" (str): The type of rule or check. - "pass_threshold" (float): The threshold value for the rule. - "value" (str): The value associated with the rule.

Notes
  • Rows with "execute" set to False are skipped.
  • Duplicate rows based on the "column" and "rule" columns are removed.
Source code in sumeh/engine/pyspark_engine.py
def __rules_to_df(rules: List[Dict]) -> DataFrame:
    """
    Converts a list of rule dictionaries into a PySpark DataFrame.

    Args:
        rules (List[Dict]): A list of dictionaries where each dictionary represents a rule.
            Each rule dictionary should contain the following keys:
                - "field" (str or list): The name of the field or a list of field names.
                - "check_type" (str): The type of rule or check to be applied.
                - "threshold" (float, optional): The threshold value for the rule. Defaults to 1.0 if not provided.
                - "value" (str, optional): The value associated with the rule. Defaults to "N/A" if not provided.
                - "execute" (bool, optional): A flag indicating whether the rule should be executed. Defaults to True.

    Returns:
        DataFrame: A PySpark DataFrame containing the following columns:
            - "column" (str): The name of the field.
            - "rule" (str): The type of rule or check.
            - "pass_threshold" (float): The threshold value for the rule.
            - "value" (str): The value associated with the rule.

    Notes:
        - Rows with "execute" set to False are skipped.
        - Duplicate rows based on the "column" and "rule" columns are removed.
    """
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    rows = []
    for r in rules:
        if not r.get("execute", True):
            continue
        col_name = str(r["field"]) if isinstance(r["field"], list) else r["field"]
        rows.append(
            Row(
                column=col_name.strip(),
                rule=r["check_type"],
                pass_threshold=float(r.get("threshold") or 1.0),
                value=r.get("value", "N/A") or "N/A",
            )
        )
    return spark.createDataFrame(rows).dropDuplicates(["column", "rule"])
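
A sketch of the rule list this helper expects and the rows it would produce (rule contents are hypothetical):

rules = [
    {"field": "id", "check_type": "is_complete", "threshold": 1.0, "execute": True},
    {"field": "amount", "check_type": "is_positive", "execute": False},  # skipped: execute=False
]
# Resulting DataFrame would contain one row for the executed rule:
#   column="id", rule="is_complete", pass_threshold=1.0, value="N/A"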

__transform_date_format_in_pattern(date_format)

Source code in sumeh/services/utils.py
def __transform_date_format_in_pattern(date_format):
    date_patterns = {
        "DD": "(0[1-9]|[12][0-9]|3[01])",
        "MM": "(0[1-9]|1[012])",
        "YYYY": "(19|20)\\d\\d",
        "YY": "\\d\\d",
        " ": "\\s",
        ".": "\\.",
    }

    date_pattern = date_format
    for single_format, pattern in date_patterns.items():
        date_pattern = date_pattern.replace(single_format, pattern)

    return date_pattern
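
For example, tracing the substitutions for a day-first format (a sketch of the expected result):

# __transform_date_format_in_pattern("DD/MM/YYYY") would return:
#   "(0[1-9]|[12][0-9]|3[01])/(0[1-9]|1[012])/(19|20)\d\d"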

all_date_checks(df, rule)

Filters the input DataFrame based on a date-related rule and adds a data quality status column.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The name of the column to apply the rule on. - 'check': The type of check to perform (e.g., comparison operator). - 'value': The value to be used in the check.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame filtered based on the rule, with an additional column "dq_status" indicating the data quality status in the format "field:check:value".

Source code in sumeh/engine/pyspark_engine.py
def all_date_checks(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters the input DataFrame based on a date-related rule and adds a data quality status column.

    Args:
        df (DataFrame): The input PySpark DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to apply the rule on.
            - 'check': The type of check to perform (e.g., comparison operator).
            - 'value': The value to be used in the check.

    Returns:
        DataFrame: A new DataFrame filtered based on the rule, with an additional column
        "dq_status" indicating the data quality status in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    return df.filter((col(field) < current_date())).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

are_complete(df, rule)

Filters rows in a DataFrame that do not meet the completeness rule and adds a data quality status column.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to be checked.

required
rule dict

A dictionary containing the rule parameters. It should include: - "fields" (list): A list of column names to check for completeness (non-null values). - "check" (str): A descriptive label for the type of check being performed. - "value" (str): A descriptive value associated with the check.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame containing only the rows that fail the completeness check, with an additional column "dq_status" describing the failed rule.

Source code in sumeh/engine/pyspark_engine.py
def are_complete(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters rows in a DataFrame that do not meet the completeness rule and adds a data quality status column.

    Args:
        df (DataFrame): The input PySpark DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - "fields" (list): A list of column names to check for completeness (non-null values).
            - "check" (str): A descriptive label for the type of check being performed.
            - "value" (str): A descriptive value associated with the check.

    Returns:
        DataFrame: A new DataFrame containing only the rows that fail the completeness check,
        with an additional column "dq_status" describing the failed rule.
    """
    fields, check, value = __extract_params(rule)
    predicate = reduce(operator.and_, [col(field).isNotNull() for field in fields])
    return df.filter(~predicate).withColumn(
        "dq_status",
        concat(lit(str(fields)), lit(":"), lit(check), lit(":"), lit(value)),
    )
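
A minimal usage sketch (toy data; the column names are hypothetical and an active SparkSession is assumed):

from pyspark.sql import SparkSession
from sumeh.engine.pyspark_engine import are_complete

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (2, None)], ["id", "name"])
rule = {"field": ["id", "name"], "check_type": "are_complete", "value": None}
are_complete(df, rule).show()   # keeps the row where "name" is null, flagged in dq_status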

are_unique(df, rule)

Checks for uniqueness of specified fields in a PySpark DataFrame based on the provided rule.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to be checked.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'fields': A list of column names to check for uniqueness. - 'check': A string representing the type of check (e.g., "unique"). - 'value': A value associated with the rule for logging or identification.

required

Returns:

Name Type Description
DataFrame DataFrame

A DataFrame containing rows that violate the uniqueness rule. The resulting DataFrame includes an additional column dq_status that describes the rule violation in the format "[fields]:[check]:[value]".

Notes
  • The function concatenates the specified fields into a single column and checks for duplicate values within that column.
  • Rows that do not meet the uniqueness criteria are returned, while rows that satisfy the criteria are excluded from the result.
Source code in sumeh/engine/pyspark_engine.py
def are_unique(df: DataFrame, rule: dict) -> DataFrame:
    """
    Checks for uniqueness of specified fields in a PySpark DataFrame based on the provided rule.

    Args:
        df (DataFrame): The input PySpark DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'fields': A list of column names to check for uniqueness.
            - 'check': A string representing the type of check (e.g., "unique").
            - 'value': A value associated with the rule for logging or identification.

    Returns:
        DataFrame: A DataFrame containing rows that violate the uniqueness rule.
        The resulting DataFrame includes an additional column `dq_status` that
        describes the rule violation in the format: "[fields]:[check]:[value]".

    Notes:
        - The function concatenates the specified fields into a single column
          and checks for duplicate values within that column.
        - Rows that do not meet the uniqueness criteria are returned, while
          rows that satisfy the criteria are excluded from the result.
    """
    fields, check, value = __extract_params(rule)
    combined_col = concat_ws("|", *[coalesce(col(f), lit("")) for f in fields])
    window = Window.partitionBy(combined_col)
    result = (
        df.withColumn("_count", count("*").over(window))
        .filter(col("_count") > 1)
        .drop("_count")
        .withColumn(
            "dq_status",
            concat(lit(str(fields)), lit(":"), lit(check), lit(":"), lit(value)),
        )
    )
    return result
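
A minimal usage sketch (hypothetical data), flagging duplicated (customer_id, order_date) pairs:

from pyspark.sql import SparkSession
from sumeh.engine.pyspark_engine import are_unique

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, "2025-01-01"), (1, "2025-01-01"), (2, "2025-01-02")],
    ["customer_id", "order_date"],
)
rule = {"field": ["customer_id", "order_date"], "check_type": "are_unique", "value": None}
are_unique(df, rule).show()   # returns the two duplicated rows, flagged in dq_status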

has_cardinality(df, rule)

Checks the cardinality of a specified field in a DataFrame against a given rule.

This function evaluates whether the distinct count of values in a specified column (field) of the DataFrame exceeds a given threshold (value) as defined in the rule. If the cardinality exceeds the threshold, a new column dq_status is added to the DataFrame with information about the rule violation. Otherwise, an empty DataFrame is returned.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to be checked.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The name of the column to check. - 'check': The type of check being performed (e.g., "cardinality"). - 'value': The threshold value for the cardinality.

required

Returns:

Name Type Description
DataFrame DataFrame

A DataFrame with the dq_status column added if the cardinality exceeds the threshold, or an empty DataFrame if the condition is not met.

Source code in sumeh/engine/pyspark_engine.py
def has_cardinality(df: DataFrame, rule: dict) -> DataFrame:
    """
    Checks the cardinality of a specified field in a DataFrame against a given rule.

    This function evaluates whether the distinct count of values in a specified column
    (field) of the DataFrame exceeds a given threshold (value) as defined in the rule.
    If the cardinality exceeds the threshold, a new column `dq_status` is added to the
    DataFrame with information about the rule violation. Otherwise, an empty DataFrame
    is returned.

    Args:
        df (DataFrame): The input PySpark DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to check.
            - 'check': The type of check being performed (e.g., "cardinality").
            - 'value': The threshold value for the cardinality.

    Returns:
        DataFrame: A DataFrame with the `dq_status` column added if the cardinality
        exceeds the threshold, or an empty DataFrame if the condition is not met.
    """
    field, check, value = __extract_params(rule)
    card_val = df.select(countDistinct(col(field))).first()[0] or 0
    if card_val > value:
        return df.withColumn(
            "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
        )
    return df.limit(0)
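
A minimal usage sketch (hypothetical data and threshold):

from pyspark.sql import SparkSession
from sumeh.engine.pyspark_engine import has_cardinality

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(c,) for c in "abcde"], ["category"])
rule = {"field": "category", "check_type": "has_cardinality", "value": "3"}
# distinct count is 5 > 3, so every row is returned with a dq_status column
has_cardinality(df, rule).show()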

has_entropy(df, rule)

Evaluates the entropy of a specified field in a DataFrame and applies a rule to determine whether the DataFrame should be processed further or filtered out.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to evaluate.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The name of the column to evaluate. - 'check' (str): The type of check being performed (e.g., "entropy"). - 'value' (float): The threshold value for the entropy check.

required

Returns:

Name Type Description
DataFrame DataFrame

If the entropy of the specified field exceeds the given value, returns the original DataFrame with an additional column "dq_status" indicating the rule applied. Otherwise, returns an empty DataFrame with the same schema as the input.

Source code in sumeh/engine/pyspark_engine.py
def has_entropy(df: DataFrame, rule: dict) -> DataFrame:
    """
    Evaluates the entropy of a specified field in a DataFrame and applies a rule to determine
    whether the DataFrame should be processed further or filtered out.

    Parameters:
        df (DataFrame): The input PySpark DataFrame to evaluate.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column to evaluate.
            - 'check' (str): The type of check being performed (e.g., "entropy").
            - 'value' (float): The threshold value for the entropy check.

    Returns:
        DataFrame: If the entropy of the specified field exceeds the given value, returns the
        original DataFrame with an additional column "dq_status" indicating the rule applied.
        Otherwise, returns an empty DataFrame with the same schema as the input.
    """
    field, check, value = __extract_params(rule)
    entropy_val = df.select(countDistinct(col(field))).first()[0] or 0.0
    if entropy_val > value:
        return df.withColumn(
            "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
        )
    return df.limit(0)

has_infogain(df, rule)

Evaluates whether a given DataFrame satisfies an information gain condition based on the provided rule. If the condition is met, it appends a column indicating the status; otherwise, it returns an empty DataFrame.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to evaluate.

required
rule dict

A dictionary containing the rule parameters. It should include the following keys: - 'field': The column name to evaluate. - 'check': The condition type (not used directly in the logic). - 'value': The threshold value for information gain.

required

Returns:

Name Type Description
DataFrame DataFrame

A DataFrame with an additional "dq_status" column if the information gain condition is met, or an empty DataFrame if the condition is not satisfied.

Source code in sumeh/engine/pyspark_engine.py
def has_infogain(df: DataFrame, rule: dict) -> DataFrame:
    """
    Evaluates whether a given DataFrame satisfies an information gain condition
    based on the provided rule. If the condition is met, it appends a column
    indicating the status; otherwise, it returns an empty DataFrame.

    Args:
        df (DataFrame): The input PySpark DataFrame to evaluate.
        rule (dict): A dictionary containing the rule parameters. It should
                     include the following keys:
                     - 'field': The column name to evaluate.
                     - 'check': The condition type (not used directly in the logic).
                     - 'value': The threshold value for information gain.

    Returns:
        DataFrame: A DataFrame with an additional "dq_status" column if the
                   information gain condition is met, or an empty DataFrame
                   if the condition is not satisfied.
    """
    field, check, value = __extract_params(rule)
    info_gain = df.select(countDistinct(col(field))).first()[0] or 0.0
    if info_gain > value:
        return df.withColumn(
            "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
        )
    return df.limit(0)

has_max(df, rule)

Filters a PySpark DataFrame to include only rows where the value of a specified field is greater than a given threshold. Adds a new column 'dq_status' to indicate the rule applied.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The name of the column to apply the rule on. - 'check' (str): The type of check being performed (e.g., 'max'). - 'value' (numeric): The threshold value to compare against.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame filtered based on the rule, with an additional column 'dq_status' describing the rule applied in the format "field:check:value".

Source code in sumeh/engine/pyspark_engine.py
def has_max(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters a PySpark DataFrame to include only rows where the value of a specified field
    is greater than a given threshold. Adds a new column 'dq_status' to indicate the rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column to apply the rule on.
            - 'check' (str): The type of check being performed (e.g., 'max').
            - 'value' (numeric): The threshold value to compare against.

    Returns:
        DataFrame: A new DataFrame filtered based on the rule, with an additional column 'dq_status'
        describing the rule applied in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    return df.filter(col(field) > value).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
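
A minimal usage sketch (hypothetical column and threshold):

from pyspark.sql import SparkSession
from sumeh.engine.pyspark_engine import has_max

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(50,), (150,)], ["amount"])
rule = {"field": "amount", "check_type": "has_max", "value": "100"}
has_max(df, rule).show()   # keeps only the row with amount=150, flagged in dq_status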

has_mean(df, rule)

Evaluates whether the mean value of a specified column in a DataFrame satisfies a given rule.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to evaluate.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The name of the column to calculate the mean for. - 'check' (str): The type of check being performed (e.g., 'greater_than'). - 'value' (float): The threshold value to compare the mean against.

required

Returns:

Name Type Description
DataFrame DataFrame

If the mean value of the specified column exceeds the threshold, returns the original DataFrame with an additional column dq_status indicating the rule violation. If the mean value satisfies the rule, returns an empty DataFrame.

Source code in sumeh/engine/pyspark_engine.py
def has_mean(df: DataFrame, rule: dict) -> DataFrame:
    """
    Evaluates whether the mean value of a specified column in a DataFrame satisfies a given rule.

    Args:
        df (DataFrame): The input PySpark DataFrame to evaluate.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column to calculate the mean for.
            - 'check' (str): The type of check being performed (e.g., 'greater_than').
            - 'value' (float): The threshold value to compare the mean against.

    Returns:
        DataFrame: If the mean value of the specified column exceeds the threshold,
        returns the original DataFrame with an additional column `dq_status` indicating
        the rule violation. If the mean value satisfies the rule, returns an empty DataFrame.
    """
    field, check, value = __extract_params(rule)
    mean_val = (df.select(avg(col(field))).first()[0]) or 0.0
    if mean_val > value:  # rule failed
        return df.withColumn(
            "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
        )
    else:  # passed
        return df.limit(0)

has_min(df, rule)

Filters rows in a DataFrame where the value of a specified field is less than a given threshold and adds a new column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The name of the column to check. - 'check' (str): The type of check being performed (e.g., "min"). - 'value' (numeric): The threshold value for the check.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame with rows filtered based on the rule and an additional "dq_status" column containing a string representation of the rule applied.

Source code in sumeh/engine/pyspark_engine.py
def has_min(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters rows in a DataFrame where the value of a specified field is less than a given threshold
    and adds a new column indicating the data quality status.

    Args:
        df (DataFrame): The input PySpark DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column to check.
            - 'check' (str): The type of check being performed (e.g., "min").
            - 'value' (numeric): The threshold value for the check.

    Returns:
        DataFrame: A new DataFrame with rows filtered based on the rule and an additional
        "dq_status" column containing a string representation of the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(col(field) < value).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

has_pattern(df, rule)

Filters a PySpark DataFrame based on a pattern match and adds a data quality status column.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The name of the column to apply the pattern check. - 'check': A descriptive label for the type of check being performed. - 'value': The regex pattern to match against the column values.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame with rows that do not match the pattern filtered out. Additionally, a "dq_status" column is added, containing a string representation of the rule applied in the format "field:check:value".

Source code in sumeh/engine/pyspark_engine.py
def has_pattern(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters a PySpark DataFrame based on a pattern match and adds a data quality status column.

    Args:
        df (DataFrame): The input PySpark DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to apply the pattern check.
            - 'check': A descriptive label for the type of check being performed.
            - 'value': The regex pattern to match against the column values.

    Returns:
        DataFrame: A new DataFrame with rows that do not match the pattern filtered out.
                   Additionally, a "dq_status" column is added, containing a string
                   representation of the rule applied in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    return df.filter(~col(field).rlike(value)).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
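
A minimal usage sketch flagging values that do not match a hypothetical three-letter code pattern:

from pyspark.sql import SparkSession
from sumeh.engine.pyspark_engine import has_pattern

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("ABC",), ("ab1",)], ["code"])
rule = {"field": "code", "check_type": "has_pattern", "value": "^[A-Z]{3}$"}
has_pattern(df, rule).show()   # keeps the non-matching row "ab1", flagged in dq_status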

has_std(df, rule)

Checks if the standard deviation of a specified field in a DataFrame exceeds a given value.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The name of the column to calculate the standard deviation for. - 'check' (str): A descriptive label for the check being performed. - 'value' (float): The threshold value for the standard deviation.

required

Returns:

Name Type Description
DataFrame DataFrame

If the standard deviation of the specified field exceeds the given value, returns the original DataFrame with an additional column "dq_status" indicating the field, check, and value. Otherwise, returns an empty DataFrame.

Source code in sumeh/engine/pyspark_engine.py
def has_std(df: DataFrame, rule: dict) -> DataFrame:
    """
    Checks if the standard deviation of a specified field in a DataFrame exceeds a given value.

    Args:
        df (DataFrame): The input PySpark DataFrame.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column to calculate the standard deviation for.
            - 'check' (str): A descriptive label for the check being performed.
            - 'value' (float): The threshold value for the standard deviation.

    Returns:
        DataFrame: If the standard deviation of the specified field exceeds the given value,
        returns the original DataFrame with an additional column "dq_status" indicating the
        field, check, and value. Otherwise, returns an empty DataFrame.
    """
    field, check, value = __extract_params(rule)
    std_val = df.select(stddev(col(field))).first()[0]
    std_val = std_val or 0.0
    if std_val > value:
        return df.withColumn(
            "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
        )
    else:
        return df.limit(0)

has_sum(df, rule)

Checks if the sum of values in a specified column of a DataFrame exceeds a given threshold.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to evaluate.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The name of the column to sum. - 'check' (str): A descriptive label for the check being performed. - 'value' (float): The threshold value to compare the sum against.

required

Returns:

Name Type Description
DataFrame DataFrame

If the sum of the specified column exceeds the threshold, returns the original DataFrame with an additional column dq_status indicating the rule details. If the sum does not exceed the threshold, returns an empty DataFrame.

Source code in sumeh/engine/pyspark_engine.py
def has_sum(df: DataFrame, rule: dict) -> DataFrame:
    """
    Checks if the sum of values in a specified column of a DataFrame exceeds a given threshold.

    Args:
        df (DataFrame): The input PySpark DataFrame to evaluate.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column to sum.
            - 'check' (str): A descriptive label for the check being performed.
            - 'value' (float): The threshold value to compare the sum against.

    Returns:
        DataFrame: If the sum of the specified column exceeds the threshold, returns the original
        DataFrame with an additional column `dq_status` indicating the rule details. If the sum
        does not exceed the threshold, returns an empty DataFrame.
    """
    field, check, value = __extract_params(rule)
    sum_val = (df.select(sum(col(field))).first()[0]) or 0.0
    if sum_val > value:
        return df.withColumn(
            "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
        )
    return df.limit(0)

is_between(df, rule)

Filters rows in a PySpark DataFrame where the value of a specified field is not within a given range. Adds a new column 'dq_status' to indicate the rule that was applied.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The name of the column to check. - 'check': A string representing the type of check (e.g., "between"). - 'value': A string representing the range in the format "[min_value,max_value]".

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame with rows filtered based on the rule and an additional 'dq_status' column indicating the applied rule.

Source code in sumeh/engine/pyspark_engine.py
def is_between(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters rows in a PySpark DataFrame where the value of a specified field is not within a given range.
    Adds a new column 'dq_status' to indicate the rule that was applied.

    Args:
        df (DataFrame): The input PySpark DataFrame.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to check.
            - 'check': A string representing the type of check (e.g., "between").
            - 'value': A string representing the range in the format "[min_value,max_value]".

    Returns:
        DataFrame: A new DataFrame with rows filtered based on the rule and an additional
        'dq_status' column indicating the applied rule.
    """
    field, check, value = __extract_params(rule)
    min_value, max_value = value.strip("[]").split(",")
    return df.filter(~col(field).between(min_value, max_value)).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
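
A minimal usage sketch; note that the range is passed as a single string in "[min_value,max_value]" form:

from pyspark.sql import SparkSession
from sumeh.engine.pyspark_engine import is_between

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(5,), (15,), (25,)], ["age"])
rule = {"field": "age", "check_type": "is_between", "value": "[10,20]"}
is_between(df, rule).show()   # keeps 5 and 25 (outside the range), flagged in dq_status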

is_complete(df, rule)

Filters a DataFrame to identify rows where a specified field is null and adds a "dq_status" column indicating the data quality rule applied.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to be checked.

required
rule dict

A dictionary containing the data quality rule. It should include: - "field" (str): The name of the field to check for null values. - "check" (str): A description of the check being performed. - "value" (str): Additional information about the rule.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame filtered to include only rows where the specified field is null, with an additional "dq_status" column describing the rule.

Source code in sumeh/engine/pyspark_engine.py
def is_complete(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters a DataFrame to identify rows where a specified field is null and adds a
    "dq_status" column indicating the data quality rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to be checked.
        rule (dict): A dictionary containing the data quality rule. It should include:
            - "field" (str): The name of the field to check for null values.
            - "check" (str): A description of the check being performed.
            - "value" (str): Additional information about the rule.

    Returns:
        DataFrame: A new DataFrame filtered to include only rows where the specified
        field is null, with an additional "dq_status" column describing the rule.
    """
    field, check, value = __extract_params(rule)
    return df.filter(col(field).isNull()).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_composite_key(df, rule)

Determines if the given DataFrame satisfies the composite key condition based on the provided rule.

A composite key is a combination of two or more columns in a DataFrame that uniquely identify a row.

Parameters:

Name Type Description Default
df DataFrame

The PySpark DataFrame to be evaluated.

required
rule dict

A dictionary containing the rules or criteria to determine the composite key.

required

Returns:

Name Type Description
DataFrame DataFrame

The result of are_unique(df, rule): the rows that violate the composite-key (uniqueness) condition, with an additional dq_status column.

Source code in sumeh/engine/pyspark_engine.py
def is_composite_key(df: DataFrame, rule: dict):
    """
    Determines if the given DataFrame satisfies the composite key condition based on the provided rule.

    A composite key is a combination of two or more columns in a DataFrame that uniquely identify a row.

    Args:
        df (DataFrame): The PySpark DataFrame to be evaluated.
        rule (dict): A dictionary containing the rules or criteria to determine the composite key.

    Returns:
        DataFrame: The rows that violate the composite-key (uniqueness) condition, as returned by are_unique.
    """
    return are_unique(df, rule)

is_contained_in(df, rule)

Filters rows in a PySpark DataFrame based on whether a specified column's value is not contained in a given list of values. Adds a new column 'dq_status' to indicate the rule applied.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to filter.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The name of the column to check. - 'check': The type of check being performed (e.g., "is_contained_in"). - 'value': A string representation of a list of values (e.g., "[value1,value2]").

required

Returns:

Name Type Description
DataFrame DataFrame

A new PySpark DataFrame with rows filtered based on the rule and an additional column 'dq_status' describing the rule applied.

Example

rule = {"field": "column_name", "check": "is_contained_in", "value": "[value1,value2]"}
result_df = is_contained_in(input_df, rule)

Source code in sumeh/engine/pyspark_engine.py
def is_contained_in(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters rows in a PySpark DataFrame based on whether a specified column's value
    is not contained in a given list of values. Adds a new column 'dq_status' to
    indicate the rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to check.
            - 'check': The type of check being performed (e.g., "is_contained_in").
            - 'value': A string representation of a list of values (e.g., "[value1,value2]").

    Returns:
        DataFrame: A new PySpark DataFrame with rows filtered based on the rule
        and an additional column 'dq_status' describing the rule applied.

    Example:
        rule = {"field": "column_name", "check": "is_contained_in", "value": "[value1,value2]"}
        result_df = is_contained_in(input_df, rule)
    """
    field, check, value = __extract_params(rule)
    positive_list = value.strip("[]").split(",")
    return df.filter(~col(field).isin(positive_list)).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_date_after(df, rule)

Filters a DataFrame to identify rows where a specified field has a date lower than the date informed in the rule and adds a "dq_status" column indicating the data quality rule applied.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to be checked.

required
rule dict

A dictionary containing the data quality rule. It should include: - "field" (str): The name of the date field to check. - "check" (str): A description of the check being performed. - "value" (str): Additional information about the rule.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame containing only the rows where the specified date field is earlier than the given date, with an additional "dq_status" column describing the rule.

Source code in sumeh/engine/pyspark_engine.py
def is_date_after(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters a DataFrame to identify rows where a specified field has a date lower than the date informed in the rule
    and adds a "dq_status" column indicating the data quality rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to be checked.
        rule (dict): A dictionary containing the data quality rule. It should include:
            - "field" (str): The name of the field to check for null values.
            - "check" (str): A description of the check being performed.
            - "value" (str): Additional information about the rule.

    Returns:
        DataFrame: A new DataFrame containing only rows where the specified date field
        is earlier than the given date, with an additional "dq_status" column describing the rule.
    """

    field, check, value = __extract_params(rule)
    return df.filter(col(field) < value).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_date_before(df, rule)

Filters a DataFrame to identify rows where a specified field has a date greater than the date informed in the rule and adds a "dq_status" column indicating the data quality rule applied.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to be checked.

required
rule dict

A dictionary containing the data quality rule. It should include: - "field" (str): The name of the date field to check. - "check" (str): A description of the check being performed. - "value" (str): Additional information about the rule.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame filtered to include only rows where the specified

DataFrame

field is null, with an additional "dq_status" column describing the rule.

Source code in sumeh/engine/pyspark_engine.py
def is_date_before(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters a DataFrame to identify rows where a specified field has a date greater than the date informed in the rule
    and adds a "dq_status" column indicating the data quality rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to be checked.
        rule (dict): A dictionary containing the data quality rule. It should include:
            - "field" (str): The name of the field to check for null values.
            - "check" (str): A description of the check being performed.
            - "value" (str): Additional information about the rule.

    Returns:
        DataFrame: A new DataFrame containing only rows where the specified date field
        is later than the given date, with an additional "dq_status" column describing the rule.
    """

    field, check, value = __extract_params(rule)
    return df.filter(col(field) > value).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_date_between(df, rule)

Filters a DataFrame to identify rows where a specified field falls outside the date range given in the rule (format: "[<initial_date>, <final_date>]") and adds a "dq_status" column indicating the data quality rule applied.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to be checked.

required
rule dict

A dictionary containing the data quality rule. It should include: - "field" (str): The name of the date field to check. - "check" (str): A description of the check being performed. - "value" (str): Additional information about the rule.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame filtered to include only rows where the specified

DataFrame

field is null, with an additional "dq_status" column describing the rule.

Source code in sumeh/engine/pyspark_engine.py
def is_date_between(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters a DataFrame to identify rows where a specified field has a date between two dates passed in the rule using
    the format: "[<initial_date>, <final_date>]" and adds a "dq_status" column indicating the data quality rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to be checked.
        rule (dict): A dictionary containing the data quality rule. It should include:
            - "field" (str): The name of the field to check for null values.
            - "check" (str): A description of the check being performed.
            - "value" (str): Additional information about the rule.

    Returns:
        DataFrame: A new DataFrame containing only rows where the specified date field
        falls outside the given range, with an additional "dq_status" column describing the rule.
    """

    field, check, value = __extract_params(rule)
    start_date, end_date = value.strip("[]").split(",")
    return df.filter(~col(field).between(start_date, end_date)).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
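
A minimal usage sketch; the range is passed as a single string in "[<initial_date>,<final_date>]" form (column name and dates below are hypothetical):

from pyspark.sql import SparkSession
from sumeh.engine.pyspark_engine import is_date_between

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("2025-01-15",), ("2025-06-30",)], ["load_date"])
rule = {"field": "load_date", "check_type": "is_date_between", "value": "[2025-01-01,2025-03-31]"}
is_date_between(df, rule).show()   # keeps "2025-06-30" (outside the range), flagged in dq_status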

is_equal(df, rule)

Filters a PySpark DataFrame based on a rule that checks for equality between a specified field and a given value. Rows that do not satisfy the equality condition are retained, and a new column "dq_status" is added to indicate the rule applied.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - "field" (str): The name of the column to check. - "check" (str): The type of check (e.g., "equal"). This is used for logging purposes. - "value" (Any): The value to compare against.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame with rows that do not satisfy the equality condition and an additional "dq_status" column describing the rule applied.

Source code in sumeh/engine/pyspark_engine.py
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
def is_equal(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters a PySpark DataFrame based on a rule that checks for equality between a specified field
    and a given value. Rows that do not satisfy the equality condition are retained, and a new
    column "dq_status" is added to indicate the rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - "field" (str): The name of the column to check.
            - "check" (str): The type of check (e.g., "equal"). This is used for logging purposes.
            - "value" (Any): The value to compare against.

    Returns:
        DataFrame: A new DataFrame with rows that do not satisfy the equality condition and an
        additional "dq_status" column describing the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(~col(field).eqNullSafe(value)).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
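
Example (a minimal sketch assuming a local SparkSession; the country column is illustrative):

    from pyspark.sql import SparkSession
    from sumeh.engine.pyspark_engine import is_equal

    spark = SparkSession.builder.master("local[1]").appName("dq-demo").getOrCreate()
    df = spark.createDataFrame([("BR",), ("US",), (None,)], ["country"])

    rule = {"field": "country", "check": "is_equal", "value": "BR"}
    # The "US" and null rows are returned, each flagged "country:is_equal:BR" in dq_status.
    is_equal(df, rule).show()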

is_equal_than(df, rule)

Filters rows in a PySpark DataFrame that do not satisfy an equality condition specified in the rule dictionary and adds a "dq_status" column with details about the rule applied.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame to be filtered.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
        - "field" (str): The name of the column to check.
        - "check" (str): The type of check being performed (e.g., "equal").
        - "value" (Any): The value to compare against.

Returns:

    DataFrame: A new DataFrame with rows filtered based on the rule and an additional "dq_status" column describing the rule applied.

Source code in sumeh/engine/pyspark_engine.py, lines 622-642
def is_equal_than(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters rows in a PySpark DataFrame that do not satisfy an equality condition
    specified in the rule dictionary and adds a "dq_status" column with details
    about the rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - "field" (str): The name of the column to check.
            - "check" (str): The type of check being performed (e.g., "equal").
            - "value" (Any): The value to compare against.

    Returns:
        DataFrame: A new DataFrame with rows filtered based on the rule and an
        additional "dq_status" column describing the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(~col(field).eqNullSafe(value)).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_future_date(df, rule)

Filters a DataFrame to identify rows where a specified field has a date greater than the current date and adds a "dq_status" column indicating the data quality rule applied.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame to be checked.
    rule (dict, required): A dictionary containing the data quality rule. It should include:
        - "field" (str): The name of the date field to check.
        - "check" (str): A description of the check being performed.
        - "value" (str): Additional information about the rule.

Returns:

    DataFrame: A new DataFrame filtered to include only rows where the specified field's date is after the current date, with an additional "dq_status" column describing the rule.

Source code in sumeh/engine/pyspark_engine.py, lines 299-319
def is_future_date(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters a DataFrame to identify rows where a specified field has a date greater than the current date and
    adds a "dq_status" column indicating the data quality rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to be checked.
        rule (dict): A dictionary containing the data quality rule. It should include:
            - "field" (str): The name of the field to check for null values.
            - "check" (str): A description of the check being performed.
            - "value" (str): Additional information about the rule.

    Returns:
        DataFrame: A new DataFrame filtered to include only rows where the specified
        field is null, with an additional "dq_status" column describing the rule.
    """

    field, check, value = __extract_params(rule)
    return df.filter(col(field) > current_date()).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
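
Example (a minimal sketch assuming a local SparkSession; the dates and column name are illustrative):

    from pyspark.sql import SparkSession, functions as F
    from sumeh.engine.pyspark_engine import is_future_date

    spark = SparkSession.builder.master("local[1]").appName("dq-demo").getOrCreate()
    df = spark.createDataFrame([("2020-01-10",), ("2999-12-31",)], ["due_date"])
    df = df.withColumn("due_date", F.to_date("due_date"))

    rule = {"field": "due_date", "check": "is_future_date", "value": ""}
    # Only the row whose due_date lies after current_date() is returned, flagged in dq_status.
    is_future_date(df, rule).show(truncate=False)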

is_greater_or_equal_than(df, rule)

Filters rows in a DataFrame where the value of a specified field is less than a given value and adds a new column "dq_status" with a formatted string indicating the rule applied.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame to filter.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
        - "field" (str): The name of the column to check.
        - "check" (str): A descriptive string for the check (e.g., "greater_or_equal").
        - "value" (numeric): The threshold value for the comparison.

Returns:

    DataFrame: A new DataFrame with rows filtered based on the rule and an additional "dq_status" column describing the rule applied.

Source code in sumeh/engine/pyspark_engine.py, lines 533-552
def is_greater_or_equal_than(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters rows in a DataFrame where the value of a specified field is less than a given value
    and adds a new column "dq_status" with a formatted string indicating the rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - "field" (str): The name of the column to check.
            - "check" (str): A descriptive string for the check (e.g., "greater_or_equal").
            - "value" (numeric): The threshold value for the comparison.

    Returns:
        DataFrame: A new DataFrame with rows filtered based on the rule and an additional
        "dq_status" column describing the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(col(field) < value).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_greater_than(df, rule)

Filters rows in a DataFrame where the value of a specified field is less than or equal to a given threshold and adds a new column indicating the rule applied.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame to filter.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
        - 'field' (str): The name of the column to apply the rule on.
        - 'check' (str): A descriptive string for the rule (e.g., "greater_than").
        - 'value' (int or float): The threshold value for the comparison.

Returns:

    DataFrame: A new DataFrame with rows filtered based on the rule and an additional column "dq_status" describing the rule applied.

Source code in sumeh/engine/pyspark_engine.py, lines 511-530
def is_greater_than(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters rows in a DataFrame where the value of a specified field is less than
    or equal to a given threshold and adds a new column indicating the rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column to apply the rule on.
            - 'check' (str): A descriptive string for the rule (e.g., "greater_than").
            - 'value' (int or float): The threshold value for the comparison.

    Returns:
        DataFrame: A new DataFrame with rows filtered based on the rule and an
        additional column "dq_status" describing the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(col(field) <= value).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
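
Example (a minimal sketch assuming a local SparkSession; the quantity column and the threshold 100 are illustrative):

    from pyspark.sql import SparkSession
    from sumeh.engine.pyspark_engine import is_greater_than

    spark = SparkSession.builder.master("local[1]").appName("dq-demo").getOrCreate()
    df = spark.createDataFrame([(5,), (150,)], ["quantity"])

    rule = {"field": "quantity", "check": "is_greater_than", "value": 100}
    # Only the row with quantity 5 (not greater than 100) is returned, flagged "quantity:is_greater_than:100".
    is_greater_than(df, rule).show()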

is_in(df, rule)

Checks if the values in the specified column of a DataFrame are contained within a given set of values.

Parameters:

    df (DataFrame, required): The input DataFrame to evaluate.
    rule (dict, required): A dictionary containing the rule for the check. It should specify the column name and the set of values to check against.

Returns:

    DataFrame: A DataFrame with the applied rule, typically filtered or modified based on the check.

Source code in sumeh/engine/pyspark_engine.py, lines 673-685
def is_in(df: DataFrame, rule: dict) -> DataFrame:
    """
    Checks if the values in the specified column of a DataFrame are contained within a given set of values.

    Args:
        df (DataFrame): The input DataFrame to evaluate.
        rule (dict): A dictionary containing the rule for the check. It should specify the column name
                     and the set of values to check against.

    Returns:
        DataFrame: A DataFrame with the applied rule, typically filtered or modified based on the check.
    """
    return is_contained_in(df, rule)

is_in_billions(df, rule)

Filters a PySpark DataFrame to include only rows where the specified field's value is greater than or equal to one billion, and adds a "dq_status" column with a formatted string indicating the field, check, and value.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame to filter.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
        - 'field': The name of the column to check.
        - 'check': The type of check being performed (e.g., "greater_than").
        - 'value': The threshold value for the check.

Returns:

    DataFrame: A new DataFrame filtered by the rule, with an additional "dq_status" column.

Source code in sumeh/engine/pyspark_engine.py, lines 222-242
def is_in_billions(df, rule: dict):
    """
    Filters a PySpark DataFrame to include only rows where the specified field's value
    is greater than or equal to one billion, and adds a "dq_status" column with a
    formatted string indicating the field, check, and value.

    Args:
        df (pyspark.sql.DataFrame): The input PySpark DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to check.
            - 'check': The type of check being performed (e.g., "greater_than").
            - 'value': The threshold value for the check.

    Returns:
        pyspark.sql.DataFrame: A new DataFrame filtered by the rule and with an
        additional "dq_status" column.
    """
    field, check, value = __extract_params(rule)
    return df.filter(col(field) < lit(1_000_000_000)).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_in_millions(df, rule)

Filters a DataFrame to include only rows where the specified field's value is greater than or equal to 1,000,000 and adds a "dq_status" column with a formatted string indicating the rule applied.

Parameters:

    df (DataFrame, required): The input DataFrame to filter and modify.
    rule (dict, required): A dictionary containing the rule parameters. It should include the field to check, the check type, and the value.

Returns:

    DataFrame: A new DataFrame with rows filtered based on the rule and an additional "dq_status" column describing the rule applied.

Source code in sumeh/engine/pyspark_engine.py, lines 201-219
def is_in_millions(df, rule: dict):
    """
    Filters a DataFrame to include only rows where the specified field's value
    is greater than or equal to 1,000,000 and adds a "dq_status" column with
    a formatted string indicating the rule applied.

    Args:
        df (pyspark.sql.DataFrame): The input DataFrame to filter and modify.
        rule (dict): A dictionary containing the rule parameters. It should
                     include the field to check, the check type, and the value.

    Returns:
        pyspark.sql.DataFrame: A new DataFrame with rows filtered based on the
        rule and an additional "dq_status" column describing the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(col(field) < lit(1_000_000)).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
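
Example (a minimal sketch assuming a local SparkSession; the revenue column is illustrative, and the rule's value key is only echoed into dq_status by this check):

    from pyspark.sql import SparkSession
    from sumeh.engine.pyspark_engine import is_in_millions

    spark = SparkSession.builder.master("local[1]").appName("dq-demo").getOrCreate()
    df = spark.createDataFrame([(2_500_000,), (980_000,)], ["revenue"])

    rule = {"field": "revenue", "check": "is_in_millions", "value": ""}
    flagged = is_in_millions(df, rule)  # flagged rows carry "revenue:is_in_millions:" in dq_status
    flagged.show()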

is_legit(df, rule)

Filters a PySpark DataFrame to identify rows that do not meet a specified rule and appends a column indicating the data quality status.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame to be validated.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
        - 'field': The name of the column to validate.
        - 'check': The type of check being performed (e.g., "is_legit").
        - 'value': The expected value or condition for the validation.

Returns:

    DataFrame: A new DataFrame containing only the rows that fail the validation rule, with an additional column "dq_status" describing the validation status in the format "field:check:value".

Source code in sumeh/engine/pyspark_engine.py, lines 774-797
def is_legit(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters a PySpark DataFrame to identify rows that do not meet a specified rule
    and appends a column indicating the data quality status.

    Args:
        df (DataFrame): The input PySpark DataFrame to be validated.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to validate.
            - 'check': The type of check being performed (e.g., "is_legit").
            - 'value': The expected value or condition for the validation.

    Returns:
        DataFrame: A new DataFrame containing only the rows that fail the validation
        rule, with an additional column "dq_status" describing the validation status
        in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    pattern_legit = "\S*"
    return df.filter(
        ~(col(field).isNotNull() & col(field).rlike(pattern_legit))
    ).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
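
Example (a minimal sketch assuming a local SparkSession; the voucher column is illustrative):

    from pyspark.sql import SparkSession
    from sumeh.engine.pyspark_engine import is_legit

    spark = SparkSession.builder.master("local[1]").appName("dq-demo").getOrCreate()
    df = spark.createDataFrame([("abc123",), (None,)], ["voucher"])

    rule = {"field": "voucher", "check": "is_legit", "value": "non blank"}
    # The null row fails the check and is returned, flagged "voucher:is_legit:non blank" in dq_status.
    is_legit(df, rule).show()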

is_less_or_equal_than(df, rule)

Filters rows in a PySpark DataFrame where the value of a specified field is greater than a given value and adds a new column "dq_status" with a formatted string indicating the rule applied.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame to filter.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
        - "field" (str): The name of the column to evaluate.
        - "check" (str): A descriptive string for the check being performed.
        - "value" (numeric): The threshold value to compare against.

Returns:

    DataFrame: A new PySpark DataFrame with rows filtered based on the rule and an additional "dq_status" column describing the rule applied.

Source code in sumeh/engine/pyspark_engine.py, lines 577-596
def is_less_or_equal_than(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters rows in a PySpark DataFrame where the value of a specified field is greater than a given value
    and adds a new column "dq_status" with a formatted string indicating the rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - "field" (str): The name of the column to evaluate.
            - "check" (str): A descriptive string for the check being performed.
            - "value" (numeric): The threshold value to compare against.

    Returns:
        DataFrame: A new PySpark DataFrame with rows filtered based on the rule and an additional
        "dq_status" column describing the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(col(field) > value).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_less_than(df, rule)

Filters rows in a PySpark DataFrame where the specified field is greater than or equal to a given value and adds a new column indicating the rule applied.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame to filter.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
        - 'field' (str): The name of the column to apply the filter on.
        - 'check' (str): A descriptive string for the rule (e.g., "less_than").
        - 'value' (int, float, or str): The value to compare the column against.

Returns:

    DataFrame: A new DataFrame with rows filtered based on the rule and an additional column "dq_status" describing the rule applied.

Source code in sumeh/engine/pyspark_engine.py, lines 555-574
def is_less_than(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters rows in a PySpark DataFrame where the specified field is greater than
    or equal to a given value and adds a new column indicating the rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column to apply the filter on.
            - 'check' (str): A descriptive string for the rule (e.g., "less_than").
            - 'value' (int, float, or str): The value to compare the column against.

    Returns:
        DataFrame: A new DataFrame with rows filtered based on the rule and an
        additional column "dq_status" describing the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(col(field) >= value).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_negative(df, rule)

Filters rows in the given DataFrame where the specified field is non-negative and adds a new column "dq_status" containing a formatted string with rule details.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame to be filtered and modified.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
        - 'field' (str): The name of the column to check.
        - 'check' (str): A descriptive string for the check being performed.
        - 'value' (any): The value associated with the rule.

Returns:

    DataFrame: A new DataFrame with rows filtered based on the rule and an additional "dq_status" column describing the rule applied.

Source code in sumeh/engine/pyspark_engine.py, lines 179-198
def is_negative(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters rows in the given DataFrame where the specified field is non-negative
    and adds a new column "dq_status" containing a formatted string with rule details.

    Args:
        df (DataFrame): The input PySpark DataFrame to be filtered and modified.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column to check.
            - 'check' (str): A descriptive string for the check being performed.
            - 'value' (any): The value associated with the rule.

    Returns:
        DataFrame: A new DataFrame with rows filtered based on the rule and an
        additional "dq_status" column describing the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(col(field) >= 0).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_on_friday(df, rule)

Filters a PySpark DataFrame to include only rows where the specified date field falls on a Friday.

Parameters:

    df (DataFrame, required): The input DataFrame to filter.
    rule (dict, required): A dictionary containing the rule parameters. It is expected to include:
        - 'field': The name of the column in the DataFrame to check.
        - 'check': A string representing the type of check (not used in the filtering logic, but included for consistency).
        - 'value': A value associated with the rule (not used in the filtering logic, but included for consistency).

Returns:

    DataFrame: A new DataFrame filtered to include only rows where the specified date field corresponds to a Friday. Additionally, a new column dq_status is added, containing a string representation of the rule applied in the format "field:check:value".

Source code in sumeh/engine/pyspark_engine.py, lines 1318-1337
def is_on_friday(df, rule: dict):
    """
    Filters a PySpark DataFrame to include only rows where the specified date field falls on a Friday.

    Args:
        df (pyspark.sql.DataFrame): The input DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It is expected to have the following keys:
            - 'field': The name of the column in the DataFrame to check.
            - 'check': A string representing the type of check (not used in this function but included for consistency).
            - 'value': A value associated with the rule (not used in this function but included for consistency).

    Returns:
        pyspark.sql.DataFrame: A new DataFrame filtered to include only rows where the specified date field
        corresponds to a Friday. Additionally, a new column `dq_status` is added, which contains a string
        representation of the rule applied in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    return df.filter(dayofweek(col(field)) != 6).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
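
Example (a minimal sketch assuming a local SparkSession; the dates and column name are illustrative, and the same calling pattern applies to the other is_on_<day> checks):

    from pyspark.sql import SparkSession, functions as F
    from sumeh.engine.pyspark_engine import is_on_friday

    spark = SparkSession.builder.master("local[1]").appName("dq-demo").getOrCreate()
    df = spark.createDataFrame([("2024-06-07",), ("2024-06-10",)], ["load_date"])
    df = df.withColumn("load_date", F.to_date("load_date"))

    rule = {"field": "load_date", "check": "is_on_friday", "value": ""}
    # Flagged rows carry "load_date:is_on_friday:" in dq_status.
    is_on_friday(df, rule).show(truncate=False)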

is_on_monday(df, rule)

Filters a PySpark DataFrame to include only rows where the specified date field falls on a Monday.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame.
    rule (dict, required): A dictionary containing rule parameters. It is expected to include:
        - 'field': The name of the column to check.
        - 'check': A string representing the type of check (not used in the filtering logic).
        - 'value': A value associated with the rule (not used in the filtering logic).

Returns:

    DataFrame: A new DataFrame filtered to include only rows where the specified date field corresponds to a Monday. Additionally, a new column "dq_status" is added, containing a concatenated string of the field, check, and value.

Source code in sumeh/engine/pyspark_engine.py, lines 1229-1248
def is_on_monday(df, rule: dict):
    """
    Filters a PySpark DataFrame to include only rows where the specified date field falls on a Monday.

    Args:
        df (pyspark.sql.DataFrame): The input PySpark DataFrame.
        rule (dict): A dictionary containing rule parameters. It is expected to include:
            - 'field': The name of the column to check.
            - 'check': A string representing the type of check (not used in this function).
            - 'value': A value associated with the rule (not used in this function).

    Returns:
        pyspark.sql.DataFrame: A new DataFrame filtered to include only rows where the specified
        date field corresponds to a Monday. Additionally, a new column "dq_status" is added,
        containing a concatenated string of the field, check, and value.
    """
    field, check, value = __extract_params(rule)
    return df.filter(dayofweek(col(field)) != 2).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_on_saturday(df, rule)

Filters a PySpark DataFrame to include only rows where the specified date field falls on a Saturday.

Parameters:

    df (DataFrame, required): The input DataFrame to filter.
    rule (dict, required): A dictionary containing rule parameters. It is expected to include:
        - 'field': The name of the column to check.
        - 'check': A string representing the check being performed (not used in the logic, but included in the output column).
        - 'value': A value to include in the output column (not used in the logic).

Returns:

    DataFrame: A new DataFrame filtered to include only rows where the specified field falls on a Saturday. Additionally, a new column "dq_status" is added, containing a string in the format "field:check:value".

Source code in sumeh/engine/pyspark_engine.py, lines 1340-1358
def is_on_saturday(df, rule: dict):
    """
    Filters a PySpark DataFrame to include only rows where the specified date field falls on a Saturday.

    Args:
        df (pyspark.sql.DataFrame): The input DataFrame to filter.
        rule (dict): A dictionary containing rule parameters. The function expects the rule to include:
            - 'field': The name of the column to check.
            - 'check': A string representing the check being performed (not used in logic, but included in the output column).
            - 'value': A value to include in the output column (not used in logic, but included in the output column).

    Returns:
        pyspark.sql.DataFrame: A new DataFrame filtered to include only rows where the specified field falls on a Saturday.
        Additionally, a new column "dq_status" is added, containing a string in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    return df.filter(dayofweek(col(field)) != 7).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_on_sunday(df, rule)

Filters a PySpark DataFrame to include only rows where the specified date field falls on a Sunday.

Parameters:

    df (DataFrame, required): The input DataFrame to filter.
    rule (dict, required): A dictionary containing the rule parameters. It is expected to include:
        - 'field' (str): The name of the column to check.
        - 'check' (str): A descriptive string for the check being performed.
        - 'value' (str): A value to include in the "dq_status" column for context.

Returns:

    DataFrame: A new DataFrame filtered to include only rows where the specified date field corresponds to a Sunday. Additionally, a "dq_status" column is added, containing a string in the format "field:check:value".

Source code in sumeh/engine/pyspark_engine.py, lines 1361-1380
def is_on_sunday(df, rule: dict):
    """
    Filters a PySpark DataFrame to include only rows where the specified date field falls on a Sunday.

    Args:
        df (pyspark.sql.DataFrame): The input DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It is expected to include:
            - field (str): The name of the column to check.
            - check (str): A descriptive string for the check being performed.
            - value (str): A value to include in the "dq_status" column for context.

    Returns:
        pyspark.sql.DataFrame: A new DataFrame filtered to include only rows where the specified
        date field corresponds to a Sunday. Additionally, a "dq_status" column is added to the
        DataFrame, containing a string in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    return df.filter(dayofweek(col(field)) != 1).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_on_thursday(df, rule)

Filters a PySpark DataFrame to include only rows where the specified date column falls on a Thursday.

Parameters:

    df (DataFrame, required): The PySpark DataFrame to filter.
    rule (dict, required): A dictionary containing the rule parameters. It is expected to include:
        - 'field': The name of the column to check.
        - 'check': A string representing the type of check (not used in the filtering logic).
        - 'value': A value associated with the rule (not used in the filtering logic).

Returns:

    DataFrame: A new PySpark DataFrame filtered to include only rows where the specified column's day of the week is Thursday. Additionally, a new column "dq_status" is added, containing a concatenated string of the field, check, and value.

Source code in sumeh/engine/pyspark_engine.py, lines 1297-1315
def is_on_thursday(df, rule: dict):
    """
    Filters a PySpark DataFrame to include only rows where the specified date column falls on a Thursday.

    Args:
        df (DataFrame): The PySpark DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It is expected to include:
            - 'field': The name of the column to check.
            - 'check': A string representing the type of check (not used in the filtering logic).
            - 'value': A value associated with the rule (not used in the filtering logic).

    Returns:
        DataFrame: A new PySpark DataFrame filtered to include only rows where the specified column's day of the week is Thursday.
                   Additionally, a new column "dq_status" is added, containing a concatenated string of the field, check, and value.
    """
    field, check, value = __extract_params(rule)
    return df.filter(dayofweek(col(field)) != 5).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_on_tuesday(df, rule)

Filters a PySpark DataFrame to include only rows where the day of the week for a specified date column is Tuesday. Adds a new column 'dq_status' to indicate the validation status.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame.
    rule (dict, required): A dictionary containing the rule parameters. It is expected to include:
        - 'field': The name of the column to check.
        - 'check': A string describing the check being performed.
        - 'value': A value associated with the check.

Returns:

    DataFrame: A new DataFrame filtered to include only rows where the specified column corresponds to Tuesday, with an additional 'dq_status' column describing the validation status.

Source code in sumeh/engine/pyspark_engine.py, lines 1251-1273
def is_on_tuesday(df, rule: dict):
    """
    Filters a PySpark DataFrame to include only rows where the day of the week
    for a specified date column is Tuesday. Adds a new column 'dq_status' to
    indicate the validation status.

    Args:
        df (pyspark.sql.DataFrame): The input PySpark DataFrame.
        rule (dict): A dictionary containing the rule parameters. It is expected
            to include:
            - 'field': The name of the column to check.
            - 'check': A string describing the check being performed.
            - 'value': A value associated with the check.

    Returns:
        pyspark.sql.DataFrame: A new DataFrame filtered to include only rows
        where the specified column corresponds to Tuesday, with an additional
        'dq_status' column describing the validation status.
    """
    field, check, value = __extract_params(rule)
    return df.filter(dayofweek(col(field)) != 3).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_on_wednesday(df, rule)

Filters a PySpark DataFrame to include only rows where the specified date field falls on a Wednesday.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame.
    rule (dict, required): A dictionary containing the rule parameters. It is expected to include:
        - 'field': The name of the column in the DataFrame to check.
        - 'check': A string representing the type of check (not used in the logic, but included for status reporting).
        - 'value': A value associated with the rule (not used in the logic, but included for status reporting).

Returns:

    DataFrame: A new DataFrame filtered to include only rows where the specified field corresponds to a Wednesday. Additionally, a new column 'dq_status' is added, containing a string in the format "field:check:value".

Source code in sumeh/engine/pyspark_engine.py, lines 1276-1294
def is_on_wednesday(df, rule: dict):
    """
    Filters a PySpark DataFrame to include only rows where the specified date field falls on a Wednesday.

    Args:
        df (pyspark.sql.DataFrame): The input PySpark DataFrame.
        rule (dict): A dictionary containing the rule parameters. It is expected to have the following keys:
            - 'field': The name of the column in the DataFrame to check.
            - 'check': A string representing the type of check (not used in the logic but included for status reporting).
            - 'value': A value associated with the rule (not used in the logic but included for status reporting).

    Returns:
        pyspark.sql.DataFrame: A new DataFrame filtered to include only rows where the specified field corresponds to a Wednesday.
        Additionally, a new column 'dq_status' is added, which contains a string in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    return df.filter(dayofweek(col(field)) != 4).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_on_weekday(df, rule)

Filters a PySpark DataFrame to include only rows where the specified date field falls on a weekday (Monday to Friday). Adds a new column 'dq_status' to indicate the rule applied.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame.
    rule (dict, required): A dictionary containing the rule parameters. It is expected to include:
        - 'field': The name of the column to check.
        - 'check': A string representing the type of check (used for logging).
        - 'value': A value associated with the rule (used for logging).

Returns:

    DataFrame: A new DataFrame filtered to include only rows where the specified date field is a weekday, with an additional 'dq_status' column describing the rule applied.

Source code in sumeh/engine/pyspark_engine.py, lines 1176-1200
def is_on_weekday(df, rule: dict):
    """
    Filters a PySpark DataFrame to include only rows where the specified date field
    falls on a weekday (Monday to Friday). Adds a new column 'dq_status' to indicate
    the rule applied.

    Args:
        df (pyspark.sql.DataFrame): The input PySpark DataFrame.
        rule (dict): A dictionary containing the rule parameters. It is expected to
            include the following keys:
            - 'field': The name of the column to check.
            - 'check': A string representing the type of check (used for logging).
            - 'value': A value associated with the rule (used for logging).

    Returns:
        pyspark.sql.DataFrame: A new DataFrame filtered to include only rows where
        the specified date field is a weekday, with an additional 'dq_status' column
        describing the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(
        (dayofweek(col(field)) == 1) | (dayofweek(col(field)) == 7)
    ).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_on_weekend(df, rule)

Filters a PySpark DataFrame to include only rows where the specified date field falls on a weekend (Saturday or Sunday). Additionally, adds a new column 'dq_status' to indicate the rule applied.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame.
    rule (dict, required): A dictionary containing the rule parameters. It is expected to include:
        - 'field': The name of the date column to check.
        - 'check': A string representing the type of check (not used in the logic).
        - 'value': A string representing the value to include in the 'dq_status' column.

Returns:

    DataFrame: A new DataFrame filtered to include only rows where the specified date field is on a weekend, with an additional 'dq_status' column.

Source code in sumeh/engine/pyspark_engine.py, lines 1203-1226
def is_on_weekend(df, rule: dict):
    """
    Filters a PySpark DataFrame to include only rows where the specified date field
    falls on a weekend (Saturday or Sunday). Additionally, adds a new column
    'dq_status' to indicate the rule applied.

    Args:
        df (pyspark.sql.DataFrame): The input PySpark DataFrame.
        rule (dict): A dictionary containing the rule parameters. It is expected
                     to have the following keys:
                     - 'field': The name of the date column to check.
                     - 'check': A string representing the type of check (not used in logic).
                     - 'value': A string representing the value to include in the 'dq_status' column.

    Returns:
        pyspark.sql.DataFrame: A new DataFrame filtered to include only rows where
        the specified date field is on a weekend, with an additional 'dq_status' column.
    """
    field, check, value = __extract_params(rule)
    return df.filter(
        # & rather than |: the "or" form is always true; this keeps rows whose date is neither Sunday nor Saturday
        (dayofweek(col(field)) != 1) & (dayofweek(col(field)) != 7)
    ).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_past_date(df, rule)

Filters a DataFrame to identify rows where a specified field has a date lower than the current date and adds a "dq_status" column indicating the data quality rule applied.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame to be checked.
    rule (dict, required): A dictionary containing the data quality rule. It should include:
        - "field" (str): The name of the date field to check.
        - "check" (str): A description of the check being performed.
        - "value" (str): Additional information about the rule.

Returns:

    DataFrame: A new DataFrame filtered to include only rows where the specified field's date is before the current date, with an additional "dq_status" column describing the rule.

Source code in sumeh/engine/pyspark_engine.py, lines 322-342
def is_past_date(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters a DataFrame to identify rows where a specified field has a date lower than the current date and
    adds a "dq_status" column indicating the data quality rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to be checked.
        rule (dict): A dictionary containing the data quality rule. It should include:
            - "field" (str): The name of the field to check for null values.
            - "check" (str): A description of the check being performed.
            - "value" (str): Additional information about the rule.

    Returns:
        DataFrame: A new DataFrame filtered to include only rows where the specified
        field is null, with an additional "dq_status" column describing the rule.
    """

    field, check, value = __extract_params(rule)
    return df.filter(col(field) < current_date()).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_positive(df, rule)

Filters a DataFrame to identify rows where the specified field does not satisfy a positive check and adds a "dq_status" column with details of the rule applied.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame to be filtered.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
        - "field" (str): The name of the column to check.
        - "check" (str): The type of check being performed (e.g., "positive").
        - "value" (any): The value associated with the rule (not directly used in this function).

Returns:

    DataFrame: A new DataFrame filtered to include only rows where the specified field is less than 0, with an additional "dq_status" column describing the rule applied.

Source code in sumeh/engine/pyspark_engine.py, lines 157-176
def is_positive(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters a DataFrame to identify rows where the specified field does not satisfy a positive check
    and adds a "dq_status" column with details of the rule applied.

    Args:
        df (DataFrame): The input PySpark DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - "field" (str): The name of the column to check.
            - "check" (str): The type of check being performed (e.g., "positive").
            - "value" (any): The value associated with the rule (not directly used in this function).

    Returns:
        DataFrame: A new DataFrame filtered to include only rows where the specified field is less than 0,
        with an additional "dq_status" column describing the rule applied.
    """
    field, check, value = __extract_params(rule)
    return df.filter(col(field) < 0).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
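
Example (a minimal sketch assuming a local SparkSession; the amount column is illustrative):

    from pyspark.sql import SparkSession
    from sumeh.engine.pyspark_engine import is_positive

    spark = SparkSession.builder.master("local[1]").appName("dq-demo").getOrCreate()
    df = spark.createDataFrame([(10.5,), (-3.2,)], ["amount"])

    rule = {"field": "amount", "check": "is_positive", "value": ""}
    # Only the negative row is returned, flagged "amount:is_positive:" in dq_status.
    is_positive(df, rule).show()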

is_primary_key(df, rule)

Determines if a given DataFrame column or set of columns satisfies the primary key constraint.

A primary key constraint requires that the specified column(s) in the DataFrame have unique values.

Parameters:

    df (DataFrame, required): The PySpark DataFrame to be checked.
    rule (dict, required): A dictionary containing the rules or specifications for identifying the primary key. Typically, this includes the column(s) to be checked for uniqueness.

Returns:

    DataFrame: The rows that violate the primary key (uniqueness) constraint, as returned by is_unique, with an additional "dq_status" column.

Source code in sumeh/engine/pyspark_engine.py, lines 800-814
def is_primary_key(df: DataFrame, rule: dict):
    """
    Determines if a given DataFrame column or set of columns satisfies the primary key constraint.

    A primary key constraint requires that the specified column(s) in the DataFrame have unique values.

    Args:
        df (DataFrame): The PySpark DataFrame to be checked.
        rule (dict): A dictionary containing the rules or specifications for identifying the primary key.
                     Typically, this includes the column(s) to be checked for uniqueness.

    Returns:
        DataFrame: The rows that violate the primary key (uniqueness) constraint, as returned by is_unique, with a "dq_status" column describing the rule.
    """
    return is_unique(df, rule)

is_t_minus_1(df, rule)

Filters the input DataFrame to include only rows where the specified field matches the date corresponding to "T-1" (yesterday). Adds a new column "dq_status" to indicate the rule applied.

Parameters:

    df (DataFrame, required): The input DataFrame to be filtered.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
        - 'field': The name of the column to be checked.
        - 'check': The type of check being performed (not used in filtering but included in "dq_status").
        - 'value': The value associated with the check (not used in filtering but included in "dq_status").

Returns:

    DataFrame: A new DataFrame filtered by the rule, with an additional "dq_status" column.

Source code in sumeh/engine/pyspark_engine.py, lines 1060-1079
def is_t_minus_1(df, rule: dict):
    """
    Filters the input DataFrame to include only rows where the specified field matches the date
    corresponding to "T-1" (yesterday). Adds a new column "dq_status" to indicate the rule applied.

    Args:
        df (pyspark.sql.DataFrame): The input DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to be checked.
            - 'check': The type of check being performed (not used in filtering but included in "dq_status").
            - 'value': The value associated with the check (not used in filtering but included in "dq_status").

    Returns:
        pyspark.sql.DataFrame: A new DataFrame filtered by the rule and with an additional "dq_status" column.
    """
    field, check, value = __extract_params(rule)
    target = date_sub(current_date(), 1)
    return df.filter(col(field) != target).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
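
Example (a minimal sketch assuming a local SparkSession; the event_date values are static placeholders, so which rows come back depends on the day the snippet is run):

    from pyspark.sql import SparkSession, functions as F
    from sumeh.engine.pyspark_engine import is_t_minus_1

    spark = SparkSession.builder.master("local[1]").appName("dq-demo").getOrCreate()
    df = spark.createDataFrame([("2024-06-01",), ("2024-06-02",)], ["event_date"])
    df = df.withColumn("event_date", F.to_date("event_date"))

    rule = {"field": "event_date", "check": "is_t_minus_1", "value": ""}
    # Each event_date is compared against date_sub(current_date(), 1);
    # flagged rows carry "event_date:is_t_minus_1:" in dq_status.
    is_t_minus_1(df, rule).show(truncate=False)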

is_t_minus_2(df, rule)

Filters the input DataFrame to include only rows where the specified field matches the date that is two days prior to the current date. Adds a new column 'dq_status' to indicate the data quality status.

Parameters:

    df (DataFrame, required): The input DataFrame to be filtered.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
        - 'field': The name of the column to be checked.
        - 'check': A string representing the type of check (not used in filtering).
        - 'value': A value associated with the check (not used in filtering).

Returns:

    DataFrame: A new DataFrame filtered by the rule, with an additional 'dq_status' column indicating the field, check, and value.

Source code in sumeh/engine/pyspark_engine.py, lines 1082-1103
def is_t_minus_2(df, rule: dict):
    """
    Filters the input DataFrame to include only rows where the specified field matches the date
    that is two days prior to the current date. Adds a new column 'dq_status' to indicate the
    data quality status.

    Args:
        df (pyspark.sql.DataFrame): The input DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to be checked.
            - 'check': A string representing the type of check (not used in filtering).
            - 'value': A value associated with the check (not used in filtering).

    Returns:
        pyspark.sql.DataFrame: A new DataFrame filtered by the rule and with an additional
        'dq_status' column indicating the field, check, and value.
    """
    field, check, value = __extract_params(rule)
    target = date_sub(current_date(), 2)
    return df.filter(col(field) != target).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_t_minus_3(df, rule)

Filters the input DataFrame to include only rows where the specified field matches the date that is three days prior to the current date. Adds a new column 'dq_status' to indicate the data quality status.

Parameters:

    df (DataFrame, required): The input DataFrame to be filtered.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
        - 'field': The name of the column to be checked.
        - 'check': A string representing the type of check (not used in filtering).
        - 'value': A value associated with the rule (not used in filtering).

Returns:

    DataFrame: A new DataFrame filtered by the rule, with an additional 'dq_status' column.

Source code in sumeh/engine/pyspark_engine.py, lines 1106-1127
def is_t_minus_3(df, rule: dict):
    """
    Filters the input DataFrame to include only rows where the specified field matches
    the date that is three days prior to the current date. Adds a new column 'dq_status'
    to indicate the data quality status.

    Args:
        df (pyspark.sql.DataFrame): The input DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to be checked.
            - 'check': A string representing the type of check (not used in filtering).
            - 'value': A value associated with the rule (not used in filtering).

    Returns:
        pyspark.sql.DataFrame: A new DataFrame filtered by the rule and with an
        additional 'dq_status' column.
    """
    field, check, value = __extract_params(rule)
    target = date_sub(current_date(), 3)
    return df.filter(col(field) != target).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_today(df, rule)

Filters a DataFrame to include only rows where the specified field matches the current date.

Parameters:

    df (DataFrame, required): The input DataFrame to filter.
    rule (dict, required): A dictionary containing the rule parameters. It is expected to include:
        - 'field': The name of the column to check.
        - 'check': A string representing the type of check (not used in the filtering logic).
        - 'value': A value associated with the rule (not used in the filtering logic).

Returns:

    DataFrame: A new DataFrame filtered by the current date, with an additional column "dq_status" indicating the rule applied in the format "field:check:value".

Source code in sumeh/engine/pyspark_engine.py, lines 1130-1151
def is_today(df, rule: dict):
    """
    Filters a DataFrame to include only rows where the specified field matches the current date.

    Args:
        df (pyspark.sql.DataFrame): The input DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It is expected to have
                     the following keys:
                     - 'field': The name of the column to check.
                     - 'check': A string representing the type of check (not used in this function).
                     - 'value': A value associated with the rule (not used in this function).

    Returns:
        pyspark.sql.DataFrame: A new DataFrame filtered by the current date and with an additional
                               column "dq_status" indicating the rule applied in the format
                               "field:check:value".
    """
    field, check, value = __extract_params(rule)
    today = current_date()
    return df.filter(col(field) != today).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )

is_unique(df, rule)

Checks for uniqueness of a specified field in a PySpark DataFrame based on the given rule.

This function identifies rows where the specified field is not unique within the DataFrame. It adds a new column dq_status to the resulting DataFrame, which contains information about the field, the check type, and the value from the rule.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame to check for uniqueness.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
        - "field" (str): The name of the field to check for uniqueness.
        - "check" (str): The type of check being performed (e.g., "unique").
        - "value" (str): Additional value or metadata related to the check.

Returns:

    DataFrame: A new DataFrame containing rows where the specified field is not unique. The resulting DataFrame includes a dq_status column with details about the rule violation.

Example:

    rule = {"field": "column_name", "check": "unique", "value": "some_value"}
    result_df = is_unique(input_df, rule)

Source code in sumeh/engine/pyspark_engine.py, lines 415-448
def is_unique(df: DataFrame, rule: dict) -> DataFrame:
    """
    Checks for uniqueness of a specified field in a PySpark DataFrame based on the given rule.

    This function identifies rows where the specified field is not unique within the DataFrame.
    It adds a new column `dq_status` to the resulting DataFrame, which contains information
    about the field, the check type, and the value from the rule.

    Args:
        df (DataFrame): The input PySpark DataFrame to check for uniqueness.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - `field` (str): The name of the field to check for uniqueness.
            - `check` (str): The type of check being performed (e.g., "unique").
            - `value` (str): Additional value or metadata related to the check.

    Returns:
        DataFrame: A new DataFrame containing rows where the specified field is not unique.
        The resulting DataFrame includes a `dq_status` column with details about the rule violation.

    Example:
        rule = {"field": "column_name", "check": "unique", "value": "some_value"}
        result_df = is_unique(input_df, rule)
    """
    field, check, value = __extract_params(rule)
    window = Window.partitionBy(col(field))
    df_with_count = df.withColumn("count", count(col(field)).over(window))
    res = (
        df_with_count.filter(col("count") > 1)
        .withColumn(
            "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
        )
        .drop("count")
    )
    return res
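
A runnable version of the example above (a minimal sketch assuming a local SparkSession; customer_id is an illustrative column name):

    from pyspark.sql import SparkSession
    from sumeh.engine.pyspark_engine import is_unique

    spark = SparkSession.builder.master("local[1]").appName("dq-demo").getOrCreate()
    df = spark.createDataFrame([(1,), (2,), (2,)], ["customer_id"])

    rule = {"field": "customer_id", "check": "is_unique", "value": ""}
    # Both rows with customer_id = 2 are returned, each flagged "customer_id:is_unique:" in dq_status.
    is_unique(df, rule).show()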

is_yesterday(df, rule)

Filters a PySpark DataFrame to include only rows where the specified field matches yesterday's date. Adds a new column 'dq_status' to indicate the data quality status.

Parameters:

    df (DataFrame, required): The input PySpark DataFrame.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
        - 'field': The name of the column to check.
        - 'check': The type of check being performed (used for the status message).
        - 'value': Additional value information (used for the status message).

Returns:

    DataFrame: A filtered DataFrame with an additional 'dq_status' column.

Source code in sumeh/engine/pyspark_engine.py, lines 1154-1173
def is_yesterday(df, rule: dict):
    """
    Filters a PySpark DataFrame to include only rows where the specified field matches yesterday's date.
    Adds a new column 'dq_status' to indicate the data quality status.

    Args:
        df (pyspark.sql.DataFrame): The input PySpark DataFrame.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to check.
            - 'check': The type of check being performed (used for status message).
            - 'value': Additional value information (used for status message).

    Returns:
        pyspark.sql.DataFrame: A filtered DataFrame with an additional 'dq_status' column.
    """
    field, check, value = __extract_params(rule)
    yesterday = date_sub(current_date(), 1)
    return df.filter(col(field) != yesterday).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
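
A short sketch of how this reads in practice (column name and dates are illustrative); because the filter keeps rows that are not yesterday, the returned DataFrame contains the violations:

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
from sumeh.engine.pyspark_engine import is_yesterday

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("2024-01-01",), ("2024-01-02",)], ["load_date"])
df = df.withColumn("load_date", to_date("load_date"))
rule = {"field": "load_date", "check": "is_yesterday", "value": ""}
# rows whose load_date differs from current_date() - 1 are returned,
# flagged with dq_status "load_date:is_yesterday:"
violations = is_yesterday(df, rule)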

not_contained_in(df, rule)

Filters rows in a DataFrame where the specified field's value is in a given list and adds a column indicating the data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to filter.

required
rule dict

A dictionary containing the rule parameters. It should include:
- 'field': The name of the column to check.
- 'check': A string representing the type of check (e.g., "not_contained_in").
- 'value': A string representation of a list (e.g., "[value1,value2,...]") containing the values to check against.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame with rows filtered based on the rule and an additional column "dq_status" indicating the data quality status in the format "field:check:value".

Source code in sumeh/engine/pyspark_engine.py
def not_contained_in(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters rows in a DataFrame where the specified field's value is in a given list
    and adds a column indicating the data quality status.

    Args:
        df (DataFrame): The input PySpark DataFrame to filter.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to check.
            - 'check': A string representing the type of check (e.g., "not_contained_in").
            - 'value': A string representation of a list (e.g., "[value1,value2,...]")
              containing the values to check against.

    Returns:
        DataFrame: A new DataFrame with rows filtered based on the rule and an
        additional column "dq_status" indicating the data quality status in the
        format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    negative_list = value.strip("[]").split(",")
    return df.filter(col(field).isin(negative_list)).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
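
A minimal sketch (field name and values are illustrative). Note that the value string is split verbatim on commas after the brackets are stripped, so spaces after the commas would become part of the entries:

from pyspark.sql import SparkSession
from sumeh.engine.pyspark_engine import not_contained_in

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("BR",), ("US",), ("XX",)], ["country"])
rule = {"field": "country", "check": "not_contained_in", "value": "[XX,ZZ]"}
# only the "XX" row is returned, flagged with
# dq_status "country:not_contained_in:[XX,ZZ]"
violations = not_contained_in(df, rule)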

not_in(df, rule)

Filters rows in a DataFrame where the specified field's value appears in the given list of disallowed values.

This function delegates the operation to the not_contained_in function.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame to be filtered.

required
rule dict

A dictionary specifying the rule to apply for filtering.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame with rows that do not match the specified rule.

Source code in sumeh/engine/pyspark_engine.py
def not_in(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters rows in a DataFrame where the specified rule is not contained.

    This function delegates the operation to the `not_contained_in` function.

    Args:
        df (DataFrame): The input DataFrame to be filtered.
        rule (dict): A dictionary specifying the rule to apply for filtering.

    Returns:
        DataFrame: A new DataFrame with rows that do not match the specified rule.
    """
    return not_contained_in(df, rule)

satisfies(df, rule)

Filters a PySpark DataFrame to retain the rows that fail the rule's SQL expression and adds a data quality status column.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to be filtered.

required
rule dict

A dictionary containing the filtering rule. It should include:
- 'field': The name of the column the rule refers to.
- 'check': The type of check to perform (only used to build the status message).
- 'value': A SQL expression string, evaluated with pyspark.sql.functions.expr.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame filtered based on the rule, with an additional column "dq_status" that describes the rule applied in the format "field:check:value".

Source code in sumeh/engine/pyspark_engine.py
def satisfies(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters a PySpark DataFrame based on a rule and adds a data quality status column.

    Args:
        df (DataFrame): The input PySpark DataFrame to be filtered.
        rule (dict): A dictionary containing the filtering rule. It should include:
            - 'field': The name of the column to apply the filter on.
            - 'check': The type of check to perform (currently unused in this implementation).
            - 'value': The expression in the pattern of pyspark.sql.functions.expr.

    Returns:
        DataFrame: A new DataFrame filtered based on the rule, with an additional column
        "dq_status" that describes the rule applied in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    expression = expr(value)
    return df.filter(~expression).withColumn(
        "dq_status", concat(lit(field), lit(":"), lit(check), lit(":"), lit(value))
    )
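
A minimal sketch (column names and the expression are illustrative); any expression accepted by pyspark.sql.functions.expr can be used, and rows for which it evaluates to false are returned as violations:

from pyspark.sql import SparkSession
from sumeh.engine.pyspark_engine import satisfies

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(10, 5), (3, 7)], ["amount", "threshold"])
rule = {"field": "amount", "check": "satisfies", "value": "amount >= threshold"}
# the (3, 7) row is returned, since 3 >= 7 is false, flagged with
# dq_status "amount:satisfies:amount >= threshold"
violations = satisfies(df, rule)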

summarize(df, rules, total_rows)

Summarizes data quality results based on provided rules and total rows.

This function processes a DataFrame containing data quality statuses, applies rules to calculate violations, and generates a summary DataFrame with metrics such as pass rate, status, and other relevant information.

Parameters:

Name Type Description Default
df DataFrame

The input DataFrame containing a column dq_status with data quality statuses in the format "column:rule:value".

required
rules List[Dict]

A list of dictionaries representing the data quality rules. Each dictionary should define the column, rule, and optional value and pass_threshold.

required
total_rows int

The total number of rows in the input DataFrame.

required

Returns:

Name Type Description
DataFrame DataFrame

A summary DataFrame containing the following columns:
- id: A unique identifier for each row.
- timestamp: The timestamp when the summary was generated.
- check: The type of check performed (e.g., "Quality Check").
- level: The severity level of the check (e.g., "WARNING").
- column: The column name associated with the rule.
- rule: The rule applied to the column.
- value: The value associated with the rule.
- rows: The total number of rows in the input DataFrame.
- violations: The number of rows that violated the rule.
- pass_rate: The percentage of rows that passed the rule.
- pass_threshold: The threshold for passing the rule.
- status: The overall status of the rule (e.g., "PASS" or "FAIL").

Source code in sumeh/engine/pyspark_engine.py
def summarize(df: DataFrame, rules: List[Dict], total_rows) -> DataFrame:
    """
    Summarizes data quality results based on provided rules and total rows.

    This function processes a DataFrame containing data quality statuses, applies
    rules to calculate violations, and generates a summary DataFrame with metrics
    such as pass rate, status, and other relevant information.

    Args:
        df (DataFrame): The input DataFrame containing a column `dq_status` with
            data quality statuses in the format "column:rule:value".
        rules (List[Dict]): A list of dictionaries representing the data quality
            rules. Each dictionary should define the `column`, `rule`, and optional
            `value` and `pass_threshold`.
        total_rows (int): The total number of rows in the input DataFrame.

    Returns:
        DataFrame: A summary DataFrame containing the following columns:
            - id: A unique identifier for each row.
            - timestamp: The timestamp when the summary was generated.
            - check: The type of check performed (e.g., "Quality Check").
            - level: The severity level of the check (e.g., "WARNING").
            - column: The column name associated with the rule.
            - rule: The rule applied to the column.
            - value: The value associated with the rule.
            - rows: The total number of rows in the input DataFrame.
            - violations: The number of rows that violated the rule.
            - pass_rate: The percentage of rows that passed the rule.
            - pass_threshold: The threshold for passing the rule.
            - status: The overall status of the rule (e.g., "PASS" or "FAIL").
    """
    now_ts = current_timestamp()

    viol_df = (
        df.filter(trim(col("dq_status")) != lit(""))
        .withColumn("dq_status", split(trim(col("dq_status")), ":"))
        .withColumn("column", col("dq_status")[0])
        .withColumn("rule", col("dq_status")[1])
        .withColumn("value", col("dq_status")[2])
        .groupBy("column", "rule", "value")
        .agg(count("*").alias("violations"))
        .withColumn(
            "value",
            coalesce(
                when(col("value") == "", None).otherwise(col("value")), lit("N/A")
            ),
        )
    )

    rules_df = __rules_to_df(rules).withColumn(
        "value", coalesce(col("value"), lit("N/A"))
    )

    base = (
        broadcast(rules_df)
        .join(viol_df, ["column", "rule", "value"], how="left")
        .withColumn("violations", coalesce(col("violations"), lit(0)))
    )

    summary = (
        base.withColumn("rows", lit(total_rows))
        .withColumn(
            "pass_rate", (lit(total_rows) - col("violations")) / lit(total_rows)
        )
        .withColumn(
            "status",
            when(col("pass_rate") >= col("pass_threshold"), "PASS").otherwise("FAIL"),
        )
        .withColumn("timestamp", now_ts)
        .withColumn("check", lit("Quality Check"))
        .withColumn("level", lit("WARNING"))
    )

    summary = summary.withColumn("id", expr("uuid()"))
    summary = summary.select(
        "id",
        "timestamp",
        "check",
        "level",
        "column",
        "rule",
        "value",
        "rows",
        "violations",
        "pass_rate",
        "pass_threshold",
        "status",
    )

    return summary
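
A hedged end-to-end sketch: it feeds summarize with the row-level output of validate (documented below), since those dq_status values follow the "column:rule:value" pattern this function splits on. The rule keys, including pass_threshold, follow the docstrings in this module but are not guaranteed to match the exact shape expected by the private __rules_to_df helper:

from pyspark.sql import SparkSession
from sumeh.engine.pyspark_engine import validate, summarize

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (-2,), (3,)], ["amount"])
# hypothetical rule: is_positive is one of the module's documented checks;
# pass_threshold is the minimum pass_rate required for a "PASS" status
rules = [{"field": "amount", "check": "is_positive", "value": "", "pass_threshold": 1.0}]
result, raw_result = validate(df, rules)
summary = summarize(raw_result, rules, total_rows=df.count())
summary.show(truncate=False)  # one row per rule: violations, pass_rate, status, ...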

validate(df, rules)

Validates a DataFrame against a set of rules and returns the validation results.

This function applies a series of validation rules to the input DataFrame. Each rule is expected to be a dictionary containing the parameters required for validation. The function generates two DataFrames as output:

1. A summarized result DataFrame with aggregated validation statuses.
2. A raw result DataFrame containing detailed validation results.

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to validate.

required
rules list[dict]

A list of dictionaries, where each dictionary defines a validation rule. Each rule should include the following keys:
- field (str): The column name to validate.
- rule_name (str): The name of the validation function to apply.
- value (any): The value or parameter required by the validation function.

required

Returns:

Type Description
Tuple[DataFrame, DataFrame]

Tuple[DataFrame, DataFrame]: A tuple containing:
- result (DataFrame): A DataFrame with aggregated validation statuses.
- raw_result (DataFrame): A DataFrame with detailed validation results.

Raises:

Type Description
KeyError

If a rule references a validation function that does not exist in the global scope.

Notes
  • The dq_status column is used to store validation statuses.
  • The function assumes that the validation functions are defined in the global scope and are accessible by their names.
  • The concat_ws function is used to concatenate multiple validation statuses into a single string for each record in the summarized result.
Example

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
rules = [{"field": "id", "rule_name": "validate_positive", "value": None}]
result, raw_result = validate(df, rules)

Source code in sumeh/engine/pyspark_engine.py
def validate(df: DataFrame, rules: list[dict]) -> Tuple[DataFrame, DataFrame]:
    """
    Validates a DataFrame against a set of rules and returns the validation results.

    This function applies a series of validation rules to the input DataFrame. Each rule
    is expected to be a dictionary containing the parameters required for validation.
    The function generates two DataFrames as output:
    1. A summarized result DataFrame with aggregated validation statuses.
    2. A raw result DataFrame containing detailed validation results.

    Args:
        df (DataFrame): The input PySpark DataFrame to validate.
        rules (list[dict]): A list of dictionaries, where each dictionary defines a validation rule.
            Each rule should include the following keys:
            - `field` (str): The column name to validate.
            - `rule_name` (str): The name of the validation function to apply.
            - `value` (any): The value or parameter required by the validation function.

    Returns:
        Tuple[DataFrame, DataFrame]: A tuple containing:
            - result (DataFrame): A DataFrame with aggregated validation statuses.
            - raw_result (DataFrame): A DataFrame with detailed validation results.

    Raises:
        KeyError: If a rule references a validation function that does not exist in the global scope.

    Warnings:
        If a rule references an unknown validation function, a warning is issued.

    Notes:
        - The `dq_status` column is used to store validation statuses.
        - The function assumes that the validation functions are defined in the global scope
          and are accessible by their names.
        - The `concat_ws` function is used to concatenate multiple validation statuses
          into a single string for each record in the summarized result.

    Example:
        >>> from pyspark.sql import SparkSession
        >>> spark = SparkSession.builder.getOrCreate()
        >>> df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
        >>> rules = [{"field": "id", "rule_name": "validate_positive", "value": None}]
        >>> result, raw_result = validate(df, rules)
    """
    df = df.withColumn("dq_status", lit(""))
    raw_result = df.limit(0)
    for rule in rules:
        field, rule_name, value = __extract_params(rule)
        try:
            rule_func = globals()[rule_name]
            raw_result = raw_result.unionByName(rule_func(df, rule))
        except KeyError:
            warnings.warn(f"Unknown rule name: {rule_name}, {field}")
    group_columns = [c for c in df.columns if c != "dq_status"]
    result = raw_result.groupBy(*group_columns).agg(
        concat_ws(";", collect_list("dq_status")).alias("dq_status")
    )
    return result, raw_result
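
A small sketch expanding on the docstring example (the rule keys are assumed from the docstrings in this module; the check name must resolve to one of the rule functions defined here, since validate looks it up in globals()):

from pyspark.sql import SparkSession
from sumeh.engine.pyspark_engine import validate

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "Alice"), (2, None)], ["id", "name"])
# hypothetical rule shape; is_complete is one of the module's documented checks
rules = [{"field": "name", "check": "is_complete", "value": ""}]
result, raw_result = validate(df, rules)
result.show()      # one row per record that violated at least one rule, dq_status values joined with ";"
raw_result.show()  # one row per violation, e.g. dq_status "name:is_complete:"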

validate_date_format(df, rule)

Filters a DataFrame to identify rows where a specified field does not match the date format given in the rule and adds a "dq_status" column indicating the data quality rule applied.

Format tokens: YYYY = four-digit year, e.g. 2012; YY = two-digit year, e.g. 12; MM = month number (1-12); DD = day of month (1-31).

Parameters:

Name Type Description Default
df DataFrame

The input PySpark DataFrame to be checked.

required
rule dict

A dictionary containing the data quality rule. It should include:
- "field" (str): The name of the field whose date format is checked.
- "check" (str): A description of the check being performed.
- "value" (str): The expected date format, expressed with the tokens above.

required

Returns:

Name Type Description
DataFrame DataFrame

A new DataFrame filtered to include only rows where the specified field is null or does not match the expected date format, with an additional "dq_status" column describing the rule.

Source code in sumeh/engine/pyspark_engine.py
def validate_date_format(df: DataFrame, rule: dict) -> DataFrame:
    """
    Filters a DataFrame to identify rows where a specified field does not match the date format given in the rule
    and adds a "dq_status" column indicating the data quality rule applied.

    YYYY = full year, ex: 2012;
    YY = only second part of the year, ex: 12;
    MM = Month number (1-12);
    DD = Day (1-31);

    Args:
        df (DataFrame): The input PySpark DataFrame to be checked.
        rule (dict): A dictionary containing the data quality rule. It should include:
            - "field" (str): The name of the field to check for null values.
            - "check" (str): A description of the check being performed.
            - "value" (str): Additional information about the rule.

    Returns:
        DataFrame: A new DataFrame filtered to include only rows where the specified
        field is null, with an additional "dq_status" column describing the rule.
    """

    field, check, date_format = __extract_params(rule)

    date_regex = __transform_date_format_in_pattern(date_format)

    return df.filter(~col(field).rlike(date_regex) | col(field).isNull()).withColumn(
        "dq_status",
        concat(lit(field), lit(":"), lit(check), lit(":"), lit(date_format)),
    )
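
A minimal sketch (column name, dates, and the format string are illustrative; the actual regex is produced by the private __transform_date_format_in_pattern helper):

from pyspark.sql import SparkSession
from sumeh.engine.pyspark_engine import validate_date_format

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("2024-01-31",), ("31/01/2024",), (None,)], ["order_date"]
)
rule = {"field": "order_date", "check": "validate_date_format", "value": "YYYY-MM-DD"}
# the "31/01/2024" row and the null row are returned,
# flagged with dq_status "order_date:validate_date_format:YYYY-MM-DD"
violations = validate_date_format(df, rule)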

validate_schema(df, expected)

Validates the schema of a PySpark DataFrame against an expected schema.

Parameters:

Name Type Description Default
df DataFrame

The PySpark DataFrame whose schema is to be validated.

required
expected list

The expected schema, represented as a list of tuples where each tuple contains the column name, its data type, and a boolean indicating whether the column is nullable.

required

Returns:

Type Description
Tuple[bool, List[Tuple[str, str]]]

Tuple[bool, List[Tuple[str, str]]]: A tuple containing:
- A boolean indicating whether the schema matches the expected schema.
- A list of tuples representing the mismatched columns, where each tuple contains the column name and the reason for the mismatch.

Source code in sumeh/engine/pyspark_engine.py
def validate_schema(df: DataFrame, expected) -> Tuple[bool, List[Tuple[str, str]]]:
    """
    Validates the schema of a PySpark DataFrame against an expected schema.

    Args:
        df (DataFrame): The PySpark DataFrame whose schema is to be validated.
        expected (list): The expected schema represented as a list of tuples,
                         where each tuple contains the column name and its data type
                         and a boolean, if the column is nullable or not.

    Returns:
        Tuple[bool, List[Tuple[str, str]]]: A tuple containing:
            - A boolean indicating whether the schema matches the expected schema.
            - A list of tuples representing the mismatched columns, where each tuple
              contains the column name and the reason for the mismatch.
    """
    actual = __pyspark_schema_to_list(df)
    result, errors = __compare_schemas(actual, expected)
    return result, errors
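
A minimal sketch (the tuple layout follows the docstring: column name, data type, nullable flag; the exact type-name spelling and comparison rules live in the private __pyspark_schema_to_list and __compare_schemas helpers):

from pyspark.sql import SparkSession
from sumeh.engine.pyspark_engine import validate_schema

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "Alice")], ["id", "name"])
# hypothetical expected schema; Spark infers Python ints as bigint (LongType)
expected = [("id", "bigint", True), ("name", "string", True)]
ok, errors = validate_schema(df, expected)
print(ok, errors)  # (True, []) when every column matches; otherwise errors lists the mismatches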