Common Scenarios
Real-world examples and solutions for integrating ReputeAPI into your applications. Each scenario includes complete, production-ready code.
Customer Onboarding Validation
Validate customer domains during signup to ensure email deliverability.
Use Case
An email service provider needs to validate new customer domains before allowing them to send emails.
Requirements
- Minimum security score of 70/100
- No critical issues
- SPF and DMARC must be present
- Provide remediation steps for issues
Implementation
import requests
from dataclasses import dataclass
from typing import List, Dict, Optional
# Placeholder credential sent in the X-API-Key header; replace it with a real key
# (preferably loaded from the environment rather than committed to source).
API_KEY = "your-api-key-here"
# Root URL that the /api/v1/... request paths in this example are appended to.
BASE_URL = "https://api.reputeapi.com"
@dataclass
class OnboardingResult:
    """Outcome of validating one domain for customer onboarding.

    ``rejection_reason`` defaults to ``None`` and is only filled in when the
    creator of the result supplies an explanation.
    """

    domain: str  # domain that was validated
    approved: bool  # True when the domain may proceed with onboarding
    score: int
    grade: str
    issues: List[Dict]
    remediation_steps: List[str]
    rejection_reason: Optional[str] = None
def _fetch_remediation_steps(domain: str, limit: int = 5) -> List[str]:
    """Best-effort lookup of API-suggested fixes for critical issues; [] on any failure."""
    try:
        rec_response = requests.post(
            f"{BASE_URL}/api/v1/recommendations",
            headers={"X-API-Key": API_KEY},
            json={"domain": domain, "severity": "critical"},
            timeout=10
        )
        if not rec_response.ok:
            return []
        recommendations = rec_response.json().get('recommendations', [])
        return [r['remediation'] for r in recommendations[:limit]]
    except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError):
        # Remediation hints are optional: a network error, malformed JSON or an
        # unexpected payload shape must not replace the verdict already reached.
        return []


def validate_onboarding_domain(
    domain: str,
    min_score: int = 70,
    require_dmarc: bool = True,
    require_spf: bool = True
) -> OnboardingResult:
    """
    Validate domain for customer onboarding

    The domain is approved only when its score is at least ``min_score``, it
    has no critical issues, and every required record (SPF / DMARC) is present.

    Args:
        domain: Customer's email domain
        min_score: Minimum acceptable score
        require_dmarc: Require DMARC to be present
        require_spf: Require SPF to be present

    Returns:
        OnboardingResult with approval status and recommendations. If the
        check request fails or the response lacks an expected field, a
        rejected result with score 0 and grade "Error" is returned instead of
        raising.
    """
    try:
        # Check domain
        response = requests.get(
            f"{BASE_URL}/api/v1/check",
            params={"domain": domain},
            headers={"X-API-Key": API_KEY},
            timeout=10
        )
        response.raise_for_status()
        result = response.json()

        # Extract data
        score = result['score']
        grade = result['grade']
        issues = result.get('issues', [])
        critical_issues = [i for i in issues if i.get('severity') == 'critical']
        spf_present = result['spf']['present']
        dmarc_present = result['dmarc']['present']

        # Collect every failed requirement and join once at the end, so the
        # message never starts with a stray "; " separator.
        reasons = []
        remediation_steps = []

        if score < min_score:
            reasons.append(f"Security score ({score}) below minimum ({min_score})")
        if critical_issues:
            reasons.append(f"{len(critical_issues)} critical issues found")
        if require_spf and not spf_present:
            reasons.append("SPF record missing")
            remediation_steps.append("Add SPF record to DNS")
        if require_dmarc and not dmarc_present:
            reasons.append("DMARC record missing")
            remediation_steps.append("Add DMARC record to DNS")

        approved = not reasons
        rejection_reason = "; ".join(reasons) if reasons else None

        # Ask the API for fixes only when the local checks produced none
        if not approved and not remediation_steps:
            remediation_steps = _fetch_remediation_steps(domain)

        return OnboardingResult(
            domain=domain,
            approved=approved,
            score=score,
            grade=grade,
            issues=issues,
            remediation_steps=remediation_steps,
            rejection_reason=rejection_reason
        )
    except Exception as e:
        # Deliberate catch-all at the boundary: onboarding fails closed.
        return OnboardingResult(
            domain=domain,
            approved=False,
            score=0,
            grade="Error",
            issues=[],
            remediation_steps=[],
            rejection_reason=f"Validation failed: {str(e)}"
        )
