
You're building a dashboard that shows daily sales metrics from your company's e-commerce platform. The platform provides an API to access this data, but when you make your first request, you only get 50 records back — not the thousands you expected. When you try to get more data by making rapid successive calls, the API starts returning error messages about "rate limits exceeded." Welcome to the real world of API integration.
Working with APIs is a fundamental skill for data professionals. Whether you're pulling customer data from a CRM, fetching financial metrics from a payment processor, or accessing social media analytics, you'll need to understand how to properly request data, handle large datasets that come in chunks, and respect the API's usage limits. These aren't just technical details — they're essential for building reliable data pipelines that won't break in production.
What you'll learn:

- How REST APIs structure requests and responses
- How to handle the three common pagination patterns: offset-based, cursor-based, and page-based
- How to respect rate limits with client-side throttling and exponential backoff
- How to combine these techniques into a reusable, production-ready API client
You should be comfortable with basic Python programming, including functions, loops, and error handling. Familiarity with JSON data format is helpful but not required — we'll cover what you need to know.
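If JSON is new to you, here is the minimum you need for this lesson: JSON is a text format for nested data, and Python's built-in `json` module converts it to and from dicts and lists. A quick illustration (the field names are made up for the example):

```python
import json

# A raw JSON string, similar to what an API response body looks like
raw = '{"id": 1, "title": "Daily sales", "tags": ["retail", "daily"]}'

# json.loads turns the string into ordinary Python dicts and lists
record = json.loads(raw)
print(record["title"])     # access fields like any dict
print(record["tags"][0])   # nested JSON arrays become Python lists

# json.dumps goes the other way: Python objects back to a JSON string
print(json.dumps(record, indent=2))
```

That round trip (`loads` on the way in, `dumps` on the way out) is all the JSON machinery this lesson relies on.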
REST (Representational State Transfer) is an architectural style for building web APIs that has become the de facto standard for data access. Think of a REST API like a digital librarian: you make specific requests for information, and it returns exactly what you asked for in a standardized format.
REST APIs use HTTP methods to define actions:

- GET: retrieve data
- POST: create a new resource
- PUT/PATCH: update an existing resource
- DELETE: remove a resource
For data engineering, you'll primarily use GET requests to retrieve data. Let's start with a practical example using a real API.
```python
import requests
import json
from time import sleep
from datetime import datetime

# Let's work with a public API that provides fake e-commerce data
base_url = "https://jsonplaceholder.typicode.com"

def make_api_request(endpoint, params=None):
    """
    Make a basic API request with error handling
    """
    url = f"{base_url}/{endpoint}"
    try:
        response = requests.get(url, params=params)
        response.raise_for_status()  # Raises an exception for HTTP error codes
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        return None

# Get all posts (this represents your data records)
posts = make_api_request("posts")
if posts:
    print(f"Retrieved {len(posts)} posts")
    print("First post:", json.dumps(posts[0], indent=2))
```
This basic example shows the foundation of API work: making requests, handling errors, and parsing JSON responses. But real-world APIs have complications that this simple example doesn't address.
Most APIs don't return all available data in a single request. Imagine if Gmail tried to load every email you've ever received on the first page load — it would be slow and overwhelming. Instead, APIs use pagination to break large datasets into manageable chunks.
There are several pagination patterns you'll encounter:
This is like page numbers in a book. You specify which "page" you want and how many items per page.
```python
def fetch_paginated_data_offset(endpoint, page_size=20,
                                limit_param='limit', offset_param='offset'):
    """
    Fetch all data using offset-based pagination.
    The parameter names vary between APIs (limit/offset, _limit/_start, etc.),
    so they are configurable here.
    """
    all_data = []
    offset = 0
    while True:
        params = {
            limit_param: page_size,
            offset_param: offset
        }
        print(f"Fetching data with offset {offset}...")
        response = make_api_request(endpoint, params)
        if not response or len(response) == 0:
            break
        all_data.extend(response)
        # If we got fewer items than requested, we've reached the end
        if len(response) < page_size:
            break
        offset += page_size
        # Be respectful - don't hammer the API
        sleep(0.5)
    return all_data

# Example usage - JSONPlaceholder names these parameters _limit and _start,
# and would ignore 'limit'/'offset' and return everything on every request
all_posts = fetch_paginated_data_offset("posts", limit_param='_limit',
                                        offset_param='_start')
print(f"Total posts retrieved: {len(all_posts)}")
```
This approach uses a "cursor" or token to mark your position in the dataset. It's more reliable for data that changes frequently because it doesn't suffer from the "shifting data" problem that offset pagination has.
```python
def fetch_paginated_data_cursor(endpoint, page_size=20):
    """
    Fetch all data using cursor-based pagination.
    Note: This is a simulated example since our test API doesn't support cursors
    """
    all_data = []
    cursor = None
    while True:
        params = {'limit': page_size}
        if cursor:
            params['cursor'] = cursor
        print(f"Fetching data with cursor: {cursor or 'initial'}...")
        # In a real cursor-based API, you'd get both data and the next cursor
        response = make_api_request(endpoint, params)
        if not response:
            break
        # Extract the actual data (this varies by API)
        data = response.get('data', response)
        if not data:
            break
        all_data.extend(data)
        # Get the next cursor from the response
        cursor = response.get('next_cursor')
        if not cursor:
            break
        sleep(0.5)
    return all_data
```
Some APIs use traditional page numbers. Here's how to handle that pattern:
```python
def fetch_paginated_data_pages(endpoint, page_size=20):
    """
    Fetch all data using page number pagination
    """
    all_data = []
    page = 1
    while True:
        params = {
            'per_page': page_size,
            'page': page
        }
        print(f"Fetching page {page}...")
        response = make_api_request(endpoint, params)
        if not response or len(response) == 0:
            break
        all_data.extend(response)
        # If we got fewer items than requested, we've reached the end
        if len(response) < page_size:
            break
        page += 1
        sleep(0.5)
    return all_data
```
Pro Tip: Always check the API documentation for pagination details. Some APIs include metadata in their responses that tells you the total number of records, current page, and whether there are more pages available. This information can make your pagination logic more efficient and reliable.
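When that metadata is available, you can drive the loop with it directly instead of inferring the end from a short page. Here's a sketch, assuming a hypothetical response shape with a `meta.total_pages` field (your API's field names will differ, so check its documentation):

```python
def fetch_with_metadata(client_get, endpoint, page_size=100):
    """Paginate using response metadata instead of guessing from page length.

    Assumes a (hypothetical) response shape like:
    {"data": [...], "meta": {"page": 1, "total_pages": 5}}

    client_get is any callable taking (endpoint, params) and returning
    the parsed JSON response, e.g. the make_api_request function above.
    """
    all_data = []
    page = 1
    while True:
        response = client_get(endpoint, {"page": page, "per_page": page_size})
        if not response:
            break
        all_data.extend(response.get("data", []))
        meta = response.get("meta", {})
        # Stop as soon as the API says there are no more pages
        if page >= meta.get("total_pages", page):
            break
        page += 1
    return all_data
```

Passing the request function in as an argument also makes this loop easy to test against a fake client without touching the network.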
Rate limiting is how API providers protect their servers from being overwhelmed. Think of it like a toll booth on a highway — only so many cars can pass through per minute. Exceed the limit, and you'll get blocked.
Common rate limiting patterns include:

- A fixed number of requests per second, minute, or hour (e.g., 60 requests per minute)
- Different limits per endpoint or per authentication tier
- HTTP 429 ("Too Many Requests") responses, often accompanied by a Retry-After header
- Rate limit status reported in response headers such as X-RateLimit-Remaining
```python
import time
from collections import deque

class RateLimiter:
    """
    A simple rate limiter using a sliding window approach
    """
    def __init__(self, max_requests, time_window):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = deque()

    def wait_if_needed(self):
        """
        Wait if necessary to stay within rate limits
        """
        now = time.time()
        # Remove requests older than our time window
        while self.requests and self.requests[0] < now - self.time_window:
            self.requests.popleft()
        # If we're at the limit, wait until the oldest request expires
        if len(self.requests) >= self.max_requests:
            sleep_time = self.requests[0] + self.time_window - now
            if sleep_time > 0:
                print(f"Rate limit reached. Sleeping for {sleep_time:.2f} seconds...")
                time.sleep(sleep_time)
            # Recheck after sleeping. Returning here matters: the recursive
            # call records the request with a fresh timestamp, so we must not
            # fall through and append the stale one as well.
            return self.wait_if_needed()
        # Record this request
        self.requests.append(now)

# Usage example
rate_limiter = RateLimiter(max_requests=60, time_window=60)  # 60 requests per minute

def make_rate_limited_request(endpoint, params=None):
    """
    Make an API request with rate limiting
    """
    rate_limiter.wait_if_needed()
    return make_api_request(endpoint, params)
```
Many APIs include rate limiting information in their response headers. Here's how to use this information:
```python
def make_smart_api_request(endpoint, params=None):
    """
    Make an API request and respect rate limiting headers
    """
    url = f"{base_url}/{endpoint}"
    try:
        response = requests.get(url, params=params)
        # Check for rate limiting headers (common patterns)
        remaining = response.headers.get('X-RateLimit-Remaining')
        reset_time = response.headers.get('X-RateLimit-Reset')
        retry_after = response.headers.get('Retry-After')

        if response.status_code == 429:  # Too Many Requests
            if retry_after:
                wait_time = int(retry_after)
                print(f"Rate limited. Waiting {wait_time} seconds...")
                time.sleep(wait_time)
                return make_smart_api_request(endpoint, params)  # Retry
            else:
                print("Rate limited but no retry time provided")
                return None

        if remaining and int(remaining) < 5:
            print(f"Low rate limit remaining: {remaining}")
            # Maybe slow down or implement backoff
            time.sleep(1)

        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        return None
```
Now let's combine everything into a production-ready API client that handles pagination, rate limiting, and common error scenarios:
```python
import requests
import time
import json
from typing import List, Dict, Any, Optional
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class APIClient:
    """
    A robust API client with pagination and rate limiting
    """
    def __init__(self, base_url: str, rate_limit: int = 60,
                 time_window: int = 60, max_retries: int = 3):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        self.rate_limiter = RateLimiter(rate_limit, time_window)
        self.max_retries = max_retries

    def set_auth(self, auth_type: str, **kwargs):
        """
        Set up authentication for the API
        """
        if auth_type == 'bearer':
            token = kwargs.get('token')
            self.session.headers.update({'Authorization': f'Bearer {token}'})
        elif auth_type == 'api_key':
            key = kwargs.get('key')
            header = kwargs.get('header', 'X-API-Key')
            self.session.headers.update({header: key})
        elif auth_type == 'basic':
            username = kwargs.get('username')
            password = kwargs.get('password')
            self.session.auth = (username, password)

    def make_request(self, endpoint: str, params: Optional[Dict] = None,
                     method: str = 'GET') -> Optional[Dict]:
        """
        Make a single API request with error handling and retries
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        for attempt in range(self.max_retries + 1):
            try:
                self.rate_limiter.wait_if_needed()
                response = self.session.request(method, url, params=params)

                # Handle rate limiting
                if response.status_code == 429:
                    retry_after = int(response.headers.get('Retry-After', 60))
                    logger.warning(f"Rate limited. Waiting {retry_after} seconds...")
                    time.sleep(retry_after)
                    continue

                # Handle server errors with exponential backoff
                if response.status_code >= 500:
                    if attempt < self.max_retries:
                        wait_time = 2 ** attempt
                        logger.warning(f"Server error {response.status_code}. "
                                       f"Retrying in {wait_time} seconds...")
                        time.sleep(wait_time)
                        continue

                response.raise_for_status()
                return response.json()
            except requests.exceptions.ConnectionError:
                if attempt < self.max_retries:
                    wait_time = 2 ** attempt
                    logger.warning(f"Connection error. Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                logger.error("Max retries exceeded for connection errors")
                return None
            except requests.exceptions.RequestException as e:
                logger.error(f"Request failed: {e}")
                return None
        return None

    def fetch_all_paginated(self, endpoint: str, pagination_style: str = 'offset',
                            page_size: int = 100,
                            limit_param: str = 'limit', offset_param: str = 'offset',
                            page_param: str = 'page', per_page_param: str = 'per_page',
                            **kwargs) -> List[Dict]:
        """
        Fetch all data from a paginated endpoint.
        The query parameter names are configurable because they vary between APIs.
        """
        all_data = []
        if pagination_style == 'offset':
            offset = 0
            while True:
                params = {
                    limit_param: page_size,
                    offset_param: offset,
                    **kwargs
                }
                logger.info(f"Fetching {endpoint} with offset {offset}")
                response = self.make_request(endpoint, params)
                if not response:
                    break
                # Handle different response formats
                data = self._extract_data(response)
                if not data:
                    break
                all_data.extend(data)
                if len(data) < page_size:
                    break
                offset += page_size
        elif pagination_style == 'page':
            page = 1
            while True:
                params = {
                    per_page_param: page_size,
                    page_param: page,
                    **kwargs
                }
                logger.info(f"Fetching {endpoint} page {page}")
                response = self.make_request(endpoint, params)
                if not response:
                    break
                data = self._extract_data(response)
                if not data:
                    break
                all_data.extend(data)
                if len(data) < page_size:
                    break
                page += 1
        logger.info(f"Retrieved {len(all_data)} total records from {endpoint}")
        return all_data

    def _extract_data(self, response: Dict) -> List[Dict]:
        """
        Extract data from API response, handling different formats
        """
        # If response is already a list
        if isinstance(response, list):
            return response
        # Common data keys in API responses
        for key in ['data', 'results', 'items', 'records']:
            if key in response and isinstance(response[key], list):
                return response[key]
        # If no standard key found, return the whole response as a single item
        return [response] if response else []

# Example usage
def main():
    # Initialize the client
    client = APIClient("https://jsonplaceholder.typicode.com", rate_limit=10, time_window=60)
    # If using a real API with authentication:
    # client.set_auth('bearer', token='your-api-token')

    # Fetch all posts using pagination
    # (JSONPlaceholder names the offset parameters _limit and _start)
    all_posts = client.fetch_all_paginated('posts', pagination_style='offset',
                                           page_size=20,
                                           limit_param='_limit', offset_param='_start')
    print(f"Successfully retrieved {len(all_posts)} posts")

    # Get specific data with filters
    user_posts = client.fetch_all_paginated('posts', pagination_style='offset',
                                            page_size=10,
                                            limit_param='_limit', offset_param='_start',
                                            userId=1)
    print(f"User 1 has {len(user_posts)} posts")

if __name__ == "__main__":
    main()
```
Let's put your new skills to work with a practical exercise. You'll build a data pipeline that fetches user and post data from an API, handling all the complexities we've covered.
Scenario: Your marketing team needs a daily report of user engagement metrics. You need to fetch all users and their posts, then calculate engagement statistics.
```python
import pandas as pd
from datetime import datetime

def build_engagement_report():
    """
    Build a comprehensive engagement report using API data
    """
    client = APIClient("https://jsonplaceholder.typicode.com")

    # JSONPlaceholder names the offset parameters _limit and _start
    print("Step 1: Fetching all users...")
    users = client.fetch_all_paginated('users', pagination_style='offset',
                                       limit_param='_limit', offset_param='_start')
    print("Step 2: Fetching all posts...")
    posts = client.fetch_all_paginated('posts', pagination_style='offset',
                                       limit_param='_limit', offset_param='_start')
    print("Step 3: Fetching comments for engagement metrics...")
    comments = client.fetch_all_paginated('comments', pagination_style='offset',
                                          limit_param='_limit', offset_param='_start')

    # Convert to DataFrames for analysis
    users_df = pd.DataFrame(users)
    posts_df = pd.DataFrame(posts)
    comments_df = pd.DataFrame(comments)

    print("Step 4: Calculating engagement metrics...")
    # Comments per post
    comments_per_post = comments_df.groupby('postId').size().reset_index(name='comment_count')
    posts_with_comments = posts_df.merge(comments_per_post, left_on='id',
                                         right_on='postId', how='left')
    posts_with_comments['comment_count'] = posts_with_comments['comment_count'].fillna(0)

    # Aggregate by user
    user_engagement = posts_with_comments.groupby('userId').agg({
        'id': 'count',           # number of posts
        'comment_count': 'sum'   # total comments received
    }).reset_index()
    user_engagement.columns = ['userId', 'total_posts', 'total_comments']

    # Merge with user info
    final_report = users_df[['id', 'name', 'email', 'company']].merge(
        user_engagement, left_on='id', right_on='userId', how='left'
    )
    final_report['avg_comments_per_post'] = final_report['total_comments'] / final_report['total_posts']
    final_report = final_report.fillna(0)

    print("Step 5: Saving report...")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"engagement_report_{timestamp}.csv"
    final_report.to_csv(filename, index=False)
    print(f"Report saved to {filename}")

    print("\nTop 5 most engaged users:")
    print(final_report.nlargest(5, 'total_comments')[['name', 'total_posts', 'total_comments']])
    return final_report

# Run the exercise
if __name__ == "__main__":
    report = build_engagement_report()
```
Your task:

1. Run the pipeline and inspect the generated CSV report.
2. Add error handling for the case where any of the API calls returns no data.
3. Extend the report with one additional metric of your choice (for example, average post title length per user).
Mistake 1: Not handling empty responses

```python
# Wrong: Assumes response always has data
def bad_pagination(endpoint):
    all_data = []
    page = 1
    while True:
        data = make_api_request(f"{endpoint}?page={page}")
        all_data.extend(data)  # Will crash if data is None
        page += 1

# Right: Check for empty/None responses
def good_pagination(endpoint):
    all_data = []
    page = 1
    while True:
        data = make_api_request(f"{endpoint}?page={page}")
        if not data or len(data) == 0:  # Handle None and empty list
            break
        all_data.extend(data)
        page += 1
    return all_data
```
Mistake 2: Ignoring API documentation pagination details

Always read the API docs carefully. Some APIs:

- Use different parameter names (limit vs per_page, offset vs skip)
- Cap the maximum page size regardless of what you request
- Include pagination metadata (total counts, next-page links) in their responses

Mistake 3: Not implementing exponential backoff

When you get rate limited or server errors, don't just retry immediately:
```python
# Wrong: Immediate retry
for attempt in range(3):
    response = make_request()
    if response.status_code == 500:
        continue  # Try again immediately

# Right: Exponential backoff
for attempt in range(3):
    response = make_request()
    if response.status_code == 500:
        wait_time = 2 ** attempt  # 1s, 2s, 4s
        time.sleep(wait_time)
        continue
```
Troubleshooting checklist:

- Check the HTTP status code: 401/403 usually mean an authentication problem, 429 means rate limiting, and 5xx means a server-side issue
- Print the response headers; many APIs report rate limit status there
- Verify your pagination parameter names against the API documentation
- Log the full URL and parameters of failing requests so you can reproduce them
Warning: Never hardcode API keys in your source code. Use environment variables or configuration files that aren't committed to version control. Most API breaches happen because credentials were accidentally exposed in code repositories.
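A minimal pattern for this, assuming you export the variable in your shell or deployment environment (the variable name `MY_API_KEY` here is arbitrary):

```python
import os

def load_api_key(var_name="MY_API_KEY"):
    """Fetch a credential from the environment; fail loudly if it's missing."""
    key = os.environ.get(var_name)
    if key is None:
        raise RuntimeError(f"{var_name} is not set - refusing to run without credentials")
    return key

# Set the variable outside your code, e.g.:  export MY_API_KEY="..."
# Then build auth headers from it at runtime:
# headers = {"Authorization": f"Bearer {load_api_key()}"}
```

Failing fast when the variable is missing is deliberate: a pipeline that silently runs unauthenticated produces confusing partial failures later.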
You now have a solid foundation for working with REST APIs in your data pipelines. You understand how to handle pagination patterns, implement rate limiting, and build robust clients that can recover from common errors. These skills will serve you well whether you're integrating with major platforms like Salesforce and HubSpot or working with internal company APIs.
Key takeaways:

- Most APIs return data in chunks; know which pagination pattern (offset, cursor, or page-based) you're dealing with before you write the loop
- Respect rate limits with client-side throttling and exponential backoff
- Read rate limiting headers (X-RateLimit-Remaining, Retry-After) and react to them
- Build retries and error handling into a reusable client rather than scattering them through your pipeline
- Keep credentials out of source code
Next steps to deepen your API skills:

- Practice against a real authenticated API and wire up the `set_auth` patterns shown above
- Find an API that supports cursor-based pagination and adapt the cursor example to it
- Add caching or incremental loading so repeated pipeline runs only fetch new data
The data landscape is increasingly API-driven. Master these fundamentals, and you'll be prepared to integrate virtually any data source into your pipelines reliably and efficiently.
Learning Path: Data Pipeline Fundamentals