cbs-web-antivirus-scanner/app.py

77 lines
2.6 KiB
Python
Raw Normal View History

2024-06-24 07:38:51 -06:00
from flask import Flask, render_template, request, jsonify
from flask_socketio import SocketIO, emit
import subprocess
import uuid
import os
import requests
app = Flask(__name__)
socketio = SocketIO(app)
@app.route('/', methods=['GET', 'POST'])
def index():
return render_template('index.html')
@app.route('/upload', methods=['POST'])
def upload_file():
file = request.files.get('file')
if file:
file_path = os.path.join('/tmp', f"{uuid.uuid4()}_{file.filename}")
file.save(file_path)
# Ejecuta el escaneo en un hilo separado para no bloquear la aplicación
socketio.start_background_task(target=scan_file, file_path=file_path)
return 'Archivo subido exitosamente: ' + file.filename
return 'No se recibió ningún archivo', 400
@app.route('/scan_url', methods=['POST'])
def scan_url():
url = request.json.get('url')
if not url:
return jsonify({'error': 'No URL provided'}), 400
# Añadir esquema si falta
if not url.startswith(('http://', 'https://')):
url = 'http://' + url
try:
# Descargar el contenido de la URL a un archivo temporal
response = requests.get(url)
if response.status_code != 200:
return jsonify({'error': f'Error al descargar la URL: {response.status_code}'}), 400
file_path = os.path.join('/tmp', f"{uuid.uuid4()}_downloaded_content")
with open(file_path, 'wb') as temp_file:
temp_file.write(response.content)
# Ejecutar el escaneo en un hilo separado para no bloquear la aplicación
socketio.start_background_task(target=scan_file, file_path=file_path)
return jsonify({'message': f'Archivo descargado y guardado como {file_path}'}), 200
except Exception as e:
return jsonify({'error': str(e)}), 500
def scan_file(file_path):
try:
scan_command = ["clamscan", "-r", file_path]
process = subprocess.Popen(scan_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
for line in iter(process.stdout.readline, ''):
if line.strip():
socketio.emit('scan_output', {'data': line.strip()})
socketio.emit('scan_output', {'data': '--- Escaneo completado ---'})
process.stdout.close()
process.stderr.close()
process.wait()
except Exception as e:
socketio.emit('scan_output', {'data': f'Error: {str(e)}'})
finally:
if os.path.exists(file_path):
os.remove(file_path)
if __name__ == '__main__':
socketio.run(app, debug=True)