# Usage Example
def onboard_customer(email_domain: str) -> bool:
    """Onboard new customer with domain validation

    Prints the verdict (and, on rejection, the reason, the score and any
    required fixes).

    Args:
        email_domain: Domain the customer wants to send email from

    Returns:
        True when the domain was approved, False otherwise.
    """
    print(f"Validating domain: {email_domain}")
    result = validate_onboarding_domain(
        domain=email_domain,
        min_score=70,
        require_dmarc=True,
        require_spf=True
    )

    if not result.approved:
        print("❌ Domain rejected")
        print(f" Reason: {result.rejection_reason}")
        print(f" Score: {result.score}/100")
        if result.remediation_steps:
            print("\n Required fixes:")
            for i, step in enumerate(result.remediation_steps, 1):
                print(f" {i}. {step}")
        # TODO: Send email to customer with remediation steps
        return False

    print("✅ Domain approved!")
    print(f" Score: {result.score}/100 ({result.grade})")
    print(" Customer can proceed with onboarding")
    # TODO: Create customer account
    return True
# Test
if __name__ == "__main__":
    # Demo run against a sample domain when executed as a script.
    onboard_customer("example.com")
Expected Output:
Validating domain: example.com
✅ Domain approved!
Score: 82/100 (Good)
Customer can proceed with onboarding
Continuous Domain Monitoring
Monitor domains 24/7 and alert on security score changes.
Use Case
Monitor company domains and customer domains for security configuration changes.
Implementation
import requests
import schedule
import time
from datetime import datetime
from typing import Dict, List, Optional
import smtplib
from email.mime.text import MIMEText
# Placeholder credential passed to DomainMonitor in main(); replace it with a real key
# (preferably loaded from the environment rather than committed to source).
API_KEY = "your-api-key-here"
# Root URL that DomainMonitor copies into self.base_url.
BASE_URL = "https://api.reputeapi.com"
class DomainMonitor:
    """Continuous domain monitoring with alerting

    Keeps each domain's last score and last set of critical issues in memory
    so that the next check can be compared against them.
    """

    def __init__(
        self,
        api_key: str,
        alert_threshold: int = 70,
        alert_email: Optional[str] = None
    ):
        """
        Args:
            api_key: Key sent in the X-API-Key header
            alert_threshold: Scores below this value trigger an alert
            alert_email: When set, alerts are also handed to send_email_alert
        """
        self.api_key = api_key
        self.base_url = BASE_URL
        self.alert_threshold = alert_threshold
        self.alert_email = alert_email
        self.previous_scores: Dict[str, int] = {}
        self.previous_issues: Dict[str, List] = {}

    def check_domain(self, domain: str) -> Dict:
        """Check single domain

        Sends ``refresh=True`` with the request (presumably to bypass cached
        results; confirm against the API reference).

        Raises:
            requests.HTTPError: if the API answers with an error status
        """
        response = requests.get(
            f"{self.base_url}/api/v1/check",
            params={"domain": domain, "refresh": True},
            headers={"X-API-Key": self.api_key},
            timeout=10
        )
        response.raise_for_status()
        return response.json()

    def monitor_domain(self, domain: str):
        """
        Monitor domain and alert on changes

        Triggers alerts when:
        - Score drops below threshold
        - Score drops by 10+ points
        - New critical issues appear

        Any exception raised while checking is reported as a
        "Check failed" alert instead of propagating.
        """
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        try:
            result = self.check_domain(domain)
            current_score = result['score']
            previous_score = self.previous_scores.get(domain)
            print(f"[{timestamp}] {domain}: {current_score}/100")

            # Check for score below threshold
            if current_score < self.alert_threshold:
                self.send_alert(
                    domain,
                    "ALERT: Score below threshold",
                    f"Current score: {current_score}\nThreshold: {self.alert_threshold}"
                )

            # Check for significant drop. `is not None` keeps a previous score
            # of 0 as a valid baseline; `>= 10` matches the documented "10+".
            if previous_score is not None and previous_score - current_score >= 10:
                self.send_alert(
                    domain,
                    "ALERT: Score dropped significantly",
                    f"Previous: {previous_score}\nCurrent: {current_score}\nDrop: {previous_score - current_score} points"
                )

            # Check for new critical issues. Compare by message rather than by
            # count, so one issue replacing another is still reported.
            current_critical = [
                i for i in result.get('issues', [])
                if i.get('severity') == 'critical'
            ]
            previous_critical = self.previous_issues.get(domain, [])
            known_messages = {i.get('message') for i in previous_critical}
            new_critical = [
                i for i in current_critical
                if i.get('message') not in known_messages
            ]
            if new_critical:
                self.send_alert(
                    domain,
                    "ALERT: New critical issues detected",
                    f"New critical issues: {len(new_critical)}\n\nIssues:\n" +
                    "\n".join(f"- {i.get('message')}" for i in current_critical)
                )

            # Update tracking
            self.previous_scores[domain] = current_score
            self.previous_issues[domain] = current_critical
        except Exception as e:
            error_msg = f"Failed to check {domain}: {str(e)}"
            print(f"[{timestamp}] ❌ {error_msg}")
            self.send_alert(domain, "ALERT: Check failed", error_msg)

    def monitor_multiple_domains(self, domains: List[str]):
        """Monitor multiple domains, pausing one second after each check"""
        for domain in domains:
            self.monitor_domain(domain)
            time.sleep(1)  # Rate limiting

    def send_alert(self, domain: str, subject: str, message: str):
        """Send alert notification

        Always prints the alert; also calls send_email_alert when
        ``alert_email`` is configured.
        """
        alert_text = (
            "\n"
            "Domain Monitoring Alert\n"
            "=======================\n"
            f"Domain: {domain}\n"
            f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
            f"{message}\n"
        )
        print(f"\n🚨 ALERT: {subject}")
        print(alert_text)
        # Send email if configured
        if self.alert_email:
            self.send_email_alert(subject, alert_text)

    def send_email_alert(self, subject: str, body: str):
        """Send email alert (configure SMTP settings)

        Currently a stub: nothing is sent until SMTP delivery is implemented.
        """
        # TODO: Configure your SMTP settings
        pass
