Common Scenarios

Real-world examples and solutions for integrating ReputeAPI into your applications. Each scenario includes complete, production-ready code.


Customer Onboarding Validation

Validate customer domains during signup to ensure email deliverability.

Use Case

Email service provider needs to validate new customer domains before allowing them to send emails.

Requirements

  • Minimum security score of 70/100
  • No critical issues
  • SPF and DMARC must be present
  • Provide remediation steps for issues

Implementation

import requests
from dataclasses import dataclass
from typing import List, Dict, Optional

# Placeholder API key -- replace with your own key before running.
API_KEY = "your-api-key-here"
# Base URL prepended to every API path in this example.
BASE_URL = "https://api.reputeapi.com"

@dataclass
class OnboardingResult:
    """Outcome of validating one customer domain during onboarding.

    Fields:
        domain: The domain that was validated.
        approved: Whether the domain passed validation.
        score: Score reported for the domain.
        grade: Grade reported for the domain.
        issues: Issue dictionaries collected for the domain.
        remediation_steps: Fixes to present to the customer.
        rejection_reason: Why the domain was rejected; defaults to ``None``.
    """

    # Required fields, in positional order.
    domain: str
    approved: bool
    score: int
    grade: str
    issues: List[Dict]
    remediation_steps: List[str]

    # Optional field.
    rejection_reason: Optional[str] = None

def validate_onboarding_domain(
    domain: str,
    min_score: int = 70,
    require_dmarc: bool = True,
    require_spf: bool = True
) -> OnboardingResult:
    """
    Validate domain for customer onboarding

    A domain is approved only when its score is at least ``min_score``,
    it has no critical issues, and the required SPF/DMARC records exist.
    Every failed requirement is listed in ``rejection_reason``, joined
    with "; ".

    Args:
        domain: Customer's email domain
        min_score: Minimum acceptable score
        require_dmarc: Require DMARC to be present
        require_spf: Require SPF to be present

    Returns:
        OnboardingResult with approval status and recommendations. If the
        check itself fails (network error, unexpected response shape), a
        rejected result with score 0 and grade "Error" is returned instead
        of raising.
    """
    try:
        # Check domain
        response = requests.get(
            f"{BASE_URL}/api/v1/check",
            params={"domain": domain},
            headers={"X-API-Key": API_KEY},
            timeout=10
        )
        response.raise_for_status()
        result = response.json()

        # Extract data
        score = result['score']
        grade = result['grade']
        issues = result.get('issues', [])

        # Check for critical issues
        critical_issues = [i for i in issues if i['severity'] == 'critical']

        # Check requirements
        spf_present = result['spf']['present']
        dmarc_present = result['dmarc']['present']

        # Collect every failed requirement; joining at the end avoids a
        # stray leading "; " when the first failure is a missing record.
        reasons = []
        remediation_steps = []

        if score < min_score:
            reasons.append(f"Security score ({score}) below minimum ({min_score})")

        if critical_issues:
            reasons.append(f"{len(critical_issues)} critical issues found")

        if require_spf and not spf_present:
            reasons.append("SPF record missing")
            remediation_steps.append("Add SPF record to DNS")

        if require_dmarc and not dmarc_present:
            reasons.append("DMARC record missing")
            remediation_steps.append("Add DMARC record to DNS")

        approved = not reasons
        rejection_reason = "; ".join(reasons) if reasons else None

        # Get remediation steps from API. This is best-effort: a failure
        # here must not discard the score and grade already obtained.
        if not approved and not remediation_steps:
            try:
                rec_response = requests.post(
                    f"{BASE_URL}/api/v1/recommendations",
                    headers={"X-API-Key": API_KEY},
                    json={"domain": domain, "severity": "critical"},
                    timeout=10
                )

                if rec_response.ok:
                    recommendations = rec_response.json().get('recommendations', [])
                    remediation_steps = [
                        r['remediation']
                        for r in recommendations[:5]
                        if r.get('remediation')
                    ]
            except (requests.RequestException, ValueError):
                # Network or JSON-decoding failure: proceed without steps.
                remediation_steps = []

        return OnboardingResult(
            domain=domain,
            approved=approved,
            score=score,
            grade=grade,
            issues=issues,
            remediation_steps=remediation_steps,
            rejection_reason=rejection_reason
        )

    except Exception as e:
        # Boundary handler: callers always receive an OnboardingResult.
        return OnboardingResult(
            domain=domain,
            approved=False,
            score=0,
            grade="Error",
            issues=[],
            remediation_steps=[],
            rejection_reason=f"Validation failed: {str(e)}"
        )

