#!/usr/bin/env python3 """ Telethon Session 创建脚本 这个脚本会帮助你创建 Telegram session 文件 运行后按照提示操作即可 """ import asyncio from telethon import TelegramClient from telethon.errors import SessionPasswordNeededError # 你的 API 凭证 API_ID = 24660516 API_HASH = "eae564578880a59c9963916ff1bbbd3a" # Session 文件名 SESSION_NAME = "funstat_bot_session" async def create_session(): """创建 Telegram session 文件""" print("=" * 60) print("🚀 Telegram Session 创建工具") print("=" * 60) print() # 创建客户端 client = TelegramClient(SESSION_NAME, API_ID, API_HASH) print("📱 正在连接到 Telegram...") await client.connect() if not await client.is_user_authorized(): print() print("=" * 60) print("需要登录到你的 Telegram 账号") print("=" * 60) print() # 请求手机号 phone = input("请输入你的手机号 (格式: +86xxxxxxxxxx): ") try: await client.send_code_request(phone) print() print("✅ 验证码已发送到你的 Telegram 客户端") print(" (请检查你的 Telegram 应用)") print() # 请求验证码 code = input("请输入收到的验证码: ") try: await client.sign_in(phone, code) except SessionPasswordNeededError: # 如果账号设置了两步验证 print() print("⚠️ 你的账号启用了两步验证") password = input("请输入你的两步验证密码: ") await client.sign_in(password=password) except Exception as e: print(f"❌ 登录失败: {e}") await client.disconnect() return False # 验证登录成功 me = await client.get_me() print() print("=" * 60) print("✅ 登录成功!") print("=" * 60) print(f"账号信息:") print(f" - 用户名: @{me.username if me.username else '未设置'}") print(f" - 姓名: {me.first_name} {me.last_name if me.last_name else ''}") print(f" - 手机号: {me.phone}") print(f" - ID: {me.id}") print() print(f"✅ Session 文件已创建: {SESSION_NAME}.session") print() # 测试与 @openaiw_bot 的连接 print("=" * 60) print("🔍 正在测试与 @openaiw_bot 的连接...") print("=" * 60) try: # 查找 bot bot_entity = await client.get_entity("@openaiw_bot") print(f"✅ 找到 BOT: {bot_entity.first_name}") print(f" BOT ID: {bot_entity.id}") print() # 发送测试消息 print("📤 发送测试消息: /start") await client.send_message(bot_entity, "/start") print("⏳ 等待 BOT 响应 (最多 10 秒)...") # 等待响应 async def wait_for_response(): async for message in client.iter_messages(bot_entity, limit=1): return message try: response = await asyncio.wait_for(wait_for_response(), timeout=10.0) if response: print() print("=" * 60) print("✅ 收到 BOT 响应:") print("=" * 60) print(response.text[:500]) # 显示前500个字符 print() if len(response.text) > 500: print(f"... (还有 {len(response.text) - 500} 个字符)") print() except asyncio.TimeoutError: print("⚠️ 10秒内未收到响应,但连接正常") print() except Exception as e: print(f"❌ 测试失败: {e}") print() await client.disconnect() print("=" * 60) print("🎉 完成!") print("=" * 60) print() print(f"Session 文件位置: ./{SESSION_NAME}.session") print("现在你可以在 MCP 服务器中使用这个 session 文件了") print() return True if __name__ == "__main__": # 检查是否已安装 telethon try: import telethon print(f"✅ Telethon 版本: {telethon.__version__}") print() except ImportError: print("❌ 未安装 Telethon") print("请先运行: pip install telethon") exit(1) # 运行创建流程 asyncio.run(create_session())