def main():
    """Run an immediate check, then re-check every hour until the process is stopped."""
    # Domains to monitor
    monitored_domains = ["example.com", "mycompany.com", "customer1.com"]

    # Configuration
    monitor = DomainMonitor(
        api_key=API_KEY,
        alert_threshold=75,
        alert_email="alerts@example.com",
    )

    startup_banner = (
        "Starting domain monitor...",
        f"Monitoring {len(monitored_domains)} domains",
        f"Alert threshold: {monitor.alert_threshold}",
        "Check interval: Every hour\n",
    )
    for banner_line in startup_banner:
        print(banner_line)

    def run_checks():
        monitor.monitor_multiple_domains(monitored_domains)

    # Register the hourly job first, then do the initial pass right away
    schedule.every().hour.do(run_checks)
    run_checks()

    # Poll the scheduler once a minute, forever
    while True:
        schedule.run_pending()
        time.sleep(60)
if __name__ == "__main__":
    # Start the monitoring loop only when executed as a script.
    main()
Pre-Send Validation
Validate domain before sending marketing emails.
Implementation
import requests
from typing import Dict, Optional
# Placeholder credential sent in the X-API-Key header; replace it with a real key
# (preferably loaded from the environment rather than committed to source).
API_KEY = "your-api-key-here"
# Root URL that the /api/v1/check request path is appended to.
BASE_URL = "https://api.reputeapi.com"
def validate_before_send(
    domain: str,
    min_score: int = 75,
    check_dmarc_policy: bool = True
) -> tuple[bool, Optional[str]]:
    """
    Validate domain before sending emails

    Checks run in order and stop at the first failure: score, SPF presence,
    DMARC presence, then (optionally) DMARC policy.

    Args:
        domain: Domain to validate
        min_score: Minimum required score
        check_dmarc_policy: Reject domains whose DMARC policy is 'none'
            (a missing or null policy counts as 'none')

    Returns:
        Tuple of (approved, reason). ``reason`` is None when approved. Any
        exception is reported as ``(False, "Validation error: ...")`` instead
        of propagating.
    """
    try:
        response = requests.get(
            f"{BASE_URL}/api/v1/check",
            params={"domain": domain},
            headers={"X-API-Key": API_KEY},
            timeout=10
        )
        response.raise_for_status()
        result = response.json()

        # Check score
        if result['score'] < min_score:
            return False, f"Security score too low: {result['score']}/100"

        # Check SPF
        if not result['spf']['present']:
            return False, "SPF record missing"

        # Check DMARC
        dmarc = result['dmarc']
        if not dmarc['present']:
            return False, "DMARC record missing"

        # Check DMARC policy
        if check_dmarc_policy:
            # `or 'none'` covers a JSON null as well as a missing key, and the
            # comparison is case-insensitive so "None"/"NONE" cannot slip through.
            policy = str(dmarc.get('policy') or 'none').strip().lower()
            if policy == 'none':
                return False, "DMARC policy is 'none' - emails may be rejected"

        return True, None
    except Exception as e:
        # Deliberate catch-all at the boundary: sending fails closed.
        return False, f"Validation error: {str(e)}"
