129 lines
4.4 KiB
Python
129 lines
4.4 KiB
Python
import asyncio
import json
import logging
import os
import tempfile
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import List

from .config import Settings
from .storage import StorageManager
|
|
|
|
|
|
# Module-level logger for the uploader; the logger name is a fixed string
# ("funstat_uploader") rather than being derived from the module path.
logger = logging.getLogger("funstat_uploader")
|
|
|
|
|
|
class RemoteUploader:
    """Periodically push unsynced results to a remote host over SSH.

    A background task fetches a batch of unsynced results from ``storage``,
    writes them to a temporary JSON file, copies that file to the remote
    target directory with ``scp`` (authenticated through ``sshpass``), and
    then marks the rows as synced or failed depending on the outcome.
    """

    def __init__(self, storage: "StorageManager", settings: "Settings"):
        """Store collaborators and derive the effective upload parameters.

        Args:
            storage: Provides ``fetch_unsynced_results``, ``mark_synced`` and
                ``mark_failed_sync``.
            settings: Provides the ``remote_upload_*`` and ``remote_ssh_*``
                values read below.
        """
        self.storage = storage
        self.settings = settings
        # bool() matters: a bare ``and`` chain evaluates to its last operand,
        # which would leave the SSH password stored in ``self.enabled``.
        self.enabled = bool(
            settings.remote_upload_enabled
            and settings.remote_ssh_host
            and settings.remote_ssh_user
            and settings.remote_ssh_password
        )
        # Lower bounds: never poll more often than every 30 seconds and never
        # request fewer than 10 rows per batch.
        self.interval = max(settings.remote_upload_interval, 30)
        self.batch_size = max(settings.remote_upload_batch_size, 10)
        self._task: asyncio.Task | None = None
        self._ensure_task: asyncio.Task | None = None

    async def start(self):
        """Create the remote directory and launch the background upload loop.

        Does nothing when uploading is disabled or the loop was already
        started.
        """
        if not self.enabled or self._task:
            return

        await self._ensure_remote_dir()
        self._task = asyncio.create_task(self._run_loop())

    async def _run_loop(self):
        """Process one batch, sleep ``self.interval`` seconds, repeat forever."""
        while True:
            try:
                await self._process_batch()
            except Exception as exc:
                # Top-level boundary of the background task: log and keep
                # looping so one bad batch does not stop future uploads.
                logger.error("远程上传任务异常: %s", exc, exc_info=exc)
            await asyncio.sleep(self.interval)

    async def _process_batch(self):
        """Upload one batch of unsynced results and record the outcome."""
        results = await self.storage.fetch_unsynced_results(self.batch_size)
        if not results:
            return

        batch_id = uuid.uuid4().hex
        # Collect ids before any I/O so a malformed row fails fast instead of
        # after the file has already been uploaded.
        ids = [item["id"] for item in results]
        tmp_path = await self._write_payload(batch_id, results)
        try:
            uploaded = await self._upload_file(tmp_path, batch_id)
            if uploaded:
                await self.storage.mark_synced(ids, batch_id)
                logger.info("成功同步 %s 条记录到远程服务器 (batch=%s)", len(ids), batch_id)
            else:
                await self.storage.mark_failed_sync(ids)
                logger.warning("同步失败,已记录失败次数 (batch=%s)", batch_id)
        finally:
            # The temporary payload is removed whether or not the upload worked.
            tmp_path.unlink(missing_ok=True)

    async def _write_payload(self, batch_id: str, results: List[dict]) -> Path:
        """Serialize ``results`` to a JSON file in the system temp directory.

        Args:
            batch_id: Identifier embedded in the payload and in the file name.
            results: Rows to upload; they must be JSON-serializable.

        Returns:
            Path of the file that was written.
        """
        file_path = Path(tempfile.gettempdir()) / f"funstat_batch_{batch_id}.json"
        payload = {
            "batch_id": batch_id,
            "total": len(results),
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "results": results,
        }

        # ensure_ascii=False emits raw non-ASCII characters, so the encoding
        # must be explicit; the locale default may be unable to encode them.
        file_path.write_text(
            json.dumps(payload, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        return file_path

    async def _run_with_password(self, *args: str) -> tuple[int, str]:
        """Run ``args`` under ``sshpass`` and return ``(returncode, message)``.

        The command is executed without a local shell, so no argument needs
        quoting, and the password is passed through the ``SSHPASS``
        environment variable (``sshpass -e``) instead of the command line.
        ``message`` is the stripped stderr, or stdout when stderr is empty.
        Failure to launch the process is reported as return code 127.
        """
        env = {**os.environ, "SSHPASS": str(self.settings.remote_ssh_password)}
        try:
            process = await asyncio.create_subprocess_exec(
                "sshpass",
                "-e",
                *args,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=env,
            )
        except OSError as exc:
            # e.g. sshpass is not installed; a shell would have reported this
            # as a non-zero exit status rather than raising.
            return 127, str(exc)

        stdout, stderr = await process.communicate()
        message = (
            stderr.decode(errors="replace").strip()
            or stdout.decode(errors="replace").strip()
        )
        return process.returncode, message

    async def _ensure_remote_dir(self):
        """Create the remote target directory; log a warning on failure."""
        if not self.enabled:
            return

        target_dir = self.settings.remote_ssh_target
        # NOTE(review): StrictHostKeyChecking=no skips host-key verification
        # and permits man-in-the-middle attacks; kept for compatibility.
        returncode, message = await self._run_with_password(
            "ssh",
            "-o",
            "StrictHostKeyChecking=no",
            f"{self.settings.remote_ssh_user}@{self.settings.remote_ssh_host}",
            # Interpreted by the remote shell; intentionally left unquoted so
            # a target such as ``~/data`` is still expanded remotely.
            f"mkdir -p {target_dir}",
        )
        if returncode != 0:
            logger.warning("创建远程目录失败: %s", message)

    async def _upload_file(self, file_path: Path, batch_id: str) -> bool:
        """Copy ``file_path`` to the remote target as ``<batch_id>.json``.

        Args:
            file_path: Local file to upload.
            batch_id: Batch identifier used as the remote file name.

        Returns:
            True when ``scp`` exits with status 0; False when uploading is
            disabled or the copy fails.
        """
        if not self.enabled:
            return False

        remote_name = f"{batch_id}.json"
        destination = (
            f"{self.settings.remote_ssh_user}@{self.settings.remote_ssh_host}:"
            f"{self.settings.remote_ssh_target}/{remote_name}"
        )
        # NOTE(review): StrictHostKeyChecking=no skips host-key verification;
        # kept for compatibility with existing deployments.
        returncode, message = await self._run_with_password(
            "scp",
            "-o",
            "StrictHostKeyChecking=no",
            str(file_path),
            destination,
        )
        if returncode != 0:
            logger.error("上传批次 %s 失败: %s", batch_id, message)
            return False

        return True
|