fix(youtubedownloader): converted the cog to use pytube instead of yt-dlp
Some checks failed
Pylint / Pylint (3.10) (push) Failing after 56s
Some checks failed
Pylint / Pylint (3.10) (push) Failing after 56s
This commit is contained in:
parent
6647a3a15e
commit
6a2137e5b0
7 changed files with 186 additions and 241 deletions
|
@ -1,11 +0,0 @@
|
|||
{
|
||||
"author" : ["SeaswimmerTheFsh"],
|
||||
"install_msg" : "Thank you for installing MusicDownloader!\nYou can find the source code of this cog here: https://git.seaswimmer.cc/SeaswimmerTheFsh/GalaxyCogs",
|
||||
"name" : "MusicDownloader",
|
||||
"short" : "Custom cog intended for use on the Galaxy discord server.",
|
||||
"description" : "Custom cog intended for use on the Galaxy discord server.",
|
||||
"end_user_data_statement" : "This cog does not store any End User Data.",
|
||||
"requirements": ["yt_dlp"],
|
||||
"hidden": true,
|
||||
"disabled": true
|
||||
}
|
|
@ -1,229 +0,0 @@
|
|||
import asyncio
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
import discord
|
||||
from redbot.core import Config, checks, commands, data_manager
|
||||
from yt_dlp import YoutubeDL, utils
|
||||
|
||||
|
||||
class MusicDownloader(commands.Cog):
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.config = Config.get_conf(self, identifier=475728338)
|
||||
self.config.register_global(
|
||||
save_directory = str(data_manager.cog_data_path()) + f"{os.sep}MusicDownloader"
|
||||
)
|
||||
|
||||
class UserBlacklisted(Exception):
|
||||
def __init__(self, message="The user is blacklisted from using this command."):
|
||||
super().__init__(message)
|
||||
|
||||
def create_table(self):
|
||||
data_path = str(data_manager.cog_data_path()) + f"{os.sep}MusicDownloader"
|
||||
db_path = os.path.join(data_path, "database.db")
|
||||
if not os.path.isfile(db_path):
|
||||
con = sqlite3.connect(db_path)
|
||||
cur = con.cursor()
|
||||
cur.execute('''
|
||||
CREATE TABLE [IF NOT EXISTS] "blacklist_log" (
|
||||
"user_id" INTEGER NOT NULL UNIQUE,
|
||||
"reason" TEXT DEFAULT NULL,
|
||||
PRIMARY KEY("user_id")
|
||||
);
|
||||
''')
|
||||
con.commit()
|
||||
con.close()
|
||||
|
||||
def blacklist_checker(self, user_id):
|
||||
data_path = str(data_manager.cog_data_path()) + f"{os.sep}MusicDownloader"
|
||||
db_path = os.path.join(data_path, "database.db")
|
||||
con = sqlite3.connect(db_path)
|
||||
cur = con.cursor()
|
||||
cur.execute("SELECT user_id, reason FROM blacklist_log WHERE user_id = ?;", (user_id,))
|
||||
result = cur.fetchone()
|
||||
con.close()
|
||||
if result:
|
||||
user_id, reason = result
|
||||
raise self.UserBlacklisted(reason)
|
||||
|
||||
async def cog_load(self):
|
||||
self.create_table()
|
||||
|
||||
@commands.command()
|
||||
@checks.is_owner()
|
||||
async def change_data_path(self, ctx: commands.Context, *, data_path: str = None):
|
||||
"""This command changes the data path the `[p]download` command outputs to."""
|
||||
old_path = await self.config.save_directory()
|
||||
if not data_path:
|
||||
await ctx.send(f"The current data path is `{old_path}`.")
|
||||
return
|
||||
if os.path.isdir(data_path):
|
||||
await self.config.save_directory.set(data_path)
|
||||
embed=discord.Embed(color=await self.bot.get_embed_color(None), description=f"The save directory has been set to `{data_path}`.\n It was previously set to `{old_path}`.")
|
||||
await ctx.send(embed=embed)
|
||||
elif os.path.isfile(data_path):
|
||||
await ctx.send("The path you've provided leads to a file, not a directory!")
|
||||
elif os.path.exists(data_path) is False:
|
||||
await ctx.send("The path you've provided doesn't exist!")
|
||||
|
||||
@commands.command(aliases=["dl"])
|
||||
async def download(self, ctx: commands.Context, url: str, delete: bool = False, *, subfolder: str = None):
|
||||
"""This command downloads a YouTube Video as an `m4a` and uploads the file to discord.
|
||||
|
||||
If you're considered a bot owner, you will be able to save downloaded files to the data path set in the `[p]change_data_path` command.
|
||||
|
||||
**Arguments**
|
||||
|
||||
- The `url` argument is just the url of the YouTube Video you're downloading.
|
||||
|
||||
- The `delete` argument will automatically delete the audio file after uploading it to Discord. If set to False, it will only save the file if you are a bot owner.
|
||||
|
||||
- The `subfolder` argument only does anything if `delete` is set to False, but it allows you to save to a subfolder in the data path you've set previously without having to change said data path manually."""
|
||||
try:
|
||||
self.blacklist_checker(ctx.author.id)
|
||||
except self.UserBlacklisted as e:
|
||||
await ctx.send(f"You are blacklisted from running this command!\nReason: `{e}`")
|
||||
return
|
||||
def youtube_download(self, url: str, path: str):
|
||||
"""This function does the actual downloading of the YouTube Video."""
|
||||
class Logger:
|
||||
def debug(self, msg):
|
||||
if msg.startswith('[debug] '):
|
||||
pass
|
||||
else:
|
||||
self.info(msg)
|
||||
def info(self, msg):
|
||||
pass
|
||||
def warning(self, msg):
|
||||
pass
|
||||
def error(self, msg):
|
||||
print(msg)
|
||||
ydl_opts = {
|
||||
'logger': Logger(),
|
||||
'format': 'm4a/bestaudio/best',
|
||||
'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'm4a',}],
|
||||
'paths': {'home': path},
|
||||
'verbose': True
|
||||
}
|
||||
with YoutubeDL(ydl_opts) as ydl:
|
||||
info = ydl.extract_info(url=url, download=False)
|
||||
title = info['title']
|
||||
Yid = info['id']
|
||||
filename = title + f' [{Yid}].m4a'
|
||||
full_filename = os.path.join(data_path, filename)
|
||||
if os.path.isfile(full_filename):
|
||||
previously_existed = True
|
||||
else:
|
||||
with YoutubeDL(ydl_opts) as ydl:
|
||||
ydl.download(url)
|
||||
previously_existed = False
|
||||
return filename, previously_existed
|
||||
data_path = await self.config.save_directory()
|
||||
if subfolder and await self.bot.is_owner(ctx.author):
|
||||
data_path = os.path.join(data_path, subfolder)
|
||||
illegal_chars = r'<>:"/\|?*'
|
||||
if any(char in illegal_chars for char in subfolder):
|
||||
pattern = "[" + re.escape(illegal_chars) + "]"
|
||||
modified_subfolder = re.sub(pattern, r'__**\g<0>**__', subfolder)
|
||||
await ctx.send(f"Your subfolder contains illegal characters: `{modified_subfolder}`")
|
||||
elif os.path.isfile(data_path):
|
||||
await ctx.send("Your 'subfolder' is a file, not a directory!")
|
||||
elif os.path.exists(data_path) is False:
|
||||
message = await ctx.send("Your subfolder does not exist yet, would you like to continue? It will be automatically created.")
|
||||
def check(message):
|
||||
return message.author == ctx.author and message.content.lower() in ['yes', 'ye', 'y']
|
||||
try:
|
||||
await self.bot.wait_for('message', check=check, timeout=60) # Timeout after 60 seconds
|
||||
except asyncio.TimeoutError:
|
||||
await message.edit("You took too long to respond.")
|
||||
else:
|
||||
await message.edit("Confirmed!")
|
||||
try:
|
||||
os.makedirs(data_path)
|
||||
except OSError as e:
|
||||
await message.edit(f"Encountered an error attempting to create the subfolder!\n`{e}`")
|
||||
msg = message.edit
|
||||
else:
|
||||
msg = ctx.send
|
||||
message = await msg("YouTube Downloader started!")
|
||||
try:
|
||||
ytdlp_output = youtube_download(self, url, data_path)
|
||||
except (utils.DownloadError, utils.ExtractorError):
|
||||
await message.edit(content="Please provide a link to YouTube and not another site.\nThe site you've linked to is known for using DRM protection, so MusicDownloader cannot download from it.")
|
||||
return
|
||||
full_filename = os.path.join(data_path, ytdlp_output[0])
|
||||
while not os.path.isfile(full_filename):
|
||||
await asyncio.sleep(0.2)
|
||||
if os.path.isfile(full_filename):
|
||||
with open(full_filename, 'rb') as file:
|
||||
try:
|
||||
complete_message = await ctx.send(content="YouTube Downloader completed!\nDownloaded file:", file=discord.File(file, ytdlp_output[0]))
|
||||
except ValueError:
|
||||
complete_message = await ctx.send(content="YouTube Downloader completed, but the audio file was too large to upload.")
|
||||
file.close()
|
||||
if delete is True or await self.bot.is_owner(ctx.author) is False:
|
||||
if ytdlp_output[1] is False:
|
||||
os.remove(full_filename)
|
||||
await complete_message.edit(content="YouTube Downloader completed!\nFile has been deleted.\nDownloaded file:")
|
||||
if ytdlp_output[1] is True:
|
||||
await complete_message.edit(content="YouTube Downloader completed!\nFile has not been deleted, as it was previously downloaded and saved.\nDownloaded file:")
|
||||
|
||||
@commands.group(name="dl-blacklist", invoke_without_command=True)
|
||||
async def blacklist(self, ctx: commands.Context, user: discord.User = None):
|
||||
"""Group command for managing the blacklist."""
|
||||
if not user:
|
||||
user = ctx.author
|
||||
data_path = str(data_manager.cog_data_path()) + f"{os.sep}MusicDownloader"
|
||||
db_path = os.path.join(data_path, "database.db")
|
||||
if user is None:
|
||||
await ctx.send("Please provide a user to check in the blacklist.")
|
||||
return
|
||||
con = sqlite3.connect(db_path)
|
||||
cur = con.cursor()
|
||||
cur.execute("SELECT user_id, reason FROM blacklist_log WHERE user_id = ?;", (user.id,))
|
||||
result = cur.fetchone()
|
||||
if result:
|
||||
reason = result
|
||||
await ctx.send(f"{user.mention} is in the blacklist for the following reason: `{reason}`", allowed_mentions = discord.AllowedMentions(users=False))
|
||||
else:
|
||||
await ctx.send(f"{user.mention} is not in the blacklist.", allowed_mentions = discord.AllowedMentions(users=False))
|
||||
con.close()
|
||||
|
||||
@blacklist.command(name='add')
|
||||
@checks.is_owner()
|
||||
async def blacklist_add(self, ctx: commands.Context, user: discord.User, *, reason: str = None):
|
||||
if not reason:
|
||||
reason = 'No reason provided.'
|
||||
data_path = str(data_manager.cog_data_path()) + f"{os.sep}MusicDownloader"
|
||||
db_path = os.path.join(data_path, "database.db")
|
||||
con = sqlite3.connect(db_path)
|
||||
cur = con.cursor()
|
||||
cur.execute("SELECT user_id FROM blacklist_log WHERE user_id = ?;", (user.id,))
|
||||
result = cur.fetchone()
|
||||
if result:
|
||||
await ctx.send("User is already in the blacklist.")
|
||||
con.close()
|
||||
return
|
||||
cur.execute("INSERT INTO blacklist_log (user_id, reason) VALUES (?, ?);", (user.id, reason))
|
||||
con.commit()
|
||||
con.close()
|
||||
await ctx.send(f"{user.mention} has been added to the blacklist with the reason: `{reason}`")
|
||||
|
||||
@blacklist.command(name='remove')
|
||||
@checks.is_owner()
|
||||
async def blacklist_remove(self, ctx: commands.Context, user: discord.User):
|
||||
data_path = str(data_manager.cog_data_path()) + f"{os.sep}MusicDownloader"
|
||||
db_path = os.path.join(data_path, "database.db")
|
||||
con = sqlite3.connect(db_path)
|
||||
cur = con.cursor()
|
||||
cur.execute("SELECT user_id FROM blacklist_log WHERE user_id = ?;", (user.id,))
|
||||
result = cur.fetchone()
|
||||
if not result:
|
||||
await ctx.send("User is not in the blacklist.")
|
||||
con.close()
|
||||
return
|
||||
cur.execute("DELETE FROM blacklist_log WHERE user_id = ?;", (user.id,))
|
||||
con.commit()
|
||||
con.close()
|
||||
await ctx.send(f"{user.mention} has been removed from the blacklist.")
|
13
poetry.lock
generated
13
poetry.lock
generated
|
@ -1737,6 +1737,17 @@ files = [
|
|||
[package.extras]
|
||||
dateutil = ["python-dateutil (>=2.8.2,<2.9.0)"]
|
||||
|
||||
[[package]]
|
||||
name = "pytube"
|
||||
version = "15.0.0"
|
||||
description = "Python 3 library for downloading YouTube Videos."
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "pytube-15.0.0-py3-none-any.whl", hash = "sha256:07b9904749e213485780d7eb606e5e5b8e4341aa4dccf699160876da00e12d78"},
|
||||
{file = "pytube-15.0.0.tar.gz", hash = "sha256:076052efe76f390dfa24b1194ff821d4e86c17d41cb5562f3a276a8bcbfc9d1d"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pytz"
|
||||
version = "2023.3.post1"
|
||||
|
@ -2459,4 +2470,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p
|
|||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = ">=3.9,<3.12"
|
||||
content-hash = "e8a8586b2d19a4745fceaffae2d0e2cbd7fef38ebe30db14492877016a6df9b0"
|
||||
content-hash = "652356062af3b42b1c87efbbed8380668aa1926cfa02d4f9718de075f8c40f8a"
|
||||
|
|
|
@ -14,6 +14,7 @@ yt-dlp = "^2023.9.24"
|
|||
prisma = "^0.10.0"
|
||||
mysql-connector-python = "^8.1.0"
|
||||
humanize = "^4.8.0"
|
||||
pytube = "^15.0.0"
|
||||
|
||||
[tool.poetry.group.dev]
|
||||
optional = true
|
||||
|
|
11
youtubedownloader/info.json
Normal file
11
youtubedownloader/info.json
Normal file
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"author" : ["SeaswimmerTheFsh"],
|
||||
"install_msg" : "Thank you for installing YouTubeDownloader!\nYou can find the source code of this cog here: https://git.seaswimmer.cc/SeaswimmerTheFsh/GalaxyCogs",
|
||||
"name" : "YouTubeDownloader",
|
||||
"short" : "Custom cog intended for use on the Galaxy discord server.",
|
||||
"description" : "Custom cog intended for use on the Galaxy discord server.",
|
||||
"end_user_data_statement" : "This cog does not store any End User Data.",
|
||||
"requirements": ["pytube"],
|
||||
"hidden": false,
|
||||
"disabled": false
|
||||
}
|
162
youtubedownloader/youtubedownloader.py
Normal file
162
youtubedownloader/youtubedownloader.py
Normal file
|
@ -0,0 +1,162 @@
|
|||
import asyncio
|
||||
import os
|
||||
import re
|
||||
import discord
|
||||
from redbot.core import Config, checks, commands, data_manager
|
||||
from pytube import YouTube, exceptions
|
||||
|
||||
|
||||
class YouTubeDownloader(commands.Cog):
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
self.config = Config.get_conf(self, identifier=475728338)
|
||||
self.config.register_global(
|
||||
save_directory = str(data_manager.cog_data_path()) + f"{os.sep}YouTubeDownloader",
|
||||
blacklisted_users = []
|
||||
)
|
||||
|
||||
class UserBlacklisted(Exception):
|
||||
def __init__(self, message="The user is blacklisted from using this command."):
|
||||
super().__init__(message)
|
||||
|
||||
async def blacklist_checker(self, user_id):
|
||||
blacklisted_users = await self.config.blacklisted_users()
|
||||
for blacklisted_user_id in blacklisted_users:
|
||||
if blacklisted_user_id == user_id:
|
||||
return self.UserBlacklisted
|
||||
|
||||
@commands.command()
|
||||
@checks.is_owner()
|
||||
async def change_data_path(self, ctx: commands.Context, *, data_path: str = None):
|
||||
"""This command changes the data path the `[p]download` command outputs to."""
|
||||
old_path = await self.config.save_directory()
|
||||
if not data_path:
|
||||
await ctx.send(f"The current data path is `{old_path}`.")
|
||||
return
|
||||
if os.path.isdir(data_path):
|
||||
await self.config.save_directory.set(data_path)
|
||||
embed=discord.Embed(color=await self.bot.get_embed_color(None), description=f"The save directory has been set to `{data_path}`.\n It was previously set to `{old_path}`.")
|
||||
await ctx.send(embed=embed)
|
||||
elif os.path.isfile(data_path):
|
||||
await ctx.send("The path you've provided leads to a file, not a directory!")
|
||||
elif os.path.exists(data_path) is False:
|
||||
await ctx.send("The path you've provided doesn't exist!")
|
||||
|
||||
@commands.command(aliases=["dl"])
|
||||
async def download(self, ctx: commands.Context, url: str, audio_only: bool = True, delete: bool = False, *, subfolder: str = None):
|
||||
"""This command downloads a YouTube Video as an `m4a` (or `mp4`) and uploads the file to discord.
|
||||
|
||||
If you're considered a bot owner, you will be able to save downloaded files to the data path set in the `[p]change_data_path` command.
|
||||
|
||||
**Arguments**
|
||||
|
||||
- The `url` argument is just the url of the YouTube Video you're downloading.
|
||||
|
||||
- The `audio_only` argument determines if the video will be an `m4a` file or an `mp4` file.
|
||||
|
||||
- The `delete` argument will automatically delete the file after uploading it to Discord. If set to False, it will only save the file if you are a bot owner.
|
||||
|
||||
- The `subfolder` argument only does anything if `delete` is set to False, but it allows you to save to a subfolder in the data path you've set previously without having to change said data path manually."""
|
||||
try:
|
||||
self.blacklist_checker(ctx.author.id)
|
||||
except self.UserBlacklisted:
|
||||
await ctx.send(f"You are blacklisted from running this command!")
|
||||
return
|
||||
def youtube_download(self, url: str, path: str):
|
||||
"""This function does the actual downloading of the YouTube Video."""
|
||||
yt = YouTube(url=url)
|
||||
filename = f"{yt.title} ({yt.video_id})"
|
||||
if audio_only:
|
||||
yt.streams.filter(progressive=True, only_audio=True, format='m4a')
|
||||
yt.streams.order_by('abr')
|
||||
filename_format = filename + ".m4a"
|
||||
else:
|
||||
yt.streams.filter(progressive=True, format="mp4")
|
||||
yt.streams.order_by('resolution')
|
||||
filename_format = filename + ".mp4"
|
||||
if os.path.isfile(path + f"{os.sep}{filename_format}"):
|
||||
previously_existed = True
|
||||
else:
|
||||
previously_existed = False
|
||||
return yt.streams.first().download(filename=filename, output_path=path), previously_existed
|
||||
data_path = await self.config.save_directory()
|
||||
if subfolder and await self.bot.is_owner(ctx.author):
|
||||
data_path = os.path.join(data_path, subfolder)
|
||||
illegal_chars = r'<>:"/\|?*'
|
||||
if any(char in illegal_chars for char in subfolder):
|
||||
pattern = "[" + re.escape(illegal_chars) + "]"
|
||||
modified_subfolder = re.sub(pattern, r'__**\g<0>**__', subfolder)
|
||||
await ctx.send(f"Your subfolder contains illegal characters: `{modified_subfolder}`")
|
||||
elif os.path.isfile(data_path):
|
||||
await ctx.send("Your 'subfolder' is a file, not a directory!")
|
||||
elif os.path.exists(data_path) is False:
|
||||
message = await ctx.send("Your subfolder does not exist yet, would you like to continue? It will be automatically created.")
|
||||
def check(message):
|
||||
return message.author == ctx.author and message.content.lower() in ['yes', 'ye', 'y', '1']
|
||||
try:
|
||||
await self.bot.wait_for('message', check=check, timeout=60)
|
||||
except asyncio.TimeoutError:
|
||||
await message.edit("You took too long to respond.")
|
||||
else:
|
||||
await message.edit("Confirmed!")
|
||||
try:
|
||||
os.makedirs(data_path)
|
||||
except OSError as e:
|
||||
await message.edit(f"Encountered an error attempting to create the subfolder!\n`{e}`")
|
||||
msg = message.edit
|
||||
else:
|
||||
msg = ctx.send
|
||||
message = await msg("YouTube Downloader started!")
|
||||
try:
|
||||
output = youtube_download(self, url, data_path)
|
||||
except exceptions.RegexMatchError:
|
||||
await message.edit(content="Please provide a link to YouTube and not another site.")
|
||||
return
|
||||
while not os.path.isfile(output[0]):
|
||||
await asyncio.sleep(0.2)
|
||||
if os.path.isfile(output[0]):
|
||||
with open(output[0], 'rb') as file:
|
||||
try:
|
||||
complete_message = await ctx.send(content="YouTube Downloader completed!\nDownloaded file:", file=discord.File(file, output[0]))
|
||||
except ValueError:
|
||||
complete_message = await ctx.send(content="YouTube Downloader completed, but the file was too large to upload.")
|
||||
file.close()
|
||||
if delete is True or await self.bot.is_owner(ctx.author) is False:
|
||||
if output[1] is False:
|
||||
os.remove(output)
|
||||
await complete_message.edit(content="YouTube Downloader completed!\nFile has been deleted.\nDownloaded file:")
|
||||
if output[1] is True:
|
||||
await complete_message.edit(content="YouTube Downloader completed!\nFile has not been deleted, as it was previously downloaded and saved.\nDownloaded file:")
|
||||
|
||||
@commands.group(name="dl-blacklist", invoke_without_command=True)
|
||||
async def blacklist(self, ctx: commands.Context, user: discord.User):
|
||||
"""Group command for managing the blacklist."""
|
||||
for user_id in await self.config.blacklisted_users():
|
||||
if user_id == user.id:
|
||||
await ctx.send(f"{user.mention} is in the blacklist.", allowed_mentions = discord.AllowedMentions(users=False))
|
||||
return
|
||||
await ctx.send(f"{user.mention} is not in the blacklist.", allowed_mentions = discord.AllowedMentions(users=False))
|
||||
|
||||
@blacklist.command(name='add')
|
||||
@checks.is_owner()
|
||||
async def blacklist_add(self, ctx: commands.Context, user: discord.User):
|
||||
blacklisted_users: list = await self.config.blacklisted_users()
|
||||
for user_id in blacklisted_users:
|
||||
if user_id == user.id:
|
||||
await ctx.send(f"{user.mention} is already in the blacklist.")
|
||||
return
|
||||
blacklisted_users.append(user.id)
|
||||
await self.config.blacklisted_users.set(blacklisted_users)
|
||||
await ctx.send(f"{user.mention} has been added to the blacklist.")
|
||||
|
||||
@blacklist.command(name='remove')
|
||||
@checks.is_owner()
|
||||
async def blacklist_remove(self, ctx: commands.Context, user: discord.User):
|
||||
blacklisted_users: list = await self.config.blacklisted_users()
|
||||
for user_id in blacklisted_users:
|
||||
if user_id == user.id:
|
||||
blacklisted_users.remove(user_id)
|
||||
await self.config.blacklisted_users.set(blacklisted_users)
|
||||
await ctx.send(f"{user.mention} has been removed from the blacklist.")
|
||||
return
|
||||
await ctx.send(f"{user.mention} is not in the blacklist.")
|
Loading…
Add table
Reference in a new issue