diff --git a/app.py b/app.py
index af50022..80af9aa 100644
--- a/app.py
+++ b/app.py
@@ -8,184 +8,319 @@ import requests
import hashlib
import mysql.connector
import filetype
+from urllib.parse import urlparse
+from pathlib import Path
+import logging
+
+# Cargar variables de entorno al inicio
+load_dotenv()
app = Flask(__name__)
-socketio = SocketIO(app)
+socketio = SocketIO(app, cors_allowed_origins="*")
+
+# Configuración de logging
+logging.basicConfig(
+ level=logging.DEBUG,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+# Configuración de la base de datos
+DB_CONFIG = {
+ 'host': os.getenv('DB_HOST'),
+ 'user': os.getenv('DB_USER'),
+ 'password': os.getenv('DB_PASSWD'),
+ 'database': os.getenv('DB_NAME'),
+ 'charset': os.getenv('DB_CHARSET'),
+ 'collation': os.getenv('DB_COALLITION')
+}
+
+def get_db_connection():
+ """Establece y retorna una conexión a la base de datos."""
+ try:
+ return mysql.connector.connect(**DB_CONFIG)
+ except mysql.connector.Error as err:
+ logger.error(f"Error connecting to database: {err}")
+ raise
def get_file_hashes(file_path):
- hash_md5 = hashlib.md5()
- hash_sha1 = hashlib.sha1()
- hash_sha256 = hashlib.sha256()
+ """Calcula los hashes de un archivo."""
+ try:
+ hash_md5 = hashlib.md5()
+ hash_sha1 = hashlib.sha1()
+ hash_sha256 = hashlib.sha256()
- with open(file_path, "rb") as f:
- for chunk in iter(lambda: f.read(4096), b""):
- hash_md5.update(chunk)
- hash_sha1.update(chunk)
- hash_sha256.update(chunk)
+ with open(file_path, "rb") as f:
+ for chunk in iter(lambda: f.read(4096), b""):
+ hash_md5.update(chunk)
+ hash_sha1.update(chunk)
+ hash_sha256.update(chunk)
- return {
- "md5": hash_md5.hexdigest(),
- "sha1": hash_sha1.hexdigest(),
- "sha256": hash_sha256.hexdigest()
- }
+ return {
+ "md5": hash_md5.hexdigest(),
+ "sha1": hash_sha1.hexdigest(),
+ "sha256": hash_sha256.hexdigest()
+ }
+ except IOError as e:
+ logger.error(f"Error reading file for hashing: {e}")
+ raise
def get_file_type(file_path):
- kind = filetype.guess(file_path)
- if kind is None:
- return "Unknown"
- return kind.mime
+ """Determina el tipo MIME del archivo."""
+ try:
+ kind = filetype.guess(file_path)
+ return kind.mime if kind else "application/octet-stream"
+ except Exception as e:
+ logger.error(f"Error determining file type: {e}")
+ return "application/octet-stream"
def check_file_in_db(hashes):
- connection = mysql.connector.connect(
- host=os.getenv('DB_HOST'),
- user=os.getenv('DB_USER'),
- password=os.getenv('DB_PASSWD'),
- database=os.getenv('DB_NAME'),
- charset=os.getenv('DB_CHARSET'),
- collation=os.getenv('DB_COALLITION')
- )
- cursor = connection.cursor()
+ """Verifica si el archivo ya existe en la base de datos."""
+ if not all(hashes.values()):
+ return None
+
+ try:
+ connection = get_db_connection()
+ cursor = connection.cursor(dictionary=True)
+
+ query = """
+ SELECT filename, file_type, scan_result,
+ DATE_FORMAT(scan_date, '%Y-%m-%d %H:%i:%s') as scan_date,
+ md5_hash, sha1_hash, sha256_hash
+ FROM file_scans
+ WHERE md5_hash = %(md5)s
+ OR sha1_hash = %(sha1)s
+ OR sha256_hash = %(sha256)s
+ LIMIT 1
+ """
+ cursor.execute(query, {
+ 'md5': hashes['md5'],
+ 'sha1': hashes['sha1'],
+ 'sha256': hashes['sha256']
+ })
+
+ result = cursor.fetchone()
+ cursor.close()
+ connection.close()
- query = "SELECT scan_result FROM file_scans WHERE md5_hash = %s OR sha1_hash = %s OR sha256_hash = %s"
- cursor.execute(query, (hashes["md5"], hashes["sha1"], hashes["sha256"]))
- result = cursor.fetchone()
-
- cursor.close()
- connection.close()
-
- return result
+ return result
+
+ except mysql.connector.Error as err:
+ logger.error(f"Database error: {err}")
+ raise
def store_file_in_db(filename, hashes, file_type, scan_result):
- connection = mysql.connector.connect(
- host=os.getenv('DB_HOST'),
- user=os.getenv('DB_USER'),
- password=os.getenv('DB_PASSWD'),
- database=os.getenv('DB_NAME'),
- charset=os.getenv('DB_CHARSET'),
- collation=os.getenv('DB_COALLITION')
- )
- cursor = connection.cursor()
+ """Almacena los resultados del escaneo en la base de datos."""
+ if not all([filename, hashes, file_type, scan_result]):
+ raise ValueError("Missing required parameters for database storage")
+
+ try:
+ connection = get_db_connection()
+ cursor = connection.cursor()
+
+ query = """
+ INSERT INTO file_scans
+ (filename, md5_hash, sha1_hash, sha256_hash, file_type, scan_result)
+ VALUES (%(filename)s, %(md5)s, %(sha1)s, %(sha256)s, %(file_type)s, %(scan_result)s)
+ """
+
+ cursor.execute(query, {
+ 'filename': filename,
+ 'md5': hashes['md5'],
+ 'sha1': hashes['sha1'],
+ 'sha256': hashes['sha256'],
+ 'file_type': file_type,
+ 'scan_result': scan_result
+ })
+
+ connection.commit()
+ cursor.close()
+ connection.close()
+
+ except mysql.connector.Error as err:
+ logger.error(f"Error storing scan results: {err}")
+ raise
- query = """
- INSERT INTO file_scans (filename, md5_hash, sha1_hash, sha256_hash, file_type, scan_result)
- VALUES (%s, %s, %s, %s, %s, %s)
- """
- cursor.execute(query, (filename, hashes["md5"], hashes["sha1"], hashes["sha256"], file_type, scan_result))
- connection.commit()
-
- cursor.close()
- connection.close()
-
-@app.route('/', methods=['GET', 'POST'])
+@app.route('/')
def index():
return render_template('index.html')
@app.route('/upload', methods=['POST'])
def upload_file():
- file = request.files.get('file')
- if file:
- file_path = os.path.join('/tmp', f"{uuid.uuid4()}_{file.filename}")
- file.save(file_path)
+ """Maneja la subida y escaneo de archivos."""
+ try:
+ if 'file' not in request.files:
+ return jsonify({'error': 'No se recibió ningún archivo'}), 400
+
+ file = request.files['file']
+ if not file.filename:
+ return jsonify({'error': 'Nombre de archivo vacío'}), 400
- # Obtener los hashes y el tipo de archivo
- hashes = get_file_hashes(file_path)
- file_type = get_file_type(file_path)
+ # Guardar archivo temporalmente y calcular hashes
+ temp_path = Path('/tmp') / f"{uuid.uuid4()}_{file.filename}"
+ file.save(str(temp_path))
- # Verificar si el archivo ya fue escaneado
+ # Calcular hashes y tipo de archivo
+ hashes = get_file_hashes(str(temp_path))
+ file_type = get_file_type(str(temp_path))
+
+ # Verificar si existe en la base de datos
existing_result = check_file_in_db(hashes)
+
if existing_result:
- os.remove(file_path)
- # Formatear el resultado del escaneo para que sea más legible
- scan_result = format_scan_result(existing_result[0])
+ # Si existe, eliminar archivo temporal y devolver resultado existente
+ temp_path.unlink()
return jsonify({
- 'message': 'Este archivo ya lo he visto antes.',
- 'scan_result': scan_result
+ 'message': 'Yo a este lo conozco',
+ 'result': {
+ 'filename': existing_result['filename'],
+ 'file_type': existing_result['file_type'],
+ 'scan_date': existing_result['scan_date'],
+ 'scan_result': existing_result['scan_result'],
+ 'hashes': {
+ 'md5': existing_result['md5_hash'],
+ 'sha1': existing_result['sha1_hash'],
+ 'sha256': existing_result['sha256_hash']
+ }
+ }
})
- # Ejecuta el escaneo en un hilo separado para no bloquear la aplicación
- socketio.start_background_task(target=scan_file, file_path=file_path, hashes=hashes, file_type=file_type, filename=file.filename)
+ # Si no existe, iniciar escaneo
+ socketio.start_background_task(
+ target=scan_file,
+ file_path=str(temp_path),
+ hashes=hashes,
+ file_type=file_type,
+ filename=file.filename
+ )
- return jsonify({'message': 'Archivo subido exitosamente: ' + file.filename})
- return jsonify({'error': 'No se recibió ningún archivo'}), 400
+ return jsonify({
+ 'message': f'Archivo {file.filename} subido exitosamente. Iniciando escaneo...',
+ 'hashes': hashes
+ })
+
+ except Exception as e:
+ logger.error(f"Error in upload_file: {e}")
+ return jsonify({'error': str(e)}), 500
@app.route('/scan_url', methods=['POST'])
def scan_url():
- url = request.json.get('url')
- if not url:
- return jsonify({'error': 'No URL provided'}), 400
-
- # Añadir esquema si falta
- if not url.startswith(('http://', 'https://')):
- url = 'http://' + url
-
+ """Maneja el escaneo de archivos desde URLs."""
try:
- # Descargar el contenido de la URL a un archivo temporal
- response = requests.get(url)
- if response.status_code != 200:
- return jsonify({'error': f'Error al descargar la URL: {response.status_code}'}), 400
+ url = request.json.get('url')
+ if not url:
+ return jsonify({'error': 'URL no proporcionada'}), 400
- file_path = os.path.join('/tmp', f"{uuid.uuid4()}_downloaded_content")
- with open(file_path, 'wb') as temp_file:
- temp_file.write(response.content)
+ # Validar y normalizar URL
+ parsed_url = urlparse(url if '://' in url else f'http://{url}')
+ if not all([parsed_url.scheme, parsed_url.netloc]):
+ return jsonify({'error': 'URL inválida'}), 400
- # Obtener los hashes y el tipo de archivo
- hashes = get_file_hashes(file_path)
- file_type = get_file_type(file_path)
+ # Descargar archivo
+ try:
+ response = requests.get(parsed_url.geturl(), timeout=30)
+ response.raise_for_status()
+ except requests.exceptions.RequestException as e:
+ return jsonify({'error': f'Error al descargar la URL: {str(e)}'}), 400
+
+ # Crear archivo temporal
+ file_path = Path('/tmp') / f"{uuid.uuid4()}_downloaded_content"
+ file_path.write_bytes(response.content)
+
+ try:
+ # Calcular hashes y tipo de archivo
+ hashes = get_file_hashes(str(file_path))
+ file_type = get_file_type(str(file_path))
+
+ # Obtener nombre del archivo de la URL o usar uno genérico
+ filename = Path(parsed_url.path).name or "downloaded_file"
+
+ # Verificar si existe en la base de datos
+ existing_result = check_file_in_db(hashes)
+
+ if existing_result:
+ # Si existe, eliminar archivo temporal y devolver resultado existente
+ file_path.unlink()
+ return jsonify({
+ 'message': 'Yo a este lo conozco',
+ 'result': {
+ 'filename': existing_result['filename'],
+ 'file_type': existing_result['file_type'],
+ 'scan_date': existing_result['scan_date'],
+ 'scan_result': existing_result['scan_result'],
+ 'hashes': {
+ 'md5': existing_result['md5_hash'],
+ 'sha1': existing_result['sha1_hash'],
+ 'sha256': existing_result['sha256_hash']
+ }
+ }
+ })
+
+ # Si no existe, iniciar escaneo
+ socketio.start_background_task(
+ target=scan_file,
+ file_path=str(file_path),
+ hashes=hashes,
+ file_type=file_type,
+ filename=filename
+ )
- # Verificar si el archivo ya fue escaneado
- existing_result = check_file_in_db(hashes)
- if existing_result:
- os.remove(file_path)
- # Formatear el resultado del escaneo para que sea más legible
- scan_result = format_scan_result(existing_result[0])
return jsonify({
- 'message': 'Este archivo ya lo he visto antes.',
- 'scan_result': scan_result
+ 'message': 'URL descargada. Iniciando análisis...',
+ 'hashes': hashes
})
- # Ejecutar el escaneo en un hilo separado para no bloquear la aplicación
- socketio.start_background_task(target=scan_file, file_path=file_path, hashes=hashes, file_type=file_type, filename=filename)
+ except Exception as e:
+ # Asegurar limpieza del archivo temporal en caso de error
+ if file_path.exists():
+ file_path.unlink()
+ raise
- return jsonify({'message': f'Archivo descargado y guardado como {file_path}. El escaneo está en progreso.'}), 200
except Exception as e:
- return jsonify({'error': str(e)}), 500
+ logger.error(f"Error in scan_url: {e}")
+ return jsonify({'error': 'Error interno del servidor'}), 500
def scan_file(file_path, hashes, file_type, filename):
+ """Ejecuta el escaneo del archivo."""
try:
- # Ejecuta el comando de escaneo
+ # Ejecutar ClamAV
scan_command = ["clamscan", "-r", file_path]
- process = subprocess.Popen(scan_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
+ process = subprocess.run(
+ scan_command,
+ capture_output=True,
+ text=True,
+ check=True,
+ timeout=300
+ )
- # Lee la salida y los errores usando communicate()
- scan_output, error_output = process.communicate()
+ # Procesar y emitir resultado línea por línea
+ scan_output = process.stdout
+ for line in scan_output.split('\n'):
+ if line.strip():
+ socketio.emit('scan_output', {'data': line + '\n'})
- # Verificar el código de retorno del proceso
- if process.returncode != 0:
- socketio.emit('scan_output', {'data': f'Error en el escaneo: {error_output}'})
- return
-
- # Si no hubo errores, emitir la salida del escaneo
- socketio.emit('scan_output', {'data': scan_output})
-
- # Emitir mensaje de escaneo completo
- socketio.emit('scan_output', {'data': '--- Escaneo completado ---'})
-
- # Almacenar los hashes, el tipo de archivo y el resultado del escaneo en la base de datos
+ # Almacenar resultado en la base de datos
store_file_in_db(filename, hashes, file_type, scan_output)
- except Exception as e:
- socketio.emit('scan_output', {'data': f'Error: {str(e)}'})
- finally:
- # Asegúrate de eliminar el archivo temporal después del escaneo
- if os.path.exists(file_path):
- os.remove(file_path)
+ # Emitir mensaje de finalización
+ socketio.emit('scan_output', {'data': '\n--- Escaneo completado ---\n'})
-def format_scan_result(scan_result):
- # Formatear el resultado del escaneo para mejor legibilidad
- formatted_result = scan_result.replace("\n", "
")
- return formatted_result
+ except subprocess.TimeoutExpired:
+ socketio.emit('scan_output', {'data': 'Error: El escaneo excedió el tiempo límite\n'})
+ except subprocess.CalledProcessError as e:
+ socketio.emit('scan_output', {'data': f'Error en el escaneo: {e.stderr}\n'})
+ except Exception as e:
+ socketio.emit('scan_output', {'data': f'Error: {str(e)}\n'})
+ finally:
+ # Limpiar archivo temporal
+ try:
+ Path(file_path).unlink(missing_ok=True)
+ except Exception as e:
+ logger.error(f"Error removing temporary file: {e}")
if __name__ == '__main__':
- socketio.run(app, host='0.0.0.0', port=5001, debug=os.getenv('FLASK_DEBUG', 'False').lower() == 'true')
-
-
+ # Asegurarse de que el directorio temporal existe
+ Path('/tmp').mkdir(exist_ok=True)
+
+ # Iniciar la aplicación
+ socketio.run(app, debug=os.getenv('FLASK_DEBUG', 'False').lower() == 'true')
diff --git a/templates/index.html b/templates/index.html
index bc27a7e..b3f7072 100644
--- a/templates/index.html
+++ b/templates/index.html
@@ -1,371 +1,599 @@
-
+