# Usage Example
def onboard_customer(email_domain: str):
    """Onboard new customer with domain validation

    Validates ``email_domain`` with a minimum score of 70 and both SPF and
    DMARC required, then prints the outcome.

    Args:
        email_domain: Domain the customer wants to send email from.

    Returns:
        True when the domain is approved, False when it is rejected.
    """

    print(f"Validating domain: {email_domain}")

    result = validate_onboarding_domain(
        domain=email_domain,
        min_score=70,
        require_dmarc=True,
        require_spf=True
    )

    if result.approved:
        # The check mark was previously a mis-encoded byte sequence.
        print("✅ Domain approved!")
        print(f"   Score: {result.score}/100 ({result.grade})")
        print("   Customer can proceed with onboarding")
        # TODO: Create customer account
        return True
    else:
        print("❌ Domain rejected")
        print(f"   Reason: {result.rejection_reason}")
        print(f"   Score: {result.score}/100")

        if result.remediation_steps:
            print("\n   Required fixes:")
            for i, step in enumerate(result.remediation_steps, 1):
                print(f"   {i}. {step}")

        # TODO: Send email to customer with remediation steps
        return False

# Test
if __name__ == "__main__":
    # Run the onboarding example for a sample domain when executed as a script.
    onboard_customer("example.com")

Expected Output:

Validating domain: example.com
✅ Domain approved!
   Score: 82/100 (Good)
   Customer can proceed with onboarding


Continuous Domain Monitoring

Monitor domains 24/7 and alert on security score changes.

Use Case

Monitor company domains and customer domains for security configuration changes.

Implementation

import requests
import schedule
import time
from datetime import datetime
from typing import Dict, List, Optional
import smtplib
from email.mime.text import MIMEText

# Placeholder API key -- replace with your own key before running.
API_KEY = "your-api-key-here"
# Base URL prepended to every API path in this example.
BASE_URL = "https://api.reputeapi.com"

