cbs-web-antivirus-scanner/app.py

192 lines
6.7 KiB
Python
Raw Permalink Normal View History

2024-06-24 07:38:51 -06:00
from flask import Flask, render_template, request, jsonify
from flask_socketio import SocketIO, emit
2024-09-07 12:18:02 -06:00
from dotenv import load_dotenv
2024-06-24 07:38:51 -06:00
import subprocess
import uuid
import os
import requests
2024-09-07 12:18:02 -06:00
import hashlib
import mysql.connector
import filetype
2024-06-24 07:38:51 -06:00
app = Flask(__name__)
socketio = SocketIO(app)
2024-09-07 12:18:02 -06:00
def get_file_hashes(file_path):
hash_md5 = hashlib.md5()
hash_sha1 = hashlib.sha1()
hash_sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
hash_sha1.update(chunk)
hash_sha256.update(chunk)
return {
"md5": hash_md5.hexdigest(),
"sha1": hash_sha1.hexdigest(),
"sha256": hash_sha256.hexdigest()
}
def get_file_type(file_path):
kind = filetype.guess(file_path)
if kind is None:
return "Unknown"
return kind.mime
def check_file_in_db(hashes):
connection = mysql.connector.connect(
host=os.getenv('DB_HOST'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWD'),
database=os.getenv('DB_NAME'),
charset=os.getenv('DB_CHARSET'),
collation=os.getenv('DB_COALLITION')
)
cursor = connection.cursor()
query = "SELECT scan_result FROM file_scans WHERE md5_hash = %s OR sha1_hash = %s OR sha256_hash = %s"
cursor.execute(query, (hashes["md5"], hashes["sha1"], hashes["sha256"]))
result = cursor.fetchone()
cursor.close()
connection.close()
return result
def store_file_in_db(filename, hashes, file_type, scan_result):
connection = mysql.connector.connect(
host=os.getenv('DB_HOST'),
user=os.getenv('DB_USER'),
password=os.getenv('DB_PASSWD'),
database=os.getenv('DB_NAME'),
charset=os.getenv('DB_CHARSET'),
collation=os.getenv('DB_COALLITION')
)
cursor = connection.cursor()
query = """
INSERT INTO file_scans (filename, md5_hash, sha1_hash, sha256_hash, file_type, scan_result)
VALUES (%s, %s, %s, %s, %s, %s)
"""
cursor.execute(query, (filename, hashes["md5"], hashes["sha1"], hashes["sha256"], file_type, scan_result))
connection.commit()
cursor.close()
connection.close()
2024-06-24 07:38:51 -06:00
@app.route('/', methods=['GET', 'POST'])
def index():
return render_template('index.html')
@app.route('/upload', methods=['POST'])
def upload_file():
file = request.files.get('file')
if file:
file_path = os.path.join('/tmp', f"{uuid.uuid4()}_{file.filename}")
file.save(file_path)
2024-09-07 12:18:02 -06:00
# Obtener los hashes y el tipo de archivo
hashes = get_file_hashes(file_path)
file_type = get_file_type(file_path)
# Verificar si el archivo ya fue escaneado
existing_result = check_file_in_db(hashes)
if existing_result:
os.remove(file_path)
# Formatear el resultado del escaneo para que sea más legible
scan_result = format_scan_result(existing_result[0])
return jsonify({
'message': 'Este archivo ya lo he visto antes.',
'scan_result': scan_result
})
2024-06-24 07:38:51 -06:00
# Ejecuta el escaneo en un hilo separado para no bloquear la aplicación
2024-09-07 12:18:02 -06:00
socketio.start_background_task(target=scan_file, file_path=file_path, hashes=hashes, file_type=file_type, filename=file.filename)
return jsonify({'message': 'Archivo subido exitosamente: ' + file.filename})
return jsonify({'error': 'No se recibió ningún archivo'}), 400
2024-06-24 07:38:51 -06:00
@app.route('/scan_url', methods=['POST'])
def scan_url():
url = request.json.get('url')
if not url:
return jsonify({'error': 'No URL provided'}), 400
# Añadir esquema si falta
if not url.startswith(('http://', 'https://')):
url = 'http://' + url
try:
# Descargar el contenido de la URL a un archivo temporal
response = requests.get(url)
if response.status_code != 200:
return jsonify({'error': f'Error al descargar la URL: {response.status_code}'}), 400
2024-09-07 12:18:02 -06:00
2024-06-24 07:38:51 -06:00
file_path = os.path.join('/tmp', f"{uuid.uuid4()}_downloaded_content")
with open(file_path, 'wb') as temp_file:
temp_file.write(response.content)
2024-09-07 12:18:02 -06:00
# Obtener los hashes y el tipo de archivo
hashes = get_file_hashes(file_path)
file_type = get_file_type(file_path)
# Verificar si el archivo ya fue escaneado
existing_result = check_file_in_db(hashes)
if existing_result:
os.remove(file_path)
# Formatear el resultado del escaneo para que sea más legible
scan_result = format_scan_result(existing_result[0])
return jsonify({
'message': 'Este archivo ya lo he visto antes.',
'scan_result': scan_result
})
2024-06-24 07:38:51 -06:00
# Ejecutar el escaneo en un hilo separado para no bloquear la aplicación
2024-09-07 12:18:02 -06:00
socketio.start_background_task(target=scan_file, file_path=file_path, hashes=hashes, file_type=file_type, filename=filename)
2024-06-24 07:38:51 -06:00
2024-09-07 12:18:02 -06:00
return jsonify({'message': f'Archivo descargado y guardado como {file_path}. El escaneo está en progreso.'}), 200
2024-06-24 07:38:51 -06:00
except Exception as e:
return jsonify({'error': str(e)}), 500
2024-09-07 12:18:02 -06:00
def scan_file(file_path, hashes, file_type, filename):
2024-06-24 07:38:51 -06:00
try:
2024-09-07 12:18:02 -06:00
# Ejecuta el comando de escaneo
2024-06-24 07:38:51 -06:00
scan_command = ["clamscan", "-r", file_path]
process = subprocess.Popen(scan_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
2024-09-07 12:18:02 -06:00
# Lee la salida y los errores usando communicate()
scan_output, error_output = process.communicate()
# Verificar el código de retorno del proceso
if process.returncode != 0:
socketio.emit('scan_output', {'data': f'Error en el escaneo: {error_output}'})
return
# Si no hubo errores, emitir la salida del escaneo
socketio.emit('scan_output', {'data': scan_output})
2024-06-24 07:38:51 -06:00
2024-09-07 12:18:02 -06:00
# Emitir mensaje de escaneo completo
2024-06-24 07:38:51 -06:00
socketio.emit('scan_output', {'data': '--- Escaneo completado ---'})
2024-09-07 12:18:02 -06:00
# Almacenar los hashes, el tipo de archivo y el resultado del escaneo en la base de datos
store_file_in_db(filename, hashes, file_type, scan_output)
2024-06-24 07:38:51 -06:00
except Exception as e:
socketio.emit('scan_output', {'data': f'Error: {str(e)}'})
finally:
2024-09-07 12:18:02 -06:00
# Asegúrate de eliminar el archivo temporal después del escaneo
2024-06-24 07:38:51 -06:00
if os.path.exists(file_path):
os.remove(file_path)
2024-09-07 12:18:02 -06:00
def format_scan_result(scan_result):
# Formatear el resultado del escaneo para mejor legibilidad
formatted_result = scan_result.replace("\n", "<br>")
return formatted_result
2024-06-24 07:38:51 -06:00
if __name__ == '__main__':
2024-11-19 14:19:23 -06:00
socketio.run(app, host='0.0.0.0', port=5001, debug=os.getenv('FLASK_DEBUG', 'False').lower() == 'true')
2024-09-07 12:18:02 -06:00