tubewatch/app/main.py

513 lines
20 KiB
Python

#!/usr/bin/env python3
# pylint: disable=no-member,method-hidden
import os
import sys
import asyncio
from pathlib import Path
from aiohttp import web
from aiohttp.log import access_logger
import ssl
import socket
import socketio
import logging
import json
import pathlib
import re
import base64
from urllib.parse import urlparse
from watchfiles import DefaultFilter, Change, awatch
from ytdl import DownloadQueueNotifier, DownloadQueue
from yt_dlp.version import __version__ as yt_dlp_version
log = logging.getLogger('main')


class Config:
    """Runtime configuration read from environment variables.

    Every key of ``_DEFAULTS`` becomes an instance attribute holding the value
    of the environment variable with the same name, or the default when that
    variable is unset. A value of the form ``%%NAME`` is replaced by the value
    of attribute ``NAME``. Keys listed in ``_BOOLEAN`` are validated and
    converted to ``bool``; an invalid value terminates the process.
    """

    _DEFAULTS = {
        'DOWNLOAD_DIR': '.',
        'AUDIO_DOWNLOAD_DIR': '%%DOWNLOAD_DIR',
        'TEMP_DIR': '%%DOWNLOAD_DIR',
        'DOWNLOAD_DIRS_INDEXABLE': 'false',
        'CUSTOM_DIRS': 'true',
        'CREATE_CUSTOM_DIRS': 'true',
        'CUSTOM_DIRS_EXCLUDE_REGEX': r'(^|/)[.@].*$',
        'DELETE_FILE_ON_TRASHCAN': 'true',
        'STATE_DIR': '.',
        'URL_PREFIX': '',
        'PUBLIC_HOST_URL': 'download/',
        'PUBLIC_HOST_AUDIO_URL': 'audio_download/',
        'OUTPUT_TEMPLATE': '%(title)s.%(ext)s',
        'OUTPUT_TEMPLATE_CHAPTER': '%(title)s - %(section_number)s %(section_title)s.%(ext)s',
        'OUTPUT_TEMPLATE_PLAYLIST': '%(playlist_title)s/%(title)s.%(ext)s',
        'DEFAULT_OPTION_PLAYLIST_STRICT_MODE': 'false',
        'DEFAULT_OPTION_PLAYLIST_ITEM_LIMIT': '0',
        'YTDL_OPTIONS': '{}',
        'YTDL_OPTIONS_FILE': '',
        'ROBOTS_TXT': '',
        'HOST': '0.0.0.0',
        'PORT': '8081',
        'HTTPS': 'false',
        'CERTFILE': '',
        'KEYFILE': '',
        'BASE_DIR': '',
        'DEFAULT_THEME': 'auto',
        'DOWNLOAD_MODE': 'limited',
        'MAX_CONCURRENT_DOWNLOADS': 3,
        'LOGLEVEL': 'INFO',
        'ENABLE_ACCESSLOG': 'false',
    }

    _BOOLEAN = (
        'DOWNLOAD_DIRS_INDEXABLE',
        'CUSTOM_DIRS',
        'CREATE_CUSTOM_DIRS',
        'DELETE_FILE_ON_TRASHCAN',
        'DEFAULT_OPTION_PLAYLIST_STRICT_MODE',
        'HTTPS',
        'ENABLE_ACCESSLOG',
    )

    def __init__(self):
        """Populate the attributes from the environment and load the yt-dlp options.

        Calls ``sys.exit(1)`` when a boolean variable has an unrecognized value
        or when the yt-dlp options cannot be loaded.
        """
        for key, default in self._DEFAULTS.items():
            setattr(self, key, os.environ.get(key, default))

        # Iterate over a snapshot because attributes are reassigned in the loop.
        for key, value in list(vars(self).items()):
            if isinstance(value, str) and value.startswith('%%'):
                setattr(self, key, getattr(self, value[2:]))
            if key in self._BOOLEAN:
                if value not in ('true', 'false', 'True', 'False', 'on', 'off', '1', '0'):
                    log.error(f'Environment variable "{key}" is set to a non-boolean value "{value}"')
                    sys.exit(1)
                setattr(self, key, value in ('true', 'True', 'on', '1'))

        if not self.URL_PREFIX.endswith('/'):
            self.URL_PREFIX += '/'

        # Convert relative addresses to absolute addresses to prevent the failure of file address comparison.
        # Every relative spelling is converted, not only those that start with a dot.
        if self.YTDL_OPTIONS_FILE and not os.path.isabs(self.YTDL_OPTIONS_FILE):
            self.YTDL_OPTIONS_FILE = str(Path(self.YTDL_OPTIONS_FILE).resolve())

        success, _ = self.load_ytdl_options()
        if not success:
            sys.exit(1)

    def load_ytdl_options(self) -> tuple[bool, str]:
        """(Re)build ``YTDL_OPTIONS`` from the environment and the options file.

        The ``YTDL_OPTIONS`` environment variable is parsed as a JSON object;
        when ``YTDL_OPTIONS_FILE`` is set, the JSON object stored in that file
        is merged on top of it.

        Returns:
            ``(True, '')`` on success, otherwise ``(False, message)`` where
            ``message`` describes the problem (it is also logged).
        """
        try:
            env_options = json.loads(os.environ.get('YTDL_OPTIONS', '{}'))
        except json.JSONDecodeError:
            env_options = None
        # Explicit type check instead of `assert`, which is removed under `python -O`.
        if not isinstance(env_options, dict):
            msg = 'Environment variable YTDL_OPTIONS is invalid'
            log.error(msg)
            return (False, msg)
        self.YTDL_OPTIONS = env_options

        if not self.YTDL_OPTIONS_FILE:
            return (True, '')

        log.info(f'Loading yt-dlp custom options from "{self.YTDL_OPTIONS_FILE}"')
        if not os.path.exists(self.YTDL_OPTIONS_FILE):
            msg = f'File "{self.YTDL_OPTIONS_FILE}" not found'
            log.error(msg)
            return (False, msg)

        try:
            with open(self.YTDL_OPTIONS_FILE, encoding='utf-8') as json_data:
                file_options = json.load(json_data)
        except OSError as e:
            # The file exists but cannot be read (permissions, a directory,
            # removed after the existence check): report it instead of raising.
            msg = f'Could not read file "{self.YTDL_OPTIONS_FILE}": {e}'
            log.error(msg)
            return (False, msg)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
            file_options = None
        if not isinstance(file_options, dict):
            msg = 'YTDL_OPTIONS_FILE contents is invalid'
            log.error(msg)
            return (False, msg)

        self.YTDL_OPTIONS.update(file_options)
        return (True, '')


