# Best Practices
Performance optimization, caching strategies, error handling, and security best practices for ReputeAPI integration.
## Overview
This guide covers best practices to help you build robust, efficient, and secure integrations with ReputeAPI.
Topics Covered:
- Performance optimization
- Caching strategies
- Error handling and resilience
- Security best practices
- Rate limiting and cost optimization
- Monitoring and observability
## Performance Optimization
### 1. Use the Right Endpoint
Choose the appropriate endpoint based on your needs:
Use `/api/v1/score` when:
- You only need the security score
- You're checking many domains quickly
- You want faster response times (roughly 150-300 ms, versus 400-700 ms for `/api/v1/check`)
Use `/api/v1/check` when:
- You need detailed SPF/DKIM/DMARC results
- You want actionable remediation steps
- You need DNS snippets for fixes
**Python:**
```python
from typing import Dict

# Fast score check for dashboard
def get_domain_score(domain: str) -> int:
    result = client.get_score(domain)
    return result['score']

# Full check for detailed analysis
def analyze_domain(domain: str) -> Dict:
    result = client.check_domain(domain)
    return {
        'score': result['score'],
        'issues': result['issues'],
        'recommendations': result.get('recommendations', [])
    }
```
**JavaScript:**
```javascript
// Fast score check for dashboard
async function getDomainScore(domain) {
  const result = await client.getScore(domain);
  return result.score;
}

// Full check for detailed analysis
async function analyzeDomain(domain) {
  const result = await client.checkDomain(domain);
  return {
    score: result.score,
    issues: result.issues,
    recommendations: result.recommendations || []
  };
}
```
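To make the latency difference concrete, the sketch below estimates the wall-clock time of scanning a batch of domains with each endpoint. It assumes the midpoints of the latency ranges quoted above, a fixed number of requests in flight, and no cache hits or rate limiting. `estimate_batch_seconds` is a hypothetical helper, not part of any client library.

```python
import math

def estimate_batch_seconds(n_domains: int, latency_ms: float, concurrency: int = 5) -> float:
    """Rough estimate: requests complete in waves of `concurrency`."""
    waves = math.ceil(n_domains / concurrency)
    return waves * latency_ms / 1000.0

# Midpoints of the ranges quoted above (assumed typical values)
SCORE_MS = 225.0  # /api/v1/score: 150-300 ms
CHECK_MS = 550.0  # /api/v1/check: 400-700 ms

score_time = estimate_batch_seconds(1000, SCORE_MS)
check_time = estimate_batch_seconds(1000, CHECK_MS)
print(f"score: {score_time:.0f}s, check: {check_time:.0f}s")
```

Under these assumptions, 1,000 domains take about 45 seconds through the score endpoint and about 110 seconds through the full check.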
### 2. Implement Connection Pooling
Reuse HTTP connections for better performance:
**Python:**
```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_optimized_session():
    """Create session with connection pooling"""
    session = requests.Session()

    # Configure retries (allowed_methods requires urllib3 1.26 or later)
    retry_strategy = Retry(
        total=3,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "POST"],
        backoff_factor=1
    )

    # Configure connection pooling
    adapter = HTTPAdapter(
        max_retries=retry_strategy,
        pool_connections=10,  # Number of per-host connection pools to cache
        pool_maxsize=20,      # Max connections kept in each pool
        pool_block=False
    )

    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session

# Use in client
class OptimizedReputeAPIClient:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.session = create_optimized_session()
        self.session.headers.update({"X-API-Key": api_key})
```
**JavaScript:**
```javascript
const axios = require('axios');
const https = require('https');

// Create HTTP agent with connection pooling
const httpsAgent = new https.Agent({
  keepAlive: true,
  keepAliveMsecs: 30000,
  maxSockets: 50,
  maxFreeSockets: 10
});

const client = axios.create({
  baseURL: 'https://api.reputeapi.com',
  httpsAgent,
  headers: {
    'X-API-Key': process.env.REPUTE_API_KEY
  }
});
```
### 3. Batch Requests Efficiently
When checking multiple domains, use concurrent requests with rate limiting:
**Python:**
```python
import asyncio
import os
from typing import Dict, List

import httpx

BASE_URL = "https://api.reputeapi.com"
API_KEY = os.getenv("REPUTE_API_KEY")

async def check_domains_concurrent(
    domains: List[str],
    max_concurrent: int = 5
) -> List[Dict]:
    """
    Check multiple domains concurrently, capping requests in flight

    Args:
        domains: List of domains to check
        max_concurrent: Maximum concurrent requests

    Returns:
        List of results in input order; failed lookups appear as
        exception objects because of return_exceptions=True
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    # Share one client so connections are pooled across requests
    async with httpx.AsyncClient(timeout=10.0) as client:

        async def check_with_limit(domain: str):
            async with semaphore:
                response = await client.get(
                    f"{BASE_URL}/api/v1/score",
                    params={"domain": domain},
                    headers={"X-API-Key": API_KEY}
                )
                response.raise_for_status()
                return response.json()

        results = await asyncio.gather(
            *[check_with_limit(domain) for domain in domains],
            return_exceptions=True
        )

    return results
```
**JavaScript:**
```javascript
// Works with p-limit v3; v4 and later are ESM-only and must be loaded with import
const pLimit = require('p-limit');

async function checkDomainsConcurrent(domains, maxConcurrent = 5) {
  const limit = pLimit(maxConcurrent);

  const results = await Promise.allSettled(
    domains.map(domain =>
      limit(() => client.getScore(domain))
    )
  );

  return results.map((result, index) => ({
    domain: domains[index],
    ...(result.status === 'fulfilled'
      ? { success: true, data: result.value }
      : { success: false, error: result.reason.message }
    )
  }));
}
```
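Because the Python version passes `return_exceptions=True`, its result list mixes payloads with exception objects. The sketch below shows one way to separate them, mirroring the `success`/`error` split in the JavaScript example. `fake_score_lookup` and `split_results` are hypothetical stand-ins so the example runs without network access.

```python
import asyncio
from typing import Dict, List, Tuple

async def fake_score_lookup(domain: str) -> Dict:
    """Stand-in for a real API call; fails for one domain on purpose."""
    await asyncio.sleep(0)
    if domain == "broken.example":
        raise RuntimeError("lookup failed")
    return {"domain": domain, "score": 85}

def split_results(domains: List[str], results: list) -> Tuple[Dict[str, Dict], Dict[str, str]]:
    """Separate successful payloads from exceptions, keyed by domain."""
    ok: Dict[str, Dict] = {}
    failed: Dict[str, str] = {}
    for domain, result in zip(domains, results):
        if isinstance(result, BaseException):
            failed[domain] = str(result)
        else:
            ok[domain] = result
    return ok, failed

async def main() -> Tuple[Dict[str, Dict], Dict[str, str]]:
    domains = ["example.com", "broken.example", "example.org"]
    results = await asyncio.gather(
        *[fake_score_lookup(d) for d in domains],
        return_exceptions=True,
    )
    return split_results(domains, results)

ok, failed = asyncio.run(main())
```

Pairing results with `zip` relies on `asyncio.gather` returning results in the same order as its inputs.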
---
## Caching Strategies
### 1. Respect API Caching
The API caches results for 15 minutes. Leverage this:
```python
# Don't bypass cache unless necessary
result = client.check_domain(domain)  # Uses cache if available

# Only bypass cache when you need fresh data
result = client.check_domain(domain, refresh=True)  # Forces fresh lookup
```
### 2. Implement Application-Level Caching
Add your own caching layer for frequently accessed data:
**Python (Redis):**
```python
import json
from typing import Dict

import redis

class CachedReputeAPIClient:
    """Client with Redis caching"""

    def __init__(self, api_key: str, redis_url: str):
        self.api = ReputeAPIClient(api_key)
        self.redis = redis.from_url(redis_url)
        self.cache_ttl = 900  # 15 minutes

    def check_domain(self, domain: str, force_refresh: bool = False) -> Dict:
        """Check domain with Redis caching"""
        cache_key = f"repute:check:{domain}"

        # Try cache first
        if not force_refresh:
            cached = self.redis.get(cache_key)
            if cached:
                return json.loads(cached)

        # Fetch from API; on a forced refresh, also bypass the API-side cache
        result = self.api.check_domain(domain, refresh=force_refresh)

        # Cache result
        self.redis.setex(
            cache_key,
            self.cache_ttl,
            json.dumps(result)
        )
        return result

    def invalidate_cache(self, domain: str):
        """Invalidate cached result for domain"""
        cache_key = f"repute:check:{domain}"
        self.redis.delete(cache_key)
```
**JavaScript (Redis):**
```javascript
const Redis = require('ioredis');

class CachedReputeAPIClient {
  constructor(apiKey, redisUrl) {
    this.api = new ReputeAPIClient(apiKey);
    this.redis = new Redis(redisUrl);
    this.cacheTtl = 900; // 15 minutes
  }

  async checkDomain(domain, forceRefresh = false) {
    const cacheKey = `repute:check:${domain}`;

    // Try cache first
    if (!forceRefresh) {
      const cached = await this.redis.get(cacheKey);
      if (cached) {
        return JSON.parse(cached);
      }
    }

    // Fetch from API
    const result = await this.api.checkDomain(domain);

    // Cache result
    await this.redis.setex(
      cacheKey,
      this.cacheTtl,
      JSON.stringify(result)
    );
    return result;
  }

  async invalidateCache(domain) {
    const cacheKey = `repute:check:${domain}`;
    await this.redis.del(cacheKey);
  }
}
```
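For a single-process application where running Redis is not worthwhile, the same time-to-live idea can be sketched in memory. This is a minimal illustration, not a drop-in replacement: `TTLCache` is a hypothetical class, it is not thread-safe, and it never evicts entries that are not read again.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple

class TTLCache:
    """Tiny in-process cache whose entries expire after `ttl` seconds."""

    def __init__(self, ttl: float = 900.0, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl
        self._clock = clock  # injectable so expiry can be tested without sleeping
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._store[key]  # drop the expired entry lazily
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._store[key] = (self._clock() + self.ttl, value)

# Usage with a fake clock to show expiry after 15 minutes
now = [0.0]
cache = TTLCache(ttl=900, clock=lambda: now[0])
cache.set("repute:check:example.com", {"score": 85})
fresh = cache.get("repute:check:example.com")
now[0] = 901.0
expired = cache.get("repute:check:example.com")
```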
### 3. Database Caching
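For applications that need cached results to persist across restarts (PostgreSQL syntax; the table keeps only the latest result per domain, so add a separate history table if you need historical tracking):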
```sql
CREATE TABLE domain_cache (
    domain VARCHAR(255) PRIMARY KEY,
    result JSONB NOT NULL,
    cached_at TIMESTAMP NOT NULL,
    expires_at TIMESTAMP NOT NULL
);

-- Index for cleanup of expired entries
CREATE INDEX idx_expires_at ON domain_cache(expires_at);

-- Query to get from cache
SELECT result
FROM domain_cache
WHERE domain = 'example.com'
  AND expires_at > NOW();

-- Insert/update cache
INSERT INTO domain_cache (domain, result, cached_at, expires_at)
VALUES ('example.com', '{"score": 85}', NOW(), NOW() + INTERVAL '15 minutes')
ON CONFLICT (domain)
DO UPDATE SET
    result = EXCLUDED.result,
    cached_at = EXCLUDED.cached_at,
    expires_at = EXCLUDED.expires_at;
```
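The read and upsert queries above can be wrapped in two small functions. The sketch below uses Python's built-in `sqlite3` module so it runs without a database server. The schema is a simplified stand-in (JSON stored as text, expiry as a Unix timestamp), `INSERT OR REPLACE` substitutes for PostgreSQL's `ON CONFLICT ... DO UPDATE`, and `cache_put` and `cache_get` are hypothetical names.

```python
import json
import sqlite3
import time
from typing import Optional

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE domain_cache (
        domain TEXT PRIMARY KEY,
        result TEXT NOT NULL,     -- JSON serialized as text
        expires_at REAL NOT NULL  -- Unix timestamp
    )
    """
)

def cache_put(domain: str, result: dict, ttl: float = 900.0, now: Optional[float] = None) -> None:
    """Insert or overwrite the cached result for a domain."""
    now = time.time() if now is None else now
    conn.execute(
        "INSERT OR REPLACE INTO domain_cache (domain, result, expires_at) VALUES (?, ?, ?)",
        (domain, json.dumps(result), now + ttl),
    )

def cache_get(domain: str, now: Optional[float] = None) -> Optional[dict]:
    """Return the cached result, or None if it is missing or expired."""
    now = time.time() if now is None else now
    row = conn.execute(
        "SELECT result FROM domain_cache WHERE domain = ? AND expires_at > ?",
        (domain, now),
    ).fetchone()
    return json.loads(row[0]) if row else None

cache_put("example.com", {"score": 85}, now=1000.0)
hit = cache_get("example.com", now=1500.0)   # within the 900 s TTL
miss = cache_get("example.com", now=2000.0)  # past the expiry at 1900.0
```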
### 4. Cache Invalidation Strategies
Implement intelligent cache invalidation:
```python
from datetime import datetime, timedelta
from typing import Dict

class SmartCacheManager:
    """Intelligent cache management"""

    def __init__(self, cache, api_client):
        self.cache = cache
        self.api = api_client

    def get_result(self, domain: str) -> Dict:
        """Get result with smart caching"""
        cached = self.cache.get(domain)
        if cached:
            # Check if score is critical and cache is old
            if self._is_critical(cached) and self._is_stale(cached):
                # Refresh in background
                self._refresh_async(domain)
            return cached
        return self._fetch_and_cache(domain)

    def _fetch_and_cache(self, domain: str) -> Dict:
        """Fetch from the API and store with a timestamp for staleness checks"""
        result = self.api.check_domain(domain)
        result['_cached_at'] = datetime.now().isoformat()
        self.cache.set(domain, result)  # assumes the cache object exposes set()
        return result

    def _is_critical(self, result: Dict) -> bool:
        """Check if result indicates critical issues"""
        return result['score'] < 50

    def _is_stale(self, cached: Dict) -> bool:
        """Check if cached data is older than 5 minutes"""
        cached_at = datetime.fromisoformat(cached['_cached_at'])
        return datetime.now() - cached_at > timedelta(minutes=5)

    def _refresh_async(self, domain: str):
        """Refresh cache in background"""
        # Queue background job to refresh; refresh_cache_task is an
        # application-defined task (for example, a Celery task)
        refresh_cache_task.delay(domain)
```
## Error Handling and Resilience
### 1. Comprehensive Error Handling
Handle all error types appropriately:
```python
import logging
from typing import Dict, Optional

import requests

logger = logging.getLogger(__name__)

class ReputeAPIError(Exception):
    """Base exception for ReputeAPI errors"""
    pass

class AuthenticationError(ReputeAPIError):
    """Invalid API key"""
    pass

class RateLimitError(ReputeAPIError):
    """Rate limit exceeded"""
    def __init__(self, message, retry_after=None):
        super().__init__(message)
        self.retry_after = retry_after

class ValidationError(ReputeAPIError):
    """Invalid request parameters"""
    pass

class ServerError(ReputeAPIError):
    """Server-side error"""
    pass

class NetworkError(ReputeAPIError):
    """Network connectivity error"""
    pass

def check_domain_safe(domain: str) -> Optional[Dict]:
    """Check domain with comprehensive error handling"""
    # BASE_URL, API_KEY, and alert_ops are assumed to be defined by the application
    try:
        response = requests.get(
            f"{BASE_URL}/api/v1/check",
            params={"domain": domain},
            headers={"X-API-Key": API_KEY},
            timeout=10
        )

        if response.status_code == 401:
            raise AuthenticationError("Invalid API key")
        elif response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 60))
            raise RateLimitError("Rate limit exceeded", retry_after=retry_after)
        elif response.status_code == 400:
            error = response.json()
            raise ValidationError(error.get('message', 'Invalid request'))
        elif response.status_code >= 500:
            raise ServerError(f"Server error: {response.status_code}")

        response.raise_for_status()
        return response.json()

    except AuthenticationError as e:
        logger.error(f"Authentication failed: {e}")
        # Alert ops team - API key may be invalid
        alert_ops("Authentication failed")
        raise
    except RateLimitError as e:
        logger.warning(f"Rate limit hit, retry after {e.retry_after}s")
        # Implement backoff or queue for later
        return None
    except ValidationError as e:
        logger.error(f"Validation error for {domain}: {e}")
        # This is likely a bug in our code
        return None
    except ServerError as e:
        logger.error(f"Server error: {e}")
        # Retry with exponential backoff
        raise
    except requests.Timeout as e:
        logger.error(f"Request timeout for {domain}")
        raise NetworkError("Request timed out") from e
    except requests.ConnectionError as e:
        logger.error("Connection error")
        raise NetworkError("Connection failed") from e
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise
```
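The handler above converts `Retry-After` with `int()`, which assumes the header always carries a number of seconds. HTTP also allows an HTTP-date in that header. Whether ReputeAPI ever sends the date form is not stated here, so the following is a defensive sketch; `parse_retry_after` is a hypothetical helper.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional

def parse_retry_after(value: Optional[str], default: int = 60,
                      now: Optional[datetime] = None) -> int:
    """Return the number of seconds to wait for a Retry-After header value."""
    if not value:
        return default
    value = value.strip()
    if value.isdigit():
        return int(value)  # delay-seconds form, e.g. "120"
    try:
        retry_at = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return default  # unparseable header: fall back to the default
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0, int((retry_at - now).total_seconds()))

fixed_now = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
from_seconds = parse_retry_after("120")
from_date = parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now=fixed_now)
```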
### 2. Circuit Breaker Pattern
Prevent cascading failures:
```python
from datetime import datetime, timedelta

class CircuitBreaker:
    """Circuit breaker for API calls (not thread-safe as written)"""

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.success_threshold = success_threshold
        self.failures = 0
        self.successes = 0
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker"""
        if self.state == 'open':
            if self._should_attempt_reset():
                self.state = 'half-open'
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        """Handle successful call"""
        self.failures = 0
        if self.state == 'half-open':
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state = 'closed'
                self.successes = 0

    def _on_failure(self):
        """Handle failed call"""
        self.failures += 1
        self.last_failure_time = datetime.now()
        self.successes = 0
        # A failure while half-open reopens the circuit immediately
        if self.state == 'half-open' or self.failures >= self.failure_threshold:
            self.state = 'open'

    def _should_attempt_reset(self) -> bool:
        """Check if we should attempt to reset circuit"""
        return (
            self.last_failure_time is not None and
            datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout)
        )

# Usage
circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

def check_domain_with_circuit_breaker(domain: str):
    return circuit_breaker.call(
        lambda: client.check_domain(domain)
    )
```
### 3. Retry with Exponential Backoff
```python
import logging
import random
import time
from functools import wraps

logger = logging.getLogger(__name__)

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0
):
    """
    Decorator for retry with exponential backoff

    Args:
        max_retries: Maximum number of retry attempts
        base_delay: Initial delay in seconds
        max_delay: Maximum delay in seconds
        exponential_base: Base for exponential calculation
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            delay = base_delay

            while True:
                try:
                    return func(*args, **kwargs)
                except (AuthenticationError, ValidationError):
                    # Don't retry these
                    raise
                except (RateLimitError, ServerError, NetworkError) as e:
                    # Rate limits count as retries too, so the loop always terminates
                    retries += 1
                    if retries > max_retries:
                        logger.error(f"Max retries exceeded: {e}")
                        raise
                    # Prefer the server-provided retry_after for rate limits
                    wait = delay
                    if isinstance(e, RateLimitError) and e.retry_after:
                        wait = e.retry_after
                    logger.warning(
                        f"Error: {e}. Retry {retries}/{max_retries} "
                        f"after {wait}s"
                    )

                time.sleep(wait)

                # Exponential backoff with jitter
                delay = min(
                    delay * exponential_base + random.uniform(0, 1),
                    max_delay
                )
        return wrapper
    return decorator

# Usage
@retry_with_backoff(max_retries=3, base_delay=2)
def check_domain_with_retry(domain: str):
    return client.check_domain(domain)
```
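To see what these parameters mean in practice, the sketch below computes the delay before each retry with jitter left out. `backoff_schedule` is a hypothetical helper written for illustration; it mirrors the multiply-and-cap step of the decorator but is not part of it.

```python
from typing import List

def backoff_schedule(max_retries: int = 3, base_delay: float = 1.0,
                     max_delay: float = 60.0, exponential_base: float = 2.0) -> List[float]:
    """Delays (in seconds) before each retry, ignoring jitter."""
    delays = []
    delay = base_delay
    for _ in range(max_retries):
        delays.append(delay)
        delay = min(delay * exponential_base, max_delay)
    return delays

schedule = backoff_schedule(max_retries=3, base_delay=2)
capped = backoff_schedule(max_retries=8, base_delay=2)
print(schedule, sum(schedule))
print(capped)
```

With `max_retries=3` and `base_delay=2`, as in the usage example above, the waits are 2, 4, and 8 seconds, so a call that keeps failing gives up after about 14 seconds of waiting plus jitter. With more retries, the delay stops growing once it reaches `max_delay`.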
## Security Best Practices
### 1. Secure API Key Storage
Never hardcode API keys:
**Environment Variables:**
```bash
# .env file
REPUTE_API_KEY=sk_live_abc123...
```

```python
# Load in application
from dotenv import load_dotenv
import os

load_dotenv()
API_KEY = os.getenv("REPUTE_API_KEY")
```
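`os.getenv` returns `None` when the variable is unset, which tends to surface later as a confusing authentication error. A minimal sketch of failing fast at startup, and of masking the key before it reaches a log line, might look like this. `require_env` and `mask_secret` are hypothetical helpers, and the key value is a made-up demo string.

```python
import os

def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise if unset or blank."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value

def mask_secret(secret: str, visible: int = 4) -> str:
    """Mask all but the last few characters, for safe logging."""
    if len(secret) <= visible:
        return "*" * len(secret)
    return "*" * (len(secret) - visible) + secret[-visible:]

os.environ["REPUTE_API_KEY"] = "sk_test_example_0000"  # demo value only
api_key = require_env("REPUTE_API_KEY")
masked = mask_secret(api_key)
print(f"Loaded API key {masked}")
```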
**Secrets Manager (AWS):**
```python
import boto3
import json

def get_api_key():
    """Get API key from AWS Secrets Manager"""
    client = boto3.client('secretsmanager')
    response = client.get_secret_value(
        SecretId='prod/reputeapi/apikey'
    )
    secret = json.loads(response['SecretString'])
    return secret['api_key']

API_KEY = get_api_key()
```
**Vault (HashiCorp):**
```python
import os

import hvac

def get_api_key():
    """Get API key from Vault"""
    client = hvac.Client(url='https://vault.example.com')
    client.auth.approle.login(
        role_id=os.getenv('VAULT_ROLE_ID'),
        secret_id=os.getenv('VAULT_SECRET_ID')
    )
    secret = client.secrets.kv.v2.read_secret_version(
        path='reputeapi/apikey'
    )
    return secret['data']['data']['api_key']

API_KEY = get_api_key()
```
### 2. Validate Input
Always validate domain inputs:
```python
import re
from urllib.parse import urlparse

def validate_domain(domain: str) -> str:
    """
    Validate and sanitize domain input

    Args:
        domain: Domain to validate

    Returns:
        Sanitized domain

    Raises:
        ValueError: If domain is invalid
    """
    # Remove whitespace
    domain = domain.strip()

    # Remove protocol if present
    if '://' in domain:
        parsed = urlparse(domain)
        domain = parsed.netloc or parsed.path

    # Remove path, query, fragment
    domain = domain.split('/')[0].split('?')[0].split('#')[0]

    # Remove port
    domain = domain.split(':')[0]

    # Validate format (fullmatch rejects a trailing newline that match + $ would allow)
    domain_pattern = r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'
    if not re.fullmatch(domain_pattern, domain):
        raise ValueError(f"Invalid domain format: {domain}")

    # Check length
    if len(domain) > 253:
        raise ValueError(f"Domain too long: {domain}")

    return domain.lower()

# Usage (wrapped in a function so that `return` is valid)
def check_user_domain(user_input: str) -> dict:
    try:
        clean_domain = validate_domain(user_input)
    except ValueError as e:
        return {"error": str(e)}
    return client.check_domain(clean_domain)
```
### 3. Rate Limit Your Application
Implement client-side rate limiting:
```python
from threading import Lock
from collections import deque
import time

class RateLimiter:
    """Client-side rate limiter (sliding window)"""

    def __init__(self, max_calls: int, period: int):
        """
        Initialize rate limiter

        Args:
            max_calls: Maximum calls allowed
            period: Time period in seconds
        """
        self.max_calls = max_calls
        self.period = period
        self.calls = deque()
        self.lock = Lock()

    def allow_request(self) -> bool:
        """Check if request is allowed"""
        with self.lock:
            # Monotonic clock: unaffected by system clock adjustments
            now = time.monotonic()

            # Remove old calls outside the window
            while self.calls and self.calls[0] < now - self.period:
                self.calls.popleft()

            if len(self.calls) < self.max_calls:
                self.calls.append(now)
                return True
            return False

    def wait_if_needed(self):
        """Wait if rate limit is hit"""
        while not self.allow_request():
            time.sleep(0.1)

# Usage
rate_limiter = RateLimiter(max_calls=10, period=60)  # 10 per minute

def check_domain_rate_limited(domain: str):
    rate_limiter.wait_if_needed()
    return client.check_domain(domain)
```
## Cost Optimization
### 1. Monitor Usage
Track API usage to optimize costs:
```python
from typing import Dict, List

class UsageTracker:
    """Track API usage"""

    def __init__(self, db_connection):
        # db_connection is an application-specific wrapper exposing
        # execute() and query_all() with %s-style parameters
        self.db = db_connection

    def track_request(
        self,
        endpoint: str,
        domain: str,
        cached: bool,
        response_time: float
    ):
        """Track API request"""
        query = """
            INSERT INTO api_usage (
                endpoint, domain, cached, response_time, timestamp
            ) VALUES (%s, %s, %s, %s, NOW())
        """
        self.db.execute(query, (endpoint, domain, cached, response_time))

    def get_usage_stats(self, days: int = 30) -> List[Dict]:
        """Get usage statistics, one row per day"""
        # Multiply an interval by the parameter instead of placing %s
        # inside a quoted literal, which not all drivers substitute
        query = """
            SELECT
                COUNT(*) as total_requests,
                SUM(CASE WHEN cached THEN 1 ELSE 0 END) as cached_requests,
                COUNT(DISTINCT domain) as unique_domains,
                AVG(response_time) as avg_response_time,
                DATE(timestamp) as date
            FROM api_usage
            WHERE timestamp >= NOW() - %s * INTERVAL '1 day'
            GROUP BY DATE(timestamp)
            ORDER BY date
        """
        return self.db.query_all(query, (days,))

    def get_top_domains(self, limit: int = 10) -> List[Dict]:
        """Get most checked domains"""
        query = """
            SELECT domain, COUNT(*) as check_count
            FROM api_usage
            WHERE timestamp >= NOW() - INTERVAL '30 days'
            GROUP BY domain
            ORDER BY check_count DESC
            LIMIT %s
        """
        return self.db.query_all(query, (limit,))
```
### 2. Use Score Endpoint for Frequent Checks
The score endpoint is faster and uses less data:
```python
from typing import Dict

def check_for_dashboard(domain: str) -> Dict:
    """Optimized check for dashboard display"""
    # Use score endpoint for initial load
    score_result = client.get_score(domain)

    # Only fetch full details if user clicks for more info
    return {
        'domain': domain,
        'score': score_result['score'],
        'grade': score_result['grade'],
        'top_issues_count': len(score_result.get('top_issues', []))
    }

def get_full_details(domain: str) -> Dict:
    """Get full details only when needed"""
    return client.check_domain(domain)
```
### 3. Implement Smart Refresh
Only refresh when necessary:
```python
from datetime import datetime, timedelta

def should_refresh(domain: str, last_check: datetime) -> bool:
    """Determine if domain should be refreshed"""
    now = datetime.now()
    age = now - last_check

    # Always refresh if older than 24 hours
    if age > timedelta(hours=24):
        return True

    # Refresh critical domains more frequently
    score = get_cached_score(domain)
    if score is not None and score < 50:  # a score of 0 is still critical
        return age > timedelta(hours=1)

    # Normal domains: refresh if older than 6 hours
    return age > timedelta(hours=6)
```
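The refresh policy can also be written as a pure function of the score, which makes the thresholds easy to test. In the function above, the 24-hour rule never changes the outcome on its own, because any result older than 24 hours is also older than 6 hours. The sketch below therefore models only the 1-hour and 6-hour thresholds; `refresh_interval` and `is_due` are hypothetical names.

```python
from datetime import datetime, timedelta
from typing import Optional

def refresh_interval(score: Optional[int]) -> timedelta:
    """Maximum age before a cached result should be refreshed."""
    if score is not None and score < 50:
        return timedelta(hours=1)  # critical domains
    return timedelta(hours=6)      # normal domains, or score unknown

def is_due(score: Optional[int], last_check: datetime, now: datetime) -> bool:
    """True when the cached result is older than its refresh interval."""
    return now - last_check > refresh_interval(score)

now = datetime(2024, 1, 1, 12, 0)
two_hours_ago = now - timedelta(hours=2)

critical_due = is_due(30, two_hours_ago, now)  # critical, older than 1 hour
healthy_due = is_due(90, two_hours_ago, now)   # healthy, younger than 6 hours
zero_due = is_due(0, two_hours_ago, now)       # a score of 0 counts as critical
```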
## Monitoring and Observability
### 1. Logging Best Practices
```python
import logging
import time
from typing import Dict

# Configure logging. Note: this format string does not print the fields
# passed via `extra`; attach a formatter that emits them to see them.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(name)s %(message)s'
)
logger = logging.getLogger(__name__)

def check_domain_with_logging(domain: str) -> Dict:
    """Check domain with comprehensive logging"""
    start_time = time.time()
    logger.info("Starting domain check", extra={
        'domain': domain,
        'operation': 'check_domain'
    })

    try:
        result = client.check_domain(domain)
        elapsed = time.time() - start_time
        logger.info("Domain check successful", extra={
            'domain': domain,
            'score': result['score'],
            'elapsed_ms': elapsed * 1000,
            'cached': result.get('meta', {}).get('cached', False)
        })
        return result
    except Exception as e:
        elapsed = time.time() - start_time
        logger.error("Domain check failed", extra={
            'domain': domain,
            'error': str(e),
            'error_type': type(e).__name__,
            'elapsed_ms': elapsed * 1000
        }, exc_info=True)
        raise
```
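Fields passed through `extra` are attached to the log record, but a plain format string such as the one above does not print them. One way to emit them is a small JSON formatter built on the standard library. This is a sketch under that assumption; `JsonFormatter` is a hypothetical class, and a dedicated structured-logging library may suit production use better.

```python
import io
import json
import logging

# Attributes present on every LogRecord; anything else came from `extra`
_STANDARD_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {"message", "asctime"}

class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, including `extra` fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload, default=str)

# Demo: log to an in-memory stream and parse the line back
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())
demo_logger = logging.getLogger("repute_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.addHandler(handler)
demo_logger.propagate = False

demo_logger.info("Domain check successful", extra={"domain": "example.com", "score": 85})
entry = json.loads(stream.getvalue())
```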
### 2. Metrics Collection
```python
from typing import Dict

from prometheus_client import Counter, Histogram

# Define metrics
api_requests_total = Counter(
    'repute_api_requests_total',
    'Total API requests',
    ['endpoint', 'status']
)
api_request_duration = Histogram(
    'repute_api_request_duration_seconds',
    'API request duration',
    ['endpoint']
)
# Count cache hits and derive the hit rate at query time
# (cache hits / total requests); a gauge set to 0 or 1 per request
# would only reflect the most recent call
api_cache_hits_total = Counter(
    'repute_api_cache_hits_total',
    'API responses served from cache'
)

def check_domain_with_metrics(domain: str) -> Dict:
    """Check domain with metrics collection"""
    endpoint = '/api/v1/check'

    with api_request_duration.labels(endpoint=endpoint).time():
        try:
            result = client.check_domain(domain)
            api_requests_total.labels(
                endpoint=endpoint,
                status='success'
            ).inc()

            # Track cache hits
            if result.get('meta', {}).get('cached'):
                api_cache_hits_total.inc()

            return result
        except Exception:
            api_requests_total.labels(
                endpoint=endpoint,
                status='error'
            ).inc()
            raise
```
### 3. Health Checks
```python
from datetime import datetime
from typing import Dict

def health_check() -> Dict:
    """Check API health"""
    try:
        # Simple request to verify connectivity
        client.get_usage()
        return {
            'status': 'healthy',
            'api': 'reachable',
            'timestamp': datetime.now().isoformat()
        }
    except Exception as e:
        return {
            'status': 'unhealthy',
            'api': 'unreachable',
            'error': str(e),
            'timestamp': datetime.now().isoformat()
        }
```
## Testing Best Practices
### 1. Mock API Responses
```python
import pytest
from unittest.mock import patch

@pytest.fixture
def mock_api_response():
    return {
        'domain': 'example.com',
        'score': 85,
        'grade': 'Good',
        'issues': []
    }

def test_check_domain_success(mock_api_response):
    """Test successful domain check"""
    # check_domain is the application function under test; it is
    # assumed to call requests.get internally
    with patch('requests.get') as mock_get:
        mock_get.return_value.json.return_value = mock_api_response
        mock_get.return_value.status_code = 200

        result = check_domain('example.com')

        assert result['domain'] == 'example.com'
        assert result['score'] == 85
```
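Error paths deserve the same treatment as the success path. The sketch below fakes a 429 response with `unittest.mock.Mock` and checks that the rate-limit details are surfaced. `fetch_score` is a hypothetical function under test that receives its HTTP getter as an argument, and the exception class is redefined locally so the example is self-contained.

```python
from unittest.mock import Mock

class RateLimitError(Exception):
    def __init__(self, message: str, retry_after: int):
        super().__init__(message)
        self.retry_after = retry_after

def fetch_score(http_get, domain: str) -> int:
    """Hypothetical function under test; `http_get` is injected so tests can fake it."""
    response = http_get("/api/v1/score", params={"domain": domain})
    if response.status_code == 429:
        raise RateLimitError(
            "Rate limit exceeded",
            int(response.headers.get("Retry-After", 60)),
        )
    return response.json()["score"]

def test_fetch_score_rate_limited():
    fake_get = Mock()
    fake_get.return_value.status_code = 429
    fake_get.return_value.headers = {"Retry-After": "30"}
    try:
        fetch_score(fake_get, "example.com")
    except RateLimitError as exc:
        assert exc.retry_after == 30
    else:
        raise AssertionError("expected RateLimitError")
    fake_get.assert_called_once_with("/api/v1/score", params={"domain": "example.com"})

def test_fetch_score_success():
    fake_get = Mock()
    fake_get.return_value.status_code = 200
    fake_get.return_value.json.return_value = {"score": 85}
    assert fetch_score(fake_get, "example.com") == 85
```

Injecting the getter keeps the test independent of where `requests.get` is imported, which is a common source of patches that silently miss their target.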
### 2. Integration Tests
```python
import pytest

# Register the custom "integration" marker in your pytest configuration
@pytest.mark.integration
def test_api_integration():
    """Test actual API integration"""
    # Use test API key
    client = ReputeAPIClient(api_key=TEST_API_KEY)
    result = client.check_domain('google.com')

    assert 'score' in result
    assert 0 <= result['score'] <= 100
    assert 'issues' in result
```
## Quick Reference Checklist
- [ ] Use appropriate endpoint (score vs check)
- [ ] Implement connection pooling
- [ ] Add caching layer (Redis/database)
- [ ] Handle all error types
- [ ] Implement retry with exponential backoff
- [ ] Use circuit breaker for resilience
- [ ] Store API keys securely
- [ ] Validate all inputs
- [ ] Implement client-side rate limiting
- [ ] Track API usage
- [ ] Add comprehensive logging
- [ ] Collect metrics
- [ ] Set up monitoring and alerts
- [ ] Write tests (unit and integration)
## Next Steps
- Integration Patterns - Architecture patterns
- Python Guide - Python examples
- JavaScript Guide - JavaScript examples
- API Reference - Complete API documentation
## Support
Need help with best practices?