Integration PatternsΒΆ
Architecture patterns and design strategies for integrating ReputeAPI into your applications.
OverviewΒΆ
This guide covers common integration patterns for different use cases, from simple validation workflows to complex multi-tenant systems.
What You'll Learn:
- Architecture patterns for various use cases
- Data flow and caching strategies
- Event-driven integration approaches
- Multi-tenant and SaaS patterns
- Real-time monitoring architectures
Pattern 1: Simple Validation ServiceΒΆ
Use Case: Basic domain validation during user onboarding or configuration.
ArchitectureΒΆ
βββββββββββββββ βββββββββββββββ βββββββββββββββ
β Client βββββββΆβ Your API βββββββΆβ ReputeAPI β
β Application β β Service β β β
βββββββββββββββ βββββββββββββββ βββββββββββββββ
β
βΌ
βββββββββββββββ
β Database β
β (Cache) β
βββββββββββββββ
ImplementationΒΆ
Python:
from datetime import datetime, timedelta
from typing import Optional, Dict
class ValidationService:
"""Simple validation service with caching"""
def __init__(self, api_client, db_connection):
self.api = api_client
self.db = db_connection
self.cache_ttl = timedelta(minutes=15)
def validate_domain(self, domain: str, force_refresh: bool = False) -> Dict:
"""
Validate domain with database caching
Args:
domain: Domain to validate
force_refresh: Bypass cache
Returns:
Validation result
"""
# Check cache first
if not force_refresh:
cached = self._get_from_cache(domain)
if cached:
return cached
# Fetch from API
result = self.api.check_domain(domain)
# Save to cache
self._save_to_cache(domain, result)
return result
def _get_from_cache(self, domain: str) -> Optional[Dict]:
"""Get cached result if not expired"""
query = """
SELECT result, cached_at
FROM domain_validations
WHERE domain = %s
ORDER BY cached_at DESC
LIMIT 1
"""
row = self.db.query_one(query, (domain,))
if row:
result, cached_at = row
if datetime.now() - cached_at < self.cache_ttl:
return result
return None
def _save_to_cache(self, domain: str, result: Dict):
"""Save result to cache"""
query = """
INSERT INTO domain_validations (domain, result, cached_at)
VALUES (%s, %s, %s)
"""
self.db.execute(query, (domain, result, datetime.now()))
```
**JavaScript:**
```javascript
class ValidationService {
constructor(apiClient, database) {
this.api = apiClient;
this.db = database;
this.cacheTtl = 15 * 60 * 1000; // 15 minutes
}
async validateDomain(domain, forceRefresh = false) {
// Check cache first
if (!forceRefresh) {
const cached = await this.getFromCache(domain);
if (cached) return cached;
}
// Fetch from API
const result = await this.api.checkDomain(domain);
// Save to cache
await this.saveToCache(domain, result);
return result;
}
async getFromCache(domain) {
const query = `
SELECT result, cached_at
FROM domain_validations
WHERE domain = $1
ORDER BY cached_at DESC
LIMIT 1
`;
const row = await this.db.queryOne(query, [domain]);
if (row) {
const cacheAge = Date.now() - row.cached_at.getTime();
if (cacheAge < this.cacheTtl) {
return row.result;
}
}
return null;
}
async saveToCache(domain, result) {
const query = `
INSERT INTO domain_validations (domain, result, cached_at)
VALUES ($1, $2, $3)
`;
await this.db.execute(query, [domain, result, new Date()]);
}
}
```
### Database Schema
```sql
CREATE TABLE domain_validations (
id SERIAL PRIMARY KEY,
domain VARCHAR(255) NOT NULL,
result JSONB NOT NULL,
cached_at TIMESTAMP NOT NULL,
INDEX idx_domain_cached (domain, cached_at)
);
Pattern 2: Background Job ProcessingΒΆ
Use Case: Validate domains asynchronously without blocking user requests.
ArchitectureΒΆ
βββββββββββββββ βββββββββββββββ βββββββββββββββ
β Client βββββββΆβ Web API β β Job Queue β
βββββββββββββββ βββββββββββββββ β (Redis) β
β βββββββββββββββ
β β
βΌ βΌ
βββββββββββββββ βββββββββββββββ
β Database β β Worker β
βββββββββββββββ β Processes β
βββββββββββββββ
β
βΌ
βββββββββββββββ
β ReputeAPI β
βββββββββββββββ
ImplementationΒΆ
Python (Celery):
from celery import Celery
from datetime import datetime
app = Celery('tasks', broker='redis://localhost:6379')
@app.task(bind=True, max_retries=3)
def validate_domain_task(self, domain: str, user_id: int):
"""
Background task to validate domain
Args:
domain: Domain to validate
user_id: User who requested validation
"""
try:
# Update status to processing
db.update_validation_status(domain, user_id, 'processing')
# Call API
client = ReputeAPIClient(api_key=settings.REPUTE_API_KEY)
result = client.check_domain(domain)
# Save result
db.save_validation_result(
domain=domain,
user_id=user_id,
result=result,
status='completed',
completed_at=datetime.now()
)
# Trigger webhook if configured
webhook_url = db.get_user_webhook(user_id)
if webhook_url:
send_webhook(webhook_url, {
'event': 'validation.completed',
'domain': domain,
'result': result
})
except Exception as e:
# Update status to failed
db.update_validation_status(domain, user_id, 'failed', str(e))
# Retry with exponential backoff
raise self.retry(exc=e, countdown=2 ** self.request.retries)
# API endpoint
@app.route('/api/validate', methods=['POST'])
def queue_validation():
"""Queue domain validation"""
data = request.json
domain = data.get('domain')
user_id = request.user.id
# Create validation record
validation_id = db.create_validation(
domain=domain,
user_id=user_id,
status='queued'
)
# Queue background task
validate_domain_task.delay(domain, user_id)
return jsonify({
'validation_id': validation_id,
'status': 'queued',
'message': 'Validation queued for processing'
})
```
**JavaScript (Bull):**
```javascript
const Queue = require('bull');
const validationQueue = new Queue('domain-validation', 'redis://localhost:6379');
// Worker process
validationQueue.process(async (job) => {
const { domain, userId } = job.data;
try {
// Update status
await db.updateValidationStatus(domain, userId, 'processing');
// Call API
const client = new ReputeAPIClient(process.env.REPUTE_API_KEY);
const result = await client.checkDomain(domain);
// Save result
await db.saveValidationResult({
domain,
userId,
result,
status: 'completed',
completedAt: new Date()
});
// Trigger webhook
const webhookUrl = await db.getUserWebhook(userId);
if (webhookUrl) {
await sendWebhook(webhookUrl, {
event: 'validation.completed',
domain,
result
});
}
return result;
} catch (error) {
await db.updateValidationStatus(
domain,
userId,
'failed',
error.message
);
throw error;
}
});
// API endpoint
app.post('/api/validate', async (req, res) => {
const { domain } = req.body;
const userId = req.user.id;
// Create validation record
const validationId = await db.createValidation({
domain,
userId,
status: 'queued'
});
// Queue job
await validationQueue.add({
domain,
userId
});
res.json({
validationId,
status: 'queued',
message: 'Validation queued for processing'
});
});
```
---
## Pattern 3: Event-Driven Architecture
**Use Case:** React to validation events across multiple services.
### Architecture
### Implementation
**Python (Kafka):**
```python
from kafka import KafkaProducer, KafkaConsumer
import json
class ValidationEventProducer:
"""Produce validation events"""
def __init__(self, bootstrap_servers):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def emit_validation_started(self, domain: str, user_id: int):
"""Emit validation started event"""
event = {
'event_type': 'validation.started',
'domain': domain,
'user_id': user_id,
'timestamp': datetime.now().isoformat()
}
self.producer.send('domain-validations', value=event)
def emit_validation_completed(
self,
domain: str,
user_id: int,
result: Dict
):
"""Emit validation completed event"""
event = {
'event_type': 'validation.completed',
'domain': domain,
'user_id': user_id,
'score': result['score'],
'grade': result['grade'],
'issues_count': len(result.get('issues', [])),
'timestamp': datetime.now().isoformat()
}
self.producer.send('domain-validations', value=event)
class AlertService:
"""Alert service consuming validation events"""
def __init__(self, bootstrap_servers):
self.consumer = KafkaConsumer(
'domain-validations',
bootstrap_servers=bootstrap_servers,
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
group_id='alert-service'
)
def process_events(self):
"""Process validation events"""
for message in self.consumer:
event = message.value
if event['event_type'] == 'validation.completed':
score = event['score']
# Alert on low scores
if score < 70:
self.send_alert(
event['domain'],
f"Low security score: {score}/100"
)
# Alert on critical issues
if event.get('critical_issues', 0) > 0:
self.send_alert(
event['domain'],
f"Critical issues found"
)
def send_alert(self, domain: str, message: str):
"""Send alert (email, Slack, etc.)"""
print(f"ALERT [{domain}]: {message}")
# Implement your alert logic here
```
**JavaScript (RabbitMQ):**
```javascript
const amqp = require('amqplib');
class ValidationEventProducer {
constructor(rabbitUrl) {
this.rabbitUrl = rabbitUrl;
}
async connect() {
this.connection = await amqp.connect(this.rabbitUrl);
this.channel = await this.connection.createChannel();
await this.channel.assertExchange('domain-validations', 'topic');
}
async emitValidationStarted(domain, userId) {
const event = {
eventType: 'validation.started',
domain,
userId,
timestamp: new Date().toISOString()
};
this.channel.publish(
'domain-validations',
'validation.started',
Buffer.from(JSON.stringify(event))
);
}
async emitValidationCompleted(domain, userId, result) {
const event = {
eventType: 'validation.completed',
domain,
userId,
score: result.score,
grade: result.grade,
issuesCount: result.issues?.length || 0,
timestamp: new Date().toISOString()
};
this.channel.publish(
'domain-validations',
'validation.completed',
Buffer.from(JSON.stringify(event))
);
}
}
class AlertService {
constructor(rabbitUrl) {
this.rabbitUrl = rabbitUrl;
}
async connect() {
this.connection = await amqp.connect(this.rabbitUrl);
this.channel = await this.connection.createChannel();
await this.channel.assertExchange('domain-validations', 'topic');
const queue = await this.channel.assertQueue('alerts');
await this.channel.bindQueue(
queue.queue,
'domain-validations',
'validation.completed'
);
this.channel.consume(queue.queue, this.handleEvent.bind(this));
}
async handleEvent(message) {
const event = JSON.parse(message.content.toString());
if (event.eventType === 'validation.completed') {
const { domain, score } = event;
// Alert on low scores
if (score < 70) {
await this.sendAlert(domain, `Low security score: ${score}/100`);
}
}
this.channel.ack(message);
}
async sendAlert(domain, message) {
console.log(`ALERT [${domain}]: ${message}`);
// Implement your alert logic here
}
}
```
---
## Pattern 4: Multi-Tenant SaaS
**Use Case:** SaaS application managing multiple customer domains.
### Architecture
### Implementation
```python
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import hashlib
class MultiTenantValidationService:
"""Multi-tenant validation service with isolation"""
def __init__(self, api_client, db_connection):
self.api = api_client
self.db = db_connection
self.rate_limiter = TenantRateLimiter()
def validate_domain(
self,
tenant_id: str,
domain: str,
force_refresh: bool = False
) -> Dict:
"""
Validate domain for a specific tenant
Args:
tenant_id: Tenant identifier
domain: Domain to validate
force_refresh: Bypass cache
Returns:
Validation result
"""
# Verify tenant access to domain
if not self._verify_tenant_domain(tenant_id, domain):
raise PermissionError(f"Tenant {tenant_id} cannot access domain {domain}")
# Check rate limits for this tenant
if not self.rate_limiter.allow_request(tenant_id):
raise RateLimitExceeded(f"Rate limit exceeded for tenant {tenant_id}")
# Check cache with tenant isolation
if not force_refresh:
cached = self._get_from_cache(tenant_id, domain)
if cached:
return cached
# Fetch from API
result = self.api.check_domain(domain)
# Save to cache with tenant isolation
self._save_to_cache(tenant_id, domain, result)
# Track usage for this tenant
self._track_usage(tenant_id, domain)
return result
def get_tenant_domains(self, tenant_id: str) -> List[Dict]:
"""Get all domains for a tenant with latest scores"""
query = """
SELECT
d.domain,
d.added_at,
v.result->>'score' as score,
v.result->>'grade' as grade,
v.cached_at as last_checked
FROM tenant_domains d
LEFT JOIN LATERAL (
SELECT result, cached_at
FROM domain_validations
WHERE domain = d.domain
ORDER BY cached_at DESC
LIMIT 1
) v ON true
WHERE d.tenant_id = %s
ORDER BY d.domain
"""
return self.db.query_all(query, (tenant_id,))
def get_tenant_usage(
self,
tenant_id: str,
start_date: datetime,
end_date: datetime
) -> Dict:
"""Get usage statistics for a tenant"""
query = """
SELECT
COUNT(*) as total_validations,
COUNT(DISTINCT domain) as unique_domains,
DATE(validated_at) as date
FROM validation_usage
WHERE tenant_id = %s
AND validated_at BETWEEN %s AND %s
GROUP BY DATE(validated_at)
ORDER BY date
"""
results = self.db.query_all(query, (tenant_id, start_date, end_date))
return {
'tenant_id': tenant_id,
'period': {
'start': start_date.isoformat(),
'end': end_date.isoformat()
},
'usage': results
}
def _verify_tenant_domain(self, tenant_id: str, domain: str) -> bool:
"""Verify tenant has access to domain"""
query = """
SELECT COUNT(*) FROM tenant_domains
WHERE tenant_id = %s AND domain = %s
"""
count = self.db.query_one(query, (tenant_id, domain))[0]
return count > 0
def _get_from_cache(self, tenant_id: str, domain: str) -> Optional[Dict]:
"""Get cached result with tenant isolation"""
cache_key = self._get_cache_key(tenant_id, domain)
query = """
SELECT result, cached_at
FROM domain_validations
WHERE cache_key = %s
ORDER BY cached_at DESC
LIMIT 1
"""
row = self.db.query_one(query, (cache_key,))
if row:
result, cached_at = row
if datetime.now() - cached_at < timedelta(minutes=15):
return result
return None
def _save_to_cache(self, tenant_id: str, domain: str, result: Dict):
"""Save result to cache with tenant isolation"""
cache_key = self._get_cache_key(tenant_id, domain)
query = """
INSERT INTO domain_validations (cache_key, domain, result, cached_at)
VALUES (%s, %s, %s, %s)
"""
self.db.execute(query, (cache_key, domain, result, datetime.now()))
def _get_cache_key(self, tenant_id: str, domain: str) -> str:
"""Generate tenant-isolated cache key"""
key_data = f"{tenant_id}:{domain}"
return hashlib.sha256(key_data.encode()).hexdigest()
def _track_usage(self, tenant_id: str, domain: str):
"""Track API usage for billing/analytics"""
query = """
INSERT INTO validation_usage (tenant_id, domain, validated_at)
VALUES (%s, %s, %s)
"""
self.db.execute(query, (tenant_id, domain, datetime.now()))
class TenantRateLimiter:
"""Per-tenant rate limiting"""
def __init__(self, redis_client):
self.redis = redis_client
self.default_limit = 100 # requests per hour
def allow_request(self, tenant_id: str) -> bool:
"""Check if tenant can make another request"""
key = f"rate_limit:{tenant_id}"
current = self.redis.incr(key)
if current == 1:
self.redis.expire(key, 3600) # 1 hour window
limit = self._get_tenant_limit(tenant_id)
return current <= limit
def _get_tenant_limit(self, tenant_id: str) -> int:
"""Get rate limit for tenant (based on plan)"""
# Query database for tenant plan
# Return appropriate limit
return self.default_limit
Database SchemaΒΆ
-- Tenant domains
CREATE TABLE tenant_domains (
id SERIAL PRIMARY KEY,
tenant_id VARCHAR(255) NOT NULL,
domain VARCHAR(255) NOT NULL,
added_at TIMESTAMP NOT NULL,
UNIQUE(tenant_id, domain),
INDEX idx_tenant (tenant_id)
);
-- Domain validations with tenant isolation
CREATE TABLE domain_validations (
id SERIAL PRIMARY KEY,
cache_key VARCHAR(64) NOT NULL,
domain VARCHAR(255) NOT NULL,
result JSONB NOT NULL,
cached_at TIMESTAMP NOT NULL,
INDEX idx_cache_key (cache_key, cached_at)
);
-- Usage tracking
CREATE TABLE validation_usage (
id SERIAL PRIMARY KEY,
tenant_id VARCHAR(255) NOT NULL,
domain VARCHAR(255) NOT NULL,
validated_at TIMESTAMP NOT NULL,
INDEX idx_tenant_date (tenant_id, validated_at)
);
Pattern 5: Real-Time Monitoring DashboardΒΆ
Use Case: Real-time dashboard showing live domain security status.
ArchitectureΒΆ
βββββββββββββββ βββββββββββββββ βββββββββββββββ
β Browser βββββββΆβ WebSocket βββββββΆβ Backend β
β Dashboard β β Server β β Service β
βββββββββββββββ βββββββββββββββ βββββββββββββββ
β
βΌ
βββββββββββββββ
β Scheduler β
β (Cron) β
βββββββββββββββ
β
βΌ
βββββββββββββββ
β ReputeAPI β
βββββββββββββββ
ImplementationΒΆ
Backend (Python + Flask-SocketIO):
from flask import Flask
from flask_socketio import SocketIO, emit
from apscheduler.schedulers.background import BackgroundScheduler
app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")
class DomainMonitor:
"""Real-time domain monitoring"""
def __init__(self, api_client):
self.api = api_client
self.monitored_domains = set()
self.scheduler = BackgroundScheduler()
self.scheduler.start()
def add_domain(self, domain: str):
"""Add domain to monitoring"""
if domain not in self.monitored_domains:
self.monitored_domains.add(domain)
# Schedule periodic checks
self.scheduler.add_job(
func=self.check_domain,
args=[domain],
trigger='interval',
minutes=15,
id=f"check_{domain}"
)
# Check immediately
self.check_domain(domain)
def remove_domain(self, domain: str):
"""Remove domain from monitoring"""
if domain in self.monitored_domains:
self.monitored_domains.remove(domain)
self.scheduler.remove_job(f"check_{domain}")
def check_domain(self, domain: str):
"""Check domain and emit result via WebSocket"""
try:
result = self.api.get_score(domain)
# Emit update to all connected clients
socketio.emit('domain_update', {
'domain': domain,
'score': result['score'],
'grade': result['grade'],
'top_issues': result.get('top_issues', []),
'timestamp': datetime.now().isoformat()
})
except Exception as e:
socketio.emit('domain_error', {
'domain': domain,
'error': str(e),
'timestamp': datetime.now().isoformat()
})
# Initialize monitor
api_client = ReputeAPIClient(api_key=settings.REPUTE_API_KEY)
monitor = DomainMonitor(api_client)
@socketio.on('connect')
def handle_connect():
"""Handle client connection"""
print('Client connected')
@socketio.on('subscribe')
def handle_subscribe(data):
"""Subscribe to domain updates"""
domain = data['domain']
monitor.add_domain(domain)
emit('subscribed', {'domain': domain})
@socketio.on('unsubscribe')
def handle_unsubscribe(data):
"""Unsubscribe from domain updates"""
domain = data['domain']
monitor.remove_domain(domain)
emit('unsubscribed', {'domain': domain})
if __name__ == '__main__':
socketio.run(app, host='0.0.0.0', port=5000)
```
**Frontend (React + Socket.io):**
```javascript
import React, { useState, useEffect } from 'react';
import io from 'socket.io-client';
function DomainMonitorDashboard() {
const [socket, setSocket] = useState(null);
const [domains, setDomains] = useState({});
const [newDomain, setNewDomain] = useState('');
useEffect(() => {
// Connect to WebSocket server
const newSocket = io('http://localhost:5000');
setSocket(newSocket);
// Listen for domain updates
newSocket.on('domain_update', (data) => {
setDomains(prev => ({
...prev,
[data.domain]: {
...data,
status: 'online'
}
}));
});
// Listen for errors
newSocket.on('domain_error', (data) => {
setDomains(prev => ({
...prev,
[data.domain]: {
...prev[data.domain],
error: data.error,
status: 'error'
}
}));
});
return () => newSocket.close();
}, []);
const subscribeToDomain = (domain) => {
if (socket && domain) {
socket.emit('subscribe', { domain });
setNewDomain('');
}
};
const unsubscribeFromDomain = (domain) => {
if (socket) {
socket.emit('unsubscribe', { domain });
setDomains(prev => {
const updated = { ...prev };
delete updated[domain];
return updated;
});
}
};
return (
<div className="dashboard">
<h1>Domain Security Monitor</h1>
<div className="add-domain">
<input
value={newDomain}
onChange={(e) => setNewDomain(e.target.value)}
onKeyPress={(e) => e.key === 'Enter' && subscribeToDomain(newDomain)}
placeholder="Enter domain to monitor"
/>
<button onClick={() => subscribeToDomain(newDomain)}>
Add Domain
</button>
</div>
<div className="domains-grid">
{Object.entries(domains).map(([domain, data]) => (
<DomainCard
key={domain}
domain={domain}
data={data}
onRemove={() => unsubscribeFromDomain(domain)}
/>
))}
</div>
</div>
);
}
function DomainCard({ domain, data, onRemove }) {
const getGradeColor = (grade) => {
const colors = {
'Excellent': 'green',
'Good': 'blue',
'Fair': 'orange',
'Poor': 'red'
};
return colors[grade] || 'gray';
};
return (
<div className="domain-card">
<div className="card-header">
<h3>{domain}</h3>
<button onClick={onRemove}>Γ</button>
</div>
{data.status === 'error' ? (
<div className="error">Error: {data.error}</div>
) : (
<>
<div className="score" style={{ color: getGradeColor(data.grade) }}>
{data.score}/100
</div>
<div className="grade">{data.grade}</div>
{data.top_issues && data.top_issues.length > 0 && (
<div className="issues">
<h4>Issues:</h4>
<ul>
{data.top_issues.map((issue, i) => (
<li key={i} className={`severity-${issue.severity}`}>
{issue.message}
</li>
))}
</ul>
</div>
)}
<div className="timestamp">
Last checked: {new Date(data.timestamp).toLocaleString()}
</div>
</>
)}
</div>
);
}
export default DomainMonitorDashboard;
```
---
## Pattern 6: Webhook Integration
**Use Case:** Notify external systems of validation results.
### Implementation
```python
import requests
from typing import Dict, List
import hmac
import hashlib
class WebhookService:
"""Webhook notification service"""
def __init__(self, db_connection):
self.db = db_connection
def send_webhook(
self,
webhook_url: str,
event_type: str,
payload: Dict,
secret: str = None
):
"""
Send webhook notification
Args:
webhook_url: Target webhook URL
event_type: Type of event
payload: Event data
secret: Optional secret for signature
"""
headers = {
'Content-Type': 'application/json',
'X-Webhook-Event': event_type
}
# Add signature if secret provided
if secret:
signature = self._generate_signature(payload, secret)
headers['X-Webhook-Signature'] = signature
try:
response = requests.post(
webhook_url,
json=payload,
headers=headers,
timeout=10
)
# Log webhook delivery
self._log_webhook(
webhook_url,
event_type,
response.status_code,
success=response.status_code < 400
)
except Exception as e:
self._log_webhook(
webhook_url,
event_type,
0,
success=False,
error=str(e)
)
def _generate_signature(self, payload: Dict, secret: str) -> str:
"""Generate HMAC signature"""
import json
message = json.dumps(payload, sort_keys=True)
signature = hmac.new(
secret.encode(),
message.encode(),
hashlib.sha256
).hexdigest()
return f"sha256={signature}"
def _log_webhook(
self,
url: str,
event_type: str,
status_code: int,
success: bool,
error: str = None
):
"""Log webhook delivery"""
query = """
INSERT INTO webhook_logs (url, event_type, status_code, success, error, sent_at)
VALUES (%s, %s, %s, %s, %s, NOW())
"""
self.db.execute(query, (url, event_type, status_code, success, error))
# Usage in validation workflow
def validate_with_webhook(domain: str, user_id: int):
"""Validate domain and send webhook"""
# Get validation result
client = ReputeAPIClient(api_key=API_KEY)
result = client.check_domain(domain)
# Get user's webhook URL
webhook_config = db.get_user_webhook_config(user_id)
if webhook_config:
webhook_service = WebhookService(db)
webhook_service.send_webhook(
webhook_url=webhook_config['url'],
event_type='domain.validated',
payload={
'domain': domain,
'score': result['score'],
'grade': result['grade'],
'issues': result.get('issues', []),
'timestamp': datetime.now().isoformat()
},
secret=webhook_config.get('secret')
)
return result
Best PracticesΒΆ
1. Caching StrategyΒΆ
- Cache results for 15 minutes (API default)
- Use domain + parameters as cache key
- Implement cache invalidation on user request
2. Rate LimitingΒΆ
- Respect API rate limits
- Implement client-side rate limiting
- Use exponential backoff for retries
3. Error HandlingΒΆ
- Handle all error types gracefully
- Implement retry logic for transient failures
- Log errors for debugging
4. SecurityΒΆ
- Store API keys securely (environment variables, secrets manager)
- Use HTTPS for all API calls
- Validate webhook signatures
5. MonitoringΒΆ
- Track API usage and costs
- Monitor response times
- Set up alerts for errors
Next StepsΒΆ
- Best Practices - Performance optimization
- Python Guide - Python integration examples
- JavaScript Guide - JavaScript integration examples
- API Reference - Complete API documentation
SupportΒΆ
Need help with integration patterns?