Configuration
sumeh.core.config
This module provides a set of utility functions to retrieve and parse configuration data from various data sources, including S3, MySQL, PostgreSQL, BigQuery, CSV files, AWS Glue Data Catalog, DuckDB, and Databricks. Additionally, it includes functions to infer schema information from these sources.
Functions:
| Name | Description |
|---|---|
| get_config_from_s3 | Retrieves configuration data from a CSV file stored in an S3 bucket. |
| get_config_from_mysql | Retrieves configuration data from a MySQL table. |
| get_config_from_postgresql | Retrieves configuration data from a PostgreSQL database. |
| get_config_from_bigquery | Retrieves configuration data from a Google BigQuery table. |
| get_config_from_csv | Retrieves configuration data from a local CSV file. |
| get_config_from_glue_data_catalog | Retrieves configuration data from AWS Glue Data Catalog. |
| get_config_from_duckdb | Retrieves configuration data from a DuckDB database. |
| get_config_from_databricks | Retrieves configuration data from a Databricks table. |
| get_schema_from_csv | Infers the schema of a CSV file based on its content. |
| get_schema_from_s3 | Infers the schema of a CSV file stored in S3. |
| get_schema_from_mysql | Retrieves schema information from a MySQL database table. |
| get_schema_from_postgresql | Retrieves schema information from a PostgreSQL database table. |
| get_schema_from_bigquery | Retrieves schema information from a Google BigQuery table. |
| get_schema_from_glue | Retrieves schema information from AWS Glue Data Catalog. |
| get_schema_from_duckdb | Retrieves schema information from a DuckDB database table. |
| get_schema_from_databricks | Retrieves schema information from a Databricks table. |
| __read_s3_file | Reads the content of a file stored in S3. |
| __parse_s3_path | Parses an S3 path into its bucket and key components. |
| __read_local_file | Reads the content of a local file. |
| __read_csv_file | Parses the content of a CSV file. |
| __parse_data | Parses the configuration data into a structured format. |
| __create_connection | Helper function to create a database connection. |
| infer_basic_type | Infers the basic data type of a given value. |
__create_connection
Helper function to create a database connection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| connect_func | | A connection function (e.g., …). | required |
| host | str | The host of the database server. | required |
| user | str | The username for the database. | required |
| password | str | The password for the database. | required |
| database | str | The name of the database. | required |
| port | int | The port to connect to. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Connection | Any | A connection object for the database. |
Raises:
| Type | Description |
|---|---|
| ConnectionError | If there is an error establishing the connection. |
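The behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the library's implementation: the name `create_connection`, the keyword names passed to `connect_func`, and the stub connector are all assumptions made for the example.

```python
from typing import Any, Callable


def create_connection(
    connect_func: Callable[..., Any],
    host: str,
    user: str,
    password: str,
    database: str,
    port: int,
) -> Any:
    """Call a driver's connect function and normalise failures (hypothetical sketch)."""
    try:
        # Assumption: the driver accepts these keyword names.
        return connect_func(
            host=host, user=user, password=password, database=database, port=port
        )
    except Exception as exc:
        # Any driver-specific failure is re-raised as the built-in ConnectionError.
        raise ConnectionError(f"Error establishing connection: {exc}") from exc


def stub_connect(**kwargs: Any) -> dict:
    """Stand-in for a real driver's connect function; returns its arguments."""
    return dict(kwargs)


connection = create_connection(stub_connect, "localhost", "app", "secret", "rules_db", 3306)
```

Wrapping the driver call this way would let callers handle a single exception type regardless of which database driver is plugged in.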
__parse_data
Parse configuration data into validated Rule objects.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | list[dict] | Raw configuration data as a list of dictionaries. | required |
Returns:
| Type | Description |
|---|---|
| List[RuleDef] | Validated Rule objects with enriched metadata. |
Note
Engine compatibility is not validated here; only rule existence is checked. Engine validation happens during execution in validate().
__parse_s3_path
Parses an S3 path into its bucket and key components.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_path | str | The S3 path to parse. Must start with "s3://". | required |
Returns:
| Type | Description |
|---|---|
| Tuple[str, str] | A tuple containing the bucket name and the key. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the S3 path does not start with "s3://", or if the path format is invalid and cannot be split into bucket and key. |
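A minimal sketch of the parsing rules described above, assuming the key is everything after the first slash that follows the bucket name. The function name `parse_s3_path` and the example bucket are hypothetical; the private helper itself may differ in detail.

```python
from typing import Tuple


def parse_s3_path(s3_path: str) -> Tuple[str, str]:
    """Split 's3://bucket/key' into (bucket, key); hypothetical stand-in."""
    prefix = "s3://"
    if not s3_path.startswith(prefix):
        raise ValueError(f"S3 path must start with {prefix!r}: {s3_path!r}")
    # partition() splits on the first '/', so keys may contain further slashes.
    bucket, separator, key = s3_path[len(prefix):].partition("/")
    if not bucket or not separator or not key:
        raise ValueError(f"Cannot split into bucket and key: {s3_path!r}")
    return bucket, key


bucket, key = parse_s3_path("s3://example-bucket/configs/rules.csv")
```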
__read_csv_file
Parses the content of a CSV file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_content | str | The content of the CSV file as a string. | required |
| delimiter | str | The delimiter used in the CSV file. | ',' |
Returns:
| Type | Description |
|---|---|
| List[Dict[str, str]] | A list of dictionaries representing the parsed CSV data. |
Raises:
| Type | Description |
|---|---|
| ValueError | If there is an error parsing the CSV content. |
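One way to turn CSV text into a list of dictionaries is the standard library's `csv.DictReader`, as sketched below. The function name `read_csv_content` and the column names in the sample data are illustrative assumptions, not taken from the library.

```python
import csv
import io
from typing import Dict, List


def read_csv_content(file_content: str, delimiter: str = ",") -> List[Dict[str, str]]:
    """Parse CSV text into one dict per row, keyed by the header line (sketch)."""
    try:
        reader = csv.DictReader(io.StringIO(file_content, newline=""), delimiter=delimiter)
        return [dict(row) for row in reader]
    except csv.Error as exc:
        # Mirror the documented contract: parsing problems surface as ValueError.
        raise ValueError(f"Error parsing CSV content: {exc}") from exc


# Hypothetical sample content using ';' as the delimiter.
sample = "field;check_type\nid;is_unique\nname;is_complete\n"
rows = read_csv_content(sample, delimiter=";")
```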
__read_local_file
Reads the content of a local file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_path | str | The local file path to be read. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | The content of the file. |
Raises:
| Type | Description |
|---|---|
| FileNotFoundError | If the file is not found. |
__read_s3_file
Reads the content of a file stored in S3.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_path | str | The S3 path of the file. | required |
Returns:
| Name | Type | Description |
|---|---|---|
| str | Optional[str] | The content of the S3 file. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If there is an error retrieving the file from S3. |
get_config_from_bigquery
get_config_from_bigquery(project_id: str, dataset_id: str, table_id: str, credentials_path: Optional[str] = None, client: Optional[Any] = None, query: Optional[str] = None) -> List[RuleDef]
Retrieves configuration data from a Google BigQuery table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| project_id | str | Google Cloud project ID. | required |
| dataset_id | str | BigQuery dataset ID. | required |
| table_id | str | BigQuery table ID. | required |
| credentials_path | Optional[str] | Path to service account credentials file (if not provided, uses default credentials). | None |
| client | Optional[Any] | Optional instance of google.cloud.bigquery.Client. If provided, it is used and credentials_path is ignored. | None |
| query | Optional[str] | Optional custom SQL query. If not provided, defaults to SELECT * FROM … | None |
Returns:
| Type | Description |
|---|---|
| List[RuleDef] | A list of records (dicts) returned by BigQuery (optionally parsed by __parse_data). |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If there is an error while querying BigQuery or with credentials. |
get_config_from_csv
Retrieves configuration data from a CSV file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_path | str | The local file path to the CSV file. | required |
| delimiter | Optional[str] | The delimiter used in the CSV file. | ',' |
Returns:
| Type | Description |
|---|---|
| List[RuleDef] | A list of dictionaries representing the parsed configuration data. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If there is an error reading or processing the file. |
get_config_from_databricks
get_config_from_databricks(spark, catalog: Optional[str], schema: Optional[str], table: str, **kwargs) -> List[RuleDef]
Retrieves configuration data from a Databricks table and returns it as a list of dictionaries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | SparkSession | Spark session used to read from Databricks. | required |
| catalog | Optional[str] | The catalog name in Databricks. If provided, it will be included in the table's full path. | required |
| schema | Optional[str] | The schema name in Databricks. If provided, it will be included in the table's full path. | required |
| table | str | The name of the table to retrieve data from. | required |
| **kwargs | | Additional keyword arguments (currently unused). | |
Returns:
| Type | Description |
|---|---|
| List[RuleDef] | A list of dictionaries, where each dictionary represents a row of data from the table. |
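The way optional catalog and schema names feed into the table's full path could look like the sketch below. It assumes a dot-separated path and that missing parts are simply skipped; the helper name `build_full_table_name` and the sample names are hypothetical.

```python
from typing import Optional


def build_full_table_name(catalog: Optional[str], schema: Optional[str], table: str) -> str:
    """Join the provided parts with dots, skipping any that are None or empty (sketch)."""
    parts = [part for part in (catalog, schema, table) if part]
    return ".".join(parts)


full_name = build_full_table_name("main", "quality", "rules")
schema_only = build_full_table_name(None, "quality", "rules")
table_only = build_full_table_name(None, None, "rules")
```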
get_config_from_duckdb
Retrieve configuration data from a DuckDB database.
This function fetches data from a DuckDB database either by executing a custom SQL query or by selecting all rows from a specified table. The data is then parsed into a list of dictionaries.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| table | str | The name of the table to fetch data from. | None |
| query | str | A custom SQL query to execute. | None |
| conn | | A valid DuckDB connection object. | None |
Returns:
| Type | Description |
|---|---|
| List[RuleDef] | A list of dictionaries representing the fetched data. |
Raises:
| Type | Description |
|---|---|
| ValueError | If neither … |
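Example
```python
import duckdb

from sumeh.core.config import get_config_from_duckdb

conn = duckdb.connect('my_db.duckdb')
# The open connection is passed via conn; the table is selected by name.
config = get_config_from_duckdb(table='rules', conn=conn)
```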
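The "custom query or whole table" choice described for this function can be sketched as below. To keep the snippet runnable without DuckDB installed, it uses the standard library's `sqlite3` as a stand-in connection; the helper name `fetch_config_rows`, the precedence of `query` over `table`, and the sample table contents are assumptions made for illustration.

```python
import sqlite3
from typing import Any, Dict, List, Optional


def fetch_config_rows(
    conn: Any, table: Optional[str] = None, query: Optional[str] = None
) -> List[Dict[str, Any]]:
    """Run a custom query if given, otherwise select every row of `table` (sketch)."""
    if query is None and table is None:
        raise ValueError("Either 'query' or 'table' must be provided")
    # Assumption: a custom query takes precedence over the table name.
    sql = query if query is not None else f"SELECT * FROM {table}"
    cursor = conn.execute(sql)
    columns = [column[0] for column in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


# sqlite3 stands in for a DuckDB connection here.
demo_conn = sqlite3.connect(":memory:")
demo_conn.execute("CREATE TABLE rules (field TEXT, check_type TEXT)")
demo_conn.execute("INSERT INTO rules VALUES ('id', 'is_unique')")
all_rows = fetch_config_rows(demo_conn, table="rules")
```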
get_config_from_glue_data_catalog
get_config_from_glue_data_catalog(glue_context, database_name: str, table_name: str, query: Optional[str] = None) -> List[RuleDef]
Retrieves configuration data from AWS Glue Data Catalog.
Uses Spark directly, so it works with all table formats (Parquet, ORC, CSV, Iceberg, Delta, Hudi).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| glue_context | | An instance of … | required |
| database_name | str | Glue database name. | required |
| table_name | str | Glue table name. | required |
| query | Optional[str] | Custom SQL query to fetch data (if provided). | None |
Returns:
| Type | Description |
|---|---|
| List[RuleDef] | A list of dictionaries representing the parsed configuration data. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If there is an error querying Glue Data Catalog. |
get_config_from_mysql
get_config_from_mysql(host: str = None, user: str = None, password: str = None, database: str = None, schema: str = None, table: str = None, port: int = 3306, query: str = None, conn=None) -> List[RuleDef]
Retrieves configuration data from a MySQL table.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | MySQL host (not needed if conn is provided) | None |
| user | str | MySQL user (not needed if conn is provided) | None |
| password | str | MySQL password (not needed if conn is provided) | None |
| database | str | Database name (not needed if conn is provided) | None |
| schema | str | Schema name (optional) | None |
| table | str | Table name to query | None |
| port | int | MySQL port | 3306 |
| query | str | Optional custom query (if not provided, uses schema and table) | None |
| conn | | Existing MySQL connection (optional) | None |
Returns:
| Type | Description |
|---|---|
| List[RuleDef] | List of dicts with configuration data |
get_config_from_postgresql
get_config_from_postgresql(connection: Optional = None, host: Optional[str] = None, user: Optional[str] = None, password: Optional[str] = None, database: Optional[str] = None, port: Optional[int] = 5432, schema: Optional[str] = None, table: Optional[str] = None, query: Optional[str] = None) -> List[RuleDef]
Retrieves configuration data from a PostgreSQL database.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| connection | Optional | An existing PostgreSQL connection object. | None |
| host | Optional[str] | Host of the PostgreSQL server. | None |
| user | Optional[str] | Username to connect to PostgreSQL. | None |
| password | Optional[str] | Password for the PostgreSQL user. | None |
| database | Optional[str] | Database name to query. | None |
| port | Optional[int] | The port for the PostgreSQL connection. | 5432 |
| schema | Optional[str] | Schema name if query is not provided. | None |
| table | Optional[str] | Table name if query is not provided. | None |
| query | Optional[str] | Custom SQL query to fetch data (if not provided, …) | None |
Returns:
| Type | Description |
|---|---|
| List[RuleDef] | A list of dictionaries representing the parsed configuration data. |
Raises:
| Type | Description |
|---|---|
| ValueError | If neither … |
| ConnectionError | If there is an error connecting to PostgreSQL. |
| RuntimeError | If there is an error executing the query or processing the data. |
get_config_from_s3
Retrieves configuration data from a CSV file stored in an S3 bucket.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_path | str | The S3 path to the CSV file. | required |
| delimiter | Optional[str] | The delimiter used in the CSV file. | ',' |
Returns:
| Type | Description |
|---|---|
| List[Dict[str, Any]] | A list of dictionaries representing the parsed configuration data. |
Raises:
| Type | Description |
|---|---|
| RuntimeError | If there is an error reading or processing the S3 file. |
get_schema_from_bigquery
get_schema_from_bigquery(project_id: str, dataset_id: str, table_id: str, credentials_path: str = None, registry_table: str = 'schema_registry', query: str = None) -> List[Dict[str, Any]]
Get schema from BigQuery schema_registry table
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| project_id | str | BigQuery project ID | required |
| dataset_id | str | BigQuery dataset ID | required |
| table_id | str | Table name to look up in the registry | required |
| credentials_path | str | Path to service account credentials file | None |
| registry_table | str | Name of the schema registry table | 'schema_registry' |
| query | str | Optional custom WHERE clause for additional filters | None |
Returns:
| Type | Description |
|---|---|
| List[Dict[str, Any]] | List of dicts with schema information |
get_schema_from_csv
get_schema_from_csv(file_path: str, table: str, delimiter: str = ',', query: str = None) -> List[Dict[str, Any]]
Get schema from CSV schema_registry file
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| file_path | str | Path to the schema_registry CSV file | required |
| table | str | Table name to look up in the registry | required |
| delimiter | str | CSV delimiter | ',' |
| query | str | Optional custom WHERE clause for additional filters (NOT SUPPORTED for CSV) | None |
Returns:
| Type | Description |
|---|---|
| List[Dict[str, Any]] | List of dicts with schema information |
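Looking up one table's entries in a CSV-based registry amounts to filtering rows by table name, as in the sketch below. The registry layout is not specified here, so the column names `table_name`, `field`, and `data_type`, the helper name `lookup_schema_in_csv`, and the sample rows are all assumptions for illustration.

```python
import csv
import io
from typing import Any, Dict, List


def lookup_schema_in_csv(registry_content: str, table: str, delimiter: str = ",") -> List[Dict[str, Any]]:
    """Return the registry rows whose (assumed) table_name column matches `table`."""
    reader = csv.DictReader(io.StringIO(registry_content, newline=""), delimiter=delimiter)
    return [dict(row) for row in reader if row.get("table_name") == table]


# Hypothetical registry content; a real file would be read from file_path first.
registry = (
    "table_name,field,data_type\n"
    "orders,id,integer\n"
    "orders,total,float\n"
    "customers,id,integer\n"
)
orders_schema = lookup_schema_in_csv(registry, table="orders")
```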
get_schema_from_databricks
get_schema_from_databricks(spark, catalog: str, schema: str, table: str, registry_table: str = 'schema_registry', query: str = None) -> List[Dict[str, Any]]
Get schema from Databricks Unity Catalog schema_registry table
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| spark | | SparkSession instance | required |
| catalog | str | Unity Catalog name containing the registry | required |
| schema | str | Schema name containing the registry table | required |
| table | str | Table name to look up in the registry | required |
| registry_table | str | Name of the schema registry table | 'schema_registry' |
| query | str | Optional custom WHERE clause for additional filters | None |
Returns:
| Type | Description |
|---|---|
| List[Dict[str, Any]] | List of dicts with schema information |
get_schema_from_duckdb
get_schema_from_duckdb(conn, table: str, registry_table: str = 'schema_registry', query: str = None) -> List[Dict[str, Any]]
Get schema from DuckDB schema_registry table
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| conn | | DuckDB connection object | required |
| table | str | Table name to look up in the registry | required |
| registry_table | str | Name of the schema registry table | 'schema_registry' |
| query | str | Optional custom WHERE clause for additional filters | None |
Returns:
| Type | Description |
|---|---|
| List[Dict[str, Any]] | List of dicts with schema information |
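A registry lookup with an optional extra WHERE clause might be assembled as sketched below. The snippet uses the standard library's `sqlite3` as a stand-in for a DuckDB connection so it runs without extra dependencies; the registry column names (`table_name`, `field`, `data_type`), the helper name `lookup_registry`, and the way the extra clause is appended with AND are assumptions, not the library's documented behaviour.

```python
import sqlite3
from typing import Any, Dict, List, Optional


def lookup_registry(
    conn: Any,
    table: str,
    registry_table: str = "schema_registry",
    query: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Select registry rows for `table`, optionally narrowed by an extra filter (sketch)."""
    sql = f"SELECT * FROM {registry_table} WHERE table_name = ?"
    if query:
        # Assumption: the custom clause is combined with the table filter via AND.
        sql += f" AND ({query})"
    cursor = conn.execute(sql, (table,))
    columns = [column[0] for column in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


# sqlite3 stands in for DuckDB; the registry rows are made up for the example.
registry_conn = sqlite3.connect(":memory:")
registry_conn.execute("CREATE TABLE schema_registry (table_name TEXT, field TEXT, data_type TEXT)")
registry_conn.executemany(
    "INSERT INTO schema_registry VALUES (?, ?, ?)",
    [("orders", "id", "integer"), ("orders", "total", "float"), ("customers", "id", "integer")],
)
orders_fields = lookup_registry(registry_conn, "orders")
float_fields = lookup_registry(registry_conn, "orders", query="data_type = 'float'")
```

Binding the table name as a parameter keeps it out of the SQL text; the custom clause is interpolated as-is in this sketch, so it should only come from trusted input.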
get_schema_from_glue
get_schema_from_glue(glue_context, database_name: str, table_name: str, registry_table: str = 'schema_registry', query: str = None) -> List[Dict[str, Any]]
Get schema from Glue Data Catalog schema_registry table
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| glue_context | | GlueContext instance | required |
| database_name | str | Glue database containing the registry table | required |
| table_name | str | Table name to look up in the registry | required |
| registry_table | str | Name of the schema registry table | 'schema_registry' |
| query | str | Optional custom WHERE clause for additional filters | None |
Returns:
| Type | Description |
|---|---|
| List[Dict[str, Any]] | List of dicts with schema information |
get_schema_from_mysql
get_schema_from_mysql(host: str = None, user: str = None, password: str = None, database: str = None, table: str = None, port: int = 3306, registry_table: str = 'schema_registry', query: str = None, conn=None) -> List[Dict[str, Any]]
Get schema from MySQL schema_registry table
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | MySQL host (not needed if conn is provided) | None |
| user | str | MySQL user (not needed if conn is provided) | None |
| password | str | MySQL password (not needed if conn is provided) | None |
| database | str | Database containing the registry table (not needed if conn is provided) | None |
| table | str | Table name to look up in the registry | None |
| port | int | MySQL port | 3306 |
| registry_table | str | Name of the schema registry table | 'schema_registry' |
| query | str | Optional custom WHERE clause for additional filters | None |
| conn | | Existing MySQL connection (optional, will create new if not provided) | None |
Returns:
| Type | Description |
|---|---|
| List[Dict[str, Any]] | List of dicts with schema information |
get_schema_from_postgresql
get_schema_from_postgresql(host: str = None, user: str = None, password: str = None, database: str = None, schema: str = None, table: str = None, port: int = 5432, registry_table: str = 'schema_registry', query: str = None, conn=None) -> List[Dict[str, Any]]
Get schema from PostgreSQL schema_registry table
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | PostgreSQL host (not needed if conn is provided) | None |
| user | str | PostgreSQL user (not needed if conn is provided) | None |
| password | str | PostgreSQL password (not needed if conn is provided) | None |
| database | str | Database containing the registry table (not needed if conn is provided) | None |
| schema | str | Schema containing the registry table | None |
| table | str | Table name to look up in the registry | None |
| port | int | PostgreSQL port | 5432 |
| registry_table | str | Name of the schema registry table | 'schema_registry' |
| query | str | Optional custom WHERE clause for additional filters | None |
| conn | | Existing PostgreSQL connection (optional, will create new if not provided) | None |
Returns:
| Type | Description |
|---|---|
| List[Dict[str, Any]] | List of dicts with schema information |
get_schema_from_s3
get_schema_from_s3(s3_path: str, table: str, delimiter: str = ',', query: str = None) -> List[Dict[str, Any]]
Get schema from S3 schema_registry CSV file
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| s3_path | str | S3 URI to the schema_registry CSV file (e.g., 's3://bucket/path/schema_registry.csv') | required |
| table | str | Table name to look up in the registry | required |
| delimiter | str | CSV delimiter | ',' |
| query | str | Optional custom WHERE clause for additional filters (NOT SUPPORTED for S3/CSV) | None |
Returns:
| Type | Description |
|---|---|
| List[Dict[str, Any]] | List of dicts with schema information |