config = Config()
class ObjectSerializer(json.JSONEncoder):
    """JSON encoder that also accepts plain objects and arbitrary iterables."""

    def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``.

        Objects that have a ``__dict__`` are encoded as that dictionary. Other
        iterables (generators, dict views, sets, ...) are encoded as lists,
        except ``str`` and ``bytes``. Anything else is passed to the base
        class, which raises ``TypeError``.
        """
        # First try to use __dict__ for custom objects
        if hasattr(obj, '__dict__'):
            return obj.__dict__
        # Convert iterables (generators, dict_items, etc.) to lists
        # Exclude strings and bytes which are also iterable
        if hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
            try:
                return list(obj)
            except Exception:
                # Best effort: an iterable that fails while being consumed is
                # reported as unserializable below. Only Exception is caught so
                # that KeyboardInterrupt and SystemExit still propagate.
                pass
        # Fall back to default behavior
        return super().default(obj)
# Shared encoder instance used by the HTTP handlers and Socket.IO emitters in this file.
serializer = ObjectSerializer()
# aiohttp application that the routes and the Socket.IO server are attached to.
app = web.Application()
# NOTE(review): cors_allowed_origins='*' accepts Socket.IO connections from any origin — confirm this is intended.
sio = socketio.AsyncServer(cors_allowed_origins='*')
# Serve the Socket.IO endpoint below the configured URL prefix.
sio.attach(app, socketio_path=config.URL_PREFIX + 'socket.io')
# Route table filled by the @routes.* decorators and routes.static() calls further down.
routes = web.RouteTableDef()
class Notifier(DownloadQueueNotifier):
    """Forwards download-queue events to Socket.IO as JSON-encoded messages."""

    async def _push(self, event, payload):
        """Emit ``event`` through ``sio`` with ``payload`` encoded by ``serializer``."""
        await sio.emit(event, serializer.encode(payload))

    async def added(self, dl):
        """Announce that the download ``dl`` was added."""
        log.info(f"Notifier: Download added - {dl.title}")
        await self._push('added', dl)

    async def updated(self, dl):
        """Announce that the download ``dl`` changed."""
        log.info(f"Notifier: Download updated - {dl.title}")
        await self._push('updated', dl)

    async def completed(self, dl):
        """Announce that the download ``dl`` completed."""
        log.info(f"Notifier: Download completed - {dl.title}")
        await self._push('completed', dl)

    async def canceled(self, id):
        """Announce that the download identified by ``id`` was canceled."""
        log.info(f"Notifier: Download canceled - {id}")
        await self._push('canceled', id)

    async def cleared(self, id):
        """Announce that the download identified by ``id`` was cleared."""
        log.info(f"Notifier: Download cleared - {id}")
        await self._push('cleared', id)


# Download queue that reports its events through a Notifier; its initialize()
# is invoked from the application's startup hook.
dqueue = DownloadQueue(config, Notifier())
app.on_startup.append(lambda app: dqueue.initialize())
class FileOpsFilter(DefaultFilter):
    """watchfiles filter that accepts only changes to the yt-dlp options file."""

    def __call__(self, change_type: int, path: str) -> bool:
        """Return True when ``path`` is the options file and the change is relevant.

        ``path`` matches when it is the same string as
        ``config.YTDL_OPTIONS_FILE`` or, failing that, when both paths refer to
        the same existing file (for example through a symlink).
        """
        target = config.YTDL_OPTIONS_FILE
        if path != target:
            # A different spelling can still name the same file. samefile()
            # needs both paths to exist, so a path that cannot be stat'ed
            # (e.g. already deleted) is treated as "not our file".
            try:
                if not os.path.samefile(path, target):
                    return False
            except OSError:
                return False
        # Accept all change types for our file: modified, added, deleted
        return change_type in (Change.modified, Change.added, Change.deleted)
def get_options_update_time(success=True, msg=''):
    """Describe the state of the yt-dlp options file.

    Returns a dict with the given ``success`` and ``msg`` plus ``update_time``:
    the modification time of ``config.YTDL_OPTIONS_FILE``, or ``None`` when no
    file is configured, the file does not exist, or its time cannot be read.
    """
    options_file = config.YTDL_OPTIONS_FILE
    mtime = None
    # Only try to get file modification time if YTDL_OPTIONS_FILE is set and file exists
    if options_file and os.path.exists(options_file):
        try:
            mtime = os.path.getmtime(options_file)
        except (OSError, IOError) as err:
            log.warning(f"Could not get modification time for {options_file}: {err}")
    return {
        'success': success,
        'msg': msg,
        'update_time': mtime,
    }
# Strong references to background tasks: the event loop only keeps weak
# references, so a task nobody else refers to may be garbage-collected mid-run.
_background_tasks = set()


async def watch_files():
    """Start a background task that reloads the yt-dlp options when the file changes.

    After every accepted change the options are reloaded through
    ``config.load_ytdl_options()`` and the outcome is emitted to Socket.IO as
    ``ytdl_options_changed``. This coroutine returns once the task is scheduled.
    """
    async def _watch_files():
        async for _ in awatch(config.YTDL_OPTIONS_FILE, watch_filter=FileOpsFilter()):
            try:
                success, msg = config.load_ytdl_options()
            except Exception as e:
                # Keep watching: an unexpected failure of one reload must not
                # end the watcher for the rest of the process lifetime.
                log.exception('Failed to reload yt-dlp options')
                success, msg = False, str(e)
            result = get_options_update_time(success, msg)
            await sio.emit('ytdl_options_changed', serializer.encode(result))

    log.info(f'Starting Watch File: {config.YTDL_OPTIONS_FILE}')
    task = asyncio.create_task(_watch_files())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


# The watcher is only needed when an options file is configured.
if config.YTDL_OPTIONS_FILE:
    app.on_startup.append(lambda app: watch_files())
@routes.post(config.URL_PREFIX + 'add')
async def add(request):
    """Queue a download described by the JSON request body.

    Required keys: ``url`` and ``quality``. Optional keys: ``format``,
    ``folder``, ``custom_name_prefix``, ``playlist_strict_mode``,
    ``playlist_item_limit`` and ``auto_start``; missing optional values fall
    back to the defaults applied below.

    Raises:
        web.HTTPBadRequest: when ``url`` or ``quality`` is missing, or when
            ``playlist_item_limit`` is not an integer.
    """
    log.info("Received request to add download")
    post = await request.json()
    log.info(f"Request data: {post}")
    url = post.get('url')
    quality = post.get('quality')
    if not url or not quality:
        log.error("Bad request: missing 'url' or 'quality'")
        raise web.HTTPBadRequest()

    fmt = post.get('format')
    folder = post.get('folder')
    custom_name_prefix = post.get('custom_name_prefix')
    playlist_strict_mode = post.get('playlist_strict_mode')
    playlist_item_limit = post.get('playlist_item_limit')
    auto_start = post.get('auto_start')

    if custom_name_prefix is None:
        custom_name_prefix = ''
    if auto_start is None:
        auto_start = True
    if playlist_strict_mode is None:
        playlist_strict_mode = config.DEFAULT_OPTION_PLAYLIST_STRICT_MODE
    if playlist_item_limit is None:
        playlist_item_limit = config.DEFAULT_OPTION_PLAYLIST_ITEM_LIMIT

    # A non-numeric limit is a client error, not a server failure.
    try:
        playlist_item_limit = int(playlist_item_limit)
    except (TypeError, ValueError):
        log.error("Bad request: 'playlist_item_limit' is not an integer")
        raise web.HTTPBadRequest()

    status = await dqueue.add(
        url,
        quality,
        fmt,
        folder,
        custom_name_prefix,
        playlist_strict_mode,
        playlist_item_limit,
        auto_start,
    )
    return web.Response(text=serializer.encode(status))
# Whitespace and control characters are never valid in a domain; they would
# also corrupt the tab-separated cookie file and its file name.
_COOKIE_DOMAIN_FORBIDDEN = re.compile(r'[\x00-\x20\x7f]')


def _decode_cookie_payload(cookie):
    """Return ``(text, was_base64)`` for a cookie string that may be base64-encoded.

    Decoding is strict (``validate=True``): a plain ``"k=v; k2=v2"`` string
    contains characters outside the base64 alphabet and is returned unchanged,
    instead of being turned into garbage by a lenient decode.
    """
    compact = ''.join(cookie.split())  # tolerate line-wrapped base64
    if not compact:
        return cookie, False
    try:
        decoded = base64.b64decode(compact, validate=True).decode('utf-8')
    except ValueError:
        # binascii.Error and UnicodeDecodeError both derive from ValueError.
        return cookie, False
    return decoded, True


def _netscape_cookie_rows(domain, cookie):
    """Convert ``"k1=v1; k2=v2"`` into ``(name, row)`` tuples in Netscape cookie-file format."""
    rows = []
    for pair in cookie.split(';'):
        name, sep, value = pair.strip().partition('=')
        if not sep:
            continue
        name = name.strip()
        value = value.strip()
        # Tabs and line breaks cannot be represented in the tab-separated
        # format and would otherwise inject extra columns or rows.
        if any(ch in name or ch in value for ch in '\t\r\n'):
            continue
        # Netscape format: domain\tflag\tpath\tsecure\texpiration\tname\tvalue
        # domain: .domain.com (with leading dot for all subdomains)
        # flag: TRUE (include subdomains)
        # path: / (all paths)
        # secure: FALSE (http and https)
        # expiration: 2147483647 (max 32-bit timestamp - Jan 2038)
        rows.append((name, f'.{domain}\tTRUE\t/\tFALSE\t2147483647\t{name}\t{value}\n'))
    return rows


@routes.post(config.URL_PREFIX + 'cookie')
async def set_cookie(request):
    """Accept cookie string and save as cookie file for domain.

    The JSON body must contain ``cookie`` (plain ``"k=v; k2=v2"`` text or its
    base64 encoding) and either ``domain`` or a ``url`` from which the domain
    is taken. The cookies are written to ``<STATE_DIR>/cookies/<domain>.txt``.

    Returns:
        A JSON response with ``status`` ``'ok'`` (plus ``cookie_file``,
        ``cookie_count`` and ``msg``) or ``'error'`` (plus ``msg``).

    Raises:
        web.HTTPBadRequest: when ``cookie`` is missing or not a string, or when
            no usable domain can be determined.
    """
    log.info("Received request to set cookie")
    post = await request.json()
    url = post.get('url')
    cookie = post.get('cookie')
    domain = post.get('domain')

    if not cookie or not isinstance(cookie, str):
        log.error("Bad request: missing 'cookie'")
        raise web.HTTPBadRequest()

    # Determine domain from either explicit domain field or URL
    if not domain:
        if not url:
            log.error("Bad request: missing both 'url' and 'domain'")
            raise web.HTTPBadRequest()
        try:
            domain = urlparse(url).netloc
        except (TypeError, ValueError, AttributeError):
            domain = ''

    # Validation happens before the try block below, whose generic handler
    # would otherwise turn the HTTP 400 into an 'error' JSON response.
    if not domain or not isinstance(domain, str) or _COOKIE_DOMAIN_FORBIDDEN.search(domain):
        log.error("Bad request: could not determine a valid domain")
        raise web.HTTPBadRequest()

    log.info(f"Processing cookie for domain: {domain}")

    try:
        cookie, was_encoded = _decode_cookie_payload(cookie)
        if was_encoded:
            log.info("Cookie was base64 encoded, decoded successfully")
        else:
            log.info("Cookie is not base64 encoded or decode failed, using as-is")
        # Cookie values are credentials: log sizes and names only, never contents.
        log.debug(f"Cookie payload length: {len(cookie)}")

        # Create cookies directory if it doesn't exist
        cookies_dir = os.path.join(config.STATE_DIR, 'cookies')
        os.makedirs(cookies_dir, exist_ok=True)

        # Use domain as filename (sanitized so it cannot contain a path separator)
        safe_domain = re.sub(r'[:/\\]', '_', domain)
        cookie_file = os.path.join(cookies_dir, f'{safe_domain}.txt')
        log.info(f"Writing cookie file to: {cookie_file}")

        rows = _netscape_cookie_rows(domain, cookie)
        for name, _ in rows:
            log.debug(f"Added cookie: {name}")

        # Convert cookie string to Netscape cookie file format
        with open(cookie_file, 'w', encoding='utf-8') as f:
            f.write('# Netscape HTTP Cookie File\n')
            f.write(f'# This file was generated by MeTube for {domain}\n')
            f.write('# Edit at your own risk.\n\n')
            f.writelines(row for _, row in rows)

        cookie_count = len(rows)
        log.info(f"Cookie file created successfully with {cookie_count} cookies at {cookie_file}")
        return web.Response(text=serializer.encode({
            'status': 'ok',
            'cookie_file': cookie_file,
            'cookie_count': cookie_count,
            'msg': f'Cookie saved successfully for {domain} ({cookie_count} cookies)'
        }))
    except Exception as e:
        log.error(f"Error saving cookie: {str(e)}", exc_info=True)
        return web.Response(text=serializer.encode({
            'status': 'error',
            'msg': f'Failed to save cookie: {str(e)}'
        }))
@routes.post(config.URL_PREFIX + 'delete')
async def delete(request):
    """Cancel queued downloads or clear finished ones.

    The JSON body carries ``ids`` and ``where``; ``where`` must be ``'queue'``
    (the ids are passed to ``dqueue.cancel``) or ``'done'`` (the ids are passed
    to ``dqueue.clear``). Anything else yields HTTP 400.
    """
    payload = await request.json()
    ids = payload.get('ids')
    where = payload.get('where')
    if not ids or where not in ('queue', 'done'):
        log.error("Bad request: missing 'ids' or incorrect 'where' value")
        raise web.HTTPBadRequest()

    if where == 'queue':
        status = await dqueue.cancel(ids)
    else:
        status = await dqueue.clear(ids)
    log.info(f"Download delete request processed for ids: {ids}, where: {where}")
    return web.Response(text=serializer.encode(status))
@routes.post(config.URL_PREFIX + 'start')
async def start(request):
    """Pass the ``ids`` from the JSON body to ``dqueue.start_pending`` and return its result as JSON."""
    ids = (await request.json()).get('ids')
    log.info(f"Received request to start pending downloads for ids: {ids}")
    outcome = await dqueue.start_pending(ids)
    return web.Response(text=serializer.encode(outcome))
@routes.get(config.URL_PREFIX + 'history')
async def history(request):
    """Return the saved items of the done, queue and pending collections as JSON."""
    # The collections are read in the same order as before: queue, done, pending.
    queued = [item for _, item in dqueue.queue.saved_items()]
    finished = [item for _, item in dqueue.done.saved_items()]
    waiting = [item for _, item in dqueue.pending.saved_items()]
    log.info("Sending download history")
    snapshot = {'done': finished, 'queue': queued, 'pending': waiting}
    return web.Response(text=serializer.encode(snapshot))
@sio.event
async def connect(sid, environ):
    """Send the initial state to the Socket.IO client ``sid`` that just connected.

    Emits, to that client only: ``all`` (the result of ``dqueue.get()``),
    ``configuration`` (the config object), ``custom_dirs`` when custom
    directories are enabled, and ``ytdl_options_changed`` when an options file
    is configured.
    """
    async def send(event, payload):
        await sio.emit(event, serializer.encode(payload), to=sid)

    log.info(f"Client connected: {sid}")
    await send('all', dqueue.get())
    await send('configuration', config)
    if config.CUSTOM_DIRS:
        await send('custom_dirs', get_custom_dirs())
    if config.YTDL_OPTIONS_FILE:
        await send('ytdl_options_changed', get_options_update_time())
def get_custom_dirs():
    """List the sub-directories of the download directories, relative to each base.

    Returns:
        A dict with the keys ``download_dir`` and ``audio_download_dir``. Each
        value is a list of directory paths relative to the corresponding
        configured directory; the directory itself is listed as ``''``. Paths
        matching ``config.CUSTOM_DIRS_EXCLUDE_REGEX`` are omitted. When both
        directories are configured identically, the same list is used for both.
    """
    pattern = config.CUSTOM_DIRS_EXCLUDE_REGEX
    # An empty pattern disables the exclusion filter.
    exclude = re.compile(pattern) if pattern else None

    def recursive_dirs(base):
        root = pathlib.Path(base)
        found = []
        # Recursively lists all subdirectories of the base directory
        for entry in root.glob('**/'):
            # relative_to() removes the base by path components. A textual
            # prefix strip is wrong for base '.' ('.hidden' would become
            # 'hidden') and for bases that pathlib normalizes ('./dl', 'dl//').
            relative = entry.relative_to(root)
            name = str(relative) if relative.parts else ''
            # Include only directories which do not match the exclude filter
            if exclude is None or exclude.search(name) is None:
                found.append(name)
        return found

    download_dir = recursive_dirs(config.DOWNLOAD_DIR)

    audio_download_dir = download_dir
    if config.DOWNLOAD_DIR != config.AUDIO_DOWNLOAD_DIR:
        audio_download_dir = recursive_dirs(config.AUDIO_DOWNLOAD_DIR)

    return {
        "download_dir": download_dir,
        "audio_download_dir": audio_download_dir
    }
@routes.get(config.URL_PREFIX)
def index(request):
    """Serve the UI's index.html, setting the ``metube_theme`` cookie when the request lacks it."""
    page = os.path.join(config.BASE_DIR, 'ui/dist/metube/browser/index.html')
    response = web.FileResponse(page)
    if 'metube_theme' in request.cookies:
        return response
    # First visit: seed the theme cookie with the configured default.
    response.set_cookie('metube_theme', config.DEFAULT_THEME)
    return response
