from __future__ import annotations import logging import subprocess import threading import time from datetime import datetime, timezone from pathlib import Path from typing import List from .config import AppConfig from .db import UploadQueue from .models import UploadTask class Uploader(threading.Thread): def __init__( self, cfg: AppConfig, queue: UploadQueue, stop_event: threading.Event, logger: logging.Logger, worker_id: int, ): super().__init__(daemon=True, name=f"uploader-{worker_id}") self.cfg = cfg self.queue = queue self.stop_event = stop_event self.log = logger.getChild(self.name) self.worker_id = worker_id def run(self) -> None: self.log.info("Uploader thread ready") while not self.stop_event.is_set(): tasks = self.queue.lease(1) if not tasks: self.stop_event.wait(3) continue for task in tasks: self._process(task) self.log.info("Uploader thread exiting") def _process(self, task: UploadTask) -> None: if not task.path.exists(): self.log.warning("File missing, marking as done: %s", task.path) self.queue.mark_done(task.id) return remote_path = self._build_remote_path(task) cmd = self._build_command(task.path, remote_path) self.log.info("[%s] Uploading %s -> %s", task.id, task.path, remote_path) try: subprocess.run(cmd, check=True, capture_output=True) except subprocess.CalledProcessError as exc: stderr = exc.stderr.decode() if exc.stderr else str(exc) self.log.error("[%s] Upload failed: %s", task.id, stderr.strip()) self.queue.mark_failed(task.id, stderr.strip()) time.sleep(2) return self.queue.mark_done(task.id) self.log.info("[%s] Upload finished", task.id) def _build_command(self, local_path: Path, remote_path: str) -> List[str]: args: List[str] = [self.cfg.uploader.rclone_path, "move", str(local_path), remote_path] if self.cfg.uploader.rclone_config: args += ["--config", str(self.cfg.uploader.rclone_config)] args += [ "--transfers", str(self.cfg.uploader.transfers), "--checkers", str(self.cfg.uploader.checkers), "--retries", str(self.cfg.uploader.retries), "--low-level-retries", str(self.cfg.uploader.low_level_retries), ] if self.cfg.uploader.bwlimit_up: args += ["--bwlimit", self.cfg.uploader.bwlimit_up] if self.cfg.uploader.drive_chunk_size: args += ["--drive-chunk-size", self.cfg.uploader.drive_chunk_size] nice_level = self.cfg.uploader.nice_level if nice_level: args = ["nice", "-n", str(nice_level), *args] ionice_class = self.cfg.uploader.ionice_class if ionice_class: args = [ "ionice", "-c", str(ionice_class), "-n", str(self.cfg.uploader.ionice_level or 7), *args, ] return args def _build_remote_path(self, task: UploadTask) -> str: now = datetime.now(timezone.utc) folder = self.cfg.uploader.root_template.format( year=now.year, month=now.month, day=now.day, ).strip("/") base = self.cfg.uploader.remote.rstrip("/") if base.endswith(":"): remote_root = base else: remote_root = base + "/" if folder: remote_root = f"{remote_root}{folder}/" return f"{remote_root}{task.path.name}"