SeaCogs/backup/backup.py

321 lines
13 KiB
Python
Raw Permalink Normal View History

2024-01-31 12:12:38 -05:00
# _____ _
# / ____| (_)
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
import contextlib
2024-01-31 12:12:38 -05:00
import json
2024-01-31 13:07:10 -05:00
import re
2024-01-31 12:12:38 -05:00
from red_commons.logging import getLogger
2024-01-31 14:54:36 -05:00
from redbot.cogs.downloader import errors
from redbot.cogs.downloader.converters import InstalledCog
2024-02-02 11:22:08 -05:00
from redbot.core import commands
from redbot.core.bot import Red
2024-03-29 03:29:35 -04:00
from redbot.core.utils.chat_formatting import error, humanize_list, text_to_file
2024-01-31 12:12:38 -05:00
2024-02-02 11:22:08 -05:00
2024-02-28 10:56:46 -05:00
# pylint: disable=protected-access
2024-01-31 12:12:38 -05:00
class Backup(commands.Cog):
    """A utility to make reinstalling repositories and cogs after migrating the bot far easier."""

    __author__ = ["SeaswimmerTheFsh"]
    __version__ = "1.1.0"
    __documentation__ = "https://seacogs.coastalcommits.com/backup/"

    # Same pattern the import command has always used to vet repository names;
    # compiled once instead of on every repository entry.
    _REPO_NAME_RE = re.compile(r"^[a-zA-Z0-9_\-\.]+$")

    def __init__(self, bot: Red):
        super().__init__()
        self.bot = bot
        self.logger = getLogger("red.SeaCogs.Backup")

    def format_help_for_context(self, ctx: commands.Context) -> str:
        """Return the parent help text followed by version, author, and documentation lines."""
        pre_processed = super().format_help_for_context(ctx) or ""
        # Only add a separating newline when the parent text does not already
        # contain a blank line.
        n = "\n" if "\n\n" not in pre_processed else ""
        text = [
            f"{pre_processed}{n}",
            f"Cog Version: **{self.__version__}**",
            f"Author: {humanize_list(self.__author__)}",
            f"Documentation: {self.__documentation__}",
        ]
        return "\n".join(text)

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #

    async def _get_downloader(self, ctx: commands.Context):
        """Return the loaded `Downloader` cog, or send an error and return None."""
        downloader = ctx.bot.get_cog("Downloader")
        if downloader is None:
            await ctx.send(
                error(
                    f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."
                )
            )
        return downloader

    async def _read_export(self, ctx: commands.Context):
        """Parse the export JSON attached to the command message or to the replied-to message.

        Returns the decoded JSON value, or None (after sending an error to
        ``ctx``) when neither message carries a parseable attachment.
        """
        try:
            return json.loads(await ctx.message.attachments[0].read())
        except (json.JSONDecodeError, UnicodeDecodeError, IndexError):
            # No attachment, or it was not valid JSON / not decodable text:
            # fall back to the message this command replied to.
            pass
        try:
            return json.loads(
                await ctx.message.reference.resolved.attachments[0].read()
            )
        except (json.JSONDecodeError, UnicodeDecodeError, IndexError, AttributeError):
            # AttributeError covers "not a reply" (reference is None) and a
            # resolved reference object without attachments.
            await ctx.send(error("Please provide a valid JSON export file."))
            return None

    @staticmethod
    def _parse_repo_entry(entry):
        """Return ``(name, url, branch, cogs)`` for a repository entry, or None if malformed.

        ``name`` and ``url`` must be strings and ``cogs`` a list; ``branch``
        is passed through as-is (None when absent).
        """
        if not isinstance(entry, dict):
            return None
        name = entry.get("name")
        url = entry.get("url")
        cogs = entry.get("cogs")
        if not (isinstance(name, str) and isinstance(url, str) and isinstance(cogs, list)):
            return None
        return name, url, entry.get("branch"), cogs

    @staticmethod
    def _parse_cog_entry(entry):
        """Return ``(name, pinned, commit)`` for a cog entry, or None if malformed.

        ``commit`` is only kept for pinned cogs; unpinned cogs get None so
        they are installed from the repository's branch.
        """
        if not isinstance(entry, dict) or not isinstance(entry.get("name"), str):
            return None
        pinned = bool(entry.get("pinned", False))
        commit = entry.get("commit") if pinned else None
        if pinned and not isinstance(commit, str):
            # A pinned cog without a commit cannot be restored faithfully.
            return None
        return entry["name"], pinned, commit

    @classmethod
    def _validate_repo_name(cls, name: str):
        """Return an error message when ``name`` is not an acceptable repository name, else None."""
        if name.startswith(".") or name.endswith("."):
            return f"Invalid repository name: {name}\nRepository names cannot start or end with a dot."
        if cls._REPO_NAME_RE.match(name) is None:
            return f"Invalid repository name: {name}\nRepository names may only contain letters, numbers, underscores, hyphens, and dots."
        return None

    async def _add_repository(self, downloader, name, url, branch, repo_s, repo_e):
        """Add (or look up) the repository and return it; return None when it could not be added.

        Outcome messages are appended to ``repo_s`` (successes) and
        ``repo_e`` (errors).
        """
        try:
            repository = await downloader._repo_manager.add_repo(url, name, branch)
        except errors.ExistingGitRepo:
            # Not fatal: reuse the repository that is already registered.
            repo_e.append(f"Repository {name} already exists.")
            repository = downloader._repo_manager.get_repo(name)
            self.logger.debug("Repository %s already exists", name)
            return repository
        except errors.AuthenticationError as err:
            repo_e.append(
                f"Authentication error while adding repository {name}. See logs for more information."
            )
            self.logger.exception(
                "Something went wrong whilst cloning %s (to revision %s)",
                url,
                branch,
                exc_info=err,
            )
            return None
        except errors.CloningError as err:
            repo_e.append(
                f"Cloning error while adding repository {name}. See logs for more information."
            )
            self.logger.exception(
                "Something went wrong whilst cloning %s (to revision %s)",
                url,
                branch,
                exc_info=err,
            )
            return None
        except OSError:
            repo_e.append(
                f"OS error while adding repository {name}. See logs for more information."
            )
            self.logger.exception(
                "Something went wrong trying to add repo %s under name %s",
                url,
                name,
            )
            return None

        repo_s.append(f"Added repository {name} from {url} on branch {branch}.")
        self.logger.debug(
            "Added repository %s from %s on branch %s", name, url, branch
        )
        return repository

    async def _uninstall_cogs(self, ctx, downloader, cog_names, uninstall_s, uninstall_e):
        """Uninstall any currently installed copies of ``cog_names`` before reinstalling them.

        Names that do not resolve to an installed cog are reported in
        ``uninstall_e``; removed cogs are reported in ``uninstall_s``.
        """
        cog_modules = []
        for cog_name in cog_names:
            try:
                cog_module = await InstalledCog.convert(ctx, cog_name)
            except commands.BadArgument:
                uninstall_e.append(f"Failed to uninstall {cog_name}")
                continue
            cog_modules.append(cog_module)

        # The install path does not depend on the cog, so look it up once.
        install_path = await downloader.cog_install_path()
        for module_name in {module.name for module in cog_modules}:
            poss_installed_path = install_path / module_name
            if poss_installed_path.exists():
                # A cog that is installed but not loaded is fine to delete.
                with contextlib.suppress(commands.ExtensionNotLoaded):
                    await ctx.bot.unload_extension(module_name)
                    await ctx.bot.remove_loaded_package(module_name)
                await downloader._delete_cog(poss_installed_path)
                uninstall_s.append(f"Uninstalled {module_name}")
                self.logger.debug("Uninstalled %s", module_name)
            else:
                uninstall_e.append(f"Failed to uninstall {module_name}")
                self.logger.warning("Failed to uninstall %s", module_name)
        await downloader._remove_from_installed(cog_modules)

    async def _install_cog(
        self, downloader, repository, cog_name, pinned, commit, install_s, install_e
    ):
        """Install one cog (and the repository's shared libraries) and record the outcome.

        ``commit`` is checked out for the duration of the cog install and the
        repository is returned to its branch afterwards. Results are appended
        to ``install_s`` / ``install_e``.
        """
        # NOTE(review): statement nesting below follows Red's Downloader
        # install flow, which this code was adapted from - confirm against
        # the upstream file if the original indentation is available.
        async with repository.checkout(commit, exit_to_rev=repository.branch):
            cogs_c, message = await downloader._filter_incorrect_cogs_by_names(
                repository, [cog_name]
            )
            if not cogs_c:
                install_e.append(message)
                self.logger.error(message)
                return
            failed_reqs = await downloader._install_requirements(cogs_c)
            if failed_reqs:
                install_e.append(
                    f"Failed to install {cog_name} due to missing requirements: {failed_reqs}"
                )
                self.logger.error(
                    "Failed to install %s due to missing requirements: %s",
                    cog_name,
                    failed_reqs,
                )
                return
            installed_cogs, failed_cogs = await downloader._install_cogs(cogs_c)

        if repository.available_libraries:
            installed_libs, failed_libs = await repository.install_libraries(
                target_dir=downloader.SHAREDLIB_PATH,
                req_target_dir=downloader.LIB_PATH,
            )
        else:
            installed_libs = None
            failed_libs = None

        if pinned:
            for installed in installed_cogs:
                installed.pinned = True
        await downloader._save_to_installed(
            installed_cogs + installed_libs if installed_libs else installed_cogs
        )

        if installed_cogs:
            installed_cog_name = installed_cogs[0].name
            install_s.append(f"Installed {installed_cog_name}")
            self.logger.debug("Installed %s", installed_cog_name)
        if installed_libs:
            for lib in installed_libs:
                install_s.append(f"Installed {lib.name} required for {cog_name}")
                self.logger.debug(
                    "Installed %s required for %s", lib.name, cog_name
                )
        if failed_cogs:
            failed_cog_name = failed_cogs[0].name
            install_e.append(f"Failed to install {failed_cog_name}")
            self.logger.error("Failed to install %s", failed_cog_name)
        if failed_libs:
            for lib in failed_libs:
                install_e.append(
                    f"Failed to install {lib.name} required for {cog_name}"
                )
                self.logger.error(
                    "Failed to install %s required for %s",
                    lib.name,
                    cog_name,
                )

    # ------------------------------------------------------------------ #
    # Commands
    # ------------------------------------------------------------------ #

    @commands.group(autohelp=True)
    @commands.is_owner()
    async def backup(self, ctx: commands.Context):
        """Backup your installed cogs."""

    @backup.command(name="export")
    @commands.is_owner()
    async def backup_export(self, ctx: commands.Context):
        """Export your installed repositories and cogs to a file."""
        downloader = await self._get_downloader(ctx)
        if downloader is None:
            return

        all_repos = list(downloader._repo_manager.repos)
        # The installed-cog list is the same for every repository, so fetch
        # it once rather than once per repository.
        installed = await downloader.installed_cogs()

        export_data = []
        for repo in all_repos:
            export_data.append(
                {
                    "name": repo.name,
                    "url": repo.url,
                    "branch": repo.branch,
                    "cogs": [
                        {
                            "name": cog.name,
                            # "loaded": cog.name in ctx.bot.extensions.keys(),
                            # this functionality was planned but never implemented due to Red limitations
                            # and the possibility of restoration functionality being added to Core
                            "pinned": cog.pinned,
                            "commit": cog.commit,
                        }
                        for cog in installed
                        if cog.repo_name == repo.name
                    ],
                }
            )

        await ctx.send(
            file=text_to_file(json.dumps(export_data, indent=4), "backup.json")
        )

    @backup.command(name="import")
    @commands.is_owner()
    async def backup_import(self, ctx: commands.Context):
        """Import your installed repositories and cogs from an export file."""
        export = await self._read_export(ctx)
        if export is None:
            return
        if not isinstance(export, list):
            # Valid JSON, but not the list-of-repositories shape that the
            # export command produces.
            await ctx.send(error("Please provide a valid JSON export file."))
            return

        downloader = await self._get_downloader(ctx)
        if downloader is None:
            return

        # Success (``*_s``) and error (``*_e``) messages for the final report.
        repo_s = []
        uninstall_s = []
        install_s = []
        repo_e = []
        uninstall_e = []
        install_e = []

        async with ctx.typing():
            for repo in export:
                parsed_repo = self._parse_repo_entry(repo)
                if parsed_repo is None:
                    repo_e.append(
                        "Skipped a malformed repository entry in the export file."
                    )
                    self.logger.warning(
                        "Skipped a malformed repository entry: %r", repo
                    )
                    continue
                name, url, branch, cogs = parsed_repo

                # Most of this code is from the Downloader cog.
                if "PyLav/Red-Cogs" in url:
                    repo_e.append("PyLav cogs are not supported.")
                    continue

                name_error = self._validate_repo_name(name)
                if name_error is not None:
                    repo_e.append(name_error)
                    continue

                repository = await self._add_repository(
                    downloader, name, url, branch, repo_s, repo_e
                )
                if repository is None:
                    continue

                entries = []
                for raw_cog in cogs:
                    parsed_cog = self._parse_cog_entry(raw_cog)
                    if parsed_cog is None:
                        install_e.append(
                            f"Skipped a malformed cog entry in repository {name}."
                        )
                        self.logger.warning(
                            "Skipped a malformed cog entry in repository %s: %r",
                            name,
                            raw_cog,
                        )
                        continue
                    # Never uninstall/reinstall this cog while it is running.
                    # If you're forking this cog, make sure to change these strings!
                    if parsed_cog[0] == "backup" and "SeaswimmerTheFsh/SeaCogs" in url:
                        continue
                    entries.append(parsed_cog)

                await self._uninstall_cogs(
                    ctx,
                    downloader,
                    [cog_name for cog_name, _, _ in entries],
                    uninstall_s,
                    uninstall_e,
                )

                for cog_name, cog_pinned, commit in entries:
                    await self._install_cog(
                        downloader,
                        repository,
                        cog_name,
                        cog_pinned,
                        commit,
                        install_s,
                        install_e,
                    )

        await ctx.send(
            "Import complete!",
            file=text_to_file(
                f"Repositories:\n{repo_s}\n\nRepository Errors:\n{repo_e}\n\nUninstalled Cogs:\n{uninstall_s}\n\nUninstalled Cogs Errors:\n{uninstall_e}\n\nInstalled Cogs:\n{install_s}\n\nInstalled Cogs Errors:\n{install_e}",
                "backup.log",
            ),
        )