# Usage in email sending function
def send_marketing_email(to_email: str, subject: str, body: str) -> bool:
    """Send marketing email with pre-send validation

    Args:
        to_email: Recipient address; its domain part is validated
        subject: Email subject (unused until sending is implemented)
        body: Email body (unused until sending is implemented)

    Returns:
        True when the recipient domain passed validation, False when the
        address is malformed or the domain was rejected.
    """
    # Split on the last '@'; an address without '@', or with an empty local
    # part or domain, is rejected up front instead of raising IndexError.
    local_part, at_sign, domain = to_email.rpartition('@')
    if not at_sign or not local_part or not domain:
        print(f"❌ Cannot send to {to_email}")
        print(" Reason: Invalid email address")
        return False

    # Validate domain
    approved, reason = validate_before_send(domain)
    if not approved:
        print(f"❌ Cannot send to {to_email}")
        print(f" Reason: {reason}")
        # Log for later review
        return False

    # Domain validated - send email
    print(f"✅ Domain validated: {domain}")
    # TODO: Send email via your ESP
    print(f" Sending email to {to_email}")
    return True
# Test
if __name__ == "__main__":
    # Demo send to a sample recipient when executed as a script.
    send_marketing_email("user@example.com", "Test", "Body")
CI/CD Integration
Integrate email security checks into deployment pipeline.
GitHub Actions Example
# .github/workflows/email-security-check.yml
# Runs the email security check script on pushes, pull requests and a daily
# schedule, and publishes the JSON report it writes as a build artifact.
name: Email Security Check

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    # Run daily at 09:00 UTC (GitHub Actions evaluates cron schedules in UTC)
    - cron: '0 9 * * *'

jobs:
  check-email-security:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install requests

      - name: Check domain email security
        env:
          REPUTE_API_KEY: ${{ secrets.REPUTE_API_KEY }}
          DOMAIN: ${{ vars.COMPANY_DOMAIN }}
        run: |
          python .github/scripts/check_email_security.py

      - name: Upload results
        # upload-artifact v3 has been retired by GitHub; v4 is required.
        uses: actions/upload-artifact@v4
        # always() so the report is uploaded even when the check step fails
        if: always()
        with:
          name: email-security-report
          path: email-security-report.json
