Bulk Validation Examples

Learn how to efficiently validate multiple domains using ReputeAPI's bulk validation capabilities and CSV processing.


Overview

Bulk validation is ideal for:

  • Processing customer domain lists
  • Periodic security audits
  • Compliance reporting
  • Data migration validation
  • Deliverability assessments

Key Benefits:

  • Process up to 100 domains per request
  • Parallel processing for speed
  • Batch management for large datasets
  • CSV import/export support


Quick Start: Bulk Endpoint

Python Example

import requests

API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"

def bulk_validate(domains):
    """
    Validate multiple domains in one request

    Args:
        domains: List of domain strings

    Returns:
        dict: Results for all domains
    """
    response = requests.post(
        f"{BASE_URL}/v1/bulk-validate",
        headers={
            "X-API-Key": API_KEY,
            "Content-Type": "application/json"
        },
        json={
            "domains": [{"domain": d} for d in domains],
            "options": {
                "include_score": True,
                "parallel": True
            }
        }
    )

    response.raise_for_status()
    return response.json()

# Usage
domains = ["google.com", "github.com", "stackoverflow.com"]
results = bulk_validate(domains)

print(f"Total: {results['total_domains']}")
print(f"Successful: {results['successful']}")
print(f"Failed: {results['failed']}")

for result in results['results']:
    print(f"{result['domain']}: {result['score']}/100")

Expected Output:

Total: 3
Successful: 3
Failed: 0
google.com: 95/100
github.com: 88/100
stackoverflow.com: 92/100
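
Not every domain in a batch validates successfully. A minimal sketch for separating failures, assuming the response reports failed domains in a top-level errors list alongside results (the advanced example further below makes the same assumption):

results = bulk_validate(domains)

# Successful validations carry a score
for result in results['results']:
    print(f"{result['domain']}: {result['score']}/100")

# Failures are assumed to appear in a separate errors list
for error in results.get('errors', []):
    print(f"{error['domain']}: failed ({error.get('error', 'unknown')})")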


CSV File Processing

Read CSV and Validate

Process domains from a CSV file:

import csv
import requests
from typing import List, Dict
import time

API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"

def read_domains_from_csv(filename: str) -> List[str]:
    """
    Read domains from CSV file

    Expected CSV format:
    domain
    example.com
    test.com
    """
    domains = []

    with open(filename, 'r') as csvfile:
        reader = csv.DictReader(csvfile)

        for row in reader:
            domain = row.get('domain', '').strip()
            if domain:
                domains.append(domain)

    return domains

def validate_domains(domains: List[str]) -> Dict:
    """Validate domains via API"""
    response = requests.post(
        f"{BASE_URL}/v1/bulk-validate",
        headers={
            "X-API-Key": API_KEY,
            "Content-Type": "application/json"
        },
        json={
            "domains": [{"domain": d} for d in domains],
            "options": {"include_score": True}
        }
    )

    response.raise_for_status()
    return response.json()

def save_results_to_csv(results: Dict, output_filename: str):
    """Save validation results to CSV"""

    with open(output_filename, 'w', newline='') as csvfile:
        fieldnames = [
            'domain', 'score', 'grade', 'spf_present',
            'dkim_present', 'dmarc_present', 'issues_count'
        ]

        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()

        for result in results['results']:
            writer.writerow({
                'domain': result['domain'],
                'score': result['score'],
                'grade': result.get('grade', 'N/A'),
                'spf_present': 'Yes' if result['spf']['present'] else 'No',
                'dkim_present': 'Yes' if result['dkim']['validated_keys'] else 'No',
                'dmarc_present': 'Yes' if result['dmarc']['present'] else 'No',
                'issues_count': len(result.get('issues', []))
            })

def process_csv(input_file: str, output_file: str):
    """
    Complete CSV processing workflow

    Args:
        input_file: CSV file with domains
        output_file: Output CSV file for results
    """
    print(f"Reading domains from {input_file}...")
    domains = read_domains_from_csv(input_file)
    print(f"Found {len(domains)} domains")

    print("Validating domains...")
    results = validate_domains(domains)

    print(f"Validation complete!")
    print(f"  Successful: {results['successful']}")
    print(f"  Failed: {results['failed']}")

    print(f"Saving results to {output_file}...")
    save_results_to_csv(results, output_file)
    print("Done!")

# Usage
if __name__ == "__main__":
    process_csv("domains.csv", "results.csv")

Input CSV (domains.csv):

domain
google.com
github.com
stackoverflow.com
reddit.com
netflix.com

Output CSV (results.csv):

domain,score,grade,spf_present,dkim_present,dmarc_present,issues_count
google.com,95,Excellent,Yes,Yes,Yes,1
github.com,88,Good,Yes,Yes,Yes,2
stackoverflow.com,92,Excellent,Yes,Yes,Yes,1
reddit.com,85,Good,Yes,Yes,Yes,3
netflix.com,90,Excellent,Yes,Yes,Yes,2
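
If you already use pandas, the same read-validate-write flow is only a few lines. A sketch, assuming the domains.csv / results.csv layout shown above and the validate_domains() helper defined earlier:

import pandas as pd

# Read the input CSV and validate all domains in one request
df = pd.read_csv("domains.csv")
results = validate_domains(df["domain"].dropna().tolist())

# Flatten the interesting fields and write them back out
rows = [
    {
        "domain": r["domain"],
        "score": r["score"],
        "grade": r.get("grade", "N/A"),
    }
    for r in results["results"]
]
pd.DataFrame(rows).to_csv("results.csv", index=False)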


Batch Processing for Large Lists

Chunked Processing

Process large lists in batches:

import requests
import time
from typing import List, Dict, Iterator

API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"

def chunk_list(items: List, chunk_size: int) -> Iterator[List]:
    """Split list into chunks"""
    for i in range(0, len(items), chunk_size):
        yield items[i:i + chunk_size]

def validate_batch(domains: List[str]) -> Dict:
    """Validate a single batch of domains"""
    try:
        response = requests.post(
            f"{BASE_URL}/v1/bulk-validate",
            headers={
                "X-API-Key": API_KEY,
                "Content-Type": "application/json"
            },
            json={
                "domains": [{"domain": d} for d in domains],
                "options": {"include_score": True}
            },
            timeout=30
        )

        response.raise_for_status()
        return response.json()

    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 429:
            # Rate limited - will be handled by caller
            raise
        else:
            print(f"HTTP Error: {e}")
            return None

    except Exception as e:
        print(f"Error validating batch: {e}")
        return None

def process_large_list(
    domains: List[str],
    batch_size: int = 50,
    delay_between_batches: float = 1.0
) -> List[Dict]:
    """
    Process large domain list in batches

    Args:
        domains: List of all domains to process
        batch_size: Domains per batch (max 100)
        delay_between_batches: Seconds to wait between batches

    Returns:
        List of all results
    """
    all_results = []
    batches = list(chunk_list(domains, batch_size))
    total_batches = len(batches)

    print(f"Processing {len(domains)} domains in {total_batches} batches...")

    for batch_num, batch in enumerate(batches, 1):
        print(f"\nBatch {batch_num}/{total_batches} ({len(batch)} domains)...")

        try:
            batch_results = validate_batch(batch)

            if batch_results:
                all_results.extend(batch_results['results'])
                print(f"  βœ“ Successful: {batch_results['successful']}")
                print(f"  βœ— Failed: {batch_results['failed']}")
            else:
                print("  ⚠ Batch failed")

            # Rate limiting: wait between batches
            if batch_num < total_batches:
                print(f"  Waiting {delay_between_batches}s before next batch...")
                time.sleep(delay_between_batches)

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                retry_after = int(e.response.headers.get('Retry-After', 60))
                print(f"  ⏸ Rate limited. Waiting {retry_after}s...")
                time.sleep(retry_after)

                # Retry this batch
                batch_results = validate_batch(batch)
                if batch_results:
                    all_results.extend(batch_results['results'])

    print(f"\nβœ… Complete! Processed {len(all_results)} domains")
    return all_results

# Usage
if __name__ == "__main__":
    # Load 500 domains from file
    with open('large_domain_list.txt') as f:
        domains = [line.strip() for line in f if line.strip()]

    # Process in batches of 50 with 2 second delay
    results = process_large_list(domains, batch_size=50, delay_between_batches=2.0)

    # Save results
    import json
    with open('results.json', 'w') as f:
        json.dump(results, f, indent=2)

    print(f"Results saved to results.json")

Advanced CSV Processing

With Progress Tracking

Add progress bar and detailed reporting:

import csv
import requests
from typing import List, Dict
from tqdm import tqdm  # pip install tqdm
import json
from datetime import datetime

API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"

class DomainBatchProcessor:
    """Advanced domain batch processor with progress tracking"""

    def __init__(self, api_key: str, batch_size: int = 50):
        self.api_key = api_key
        self.base_url = BASE_URL
        self.batch_size = batch_size
        self.session = requests.Session()
        self.session.headers.update({
            "X-API-Key": api_key,
            "Content-Type": "application/json"
        })

    def read_csv(self, filename: str, domain_column: str = 'domain') -> List[Dict]:
        """Read domains from CSV with all columns preserved"""
        domains = []

        with open(filename, 'r', encoding='utf-8') as csvfile:
            reader = csv.DictReader(csvfile)

            for row in reader:
                domain = row.get(domain_column, '').strip()
                if domain:
                    domains.append({
                        'domain': domain,
                        'metadata': row  # Preserve all CSV columns
                    })

        return domains

    def validate_batch(self, batch: List[Dict]) -> Dict:
        """Validate a batch of domains"""
        domain_list = [item['domain'] for item in batch]

        response = self.session.post(
            f"{self.base_url}/v1/bulk-validate",
            json={
                "domains": [{"domain": d} for d in domain_list],
                "options": {"include_score": True}
            },
            timeout=30
        )

        response.raise_for_status()
        return response.json()

    def process_file(
        self,
        input_file: str,
        output_file: str,
        domain_column: str = 'domain'
    ) -> Dict:
        """
        Process CSV file with progress tracking

        Returns:
            dict: Processing statistics
        """
        # Read input
        print(f"πŸ“– Reading {input_file}...")
        domains = self.read_csv(input_file, domain_column)
        total = len(domains)
        print(f"βœ“ Found {total} domains\n")

        # Process in batches with progress bar
        results = []
        errors = []

        with tqdm(total=total, desc="Processing", unit="domain") as pbar:
            for i in range(0, total, self.batch_size):
                batch = domains[i:i + self.batch_size]

                try:
                    batch_results = self.validate_batch(batch)

                    # Merge results with metadata
                    for j, result in enumerate(batch_results['results']):
                        results.append({
                            **result,
                            'metadata': batch[j]['metadata']
                        })

                    # Track errors
                    if batch_results.get('errors'):
                        errors.extend(batch_results['errors'])

                except Exception as e:
                    print(f"\n⚠ Error processing batch: {e}")
                    for item in batch:
                        errors.append({
                            'domain': item['domain'],
                            'error': str(e)
                        })

                pbar.update(len(batch))

        # Save results
        print(f"\nπŸ’Ύ Saving results to {output_file}...")
        self.save_results_csv(results, errors, output_file)

        # Statistics
        stats = {
            'total_domains': total,
            'successful': len(results),
            'failed': len(errors),
            'timestamp': datetime.now().isoformat()
        }

        print(f"\nβœ… Processing complete!")
        print(f"   Total: {stats['total_domains']}")
        print(f"   Successful: {stats['successful']}")
        print(f"   Failed: {stats['failed']}")

        return stats

    def save_results_csv(self, results: List[Dict], errors: List[Dict], filename: str):
        """Save results to CSV with comprehensive data"""

        with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
            # Determine all fieldnames
            base_fields = [
                'domain', 'score', 'grade', 'spf_present', 'spf_valid',
                'dkim_keys_found', 'dmarc_present', 'dmarc_policy',
                'critical_issues', 'high_issues', 'medium_issues', 'low_issues',
                'status'
            ]

            # Add metadata fields if present
            if results and 'metadata' in results[0]:
                metadata_fields = list(results[0]['metadata'].keys())
                fieldnames = base_fields + metadata_fields
            else:
                fieldnames = base_fields

            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()

            # Write successful results
            for result in results:
                issues_by_severity = {
                    'critical': 0, 'high': 0, 'medium': 0, 'low': 0
                }

                for issue in result.get('issues', []):
                    severity = issue.get('severity', 'low')
                    issues_by_severity[severity] = issues_by_severity.get(severity, 0) + 1

                row = {
                    'domain': result['domain'],
                    'score': result.get('score', 0),
                    'grade': result.get('grade', 'N/A'),
                    'spf_present': 'Yes' if result.get('spf', {}).get('present') else 'No',
                    'spf_valid': 'Yes' if result.get('spf', {}).get('valid') else 'No',
                    'dkim_keys_found': len(result.get('dkim', {}).get('validated_keys', [])),
                    'dmarc_present': 'Yes' if result.get('dmarc', {}).get('present') else 'No',
                    'dmarc_policy': result.get('dmarc', {}).get('policy', 'N/A'),
                    'critical_issues': issues_by_severity['critical'],
                    'high_issues': issues_by_severity['high'],
                    'medium_issues': issues_by_severity['medium'],
                    'low_issues': issues_by_severity['low'],
                    'status': 'success'
                }

                # Add metadata columns
                if 'metadata' in result:
                    row.update(result['metadata'])

                writer.writerow(row)

            # Write errors
            for error in errors:
                row = {
                    'domain': error['domain'],
                    'status': 'error',
                    'score': 0,
                    'grade': 'Error'
                }
                writer.writerow(row)

# Usage
if __name__ == "__main__":
    processor = DomainBatchProcessor(
        api_key=API_KEY,
        batch_size=50
    )

    stats = processor.process_file(
        input_file="customer_domains.csv",
        output_file="validation_results.csv",
        domain_column="domain"
    )

    # Save statistics
    with open("processing_stats.json", "w") as f:
        json.dump(stats, f, indent=2)

Expected Output:

📖 Reading customer_domains.csv...
✓ Found 250 domains

Processing: 100%|████████████████████| 250/250 [00:45<00:00,  5.51domain/s]

💾 Saving results to validation_results.csv...

✅ Processing complete!
   Total: 250
   Successful: 248
   Failed: 2


Database Export Processing

From Database to CSV

Process domains from database:

import psycopg2  # or your database library
import requests
import csv
from typing import List, Dict

API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"

def fetch_domains_from_db(connection_string: str) -> List[Dict]:
    """Fetch domains from PostgreSQL database"""

    conn = psycopg2.connect(connection_string)
    cursor = conn.cursor()

    # Fetch domains with associated data
    query = """
        SELECT
            id,
            domain,
            customer_name,
            created_at
        FROM customers
        WHERE active = true
    """

    cursor.execute(query)
    rows = cursor.fetchall()

    domains = []
    for row in rows:
        domains.append({
            'id': row[0],
            'domain': row[1],
            'customer_name': row[2],
            'created_at': row[3]
        })

    cursor.close()
    conn.close()

    return domains

def validate_and_update_db(
    domains: List[Dict],
    connection_string: str
):
    """
    Validate domains and update database with results

    Args:
        domains: List of domain records from database
        connection_string: Database connection string
    """
    # Validate domains
    domain_list = [d['domain'] for d in domains]

    response = requests.post(
        f"{BASE_URL}/v1/bulk-validate",
        headers={
            "X-API-Key": API_KEY,
            "Content-Type": "application/json"
        },
        json={
            "domains": [{"domain": d} for d in domain_list],
            "options": {"include_score": True}
        }
    )

    response.raise_for_status()
    results = response.json()

    # Update database with results
    conn = psycopg2.connect(connection_string)
    cursor = conn.cursor()

    for i, result in enumerate(results['results']):
        domain_id = domains[i]['id']

        update_query = """
            UPDATE customers
            SET
                email_security_score = %s,
                spf_valid = %s,
                dkim_valid = %s,
                dmarc_valid = %s,
                last_checked = NOW()
            WHERE id = %s
        """

        cursor.execute(update_query, (
            result['score'],
            result['spf']['present'],
            len(result['dkim']['validated_keys']) > 0,
            result['dmarc']['present'],
            domain_id
        ))

    conn.commit()
    cursor.close()
    conn.close()

    print(f"βœ… Updated {len(results['results'])} records in database")

# Usage
DB_CONNECTION = "postgresql://user:pass@localhost:5432/mydb"

domains = fetch_domains_from_db(DB_CONNECTION)
print(f"Fetched {len(domains)} domains from database")

validate_and_update_db(domains, DB_CONNECTION)
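
With many rows, issuing one UPDATE per domain is slow. A sketch of the same update batched through psycopg2's executemany, reusing the names from the function above:

# Build all parameter tuples first, then send them in a single call
params = [
    (
        result['score'],
        result['spf']['present'],
        len(result['dkim']['validated_keys']) > 0,
        result['dmarc']['present'],
        domains[i]['id']
    )
    for i, result in enumerate(results['results'])
]

cursor.executemany(update_query, params)
conn.commit()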

Async Parallel Processing

High-Performance Processing

Use async for maximum throughput:

import asyncio
import httpx
from typing import List, Dict
from tqdm.asyncio import tqdm

API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"

async def validate_domain(client: httpx.AsyncClient, domain: str) -> Dict:
    """Validate single domain asynchronously"""
    try:
        response = await client.get(
            f"{BASE_URL}/api/v1/check",
            params={"domain": domain},
            headers={"X-API-Key": API_KEY},
            timeout=15.0
        )
        response.raise_for_status()
        return response.json()

    except Exception as e:
        return {
            'domain': domain,
            'error': str(e),
            'score': 0
        }

async def validate_domains_async(
    domains: List[str],
    max_concurrent: int = 10
) -> List[Dict]:
    """
    Validate multiple domains concurrently

    Args:
        domains: List of domains to validate
        max_concurrent: Maximum concurrent requests

    Returns:
        List of validation results
    """
    # Create semaphore to limit concurrency
    semaphore = asyncio.Semaphore(max_concurrent)

    async def validate_with_semaphore(client, domain):
        async with semaphore:
            return await validate_domain(client, domain)

    async with httpx.AsyncClient() as client:
        tasks = [
            validate_with_semaphore(client, domain)
            for domain in domains
        ]

        # Use tqdm for progress tracking
        results = await tqdm.gather(*tasks, desc="Validating")

    return results

def process_domains_async(domains: List[str]) -> List[Dict]:
    """Synchronous wrapper for async processing"""
    return asyncio.run(validate_domains_async(domains, max_concurrent=10))

# Usage
if __name__ == "__main__":
    # Load domains
    with open('domains.txt') as f:
        domains = [line.strip() for line in f if line.strip()]

    print(f"Processing {len(domains)} domains asynchronously...")

    # Process
    results = process_domains_async(domains)

    # Calculate statistics
    successful = sum(1 for r in results if 'error' not in r)
    failed = len(results) - successful

    print(f"\nβœ… Complete!")
    print(f"   Successful: {successful}")
    print(f"   Failed: {failed}")

    # Save to CSV
    import csv
    with open('results.csv', 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=['domain', 'score', 'grade', 'status'])
        writer.writeheader()

        for result in results:
            writer.writerow({
                'domain': result['domain'],
                'score': result.get('score', 0),
                'grade': result.get('grade', 'N/A'),
                'status': 'error' if 'error' in result else 'success'
            })
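
Transient failures are common at high concurrency. A small retry wrapper with exponential backoff can sit between the semaphore and validate_domain(); a sketch (the retry count and base delay are arbitrary):

async def validate_with_retry(
    client: httpx.AsyncClient,
    domain: str,
    retries: int = 3
) -> Dict:
    """Retry transient failures with exponential backoff"""
    result = {'domain': domain, 'error': 'no attempts made', 'score': 0}
    for attempt in range(retries):
        result = await validate_domain(client, domain)
        if 'error' not in result:
            return result
        # Back off 1s, 2s, 4s... before the next attempt
        await asyncio.sleep(2 ** attempt)
    return result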

Domain Audit Report

Generate Comprehensive Audit

Create detailed audit report:

import csv
import requests
from datetime import datetime
from typing import List, Dict

API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"

def generate_audit_report(domains: List[str], output_file: str = "audit_report.csv"):
    """
    Generate comprehensive domain security audit report

    Args:
        domains: List of domains to audit
        output_file: Output CSV filename
    """
    print(f"πŸ” Starting audit of {len(domains)} domains...")

    # Validate all domains
    response = requests.post(
        f"{BASE_URL}/v1/bulk-validate",
        headers={
            "X-API-Key": API_KEY,
            "Content-Type": "application/json"
        },
        json={
            "domains": [{"domain": d} for d in domains],
            "options": {"include_score": True}
        }
    )

    response.raise_for_status()
    results = response.json()

    # Analyze results
    audit_data = []

    for result in results['results']:
        # Count issues by severity
        issues = result.get('issues', [])
        critical = sum(1 for i in issues if i['severity'] == 'critical')
        high = sum(1 for i in issues if i['severity'] == 'high')
        medium = sum(1 for i in issues if i['severity'] == 'medium')
        low = sum(1 for i in issues if i['severity'] == 'low')

        # Get top issue
        top_issue = issues[0]['message'] if issues else 'None'

        # Compile audit entry
        audit_data.append({
            'domain': result['domain'],
            'score': result['score'],
            'grade': result.get('grade', 'N/A'),
            'spf_status': '✓' if result['spf']['present'] else '✗',
            'dkim_status': '✓' if result['dkim']['validated_keys'] else '✗',
            'dmarc_status': '✓' if result['dmarc']['present'] else '✗',
            'dmarc_policy': result['dmarc'].get('policy', 'none'),
            'critical_issues': critical,
            'high_issues': high,
            'medium_issues': medium,
            'low_issues': low,
            'total_issues': len(issues),
            'top_issue': top_issue,
            'risk_level': get_risk_level(result['score']),
            'audit_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })

    # Sort by score (lowest first - highest risk)
    audit_data.sort(key=lambda x: x['score'])

    # Save to CSV
    with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
        fieldnames = audit_data[0].keys() if audit_data else []
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(audit_data)

    # Generate summary
    print_audit_summary(audit_data)

    print(f"\nπŸ’Ύ Full report saved to {output_file}")

def get_risk_level(score: int) -> str:
    """Determine risk level from score"""
    if score >= 90:
        return 'Low'
    elif score >= 75:
        return 'Medium'
    elif score >= 50:
        return 'High'
    else:
        return 'Critical'

def print_audit_summary(audit_data: List[Dict]):
    """Print audit summary statistics"""
    total = len(audit_data)

    # Risk distribution
    critical_risk = sum(1 for d in audit_data if d['risk_level'] == 'Critical')
    high_risk = sum(1 for d in audit_data if d['risk_level'] == 'High')
    medium_risk = sum(1 for d in audit_data if d['risk_level'] == 'Medium')
    low_risk = sum(1 for d in audit_data if d['risk_level'] == 'Low')

    # Average score
    avg_score = sum(d['score'] for d in audit_data) / total if total > 0 else 0

    # Configuration status
    spf_count = sum(1 for d in audit_data if d['spf_status'] == '✓')
    dkim_count = sum(1 for d in audit_data if d['dkim_status'] == '✓')
    dmarc_count = sum(1 for d in audit_data if d['dmarc_status'] == '✓')

    print("\n" + "="*60)
    print("AUDIT SUMMARY")
    print("="*60)
    print(f"\nTotal Domains Audited: {total}")
    print(f"Average Security Score: {avg_score:.1f}/100")
    print(f"\nRisk Distribution:")
    print(f"  πŸ”΄ Critical Risk: {critical_risk} ({critical_risk/total*100:.1f}%)")
    print(f"  🟠 High Risk:     {high_risk} ({high_risk/total*100:.1f}%)")
    print(f"  🟑 Medium Risk:   {medium_risk} ({medium_risk/total*100:.1f}%)")
    print(f"  🟒 Low Risk:      {low_risk} ({low_risk/total*100:.1f}%)")
    print(f"\nConfiguration Status:")
    print(f"  SPF:   {spf_count}/{total} ({spf_count/total*100:.1f}%)")
    print(f"  DKIM:  {dkim_count}/{total} ({dkim_count/total*100:.1f}%)")
    print(f"  DMARC: {dmarc_count}/{total} ({dmarc_count/total*100:.1f}%)")

    # Top 5 worst domains
    print(f"\nTop 5 Domains Needing Attention:")
    for i, domain in enumerate(audit_data[:5], 1):
        print(f"  {i}. {domain['domain']}: {domain['score']}/100 ({domain['risk_level']} Risk)")
        print(f"     Issue: {domain['top_issue']}")

# Usage
if __name__ == "__main__":
    # Read domains from file
    with open('domains.txt') as f:
        domains = [line.strip() for line in f if line.strip()]

    generate_audit_report(domains, "security_audit_report.csv")

Best Practices

1. Respect Rate Limits

import time
from typing import List

def process_with_rate_limit(domains: List[str], requests_per_minute: int = 60):
    """Process domains respecting rate limits"""

    delay = 60.0 / requests_per_minute  # Delay between requests

    results = []
    for domain in domains:
        result = validate_domain(domain)  # assumes a single-domain validate_domain() helper
        results.append(result)

        time.sleep(delay)  # Rate limiting

    return results

2. Handle Partial Failures

import time
from typing import List

def process_with_error_recovery(domains: List[str]):
    """Process domains with retry logic for failures"""

    results = []
    failed = []

    for domain in domains:
        try:
            result = validate_domain(domain)  # assumes the same single-domain helper
            results.append(result)
        except Exception as e:
            print(f"Failed: {domain} - {e}")
            failed.append(domain)

    # Retry failed domains
    if failed:
        print(f"\nRetrying {len(failed)} failed domains...")
        for domain in failed:
            try:
                time.sleep(2)  # Wait before retry
                result = validate_domain(domain)
                results.append(result)
            except Exception as e:
                print(f"Retry failed: {domain}")

    return results

3. Use Batch Processing

For large lists, always use batches sized to your plan (see the sketch below):

  • Free tier: 10 domains per batch
  • Basic tier: 50 domains per batch
  • Premium tier: 100 domains per batch
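
A small lookup keeps the batch size in step with your plan, reusing process_large_list() from earlier; the plan value is hypothetical and would come from your account settings:

# Batch-size limits per plan (from the list above)
TIER_BATCH_LIMITS = {
    'free': 10,
    'basic': 50,
    'premium': 100,
}

plan = 'basic'  # hypothetical: read this from your account settings
results = process_large_list(domains, batch_size=TIER_BATCH_LIMITS[plan])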

4. Cache Results

Avoid re-validating same domains:

import json
from pathlib import Path
from typing import Dict

def get_cached_result(domain: str, cache_file: str = 'cache.json'):
    """Get cached result if available"""
    if Path(cache_file).exists():
        with open(cache_file) as f:
            cache = json.load(f)
            return cache.get(domain)
    return None

def save_to_cache(domain: str, result: Dict, cache_file: str = 'cache.json'):
    """Save result to cache"""
    cache = {}
    if Path(cache_file).exists():
        with open(cache_file) as f:
            cache = json.load(f)

    cache[domain] = result

    with open(cache_file, 'w') as f:
        json.dump(cache, f)
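
Putting the two helpers together: check the cache first and only call the API on a miss. A sketch, again assuming a single-domain validate_domain() helper:

def validate_with_cache(domain: str) -> Dict:
    """Return a cached result if present, otherwise validate and cache"""
    cached = get_cached_result(domain)
    if cached is not None:
        return cached

    result = validate_domain(domain)  # hypothetical single-domain helper
    save_to_cache(domain, result)
    return result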

Troubleshooting

Issue: 413 Payload Too Large

Cause: Too many domains in single request

Solution: Reduce batch size

batch_size = 50  # Instead of 100

Issue: Timeout Errors

Cause: Network issues or large batches

Solution: Increase timeout and reduce batch size

response = requests.post(url, json=data, timeout=60)  # 60 second timeout
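
For flaky networks you can also let requests retry automatically. A sketch using urllib3's Retry through an HTTPAdapter (the backoff factor and status list are illustrative):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retry = Retry(
    total=3,                                     # up to 3 retries
    backoff_factor=1,                            # 1s, 2s, 4s between attempts
    status_forcelist=[429, 500, 502, 503, 504],  # retry on these statuses
    allowed_methods=["POST"],                    # include POST requests
)
session.mount("https://", HTTPAdapter(max_retries=retry))

response = session.post(url, json=data, timeout=60)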

Issue: Rate Limit Errors

Cause: Sending requests too quickly

Solution: Add delays between batches

time.sleep(2)  # 2 second delay between batches
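
When a 429 does come back, honor the Retry-After header if the API sends one, the same pattern used in the batch example earlier:

import time

retry_after = int(response.headers.get('Retry-After', 60))
time.sleep(retry_after)  # then retry the request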

