#!/usr/bin/env python3
"""
Exercise every known command of the funstat BOT.

The command list is based on the features discovered from screenshots.
"""

import json
import os
import sys
import time

import requests

from env_loader import load_env

load_env()

BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
if not BOT_TOKEN:
    raise RuntimeError("请在 .env 中设置 TELEGRAM_BOT_TOKEN")
BASE_URL = f"https://api.telegram.org/bot{BOT_TOKEN}"

# Extra seconds granted to the HTTP socket on top of the long-poll window,
# so a stalled connection raises instead of hanging the script forever.
HTTP_TIMEOUT_MARGIN = 10

# Free IDs usable for testing (provided by the BOT)
FREE_TEST_IDS = [
    "7745296119",
    "994270689",
    "7864982459",
    "5095649820",
    "5933194496",
    "7695732077",
]


def get_updates(offset=None, timeout=10):
    """Fetch pending updates via the ``getUpdates`` endpoint.

    Args:
        offset: Identifier of the first update to return; omitted from the
            request when ``None``.
        timeout: Long-poll timeout in seconds passed to the API.

    Returns:
        The decoded JSON body of the response.
    """
    params = {"timeout": timeout}
    # Compare against None so that an explicit offset of 0 is still sent.
    if offset is not None:
        params["offset"] = offset
    response = requests.get(
        f"{BASE_URL}/getUpdates",
        params=params,
        # The socket timeout must outlast the server-side long poll.
        timeout=timeout + HTTP_TIMEOUT_MARGIN,
    )
    return response.json()


def send_message(chat_id, text):
    """Send ``text`` to ``chat_id`` and return the decoded JSON response."""
    data = {"chat_id": chat_id, "text": text}
    response = requests.post(
        f"{BASE_URL}/sendMessage",
        json=data,
        timeout=HTTP_TIMEOUT_MARGIN,
    )
    return response.json()


def clear_updates():
    """Discard already-queued updates so they are not read as responses."""
    updates = get_updates(timeout=1)
    if updates.get('result'):
        last_id = updates['result'][-1]['update_id']
        # Requesting from last_id + 1 acknowledges everything up to last_id.
        get_updates(offset=last_id + 1, timeout=1)


def _collect_bot_messages(updates, chat_id, responses, next_offset):
    """Append bot messages for ``chat_id`` to ``responses``.

    Returns the offset to use for the next ``getUpdates`` call, i.e. one past
    the highest ``update_id`` seen so far (or ``next_offset`` unchanged when
    ``updates`` carries no results).
    """
    for update in updates.get('result') or []:
        candidate = update['update_id'] + 1
        # Always advance past every update seen; comparing update_id with a
        # strict ">" against the stored offset skipped consecutive ids and
        # caused the last update to be fetched (and recorded) twice.
        if next_offset is None or candidate > next_offset:
            next_offset = candidate

        if 'message' in update:
            msg = update['message']
            from_bot = msg.get('from', {}).get('is_bot')
            same_chat = msg.get('chat', {}).get('id') == chat_id
            if from_bot and same_chat:
                responses.append(msg)
    return next_offset


def wait_for_response(chat_id, timeout=15):
    """Poll for messages sent by a bot in ``chat_id``.

    Args:
        chat_id: Chat whose messages are collected.
        timeout: Maximum number of seconds to keep polling.

    Returns:
        A list of message dicts; empty when nothing arrived in time.
    """
    deadline = time.monotonic() + timeout
    next_offset = None
    responses = []

    while time.monotonic() < deadline:
        updates = get_updates(offset=next_offset, timeout=5)
        next_offset = _collect_bot_messages(
            updates, chat_id, responses, next_offset
        )

        if responses:
            # Give the bot a moment in case it sends follow-up messages,
            # then poll one more time before returning.
            time.sleep(2)
            updates = get_updates(offset=next_offset, timeout=1)
            _collect_bot_messages(updates, chat_id, responses, next_offset)
            break

        time.sleep(0.5)

    return responses