@routes.get(config.URL_PREFIX + 'robots.txt')
def robots(request):
    """Serve the configured robots.txt file, or a built-in one that disallows the download paths."""
    if not config.ROBOTS_TXT:
        return web.Response(
            text="User-agent: *\nDisallow: /download/\nDisallow: /audio_download/\n"
        )
    custom_file = os.path.join(config.BASE_DIR, config.ROBOTS_TXT)
    return web.FileResponse(custom_file)
@routes.get(config.URL_PREFIX + 'version')
def version(request):
    """Report the yt-dlp version and the application version as JSON.

    The application version is read from the ``METUBE_VERSION`` environment
    variable and defaults to ``"dev"``.
    """
    versions = {
        "yt-dlp": yt_dlp_version,
        "version": os.getenv("METUBE_VERSION", "dev"),
    }
    return web.json_response(versions)
# With a non-root URL prefix, redirect both '/' and the prefix without its
# trailing slash to the prefixed index page.
if config.URL_PREFIX != '/':
    @routes.get('/')
    def index_redirect_root(request):
        """Redirect the site root to the URL prefix."""
        # HTTP exceptions are raised: returning them from a handler is deprecated in aiohttp.
        raise web.HTTPFound(config.URL_PREFIX)

    @routes.get(config.URL_PREFIX[:-1])
    def index_redirect_dir(request):
        """Redirect the prefix without trailing slash to the URL prefix."""
        raise web.HTTPFound(config.URL_PREFIX)