Script: .github/scripts/check_email_security.py
#!/usr/bin/env python3
import os
import sys
import requests
import json
# Both environment variables are required: a missing one raises KeyError as soon
# as this module is loaded, before check_domain() runs.
API_KEY = os.environ['REPUTE_API_KEY']
DOMAIN = os.environ['DOMAIN']
BASE_URL = "https://api.reputeapi.com"
# Scores below this value make check_domain() exit with status 1.
MIN_SCORE = 80
def check_domain():
    """Check domain and fail CI if score too low

    Writes the raw API response to ``email-security-report.json``, prints a
    summary (including any critical issues), then terminates the process:
    exit status 1 when the score is below MIN_SCORE, 0 otherwise.

    Raises:
        requests.HTTPError: if the API answers with an error status
        SystemExit: always, once the report has been printed
    """
    print(f"Checking email security for: {DOMAIN}")
    response = requests.get(
        f"{BASE_URL}/api/v1/check",
        params={"domain": DOMAIN, "refresh": True},
        headers={"X-API-Key": API_KEY},
        timeout=15
    )
    response.raise_for_status()
    result = response.json()
    score = result['score']
    grade = result['grade']
    issues = result.get('issues', [])

    # Save report (explicit encoding so the file does not depend on the runner's locale)
    with open('email-security-report.json', 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2)

    # Print results
    print(f"\n{'='*60}")
    print("EMAIL SECURITY REPORT")
    print(f"{'='*60}")
    print(f"Domain: {DOMAIN}")
    print(f"Score: {score}/100 ({grade})")
    print(f"Issues: {len(issues)}")

    # Print critical issues
    critical = [i for i in issues if i.get('severity') == 'critical']
    if critical:
        print(f"\n❌ Critical Issues ({len(critical)}):")
        for issue in critical:
            print(f" - {issue['message']}")
            if issue.get('remediation'):
                print(f" Fix: {issue['remediation']}")

    # Check threshold: only the score decides pass/fail
    if score < MIN_SCORE:
        print(f"\n❌ FAILED: Score {score} below minimum {MIN_SCORE}")
        print(" Please fix email security issues before deploying")
        sys.exit(1)

    print("\n✅ PASSED: Email security meets requirements")
    sys.exit(0)
if __name__ == "__main__":
    try:
        check_domain()
    except Exception as e:
        # sys.exit() inside check_domain raises SystemExit, which is not an
        # Exception subclass, so its exit status passes through untouched.
        # Anything caught here (network error, bad response) fails the job.
        print(f"❌ Error: {e}")
        sys.exit(1)
Webhook Notifications
Receive real-time notifications via webhooks.
Implementation
from flask import Flask, request, jsonify
import requests
import hmac
import hashlib
# Flask application that hosts the webhook endpoint defined below.
app = Flask(__name__)
# NOTE(review): API_KEY and BASE_URL do not appear to be referenced by the
# handlers in this example — confirm whether they are needed here.
API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"
# Placeholder shared secret used as the HMAC key in verify_webhook_signature.
WEBHOOK_SECRET = "your-webhook-secret"
@app.route('/webhook/domain-check', methods=['POST'])
def domain_check_webhook():
    """
    Receive webhook notifications for domain checks

    This endpoint would be called by ReputeAPI when:
    - Score drops below threshold
    - New critical issue detected
    - Scheduled check completes

    Returns:
        401 when the signature header is missing or does not match,
        400 when the body is not a JSON object, otherwise 200. Unknown
        event types are acknowledged with 200 and otherwise ignored.
    """
    # Verify webhook signature. A missing header is rejected here, because
    # hmac.compare_digest raises TypeError when given None.
    signature = request.headers.get('X-Webhook-Signature')
    if not signature or not verify_webhook_signature(request.data, signature):
        return jsonify({'error': 'Invalid signature'}), 401

    # Process webhook data. silent=True yields None for a missing or
    # malformed JSON body instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid JSON payload'}), 400

    event_type = data.get('event')
    domain = data.get('domain')
    score = data.get('score')
    print(f"Webhook received: {event_type} for {domain}")

    if event_type == 'score_drop':
        handle_score_drop(domain, score, data)
    elif event_type == 'critical_issue':
        handle_critical_issue(domain, data)
    elif event_type == 'check_complete':
        handle_check_complete(domain, data)

    return jsonify({'status': 'received'}), 200
def verify_webhook_signature(payload: bytes, signature: str) -> bool:
    """Verify webhook signature

    Computes the HMAC-SHA256 hex digest of ``payload`` keyed with
    WEBHOOK_SECRET and compares it to ``signature`` in constant time.

    Args:
        payload: Raw request body
        signature: Hex digest supplied by the sender; None or "" is tolerated
            and treated as a mismatch

    Returns:
        True only when the supplied signature matches the expected digest.
    """
    if not signature:
        return False
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    # Compare as bytes: compare_digest raises TypeError for non-ASCII str
    # input, which an attacker-controlled header could otherwise trigger.
    return hmac.compare_digest(expected.encode(), signature.encode())
