Files
funstat-mcp/scripts/test_bot_commands.py

165 lines
4.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
测试 BOT 的所有命令并获取响应
"""
import json
import os
import time
import requests
from env_loader import load_env
# Populate the process environment before any setting is read.
load_env()

# Bot credential; the script cannot do anything useful without it, so fail
# at import time when it is missing or empty.
BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
if BOT_TOKEN is None or BOT_TOKEN == "":
    raise RuntimeError("请在 .env 中设置 TELEGRAM_BOT_TOKEN")

# Common prefix for every Bot API method URL built below.
BASE_URL = "https://api.telegram.org/bot" + BOT_TOKEN
def get_updates(offset=None):
    """Fetch updates from the Bot API ``getUpdates`` method.

    Args:
        offset: Value for the ``offset`` request parameter. ``None`` (the
            default) leaves the parameter out of the request entirely.

    Returns:
        The decoded JSON body of the HTTP response.

    Raises:
        requests.exceptions.RequestException: On connection failure, or when
            the server does not answer within the HTTP timeout.
    """
    poll_seconds = 10
    params = {
        "timeout": poll_seconds,
        # The Bot API takes array parameters as a JSON-serialized string.
        # A raw Python list would be encoded by requests as a bare repeated
        # key ("allowed_updates=message"), which is not a JSON array.
        "allowed_updates": json.dumps(["message"]),
    }
    # Compare against None so that an explicit offset of 0 is still sent.
    if offset is not None:
        params["offset"] = offset
    # The HTTP timeout has to outlast the long-poll window, otherwise a quiet
    # poll would be reported as a failure; without one, a stalled connection
    # would block the script forever.
    response = requests.get(
        f"{BASE_URL}/getUpdates",
        params=params,
        timeout=poll_seconds + 5,
    )
    return response.json()
def send_message(chat_id, text):
    """Post a text message through the Bot API ``sendMessage`` method.

    Args:
        chat_id: Identifier of the chat to send to.
        text: Message text.

    Returns:
        The decoded JSON body of the HTTP response.

    Raises:
        requests.exceptions.RequestException: On connection failure, or when
            the server does not answer within the HTTP timeout.
    """
    payload = {
        "chat_id": chat_id,
        "text": text,
    }
    # Bound the request so an unresponsive server cannot hang the test run.
    response = requests.post(f"{BASE_URL}/sendMessage", json=payload, timeout=15)
    return response.json()
def clear_updates():
    """Skip past whatever updates are currently pending.

    Fetches the pending updates and, when there are any, issues a second
    request whose offset is one past the newest ``update_id``.
    NOTE(review): presumably the second request is what marks the old updates
    as handled on the server side; confirm against the getUpdates docs.
    """
    pending = get_updates().get('result')
    if not pending:
        return
    newest_id = pending[-1]['update_id']
    get_updates(offset=newest_id + 1)
def wait_for_response(chat_id, timeout=10):
    """Poll for updates until a bot-authored message appears in ``chat_id``.

    Args:
        chat_id: Chat identifier compared against ``message['chat']['id']``.
        timeout: Time budget in seconds. It is only checked between polls, so
            the call can run longer than this when a single poll is slow.

    Returns:
        The first message whose sender is flagged ``is_bot`` and whose chat id
        equals ``chat_id``, or ``None`` when the budget runs out first.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    next_offset = None
    while time.monotonic() < deadline:
        updates = get_updates(offset=next_offset)
        for update in updates.get('result') or []:
            # Always move the offset past every update that was seen. The
            # previous check (update_id > offset) was false for an update
            # whose id equalled the current offset, so a lone update never
            # advanced it and was fetched again on every poll.
            seen_through = update['update_id'] + 1
            if next_offset is None or seen_through > next_offset:
                next_offset = seen_through
            if 'message' not in update:
                continue
            msg = update['message']
            # Only a message sent by a bot in the chat under test counts.
            sent_by_bot = msg.get('from', {}).get('is_bot')
            if sent_by_bot and msg.get('chat', {}).get('id') == chat_id:
                return msg
        # Short pause between polls.
        time.sleep(0.5)
    return None