def format_response(responses):
    """Render a list of message dicts as a human-readable report.

    Text (truncated to 500 characters), photos with captions, documents,
    inline keyboards and reply keyboards are included when present.

    Returns:
        The formatted string, or a "no response" marker for an empty list.
    """
    if not responses:
        return "❌ 无响应"

    result = []
    for i, msg in enumerate(responses, 1):
        if len(responses) > 1:
            result.append(f"\n--- 响应 {i} ---")

        # Text
        if 'text' in msg:
            text = msg['text']
            if len(text) > 500:
                result.append(f"📝 文本: {text[:500]}...")
            else:
                result.append(f"📝 文本: {text}")

        # Photo
        if 'photo' in msg:
            result.append(f"🖼️ 图片: {len(msg['photo'])} 张")
            if msg.get('caption'):
                result.append(f" 说明: {msg['caption']}")

        # Document
        if 'document' in msg:
            result.append(f"📄 文档: {msg['document'].get('file_name', 'unknown')}")

        # Inline keyboard
        if 'reply_markup' in msg and 'inline_keyboard' in msg['reply_markup']:
            result.append("⌨️ 内联按钮:")
            for row in msg['reply_markup']['inline_keyboard']:
                buttons = [btn.get('text', '') for btn in row]
                result.append(f" {' | '.join(buttons)}")

        # Reply keyboard
        if 'reply_markup' in msg and 'keyboard' in msg['reply_markup']:
            result.append("⌨️ 回复键盘:")
            for row in msg['reply_markup']['keyboard']:
                # Buttons may be plain strings or dicts; default a missing
                # 'text' to '' so str.join never receives None.
                buttons = [
                    btn.get('text', '') if isinstance(btn, dict) else str(btn)
                    for btn in row
                ]
                result.append(f" {' | '.join(buttons)}")

    return '\n'.join(result)


def test_command(chat_id, command, description="", wait_time=15):
    """Send one command, wait for replies, and print the formatted result.

    Args:
        chat_id: Chat to send the command to.
        command: Text to send.
        description: Optional human-readable note printed with the command.
        wait_time: Seconds to wait for a response.

    Returns:
        The list of response messages, or ``None`` when sending failed.
    """
    print(f"\n{'='*70}")
    print(f"📤 测试: {command}")
    if description:
        print(f" {description}")
    print('='*70)

    # Drop stale updates first
    clear_updates()

    # Send the command
    result = send_message(chat_id, command)
    if not result.get('ok'):
        print(f"❌ 发送失败: {result}")
        return None

    print("⏳ 等待响应...")

    # Wait for the response
    responses = wait_for_response(chat_id, timeout=wait_time)
    print(format_response(responses))

    return responses


def find_chat_id():
    """Return the chat id of the most recent message update, or ``None``."""
    print("🔍 查找 chat_id...")
    updates = get_updates(timeout=5)

    if updates.get('result'):
        # Newest updates come last, so scan in reverse.
        for update in reversed(updates['result']):
            if 'message' in update:
                chat_id = update['message']['chat']['id']
                print(f"✅ 找到 chat_id: {chat_id}\n")
                return chat_id

    print("❌ 未找到 chat_id")
    print("请先在 Telegram 中向 @openaiw_bot 发送任意消息\n")
    return None


def main():
    """Run every test case, save the raw results as JSON, print a summary."""
    print("="*70)
    print("🤖 funstat BOT 完整功能测试")
    print("="*70)

    chat_id = find_chat_id()
    if not chat_id:
        return

    # All commands to test
    test_cases = [
        # Basic commands
        ("/start", "显示欢迎信息和帮助"),
        ("/menu", "显示主菜单和用户状态"),
        ("/balance", "查看积分余额"),

        # Search commands
        ("/search telegram", "搜索群组(示例:搜索 'telegram')"),
        ("/topchat", "获取热门群组目录"),
        ("/text hello", "通过消息文本搜索(示例:搜索 'hello')"),
        ("/human john", "通过名字搜索用户(示例:搜索 'john')"),

        # Query using a free test ID
        (FREE_TEST_IDS[0], f"查询测试用户 ID: {FREE_TEST_IDS[0]}"),

        # Language settings
        ("/lang", "语言设置(如果支持)"),
    ]

    results = {}

    for command, description in test_cases:
        try:
            responses = test_command(chat_id, command, description)
            results[command] = responses
            time.sleep(3)  # avoid sending requests too quickly
        except KeyboardInterrupt:
            print("\n\n⚠️ 用户中断测试")
            break
        except Exception as e:
            # Top-level boundary: report the failure and move on to the
            # next command rather than aborting the whole run.
            print(f"\n❌ 错误: {e}")
            continue

    # Save results
    print("\n" + "="*70)
    print("💾 保存测试结果...")
    print("="*70)

    with open("funstat_test_results.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)

    print("✅ 结果已保存到 funstat_test_results.json")

    # Summary
    print("\n" + "="*70)
    print("📊 测试摘要")
    print("="*70)

    total = len(test_cases)
    success = sum(1 for r in results.values() if r)

    print(f"总测试数: {total}")
    print(f"成功响应: {success}")
    print(f"无响应: {total - success}")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n👋 测试结束")