# Static file routes: the two download directories (directory listing only when
# DOWNLOAD_DIRS_INDEXABLE is enabled) and the built frontend assets.
# NOTE(review): the UI route uses the bare URL prefix, so it presumably has to be
# registered after the more specific download routes — confirm before reordering.
routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE)
routes.static(config.URL_PREFIX + 'audio_download/', config.AUDIO_DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE)
routes.static(config.URL_PREFIX, os.path.join(config.BASE_DIR, 'ui/dist/metube/browser'))
try:
    app.add_routes(routes)
except ValueError as e:
    # A ValueError that mentions the UI directory is re-raised with a hint on
    # how to build the frontend; any other ValueError is re-raised unchanged.
    if 'ui/dist/metube/browser' in str(e):
        raise RuntimeError('Could not find the frontend UI static assets. Please run `node_modules/.bin/ng build` inside the ui folder') from e
    raise e
# https://github.com/aio-libs/aiohttp/pull/4615 waiting for release
# @routes.options(config.URL_PREFIX + 'add')
async def add_cors(request):
    """Answer an OPTIONS request for the 'add' endpoint with ``{"status": "ok"}``."""
    ok_body = serializer.encode({"status": "ok"})
    return web.Response(text=ok_body)


async def cookie_cors(request):
    """Answer an OPTIONS request for the 'cookie' endpoint with ``{"status": "ok"}``."""
    ok_body = serializer.encode({"status": "ok"})
    return web.Response(text=ok_body)


