#!/usr/bin/env python3 """ 007翻译客户自动获取脚本 通过 Funstat BOT 自动查询高质量客户 """ import asyncio import json import re from datetime import datetime from telethon import TelegramClient from telethon.tl.types import Message # 配置 API_ID = 28618843 API_HASH = "476b6116881049c68d65f108ed0c1d6d" BOT_USERNAME = "@openaiw_bot" SESSION_PATH = "/Users/lucas/telegram_sessions/funstat_bot" # 使用现有的 session # 查询关键词列表 QUERIES = [ {"type": "需求类", "keyword": "求推荐翻译"}, {"type": "需求类", "keyword": "翻译软件推荐"}, {"type": "痛点类", "keyword": "翻译不准"}, {"type": "痛点类", "keyword": "翻译太慢"}, {"type": "对比类", "keyword": "KT翻译"}, {"type": "对比类", "keyword": "翻译软件对比"}, ] class FunstatAutoQuery: """Funstat 自动查询客户端""" def __init__(self): self.client = None self.bot = None self.results = [] async def initialize(self): """初始化客户端""" print("🚀 初始化 Telegram 客户端...") self.client = TelegramClient(SESSION_PATH, API_ID, API_HASH) await self.client.start() self.bot = await self.client.get_entity(BOT_USERNAME) print(f"✅ 已连接到: {BOT_USERNAME}") async def send_command_and_wait(self, command: str, timeout: int = 15) -> str: """发送命令并等待响应""" print(f"\n📤 发送命令: {command}") # 获取发送前的最新消息ID messages = await self.client.get_messages(self.bot, limit=1) last_msg_id = messages[0].id if messages else 0 # 发送命令 await self.client.send_message(self.bot, command) # 等待响应 for i in range(timeout): await asyncio.sleep(1) new_messages = await self.client.get_messages( self.bot, limit=5, min_id=last_msg_id ) # 查找BOT的回复 for msg in reversed(new_messages): if msg.text and not msg.out: print(f"✅ 收到响应 ({len(msg.text)} 字符)") return msg.text print("⏰ 响应超时") return "" async def parse_search_results(self, text: str) -> list: """解析搜索结果""" results = [] # 提取群组/用户信息 # 格式可能是: @username, 用户名, 群组名等 lines = text.split('\n') for line in lines: # 跳过空行和标题行 if not line.strip() or line.startswith('===') or line.startswith('---'): continue # 提取用户名 @xxx usernames = re.findall(r'@(\w+)', line) # 提取群组链接 group_links = re.findall(r't\.me/(\w+)', line) if usernames or group_links: results.append({ 'raw_text': line, 'usernames': usernames, 'group_links': group_links, }) return results async def query_keyword(self, keyword: str, query_type: str): """查询单个关键词""" print(f"\n{'='*60}") print(f"🔍 查询类型: {query_type}") print(f"🔍 关键词: {keyword}") print(f"{'='*60}") # 使用 /text 命令搜索 command = f"/text {keyword}" response = await self.send_command_and_wait(command) if not response: print("❌ 未收到响应") return # 解析结果 parsed = await self.parse_search_results(response) result = { 'type': query_type, 'keyword': keyword, 'response': response, 'parsed_count': len(parsed), 'parsed_data': parsed, 'timestamp': datetime.now().isoformat() } self.results.append(result) print(f"✅ 找到 {len(parsed)} 条相关数据") # 显示前3条样例 if parsed: print("\n📋 样例数据:") for i, item in enumerate(parsed[:3], 1): print(f" {i}. {item['raw_text'][:100]}") async def run_queries(self): """执行所有查询""" print("\n🚀 开始批量查询...") print(f"📊 查询计划: {len(QUERIES)} 个关键词") for i, query in enumerate(QUERIES, 1): print(f"\n⏳ 进度: {i}/{len(QUERIES)}") await self.query_keyword(query['keyword'], query['type']) # 避免请求过快,休息2秒 if i < len(QUERIES): print("⏸️ 休息 2 秒...") await asyncio.sleep(2) def save_results(self): """保存结果到文件""" output_file = "funstat_query_results.json" with open(output_file, 'w', encoding='utf-8') as f: json.dump(self.results, f, ensure_ascii=False, indent=2) print(f"\n💾 结果已保存到: {output_file}") def generate_summary(self): """生成结果摘要""" print("\n" + "="*60) print("📊 查询结果摘要") print("="*60) total_parsed = sum(r['parsed_count'] for r in self.results) print(f"\n✅ 查询完成:") print(f" - 查询关键词数: {len(self.results)}") print(f" - 发现数据总数: {total_parsed}") print(f"\n📋 按类型统计:") type_stats = {} for result in self.results: t = result['type'] if t not in type_stats: type_stats[t] = {'count': 0, 'keywords': []} type_stats[t]['count'] += result['parsed_count'] type_stats[t]['keywords'].append(result['keyword']) for query_type, stats in type_stats.items(): print(f" - {query_type}: {stats['count']} 条数据") for kw in stats['keywords']: print(f" • {kw}") print(f"\n📁 详细数据已保存到 JSON 文件") print(f"📝 接下来需要:") print(f" 1. 分析 JSON 文件中的数据") print(f" 2. 提取用户信息") print(f" 3. 使用 funstat_user_info 验证用户价值") print(f" 4. 生成最终客户清单") async def cleanup(self): """清理资源""" if self.client: await self.client.disconnect() print("\n👋 客户端已断开") async def main(): """主函数""" print("="*60) print("🎯 007翻译客户自动获取系统") print("="*60) client = FunstatAutoQuery() try: await client.initialize() await client.run_queries() client.save_results() client.generate_summary() except Exception as e: print(f"\n❌ 错误: {e}") import traceback traceback.print_exc() finally: await client.cleanup() if __name__ == "__main__": asyncio.run(main())