chore: add ktyyds bot script

This commit is contained in:
woshiqp465
2025-11-04 16:39:42 +08:00
parent f4cd0a5f22
commit 3e311d4d26

View File

@@ -0,0 +1,909 @@
#!/usr/bin/env python3
"""
财务管家 AI Agent - 终极版
真正智能、极致体验
创作者kt暴君
版本3.0 Ultimate
"""
import asyncio
import json
import logging
import shutil
from typing import Dict, Any, List, Optional, Tuple, Deque
from datetime import datetime, timedelta
import subprocess
import os
import re
from collections import defaultdict, Counter, deque
logger = logging.getLogger(__name__)
class FinanceAgentUltimate:
"""财务管家 - 终极智能版"""
def __init__(self):
self.name = "财务管家"
self.creator = "kt暴君"
self.version = "3.0 Ultimate"
self.api_url = "http://192.168.9.149:5666/api/finance/transactions"
# 缓存数据,避免重复请求
self.data_cache = None
self.cache_time = None
self.cache_ttl = 60 # 缓存60秒
# 上下文记忆系统按用户ID存储
self.user_contexts = {} # {user_id: {state, data, timestamp}}
self.general_chat_cli = os.getenv("FINANCE_AGENT_GENERAL_CLI", "codex")
self.general_histories: Dict[str, Deque[Tuple[str, str]]] = {}
async def initialize(self):
"""初始化"""
logger.info(f"{self.name} v{self.version} 已启动 - 终极智能模式")
# 预加载数据
await self._get_data(force_refresh=True)
return True
async def _get_data(self, force_refresh=False) -> List[Dict]:
"""获取数据(带缓存)"""
now = datetime.now()
if (
not force_refresh
and self.data_cache is not None
and self.cache_time is not None
and (now - self.cache_time).total_seconds() < self.cache_ttl
):
return self.data_cache
try:
import requests
resp = requests.get(self.api_url, timeout=10)
if resp.status_code == 200:
payload = resp.json()
if isinstance(payload, dict):
self.data_cache = payload.get("data", [])
else:
self.data_cache = []
self.cache_time = now
logger.info(f"数据已更新: {len(self.data_cache)} 条记录")
return self.data_cache
logger.error(f"获取数据失败: 状态码 {resp.status_code}")
except Exception as e:
logger.error(f"获取数据失败: {e}")
return self.data_cache or []
async def process_command(self, command: str, context: Dict = None) -> str:
"""主处理入口 - 智能路由(支持上下文)"""
cmd = command.lower().strip()
user_id = context.get("user_id", "default") if context else "default"
# 检查是否有待处理的上下文
if user_id in self.user_contexts:
ctx = self.user_contexts[user_id]
# 检查上下文是否过期5分钟
from datetime import datetime, timedelta
if datetime.now() - ctx.get("timestamp", datetime.now()) < timedelta(minutes=5):
# 尝试处理上下文响应
result = await self._handle_context_response(command, user_id, ctx)
if result:
return result
else:
# 上下文过期,清除
del self.user_contexts[user_id]
# 1. 账单查询(最高优先级)
if self._is_bill_query(command):
return await self._handle_bill_query(command, context)
# 2. 网站相关(优先于状态检查,避免被拦截)
if any(kw in cmd for kw in ["网站", "website", "web"]) and "状态" in cmd:
return await self._handle_website(command)
# 3. 虚拟机/服务器(优先处理具体对象)
if any(kw in cmd for kw in ["虚拟机", "vm", "服务器"]) and not any(kw in cmd for kw in ["状态", "重启"]):
return await self._handle_vm(command)
# 4. 状态检查
if any(kw in cmd for kw in ["状态", "status", "运行", "健康", "正常吗"]):
return await self._handle_status_check(command)
# 5. 统计分析
if any(kw in cmd for kw in ["统计", "分析", "趋势", "汇总", "总共"]):
return await self._handle_statistics(command)
# 6. 数据查询
if any(kw in cmd for kw in ["本月", "今天", "本周", "昨天", "最近"]):
return await self._handle_time_query(command)
# 7. 系统管理
if "重启" in cmd:
return await self._handle_restart(command)
# 8. 日志查看
if any(kw in cmd for kw in ["日志", "log", "错误", "异常"]):
return await self._handle_logs(command)
# 9. 帮助
if any(kw in cmd for kw in ["帮助", "help", "功能", "怎么用", "命令"]):
return await self._handle_help(command)
# 10. 智能对话
return await self._handle_conversation(command, context)
# ========== 上下文管理 ==========
async def _handle_context_response(self, command: str, user_id: str, ctx: Dict) -> Optional[str]:
"""处理上下文响应"""
state = ctx.get("state")
# 年份选择上下文
if state == "awaiting_year_selection":
# 检查用户是否输入了年份
year_match = re.search(r'20(2[0-9])', command)
if year_match:
year = int(year_match.group(0))
months = ctx.get("months", [])
# 更新月份的年份
for m in months:
m["year"] = year
# 清除上下文
del self.user_contexts[user_id]
# 执行查询
data = await self._get_data()
return await self._execute_bill_query(data, months)
# 检查是否输入了年份关键词
if "2024" in command or "2025" in command or "今年" in command or "去年" in command:
if "2024" in command:
year = 2024
elif "2025" in command or "今年" in command:
year = 2025
elif "去年" in command:
year = datetime.now().year - 1
else:
return None
months = ctx.get("months", [])
for m in months:
m["year"] = year
del self.user_contexts[user_id]
data = await self._get_data()
return await self._execute_bill_query(data, months)
return None
def _set_context(self, user_id: str, state: str, data: Dict):
"""设置用户上下文"""
from datetime import datetime
self.user_contexts[user_id] = {
"state": state,
"timestamp": datetime.now(),
**data
}
# ========== 账单查询 ==========
def _is_bill_query(self, cmd: str) -> bool:
"""判断是否是账单查询"""
month_patterns = [r'\d{1,2}月', r'[一二三四五六七八九十]{1,2}月', r'\d{4}\d{1,2}月']
has_month = any(re.search(p, cmd) for p in month_patterns)
has_query = any(w in cmd for w in ["账单", "记录", "", "", "显示"])
return has_month and has_query
async def _handle_bill_query(self, command: str, context: Dict = None) -> str:
"""处理账单查询 - 极致智能(支持上下文)"""
data = await self._get_data()
user_id = context.get("user_id", "default") if context else "default"
# 解析月份
months = self._parse_months(command)
if not months:
return "❓ 请告诉我要查询哪个月份\n\n例如:\n• 查9月账单\n• 查2024年9月\n• 查7月和8月"
# 检查是否需要询问年份(只有当用户使用当前年份且命令中没有明确年份时才询问)
current_year = datetime.now().year
# 检查命令中是否包含年份数字
has_explicit_year = bool(re.search(r'\d{4}', command))
need_confirm = any(m["year"] == current_year for m in months) and not has_explicit_year
if need_confirm:
# 检查历史数据
year_stats = self._get_year_stats(data, months)
if len(year_stats) > 1:
# 多年都有数据,需要用户确认
# 保存上下文
self._set_context(user_id, "awaiting_year_selection", {
"months": months,
"year_stats": year_stats,
"original_command": command
})
month_names = "".join([f"{m['month']}" for m in months])
options = []
for year in sorted(year_stats.keys(), reverse=True):
options.append(f"{year}{month_names}: {year_stats[year]['count']}条记录,总计 ¥{year_stats[year]['total']:.3f}")
return f"📊 发现多个年份都有{month_names}的数据:\n\n" + "\n".join(options) + f"\n\n❓ 请直接回复年份:\n• 2025\n• 2024"
# 执行查询
return await self._execute_bill_query(data, months)
def _parse_months(self, cmd: str) -> List[Dict]:
"""解析月份"""
months = []
current_year = datetime.now().year
cn_map = {"": 1, "": 2, "": 3, "": 4, "": 5, "": 6,
"": 7, "": 8, "": 9, "": 10, "十一": 11, "十二": 12}
# 完整格式2024年9月
for m in re.finditer(r'(\d{4})年(\d{1,2})月', cmd):
year, month = int(m.group(1)), int(m.group(2))
if 1 <= month <= 12 and 2020 <= year <= 2030:
months.append({"year": year, "month": month})
# 简单格式9月使用当前年份
if not months:
for m in re.finditer(r'(\d{1,2})月', cmd):
month = int(m.group(1))
if 1 <= month <= 12:
months.append({"year": current_year, "month": month})
# 中文月份
for cn, num in cn_map.items():
if f"{cn}" in cmd and not any(m["month"] == num for m in months):
months.append({"year": current_year, "month": num})
# 去重
seen = set()
unique = []
for m in months:
key = (m["year"], m["month"])
if key not in seen:
seen.add(key)
unique.append(m)
return unique
def _get_year_stats(self, data: List[Dict], months: List[Dict]) -> Dict:
"""获取各年份的统计"""
stats = {}
for item in data:
date_str = item.get("transactionDate", "")
if not date_str:
continue
try:
date = datetime.strptime(date_str, "%Y-%m-%d")
for m in months:
if date.month == m["month"]:
year = date.year
if year not in stats:
stats[year] = {"count": 0, "total": 0}
stats[year]["count"] += 1
stats[year]["total"] += float(item.get("amount", 0))
break
except:
continue
return stats
async def _execute_bill_query(self, data: List[Dict], months: List[Dict]) -> str:
"""执行账单查询"""
# 筛选数据
filtered = []
for item in data:
date_str = item.get("transactionDate", "")
if not date_str:
continue
try:
date = datetime.strptime(date_str, "%Y-%m-%d")
for m in months:
if date.year == m["year"] and date.month == m["month"]:
filtered.append(item)
break
except:
continue
if not filtered:
month_desc = " + ".join([f"{m['year']}{m['month']}" for m in months])
# 智能提示有数据的月份
all_months = set()
for item in data:
date_str = item.get("transactionDate", "")
if date_str:
try:
all_months.add(date_str[:7])
except:
pass
recent = sorted(all_months, reverse=True)[:5]
hints = "\n".join([f"{m}" for m in recent])
return f"📊 {month_desc} 暂无记录\n\n💡 最近有数据的月份:\n{hints}"
# 生成详细报告
filtered.sort(key=lambda x: x.get("transactionDate", ""), reverse=True)
# 分月统计
by_month = defaultdict(list)
for item in filtered:
month_key = item.get("transactionDate", "")[:7]
by_month[month_key].append(item)
# 生成报告
total = sum(float(i.get("amount", 0)) for i in filtered)
month_desc = " + ".join([f"{m['year']}{m['month']}" for m in months])
report = f"📊 {month_desc} 账单详情\n{'='*50}\n\n"
for month_key in sorted(by_month.keys(), reverse=True):
items = by_month[month_key]
subtotal = sum(float(i.get("amount", 0)) for i in items)
# 格式化月份
y, m = month_key.split('-')
report += f"📅 {y}{int(m)}月 ({len(items)}笔)\n{'-'*60}\n"
# 表格头
report += f"{'序号':<4} {'日期':<12} {'金额':<12} {'币种':<6} {'描述'}\n"
report += f"{''*60}\n"
# 显示所有记录(完整)
for idx, item in enumerate(items, 1):
date = item.get("transactionDate", "")
amt = float(item.get("amount", 0))
desc = item.get("description", "无描述")
curr = item.get("currency", "CNY")
# 截断过长描述
if len(desc) > 25:
desc = desc[:22] + "..."
report += f"{idx:<4} {date:<12} {amt:<12.2f} {curr:<6} {desc}\n"
report += f"{'-'*60}\n"
report += f"💰 小计: ¥{subtotal:.3f}\n\n"
report += f"{'='*50}\n"
report += f"💰 总计: ¥{total:.3f}\n"
report += f"📝 共 {len(filtered)} 笔交易\n"
report += f"🕐 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
return report
# ========== 状态检查 ==========
async def _handle_status_check(self, command: str) -> str:
"""状态检查 - 详细健康报告"""
checks = []
# 1. 机器人状态
bot_status = await self._check_bot_status()
checks.append(bot_status)
# 2. API状态
api_status = await self._check_api_status()
checks.append(api_status)
# 3. 虚拟机状态
vm_status = await self._check_vm_status()
checks.append(vm_status)
# 4. 数据统计
data_status = await self._check_data_status()
checks.append(data_status)
# 汇总健康度
health_score = sum(1 for c in checks if "" in c["status"]) / len(checks) * 100
report = f"📊 系统健康度报告\n{'='*50}\n\n"
report += f"🏥 健康评分: {health_score:.0f}%\n\n"
for check in checks:
report += f"{check['icon']} {check['name']}: {check['status']}\n"
if check.get("detail"):
report += f" {check['detail']}\n"
report += f"\n{'='*50}\n"
report += f"🕐 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
if health_score < 80:
report += "\n⚠️ 系统存在问题,建议检查!"
return report
async def _check_bot_status(self) -> Dict:
"""检查机器人状态"""
try:
result = subprocess.run(
"ps aux | grep telegram_webhook_bot.py | grep -v grep",
shell=True, capture_output=True, text=True, timeout=5
)
running = bool(result.stdout.strip())
if running:
# 获取运行时间
lines = result.stdout.strip().split('\n')
if lines:
return {
"icon": "🤖", "name": "机器人",
"status": "✅ 运行中",
"detail": "服务正常"
}
return {"icon": "🤖", "name": "机器人", "status": "❌ 已停止", "detail": "需要重启"}
except:
return {"icon": "🤖", "name": "机器人", "status": "⚠️ 检查失败", "detail": None}
async def _check_api_status(self) -> Dict:
"""检查API状态"""
try:
data = await self._get_data()
count = len(data)
return {
"icon": "🌐", "name": "API服务",
"status": f"✅ 正常",
"detail": f"{count}条记录"
}
except:
return {"icon": "🌐", "name": "API服务", "status": "❌ 异常", "detail": "无法连接"}
async def _check_vm_status(self) -> Dict:
"""检查虚拟机状态"""
try:
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
result = sock.connect_ex(("192.168.9.149", 22))
sock.close()
if result == 0:
return {"icon": "💻", "name": "虚拟机", "status": "✅ 在线", "detail": "SSH可达"}
return {"icon": "💻", "name": "虚拟机", "status": "❌ 离线", "detail": "无法连接"}
except:
return {"icon": "💻", "name": "虚拟机", "status": "⚠️ 检查失败", "detail": None}
async def _check_data_status(self) -> Dict:
"""检查数据状态"""
try:
data = await self._get_data()
if not data:
return {"icon": "📊", "name": "数据", "status": "⚠️ 无数据", "detail": None}
# 最新记录时间
dates = [d.get("transactionDate", "") for d in data if d.get("transactionDate")]
if dates:
latest = max(dates)
return {
"icon": "📊", "name": "数据",
"status": "✅ 正常",
"detail": f"最新: {latest}"
}
return {"icon": "📊", "name": "数据", "status": "⚠️ 异常", "detail": "无日期"}
except:
return {"icon": "📊", "name": "数据", "status": "❌ 错误", "detail": None}
# ========== 统计分析 ==========
async def _handle_statistics(self, command: str) -> str:
"""统计分析 - 可视化报告"""
data = await self._get_data()
if not data:
return "❌ 暂无数据"
now = datetime.now()
today = now.date()
month_start = now.replace(day=1).date()
year_start = now.replace(month=1, day=1).date()
today_total = 0
month_total = 0
year_total = 0
# 分类统计
categories = Counter()
monthly = defaultdict(float)
for item in data:
date_str = item.get("transactionDate", "")
if not date_str:
continue
try:
date = datetime.strptime(date_str, "%Y-%m-%d").date()
amount = float(item.get("amount", 0))
if date == today:
today_total += amount
if date >= month_start:
month_total += amount
if date >= year_start:
year_total += amount
# 月度统计
month_key = date_str[:7]
monthly[month_key] += amount
# 分类统计
desc = item.get("description", "其他")
categories[desc] += amount
except:
continue
# 生成报告
report = f"📊 财务统计分析\n{'='*50}\n\n"
# 基础统计
report += "📈 基础数据\n"
report += f"• 今日支出: ¥{today_total:.3f}\n"
report += f"• 本月支出: ¥{month_total:.3f}\n"
report += f"• 本年支出: ¥{year_total:.3f}\n"
report += f"• 总记录数: {len(data)}\n\n"
# 月度趋势最近6个月
recent_months = sorted(monthly.keys(), reverse=True)[:6]
if recent_months:
report += "📅 月度趋势最近6个月\n"
max_amt = max(monthly[m] for m in recent_months)
for month in recent_months:
amt = monthly[month]
bar_len = int((amt / max_amt) * 20) if max_amt > 0 else 0
bar = "" * bar_len
report += f"{month}: ¥{amt:>10.2f} {bar}\n"
report += "\n"
# Top消费
top_cat = categories.most_common(5)
if top_cat:
report += "🏆 Top 5 消费项目\n"
for idx, (cat, amt) in enumerate(top_cat, 1):
report += f"{idx}. {cat[:20]:20} ¥{amt:.3f}\n"
report += f"\n{'='*50}\n"
report += f"🕐 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
return report
# ========== 其他功能 ==========
async def _handle_time_query(self, command: str) -> str:
"""时间范围查询"""
data = await self._get_data()
now = datetime.now()
# 解析时间范围
if "今天" in command or "今日" in command:
start = now.date()
title = "今日"
elif "昨天" in command:
start = (now - timedelta(days=1)).date()
title = "昨日"
elif "本周" in command:
start = (now - timedelta(days=now.weekday())).date()
title = "本周"
elif "本月" in command:
start = now.replace(day=1).date()
title = "本月"
else:
start = (now - timedelta(days=7)).date()
title = "最近7天"
# 筛选数据
filtered = []
for item in data:
date_str = item.get("transactionDate", "")
if date_str:
try:
date = datetime.strptime(date_str, "%Y-%m-%d").date()
if date >= start:
filtered.append(item)
except:
pass
if not filtered:
return f"📊 {title}暂无交易记录"
total = sum(float(i.get("amount", 0)) for i in filtered)
report = f"📊 {title}交易汇总\n{'='*50}\n\n"
report += f"💰 总金额: ¥{total:.3f}\n"
report += f"📝 交易笔数: {len(filtered)}\n\n"
# 最近5笔
filtered.sort(key=lambda x: x.get("transactionDate", ""), reverse=True)
report += "最近交易:\n"
for idx, item in enumerate(filtered[:5], 1):
date = item.get("transactionDate", "")
amt = item.get("amount", 0)
desc = item.get("description", "")[:20]
report += f"{idx}. {date} ¥{amt} {desc}\n"
if len(filtered) > 5:
report += f"... 还有 {len(filtered)-5}\n"
return report
async def _handle_restart(self, command: str) -> str:
"""重启服务"""
cmd_lower = command.lower()
# 如果是询问怎么做,提供指导而不是直接执行
if any(w in cmd_lower for w in ["怎么", "如何", "怎样", "如何做"]):
return """
🔧 重启服务指南
【重启机器人】
直接输入:重启机器人
【重启网站】
直接输入:重启网站
⚠️ 注意:重启会导致短暂服务中断
"""
# 明确的重启命令才执行
if "机器人" in command or "bot" in command:
return await self._restart_bot()
return "请明确要重启什么:\n• 重启机器人\n• 重启网站"
async def _restart_bot(self) -> str:
"""重启机器人"""
try:
subprocess.run("pkill -f telegram_webhook_bot.py", shell=True)
await asyncio.sleep(1)
subprocess.Popen(
["python3", "/Users/fuwuqi/telegram_webhook_bot.py"],
stdout=open("/Users/fuwuqi/bot.log", "a"),
stderr=subprocess.STDOUT,
start_new_session=True
)
await asyncio.sleep(2)
result = subprocess.run(
"ps aux | grep telegram_webhook_bot.py | grep -v grep | wc -l",
shell=True, capture_output=True, text=True
)
if int(result.stdout.strip()) > 0:
return "✅ 机器人重启成功!"
return "❌ 重启失败,请检查日志"
except Exception as e:
return f"❌ 重启失败: {e}"
async def _handle_logs(self, command: str) -> str:
"""日志查看"""
try:
lines = 20
if "错误" in command or "error" in command.lower():
# 只显示错误
result = subprocess.run(
f"tail -100 /Users/fuwuqi/bot.log | grep -i error | tail -20",
shell=True, capture_output=True, text=True
)
content = result.stdout or "无错误日志"
return f"📝 错误日志\n{'='*50}\n{content}"
with open("/Users/fuwuqi/bot.log", "r") as f:
content = "".join(f.readlines()[-lines:])
return f"📝 最近日志({lines}行)\n{'='*50}\n{content}"
except:
return "❌ 无法读取日志"
async def _handle_help(self, command: str) -> str:
"""帮助"""
return f"""
🤖 {self.name} v{self.version}
创作者:{self.creator}
💡 我能做什么:
【📊 账单查询】
• 查9月账单 - 智能询问年份
• 查2024年9月 - 指定年份
• 查7月和8月 - 多月汇总
【📈 统计分析】
• 统计 - 详细财务报告
• 本月 / 今天 - 时间范围查询
• 分析 - 趋势和可视化
【🔧 系统管理】
• 状态 - 健康度报告
• 重启机器人 - 安全重启
• 日志 - 查看运行日志
【💬 智能对话】
直接说话即可,我会理解!
📱 系统信息:
• 网站: http://192.168.9.149:5666
• 机器人: @ktyyds_bot
• 作者: {self.creator}
"""
async def _handle_vm(self, command: str) -> str:
"""虚拟机信息"""
return """
💻 虚拟机信息
{'='*50}
• IP: 192.168.9.149
• 用户: atai
• 端口: 5666
• SSH: 22
• 数据库: /home/atai/finwise-pro/apps/backend/storage/finance.db
"""
async def _handle_website(self, command: str) -> str:
"""网站相关"""
try:
import socket
sock = socket.socket()
sock.settimeout(3)
result = sock.connect_ex(("192.168.9.149", 5666))
sock.close()
if result == 0:
return "🌐 网站状态: ✅ 正常\n地址: http://192.168.9.149:5666"
return "🌐 网站状态: ❌ 无法访问\n\n建议:\n• 检查网络\n• 检查虚拟机"
except:
return "🌐 网站状态: ⚠️ 检查失败"
async def _handle_conversation(self, command: str, context: Dict = None) -> str:
"""智能对话"""
user_id = context.get("user_id", "default") if context else "default"
cmd = command.lower()
if any(w in cmd for w in ["你好", "hi", "hello", ""]):
response = f"您好!我是{self.name},智能财务助手。\n\n输入'帮助'查看我能做什么!"
self._remember_exchange(user_id, command, response)
return response
if any(w in cmd for w in ["谢谢", "thank"]):
response = "不客气!随时为您服务 😊"
self._remember_exchange(user_id, command, response)
return response
general_reply = await self._generate_general_reply(user_id, command)
if general_reply:
return general_reply
if any(symbol in command for symbol in ["?", "", ""]):
response = (
"我暂时没抓住您的重点,能再具体描述一下吗?\n"
"我擅长:账单查询、网站/虚拟机/机器人状态、财务统计,还有日常聊天。"
)
self._remember_exchange(user_id, command, response)
return response
response = (
f"🤔 我理解您说:\"{command}\"\n\n"
"如果是系统相关需求,可以告诉我:账单查询、网站/虚拟机状态、机器人、统计分析等。\n"
"如果只是想聊聊别的,也可以直接说,我会用自然语言和您交流。"
)
self._remember_exchange(user_id, command, response)
return response
async def _generate_general_reply(self, user_id: str, message: str) -> Optional[str]:
"""调用外部对话模型,处理泛化对话"""
if not self.general_chat_cli:
return None
prompt = self._build_general_prompt(user_id, message)
reply = await self._call_general_cli(prompt)
if reply:
self._remember_exchange(user_id, message, reply)
return reply
return None
def _build_general_prompt(self, user_id: str, message: str) -> str:
"""构建与Codex/Claude CLI的对话提示"""
history = self._get_history(user_id)
recent = list(history)[-6:]
formatted_history = []
for role, text in recent:
speaker = "用户" if role == "user" else "助手"
formatted_history.append(f"{speaker}: {text}")
history_block = "\n".join(formatted_history) if formatted_history else "(无历史对话)"
return (
"你是“财务管家助手”,负责财务系统网站、虚拟机和机器人运维,"
"也可以跟用户就任何话题自然聊天,提供常识、建议或情绪支持。\n"
"- 语气自然亲切,像同事聊天\n"
"- 回答保持务实最多6句中文\n"
"- 如果问题超出掌握范围,要坦诚说明并给出下一步建议\n"
"- 优先考虑是否与财务系统、虚拟机、机器人相关,如是则结合实际经验回复\n"
"- 如果用户只是闲聊,就顺着话题交流\n\n"
f"历史对话:\n{history_block}\n\n"
f"用户: {message}\n"
"助手:"
)
async def _call_general_cli(self, prompt: str) -> Optional[str]:
"""调用本地CLI进行对话"""
if not shutil.which(self.general_chat_cli):
logger.warning(f"未找到对话CLI{self.general_chat_cli}")
return None
def _run() -> Optional[str]:
try:
completed = subprocess.run(
[self.general_chat_cli, prompt],
capture_output=True,
text=True,
timeout=45
)
if completed.returncode != 0:
logger.error(
"对话CLI执行失败: returncode=%s, stderr=%s",
completed.returncode,
completed.stderr.strip()
)
return None
return self._sanitize_cli_output(completed.stdout)
except subprocess.TimeoutExpired:
logger.error("对话CLI执行超时")
return None
except Exception as exc:
logger.error(f"对话CLI调用异常: {exc}")
return None
return await asyncio.to_thread(_run)
def _get_history(self, user_id: str) -> Deque[Tuple[str, str]]:
"""获取或初始化用户对话历史"""
history = self.general_histories.get(user_id)
if history is None:
history = deque(maxlen=12)
self.general_histories[user_id] = history
return history
def _remember_exchange(self, user_id: str, user_text: str, assistant_text: str):
"""记录一次对话轮次"""
history = self._get_history(user_id)
history.append(("user", user_text.strip()))
history.append(("assistant", assistant_text.strip()))
def _sanitize_cli_output(self, output: str) -> Optional[str]:
"""清理CLI输出去除控制字符和冗余前缀"""
if not output:
return None
cleaned = output.replace("\r\n", "\n").replace("\r", "\n").strip()
cleaned = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", cleaned)
lines = [line.strip() for line in cleaned.split("\n") if line.strip()]
if not lines:
return None
# 删除通用前缀
first_line = lines[0]
first_line = re.sub(r"^(assistant|codex|claude|response)[:]\s*", "", first_line, flags=re.IGNORECASE)
lines[0] = first_line
cleaned_text = "\n".join(lines).strip()
return cleaned_text[:1200]
# 全局实例
finance_agent = FinanceAgentUltimate()
async def handle_agent_query(query: str, context: Dict = None) -> str:
"""主入口"""
if not hasattr(finance_agent, 'initialized'):
await finance_agent.initialize()
finance_agent.initialized = True
return await finance_agent.process_command(query, context)