tubewatch/playlist-monitor/app/services/playlist_service.py

430 lines
16 KiB
Python

"""
Playlist service for managing playlist subscriptions and operations
"""
import asyncio
import logging
import re
from typing import List, Optional, Dict, Any
from datetime import datetime
from urllib.parse import urlparse, parse_qs
import yt_dlp
from sqlalchemy.orm import Session
from sqlalchemy import and_, func, or_
from ..models.playlist import PlaylistSubscription
from ..models.video import VideoRecord, VideoStatus
from ..core.config import settings
from ..core.scheduler import scheduler_manager
from .metube_client import MeTubeClient
from .video_service import VideoService
logger = logging.getLogger(__name__)
class PlaylistService:
    """Service for managing playlist operations.

    Uses a SQLAlchemy session to create, look up, update and delete
    ``PlaylistSubscription`` rows and their ``VideoRecord`` rows, reads
    playlist metadata through yt-dlp, registers periodic check jobs with
    ``scheduler_manager`` and hands pending videos to ``VideoService``.
    """
def __init__(self, db: Session):
    """Bind the service to a database session.

    Args:
        db: SQLAlchemy session used for every query and commit issued by
            this service; the same session is passed to ``VideoService``.
    """
    # The two attributes are independent; the helper gets the same session.
    self.video_service = VideoService(db)
    self.db = db
def get_playlists(self, skip: int = 0, limit: int = 100, enabled: Optional[bool] = None) -> List[PlaylistSubscription]:
    """Return one page of playlist subscriptions.

    Args:
        skip: Number of rows to skip before the page starts.
        limit: Maximum number of rows in the page.
        enabled: When not ``None``, only subscriptions whose ``enabled``
            column equals this value are returned.

    Returns:
        The matching subscriptions. No explicit ordering is applied.
    """
    subscriptions = self.db.query(PlaylistSubscription)
    if enabled is None:
        selected = subscriptions
    else:
        selected = subscriptions.filter(PlaylistSubscription.enabled == enabled)
    return selected.offset(skip).limit(limit).all()
def get_playlist(self, playlist_id: str) -> Optional[PlaylistSubscription]:
    """Return the subscription whose ``id`` equals *playlist_id*, or ``None``."""
    matches_id = PlaylistSubscription.id == playlist_id
    lookup = self.db.query(PlaylistSubscription).filter(matches_id)
    return lookup.first()
def get_playlist_by_url(self, url: str) -> Optional[PlaylistSubscription]:
    """Return the subscription stored with exactly this *url*, or ``None``."""
    matches_url = PlaylistSubscription.url == url
    lookup = self.db.query(PlaylistSubscription).filter(matches_url)
    return lookup.first()
async def add_playlist(
    self,
    url: str,
    check_interval: int = settings.DEFAULT_CHECK_INTERVAL,
    start_point: Optional[str] = None,
    quality: str = settings.DEFAULT_QUALITY,
    format: str = settings.DEFAULT_FORMAT,
    folder: Optional[str] = None,
    enabled: bool = True
) -> PlaylistSubscription:
    """Create a subscription for *url* and start monitoring it.

    The URL is validated, checked against existing subscriptions and
    resolved through yt-dlp before anything is written. The subscription
    is committed first, then one video record per playlist entry is
    created, and finally a periodic check job is registered when
    *enabled* is true.

    Args:
        url: Playlist URL to monitor.
        check_interval: Interval passed to the scheduler for periodic checks.
        start_point: Optional start point stored on the subscription.
        quality: Quality setting stored on the subscription.
        format: Format setting stored on the subscription.
        folder: Optional folder stored on the subscription.
        enabled: Whether the subscription is active and should be scheduled.

    Returns:
        The persisted, refreshed subscription.

    Raises:
        ValueError: If the URL fails validation, a subscription with the
            same URL already exists, or playlist extraction yields nothing.
    """
    if not self._is_valid_youtube_playlist_url(url):
        raise ValueError("Invalid YouTube playlist URL")

    if self.get_playlist_by_url(url):
        raise ValueError(f"Playlist already exists with URL: {url}")

    playlist_info = await self._extract_playlist_info(url)
    if not playlist_info:
        raise ValueError("Failed to extract playlist information")

    subscription_fields = dict(
        url=url,
        title=playlist_info.get("title"),
        check_interval=check_interval,
        start_point=start_point,
        quality=quality,
        format=format,
        folder=folder,
        enabled=enabled,
    )
    playlist = PlaylistSubscription(**subscription_fields)

    session = self.db
    session.add(playlist)
    session.commit()
    session.refresh(playlist)
    logger.info(f"Created playlist subscription: {playlist.title} ({playlist.id})")

    # Video records are created only after the subscription row exists.
    await self._initialize_playlist_videos(playlist, playlist_info)

    if not enabled:
        return playlist
    scheduler_manager.add_playlist_check_job(playlist.id, check_interval)
    return playlist
