From 657d9631f34b40270d5e39e9c8d072c2180c75ce Mon Sep 17 00:00:00 2001 From: woshiqp465 Date: Fri, 7 Nov 2025 20:11:52 +0800 Subject: [PATCH] feat: initial telethon monitor --- .env.example | 8 ++ .gitignore | 23 +++++ README.md | 80 +++++++++++++++++ keywords.yaml | 12 +++ requirements.txt | 4 + src/__init__.py | 0 src/config.py | 79 +++++++++++++++++ src/group_monitor.py | 204 +++++++++++++++++++++++++++++++++++++++++++ src/keywords.py | 98 +++++++++++++++++++++ src/main.py | 66 ++++++++++++++ src/reporter.py | 54 ++++++++++++ 11 files changed, 628 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 README.md create mode 100644 keywords.yaml create mode 100644 requirements.txt create mode 100644 src/__init__.py create mode 100644 src/config.py create mode 100644 src/group_monitor.py create mode 100644 src/keywords.py create mode 100644 src/main.py create mode 100644 src/reporter.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..064b729 --- /dev/null +++ b/.env.example @@ -0,0 +1,8 @@ +API_ID=123456 +API_HASH=change_me +SESSION_NAME=userbot_session +USER_PHONE=+8613000000000 +GROUP_LINKS=https://t.me/example_group +REPORT_CHAT_LINK=https://t.me/example_group +TELEGRAM_BOT_TOKEN=your_bot_token_here +KEYWORDS_FILE=keywords.yaml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7626538 --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +# Python artifacts +__pycache__/ +*.pyc +*.pyo +*.pyd +*.log +*.sqlite3 + +# Virtual environments +.venv/ +env/ +venv/ + +# VSCode / IDE +.vscode/ +.idea/ + +# OS files +.DS_Store + +# Secrets / sessions +.env +*.session diff --git a/README.md b/README.md new file mode 100644 index 0000000..4f4e9d7 --- /dev/null +++ b/README.md @@ -0,0 +1,80 @@ +# 群监控自动加群 + +使用 Telethon 登录用户账号(User-Bot),自动加入目标群、抓取群主信息、监控消息并通过单独的 Bot 汇报关键词命中结果。 + +## 功能概述 +- 通过 `GROUP_LINKS` 列表批量加入群(支持公开链接与 `https://t.me/+xxxx` 邀请链接)。 +- 获取群主/创建者信息并在汇报群内确认当前监控状态。 +- 
长连接监听群消息,命中关键词时推送到指定群/频道。 +- 关键词配置独立于代码(`keywords.yaml`),可随时增删并自动热加载。 +- 汇报通道使用 Bot Token,避免 user-bot 重复发言。 + +## 环境要求 +- Python 3.10+ +- Telegram API 凭据(`api_id`/`api_hash`) +- 目标账号可登录的手机号(首登需要验证码) +- Bot Token(用于把监控结果发到 @kt500_bot 已在的群) + +## 快速开始 +1. 安装依赖: + ```bash + python3 -m venv .venv && source .venv/bin/activate + pip install -r requirements.txt + ``` +2. 准备配置: + ```bash + cp .env.example .env + ``` + 按需填写: + - `API_ID` / `API_HASH`:来自 [my.telegram.org](https://my.telegram.org)。 + - `SESSION_NAME`:本地 session 文件名,首次登录会生成 `SESSION_NAME.session`。 + - `USER_PHONE`:用于自动登录(可留空,运行时手输)。 + - `GROUP_LINKS`:逗号分隔的群链接,可直接填 `https://t.me/+tvVm--E19cxkNWJl`。 + - `REPORT_CHAT_LINK`:用于汇报的群/频道链接,如果就地汇报可与 `GROUP_LINKS` 中某一项相同。 + - `TELEGRAM_BOT_TOKEN`:你的 @kt500_bot Token。 + - `KEYWORDS_FILE`:关键词配置文件路径,默认 `keywords.yaml`。 +3. 配置关键词:编辑 `keywords.yaml`,示例: + ```yaml + keywords: + - name: promo + patterns: + - "推广" + - "广告" + regex: false + - name: join_request + patterns: + - "(?i)拉群" + - "(?i)加好友" + regex: true + ``` + - `regex: false` 表示普通子串匹配(自动转换小写)。 + - `regex: true` 将整条 `pattern` 按正则表达式处理,可使用 `(?i)` 等修饰。 + - 文件被修改后,程序会自动检测更新时间并重新加载。 +4. 运行: + ```bash + python3 -m src.main + ``` + - 首次运行会提示输入验证码 / 二步验证密码。 + - 成功后会看到“群监控已启用”提示,同时在 `REPORT_CHAT_LINK` 对应的群内收到确认消息。 + +## 工作流程 +1. **自动入群**:对每个链接先尝试 `get_entity`,若失败且为邀请链接,则执行 `ImportChatInviteRequest` 加入。加入或已在群内后会开始监听。 +2. **群主识别**:调用 `GetParticipantsRequest(... ChannelParticipantsAdmins ...)` 找到 `ChannelParticipantCreator`,将结果写入汇报中,便于核对“自动添加群主”状态。 +3. **关键词监控**:`events.NewMessage` 监听指定群,命中关键词时将群名、消息 ID、发送人、关键词、时间与截断内容推送到 Bot 所在的群。 +4. 
**多次触发 & 附件提醒**:文本命中会附带一则消息,若原消息含媒体,还会追加“附件提醒”。 + +## 常见扩展 +- **新增关键词**:直接编辑 `keywords.yaml`,保存后生效(无需重启)。 +- **新增群**:在 `.env` 的 `GROUP_LINKS` 中添加链接,重启程序即可。 +- **自定义汇报格式**:在 `src/group_monitor.py` 的 `_handle_new_message` 中调整 `lines` 内容。 +- **落地数据库**:可在 `_handle_new_message` 中追加写库逻辑,然后调用 `reporter.send_safe` 做通知。 + +## 注意事项 +- 请确保 user-bot 与 @kt500_bot 均已在 `REPORT_CHAT_LINK` 对应的群里,并授予发送消息权限。 +- Telegram 对频繁入群/拉人有限制,若日志出现 `FloodWaitError`,需等待对应秒数。 +- Session 文件包含账户授权信息,只应保存在可信设备中。 +- 若长期运行,建议用 `supervisor`/`systemd` 守护,并开启日志轮转。 + +## 后续工作 +- 若需要把“自动添加群主”升级为主动发送好友请求,可结合 `contacts.AddContactRequest`,条件是群主公开手机号。 +- 可根据需要加入异常上报(例如钉钉/企业微信)或统计报表。 diff --git a/keywords.yaml b/keywords.yaml new file mode 100644 index 0000000..6e76870 --- /dev/null +++ b/keywords.yaml @@ -0,0 +1,12 @@ +# 关键词配置 +keywords: + - name: promo + patterns: + - "推广" + - "广告" + regex: false + - name: join_request + patterns: + - "(?i)拉群" + - "(?i)加好友" + regex: true diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..36ff572 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +telethon==1.35.0 +python-dotenv==1.0.1 +PyYAML==6.0.2 +httpx==0.27.2 diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..2618a26 --- /dev/null +++ b/src/config.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import List, Optional +import os + +from dotenv import load_dotenv + + +@dataclass +class CredentialsConfig: + api_id: int + api_hash: str + session_name: str + phone: Optional[str] + + +@dataclass +class ReportingConfig: + bot_token: str + chat_link: Optional[str] + + +@dataclass +class MonitorConfig: + group_links: List[str] + keywords_file: Path + + +@dataclass +class AppConfig: + credentials: CredentialsConfig + reporting: ReportingConfig + monitor: MonitorConfig + + +def _comma_split(value: 
def _comma_split(value: str) -> List[str]:
    """Split a comma-separated string into trimmed, non-empty items."""
    trimmed = (chunk.strip() for chunk in value.split(","))
    return [chunk for chunk in trimmed if chunk]


