Python Integration Guide
Complete guide to integrating ReputeAPI with Python applications.
Installation
ReputeAPI works with any HTTP client library. We recommend using requests for simplicity or httpx for async support.
requests: pip install requests
httpx (async): pip install httpx
aiohttp: pip install aiohttp
Quick Start
Basic Domain Check
import requests
# Placeholder credential; replace it with a real key before running the examples.
API_KEY = "your-api-key-here"
# Root URL that endpoint paths such as /api/v1/check are appended to.
BASE_URL = "https://api.reputeapi.com"
def check_domain(domain):
    """Check domain mailflow security.

    Args:
        domain: Domain to check (e.g., "example.com")

    Returns:
        Parsed JSON body of the /api/v1/check response

    Raises:
        requests.HTTPError: The API answered with a 4xx/5xx status
        requests.Timeout: No response arrived within 10 seconds
    """
    response = requests.get(
        f"{BASE_URL}/api/v1/check",
        params={"domain": domain},
        headers={"X-API-Key": API_KEY},
        # requests applies no timeout by default; without one a stalled
        # connection would block the caller indefinitely.
        timeout=10,
    )
    response.raise_for_status()
    return response.json()
# Usage: run one check and print the score and grade fields of the response
result = check_domain("example.com")
print(f"Score: {result['score']}/100")
print(f"Grade: {result['grade']}")
Configuration Best Practices
Store your API key securely using environment variables:
import os
from dotenv import load_dotenv
# Populate the process environment from a local .env file, if one exists
load_dotenv()

# Read both settings up front; only the base URL has a fallback value.
API_KEY = os.getenv("REPUTE_API_KEY")
BASE_URL = os.getenv("REPUTE_API_URL", "https://api.reputeapi.com")

# Fail fast: every request needs a key, so refuse to continue without one.
if not API_KEY:
    raise ValueError("REPUTE_API_KEY environment variable not set")
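.env file (variable names as read by the code above):
REPUTE_API_KEY=your-api-key-here
REPUTE_API_URL=https://api.reputeapi.com
Client Wrapper Class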
Create a reusable client for your application:
import requests
from typing import Dict, List, Optional
from datetime import datetime
class ReputeAPIClient:
    """Python client for ReputeAPI.

    Wraps a ``requests.Session`` that sends the ``X-API-Key`` header on every
    call. Instances can be used as context managers so the session is closed
    on exit.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.reputeapi.com",
        timeout: Optional[float] = 10.0,
    ):
        """
        Create a client

        Args:
            api_key: Key sent in the X-API-Key header
            base_url: Root URL of the API
            timeout: Seconds to wait for each response; None disables the
                timeout (requests would then wait indefinitely)
        """
        self.api_key = api_key
        # Endpoint paths start with "/", so a trailing slash here would
        # produce "//" in the final URL.
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": api_key})

    def check_domain(
        self,
        domain: str,
        selectors: Optional[List[str]] = None,
        verbose: bool = False,
        refresh: bool = False
    ) -> Dict:
        """
        Run complete mailflow security check

        Args:
            domain: Domain to check (e.g., "example.com")
            selectors: DKIM selectors to check
            verbose: Include verbose DNS details
            refresh: Bypass cache and fetch fresh data

        Returns:
            Full validation results with score and issues

        Raises:
            requests.HTTPError: The API answered with a 4xx/5xx status
        """
        params = {
            "domain": domain,
            "verbose": verbose,
            "refresh": refresh
        }
        if selectors:
            # Selectors travel as one comma-separated query value.
            params["selectors"] = ",".join(selectors)
        response = self.session.get(
            f"{self.base_url}/api/v1/check",
            params=params,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def get_score(self, domain: str, refresh: bool = False) -> Dict:
        """
        Get quick score check (faster than full check)

        Args:
            domain: Domain to check
            refresh: Bypass cache

        Returns:
            Score and top issues

        Raises:
            requests.HTTPError: The API answered with a 4xx/5xx status
        """
        response = self.session.get(
            f"{self.base_url}/api/v1/score",
            params={"domain": domain, "refresh": refresh},
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def get_recommendations(
        self,
        domain: str,
        severity: Optional[str] = None
    ) -> Dict:
        """
        Get prioritized recommendations

        Args:
            domain: Domain to get recommendations for
            severity: Filter by severity (critical, high, medium, low)

        Returns:
            Prioritized list of fixes

        Raises:
            requests.HTTPError: The API answered with a 4xx/5xx status
        """
        data = {"domain": domain}
        if severity:
            data["severity"] = severity
        response = self.session.post(
            f"{self.base_url}/api/v1/recommendations",
            json=data,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def get_history(
        self,
        domain: str,
        days: int = 30,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Dict:
        """
        Get historical validation data

        Args:
            domain: Domain to get history for
            days: Number of days to retrieve (if dates not specified)
            start_date: Start date for history
            end_date: End date for history

        Returns:
            Historical snapshots

        Raises:
            requests.HTTPError: The API answered with a 4xx/5xx status
        """
        params = {"domain": domain}
        # The explicit range is used only when BOTH bounds are supplied;
        # a single bound is ignored and `days` is sent instead.
        if start_date and end_date:
            params["start_date"] = start_date.isoformat()
            params["end_date"] = end_date.isoformat()
        else:
            params["days"] = days
        response = self.session.get(
            f"{self.base_url}/api/v1/history",
            params=params,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def get_usage(self) -> Dict:
        """
        Get API usage statistics

        Returns:
            Usage stats for current billing period

        Raises:
            requests.HTTPError: The API answered with a 4xx/5xx status
        """
        response = self.session.get(
            f"{self.base_url}/api/v1/usage",
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def bulk_validate(self, domains: List[str]) -> Dict:
        """
        Validate multiple domains (legacy endpoint)

        Args:
            domains: List of domains to validate

        Returns:
            Results for all domains

        Raises:
            requests.HTTPError: The API answered with a 4xx/5xx status
        """
        # NOTE(review): this path has no "/api" prefix, unlike every other
        # endpoint here - presumably intentional for the legacy route; confirm.
        response = self.session.post(
            f"{self.base_url}/v1/bulk-validate",
            json={"domains": domains},
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def close(self):
        """Close the session"""
        self.session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always release the session; exceptions are not suppressed.
        self.close()
# Usage: the context manager closes the underlying session on exit
with ReputeAPIClient(api_key=API_KEY) as client:
    result = client.check_domain("example.com")
    print(f"Score: {result['score']}/100")
Error Handling
Comprehensive Error Handling
import requests
from requests.exceptions import (
RequestException,
Timeout,
ConnectionError,
HTTPError
)
class ReputeAPIError(Exception):
    """Root of the ReputeAPI exception hierarchy."""


class AuthenticationError(ReputeAPIError):
    """The API key is invalid or missing."""


class RateLimitError(ReputeAPIError):
    """The rate limit was exceeded.

    Attributes:
        retry_after: Seconds to wait before retrying, as supplied by the
            code raising the error (None when not given)
    """

    def __init__(self, message, retry_after=None):
        self.retry_after = retry_after
        super().__init__(message)


class ValidationError(ReputeAPIError):
    """The request parameters were rejected."""


class ServerError(ReputeAPIError):
    """The failure happened on the server side."""
def check_domain_safe(domain: str) -> Optional[Dict]:
    """
    Check domain with comprehensive error handling

    Every failure is reported on stdout and converted to a None result, so
    callers only need to test the return value.

    Args:
        domain: Domain to check

    Returns:
        Validation result or None if error occurred
    """
    try:
        response = requests.get(
            f"{BASE_URL}/api/v1/check",
            params={"domain": domain},
            headers={"X-API-Key": API_KEY},
            timeout=10  # 10 second timeout
        )
        status = response.status_code

        # Map well-known status codes onto the ReputeAPI exception types
        if status == 401:
            raise AuthenticationError("Invalid or missing API key")
        if status == 429:
            # Retry-After may be missing or an HTTP-date rather than a
            # number of seconds; fall back to 60 instead of crashing.
            try:
                retry_after = int(response.headers.get("Retry-After", 60))
            except (TypeError, ValueError):
                retry_after = 60
            raise RateLimitError(
                "Rate limit exceeded",
                retry_after=retry_after
            )
        if status == 400:
            # The error body is not guaranteed to be a JSON object.
            try:
                detail = response.json().get("message")
            except (ValueError, AttributeError):
                detail = response.text
            raise ValidationError(f"Invalid request: {detail}")
        if status >= 500:
            raise ServerError(f"Server error: {status}")

        # Any remaining 4xx is surfaced as requests.HTTPError
        response.raise_for_status()
        return response.json()
    except AuthenticationError as e:
        print(f"Authentication error: {e}")
        print("Please check your API key")
        return None
    except RateLimitError as e:
        print(f"Rate limit exceeded. Retry after {e.retry_after} seconds")
        return None
    except ValidationError as e:
        print(f"Validation error: {e}")
        return None
    except ServerError as e:
        # Previously unhandled: a 5xx escaped as an exception even though
        # this function promises None on error.
        print(f"{e}. Please try again later")
        return None
    except Timeout:
        print(f"Request timed out for domain: {domain}")
        return None
    except ConnectionError:
        print("Connection error. Please check your internet connection")
        return None
    except HTTPError as e:
        print(f"HTTP error: {e}")
        return None
    except RequestException as e:
        print(f"Request failed: {e}")
        return None
# Usage: a falsy result means the check failed and was already reported
result = check_domain_safe("example.com")
if not result:
    print("Failed to check domain")
else:
    print(f"Score: {result['score']}/100")
Retry Logic with Exponential Backoff
import time
from functools import wraps
def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    """
    Decorator to retry failed requests with exponential backoff

    Rate-limit errors wait for the server-provided retry_after (falling back
    to the current backoff delay). Timeouts, connection errors and server
    errors wait for the backoff delay, which doubles after each such failure
    up to max_delay. Authentication and validation errors are never retried.

    Args:
        max_retries: Maximum number of attempts before giving up
        base_delay: Initial delay in seconds
        max_delay: Maximum delay in seconds

    Raises:
        Exception: Attempts were exhausted by rate limiting (the last
            RateLimitError is attached as the cause)
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            delay = base_delay
            last_error = None
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except RateLimitError as e:
                    last_error = e
                    retries += 1
                    if retries >= max_retries:
                        # Out of attempts: do not sleep just to give up.
                        break
                    # Prefer the server-provided wait, but keep it separate
                    # from `delay` so it does not distort later backoff.
                    wait = e.retry_after if e.retry_after else delay
                    print(f"Rate limited. Waiting {wait} seconds...")
                    time.sleep(wait)
                except (Timeout, ConnectionError, ServerError) as e:
                    retries += 1
                    if retries >= max_retries:
                        raise
                    print(f"Error: {e}. Retrying in {delay} seconds... (attempt {retries}/{max_retries})")
                    time.sleep(delay)
                    # Exponential backoff, capped at max_delay
                    delay = min(delay * 2, max_delay)
                except (AuthenticationError, ValidationError):
                    # Don't retry on authentication or validation errors
                    raise
            raise Exception(f"Max retries ({max_retries}) exceeded") from last_error
        return wrapper
    return decorator
@retry_with_backoff(max_retries=3, base_delay=2)
def check_domain_with_retry(domain: str) -> Dict:
    """Check domain with automatic retry.

    Args:
        domain: Domain to check

    Returns:
        Parsed JSON body of the /api/v1/check response

    Raises:
        RateLimitError: The API answered 429
        ServerError: The API answered with a 5xx status
        requests.HTTPError: Any other 4xx status
    """
    response = requests.get(
        f"{BASE_URL}/api/v1/check",
        params={"domain": domain},
        headers={"X-API-Key": API_KEY},
        timeout=10
    )
    if response.status_code == 429:
        # Retry-After may be an HTTP-date instead of a number of seconds;
        # fall back to 60 rather than failing with ValueError.
        try:
            retry_after = int(response.headers.get("Retry-After", 60))
        except (TypeError, ValueError):
            retry_after = 60
        raise RateLimitError("Rate limit exceeded", retry_after=retry_after)
    if response.status_code >= 500:
        raise ServerError(f"Server error: {response.status_code}")
    response.raise_for_status()
    return response.json()
# Usage: any error that survives the retries is reported here
try:
    result = check_domain_with_retry("example.com")
    print(f"Score: {result['score']}/100")
except Exception as e:
    print(f"Failed after retries: {e}")
Async Support
Using httpx (Recommended)
import httpx
import asyncio
from typing import List
class AsyncReputeAPIClient:
    """Async Python client for ReputeAPI"""

    def __init__(self, api_key: str, base_url: str = "https://api.reputeapi.com"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {"X-API-Key": api_key}

    async def _get(self, path: str, params: Dict, client=None) -> Dict:
        """Send one GET request and return the decoded JSON body.

        Args:
            path: Endpoint path appended to base_url (e.g., "/api/v1/check")
            params: Query parameters
            client: httpx.AsyncClient to reuse; a temporary one is opened and
                closed for this single request when omitted
        """
        if client is None:
            async with httpx.AsyncClient() as owned_client:
                return await self._get(path, params, client=owned_client)
        response = await client.get(
            f"{self.base_url}{path}",
            params=params,
            headers=self.headers,
            timeout=10.0
        )
        response.raise_for_status()
        return response.json()

    async def check_domain(self, domain: str, **kwargs) -> Dict:
        """Check domain asynchronously.

        Extra keyword arguments are sent as additional query parameters.
        """
        return await self._get("/api/v1/check", {"domain": domain, **kwargs})

    async def get_score(self, domain: str) -> Dict:
        """Get score asynchronously"""
        return await self._get("/api/v1/score", {"domain": domain})

    async def check_multiple_domains(self, domains: List[str]) -> List[Dict]:
        """
        Check multiple domains concurrently

        Args:
            domains: List of domains to check

        Returns:
            List of results (one per domain); a failed check appears as the
            exception object instead of a result
        """
        # One shared client lets all requests reuse a single connection pool
        # instead of opening a separate client per domain.
        async with httpx.AsyncClient() as shared_client:
            tasks = [
                self._get("/api/v1/check", {"domain": domain}, client=shared_client)
                for domain in domains
            ]
            return await asyncio.gather(*tasks, return_exceptions=True)
# Usage
async def main():
    """Demonstrate a single check followed by a concurrent batch."""
    client = AsyncReputeAPIClient(api_key=API_KEY)

    # Single domain
    single = await client.check_domain("example.com")
    print(f"Score: {single['score']}/100")

    # Multiple domains concurrently
    domains = ["example.com", "google.com", "github.com"]
    outcomes = await client.check_multiple_domains(domains)
    for domain, outcome in zip(domains, outcomes):
        # Failed checks come back as exception objects, not as raises.
        if not isinstance(outcome, Exception):
            print(f"{domain}: {outcome['score']}/100")
            continue
        print(f"{domain}: Error - {outcome}")


# Run
asyncio.run(main())
Using aiohttp
import aiohttp
import asyncio
async def check_domain_aiohttp(domain: str) -> Dict:
    """Check domain using aiohttp.

    Args:
        domain: Domain to check

    Returns:
        Decoded JSON body of the /api/v1/check response
    """
    endpoint = f"{BASE_URL}/api/v1/check"
    time_limit = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession() as session:
        async with session.get(
            endpoint,
            params={"domain": domain},
            headers={"X-API-Key": API_KEY},
            timeout=time_limit,
        ) as response:
            # Turn 4xx/5xx answers into exceptions before decoding the body
            response.raise_for_status()
            payload = await response.json()
    return payload
# Usage: asyncio.run drives the coroutine to completion and returns its result
result = asyncio.run(check_domain_aiohttp("example.com"))
print(f"Score: {result['score']}/100")
Common Use Cases
1. Domain Onboarding Validator
from dataclasses import dataclass
from typing import List
@dataclass
class OnboardingResult:
    """Outcome of validating one customer domain during onboarding."""

    # Domain that was validated
    domain: str
    # True when the domain passed the onboarding criteria
    approved: bool
    # Score reported for the domain
    score: int
    # Issues whose severity is "critical"
    critical_issues: List[Dict]
    # Remediation texts for the caller to display
    recommendations: List[str]
def validate_customer_domain(domain: str, min_score: int = 50) -> OnboardingResult:
    """
    Validate customer domain during onboarding

    Any failure while talking to the API is reported on stdout and results
    in a rejected OnboardingResult rather than an exception.

    Args:
        domain: Customer's email domain
        min_score: Minimum acceptable score

    Returns:
        OnboardingResult with approval status
    """
    # The context manager closes the HTTP session even on early return.
    with ReputeAPIClient(api_key=API_KEY) as client:
        try:
            # Get full check
            result = client.check_domain(domain)
            score = result["score"]
            issues = result.get("issues", [])

            # Filter critical issues
            critical_issues = [
                issue for issue in issues
                if issue["severity"] == "critical"
            ]

            # Get recommendations for critical issues
            recommendations = []
            if critical_issues:
                rec_result = client.get_recommendations(domain, severity="critical")
                recommendations = [
                    rec["remediation"]
                    for rec in rec_result.get("recommendations", [])
                ]

            # Approve if score meets minimum and no critical issues
            approved = score >= min_score and not critical_issues
            return OnboardingResult(
                domain=domain,
                approved=approved,
                score=score,
                critical_issues=critical_issues,
                recommendations=recommendations
            )
        except Exception as e:
            # Deliberate best-effort: onboarding must get an answer, so an
            # unverifiable domain is treated as not approved.
            print(f"Error validating domain: {e}")
            return OnboardingResult(
                domain=domain,
                approved=False,
                score=0,
                critical_issues=[],
                recommendations=["Unable to validate domain"]
            )
# Usage: require a score of at least 70 for this customer
result = validate_customer_domain("customer-domain.com", min_score=70)
if not result.approved:
    print(f"Domain rejected. Score: {result.score}/100")
    print(f"Critical issues found: {len(result.critical_issues)}")
    print("\nRecommendations:")
    for rec in result.recommendations:
        print(f" - {rec}")
else:
    print(f"Domain approved with score: {result.score}/100")
2. Batch Domain Auditor
import csv
from datetime import datetime
from typing import List, Dict
def audit_domains_batch(
    domains: List[str],
    output_file: str = "audit_results.csv"
) -> List[Dict]:
    """
    Audit multiple domains and save results to CSV

    A domain whose check fails is recorded with score 0 and an "error: ..."
    status instead of aborting the batch. No file is written when the domain
    list is empty.

    Args:
        domains: List of domains to audit
        output_file: CSV file to save results

    Returns:
        List of audit results
    """
    # Imported here so the function does not depend on an import made elsewhere.
    import time

    results = []
    total = len(domains)
    print(f"Auditing {total} domains...")

    # The context manager closes the HTTP session once the batch is done.
    with ReputeAPIClient(api_key=API_KEY) as client:
        for position, domain in enumerate(domains, 1):
            print(f"[{position}/{total}] Checking {domain}...", end=" ")
            try:
                result = client.get_score(domain)  # Use score endpoint for speed
                # Get issue count by severity
                top_issues = result.get("top_issues", [])
                audit_result = {
                    "domain": domain,
                    "score": result["score"],
                    "grade": result["grade"],
                    "timestamp": datetime.now().isoformat(),
                    "status": "success",
                    "critical_count": sum(
                        1 for issue in top_issues if issue["severity"] == "critical"
                    ),
                    "high_count": sum(
                        1 for issue in top_issues if issue["severity"] == "high"
                    ),
                }
                print(f"Score: {result['score']}/100")
            except Exception as e:
                # Best-effort: record the failure and continue with the batch.
                print(f"Error: {e}")
                audit_result = {
                    "domain": domain,
                    "score": 0,
                    "grade": "Error",
                    "timestamp": datetime.now().isoformat(),
                    "status": f"error: {str(e)}",
                    "critical_count": 0,
                    "high_count": 0
                }
            results.append(audit_result)

            # Rate limiting: wait between requests (not after the last one)
            if position < total:
                time.sleep(0.5)

    # Save to CSV
    if results:
        with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
            fieldnames = [
                "domain", "score", "grade", "critical_count",
                "high_count", "status", "timestamp"
            ]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(results)
        print(f"\nResults saved to {output_file}")

    return results
# Usage: audit three domains; results are also written to the default CSV file
domains = [
"example.com",
"test.com",
"demo.com"
]
results = audit_domains_batch(domains)
# Summary statistics: failed checks are recorded with score 0, so they count as not passing
total_domains = len(results)
passing_domains = sum(1 for r in results if r["score"] >= 80)
print(f"\nSummary: {passing_domains}/{total_domains} domains passing (80+)")
3. Continuous Monitoring Script
import schedule
import time
from datetime import datetime
class DomainMonitor:
    """Continuous domain monitoring with alerting.

    Remembers the last score seen per domain so that drops between two
    consecutive checks can be detected.
    """

    def __init__(self, api_key: str, alert_threshold: int = 70):
        self.previous_scores = {}
        self.alert_threshold = alert_threshold
        self.client = ReputeAPIClient(api_key=api_key)

    def check_domain(self, domain: str):
        """Fetch the current score of one domain and raise alerts as needed.

        Alerts are sent when the score is below the threshold, when it fell
        by more than 10 points since the previous check, and when the check
        itself fails.
        """
        try:
            score = self.client.get_score(domain)["score"]
            last_seen = self.previous_scores.get(domain)
            print(f"[{datetime.now()}] {domain}: {score}/100")

            # Absolute check against the configured threshold
            threshold = self.alert_threshold
            if score < threshold:
                self.send_alert(
                    domain,
                    f"Score below threshold: {score} < {threshold}",
                )

            # Relative check against the previous observation
            if last_seen and score < last_seen - 10:
                self.send_alert(
                    domain,
                    f"Score dropped from {last_seen} to {score}",
                )

            # Remember this score for the next comparison
            self.previous_scores[domain] = score
        except Exception as err:
            print(f"Error checking {domain}: {err}")
            self.send_alert(domain, f"Check failed: {str(err)}")

    def send_alert(self, domain: str, message: str):
        """Send alert (implement your notification method)"""
        print(f"ALERT for {domain}: {message}")
        # Add your notification logic here:
        # - Send email
        # - Post to Slack
        # - Create ticket
        # - Send SMS
        # etc.

    def monitor_domains(self, domains: List[str]):
        """Run check_domain once for every domain in the list, in order."""
        for name in domains:
            self.check_domain(name)
# Setup monitoring
monitor = DomainMonitor(api_key=API_KEY, alert_threshold=75)
monitored_domains = ["example.com", "test.com"]

# Schedule checks: register an hourly job with the schedule library
schedule.every().hour.do(lambda: monitor.monitor_domains(monitored_domains))

print("Starting domain monitor...")
print(f"Monitoring: {', '.join(monitored_domains)}")

# Run initial check so there is a baseline before the first scheduled run
monitor.monitor_domains(monitored_domains)

# Keep running: poll the scheduler once a minute, forever
while True:
    schedule.run_pending()
    time.sleep(60)
4. Flask Web Application Integration
from flask import Flask, jsonify, request
from functools import lru_cache
import os
app = Flask(__name__)
# One API client shared by all routes; the key is read from REPUTE_API_KEY (None if unset).
client = ReputeAPIClient(api_key=os.getenv("REPUTE_API_KEY"))
# Lifetime of a cached check result, in seconds (15 minutes).
_DOMAIN_CACHE_TTL_SECONDS = 15 * 60


@lru_cache(maxsize=128)
def _cached_domain_check(domain: str, ttl_window: int) -> Dict:
    """Fetch a domain check, memoized per (domain, time window).

    ``ttl_window`` is not used by the request itself; it only makes the cache
    key change when a new time window starts, which forces a fresh fetch.
    """
    return client.check_domain(domain)


def cached_domain_check(domain: str) -> Dict:
    """Cache domain checks for up to 15 minutes.

    lru_cache alone never expires entries, so the current time window is
    added to the cache key: a result is reused only within the window in
    which it was fetched.

    Args:
        domain: Domain to check

    Returns:
        Check result, possibly served from the cache
    """
    import time

    window = int(time.time() // _DOMAIN_CACHE_TTL_SECONDS)
    return _cached_domain_check(domain, window)


# Keep the cache-management hook that the lru_cache-wrapped original exposed.
cached_domain_check.cache_clear = _cached_domain_check.cache_clear
@app.route("/api/check/<domain>", methods=["GET"])
def check_domain_endpoint(domain: str):
    """Serve the cached check result for a domain as JSON.

    Responds 200 with the check result, 400 for validation errors and 500
    for authentication or any other failure.
    """
    try:
        body, status = jsonify(cached_domain_check(domain)), 200
    except AuthenticationError:
        body, status = jsonify({"error": "API authentication failed"}), 500
    except ValidationError as err:
        body, status = jsonify({"error": str(err)}), 400
    except Exception as err:
        body, status = jsonify({"error": str(err)}), 500
    return body, status
@app.route("/api/score/<domain>", methods=["GET"])
def get_score_endpoint(domain: str):
    """Serve the quick score of a domain as JSON.

    Responds 200 with domain, score and grade, or 500 with an error message
    when the lookup fails.
    """
    try:
        score_data = client.get_score(domain)
        summary = {
            "domain": domain,
            "score": score_data["score"],
            "grade": score_data["grade"],
        }
        body, status = jsonify(summary), 200
    except Exception as err:
        body, status = jsonify({"error": str(err)}), 500
    return body, status
if __name__ == "__main__":
    # Debug mode enables the interactive debugger, which allows arbitrary
    # code execution; keep it off unless explicitly requested for local
    # development via FLASK_DEBUG=1.
    app.run(debug=os.getenv("FLASK_DEBUG") == "1")
Testing
Unit Tests with pytest
import pytest
from unittest.mock import Mock, patch
import requests
@pytest.fixture
def mock_response():
    """Provide a canned payload shaped like a successful check response."""
    payload = dict(
        domain="example.com",
        score=85,
        grade="Good",
        issues=[],
    )
    return payload
@pytest.fixture
def client():
    """Provide a ReputeAPIClient built with a dummy key."""
    api_client = ReputeAPIClient(api_key="test-key")
    return api_client
def test_check_domain_success(client, mock_response):
    """A successful response is decoded and returned unchanged."""
    fake_response = Mock()
    fake_response.json.return_value = mock_response
    fake_response.raise_for_status = Mock()

    # Replace the session's GET so no network traffic happens
    with patch.object(client.session, "get", return_value=fake_response) as mock_get:
        result = client.check_domain("example.com")

        assert result["domain"] == "example.com"
        assert result["score"] == 85
        mock_get.assert_called_once()
def test_check_domain_invalid_key(client):
    """A 401 answer surfaces to the caller as requests.HTTPError."""
    unauthorized = Mock(status_code=401)
    unauthorized.raise_for_status.side_effect = requests.HTTPError()

    with patch.object(client.session, "get", return_value=unauthorized):
        with pytest.raises(requests.HTTPError):
            client.check_domain("example.com")
def test_retry_on_rate_limit():
    """A rate-limited first attempt is retried and the second one succeeds."""
    # First call: rate limit
    rate_limited = Mock(status_code=429, headers={"Retry-After": "1"})
    # Second call: success
    succeeded = Mock(status_code=200, json=lambda: {"score": 90})

    # time.sleep is patched so the test does not actually wait
    with patch("time.sleep"), patch("requests.get") as mock_get:
        mock_get.side_effect = [rate_limited, succeeded]
        result = check_domain_with_retry("example.com")
        assert result["score"] == 90
Performance Tips
1. Connection Pooling
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
def create_session_with_retries():
    """Build a requests Session with connection pooling and retries.

    Returns:
        Session whose http:// and https:// traffic goes through one adapter
        configured with the retry policy below
    """
    # Retry GET/POST up to 3 times on rate-limit and server-error statuses
    policy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "POST"],
    )
    pooled_adapter = HTTPAdapter(
        pool_connections=10,
        pool_maxsize=20,
        max_retries=policy,
    )

    session = requests.Session()
    for scheme in ("https://", "http://"):
        session.mount(scheme, pooled_adapter)
    return session
# Use in client
class OptimizedReputeAPIClient(ReputeAPIClient):
    """ReputeAPIClient whose session uses connection pooling and retries."""

    def __init__(self, api_key: str, base_url: str = "https://api.reputeapi.com"):
        super().__init__(api_key, base_url)
        # The base initializer already created a plain session; close it
        # before replacing it so it is not left orphaned.
        self.session.close()
        self.session = create_session_with_retries()
        # The replacement session starts without the auth header.
        self.session.headers.update({"X-API-Key": api_key})
2. Local Caching
from functools import lru_cache
from datetime import datetime, timedelta
import hashlib
import json
class CachedReputeAPIClient(ReputeAPIClient):
    """Client with local caching.

    Results of check_domain are kept in an in-memory dict for cache_ttl
    seconds, keyed by the domain and the request options.
    """

    def __init__(self, *args, cache_ttl=900, **kwargs):
        """
        Create a caching client

        Args:
            *args: Forwarded to ReputeAPIClient
            cache_ttl: Lifetime of a cache entry in seconds
            **kwargs: Forwarded to ReputeAPIClient
        """
        super().__init__(*args, **kwargs)
        self.cache_ttl = cache_ttl  # 15 minutes
        self._cache = {}

    def _get_cache_key(self, endpoint: str, params: Dict) -> str:
        """Generate cache key"""
        # sort_keys makes the key independent of keyword-argument order.
        key_data = f"{endpoint}:{json.dumps(params, sort_keys=True)}"
        # MD5 is used only to derive a compact key, not for security.
        return hashlib.md5(key_data.encode()).hexdigest()

    def _get_cached(self, key: str) -> Optional[Dict]:
        """Get from cache if not expired"""
        entry = self._cache.get(key)
        if entry is None:
            return None
        data, timestamp = entry
        if datetime.now() - timestamp < timedelta(seconds=self.cache_ttl):
            return data
        # Drop the expired entry so stale data does not accumulate.
        del self._cache[key]
        return None

    def _set_cache(self, key: str, data: Dict):
        """Set cache entry"""
        self._cache[key] = (data, datetime.now())

    def check_domain(self, domain: str, **kwargs) -> Dict:
        """Check domain with caching.

        Args:
            domain: Domain to check
            **kwargs: Options forwarded to ReputeAPIClient.check_domain;
                refresh=True skips the cache lookup and replaces the entry

        Returns:
            Check result, possibly served from the cache
        """
        # `refresh` only decides whether the cache is consulted. It must not
        # be part of the key, otherwise refreshed data would be stored under
        # a separate key and plain calls would keep serving the stale entry.
        key_params = {k: v for k, v in kwargs.items() if k != "refresh"}
        cache_key = self._get_cache_key("check", {"domain": domain, **key_params})

        # Try cache first
        if not kwargs.get("refresh"):
            cached_data = self._get_cached(cache_key)
            # Compare against None so a cached empty result still counts.
            if cached_data is not None:
                return cached_data

        # Fetch from API
        result = super().check_domain(domain, **kwargs)

        # Cache result
        self._set_cache(cache_key, result)
        return result
Advanced Examples
Parallel Processing with Multiprocessing
from multiprocessing import Pool
from functools import partial
def check_domain_worker(domain: str, api_key: str) -> Dict:
    """Worker function for parallel processing.

    Args:
        domain: Domain to check
        api_key: API key used to build this call's own client

    Returns:
        Score result, or {"domain": ..., "error": ...} when the check fails
    """
    # Each call owns its client; the context manager closes the session
    # instead of leaving one open per processed domain.
    with ReputeAPIClient(api_key=api_key) as client:
        try:
            return client.get_score(domain)
        except Exception as e:
            # Report failures as data so one bad domain does not abort the
            # whole pool.map call.
            return {"domain": domain, "error": str(e)}
def check_domains_parallel(domains: List[str], workers: int = 4) -> List[Dict]:
    """
    Check multiple domains in parallel using multiprocessing

    Args:
        domains: List of domains to check
        workers: Number of parallel workers

    Returns:
        List of results, in the same order as domains
    """
    # Bind the API key up front so each task only needs the domain
    score_one = partial(check_domain_worker, api_key=API_KEY)
    with Pool(workers) as pool:
        return pool.map(score_one, domains)
# Usage
# The guard is required with multiprocessing: under the "spawn" start method
# worker processes re-import this module, and unguarded top-level code would
# start a new pool in every worker.
if __name__ == "__main__":
    domains = ["example.com", "google.com", "github.com", "stackoverflow.com"]
    results = check_domains_parallel(domains, workers=4)
    for result in results:
        if "error" in result:
            print(f"{result['domain']}: Error - {result['error']}")
        else:
            print(f"{result['domain']}: {result['score']}/100")
Next Steps
- JavaScript Guide - Node.js integration
- cURL Examples - Command-line usage
- Integration Patterns - Architecture patterns
- Best Practices - Optimization tips
- API Reference - Complete API documentation
Additional Resources
Support
Need help with Python integration?