LLM API Integration Best Practices for Production Environments

Engineering

Technical guide to integrating LLM APIs (GPT-5, Claude Sonnet 4.5, Gemini 2.5 Pro) in production systems. Learn about error handling, rate limiting, cost optimization, and reliability patterns.

Integrating LLM APIs into production systems requires careful attention to reliability, performance, and cost management. This guide covers essential patterns and practices for robust implementations.

Error Handling Strategies

Retry Logic with Exponential Backoff

Implement retries for transient failures; a minimal sketch follows the list:

  • Initial retry after 1 second
  • Double wait time for each subsequent retry
  • Maximum of 3-5 retries before failing
  • Add jitter to prevent thundering herd problem
  • Distinguish between retryable (500, 429) and non-retryable (400, 401) errors
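
The production client shown later uses tenacity for retries; as a complement, the following minimal sketch implements the rules above directly. `call_api` is a hypothetical callable, and the sketch assumes the SDK raises exceptions carrying an HTTP `status_code` attribute.

python
import random
import time

RETRYABLE_STATUS = {429, 500, 502, 503}  # transient; safe to retry
MAX_ATTEMPTS = 5

def call_with_backoff(call_api, *args, **kwargs):
    """Retry transient failures with exponential backoff plus jitter.

    `call_api` is a hypothetical callable that raises an exception
    carrying an HTTP `status_code` attribute on failure.
    """
    for attempt in range(MAX_ATTEMPTS):
        try:
            return call_api(*args, **kwargs)
        except Exception as exc:
            status = getattr(exc, "status_code", None)
            if status not in RETRYABLE_STATUS or attempt == MAX_ATTEMPTS - 1:
                raise  # non-retryable (e.g. 400, 401) or attempts exhausted
            # 1s, 2s, 4s, 8s ... plus jitter to avoid a thundering herd
            wait = (2 ** attempt) + random.uniform(0, 1)
            time.sleep(wait)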

Timeout Configuration

Set appropriate timeouts (see the sketch after this list):

  • Connection timeout: 5-10 seconds
  • Read timeout: 30-120 seconds depending on task complexity
  • Implement graceful degradation when timeouts occur
  • Log timeout occurrences for monitoring
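
Both official Python SDKs are built on httpx and accept a timeout (plus SDK-level retries) when the client is constructed. Below is a minimal sketch using the Anthropic SDK; the values are illustrative and option names may differ between SDK versions.

python
import httpx
import anthropic

# Fail fast on unreachable hosts, but allow long generations to finish.
timeout = httpx.Timeout(connect=5.0, read=120.0, write=10.0, pool=5.0)

client = anthropic.Anthropic(
    timeout=timeout,  # applied to every request made through this client
    max_retries=2,    # SDK-level retries for transient failures
)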

Fallback Mechanisms

  • Cached responses for common queries
  • Alternative models for non-critical features
  • Simplified responses when primary API unavailable
  • Queue requests for later processing during outages

Rate Limiting and Throttling

Client-Side Rate Limiting

Implement rate limiting before sending requests:

  • Token bucket algorithm for smooth rate control
  • Track requests per minute/hour based on tier
  • Queue excess requests rather than rejecting
  • Monitor rate limit headroom continuously

Handling 429 Responses

When receiving rate limit errors (a sketch follows the list):

  • Read Retry-After header for wait duration
  • Implement exponential backoff if header absent
  • Alert when consistently hitting rate limits
  • Consider upgrading your tier if limits are consistently reached
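
A minimal sketch of honoring Retry-After, assuming the Anthropic SDK, which exposes the underlying HTTP response on its error objects; the model identifier is illustrative.

python
import time
import anthropic

client = anthropic.Anthropic()

def generate_respecting_retry_after(prompt: str, max_attempts: int = 3) -> str:
    """Honor Retry-After on 429s, falling back to exponential backoff."""
    for attempt in range(max_attempts):
        try:
            response = client.messages.create(
                model="claude-sonnet-4-5",  # illustrative model identifier
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}],
            )
            return response.content[0].text
        except anthropic.RateLimitError as exc:
            # The SDK exposes the underlying HTTP response; header lookup is
            # case-insensitive. Fall back to 2^attempt seconds if absent.
            retry_after = exc.response.headers.get("retry-after")
            time.sleep(float(retry_after) if retry_after else 2 ** attempt)
    raise RuntimeError("Rate limit retries exhausted")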

Cost Optimization

Caching Strategies

Implement intelligent caching:

  • Semantic caching: match prompts by embedding similarity, not exact text
  • TTL-based caching for time-sensitive content
  • LRU eviction for memory management
  • Cache warming for predictable queries

Prompt Optimization

  • Minimize unnecessary context in prompts
  • Use system prompts for repeated instructions
  • Compress verbose prompts without losing meaning
  • Monitor token usage per request type

Model Selection

  • Use smaller/cheaper models for simple tasks
  • Reserve advanced models (GPT-5, Claude Sonnet 4.5) for complex reasoning
  • Implement routing logic based on task complexity (sketched below)
  • A/B test model performance vs cost trade-offs
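
A minimal routing sketch; the task categories, length threshold, and model identifiers are illustrative placeholders rather than a recommended policy.

python
# Hypothetical complexity heuristic: route short, well-structured tasks to a
# cheaper model and reserve the flagship model for open-ended reasoning.
SIMPLE_TASKS = {"classification", "extraction", "short_summarization"}

def select_model(task_type: str, prompt: str) -> str:
    """Return a model identifier based on task complexity.

    The identifiers below are placeholders; substitute the current
    model names from your provider's documentation.
    """
    if task_type in SIMPLE_TASKS and len(prompt) < 2000:
        return "claude-haiku-4-5"   # smaller/cheaper model for simple tasks
    return "claude-sonnet-4-5"      # flagship model for complex reasoning

# Example routing decision
model = select_model("classification", "Is this review positive or negative? ...")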

Code Example: Production-Ready API Client with Retry Logic

python
import anthropic
import openai
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import time
import logging
from typing import Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class LLMClient:
    """Production-ready LLM API client with error handling and retries"""
    
    def __init__(self, provider: str = "anthropic"):
        self.provider = provider
        if provider == "anthropic":
            self.client = anthropic.Anthropic()
        elif provider == "openai":
            self.client = openai.OpenAI()
        else:
            raise ValueError(f"Unsupported provider: {provider}")
    
    @retry(
        stop=stop_after_attempt(5),
        wait=wait_exponential(multiplier=1, min=1, max=60),
        retry=retry_if_exception_type((anthropic.RateLimitError, openai.RateLimitError)),
        before_sleep=lambda retry_state: logger.info(f"Retrying after {retry_state.next_action.sleep} seconds...")
    )
    def generate_with_retry(self, prompt: str, max_tokens: int = 1024, temperature: float = 0.7) -> str:
        """Generate text with automatic retry on rate limits"""
        try:
            if self.provider == "anthropic":
                response = self.client.messages.create(
                    model="claude-sonnet-4.5",
                    max_tokens=max_tokens,
                    temperature=temperature,
                    messages=[{"role": "user", "content": prompt}]
                )
                return response.content[0].text
            
            elif self.provider == "openai":
                response = self.client.chat.completions.create(
                    model="gpt-5",
                    max_tokens=max_tokens,
                    temperature=temperature,
                    messages=[{"role": "user", "content": prompt}]
                )
                return response.choices[0].message.content
                
        except (anthropic.APIError, openai.APIError) as e:
            logger.error(f"API error: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error: {str(e)}")
            raise
    
    def generate_with_fallback(self, prompt: str, fallback_provider: Optional[str] = None) -> str:
        """Generate with fallback to alternative provider"""
        try:
            return self.generate_with_retry(prompt)
        except Exception as e:
            if fallback_provider:
                logger.warning(f"Primary provider failed, trying fallback: {fallback_provider}")
                fallback_client = LLMClient(fallback_provider)
                return fallback_client.generate_with_retry(prompt)
            raise

# Usage example
client = LLMClient(provider="anthropic")

try:
    result = client.generate_with_fallback(
        "Explain quantum computing in simple terms",
        fallback_provider="openai"
    )
    print(f"Result: {result}")
except Exception as e:
    logger.error(f"All providers failed: {str(e)}")

Code Example: Rate Limiting with Token Bucket

python
import time
import threading
from collections import deque
from typing import Callable

class TokenBucketRateLimiter:
    """Token bucket algorithm for smooth rate limiting"""
    
    def __init__(self, requests_per_minute: int = 50, burst_size: int = 10):
        self.capacity = burst_size
        self.tokens = burst_size
        self.rate = requests_per_minute / 60.0  # tokens per second
        self.last_update = time.time()
        self.lock = threading.Lock()
    
    def _refill(self):
        """Refill tokens based on elapsed time"""
        now = time.time()
        elapsed = now - self.last_update
        
        # Add tokens based on rate
        tokens_to_add = elapsed * self.rate
        self.tokens = min(self.capacity, self.tokens + tokens_to_add)
        self.last_update = now
    
    def acquire(self, tokens: int = 1) -> bool:
        """Try to acquire tokens, returns True if successful"""
        with self.lock:
            self._refill()
            
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False
    
    def wait_for_token(self, tokens: int = 1, timeout: float = 60.0):
        """Wait until tokens are available or timeout"""
        start_time = time.time()
        
        while time.time() - start_time < timeout:
            if self.acquire(tokens):
                return True
            time.sleep(0.1)  # Sleep briefly before retry
        
        raise TimeoutError("Rate limit timeout")

class RateLimitedLLMClient:
    """LLM client with built-in rate limiting"""
    
    def __init__(self, client: LLMClient, requests_per_minute: int = 50):
        self.client = client
        self.rate_limiter = TokenBucketRateLimiter(requests_per_minute=requests_per_minute)
    
    def generate(self, prompt: str, **kwargs) -> str:
        """Generate with rate limiting"""
        # Wait for rate limit token
        self.rate_limiter.wait_for_token()
        
        # Make the API call
        return self.client.generate_with_retry(prompt, **kwargs)

# Usage
base_client = LLMClient(provider="anthropic")
rate_limited_client = RateLimitedLLMClient(base_client, requests_per_minute=50)

# Make requests - automatically rate limited
for i in range(100):
    result = rate_limited_client.generate(f"Request {i}")
    print(f"Completed request {i}")

Code Example: Semantic Caching with Redis

python
import redis
import hashlib
import json
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import Optional, Tuple

class SemanticCache:
    """Semantic caching for LLM responses using embeddings"""
    
    def __init__(self, redis_url: str = "redis://localhost:6379", similarity_threshold: float = 0.95):
        self.redis_client = redis.from_url(redis_url)
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        self.similarity_threshold = similarity_threshold
        self.cache_prefix = "llm_cache:"
    
    def _get_embedding(self, text: str) -> np.ndarray:
        """Generate embedding for text"""
        return self.embedding_model.encode(text)
    
    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Calculate cosine similarity between two vectors"""
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    
    def _hash_prompt(self, prompt: str) -> str:
        """Create hash for exact match lookups"""
        return hashlib.sha256(prompt.encode()).hexdigest()
    
    def get(self, prompt: str) -> Optional[str]:
        """Get cached response if similar prompt exists"""
        # Try exact match first
        exact_key = self.cache_prefix + self._hash_prompt(prompt)
        cached = self.redis_client.get(exact_key)
        if cached:
            return json.loads(cached)["response"]
        
        # Semantic similarity search
        query_embedding = self._get_embedding(prompt)
        
        # Linear scan over all cached entries using KEYS; fine for a demo,
        # but at scale prefer SCAN or a vector index (e.g. Redis Search).
        all_keys = self.redis_client.keys(self.cache_prefix + "*")
        
        for key in all_keys:
            cached_data = json.loads(self.redis_client.get(key))
            cached_embedding = np.array(cached_data["embedding"])
            
            similarity = self._cosine_similarity(query_embedding, cached_embedding)
            
            if similarity >= self.similarity_threshold:
                return cached_data["response"]
        
        return None
    
    def set(self, prompt: str, response: str, ttl: int = 3600):
        """Cache response with embedding"""
        embedding = self._get_embedding(prompt)
        
        cache_data = {
            "prompt": prompt,
            "response": response,
            "embedding": embedding.tolist()
        }
        
        key = self.cache_prefix + self._hash_prompt(prompt)
        self.redis_client.setex(key, ttl, json.dumps(cache_data))

class CachedLLMClient:
    """LLM client with semantic caching"""
    
    def __init__(self, client: LLMClient, cache: SemanticCache):
        self.client = client
        self.cache = cache
        self.cache_hits = 0
        self.cache_misses = 0
    
    def generate(self, prompt: str, **kwargs) -> Tuple[str, bool]:
        """Generate with caching - returns (response, from_cache)"""
        # Check cache first
        cached_response = self.cache.get(prompt)
        if cached_response:
            self.cache_hits += 1
            return cached_response, True
        
        # Cache miss - call API
        self.cache_misses += 1
        response = self.client.generate_with_retry(prompt, **kwargs)
        
        # Store in cache
        self.cache.set(prompt, response)
        
        return response, False
    
    def get_cache_stats(self) -> dict:
        """Get cache performance statistics"""
        total = self.cache_hits + self.cache_misses
        hit_rate = self.cache_hits / total if total > 0 else 0
        
        return {
            "cache_hits": self.cache_hits,
            "cache_misses": self.cache_misses,
            "hit_rate": f"{hit_rate:.2%}"
        }

# Usage example
base_client = LLMClient(provider="anthropic")
semantic_cache = SemanticCache()
cached_client = CachedLLMClient(base_client, semantic_cache)

# First call - cache miss
response1, from_cache1 = cached_client.generate("What is machine learning?")
print(f"Response 1 (from cache: {from_cache1}): {response1[:100]}...")

# Similar prompt - cache hit
response2, from_cache2 = cached_client.generate("Can you explain machine learning?")
print(f"Response 2 (from cache: {from_cache2}): {response2[:100]}...")

# Print cache statistics
print(cached_client.get_cache_stats())

Request Optimization

Batching

Where APIs support batching (a client-side micro-batching sketch follows the list):

  • Accumulate requests over short time window (100-500ms)
  • Submit as single batch request
  • Distribute responses to original requestors
  • Balance latency vs throughput gains
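
A client-side micro-batching sketch using asyncio; `process_batch` is a hypothetical coroutine that fans the accumulated prompts out to the API and returns responses in the same order. For workloads that are not latency-sensitive, the providers' own asynchronous batch endpoints are often available at reduced cost.

python
import asyncio
from typing import Callable, List, Tuple

class MicroBatcher:
    """Accumulate prompts for a short window, then dispatch them together.

    `process_batch` is a hypothetical coroutine that sends the accumulated
    prompts (as one batched request or N parallel requests) and returns one
    response per prompt, in the same order.
    """

    def __init__(self, process_batch: Callable, window_seconds: float = 0.2):
        self.process_batch = process_batch
        self.window_seconds = window_seconds
        self.pending: List[Tuple[str, asyncio.Future]] = []
        self.lock = asyncio.Lock()

    async def submit(self, prompt: str) -> str:
        """Queue a prompt and wait for its response from the next batch."""
        future = asyncio.get_running_loop().create_future()
        async with self.lock:
            self.pending.append((prompt, future))
            if len(self.pending) == 1:
                # First item of a new window: schedule a flush.
                asyncio.create_task(self._flush_after_window())
        return await future

    async def _flush_after_window(self):
        await asyncio.sleep(self.window_seconds)
        async with self.lock:
            batch, self.pending = self.pending, []
        prompts = [prompt for prompt, _ in batch]
        try:
            responses = await self.process_batch(prompts)
        except Exception as exc:
            for _, future in batch:
                future.set_exception(exc)
            return
        # Distribute responses back to the original requestors.
        for (_, future), response in zip(batch, responses):
            future.set_result(response)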

Streaming Responses

Use streaming for user-facing applications (see the sketch after this list):

  • Reduced perceived latency
  • Better user experience for long responses
  • Enable early termination if needed
  • Handle stream interruptions gracefully
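
A minimal streaming sketch using the Anthropic SDK's messages.stream helper; the model identifier and prompt are illustrative.

python
import anthropic

client = anthropic.Anthropic()

# Stream tokens as they are generated instead of waiting for the full response.
with client.messages.stream(
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Summarize the benefits of streaming."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)  # forward chunks to the user as they arrive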

Security Best Practices

API Key Management

  • Store keys in secure vaults (AWS Secrets Manager, HashiCorp Vault); see the sketch after this list
  • Rotate keys periodically
  • Use different keys per environment
  • Never commit keys to version control
  • Implement key rotation without downtime
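
A minimal sketch of loading a key from AWS Secrets Manager at startup; the secret name and JSON layout are assumptions, and local development falls back to an environment variable.

python
import json
import os
import boto3
import anthropic

def load_api_key(secret_id: str = "prod/llm/anthropic-api-key") -> str:
    """Fetch the API key from AWS Secrets Manager; the secret name is illustrative."""
    client = boto3.client("secretsmanager")
    secret = client.get_secret_value(SecretId=secret_id)
    return json.loads(secret["SecretString"])["api_key"]

# Prefer the environment in local development, the vault in production.
api_key = os.environ.get("ANTHROPIC_API_KEY") or load_api_key()
llm = anthropic.Anthropic(api_key=api_key)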

Input Validation

  • Sanitize user inputs before inclusion in prompts (see the sketch after this list)
  • Implement maximum input length limits
  • Filter potentially harmful content
  • Validate input encoding and format
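
A minimal input-validation sketch; the character limit and sanitization policy are illustrative and should be tuned to your token budget and threat model.

python
MAX_INPUT_CHARS = 8000  # illustrative limit; tune to your token budget

def sanitize_user_input(text: str) -> str:
    """Basic validation before interpolating user text into a prompt."""
    if not isinstance(text, str):
        raise TypeError("Input must be a string")
    # Drop control characters that can confuse prompt formatting; keep newlines and tabs.
    cleaned = "".join(ch for ch in text if ch.isprintable() or ch in "\n\t")
    cleaned = cleaned.strip()
    if not cleaned:
        raise ValueError("Input is empty after sanitization")
    if len(cleaned) > MAX_INPUT_CHARS:
        raise ValueError(f"Input exceeds {MAX_INPUT_CHARS} characters")
    return cleaned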

Output Validation

  • Validate response format matches expectations
  • Filter sensitive information from responses
  • Implement content moderation for user-facing outputs
  • Log suspicious responses for review

Monitoring and Observability

Key Metrics

  • Latency: p50, p95, p99 response times
  • Error rate: by error type and status code
  • Token usage: input and output tokens per request
  • Cost: daily and monthly spend by feature
  • Cache hit rate: effectiveness of caching layer
  • Rate limit headroom: how close current usage is to provider limits

Logging Strategy

Log comprehensively but efficiently (a sketch follows the list):

  • Request metadata (timestamp, user ID, model used)
  • Token counts and cost
  • Response time and status
  • Error details for failures
  • Sample prompts and responses (respecting privacy)
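
A minimal structured-logging sketch built on the Anthropic SDK's usage fields; the model identifier and log schema are illustrative.

python
import json
import logging
import time
import anthropic

logger = logging.getLogger("llm.requests")
client = anthropic.Anthropic()

def generate_and_log(prompt: str, user_id: str, model: str = "claude-sonnet-4-5") -> str:
    """Log request metadata, token counts, and latency as structured JSON."""
    start = time.time()
    response = client.messages.create(
        model=model,
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    logger.info(json.dumps({
        "timestamp": start,
        "user_id": user_id,
        "model": model,
        "input_tokens": response.usage.input_tokens,
        "output_tokens": response.usage.output_tokens,
        "latency_ms": round((time.time() - start) * 1000),
        "status": "ok",
    }))
    return response.content[0].text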

Alerting

Configure alerts for:

  • Error rate exceeding threshold (e.g., >5%)
  • Latency degradation (e.g., p95 >10s)
  • Cost spikes (e.g., 50% above baseline)
  • Rate limit violations
  • Cache hit rate drops

Testing Strategies

Unit Testing

  • Mock API responses for deterministic tests (see the sketch after this list)
  • Test error handling paths
  • Validate retry logic
  • Test timeout behavior
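
A minimal unit-test sketch with unittest.mock, assuming the LLMClient class from the earlier retry example is importable; the mocked response shape mirrors what that client reads.

python
import pytest
from unittest.mock import MagicMock, patch

# Assumes LLMClient from the retry example above is importable in this test module.

@patch("anthropic.Anthropic")
def test_generate_returns_model_text(mock_anthropic):
    """Mock the SDK so the test is deterministic and makes no network calls."""
    fake_response = MagicMock()
    fake_response.content = [MagicMock(text="mocked answer")]
    mock_anthropic.return_value.messages.create.return_value = fake_response

    client = LLMClient(provider="anthropic")
    assert client.generate_with_retry("test prompt") == "mocked answer"

@patch("anthropic.Anthropic")
def test_non_retryable_errors_propagate(mock_anthropic):
    """Errors outside the retry policy should surface to the caller unchanged."""
    mock_anthropic.return_value.messages.create.side_effect = RuntimeError("boom")
    client = LLMClient(provider="anthropic")
    with pytest.raises(RuntimeError):
        client.generate_with_retry("test prompt")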

Integration Testing

  • Test against actual APIs in staging
  • Use dedicated test API keys
  • Verify rate limiting behavior
  • Test with production-like data volumes

Load Testing

  • Simulate peak traffic scenarios
  • Identify rate limit breaking points
  • Measure latency under load
  • Verify auto-scaling behavior

Provider-Specific Considerations

OpenAI (GPT-5)

  • Rate limits vary by API usage tier (not by ChatGPT plan)
  • Streaming available via Server-Sent Events
  • Function calling for structured outputs
  • Vision capabilities for multimodal inputs

Anthropic (Claude Sonnet 4.5)

  • Higher rate limits than OpenAI on some tiers
  • Extended context windows (200K tokens)
  • Computer use capabilities require special setup
  • Available via AWS Bedrock and GCP Vertex AI

Google (Gemini 2.5 Pro)

  • Tight integration with Google Cloud services
  • Competitive pricing structure
  • Deep Think mode requires special configuration
  • Good multimodal capabilities

Deployment Checklist

  • Implement comprehensive error handling
  • Configure appropriate timeouts
  • Set up monitoring and alerting
  • Implement caching layer
  • Configure rate limiting
  • Secure API key storage
  • Add input/output validation
  • Set up cost tracking
  • Implement fallback strategies
  • Document API usage patterns

Production LLM integrations require attention to reliability, cost, and security. Following these practices helps build robust, maintainable systems that handle real-world conditions effectively.

Author

21medien
