๐Ÿ”ฎ GraphQL Client

GraphQL client with streaming pagination for memory-efficient data fetching.

๐ŸŽฏ Overview

Method Purpose
GraphQLClient GraphQL client with async generator pagination
.from_url() โœจ NEW - Create client from URL with optional bearer token
.execute() โœจ NEW - Unified method for queries and mutations
.execute_query() Execute single query (returns full response)
.execute_mutation() Execute mutation (alias for execute_query)
.fetch_pages_relay() โœจ NEW - Fetch all pages from Relay-style pagination
.fetch_pages_generator() Stream paginated data (memory-efficient)
execute_graphql() โœจ NEW - One-liner function for simple queries

๐Ÿ—๏ธ Architecture

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    Streaming Pagination Flow                    โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚  Page 1: query($cursor: null) โ†’ yield [batch] โ†’ process        โ”‚
โ”‚  Page 2: query($cursor: xyz)  โ†’ yield [batch] โ†’ process        โ”‚
โ”‚  Page N: query($cursor: abc)  โ†’ yield [batch] โ†’ process        โ”‚
โ”‚                                                                 โ”‚
โ”‚  โš ๏ธ CRITICAL: Never accumulates full dataset in memory         โ”‚
โ”‚  โœ… Memory: O(page_size) not O(total_records)                  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

๐Ÿ”ฎ GraphQLClient

Method Purpose
.from_url() โœจ NEW - Create client from URL + token
.execute() โœจ NEW - Unified query/mutation (returns data only)
.execute_query() Execute single GraphQL query (full response)
.execute_mutation() Execute GraphQL mutation
.fetch_pages_relay() โœจ NEW - Auto-accumulate Relay pagination
.fetch_pages_generator() Async generator for paginated data (streaming)

๐Ÿš€ Quick Start Examples

One-Liner Query

from fh_saas.utils_graphql import execute_graphql

# NEW: No client management needed
result = await execute_graphql(
    url="https://api.example.com/graphql",
    query="query { users { id name } }",
    bearer_token="your-token"
)

Convenience Constructor

from fh_saas.utils_graphql import GraphQLClient

# NEW: Direct from URL
async with GraphQLClient.from_url(
    url="https://api.example.com/graphql",
    bearer_token="your-token"
) as gql:
    result = await gql.execute("query { users { id name } }")

Relay Pagination (Auto-Accumulate)

# NEW: Fetch all pages automatically
async with GraphQLClient.from_url(url, bearer_token=token) as gql:
    all_items = await gql.fetch_pages_relay(
        query='''
            query($first: Int, $after: String, $filter: Filter) {
                transactionsConnection(first: $first, after: $after, filter: $filter) {
                    edges { node { id amount } }
                    pageInfo { hasNextPage endCursor }
                }
            }
        ''',
        connection_path="transactionsConnection",
        variables={"filter": {"status": "completed"}},
        page_size=100,
        max_pages=50  # Safety limit
    )
    print(f"Fetched {len(all_items)} total items")

Legacy: Manual Client Management

from fh_saas.utils_api import AsyncAPIClient, bearer_token_auth
from fh_saas.utils_graphql import GraphQLClient

# Still supported for advanced use cases
async with AsyncAPIClient(
    'https://api.example.com/graphql',
    auth_headers=bearer_token_auth('TOKEN')
) as http_client:
    
    client = GraphQLClient(http_client)
    result = await client.execute_query(
        query='{ users { id name } }'
    )

Memory-Efficient Streaming (Generator)

# For very large datasets - process batches without accumulating
async with GraphQLClient.from_url(url, bearer_token=token) as gql:
    async for batch in gql.fetch_pages_generator(
        query_template='''
            query($cursor: String) {
                users(after: $cursor, first: 1000) {
                    nodes { id name email }
                    pageInfo { hasNextPage endCursor }
                }
            }
        ''',
        variables={'cursor': None},
        items_path=['data', 'users', 'nodes'],
        cursor_path=['data', 'users', 'pageInfo', 'endCursor'],
        has_next_path=['data', 'users', 'pageInfo', 'hasNextPage']
    ):
        print(f"Processing batch of {len(batch)} records")
        # Process immediately - never load full dataset

source

GraphQLClient


def GraphQLClient(
    api_client:AsyncAPIClient, # Initialized AsyncAPIClient instance
    endpoint:str='', # GraphQL endpoint path (default: '' for base URL)
):

GraphQL client with streaming pagination for memory-efficient data fetching.


source

GraphQLClient.__init__


def __init__(
    api_client:AsyncAPIClient, # Initialized AsyncAPIClient instance
    endpoint:str='', # GraphQL endpoint path (default: '' for base URL)
):

