QFlow/qflow/scheduler.py
2025-11-11 15:20:18 +08:00

190 lines
7.4 KiB
Python

from __future__ import annotations
import logging
import shutil
import threading
import time
from collections import defaultdict
from typing import TYPE_CHECKING, Dict, List, Sequence, Set, Tuple
from .config import AppConfig
from .db import UploadQueue
from .models import FileState
if TYPE_CHECKING: # pragma: no cover
from .qb import QBClient
class Scheduler(threading.Thread):
    """Daemon thread that periodically decides which torrent files to download.

    Every iteration (see ``_loop``) it:

    1. fetches the per-file states from the ``QBClient``;
    2. hands completed files to the upload queue;
    3. computes how many bytes may still be written to the download directory;
    4. selects a batch of unfinished files that fits into that budget;
    5. pushes the resulting file priorities to the client;
    6. toggles sequential download per torrent, backing off on stalls.

    The loop runs until ``stop_event`` is set.
    """

    # Priority values passed to ``qb.set_priority``: files outside the current
    # selection get ``_PRIORITY_SKIP``, selected files ``_PRIORITY_SELECTED``.
    # NOTE(review): presumably qBittorrent's "do not download" / "maximal"
    # priorities -- confirm against QBClient.
    _PRIORITY_SKIP = 0
    _PRIORITY_SELECTED = 7

    # Smallest increase of ``FileState.progress`` that counts as real progress
    # for stall detection.
    _PROGRESS_EPSILON = 0.001

    def __init__(
        self,
        cfg: AppConfig,
        qb: "QBClient",
        queue: UploadQueue,
        stop_event: threading.Event,
        logger: logging.Logger,
    ):
        """Store the collaborators and initialise the in-memory tracking state.

        Args:
            cfg: Application configuration; ``cfg.scheduler`` and
                ``cfg.paths.download_dir`` are read by this class.
            qb: Client used to fetch file states and to set file priorities
                and the sequential-download flag.
            queue: Upload queue that receives completed files.
            stop_event: Event that terminates ``run`` once set.
            logger: Logger used for all scheduler messages.
        """
        super().__init__(daemon=True)
        self.cfg = cfg
        self.qb = qb
        self.queue = queue
        self.stop_event = stop_event
        self.log = logger
        # (torrent_hash, file_index) -> (last recorded progress, monotonic
        # timestamp at which that progress was recorded).
        self._progress_ts: Dict[Tuple[str, int], Tuple[float, float]] = {}
        # torrent_hash -> monotonic time until which sequential mode stays off.
        self._stall_until: Dict[str, float] = {}
        # torrent_hash -> sequential flag as last set by this scheduler.
        self._sequential_state: Dict[str, bool] = {}

    def run(self) -> None:
        """Run ``_loop`` every ``poll_seconds`` until ``stop_event`` is set.

        A failing iteration is logged with its traceback and does not end the
        thread; the next iteration is attempted after the usual wait.
        """
        poll = self.cfg.scheduler.poll_seconds
        self.log.info("Scheduler loop started (poll=%ss)", poll)
        while not self.stop_event.is_set():
            # Monotonic clock: the iteration duration must not be distorted by
            # wall-clock adjustments (NTP step, manual change).
            started = time.monotonic()
            try:
                self._loop()
            except Exception as exc:  # pragma: no cover
                self.log.exception("Scheduler iteration failed: %s", exc)
            elapsed = time.monotonic() - started
            # Waiting on the event (instead of sleeping) lets a stop request
            # interrupt the pause immediately.
            self.stop_event.wait(max(0, poll - elapsed))
        self.log.info("Scheduler loop stopped")

    # ---------------- internal helpers -----------------
    def _loop(self) -> None:
        """Execute one scheduling iteration; a no-op when no files are returned."""
        files = self.qb.fetch_file_states()
        if not files:
            self.log.debug("No torrents returned by WebUI")
            return
        self._handle_completed(files)
        free_bytes = self._free_capacity()
        queue_empty = self.queue.pending_count() == 0
        selection = self._select_files(files, free_bytes, queue_empty)
        self._apply_priorities(files, selection)
        self._manage_sequential(selection)

    def _handle_completed(self, files: Sequence[FileState]) -> None:
        """Enqueue every completed file for upload.

        When ``queue.enqueue`` reports a truthy result (presumably "newly
        inserted" -- confirm against ``UploadQueue``), the file's priority is
        reset to ``_PRIORITY_SKIP`` if it is not already, and the enqueue is
        logged.
        """
        for file in files:
            if not file.is_complete:
                continue
            target_path = file.full_path()
            inserted = self.queue.enqueue(
                file.torrent_hash,
                file.file_index,
                target_path,
                file.size,
            )
            if inserted:
                if file.priority != self._PRIORITY_SKIP:
                    self.qb.set_priority(
                        file.torrent_hash, [file.file_index], self._PRIORITY_SKIP
                    )
                self.log.info("Queued %s for upload", target_path)

    def _free_capacity(self) -> int:
        """Return the usable bytes in the download directory.

        This is the free space reported by ``shutil.disk_usage`` minus the
        configured safety margin (``safe_margin_gb`` GiB), never negative.
        """
        gib = 1024**3
        usage = shutil.disk_usage(self.cfg.paths.download_dir)
        safe_margin = self.cfg.scheduler.safe_margin_gb * gib
        # ``safe_margin_gb`` may be fractional; truncate so the result honours
        # the declared ``int`` return type.
        capacity = max(0, int(usage.free - safe_margin))
        self.log.debug(
            "Disk free=%s GiB (safe margin %s GiB, usable=%s GiB)",
            round(usage.free / gib, 2),
            self.cfg.scheduler.safe_margin_gb,
            round(capacity / gib, 2),
        )
        return capacity

    def _select_files(
        self,
        files: Sequence[FileState],
        capacity_bytes: int,
        queue_empty: bool,
    ) -> List[FileState]:
        """Pick the unfinished files to download within ``capacity_bytes``.

        First up to ``anchors_per_batch`` "anchors" are taken, largest
        ``remaining`` first, skipping files that do not fit the remaining
        budget. Then up to ``filler_limit`` "fillers" are added, smallest
        ``remaining`` first. When ``queue_empty`` is true the filler limit is
        raised to at least ``cold_start_small_limit``.

        Args:
            files: All known file states; completed ones are ignored.
            capacity_bytes: Byte budget for the sum of ``remaining``.
            queue_empty: Whether the upload queue currently has no pending
                entries.

        Returns:
            The selected files, anchors first, then fillers; empty when there
            is no capacity or no unfinished file.
        """
        candidates = [f for f in files if not f.is_complete]
        if capacity_bytes <= 0 or not candidates:
            return []

        sched = self.cfg.scheduler
        selection: List[FileState] = []
        # Keys of already selected files: O(1) membership instead of scanning
        # the selection list for every filler candidate.
        chosen: Set[Tuple[str, int]] = set()
        budget = capacity_bytes

        anchor_limit = sched.anchors_per_batch
        for file in sorted(candidates, key=lambda f: f.remaining, reverse=True):
            if len(selection) >= anchor_limit:
                break
            if file.remaining <= budget:
                selection.append(file)
                chosen.add((file.torrent_hash, file.file_index))
                budget -= file.remaining

        filler_limit = sched.filler_limit
        if queue_empty:
            filler_limit = max(filler_limit, sched.cold_start_small_limit)

        fillers_taken = 0
        for file in sorted(candidates, key=lambda f: f.remaining):
            if (file.torrent_hash, file.file_index) in chosen:
                continue
            # Candidates are in ascending order and the budget only shrinks,
            # so once one does not fit (or the limit is hit) none after it can
            # be taken either.
            if fillers_taken >= filler_limit or file.remaining > budget:
                break
            selection.append(file)
            chosen.add((file.torrent_hash, file.file_index))
            budget -= file.remaining
            fillers_taken += 1
        return selection

    def _apply_priorities(self, files: Sequence[FileState], selection: Sequence[FileState]) -> None:
        """Bring the priority of every unfinished file in line with ``selection``.

        Selected files are set to ``_PRIORITY_SELECTED``, all other unfinished
        files to ``_PRIORITY_SKIP``. Only files whose current priority differs
        are touched, and changes are batched into one ``qb.set_priority`` call
        per (torrent, priority) pair.
        """
        selected_keys = {(f.torrent_hash, f.file_index) for f in selection}
        # torrent_hash -> target priority -> indices of files needing a change.
        per_torrent: Dict[str, Dict[int, List[int]]] = defaultdict(lambda: defaultdict(list))
        for file in files:
            if file.is_complete:
                continue
            key = (file.torrent_hash, file.file_index)
            target_priority = (
                self._PRIORITY_SELECTED if key in selected_keys else self._PRIORITY_SKIP
            )
            if file.priority != target_priority:
                per_torrent[file.torrent_hash][target_priority].append(file.file_index)

        for torrent_hash, priorities in per_torrent.items():
            for priority, file_ids in priorities.items():
                self.qb.set_priority(torrent_hash, file_ids, priority)
                self.log.debug(
                    "Set %s files on %s to priority %s",
                    len(file_ids),
                    torrent_hash,
                    priority,
                )

    def _manage_sequential(self, selection: Sequence[FileState]) -> None:
        """Toggle sequential download for torrents based on the selection.

        * A selected file at or above ``stall_percent`` whose progress has not
          grown by more than ``_PROGRESS_EPSILON`` for ``stall_minutes`` is
          considered stalled: sequential mode is switched off for its torrent
          and kept off until ``stall_resume_minutes`` after the stall was last
          observed.
        * Selected torrents without an active stall window get sequential
          mode switched on.
        * Torrents that are no longer selected get sequential mode switched
          off and are dropped from the local state cache.

        Progress tracking only covers files in the current selection; a file
        that leaves the selection starts with a fresh timestamp when it is
        selected again, so time spent deselected never counts as a stall.
        """
        # Monotonic clock: stall durations must be immune to wall-clock jumps.
        now = time.monotonic()
        sched = self.cfg.scheduler
        selected_keys = {(f.torrent_hash, f.file_index) for f in selection}
        selected_torrents: Set[str] = {torrent_hash for torrent_hash, _ in selected_keys}
        stall_threshold = sched.stall_percent / 100

        # Forget progress of files that are not selected any more. This bounds
        # the dict and prevents a stale timestamp from flagging a re-selected
        # file as stalled right away.
        for key in [k for k in self._progress_ts if k not in selected_keys]:
            del self._progress_ts[key]

        for file in selection:
            key = (file.torrent_hash, file.file_index)
            progress_info = self._progress_ts.get(key)
            if progress_info is None or file.progress - progress_info[0] > self._PROGRESS_EPSILON:
                self._progress_ts[key] = (file.progress, now)
                continue
            _, last_ts = progress_info
            if file.progress < stall_threshold or now - last_ts < sched.stall_minutes * 60:
                continue

            # Still stalled: (re-)extend the back-off window.
            resume_after = now + sched.stall_resume_minutes * 60
            if self._stall_until.get(file.torrent_hash, float("-inf")) >= resume_after:
                continue
            self._stall_until[file.torrent_hash] = resume_after
            # Only call the client and warn on the transition, not on every
            # poll while the same stall persists.
            if self._sequential_state.get(file.torrent_hash) is not False:
                self.qb.set_sequential([file.torrent_hash], False)
                self._sequential_state[file.torrent_hash] = False
                self.log.warning(
                    "Detected stall on %s:%s (progress %.2f%%) -> sequential OFF",
                    file.torrent_hash,
                    file.file_index,
                    file.progress * 100,
                )

        # Drop back-off windows that have elapsed.
        for torrent_hash in [h for h, until in self._stall_until.items() if now >= until]:
            del self._stall_until[torrent_hash]

        for torrent_hash in selected_torrents:
            if torrent_hash in self._stall_until:
                continue
            if not self._sequential_state.get(torrent_hash, False):
                self.qb.set_sequential([torrent_hash], True)
                self._sequential_state[torrent_hash] = True
                self.log.debug("Sequential ON for %s", torrent_hash)

        for torrent_hash, enabled in list(self._sequential_state.items()):
            if torrent_hash in selected_torrents:
                continue
            if enabled:
                self.qb.set_sequential([torrent_hash], False)
                self.log.debug("Sequential OFF for %s", torrent_hash)
            # A missing entry reads as "off" above, so unselected torrents do
            # not need to stay cached; removing them keeps the dict bounded.
            # Removal happens after the client call so a failed call is
            # retried on the next iteration.
            del self._sequential_state[torrent_hash]