def _env_text(name: str) -> Optional[str]:
    """Return environment variable *name* stripped, or ``None`` if unset/blank.

    A line such as ``USER_PHONE=`` in the env file yields an empty string;
    treating that the same as "not set" keeps every optional value either a
    real value or ``None``.
    """
    value = os.environ.get(name, "").strip()
    return value or None


def load_config(env_file: str = ".env") -> AppConfig:
    """Build the application configuration from the process environment.

    If *env_file* exists it is loaded with ``load_dotenv`` first; afterwards
    every setting is read from ``os.environ``.

    Args:
        env_file: Path of an optional dotenv file.

    Returns:
        The populated ``AppConfig``.

    Raises:
        RuntimeError: If ``API_ID``/``API_HASH`` are missing or blank, if
            ``API_ID`` is not an integer, if ``TELEGRAM_BOT_TOKEN`` is
            missing, or if ``GROUP_LINKS`` contains no usable link.
    """
    if Path(env_file).exists():
        load_dotenv(env_file)

    raw_api_id = _env_text("API_ID")
    api_hash = _env_text("API_HASH")
    if raw_api_id is None or api_hash is None:
        raise RuntimeError("API_ID 和 API_HASH 必填。")
    try:
        api_id = int(raw_api_id)
    except ValueError as exc:
        # Surface a configuration error instead of a bare ValueError.
        raise RuntimeError(f"API_ID 必须是整数,当前值: {raw_api_id!r}") from exc

    session_name = _env_text("SESSION_NAME") or "userbot"
    phone = _env_text("USER_PHONE")

    bot_token = _env_text("TELEGRAM_BOT_TOKEN")
    if not bot_token:
        raise RuntimeError("TELEGRAM_BOT_TOKEN 缺失,以便推送汇报。")

    # Validate the parsed list, not the raw string: a value like ", ," is
    # non-empty text but contains no link at all.
    group_links = _comma_split(os.environ.get("GROUP_LINKS", ""))
    if not group_links:
        raise RuntimeError("GROUP_LINKS 至少包含一个要监听的群链接。")

    keywords_file = Path(_env_text("KEYWORDS_FILE") or "keywords.yaml").expanduser()
    report_link = _env_text("REPORT_CHAT_LINK")

    credentials = CredentialsConfig(
        api_id=api_id,
        api_hash=api_hash,
        session_name=session_name,
        phone=phone,
    )
    reporting = ReportingConfig(bot_token=bot_token, chat_link=report_link)
    monitor = MonitorConfig(
        group_links=group_links,
        keywords_file=keywords_file,
    )
    return AppConfig(credentials=credentials, reporting=reporting, monitor=monitor)
