from flask import Flask, render_template, request, jsonify from flask_socketio import SocketIO, emit from dotenv import load_dotenv import subprocess import uuid import os import requests import hashlib import mysql.connector import filetype app = Flask(__name__) socketio = SocketIO(app) def get_file_hashes(file_path): hash_md5 = hashlib.md5() hash_sha1 = hashlib.sha1() hash_sha256 = hashlib.sha256() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) hash_sha1.update(chunk) hash_sha256.update(chunk) return { "md5": hash_md5.hexdigest(), "sha1": hash_sha1.hexdigest(), "sha256": hash_sha256.hexdigest() } def get_file_type(file_path): kind = filetype.guess(file_path) if kind is None: return "Unknown" return kind.mime def check_file_in_db(hashes): connection = mysql.connector.connect( host=os.getenv('DB_HOST'), user=os.getenv('DB_USER'), password=os.getenv('DB_PASSWD'), database=os.getenv('DB_NAME'), charset=os.getenv('DB_CHARSET'), collation=os.getenv('DB_COALLITION') ) cursor = connection.cursor() query = "SELECT scan_result FROM file_scans WHERE md5_hash = %s OR sha1_hash = %s OR sha256_hash = %s" cursor.execute(query, (hashes["md5"], hashes["sha1"], hashes["sha256"])) result = cursor.fetchone() cursor.close() connection.close() return result def store_file_in_db(filename, hashes, file_type, scan_result): connection = mysql.connector.connect( host=os.getenv('DB_HOST'), user=os.getenv('DB_USER'), password=os.getenv('DB_PASSWD'), database=os.getenv('DB_NAME'), charset=os.getenv('DB_CHARSET'), collation=os.getenv('DB_COALLITION') ) cursor = connection.cursor() query = """ INSERT INTO file_scans (filename, md5_hash, sha1_hash, sha256_hash, file_type, scan_result) VALUES (%s, %s, %s, %s, %s, %s) """ cursor.execute(query, (filename, hashes["md5"], hashes["sha1"], hashes["sha256"], file_type, scan_result)) connection.commit() cursor.close() connection.close() @app.route('/', methods=['GET', 'POST']) def index(): return render_template('index.html') @app.route('/upload', methods=['POST']) def upload_file(): file = request.files.get('file') if file: file_path = os.path.join('/tmp', f"{uuid.uuid4()}_{file.filename}") file.save(file_path) # Obtener los hashes y el tipo de archivo hashes = get_file_hashes(file_path) file_type = get_file_type(file_path) # Verificar si el archivo ya fue escaneado existing_result = check_file_in_db(hashes) if existing_result: os.remove(file_path) # Formatear el resultado del escaneo para que sea más legible scan_result = format_scan_result(existing_result[0]) return jsonify({ 'message': 'Este archivo ya lo he visto antes.', 'scan_result': scan_result }) # Ejecuta el escaneo en un hilo separado para no bloquear la aplicación socketio.start_background_task(target=scan_file, file_path=file_path, hashes=hashes, file_type=file_type, filename=file.filename) return jsonify({'message': 'Archivo subido exitosamente: ' + file.filename}) return jsonify({'error': 'No se recibió ningún archivo'}), 400 @app.route('/scan_url', methods=['POST']) def scan_url(): url = request.json.get('url') if not url: return jsonify({'error': 'No URL provided'}), 400 # Añadir esquema si falta if not url.startswith(('http://', 'https://')): url = 'http://' + url try: # Descargar el contenido de la URL a un archivo temporal response = requests.get(url) if response.status_code != 200: return jsonify({'error': f'Error al descargar la URL: {response.status_code}'}), 400 file_path = os.path.join('/tmp', f"{uuid.uuid4()}_downloaded_content") with open(file_path, 'wb') as temp_file: temp_file.write(response.content) # Obtener los hashes y el tipo de archivo hashes = get_file_hashes(file_path) file_type = get_file_type(file_path) # Verificar si el archivo ya fue escaneado existing_result = check_file_in_db(hashes) if existing_result: os.remove(file_path) # Formatear el resultado del escaneo para que sea más legible scan_result = format_scan_result(existing_result[0]) return jsonify({ 'message': 'Este archivo ya lo he visto antes.', 'scan_result': scan_result }) # Ejecutar el escaneo en un hilo separado para no bloquear la aplicación socketio.start_background_task(target=scan_file, file_path=file_path, hashes=hashes, file_type=file_type, filename=filename) return jsonify({'message': f'Archivo descargado y guardado como {file_path}. El escaneo está en progreso.'}), 200 except Exception as e: return jsonify({'error': str(e)}), 500 def scan_file(file_path, hashes, file_type, filename): try: # Ejecuta el comando de escaneo scan_command = ["clamscan", "-r", file_path] process = subprocess.Popen(scan_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) # Lee la salida y los errores usando communicate() scan_output, error_output = process.communicate() # Verificar el código de retorno del proceso if process.returncode != 0: socketio.emit('scan_output', {'data': f'Error en el escaneo: {error_output}'}) return # Si no hubo errores, emitir la salida del escaneo socketio.emit('scan_output', {'data': scan_output}) # Emitir mensaje de escaneo completo socketio.emit('scan_output', {'data': '--- Escaneo completado ---'}) # Almacenar los hashes, el tipo de archivo y el resultado del escaneo en la base de datos store_file_in_db(filename, hashes, file_type, scan_output) except Exception as e: socketio.emit('scan_output', {'data': f'Error: {str(e)}'}) finally: # Asegúrate de eliminar el archivo temporal después del escaneo if os.path.exists(file_path): os.remove(file_path) def format_scan_result(scan_result): # Formatear el resultado del escaneo para mejor legibilidad formatted_result = scan_result.replace("\n", "
") return formatted_result if __name__ == '__main__': socketio.run(app, host='0.0.0.0', port=5001, debug=os.getenv('FLASK_DEBUG', 'False').lower() == 'true')