def update_playlist(self, playlist_id: str, **kwargs) -> PlaylistSubscription:
    """Update playlist settings and keep the scheduler in step with them.

    Args:
        playlist_id: ``id`` of the subscription to modify.
        **kwargs: Attribute name/value pairs. A pair is applied only when
            the subscription already has an attribute of that name and the
            value is not ``None``; ``None`` therefore means "leave as is".

    Returns:
        The committed, refreshed subscription.

    Raises:
        ValueError: If no subscription with *playlist_id* exists.
    """
    playlist = self.get_playlist(playlist_id)
    if not playlist:
        raise ValueError(f"Playlist not found: {playlist_id}")

    for key, value in kwargs.items():
        if hasattr(playlist, key) and value is not None:
            setattr(playlist, key, value)
    playlist.updated_at = datetime.utcnow()
    self.db.commit()
    self.db.refresh(playlist)

    interval_touched = "check_interval" in kwargs
    enabled_touched = kwargs.get("enabled") is not None
    if playlist.enabled:
        # Re-register when the interval changed or the playlist was just
        # (re-)enabled; otherwise a playlist created disabled would never
        # get a job. NOTE(review): assumes add_playlist_check_job replaces
        # an existing job for the same playlist, which the rescheduling
        # path already relied on.
        if interval_touched or enabled_touched:
            scheduler_manager.add_playlist_check_job(playlist.id, playlist.check_interval)
    elif enabled_touched:
        # The playlist was switched off: stop its periodic checks.
        # NOTE(review): assumes remove_playlist_check_job tolerates a missing
        # job, as delete_playlist calls it unconditionally.
        scheduler_manager.remove_playlist_check_job(playlist.id)

    logger.info(f"Updated playlist: {playlist.title} ({playlist.id})")
    return playlist
def delete_playlist(self, playlist_id: str, delete_videos: bool = False) -> None:
    """Delete a playlist subscription and its scheduler job.

    Args:
        playlist_id: ``id`` of the subscription to delete.
        delete_videos: When true, the playlist's video records are deleted
            explicitly in the same transaction. When false, this method
            does not touch them itself. NOTE(review): a cascade configured
            on the model would still remove them; confirm against the
            ``PlaylistSubscription`` mapping.

    Raises:
        ValueError: If no subscription with *playlist_id* exists.
    """
    playlist = self.get_playlist(playlist_id)
    if not playlist:
        raise ValueError(f"Playlist not found: {playlist_id}")

    # Read what the log line needs before the instance is deleted.
    title, deleted_id = playlist.title, playlist.id

    scheduler_manager.remove_playlist_check_job(playlist_id)

    if delete_videos:
        # ORM-level deletes (rather than a bulk DELETE) keep the session's
        # unit of work consistent with any relationship cascade.
        videos = self.db.query(VideoRecord).filter(
            VideoRecord.playlist_id == playlist_id
        ).all()
        for video in videos:
            self.db.delete(video)

    self.db.delete(playlist)
    self.db.commit()
    logger.info(f"Deleted playlist: {title} ({deleted_id})")