Initialize self. See help(type(self)) for accurate signature.


source

GraphQLClient.execute_query


def execute_query(
    query:str, # GraphQL query string
    variables:dict=None, # Query variables
)->Dict[str, Any]:

Execute a single GraphQL query and return the JSON response.


source

GraphQLClient.execute_mutation


def execute_mutation(
    mutation:str, # GraphQL mutation string
    variables:dict=None, # Mutation variables
)->Dict[str, Any]:

Execute a GraphQL mutation (alias for execute_query).


source

GraphQLClient.fetch_pages_generator


def fetch_pages_generator(
    query_template:str, # GraphQL query with $variables placeholders
    variables:dict, # Initial variables (must include cursor key)
    items_path:list[str], # JSONPath to list of items (e.g., ['data', 'users', 'nodes'])
    cursor_path:list[str], # JSONPath to next cursor (e.g., ['data', 'users', 'pageInfo', 'endCursor'])
    has_next_path:list[str]=None, # Optional path to hasNextPage boolean (if None, checks cursor != None)
    cursor_var:str='cursor', # Variable name for cursor in query (default: 'cursor')
)->AsyncGenerator[List[Dict], None]:

Stream paginated GraphQL data page-by-page using async generator.


source

GraphQLClient.fetch_pages_relay


def fetch_pages_relay(
    query:str, # GraphQL query with $first and $after variables
    connection_path:str, # Dot-notation path to connection object (e.g., "transactionsConnection")
    variables:dict | None=None, # Base variables for the query (excluding first/after)
    page_size:int=100, # Number of items per page
    max_pages:int | None=None, # Maximum pages to fetch (None for unlimited)
)->list[dict]:

Fetch all pages from a Relay-style paginated GraphQL query.

Example query structure: query($first: Int, $after: String, $filter: TransactionFilter) { transactionsConnection(first: $first, after: $after, filter: $filter) { edges { node { id amount } } pageInfo { hasNextPage endCursor } } }

Returns: List of all nodes from all pages


source

GraphQLClient.execute


def execute(
    query:str, # GraphQL query or mutation string
    variables:dict=None, # Query/mutation variables
)->Dict[str, Any]:

Execute a GraphQL query or mutation and return the data portion.

This is a unified method that works for both queries and mutations. Returns only the โ€˜dataโ€™ portion of the response for convenience.


source

GraphQLClient.from_url


def from_url(
    cls, url:str, # GraphQL endpoint URL
    bearer_token:str | None=None, # Optional bearer token for Authorization header
    headers:dict | None=None, # Optional additional headers
):

Create GraphQL client directly from URL with optional bearer token.

Usage: async with GraphQLClient.from_url(url, bearer_token=token) as gql: result = await gql.execute(query)

Streaming Pagination Details

Why use generators? For large datasets (millions of rows), accumulating all data in memory causes OOM errors. The async generator yields batches instead.

Parameters:

Parameter Description
query_template GraphQL query with $cursor variable
variables Initial variables dict (e.g., {'cursor': None})
items_path Path to data list in response (e.g., ['data', 'users', 'nodes'])
cursor_path Path to next cursor (e.g., ['data', 'users', 'pageInfo', 'endCursor'])
has_next_path Optional path to hasNextPage flag
cursor_var Cursor variable name in query (default: 'cursor')

Example with database insert:

async for batch in client.fetch_pages_generator(
    query_template='''
        query($cursor: String) {
            users(after: $cursor, first: 1000) {
                nodes { id name }
                pageInfo { hasNextPage endCursor }
            }
        }
    ''',
    variables={'cursor': None},
    items_path=['data', 'users', 'nodes'],
    cursor_path=['data', 'users', 'pageInfo', 'endCursor'],
    has_next_path=['data', 'users', 'pageInfo', 'hasNextPage']
):
    # Process each batch immediately
    df = pl.DataFrame(batch)
    df.write_database('staging_table', connection=conn)

source

execute_graphql


def execute_graphql(
    url:str, # GraphQL endpoint URL
    query:str, # GraphQL query string
    variables:dict | None=None, # Optional query variables
    bearer_token:str | None=None, # Optional bearer token for Authorization header
    headers:dict | None=None, # Optional additional headers
)->dict:

Execute a single GraphQL query without managing client lifecycle.

This is a convenience function for one-off queries. For multiple queries, use GraphQLClient.from_url() to reuse the connection.

Usage: result = await execute_graphql( url=โ€œhttps://api.example.com/graphqlโ€, query=โ€œquery { users { id name } }โ€, bearer_token=โ€œyour-tokenโ€ )

Raises: ValueError: If the response contains GraphQL errors