910 lines
34 KiB
Python
910 lines
34 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
财务管家 AI Agent - 终极版
|
||
真正智能、极致体验
|
||
创作者:kt暴君
|
||
版本:3.0 Ultimate
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import logging
|
||
import shutil
|
||
from typing import Dict, Any, List, Optional, Tuple, Deque
|
||
from datetime import datetime, timedelta
|
||
import subprocess
|
||
import os
|
||
import re
|
||
from collections import defaultdict, Counter, deque
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class FinanceAgentUltimate:
|
||
"""财务管家 - 终极智能版"""
|
||
|
||
def __init__(self):
|
||
self.name = "财务管家"
|
||
self.creator = "kt暴君"
|
||
self.version = "3.0 Ultimate"
|
||
self.api_url = "http://192.168.9.149:5666/api/finance/transactions"
|
||
|
||
# 缓存数据,避免重复请求
|
||
self.data_cache = None
|
||
self.cache_time = None
|
||
self.cache_ttl = 60 # 缓存60秒
|
||
|
||
# 上下文记忆系统(按用户ID存储)
|
||
self.user_contexts = {} # {user_id: {state, data, timestamp}}
|
||
self.general_chat_cli = os.getenv("FINANCE_AGENT_GENERAL_CLI", "codex")
|
||
self.general_histories: Dict[str, Deque[Tuple[str, str]]] = {}
|
||
|
||
async def initialize(self):
|
||
"""初始化"""
|
||
logger.info(f"✅ {self.name} v{self.version} 已启动 - 终极智能模式")
|
||
# 预加载数据
|
||
await self._get_data(force_refresh=True)
|
||
return True
|
||
|
||
async def _get_data(self, force_refresh=False) -> List[Dict]:
|
||
"""获取数据(带缓存)"""
|
||
now = datetime.now()
|
||
|
||
if (
|
||
not force_refresh
|
||
and self.data_cache is not None
|
||
and self.cache_time is not None
|
||
and (now - self.cache_time).total_seconds() < self.cache_ttl
|
||
):
|
||
return self.data_cache
|
||
|
||
try:
|
||
import requests
|
||
|
||
resp = requests.get(self.api_url, timeout=10)
|
||
if resp.status_code == 200:
|
||
payload = resp.json()
|
||
if isinstance(payload, dict):
|
||
self.data_cache = payload.get("data", [])
|
||
else:
|
||
self.data_cache = []
|
||
self.cache_time = now
|
||
logger.info(f"数据已更新: {len(self.data_cache)} 条记录")
|
||
return self.data_cache
|
||
|
||
logger.error(f"获取数据失败: 状态码 {resp.status_code}")
|
||
except Exception as e:
|
||
logger.error(f"获取数据失败: {e}")
|
||
|
||
return self.data_cache or []
|
||
|
||
async def process_command(self, command: str, context: Dict = None) -> str:
|
||
"""主处理入口 - 智能路由(支持上下文)"""
|
||
cmd = command.lower().strip()
|
||
user_id = context.get("user_id", "default") if context else "default"
|
||
|
||
# 检查是否有待处理的上下文
|
||
if user_id in self.user_contexts:
|
||
ctx = self.user_contexts[user_id]
|
||
# 检查上下文是否过期(5分钟)
|
||
from datetime import datetime, timedelta
|
||
if datetime.now() - ctx.get("timestamp", datetime.now()) < timedelta(minutes=5):
|
||
# 尝试处理上下文响应
|
||
result = await self._handle_context_response(command, user_id, ctx)
|
||
if result:
|
||
return result
|
||
else:
|
||
# 上下文过期,清除
|
||
del self.user_contexts[user_id]
|
||
|
||
# 1. 账单查询(最高优先级)
|
||
if self._is_bill_query(command):
|
||
return await self._handle_bill_query(command, context)
|
||
|
||
# 2. 网站相关(优先于状态检查,避免被拦截)
|
||
if any(kw in cmd for kw in ["网站", "website", "web"]) and "状态" in cmd:
|
||
return await self._handle_website(command)
|
||
|
||
# 3. 虚拟机/服务器(优先处理具体对象)
|
||
if any(kw in cmd for kw in ["虚拟机", "vm", "服务器"]) and not any(kw in cmd for kw in ["状态", "重启"]):
|
||
return await self._handle_vm(command)
|
||
|
||
# 4. 状态检查
|
||
if any(kw in cmd for kw in ["状态", "status", "运行", "健康", "正常吗"]):
|
||
return await self._handle_status_check(command)
|
||
|
||
# 5. 统计分析
|
||
if any(kw in cmd for kw in ["统计", "分析", "趋势", "汇总", "总共"]):
|
||
return await self._handle_statistics(command)
|
||
|
||
# 6. 数据查询
|
||
if any(kw in cmd for kw in ["本月", "今天", "本周", "昨天", "最近"]):
|
||
return await self._handle_time_query(command)
|
||
|
||
# 7. 系统管理
|
||
if "重启" in cmd:
|
||
return await self._handle_restart(command)
|
||
|
||
# 8. 日志查看
|
||
if any(kw in cmd for kw in ["日志", "log", "错误", "异常"]):
|
||
return await self._handle_logs(command)
|
||
|
||
# 9. 帮助
|
||
if any(kw in cmd for kw in ["帮助", "help", "功能", "怎么用", "命令"]):
|
||
return await self._handle_help(command)
|
||
|
||
# 10. 智能对话
|
||
return await self._handle_conversation(command, context)
|
||
|
||
# ========== 上下文管理 ==========
|
||
|
||
async def _handle_context_response(self, command: str, user_id: str, ctx: Dict) -> Optional[str]:
|
||
"""处理上下文响应"""
|
||
state = ctx.get("state")
|
||
|
||
# 年份选择上下文
|
||
if state == "awaiting_year_selection":
|
||
# 检查用户是否输入了年份
|
||
year_match = re.search(r'20(2[0-9])', command)
|
||
if year_match:
|
||
year = int(year_match.group(0))
|
||
months = ctx.get("months", [])
|
||
|
||
# 更新月份的年份
|
||
for m in months:
|
||
m["year"] = year
|
||
|
||
# 清除上下文
|
||
del self.user_contexts[user_id]
|
||
|
||
# 执行查询
|
||
data = await self._get_data()
|
||
return await self._execute_bill_query(data, months)
|
||
|
||
# 检查是否输入了年份关键词
|
||
if "2024" in command or "2025" in command or "今年" in command or "去年" in command:
|
||
if "2024" in command:
|
||
year = 2024
|
||
elif "2025" in command or "今年" in command:
|
||
year = 2025
|
||
elif "去年" in command:
|
||
year = datetime.now().year - 1
|
||
else:
|
||
return None
|
||
|
||
months = ctx.get("months", [])
|
||
for m in months:
|
||
m["year"] = year
|
||
|
||
del self.user_contexts[user_id]
|
||
data = await self._get_data()
|
||
return await self._execute_bill_query(data, months)
|
||
|
||
return None
|
||
|
||
def _set_context(self, user_id: str, state: str, data: Dict):
|
||
"""设置用户上下文"""
|
||
from datetime import datetime
|
||
self.user_contexts[user_id] = {
|
||
"state": state,
|
||
"timestamp": datetime.now(),
|
||
**data
|
||
}
|
||
|
||
# ========== 账单查询 ==========
|
||
|
||
def _is_bill_query(self, cmd: str) -> bool:
|
||
"""判断是否是账单查询"""
|
||
month_patterns = [r'\d{1,2}月', r'[一二三四五六七八九十]{1,2}月', r'\d{4}年\d{1,2}月']
|
||
has_month = any(re.search(p, cmd) for p in month_patterns)
|
||
has_query = any(w in cmd for w in ["账单", "记录", "查", "看", "显示"])
|
||
return has_month and has_query
|
||
|
||
async def _handle_bill_query(self, command: str, context: Dict = None) -> str:
|
||
"""处理账单查询 - 极致智能(支持上下文)"""
|
||
data = await self._get_data()
|
||
user_id = context.get("user_id", "default") if context else "default"
|
||
|
||
# 解析月份
|
||
months = self._parse_months(command)
|
||
if not months:
|
||
return "❓ 请告诉我要查询哪个月份\n\n例如:\n• 查9月账单\n• 查2024年9月\n• 查7月和8月"
|
||
|
||
# 检查是否需要询问年份(只有当用户使用当前年份且命令中没有明确年份时才询问)
|
||
current_year = datetime.now().year
|
||
# 检查命令中是否包含年份数字
|
||
has_explicit_year = bool(re.search(r'\d{4}年', command))
|
||
need_confirm = any(m["year"] == current_year for m in months) and not has_explicit_year
|
||
|
||
if need_confirm:
|
||
# 检查历史数据
|
||
year_stats = self._get_year_stats(data, months)
|
||
if len(year_stats) > 1:
|
||
# 多年都有数据,需要用户确认
|
||
# 保存上下文
|
||
self._set_context(user_id, "awaiting_year_selection", {
|
||
"months": months,
|
||
"year_stats": year_stats,
|
||
"original_command": command
|
||
})
|
||
|
||
month_names = "、".join([f"{m['month']}月" for m in months])
|
||
options = []
|
||
for year in sorted(year_stats.keys(), reverse=True):
|
||
options.append(f"• {year}年{month_names}: {year_stats[year]['count']}条记录,总计 ¥{year_stats[year]['total']:.3f}")
|
||
|
||
return f"📊 发现多个年份都有{month_names}的数据:\n\n" + "\n".join(options) + f"\n\n❓ 请直接回复年份:\n• 2025\n• 2024"
|
||
|
||
# 执行查询
|
||
return await self._execute_bill_query(data, months)
|
||
|
||
def _parse_months(self, cmd: str) -> List[Dict]:
|
||
"""解析月份"""
|
||
months = []
|
||
current_year = datetime.now().year
|
||
|
||
cn_map = {"一": 1, "二": 2, "三": 3, "四": 4, "五": 5, "六": 6,
|
||
"七": 7, "八": 8, "九": 9, "十": 10, "十一": 11, "十二": 12}
|
||
|
||
# 完整格式:2024年9月
|
||
for m in re.finditer(r'(\d{4})年(\d{1,2})月', cmd):
|
||
year, month = int(m.group(1)), int(m.group(2))
|
||
if 1 <= month <= 12 and 2020 <= year <= 2030:
|
||
months.append({"year": year, "month": month})
|
||
|
||
# 简单格式:9月(使用当前年份)
|
||
if not months:
|
||
for m in re.finditer(r'(\d{1,2})月', cmd):
|
||
month = int(m.group(1))
|
||
if 1 <= month <= 12:
|
||
months.append({"year": current_year, "month": month})
|
||
|
||
# 中文月份
|
||
for cn, num in cn_map.items():
|
||
if f"{cn}月" in cmd and not any(m["month"] == num for m in months):
|
||
months.append({"year": current_year, "month": num})
|
||
|
||
# 去重
|
||
seen = set()
|
||
unique = []
|
||
for m in months:
|
||
key = (m["year"], m["month"])
|
||
if key not in seen:
|
||
seen.add(key)
|
||
unique.append(m)
|
||
|
||
return unique
|
||
|
||
def _get_year_stats(self, data: List[Dict], months: List[Dict]) -> Dict:
|
||
"""获取各年份的统计"""
|
||
stats = {}
|
||
for item in data:
|
||
date_str = item.get("transactionDate", "")
|
||
if not date_str:
|
||
continue
|
||
|
||
try:
|
||
date = datetime.strptime(date_str, "%Y-%m-%d")
|
||
for m in months:
|
||
if date.month == m["month"]:
|
||
year = date.year
|
||
if year not in stats:
|
||
stats[year] = {"count": 0, "total": 0}
|
||
stats[year]["count"] += 1
|
||
stats[year]["total"] += float(item.get("amount", 0))
|
||
break
|
||
except:
|
||
continue
|
||
|
||
return stats
|
||
|
||
async def _execute_bill_query(self, data: List[Dict], months: List[Dict]) -> str:
|
||
"""执行账单查询"""
|
||
# 筛选数据
|
||
filtered = []
|
||
for item in data:
|
||
date_str = item.get("transactionDate", "")
|
||
if not date_str:
|
||
continue
|
||
|
||
try:
|
||
date = datetime.strptime(date_str, "%Y-%m-%d")
|
||
for m in months:
|
||
if date.year == m["year"] and date.month == m["month"]:
|
||
filtered.append(item)
|
||
break
|
||
except:
|
||
continue
|
||
|
||
if not filtered:
|
||
month_desc = " + ".join([f"{m['year']}年{m['month']}月" for m in months])
|
||
# 智能提示有数据的月份
|
||
all_months = set()
|
||
for item in data:
|
||
date_str = item.get("transactionDate", "")
|
||
if date_str:
|
||
try:
|
||
all_months.add(date_str[:7])
|
||
except:
|
||
pass
|
||
recent = sorted(all_months, reverse=True)[:5]
|
||
hints = "\n".join([f"• {m}" for m in recent])
|
||
return f"📊 {month_desc} 暂无记录\n\n💡 最近有数据的月份:\n{hints}"
|
||
|
||
# 生成详细报告
|
||
filtered.sort(key=lambda x: x.get("transactionDate", ""), reverse=True)
|
||
|
||
# 分月统计
|
||
by_month = defaultdict(list)
|
||
for item in filtered:
|
||
month_key = item.get("transactionDate", "")[:7]
|
||
by_month[month_key].append(item)
|
||
|
||
# 生成报告
|
||
total = sum(float(i.get("amount", 0)) for i in filtered)
|
||
month_desc = " + ".join([f"{m['year']}年{m['month']}月" for m in months])
|
||
|
||
report = f"📊 {month_desc} 账单详情\n{'='*50}\n\n"
|
||
|
||
for month_key in sorted(by_month.keys(), reverse=True):
|
||
items = by_month[month_key]
|
||
subtotal = sum(float(i.get("amount", 0)) for i in items)
|
||
|
||
# 格式化月份
|
||
y, m = month_key.split('-')
|
||
report += f"📅 {y}年{int(m)}月 ({len(items)}笔)\n{'-'*60}\n"
|
||
|
||
# 表格头
|
||
report += f"{'序号':<4} {'日期':<12} {'金额':<12} {'币种':<6} {'描述'}\n"
|
||
report += f"{'━'*60}\n"
|
||
|
||
# 显示所有记录(完整)
|
||
for idx, item in enumerate(items, 1):
|
||
date = item.get("transactionDate", "")
|
||
amt = float(item.get("amount", 0))
|
||
desc = item.get("description", "无描述")
|
||
curr = item.get("currency", "CNY")
|
||
|
||
# 截断过长描述
|
||
if len(desc) > 25:
|
||
desc = desc[:22] + "..."
|
||
|
||
report += f"{idx:<4} {date:<12} {amt:<12.2f} {curr:<6} {desc}\n"
|
||
|
||
report += f"{'-'*60}\n"
|
||
report += f"💰 小计: ¥{subtotal:.3f}\n\n"
|
||
|
||
report += f"{'='*50}\n"
|
||
report += f"💰 总计: ¥{total:.3f}\n"
|
||
report += f"📝 共 {len(filtered)} 笔交易\n"
|
||
report += f"🕐 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
|
||
|
||
return report
|
||
|
||
# ========== 状态检查 ==========
|
||
|
||
async def _handle_status_check(self, command: str) -> str:
|
||
"""状态检查 - 详细健康报告"""
|
||
checks = []
|
||
|
||
# 1. 机器人状态
|
||
bot_status = await self._check_bot_status()
|
||
checks.append(bot_status)
|
||
|
||
# 2. API状态
|
||
api_status = await self._check_api_status()
|
||
checks.append(api_status)
|
||
|
||
# 3. 虚拟机状态
|
||
vm_status = await self._check_vm_status()
|
||
checks.append(vm_status)
|
||
|
||
# 4. 数据统计
|
||
data_status = await self._check_data_status()
|
||
checks.append(data_status)
|
||
|
||
# 汇总健康度
|
||
health_score = sum(1 for c in checks if "✅" in c["status"]) / len(checks) * 100
|
||
|
||
report = f"📊 系统健康度报告\n{'='*50}\n\n"
|
||
report += f"🏥 健康评分: {health_score:.0f}%\n\n"
|
||
|
||
for check in checks:
|
||
report += f"{check['icon']} {check['name']}: {check['status']}\n"
|
||
if check.get("detail"):
|
||
report += f" {check['detail']}\n"
|
||
|
||
report += f"\n{'='*50}\n"
|
||
report += f"🕐 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
|
||
|
||
if health_score < 80:
|
||
report += "\n⚠️ 系统存在问题,建议检查!"
|
||
|
||
return report
|
||
|
||
async def _check_bot_status(self) -> Dict:
|
||
"""检查机器人状态"""
|
||
try:
|
||
result = subprocess.run(
|
||
"ps aux | grep telegram_webhook_bot.py | grep -v grep",
|
||
shell=True, capture_output=True, text=True, timeout=5
|
||
)
|
||
running = bool(result.stdout.strip())
|
||
|
||
if running:
|
||
# 获取运行时间
|
||
lines = result.stdout.strip().split('\n')
|
||
if lines:
|
||
return {
|
||
"icon": "🤖", "name": "机器人",
|
||
"status": "✅ 运行中",
|
||
"detail": "服务正常"
|
||
}
|
||
|
||
return {"icon": "🤖", "name": "机器人", "status": "❌ 已停止", "detail": "需要重启"}
|
||
except:
|
||
return {"icon": "🤖", "name": "机器人", "status": "⚠️ 检查失败", "detail": None}
|
||
|
||
async def _check_api_status(self) -> Dict:
|
||
"""检查API状态"""
|
||
try:
|
||
data = await self._get_data()
|
||
count = len(data)
|
||
return {
|
||
"icon": "🌐", "name": "API服务",
|
||
"status": f"✅ 正常",
|
||
"detail": f"{count}条记录"
|
||
}
|
||
except:
|
||
return {"icon": "🌐", "name": "API服务", "status": "❌ 异常", "detail": "无法连接"}
|
||
|
||
async def _check_vm_status(self) -> Dict:
|
||
"""检查虚拟机状态"""
|
||
try:
|
||
import socket
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
sock.settimeout(3)
|
||
result = sock.connect_ex(("192.168.9.149", 22))
|
||
sock.close()
|
||
|
||
if result == 0:
|
||
return {"icon": "💻", "name": "虚拟机", "status": "✅ 在线", "detail": "SSH可达"}
|
||
return {"icon": "💻", "name": "虚拟机", "status": "❌ 离线", "detail": "无法连接"}
|
||
except:
|
||
return {"icon": "💻", "name": "虚拟机", "status": "⚠️ 检查失败", "detail": None}
|
||
|
||
async def _check_data_status(self) -> Dict:
|
||
"""检查数据状态"""
|
||
try:
|
||
data = await self._get_data()
|
||
if not data:
|
||
return {"icon": "📊", "name": "数据", "status": "⚠️ 无数据", "detail": None}
|
||
|
||
# 最新记录时间
|
||
dates = [d.get("transactionDate", "") for d in data if d.get("transactionDate")]
|
||
if dates:
|
||
latest = max(dates)
|
||
return {
|
||
"icon": "📊", "name": "数据",
|
||
"status": "✅ 正常",
|
||
"detail": f"最新: {latest}"
|
||
}
|
||
|
||
return {"icon": "📊", "name": "数据", "status": "⚠️ 异常", "detail": "无日期"}
|
||
except:
|
||
return {"icon": "📊", "name": "数据", "status": "❌ 错误", "detail": None}
|
||
|
||
# ========== 统计分析 ==========
|
||
|
||
async def _handle_statistics(self, command: str) -> str:
|
||
"""统计分析 - 可视化报告"""
|
||
data = await self._get_data()
|
||
|
||
if not data:
|
||
return "❌ 暂无数据"
|
||
|
||
now = datetime.now()
|
||
today = now.date()
|
||
month_start = now.replace(day=1).date()
|
||
year_start = now.replace(month=1, day=1).date()
|
||
|
||
today_total = 0
|
||
month_total = 0
|
||
year_total = 0
|
||
|
||
# 分类统计
|
||
categories = Counter()
|
||
monthly = defaultdict(float)
|
||
|
||
for item in data:
|
||
date_str = item.get("transactionDate", "")
|
||
if not date_str:
|
||
continue
|
||
|
||
try:
|
||
date = datetime.strptime(date_str, "%Y-%m-%d").date()
|
||
amount = float(item.get("amount", 0))
|
||
|
||
if date == today:
|
||
today_total += amount
|
||
if date >= month_start:
|
||
month_total += amount
|
||
if date >= year_start:
|
||
year_total += amount
|
||
|
||
# 月度统计
|
||
month_key = date_str[:7]
|
||
monthly[month_key] += amount
|
||
|
||
# 分类统计
|
||
desc = item.get("description", "其他")
|
||
categories[desc] += amount
|
||
except:
|
||
continue
|
||
|
||
# 生成报告
|
||
report = f"📊 财务统计分析\n{'='*50}\n\n"
|
||
|
||
# 基础统计
|
||
report += "📈 基础数据\n"
|
||
report += f"• 今日支出: ¥{today_total:.3f}\n"
|
||
report += f"• 本月支出: ¥{month_total:.3f}\n"
|
||
report += f"• 本年支出: ¥{year_total:.3f}\n"
|
||
report += f"• 总记录数: {len(data)} 条\n\n"
|
||
|
||
# 月度趋势(最近6个月)
|
||
recent_months = sorted(monthly.keys(), reverse=True)[:6]
|
||
if recent_months:
|
||
report += "📅 月度趋势(最近6个月)\n"
|
||
max_amt = max(monthly[m] for m in recent_months)
|
||
for month in recent_months:
|
||
amt = monthly[month]
|
||
bar_len = int((amt / max_amt) * 20) if max_amt > 0 else 0
|
||
bar = "█" * bar_len
|
||
report += f"{month}: ¥{amt:>10.2f} {bar}\n"
|
||
report += "\n"
|
||
|
||
# Top消费
|
||
top_cat = categories.most_common(5)
|
||
if top_cat:
|
||
report += "🏆 Top 5 消费项目\n"
|
||
for idx, (cat, amt) in enumerate(top_cat, 1):
|
||
report += f"{idx}. {cat[:20]:20} ¥{amt:.3f}\n"
|
||
|
||
report += f"\n{'='*50}\n"
|
||
report += f"🕐 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
|
||
|
||
return report
|
||
|
||
# ========== 其他功能 ==========
|
||
|
||
async def _handle_time_query(self, command: str) -> str:
|
||
"""时间范围查询"""
|
||
data = await self._get_data()
|
||
now = datetime.now()
|
||
|
||
# 解析时间范围
|
||
if "今天" in command or "今日" in command:
|
||
start = now.date()
|
||
title = "今日"
|
||
elif "昨天" in command:
|
||
start = (now - timedelta(days=1)).date()
|
||
title = "昨日"
|
||
elif "本周" in command:
|
||
start = (now - timedelta(days=now.weekday())).date()
|
||
title = "本周"
|
||
elif "本月" in command:
|
||
start = now.replace(day=1).date()
|
||
title = "本月"
|
||
else:
|
||
start = (now - timedelta(days=7)).date()
|
||
title = "最近7天"
|
||
|
||
# 筛选数据
|
||
filtered = []
|
||
for item in data:
|
||
date_str = item.get("transactionDate", "")
|
||
if date_str:
|
||
try:
|
||
date = datetime.strptime(date_str, "%Y-%m-%d").date()
|
||
if date >= start:
|
||
filtered.append(item)
|
||
except:
|
||
pass
|
||
|
||
if not filtered:
|
||
return f"📊 {title}暂无交易记录"
|
||
|
||
total = sum(float(i.get("amount", 0)) for i in filtered)
|
||
|
||
report = f"📊 {title}交易汇总\n{'='*50}\n\n"
|
||
report += f"💰 总金额: ¥{total:.3f}\n"
|
||
report += f"📝 交易笔数: {len(filtered)}\n\n"
|
||
|
||
# 最近5笔
|
||
filtered.sort(key=lambda x: x.get("transactionDate", ""), reverse=True)
|
||
report += "最近交易:\n"
|
||
for idx, item in enumerate(filtered[:5], 1):
|
||
date = item.get("transactionDate", "")
|
||
amt = item.get("amount", 0)
|
||
desc = item.get("description", "")[:20]
|
||
report += f"{idx}. {date} ¥{amt} {desc}\n"
|
||
|
||
if len(filtered) > 5:
|
||
report += f"... 还有 {len(filtered)-5} 笔\n"
|
||
|
||
return report
|
||
|
||
async def _handle_restart(self, command: str) -> str:
|
||
"""重启服务"""
|
||
cmd_lower = command.lower()
|
||
|
||
# 如果是询问怎么做,提供指导而不是直接执行
|
||
if any(w in cmd_lower for w in ["怎么", "如何", "怎样", "如何做"]):
|
||
return """
|
||
🔧 重启服务指南
|
||
|
||
【重启机器人】
|
||
直接输入:重启机器人
|
||
|
||
【重启网站】
|
||
直接输入:重启网站
|
||
|
||
⚠️ 注意:重启会导致短暂服务中断
|
||
"""
|
||
|
||
# 明确的重启命令才执行
|
||
if "机器人" in command or "bot" in command:
|
||
return await self._restart_bot()
|
||
|
||
return "请明确要重启什么:\n• 重启机器人\n• 重启网站"
|
||
|
||
async def _restart_bot(self) -> str:
|
||
"""重启机器人"""
|
||
try:
|
||
subprocess.run("pkill -f telegram_webhook_bot.py", shell=True)
|
||
await asyncio.sleep(1)
|
||
subprocess.Popen(
|
||
["python3", "/Users/fuwuqi/telegram_webhook_bot.py"],
|
||
stdout=open("/Users/fuwuqi/bot.log", "a"),
|
||
stderr=subprocess.STDOUT,
|
||
start_new_session=True
|
||
)
|
||
await asyncio.sleep(2)
|
||
result = subprocess.run(
|
||
"ps aux | grep telegram_webhook_bot.py | grep -v grep | wc -l",
|
||
shell=True, capture_output=True, text=True
|
||
)
|
||
if int(result.stdout.strip()) > 0:
|
||
return "✅ 机器人重启成功!"
|
||
return "❌ 重启失败,请检查日志"
|
||
except Exception as e:
|
||
return f"❌ 重启失败: {e}"
|
||
|
||
async def _handle_logs(self, command: str) -> str:
|
||
"""日志查看"""
|
||
try:
|
||
lines = 20
|
||
if "错误" in command or "error" in command.lower():
|
||
# 只显示错误
|
||
result = subprocess.run(
|
||
f"tail -100 /Users/fuwuqi/bot.log | grep -i error | tail -20",
|
||
shell=True, capture_output=True, text=True
|
||
)
|
||
content = result.stdout or "无错误日志"
|
||
return f"📝 错误日志\n{'='*50}\n{content}"
|
||
|
||
with open("/Users/fuwuqi/bot.log", "r") as f:
|
||
content = "".join(f.readlines()[-lines:])
|
||
|
||
return f"📝 最近日志({lines}行)\n{'='*50}\n{content}"
|
||
except:
|
||
return "❌ 无法读取日志"
|
||
|
||
async def _handle_help(self, command: str) -> str:
|
||
"""帮助"""
|
||
return f"""
|
||
🤖 {self.name} v{self.version}
|
||
创作者:{self.creator}
|
||
|
||
💡 我能做什么:
|
||
|
||
【📊 账单查询】
|
||
• 查9月账单 - 智能询问年份
|
||
• 查2024年9月 - 指定年份
|
||
• 查7月和8月 - 多月汇总
|
||
|
||
【📈 统计分析】
|
||
• 统计 - 详细财务报告
|
||
• 本月 / 今天 - 时间范围查询
|
||
• 分析 - 趋势和可视化
|
||
|
||
【🔧 系统管理】
|
||
• 状态 - 健康度报告
|
||
• 重启机器人 - 安全重启
|
||
• 日志 - 查看运行日志
|
||
|
||
【💬 智能对话】
|
||
直接说话即可,我会理解!
|
||
|
||
📱 系统信息:
|
||
• 网站: http://192.168.9.149:5666
|
||
• 机器人: @ktyyds_bot
|
||
• 作者: {self.creator}
|
||
"""
|
||
|
||
async def _handle_vm(self, command: str) -> str:
|
||
"""虚拟机信息"""
|
||
return """
|
||
💻 虚拟机信息
|
||
{'='*50}
|
||
• IP: 192.168.9.149
|
||
• 用户: atai
|
||
• 端口: 5666
|
||
• SSH: 22
|
||
• 数据库: /home/atai/finwise-pro/apps/backend/storage/finance.db
|
||
"""
|
||
|
||
async def _handle_website(self, command: str) -> str:
|
||
"""网站相关"""
|
||
try:
|
||
import socket
|
||
sock = socket.socket()
|
||
sock.settimeout(3)
|
||
result = sock.connect_ex(("192.168.9.149", 5666))
|
||
sock.close()
|
||
|
||
if result == 0:
|
||
return "🌐 网站状态: ✅ 正常\n地址: http://192.168.9.149:5666"
|
||
return "🌐 网站状态: ❌ 无法访问\n\n建议:\n• 检查网络\n• 检查虚拟机"
|
||
except:
|
||
return "🌐 网站状态: ⚠️ 检查失败"
|
||
|
||
async def _handle_conversation(self, command: str, context: Dict = None) -> str:
|
||
"""智能对话"""
|
||
user_id = context.get("user_id", "default") if context else "default"
|
||
cmd = command.lower()
|
||
|
||
if any(w in cmd for w in ["你好", "hi", "hello", "嗨"]):
|
||
response = f"您好!我是{self.name},智能财务助手。\n\n输入'帮助'查看我能做什么!"
|
||
self._remember_exchange(user_id, command, response)
|
||
return response
|
||
|
||
if any(w in cmd for w in ["谢谢", "thank"]):
|
||
response = "不客气!随时为您服务 😊"
|
||
self._remember_exchange(user_id, command, response)
|
||
return response
|
||
|
||
general_reply = await self._generate_general_reply(user_id, command)
|
||
if general_reply:
|
||
return general_reply
|
||
|
||
if any(symbol in command for symbol in ["?", "?", "吗"]):
|
||
response = (
|
||
"我暂时没抓住您的重点,能再具体描述一下吗?\n"
|
||
"我擅长:账单查询、网站/虚拟机/机器人状态、财务统计,还有日常聊天。"
|
||
)
|
||
self._remember_exchange(user_id, command, response)
|
||
return response
|
||
|
||
response = (
|
||
f"🤔 我理解您说:\"{command}\"\n\n"
|
||
"如果是系统相关需求,可以告诉我:账单查询、网站/虚拟机状态、机器人、统计分析等。\n"
|
||
"如果只是想聊聊别的,也可以直接说,我会用自然语言和您交流。"
|
||
)
|
||
self._remember_exchange(user_id, command, response)
|
||
return response
|
||
|
||
async def _generate_general_reply(self, user_id: str, message: str) -> Optional[str]:
|
||
"""调用外部对话模型,处理泛化对话"""
|
||
if not self.general_chat_cli:
|
||
return None
|
||
|
||
prompt = self._build_general_prompt(user_id, message)
|
||
reply = await self._call_general_cli(prompt)
|
||
|
||
if reply:
|
||
self._remember_exchange(user_id, message, reply)
|
||
return reply
|
||
return None
|
||
|
||
def _build_general_prompt(self, user_id: str, message: str) -> str:
|
||
"""构建与Codex/Claude CLI的对话提示"""
|
||
history = self._get_history(user_id)
|
||
recent = list(history)[-6:]
|
||
|
||
formatted_history = []
|
||
for role, text in recent:
|
||
speaker = "用户" if role == "user" else "助手"
|
||
formatted_history.append(f"{speaker}: {text}")
|
||
|
||
history_block = "\n".join(formatted_history) if formatted_history else "(无历史对话)"
|
||
|
||
return (
|
||
"你是“财务管家助手”,负责财务系统网站、虚拟机和机器人运维,"
|
||
"也可以跟用户就任何话题自然聊天,提供常识、建议或情绪支持。\n"
|
||
"- 语气自然亲切,像同事聊天\n"
|
||
"- 回答保持务实,最多6句中文\n"
|
||
"- 如果问题超出掌握范围,要坦诚说明并给出下一步建议\n"
|
||
"- 优先考虑是否与财务系统、虚拟机、机器人相关,如是则结合实际经验回复\n"
|
||
"- 如果用户只是闲聊,就顺着话题交流\n\n"
|
||
f"历史对话:\n{history_block}\n\n"
|
||
f"用户: {message}\n"
|
||
"助手:"
|
||
)
|
||
|
||
async def _call_general_cli(self, prompt: str) -> Optional[str]:
|
||
"""调用本地CLI进行对话"""
|
||
if not shutil.which(self.general_chat_cli):
|
||
logger.warning(f"未找到对话CLI:{self.general_chat_cli}")
|
||
return None
|
||
|
||
def _run() -> Optional[str]:
|
||
try:
|
||
completed = subprocess.run(
|
||
[self.general_chat_cli, prompt],
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=45
|
||
)
|
||
if completed.returncode != 0:
|
||
logger.error(
|
||
"对话CLI执行失败: returncode=%s, stderr=%s",
|
||
completed.returncode,
|
||
completed.stderr.strip()
|
||
)
|
||
return None
|
||
return self._sanitize_cli_output(completed.stdout)
|
||
except subprocess.TimeoutExpired:
|
||
logger.error("对话CLI执行超时")
|
||
return None
|
||
except Exception as exc:
|
||
logger.error(f"对话CLI调用异常: {exc}")
|
||
return None
|
||
|
||
return await asyncio.to_thread(_run)
|
||
|
||
def _get_history(self, user_id: str) -> Deque[Tuple[str, str]]:
|
||
"""获取或初始化用户对话历史"""
|
||
history = self.general_histories.get(user_id)
|
||
if history is None:
|
||
history = deque(maxlen=12)
|
||
self.general_histories[user_id] = history
|
||
return history
|
||
|
||
def _remember_exchange(self, user_id: str, user_text: str, assistant_text: str):
|
||
"""记录一次对话轮次"""
|
||
history = self._get_history(user_id)
|
||
history.append(("user", user_text.strip()))
|
||
history.append(("assistant", assistant_text.strip()))
|
||
|
||
def _sanitize_cli_output(self, output: str) -> Optional[str]:
|
||
"""清理CLI输出,去除控制字符和冗余前缀"""
|
||
if not output:
|
||
return None
|
||
|
||
cleaned = output.replace("\r\n", "\n").replace("\r", "\n").strip()
|
||
cleaned = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", cleaned)
|
||
|
||
lines = [line.strip() for line in cleaned.split("\n") if line.strip()]
|
||
if not lines:
|
||
return None
|
||
|
||
# 删除通用前缀
|
||
first_line = lines[0]
|
||
first_line = re.sub(r"^(assistant|codex|claude|response)[::]\s*", "", first_line, flags=re.IGNORECASE)
|
||
lines[0] = first_line
|
||
|
||
cleaned_text = "\n".join(lines).strip()
|
||
return cleaned_text[:1200]
|
||
|
||
# 全局实例
|
||
finance_agent = FinanceAgentUltimate()
|
||
|
||
async def handle_agent_query(query: str, context: Dict = None) -> str:
|
||
"""主入口"""
|
||
if not hasattr(finance_agent, 'initialized'):
|
||
await finance_agent.initialize()
|
||
finance_agent.initialized = True
|
||
|
||
return await finance_agent.process_command(query, context)
|