# Integration Patterns

Architecture patterns and design strategies for integrating ReputeAPI into your applications.


## Overview

This guide covers common integration patterns for different use cases, from simple validation workflows to complex multi-tenant systems.

What You'll Learn:

  • Architecture patterns for various use cases
  • Data flow and caching strategies
  • Event-driven integration approaches
  • Multi-tenant and SaaS patterns
  • Real-time monitoring architectures

## Pattern 1: Simple Validation Service

**Use Case:** Basic domain validation during user onboarding or configuration.

### Architecture

┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│   Client    │─────▶│   Your API  │─────▶│ ReputeAPI   │
│ Application │      │   Service   │      │             │
└─────────────┘      └─────────────┘      └─────────────┘
                            │
                            ▼
                     ┌─────────────┐
                     │  Database   │
                     │   (Cache)   │
                     └─────────────┘

### Implementation

**Python:**
```python
from datetime import datetime, timedelta
from typing import Dict, Optional


class ValidationService:
    """Simple validation service with caching"""

    def __init__(self, api_client, db_connection):
        self.api = api_client
        self.db = db_connection
        self.cache_ttl = timedelta(minutes=15)

    def validate_domain(self, domain: str, force_refresh: bool = False) -> Dict:
        """
        Validate domain with database caching

        Args:
            domain: Domain to validate
            force_refresh: Bypass cache

        Returns:
            Validation result
        """
        # Check cache first
        if not force_refresh:
            cached = self._get_from_cache(domain)
            if cached:
                return cached

        # Fetch from API
        result = self.api.check_domain(domain)

        # Save to cache
        self._save_to_cache(domain, result)

        return result

    def _get_from_cache(self, domain: str) -> Optional[Dict]:
        """Get cached result if not expired"""
        query = """
            SELECT result, cached_at
            FROM domain_validations
            WHERE domain = %s
            ORDER BY cached_at DESC
            LIMIT 1
        """
        row = self.db.query_one(query, (domain,))

        if row:
            result, cached_at = row
            if datetime.now() - cached_at < self.cache_ttl:
                return result

        return None

    def _save_to_cache(self, domain: str, result: Dict):
        """Save result to cache"""
        query = """
            INSERT INTO domain_validations (domain, result, cached_at)
            VALUES (%s, %s, %s)
        """
        self.db.execute(query, (domain, result, datetime.now()))
```

**JavaScript:**
```javascript
class ValidationService {
  constructor(apiClient, database) {
    this.api = apiClient;
    this.db = database;
    this.cacheTtl = 15 * 60 * 1000; // 15 minutes
  }

  async validateDomain(domain, forceRefresh = false) {
    // Check cache first
    if (!forceRefresh) {
      const cached = await this.getFromCache(domain);
      if (cached) return cached;
    }

    // Fetch from API
    const result = await this.api.checkDomain(domain);

    // Save to cache
    await this.saveToCache(domain, result);

    return result;
  }

  async getFromCache(domain) {
    const query = `
      SELECT result, cached_at
      FROM domain_validations
      WHERE domain = $1
      ORDER BY cached_at DESC
      LIMIT 1
    `;

    const row = await this.db.queryOne(query, [domain]);

    if (row) {
      const cacheAge = Date.now() - row.cached_at.getTime();
      if (cacheAge < this.cacheTtl) {
        return row.result;
      }
    }

    return null;
  }

  async saveToCache(domain, result) {
    const query = `
      INSERT INTO domain_validations (domain, result, cached_at)
      VALUES ($1, $2, $3)
    `;

    await this.db.execute(query, [domain, result, new Date()]);
  }
}
```

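### Database Schema

```sql
CREATE TABLE domain_validations (
    id SERIAL PRIMARY KEY,
    domain VARCHAR(255) NOT NULL,
    result JSONB NOT NULL,
    cached_at TIMESTAMP NOT NULL
);

-- PostgreSQL (SERIAL, JSONB) does not accept an inline INDEX clause,
-- so the index is created in a separate statement.
CREATE INDEX idx_domain_cached ON domain_validations (domain, cached_at);
```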
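
The cache-aside flow above can be exercised without a database. The following is a minimal sketch rather than part of ReputeAPI: `InMemoryValidationCache`, `fake_check_domain`, and the `now` parameter are hypothetical names, and a plain dict stands in for the `domain_validations` table.

```python
from datetime import datetime, timedelta
from typing import Callable, Dict, Optional, Tuple


class InMemoryValidationCache:
    """Cache-aside lookup with a TTL, using a dict in place of the database."""

    def __init__(self, fetch: Callable[[str], Dict],
                 ttl: timedelta = timedelta(minutes=15)):
        self.fetch = fetch  # stand-in for api_client.check_domain
        self.ttl = ttl
        self._store: Dict[str, Tuple[Dict, datetime]] = {}

    def validate_domain(self, domain: str, force_refresh: bool = False,
                        now: Optional[datetime] = None) -> Dict:
        now = now or datetime.now()
        entry = self._store.get(domain)
        if not force_refresh and entry is not None:
            result, cached_at = entry
            if now - cached_at < self.ttl:
                return result  # fresh cache hit, no API call
        result = self.fetch(domain)  # miss, expired entry, or forced refresh
        self._store[domain] = (result, now)
        return result


calls = []


def fake_check_domain(domain: str) -> Dict:
    """Hypothetical API stub that records each call."""
    calls.append(domain)
    return {"domain": domain, "score": 85}


cache = InMemoryValidationCache(fake_check_domain)
start = datetime(2024, 1, 1, 12, 0)
cache.validate_domain("example.com", now=start)                          # miss
cache.validate_domain("example.com", now=start + timedelta(minutes=5))   # hit
cache.validate_domain("example.com", now=start + timedelta(minutes=20))  # expired
print(len(calls))  # 2
```

Passing the clock in as `now` keeps the expiry logic easy to test; production code would normally rely on the default.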


## Pattern 2: Background Job Processing

**Use Case:** Validate domains asynchronously without blocking user requests.

### Architecture

┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│   Client    │─────▶│   Web API   │      │  Job Queue  │
└─────────────┘      └─────────────┘      │   (Redis)   │
                            │             └─────────────┘
                            │                    │
                            ▼                    ▼
                     ┌─────────────┐      ┌─────────────┐
                     │  Database   │      │   Worker    │
                     └─────────────┘      │  Processes  │
                                          └─────────────┘
                                                 │
                                                 ▼
                                          ┌─────────────┐
                                          │ ReputeAPI   │
                                          └─────────────┘

### Implementation

**Python (Celery):**
```python
from datetime import datetime

from celery import Celery
from flask import Flask, jsonify, request

# db, settings, ReputeAPIClient, and send_webhook are application-specific
# and assumed to be defined elsewhere in your project.

# Separate names keep the Celery app and the Flask app from shadowing each other
celery_app = Celery('tasks', broker='redis://localhost:6379')
flask_app = Flask(__name__)


@celery_app.task(bind=True, max_retries=3)
def validate_domain_task(self, domain: str, user_id: int):
    """
    Background task to validate domain

    Args:
        domain: Domain to validate
        user_id: User who requested validation
    """
    try:
        # Update status to processing
        db.update_validation_status(domain, user_id, 'processing')

        # Call API
        client = ReputeAPIClient(api_key=settings.REPUTE_API_KEY)
        result = client.check_domain(domain)

        # Save result
        db.save_validation_result(
            domain=domain,
            user_id=user_id,
            result=result,
            status='completed',
            completed_at=datetime.now()
        )

        # Trigger webhook if configured
        webhook_url = db.get_user_webhook(user_id)
        if webhook_url:
            send_webhook(webhook_url, {
                'event': 'validation.completed',
                'domain': domain,
                'result': result
            })

    except Exception as e:
        # Update status to failed
        db.update_validation_status(domain, user_id, 'failed', str(e))

        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=2 ** self.request.retries)


# API endpoint
@flask_app.route('/api/validate', methods=['POST'])
def queue_validation():
    """Queue domain validation"""
    data = request.json
    domain = data.get('domain')
    # Assumes your auth middleware attaches the current user to the request
    user_id = request.user.id

    # Create validation record
    validation_id = db.create_validation(
        domain=domain,
        user_id=user_id,
        status='queued'
    )

    # Queue background task
    validate_domain_task.delay(domain, user_id)

    return jsonify({
        'validation_id': validation_id,
        'status': 'queued',
        'message': 'Validation queued for processing'
    })
```

**JavaScript (Bull):**
```javascript
const Queue = require('bull');
const validationQueue = new Queue('domain-validation', 'redis://localhost:6379');

// Worker process
validationQueue.process(async (job) => {
  const { domain, userId } = job.data;

  try {
    // Update status
    await db.updateValidationStatus(domain, userId, 'processing');

    // Call API
    const client = new ReputeAPIClient(process.env.REPUTE_API_KEY);
    const result = await client.checkDomain(domain);

    // Save result
    await db.saveValidationResult({
      domain,
      userId,
      result,
      status: 'completed',
      completedAt: new Date()
    });

    // Trigger webhook
    const webhookUrl = await db.getUserWebhook(userId);
    if (webhookUrl) {
      await sendWebhook(webhookUrl, {
        event: 'validation.completed',
        domain,
        result
      });
    }

    return result;

  } catch (error) {
    await db.updateValidationStatus(
      domain,
      userId,
      'failed',
      error.message
    );
    throw error;
  }
});

// API endpoint
app.post('/api/validate', async (req, res) => {
  const { domain } = req.body;
  const userId = req.user.id;

  // Create validation record
  const validationId = await db.createValidation({
    domain,
    userId,
    status: 'queued'
  });

  // Queue job
  await validationQueue.add({
    domain,
    userId
  });

  res.json({
    validationId,
    status: 'queued',
    message: 'Validation queued for processing'
  });
});
```
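
The Celery task above retries with `countdown=2 ** self.request.retries`. As a quick worked example of what that schedule looks like, the sketch below lists the delays. `retry_delays` and its `cap` parameter are hypothetical helpers for illustration, not Celery or Bull APIs.

```python
from typing import List, Optional


def retry_delays(max_retries: int, base: int = 2,
                 cap: Optional[int] = None) -> List[int]:
    """Seconds to wait before each retry when the delay is base ** attempt."""
    delays = [base ** attempt for attempt in range(max_retries)]
    if cap is not None:
        # Optional ceiling so long retry chains do not wait unboundedly
        delays = [min(delay, cap) for delay in delays]
    return delays


print(retry_delays(3))          # [1, 2, 4]
print(retry_delays(6, cap=10))  # [1, 2, 4, 8, 10, 10]
```

With `max_retries=3`, the task waits 1, 2, and 4 seconds between attempts, about 7 seconds in total, before it gives up.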

---

## Pattern 3: Event-Driven Architecture

**Use Case:** React to validation events across multiple services.

### Architecture

┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│   Service   │─────▶│   Message   │─────▶│  Analytics  │
│      A      │      │   Broker    │      │   Service   │
└─────────────┘      │   (Kafka)   │      └─────────────┘
                     └─────────────┘
                            │
                            ├─────────────────▶┌─────────────┐
                            │                  │    Alert    │
                            │                  │   Service   │
                            │                  └─────────────┘
                            │
                            └─────────────────▶┌─────────────┐
                                               │   Webhook   │
                                               │   Service   │
                                               └─────────────┘

### Implementation

**Python (Kafka):**
```python
import json
from datetime import datetime
from typing import Dict

from kafka import KafkaProducer, KafkaConsumer


class ValidationEventProducer:
    """Produce validation events"""

    def __init__(self, bootstrap_servers):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def emit_validation_started(self, domain: str, user_id: int):
        """Emit validation started event"""
        event = {
            'event_type': 'validation.started',
            'domain': domain,
            'user_id': user_id,
            'timestamp': datetime.now().isoformat()
        }
        self.producer.send('domain-validations', value=event)

    def emit_validation_completed(
        self,
        domain: str,
        user_id: int,
        result: Dict
    ):
        """Emit validation completed event"""
        issues = result.get('issues', [])
        event = {
            'event_type': 'validation.completed',
            'domain': domain,
            'user_id': user_id,
            'score': result['score'],
            'grade': result['grade'],
            'issues_count': len(issues),
            # AlertService reads this field. Assumes each issue carries a
            # 'severity' value and that 'critical' is one of them.
            'critical_issues': sum(
                1 for issue in issues if issue.get('severity') == 'critical'
            ),
            'timestamp': datetime.now().isoformat()
        }
        self.producer.send('domain-validations', value=event)


class AlertService:
    """Alert service consuming validation events"""

    def __init__(self, bootstrap_servers):
        self.consumer = KafkaConsumer(
            'domain-validations',
            bootstrap_servers=bootstrap_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            group_id='alert-service'
        )

    def process_events(self):
        """Process validation events"""
        for message in self.consumer:
            event = message.value

            if event['event_type'] == 'validation.completed':
                score = event['score']

                # Alert on low scores
                if score < 70:
                    self.send_alert(
                        event['domain'],
                        f"Low security score: {score}/100"
                    )

                # Alert on critical issues
                if event.get('critical_issues', 0) > 0:
                    self.send_alert(
                        event['domain'],
                        "Critical issues found"
                    )

    def send_alert(self, domain: str, message: str):
        """Send alert (email, Slack, etc.)"""
        print(f"ALERT [{domain}]: {message}")
        # Implement your alert logic here
```

**JavaScript (RabbitMQ):**
```javascript
const amqp = require('amqplib');

class ValidationEventProducer {
  constructor(rabbitUrl) {
    this.rabbitUrl = rabbitUrl;
  }

  async connect() {
    this.connection = await amqp.connect(this.rabbitUrl);
    this.channel = await this.connection.createChannel();
    await this.channel.assertExchange('domain-validations', 'topic');
  }

  async emitValidationStarted(domain, userId) {
    const event = {
      eventType: 'validation.started',
      domain,
      userId,
      timestamp: new Date().toISOString()
    };

    this.channel.publish(
      'domain-validations',
      'validation.started',
      Buffer.from(JSON.stringify(event))
    );
  }

  async emitValidationCompleted(domain, userId, result) {
    const event = {
      eventType: 'validation.completed',
      domain,
      userId,
      score: result.score,
      grade: result.grade,
      issuesCount: result.issues?.length || 0,
      timestamp: new Date().toISOString()
    };

    this.channel.publish(
      'domain-validations',
      'validation.completed',
      Buffer.from(JSON.stringify(event))
    );
  }
}

class AlertService {
  constructor(rabbitUrl) {
    this.rabbitUrl = rabbitUrl;
  }

  async connect() {
    this.connection = await amqp.connect(this.rabbitUrl);
    this.channel = await this.connection.createChannel();

    await this.channel.assertExchange('domain-validations', 'topic');
    const queue = await this.channel.assertQueue('alerts');

    await this.channel.bindQueue(
      queue.queue,
      'domain-validations',
      'validation.completed'
    );

    this.channel.consume(queue.queue, this.handleEvent.bind(this));
  }

  async handleEvent(message) {
    const event = JSON.parse(message.content.toString());

    if (event.eventType === 'validation.completed') {
      const { domain, score } = event;

      // Alert on low scores
      if (score < 70) {
        await this.sendAlert(domain, `Low security score: ${score}/100`);
      }
    }

    this.channel.ack(message);
  }

  async sendAlert(domain, message) {
    console.log(`ALERT [${domain}]: ${message}`);
    // Implement your alert logic here
  }
}
```
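
Keeping the alert rules in a pure function makes them testable without a running broker. This is a sketch under assumptions: `alerts_for` and `LOW_SCORE_THRESHOLD` are hypothetical names, and the event fields follow the Kafka example above (`event_type`, `score`, `critical_issues`).

```python
from typing import Dict, List

LOW_SCORE_THRESHOLD = 70  # same cutoff the consumers above use


def alerts_for(event: Dict) -> List[str]:
    """Return the alert messages a validation event should trigger."""
    if event.get("event_type") != "validation.completed":
        return []  # only completed validations can raise alerts

    messages: List[str] = []
    score = event.get("score")
    if score is not None and score < LOW_SCORE_THRESHOLD:
        messages.append(f"Low security score: {score}/100")
    if event.get("critical_issues", 0) > 0:
        messages.append("Critical issues found")
    return messages


print(alerts_for({"event_type": "validation.completed", "score": 55}))
# ['Low security score: 55/100']
print(alerts_for({"event_type": "validation.started", "score": 10}))
# []
```

A consumer loop can then call `alerts_for(event)` and forward each message to its own `send_alert`.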

---

## Pattern 4: Multi-Tenant SaaS

**Use Case:** SaaS application managing multiple customer domains.

### Architecture

┌─────────────────────────────────────────────┐
│          Multi-Tenant Application           │
├─────────────────────────────────────────────┤
│  Tenant A    │   Tenant B   │   Tenant C    │
│  Domains     │   Domains    │   Domains     │
└─────────────────────────────────────────────┘
          │            │            │
          └────────────┴────────────┘
                       │
                ┌──────▼───────┐
                │    Tenant    │
                │  Isolation   │
                │    Layer     │
                └──────┬───────┘
                       │
                ┌──────▼───────┐
                │  Rate Limit  │
                │   Manager    │
                └──────┬───────┘
                       │
                ┌──────▼───────┐
                │  ReputeAPI   │
                └──────────────┘

### Implementation

```python
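from typing import Dict, List, Optional
from datetime import datetime, timedelta
import hashlib


class RateLimitExceeded(Exception):
    """Raised when a tenant exceeds its request quota"""


class MultiTenantValidationService:
    """Multi-tenant validation service with isolation"""

    def __init__(self, api_client, db_connection, redis_client):
        self.api = api_client
        self.db = db_connection
        # TenantRateLimiter (defined below) requires a Redis client
        self.rate_limiter = TenantRateLimiter(redis_client)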

    def validate_domain(
        self,
        tenant_id: str,
        domain: str,
        force_refresh: bool = False
    ) -> Dict:
        """
        Validate domain for a specific tenant

        Args:
            tenant_id: Tenant identifier
            domain: Domain to validate
            force_refresh: Bypass cache

        Returns:
            Validation result
        """
        # Verify tenant access to domain
        if not self._verify_tenant_domain(tenant_id, domain):
            raise PermissionError(f"Tenant {tenant_id} cannot access domain {domain}")

        # Check rate limits for this tenant
        if not self.rate_limiter.allow_request(tenant_id):
            raise RateLimitExceeded(f"Rate limit exceeded for tenant {tenant_id}")

        # Check cache with tenant isolation
        if not force_refresh:
            cached = self._get_from_cache(tenant_id, domain)
            if cached:
                return cached

        # Fetch from API
        result = self.api.check_domain(domain)

        # Save to cache with tenant isolation
        self._save_to_cache(tenant_id, domain, result)

        # Track usage for this tenant
        self._track_usage(tenant_id, domain)

        return result

    def get_tenant_domains(self, tenant_id: str) -> List[Dict]:
        """Get all domains for a tenant with latest scores"""
        query = """
            SELECT
                d.domain,
                d.added_at,
                v.result->>'score' as score,
                v.result->>'grade' as grade,
                v.cached_at as last_checked
            FROM tenant_domains d
            LEFT JOIN LATERAL (
                SELECT result, cached_at
                FROM domain_validations
                WHERE domain = d.domain
                ORDER BY cached_at DESC
                LIMIT 1
            ) v ON true
            WHERE d.tenant_id = %s
            ORDER BY d.domain
        """
        return self.db.query_all(query, (tenant_id,))

    def get_tenant_usage(
        self,
        tenant_id: str,
        start_date: datetime,
        end_date: datetime
    ) -> Dict:
        """Get usage statistics for a tenant"""
        query = """
            SELECT
                COUNT(*) as total_validations,
                COUNT(DISTINCT domain) as unique_domains,
                DATE(validated_at) as date
            FROM validation_usage
            WHERE tenant_id = %s
              AND validated_at BETWEEN %s AND %s
            GROUP BY DATE(validated_at)
            ORDER BY date
        """
        results = self.db.query_all(query, (tenant_id, start_date, end_date))

        return {
            'tenant_id': tenant_id,
            'period': {
                'start': start_date.isoformat(),
                'end': end_date.isoformat()
            },
            'usage': results
        }

    def _verify_tenant_domain(self, tenant_id: str, domain: str) -> bool:
        """Verify tenant has access to domain"""
        query = """
            SELECT COUNT(*) FROM tenant_domains
            WHERE tenant_id = %s AND domain = %s
        """
        count = self.db.query_one(query, (tenant_id, domain))[0]
        return count > 0

    def _get_from_cache(self, tenant_id: str, domain: str) -> Optional[Dict]:
        """Get cached result with tenant isolation"""
        cache_key = self._get_cache_key(tenant_id, domain)
        query = """
            SELECT result, cached_at
            FROM domain_validations
            WHERE cache_key = %s
            ORDER BY cached_at DESC
            LIMIT 1
        """
        row = self.db.query_one(query, (cache_key,))

        if row:
            result, cached_at = row
            if datetime.now() - cached_at < timedelta(minutes=15):
                return result

        return None

    def _save_to_cache(self, tenant_id: str, domain: str, result: Dict):
        """Save result to cache with tenant isolation"""
        cache_key = self._get_cache_key(tenant_id, domain)
        query = """
            INSERT INTO domain_validations (cache_key, domain, result, cached_at)
            VALUES (%s, %s, %s, %s)
        """
        self.db.execute(query, (cache_key, domain, result, datetime.now()))

    def _get_cache_key(self, tenant_id: str, domain: str) -> str:
        """Generate tenant-isolated cache key"""
        key_data = f"{tenant_id}:{domain}"
        return hashlib.sha256(key_data.encode()).hexdigest()

    def _track_usage(self, tenant_id: str, domain: str):
        """Track API usage for billing/analytics"""
        query = """
            INSERT INTO validation_usage (tenant_id, domain, validated_at)
            VALUES (%s, %s, %s)
        """
        self.db.execute(query, (tenant_id, domain, datetime.now()))


class TenantRateLimiter:
    """Per-tenant rate limiting"""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.default_limit = 100  # requests per hour

    def allow_request(self, tenant_id: str) -> bool:
        """Check if tenant can make another request"""
        key = f"rate_limit:{tenant_id}"
        current = self.redis.incr(key)

        if current == 1:
            self.redis.expire(key, 3600)  # 1 hour window

        limit = self._get_tenant_limit(tenant_id)
        return current <= limit

    def _get_tenant_limit(self, tenant_id: str) -> int:
        """Get rate limit for tenant (based on plan)"""
        # Query database for tenant plan
        # Return appropriate limit
        return self.default_limit
```

### Database Schema

```sql
-- Tenant domains
CREATE TABLE tenant_domains (
    id SERIAL PRIMARY KEY,
    tenant_id VARCHAR(255) NOT NULL,
    domain VARCHAR(255) NOT NULL,
    added_at TIMESTAMP NOT NULL,
    UNIQUE(tenant_id, domain)
);
CREATE INDEX idx_tenant ON tenant_domains (tenant_id);

-- Domain validations with tenant isolation
CREATE TABLE domain_validations (
    id SERIAL PRIMARY KEY,
    cache_key VARCHAR(64) NOT NULL,
    domain VARCHAR(255) NOT NULL,
    result JSONB NOT NULL,
    cached_at TIMESTAMP NOT NULL
);
CREATE INDEX idx_cache_key ON domain_validations (cache_key, cached_at);

-- Usage tracking
CREATE TABLE validation_usage (
    id SERIAL PRIMARY KEY,
    tenant_id VARCHAR(255) NOT NULL,
    domain VARCHAR(255) NOT NULL,
    validated_at TIMESTAMP NOT NULL
);
CREATE INDEX idx_tenant_date ON validation_usage (tenant_id, validated_at);
```
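
The per-tenant limiter above is a fixed-window counter: the first request in a window starts the clock, and the count resets when the window expires. A minimal in-memory sketch of the same logic follows, with a dict standing in for the Redis `INCR`/`EXPIRE` calls. `InMemoryTenantRateLimiter` and its `clock` parameter are hypothetical names used only for illustration.

```python
import time
from typing import Callable, Dict, Tuple


class InMemoryTenantRateLimiter:
    """Fixed-window counter per tenant, held in a dict instead of Redis."""

    def __init__(self, limit: int = 100, window_seconds: int = 3600,
                 clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        # tenant_id -> (window start time, requests counted in that window)
        self._windows: Dict[str, Tuple[float, int]] = {}

    def allow_request(self, tenant_id: str) -> bool:
        now = self.clock()
        started, count = self._windows.get(tenant_id, (now, 0))
        if now - started >= self.window:
            started, count = now, 0  # window expired, like the Redis key's TTL
        count += 1
        self._windows[tenant_id] = (started, count)
        return count <= self.limit


fake_now = [0.0]
limiter = InMemoryTenantRateLimiter(limit=2, window_seconds=60,
                                    clock=lambda: fake_now[0])
print([limiter.allow_request("tenant-a") for _ in range(3)])  # [True, True, False]
print(limiter.allow_request("tenant-b"))                      # True
fake_now[0] = 61.0
print(limiter.allow_request("tenant-a"))                      # True
```

This version only works inside a single process. The Redis-backed limiter is what lets several application instances share one count per tenant.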

## Pattern 5: Real-Time Monitoring Dashboard

**Use Case:** Real-time dashboard showing live domain security status.

### Architecture

┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│   Browser   │◀────▶│  WebSocket  │◀────▶│   Backend   │
│  Dashboard  │      │   Server    │      │   Service   │
└─────────────┘      └─────────────┘      └─────────────┘
                                                 │
                                                 ▼
                                          ┌─────────────┐
                                          │  Scheduler  │
                                          │   (Cron)    │
                                          └─────────────┘
                                                 │
                                                 ▼
                                          ┌─────────────┐
                                          │ ReputeAPI   │
                                          └─────────────┘

### Implementation

**Backend (Python + Flask-SocketIO):**
```python
from datetime import datetime

from flask import Flask
from flask_socketio import SocketIO, emit
from apscheduler.schedulers.background import BackgroundScheduler

# ReputeAPIClient and settings are application-specific and assumed to be
# defined elsewhere in your project.

app = Flask(__name__)
# "*" allows any origin; restrict this to your dashboard's origin in production
socketio = SocketIO(app, cors_allowed_origins="*")


class DomainMonitor:
    """Real-time domain monitoring"""

    def __init__(self, api_client):
        self.api = api_client
        self.monitored_domains = set()
        self.scheduler = BackgroundScheduler()
        self.scheduler.start()

    def add_domain(self, domain: str):
        """Add domain to monitoring"""
        if domain not in self.monitored_domains:
            self.monitored_domains.add(domain)
            # Schedule periodic checks
            self.scheduler.add_job(
                func=self.check_domain,
                args=[domain],
                trigger='interval',
                minutes=15,
                id=f"check_{domain}"
            )
            # Check immediately
            self.check_domain(domain)

    def remove_domain(self, domain: str):
        """Remove domain from monitoring"""
        if domain in self.monitored_domains:
            self.monitored_domains.remove(domain)
            self.scheduler.remove_job(f"check_{domain}")

    def check_domain(self, domain: str):
        """Check domain and emit result via WebSocket"""
        try:
            result = self.api.get_score(domain)

            # Emit update to all connected clients
            socketio.emit('domain_update', {
                'domain': domain,
                'score': result['score'],
                'grade': result['grade'],
                'top_issues': result.get('top_issues', []),
                'timestamp': datetime.now().isoformat()
            })

        except Exception as e:
            socketio.emit('domain_error', {
                'domain': domain,
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            })


# Initialize monitor
api_client = ReputeAPIClient(api_key=settings.REPUTE_API_KEY)
monitor = DomainMonitor(api_client)


@socketio.on('connect')
def handle_connect():
    """Handle client connection"""
    print('Client connected')


@socketio.on('subscribe')
def handle_subscribe(data):
    """Subscribe to domain updates"""
    domain = data['domain']
    monitor.add_domain(domain)
    emit('subscribed', {'domain': domain})


@socketio.on('unsubscribe')
def handle_unsubscribe(data):
    """Unsubscribe from domain updates"""
    domain = data['domain']
    monitor.remove_domain(domain)
    emit('unsubscribed', {'domain': domain})


if __name__ == '__main__':
    socketio.run(app, host='0.0.0.0', port=5000)
```

**Frontend (React + Socket.io):**
```javascript
import React, { useState, useEffect } from 'react';
import io from 'socket.io-client';

function DomainMonitorDashboard() {
  const [socket, setSocket] = useState(null);
  const [domains, setDomains] = useState({});
  const [newDomain, setNewDomain] = useState('');

  useEffect(() => {
    // Connect to WebSocket server
    const newSocket = io('http://localhost:5000');
    setSocket(newSocket);

    // Listen for domain updates
    newSocket.on('domain_update', (data) => {
      setDomains(prev => ({
        ...prev,
        [data.domain]: {
          ...data,
          status: 'online'
        }
      }));
    });

    // Listen for errors
    newSocket.on('domain_error', (data) => {
      setDomains(prev => ({
        ...prev,
        [data.domain]: {
          ...prev[data.domain],
          error: data.error,
          status: 'error'
        }
      }));
    });

    return () => newSocket.close();
  }, []);

  const subscribeToDomain = (domain) => {
    if (socket && domain) {
      socket.emit('subscribe', { domain });
      setNewDomain('');
    }
  };

  const unsubscribeFromDomain = (domain) => {
    if (socket) {
      socket.emit('unsubscribe', { domain });
      setDomains(prev => {
        const updated = { ...prev };
        delete updated[domain];
        return updated;
      });
    }
  };

  return (
    <div className="dashboard">
      <h1>Domain Security Monitor</h1>

      <div className="add-domain">
        <input
          value={newDomain}
          onChange={(e) => setNewDomain(e.target.value)}
          // onKeyDown replaces the deprecated onKeyPress
          onKeyDown={(e) => e.key === 'Enter' && subscribeToDomain(newDomain)}
          placeholder="Enter domain to monitor"
        />
        <button onClick={() => subscribeToDomain(newDomain)}>
          Add Domain
        </button>
      </div>

      <div className="domains-grid">
        {Object.entries(domains).map(([domain, data]) => (
          <DomainCard
            key={domain}
            domain={domain}
            data={data}
            onRemove={() => unsubscribeFromDomain(domain)}
          />
        ))}
      </div>
    </div>
  );
}

function DomainCard({ domain, data, onRemove }) {
  const getGradeColor = (grade) => {
    const colors = {
      'Excellent': 'green',
      'Good': 'blue',
      'Fair': 'orange',
      'Poor': 'red'
    };
    return colors[grade] || 'gray';
  };

  return (
    <div className="domain-card">
      <div className="card-header">
        <h3>{domain}</h3>
        <button onClick={onRemove}>×</button>
      </div>

      {data.status === 'error' ? (
        <div className="error">Error: {data.error}</div>
      ) : (
        <>
          <div className="score" style={{ color: getGradeColor(data.grade) }}>
            {data.score}/100
          </div>
          <div className="grade">{data.grade}</div>

          {data.top_issues && data.top_issues.length > 0 && (
            <div className="issues">
              <h4>Issues:</h4>
              <ul>
                {data.top_issues.map((issue, i) => (
                  <li key={i} className={`severity-${issue.severity}`}>
                    {issue.message}
                  </li>
                ))}
              </ul>
            </div>
          )}

          <div className="timestamp">
            Last checked: {new Date(data.timestamp).toLocaleString()}
          </div>
        </>
      )}
    </div>
  );
}

export default DomainMonitorDashboard;
```

---

## Pattern 6: Webhook Integration

**Use Case:** Notify external systems of validation results.

### Implementation

```python
import hashlib
import hmac
import json
from datetime import datetime  # used by the usage example below
from typing import Dict, Optional

import requests


class WebhookService:
    """Webhook notification service"""

    def __init__(self, db_connection):
        self.db = db_connection

    def send_webhook(
        self,
        webhook_url: str,
        event_type: str,
        payload: Dict,
        secret: Optional[str] = None
    ):
        """
        Send webhook notification

        Args:
            webhook_url: Target webhook URL
            event_type: Type of event
            payload: Event data
            secret: Optional secret for signature
        """
        headers = {
            'Content-Type': 'application/json',
            'X-Webhook-Event': event_type
        }

        # Serialize once so the signature covers the exact bytes that are sent
        body = json.dumps(payload, sort_keys=True).encode()

        # Add signature if secret provided
        if secret:
            headers['X-Webhook-Signature'] = self._generate_signature(body, secret)

        try:
            response = requests.post(
                webhook_url,
                data=body,
                headers=headers,
                timeout=10
            )
        except requests.RequestException as e:
            # Network-level failure: no HTTP status is available, so log 0
            self._log_webhook(
                webhook_url,
                event_type,
                0,
                success=False,
                error=str(e)
            )
            return

        # Log webhook delivery
        self._log_webhook(
            webhook_url,
            event_type,
            response.status_code,
            success=response.status_code < 400
        )

    def _generate_signature(self, body: bytes, secret: str) -> str:
        """Generate an HMAC-SHA256 signature over the serialized request body"""
        signature = hmac.new(
            secret.encode(),
            body,
            hashlib.sha256
        ).hexdigest()
        return f"sha256={signature}"

    def _log_webhook(
        self,
        url: str,
        event_type: str,
        status_code: int,
        success: bool,
        error: Optional[str] = None
    ):
        """Log webhook delivery"""
        query = """
            INSERT INTO webhook_logs (url, event_type, status_code, success, error, sent_at)
            VALUES (%s, %s, %s, %s, %s, NOW())
        """
        self.db.execute(query, (url, event_type, status_code, success, error))


# Usage in validation workflow
# ReputeAPIClient, API_KEY, and db are assumed to be defined elsewhere in your application
def validate_with_webhook(domain: str, user_id: int):
    """Validate domain and send webhook"""
    # Get validation result
    client = ReputeAPIClient(api_key=API_KEY)
    result = client.check_domain(domain)

    # Get user's webhook URL
    webhook_config = db.get_user_webhook_config(user_id)

    if webhook_config:
        webhook_service = WebhookService(db)
        webhook_service.send_webhook(
            webhook_url=webhook_config['url'],
            event_type='domain.validated',
            payload={
                'domain': domain,
                'score': result['score'],
                'grade': result['grade'],
                'issues': result.get('issues', []),
                'timestamp': datetime.now().isoformat()
            },
            secret=webhook_config.get('secret')
        )

    return result
```


Best Practices

1. Caching Strategy

  • Cache results for 15 minutes (API default)
  • Use domain + parameters as cache key
  • Implement cache invalidation on user request
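
The three points above could be sketched as a small in-memory cache. This is a minimal illustration, not part of ReputeAPI: `make_cache_key` and `TTLCache` are hypothetical names, and the 15-minute default simply mirrors the figure quoted above. A production setup would more likely use Redis or the database cache from Pattern 1.

```python
import hashlib
import json
import time
from typing import Any, Callable, Dict, Optional, Tuple


def make_cache_key(domain: str, params: Optional[Dict[str, Any]] = None) -> str:
    """Build a stable key from the normalized domain and the sorted parameters."""
    normalized = domain.strip().lower().rstrip(".")
    encoded = json.dumps(params or {}, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(encoded.encode()).hexdigest()[:16]
    return f"{normalized}:{digest}"


class TTLCache:
    """In-memory cache whose entries expire after ttl_seconds (hypothetical helper)."""

    def __init__(
        self,
        ttl_seconds: float = 15 * 60,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.ttl = ttl_seconds
        self.clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self.clock() - stored_at >= self.ttl:
            del self._entries[key]  # entry is stale, drop it
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._entries[key] = (self.clock(), value)

    def invalidate(self, key: str) -> None:
        """Explicit invalidation, e.g. when a user asks for a fresh check."""
        self._entries.pop(key, None)
```

Hashing the sorted parameters keeps the key short and makes it independent of the order in which the caller passed them.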

2. Rate Limiting

  • Respect API rate limits
  • Implement client-side rate limiting
  • Use exponential backoff for retries
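
One common way to implement client-side rate limiting is a token bucket. The sketch below is an illustration under assumptions: `TokenBucket` is a hypothetical helper, and the `rate` and `capacity` values are placeholders, since the actual ReputeAPI limits for your plan are not stated in this guide.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow `rate` calls per second on average, with bursts up to `capacity`."""

    def __init__(
        self,
        rate: float,
        capacity: int,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)
        self.updated_at = clock()

    def _refill(self) -> None:
        # Add tokens for the time elapsed since the last update, capped at capacity
        now = self.clock()
        elapsed = now - self.updated_at
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated_at = now

    def try_acquire(self) -> bool:
        """Take one token if available; False means the caller should wait."""
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

    def wait_time(self) -> float:
        """Seconds until the next token becomes available."""
        self._refill()
        return max(0.0, (1 - self.tokens) / self.rate)
```

A caller would check `try_acquire()` before each API request and sleep for `wait_time()` when it returns `False`.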

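3. Error Handling

  • Handle each error type explicitly (authentication, rate limit, server, network) rather than with one catch-all handler
  • Retry only transient failures such as timeouts and 5xx responses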
  • Log errors for debugging
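
Retry logic for transient failures might look like the following sketch, which uses exponential backoff with full jitter and logs each failure. `TransientError` and `call_with_retry` are hypothetical names; in practice you would map your client's timeout, rate-limit, and server errors onto whatever exception types it actually raises.

```python
import logging
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")
logger = logging.getLogger("integration.retry")


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, 429, 5xx)."""


def call_with_retry(
    func: Callable[[], T],
    max_attempts: int = 4,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    retry_on: Tuple[Type[BaseException], ...] = (TransientError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying only the exception types listed in retry_on."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on as exc:
            if attempt == max_attempts:
                logger.error("giving up after %d attempts: %s", attempt, exc)
                raise
            # The ceiling doubles each attempt; the actual delay is random below it
            ceiling = min(max_delay, base_delay * 2 ** (attempt - 1))
            delay = random.uniform(0, ceiling)
            logger.warning(
                "attempt %d failed (%s), retrying in %.2fs", attempt, exc, delay
            )
            sleep(delay)

    raise AssertionError("unreachable")
```

Exceptions outside `retry_on` propagate immediately, so permanent failures such as an invalid API key are not retried.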

4. Security

  • Store API keys securely (environment variables, secrets manager)
  • Use HTTPS for all API calls
  • Validate webhook signatures
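
On the receiving side, validating a signature could be sketched as below. It assumes the `sha256=<hex digest>` header format produced by `_generate_signature` in Pattern 6; `verify_webhook_signature` is a hypothetical helper name. The `signed_bytes` argument must be exactly the bytes the sender fed into the HMAC, which for Pattern 6 is the `json.dumps(payload, sort_keys=True)` serialization.

```python
import hashlib
import hmac


def verify_webhook_signature(
    signed_bytes: bytes, signature_header: str, secret: str
) -> bool:
    """Return True if the header matches the HMAC-SHA256 of signed_bytes."""
    prefix = "sha256="
    if not signature_header or not signature_header.startswith(prefix):
        return False

    expected = hmac.new(secret.encode(), signed_bytes, hashlib.sha256).hexdigest()
    received = signature_header[len(prefix):]

    # compare_digest takes time independent of where the inputs differ,
    # which avoids leaking how much of the signature was correct
    return hmac.compare_digest(expected.encode(), received.encode())
```

Reject the request (for example with HTTP 401) when this returns `False`, and do so before parsing or acting on the payload.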

5. Monitoring

  • Track API usage and costs
  • Monitor response times
  • Set up alerts for errors
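
A lightweight way to start is to wrap each API call in a recorder that counts calls and errors and stores latencies. The sketch below is illustrative only: `ApiMetrics` is a hypothetical helper and the alert thresholds are arbitrary placeholders. Most deployments would export these numbers to an existing metrics system instead of keeping them in memory.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, List, TypeVar

T = TypeVar("T")


@dataclass
class ApiMetrics:
    """In-memory counters for API usage, errors, and response times."""

    calls: int = 0
    errors: int = 0
    latencies: List[float] = field(default_factory=list)

    def record(
        self,
        func: Callable[[], T],
        clock: Callable[[], float] = time.perf_counter,
    ) -> T:
        """Run func, recording its duration and whether it raised."""
        start = clock()
        try:
            return func()
        except Exception:
            self.errors += 1
            raise
        finally:
            self.calls += 1
            self.latencies.append(clock() - start)

    @property
    def error_rate(self) -> float:
        return self.errors / self.calls if self.calls else 0.0

    def should_alert(self, max_error_rate: float = 0.2, min_calls: int = 10) -> bool:
        """Alert only once enough calls were seen to make the rate meaningful."""
        return self.calls >= min_calls and self.error_rate > max_error_rate
```

Usage would look like `metrics.record(lambda: client.check_domain(domain))`, with `should_alert()` polled by whatever alerting job you already run.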

Next Steps


Support

Need help with integration patterns?