Files
funstat-mcp/scripts/test_all_commands.py
2025-11-01 21:58:03 +08:00

242 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
测试 funstat BOT 的所有命令
基于截图发现的功能
"""
import requests
import json
import time
import sys
BOT_TOKEN = "8410096573:AAFLJbWUp2Xog0oeoe7hfBlVqR7ChoSl9Pg"
BASE_URL = f"https://api.telegram.org/bot{BOT_TOKEN}"
# 测试用的免费 ID从 BOT 提供)
FREE_TEST_IDS = [
"7745296119",
"994270689",
"7864982459",
"5095649820",
"5933194496",
"7695732077"
]
def get_updates(offset=None, timeout=10):
"""获取消息更新"""
params = {"timeout": timeout}
if offset:
params["offset"] = offset
response = requests.get(f"{BASE_URL}/getUpdates", params=params)
return response.json()
def send_message(chat_id, text):
"""发送消息"""
data = {"chat_id": chat_id, "text": text}
response = requests.post(f"{BASE_URL}/sendMessage", json=data)
return response.json()
def clear_updates():
"""清除旧消息"""
updates = get_updates(timeout=1)
if updates.get('result'):
last_id = updates['result'][-1]['update_id']
get_updates(offset=last_id + 1, timeout=1)
def wait_for_response(chat_id, timeout=15):
"""等待 BOT 响应"""
start_time = time.time()
last_offset = None
responses = []
while time.time() - start_time < timeout:
updates = get_updates(offset=last_offset, timeout=5)
if updates.get('result'):
for update in updates['result']:
if last_offset is None or update['update_id'] > last_offset:
last_offset = update['update_id'] + 1
if 'message' in update:
msg = update['message']
if msg.get('from', {}).get('is_bot') and msg.get('chat', {}).get('id') == chat_id:
responses.append(msg)
if responses:
# 等待一下看是否还有更多消息
time.sleep(2)
# 再检查一次
updates = get_updates(offset=last_offset, timeout=1)
if updates.get('result'):
for update in updates['result']:
if update['update_id'] >= last_offset:
last_offset = update['update_id'] + 1
if 'message' in update:
msg = update['message']
if msg.get('from', {}).get('is_bot') and msg.get('chat', {}).get('id') == chat_id:
responses.append(msg)
break
time.sleep(0.5)
return responses
def format_response(responses):
"""格式化响应"""
if not responses:
return "❌ 无响应"
result = []
for i, msg in enumerate(responses, 1):
if len(responses) > 1:
result.append(f"\n--- 响应 {i} ---")
# 文本
if 'text' in msg:
text = msg['text']
if len(text) > 500:
result.append(f"📝 文本: {text[:500]}...")
else:
result.append(f"📝 文本: {text}")
# 图片
if 'photo' in msg:
result.append(f"🖼️ 图片: {len(msg['photo'])}")
if msg.get('caption'):
result.append(f" 说明: {msg['caption']}")
# 文档
if 'document' in msg:
result.append(f"📄 文档: {msg['document'].get('file_name', 'unknown')}")
# 内联键盘
if 'reply_markup' in msg and 'inline_keyboard' in msg['reply_markup']:
result.append("⌨️ 内联按钮:")
for row in msg['reply_markup']['inline_keyboard']:
buttons = [btn.get('text', '') for btn in row]
result.append(f" {' | '.join(buttons)}")
# 回复键盘
if 'reply_markup' in msg and 'keyboard' in msg['reply_markup']:
result.append("⌨️ 回复键盘:")
for row in msg['reply_markup']['keyboard']:
buttons = []
for btn in row:
btn_text = btn.get('text') if isinstance(btn, dict) else str(btn)
buttons.append(btn_text)
result.append(f" {' | '.join(buttons)}")
return '\n'.join(result)
def test_command(chat_id, command, description="", wait_time=15):
"""测试命令"""
print(f"\n{'='*70}")
print(f"📤 测试: {command}")
if description:
print(f" {description}")
print('='*70)
# 清除旧消息
clear_updates()
# 发送命令
result = send_message(chat_id, command)
if not result.get('ok'):
print(f"❌ 发送失败: {result}")
return None
print("⏳ 等待响应...")
# 等待响应
responses = wait_for_response(chat_id, timeout=wait_time)
print(format_response(responses))
return responses
def find_chat_id():
"""查找 chat_id"""
print("🔍 查找 chat_id...")
updates = get_updates(timeout=5)
if updates.get('result'):
for update in reversed(updates['result']):
if 'message' in update:
chat_id = update['message']['chat']['id']
print(f"✅ 找到 chat_id: {chat_id}\n")
return chat_id
print("❌ 未找到 chat_id")
print("请先在 Telegram 中向 @openaiw_bot 发送任意消息\n")
return None
def main():
print("="*70)
print("🤖 funstat BOT 完整功能测试")
print("="*70)
chat_id = find_chat_id()
if not chat_id:
return
# 定义所有要测试的命令
test_cases = [
# 基础命令
("/start", "显示欢迎信息和帮助"),
("/menu", "显示主菜单和用户状态"),
("/balance", "查看积分余额"),
# 搜索命令
("/search telegram", "搜索群组(示例:搜索 'telegram'"),
("/topchat", "获取热门群组目录"),
("/text hello", "通过消息文本搜索(示例:搜索 'hello'"),
("/human john", "通过名字搜索用户(示例:搜索 'john'"),
# 用免费测试 ID 查询
(FREE_TEST_IDS[0], f"查询测试用户 ID: {FREE_TEST_IDS[0]}"),
# 语言设置
("/lang", "语言设置(如果支持)"),
]
results = {}
for command, description in test_cases:
try:
responses = test_command(chat_id, command, description)
results[command] = responses
time.sleep(3) # 避免请求过快
except KeyboardInterrupt:
print("\n\n⚠️ 用户中断测试")
break
except Exception as e:
print(f"\n❌ 错误: {e}")
continue
# 保存结果
print("\n" + "="*70)
print("💾 保存测试结果...")
print("="*70)
with open("funstat_test_results.json", "w", encoding="utf-8") as f:
json.dump(results, f, indent=2, ensure_ascii=False)
print("✅ 结果已保存到 funstat_test_results.json")
# 生成摘要
print("\n" + "="*70)
print("📊 测试摘要")
print("="*70)
total = len(test_cases)
success = sum(1 for r in results.values() if r and len(r) > 0)
print(f"总测试数: {total}")
print(f"成功响应: {success}")
print(f"无响应: {total - success}")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\n👋 测试结束")