SeaCogs/backup/backup.py
SeaswimmerTheFsh f57b2c5196
All checks were successful
Actions / Lint Code (Pylint) (pull_request) Successful in 15s
Actions / Build Documentation (MkDocs) (pull_request) Successful in 11s
fix(backup): fixed some logging stuff
2024-01-31 15:05:25 -05:00

215 lines
10 KiB
Python

# _____ _
# / ____| (_)
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
import contextlib
import logging
import json
import re
from redbot.core import commands
from redbot.core.bot import Red
from redbot.cogs.downloader import errors
from redbot.cogs.downloader.converters import InstalledCog
from redbot.core.utils.chat_formatting import error, text_to_file
class Backup(commands.Cog):
    """A utility to make reinstalling repositories and cogs after migrating the bot far easier."""

    # NOTE(review): the class and command docstrings are presumably surfaced
    # to users as help text by the command framework, so they are kept short
    # and maintainer-facing notes live in `#` comments instead.

    __author__ = "SeaswimmerTheFsh"
    __version__ = "1.0.0"

    def __init__(self, bot: Red):
        super().__init__()
        self.bot = bot
        self.logger = logging.getLogger("red.sea.backup")

    @commands.group(autohelp=True)
    @commands.is_owner()
    async def backup(self, ctx: commands.Context):
        """Backup your installed cogs."""

    @backup.command(name='export')
    @commands.is_owner()
    async def backup_export(self, ctx: commands.Context):
        """Export your installed repositories and cogs to a file."""
        # Sends `backup.json`: a list with one object per repository
        # (name, url, branch) holding the cogs installed from it
        # (name, pinned, commit).
        downloader = ctx.bot.get_cog("Downloader")
        if downloader is None:
            await ctx.send(error(f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."))
            return

        # The installed-cog list does not depend on the repository being
        # processed, so fetch it once instead of once per repository.
        installed = await downloader.installed_cogs()

        export_data = []
        for repo in downloader._repo_manager.repos:  # pylint: disable=protected-access
            export_data.append({
                "name": repo.name,
                "url": repo.url,
                "branch": repo.branch,
                "cogs": [
                    {
                        "name": cog.name,
                        "pinned": cog.pinned,
                        "commit": cog.commit,
                    }
                    for cog in installed
                    if cog.repo_name == repo.name
                ],
            })

        await ctx.send(file=text_to_file(json.dumps(export_data, indent=4), 'backup.json'))

    @backup.command(name='import')
    @commands.is_owner()
    async def backup_import(self, ctx: commands.Context):
        """Import your installed repositories and cogs from an export file."""
        # Flow: parse the attached JSON export, then for every repository
        # entry add the repository (or reuse the existing one), uninstall the
        # listed cogs that are currently installed, and install them again.
        # Every outcome is collected and sent back as `backup.log`.
        try:
            export = json.loads(await ctx.message.attachments[0].read())
        except (json.JSONDecodeError, UnicodeDecodeError, IndexError):
            # IndexError: no attachment; the other two: not decodable JSON.
            export = None
        if not isinstance(export, list):
            # A valid export is always a JSON array of repository objects.
            await ctx.send(error("Please provide a valid JSON export file."))
            return

        downloader = ctx.bot.get_cog("Downloader")
        if downloader is None:
            await ctx.send(error(f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."))
            return

        # *_s lists collect success messages, *_e lists collect errors.
        repo_s = []
        uninstall_s = []
        install_s = []
        repo_e = []
        uninstall_e = []
        install_e = []

        async with ctx.typing():
            for repo in export:
                # Most of this code is from the Downloader cog.
                try:
                    name = repo['name']
                    branch = repo['branch']
                    url = repo['url']
                    cogs = repo['cogs']
                except (KeyError, TypeError):
                    repo_e.append(f"Skipped malformed repository entry: {repo!r}")
                    continue
                if not isinstance(name, str) or not isinstance(url, str):
                    repo_e.append(f"Skipped malformed repository entry: {repo!r}")
                    continue

                if 'PyLav/Red-Cogs' in url:
                    repo_e.append("PyLav cogs are not supported.")
                    continue
                if name.startswith('.') or name.endswith('.'):
                    repo_e.append(f"Invalid repository name: {name}\nRepository names cannot start or end with a dot.")
                    continue
                if re.match(r"^[a-zA-Z0-9_\-\.]+$", name) is None:
                    repo_e.append(f"Invalid repository name: {name}\nRepository names may only contain letters, numbers, underscores, hyphens, and dots.")
                    continue

                try:
                    repository = await downloader._repo_manager.add_repo(url, name, branch)  # pylint: disable=protected-access
                    repo_s.append(f"Added repository {name} from {url} on branch {branch}.")
                except errors.ExistingGitRepo:
                    # Not fatal: reuse the repository that is already there
                    # and carry on restoring its cogs.
                    repo_e.append(f"Repository {name} already exists.")
                    repository = downloader._repo_manager.get_repo(name)  # pylint: disable=protected-access
                # This is commented out because errors.AuthenticationError is not yet implemented in Red 3.5.5's Downloader cog.
                # Rather, it is only in the development version and will be added in version 3.5.6 (or whatever the next version is).
                # except errors.AuthenticationError as err:
                #     repo_e.append(f"Authentication error while adding repository {name}. See logs for more information.")
                #     self.logger.exception(
                #         "Something went wrong whilst cloning %s (to revision %s)",
                #         url,
                #         branch,
                #         exc_info=err,
                #     )
                #     continue
                except errors.CloningError as err:
                    repo_e.append(f"Cloning error while adding repository {name}. See logs for more information.")
                    self.logger.exception(
                        "Something went wrong whilst cloning %s (to revision %s)",
                        url,
                        branch,
                        exc_info=err,
                    )
                    continue
                except OSError:
                    repo_e.append(f"OS error while adding repository {name}. See logs for more information.")
                    self.logger.exception(
                        "Something went wrong trying to add repo %s under name %s",
                        url,
                        name
                    )
                    continue

                # Deliberately NOT in a `try ... else`: an `else` clause is
                # skipped whenever an exception was handled, which would drop
                # the cogs of every repository that already exists. The
                # unrecoverable handlers above `continue` on their own.

                # Resolve which of the exported cogs are currently installed.
                cog_modules = []
                for cog in cogs:
                    try:
                        cog_module = await InstalledCog.convert(ctx, cog['name'])
                    except commands.BadArgument:
                        uninstall_e.append(f"Failed to uninstall {cog['name']}")
                        continue
                    cog_modules.append(cog_module)

                # Uninstall them so they can be installed fresh below.
                install_path = await downloader.cog_install_path()
                for installed_name in {module.name for module in cog_modules}:
                    poss_installed_path = install_path / installed_name
                    if poss_installed_path.exists():
                        with contextlib.suppress(commands.ExtensionNotLoaded):
                            await ctx.bot.unload_extension(installed_name)
                            await ctx.bot.remove_loaded_package(installed_name)
                        await downloader._delete_cog(poss_installed_path)  # pylint: disable=protected-access
                        uninstall_s.append(f"Uninstalled {installed_name}")
                    else:
                        uninstall_e.append(f"Failed to uninstall {installed_name}")
                await downloader._remove_from_installed(cog_modules)  # pylint: disable=protected-access

                # Install every exported cog, one at a time.
                for cog in cogs:
                    cog_name = cog['name']
                    cog_pinned = cog['pinned']
                    # Only pinned cogs are restored at their recorded commit;
                    # a commit of None is passed for the rest.
                    commit = cog['commit'] if cog_pinned else None

                    async with repository.checkout(commit, exit_to_rev=repository.branch):
                        cogs_c, message = await downloader._filter_incorrect_cogs_by_names(repository, [cog_name])  # pylint: disable=protected-access
                        if not cogs_c:
                            install_e.append(message)
                            continue
                        failed_reqs = await downloader._install_requirements(cogs_c)  # pylint: disable=protected-access
                        if failed_reqs:
                            install_e.append(f"Failed to install {cog_name} due to missing requirements: {failed_reqs}")
                            continue
                        installed_cogs, failed_cogs = await downloader._install_cogs(cogs_c)  # pylint: disable=protected-access

                    if repository.available_libraries:
                        installed_libs, failed_libs = await repository.install_libraries(target_dir=downloader.SHAREDLIB_PATH, req_target_dir=downloader.LIB_PATH)
                    else:
                        installed_libs = None
                        failed_libs = None

                    if cog_pinned:
                        for installed_cog in installed_cogs:
                            installed_cog.pinned = True

                    to_save = installed_cogs + installed_libs if installed_libs else installed_cogs
                    await downloader._save_to_installed(to_save)  # pylint: disable=protected-access

                    # Exactly one cog name was requested, so reporting the
                    # first element covers the whole result.
                    if installed_cogs:
                        installed_cog_name = installed_cogs[0].name
                        install_s.append(f"Installed {installed_cog_name}")
                    if installed_libs:
                        for lib in installed_libs:
                            install_s.append(f"Installed {lib.name} required for {cog_name}")
                    if failed_cogs:
                        failed_cog_name = failed_cogs[0].name
                        install_e.append(f"Failed to install {failed_cog_name}")
                    if failed_libs:
                        for lib in failed_libs:
                            install_e.append(f"Failed to install {lib.name} required for {cog_name}")

        report = (
            f"Repositories:\n{repo_s}\n\n"
            f"Repository Errors:\n{repo_e}\n\n"
            f"Uninstalled Cogs:\n{uninstall_s}\n\n"
            f"Uninstalled Cogs Errors:\n{uninstall_e}\n\n"
            f"Installed Cogs:\n{install_s}\n\n"
            f"Installed Cogs Errors:\n{install_e}"
        )
        await ctx.send("Import complete!", file=text_to_file(report, 'backup.log'))