QFlow/qflow/db.py
2025-11-11 15:20:18 +08:00

112 lines
3.8 KiB
Python

from __future__ import annotations
import sqlite3
import threading
import time
from pathlib import Path
from typing import Iterable, List
from .models import UploadTask
class UploadQueue:
def __init__(self, db_path: Path):
self.db_path = db_path
db_path.parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(db_path, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
self._lock = threading.Lock()
self._init_db()
def _init_db(self) -> None:
with self._conn:
self._conn.execute(
"""
CREATE TABLE IF NOT EXISTS upload_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
torrent_hash TEXT NOT NULL,
file_index INTEGER NOT NULL,
path TEXT NOT NULL UNIQUE,
size INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
tries INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
updated_at REAL NOT NULL
)
"""
)
def enqueue(self, torrent_hash: str, file_index: int, path: Path, size: int) -> bool:
now = time.time()
with self._lock, self._conn:
cur = self._conn.execute(
"""
INSERT OR IGNORE INTO upload_queue (torrent_hash, file_index, path, size, updated_at)
VALUES (?, ?, ?, ?, ?)
""",
(torrent_hash, file_index, str(path), size, now),
)
return cur.rowcount > 0
def lease(self, limit: int) -> List[UploadTask]:
now = time.time()
with self._lock, self._conn:
rows = list(
self._conn.execute(
"SELECT * FROM upload_queue WHERE status='pending' ORDER BY id LIMIT ?",
(limit,),
)
)
ids = [row["id"] for row in rows]
if ids:
placeholders = ",".join("?" for _ in ids)
self._conn.execute(
f"UPDATE upload_queue SET status='in_progress', updated_at=? WHERE id IN ({placeholders})",
(now, *ids),
)
return [self._row_to_task(row) for row in rows]
def mark_done(self, task_id: int) -> None:
with self._lock, self._conn:
self._conn.execute(
"UPDATE upload_queue SET status='done', updated_at=? WHERE id=?",
(time.time(), task_id),
)
def mark_failed(self, task_id: int, error: str) -> None:
now = time.time()
with self._lock, self._conn:
self._conn.execute(
"""
UPDATE upload_queue
SET status='pending', tries=tries+1, last_error=?, updated_at=?
WHERE id=?
""",
(error[:400], now, task_id),
)
def pending_count(self) -> int:
with self._lock:
cur = self._conn.execute("SELECT COUNT(*) FROM upload_queue WHERE status='pending'")
(count,) = cur.fetchone()
return int(count)
def _row_to_task(self, row: sqlite3.Row) -> UploadTask:
return UploadTask(
id=row["id"],
torrent_hash=row["torrent_hash"],
file_index=row["file_index"],
path=Path(row["path"]),
size=row["size"],
tries=row["tries"],
last_error=row["last_error"],
)
def pending_items(self, limit: int = 50) -> Iterable[UploadTask]:
rows = self._conn.execute(
"SELECT * FROM upload_queue WHERE status='pending' ORDER BY id LIMIT ?",
(limit,),
)
for row in rows:
yield self._row_to_task(row)