import asyncio import json import logging import tempfile import uuid from datetime import datetime, timezone from pathlib import Path from typing import List from .config import Settings from .storage import StorageManager logger = logging.getLogger("funstat_uploader") class RemoteUploader: def __init__(self, storage: StorageManager, settings: Settings): self.storage = storage self.settings = settings self.enabled = ( settings.remote_upload_enabled and settings.remote_ssh_host and settings.remote_ssh_user and settings.remote_ssh_password ) self.interval = max(settings.remote_upload_interval, 30) self.batch_size = max(settings.remote_upload_batch_size, 10) self._task: asyncio.Task | None = None self._ensure_task: asyncio.Task | None = None async def start(self): if not self.enabled or self._task: return await self._ensure_remote_dir() self._task = asyncio.create_task(self._run_loop()) async def _run_loop(self): while True: try: await self._process_batch() except Exception as exc: logger.error("远程上传任务异常: %s", exc, exc_info=exc) await asyncio.sleep(self.interval) async def _process_batch(self): results = await self.storage.fetch_unsynced_results(self.batch_size) if not results: return batch_id = uuid.uuid4().hex tmp_path = await self._write_payload(batch_id, results) try: uploaded = await self._upload_file(tmp_path, batch_id) ids = [item["id"] for item in results] if uploaded: await self.storage.mark_synced(ids, batch_id) logger.info("成功同步 %s 条记录到远程服务器 (batch=%s)", len(ids), batch_id) else: await self.storage.mark_failed_sync(ids) logger.warning("同步失败,已记录失败次数 (batch=%s)", batch_id) finally: if tmp_path.exists(): tmp_path.unlink(missing_ok=True) async def _write_payload(self, batch_id: str, results: List[dict]) -> Path: tmp_dir = Path(tempfile.gettempdir()) file_path = tmp_dir / f"funstat_batch_{batch_id}.json" payload = { "batch_id": batch_id, "total": len(results), "generated_at": datetime.now(timezone.utc).isoformat(), "results": results, } file_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2)) return file_path async def _ensure_remote_dir(self): if not self.enabled: return target_dir = self.settings.remote_ssh_target command = ( f"sshpass -p '{self.settings.remote_ssh_password}' " f"ssh -o StrictHostKeyChecking=no " f"{self.settings.remote_ssh_user}@{self.settings.remote_ssh_host} " f"'mkdir -p {target_dir}'" ) process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode != 0: logger.warning( "创建远程目录失败: %s", stderr.decode().strip() or stdout.decode().strip() ) async def _upload_file(self, file_path: Path, batch_id: str) -> bool: if not self.enabled: return False remote_name = f"{batch_id}.json" command = ( f"sshpass -p '{self.settings.remote_ssh_password}' " f"scp -o StrictHostKeyChecking=no {file_path} " f"{self.settings.remote_ssh_user}@{self.settings.remote_ssh_host}:" f"{self.settings.remote_ssh_target}/{remote_name}" ) process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode != 0: logger.error( "上传批次 %s 失败: %s", batch_id, stderr.decode().strip() or stdout.decode().strip() ) return False return True