class DomainMonitor:
    """Continuous domain monitoring with alerting

    Remembers the last score and the last set of critical issues per
    domain (in memory only) so that consecutive checks can be compared.
    """

    def __init__(
        self,
        api_key: str,
        alert_threshold: int = 70,
        alert_email: Optional[str] = None
    ):
        """
        Args:
            api_key: Key sent in the ``X-API-Key`` header.
            alert_threshold: Scores below this value trigger an alert.
            alert_email: When set, alerts are also passed to
                ``send_email_alert``.
        """
        self.api_key = api_key
        self.base_url = BASE_URL
        self.alert_threshold = alert_threshold
        self.alert_email = alert_email
        self.previous_scores: Dict[str, int] = {}
        self.previous_issues: Dict[str, List] = {}

    def check_domain(self, domain: str) -> Dict:
        """Check single domain and return the decoded JSON response.

        Raises:
            requests.HTTPError: On a non-success HTTP status.
        """
        response = requests.get(
            f"{self.base_url}/api/v1/check",
            params={"domain": domain, "refresh": True},
            headers={"X-API-Key": self.api_key},
            timeout=10
        )
        response.raise_for_status()
        return response.json()

    def monitor_domain(self, domain: str):
        """
        Monitor domain and alert on changes

        Triggers alerts when:
        - Score drops below threshold
        - Score drops by 10+ points
        - New critical issues appear

        Any exception raised while checking is reported as an alert and
        the stored state for the domain is left untouched.
        """
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        try:
            result = self.check_domain(domain)
            current_score = result['score']
            previous_score = self.previous_scores.get(domain)

            print(f"[{timestamp}] {domain}: {current_score}/100")

            # Check for score below threshold
            if current_score < self.alert_threshold:
                self.send_alert(
                    domain,
                    "ALERT: Score below threshold",
                    f"Current score: {current_score}\nThreshold: {self.alert_threshold}"
                )

            # Check for significant drop. Compare against None explicitly so
            # a stored score of 0 is still treated as "has history", and use
            # >= so a drop of exactly 10 points alerts as documented.
            if previous_score is not None and previous_score - current_score >= 10:
                self.send_alert(
                    domain,
                    "ALERT: Score dropped significantly",
                    f"Previous: {previous_score}\nCurrent: {current_score}\nDrop: {previous_score - current_score} points"
                )

            # Check for new critical issues
            current_critical = [
                i for i in result.get('issues', [])
                if i['severity'] == 'critical'
            ]

            # Compare by message rather than by count: if one issue is
            # resolved while another appears the count is unchanged, yet
            # there is still a new issue to report.
            previous_messages = {
                i['message'] for i in self.previous_issues.get(domain, [])
            }
            new_critical = [
                i for i in current_critical
                if i['message'] not in previous_messages
            ]

            if new_critical:
                self.send_alert(
                    domain,
                    "ALERT: New critical issues detected",
                    f"New critical issues: {len(new_critical)}\n\nIssues:\n" +
                    "\n".join(f"- {i['message']}" for i in new_critical)
                )

            # Update tracking
            self.previous_scores[domain] = current_score
            self.previous_issues[domain] = current_critical

        except Exception as e:
            error_msg = f"Failed to check {domain}: {str(e)}"
            print(f"[{timestamp}] ❌ {error_msg}")
            self.send_alert(domain, "ALERT: Check failed", error_msg)

    def monitor_multiple_domains(self, domains: List[str]):
        """Monitor multiple domains, pausing one second after each."""
        for domain in domains:
            self.monitor_domain(domain)
            time.sleep(1)  # Rate limiting

    def send_alert(self, domain: str, subject: str, message: str):
        """Send alert notification

        Prints the alert and, when ``alert_email`` is set, forwards it to
        ``send_email_alert``.
        """
        alert_text = f"""
Domain Monitoring Alert
=======================

Domain: {domain}
Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

{message}
"""
        print(f"\n🚨 ALERT: {subject}")
        print(alert_text)

        # Send email if configured
        if self.alert_email:
            self.send_email_alert(subject, alert_text)

    def send_email_alert(self, subject: str, body: str):
        """Send email alert (configure SMTP settings)

        Currently a no-op placeholder.
        """
        # TODO: Configure your SMTP settings
        pass

def main():
    """Entry point: configure the monitor, check once, then re-check hourly.

    Never returns; the final loop polls the scheduler once a minute.
    """
    # Monitor configuration
    watcher = DomainMonitor(
        api_key=API_KEY,
        alert_threshold=75,
        alert_email="alerts@example.com"
    )

    # Domains under watch
    domains = ["example.com", "mycompany.com", "customer1.com"]

    def run_checks():
        # One full pass over every watched domain.
        watcher.monitor_multiple_domains(domains)

    print("Starting domain monitor...")
    print(f"Monitoring {len(domains)} domains")
    print(f"Alert threshold: {watcher.alert_threshold}")
    print("Check interval: Every hour\n")

    # Register the hourly job, then run a first pass right away.
    schedule.every().hour.do(run_checks)
    run_checks()

    # Poll the scheduler forever.
    while True:
        schedule.run_pending()
        time.sleep(60)

if __name__ == "__main__":
    # Start the monitoring loop when executed as a script.
    main()

Pre-Send Validation

Validate domain before sending marketing emails.

Implementation

import requests
from typing import Dict, Optional

# Placeholder API key -- replace with your own key before running.
API_KEY = "your-api-key-here"
# Base URL prepended to every API path in this example.
BASE_URL = "https://api.reputeapi.com"

