misc(repository): black reformat

This commit is contained in:
Seaswimmer 2024-02-02 11:22:08 -05:00
parent 519e3056ab
commit fa3b353704
Signed by untrusted user: cswimr
GPG key ID: B8953EC01E5C4063
5 changed files with 186 additions and 86 deletions

View file

@ -2,6 +2,7 @@ from redbot.core import Config
config: Config = Config.get_conf(None, identifier=481923957134912, cog_name="Aurora") config: Config = Config.get_conf(None, identifier=481923957134912, cog_name="Aurora")
def register_config(config_obj: Config): def register_config(config_obj: Config):
config_obj.register_guild( config_obj.register_guild(
show_moderator=True, show_moderator=True,
@ -16,12 +17,12 @@ def register_config(config_obj: Config):
history_pagesize=5, history_pagesize=5,
history_inline_pagesize=6, history_inline_pagesize=6,
auto_evidenceformat=False, auto_evidenceformat=False,
addrole_whitelist = [] addrole_whitelist=[],
) )
config_obj.register_user( config_obj.register_user(
history_ephemeral=None, history_ephemeral=None,
history_inline=None, history_inline=None,
history_pagesize=None, history_pagesize=None,
history_inline_pagesize=None, history_inline_pagesize=None,
auto_evidenceformat = None auto_evidenceformat=None,
) )

View file

@ -1,21 +1,23 @@
# pylint: disable=cyclic-import # pylint: disable=cyclic-import
import json import json
import time
import sqlite3 import sqlite3
import time
from datetime import datetime, timedelta from datetime import datetime, timedelta
from discord import Guild from discord import Guild
from redbot.core import data_manager from redbot.core import data_manager
from .logger import logger from .logger import logger
from .utils import generate_dict, get_next_case_number, convert_timedelta_to_str from .utils import convert_timedelta_to_str, generate_dict, get_next_case_number
def connect() -> sqlite3.Connection: def connect() -> sqlite3.Connection:
"""Connects to the SQLite database, and returns a connection object.""" """Connects to the SQLite database, and returns a connection object."""
try: try:
connection = sqlite3.connect(database=data_manager.cog_data_path(raw_name='Aurora') / 'aurora.db') connection = sqlite3.connect(
database=data_manager.cog_data_path(raw_name="Aurora") / "aurora.db"
)
return connection return connection
except sqlite3.OperationalError as e: except sqlite3.OperationalError as e:

View file

