# _____ _ # / ____| (_) # | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __ # \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__| # ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ | # |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_| import json import os import sqlite3 import time from datetime import datetime, timedelta, timezone from math import ceil import discord import humanize from discord.ext import tasks from pytimeparse2 import disable_dateutil, parse from redbot.core import app_commands, commands, data_manager from redbot.core.app_commands import Choice from redbot.core.bot import Red from redbot.core.utils.chat_formatting import (box, error, humanize_list, warning) from aurora.importers.aurora import ImportAuroraView from aurora.importers.galacticbot import ImportGalacticBotView from aurora.menus.addrole import Addrole from aurora.menus.guild import Guild from aurora.menus.immune import Immune from aurora.menus.overrides import Overrides from aurora.utilities.config import config, register_config from aurora.utilities.database import (connect, create_guild_table, fetch_case, mysql_log) from aurora.utilities.factory import (addrole_embed, case_factory, changes_factory, evidenceformat_factory, guild_embed, immune_embed, message_factory, overrides_embed) from aurora.utilities.logger import logger from aurora.utilities.utils import (check_moddable, check_permissions, convert_timedelta_to_str, fetch_channel_dict, fetch_user_dict, generate_dict, log, send_evidenceformat) class Aurora(commands.Cog): """Aurora is a fully-featured moderation system. It is heavily inspired by GalacticBot, and is designed to be a more user-friendly alternative to Red's core Mod cogs. This cog stores all of its data in an SQLite database.""" __author__ = ["SeaswimmerTheFsh"] __version__ = "2.1.6" async def red_delete_data_for_user(self, *, requester, user_id: int): if requester == "discord_deleted_user": await config.user_from_id(user_id).clear() database = connect() cursor = database.cursor() cursor.execute("SHOW TABLES;") tables = [table[0] for table in cursor.fetchall()] condition = "target_id = %s OR moderator_id = %s;" for table in tables: delete_query = f"DELETE FROM {table[0]} WHERE {condition}" cursor.execute(delete_query, (user_id, user_id)) database.commit() cursor.close() database.close() if requester == "owner": await config.user_from_id(user_id).clear() if requester == "user": await config.user_from_id(user_id).clear() if requester == "user_strict": await config.user_from_id(user_id).clear() else: logger.warning( "Invalid requester passed to red_delete_data_for_user: %s", requester ) def __init__(self, bot: Red): super().__init__() self.bot = bot register_config(config) disable_dateutil() self.handle_expiry.start() def format_help_for_context(self, ctx: commands.Context) -> str: pre_processed = super().format_help_for_context(ctx) or "" n = "\n" if "\n\n" not in pre_processed else "" text = [ f"{pre_processed}{n}", f"Cog Version: **{self.__version__}**", f"Author: {humanize_list(self.__author__)}", ] return "\n".join(text) async def cog_load(self): """This method prepares the database schema for all of the guilds the bot is currently in.""" guilds: list[discord.Guild] = self.bot.guilds try: for guild in guilds: if not await self.bot.cog_disabled_in_guild(self, guild): await create_guild_table(guild) except ConnectionRefusedError: return async def cog_unload(self): self.handle_expiry.cancel() @commands.Cog.listener("on_guild_join") async def db_generate_guild_join(self, guild: discord.Guild): """This method prepares the database schema whenever the bot joins a guild.""" if not await self.bot.cog_disabled_in_guild(self, guild): try: await create_guild_table(guild) except ConnectionRefusedError: return @commands.Cog.listener("on_audit_log_entry_create") async def autologger(self, entry: discord.AuditLogEntry): """This method automatically logs moderations done by users manually ("right clicks").""" if not await self.bot.cog_disabled_in_guild(self, entry.guild): if await config.guild(entry.guild).ignore_other_bots() is True: if entry.user.bot or entry.target.bot: return else: if entry.user.id == self.bot.user.id: return duration = "NULL" if entry.reason: reason = entry.reason + " (This action was performed without the bot.)" else: reason = "This action was performed without the bot." if entry.action == discord.AuditLogAction.kick: moderation_type = "KICK" elif entry.action == discord.AuditLogAction.ban: moderation_type = "BAN" elif entry.action == discord.AuditLogAction.unban: moderation_type = "UNBAN" elif entry.action == discord.AuditLogAction.member_update: if entry.after.timed_out_until is not None: timed_out_until_aware = entry.after.timed_out_until.replace( tzinfo=timezone.utc ) duration_datetime = timed_out_until_aware - datetime.now( tz=timezone.utc ) minutes = round(duration_datetime.total_seconds() / 60) duration = timedelta(minutes=minutes) moderation_type = "MUTE" else: moderation_type = "UNMUTE" else: return await mysql_log( entry.guild.id, entry.user.id, moderation_type, "USER", entry.target.id, 0, duration, reason, ) ####################################################################################################################### ### COMMANDS ####################################################################################################################### @commands.hybrid_command(name="note") @commands.mod_or_permissions(moderate_members=True) async def note( self, ctx: commands.Context, target: discord.User, reason: str, silent: bool = None, ): """Add a note to a user. Parameters ----------- target: discord.User Who are you noting? reason: str Why are you noting this user? silent: bool Should the user be messaged?""" if not await check_moddable(target, ctx, ["moderate_members"]): return message = await ctx.send( content=f"{target.mention} has recieved a note!\n**Reason** - `{reason}`" ) if silent is None: silent = not await config.guild(ctx.guild).dm_users() if silent is False: try: embed = await message_factory( await self.bot.get_embed_color(ctx.channel), guild=ctx.guild, moderator=ctx.author, reason=reason, moderation_type="note", response=message, ) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await mysql_log( ctx.guild.id, ctx.author.id, "NOTE", "USER", target.id, 0, "NULL", reason, ) await message.edit( content=f"{target.mention} has received a note! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" ) await log(ctx, moderation_id) case = await fetch_case(moderation_id, ctx.guild.id) await send_evidenceformat(ctx, case) @app_commands.command(name="warn") async def warn( self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None, ): """Warn a user. Parameters ----------- target: discord.Member Who are you warning? reason: str Why are you warning this user? silent: bool Should the user be messaged?""" if not await check_moddable(target, interaction, ["moderate_members"]): return await interaction.response.send_message( content=f"{target.mention} has been warned!\n**Reason** - `{reason}`" ) if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: embed = await message_factory( await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type="warned", response=await interaction.original_response(), ) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await mysql_log( interaction.guild.id, interaction.user.id, "WARN", "USER", target.id, 0, "NULL", reason, ) await interaction.edit_original_response( content=f"{target.mention} has been warned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) @app_commands.command(name="addrole") async def addrole( self, interaction: discord.Interaction, target: discord.Member, role: discord.Role, reason: str, duration: str = None, silent: bool = None, ): """Add a role to a user. Parameters ----------- target: discord.Member Who are you adding a role to? role: discord.Role What role are you adding to the target? reason: str Why are you adding a role to this user? duration: str How long are you adding this role for? silent: bool Should the user be messaged?""" addrole_whitelist = await config.guild(interaction.guild).addrole_whitelist() if not addrole_whitelist: await interaction.response.send_message( content=error("There are no whitelisted roles set for this server!"), ephemeral=True, ) return if duration is not None: try: parsed_time = parse( sval=duration, as_timedelta=True, raise_exception=True ) except ValueError: await interaction.response.send_message( error("Please provide a valid duration!"), ephemeral=True ) return else: parsed_time = "NULL" if role.id not in addrole_whitelist: await interaction.response.send_message( content=error("That role isn't whitelisted!"), ephemeral=True ) return if not await check_moddable( target, interaction, ["moderate_members", "manage_roles"] ): return if role.id in [user_role.id for user_role in target.roles]: await interaction.response.send_message( content=error(f"{target.mention} already has this role!"), ephemeral=True, ) return await interaction.response.defer() if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: embed = await message_factory( await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type="addrole", response=await interaction.original_response(), duration=parsed_time, role=role, ) await target.send(embed=embed) except discord.errors.HTTPException: pass await target.add_roles( role, reason=f"Role added by {interaction.user.id}{(' for ' + {humanize.precisedelta(parsed_time)} if parsed_time != 'NULL' else '')} for: {reason}", ) response: discord.WebhookMessage = await interaction.followup.send( content=f"{target.mention} has been given the {role.mention} role{(' for ' + {humanize.precisedelta(parsed_time)} if parsed_time != 'NULL' else '')}!\n**Reason** - `{reason}`" ) moderation_id = await mysql_log( interaction.guild.id, interaction.user.id, "ADDROLE", "USER", target.id, role.id, parsed_time, reason, ) await response.edit( content=f"{target.mention} has been given the {role.mention} role{(' for ' + {humanize.precisedelta(parsed_time)} if parsed_time != 'NULL' else '')}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`", ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) @app_commands.command(name="mute") async def mute( self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str, silent: bool = None, ): """Mute a user. Parameters ----------- target: discord.Member Who are you unbanning? duration: str How long are you muting this user for? reason: str Why are you unbanning this user? silent: bool Should the user be messaged?""" if not await check_moddable(target, interaction, ["moderate_members"]): return if target.is_timed_out() is True: await interaction.response.send_message( error(f"{target.mention} is already muted!"), allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True, ) return try: parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True) except ValueError: await interaction.response.send_message( error("Please provide a valid duration!"), ephemeral=True ) return if parsed_time.total_seconds() / 1000 > 2419200000: await interaction.response.send_message( error("Please provide a duration that is less than 28 days.") ) return await target.timeout( parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}" ) await interaction.response.send_message( content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`" ) if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: embed = await message_factory( await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type="muted", response=await interaction.original_response(), duration=parsed_time, ) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await mysql_log( interaction.guild.id, interaction.user.id, "MUTE", "USER", target.id, 0, parsed_time, reason, ) await interaction.edit_original_response( content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) @app_commands.command(name="unmute") async def unmute( self, interaction: discord.Interaction, target: discord.Member, reason: str = None, silent: bool = None, ): """Unmute a user. Parameters ----------- target: discord.user Who are you unmuting? reason: str Why are you unmuting this user? silent: bool Should the user be messaged?""" if not await check_moddable(target, interaction, ["moderate_members"]): return if target.is_timed_out() is False: await interaction.response.send_message( error(f"{target.mention} is not muted!"), allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True, ) return if reason: await target.timeout( None, reason=f"Unmuted by {interaction.user.id} for: {reason}" ) else: await target.timeout(None, reason=f"Unbanned by {interaction.user.id}") reason = "No reason given." await interaction.response.send_message( content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`" ) if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: embed = await message_factory( await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type="unmuted", response=await interaction.original_response(), ) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await mysql_log( interaction.guild.id, interaction.user.id, "UNMUTE", "USER", target.id, 0, "NULL", reason, ) await interaction.edit_original_response( content=f"{target.mention} has been unmuted! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) @app_commands.command(name="kick") async def kick( self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None, ): """Kick a user. Parameters ----------- target: discord.user Who are you kicking? reason: str Why are you kicking this user? silent: bool Should the user be messaged?""" if not await check_moddable(target, interaction, ["kick_members"]): return await interaction.response.send_message( content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`" ) if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: embed = await message_factory( await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type="kicked", response=await interaction.original_response(), ) await target.send(embed=embed) except discord.errors.HTTPException: pass await target.kick(reason=f"Kicked by {interaction.user.id} for: {reason}") moderation_id = await mysql_log( interaction.guild.id, interaction.user.id, "KICK", "USER", target.id, 0, "NULL", reason, ) await interaction.edit_original_response( content=f"{target.mention} has been kicked! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) @app_commands.command(name="ban") @app_commands.choices( delete_messages=[ Choice(name="None", value=0), Choice(name="1 Hour", value=3600), Choice(name="12 Hours", value=43200), Choice(name="1 Day", value=86400), Choice(name="3 Days", value=259200), Choice(name="7 Days", value=604800), ] ) async def ban( self, interaction: discord.Interaction, target: discord.User, reason: str, duration: str = None, delete_messages: Choice[int] = None, silent: bool = None, ): """Ban a user. Parameters ----------- target: discord.user Who are you banning? duration: str How long are you banning this user for? reason: str Why are you banning this user? delete_messages: Choices[int] How many days of messages to delete? silent: bool Should the user be messaged?""" if not await check_moddable(target, interaction, ["ban_members"]): return if delete_messages is None: delete_messages_seconds = 0 else: delete_messages_seconds = delete_messages.value try: await interaction.guild.fetch_ban(target) await interaction.response.send_message( content=error(f"{target.mention} is already banned!"), ephemeral=True ) return except discord.errors.NotFound: pass if duration: try: parsed_time = parse( sval=duration, as_timedelta=True, raise_exception=True ) except ValueError: await interaction.response.send_message( error("Please provide a valid duration!"), ephemeral=True ) return await interaction.response.send_message( content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`" ) try: embed = await message_factory( await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type="tempbanned", response=await interaction.original_response(), duration=parsed_time, ) await target.send(embed=embed) except discord.errors.HTTPException: pass await interaction.guild.ban( target, reason=f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})", delete_message_seconds=delete_messages_seconds, ) moderation_id = await mysql_log( interaction.guild.id, interaction.user.id, "TEMPBAN", "USER", target.id, 0, parsed_time, reason, ) await interaction.edit_original_response( content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id}`)\n**Reason** - `{reason}`" ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) else: await interaction.response.send_message( content=f"{target.mention} has been banned!\n**Reason** - `{reason}`" ) if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: embed = embed = await message_factory( await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type="banned", response=await interaction.original_response(), ) await target.send(embed=embed) except discord.errors.HTTPException: pass await interaction.guild.ban( target, reason=f"Banned by {interaction.user.id} for: {reason}", delete_message_seconds=delete_messages_seconds, ) moderation_id = await mysql_log( interaction.guild.id, interaction.user.id, "BAN", "USER", target.id, 0, "NULL", reason, ) await interaction.edit_original_response( content=f"{target.mention} has been banned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) @app_commands.command(name="unban") async def unban( self, interaction: discord.Interaction, target: discord.User, reason: str = None, silent: bool = None, ): """Unban a user. Parameters ----------- target: discord.user Who are you unbanning? reason: str Why are you unbanning this user? silent: bool Should the user be messaged?""" if not await check_moddable(target, interaction, ["ban_members"]): return try: await interaction.guild.fetch_ban(target) except discord.errors.NotFound: await interaction.response.send_message( content=error(f"{target.mention} is not banned!"), ephemeral=True ) return if reason: await interaction.guild.unban( target, reason=f"Unbanned by {interaction.user.id} for: {reason}" ) else: await interaction.guild.unban( target, reason=f"Unbanned by {interaction.user.id}" ) reason = "No reason given." await interaction.response.send_message( content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`" ) if silent is None: silent = not await config.guild(interaction.guild).dm_users() if silent is False: try: embed = await message_factory( await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type="unbanned", response=await interaction.original_response(), ) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await mysql_log( interaction.guild.id, interaction.user.id, "UNBAN", "USER", target.id, 0, "NULL", reason, ) await interaction.edit_original_response( content=f"{target.mention} has been unbanned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`" ) await log(interaction, moderation_id) case = await fetch_case(moderation_id, interaction.guild.id) await send_evidenceformat(interaction, case) @app_commands.command(name="history") async def history( self, interaction: discord.Interaction, target: discord.User = None, moderator: discord.User = None, pagesize: app_commands.Range[int, 1, 20] = None, page: int = 1, ephemeral: bool = None, inline: bool = None, export: bool = False, ): """List previous infractions. Parameters ----------- target: discord.User User whose infractions to query, overrides moderator if both are given moderator: discord.User Query by moderator pagesize: app_commands.Range[int, 1, 25] Amount of infractions to list per page page: int Page to select ephemeral: bool Hide the command response inline: bool Display infractions in a grid arrangement (does not look very good) export: bool Exports the server's entire moderation history to a JSON file""" if ephemeral is None: ephemeral = ( await config.user(interaction.user).history_ephemeral() or await config.guild(interaction.guild).history_ephemeral() or False ) if inline is None: inline = ( await config.user(interaction.user).history_inline() or await config.guild(interaction.guild).history_inline() or False ) if pagesize is None: if inline is True: pagesize = ( await config.user(interaction.user).history_inline_pagesize() or await config.guild(interaction.guild).history_inline_pagesize() or 6 ) else: pagesize = ( await config.user(interaction.user).history_pagesize() or await config.guild(interaction.guild).history_pagesize() or 5 ) await interaction.response.defer(ephemeral=ephemeral) permissions = check_permissions( interaction.client.user, ["embed_links"], interaction ) if permissions: await interaction.followup.send( error( f"I do not have the `{permissions}` permission, required for this action." ), ephemeral=True, ) return database = connect() if export: database.row_factory = sqlite3.Row cursor = database.cursor() query = f"""SELECT * FROM moderation_{interaction.guild.id} ORDER BY moderation_id DESC;""" cursor.execute(query) results = cursor.fetchall() cases = [] for result in results: case = dict(result) cases.append(case) try: filename = ( str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"moderation_{interaction.guild.id}.json" ) with open(filename, "w", encoding="utf-8") as f: json.dump(cases, f, indent=2) await interaction.followup.send( file=discord.File( filename, f"moderation_{interaction.guild.id}.json" ), ephemeral=ephemeral, ) os.remove(filename) except json.JSONDecodeError as e: await interaction.followup.send( content=error( "An error occured while exporting the moderation history.\nError:\n" ) + box(e, "py"), ephemeral=ephemeral, ) cursor.close() database.close() return cursor = database.cursor() if target: query = f"""SELECT * FROM moderation_{interaction.guild.id} WHERE target_id = ? ORDER BY moderation_id DESC;""" cursor.execute(query, (target.id,)) elif moderator: query = f"""SELECT * FROM moderation_{interaction.guild.id} WHERE moderator_id = ? ORDER BY moderation_id DESC;""" cursor.execute(query, (moderator.id,)) else: query = f"""SELECT * FROM moderation_{interaction.guild.id} ORDER BY moderation_id DESC;""" cursor.execute(query) results = cursor.fetchall() result_dict_list = [] for result in results: case_dict = generate_dict(result) if case_dict["moderation_id"] == 0: continue result_dict_list.append(case_dict) case_quantity = len(result_dict_list) page_quantity = ceil(case_quantity / pagesize) start_index = (page - 1) * pagesize end_index = page * pagesize embed = discord.Embed(color=await self.bot.get_embed_color(interaction.channel)) embed.set_author(icon_url=interaction.guild.icon.url, name="Infraction History") embed.set_footer( text=f"Page {page:,}/{page_quantity:,} | {case_quantity:,} Results" ) memory_dict = {} for case in result_dict_list[start_index:end_index]: if case["target_id"] not in memory_dict: if case["target_type"] == "USER": memory_dict[str(case["target_id"])] = await fetch_user_dict( interaction.client, case["target_id"] ) elif case["target_type"] == "CHANNEL": memory_dict[str(case["target_id"])] = await fetch_channel_dict( interaction.guild, case["target_id"] ) target_user = memory_dict[str(case["target_id"])] if case["target_type"] == "USER": target_name = ( f"`{target_user['name']}`" if target_user["discriminator"] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" ) elif case["target_type"] == "CHANNEL": target_name = f"`{target_user['mention']}`" if case["moderator_id"] not in memory_dict: memory_dict[str(case["moderator_id"])] = await fetch_user_dict( interaction.client, case["moderator_id"] ) moderator_user = memory_dict[str(case["moderator_id"])] moderator_name = ( f"`{moderator_user['name']}`" if moderator_user["discriminator"] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" ) field_name = f"Case #{case['moderation_id']:,} ({str.title(case['moderation_type'])})" field_value = f"**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})" if len(case["reason"]) > 125: field_value += f"\n**Reason:** `{str(case['reason'])[:125]}...`" else: field_value += f"\n**Reason:** `{str(case['reason'])}`" if case["duration"] != "NULL": td = timedelta( **{ unit: int(val) for unit, val in zip( ["hours", "minutes", "seconds"], case["duration"].split(":") ) } ) duration_embed = ( f"{humanize.precisedelta(td)} | " if bool(case["expired"]) is False else f"{humanize.precisedelta(td)} | Expired" ) field_value += f"\n**Duration:** {duration_embed}" field_value += ( f"\n**Timestamp:** | " ) if case["role_id"] != "0": role = interaction.guild.get_role(int(case["role_id"])) if role is not None: field_value += f"\n**Role:** {role.mention}" else: field_value += f"\n**Role:** Deleted Role ({case['role_id']})" if bool(case["resolved"]): field_value += "\n**Resolved:** True" embed.add_field(name=field_name, value=field_value, inline=inline) await interaction.followup.send(embed=embed, ephemeral=ephemeral) @app_commands.command(name="resolve") async def resolve( self, interaction: discord.Interaction, case: int, reason: str = None ): """Resolve a specific case. Parameters ----------- case: int Case number of the case you're trying to resolve reason: str Reason for resolving case""" permissions = check_permissions( interaction.client.user, ["embed_links", "moderate_members", "ban_members"], interaction, ) if permissions: await interaction.response.send_message( error( f"I do not have the `{permissions}` permission, required for this action." ), ephemeral=True, ) return database = connect() cursor = database.cursor() query_1 = ( f"SELECT * FROM moderation_{interaction.guild.id} WHERE moderation_id = ?;" ) cursor.execute(query_1, (case,)) result_1 = cursor.fetchone() if result_1 is None or case == 0: await interaction.response.send_message( content=error(f"There is no moderation with a case number of {case}."), ephemeral=True, ) return query_2 = f"SELECT * FROM moderation_{interaction.guild.id} WHERE moderation_id = ? AND resolved = 0;" cursor.execute(query_2, (case,)) result_2 = cursor.fetchone() if result_2 is None: await interaction.response.send_message( content=error( f"This moderation has already been resolved!\nUse `/case {case}` for more information." ), ephemeral=True, ) return case_dict = generate_dict(result_2) if reason is None: reason = "No reason given." changes: list = case_dict["changes"] if len(changes) > 25: await interaction.response.send_message( content=error( "Due to limitations with Discord's embed system, you cannot edit a case more than 25 times." ), ephemeral=True, ) return if not changes: changes.append( { "type": "ORIGINAL", "timestamp": case_dict["timestamp"], "reason": case_dict["reason"], "user_id": case_dict["moderator_id"], } ) changes.append( { "type": "RESOLVE", "timestamp": int(time.time()), "reason": reason, "user_id": interaction.user.id, } ) if case_dict["moderation_type"] in ["UNMUTE", "UNBAN"]: await interaction.response.send_message( content=error("You cannot resolve this type of moderation!"), ephemeral=True, ) return if case_dict["moderation_type"] in ["MUTE", "TEMPBAN", "BAN"]: if case_dict["moderation_type"] == "MUTE": try: member = await interaction.guild.fetch_member( case_dict["target_id"] ) await member.timeout( None, reason=f"Case #{case:,} resolved by {interaction.user.id}" ) except discord.NotFound: pass if case_dict["moderation_type"] in ["TEMPBAN", "BAN"]: try: user = await interaction.client.fetch_user(case_dict["target_id"]) await interaction.guild.unban( user, reason=f"Case #{case} resolved by {interaction.user.id}" ) except discord.NotFound: pass resolve_query = f"UPDATE `moderation_{interaction.guild.id}` SET resolved = 1, changes = ?, resolved_by = ?, resolve_reason = ? WHERE moderation_id = ?" else: resolve_query = f"UPDATE `moderation_{interaction.guild.id}` SET resolved = 1, changes = ?, resolved_by = ?, resolve_reason = ? WHERE moderation_id = ?" cursor.execute( resolve_query, ( json.dumps(changes), interaction.user.id, reason, case_dict["moderation_id"], ), ) database.commit() embed = await case_factory( interaction=interaction, case_dict=await fetch_case(case, interaction.guild.id), ) await interaction.response.send_message( content=f"✅ Moderation #{case:,} resolved!", embed=embed ) await log(interaction, case, resolved=True) cursor.close() database.close() @app_commands.command(name="case") @app_commands.choices( export=[ Choice(name="Export as File", value="file"), Choice(name="Export as Codeblock", value="codeblock"), ] ) async def case( self, interaction: discord.Interaction, case: int, ephemeral: bool = None, evidenceformat: bool = False, changes: bool = False, export: Choice[str] = None, ): """Check the details of a specific case. Parameters ----------- case: int What case are you looking up? ephemeral: bool Hide the command response changes: bool List the changes made to the case export: bool Export the case to a JSON file or codeblock""" permissions = check_permissions( interaction.client.user, ["embed_links"], interaction ) if permissions: await interaction.response.send_message( error( f"I do not have the `{permissions}` permission, required for this action." ), ephemeral=True, ) return if ephemeral is None: ephemeral = ( await config.user(interaction.user).history_ephemeral() or await config.guild(interaction.guild).history_ephemeral() or False ) if case != 0: case_dict = await fetch_case(case, interaction.guild.id) if case_dict: if export: if export.value == "file" or len(str(case_dict)) > 1800: filename = ( str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"moderation_{interaction.guild.id}_case_{case}.json" ) with open(filename, "w", encoding="utf-8") as f: json.dump(case_dict, f, indent=2) if export.value == "codeblock": content = f"Case #{case:,} exported.\n" + warning( "Case was too large to export as codeblock, so it has been uploaded as a `.json` file." ) else: content = f"Case #{case:,} exported." await interaction.response.send_message( content=content, file=discord.File( filename, f"moderation_{interaction.guild.id}_case_{case}.json", ), ephemeral=ephemeral, ) os.remove(filename) return await interaction.response.send_message( content=box({json.dumps(case_dict, indent=2)}), ephemeral=ephemeral, ) return if changes: embed = await changes_factory( interaction=interaction, case_dict=case_dict ) await interaction.response.send_message( embed=embed, ephemeral=ephemeral ) elif evidenceformat: content = await evidenceformat_factory( interaction=interaction, case_dict=case_dict ) await interaction.response.send_message( content=content, ephemeral=ephemeral ) else: embed = await case_factory( interaction=interaction, case_dict=case_dict ) await interaction.response.send_message( embed=embed, ephemeral=ephemeral ) return await interaction.response.send_message( content=f"No case with case number `{case}` found.", ephemeral=True ) @app_commands.command(name="edit") async def edit( self, interaction: discord.Interaction, case: int, reason: str, duration: str = None, ): """Edit the reason of a specific case. Parameters ----------- case: int What case are you editing? reason: str What is the new reason? duration: str What is the new duration? Does not reapply the moderation if it has already expired. """ permissions = check_permissions( interaction.client.user, ["embed_links"], interaction ) if permissions: await interaction.response.send_message( error( f"I do not have the `{permissions}` permission, required for this action." ), ephemeral=True, ) return if case != 0: parsed_time = None case_dict = await fetch_case(case, interaction.guild.id) if case_dict: if duration: try: parsed_time = parse( sval=duration, as_timedelta=True, raise_exception=True ) except ValueError: await interaction.response.send_message( error("Please provide a valid duration!"), ephemeral=True ) return end_timestamp = case_dict["timestamp"] + parsed_time.total_seconds() if case_dict["moderation_type"] == "MUTE": if ( time.time() - case_dict["timestamp"] ) + parsed_time.total_seconds() > 2419200: await interaction.response.send_message( error( "Please provide a duration that is less than 28 days from the initial moderation." ) ) return try: member = await interaction.guild.fetch_member( case_dict["target_id"] ) await member.timeout( parsed_time, reason=f"Case #{case:,} edited by {interaction.user.id}", ) except discord.NotFound: pass changes: list = case_dict["changes"] if len(changes) > 25: await interaction.response.send_message( content=error( "Due to limitations with Discord's embed system, you cannot edit a case more than 25 times." ), ephemeral=True, ) return if not changes: changes.append( { "type": "ORIGINAL", "timestamp": case_dict["timestamp"], "reason": case_dict["reason"], "user_id": case_dict["moderator_id"], "duration": case_dict["duration"], "end_timestamp": case_dict["end_timestamp"], } ) if parsed_time: changes.append( { "type": "EDIT", "timestamp": int(time.time()), "reason": reason, "user_id": interaction.user.id, "duration": convert_timedelta_to_str(parsed_time), "end_timestamp": end_timestamp, } ) else: changes.append( { "type": "EDIT", "timestamp": int(time.time()), "reason": reason, "user_id": interaction.user.id, "duration": case_dict["duration"], "end_timestamp": case_dict["end_timestamp"], } ) database = connect() cursor = database.cursor() if parsed_time: update_query = f"UPDATE `moderation_{interaction.guild.id}` SET changes = ?, reason = ?, duration = ?, end_timestamp = ? WHERE moderation_id = ?" cursor.execute( update_query, ( json.dumps(changes), reason, convert_timedelta_to_str(parsed_time), end_timestamp, case, ), ) else: update_query = f"UPDATE `moderation_{interaction.guild.id}` SET changes = ?, reason = ? WHERE moderation_id = ?" cursor.execute(update_query, (json.dumps(changes), reason, case)) database.commit() new_case = await fetch_case(case, interaction.guild.id) embed = await case_factory(interaction=interaction, case_dict=new_case) await interaction.response.send_message( content=f"✅ Moderation #{case:,} edited!", embed=embed, ephemeral=True, ) await log(interaction, case) cursor.close() database.close() return await interaction.response.send_message( content=error(f"No case with case number `{case}` found."), ephemeral=True ) @tasks.loop(minutes=1) async def handle_expiry(self): current_time = time.time() database = connect() cursor = database.cursor() global_num = 0 guilds: list[discord.Guild] = self.bot.guilds for guild in guilds: if not await self.bot.cog_disabled_in_guild(self, guild): time_per_guild = time.time() tempban_query = f"SELECT target_id, moderation_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= ? AND moderation_type = 'TEMPBAN' AND expired = 0" try: cursor.execute(tempban_query, (time.time(),)) result = cursor.fetchall() except sqlite3.OperationalError: continue target_ids = [row[0] for row in result] moderation_ids = [row[1] for row in result] num = 0 for target_id, moderation_id in zip(target_ids, moderation_ids): user: discord.User = await self.bot.fetch_user(target_id) name = ( f"{user.name}#{user.discriminator}" if user.discriminator != "0" else user.name ) try: await guild.unban( user, reason=f"Automatic unban from case #{moderation_id}" ) embed = await message_factory( await self.bot.get_embed_color(guild.channels[0]), guild=guild, reason=f"Automatic unban from case #{moderation_id}", moderation_type="unbanned", ) try: await user.send(embed=embed) except discord.errors.HTTPException: pass logger.debug( "Unbanned %s (%s) from %s (%s)", name, user.id, guild.name, guild.id, ) num = num + 1 except ( discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException, ) as e: logger.error( "Failed to unban %s (%s) from %s (%s)\n%s", name, user.id, guild.name, guild.id, e, ) expiry_query = f"UPDATE `moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp != 0 AND end_timestamp <= ? AND expired = 0 AND moderation_type != 'BLACKLIST') OR (expired = 0 AND resolved = 1 AND moderation_type != 'BLACKLIST')" cursor.execute(expiry_query, (time.time(),)) blacklist_query = f"SELECT target_id, moderation_id, role_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= ? AND moderation_type = 'BLACKLIST' AND expired = 0" try: cursor.execute(blacklist_query, (time.time(),)) result = cursor.fetchall() except sqlite3.OperationalError: continue target_ids = [row[0] for row in result] moderation_ids = [row[1] for row in result] role_ids = [row[2] for row in result] for target_id, moderation_id, role_id in zip( target_ids, moderation_ids, role_ids ): try: # member: discord.Member = await guild.fetch_member(target_id) role: discord.Role = guild.get_role(role_id) if role is None: raise discord.errors.NotFound except ( discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException, ): continue per_guild_completion_time = (time.time() - time_per_guild) * 1000 logger.debug( "Completed expiry loop for %s (%s) in %sms with %s users unbanned", guild.name, guild.id, f"{per_guild_completion_time:.6f}", num, ) global_num = global_num + num database.commit() cursor.close() database.close() completion_time = (time.time() - current_time) * 1000 logger.debug( "Completed expiry loop in %sms with %s users unbanned", f"{completion_time:.6f}", global_num, ) ######################################################################################################################## ### Configuration Commands # ######################################################################################################################## @commands.group(autohelp=True, aliases=["moderation", "mod"]) async def aurora(self, ctx: commands.Context): """Settings and miscellaneous commands for Aurora.""" @aurora.group(autohelp=True, name="settings", aliases=["config", "options", "set"]) async def aurora_settings(self, ctx: commands.Context): """Configure Aurora's settings.""" @aurora_settings.command(name="overrides", aliases=["override", "user"]) async def aurora_settings_overrides(self, ctx: commands.Context): """Manage Aurora's user overriddable settings.""" await ctx.send(embed=await overrides_embed(ctx), view=Overrides(ctx)) @aurora_settings.command(name="guild", aliases=["server"]) @commands.admin_or_permissions(manage_guild=True) @commands.guild_only() async def aurora_settings_guild(self, ctx: commands.Context): """Manage Aurora's guild settings.""" await ctx.send(embed=await guild_embed(ctx), view=Guild(ctx)) @aurora_settings.command(name="addrole", aliases=["removerole"]) @commands.admin_or_permissions(manage_guild=True) @commands.guild_only() async def aurora_settings_addrole(self, ctx: commands.Context): """Manage the addrole whitelist. Roles added to this list are also applied to `/removerole`.""" await ctx.send(embed=await addrole_embed(ctx), view=Addrole(ctx)) @aurora_settings.command(name="immunity") @commands.admin_or_permissions(manage_guild=True) @commands.guild_only() async def aurora_settings_immunity(self, ctx: commands.Context): """Manage the immunity whitelist.""" await ctx.send(embed=await immune_embed(ctx), view=Immune(ctx)) @aurora.group(autohelp=True, name="import") @commands.admin() @commands.guild_only() async def aurora_import(self, ctx: commands.Context): """Import moderation history from other bots.""" @aurora_import.command(name="aurora") @commands.admin() async def aurora_import_aurora(self, ctx: commands.Context): """Import moderation history from another bot using Aurora.""" if ( ctx.message.attachments and ctx.message.attachments[0].content_type == "application/json; charset=utf-8" ): message = await ctx.send( warning( "Are you sure you want to import moderations from another bot?\n**This will overwrite any moderations that already exist in this guild's moderation table.**\n*The import process will block the rest of your bot until it is complete.*" ) ) await message.edit(view=ImportAuroraView(60, ctx, message)) else: await ctx.send(error("Please provide a valid Aurora export file.")) @aurora_import.command(name="galacticbot") @commands.admin() async def aurora_import_galacticbot(self, ctx: commands.Context): """Import moderation history from GalacticBot.""" if ( ctx.message.attachments and ctx.message.attachments[0].content_type == "application/json; charset=utf-8" ): message = await ctx.send( warning( "Are you sure you want to import GalacticBot moderations?\n**This will overwrite any moderations that already exist in this guild's moderation table.**\n*The import process will block the rest of your bot until it is complete.*" ) ) await message.edit(view=ImportGalacticBotView(60, ctx, message)) else: await ctx.send( error("Please provide a valid GalacticBot moderation export file.") ) @aurora.command(aliases=["tdc", "td", "timedeltaconvert"]) async def timedelta(self, ctx: commands.Context, *, duration: str): """This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object. **Example usage** `[p]aurora timedelta 1 day 15hr 82 minutes 52s` **Output** `1 day, 16:22:52`""" try: parsed_time = parse(duration, as_timedelta=True, raise_exception=True) await ctx.send(f"`{str(parsed_time)}`") except ValueError: await ctx.send(error("Please provide a convertible value!"))