def validate_before_send(
    domain: str,
    min_score: int = 75,
    check_dmarc_policy: bool = True
) -> tuple[bool, Optional[str]]:
    """
    Decide whether a domain is fit to send email from.

    The checks run in order and the first failure wins: score, SPF
    presence, DMARC presence, then (optionally) DMARC policy.

    Args:
        domain: Domain to validate
        min_score: Lowest score that still passes
        check_dmarc_policy: When true, a DMARC policy of 'none' (or no
            policy field at all) is treated as a failure

    Returns:
        ``(True, None)`` on success, otherwise ``(False, reason)``. Any
        exception raised along the way is reported as a failure reason
        rather than propagated.
    """
    try:
        reply = requests.get(
            f"{BASE_URL}/api/v1/check",
            params={"domain": domain},
            headers={"X-API-Key": API_KEY},
            timeout=10,
        )
        reply.raise_for_status()
        report = reply.json()

        # 1. Score
        score = report['score']
        if score < min_score:
            return False, f"Security score too low: {score}/100"

        # 2. SPF
        if not report['spf']['present']:
            return False, "SPF record missing"

        # 3. DMARC presence
        dmarc = report['dmarc']
        if not dmarc['present']:
            return False, "DMARC record missing"

        # 4. DMARC policy (optional)
        if check_dmarc_policy and dmarc.get('policy', 'none') == 'none':
            return False, "DMARC policy is 'none' - emails may be rejected"

    except Exception as exc:
        return False, f"Validation error: {str(exc)}"

    return True, None

# Usage in email sending function
def send_marketing_email(to_email: str, subject: str, body: str):
    """Send marketing email with pre-send validation

    Args:
        to_email: Recipient address; its domain part is validated.
        subject: Email subject (not used until sending is implemented).
        body: Email body (not used until sending is implemented).

    Returns:
        True when the domain passed validation, False when the address is
        malformed or the domain was rejected.
    """

    # Extract domain from email. rpartition takes everything after the
    # LAST '@' and never raises, so malformed input is rejected cleanly
    # instead of failing with IndexError.
    local_part, at_sign, domain = to_email.rpartition('@')
    if not (local_part and at_sign and domain):
        print(f"❌ Cannot send to {to_email}")
        print("   Reason: Invalid email address")
        return False

    # Validate domain
    approved, reason = validate_before_send(domain)

    if not approved:
        print(f"❌ Cannot send to {to_email}")
        print(f"   Reason: {reason}")
        # Log for later review
        return False

    # Domain validated - send email
    # The check mark was previously a mis-encoded byte sequence.
    print(f"✅ Domain validated: {domain}")
    # TODO: Send email via your ESP
    print(f"   Sending email to {to_email}")
    return True

# Test
if __name__ == "__main__":
    # Exercise the pre-send validation with a sample address.
    send_marketing_email("user@example.com", "Test", "Body")

CI/CD Integration

Integrate email security checks into deployment pipeline.

GitHub Actions Example

# .github/workflows/email-security-check.yml
name: Email Security Check

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    # Run daily at 09:00 UTC (GitHub evaluates cron schedules in UTC)
    - cron: '0 9 * * *'

jobs:
  check-email-security:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install requests

      - name: Check domain email security
        env:
          REPUTE_API_KEY: ${{ secrets.REPUTE_API_KEY }}
          DOMAIN: ${{ vars.COMPANY_DOMAIN }}
        run: |
          python .github/scripts/check_email_security.py

      # upload-artifact v3 is deprecated and no longer runs on github.com;
      # v4 is the supported major version.
      - name: Upload results
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: email-security-report
          path: email-security-report.json

Script: .github/scripts/check_email_security.py

#!/usr/bin/env python3
import os
import sys
import requests
import json

# Both variables are required: a missing one raises KeyError when the
# script starts.
API_KEY = os.environ['REPUTE_API_KEY']
DOMAIN = os.environ['DOMAIN']
BASE_URL = "https://api.reputeapi.com"
# Scores below this value make check_domain() exit with status 1.
MIN_SCORE = 80

