#   _____                                _
#  / ____|                              (_)
# | (___    ___   __ _  ___ __      __ | | _ __ ___   _ __ ___    ___  _ __
#  \___ \  / _ \ / _` |/ __|\ \ /\ / / | || '_ ` _ \ | '_ ` _ \  / _ \| '__|
#  ____) ||  __/| (_| |\__ \ \ V  V /  | || | | | | || | | | | ||  __/| |
# |_____/  \___| \__,_||___/  \_/\_/   |_||_| |_| |_||_| |_| |_| \___||_|

import contextlib
import logging
import json
import re

from redbot.core import commands
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import error, text_to_file, inline
from redbot.cogs.downloader.converters import InstalledCog
import redbot.cogs.downloader.errors as errors


class Backup(commands.Cog):
    """A utility to make reinstalling repositories and cogs after migrating the bot far easier."""

    __author__ = "SeaswimmerTheFsh"
    __version__ = "1.0.0"

    def __init__(self, bot: Red):
        super().__init__()
        self.bot = bot
        self.logger = logging.getLogger("red.sea.backup")

    @commands.group(autohelp=True)
    @commands.is_owner()
    async def backup(self, ctx: commands.Context):
        """Backup your installed cogs."""

    @backup.command(name='export')
    @commands.is_owner()
    async def backup_export(self, ctx: commands.Context):
        """Export your installed repositories and cogs to a file."""
        downloader = ctx.bot.get_cog("Downloader")
        if downloader is None:
            await ctx.send(error(f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."))
            return

        all_repos = list(downloader._repo_manager.repos)  # pylint: disable=protected-access

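        # Each entry in the export describes one repository plus the cogs installed
        # from it. Illustrative shape of a single entry (values are made up, not
        # taken from a real export):
        # {
        #     "name": "seacogs",
        #     "url": "https://example.com/owner/repo",
        #     "branch": "main",
        #     "cogs": [{"name": "backup", "pinned": false, "commit": "<sha>"}]
        # }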
        export_data = []

        for repo in all_repos:
            repo_dict = {
                "name": repo.name,
                "url": repo.url,
                "branch": repo.branch,
                "cogs": []
            }

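            # installed_cogs() returns every installed cog the Downloader knows about,
            # so keep only the ones that belong to the repository being exported.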
            cogs = await downloader.installed_cogs()

            for cog in cogs:
                if cog.repo_name == repo.name:
                    cog_dict = {
                        "name": cog.name,
                        "pinned": cog.pinned,
                        "commit": cog.commit
                    }
                    repo_dict["cogs"].append(cog_dict)

            export_data.append(repo_dict)

        await ctx.send(file=text_to_file(json.dumps(export_data, indent=4), 'backup.json'))

    @backup.command(name='import')
    @commands.is_owner()
    async def backup_import(self, ctx: commands.Context):
        """Import your installed repositories and cogs from an export file."""
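        # The export has to be attached to the message that invokes this command;
        # a missing attachment or unparseable JSON is rejected with a single error.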
        try:
            export = json.loads(await ctx.message.attachments[0].read())
        except (json.JSONDecodeError, IndexError):
            await ctx.send(error("Please provide a valid JSON export file."))
            return

        downloader = ctx.bot.get_cog("Downloader")
        if downloader is None:
            await ctx.send(error(f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."))
            return

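        # Errors are collected per phase (adding repositories, removing old copies,
        # reinstalling cogs) and reported as one file at the end instead of aborting
        # the whole import on the first failure.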
        repo_e = []
        uninstall_e = []
        install_e = []

        async with ctx.typing():
            for repo in export:
                # Most of this code is from the Downloader cog's repo_add function.
                name = repo['name']
                branch = repo['branch']
                url = repo['url']
                cogs = repo['cogs']

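                # PyLav repositories are skipped outright; presumably their cogs need
                # PyLav's own setup process rather than a plain Downloader install.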
                if 'PyLav/Red-Cogs' in url:
                    repo_e.append("PyLav cogs are not supported.")
                    continue
                if name.startswith('.') or name.endswith('.'):
                    repo_e.append(f"Invalid repository name: {name}\nRepository names cannot start or end with a dot.")
                    continue
                if re.match(r"^[a-zA-Z0-9_\-\.]+$", name) is None:
                    repo_e.append(f"Invalid repository name: {name}\nRepository names may only contain letters, numbers, underscores, hyphens, and dots.")
                    continue

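                # Clone the repository through Downloader's repo manager; every failure
                # mode is recorded and the import simply moves on to the next repository.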
                try:
                    repository = await downloader._repo_manager.add_repo(name, url, branch)  # pylint: disable=protected-access

                except errors.ExistingGitRepo:
                    repo_e.append(f"Repository {name} already exists.")
                    continue

                except errors.AuthenticationError as err:
                    repo_e.append(f"Authentication error while adding repository {name}. See logs for more information.")
                    self.logger.exception(
                        "Something went wrong whilst cloning %s (to revision %s)",
                        url,
                        branch,
                        exc_info=err,
                    )
                    continue

                except errors.CloningError as err:
                    repo_e.append(f"Cloning error while adding repository {name}. See logs for more information.")
                    self.logger.exception(
                        "Something went wrong whilst cloning %s (to revision %s)",
                        url,
                        branch,
                        exc_info=err,
                    )
                    continue

                except OSError:
                    repo_e.append(f"OS error while adding repository {name}. See logs for more information.")
                    self.logger.exception(
                        "Something went wrong trying to add repo %s under name %s",
                        url,
                        name
                    )
                    continue

                else:
                    cog_names = []
                    for cog in cogs:
                        cog_names.append(cog['name'])

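                    # Remove any existing copy of each cog from disk first so the
                    # reinstall below starts from a clean slate; cogs that are not
                    # actually present on disk are reported instead of silently skipped.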
                    for cog in set(cog_names):
                        poss_installed_path = (await downloader.cog_install_path()) / cog
                        if poss_installed_path.exists():
                            with contextlib.suppress(commands.ExtensionNotLoaded):
                                await ctx.bot.unload_extension(cog)
                                await ctx.bot.remove_loaded_package(cog)
                            await downloader._delete_cog(poss_installed_path)
                        else:
                            uninstall_e.append(f"Failed to uninstall {cog}")
                    await downloader._remove_from_installed(cogs)

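                    # Reinstall each exported cog, checking out its recorded commit when
                    # it was pinned; unpinned cogs are installed from whatever the cloned
                    # branch currently points at.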
                    for cog in cogs:
                        cog_name = cog['name']
                        cog_pinned = cog['pinned']
                        if cog_pinned:
                            commit = cog['commit']
                        else:
                            commit = None

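                        # repository.checkout is a context manager: it checks out the
                        # pinned commit (None leaves the fresh clone untouched) for the
                        # duration of the block, and exit_to_rev should restore the
                        # repository's branch afterwards.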
                        async with repository.checkout(commit, exit_to_rev=repository.branch):
                            cogs, message = await downloader._filter_incorrect_cogs_by_names(repository, [cog_name])  # pylint: disable=protected-access
                            if not cogs:
                                install_e.append(message)
                                continue
                            failed_reqs = await downloader._install_requirements(cogs)
                            if failed_reqs:
                                install_e.append(f"Failed to install {cog_name} due to missing requirements: {failed_reqs}")
                                continue

                            installed_cogs, failed_cogs = await downloader._install_cogs(cogs)

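                            # Shared libraries advertised by the repository get installed
                            # alongside the cog; default to empty results when the repo
                            # ships none so the bookkeeping below still has something to use.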
                            if repository.available_libraries:
                                installed_libs, failed_libs = await repository.install_libraries(target_dir=downloader.SHAREDLIB_PATH, req_target_dir=downloader.LIB_PATH)
                            else:
                                installed_libs, failed_libs = [], []

                            if cog_pinned:
                                for cog in installed_cogs:
                                    cog.pinned = True

                            await downloader._save_to_installed(list(installed_cogs) + list(installed_libs))
                            if failed_cogs:
                                install_e.append(f"Failed to install {failed_cogs}")
                            if failed_libs:
                                install_e.append(f"Failed to install {failed_libs} required for {cog_name}")

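        # Summarise the run in one message; the full error breakdown is attached as
        # backup-errors.log rather than being dumped into the message body.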
await ctx.send("Import complete!\nErrors:", file=text_to_file(f"Repositories:\n{repo_e}\n\nUninstalled Cogs:\n{uninstall_e}\n\nInstalled Cogs:\n{install_e}", 'backup-errors.log'))
|