From 4dc8e35bd5f90e5bce2f401b471cdcf74326bd27 Mon Sep 17 00:00:00 2001 From: Galaxy Date: Tue, 11 Nov 2025 15:20:18 +0800 Subject: [PATCH] demo0.01 --- qflow/__init__.py | 5 ++ qflow/app.py | 72 +++++++++++++++++ qflow/config.py | 137 ++++++++++++++++++++++++++++++++ qflow/db.py | 111 ++++++++++++++++++++++++++ qflow/emulator.py | 89 +++++++++++++++++++++ qflow/models.py | 43 +++++++++++ qflow/qb.py | 69 +++++++++++++++++ qflow/scheduler.py | 189 +++++++++++++++++++++++++++++++++++++++++++++ qflow/uploader.py | 109 ++++++++++++++++++++++++++ 9 files changed, 824 insertions(+) create mode 100644 qflow/__init__.py create mode 100644 qflow/app.py create mode 100644 qflow/config.py create mode 100644 qflow/db.py create mode 100644 qflow/emulator.py create mode 100644 qflow/models.py create mode 100644 qflow/qb.py create mode 100644 qflow/scheduler.py create mode 100644 qflow/uploader.py diff --git a/qflow/__init__.py b/qflow/__init__.py new file mode 100644 index 0000000..aaa1214 --- /dev/null +++ b/qflow/__init__.py @@ -0,0 +1,5 @@ +"""Core package for the QFlow demo implementation.""" + +__all__ = [ + "app", +] diff --git a/qflow/app.py b/qflow/app.py new file mode 100644 index 0000000..fa9f779 --- /dev/null +++ b/qflow/app.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +import logging +import os +import threading +import time +from pathlib import Path + +from .config import AppConfig, load_config +from .db import UploadQueue +from .qb import QBClient +from .scheduler import Scheduler +from .uploader import Uploader + + +def _setup_logging(log_dir: Path) -> logging.Logger: + log_dir.mkdir(parents=True, exist_ok=True) + log_level = os.environ.get("QFLOW_LOG_LEVEL", "INFO").upper() + logging.basicConfig( + level=getattr(logging, log_level, logging.INFO), + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) + logger = logging.getLogger("qflow") + fh = logging.FileHandler(log_dir / "qflow.log") + fh.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")) + logger.addHandler(fh) + return logger + + +def ensure_paths(cfg: AppConfig) -> None: + cfg.paths.download_dir.mkdir(parents=True, exist_ok=True) + cfg.paths.log_dir.mkdir(parents=True, exist_ok=True) + cfg.paths.database_path.parent.mkdir(parents=True, exist_ok=True) + + +def run() -> None: + cfg = load_config() + ensure_paths(cfg) + logger = _setup_logging(cfg.paths.log_dir) + logger.info("Starting QFlow controller") + stop_event = threading.Event() + + queue = UploadQueue(cfg.paths.database_path) + qb = QBClient(cfg.qbittorrent, logger.getChild("qb")) + + scheduler = Scheduler(cfg, qb, queue, stop_event, logger.getChild("scheduler")) + scheduler.start() + + uploaders = [ + Uploader(cfg, queue, stop_event, logger, idx) + for idx in range(1, cfg.uploader.threads + 1) + ] + for worker in uploaders: + worker.start() + + logger.info("QFlow initialized with %s uploader threads", len(uploaders)) + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("Shutdown requested") + finally: + stop_event.set() + scheduler.join(timeout=5) + for worker in uploaders: + worker.join(timeout=5) + logger.info("Shutdown complete") + + +if __name__ == "__main__": # pragma: no cover + run() diff --git a/qflow/config.py b/qflow/config.py new file mode 100644 index 0000000..b994936 --- /dev/null +++ b/qflow/config.py @@ -0,0 +1,137 @@ +from __future__ import annotations + +import os +from dataclasses import dataclass +from pathlib import Path +from typing import Optional + + +@dataclass(slots=True) +class QBittorrentConfig: + host: str + port: int + username: str + password: str + use_https: bool + + +@dataclass(slots=True) +class SchedulerConfig: + poll_seconds: int + safe_margin_gb: int + anchors_per_batch: int + filler_limit: int + cold_start_small_limit: int + stall_percent: float + stall_minutes: int + stall_resume_minutes: int + + +@dataclass(slots=True) +class UploaderConfig: + remote: str + root_template: str + threads: int + rclone_path: str + rclone_config: Optional[Path] + transfers: int + checkers: int + bwlimit_up: Optional[str] + drive_chunk_size: Optional[str] + retries: int + low_level_retries: int + ionice_class: Optional[str] + ionice_level: Optional[str] + nice_level: Optional[str] + + +@dataclass(slots=True) +class AppPaths: + download_dir: Path + database_path: Path + log_dir: Path + + +@dataclass(slots=True) +class AppConfig: + qbittorrent: QBittorrentConfig + scheduler: SchedulerConfig + uploader: UploaderConfig + paths: AppPaths + + +def _env(key: str, default: Optional[str] = None) -> Optional[str]: + return os.environ.get(key, default) + + +def _env_int(key: str, default: int) -> int: + try: + return int(_env(key, str(default))) + except (TypeError, ValueError): + return default + + +def _env_float(key: str, default: float) -> float: + try: + return float(_env(key, str(default))) + except (TypeError, ValueError): + return default + + +def _env_path(key: str, default: Path) -> Path: + raw = _env(key) + return Path(raw).expanduser() if raw else default + + +def load_config() -> AppConfig: + state_dir = _env_path("QFLOW_STATE_DIR", Path("./state")) + log_dir = _env_path("QFLOW_LOG_DIR", state_dir / "logs") + db_path = _env_path("QFLOW_DB_PATH", state_dir / "qflow.db") + download_dir = _env_path("QFLOW_DOWNLOAD_DIR", Path("/downloads")) + + qb = QBittorrentConfig( + host=_env("QFLOW_QBIT_HOST", "http://qbittorrent:8080"), + port=_env_int("QFLOW_QBIT_PORT", 8080), + username=_env("QFLOW_QBIT_USER", "admin"), + password=_env("QFLOW_QBIT_PASS", "adminadmin"), + use_https=_env("QFLOW_QBIT_HTTPS", "false").lower() == "true", + ) + + scheduler = SchedulerConfig( + poll_seconds=_env_int("QFLOW_SCHED_POLL", 15), + safe_margin_gb=_env_int("QFLOW_SAFE_MARGIN_GB", 20), + anchors_per_batch=_env_int("QFLOW_ANCHORS", 1), + filler_limit=_env_int("QFLOW_FILLERS", 4), + cold_start_small_limit=_env_int("QFLOW_COLD_SMALL", 3), + stall_percent=_env_float("QFLOW_STALL_PCT", 85.0), + stall_minutes=_env_int("QFLOW_STALL_MIN", 5), + stall_resume_minutes=_env_int("QFLOW_STALL_RESUME_MIN", 2), + ) + + uploader = UploaderConfig( + remote=_env("QFLOW_REMOTE", "gcrypt:"), + root_template=_env("QFLOW_REMOTE_TEMPLATE", "{year}/{month:02d}"), + threads=_env_int("QFLOW_UPLOAD_THREADS", 2), + rclone_path=_env("QFLOW_RCLONE_BIN", "rclone"), + rclone_config=_env_path("QFLOW_RCLONE_CONFIG", Path("/config/rclone/rclone.conf")), + transfers=_env_int("QFLOW_RCLONE_TRANSFERS", 8), + checkers=_env_int("QFLOW_RCLONE_CHECKERS", 16), + bwlimit_up=_env("QFLOW_RCLONE_BWLIMIT", None), + drive_chunk_size=_env("QFLOW_DRIVE_CHUNK", "128M"), + retries=_env_int("QFLOW_RCLONE_RETRIES", 3), + low_level_retries=_env_int("QFLOW_RCLONE_LL_RETRIES", 10), + ionice_class=_env("QFLOW_IONICE_CLASS", "2"), + ionice_level=_env("QFLOW_IONICE_LEVEL", "7"), + nice_level=_env("QFLOW_NICE_LEVEL", "10"), + ) + + return AppConfig( + qbittorrent=qb, + scheduler=scheduler, + uploader=uploader, + paths=AppPaths( + download_dir=download_dir, + database_path=db_path, + log_dir=log_dir, + ), + ) diff --git a/qflow/db.py b/qflow/db.py new file mode 100644 index 0000000..744dba7 --- /dev/null +++ b/qflow/db.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +import sqlite3 +import threading +import time +from pathlib import Path +from typing import Iterable, List + +from .models import UploadTask + + +class UploadQueue: + def __init__(self, db_path: Path): + self.db_path = db_path + db_path.parent.mkdir(parents=True, exist_ok=True) + self._conn = sqlite3.connect(db_path, check_same_thread=False) + self._conn.row_factory = sqlite3.Row + self._lock = threading.Lock() + self._init_db() + + def _init_db(self) -> None: + with self._conn: + self._conn.execute( + """ + CREATE TABLE IF NOT EXISTS upload_queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + torrent_hash TEXT NOT NULL, + file_index INTEGER NOT NULL, + path TEXT NOT NULL UNIQUE, + size INTEGER NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + tries INTEGER NOT NULL DEFAULT 0, + last_error TEXT, + updated_at REAL NOT NULL + ) + """ + ) + + def enqueue(self, torrent_hash: str, file_index: int, path: Path, size: int) -> bool: + now = time.time() + with self._lock, self._conn: + cur = self._conn.execute( + """ + INSERT OR IGNORE INTO upload_queue (torrent_hash, file_index, path, size, updated_at) + VALUES (?, ?, ?, ?, ?) + """, + (torrent_hash, file_index, str(path), size, now), + ) + return cur.rowcount > 0 + + def lease(self, limit: int) -> List[UploadTask]: + now = time.time() + with self._lock, self._conn: + rows = list( + self._conn.execute( + "SELECT * FROM upload_queue WHERE status='pending' ORDER BY id LIMIT ?", + (limit,), + ) + ) + ids = [row["id"] for row in rows] + if ids: + placeholders = ",".join("?" for _ in ids) + self._conn.execute( + f"UPDATE upload_queue SET status='in_progress', updated_at=? WHERE id IN ({placeholders})", + (now, *ids), + ) + return [self._row_to_task(row) for row in rows] + + def mark_done(self, task_id: int) -> None: + with self._lock, self._conn: + self._conn.execute( + "UPDATE upload_queue SET status='done', updated_at=? WHERE id=?", + (time.time(), task_id), + ) + + def mark_failed(self, task_id: int, error: str) -> None: + now = time.time() + with self._lock, self._conn: + self._conn.execute( + """ + UPDATE upload_queue + SET status='pending', tries=tries+1, last_error=?, updated_at=? + WHERE id=? + """, + (error[:400], now, task_id), + ) + + def pending_count(self) -> int: + with self._lock: + cur = self._conn.execute("SELECT COUNT(*) FROM upload_queue WHERE status='pending'") + (count,) = cur.fetchone() + return int(count) + + def _row_to_task(self, row: sqlite3.Row) -> UploadTask: + return UploadTask( + id=row["id"], + torrent_hash=row["torrent_hash"], + file_index=row["file_index"], + path=Path(row["path"]), + size=row["size"], + tries=row["tries"], + last_error=row["last_error"], + ) + + def pending_items(self, limit: int = 50) -> Iterable[UploadTask]: + rows = self._conn.execute( + "SELECT * FROM upload_queue WHERE status='pending' ORDER BY id LIMIT ?", + (limit,), + ) + for row in rows: + yield self._row_to_task(row) diff --git a/qflow/emulator.py b/qflow/emulator.py new file mode 100644 index 0000000..7da65d8 --- /dev/null +++ b/qflow/emulator.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, List + +from .models import FileState + + +@dataclass +class FakeFile: + torrent_hash: str + torrent_name: str + file_index: int + name: str + size: int + downloaded: int + priority: int = 0 + save_path: Path = Path("/downloads") + + def to_state(self) -> FileState: + downloaded = min(self.downloaded, self.size) + remaining = max(self.size - downloaded, 0) + progress = downloaded / self.size if self.size else 1.0 + return FileState( + torrent_hash=self.torrent_hash, + torrent_name=self.torrent_name, + file_index=self.file_index, + name=self.name, + size=self.size, + downloaded=downloaded, + progress=progress, + priority=self.priority, + remaining=remaining, + save_path=self.save_path, + ) + + def clone(self) -> FakeFile: + return FakeFile( + torrent_hash=self.torrent_hash, + torrent_name=self.torrent_name, + file_index=self.file_index, + name=self.name, + size=self.size, + downloaded=self.downloaded, + priority=self.priority, + save_path=self.save_path, + ) + + +@dataclass +class FakeTorrent: + torrent_hash: str + name: str + files: List[FakeFile] + sequential: bool = True + + +class FakeQBClient: + """In-memory emulator that mimics the parts of qBittorrent the scheduler needs.""" + + def __init__(self, torrents: List[FakeTorrent]): + self.torrents: Dict[str, FakeTorrent] = {t.torrent_hash: t for t in torrents} + self.priority_calls: List[Dict] = [] + self.sequential_calls: List[Dict] = [] + + def fetch_file_states(self) -> List[FileState]: + states: List[FileState] = [] + for torrent in self.torrents.values(): + for file in torrent.files: + states.append(file.to_state()) + return states + + def set_priority(self, torrent_hash: str, file_ids: List[int], priority: int) -> None: + torrent = self.torrents[torrent_hash] + for file_id in file_ids: + for file in torrent.files: + if file.file_index == file_id: + file.priority = priority + break + self.priority_calls.append( + {"hash": torrent_hash, "file_ids": list(file_ids), "priority": priority} + ) + + def set_sequential(self, torrent_hashes: List[str], value: bool) -> None: + for torrent_hash in torrent_hashes: + torrent = self.torrents[torrent_hash] + torrent.sequential = value + self.sequential_calls.append({"hashes": list(torrent_hashes), "value": value}) diff --git a/qflow/models.py b/qflow/models.py new file mode 100644 index 0000000..b168c31 --- /dev/null +++ b/qflow/models.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Optional + + +@dataclass(slots=True) +class FileState: + """Represents a single file inside a torrent.""" + + torrent_hash: str + torrent_name: str + file_index: int + name: str + size: int + downloaded: int + progress: float + priority: int + remaining: int + save_path: Path + + def full_path(self) -> Path: + return (self.save_path / self.name).resolve() + + @property + def is_complete(self) -> bool: + return self.remaining <= 0 or self.progress >= 0.9999 + + @property + def is_downloading(self) -> bool: + return not self.is_complete and self.priority > 0 + + +@dataclass(slots=True) +class UploadTask: + id: int + torrent_hash: str + file_index: int + path: Path + size: int + tries: int + last_error: Optional[str] diff --git a/qflow/qb.py b/qflow/qb.py new file mode 100644 index 0000000..8131dcd --- /dev/null +++ b/qflow/qb.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Iterable, List + +import qbittorrentapi + +from .config import QBittorrentConfig +from .models import FileState + + +class QBClient: + def __init__(self, cfg: QBittorrentConfig, logger: logging.Logger): + self.cfg = cfg + self.log = logger + self.client = qbittorrentapi.Client( + host=cfg.host, + port=cfg.port, + username=cfg.username, + password=cfg.password, + VERIFY_WEBUI_CERTIFICATE=cfg.use_https, + ) + self.client.auth_log_in() + self.log.info("Connected to qBittorrent WebUI at %s", cfg.host) + + def fetch_file_states(self) -> List[FileState]: + torrents = self.client.torrents_info() + files: List[FileState] = [] + for torrent in torrents: + save_path = torrent.save_path + torrent_files = self.client.torrents_files(torrent_hash=torrent.hash) + for file_entry in torrent_files: + downloaded = int(file_entry.get("downloaded", file_entry["size"])) + remaining = max(int(file_entry["size"]) - downloaded, 0) + files.append( + FileState( + torrent_hash=torrent.hash, + torrent_name=torrent.name, + file_index=file_entry["index"], + name=file_entry["name"], + size=int(file_entry["size"]), + downloaded=downloaded, + progress=float(file_entry["progress"]), + priority=int(file_entry["priority"]), + remaining=remaining, + save_path=Path(save_path), + ) + ) + return files + + def set_priority(self, torrent_hash: str, file_ids: Iterable[int], priority: int) -> None: + ids = list(file_ids) + if not ids: + return + self.client.torrents_file_priority( + torrent_hash=torrent_hash, + file_ids=ids, + priority=priority, + ) + + def set_sequential(self, torrent_hashes: Iterable[str], value: bool) -> None: + hashes = list(dict.fromkeys(torrent_hashes)) + if not hashes: + return + self.client.torrents_set_sequential_download( + torrent_hashes="|".join(hashes), + value=value, + ) diff --git a/qflow/scheduler.py b/qflow/scheduler.py new file mode 100644 index 0000000..677fd22 --- /dev/null +++ b/qflow/scheduler.py @@ -0,0 +1,189 @@ +from __future__ import annotations + +import logging +import shutil +import threading +import time +from collections import defaultdict +from typing import TYPE_CHECKING, Dict, List, Sequence, Set, Tuple + +from .config import AppConfig +from .db import UploadQueue +from .models import FileState + +if TYPE_CHECKING: # pragma: no cover + from .qb import QBClient + + +class Scheduler(threading.Thread): + def __init__( + self, + cfg: AppConfig, + qb: "QBClient", + queue: UploadQueue, + stop_event: threading.Event, + logger: logging.Logger, + ): + super().__init__(daemon=True) + self.cfg = cfg + self.qb = qb + self.queue = queue + self.stop_event = stop_event + self.log = logger + self._progress_ts: Dict[Tuple[str, int], Tuple[float, float]] = {} + self._stall_until: Dict[str, float] = {} + self._sequential_state: Dict[str, bool] = {} + + def run(self) -> None: + poll = self.cfg.scheduler.poll_seconds + self.log.info("Scheduler loop started (poll=%ss)", poll) + while not self.stop_event.is_set(): + start = time.time() + try: + self._loop() + except Exception as exc: # pragma: no cover + self.log.exception("Scheduler iteration failed: %s", exc) + elapsed = time.time() - start + wait = max(0, poll - elapsed) + self.stop_event.wait(wait) + self.log.info("Scheduler loop stopped") + + # ---------------- internal helpers ----------------- + + def _loop(self) -> None: + files = self.qb.fetch_file_states() + if not files: + self.log.debug("No torrents returned by WebUI") + return + + self._handle_completed(files) + free_bytes = self._free_capacity() + queue_empty = self.queue.pending_count() == 0 + selection = self._select_files(files, free_bytes, queue_empty) + self._apply_priorities(files, selection) + self._manage_sequential(selection) + + def _handle_completed(self, files: Sequence[FileState]) -> None: + for file in files: + if not file.is_complete: + continue + target_path = file.full_path() + inserted = self.queue.enqueue( + file.torrent_hash, + file.file_index, + target_path, + file.size, + ) + if inserted: + if file.priority != 0: + self.qb.set_priority(file.torrent_hash, [file.file_index], 0) + self.log.info("Queued %s for upload", target_path) + + def _free_capacity(self) -> int: + usage = shutil.disk_usage(self.cfg.paths.download_dir) + safe_margin = self.cfg.scheduler.safe_margin_gb * (1024**3) + capacity = max(0, usage.free - safe_margin) + self.log.debug( + "Disk free=%s GiB (safe margin %s GiB, usable=%s GiB)", + round(usage.free / (1024**3), 2), + self.cfg.scheduler.safe_margin_gb, + round(capacity / (1024**3), 2), + ) + return capacity + + def _select_files( + self, + files: Sequence[FileState], + capacity_bytes: int, + queue_empty: bool, + ) -> List[FileState]: + candidates = [f for f in files if not f.is_complete] + if capacity_bytes <= 0 or not candidates: + return [] + + selection: List[FileState] = [] + budget = capacity_bytes + anchors = sorted(candidates, key=lambda f: f.remaining, reverse=True) + for file in anchors: + if file.remaining <= budget and len(selection) < self.cfg.scheduler.anchors_per_batch: + selection.append(file) + budget -= file.remaining + + fillers_taken = 0 + filler_limit = self.cfg.scheduler.filler_limit + if queue_empty: + filler_limit = max(filler_limit, self.cfg.scheduler.cold_start_small_limit) + for file in sorted(candidates, key=lambda f: f.remaining): + if file in selection: + continue + if file.remaining <= budget and fillers_taken < filler_limit: + selection.append(file) + budget -= file.remaining + fillers_taken += 1 + + return selection + + def _apply_priorities(self, files: Sequence[FileState], selection: Sequence[FileState]) -> None: + selected_keys = {(f.torrent_hash, f.file_index) for f in selection} + per_torrent: Dict[str, Dict[int, List[int]]] = defaultdict(lambda: defaultdict(list)) + for file in files: + if file.is_complete: + continue + key = (file.torrent_hash, file.file_index) + target_priority = 7 if key in selected_keys else 0 + if file.priority == target_priority: + continue + per_torrent[file.torrent_hash][target_priority].append(file.file_index) + + for torrent_hash, priorities in per_torrent.items(): + for priority, file_ids in priorities.items(): + self.qb.set_priority(torrent_hash, file_ids, priority) + self.log.debug( + "Set %s files on %s to priority %s", + len(file_ids), + torrent_hash, + priority, + ) + + def _manage_sequential(self, selection: Sequence[FileState]) -> None: + now = time.time() + selected_torrents: Set[str] = {file.torrent_hash for file in selection} + stall_threshold = self.cfg.scheduler.stall_percent / 100 + + for file in selection: + key = (file.torrent_hash, file.file_index) + progress_info = self._progress_ts.get(key) + if progress_info is None or file.progress - progress_info[0] > 0.001: + self._progress_ts[key] = (file.progress, now) + continue + last_progress, last_ts = progress_info + if file.progress >= stall_threshold and now - last_ts >= self.cfg.scheduler.stall_minutes * 60: + resume_after = now + self.cfg.scheduler.stall_resume_minutes * 60 + if self._stall_until.get(file.torrent_hash, 0) < resume_after: + self._stall_until[file.torrent_hash] = resume_after + self.qb.set_sequential([file.torrent_hash], False) + self._sequential_state[file.torrent_hash] = False + self.log.warning( + "Detected stall on %s:%s (progress %.2f%%) -> sequential OFF", + file.torrent_hash, + file.file_index, + file.progress * 100, + ) + + for torrent_hash in list(self._stall_until.keys()): + if now >= self._stall_until[torrent_hash]: + self._stall_until.pop(torrent_hash, None) + + for torrent_hash in selected_torrents: + if torrent_hash in self._stall_until: + continue + if not self._sequential_state.get(torrent_hash, False): + self.qb.set_sequential([torrent_hash], True) + self._sequential_state[torrent_hash] = True + self.log.debug("Sequential ON for %s", torrent_hash) + + for torrent_hash, enabled in list(self._sequential_state.items()): + if torrent_hash not in selected_torrents and enabled: + self.qb.set_sequential([torrent_hash], False) + self._sequential_state[torrent_hash] = False + self.log.debug("Sequential OFF for %s", torrent_hash) diff --git a/qflow/uploader.py b/qflow/uploader.py new file mode 100644 index 0000000..d036c2d --- /dev/null +++ b/qflow/uploader.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import logging +import subprocess +import threading +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import List + +from .config import AppConfig +from .db import UploadQueue +from .models import UploadTask + + +class Uploader(threading.Thread): + def __init__( + self, + cfg: AppConfig, + queue: UploadQueue, + stop_event: threading.Event, + logger: logging.Logger, + worker_id: int, + ): + super().__init__(daemon=True, name=f"uploader-{worker_id}") + self.cfg = cfg + self.queue = queue + self.stop_event = stop_event + self.log = logger.getChild(self.name) + self.worker_id = worker_id + + def run(self) -> None: + self.log.info("Uploader thread ready") + while not self.stop_event.is_set(): + tasks = self.queue.lease(1) + if not tasks: + self.stop_event.wait(3) + continue + for task in tasks: + self._process(task) + self.log.info("Uploader thread exiting") + + def _process(self, task: UploadTask) -> None: + if not task.path.exists(): + self.log.warning("File missing, marking as done: %s", task.path) + self.queue.mark_done(task.id) + return + remote_path = self._build_remote_path(task) + cmd = self._build_command(task.path, remote_path) + self.log.info("[%s] Uploading %s -> %s", task.id, task.path, remote_path) + try: + subprocess.run(cmd, check=True, capture_output=True) + except subprocess.CalledProcessError as exc: + stderr = exc.stderr.decode() if exc.stderr else str(exc) + self.log.error("[%s] Upload failed: %s", task.id, stderr.strip()) + self.queue.mark_failed(task.id, stderr.strip()) + time.sleep(2) + return + self.queue.mark_done(task.id) + self.log.info("[%s] Upload finished", task.id) + + def _build_command(self, local_path: Path, remote_path: str) -> List[str]: + args: List[str] = [self.cfg.uploader.rclone_path, "move", str(local_path), remote_path] + if self.cfg.uploader.rclone_config: + args += ["--config", str(self.cfg.uploader.rclone_config)] + args += [ + "--transfers", + str(self.cfg.uploader.transfers), + "--checkers", + str(self.cfg.uploader.checkers), + "--retries", + str(self.cfg.uploader.retries), + "--low-level-retries", + str(self.cfg.uploader.low_level_retries), + ] + if self.cfg.uploader.bwlimit_up: + args += ["--bwlimit", self.cfg.uploader.bwlimit_up] + if self.cfg.uploader.drive_chunk_size: + args += ["--drive-chunk-size", self.cfg.uploader.drive_chunk_size] + nice_level = self.cfg.uploader.nice_level + if nice_level: + args = ["nice", "-n", str(nice_level), *args] + ionice_class = self.cfg.uploader.ionice_class + if ionice_class: + args = [ + "ionice", + "-c", + str(ionice_class), + "-n", + str(self.cfg.uploader.ionice_level or 7), + *args, + ] + return args + + def _build_remote_path(self, task: UploadTask) -> str: + now = datetime.now(timezone.utc) + folder = self.cfg.uploader.root_template.format( + year=now.year, + month=now.month, + day=now.day, + ).strip("/") + base = self.cfg.uploader.remote.rstrip("/") + if base.endswith(":"): + remote_root = base + else: + remote_root = base + "/" + if folder: + remote_root = f"{remote_root}{folder}/" + return f"{remote_root}{task.path.name}"