logger = logging.getLogger(__name__)

# Characters that start an entity in Telegram's legacy "Markdown" parse mode.
_MD_SPECIAL_CHARS = "_*`["


def _escape_md(value: Optional[str]) -> str:
    """Make *value* safe to embed inside a single-backtick code span.

    Legacy Markdown offers no escape sequence inside an entity, so a literal
    backtick would close the span early and make the whole message fail to
    parse. Backticks are therefore replaced with an apostrophe.
    """
    if value is None:
        return ""
    return value.replace("`", "'")


def _escape_md_text(value: Optional[str]) -> str:
    """Backslash-escape Markdown control characters in free (non-code) text."""
    if not value:
        return ""
    return "".join(f"\\{ch}" if ch in _MD_SPECIAL_CHARS else ch for ch in value)


def _extract_invite_hash(link: str) -> Optional[str]:
    """Return the invite hash contained in *link*, or ``None``.

    Recognises ``t.me/+HASH``, ``t.me/joinchat/HASH`` and a bare ``+HASH``.
    Surrounding whitespace, a trailing slash and any ``?query``/``#fragment``
    are dropped so they do not end up inside the hash.
    """
    if not link:
        return None
    cleaned = link.strip()
    for marker in ("t.me/+", "t.me/joinchat/"):
        if marker in cleaned:
            tail = cleaned.split(marker, 1)[1]
            break
    else:
        if not cleaned.startswith("+"):
            return None
        tail = cleaned[1:]
    for separator in ("?", "#", "/"):
        tail = tail.split(separator, 1)[0]
    return tail or None


