Building a Rate Limiter in Python: A Practical Guide
Rate limiting is a critical technique for protecting APIs from abuse, ensuring fair resource allocation, and maintaining service stability. In this tutorial, we’ll build a token bucket rate limiter from scratch in Python.
What You’ll Learn
- The token bucket algorithm and why it’s effective
- Implementation patterns for rate limiting
- Testing strategies for concurrent scenarios
- Production considerations
Prerequisites
- Python 3.8+
- Basic understanding of decorators
- Familiarity with threading concepts
The Token Bucket Algorithm
The token bucket algorithm works like this:
- A bucket holds tokens, with a maximum capacity
- Tokens are added at a fixed rate
- Each request consumes one (or more) tokens
- If no tokens are available, the request is rejected
This approach allows short bursts of traffic while still enforcing an average rate. For example, with rate=10 and capacity=20, a client that has been idle can fire off 20 requests at once, but after that it is held to a sustained 10 requests per second.
Basic Implementation
Let’s start with a simple in-memory rate limiter:
import time
import threading
from typing import Dict

class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        """
        rate: tokens added per second
        capacity: maximum tokens the bucket can hold
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.time()
        self.lock = threading.Lock()

    def _refill(self):
        """Add tokens based on time elapsed"""
        now = time.time()
        elapsed = now - self.last_update
        # Add tokens based on elapsed time
        tokens_to_add = elapsed * self.rate
        self.tokens = min(self.capacity, self.tokens + tokens_to_add)
        self.last_update = now

    def consume(self, tokens: int = 1) -> bool:
        """
        Try to consume tokens. Returns True if successful.
        """
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False
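Before wiring this into anything, a quick sanity check shows the intended behaviour. The rate and capacity below are just illustrative numbers:

bucket = TokenBucket(rate=2.0, capacity=5)

# A fresh bucket allows a burst up to its capacity...
print([bucket.consume() for _ in range(5)])  # [True, True, True, True, True]
print(bucket.consume())                      # False -- bucket is empty

# ...and refills over time
time.sleep(1.0)
print(bucket.consume())                      # True -- roughly 2 tokens were added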
Making It Reusable: A Rate Limiter Class
Now let’s create a rate limiter that manages multiple buckets (one per user/IP):
class RateLimiter:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.buckets: Dict[str, TokenBucket] = {}
        self.lock = threading.Lock()

    def get_bucket(self, key: str) -> TokenBucket:
        """Get or create a bucket for the given key"""
        with self.lock:
            if key not in self.buckets:
                self.buckets[key] = TokenBucket(self.rate, self.capacity)
            return self.buckets[key]

    def is_allowed(self, key: str, tokens: int = 1) -> bool:
        """Check if the request should be allowed"""
        bucket = self.get_bucket(key)
        return bucket.consume(tokens)

    def cleanup_old_buckets(self, max_idle_time: int = 3600):
        """Remove buckets that haven't been used recently"""
        with self.lock:
            now = time.time()
            keys_to_remove = [
                key for key, bucket in self.buckets.items()
                if now - bucket.last_update > max_idle_time
            ]
            for key in keys_to_remove:
                del self.buckets[key]
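Note that cleanup_old_buckets has to be called from somewhere. One simple option, sketched below (start_cleanup_thread is a helper name invented here, not part of the class), is a daemon thread that sweeps idle buckets on an interval:

def start_cleanup_thread(limiter: RateLimiter, interval: int = 300):
    """Periodically evict idle buckets so memory doesn't grow without bound."""
    def _sweep():
        while True:
            time.sleep(interval)
            limiter.cleanup_old_buckets()

    t = threading.Thread(target=_sweep, daemon=True)
    t.start()
    return t

A daemon thread is enough for a single process; in a larger deployment you would usually hand this job to whatever periodic-task machinery you already run.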
Flask Integration
Here’s how to use it as a Flask decorator:
from flask import Flask, request, jsonify
from functools import wraps

app = Flask(__name__)
limiter = RateLimiter(rate=10.0, capacity=20)  # 10 req/sec, burst of 20

def rate_limit(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Use user_id if authenticated, otherwise IP
        key = request.headers.get('X-User-ID') or request.remote_addr
        if not limiter.is_allowed(key):
            return jsonify({
                'error': 'Rate limit exceeded',
                'retry_after': 1.0 / limiter.rate
            }), 429
        return func(*args, **kwargs)
    return wrapper

@app.route('/api/search')
@rate_limit
def search():
    query = request.args.get('q', '')
    # Process search...
    return jsonify({'results': []})
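One quick way to watch the limiter kick in is Flask's built-in test client. With rate=10 and capacity=20 as configured above, a tight loop of 25 requests from the same client should see the last few rejected (exact counts depend on timing):

with app.test_client() as client:
    statuses = [client.get('/api/search?q=python').status_code for _ in range(25)]
    print(statuses.count(429))  # a few 429s once the burst of 20 is used up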
Testing Your Rate Limiter
Here’s a simple test to verify the behavior:
import pytest
import time

def test_token_bucket_basic():
    bucket = TokenBucket(rate=10.0, capacity=10)

    # Should allow up to capacity
    for _ in range(10):
        assert bucket.consume() is True

    # Should deny when empty
    assert bucket.consume() is False

    # Should refill over time
    time.sleep(0.5)  # Wait for 5 tokens to refill
    for _ in range(5):
        assert bucket.consume() is True

def test_rate_limiter_per_key():
    limiter = RateLimiter(rate=5.0, capacity=5)

    # Different keys should have independent buckets
    assert limiter.is_allowed('user1') is True
    assert limiter.is_allowed('user2') is True
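Since the whole point of the lock in TokenBucket is safety under concurrency, it is worth a dedicated test. Here is one sketch: with refill disabled (rate=0.0), exactly capacity tokens should be handed out no matter how many threads compete for them:

import threading

def test_token_bucket_thread_safety():
    bucket = TokenBucket(rate=0.0, capacity=100)  # no refill: exactly 100 tokens exist
    allowed = []

    def worker():
        for _ in range(50):
            if bucket.consume():
                allowed.append(1)

    threads = [threading.Thread(target=worker) for _ in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # 10 threads x 50 attempts = 500 attempts, but only 100 tokens were available
    assert len(allowed) == 100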
Production Considerations
1. Distributed Systems
For multi-server deployments, the bucket state has to live in storage every instance can see, such as Redis:
import time

import redis

class RedisRateLimiter:
    def __init__(self, redis_client, rate: float, capacity: int):
        self.redis = redis_client
        self.rate = rate
        self.capacity = capacity

    def is_allowed(self, key: str) -> bool:
        lua_script = """
        local key = KEYS[1]
        local rate = tonumber(ARGV[1])
        local capacity = tonumber(ARGV[2])
        local now = tonumber(ARGV[3])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
        local tokens = tonumber(bucket[1]) or capacity
        local last_update = tonumber(bucket[2]) or now

        local elapsed = now - last_update
        tokens = math.min(capacity, tokens + elapsed * rate)

        if tokens >= 1 then
            tokens = tokens - 1
            redis.call('HMSET', key, 'tokens', tokens, 'last_update', now)
            redis.call('EXPIRE', key, 3600)
            return 1
        end
        return 0
        """
        result = self.redis.eval(
            lua_script, 1, f"ratelimit:{key}",
            self.rate, self.capacity, time.time()
        )
        return bool(result)
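Usage mirrors the in-memory version; only the construction of the client differs. The connection details and key below are placeholders to adapt:

r = redis.Redis(host='localhost', port=6379, db=0)
redis_limiter = RedisRateLimiter(r, rate=10.0, capacity=20)

if redis_limiter.is_allowed('user-123'):
    ...  # handle the request
else:
    ...  # reject with 429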
2. Monitoring
Track rate limit metrics:
- Requests allowed vs. denied (a small counting sketch follows this list)
- Per-user consumption patterns
- Peak usage times
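A minimal way to start collecting the first of these, assuming no metrics library is wired in yet, is to count decisions next to where they are made; is_allowed_with_metrics below is just an illustrative wrapper:

from collections import Counter

decisions = Counter()

def is_allowed_with_metrics(limiter: RateLimiter, key: str) -> bool:
    allowed = limiter.is_allowed(key)
    decisions['allowed' if allowed else 'denied'] += 1
    return allowed

# Periodically log or export decisions['allowed'] and decisions['denied']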
3. Graceful Degradation
Consider returning Retry-After headers and implementing exponential backoff on the client side.
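On the server side, one way to do that is a variant of the rate_limit decorator from earlier that attaches the header to its 429 response; this is a sketch, not a drop-in replacement:

def rate_limit_with_retry_after(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        key = request.headers.get('X-User-ID') or request.remote_addr
        if not limiter.is_allowed(key):
            response = jsonify({'error': 'Rate limit exceeded'})
            # Whole seconds until roughly one token is available again
            response.headers['Retry-After'] = str(max(1, round(1.0 / limiter.rate)))
            return response, 429
        return func(*args, **kwargs)
    return wrapper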
Summary
We’ve built a working rate limiter from scratch and looked at what it takes to run one in production. Key takeaways:
- Token bucket provides burst tolerance with average rate control
- Thread-safe implementation is critical
- Redis enables distributed rate limiting
- Testing concurrent scenarios helps catch race conditions early
Next Steps
- Explore sliding window algorithms for stricter limits
- Implement tiered rate limits (free vs. premium users; a small sketch follows this list)
- Add dynamic rate adjustment based on server load
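As a starting point for tiered limits, one option (the tier names and numbers below are made up) is to keep one RateLimiter per tier and pick the right one per request:

# Hypothetical tiers; tune the numbers to real traffic
tier_limiters = {
    'free': RateLimiter(rate=2.0, capacity=5),
    'premium': RateLimiter(rate=50.0, capacity=100),
}

def is_allowed_for(user_id: str, tier: str) -> bool:
    limiter_for_tier = tier_limiters.get(tier, tier_limiters['free'])
    return limiter_for_tier.is_allowed(user_id)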
Further Reading
- GCRA algorithm for more precise rate limiting
- Token bucket vs. leaky bucket
- Kong API Gateway rate limiting
Questions or improvements? Reach out via the contact page or open an issue on GitHub!