@ -4,7 +4,7 @@ import json
from datetime import timedelta as td from datetime import timedelta as td
from typing import Union from typing import Union
from discord import Guild, Interaction, Member, User, SelectOption from discord import Guild, Interaction, Member, SelectOption, User
from discord.errors import Forbidden, NotFound from discord.errors import Forbidden, NotFound
from redbot.core import commands from redbot.core import commands
from redbot.core.utils.chat_formatting import error from redbot.core.utils.chat_formatting import error
@ -46,7 +46,9 @@ async def check_moddable(
"""Checks if a moderator can moderate a target.""" """Checks if a moderator can moderate a target."""
if check_permissions(interaction.client.user, permissions, guild=interaction.guild): if check_permissions(interaction.client.user, permissions, guild=interaction.guild):
await interaction.response.send_message( await interaction.response.send_message(
error(f"I do not have the `{permissions}` permission, required for this action."), error(
f"I do not have the `{permissions}` permission, required for this action."
),
ephemeral=True, ephemeral=True,
) )
return False return False
@ -54,7 +56,9 @@ async def check_moddable(
if await config.guild(interaction.guild).use_discord_permissions() is True: if await config.guild(interaction.guild).use_discord_permissions() is True:
if check_permissions(interaction.user, permissions, guild=interaction.guild): if check_permissions(interaction.user, permissions, guild=interaction.guild):
await interaction.response.send_message( await interaction.response.send_message(
error(f"You do not have the `{permissions}` permission, required for this action."), error(
f"You do not have the `{permissions}` permission, required for this action."
),
ephemeral=True, ephemeral=True,
) )
return False return False
@ -74,7 +78,9 @@ async def check_moddable(
if isinstance(target, Member): if isinstance(target, Member):
if interaction.user.top_role <= target.top_role: if interaction.user.top_role <= target.top_role:
await interaction.response.send_message( await interaction.response.send_message(
content=error("You cannot moderate members with a higher role than you!"), content=error(
"You cannot moderate members with a higher role than you!"
),
ephemeral=True, ephemeral=True,
) )
return False return False
@ -84,7 +90,9 @@ async def check_moddable(
<= target.top_role <= target.top_role
): ):
await interaction.response.send_message( await interaction.response.send_message(
content=error("You cannot moderate members with a role higher than the bot!"), content=error(
"You cannot moderate members with a role higher than the bot!"
),
ephemeral=True, ephemeral=True,
) )
return False return False
@ -133,7 +141,7 @@ def generate_dict(result):
"resolve_reason": result[12], "resolve_reason": result[12],
"expired": result[13], "expired": result[13],
"changes": json.loads(result[14]), "changes": json.loads(result[14]),
"metadata": json.loads(result[15]) "metadata": json.loads(result[15]),
} }
return case return case
@ -172,7 +180,11 @@ async def fetch_channel_dict(interaction: Interaction, channel_id: str):
if not channel: if not channel:
channel = await interaction.guild.fetch_channel(channel_id) channel = await interaction.guild.fetch_channel(channel_id)
channel_dict = {"id": channel.id, "name": channel.name, "mention": channel.mention} channel_dict = {
"id": channel.id,
"name": channel.name,
"mention": channel.mention,
}
except NotFound: except NotFound:
channel_dict = {"id": channel_id, "name": "Deleted Channel", "mention": None} channel_dict = {"id": channel_id, "name": "Deleted Channel", "mention": None}
@ -202,25 +214,31 @@ async def log(interaction: Interaction, moderation_id: int, resolved: bool = Fal
case = await fetch_case(moderation_id, interaction.guild.id) case = await fetch_case(moderation_id, interaction.guild.id)
if case: if case:
embed = await log_factory(interaction=interaction, case_dict=case, resolved=resolved) embed = await log_factory(
interaction=interaction, case_dict=case, resolved=resolved
)
try: try:
await logging_channel.send(embed=embed) await logging_channel.send(embed=embed)
except Forbidden: except Forbidden:
return return
async def send_evidenceformat(interaction: Interaction, case_dict: dict): async def send_evidenceformat(interaction: Interaction, case_dict: dict):
"""This function sends an ephemeral message to the moderator who took the moderation action, with a pre-made codeblock for use in the mod-evidence channel.""" """This function sends an ephemeral message to the moderator who took the moderation action, with a pre-made codeblock for use in the mod-evidence channel."""
from .factory import evidenceformat_factory from .factory import evidenceformat_factory
send_evidence_bool = (await config.user(interaction.user).auto_evidenceformat() send_evidence_bool = (
await config.user(interaction.user).auto_evidenceformat()
or await config.guild(interaction.guild).auto_evidenceformat() or await config.guild(interaction.guild).auto_evidenceformat()
or False) or False
)
if send_evidence_bool is False: if send_evidence_bool is False:
return return
content = await evidenceformat_factory(interaction=interaction, case_dict=case_dict) content = await evidenceformat_factory(interaction=interaction, case_dict=case_dict)
await interaction.followup.send(content=content, ephemeral=True) await interaction.followup.send(content=content, ephemeral=True)
def convert_timedelta_to_str(timedelta: td) -> str: def convert_timedelta_to_str(timedelta: td) -> str:
"""This function converts a timedelta object to a string.""" """This function converts a timedelta object to a string."""
total_seconds = int(timedelta.total_seconds()) total_seconds = int(timedelta.total_seconds())
@ -229,6 +247,7 @@ def convert_timedelta_to_str(timedelta: td) -> str:
seconds = total_seconds % 60 seconds = total_seconds % 60
return f"{hours}:{minutes}:{seconds}" return f"{hours}:{minutes}:{seconds}"
def get_bool_emoji(value: bool) -> str: def get_bool_emoji(value: bool) -> str:
"""Returns a unicode emoji based on a boolean value.""" """Returns a unicode emoji based on a boolean value."""
if value is True: if value is True:
@ -237,12 +256,14 @@ def get_bool_emoji(value: bool) -> str:
return "\N{NO ENTRY SIGN}" return "\N{NO ENTRY SIGN}"
return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}" return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}"
def get_pagesize_str(value: Union[int, None]) -> str: def get_pagesize_str(value: Union[int, None]) -> str:
"""Returns a string based on a pagesize value.""" """Returns a string based on a pagesize value."""
if value is None: if value is None:
return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}" return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}"
return str(value) + " cases per page" return str(value) + " cases per page"
def create_pagesize_options() -> list[SelectOption]: def create_pagesize_options() -> list[SelectOption]:
"""Returns a list of SelectOptions for pagesize configuration.""" """Returns a list of SelectOptions for pagesize configuration."""
options = [] options = []