def handle_score_drop(domain: str, score: int, data: dict):
    """Handle score drop event

    Prints the change and forwards it to send_slack_alert. The earlier score
    is read from ``data['previous_score']`` (None when the key is absent).
    """
    previous_score = data.get('previous_score')
    print(f"⚠ Score drop: {domain} from {previous_score} to {score}")
    # Send alert
    send_slack_alert(
        f"Email security score dropped for {domain}",
        f"Score changed from {previous_score} to {score}"
    )
def handle_critical_issue(domain: str, data: dict):
    """Handle critical issue event

    Prints the issue taken from ``data['issue']`` (None when the key is
    absent) and passes it to create_support_ticket.
    """
    issue = data.get('issue')
    print(f"🚨 Critical issue: {domain} - {issue}")
    # Create ticket
    create_support_ticket(domain, issue)
def handle_check_complete(domain: str, data: dict):
    """Handle check complete event

    Prints the score from ``data['score']`` (None when the key is absent).
    """
    score = data.get('score')
    print(f"✓ Check complete: {domain} - {score}/100")
def send_slack_alert(title: str, message: str):
    """Placeholder for Slack delivery; currently does nothing and returns None."""
    # TODO: Implement Slack integration
    return None
def create_support_ticket(domain: str, issue: str):
    """Placeholder for ticket creation; currently does nothing and returns None."""
    # TODO: Implement ticketing system integration
    return None
if __name__ == '__main__':
    # Serve the webhook endpoint on port 5000 when executed as a script.
    app.run(port=5000)
Multi-Tenant SaaS Platform
Handle multiple customer accounts with isolated data.
Implementation
import requests
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime
# Placeholder credential passed to EmailSecurityService in the usage example; replace
# it with a real key (preferably loaded from the environment rather than committed).
API_KEY = "your-api-key-here"
# Root URL that EmailSecurityService copies into self.base_url.
BASE_URL = "https://api.reputeapi.com"
@dataclass
class Customer:
    """A tenant account together with the domains checked on its behalf."""

    id: str
    name: str
    domains: List[str]
    plan: str  # 'free', 'basic', 'premium'
class EmailSecurityService:
    """Multi-tenant email security service"""

    def __init__(self, api_key: str):
        """
        Args:
            api_key: Key sent in the X-API-Key header
        """
        self.api_key = api_key
        self.base_url = BASE_URL

    def check_customer_domains(self, customer: "Customer") -> Dict:
        """
        Check all domains for a customer

        A failure on one domain is recorded as an error entry for that domain
        and does not stop the remaining checks.

        Args:
            customer: Customer object with domains

        Returns:
            Dictionary with results per domain. Successful entries carry
            'status', 'score', 'grade', 'issues' and 'checked_at'; failed
            entries carry 'status' and 'error'.
        """
        results = {}
        for domain in customer.domains:
            try:
                result = self.check_domain(domain)
                results[domain] = {
                    'status': 'success',
                    'score': result['score'],
                    'grade': result['grade'],
                    'issues': result.get('issues', []),
                    'checked_at': datetime.now().isoformat()
                }
                # Store in customer's database
                self.save_customer_result(customer.id, domain, result)
            except Exception as e:
                results[domain] = {
                    'status': 'error',
                    'error': str(e)
                }
        return results

    def check_domain(self, domain: str) -> Dict:
        """Check single domain

        Raises:
            requests.HTTPError: if the API answers with an error status
        """
        response = requests.get(
            f"{self.base_url}/api/v1/check",
            params={"domain": domain},
            headers={"X-API-Key": self.api_key},
            timeout=10
        )
        response.raise_for_status()
        return response.json()

    def save_customer_result(self, customer_id: str, domain: str, result: Dict):
        """Save result to customer's database (implement based on your DB)"""
        # TODO: Implement database storage
        pass

    def get_customer_report(self, customer: "Customer") -> Dict:
        """Generate comprehensive report for customer

        Returns:
            Dictionary with the customer's id and name, a report timestamp, a
            'summary' (total domains, average score, critical issue count)
            and the per-domain results under 'domains'.
        """
        results = self.check_customer_domains(customer)

        # Calculate aggregate statistics
        total_domains = len(results)
        successes = [r for r in results.values() if r.get('status') == 'success']
        # Average over the domains that actually produced a score; dividing by
        # total_domains would let failed checks drag the average toward 0.
        avg_score = (
            sum(r.get('score', 0) for r in successes) / len(successes)
            if successes else 0
        )
        critical_issues = sum(
            1
            for r in successes
            for i in r.get('issues', [])
            if i.get('severity') == 'critical'
        )

        return {
            'customer_id': customer.id,
            'customer_name': customer.name,
            'report_date': datetime.now().isoformat(),
            'summary': {
                'total_domains': total_domains,
                'average_score': round(avg_score, 1),
                'critical_issues': critical_issues
            },
            'domains': results
        }
