Files
bots3/bots3.py
Kevin Muñoz 616b9b49f5 fix: implement pagination to resolve Telegram API 'reply markup too long' error
- Add pagination system with 8 files per page to avoid API limits
- Implement file name truncation for display (max 30 chars)
- Add safe callback_data generation to respect 64-byte limit
- Introduce file caching to improve performance and reduce MinIO calls
- Add navigation buttons (Previous/Next) for multi-page file lists
- Display pagination info (current page, total pages, file range)
- Preserve all security and authorization checks
- Maintain detailed logging for audit purposes

Fixes issue where buckets with many files caused Telegram API errors
due to inline keyboard size exceeding platform limits.
2025-06-23 10:57:46 -05:00

569 lines
24 KiB
Python

#!/usr/bin/env python
import telebot
import boto3
from dotenv import load_dotenv
from telebot import types
import os
from datetime import datetime, timedelta
import logging
import math
# Configurar logging
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
# Para debugging, descomenta esta línea
# logger.setLevel(logging.DEBUG)
load_dotenv()
TELEGRAM_BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN')
MINIO_ENDPOINT = os.getenv('MINIO_ENDPOINT')
MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY')
MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY')
# Obtener usuarios autorizados desde variables de entorno
AUTHORIZED_USER_IDS_STR = os.getenv('AUTHORIZED_USER_IDS', '')
AUTHORIZED_USERNAMES_STR = os.getenv('AUTHORIZED_USERNAMES', '')
# Convertir string de IDs a lista de enteros
AUTHORIZED_USER_IDS = []
if AUTHORIZED_USER_IDS_STR:
try:
AUTHORIZED_USER_IDS = [int(user_id.strip()) for user_id in AUTHORIZED_USER_IDS_STR.split(',') if user_id.strip()]
logger.info(f"Cargados {len(AUTHORIZED_USER_IDS)} IDs de usuario autorizados desde variables de entorno")
except ValueError as e:
logger.error(f"Error al convertir IDs de usuario: {e}. Asegúrate de que sean valores numéricos separados por comas.")
# Convertir string de nombres de usuario a lista
AUTHORIZED_USERNAMES = []
if AUTHORIZED_USERNAMES_STR:
AUTHORIZED_USERNAMES = [username.strip() for username in AUTHORIZED_USERNAMES_STR.split(',') if username.strip()]
logger.info(f"Cargados {len(AUTHORIZED_USERNAMES)} nombres de usuario autorizados desde variables de entorno")
# Si no hay usuarios autorizados definidos, mostrar advertencia
if not AUTHORIZED_USER_IDS and not AUTHORIZED_USERNAMES:
logger.warning("⚠️ No se han definido usuarios autorizados. El bot rechazará todas las solicitudes.")
# Importante: Configuración de comandos para el bot
bot = telebot.TeleBot(TELEGRAM_BOT_TOKEN, parse_mode='MARKDOWN')
# Registrar comandos en BotFather
bot.set_my_commands([
telebot.types.BotCommand("start", "Iniciar el bot y mostrar menú principal"),
telebot.types.BotCommand("upload", "Subir un archivo a un bucket"),
telebot.types.BotCommand("list", "Listar archivos en un bucket"),
telebot.types.BotCommand("help", "Mostrar ayuda")
])
BUCKETS = [
('bancoppel', 'Bancoppel'),
('cajadrarroyo', 'Caja Dr.Arroyo'),
('cajapopulardolores', 'Caja Popular Dolores'),
('cajasolidaria', 'Caja Solidaria'),
('condorbs-oss', 'Condorsbs-oss'),
('contingencia', 'Bucket de contingencia'),
('cooperativa', 'Cooperativa'),
('comercializado', 'Comercializado'),
('csguachinango', 'Caja Solidaria Guachinango'),
('cssmh', 'Caja Solidaria San Miguel Huimilpan'),
('financieratamazula', 'Financiera Tamazula'),
('imperialcc', 'Imperialcc'),
('lenocirochin', 'Lenocirochin'),
('mario', 'Mario bro'),
('mizuho', 'Mizuho'),
('mufg', 'Mufg'),
('serfimex', 'Serfimex'),
('tepeyac', 'Tepayac'),
('test', 'Test'),
('testb', 'Testb'),
('tiendapago', 'tiendapago'),
('walmart', 'Walmart'),
]
# Constantes para paginación
FILES_PER_PAGE = 8 # Número de archivos por página (reducido para evitar límites)
MAX_FILENAME_LENGTH = 30 # Longitud máxima del nombre de archivo mostrado
# Diccionarios para rastrear selecciones y estados de usuario
selected_buckets = {}
user_states = {} # Para rastrear en qué operación está cada usuario
bucket_files_cache = {} # Cache para archivos de bucket
def is_user_authorized(user):
"""Verifica si el usuario está autorizado para usar el bot"""
# Si recibimos un message o un call, extraer el from_user
if hasattr(user, 'from_user'):
user = user.from_user
# Ahora user debería ser directamente el objeto from_user
user_id = user.id
username = user.username
# Comprobar si el ID de usuario está en la lista de autorizados
if user_id in AUTHORIZED_USER_IDS:
logger.debug(f"Usuario {user_id} autorizado por ID")
return True
# Comprobar si el nombre de usuario está en la lista de autorizados
if username and username.lower() in [name.lower() for name in AUTHORIZED_USERNAMES]:
logger.debug(f"Usuario @{username} autorizado por nombre de usuario")
return True
logger.warning(f"Acceso denegado para: ID={user_id}, Username=@{username or 'None'}")
return False
def send_unauthorized_message(message):
"""Envía un mensaje al usuario no autorizado"""
user_info = f"ID: {message.from_user.id}, Username: @{message.from_user.username or 'None'}"
logger.warning(f"Intento de acceso no autorizado: {user_info}")
bot.reply_to(message, "⛔ No estás autorizado para usar este bot. Contacta al administrador si crees que esto es un error.")
def truncate_filename(filename, max_length=MAX_FILENAME_LENGTH):
"""Trunca el nombre del archivo si es muy largo"""
if len(filename) <= max_length:
return filename
# Mantener la extensión si es posible
name, ext = os.path.splitext(filename)
if len(ext) <= max_length - 3: # 3 para "..."
truncated_name = name[:max_length - len(ext) - 3] + "..."
return truncated_name + ext
else:
return filename[:max_length - 3] + "..."
def create_safe_callback_data(prefix, bucket_name, file_key, page=None):
"""Crea callback_data seguro que no exceda los 64 bytes"""
if page is not None:
base = f"{prefix}_{bucket_name}_{page}_"
else:
base = f"{prefix}_{bucket_name}_"
available_length = 64 - len(base)
if len(file_key) <= available_length:
if page is not None:
return f"{prefix}_{bucket_name}_{page}_{file_key}"
else:
return f"{prefix}_{bucket_name}_{file_key}"
else:
# Truncar el file_key
truncated_key = file_key[:available_length]
if page is not None:
return f"{prefix}_{bucket_name}_{page}_{truncated_key}"
else:
return f"{prefix}_{bucket_name}_{truncated_key}"
@bot.message_handler(commands=['start'])
def start(message):
if not is_user_authorized(message):
send_unauthorized_message(message)
return
logger.info(f"Comando /start recibido de usuario {message.from_user.id} (@{message.from_user.username or 'None'})")
bot.reply_to(message, """
**Hola! Este bot te permite interactuar con tu servidor MinIO S3.**
**¿Qué deseas hacer?**
""", reply_markup=generar_teclado_acciones())
@bot.message_handler(commands=['help'])
def help_message(message):
if not is_user_authorized(message):
send_unauthorized_message(message)
return
logger.info(f"Comando /help recibido de usuario {message.from_user.id}")
bot.reply_to(message, """
**Bienvenido a la ayuda de este bot!**
Comandos disponibles:
- /start - Comenzar a usar el bot
- /upload - Subir un archivo
- /list - Listar archivos en un bucket
- /help - Mostrar este mensaje de ayuda
Para subir un archivo:
- Usa /upload y selecciona un bucket, luego envía el archivo.
Para obtener un enlace de descarga:
- Usa /list para ver los archivos disponibles y selecciona uno.
""")
@bot.message_handler(commands=['upload'])
def upload_command(message):
if not is_user_authorized(message):
send_unauthorized_message(message)
return
logger.info(f"Comando /upload recibido de usuario {message.from_user.id}")
chat_id = message.chat.id
user_states[chat_id] = 'selecting_bucket_for_upload'
bot.send_message(chat_id, "**Selecciona un bucket para subir tu archivo:**", reply_markup=generar_teclado_buckets('upload'))
@bot.message_handler(commands=['list'])
def list_command(message):
if not is_user_authorized(message):
send_unauthorized_message(message)
return
logger.info(f"Comando /list recibido de usuario {message.from_user.id}")
chat_id = message.chat.id
user_states[chat_id] = 'selecting_bucket_for_list'
bot.send_message(chat_id, "**Selecciona un bucket para ver sus archivos:**", reply_markup=generar_teclado_buckets('list'))
@bot.callback_query_handler(func=lambda call: True)
def handle_callback(call):
chat_id = call.message.chat.id
# Verificar que el usuario esté autorizado usando call.from_user
if not is_user_authorized(call.from_user):
bot.answer_callback_query(callback_query_id=call.id, text="No estás autorizado para usar este bot", show_alert=True)
return
logger.debug(f"Callback recibido: {call.data} de usuario {call.from_user.id}")
# Manejar selección de acción
if call.data.startswith('action_'):
action = call.data.split('_')[1]
if action == 'upload':
user_states[chat_id] = 'selecting_bucket_for_upload'
bot.edit_message_text("**Selecciona un bucket para subir tu archivo:**",
chat_id=chat_id,
message_id=call.message.message_id,
reply_markup=generar_teclado_buckets('upload'))
elif action == 'list':
user_states[chat_id] = 'selecting_bucket_for_list'
bot.edit_message_text("**Selecciona un bucket para ver sus archivos:**",
chat_id=chat_id,
message_id=call.message.message_id,
reply_markup=generar_teclado_buckets('list'))
elif action == 'start':
# Manejar el regreso al menú principal
user_states[chat_id] = None
bot.edit_message_text("**Hola! Este bot te permite interactuar con tu servidor MinIO S3.**\n\n**¿Qué deseas hacer?**",
chat_id=chat_id,
message_id=call.message.message_id,
reply_markup=generar_teclado_acciones())
bot.answer_callback_query(callback_query_id=call.id, text=f'Seleccionado: {action}')
return
# Manejar selección de bucket para subir archivo
if call.data.startswith('upload_'):
bucket_name = call.data[7:] # Quitar 'upload_' del principio
selected_buckets[chat_id] = bucket_name
user_states[chat_id] = 'waiting_for_file'
bot.answer_callback_query(callback_query_id=call.id, text=f'Bucket seleccionado: {bucket_name}')
bot.send_message(chat_id, f"Bucket seleccionado: **{bucket_name}**\n\nAhora envíame el archivo que quieres almacenar.")
# Manejar selección de bucket para listar archivos
elif call.data.startswith('list_'):
bucket_name = call.data[5:] # Quitar 'list_' del principio
selected_buckets[chat_id] = bucket_name
list_bucket_contents(chat_id, bucket_name, call.message.message_id, page=1)
bot.answer_callback_query(callback_query_id=call.id, text=f'Listando bucket: {bucket_name}')
# Manejar paginación
elif call.data.startswith('page_'):
parts = call.data.split('_')
bucket_name = parts[1]
page = int(parts[2])
list_bucket_contents(chat_id, bucket_name, call.message.message_id, page=page)
bot.answer_callback_query(callback_query_id=call.id, text=f'Página {page}')
# Manejar selección de archivo para descargar
elif call.data.startswith('download_'):
parts = call.data.split('_', 3) # Máximo 3 divisiones
bucket_name = parts[1]
page = int(parts[2]) if parts[2].isdigit() else 1
file_key = parts[3]
# Buscar el archivo completo en el cache
cache_key = f"{chat_id}_{bucket_name}"
if cache_key in bucket_files_cache:
# Buscar el archivo que comience con el file_key truncado
full_file_key = None
for cached_file in bucket_files_cache[cache_key]:
if cached_file['Key'].startswith(file_key):
full_file_key = cached_file['Key']
break
if full_file_key:
generate_download_link(chat_id, bucket_name, full_file_key)
bot.answer_callback_query(callback_query_id=call.id, text=f'Generando enlace para: {truncate_filename(full_file_key)}')
else:
bot.answer_callback_query(callback_query_id=call.id, text='Archivo no encontrado', show_alert=True)
else:
bot.answer_callback_query(callback_query_id=call.id, text='Error: Cache expirado, vuelve a listar', show_alert=True)
@bot.message_handler(content_types=['document'])
def handle_document(message):
try:
# Verificar si el usuario está autorizado
if not is_user_authorized(message):
send_unauthorized_message(message)
return
chat_id = message.chat.id
# Verificar si el usuario está en el estado correcto
if chat_id not in user_states or user_states[chat_id] != 'waiting_for_file':
bot.reply_to(message, "Por favor selecciona un bucket primero usando /upload.")
return
if chat_id not in selected_buckets:
bot.reply_to(message, "Por favor selecciona un bucket primero.")
return
file_info = bot.get_file(message.document.file_id)
file_buffer = bot.download_file(file_info.file_path) # Descargar archivo en memoria
file_bytes = bytes(file_buffer) # Convertir a bytes
filename = message.document.file_name
bucket_name = selected_buckets[chat_id]
s3 = boto3.client('s3', endpoint_url=MINIO_ENDPOINT,
aws_access_key_id=MINIO_ACCESS_KEY,
aws_secret_access_key=MINIO_SECRET_KEY)
# Registrar la operación
logger.info(f"Usuario {message.from_user.id} (@{message.from_user.username}) subiendo archivo {filename} al bucket {bucket_name}")
# Define los metadatos que deseas adjuntar al archivo
metadata = {
'user': str(message.from_user.id),
'username': message.from_user.username or "sin_username",
'file_name': filename,
'content_type': message.document.mime_type,
'upload_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
# Sube el archivo con los metadatos definidos
s3.put_object(Bucket=bucket_name, Key=filename, Body=file_bytes, Metadata=metadata)
bot.reply_to(message, f"✅ Archivo **{filename}** subido exitosamente al bucket **{bucket_name}**")
# Generar enlace de descarga temporal automáticamente después de subir
generate_download_link(chat_id, bucket_name, filename)
# Limpiar cache del bucket
cache_key = f"{chat_id}_{bucket_name}"
if cache_key in bucket_files_cache:
del bucket_files_cache[cache_key]
# Reiniciar el estado del usuario
user_states[chat_id] = None
except Exception as e:
logger.error(f"Error al subir archivo: {str(e)}")
bot.reply_to(message, f"❌ Error al subir el archivo: {e}")
def list_bucket_contents(chat_id, bucket_name, message_id=None, page=1):
try:
# Registrar la operación
logger.info(f"Listando contenido del bucket {bucket_name} para el usuario {chat_id}, página {page}")
s3 = boto3.client('s3', endpoint_url=MINIO_ENDPOINT,
aws_access_key_id=MINIO_ACCESS_KEY,
aws_secret_access_key=MINIO_SECRET_KEY)
# Usar cache o hacer nueva consulta
cache_key = f"{chat_id}_{bucket_name}"
if cache_key not in bucket_files_cache:
response = s3.list_objects_v2(Bucket=bucket_name)
if 'Contents' in response:
bucket_files_cache[cache_key] = response['Contents']
else:
bucket_files_cache[cache_key] = []
files = bucket_files_cache[cache_key]
if files:
# Calcular paginación
total_files = len(files)
total_pages = math.ceil(total_files / FILES_PER_PAGE)
start_index = (page - 1) * FILES_PER_PAGE
end_index = min(start_index + FILES_PER_PAGE, total_files)
page_files = files[start_index:end_index]
# Crear un teclado con los archivos de la página actual
keyboard = types.InlineKeyboardMarkup()
for obj in page_files:
file_key = obj['Key']
file_size = format_size(obj['Size'])
last_modified = obj['LastModified'].strftime("%Y-%m-%d")
# Truncar nombre del archivo para el botón
display_name = truncate_filename(file_key)
button_text = f"{display_name} ({file_size})"
# Crear callback_data seguro
callback_data = create_safe_callback_data("download", bucket_name, file_key, page)
keyboard.add(types.InlineKeyboardButton(text=button_text, callback_data=callback_data))
# Agregar botones de navegación si hay múltiples páginas
nav_buttons = []
if page > 1:
nav_buttons.append(types.InlineKeyboardButton("⬅️ Anterior", callback_data=f"page_{bucket_name}_{page-1}"))
if page < total_pages:
nav_buttons.append(types.InlineKeyboardButton("➡️ Siguiente", callback_data=f"page_{bucket_name}_{page+1}"))
if nav_buttons:
keyboard.row(*nav_buttons)
# Agregar botón para volver
keyboard.add(types.InlineKeyboardButton("🏠 Menú principal", callback_data="action_start"))
message_text = (f"📁 **Bucket: {bucket_name}**\n"
f"📄 Archivos: {total_files}\n"
f"📃 Página: {page}/{total_pages}\n"
f"📋 Mostrando: {start_index + 1}-{end_index}\n\n"
f"Selecciona un archivo para generar enlace:")
if message_id:
bot.edit_message_text(message_text, chat_id=chat_id, message_id=message_id, reply_markup=keyboard)
else:
bot.send_message(chat_id, message_text, reply_markup=keyboard)
else:
keyboard = types.InlineKeyboardMarkup()
keyboard.add(types.InlineKeyboardButton("🏠 Menú principal", callback_data="action_start"))
message_text = f"📂 El bucket **{bucket_name}** está vacío."
if message_id:
bot.edit_message_text(message_text, chat_id=chat_id, message_id=message_id, reply_markup=keyboard)
else:
bot.send_message(chat_id, message_text, reply_markup=keyboard)
except Exception as e:
logger.error(f"Error al listar contenido del bucket {bucket_name}: {str(e)}")
keyboard = types.InlineKeyboardMarkup()
keyboard.add(types.InlineKeyboardButton("🏠 Menú principal", callback_data="action_start"))
error_message = f"❌ Error al listar archivos: {str(e)}"
if message_id:
bot.edit_message_text(error_message, chat_id=chat_id, message_id=message_id, reply_markup=keyboard)
else:
bot.send_message(chat_id, error_message, reply_markup=keyboard)
def generate_download_link(chat_id, bucket_name, file_key):
try:
# Registrar la operación
logger.info(f"Generando enlace para archivo {file_key} en bucket {bucket_name} para usuario {chat_id}")
s3 = boto3.client('s3', endpoint_url=MINIO_ENDPOINT,
aws_access_key_id=MINIO_ACCESS_KEY,
aws_secret_access_key=MINIO_SECRET_KEY)
# Verificar que el archivo existe antes de generar el enlace
try:
s3.head_object(Bucket=bucket_name, Key=file_key)
except s3.exceptions.ClientError as e:
if e.response['Error']['Code'] == '404':
bot.send_message(chat_id, f"❌ Error: El archivo {file_key} no existe en el bucket {bucket_name}.")
return
else:
raise
# Generar enlace válido por 12 horas (43200 segundos)
url = s3.generate_presigned_url(
'get_object',
Params={'Bucket': bucket_name, 'Key': file_key},
ExpiresIn=43200
)
# Calcular la fecha de expiración
expiration_time = datetime.now() + timedelta(hours=12)
expiration_str = expiration_time.strftime("%Y-%m-%d %H:%M:%S")
# Enviar mensaje con el enlace
message = (f"🔗 **Enlace de descarga**\n\n"
f"📁 Archivo: **{file_key}**\n"
f"📍 Bucket: **{bucket_name}**\n"
f"⏱️ Válido hasta: **{expiration_str}**\n\n"
f"{url}")
bot.send_message(chat_id, message)
except Exception as e:
logger.error(f"Error al generar enlace para {file_key}: {str(e)}")
bot.send_message(chat_id, f"❌ Error al generar el enlace de descarga: {str(e)}")
def format_size(size_bytes):
"""Formato legible para el tamaño de archivo"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024.0:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.2f} PB"
def generar_teclado_acciones():
"""Genera el teclado de acciones principales"""
keyboard = types.InlineKeyboardMarkup(row_width=2)
keyboard.add(
types.InlineKeyboardButton("📤 Subir archivo", callback_data="action_upload"),
types.InlineKeyboardButton("📋 Listar archivos", callback_data="action_list")
)
return keyboard
def generar_teclado_buckets(action_prefix):
"""Genera el teclado de buckets con un prefijo de acción"""
keyboard = types.InlineKeyboardMarkup()
buckets_row = []
for bucket_name, bucket_text in BUCKETS:
callback_data = f"{action_prefix}_{bucket_name}"
buckets_row.append(types.InlineKeyboardButton(bucket_text, callback_data=callback_data))
if len(buckets_row) == 3:
keyboard.row(*buckets_row)
buckets_row = []
if buckets_row:
keyboard.row(*buckets_row)
# Agregar botón para volver
keyboard.row(types.InlineKeyboardButton("🏠 Menú principal", callback_data="action_start"))
return keyboard
@bot.message_handler(func=lambda message: True)
def handle_unknown_command(message):
"""Maneja cualquier mensaje que no coincida con los comandos anteriores"""
if not is_user_authorized(message):
send_unauthorized_message(message)
return
logger.debug(f"Mensaje no reconocido: {message.text} de usuario {message.from_user.id}")
# Si el usuario está en estado de espera para un archivo, recordarle que envíe un documento
chat_id = message.chat.id
if chat_id in user_states and user_states[chat_id] == 'waiting_for_file':
bot.reply_to(message, "Por favor, envía un archivo para subir al bucket seleccionado.")
return
# Para cualquier otro mensaje, mostrar ayuda
bot.reply_to(message, "Comando no reconocido. Usa /help para ver los comandos disponibles.")
if __name__ == "__main__":
logger.info("Bot iniciado. Esperando mensajes...")
# Imprimir mensaje de arranque con lista de usuarios autorizados
auth_ids = ", ".join([str(uid) for uid in AUTHORIZED_USER_IDS])
auth_names = ", ".join(AUTHORIZED_USERNAMES)
logger.info(f"Usuarios autorizados por ID: {auth_ids}")
logger.info(f"Usuarios autorizados por nombre: {auth_names}")
try:
# Intentar iniciar el bot con manejo de errores
bot.polling(none_stop=True, interval=0)
except Exception as e:
logger.error(f"Error al iniciar el bot: {str(e)}")