Module sumeh.engine.dask_engine

This module provides a set of data quality validation functions for Dask DataFrames. It includes various checks such as completeness, uniqueness, value range, patterns, and schema validation. The module also provides utilities for summarizing validation results and schema comparison.
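
A minimal end-to-end sketch (the data and rules below are illustrative; the assumption that summarize consumes the second frame returned by validate is inferred from the signatures documented here, not confirmed by the source):

import pandas as pd
import dask.dataframe as dd
from sumeh.engine.dask_engine import validate, summarize

pdf = pd.DataFrame({"id": [1, 2, 2], "amount": [10.0, None, -5.0]})
ddf = dd.from_pandas(pdf, npartitions=1)

rules = [
    {"field": "id", "check_type": "is_unique"},
    {"field": "amount", "check_type": "is_complete", "threshold": 0.9},
]

# validate returns tuple[dd.DataFrame, dd.DataFrame] per the signature below
tagged, agg = validate(ddf, rules)
# assumption: the second element is the frame summarize expects
report = summarize(agg, rules, total_rows=len(pdf))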

Functions:

is_positive(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_negative(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_in_millions(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_in_billions(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_t_minus_1(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_t_minus_2(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_t_minus_3(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_today(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_yesterday(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_on_weekday(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_on_weekend(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_on_monday(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_on_tuesday(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_on_wednesday(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_on_thursday(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_on_friday(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_on_saturday(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_on_sunday(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_complete(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_unique(df: dd.DataFrame, rule: dict) -> dd.DataFrame
are_complete(df: dd.DataFrame, rule: dict) -> dd.DataFrame
are_unique(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_greater_than(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_greater_or_equal_than(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_less_than(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_less_or_equal_than(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_equal(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_equal_than(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_contained_in(df: dd.DataFrame, rule: dict) -> dd.DataFrame
not_contained_in(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_between(df: dd.DataFrame, rule: dict) -> dd.DataFrame
has_pattern(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_legit(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_primary_key(df: dd.DataFrame, rule: dict) -> dd.DataFrame
is_composite_key(df: dd.DataFrame, rule: dict) -> dd.DataFrame
has_max(df: dd.DataFrame, rule: dict) -> dd.DataFrame
has_min(df: dd.DataFrame, rule: dict) -> dd.DataFrame
has_std(df: dd.DataFrame, rule: dict) -> dd.DataFrame
has_mean(df: dd.DataFrame, rule: dict) -> dd.DataFrame
has_sum(df: dd.DataFrame, rule: dict) -> dd.DataFrame
has_cardinality(df: dd.DataFrame, rule: dict) -> dd.DataFrame
has_infogain(df: dd.DataFrame, rule: dict) -> dd.DataFrame
has_entropy(df: dd.DataFrame, rule: dict) -> dd.DataFrame
satisfies(df: dd.DataFrame, rule: dict) -> dd.DataFrame
validate(df: dd.DataFrame, rules: list[dict]) -> tuple[dd.DataFrame, dd.DataFrame]
summarize(df: dd.DataFrame, rules: list[dict], total_rows: int) -> pd.DataFrame
validate_schema(df: dd.DataFrame, expected: List[Dict[str, Any]]) -> Tuple[bool, List[Tuple[str, str]]]

__compare_schemas(actual, expected)

Compare two lists of schema definitions and identify discrepancies.

Parameters:

    actual (List[SchemaDef], required): The list of actual schema definitions.
    expected (List[SchemaDef], required): The list of expected schema definitions.

Returns:

    Tuple[bool, List[Tuple[str, str]]]: A tuple where the first element is a boolean indicating whether the schemas match (True if they match, False otherwise), and the second element is a list of tuples describing the discrepancies. Each tuple contains:
      - The field name (str).
      - A description of the discrepancy (str), such as "missing", "type mismatch", "nullable but expected non-nullable", or "extra column".

Notes
  • A field is considered "missing" if it exists in the expected schema but not in the actual schema.
  • A "type mismatch" occurs if the data type of a field in the actual schema does not match the expected data type.
  • A field is considered "nullable but expected non-nullable" if it is nullable in the actual schema but not nullable in the expected schema.
  • An "extra column" is a field that exists in the actual schema but not in the expected schema.
Source code in sumeh/services/utils.py
def __compare_schemas(
    actual: List[SchemaDef],
    expected: List[SchemaDef],
) -> Tuple[bool, List[Tuple[str, str]]]:
    """
    Compare two lists of schema definitions and identify discrepancies.

    Args:
        actual (List[SchemaDef]): The list of actual schema definitions.
        expected (List[SchemaDef]): The list of expected schema definitions.

    Returns:
        Tuple[bool, List[Tuple[str, str]]]: A tuple where the first element is a boolean indicating
        whether the schemas match (True if they match, False otherwise), and the second element
        is a list of tuples describing the discrepancies. Each tuple contains:
            - The field name (str).
            - A description of the discrepancy (str), such as "missing", "type mismatch",
              "nullable but expected non-nullable", or "extra column".

    Notes:
        - A field is considered "missing" if it exists in the expected schema but not in the actual schema.
        - A "type mismatch" occurs if the data type of a field in the actual schema does not match
          the expected data type.
        - A field is considered "nullable but expected non-nullable" if it is nullable in the actual
          schema but not nullable in the expected schema.
        - An "extra column" is a field that exists in the actual schema but not in the expected schema.
    """

    exp_map = {c["field"]: c for c in expected}
    act_map = {c["field"]: c for c in actual}

    erros: List[Tuple[str, str]] = []

    for fld, exp in exp_map.items():
        if fld not in act_map:
            erros.append((fld, "missing"))
            continue
        act = act_map[fld]
        if act["data_type"] != exp["data_type"]:
            erros.append(
                (
                    fld,
                    f"type mismatch (got {act['data_type']}, expected {exp['data_type']})",
                )
            )

        if act["nullable"] and not exp["nullable"]:
            erros.append((fld, "nullable but expected non-nullable"))

        if exp.get("max_length") is not None:
            pass

    # 2. campos extras (se quiser)
    extras = set(act_map) - set(exp_map)
    for fld in extras:
        erros.append((fld, "extra column"))

    return len(erros) == 0, erros
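
A behavior sketch (the schema dicts below are illustrative; each SchemaDef entry carries "field", "data_type", "nullable", and "max_length" keys, as built by __dask_schema_to_list):

actual = [
    {"field": "id", "data_type": "int64", "nullable": True, "max_length": None},
    {"field": "name", "data_type": "object", "nullable": True, "max_length": None},
]
expected = [
    {"field": "id", "data_type": "int64", "nullable": False, "max_length": None},
    {"field": "email", "data_type": "object", "nullable": True, "max_length": None},
]
# Comparing these would yield:
# (False, [("id", "nullable but expected non-nullable"),
#          ("email", "missing"),
#          ("name", "extra column")])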

__convert_value(value)

Converts the provided value to the appropriate type (date, float, or int).

Depending on the format of the input value, it will be converted to a datetime object, a floating-point number (float), or an integer (int).

Parameters:

    value (str, required): The value to be converted, represented as a string.

Returns:

    Union[datetime, float, int]: The converted value, which can be a datetime object, float, or int.

Raises:

    ValueError: If the value does not match an expected format.

Source code in sumeh/services/utils.py
def __convert_value(value):
    """
    Converts the provided value to the appropriate type (date, float, or int).

    Depending on the format of the input value, it will be converted to a datetime object,
    a floating-point number (float), or an integer (int).

    Args:
        value (str): The value to be converted, represented as a string.

    Returns:
        Union[datetime, float, int]: The converted value, which can be a datetime object, float, or int.

    Raises:
        ValueError: If the value does not match an expected format.
    """
    from datetime import datetime

    value = value.strip()
    try:
        if "-" in value:
            return datetime.strptime(value, "%Y-%m-%d")
        else:
            return datetime.strptime(value, "%d/%m/%Y")
    except ValueError:
        if "." in value:
            return float(value)
        return int(value)
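
A quick sketch of the dispatch (inputs are illustrative):

# "2024-01-31" -> datetime(2024, 1, 31)   ISO date, detected by "-"
# "31/01/2024" -> datetime(2024, 1, 31)   day-first date
# "3.14"       -> 3.14                    float, detected by "."
# "42"         -> 42                      int fallback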

__dask_schema_to_list(df)

Convert the schema of a Dask DataFrame into a list of dictionaries.

Each dictionary in the resulting list represents a column in the DataFrame and contains metadata about the column, including its name, data type, nullability, and maximum length.

Parameters:

    df (dd.DataFrame, required): The Dask DataFrame whose schema is to be converted.

Returns:

    List[Dict[str, Any]]: A list of dictionaries, where each dictionary contains the following keys:
      - "field" (str): The name of the column.
      - "data_type" (str): The data type of the column, converted to a lowercase string.
      - "nullable" (bool): Always set to True, indicating the column is nullable.
      - "max_length" (None): Always set to None, as maximum length is not determined.

Source code in sumeh/engine/dask_engine.py
def __dask_schema_to_list(df: dd.DataFrame) -> List[Dict[str, Any]]:
    """
    Convert the schema of a Dask DataFrame into a list of dictionaries.

    Each dictionary in the resulting list represents a column in the DataFrame
    and contains metadata about the column, including its name, data type,
    nullability, and maximum length.

    Args:
        df (dd.DataFrame): The Dask DataFrame whose schema is to be converted.

    Returns:
        List[Dict[str, Any]]: A list of dictionaries, where each dictionary
        contains the following keys:
            - "field" (str): The name of the column.
            - "data_type" (str): The data type of the column, converted to a lowercase string.
            - "nullable" (bool): Always set to True, indicating the column is nullable.
            - "max_length" (None): Always set to None, as maximum length is not determined.
    """
    return [
        {
            "field": col,
            "data_type": str(dtype).lower(),
            "nullable": True,
            "max_length": None,
        }
        for col, dtype in df.dtypes.items()
    ]
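
A minimal usage sketch (the frame is illustrative):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
ddf = dd.from_pandas(pdf, npartitions=1)
# Converting ddf's schema would produce:
# [{"field": "id", "data_type": "int64", "nullable": True, "max_length": None},
#  {"field": "name", "data_type": "object", "nullable": True, "max_length": None}]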

__extract_params(rule)

Extracts the (field, check_type, value) triple from a rule dictionary, normalizing empty values to "" and converting string values via __convert_value when possible.

Source code in sumeh/services/utils.py
def __extract_params(rule: dict) -> tuple:
    rule_name = rule["check_type"]
    field = rule["field"]
    raw_value = rule.get("value")
    if isinstance(raw_value, str) and raw_value not in (None, "", "NULL"):
        try:
            value = __convert_value(raw_value)
        except ValueError:
            value = raw_value
    else:
        value = raw_value
    value = value if value not in (None, "", "NULL") else ""
    return field, rule_name, value
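
A behavior sketch (rule dicts are illustrative):

# {"check_type": "is_greater_than", "field": "amount", "value": "100"}
#     -> ("amount", "is_greater_than", 100)    "100" parsed to int
# {"check_type": "is_complete", "field": "email", "value": None}
#     -> ("email", "is_complete", "")          empty value normalized to ""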

_rules_to_df(rules)

Converts a list of rule dictionaries into a pandas DataFrame.

Each rule dictionary is expected to have the following keys:
  • "field": The column(s) the rule applies to. Can be a string or a list of strings.
  • "check_type": The type of rule or check being applied.
  • "threshold" (optional): A numeric value representing the pass threshold. Defaults to 1.0 if not provided.
  • "value" (optional): Additional value associated with the rule.
  • "execute" (optional): A boolean indicating whether the rule should be executed. Defaults to True if not provided.

Rules with "execute" set to False are skipped. The resulting DataFrame contains unique rows based on the combination of "column" and "rule".

Parameters:

    rules (list[dict], required): A list of dictionaries representing the rules.

Returns:

    pd.DataFrame: A DataFrame with the following columns:
      - "column": The column(s) the rule applies to, joined by a comma if multiple.
      - "rule": The type of rule or check being applied.
      - "pass_threshold": The numeric pass threshold for the rule.
      - "value": Additional value associated with the rule, if any.

Source code in sumeh/engine/dask_engine.py
def _rules_to_df(rules: list[dict]) -> pd.DataFrame:
    """
    Converts a list of rule dictionaries into a pandas DataFrame.

    Each rule dictionary is expected to have the following keys:
    - "field": The column(s) the rule applies to. Can be a string or a list of strings.
    - "check_type": The type of rule or check being applied.
    - "threshold" (optional): A numeric value representing the pass threshold. Defaults to 1.0 if not provided.
    - "value" (optional): Additional value associated with the rule.
    - "execute" (optional): A boolean indicating whether the rule should be executed. Defaults to True if not provided.

    Rules with "execute" set to False are skipped. The resulting DataFrame contains unique rows based on the combination
    of "column" and "rule".

    Args:
        rules (list[dict]): A list of dictionaries representing the rules.

    Returns:
        pd.DataFrame: A DataFrame with the following columns:
            - "column": The column(s) the rule applies to, joined by a comma if multiple.
            - "rule": The type of rule or check being applied.
            - "pass_threshold": The numeric pass threshold for the rule.
            - "value": Additional value associated with the rule, if any.
    """
    rows = []
    for r in rules:
        if not r.get("execute", True):
            continue
        coln = ",".join(r["field"]) if isinstance(r["field"], list) else r["field"]
        rows.append(
            {
                "column": coln.strip(),
                "rule": r["check_type"],
                "pass_threshold": float(r.get("threshold") or 1.0),
                "value": r.get("value") or None,
            }
        )
    return pd.DataFrame(rows).drop_duplicates(["column", "rule"])
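
A conversion sketch (rules are illustrative):

rules = [
    {"field": "amount", "check_type": "is_positive", "threshold": 0.95},
    {"field": ["id", "dt"], "check_type": "are_unique"},
    {"field": "legacy", "check_type": "is_complete", "execute": False},  # skipped
]
# _rules_to_df(rules) yields two rows:
#   column  rule         pass_threshold  value
#   amount  is_positive  0.95            None
#   id,dt   are_unique   1.00            None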

all_date_checks(df, rule)

Applies date validation checks on a Dask DataFrame based on the provided rule.

This function serves as an alias for the is_past_date function, which performs checks to determine if dates in the DataFrame meet the specified criteria.

Parameters:

    df (dd.DataFrame, required): The Dask DataFrame containing the data to be validated.
    rule (dict, required): A dictionary specifying the validation rules to be applied.

Returns:

    dd.DataFrame: A Dask DataFrame with the results of the date validation checks.

Source code in sumeh/engine/dask_engine.py
def all_date_checks(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Applies date validation checks on a Dask DataFrame based on the provided rule.

    This function serves as an alias for the `is_past_date` function, which performs
    checks to determine if dates in the DataFrame meet the specified criteria.

    Args:
        df (dd.DataFrame): The Dask DataFrame containing the data to be validated.
        rule (dict): A dictionary specifying the validation rules to be applied.

    Returns:
        dd.DataFrame: A Dask DataFrame with the results of the date validation checks.
    """
    return is_past_date(df, rule)

are_complete(df, rule)

Checks if the specified fields in a Dask DataFrame are complete (non-null) based on the provided rule and returns a DataFrame of violations.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to check for completeness.
    rule (dict, required): A dictionary containing the rule parameters. It should include the fields to check, the type of check, and the expected value.

Returns:

    dd.DataFrame: A DataFrame containing rows that violate the completeness rule, with an additional column dq_status indicating the rule details in the format "{fields}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def are_complete(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks if the specified fields in a Dask DataFrame are complete (non-null)
    based on the provided rule and returns a DataFrame of violations.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to check for completeness.
        rule (dict): A dictionary containing the rule parameters. It should
            include the fields to check, the type of check, and the expected value.

    Returns:
        dd.DataFrame: A DataFrame containing rows that violate the completeness
        rule, with an additional column `dq_status` indicating the rule details
        in the format "{fields}:{check}:{value}".
    """
    fields, check, value = __extract_params(rule)
    mask = ~reduce(operator.and_, [df[f].notnull() for f in fields])
    viol = df[mask]
    return viol.assign(dq_status=f"{str(fields)}:{check}:{value}")
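
A minimal usage sketch (data and rule are illustrative):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"first": ["a", None, "c"], "last": ["x", "y", None]})
ddf = dd.from_pandas(pdf, npartitions=1)
rule = {"field": ["first", "last"], "check_type": "are_complete", "value": None}
viol = are_complete(ddf, rule)
# viol.compute() holds the two rows with a null in either column, tagged
# dq_status = "['first', 'last']:are_complete:"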

are_unique(df, rule)

Checks if the specified fields in a Dask DataFrame contain unique combinations of values.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to be checked.
    rule (dict, required): A dictionary containing the rule parameters. It is expected to include:
      - 'fields': A list of column names to check for uniqueness.
      - 'check': A string describing the type of check being performed.
      - 'value': A value associated with the rule (used for status reporting).

Returns:

    dd.DataFrame: A DataFrame containing rows that violate the uniqueness rule, with an additional column dq_status indicating the rule that was violated.

Source code in sumeh/engine/dask_engine.py
def are_unique(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks if the specified fields in a Dask DataFrame contain unique combinations of values.

    Parameters:
        df (dd.DataFrame): The input Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It is expected to include:
            - 'fields': A list of column names to check for uniqueness.
            - 'check': A string describing the type of check being performed.
            - 'value': A value associated with the rule (used for status reporting).

    Returns:
        dd.DataFrame: A DataFrame containing rows that violate the uniqueness rule,
        with an additional column `dq_status` indicating the rule that was violated.
    """
    fields, check, value = __extract_params(rule)
    combo = (
        df[fields]
        .astype(str)
        .apply(lambda row: "|".join(row.values), axis=1, meta=("combo", "object"))
    )
    counts = combo.value_counts().compute()
    dupes = counts[counts > 1].index.tolist()
    viol = df[combo.isin(dupes)]
    return viol.assign(dq_status=f"{str(fields)}:{check}:{value}")
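
A minimal usage sketch (data and rule are illustrative):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"id": [1, 1, 2], "dt": ["a", "a", "b"]})
ddf = dd.from_pandas(pdf, npartitions=1)
rule = {"field": ["id", "dt"], "check_type": "are_unique", "value": None}
viol = are_unique(ddf, rule)
# viol.compute() holds the two rows sharing the combination "1|a", tagged
# dq_status = "['id', 'dt']:are_unique:"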

has_cardinality(df, rule)

Checks if the cardinality (number of unique values) of a specified field in a Dask DataFrame exceeds a given threshold and returns a modified DataFrame based on the result.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to evaluate.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - 'field' (str): The column name to check cardinality for.
      - 'check' (str): A descriptive label for the check (used in the output).
      - 'value' (int): The maximum allowed cardinality.

Returns:

    dd.DataFrame: If the cardinality of the specified field exceeds the given value, returns the original DataFrame with an additional column dq_status indicating the rule violation. Otherwise, returns an empty DataFrame with the same structure as the input DataFrame.

Source code in sumeh/engine/dask_engine.py
def has_cardinality(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks if the cardinality (number of unique values) of a specified field in a Dask DataFrame
    exceeds a given threshold and returns a modified DataFrame based on the result.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to evaluate.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The column name to check cardinality for.
            - 'check' (str): A descriptive label for the check (used in the output).
            - 'value' (int): The maximum allowed cardinality.

    Returns:
        dd.DataFrame: If the cardinality of the specified field exceeds the given value,
        returns the original DataFrame with an additional column `dq_status` indicating
        the rule violation. Otherwise, returns an empty DataFrame with the same structure
        as the input DataFrame.
    """
    field, check, value = __extract_params(rule)
    card = df[field].nunique().compute() or 0
    if card > value:
        return df.assign(dq_status=f"{field}:{check}:{value}")
    return df.head(0).pipe(dd.from_pandas, npartitions=1)

has_entropy(df, rule)

Evaluates the entropy of a specified field in a Dask DataFrame and applies a rule to determine if the entropy exceeds a given threshold. If the threshold is exceeded, a new column dq_status is added to the DataFrame with information about the rule violation. Otherwise, an empty DataFrame is returned.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to evaluate.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - field (str): The column name to evaluate.
      - check (str): The type of check being performed (used for status message).
      - value (float): The threshold value for the entropy.

Returns:

    dd.DataFrame: A DataFrame with the dq_status column added if the entropy exceeds the threshold, or an empty DataFrame if the threshold is not exceeded.

Source code in sumeh/engine/dask_engine.py
def has_entropy(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Evaluates the entropy of a specified field in a Dask DataFrame and applies a rule to determine
    if the entropy exceeds a given threshold. If the threshold is exceeded, a new column `dq_status`
    is added to the DataFrame with information about the rule violation. Otherwise, an empty DataFrame
    is returned.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to evaluate.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - `field` (str): The column name to evaluate.
            - `check` (str): The type of check being performed (used for status message).
            - `value` (float): The threshold value for the entropy.

    Returns:
        dd.DataFrame: A DataFrame with the `dq_status` column added if the entropy exceeds the threshold,
        or an empty DataFrame if the threshold is not exceeded.
    """
    field, check, value = __extract_params(rule)
    ent = df[field].nunique().compute() or 0.0
    if ent > value:
        return df.assign(dq_status=f"{field}:{check}:{value}")
    return df.head(0).pipe(dd.from_pandas, npartitions=1)

has_infogain(df, rule)

Evaluates whether a given field in a Dask DataFrame satisfies an information gain condition based on the specified rule. If the condition is met, the DataFrame is updated with a dq_status column indicating the rule applied. Otherwise, an empty DataFrame is returned.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to evaluate.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - 'field' (str): The column name to evaluate.
      - 'check' (str): The type of check being performed (used for status annotation).
      - 'value' (float): The threshold value for the information gain.

Returns:

    dd.DataFrame: The original DataFrame with an added dq_status column if the condition is met, or an empty DataFrame if the condition is not satisfied.

Source code in sumeh/engine/dask_engine.py
def has_infogain(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Evaluates whether a given field in a Dask DataFrame satisfies an information gain condition
    based on the specified rule. If the condition is met, the DataFrame is updated with a
    `dq_status` column indicating the rule applied. Otherwise, an empty DataFrame is returned.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to evaluate.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The column name to evaluate.
            - 'check' (str): The type of check being performed (used for status annotation).
            - 'value' (float): The threshold value for the information gain.

    Returns:
        dd.DataFrame: The original DataFrame with an added `dq_status` column if the condition
        is met, or an empty DataFrame if the condition is not satisfied.
    """
    field, check, value = __extract_params(rule)
    ig = df[field].nunique().compute() or 0.0
    if ig > value:
        return df.assign(dq_status=f"{field}:{check}:{value}")
    return df.head(0).pipe(dd.from_pandas, npartitions=1)

has_max(df, rule)

Identifies rows in a Dask DataFrame where the value of a specified field exceeds a given maximum value.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to be checked.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - 'field': The name of the column to check.
      - 'check': A string describing the check (e.g., 'max').
      - 'value': The maximum allowable value for the specified field.

Returns:

    dd.DataFrame: A new Dask DataFrame containing only the rows that violate the rule. An additional column dq_status is added to indicate the rule violation in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def has_max(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Identifies rows in a Dask DataFrame where the value of a specified field exceeds a given maximum value.

    Parameters:
        df (dd.DataFrame): The input Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to check.
            - 'check': A string describing the check (e.g., 'max').
            - 'value': The maximum allowable value for the specified field.

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows that violate the rule.
                      An additional column `dq_status` is added to indicate the rule violation
                      in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    viol = df[df[field] > value]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

has_mean(df, rule)

Checks if the mean of a specified field in a Dask DataFrame satisfies a given condition.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to evaluate.
    rule (dict, required): A dictionary containing the rule to apply. It should include:
      - 'field' (str): The column name to calculate the mean for.
      - 'check' (str): The type of check to perform (e.g., 'greater_than').
      - 'value' (float): The threshold value to compare the mean against.

Returns:

    dd.DataFrame: A new Dask DataFrame with an additional column dq_status if the mean satisfies the condition. If the condition is not met, an empty Dask DataFrame is returned.

Source code in sumeh/engine/dask_engine.py
def has_mean(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks if the mean of a specified field in a Dask DataFrame satisfies a given condition.

    Parameters:
        df (dd.DataFrame): The input Dask DataFrame to evaluate.
        rule (dict): A dictionary containing the rule to apply. It should include:
            - 'field' (str): The column name to calculate the mean for.
            - 'check' (str): The type of check to perform (e.g., 'greater_than').
            - 'value' (float): The threshold value to compare the mean against.

    Returns:
        dd.DataFrame: A new Dask DataFrame with an additional column `dq_status` if the mean
        satisfies the condition. If the condition is not met, an empty Dask DataFrame is returned.
    """
    field, check, value = __extract_params(rule)
    mean_val = df[field].mean().compute() or 0.0
    if mean_val > value:
        return df.assign(dq_status=f"{field}:{check}:{value}")
    return df.head(0).pipe(dd.from_pandas, npartitions=1)

has_min(df, rule)

Checks if the values in a specified field of a Dask DataFrame are greater than or equal to a given minimum value. Returns a DataFrame containing rows that violate this rule, with an additional column indicating the data quality status.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to validate.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - 'field': The column name to check.
      - 'check': The type of check being performed (e.g., 'min').
      - 'value': The minimum value to compare against.

Returns:

    dd.DataFrame: A DataFrame containing rows that do not meet the minimum value requirement, with an additional column dq_status indicating the rule violation in the format "field:check:value".

Source code in sumeh/engine/dask_engine.py
def has_min(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks if the values in a specified field of a Dask DataFrame are greater than
    or equal to a given minimum value. Returns a DataFrame containing rows that
    violate this rule, with an additional column indicating the data quality status.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to validate.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The column name to check.
            - 'check': The type of check being performed (e.g., 'min').
            - 'value': The minimum value to compare against.

    Returns:
        dd.DataFrame: A DataFrame containing rows that do not meet the minimum value
        requirement, with an additional column `dq_status` indicating the rule
        violation in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    viol = df[df[field] < value]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

has_pattern(df, rule)

Identifies rows in a Dask DataFrame that do not match a specified pattern.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to be checked.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - 'field': The column name in the DataFrame to apply the pattern check.
      - 'check': A descriptive label for the type of check being performed.
      - 'value': The regex pattern to match against the specified column.

Returns:

    dd.DataFrame: A DataFrame containing rows that do not match the specified pattern. An additional column dq_status is added, indicating the rule details in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def has_pattern(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Identifies rows in a Dask DataFrame that do not match a specified pattern.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The column name in the DataFrame to apply the pattern check.
            - 'check': A descriptive label for the type of check being performed.
            - 'value': The regex pattern to match against the specified column.

    Returns:
        dd.DataFrame: A DataFrame containing rows that do not match the specified pattern.
                      An additional column `dq_status` is added, indicating the rule details
                      in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    viol = df[~df[field].str.match(value, na=False)]
    return viol.assign(dq_status=f"{field}:{check}:{value}")
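
A minimal usage sketch (data and pattern are illustrative):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"sku": ["AB-123", "XY-9", "bad"]})
ddf = dd.from_pandas(pdf, npartitions=1)
rule = {"field": "sku", "check_type": "has_pattern", "value": r"[A-Z]{2}-\d+"}
viol = has_pattern(ddf, rule)
# viol.compute() contains only the "bad" row, tagged
# dq_status = "sku:has_pattern:[A-Z]{2}-\d+"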

has_std(df, rule)

Checks if the standard deviation of a specified field in a Dask DataFrame exceeds a given value.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to evaluate.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - 'field' (str): The name of the column to calculate the standard deviation for.
      - 'check' (str): A descriptive label for the check being performed.
      - 'value' (float): The threshold value for the standard deviation.

Returns:

    dd.DataFrame: If the standard deviation of the specified field exceeds the given value, returns the original DataFrame with an additional column dq_status indicating the rule details. If it does not, returns an empty DataFrame with the same structure.

Source code in sumeh/engine/dask_engine.py
def has_std(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks if the standard deviation of a specified field in a Dask DataFrame exceeds a given value.

    Parameters:
        df (dd.DataFrame): The input Dask DataFrame to evaluate.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The name of the column to calculate the standard deviation for.
            - 'check' (str): A descriptive label for the check being performed.
            - 'value' (float): The threshold value for the standard deviation.

    Returns:
        dd.DataFrame:
            - If the standard deviation of the specified field exceeds the given value,
              returns the original DataFrame with an additional column `dq_status` indicating the rule details.
            - If the standard deviation does not exceed the value, returns an empty DataFrame with the same structure.
    """
    field, check, value = __extract_params(rule)
    std_val = df[field].std().compute() or 0.0
    if std_val > value:
        return df.assign(dq_status=f"{field}:{check}:{value}")
    return df.head(0).pipe(dd.from_pandas, npartitions=1)

has_sum(df, rule)

Checks if the sum of a specified field in a Dask DataFrame exceeds a given value and returns a modified DataFrame with a status column if the condition is met.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to evaluate.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - 'field' (str): The column name to sum.
      - 'check' (str): A descriptive label for the check (used in the status message).
      - 'value' (float): The threshold value to compare the sum against.

Returns:

    dd.DataFrame: A new Dask DataFrame. If the sum exceeds the threshold, the DataFrame will include a dq_status column with a status message. Otherwise, an empty DataFrame with the same structure as the input is returned.

Source code in sumeh/engine/dask_engine.py
def has_sum(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks if the sum of a specified field in a Dask DataFrame exceeds a given value
    and returns a modified DataFrame with a status column if the condition is met.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to evaluate.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The column name to sum.
            - 'check' (str): A descriptive label for the check (used in the status message).
            - 'value' (float): The threshold value to compare the sum against.

    Returns:
        dd.DataFrame: A new Dask DataFrame. If the sum exceeds the threshold, the DataFrame
        will include a `dq_status` column with a status message. Otherwise, an empty
        DataFrame with the same structure as the input is returned.
    """
    field, check, value = __extract_params(rule)
    sum_val = df[field].sum().compute() or 0.0
    if sum_val > value:
        return df.assign(dq_status=f"{field}:{check}:{value}")
    return df.head(0).pipe(dd.from_pandas, npartitions=1)

is_between(df, rule)

Filters a Dask DataFrame to identify rows where a specified field's value does not fall within a given range.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to be checked.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - 'field': The column name in the DataFrame to check.
      - 'check': The type of check being performed (e.g., "between").
      - 'value': A string representing the range in the format "[lo,hi]".

Returns:

    dd.DataFrame: A new DataFrame containing only the rows that violate the specified range condition. An additional column dq_status is added to indicate the field, check, and value that caused the violation.

Source code in sumeh/engine/dask_engine.py
def is_between(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where a specified field's value
    does not fall within a given range.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should
            include:
            - 'field': The column name in the DataFrame to check.
            - 'check': The type of check being performed (e.g., "between").
            - 'value': A string representing the range in the format "[lo,hi]".

    Returns:
        dd.DataFrame: A new DataFrame containing only the rows that violate
        the specified range condition. An additional column `dq_status` is
        added to indicate the field, check, and value that caused the violation.
    """
    field, check, value = __extract_params(rule)
    lo, hi = value.strip("[]").split(",")
    viol = df[~df[field].between(__convert_value(lo), __convert_value(hi))]
    return viol.assign(dq_status=f"{field}:{check}:{value}")
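
A minimal usage sketch (data and range are illustrative):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"amount": [5, 50, 500]})
ddf = dd.from_pandas(pdf, npartitions=1)
rule = {"field": "amount", "check_type": "is_between", "value": "[10,100]"}
viol = is_between(ddf, rule)
# viol.compute() contains 5 and 500 (the range is inclusive), tagged
# dq_status = "amount:is_between:[10,100]"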

is_complete(df, rule)

Checks for completeness of a specified field in a Dask DataFrame based on a given rule.

This function identifies rows where the specified field is null and marks them as violations. It then assigns a data quality status to these rows in the resulting DataFrame.

Parameters:

    df (dd.DataFrame, required): The Dask DataFrame to be checked.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - 'field': The name of the field to check for completeness.
      - 'check': The type of check being performed (e.g., 'is_complete').
      - 'value': Additional value associated with the rule (not used in this function).

Returns:

    dd.DataFrame: A DataFrame containing rows where the specified field is null, with an additional column dq_status indicating the data quality status in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_complete(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks for completeness of a specified field in a Dask DataFrame based on a given rule.

    This function identifies rows where the specified field is null and marks them as violations.
    It then assigns a data quality status to these rows in the resulting DataFrame.

    Args:
        df (dd.DataFrame): The Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the field to check for completeness.
            - 'check': The type of check being performed (e.g., 'is_complete').
            - 'value': Additional value associated with the rule (not used in this function).

    Returns:
        dd.DataFrame: A DataFrame containing rows where the specified field is null,
        with an additional column `dq_status` indicating the data quality status in the format
        "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    viol = df[df[field].isnull()]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_composite_key(df, rule)

Determines if the given DataFrame satisfies the composite key condition based on the provided rule.

Parameters:

    df (dd.DataFrame, required): A Dask DataFrame to be checked.
    rule (dict, required): A dictionary defining the composite key rule.

Returns:

    dd.DataFrame: A Dask DataFrame indicating whether the composite key condition is met.

Source code in sumeh/engine/dask_engine.py
def is_composite_key(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Determines if the given DataFrame satisfies the composite key condition based on the provided rule.

    Args:
        df (dd.DataFrame): A Dask DataFrame to be checked.
        rule (dict): A dictionary defining the composite key rule.

    Returns:
        dd.DataFrame: A Dask DataFrame indicating whether the composite key condition is met.
    """
    return are_unique(df, rule)

is_contained_in(df, rule)

Filters a Dask DataFrame to identify rows where the values in a specified field are not contained within a given list of allowed values.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to be checked.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - 'field': The column name in the DataFrame to check.
      - 'check': The type of check being performed (e.g., "is_contained_in").
      - 'value': A string representation of a list of allowed values (e.g., "[value1, value2]").

Returns:

    dd.DataFrame: A new Dask DataFrame containing only the rows that violate the rule. An additional column dq_status is added to indicate the rule violation in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_contained_in(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where the values in a specified field
    are not contained within a given list of allowed values.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The column name in the DataFrame to check.
            - 'check': The type of check being performed (e.g., "is_contained_in").
            - 'value': A string representation of a list of allowed values (e.g., "[value1, value2]").

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows that violate the rule.
        An additional column `dq_status` is added to indicate the rule violation in the format:
        "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    lst = [v.strip() for v in value.strip("[]").split(",")]
    viol = df[~df[field].isin(lst)]
    return viol.assign(dq_status=f"{field}:{check}:{value}")
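
A minimal usage sketch (data and allowed values are illustrative):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"status": ["ok", "ko", "unknown"]})
ddf = dd.from_pandas(pdf, npartitions=1)
rule = {"field": "status", "check_type": "is_contained_in", "value": "[ok,ko]"}
viol = is_contained_in(ddf, rule)
# viol.compute() contains only the "unknown" row, tagged
# dq_status = "status:is_contained_in:[ok,ko]"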

is_date_after(df, rule)

Filters a Dask DataFrame to identify rows where a specified date field is earlier than a given reference date.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to be checked.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - field (str): The name of the column to check.
      - check (str): A descriptive label for the check (used in the output status).
      - date_str (str): The reference date as a string in a format compatible with pd.Timestamp.

Returns:

    dd.DataFrame: A new Dask DataFrame containing only the rows where the specified date field is earlier than the reference date. An additional column dq_status is added, which contains a string describing the rule violation in the format field:check:date_str.

Raises:

    ValueError: If the rule dictionary does not contain the required keys.

Source code in sumeh/engine/dask_engine.py
def is_date_after(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where a specified date field is
    earlier than a given reference date.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should
            include:
            - field (str): The name of the column to check.
            - check (str): A descriptive label for the check (used in the
              output status).
            - date_str (str): The reference date as a string in a format
              compatible with `pd.Timestamp`.

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the
        specified date field is earlier than the reference date. An additional
        column `dq_status` is added, which contains a string describing the
        rule violation in the format `field:check:date_str`.

    Raises:
        ValueError: If the `rule` dictionary does not contain the required keys.
    """
    field, check, date_str = __extract_params(rule)
    ref = pd.Timestamp(date_str)
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt < ref]
    return viol.assign(dq_status=f"{field}:{check}:{date_str}")

is_date_before(df, rule)

Checks if the values in a specified date column of a Dask DataFrame are before a given reference date.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame containing the data to be validated.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - 'field': The name of the column to check.
      - 'check': A descriptive string for the check (e.g., "is_date_before").
      - 'date_str': The reference date as a string in a format parsable by pandas.Timestamp.

Returns:

    dd.DataFrame: A new Dask DataFrame containing only the rows where the date in the specified column is after the reference date. An additional column 'dq_status' is added to indicate the validation status in the format "{field}:{check}:{date_str}".

Source code in sumeh/engine/dask_engine.py
def is_date_before(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks if the values in a specified date column of a Dask DataFrame are before a given reference date.

    Parameters:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be validated.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column to check.
            - 'check': A descriptive string for the check (e.g., "is_date_before").
            - 'date_str': The reference date as a string in a format parsable by pandas.Timestamp.

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the date in the specified column
        is after the reference date. An additional column 'dq_status' is added to indicate the validation
        status in the format "{field}:{check}:{date_str}".
    """
    field, check, date_str = __extract_params(rule)
    ref = pd.Timestamp(date_str)
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt > ref]
    return viol.assign(dq_status=f"{field}:{check}:{date_str}")

is_date_between(df, rule)

Filters a Dask DataFrame to identify rows where a date field does not fall within a specified range.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame containing the data to be checked.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - 'field': The name of the column in the DataFrame to check.
      - 'check': A string representing the type of check (used for status annotation).
      - 'val': A string representing the date range in the format "[start_date, end_date]".

Returns:

    dd.DataFrame: A DataFrame containing rows where the date field does not fall within the specified range. An additional column 'dq_status' is added to indicate the rule violation in the format "{field}:{check}:{val}".

Source code in sumeh/engine/dask_engine.py
def is_date_between(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where a date field does not fall within a specified range.

    Args:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the column in the DataFrame to check.
            - 'check': A string representing the type of check (used for status annotation).
            - 'val': A string representing the date range in the format "[start_date, end_date]".

    Returns:
        dd.DataFrame: A DataFrame containing rows where the date field does not fall within the specified range.
                      An additional column 'dq_status' is added to indicate the rule violation in the format
                      "{field}:{check}:{val}".
    """
    field, check, val = __extract_params(rule)
    start, end = [pd.Timestamp(v.strip()) for v in val.strip("[]").split(",")]
    col_dt = dd.to_datetime(df[field], errors="coerce")
    mask = (col_dt >= start) & (col_dt <= end)
    viol = df[~mask]
    return viol.assign(dq_status=f"{field}:{check}:{val}")
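
A minimal usage sketch (data and range are illustrative):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"dt": ["2024-01-15", "2024-03-01"]})
ddf = dd.from_pandas(pdf, npartitions=1)
rule = {"field": "dt", "check_type": "is_date_between",
        "value": "[2024-01-01,2024-01-31]"}
viol = is_date_between(ddf, rule)
# viol.compute() contains only the 2024-03-01 row, tagged
# dq_status = "dt:is_date_between:[2024-01-01,2024-01-31]"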

is_equal(df, rule)

Filters a Dask DataFrame to identify rows where a specified field does not equal a given value.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to be checked.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - 'field': The column name in the DataFrame to be checked.
      - 'check': The type of check to perform (expected to be 'equal' for this function).
      - 'value': The value to compare against.

Returns:

    dd.DataFrame: A new DataFrame containing rows that violate the equality rule. An additional column dq_status is added, indicating the rule details in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_equal(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where a specified field does not equal a given value.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The column name in the DataFrame to be checked.
            - 'check': The type of check to perform (expected to be 'equal' for this function).
            - 'value': The value to compare against.

    Returns:
        dd.DataFrame: A new DataFrame containing rows that violate the equality rule.
                      An additional column `dq_status` is added, indicating the rule details
                      in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    viol = df[~df[field].eq(value)]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_equal_than(df, rule)

Filters a Dask DataFrame to identify rows where the specified field does not equal the given value.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to be checked.
    rule (dict, required): A dictionary containing the rule parameters. It should include:
      - 'field': The column name in the DataFrame to check.
      - 'check': The type of check being performed (expected to be 'equal' for this function).
      - 'value': The value to compare against.

Returns:

    dd.DataFrame: A new DataFrame containing rows that violate the equality rule. An additional column dq_status is added, indicating the rule details in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_equal_than(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where the specified field does not equal the given value.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The column name in the DataFrame to check.
            - 'check': The type of check being performed (expected to be 'equal' for this function).
            - 'value': The value to compare against.

    Returns:
        dd.DataFrame: A new DataFrame containing rows that violate the equality rule.
                      An additional column `dq_status` is added, indicating the rule details
                      in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    viol = df[~df[field].eq(value)]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_future_date(df, rule)

Checks for rows in a Dask DataFrame where the specified date field contains a future date.

Parameters:

    df (dd.DataFrame, required): The input Dask DataFrame to validate.
    rule (dict, required): A dictionary containing the rule parameters. It is expected to include:
      - field: The name of the column to check.
      - check: A descriptive label for the check (used in the output).
      - _: Additional parameters (ignored in this function).

Returns:

    dd.DataFrame: A new Dask DataFrame containing only the rows where the date in the specified field is in the future. An additional column dq_status is added to indicate the status of the validation in the format "<field>:<check>:<current_date>".

Notes
  • The function coerces the specified column to datetime format, and invalid parsing results in NaT (Not a Time).
  • Rows with NaT in the specified column are excluded from the output.
  • The current date is determined using the system's local date.
Source code in sumeh/engine/dask_engine.py
def is_future_date(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks for rows in a Dask DataFrame where the specified date field contains a future date.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to validate.
        rule (dict): A dictionary containing the rule parameters. It is expected to include:
            - field: The name of the column to check.
            - check: A descriptive label for the check (used in the output).
            - _: Additional parameters (ignored in this function).

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the date in the specified
        field is in the future. An additional column `dq_status` is added to indicate the status
        of the validation in the format: "<field>:<check>:<current_date>".

    Notes:
        - The function coerces the specified column to datetime format, and invalid parsing results
          in NaT (Not a Time).
        - Rows with NaT in the specified column are excluded from the output.
        - The current date is determined using the system's local date.
    """
    field, check, _ = __extract_params(rule)
    today = pd.Timestamp(date.today())
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt > today]
    return viol.assign(dq_status=f"{field}:{check}:{today.date().isoformat()}")
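
As the notes above state, values that fail to parse become NaT and drop out of the comparison, because NaT never compares greater than today. A minimal sketch of that behaviour, on hypothetical data:

import dask.dataframe as dd
import pandas as pd
from datetime import date

pdf = pd.DataFrame({"due": ["2999-01-01", "not a date", "2000-01-01"]})
ddf = dd.from_pandas(pdf, npartitions=1)

today = pd.Timestamp(date.today())
col_dt = dd.to_datetime(ddf["due"], errors="coerce")  # "not a date" -> NaT
print(ddf[col_dt > today].compute())  # only the 2999-01-01 row; NaT rows drop out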

is_greater_or_equal_than(df, rule)

Filters a Dask DataFrame to identify rows where a specified field's value is less than a given threshold, and annotates the resulting rows with a data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame to be checked.

required
rule dict

A dictionary containing the rule parameters. It should include the following keys: - 'field': The column name in the DataFrame to check. - 'check': The type of check being performed (e.g., 'greater_or_equal'). - 'value': The threshold value to compare against.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows that violate the rule, with an additional column dq_status indicating the field, check type, and threshold value.

Source code in sumeh/engine/dask_engine.py
def is_greater_or_equal_than(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where a specified field's value
    is less than a given threshold, and annotates the resulting rows with a
    data quality status.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should
                     include the following keys:
                     - 'field': The column name in the DataFrame to check.
                     - 'check': The type of check being performed (e.g., 'greater_or_equal').
                     - 'value': The threshold value to compare against.

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows that
                      violate the rule, with an additional column `dq_status`
                      indicating the field, check type, and threshold value.
    """
    field, check, value = __extract_params(rule)
    viol = df[df[field] < value]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_greater_than(df, rule)

Filters a Dask DataFrame to identify rows where the value in a specified field is greater than a given threshold and annotates the result with a data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame to be checked.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The column name to check. - 'check' (str): The type of check being performed (e.g., 'greater_than'). - 'value' (numeric): The threshold value to compare against.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A filtered DataFrame containing rows that violate the rule, with an additional column dq_status indicating the rule details in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_greater_than(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where the value in a specified field
    is greater than a given threshold and annotates the result with a data quality status.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The column name to check.
            - 'check' (str): The type of check being performed (e.g., 'greater_than').
            - 'value' (numeric): The threshold value to compare against.

    Returns:
        dd.DataFrame: A filtered DataFrame containing rows that violate the rule,
        with an additional column `dq_status` indicating the rule details in the format
        "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    viol = df[df[field] > value]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_in(df, rule)

Checks a containment rule against the given Dask DataFrame.

This function acts as a wrapper for the is_contained_in function, passing the provided DataFrame and rule to it.

Parameters:

Name Type Description Default
df DataFrame

The Dask DataFrame to evaluate.

required
rule dict

A dictionary representing the rule to check against the DataFrame.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A Dask DataFrame resulting from the evaluation of the rule.

Source code in sumeh/engine/dask_engine.py
def is_in(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks a containment rule against the given Dask DataFrame.

    This function acts as a wrapper for the `is_contained_in` function,
    passing the provided DataFrame and rule to it.

    Args:
        df (dd.DataFrame): The Dask DataFrame to evaluate.
        rule (dict): A dictionary representing the rule to check against the DataFrame.

    Returns:
        dd.DataFrame: A Dask DataFrame resulting from the evaluation of the rule.
    """
    return is_contained_in(df, rule)

is_in_billions(df, rule)

Identifies rows in a Dask DataFrame where the value in a specified field is less than one billion, i.e., rows that violate the "in billions" rule, and marks them with a data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame to be checked.

required
rule dict

A dictionary containing the rule parameters. It is expected to include the field name, check type, and value.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A Dask DataFrame containing only the rows where the specified field's value is less than one billion. An additional column dq_status is added, indicating the field, check type, and value that triggered the rule.

Source code in sumeh/engine/dask_engine.py
def is_in_billions(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Identifies rows in a Dask DataFrame where the value in a specified field
    is less than one billion (i.e., violates the rule) and marks them with a data quality status.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It is expected
                     to include the field name, check type, and value.

    Returns:
        dd.DataFrame: A Dask DataFrame containing only the rows where the specified
                      field's value is less than one billion. An
                      additional column `dq_status` is added, indicating the field,
                      check type, and value that triggered the rule.
    """
    field, check, value = __extract_params(rule)
    viol = df[df[field] < 1_000_000_000]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_in_millions(df, rule)

Checks if the values in a specified field of a Dask DataFrame are in the millions (greater than or equal to 1,000,000) and returns a DataFrame of violations.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame to check.

required
rule dict

A dictionary containing the rule parameters. It is expected to include the field name, check type, and value.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A DataFrame containing rows where the specified field's value is less than 1,000,000. An additional column dq_status is added to indicate the field, check, and value that triggered the violation.

Source code in sumeh/engine/dask_engine.py
def is_in_millions(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks if the values in a specified field of a Dask DataFrame are in the millions
    (greater than or equal to 1,000,000) and returns a DataFrame of violations.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to check.
        rule (dict): A dictionary containing the rule parameters. It is expected to
                     include the field name, check type, and value.

    Returns:
        dd.DataFrame: A DataFrame containing rows where the specified field's value
                      is less than 1,000,000. An additional column
                      `dq_status` is added to indicate the field, check, and value
                      that triggered the violation.
    """
    field, check, value = __extract_params(rule)
    viol = df[df[field] < 1_000_000]
    return viol.assign(dq_status=f"{field}:{check}:{value}")
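
Note the direction of the filter in both threshold checks: the returned rows are the violations, i.e., values below the threshold. A quick sketch with a hypothetical revenue column:

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"revenue": [2_500_000, 640_000, 1_000_000]})
ddf = dd.from_pandas(pdf, npartitions=1)

viol = ddf[ddf["revenue"] < 1_000_000]  # the same predicate is_in_millions applies
print(viol.compute())  # only the 640,000 row is flagged; 1,000,000 itself passes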

is_legit(df, rule)

Validates a Dask DataFrame against a specified rule and returns rows that violate the rule.

Parameters:

Name Type Description Default
df DataFrame

The Dask DataFrame to validate.

required
rule dict

A dictionary containing the validation rule. It should include: - 'field': The column name in the DataFrame to validate. - 'check': The type of validation check (e.g., regex, condition). - 'value': The value or pattern to validate against.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new DataFrame containing rows that violate the rule, with an additional column dq_status indicating the field, check, and value of the failed validation.

Source code in sumeh/engine/dask_engine.py
def is_legit(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Validates a Dask DataFrame against a specified rule and returns rows that violate the rule.

    Args:
        df (dd.DataFrame): The Dask DataFrame to validate.
        rule (dict): A dictionary containing the validation rule. It should include:
            - 'field': The column name in the DataFrame to validate.
            - 'check': The type of validation check (e.g., regex, condition).
            - 'value': The value or pattern to validate against.

    Returns:
        dd.DataFrame: A new DataFrame containing rows that violate the rule, with an additional
        column `dq_status` indicating the field, check, and value of the failed validation.
    """
    field, check, value = __extract_params(rule)
    s = df[field].astype("string")
    mask = s.notnull() & s.str.contains(r"^\S+$", na=False)
    viol = df[~mask]
    return viol.assign(dq_status=f"{field}:{check}:{value}")
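
The mask treats a value as legit when it is non-null and matches ^\S+$, i.e., a single run of non-whitespace characters; empty strings, values with embedded spaces, and nulls are all flagged. A short illustration on hypothetical data:

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"code": ["ABC123", "AB 123", "", None]})
ddf = dd.from_pandas(pdf, npartitions=1)

s = ddf["code"].astype("string")
mask = s.notnull() & s.str.contains(r"^\S+$", na=False)
print(ddf[~mask].compute())  # flags "AB 123", "" and the null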

is_less_or_equal_than(df, rule)

Filters a Dask DataFrame to identify rows where the value in a specified field is greater than a given threshold, violating a "less or equal than" rule.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame to be checked.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The column name in the DataFrame to be checked. - 'check': The type of check being performed (e.g., "less_or_equal_than"). - 'value': The threshold value to compare against.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new DataFrame containing only the rows that violate the rule. An additional column dq_status is added to indicate the rule violation in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_less_or_equal_than(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where the value in a specified field
    is greater than a given threshold, violating a "less or equal than" rule.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The column name in the DataFrame to be checked.
            - 'check': The type of check being performed (e.g., "less_or_equal_than").
            - 'value': The threshold value to compare against.

    Returns:
        dd.DataFrame: A new DataFrame containing only the rows that violate the rule.
        An additional column `dq_status` is added to indicate the rule violation
        in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    viol = df[df[field] > value]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_less_than(df, rule)

Filters a Dask DataFrame to identify rows where the value in a specified field is greater than or equal to a given threshold, and marks them with a data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame to be checked.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field' (str): The column name to check. - 'check' (str): The type of check being performed (e.g., "less_than"). - 'value' (numeric): The threshold value for the check.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows that violate the rule, with an additional column dq_status indicating the rule that was violated in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_less_than(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where the value in a specified field
    is greater than or equal to a given threshold, and marks them with a data quality status.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field' (str): The column name to check.
            - 'check' (str): The type of check being performed (e.g., "less_than").
            - 'value' (numeric): The threshold value for the check.

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows that violate the rule,
        with an additional column `dq_status` indicating the rule that was violated in the
        format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    viol = df[df[field] >= value]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_negative(df, rule)

Identifies rows in a Dask DataFrame where the specified field does not satisfy a "negative" check.

This function filters the DataFrame to find rows where the value in the specified field is greater than or equal to 0 (i.e., not negative). It then assigns a new column dq_status to indicate the rule that was violated.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame to be checked.

required
rule dict

A dictionary containing the rule parameters. It should include: - field (str): The name of the column to check. - check (str): The type of check being performed (e.g., "negative"). - value (any): The expected value or condition for the check.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows that violate the rule, with an additional column dq_status describing the violation in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_negative(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Identifies rows in a Dask DataFrame where the specified field does not satisfy a "negative" check.

    This function filters the DataFrame to find rows where the value in the specified field
    is greater than or equal to 0 (i.e., not negative). It then assigns a new column `dq_status`
    to indicate the rule that was violated.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be checked.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - `field` (str): The name of the column to check.
            - `check` (str): The type of check being performed (e.g., "negative").
            - `value` (any): The expected value or condition for the check.

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows that violate the rule,
        with an additional column `dq_status` describing the violation in the format
        "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    viol = df[df[field] >= 0]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_on_friday(df, rule)

Filters a Dask DataFrame to identify rows where a specified date field does not fall on a Friday, i.e., rows that violate the rule.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame containing the data to be checked.

required
rule dict

A dictionary containing the rule parameters. It is expected to have the following keys: - field (str): The name of the column in the DataFrame to check. - check (str): A descriptive string for the check being performed. - value (str): A value associated with the rule, used for status annotation.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows where the specified date field does not fall on a Friday. An additional column dq_status is added to the DataFrame, containing a string in the format "{field}:{check}:{value}" to indicate the rule applied.

Source code in sumeh/engine/dask_engine.py
def is_on_friday(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where a specified date field does not fall on a Friday.

    Args:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be checked.
        rule (dict): A dictionary containing the rule parameters. It is expected to have
            the following keys:
            - field (str): The name of the column in the DataFrame to check.
            - check (str): A descriptive string for the check being performed.
            - value (str): A value associated with the rule, used for status annotation.

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the specified
        date field does not fall on a Friday. An additional column `dq_status` is added to the
        DataFrame, containing a string in the format "{field}:{check}:{value}" to indicate
        the rule applied.
    """
    field, check, value = __extract_params(rule)
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt.dt.weekday != 4]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_on_monday(df, rule)

Filters a Dask DataFrame to identify rows where the date in a specified column does not fall on a Monday, i.e., rows that violate the rule.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame containing the data to be checked.

required
rule dict

A dictionary containing the rule parameters. It is expected to include: - 'field': The name of the column in the DataFrame to check. - 'check': A string representing the type of check (used for status assignment). - 'value': A value associated with the rule (used for status assignment).

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows where the date in the specified column does not fall on a Monday. An additional column dq_status is added to indicate the rule applied in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_on_monday(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where the date in a specified column does not fall on a Monday.

    Args:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be checked.
        rule (dict): A dictionary containing the rule parameters. It is expected to include:
            - 'field': The name of the column in the DataFrame to check.
            - 'check': A string representing the type of check (used for status assignment).
            - 'value': A value associated with the rule (used for status assignment).

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the date in the specified
        column does not fall on a Monday. An additional column `dq_status` is added to indicate the rule
        applied in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt.dt.weekday != 0]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_on_saturday(df, rule)

Filters a Dask DataFrame to identify rows where the date in a specified column does not fall on a Saturday, i.e., rows that violate the rule.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame containing the data to be checked.

required
rule dict

A dictionary containing the rule parameters. It is expected to include: - 'field': The name of the column in the DataFrame to check. - 'check': A string representing the type of check (used for status assignment). - 'value': A value associated with the rule (used for status assignment).

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows where the date in the specified column does not fall on a Saturday. An additional column dq_status is added to indicate the rule applied in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_on_saturday(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where the date in a specified column does not fall on a Saturday.

    Args:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be checked.
        rule (dict): A dictionary containing the rule parameters. It is expected to include:
            - 'field': The name of the column in the DataFrame to check.
            - 'check': A string representing the type of check (used for status assignment).
            - 'value': A value associated with the rule (used for status assignment).

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the date in the specified
        column does not fall on a Saturday. An additional column `dq_status` is added to indicate the rule
        applied in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt.dt.weekday != 5]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_on_sunday(df, rule)

Filters a Dask DataFrame to identify rows where a specified date field does not fall on a Sunday, i.e., rows that violate the rule.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame containing the data to be checked.

required
rule dict

A dictionary containing the rule parameters. It is expected to include: - field (str): The name of the column in the DataFrame to check. - check (str): A descriptive string for the check being performed. - value (str): A value associated with the rule, used for status annotation.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows where the specified date field does not fall on a Sunday. An additional column dq_status is added to indicate the rule applied in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_on_sunday(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where a specified date field does not fall on a Sunday.

    Args:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be checked.
        rule (dict): A dictionary containing the rule parameters. It is expected to include:
            - field (str): The name of the column in the DataFrame to check.
            - check (str): A descriptive string for the check being performed.
            - value (str): A value associated with the rule, used for status annotation.

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the specified
        date field does not fall on a Sunday. An additional column `dq_status` is added to indicate
        the rule applied in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt.dt.weekday != 6]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_on_thursday(df, rule)

Filters a Dask DataFrame to identify rows where the specified date field does not fall on a Thursday, i.e., rows that violate the rule.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame containing the data to be checked.

required
rule dict

A dictionary containing the rule parameters. It is expected to include: - field (str): The name of the column in the DataFrame to check. - check (str): A descriptive string for the type of check being performed. - value (str): A value associated with the rule (not used in the logic but included in the output).

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows where the specified date field does not fall on a Thursday. An additional column dq_status is added to indicate the rule applied in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_on_thursday(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where the specified date field does not fall on a Thursday.

    Args:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be checked.
        rule (dict): A dictionary containing the rule parameters. It is expected to include:
            - field (str): The name of the column in the DataFrame to check.
            - check (str): A descriptive string for the type of check being performed.
            - value (str): A value associated with the rule (not used in the logic but included in the output).

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the specified date field
        does not fall on a Thursday. An additional column `dq_status` is added to indicate the rule applied
        in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt.dt.weekday != 3]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_on_tuesday(df, rule)

Filters a Dask DataFrame to identify rows where the specified date field does not fall on a Tuesday, i.e., rows that violate the rule.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame containing the data to be checked.

required
rule dict

A dictionary containing the rule parameters. It is expected to include: - 'field': The name of the column in the DataFrame to check. - 'check': A string representing the type of check (used for status annotation). - 'value': A value associated with the rule (used for status annotation).

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows where the specified date field does not fall on a Tuesday. An additional column dq_status is added to indicate the rule applied in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_on_tuesday(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where the specified date field does not fall on a Tuesday.

    Args:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be checked.
        rule (dict): A dictionary containing the rule parameters. It is expected to include:
            - 'field': The name of the column in the DataFrame to check.
            - 'check': A string representing the type of check (used for status annotation).
            - 'value': A value associated with the rule (used for status annotation).

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the specified date field
        does not fall on a Tuesday. An additional column `dq_status` is added to indicate the rule applied
        in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt.dt.weekday != 1]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_on_wednesday(df, rule)

Filters a Dask DataFrame to identify rows where the date in a specified column does not fall on a Wednesday, i.e., rows that violate the rule.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame containing the data to be checked.

required
rule dict

A dictionary containing the rule parameters. It is expected to include: - field (str): The name of the column in the DataFrame to check. - check (str): A descriptive string for the check being performed. - value (str): A value associated with the rule (not directly used in the function).

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows where the date in the specified column does not fall on a Wednesday. An additional column dq_status is added to indicate the rule applied in the format {field}:{check}:{value}.

Source code in sumeh/engine/dask_engine.py
def is_on_wednesday(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where the date in a specified column does not fall on a Wednesday.

    Args:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be checked.
        rule (dict): A dictionary containing the rule parameters. It is expected to include:
            - `field` (str): The name of the column in the DataFrame to check.
            - `check` (str): A descriptive string for the check being performed.
            - `value` (str): A value associated with the rule (not directly used in the function).

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the date in the specified column
        does not fall on a Wednesday. An additional column `dq_status` is added to indicate the rule applied in the
        format `{field}:{check}:{value}`.
    """
    field, check, value = __extract_params(rule)
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt.dt.weekday != 2]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_on_weekday(df, rule)

Checks that the date in the specified field falls on a weekday (Monday to Friday) and returns the rows that violate the rule (weekend dates), with a data quality status column.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame containing the data to be filtered.

required
rule dict

A dictionary containing the rule parameters. It is expected to have the following keys: - field (str): The name of the column in the DataFrame containing date values. - check (str): A descriptive string for the check being performed. - value (str): A value associated with the rule, used for constructing the dq_status column.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows where the date in the specified field does not fall on a weekday (i.e., falls on a weekend). An additional column dq_status is added to indicate the rule applied.

Source code in sumeh/engine/dask_engine.py
def is_on_weekday(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks that the date in the specified field falls on a weekday (Monday to Friday)
    and returns the rows that violate the rule, with a data quality status column.

    Args:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be filtered.
        rule (dict): A dictionary containing the rule parameters. It is expected to have the following keys:
            - field (str): The name of the column in the DataFrame containing date values.
            - check (str): A descriptive string for the check being performed.
            - value (str): A value associated with the rule, used for constructing the `dq_status` column.

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the date in the specified field
        does not fall on a weekday. An additional column `dq_status` is added to indicate the rule applied.
    """
    field, check, value = __extract_params(rule)
    col_dt = dd.to_datetime(df[field], errors="coerce")
    dow = col_dt.dt.weekday
    viol = df[(dow >= 5) & (dow <= 6)]
    return viol.assign(dq_status=f"{field}:{check}:{value}")
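
The weekday checks rely on pandas' numbering, where .dt.weekday runs from Monday = 0 to Sunday = 6, so dow >= 5 selects Saturday and Sunday. For reference:

import pandas as pd

dates = pd.to_datetime(["2024-01-01", "2024-01-06", "2024-01-07"])
print(list(dates.weekday))  # [0, 5, 6] -> Monday, Saturday, Sunday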

is_on_weekend(df, rule)

Identifies rows in a Dask DataFrame where the date in a specified column does not fall on a weekend, i.e., rows that violate the weekend rule.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame containing the data to be checked.

required
rule dict

A dictionary containing the rule parameters. It is expected to have the following keys: - 'field': The name of the column in the DataFrame to check. - 'check': A string representing the type of check (used for status annotation). - 'value': A value associated with the rule (used for status annotation).

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows where the date in the specified column falls on a weekday (Monday to Friday), violating the weekend rule. An additional column dq_status is added to indicate the rule applied in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_on_weekend(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Identifies rows in a Dask DataFrame where the date in a specified column does not fall on a weekend.

    Args:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be checked.
        rule (dict): A dictionary containing the rule parameters. It is expected to have
                     the following keys:
                     - 'field': The name of the column in the DataFrame to check.
                     - 'check': A string representing the type of check (used for status annotation).
                     - 'value': A value associated with the rule (used for status annotation).

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the date in the specified
                      column falls on a weekday (Monday to Friday), violating the weekend rule. An additional column `dq_status`
                      is added to indicate the rule applied in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    col_dt = dd.to_datetime(df[field], errors="coerce")
    dow = col_dt.dt.weekday
    viol = df[(dow >= 0) & (dow <= 4)]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_past_date(df, rule)

Checks if the values in a specified date column of a Dask DataFrame are in the past.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame containing the data to be checked.

required
rule dict

A dictionary containing the rule parameters. It is expected to include the field name to check, the check type, and additional parameters.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A Dask DataFrame containing rows where the date in the specified column is in the past. An additional column dq_status is added to indicate the field, check type, and the date of the check in the format "field:check:YYYY-MM-DD".

Source code in sumeh/engine/dask_engine.py
def is_past_date(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks if the values in a specified date column of a Dask DataFrame are in the past.

    Args:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be checked.
        rule (dict): A dictionary containing the rule parameters. It is expected to include
                     the field name to check, the check type, and additional parameters.

    Returns:
        dd.DataFrame: A Dask DataFrame containing rows where the date in the specified column
                      is in the past. An additional column `dq_status` is added to indicate
                      the field, check type, and the date of the check in the format
                      "field:check:YYYY-MM-DD".
    """
    field, check, _ = __extract_params(rule)
    today = pd.Timestamp(date.today())
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt < today]
    return viol.assign(dq_status=f"{field}:{check}:{today.date().isoformat()}")

is_positive(df, rule)

Checks if the values in a specified field of a Dask DataFrame are positive.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame to validate.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The name of the field to check. - 'check': The type of check being performed (e.g., "is_positive"). - 'value': The expected value or condition (e.g., "0").

required

Returns:

Type Description
DataFrame

dd.DataFrame: A DataFrame containing rows where the specified field has negative values, with an additional column dq_status indicating the field, check, and value that failed.

Source code in sumeh/engine/dask_engine.py
def is_positive(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks if the values in a specified field of a Dask DataFrame are positive.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to validate.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The name of the field to check.
            - 'check': The type of check being performed (e.g., "is_positive").
            - 'value': The expected value or condition (e.g., "0").

    Returns:
        dd.DataFrame: A DataFrame containing rows where the specified field has
        negative values, with an additional column `dq_status` indicating the
        field, check, and value that failed.
    """
    field, check, value = __extract_params(rule)
    viol = df[df[field] < 0]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_primary_key(df, rule)

Determines if the specified rule identifies a primary key in the given Dask DataFrame.

This function checks whether the column specified in the rule contains only unique values across the DataFrame, effectively identifying a primary key (it delegates to is_unique).

Parameters:

Name Type Description Default
df DataFrame

The Dask DataFrame to evaluate.

required
rule dict

A dictionary defining the rule to check for primary key uniqueness. Typically, this includes the column(s) to be evaluated.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A Dask DataFrame containing the rows that violate the primary key (uniqueness) condition.

Source code in sumeh/engine/dask_engine.py
def is_primary_key(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Determines if the specified rule identifies a primary key in the given Dask DataFrame.

    This function checks whether the column specified in the rule contains only
    unique values across the DataFrame, effectively identifying a primary key.

    Args:
        df (dd.DataFrame): The Dask DataFrame to evaluate.
        rule (dict): A dictionary defining the rule to check for primary key uniqueness.
                     Typically, this includes the column(s) to be evaluated.

    Returns:
        dd.DataFrame: A Dask DataFrame containing the rows that violate the primary key (uniqueness) condition.
    """
    return is_unique(df, rule)

is_t_minus_1(df, rule)

Filters a Dask DataFrame to identify rows where a specified datetime column does not match the date of "T-1" (yesterday) and assigns a data quality status.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame containing the data to be checked.

required
rule dict

A dictionary containing the rule parameters. It is expected to include the following keys: - 'field': The name of the column to check. - 'check': A string describing the check being performed. - 'value': Additional value or metadata related to the check.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows where the specified column does not match "T-1". An additional column dq_status is added to indicate the data quality status in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_t_minus_1(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where a specified datetime column
    does not match the date of "T-1" (yesterday) and assigns a data quality status.

    Args:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be checked.
        rule (dict): A dictionary containing the rule parameters. It is expected
            to include the following keys:
            - 'field': The name of the column to check.
            - 'check': A string describing the check being performed.
            - 'value': Additional value or metadata related to the check.

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the
        specified column does not match "T-1". An additional column `dq_status` is added
        to indicate the data quality status in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    target = pd.Timestamp(date.today() - pd.Timedelta(days=1))
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt != target]
    return viol.assign(dq_status=f"{field}:{check}:{value}")
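
Because target is a midnight timestamp, the inequality only passes for values that parse to exactly midnight of T-1; a timestamp such as 09:30 on the correct day would still be flagged. If a date-only comparison is wanted, one option (a sketch, not the module's implementation; data and column name hypothetical) is to normalize the column first:

import dask.dataframe as dd
import pandas as pd
from datetime import date

pdf = pd.DataFrame({"event_ts": ["2024-01-01 09:30:00", "2024-01-02 00:00:00"]})
ddf = dd.from_pandas(pdf, npartitions=1)

target = pd.Timestamp(date.today() - pd.Timedelta(days=1))
col_dt = dd.to_datetime(ddf["event_ts"], errors="coerce")
viol = ddf[col_dt.dt.normalize() != target]  # compare on the date alone
print(viol.compute())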

is_t_minus_2(df, rule)

Filters a Dask DataFrame to identify rows where a specified datetime column does not match the date two days prior to the current date.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame containing the data to be filtered.

required
rule dict

A dictionary containing the rule parameters. It is expected to include the following keys: - 'field': The name of the column in the DataFrame to check. - 'check': A string representing the type of check (used for metadata). - 'value': A value associated with the rule (used for metadata).

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows where the specified column does not match the target date (two days prior to the current date). An additional column dq_status is added to indicate the rule applied in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_t_minus_2(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where a specified datetime column
    does not match the date two days prior to the current date.

    Args:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be filtered.
        rule (dict): A dictionary containing the rule parameters. It is expected to
            include the following keys:
            - 'field': The name of the column in the DataFrame to check.
            - 'check': A string representing the type of check (used for metadata).
            - 'value': A value associated with the rule (used for metadata).

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the specified
        column does not match the target date (two days prior to the current date). An additional
        column `dq_status` is added to indicate the rule applied in the format
        "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    target = pd.Timestamp(date.today() - pd.Timedelta(days=2))
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt != target]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_t_minus_3(df, rule)

Filters a Dask DataFrame to identify rows where the specified date field does not match the date exactly three days prior to the current date.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame containing the data to be checked.

required
rule dict

A dictionary containing rule parameters. It is expected to include the field name to check, the type of check, and the value (unused in this function).

required

Returns:

Type Description
DataFrame

dd.DataFrame: A filtered Dask DataFrame containing only the rows where the specified date field does not match three days prior to the current date. An additional column dq_status is added to indicate the rule applied in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_t_minus_3(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where the specified date field does not
    match the date exactly three days prior to the current date.

    Args:
        df (dd.DataFrame): The input Dask DataFrame containing the data to be checked.
        rule (dict): A dictionary containing rule parameters. It is expected to include
                     the field name to check, the type of check, and the value (unused in this function).

    Returns:
        dd.DataFrame: A filtered Dask DataFrame containing only the rows where the specified
                      date field does not match three days prior to the current date. An additional
                      column `dq_status` is added to indicate the rule applied in the format
                      "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    target = pd.Timestamp(date.today() - pd.Timedelta(days=3))
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt != target]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_today(df, rule)

Filters a Dask DataFrame to identify rows where the specified field does not match today's date.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It is expected to have the following keys: - field (str): The name of the column in the DataFrame to check. - check (str): A descriptive label for the type of check being performed. - value (str): A descriptive label for the expected value.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame containing only the rows where the specified field does not match today's date. An additional column dq_status is added to indicate the rule applied in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_today(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where the specified field does not match today's date.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It is expected to have
            the following keys:
            - field (str): The name of the column in the DataFrame to check.
            - check (str): A descriptive label for the type of check being performed.
            - value (str): A descriptive label for the expected value.

    Returns:
        dd.DataFrame: A new Dask DataFrame containing only the rows where the specified
        field does not match today's date. An additional column `dq_status` is added to indicate
        the rule applied in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    target = pd.Timestamp(date.today())
    col_dt = dd.to_datetime(df[field], errors="coerce")
    viol = df[col_dt != target]
    return viol.assign(dq_status=f"{field}:{check}:{value}")

is_unique(df, rule)

Checks for uniqueness of a specified field in a Dask DataFrame based on a given rule.

Parameters:

Name Type Description Default
df DataFrame

The Dask DataFrame to check for uniqueness.

required
rule dict

A dictionary containing the rule parameters. It is expected to include: - 'field': The column name to check for uniqueness. - 'check': The type of check being performed (e.g., "unique"). - 'value': Additional value or metadata related to the check.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A DataFrame containing rows that violate the uniqueness rule, with an additional column dq_status indicating the rule that was violated in the format "{field}:{check}:{value}".

Source code in sumeh/engine/dask_engine.py
def is_unique(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Checks for uniqueness of a specified field in a Dask DataFrame based on a given rule.

    Args:
        df (dd.DataFrame): The Dask DataFrame to check for uniqueness.
        rule (dict): A dictionary containing the rule parameters. It is expected to include:
            - 'field': The column name to check for uniqueness.
            - 'check': The type of check being performed (e.g., "unique").
            - 'value': Additional value or metadata related to the check.

    Returns:
        dd.DataFrame: A DataFrame containing rows that violate the uniqueness rule,
        with an additional column `dq_status` indicating the rule that was violated
        in the format "{field}:{check}:{value}".
    """
    field, check, value = __extract_params(rule)
    counts = df[field].value_counts().compute()
    dup_vals = counts[counts > 1].index.tolist()
    viol = df[df[field].isin(dup_vals)]
    return viol.assign(dq_status=f"{field}:{check}:{value}")
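
Note that value_counts().compute() materializes the per-value counts eagerly on the driver before the violations are filtered, which is worth keeping in mind for very high-cardinality columns. The mechanics, on hypothetical data:

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"id": [1, 2, 2, 3]})
ddf = dd.from_pandas(pdf, npartitions=1)

counts = ddf["id"].value_counts().compute()   # eager: a pandas Series of counts
dup_vals = counts[counts > 1].index.tolist()  # [2]
print(ddf[ddf["id"].isin(dup_vals)].compute())  # both rows with id == 2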

is_yesterday(df, rule)

Determines if the rows in a Dask DataFrame correspond to "yesterday" based on a given rule.

This function acts as a wrapper for the is_t_minus_1 function, applying the same logic to check if the data corresponds to the previous day.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame to evaluate.

required
rule dict

A dictionary containing the rule or criteria to determine "yesterday".

required

Returns:

Type Description
DataFrame

dd.DataFrame: A Dask DataFrame containing the rows that violate the "yesterday" rule, as produced by is_t_minus_1.

Source code in sumeh/engine/dask_engine.py
def is_yesterday(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Determines if the rows in a Dask DataFrame correspond to "yesterday"
    based on a given rule.

    This function acts as a wrapper for the `is_t_minus_1` function,
    applying the same logic to check if the data corresponds to the
    previous day.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to evaluate.
        rule (dict): A dictionary containing the rule or criteria
                     to determine "yesterday".

    Returns:
        dd.DataFrame: A Dask DataFrame containing the rows that violate the rule, as returned by `is_t_minus_1`.
    """
    return is_t_minus_1(df, rule)

not_contained_in(df, rule)

Filters a Dask DataFrame to identify rows where the specified field's value is contained in a given list, and assigns a data quality status to the resulting rows.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame to be filtered.

required
rule dict

A dictionary containing the rule parameters. It should include: - 'field': The column name in the DataFrame to check. - 'check': The type of check being performed (e.g., "not_contained_in"). - 'value': A string representation of a list of values to check against, formatted as "[value1, value2, ...]".

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new DataFrame containing only the rows where the specified field's value is in the provided list, with an additional column dq_status indicating the rule applied in the format "field:check:value".

Source code in sumeh/engine/dask_engine.py
def not_contained_in(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame to identify rows where the specified field's value is
    contained in a given list, and assigns a data quality status to the resulting rows.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be filtered.
        rule (dict): A dictionary containing the rule parameters. It should include:
            - 'field': The column name in the DataFrame to check.
            - 'check': The type of check being performed (e.g., "not_contained_in").
            - 'value': A string representation of a list of values to check against,
              formatted as "[value1, value2, ...]".

    Returns:
        dd.DataFrame: A new DataFrame containing only the rows where the specified
        field's value is in the provided list, with an additional column `dq_status`
        indicating the rule applied in the format "field:check:value".
    """
    field, check, value = __extract_params(rule)
    lst = [v.strip() for v in value.strip("[]").split(",")]
    viol = df[df[field].isin(lst)]
    return viol.assign(dq_status=f"{field}:{check}:{value}")
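
The 'value' string is split into bare tokens, so membership is tested against stripped string literals; a numeric column would need matching string values to be caught. The parsing step in isolation:

value = "[NY, CA, TX]"
lst = [v.strip() for v in value.strip("[]").split(",")]
print(lst)  # ['NY', 'CA', 'TX'] -- plain strings, not quoted literals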

not_in(df, rule)

Filters a Dask DataFrame by excluding rows where the specified rule is satisfied.

This function delegates the filtering logic to the not_contained_in function.

Parameters:

Name Type Description Default
df DataFrame

The input Dask DataFrame to be filtered.

required
rule dict

A dictionary defining the filtering rule. The structure and interpretation of this rule depend on the implementation of not_contained_in.

required

Returns:

Type Description
DataFrame

dd.DataFrame: A new Dask DataFrame with rows excluded based on the rule.

Source code in sumeh/engine/dask_engine.py
def not_in(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame by excluding rows where the specified rule is satisfied.

    This function delegates the filtering logic to the `not_contained_in` function.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be filtered.
        rule (dict): A dictionary defining the filtering rule. The structure and
                     interpretation of this rule depend on the implementation of
                     `not_contained_in`.

    Returns:
        dd.DataFrame: The violating rows, tagged with a `dq_status` column.
    """
    return not_contained_in(df, rule)

satisfies(df, rule)

Filters a Dask DataFrame based on a rule and returns rows that do not satisfy the rule.

The function evaluates a rule on the given Dask DataFrame and identifies rows that violate the rule. The rule is specified as a dictionary containing a field, a check, and a value. The rule's logical expression is converted to Python syntax for evaluation.

Parameters:

    df (dd.DataFrame): The input Dask DataFrame to be filtered. Required.
    rule (dict): A dictionary specifying the rule to evaluate. Required. It should contain:
        - 'field': The column name in the DataFrame to evaluate.
        - 'check': The type of check or condition to apply.
        - 'value': The value or expression to evaluate against.

Returns:

    dd.DataFrame: A new Dask DataFrame containing rows that do not satisfy the rule.
    An additional column dq_status is added, which contains a string in the format
    "{field}:{check}:{value}" to indicate the rule that was violated.

Example:

    >>> import dask.dataframe as dd
    >>> import pandas as pd
    >>> data = {'col1': [1, 2, 3], 'col2': [4, 5, 6]}
    >>> df = dd.from_pandas(pd.DataFrame(data), npartitions=1)
    >>> rule = {'field': 'col1', 'check': 'satisfies', 'value': 'col1 > 2'}
    >>> result = satisfies(df, rule)
    >>> result.compute()

Source code in sumeh/engine/dask_engine.py
def satisfies(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Filters a Dask DataFrame based on a rule and returns rows that do not satisfy the rule.

    The function evaluates a rule on the given Dask DataFrame and identifies rows that
    violate the rule. The rule is specified as a dictionary containing a field, a check,
    and a value. The rule's logical expression is converted to Python syntax for evaluation.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to be filtered.
        rule (dict): A dictionary specifying the rule to evaluate. It should contain:
            - 'field': The column name in the DataFrame to evaluate.
            - 'check': The type of check or condition to apply.
            - 'value': The value or expression to evaluate against.

    Returns:
        dd.DataFrame: A new Dask DataFrame containing rows that do not satisfy the rule.
        An additional column `dq_status` is added, which contains a string in the format
        "{field}:{check}:{value}" to indicate the rule that was violated.

    Example:
        >>> import dask.dataframe as dd
        >>> import pandas as pd
        >>> data = {'col1': [1, 2, 3], 'col2': [4, 5, 6]}
        >>> df = dd.from_pandas(pd.DataFrame(data), npartitions=1)
        >>> rule = {'field': 'col1', 'check': 'satisfies', 'value': 'col1 > 2'}
        >>> result = satisfies(df, rule)
        >>> result.compute()
    """
    field, check, value = __extract_params(rule)
    py_expr = value
    py_expr = re.sub(r"(?<![=!<>])=(?!=)", "==", py_expr)
    py_expr = re.sub(r"\bAND\b", "&", py_expr, flags=re.IGNORECASE)
    py_expr = re.sub(r"\bOR\b", "|", py_expr, flags=re.IGNORECASE)

    def _filter_viol(pdf: pd.DataFrame) -> pd.DataFrame:
        mask = pdf.eval(py_expr)
        return pdf.loc[~mask]

    meta = df._meta
    viol = df.map_partitions(_filter_viol, meta=meta)
    return viol.assign(dq_status=f"{field}:{check}:{value}")
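
A hedged sketch of the rewriting step, given a Dask DataFrame df with numeric price and qty columns (hypothetical names): a SQL-flavored rule value (single =, AND/OR) is converted to a pandas-eval expression before the per-partition filter runs.

# value is a full boolean expression over the DataFrame's columns
rule = {"field": "price", "check": "satisfies", "value": "price > 0 AND qty <= 10"}
# internally rewritten to "price > 0 & qty <= 10" and evaluated with pdf.eval
viol = satisfies(df, rule)
viol.compute()  # rows where the expression evaluates to False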

summarize(qc_ddf, rules, total_rows)

Summarizes quality check results by evaluating rules against a Dask DataFrame.

Parameters:

    qc_ddf (dd.DataFrame): A Dask DataFrame containing quality check results. Required.
        The DataFrame must include a "dq_status" column with rule violations in the
        format "column:rule:value".
    rules (list[dict]): A list of dictionaries representing the rules to be evaluated.
        Required. Each dictionary should include keys such as "column", "rule",
        "value", and "pass_threshold".
    total_rows (int): The total number of rows in the original dataset. Required.

Returns:

    dd.DataFrame: A single-partition Dask DataFrame (converted from the summarized
    Pandas result) containing the following columns:
        - id: Unique identifier for each rule evaluation.
        - timestamp: Timestamp of the summary generation.
        - check: The type of check performed (e.g., "Quality Check").
        - level: The severity level of the check (e.g., "WARNING").
        - column: The column name associated with the rule.
        - rule: The rule being evaluated.
        - value: The value associated with the rule.
        - rows: The total number of rows in the dataset.
        - violations: The number of rows that violated the rule.
        - pass_rate: The proportion of rows that passed the rule.
        - pass_threshold: The threshold for passing the rule.
        - status: The status of the rule evaluation ("PASS" or "FAIL").

Source code in sumeh/engine/dask_engine.py
def summarize(qc_ddf: dd.DataFrame, rules: list[dict], total_rows: int) -> dd.DataFrame:
    """
    Summarizes quality check results by evaluating rules against a Dask DataFrame.

    Args:
        qc_ddf (dd.DataFrame): A Dask DataFrame containing quality check results.
            The DataFrame must include a "dq_status" column with rule violations
            in the format "column:rule:value".
        rules (list[dict]): A list of dictionaries representing the rules to be
            evaluated. Each dictionary should include keys such as "column",
            "rule", "value", and "pass_threshold".
        total_rows (int): The total number of rows in the original dataset.

    Returns:
        dd.DataFrame: A single-partition Dask DataFrame containing the following columns:
            - id: Unique identifier for each rule evaluation.
            - timestamp: Timestamp of the summary generation.
            - check: The type of check performed (e.g., "Quality Check").
            - level: The severity level of the check (e.g., "WARNING").
            - column: The column name associated with the rule.
            - rule: The rule being evaluated.
            - value: The value associated with the rule.
            - rows: The total number of rows in the dataset.
            - violations: The number of rows that violated the rule.
            - pass_rate: The proportion of rows that passed the rule.
            - pass_threshold: The threshold for passing the rule.
            - status: The status of the rule evaluation ("PASS" or "FAIL").
    """
    df = qc_ddf.compute()

    df = df[df["dq_status"].astype(bool)]
    split = df["dq_status"].str.split(":", expand=True)
    split.columns = ["column", "rule", "value"]
    viol_count = (
        split.groupby(["column", "rule", "value"], dropna=False)
        .size()
        .reset_index(name="violations")
    )

    rules_df = _rules_to_df(rules)

    rules_df["value"] = rules_df["value"].fillna("")
    viol_count["value"] = viol_count["value"].fillna("")

    summary = (
        rules_df.merge(viol_count, on=["column", "rule", "value"], how="left")
        .assign(
            violations=lambda df: df["violations"].fillna(0).astype(int),
            rows=total_rows,
            pass_rate=lambda df: (total_rows - df["violations"]) / total_rows,
            status=lambda df: np.where(
                df["pass_rate"] >= df["pass_threshold"], "PASS", "FAIL"
            ),
            timestamp=datetime.now().replace(second=0, microsecond=0),
            check="Quality Check",
            level="WARNING",
        )
        .reset_index(drop=True)
    )

    summary.insert(0, "id", [str(uuid.uuid4()) for _ in range(len(summary))])

    summary = summary[
        [
            "id",
            "timestamp",
            "check",
            "level",
            "column",
            "rule",
            "value",
            "rows",
            "violations",
            "pass_rate",
            "pass_threshold",
            "status",
        ]
    ]

    return dd.from_pandas(summary, npartitions=1)
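
A minimal usage sketch, assuming the raw-violations frame produced by validate (documented below) as input, and rule dicts carrying the keys listed above ('column', 'rule', 'value', 'pass_threshold'):

agg_df, raw_df = validate(df, rules)          # see validate() below
summary = summarize(raw_df, rules, total_rows=len(df))
summary.compute()  # one row per rule: violations, pass_rate, and PASS/FAIL status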

validate(df, rules)

Validate a Dask DataFrame against a set of rules and return the aggregated results and raw violations.

Parameters:

    df (dd.DataFrame): The input Dask DataFrame to validate. Required.
    rules (list[dict]): A list of validation rules. Required. Each rule is a dictionary
        containing the following keys:
        - "check_type" (str): The name of the validation function to execute.
        - "value" (optional): The value to be used in the validation function.
        - "execute" (optional, bool): Whether to execute the rule. Defaults to True.

Returns:

    tuple[dd.DataFrame, dd.DataFrame]:
        - The first DataFrame contains the aggregated validation results, with a
          concatenated "dq_status" column indicating the validation status.
        - The second DataFrame contains the raw violations for each rule.

Source code in sumeh/engine/dask_engine.py
def validate(df: dd.DataFrame, rules: list[dict]) -> tuple[dd.DataFrame, dd.DataFrame]:
    """
    Validate a Dask DataFrame against a set of rules and return the aggregated results
    and raw violations.

    Args:
        df (dd.DataFrame): The input Dask DataFrame to validate.
        rules (list[dict]): A list of validation rules. Each rule is a dictionary
            containing the following keys:
            - "check_type" (str): The name of the validation function to execute.
            - "value" (optional): The value to be used in the validation function.
            - "execute" (optional, bool): Whether to execute the rule. Defaults to True.

    Returns:
        tuple[dd.DataFrame, dd.DataFrame]:
            - The first DataFrame contains the aggregated validation results,
              with a concatenated "dq_status" column indicating the validation status.
            - The second DataFrame contains the raw violations for each rule.
    """
    empty = dd.from_pandas(
        pd.DataFrame(columns=df.columns.tolist() + ["dq_status"]), npartitions=1
    )
    raw_df = empty

    for rule in rules:
        if not rule.get("execute", True):
            continue
        rule_name = rule["check_type"]
        func = globals().get(rule_name)
        if func is None:
            warnings.warn(f"Unknown rule: {rule_name}")
            continue

        raw_val = rule.get("value")
        try:
            value = (
                __convert_value(raw_val)
                if isinstance(raw_val, str) and raw_val not in ("", "NULL")
                else raw_val
            )
        except ValueError:
            value = raw_val

        viol = func(df, rule)
        raw_df = dd.concat([raw_df, viol], interleave_partitions=True)

    group_cols = [c for c in df.columns if c != "dq_status"]

    def _concat_status(series: pd.Series) -> str:
        return ";".join([s for s in series.astype(str) if s])

    agg_df = (
        raw_df.groupby(group_cols)
        .dq_status.apply(_concat_status, meta=("dq_status", "object"))
        .reset_index()
    )

    return agg_df, raw_df
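
An end-to-end sketch with a hypothetical rule; check_type must name one of the functions in this module, and the rule is shown defensively with both 'check' and 'check_type' keys since the docstrings on this page use both:

import dask.dataframe as dd
import pandas as pd

df = dd.from_pandas(pd.DataFrame({"age": [25, -3, 40]}), npartitions=1)
rules = [{"field": "age", "check": "is_positive", "check_type": "is_positive", "value": ""}]

agg_df, raw_df = validate(df, rules)
raw_df.compute()  # the age == -3 row, tagged with its dq_status
agg_df.compute()  # violating rows with their statuses joined by ";"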

validate_date_format(df, rule)

Validates the date format of a specified column in a Dask DataFrame.

This function checks whether the values in a specified column of the DataFrame conform to a given date format. Rows with invalid date formats are returned with an additional column indicating the validation status.

Parameters:

    df (dd.DataFrame): The Dask DataFrame to validate. Required.
    rule (dict): A dictionary containing the validation rule. Required. It should include:
        - 'field': The name of the column to validate.
        - 'check': A string describing the validation check.
        - 'fmt': The expected date format (e.g., '%Y-%m-%d').

Returns:

    dd.DataFrame: A DataFrame containing rows where the date format validation failed.
    An additional column dq_status is added, which contains a string describing the
    validation status in the format "{field}:{check}:{fmt}".

Source code in sumeh/engine/dask_engine.py
def validate_date_format(df: dd.DataFrame, rule: dict) -> dd.DataFrame:
    """
    Validates the date format of a specified column in a Dask DataFrame.

    This function checks whether the values in a specified column of the
    DataFrame conform to a given date format. Rows with invalid date formats
    are returned with an additional column indicating the validation status.

    Args:
        df (dd.DataFrame): The Dask DataFrame to validate.
        rule (dict): A dictionary containing the validation rule. It should
                     include the following keys:
                     - 'field': The name of the column to validate.
                     - 'check': A string describing the validation check.
                     - 'fmt': The expected date format (e.g., '%Y-%m-%d').

    Returns:
        dd.DataFrame: A DataFrame containing rows where the date format
                      validation failed. An additional column `dq_status`
                      is added, which contains a string describing the
                      validation status in the format "{field}:{check}:{fmt}".
    """
    field, check, fmt = __extract_params(rule)
    col_dt = dd.to_datetime(df[field], format=fmt, errors="coerce")
    viol = df[col_dt.isna()]
    return viol.assign(dq_status=f"{field}:{check}:{fmt}")
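
A minimal usage sketch with hypothetical data. The docstring above names the format key 'fmt'; since the other checks in this module read the generic 'value' key, the sketch sets both defensively:

pdf = pd.DataFrame({"created": ["2024-01-31", "31/01/2024", None]})
df = dd.from_pandas(pdf, npartitions=1)

rule = {"field": "created", "check": "validate_date_format",
        "fmt": "%Y-%m-%d", "value": "%Y-%m-%d"}
viol = validate_date_format(df, rule)
viol.compute()  # the "31/01/2024" and None rows fail to parse as %Y-%m-%d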

validate_schema(df, expected)

Validates the schema of a Dask DataFrame against an expected schema.

Parameters:

    df (dd.DataFrame): The Dask DataFrame whose schema is to be validated. Required.
    expected (List[Dict[str, Any]]): A list of dictionaries representing the expected
        schema. Required. Each dictionary should define the expected column name and
        its properties.

Returns:

    Tuple[bool, List[Tuple[str, str]]]: A tuple where the first element is a boolean
    indicating whether the schema matches the expected schema, and the second element
    is a list of tuples containing mismatched column names and their respective issues.

Source code in sumeh/engine/dask_engine.py
def validate_schema(
    df: dd.DataFrame, expected: List[Dict[str, Any]]
) -> Tuple[bool, List[Tuple[str, str]]]:
    """
    Validates the schema of a Dask DataFrame against an expected schema.

    Args:
        df (dd.DataFrame): The Dask DataFrame whose schema is to be validated.
        expected (List[Dict[str, Any]]): A list of dictionaries representing the expected schema.
            Each dictionary should define the expected column name and its properties.

    Returns:
        Tuple[bool, List[Tuple[str, str]]]: A tuple where the first element is a boolean indicating
            whether the schema matches the expected schema, and the second element is a list of
            tuples containing mismatched column names and their respective issues.
    """
    actual = __dask_schema_to_list(df)
    return __compare_schemas(actual, expected)
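
A hedged usage sketch; the exact keys of each expected-schema dict (shown here as 'field', 'data_type', and 'nullable') are assumptions, not confirmed by this page:

# hypothetical expected schema; key names are illustrative assumptions
expected = [
    {"field": "id", "data_type": "int64", "nullable": False},
    {"field": "name", "data_type": "object", "nullable": True},
]
ok, issues = validate_schema(df, expected)
if not ok:
    print(issues)  # list of (column, problem) tuples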