#!/usr/bin/env python3 """ 财务管家 AI Agent - 终极版 真正智能、极致体验 创作者:kt暴君 版本:3.0 Ultimate """ import asyncio import json import logging import shutil from typing import Dict, Any, List, Optional, Tuple, Deque from datetime import datetime, timedelta import subprocess import os import re from collections import defaultdict, Counter, deque logger = logging.getLogger(__name__) class FinanceAgentUltimate: """财务管家 - 终极智能版""" def __init__(self): self.name = "财务管家" self.creator = "kt暴君" self.version = "3.0 Ultimate" self.api_url = "http://192.168.9.149:5666/api/finance/transactions" # 缓存数据,避免重复请求 self.data_cache = None self.cache_time = None self.cache_ttl = 60 # 缓存60秒 # 上下文记忆系统(按用户ID存储) self.user_contexts = {} # {user_id: {state, data, timestamp}} self.general_chat_cli = os.getenv("FINANCE_AGENT_GENERAL_CLI", "codex") self.general_histories: Dict[str, Deque[Tuple[str, str]]] = {} async def initialize(self): """初始化""" logger.info(f"✅ {self.name} v{self.version} 已启动 - 终极智能模式") # 预加载数据 await self._get_data(force_refresh=True) return True async def _get_data(self, force_refresh=False) -> List[Dict]: """获取数据(带缓存)""" now = datetime.now() if ( not force_refresh and self.data_cache is not None and self.cache_time is not None and (now - self.cache_time).total_seconds() < self.cache_ttl ): return self.data_cache try: import requests resp = requests.get(self.api_url, timeout=10) if resp.status_code == 200: payload = resp.json() if isinstance(payload, dict): self.data_cache = payload.get("data", []) else: self.data_cache = [] self.cache_time = now logger.info(f"数据已更新: {len(self.data_cache)} 条记录") return self.data_cache logger.error(f"获取数据失败: 状态码 {resp.status_code}") except Exception as e: logger.error(f"获取数据失败: {e}") return self.data_cache or [] async def process_command(self, command: str, context: Dict = None) -> str: """主处理入口 - 智能路由(支持上下文)""" cmd = command.lower().strip() user_id = context.get("user_id", "default") if context else "default" # 检查是否有待处理的上下文 if user_id in self.user_contexts: ctx = self.user_contexts[user_id] # 检查上下文是否过期(5分钟) from datetime import datetime, timedelta if datetime.now() - ctx.get("timestamp", datetime.now()) < timedelta(minutes=5): # 尝试处理上下文响应 result = await self._handle_context_response(command, user_id, ctx) if result: return result else: # 上下文过期,清除 del self.user_contexts[user_id] # 1. 账单查询(最高优先级) if self._is_bill_query(command): return await self._handle_bill_query(command, context) # 2. 网站相关(优先于状态检查,避免被拦截) if any(kw in cmd for kw in ["网站", "website", "web"]) and "状态" in cmd: return await self._handle_website(command) # 3. 虚拟机/服务器(优先处理具体对象) if any(kw in cmd for kw in ["虚拟机", "vm", "服务器"]) and not any(kw in cmd for kw in ["状态", "重启"]): return await self._handle_vm(command) # 4. 状态检查 if any(kw in cmd for kw in ["状态", "status", "运行", "健康", "正常吗"]): return await self._handle_status_check(command) # 5. 统计分析 if any(kw in cmd for kw in ["统计", "分析", "趋势", "汇总", "总共"]): return await self._handle_statistics(command) # 6. 数据查询 if any(kw in cmd for kw in ["本月", "今天", "本周", "昨天", "最近"]): return await self._handle_time_query(command) # 7. 系统管理 if "重启" in cmd: return await self._handle_restart(command) # 8. 日志查看 if any(kw in cmd for kw in ["日志", "log", "错误", "异常"]): return await self._handle_logs(command) # 9. 帮助 if any(kw in cmd for kw in ["帮助", "help", "功能", "怎么用", "命令"]): return await self._handle_help(command) # 10. 智能对话 return await self._handle_conversation(command, context) # ========== 上下文管理 ========== async def _handle_context_response(self, command: str, user_id: str, ctx: Dict) -> Optional[str]: """处理上下文响应""" state = ctx.get("state") # 年份选择上下文 if state == "awaiting_year_selection": # 检查用户是否输入了年份 year_match = re.search(r'20(2[0-9])', command) if year_match: year = int(year_match.group(0)) months = ctx.get("months", []) # 更新月份的年份 for m in months: m["year"] = year # 清除上下文 del self.user_contexts[user_id] # 执行查询 data = await self._get_data() return await self._execute_bill_query(data, months) # 检查是否输入了年份关键词 if "2024" in command or "2025" in command or "今年" in command or "去年" in command: if "2024" in command: year = 2024 elif "2025" in command or "今年" in command: year = 2025 elif "去年" in command: year = datetime.now().year - 1 else: return None months = ctx.get("months", []) for m in months: m["year"] = year del self.user_contexts[user_id] data = await self._get_data() return await self._execute_bill_query(data, months) return None def _set_context(self, user_id: str, state: str, data: Dict): """设置用户上下文""" from datetime import datetime self.user_contexts[user_id] = { "state": state, "timestamp": datetime.now(), **data } # ========== 账单查询 ========== def _is_bill_query(self, cmd: str) -> bool: """判断是否是账单查询""" month_patterns = [r'\d{1,2}月', r'[一二三四五六七八九十]{1,2}月', r'\d{4}年\d{1,2}月'] has_month = any(re.search(p, cmd) for p in month_patterns) has_query = any(w in cmd for w in ["账单", "记录", "查", "看", "显示"]) return has_month and has_query async def _handle_bill_query(self, command: str, context: Dict = None) -> str: """处理账单查询 - 极致智能(支持上下文)""" data = await self._get_data() user_id = context.get("user_id", "default") if context else "default" # 解析月份 months = self._parse_months(command) if not months: return "❓ 请告诉我要查询哪个月份\n\n例如:\n• 查9月账单\n• 查2024年9月\n• 查7月和8月" # 检查是否需要询问年份(只有当用户使用当前年份且命令中没有明确年份时才询问) current_year = datetime.now().year # 检查命令中是否包含年份数字 has_explicit_year = bool(re.search(r'\d{4}年', command)) need_confirm = any(m["year"] == current_year for m in months) and not has_explicit_year if need_confirm: # 检查历史数据 year_stats = self._get_year_stats(data, months) if len(year_stats) > 1: # 多年都有数据,需要用户确认 # 保存上下文 self._set_context(user_id, "awaiting_year_selection", { "months": months, "year_stats": year_stats, "original_command": command }) month_names = "、".join([f"{m['month']}月" for m in months]) options = [] for year in sorted(year_stats.keys(), reverse=True): options.append(f"• {year}年{month_names}: {year_stats[year]['count']}条记录,总计 ¥{year_stats[year]['total']:.3f}") return f"📊 发现多个年份都有{month_names}的数据:\n\n" + "\n".join(options) + f"\n\n❓ 请直接回复年份:\n• 2025\n• 2024" # 执行查询 return await self._execute_bill_query(data, months) def _parse_months(self, cmd: str) -> List[Dict]: """解析月份""" months = [] current_year = datetime.now().year cn_map = {"一": 1, "二": 2, "三": 3, "四": 4, "五": 5, "六": 6, "七": 7, "八": 8, "九": 9, "十": 10, "十一": 11, "十二": 12} # 完整格式:2024年9月 for m in re.finditer(r'(\d{4})年(\d{1,2})月', cmd): year, month = int(m.group(1)), int(m.group(2)) if 1 <= month <= 12 and 2020 <= year <= 2030: months.append({"year": year, "month": month}) # 简单格式:9月(使用当前年份) if not months: for m in re.finditer(r'(\d{1,2})月', cmd): month = int(m.group(1)) if 1 <= month <= 12: months.append({"year": current_year, "month": month}) # 中文月份 for cn, num in cn_map.items(): if f"{cn}月" in cmd and not any(m["month"] == num for m in months): months.append({"year": current_year, "month": num}) # 去重 seen = set() unique = [] for m in months: key = (m["year"], m["month"]) if key not in seen: seen.add(key) unique.append(m) return unique def _get_year_stats(self, data: List[Dict], months: List[Dict]) -> Dict: """获取各年份的统计""" stats = {} for item in data: date_str = item.get("transactionDate", "") if not date_str: continue try: date = datetime.strptime(date_str, "%Y-%m-%d") for m in months: if date.month == m["month"]: year = date.year if year not in stats: stats[year] = {"count": 0, "total": 0} stats[year]["count"] += 1 stats[year]["total"] += float(item.get("amount", 0)) break except: continue return stats async def _execute_bill_query(self, data: List[Dict], months: List[Dict]) -> str: """执行账单查询""" # 筛选数据 filtered = [] for item in data: date_str = item.get("transactionDate", "") if not date_str: continue try: date = datetime.strptime(date_str, "%Y-%m-%d") for m in months: if date.year == m["year"] and date.month == m["month"]: filtered.append(item) break except: continue if not filtered: month_desc = " + ".join([f"{m['year']}年{m['month']}月" for m in months]) # 智能提示有数据的月份 all_months = set() for item in data: date_str = item.get("transactionDate", "") if date_str: try: all_months.add(date_str[:7]) except: pass recent = sorted(all_months, reverse=True)[:5] hints = "\n".join([f"• {m}" for m in recent]) return f"📊 {month_desc} 暂无记录\n\n💡 最近有数据的月份:\n{hints}" # 生成详细报告 filtered.sort(key=lambda x: x.get("transactionDate", ""), reverse=True) # 分月统计 by_month = defaultdict(list) for item in filtered: month_key = item.get("transactionDate", "")[:7] by_month[month_key].append(item) # 生成报告 total = sum(float(i.get("amount", 0)) for i in filtered) month_desc = " + ".join([f"{m['year']}年{m['month']}月" for m in months]) report = f"📊 {month_desc} 账单详情\n{'='*50}\n\n" for month_key in sorted(by_month.keys(), reverse=True): items = by_month[month_key] subtotal = sum(float(i.get("amount", 0)) for i in items) # 格式化月份 y, m = month_key.split('-') report += f"📅 {y}年{int(m)}月 ({len(items)}笔)\n{'-'*60}\n" # 表格头 report += f"{'序号':<4} {'日期':<12} {'金额':<12} {'币种':<6} {'描述'}\n" report += f"{'━'*60}\n" # 显示所有记录(完整) for idx, item in enumerate(items, 1): date = item.get("transactionDate", "") amt = float(item.get("amount", 0)) desc = item.get("description", "无描述") curr = item.get("currency", "CNY") # 截断过长描述 if len(desc) > 25: desc = desc[:22] + "..." report += f"{idx:<4} {date:<12} {amt:<12.2f} {curr:<6} {desc}\n" report += f"{'-'*60}\n" report += f"💰 小计: ¥{subtotal:.3f}\n\n" report += f"{'='*50}\n" report += f"💰 总计: ¥{total:.3f}\n" report += f"📝 共 {len(filtered)} 笔交易\n" report += f"🕐 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" return report # ========== 状态检查 ========== async def _handle_status_check(self, command: str) -> str: """状态检查 - 详细健康报告""" checks = [] # 1. 机器人状态 bot_status = await self._check_bot_status() checks.append(bot_status) # 2. API状态 api_status = await self._check_api_status() checks.append(api_status) # 3. 虚拟机状态 vm_status = await self._check_vm_status() checks.append(vm_status) # 4. 数据统计 data_status = await self._check_data_status() checks.append(data_status) # 汇总健康度 health_score = sum(1 for c in checks if "✅" in c["status"]) / len(checks) * 100 report = f"📊 系统健康度报告\n{'='*50}\n\n" report += f"🏥 健康评分: {health_score:.0f}%\n\n" for check in checks: report += f"{check['icon']} {check['name']}: {check['status']}\n" if check.get("detail"): report += f" {check['detail']}\n" report += f"\n{'='*50}\n" report += f"🕐 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" if health_score < 80: report += "\n⚠️ 系统存在问题,建议检查!" return report async def _check_bot_status(self) -> Dict: """检查机器人状态""" try: result = subprocess.run( "ps aux | grep telegram_webhook_bot.py | grep -v grep", shell=True, capture_output=True, text=True, timeout=5 ) running = bool(result.stdout.strip()) if running: # 获取运行时间 lines = result.stdout.strip().split('\n') if lines: return { "icon": "🤖", "name": "机器人", "status": "✅ 运行中", "detail": "服务正常" } return {"icon": "🤖", "name": "机器人", "status": "❌ 已停止", "detail": "需要重启"} except: return {"icon": "🤖", "name": "机器人", "status": "⚠️ 检查失败", "detail": None} async def _check_api_status(self) -> Dict: """检查API状态""" try: data = await self._get_data() count = len(data) return { "icon": "🌐", "name": "API服务", "status": f"✅ 正常", "detail": f"{count}条记录" } except: return {"icon": "🌐", "name": "API服务", "status": "❌ 异常", "detail": "无法连接"} async def _check_vm_status(self) -> Dict: """检查虚拟机状态""" try: import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(3) result = sock.connect_ex(("192.168.9.149", 22)) sock.close() if result == 0: return {"icon": "💻", "name": "虚拟机", "status": "✅ 在线", "detail": "SSH可达"} return {"icon": "💻", "name": "虚拟机", "status": "❌ 离线", "detail": "无法连接"} except: return {"icon": "💻", "name": "虚拟机", "status": "⚠️ 检查失败", "detail": None} async def _check_data_status(self) -> Dict: """检查数据状态""" try: data = await self._get_data() if not data: return {"icon": "📊", "name": "数据", "status": "⚠️ 无数据", "detail": None} # 最新记录时间 dates = [d.get("transactionDate", "") for d in data if d.get("transactionDate")] if dates: latest = max(dates) return { "icon": "📊", "name": "数据", "status": "✅ 正常", "detail": f"最新: {latest}" } return {"icon": "📊", "name": "数据", "status": "⚠️ 异常", "detail": "无日期"} except: return {"icon": "📊", "name": "数据", "status": "❌ 错误", "detail": None} # ========== 统计分析 ========== async def _handle_statistics(self, command: str) -> str: """统计分析 - 可视化报告""" data = await self._get_data() if not data: return "❌ 暂无数据" now = datetime.now() today = now.date() month_start = now.replace(day=1).date() year_start = now.replace(month=1, day=1).date() today_total = 0 month_total = 0 year_total = 0 # 分类统计 categories = Counter() monthly = defaultdict(float) for item in data: date_str = item.get("transactionDate", "") if not date_str: continue try: date = datetime.strptime(date_str, "%Y-%m-%d").date() amount = float(item.get("amount", 0)) if date == today: today_total += amount if date >= month_start: month_total += amount if date >= year_start: year_total += amount # 月度统计 month_key = date_str[:7] monthly[month_key] += amount # 分类统计 desc = item.get("description", "其他") categories[desc] += amount except: continue # 生成报告 report = f"📊 财务统计分析\n{'='*50}\n\n" # 基础统计 report += "📈 基础数据\n" report += f"• 今日支出: ¥{today_total:.3f}\n" report += f"• 本月支出: ¥{month_total:.3f}\n" report += f"• 本年支出: ¥{year_total:.3f}\n" report += f"• 总记录数: {len(data)} 条\n\n" # 月度趋势(最近6个月) recent_months = sorted(monthly.keys(), reverse=True)[:6] if recent_months: report += "📅 月度趋势(最近6个月)\n" max_amt = max(monthly[m] for m in recent_months) for month in recent_months: amt = monthly[month] bar_len = int((amt / max_amt) * 20) if max_amt > 0 else 0 bar = "█" * bar_len report += f"{month}: ¥{amt:>10.2f} {bar}\n" report += "\n" # Top消费 top_cat = categories.most_common(5) if top_cat: report += "🏆 Top 5 消费项目\n" for idx, (cat, amt) in enumerate(top_cat, 1): report += f"{idx}. {cat[:20]:20} ¥{amt:.3f}\n" report += f"\n{'='*50}\n" report += f"🕐 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" return report # ========== 其他功能 ========== async def _handle_time_query(self, command: str) -> str: """时间范围查询""" data = await self._get_data() now = datetime.now() # 解析时间范围 if "今天" in command or "今日" in command: start = now.date() title = "今日" elif "昨天" in command: start = (now - timedelta(days=1)).date() title = "昨日" elif "本周" in command: start = (now - timedelta(days=now.weekday())).date() title = "本周" elif "本月" in command: start = now.replace(day=1).date() title = "本月" else: start = (now - timedelta(days=7)).date() title = "最近7天" # 筛选数据 filtered = [] for item in data: date_str = item.get("transactionDate", "") if date_str: try: date = datetime.strptime(date_str, "%Y-%m-%d").date() if date >= start: filtered.append(item) except: pass if not filtered: return f"📊 {title}暂无交易记录" total = sum(float(i.get("amount", 0)) for i in filtered) report = f"📊 {title}交易汇总\n{'='*50}\n\n" report += f"💰 总金额: ¥{total:.3f}\n" report += f"📝 交易笔数: {len(filtered)}\n\n" # 最近5笔 filtered.sort(key=lambda x: x.get("transactionDate", ""), reverse=True) report += "最近交易:\n" for idx, item in enumerate(filtered[:5], 1): date = item.get("transactionDate", "") amt = item.get("amount", 0) desc = item.get("description", "")[:20] report += f"{idx}. {date} ¥{amt} {desc}\n" if len(filtered) > 5: report += f"... 还有 {len(filtered)-5} 笔\n" return report async def _handle_restart(self, command: str) -> str: """重启服务""" cmd_lower = command.lower() # 如果是询问怎么做,提供指导而不是直接执行 if any(w in cmd_lower for w in ["怎么", "如何", "怎样", "如何做"]): return """ 🔧 重启服务指南 【重启机器人】 直接输入:重启机器人 【重启网站】 直接输入:重启网站 ⚠️ 注意:重启会导致短暂服务中断 """ # 明确的重启命令才执行 if "机器人" in command or "bot" in command: return await self._restart_bot() return "请明确要重启什么:\n• 重启机器人\n• 重启网站" async def _restart_bot(self) -> str: """重启机器人""" try: subprocess.run("pkill -f telegram_webhook_bot.py", shell=True) await asyncio.sleep(1) subprocess.Popen( ["python3", "/Users/fuwuqi/telegram_webhook_bot.py"], stdout=open("/Users/fuwuqi/bot.log", "a"), stderr=subprocess.STDOUT, start_new_session=True ) await asyncio.sleep(2) result = subprocess.run( "ps aux | grep telegram_webhook_bot.py | grep -v grep | wc -l", shell=True, capture_output=True, text=True ) if int(result.stdout.strip()) > 0: return "✅ 机器人重启成功!" return "❌ 重启失败,请检查日志" except Exception as e: return f"❌ 重启失败: {e}" async def _handle_logs(self, command: str) -> str: """日志查看""" try: lines = 20 if "错误" in command or "error" in command.lower(): # 只显示错误 result = subprocess.run( f"tail -100 /Users/fuwuqi/bot.log | grep -i error | tail -20", shell=True, capture_output=True, text=True ) content = result.stdout or "无错误日志" return f"📝 错误日志\n{'='*50}\n{content}" with open("/Users/fuwuqi/bot.log", "r") as f: content = "".join(f.readlines()[-lines:]) return f"📝 最近日志({lines}行)\n{'='*50}\n{content}" except: return "❌ 无法读取日志" async def _handle_help(self, command: str) -> str: """帮助""" return f""" 🤖 {self.name} v{self.version} 创作者:{self.creator} 💡 我能做什么: 【📊 账单查询】 • 查9月账单 - 智能询问年份 • 查2024年9月 - 指定年份 • 查7月和8月 - 多月汇总 【📈 统计分析】 • 统计 - 详细财务报告 • 本月 / 今天 - 时间范围查询 • 分析 - 趋势和可视化 【🔧 系统管理】 • 状态 - 健康度报告 • 重启机器人 - 安全重启 • 日志 - 查看运行日志 【💬 智能对话】 直接说话即可,我会理解! 📱 系统信息: • 网站: http://192.168.9.149:5666 • 机器人: @ktyyds_bot • 作者: {self.creator} """ async def _handle_vm(self, command: str) -> str: """虚拟机信息""" return """ 💻 虚拟机信息 {'='*50} • IP: 192.168.9.149 • 用户: atai • 端口: 5666 • SSH: 22 • 数据库: /home/atai/finwise-pro/apps/backend/storage/finance.db """ async def _handle_website(self, command: str) -> str: """网站相关""" try: import socket sock = socket.socket() sock.settimeout(3) result = sock.connect_ex(("192.168.9.149", 5666)) sock.close() if result == 0: return "🌐 网站状态: ✅ 正常\n地址: http://192.168.9.149:5666" return "🌐 网站状态: ❌ 无法访问\n\n建议:\n• 检查网络\n• 检查虚拟机" except: return "🌐 网站状态: ⚠️ 检查失败" async def _handle_conversation(self, command: str, context: Dict = None) -> str: """智能对话""" user_id = context.get("user_id", "default") if context else "default" cmd = command.lower() if any(w in cmd for w in ["你好", "hi", "hello", "嗨"]): response = f"您好!我是{self.name},智能财务助手。\n\n输入'帮助'查看我能做什么!" self._remember_exchange(user_id, command, response) return response if any(w in cmd for w in ["谢谢", "thank"]): response = "不客气!随时为您服务 😊" self._remember_exchange(user_id, command, response) return response general_reply = await self._generate_general_reply(user_id, command) if general_reply: return general_reply if any(symbol in command for symbol in ["?", "?", "吗"]): response = ( "我暂时没抓住您的重点,能再具体描述一下吗?\n" "我擅长:账单查询、网站/虚拟机/机器人状态、财务统计,还有日常聊天。" ) self._remember_exchange(user_id, command, response) return response response = ( f"🤔 我理解您说:\"{command}\"\n\n" "如果是系统相关需求,可以告诉我:账单查询、网站/虚拟机状态、机器人、统计分析等。\n" "如果只是想聊聊别的,也可以直接说,我会用自然语言和您交流。" ) self._remember_exchange(user_id, command, response) return response async def _generate_general_reply(self, user_id: str, message: str) -> Optional[str]: """调用外部对话模型,处理泛化对话""" if not self.general_chat_cli: return None prompt = self._build_general_prompt(user_id, message) reply = await self._call_general_cli(prompt) if reply: self._remember_exchange(user_id, message, reply) return reply return None def _build_general_prompt(self, user_id: str, message: str) -> str: """构建与Codex/Claude CLI的对话提示""" history = self._get_history(user_id) recent = list(history)[-6:] formatted_history = [] for role, text in recent: speaker = "用户" if role == "user" else "助手" formatted_history.append(f"{speaker}: {text}") history_block = "\n".join(formatted_history) if formatted_history else "(无历史对话)" return ( "你是“财务管家助手”,负责财务系统网站、虚拟机和机器人运维," "也可以跟用户就任何话题自然聊天,提供常识、建议或情绪支持。\n" "- 语气自然亲切,像同事聊天\n" "- 回答保持务实,最多6句中文\n" "- 如果问题超出掌握范围,要坦诚说明并给出下一步建议\n" "- 优先考虑是否与财务系统、虚拟机、机器人相关,如是则结合实际经验回复\n" "- 如果用户只是闲聊,就顺着话题交流\n\n" f"历史对话:\n{history_block}\n\n" f"用户: {message}\n" "助手:" ) async def _call_general_cli(self, prompt: str) -> Optional[str]: """调用本地CLI进行对话""" if not shutil.which(self.general_chat_cli): logger.warning(f"未找到对话CLI:{self.general_chat_cli}") return None def _run() -> Optional[str]: try: completed = subprocess.run( [self.general_chat_cli, prompt], capture_output=True, text=True, timeout=45 ) if completed.returncode != 0: logger.error( "对话CLI执行失败: returncode=%s, stderr=%s", completed.returncode, completed.stderr.strip() ) return None return self._sanitize_cli_output(completed.stdout) except subprocess.TimeoutExpired: logger.error("对话CLI执行超时") return None except Exception as exc: logger.error(f"对话CLI调用异常: {exc}") return None return await asyncio.to_thread(_run) def _get_history(self, user_id: str) -> Deque[Tuple[str, str]]: """获取或初始化用户对话历史""" history = self.general_histories.get(user_id) if history is None: history = deque(maxlen=12) self.general_histories[user_id] = history return history def _remember_exchange(self, user_id: str, user_text: str, assistant_text: str): """记录一次对话轮次""" history = self._get_history(user_id) history.append(("user", user_text.strip())) history.append(("assistant", assistant_text.strip())) def _sanitize_cli_output(self, output: str) -> Optional[str]: """清理CLI输出,去除控制字符和冗余前缀""" if not output: return None cleaned = output.replace("\r\n", "\n").replace("\r", "\n").strip() cleaned = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", cleaned) lines = [line.strip() for line in cleaned.split("\n") if line.strip()] if not lines: return None # 删除通用前缀 first_line = lines[0] first_line = re.sub(r"^(assistant|codex|claude|response)[::]\s*", "", first_line, flags=re.IGNORECASE) lines[0] = first_line cleaned_text = "\n".join(lines).strip() return cleaned_text[:1200] # 全局实例 finance_agent = FinanceAgentUltimate() async def handle_agent_query(query: str, context: Dict = None) -> str: """主入口""" if not hasattr(finance_agent, 'initialized'): await finance_agent.initialize() finance_agent.initialized = True return await finance_agent.process_command(query, context)