class GroupMonitor:
    """Join the configured groups and report keyword hits in their messages.

    ``start`` resolves/joins every link in ``group_links``, prepares the
    reporter, sends one confirmation per monitored group and registers a
    ``NewMessage`` handler restricted to those groups.
    """

    def __init__(
        self,
        client,
        keyword_store: KeywordStore,
        reporter: Reporter,
        group_links: List[str],
    ) -> None:
        self.client = client
        self.keyword_store = keyword_store
        self.reporter = reporter
        self.group_links = group_links
        self.entities: List[Channel] = []
        # Confirmation texts produced while joining; they are sent only after
        # the reporter has been prepared (see ``start``).
        self._pending_reports: List[str] = []

    async def start(self) -> None:
        """Join groups, prepare the reporter and register the message handler.

        Raises:
            RuntimeError: If none of the configured links could be monitored.
        """
        await self._ensure_memberships()
        if not self.entities:
            raise RuntimeError("没有可监听的群,停止启动。")
        await self.reporter.prepare(self.client)
        # The reporter cannot send before ``prepare``; flush the queued
        # confirmations now instead of losing them.
        pending, self._pending_reports = self._pending_reports, []
        for text in pending:
            await self.reporter.send_safe(text)
        watched = [get_peer_id(entity) for entity in self.entities]
        self.client.add_event_handler(self._handle_new_message, events.NewMessage(chats=watched))
        logger.info("事件监听已注册,等待消息。")

    async def _ensure_memberships(self) -> None:
        """Resolve and join every configured link, filling ``self.entities``.

        A link that cannot be resolved or joined is logged and skipped so the
        remaining links are still processed.
        """
        known_ids = {get_peer_id(entity) for entity in self.entities}
        for link in self.group_links:
            entity, joined_via_invite = await self._resolve_entity(link)
            if not entity:
                logger.error("无法解析群链接 %s", link)
                continue

            if not isinstance(entity, Channel):
                logger.warning("%s 不是超级群/频道,当前实现仅支持 Channel。", link)
                continue

            if not joined_via_invite and not await self._join(entity, link):
                continue

            peer_id = get_peer_id(entity)
            if peer_id in known_ids:
                continue
            known_ids.add(peer_id)
            self.entities.append(entity)

            owner = await self._discover_owner(entity)
            owner_label = _escape_md(owner or "未找到")
            self._pending_reports.append(
                "✅ *群监控已启用*\n"
                f"群: `{_escape_md(entity.title)}`\n"
                f"ID: `{peer_id}`\n"
                f"群主: `{owner_label}`"
            )

    async def _join(self, entity: Channel, link: str) -> bool:
        """Join *entity*; return ``True`` when the account is a member afterwards."""
        try:
            await self.client(JoinChannelRequest(entity))
        except UserAlreadyParticipantError:
            logger.info("已在群 %s 中。", entity.title)
        except (ChannelsTooMuchError, FloodWaitError, InviteHashExpiredError, InviteHashInvalidError) as exc:
            logger.error("加入群 %s 失败: %s", link, exc)
            return False
        except Exception:
            # Any other failure must not abort start-up for the other links.
            logger.exception("加入群 %s 失败", link)
            return False
        else:
            logger.info("加入群 %s 成功。", entity.title)
        return True

    async def _discover_owner(self, entity: Channel) -> Optional[str]:
        """Return the display name (or id) of the group creator, if visible.

        Queries the admin list of *entity* and looks for the
        ``ChannelParticipantCreator`` entry. Returns ``None`` when the request
        fails or no creator is listed.
        """
        try:
            result = await self.client(
                GetParticipantsRequest(
                    channel=entity,
                    filter=ChannelParticipantsAdmins(),
                    offset=0,
                    limit=200,
                    hash=0,
                )
            )
        except ChatAdminRequiredError:
            logger.warning("没有足够权限获取 %s 管理员信息。", entity.title)
            return None
        except Exception as exc:
            logger.warning("获取群主失败 %s: %s", entity.title, exc)
            return None

        users_by_id = {user.id: user for user in result.users}
        for participant in result.participants:
            if isinstance(participant, ChannelParticipantCreator):
                owner_user = users_by_id.get(participant.user_id)
                if owner_user is None:
                    return None
                return get_display_name(owner_user) or str(owner_user.id)
        return None

    async def _handle_new_message(self, event: events.NewMessage.Event) -> None:
        """Report *event* through the reporter when its text hits a keyword."""
        from datetime import datetime, timezone

        text = event.raw_text or ""
        # Match first: chat/sender are only fetched for messages that are
        # actually going to be reported.
        hits = self.keyword_store.match(text)
        if not hits:
            return

        chat = None
        sender = None
        try:
            chat = await event.get_chat()
            sender = await event.get_sender()
        except Exception as exc:
            # Still report the hit, just with ids instead of names.
            logger.warning("无法获取消息上下文: %s", exc)

        keyword_summary = ", ".join(dict.fromkeys(hit.keyword for hit in hits))
        # The sender can be missing; fall back to the id carried by the event
        # instead of dereferencing ``None``.
        sender_id = getattr(sender, "id", None)
        if sender_id is None:
            sender_id = event.sender_id
        sender_label = get_display_name(sender) if sender is not None else ""
        sender_name = _escape_md(sender_label or "未知")
        chat_name = _escape_md(getattr(chat, "title", None) or str(event.chat_id))
        message_link = None
        if getattr(chat, "username", None):
            message_link = f"https://t.me/{chat.username}/{event.id}"

        # Same "...Z" shape as before, without the deprecated utcnow().
        stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        lines = [
            "⚠️ *关键词触发*",
            f"群: `{chat_name}`",
            f"消息ID: `{event.id}`",
            f"用户: `{sender_name}` (`{sender_id}`)",
            f"关键词: `{_escape_md(keyword_summary)}`",
            f"时间: `{stamp}Z`",
        ]
        if message_link:
            # Usernames may contain "_", which would otherwise open an
            # italic entity and break Markdown parsing.
            lines.append(f"链接: {_escape_md_text(message_link)}")
        preview = text.strip()
        if preview:
            # Truncate before escaping so an escape pair is never cut in half.
            if len(preview) > 500:
                preview = preview[:500] + "..."
            lines.append("----")
            lines.append(_escape_md_text(preview))

        await self.reporter.send_safe("\n".join(lines))

        if event.message.media:
            await self.reporter.send_safe(
                (
                    "📎 *附件提醒*\n"
                    f"群: `{chat_name}`\n"
                    f"消息ID: `{event.id}` 含有非文本内容"
                )
            )

    async def _resolve_entity(self, link: str) -> tuple[Optional[Channel], bool]:
        """Resolve *link* to a ``Channel``, joining through an invite if needed.

        Returns:
            ``(entity, joined_via_invite)``. ``entity`` is ``None`` when the
            link could not be resolved; the flag is ``True`` only when this
            call joined the group by importing the invite.
        """
        try:
            entity = await self.client.get_entity(link)
        except (ValueError, TypeError):
            logger.debug("直接解析 %s 失败,尝试邀请链接流程。", link)
        except Exception as exc:
            # RPC errors (expired invite, flood wait, ...) are not
            # ValueError/TypeError; log them instead of aborting start-up.
            logger.warning("直接解析 %s 失败: %s", link, exc)
        else:
            if isinstance(entity, Channel):
                return entity, False

        invite_hash = _extract_invite_hash(link)
        if not invite_hash:
            return None, False

        try:
            result = await self.client(ImportChatInviteRequest(invite_hash))
        except UserAlreadyParticipantError:
            logger.info("邀请链接 %s 提示已在群内,尝试再次解析实体。", link)
            try:
                entity = await self.client.get_entity(link)
                if isinstance(entity, Channel):
                    return entity, False
            except Exception as exc:
                logger.error("邀请链接 %s 解析实体失败: %s", link, exc)
        except (InviteHashExpiredError, InviteHashInvalidError) as exc:
            logger.error("邀请链接 %s 不可用: %s", link, exc)
        except Exception as exc:
            logger.error("通过邀请链接 %s 加入失败: %s", link, exc)
        else:
            for chat in result.chats:
                if isinstance(chat, Channel):
                    logger.info("通过邀请链接加入 %s", chat.title)
                    return chat, True
        return None, False
