From 3e311d4d266206619d22d769a40871386ff824dc Mon Sep 17 00:00:00 2001 From: woshiqp465 Date: Tue, 4 Nov 2025 16:39:42 +0800 Subject: [PATCH] chore: add ktyyds bot script --- bots/ktyyds_bot/finance_agent.py | 909 +++++++++++++++++++++++++++++++ 1 file changed, 909 insertions(+) create mode 100644 bots/ktyyds_bot/finance_agent.py diff --git a/bots/ktyyds_bot/finance_agent.py b/bots/ktyyds_bot/finance_agent.py new file mode 100644 index 00000000..d2935dbf --- /dev/null +++ b/bots/ktyyds_bot/finance_agent.py @@ -0,0 +1,909 @@ +#!/usr/bin/env python3 +""" +财务管家 AI Agent - 终极版 +真正智能、极致体验 +创作者:kt暴君 +版本:3.0 Ultimate +""" + +import asyncio +import json +import logging +import shutil +from typing import Dict, Any, List, Optional, Tuple, Deque +from datetime import datetime, timedelta +import subprocess +import os +import re +from collections import defaultdict, Counter, deque + +logger = logging.getLogger(__name__) + +class FinanceAgentUltimate: + """财务管家 - 终极智能版""" + + def __init__(self): + self.name = "财务管家" + self.creator = "kt暴君" + self.version = "3.0 Ultimate" + self.api_url = "http://192.168.9.149:5666/api/finance/transactions" + + # 缓存数据,避免重复请求 + self.data_cache = None + self.cache_time = None + self.cache_ttl = 60 # 缓存60秒 + + # 上下文记忆系统(按用户ID存储) + self.user_contexts = {} # {user_id: {state, data, timestamp}} + self.general_chat_cli = os.getenv("FINANCE_AGENT_GENERAL_CLI", "codex") + self.general_histories: Dict[str, Deque[Tuple[str, str]]] = {} + + async def initialize(self): + """初始化""" + logger.info(f"✅ {self.name} v{self.version} 已启动 - 终极智能模式") + # 预加载数据 + await self._get_data(force_refresh=True) + return True + + async def _get_data(self, force_refresh=False) -> List[Dict]: + """获取数据(带缓存)""" + now = datetime.now() + + if ( + not force_refresh + and self.data_cache is not None + and self.cache_time is not None + and (now - self.cache_time).total_seconds() < self.cache_ttl + ): + return self.data_cache + + try: + import requests + + resp = requests.get(self.api_url, timeout=10) + if resp.status_code == 200: + payload = resp.json() + if isinstance(payload, dict): + self.data_cache = payload.get("data", []) + else: + self.data_cache = [] + self.cache_time = now + logger.info(f"数据已更新: {len(self.data_cache)} 条记录") + return self.data_cache + + logger.error(f"获取数据失败: 状态码 {resp.status_code}") + except Exception as e: + logger.error(f"获取数据失败: {e}") + + return self.data_cache or [] + + async def process_command(self, command: str, context: Dict = None) -> str: + """主处理入口 - 智能路由(支持上下文)""" + cmd = command.lower().strip() + user_id = context.get("user_id", "default") if context else "default" + + # 检查是否有待处理的上下文 + if user_id in self.user_contexts: + ctx = self.user_contexts[user_id] + # 检查上下文是否过期(5分钟) + from datetime import datetime, timedelta + if datetime.now() - ctx.get("timestamp", datetime.now()) < timedelta(minutes=5): + # 尝试处理上下文响应 + result = await self._handle_context_response(command, user_id, ctx) + if result: + return result + else: + # 上下文过期,清除 + del self.user_contexts[user_id] + + # 1. 账单查询(最高优先级) + if self._is_bill_query(command): + return await self._handle_bill_query(command, context) + + # 2. 网站相关(优先于状态检查,避免被拦截) + if any(kw in cmd for kw in ["网站", "website", "web"]) and "状态" in cmd: + return await self._handle_website(command) + + # 3. 虚拟机/服务器(优先处理具体对象) + if any(kw in cmd for kw in ["虚拟机", "vm", "服务器"]) and not any(kw in cmd for kw in ["状态", "重启"]): + return await self._handle_vm(command) + + # 4. 状态检查 + if any(kw in cmd for kw in ["状态", "status", "运行", "健康", "正常吗"]): + return await self._handle_status_check(command) + + # 5. 统计分析 + if any(kw in cmd for kw in ["统计", "分析", "趋势", "汇总", "总共"]): + return await self._handle_statistics(command) + + # 6. 数据查询 + if any(kw in cmd for kw in ["本月", "今天", "本周", "昨天", "最近"]): + return await self._handle_time_query(command) + + # 7. 系统管理 + if "重启" in cmd: + return await self._handle_restart(command) + + # 8. 日志查看 + if any(kw in cmd for kw in ["日志", "log", "错误", "异常"]): + return await self._handle_logs(command) + + # 9. 帮助 + if any(kw in cmd for kw in ["帮助", "help", "功能", "怎么用", "命令"]): + return await self._handle_help(command) + + # 10. 智能对话 + return await self._handle_conversation(command, context) + + # ========== 上下文管理 ========== + + async def _handle_context_response(self, command: str, user_id: str, ctx: Dict) -> Optional[str]: + """处理上下文响应""" + state = ctx.get("state") + + # 年份选择上下文 + if state == "awaiting_year_selection": + # 检查用户是否输入了年份 + year_match = re.search(r'20(2[0-9])', command) + if year_match: + year = int(year_match.group(0)) + months = ctx.get("months", []) + + # 更新月份的年份 + for m in months: + m["year"] = year + + # 清除上下文 + del self.user_contexts[user_id] + + # 执行查询 + data = await self._get_data() + return await self._execute_bill_query(data, months) + + # 检查是否输入了年份关键词 + if "2024" in command or "2025" in command or "今年" in command or "去年" in command: + if "2024" in command: + year = 2024 + elif "2025" in command or "今年" in command: + year = 2025 + elif "去年" in command: + year = datetime.now().year - 1 + else: + return None + + months = ctx.get("months", []) + for m in months: + m["year"] = year + + del self.user_contexts[user_id] + data = await self._get_data() + return await self._execute_bill_query(data, months) + + return None + + def _set_context(self, user_id: str, state: str, data: Dict): + """设置用户上下文""" + from datetime import datetime + self.user_contexts[user_id] = { + "state": state, + "timestamp": datetime.now(), + **data + } + + # ========== 账单查询 ========== + + def _is_bill_query(self, cmd: str) -> bool: + """判断是否是账单查询""" + month_patterns = [r'\d{1,2}月', r'[一二三四五六七八九十]{1,2}月', r'\d{4}年\d{1,2}月'] + has_month = any(re.search(p, cmd) for p in month_patterns) + has_query = any(w in cmd for w in ["账单", "记录", "查", "看", "显示"]) + return has_month and has_query + + async def _handle_bill_query(self, command: str, context: Dict = None) -> str: + """处理账单查询 - 极致智能(支持上下文)""" + data = await self._get_data() + user_id = context.get("user_id", "default") if context else "default" + + # 解析月份 + months = self._parse_months(command) + if not months: + return "❓ 请告诉我要查询哪个月份\n\n例如:\n• 查9月账单\n• 查2024年9月\n• 查7月和8月" + + # 检查是否需要询问年份(只有当用户使用当前年份且命令中没有明确年份时才询问) + current_year = datetime.now().year + # 检查命令中是否包含年份数字 + has_explicit_year = bool(re.search(r'\d{4}年', command)) + need_confirm = any(m["year"] == current_year for m in months) and not has_explicit_year + + if need_confirm: + # 检查历史数据 + year_stats = self._get_year_stats(data, months) + if len(year_stats) > 1: + # 多年都有数据,需要用户确认 + # 保存上下文 + self._set_context(user_id, "awaiting_year_selection", { + "months": months, + "year_stats": year_stats, + "original_command": command + }) + + month_names = "、".join([f"{m['month']}月" for m in months]) + options = [] + for year in sorted(year_stats.keys(), reverse=True): + options.append(f"• {year}年{month_names}: {year_stats[year]['count']}条记录,总计 ¥{year_stats[year]['total']:.3f}") + + return f"📊 发现多个年份都有{month_names}的数据:\n\n" + "\n".join(options) + f"\n\n❓ 请直接回复年份:\n• 2025\n• 2024" + + # 执行查询 + return await self._execute_bill_query(data, months) + + def _parse_months(self, cmd: str) -> List[Dict]: + """解析月份""" + months = [] + current_year = datetime.now().year + + cn_map = {"一": 1, "二": 2, "三": 3, "四": 4, "五": 5, "六": 6, + "七": 7, "八": 8, "九": 9, "十": 10, "十一": 11, "十二": 12} + + # 完整格式:2024年9月 + for m in re.finditer(r'(\d{4})年(\d{1,2})月', cmd): + year, month = int(m.group(1)), int(m.group(2)) + if 1 <= month <= 12 and 2020 <= year <= 2030: + months.append({"year": year, "month": month}) + + # 简单格式:9月(使用当前年份) + if not months: + for m in re.finditer(r'(\d{1,2})月', cmd): + month = int(m.group(1)) + if 1 <= month <= 12: + months.append({"year": current_year, "month": month}) + + # 中文月份 + for cn, num in cn_map.items(): + if f"{cn}月" in cmd and not any(m["month"] == num for m in months): + months.append({"year": current_year, "month": num}) + + # 去重 + seen = set() + unique = [] + for m in months: + key = (m["year"], m["month"]) + if key not in seen: + seen.add(key) + unique.append(m) + + return unique + + def _get_year_stats(self, data: List[Dict], months: List[Dict]) -> Dict: + """获取各年份的统计""" + stats = {} + for item in data: + date_str = item.get("transactionDate", "") + if not date_str: + continue + + try: + date = datetime.strptime(date_str, "%Y-%m-%d") + for m in months: + if date.month == m["month"]: + year = date.year + if year not in stats: + stats[year] = {"count": 0, "total": 0} + stats[year]["count"] += 1 + stats[year]["total"] += float(item.get("amount", 0)) + break + except: + continue + + return stats + + async def _execute_bill_query(self, data: List[Dict], months: List[Dict]) -> str: + """执行账单查询""" + # 筛选数据 + filtered = [] + for item in data: + date_str = item.get("transactionDate", "") + if not date_str: + continue + + try: + date = datetime.strptime(date_str, "%Y-%m-%d") + for m in months: + if date.year == m["year"] and date.month == m["month"]: + filtered.append(item) + break + except: + continue + + if not filtered: + month_desc = " + ".join([f"{m['year']}年{m['month']}月" for m in months]) + # 智能提示有数据的月份 + all_months = set() + for item in data: + date_str = item.get("transactionDate", "") + if date_str: + try: + all_months.add(date_str[:7]) + except: + pass + recent = sorted(all_months, reverse=True)[:5] + hints = "\n".join([f"• {m}" for m in recent]) + return f"📊 {month_desc} 暂无记录\n\n💡 最近有数据的月份:\n{hints}" + + # 生成详细报告 + filtered.sort(key=lambda x: x.get("transactionDate", ""), reverse=True) + + # 分月统计 + by_month = defaultdict(list) + for item in filtered: + month_key = item.get("transactionDate", "")[:7] + by_month[month_key].append(item) + + # 生成报告 + total = sum(float(i.get("amount", 0)) for i in filtered) + month_desc = " + ".join([f"{m['year']}年{m['month']}月" for m in months]) + + report = f"📊 {month_desc} 账单详情\n{'='*50}\n\n" + + for month_key in sorted(by_month.keys(), reverse=True): + items = by_month[month_key] + subtotal = sum(float(i.get("amount", 0)) for i in items) + + # 格式化月份 + y, m = month_key.split('-') + report += f"📅 {y}年{int(m)}月 ({len(items)}笔)\n{'-'*60}\n" + + # 表格头 + report += f"{'序号':<4} {'日期':<12} {'金额':<12} {'币种':<6} {'描述'}\n" + report += f"{'━'*60}\n" + + # 显示所有记录(完整) + for idx, item in enumerate(items, 1): + date = item.get("transactionDate", "") + amt = float(item.get("amount", 0)) + desc = item.get("description", "无描述") + curr = item.get("currency", "CNY") + + # 截断过长描述 + if len(desc) > 25: + desc = desc[:22] + "..." + + report += f"{idx:<4} {date:<12} {amt:<12.2f} {curr:<6} {desc}\n" + + report += f"{'-'*60}\n" + report += f"💰 小计: ¥{subtotal:.3f}\n\n" + + report += f"{'='*50}\n" + report += f"💰 总计: ¥{total:.3f}\n" + report += f"📝 共 {len(filtered)} 笔交易\n" + report += f"🕐 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" + + return report + + # ========== 状态检查 ========== + + async def _handle_status_check(self, command: str) -> str: + """状态检查 - 详细健康报告""" + checks = [] + + # 1. 机器人状态 + bot_status = await self._check_bot_status() + checks.append(bot_status) + + # 2. API状态 + api_status = await self._check_api_status() + checks.append(api_status) + + # 3. 虚拟机状态 + vm_status = await self._check_vm_status() + checks.append(vm_status) + + # 4. 数据统计 + data_status = await self._check_data_status() + checks.append(data_status) + + # 汇总健康度 + health_score = sum(1 for c in checks if "✅" in c["status"]) / len(checks) * 100 + + report = f"📊 系统健康度报告\n{'='*50}\n\n" + report += f"🏥 健康评分: {health_score:.0f}%\n\n" + + for check in checks: + report += f"{check['icon']} {check['name']}: {check['status']}\n" + if check.get("detail"): + report += f" {check['detail']}\n" + + report += f"\n{'='*50}\n" + report += f"🕐 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" + + if health_score < 80: + report += "\n⚠️ 系统存在问题,建议检查!" + + return report + + async def _check_bot_status(self) -> Dict: + """检查机器人状态""" + try: + result = subprocess.run( + "ps aux | grep telegram_webhook_bot.py | grep -v grep", + shell=True, capture_output=True, text=True, timeout=5 + ) + running = bool(result.stdout.strip()) + + if running: + # 获取运行时间 + lines = result.stdout.strip().split('\n') + if lines: + return { + "icon": "🤖", "name": "机器人", + "status": "✅ 运行中", + "detail": "服务正常" + } + + return {"icon": "🤖", "name": "机器人", "status": "❌ 已停止", "detail": "需要重启"} + except: + return {"icon": "🤖", "name": "机器人", "status": "⚠️ 检查失败", "detail": None} + + async def _check_api_status(self) -> Dict: + """检查API状态""" + try: + data = await self._get_data() + count = len(data) + return { + "icon": "🌐", "name": "API服务", + "status": f"✅ 正常", + "detail": f"{count}条记录" + } + except: + return {"icon": "🌐", "name": "API服务", "status": "❌ 异常", "detail": "无法连接"} + + async def _check_vm_status(self) -> Dict: + """检查虚拟机状态""" + try: + import socket + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(3) + result = sock.connect_ex(("192.168.9.149", 22)) + sock.close() + + if result == 0: + return {"icon": "💻", "name": "虚拟机", "status": "✅ 在线", "detail": "SSH可达"} + return {"icon": "💻", "name": "虚拟机", "status": "❌ 离线", "detail": "无法连接"} + except: + return {"icon": "💻", "name": "虚拟机", "status": "⚠️ 检查失败", "detail": None} + + async def _check_data_status(self) -> Dict: + """检查数据状态""" + try: + data = await self._get_data() + if not data: + return {"icon": "📊", "name": "数据", "status": "⚠️ 无数据", "detail": None} + + # 最新记录时间 + dates = [d.get("transactionDate", "") for d in data if d.get("transactionDate")] + if dates: + latest = max(dates) + return { + "icon": "📊", "name": "数据", + "status": "✅ 正常", + "detail": f"最新: {latest}" + } + + return {"icon": "📊", "name": "数据", "status": "⚠️ 异常", "detail": "无日期"} + except: + return {"icon": "📊", "name": "数据", "status": "❌ 错误", "detail": None} + + # ========== 统计分析 ========== + + async def _handle_statistics(self, command: str) -> str: + """统计分析 - 可视化报告""" + data = await self._get_data() + + if not data: + return "❌ 暂无数据" + + now = datetime.now() + today = now.date() + month_start = now.replace(day=1).date() + year_start = now.replace(month=1, day=1).date() + + today_total = 0 + month_total = 0 + year_total = 0 + + # 分类统计 + categories = Counter() + monthly = defaultdict(float) + + for item in data: + date_str = item.get("transactionDate", "") + if not date_str: + continue + + try: + date = datetime.strptime(date_str, "%Y-%m-%d").date() + amount = float(item.get("amount", 0)) + + if date == today: + today_total += amount + if date >= month_start: + month_total += amount + if date >= year_start: + year_total += amount + + # 月度统计 + month_key = date_str[:7] + monthly[month_key] += amount + + # 分类统计 + desc = item.get("description", "其他") + categories[desc] += amount + except: + continue + + # 生成报告 + report = f"📊 财务统计分析\n{'='*50}\n\n" + + # 基础统计 + report += "📈 基础数据\n" + report += f"• 今日支出: ¥{today_total:.3f}\n" + report += f"• 本月支出: ¥{month_total:.3f}\n" + report += f"• 本年支出: ¥{year_total:.3f}\n" + report += f"• 总记录数: {len(data)} 条\n\n" + + # 月度趋势(最近6个月) + recent_months = sorted(monthly.keys(), reverse=True)[:6] + if recent_months: + report += "📅 月度趋势(最近6个月)\n" + max_amt = max(monthly[m] for m in recent_months) + for month in recent_months: + amt = monthly[month] + bar_len = int((amt / max_amt) * 20) if max_amt > 0 else 0 + bar = "█" * bar_len + report += f"{month}: ¥{amt:>10.2f} {bar}\n" + report += "\n" + + # Top消费 + top_cat = categories.most_common(5) + if top_cat: + report += "🏆 Top 5 消费项目\n" + for idx, (cat, amt) in enumerate(top_cat, 1): + report += f"{idx}. {cat[:20]:20} ¥{amt:.3f}\n" + + report += f"\n{'='*50}\n" + report += f"🕐 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n" + + return report + + # ========== 其他功能 ========== + + async def _handle_time_query(self, command: str) -> str: + """时间范围查询""" + data = await self._get_data() + now = datetime.now() + + # 解析时间范围 + if "今天" in command or "今日" in command: + start = now.date() + title = "今日" + elif "昨天" in command: + start = (now - timedelta(days=1)).date() + title = "昨日" + elif "本周" in command: + start = (now - timedelta(days=now.weekday())).date() + title = "本周" + elif "本月" in command: + start = now.replace(day=1).date() + title = "本月" + else: + start = (now - timedelta(days=7)).date() + title = "最近7天" + + # 筛选数据 + filtered = [] + for item in data: + date_str = item.get("transactionDate", "") + if date_str: + try: + date = datetime.strptime(date_str, "%Y-%m-%d").date() + if date >= start: + filtered.append(item) + except: + pass + + if not filtered: + return f"📊 {title}暂无交易记录" + + total = sum(float(i.get("amount", 0)) for i in filtered) + + report = f"📊 {title}交易汇总\n{'='*50}\n\n" + report += f"💰 总金额: ¥{total:.3f}\n" + report += f"📝 交易笔数: {len(filtered)}\n\n" + + # 最近5笔 + filtered.sort(key=lambda x: x.get("transactionDate", ""), reverse=True) + report += "最近交易:\n" + for idx, item in enumerate(filtered[:5], 1): + date = item.get("transactionDate", "") + amt = item.get("amount", 0) + desc = item.get("description", "")[:20] + report += f"{idx}. {date} ¥{amt} {desc}\n" + + if len(filtered) > 5: + report += f"... 还有 {len(filtered)-5} 笔\n" + + return report + + async def _handle_restart(self, command: str) -> str: + """重启服务""" + cmd_lower = command.lower() + + # 如果是询问怎么做,提供指导而不是直接执行 + if any(w in cmd_lower for w in ["怎么", "如何", "怎样", "如何做"]): + return """ +🔧 重启服务指南 + +【重启机器人】 +直接输入:重启机器人 + +【重启网站】 +直接输入:重启网站 + +⚠️ 注意:重启会导致短暂服务中断 +""" + + # 明确的重启命令才执行 + if "机器人" in command or "bot" in command: + return await self._restart_bot() + + return "请明确要重启什么:\n• 重启机器人\n• 重启网站" + + async def _restart_bot(self) -> str: + """重启机器人""" + try: + subprocess.run("pkill -f telegram_webhook_bot.py", shell=True) + await asyncio.sleep(1) + subprocess.Popen( + ["python3", "/Users/fuwuqi/telegram_webhook_bot.py"], + stdout=open("/Users/fuwuqi/bot.log", "a"), + stderr=subprocess.STDOUT, + start_new_session=True + ) + await asyncio.sleep(2) + result = subprocess.run( + "ps aux | grep telegram_webhook_bot.py | grep -v grep | wc -l", + shell=True, capture_output=True, text=True + ) + if int(result.stdout.strip()) > 0: + return "✅ 机器人重启成功!" + return "❌ 重启失败,请检查日志" + except Exception as e: + return f"❌ 重启失败: {e}" + + async def _handle_logs(self, command: str) -> str: + """日志查看""" + try: + lines = 20 + if "错误" in command or "error" in command.lower(): + # 只显示错误 + result = subprocess.run( + f"tail -100 /Users/fuwuqi/bot.log | grep -i error | tail -20", + shell=True, capture_output=True, text=True + ) + content = result.stdout or "无错误日志" + return f"📝 错误日志\n{'='*50}\n{content}" + + with open("/Users/fuwuqi/bot.log", "r") as f: + content = "".join(f.readlines()[-lines:]) + + return f"📝 最近日志({lines}行)\n{'='*50}\n{content}" + except: + return "❌ 无法读取日志" + + async def _handle_help(self, command: str) -> str: + """帮助""" + return f""" +🤖 {self.name} v{self.version} +创作者:{self.creator} + +💡 我能做什么: + +【📊 账单查询】 +• 查9月账单 - 智能询问年份 +• 查2024年9月 - 指定年份 +• 查7月和8月 - 多月汇总 + +【📈 统计分析】 +• 统计 - 详细财务报告 +• 本月 / 今天 - 时间范围查询 +• 分析 - 趋势和可视化 + +【🔧 系统管理】 +• 状态 - 健康度报告 +• 重启机器人 - 安全重启 +• 日志 - 查看运行日志 + +【💬 智能对话】 +直接说话即可,我会理解! + +📱 系统信息: +• 网站: http://192.168.9.149:5666 +• 机器人: @ktyyds_bot +• 作者: {self.creator} +""" + + async def _handle_vm(self, command: str) -> str: + """虚拟机信息""" + return """ +💻 虚拟机信息 +{'='*50} +• IP: 192.168.9.149 +• 用户: atai +• 端口: 5666 +• SSH: 22 +• 数据库: /home/atai/finwise-pro/apps/backend/storage/finance.db +""" + + async def _handle_website(self, command: str) -> str: + """网站相关""" + try: + import socket + sock = socket.socket() + sock.settimeout(3) + result = sock.connect_ex(("192.168.9.149", 5666)) + sock.close() + + if result == 0: + return "🌐 网站状态: ✅ 正常\n地址: http://192.168.9.149:5666" + return "🌐 网站状态: ❌ 无法访问\n\n建议:\n• 检查网络\n• 检查虚拟机" + except: + return "🌐 网站状态: ⚠️ 检查失败" + + async def _handle_conversation(self, command: str, context: Dict = None) -> str: + """智能对话""" + user_id = context.get("user_id", "default") if context else "default" + cmd = command.lower() + + if any(w in cmd for w in ["你好", "hi", "hello", "嗨"]): + response = f"您好!我是{self.name},智能财务助手。\n\n输入'帮助'查看我能做什么!" + self._remember_exchange(user_id, command, response) + return response + + if any(w in cmd for w in ["谢谢", "thank"]): + response = "不客气!随时为您服务 😊" + self._remember_exchange(user_id, command, response) + return response + + general_reply = await self._generate_general_reply(user_id, command) + if general_reply: + return general_reply + + if any(symbol in command for symbol in ["?", "?", "吗"]): + response = ( + "我暂时没抓住您的重点,能再具体描述一下吗?\n" + "我擅长:账单查询、网站/虚拟机/机器人状态、财务统计,还有日常聊天。" + ) + self._remember_exchange(user_id, command, response) + return response + + response = ( + f"🤔 我理解您说:\"{command}\"\n\n" + "如果是系统相关需求,可以告诉我:账单查询、网站/虚拟机状态、机器人、统计分析等。\n" + "如果只是想聊聊别的,也可以直接说,我会用自然语言和您交流。" + ) + self._remember_exchange(user_id, command, response) + return response + + async def _generate_general_reply(self, user_id: str, message: str) -> Optional[str]: + """调用外部对话模型,处理泛化对话""" + if not self.general_chat_cli: + return None + + prompt = self._build_general_prompt(user_id, message) + reply = await self._call_general_cli(prompt) + + if reply: + self._remember_exchange(user_id, message, reply) + return reply + return None + + def _build_general_prompt(self, user_id: str, message: str) -> str: + """构建与Codex/Claude CLI的对话提示""" + history = self._get_history(user_id) + recent = list(history)[-6:] + + formatted_history = [] + for role, text in recent: + speaker = "用户" if role == "user" else "助手" + formatted_history.append(f"{speaker}: {text}") + + history_block = "\n".join(formatted_history) if formatted_history else "(无历史对话)" + + return ( + "你是“财务管家助手”,负责财务系统网站、虚拟机和机器人运维," + "也可以跟用户就任何话题自然聊天,提供常识、建议或情绪支持。\n" + "- 语气自然亲切,像同事聊天\n" + "- 回答保持务实,最多6句中文\n" + "- 如果问题超出掌握范围,要坦诚说明并给出下一步建议\n" + "- 优先考虑是否与财务系统、虚拟机、机器人相关,如是则结合实际经验回复\n" + "- 如果用户只是闲聊,就顺着话题交流\n\n" + f"历史对话:\n{history_block}\n\n" + f"用户: {message}\n" + "助手:" + ) + + async def _call_general_cli(self, prompt: str) -> Optional[str]: + """调用本地CLI进行对话""" + if not shutil.which(self.general_chat_cli): + logger.warning(f"未找到对话CLI:{self.general_chat_cli}") + return None + + def _run() -> Optional[str]: + try: + completed = subprocess.run( + [self.general_chat_cli, prompt], + capture_output=True, + text=True, + timeout=45 + ) + if completed.returncode != 0: + logger.error( + "对话CLI执行失败: returncode=%s, stderr=%s", + completed.returncode, + completed.stderr.strip() + ) + return None + return self._sanitize_cli_output(completed.stdout) + except subprocess.TimeoutExpired: + logger.error("对话CLI执行超时") + return None + except Exception as exc: + logger.error(f"对话CLI调用异常: {exc}") + return None + + return await asyncio.to_thread(_run) + + def _get_history(self, user_id: str) -> Deque[Tuple[str, str]]: + """获取或初始化用户对话历史""" + history = self.general_histories.get(user_id) + if history is None: + history = deque(maxlen=12) + self.general_histories[user_id] = history + return history + + def _remember_exchange(self, user_id: str, user_text: str, assistant_text: str): + """记录一次对话轮次""" + history = self._get_history(user_id) + history.append(("user", user_text.strip())) + history.append(("assistant", assistant_text.strip())) + + def _sanitize_cli_output(self, output: str) -> Optional[str]: + """清理CLI输出,去除控制字符和冗余前缀""" + if not output: + return None + + cleaned = output.replace("\r\n", "\n").replace("\r", "\n").strip() + cleaned = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", cleaned) + + lines = [line.strip() for line in cleaned.split("\n") if line.strip()] + if not lines: + return None + + # 删除通用前缀 + first_line = lines[0] + first_line = re.sub(r"^(assistant|codex|claude|response)[::]\s*", "", first_line, flags=re.IGNORECASE) + lines[0] = first_line + + cleaned_text = "\n".join(lines).strip() + return cleaned_text[:1200] + +# 全局实例 +finance_agent = FinanceAgentUltimate() + +async def handle_agent_query(query: str, context: Dict = None) -> str: + """主入口""" + if not hasattr(finance_agent, 'initialized'): + await finance_agent.initialize() + finance_agent.initialized = True + + return await finance_agent.process_command(query, context)