# 🐻❄️ Data Transforms

## 🎯 Overview
| Function | Purpose |
|---|---|
| `map_and_upsert` | Bulk JSON→DB upsert via staging table |
| `apply_schema` | Type conversions (dates, booleans, numbers) |
## 🏗️ Architecture
```
┌──────────────────────────────────────────────────────────────┐
│ Staging Table Pattern                                        │
├──────────────────────────────────────────────────────────────┤
│ 1. df.write_database(staging_table) → fast bulk INSERT       │
│ 2. INSERT ... ON CONFLICT SELECT FROM staging → vectorized   │
│ 3. DROP TABLE staging → cleanup                              │
│                                                              │
│ ⚡ 10-100x faster than row-by-row upserts                    │
└──────────────────────────────────────────────────────────────┘
```
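The three steps above can be sketched end-to-end with stdlib `sqlite3`. This is a minimal illustration of the pattern, not this library's implementation; the `users` and `_staging` table names are made up for the example:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'Alice')")

# 1. Bulk-insert incoming data into a staging table
#    (df.write_database performs this step in the real pipeline)
conn.execute("CREATE TABLE _staging (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO _staging VALUES (?, ?)",
                 [(1, "Alicia"), (2, "Bob")])

# 2. One vectorized upsert from staging into the target table
#    (WHERE true disambiguates the SQLite INSERT...SELECT...ON CONFLICT grammar)
conn.execute("""
    INSERT INTO users (id, name)
    SELECT id, name FROM _staging WHERE true
    ON CONFLICT(id) DO UPDATE SET name = excluded.name
""")

# 3. Drop the staging table
conn.execute("DROP TABLE _staging")

print(conn.execute("SELECT id, name FROM users ORDER BY id").fetchall())
# → [(1, 'Alicia'), (2, 'Bob')]
```

One round trip performs both the insert of id 2 and the update of id 1, which is where the speedup over row-by-row upserts comes from.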
## 📥 Map & Upsert
| Parameter | Description |
|---|---|
| `df` | Polars DataFrame from JSON |
| `table_name` | Target database table (must exist) |
| `key_col` | Primary key for ON CONFLICT resolution |
| `db_uri` | Database connection string |
| `column_map` | Optional rename map `{json_col: db_col}` |
| `unnest_cols` | Optional list of nested columns to flatten |
| `type_map` | Optional type casting map `{col: pl.DataType}` |
**Returns:** `int`, the number of rows affected by the upsert operation.

**Staging Table Pattern** (10-100x faster than row-by-row):

1. Write to a temporary staging table (fast bulk insert)
2. Execute `INSERT ... ON CONFLICT` (database-native, vectorized)
3. Drop the staging table (cleanup)
**Type Casting with `type_map`:**

When API data contains `None` values, Polars may infer incorrect types (e.g., `String` instead of `Int64`). This causes PostgreSQL type-mismatch errors like:

```
column "balance" is of type integer but expression is of type text
```
Use `type_map` to explicitly cast columns before writing:

```python
rows = map_and_upsert(
    df=df,
    table_name='accounts',
    key_col='id',
    db_uri=db_uri,
    type_map={
        'balance_current': pl.Int64,
        'balance_available': pl.Int64,
        'balance_limit': pl.Int64
    }
)
print(f"Upserted {rows} rows")
```

**Basic Example:**
```python
import polars as pl

# JSON from API
json_data = [
    {'user_id_val': 1, 'ABC_1': 'Alice', 'extra': 'ignore'},
    {'user_id_val': 2, 'ABC_1': 'Bob', 'extra': 'ignore'}
]
df = pl.DataFrame(json_data)

rows_affected = map_and_upsert(
    df=df,
    table_name='users',
    key_col='user_id',
    db_uri='sqlite:///app.db',
    column_map={'user_id_val': 'user_id', 'ABC_1': 'name'}
)
print(f"Upserted {rows_affected} users")
```

### map_and_upsert
```python
def map_and_upsert(
    df:pl.DataFrame,            # The raw Polars DataFrame from JSON
    table_name:str,             # Target database table name
    key_col:str,                # Primary key column for conflict resolution
    db_uri:str,                 # SQLAlchemy connection string (e.g., 'sqlite:///db.db' or 'postgresql://...')
    column_map:dict=None,       # Optional rename map {json_key: db_col}
    unnest_cols:list[str]=None, # List of Struct columns to flatten
    type_map:dict=None,         # Optional type casting map {col_name: pl.DataType}
)->int:
```

Map JSON data to database columns and upsert using the staging table pattern.

**Returns:** the number of rows affected by the upsert operation.

**Type Casting:** When columns have `None` values, Polars may infer incorrect types (e.g., `String` instead of `Int64`), causing PostgreSQL type-mismatch errors. Use `type_map` to explicitly cast columns before writing to the database.
## 🔧 Schema Transformations
| Parameter | Description |
|---|---|
| `df` | Polars DataFrame |
| `type_map` | Dict mapping column names to Polars dtypes |
**Supported conversions:**

| Type | Handling |
|---|---|
| `pl.Date` | Parses `YYYY-MM-DD` strings |
| `pl.Datetime` | Parses datetime strings |
| `pl.Boolean` | Converts `"true"`/`"false"` strings |
| Other | Uses `cast()` (works for numeric types) |
**Example:**

```python
df = pl.DataFrame({
    'created_at': ['2024-01-15', '2024-01-16'],
    'is_active': ['true', 'false'],
    'amount': ['123.45', '678.90']
})
df = apply_schema(df, {
    'created_at': pl.Date,
    'is_active': pl.Boolean,
    'amount': pl.Float64
})
```

### apply_schema
```python
def apply_schema(
    df:pl.DataFrame, # Input DataFrame
    type_map:dict,   # Column name -> Polars dtype (e.g., {'created_at': pl.Date, 'is_active': pl.Boolean})
)->pl.DataFrame:
```

Apply explicit type conversions to DataFrame columns.