Configuration

sumeh.core.config

This module provides utility functions that retrieve and parse configuration data from several sources: S3, MySQL, PostgreSQL, BigQuery, local CSV files, AWS Glue Data Catalog, DuckDB, and Databricks. It also includes functions that retrieve schema information for a table from a schema registry held in those sources.

Functions:

Name Description
get_config_from_s3

Retrieves configuration data from a CSV file stored in an S3 bucket.

get_config_from_mysql

Retrieves configuration data from a MySQL table.

get_config_from_postgresql

Retrieves configuration data from a PostgreSQL database.

get_config_from_bigquery

Retrieves configuration data from a Google BigQuery table.

get_config_from_csv

Retrieves configuration data from a local CSV file.

get_config_from_glue_data_catalog

Retrieves configuration data from AWS Glue Data Catalog.

get_config_from_duckdb

Retrieves configuration data from a DuckDB database.

get_config_from_databricks

Retrieves configuration data from a Databricks table.

get_schema_from_csv

Retrieves schema information for a table from a schema_registry CSV file.

get_schema_from_s3

Retrieves schema information for a table from a schema_registry CSV file stored in S3.

get_schema_from_mysql

Retrieves schema information from a MySQL schema_registry table.

get_schema_from_postgresql

Retrieves schema information from a PostgreSQL schema_registry table.

get_schema_from_bigquery

Retrieves schema information from a Google BigQuery schema_registry table.

get_schema_from_glue

Retrieves schema information from an AWS Glue Data Catalog schema_registry table.

get_schema_from_duckdb

Retrieves schema information from a DuckDB schema_registry table.

get_schema_from_databricks

Retrieves schema information from a Databricks Unity Catalog schema_registry table.

__read_s3_file

Reads the content of a file stored in S3.

__parse_s3_path

Parses an S3 path into its bucket and key components.

__read_local_file

Reads the content of a local file.

__read_csv_file

Parses the content of a CSV file.

__parse_data

Parses raw configuration data into validated rule definitions.

__create_connection

Helper function to create a database connection.

infer_basic_type

Infers the basic data type of a given value and returns it as a str.

__create_connection

__create_connection(connect_func, host, user, password, database, port) -> Any

Helper function to create a database connection.

Parameters:

Name Type Description Default
connect_func

A connection function (e.g., mysql.connector.connect or psycopg2.connect).

required
host str

The host of the database server.

required
user str

The username for the database.

required
password str

The password for the database.

required
database str

The name of the database.

required
port int

The port to connect to.

required

Returns:

Name Type Description
Connection Any

A connection object for the database.

Raises:

Type Description
ConnectionError

If there is an error establishing the connection.
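The behaviour described above can be sketched as follows. This is a minimal illustration, not the module's actual code: the name create_connection, the keyword-argument call style, and the fake_connect stand-in are all assumptions made for the example.

```python
from typing import Any, Callable


def create_connection(connect_func: Callable[..., Any], host: str, user: str,
                      password: str, database: str, port: int) -> Any:
    """Call connect_func with the given settings and wrap any failure."""
    try:
        # Assumption: the driver accepts these settings as keyword arguments.
        return connect_func(host=host, user=user, password=password,
                            database=database, port=port)
    except Exception as exc:
        # Normalise driver-specific errors into ConnectionError.
        raise ConnectionError(f"Could not connect to {host}:{port}: {exc}") from exc


def fake_connect(**kwargs: Any) -> dict:
    """Hypothetical stand-in for a driver's connect function."""
    if kwargs["port"] != 5432:
        raise OSError("connection refused")
    return kwargs
```

Passing the connect function in as an argument lets one helper serve several drivers, which is presumably why the MySQL and PostgreSQL loaders can share it.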

__parse_data

__parse_data(data: list[dict]) -> List[RuleDef]

Parse configuration data into validated Rule objects.

Parameters:

Name Type Description Default
data list[dict]

Raw configuration data as list of dictionaries

required

Returns:

Type Description
List[RuleDef]

Validated RuleDef objects with enriched metadata.

Note

Engine compatibility is not validated here - only rule existence. Engine validation happens during execution in validate().

__parse_s3_path

__parse_s3_path(s3_path: str) -> Tuple[str, str]

Parses an S3 path into its bucket and key components.

Parameters:

Name Type Description Default
s3_path str

The S3 path to parse. Must start with "s3://".

required

Returns:

Type Description
Tuple[str, str]

A tuple containing the bucket name and the key.

Raises:

Type Description
ValueError

If the S3 path does not start with "s3://", or if the path format is invalid and cannot be split into bucket and key.
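A minimal sketch of the parsing rule described above, assuming the first path segment is the bucket and everything after it is the key. The public name parse_s3_path is used for the example only; the module's own helper is private and may differ in detail.

```python
from typing import Tuple


def parse_s3_path(s3_path: str) -> Tuple[str, str]:
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    prefix = "s3://"
    if not s3_path.startswith(prefix):
        raise ValueError(f"S3 path must start with 's3://': {s3_path!r}")
    # Everything up to the first '/' is the bucket; the rest is the key.
    bucket, sep, key = s3_path[len(prefix):].partition("/")
    if not bucket or not sep or not key:
        raise ValueError(f"Cannot split into bucket and key: {s3_path!r}")
    return bucket, key


bucket, key = parse_s3_path("s3://my-bucket/configs/rules.csv")
```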

__read_csv_file

__read_csv_file(file_content: str, delimiter: Optional[str] = ',') -> List[Dict[str, str]]

Parses the content of a CSV file.

Parameters:

Name Type Description Default
file_content str

The content of the CSV file as a string.

required
delimiter str

The delimiter used in the CSV file.

','

Returns:

Type Description
List[Dict[str, str]]

A list of dictionaries representing the parsed CSV data.

Raises:

Type Description
ValueError

If there is an error parsing the CSV content.
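One way to get this behaviour with the standard library is csv.DictReader over an in-memory buffer. The sketch below is an assumption about the approach, not the module's code, and the column names in the sample are hypothetical.

```python
import csv
import io
from typing import Dict, List, Optional


def read_csv_content(file_content: str,
                     delimiter: Optional[str] = ",") -> List[Dict[str, str]]:
    """Parse CSV text into one dict per data row, keyed by the header row."""
    try:
        reader = csv.DictReader(io.StringIO(file_content),
                                delimiter=delimiter or ",")
        return [dict(row) for row in reader]
    except csv.Error as exc:
        raise ValueError(f"Error parsing CSV content: {exc}") from exc


# Hypothetical configuration file using ';' as the delimiter.
sample = "field;check_type;threshold\nage;is_positive;1.0\nemail;is_complete;0.9\n"
rows = read_csv_content(sample, delimiter=";")
```

Every value comes back as a string, which matches the List[Dict[str, str]] return type; any type conversion would have to happen in a later step.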

__read_local_file

__read_local_file(file_path: str) -> str

Reads the content of a local file.

Parameters:

Name Type Description Default
file_path str

The local file path to be read.

required

Returns:

Name Type Description
str str

The content of the file.

Raises:

Type Description
FileNotFoundError

If the file is not found.

__read_s3_file

__read_s3_file(s3_path: str) -> Optional[str]

Reads the content of a file stored in S3.

Parameters:

Name Type Description Default
s3_path str

The S3 path of the file.

required

Returns:

Name Type Description
str Optional[str]

The content of the S3 file.

Raises:

Type Description
RuntimeError

If there is an error retrieving the file from S3.

get_config_from_bigquery

get_config_from_bigquery(project_id: str, dataset_id: str, table_id: str, credentials_path: Optional[str] = None, client: Optional[Any] = None, query: Optional[str] = None) -> List[RuleDef]

Retrieves configuration data from a Google BigQuery table.

Parameters:

Name Type Description Default
project_id str

Google Cloud project ID.

required
dataset_id str

BigQuery dataset ID.

required
table_id str

BigQuery table ID.

required
credentials_path Optional[str]

Path to service account credentials file (if not provided, uses default credentials).

None
client Optional[Any]

Optional instance of google.cloud.bigquery.Client. If provided, it will be used and credentials_path ignored.

None
query Optional[str]

Optional custom SQL query. If not provided, defaults to SELECT * FROM project.dataset.table.

None

Returns:

Type Description
List[RuleDef]

The records returned by BigQuery, parsed into RuleDef objects by __parse_data.

Raises:

Type Description
RuntimeError

If there is an error while querying BigQuery or with credentials.
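The query fallback described for the query parameter can be sketched like this. The helper name resolve_bigquery_query is hypothetical, and the backtick quoting around the table path is an assumption added so that hyphenated project IDs stay valid.

```python
from typing import Optional


def resolve_bigquery_query(project_id: str, dataset_id: str, table_id: str,
                           query: Optional[str] = None) -> str:
    """Return the custom query if given, else a SELECT * over the full table path."""
    if query:
        return query
    # Assumption: quote the fully qualified name with backticks.
    return f"SELECT * FROM `{project_id}.{dataset_id}.{table_id}`"


default_sql = resolve_bigquery_query("my-project", "dq", "rules")
custom_sql = resolve_bigquery_query("my-project", "dq", "rules",
                                    query="SELECT * FROM `my-project.dq.rules` LIMIT 10")
```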

get_config_from_csv

get_config_from_csv(file_path: str, delimiter: Optional[str] = ',') -> List[RuleDef]

Retrieves configuration data from a CSV file.

Parameters:

Name Type Description Default
file_path str

The local file path to the CSV file.

required
delimiter Optional[str]

The delimiter used in the CSV file (default is ",").

','

Returns:

Type Description
List[RuleDef]

The parsed configuration data as RuleDef objects.

Raises:

Type Description
RuntimeError

If there is an error reading or processing the file.

get_config_from_databricks

get_config_from_databricks(spark, catalog: Optional[str], schema: Optional[str], table: str, **kwargs) -> List[RuleDef]

Retrieves configuration data from a Databricks table.

Parameters:

Name Type Description Default
spark SparkSession

Spark Session to get information from Databricks

required
catalog Optional[str]

The catalog name in Databricks. If provided, it will be included in the table's full path.

required
schema Optional[str]

The schema name in Databricks. If provided, it will be included in the table's full path.

required
table str

The name of the table to retrieve data from.

required
**kwargs

Additional keyword arguments (currently unused).

{}

Returns:

Type Description
List[RuleDef]

A list of RuleDef objects, one for each row of data from the table.
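Because catalog and schema are optional, the full table path is presumably assembled from whichever parts are present. A minimal sketch of that rule follows; the helper name full_table_name is hypothetical.

```python
from typing import Optional


def full_table_name(catalog: Optional[str], schema: Optional[str], table: str) -> str:
    """Join the non-empty parts into 'catalog.schema.table'."""
    parts = [part for part in (catalog, schema, table) if part]
    return ".".join(parts)


three_level = full_table_name("main", "dq", "rules")
two_level = full_table_name(None, "dq", "rules")
bare = full_table_name(None, None, "rules")
```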

get_config_from_duckdb

get_config_from_duckdb(table: str = None, query: str = None, conn=None) -> List[RuleDef]

Retrieve configuration data from a DuckDB database.

This function fetches data from a DuckDB database, either by executing a custom SQL query or by selecting all rows from the specified table. The rows are then parsed into RuleDef objects.

Parameters:

Name Type Description Default
table str

The name of the table to fetch data from. Defaults to None.

None
query str

A custom SQL query to execute. Defaults to None.

None
conn

A valid DuckDB connection object.

None

Returns:

Type Description
List[RuleDef]

The fetched data as RuleDef objects.

Raises:

Type Description
ValueError

If neither table nor query is provided, or if a valid conn is not supplied.

Example

import duckdb
conn = duckdb.connect('my_db.duckdb')
config = get_config_from_duckdb(table='rules', conn=conn)
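The argument checks and the row-to-dictionary step can be sketched without DuckDB installed. The example below uses sqlite3 from the standard library purely as a stand-in, since its connection offers the same execute, description, and fetchall calls used here. The helper name fetch_rows, the rule that query takes precedence over table, and the sample columns are assumptions.

```python
import sqlite3
from typing import Any, Dict, List, Optional


def fetch_rows(table: Optional[str] = None, query: Optional[str] = None,
               conn: Any = None) -> List[Dict[str, Any]]:
    """Run the custom query, or SELECT * from the table, and return dict rows."""
    if conn is None:
        raise ValueError("A valid connection must be supplied.")
    if not table and not query:
        raise ValueError("Either 'table' or 'query' must be provided.")
    # Assumption: a custom query wins over the table name.
    sql = query or f"SELECT * FROM {table}"
    cursor = conn.execute(sql)
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


# sqlite3 stands in for DuckDB so the sketch runs with the standard library only.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rules (field TEXT, check_type TEXT, threshold REAL)")
conn.execute("INSERT INTO rules VALUES ('age', 'is_positive', 1.0)")
rows = fetch_rows(table="rules", conn=conn)
```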

get_config_from_glue_data_catalog

get_config_from_glue_data_catalog(glue_context, database_name: str, table_name: str, query: Optional[str] = None) -> List[RuleDef]

Retrieves configuration data from AWS Glue Data Catalog.

The table is read through Spark directly, so this works with all table formats (Parquet, ORC, CSV, Iceberg, Delta, Hudi).

Parameters:

Name Type Description Default
glue_context

An instance of GlueContext.

required
database_name str

Glue database name.

required
table_name str

Glue table name.

required
query Optional[str]

Custom SQL query to fetch data (if provided).

None

Returns:

Type Description
List[RuleDef]

The parsed configuration data as RuleDef objects.

Raises:

Type Description
RuntimeError

If there is an error querying Glue Data Catalog.

get_config_from_mysql

get_config_from_mysql(host: str = None, user: str = None, password: str = None, database: str = None, schema: str = None, table: str = None, port: int = 3306, query: str = None, conn=None) -> List[RuleDef]

Retrieves configuration data from a MySQL table.

Parameters:

Name Type Description Default
host str

MySQL host (not needed if conn is provided)

None
user str

MySQL user (not needed if conn is provided)

None
password str

MySQL password (not needed if conn is provided)

None
database str

Database name (not needed if conn is provided)

None
schema str

Schema name (optional)

None
table str

Table name to query

None
port int

MySQL port (default: 3306)

3306
query str

Optional custom query (if not provided, uses schema and table)

None
conn

Existing MySQL connection (optional)

None

Returns:

Type Description
List[RuleDef]

The configuration data as RuleDef objects.

get_config_from_postgresql

get_config_from_postgresql(connection: Optional = None, host: Optional[str] = None, user: Optional[str] = None, password: Optional[str] = None, database: Optional[str] = None, port: Optional[int] = 5432, schema: Optional[str] = None, table: Optional[str] = None, query: Optional[str] = None) -> List[RuleDef]

Retrieves configuration data from a PostgreSQL database.

Parameters:

Name Type Description Default
connection Optional

An existing PostgreSQL connection object.

None
host Optional[str]

Host of the PostgreSQL server.

None
user Optional[str]

Username to connect to PostgreSQL.

None
password Optional[str]

Password for the PostgreSQL user.

None
database Optional[str]

Database name to query.

None
port Optional[int]

The port for the PostgreSQL connection (default is 5432).

5432
schema Optional[str]

Schema name if query is not provided.

None
table Optional[str]

Table name if query is not provided.

None
query Optional[str]

Custom SQL query to fetch data (if not provided, schema and table must be given).

None

Returns:

Type Description
List[RuleDef]

The parsed configuration data as RuleDef objects.

Raises:

Type Description
ValueError

If neither query nor both schema and table are provided.

ConnectionError

If there is an error connecting to PostgreSQL.

RuntimeError

If there is an error executing the query or processing the data.
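The ValueError condition above amounts to a small precedence rule: use query if it is given, otherwise require both schema and table. A hedged sketch follows; the helper name resolve_postgres_query and the unquoted schema.table form are assumptions.

```python
from typing import Optional


def resolve_postgres_query(schema: Optional[str] = None, table: Optional[str] = None,
                           query: Optional[str] = None) -> str:
    """Return the SQL to run, validating the query / schema / table combination."""
    if query:
        return query
    if not (schema and table):
        raise ValueError("Provide either 'query' or both 'schema' and 'table'.")
    # Assumption: identifiers are trusted and need no quoting.
    return f"SELECT * FROM {schema}.{table}"


sql = resolve_postgres_query(schema="public", table="rules")
```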

get_config_from_s3

get_config_from_s3(s3_path: str, delimiter: Optional[str] = ',')

Retrieves configuration data from a CSV file stored in an S3 bucket.

Parameters:

Name Type Description Default
s3_path str

The S3 path to the CSV file.

required
delimiter Optional[str]

The delimiter used in the CSV file (default is ",").

','

Returns:

Type Description
List[Dict[str, Any]]

A list of dictionaries representing the parsed configuration data.

Raises:

Type Description
RuntimeError

If there is an error reading or processing the S3 file.

get_schema_from_bigquery

get_schema_from_bigquery(project_id: str, dataset_id: str, table_id: str, credentials_path: str = None, registry_table: str = 'schema_registry', query: str = None) -> List[Dict[str, Any]]

Get schema from BigQuery schema_registry table

Parameters:

Name Type Description Default
project_id str

BigQuery project ID

required
dataset_id str

BigQuery dataset ID

required
table_id str

Table name to look up in the registry

required
credentials_path str

Path to service account credentials file

None
registry_table str

Name of the schema registry table

'schema_registry'
query str

Optional custom WHERE clause for additional filters

None

Returns:

Type Description
List[Dict[str, Any]]

List of dicts with schema information

get_schema_from_csv

get_schema_from_csv(file_path: str, table: str, delimiter: str = ',', query: str = None) -> List[Dict[str, Any]]

Get schema from CSV schema_registry file

Parameters:

Name Type Description Default
file_path str

Path to the schema_registry CSV file

required
table str

Table name to look up in the registry

required
delimiter str

CSV delimiter (default: ',')

','
query str

Optional custom WHERE clause for additional filters (NOT SUPPORTED for CSV)

None

Returns:

Type Description
List[Dict[str, Any]]

List of dicts with schema information
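A registry lookup of this kind can be sketched as a filter over the CSV rows. The registry layout is not documented here, so the table_name column and the other column names below are hypothetical, as is the helper name schema_from_registry.

```python
import csv
import io
from typing import Any, Dict, List


def schema_from_registry(csv_content: str, table: str,
                         delimiter: str = ",") -> List[Dict[str, Any]]:
    """Return the registry rows whose table_name column matches the table."""
    reader = csv.DictReader(io.StringIO(csv_content), delimiter=delimiter)
    # Assumption: the registry stores the table name in a 'table_name' column.
    return [dict(row) for row in reader if row.get("table_name") == table]


# Hypothetical registry content with one row per column of each table.
registry = (
    "table_name,field,data_type,nullable\n"
    "orders,id,integer,false\n"
    "orders,total,float,true\n"
    "users,email,string,false\n"
)
schema = schema_from_registry(registry, "orders")
```

A plain filter like this has no SQL engine behind it, which is consistent with the query parameter being marked as not supported for CSV.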

get_schema_from_databricks

get_schema_from_databricks(spark, catalog: str, schema: str, table: str, registry_table: str = 'schema_registry', query: str = None) -> List[Dict[str, Any]]

Get schema from Databricks Unity Catalog schema_registry table

Parameters:

Name Type Description Default
spark

SparkSession instance

required
catalog str

Unity Catalog name containing the registry

required
schema str

Schema name containing the registry table

required
table str

Table name to look up in the registry

required
registry_table str

Name of the schema registry table (default: 'schema_registry')

'schema_registry'
query str

Optional custom WHERE clause for additional filters

None

Returns:

Type Description
List[Dict[str, Any]]

List of dicts with schema information

get_schema_from_duckdb

get_schema_from_duckdb(conn, table: str, registry_table: str = 'schema_registry', query: str = None) -> List[Dict[str, Any]]

Get schema from DuckDB schema_registry table

Parameters:

Name Type Description Default
conn

DuckDB connection object

required
table str

Table name to look up in the registry

required
registry_table str

Name of the schema registry table (default: 'schema_registry')

'schema_registry'
query str

Optional custom WHERE clause for additional filters

None

Returns:

Type Description
List[Dict[str, Any]]

List of dicts with schema information

get_schema_from_glue

get_schema_from_glue(glue_context, database_name: str, table_name: str, registry_table: str = 'schema_registry', query: str = None) -> List[Dict[str, Any]]

Get schema from Glue Data Catalog schema_registry table

Parameters:

Name Type Description Default
glue_context

GlueContext instance

required
database_name str

Glue database containing the registry table

required
table_name str

Table name to look up in the registry

required
registry_table str

Name of the schema registry table (default: 'schema_registry')

'schema_registry'
query str

Optional custom WHERE clause for additional filters

None

Returns:

Type Description
List[Dict[str, Any]]

List of dicts with schema information

get_schema_from_mysql

get_schema_from_mysql(host: str = None, user: str = None, password: str = None, database: str = None, table: str = None, port: int = 3306, registry_table: str = 'schema_registry', query: str = None, conn=None) -> List[Dict[str, Any]]

Get schema from MySQL schema_registry table

Parameters:

Name Type Description Default
host str

MySQL host (not needed if conn is provided)

None
user str

MySQL user (not needed if conn is provided)

None
password str

MySQL password (not needed if conn is provided)

None
database str

Database containing the registry table (not needed if conn is provided)

None
table str

Table name to look up in the registry

None
port int

MySQL port (default: 3306)

3306
registry_table str

Name of the schema registry table (default: 'schema_registry')

'schema_registry'
query str

Optional custom WHERE clause for additional filters

None
conn

Existing MySQL connection (optional, will create new if not provided)

None

Returns:

Type Description
List[Dict[str, Any]]

List of dicts with schema information

get_schema_from_postgresql

get_schema_from_postgresql(host: str = None, user: str = None, password: str = None, database: str = None, schema: str = None, table: str = None, port: int = 5432, registry_table: str = 'schema_registry', query: str = None, conn=None) -> List[Dict[str, Any]]

Get schema from PostgreSQL schema_registry table

Parameters:

Name Type Description Default
host str

PostgreSQL host (not needed if conn is provided)

None
user str

PostgreSQL user (not needed if conn is provided)

None
password str

PostgreSQL password (not needed if conn is provided)

None
database str

Database containing the registry table (not needed if conn is provided)

None
schema str

Schema containing the registry table

None
table str

Table name to look up in the registry

None
port int

PostgreSQL port (default: 5432)

5432
registry_table str

Name of the schema registry table (default: 'schema_registry')

'schema_registry'
query str

Optional custom WHERE clause for additional filters

None
conn

Existing PostgreSQL connection (optional, will create new if not provided)

None

Returns:

Type Description
List[Dict[str, Any]]

List of dicts with schema information

get_schema_from_s3

get_schema_from_s3(s3_path: str, table: str, delimiter: str = ',', query: str = None) -> List[Dict[str, Any]]

Get schema from S3 schema_registry CSV file

Parameters:

Name Type Description Default
s3_path str

S3 URI to the schema_registry CSV file (e.g., 's3://bucket/path/schema_registry.csv')

required
table str

Table name to look up in the registry

required
delimiter str

CSV delimiter (default: ',')

','
query str

Optional custom WHERE clause for additional filters (NOT SUPPORTED for S3/CSV)

None

Returns:

Type Description
List[Dict[str, Any]]

List of dicts with schema information