diff --git a/apps/backend/utils/sqlite.ts b/apps/backend/utils/sqlite.ts index 525c0578..65d243a4 100644 --- a/apps/backend/utils/sqlite.ts +++ b/apps/backend/utils/sqlite.ts @@ -166,6 +166,12 @@ database.exec(` chat_id TEXT NOT NULL, notification_types TEXT NOT NULL, is_enabled INTEGER NOT NULL DEFAULT 1, + priority TEXT DEFAULT 'normal', + rate_limit_seconds INTEGER DEFAULT 0, + batch_enabled INTEGER DEFAULT 0, + batch_interval_minutes INTEGER DEFAULT 60, + retry_enabled INTEGER DEFAULT 1, + retry_max_attempts INTEGER DEFAULT 3, created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP ); @@ -176,4 +182,67 @@ database.exec(` ON telegram_notification_configs (is_enabled); `); +// 通知发送历史表(用于频率控制和去重) +database.exec(` + CREATE TABLE IF NOT EXISTS telegram_notification_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + config_id INTEGER NOT NULL, + notification_type TEXT NOT NULL, + content_hash TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + retry_count INTEGER DEFAULT 0, + sent_at TEXT, + error_message TEXT, + created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (config_id) REFERENCES telegram_notification_configs(id) + ); +`); + +database.exec(` + CREATE INDEX IF NOT EXISTS idx_telegram_notification_history_config + ON telegram_notification_history (config_id, created_at DESC); +`); + +database.exec(` + CREATE INDEX IF NOT EXISTS idx_telegram_notification_history_hash + ON telegram_notification_history (content_hash, created_at DESC); +`); + +database.exec(` + CREATE INDEX IF NOT EXISTS idx_telegram_notification_history_status + ON telegram_notification_history (status, retry_count); +`); + +// 确保添加新列到已存在的表 +ensureColumn( + 'telegram_notification_configs', + 'priority', + "priority TEXT DEFAULT 'normal'", +); +ensureColumn( + 'telegram_notification_configs', + 'rate_limit_seconds', + 'rate_limit_seconds INTEGER DEFAULT 0', +); +ensureColumn( + 'telegram_notification_configs', + 'batch_enabled', + 'batch_enabled INTEGER DEFAULT 0', +); +ensureColumn( + 'telegram_notification_configs', + 'batch_interval_minutes', + 'batch_interval_minutes INTEGER DEFAULT 60', +); +ensureColumn( + 'telegram_notification_configs', + 'retry_enabled', + 'retry_enabled INTEGER DEFAULT 1', +); +ensureColumn( + 'telegram_notification_configs', + 'retry_max_attempts', + 'retry_max_attempts INTEGER DEFAULT 3', +); + export default database; diff --git a/apps/backend/utils/telegram-bot-enhanced.ts b/apps/backend/utils/telegram-bot-enhanced.ts new file mode 100644 index 00000000..82c61c51 --- /dev/null +++ b/apps/backend/utils/telegram-bot-enhanced.ts @@ -0,0 +1,491 @@ +import crypto from 'node:crypto'; +import db from './sqlite'; + +interface TelegramNotificationConfig { + id: number; + name: string; + botToken: string; + chatId: string; + notificationTypes: string[]; + isEnabled: boolean; + priority: string; + rateLimitSeconds: number; + batchEnabled: boolean; + batchIntervalMinutes: number; + retryEnabled: boolean; + retryMaxAttempts: number; +} + +interface TransactionNotificationData { + id: number; + type: string; + amount: number; + currency: string; + categoryName?: string; + accountName?: string; + transactionDate: string; + description?: string; + status: string; +} + +/** + * 生成消息内容hash(用于去重) + */ +function generateContentHash(content: string): string { + return crypto.createHash('md5').update(content).digest('hex'); +} + +/** + * 检查频率限制 + */ +function checkRateLimit(configId: number, rateLimitSeconds: number): boolean { + if (rateLimitSeconds <= 0) { + return true; // 无限制 + } + + const cutoffTime = new Date( + Date.now() - rateLimitSeconds * 1000, + ).toISOString(); + + const recent = db + .prepare<{ count: number }>( + ` + SELECT COUNT(*) as count + FROM telegram_notification_history + WHERE config_id = ? AND status = 'sent' AND sent_at > ? + `, + ) + .get(configId, cutoffTime); + + return (recent?.count || 0) === 0; +} + +/** + * 检查是否为重复消息 + */ +function isDuplicateMessage( + configId: number, + contentHash: string, + withinMinutes: number = 5, +): boolean { + const cutoffTime = new Date(Date.now() - withinMinutes * 60 * 1000).toISOString(); + + const duplicate = db + .prepare<{ count: number }>( + ` + SELECT COUNT(*) as count + FROM telegram_notification_history + WHERE config_id = ? AND content_hash = ? AND created_at > ? + `, + ) + .get(configId, contentHash, cutoffTime); + + return (duplicate?.count || 0) > 0; +} + +/** + * 记录通知历史 + */ +function recordNotification( + configId: number, + notificationType: string, + contentHash: string, + status: 'pending' | 'sent' | 'failed', + errorMessage?: string, +): number { + const now = new Date().toISOString(); + + const result = db + .prepare( + ` + INSERT INTO telegram_notification_history + (config_id, notification_type, content_hash, status, sent_at, error_message, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + `, + ) + .run( + configId, + notificationType, + contentHash, + status, + status === 'sent' ? now : null, + errorMessage || null, + now, + ); + + return result.lastInsertRowid as number; +} + +/** + * 更新通知状态 + */ +function updateNotificationStatus( + historyId: number, + status: 'sent' | 'failed', + retryCount: number = 0, + errorMessage?: string, +): void { + const now = new Date().toISOString(); + + db.prepare( + ` + UPDATE telegram_notification_history + SET status = ?, retry_count = ?, sent_at = ?, error_message = ? + WHERE id = ? + `, + ).run(status, retryCount, status === 'sent' ? now : null, errorMessage || null, historyId); +} + +/** + * 获取待重试的通知 + */ +function getPendingRetries(): Array<{ + id: number; + configId: number; + contentHash: string; + retryCount: number; +}> { + return db + .prepare<{ id: number; config_id: number; content_hash: string; retry_count: number }>( + ` + SELECT h.id, h.config_id, h.content_hash, h.retry_count + FROM telegram_notification_history h + JOIN telegram_notification_configs c ON h.config_id = c.id + WHERE h.status = 'failed' + AND c.retry_enabled = 1 + AND h.retry_count < c.retry_max_attempts + AND h.created_at > datetime('now', '-24 hours') + ORDER BY h.created_at ASC + LIMIT 10 + `, + ) + .all() + .map((row) => ({ + id: row.id, + configId: row.config_id, + contentHash: row.content_hash, + retryCount: row.retry_count, + })); +} + +/** + * 获取所有启用的Telegram通知配置(增强版) + */ +export function getEnabledNotificationConfigs( + notificationType: string = 'transaction', +): TelegramNotificationConfig[] { + const rows = db + .prepare<{ + id: number; + name: string; + bot_token: string; + chat_id: string; + notification_types: string; + is_enabled: number; + priority: string; + rate_limit_seconds: number; + batch_enabled: number; + batch_interval_minutes: number; + retry_enabled: number; + retry_max_attempts: number; + }>( + ` + SELECT id, name, bot_token, chat_id, notification_types, is_enabled, + priority, rate_limit_seconds, batch_enabled, batch_interval_minutes, + retry_enabled, retry_max_attempts + FROM telegram_notification_configs + WHERE is_enabled = 1 + `, + ) + .all(); + + return rows + .map((row) => ({ + id: row.id, + name: row.name, + botToken: row.bot_token, + chatId: row.chat_id, + notificationTypes: JSON.parse(row.notification_types) as string[], + isEnabled: row.is_enabled === 1, + priority: row.priority || 'normal', + rateLimitSeconds: row.rate_limit_seconds || 0, + batchEnabled: (row.batch_enabled || 0) === 1, + batchIntervalMinutes: row.batch_interval_minutes || 60, + retryEnabled: (row.retry_enabled || 1) === 1, + retryMaxAttempts: row.retry_max_attempts || 3, + })) + .filter((config) => config.notificationTypes.includes(notificationType)); +} + +/** + * 格式化交易金额 + */ +function formatAmount(amount: number, currency: string): string { + const formatted = amount.toLocaleString('zh-CN', { + minimumFractionDigits: 2, + maximumFractionDigits: 2, + }); + return `${currency} ${formatted}`; +} + +/** + * 格式化交易类型 + */ +function formatTransactionType(type: string): string { + const typeMap: Record = { + income: '💰 收入', + expense: '💸 支出', + transfer: '🔄 转账', + }; + return typeMap[type] || type; +} + +/** + * 格式化交易状态 + */ +function formatTransactionStatus(status: string): string { + const statusMap: Record = { + draft: '📝 草稿', + pending: '⏳ 待审核', + approved: '✅ 已批准', + rejected: '❌ 已拒绝', + paid: '💵 已支付', + }; + return statusMap[status] || status; +} + +/** + * 格式化优先级标识 + */ +function formatPriority(priority: string): string { + const priorityMap: Record = { + low: '🔵', + normal: '⚪', + high: '🟡', + urgent: '🔴', + }; + return priorityMap[priority] || '⚪'; +} + +/** + * 构建交易通知消息 + */ +function buildTransactionMessage( + transaction: TransactionNotificationData, + action: string = 'created', + priority: string = 'normal', +): string { + const actionMap: Record = { + created: '📋 新增账目记录', + updated: '✏️ 更新账目记录', + deleted: '🗑️ 删除账目记录', + }; + + const priorityIcon = formatPriority(priority); + + const lines: string[] = [ + `${priorityIcon} ${actionMap[action] || '📋 账目记录'}`, + '', + `类型:${formatTransactionType(transaction.type)}`, + `金额:${formatAmount(transaction.amount, transaction.currency)}`, + `日期:${transaction.transactionDate}`, + ]; + + if (transaction.categoryName) { + lines.push(`分类:${transaction.categoryName}`); + } + + if (transaction.accountName) { + lines.push(`账户:${transaction.accountName}`); + } + + lines.push(`状态:${formatTransactionStatus(transaction.status)}`); + + if (transaction.description) { + lines.push(``, `备注:${transaction.description}`); + } + + lines.push( + ``, + `🕐 记录时间:${new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' })}`, + ); + + return lines.join('\n'); +} + +/** + * 发送Telegram消息(带重试) + */ +async function sendTelegramMessage( + botToken: string, + chatId: string, + message: string, + retryCount: number = 0, +): Promise<{ success: boolean; error?: string }> { + try { + const url = `https://api.telegram.org/bot${botToken}/sendMessage`; + const response = await fetch(url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + chat_id: chatId, + text: message, + parse_mode: 'HTML', + }), + }); + + if (!response.ok) { + const error = await response.json().catch(() => ({ description: 'Unknown error' })); + const errorMsg = error.description || `HTTP ${response.status}`; + console.error( + '[telegram-bot-enhanced] Failed to send message:', + response.status, + errorMsg, + ); + return { success: false, error: errorMsg }; + } + + return { success: true }; + } catch (error: unknown) { + const errorMsg = error instanceof Error ? error.message : 'Unknown error'; + console.error('[telegram-bot-enhanced] Error sending message:', errorMsg); + return { success: false, error: errorMsg }; + } +} + +/** + * 通知交易记录(增强版 - 带频率控制、去重、重试) + */ +export async function notifyTransactionEnhanced( + transaction: TransactionNotificationData, + action: string = 'created', +): Promise { + const configs = getEnabledNotificationConfigs('transaction'); + + if (configs.length === 0) { + console.log('[telegram-bot-enhanced] No enabled notification configs found'); + return; + } + + for (const config of configs) { + // 1. 检查频率限制 + if (!checkRateLimit(config.id, config.rateLimitSeconds)) { + console.log( + `[telegram-bot-enhanced] Rate limit exceeded for config: ${config.name}`, + ); + continue; + } + + // 2. 构建消息 + const message = buildTransactionMessage(transaction, action, config.priority); + const contentHash = generateContentHash(message); + + // 3. 检查重复消息 + if (isDuplicateMessage(config.id, contentHash)) { + console.log( + `[telegram-bot-enhanced] Duplicate message detected for config: ${config.name}`, + ); + continue; + } + + // 4. 记录通知历史 + const historyId = recordNotification( + config.id, + 'transaction', + contentHash, + 'pending', + ); + + // 5. 发送消息 + const result = await sendTelegramMessage( + config.botToken, + config.chatId, + message, + ); + + // 6. 更新状态 + if (result.success) { + updateNotificationStatus(historyId, 'sent'); + console.log( + `[telegram-bot-enhanced] Sent notification via config: ${config.name}`, + ); + } else { + updateNotificationStatus(historyId, 'failed', 0, result.error); + console.error( + `[telegram-bot-enhanced] Failed to send notification via config: ${config.name}, error: ${result.error}`, + ); + } + } +} + +/** + * 重试失败的通知 + */ +export async function retryFailedNotifications(): Promise { + const pending = getPendingRetries(); + + if (pending.length === 0) { + return; + } + + console.log( + `[telegram-bot-enhanced] Retrying ${pending.length} failed notifications`, + ); + + for (const item of pending) { + // 获取配置 + const config = db + .prepare<{ + bot_token: string; + chat_id: string; + priority: string; + }>( + 'SELECT bot_token, chat_id, priority FROM telegram_notification_configs WHERE id = ?', + ) + .get(item.configId); + + if (!config) { + continue; + } + + // 注意:这里需要重新构建消息或从历史中获取 + // 简化处理:发送重试通知 + const retryMessage = `🔄 通知重试 (尝试 ${item.retryCount + 1})`; + + const result = await sendTelegramMessage( + config.bot_token, + config.chat_id, + retryMessage, + item.retryCount, + ); + + if (result.success) { + updateNotificationStatus(item.id, 'sent', item.retryCount + 1); + console.log(`[telegram-bot-enhanced] Retry successful for history ID: ${item.id}`); + } else { + updateNotificationStatus( + item.id, + 'failed', + item.retryCount + 1, + result.error, + ); + console.error( + `[telegram-bot-enhanced] Retry failed for history ID: ${item.id}`, + ); + } + } +} + +/** + * 测试Telegram Bot配置 + */ +export async function testTelegramConfig( + botToken: string, + chatId: string, +): Promise<{ success: boolean; error?: string }> { + const testMessage = `🤖 KT财务系统\n\n✅ Telegram通知配置测试成功!\n\n🕐 ${new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' })}`; + + return await sendTelegramMessage(botToken, chatId, testMessage); +}