# Register the OPTIONS handlers directly on the router.
for _endpoint, _handler in (('add', add_cors), ('cookie', cookie_cors)):
    app.router.add_route('OPTIONS', config.URL_PREFIX + _endpoint, _handler)
async def on_prepare(request, response):
    """Add CORS headers to ``response`` when the request carries an ``Origin`` header.

    The request's origin is echoed back in ``Access-Control-Allow-Origin`` and
    ``Content-Type`` is allowed as a request header. Requests without an
    ``Origin`` header leave the response untouched.
    """
    if 'Origin' not in request.headers:
        return
    origin = request.headers['Origin']
    response.headers['Access-Control-Allow-Origin'] = origin
    response.headers['Access-Control-Allow-Headers'] = 'Content-Type'
# Run on_prepare on the application's response-prepare signal so it can add the CORS headers.
app.on_response_prepare.append(on_prepare)
def supports_reuse_port():
    """Return True when SO_REUSEPORT can be set on a TCP socket on this platform.

    Returns False when the ``socket`` module does not define ``SO_REUSEPORT``
    or when creating the socket or setting the option fails.
    """
    try:
        # The context manager closes the probe socket even when setsockopt()
        # raises, so no descriptor is leaked on unsupported platforms.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    except (AttributeError, OSError):
        return False
    return True
def parseLogLevel(logLevel):
    """Map an upper-case level name to the matching ``logging`` constant.

    Recognizes 'DEBUG', 'INFO', 'WARNING', 'ERROR' and 'CRITICAL' (exact
    match); returns ``None`` for any other value.
    """
    for name in ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'):
        if logLevel == name:
            return getattr(logging, name)
    return None
def isAccessLogEnabled():
    """Return aiohttp's ``access_logger`` when ENABLE_ACCESSLOG is set, otherwise ``None``."""
    return access_logger if config.ENABLE_ACCESSLOG else None
if __name__ == '__main__':
    # Configure logging, then serve the application over HTTP or, when HTTPS
    # is enabled, over TLS using the configured certificate and key.
    logging.basicConfig(level=parseLogLevel(config.LOGLEVEL))
    log.info(f"Listening on {config.HOST}:{config.PORT}")

    tls_options = {}
    if config.HTTPS:
        tls = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        tls.load_cert_chain(certfile=config.CERTFILE, keyfile=config.KEYFILE)
        tls_options['ssl_context'] = tls

    web.run_app(
        app,
        host=config.HOST,
        port=int(config.PORT),
        reuse_port=supports_reuse_port(),
        access_log=isAccessLogEnabled(),
        **tls_options,
    )