async def check_playlist(self, playlist_id: str, force: bool = False) -> int:
    """Check a playlist for videos that have no record yet.

    Args:
        playlist_id: ``id`` of the subscription to check.
        force: When true, the check runs even if the playlist is disabled
            or ``should_check()`` says it was checked recently.

    Returns:
        Number of video records created by this check. ``0`` is also
        returned when the check is skipped or playlist extraction fails.

    Raises:
        ValueError: If no subscription with *playlist_id* exists.
    """
    playlist = self.get_playlist(playlist_id)
    if not playlist:
        raise ValueError(f"Playlist not found: {playlist_id}")

    if not force:
        if not playlist.enabled:
            logger.info(f"Playlist {playlist_id} is disabled, skipping check")
            return 0
        if not playlist.should_check():
            logger.info(f"Playlist {playlist_id} was recently checked, skipping")
            return 0

    logger.info(f"Checking playlist: {playlist.title} ({playlist_id})")

    playlist_info = await self._extract_playlist_info(playlist.url)
    if not playlist_info:
        logger.error(f"Failed to extract playlist info for {playlist_id}")
        return 0

    # Only the ids are needed, so avoid loading full ORM objects.
    id_rows = self.db.query(VideoRecord.video_id).filter(
        VideoRecord.playlist_id == playlist_id
    ).all()
    known_video_ids = {row[0] for row in id_rows}

    new_videos_count = 0
    entries = playlist_info.get("entries") or []
    for position, video_info in enumerate(entries, start=1):
        if not video_info:
            # Entries can be empty placeholders; nothing to record for them.
            continue
        video_id = video_info.get("id")
        if not video_id or video_id in known_video_ids:
            continue

        if video_info.get("playlist_index") is None:
            # NOTE(review): flat extraction may omit playlist_index; fall
            # back to the 1-based position in the entry list. Confirm
            # against the yt-dlp version in use.
            video_info = {**video_info, "playlist_index": position}

        video = self._create_video_record(playlist, video_info)
        self.db.add(video)
        # Remember the id so a video listed twice yields a single record.
        known_video_ids.add(video_id)
        new_videos_count += 1
        logger.debug(f"Found new video: {video.title} ({video_id})")

    playlist.last_checked = datetime.utcnow()
    self.db.commit()

    if new_videos_count > 0:
        await self._trigger_pending_downloads(playlist)

    logger.info(f"Playlist check completed: {playlist.title} - Found {new_videos_count} new videos")
    return new_videos_count
def get_playlist_stats(self, playlist_id: str) -> Dict[str, int]:
    """Count a playlist's video records per status.

    Args:
        playlist_id: ``id`` of the subscription whose videos are counted.

    Returns:
        A dict with the keys ``total``, ``pending``, ``downloading``,
        ``completed``, ``failed`` and ``skipped``. ``total`` sums every
        status group returned by the query, including statuses that have
        no key of their own.
    """
    stats = {
        "total": 0,
        "pending": 0,
        "downloading": 0,
        "completed": 0,
        "failed": 0,
        "skipped": 0
    }
    # Pairs are compared with ``==`` rather than used as dict keys, so a
    # status that comes back as a plain value still matches its enum member.
    tracked_statuses = (
        (VideoStatus.PENDING, "pending"),
        (VideoStatus.DOWNLOADING, "downloading"),
        (VideoStatus.COMPLETED, "completed"),
        (VideoStatus.FAILED, "failed"),
        (VideoStatus.SKIPPED, "skipped"),
    )

    # One grouped query: (status, number of records with that status).
    video_counts = self.db.query(VideoRecord.status, func.count(VideoRecord.id)).filter(
        VideoRecord.playlist_id == playlist_id
    ).group_by(VideoRecord.status).all()

    for status, count in video_counts:
        stats["total"] += count
        for tracked_status, key in tracked_statuses:
            if status == tracked_status:
                stats[key] = count
                break
    return stats
def get_playlist_videos(
    self,
    playlist_id: str,
    status: Optional[str] = None,
    limit: int = 50,
    skip: int = 0
) -> List[Dict[str, Any]]:
    """Return one page of a playlist's videos as plain dictionaries.

    Args:
        playlist_id: ``id`` of the subscription whose videos are listed.
        status: When truthy, only videos with this status are returned.
        limit: Maximum number of videos in the page.
        skip: Number of videos to skip before the page starts.

    Returns:
        One dict per video, ordered by ``playlist_index``; datetime fields
        are ISO-8601 strings or ``None``.
    """
    def _iso(moment):
        # Datetimes become ISO strings; missing values stay None.
        return moment.isoformat() if moment else None

    selection = self.db.query(VideoRecord).filter(VideoRecord.playlist_id == playlist_id)
    if status:
        selection = selection.filter(VideoRecord.status == status)
    page = selection.order_by(VideoRecord.playlist_index).offset(skip).limit(limit).all()

    serialized = []
    for record in page:
        serialized.append({
            "id": record.id,
            "video_id": record.video_id,
            "title": record.title,
            "status": record.status,
            "playlist_index": record.playlist_index,
            "upload_date": _iso(record.upload_date),
            "download_requested_at": _iso(record.download_requested_at),
            "download_completed_at": _iso(record.download_completed_at),
            "error_message": record.error_message,
            "retry_count": record.retry_count,
            "file_moved": record.file_moved,
            "file_location_note": record.file_location_note,
        })
    return serialized
