348 lines
12 KiB
Python
348 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
管理员命令模块
|
||
为integrated_bot_ai.py添加管理员后台功能
|
||
"""
|
||
import os
import re
import sqlite3
from collections import Counter, deque
from contextlib import closing
from datetime import datetime, timedelta
from typing import Dict, List

import psutil
|
||
|
||
# User ID that the admin command handlers compare against
# `update.effective_user.id`; updates from any other user are ignored.
ADMIN_ID = 7363537082
|
||
|
||
class AdminCommands:
    """Admin-only command handlers (/stats, /users, /userinfo, /logs, /cache).

    User figures are derived by scanning the bot's detailed log file for
    "用户 <id>" entries; cache figures come from the ``search_cache`` table of
    a SQLite database.  Both locations are class attributes so a subclass or
    an individual instance can point them somewhere else.
    """

    # Log file scanned for user activity and served by /logs.
    LOG_FILE = "./logs/integrated_bot_detailed.log"
    # SQLite database that holds the `search_cache` table.
    CACHE_DB_PATH = "/home/atai/bot_data/cache.db"
    # /logs output is trimmed to this many characters so that, together with
    # the header, it stays below Telegram's 4096-character message limit.
    MAX_LOG_CHARS = 4000
    # Number of users listed by /users before the rest is summarised.
    MAX_LISTED_USERS = 20

    _USER_ID_RE = re.compile(r'用户 (\d+)')
    _TIMESTAMP_RE = re.compile(r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})')
    # Characters that must be backslash-escaped in Telegram legacy Markdown.
    _MARKDOWN_SPECIALS_RE = re.compile(r'([_*`\[])')

    def __init__(self, bot_instance, logger, admin_id=None):
        """Store collaborators and initialise the in-memory counters.

        Args:
            bot_instance: The bot object these commands belong to.
            logger: Logger used for admin audit and error messages.
            admin_id: User ID allowed to run the commands.  Defaults to the
                module-level ``ADMIN_ID`` when omitted.
        """
        self.bot = bot_instance
        self.logger = logger
        self.admin_id = ADMIN_ID if admin_id is None else admin_id
        self.stats = {
            "start_time": datetime.now(),
            "messages_received": 0,
            "searches_performed": 0,
            "ai_queries": 0,
            "cache_hits": 0,
        }

    # ------------------------------------------------------------------
    # Command handlers
    # ------------------------------------------------------------------

    async def handle_stats(self, update, context):
        """Handle /stats: reply with uptime, user, cache and resource figures."""
        if not self._is_admin(update):
            return

        started = self.stats["start_time"]
        uptime = datetime.now() - started
        hours, remainder = divmod(uptime.seconds, 3600)
        minutes = remainder // 60

        # Resident memory of the current process, in MiB.
        memory_mb = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024

        user_count = self._get_unique_users_from_logs()
        cache_stats = self._get_cache_stats()

        text = (
            "📊 **机器人统计信息**\n\n"
            f"⏱ **运行时间:** {uptime.days}天 {hours}小时 {minutes}分钟\n"
            f"📅 **启动时间:** {started.strftime('%Y-%m-%d %H:%M:%S')}\n\n"

            "👥 **用户统计:**\n"
            f"• 总访问用户:{user_count}人\n"
            f"• AI对话次数:{self.stats['ai_queries']}\n"
            f"• 搜索执行次数:{self.stats['searches_performed']}\n\n"

            "💾 **缓存统计:**\n"
            f"• 缓存记录数:{cache_stats['total']}\n"
            f"• 缓存命中率:{cache_stats['hit_rate']:.1f}%\n\n"

            "💻 **系统资源:**\n"
            f"• 内存使用:{memory_mb:.1f} MB\n"
            f"• CPU核心数:{psutil.cpu_count()}\n"
        )

        await update.message.reply_text(text, parse_mode='Markdown')
        self.logger.info("[管理员] 查看统计信息")

    async def handle_users(self, update, context):
        """Handle /users: list the most active users found in the log file."""
        if not self._is_admin(update):
            return

        users = self._get_users_from_logs()
        if not users:
            await update.message.reply_text("暂无用户访问记录")
            return

        limit = self.MAX_LISTED_USERS
        parts = [f"👥 **访问用户列表** ({len(users)}人)\n\n"]
        parts.extend(
            f"{rank}. 用户 `{user_id}` - {count}次交互\n"
            for rank, (user_id, count) in enumerate(users[:limit], 1)
        )

        hidden = len(users) - limit
        if hidden > 0:
            parts.append(f"\n... 还有 {hidden} 个用户")

        parts.append("\n\n💡 使用 /userinfo <用户ID> 查看详情")

        await update.message.reply_text("".join(parts), parse_mode='Markdown')
        self.logger.info("[管理员] 查看用户列表")

    async def handle_userinfo(self, update, context):
        """Handle /userinfo <id>: reply with the activity summary of one user."""
        if not self._is_admin(update):
            return

        if not context.args:
            await update.message.reply_text(
                "用法:/userinfo <用户ID>\n"
                "示例:/userinfo 123456789"
            )
            return

        # Only the argument parsing can legitimately raise ValueError, so it
        # gets its own narrow try block.
        try:
            target_user_id = int(context.args[0])
        except ValueError:
            await update.message.reply_text("❌ 用户ID格式错误,请输入数字")
            return

        try:
            info = self._get_user_info(target_user_id)
            if not info:
                await update.message.reply_text(
                    f"未找到用户 {target_user_id} 的记录"
                )
                return

            text = (
                f"👤 **用户信息** (ID: `{target_user_id}`)\n\n"
                f"📊 交互次数:{info['interactions']}\n"
                f"🔍 搜索次数:{info['searches']}\n"
                f"💬 AI对话次数:{info['ai_chats']}\n"
                f"⏰ 首次访问:{info['first_seen']}\n"
                f"🕐 最后活跃:{info['last_seen']}\n"
            )
            await update.message.reply_text(text, parse_mode='Markdown')
        except Exception as e:
            # Handler boundary: report the failure to the admin and log it.
            await update.message.reply_text(f"❌ 查询失败:{str(e)}")
            self.logger.error(f"[管理员] 查询用户信息失败: {e}")

    async def handle_logs(self, update, context):
        """Handle /logs [n]: reply with the last ``n`` log lines (default 50)."""
        if not self._is_admin(update):
            return

        requested = context.args[0] if context.args else "50"
        try:
            # A count of 0 or below would slice the wrong end of the file, so
            # at least one line is always requested.
            num_lines = max(1, int(requested))

            if not os.path.exists(self.LOG_FILE):
                await update.message.reply_text("❌ 日志文件不存在")
                return

            # A bounded deque keeps only the tail in memory instead of the
            # whole file.
            with open(self.LOG_FILE, 'r', encoding='utf-8',
                      errors='replace') as log:
                recent_lines = deque(log, maxlen=num_lines)

            # A ``` sequence inside the text would close the code block early
            # and make Telegram reject the message.
            log_text = ''.join(recent_lines).replace("```", "'''")
            if len(log_text) > self.MAX_LOG_CHARS:
                log_text = log_text[-self.MAX_LOG_CHARS:]

            await update.message.reply_text(
                f"📋 **最近 {num_lines} 行日志:**\n\n```\n{log_text}\n```",
                parse_mode='Markdown'
            )
        except Exception as e:
            # Handler boundary: covers a non-numeric argument as well as
            # read/send failures.
            await update.message.reply_text(f"❌ 读取日志失败:{str(e)}")
            self.logger.error(f"[管理员] 读取日志失败: {e}")

    async def handle_cache(self, update, context):
        """Handle /cache: reply with cache totals and the top searches."""
        if not self._is_admin(update):
            return

        stats = self._get_cache_details()

        parts = [
            "💾 **缓存详细信息**\n\n"
            f"📦 总记录数:{stats['total']}\n"
            f"✅ 有效记录:{stats['valid']}\n"
            f"❌ 过期记录:{stats['expired']}\n"
            f"📈 命中率:{stats['hit_rate']:.1f}%\n"
            f"💿 数据库大小:{stats['db_size_mb']:.2f} MB\n\n"
            "🔝 **热门搜索:**\n"
        ]
        for item in stats['top_searches'][:10]:
            # Commands and keywords are free text; escape Markdown specials
            # so an unbalanced "_" or "*" cannot break message parsing.
            command = self._escape_markdown(item['command'])
            keyword = self._escape_markdown(item['keyword'])
            parts.append(f"• {command} {keyword} ({item['count']}次)\n")

        await update.message.reply_text("".join(parts), parse_mode='Markdown')

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _is_admin(self, update):
        """Return True when the update was sent by the configured admin."""
        user = update.effective_user
        return user is not None and user.id == self.admin_id

    @classmethod
    def _escape_markdown(cls, text):
        """Backslash-escape the special characters of legacy Markdown."""
        return cls._MARKDOWN_SPECIALS_RE.sub(r'\\\1', str(text))

    def _iter_log_lines(self):
        """Yield the lines of the log file; yield nothing if it is missing.

        Raises:
            OSError: If the file exists but cannot be read.
        """
        if not os.path.exists(self.LOG_FILE):
            return
        with open(self.LOG_FILE, 'r', encoding='utf-8',
                  errors='replace') as log:
            yield from log

    def _count_user_mentions(self):
        """Return a Counter mapping user-ID strings to log mentions."""
        counts = Counter()
        for line in self._iter_log_lines():
            counts.update(self._USER_ID_RE.findall(line))
        return counts

    def _get_unique_users_from_logs(self):
        """Return the number of distinct user IDs in the log (0 on failure)."""
        try:
            return len(self._count_user_mentions())
        except OSError as exc:
            self.logger.error("[管理员] 读取日志文件失败: %s", exc)
            return 0

    def _get_users_from_logs(self):
        """Return ``[(user_id, mentions), ...]`` sorted by mentions, descending.

        User IDs are strings as found in the log.  Returns an empty list when
        the log is missing or unreadable.
        """
        try:
            return self._count_user_mentions().most_common()
        except OSError as exc:
            self.logger.error("[管理员] 读取日志文件失败: %s", exc)
            return []

    def _get_user_info(self, user_id):
        """Summarise the log entries that mention ``user_id``.

        Returns:
            A dict with ``interactions``, ``searches``, ``ai_chats``,
            ``first_seen`` and ``last_seen``, or ``None`` when the user never
            appears in the log or the log cannot be read.
        """
        # The negative lookahead stops ID 123 from matching inside 1234.
        mention = re.compile(rf'用户 {re.escape(str(user_id))}(?!\d)')
        info = {
            'interactions': 0,
            'searches': 0,
            'ai_chats': 0,
            'first_seen': None,
            'last_seen': None,
        }

        try:
            for line in self._iter_log_lines():
                if not mention.search(line):
                    continue

                info['interactions'] += 1

                stamp = self._TIMESTAMP_RE.search(line)
                if stamp:
                    if not info['first_seen']:
                        info['first_seen'] = stamp.group(1)
                    info['last_seen'] = stamp.group(1)

                if '镜像' in line or '搜索' in line:
                    info['searches'] += 1
                if 'AI对话' in line or 'Claude API' in line:
                    info['ai_chats'] += 1
        except OSError as exc:
            self.logger.error("[管理员] 读取日志文件失败: %s", exc)
            return None

        return info if info['interactions'] > 0 else None

    @staticmethod
    def _cache_totals(conn):
        """Return ``(row_count, summed_access_count)`` for ``search_cache``."""
        total, accesses = conn.execute(
            "SELECT COUNT(*), SUM(access_count) FROM search_cache"
        ).fetchone()
        # SUM() yields NULL on an empty table.
        return total, accesses or 0

    @staticmethod
    def _hit_rate(total, total_accesses):
        """Return accesses / (accesses + rows) as a percentage, 0 if no rows."""
        if total <= 0:
            return 0
        return total_accesses / (total_accesses + total) * 100

    def _get_cache_stats(self):
        """Return ``{'total', 'hit_rate'}`` for the cache (zeros on failure)."""
        empty = {'total': 0, 'hit_rate': 0}
        # sqlite3.connect() would create a missing file, so check first.
        if not os.path.exists(self.CACHE_DB_PATH):
            return empty

        try:
            # closing() guarantees the connection is released even when a
            # query raises.
            with closing(sqlite3.connect(self.CACHE_DB_PATH)) as conn:
                total, total_accesses = self._cache_totals(conn)
        except sqlite3.Error as exc:
            self.logger.error("[管理员] 查询缓存数据库失败: %s", exc)
            return empty

        return {
            'total': total,
            'hit_rate': self._hit_rate(total, total_accesses),
        }

    def _get_cache_details(self):
        """Return detailed cache figures.

        Returns:
            A dict with ``total``, ``valid``, ``expired``, ``hit_rate``,
            ``db_size_mb`` and ``top_searches`` (up to 10 dicts with
            ``command``, ``keyword`` and ``count``).  All values are zero or
            empty when the database is missing or cannot be queried.
        """
        empty = {'total': 0, 'valid': 0, 'expired': 0, 'hit_rate': 0,
                 'db_size_mb': 0, 'top_searches': []}
        if not os.path.exists(self.CACHE_DB_PATH):
            return empty

        try:
            with closing(sqlite3.connect(self.CACHE_DB_PATH)) as conn:
                total, total_accesses = self._cache_totals(conn)

                # Rows without an expiry, or whose expiry is in the future.
                valid = conn.execute(
                    "SELECT COUNT(*) FROM search_cache "
                    "WHERE expires_at IS NULL OR expires_at > ?",
                    (datetime.now().isoformat(),),
                ).fetchone()[0]

                top_rows = conn.execute("""
                    SELECT command, keyword, access_count
                    FROM search_cache
                    ORDER BY access_count DESC
                    LIMIT 10
                """).fetchall()

            db_size_mb = os.path.getsize(self.CACHE_DB_PATH) / 1024 / 1024
        except (sqlite3.Error, OSError) as exc:
            self.logger.error("[管理员] 查询缓存数据库失败: %s", exc)
            return empty

        return {
            'total': total,
            'valid': valid,
            'expired': total - valid,
            'hit_rate': self._hit_rate(total, total_accesses),
            'db_size_mb': db_size_mb,
            'top_searches': [
                {'command': command, 'keyword': keyword, 'count': count}
                for command, keyword, count in top_rows
            ],
        }
|