logger = logging.getLogger(__name__)


@dataclass
class KeywordEntry:
    """One keyword group: its name, raw patterns and compiled regexes."""

    name: str
    patterns: List[str]
    regex: bool = False
    compiled: List[re.Pattern] = field(default_factory=list)


@dataclass
class KeywordHit:
    """A single match: the group name and the pattern that matched."""

    keyword: str
    pattern: str


class KeywordStore:
    """Load keyword groups from a YAML file and match text against them.

    The file's modification time is checked on every ``match`` call and the
    groups are re-parsed when it changes. A file that becomes unparsable
    after start-up is logged and the previously loaded groups stay active.
    """

    def __init__(self, file_path: Path):
        self.file_path = file_path
        self._entries: List[KeywordEntry] = []
        self._last_mtime: float | None = None
        self._lock = threading.Lock()
        self._reload(force=True)

    @staticmethod
    def _build_entries(raw) -> List[KeywordEntry]:
        """Convert the parsed YAML document *raw* into keyword entries.

        Malformed groups are skipped with a warning. Plain patterns are
        lower-cased; regex patterns are compiled, and a regex group without a
        single valid pattern is dropped.

        Raises:
            ValueError: If the document root is not a mapping or its
                ``keywords`` value is not a list.
        """
        if raw is None:
            return []
        if not isinstance(raw, dict):
            raise ValueError("关键词文件顶层必须是映射。")
        items = raw.get("keywords") or []
        if not isinstance(items, list):
            raise ValueError("keywords 必须是列表。")

        entries: List[KeywordEntry] = []
        for item in items:
            if not isinstance(item, dict):
                logger.warning("忽略非法关键词分组: %r", item)
                continue
            name = str(item.get("name", "unnamed"))
            raw_patterns = item.get("patterns") or []
            if isinstance(raw_patterns, str):
                # A scalar would otherwise be iterated character by character.
                raw_patterns = [raw_patterns]
            if not isinstance(raw_patterns, (list, tuple)):
                logger.warning("忽略非法 patterns (%s): %r", name, raw_patterns)
                continue
            patterns = [str(p) for p in raw_patterns if p]
            if not patterns:
                continue

            if bool(item.get("regex", False)):
                compiled = []
                for pattern in patterns:
                    try:
                        compiled.append(re.compile(pattern))
                    except re.error as exc:
                        logger.warning("忽略非法正则 %s: %s", pattern, exc)
                if not compiled:
                    continue
                entries.append(KeywordEntry(name=name, patterns=patterns, regex=True, compiled=compiled))
            else:
                entries.append(KeywordEntry(name=name, patterns=[p.lower() for p in patterns], regex=False))
        return entries

    def _reload(self, force: bool = False) -> None:
        """Re-read the keyword file when it changed (or when *force* is set).

        With ``force=True`` (the initial load) parse errors propagate to the
        caller. Otherwise they are logged and the old entries are kept.
        """
        try:
            mtime = self.file_path.stat().st_mtime
        except (FileNotFoundError, NotADirectoryError):
            if force or self._last_mtime is not None:
                logger.warning("关键词文件 %s 不存在,暂无匹配项。", self.file_path)
            self._entries = []
            self._last_mtime = None
            return

        if not force and self._last_mtime == mtime:
            return

        try:
            raw = yaml.safe_load(self.file_path.read_text(encoding="utf-8"))
            entries = self._build_entries(raw)
        except (OSError, ValueError, yaml.YAMLError) as exc:
            if force:
                raise
            logger.error("关键词文件 %s 重新加载失败,继续使用旧配置: %s", self.file_path, exc)
            # Remember this mtime so the same broken file is not re-parsed
            # (and re-logged) for every incoming message.
            self._last_mtime = mtime
            return

        self._entries = entries
        self._last_mtime = mtime
        logger.info("加载 %d 个关键词分组。", len(entries))

    def _ensure_fresh(self) -> None:
        """Reload the keyword file if needed, serialised by the store's lock."""
        with self._lock:
            self._reload()

    def match(self, text: str | None) -> List[KeywordHit]:
        """Return one hit per pattern that matches *text*.

        Plain patterns are matched as substrings of the lower-cased text;
        regex patterns are searched in the stripped original text. Empty or
        whitespace-only input yields no hits.
        """
        if not text:
            return []
        self._ensure_fresh()
        payload = text.strip()
        if not payload:
            return []

        hits: List[KeywordHit] = []
        lowered = payload.lower()
        for entry in self._entries:
            if entry.regex:
                hits.extend(
                    KeywordHit(keyword=entry.name, pattern=regex.pattern)
                    for regex in entry.compiled
                    if regex.search(payload)
                )
            else:
                hits.extend(
                    KeywordHit(keyword=entry.name, pattern=pattern)
                    for pattern in entry.patterns
                    if pattern in lowered
                )
        return hits
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
)
logger = logging.getLogger(__name__)


