From 616b9b49f5f4cbdb94bea9457a3066b3cba03155 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kevin=20Mu=C3=B1oz?= Date: Mon, 23 Jun 2025 10:57:46 -0500 Subject: [PATCH] fix: implement pagination to resolve Telegram API 'reply markup too long' error - Add pagination system with 8 files per page to avoid API limits - Implement file name truncation for display (max 30 chars) - Add safe callback_data generation to respect 64-byte limit - Introduce file caching to improve performance and reduce MinIO calls - Add navigation buttons (Previous/Next) for multi-page file lists - Display pagination info (current page, total pages, file range) - Preserve all security and authorization checks - Maintain detailed logging for audit purposes Fixes issue where buckets with many files caused Telegram API errors due to inline keyboard size exceeding platform limits. --- bots3.py | 194 +++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 138 insertions(+), 56 deletions(-) diff --git a/bots3.py b/bots3.py index 0386647..e65b5d7 100644 --- a/bots3.py +++ b/bots3.py @@ -7,6 +7,7 @@ from telebot import types import os from datetime import datetime, timedelta import logging +import math # Configurar logging logging.basicConfig( @@ -84,9 +85,14 @@ BUCKETS = [ ('walmart', 'Walmart'), ] +# Constantes para paginación +FILES_PER_PAGE = 8 # Número de archivos por página (reducido para evitar límites) +MAX_FILENAME_LENGTH = 30 # Longitud máxima del nombre de archivo mostrado + # Diccionarios para rastrear selecciones y estados de usuario selected_buckets = {} user_states = {} # Para rastrear en qué operación está cada usuario +bucket_files_cache = {} # Cache para archivos de bucket def is_user_authorized(user): """Verifica si el usuario está autorizado para usar el bot""" @@ -118,6 +124,41 @@ def send_unauthorized_message(message): bot.reply_to(message, "⛔ No estás autorizado para usar este bot. Contacta al administrador si crees que esto es un error.") +def truncate_filename(filename, max_length=MAX_FILENAME_LENGTH): + """Trunca el nombre del archivo si es muy largo""" + if len(filename) <= max_length: + return filename + + # Mantener la extensión si es posible + name, ext = os.path.splitext(filename) + if len(ext) <= max_length - 3: # 3 para "..." + truncated_name = name[:max_length - len(ext) - 3] + "..." + return truncated_name + ext + else: + return filename[:max_length - 3] + "..." + +def create_safe_callback_data(prefix, bucket_name, file_key, page=None): + """Crea callback_data seguro que no exceda los 64 bytes""" + if page is not None: + base = f"{prefix}_{bucket_name}_{page}_" + else: + base = f"{prefix}_{bucket_name}_" + + available_length = 64 - len(base) + + if len(file_key) <= available_length: + if page is not None: + return f"{prefix}_{bucket_name}_{page}_{file_key}" + else: + return f"{prefix}_{bucket_name}_{file_key}" + else: + # Truncar el file_key + truncated_key = file_key[:available_length] + if page is not None: + return f"{prefix}_{bucket_name}_{page}_{truncated_key}" + else: + return f"{prefix}_{bucket_name}_{truncated_key}" + @bot.message_handler(commands=['start']) def start(message): if not is_user_authorized(message): @@ -180,7 +221,7 @@ def list_command(message): user_states[chat_id] = 'selecting_bucket_for_list' bot.send_message(chat_id, "**Selecciona un bucket para ver sus archivos:**", reply_markup=generar_teclado_buckets('list')) -@bot.callback_query_handler(func=lambda call: call.data.startswith(('upload_', 'list_', 'download_', 'action_'))) +@bot.callback_query_handler(func=lambda call: True) def handle_callback(call): chat_id = call.message.chat.id @@ -228,16 +269,41 @@ def handle_callback(call): elif call.data.startswith('list_'): bucket_name = call.data[5:] # Quitar 'list_' del principio selected_buckets[chat_id] = bucket_name - list_bucket_contents(chat_id, bucket_name, call.message.message_id) + list_bucket_contents(chat_id, bucket_name, call.message.message_id, page=1) bot.answer_callback_query(callback_query_id=call.id, text=f'Listando bucket: {bucket_name}') + # Manejar paginación + elif call.data.startswith('page_'): + parts = call.data.split('_') + bucket_name = parts[1] + page = int(parts[2]) + list_bucket_contents(chat_id, bucket_name, call.message.message_id, page=page) + bot.answer_callback_query(callback_query_id=call.id, text=f'Página {page}') + # Manejar selección de archivo para descargar elif call.data.startswith('download_'): - parts = call.data.split('_', 2) # Máximo 2 divisiones + parts = call.data.split('_', 3) # Máximo 3 divisiones bucket_name = parts[1] - file_key = parts[2] - generate_download_link(chat_id, bucket_name, file_key) - bot.answer_callback_query(callback_query_id=call.id, text=f'Generando enlace para: {file_key}') + page = int(parts[2]) if parts[2].isdigit() else 1 + file_key = parts[3] + + # Buscar el archivo completo en el cache + cache_key = f"{chat_id}_{bucket_name}" + if cache_key in bucket_files_cache: + # Buscar el archivo que comience con el file_key truncado + full_file_key = None + for cached_file in bucket_files_cache[cache_key]: + if cached_file['Key'].startswith(file_key): + full_file_key = cached_file['Key'] + break + + if full_file_key: + generate_download_link(chat_id, bucket_name, full_file_key) + bot.answer_callback_query(callback_query_id=call.id, text=f'Generando enlace para: {truncate_filename(full_file_key)}') + else: + bot.answer_callback_query(callback_query_id=call.id, text='Archivo no encontrado', show_alert=True) + else: + bot.answer_callback_query(callback_query_id=call.id, text='Error: Cache expirado, vuelve a listar', show_alert=True) @bot.message_handler(content_types=['document']) def handle_document(message): @@ -288,6 +354,11 @@ def handle_document(message): # Generar enlace de descarga temporal automáticamente después de subir generate_download_link(chat_id, bucket_name, filename) + # Limpiar cache del bucket + cache_key = f"{chat_id}_{bucket_name}" + if cache_key in bucket_files_cache: + del bucket_files_cache[cache_key] + # Reiniciar el estado del usuario user_states[chat_id] = None @@ -295,68 +366,95 @@ def handle_document(message): logger.error(f"Error al subir archivo: {str(e)}") bot.reply_to(message, f"❌ Error al subir el archivo: {e}") -def list_bucket_contents(chat_id, bucket_name, message_id=None): +def list_bucket_contents(chat_id, bucket_name, message_id=None, page=1): try: # Registrar la operación - logger.info(f"Listando contenido del bucket {bucket_name} para el usuario {chat_id}") + logger.info(f"Listando contenido del bucket {bucket_name} para el usuario {chat_id}, página {page}") s3 = boto3.client('s3', endpoint_url=MINIO_ENDPOINT, aws_access_key_id=MINIO_ACCESS_KEY, aws_secret_access_key=MINIO_SECRET_KEY) - response = s3.list_objects_v2(Bucket=bucket_name) + # Usar cache o hacer nueva consulta + cache_key = f"{chat_id}_{bucket_name}" + if cache_key not in bucket_files_cache: + response = s3.list_objects_v2(Bucket=bucket_name) + if 'Contents' in response: + bucket_files_cache[cache_key] = response['Contents'] + else: + bucket_files_cache[cache_key] = [] - if 'Contents' in response and response['Contents']: - # Crear un teclado con los archivos + files = bucket_files_cache[cache_key] + + if files: + # Calcular paginación + total_files = len(files) + total_pages = math.ceil(total_files / FILES_PER_PAGE) + start_index = (page - 1) * FILES_PER_PAGE + end_index = min(start_index + FILES_PER_PAGE, total_files) + page_files = files[start_index:end_index] + + # Crear un teclado con los archivos de la página actual keyboard = types.InlineKeyboardMarkup() - for obj in response['Contents']: + for obj in page_files: file_key = obj['Key'] file_size = format_size(obj['Size']) last_modified = obj['LastModified'].strftime("%Y-%m-%d") - button_text = f"{file_key} ({file_size}) - {last_modified}" - button_data = f"download_{bucket_name}_{file_key}" + # Truncar nombre del archivo para el botón + display_name = truncate_filename(file_key) + button_text = f"{display_name} ({file_size})" - # Asegurarnos de que el callback_data no exceda los 64 bytes - if len(button_data) > 64: - # Truncar el nombre del archivo si es necesario - max_key_length = 64 - len(f"download_{bucket_name}_") - 3 # 3 para "..." - truncated_key = file_key[:max_key_length] + "..." - button_text = f"{truncated_key} ({file_size}) - {last_modified}" - button_data = f"download_{bucket_name}_{file_key[:max_key_length]}" + # Crear callback_data seguro + callback_data = create_safe_callback_data("download", bucket_name, file_key, page) - keyboard.add(types.InlineKeyboardButton(text=button_text, callback_data=button_data)) + keyboard.add(types.InlineKeyboardButton(text=button_text, callback_data=callback_data)) + + # Agregar botones de navegación si hay múltiples páginas + nav_buttons = [] + if page > 1: + nav_buttons.append(types.InlineKeyboardButton("⬅️ Anterior", callback_data=f"page_{bucket_name}_{page-1}")) + if page < total_pages: + nav_buttons.append(types.InlineKeyboardButton("➡️ Siguiente", callback_data=f"page_{bucket_name}_{page+1}")) + + if nav_buttons: + keyboard.row(*nav_buttons) # Agregar botón para volver - keyboard.add(types.InlineKeyboardButton("⬅️ Volver", callback_data="action_list")) + keyboard.add(types.InlineKeyboardButton("🏠 Menú principal", callback_data="action_start")) - message_text = f"📁 Archivos en el bucket **{bucket_name}**:\n\nSelecciona un archivo para generar un enlace de descarga:" + message_text = (f"📁 **Bucket: {bucket_name}**\n" + f"📄 Archivos: {total_files}\n" + f"📃 Página: {page}/{total_pages}\n" + f"📋 Mostrando: {start_index + 1}-{end_index}\n\n" + f"Selecciona un archivo para generar enlace:") if message_id: bot.edit_message_text(message_text, chat_id=chat_id, message_id=message_id, reply_markup=keyboard) else: bot.send_message(chat_id, message_text, reply_markup=keyboard) else: + keyboard = types.InlineKeyboardMarkup() + keyboard.add(types.InlineKeyboardButton("🏠 Menú principal", callback_data="action_start")) + + message_text = f"📂 El bucket **{bucket_name}** está vacío." + if message_id: - bot.edit_message_text(f"📂 El bucket **{bucket_name}** está vacío.", - chat_id=chat_id, - message_id=message_id, - reply_markup=types.InlineKeyboardMarkup().add( - types.InlineKeyboardButton("⬅️ Volver", callback_data="action_list") - )) + bot.edit_message_text(message_text, chat_id=chat_id, message_id=message_id, reply_markup=keyboard) else: - bot.send_message(chat_id, f"📂 El bucket **{bucket_name}** está vacío.", - reply_markup=types.InlineKeyboardMarkup().add( - types.InlineKeyboardButton("⬅️ Volver", callback_data="action_list") - )) + bot.send_message(chat_id, message_text, reply_markup=keyboard) + except Exception as e: logger.error(f"Error al listar contenido del bucket {bucket_name}: {str(e)}") + keyboard = types.InlineKeyboardMarkup() + keyboard.add(types.InlineKeyboardButton("🏠 Menú principal", callback_data="action_start")) + error_message = f"❌ Error al listar archivos: {str(e)}" if message_id: - bot.edit_message_text(error_message, chat_id=chat_id, message_id=message_id) + bot.edit_message_text(error_message, chat_id=chat_id, message_id=message_id, reply_markup=keyboard) else: - bot.send_message(chat_id, error_message) + bot.send_message(chat_id, error_message, reply_markup=keyboard) def generate_download_link(chat_id, bucket_name, file_key): try: @@ -389,9 +487,10 @@ def generate_download_link(chat_id, bucket_name, file_key): expiration_str = expiration_time.strftime("%Y-%m-%d %H:%M:%S") # Enviar mensaje con el enlace - message = (f"🔗 **Enlace de descarga para {file_key}**\n\n" + message = (f"🔗 **Enlace de descarga**\n\n" + f"📁 Archivo: **{file_key}**\n" f"📍 Bucket: **{bucket_name}**\n" - f"⏱️ Enlace válido hasta: **{expiration_str}**\n\n" + f"⏱️ Válido hasta: **{expiration_str}**\n\n" f"{url}") bot.send_message(chat_id, message) @@ -432,27 +531,10 @@ def generar_teclado_buckets(action_prefix): keyboard.row(*buckets_row) # Agregar botón para volver - keyboard.row(types.InlineKeyboardButton("⬅️ Volver", callback_data="action_start")) + keyboard.row(types.InlineKeyboardButton("🏠 Menú principal", callback_data="action_start")) return keyboard -@bot.callback_query_handler(func=lambda call: call.data == "action_start") -def handle_start_action(call): - chat_id = call.message.chat.id - - # Verificar que el usuario esté autorizado usando call.from_user - if not is_user_authorized(call.from_user): - bot.answer_callback_query(callback_query_id=call.id, text="No estás autorizado para usar este bot", show_alert=True) - return - - bot.edit_message_text( - "**Hola! Este bot te permite interactuar con tu servidor MinIO S3.**\n\n**¿Qué deseas hacer?**", - chat_id=chat_id, - message_id=call.message.message_id, - reply_markup=generar_teclado_acciones() - ) - bot.answer_callback_query(callback_query_id=call.id, text="Menú principal") - @bot.message_handler(func=lambda message: True) def handle_unknown_command(message): """Maneja cualquier mensaje que no coincida con los comandos anteriores"""