From e5be0ff9593212f866d0d20e4da76fd867507592 Mon Sep 17 00:00:00 2001 From: Kevin Munoz Date: Tue, 19 Nov 2024 16:05:21 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20Actualizaci=C3=B3n=20mayor=20a=20versi?= =?UTF-8?q?=C3=B3n=202.0.0=20-=20Nueva=20interfaz=20cibern=C3=A9tica=20con?= =?UTF-8?q?=20efectos=20visuales=20-=20Sistema=20de=20escritura=20tipo=20t?= =?UTF-8?q?erminal=20-=20Panel=20de=20estad=C3=ADsticas=20en=20tiempo=20re?= =?UTF-8?q?al=20-=20Mejor=20manejo=20de=20amenazas=20-=20Efectos=20visuale?= =?UTF-8?q?s=20y=20animaciones=20mejoradas=20-=20Optimizaci=C3=B3n=20del?= =?UTF-8?q?=20sistema=20de=20escaneo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 397 ++++++++++++------- templates/index.html | 888 +++++++++++++++++++++++++++---------------- 2 files changed, 824 insertions(+), 461 deletions(-) diff --git a/app.py b/app.py index af50022..80af9aa 100644 --- a/app.py +++ b/app.py @@ -8,184 +8,319 @@ import requests import hashlib import mysql.connector import filetype +from urllib.parse import urlparse +from pathlib import Path +import logging + +# Cargar variables de entorno al inicio +load_dotenv() app = Flask(__name__) -socketio = SocketIO(app) +socketio = SocketIO(app, cors_allowed_origins="*") + +# Configuración de logging +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# Configuración de la base de datos +DB_CONFIG = { + 'host': os.getenv('DB_HOST'), + 'user': os.getenv('DB_USER'), + 'password': os.getenv('DB_PASSWD'), + 'database': os.getenv('DB_NAME'), + 'charset': os.getenv('DB_CHARSET'), + 'collation': os.getenv('DB_COALLITION') +} + +def get_db_connection(): + """Establece y retorna una conexión a la base de datos.""" + try: + return mysql.connector.connect(**DB_CONFIG) + except mysql.connector.Error as err: + logger.error(f"Error connecting to database: {err}") + raise def get_file_hashes(file_path): - hash_md5 = hashlib.md5() - hash_sha1 = hashlib.sha1() - hash_sha256 = hashlib.sha256() + """Calcula los hashes de un archivo.""" + try: + hash_md5 = hashlib.md5() + hash_sha1 = hashlib.sha1() + hash_sha256 = hashlib.sha256() - with open(file_path, "rb") as f: - for chunk in iter(lambda: f.read(4096), b""): - hash_md5.update(chunk) - hash_sha1.update(chunk) - hash_sha256.update(chunk) + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_md5.update(chunk) + hash_sha1.update(chunk) + hash_sha256.update(chunk) - return { - "md5": hash_md5.hexdigest(), - "sha1": hash_sha1.hexdigest(), - "sha256": hash_sha256.hexdigest() - } + return { + "md5": hash_md5.hexdigest(), + "sha1": hash_sha1.hexdigest(), + "sha256": hash_sha256.hexdigest() + } + except IOError as e: + logger.error(f"Error reading file for hashing: {e}") + raise def get_file_type(file_path): - kind = filetype.guess(file_path) - if kind is None: - return "Unknown" - return kind.mime + """Determina el tipo MIME del archivo.""" + try: + kind = filetype.guess(file_path) + return kind.mime if kind else "application/octet-stream" + except Exception as e: + logger.error(f"Error determining file type: {e}") + return "application/octet-stream" def check_file_in_db(hashes): - connection = mysql.connector.connect( - host=os.getenv('DB_HOST'), - user=os.getenv('DB_USER'), - password=os.getenv('DB_PASSWD'), - database=os.getenv('DB_NAME'), - charset=os.getenv('DB_CHARSET'), - collation=os.getenv('DB_COALLITION') - ) - cursor = connection.cursor() + """Verifica si el archivo ya existe en la base de datos.""" + if not all(hashes.values()): + return None + + try: + connection = get_db_connection() + cursor = connection.cursor(dictionary=True) + + query = """ + SELECT filename, file_type, scan_result, + DATE_FORMAT(scan_date, '%Y-%m-%d %H:%i:%s') as scan_date, + md5_hash, sha1_hash, sha256_hash + FROM file_scans + WHERE md5_hash = %(md5)s + OR sha1_hash = %(sha1)s + OR sha256_hash = %(sha256)s + LIMIT 1 + """ + cursor.execute(query, { + 'md5': hashes['md5'], + 'sha1': hashes['sha1'], + 'sha256': hashes['sha256'] + }) + + result = cursor.fetchone() + cursor.close() + connection.close() - query = "SELECT scan_result FROM file_scans WHERE md5_hash = %s OR sha1_hash = %s OR sha256_hash = %s" - cursor.execute(query, (hashes["md5"], hashes["sha1"], hashes["sha256"])) - result = cursor.fetchone() - - cursor.close() - connection.close() - - return result + return result + + except mysql.connector.Error as err: + logger.error(f"Database error: {err}") + raise def store_file_in_db(filename, hashes, file_type, scan_result): - connection = mysql.connector.connect( - host=os.getenv('DB_HOST'), - user=os.getenv('DB_USER'), - password=os.getenv('DB_PASSWD'), - database=os.getenv('DB_NAME'), - charset=os.getenv('DB_CHARSET'), - collation=os.getenv('DB_COALLITION') - ) - cursor = connection.cursor() + """Almacena los resultados del escaneo en la base de datos.""" + if not all([filename, hashes, file_type, scan_result]): + raise ValueError("Missing required parameters for database storage") + + try: + connection = get_db_connection() + cursor = connection.cursor() + + query = """ + INSERT INTO file_scans + (filename, md5_hash, sha1_hash, sha256_hash, file_type, scan_result) + VALUES (%(filename)s, %(md5)s, %(sha1)s, %(sha256)s, %(file_type)s, %(scan_result)s) + """ + + cursor.execute(query, { + 'filename': filename, + 'md5': hashes['md5'], + 'sha1': hashes['sha1'], + 'sha256': hashes['sha256'], + 'file_type': file_type, + 'scan_result': scan_result + }) + + connection.commit() + cursor.close() + connection.close() + + except mysql.connector.Error as err: + logger.error(f"Error storing scan results: {err}") + raise - query = """ - INSERT INTO file_scans (filename, md5_hash, sha1_hash, sha256_hash, file_type, scan_result) - VALUES (%s, %s, %s, %s, %s, %s) - """ - cursor.execute(query, (filename, hashes["md5"], hashes["sha1"], hashes["sha256"], file_type, scan_result)) - connection.commit() - - cursor.close() - connection.close() - -@app.route('/', methods=['GET', 'POST']) +@app.route('/') def index(): return render_template('index.html') @app.route('/upload', methods=['POST']) def upload_file(): - file = request.files.get('file') - if file: - file_path = os.path.join('/tmp', f"{uuid.uuid4()}_{file.filename}") - file.save(file_path) + """Maneja la subida y escaneo de archivos.""" + try: + if 'file' not in request.files: + return jsonify({'error': 'No se recibió ningún archivo'}), 400 + + file = request.files['file'] + if not file.filename: + return jsonify({'error': 'Nombre de archivo vacío'}), 400 - # Obtener los hashes y el tipo de archivo - hashes = get_file_hashes(file_path) - file_type = get_file_type(file_path) + # Guardar archivo temporalmente y calcular hashes + temp_path = Path('/tmp') / f"{uuid.uuid4()}_{file.filename}" + file.save(str(temp_path)) - # Verificar si el archivo ya fue escaneado + # Calcular hashes y tipo de archivo + hashes = get_file_hashes(str(temp_path)) + file_type = get_file_type(str(temp_path)) + + # Verificar si existe en la base de datos existing_result = check_file_in_db(hashes) + if existing_result: - os.remove(file_path) - # Formatear el resultado del escaneo para que sea más legible - scan_result = format_scan_result(existing_result[0]) + # Si existe, eliminar archivo temporal y devolver resultado existente + temp_path.unlink() return jsonify({ - 'message': 'Este archivo ya lo he visto antes.', - 'scan_result': scan_result + 'message': 'Yo a este lo conozco', + 'result': { + 'filename': existing_result['filename'], + 'file_type': existing_result['file_type'], + 'scan_date': existing_result['scan_date'], + 'scan_result': existing_result['scan_result'], + 'hashes': { + 'md5': existing_result['md5_hash'], + 'sha1': existing_result['sha1_hash'], + 'sha256': existing_result['sha256_hash'] + } + } }) - # Ejecuta el escaneo en un hilo separado para no bloquear la aplicación - socketio.start_background_task(target=scan_file, file_path=file_path, hashes=hashes, file_type=file_type, filename=file.filename) + # Si no existe, iniciar escaneo + socketio.start_background_task( + target=scan_file, + file_path=str(temp_path), + hashes=hashes, + file_type=file_type, + filename=file.filename + ) - return jsonify({'message': 'Archivo subido exitosamente: ' + file.filename}) - return jsonify({'error': 'No se recibió ningún archivo'}), 400 + return jsonify({ + 'message': f'Archivo {file.filename} subido exitosamente. Iniciando escaneo...', + 'hashes': hashes + }) + + except Exception as e: + logger.error(f"Error in upload_file: {e}") + return jsonify({'error': str(e)}), 500 @app.route('/scan_url', methods=['POST']) def scan_url(): - url = request.json.get('url') - if not url: - return jsonify({'error': 'No URL provided'}), 400 - - # Añadir esquema si falta - if not url.startswith(('http://', 'https://')): - url = 'http://' + url - + """Maneja el escaneo de archivos desde URLs.""" try: - # Descargar el contenido de la URL a un archivo temporal - response = requests.get(url) - if response.status_code != 200: - return jsonify({'error': f'Error al descargar la URL: {response.status_code}'}), 400 + url = request.json.get('url') + if not url: + return jsonify({'error': 'URL no proporcionada'}), 400 - file_path = os.path.join('/tmp', f"{uuid.uuid4()}_downloaded_content") - with open(file_path, 'wb') as temp_file: - temp_file.write(response.content) + # Validar y normalizar URL + parsed_url = urlparse(url if '://' in url else f'http://{url}') + if not all([parsed_url.scheme, parsed_url.netloc]): + return jsonify({'error': 'URL inválida'}), 400 - # Obtener los hashes y el tipo de archivo - hashes = get_file_hashes(file_path) - file_type = get_file_type(file_path) + # Descargar archivo + try: + response = requests.get(parsed_url.geturl(), timeout=30) + response.raise_for_status() + except requests.exceptions.RequestException as e: + return jsonify({'error': f'Error al descargar la URL: {str(e)}'}), 400 + + # Crear archivo temporal + file_path = Path('/tmp') / f"{uuid.uuid4()}_downloaded_content" + file_path.write_bytes(response.content) + + try: + # Calcular hashes y tipo de archivo + hashes = get_file_hashes(str(file_path)) + file_type = get_file_type(str(file_path)) + + # Obtener nombre del archivo de la URL o usar uno genérico + filename = Path(parsed_url.path).name or "downloaded_file" + + # Verificar si existe en la base de datos + existing_result = check_file_in_db(hashes) + + if existing_result: + # Si existe, eliminar archivo temporal y devolver resultado existente + file_path.unlink() + return jsonify({ + 'message': 'Yo a este lo conozco', + 'result': { + 'filename': existing_result['filename'], + 'file_type': existing_result['file_type'], + 'scan_date': existing_result['scan_date'], + 'scan_result': existing_result['scan_result'], + 'hashes': { + 'md5': existing_result['md5_hash'], + 'sha1': existing_result['sha1_hash'], + 'sha256': existing_result['sha256_hash'] + } + } + }) + + # Si no existe, iniciar escaneo + socketio.start_background_task( + target=scan_file, + file_path=str(file_path), + hashes=hashes, + file_type=file_type, + filename=filename + ) - # Verificar si el archivo ya fue escaneado - existing_result = check_file_in_db(hashes) - if existing_result: - os.remove(file_path) - # Formatear el resultado del escaneo para que sea más legible - scan_result = format_scan_result(existing_result[0]) return jsonify({ - 'message': 'Este archivo ya lo he visto antes.', - 'scan_result': scan_result + 'message': 'URL descargada. Iniciando análisis...', + 'hashes': hashes }) - # Ejecutar el escaneo en un hilo separado para no bloquear la aplicación - socketio.start_background_task(target=scan_file, file_path=file_path, hashes=hashes, file_type=file_type, filename=filename) + except Exception as e: + # Asegurar limpieza del archivo temporal en caso de error + if file_path.exists(): + file_path.unlink() + raise - return jsonify({'message': f'Archivo descargado y guardado como {file_path}. El escaneo está en progreso.'}), 200 except Exception as e: - return jsonify({'error': str(e)}), 500 + logger.error(f"Error in scan_url: {e}") + return jsonify({'error': 'Error interno del servidor'}), 500 def scan_file(file_path, hashes, file_type, filename): + """Ejecuta el escaneo del archivo.""" try: - # Ejecuta el comando de escaneo + # Ejecutar ClamAV scan_command = ["clamscan", "-r", file_path] - process = subprocess.Popen(scan_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) + process = subprocess.run( + scan_command, + capture_output=True, + text=True, + check=True, + timeout=300 + ) - # Lee la salida y los errores usando communicate() - scan_output, error_output = process.communicate() + # Procesar y emitir resultado línea por línea + scan_output = process.stdout + for line in scan_output.split('\n'): + if line.strip(): + socketio.emit('scan_output', {'data': line + '\n'}) - # Verificar el código de retorno del proceso - if process.returncode != 0: - socketio.emit('scan_output', {'data': f'Error en el escaneo: {error_output}'}) - return - - # Si no hubo errores, emitir la salida del escaneo - socketio.emit('scan_output', {'data': scan_output}) - - # Emitir mensaje de escaneo completo - socketio.emit('scan_output', {'data': '--- Escaneo completado ---'}) - - # Almacenar los hashes, el tipo de archivo y el resultado del escaneo en la base de datos + # Almacenar resultado en la base de datos store_file_in_db(filename, hashes, file_type, scan_output) - except Exception as e: - socketio.emit('scan_output', {'data': f'Error: {str(e)}'}) - finally: - # Asegúrate de eliminar el archivo temporal después del escaneo - if os.path.exists(file_path): - os.remove(file_path) + # Emitir mensaje de finalización + socketio.emit('scan_output', {'data': '\n--- Escaneo completado ---\n'}) -def format_scan_result(scan_result): - # Formatear el resultado del escaneo para mejor legibilidad - formatted_result = scan_result.replace("\n", "
") - return formatted_result + except subprocess.TimeoutExpired: + socketio.emit('scan_output', {'data': 'Error: El escaneo excedió el tiempo límite\n'}) + except subprocess.CalledProcessError as e: + socketio.emit('scan_output', {'data': f'Error en el escaneo: {e.stderr}\n'}) + except Exception as e: + socketio.emit('scan_output', {'data': f'Error: {str(e)}\n'}) + finally: + # Limpiar archivo temporal + try: + Path(file_path).unlink(missing_ok=True) + except Exception as e: + logger.error(f"Error removing temporary file: {e}") if __name__ == '__main__': - socketio.run(app, host='0.0.0.0', port=5001, debug=os.getenv('FLASK_DEBUG', 'False').lower() == 'true') - - + # Asegurarse de que el directorio temporal existe + Path('/tmp').mkdir(exist_ok=True) + + # Iniciar la aplicación + socketio.run(app, debug=os.getenv('FLASK_DEBUG', 'False').lower() == 'true') diff --git a/templates/index.html b/templates/index.html index bc27a7e..b3f7072 100644 --- a/templates/index.html +++ b/templates/index.html @@ -1,371 +1,599 @@ - + - Condor Business Solutions SecureScan - - - + + CBS CyberScan | Terminal Web de Escaneo + +
-
-

Condor Business Solutions CyberScan|Terminal Web de Escaneo

- - - - - -
-