#!/usr/bin/env python3 # pylint: disable=no-member,method-hidden import os import sys import asyncio from pathlib import Path from aiohttp import web from aiohttp.log import access_logger import ssl import socket import socketio import logging import json import pathlib import re import base64 from urllib.parse import urlparse from watchfiles import DefaultFilter, Change, awatch from ytdl import DownloadQueueNotifier, DownloadQueue from yt_dlp.version import __version__ as yt_dlp_version log = logging.getLogger('main') class Config: _DEFAULTS = { 'DOWNLOAD_DIR': '.', 'AUDIO_DOWNLOAD_DIR': '%%DOWNLOAD_DIR', 'TEMP_DIR': '%%DOWNLOAD_DIR', 'DOWNLOAD_DIRS_INDEXABLE': 'false', 'CUSTOM_DIRS': 'true', 'CREATE_CUSTOM_DIRS': 'true', 'CUSTOM_DIRS_EXCLUDE_REGEX': r'(^|/)[.@].*$', 'DELETE_FILE_ON_TRASHCAN': 'true', 'STATE_DIR': '.', 'URL_PREFIX': '', 'PUBLIC_HOST_URL': 'download/', 'PUBLIC_HOST_AUDIO_URL': 'audio_download/', 'OUTPUT_TEMPLATE': '%(title)s.%(ext)s', 'OUTPUT_TEMPLATE_CHAPTER': '%(title)s - %(section_number)s %(section_title)s.%(ext)s', 'OUTPUT_TEMPLATE_PLAYLIST': '%(playlist_title)s/%(title)s.%(ext)s', 'DEFAULT_OPTION_PLAYLIST_STRICT_MODE' : 'false', 'DEFAULT_OPTION_PLAYLIST_ITEM_LIMIT' : '0', 'YTDL_OPTIONS': '{}', 'YTDL_OPTIONS_FILE': '', 'ROBOTS_TXT': '', 'HOST': '0.0.0.0', 'PORT': '8081', 'HTTPS': 'false', 'CERTFILE': '', 'KEYFILE': '', 'BASE_DIR': '', 'DEFAULT_THEME': 'auto', 'DOWNLOAD_MODE': 'limited', 'MAX_CONCURRENT_DOWNLOADS': 3, 'LOGLEVEL': 'INFO', 'ENABLE_ACCESSLOG': 'false', } _BOOLEAN = ('DOWNLOAD_DIRS_INDEXABLE', 'CUSTOM_DIRS', 'CREATE_CUSTOM_DIRS', 'DELETE_FILE_ON_TRASHCAN', 'DEFAULT_OPTION_PLAYLIST_STRICT_MODE', 'HTTPS', 'ENABLE_ACCESSLOG') def __init__(self): for k, v in self._DEFAULTS.items(): setattr(self, k, os.environ.get(k, v)) for k, v in self.__dict__.items(): if isinstance(v, str) and v.startswith('%%'): setattr(self, k, getattr(self, v[2:])) if k in self._BOOLEAN: if v not in ('true', 'false', 'True', 'False', 'on', 'off', '1', '0'): log.error(f'Environment variable "{k}" is set to a non-boolean value "{v}"') sys.exit(1) setattr(self, k, v in ('true', 'True', 'on', '1')) if not self.URL_PREFIX.endswith('/'): self.URL_PREFIX += '/' # Convert relative addresses to absolute addresses to prevent the failure of file address comparison if self.YTDL_OPTIONS_FILE and self.YTDL_OPTIONS_FILE.startswith('.'): self.YTDL_OPTIONS_FILE = str(Path(self.YTDL_OPTIONS_FILE).resolve()) success,_ = self.load_ytdl_options() if not success: sys.exit(1) def load_ytdl_options(self) -> tuple[bool, str]: try: self.YTDL_OPTIONS = json.loads(os.environ.get('YTDL_OPTIONS', '{}')) assert isinstance(self.YTDL_OPTIONS, dict) except (json.decoder.JSONDecodeError, AssertionError): msg = 'Environment variable YTDL_OPTIONS is invalid' log.error(msg) return (False, msg) if not self.YTDL_OPTIONS_FILE: return (True, '') log.info(f'Loading yt-dlp custom options from "{self.YTDL_OPTIONS_FILE}"') if not os.path.exists(self.YTDL_OPTIONS_FILE): msg = f'File "{self.YTDL_OPTIONS_FILE}" not found' log.error(msg) return (False, msg) try: with open(self.YTDL_OPTIONS_FILE) as json_data: opts = json.load(json_data) assert isinstance(opts, dict) except (json.decoder.JSONDecodeError, AssertionError): msg = 'YTDL_OPTIONS_FILE contents is invalid' log.error(msg) return (False, msg) self.YTDL_OPTIONS.update(opts) return (True, '') config = Config() class ObjectSerializer(json.JSONEncoder): def default(self, obj): # First try to use __dict__ for custom objects if hasattr(obj, '__dict__'): return obj.__dict__ # Convert iterables (generators, dict_items, etc.) to lists # Exclude strings and bytes which are also iterable elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)): try: return list(obj) except: pass # Fall back to default behavior return json.JSONEncoder.default(self, obj) serializer = ObjectSerializer() app = web.Application() sio = socketio.AsyncServer(cors_allowed_origins='*') sio.attach(app, socketio_path=config.URL_PREFIX + 'socket.io') routes = web.RouteTableDef() class Notifier(DownloadQueueNotifier): async def added(self, dl): log.info(f"Notifier: Download added - {dl.title}") await sio.emit('added', serializer.encode(dl)) async def updated(self, dl): log.info(f"Notifier: Download updated - {dl.title}") await sio.emit('updated', serializer.encode(dl)) async def completed(self, dl): log.info(f"Notifier: Download completed - {dl.title}") await sio.emit('completed', serializer.encode(dl)) async def canceled(self, id): log.info(f"Notifier: Download canceled - {id}") await sio.emit('canceled', serializer.encode(id)) async def cleared(self, id): log.info(f"Notifier: Download cleared - {id}") await sio.emit('cleared', serializer.encode(id)) dqueue = DownloadQueue(config, Notifier()) app.on_startup.append(lambda app: dqueue.initialize()) class FileOpsFilter(DefaultFilter): def __call__(self, change_type: int, path: str) -> bool: # Check if this path matches our YTDL_OPTIONS_FILE if path != config.YTDL_OPTIONS_FILE: return False # For existing files, use samefile comparison to handle symlinks correctly if os.path.exists(config.YTDL_OPTIONS_FILE): try: if not os.path.samefile(path, config.YTDL_OPTIONS_FILE): return False except (OSError, IOError): # If samefile fails, fall back to string comparison if path != config.YTDL_OPTIONS_FILE: return False # Accept all change types for our file: modified, added, deleted return change_type in (Change.modified, Change.added, Change.deleted) def get_options_update_time(success=True, msg=''): result = { 'success': success, 'msg': msg, 'update_time': None } # Only try to get file modification time if YTDL_OPTIONS_FILE is set and file exists if config.YTDL_OPTIONS_FILE and os.path.exists(config.YTDL_OPTIONS_FILE): try: result['update_time'] = os.path.getmtime(config.YTDL_OPTIONS_FILE) except (OSError, IOError) as e: log.warning(f"Could not get modification time for {config.YTDL_OPTIONS_FILE}: {e}") result['update_time'] = None return result async def watch_files(): async def _watch_files(): async for changes in awatch(config.YTDL_OPTIONS_FILE, watch_filter=FileOpsFilter()): success, msg = config.load_ytdl_options() result = get_options_update_time(success, msg) await sio.emit('ytdl_options_changed', serializer.encode(result)) log.info(f'Starting Watch File: {config.YTDL_OPTIONS_FILE}') asyncio.create_task(_watch_files()) if config.YTDL_OPTIONS_FILE: app.on_startup.append(lambda app: watch_files()) @routes.post(config.URL_PREFIX + 'add') async def add(request): log.info("Received request to add download") post = await request.json() log.info(f"Request data: {post}") url = post.get('url') quality = post.get('quality') if not url or not quality: log.error("Bad request: missing 'url' or 'quality'") raise web.HTTPBadRequest() format = post.get('format') folder = post.get('folder') custom_name_prefix = post.get('custom_name_prefix') playlist_strict_mode = post.get('playlist_strict_mode') playlist_item_limit = post.get('playlist_item_limit') auto_start = post.get('auto_start') if custom_name_prefix is None: custom_name_prefix = '' if auto_start is None: auto_start = True if playlist_strict_mode is None: playlist_strict_mode = config.DEFAULT_OPTION_PLAYLIST_STRICT_MODE if playlist_item_limit is None: playlist_item_limit = config.DEFAULT_OPTION_PLAYLIST_ITEM_LIMIT playlist_item_limit = int(playlist_item_limit) status = await dqueue.add(url, quality, format, folder, custom_name_prefix, playlist_strict_mode, playlist_item_limit, auto_start) return web.Response(text=serializer.encode(status)) @routes.post(config.URL_PREFIX + 'cookie') async def set_cookie(request): """Accept cookie string and save as cookie file for domain""" log.info("Received request to set cookie") post = await request.json() url = post.get('url') cookie = post.get('cookie') domain = post.get('domain') if not cookie: log.error("Bad request: missing 'cookie'") raise web.HTTPBadRequest() # Determine domain from either explicit domain field or URL if not domain: if url: parsed_url = urlparse(url) domain = parsed_url.netloc else: log.error("Bad request: missing both 'url' and 'domain'") raise web.HTTPBadRequest() log.info(f"Processing cookie for domain: {domain}") try: # Decode base64 cookie if it appears to be encoded try: # Check if cookie is base64 encoded decoded_cookie = base64.b64decode(cookie).decode('utf-8') log.info(f"Cookie was base64 encoded, decoded successfully") cookie = decoded_cookie except Exception as e: # If decoding fails, assume it's already plain text log.info(f"Cookie is not base64 encoded or decode failed ({e}), using as-is") log.debug(f"Cookie content: {cookie[:100]}...") # Log first 100 chars # Create cookies directory if it doesn't exist cookies_dir = os.path.join(config.STATE_DIR, 'cookies') os.makedirs(cookies_dir, exist_ok=True) # Use domain as filename (sanitized) safe_domain = domain.replace(':', '_').replace('/', '_') cookie_file = os.path.join(cookies_dir, f'{safe_domain}.txt') log.info(f"Writing cookie file to: {cookie_file}") # Convert cookie string to Netscape cookie file format with open(cookie_file, 'w') as f: f.write('# Netscape HTTP Cookie File\n') f.write(f'# This file was generated by MeTube for {domain}\n') f.write('# Edit at your own risk.\n\n') # Parse cookie string (format: "key1=value1; key2=value2; ...") cookie_count = 0 for cookie_pair in cookie.split(';'): cookie_pair = cookie_pair.strip() if '=' in cookie_pair: key, value = cookie_pair.split('=', 1) key = key.strip() value = value.strip() # Netscape format: domain\tflag\tpath\tsecure\texpiration\tname\tvalue # domain: .domain.com (with leading dot for all subdomains) # flag: TRUE (include subdomains) # path: / (all paths) # secure: FALSE (http and https) # expiration: 2147483647 (max 32-bit timestamp - Jan 2038) # name: cookie name # value: cookie value f.write(f'.{domain}\tTRUE\t/\tFALSE\t2147483647\t{key}\t{value}\n') cookie_count += 1 log.debug(f"Added cookie: {key}={value[:20]}...") log.info(f"Cookie file created successfully with {cookie_count} cookies at {cookie_file}") return web.Response(text=serializer.encode({ 'status': 'ok', 'cookie_file': cookie_file, 'cookie_count': cookie_count, 'msg': f'Cookie saved successfully for {domain} ({cookie_count} cookies)' })) except Exception as e: log.error(f"Error saving cookie: {str(e)}", exc_info=True) return web.Response(text=serializer.encode({ 'status': 'error', 'msg': f'Failed to save cookie: {str(e)}' })) @routes.post(config.URL_PREFIX + 'delete') async def delete(request): post = await request.json() ids = post.get('ids') where = post.get('where') if not ids or where not in ['queue', 'done']: log.error("Bad request: missing 'ids' or incorrect 'where' value") raise web.HTTPBadRequest() status = await (dqueue.cancel(ids) if where == 'queue' else dqueue.clear(ids)) log.info(f"Download delete request processed for ids: {ids}, where: {where}") return web.Response(text=serializer.encode(status)) @routes.post(config.URL_PREFIX + 'start') async def start(request): post = await request.json() ids = post.get('ids') log.info(f"Received request to start pending downloads for ids: {ids}") status = await dqueue.start_pending(ids) return web.Response(text=serializer.encode(status)) @routes.get(config.URL_PREFIX + 'history') async def history(request): history = { 'done': [], 'queue': [], 'pending': []} for _, v in dqueue.queue.saved_items(): history['queue'].append(v) for _, v in dqueue.done.saved_items(): history['done'].append(v) for _, v in dqueue.pending.saved_items(): history['pending'].append(v) log.info("Sending download history") return web.Response(text=serializer.encode(history)) @sio.event async def connect(sid, environ): log.info(f"Client connected: {sid}") await sio.emit('all', serializer.encode(dqueue.get()), to=sid) await sio.emit('configuration', serializer.encode(config), to=sid) if config.CUSTOM_DIRS: await sio.emit('custom_dirs', serializer.encode(get_custom_dirs()), to=sid) if config.YTDL_OPTIONS_FILE: await sio.emit('ytdl_options_changed', serializer.encode(get_options_update_time()), to=sid) def get_custom_dirs(): def recursive_dirs(base): path = pathlib.Path(base) # Converts PosixPath object to string, and remove base/ prefix def convert(p): s = str(p) if s.startswith(base): s = s[len(base):] if s.startswith('/'): s = s[1:] return s # Include only directories which do not match the exclude filter def include_dir(d): if len(config.CUSTOM_DIRS_EXCLUDE_REGEX) == 0: return True else: return re.search(config.CUSTOM_DIRS_EXCLUDE_REGEX, d) is None # Recursively lists all subdirectories of DOWNLOAD_DIR dirs = list(filter(include_dir, map(convert, path.glob('**/')))) return dirs download_dir = recursive_dirs(config.DOWNLOAD_DIR) audio_download_dir = download_dir if config.DOWNLOAD_DIR != config.AUDIO_DOWNLOAD_DIR: audio_download_dir = recursive_dirs(config.AUDIO_DOWNLOAD_DIR) return { "download_dir": download_dir, "audio_download_dir": audio_download_dir } @routes.get(config.URL_PREFIX) def index(request): response = web.FileResponse(os.path.join(config.BASE_DIR, 'ui/dist/metube/browser/index.html')) if 'metube_theme' not in request.cookies: response.set_cookie('metube_theme', config.DEFAULT_THEME) return response @routes.get(config.URL_PREFIX + 'robots.txt') def robots(request): if config.ROBOTS_TXT: response = web.FileResponse(os.path.join(config.BASE_DIR, config.ROBOTS_TXT)) else: response = web.Response( text="User-agent: *\nDisallow: /download/\nDisallow: /audio_download/\n" ) return response @routes.get(config.URL_PREFIX + 'version') def version(request): return web.json_response({ "yt-dlp": yt_dlp_version, "version": os.getenv("METUBE_VERSION", "dev") }) if config.URL_PREFIX != '/': @routes.get('/') def index_redirect_root(request): return web.HTTPFound(config.URL_PREFIX) @routes.get(config.URL_PREFIX[:-1]) def index_redirect_dir(request): return web.HTTPFound(config.URL_PREFIX) routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) routes.static(config.URL_PREFIX + 'audio_download/', config.AUDIO_DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) routes.static(config.URL_PREFIX, os.path.join(config.BASE_DIR, 'ui/dist/metube/browser')) try: app.add_routes(routes) except ValueError as e: if 'ui/dist/metube/browser' in str(e): raise RuntimeError('Could not find the frontend UI static assets. Please run `node_modules/.bin/ng build` inside the ui folder') from e raise e # https://github.com/aio-libs/aiohttp/pull/4615 waiting for release # @routes.options(config.URL_PREFIX + 'add') async def add_cors(request): return web.Response(text=serializer.encode({"status": "ok"})) async def cookie_cors(request): return web.Response(text=serializer.encode({"status": "ok"})) app.router.add_route('OPTIONS', config.URL_PREFIX + 'add', add_cors) app.router.add_route('OPTIONS', config.URL_PREFIX + 'cookie', cookie_cors) async def on_prepare(request, response): if 'Origin' in request.headers: response.headers['Access-Control-Allow-Origin'] = request.headers['Origin'] response.headers['Access-Control-Allow-Headers'] = 'Content-Type' app.on_response_prepare.append(on_prepare) def supports_reuse_port(): try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) sock.close() return True except (AttributeError, OSError): return False def parseLogLevel(logLevel): match logLevel: case 'DEBUG': return logging.DEBUG case 'INFO': return logging.INFO case 'WARNING': return logging.WARNING case 'ERROR': return logging.ERROR case 'CRITICAL': return logging.CRITICAL case _: return None def isAccessLogEnabled(): if config.ENABLE_ACCESSLOG: return access_logger else: return None if __name__ == '__main__': logging.basicConfig(level=parseLogLevel(config.LOGLEVEL)) log.info(f"Listening on {config.HOST}:{config.PORT}") if config.HTTPS: ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_context.load_cert_chain(certfile=config.CERTFILE, keyfile=config.KEYFILE) web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=supports_reuse_port(), ssl_context=ssl_context, access_log=isAccessLogEnabled()) else: web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=supports_reuse_port(), access_log=isAccessLogEnabled())