def update_start_point(self, playlist_id: str, start_video_id: str) -> int:
    """Move the playlist's start point and skip earlier pending videos.

    Args:
        playlist_id: ``id`` of the subscription to update.
        start_video_id: ``video_id`` of the record that becomes the start
            point; it must already exist for this playlist.

    Returns:
        Number of pending videos marked as skipped.

    Raises:
        ValueError: If the playlist or the start video cannot be found.
    """
    playlist = self.get_playlist(playlist_id)
    if not playlist:
        raise ValueError(f"Playlist not found: {playlist_id}")

    in_this_playlist = VideoRecord.playlist_id == playlist_id
    start_video = self.db.query(VideoRecord).filter(
        and_(in_this_playlist, VideoRecord.video_id == start_video_id)
    ).first()
    if not start_video:
        raise ValueError(f"Video not found in playlist: {start_video_id}")

    playlist.start_point = start_video_id
    playlist.updated_at = datetime.utcnow()

    # Only videos that are still pending and sit before the start video
    # are affected; other statuses are left alone.
    earlier_pending = self.db.query(VideoRecord).filter(
        and_(
            in_this_playlist,
            VideoRecord.playlist_index < start_video.playlist_index,
            VideoRecord.status == VideoStatus.PENDING,
        )
    ).all()

    updated_count = 0
    for updated_count, video in enumerate(earlier_pending, start=1):
        video.mark_as_skipped()

    self.db.commit()
    logger.info(f"Updated start point for playlist {playlist_id}: {updated_count} videos marked as skipped")
    return updated_count
def _is_valid_youtube_playlist_url(self, url: str) -> bool:
    """Return whether *url* looks like a YouTube playlist URL.

    A URL qualifies when its host is one of the accepted YouTube hosts and
    either its path contains ``playlist`` or its query string carries a
    ``list`` parameter.

    Args:
        url: Candidate URL.

    Returns:
        ``True`` if the URL passes both checks; ``False`` otherwise,
        including for input that cannot be parsed.
    """
    youtube_hosts = ("youtube.com", "www.youtube.com", "m.youtube.com", "youtu.be")
    try:
        parsed = urlparse(url)
        # ``hostname`` is lower-cased and excludes port and credentials, so
        # "WWW.YouTube.com" and "www.youtube.com:443" are recognized too.
        host = parsed.hostname
        if not isinstance(host, str) or host not in youtube_hosts:
            return False
        if "playlist" in parsed.path.lower():
            return True
        return "list" in parse_qs(parsed.query)
    except (AttributeError, TypeError, ValueError):
        # Malformed or non-string input is simply not a valid playlist URL.
        return False
async def _extract_playlist_info(self, url: str) -> Optional[Dict[str, Any]]:
    """Extract playlist metadata with yt-dlp without blocking the event loop.

    Args:
        url: Playlist URL to resolve.

    Returns:
        The info dict returned by ``YoutubeDL.extract_info``, or ``None``
        when extraction raises.
    """
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": True,  # Only extract metadata, not actual videos
        "skip_download": True,
    }

    def _extract() -> Optional[Dict[str, Any]]:
        # Runs in a worker thread; it touches no database session.
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            return ydl.extract_info(url, download=False)

    try:
        # extract_info performs blocking network I/O, so it is pushed to the
        # default executor instead of being called on the event loop.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _extract)
    except Exception as e:
        logger.error(f"Error extracting playlist info: {e}")
        return None
def _create_video_record(self, playlist: PlaylistSubscription, video_info: Dict[str, Any]) -> VideoRecord:
    """Build an unsaved ``VideoRecord`` from one playlist entry.

    Args:
        playlist: Subscription the video belongs to.
        video_info: Entry dict; the keys ``id``, ``title``,
            ``playlist_index`` and ``upload_date`` (``YYYYMMDD``) are read.

    Returns:
        A new record whose status is ``SKIPPED`` when the playlist has a
        start point and ``_should_skip_video`` says so, else ``PENDING``.
        The record is not added to the session here.
    """
    video_id = video_info.get("id")
    playlist_index = video_info.get("playlist_index")

    upload_date = None
    upload_date_str = video_info.get("upload_date")
    if upload_date_str:
        try:
            upload_date = datetime.strptime(upload_date_str, "%Y%m%d")
        except (TypeError, ValueError):
            # A bad date must not prevent the record from being created,
            # but leave a trace instead of dropping it silently.
            logger.debug(f"Ignoring unparsable upload date {upload_date_str!r} for video {video_id}")

    status = VideoStatus.PENDING
    if playlist.start_point and self._should_skip_video(playlist, video_id, playlist_index):
        status = VideoStatus.SKIPPED

    return VideoRecord(
        playlist_id=playlist.id,
        video_url=f"https://www.youtube.com/watch?v={video_id}",
        video_id=video_id,
        title=video_info.get("title"),
        playlist_index=playlist_index,
        upload_date=upload_date,
        status=status
    )
