112 lines
3.8 KiB
Python
112 lines
3.8 KiB
Python
from __future__ import annotations
|
|
|
|
import sqlite3
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Iterable, List
|
|
|
|
from .models import UploadTask
|
|
|
|
|
|
class UploadQueue:
|
|
def __init__(self, db_path: Path):
|
|
self.db_path = db_path
|
|
db_path.parent.mkdir(parents=True, exist_ok=True)
|
|
self._conn = sqlite3.connect(db_path, check_same_thread=False)
|
|
self._conn.row_factory = sqlite3.Row
|
|
self._lock = threading.Lock()
|
|
self._init_db()
|
|
|
|
def _init_db(self) -> None:
|
|
with self._conn:
|
|
self._conn.execute(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS upload_queue (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
torrent_hash TEXT NOT NULL,
|
|
file_index INTEGER NOT NULL,
|
|
path TEXT NOT NULL UNIQUE,
|
|
size INTEGER NOT NULL,
|
|
status TEXT NOT NULL DEFAULT 'pending',
|
|
tries INTEGER NOT NULL DEFAULT 0,
|
|
last_error TEXT,
|
|
updated_at REAL NOT NULL
|
|
)
|
|
"""
|
|
)
|
|
|
|
def enqueue(self, torrent_hash: str, file_index: int, path: Path, size: int) -> bool:
|
|
now = time.time()
|
|
with self._lock, self._conn:
|
|
cur = self._conn.execute(
|
|
"""
|
|
INSERT OR IGNORE INTO upload_queue (torrent_hash, file_index, path, size, updated_at)
|
|
VALUES (?, ?, ?, ?, ?)
|
|
""",
|
|
(torrent_hash, file_index, str(path), size, now),
|
|
)
|
|
return cur.rowcount > 0
|
|
|
|
def lease(self, limit: int) -> List[UploadTask]:
|
|
now = time.time()
|
|
with self._lock, self._conn:
|
|
rows = list(
|
|
self._conn.execute(
|
|
"SELECT * FROM upload_queue WHERE status='pending' ORDER BY id LIMIT ?",
|
|
(limit,),
|
|
)
|
|
)
|
|
ids = [row["id"] for row in rows]
|
|
if ids:
|
|
placeholders = ",".join("?" for _ in ids)
|
|
self._conn.execute(
|
|
f"UPDATE upload_queue SET status='in_progress', updated_at=? WHERE id IN ({placeholders})",
|
|
(now, *ids),
|
|
)
|
|
return [self._row_to_task(row) for row in rows]
|
|
|
|
def mark_done(self, task_id: int) -> None:
|
|
with self._lock, self._conn:
|
|
self._conn.execute(
|
|
"UPDATE upload_queue SET status='done', updated_at=? WHERE id=?",
|
|
(time.time(), task_id),
|
|
)
|
|
|
|
def mark_failed(self, task_id: int, error: str) -> None:
|
|
now = time.time()
|
|
with self._lock, self._conn:
|
|
self._conn.execute(
|
|
"""
|
|
UPDATE upload_queue
|
|
SET status='pending', tries=tries+1, last_error=?, updated_at=?
|
|
WHERE id=?
|
|
""",
|
|
(error[:400], now, task_id),
|
|
)
|
|
|
|
def pending_count(self) -> int:
|
|
with self._lock:
|
|
cur = self._conn.execute("SELECT COUNT(*) FROM upload_queue WHERE status='pending'")
|
|
(count,) = cur.fetchone()
|
|
return int(count)
|
|
|
|
def _row_to_task(self, row: sqlite3.Row) -> UploadTask:
|
|
return UploadTask(
|
|
id=row["id"],
|
|
torrent_hash=row["torrent_hash"],
|
|
file_index=row["file_index"],
|
|
path=Path(row["path"]),
|
|
size=row["size"],
|
|
tries=row["tries"],
|
|
last_error=row["last_error"],
|
|
)
|
|
|
|
def pending_items(self, limit: int = 50) -> Iterable[UploadTask]:
|
|
rows = self._conn.execute(
|
|
"SELECT * FROM upload_queue WHERE status='pending' ORDER BY id LIMIT ?",
|
|
(limit,),
|
|
)
|
|
for row in rows:
|
|
yield self._row_to_task(row)
|