249 lines
7.3 KiB
Python
249 lines
7.3 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
测试 funstat BOT 的所有命令
|
||
基于截图发现的功能
|
||
"""
|
||
import json
|
||
import os
|
||
import sys
|
||
import time
|
||
|
||
import requests
|
||
|
||
from env_loader import load_env
|
||
|
||
load_env()
|
||
BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
|
||
if not BOT_TOKEN:
|
||
raise RuntimeError("请在 .env 中设置 TELEGRAM_BOT_TOKEN")
|
||
BASE_URL = f"https://api.telegram.org/bot{BOT_TOKEN}"
|
||
|
||
# 测试用的免费 ID(从 BOT 提供)
|
||
FREE_TEST_IDS = [
|
||
"7745296119",
|
||
"994270689",
|
||
"7864982459",
|
||
"5095649820",
|
||
"5933194496",
|
||
"7695732077"
|
||
]
|
||
|
||
def get_updates(offset=None, timeout=10):
|
||
"""获取消息更新"""
|
||
params = {"timeout": timeout}
|
||
if offset:
|
||
params["offset"] = offset
|
||
response = requests.get(f"{BASE_URL}/getUpdates", params=params)
|
||
return response.json()
|
||
|
||
def send_message(chat_id, text):
|
||
"""发送消息"""
|
||
data = {"chat_id": chat_id, "text": text}
|
||
response = requests.post(f"{BASE_URL}/sendMessage", json=data)
|
||
return response.json()
|
||
|
||
def clear_updates():
|
||
"""清除旧消息"""
|
||
updates = get_updates(timeout=1)
|
||
if updates.get('result'):
|
||
last_id = updates['result'][-1]['update_id']
|
||
get_updates(offset=last_id + 1, timeout=1)
|
||
|
||
def wait_for_response(chat_id, timeout=15):
|
||
"""等待 BOT 响应"""
|
||
start_time = time.time()
|
||
last_offset = None
|
||
responses = []
|
||
|
||
while time.time() - start_time < timeout:
|
||
updates = get_updates(offset=last_offset, timeout=5)
|
||
|
||
if updates.get('result'):
|
||
for update in updates['result']:
|
||
if last_offset is None or update['update_id'] > last_offset:
|
||
last_offset = update['update_id'] + 1
|
||
|
||
if 'message' in update:
|
||
msg = update['message']
|
||
if msg.get('from', {}).get('is_bot') and msg.get('chat', {}).get('id') == chat_id:
|
||
responses.append(msg)
|
||
|
||
if responses:
|
||
# 等待一下看是否还有更多消息
|
||
time.sleep(2)
|
||
# 再检查一次
|
||
updates = get_updates(offset=last_offset, timeout=1)
|
||
if updates.get('result'):
|
||
for update in updates['result']:
|
||
if update['update_id'] >= last_offset:
|
||
last_offset = update['update_id'] + 1
|
||
if 'message' in update:
|
||
msg = update['message']
|
||
if msg.get('from', {}).get('is_bot') and msg.get('chat', {}).get('id') == chat_id:
|
||
responses.append(msg)
|
||
break
|
||
|
||
time.sleep(0.5)
|
||
|
||
return responses
|
||
|
||
def format_response(responses):
|
||
"""格式化响应"""
|
||
if not responses:
|
||
return "❌ 无响应"
|
||
|
||
result = []
|
||
for i, msg in enumerate(responses, 1):
|
||
if len(responses) > 1:
|
||
result.append(f"\n--- 响应 {i} ---")
|
||
|
||
# 文本
|
||
if 'text' in msg:
|
||
text = msg['text']
|
||
if len(text) > 500:
|
||
result.append(f"📝 文本: {text[:500]}...")
|
||
else:
|
||
result.append(f"📝 文本: {text}")
|
||
|
||
# 图片
|
||
if 'photo' in msg:
|
||
result.append(f"🖼️ 图片: {len(msg['photo'])} 张")
|
||
if msg.get('caption'):
|
||
result.append(f" 说明: {msg['caption']}")
|
||
|
||
# 文档
|
||
if 'document' in msg:
|
||
result.append(f"📄 文档: {msg['document'].get('file_name', 'unknown')}")
|
||
|
||
# 内联键盘
|
||
if 'reply_markup' in msg and 'inline_keyboard' in msg['reply_markup']:
|
||
result.append("⌨️ 内联按钮:")
|
||
for row in msg['reply_markup']['inline_keyboard']:
|
||
buttons = [btn.get('text', '') for btn in row]
|
||
result.append(f" {' | '.join(buttons)}")
|
||
|
||
# 回复键盘
|
||
if 'reply_markup' in msg and 'keyboard' in msg['reply_markup']:
|
||
result.append("⌨️ 回复键盘:")
|
||
for row in msg['reply_markup']['keyboard']:
|
||
buttons = []
|
||
for btn in row:
|
||
btn_text = btn.get('text') if isinstance(btn, dict) else str(btn)
|
||
buttons.append(btn_text)
|
||
result.append(f" {' | '.join(buttons)}")
|
||
|
||
return '\n'.join(result)
|
||
|
||
def test_command(chat_id, command, description="", wait_time=15):
|
||
"""测试命令"""
|
||
print(f"\n{'='*70}")
|
||
print(f"📤 测试: {command}")
|
||
if description:
|
||
print(f" {description}")
|
||
print('='*70)
|
||
|
||
# 清除旧消息
|
||
clear_updates()
|
||
|
||
# 发送命令
|
||
result = send_message(chat_id, command)
|
||
if not result.get('ok'):
|
||
print(f"❌ 发送失败: {result}")
|
||
return None
|
||
|
||
print("⏳ 等待响应...")
|
||
|
||
# 等待响应
|
||
responses = wait_for_response(chat_id, timeout=wait_time)
|
||
|
||
print(format_response(responses))
|
||
|
||
return responses
|
||
|
||
def find_chat_id():
|
||
"""查找 chat_id"""
|
||
print("🔍 查找 chat_id...")
|
||
updates = get_updates(timeout=5)
|
||
|
||
if updates.get('result'):
|
||
for update in reversed(updates['result']):
|
||
if 'message' in update:
|
||
chat_id = update['message']['chat']['id']
|
||
print(f"✅ 找到 chat_id: {chat_id}\n")
|
||
return chat_id
|
||
|
||
print("❌ 未找到 chat_id")
|
||
print("请先在 Telegram 中向 @openaiw_bot 发送任意消息\n")
|
||
return None
|
||
|
||
def main():
|
||
print("="*70)
|
||
print("🤖 funstat BOT 完整功能测试")
|
||
print("="*70)
|
||
|
||
chat_id = find_chat_id()
|
||
if not chat_id:
|
||
return
|
||
|
||
# 定义所有要测试的命令
|
||
test_cases = [
|
||
# 基础命令
|
||
("/start", "显示欢迎信息和帮助"),
|
||
("/menu", "显示主菜单和用户状态"),
|
||
("/balance", "查看积分余额"),
|
||
|
||
# 搜索命令
|
||
("/search telegram", "搜索群组(示例:搜索 'telegram')"),
|
||
("/topchat", "获取热门群组目录"),
|
||
("/text hello", "通过消息文本搜索(示例:搜索 'hello')"),
|
||
("/human john", "通过名字搜索用户(示例:搜索 'john')"),
|
||
|
||
# 用免费测试 ID 查询
|
||
(FREE_TEST_IDS[0], f"查询测试用户 ID: {FREE_TEST_IDS[0]}"),
|
||
|
||
# 语言设置
|
||
("/lang", "语言设置(如果支持)"),
|
||
]
|
||
|
||
results = {}
|
||
|
||
for command, description in test_cases:
|
||
try:
|
||
responses = test_command(chat_id, command, description)
|
||
results[command] = responses
|
||
time.sleep(3) # 避免请求过快
|
||
except KeyboardInterrupt:
|
||
print("\n\n⚠️ 用户中断测试")
|
||
break
|
||
except Exception as e:
|
||
print(f"\n❌ 错误: {e}")
|
||
continue
|
||
|
||
# 保存结果
|
||
print("\n" + "="*70)
|
||
print("💾 保存测试结果...")
|
||
print("="*70)
|
||
|
||
with open("funstat_test_results.json", "w", encoding="utf-8") as f:
|
||
json.dump(results, f, indent=2, ensure_ascii=False)
|
||
|
||
print("✅ 结果已保存到 funstat_test_results.json")
|
||
|
||
# 生成摘要
|
||
print("\n" + "="*70)
|
||
print("📊 测试摘要")
|
||
print("="*70)
|
||
|
||
total = len(test_cases)
|
||
success = sum(1 for r in results.values() if r and len(r) > 0)
|
||
|
||
print(f"总测试数: {total}")
|
||
print(f"成功响应: {success}")
|
||
print(f"无响应: {total - success}")
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
main()
|
||
except KeyboardInterrupt:
|
||
print("\n\n👋 测试结束")
|