
Module sumeh.core

This module provides a set of functions and utilities for data validation, schema retrieval, and summarization. It supports multiple data sources and engines, including BigQuery, S3, CSV files, MySQL, PostgreSQL, AWS Glue, DuckDB, and Databricks.

Functions:

  • get_rules_config(source: str, **kwargs) -> List[Dict[str, Any]]: Retrieves configuration rules based on the specified source.
  • get_schema_config(source: str, **kwargs) -> List[Dict[str, Any]]: Retrieves the schema configuration based on the provided data source.
  • __detect_engine(df) -> str: Detects the engine type of the given DataFrame based on its module.
  • validate_schema(df_or_conn: Any, expected: List[Dict[str, Any]], engine: str, **engine_kwargs) -> Tuple[bool, List[Tuple[str, str]]]: Validates the schema of a data source or connection against an expected schema.
  • validate(df, rules, **context): Validates a DataFrame against a set of rules using the appropriate engine.
  • summarize(df, rules: list[dict], **context): Summarizes a DataFrame based on the provided rules and context.
  • report(df, rules: list[dict], name: str = "Quality Check"): Performs a quality check on the given DataFrame based on the provided rules.

Constants

_CONFIG_DISPATCH: A dictionary mapping data source types (e.g., "mysql", "postgresql") to their respective configuration retrieval functions.

Imports

  • cuallee: Provides the Check and CheckLevel classes for data validation.
  • warnings: Used to issue warnings for unknown rule names.
  • importlib: Dynamically imports modules based on engine detection.
  • typing: Provides type hints for function arguments and return values.
  • re: Used for regular expression matching in source string parsing.
  • sumeh.services.config: Contains functions for retrieving configurations and schemas from various data sources.
  • sumeh.services.utils: Provides utility functions for value conversion and URI parsing.

The module uses Python's structural pattern matching (match-case) to handle different data source types and validation rules. The report function supports a wide range of validation checks, including completeness, uniqueness, value comparisons, patterns, and date-related checks. The validate and summarize functions dynamically detect the appropriate engine based on the input DataFrame type and delegate the processing to the corresponding engine module.
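
A minimal end-to-end sketch of that workflow (the file name, column names, and rule contents below are illustrative, not part of the module; the shape of the results depends on the detected engine):

import pandas as pd
from sumeh.core import get_rules_config, validate, summarize

df = pd.DataFrame({"id": [1, 2, 2], "amount": [10.5, None, 7.0]})

# Dispatches on the ".csv" suffix of the source string.
rules = get_rules_config("rules.csv")

# The engine is detected from the DataFrame type (pandas here) and the work is
# delegated to the matching sumeh.engine module.
results = validate(df, rules)
summary = summarize(df, rules, total_rows=len(df))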

_CONFIG_DISPATCH (module attribute) = {'mysql': get_config_from_mysql, 'postgresql': get_config_from_postgresql}

__convert_value(value)

Converts the provided value to the appropriate type (date, float, or int).

Depending on the format of the input value, it will be converted to a datetime object, a floating-point number (float), or an integer (int).

Parameters:

  • value (str, required): The value to be converted, represented as a string.

Returns:

  • Union[datetime, float, int]: The converted value, which can be a datetime object, float, or int.

Raises:

  • ValueError: If the value does not match an expected format.

Source code in sumeh/services/utils.py
def __convert_value(value):
    """
    Converts the provided value to the appropriate type (date, float, or int).

    Depending on the format of the input value, it will be converted to a datetime object,
    a floating-point number (float), or an integer (int).

    Args:
        value (str): The value to be converted, represented as a string.

    Returns:
        Union[datetime, float, int]: The converted value, which can be a datetime object, float, or int.

    Raises:
        ValueError: If the value does not match an expected format.
    """
    from datetime import datetime

    value = value.strip()
    try:
        if "-" in value:
            return datetime.strptime(value, "%Y-%m-%d")
        else:
            return datetime.strptime(value, "%d/%m/%Y")
    except ValueError:
        if "." in value:
            return float(value)
        return int(value)
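
For illustration, the branching above yields the following conversions (a sketch; __convert_value is module-private, so these calls only document the expected behaviour):

__convert_value("2024-01-15")   # datetime(2024, 1, 15): "-" present, parsed as %Y-%m-%d
__convert_value("15/01/2024")   # datetime(2024, 1, 15): no "-", parsed as %d/%m/%Y
__convert_value(" 3.14 ")       # 3.14: date parsing fails, "." present, so float
__convert_value("42")           # 42: date parsing fails, no ".", so int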

__detect_engine(df)

Detects the engine type of the given DataFrame based on its module.

Parameters:

  • df (required): The DataFrame object whose engine type is to be detected.

Returns:

  • str: A string representing the detected engine type. Possible values are:
      - "pyspark_engine" for PySpark DataFrames
      - "dask_engine" for Dask DataFrames
      - "polars_engine" for Polars DataFrames
      - "pandas_engine" for Pandas DataFrames
      - "duckdb_engine" for DuckDB or BigQuery DataFrames

Raises:

  • TypeError: If the DataFrame type is unsupported.

Source code in sumeh/core.py
def __detect_engine(df):
    """
    Detects the engine type of the given DataFrame based on its module.

    Args:
        df: The DataFrame object whose engine type is to be detected.

    Returns:
        str: A string representing the detected engine type. Possible values are:
            - "pyspark_engine" for PySpark DataFrames
            - "dask_engine" for Dask DataFrames
            - "polars_engine" for Polars DataFrames
            - "pandas_engine" for Pandas DataFrames
            - "duckdb_engine" for DuckDB or BigQuery DataFrames

    Raises:
        TypeError: If the DataFrame type is unsupported.
    """
    mod = df.__class__.__module__
    match mod:
        case m if m.startswith("pyspark"):
            return "pyspark_engine"
        case m if m.startswith("dask"):
            return "dask_engine"
        case m if m.startswith("polars"):
            return "polars_engine"
        case m if m.startswith("pandas"):
            return "pandas_engine"
        case m if m.startswith("duckdb"):
            return "duckdb_engine"
        case m if m.startswith("bigquery"):
            return "duckdb_engine"
        case _:
            raise TypeError(f"Unsupported DataFrame type: {type(df)}")

__parse_databricks_uri(uri)

Parses a Databricks URI into its catalog, schema, and table components.

The URI is expected to follow the format protocol://catalog.schema.table or protocol://schema.table. If the catalog is not provided, it will be set to None. If the schema is not provided, the current database from the active Spark session will be used.

Parameters:

  • uri (str, required): The Databricks URI to parse.

Returns:

  • Dict[str, Optional[str]]: A dictionary containing the parsed components:
      - "catalog" (Optional[str]): The catalog name, or None if not provided.
      - "schema" (Optional[str]): The schema name, or the current database if not provided.
      - "table" (Optional[str]): The table name.

Source code in sumeh/services/utils.py
def __parse_databricks_uri(uri: str) -> Dict[str, Optional[str]]:
    """
    Parses a Databricks URI into its catalog, schema, and table components.

    The URI is expected to follow the format `protocol://catalog.schema.table` or
    `protocol://schema.table`. If the catalog is not provided, it will be set to `None`.
    If the schema is not provided, the current database from the active Spark session
    will be used.

    Args:
        uri (str): The Databricks URI to parse.

    Returns:
        Dict[str, Optional[str]]: A dictionary containing the parsed components:
            - "catalog" (Optional[str]): The catalog name, or `None` if not provided.
            - "schema" (Optional[str]): The schema name, or the current database if not provided.
            - "table" (Optional[str]): The table name.
    """
    _, path = uri.split("://", 1)
    parts = path.split(".")
    if len(parts) == 3:
        catalog, schema, table = parts
    elif len(parts) == 2:
        catalog, schema, table = None, parts[0], parts[1]
    else:
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()
        catalog = None
        schema = spark.catalog.currentDatabase()
        table = parts[0]
    return {"catalog": catalog, "schema": schema, "table": table}

get_config_from_bigquery(project_id, dataset_id, table_id, credentials_path=None, query=None)

Retrieves configuration data from a Google BigQuery table.

Parameters:

  • project_id (str, required): Google Cloud project ID.
  • dataset_id (str, required): BigQuery dataset ID.
  • table_id (str, required): BigQuery table ID.
  • credentials_path (Optional[str], default None): Path to a service account credentials file (if not provided, defaults to default credentials).
  • query (Optional[str], default None): Custom SQL query to fetch data (if not provided, defaults to SELECT *).

Returns:

  • List[Dict[str, str]]: A list of dictionaries representing the parsed configuration data.

Raises:

  • RuntimeError: If there is an error while querying BigQuery.

Source code in sumeh/services/config.py
def get_config_from_bigquery(
    project_id: str,
    dataset_id: str,
    table_id: str,
    credentials_path: Optional[str] = None,
    query: Optional[str] = None,
) -> List[Dict[str, str]]:
    """
    Retrieves configuration data from a Google BigQuery table.

    Args:
        project_id (str): Google Cloud project ID.
        dataset_id (str): BigQuery dataset ID.
        table_id (str): BigQuery table ID.
        credentials_path (Optional[str]): Path to service account credentials file (if not provided, defaults to default credentials).
        query (Optional[str]): Custom SQL query to fetch data (if not provided, defaults to SELECT *).

    Returns:
        List[Dict[str, str]]: A list of dictionaries representing the parsed configuration data.

    Raises:
        RuntimeError: If there is an error while querying BigQuery.
    """
    from google.cloud import bigquery
    from google.auth.exceptions import DefaultCredentialsError

    if query is None:
        query = f"SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"

    try:
        client = bigquery.Client(
            project=project_id,
            credentials=(
                None
                if credentials_path is None
                else bigquery.Credentials.from_service_account_file(credentials_path)
            ),
        )

        # Execute the query and convert the result to a pandas DataFrame
        data = client.query(query).to_dataframe()

        # Convert the DataFrame to a list of dictionaries
        data_dict = data.to_dict(orient="records")

        # Parse the data and return the result
        return __parse_data(data_dict)

    except DefaultCredentialsError as e:
        raise RuntimeError(f"Credentials error: {e}") from e

    except Exception as e:
        raise RuntimeError(f"Error occurred while querying BigQuery: {e}") from e

get_config_from_csv(file_path, delimiter=',')

Retrieves configuration data from a CSV file.

Parameters:

  • file_path (str, required): The local file path to the CSV file.
  • delimiter (Optional[str], default ','): The delimiter used in the CSV file.

Returns:

  • List[Dict[str, str]]: A list of dictionaries representing the parsed configuration data.

Raises:

  • RuntimeError: If there is an error reading or processing the file.

Source code in sumeh/services/config.py
def get_config_from_csv(
    file_path: str, delimiter: Optional[str] = ","
) -> List[Dict[str, str]]:
    """
    Retrieves configuration data from a CSV file.

    Args:
        file_path (str): The local file path to the CSV file.
        delimiter (Optional[str]): The delimiter used in the CSV file (default is ",").

    Returns:
        List[Dict[str, str]]: A list of dictionaries representing the parsed configuration data.

    Raises:
        RuntimeError: If there is an error reading or processing the file.
    """
    try:
        file_content = __read_local_file(file_path)
        result = __read_csv_file(file_content, delimiter)

        return __parse_data(result)

    except FileNotFoundError as e:
        raise RuntimeError(f"File '{file_path}' not found. Error: {e}") from e

    except ValueError as e:
        raise ValueError(
            f"Error while parsing CSV file '{file_path}'. Error: {e}"
        ) from e

    except Exception as e:
        # Catch any unexpected exceptions
        raise RuntimeError(
            f"Unexpected error while processing CSV file '{file_path}'. Error: {e}"
        ) from e
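
A usage sketch (the file names are hypothetical; the CSV is expected to hold the rule columns such as field, check_type, value, and threshold):

rules = get_config_from_csv("rules.csv")
rules_eu = get_config_from_csv("rules_eu.csv", delimiter=";")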

get_config_from_databricks(catalog, schema, table, **kwargs)

Retrieves configuration data from a Databricks table and returns it as a list of dictionaries.

Parameters:

  • catalog (Optional[str], required): The catalog name in Databricks. If provided, it is included in the table's full path.
  • schema (Optional[str], required): The schema name in Databricks. If provided, it is included in the table's full path.
  • table (str, required): The name of the table to retrieve data from.
  • **kwargs: Additional keyword arguments; an optional query entry is applied as a WHERE clause filter.

Returns:

  • List[Dict[str, Any]]: A list of dictionaries, where each dictionary represents a row of data from the table.

Source code in sumeh/services/config.py
def get_config_from_databricks(
    catalog: Optional[str], schema: Optional[str], table: str, **kwargs
) -> List[Dict[str, Any]]:
    """
    Retrieves configuration data from a Databricks table and returns it as a list of dictionaries.

    Args:
        catalog (Optional[str]): The catalog name in Databricks. If provided, it will be included in the table's full path.
        schema (Optional[str]): The schema name in Databricks. If provided, it will be included in the table's full path.
        table (str): The name of the table to retrieve data from.
        **kwargs: Additional keyword arguments (currently unused).

    Returns:
        List[Dict[str, Any]]: A list of dictionaries, where each dictionary represents a row of data from the table.
    """
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    if catalog and schema:
        full = f"{catalog}.{schema}.{table}"
    elif schema:
        full = f"{schema}.{table}"
    else:
        full = table
    if "query" in kwargs.keys():
        df = spark.sql(f"select * from {full} where {kwargs['query']}")
    else:
        df = spark.table(full)
    return [row.asDict() for row in df.collect()]
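
A usage sketch (catalog, schema, and table names are hypothetical; an active SparkSession, e.g. inside a Databricks notebook, is required):

rules = get_config_from_databricks(catalog="main", schema="quality", table="rules")

# An optional query keyword is applied as a WHERE clause filter:
active_rules = get_config_from_databricks(
    catalog="main", schema="quality", table="rules", query="execute = true"
)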

get_config_from_duckdb(db_path, table=None, query=None, conn=None)

Retrieve configuration data from a DuckDB database.

This function fetches data from a DuckDB database either by executing a custom SQL query or by selecting all rows from a specified table. The data is then parsed into a list of dictionaries.

Parameters:

  • db_path (str, required): The path to the DuckDB database file.
  • table (str, default None): The name of the table to fetch data from.
  • query (str, default None): A custom SQL query to execute.
  • conn (default None): A valid DuckDB connection object.

Returns:

  • List[Dict[str, Any]]: A list of dictionaries representing the fetched data.

Raises:

  • ValueError: If neither table nor query is provided, or if a valid conn is not supplied.

Example

import duckdb
conn = duckdb.connect('my_db.duckdb')
config = get_config_from_duckdb('my_db.duckdb', table='rules', conn=conn)

Source code in sumeh/services/config.py
def get_config_from_duckdb(
    db_path: str, table: str = None, query: str = None, conn=None
) -> List[Dict[str, Any]]:
    """
    Retrieve configuration data from a DuckDB database.

    This function fetches data from a DuckDB database either by executing a custom SQL query
    or by selecting all rows from a specified table. The data is then parsed into a list of
    dictionaries.

    Args:
        db_path (str): The path to the DuckDB database file.
        table (str, optional): The name of the table to fetch data from. Defaults to None.
        query (str, optional): A custom SQL query to execute. Defaults to None.
        conn: A valid DuckDB connection object.

    Returns:
        List[Dict[str, Any]]: A list of dictionaries representing the fetched data.

    Raises:
        ValueError: If neither `table` nor `query` is provided, or if a valid `conn` is not supplied.

    Example:
        >>> import duckdb
        >>> conn = duckdb.connect('my_db.duckdb')
        >>> config = get_config_from_duckdb('my_db.duckdb', table='rules', conn=conn)
    """

    if query:
        df = conn.execute(query).fetchdf()
    elif table:
        df = conn.execute(f"SELECT * FROM {table}").fetchdf()
    else:
        raise ValueError(
            "DuckDB configuration requires:\n"
            "1. Either a `table` name or custom `query`\n"
            "2. A valid database `conn` connection object\n"
            "Example: get_config('duckdb', table='rules', conn=duckdb.connect('my_db.duckdb'))"
        )

    return __parse_data(df.to_dict(orient="records"))

get_config_from_glue_data_catalog(glue_context, database_name, table_name, query=None)

Retrieves configuration data from AWS Glue Data Catalog.

Parameters:

  • glue_context (required): An instance of GlueContext.
  • database_name (str, required): Glue database name.
  • table_name (str, required): Glue table name.
  • query (Optional[str], default None): Custom SQL query to fetch data (if provided).

Returns:

  • List[Dict[str, str]]: A list of dictionaries representing the parsed configuration data.

Raises:

  • RuntimeError: If there is an error querying Glue Data Catalog.

Source code in sumeh/services/config.py
def get_config_from_glue_data_catalog(
    glue_context, database_name: str, table_name: str, query: Optional[str] = None
) -> List[Dict[str, str]]:
    """
    Retrieves configuration data from AWS Glue Data Catalog.

    Args:
        glue_context: An instance of `GlueContext`.
        database_name (str): Glue database name.
        table_name (str): Glue table name.
        query (Optional[str]): Custom SQL query to fetch data (if provided).

    Returns:
        List[Dict[str, str]]: A list of dictionaries representing the parsed configuration data.

    Raises:
        RuntimeError: If there is an error querying Glue Data Catalog.
    """
    from awsglue.context import GlueContext

    if not isinstance(glue_context, GlueContext):
        raise ValueError("The provided context is not a valid GlueContext.")

    spark = glue_context.spark_session

    try:
        dynamic_frame = glue_context.create_dynamic_frame.from_catalog(
            database=database_name, table_name=table_name
        )

        data_frame = dynamic_frame.toDF()

        if query:
            data_frame.createOrReplaceTempView("table_name")
            data_frame = spark.sql(query)

        data_dict = [row.asDict() for row in data_frame.collect()]

        return __parse_data(data_dict)

    except Exception as e:
        raise RuntimeError(
            f"Error occurred while querying Glue Data Catalog: {e}"
        ) from e

get_config_from_mysql(connection=None, host=None, user=None, password=None, database=None, port=3306, schema=None, table=None, query=None)

Retrieves configuration data from a MySQL database.

Parameters:

  • connection (Optional, default None): An existing MySQL connection object.
  • host (Optional[str], default None): Host of the MySQL server.
  • user (Optional[str], default None): Username to connect to MySQL.
  • password (Optional[str], default None): Password for the MySQL user.
  • database (Optional[str], default None): Database name to query.
  • port (Optional[int], default 3306): The port for the MySQL connection.
  • schema (Optional[str], default None): Schema name if query is not provided.
  • table (Optional[str], default None): Table name if query is not provided.
  • query (Optional[str], default None): Custom SQL query to fetch data (if not provided, schema and table must be given).

Returns:

  • List[Dict[str, Any]]: A list of dictionaries representing the parsed configuration data.

Raises:

  • ValueError: If neither query nor both schema and table are provided.
  • ConnectionError: If there is an error connecting to MySQL.
  • RuntimeError: If there is an error executing the query or processing the data.

Source code in sumeh/services/config.py
def get_config_from_mysql(
    connection: Optional = None,
    host: Optional[str] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
    database: Optional[str] = None,
    port: Optional[int] = 3306,
    schema: Optional[str] = None,
    table: Optional[str] = None,
    query: Optional[str] = None,
):
    """
    Retrieves configuration data from a MySQL database.

    Args:
        connection (Optional): An existing MySQL connection object.
        host (Optional[str]): Host of the MySQL server.
        user (Optional[str]): Username to connect to MySQL.
        password (Optional[str]): Password for the MySQL user.
        database (Optional[str]): Database name to query.
        port (Optional[int]): The port for the MySQL connection (default is 3306).
        schema (Optional[str]): Schema name if query is not provided.
        table (Optional[str]): Table name if query is not provided.
        query (Optional[str]): Custom SQL query to fetch data (if not provided, `schema` and `table` must be given).

    Returns:
        List[Dict[str, Any]]: A list of dictionaries representing the parsed configuration data.

    Raises:
        ValueError: If neither `query` nor both `schema` and `table` are provided.
        ConnectionError: If there is an error connecting to MySQL.
        RuntimeError: If there is an error executing the query or processing the data.
    """
    import mysql.connector
    import pandas as pd

    if query is None and (schema is None or table is None):
        raise ValueError(
            "You must provide either a 'query' or both 'schema' and 'table'."
        )

    if query is None:
        query = f"SELECT * FROM {schema}.{table}"

    try:
        connection = connection or __create_connection(
            mysql.connector.connect, host, user, password, database, port
        )
        data = pd.read_sql(query, connection)
        data_dict = data.to_dict(orient="records")
        return __parse_data(data_dict)

    except mysql.connector.Error as e:
        raise ConnectionError(f"Error connecting to MySQL database: {e}")

    except Exception as e:
        raise RuntimeError(f"Error executing the query or processing data: {e}")

    finally:
        if connection and host is not None:
            connection.close()
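
A usage sketch (credentials and object names are hypothetical; requires mysql-connector-python and pandas):

rules = get_config_from_mysql(
    host="localhost",
    user="root",
    password="secret",
    database="quality",
    schema="quality",
    table="rules",
)

# Alternatively, reuse an existing connection together with a custom query:
# rules = get_config_from_mysql(connection=conn, query="SELECT * FROM quality.rules")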

get_config_from_postgresql(connection=None, host=None, user=None, password=None, database=None, port=5432, schema=None, table=None, query=None)

Retrieves configuration data from a PostgreSQL database.

Parameters:

  • connection (Optional, default None): An existing PostgreSQL connection object.
  • host (Optional[str], default None): Host of the PostgreSQL server.
  • user (Optional[str], default None): Username to connect to PostgreSQL.
  • password (Optional[str], default None): Password for the PostgreSQL user.
  • database (Optional[str], default None): Database name to query.
  • port (Optional[int], default 5432): The port for the PostgreSQL connection.
  • schema (Optional[str], default None): Schema name if query is not provided.
  • table (Optional[str], default None): Table name if query is not provided.
  • query (Optional[str], default None): Custom SQL query to fetch data (if not provided, schema and table must be given).

Returns:

  • List[Dict[str, Any]]: A list of dictionaries representing the parsed configuration data.

Raises:

  • ValueError: If neither query nor both schema and table are provided.
  • ConnectionError: If there is an error connecting to PostgreSQL.
  • RuntimeError: If there is an error executing the query or processing the data.

Source code in sumeh/services/config.py
def get_config_from_postgresql(
    connection: Optional = None,
    host: Optional[str] = None,
    user: Optional[str] = None,
    password: Optional[str] = None,
    database: Optional[str] = None,
    port: Optional[int] = 5432,
    schema: Optional[str] = None,
    table: Optional[str] = None,
    query: Optional[str] = None,
) -> list[dict]:
    """
    Retrieves configuration data from a PostgreSQL database.

    Args:
        connection (Optional): An existing PostgreSQL connection object.
        host (Optional[str]): Host of the PostgreSQL server.
        user (Optional[str]): Username to connect to PostgreSQL.
        password (Optional[str]): Password for the PostgreSQL user.
        database (Optional[str]): Database name to query.
        port (Optional[int]): The port for the PostgreSQL connection (default is 5432).
        schema (Optional[str]): Schema name if query is not provided.
        table (Optional[str]): Table name if query is not provided.
        query (Optional[str]): Custom SQL query to fetch data (if not provided, `schema` and `table` must be given).

    Returns:
        List[Dict[str, Any]]: A list of dictionaries representing the parsed configuration data.

    Raises:
        ValueError: If neither `query` nor both `schema` and `table` are provided.
        ConnectionError: If there is an error connecting to PostgreSQL.
        RuntimeError: If there is an error executing the query or processing the data.
    """
    import psycopg2
    import pandas as pd

    if query is None and (schema is None or table is None):
        raise ValueError(
            "You must provide either a 'query' or both 'schema' and 'table'."
        )

    if query is None:
        query = f"SELECT * FROM {schema}.{table}"

    try:
        connection = connection or __create_connection(
            psycopg2.connect, host, user, password, database, port
        )

        data = pd.read_sql(query, connection)

        data_dict = data.to_dict(orient="records")
        return __parse_data(data_dict)

    except psycopg2.Error as e:
        raise ConnectionError(f"Error connecting to PostgreSQL database: {e}")

    except Exception as e:
        raise RuntimeError(f"Error executing the query or processing data: {e}")

    finally:
        if connection and host is not None:
            connection.close()
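
A usage sketch reusing an existing connection (connection parameters are hypothetical; requires psycopg2 and pandas):

import psycopg2

conn = psycopg2.connect(host="localhost", user="postgres", password="secret", dbname="quality")
rules = get_config_from_postgresql(connection=conn, schema="public", table="rules")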

get_config_from_s3(s3_path, delimiter=',')

Retrieves configuration data from a CSV file stored in an S3 bucket.

Parameters:

  • s3_path (str, required): The S3 path to the CSV file.
  • delimiter (Optional[str], default ','): The delimiter used in the CSV file.

Returns:

  • List[Dict[str, Any]]: A list of dictionaries representing the parsed configuration data.

Raises:

  • RuntimeError: If there is an error reading or processing the S3 file.

Source code in sumeh/services/config.py
def get_config_from_s3(s3_path: str, delimiter: Optional[str] = ","):
    """
    Retrieves configuration data from a CSV file stored in an S3 bucket.

    Args:
        s3_path (str): The S3 path to the CSV file.
        delimiter (Optional[str]): The delimiter used in the CSV file (default is ",").

    Returns:
        List[Dict[str, Any]]: A list of dictionaries representing the parsed configuration data.

    Raises:
        RuntimeError: If there is an error reading or processing the S3 file.
    """
    try:
        file_content = __read_s3_file(s3_path)
        data = __read_csv_file(file_content, delimiter)
        return __parse_data(data)

    except Exception as e:
        raise RuntimeError(f"Error reading or processing the S3 file: {e}")

get_rules_config(source, **kwargs)

Retrieve configuration rules based on the specified source.

Dispatches to the appropriate loader according to the format of source, returning a list of parsed rule dictionaries.

Supported sources
  • bigquery://<project>.<dataset>.<table>
  • s3://<bucket>/<path>
  • <file>.csv
  • "mysql" or "postgresql" (requires host/user/etc. in kwargs)
  • "glue" (AWS Glue Data Catalog)
  • duckdb://<db_path>.<table>
  • databricks://<catalog>.<schema>.<table>

Parameters:

  • source (str, required): Identifier of the rules configuration location. Determines which handler is invoked.
  • **kwargs: Loader-specific parameters (e.g. host, user, password, connection, query).

Returns:

  • List[Dict[str, Any]]: A list of dictionaries, each representing a validation rule with keys like "field", "check_type", "value", "threshold", and "execute".

Raises:

  • ValueError: If source does not match any supported format.

Source code in sumeh/core.py
def get_rules_config(source: str, **kwargs) -> List[Dict[str, Any]]:
    """
    Retrieve configuration rules based on the specified source.

    Dispatches to the appropriate loader according to the format of `source`,
    returning a list of parsed rule dictionaries.

    Supported sources:
      - `bigquery://<project>.<dataset>.<table>`
      - `s3://<bucket>/<path>`
      - `<file>.csv`
      - `"mysql"` or `"postgresql"` (requires host/user/etc. in kwargs)
      - `"glue"` (AWS Glue Data Catalog)
      - `duckdb://<db_path>.<table>`
      - `databricks://<catalog>.<schema>.<table>`

    Args:
        source (str):
            Identifier of the rules configuration location. Determines which
            handler is invoked.
        **kwargs:
            Loader-specific parameters (e.g. `host`, `user`, `password`,
            `connection`, `query`).

    Returns:
        List[Dict[str, Any]]:
            A list of dictionaries, each representing a validation rule with keys
            like `"field"`, `"check_type"`, `"value"`, `"threshold"`, and `"execute"`.

    Raises:
        ValueError:
            If `source` does not match any supported format.
    """
    match source:
        case s if s.startswith("bigquery://"):
            _, path = s.split("://", 1)
            project, dataset, table = path.split(".")
            return get_config_from_bigquery(
                project_id=project,
                dataset_id=dataset,
                table_id=table,
                **kwargs,
            )

        case s if s.startswith("s3://"):
            return get_config_from_s3(s, **kwargs)

        case s if re.search(r"\.csv$", s, re.IGNORECASE):
            return get_config_from_csv(s, **kwargs)

        case "mysql" | "postgresql" as driver:
            loader = _CONFIG_DISPATCH[driver]
            return loader(**kwargs)

        case "glue":
            return get_config_from_glue_data_catalog(**kwargs)

        case s if s.startswith("duckdb://"):
            _, path = s.split("://", 1)
            db_path, table = path.rsplit(".", 1)
            conn = kwargs.pop("conn", None)
            return get_config_from_duckdb(
                conn=conn,
                table=table,
            )

        case s if s.startswith("databricks://"):
            parts = __parse_databricks_uri(s)
            return get_config_from_databricks(
                catalog=parts["catalog"],
                schema=parts["schema"],
                table=parts["table"],
                **kwargs,
            )

        case _:
            raise ValueError(f"Unknow source: {source}")

get_schema_config(source, **kwargs)

Retrieve the schema configuration based on the provided data source.

This function determines the appropriate method to extract schema information based on the format or type of the source string. It supports various data sources such as BigQuery, S3, CSV files, MySQL, PostgreSQL, AWS Glue, DuckDB, and Databricks.

Parameters:

  • source (str, required): A string representing the data source. The format of the string determines the method used to retrieve the schema. Supported formats include: bigquery://<project>.<dataset>.<table>, s3://<bucket>/<path>, <file>.csv, mysql, postgresql, glue, duckdb://<db_path>.<table>, databricks://<catalog>.<schema>.<table>
  • **kwargs: Additional keyword arguments required by specific schema retrieval methods. For example, for DuckDB: conn (a database connection object); for other sources: additional parameters specific to the source.

Returns:

  • List[Dict[str, Any]]: A list of dictionaries representing the schema configuration. Each dictionary contains details about a column in the schema.

Raises:

  • ValueError: If the source string does not match any supported format.

Examples:

>>> get_schema_config("bigquery://my_project.my_dataset.my_table")
>>> get_schema_config("s3://my_bucket/my_file.csv")
>>> get_schema_config("mysql", host="localhost", user="root", password="password")
Source code in sumeh/core.py
def get_schema_config(source: str, **kwargs) -> List[Dict[str, Any]]:
    """
    Retrieve the schema configuration based on the provided data source.

    This function determines the appropriate method to extract schema information
    based on the format or type of the `source` string. It supports various data
    sources such as BigQuery, S3, CSV files, MySQL, PostgreSQL, AWS Glue, DuckDB,
    and Databricks.

    Args:
        source (str):
            A string representing the data source. The format of the
            string determines the method used to retrieve the schema. Supported
            formats include: `bigquery://<project>.<dataset>.<table>`, `s3://<bucket>/<path>`,
            `<file>.csv`, `mysql`, `postgresql`, `glue`, `duckdb://<db_path>.<table>`,
            `databricks://<catalog>.<schema>.<table>`
        **kwargs: Additional keyword arguments required by specific schema
            retrieval methods. For example:
            For DuckDB: `conn` (a database connection object).
            For other sources: Additional parameters specific to the source.

    Returns:
        List[Dict[str, Any]]: A list of dictionaries representing the schema
        configuration. Each dictionary contains details about a column in the
        schema.

    Raises:
        ValueError: If the `source` string does not match any supported format.

    Examples:
        >>> get_schema_config("bigquery://my_project.my_dataset.my_table")
        >>> get_schema_config("s3://my_bucket/my_file.csv")
        >>> get_schema_config("mysql", host="localhost", user="root", password="password")
    """
    match source:
        case s if s.startswith("bigquery://"):
            _, path = s.split("://", 1)
            project, dataset, table = path.split(".")
            return get_schema_from_bigquery(
                project_id=project,
                dataset_id=dataset,
                table_id=table,
                **kwargs,
            )

        case s if s.startswith("s3://"):
            return get_schema_from_s3(s, **kwargs)

        case s if re.search(r"\.csv$", s, re.IGNORECASE):
            return get_schema_from_csv(s, **kwargs)

        case "mysql":
            return get_schema_from_mysql(**kwargs)

        case "postgresql":
            return get_schema_from_postgresql(**kwargs)

        case "glue":
            return get_schema_from_glue(**kwargs)

        case s if s.startswith("duckdb://"):
            conn = kwargs.pop("conn")
            _, path = s.split("://", 1)
            db_path, table = path.rsplit(".", 1)
            return get_schema_from_duckdb(conn=conn, table=table)

        case s if s.startswith("databricks://"):
            parts = __parse_databricks_uri(s)
            return get_schema_from_databricks(
                catalog=parts["catalog"],
                schema=parts["schema"],
                table=parts["table"],
                **kwargs,
            )

        case _:
            raise ValueError(f"Unknow source: {source}")

report(df, rules, name='Quality Check')

Performs a quality check on the given DataFrame based on the provided rules.

The function iterates over a list of rules and applies different checks to the specified fields of the DataFrame. The checks include validation of completeness, uniqueness, specific values, patterns, and other conditions. Each rule corresponds to a particular type of validation, such as 'is_complete', 'is_greater_than', 'has_mean', etc. After applying the checks, the function returns the result of the validation.

Parameters:

  • df (DataFrame): The DataFrame to be validated.
  • rules (list of dict): A list of rules defining the checks to be performed. Each rule is a dictionary with the following keys:
      - "check_type": The type of check to apply.
      - "field": The column of the DataFrame to check.
      - "value" (optional): The value used for comparison in some checks (e.g., for 'is_greater_than').
      - "threshold" (optional): A percentage threshold to be applied in some checks.
  • name (str): The name of the quality check (default is "Quality Check").

Returns:

  • quality_check (CheckResult): The result of the quality validation.

Warnings:

  • If an unknown rule name is encountered, a warning is generated.

Source code in sumeh/core.py
def report(df, rules: list[dict], name: str = "Quality Check"):
    """
    Performs a quality check on the given DataFrame based on the provided rules.

    The function iterates over a list of rules and applies different checks to the
    specified fields of the DataFrame. The checks include validation of completeness,
    uniqueness, specific values, patterns, and other conditions. Each rule corresponds
    to a particular type of validation, such as 'is_complete', 'is_greater_than',
    'has_mean', etc. After applying the checks, the function returns the result of
    the validation.

    Parameters:
    - df (DataFrame): The DataFrame to be validated.
    - rules (list of dict): A list of rules defining the checks to be performed.
        Each rule is a dictionary with the following keys:
        - "check_type": The type of check to apply.
        - "field": The column of the DataFrame to check.
        - "value" (optional): The value used for comparison in some checks (e.g., for 'is_greater_than').
        - "threshold" (optional): A percentage threshold to be applied in some checks.
    - name (str): The name of the quality check (default is "Quality Check").

    Returns:
    - quality_check (CheckResult): The result of the quality validation.

    Warnings:
    - If an unknown rule name is encountered, a warning is generated.
    """

    check = Check(CheckLevel.WARNING, name)
    for rule in rules:
        rule_name = rule["check_type"]
        field = rule["field"]
        threshold = rule.get("threshold", 1.0)
        threshold = 1.0 if threshold is None else threshold

        match rule_name:

            case "is_complete":
                check = check.is_complete(field, pct=threshold)

            case "is_unique":
                check = check.is_unique(field, pct=threshold)

            case "is_primary_key":
                check = check.is_primary_key(field, pct=threshold)

            case "are_complete":
                check = check.are_complete(field, pct=threshold)

            case "are_unique":
                check = check.are_complete(field, pct=threshold)

            case "is_composite_key":
                check = check.are_complete(field, pct=threshold)

            case "is_greater_than":
                value = __convert_value(rule["value"])
                check = check.is_greater_than(field, value, pct=threshold)

            case "is_positive":
                check = check.is_positive(field, pct=threshold)

            case "is_negative":
                check = check.is_negative(field, pct=threshold)

            case "is_greater_or_equal_than":
                value = __convert_value(rule["value"])
                check = check.is_greater_or_equal_than(field, value, pct=threshold)

            case "is_less_than":
                value = __convert_value(rule["value"])
                check = check.is_less_than(field, value, pct=threshold)

            case "is_less_or_equal_than":
                value = __convert_value(rule["value"])
                check = check.is_less_or_equal_than(field, value, pct=threshold)

            case "is_equal_than":
                value = __convert_value(rule["value"])
                check = check.is_equal_than(field, value, pct=threshold)

            case "is_contained_in" | "is_in":
                values = rule["value"]
                values = values.replace("[", "").replace("]", "").split(",")
                values = tuple([value.strip() for value in values])
                check = check.is_contained_in(field, values, pct=threshold)

            case "not_contained_in" | "not_in":
                values = rule["value"]
                values = values.replace("[", "").replace("]", "").split(",")
                values = tuple([value.strip() for value in values])
                check = check.not_contained_in(field, values, pct=threshold)

            case "is_between":
                values = rule["value"]
                values = values.replace("[", "").replace("]", "").split(",")
                values = tuple(__convert_value(value) for value in values)
                check = check.is_between(field, values, pct=threshold)

            case "has_pattern":
                pattern = rule["value"]
                check = check.has_pattern(field, pattern, pct=threshold)

            case "is_legit":
                check = check.is_legit(field, pct=threshold)

            case "has_min":
                value = __convert_value(rule["value"])
                check = check.has_min(field, value)

            case "has_max":
                value = __convert_value(rule["value"])
                check = check.has_max(field, value)

            case "has_std":
                value = __convert_value(rule["value"])
                check = check.has_std(field, value)

            case "has_mean":
                value = __convert_value(rule["value"])
                check = check.has_mean(field, value)

            case "has_sum":
                value = __convert_value(rule["value"])
                check = check.has_sum(field, value)

            case "has_cardinality":
                value = __convert_value(rule["value"])
                check = check.has_cardinality(field, value)

            case "has_infogain":
                check = check.has_infogain(field, pct=threshold)

            case "has_entropy":
                value = __convert_value(rule["value"])
                check = check.has_entropy(field, value)

            case "is_in_millions":
                check = check.is_in_millions(field, pct=threshold)

            case "is_in_billions":
                check = check.is_in_billions(field, pct=threshold)

            case "is_t_minus_1":
                check = check.is_t_minus_1(field, pct=threshold)

            case "is_t_minus_2":
                check = check.is_t_minus_2(field, pct=threshold)

            case "is_t_minus_3":
                check = check.is_t_minus_3(field, pct=threshold)

            case "is_today":
                check = check.is_today(field, pct=threshold)

            case "is_yesterday":
                check = check.is_yesterday(field, pct=threshold)

            case "is_on_weekday":
                check = check.is_on_weekday(field, pct=threshold)

            case "is_on_weekend":
                check = check.is_on_weekend(field, pct=threshold)

            case "is_on_monday":
                check = check.is_on_monday(field, pct=threshold)

            case "is_on_tuesday":
                check = check.is_on_tuesday(field, pct=threshold)

            case "is_on_wednesday":
                check = check.is_on_wednesday(field, pct=threshold)

            case "is_on_thursday":
                check = check.is_on_thursday(field, pct=threshold)

            case "is_on_friday":
                check = check.is_on_friday(field, pct=threshold)

            case "is_on_saturday":
                check = check.is_on_saturday(field, pct=threshold)

            case "is_on_sunday":
                check = check.is_on_sunday(field, pct=threshold)

            case "satisfies":
                predicate = rule["value"]
                check = check.satisfies(field, predicate, pct=threshold)

            case _:
                warnings.warn(f"Unknown rule name: {rule_name}, {field}")

    quality_check = check.validate(df)
    return quality_check
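
An illustrative rule list and call (the DataFrame, column names, and values are hypothetical; note that "value" entries are strings, which __convert_value turns into dates or numbers as needed):

rules = [
    {"field": "id", "check_type": "is_complete", "value": None, "threshold": 1.0},
    {"field": "id", "check_type": "is_unique", "value": None, "threshold": 0.99},
    {"field": "amount", "check_type": "is_greater_than", "value": "0", "threshold": 0.95},
    {"field": "status", "check_type": "is_contained_in", "value": "[active, inactive]", "threshold": 1.0},
]
quality_check = report(df, rules, name="Orders Quality Check")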

summarize(df, rules, **context)

Summarizes a DataFrame based on the provided rules and context.

This function dynamically detects the appropriate engine to use for summarization based on the type of the input DataFrame. It delegates the summarization process to the corresponding engine module.

Parameters:

  • df (required): The input DataFrame to be summarized. The type of the DataFrame determines the engine used for summarization.
  • rules (list[dict], required): A list of dictionaries defining the summarization rules. Each dictionary specifies the operations or transformations to be applied.
  • **context: Additional context parameters required by specific engines. Common parameters include:
      - conn: A database connection object (used by certain engines like DuckDB).
      - total_rows: The total number of rows in the DataFrame (optional).

Returns:

  • The summarized DataFrame as processed by the appropriate engine.

Raises:

  • TypeError: If the type of the input DataFrame is unsupported.

Notes
  • The function uses the __detect_engine method to determine the engine name based on the input DataFrame.
  • Supported engines are dynamically imported from the sumeh.engine package.
  • The "duckdb_engine" case requires a database connection (conn) to be passed in the context.
Example

summarized_df = summarize(df, rules=[{"operation": "sum", "column": "sales"}], conn=my_conn)

Source code in sumeh/core.py
def summarize(df, rules: list[dict], **context):
    """
    Summarizes a DataFrame based on the provided rules and context.

    This function dynamically detects the appropriate engine to use for summarization
    based on the type of the input DataFrame. It delegates the summarization process
    to the corresponding engine module.

    Args:
        df: The input DataFrame to be summarized. The type of the DataFrame determines
            the engine used for summarization.
        rules (list[dict]): A list of dictionaries defining the summarization rules.
            Each dictionary specifies the operations or transformations to be applied.
        **context: Additional context parameters required by specific engines. Common
            parameters include:
            - conn: A database connection object (used by certain engines like DuckDB).
            - total_rows: The total number of rows in the DataFrame (optional).

    Returns:
        The summarized DataFrame as processed by the appropriate engine.

    Raises:
        TypeError: If the type of the input DataFrame is unsupported.

    Notes:
        - The function uses the `__detect_engine` method to determine the engine name
          based on the input DataFrame.
        - Supported engines are dynamically imported from the `sumeh.engine` package.
        - The "duckdb_engine" case requires a database connection (`conn`) to be passed
          in the context.

    Example:
        >>> summarized_df = summarize(df, rules=[{"operation": "sum", "column": "sales"}], conn=my_conn)
    """
    engine_name = __detect_engine(df)
    engine = import_module(f"sumeh.engine.{engine_name}")
    match engine_name:
        case "duckdb_engine":
            return engine.summarize(
                df_rel=df,
                rules=rules,
                conn=context.get("conn"),
                total_rows=context.get("total_rows"),
            )
        case _:
            return engine.summarize(df, rules, total_rows=context.get("total_rows"))

    raise TypeError(f"Unsupported DataFrame type: {type(df)}")

validate(df, rules, **context)

Validates a DataFrame against a set of rules using the appropriate engine.

This function dynamically detects the engine to use based on the input DataFrame and delegates the validation process to the corresponding engine's implementation.

Parameters:

  • df (DataFrame, required): The input DataFrame to be validated.
  • rules (list or dict, required): The validation rules to be applied to the DataFrame.
  • **context: Additional context parameters that may be required by the engine.
      - conn (optional): A database connection object, required for certain engines like "duckdb_engine".

Returns:

  • bool or dict: The result of the validation process. The return type and structure depend on the specific engine's implementation.

Raises:

  • ImportError: If the required engine module cannot be imported.
  • AttributeError: If the detected engine does not have a validate method.

Notes
  • The engine is dynamically determined based on the DataFrame type or other characteristics.
  • For "duckdb_engine", a database connection object should be provided in the context under the key "conn".
Source code in sumeh/core.py
def validate(df, rules, **context):
    """
    Validates a DataFrame against a set of rules using the appropriate engine.

    This function dynamically detects the engine to use based on the input
    DataFrame and delegates the validation process to the corresponding engine's
    implementation.

    Args:
        df (DataFrame): The input DataFrame to be validated.
        rules (list or dict): The validation rules to be applied to the DataFrame.
        **context: Additional context parameters that may be required by the engine.
            - conn (optional): A database connection object, required for certain engines
              like "duckdb_engine".

    Returns:
        bool or dict: The result of the validation process. The return type and structure
        depend on the specific engine's implementation.

    Raises:
        ImportError: If the required engine module cannot be imported.
        AttributeError: If the detected engine does not have a `validate` method.

    Notes:
        - The engine is dynamically determined based on the DataFrame type or other
          characteristics.
        - For "duckdb_engine", a database connection object should be provided in the
          context under the key "conn".
    """
    engine_name = __detect_engine(df)
    engine = import_module(f"sumeh.engine.{engine_name}")

    match engine_name:
        case "duckdb_engine":
            return engine.validate(df, rules, context.get("conn"))
        case _:
            return engine.validate(df, rules)
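
A usage sketch (df, rules, and the DuckDB objects are assumed to exist; the shape of the result depends on the detected engine):

results = validate(df, rules)                        # pandas / polars / dask / pyspark
results = validate(duck_relation, rules, conn=con)   # DuckDB relations also need the connection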

validate_schema(df_or_conn, expected, engine, **engine_kwargs)

Validates the schema of a given data source or connection against an expected schema.

Parameters:

  • df_or_conn (Any, required): The data source or connection to validate. This can be a DataFrame, database connection, or other supported data structure.
  • expected (List[Dict[str, Any]], required): A list of dictionaries defining the expected schema. Each dictionary should describe a column or field, including its name, type, and other attributes.
  • engine (str, required): The name of the engine to use for validation. This determines the specific validation logic to apply based on the data source type.
  • **engine_kwargs: Additional keyword arguments to pass to the engine's validation logic.

Returns:

  • Tuple[bool, List[Tuple[str, str]]]: A tuple where the first element is a boolean indicating whether the schema is valid, and the second element is a list of tuples containing error messages for any validation failures. Each tuple consists of the field name and the corresponding error message.

Source code in sumeh/core.py
def validate_schema(
    df_or_conn: Any, expected: List[Dict[str, Any]], engine: str, **engine_kwargs
) -> Tuple[bool, List[Tuple[str, str]]]:
    """
    Validates the schema of a given data source or connection against an expected schema.

    Args:
        df_or_conn (Any): The data source or connection to validate. This can be a DataFrame,
                          database connection, or other supported data structure.
        expected (List[Dict[str, Any]]): A list of dictionaries defining the expected schema.
                                         Each dictionary should describe a column or field,
                                         including its name, type, and other attributes.
        engine (str): The name of the engine to use for validation. This determines the
                      specific validation logic to apply based on the data source type.
        **engine_kwargs: Additional keyword arguments to pass to the engine's validation logic.

    Returns:
        Tuple[bool, List[Tuple[str, str]]]: A tuple where the first element is a boolean indicating
                                            whether the schema is valid, and the second element is
                                            a list of tuples containing error messages for any
                                            validation failures. Each tuple consists of the field
                                            name and the corresponding error message.
    """
    engine_name = __detect_engine(df_or_conn)
    engine = import_module(f"sumeh.engine.{engine_name}")
    return engine.validate_schema(df_or_conn, expected=expected, **engine_kwargs)
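
A usage sketch (the expected-schema keys shown here are illustrative; in practice expected usually comes from get_schema_config):

expected = [
    {"field": "id", "data_type": "bigint", "nullable": False},
    {"field": "amount", "data_type": "double", "nullable": True},
]
is_valid, errors = validate_schema(df, expected=expected, engine="pandas_engine")
for field, message in errors:
    print(field, message)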