Building a Rate Limiter in Python: A Practical Guide


Rate limiting is a critical technique for protecting APIs from abuse, ensuring fair resource allocation, and maintaining service stability. In this tutorial, we’ll build a token bucket rate limiter from scratch in Python.

What You’ll Learn

  • The token bucket algorithm and why it’s effective
  • Implementation patterns for rate limiting
  • Testing strategies for concurrent scenarios
  • Production considerations

Prerequisites

  • Python 3.8+
  • Basic understanding of decorators
  • Familiarity with threading concepts

The Token Bucket Algorithm

The token bucket algorithm works like this:

  1. A bucket holds tokens, with a maximum capacity
  2. Tokens are added at a fixed rate
  3. Each request consumes one (or more) tokens
  4. If the bucket holds fewer tokens than the request needs, the request is rejected

This approach allows bursts of up to the bucket’s capacity while holding the long-run average to the refill rate.
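
To make the four steps concrete, here is a small worked trace. It is only a sketch: the rate (2 tokens per second), the capacity (5), and the request timestamps are made-up values chosen so the arithmetic is easy to follow.

```python
def refill(tokens: float, elapsed: float, rate: float, capacity: float) -> float:
    """Return the token count after `elapsed` seconds of refilling, capped at capacity."""
    return min(capacity, tokens + elapsed * rate)


# Hypothetical settings chosen for round numbers: 2 tokens/second, capacity 5.
RATE, CAPACITY = 2.0, 5.0

tokens = CAPACITY   # the bucket starts full
last = 0.0          # timestamp of the previous request, in seconds
decisions = []

# Five requests arrive in a burst at t=0, then one each at t=0.25, 0.5, and 10.0.
for now in [0.0, 0.0, 0.0, 0.0, 0.0, 0.25, 0.5, 10.0]:
    tokens = refill(tokens, now - last, RATE, CAPACITY)
    last = now
    allowed = tokens >= 1
    if allowed:
        tokens -= 1
    decisions.append(allowed)

print(decisions)
# [True, True, True, True, True, False, True, True]
```

The burst of five drains the bucket, the request at t=0.25 finds only half a token and is rejected, and the request at t=0.5 succeeds because a full token has accumulated by then. The long idle gap before t=10 refills the bucket only up to its capacity of 5, not to 19.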

Basic Implementation

Let’s start with a simple in-memory rate limiter:

import time
import threading
from typing import Dict

class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        """
        rate: tokens added per second
        capacity: maximum tokens the bucket can hold
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.time()
        self.lock = threading.Lock()
    
    def _refill(self):
        """Add tokens based on time elapsed"""
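        now = time.time()
        # Clamp at zero: the wall clock can be adjusted backwards (e.g. by NTP),
        # and a negative interval would otherwise drain tokens.
        elapsed = max(0.0, now - self.last_update)
        
        # Add tokens based on elapsed time, never exceeding capacity
        tokens_to_add = elapsed * self.rate
        self.tokens = min(self.capacity, self.tokens + tokens_to_add)
        self.last_update = now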
    
    def consume(self, tokens: int = 1) -> bool:
        """
        Try to consume tokens. Returns True if successful.
        """
        with self.lock:
            self._refill()
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

Making It Reusable: A Rate Limiter Class

Now let’s create a rate limiter that manages multiple buckets (one per user/IP):

class RateLimiter:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.buckets: Dict[str, TokenBucket] = {}
        self.lock = threading.Lock()
    
    def get_bucket(self, key: str) -> TokenBucket:
        """Get or create a bucket for the given key"""
        with self.lock:
            if key not in self.buckets:
                self.buckets[key] = TokenBucket(self.rate, self.capacity)
            return self.buckets[key]
    
    def is_allowed(self, key: str, tokens: int = 1) -> bool:
        """Check if the request should be allowed"""
        bucket = self.get_bucket(key)
        return bucket.consume(tokens)
    
    def cleanup_old_buckets(self, max_idle_time: int = 3600):
        """Remove buckets that haven't been used recently"""
        with self.lock:
            now = time.time()
            keys_to_remove = [
                key for key, bucket in self.buckets.items()
                if now - bucket.last_update > max_idle_time
            ]
            for key in keys_to_remove:
                del self.buckets[key]
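
Nothing in the class calls cleanup_old_buckets on its own, so something has to invoke it periodically. One possible approach is a small daemon thread. The helper below is a sketch; `start_periodic` and the thread name are hypothetical, not part of the classes above.

```python
import threading
from typing import Callable


def start_periodic(task: Callable[[], None], interval: float) -> threading.Event:
    """Run `task` every `interval` seconds on a daemon thread.

    Returns an Event; call .set() on it to stop the loop.
    """
    stop = threading.Event()

    def loop() -> None:
        # Event.wait returns False on timeout and True once stop is set,
        # so the loop exits promptly on shutdown.
        while not stop.wait(interval):
            task()

    threading.Thread(target=loop, daemon=True, name="bucket-cleanup").start()
    return stop


# Usage sketch (assumes a RateLimiter instance named `limiter`):
#   stop = start_periodic(limiter.cleanup_old_buckets, interval=60.0)
#   ...
#   stop.set()  # on shutdown
```

If the task raises an exception the thread exits, so a real deployment would probably want to catch and log errors inside the loop.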

Flask Integration

Here’s how to use it as a Flask decorator:

from flask import Flask, request, jsonify
from functools import wraps

app = Flask(__name__)
limiter = RateLimiter(rate=10.0, capacity=20)  # 10 req/sec, burst of 20

def rate_limit(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
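        # Key by user ID when present, otherwise by client IP.
        # Caution: X-User-ID is a plain request header, so only trust it if an
        # upstream gateway sets it; otherwise clients can spoof it to dodge the limit.
        key = request.headers.get('X-User-ID') or request.remote_addr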
        
        if not limiter.is_allowed(key):
            return jsonify({
                'error': 'Rate limit exceeded',
                'retry_after': 1.0 / limiter.rate
            }), 429
        
        return func(*args, **kwargs)
    return wrapper

@app.route('/api/search')
@rate_limit
def search():
    query = request.args.get('q', '')
    # Process search...
    return jsonify({'results': []})
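
The decorator reports a fixed retry_after of 1 / rate, which is the time to earn one token from an empty bucket. If the bucket's current token count is available, the wait can be estimated from the actual deficit instead. The two helpers below are a sketch with hypothetical names; they are pure functions and do not depend on Flask.

```python
import math


def seconds_until_available(tokens: float, needed: float, rate: float) -> float:
    """Seconds until a bucket holding `tokens` has at least `needed`, refilling at `rate` per second.

    Does not account for capacity: if `needed` exceeds the bucket's capacity,
    the request can never succeed, whatever this returns.
    """
    if rate <= 0:
        raise ValueError("rate must be positive")
    deficit = needed - tokens
    return max(0.0, deficit / rate)


def retry_after_header(wait_seconds: float) -> str:
    """Format a wait as whole seconds, rounded up, since Retry-After takes an integer delay."""
    return str(max(1, math.ceil(wait_seconds)))
```

With these, a 429 response could carry a header such as {'Retry-After': retry_after_header(wait)}, which Flask accepts as the third element of the returned tuple.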

Testing Your Rate Limiter

Here are two simple pytest tests that verify the behavior:

import time

def test_token_bucket_basic():
    bucket = TokenBucket(rate=10.0, capacity=10)
    
    # Should allow up to capacity
    for _ in range(10):
        assert bucket.consume() is True
    
    # Should deny when empty
    assert bucket.consume() is False
    
    # Should refill over time
    time.sleep(0.5)  # Wait for 5 tokens to refill
    for _ in range(5):
        assert bucket.consume() is True

def test_rate_limiter_per_key():
    limiter = RateLimiter(rate=5.0, capacity=5)
    
    # Different keys should have independent buckets
    assert limiter.is_allowed('user1') is True
    assert limiter.is_allowed('user2') is True
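
The tests above run on a single thread. A concurrency test could look like the sketch below: several threads hammer one bucket, and the number of successful calls must equal the capacity exactly. To keep the snippet runnable on its own it uses `FixedBucket`, a hypothetical stand-in with the same consume() contract as TokenBucket but no refill.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FixedBucket:
    """Stand-in with the same consume() contract as TokenBucket, but no refill."""

    def __init__(self, capacity: int):
        self.tokens = capacity
        self.lock = threading.Lock()

    def consume(self, tokens: int = 1) -> bool:
        with self.lock:
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False


def count_successes(bucket, workers: int, attempts_per_worker: int) -> int:
    """Call bucket.consume() from several threads and count the True results."""
    def worker(_: int) -> int:
        return sum(bucket.consume() for _ in range(attempts_per_worker))

    with ThreadPoolExecutor(max_workers=workers) as pool:
        return sum(pool.map(worker, range(workers)))


def test_concurrent_consume_never_exceeds_capacity():
    bucket = FixedBucket(capacity=100)
    # 8 workers x 50 attempts = 400 attempts against 100 tokens.
    assert count_successes(bucket, workers=8, attempts_per_worker=50) == 100
```

To run the same check against the tutorial's class, pass TokenBucket(rate=0.0, capacity=100) instead; a zero refill rate keeps the expected count deterministic.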

Production Considerations

1. Distributed Systems

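For multi-server deployments, the in-memory buckets above are not enough: each process keeps its own counts, so N servers allow roughly N times the intended rate. One option is to keep bucket state in Redis and update it inside a Lua script, which Redis executes atomically: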

import time
import redis

class RedisRateLimiter:
    def __init__(self, redis_client, rate: float, capacity: int):
        self.redis = redis_client
        self.rate = rate
        self.capacity = capacity
    
    def is_allowed(self, key: str) -> bool:
        lua_script = """
        local key = KEYS[1]
        local rate = tonumber(ARGV[1])
        local capacity = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])
        
        local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
        local tokens = tonumber(bucket[1]) or capacity
        local last_update = tonumber(bucket[2]) or now
        
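        -- Clamp at zero in case app servers' clocks disagree
        local elapsed = math.max(0, now - last_update)
        tokens = math.min(capacity, tokens + elapsed * rate)
        
        if tokens >= 1 then
            tokens = tokens - 1
            -- HSET accepts multiple field/value pairs since Redis 4.0; HMSET is deprecated
            redis.call('HSET', key, 'tokens', tokens, 'last_update', now)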
            redis.call('EXPIRE', key, 3600)
            return 1
        end
        return 0
        """
        
        result = self.redis.eval(
            lua_script, 1, f"ratelimit:{key}",
            self.rate, self.capacity, time.time()
        )
        return bool(result)

2. Monitoring

Track rate limit metrics:

  • Requests allowed vs. denied
  • Per-user consumption patterns
  • Peak usage times
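
As a starting point, the first two metrics can be collected in-process with plain counters. The class below is a sketch; `LimiterMetrics` is a hypothetical helper, and tracking peak usage times would additionally need timestamps or an external metrics system.

```python
import threading
from collections import Counter
from typing import Dict


class LimiterMetrics:
    """Thread-safe allowed/denied counters, overall and per key (hypothetical helper)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.totals: Counter = Counter()
        self.denied_by_key: Counter = Counter()

    def record(self, key: str, allowed: bool) -> None:
        """Record the outcome of one rate limit decision."""
        with self._lock:
            self.totals["allowed" if allowed else "denied"] += 1
            if not allowed:
                self.denied_by_key[key] += 1

    def snapshot(self) -> Dict[str, int]:
        """Return a copy of the overall counts."""
        with self._lock:
            return dict(self.totals)
```

A caller would wrap each decision, for example metrics.record(key, limiter.is_allowed(key)), and periodically export snapshot() to whatever monitoring system is in use.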

3. Graceful Degradation

Consider returning Retry-After headers and implementing exponential backoff on the client side.
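
On the client side, exponential backoff might look like the sketch below. The `send` callable is hypothetical: it stands in for whatever performs the request and is assumed to return the status code plus the server's Retry-After value in seconds, or None if the header is absent. The base delay, cap, and attempt count are arbitrary defaults.

```python
import random
import time
from typing import Callable, Optional, Tuple


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Upper bound on the wait before retry `attempt` (0-based): base * 2**attempt, capped."""
    return min(cap, base * (2 ** attempt))


def call_with_backoff(
    send: Callable[[], Tuple[int, Optional[float]]],
    max_attempts: int = 5,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call `send` until it returns a non-429 status or attempts run out.

    Returns the last status code seen.
    """
    status = 429
    for attempt in range(max_attempts):
        status, retry_after = send()
        if status != 429:
            return status
        if attempt == max_attempts - 1:
            break  # no point sleeping after the final attempt
        if retry_after is not None:
            delay = retry_after  # prefer the server's hint
        else:
            # Random delay up to the exponential bound, so clients do not retry in lockstep
            delay = random.uniform(0, backoff_delay(attempt))
        sleep(delay)
    return status
```

The `sleep` parameter is injectable so the retry logic can be tested without real waiting.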

Summary

We’ve built a token bucket rate limiter from scratch, along with a Redis-backed variant for multi-server deployments. Key takeaways:

  • Token bucket provides burst tolerance with average rate control
  • Thread-safe implementation is critical
  • Redis enables distributed rate limiting
  • Testing concurrent scenarios helps catch race conditions

Next Steps

  • Explore sliding window algorithms for stricter limits
  • Implement tiered rate limits (free vs. premium users)
  • Add dynamic rate adjustment based on server load

Questions or improvements? Reach out via the contact page or open an issue on GitHub!