"""Entry point for the QFlow controller.

Loads the configuration, sets up logging, starts the scheduler and the
uploader workers, and keeps the main thread alive until shutdown.
"""

from __future__ import annotations

import logging
import os
import threading
import time  # noqa: F401 - file-level import retained on purpose
from pathlib import Path

from .config import AppConfig, load_config
from .db import UploadQueue
from .qb import QBClient
from .scheduler import Scheduler
from .uploader import Uploader

# Shared by the root (console) configuration and the file handler.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"


def _setup_logging(log_dir: Path) -> logging.Logger:
    """Configure logging and return the ``qflow`` logger.

    The root logger is configured through ``logging.basicConfig`` using the
    level named by the ``QFLOW_LOG_LEVEL`` environment variable (default
    ``INFO``).  A file handler writing to ``<log_dir>/qflow.log`` is attached
    to the ``qflow`` logger.

    Args:
        log_dir: Directory that receives ``qflow.log``; created if missing.

    Returns:
        The ``qflow`` logger.
    """
    log_dir.mkdir(parents=True, exist_ok=True)

    level_name = os.environ.get("QFLOW_LOG_LEVEL", "INFO").upper()
    level = getattr(logging, level_name, None)
    # ``logging`` also exposes upper-case non-level attributes (for example
    # ``BASIC_FORMAT``); anything that is not a numeric level falls back to
    # INFO instead of making ``basicConfig`` raise.
    if not isinstance(level, int):
        level = logging.INFO
    logging.basicConfig(level=level, format=_LOG_FORMAT)

    logger = logging.getLogger("qflow")
    log_path = os.path.abspath(log_dir / "qflow.log")
    # Attach the file handler only once so repeated calls do not write every
    # record to the file several times.
    already_attached = any(
        isinstance(handler, logging.FileHandler)
        and handler.baseFilename == log_path
        for handler in logger.handlers
    )
    if not already_attached:
        file_handler = logging.FileHandler(log_path, encoding="utf-8")
        file_handler.setFormatter(logging.Formatter(_LOG_FORMAT))
        logger.addHandler(file_handler)
    return logger


def ensure_paths(cfg: AppConfig) -> None:
    """Create the directories the application writes to.

    Creates the download directory, the log directory and the parent
    directory of the database file; existing directories are left alone.

    Args:
        cfg: Application configuration providing ``paths.download_dir``,
            ``paths.log_dir`` and ``paths.database_path``.
    """
    required_dirs = (
        cfg.paths.download_dir,
        cfg.paths.log_dir,
        cfg.paths.database_path.parent,
    )
    for directory in required_dirs:
        directory.mkdir(parents=True, exist_ok=True)


def run() -> None:
    """Start the scheduler and uploader workers and block until shutdown.

    Shutdown happens on ``KeyboardInterrupt``, when the shared stop event is
    set, or when start-up fails part-way.  In every case the stop event is
    set and each worker that was actually started is joined with a
    five-second timeout.  Exceptions other than ``KeyboardInterrupt``
    propagate to the caller after that cleanup.
    """
    cfg = load_config()
    ensure_paths(cfg)
    logger = _setup_logging(cfg.paths.log_dir)
    logger.info("Starting QFlow controller")

    stop_event = threading.Event()
    queue = UploadQueue(cfg.paths.database_path)
    qb = QBClient(cfg.qbittorrent, logger.getChild("qb"))
    scheduler = Scheduler(
        cfg, qb, queue, stop_event, logger.getChild("scheduler")
    )

    # Only workers whose start() returned are joined during cleanup; this
    # avoids calling join() on something that never started.
    started = []
    try:
        scheduler.start()
        started.append(scheduler)

        uploaders = [
            Uploader(cfg, queue, stop_event, logger, idx)
            for idx in range(1, cfg.uploader.threads + 1)
        ]
        for worker in uploaders:
            worker.start()
            started.append(worker)
        logger.info(
            "QFlow initialized with %s uploader threads", len(uploaders)
        )

        # Idle until interrupted; waiting on the event (rather than plain
        # sleeping) also lets the main thread leave once the event is set.
        while not stop_event.wait(1):
            pass
    except KeyboardInterrupt:
        logger.info("Shutdown requested")
    finally:
        stop_event.set()
        for worker in started:
            worker.join(timeout=5)
        logger.info("Shutdown complete")


if __name__ == "__main__":  # pragma: no cover
    run()