Files
telegram-customer-bot/admin_commands.py
2025-11-01 21:58:31 +08:00

348 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Admin command module.
Adds admin back-office features to integrated_bot_ai.py.
"""
import os
import re
import sqlite3
from collections import Counter, deque
from contextlib import closing
from datetime import datetime, timedelta
from typing import Dict, List

import psutil
# Telegram user ID of the administrator; every handler in this module compares
# the sender's ID against it and ignores updates from anyone else.
ADMIN_ID = 7363537082
class AdminCommands:
    """Admin-only Telegram command handlers (/stats, /users, /userinfo, /logs, /cache).

    Every ``handle_*`` coroutine silently ignores updates whose sender is not
    ``ADMIN_ID``. User figures are mined from the detailed bot log and cache
    figures are read from the ``search_cache`` table of the cache database.
    """

    # Detailed bot log that user statistics are extracted from (relative to CWD).
    LOG_FILE = "./logs/integrated_bot_detailed.log"
    # SQLite database that holds the ``search_cache`` table.
    CACHE_DB_PATH = "/home/atai/bot_data/cache.db"
    # Telegram caps a message at 4096 characters; keep headroom for the header.
    MAX_LOG_CHARS = 4000
    # Number of users listed by /users before the list is truncated.
    MAX_LISTED_USERS = 20

    # Log lines mention users as "用户 <digits>".
    _USER_ID_RE = re.compile(r'用户 (\d+)')
    # Timestamps look like "2025-01-31 23:59:59".
    _TIMESTAMP_RE = re.compile(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})')

    def __init__(self, bot_instance, logger):
        """Store the bot and logger and start the in-memory counters.

        Args:
            bot_instance: The bot object these commands belong to.
            logger: Logger used for admin audit and error messages.
        """
        self.bot = bot_instance
        self.logger = logger
        self.stats = {
            "start_time": datetime.now(),
            "messages_received": 0,
            "searches_performed": 0,
            "ai_queries": 0,
            "cache_hits": 0,
        }

    @staticmethod
    def _is_admin(update):
        """Return True when the update was sent by the configured admin."""
        user = update.effective_user
        # Guard against a missing sender so the check itself cannot raise.
        return user is not None and user.id == ADMIN_ID

    @staticmethod
    def _hit_rate(total, total_accesses):
        """Return the cache hit rate in percent, or 0 for an empty cache."""
        if total <= 0:
            return 0
        return total_accesses / (total_accesses + total) * 100

    async def handle_stats(self, update, context):
        """Handle /stats: reply with uptime, usage, cache and resource figures."""
        if not self._is_admin(update):
            return

        uptime = datetime.now() - self.stats["start_time"]
        days = uptime.days
        hours, remainder = divmod(uptime.seconds, 3600)
        minutes = remainder // 60

        # Resident memory of the current process, in MiB.
        process = psutil.Process(os.getpid())
        memory_mb = process.memory_info().rss / 1024 / 1024

        user_count = self._get_unique_users_from_logs()
        cache_stats = self._get_cache_stats()

        text = (
            "📊 **机器人统计信息**\n\n"
            f"⏱ **运行时间:** {days}天 {hours}小时 {minutes}分钟\n"
            f"📅 **启动时间:** {self.stats['start_time'].strftime('%Y-%m-%d %H:%M:%S')}\n\n"
            "👥 **用户统计:**\n"
            f"• 总访问用户:{user_count}\n"
            f"• AI对话次数{self.stats['ai_queries']}\n"
            f"• 搜索执行次数:{self.stats['searches_performed']}\n\n"
            "💾 **缓存统计:**\n"
            f"• 缓存记录数:{cache_stats['total']}\n"
            f"• 缓存命中率:{cache_stats['hit_rate']:.1f}%\n\n"
            "💻 **系统资源:**\n"
            f"• 内存使用:{memory_mb:.1f} MB\n"
            f"• CPU核心数{psutil.cpu_count()}\n"
        )
        await update.message.reply_text(text, parse_mode='Markdown')
        self.logger.info("[管理员] 查看统计信息")

    async def handle_users(self, update, context):
        """Handle /users: list the most active users found in the log."""
        if not self._is_admin(update):
            return

        users = self._get_users_from_logs()
        if not users:
            await update.message.reply_text("暂无用户访问记录")
            return

        parts = [f"👥 **访问用户列表** ({len(users)}人)\n\n"]
        parts.extend(
            f"{i}. 用户 `{user_id}` - {count}次交互\n"
            for i, (user_id, count) in enumerate(users[:self.MAX_LISTED_USERS], 1)
        )
        if len(users) > self.MAX_LISTED_USERS:
            parts.append(f"\n... 还有 {len(users) - self.MAX_LISTED_USERS} 个用户")
        parts.append("\n\n💡 使用 /userinfo <用户ID> 查看详情")

        await update.message.reply_text(''.join(parts), parse_mode='Markdown')
        self.logger.info("[管理员] 查看用户列表")

    async def handle_userinfo(self, update, context):
        """Handle /userinfo <id>: show log-derived details for one user."""
        if not self._is_admin(update):
            return

        if not context.args:
            await update.message.reply_text(
                "用法:/userinfo <用户ID>\n"
                "示例:/userinfo 123456789"
            )
            return

        try:
            target_user_id = int(context.args[0])
        except ValueError:
            await update.message.reply_text("❌ 用户ID格式错误请输入数字")
            return

        try:
            info = self._get_user_info(target_user_id)
            if not info:
                await update.message.reply_text(f"未找到用户 {target_user_id} 的记录")
                return

            text = (
                f"👤 **用户信息** (ID: `{target_user_id}`)\n\n"
                f"📊 交互次数:{info['interactions']}\n"
                f"🔍 搜索次数:{info['searches']}\n"
                f"💬 AI对话次数{info['ai_chats']}\n"
                f"⏰ 首次访问:{info['first_seen']}\n"
                f"🕐 最后活跃:{info['last_seen']}\n"
            )
            await update.message.reply_text(text, parse_mode='Markdown')
        except Exception as e:
            # Handler boundary: report the failure to the admin instead of raising.
            await update.message.reply_text(f"❌ 查询失败:{str(e)}")
            self.logger.error(f"[管理员] 查询用户信息失败: {e}")

    async def handle_logs(self, update, context):
        """Handle /logs [n]: reply with the last *n* log lines (default 50)."""
        if not self._is_admin(update):
            return

        requested = context.args[0] if context.args else "50"
        try:
            # Zero or negative counts would turn the tail into an arbitrary slice.
            num_lines = max(1, int(requested))

            if not os.path.exists(self.LOG_FILE):
                await update.message.reply_text("❌ 日志文件不存在")
                return

            # A bounded deque keeps only the tail instead of the whole file.
            with open(self.LOG_FILE, 'r', encoding='utf-8', errors='replace') as f:
                recent_lines = deque(f, maxlen=num_lines)

            log_text = ''.join(recent_lines)
            if len(log_text) > self.MAX_LOG_CHARS:
                log_text = log_text[-self.MAX_LOG_CHARS:]
            # A literal ``` inside the log would close the code block early.
            log_text = log_text.replace("```", "'''")

            await update.message.reply_text(
                f"📋 **最近 {num_lines} 行日志:**\n\n```\n{log_text}\n```",
                parse_mode='Markdown'
            )
        except Exception as e:
            # Handler boundary: report the failure to the admin instead of raising.
            await update.message.reply_text(f"❌ 读取日志失败:{str(e)}")
            self.logger.error(f"[管理员] 读取日志失败: {e}")

    async def handle_cache(self, update, context):
        """Handle /cache: reply with detailed cache statistics and top searches."""
        if not self._is_admin(update):
            return

        stats = self._get_cache_details()
        parts = [
            "💾 **缓存详细信息**\n\n"
            f"📦 总记录数:{stats['total']}\n"
            f"✅ 有效记录:{stats['valid']}\n"
            f"❌ 过期记录:{stats['expired']}\n"
            f"📈 命中率:{stats['hit_rate']:.1f}%\n"
            f"💿 数据库大小:{stats['db_size_mb']:.2f} MB\n\n"
            "🔝 **热门搜索:**\n"
        ]
        parts.extend(
            f"{item['command']} {item['keyword']} ({item['count']}次)\n"
            for item in stats['top_searches'][:10]
        )
        await update.message.reply_text(''.join(parts), parse_mode='Markdown')

    def _count_user_mentions(self):
        """Return a Counter mapping user-ID strings to their log mention counts.

        A missing or unreadable log file yields an empty Counter.
        """
        counts = Counter()
        try:
            # Assumes the log is UTF-8; undecodable bytes are replaced, not fatal.
            with open(self.LOG_FILE, 'r', encoding='utf-8', errors='replace') as f:
                for line in f:
                    counts.update(self._USER_ID_RE.findall(line))
        except FileNotFoundError:
            return Counter()
        except OSError as e:
            self.logger.warning(f"[管理员] 读取日志文件失败: {e}")
            return Counter()
        return counts

    def _get_unique_users_from_logs(self):
        """Return the number of distinct user IDs mentioned in the log."""
        return len(self._count_user_mentions())

    def _get_users_from_logs(self):
        """Return ``(user_id, count)`` pairs sorted by count, highest first."""
        return self._count_user_mentions().most_common()

    def _get_user_info(self, user_id):
        """Collect log-derived activity details for one user.

        Args:
            user_id: Numeric Telegram user ID to look up.

        Returns:
            A dict with ``interactions``, ``searches``, ``ai_chats``,
            ``first_seen`` and ``last_seen``, or ``None`` when the user never
            appears in the log or the log cannot be read.
        """
        # The lookahead stops ID 123 from also matching lines about 1234.
        mention_re = re.compile(rf'用户 {re.escape(str(user_id))}(?!\d)')
        info = {
            'interactions': 0,
            'searches': 0,
            'ai_chats': 0,
            'first_seen': None,
            'last_seen': None,
        }
        try:
            with open(self.LOG_FILE, 'r', encoding='utf-8', errors='replace') as f:
                for line in f:
                    if not mention_re.search(line):
                        continue
                    info['interactions'] += 1

                    time_match = self._TIMESTAMP_RE.search(line)
                    if time_match:
                        timestamp = time_match.group(1)
                        if not info['first_seen']:
                            info['first_seen'] = timestamp
                        info['last_seen'] = timestamp

                    if '镜像' in line or '搜索' in line:
                        info['searches'] += 1
                    if 'AI对话' in line or 'Claude API' in line:
                        info['ai_chats'] += 1
        except FileNotFoundError:
            return None
        except OSError as e:
            self.logger.warning(f"[管理员] 读取日志文件失败: {e}")
            return None
        return info if info['interactions'] > 0 else None

    @staticmethod
    def _query_cache_totals(conn):
        """Return ``(row_count, summed_access_count)`` for ``search_cache``."""
        total = conn.execute("SELECT COUNT(*) FROM search_cache").fetchone()[0]
        # SUM() is NULL on an empty table.
        total_accesses = conn.execute(
            "SELECT SUM(access_count) FROM search_cache"
        ).fetchone()[0] or 0
        return total, total_accesses

    def _get_cache_stats(self):
        """Return ``{'total', 'hit_rate'}`` for the cache; zeros on any failure."""
        empty = {'total': 0, 'hit_rate': 0}
        # sqlite3.connect() would create the file, so check for it first.
        if not os.path.exists(self.CACHE_DB_PATH):
            return empty
        try:
            with closing(sqlite3.connect(self.CACHE_DB_PATH)) as conn:
                total, total_accesses = self._query_cache_totals(conn)
        except (sqlite3.Error, OSError) as e:
            self.logger.error(f"[管理员] 读取缓存数据库失败: {e}")
            return empty
        return {
            'total': total,
            'hit_rate': self._hit_rate(total, total_accesses),
        }

    def _get_cache_details(self):
        """Return detailed cache statistics; zeros and an empty list on failure.

        Returns:
            A dict with ``total``, ``valid``, ``expired``, ``hit_rate``,
            ``db_size_mb`` and ``top_searches`` (up to 10 dicts with
            ``command``, ``keyword`` and ``count``).
        """
        empty = {
            'total': 0,
            'valid': 0,
            'expired': 0,
            'hit_rate': 0,
            'db_size_mb': 0,
            'top_searches': [],
        }
        # sqlite3.connect() would create the file, so check for it first.
        if not os.path.exists(self.CACHE_DB_PATH):
            return empty
        try:
            with closing(sqlite3.connect(self.CACHE_DB_PATH)) as conn:
                total, total_accesses = self._query_cache_totals(conn)

                # Rows with no expiry, or an expiry still in the future.
                valid = conn.execute(
                    "SELECT COUNT(*) FROM search_cache WHERE expires_at IS NULL OR expires_at > ?",
                    (datetime.now().isoformat(),),
                ).fetchone()[0]

                rows = conn.execute("""
                    SELECT command, keyword, access_count
                    FROM search_cache
                    ORDER BY access_count DESC
                    LIMIT 10
                """).fetchall()

            db_size_mb = os.path.getsize(self.CACHE_DB_PATH) / 1024 / 1024
        except (sqlite3.Error, OSError) as e:
            self.logger.error(f"[管理员] 读取缓存数据库失败: {e}")
            return empty

        return {
            'total': total,
            'valid': valid,
            'expired': total - valid,
            'hit_rate': self._hit_rate(total, total_accesses),
            'db_size_mb': db_size_mb,
            'top_searches': [
                {'command': command, 'keyword': keyword, 'count': count}
                for command, keyword, count in rows
            ],
        }