From d2bf4bd38587705dad18aca6c0e2288d77b13169 Mon Sep 17 00:00:00 2001 From: evilmonkeydiaz Date: Sat, 17 Aug 2024 14:22:03 -0700 Subject: [PATCH] update class --- app/main.py | 226 +++++++++++++++++++++++++++----------------------- app/ytdl.py | 233 +++++++++++++++++----------------------------------- 2 files changed, 199 insertions(+), 260 deletions(-) diff --git a/app/main.py b/app/main.py index 2caf1fb..c00231f 100644 --- a/app/main.py +++ b/app/main.py @@ -3,19 +3,20 @@ import os import sys +import asyncio from aiohttp import web import socketio import logging import json import pathlib -import sys + from ytdl import DownloadQueueNotifier, DownloadQueue log = logging.getLogger('main') class Config: _DEFAULTS = { - 'DOWNLOAD_DIR': '.', + 'DOWNLOAD_DIR': 'C:/Users/Roger/Desktop/MeTube', 'AUDIO_DOWNLOAD_DIR': '%%DOWNLOAD_DIR', 'TEMP_DIR': '%%DOWNLOAD_DIR', 'DOWNLOAD_DIRS_INDEXABLE': 'false', @@ -40,10 +41,10 @@ class Config: def __init__(self): for k, v in self._DEFAULTS.items(): - setattr(self, k, os.environ[k] if k in os.environ else v) + setattr(self, k, os.environ.get(k, v)) for k, v in self.__dict__.items(): - if v.startswith('%%'): + if isinstance(v, str) and v.startswith('%%'): setattr(self, k, getattr(self, v[2:])) if k in self._BOOLEAN: if v not in ('true', 'false', 'True', 'False', 'on', 'off', '1', '0'): @@ -85,10 +86,6 @@ class ObjectSerializer(json.JSONEncoder): return json.JSONEncoder.default(self, obj) serializer = ObjectSerializer() -app = web.Application() -sio = socketio.AsyncServer(cors_allowed_origins='*') -sio.attach(app, socketio_path=config.URL_PREFIX + 'socket.io') -routes = web.RouteTableDef() class Notifier(DownloadQueueNotifier): async def added(self, dl): @@ -106,80 +103,124 @@ class Notifier(DownloadQueueNotifier): async def cleared(self, id): await sio.emit('cleared', serializer.encode(id)) -dqueue = DownloadQueue(config, Notifier()) -app.on_startup.append(lambda app: dqueue.initialize()) +async def init_app(): + app = web.Application() + global sio + sio = socketio.AsyncServer(cors_allowed_origins='*') + sio.attach(app, socketio_path=config.URL_PREFIX + 'socket.io') + routes = web.RouteTableDef() -@routes.post(config.URL_PREFIX + 'add') -async def add(request): - post = await request.json() - url = post.get('url') - quality = post.get('quality') - if not url or not quality: - raise web.HTTPBadRequest() - format = post.get('format') - folder = post.get('folder') - custom_name_prefix = post.get('custom_name_prefix') - auto_start = post.get('auto_start') - if custom_name_prefix is None: - custom_name_prefix = '' - if auto_start is None: - auto_start = True - status = await dqueue.add(url, quality, format, folder, custom_name_prefix, auto_start) - return web.Response(text=serializer.encode(status)) + @routes.post(config.URL_PREFIX + 'add') + async def add(request): + post = await request.json() + url = post.get('url') + quality = post.get('quality') + if not url or not quality: + raise web.HTTPBadRequest() + format = post.get('format') + folder = post.get('folder') + custom_name_prefix = post.get('custom_name_prefix') + auto_start = post.get('auto_start') + if custom_name_prefix is None: + custom_name_prefix = '' + if auto_start is None: + auto_start = True + status = await request.app['dqueue'].add(url, quality, format, folder, custom_name_prefix, auto_start) + return web.Response(text=serializer.encode(status)) -@routes.post(config.URL_PREFIX + 'delete') -async def delete(request): - post = await request.json() - ids = post.get('ids') - where = post.get('where') - if not ids or where not in ['queue', 'done']: - raise web.HTTPBadRequest() - status = await (dqueue.cancel(ids) if where == 'queue' else dqueue.clear(ids)) - return web.Response(text=serializer.encode(status)) + @routes.post(config.URL_PREFIX + 'delete') + async def delete(request): + post = await request.json() + ids = post.get('ids') + where = post.get('where') + if not ids or where not in ['queue', 'done']: + raise web.HTTPBadRequest() + status = await (request.app['dqueue'].cancel(ids) if where == 'queue' else request.app['dqueue'].clear(ids)) + return web.Response(text=serializer.encode(status)) -@routes.post(config.URL_PREFIX + 'start') -async def start(request): - post = await request.json() - ids = post.get('ids') - status = await dqueue.start_pending(ids) - return web.Response(text=serializer.encode(status)) + @routes.post(config.URL_PREFIX + 'start') + async def start(request): + post = await request.json() + ids = post.get('ids') + status = await request.app['dqueue'].start_pending(ids) + return web.Response(text=serializer.encode(status)) -@routes.get(config.URL_PREFIX + 'history') -async def history(request): - history = { 'done': [], 'queue': []} + @routes.get(config.URL_PREFIX + 'history') + async def history(request): + history = { 'done': [], 'queue': []} - for _ ,v in dqueue.queue.saved_items(): - history['queue'].append(v) - for _ ,v in dqueue.done.saved_items(): - history['done'].append(v) + for _ ,v in request.app['dqueue'].queue.saved_items(): + history['queue'].append(v) + for _ ,v in request.app['dqueue'].done.saved_items(): + history['done'].append(v) - return web.Response(text=serializer.encode(history)) + return web.Response(text=serializer.encode(history)) -@sio.event -async def connect(sid, environ): - await sio.emit('all', serializer.encode(dqueue.get()), to=sid) - await sio.emit('configuration', serializer.encode(config), to=sid) - if config.CUSTOM_DIRS: - await sio.emit('custom_dirs', serializer.encode(get_custom_dirs()), to=sid) + @sio.event + async def connect(sid, environ): + await sio.emit('all', serializer.encode(request.app['dqueue'].get()), to=sid) + await sio.emit('configuration', serializer.encode(config), to=sid) + if config.CUSTOM_DIRS: + await sio.emit('custom_dirs', serializer.encode(get_custom_dirs()), to=sid) + + @routes.get(config.URL_PREFIX) + def index(request): + response = web.FileResponse(os.path.join(config.BASE_DIR, 'ui/dist/metube/index.html')) + if 'metube_theme' not in request.cookies: + response.set_cookie('metube_theme', config.DEFAULT_THEME) + return response + + if config.URL_PREFIX != '/': + @routes.get('/') + def index_redirect_root(request): + return web.HTTPFound(config.URL_PREFIX) + + @routes.get(config.URL_PREFIX[:-1]) + def index_redirect_dir(request): + return web.HTTPFound(config.URL_PREFIX) + + routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) + routes.static(config.URL_PREFIX + 'audio_download/', config.AUDIO_DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) + routes.static(config.URL_PREFIX, os.path.join(config.BASE_DIR, 'ui/dist/metube')) + + try: + app.add_routes(routes) + except ValueError as e: + if 'ui/dist/metube' in str(e): + raise RuntimeError('Could not find the frontend UI static assets. Please run `node_modules/.bin/ng build` inside the ui folder') from e + raise e + + async def add_cors(request): + return web.Response(text=serializer.encode({"status": "ok"})) + + app.router.add_route('OPTIONS', config.URL_PREFIX + 'add', add_cors) + + async def on_prepare(request, response): + if 'Origin' in request.headers: + response.headers['Access-Control-Allow-Origin'] = request.headers['Origin'] + response.headers['Access-Control-Allow-Headers'] = 'Content-Type' + + app.on_response_prepare.append(on_prepare) + + dqueue = DownloadQueue(config, Notifier()) + await dqueue.initialize() + app['dqueue'] = dqueue + + return app def get_custom_dirs(): def recursive_dirs(base): path = pathlib.Path(base) - # Converts PosixPath object to string, and remove base/ prefix def convert(p): s = str(p) if s.startswith(base): s = s[len(base):] - if s.startswith('/'): s = s[1:] - return s - # Recursively lists all subdirectories of DOWNLOAD_DIR dirs = list(filter(None, map(convert, path.glob('**')))) - return dirs download_dir = recursive_dirs(config.DOWNLOAD_DIR) @@ -193,52 +234,33 @@ def get_custom_dirs(): "audio_download_dir": audio_download_dir } -@routes.get(config.URL_PREFIX) -def index(request): - response = web.FileResponse(os.path.join(config.BASE_DIR, 'ui/dist/metube/index.html')) - if 'metube_theme' not in request.cookies: - response.set_cookie('metube_theme', config.DEFAULT_THEME) - return response +async def start_background_tasks(app): + app['dqueue_task'] = asyncio.create_task(app['dqueue'].run()) -if config.URL_PREFIX != '/': - @routes.get('/') - def index_redirect_root(request): - return web.HTTPFound(config.URL_PREFIX) +async def cleanup_background_tasks(app): + app['dqueue_task'].cancel() + await app['dqueue_task'] - @routes.get(config.URL_PREFIX[:-1]) - def index_redirect_dir(request): - return web.HTTPFound(config.URL_PREFIX) +def main(): + logging.basicConfig(level=logging.DEBUG) + log.info(f"Initializing application on {config.HOST}:{config.PORT}") -routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) -routes.static(config.URL_PREFIX + 'audio_download/', config.AUDIO_DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) -routes.static(config.URL_PREFIX, os.path.join(config.BASE_DIR, 'ui/dist/metube')) -try: - app.add_routes(routes) -except ValueError as e: - if 'ui/dist/metube' in str(e): - raise RuntimeError('Could not find the frontend UI static assets. Please run `node_modules/.bin/ng build` inside the ui folder') from e - raise e + loop = asyncio.get_event_loop() + app = loop.run_until_complete(init_app()) -# https://github.com/aio-libs/aiohttp/pull/4615 waiting for release -# @routes.options(config.URL_PREFIX + 'add') -async def add_cors(request): - return web.Response(text=serializer.encode({"status": "ok"})) - -app.router.add_route('OPTIONS', config.URL_PREFIX + 'add', add_cors) - - -async def on_prepare(request, response): - if 'Origin' in request.headers: - response.headers['Access-Control-Allow-Origin'] = request.headers['Origin'] - response.headers['Access-Control-Allow-Headers'] = 'Content-Type' - -app.on_response_prepare.append(on_prepare) + app.on_startup.append(start_background_tasks) + app.on_cleanup.append(cleanup_background_tasks) + try: + if sys.platform.startswith('win'): + web.run_app(app, host=config.HOST, port=int(config.PORT)) + else: + web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=True) + except Exception as e: + log.error(f"Failed to start the server: {str(e)}") + sys.exit(1) if __name__ == '__main__': - logging.basicConfig(level=logging.DEBUG) - log.info(f"Listening on {config.HOST}:{config.PORT}") if sys.platform.startswith('win'): - web.run_app(app, host=config.HOST, port=int(config.PORT)) - else: - web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=True) + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + main() \ No newline at end of file diff --git a/app/ytdl.py b/app/ytdl.py index e9b4ee6..fe8eba4 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -4,7 +4,6 @@ from collections import OrderedDict import shelve import time import asyncio -import multiprocessing import logging import re from dl_formats import get_format, get_opts, AUDIO_FORMATS @@ -44,8 +43,6 @@ class DownloadInfo: self.error = error class Download: - manager = None - def __init__(self, download_dir, temp_dir, output_template, output_template_chapter, quality, format, ytdl_opts, info): self.download_dir = download_dir self.temp_dir = temp_dir @@ -56,106 +53,48 @@ class Download: self.info = info self.canceled = False self.tmpfilename = None - self.status_queue = None - self.proc = None - self.loop = None - self.notifier = None - - - def _download(self): - try: - def put_status(st): - self.status_queue.put({k: v for k, v in st.items() if k in ( - 'tmpfilename', - 'filename', - 'status', - 'msg', - 'total_bytes', - 'total_bytes_estimate', - 'downloaded_bytes', - 'speed', - 'eta', - )}) - def put_status_postprocessor(d): - if d['postprocessor'] == 'MoveFiles' and d['status'] == 'finished': - if '__finaldir' in d['info_dict']: - filename = os.path.join(d['info_dict']['__finaldir'], os.path.basename(d['info_dict']['filepath'])) - else: - filename = d['info_dict']['filepath'] - self.status_queue.put({'status': 'finished', 'filename': filename}) - ret = yt_dlp.YoutubeDL(params={ - 'quiet': True, - 'no_color': True, - #'skip_download': True, - 'paths': {"home": self.download_dir, "temp": self.temp_dir}, - 'outtmpl': { "default": self.output_template, "chapter": self.output_template_chapter }, - 'format': self.format, - 'socket_timeout': 30, - 'ignore_no_formats_error': True, - 'progress_hooks': [put_status], - 'postprocessor_hooks': [put_status_postprocessor], - **self.ytdl_opts, - }).download([self.info.url]) - self.status_queue.put({'status': 'finished' if ret == 0 else 'error'}) - except yt_dlp.utils.YoutubeDLError as exc: - self.status_queue.put({'status': 'error', 'msg': str(exc)}) async def start(self, notifier): - if Download.manager is None: - Download.manager = multiprocessing.Manager() - self.status_queue = Download.manager.Queue() - self.proc = multiprocessing.Process(target=self._download) - self.proc.start() - self.loop = asyncio.get_running_loop() - self.notifier = notifier self.info.status = 'preparing' - await self.notifier.updated(self.info) - asyncio.create_task(self.update_status()) - return await self.loop.run_in_executor(None, self.proc.join) + await notifier.updated(self.info) + + try: + result = await asyncio.get_event_loop().run_in_executor(None, self._download) + if result['status'] == 'finished': + self.info.status = 'finished' + self.info.filename = result.get('filename') + self.info.size = os.path.getsize(result['filename']) if os.path.exists(result['filename']) else None + else: + self.info.status = 'error' + self.info.msg = result.get('msg', 'Unknown error occurred') + except Exception as e: + self.info.status = 'error' + self.info.msg = str(e) + + await notifier.updated(self.info) + + def _download(self): + ydl_opts = { + 'quiet': True, + 'no_color': True, + 'paths': {"home": self.download_dir, "temp": self.temp_dir}, + 'outtmpl': {"default": self.output_template, "chapter": self.output_template_chapter}, + 'format': self.format, + 'socket_timeout': 30, + 'ignore_no_formats_error': True, + **self.ytdl_opts, + } + + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + try: + info = ydl.extract_info(self.info.url, download=True) + return {'status': 'finished', 'filename': ydl.prepare_filename(info)} + except yt_dlp.utils.DownloadError as e: + return {'status': 'error', 'msg': str(e)} def cancel(self): - if self.running(): - self.proc.kill() self.canceled = True - def close(self): - if self.started(): - self.proc.close() - self.status_queue.put(None) - - def running(self): - try: - return self.proc is not None and self.proc.is_alive() - except ValueError: - return False - - def started(self): - return self.proc is not None - - async def update_status(self): - while True: - status = await self.loop.run_in_executor(None, self.status_queue.get) - if status is None: - return - self.tmpfilename = status.get('tmpfilename') - if 'filename' in status: - fileName = status.get('filename') - self.info.filename = os.path.relpath(fileName, self.download_dir) - self.info.size = os.path.getsize(fileName) if os.path.exists(fileName) else None - - # Set correct file extension for thumbnails - if(self.info.format == 'thumbnail'): - self.info.filename = re.sub(r'\.webm$', '.jpg', self.info.filename) - self.info.status = status['status'] - self.info.msg = status.get('msg') - if 'downloaded_bytes' in status: - total = status.get('total_bytes') or status.get('total_bytes_estimate') - if total: - self.info.percent = status['downloaded_bytes'] / total * 100 - self.info.speed = status.get('speed') - self.info.eta = status.get('eta') - await self.notifier.updated(self.info) - class PersistentQueue: def __init__(self, path): pdir = os.path.dirname(path) @@ -209,19 +148,25 @@ class DownloadQueue: self.done = PersistentQueue(self.config.STATE_DIR + '/completed') self.pending = PersistentQueue(self.config.STATE_DIR + '/pending') self.done.load() - self.max_concurrent_downloads = self.config.MAX_CONCURRENT_DOWNLOADS # New configuration option self.active_downloads = set() - self.download_semaphore = asyncio.Semaphore(self.max_concurrent_downloads) + self.max_concurrent_downloads = 3 # Adjust this value as needed + self.event = asyncio.Event() + + async def initialize(self): + await self.__import_queue() + + async def run(self): + while True: + try: + await self.__manage_downloads() + except Exception as e: + log.error(f"Error in download queue: {str(e)}") + await asyncio.sleep(5) # Wait a bit before retrying async def __import_queue(self): for k, v in self.queue.saved_items(): await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix) - async def initialize(self): - self.event = asyncio.Event() - asyncio.create_task(self.__manage_downloads()) - await self.__import_queue() - def __extract_info(self, url): return yt_dlp.YoutubeDL(params={ 'quiet': True, @@ -233,12 +178,6 @@ class DownloadQueue: }).extract_info(url, download=False) def __calc_download_path(self, quality, format, folder): - """Calculates download path from quality, format and folder attributes. - - Returns: - Tuple dldirectory, error_message both of which might be None (but not at the same time) - """ - # Keep consistent with frontend base_directory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format not in AUDIO_FORMATS) else self.config.AUDIO_DOWNLOAD_DIR if folder: if not self.config.CUSTOM_DIRS: @@ -254,17 +193,7 @@ class DownloadQueue: else: dldirectory = base_directory return dldirectory, None - async def __manage_downloads(self): - while True: - while self.queue.empty(): - log.info('waiting for item to download') - await self.event.wait() - self.event.clear() - - async with self.download_semaphore: - id, entry = self.queue.next() - self.active_downloads.add(id) - asyncio.create_task(self.__process_download(id, entry)) + async def __add_entry(self, entry, quality, format, folder, custom_name_prefix, auto_start, already): if not entry: return {'status': 'error', 'msg': "Invalid/empty data was given."} @@ -324,10 +253,15 @@ class DownloadQueue: else: already.add(url) try: - entry = await asyncio.get_running_loop().run_in_executor(None, self.__extract_info, url) + entry = await asyncio.get_event_loop().run_in_executor(None, self.__extract_info, url) except yt_dlp.utils.YoutubeDLError as exc: return {'status': 'error', 'msg': str(exc)} - return await self.__add_entry(entry, quality, format, folder, custom_name_prefix, auto_start, already) + result = await self.__add_entry(entry, quality, format, folder, custom_name_prefix, auto_start, already) + + if result['status'] == 'ok' and auto_start: + self.event.set() # Signal that new items are available for download + + return result async def start_pending(self, ids): for id in ids: @@ -349,11 +283,11 @@ class DownloadQueue: if not self.queue.exists(id): log.warn(f'requested cancel for non-existent download {id}') continue - if self.queue.get(id).started(): - self.queue.get(id).cancel() - else: - self.queue.delete(id) - await self.notifier.canceled(id) + dl = self.queue.get(id) + if isinstance(dl, Download): + dl.cancel() + self.queue.delete(id) + await self.notifier.canceled(id) return {'status': 'ok'} async def clear(self, ids): @@ -375,35 +309,18 @@ class DownloadQueue: def get(self): return(list((k, v.info) for k, v in self.queue.items()) + list((k, v.info) for k, v in self.pending.items()), list((k, v.info) for k, v in self.done.items())) - async def __process_download(self, id, entry): - try: - log.info(f'downloading {entry.info.title}') - await entry.start(self.notifier) - if entry.info.status != 'finished': - if entry.tmpfilename and os.path.isfile(entry.tmpfilename): - try: - os.remove(entry.tmpfilename) - except: - pass - entry.info.status = 'error' - entry.close() - finally: - if self.queue.exists(id): - self.queue.delete(id) - if entry.canceled: - await self.notifier.canceled(id) - else: - self.done.put(entry) - await self.notifier.completed(entry.info) - self.active_downloads.remove(id) - self.download_semaphore.release() - async def __download(self): + + async def __manage_downloads(self): while True: - while self.queue.empty(): - log.info('waiting for item to download') - await self.event.wait() - self.event.clear() - id, entry = self.queue.next() + while not self.queue.empty() and len(self.active_downloads) < self.max_concurrent_downloads: + id, entry = self.queue.next() + if id not in self.active_downloads: + self.active_downloads.add(id) + asyncio.create_task(self.__download(id, entry)) + await asyncio.sleep(1) # Add a small delay to prevent busy waiting + + async def __download(self, id, entry): + try: log.info(f'downloading {entry.info.title}') await entry.start(self.notifier) if entry.info.status != 'finished': @@ -412,12 +329,12 @@ class DownloadQueue: os.remove(entry.tmpfilename) except: pass - entry.info.status = 'error' - entry.close() if self.queue.exists(id): self.queue.delete(id) if entry.canceled: await self.notifier.canceled(id) else: self.done.put(entry) - await self.notifier.completed(entry.info) \ No newline at end of file + await self.notifier.completed(entry.info) + finally: + self.active_downloads.remove(id) \ No newline at end of file