Best Practices¶

Performance optimization, caching strategies, error handling, and security best practices for ReputeAPI integration.


Overview¶

This guide covers best practices to help you build robust, efficient, and secure integrations with ReputeAPI.

Topics Covered:

  • Performance optimization
  • Caching strategies
  • Error handling and resilience
  • Security best practices
  • Rate limiting and cost optimization
  • Monitoring and observability

Performance Optimization¶

1. Use the Right Endpoint¶

Choose the appropriate endpoint based on your needs:

Use /api/v1/score when:

  • You only need the security score
  • You're checking many domains quickly
  • You want faster response times (150-300ms vs 400-700ms)

Use /api/v1/check when:

  • You need detailed SPF/DKIM/DMARC results
  • You want actionable remediation steps
  • You need DNS snippets for fixes

Python:

from typing import Dict

# Fast score check for dashboard
def get_domain_score(domain: str) -> int:
    result = client.get_score(domain)
    return result['score']

# Full check for detailed analysis
def analyze_domain(domain: str) -> Dict:
    result = client.check_domain(domain)
    return {
        'score': result['score'],
        'issues': result['issues'],
        'recommendations': result.get('recommendations', [])
    }

JavaScript:

// Fast score check for dashboard
async function getDomainScore(domain) {
  const result = await client.getScore(domain);
  return result.score;
}

// Full check for detailed analysis
async function analyzeDomain(domain) {
  const result = await client.checkDomain(domain);
  return {
    score: result.score,
    issues: result.issues,
    recommendations: result.recommendations || []
  };
}

2. Implement Connection Pooling¶

Reuse HTTP connections for better performance:

Python:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_optimized_session():
    """Create session with connection pooling"""
    session = requests.Session()

    # Configure retries
    retry_strategy = Retry(
        total=3,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "POST"],
        backoff_factor=1
    )

    # Configure connection pooling
    adapter = HTTPAdapter(
        max_retries=retry_strategy,
        pool_connections=10,  # Number of connection pools to cache
        pool_maxsize=20,      # Max connections per pool
        pool_block=False
    )

    session.mount("https://", adapter)
    session.mount("http://", adapter)

    return session

# Use in client
class OptimizedReputeAPIClient:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.session = create_optimized_session()
        self.session.headers.update({"X-API-Key": api_key})

JavaScript:

const axios = require('axios');
const https = require('https');

// Create HTTPS agent with connection pooling
const httpsAgent = new https.Agent({
  keepAlive: true,
  keepAliveMsecs: 30000,
  maxSockets: 50,
  maxFreeSockets: 10
});

const client = axios.create({
  baseURL: 'https://api.reputeapi.com',
  httpsAgent,
  headers: {
    'X-API-Key': process.env.REPUTE_API_KEY
  }
});

3. Batch Requests Efficiently¶

When checking multiple domains, use concurrent requests with rate limiting:

Python:

import asyncio
import httpx
from typing import List, Dict

async def check_domains_concurrent(
    domains: List[str],
    max_concurrent: int = 5
) -> List[Dict]:
    """
    Check multiple domains concurrently with rate limiting

    Args:
        domains: List of domains to check
        max_concurrent: Maximum concurrent requests

    Returns:
        List of results (exceptions appear in place of failed checks)
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    # Share one client so connections are pooled across all requests
    async with httpx.AsyncClient() as client:

        async def check_with_limit(domain: str):
            async with semaphore:
                response = await client.get(
                    f"{BASE_URL}/api/v1/score",
                    params={"domain": domain},
                    headers={"X-API-Key": API_KEY}
                )
                return response.json()

        results = await asyncio.gather(
            *[check_with_limit(domain) for domain in domains],
            return_exceptions=True
        )

    return results
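
For example, a one-off invocation might look like this (BASE_URL and API_KEY as defined for your client; failed checks surface as exception objects because of return_exceptions=True):

domains = ["example.com", "example.org", "example.net"]
results = asyncio.run(check_domains_concurrent(domains))

for domain, result in zip(domains, results):
    if isinstance(result, Exception):
        print(f"{domain}: check failed ({result})")
    else:
        print(f"{domain}: {result}")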

JavaScript:

const pLimit = require('p-limit');

async function checkDomainsConcurrent(domains, maxConcurrent = 5) {
  const limit = pLimit(maxConcurrent);

  const results = await Promise.allSettled(
    domains.map(domain =>
      limit(() => client.getScore(domain))
    )
  );

  return results.map((result, index) => ({
    domain: domains[index],
    ...(result.status === 'fulfilled'
      ? { success: true, data: result.value }
      : { success: false, error: result.reason.message }
    )
  }));
}


Caching Strategies¶

1. Respect API Caching¶

The API caches results for 15 minutes. Leverage this:

# Don't bypass cache unless necessary
result = client.check_domain(domain)  # Uses cache if available

# Only bypass cache when you need fresh data
result = client.check_domain(domain, refresh=True)  # Forces fresh lookup

2. Implement Application-Level Caching¶

Add your own caching layer for frequently accessed data:

Python (Redis):

import json
from typing import Dict

import redis

class CachedReputeAPIClient:
    """Client with Redis caching"""

    def __init__(self, api_key: str, redis_url: str):
        self.api = ReputeAPIClient(api_key)
        self.redis = redis.from_url(redis_url)
        self.cache_ttl = 900  # 15 minutes, matching the API's own cache window

    def check_domain(self, domain: str, force_refresh: bool = False) -> Dict:
        """Check domain with Redis caching"""
        cache_key = f"repute:check:{domain}"

        # Try cache first
        if not force_refresh:
            cached = self.redis.get(cache_key)
            if cached:
                return json.loads(cached)

        # Fetch from API
        result = self.api.check_domain(domain)

        # Cache result
        self.redis.setex(
            cache_key,
            self.cache_ttl,
            json.dumps(result)
        )

        return result

    def invalidate_cache(self, domain: str):
        """Invalidate cached result for domain"""
        cache_key = f"repute:check:{domain}"
        self.redis.delete(cache_key)

JavaScript (Redis):

const Redis = require('ioredis');

class CachedReputeAPIClient {
  constructor(apiKey, redisUrl) {
    this.api = new ReputeAPIClient(apiKey);
    this.redis = new Redis(redisUrl);
    this.cacheTtl = 900; // 15 minutes
  }

  async checkDomain(domain, forceRefresh = false) {
    const cacheKey = `repute:check:${domain}`;

    // Try cache first
    if (!forceRefresh) {
      const cached = await this.redis.get(cacheKey);
      if (cached) {
        return JSON.parse(cached);
      }
    }

    // Fetch from API
    const result = await this.api.checkDomain(domain);

    // Cache result
    await this.redis.setex(
      cacheKey,
      this.cacheTtl,
      JSON.stringify(result)
    );

    return result;
  }

  async invalidateCache(domain) {
    const cacheKey = `repute:check:${domain}`;
    await this.redis.del(cacheKey);
  }
}

3. Database Caching¶

For applications that need historical tracking:

PostgreSQL:

CREATE TABLE domain_cache (
    domain VARCHAR(255) PRIMARY KEY,
    result JSONB NOT NULL,
    cached_at TIMESTAMP NOT NULL,
    expires_at TIMESTAMP NOT NULL
);

-- Index for cleanup of expired entries
CREATE INDEX idx_expires_at ON domain_cache(expires_at);

-- Query to get from cache
SELECT result
FROM domain_cache
WHERE domain = 'example.com'
  AND expires_at > NOW();

-- Insert/update cache
INSERT INTO domain_cache (domain, result, cached_at, expires_at)
VALUES ('example.com', '{"score": 85}', NOW(), NOW() + INTERVAL '15 minutes')
ON CONFLICT (domain)
DO UPDATE SET
    result = EXCLUDED.result,
    cached_at = EXCLUDED.cached_at,
    expires_at = EXCLUDED.expires_at;
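
In application code, the table above can be wrapped in a small cache class. This is a minimal sketch assuming psycopg2 and PostgreSQL; the DatabaseCache name and DSN handling are illustrative:

import json

import psycopg2

class DatabaseCache:
    """Thin wrapper around the domain_cache table"""

    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)

    def get(self, domain: str):
        """Return the cached result, or None if missing or expired"""
        with self.conn.cursor() as cur:
            cur.execute(
                "SELECT result FROM domain_cache "
                "WHERE domain = %s AND expires_at > NOW()",
                (domain,)
            )
            row = cur.fetchone()
            return row[0] if row else None

    def put(self, domain: str, result: dict):
        """Upsert a result with a 15-minute TTL"""
        with self.conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO domain_cache (domain, result, cached_at, expires_at)
                VALUES (%s, %s, NOW(), NOW() + INTERVAL '15 minutes')
                ON CONFLICT (domain) DO UPDATE SET
                    result = EXCLUDED.result,
                    cached_at = EXCLUDED.cached_at,
                    expires_at = EXCLUDED.expires_at
                """,
                (domain, json.dumps(result))
            )
        self.conn.commit()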

4. Cache Invalidation Strategies¶

Implement intelligent cache invalidation:

from datetime import datetime, timedelta
from typing import Dict

class SmartCacheManager:
    """Intelligent cache management"""

    def __init__(self, cache, api_client):
        self.cache = cache
        self.api = api_client

    def get_result(self, domain: str) -> Dict:
        """Get result with smart caching"""
        cached = self.cache.get(domain)

        if cached:
            # Refresh critical results in the background once they go stale
            if self._is_critical(cached) and self._is_stale(cached):
                self._refresh_async(domain)

            return cached

        return self._fetch_and_cache(domain)

    def _fetch_and_cache(self, domain: str) -> Dict:
        """Fetch from the API and record when we cached it"""
        result = self.api.check_domain(domain)
        result['_cached_at'] = datetime.now().isoformat()
        self.cache.set(domain, result)
        return result

    def _is_critical(self, result: Dict) -> bool:
        """Check if result indicates critical issues"""
        return result['score'] < 50

    def _is_stale(self, cached: Dict) -> bool:
        """Check if cached data is older than 5 minutes"""
        cached_at = datetime.fromisoformat(cached['_cached_at'])
        return datetime.now() - cached_at > timedelta(minutes=5)

    def _refresh_async(self, domain: str):
        """Refresh cache in background"""
        # Queue a background job (e.g., the Celery task sketched below)
        refresh_cache_task.delay(domain)
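
The refresh_cache_task queued above is left to your job system. A minimal sketch using Celery (the broker URL is an assumption, and api_client/cache stand for the objects wired into SmartCacheManager):

from datetime import datetime

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def refresh_cache_task(domain: str):
    """Re-fetch a domain in the background and overwrite the cached entry"""
    result = api_client.check_domain(domain)
    result['_cached_at'] = datetime.now().isoformat()
    cache.set(domain, result)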

Error Handling and Resilience¶

1. Comprehensive Error Handling¶

Handle all error types appropriately:

import logging
from typing import Dict, Optional

import requests

logger = logging.getLogger(__name__)

class ReputeAPIError(Exception):
    """Base exception for ReputeAPI errors"""
    pass

class AuthenticationError(ReputeAPIError):
    """Invalid API key"""
    pass

class RateLimitError(ReputeAPIError):
    """Rate limit exceeded"""
    def __init__(self, message, retry_after=None):
        super().__init__(message)
        self.retry_after = retry_after

class ValidationError(ReputeAPIError):
    """Invalid request parameters"""
    pass

class ServerError(ReputeAPIError):
    """Server-side error"""
    pass

class NetworkError(ReputeAPIError):
    """Network connectivity error"""
    pass

def check_domain_safe(domain: str) -> Optional[Dict]:
    """Check domain with comprehensive error handling"""
    try:
        response = requests.get(
            f"{BASE_URL}/api/v1/check",
            params={"domain": domain},
            headers={"X-API-Key": API_KEY},
            timeout=10
        )

        if response.status_code == 401:
            raise AuthenticationError("Invalid API key")

        elif response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 60))
            raise RateLimitError("Rate limit exceeded", retry_after=retry_after)

        elif response.status_code == 400:
            error = response.json()
            raise ValidationError(error.get('message', 'Invalid request'))

        elif response.status_code >= 500:
            raise ServerError(f"Server error: {response.status_code}")

        response.raise_for_status()
        return response.json()

    except AuthenticationError as e:
        logger.error(f"Authentication failed: {e}")
        # Alert ops team - API key may be invalid
        # (alert_ops is a placeholder for your own alerting hook)
        alert_ops("Authentication failed")
        raise

    except RateLimitError as e:
        logger.warning(f"Rate limit hit, retry after {e.retry_after}s")
        # Implement backoff or queue for later
        return None

    except ValidationError as e:
        logger.error(f"Validation error for {domain}: {e}")
        # This is likely a bug in our code
        return None

    except ServerError as e:
        logger.error(f"Server error: {e}")
        # Retry with exponential backoff
        raise

    except requests.Timeout:
        logger.error(f"Request timeout for {domain}")
        raise NetworkError("Request timed out")

    except requests.ConnectionError:
        logger.error("Connection error")
        raise NetworkError("Connection failed")

    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise
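
A caller can then separate retryable failures from fatal ones. In this sketch, queue_for_retry is a placeholder for whatever retry mechanism you use:

def process_domain(domain: str) -> Optional[Dict]:
    try:
        result = check_domain_safe(domain)
    except (ServerError, NetworkError):
        # Transient - hand off to a retry queue (placeholder)
        queue_for_retry(domain)
        return None
    except AuthenticationError:
        # Fatal - nothing to retry until the key is fixed
        raise

    # None here means rate limited or invalid input (already logged)
    return result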

2. Circuit Breaker Pattern¶

Prevent cascading failures:

from datetime import datetime, timedelta

class CircuitBreaker:
    """Circuit breaker for API calls"""

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: int = 60,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.success_threshold = success_threshold

        self.failures = 0
        self.successes = 0
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker"""
        if self.state == 'open':
            if self._should_attempt_reset():
                self.state = 'half-open'
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result

        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        """Handle successful call"""
        self.failures = 0

        if self.state == 'half-open':
            self.successes += 1
            if self.successes >= self.success_threshold:
                self.state = 'closed'
                self.successes = 0

    def _on_failure(self):
        """Handle failed call"""
        self.failures += 1
        self.last_failure_time = datetime.now()
        self.successes = 0

        # Any failure while half-open reopens the circuit immediately
        if self.state == 'half-open' or self.failures >= self.failure_threshold:
            self.state = 'open'

    def _should_attempt_reset(self) -> bool:
        """Check if we should attempt to reset circuit"""
        return (
            self.last_failure_time and
            datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout)
        )

# Usage
circuit_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

def check_domain_with_circuit_breaker(domain: str):
    return circuit_breaker.call(
        lambda: client.check_domain(domain)
    )

3. Retry with Exponential Backoff¶

import logging
import random
import time
from functools import wraps

logger = logging.getLogger(__name__)

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0
):
    """
    Decorator for retry with exponential backoff

    Args:
        max_retries: Maximum number of retry attempts
        base_delay: Initial delay in seconds
        max_delay: Maximum delay in seconds
        exponential_base: Base for exponential calculation
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            delay = base_delay

            while retries <= max_retries:
                try:
                    return func(*args, **kwargs)

                except RateLimitError as e:
                    retries += 1
                    if retries > max_retries:
                        logger.error("Still rate limited after max retries")
                        raise
                    # Use server-provided retry_after
                    delay = e.retry_after if e.retry_after else delay
                    logger.warning(f"Rate limited, waiting {delay}s")

                except (ServerError, NetworkError) as e:
                    retries += 1

                    if retries > max_retries:
                        logger.error(f"Max retries exceeded: {e}")
                        raise

                    logger.warning(
                        f"Error: {e}. Retry {retries}/{max_retries} "
                        f"after {delay}s"
                    )

                except (AuthenticationError, ValidationError):
                    # Don't retry these
                    raise

                time.sleep(delay)

                # Exponential backoff with jitter
                delay = min(
                    delay * exponential_base + random.uniform(0, 1),
                    max_delay
                )

            raise Exception(f"Failed after {max_retries} retries")

        return wrapper
    return decorator

# Usage
@retry_with_backoff(max_retries=3, base_delay=2)
def check_domain_with_retry(domain: str):
    return client.check_domain(domain)

Security Best Practices¶

1. Secure API Key Storage¶

Never hardcode API keys:

Environment Variables:

# .env file
REPUTE_API_KEY=sk_live_abc123...

# Load in application
from dotenv import load_dotenv
import os

load_dotenv()
API_KEY = os.getenv("REPUTE_API_KEY")

Secrets Manager (AWS):

import boto3
import json

def get_api_key():
    """Get API key from AWS Secrets Manager"""
    client = boto3.client('secretsmanager')

    response = client.get_secret_value(
        SecretId='prod/reputeapi/apikey'
    )

    secret = json.loads(response['SecretString'])
    return secret['api_key']

API_KEY = get_api_key()

Vault (HashiCorp):

import os

import hvac

def get_api_key():
    """Get API key from Vault"""
    client = hvac.Client(url='https://vault.example.com')
    client.auth.approle.login(
        role_id=os.getenv('VAULT_ROLE_ID'),
        secret_id=os.getenv('VAULT_SECRET_ID')
    )

    secret = client.secrets.kv.v2.read_secret_version(
        path='reputeapi/apikey'
    )

    return secret['data']['data']['api_key']

API_KEY = get_api_key()

2. Validate Input¶

Always validate domain inputs:

import re
from urllib.parse import urlparse

def validate_domain(domain: str) -> str:
    """
    Validate and sanitize domain input

    Args:
        domain: Domain to validate

    Returns:
        Sanitized domain

    Raises:
        ValueError: If domain is invalid
    """
    # Remove whitespace
    domain = domain.strip()

    # Remove protocol if present
    if '://' in domain:
        parsed = urlparse(domain)
        domain = parsed.netloc or parsed.path

    # Remove path, query, fragment
    domain = domain.split('/')[0].split('?')[0].split('#')[0]

    # Remove port
    domain = domain.split(':')[0]

    # Validate format
    domain_pattern = r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'

    if not re.match(domain_pattern, domain):
        raise ValueError(f"Invalid domain format: {domain}")

    # Check length
    if len(domain) > 253:
        raise ValueError(f"Domain too long: {domain}")

    return domain.lower()

# Usage (inside a request handler; user_input is the raw client-supplied value)
try:
    clean_domain = validate_domain(user_input)
    result = client.check_domain(clean_domain)
except ValueError as e:
    return {"error": str(e)}

3. Rate Limit Your Application¶

Implement client-side rate limiting:

from threading import Lock
from collections import deque
import time

class RateLimiter:
    """Client-side rate limiter"""

    def __init__(self, max_calls: int, period: int):
        """
        Initialize rate limiter

        Args:
            max_calls: Maximum calls allowed
            period: Time period in seconds
        """
        self.max_calls = max_calls
        self.period = period
        self.calls = deque()
        self.lock = Lock()

    def allow_request(self) -> bool:
        """Check if request is allowed"""
        with self.lock:
            now = time.time()

            # Remove old calls outside the window
            while self.calls and self.calls[0] < now - self.period:
                self.calls.popleft()

            if len(self.calls) < self.max_calls:
                self.calls.append(now)
                return True

            return False

    def wait_if_needed(self):
        """Wait if rate limit is hit"""
        while not self.allow_request():
            time.sleep(0.1)

# Usage
rate_limiter = RateLimiter(max_calls=10, period=60)  # 10 per minute

def check_domain_rate_limited(domain: str):
    rate_limiter.wait_if_needed()
    return client.check_domain(domain)

Cost Optimization¶

1. Monitor Usage¶

Track API usage to optimize costs:

from typing import Dict, List

class UsageTracker:
    """Track API usage"""

    def __init__(self, db_connection):
        self.db = db_connection

    def track_request(
        self,
        endpoint: str,
        domain: str,
        cached: bool,
        response_time: float
    ):
        """Track API request"""
        query = """
            INSERT INTO api_usage (
                endpoint, domain, cached, response_time, timestamp
            ) VALUES (%s, %s, %s, %s, NOW())
        """
        self.db.execute(query, (endpoint, domain, cached, response_time))

    def get_usage_stats(self, days: int = 30) -> Dict:
        """Get usage statistics"""
        # Multiply a fixed interval rather than interpolating into the
        # string literal: placeholders are not expanded inside quotes
        query = """
            SELECT
                COUNT(*) as total_requests,
                SUM(CASE WHEN cached THEN 1 ELSE 0 END) as cached_requests,
                COUNT(DISTINCT domain) as unique_domains,
                AVG(response_time) as avg_response_time,
                DATE(timestamp) as date
            FROM api_usage
            WHERE timestamp >= NOW() - INTERVAL '1 day' * %s
            GROUP BY DATE(timestamp)
            ORDER BY date
        """
        return self.db.query_all(query, (days,))

    def get_top_domains(self, limit: int = 10) -> List[Dict]:
        """Get most checked domains"""
        query = """
            SELECT domain, COUNT(*) as check_count
            FROM api_usage
            WHERE timestamp >= NOW() - INTERVAL '30 days'
            GROUP BY domain
            ORDER BY check_count DESC
            LIMIT %s
        """
        return self.db.query_all(query, (limit,))
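
The api_usage table this class writes to is not defined elsewhere in this guide; a possible PostgreSQL schema, inferred from the INSERT above, is:

API_USAGE_DDL = """
CREATE TABLE IF NOT EXISTS api_usage (
    id BIGSERIAL PRIMARY KEY,
    endpoint VARCHAR(255) NOT NULL,
    domain VARCHAR(255) NOT NULL,
    cached BOOLEAN NOT NULL,
    response_time DOUBLE PRECISION NOT NULL,
    timestamp TIMESTAMP NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_usage_timestamp ON api_usage(timestamp);
"""

def ensure_usage_table(db):
    """Run once at deploy time, e.g. with the same connection UsageTracker uses"""
    db.execute(API_USAGE_DDL)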

2. Use Score Endpoint for Frequent Checks¶

The score endpoint responds faster and returns a smaller payload:

def check_for_dashboard(domain: str) -> Dict:
    """Optimized check for dashboard display"""
    # Use score endpoint for initial load
    score_result = client.get_score(domain)

    # Only fetch full details if user clicks for more info
    return {
        'domain': domain,
        'score': score_result['score'],
        'grade': score_result['grade'],
        'top_issues_count': len(score_result.get('top_issues', []))
    }

def get_full_details(domain: str) -> Dict:
    """Get full details only when needed"""
    return client.check_domain(domain)

3. Implement Smart Refresh¶

Only refresh when necessary:

from datetime import datetime, timedelta

def should_refresh(domain: str, last_check: datetime) -> bool:
    """Determine if domain should be refreshed"""
    now = datetime.now()
    age = now - last_check

    # Always refresh if older than 24 hours
    if age > timedelta(hours=24):
        return True

    # Refresh critical domains more frequently
    # (get_cached_score is a placeholder for your own cache lookup)
    score = get_cached_score(domain)
    if score is not None and score < 50:
        return age > timedelta(hours=1)

    # Normal domains: refresh if older than 6 hours
    return age > timedelta(hours=6)
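
Wired into a periodic job, the check might look like this (get_monitored_domains and get_last_check are placeholders for your own storage):

def refresh_sweep():
    """Periodic job: refresh only the domains that need it"""
    for domain in get_monitored_domains():   # placeholder
        last_check = get_last_check(domain)  # placeholder
        if should_refresh(domain, last_check):
            client.check_domain(domain, refresh=True)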

Monitoring and Observability¶

1. Logging Best Practices¶

import logging
import time

# Configure logging; note that the extra fields passed below are only
# rendered if your formatter includes them (see the JSON formatter
# sketch after this example)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(name)s %(message)s'
)

logger = logging.getLogger(__name__)

def check_domain_with_logging(domain: str) -> Dict:
    """Check domain with comprehensive logging"""
    start_time = time.time()

    logger.info("Starting domain check", extra={
        'domain': domain,
        'operation': 'check_domain'
    })

    try:
        result = client.check_domain(domain)

        elapsed = time.time() - start_time

        logger.info("Domain check successful", extra={
            'domain': domain,
            'score': result['score'],
            'elapsed_ms': elapsed * 1000,
            'cached': result.get('meta', {}).get('cached', False)
        })

        return result

    except Exception as e:
        elapsed = time.time() - start_time

        logger.error("Domain check failed", extra={
            'domain': domain,
            'error': str(e),
            'error_type': type(e).__name__,
            'elapsed_ms': elapsed * 1000
        }, exc_info=True)

        raise
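
The stdlib format string above does not render the extra fields. To get them into the output, attach a structured formatter; one option is the python-json-logger package:

import logging
from pythonjsonlogger import jsonlogger

handler = logging.StreamHandler()
handler.setFormatter(jsonlogger.JsonFormatter(
    '%(asctime)s %(levelname)s %(name)s %(message)s'
))

logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# extra fields now appear as JSON keys
logger.info("Domain check successful", extra={'domain': 'example.com'})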

2. Metrics Collection¶

from prometheus_client import Counter, Histogram, Gauge

# Define metrics
api_requests_total = Counter(
    'repute_api_requests_total',
    'Total API requests',
    ['endpoint', 'status']
)

api_request_duration = Histogram(
    'repute_api_request_duration_seconds',
    'API request duration',
    ['endpoint']
)

api_cache_hit_rate = Gauge(
    'repute_api_cache_hit_rate',
    'Cache hit rate'
)

def check_domain_with_metrics(domain: str) -> Dict:
    """Check domain with metrics collection"""
    endpoint = '/api/v1/check'

    with api_request_duration.labels(endpoint=endpoint).time():
        try:
            result = client.check_domain(domain)

            api_requests_total.labels(
                endpoint=endpoint,
                status='success'
            ).inc()

            # Track cache hits
            if result.get('meta', {}).get('cached'):
                api_cache_hit_rate.set(1)
            else:
                api_cache_hit_rate.set(0)

            return result

        except Exception as e:
            api_requests_total.labels(
                endpoint=endpoint,
                status='error'
            ).inc()
            raise
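
To make these metrics scrapeable, prometheus_client can serve them over HTTP (the port is up to you):

from prometheus_client import start_http_server

# Expose /metrics for Prometheus to scrape
start_http_server(9100)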

3. Health Checks¶

from datetime import datetime
from typing import Dict

def health_check() -> Dict:
    """Check API health"""
    try:
        # Simple request to verify connectivity
        client.get_usage()

        return {
            'status': 'healthy',
            'api': 'reachable',
            'timestamp': datetime.now().isoformat()
        }

    except Exception as e:
        return {
            'status': 'unhealthy',
            'api': 'unreachable',
            'error': str(e),
            'timestamp': datetime.now().isoformat()
        }
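
Exposed through your web framework, this becomes a liveness endpoint. A minimal sketch with Flask (an assumption; use whatever framework you already run):

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/health')
def health():
    result = health_check()
    status_code = 200 if result['status'] == 'healthy' else 503
    return jsonify(result), status_code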

Testing Best Practices¶

1. Mock API Responses¶

import pytest
from unittest.mock import patch

@pytest.fixture
def mock_api_response():
    return {
        'domain': 'example.com',
        'score': 85,
        'grade': 'Good',
        'issues': []
    }

def test_check_domain_success(mock_api_response):
    """Test successful domain check"""
    with patch('requests.get') as mock_get:
        mock_get.return_value.json.return_value = mock_api_response
        mock_get.return_value.status_code = 200

        result = check_domain('example.com')

        assert result['domain'] == 'example.com'
        assert result['score'] == 85
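
A negative-path test is just as useful. This sketch assumes the check_domain_safe helper from the error-handling section, which returns None when rate limited:

def test_check_domain_rate_limited():
    """Rate-limited responses should return None, not raise"""
    with patch('requests.get') as mock_get:
        mock_get.return_value.status_code = 429
        mock_get.return_value.headers = {'Retry-After': '30'}

        result = check_domain_safe('example.com')

        assert result is None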

2. Integration Tests¶

@pytest.mark.integration
def test_api_integration():
    """Test actual API integration"""
    # Use test API key
    client = ReputeAPIClient(api_key=TEST_API_KEY)

    result = client.check_domain('google.com')

    assert 'score' in result
    assert 0 <= result['score'] <= 100
    assert 'issues' in result
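
To keep integration tests out of the default run, register the marker in conftest.py and select it explicitly (a minimal sketch; the description text is illustrative):

# conftest.py
def pytest_configure(config):
    config.addinivalue_line(
        "markers", "integration: tests that hit the live API"
    )

Run them with pytest -m integration, or exclude them with pytest -m "not integration".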

Quick Reference Checklist¶

  • [ ] Use appropriate endpoint (score vs check)
  • [ ] Implement connection pooling
  • [ ] Add caching layer (Redis/database)
  • [ ] Handle all error types
  • [ ] Implement retry with exponential backoff
  • [ ] Use circuit breaker for resilience
  • [ ] Store API keys securely
  • [ ] Validate all inputs
  • [ ] Implement client-side rate limiting
  • [ ] Track API usage
  • [ ] Add comprehensive logging
  • [ ] Collect metrics
  • [ ] Set up monitoring and alerts
  • [ ] Write tests (unit and integration)


Support¶

Need help with best practices?