def test_command(chat_id, command, description=""):
    """Send one command to a chat and print whatever comes back.

    Args:
        chat_id: Chat to send the command to and to watch for a reply.
        command: Command text to send, e.g. ``"/start"``.
        description: Optional human-readable note printed under the command.

    Returns:
        The reply message dict, or ``None`` when sending failed or no reply
        was seen before ``wait_for_response`` gave up.
    """
    banner = '=' * 60
    print(f"\n{banner}")
    print(f"测试命令: {command}")
    if description:
        print(f"说明: {description}")
    print(banner)

    # Start from a clean slate so stale updates are not taken for the reply.
    clear_updates()

    send_result = send_message(chat_id, command)
    if not send_result.get('ok'):
        print(f"❌ 发送失败: {send_result}")
        return None
    print("✓ 命令已发送,等待响应...")

    response = wait_for_response(chat_id)
    if not response:
        print("❌ 没有收到响应(超时)")
        return None

    divider = "-" * 60
    print("\n📥 BOT 响应:")
    print(divider)

    # Report each kind of content the reply carries.
    if 'text' in response:
        print("文本消息:")
        print(response['text'])
    if 'photo' in response:
        photo_count = len(response['photo'])
        print(f"图片消息: {photo_count} 张图片")
    if 'document' in response:
        file_name = response['document'].get('file_name', 'unknown')
        print(f"文档消息: {file_name}")
    if 'reply_markup' in response:
        print("\n键盘/按钮:")
        markup = response['reply_markup']
        if 'inline_keyboard' in markup:
            inline_buttons = (
                button
                for row in markup['inline_keyboard']
                for button in row
            )
            for button in inline_buttons:
                # Prefer the callback payload; fall back to the URL, then "".
                fallback = button.get('url', '')
                target = button.get('callback_data', fallback)
                print(f" - {button.get('text')}: {target}")
        if 'keyboard' in markup:
            reply_buttons = (
                button
                for row in markup['keyboard']
                for button in row
            )
            for button in reply_buttons:
                label = button.get('text', button)
                print(f" - {label}")

    print(divider)
    return response
def main():
    """Run every known command against the bot and save the replies.

    Takes the chat id from the most recent update that carries a message,
    runs each command through ``test_command``, and writes the collected
    replies to ``bot_commands_result.json``. Returns early, after printing a
    hint, when no chat id can be found.
    """
    # Work out which chat to talk to.
    print("获取 chat_id...")
    updates = get_updates()
    chat_id = None
    for update in reversed(updates.get('result') or []):
        if 'message' not in update:
            continue
        chat_id = update['message']['chat']['id']
        print(f"✓ 找到 chat_id: {chat_id}")
        break

    if not chat_id:
        print("❌ 找不到 chat_id请先向 BOT 发送一条消息")
        return

    # Known commands, each paired with the description shown while testing.
    commands = (
        ("/start", "🚀 开始使用机器人"),
        ("/help", "帮助信息"),
        ("/search", "🔍 搜索数据"),
        ("/balance", "💰 查看积分余额"),
    )

    results = {}
    for cmd, desc in commands:
        results[cmd] = test_command(chat_id, cmd, desc)
        # Pause between commands so they are not sent too quickly.
        time.sleep(2)

    # Persist everything that was collected.
    banner = "=" * 60
    print("\n" + banner)
    print("保存测试结果...")
    print(banner)

    serialized = json.dumps(results, indent=2, ensure_ascii=False)
    with open("bot_commands_result.json", "w", encoding="utf-8") as out_file:
        out_file.write(serialized)

    print("✓ 结果已保存到 bot_commands_result.json")
# Run the command sweep only when executed as a script, not when imported.
if __name__ == "__main__":
    main()