commit 8d1ce4598d1d49adf13c14d8777c58f2f236002f Author: 你的用户名 <你的邮箱> Date: Tue Nov 4 15:28:06 2025 +0800 Initial commit: FunStat MCP Server Go implementation - Telegram integration for customer statistics - MCP server implementation with rate limiting - Cache system for performance optimization - Multi-language support - RESTful API endpoints 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..25f3aa5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool +*.out + +# Dependency directories +vendor/ + +# Go workspace file +go.work + +# Environment variables +.env +.env.local + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Build output +/bin/ +/dist/ +/build/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..ffdd538 --- /dev/null +++ b/README.md @@ -0,0 +1,108 @@ +# Funstat MCP (Go) + +This project is a complete Go reimplementation of the original Funstat MCP +server. It exposes the same MCP tools wrapped around the Telegram +`@openaiw_bot`, but with a native Go runtime and HTTP transport. + +## Features + +- Streamable HTTP transport (`/sse` + `/messages`) compatible with MCP clients + such as Codex CLI, Cursor, and Claude Code. +- Full parity with the original nine Funstat tools (search, topchat, text, + human lookup, user info, user messages, balance, menu, start). +- Built-in rate limiting and response caching to stay within Telegram limits. +- Telegram session bootstrapping from a Telethon string session or persisted Go + session storage. +- Configurable proxy, cache TTL, and rate limit settings via environment + variables. + +## Requirements + +- Go 1.22 or newer (the module was developed with Go 1.24). +- A valid Telegram API ID and API hash. +- A logged-in Telegram session. The Go server expects either: + - `TELEGRAM_SESSION_STRING` (or `TELEGRAM_SESSION_STRING_FILE`) containing a + Telethon *StringSession*, which will be converted automatically into Go + session storage, **or** + - `TELEGRAM_SESSION_PATH` pointing to a Go session file created by this + service on a previous run (`~/.funstatmcp/session.json` by default). +- Network access to Telegram (configure proxies via the `FUNSTAT_PROXY_*` + variables if necessary). + +## Environment Variables + +| Variable | Description | +|----------|-------------| +| `TELEGRAM_API_ID` | **Required.** Telegram API ID. | +| `TELEGRAM_API_HASH` | **Required.** Telegram API hash. | +| `TELEGRAM_SESSION_STRING` | Base64 Telethon StringSession (optional if session file already exists). | +| `TELEGRAM_SESSION_STRING_FILE` | Path to a file containing the StringSession. | +| `TELEGRAM_SESSION_PATH` | Optional path for Go session storage (`.session` suffix is appended if missing). | +| `FUNSTAT_BOT_USERNAME` | Bot username (default `@openaiw_bot`). | +| `FUNSTAT_RATE_LIMIT_PER_SECOND` | Requests per second (default `18`). | +| `FUNSTAT_RATE_LIMIT_WINDOW` | Duration window, e.g. `1s` (default `1s`). | +| `FUNSTAT_CACHE_TTL` | Cache lifetime for command responses (default `1h`). | +| `FUNSTAT_PROXY_TYPE` | Proxy type (`socks5` supported). | +| `FUNSTAT_PROXY_HOST` | Proxy host. | +| `FUNSTAT_PROXY_PORT` | Proxy port. | +| `FUNSTAT_PROXY_USERNAME` / `FUNSTAT_PROXY_PASSWORD` | Optional proxy credentials. | +| `FUNSTAT_HOST` | Bind host (default `127.0.0.1`). | +| `FUNSTAT_PORT` | Bind port (default `8091`). | +| `FUNSTAT_REQUIRE_SESSION` | When `true`, enables strict session ID enforcement (not yet required by the Go transport). | + +## Running the Server + +```bash +export TELEGRAM_API_ID=... +export TELEGRAM_API_HASH=... +export TELEGRAM_SESSION_STRING=... # or TELEGRAM_SESSION_STRING_FILE + +go run ./cmd/funstat-mcp +``` + +The server starts on `http://127.0.0.1:8091` by default with: + +- `GET /sse` — server-sent events stream for MCP clients. +- `POST /messages` — JSON-RPC 2.0 endpoint for MCP requests. +- `GET /health` — simple health probe. + +For convenience a helper script is available: + +```bash +./scripts/start_server.sh +``` + +## Tool Coverage + +The server exposes the following MCP tools (identical to the Python version): + +| Tool | Description | +|------|-------------| +| `funstat_search` | 搜索 Telegram 群组/频道。 | +| `funstat_topchat` | 获取热门群组列表。 | +| `funstat_text` | 按文本搜索消息。 | +| `funstat_human` | 按姓名搜索用户。 | +| `funstat_user_info` | 查询用户详情。 | +| `funstat_user_messages` | 抓取用户聊天记录并自动翻页。 | +| `funstat_balance` | 查询积分余额。 | +| `funstat_menu` | 显示 Funstat 菜单。 | +| `funstat_start` | Funstat 欢迎信息。 | + +## Session Notes + +- If you already possess the Telethon `.session` SQLite file, generate a + StringSession first (e.g. via Telethon or `python -m telethon.sessions`) + and provide it through `TELEGRAM_SESSION_STRING`. +- After the first successful run the Go server stores an encrypted Go session + at `~/.funstatmcp/session.json` (unless overridden). Subsequent runs only + require the API ID/hash. + +## Development + +- Format code with `gofmt -w cmd internal`. +- Run `go test ./...` (no automated tests are included yet). +- Dependencies are managed via Go modules (`go mod tidy`). + +## License + +Follows the license of the original Funstat MCP project (MIT, if applicable). diff --git a/cmd/funstat-mcp/main.go b/cmd/funstat-mcp/main.go new file mode 100644 index 0000000..bfb5858 --- /dev/null +++ b/cmd/funstat-mcp/main.go @@ -0,0 +1,32 @@ +package main + +import ( + "context" + "log" + "os/signal" + "syscall" + + "funstatmcp/internal/app" + "funstatmcp/internal/transport" +) + +func main() { + cfg, err := app.FromEnv() + if err != nil { + log.Fatalf("load config: %v", err) + } + + application, err := app.New(cfg) + if err != nil { + log.Fatalf("initialize app: %v", err) + } + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + server := transport.NewServer(application, cfg) + + if err := server.Run(ctx); err != nil { + log.Fatalf("server exited with error: %v", err) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..89a0b52 --- /dev/null +++ b/go.mod @@ -0,0 +1,43 @@ +module funstatmcp + +go 1.24.3 + +require ( + github.com/gotd/td v0.132.0 + golang.org/x/net v0.46.0 +) + +require ( + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/coder/websocket v1.8.14 // indirect + github.com/dlclark/regexp2 v1.11.5 // indirect + github.com/fatih/color v1.18.0 // indirect + github.com/ghodss/yaml v1.0.0 // indirect + github.com/go-faster/errors v0.7.1 // indirect + github.com/go-faster/jx v1.1.0 // indirect + github.com/go-faster/xor v1.0.0 // indirect + github.com/go-faster/yaml v0.4.6 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/gotd/ige v0.2.2 // indirect + github.com/gotd/neo v0.1.5 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/mattn/go-colorable v0.1.14 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/ogen-go/ogen v1.15.2 // indirect + github.com/segmentio/asm v1.2.0 // indirect + go.opentelemetry.io/otel v1.38.0 // indirect + go.opentelemetry.io/otel/metric v1.38.0 // indirect + go.opentelemetry.io/otel/trace v1.38.0 // indirect + go.uber.org/atomic v1.11.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect + golang.org/x/crypto v0.43.0 // indirect + golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 // indirect + golang.org/x/mod v0.29.0 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/sys v0.37.0 // indirect + golang.org/x/text v0.30.0 // indirect + golang.org/x/tools v0.38.0 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + rsc.io/qr v0.2.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a241134 --- /dev/null +++ b/go.sum @@ -0,0 +1,98 @@ +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g= +github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dlclark/regexp2 v1.11.5 h1:Q/sSnsKerHeCkc/jSTNq1oCm7KiVgUMZRDUoRu0JQZQ= +github.com/dlclark/regexp2 v1.11.5/go.mod h1:DHkYz0B9wPfa6wondMfaivmHpzrQ3v9q8cnmRbL6yW8= +github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= +github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= +github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg= +github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo= +github.com/go-faster/jx v1.1.0 h1:ZsW3wD+snOdmTDy9eIVgQdjUpXRRV4rqW8NS3t+20bg= +github.com/go-faster/jx v1.1.0/go.mod h1:vKDNikrKoyUmpzaJ0OkIkRQClNHFX/nF3dnTJZb3skg= +github.com/go-faster/xor v0.3.0/go.mod h1:x5CaDY9UKErKzqfRfFZdfu+OSTfoZny3w5Ak7UxcipQ= +github.com/go-faster/xor v1.0.0 h1:2o8vTOgErSGHP3/7XwA5ib1FTtUsNtwCoLLBjl31X38= +github.com/go-faster/xor v1.0.0/go.mod h1:x5CaDY9UKErKzqfRfFZdfu+OSTfoZny3w5Ak7UxcipQ= +github.com/go-faster/yaml v0.4.6 h1:lOK/EhI04gCpPgPhgt0bChS6bvw7G3WwI8xxVe0sw9I= +github.com/go-faster/yaml v0.4.6/go.mod h1:390dRIvV4zbnO7qC9FGo6YYutc+wyyUSHBgbXL52eXk= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gotd/ige v0.2.2 h1:XQ9dJZwBfDnOGSTxKXBGP4gMud3Qku2ekScRjDWWfEk= +github.com/gotd/ige v0.2.2/go.mod h1:tuCRb+Y5Y3eNTo3ypIfNpQ4MFjrnONiL2jN2AKZXmb0= +github.com/gotd/neo v0.1.5 h1:oj0iQfMbGClP8xI59x7fE/uHoTJD7NZH9oV1WNuPukQ= +github.com/gotd/neo v0.1.5/go.mod h1:9A2a4bn9zL6FADufBdt7tZt+WMhvZoc5gWXihOPoiBQ= +github.com/gotd/td v0.132.0 h1:Iqm3S2b+8kDgA9237IDXRxj7sryUpvy+4Cr50/0tpx4= +github.com/gotd/td v0.132.0/go.mod h1:4CDGYS+rDtOqotRheGaF9MS5g6jaUewvSXqBNJnx8SQ= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= +github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/ogen-go/ogen v1.15.2 h1:Hy5XNcDgWur758Kf0+DTQFN8cyBOs58EjDD3NMqih54= +github.com/ogen-go/ogen v1.15.2/go.mod h1:bS+BP2cV7+IGjOM24znBmh+PrpZvYFXA7o3BNF4Hj2E= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= +github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04= +golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0= +golang.org/x/exp v0.0.0-20230725093048-515e97ebf090 h1:Di6/M8l0O2lCLc6VVRWhgCiApHV8MnQurBnFSHsQtNY= +golang.org/x/exp v0.0.0-20230725093048-515e97ebf090/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= +golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= +golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= +golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= +golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= +golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= +golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= +golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= +golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +nhooyr.io/websocket v1.8.17 h1:KEVeLJkUywCKVsnLIDlD/5gtayKp8VoCkksHCGGfT9Y= +nhooyr.io/websocket v1.8.17/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c= +rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY= +rsc.io/qr v0.2.0/go.mod h1:IF+uZjkb9fqyeF/4tlBoynqmQxUoPfWEKh921coOuXs= diff --git a/internal/app/app.go b/internal/app/app.go new file mode 100644 index 0000000..c757e1d --- /dev/null +++ b/internal/app/app.go @@ -0,0 +1,153 @@ +package app + +import ( + "context" + "fmt" + "strconv" + "strings" + + tgclient "funstatmcp/internal/telegram" +) + +type App struct { + cfg Config + client *tgclient.Client +} + +func New(cfg Config) (*App, error) { + client, err := tgclient.New(cfg.Telegram) + if err != nil { + return nil, err + } + + return &App{ + cfg: cfg, + client: client, + }, nil +} + +func (a *App) Close() error { + return nil +} + +func (a *App) SendCommand(ctx context.Context, command string, useCache bool) (string, error) { + return a.client.SendCommand(ctx, command, useCache) +} + +func (a *App) CallTool(ctx context.Context, name string, args map[string]any) (string, error) { + switch name { + case "funstat_search": + query, err := requireString(args, "query") + if err != nil { + return "", err + } + return a.client.SendCommand(ctx, fmt.Sprintf("/search %s", query), true) + + case "funstat_topchat": + category := optionalString(args, "category") + if category != "" { + return a.client.SendCommand(ctx, fmt.Sprintf("/topchat %s", category), true) + } + return a.client.SendCommand(ctx, "/topchat", true) + + case "funstat_text": + text, err := requireString(args, "text") + if err != nil { + return "", err + } + return a.client.SendCommand(ctx, fmt.Sprintf("/text %s", text), true) + + case "funstat_human": + nameArg, err := requireString(args, "name") + if err != nil { + return "", err + } + return a.client.SendCommand(ctx, fmt.Sprintf("/human %s", nameArg), true) + + case "funstat_user_info": + identifier, err := requireString(args, "identifier") + if err != nil { + return "", err + } + identifier = strings.TrimSpace(identifier) + if identifier == "" { + return "", fmt.Errorf("identifier cannot be empty") + } + return a.client.SendCommand(ctx, fmt.Sprintf("/user_info %s", identifier), true) + + case "funstat_user_messages": + identifier, err := requireString(args, "identifier") + if err != nil { + return "", err + } + var maxPagesPtr *int + if value, ok := args["max_pages"]; ok { + v, err := toInt(value) + if err != nil { + return "", fmt.Errorf("max_pages must be an integer: %w", err) + } + maxPagesPtr = &v + } + return a.client.FetchUserMessages(ctx, identifier, maxPagesPtr) + + case "funstat_balance": + return a.client.SendCommand(ctx, "/balance", true) + + case "funstat_menu": + return a.client.SendCommand(ctx, "/menu", true) + + case "funstat_start": + return a.client.SendCommand(ctx, "/start", true) + + default: + return "", fmt.Errorf("unknown tool: %s", name) + } +} + +func requireString(args map[string]any, key string) (string, error) { + value, ok := args[key] + if !ok { + return "", fmt.Errorf("missing required argument: %s", key) + } + str, ok := value.(string) + if !ok { + return "", fmt.Errorf("argument %s must be a string", key) + } + str = strings.TrimSpace(str) + if str == "" { + return "", fmt.Errorf("argument %s cannot be empty", key) + } + return str, nil +} + +func optionalString(args map[string]any, key string) string { + if value, ok := args[key]; ok { + if str, ok := value.(string); ok { + return strings.TrimSpace(str) + } + } + return "" +} + +func toInt(value any) (int, error) { + switch v := value.(type) { + case float64: + return int(v), nil + case float32: + return int(v), nil + case int: + return v, nil + case int32: + return int(v), nil + case int64: + return int(v), nil + case string: + parsed, err := strconv.Atoi(strings.TrimSpace(v)) + if err != nil { + return 0, err + } + return parsed, nil + default: + return 0, fmt.Errorf("unsupported type %T", value) + } +} diff --git a/internal/app/config.go b/internal/app/config.go new file mode 100644 index 0000000..e38001c --- /dev/null +++ b/internal/app/config.go @@ -0,0 +1,158 @@ +package app + +import ( + "fmt" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + tgconfig "funstatmcp/internal/telegram" +) + +type Config struct { + Telegram tgconfig.Config + Host string + Port int + RequireSession bool +} + +func FromEnv() (Config, error) { + var cfg Config + + telegramCfg, err := loadTelegramConfig() + if err != nil { + return cfg, err + } + + cfg.Telegram = telegramCfg + cfg.Host = getEnvDefault("FUNSTAT_HOST", "127.0.0.1") + + portStr := getEnvDefault("FUNSTAT_PORT", "8091") + port, err := strconv.Atoi(portStr) + if err != nil { + return cfg, fmt.Errorf("invalid FUNSTAT_PORT: %w", err) + } + cfg.Port = port + + cfg.RequireSession = parseBool(getEnvDefault("FUNSTAT_REQUIRE_SESSION", "false")) + + return cfg, nil +} + +func loadTelegramConfig() (tgconfig.Config, error) { + var cfg tgconfig.Config + + apiIDStr := os.Getenv("TELEGRAM_API_ID") + if apiIDStr == "" { + return cfg, fmt.Errorf("TELEGRAM_API_ID is required") + } + apiID, err := strconv.Atoi(apiIDStr) + if err != nil { + return cfg, fmt.Errorf("invalid TELEGRAM_API_ID: %w", err) + } + cfg.APIID = apiID + + cfg.APIHash = strings.TrimSpace(os.Getenv("TELEGRAM_API_HASH")) + if cfg.APIHash == "" { + return cfg, fmt.Errorf("TELEGRAM_API_HASH is required") + } + + cfg.BotUsername = getEnvDefault("FUNSTAT_BOT_USERNAME", "@openaiw_bot") + cfg.SessionString = strings.TrimSpace(os.Getenv("TELEGRAM_SESSION_STRING")) + sessionStringFile := strings.TrimSpace(os.Getenv("TELEGRAM_SESSION_STRING_FILE")) + if cfg.SessionString == "" && sessionStringFile != "" { + data, err := os.ReadFile(expandPath(sessionStringFile)) + if err != nil { + return cfg, fmt.Errorf("read TELEGRAM_SESSION_STRING_FILE: %w", err) + } + cfg.SessionString = strings.TrimSpace(string(data)) + } + + sessionPath := strings.TrimSpace(os.Getenv("TELEGRAM_SESSION_PATH")) + if sessionPath != "" { + if !strings.HasSuffix(sessionPath, ".session") { + sessionPath = sessionPath + ".session" + } + cfg.SessionStorage = expandPath(sessionPath) + } else { + cfg.SessionStorage = defaultSessionPath() + } + + if value := strings.TrimSpace(os.Getenv("FUNSTAT_RATE_LIMIT_PER_SECOND")); value != "" { + parsed, err := strconv.Atoi(value) + if err != nil { + return cfg, fmt.Errorf("invalid FUNSTAT_RATE_LIMIT_PER_SECOND: %w", err) + } + cfg.RateLimit = parsed + } + + if value := strings.TrimSpace(os.Getenv("FUNSTAT_RATE_LIMIT_WINDOW")); value != "" { + duration, err := time.ParseDuration(value) + if err != nil { + return cfg, fmt.Errorf("invalid FUNSTAT_RATE_LIMIT_WINDOW: %w", err) + } + cfg.RateLimitWindow = duration + } + + if value := strings.TrimSpace(os.Getenv("FUNSTAT_CACHE_TTL")); value != "" { + duration, err := time.ParseDuration(value) + if err != nil { + return cfg, fmt.Errorf("invalid FUNSTAT_CACHE_TTL: %w", err) + } + cfg.CacheTTL = duration + } + + proxyHost := strings.TrimSpace(os.Getenv("FUNSTAT_PROXY_HOST")) + proxyPort := strings.TrimSpace(os.Getenv("FUNSTAT_PROXY_PORT")) + if proxyHost != "" && proxyPort != "" { + port, err := strconv.Atoi(proxyPort) + if err != nil { + return cfg, fmt.Errorf("invalid FUNSTAT_PROXY_PORT: %w", err) + } + cfg.Proxy = &tgconfig.ProxyConfig{ + Type: getEnvDefault("FUNSTAT_PROXY_TYPE", "socks5"), + Host: proxyHost, + Port: port, + Username: strings.TrimSpace(os.Getenv("FUNSTAT_PROXY_USERNAME")), + Password: strings.TrimSpace(os.Getenv("FUNSTAT_PROXY_PASSWORD")), + } + } + + return cfg, nil +} + +func getEnvDefault(key, fallback string) string { + if value := strings.TrimSpace(os.Getenv(key)); value != "" { + return value + } + return fallback +} + +func parseBool(value string) bool { + switch strings.ToLower(strings.TrimSpace(value)) { + case "1", "true", "yes", "on": + return true + default: + return false + } +} + +func defaultSessionPath() string { + home, err := os.UserHomeDir() + if err != nil { + return filepath.Join(os.TempDir(), "funstatmcp", "session.json") + } + return filepath.Join(home, ".funstatmcp", "session.json") +} + +func expandPath(path string) string { + if strings.HasPrefix(path, "~") { + home, err := os.UserHomeDir() + if err == nil { + return filepath.Join(home, strings.TrimPrefix(path, "~")) + } + } + return path +} diff --git a/internal/app/tools.go b/internal/app/tools.go new file mode 100644 index 0000000..1ae97d6 --- /dev/null +++ b/internal/app/tools.go @@ -0,0 +1,124 @@ +package app + +type ToolDefinition struct { + Name string `json:"name"` + Description string `json:"description"` + InputSchema map[string]any `json:"inputSchema"` +} + +func ToolDefinitions() []ToolDefinition { + return []ToolDefinition{ + { + Name: "funstat_search", + Description: "搜索 Telegram 群组、频道。支持关键词搜索,返回相关的群组列表", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "query": map[string]any{ + "type": "string", + "description": "搜索关键词,例如: 'python', '区块链', 'AI'", + }, + }, + "required": []string{"query"}, + }, + }, + { + Name: "funstat_topchat", + Description: "获取热门群组/频道列表,按成员数或活跃度排序", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "category": map[string]any{ + "type": "string", + "description": "分类筛选(可选),例如: 'tech', 'crypto', 'news'", + }, + }, + }, + }, + { + Name: "funstat_text", + Description: "通过消息文本搜索,查找包含特定文本的消息和来源群组", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "text": map[string]any{ + "type": "string", + "description": "要搜索的文本内容", + }, + }, + "required": []string{"text"}, + }, + }, + { + Name: "funstat_human", + Description: "通过姓名搜索,查找包含特定用户的群组和消息", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "name": map[string]any{ + "type": "string", + "description": "用户姓名", + }, + }, + "required": []string{"name"}, + }, + }, + { + Name: "funstat_user_info", + Description: "查询用户详细信息,支持通过用户名、用户ID、联系人等方式查询", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "identifier": map[string]any{ + "type": "string", + "description": "用户标识: 用户名(@username)、用户ID、或手机号", + }, + }, + "required": []string{"identifier"}, + }, + }, + { + Name: "funstat_user_messages", + Description: "获取指定用户的历史消息列表,并自动翻页汇总", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{ + "identifier": map[string]any{ + "type": "string", + "description": "用户标识: 用户名(@username) 或用户ID", + }, + "max_pages": map[string]any{ + "type": "integer", + "minimum": 1, + "description": "可选,限制抓取的最大页数", + }, + }, + "required": []string{"identifier"}, + }, + }, + { + Name: "funstat_balance", + Description: "查询当前账号的积分余额和使用统计", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{}, + }, + }, + { + Name: "funstat_menu", + Description: "显示 funstat BOT 的主菜单和所有可用功能", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{}, + }, + }, + { + Name: "funstat_start", + Description: "获取 funstat BOT 的欢迎信息和使用说明", + InputSchema: map[string]any{ + "type": "object", + "properties": map[string]any{}, + }, + }, + } +} diff --git a/internal/cache/cache.go b/internal/cache/cache.go new file mode 100644 index 0000000..0c84b99 --- /dev/null +++ b/internal/cache/cache.go @@ -0,0 +1,70 @@ +package cache + +import ( + "sync" + "time" +) + +type entry struct { + value string + expires time.Time +} + +type Cache struct { + ttl time.Duration + mu sync.RWMutex + values map[string]entry +} + +func New(ttl time.Duration) *Cache { + if ttl <= 0 { + ttl = time.Hour + } + return &Cache{ + ttl: ttl, + values: make(map[string]entry), + } +} + +func (c *Cache) Get(key string) (string, bool) { + c.mu.RLock() + e, ok := c.values[key] + c.mu.RUnlock() + if !ok { + return "", false + } + if time.Now().After(e.expires) { + c.mu.Lock() + delete(c.values, key) + c.mu.Unlock() + return "", false + } + return e.value, true +} + +func (c *Cache) Set(key string, value string) { + c.mu.Lock() + c.values[key] = entry{ + value: value, + expires: time.Now().Add(c.ttl), + } + c.mu.Unlock() +} + +func (c *Cache) ClearExpired() int { + now := time.Now() + c.mu.Lock() + removed := 0 + for k, v := range c.values { + if now.After(v.expires) { + delete(c.values, k) + removed++ + } + } + c.mu.Unlock() + return removed +} + +func (c *Cache) TTL() time.Duration { + return c.ttl +} diff --git a/internal/ratelimit/limiter.go b/internal/ratelimit/limiter.go new file mode 100644 index 0000000..42dd131 --- /dev/null +++ b/internal/ratelimit/limiter.go @@ -0,0 +1,71 @@ +package ratelimit + +import ( + "context" + "sync" + "time" +) + +type Limiter struct { + maxRequests int + window time.Duration + + mu sync.Mutex + timestamps []time.Time +} + +func New(maxRequests int, window time.Duration) *Limiter { + if maxRequests <= 0 { + maxRequests = 1 + } + if window <= 0 { + window = time.Second + } + + return &Limiter{ + maxRequests: maxRequests, + window: window, + timestamps: make([]time.Time, 0, maxRequests), + } +} + +func (l *Limiter) Wait(ctx context.Context) error { + for { + l.mu.Lock() + now := time.Now() + + cutoff := now.Add(-l.window) + idx := 0 + for ; idx < len(l.timestamps); idx++ { + if l.timestamps[idx].After(cutoff) { + break + } + } + + if idx > 0 { + l.timestamps = append([]time.Time(nil), l.timestamps[idx:]...) + } + + if len(l.timestamps) < l.maxRequests { + l.timestamps = append(l.timestamps, now) + l.mu.Unlock() + return nil + } + + waitUntil := l.timestamps[0].Add(l.window) + waitDuration := time.Until(waitUntil) + l.mu.Unlock() + + if waitDuration <= 0 { + continue + } + + timer := time.NewTimer(waitDuration) + select { + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + case <-timer.C: + } + } +} diff --git a/internal/telegram/buttons.go b/internal/telegram/buttons.go new file mode 100644 index 0000000..a10271c --- /dev/null +++ b/internal/telegram/buttons.go @@ -0,0 +1,71 @@ +package telegram + +import ( + "fmt" + "strconv" + "strings" + "unicode" + + "github.com/gotd/td/tg" +) + +func findCallbackButton(message *tg.Message, keyword string) (*tg.KeyboardButtonCallback, error) { + markup, ok := message.ReplyMarkup.(*tg.ReplyInlineMarkup) + if !ok || len(markup.Rows) == 0 { + return nil, fmt.Errorf("message has no interactive buttons") + } + + normalizedKeyword := strings.ToLower(normalizeButtonText(keyword)) + available := make([]string, 0) + + for _, row := range markup.Rows { + for _, button := range row.Buttons { + callback, ok := button.(*tg.KeyboardButtonCallback) + if !ok { + continue + } + + text := callback.Text + normalized := strings.ToLower(normalizeButtonText(text)) + available = append(available, normalizeButtonText(text)) + + if strings.Contains(normalized, normalizedKeyword) { + return callback, nil + } + } + } + + return nil, fmt.Errorf("button containing '%s' not found (available: %s)", keyword, strings.Join(available, ", ")) +} + +func extractTotalPages(message *tg.Message) int { + markup, ok := message.ReplyMarkup.(*tg.ReplyInlineMarkup) + if !ok { + return 0 + } + + for _, row := range markup.Rows { + for _, button := range row.Buttons { + callback, ok := button.(*tg.KeyboardButtonCallback) + if !ok { + continue + } + + if strings.Contains(callback.Text, "⏭") { + digits := strings.Builder{} + for _, r := range normalizeButtonText(callback.Text) { + if unicode.IsDigit(r) { + digits.WriteRune(r) + } + } + if digits.Len() > 0 { + if value, err := strconv.Atoi(digits.String()); err == nil { + return value + } + } + } + } + } + + return 0 +} diff --git a/internal/telegram/client.go b/internal/telegram/client.go new file mode 100644 index 0000000..7374175 --- /dev/null +++ b/internal/telegram/client.go @@ -0,0 +1,497 @@ +package telegram + +import ( + "context" + "errors" + "fmt" + "net" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "golang.org/x/net/proxy" + + "github.com/gotd/td/session" + "github.com/gotd/td/telegram" + "github.com/gotd/td/telegram/dcs" + "github.com/gotd/td/telegram/message" + "github.com/gotd/td/tg" + + "funstatmcp/internal/cache" + "funstatmcp/internal/ratelimit" +) + +type Client struct { + cfg Config + limiter *ratelimit.Limiter + cache *cache.Cache + sessionPath string + sessionOnce sync.Once +} + +func New(cfg Config) (*Client, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + + sessionPath := cfg.SessionStorage + if sessionPath == "" { + home, err := os.UserHomeDir() + if err != nil { + return nil, fmt.Errorf("get home dir: %w", err) + } + sessionPath = filepath.Join(home, ".funstatmcp", "session.json") + } + + if err := os.MkdirAll(filepath.Dir(sessionPath), 0o700); err != nil { + return nil, fmt.Errorf("create session directory: %w", err) + } + + client := &Client{ + cfg: cfg, + limiter: ratelimit.New(cfg.RateLimitPerSecond(), cfg.RateLimitDuration()), + cache: cache.New(cfg.CacheDuration()), + sessionPath: sessionPath, + } + + if strings.TrimSpace(cfg.SessionString) != "" { + if err := client.writeStringSession(strings.TrimSpace(cfg.SessionString)); err != nil { + return nil, err + } + } + + return client, nil +} + +func (c *Client) writeStringSession(sessionStr string) error { + var result error + c.sessionOnce.Do(func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + data, err := session.TelethonSession(sessionStr) + if err != nil { + result = fmt.Errorf("decode telethon session: %w", err) + return + } + + loader := session.Loader{Storage: &session.FileStorage{Path: c.sessionPath}} + if err := loader.Save(ctx, data); err != nil { + result = fmt.Errorf("save session: %w", err) + return + } + }) + return result +} + +func (c *Client) createOptions() (telegram.Options, error) { + opts := telegram.Options{ + SessionStorage: &session.FileStorage{Path: c.sessionPath}, + NoUpdates: true, + } + + proxyCfg := c.cfg.Proxy + if proxyCfg != nil && proxyCfg.Host != "" && proxyCfg.Port > 0 { + address := fmt.Sprintf("%s:%d", proxyCfg.Host, proxyCfg.Port) + switch strings.ToLower(proxyCfg.Type) { + case "", "socks5", "socks": + var auth *proxy.Auth + if proxyCfg.Username != "" { + auth = &proxy.Auth{User: proxyCfg.Username, Password: proxyCfg.Password} + } + dialer, err := proxy.SOCKS5("tcp", address, auth, proxy.Direct) + if err != nil { + return opts, fmt.Errorf("create SOCKS5 proxy: %w", err) + } + contextDialer, ok := dialer.(proxy.ContextDialer) + if !ok { + contextDialer = &contextDialerAdapter{Dialer: dialer} + } + opts.Resolver = dcs.Plain(dcs.PlainOptions{Dial: contextDialer.DialContext}) + default: + return opts, fmt.Errorf("unsupported proxy type %q", proxyCfg.Type) + } + } + + return opts, nil +} + +type contextDialerAdapter struct { + Dialer proxy.Dialer +} + +func (a *contextDialerAdapter) DialContext(ctx context.Context, network, addr string) (net.Conn, error) { + type dialResult struct { + conn net.Conn + err error + } + + result := make(chan dialResult, 1) + go func() { + conn, err := a.Dialer.Dial(network, addr) + result <- dialResult{conn: conn, err: err} + }() + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-result: + return res.conn, res.err + } +} + +func (c *Client) withClient(ctx context.Context, fn func(ctx context.Context, api *tg.Client, sender *message.Sender) error) error { + opts, err := c.createOptions() + if err != nil { + return err + } + + client := telegram.NewClient(c.cfg.APIID, c.cfg.APIHash, opts) + + return client.Run(ctx, func(runCtx context.Context) error { + if err := c.ensureAuthorized(runCtx, client); err != nil { + return err + } + raw := tg.NewClient(client) + sender := message.NewSender(raw) + return fn(runCtx, raw, sender) + }) +} + +func (c *Client) withPeer(ctx context.Context, fn func(ctx context.Context, api *tg.Client, sender *message.Sender, peer tg.InputPeerClass, botID int64) error) error { + return c.withClient(ctx, func(runCtx context.Context, api *tg.Client, sender *message.Sender) error { + peer, botID, err := c.resolvePeer(runCtx, sender) + if err != nil { + return err + } + return fn(runCtx, api, sender, peer, botID) + }) +} + +func (c *Client) ensureAuthorized(ctx context.Context, client *telegram.Client) error { + status, err := client.Auth().Status(ctx) + if err != nil { + return fmt.Errorf("check auth status: %w", err) + } + if !status.Authorized { + return errors.New("telegram session is not authorized; provide TELEGRAM_SESSION_STRING") + } + return nil +} + +func (c *Client) resolvePeer(ctx context.Context, sender *message.Sender) (tg.InputPeerClass, int64, error) { + builder := sender.Resolve(c.cfg.BotUsername) + peer, err := builder.AsInputPeer(ctx) + if err != nil { + return nil, 0, fmt.Errorf("resolve bot peer: %w", err) + } + + inputUser, err := builder.AsInputUser(ctx) + if err != nil { + return nil, 0, fmt.Errorf("resolve bot user: %w", err) + } + + return peer, inputUser.UserID, nil +} + +func (c *Client) latestIncomingMessageID(ctx context.Context, api *tg.Client, peer tg.InputPeerClass, botID int64) (int, error) { + resp, err := api.MessagesGetHistory(ctx, &tg.MessagesGetHistoryRequest{ + Peer: peer, + Limit: 5, + }) + if err != nil { + return 0, err + } + + messages, err := extractMessages(resp) + if err != nil { + return 0, err + } + + last := 0 + for _, msg := range messages { + if isFromBot(msg, botID) && msg.ID > last { + last = msg.ID + } + } + return last, nil +} + +func (c *Client) waitForMessage(ctx context.Context, api *tg.Client, peer tg.InputPeerClass, botID int64, lastID int, timeout time.Duration) (*tg.Message, error) { + deadline := time.Now().Add(timeout) + for { + if err := ctx.Err(); err != nil { + return nil, err + } + + resp, err := api.MessagesGetHistory(ctx, &tg.MessagesGetHistoryRequest{ + Peer: peer, + Limit: 5, + }) + if err != nil { + return nil, err + } + + messages, err := extractMessages(resp) + if err != nil { + return nil, err + } + + for _, msg := range messages { + if msg.ID > lastID && isFromBot(msg, botID) { + return msg, nil + } + } + + if time.Now().After(deadline) { + return nil, fmt.Errorf("timeout waiting for bot response") + } + + if err := sleepWithContext(ctx, 500*time.Millisecond); err != nil { + return nil, err + } + } +} + +func (c *Client) SendCommand(ctx context.Context, command string, useCache bool) (string, error) { + cacheKey := fmt.Sprintf("cmd:%s", command) + if useCache { + if cached, ok := c.cache.Get(cacheKey); ok { + return cached, nil + } + } + + if err := c.limiter.Wait(ctx); err != nil { + return "", err + } + + var response string + err := c.withPeer(ctx, func(runCtx context.Context, api *tg.Client, sender *message.Sender, peer tg.InputPeerClass, botID int64) error { + lastID, err := c.latestIncomingMessageID(runCtx, api, peer, botID) + if err != nil { + return err + } + + if _, err := sender.Resolve(c.cfg.BotUsername).Text(runCtx, command); err != nil { + return fmt.Errorf("send command: %w", err) + } + + msg, err := c.waitForMessage(runCtx, api, peer, botID, lastID, 15*time.Second) + if err != nil { + return err + } + + response = strings.TrimSpace(msg.Message) + return nil + }) + if err != nil { + return "", err + } + + if useCache { + c.cache.Set(cacheKey, response) + } + + return response, nil +} + +func (c *Client) SendCommandMessage(ctx context.Context, command string, timeout time.Duration) (*tg.Message, error) { + if err := c.limiter.Wait(ctx); err != nil { + return nil, err + } + + var result *tg.Message + err := c.withPeer(ctx, func(runCtx context.Context, api *tg.Client, sender *message.Sender, peer tg.InputPeerClass, botID int64) error { + lastID, err := c.latestIncomingMessageID(runCtx, api, peer, botID) + if err != nil { + return err + } + + if _, err := sender.Resolve(c.cfg.BotUsername).Text(runCtx, command); err != nil { + return fmt.Errorf("send command: %w", err) + } + + msg, err := c.waitForMessage(runCtx, api, peer, botID, lastID, timeout) + if err != nil { + return err + } + + result = msg + return nil + }) + if err != nil { + return nil, err + } + + return result, nil +} + +func (c *Client) PressButton(ctx context.Context, msg *tg.Message, keyword string) (*tg.Message, error) { + if msg == nil { + return nil, errors.New("message cannot be nil") + } + + if err := c.limiter.Wait(ctx); err != nil { + return nil, err + } + + normalizedKeyword := strings.ToLower(keyword) + + var updated *tg.Message + err := c.withPeer(ctx, func(runCtx context.Context, api *tg.Client, sender *message.Sender, peer tg.InputPeerClass, botID int64) error { + button, err := findCallbackButton(msg, normalizedKeyword) + if err != nil { + return err + } + + req := &tg.MessagesGetBotCallbackAnswerRequest{ + Peer: peer, + MsgID: msg.ID, + } + if len(button.Data) > 0 { + req.SetData(button.Data) + } + + invoke := func() error { + _, err := api.MessagesGetBotCallbackAnswer(runCtx, req) + return err + } + + if err := invoke(); err != nil { + if wait, ok := telegram.AsFloodWait(err); ok { + if err := sleepWithContext(runCtx, wait+time.Second); err != nil { + return err + } + if err := invoke(); err != nil { + return fmt.Errorf("callback retry failed: %w", err) + } + } else { + return fmt.Errorf("press callback button: %w", err) + } + } + + if err := sleepWithContext(runCtx, 1200*time.Millisecond); err != nil { + return err + } + + resp, err := api.MessagesGetMessages(runCtx, []tg.InputMessageClass{ + &tg.InputMessageID{ID: msg.ID}, + }) + if err != nil { + return fmt.Errorf("fetch updated message: %w", err) + } + + refreshed, err := extractMessageByID(resp, msg.ID) + if err != nil { + return err + } + updated = refreshed + return nil + }) + if err != nil { + return nil, err + } + + return updated, nil +} + +func (c *Client) FetchUserMessages(ctx context.Context, identifier string, maxPages *int) (string, error) { + id := strings.TrimSpace(identifier) + if id == "" { + return "", errors.New("identifier cannot be empty") + } + + if !strings.HasPrefix(id, "/") { + if !strings.HasPrefix(id, "@") && !isNumericIdentifier(id) { + id = "@" + id + } + } + + command := id + if !strings.HasPrefix(command, "/") { + command = fmt.Sprintf("/user_info %s", id) + } + + base, err := c.SendCommandMessage(ctx, command, 20*time.Second) + if err != nil { + return "", err + } + + stage, err := c.PressButton(ctx, base, "messages") + if err != nil { + return "", err + } + + current, err := c.PressButton(ctx, stage, "all") + if err != nil { + return "", err + } + + pages := make([]string, 0) + seen := map[string]struct{}{} + currentPage := 1 + totalPages := extractTotalPages(current) + + var limit int + if maxPages != nil { + if *maxPages <= 0 { + return "", errors.New("maxPages must be greater than zero") + } + limit = *maxPages + } + + for { + text := strings.TrimSpace(current.Message) + if text != "" { + if _, ok := seen[text]; !ok { + header := fmt.Sprintf("第 %d 页", currentPage) + if totalPages > 0 { + header = fmt.Sprintf("%s/%d", header, totalPages) + } + entry := strings.Join([]string{header, "", text}, "\n") + pages = append(pages, entry) + seen[text] = struct{}{} + } + } + + if limit > 0 && currentPage >= limit { + break + } + + updated, err := c.PressButton(ctx, current, "➡") + if err != nil { + break + } + + newText := strings.TrimSpace(updated.Message) + if _, ok := seen[newText]; ok { + break + } + + current = updated + currentPage++ + } + + if len(pages) == 0 { + return fmt.Sprintf("未找到 %s 的消息记录。", identifier), nil + } + + summary := fmt.Sprintf("共收集 %d 页消息", len(pages)) + if totalPages > 0 { + summary = fmt.Sprintf("%s(存在 %d 页)", summary, totalPages) + } + + result := append([]string{summary, ""}, pages...) + return strings.Join(result, "\n\n"), nil +} + +func isNumericIdentifier(value string) bool { + for _, r := range value { + if r != '+' && (r < '0' || r > '9') { + return false + } + } + return value != "" +} diff --git a/internal/telegram/config.go b/internal/telegram/config.go new file mode 100644 index 0000000..83f8141 --- /dev/null +++ b/internal/telegram/config.go @@ -0,0 +1,60 @@ +package telegram + +import ( + "fmt" + "time" +) + +type ProxyConfig struct { + Type string + Host string + Port int + Username string + Password string +} + +type Config struct { + APIID int + APIHash string + BotUsername string + SessionString string + SessionStorage string + RateLimit int + RateLimitWindow time.Duration + CacheTTL time.Duration + Proxy *ProxyConfig +} + +func (c Config) Validate() error { + if c.APIID == 0 { + return fmt.Errorf("APIID must be provided") + } + if c.APIHash == "" { + return fmt.Errorf("APIHash must be provided") + } + if c.BotUsername == "" { + return fmt.Errorf("BotUsername must be provided") + } + return nil +} + +func (c Config) RateLimitPerSecond() int { + if c.RateLimit <= 0 { + return 18 + } + return c.RateLimit +} + +func (c Config) RateLimitDuration() time.Duration { + if c.RateLimitWindow <= 0 { + return time.Second + } + return c.RateLimitWindow +} + +func (c Config) CacheDuration() time.Duration { + if c.CacheTTL <= 0 { + return time.Hour + } + return c.CacheTTL +} diff --git a/internal/telegram/messages.go b/internal/telegram/messages.go new file mode 100644 index 0000000..724268d --- /dev/null +++ b/internal/telegram/messages.go @@ -0,0 +1,63 @@ +package telegram + +import ( + "fmt" + + "github.com/gotd/td/tg" +) + +func extractMessages(resp tg.MessagesMessagesClass) ([]*tg.Message, error) { + switch v := resp.(type) { + case *tg.MessagesMessages: + return filterMessages(v.Messages), nil + case *tg.MessagesMessagesSlice: + return filterMessages(v.Messages), nil + case *tg.MessagesChannelMessages: + return filterMessages(v.Messages), nil + case *tg.MessagesMessagesNotModified: + return nil, fmt.Errorf("messages not modified") + default: + return nil, fmt.Errorf("unsupported response type %T", resp) + } +} + +func filterMessages(values []tg.MessageClass) []*tg.Message { + result := make([]*tg.Message, 0, len(values)) + for _, m := range values { + if msg, ok := m.(*tg.Message); ok { + result = append(result, msg) + } + } + return result +} + +func extractMessageByID(resp tg.MessagesMessagesClass, id int) (*tg.Message, error) { + messages, err := extractMessages(resp) + if err != nil { + return nil, err + } + for _, msg := range messages { + if msg.ID == id { + return msg, nil + } + } + return nil, fmt.Errorf("message %d not found", id) +} + +func isFromBot(msg *tg.Message, botID int64) bool { + if msg == nil || msg.Out { + return false + } + + if peer, ok := msg.GetPeerID().(*tg.PeerUser); ok && peer.UserID == botID { + return true + } + + if fromClass, ok := msg.GetFromID(); ok { + if from, ok := fromClass.(*tg.PeerUser); ok && from.UserID == botID { + return true + } + } + + return false +} diff --git a/internal/telegram/normalize.go b/internal/telegram/normalize.go new file mode 100644 index 0000000..dc56c61 --- /dev/null +++ b/internal/telegram/normalize.go @@ -0,0 +1,45 @@ +package telegram + +import "strings" + +var buttonTextTranslations = map[rune]rune{ + 'ƒ': 'f', + 'Μ': 'M', + 'τ': 't', + 'ѕ': 's', + 'η': 'n', + 'Ғ': 'F', + 'α': 'a', + 'ο': 'o', + 'ᴜ': 'u', + 'о': 'o', + 'е': 'e', + 'с': 'c', + '℮': 'e', + 'Τ': 'T', + 'ρ': 'p', + 'Δ': 'D', + 'χ': 'x', + 'β': 'b', + 'λ': 'l', + 'γ': 'y', + 'Ν': 'N', + 'μ': 'm', + 'ψ': 'y', + 'Α': 'A', + 'Ρ': 'P', + 'С': 'C', + 'ё': 'e', + 'ł': 'l', + 'Ł': 'L', + 'ց': 'g', +} + +func normalizeButtonText(text string) string { + return strings.Map(func(r rune) rune { + if mapped, ok := buttonTextTranslations[r]; ok { + return mapped + } + return r + }, text) +} diff --git a/internal/telegram/util.go b/internal/telegram/util.go new file mode 100644 index 0000000..6c89c4b --- /dev/null +++ b/internal/telegram/util.go @@ -0,0 +1,22 @@ +package telegram + +import ( + "context" + "time" +) + +func sleepWithContext(ctx context.Context, d time.Duration) error { + if d <= 0 { + return nil + } + + timer := time.NewTimer(d) + defer timer.Stop() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} diff --git a/internal/transport/server.go b/internal/transport/server.go new file mode 100644 index 0000000..6f18aeb --- /dev/null +++ b/internal/transport/server.go @@ -0,0 +1,277 @@ +package transport + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "strings" + "sync" + "time" + + "funstatmcp/internal/app" +) + +type Server struct { + app *app.App + config app.Config + + subscribersMu sync.Mutex + subscribers map[int]chan []byte + nextID int +} + +func NewServer(appInstance *app.App, cfg app.Config) *Server { + return &Server{ + app: appInstance, + config: cfg, + subscribers: make(map[int]chan []byte), + } +} + +func (s *Server) Run(ctx context.Context) error { + mux := http.NewServeMux() + mux.HandleFunc("/sse", s.handleSSE) + mux.HandleFunc("/messages", s.handleMessages) + mux.HandleFunc("/health", s.handleHealth) + + server := &http.Server{ + Addr: fmt.Sprintf("%s:%d", s.config.Host, s.config.Port), + Handler: s.corsMiddleware(mux), + } + + go func() { + <-ctx.Done() + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := server.Shutdown(shutdownCtx); err != nil { + log.Printf("HTTP server shutdown error: %v", err) + } + }() + + log.Printf("Funstat MCP Go server listening on http://%s:%d", s.config.Host, s.config.Port) + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + return err + } + + return nil +} + +func (s *Server) corsMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, X-MCP-Session-ID") + w.Header().Set("Access-Control-Allow-Methods", "POST, GET, OPTIONS") + + if r.Method == http.MethodOptions { + w.WriteHeader(http.StatusNoContent) + return + } + + next.ServeHTTP(w, r) + }) +} + +func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _ = json.NewEncoder(w).Encode(map[string]any{ + "status": "ok", + "server": "funstat-mcp-go", + }) +} + +func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming unsupported", http.StatusInternalServerError) + return + } + + subscriber := make(chan []byte, 16) + id := s.addSubscriber(subscriber) + defer s.removeSubscriber(id) + + heartbeatTicker := time.NewTicker(15 * time.Second) + defer heartbeatTicker.Stop() + + ctx := r.Context() + + for { + select { + case <-ctx.Done(): + return + case <-heartbeatTicker.C: + fmt.Fprint(w, ": ping\n\n") + flusher.Flush() + case data := <-subscriber: + fmt.Fprintf(w, "event: message\n") + fmt.Fprintf(w, "data: %s\n\n", data) + flusher.Flush() + } + } +} + +func (s *Server) addSubscriber(ch chan []byte) int { + s.subscribersMu.Lock() + defer s.subscribersMu.Unlock() + id := s.nextID + s.nextID++ + s.subscribers[id] = ch + return id +} + +func (s *Server) removeSubscriber(id int) { + s.subscribersMu.Lock() + defer s.subscribersMu.Unlock() + delete(s.subscribers, id) +} + +func (s *Server) broadcast(payload []byte) { + s.subscribersMu.Lock() + defer s.subscribersMu.Unlock() + for _, ch := range s.subscribers { + select { + case ch <- payload: + default: + } + } +} + +type jsonRPCRequest struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` +} + +type jsonRPCResponse struct { + JSONRPC string `json:"jsonrpc"` + ID any `json:"id"` + Result any `json:"result,omitempty"` + Error *jsonRPCError `json:"error,omitempty"` +} + +type jsonRPCError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func (s *Server) handleMessages(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + var request jsonRPCRequest + if err := json.NewDecoder(r.Body).Decode(&request); err != nil { + writeError(w, http.StatusBadRequest, fmt.Errorf("invalid JSON: %w", err)) + return + } + + response := s.handleRequest(r.Context(), request) + + var buffer bytes.Buffer + if err := json.NewEncoder(&buffer).Encode(response); err != nil { + writeError(w, http.StatusInternalServerError, err) + return + } + + payload := bytes.TrimSpace(buffer.Bytes()) + s.broadcast(payload) + + w.Header().Set("Content-Type", "application/json") + w.Write(payload) +} + +func (s *Server) handleRequest(ctx context.Context, req jsonRPCRequest) jsonRPCResponse { + response := jsonRPCResponse{ + JSONRPC: "2.0", + ID: req.ID, + } + + if strings.TrimSpace(req.JSONRPC) != "2.0" { + response.Error = &jsonRPCError{Code: -32600, Message: "invalid jsonrpc version"} + return response + } + + switch req.Method { + case "initialize": + response.Result = s.initializeResult() + case "list_tools": + response.Result = map[string]any{ + "tools": app.ToolDefinitions(), + } + case "call_tool": + var payload struct { + Name string `json:"name"` + Arguments map[string]any `json:"arguments"` + } + if err := json.Unmarshal(req.Params, &payload); err != nil { + response.Error = &jsonRPCError{Code: -32602, Message: "invalid params"} + return response + } + + if payload.Arguments == nil { + payload.Arguments = make(map[string]any) + } + + result, err := s.app.CallTool(ctx, payload.Name, payload.Arguments) + if err != nil { + response.Result = map[string]any{ + "content": []map[string]string{ + { + "type": "text", + "text": fmt.Sprintf("❌ 错误: %s", err.Error()), + }, + }, + } + return response + } + + response.Result = map[string]any{ + "content": []map[string]string{ + { + "type": "text", + "text": result, + }, + }, + } + default: + response.Error = &jsonRPCError{Code: -32601, Message: "method not found"} + } + + return response +} + +func (s *Server) initializeResult() map[string]any { + return map[string]any{ + "protocolVersion": "2025-03-26", + "capabilities": map[string]any{ + "tools": map[string]any{}, + }, + "serverInfo": map[string]any{ + "name": "funstat-mcp-go", + "version": "1.0.0", + }, + } +} + +func writeError(w http.ResponseWriter, status int, err error) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(map[string]any{ + "error": err.Error(), + }) +} diff --git a/scripts/start_server.sh b/scripts/start_server.sh new file mode 100755 index 0000000..dfeeeb9 --- /dev/null +++ b/scripts/start_server.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" + +cd "$PROJECT_ROOT" + +echo "Starting Funstat MCP Go server..." +go run ./cmd/funstat-mcp "$@"