async def main() -> None:
    """Log in the user account, start the group monitor and wait for a signal.

    The client is always disconnected on the way out, including when login
    or monitor start-up fails.
    """
    config = load_config()

    client = TelegramClient(
        session=config.credentials.session_name,
        api_id=config.credentials.api_id,
        api_hash=config.credentials.api_hash,
    )

    try:
        # Telethon rejects an explicit falsy phone ("No phone number or bot
        # token provided"); omit the argument so it prompts interactively
        # when USER_PHONE is not configured.
        phone = config.credentials.phone
        if phone:
            await client.start(phone=phone)
        else:
            await client.start()
        logger.info("User-Bot 登录完成。")

        keyword_store = KeywordStore(config.monitor.keywords_file)
        reporter = Reporter(
            bot_token=config.reporting.bot_token,
            chat_link=config.reporting.chat_link or config.monitor.group_links[0],
        )

        monitor = GroupMonitor(
            client=client,
            keyword_store=keyword_store,
            reporter=reporter,
            group_links=config.monitor.group_links,
        )

        await monitor.start()
        logger.info("监控已启动,等待消息...")

        stop_event = asyncio.Event()

        def _handle_sig(*_args):
            stop_event.set()

        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, _handle_sig)
            except NotImplementedError:
                # Signal handlers are not available on every event loop.
                pass

        await stop_event.wait()
        logger.info("收到退出信号,断开连接。")
    finally:
        await client.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
