From 519e3056ab58b15e25b23f61865a31409f582f17 Mon Sep 17 00:00:00 2001 From: SeaswimmerTheFsh Date: Fri, 2 Feb 2024 11:21:56 -0500 Subject: [PATCH] feat(aurora): added addrole command --- aurora/aurora.py | 1076 +++++++++++++++++++++++++++-------- aurora/utilities/factory.py | 461 ++++++++++----- 2 files changed, 1167 insertions(+), 370 deletions(-) diff --git a/aurora/aurora.py b/aurora/aurora.py index 2cba6bc..474a55a 100644 --- a/aurora/aurora.py +++ b/aurora/aurora.py @@ -7,8 +7,8 @@ import json import os -import time import sqlite3 +import time from datetime import datetime, timedelta, timezone from math import ceil @@ -29,9 +29,27 @@ from aurora.menus.immune import Immune from aurora.menus.overrides import Overrides from aurora.utilities.config import config, register_config from aurora.utilities.database import connect, create_guild_table, fetch_case, mysql_log -from aurora.utilities.factory import case_factory, changes_factory, evidenceformat_factory, message_factory, overrides_embed, immune_embed, guild_embed, addrole_embed +from aurora.utilities.factory import ( + addrole_embed, + case_factory, + changes_factory, + evidenceformat_factory, + guild_embed, + immune_embed, + message_factory, + overrides_embed, +) from aurora.utilities.logger import logger -from aurora.utilities.utils import convert_timedelta_to_str, check_moddable, check_permissions, fetch_channel_dict, fetch_user_dict, generate_dict, log, send_evidenceformat +from aurora.utilities.utils import ( + check_moddable, + check_permissions, + convert_timedelta_to_str, + fetch_channel_dict, + fetch_user_dict, + generate_dict, + log, + send_evidenceformat, +) class Aurora(commands.Cog): @@ -68,14 +86,16 @@ class Aurora(commands.Cog): if requester == "user_strict": await config.user_from_id(user_id).clear() else: - logger.warning("Invalid requester passed to red_delete_data_for_user: %s", requester) + logger.warning( + "Invalid requester passed to red_delete_data_for_user: %s", requester + ) def __init__(self, bot: Red): super().__init__() self.bot = bot register_config(config) disable_dateutil() - self.handle_expiry.start() # pylint: disable=no-member + self.handle_expiry.start() # pylint: disable=no-member async def cog_load(self): """This method prepares the database schema for all of the guilds the bot is currently in.""" @@ -90,9 +110,9 @@ class Aurora(commands.Cog): return async def cog_unload(self): - self.handle_expiry.cancel() # pylint: disable=no-member + self.handle_expiry.cancel() # pylint: disable=no-member - @commands.Cog.listener('on_guild_join') + @commands.Cog.listener("on_guild_join") async def db_generate_guild_join(self, guild: discord.Guild): """This method prepares the database schema whenever the bot joins a guild.""" if not await self.bot.cog_disabled_in_guild(self, guild): @@ -101,7 +121,7 @@ class Aurora(commands.Cog): except ConnectionRefusedError: return - @commands.Cog.listener('on_audit_log_entry_create') + @commands.Cog.listener("on_audit_log_entry_create") async def autologger(self, entry: discord.AuditLogEntry): """This method automatically logs moderations done by users manually ("right clicks").""" if not await self.bot.cog_disabled_in_guild(self, entry.guild): @@ -121,34 +141,53 @@ class Aurora(commands.Cog): reason = "This action was performed without the bot." if entry.action == discord.AuditLogAction.kick: - moderation_type = 'KICK' + moderation_type = "KICK" elif entry.action == discord.AuditLogAction.ban: - moderation_type = 'BAN' + moderation_type = "BAN" elif entry.action == discord.AuditLogAction.unban: - moderation_type = 'UNBAN' + moderation_type = "UNBAN" elif entry.action == discord.AuditLogAction.member_update: if entry.after.timed_out_until is not None: - timed_out_until_aware = entry.after.timed_out_until.replace(tzinfo=timezone.utc) - duration_datetime = timed_out_until_aware - datetime.now(tz=timezone.utc) + timed_out_until_aware = entry.after.timed_out_until.replace( + tzinfo=timezone.utc + ) + duration_datetime = timed_out_until_aware - datetime.now( + tz=timezone.utc + ) minutes = round(duration_datetime.total_seconds() / 60) duration = timedelta(minutes=minutes) - moderation_type = 'MUTE' + moderation_type = "MUTE" else: - moderation_type = 'UNMUTE' + moderation_type = "UNMUTE" else: return - await mysql_log(entry.guild.id, entry.user.id, moderation_type, 'USER', entry.target.id, 0, duration, reason) + await mysql_log( + entry.guild.id, + entry.user.id, + moderation_type, + "USER", + entry.target.id, + 0, + duration, + reason, + ) ####################################################################################################################### ### COMMANDS ####################################################################################################################### @app_commands.command(name="note") - async def note(self, interaction: discord.Interaction, target: discord.User, reason: str, silent: bool = None): + async def note( + self, + interaction: discord.Interaction, + target: discord.User, + reason: str, + silent: bool = None, + ): """Add a note to a user. Parameters @@ -159,29 +198,55 @@ class Aurora(commands.Cog): Why are you noting this user? silent: bool Should the user be messaged?""" - if not await check_moddable(target, interaction, ['moderate_members']): + if not await check_moddable(target, interaction, ["moderate_members"]): return - await interaction.response.send_message(content=f"{target.mention} has recieved a note!\n**Reason** - `{reason}`") + await interaction.response.send_message( + content=f"{target.mention} has recieved a note!\n**Reason** - `{reason}`" + ) if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: - embed = await message_factory(await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type='note', response=await interaction.original_response()) + embed = await message_factory( + await self.bot.get_embed_color(interaction.channel), + guild=interaction.guild, + moderator=interaction.user, + reason=reason, + moderation_type="note", + response=await interaction.original_response(), + ) await target.send(embed=embed) except discord.errors.HTTPException: pass - moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', 'USER', target.id, 0, 'NULL', reason) - await interaction.edit_original_response(content=f"{target.mention} has received a note! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") + moderation_id = await mysql_log( + interaction.guild.id, + interaction.user.id, + "NOTE", + "USER", + target.id, + 0, + "NULL", + reason, + ) + await interaction.edit_original_response( + content=f"{target.mention} has received a note! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" + ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) @app_commands.command(name="warn") - async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None): + async def warn( + self, + interaction: discord.Interaction, + target: discord.Member, + reason: str, + silent: bool = None, + ): """Warn a user. Parameters @@ -192,89 +257,172 @@ class Aurora(commands.Cog): Why are you warning this user? silent: bool Should the user be messaged?""" - if not await check_moddable(target, interaction, ['moderate_members']): + if not await check_moddable(target, interaction, ["moderate_members"]): return - await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`") + await interaction.response.send_message( + content=f"{target.mention} has been warned!\n**Reason** - `{reason}`" + ) if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: - embed = await message_factory(await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type='warned', response=await interaction.original_response()) + embed = await message_factory( + await self.bot.get_embed_color(interaction.channel), + guild=interaction.guild, + moderator=interaction.user, + reason=reason, + moderation_type="warned", + response=await interaction.original_response(), + ) await target.send(embed=embed) except discord.errors.HTTPException: pass - moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'WARN', 'USER', target.id, 0, 'NULL', reason) - await interaction.edit_original_response(content=f"{target.mention} has been warned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") + moderation_id = await mysql_log( + interaction.guild.id, + interaction.user.id, + "WARN", + "USER", + target.id, + 0, + "NULL", + reason, + ) + await interaction.edit_original_response( + content=f"{target.mention} has been warned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" + ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) - @app_commands.command(name="blacklist") - async def blacklist(self, interaction: discord.Interaction, target: discord.Member, role: str, reason: str, duration: str = None, silent: bool = None): - """Add a blacklist role to a user. + @app_commands.command(name="addrole") + async def addrole( + self, + interaction: discord.Interaction, + target: discord.Member, + role: str, + reason: str, + duration: str = None, + silent: bool = None, + ): + """Add a role to a user. Parameters ----------- target: discord.Member - Who are you blacklisting? + Who are you adding a role to? role: str - What blacklist role are you applying to the target? + What role are you adding to the target? reason: str - Why are you blacklisting this user? + Why are you adding a role to this user? duration: str - How long are you blacklisting this user for? + How long are you adding this role for? silent: bool Should the user be messaged?""" - blacklist_roles = await config.guild(interaction.guild).blacklist_roles() + addrole_whitelist = await config.guild(interaction.guild).addrole_whitelist() - if not blacklist_roles: - await interaction.response.send_message(content=error("There are no blacklist roles set for this server!"), ephemeral=True) + if not addrole_whitelist: + await interaction.response.send_message( + content=error("There are no whitelisted roles set for this server!"), + ephemeral=True, + ) return + if duration is not None: + try: + parsed_time = parse( + sval=duration, as_timedelta=True, raise_exception=True + ) + except ValueError: + await interaction.response.send_message( + error("Please provide a valid duration!"), ephemeral=True + ) + return + else: + parsed_time = "NULL" + matching_role = None - for role_dict in blacklist_roles: - if role_dict['id'] == role: - matching_role = role_dict + for role_id in addrole_whitelist: + if role_id == role: + matching_role = role_id break if not matching_role: - await interaction.response.send_message(content=error("Please provide a valid blacklist role!"), ephemeral=True) + await interaction.response.send_message( + content=error("That role isn't whitelisted!"), ephemeral=True + ) return - if not await check_moddable(target, interaction, ['moderate_members', 'manage_roles']): + if not await check_moddable( + target, interaction, ["moderate_members", "manage_roles"] + ): return if role in [role.id for role in target.roles]: - await interaction.response.send_message(content=error(f"{target.mention} already has the blacklist role!"), ephemeral=True) + await interaction.response.send_message( + content=error(f"{target.mention} already has this role!"), + ephemeral=True, + ) return + role_obj = interaction.guild.get_role(role) + if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: - embed = await message_factory(await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type='blacklisted', response=await interaction.original_response(), duration=matching_role['duration']) + embed = await message_factory( + await self.bot.get_embed_color(interaction.channel), + guild=interaction.guild, + moderator=interaction.user, + reason=reason, + moderation_type=f"`{role_obj.name}` role", + response=await interaction.original_response(), + duration=parsed_time, + ) await target.send(embed=embed) except discord.errors.HTTPException: pass - role_obj = interaction.guild.get_role(role) - await target.add_roles(role, reason=f"Blacklisted by {interaction.user.id} for {humanize.precisedelta(duration)} for: {reason}") - await interaction.response.send_message(content=f"{target.mention} has been blacklisted with the {role_obj.name} role for {humanize.precisedelta(matching_role['duration'])}!\n**Reason** - `{reason}`") + await target.add_roles( + role, + reason=f"Role added by {interaction.user.id}{(' for ' + {humanize.precisedelta(parsed_time)} if parsed_time != 'NULL' else '')} for: {reason}", + ) + await interaction.response.send_message( + content=f"{target.mention} has been given the {role_obj.name} role{(' for ' + {humanize.precisedelta(parsed_time)} if parsed_time != 'NULL' else '')}!\n**Reason** - `{reason}`" + ) - moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'BLACKLIST', 'USER', target.id, role, duration, reason) - await interaction.edit_original_response(content=f"{target.mention} has been blacklisted with the {role_obj.name} role for {humanize.precisedelta(matching_role['duration'])}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") + moderation_id = await mysql_log( + interaction.guild.id, + interaction.user.id, + "ADDROLE", + "USER", + target.id, + role, + duration, + reason, + ) + await interaction.edit_original_response( + content=f"{target.mention} has been given the {role_obj.name} role{(' for ' + {humanize.precisedelta(parsed_time)} if parsed_time != 'NULL' else '')}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" + ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) @app_commands.command(name="mute") - async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str, silent: bool = None): + async def mute( + self, + interaction: discord.Interaction, + target: discord.Member, + duration: str, + reason: str, + silent: bool = None, + ): """Mute a user. Parameters @@ -287,45 +435,82 @@ class Aurora(commands.Cog): Why are you unbanning this user? silent: bool Should the user be messaged?""" - if not await check_moddable(target, interaction, ['moderate_members']): + if not await check_moddable(target, interaction, ["moderate_members"]): return if target.is_timed_out() is True: - await interaction.response.send_message(error(f"{target.mention} is already muted!"), allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True) + await interaction.response.send_message( + error(f"{target.mention} is already muted!"), + allowed_mentions=discord.AllowedMentions(users=False), + ephemeral=True, + ) return try: parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True) except ValueError: - await interaction.response.send_message(error("Please provide a valid duration!"), ephemeral=True) + await interaction.response.send_message( + error("Please provide a valid duration!"), ephemeral=True + ) return if parsed_time.total_seconds() / 1000 > 2419200000: - await interaction.response.send_message(error("Please provide a duration that is less than 28 days.")) + await interaction.response.send_message( + error("Please provide a duration that is less than 28 days.") + ) return - await target.timeout(parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}") + await target.timeout( + parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}" + ) - await interaction.response.send_message(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`") + await interaction.response.send_message( + content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`" + ) if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: - embed = await message_factory(await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type='muted', response=await interaction.original_response(), duration=parsed_time) + embed = await message_factory( + await self.bot.get_embed_color(interaction.channel), + guild=interaction.guild, + moderator=interaction.user, + reason=reason, + moderation_type="muted", + response=await interaction.original_response(), + duration=parsed_time, + ) await target.send(embed=embed) except discord.errors.HTTPException: pass - moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', 'USER', target.id, 0, parsed_time, reason) - await interaction.edit_original_response(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") + moderation_id = await mysql_log( + interaction.guild.id, + interaction.user.id, + "MUTE", + "USER", + target.id, + 0, + parsed_time, + reason, + ) + await interaction.edit_original_response( + content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" + ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) @app_commands.command(name="unmute") - async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None, silent: bool = None): + async def unmute( + self, + interaction: discord.Interaction, + target: discord.Member, + reason: str = None, + silent: bool = None, + ): """Unmute a user. Parameters @@ -336,39 +521,71 @@ class Aurora(commands.Cog): Why are you unmuting this user? silent: bool Should the user be messaged?""" - if not await check_moddable(target, interaction, ['moderate_members']): + if not await check_moddable(target, interaction, ["moderate_members"]): return if target.is_timed_out() is False: - await interaction.response.send_message(error(f"{target.mention} is not muted!"), allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True) + await interaction.response.send_message( + error(f"{target.mention} is not muted!"), + allowed_mentions=discord.AllowedMentions(users=False), + ephemeral=True, + ) return if reason: - await target.timeout(None, reason=f"Unmuted by {interaction.user.id} for: {reason}") + await target.timeout( + None, reason=f"Unmuted by {interaction.user.id} for: {reason}" + ) else: await target.timeout(None, reason=f"Unbanned by {interaction.user.id}") reason = "No reason given." - await interaction.response.send_message(content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`") + await interaction.response.send_message( + content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`" + ) if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: - embed = await message_factory(await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type='unmuted', response=await interaction.original_response()) + embed = await message_factory( + await self.bot.get_embed_color(interaction.channel), + guild=interaction.guild, + moderator=interaction.user, + reason=reason, + moderation_type="unmuted", + response=await interaction.original_response(), + ) await target.send(embed=embed) except discord.errors.HTTPException: pass - moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', 'USER', target.id, 0, 'NULL', reason) - await interaction.edit_original_response(content=f"{target.mention} has been unmuted! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") + moderation_id = await mysql_log( + interaction.guild.id, + interaction.user.id, + "UNMUTE", + "USER", + target.id, + 0, + "NULL", + reason, + ) + await interaction.edit_original_response( + content=f"{target.mention} has been unmuted! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" + ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) @app_commands.command(name="kick") - async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None): + async def kick( + self, + interaction: discord.Interaction, + target: discord.Member, + reason: str, + silent: bool = None, + ): """Kick a user. Parameters @@ -379,39 +596,69 @@ class Aurora(commands.Cog): Why are you kicking this user? silent: bool Should the user be messaged?""" - if not await check_moddable(target, interaction, ['kick_members']): + if not await check_moddable(target, interaction, ["kick_members"]): return - await interaction.response.send_message(content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`") + await interaction.response.send_message( + content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`" + ) if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: - embed = await message_factory(await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type='kicked', response=await interaction.original_response()) + embed = await message_factory( + await self.bot.get_embed_color(interaction.channel), + guild=interaction.guild, + moderator=interaction.user, + reason=reason, + moderation_type="kicked", + response=await interaction.original_response(), + ) await target.send(embed=embed) except discord.errors.HTTPException: pass await target.kick(reason=f"Kicked by {interaction.user.id} for: {reason}") - moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'KICK', 'USER', target.id, 0, 'NULL', reason) - await interaction.edit_original_response(content=f"{target.mention} has been kicked! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") + moderation_id = await mysql_log( + interaction.guild.id, + interaction.user.id, + "KICK", + "USER", + target.id, + 0, + "NULL", + reason, + ) + await interaction.edit_original_response( + content=f"{target.mention} has been kicked! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" + ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) @app_commands.command(name="ban") - @app_commands.choices(delete_messages=[ - Choice(name="None", value=0), - Choice(name='1 Hour', value=3600), - Choice(name='12 Hours', value=43200), - Choice(name='1 Day', value=86400), - Choice(name='3 Days', value=259200), - Choice(name='7 Days', value=604800), - ]) - async def ban(self, interaction: discord.Interaction, target: discord.User, reason: str, duration: str = None, delete_messages: Choice[int] = None, silent: bool = None): + @app_commands.choices( + delete_messages=[ + Choice(name="None", value=0), + Choice(name="1 Hour", value=3600), + Choice(name="12 Hours", value=43200), + Choice(name="1 Day", value=86400), + Choice(name="3 Days", value=259200), + Choice(name="7 Days", value=604800), + ] + ) + async def ban( + self, + interaction: discord.Interaction, + target: discord.User, + reason: str, + duration: str = None, + delete_messages: Choice[int] = None, + silent: bool = None, + ): """Ban a user. Parameters @@ -426,7 +673,7 @@ class Aurora(commands.Cog): How many days of messages to delete? silent: bool Should the user be messaged?""" - if not await check_moddable(target, interaction, ['ban_members']): + if not await check_moddable(target, interaction, ["ban_members"]): return if delete_messages is None: @@ -436,57 +683,118 @@ class Aurora(commands.Cog): try: await interaction.guild.fetch_ban(target) - await interaction.response.send_message(content=error(f"{target.mention} is already banned!"), ephemeral=True) + await interaction.response.send_message( + content=error(f"{target.mention} is already banned!"), ephemeral=True + ) return except discord.errors.NotFound: pass if duration: try: - parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True) + parsed_time = parse( + sval=duration, as_timedelta=True, raise_exception=True + ) except ValueError: - await interaction.response.send_message(error("Please provide a valid duration!"), ephemeral=True) + await interaction.response.send_message( + error("Please provide a valid duration!"), ephemeral=True + ) return - await interaction.response.send_message(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`") + await interaction.response.send_message( + content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`" + ) try: - embed = await message_factory(await self.bot.get_embed_color(interaction.channel) , guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type='tempbanned', response=await interaction.original_response(), duration=parsed_time) + embed = await message_factory( + await self.bot.get_embed_color(interaction.channel), + guild=interaction.guild, + moderator=interaction.user, + reason=reason, + moderation_type="tempbanned", + response=await interaction.original_response(), + duration=parsed_time, + ) await target.send(embed=embed) except discord.errors.HTTPException: pass - await interaction.guild.ban(target, reason=f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})", delete_message_seconds=delete_messages_seconds) + await interaction.guild.ban( + target, + reason=f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})", + delete_message_seconds=delete_messages_seconds, + ) - moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'TEMPBAN', 'USER', target.id, 0, parsed_time, reason) - await interaction.edit_original_response(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") + moderation_id = await mysql_log( + interaction.guild.id, + interaction.user.id, + "TEMPBAN", + "USER", + target.id, + 0, + parsed_time, + reason, + ) + await interaction.edit_original_response( + content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id}`)\n**Reason** - `{reason}`" + ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) else: - await interaction.response.send_message(content=f"{target.mention} has been banned!\n**Reason** - `{reason}`") + await interaction.response.send_message( + content=f"{target.mention} has been banned!\n**Reason** - `{reason}`" + ) if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: - embed = embed = await message_factory(await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type='banned', response=await interaction.original_response()) + embed = embed = await message_factory( + await self.bot.get_embed_color(interaction.channel), + guild=interaction.guild, + moderator=interaction.user, + reason=reason, + moderation_type="banned", + response=await interaction.original_response(), + ) await target.send(embed=embed) except discord.errors.HTTPException: pass - await interaction.guild.ban(target, reason=f"Banned by {interaction.user.id} for: {reason}", delete_message_seconds=delete_messages_seconds) + await interaction.guild.ban( + target, + reason=f"Banned by {interaction.user.id} for: {reason}", + delete_message_seconds=delete_messages_seconds, + ) - moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'BAN', 'USER', target.id, 0, 'NULL', reason) - await interaction.edit_original_response(content=f"{target.mention} has been banned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") + moderation_id = await mysql_log( + interaction.guild.id, + interaction.user.id, + "BAN", + "USER", + target.id, + 0, + "NULL", + reason, + ) + await interaction.edit_original_response( + content=f"{target.mention} has been banned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" + ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) @app_commands.command(name="unban") - async def unban(self, interaction: discord.Interaction, target: discord.User, reason: str = None, silent: bool = None): + async def unban( + self, + interaction: discord.Interaction, + target: discord.User, + reason: str = None, + silent: bool = None, + ): """Unban a user. Parameters @@ -497,41 +805,77 @@ class Aurora(commands.Cog): Why are you unbanning this user? silent: bool Should the user be messaged?""" - if not await check_moddable(target, interaction, ['ban_members']): + if not await check_moddable(target, interaction, ["ban_members"]): return try: await interaction.guild.fetch_ban(target) except discord.errors.NotFound: - await interaction.response.send_message(content=error(f"{target.mention} is not banned!"), ephemeral=True) + await interaction.response.send_message( + content=error(f"{target.mention} is not banned!"), ephemeral=True + ) return if reason: - await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id} for: {reason}") + await interaction.guild.unban( + target, reason=f"Unbanned by {interaction.user.id} for: {reason}" + ) else: - await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id}") + await interaction.guild.unban( + target, reason=f"Unbanned by {interaction.user.id}" + ) reason = "No reason given." - await interaction.response.send_message(content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`") + await interaction.response.send_message( + content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`" + ) if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: - embed = await message_factory(await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type='unbanned', response=await interaction.original_response()) + embed = await message_factory( + await self.bot.get_embed_color(interaction.channel), + guild=interaction.guild, + moderator=interaction.user, + reason=reason, + moderation_type="unbanned", + response=await interaction.original_response(), + ) await target.send(embed=embed) except discord.errors.HTTPException: pass - moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'UNBAN', 'USER', target.id, 0, 'NULL', reason) - await interaction.edit_original_response(content=f"{target.mention} has been unbanned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") + moderation_id = await mysql_log( + interaction.guild.id, + interaction.user.id, + "UNBAN", + "USER", + target.id, + 0, + "NULL", + reason, + ) + await interaction.edit_original_response( + content=f"{target.mention} has been unbanned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" + ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) @app_commands.command(name="history") - async def history(self, interaction: discord.Interaction, target: discord.User = None, moderator: discord.User = None, pagesize: app_commands.Range[int, 1, 20] = None, page: int = 1, ephemeral: bool = None, inline: bool = None, export: bool = False): + async def history( + self, + interaction: discord.Interaction, + target: discord.User = None, + moderator: discord.User = None, + pagesize: app_commands.Range[int, 1, 20] = None, + page: int = 1, + ephemeral: bool = None, + inline: bool = None, + export: bool = False, + ): """List previous infractions. Parameters @@ -551,31 +895,45 @@ class Aurora(commands.Cog): export: bool Exports the server's entire moderation history to a JSON file""" if ephemeral is None: - ephemeral = (await config.user(interaction.user).history_ephemeral() + ephemeral = ( + await config.user(interaction.user).history_ephemeral() or await config.guild(interaction.guild).history_ephemeral() - or False) + or False + ) if inline is None: - inline = (await config.user(interaction.user).history_inline() + inline = ( + await config.user(interaction.user).history_inline() or await config.guild(interaction.guild).history_inline() - or False) + or False + ) if pagesize is None: if inline is True: - pagesize = (await config.user(interaction.user).history_inline_pagesize() + pagesize = ( + await config.user(interaction.user).history_inline_pagesize() or await config.guild(interaction.guild).history_inline_pagesize() - or 6) + or 6 + ) else: - pagesize = (await config.user(interaction.user).history_pagesize() + pagesize = ( + await config.user(interaction.user).history_pagesize() or await config.guild(interaction.guild).history_pagesize() - or 5) - + or 5 + ) await interaction.response.defer(ephemeral=ephemeral) - permissions = check_permissions(interaction.client.user, ['embed_links'], interaction) + permissions = check_permissions( + interaction.client.user, ["embed_links"], interaction + ) if permissions: - await interaction.followup.send(error(f"I do not have the `{permissions}` permission, required for this action."), ephemeral=True) + await interaction.followup.send( + error( + f"I do not have the `{permissions}` permission, required for this action." + ), + ephemeral=True, + ) return database = connect() @@ -597,16 +955,31 @@ class Aurora(commands.Cog): cases.append(case) try: - filename = str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"moderation_{interaction.guild.id}.json" + filename = ( + str(data_manager.cog_data_path(cog_instance=self)) + + str(os.sep) + + f"moderation_{interaction.guild.id}.json" + ) with open(filename, "w", encoding="utf-8") as f: json.dump(cases, f, indent=2) - await interaction.followup.send(file=discord.File(filename, f"moderation_{interaction.guild.id}.json"), ephemeral=ephemeral) + await interaction.followup.send( + file=discord.File( + filename, f"moderation_{interaction.guild.id}.json" + ), + ephemeral=ephemeral, + ) os.remove(filename) except json.JSONDecodeError as e: - await interaction.followup.send(content=error("An error occured while exporting the moderation history.\nError:\n") + box(e, 'py'), ephemeral=ephemeral) + await interaction.followup.send( + content=error( + "An error occured while exporting the moderation history.\nError:\n" + ) + + box(e, "py"), + ephemeral=ephemeral, + ) cursor.close() database.close() return @@ -636,7 +1009,7 @@ class Aurora(commands.Cog): for result in results: case_dict = generate_dict(result) - if case_dict['moderation_id'] == 0: + if case_dict["moderation_id"] == 0: continue result_dict_list.append(case_dict) @@ -646,45 +1019,74 @@ class Aurora(commands.Cog): end_index = page * pagesize embed = discord.Embed(color=await self.bot.get_embed_color(interaction.channel)) - embed.set_author(icon_url=interaction.guild.icon.url, name='Infraction History') - embed.set_footer(text=f"Page {page:,}/{page_quantity:,} | {case_quantity:,} Results") + embed.set_author(icon_url=interaction.guild.icon.url, name="Infraction History") + embed.set_footer( + text=f"Page {page:,}/{page_quantity:,} | {case_quantity:,} Results" + ) memory_dict = {} for case in result_dict_list[start_index:end_index]: - if case['target_id'] not in memory_dict: - if case['target_type'] == 'USER': - memory_dict[str(case['target_id'])] = await fetch_user_dict(interaction, case['target_id']) - elif case['target_type'] == 'CHANNEL': - memory_dict[str(case['target_id'])] = await fetch_channel_dict(interaction, case['target_id']) - target_user = memory_dict[str(case['target_id'])] + if case["target_id"] not in memory_dict: + if case["target_type"] == "USER": + memory_dict[str(case["target_id"])] = await fetch_user_dict( + interaction, case["target_id"] + ) + elif case["target_type"] == "CHANNEL": + memory_dict[str(case["target_id"])] = await fetch_channel_dict( + interaction, case["target_id"] + ) + target_user = memory_dict[str(case["target_id"])] - if case['target_type'] == 'USER': - target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" - elif case['target_type'] == 'CHANNEL': + if case["target_type"] == "USER": + target_name = ( + f"`{target_user['name']}`" + if target_user["discriminator"] == "0" + else f"`{target_user['name']}#{target_user['discriminator']}`" + ) + elif case["target_type"] == "CHANNEL": target_name = f"`{target_user['mention']}`" - if case['moderator_id'] not in memory_dict: - memory_dict[str(case['moderator_id'])] = await fetch_user_dict(interaction, case['moderator_id']) - moderator_user = memory_dict[str(case['moderator_id'])] - moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" + if case["moderator_id"] not in memory_dict: + memory_dict[str(case["moderator_id"])] = await fetch_user_dict( + interaction, case["moderator_id"] + ) + moderator_user = memory_dict[str(case["moderator_id"])] + moderator_name = ( + f"`{moderator_user['name']}`" + if moderator_user["discriminator"] == "0" + else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" + ) field_name = f"Case #{case['moderation_id']:,} ({str.title(case['moderation_type'])})" field_value = f"**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})" - if len(case['reason']) > 125: + if len(case["reason"]) > 125: field_value += f"\n**Reason:** `{str(case['reason'])[:125]}...`" else: field_value += f"\n**Reason:** `{str(case['reason'])}`" - if case['duration'] != 'NULL': - td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case["duration"].split(":"))}) - duration_embed = f"{humanize.precisedelta(td)} | " if bool(case['expired']) is False else f"{humanize.precisedelta(td)} | Expired" + if case["duration"] != "NULL": + td = timedelta( + **{ + unit: int(val) + for unit, val in zip( + ["hours", "minutes", "seconds"], case["duration"].split(":") + ) + } + ) + duration_embed = ( + f"{humanize.precisedelta(td)} | " + if bool(case["expired"]) is False + else f"{humanize.precisedelta(td)} | Expired" + ) field_value += f"\n**Duration:** {duration_embed}" - field_value += f"\n**Timestamp:** | " + field_value += ( + f"\n**Timestamp:** | " + ) - if bool(case['resolved']): + if bool(case["resolved"]): field_value += "\n**Resolved:** True" embed.add_field(name=field_name, value=field_value, inline=inline) @@ -692,7 +1094,9 @@ class Aurora(commands.Cog): await interaction.followup.send(embed=embed, ephemeral=ephemeral) @app_commands.command(name="resolve") - async def resolve(self, interaction: discord.Interaction, case: int, reason: str = None): + async def resolve( + self, interaction: discord.Interaction, case: int, reason: str = None + ): """Resolve a specific case. Parameters @@ -701,72 +1105,105 @@ class Aurora(commands.Cog): Case number of the case you're trying to resolve reason: str Reason for resolving case""" - permissions = check_permissions(interaction.client.user, ['embed_links', 'moderate_members', 'ban_members'], interaction) + permissions = check_permissions( + interaction.client.user, + ["embed_links", "moderate_members", "ban_members"], + interaction, + ) if permissions: - await interaction.response.send_message(error(f"I do not have the `{permissions}` permission, required for this action."), ephemeral=True) + await interaction.response.send_message( + error( + f"I do not have the `{permissions}` permission, required for this action." + ), + ephemeral=True, + ) return database = connect() cursor = database.cursor() - query_1 = f"SELECT * FROM moderation_{interaction.guild.id} WHERE moderation_id = ?;" + query_1 = ( + f"SELECT * FROM moderation_{interaction.guild.id} WHERE moderation_id = ?;" + ) cursor.execute(query_1, (case,)) result_1 = cursor.fetchone() if result_1 is None or case == 0: - await interaction.response.send_message(content=error(f"There is no moderation with a case number of {case}."), ephemeral=True) + await interaction.response.send_message( + content=error(f"There is no moderation with a case number of {case}."), + ephemeral=True, + ) return query_2 = f"SELECT * FROM moderation_{interaction.guild.id} WHERE moderation_id = ? AND resolved = 0;" cursor.execute(query_2, (case,)) result_2 = cursor.fetchone() if result_2 is None: - await interaction.response.send_message(content=error(f"This moderation has already been resolved!\nUse `/case {case}` for more information."), ephemeral=True) + await interaction.response.send_message( + content=error( + f"This moderation has already been resolved!\nUse `/case {case}` for more information." + ), + ephemeral=True, + ) return case_dict = generate_dict(result_2) if reason is None: reason = "No reason given." - changes: list = case_dict['changes'] + changes: list = case_dict["changes"] if len(changes) > 25: - await interaction.response.send_message(content=error("Due to limitations with Discord's embed system, you cannot edit a case more than 25 times."), ephemeral=True) + await interaction.response.send_message( + content=error( + "Due to limitations with Discord's embed system, you cannot edit a case more than 25 times." + ), + ephemeral=True, + ) return if not changes: changes.append( { - 'type': "ORIGINAL", - 'timestamp': case_dict['timestamp'], - 'reason': case_dict['reason'], - 'user_id': case_dict['moderator_id'] + "type": "ORIGINAL", + "timestamp": case_dict["timestamp"], + "reason": case_dict["reason"], + "user_id": case_dict["moderator_id"], } ) changes.append( { - 'type': "RESOLVE", - 'timestamp': int(time.time()), - 'reason': reason, - 'user_id': interaction.user.id + "type": "RESOLVE", + "timestamp": int(time.time()), + "reason": reason, + "user_id": interaction.user.id, } ) - if case_dict['moderation_type'] in ['UNMUTE', 'UNBAN']: - await interaction.response.send_message(content=error("You cannot resolve this type of moderation!"), ephemeral=True) + if case_dict["moderation_type"] in ["UNMUTE", "UNBAN"]: + await interaction.response.send_message( + content=error("You cannot resolve this type of moderation!"), + ephemeral=True, + ) return - if case_dict['moderation_type'] in ['MUTE', 'TEMPBAN', 'BAN']: - if case_dict['moderation_type'] == 'MUTE': + if case_dict["moderation_type"] in ["MUTE", "TEMPBAN", "BAN"]: + if case_dict["moderation_type"] == "MUTE": try: - member = await interaction.guild.fetch_member(case_dict['target_id']) + member = await interaction.guild.fetch_member( + case_dict["target_id"] + ) - await member.timeout(None, reason=f"Case #{case:,} resolved by {interaction.user.id}") + await member.timeout( + None, reason=f"Case #{case:,} resolved by {interaction.user.id}" + ) except discord.NotFound: pass - if case_dict['moderation_type'] in ['TEMPBAN', 'BAN']: + if case_dict["moderation_type"] in ["TEMPBAN", "BAN"]: try: - user = await interaction.client.fetch_user(case_dict['target_id']) + user = await interaction.client.fetch_user(case_dict["target_id"]) - await interaction.guild.unban(user, reason=f"Case #{case} resolved by {interaction.user.id}") + await interaction.guild.unban( + user, reason=f"Case #{case} resolved by {interaction.user.id}" + ) except discord.NotFound: pass @@ -774,22 +1211,45 @@ class Aurora(commands.Cog): else: resolve_query = f"UPDATE `moderation_{interaction.guild.id}` SET resolved = 1, changes = ?, resolved_by = ?, resolve_reason = ? WHERE moderation_id = ?" - cursor.execute(resolve_query, (json.dumps(changes), interaction.user.id, reason, case_dict['moderation_id'])) + cursor.execute( + resolve_query, + ( + json.dumps(changes), + interaction.user.id, + reason, + case_dict["moderation_id"], + ), + ) database.commit() - embed = await case_factory(interaction=interaction, case_dict=await fetch_case(case, interaction.guild.id)) - await interaction.response.send_message(content=f"✅ Moderation #{case:,} resolved!", embed=embed) + embed = await case_factory( + interaction=interaction, + case_dict=await fetch_case(case, interaction.guild.id), + ) + await interaction.response.send_message( + content=f"✅ Moderation #{case:,} resolved!", embed=embed + ) await log(interaction, case, resolved=True) cursor.close() database.close() @app_commands.command(name="case") - @app_commands.choices(export=[ - Choice(name='Export as File', value='file'), - Choice(name='Export as Codeblock', value='codeblock') - ]) - async def case(self, interaction: discord.Interaction, case: int, ephemeral: bool = None, evidenceformat: bool = False, changes: bool = False, export: Choice[str] = None): + @app_commands.choices( + export=[ + Choice(name="Export as File", value="file"), + Choice(name="Export as Codeblock", value="codeblock"), + ] + ) + async def case( + self, + interaction: discord.Interaction, + case: int, + ephemeral: bool = None, + evidenceformat: bool = False, + changes: bool = False, + export: Choice[str] = None, + ): """Check the details of a specific case. Parameters @@ -802,51 +1262,96 @@ class Aurora(commands.Cog): List the changes made to the case export: bool Export the case to a JSON file or codeblock""" - permissions = check_permissions(interaction.client.user, ['embed_links'], interaction) + permissions = check_permissions( + interaction.client.user, ["embed_links"], interaction + ) if permissions: - await interaction.response.send_message(error(f"I do not have the `{permissions}` permission, required for this action."), ephemeral=True) + await interaction.response.send_message( + error( + f"I do not have the `{permissions}` permission, required for this action." + ), + ephemeral=True, + ) return if ephemeral is None: - ephemeral = (await config.user(interaction.user).history_ephemeral() + ephemeral = ( + await config.user(interaction.user).history_ephemeral() or await config.guild(interaction.guild).history_ephemeral() - or False) + or False + ) if case != 0: case_dict = await fetch_case(case, interaction.guild.id) if case_dict: if export: - if export.value == 'file' or len(str(case_dict)) > 1800: - filename = str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"moderation_{interaction.guild.id}_case_{case}.json" + if export.value == "file" or len(str(case_dict)) > 1800: + filename = ( + str(data_manager.cog_data_path(cog_instance=self)) + + str(os.sep) + + f"moderation_{interaction.guild.id}_case_{case}.json" + ) with open(filename, "w", encoding="utf-8") as f: json.dump(case_dict, f, indent=2) - if export.value == 'codeblock': - content = f"Case #{case:,} exported.\n" + warning("Case was too large to export as codeblock, so it has been uploaded as a `.json` file.") + if export.value == "codeblock": + content = f"Case #{case:,} exported.\n" + warning( + "Case was too large to export as codeblock, so it has been uploaded as a `.json` file." + ) else: content = f"Case #{case:,} exported." - await interaction.response.send_message(content=content, file=discord.File(filename, f"moderation_{interaction.guild.id}_case_{case}.json"), ephemeral=ephemeral) + await interaction.response.send_message( + content=content, + file=discord.File( + filename, + f"moderation_{interaction.guild.id}_case_{case}.json", + ), + ephemeral=ephemeral, + ) os.remove(filename) return - await interaction.response.send_message(content=box({json.dumps(case_dict, indent=2)}), ephemeral=ephemeral) + await interaction.response.send_message( + content=box({json.dumps(case_dict, indent=2)}), + ephemeral=ephemeral, + ) return if changes: - embed = await changes_factory(interaction=interaction, case_dict=case_dict) - await interaction.response.send_message(embed=embed, ephemeral=ephemeral) + embed = await changes_factory( + interaction=interaction, case_dict=case_dict + ) + await interaction.response.send_message( + embed=embed, ephemeral=ephemeral + ) elif evidenceformat: - content = await evidenceformat_factory(interaction=interaction, case_dict=case_dict) - await interaction.response.send_message(content=content, ephemeral=ephemeral) + content = await evidenceformat_factory( + interaction=interaction, case_dict=case_dict + ) + await interaction.response.send_message( + content=content, ephemeral=ephemeral + ) else: - embed = await case_factory(interaction=interaction, case_dict=case_dict) - await interaction.response.send_message(embed=embed, ephemeral=ephemeral) + embed = await case_factory( + interaction=interaction, case_dict=case_dict + ) + await interaction.response.send_message( + embed=embed, ephemeral=ephemeral + ) return - await interaction.response.send_message(content=f"No case with case number `{case}` found.", ephemeral=True) + await interaction.response.send_message( + content=f"No case with case number `{case}` found.", ephemeral=True + ) @app_commands.command(name="edit") - async def edit(self, interaction: discord.Interaction, case: int, reason: str, duration: str = None): + async def edit( + self, + interaction: discord.Interaction, + case: int, + reason: str, + duration: str = None, + ): """Edit the reason of a specific case. Parameters @@ -856,10 +1361,18 @@ class Aurora(commands.Cog): reason: str What is the new reason? duration: str - What is the new duration? Does not reapply the moderation if it has already expired.""" - permissions = check_permissions(interaction.client.user, ['embed_links'], interaction) + What is the new duration? Does not reapply the moderation if it has already expired. + """ + permissions = check_permissions( + interaction.client.user, ["embed_links"], interaction + ) if permissions: - await interaction.response.send_message(error(f"I do not have the `{permissions}` permission, required for this action."), ephemeral=True) + await interaction.response.send_message( + error( + f"I do not have the `{permissions}` permission, required for this action." + ), + ephemeral=True, + ) return if case != 0: @@ -868,60 +1381,80 @@ class Aurora(commands.Cog): if case_dict: if duration: try: - parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True) + parsed_time = parse( + sval=duration, as_timedelta=True, raise_exception=True + ) except ValueError: - await interaction.response.send_message(error("Please provide a valid duration!"), ephemeral=True) + await interaction.response.send_message( + error("Please provide a valid duration!"), ephemeral=True + ) return - end_timestamp = case_dict['timestamp'] + parsed_time.total_seconds() + end_timestamp = case_dict["timestamp"] + parsed_time.total_seconds() - if case_dict['moderation_type'] == 'MUTE': - if (time.time() - case_dict['timestamp']) + parsed_time.total_seconds() > 2419200: - await interaction.response.send_message(error("Please provide a duration that is less than 28 days from the initial moderation.")) + if case_dict["moderation_type"] == "MUTE": + if ( + time.time() - case_dict["timestamp"] + ) + parsed_time.total_seconds() > 2419200: + await interaction.response.send_message( + error( + "Please provide a duration that is less than 28 days from the initial moderation." + ) + ) return try: - member = await interaction.guild.fetch_member(case_dict['target_id']) + member = await interaction.guild.fetch_member( + case_dict["target_id"] + ) - await member.timeout(parsed_time, reason=f"Case #{case:,} edited by {interaction.user.id}") + await member.timeout( + parsed_time, + reason=f"Case #{case:,} edited by {interaction.user.id}", + ) except discord.NotFound: pass - changes: list = case_dict['changes'] + changes: list = case_dict["changes"] if len(changes) > 25: - await interaction.response.send_message(content=error("Due to limitations with Discord's embed system, you cannot edit a case more than 25 times."), ephemeral=True) + await interaction.response.send_message( + content=error( + "Due to limitations with Discord's embed system, you cannot edit a case more than 25 times." + ), + ephemeral=True, + ) return if not changes: changes.append( { - 'type': "ORIGINAL", - 'timestamp': case_dict['timestamp'], - 'reason': case_dict['reason'], - 'user_id': case_dict['moderator_id'], - 'duration': case_dict['duration'], - 'end_timestamp': case_dict['end_timestamp'] + "type": "ORIGINAL", + "timestamp": case_dict["timestamp"], + "reason": case_dict["reason"], + "user_id": case_dict["moderator_id"], + "duration": case_dict["duration"], + "end_timestamp": case_dict["end_timestamp"], } ) if parsed_time: changes.append( { - 'type': "EDIT", - 'timestamp': int(time.time()), - 'reason': reason, - 'user_id': interaction.user.id, - 'duration': convert_timedelta_to_str(parsed_time), - 'end_timestamp': end_timestamp + "type": "EDIT", + "timestamp": int(time.time()), + "reason": reason, + "user_id": interaction.user.id, + "duration": convert_timedelta_to_str(parsed_time), + "end_timestamp": end_timestamp, } ) else: changes.append( { - 'type': "EDIT", - 'timestamp': int(time.time()), - 'reason': reason, - 'user_id': interaction.user.id, - 'duration': case_dict['duration'], - 'end_timestamp': case_dict['end_timestamp'] + "type": "EDIT", + "timestamp": int(time.time()), + "reason": reason, + "user_id": interaction.user.id, + "duration": case_dict["duration"], + "end_timestamp": case_dict["end_timestamp"], } ) @@ -930,7 +1463,16 @@ class Aurora(commands.Cog): if parsed_time: update_query = f"UPDATE `moderation_{interaction.guild.id}` SET changes = ?, reason = ?, duration = ?, end_timestamp = ? WHERE moderation_id = ?" - cursor.execute(update_query, (json.dumps(changes), reason, convert_timedelta_to_str(parsed_time), end_timestamp, case)) + cursor.execute( + update_query, + ( + json.dumps(changes), + reason, + convert_timedelta_to_str(parsed_time), + end_timestamp, + case, + ), + ) else: update_query = f"UPDATE `moderation_{interaction.guild.id}` SET changes = ?, reason = ? WHERE moderation_id = ?" cursor.execute(update_query, (json.dumps(changes), reason, case)) @@ -939,13 +1481,19 @@ class Aurora(commands.Cog): new_case = await fetch_case(case, interaction.guild.id) embed = await case_factory(interaction=interaction, case_dict=new_case) - await interaction.response.send_message(content=f"✅ Moderation #{case:,} edited!", embed=embed, ephemeral=True) + await interaction.response.send_message( + content=f"✅ Moderation #{case:,} edited!", + embed=embed, + ephemeral=True, + ) await log(interaction, case) cursor.close() database.close() return - await interaction.response.send_message(content=error(f"No case with case number `{case}` found."), ephemeral=True) + await interaction.response.send_message( + content=error(f"No case with case number `{case}` found."), ephemeral=True + ) @tasks.loop(minutes=1) async def handle_expiry(self): @@ -973,21 +1521,49 @@ class Aurora(commands.Cog): num = 0 for target_id, moderation_id in zip(target_ids, moderation_ids): user: discord.User = await self.bot.fetch_user(target_id) - name = f"{user.name}#{user.discriminator}" if user.discriminator != "0" else user.name + name = ( + f"{user.name}#{user.discriminator}" + if user.discriminator != "0" + else user.name + ) try: - await guild.unban(user, reason=f"Automatic unban from case #{moderation_id}") + await guild.unban( + user, reason=f"Automatic unban from case #{moderation_id}" + ) - embed = await message_factory(await self.bot.get_embed_color(guild.channels[0]), guild=guild, reason=f'Automatic unban from case #{moderation_id}', moderation_type='unbanned') + embed = await message_factory( + await self.bot.get_embed_color(guild.channels[0]), + guild=guild, + reason=f"Automatic unban from case #{moderation_id}", + moderation_type="unbanned", + ) try: await user.send(embed=embed) except discord.errors.HTTPException: pass - logger.debug("Unbanned %s (%s) from %s (%s)", name, user.id, guild.name, guild.id) + logger.debug( + "Unbanned %s (%s) from %s (%s)", + name, + user.id, + guild.name, + guild.id, + ) num = num + 1 - except (discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException) as e: - logger.error("Failed to unban %s (%s) from %s (%s)\n%s", name, user.id, guild.name, guild.id, e) + except ( + discord.errors.NotFound, + discord.errors.Forbidden, + discord.errors.HTTPException, + ) as e: + logger.error( + "Failed to unban %s (%s) from %s (%s)\n%s", + name, + user.id, + guild.name, + guild.id, + e, + ) expiry_query = f"UPDATE `moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp != 0 AND end_timestamp <= ? AND expired = 0 AND moderation_type != 'BLACKLIST') OR (expired = 0 AND resolved = 1 AND moderation_type != 'BLACKLIST')" cursor.execute(expiry_query, (time.time(),)) @@ -1002,18 +1578,30 @@ class Aurora(commands.Cog): moderation_ids = [row[1] for row in result] role_ids = [row[2] for row in result] - for target_id, moderation_id, role_id in zip(target_ids, moderation_ids, role_ids): + for target_id, moderation_id, role_id in zip( + target_ids, moderation_ids, role_ids + ): try: # member: discord.Member = await guild.fetch_member(target_id) role: discord.Role = guild.get_role(role_id) if role is None: raise discord.errors.NotFound - except (discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException): + except ( + discord.errors.NotFound, + discord.errors.Forbidden, + discord.errors.HTTPException, + ): continue per_guild_completion_time = (time.time() - time_per_guild) * 1000 - logger.debug("Completed expiry loop for %s (%s) in %sms with %s users unbanned", guild.name, guild.id, f"{per_guild_completion_time:.6f}", num) + logger.debug( + "Completed expiry loop for %s (%s) in %sms with %s users unbanned", + guild.name, + guild.id, + f"{per_guild_completion_time:.6f}", + num, + ) global_num = global_num + num database.commit() @@ -1021,11 +1609,15 @@ class Aurora(commands.Cog): database.close() completion_time = (time.time() - current_time) * 1000 - logger.debug("Completed expiry loop in %sms with %s users unbanned", f"{completion_time:.6f}", global_num) + logger.debug( + "Completed expiry loop in %sms with %s users unbanned", + f"{completion_time:.6f}", + global_num, + ) -######################################################################################################################## -### Configuration Commands # -######################################################################################################################## + ######################################################################################################################## + ### Configuration Commands # + ######################################################################################################################## @commands.group(autohelp=True, aliases=["moderation", "mod"]) async def aurora(self, ctx: commands.Context): @@ -1107,7 +1699,7 @@ class Aurora(commands.Cog): error("Please provide a valid GalacticBot moderation export file.") ) - @aurora.command(aliases=["tdc", 'td', 'timedeltaconvert']) + @aurora.command(aliases=["tdc", "td", "timedeltaconvert"]) async def timedelta(self, ctx: commands.Context, *, duration: str): """This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object. diff --git a/aurora/utilities/factory.py b/aurora/utilities/factory.py index 4d22f39..34892f3 100644 --- a/aurora/utilities/factory.py +++ b/aurora/utilities/factory.py @@ -1,18 +1,32 @@ # pylint: disable=cyclic-import -from typing import Union from datetime import datetime, timedelta +from typing import Union import humanize -from discord import Color, Embed, Guild, Interaction, InteractionMessage, User, Member +from discord import Color, Embed, Guild, Interaction, InteractionMessage, Member, User from redbot.core import commands -from redbot.core.utils.chat_formatting import box, bold, error, warning +from redbot.core.utils.chat_formatting import bold, box, error, warning from aurora.utilities.config import config -from aurora.utilities.utils import fetch_channel_dict, fetch_user_dict, get_next_case_number, get_bool_emoji, get_pagesize_str +from aurora.utilities.utils import ( + fetch_channel_dict, + fetch_user_dict, + get_bool_emoji, + get_next_case_number, + get_pagesize_str, +) -async def message_factory(color: Color, guild: Guild, reason: str, moderation_type: str, moderator: Union[Member, User] = None, duration: timedelta = None, response: InteractionMessage = None) -> Embed: +async def message_factory( + color: Color, + guild: Guild, + reason: str, + moderation_type: str, + moderator: Union[Member, User] = None, + duration: timedelta = None, + response: InteractionMessage = None, +) -> Embed: """This function creates a message from set parameters, meant for contacting the moderated user. Args: @@ -27,7 +41,12 @@ async def message_factory(color: Color, guild: Guild, reason: str, moderation_ty Returns: embed: The message embed. """ - if response is not None and not moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]: + if response is not None and not moderation_type in [ + "kicked", + "banned", + "tempbanned", + "unbanned", + ]: guild_name = f"[{guild.name}]({response.jump_url})" else: guild_name = guild.name @@ -39,26 +58,43 @@ async def message_factory(color: Color, guild: Guild, reason: str, moderation_ty if moderation_type == "note": embed_desc = "received a" + elif moderation_type == "role added": + embed_desc = "received the" + elif moderation_type == "role removed": + embed_desc = "lost the" else: embed_desc = "been" - embed = Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=color, timestamp=datetime.now()) + embed = Embed( + title=str.title(moderation_type), + description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", + color=color, + timestamp=datetime.now(), + ) if await config.guild(guild).show_moderator() and moderator is not None: - embed.add_field(name='Moderator', value=f"`{moderator.name} ({moderator.id})`", inline=False) + embed.add_field( + name="Moderator", value=f"`{moderator.name} ({moderator.id})`", inline=False + ) - embed.add_field(name='Reason', value=f"`{reason}`", inline=False) + embed.add_field(name="Reason", value=f"`{reason}`", inline=False) if guild.icon.url is not None: embed.set_author(name=guild.name, icon_url=guild.icon.url) else: embed.set_author(name=guild.name) - embed.set_footer(text=f"Case #{await get_next_case_number(guild.id):,}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&") + embed.set_footer( + text=f"Case #{await get_next_case_number(guild.id):,}", + icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&", + ) return embed -async def log_factory(interaction: Interaction, case_dict: dict, resolved: bool = False) -> Embed: + +async def log_factory( + interaction: Interaction, case_dict: dict, resolved: bool = False +) -> Embed: """This function creates a log embed from set parameters, meant for moderation logging. Args: @@ -67,57 +103,115 @@ async def log_factory(interaction: Interaction, case_dict: dict, resolved: bool resolved (bool, optional): Whether the case is resolved or not. Defaults to False. """ if resolved: - if case_dict['target_type'] == 'USER': - target_user = await fetch_user_dict(interaction, case_dict['target_id']) - target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" - elif case_dict['target_type'] == 'CHANNEL': - target_user = await fetch_channel_dict(interaction, case_dict['target_id']) - if target_user['mention']: + if case_dict["target_type"] == "USER": + target_user = await fetch_user_dict(interaction, case_dict["target_id"]) + target_name = ( + f"`{target_user['name']}`" + if target_user["discriminator"] == "0" + else f"`{target_user['name']}#{target_user['discriminator']}`" + ) + elif case_dict["target_type"] == "CHANNEL": + target_user = await fetch_channel_dict(interaction, case_dict["target_id"]) + if target_user["mention"]: target_name = f"{target_user['mention']}" else: target_name = f"`{target_user['name']}`" - moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id']) - moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" + moderator_user = await fetch_user_dict(interaction, case_dict["moderator_id"]) + moderator_name = ( + f"`{moderator_user['name']}`" + if moderator_user["discriminator"] == "0" + else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" + ) - embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Resolved", color=await interaction.client.get_embed_color(interaction.channel)) + embed = Embed( + title=f"📕 Case #{case_dict['moderation_id']:,} Resolved", + color=await interaction.client.get_embed_color(interaction.channel), + ) embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** | " - if case_dict['duration'] != 'NULL': - td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) - duration_embed = f"{humanize.precisedelta(td)} | " if case_dict["expired"] == '0' else str(humanize.precisedelta(td)) - embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}" + if case_dict["duration"] != "NULL": + td = timedelta( + **{ + unit: int(val) + for unit, val in zip( + ["hours", "minutes", "seconds"], + case_dict["duration"].split(":"), + ) + } + ) + duration_embed = ( + f"{humanize.precisedelta(td)} | " + if case_dict["expired"] == "0" + else str(humanize.precisedelta(td)) + ) + embed.description = ( + embed.description + + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}" + ) - embed.add_field(name='Reason', value=box(case_dict['reason']), inline=False) + embed.add_field(name="Reason", value=box(case_dict["reason"]), inline=False) - resolved_user = await fetch_user_dict(interaction, case_dict['resolved_by']) - resolved_name = resolved_user['name'] if resolved_user['discriminator'] == "0" else f"{resolved_user['name']}#{resolved_user['discriminator']}" - embed.add_field(name='Resolve Reason', value=f"Resolved by `{resolved_name}` ({resolved_user['id']}) for:\n" + box(case_dict['resolve_reason']), inline=False) + resolved_user = await fetch_user_dict(interaction, case_dict["resolved_by"]) + resolved_name = ( + resolved_user["name"] + if resolved_user["discriminator"] == "0" + else f"{resolved_user['name']}#{resolved_user['discriminator']}" + ) + embed.add_field( + name="Resolve Reason", + value=f"Resolved by `{resolved_name}` ({resolved_user['id']}) for:\n" + + box(case_dict["resolve_reason"]), + inline=False, + ) else: - if case_dict['target_type'] == 'USER': - target_user = await fetch_user_dict(interaction, case_dict['target_id']) - target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" - elif case_dict['target_type'] == 'CHANNEL': - target_user = await fetch_channel_dict(interaction, case_dict['target_id']) - if target_user['mention']: - target_name = target_user['mention'] + if case_dict["target_type"] == "USER": + target_user = await fetch_user_dict(interaction, case_dict["target_id"]) + target_name = ( + f"`{target_user['name']}`" + if target_user["discriminator"] == "0" + else f"`{target_user['name']}#{target_user['discriminator']}`" + ) + elif case_dict["target_type"] == "CHANNEL": + target_user = await fetch_channel_dict(interaction, case_dict["target_id"]) + if target_user["mention"]: + target_name = target_user["mention"] else: target_name = f"`{target_user['name']}`" - moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id']) - moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" + moderator_user = await fetch_user_dict(interaction, case_dict["moderator_id"]) + moderator_name = ( + f"`{moderator_user['name']}`" + if moderator_user["discriminator"] == "0" + else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" + ) - embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=await interaction.client.get_embed_color(interaction.channel)) + embed = Embed( + title=f"📕 Case #{case_dict['moderation_id']:,}", + color=await interaction.client.get_embed_color(interaction.channel), + ) embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** | " - if case_dict['duration'] != 'NULL': - td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) - embed.description = embed.description + f"\n**Duration:** {humanize.precisedelta(td)} | " + if case_dict["duration"] != "NULL": + td = timedelta( + **{ + unit: int(val) + for unit, val in zip( + ["hours", "minutes", "seconds"], + case_dict["duration"].split(":"), + ) + } + ) + embed.description = ( + embed.description + + f"\n**Duration:** {humanize.precisedelta(td)} | " + ) - embed.add_field(name='Reason', value=box(case_dict['reason']), inline=False) + embed.add_field(name="Reason", value=box(case_dict["reason"]), inline=False) return embed + async def case_factory(interaction: Interaction, case_dict: dict) -> Embed: """This function creates a case embed from set parameters. @@ -125,42 +219,79 @@ async def case_factory(interaction: Interaction, case_dict: dict) -> Embed: interaction (Interaction): The interaction object. case_dict (dict): The case dictionary. """ - if case_dict['target_type'] == 'USER': - target_user = await fetch_user_dict(interaction, case_dict['target_id']) - target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" - elif case_dict['target_type'] == 'CHANNEL': - target_user = await fetch_channel_dict(interaction, case_dict['target_id']) - if target_user['mention']: + if case_dict["target_type"] == "USER": + target_user = await fetch_user_dict(interaction, case_dict["target_id"]) + target_name = ( + f"`{target_user['name']}`" + if target_user["discriminator"] == "0" + else f"`{target_user['name']}#{target_user['discriminator']}`" + ) + elif case_dict["target_type"] == "CHANNEL": + target_user = await fetch_channel_dict(interaction, case_dict["target_id"]) + if target_user["mention"]: target_name = f"{target_user['mention']}" else: target_name = f"`{target_user['name']}`" - moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id']) - moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" + moderator_user = await fetch_user_dict(interaction, case_dict["moderator_id"]) + moderator_name = ( + f"`{moderator_user['name']}`" + if moderator_user["discriminator"] == "0" + else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" + ) - embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=await interaction.client.get_embed_color(interaction.channel)) + embed = Embed( + title=f"📕 Case #{case_dict['moderation_id']:,}", + color=await interaction.client.get_embed_color(interaction.channel), + ) embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** | " - if case_dict['duration'] != 'NULL': - td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) - duration_embed = f"{humanize.precisedelta(td)} | " if bool(case_dict['expired']) is False else str(humanize.precisedelta(td)) + if case_dict["duration"] != "NULL": + td = timedelta( + **{ + unit: int(val) + for unit, val in zip( + ["hours", "minutes", "seconds"], case_dict["duration"].split(":") + ) + } + ) + duration_embed = ( + f"{humanize.precisedelta(td)} | " + if bool(case_dict["expired"]) is False + else str(humanize.precisedelta(td)) + ) embed.description += f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}" - embed.description += f"\n**Changes:** {len(case_dict['changes']) - 1}" if case_dict['changes'] else "\n**Changes:** 0" + embed.description += ( + f"\n**Changes:** {len(case_dict['changes']) - 1}" + if case_dict["changes"] + else "\n**Changes:** 0" + ) - if case_dict['metadata']: - if case_dict['metadata']['imported_from']: - embed.description += f"\n**Imported From:** {case_dict['metadata']['imported_from']}" + if case_dict["metadata"]: + if case_dict["metadata"]["imported_from"]: + embed.description += ( + f"\n**Imported From:** {case_dict['metadata']['imported_from']}" + ) - embed.add_field(name='Reason', value=box(case_dict['reason']), inline=False) + embed.add_field(name="Reason", value=box(case_dict["reason"]), inline=False) - if case_dict['resolved'] == 1: - resolved_user = await fetch_user_dict(interaction, case_dict['resolved_by']) - resolved_name = f"`{resolved_user['name']}`" if resolved_user['discriminator'] == "0" else f"`{resolved_user['name']}#{resolved_user['discriminator']}`" - embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n{box(case_dict['resolve_reason'])}", inline=False) + if case_dict["resolved"] == 1: + resolved_user = await fetch_user_dict(interaction, case_dict["resolved_by"]) + resolved_name = ( + f"`{resolved_user['name']}`" + if resolved_user["discriminator"] == "0" + else f"`{resolved_user['name']}#{resolved_user['discriminator']}`" + ) + embed.add_field( + name="Resolve Reason", + value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n{box(case_dict['resolve_reason'])}", + inline=False, + ) return embed + async def changes_factory(interaction: Interaction, case_dict: dict) -> Embed: """This function creates a changes embed from set parameters. @@ -168,34 +299,56 @@ async def changes_factory(interaction: Interaction, case_dict: dict) -> Embed: interaction (Interaction): The interaction object. case_dict (dict): The case dictionary. """ - embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Changes", color=await interaction.client.get_embed_color(interaction.channel)) + embed = Embed( + title=f"📕 Case #{case_dict['moderation_id']:,} Changes", + color=await interaction.client.get_embed_color(interaction.channel), + ) memory_dict = {} - if case_dict['changes']: - for change in case_dict['changes']: - if change['user_id'] not in memory_dict: - memory_dict[str(change['user_id'])] = await fetch_user_dict(interaction, change['user_id']) + if case_dict["changes"]: + for change in case_dict["changes"]: + if change["user_id"] not in memory_dict: + memory_dict[str(change["user_id"])] = await fetch_user_dict( + interaction, change["user_id"] + ) - user = memory_dict[str(change['user_id'])] - name = user['name'] if user['discriminator'] == "0" else f"{user['name']}#{user['discriminator']}" + user = memory_dict[str(change["user_id"])] + name = ( + user["name"] + if user["discriminator"] == "0" + else f"{user['name']}#{user['discriminator']}" + ) timestamp = f" | " - if change['type'] == 'ORIGINAL': - embed.add_field(name='Original', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) + if change["type"] == "ORIGINAL": + embed.add_field( + name="Original", + value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", + inline=False, + ) - elif change['type'] == 'EDIT': - embed.add_field(name='Edit', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) + elif change["type"] == "EDIT": + embed.add_field( + name="Edit", + value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", + inline=False, + ) - elif change['type'] == 'RESOLVE': - embed.add_field(name='Resolve', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) + elif change["type"] == "RESOLVE": + embed.add_field( + name="Resolve", + value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", + inline=False, + ) else: embed.description = "*No changes have been made to this case.* 🙁" return embed + async def evidenceformat_factory(interaction: Interaction, case_dict: dict) -> str: """This function creates a codeblock in evidence format from set parameters. @@ -203,42 +356,49 @@ async def evidenceformat_factory(interaction: Interaction, case_dict: dict) -> s interaction (Interaction): The interaction object. case_dict (dict): The case dictionary. """ - if case_dict['target_type'] == 'USER': - target_user = await fetch_user_dict(interaction, case_dict['target_id']) - target_name = target_user['name'] if target_user['discriminator'] == "0" else f"{target_user['name']}#{target_user['discriminator']}" + if case_dict["target_type"] == "USER": + target_user = await fetch_user_dict(interaction, case_dict["target_id"]) + target_name = ( + target_user["name"] + if target_user["discriminator"] == "0" + else f"{target_user['name']}#{target_user['discriminator']}" + ) - elif case_dict['target_type'] == 'CHANNEL': - target_user = await fetch_channel_dict(interaction, case_dict['target_id']) - target_name = target_user['name'] + elif case_dict["target_type"] == "CHANNEL": + target_user = await fetch_channel_dict(interaction, case_dict["target_id"]) + target_name = target_user["name"] - moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id']) - moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}" + moderator_user = await fetch_user_dict(interaction, case_dict["moderator_id"]) + moderator_name = ( + moderator_user["name"] + if moderator_user["discriminator"] == "0" + else f"{moderator_user['name']}#{moderator_user['discriminator']}" + ) content = f"Case: {case_dict['moderation_id']:,} ({str.title(case_dict['moderation_type'])})\nTarget: {target_name} ({target_user['id']})\nModerator: {moderator_name} ({moderator_user['id']})" - if case_dict['duration'] != 'NULL': - hours, minutes, seconds = map(int, case_dict['duration'].split(':')) + if case_dict["duration"] != "NULL": + hours, minutes, seconds = map(int, case_dict["duration"].split(":")) td = timedelta(hours=hours, minutes=minutes, seconds=seconds) content += f"\nDuration: {humanize.precisedelta(td)}" content += f"\nReason: {case_dict['reason']}" - return box(content, 'prolog') + return box(content, "prolog") ######################################################################################################################## ### Configuration Embeds # ######################################################################################################################## + async def _config(ctx: commands.Context) -> Embed: """Generates the core embed for configuration menus to use.""" - e = Embed( - title="Aurora Configuration Menu", - color=await ctx.embed_color() - ) + e = Embed(title="Aurora Configuration Menu", color=await ctx.embed_color()) e.set_thumbnail(url=ctx.bot.user.display_avatar.url) return e + async def overrides_embed(ctx: commands.Context) -> Embed: """Generates a configuration menu embed for a user's overrides.""" @@ -247,32 +407,44 @@ async def overrides_embed(ctx: commands.Context) -> Embed: "inline": await config.user(ctx.author).history_inline(), "inline_pagesize": await config.user(ctx.author).history_inline_pagesize(), "pagesize": await config.user(ctx.author).history_pagesize(), - "auto_evidenceformat": await config.user(ctx.author).auto_evidenceformat() + "auto_evidenceformat": await config.user(ctx.author).auto_evidenceformat(), } override_str = [ - '- ' + bold("Auto Evidence Format: ") + get_bool_emoji(override_settings['auto_evidenceformat']), - '- ' + bold("Ephemeral: ") + get_bool_emoji(override_settings['ephemeral']), - '- ' + bold("History Inline: ") + get_bool_emoji(override_settings['inline']), - '- ' + bold("History Inline Pagesize: ") + get_pagesize_str(override_settings['inline_pagesize']), - '- ' + bold("History Pagesize: ") + get_pagesize_str(override_settings['pagesize']), + "- " + + bold("Auto Evidence Format: ") + + get_bool_emoji(override_settings["auto_evidenceformat"]), + "- " + bold("Ephemeral: ") + get_bool_emoji(override_settings["ephemeral"]), + "- " + bold("History Inline: ") + get_bool_emoji(override_settings["inline"]), + "- " + + bold("History Inline Pagesize: ") + + get_pagesize_str(override_settings["inline_pagesize"]), + "- " + + bold("History Pagesize: ") + + get_pagesize_str(override_settings["pagesize"]), ] - override_str = '\n'.join(override_str) + override_str = "\n".join(override_str) e = await _config(ctx) e.title += ": User Overrides" - e.description = """ + e.description = ( + """ Use the buttons below to manage your user overrides. These settings will override the relevant guild settings.\n - """ + override_str + """ + + override_str + ) return e + async def guild_embed(ctx: commands.Context) -> Embed: """Generates a configuration menu field value for a guild's settings.""" guild_settings = { "show_moderator": await config.guild(ctx.guild).show_moderator(), - "use_discord_permissions": await config.guild(ctx.guild).use_discord_permissions(), + "use_discord_permissions": await config.guild( + ctx.guild + ).use_discord_permissions(), "ignore_modlog": await config.guild(ctx.guild).ignore_modlog(), "ignore_other_bots": await config.guild(ctx.guild).ignore_other_bots(), "dm_users": await config.guild(ctx.guild).dm_users(), @@ -280,54 +452,83 @@ async def guild_embed(ctx: commands.Context) -> Embed: "history_ephemeral": await config.guild(ctx.guild).history_ephemeral(), "history_inline": await config.guild(ctx.guild).history_inline(), "history_pagesize": await config.guild(ctx.guild).history_pagesize(), - "history_inline_pagesize": await config.guild(ctx.guild).history_inline_pagesize(), + "history_inline_pagesize": await config.guild( + ctx.guild + ).history_inline_pagesize(), "auto_evidenceformat": await config.guild(ctx.guild).auto_evidenceformat(), } - channel = ctx.guild.get_channel(guild_settings['log_channel']) + channel = ctx.guild.get_channel(guild_settings["log_channel"]) if channel is None: channel = warning("Not Set") else: channel = channel.mention guild_str = [ - '- '+ bold("Show Moderator: ") + get_bool_emoji(guild_settings['show_moderator']), - '- '+ bold("Use Discord Permissions: ") + get_bool_emoji(guild_settings['use_discord_permissions']), - '- '+ bold("Ignore Modlog: ") + get_bool_emoji(guild_settings['ignore_modlog']), - '- '+ bold("Ignore Other Bots: ") + get_bool_emoji(guild_settings['ignore_other_bots']), - '- '+ bold("DM Users: ") + get_bool_emoji(guild_settings['dm_users']), - '- '+ bold("Auto Evidence Format: ") + get_bool_emoji(guild_settings['auto_evidenceformat']), - '- '+ bold("Ephemeral: ") + get_bool_emoji(guild_settings['history_ephemeral']), - '- '+ bold("History Inline: ") + get_bool_emoji(guild_settings['history_inline']), - '- '+ bold("History Pagesize: ") + get_pagesize_str(guild_settings['history_pagesize']), - '- '+ bold("History Inline Pagesize: ") + get_pagesize_str(guild_settings['history_inline_pagesize']), - '- '+ bold("Log Channel: ") + channel + "- " + + bold("Show Moderator: ") + + get_bool_emoji(guild_settings["show_moderator"]), + "- " + + bold("Use Discord Permissions: ") + + get_bool_emoji(guild_settings["use_discord_permissions"]), + "- " + + bold("Ignore Modlog: ") + + get_bool_emoji(guild_settings["ignore_modlog"]), + "- " + + bold("Ignore Other Bots: ") + + get_bool_emoji(guild_settings["ignore_other_bots"]), + "- " + bold("DM Users: ") + get_bool_emoji(guild_settings["dm_users"]), + "- " + + bold("Auto Evidence Format: ") + + get_bool_emoji(guild_settings["auto_evidenceformat"]), + "- " + + bold("Ephemeral: ") + + get_bool_emoji(guild_settings["history_ephemeral"]), + "- " + + bold("History Inline: ") + + get_bool_emoji(guild_settings["history_inline"]), + "- " + + bold("History Pagesize: ") + + get_pagesize_str(guild_settings["history_pagesize"]), + "- " + + bold("History Inline Pagesize: ") + + get_pagesize_str(guild_settings["history_inline_pagesize"]), + "- " + bold("Log Channel: ") + channel, ] - guild_str = '\n'.join(guild_str) + guild_str = "\n".join(guild_str) e = await _config(ctx) e.title += ": Server Configuration" - e.description = """ + e.description = ( + """ Use the buttons below to manage Aurora's server configuration.\n - """ + guild_str + """ + + guild_str + ) return e + async def addrole_embed(ctx: commands.Context) -> Embed: """Generates a configuration menu field value for a guild's addrole whitelist.""" whitelist = await config.guild(ctx.guild).addrole_whitelist() if whitelist: - whitelist = [ctx.guild.get_role(role).mention or error(f"`{role}` (Not Found)") for role in whitelist] - whitelist = '\n'.join(whitelist) + whitelist = [ + ctx.guild.get_role(role).mention or error(f"`{role}` (Not Found)") + for role in whitelist + ] + whitelist = "\n".join(whitelist) else: whitelist = warning("No roles are on the addrole whitelist!") e = await _config(ctx) e.title += ": Addrole Whitelist" - e.description = "Use the select menu below to manage this guild's addrole whitelist." + e.description = ( + "Use the select menu below to manage this guild's addrole whitelist." + ) if len(whitelist) > 4000 and len(whitelist) < 5000: - lines = whitelist.split('\n') + lines = whitelist.split("\n") chunks = [] chunk = "" for line in lines: @@ -335,23 +536,27 @@ async def addrole_embed(ctx: commands.Context) -> Embed: chunks.append(chunk) chunk = line else: - chunk += '\n' + line if chunk else line + chunk += "\n" + line if chunk else line chunks.append(chunk) for chunk in chunks: e.add_field(name="", value=chunk) else: - e.description += '\n\n' + whitelist + e.description += "\n\n" + whitelist return e + async def immune_embed(ctx: commands.Context) -> Embed: """Generates a configuration menu field value for a guild's immune roles.""" immune_roles = await config.guild(ctx.guild).immune_roles() if immune_roles: - immune_str = [ctx.guild.get_role(role).mention or error(f"`{role}` (Not Found)") for role in immune_roles] - immune_str = '\n'.join(immune_str) + immune_str = [ + ctx.guild.get_role(role).mention or error(f"`{role}` (Not Found)") + for role in immune_roles + ] + immune_str = "\n".join(immune_str) else: immune_str = warning("No roles are set as immune roles!") @@ -360,7 +565,7 @@ async def immune_embed(ctx: commands.Context) -> Embed: e.description = "Use the select menu below to manage this guild's immune roles." if len(immune_str) > 4000 and len(immune_str) < 5000: - lines = immune_str.split('\n') + lines = immune_str.split("\n") chunks = [] chunk = "" for line in lines: @@ -368,12 +573,12 @@ async def immune_embed(ctx: commands.Context) -> Embed: chunks.append(chunk) chunk = line else: - chunk += '\n' + line if chunk else line + chunk += "\n" + line if chunk else line chunks.append(chunk) for chunk in chunks: e.add_field(name="", value=chunk) else: - e.description += '\n\n' + immune_str + e.description += "\n\n" + immune_str return e