Python Integration Guide

Complete guide to integrating ReputeAPI with Python applications.


Installation

ReputeAPI works with any HTTP client library. We recommend using requests for simplicity or httpx for async support.

requests:

pip install requests

httpx (async):

pip install httpx

aiohttp:

pip install aiohttp


Quick Start

Basic Domain Check

import requests

API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"

def check_domain(domain, timeout=10):
    """Check domain mailflow security.

    Args:
        domain: Domain to check (e.g., "example.com")
        timeout: Seconds to wait for the server before giving up

    Returns:
        Parsed JSON body of the check endpoint response

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status
        requests.Timeout: If no response arrives within ``timeout`` seconds
    """
    response = requests.get(
        f"{BASE_URL}/api/v1/check",
        params={"domain": domain},
        headers={"X-API-Key": API_KEY},
        # requests applies no timeout by default; without one a stalled
        # connection would block the caller indefinitely
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()

# Usage
# Fetch the check result for example.com and print its "score" and "grade"
# fields. check_domain() calls raise_for_status(), so an HTTP error response
# raises here instead of reaching the print statements.
result = check_domain("example.com")
print(f"Score: {result['score']}/100")
print(f"Grade: {result['grade']}")

Configuration Best Practices

Store your API key securely using environment variables:

import os
from dotenv import load_dotenv

# Load from .env file
load_dotenv()

# No default is given for the key, so os.getenv() yields None when it is unset
API_KEY = os.getenv("REPUTE_API_KEY")
# The base URL falls back to https://api.reputeapi.com when REPUTE_API_URL is unset
BASE_URL = os.getenv("REPUTE_API_URL", "https://api.reputeapi.com")

# Fail at startup (also for an empty string) rather than on the first API call
if not API_KEY:
    raise ValueError("REPUTE_API_KEY environment variable not set")

.env file:

REPUTE_API_KEY=your-api-key-here
REPUTE_API_URL=https://api.reputeapi.com


Client Wrapper Class

Create a reusable client for your application:

import requests
from typing import Dict, List, Optional
from datetime import datetime

class ReputeAPIClient:
    """Python client for ReputeAPI

    Wraps a ``requests.Session`` that sends the API key in the ``X-API-Key``
    header on every call. Every request is bounded by ``timeout``. The client
    can be used as a context manager, which closes the session on exit.
    """

    def __init__(
        self,
        api_key: str,
        base_url: str = "https://api.reputeapi.com",
        timeout: float = 10
    ):
        """
        Create a client

        Args:
            api_key: API key sent in the X-API-Key header
            base_url: Root URL of the API; a trailing slash is ignored
            timeout: Seconds to wait for each response before
                requests.Timeout is raised
        """
        self.api_key = api_key
        # Endpoint paths below start with "/", so drop any trailing slash to
        # avoid building URLs such as "https://host//api/v1/check"
        self.base_url = base_url.rstrip("/")
        # requests applies no timeout by default; an unbounded call can hang
        # the caller forever on a stalled connection
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({"X-API-Key": api_key})

    def check_domain(
        self,
        domain: str,
        selectors: Optional[List[str]] = None,
        verbose: bool = False,
        refresh: bool = False
    ) -> Dict:
        """
        Run complete mailflow security check

        Args:
            domain: Domain to check (e.g., "example.com")
            selectors: DKIM selectors to check
            verbose: Include verbose DNS details
            refresh: Bypass cache and fetch fresh data

        Returns:
            Full validation results with score and issues

        Raises:
            requests.HTTPError: If the API answers with a 4xx/5xx status
        """
        params = {
            "domain": domain,
            "verbose": verbose,
            "refresh": refresh
        }

        # Selectors are sent as a single comma-separated query value
        if selectors:
            params["selectors"] = ",".join(selectors)

        response = self.session.get(
            f"{self.base_url}/api/v1/check",
            params=params,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def get_score(self, domain: str, refresh: bool = False) -> Dict:
        """
        Get quick score check (faster than full check)

        Args:
            domain: Domain to check
            refresh: Bypass cache

        Returns:
            Score and top issues

        Raises:
            requests.HTTPError: If the API answers with a 4xx/5xx status
        """
        response = self.session.get(
            f"{self.base_url}/api/v1/score",
            params={"domain": domain, "refresh": refresh},
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def get_recommendations(
        self,
        domain: str,
        severity: Optional[str] = None
    ) -> Dict:
        """
        Get prioritized recommendations

        Args:
            domain: Domain to get recommendations for
            severity: Filter by severity (critical, high, medium, low)

        Returns:
            Prioritized list of fixes

        Raises:
            requests.HTTPError: If the API answers with a 4xx/5xx status
        """
        data = {"domain": domain}
        if severity:
            data["severity"] = severity

        response = self.session.post(
            f"{self.base_url}/api/v1/recommendations",
            json=data,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def get_history(
        self,
        domain: str,
        days: int = 30,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None
    ) -> Dict:
        """
        Get historical validation data

        Args:
            domain: Domain to get history for
            days: Number of days to retrieve (if dates not specified)
            start_date: Start date for history
            end_date: End date for history

        Returns:
            Historical snapshots

        Raises:
            requests.HTTPError: If the API answers with a 4xx/5xx status
        """
        params = {"domain": domain}

        # The explicit range is only used when BOTH dates are given; with
        # just one of them the request falls back to the ``days`` window
        if start_date and end_date:
            params["start_date"] = start_date.isoformat()
            params["end_date"] = end_date.isoformat()
        else:
            params["days"] = days

        response = self.session.get(
            f"{self.base_url}/api/v1/history",
            params=params,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def get_usage(self) -> Dict:
        """
        Get API usage statistics

        Returns:
            Usage stats for current billing period

        Raises:
            requests.HTTPError: If the API answers with a 4xx/5xx status
        """
        response = self.session.get(
            f"{self.base_url}/api/v1/usage",
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def bulk_validate(self, domains: List[str]) -> Dict:
        """
        Validate multiple domains (legacy endpoint)

        Args:
            domains: List of domains to validate

        Returns:
            Results for all domains

        Raises:
            requests.HTTPError: If the API answers with a 4xx/5xx status
        """
        # Legacy path: note it has no "/api" prefix, unlike the other calls
        response = self.session.post(
            f"{self.base_url}/v1/bulk-validate",
            json={"domains": domains},
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def close(self):
        """Close the session"""
        self.session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always release the session; exceptions from the with-body propagate
        self.close()


# Usage
# Leaving the with-block calls client.close() through __exit__, so the
# underlying session is released even if check_domain() raises.
with ReputeAPIClient(api_key=API_KEY) as client:
    result = client.check_domain("example.com")
    print(f"Score: {result['score']}/100")

Error Handling

Comprehensive Error Handling

import requests
from requests.exceptions import (
    RequestException,
    Timeout,
    ConnectionError,
    HTTPError
)

class ReputeAPIError(Exception):
    """Root of the ReputeAPI exception hierarchy; catch this to catch them all"""


class AuthenticationError(ReputeAPIError):
    """The API key was rejected or missing"""


class RateLimitError(ReputeAPIError):
    """The request was throttled; ``retry_after`` carries the wait hint, if any"""

    def __init__(self, message, retry_after=None):
        # Record the server's wait hint before delegating the message upward
        self.retry_after = retry_after
        super().__init__(message)


class ValidationError(ReputeAPIError):
    """The API rejected the request parameters"""


class ServerError(ReputeAPIError):
    """The API failed on its side"""


def check_domain_safe(domain: str) -> Optional[Dict]:
    """
    Check domain with comprehensive error handling

    Every handled failure is reported on stdout and turned into a ``None``
    return value, so callers only need to test the result.

    Args:
        domain: Domain to check

    Returns:
        Validation result or None if error occurred
    """
    try:
        response = requests.get(
            f"{BASE_URL}/api/v1/check",
            params={"domain": domain},
            headers={"X-API-Key": API_KEY},
            timeout=10  # 10 second timeout
        )

        # Check for HTTP errors
        if response.status_code == 401:
            raise AuthenticationError("Invalid or missing API key")

        elif response.status_code == 429:
            # Retry-After may be a number of seconds or an HTTP-date; int()
            # cannot parse the latter, so fall back to the 60 s default
            try:
                retry_after = int(response.headers.get("Retry-After", 60))
            except (TypeError, ValueError):
                retry_after = 60
            raise RateLimitError(
                "Rate limit exceeded",
                retry_after=retry_after
            )

        elif response.status_code == 400:
            # A 400 body is not guaranteed to be a JSON object; fall back to
            # the raw text so the original failure is still reported
            try:
                detail = response.json().get("message")
            except (ValueError, AttributeError):
                detail = response.text
            raise ValidationError(f"Invalid request: {detail}")

        elif response.status_code >= 500:
            raise ServerError(f"Server error: {response.status_code}")

        response.raise_for_status()
        return response.json()

    except AuthenticationError as e:
        print(f"Authentication error: {e}")
        print("Please check your API key")
        return None

    except RateLimitError as e:
        print(f"Rate limit exceeded. Retry after {e.retry_after} seconds")
        return None

    except ValidationError as e:
        print(f"Validation error: {e}")
        return None

    except ServerError as e:
        # Previously uncaught: a 5xx escaped as an exception even though
        # this function promises None on error
        print(f"{e}. Please try again later")
        return None

    except Timeout:
        print(f"Request timed out for domain: {domain}")
        return None

    except ConnectionError:
        print("Connection error. Please check your internet connection")
        return None

    except HTTPError as e:
        print(f"HTTP error: {e}")
        return None

    except RequestException as e:
        print(f"Request failed: {e}")
        return None

    except ValueError as e:
        # Older requests versions raise a plain ValueError when a success
        # response body is not valid JSON
        print(f"Invalid response body: {e}")
        return None


# Usage
# check_domain_safe() returns None for the failures it handles, so a falsy
# result is treated as "check failed" here.
result = check_domain_safe("example.com")
if result:
    print(f"Score: {result['score']}/100")
else:
    print("Failed to check domain")

Retry Logic with Exponential Backoff

import time
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    """
    Decorator to retry failed requests with exponential backoff

    The wrapped function is called up to ``max_retries`` times (always at
    least once). Rate-limit errors wait for the server-provided
    ``retry_after`` when available; timeouts, connection errors and server
    errors wait for an exponentially growing, jittered delay. When the
    attempts are used up, the last exception is re-raised unchanged.

    Args:
        max_retries: Maximum number of attempts
        base_delay: Initial delay in seconds
        max_delay: Maximum backoff delay in seconds

    Raises:
        AuthenticationError, ValidationError: Immediately, never retried
        RateLimitError, Timeout, ConnectionError, ServerError: The last
            failure, once all attempts are exhausted
    """
    import random  # local import: only the jitter below needs it

    # A budget of zero or less would otherwise never call the function at all
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = base_delay

            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)

                except (AuthenticationError, ValidationError):
                    # Don't retry on authentication or validation errors
                    raise

                except RateLimitError as e:
                    # Out of attempts: surface the real error instead of
                    # sleeping once more and raising a generic Exception
                    if attempt == attempts:
                        raise

                    # Use server-provided retry_after if available; kept
                    # separate from ``delay`` so it does not distort the
                    # backoff used for other error types
                    wait = e.retry_after if e.retry_after else delay
                    print(f"Rate limited. Waiting {wait} seconds...")
                    time.sleep(wait)

                except (Timeout, ConnectionError, ServerError) as e:
                    if attempt == attempts:
                        raise

                    # Jitter spreads out retries from clients that failed
                    # at the same moment
                    wait = delay + random.uniform(0, delay / 2)
                    print(f"Error: {e}. Retrying in {wait:.1f} seconds... (attempt {attempt}/{attempts})")
                    time.sleep(wait)

                    # Exponential backoff, capped at max_delay
                    delay = min(delay * 2, max_delay)

        return wrapper
    return decorator


@retry_with_backoff(max_retries=3, base_delay=2)
def check_domain_with_retry(domain: str) -> Dict:
    """Check domain with automatic retry

    Translates 429 and 5xx responses into RateLimitError / ServerError so the
    retry decorator can act on them; any other error status is raised by
    raise_for_status() as requests.HTTPError.

    Args:
        domain: Domain to check

    Returns:
        Parsed JSON body of the check endpoint response
    """
    response = requests.get(
        f"{BASE_URL}/api/v1/check",
        params={"domain": domain},
        headers={"X-API-Key": API_KEY},
        timeout=10
    )

    if response.status_code == 429:
        # Retry-After may be a number of seconds or an HTTP-date; int()
        # cannot parse the latter, so fall back to the 60 s default
        try:
            retry_after = int(response.headers.get("Retry-After", 60))
        except (TypeError, ValueError):
            retry_after = 60
        raise RateLimitError("Rate limit exceeded", retry_after=retry_after)

    if response.status_code >= 500:
        raise ServerError(f"Server error: {response.status_code}")

    response.raise_for_status()
    return response.json()


# Usage
# The decorator raises once its retry budget is spent, so the call is
# wrapped in try/except to report the final failure.
try:
    result = check_domain_with_retry("example.com")
    print(f"Score: {result['score']}/100")
except Exception as e:
    print(f"Failed after retries: {e}")

Async Support

import httpx
import asyncio
from typing import List

class AsyncReputeAPIClient:
    """Async Python client for ReputeAPI

    Single calls open a short-lived ``httpx.AsyncClient``. Batch calls share
    one client so its connection pool is reused across the batch.
    """

    def __init__(self, api_key: str, base_url: str = "https://api.reputeapi.com"):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {"X-API-Key": api_key}

    async def _get(self, path: str, params: Dict, client=None) -> Dict:
        """GET ``path`` and return the decoded JSON body.

        Uses ``client`` when given; otherwise opens a client for this one
        request and closes it afterwards.
        """
        if client is None:
            async with httpx.AsyncClient() as own_client:
                return await self._get(path, params, client=own_client)

        response = await client.get(
            f"{self.base_url}{path}",
            params=params,
            headers=self.headers,
            timeout=10.0
        )
        response.raise_for_status()
        return response.json()

    async def check_domain(self, domain: str, **kwargs) -> Dict:
        """Check domain asynchronously

        Args:
            domain: Domain to check
            **kwargs: Extra query parameters passed through to the endpoint

        Raises:
            httpx.HTTPStatusError: If the API answers with a 4xx/5xx status
        """
        return await self._get("/api/v1/check", {"domain": domain, **kwargs})

    async def get_score(self, domain: str) -> Dict:
        """Get score asynchronously

        Raises:
            httpx.HTTPStatusError: If the API answers with a 4xx/5xx status
        """
        return await self._get("/api/v1/score", {"domain": domain})

    async def check_multiple_domains(self, domains: List[str]) -> List[Dict]:
        """
        Check multiple domains concurrently

        Args:
            domains: List of domains to check

        Returns:
            List of results (one per domain, in input order); a failed
            check is returned as the exception object instead of raising
        """
        # One shared client for the whole batch: opening a client per
        # request would discard the connection pool each time
        async with httpx.AsyncClient() as shared_client:
            tasks = [
                self._get("/api/v1/check", {"domain": domain}, client=shared_client)
                for domain in domains
            ]
            return await asyncio.gather(*tasks, return_exceptions=True)


# Usage
async def main():
    """Demo: check one domain, then several concurrently, printing each score."""
    api = AsyncReputeAPIClient(api_key=API_KEY)

    # One-off lookup
    result = await api.check_domain("example.com")
    print(f"Score: {result['score']}/100")

    # Concurrent lookups; failures come back as exception objects
    domains = ["example.com", "google.com", "github.com"]
    outcomes = await api.check_multiple_domains(domains)

    for name, outcome in zip(domains, outcomes):
        if not isinstance(outcome, Exception):
            print(f"{name}: {outcome['score']}/100")
            continue
        print(f"{name}: Error - {outcome}")

# Run
# asyncio.run() creates an event loop, runs main() to completion, then closes the loop
asyncio.run(main())

Using aiohttp

import aiohttp
import asyncio

async def check_domain_aiohttp(domain: str) -> Dict:
    """Fetch the check result for ``domain`` through a one-shot aiohttp session."""
    endpoint = f"{BASE_URL}/api/v1/check"
    query = {"domain": domain}
    auth_headers = {"X-API-Key": API_KEY}
    # Overall budget of 10 seconds for the whole request
    limit = aiohttp.ClientTimeout(total=10)

    async with aiohttp.ClientSession() as session:
        async with session.get(
            endpoint, params=query, headers=auth_headers, timeout=limit
        ) as response:
            # Turn 4xx/5xx answers into exceptions before decoding the body
            response.raise_for_status()
            payload = await response.json()

    return payload

# Usage
# asyncio.run() drives the coroutine to completion and returns its result
result = asyncio.run(check_domain_aiohttp("example.com"))
print(f"Score: {result['score']}/100")

Common Use Cases

1. Domain Onboarding Validator

from dataclasses import dataclass
from typing import List

@dataclass
class OnboardingResult:
    """Outcome of validating one customer domain during onboarding."""

    # Domain that was validated
    domain: str
    # True when the score met the minimum and no critical issues were found
    approved: bool
    # Score reported by the API (0 when validation itself failed)
    score: int
    # Issues whose severity is "critical"
    critical_issues: List[Dict]
    # Remediation texts, or a single failure note when validation failed
    recommendations: List[str]

def validate_customer_domain(domain: str, min_score: int = 50) -> OnboardingResult:
    """
    Validate customer domain during onboarding

    Runs a full check and, when critical issues exist, fetches the matching
    recommendations. Any failure during the check yields a rejected result
    rather than an exception.

    Args:
        domain: Customer's email domain
        min_score: Minimum acceptable score

    Returns:
        OnboardingResult with approval status
    """
    # The context manager closes the HTTP session on every exit path
    with ReputeAPIClient(api_key=API_KEY) as client:
        try:
            # Get full check
            result = client.check_domain(domain)

            score = result["score"]
            issues = result.get("issues", [])

            # Filter critical issues; .get() so one issue without a
            # severity does not abort the whole validation
            critical_issues = [
                issue for issue in issues
                if issue.get("severity") == "critical"
            ]

            # Get recommendations for critical issues
            recommendations = []
            if critical_issues:
                rec_result = client.get_recommendations(domain, severity="critical")
                recommendations = [
                    rec["remediation"]
                    for rec in rec_result.get("recommendations", [])
                    if "remediation" in rec
                ]

            # Approve if score meets minimum and no critical issues
            approved = score >= min_score and not critical_issues

            return OnboardingResult(
                domain=domain,
                approved=approved,
                score=score,
                critical_issues=critical_issues,
                recommendations=recommendations
            )

        except Exception as e:
            # Deliberate catch-all: onboarding must get a verdict, so any
            # failure is reported and mapped to a rejection
            print(f"Error validating domain: {e}")
            return OnboardingResult(
                domain=domain,
                approved=False,
                score=0,
                critical_issues=[],
                recommendations=["Unable to validate domain"]
            )


# Usage
# min_score=70 overrides the function's default of 50
result = validate_customer_domain("customer-domain.com", min_score=70)

if result.approved:
    print(f"Domain approved with score: {result.score}/100")
else:
    # Also reached when validation itself failed (score 0, one failure note)
    print(f"Domain rejected. Score: {result.score}/100")
    print(f"Critical issues found: {len(result.critical_issues)}")
    print("\nRecommendations:")
    for rec in result.recommendations:
        print(f"  - {rec}")

2. Batch Domain Auditor

import csv
from datetime import datetime
from typing import List, Dict

def audit_domains_batch(
    domains: List[str],
    output_file: str = "audit_results.csv"
) -> List[Dict]:
    """
    Audit multiple domains and save results to CSV

    Domains are checked one at a time with a short pause in between. A
    domain whose check fails is recorded with score 0 and an error status
    instead of stopping the audit.

    Args:
        domains: List of domains to audit
        output_file: CSV file to save results

    Returns:
        List of audit results
    """
    results = []
    total = len(domains)

    print(f"Auditing {total} domains...")

    # The context manager closes the HTTP session once the audit is done
    with ReputeAPIClient(api_key=API_KEY) as client:
        for position, domain in enumerate(domains, 1):
            print(f"[{position}/{total}] Checking {domain}...", end=" ")

            try:
                result = client.get_score(domain)  # Use score endpoint for speed

                # Get issue count by severity
                top_issues = result.get("top_issues", [])
                audit_result = {
                    "domain": domain,
                    "score": result["score"],
                    "grade": result["grade"],
                    "timestamp": datetime.now().isoformat(),
                    "status": "success",
                    "critical_count": sum(
                        1 for issue in top_issues
                        if issue.get("severity") == "critical"
                    ),
                    "high_count": sum(
                        1 for issue in top_issues
                        if issue.get("severity") == "high"
                    ),
                }

                print(f"Score: {result['score']}/100")

            except Exception as e:
                # Deliberate catch-all: one bad domain must not end the batch
                print(f"Error: {e}")
                audit_result = {
                    "domain": domain,
                    "score": 0,
                    "grade": "Error",
                    "timestamp": datetime.now().isoformat(),
                    "status": f"error: {str(e)}",
                    "critical_count": 0,
                    "high_count": 0
                }

            results.append(audit_result)

            # Rate limiting: wait between requests (not after the last one)
            if position < total:
                time.sleep(0.5)

    # Save to CSV
    if results:
        # Explicit encoding so non-ASCII error text is written the same way
        # regardless of the platform's default locale
        with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
            fieldnames = [
                "domain", "score", "grade", "critical_count",
                "high_count", "status", "timestamp"
            ]
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(results)

        print(f"\nResults saved to {output_file}")

    return results


# Usage
domains = [
    "example.com",
    "test.com",
    "demo.com"
]

results = audit_domains_batch(domains)

# Summary statistics
# Failed checks are recorded with score 0, so they count as not passing
total_domains = len(results)
passing_domains = sum(1 for r in results if r["score"] >= 80)
print(f"\nSummary: {passing_domains}/{total_domains} domains passing (80+)")

3. Continuous Monitoring Script

import schedule
import time
from datetime import datetime

class DomainMonitor:
    """Polls domain scores and raises alerts on low or falling scores"""

    def __init__(self, api_key: str, alert_threshold: int = 70):
        self.client = ReputeAPIClient(api_key=api_key)
        self.alert_threshold = alert_threshold
        # Last score seen per domain, used to detect drops between checks
        self.previous_scores = {}

    def check_domain(self, domain: str):
        """Fetch the current score for ``domain`` and alert when it is low,
        has fallen by more than 10 points, or could not be fetched"""
        try:
            score_now = self.client.get_score(domain)["score"]
            score_before = self.previous_scores.get(domain)

            print(f"[{datetime.now()}] {domain}: {score_now}/100")

            # Absolute check against the configured floor
            threshold = self.alert_threshold
            if score_now < threshold:
                self.send_alert(
                    domain,
                    f"Score below threshold: {score_now} < {threshold}"
                )

            # Relative check against the previous reading, when there is one
            dropped = bool(score_before) and score_now < score_before - 10
            if dropped:
                self.send_alert(
                    domain,
                    f"Score dropped from {score_before} to {score_now}"
                )

            # Remember this reading for the next comparison
            self.previous_scores[domain] = score_now

        except Exception as exc:
            print(f"Error checking {domain}: {exc}")
            self.send_alert(domain, f"Check failed: {str(exc)}")

    def send_alert(self, domain: str, message: str):
        """Emit an alert; replace the print with your own notification channel"""
        print(f"ALERT for {domain}: {message}")

        # Hook points for real notifications:
        # - Send email
        # - Post to Slack
        # - Create ticket
        # - Send SMS
        # etc.

    def monitor_domains(self, domains: List[str]):
        """Run check_domain() for every entry in ``domains``, in order"""
        for name in domains:
            self.check_domain(name)


# Setup monitoring
monitor = DomainMonitor(api_key=API_KEY, alert_threshold=75)
monitored_domains = ["example.com", "test.com"]

# Schedule checks
# The lambda defers the call so it runs each time the hourly job fires
schedule.every().hour.do(
    lambda: monitor.monitor_domains(monitored_domains)
)

print("Starting domain monitor...")
print(f"Monitoring: {', '.join(monitored_domains)}")

# Run initial check
# Done once up front instead of waiting for the first scheduled run
monitor.monitor_domains(monitored_domains)

# Keep running
# Polls for due jobs once a minute; this loop never exits on its own
while True:
    schedule.run_pending()
    time.sleep(60)

4. Flask Web Application Integration

from flask import Flask, jsonify, request
from functools import lru_cache
import os

app = Flask(__name__)
# NOTE(review): os.getenv() returns None when REPUTE_API_KEY is unset and
# nothing here checks for that, so the client is built without a key
client = ReputeAPIClient(api_key=os.getenv("REPUTE_API_KEY"))

@lru_cache(maxsize=128)
def _cached_domain_check(domain: str, window: int) -> Dict:
    """Fetch and memoize a check result for one (domain, time window) pair"""
    return client.check_domain(domain)


def cached_domain_check(domain: str) -> Dict:
    """Cache domain checks for up to 15 minutes

    lru_cache on its own never expires entries. Adding the current
    15-minute window number to the cache key makes results from an earlier
    window unreachable, so they are re-fetched; the stale entries are then
    evicted by the LRU policy.
    """
    import time  # local import keeps this snippet self-contained

    window = int(time.time() // (15 * 60))
    return _cached_domain_check(domain, window)


# Keep the lru_cache management helpers available under the public name
cached_domain_check.cache_clear = _cached_domain_check.cache_clear
cached_domain_check.cache_info = _cached_domain_check.cache_info

@app.route("/api/check/<domain>", methods=["GET"])
def check_domain_endpoint(domain: str):
    """Check domain via web API

    Responds with the (cached) check result as JSON. Invalid requests map
    to 400; authentication and all other failures map to 500.
    """
    try:
        result = cached_domain_check(domain)
        return jsonify(result), 200

    except AuthenticationError:
        return jsonify({"error": "API authentication failed"}), 500

    except ValidationError as e:
        return jsonify({"error": str(e)}), 400

    except requests.HTTPError as e:
        # ReputeAPIClient reports API failures through raise_for_status(),
        # so upstream 400/401 responses arrive here as HTTPError rather
        # than as the custom exceptions handled above
        status = e.response.status_code if e.response is not None else None
        if status == 401:
            return jsonify({"error": "API authentication failed"}), 500
        if status == 400:
            return jsonify({"error": str(e)}), 400
        return jsonify({"error": str(e)}), 500

    except Exception as e:
        return jsonify({"error": str(e)}), 500

@app.route("/api/score/<domain>", methods=["GET"])
def get_score_endpoint(domain: str):
    """Return the domain's score and grade as JSON, or a 500 with the error text"""
    try:
        score_data = client.get_score(domain)
        # Expose only the summary fields, not the full API payload
        summary = dict(
            domain=domain,
            score=score_data["score"],
            grade=score_data["grade"],
        )
        return jsonify(summary), 200

    except Exception as err:
        return jsonify({"error": str(err)}), 500

if __name__ == "__main__":
    # Debug mode enables the interactive Werkzeug debugger, which lets anyone
    # who can reach the server execute code. Keep it off unless explicitly
    # requested for local development with FLASK_DEBUG=1.
    app.run(debug=os.getenv("FLASK_DEBUG") == "1")

Testing

Unit Tests with pytest

import pytest
from unittest.mock import Mock, patch
import requests

@pytest.fixture
def mock_response():
    """Fixture for mock API response

    Returns a plain dict with "domain", "score", "grade" and "issues" keys,
    used as the return value of the mocked ``json()`` call.
    """
    return {
        "domain": "example.com",
        "score": 85,
        "grade": "Good",
        "issues": []
    }

@pytest.fixture
def client():
    """Fixture for ReputeAPI client

    Built with a placeholder key; the tests that use it patch
    ``client.session.get``, so no real request is sent.
    """
    return ReputeAPIClient(api_key="test-key")

def test_check_domain_success(client, mock_response):
    """Test successful domain check"""
    # Replace the session's get() so the client talks to a mock, not the network
    with patch.object(client.session, "get") as mock_get:
        mock_get.return_value.json.return_value = mock_response
        # A plain Mock makes raise_for_status() a no-op, i.e. a success status
        mock_get.return_value.raise_for_status = Mock()

        result = client.check_domain("example.com")

        assert result["domain"] == "example.com"
        assert result["score"] == 85
        # Exactly one HTTP call per check
        mock_get.assert_called_once()

def test_check_domain_invalid_key(client):
    """Test authentication error handling

    The client does not translate status codes itself: a 401 surfaces as the
    requests.HTTPError raised by raise_for_status().
    """
    with patch.object(client.session, "get") as mock_get:
        mock_get.return_value.status_code = 401
        # Simulate what raise_for_status() does for an error status
        mock_get.return_value.raise_for_status.side_effect = requests.HTTPError()

        with pytest.raises(requests.HTTPError):
            client.check_domain("example.com")

def test_retry_on_rate_limit():
    """Test retry logic on rate limit

    The first response is a 429, the second succeeds; the decorated function
    must return the second response's JSON.
    """
    with patch("time.sleep"):  # Skip actual sleep
        with patch("requests.get") as mock_get:
            # First call: rate limit
            # Second call: success
            # side_effect with a list hands out one item per call, in order
            mock_get.side_effect = [
                Mock(status_code=429, headers={"Retry-After": "1"}),
                Mock(status_code=200, json=lambda: {"score": 90})
            ]

            result = check_domain_with_retry("example.com")
            assert result["score"] == 90

Performance Tips

1. Connection Pooling

from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

def create_session_with_retries():
    """Build a requests session whose HTTP and HTTPS adapters pool
    connections and retry transient failures"""
    pooled_adapter = HTTPAdapter(
        pool_connections=10,
        pool_maxsize=20,
        # Up to 3 retries with backoff for throttling and server errors
        max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST"],
        ),
    )

    session = requests.Session()
    # The same adapter instance serves both schemes
    for scheme in ("https://", "http://"):
        session.mount(scheme, pooled_adapter)

    return session

# Use in client
class OptimizedReputeAPIClient(ReputeAPIClient):
    """ReputeAPI client whose session pools connections and retries
    transient failures"""

    def __init__(self, api_key: str, base_url: str = "https://api.reputeapi.com"):
        super().__init__(api_key, base_url)
        # The base constructor already opened a plain session; close it
        # before replacing it so it is not left dangling
        self.session.close()
        self.session = create_session_with_retries()
        # The replacement session starts without the auth header
        self.session.headers.update({"X-API-Key": api_key})

2. Local Caching

from functools import lru_cache
from datetime import datetime, timedelta
import hashlib
import json

class CachedReputeAPIClient(ReputeAPIClient):
    """Client with local caching

    Results of check_domain() are kept in an in-memory dict for
    ``cache_ttl`` seconds, keyed by the domain and request options.
    """

    def __init__(self, *args, cache_ttl=900, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache_ttl = cache_ttl  # 15 minutes
        # Maps cache key -> (data, time stored)
        self._cache = {}

    def _get_cache_key(self, endpoint: str, params: Dict) -> str:
        """Generate cache key"""
        # sort_keys makes the key independent of keyword-argument order
        key_data = f"{endpoint}:{json.dumps(params, sort_keys=True)}"
        # md5 is used only to shorten the key, not for security
        return hashlib.md5(key_data.encode()).hexdigest()

    def _get_cached(self, key: str) -> Optional[Dict]:
        """Get from cache if not expired"""
        entry = self._cache.get(key)
        if entry is None:
            return None

        data, timestamp = entry
        if datetime.now() - timestamp < timedelta(seconds=self.cache_ttl):
            return data

        # Expired: drop it so dead entries do not accumulate
        del self._cache[key]
        return None

    def _set_cache(self, key: str, data: Dict):
        """Set cache entry"""
        self._cache[key] = (data, datetime.now())

    def check_domain(self, domain: str, **kwargs) -> Dict:
        """Check domain with caching

        Args:
            domain: Domain to check
            **kwargs: Options forwarded to ReputeAPIClient.check_domain();
                ``refresh=True`` skips the cache lookup and overwrites the
                cached entry with the fresh result

        Returns:
            Cached or freshly fetched validation result
        """
        # "refresh" decides whether the cache is consulted, not what is
        # being asked for, so it must not be part of the key. Otherwise a
        # refreshed result lands under a different key and later normal
        # calls keep returning the stale entry.
        key_params = {k: v for k, v in kwargs.items() if k != "refresh"}
        cache_key = self._get_cache_key("check", {"domain": domain, **key_params})

        # Try cache first
        if not kwargs.get("refresh"):
            cached_data = self._get_cached(cache_key)
            # Compare with None so an empty cached result still counts as a hit
            if cached_data is not None:
                return cached_data

        # Fetch from API
        result = super().check_domain(domain, **kwargs)

        # Cache result
        self._set_cache(cache_key, result)

        return result

Advanced Examples

Parallel Processing with Multiprocessing

from multiprocessing import Pool
from functools import partial

def check_domain_worker(domain: str, api_key: str) -> Dict:
    """Worker function for parallel processing

    Args:
        domain: Domain to score
        api_key: API key used to build this worker's own client

    Returns:
        The score response, or {"domain": ..., "error": ...} on any failure
    """
    try:
        # Each call builds its own client; the context manager closes its
        # session. Creating it inside the try means a constructor failure
        # also becomes an error dict instead of aborting the whole pool.map.
        with ReputeAPIClient(api_key=api_key) as client:
            return client.get_score(domain)
    except Exception as e:
        return {"domain": domain, "error": str(e)}

def check_domains_parallel(domains: List[str], workers: int = 4) -> List[Dict]:
    """
    Score several domains at once across a pool of worker processes

    Args:
        domains: Domains to check
        workers: Size of the process pool

    Returns:
        One result per domain, in the same order as ``domains``
    """
    with Pool(workers) as pool:
        # Bind the API key up front so each task only needs its domain
        return pool.map(partial(check_domain_worker, api_key=API_KEY), domains)

# Usage
# Start methods that re-import the main module in each worker (spawn, the
# default on Windows and macOS) would re-run unguarded top-level code in
# every child; the __main__ guard prevents that.
if __name__ == "__main__":
    domains = ["example.com", "google.com", "github.com", "stackoverflow.com"]
    results = check_domains_parallel(domains, workers=4)

    # pool.map preserves input order, so pair each result with its domain
    # rather than relying on a "domain" key in the API response
    for domain, result in zip(domains, results):
        if "error" in result:
            print(f"{domain}: Error - {result['error']}")
        else:
            print(f"{domain}: {result['score']}/100")

Next Steps


Additional Resources


Support

Need help with Python integration?