def check_domain():
    """Check domain and fail CI if score too low

    Fetches a refreshed check for ``DOMAIN``, writes the raw response to
    ``email-security-report.json``, prints a summary, and terminates the
    process: exit status 1 when the score is below ``MIN_SCORE``,
    otherwise 0. Critical issues are printed but do not affect the exit
    status on their own.

    Raises:
        requests.HTTPError: On a non-success HTTP status.
    """

    print(f"Checking email security for: {DOMAIN}")

    response = requests.get(
        f"{BASE_URL}/api/v1/check",
        params={"domain": DOMAIN, "refresh": True},
        headers={"X-API-Key": API_KEY},
        timeout=15
    )

    response.raise_for_status()
    result = response.json()

    score = result['score']
    grade = result['grade']
    issues = result.get('issues', [])

    # Save report. An explicit encoding keeps the artifact identical
    # regardless of the runner's locale.
    with open('email-security-report.json', 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2)

    # Print results
    print(f"\n{'='*60}")
    print("EMAIL SECURITY REPORT")
    print(f"{'='*60}")
    print(f"Domain: {DOMAIN}")
    print(f"Score: {score}/100 ({grade})")
    print(f"Issues: {len(issues)}")

    # Print critical issues
    critical = [i for i in issues if i['severity'] == 'critical']
    if critical:
        print(f"\n❌ Critical Issues ({len(critical)}):")
        for issue in critical:
            print(f"  - {issue['message']}")
            if issue.get('remediation'):
                print(f"    Fix: {issue['remediation']}")

    # Check threshold
    if score < MIN_SCORE:
        print(f"\n❌ FAILED: Score {score} below minimum {MIN_SCORE}")
        print("   Please fix email security issues before deploying")
        sys.exit(1)
    else:
        # The check mark was previously a mis-encoded byte sequence.
        print("\n✅ PASSED: Email security meets requirements")
        sys.exit(0)

if __name__ == "__main__":
    try:
        check_domain()
    except Exception as e:
        # Any failure of the check itself (e.g. an HTTP error) also fails
        # the CI step. sys.exit() inside check_domain raises SystemExit,
        # which is not an Exception subclass and so passes through here.
        print(f"❌ Error: {e}")
        sys.exit(1)

Webhook Notifications

Receive real-time notifications via webhooks.

Implementation

from flask import Flask, request, jsonify
import requests
import hmac
import hashlib

# Flask application that hosts the webhook endpoint.
app = Flask(__name__)

# Placeholder values -- replace with your own before running.
API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"
# Shared secret used as the HMAC key when verifying webhook signatures.
WEBHOOK_SECRET = "your-webhook-secret"

@app.route('/webhook/domain-check', methods=['POST'])
def domain_check_webhook():
    """
    Receive webhook notifications for domain checks

    This endpoint would be called by ReputeAPI when:
    - Score drops below threshold
    - New critical issue detected
    - Scheduled check completes

    Returns:
        401 when the signature header is missing or invalid, 400 when the
        body is not a JSON object, otherwise 200 once the event has been
        dispatched. Unknown event types are acknowledged and ignored.
    """

    # Verify webhook signature. A missing header is rejected up front so
    # the verifier is never handed None.
    signature = request.headers.get('X-Webhook-Signature')
    if not signature or not verify_webhook_signature(request.data, signature):
        return jsonify({'error': 'Invalid signature'}), 401

    # Process webhook data. silent=True yields None instead of raising on
    # a non-JSON body; anything that is not a JSON object is rejected so
    # the .get() calls below cannot fail.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Invalid JSON payload'}), 400

    event_type = data.get('event')
    domain = data.get('domain')
    score = data.get('score')

    print(f"Webhook received: {event_type} for {domain}")

    if event_type == 'score_drop':
        handle_score_drop(domain, score, data)

    elif event_type == 'critical_issue':
        handle_critical_issue(domain, data)

    elif event_type == 'check_complete':
        handle_check_complete(domain, data)

    return jsonify({'status': 'received'}), 200

def verify_webhook_signature(payload: bytes, signature: str) -> bool:
    """Verify webhook signature

    Computes the hex HMAC-SHA256 of ``payload`` keyed with
    ``WEBHOOK_SECRET`` and compares it to ``signature`` in constant time.

    Args:
        payload: Raw request body.
        signature: Hex digest supplied by the sender. ``None`` or an empty
            string is tolerated and treated as invalid.

    Returns:
        True only when the signature matches.
    """
    # A missing signature can never be valid; returning early also avoids
    # the TypeError compare_digest raises for None.
    if not signature:
        return False

    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()

    # Compare as bytes: compare_digest raises TypeError for str arguments
    # containing non-ASCII characters, which an attacker could send.
    return hmac.compare_digest(expected.encode(), signature.encode())

def handle_score_drop(domain: str, score: int, data: dict):
    """React to a 'score_drop' event: log it and raise a Slack alert.

    Args:
        domain: Domain named in the event.
        score: Score carried by the event.
        data: Full event payload; 'previous_score' is read from it.
    """
    before = data.get('previous_score')
    print(f"⚠ Score drop: {domain} from {before} to {score}")

    # Notify via Slack
    title = f"Email security score dropped for {domain}"
    detail = f"Score changed from {before} to {score}"
    send_slack_alert(title, detail)