logger = logging.getLogger(__name__)


@dataclass
class Reporter:
    """Send monitor reports to one chat through the Telegram Bot API.

    ``prepare`` resolves ``chat_link`` to a numeric ``chat_id`` with the
    user client; ``send``/``send_safe`` then post messages with the bot
    token. The token is never written to logs or exception messages.
    """

    bot_token: str
    chat_link: Optional[str]
    chat_id: Optional[int] = None
    timeout: float = 10.0

    def _redact(self, text: str) -> str:
        """Return *text* with every occurrence of the bot token masked."""
        if not self.bot_token:
            return text
        return text.replace(self.bot_token, "<redacted>")

    @staticmethod
    def _is_parse_error(data: dict) -> bool:
        """Tell whether a Bot API reply reports an entity-parsing failure."""
        if data.get("ok"):
            return False
        description = str(data.get("description", "")).lower()
        return data.get("error_code") == 400 and "parse entities" in description

    async def prepare(self, client) -> None:
        """Resolve ``chat_link`` to ``chat_id`` using *client* (once).

        Raises:
            RuntimeError: If neither ``chat_id`` nor ``chat_link`` is set.
        """
        if self.chat_id:
            return
        if not self.chat_link:
            raise RuntimeError("REPORT_CHAT_LINK 未设置,无法汇报。")
        entity = await client.get_entity(self.chat_link)
        self.chat_id = tg_utils.get_peer_id(entity)
        logger.info("汇报目标 chat_id=%s 来自 %s", self.chat_id, self.chat_link)

    async def _post(self, client, text: str, parse_mode: Optional[str]) -> dict:
        """POST one ``sendMessage`` call and return the decoded reply.

        The body is decoded regardless of the HTTP status so the API's own
        ``description`` is available to the caller. A non-JSON body is
        turned into a synthetic failure reply.
        """
        url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
        payload = {
            "chat_id": self.chat_id,
            "text": text,
            "disable_web_page_preview": True,
        }
        if parse_mode:
            payload["parse_mode"] = parse_mode
        response = await client.post(url, json=payload)
        try:
            data = response.json()
        except ValueError:
            data = None
        if not isinstance(data, dict):
            return {
                "ok": False,
                "error_code": response.status_code,
                "description": response.text[:200],
            }
        return data

    async def send(self, text: str, parse_mode: str = "Markdown") -> None:
        """Send *text* to the report chat.

        When the API rejects the message because its entities cannot be
        parsed, the same text is sent once more without a parse mode so the
        report is not lost.

        Raises:
            RuntimeError: If ``prepare`` has not run, the request fails, or
                the API reports a failure. Messages have the token masked.
        """
        if not self.chat_id:
            raise RuntimeError("Reporter 还未完成 prepare,无法发送。")
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                data = await self._post(client, text, parse_mode)
                if parse_mode and self._is_parse_error(data):
                    logger.warning(
                        "Markdown 解析失败,改用纯文本重发: %s",
                        data.get("description"),
                    )
                    data = await self._post(client, text, None)
        except Exception as exc:
            # Transport errors may embed the request URL, which contains the
            # bot token; re-raise a masked error without the original context.
            message = self._redact(str(exc))
            logger.error("发送汇报失败: %s", message)
            raise RuntimeError(f"发送汇报失败: {message}") from None

        if not data.get("ok", False):
            failure = RuntimeError(f"Bot API 返回失败: {data}")
            logger.error("发送汇报失败: %s", failure)
            raise failure

    async def send_safe(self, text: str, parse_mode: str = "Markdown") -> None:
        """Like ``send`` but log failures instead of raising them."""
        try:
            await self.send(text, parse_mode=parse_mode)
        except Exception:
            logger.exception("忽略汇报发送异常")