def _should_skip_video(self, playlist: "PlaylistSubscription", video_id: str, playlist_index: Optional[int]) -> bool:
    """Decide whether a video lies before the playlist's start point.

    Args:
        playlist: Subscription whose ``start_point`` is consulted.
        video_id: Id of the video being evaluated.
        playlist_index: Position of the video in the playlist, if known.

    Returns:
        ``False`` when no start point is set or the video is the start
        video itself. When the start point is numeric and the video's
        index is known, the result is simply ``playlist_index <
        start_index``. Otherwise the video is skipped if the playlist
        already has non-skipped records (restricted to records after this
        video when its index is known).
    """
    start_point = playlist.start_point
    if not start_point:
        return False
    if start_point == video_id:
        return False

    # A start point that parses as an integer is treated as a playlist index.
    try:
        start_index = int(start_point)
    except (TypeError, ValueError):
        start_index = None

    if start_index is not None and playlist_index is not None:
        # Both positions are known, so the answer is definitive. Falling
        # through to the heuristic below would wrongly skip videos at or
        # after the start index whenever later records already exist.
        return playlist_index < start_index

    # Heuristic for video-id start points or unknown positions: if
    # non-skipped records already exist after this video, it is taken to
    # lie before the start point.
    criteria = [
        VideoRecord.playlist_id == playlist.id,
        VideoRecord.status != VideoStatus.SKIPPED,
    ]
    if playlist_index is not None:
        criteria.append(VideoRecord.playlist_index > playlist_index)
    existing_after_start = self.db.query(VideoRecord).filter(and_(*criteria)).count()
    return existing_after_start > 0
async def _initialize_playlist_videos(self, playlist: PlaylistSubscription, playlist_info: Dict[str, Any]) -> None:
    """Create the initial video records for a newly added playlist.

    Entries that are empty, carry no ``id`` or repeat an id already seen
    in this playlist are ignored, matching how ``check_playlist`` treats
    entries on later checks.

    Args:
        playlist: Subscription the records belong to.
        playlist_info: Playlist info dict; its ``entries`` value is read.
    """
    entries = playlist_info.get("entries") or []
    seen_video_ids = set()
    for position, video_info in enumerate(entries, start=1):
        if not video_info:
            continue
        video_id = video_info.get("id")
        if not video_id or video_id in seen_video_ids:
            continue

        if video_info.get("playlist_index") is None:
            # NOTE(review): flat extraction may omit playlist_index; fall
            # back to the 1-based position in the entry list. Confirm
            # against the yt-dlp version in use.
            video_info = {**video_info, "playlist_index": position}

        self.db.add(self._create_video_record(playlist, video_info))
        seen_video_ids.add(video_id)

    self.db.commit()
    # Report what was actually created, not the raw entry count.
    logger.info(f"Initialized {len(seen_video_ids)} video records for playlist {playlist.id}")
async def _trigger_pending_downloads(self, playlist: PlaylistSubscription) -> None:
    """Start downloads for the next batch of pending videos.

    At most ``settings.MAX_CONCURRENT_DOWNLOADS`` pending videos are
    picked, in ``playlist_index`` order. A failure to trigger one download
    is logged and does not stop the remaining ones.

    Args:
        playlist: Subscription whose pending videos are processed.
    """
    is_pending_here = and_(
        VideoRecord.playlist_id == playlist.id,
        VideoRecord.status == VideoStatus.PENDING
    )
    batch_query = self.db.query(VideoRecord).filter(is_pending_here)
    batch_query = batch_query.order_by(VideoRecord.playlist_index)
    pending_videos = batch_query.limit(settings.MAX_CONCURRENT_DOWNLOADS).all()

    if pending_videos:
        logger.info(f"Triggering downloads for {len(pending_videos)} pending videos in playlist {playlist.id}")
        for video in pending_videos:
            try:
                await self.video_service.download_video(video.id)
            except Exception as e:
                logger.error(f"Error triggering download for video {video.id}: {e}")