๐ฎ GraphQL Client
๐ฏ Overview
| Method | Purpose |
|---|---|
GraphQLClient |
GraphQL client with async generator pagination |
.from_url() |
โจ NEW - Create client from URL with optional bearer token |
.execute() |
โจ NEW - Unified method for queries and mutations |
.execute_query() |
Execute single query (returns full response) |
.execute_mutation() |
Execute mutation (alias for execute_query) |
.fetch_pages_relay() |
โจ NEW - Fetch all pages from Relay-style pagination |
.fetch_pages_generator() |
Stream paginated data (memory-efficient) |
execute_graphql() |
โจ NEW - One-liner function for simple queries |
๐๏ธ Architecture
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Streaming Pagination Flow โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ Page 1: query($cursor: null) โ yield [batch] โ process โ
โ Page 2: query($cursor: xyz) โ yield [batch] โ process โ
โ Page N: query($cursor: abc) โ yield [batch] โ process โ
โ โ
โ โ ๏ธ CRITICAL: Never accumulates full dataset in memory โ
โ โ
Memory: O(page_size) not O(total_records) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
๐ฎ GraphQLClient
| Method | Purpose |
|---|---|
.from_url() |
โจ NEW - Create client from URL + token |
.execute() |
โจ NEW - Unified query/mutation (returns data only) |
.execute_query() |
Execute single GraphQL query (full response) |
.execute_mutation() |
Execute GraphQL mutation |
.fetch_pages_relay() |
โจ NEW - Auto-accumulate Relay pagination |
.fetch_pages_generator() |
Async generator for paginated data (streaming) |
๐ Quick Start Examples
One-Liner Query
from fh_saas.utils_graphql import execute_graphql
# NEW: No client management needed
result = await execute_graphql(
url="https://api.example.com/graphql",
query="query { users { id name } }",
bearer_token="your-token"
)Convenience Constructor
from fh_saas.utils_graphql import GraphQLClient
# NEW: Direct from URL
async with GraphQLClient.from_url(
url="https://api.example.com/graphql",
bearer_token="your-token"
) as gql:
result = await gql.execute("query { users { id name } }")Relay Pagination (Auto-Accumulate)
# NEW: Fetch all pages automatically
async with GraphQLClient.from_url(url, bearer_token=token) as gql:
all_items = await gql.fetch_pages_relay(
query='''
query($first: Int, $after: String, $filter: Filter) {
transactionsConnection(first: $first, after: $after, filter: $filter) {
edges { node { id amount } }
pageInfo { hasNextPage endCursor }
}
}
''',
connection_path="transactionsConnection",
variables={"filter": {"status": "completed"}},
page_size=100,
max_pages=50 # Safety limit
)
print(f"Fetched {len(all_items)} total items")Legacy: Manual Client Management
from fh_saas.utils_api import AsyncAPIClient, bearer_token_auth
from fh_saas.utils_graphql import GraphQLClient
# Still supported for advanced use cases
async with AsyncAPIClient(
'https://api.example.com/graphql',
auth_headers=bearer_token_auth('TOKEN')
) as http_client:
client = GraphQLClient(http_client)
result = await client.execute_query(
query='{ users { id name } }'
)Memory-Efficient Streaming (Generator)
# For very large datasets - process batches without accumulating
async with GraphQLClient.from_url(url, bearer_token=token) as gql:
async for batch in gql.fetch_pages_generator(
query_template='''
query($cursor: String) {
users(after: $cursor, first: 1000) {
nodes { id name email }
pageInfo { hasNextPage endCursor }
}
}
''',
variables={'cursor': None},
items_path=['data', 'users', 'nodes'],
cursor_path=['data', 'users', 'pageInfo', 'endCursor'],
has_next_path=['data', 'users', 'pageInfo', 'hasNextPage']
):
print(f"Processing batch of {len(batch)} records")
# Process immediately - never load full datasetGraphQLClient
def GraphQLClient(
api_client:AsyncAPIClient, # Initialized AsyncAPIClient instance
endpoint:str='', # GraphQL endpoint path (default: '' for base URL)
):
GraphQL client with streaming pagination for memory-efficient data fetching.
GraphQLClient.__init__
def __init__(
api_client:AsyncAPIClient, # Initialized AsyncAPIClient instance
endpoint:str='', # GraphQL endpoint path (default: '' for base URL)
):
Initialize self. See help(type(self)) for accurate signature.
GraphQLClient.execute_query
def execute_query(
query:str, # GraphQL query string
variables:dict=None, # Query variables
)->Dict[str, Any]:
Execute a single GraphQL query and return the JSON response.
GraphQLClient.execute_mutation
def execute_mutation(
mutation:str, # GraphQL mutation string
variables:dict=None, # Mutation variables
)->Dict[str, Any]:
Execute a GraphQL mutation (alias for execute_query).
GraphQLClient.fetch_pages_generator
def fetch_pages_generator(
query_template:str, # GraphQL query with $variables placeholders
variables:dict, # Initial variables (must include cursor key)
items_path:list[str], # JSONPath to list of items (e.g., ['data', 'users', 'nodes'])
cursor_path:list[str], # JSONPath to next cursor (e.g., ['data', 'users', 'pageInfo', 'endCursor'])
has_next_path:list[str]=None, # Optional path to hasNextPage boolean (if None, checks cursor != None)
cursor_var:str='cursor', # Variable name for cursor in query (default: 'cursor')
)->AsyncGenerator[List[Dict], None]:
Stream paginated GraphQL data page-by-page using async generator.
GraphQLClient.fetch_pages_relay
def fetch_pages_relay(
query:str, # GraphQL query with $first and $after variables
connection_path:str, # Dot-notation path to connection object (e.g., "transactionsConnection")
variables:dict | None=None, # Base variables for the query (excluding first/after)
page_size:int=100, # Number of items per page
max_pages:int | None=None, # Maximum pages to fetch (None for unlimited)
)->list[dict]:
Fetch all pages from a Relay-style paginated GraphQL query.
Example query structure: query($first: Int, $after: String, $filter: TransactionFilter) { transactionsConnection(first: $first, after: $after, filter: $filter) { edges { node { id amount } } pageInfo { hasNextPage endCursor } } }
Returns: List of all nodes from all pages
GraphQLClient.execute
def execute(
query:str, # GraphQL query or mutation string
variables:dict=None, # Query/mutation variables
)->Dict[str, Any]:
Execute a GraphQL query or mutation and return the data portion.
This is a unified method that works for both queries and mutations. Returns only the โdataโ portion of the response for convenience.
GraphQLClient.from_url
def from_url(
cls, url:str, # GraphQL endpoint URL
bearer_token:str | None=None, # Optional bearer token for Authorization header
headers:dict | None=None, # Optional additional headers
):
Create GraphQL client directly from URL with optional bearer token.
Usage: async with GraphQLClient.from_url(url, bearer_token=token) as gql: result = await gql.execute(query)
Streaming Pagination Details
Why use generators? For large datasets (millions of rows), accumulating all data in memory causes OOM errors. The async generator yields batches instead.
Parameters:
| Parameter | Description |
|---|---|
query_template |
GraphQL query with $cursor variable |
variables |
Initial variables dict (e.g., {'cursor': None}) |
items_path |
Path to data list in response (e.g., ['data', 'users', 'nodes']) |
cursor_path |
Path to next cursor (e.g., ['data', 'users', 'pageInfo', 'endCursor']) |
has_next_path |
Optional path to hasNextPage flag |
cursor_var |
Cursor variable name in query (default: 'cursor') |
Example with database insert:
async for batch in client.fetch_pages_generator(
query_template='''
query($cursor: String) {
users(after: $cursor, first: 1000) {
nodes { id name }
pageInfo { hasNextPage endCursor }
}
}
''',
variables={'cursor': None},
items_path=['data', 'users', 'nodes'],
cursor_path=['data', 'users', 'pageInfo', 'endCursor'],
has_next_path=['data', 'users', 'pageInfo', 'hasNextPage']
):
# Process each batch immediately
df = pl.DataFrame(batch)
df.write_database('staging_table', connection=conn)execute_graphql
def execute_graphql(
url:str, # GraphQL endpoint URL
query:str, # GraphQL query string
variables:dict | None=None, # Optional query variables
bearer_token:str | None=None, # Optional bearer token for Authorization header
headers:dict | None=None, # Optional additional headers
)->dict:
Execute a single GraphQL query without managing client lifecycle.
This is a convenience function for one-off queries. For multiple queries, use GraphQLClient.from_url() to reuse the connection.
Usage: result = await execute_graphql( url=โhttps://api.example.com/graphqlโ, query=โquery { users { id name } }โ, bearer_token=โyour-tokenโ )
Raises: ValueError: If the response contains GraphQL errors