View file

@ -6,16 +6,17 @@
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_| # |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
import contextlib import contextlib
import logging
import json import json
import logging
import re import re
from redbot.core import commands
from redbot.core.bot import Red
from redbot.cogs.downloader import errors from redbot.cogs.downloader import errors
from redbot.cogs.downloader.converters import InstalledCog from redbot.cogs.downloader.converters import InstalledCog
from redbot.core import commands
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import error, text_to_file from redbot.core.utils.chat_formatting import error, text_to_file
class Backup(commands.Cog): class Backup(commands.Cog):
"""A utility to make reinstalling repositories and cogs after migrating the bot far easier.""" """A utility to make reinstalling repositories and cogs after migrating the bot far easier."""
@ -32,16 +33,22 @@ class Backup(commands.Cog):
async def backup(self, ctx: commands.Context): async def backup(self, ctx: commands.Context):
"""Backup your installed cogs.""" """Backup your installed cogs."""
@backup.command(name='export') @backup.command(name="export")
@commands.is_owner() @commands.is_owner()
async def backup_export(self, ctx: commands.Context): async def backup_export(self, ctx: commands.Context):
"""Export your installed repositories and cogs to a file.""" """Export your installed repositories and cogs to a file."""
downloader = ctx.bot.get_cog("Downloader") downloader = ctx.bot.get_cog("Downloader")
if downloader is None: if downloader is None:
await ctx.send(error(f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again.")) await ctx.send(
error(
f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."
)
)
return return
all_repos = list(downloader._repo_manager.repos) # pylint: disable=protected-access all_repos = list(
downloader._repo_manager.repos
) # pylint: disable=protected-access
export_data = [] export_data = []
@ -50,7 +57,7 @@ class Backup(commands.Cog):
"name": repo.name, "name": repo.name,
"url": repo.url, "url": repo.url,
"branch": repo.branch, "branch": repo.branch,
"cogs": [] "cogs": [],
} }
cogs = await downloader.installed_cogs() cogs = await downloader.installed_cogs()
@ -63,15 +70,17 @@ class Backup(commands.Cog):
# this functionality was planned but never implemented due to Red limitations # this functionality was planned but never implemented due to Red limitations
# and the possibility of restoration functionality being added to Core # and the possibility of restoration functionality being added to Core
"pinned": cog.pinned, "pinned": cog.pinned,
"commit": cog.commit "commit": cog.commit,
} }
repo_dict["cogs"].append(cog_dict) repo_dict["cogs"].append(cog_dict)
export_data.append(repo_dict) export_data.append(repo_dict)
await ctx.send(file=text_to_file(json.dumps(export_data, indent=4), 'backup.json')) await ctx.send(
file=text_to_file(json.dumps(export_data, indent=4), "backup.json")
)
@backup.command(name='import') @backup.command(name="import")
@commands.is_owner() @commands.is_owner()
async def backup_import(self, ctx: commands.Context): async def backup_import(self, ctx: commands.Context):
"""Import your installed repositories and cogs from an export file.""" """Import your installed repositories and cogs from an export file."""
@ -83,7 +92,11 @@ class Backup(commands.Cog):
downloader = ctx.bot.get_cog("Downloader") downloader = ctx.bot.get_cog("Downloader")
if downloader is None: if downloader is None:
await ctx.send(error(f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again.")) await ctx.send(
error(
f"You do not have the `Downloader` cog loaded. Please run `{ctx.prefix}load downloader` and try again."
)
)
return return
repo_s = [] repo_s = []
@ -96,29 +109,41 @@ class Backup(commands.Cog):
async with ctx.typing(): async with ctx.typing():
for repo in export: for repo in export:
# Most of this code is from the Downloader cog. # Most of this code is from the Downloader cog.
name = repo['name'] name = repo["name"]
branch = repo['branch'] branch = repo["branch"]
url = repo['url'] url = repo["url"]
cogs = repo['cogs'] cogs = repo["cogs"]
if 'PyLav/Red-Cogs' in url: if "PyLav/Red-Cogs" in url:
repo_e.append("PyLav cogs are not supported.") repo_e.append("PyLav cogs are not supported.")
continue continue
if name.startswith('.') or name.endswith('.'): if name.startswith(".") or name.endswith("."):
repo_e.append(f"Invalid repository name: {name}\nRepository names cannot start or end with a dot.") repo_e.append(
f"Invalid repository name: {name}\nRepository names cannot start or end with a dot."
)
continue continue
if re.match(r"^[a-zA-Z0-9_\-\.]+$", name) is None: if re.match(r"^[a-zA-Z0-9_\-\.]+$", name) is None:
repo_e.append(f"Invalid repository name: {name}\nRepository names may only contain letters, numbers, underscores, hyphens, and dots.") repo_e.append(
f"Invalid repository name: {name}\nRepository names may only contain letters, numbers, underscores, hyphens, and dots."
)
continue continue
try: try:
repository = await downloader._repo_manager.add_repo(url, name, branch) # pylint: disable=protected-access repository = await downloader._repo_manager.add_repo(
repo_s.append(f"Added repository {name} from {url} on branch {branch}.") url, name, branch
self.logger.debug("Added repository %s from %s on branch %s", name, url, branch) ) # pylint: disable=protected-access
repo_s.append(
f"Added repository {name} from {url} on branch {branch}."
)
self.logger.debug(
"Added repository %s from %s on branch %s", name, url, branch
)
except errors.ExistingGitRepo: except errors.ExistingGitRepo:
repo_e.append(f"Repository {name} already exists.") repo_e.append(f"Repository {name} already exists.")
repository = downloader._repo_manager.get_repo(name) # pylint: disable=protected-access repository = downloader._repo_manager.get_repo(
name
) # pylint: disable=protected-access
self.logger.debug("Repository %s already exists", name) self.logger.debug("Repository %s already exists", name)
# This is commented out because errors.AuthenticationError is not yet implemented in Red 3.5.5's Downloader cog. # This is commented out because errors.AuthenticationError is not yet implemented in Red 3.5.5's Downloader cog.
@ -134,7 +159,9 @@ class Backup(commands.Cog):
# continue # continue
except errors.CloningError as err: except errors.CloningError as err:
repo_e.append(f"Cloning error while adding repository {name}. See logs for more information.") repo_e.append(
f"Cloning error while adding repository {name}. See logs for more information."
)
self.logger.exception( self.logger.exception(
"Something went wrong whilst cloning %s (to revision %s)", "Something went wrong whilst cloning %s (to revision %s)",
url, url,
@ -144,21 +171,23 @@ class Backup(commands.Cog):
continue continue
except OSError: except OSError:
repo_e.append(f"OS error while adding repository {name}. See logs for more information.") repo_e.append(
f"OS error while adding repository {name}. See logs for more information."
)
self.logger.exception( self.logger.exception(
"Something went wrong trying to add repo %s under name %s", "Something went wrong trying to add repo %s under name %s",
url, url,
name name,
) )
continue continue
cog_modules = [] cog_modules = []
for cog in cogs: for cog in cogs:
# If you're forking this cog, make sure to change these strings! # If you're forking this cog, make sure to change these strings!
if cog['name'] == "backup" and 'SeaswimmerTheFsh/SeaCogs' in url: if cog["name"] == "backup" and "SeaswimmerTheFsh/SeaCogs" in url:
continue continue
try: try:
cog_module = await InstalledCog.convert(ctx, cog['name']) cog_module = await InstalledCog.convert(ctx, cog["name"])
except commands.BadArgument: except commands.BadArgument:
uninstall_e.append(f"Failed to uninstall {cog['name']}") uninstall_e.append(f"Failed to uninstall {cog['name']}")
continue continue
@ -170,42 +199,67 @@ class Backup(commands.Cog):
with contextlib.suppress(commands.ExtensionNotLoaded): with contextlib.suppress(commands.ExtensionNotLoaded):
await ctx.bot.unload_extension(cog) await ctx.bot.unload_extension(cog)
await ctx.bot.remove_loaded_package(cog) await ctx.bot.remove_loaded_package(cog)
await downloader._delete_cog(poss_installed_path) # pylint: disable=protected-access await downloader._delete_cog(
poss_installed_path
) # pylint: disable=protected-access
uninstall_s.append(f"Uninstalled {cog}") uninstall_s.append(f"Uninstalled {cog}")
self.logger.debug("Uninstalled %s", cog) self.logger.debug("Uninstalled %s", cog)
else: else:
uninstall_e.append(f"Failed to uninstall {cog}") uninstall_e.append(f"Failed to uninstall {cog}")
self.logger.warning("Failed to uninstall %s", cog) self.logger.warning("Failed to uninstall %s", cog)
await downloader._remove_from_installed(cog_modules) # pylint: disable=protected-access await downloader._remove_from_installed(
cog_modules
) # pylint: disable=protected-access
for cog in cogs: for cog in cogs:
cog_name = cog['name'] cog_name = cog["name"]
cog_pinned = cog['pinned'] cog_pinned = cog["pinned"]
if cog_pinned: if cog_pinned:
commit = cog['commit'] commit = cog["commit"]
else: else:
commit = None commit = None
# If you're forking this cog, make sure to change these strings! # If you're forking this cog, make sure to change these strings!
if cog_name == 'backup' and 'SeaswimmerTheFsh/SeaCogs' in url: if cog_name == "backup" and "SeaswimmerTheFsh/SeaCogs" in url:
continue continue
async with repository.checkout(commit, exit_to_rev=repository.branch): async with repository.checkout(
cogs_c, message = await downloader._filter_incorrect_cogs_by_names(repository, [cog_name]) # pylint: disable=protected-access commit, exit_to_rev=repository.branch
):
cogs_c, message = (
await downloader._filter_incorrect_cogs_by_names(
repository, [cog_name]
)
) # pylint: disable=protected-access
if not cogs_c: if not cogs_c:
install_e.append(message) install_e.append(message)
self.logger.error(message) self.logger.error(message)
continue continue
failed_reqs = await downloader._install_requirements(cogs_c) # pylint: disable=protected-access failed_reqs = await downloader._install_requirements(
cogs_c
) # pylint: disable=protected-access
if failed_reqs: if failed_reqs:
install_e.append(f"Failed to install {cog_name} due to missing requirements: {failed_reqs}") install_e.append(
self.logger.error("Failed to install %s due to missing requirements: %s", cog_name, failed_reqs) f"Failed to install {cog_name} due to missing requirements: {failed_reqs}"
)
self.logger.error(
"Failed to install %s due to missing requirements: %s",
cog_name,
failed_reqs,
)
continue continue
installed_cogs, failed_cogs = await downloader._install_cogs(cogs_c) # pylint: disable=protected-access installed_cogs, failed_cogs = await downloader._install_cogs(
cogs_c
) # pylint: disable=protected-access
if repository.available_libraries: if repository.available_libraries:
installed_libs, failed_libs = await repository.install_libraries(target_dir=downloader.SHAREDLIB_PATH, req_target_dir=downloader.LIB_PATH) installed_libs, failed_libs = (
await repository.install_libraries(
target_dir=downloader.SHAREDLIB_PATH,
req_target_dir=downloader.LIB_PATH,
)
)
else: else:
installed_libs = None installed_libs = None
failed_libs = None failed_libs = None
@ -214,21 +268,41 @@ class Backup(commands.Cog):
for cog in installed_cogs: for cog in installed_cogs:
cog.pinned = True cog.pinned = True
await downloader._save_to_installed(installed_cogs + installed_libs if installed_libs else installed_cogs) # pylint: disable=protected-access await downloader._save_to_installed(
installed_cogs + installed_libs
if installed_libs
else installed_cogs
) # pylint: disable=protected-access
if installed_cogs: if installed_cogs:
installed_cog_name = installed_cogs[0].name installed_cog_name = installed_cogs[0].name
install_s.append(f"Installed {installed_cog_name}") install_s.append(f"Installed {installed_cog_name}")
self.logger.debug("Installed %s", installed_cog_name) self.logger.debug("Installed %s", installed_cog_name)
if installed_libs: if installed_libs:
for lib in installed_libs: for lib in installed_libs:
install_s.append(f"Installed {lib.name} required for {cog_name}") install_s.append(
self.logger.debug("Installed %s required for %s", lib.name, cog_name) f"Installed {lib.name} required for {cog_name}"
)
self.logger.debug(
"Installed %s required for %s", lib.name, cog_name
)
if failed_cogs: if failed_cogs:
failed_cog_name = failed_cogs[0].name failed_cog_name = failed_cogs[0].name
install_e.append(f"Failed to install {failed_cog_name}") install_e.append(f"Failed to install {failed_cog_name}")
self.logger.error("Failed to install %s", failed_cog_name) self.logger.error("Failed to install %s", failed_cog_name)
if failed_libs: if failed_libs:
for lib in failed_libs: for lib in failed_libs:
install_e.append(f"Failed to install {lib.name} required for {cog_name}") install_e.append(
self.logger.error("Failed to install %s required for %s", lib.name, cog_name) f"Failed to install {lib.name} required for {cog_name}"
await ctx.send("Import complete!", file=text_to_file(f"Repositories:\n{repo_s}\n\nRepository Errors:\n{repo_e}\n\nUninstalled Cogs:\n{uninstall_s}\n\nUninstalled Cogs Errors:\n{uninstall_e}\n\nInstalled Cogs:\n{install_s}\n\nInstalled Cogs Errors:\n{install_e}", 'backup.log')) )
self.logger.error(
"Failed to install %s required for %s",
lib.name,
cog_name,
)
await ctx.send(
"Import complete!",
file=text_to_file(
f"Repositories:\n{repo_s}\n\nRepository Errors:\n{repo_e}\n\nUninstalled Cogs:\n{uninstall_s}\n\nUninstalled Cogs Errors:\n{uninstall_e}\n\nInstalled Cogs:\n{install_s}\n\nInstalled Cogs Errors:\n{install_e}",
"backup.log",
),
)

View file

@ -24,7 +24,9 @@ class Nerdify(commands.Cog):
self.bot = bot self.bot = bot
@commands.command(aliases=["nerd"]) @commands.command(aliases=["nerd"])
async def nerdify(self, ctx: commands.Context, *, text: Optional[str] = None) -> None: async def nerdify(
self, ctx: commands.Context, *, text: Optional[str] = None
) -> None:
"""Nerdify the replied to message, previous message, or your own text.""" """Nerdify the replied to message, previous message, or your own text."""
if not text: if not text:
if hasattr(ctx.message, "reference") and ctx.message.reference: if hasattr(ctx.message, "reference") and ctx.message.reference:
@ -57,7 +59,7 @@ class Nerdify(commands.Cog):
Returns: Returns:
The converted text.""" The converted text."""
return f"\"{text}\" 🤓" return f'"{text}" 🤓'
async def type_message( async def type_message(
self, destination: discord.abc.Messageable, content: str, **kwargs: Any self, destination: discord.abc.Messageable, content: str, **kwargs: Any