160 lines
5.0 KiB
Python
160 lines
5.0 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
交互式 Telegram BOT 测试工具
|
||
"""
|
||
import requests
|
||
import json
|
||
import time
|
||
import sys
|
||
|
||
BOT_TOKEN = "8410096573:AAFLJbWUp2Xog0oeoe7hfBlVqR7ChoSl9Pg"
|
||
BASE_URL = f"https://api.telegram.org/bot{BOT_TOKEN}"
|
||
|
||
# 这里需要你的 Telegram 用户 ID
|
||
# 可以通过向 @userinfobot 发送消息来获取
|
||
YOUR_USER_ID = None
|
||
|
||
def get_updates(offset=None, timeout=30):
|
||
"""获取消息更新"""
|
||
params = {"timeout": timeout}
|
||
if offset:
|
||
params["offset"] = offset
|
||
response = requests.get(f"{BASE_URL}/getUpdates", params=params)
|
||
return response.json()
|
||
|
||
def send_message_to_user(user_id, text):
|
||
"""向指定用户发送消息(作为 BOT)"""
|
||
data = {
|
||
"chat_id": user_id,
|
||
"text": text
|
||
}
|
||
response = requests.post(f"{BASE_URL}/sendMessage", json=data)
|
||
return response.json()
|
||
|
||
def format_message(msg):
|
||
"""格式化消息显示"""
|
||
from_user = msg.get('from', {})
|
||
is_bot = from_user.get('is_bot', False)
|
||
username = from_user.get('username', '')
|
||
first_name = from_user.get('first_name', '')
|
||
|
||
sender = f"{'🤖 BOT' if is_bot else '👤 User'}"
|
||
sender_name = username or first_name or 'Unknown'
|
||
|
||
text = msg.get('text', '')
|
||
photo = msg.get('photo')
|
||
document = msg.get('document')
|
||
|
||
result = [f"\n{'='*60}"]
|
||
result.append(f"{sender} {sender_name}")
|
||
result.append('-'*60)
|
||
|
||
if text:
|
||
result.append(text)
|
||
|
||
if photo:
|
||
result.append("[图片消息]")
|
||
if msg.get('caption'):
|
||
result.append(f"说明: {msg['caption']}")
|
||
|
||
if document:
|
||
doc_name = document.get('file_name', 'unknown')
|
||
result.append(f"[文档: {doc_name}]")
|
||
if msg.get('caption'):
|
||
result.append(f"说明: {msg['caption']}")
|
||
|
||
if 'reply_markup' in msg:
|
||
markup = msg['reply_markup']
|
||
if 'inline_keyboard' in markup:
|
||
result.append("\n[内联按钮:]")
|
||
for row in markup['inline_keyboard']:
|
||
for btn in row:
|
||
callback = btn.get('callback_data', btn.get('url', ''))
|
||
result.append(f" [{btn.get('text')}] -> {callback}")
|
||
|
||
if 'keyboard' in markup:
|
||
result.append("\n[回复键盘:]")
|
||
for row in markup['keyboard']:
|
||
row_buttons = []
|
||
for btn in row:
|
||
btn_text = btn.get('text') if isinstance(btn, dict) else str(btn)
|
||
row_buttons.append(btn_text)
|
||
result.append(f" {' | '.join(row_buttons)}")
|
||
|
||
result.append('='*60)
|
||
return '\n'.join(result)
|
||
|
||
def main():
|
||
print("=" * 60)
|
||
print("Telegram BOT 交互式测试工具")
|
||
print("=" * 60)
|
||
|
||
# 首先,我们需要找到一个有效的聊天
|
||
print("\n正在查找活动聊天...")
|
||
updates = get_updates(timeout=5)
|
||
|
||
active_chats = {}
|
||
if updates.get('result'):
|
||
for update in updates['result']:
|
||
if 'message' in update:
|
||
msg = update['message']
|
||
chat_id = msg['chat']['id']
|
||
chat_title = msg['chat'].get('title') or msg['chat'].get('first_name', str(chat_id))
|
||
|
||
if chat_id not in active_chats:
|
||
active_chats[chat_id] = chat_title
|
||
|
||
if active_chats:
|
||
print(f"\n找到 {len(active_chats)} 个活动聊天:")
|
||
for i, (chat_id, title) in enumerate(active_chats.items(), 1):
|
||
print(f" {i}. {title} (ID: {chat_id})")
|
||
|
||
print("\n提示: 要与 BOT 交互,请:")
|
||
print("1. 在 Telegram 中打开 @openaiw_bot")
|
||
print("2. 发送 /start 命令")
|
||
print("3. 然后运行这个脚本来查看响应")
|
||
else:
|
||
print("\n没有找到活动聊天。")
|
||
print("\n要开始测试,请:")
|
||
print("1. 在 Telegram 中搜索 @openaiw_bot")
|
||
print("2. 发送任意消息(例如 /start)")
|
||
print("3. 然后重新运行这个脚本")
|
||
|
||
# 监听新消息
|
||
print("\n" + "="*60)
|
||
print("开始监听消息... (按 Ctrl+C 退出)")
|
||
print("="*60)
|
||
|
||
last_update_id = None
|
||
if updates.get('result'):
|
||
last_update_id = updates['result'][-1]['update_id']
|
||
|
||
all_messages = []
|
||
|
||
try:
|
||
while True:
|
||
updates = get_updates(offset=last_update_id, timeout=30)
|
||
|
||
if updates.get('result'):
|
||
for update in updates['result']:
|
||
last_update_id = update['update_id'] + 1
|
||
|
||
if 'message' in update:
|
||
msg = update['message']
|
||
print(format_message(msg))
|
||
all_messages.append(msg)
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n\n" + "="*60)
|
||
print("停止监听")
|
||
print("="*60)
|
||
|
||
# 保存所有消息
|
||
if all_messages:
|
||
with open("bot_interaction_log.json", "w", encoding="utf-8") as f:
|
||
json.dump(all_messages, f, indent=2, ensure_ascii=False)
|
||
print(f"\n✓ 已保存 {len(all_messages)} 条消息到 bot_interaction_log.json")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|