Kevin Muñoz
14d9d5d93e
- Agrega evaluación de fortaleza SPF/DMARC (0-3) - Implementa verificación PCT en DMARC - Añade análisis de registros PTR - Actualiza documentación y ejemplos - Incrementa versión a 2.1.0
354 lines
14 KiB
Python
354 lines
14 KiB
Python
#!/usr/bin/env python
|
||
|
||
import asyncio
|
||
import dns.resolver
|
||
import argparse
|
||
import re
|
||
import socket
|
||
import sys
|
||
import time
|
||
from dataclasses import dataclass
|
||
from typing import List, Optional, Tuple
|
||
from rich.console import Console
|
||
from rich.table import Table
|
||
from rich import print as rprint
|
||
|
||
VERSION = "2.1.0"
|
||
|
||
@dataclass
|
||
class SecurityAnalysis:
|
||
score: int
|
||
features: List[str]
|
||
vulnerabilities: List[str]
|
||
spf_strict: bool
|
||
dmarc_enforced: bool
|
||
spoofable: bool
|
||
|
||
class Colors:
|
||
GREEN = "\033[0;32m"
|
||
RED = "\033[0;31m"
|
||
YELLOW = "\033[1;33m"
|
||
BLUE = "\033[0;34m"
|
||
CYAN = "\033[0;36m"
|
||
NC = "\033[0m"
|
||
|
||
class EmailValidator:
|
||
def __init__(self, verbose: bool = False, stealth: bool = False, delay: int = 0):
|
||
self.verbose = verbose
|
||
self.stealth = stealth
|
||
self.delay = delay
|
||
self.console = Console()
|
||
|
||
@staticmethod
|
||
def show_banner():
|
||
banner = """
|
||
███████╗███╗ ███╗ █████╗ ██╗██╗ ██████╗ ███████╗██╗███╗ ██╗████████╗
|
||
██╔════╝████╗ ████║██╔══██╗██║██║ ██╔═══██╗██╔════╝██║████╗ ██║╚══██╔══╝
|
||
█████╗ ██╔████╔██║███████║██║██║ ██║ ██║███████╗██║██╔██╗ ██║ ██║
|
||
██╔══╝ ██║╚██╔╝██║██╔══██║██║██║ ██║ ██║╚════██║██║██║╚██╗██║ ██║
|
||
███████╗██║ ╚═╝ ██║██║ ██║██║███████╗╚██████╔╝███████║██║██║ ╚████║ ██║
|
||
╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝╚══════╝ ╚═════╝ ╚══════╝╚═╝╚═╝ ╚═══╝ ╚═╝
|
||
"""
|
||
print(f"{Colors.CYAN}{banner}{Colors.NC}")
|
||
print(f"{Colors.CYAN} Email OSINT Validator v{VERSION}{Colors.NC}")
|
||
print(f"{Colors.BLUE} Desarrollado por: Kevin Muñoz (MrHacker|TheSL18){Colors.NC}")
|
||
print(f"{Colors.YELLOW} Uso ético - Solo para investigación autorizada{Colors.NC}\n")
|
||
|
||
async def analyze_mx_security(self, domain: str) -> SecurityAnalysis:
|
||
security_features = []
|
||
vulnerabilities = []
|
||
security_score = 0
|
||
spf_strict = False
|
||
dmarc_enforced = False
|
||
spoofable = False
|
||
spf_strength = 0
|
||
dmarc_strength = 0
|
||
|
||
try:
|
||
# Verificar SPF
|
||
try:
|
||
answers = dns.resolver.resolve(domain, 'TXT')
|
||
spf_record = next((str(record) for record in answers
|
||
if str(record).startswith('"v=spf1')), None)
|
||
|
||
if spf_record:
|
||
security_features.append("SPF")
|
||
security_score += 20
|
||
|
||
# Analizar mecanismos SPF
|
||
mechanisms = re.findall(r'[\+\-\~\?]?(ip4|ip6|a|mx|ptr|exists|include|all)[:\/]?[^\s]*', spf_record)
|
||
|
||
if '-all' in spf_record:
|
||
security_features.append("SPF_STRICT")
|
||
spf_strict = True
|
||
spf_strength = 3
|
||
security_score += 15
|
||
elif '~all' in spf_record:
|
||
security_features.append("SPF_SOFT_FAIL")
|
||
vulnerabilities.append("SPF no es estricto (usa ~all)")
|
||
spf_strength = 2
|
||
security_score += 10
|
||
elif '?all' in spf_record:
|
||
vulnerabilities.append("SPF en modo neutral")
|
||
spf_strength = 1
|
||
security_score += 5
|
||
elif '+all' in spf_record:
|
||
vulnerabilities.append("SPF permite cualquier remitente")
|
||
spf_strength = 0
|
||
spoofable = True
|
||
|
||
if len(mechanisms) < 2:
|
||
vulnerabilities.append("SPF: Pocas reglas definidas")
|
||
if 'ptr' in spf_record:
|
||
vulnerabilities.append("SPF: Uso de mecanismo PTR (no recomendado)")
|
||
if any(m.startswith('+') for m in mechanisms):
|
||
vulnerabilities.append("SPF: Uso de modificador '+' explícito (riesgo de spoofing)")
|
||
spoofable = True
|
||
else:
|
||
vulnerabilities.append("No se encontró registro SPF")
|
||
spoofable = True
|
||
except Exception as e:
|
||
vulnerabilities.append(f"Error al verificar SPF: {str(e)}")
|
||
|
||
# Verificar DMARC
|
||
try:
|
||
answers = dns.resolver.resolve(f'_dmarc.{domain}', 'TXT')
|
||
dmarc_record = next((str(record) for record in answers
|
||
if str(record).startswith('"v=DMARC1')), None)
|
||
|
||
if dmarc_record:
|
||
security_features.append("DMARC")
|
||
security_score += 20
|
||
|
||
# Analizar política DMARC y opciones
|
||
dmarc_policy = re.search(r'p=(reject|quarantine|none)', dmarc_record)
|
||
pct_match = re.search(r'pct=(\d+)', dmarc_record)
|
||
rua_match = re.search(r'rua=([^\s;]+)', dmarc_record)
|
||
ruf_match = re.search(r'ruf=([^\s;]+)', dmarc_record)
|
||
|
||
if dmarc_policy:
|
||
if dmarc_policy.group(1) == 'reject':
|
||
security_features.append("DMARC_ENFORCED")
|
||
dmarc_enforced = True
|
||
dmarc_strength = 3
|
||
security_score += 15
|
||
elif dmarc_policy.group(1) == 'quarantine':
|
||
security_features.append("DMARC_QUARANTINE")
|
||
dmarc_strength = 2
|
||
security_score += 10
|
||
else: # none
|
||
vulnerabilities.append("DMARC en modo monitoreo")
|
||
dmarc_strength = 1
|
||
security_score += 5
|
||
|
||
if not pct_match or int(pct_match.group(1)) < 100:
|
||
vulnerabilities.append("DMARC: No aplicado al 100% de los mensajes")
|
||
spoofable = True
|
||
|
||
if not rua_match and not ruf_match:
|
||
vulnerabilities.append("DMARC: Sin configuración de reportes")
|
||
|
||
if not re.search(r'(adkim|aspf)=s', dmarc_record):
|
||
vulnerabilities.append("DMARC: Modo de alineación relajado")
|
||
else:
|
||
vulnerabilities.append("No se encontró registro DMARC")
|
||
spoofable = True
|
||
except Exception as e:
|
||
vulnerabilities.append(f"Error al verificar DMARC: {str(e)}")
|
||
|
||
# Verificar DKIM
|
||
dkim_selectors = ['default', 'google', 'mail', 'email', 'key1', 'selector1', 'selector2']
|
||
dkim_found = False
|
||
|
||
for selector in dkim_selectors:
|
||
try:
|
||
dns.resolver.resolve(f'{selector}._domainkey.{domain}', 'TXT')
|
||
security_features.append("DKIM")
|
||
security_score += 15
|
||
dkim_found = True
|
||
break
|
||
except:
|
||
continue
|
||
|
||
if not dkim_found:
|
||
vulnerabilities.append("No se encontró registro DKIM")
|
||
|
||
# Evaluación final de riesgo de spoofing
|
||
if spf_strength < 2 or dmarc_strength < 2:
|
||
spoofable = True
|
||
|
||
if len(vulnerabilities) > 3:
|
||
spoofable = True
|
||
|
||
# Verificaciones adicionales para misconfiguraciones comunes
|
||
try:
|
||
answers = dns.resolver.resolve(domain, 'A')
|
||
ip = str(answers[0])
|
||
ptr = dns.resolver.resolve_address(ip)
|
||
if not any(domain in str(r) for r in ptr):
|
||
vulnerabilities.append("Falta de registro PTR válido")
|
||
except:
|
||
pass
|
||
|
||
except Exception as e:
|
||
vulnerabilities.append(f"Error en análisis de seguridad: {str(e)}")
|
||
|
||
return SecurityAnalysis(
|
||
score=min(100, security_score),
|
||
features=security_features,
|
||
vulnerabilities=vulnerabilities,
|
||
spf_strict=spf_strict,
|
||
dmarc_enforced=dmarc_enforced,
|
||
spoofable=spoofable
|
||
)
|
||
|
||
def fingerprint_server(self, response: str) -> Optional[str]:
|
||
server_info = re.search(r'^220.*$', response, re.MULTILINE | re.IGNORECASE)
|
||
if not server_info:
|
||
return None
|
||
|
||
server_info = server_info.group(0)
|
||
|
||
if 'mx.google.com' in server_info.lower() or 'gmail-smtp' in server_info.lower():
|
||
return "Google Mail Infrastructure"
|
||
elif 'microsoft' in server_info.lower():
|
||
return "Microsoft Exchange/Office 365"
|
||
elif 'postfix' in server_info.lower():
|
||
return "Postfix"
|
||
elif 'exim' in server_info.lower():
|
||
return "Exim"
|
||
elif 'sendmail' in server_info.lower():
|
||
return "Sendmail"
|
||
elif 'zimbra' in server_info.lower():
|
||
return "Zimbra"
|
||
|
||
return "Servidor Desconocido"
|
||
|
||
async def validate_email(self, email: str) -> bool:
|
||
if not re.match(r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$', email):
|
||
rprint("[red]❌ Formato de correo inválido[/]")
|
||
return False
|
||
|
||
rprint("[green]✓ La sintaxis del correo es correcta[/]")
|
||
|
||
domain = email.split('@')[1]
|
||
|
||
try:
|
||
rprint("[cyan]🔍 Buscando registros MX...[/]")
|
||
mx_records = sorted(
|
||
[(rdata.preference, rdata.exchange.to_text().rstrip('.'))
|
||
for rdata in dns.resolver.resolve(domain, 'MX')],
|
||
key=lambda x: x[0]
|
||
)
|
||
|
||
if not mx_records:
|
||
rprint(f"[red]❌ No se encontraron registros MX para {domain}[/]")
|
||
return False
|
||
|
||
for preference, server in mx_records:
|
||
rprint(f"[green]✓ MX record encontrado: {server} (Prioridad {preference})[/]")
|
||
|
||
if self.verbose:
|
||
security_analysis = await self.analyze_mx_security(domain)
|
||
self.display_security_analysis(domain, security_analysis)
|
||
|
||
primary_mx = mx_records[0][1]
|
||
|
||
if self.stealth:
|
||
time.sleep(self.delay)
|
||
|
||
rprint(f"[cyan]⏳ Iniciando diálogo con {primary_mx}[/]")
|
||
|
||
try:
|
||
with socket.create_connection((primary_mx, 25), timeout=10) as sock:
|
||
sock.settimeout(10)
|
||
|
||
response = sock.recv(1024).decode()
|
||
if self.verbose:
|
||
server_type = self.fingerprint_server(response)
|
||
if server_type:
|
||
rprint(f"[blue]ℹ Servidor detectado: {server_type}[/]")
|
||
|
||
commands = [
|
||
f"HELO email-validator-osint.local\r\n",
|
||
f"MAIL FROM:<validator@email-validator-osint.local>\r\n",
|
||
f"RCPT TO:<{email}>\r\n",
|
||
"QUIT\r\n"
|
||
]
|
||
|
||
for cmd in commands:
|
||
sock.send(cmd.encode())
|
||
response = sock.recv(1024).decode()
|
||
|
||
if self.verbose:
|
||
rprint(f"[cyan]→ {cmd.strip()}[/]")
|
||
rprint(f"[cyan]← {response.strip()}[/]")
|
||
|
||
code = response[:3]
|
||
if code == "550":
|
||
rprint(f"[red]❌ El correo {email} no es válido[/]")
|
||
return False
|
||
elif code not in ["220", "250", "221"]:
|
||
rprint(f"[yellow]⚠ Respuesta inesperada del servidor: {response.strip()}[/]")
|
||
return False
|
||
|
||
rprint(f"[green]✓ El correo {email} es válido[/]")
|
||
return True
|
||
|
||
except Exception as e:
|
||
rprint(f"[red]❌ Error al conectar con el servidor: {str(e)}[/]")
|
||
return False
|
||
|
||
except Exception as e:
|
||
rprint(f"[red]❌ Error: {str(e)}[/]")
|
||
return False
|
||
|
||
def display_security_analysis(self, domain: str, analysis: SecurityAnalysis):
|
||
table = Table(title=f"🔒 Análisis de Seguridad para {domain}")
|
||
|
||
table.add_column("Categoría", style="cyan")
|
||
table.add_column("Detalle", style="white")
|
||
|
||
table.add_row(
|
||
"📊 Puntuación de Seguridad",
|
||
f"{analysis.score}/100"
|
||
)
|
||
|
||
if analysis.features:
|
||
table.add_row(
|
||
"✅ Características de Seguridad",
|
||
"\n".join(analysis.features)
|
||
)
|
||
|
||
if analysis.vulnerabilities:
|
||
table.add_row(
|
||
"⚠️ Vulnerabilidades",
|
||
"\n".join(analysis.vulnerabilities)
|
||
)
|
||
|
||
table.add_row(
|
||
"🎯 Estado de Protecciones",
|
||
f"SPF Estricto: {'✅' if analysis.spf_strict else '❌'}\n"
|
||
f"DMARC Enforced: {'✅' if analysis.dmarc_enforced else '❌'}\n"
|
||
f"Spoofing Posible: {'⚠️ SÍ' if analysis.spoofable else '✅ NO'}"
|
||
)
|
||
|
||
self.console.print(table)
|
||
|
||
async def main():
|
||
parser = argparse.ArgumentParser(description='Email OSINT Validator')
|
||
parser.add_argument('email', help='Email a validar')
|
||
parser.add_argument('-v', '--verbose', action='store_true', help='Modo verbose')
|
||
parser.add_argument('-s', '--stealth', action='store_true', help='Modo sigiloso')
|
||
parser.add_argument('-d', '--delay', type=int, default=0, help='Retraso entre consultas')
|
||
args = parser.parse_args()
|
||
|
||
validator = EmailValidator(verbose=args.verbose, stealth=args.stealth, delay=args.delay)
|
||
validator.show_banner()
|
||
|
||
result = await validator.validate_email(args.email)
|
||
sys.exit(0 if result else 1)
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(main())
|