from __future__ import annotations import sqlite3 import threading import time from pathlib import Path from typing import Iterable, List from .models import UploadTask class UploadQueue: def __init__(self, db_path: Path): self.db_path = db_path db_path.parent.mkdir(parents=True, exist_ok=True) self._conn = sqlite3.connect(db_path, check_same_thread=False) self._conn.row_factory = sqlite3.Row self._lock = threading.Lock() self._init_db() def _init_db(self) -> None: with self._conn: self._conn.execute( """ CREATE TABLE IF NOT EXISTS upload_queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, torrent_hash TEXT NOT NULL, file_index INTEGER NOT NULL, path TEXT NOT NULL UNIQUE, size INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'pending', tries INTEGER NOT NULL DEFAULT 0, last_error TEXT, updated_at REAL NOT NULL ) """ ) def enqueue(self, torrent_hash: str, file_index: int, path: Path, size: int) -> bool: now = time.time() with self._lock, self._conn: cur = self._conn.execute( """ INSERT OR IGNORE INTO upload_queue (torrent_hash, file_index, path, size, updated_at) VALUES (?, ?, ?, ?, ?) """, (torrent_hash, file_index, str(path), size, now), ) return cur.rowcount > 0 def lease(self, limit: int) -> List[UploadTask]: now = time.time() with self._lock, self._conn: rows = list( self._conn.execute( "SELECT * FROM upload_queue WHERE status='pending' ORDER BY id LIMIT ?", (limit,), ) ) ids = [row["id"] for row in rows] if ids: placeholders = ",".join("?" for _ in ids) self._conn.execute( f"UPDATE upload_queue SET status='in_progress', updated_at=? WHERE id IN ({placeholders})", (now, *ids), ) return [self._row_to_task(row) for row in rows] def mark_done(self, task_id: int) -> None: with self._lock, self._conn: self._conn.execute( "UPDATE upload_queue SET status='done', updated_at=? WHERE id=?", (time.time(), task_id), ) def mark_failed(self, task_id: int, error: str) -> None: now = time.time() with self._lock, self._conn: self._conn.execute( """ UPDATE upload_queue SET status='pending', tries=tries+1, last_error=?, updated_at=? WHERE id=? """, (error[:400], now, task_id), ) def pending_count(self) -> int: with self._lock: cur = self._conn.execute("SELECT COUNT(*) FROM upload_queue WHERE status='pending'") (count,) = cur.fetchone() return int(count) def _row_to_task(self, row: sqlite3.Row) -> UploadTask: return UploadTask( id=row["id"], torrent_hash=row["torrent_hash"], file_index=row["file_index"], path=Path(row["path"]), size=row["size"], tries=row["tries"], last_error=row["last_error"], ) def pending_items(self, limit: int = 50) -> Iterable[UploadTask]: rows = self._conn.execute( "SELECT * FROM upload_queue WHERE status='pending' ORDER BY id LIMIT ?", (limit,), ) for row in rows: yield self._row_to_task(row)