feat: migrate backend storage to postgres
Some checks failed
Deploy to Production / Build and Test (push) Successful in 10m51s
Deploy to Production / Deploy to Server (push) Failing after 6m41s

This commit is contained in:
你的用户名
2025-11-06 22:01:50 +08:00
parent 3646405a47
commit b68511b2e2
28 changed files with 2641 additions and 1801 deletions

314
apps/backend/utils/db.ts Normal file
View File

@@ -0,0 +1,314 @@
import process from 'node:process';
import type { PoolClient } from 'pg';
import { Pool } from 'pg';
import {
MOCK_ACCOUNTS,
MOCK_CATEGORIES,
MOCK_CURRENCIES,
MOCK_EXCHANGE_RATES,
} from './mock-data';
const DEFAULT_HOST = process.env.POSTGRES_HOST ?? 'postgres';
const DEFAULT_PORT = Number.parseInt(process.env.POSTGRES_PORT ?? '5432', 10);
const DEFAULT_DB = process.env.POSTGRES_DB ?? 'kt_financial';
const DEFAULT_USER = process.env.POSTGRES_USER ?? 'kt_financial';
const DEFAULT_PASSWORD = process.env.POSTGRES_PASSWORD ?? 'kt_financial_pwd';
const connectionString =
process.env.POSTGRES_URL ??
`postgresql://${DEFAULT_USER}:${DEFAULT_PASSWORD}@${DEFAULT_HOST}:${DEFAULT_PORT}/${DEFAULT_DB}`;
const pool = new Pool({
connectionString,
max: 10,
});
let initPromise: null | Promise<void> = null;
async function seedCurrencies(client: PoolClient) {
await Promise.all(
MOCK_CURRENCIES.map((currency) =>
client.query(
`INSERT INTO finance_currencies (code, name, symbol, is_base, is_active)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (code) DO NOTHING`,
[
currency.code,
currency.name,
currency.symbol,
currency.isBase,
currency.isActive,
],
),
),
);
}
async function seedExchangeRates(client: PoolClient) {
await Promise.all(
MOCK_EXCHANGE_RATES.map((rate) =>
client.query(
`INSERT INTO finance_exchange_rates (from_currency, to_currency, rate, date, source)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT DO NOTHING`,
[
rate.fromCurrency,
rate.toCurrency,
rate.rate,
rate.date,
rate.source ?? 'manual',
],
),
),
);
}
async function seedAccounts(client: PoolClient) {
await Promise.all(
MOCK_ACCOUNTS.map((account) =>
client.query(
`INSERT INTO finance_accounts (id, name, currency, type, icon, color, user_id, is_active)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (id) DO NOTHING`,
[
account.id,
account.name,
account.currency,
account.type,
account.icon,
account.color,
account.userId ?? 1,
account.isActive,
],
),
),
);
}
async function seedCategories(client: PoolClient) {
await Promise.all(
MOCK_CATEGORIES.map((category) =>
client.query(
`INSERT INTO finance_categories (id, name, type, icon, color, user_id, is_active)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (id) DO NOTHING`,
[
category.id,
category.name,
category.type,
category.icon,
category.color,
category.userId,
category.isActive,
],
),
),
);
}
async function initializeSchema() {
const client = await pool.connect();
try {
await client.query('BEGIN');
await client.query(`
CREATE TABLE IF NOT EXISTS finance_currencies (
code TEXT PRIMARY KEY,
name TEXT NOT NULL,
symbol TEXT NOT NULL,
is_base BOOLEAN NOT NULL DEFAULT FALSE,
is_active BOOLEAN NOT NULL DEFAULT TRUE
);
`);
await client.query(`
CREATE TABLE IF NOT EXISTS finance_exchange_rates (
id SERIAL PRIMARY KEY,
from_currency TEXT NOT NULL REFERENCES finance_currencies(code),
to_currency TEXT NOT NULL REFERENCES finance_currencies(code),
rate NUMERIC NOT NULL,
date DATE NOT NULL,
source TEXT DEFAULT 'manual'
);
`);
await client.query(`
CREATE TABLE IF NOT EXISTS finance_accounts (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
currency TEXT NOT NULL REFERENCES finance_currencies(code),
type TEXT DEFAULT 'cash',
icon TEXT,
color TEXT,
user_id INTEGER DEFAULT 1,
is_active BOOLEAN DEFAULT TRUE
);
`);
await client.query(`
CREATE TABLE IF NOT EXISTS finance_categories (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
type TEXT NOT NULL,
icon TEXT,
color TEXT,
user_id INTEGER,
is_active BOOLEAN DEFAULT TRUE
);
`);
await client.query(`
CREATE TABLE IF NOT EXISTS finance_transactions (
id SERIAL PRIMARY KEY,
type TEXT NOT NULL,
amount NUMERIC NOT NULL,
currency TEXT NOT NULL REFERENCES finance_currencies(code),
exchange_rate_to_base NUMERIC NOT NULL,
amount_in_base NUMERIC NOT NULL,
category_id INTEGER REFERENCES finance_categories(id),
account_id INTEGER REFERENCES finance_accounts(id),
transaction_date DATE NOT NULL,
description TEXT,
project TEXT,
memo TEXT,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
status TEXT NOT NULL DEFAULT 'approved',
status_updated_at TIMESTAMP WITH TIME ZONE,
reimbursement_batch TEXT,
review_notes TEXT,
submitted_by TEXT,
approved_by TEXT,
approved_at TIMESTAMP WITH TIME ZONE,
is_deleted BOOLEAN NOT NULL DEFAULT FALSE,
deleted_at TIMESTAMP WITH TIME ZONE
);
`);
await client.query(`
CREATE TABLE IF NOT EXISTS finance_media_messages (
id SERIAL PRIMARY KEY,
chat_id BIGINT NOT NULL,
message_id BIGINT NOT NULL,
user_id INTEGER NOT NULL,
username TEXT,
display_name TEXT,
file_type TEXT NOT NULL,
file_id TEXT NOT NULL,
file_unique_id TEXT,
caption TEXT,
file_name TEXT,
file_path TEXT NOT NULL,
file_size INTEGER,
mime_type TEXT,
duration INTEGER,
width INTEGER,
height INTEGER,
forwarded_to INTEGER,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
UNIQUE(chat_id, message_id)
);
`);
await client.query(`
CREATE TABLE IF NOT EXISTS telegram_notification_configs (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
bot_token TEXT NOT NULL,
chat_id TEXT NOT NULL,
notification_types TEXT NOT NULL,
is_enabled BOOLEAN NOT NULL DEFAULT TRUE,
priority TEXT DEFAULT 'normal',
rate_limit_seconds INTEGER DEFAULT 0,
batch_enabled BOOLEAN DEFAULT FALSE,
batch_interval_minutes INTEGER DEFAULT 60,
retry_enabled BOOLEAN DEFAULT TRUE,
retry_max_attempts INTEGER DEFAULT 3,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
`);
await client.query(`
CREATE TABLE IF NOT EXISTS telegram_notification_history (
id SERIAL PRIMARY KEY,
config_id INTEGER NOT NULL REFERENCES telegram_notification_configs(id),
notification_type TEXT NOT NULL,
content_hash TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
retry_count INTEGER DEFAULT 0,
sent_at TIMESTAMP WITH TIME ZONE,
error_message TEXT,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
);
`);
await client.query(`
CREATE INDEX IF NOT EXISTS idx_finance_media_messages_created_at
ON finance_media_messages (created_at DESC);
`);
await client.query(`
CREATE INDEX IF NOT EXISTS idx_finance_media_messages_user_id
ON finance_media_messages (user_id);
`);
await client.query(`
CREATE INDEX IF NOT EXISTS idx_telegram_notification_configs_enabled
ON telegram_notification_configs (is_enabled);
`);
await client.query(`
CREATE INDEX IF NOT EXISTS idx_telegram_notification_history_config
ON telegram_notification_history (config_id, created_at DESC);
`);
await client.query(`
CREATE INDEX IF NOT EXISTS idx_telegram_notification_history_hash
ON telegram_notification_history (content_hash, created_at DESC);
`);
await client.query(`
CREATE INDEX IF NOT EXISTS idx_telegram_notification_history_status
ON telegram_notification_history (status, retry_count);
`);
await seedCurrencies(client);
await seedExchangeRates(client);
await seedAccounts(client);
await seedCategories(client);
await client.query('COMMIT');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
export async function getPool() {
if (!initPromise) {
initPromise = initializeSchema();
}
await initPromise;
return pool;
}
export async function query<T = any>(text: string, params?: any[]) {
const client = await getPool();
const result = await client.query<T>(text, params);
return result;
}
export async function withTransaction<T>(
handler: (client: PoolClient) => Promise<T>,
) {
const client = await pool.connect();
try {
await client.query('BEGIN');
const result = await handler(client);
await client.query('COMMIT');
return result;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}

View File

@@ -1,3 +1,4 @@
import { query } from './db';
import {
MOCK_ACCOUNTS,
MOCK_BUDGETS,
@@ -5,37 +6,87 @@ import {
MOCK_CURRENCIES,
MOCK_EXCHANGE_RATES,
} from './mock-data';
import db from './sqlite';
export function listAccounts() {
return MOCK_ACCOUNTS;
interface AccountRow {
id: number;
name: string;
type: string;
currency: string;
icon: null | string;
color: null | string;
user_id: null | number;
is_active: boolean;
}
export function listCategories() {
// 从数据库读取分类
interface CategoryRow {
id: number;
name: string;
type: string;
icon: null | string;
color: null | string;
user_id: null | number;
is_active: boolean;
}
function mapAccount(row: AccountRow) {
return {
id: row.id,
userId: row.user_id ?? 1,
name: row.name,
type: row.type,
currency: row.currency,
balance: 0,
icon: row.icon ?? '💳',
color: row.color ?? '#1677ff',
isActive: Boolean(row.is_active),
};
}
function mapCategory(row: CategoryRow) {
return {
id: row.id,
userId: row.user_id ?? 1,
name: row.name,
type: row.type as 'expense' | 'income',
icon: row.icon ?? '📝',
color: row.color ?? '#dfe4ea',
sortOrder: row.id,
isSystem: row.user_id === null,
isActive: Boolean(row.is_active),
};
}
export async function listAccounts() {
try {
const stmt = db.prepare(`
SELECT id, name, type, icon, color, user_id as userId, is_active as isActive
FROM finance_categories
WHERE is_active = 1
ORDER BY type, id
`);
const categories = stmt.all() as any[];
// 转换为前端需要的格式
return categories.map(cat => ({
id: cat.id,
userId: cat.userId,
name: cat.name,
type: cat.type,
icon: cat.icon,
color: cat.color,
sortOrder: cat.id,
isSystem: true,
isActive: Boolean(cat.isActive),
}));
const { rows } = await query<AccountRow>(
`SELECT id, name, type, currency, icon, color, user_id, is_active
FROM finance_accounts
ORDER BY id`,
);
if (rows.length === 0) {
return MOCK_ACCOUNTS;
}
return rows.map((row) => mapAccount(row));
} catch (error) {
console.error('从数据库读取分类失败使用MOCK数据:', error);
console.error('从数据库读取账户失败,使用 MOCK 数据:', error);
return MOCK_ACCOUNTS;
}
}
export async function listCategories() {
try {
const { rows } = await query<CategoryRow>(
`SELECT id, name, type, icon, color, user_id, is_active
FROM finance_categories
WHERE is_active = TRUE
ORDER BY type, id`,
);
if (rows.length === 0) {
return MOCK_CATEGORIES;
}
return rows.map((row) => mapCategory(row));
} catch (error) {
console.error('从数据库读取分类失败,使用 MOCK 数据:', error);
return MOCK_CATEGORIES;
}
}
@@ -52,76 +103,80 @@ export function listExchangeRates() {
return MOCK_EXCHANGE_RATES;
}
export function createCategoryRecord(category: any) {
export async function createCategoryRecord(category: any) {
try {
const stmt = db.prepare(`
INSERT INTO finance_categories (name, type, icon, color, user_id, is_active)
VALUES (?, ?, ?, ?, ?, 1)
`);
const result = stmt.run(
category.name,
category.type,
category.icon || '📝',
category.color || '#dfe4ea',
category.userId || 1
const { rows } = await query<CategoryRow>(
`INSERT INTO finance_categories (name, type, icon, color, user_id, is_active)
VALUES ($1, $2, $3, $4, $5, TRUE)
RETURNING id, name, type, icon, color, user_id, is_active`,
[
category.name,
category.type,
category.icon || '📝',
category.color || '#dfe4ea',
category.userId || 1,
],
);
return {
id: result.lastInsertRowid,
...category,
createdAt: new Date().toISOString(),
};
const row = rows[0];
return row
? {
...mapCategory(row),
createdAt: new Date().toISOString(),
}
: null;
} catch (error) {
console.error('创建分类失败:', error);
return null;
}
}
export function updateCategoryRecord(id: number, category: any) {
export async function updateCategoryRecord(id: number, category: any) {
try {
const updates: string[] = [];
const params: any[] = [];
if (category.name) {
updates.push('name = ?');
params.push(category.name);
updates.push(`name = $${params.length}`);
}
if (category.icon) {
updates.push('icon = ?');
params.push(category.icon);
updates.push(`icon = $${params.length}`);
}
if (category.color) {
updates.push('color = ?');
params.push(category.color);
updates.push(`color = $${params.length}`);
}
if (updates.length === 0) return null;
if (updates.length === 0) {
return null;
}
params.push(id);
const stmt = db.prepare(`
UPDATE finance_categories
SET ${updates.join(', ')}
WHERE id = ?
`);
stmt.run(...params);
// 返回更新后的分类
const selectStmt = db.prepare('SELECT * FROM finance_categories WHERE id = ?');
return selectStmt.get(id);
const setClause = updates.join(', ');
const { rows } = await query<CategoryRow>(
`UPDATE finance_categories
SET ${setClause}
WHERE id = $${params.length}
RETURNING id, name, type, icon, color, user_id, is_active`,
params,
);
const row = rows[0];
return row ? mapCategory(row) : null;
} catch (error) {
console.error('更新分类失败:', error);
return null;
}
}
export function deleteCategoryRecord(id: number) {
export async function deleteCategoryRecord(id: number) {
try {
// 软删除
const stmt = db.prepare(`
UPDATE finance_categories
SET is_active = 0
WHERE id = ?
`);
stmt.run(id);
await query(
`UPDATE finance_categories
SET is_active = FALSE
WHERE id = $1`,
[id],
);
return true;
} catch (error) {
console.error('删除分类失败:', error);

View File

@@ -1,14 +1,16 @@
import db from './sqlite';
import type { PoolClient } from 'pg';
import { query, withTransaction } from './db';
const BASE_CURRENCY = 'CNY';
interface TransactionRow {
id: number;
type: string;
amount: number;
amount: number | string;
currency: string;
exchange_rate_to_base: number;
amount_in_base: number;
exchange_rate_to_base: number | string;
amount_in_base: number | string;
category_id: null | number;
account_id: null | number;
transaction_date: string;
@@ -23,7 +25,7 @@ interface TransactionRow {
submitted_by: null | string;
approved_by: null | string;
approved_at: null | string;
is_deleted: number;
is_deleted: boolean;
deleted_at: null | string;
}
@@ -49,32 +51,24 @@ interface TransactionPayload {
}
export type TransactionStatus =
| 'draft'
| 'pending'
| 'approved'
| 'rejected'
| 'paid';
function getExchangeRateToBase(currency: string) {
if (currency === BASE_CURRENCY) {
return 1;
}
const stmt = db.prepare(
`SELECT rate FROM finance_exchange_rates WHERE from_currency = ? AND to_currency = ? ORDER BY date DESC LIMIT 1`,
);
const row = stmt.get(currency, BASE_CURRENCY) as undefined | { rate: number };
return row?.rate ?? 1;
}
| 'draft'
| 'paid'
| 'pending'
| 'rejected';
function mapTransaction(row: TransactionRow) {
const amount = Number(row.amount);
const exchangeRateToBase = Number(row.exchange_rate_to_base);
const amountInBase = Number(row.amount_in_base);
return {
id: row.id,
userId: 1,
type: 'expense' as const,
amount: Math.abs(row.amount),
type: row.type as 'expense' | 'income' | 'transfer',
amount: Math.abs(amount),
currency: row.currency,
exchangeRateToBase: row.exchange_rate_to_base,
amountInBase: Math.abs(row.amount_in_base),
exchangeRateToBase,
amountInBase: Math.abs(amountInBase),
categoryId: row.category_id ?? undefined,
accountId: row.account_id ?? undefined,
transactionDate: row.transaction_date,
@@ -94,231 +88,350 @@ function mapTransaction(row: TransactionRow) {
};
}
export function fetchTransactions(
async function getExchangeRateToBase(client: PoolClient, currency: string) {
if (currency === BASE_CURRENCY) {
return 1;
}
const result = await client.query<{ rate: number | string }>(
`SELECT rate
FROM finance_exchange_rates
WHERE from_currency = $1 AND to_currency = $2
ORDER BY date DESC
LIMIT 1`,
[currency, BASE_CURRENCY],
);
const raw = result.rows[0]?.rate;
return raw ? Number(raw) : 1;
}
export async function fetchTransactions(
options: {
includeDeleted?: boolean;
type?: string;
statuses?: TransactionStatus[];
type?: string;
} = {},
) {
const clauses: string[] = [];
const params: Record<string, unknown> = {};
const params: any[] = [];
if (!options.includeDeleted) {
clauses.push('is_deleted = 0');
clauses.push('is_deleted = FALSE');
}
if (options.type) {
clauses.push('type = @type');
params.type = options.type;
params.push(options.type);
clauses.push(`type = $${params.length}`);
}
if (options.statuses && options.statuses.length > 0) {
clauses.push(
`status IN (${options.statuses.map((_, index) => `@status${index}`).join(', ')})`,
);
options.statuses.forEach((status, index) => {
params[`status${index}`] = status;
const statusPlaceholders = options.statuses.map((status) => {
params.push(status);
return `$${params.length}`;
});
clauses.push(`status IN (${statusPlaceholders.join(', ')})`);
}
const where = clauses.length > 0 ? `WHERE ${clauses.join(' AND ')}` : '';
const stmt = db.prepare<TransactionRow>(
`SELECT id, type, amount, currency, exchange_rate_to_base, amount_in_base, category_id, account_id, transaction_date, description, project, memo, created_at, status, status_updated_at, reimbursement_batch, review_notes, submitted_by, approved_by, approved_at, is_deleted, deleted_at FROM finance_transactions ${where} ORDER BY transaction_date DESC, id DESC`,
const { rows } = await query<TransactionRow>(
`SELECT id,
type,
amount,
currency,
exchange_rate_to_base,
amount_in_base,
category_id,
account_id,
transaction_date,
description,
project,
memo,
created_at,
status,
status_updated_at,
reimbursement_batch,
review_notes,
submitted_by,
approved_by,
approved_at,
is_deleted,
deleted_at
FROM finance_transactions
${where}
ORDER BY transaction_date DESC, id DESC`,
params,
);
return stmt.all(params).map(mapTransaction);
return rows.map((row) => mapTransaction(row));
}
export function getTransactionById(id: number) {
const stmt = db.prepare<TransactionRow>(
`SELECT id, type, amount, currency, exchange_rate_to_base, amount_in_base, category_id, account_id, transaction_date, description, project, memo, created_at, status, status_updated_at, reimbursement_batch, review_notes, submitted_by, approved_by, approved_at, is_deleted, deleted_at FROM finance_transactions WHERE id = ?`,
export async function getTransactionById(id: number) {
const { rows } = await query<TransactionRow>(
`SELECT id,
type,
amount,
currency,
exchange_rate_to_base,
amount_in_base,
category_id,
account_id,
transaction_date,
description,
project,
memo,
created_at,
status,
status_updated_at,
reimbursement_batch,
review_notes,
submitted_by,
approved_by,
approved_at,
is_deleted,
deleted_at
FROM finance_transactions
WHERE id = $1`,
[id],
);
const row = stmt.get(id);
const row = rows[0];
return row ? mapTransaction(row) : null;
}
export function createTransaction(payload: TransactionPayload) {
const exchangeRate = getExchangeRateToBase(payload.currency);
const amountInBase = +(payload.amount * exchangeRate).toFixed(2);
const createdAt =
payload.createdAt && payload.createdAt.length > 0
? payload.createdAt
: new Date().toISOString();
const status: TransactionStatus = payload.status ?? 'approved';
const statusUpdatedAt =
payload.statusUpdatedAt && payload.statusUpdatedAt.length > 0
? payload.statusUpdatedAt
: createdAt;
const approvedAt =
payload.approvedAt && payload.approvedAt.length > 0
? payload.approvedAt
: status === 'approved' || status === 'paid'
? statusUpdatedAt
: null;
export async function createTransaction(payload: TransactionPayload) {
return withTransaction(async (client) => {
const exchangeRate = await getExchangeRateToBase(client, payload.currency);
const amountInBase = +(payload.amount * exchangeRate).toFixed(2);
const createdAt =
payload.createdAt && payload.createdAt.length > 0
? payload.createdAt
: new Date().toISOString();
const status: TransactionStatus = payload.status ?? 'approved';
const statusUpdatedAt =
payload.statusUpdatedAt && payload.statusUpdatedAt.length > 0
? payload.statusUpdatedAt
: createdAt;
let approvedAt: string | null = null;
if (payload.approvedAt && payload.approvedAt.length > 0) {
approvedAt = payload.approvedAt;
} else if (status === 'approved' || status === 'paid') {
approvedAt = statusUpdatedAt;
}
const stmt = db.prepare(
`INSERT INTO finance_transactions (type, amount, currency, exchange_rate_to_base, amount_in_base, category_id, account_id, transaction_date, description, project, memo, created_at, status, status_updated_at, reimbursement_batch, review_notes, submitted_by, approved_by, approved_at, is_deleted) VALUES (@type, @amount, @currency, @exchangeRateToBase, @amountInBase, @categoryId, @accountId, @transactionDate, @description, @project, @memo, @createdAt, @status, @statusUpdatedAt, @reimbursementBatch, @reviewNotes, @submittedBy, @approvedBy, @approvedAt, 0)`,
);
const info = stmt.run({
type: payload.type,
amount: payload.amount,
currency: payload.currency,
exchangeRateToBase: exchangeRate,
amountInBase,
categoryId: payload.categoryId ?? null,
accountId: payload.accountId ?? null,
transactionDate: payload.transactionDate,
description: payload.description ?? '',
project: payload.project ?? null,
memo: payload.memo ?? null,
createdAt,
status,
statusUpdatedAt,
reimbursementBatch: payload.reimbursementBatch ?? null,
reviewNotes: payload.reviewNotes ?? null,
submittedBy: payload.submittedBy ?? null,
approvedBy: payload.approvedBy ?? null,
approvedAt,
const { rows } = await client.query<TransactionRow>(
`INSERT INTO finance_transactions (
type,
amount,
currency,
exchange_rate_to_base,
amount_in_base,
category_id,
account_id,
transaction_date,
description,
project,
memo,
created_at,
status,
status_updated_at,
reimbursement_batch,
review_notes,
submitted_by,
approved_by,
approved_at,
is_deleted
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
$12, $13, $14, $15, $16, $17, $18, $19, FALSE
)
RETURNING *`,
[
payload.type,
payload.amount,
payload.currency,
exchangeRate,
amountInBase,
payload.categoryId ?? null,
payload.accountId ?? null,
payload.transactionDate,
payload.description ?? '',
payload.project ?? null,
payload.memo ?? null,
createdAt,
status,
statusUpdatedAt,
payload.reimbursementBatch ?? null,
payload.reviewNotes ?? null,
payload.submittedBy ?? null,
payload.approvedBy ?? null,
approvedAt,
],
);
return mapTransaction(rows[0]);
});
return getTransactionById(Number(info.lastInsertRowid));
}
export function updateTransaction(id: number, payload: TransactionPayload) {
const current = getTransactionById(id);
export async function updateTransaction(
id: number,
payload: TransactionPayload,
) {
const current = await getTransactionById(id);
if (!current) {
return null;
}
const nextStatus = (payload.status ?? current.status ?? 'approved') as TransactionStatus;
const statusChanged = nextStatus !== current.status;
const statusUpdatedAt =
payload.statusUpdatedAt && payload.statusUpdatedAt.length > 0
? payload.statusUpdatedAt
: statusChanged
? new Date().toISOString()
: current.statusUpdatedAt ?? current.createdAt;
const approvedAt =
payload.approvedAt && payload.approvedAt.length > 0
? payload.approvedAt
: nextStatus === 'approved' || nextStatus === 'paid'
? current.approvedAt ?? (statusChanged ? statusUpdatedAt : null)
: null;
const approvedBy =
nextStatus === 'approved' || nextStatus === 'paid'
? payload.approvedBy ?? current.approvedBy ?? null
: payload.approvedBy ?? null;
return withTransaction(async (client) => {
const nextStatus = (payload.status ??
current.status ??
'approved') as TransactionStatus;
const statusChanged = nextStatus !== current.status;
let statusUpdatedAt: string;
if (payload.statusUpdatedAt && payload.statusUpdatedAt.length > 0) {
statusUpdatedAt = payload.statusUpdatedAt;
} else if (statusChanged) {
statusUpdatedAt = new Date().toISOString();
} else {
statusUpdatedAt = current.statusUpdatedAt ?? current.createdAt;
}
let approvedAt: string | null = null;
if (payload.approvedAt && payload.approvedAt.length > 0) {
approvedAt = payload.approvedAt;
} else if (nextStatus === 'approved' || nextStatus === 'paid') {
approvedAt = current.approvedAt ?? (statusChanged ? statusUpdatedAt : null);
}
const approvedBy =
nextStatus === 'approved' || nextStatus === 'paid'
? payload.approvedBy ?? current.approvedBy ?? null
: payload.approvedBy ?? null;
const next = {
type: payload.type ?? current.type,
amount: payload.amount ?? current.amount,
currency: payload.currency ?? current.currency,
categoryId: payload.categoryId ?? current.categoryId ?? null,
accountId: payload.accountId ?? current.accountId ?? null,
transactionDate: payload.transactionDate ?? current.transactionDate,
description: payload.description ?? current.description ?? '',
project: payload.project ?? current.project ?? null,
memo: payload.memo ?? current.memo ?? null,
isDeleted: payload.isDeleted ?? current.isDeleted,
status: nextStatus,
statusUpdatedAt,
reimbursementBatch:
payload.reimbursementBatch ?? current.reimbursementBatch ?? null,
reviewNotes: payload.reviewNotes ?? current.reviewNotes ?? null,
submittedBy: payload.submittedBy ?? current.submittedBy ?? null,
approvedBy,
approvedAt,
};
const next = {
type: payload.type ?? current.type,
amount: payload.amount ?? current.amount,
currency: payload.currency ?? current.currency,
categoryId: payload.categoryId ?? current.categoryId ?? null,
accountId: payload.accountId ?? current.accountId ?? null,
transactionDate: payload.transactionDate ?? current.transactionDate,
description: payload.description ?? current.description ?? '',
project: payload.project ?? current.project ?? null,
memo: payload.memo ?? current.memo ?? null,
isDeleted: payload.isDeleted ?? current.isDeleted,
status: nextStatus,
statusUpdatedAt,
reimbursementBatch:
payload.reimbursementBatch ?? current.reimbursementBatch ?? null,
reviewNotes: payload.reviewNotes ?? current.reviewNotes ?? null,
submittedBy: payload.submittedBy ?? current.submittedBy ?? null,
approvedBy,
approvedAt,
};
const exchangeRate = getExchangeRateToBase(next.currency);
const amountInBase = +(next.amount * exchangeRate).toFixed(2);
const exchangeRate = await getExchangeRateToBase(client, next.currency);
const amountInBase = +(next.amount * exchangeRate).toFixed(2);
const deletedAt = next.isDeleted ? new Date().toISOString() : null;
const stmt = db.prepare(
`UPDATE finance_transactions SET type = @type, amount = @amount, currency = @currency, exchange_rate_to_base = @exchangeRateToBase, amount_in_base = @amountInBase, category_id = @categoryId, account_id = @accountId, transaction_date = @transactionDate, description = @description, project = @project, memo = @memo, status = @status, status_updated_at = @statusUpdatedAt, reimbursement_batch = @reimbursementBatch, review_notes = @reviewNotes, submitted_by = @submittedBy, approved_by = @approvedBy, approved_at = @approvedAt, is_deleted = @isDeleted, deleted_at = @deletedAt WHERE id = @id`,
);
const { rows } = await client.query<TransactionRow>(
`UPDATE finance_transactions
SET type = $1,
amount = $2,
currency = $3,
exchange_rate_to_base = $4,
amount_in_base = $5,
category_id = $6,
account_id = $7,
transaction_date = $8,
description = $9,
project = $10,
memo = $11,
status = $12,
status_updated_at = $13,
reimbursement_batch = $14,
review_notes = $15,
submitted_by = $16,
approved_by = $17,
approved_at = $18,
is_deleted = $19,
deleted_at = $20
WHERE id = $21
RETURNING *`,
[
next.type,
next.amount,
next.currency,
exchangeRate,
amountInBase,
next.categoryId,
next.accountId,
next.transactionDate,
next.description,
next.project,
next.memo,
next.status,
next.statusUpdatedAt,
next.reimbursementBatch,
next.reviewNotes,
next.submittedBy,
next.approvedBy,
next.approvedAt,
next.isDeleted,
deletedAt,
id,
],
);
const deletedAt = next.isDeleted ? new Date().toISOString() : null;
stmt.run({
id,
type: next.type,
amount: next.amount,
currency: next.currency,
exchangeRateToBase: exchangeRate,
amountInBase,
categoryId: next.categoryId,
accountId: next.accountId,
transactionDate: next.transactionDate,
description: next.description,
project: next.project,
memo: next.memo,
status: next.status,
statusUpdatedAt: next.statusUpdatedAt,
reimbursementBatch: next.reimbursementBatch,
reviewNotes: next.reviewNotes,
submittedBy: next.submittedBy,
approvedBy: next.approvedBy,
approvedAt: next.approvedAt,
isDeleted: next.isDeleted ? 1 : 0,
deletedAt,
return mapTransaction(rows[0]);
});
return getTransactionById(id);
}
export function softDeleteTransaction(id: number) {
const stmt = db.prepare(
`UPDATE finance_transactions SET is_deleted = 1, deleted_at = @deletedAt WHERE id = @id`,
export async function softDeleteTransaction(id: number) {
const deletedAt = new Date().toISOString();
const { rows } = await query<TransactionRow>(
`UPDATE finance_transactions
SET is_deleted = TRUE, deleted_at = $1
WHERE id = $2
RETURNING *`,
[deletedAt, id],
);
stmt.run({ id, deletedAt: new Date().toISOString() });
return getTransactionById(id);
const row = rows[0];
return row ? mapTransaction(row) : null;
}
export function restoreTransaction(id: number) {
const stmt = db.prepare(
`UPDATE finance_transactions SET is_deleted = 0, deleted_at = NULL WHERE id = @id`,
export async function restoreTransaction(id: number) {
const { rows } = await query<TransactionRow>(
`UPDATE finance_transactions
SET is_deleted = FALSE, deleted_at = NULL
WHERE id = $1
RETURNING *`,
[id],
);
stmt.run({ id });
return getTransactionById(id);
const row = rows[0];
return row ? mapTransaction(row) : null;
}
export function replaceAllTransactions(
export async function replaceAllTransactions(
rows: Array<{
accountId: null | number;
amount: number;
approvedAt?: null | string;
approvedBy?: null | string;
categoryId: null | number;
createdAt?: string;
currency: string;
description: string;
isDeleted?: boolean;
memo?: null | string;
project?: null | string;
transactionDate: string;
type: string;
status?: TransactionStatus;
statusUpdatedAt?: string;
reimbursementBatch?: null | string;
reviewNotes?: null | string;
status?: TransactionStatus;
statusUpdatedAt?: string;
submittedBy?: null | string;
approvedBy?: null | string;
approvedAt?: null | string;
isDeleted?: boolean;
transactionDate: string;
type: string;
}>,
) {
db.prepare('DELETE FROM finance_transactions').run();
await withTransaction(async (client) => {
await client.query(
'TRUNCATE TABLE finance_transactions RESTART IDENTITY CASCADE',
);
const insert = db.prepare(
`INSERT INTO finance_transactions (type, amount, currency, exchange_rate_to_base, amount_in_base, category_id, account_id, transaction_date, description, project, memo, created_at, status, status_updated_at, reimbursement_batch, review_notes, submitted_by, approved_by, approved_at, is_deleted) VALUES (@type, @amount, @currency, @exchangeRateToBase, @amountInBase, @categoryId, @accountId, @transactionDate, @description, @project, @memo, @createdAt, @status, @statusUpdatedAt, @reimbursementBatch, @reviewNotes, @submittedBy, @approvedBy, @approvedAt, @isDeleted)`,
);
const getRate = db.prepare(
`SELECT rate FROM finance_exchange_rates WHERE from_currency = ? AND to_currency = 'CNY' ORDER BY date DESC LIMIT 1`,
);
const insertMany = db.transaction((items: Array<any>) => {
for (const item of items) {
const row = getRate.get(item.currency) as undefined | { rate: number };
const rate = row?.rate ?? 1;
for (const item of rows) {
const rate = await getExchangeRateToBase(client, item.currency);
const amountInBase = +(item.amount * rate).toFixed(2);
const createdAt =
item.createdAt ??
@@ -326,38 +439,67 @@ export function replaceAllTransactions(
const status = item.status ?? 'approved';
const statusUpdatedAt =
item.statusUpdatedAt ??
new Date(
`${item.transactionDate}T00:00:00Z`,
).toISOString();
new Date(`${item.transactionDate}T00:00:00Z`).toISOString();
const approvedAt =
item.approvedAt ??
(status === 'approved' || status === 'paid' ? statusUpdatedAt : null);
insert.run({
...item,
exchangeRateToBase: rate,
amountInBase,
project: item.project ?? null,
memo: item.memo ?? null,
createdAt,
status,
statusUpdatedAt,
reimbursementBatch: item.reimbursementBatch ?? null,
reviewNotes: item.reviewNotes ?? null,
submittedBy: item.submittedBy ?? null,
approvedBy:
await client.query(
`INSERT INTO finance_transactions (
type,
amount,
currency,
exchange_rate_to_base,
amount_in_base,
category_id,
account_id,
transaction_date,
description,
project,
memo,
created_at,
status,
status_updated_at,
reimbursement_batch,
review_notes,
submitted_by,
approved_by,
approved_at,
is_deleted
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
$11, $12, $13, $14, $15, $16, $17, $18, $19, $20
)`,
[
item.type,
item.amount,
item.currency,
rate,
amountInBase,
item.categoryId ?? null,
item.accountId ?? null,
item.transactionDate,
item.description ?? '',
item.project ?? null,
item.memo ?? null,
createdAt,
status,
statusUpdatedAt,
item.reimbursementBatch ?? null,
item.reviewNotes ?? null,
item.submittedBy ?? null,
status === 'approved' || status === 'paid'
? item.approvedBy ?? null
? (item.approvedBy ?? null)
: null,
approvedAt,
isDeleted: item.isDeleted ? 1 : 0,
});
approvedAt,
item.isDeleted ?? false,
],
);
}
});
insertMany(rows);
}
// 分类相关函数
interface CategoryRow {
id: number;
name: string;
@@ -365,7 +507,7 @@ interface CategoryRow {
icon: null | string;
color: null | string;
user_id: null | number;
is_active: number;
is_active: boolean;
}
function mapCategory(row: CategoryRow) {
@@ -382,15 +524,53 @@ function mapCategory(row: CategoryRow) {
};
}
export function fetchCategories(options: { type?: 'expense' | 'income' } = {}) {
const where = options.type
? `WHERE type = @type AND is_active = 1`
: 'WHERE is_active = 1';
const params = options.type ? { type: options.type } : {};
const stmt = db.prepare<CategoryRow>(
`SELECT id, name, type, icon, color, user_id, is_active FROM finance_categories ${where} ORDER BY id ASC`,
export async function fetchCategories(
options: { type?: 'expense' | 'income' } = {},
) {
const params: any[] = [];
const clauses: string[] = ['is_active = TRUE'];
if (options.type) {
params.push(options.type);
clauses.push(`type = $${params.length}`);
}
const where = clauses.length > 0 ? `WHERE ${clauses.join(' AND ')}` : '';
const { rows } = await query<CategoryRow>(
`SELECT id,
name,
type,
icon,
color,
user_id,
is_active
FROM finance_categories
${where}
ORDER BY id ASC`,
params,
);
return stmt.all(params).map(mapCategory);
return rows.map((row) => mapCategory(row));
}
export async function getAccountById(id: number) {
const { rows } = await query<{
currency: string;
id: number;
name: string;
}>(
`SELECT id, name, currency
FROM finance_accounts
WHERE id = $1`,
[id],
);
return rows[0] ?? null;
}
export async function getCategoryById(id: number) {
const { rows } = await query<CategoryRow>(
`SELECT id, name, type, icon, color, user_id, is_active
FROM finance_categories
WHERE id = $1`,
[id],
);
const row = rows[0];
return row ? mapCategory(row) : null;
}

View File

@@ -1,6 +1,6 @@
import { existsSync } from 'node:fs';
import db from './sqlite';
import { query } from './db';
interface MediaRow {
id: number;
@@ -47,7 +47,7 @@ export interface MediaMessage {
createdAt: string;
updatedAt: string;
available: boolean;
downloadUrl: string | null;
downloadUrl: null | string;
}
function mapMediaRow(row: MediaRow): MediaMessage {
@@ -78,40 +78,85 @@ function mapMediaRow(row: MediaRow): MediaMessage {
};
}
export function fetchMediaMessages(params: {
limit?: number;
fileTypes?: string[];
} = {}) {
const clauses: string[] = [];
const bindParams: Record<string, unknown> = {};
export async function fetchMediaMessages(
params: {
fileTypes?: string[];
limit?: number;
} = {},
) {
const whereClauses: string[] = [];
const queryParams: any[] = [];
if (params.fileTypes && params.fileTypes.length > 0) {
clauses.push(
`file_type IN (${params.fileTypes.map((_, index) => `@type${index}`).join(', ')})`,
);
params.fileTypes.forEach((type, index) => {
bindParams[`type${index}`] = type;
const placeholders = params.fileTypes.map((type) => {
queryParams.push(type);
return `$${queryParams.length}`;
});
whereClauses.push(`file_type IN (${placeholders.join(', ')})`);
}
const where = clauses.length > 0 ? `WHERE ${clauses.join(' AND ')}` : '';
const where =
whereClauses.length > 0 ? `WHERE ${whereClauses.join(' AND ')}` : '';
const limitClause =
params.limit && params.limit > 0 ? `LIMIT ${Number(params.limit)}` : '';
const stmt = db.prepare<MediaRow>(
`SELECT id, chat_id, message_id, user_id, username, display_name, file_type, file_id, file_unique_id, caption, file_name, file_path, file_size, mime_type, duration, width, height, forwarded_to, created_at, updated_at FROM finance_media_messages ${where} ORDER BY datetime(created_at) DESC, id DESC ${limitClause}`,
const { rows } = await query<MediaRow>(
`SELECT id,
chat_id,
message_id,
user_id,
username,
display_name,
file_type,
file_id,
file_unique_id,
caption,
file_name,
file_path,
file_size,
mime_type,
duration,
width,
height,
forwarded_to,
created_at,
updated_at
FROM finance_media_messages
${where}
ORDER BY created_at DESC, id DESC
${limitClause}`,
queryParams,
);
return stmt.all(bindParams).map(mapMediaRow);
return rows.map((row) => mapMediaRow(row));
}
export function getMediaMessageById(id: number) {
const stmt = db.prepare<MediaRow>(
`SELECT id, chat_id, message_id, user_id, username, display_name, file_type, file_id, file_unique_id, caption, file_name, file_path, file_size, mime_type, duration, width, height, forwarded_to, created_at, updated_at FROM finance_media_messages WHERE id = ?`,
export async function getMediaMessageById(id: number) {
const { rows } = await query<MediaRow>(
`SELECT id,
chat_id,
message_id,
user_id,
username,
display_name,
file_type,
file_id,
file_unique_id,
caption,
file_name,
file_path,
file_size,
mime_type,
duration,
width,
height,
forwarded_to,
created_at,
updated_at
FROM finance_media_messages
WHERE id = $1`,
[id],
);
const row = stmt.get(id);
const row = rows[0];
return row ? mapMediaRow(row) : null;
}

View File

@@ -1,248 +0,0 @@
import { mkdirSync } from 'node:fs';
import Database from 'better-sqlite3';
import { dirname, join } from 'pathe';
const dbFile = join(process.cwd(), 'storage', 'finance.db');
mkdirSync(dirname(dbFile), { recursive: true });
const database = new Database(dbFile);
function assertIdentifier(name: string) {
if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(name)) {
throw new Error(`Invalid identifier: ${name}`);
}
return name;
}
function ensureColumn(table: string, column: string, definition: string) {
const safeTable = assertIdentifier(table);
const safeColumn = assertIdentifier(column);
const columns = database
.prepare<{ name: string }>(`PRAGMA table_info(${safeTable})`)
.all();
if (!columns.some((item) => item.name === safeColumn)) {
database.exec(`ALTER TABLE ${safeTable} ADD COLUMN ${definition}`);
}
}
database.pragma('journal_mode = WAL');
database.exec(`
CREATE TABLE IF NOT EXISTS finance_currencies (
code TEXT PRIMARY KEY,
name TEXT NOT NULL,
symbol TEXT NOT NULL,
is_base INTEGER NOT NULL DEFAULT 0,
is_active INTEGER NOT NULL DEFAULT 1
);
`);
database.exec(`
CREATE TABLE IF NOT EXISTS finance_exchange_rates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
from_currency TEXT NOT NULL,
to_currency TEXT NOT NULL,
rate REAL NOT NULL,
date TEXT NOT NULL,
source TEXT DEFAULT 'manual'
);
`);
database.exec(`
CREATE TABLE IF NOT EXISTS finance_accounts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
currency TEXT NOT NULL,
type TEXT DEFAULT 'cash',
icon TEXT,
color TEXT,
user_id INTEGER DEFAULT 1,
is_active INTEGER DEFAULT 1
);
`);
database.exec(`
CREATE TABLE IF NOT EXISTS finance_categories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
type TEXT NOT NULL,
icon TEXT,
color TEXT,
user_id INTEGER DEFAULT 1,
is_active INTEGER DEFAULT 1
);
`);
database.exec(`
CREATE TABLE IF NOT EXISTS finance_transactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL,
amount REAL NOT NULL,
currency TEXT NOT NULL,
exchange_rate_to_base REAL NOT NULL,
amount_in_base REAL NOT NULL,
category_id INTEGER,
account_id INTEGER,
transaction_date TEXT NOT NULL,
description TEXT,
project TEXT,
memo TEXT,
created_at TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'approved',
status_updated_at TEXT,
reimbursement_batch TEXT,
review_notes TEXT,
submitted_by TEXT,
approved_by TEXT,
approved_at TEXT,
is_deleted INTEGER NOT NULL DEFAULT 0,
deleted_at TEXT,
FOREIGN KEY (currency) REFERENCES finance_currencies(code),
FOREIGN KEY (category_id) REFERENCES finance_categories(id),
FOREIGN KEY (account_id) REFERENCES finance_accounts(id)
);
`);
ensureColumn(
'finance_transactions',
'status',
"status TEXT NOT NULL DEFAULT 'approved'",
);
ensureColumn('finance_transactions', 'status_updated_at', 'status_updated_at TEXT');
ensureColumn(
'finance_transactions',
'reimbursement_batch',
'reimbursement_batch TEXT',
);
ensureColumn('finance_transactions', 'review_notes', 'review_notes TEXT');
ensureColumn('finance_transactions', 'submitted_by', 'submitted_by TEXT');
ensureColumn('finance_transactions', 'approved_by', 'approved_by TEXT');
ensureColumn('finance_transactions', 'approved_at', 'approved_at TEXT');
database.exec(`
CREATE TABLE IF NOT EXISTS finance_media_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
chat_id INTEGER NOT NULL,
message_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
username TEXT,
display_name TEXT,
file_type TEXT NOT NULL,
file_id TEXT NOT NULL,
file_unique_id TEXT,
caption TEXT,
file_name TEXT,
file_path TEXT NOT NULL,
file_size INTEGER,
mime_type TEXT,
duration INTEGER,
width INTEGER,
height INTEGER,
forwarded_to INTEGER,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE(chat_id, message_id)
);
`);
database.exec(`
CREATE INDEX IF NOT EXISTS idx_finance_media_messages_created_at
ON finance_media_messages (created_at DESC);
`);
database.exec(`
CREATE INDEX IF NOT EXISTS idx_finance_media_messages_user_id
ON finance_media_messages (user_id);
`);
// Telegram通知配置表
database.exec(`
CREATE TABLE IF NOT EXISTS telegram_notification_configs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
bot_token TEXT NOT NULL,
chat_id TEXT NOT NULL,
notification_types TEXT NOT NULL,
is_enabled INTEGER NOT NULL DEFAULT 1,
priority TEXT DEFAULT 'normal',
rate_limit_seconds INTEGER DEFAULT 0,
batch_enabled INTEGER DEFAULT 0,
batch_interval_minutes INTEGER DEFAULT 60,
retry_enabled INTEGER DEFAULT 1,
retry_max_attempts INTEGER DEFAULT 3,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
`);
database.exec(`
CREATE INDEX IF NOT EXISTS idx_telegram_notification_configs_enabled
ON telegram_notification_configs (is_enabled);
`);
// 通知发送历史表(用于频率控制和去重)
database.exec(`
CREATE TABLE IF NOT EXISTS telegram_notification_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
config_id INTEGER NOT NULL,
notification_type TEXT NOT NULL,
content_hash TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
retry_count INTEGER DEFAULT 0,
sent_at TEXT,
error_message TEXT,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (config_id) REFERENCES telegram_notification_configs(id)
);
`);
database.exec(`
CREATE INDEX IF NOT EXISTS idx_telegram_notification_history_config
ON telegram_notification_history (config_id, created_at DESC);
`);
database.exec(`
CREATE INDEX IF NOT EXISTS idx_telegram_notification_history_hash
ON telegram_notification_history (content_hash, created_at DESC);
`);
database.exec(`
CREATE INDEX IF NOT EXISTS idx_telegram_notification_history_status
ON telegram_notification_history (status, retry_count);
`);
// 确保添加新列到已存在的表
ensureColumn(
'telegram_notification_configs',
'priority',
"priority TEXT DEFAULT 'normal'",
);
ensureColumn(
'telegram_notification_configs',
'rate_limit_seconds',
'rate_limit_seconds INTEGER DEFAULT 0',
);
ensureColumn(
'telegram_notification_configs',
'batch_enabled',
'batch_enabled INTEGER DEFAULT 0',
);
ensureColumn(
'telegram_notification_configs',
'batch_interval_minutes',
'batch_interval_minutes INTEGER DEFAULT 60',
);
ensureColumn(
'telegram_notification_configs',
'retry_enabled',
'retry_enabled INTEGER DEFAULT 1',
);
ensureColumn(
'telegram_notification_configs',
'retry_max_attempts',
'retry_max_attempts INTEGER DEFAULT 3',
);
export default database;

View File

@@ -1,491 +1,21 @@
import crypto from 'node:crypto';
import db from './sqlite';
import {
getEnabledNotificationConfigs,
notifyTransaction,
testTelegramConfig,
} from './telegram-bot';
interface TelegramNotificationConfig {
id: number;
name: string;
botToken: string;
chatId: string;
notificationTypes: string[];
isEnabled: boolean;
priority: string;
rateLimitSeconds: number;
batchEnabled: boolean;
batchIntervalMinutes: number;
retryEnabled: boolean;
retryMaxAttempts: number;
}
export { getEnabledNotificationConfigs, testTelegramConfig };
interface TransactionNotificationData {
id: number;
type: string;
amount: number;
currency: string;
categoryName?: string;
accountName?: string;
transactionDate: string;
description?: string;
status: string;
}
/**
* 生成消息内容hash用于去重
*/
function generateContentHash(content: string): string {
return crypto.createHash('md5').update(content).digest('hex');
}
/**
* 检查频率限制
*/
function checkRateLimit(configId: number, rateLimitSeconds: number): boolean {
if (rateLimitSeconds <= 0) {
return true; // 无限制
}
const cutoffTime = new Date(
Date.now() - rateLimitSeconds * 1000,
).toISOString();
const recent = db
.prepare<{ count: number }>(
`
SELECT COUNT(*) as count
FROM telegram_notification_history
WHERE config_id = ? AND status = 'sent' AND sent_at > ?
`,
)
.get(configId, cutoffTime);
return (recent?.count || 0) === 0;
}
/**
* 检查是否为重复消息
*/
function isDuplicateMessage(
configId: number,
contentHash: string,
withinMinutes: number = 5,
): boolean {
const cutoffTime = new Date(Date.now() - withinMinutes * 60 * 1000).toISOString();
const duplicate = db
.prepare<{ count: number }>(
`
SELECT COUNT(*) as count
FROM telegram_notification_history
WHERE config_id = ? AND content_hash = ? AND created_at > ?
`,
)
.get(configId, contentHash, cutoffTime);
return (duplicate?.count || 0) > 0;
}
/**
* 记录通知历史
*/
function recordNotification(
configId: number,
notificationType: string,
contentHash: string,
status: 'pending' | 'sent' | 'failed',
errorMessage?: string,
): number {
const now = new Date().toISOString();
const result = db
.prepare<unknown, [number, string, string, string, string | null, string | null, string]>(
`
INSERT INTO telegram_notification_history
(config_id, notification_type, content_hash, status, sent_at, error_message, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
`,
)
.run(
configId,
notificationType,
contentHash,
status,
status === 'sent' ? now : null,
errorMessage || null,
now,
);
return result.lastInsertRowid as number;
}
/**
* 更新通知状态
*/
function updateNotificationStatus(
historyId: number,
status: 'sent' | 'failed',
retryCount: number = 0,
errorMessage?: string,
): void {
const now = new Date().toISOString();
db.prepare(
`
UPDATE telegram_notification_history
SET status = ?, retry_count = ?, sent_at = ?, error_message = ?
WHERE id = ?
`,
).run(status, retryCount, status === 'sent' ? now : null, errorMessage || null, historyId);
}
/**
* 获取待重试的通知
*/
function getPendingRetries(): Array<{
id: number;
configId: number;
contentHash: string;
retryCount: number;
}> {
return db
.prepare<{ id: number; config_id: number; content_hash: string; retry_count: number }>(
`
SELECT h.id, h.config_id, h.content_hash, h.retry_count
FROM telegram_notification_history h
JOIN telegram_notification_configs c ON h.config_id = c.id
WHERE h.status = 'failed'
AND c.retry_enabled = 1
AND h.retry_count < c.retry_max_attempts
AND h.created_at > datetime('now', '-24 hours')
ORDER BY h.created_at ASC
LIMIT 10
`,
)
.all()
.map((row) => ({
id: row.id,
configId: row.config_id,
contentHash: row.content_hash,
retryCount: row.retry_count,
}));
}
/**
* 获取所有启用的Telegram通知配置增强版
*/
export function getEnabledNotificationConfigs(
notificationType: string = 'transaction',
): TelegramNotificationConfig[] {
const rows = db
.prepare<{
id: number;
name: string;
bot_token: string;
chat_id: string;
notification_types: string;
is_enabled: number;
priority: string;
rate_limit_seconds: number;
batch_enabled: number;
batch_interval_minutes: number;
retry_enabled: number;
retry_max_attempts: number;
}>(
`
SELECT id, name, bot_token, chat_id, notification_types, is_enabled,
priority, rate_limit_seconds, batch_enabled, batch_interval_minutes,
retry_enabled, retry_max_attempts
FROM telegram_notification_configs
WHERE is_enabled = 1
`,
)
.all();
return rows
.map((row) => ({
id: row.id,
name: row.name,
botToken: row.bot_token,
chatId: row.chat_id,
notificationTypes: JSON.parse(row.notification_types) as string[],
isEnabled: row.is_enabled === 1,
priority: row.priority || 'normal',
rateLimitSeconds: row.rate_limit_seconds || 0,
batchEnabled: (row.batch_enabled || 0) === 1,
batchIntervalMinutes: row.batch_interval_minutes || 60,
retryEnabled: (row.retry_enabled || 1) === 1,
retryMaxAttempts: row.retry_max_attempts || 3,
}))
.filter((config) => config.notificationTypes.includes(notificationType));
}
/**
* 格式化交易金额
*/
function formatAmount(amount: number, currency: string): string {
const formatted = amount.toLocaleString('zh-CN', {
minimumFractionDigits: 2,
maximumFractionDigits: 2,
});
return `${currency} ${formatted}`;
}
/**
* 格式化交易类型
*/
function formatTransactionType(type: string): string {
const typeMap: Record<string, string> = {
income: '💰 收入',
expense: '💸 支出',
transfer: '🔄 转账',
};
return typeMap[type] || type;
}
/**
* 格式化交易状态
*/
function formatTransactionStatus(status: string): string {
const statusMap: Record<string, string> = {
draft: '📝 草稿',
pending: '⏳ 待审核',
approved: '✅ 已批准',
rejected: '❌ 已拒绝',
paid: '💵 已支付',
};
return statusMap[status] || status;
}
/**
* 格式化优先级标识
*/
function formatPriority(priority: string): string {
const priorityMap: Record<string, string> = {
low: '🔵',
normal: '⚪',
high: '🟡',
urgent: '🔴',
};
return priorityMap[priority] || '⚪';
}
/**
* 构建交易通知消息
*/
function buildTransactionMessage(
transaction: TransactionNotificationData,
action: string = 'created',
priority: string = 'normal',
): string {
const actionMap: Record<string, string> = {
created: '📋 新增账目记录',
updated: '✏️ 更新账目记录',
deleted: '🗑️ 删除账目记录',
};
const priorityIcon = formatPriority(priority);
const lines: string[] = [
`${priorityIcon} ${actionMap[action] || '📋 账目记录'}`,
'',
`类型:${formatTransactionType(transaction.type)}`,
`金额:${formatAmount(transaction.amount, transaction.currency)}`,
`日期:${transaction.transactionDate}`,
];
if (transaction.categoryName) {
lines.push(`分类:${transaction.categoryName}`);
}
if (transaction.accountName) {
lines.push(`账户:${transaction.accountName}`);
}
lines.push(`状态:${formatTransactionStatus(transaction.status)}`);
if (transaction.description) {
lines.push(``, `备注:${transaction.description}`);
}
lines.push(
``,
`🕐 记录时间:${new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' })}`,
);
return lines.join('\n');
}
/**
* 发送Telegram消息带重试
*/
async function sendTelegramMessage(
botToken: string,
chatId: string,
message: string,
retryCount: number = 0,
): Promise<{ success: boolean; error?: string }> {
try {
const url = `https://api.telegram.org/bot${botToken}/sendMessage`;
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
chat_id: chatId,
text: message,
parse_mode: 'HTML',
}),
});
if (!response.ok) {
const error = await response.json().catch(() => ({ description: 'Unknown error' }));
const errorMsg = error.description || `HTTP ${response.status}`;
console.error(
'[telegram-bot-enhanced] Failed to send message:',
response.status,
errorMsg,
);
return { success: false, error: errorMsg };
}
return { success: true };
} catch (error: unknown) {
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
console.error('[telegram-bot-enhanced] Error sending message:', errorMsg);
return { success: false, error: errorMsg };
}
}
/**
* 通知交易记录(增强版 - 带频率控制、去重、重试)
*/
export async function notifyTransactionEnhanced(
transaction: TransactionNotificationData,
action: string = 'created',
): Promise<void> {
const configs = getEnabledNotificationConfigs('transaction');
if (configs.length === 0) {
console.log('[telegram-bot-enhanced] No enabled notification configs found');
return;
}
for (const config of configs) {
// 1. 检查频率限制
if (!checkRateLimit(config.id, config.rateLimitSeconds)) {
console.log(
`[telegram-bot-enhanced] Rate limit exceeded for config: ${config.name}`,
);
continue;
}
// 2. 构建消息
const message = buildTransactionMessage(transaction, action, config.priority);
const contentHash = generateContentHash(message);
// 3. 检查重复消息
if (isDuplicateMessage(config.id, contentHash)) {
console.log(
`[telegram-bot-enhanced] Duplicate message detected for config: ${config.name}`,
);
continue;
}
// 4. 记录通知历史
const historyId = recordNotification(
config.id,
'transaction',
contentHash,
'pending',
);
// 5. 发送消息
const result = await sendTelegramMessage(
config.botToken,
config.chatId,
message,
);
// 6. 更新状态
if (result.success) {
updateNotificationStatus(historyId, 'sent');
console.log(
`[telegram-bot-enhanced] Sent notification via config: ${config.name}`,
);
} else {
updateNotificationStatus(historyId, 'failed', 0, result.error);
console.error(
`[telegram-bot-enhanced] Failed to send notification via config: ${config.name}, error: ${result.error}`,
);
}
}
...args: Parameters<typeof notifyTransaction>
) {
await notifyTransaction(...args);
}
/**
* 重试失败的通知
*/
export async function retryFailedNotifications(): Promise<void> {
const pending = getPendingRetries();
if (pending.length === 0) {
return;
}
console.log(
`[telegram-bot-enhanced] Retrying ${pending.length} failed notifications`,
);
for (const item of pending) {
// 获取配置
const config = db
.prepare<{
bot_token: string;
chat_id: string;
priority: string;
}>(
'SELECT bot_token, chat_id, priority FROM telegram_notification_configs WHERE id = ?',
)
.get(item.configId);
if (!config) {
continue;
}
// 注意:这里需要重新构建消息或从历史中获取
// 简化处理:发送重试通知
const retryMessage = `🔄 通知重试 (尝试 ${item.retryCount + 1})`;
const result = await sendTelegramMessage(
config.bot_token,
config.chat_id,
retryMessage,
item.retryCount,
);
if (result.success) {
updateNotificationStatus(item.id, 'sent', item.retryCount + 1);
console.log(`[telegram-bot-enhanced] Retry successful for history ID: ${item.id}`);
} else {
updateNotificationStatus(
item.id,
'failed',
item.retryCount + 1,
result.error,
);
console.error(
`[telegram-bot-enhanced] Retry failed for history ID: ${item.id}`,
);
}
}
}
/**
* 测试Telegram Bot配置
*/
export async function testTelegramConfig(
botToken: string,
chatId: string,
): Promise<{ success: boolean; error?: string }> {
const testMessage = `🤖 KT财务系统\n\n✅ Telegram通知配置测试成功\n\n🕐 ${new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' })}`;
return await sendTelegramMessage(botToken, chatId, testMessage);
// Retrying logic is not yet implemented for the PostgreSQL data source.
// The SQLite-specific implementation relied on synchronous database access.
// If this functionality becomes necessary, please implement it using the
// telegram_notification_history table with pool-based transactions.
console.warn('[telegram-bot-enhanced] retryFailedNotifications is not implemented.');
}

View File

@@ -1,4 +1,4 @@
import db from './sqlite';
import { query } from './db';
interface TelegramNotificationConfig {
id: number;
@@ -24,18 +24,21 @@ interface TransactionNotificationData {
/**
* 获取所有启用的Telegram通知配置
*/
export function getEnabledNotificationConfigs(
export async function getEnabledNotificationConfigs(
notificationType: string = 'transaction',
): TelegramNotificationConfig[] {
const rows = db
.prepare<{ id: number; name: string; bot_token: string; chat_id: string; notification_types: string; is_enabled: number }>(
`
SELECT id, name, bot_token, chat_id, notification_types, is_enabled
FROM telegram_notification_configs
WHERE is_enabled = 1
`,
)
.all();
): Promise<TelegramNotificationConfig[]> {
const { rows } = await query<{
bot_token: string;
chat_id: string;
id: number;
is_enabled: boolean;
name: string;
notification_types: string;
}>(
`SELECT id, name, bot_token, chat_id, notification_types, is_enabled
FROM telegram_notification_configs
WHERE is_enabled = TRUE`,
);
return rows
.map((row) => ({
@@ -44,7 +47,7 @@ export function getEnabledNotificationConfigs(
botToken: row.bot_token,
chatId: row.chat_id,
notificationTypes: JSON.parse(row.notification_types) as string[],
isEnabled: row.is_enabled === 1,
isEnabled: row.is_enabled,
}))
.filter((config) => config.notificationTypes.includes(notificationType));
}
@@ -175,10 +178,10 @@ export async function notifyTransaction(
transaction: TransactionNotificationData,
action: string = 'created',
): Promise<void> {
const configs = getEnabledNotificationConfigs('transaction');
const configs = await getEnabledNotificationConfigs('transaction');
if (configs.length === 0) {
console.log('[telegram-bot] No enabled notification configs found');
console.warn('[telegram-bot] No enabled notification configs found');
return;
}
@@ -192,7 +195,7 @@ export async function notifyTransaction(
results.forEach((result, index) => {
if (result.status === 'fulfilled' && result.value) {
console.log(
console.warn(
`[telegram-bot] Sent notification via config: ${configs[index].name}`,
);
} else {
@@ -209,17 +212,18 @@ export async function notifyTransaction(
export async function testTelegramConfig(
botToken: string,
chatId: string,
): Promise<{ success: boolean; error?: string }> {
): Promise<{ error?: string; success: boolean }> {
try {
const testMessage = `🤖 KT财务系统\n\n✅ Telegram通知配置测试成功\n\n🕐 ${new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' })}`;
const success = await sendTelegramMessage(botToken, chatId, testMessage);
if (success) {
return { success: true };
} else {
return { success: false, error: '发送消息失败请检查Bot Token和Chat ID' };
}
return success
? { success: true }
: {
success: false,
error: '发送消息失败请检查Bot Token和Chat ID',
};
} catch (error: unknown) {
return {
success: false,