162 lines
8.5 KiB
Python
162 lines
8.5 KiB
Python
import asyncio
|
|
import os
|
|
import re
|
|
import discord
|
|
from redbot.core import Config, checks, commands, data_manager
|
|
from pytube import YouTube, exceptions
|
|
|
|
|
|
class YouTubeDownloader(commands.Cog):
|
|
def __init__(self, bot):
|
|
self.bot = bot
|
|
self.config = Config.get_conf(self, identifier=475728338)
|
|
self.config.register_global(
|
|
save_directory = str(data_manager.cog_data_path()) + f"{os.sep}YouTubeDownloader",
|
|
blacklisted_users = []
|
|
)
|
|
|
|
class UserBlacklisted(Exception):
|
|
def __init__(self, message="The user is blacklisted from using this command."):
|
|
super().__init__(message)
|
|
|
|
async def blacklist_checker(self, user_id):
|
|
blacklisted_users = await self.config.blacklisted_users()
|
|
for blacklisted_user_id in blacklisted_users:
|
|
if blacklisted_user_id == user_id:
|
|
return self.UserBlacklisted
|
|
|
|
@commands.command()
|
|
@checks.is_owner()
|
|
async def change_data_path(self, ctx: commands.Context, *, data_path: str = None):
|
|
"""This command changes the data path the `[p]download` command outputs to."""
|
|
old_path = await self.config.save_directory()
|
|
if not data_path:
|
|
await ctx.send(f"The current data path is `{old_path}`.")
|
|
return
|
|
if os.path.isdir(data_path):
|
|
await self.config.save_directory.set(data_path)
|
|
embed=discord.Embed(color=await self.bot.get_embed_color(None), description=f"The save directory has been set to `{data_path}`.\n It was previously set to `{old_path}`.")
|
|
await ctx.send(embed=embed)
|
|
elif os.path.isfile(data_path):
|
|
await ctx.send("The path you've provided leads to a file, not a directory!")
|
|
elif os.path.exists(data_path) is False:
|
|
await ctx.send("The path you've provided doesn't exist!")
|
|
|
|
@commands.command(aliases=["dl"])
|
|
async def download(self, ctx: commands.Context, url: str, audio_only: bool = True, delete: bool = False, *, subfolder: str = None):
|
|
"""This command downloads a YouTube Video as an `m4a` (or `mp4`) and uploads the file to discord.
|
|
|
|
If you're considered a bot owner, you will be able to save downloaded files to the data path set in the `[p]change_data_path` command.
|
|
|
|
**Arguments**
|
|
|
|
- The `url` argument is just the url of the YouTube Video you're downloading.
|
|
|
|
- The `audio_only` argument determines if the video will be an `m4a` file or an `mp4` file.
|
|
|
|
- The `delete` argument will automatically delete the file after uploading it to Discord. If set to False, it will only save the file if you are a bot owner.
|
|
|
|
- The `subfolder` argument only does anything if `delete` is set to False, but it allows you to save to a subfolder in the data path you've set previously without having to change said data path manually."""
|
|
try:
|
|
self.blacklist_checker(ctx.author.id)
|
|
except self.UserBlacklisted:
|
|
await ctx.send(f"You are blacklisted from running this command!")
|
|
return
|
|
def youtube_download(self, url: str, path: str):
|
|
"""This function does the actual downloading of the YouTube Video."""
|
|
yt = YouTube(url=url)
|
|
filename = f"{yt.title} ({yt.video_id})"
|
|
if audio_only:
|
|
yt.streams.filter(only_audio=True, mime_type='audio/mp4')
|
|
yt.streams.order_by('abr')
|
|
filename_format = filename + ".m4a"
|
|
else:
|
|
yt.streams.filter(progressive=True, mime_type="video/mp4")
|
|
yt.streams.order_by('resolution')
|
|
filename_format = filename + ".mp4"
|
|
if os.path.isfile(path + f"{os.sep}{filename_format}"):
|
|
previously_existed = True
|
|
else:
|
|
previously_existed = False
|
|
return yt.streams.first().download(filename=filename, output_path=path), previously_existed, filename_format
|
|
data_path = await self.config.save_directory()
|
|
if subfolder and await self.bot.is_owner(ctx.author):
|
|
data_path = os.path.join(data_path, subfolder)
|
|
illegal_chars = r'<>:"/\|?*'
|
|
if any(char in illegal_chars for char in subfolder):
|
|
pattern = "[" + re.escape(illegal_chars) + "]"
|
|
modified_subfolder = re.sub(pattern, r'__**\g<0>**__', subfolder)
|
|
await ctx.send(f"Your subfolder contains illegal characters: `{modified_subfolder}`")
|
|
elif os.path.isfile(data_path):
|
|
await ctx.send("Your 'subfolder' is a file, not a directory!")
|
|
elif os.path.exists(data_path) is False:
|
|
message = await ctx.send("Your subfolder does not exist yet, would you like to continue? It will be automatically created.")
|
|
def check(message):
|
|
return message.author == ctx.author and message.content.lower() in ['yes', 'ye', 'y', '1']
|
|
try:
|
|
await self.bot.wait_for('message', check=check, timeout=60)
|
|
except asyncio.TimeoutError:
|
|
await message.edit("You took too long to respond.")
|
|
else:
|
|
await message.edit("Confirmed!")
|
|
try:
|
|
os.makedirs(data_path)
|
|
except OSError as e:
|
|
await message.edit(f"Encountered an error attempting to create the subfolder!\n`{e}`")
|
|
msg = message.edit
|
|
else:
|
|
msg = ctx.send
|
|
message = await msg("YouTube Downloader started!")
|
|
try:
|
|
output = youtube_download(self, url, data_path)
|
|
except exceptions.RegexMatchError:
|
|
await message.edit(content="Please provide a link to YouTube and not another site.")
|
|
return
|
|
while not os.path.isfile(output[0]):
|
|
await asyncio.sleep(0.2)
|
|
if os.path.isfile(output[0]):
|
|
with open(output[0], 'rb') as file:
|
|
try:
|
|
complete_message = await ctx.send(content="YouTube Downloader completed!\nDownloaded file:", file=discord.File(file, output[2]))
|
|
except ValueError:
|
|
complete_message = await ctx.send(content="YouTube Downloader completed, but the file was too large to upload.")
|
|
file.close()
|
|
if delete is True or await self.bot.is_owner(ctx.author) is False:
|
|
if output[1] is False:
|
|
os.remove(output)
|
|
await complete_message.edit(content="YouTube Downloader completed!\nFile has been deleted.\nDownloaded file:")
|
|
if output[1] is True:
|
|
await complete_message.edit(content="YouTube Downloader completed!\nFile has not been deleted, as it was previously downloaded and saved.\nDownloaded file:")
|
|
|
|
@commands.group(name="dl-blacklist", invoke_without_command=True)
|
|
async def blacklist(self, ctx: commands.Context, user: discord.User):
|
|
"""Group command for managing the blacklist."""
|
|
for user_id in await self.config.blacklisted_users():
|
|
if user_id == user.id:
|
|
await ctx.send(f"{user.mention} is in the blacklist.", allowed_mentions = discord.AllowedMentions(users=False))
|
|
return
|
|
await ctx.send(f"{user.mention} is not in the blacklist.", allowed_mentions = discord.AllowedMentions(users=False))
|
|
|
|
@blacklist.command(name='add')
|
|
@checks.is_owner()
|
|
async def blacklist_add(self, ctx: commands.Context, user: discord.User):
|
|
blacklisted_users: list = await self.config.blacklisted_users()
|
|
for user_id in blacklisted_users:
|
|
if user_id == user.id:
|
|
await ctx.send(f"{user.mention} is already in the blacklist.")
|
|
return
|
|
blacklisted_users.append(user.id)
|
|
await self.config.blacklisted_users.set(blacklisted_users)
|
|
await ctx.send(f"{user.mention} has been added to the blacklist.")
|
|
|
|
@blacklist.command(name='remove')
|
|
@checks.is_owner()
|
|
async def blacklist_remove(self, ctx: commands.Context, user: discord.User):
|
|
blacklisted_users: list = await self.config.blacklisted_users()
|
|
for user_id in blacklisted_users:
|
|
if user_id == user.id:
|
|
blacklisted_users.remove(user_id)
|
|
await self.config.blacklisted_users.set(blacklisted_users)
|
|
await ctx.send(f"{user.mention} has been removed from the blacklist.")
|
|
return
|
|
await ctx.send(f"{user.mention} is not in the blacklist.")
|