import asyncio
import os
import re

import discord
from pytube import YouTube, exceptions
from redbot.core import Config, checks, commands, data_manager

# Characters that are rejected in subfolder names and stripped from the
# filenames generated from video titles (they are invalid on Windows and/or
# act as path separators).
_ILLEGAL_PATH_CHARS = r'<>:"/\|?*'
_ILLEGAL_PATH_PATTERN = re.compile("[" + re.escape(_ILLEGAL_PATH_CHARS) + "]")


class YouTubeDownloader(commands.Cog):
    """Download YouTube videos with pytube and upload them to Discord."""

    def __init__(self, bot):
        self.bot = bot
        self.config = Config.get_conf(self, identifier=475728338)
        self.config.register_global(
            save_directory=str(data_manager.cog_data_path()) + f"{os.sep}YouTubeDownloader",
            blacklisted_users=[],
        )

    class UserBlacklisted(Exception):
        """Raised by `blacklist_checker` when the user is on the blacklist."""

        def __init__(self, message="The user is blacklisted from using this command."):
            super().__init__(message)

    async def blacklist_checker(self, user_id):
        """Raise `UserBlacklisted` if `user_id` is stored in the blacklist.

        Returns None when the user is allowed to use the cog.
        """
        if user_id in await self.config.blacklisted_users():
            raise self.UserBlacklisted()

    @commands.command()
    @checks.is_owner()
    async def change_data_path(self, ctx: commands.Context, *, data_path: str = None):
        """This command changes the data path the `[p]download` command outputs to."""
        old_path = await self.config.save_directory()
        if not data_path:
            await ctx.send(f"The current data path is `{old_path}`.")
            return

        if os.path.isdir(data_path):
            await self.config.save_directory.set(data_path)
            embed = discord.Embed(
                color=await self.bot.get_embed_color(None),
                description=(
                    f"The save directory has been set to `{data_path}`.\n"
                    f" It was previously set to `{old_path}`."
                ),
            )
            await ctx.send(embed=embed)
        elif os.path.isfile(data_path):
            await ctx.send("The path you've provided leads to a file, not a directory!")
        elif not os.path.exists(data_path):
            await ctx.send("The path you've provided doesn't exist!")

    @staticmethod
    def _youtube_download(url: str, path: str, audio_only: bool):
        """Download the video at `url` into `path` (blocking).

        Returns a `(file_path, previously_existed)` tuple, where
        `previously_existed` tells whether the target file was already on disk
        before this call. Raises a pytube exception when the URL is invalid or
        no suitable stream is available.
        """
        yt = YouTube(url=url)
        if audio_only:
            # Best audio-only stream in an mp4 container, saved as .m4a.
            query = yt.streams.filter(only_audio=True, file_extension="mp4").order_by("abr")
            extension = ".m4a"
        else:
            # Progressive streams carry both audio and video in one file.
            query = yt.streams.filter(progressive=True, file_extension="mp4").order_by("resolution")
            extension = ".mp4"

        stream = query.desc().first()
        if stream is None:
            raise exceptions.PytubeError("No suitable stream was found for this video.")

        # An explicit filename is used as-is by pytube, so sanitise the title.
        safe_title = _ILLEGAL_PATH_PATTERN.sub("", yt.title)
        filename = f"{safe_title} ({yt.video_id}){extension}"
        previously_existed = os.path.isfile(os.path.join(path, filename))
        return stream.download(output_path=path, filename=filename), previously_existed

    async def _resolve_subfolder(self, ctx: commands.Context, base_path: str, subfolder: str):
        """Validate `subfolder` below `base_path`, creating it after confirmation.

        Returns a `(path, prompt)` tuple. `path` is None when the download must
        be aborted (the user has already been told why). `prompt` is the
        confirmation message that can be reused for status updates, or None
        when no confirmation was needed.
        """
        if any(char in _ILLEGAL_PATH_CHARS for char in subfolder):
            highlighted = _ILLEGAL_PATH_PATTERN.sub(r"__**\g<0>**__", subfolder)
            await ctx.send(f"Your subfolder contains illegal characters: `{highlighted}`")
            return None, None

        target = os.path.join(base_path, subfolder)
        if os.path.isfile(target):
            await ctx.send("Your 'subfolder' is a file, not a directory!")
            return None, None
        if os.path.isdir(target):
            return target, None

        prompt = await ctx.send(
            "Your subfolder does not exist yet, would you like to continue? "
            "It will be automatically created."
        )

        def check(message):
            return (
                message.author == ctx.author
                and message.channel == ctx.channel
                and message.content.lower() in ["yes", "ye", "y", "1"]
            )

        try:
            await self.bot.wait_for("message", check=check, timeout=60)
        except asyncio.TimeoutError:
            await prompt.edit(content="You took too long to respond.")
            return None, None

        await prompt.edit(content="Confirmed!")
        try:
            os.makedirs(target, exist_ok=True)
        except OSError as error:
            await prompt.edit(
                content=f"Encountered an error attempting to create the subfolder!\n`{error}`"
            )
            return None, None
        return target, prompt

    @commands.command(aliases=["dl"])
    async def download(
        self,
        ctx: commands.Context,
        url: str,
        audio_only: bool = True,
        delete: bool = False,
        *,
        subfolder: str = None,
    ):
        """This command downloads a YouTube Video as an `m4a` (or `mp4`) and uploads the file to discord.

        If you're considered a bot owner, you will be able to save downloaded files to the data path set in the `[p]change_data_path` command.

        **Arguments**

        - The `url` argument is just the url of the YouTube Video you're downloading.
        - The `audio_only` argument determines if the video will be an `m4a` file or an `mp4` file.
        - The `delete` argument will automatically delete the file after uploading it to Discord. If set to False, it will only save the file if you are a bot owner.
        - The `subfolder` argument only does anything if `delete` is set to False, but it allows you to save to a subfolder in the data path you've set previously without having to change said data path manually."""
        try:
            await self.blacklist_checker(ctx.author.id)
        except self.UserBlacklisted:
            await ctx.send("You are blacklisted from running this command!")
            return

        is_owner = await self.bot.is_owner(ctx.author)
        data_path = await self.config.save_directory()
        status = None
        if subfolder and is_owner:
            data_path, status = await self._resolve_subfolder(ctx, data_path, subfolder)
            if data_path is None:
                return

        # Reuse the confirmation prompt for status updates when there is one.
        if status is None:
            status = await ctx.send("YouTube Downloader started!")
        else:
            await status.edit(content="YouTube Downloader started!")

        # pytube is blocking; run it in a worker thread so the bot stays responsive.
        loop = asyncio.get_running_loop()
        try:
            file_path, previously_existed = await loop.run_in_executor(
                None, self._youtube_download, url, data_path, audio_only
            )
        except exceptions.RegexMatchError:
            await status.edit(content="Please provide a link to YouTube and not another site.")
            return
        except exceptions.PytubeError as error:
            await status.edit(
                content=f"Encountered an error attempting to download the video!\n`{error}`"
            )
            return

        headline = "YouTube Downloader completed!"
        trailer = "\nDownloaded file:"
        completion = None
        try:
            # Passing the path lets discord.File open and close the handle itself,
            # so the file is no longer open when it is removed below.
            completion = await ctx.send(
                content=headline + trailer,
                file=discord.File(file_path, filename=os.path.basename(file_path)),
            )
        except ValueError:
            pass
        except discord.HTTPException as error:
            # 413 (payload too large) is how Discord rejects oversized uploads.
            if error.status != 413:
                raise

        if completion is None:
            headline = "YouTube Downloader completed, but the file was too large to upload."
            trailer = ""
            completion = await ctx.send(content=headline)

        # Only bot owners may keep files on disk, and only when `delete` is off.
        if delete or not is_owner:
            if previously_existed:
                note = "File has not been deleted, as it was previously downloaded and saved."
            else:
                os.remove(file_path)
                note = "File has been deleted."
            await completion.edit(content=f"{headline}\n{note}{trailer}")

    @commands.group(name="dl-blacklist", invoke_without_command=True)
    async def blacklist(self, ctx: commands.Context, user: discord.User):
        """Group command for managing the blacklist."""
        listed = user.id in await self.config.blacklisted_users()
        state = "is" if listed else "is not"
        await ctx.send(
            f"{user.mention} {state} in the blacklist.",
            allowed_mentions=discord.AllowedMentions(users=False),
        )

    @blacklist.command(name="add")
    @checks.is_owner()
    async def blacklist_add(self, ctx: commands.Context, user: discord.User):
        """Add a user to the blacklist."""
        no_ping = discord.AllowedMentions(users=False)
        async with self.config.blacklisted_users() as blacklisted_users:
            if user.id in blacklisted_users:
                await ctx.send(
                    f"{user.mention} is already in the blacklist.", allowed_mentions=no_ping
                )
                return
            blacklisted_users.append(user.id)
        await ctx.send(f"{user.mention} has been added to the blacklist.", allowed_mentions=no_ping)

    @blacklist.command(name="remove")
    @checks.is_owner()
    async def blacklist_remove(self, ctx: commands.Context, user: discord.User):
        """Remove a user from the blacklist."""
        no_ping = discord.AllowedMentions(users=False)
        async with self.config.blacklisted_users() as blacklisted_users:
            if user.id not in blacklisted_users:
                await ctx.send(f"{user.mention} is not in the blacklist.", allowed_mentions=no_ping)
                return
            blacklisted_users.remove(user.id)
        await ctx.send(
            f"{user.mention} has been removed from the blacklist.", allowed_mentions=no_ping
        )