# Usage
if __name__ == "__main__":
    # Demo: build and print a report for one customer that owns several domains.
    customer = Customer(
        id="cust_123",
        name="Acme Corp",
        domains=["acme.com", "acme-mail.com", "acme.io"],
        plan="premium",
    )
    service = EmailSecurityService(api_key=API_KEY)

    report = service.get_customer_report(customer)
    summary = report['summary']

    print(f"Report for {report['customer_name']}")
    print(f"Average Score: {summary['average_score']}/100")
    print(f"Domains: {summary['total_domains']}")
    print(f"Critical Issues: {summary['critical_issues']}")
Form Validation
Validate domain in real-time during form submission.
React Example
import React, { useState } from 'react';
import axios from 'axios';
// Placeholder credential sent in the X-API-Key header. NOTE(review): a key embedded
// in browser code is visible to every visitor — confirm this is acceptable or proxy
// the request through a backend.
const API_KEY = 'your-api-key-here';
// Root URL that the /api/v1/check request path is appended to.
const BASE_URL = 'https://api.reputeapi.com';
function DomainValidator() {
const [domain, setDomain] = useState('');
const [validating, setValidating] = useState(false);
const [result, setResult] = useState(null);
const [error, setError] = useState(null);
const validateDomain = async () => {
if (!domain) {
setError('Please enter a domain');
return;
}
setValidating(true);
setError(null);
setResult(null);
try {
const response = await axios.get(`${BASE_URL}/api/v1/check`, {
params: { domain },
headers: { 'X-API-Key': API_KEY },
timeout: 10000
});
setResult(response.data);
} catch (err) {
if (err.response) {
setError(`Error: ${err.response.data.message || err.response.statusText}`);
} else {
setError('Failed to validate domain. Please try again.');
}
} finally {
setValidating(false);
}
};
const handleSubmit = (e) => {
e.preventDefault();
validateDomain();
};
return (
<div className="domain-validator">
<h2>Email Security Checker</h2>
<form onSubmit={handleSubmit}>
<input
type="text"
placeholder="Enter domain (e.g., example.com)"
value={domain}
onChange={(e) => setDomain(e.target.value)}
disabled={validating}
/>
<button type="submit" disabled={validating}>
{validating ? 'Checking...' : 'Check Domain'}
</button>
</form>
{error && (
<div className="error">{error}</div>
)}
{result && (
<div className="result">
<h3>{result.domain}</h3>
<div className="score">
<span className="score-value">{result.score}</span>
<span className="score-max">/100</span>
<span className="grade">({result.grade})</span>
</div>
<div className="status">
<div>SPF: {result.spf.present ? 'β' : 'β'}</div>
<div>DKIM: {result.dkim.validated_keys.length > 0 ? 'β' : 'β'}</div>
<div>DMARC: {result.dmarc.present ? 'β' : 'β'}</div>
</div>
{result.issues && result.issues.length > 0 && (
<div className="issues">
<h4>Issues Found:</h4>
<ul>
{result.issues.map((issue, index) => (
<li key={index} className={`severity-${issue.severity}`}>
<strong>[{issue.severity.toUpperCase()}]</strong> {issue.message}
</li>
))}
</ul>
</div>
)}
</div>
)}
</div>
);
}
export default DomainValidator;
Next Steps
- Troubleshooting: See Troubleshooting Guide
- Code Library: Browse Code Samples
- Best Practices: Read Integration Best Practices
- API Reference: Check Complete API Docs