bots3/bots3.py
2025-04-23 14:45:43 -05:00

487 lines
21 KiB
Python

#!/usr/bin/env python
import telebot
import boto3
from dotenv import load_dotenv
from telebot import types
import os
from datetime import datetime, timedelta
import logging
# Configurar logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Para debugging, descomenta esta línea
# logger.setLevel(logging.DEBUG)
load_dotenv()
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
MINIO_ENDPOINT = os.getenv('MINIO_ENDPOINT')
MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY')
MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY')
# Obtener usuarios autorizados desde variables de entorno
AUTHORIZED_USER_IDS_STR = os.getenv('AUTHORIZED_USER_IDS', '')
AUTHORIZED_USERNAMES_STR = os.getenv('AUTHORIZED_USERNAMES', '')
# Convertir string de IDs a lista de enteros
AUTHORIZED_USER_IDS = []
if AUTHORIZED_USER_IDS_STR:
try:
AUTHORIZED_USER_IDS = [int(user_id.strip()) for user_id in AUTHORIZED_USER_IDS_STR.split(',') if user_id.strip()]
logger.info(f"Cargados {len(AUTHORIZED_USER_IDS)} IDs de usuario autorizados desde variables de entorno")
except ValueError as e:
logger.error(f"Error al convertir IDs de usuario: {e}. Asegúrate de que sean valores numéricos separados por comas.")
# Convertir string de nombres de usuario a lista
AUTHORIZED_USERNAMES = []
if AUTHORIZED_USERNAMES_STR:
AUTHORIZED_USERNAMES = [username.strip() for username in AUTHORIZED_USERNAMES_STR.split(',') if username.strip()]
logger.info(f"Cargados {len(AUTHORIZED_USERNAMES)} nombres de usuario autorizados desde variables de entorno")
# Si no hay usuarios autorizados definidos, mostrar advertencia
if not AUTHORIZED_USER_IDS and not AUTHORIZED_USERNAMES:
logger.warning("⚠️ No se han definido usuarios autorizados. El bot rechazará todas las solicitudes.")
# Importante: Configuración de comandos para el bot
bot = telebot.TeleBot(TELEGRAM_BOT_TOKEN, parse_mode='MARKDOWN')
# Registrar comandos en BotFather
bot.set_my_commands([
telebot.types.BotCommand("start", "Iniciar el bot y mostrar menú principal"),
telebot.types.BotCommand("upload", "Subir un archivo a un bucket"),
telebot.types.BotCommand("list", "Listar archivos en un bucket"),
telebot.types.BotCommand("help", "Mostrar ayuda")
])
BUCKETS = [
('bancoppel', 'Bancoppel'),
('cajadrarroyo', 'Caja Dr.Arroyo'),
('cajapopulardolores', 'Caja Popular Dolores'),
('cajasolidaria', 'Caja Solidaria'),
('condorbs-oss', 'Condorsbs-oss'),
('contingencia', 'Bucket de contingencia'),
('cooperativa', 'Cooperativa'),
('comercializado', 'Comercializado'),
('csguachinango', 'Caja Solidaria Guachinango'),
('cssmh', 'Caja Solidaria San Miguel Huimilpan'),
('financieratamazula', 'Financiera Tamazula'),
('imperialcc', 'Imperialcc'),
('lenocirochin', 'Lenocirochin'),
('mario', 'Mario bro'),
('mizuho', 'Mizuho'),
('mufg', 'Mufg'),
('serfimex', 'Serfimex'),
('tepeyac', 'Tepayac'),
('test', 'Test'),
('testb', 'Testb'),
('tiendapago', 'tiendapago'),
('walmart', 'Walmart'),
]
# Diccionarios para rastrear selecciones y estados de usuario
selected_buckets = {}
user_states = {} # Para rastrear en qué operación está cada usuario
def is_user_authorized(user):
"""Verifica si el usuario está autorizado para usar el bot"""
# Si recibimos un message o un call, extraer el from_user
if hasattr(user, 'from_user'):
user = user.from_user
# Ahora user debería ser directamente el objeto from_user
user_id = user.id
username = user.username
# Comprobar si el ID de usuario está en la lista de autorizados
if user_id in AUTHORIZED_USER_IDS:
logger.debug(f"Usuario {user_id} autorizado por ID")
return True
# Comprobar si el nombre de usuario está en la lista de autorizados
if username and username.lower() in [name.lower() for name in AUTHORIZED_USERNAMES]:
logger.debug(f"Usuario @{username} autorizado por nombre de usuario")
return True
logger.warning(f"Acceso denegado para: ID={user_id}, Username=@{username or 'None'}")
return False
def send_unauthorized_message(message):
"""Envía un mensaje al usuario no autorizado"""
user_info = f"ID: {message.from_user.id}, Username: @{message.from_user.username or 'None'}"
logger.warning(f"Intento de acceso no autorizado: {user_info}")
bot.reply_to(message, "⛔ No estás autorizado para usar este bot. Contacta al administrador si crees que esto es un error.")
@bot.message_handler(commands=['start'])
def start(message):
if not is_user_authorized(message):
send_unauthorized_message(message)
return
logger.info(f"Comando /start recibido de usuario {message.from_user.id} (@{message.from_user.username or 'None'})")
bot.reply_to(message, """
**Hola! Este bot te permite interactuar con tu servidor MinIO S3.**
**¿Qué deseas hacer?**
""", reply_markup=generar_teclado_acciones())
@bot.message_handler(commands=['help'])
def help_message(message):
if not is_user_authorized(message):
send_unauthorized_message(message)
return
logger.info(f"Comando /help recibido de usuario {message.from_user.id}")
bot.reply_to(message, """
**Bienvenido a la ayuda de este bot!**
Comandos disponibles:
- /start - Comenzar a usar el bot
- /upload - Subir un archivo
- /list - Listar archivos en un bucket
- /help - Mostrar este mensaje de ayuda
Para subir un archivo:
- Usa /upload y selecciona un bucket, luego envía el archivo.
Para obtener un enlace de descarga:
- Usa /list para ver los archivos disponibles y selecciona uno.
""")
@bot.message_handler(commands=['upload'])
def upload_command(message):
if not is_user_authorized(message):
send_unauthorized_message(message)
return
logger.info(f"Comando /upload recibido de usuario {message.from_user.id}")
chat_id = message.chat.id
user_states[chat_id] = 'selecting_bucket_for_upload'
bot.send_message(chat_id, "**Selecciona un bucket para subir tu archivo:**", reply_markup=generar_teclado_buckets('upload'))
@bot.message_handler(commands=['list'])
def list_command(message):
if not is_user_authorized(message):
send_unauthorized_message(message)
return
logger.info(f"Comando /list recibido de usuario {message.from_user.id}")
chat_id = message.chat.id
user_states[chat_id] = 'selecting_bucket_for_list'
bot.send_message(chat_id, "**Selecciona un bucket para ver sus archivos:**", reply_markup=generar_teclado_buckets('list'))
@bot.callback_query_handler(func=lambda call: call.data.startswith(('upload_', 'list_', 'download_', 'action_')))
def handle_callback(call):
chat_id = call.message.chat.id
# Verificar que el usuario esté autorizado usando call.from_user
if not is_user_authorized(call.from_user):
bot.answer_callback_query(callback_query_id=call.id, text="No estás autorizado para usar este bot", show_alert=True)
return
logger.debug(f"Callback recibido: {call.data} de usuario {call.from_user.id}")
# Manejar selección de acción
if call.data.startswith('action_'):
action = call.data.split('_')[1]
if action == 'upload':
user_states[chat_id] = 'selecting_bucket_for_upload'
bot.edit_message_text("**Selecciona un bucket para subir tu archivo:**",
chat_id=chat_id,
message_id=call.message.message_id,
reply_markup=generar_teclado_buckets('upload'))
elif action == 'list':
user_states[chat_id] = 'selecting_bucket_for_list'
bot.edit_message_text("**Selecciona un bucket para ver sus archivos:**",
chat_id=chat_id,
message_id=call.message.message_id,
reply_markup=generar_teclado_buckets('list'))
elif action == 'start':
# Manejar el regreso al menú principal
user_states[chat_id] = None
bot.edit_message_text("**Hola! Este bot te permite interactuar con tu servidor MinIO S3.**\n\n**¿Qué deseas hacer?**",
chat_id=chat_id,
message_id=call.message.message_id,
reply_markup=generar_teclado_acciones())
bot.answer_callback_query(callback_query_id=call.id, text=f'Seleccionado: {action}')
return
# Manejar selección de bucket para subir archivo
if call.data.startswith('upload_'):
bucket_name = call.data[7:] # Quitar 'upload_' del principio
selected_buckets[chat_id] = bucket_name
user_states[chat_id] = 'waiting_for_file'
bot.answer_callback_query(callback_query_id=call.id, text=f'Bucket seleccionado: {bucket_name}')
bot.send_message(chat_id, f"Bucket seleccionado: **{bucket_name}**\n\nAhora envíame el archivo que quieres almacenar.")
# Manejar selección de bucket para listar archivos
elif call.data.startswith('list_'):
bucket_name = call.data[5:] # Quitar 'list_' del principio
selected_buckets[chat_id] = bucket_name
list_bucket_contents(chat_id, bucket_name, call.message.message_id)
bot.answer_callback_query(callback_query_id=call.id, text=f'Listando bucket: {bucket_name}')
# Manejar selección de archivo para descargar
elif call.data.startswith('download_'):
parts = call.data.split('_', 2) # Máximo 2 divisiones
bucket_name = parts[1]
file_key = parts[2]
generate_download_link(chat_id, bucket_name, file_key)
bot.answer_callback_query(callback_query_id=call.id, text=f'Generando enlace para: {file_key}')
@bot.message_handler(content_types=['document'])
def handle_document(message):
try:
# Verificar si el usuario está autorizado
if not is_user_authorized(message):
send_unauthorized_message(message)
return
chat_id = message.chat.id
# Verificar si el usuario está en el estado correcto
if chat_id not in user_states or user_states[chat_id] != 'waiting_for_file':
bot.reply_to(message, "Por favor selecciona un bucket primero usando /upload.")
return
if chat_id not in selected_buckets:
bot.reply_to(message, "Por favor selecciona un bucket primero.")
return
file_info = bot.get_file(message.document.file_id)
file_buffer = bot.download_file(file_info.file_path) # Descargar archivo en memoria
file_bytes = bytes(file_buffer) # Convertir a bytes
filename = message.document.file_name
bucket_name = selected_buckets[chat_id]
s3 = boto3.client('s3', endpoint_url=MINIO_ENDPOINT,
aws_access_key_id=MINIO_ACCESS_KEY,
aws_secret_access_key=MINIO_SECRET_KEY)
# Registrar la operación
logger.info(f"Usuario {message.from_user.id} (@{message.from_user.username}) subiendo archivo {filename} al bucket {bucket_name}")
# Define los metadatos que deseas adjuntar al archivo
metadata = {
'user': str(message.from_user.id),
'username': message.from_user.username or "sin_username",
'file_name': filename,
'content_type': message.document.mime_type,
'upload_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
# Sube el archivo con los metadatos definidos
s3.put_object(Bucket=bucket_name, Key=filename, Body=file_bytes, Metadata=metadata)
bot.reply_to(message, f"✅ Archivo **{filename}** subido exitosamente al bucket **{bucket_name}**")
# Generar enlace de descarga temporal automáticamente después de subir
generate_download_link(chat_id, bucket_name, filename)
# Reiniciar el estado del usuario
user_states[chat_id] = None
except Exception as e:
logger.error(f"Error al subir archivo: {str(e)}")
bot.reply_to(message, f"❌ Error al subir el archivo: {e}")
def list_bucket_contents(chat_id, bucket_name, message_id=None):
try:
# Registrar la operación
logger.info(f"Listando contenido del bucket {bucket_name} para el usuario {chat_id}")
s3 = boto3.client('s3', endpoint_url=MINIO_ENDPOINT,
aws_access_key_id=MINIO_ACCESS_KEY,
aws_secret_access_key=MINIO_SECRET_KEY)
response = s3.list_objects_v2(Bucket=bucket_name)
if 'Contents' in response and response['Contents']:
# Crear un teclado con los archivos
keyboard = types.InlineKeyboardMarkup()
for obj in response['Contents']:
file_key = obj['Key']
file_size = format_size(obj['Size'])
last_modified = obj['LastModified'].strftime("%Y-%m-%d")
button_text = f"{file_key} ({file_size}) - {last_modified}"
button_data = f"download_{bucket_name}_{file_key}"
# Asegurarnos de que el callback_data no exceda los 64 bytes
if len(button_data) > 64:
# Truncar el nombre del archivo si es necesario
max_key_length = 64 - len(f"download_{bucket_name}_") - 3 # 3 para "..."
truncated_key = file_key[:max_key_length] + "..."
button_text = f"{truncated_key} ({file_size}) - {last_modified}"
button_data = f"download_{bucket_name}_{file_key[:max_key_length]}"
keyboard.add(types.InlineKeyboardButton(text=button_text, callback_data=button_data))
# Agregar botón para volver
keyboard.add(types.InlineKeyboardButton("⬅️ Volver", callback_data="action_list"))
message_text = f"📁 Archivos en el bucket **{bucket_name}**:\n\nSelecciona un archivo para generar un enlace de descarga:"
if message_id:
bot.edit_message_text(message_text, chat_id=chat_id, message_id=message_id, reply_markup=keyboard)
else:
bot.send_message(chat_id, message_text, reply_markup=keyboard)
else:
if message_id:
bot.edit_message_text(f"📂 El bucket **{bucket_name}** está vacío.",
chat_id=chat_id,
message_id=message_id,
reply_markup=types.InlineKeyboardMarkup().add(
types.InlineKeyboardButton("⬅️ Volver", callback_data="action_list")
))
else:
bot.send_message(chat_id, f"📂 El bucket **{bucket_name}** está vacío.",
reply_markup=types.InlineKeyboardMarkup().add(
types.InlineKeyboardButton("⬅️ Volver", callback_data="action_list")
))
except Exception as e:
logger.error(f"Error al listar contenido del bucket {bucket_name}: {str(e)}")
error_message = f"❌ Error al listar archivos: {str(e)}"
if message_id:
bot.edit_message_text(error_message, chat_id=chat_id, message_id=message_id)
else:
bot.send_message(chat_id, error_message)
def generate_download_link(chat_id, bucket_name, file_key):
try:
# Registrar la operación
logger.info(f"Generando enlace para archivo {file_key} en bucket {bucket_name} para usuario {chat_id}")
s3 = boto3.client('s3', endpoint_url=MINIO_ENDPOINT,
aws_access_key_id=MINIO_ACCESS_KEY,
aws_secret_access_key=MINIO_SECRET_KEY)
# Verificar que el archivo existe antes de generar el enlace
try:
s3.head_object(Bucket=bucket_name, Key=file_key)
except s3.exceptions.ClientError as e:
if e.response['Error']['Code'] == '404':
bot.send_message(chat_id, f"❌ Error: El archivo {file_key} no existe en el bucket {bucket_name}.")
return
else:
raise
# Generar enlace válido por 12 horas (43200 segundos)
url = s3.generate_presigned_url(
'get_object',
Params={'Bucket': bucket_name, 'Key': file_key},
ExpiresIn=43200
)
# Calcular la fecha de expiración
expiration_time = datetime.now() + timedelta(hours=12)
expiration_str = expiration_time.strftime("%Y-%m-%d %H:%M:%S")
# Enviar mensaje con el enlace
message = (f"🔗 **Enlace de descarga para {file_key}**\n\n"
f"📍 Bucket: **{bucket_name}**\n"
f"⏱️ Enlace válido hasta: **{expiration_str}**\n\n"
f"{url}")
bot.send_message(chat_id, message)
except Exception as e:
logger.error(f"Error al generar enlace para {file_key}: {str(e)}")
bot.send_message(chat_id, f"❌ Error al generar el enlace de descarga: {str(e)}")
def format_size(size_bytes):
"""Formato legible para el tamaño de archivo"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024.0:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.2f} PB"
def generar_teclado_acciones():
"""Genera el teclado de acciones principales"""
keyboard = types.InlineKeyboardMarkup(row_width=2)
keyboard.add(
types.InlineKeyboardButton("📤 Subir archivo", callback_data="action_upload"),
types.InlineKeyboardButton("📋 Listar archivos", callback_data="action_list")
)
return keyboard
def generar_teclado_buckets(action_prefix):
"""Genera el teclado de buckets con un prefijo de acción"""
keyboard = types.InlineKeyboardMarkup()
buckets_row = []
for bucket_name, bucket_text in BUCKETS:
callback_data = f"{action_prefix}_{bucket_name}"
buckets_row.append(types.InlineKeyboardButton(bucket_text, callback_data=callback_data))
if len(buckets_row) == 3:
keyboard.row(*buckets_row)
buckets_row = []
if buckets_row:
keyboard.row(*buckets_row)
# Agregar botón para volver
keyboard.row(types.InlineKeyboardButton("⬅️ Volver", callback_data="action_start"))
return keyboard
@bot.callback_query_handler(func=lambda call: call.data == "action_start")
def handle_start_action(call):
chat_id = call.message.chat.id
# Verificar que el usuario esté autorizado usando call.from_user
if not is_user_authorized(call.from_user):
bot.answer_callback_query(callback_query_id=call.id, text="No estás autorizado para usar este bot", show_alert=True)
return
bot.edit_message_text(
"**Hola! Este bot te permite interactuar con tu servidor MinIO S3.**\n\n**¿Qué deseas hacer?**",
chat_id=chat_id,
message_id=call.message.message_id,
reply_markup=generar_teclado_acciones()
)
bot.answer_callback_query(callback_query_id=call.id, text="Menú principal")
@bot.message_handler(func=lambda message: True)
def handle_unknown_command(message):
"""Maneja cualquier mensaje que no coincida con los comandos anteriores"""
if not is_user_authorized(message):
send_unauthorized_message(message)
return
logger.debug(f"Mensaje no reconocido: {message.text} de usuario {message.from_user.id}")
# Si el usuario está en estado de espera para un archivo, recordarle que envíe un documento
chat_id = message.chat.id
if chat_id in user_states and user_states[chat_id] == 'waiting_for_file':
bot.reply_to(message, "Por favor, envía un archivo para subir al bucket seleccionado.")
return
# Para cualquier otro mensaje, mostrar ayuda
bot.reply_to(message, "Comando no reconocido. Usa /help para ver los comandos disponibles.")
if __name__ == "__main__":
logger.info("Bot iniciado. Esperando mensajes...")
# Imprimir mensaje de arranque con lista de usuarios autorizados
auth_ids = ", ".join([str(uid) for uid in AUTHORIZED_USER_IDS])
auth_names = ", ".join(AUTHORIZED_USERNAMES)
logger.info(f"Usuarios autorizados por ID: {auth_ids}")
logger.info(f"Usuarios autorizados por nombre: {auth_names}")
try:
# Intentar iniciar el bot con manejo de errores
bot.polling(none_stop=True, interval=0)
except Exception as e:
logger.error(f"Error al iniciar el bot: {str(e)}")