def handle_critical_issue(domain: str, data: dict):
    """React to a 'critical_issue' event: log it and open a ticket.

    Args:
        domain: Domain named in the event.
        data: Full event payload; 'issue' is read from it.
    """
    problem = data.get('issue')
    print(f"🚨 Critical issue: {domain} - {problem}")

    # Open a ticket for follow-up
    create_support_ticket(domain, problem)

def handle_check_complete(domain: str, data: dict):
    """Handle check complete event

    Prints a one-line confirmation containing the score read from
    ``data['score']`` (``None`` when absent).

    Args:
        domain: Domain named in the event.
        data: Full event payload.
    """
    score = data.get('score')
    # The check mark was previously a mis-encoded byte sequence.
    print(f"✓ Check complete: {domain} - {score}/100")

def send_slack_alert(title: str, message: str):
    """Placeholder for posting a notification to Slack; currently a no-op."""
    # TODO: Implement Slack integration
    return None

def create_support_ticket(domain: str, issue: str):
    """Placeholder for opening a ticket in a ticketing system; currently a no-op."""
    # TODO: Implement ticketing system integration
    return None

if __name__ == '__main__':
    # Start the Flask server on port 5000 when executed as a script.
    app.run(port=5000)

Multi-Tenant SaaS Platform

Handle multiple customer accounts with isolated data.

Implementation

import requests
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime

# Placeholder API key -- replace with your own key before running.
API_KEY = "your-api-key-here"
# Base URL prepended to every API path in this example.
BASE_URL = "https://api.reputeapi.com"

@dataclass
class Customer:
    """A tenant account together with the domains it owns.

    Fields:
        id: Customer identifier.
        name: Display name.
        domains: Domains belonging to this customer.
        plan: Subscription plan, one of 'free', 'basic', 'premium'.
    """

    id: str
    name: str
    domains: List[str]
    plan: str

class EmailSecurityService:
    """Multi-tenant email security service"""

    def __init__(self, api_key: str):
        """
        Args:
            api_key: Key sent in the ``X-API-Key`` header.
        """
        self.api_key = api_key
        self.base_url = BASE_URL

    def check_customer_domains(self, customer: "Customer") -> Dict:
        """
        Check all domains for a customer

        A failure on one domain is recorded as an ``'error'`` entry and
        does not stop the remaining domains from being checked.

        Args:
            customer: Customer object with domains

        Returns:
            Dictionary with results per domain
        """
        results = {}

        for domain in customer.domains:
            try:
                result = self.check_domain(domain)

                results[domain] = {
                    'status': 'success',
                    'score': result['score'],
                    'grade': result['grade'],
                    'issues': result.get('issues', []),
                    'checked_at': datetime.now().isoformat()
                }

                # Store in customer's database
                self.save_customer_result(customer.id, domain, result)

            except Exception as e:
                results[domain] = {
                    'status': 'error',
                    'error': str(e)
                }

        return results

    def check_domain(self, domain: str) -> Dict:
        """Check single domain and return the decoded JSON response.

        Raises:
            requests.HTTPError: On a non-success HTTP status.
        """
        response = requests.get(
            f"{self.base_url}/api/v1/check",
            params={"domain": domain},
            headers={"X-API-Key": self.api_key},
            timeout=10
        )
        response.raise_for_status()
        return response.json()

    def save_customer_result(self, customer_id: str, domain: str, result: Dict):
        """Save result to customer's database (implement based on your DB)"""
        # TODO: Implement database storage
        pass

    def get_customer_report(self, customer: "Customer") -> Dict:
        """Generate comprehensive report for customer

        Returns:
            Dictionary with customer identity, report date, a ``summary``
            (``total_domains``, ``average_score`` rounded to one decimal,
            ``critical_issues``) and the per-domain results.
        """

        results = self.check_customer_domains(customer)

        # Calculate aggregate statistics
        total_domains = len(results)
        successful = [
            r for r in results.values()
            if r.get('status') == 'success'
        ]

        # Average over successfully checked domains only: dividing by the
        # total would count every failed check as a score of 0.
        avg_score = (
            sum(r.get('score', 0) for r in successful) / len(successful)
            if successful else 0
        )

        critical_issues = sum(
            1
            for r in successful
            for i in r.get('issues', [])
            if i['severity'] == 'critical'
        )

        return {
            'customer_id': customer.id,
            'customer_name': customer.name,
            'report_date': datetime.now().isoformat(),
            'summary': {
                'total_domains': total_domains,
                'average_score': round(avg_score, 1),
                'critical_issues': critical_issues
            },
            'domains': results
        }

