from flask import Flask, render_template, request, jsonify from flask_socketio import SocketIO, emit from dotenv import load_dotenv import subprocess import uuid import os import requests import hashlib import mysql.connector import filetype from urllib.parse import urlparse from pathlib import Path import logging # Cargar variables de entorno al inicio load_dotenv() app = Flask(__name__) socketio = SocketIO(app, cors_allowed_origins="*") # Configuración de logging logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Configuración de la base de datos DB_CONFIG = { 'host': os.getenv('DB_HOST'), 'user': os.getenv('DB_USER'), 'password': os.getenv('DB_PASSWD'), 'database': os.getenv('DB_NAME'), 'charset': os.getenv('DB_CHARSET'), 'collation': os.getenv('DB_COALLITION') } def get_db_connection(): """Establece y retorna una conexión a la base de datos.""" try: return mysql.connector.connect(**DB_CONFIG) except mysql.connector.Error as err: logger.error(f"Error connecting to database: {err}") raise def get_file_hashes(file_path): """Calcula los hashes de un archivo.""" try: hash_md5 = hashlib.md5() hash_sha1 = hashlib.sha1() hash_sha256 = hashlib.sha256() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) hash_sha1.update(chunk) hash_sha256.update(chunk) return { "md5": hash_md5.hexdigest(), "sha1": hash_sha1.hexdigest(), "sha256": hash_sha256.hexdigest() } except IOError as e: logger.error(f"Error reading file for hashing: {e}") raise def get_file_type(file_path): """Determina el tipo MIME del archivo.""" try: kind = filetype.guess(file_path) return kind.mime if kind else "application/octet-stream" except Exception as e: logger.error(f"Error determining file type: {e}") return "application/octet-stream" def check_file_in_db(hashes): """Verifica si el archivo ya existe en la base de datos.""" if not all(hashes.values()): return None try: connection = get_db_connection() cursor = connection.cursor(dictionary=True) query = """ SELECT filename, file_type, scan_result, DATE_FORMAT(scan_date, '%Y-%m-%d %H:%i:%s') as scan_date, md5_hash, sha1_hash, sha256_hash FROM file_scans WHERE md5_hash = %(md5)s OR sha1_hash = %(sha1)s OR sha256_hash = %(sha256)s LIMIT 1 """ cursor.execute(query, { 'md5': hashes['md5'], 'sha1': hashes['sha1'], 'sha256': hashes['sha256'] }) result = cursor.fetchone() cursor.close() connection.close() return result except mysql.connector.Error as err: logger.error(f"Database error: {err}") raise def store_file_in_db(filename, hashes, file_type, scan_result): """Almacena los resultados del escaneo en la base de datos.""" if not all([filename, hashes, file_type, scan_result]): raise ValueError("Missing required parameters for database storage") try: connection = get_db_connection() cursor = connection.cursor() query = """ INSERT INTO file_scans (filename, md5_hash, sha1_hash, sha256_hash, file_type, scan_result) VALUES (%(filename)s, %(md5)s, %(sha1)s, %(sha256)s, %(file_type)s, %(scan_result)s) """ cursor.execute(query, { 'filename': filename, 'md5': hashes['md5'], 'sha1': hashes['sha1'], 'sha256': hashes['sha256'], 'file_type': file_type, 'scan_result': scan_result }) connection.commit() cursor.close() connection.close() except mysql.connector.Error as err: logger.error(f"Error storing scan results: {err}") raise @app.route('/') def index(): return render_template('index.html') @app.route('/upload', methods=['POST']) def upload_file(): """Maneja la subida y escaneo de archivos.""" try: if 'file' not in request.files: return jsonify({'error': 'No se recibió ningún archivo'}), 400 file = request.files['file'] if not file.filename: return jsonify({'error': 'Nombre de archivo vacío'}), 400 # Guardar archivo temporalmente y calcular hashes temp_path = Path('/tmp') / f"{uuid.uuid4()}_{file.filename}" file.save(str(temp_path)) # Calcular hashes y tipo de archivo hashes = get_file_hashes(str(temp_path)) file_type = get_file_type(str(temp_path)) # Verificar si existe en la base de datos existing_result = check_file_in_db(hashes) if existing_result: # Si existe, eliminar archivo temporal y devolver resultado existente temp_path.unlink() return jsonify({ 'message': 'Yo a este lo conozco', 'result': { 'filename': existing_result['filename'], 'file_type': existing_result['file_type'], 'scan_date': existing_result['scan_date'], 'scan_result': existing_result['scan_result'], 'hashes': { 'md5': existing_result['md5_hash'], 'sha1': existing_result['sha1_hash'], 'sha256': existing_result['sha256_hash'] } } }) # Si no existe, iniciar escaneo socketio.start_background_task( target=scan_file, file_path=str(temp_path), hashes=hashes, file_type=file_type, filename=file.filename ) return jsonify({ 'message': f'Archivo {file.filename} subido exitosamente. Iniciando escaneo...', 'hashes': hashes }) except Exception as e: logger.error(f"Error in upload_file: {e}") return jsonify({'error': str(e)}), 500 @app.route('/scan_url', methods=['POST']) def scan_url(): """Maneja el escaneo de archivos desde URLs.""" try: url = request.json.get('url') if not url: return jsonify({'error': 'URL no proporcionada'}), 400 # Validar y normalizar URL parsed_url = urlparse(url if '://' in url else f'http://{url}') if not all([parsed_url.scheme, parsed_url.netloc]): return jsonify({'error': 'URL inválida'}), 400 # Descargar archivo try: response = requests.get(parsed_url.geturl(), timeout=30) response.raise_for_status() except requests.exceptions.RequestException as e: return jsonify({'error': f'Error al descargar la URL: {str(e)}'}), 400 # Crear archivo temporal file_path = Path('/tmp') / f"{uuid.uuid4()}_downloaded_content" file_path.write_bytes(response.content) try: # Calcular hashes y tipo de archivo hashes = get_file_hashes(str(file_path)) file_type = get_file_type(str(file_path)) # Obtener nombre del archivo de la URL o usar uno genérico filename = Path(parsed_url.path).name or "downloaded_file" # Verificar si existe en la base de datos existing_result = check_file_in_db(hashes) if existing_result: # Si existe, eliminar archivo temporal y devolver resultado existente file_path.unlink() return jsonify({ 'message': 'Yo a este lo conozco', 'result': { 'filename': existing_result['filename'], 'file_type': existing_result['file_type'], 'scan_date': existing_result['scan_date'], 'scan_result': existing_result['scan_result'], 'hashes': { 'md5': existing_result['md5_hash'], 'sha1': existing_result['sha1_hash'], 'sha256': existing_result['sha256_hash'] } } }) # Si no existe, iniciar escaneo socketio.start_background_task( target=scan_file, file_path=str(file_path), hashes=hashes, file_type=file_type, filename=filename ) return jsonify({ 'message': 'URL descargada. Iniciando análisis...', 'hashes': hashes }) except Exception as e: # Asegurar limpieza del archivo temporal en caso de error if file_path.exists(): file_path.unlink() raise except Exception as e: logger.error(f"Error in scan_url: {e}") return jsonify({'error': 'Error interno del servidor'}), 500 def scan_file(file_path, hashes, file_type, filename): """Ejecuta el escaneo del archivo.""" try: # Ejecutar ClamAV scan_command = ["clamscan", "-r", file_path] process = subprocess.run( scan_command, capture_output=True, text=True, check=True, timeout=300 ) # Procesar y emitir resultado línea por línea scan_output = process.stdout for line in scan_output.split('\n'): if line.strip(): socketio.emit('scan_output', {'data': line + '\n'}) # Almacenar resultado en la base de datos store_file_in_db(filename, hashes, file_type, scan_output) # Emitir mensaje de finalización socketio.emit('scan_output', {'data': '\n--- Escaneo completado ---\n'}) except subprocess.TimeoutExpired: socketio.emit('scan_output', {'data': 'Error: El escaneo excedió el tiempo límite\n'}) except subprocess.CalledProcessError as e: socketio.emit('scan_output', {'data': f'Error en el escaneo: {e.stderr}\n'}) except Exception as e: socketio.emit('scan_output', {'data': f'Error: {str(e)}\n'}) finally: # Limpiar archivo temporal try: Path(file_path).unlink(missing_ok=True) except Exception as e: logger.error(f"Error removing temporary file: {e}") if __name__ == '__main__': # Asegurarse de que el directorio temporal existe Path('/tmp').mkdir(exist_ok=True) # Iniciar la aplicación socketio.run(app, debug=os.getenv('FLASK_DEBUG', 'False').lower() == 'true')