Working with APIs: REST, Pagination, and Rate Limiting for Data Engineers

Data Engineering · 🌱 Foundation · 13 min read · Apr 4, 2026 · Updated Apr 4, 2026
Table of Contents
  • Prerequisites
  • Understanding REST APIs: Your Gateway to External Data
  • The Pagination Challenge: Getting All Your Data
  • Offset-Based Pagination
  • Cursor-Based Pagination
  • Page Number Pagination
  • Rate Limiting: Playing Nice with API Providers
  • Implementing Basic Rate Limiting
  • Respecting API Response Headers
  • Building a Robust API Client
  • Hands-On Exercise
  • Common Mistakes & Troubleshooting
  • Summary & Next Steps

You're building a dashboard that shows daily sales metrics from your company's e-commerce platform. The platform provides an API to access this data, but when you make your first request, you only get 50 records back — not the thousands you expected. When you try to get more data by making rapid successive calls, the API starts returning error messages about "rate limits exceeded." Welcome to the real world of API integration.

Working with APIs is a fundamental skill for data professionals. Whether you're pulling customer data from a CRM, fetching financial metrics from a payment processor, or accessing social media analytics, you'll need to understand how to properly request data, handle large datasets that come in chunks, and respect the API's usage limits. These aren't just technical details — they're essential for building reliable data pipelines that won't break in production.

What you'll learn:

  • How REST APIs work and why they're the standard for data access
  • How to handle pagination to retrieve large datasets completely
  • How to implement rate limiting to avoid API blocks and maintain good relationships with data providers
  • How to build robust API clients that handle errors gracefully
  • Best practices for API authentication and data retrieval in production environments

Prerequisites

You should be comfortable with basic Python programming, including functions, loops, and error handling. Familiarity with JSON data format is helpful but not required — we'll cover what you need to know.

Understanding REST APIs: Your Gateway to External Data

REST (Representational State Transfer) is an architectural style for building web APIs that has become the de facto standard for data access. Think of a REST API like a digital librarian: you make specific requests for information, and it returns exactly what you asked for in a standardized format.

REST APIs use HTTP methods to define actions:

  • GET: Retrieve data (like asking the librarian for a book)
  • POST: Create new data (like donating a new book)
  • PUT: Update existing data, usually by replacing the whole record (like revising a book)
  • DELETE: Remove data (like removing a book from circulation)
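
To make the mapping concrete, here is a minimal sketch that builds one request per method without sending anything. It uses only the standard library; the host `api.example.com`, the `/books` paths, and the `build_request` helper are hypothetical names chosen to match the librarian analogy.

```python
import json
from urllib.request import Request

BASE_URL = "https://api.example.com"  # hypothetical host; nothing is sent


def build_request(method, path, payload=None):
    """Build (but do not send) an HTTP request for the given method."""
    body = json.dumps(payload).encode("utf-8") if payload is not None else None
    headers = {"Content-Type": "application/json"} if body else {}
    return Request(f"{BASE_URL}{path}", data=body, headers=headers, method=method)


requests_to_show = [
    build_request("GET", "/books/42"),                               # retrieve
    build_request("POST", "/books", {"title": "Dune"}),              # create
    build_request("PUT", "/books/42", {"title": "Dune (revised)"}),  # update
    build_request("DELETE", "/books/42"),                            # remove
]

for req in requests_to_show:
    print(req.get_method(), req.full_url, req.data)
```

Only POST and PUT carry a body here; GET and DELETE identify the record purely through the URL.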

For data engineering, you'll primarily use GET requests to retrieve data. Let's start with a practical example using a real API.

import requests
import json
from time import sleep
from datetime import datetime

# JSONPlaceholder is a free public API that serves fake posts, users, and comments
base_url = "https://jsonplaceholder.typicode.com"

def make_api_request(endpoint, params=None):
    """
    Make a basic API request with error handling
    """
    url = f"{base_url}/{endpoint}"
    
    try:
        # Without a timeout, a stalled connection can hang the script indefinitely
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()  # Raises an exception for HTTP error codes
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        return None

# Get all posts (this represents your data records)
posts = make_api_request("posts")
if posts:
    print(f"Retrieved {len(posts)} posts")
    print("First post:", json.dumps(posts[0], indent=2))

This basic example shows the foundation of API work: making requests, handling errors, and parsing JSON responses. But real-world APIs have complications that this simple example doesn't address.

The Pagination Challenge: Getting All Your Data

Most APIs don't return all available data in a single request. Imagine if Gmail tried to load every email you've ever received on the first page load — it would be slow and overwhelming. Instead, APIs use pagination to break large datasets into manageable chunks.

There are several pagination patterns you'll encounter:

Offset-Based Pagination

This is like opening a book at a given position: you tell the API how many records to skip (the offset) and how many to return (the limit).

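def fetch_paginated_data_offset(endpoint, page_size=20,
                                limit_param='_limit', offset_param='_start'):
    """
    Fetch all data using offset-based pagination.

    Parameter names vary by API: 'limit' and 'offset' are common, while
    JSONPlaceholder (used here) expects '_limit' and '_start'. An API that
    ignores unknown parameters returns the full dataset on every call,
    which would keep this loop running forever.
    """
    all_data = []
    offset = 0
    
    while True:
        params = {
            limit_param: page_size,
            offset_param: offset
        }
        
        print(f"Fetching data with offset {offset}...")
        response = make_api_request(endpoint, params)
        
        # Stop on a failed request (None) or an empty page
        if not response:
            break
            
        all_data.extend(response)
        
        # If we got fewer items than requested, we've reached the end
        if len(response) < page_size:
            break
            
        offset += page_size
        
        # Be respectful - don't hammer the API
        sleep(0.5)
    
    return all_data

# Example usage
all_posts = fetch_paginated_data_offset("posts")
print(f"Total posts retrieved: {len(all_posts)}")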

Cursor-Based Pagination

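This approach uses a "cursor" or token to mark your position in the dataset. It's more reliable for data that changes frequently because it doesn't suffer from the "shifting data" problem that offset pagination has: if records are inserted or deleted between two requests, every later offset points at a different record, so you can skip rows or fetch the same row twice.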

def fetch_paginated_data_cursor(endpoint, page_size=20):
    """
    Fetch all data using cursor-based pagination
    Note: This is a simulated example since our test API doesn't support cursors.
    The 'cursor', 'data', and 'next_cursor' names are placeholders; real APIs differ.
    """
    all_data = []
    cursor = None
    
    while True:
        params = {'limit': page_size}
        if cursor:
            params['cursor'] = cursor
            
        print(f"Fetching data with cursor: {cursor or 'initial'}...")
        
        # In a real cursor-based API, you'd get both data and next cursor
        response = make_api_request(endpoint, params)
        
        if not response:
            break
        
        # A bare list carries no cursor, so there is nothing more to follow
        if isinstance(response, list):
            all_data.extend(response)
            break
            
        # Extract the actual data (this varies by API)
        data = response.get('data', [])
        if not data:
            break
            
        all_data.extend(data)
        
        # Get the next cursor from the response
        cursor = response.get('next_cursor')
        if not cursor:
            break
            
        sleep(0.5)
    
    return all_data
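
To see why cursors cope better with changing data, the following sketch simulates both styles against an in-memory, newest-first list. No real API is involved; `offset_page` and `cursor_page` are hypothetical helpers, and the cursor is assumed to be the id of the last record seen.

```python
def offset_page(rows, offset, limit):
    """Offset pagination: skip `offset` rows, return the next `limit`."""
    return rows[offset:offset + limit]


def cursor_page(rows, after_id, limit):
    """Cursor pagination: return rows that come after the last id we saw."""
    eligible = [r for r in rows if after_id is None or r["id"] < after_id]
    return eligible[:limit]


# Offset style: a new record arrives between the two requests
rows = [{"id": i} for i in (5, 4, 3, 2, 1)]   # newest first
page1 = offset_page(rows, 0, 2)
rows.insert(0, {"id": 6})                      # every later row shifts down by one
page2 = offset_page(rows, 2, 2)
offset_ids = [r["id"] for r in page1 + page2]

# Cursor style: same scenario, but we resume from the last id we saw
rows = [{"id": i} for i in (5, 4, 3, 2, 1)]
page1 = cursor_page(rows, None, 2)
rows.insert(0, {"id": 6})
page2 = cursor_page(rows, page1[-1]["id"], 2)
cursor_ids = [r["id"] for r in page1 + page2]

print("offset:", offset_ids)   # record 4 appears twice
print("cursor:", cursor_ids)   # no duplicates
```

With offsets, the insert pushes record 4 onto the second page, so it is fetched twice. The cursor version resumes from a record rather than a position, so the insert does not disturb it.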

Page Number Pagination

Some APIs use traditional page numbers. Here's how to handle that pattern:

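def fetch_paginated_data_pages(endpoint, page_size=20,
                               page_param='page', size_param='per_page'):
    """
    Fetch all data using page number pagination.

    Parameter names vary by API. The defaults here are a common convention;
    JSONPlaceholder, for example, expects '_page' and '_limit' instead.
    """
    all_data = []
    page = 1  # Some APIs start numbering at 0
    
    while True:
        params = {
            size_param: page_size,
            page_param: page
        }
        
        print(f"Fetching page {page}...")
        response = make_api_request(endpoint, params)
        
        # Stop on a failed request (None) or an empty page
        if not response:
            break
            
        all_data.extend(response)
        
        # If we got fewer items than requested, we've reached the end
        if len(response) < page_size:
            break
            
        page += 1
        sleep(0.5)
    
    return all_data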

Pro Tip: Always check the API documentation for pagination details. Some APIs include metadata in their responses that tells you the total number of records, current page, and whether there are more pages available. This information can make your pagination logic more efficient and reliable.
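
As a rough sketch of what metadata-driven pagination can look like, the loop below follows a `has_more` flag and checks the final count against a reported `total`. The response shape (`data`, `meta`, `has_more`, `total`) is an assumption for illustration, and `fake_fetch_page` stands in for a real HTTP call.

```python
def fake_fetch_page(page, per_page=3, _records=tuple(range(1, 8))):
    """Stand-in for an HTTP call; returns one page plus hypothetical metadata."""
    start = (page - 1) * per_page
    chunk = list(_records[start:start + per_page])
    return {
        "data": chunk,
        "meta": {
            "page": page,
            "total": len(_records),
            "has_more": start + per_page < len(_records),
        },
    }


def fetch_all_with_metadata(fetch_page):
    """Follow `has_more`, then verify the result against `total`."""
    records, page = [], 1
    while True:
        response = fetch_page(page)
        records.extend(response["data"])
        meta = response["meta"]
        if not meta["has_more"]:
            break
        page += 1
    # A mismatch usually means data changed mid-run or a page was lost
    if len(records) != meta["total"]:
        raise RuntimeError(f"Expected {meta['total']} records, got {len(records)}")
    return records


all_records = fetch_all_with_metadata(fake_fetch_page)
print(all_records)
```

Relying on an explicit flag avoids the extra request that the "fewer items than requested" heuristic needs when the last page happens to be full, and the count check turns a silently incomplete extract into a visible error.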

Rate Limiting: Playing Nice with API Providers

Rate limiting is how API providers protect their servers from being overwhelmed. Think of it like a toll booth on a highway: only so many cars can pass through per minute. Exceed the limit and the API typically rejects further requests with an HTTP 429 (Too Many Requests) response until the window resets.

Common rate limiting patterns include:

  • Requests per minute/hour: "100 requests per hour"
  • Burst limits: "10 requests per second, 1000 per hour"
  • Concurrent request limits: "No more than 3 simultaneous requests"
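
A burst limit combined with a sustained rate is often modeled as a token bucket. The sketch below is one possible client-side version, not any particular provider's algorithm; the `TokenBucket` class and its `clock` parameter (injected so the example runs without real waiting) are illustrative names.

```python
import time


class TokenBucket:
    """Allow bursts up to `capacity`, refilled at `rate` tokens per second."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)   # start with a full bucket
        self.updated = clock()

    def _refill(self):
        now = self.clock()
        earned = (now - self.updated) * self.rate
        self.tokens = min(self.capacity, self.tokens + earned)
        self.updated = now

    def try_acquire(self):
        """Take one token if available; return False when the caller should wait."""
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

    def seconds_until_ready(self):
        """How long until at least one token is available."""
        self._refill()
        return max(0.0, (1 - self.tokens) / self.rate)


# Demonstration with a fake clock so nothing actually sleeps
fake_now = [0.0]
bucket = TokenBucket(rate=2, capacity=3, clock=lambda: fake_now[0])

burst = [bucket.try_acquire() for _ in range(4)]   # the fourth call is refused
wait = bucket.seconds_until_ready()
fake_now[0] += wait                                # pretend we slept that long
after_wait = bucket.try_acquire()

print(burst, wait, after_wait)
```

Here `capacity` plays the role of the burst limit and `rate` the sustained limit. A real client would call `time.sleep(bucket.seconds_until_ready())` before retrying.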

Implementing Basic Rate Limiting

import time
from collections import deque

class RateLimiter:
    """
    A simple rate limiter using a sliding window approach
    """
    def __init__(self, max_requests, time_window):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = deque()
    
    def wait_if_needed(self):
        """
        Wait if necessary to stay within rate limits
        """
        while True:
            # monotonic() is unaffected by system clock adjustments
            now = time.monotonic()
            
            # Remove requests that have aged out of our time window
            while self.requests and self.requests[0] <= now - self.time_window:
                self.requests.popleft()
            
            if len(self.requests) < self.max_requests:
                break
            
            # At the limit: wait until the oldest request expires, then recheck
            sleep_time = self.requests[0] + self.time_window - now
            print(f"Rate limit reached. Sleeping for {sleep_time:.2f} seconds...")
            time.sleep(max(sleep_time, 0))
        
        # Record this request exactly once, with a fresh timestamp
        self.requests.append(time.monotonic())

# Usage example
rate_limiter = RateLimiter(max_requests=60, time_window=60)  # 60 requests per minute

def make_rate_limited_request(endpoint, params=None):
    """
    Make an API request with rate limiting
    """
    rate_limiter.wait_if_needed()
    return make_api_request(endpoint, params)

Respecting API Response Headers

Many APIs include rate limiting information in their response headers. Here's how to use this information:

def make_smart_api_request(endpoint, params=None, max_retries=3):
    """
    Make an API request and respect rate limiting headers
    """
    url = f"{base_url}/{endpoint}"
    
    # A bounded loop replaces recursion so a persistent 429 cannot retry forever
    for attempt in range(max_retries + 1):
        try:
            response = requests.get(url, params=params, timeout=30)
            
            # Check for rate limiting headers (common patterns)
            remaining = response.headers.get('X-RateLimit-Remaining')
            reset_time = response.headers.get('X-RateLimit-Reset')
            retry_after = response.headers.get('Retry-After')
            
            if response.status_code == 429:  # Too Many Requests
                # Retry-After can also be an HTTP date; only plain seconds are handled here
                if retry_after and retry_after.isdigit() and attempt < max_retries:
                    wait_time = int(retry_after)
                    print(f"Rate limited. Waiting {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue  # Retry
                print("Rate limited with no usable retry time, or retries exhausted")
                return None
            
            if remaining and remaining.isdigit() and int(remaining) < 5:
                print(f"Low rate limit remaining: {remaining} (resets at: {reset_time})")
                # Maybe slow down or implement backoff
                time.sleep(1)
            
            response.raise_for_status()
            return response.json()
            
        except requests.exceptions.RequestException as e:
            print(f"API request failed: {e}")
            return None
    
    return None
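
The `Retry-After` header can hold either a number of seconds or an HTTP date, so converting it with `int()` alone can fail. Below is a small standard-library sketch that handles both forms; `parse_retry_after` and its 60-second fallback are illustrative choices, not part of any library.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


def parse_retry_after(value, now=None, default=60.0):
    """Convert a Retry-After header value into seconds to wait."""
    if value is None:
        return default
    value = value.strip()
    if value.isdigit():                      # form 1: delay in seconds
        return float(value)
    try:
        retry_at = parsedate_to_datetime(value)   # form 2: HTTP date
    except (TypeError, ValueError):
        return default                       # unparseable: fall back
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (retry_at - now).total_seconds())


# Example with a fixed "current time" so the result is reproducible
fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(parse_retry_after("120"))
print(parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now))
print(parse_retry_after("not a valid value"))
```

The `now` parameter exists only to make the date branch testable; in normal use you would omit it.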

Building a Robust API Client

Now let's combine everything into a production-ready API client that handles pagination, rate limiting, and common error scenarios:

import requests
import time
import json
from typing import List, Dict, Any, Optional
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class APIClient:
    """
    A robust API client with pagination and rate limiting
    """
    
    def __init__(self, base_url: str, rate_limit: int = 60, 
                 time_window: int = 60, max_retries: int = 3):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        self.rate_limiter = RateLimiter(rate_limit, time_window)
        self.max_retries = max_retries
    
    def set_auth(self, auth_type: str, **kwargs):
        """
        Set up authentication for the API
        """
        if auth_type == 'bearer':
            token = kwargs.get('token')
            self.session.headers.update({'Authorization': f'Bearer {token}'})
        elif auth_type == 'api_key':
            key = kwargs.get('key')
            header = kwargs.get('header', 'X-API-Key')
            self.session.headers.update({header: key})
        elif auth_type == 'basic':
            username = kwargs.get('username')
            password = kwargs.get('password')
            self.session.auth = (username, password)
    
    def make_request(self, endpoint: str, params: Optional[Dict] = None, 
                    method: str = 'GET') -> Optional[Any]:
        """
        Make a single API request with error handling and retries
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        
        for attempt in range(self.max_retries + 1):
            try:
                self.rate_limiter.wait_if_needed()
                
                # A timeout keeps a stalled connection from hanging the pipeline
                response = self.session.request(method, url, params=params, timeout=30)
                
                # Handle rate limiting
                if response.status_code == 429:
                    if attempt == self.max_retries:
                        logger.error("Still rate limited after max retries")
                        return None
                    # Retry-After may be seconds or an HTTP date; fall back to 60s otherwise
                    retry_after = response.headers.get('Retry-After', '')
                    wait_time = int(retry_after) if retry_after.isdigit() else 60
                    logger.warning(f"Rate limited. Waiting {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                
                # Handle server errors with exponential backoff
                if response.status_code >= 500 and attempt < self.max_retries:
                    wait_time = 2 ** attempt
                    logger.warning(f"Server error {response.status_code}. "
                                 f"Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                
                # Remaining 4xx/5xx responses raise HTTPError, handled below
                response.raise_for_status()
                return response.json()
                
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
                if attempt < self.max_retries:
                    wait_time = 2 ** attempt
                    logger.warning(f"Connection error or timeout. Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                    continue
                logger.error("Max retries exceeded for connection errors")
                return None
                
            except requests.exceptions.RequestException as e:
                logger.error(f"Request failed: {e}")
                return None
        
        return None
    
    def fetch_all_paginated(self, endpoint: str, pagination_style: str = 'offset',
                          page_size: int = 100, limit_param: Optional[str] = None,
                          position_param: Optional[str] = None, **kwargs) -> List[Dict]:
        """
        Fetch all data from a paginated endpoint.

        Parameter names vary by API. The defaults are limit/offset for the
        'offset' style and per_page/page for the 'page' style; override them
        with limit_param and position_param. Any other keyword arguments are
        sent as query filters.
        """
        if pagination_style == 'offset':
            limit_param = limit_param or 'limit'
            position_param = position_param or 'offset'
            position, step = 0, page_size
        elif pagination_style == 'page':
            limit_param = limit_param or 'per_page'
            position_param = position_param or 'page'
            position, step = 1, 1
        else:
            raise ValueError(f"Unsupported pagination style: {pagination_style}")
        
        all_data = []
        previous_page = None
        
        while True:
            params = {**kwargs, limit_param: page_size, position_param: position}
            
            logger.info(f"Fetching {endpoint} with {position_param}={position}")
            response = self.make_request(endpoint, params)
            
            if response is None:
                logger.warning(f"Request failed; {endpoint} results may be incomplete")
                break
            
            # Handle different response formats
            data = self._extract_data(response)
            if not data:
                break
            
            # An API that ignores our pagination parameters returns the same
            # records every time; stop instead of looping forever
            if data == previous_page:
                logger.warning("Received the same page twice; check the pagination "
                               "parameter names for this API")
                break
            
            all_data.extend(data)
            previous_page = data
            
            if len(data) < page_size:
                break
            
            position += step
        
        logger.info(f"Retrieved {len(all_data)} total records from {endpoint}")
        return all_data
    
    def _extract_data(self, response: Any) -> List[Dict]:
        """
        Extract data from API response, handling different formats
        """
        # If response is already a list
        if isinstance(response, list):
            return response
        
        # Common data keys in API responses
        for key in ['data', 'results', 'items', 'records']:
            if key in response and isinstance(response[key], list):
                return response[key]
        
        # If no standard key found, return the whole response as single item
        return [response] if response else []

# Example usage
def main():
    # Initialize the client
    client = APIClient("https://jsonplaceholder.typicode.com", rate_limit=10, time_window=60)
    
    # If using a real API with authentication:
    # client.set_auth('bearer', token='your-api-token')
    
    # Fetch all posts using pagination.
    # JSONPlaceholder names its offset parameters _start and _limit.
    all_posts = client.fetch_all_paginated('posts', pagination_style='offset', page_size=20,
                                           limit_param='_limit', position_param='_start')
    
    print(f"Successfully retrieved {len(all_posts)} posts")
    
    # Get specific data with filters
    user_posts = client.fetch_all_paginated('posts', pagination_style='offset', 
                                           page_size=10, limit_param='_limit',
                                           position_param='_start', userId=1)
    
    print(f"User 1 has {len(user_posts)} posts")

if __name__ == "__main__":
    main()

Hands-On Exercise

Let's put your new skills to work with a practical exercise. You'll build a data pipeline that fetches user and post data from an API, handling all the complexities we've covered.

Scenario: Your marketing team needs a daily report of user engagement metrics. You need to fetch all users and their posts, then calculate engagement statistics.

import pandas as pd
from datetime import datetime
import json

def build_engagement_report():
    """
    Build a comprehensive engagement report using API data
    """
    client = APIClient("https://jsonplaceholder.typicode.com")
    
    print("Step 1: Fetching all users...")
    users = client.fetch_all_paginated('users', pagination_style='offset')
    
    print("Step 2: Fetching all posts...")
    posts = client.fetch_all_paginated('posts', pagination_style='offset')
    
    print("Step 3: Fetching comments for engagement metrics...")
    comments = client.fetch_all_paginated('comments', pagination_style='offset')
    
    # Convert to DataFrames for analysis
    users_df = pd.DataFrame(users)
    posts_df = pd.DataFrame(posts)
    comments_df = pd.DataFrame(comments)
    
    print("Step 4: Calculating engagement metrics...")
    
    # Posts per user
    posts_per_user = posts_df.groupby('userId').size().reset_index(name='post_count')
    
    # Comments per post
    comments_per_post = comments_df.groupby('postId').size().reset_index(name='comment_count')
    posts_with_comments = posts_df.merge(comments_per_post, left_on='id', 
                                       right_on='postId', how='left')
    posts_with_comments['comment_count'] = posts_with_comments['comment_count'].fillna(0)
    
    # Aggregate by user
    user_engagement = posts_with_comments.groupby('userId').agg({
        'id': 'count',  # number of posts
        'comment_count': 'sum'  # total comments received
    }).reset_index()
    
    user_engagement.columns = ['userId', 'total_posts', 'total_comments']
    
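    # Merge with user info. The API returns company as a nested object,
    # so keep only its name to get a clean CSV column.
    users_df['company'] = users_df['company'].apply(
        lambda c: c.get('name') if isinstance(c, dict) else c
    )
    final_report = users_df[['id', 'name', 'email', 'company']].merge(
        user_engagement, left_on='id', right_on='userId', how='left'
    )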
    
    final_report['avg_comments_per_post'] = final_report['total_comments'] / final_report['total_posts']
    final_report = final_report.fillna(0)
    
    print("Step 5: Saving report...")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"engagement_report_{timestamp}.csv"
    final_report.to_csv(filename, index=False)
    
    print(f"Report saved to {filename}")
    print("\nTop 5 most engaged users:")
    print(final_report.nlargest(5, 'total_comments')[['name', 'total_posts', 'total_comments']])
    
    return final_report

# Run the exercise
if __name__ == "__main__":
    report = build_engagement_report()

Your task:

  1. Run this code and examine the output
  2. Modify the rate limiter to be stricter (try 5 requests per minute) and observe how the client pauses between requests
  3. Add error handling for network timeouts
  4. Extend the report to include additional metrics like average post length

Common Mistakes & Troubleshooting

Mistake 1: Not handling empty responses

# Wrong: Assumes response always has data
# ('_page' is JSONPlaceholder's parameter name; many APIs use 'page')
def bad_pagination(endpoint):
    all_data = []
    page = 1
    while True:
        data = make_api_request(endpoint, {'_page': page})
        all_data.extend(data)  # Will crash if data is None, and never stops on an empty page
        page += 1

# Right: Check for empty/None responses
def good_pagination(endpoint):
    all_data = []
    page = 1
    while True:
        data = make_api_request(endpoint, {'_page': page})
        if not data:  # Handles both None and an empty list
            break
        all_data.extend(data)
        page += 1
    return all_data

Mistake 2: Ignoring API documentation pagination details

Always read the API docs carefully. Some APIs:

  • Start page numbering at 0, others at 1
  • Include total count information you can use
  • Have different parameter names (limit vs per_page, offset vs skip)

Mistake 3: Not implementing exponential backoff

When you get rate limited or hit server errors, don't just retry immediately:

# make_request() stands in for any call that returns a response object

# Wrong: Immediate retry
for attempt in range(3):
    response = make_request()
    if response.status_code >= 500:
        continue  # Try again immediately
    break

# Right: Exponential backoff
for attempt in range(3):
    response = make_request()
    if response.status_code >= 500:
        wait_time = 2 ** attempt  # 1s, 2s, 4s
        time.sleep(wait_time)
        continue
    break  # Success, or an error that retrying won't fix
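
When many workers back off on the same schedule, they tend to retry at the same moments. A common refinement is to add random jitter to each delay. The sketch below shows one variant ("full jitter") under assumed names: `backoff_delay` and `call_with_retries` are hypothetical helpers, and the built-in `ConnectionError` stands in for whatever retryable error your HTTP library raises.

```python
import random
import time


def backoff_delay(attempt, base=1.0, cap=30.0):
    """Pick a random delay between 0 and min(cap, base * 2**attempt) seconds."""
    ceiling = min(cap, base * 2 ** attempt)
    return random.uniform(0, ceiling)


def call_with_retries(operation, max_retries=3, sleep=time.sleep):
    """Call operation() until it succeeds or retries are exhausted."""
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except ConnectionError:
            if attempt == max_retries:
                raise                      # out of retries: surface the error
            sleep(backoff_delay(attempt))


# Demonstration: an operation that fails twice, then succeeds
calls = {"count": 0}


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


waits = []                                  # record delays instead of sleeping
result = call_with_retries(flaky, sleep=waits.append)
print(result, len(waits))
```

Passing `sleep=waits.append` keeps the demonstration instant; in a pipeline you would leave the default `time.sleep`.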

Troubleshooting checklist:

  • Check your API credentials and permissions
  • Verify the endpoint URL is correct
  • Look for rate limiting headers in responses
  • Monitor your request patterns — are you making too many requests too quickly?
  • Check whether the API has changed its pagination format
  • Always log API responses during development to understand the data structure

Warning: Never hardcode API keys in your source code. Use environment variables or configuration files that aren't committed to version control. Credentials accidentally committed to code repositories are a common cause of API breaches.
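
A minimal sketch of the environment-variable approach follows. The variable name `EXAMPLE_API_KEY` and both helper functions are placeholders; use whatever name and auth scheme your provider documents.

```python
import os


def load_api_key(var_name="EXAMPLE_API_KEY"):
    """Read the API key from the environment, failing loudly if it is missing."""
    key = os.environ.get(var_name)
    if not key:
        raise RuntimeError(f"Set the {var_name} environment variable before running")
    return key


def auth_headers(api_key):
    """Build a bearer-token header; some APIs expect a different scheme."""
    return {"Authorization": f"Bearer {api_key}"}
```

In a shell you would set the variable before launching the script (for example `export EXAMPLE_API_KEY=...`), then call `auth_headers(load_api_key())` when building requests. Failing at startup is usually easier to debug than a 401 response halfway through a pipeline run.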

Summary & Next Steps

You now have a solid foundation for working with REST APIs in your data pipelines. You understand how to handle pagination patterns, implement rate limiting, and build robust clients that can recover from common errors. These skills will serve you well whether you're integrating with major platforms like Salesforce and HubSpot or working with internal company APIs.

Key takeaways:

  • Always implement pagination when fetching large datasets
  • Respect rate limits to maintain good relationships with API providers
  • Build in retry logic with exponential backoff for resilient pipelines
  • Use proper authentication and never expose API keys
  • Log your API interactions for debugging and monitoring

Next steps to deepen your API skills:

  1. Learn about webhook APIs for real-time data processing
  2. Explore GraphQL APIs, which are becoming more common for flexible data queries
  3. Study API authentication patterns beyond basic tokens (OAuth 2.0, JWT)
  4. Practice with async/await patterns for concurrent API requests
  5. Learn to monitor and alert on API pipeline health

The data landscape is increasingly API-driven. Master these fundamentals, and you'll be prepared to integrate virtually any data source into your pipelines reliably and efficiently.