# Usage
if __name__ == "__main__":
    # Build the service with the module-level API key.
    service = EmailSecurityService(api_key=API_KEY)

    # Customer with multiple domains
    customer = Customer(
        id="cust_123",
        name="Acme Corp",
        domains=["acme.com", "acme-mail.com", "acme.io"],
        plan="premium"
    )

    # Generate report (checks every domain of the customer)
    report = service.get_customer_report(customer)

    # Print the summary section of the report
    print(f"Report for {report['customer_name']}")
    print(f"Average Score: {report['summary']['average_score']}/100")
    print(f"Domains: {report['summary']['total_domains']}")
    print(f"Critical Issues: {report['summary']['critical_issues']}")

Form Validation

Validate domain in real-time during form submission.

React Example

import React, { useState } from 'react';
import axios from 'axios';

// Placeholder API key -- replace with your own key before running.
// NOTE(review): this key is sent from the component in the X-API-Key header;
// if the component runs in a browser the key is presumably visible to users.
// Consider routing the request through your own backend -- confirm.
const API_KEY = 'your-api-key-here';
// Base URL prepended to every API path in this example.
const BASE_URL = 'https://api.reputeapi.com';

function DomainValidator() {
  const [domain, setDomain] = useState('');
  const [validating, setValidating] = useState(false);
  const [result, setResult] = useState(null);
  const [error, setError] = useState(null);

  const validateDomain = async () => {
    if (!domain) {
      setError('Please enter a domain');
      return;
    }

    setValidating(true);
    setError(null);
    setResult(null);

    try {
      const response = await axios.get(`${BASE_URL}/api/v1/check`, {
        params: { domain },
        headers: { 'X-API-Key': API_KEY },
        timeout: 10000
      });

      setResult(response.data);
    } catch (err) {
      if (err.response) {
        setError(`Error: ${err.response.data.message || err.response.statusText}`);
      } else {
        setError('Failed to validate domain. Please try again.');
      }
    } finally {
      setValidating(false);
    }
  };

  const handleSubmit = (e) => {
    e.preventDefault();
    validateDomain();
  };

  return (
    <div className="domain-validator">
      <h2>Email Security Checker</h2>

      <form onSubmit={handleSubmit}>
        <input
          type="text"
          placeholder="Enter domain (e.g., example.com)"
          value={domain}
          onChange={(e) => setDomain(e.target.value)}
          disabled={validating}
        />
        <button type="submit" disabled={validating}>
          {validating ? 'Checking...' : 'Check Domain'}
        </button>
      </form>

      {error && (
        <div className="error">{error}</div>
      )}

      {result && (
        <div className="result">
          <h3>{result.domain}</h3>
          <div className="score">
            <span className="score-value">{result.score}</span>
            <span className="score-max">/100</span>
            <span className="grade">({result.grade})</span>
          </div>

          <div className="status">
            <div>SPF: {result.spf.present ? 'βœ“' : 'βœ—'}</div>
            <div>DKIM: {result.dkim.validated_keys.length > 0 ? 'βœ“' : 'βœ—'}</div>
            <div>DMARC: {result.dmarc.present ? 'βœ“' : 'βœ—'}</div>
          </div>

          {result.issues && result.issues.length > 0 && (
            <div className="issues">
              <h4>Issues Found:</h4>
              <ul>
                {result.issues.map((issue, index) => (
                  <li key={index} className={`severity-${issue.severity}`}>
                    <strong>[{issue.severity.toUpperCase()}]</strong> {issue.message}
                  </li>
                ))}
              </ul>
            </div>
          )}
        </div>
      )}
    </div>
  );
}

export default DomainValidator;

Next Steps


Additional Resources