Bulk Validation Examples
Learn how to efficiently validate multiple domains using ReputeAPI's bulk validation capabilities and CSV processing.
Overview
Bulk validation is ideal for:
- Processing customer domain lists
- Periodic security audits
- Compliance reporting
- Data migration validation
- Deliverability assessments
Key Benefits:
- Process up to 100 domains per request
- Parallel processing for speed
- Batch management for large datasets
- CSV import/export support
Quick Start: Bulk Endpoint
Python Example
import requests
API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"
def bulk_validate(domains):
"""
Validate multiple domains in one request
Args:
domains: List of domain strings
Returns:
dict: Results for all domains
"""
response = requests.post(
f"{BASE_URL}/v1/bulk-validate",
headers={
"X-API-Key": API_KEY,
"Content-Type": "application/json"
},
json={
"domains": [{"domain": d} for d in domains],
"options": {
"include_score": True,
"parallel": True
}
}
)
response.raise_for_status()
return response.json()
# Usage
domains = ["google.com", "github.com", "stackoverflow.com"]
results = bulk_validate(domains)
print(f"Total: {results['total_domains']}")
print(f"Successful: {results['successful']}")
print(f"Failed: {results['failed']}")
for result in results['results']:
print(f"{result['domain']}: {result['score']}/100")
Expected Output:
Total: 3
Successful: 3
Failed: 0
google.com: 95/100
github.com: 88/100
stackoverflow.com: 92/100
CSV File Processing
Read CSV and Validate
Process domains from a CSV file:
import csv
import requests
from typing import List, Dict
import time
API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"
def read_domains_from_csv(filename: str) -> List[str]:
"""
Read domains from CSV file
Expected CSV format:
domain
example.com
test.com
"""
domains = []
with open(filename, 'r') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
domain = row.get('domain', '').strip()
if domain:
domains.append(domain)
return domains
def validate_domains(domains: List[str]) -> Dict:
"""Validate domains via API"""
response = requests.post(
f"{BASE_URL}/v1/bulk-validate",
headers={
"X-API-Key": API_KEY,
"Content-Type": "application/json"
},
json={
"domains": [{"domain": d} for d in domains],
"options": {"include_score": True}
}
)
response.raise_for_status()
return response.json()
def save_results_to_csv(results: Dict, output_filename: str):
"""Save validation results to CSV"""
with open(output_filename, 'w', newline='') as csvfile:
fieldnames = [
'domain', 'score', 'grade', 'spf_present',
'dkim_present', 'dmarc_present', 'issues_count'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for result in results['results']:
writer.writerow({
'domain': result['domain'],
'score': result['score'],
'grade': result.get('grade', 'N/A'),
'spf_present': 'Yes' if result['spf']['present'] else 'No',
'dkim_present': 'Yes' if result['dkim']['validated_keys'] else 'No',
'dmarc_present': 'Yes' if result['dmarc']['present'] else 'No',
'issues_count': len(result.get('issues', []))
})
def process_csv(input_file: str, output_file: str):
"""
Complete CSV processing workflow
Args:
input_file: CSV file with domains
output_file: Output CSV file for results
"""
print(f"Reading domains from {input_file}...")
domains = read_domains_from_csv(input_file)
print(f"Found {len(domains)} domains")
print("Validating domains...")
results = validate_domains(domains)
print(f"Validation complete!")
print(f" Successful: {results['successful']}")
print(f" Failed: {results['failed']}")
print(f"Saving results to {output_file}...")
save_results_to_csv(results, output_file)
print("Done!")
# Usage
if __name__ == "__main__":
process_csv("domains.csv", "results.csv")
Input CSV (domains.csv):
domain
google.com
github.com
stackoverflow.com
reddit.com
netflix.com
Output CSV (results.csv):
domain,score,grade,spf_present,dkim_present,dmarc_present,issues_count
google.com,95,Excellent,Yes,Yes,Yes,1
github.com,88,Good,Yes,Yes,Yes,2
stackoverflow.com,92,Excellent,Yes,Yes,Yes,1
reddit.com,85,Good,Yes,Yes,Yes,3
netflix.com,90,Excellent,Yes,Yes,Yes,2
Batch Processing for Large Lists
Chunked Processing
Process large lists in batches:
import requests
import time
from typing import List, Dict, Iterator
API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"
def chunk_list(items: List, chunk_size: int) -> Iterator[List]:
"""Split list into chunks"""
for i in range(0, len(items), chunk_size):
yield items[i:i + chunk_size]
def validate_batch(domains: List[str]) -> Dict:
"""Validate a single batch of domains"""
try:
response = requests.post(
f"{BASE_URL}/v1/bulk-validate",
headers={
"X-API-Key": API_KEY,
"Content-Type": "application/json"
},
json={
"domains": [{"domain": d} for d in domains],
"options": {"include_score": True}
},
timeout=30
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
# Rate limited - will be handled by caller
raise
else:
print(f"HTTP Error: {e}")
return None
except Exception as e:
print(f"Error validating batch: {e}")
return None
def process_large_list(
domains: List[str],
batch_size: int = 50,
delay_between_batches: float = 1.0
) -> List[Dict]:
"""
Process large domain list in batches
Args:
domains: List of all domains to process
batch_size: Domains per batch (max 100)
delay_between_batches: Seconds to wait between batches
Returns:
List of all results
"""
all_results = []
batches = list(chunk_list(domains, batch_size))
total_batches = len(batches)
print(f"Processing {len(domains)} domains in {total_batches} batches...")
for batch_num, batch in enumerate(batches, 1):
print(f"\nBatch {batch_num}/{total_batches} ({len(batch)} domains)...")
try:
batch_results = validate_batch(batch)
if batch_results:
all_results.extend(batch_results['results'])
print(f" β Successful: {batch_results['successful']}")
print(f" β Failed: {batch_results['failed']}")
else:
print(" β Batch failed")
# Rate limiting: wait between batches
if batch_num < total_batches:
print(f" Waiting {delay_between_batches}s before next batch...")
time.sleep(delay_between_batches)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
retry_after = int(e.response.headers.get('Retry-After', 60))
print(f" βΈ Rate limited. Waiting {retry_after}s...")
time.sleep(retry_after)
# Retry this batch
batch_results = validate_batch(batch)
if batch_results:
all_results.extend(batch_results['results'])
print(f"\nβ
Complete! Processed {len(all_results)} domains")
return all_results
# Usage
if __name__ == "__main__":
# Load 500 domains from file
with open('large_domain_list.txt') as f:
domains = [line.strip() for line in f if line.strip()]
# Process in batches of 50 with 2 second delay
results = process_large_list(domains, batch_size=50, delay_between_batches=2.0)
# Save results
import json
with open('results.json', 'w') as f:
json.dump(results, f, indent=2)
print(f"Results saved to results.json")
Advanced CSV Processing
With Progress Tracking
Add a progress bar and detailed reporting:
import csv
import requests
from typing import List, Dict
from tqdm import tqdm # pip install tqdm
import json
from datetime import datetime
API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"
class DomainBatchProcessor:
"""Advanced domain batch processor with progress tracking"""
def __init__(self, api_key: str, batch_size: int = 50):
self.api_key = api_key
self.base_url = BASE_URL
self.batch_size = batch_size
self.session = requests.Session()
self.session.headers.update({
"X-API-Key": api_key,
"Content-Type": "application/json"
})
def read_csv(self, filename: str, domain_column: str = 'domain') -> List[Dict]:
"""Read domains from CSV with all columns preserved"""
domains = []
with open(filename, 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
domain = row.get(domain_column, '').strip()
if domain:
domains.append({
'domain': domain,
'metadata': row # Preserve all CSV columns
})
return domains
def validate_batch(self, batch: List[Dict]) -> Dict:
"""Validate a batch of domains"""
domain_list = [item['domain'] for item in batch]
response = self.session.post(
f"{self.base_url}/v1/bulk-validate",
json={
"domains": [{"domain": d} for d in domain_list],
"options": {"include_score": True}
},
timeout=30
)
response.raise_for_status()
return response.json()
def process_file(
self,
input_file: str,
output_file: str,
domain_column: str = 'domain'
) -> Dict:
"""
Process CSV file with progress tracking
Returns:
dict: Processing statistics
"""
# Read input
print(f"π Reading {input_file}...")
domains = self.read_csv(input_file, domain_column)
total = len(domains)
print(f"β Found {total} domains\n")
# Process in batches with progress bar
results = []
errors = []
with tqdm(total=total, desc="Processing", unit="domain") as pbar:
for i in range(0, total, self.batch_size):
batch = domains[i:i + self.batch_size]
try:
batch_results = self.validate_batch(batch)
# Merge results with metadata
for j, result in enumerate(batch_results['results']):
results.append({
**result,
'metadata': batch[j]['metadata']
})
# Track errors
if batch_results.get('errors'):
errors.extend(batch_results['errors'])
except Exception as e:
print(f"\nβ Error processing batch: {e}")
for item in batch:
errors.append({
'domain': item['domain'],
'error': str(e)
})
pbar.update(len(batch))
# Save results
print(f"\nπΎ Saving results to {output_file}...")
self.save_results_csv(results, errors, output_file)
# Statistics
stats = {
'total_domains': total,
'successful': len(results),
'failed': len(errors),
'timestamp': datetime.now().isoformat()
}
print(f"\nβ
Processing complete!")
print(f" Total: {stats['total_domains']}")
print(f" Successful: {stats['successful']}")
print(f" Failed: {stats['failed']}")
return stats
def save_results_csv(self, results: List[Dict], errors: List[Dict], filename: str):
"""Save results to CSV with comprehensive data"""
with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
# Determine all fieldnames
base_fields = [
'domain', 'score', 'grade', 'spf_present', 'spf_valid',
'dkim_keys_found', 'dmarc_present', 'dmarc_policy',
'critical_issues', 'high_issues', 'medium_issues', 'low_issues',
'status'
]
# Add metadata fields if present
if results and 'metadata' in results[0]:
metadata_fields = list(results[0]['metadata'].keys())
fieldnames = base_fields + metadata_fields
else:
fieldnames = base_fields
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
# Write successful results
for result in results:
issues_by_severity = {
'critical': 0, 'high': 0, 'medium': 0, 'low': 0
}
for issue in result.get('issues', []):
severity = issue.get('severity', 'low')
issues_by_severity[severity] = issues_by_severity.get(severity, 0) + 1
row = {
'domain': result['domain'],
'score': result.get('score', 0),
'grade': result.get('grade', 'N/A'),
'spf_present': 'Yes' if result.get('spf', {}).get('present') else 'No',
'spf_valid': 'Yes' if result.get('spf', {}).get('valid') else 'No',
'dkim_keys_found': len(result.get('dkim', {}).get('validated_keys', [])),
'dmarc_present': 'Yes' if result.get('dmarc', {}).get('present') else 'No',
'dmarc_policy': result.get('dmarc', {}).get('policy', 'N/A'),
'critical_issues': issues_by_severity['critical'],
'high_issues': issues_by_severity['high'],
'medium_issues': issues_by_severity['medium'],
'low_issues': issues_by_severity['low'],
'status': 'success'
}
# Add metadata columns
if 'metadata' in result:
row.update(result['metadata'])
writer.writerow(row)
# Write errors
for error in errors:
row = {
'domain': error['domain'],
'status': 'error',
'score': 0,
'grade': 'Error'
}
writer.writerow(row)
# Usage
if __name__ == "__main__":
processor = DomainBatchProcessor(
api_key=API_KEY,
batch_size=50
)
stats = processor.process_file(
input_file="customer_domains.csv",
output_file="validation_results.csv",
domain_column="domain"
)
# Save statistics
with open("processing_stats.json", "w") as f:
json.dump(stats, f, indent=2)
Expected Output:
📄 Reading customer_domains.csv...
✓ Found 250 domains
Processing: 100%|████████████████████| 250/250 [00:45<00:00, 5.51domain/s]
💾 Saving results to validation_results.csv...
✅ Processing complete!
Total: 250
Successful: 248
Failed: 2
Database Export Processing
From Database to CSV
Process domains from a database:
import psycopg2 # or your database library
import requests
import csv
from typing import List, Dict
API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"
def fetch_domains_from_db(connection_string: str) -> List[Dict]:
"""Fetch domains from PostgreSQL database"""
conn = psycopg2.connect(connection_string)
cursor = conn.cursor()
# Fetch domains with associated data
query = """
SELECT
id,
domain,
customer_name,
created_at
FROM customers
WHERE active = true
"""
cursor.execute(query)
rows = cursor.fetchall()
domains = []
for row in rows:
domains.append({
'id': row[0],
'domain': row[1],
'customer_name': row[2],
'created_at': row[3]
})
cursor.close()
conn.close()
return domains
def validate_and_update_db(
domains: List[Dict],
connection_string: str
):
"""
Validate domains and update database with results
Args:
domains: List of domain records from database
connection_string: Database connection string
"""
# Validate domains
domain_list = [d['domain'] for d in domains]
response = requests.post(
f"{BASE_URL}/v1/bulk-validate",
headers={
"X-API-Key": API_KEY,
"Content-Type": "application/json"
},
json={
"domains": [{"domain": d} for d in domain_list],
"options": {"include_score": True}
}
)
response.raise_for_status()
results = response.json()
# Update database with results
conn = psycopg2.connect(connection_string)
cursor = conn.cursor()
for i, result in enumerate(results['results']):
domain_id = domains[i]['id']
update_query = """
UPDATE customers
SET
email_security_score = %s,
spf_valid = %s,
dkim_valid = %s,
dmarc_valid = %s,
last_checked = NOW()
WHERE id = %s
"""
cursor.execute(update_query, (
result['score'],
result['spf']['present'],
len(result['dkim']['validated_keys']) > 0,
result['dmarc']['present'],
domain_id
))
conn.commit()
cursor.close()
conn.close()
print(f"β
Updated {len(results['results'])} records in database")
# Usage
DB_CONNECTION = "postgresql://user:pass@localhost:5432/mydb"
domains = fetch_domains_from_db(DB_CONNECTION)
print(f"Fetched {len(domains)} domains from database")
validate_and_update_db(domains, DB_CONNECTION)
Async Parallel Processing
High-Performance Processing
Use async for maximum throughput:
import asyncio
import httpx
from typing import List, Dict
from tqdm.asyncio import tqdm
API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"
async def validate_domain(client: httpx.AsyncClient, domain: str) -> Dict:
"""Validate single domain asynchronously"""
try:
response = await client.get(
f"{BASE_URL}/api/v1/check",
params={"domain": domain},
headers={"X-API-Key": API_KEY},
timeout=15.0
)
response.raise_for_status()
return response.json()
except Exception as e:
return {
'domain': domain,
'error': str(e),
'score': 0
}
async def validate_domains_async(
domains: List[str],
max_concurrent: int = 10
) -> List[Dict]:
"""
Validate multiple domains concurrently
Args:
domains: List of domains to validate
max_concurrent: Maximum concurrent requests
Returns:
List of validation results
"""
# Create semaphore to limit concurrency
semaphore = asyncio.Semaphore(max_concurrent)
async def validate_with_semaphore(client, domain):
async with semaphore:
return await validate_domain(client, domain)
async with httpx.AsyncClient() as client:
tasks = [
validate_with_semaphore(client, domain)
for domain in domains
]
# Use tqdm for progress tracking
results = await tqdm.gather(*tasks, desc="Validating")
return results
def process_domains_async(domains: List[str]) -> List[Dict]:
"""Synchronous wrapper for async processing"""
return asyncio.run(validate_domains_async(domains, max_concurrent=10))
# Usage
if __name__ == "__main__":
# Load domains
with open('domains.txt') as f:
domains = [line.strip() for line in f if line.strip()]
print(f"Processing {len(domains)} domains asynchronously...")
# Process
results = process_domains_async(domains)
# Calculate statistics
successful = sum(1 for r in results if 'error' not in r)
failed = len(results) - successful
print(f"\nβ
Complete!")
print(f" Successful: {successful}")
print(f" Failed: {failed}")
# Save to CSV
import csv
with open('results.csv', 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=['domain', 'score', 'grade', 'status'])
writer.writeheader()
for result in results:
writer.writerow({
'domain': result['domain'],
'score': result.get('score', 0),
'grade': result.get('grade', 'N/A'),
'status': 'error' if 'error' in result else 'success'
})
Domain Audit Report
Generate Comprehensive Audit
Create a detailed audit report:
import csv
import requests
from datetime import datetime
from typing import List, Dict
API_KEY = "your-api-key-here"
BASE_URL = "https://api.reputeapi.com"
def generate_audit_report(domains: List[str], output_file: str = "audit_report.csv"):
"""
Generate comprehensive domain security audit report
Args:
domains: List of domains to audit
output_file: Output CSV filename
"""
print(f"π Starting audit of {len(domains)} domains...")
# Validate all domains
response = requests.post(
f"{BASE_URL}/v1/bulk-validate",
headers={
"X-API-Key": API_KEY,
"Content-Type": "application/json"
},
json={
"domains": [{"domain": d} for d in domains],
"options": {"include_score": True}
}
)
response.raise_for_status()
results = response.json()
# Analyze results
audit_data = []
for result in results['results']:
# Count issues by severity
issues = result.get('issues', [])
critical = sum(1 for i in issues if i['severity'] == 'critical')
high = sum(1 for i in issues if i['severity'] == 'high')
medium = sum(1 for i in issues if i['severity'] == 'medium')
low = sum(1 for i in issues if i['severity'] == 'low')
# Get top issue
top_issue = issues[0]['message'] if issues else 'None'
# Compile audit entry
audit_data.append({
'domain': result['domain'],
'score': result['score'],
'grade': result.get('grade', 'N/A'),
'spf_status': '✓' if result['spf']['present'] else '✗',
'dkim_status': '✓' if result['dkim']['validated_keys'] else '✗',
'dmarc_status': '✓' if result['dmarc']['present'] else '✗',
'dmarc_policy': result['dmarc'].get('policy', 'none'),
'critical_issues': critical,
'high_issues': high,
'medium_issues': medium,
'low_issues': low,
'total_issues': len(issues),
'top_issue': top_issue,
'risk_level': get_risk_level(result['score']),
'audit_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
# Sort by score (lowest first - highest risk)
audit_data.sort(key=lambda x: x['score'])
# Save to CSV
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
fieldnames = audit_data[0].keys() if audit_data else []
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(audit_data)
# Generate summary
print_audit_summary(audit_data)
print(f"\nπΎ Full report saved to {output_file}")
def get_risk_level(score: int) -> str:
"""Determine risk level from score"""
if score >= 90:
return 'Low'
elif score >= 75:
return 'Medium'
elif score >= 50:
return 'High'
else:
return 'Critical'
def print_audit_summary(audit_data: List[Dict]):
"""Print audit summary statistics"""
total = len(audit_data)
# Risk distribution
critical_risk = sum(1 for d in audit_data if d['risk_level'] == 'Critical')
high_risk = sum(1 for d in audit_data if d['risk_level'] == 'High')
medium_risk = sum(1 for d in audit_data if d['risk_level'] == 'Medium')
low_risk = sum(1 for d in audit_data if d['risk_level'] == 'Low')
# Average score
avg_score = sum(d['score'] for d in audit_data) / total if total > 0 else 0
# Configuration status
spf_count = sum(1 for d in audit_data if d['spf_status'] == '✓')
dkim_count = sum(1 for d in audit_data if d['dkim_status'] == '✓')
dmarc_count = sum(1 for d in audit_data if d['dmarc_status'] == '✓')
print("\n" + "="*60)
print("AUDIT SUMMARY")
print("="*60)
print(f"\nTotal Domains Audited: {total}")
print(f"Average Security Score: {avg_score:.1f}/100")
print(f"\nRisk Distribution:")
print(f" π΄ Critical Risk: {critical_risk} ({critical_risk/total*100:.1f}%)")
print(f" π High Risk: {high_risk} ({high_risk/total*100:.1f}%)")
print(f" π‘ Medium Risk: {medium_risk} ({medium_risk/total*100:.1f}%)")
print(f" π’ Low Risk: {low_risk} ({low_risk/total*100:.1f}%)")
print(f"\nConfiguration Status:")
print(f" SPF: {spf_count}/{total} ({spf_count/total*100:.1f}%)")
print(f" DKIM: {dkim_count}/{total} ({dkim_count/total*100:.1f}%)")
print(f" DMARC: {dmarc_count}/{total} ({dmarc_count/total*100:.1f}%)")
# Top 5 worst domains
print(f"\nTop 5 Domains Needing Attention:")
for i, domain in enumerate(audit_data[:5], 1):
print(f" {i}. {domain['domain']}: {domain['score']}/100 ({domain['risk_level']} Risk)")
print(f" Issue: {domain['top_issue']}")
# Usage
if __name__ == "__main__":
# Read domains from file
with open('domains.txt') as f:
domains = [line.strip() for line in f if line.strip()]
generate_audit_report(domains, "security_audit_report.csv")
Best Practices
1. Respect Rate Limits
import time
from typing import List
def process_with_rate_limit(domains: List[str], requests_per_minute: int = 60):
"""Process domains respecting rate limits"""
delay = 60.0 / requests_per_minute # Delay between requests
results = []
for domain in domains:
result = validate_domain(domain)
results.append(result)
time.sleep(delay) # Rate limiting
return results
2. Handle Partial Failures
import time
from typing import List

def process_with_error_recovery(domains: List[str]):
"""Process domains with retry logic for failures"""
results = []
failed = []
for domain in domains:
try:
result = validate_domain(domain)
results.append(result)
except Exception as e:
print(f"Failed: {domain} - {e}")
failed.append(domain)
# Retry failed domains
if failed:
print(f"\nRetrying {len(failed)} failed domains...")
for domain in failed:
try:
time.sleep(2) # Wait before retry
result = validate_domain(domain)
results.append(result)
except Exception as e:
print(f"Retry failed: {domain}")
return results
3. Use Batch Processing
For large lists, always use batches (a batch-size helper is sketched after this list):
- Free tier: 10 domains per batch
- Basic tier: 50 domains per batch
- Premium tier: 100 domains per batch
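A small helper can clamp your batch size to the plan limit. This is a minimal sketch; the PLAN_BATCH_LIMITS mapping and batch_size_for_plan helper are illustrative names that simply mirror the tier limits listed above (they are not reported by the API):
# Tier limits mirrored from the list above (illustrative; not fetched from the API)
PLAN_BATCH_LIMITS = {"free": 10, "basic": 50, "premium": 100}

def batch_size_for_plan(plan: str, requested: int = 100) -> int:
    """Clamp a requested batch size to the limit for the given plan tier."""
    limit = PLAN_BATCH_LIMITS.get(plan.lower(), 10)  # default to the most conservative limit
    return max(1, min(requested, limit))

# Example: a Basic-tier key asking for 80-domain batches is clamped to 50,
# which can then be passed to process_large_list() from the chunked-processing example above.
size = batch_size_for_plan("basic", 80)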
4. Cache Results
Avoid re-validating the same domains:
import json
from pathlib import Path
from typing import Dict
def get_cached_result(domain: str, cache_file: str = 'cache.json'):
"""Get cached result if available"""
if Path(cache_file).exists():
with open(cache_file) as f:
cache = json.load(f)
return cache.get(domain)
return None
def save_to_cache(domain: str, result: Dict, cache_file: str = 'cache.json'):
"""Save result to cache"""
cache = {}
if Path(cache_file).exists():
with open(cache_file) as f:
cache = json.load(f)
cache[domain] = result
with open(cache_file, 'w') as f:
json.dump(cache, f)
Troubleshooting
Issue: 413 Payload Too Large
Cause: Too many domains in a single request
Solution: Reduce batch size
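As a rough sketch of that recovery path, assuming the API signals an oversized payload with HTTP 413 and that your validate_batch() helper raises requests.exceptions.HTTPError on non-2xx responses (via response.raise_for_status()), you can halve the batch and retry until it fits:
import requests
from typing import Dict, List

def validate_with_split(domains: List[str], validate_batch, min_size: int = 1) -> List[Dict]:
    """Validate a batch, recursively halving it while the API rejects the payload as too large."""
    try:
        return [validate_batch(domains)]  # validate_batch: any helper that raises HTTPError on failure
    except requests.exceptions.HTTPError as e:
        if e.response is not None and e.response.status_code == 413 and len(domains) > min_size:
            mid = len(domains) // 2
            return (validate_with_split(domains[:mid], validate_batch)
                    + validate_with_split(domains[mid:], validate_batch))
        raise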
Issue: Timeout Errors
Cause: Network issues or large batches
Solution: Increase timeout and reduce batch size
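For example, you might give requests a longer read timeout and trim the batch. The endpoint is the bulk-validate endpoint used above; the timeout and batch-size values here are illustrative only:
import requests

def validate_batch_patient(domains, api_key, batch_size=25):
    """Send a smaller batch with a longer read timeout (illustrative values for slow networks)."""
    response = requests.post(
        "https://api.reputeapi.com/v1/bulk-validate",
        headers={"X-API-Key": api_key, "Content-Type": "application/json"},
        json={"domains": [{"domain": d} for d in domains[:batch_size]]},
        timeout=(5, 90),  # (connect timeout, read timeout) in seconds
    )
    response.raise_for_status()
    return response.json()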
Issue: Rate Limit Errors
Cause: Sending requests too quickly
Solution: Add delays between batches
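A minimal retry sketch that waits for the Retry-After header (assuming the API sends one on 429 responses, as the chunked-processing example above does) could look like this:
import time
import requests

def post_with_backoff(session: requests.Session, url: str, payload: dict, max_retries: int = 5) -> dict:
    """POST with a retry loop: on HTTP 429, sleep for Retry-After (or an exponential fallback)."""
    for attempt in range(max_retries):
        response = session.post(url, json=payload, timeout=30)
        if response.status_code != 429:
            response.raise_for_status()
            return response.json()
        wait = int(response.headers.get("Retry-After", 2 ** attempt))
        time.sleep(wait)
    raise RuntimeError("Gave up after repeated 429 responses")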
Next Steps
- Real-World Scenarios: See Common Scenarios
- Troubleshooting: Check Troubleshooting Guide
- Complete Code: Browse Code Samples Library
- Best Practices: Read Integration Best Practices