🐻‍❄️ Data Transforms

High-performance JSON-to-database pipeline using Polars vectorized transformations.

🎯 Overview

Function        Purpose
map_and_upsert  Bulk JSON→DB upsert via staging table
apply_schema    Type conversions (dates, booleans, numbers)

🏗️ Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    Staging Table Pattern                        │
├─────────────────────────────────────────────────────────────────┤
│  1. df.write_database(staging_table) → fast bulk INSERT        │
│  2. INSERT ... ON CONFLICT SELECT FROM staging → vectorized    │
│  3. DROP TABLE staging → cleanup                                │
│                                                                 │
│  ⚡ 10-100x faster than row-by-row upserts                     │
└─────────────────────────────────────────────────────────────────┘

📥 Map & Upsert

Parameter    Description
df           Polars DataFrame from JSON
table_name   Target database table (must exist)
key_col      Primary key for ON CONFLICT resolution
db_uri       Database connection string
column_map   Optional rename map {json_col: db_col}
unnest_cols  Optional list of nested columns to flatten
type_map     Optional type casting map {col: pl.DataType}

Returns: int - Number of rows affected by the upsert operation

Staging Table Pattern (10-100x faster than row-by-row):

1. Write to temporary staging table (fast bulk insert)
2. Execute INSERT ... ON CONFLICT (database-native, vectorized)
3. Drop staging table (cleanup)

Type Casting with type_map:

When API data contains None values, Polars may infer incorrect types (e.g., String instead of Int64). This causes PostgreSQL type mismatch errors like:

column "balance" is of type integer but expression is of type text

Use type_map to explicitly cast columns before writing:

rows = map_and_upsert(
    df=df,
    table_name='accounts',
    key_col='id',
    db_uri=db_uri,
    type_map={
        'balance_current': pl.Int64,
        'balance_available': pl.Int64,
        'balance_limit': pl.Int64
    }
)
print(f"Upserted {rows} rows")

Basic Example:

import polars as pl

# JSON from API
json_data = [
    {'user_id_val': 1, 'ABC_1': 'Alice', 'extra': 'ignore'},
    {'user_id_val': 2, 'ABC_1': 'Bob', 'extra': 'ignore'}
]

df = pl.DataFrame(json_data)

rows_affected = map_and_upsert(
    df=df,
    table_name='users',
    key_col='user_id',
    db_uri='sqlite:///app.db',
    column_map={'user_id_val': 'user_id', 'ABC_1': 'name'}
)
print(f"Upserted {rows_affected} users")

source

map_and_upsert


def map_and_upsert(
    df:pl.DataFrame, # The raw Polars DataFrame from JSON
    table_name:str, # Target database table name
    key_col:str, # Primary key column for conflict resolution
    db_uri:str, # SQLAlchemy connection string (e.g., 'sqlite:///db.db' or 'postgresql://...')
    column_map:dict=None, # Optional rename map {json_key: db_col}
    unnest_cols:list[str]=None, # List of Struct columns to flatten
    type_map:dict=None, # Optional type casting map {col_name: pl.DataType}
)->int:

Map JSON data to database columns and upsert using staging table pattern.

Returns: Number of rows affected by the upsert operation

Type Casting: When columns have None values, Polars may infer incorrect types (e.g., String instead of Int64). This causes PostgreSQL type mismatch errors. Use type_map to explicitly cast columns before writing to the database.

🔧 Schema Transformations

Parameter  Description
df         Polars DataFrame
type_map   Dict mapping column names to Polars dtypes

Supported conversions:

Type         Handling
pl.Date      Parses YYYY-MM-DD strings
pl.Datetime  Parses datetime strings
pl.Boolean   Converts "true"/"false" strings
Other        Uses cast() (works for numeric types)

Example:

df = pl.DataFrame({
    'created_at': ['2024-01-15', '2024-01-16'],
    'is_active': ['true', 'false'],
    'amount': ['123.45', '678.90']
})

df = apply_schema(df, {
    'created_at': pl.Date,
    'is_active': pl.Boolean,
    'amount': pl.Float64
})

source

apply_schema


def apply_schema(
    df:pl.DataFrame, # Input DataFrame
    type_map:dict, # Column name -> Polars dtype (e.g., {'created_at': pl.Date, 'is_active': pl.Boolean})
)->pl.DataFrame:

Apply explicit type conversions to DataFrame columns.