# _____ _ # / ____| (_) # | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __ # \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__| # ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ | # |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_| import json import time import os from datetime import datetime, timedelta, timezone import discord import humanize import mysql.connector from discord.ext import tasks from pytimeparse2 import disable_dateutil, parse from redbot.core import app_commands, checks, Config, commands, data_manager from redbot.core.app_commands import Choice from .database import connect, create_guild_table, fetch_case, mysql_log from .embed_factory import embed_factory from .utils import check_conf, check_permissions, check_moddable, fetch_channel_dict, fetch_user_dict, generate_dict, log from .logger import logger class Moderation(commands.Cog): """Custom moderation cog. Developed by SeaswimmerTheFsh.""" async def red_delete_data_for_user(self, *, requester, user_id: int): if requester == "discord_deleted_user": await self.config.user_from_id(user_id).clear() database = await connect() cursor = database.cursor() cursor.execute("SHOW TABLES;") tables = [table[0] for table in cursor.fetchall()] condition = "target_id = %s OR moderator_id = %s;" for table in tables: delete_query = f"DELETE FROM {table[0]} WHERE {condition}" cursor.execute(delete_query, (user_id, user_id)) database.commit() cursor.close() database.close() if requester == "owner": await self.config.user_from_id(user_id).clear() if requester == "user": await self.config.user_from_id(user_id).clear() if requester == "user_strict": await self.config.user_from_id(user_id).clear() else: logger.warning("Invalid requester passed to red_delete_data_for_user: %s", requester) def __init__(self, bot): self.bot = bot self.config = Config.get_conf(self, identifier=481923957134912) self.config.register_global( mysql_address= " ", mysql_database = " ", mysql_username = " ", mysql_password = " " ) self.config.register_guild( use_discord_permissions = True, ignore_other_bots = True, dm_users = True, log_channel = " ", immune_roles = [], history_ephemeral = False, history_inline = False, history_pagesize = 5, history_inline_pagesize = 6, blacklist_roles = [] ) self.config.register_user( history_ephemeral = None, history_inline = None, history_pagesize = None, history_inline_pagesize = None ) disable_dateutil() self.handle_expiry.start() # pylint: disable=no-member async def cog_load(self): """This method prepares the database schema for all of the guilds the bot is currently in.""" conf = await check_conf([ 'mysql_address', 'mysql_database', 'mysql_username', 'mysql_password' ]) if conf: logger.error("Failed to create tables, due to MySQL connection configuration being unset.") return guilds: list[discord.Guild] = self.bot.guilds try: for guild in guilds: if not await self.bot.cog_disabled_in_guild(self, guild): await create_guild_table(guild) except ConnectionRefusedError: return async def cog_unload(self): self.handle_expiry.cancel() # pylint: disable=no-member @commands.Cog.listener('on_guild_join') async def db_generate_guild_join(self, guild: discord.Guild): """This method prepares the database schema whenever the bot joins a guild.""" if not await self.bot.cog_disabled_in_guild(self, guild): conf = await check_conf([ 'mysql_address', 'mysql_database', 'mysql_username', 'mysql_password' ]) if conf: logger.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id) return try: await create_guild_table(guild) except ConnectionRefusedError: return @commands.Cog.listener('on_audit_log_entry_create') async def autologger(self, entry: discord.AuditLogEntry): """This method automatically logs moderations done by users manually ("right clicks").""" if not await self.bot.cog_disabled_in_guild(self, entry.guild): if await self.config.guild(entry.guild).ignore_other_bots() is True: if entry.user.bot or entry.target.bot: return else: if entry.user.id == self.bot.user.id: return duration = "NULL" if entry.reason: reason = entry.reason + " (This action was performed without the bot.)" else: reason = "This action was performed without the bot." if entry.action == discord.AuditLogAction.kick: moderation_type = 'KICK' elif entry.action == discord.AuditLogAction.ban: moderation_type = 'BAN' elif entry.action == discord.AuditLogAction.unban: moderation_type = 'UNBAN' elif entry.action == discord.AuditLogAction.member_update: if entry.after.timed_out_until is not None: timed_out_until_aware = entry.after.timed_out_until.replace(tzinfo=timezone.utc) duration_datetime = timed_out_until_aware - datetime.now(tz=timezone.utc) minutes = round(duration_datetime.total_seconds() / 60) duration = timedelta(minutes=minutes) moderation_type = 'MUTE' else: moderation_type = 'UNMUTE' else: return await mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, 0, duration, reason) ####################################################################################################################### ### COMMANDS ####################################################################################################################### @app_commands.command(name="note") async def note(self, interaction: discord.Interaction, target: discord.User, reason: str, silent: bool = None): """Add a note to a user. Parameters ----------- target: discord.User Who are you noting? reason: str Why are you noting this user? silent: bool Should the user be messaged?""" if not await check_moddable(target, interaction, ['moderate_members']): return await interaction.response.send_message(content=f"{target.mention} has recieved a note!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='note', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', 'USER', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has received a note! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") await log(interaction, moderation_id) @app_commands.command(name="warn") async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None): """Warn a user. Parameters ----------- target: discord.Member Who are you warning? reason: str Why are you warning this user? silent: bool Should the user be messaged?""" if not await check_moddable(target, interaction, ['moderate_members']): return await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='warned', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'WARN', 'USER', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been warned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") await log(interaction, moderation_id) async def blacklist_autocomplete(self, interaction: discord.Interaction, current: str,) -> list[app_commands.Choice[str]]: # pylint: disable=unused-argument """Autocompletes a blacklist role.""" blacklist_roles = await self.config.guild(interaction.guild).blacklist_roles() if blacklist_roles: return [app_commands.Choice(name=role.name, value=role.id) for role in interaction.guild.roles if role.id in blacklist_roles] return [] @app_commands.command(name="blacklist") @app_commands.autocomplete(role=blacklist_autocomplete) async def blacklist(self, interaction: discord.Interaction, target: discord.Member, role: str, reason: str, silent: bool = None): """Add a blacklist role to a user. Parameters ----------- target: discord.Member Who are you blacklisting? role: str What blacklist type are you applying to the target? reason: str Why are you blacklisting this user? silent: bool Should the user be messaged?""" blacklist_roles = await self.config.guild(interaction.guild).blacklist_roles() if not blacklist_roles: await interaction.response.send_message(content="There are no blacklist types set for this server!", ephemeral=True) return matching_role = None for role_dict in blacklist_roles: if role_dict['id'] == role: matching_role = role_dict break if not matching_role: await interaction.response.send_message(content="Please provide a valid blacklist type!", ephemeral=True) return if not await check_moddable(target, interaction, ['moderate_members', 'manage_roles']): return if role in [role.id for role in target.roles]: await interaction.response.send_message(content=f"{target.mention} already has the blacklist role!", ephemeral=True) return if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='blacklisted', response=await interaction.original_response(), duration=matching_role['duration']) await target.send(embed=embed) except discord.errors.HTTPException: pass role_obj = interaction.guild.get_role(role) await target.add_roles(role, reason=f"Blacklisted by {interaction.user.id} for {humanize.precisedelta(matching_role['duration'])} for: {reason}") await interaction.response.send_message(content=f"{target.mention} has been blacklisted with the {role_obj.name} role for {humanize.precisedelta(matching_role['duration'])}!\n**Reason** - `{reason}`") moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'BLACKLIST', 'USER', target.id, role, matching_role['duration'], reason) await interaction.edit_original_response(content=f"{target.mention} has been blacklisted with the {role_obj.name} role for {humanize.precisedelta(matching_role['duration'])}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") await log(interaction, moderation_id) @app_commands.command(name="mute") async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str, silent: bool = None): """Mute a user. Parameters ----------- target: discord.Member Who are you unbanning? duration: str How long are you muting this user for? reason: str Why are you unbanning this user? silent: bool Should the user be messaged?""" if not await check_moddable(target, interaction, ['moderate_members']): return if target.is_timed_out() is True: await interaction.response.send_message(f"{target.mention} is already muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True) return try: parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True) except ValueError: await interaction.response.send_message("Please provide a valid duration!", ephemeral=True) return if parsed_time.total_seconds() / 1000 > 2419200000: await interaction.response.send_message("Please provide a duration that is less than 28 days.") return await target.timeout(parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}") await interaction.response.send_message(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='muted', response=await interaction.original_response(), duration=parsed_time) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', 'USER', target.id, 0, parsed_time, reason) await interaction.edit_original_response(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") await log(interaction, moderation_id) @app_commands.command(name="unmute") async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None, silent: bool = None): """Unmute a user. Parameters ----------- target: discord.user Who are you unmuting? reason: str Why are you unmuting this user? silent: bool Should the user be messaged?""" if not await check_moddable(target, interaction, ['moderate_members']): return if target.is_timed_out() is False: await interaction.response.send_message(f"{target.mention} is not muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True) return if reason: await target.timeout(None, reason=f"Unmuted by {interaction.user.id} for: {reason}") else: await target.timeout(None, reason=f"Unbanned by {interaction.user.id}") reason = "No reason given." await interaction.response.send_message(content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='unmuted', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', 'USER', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been unmuted! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") await log(interaction, moderation_id) @app_commands.command(name="kick") async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None): """Kick a user. Parameters ----------- target: discord.user Who are you kicking? reason: str Why are you kicking this user? silent: bool Should the user be messaged?""" if not await check_moddable(target, interaction, ['kick_members']): return await interaction.response.send_message(content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='kicked', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass await target.kick(reason=f"Kicked by {interaction.user.id} for: {reason}") moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'KICK', 'USER', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been kicked! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") await log(interaction, moderation_id) @app_commands.command(name="ban") @app_commands.choices(delete_messages=[ Choice(name="None", value=0), Choice(name='1 Hour', value=3600), Choice(name='12 Hours', value=43200), Choice(name='1 Day', value=86400), Choice(name='3 Days', value=259200), Choice(name='7 Days', value=604800), ]) async def ban(self, interaction: discord.Interaction, target: discord.User, reason: str, duration: str = None, delete_messages: Choice[int] = 0, silent: bool = None): """Ban a user. Parameters ----------- target: discord.user Who are you banning? duration: str How long are you banning this user for? reason: str Why are you banning this user? delete_messages: Choices[int] How many days of messages to delete? silent: bool Should the user be messaged?""" if not await check_moddable(target, interaction, ['ban_members']): return try: await interaction.guild.fetch_ban(target) await interaction.response.send_message(content=f"{target.mention} is already banned!", ephemeral=True) return except discord.errors.NotFound: pass if duration: try: parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True) except ValueError: await interaction.response.send_message("Please provide a valid duration!", ephemeral=True) return await interaction.response.send_message(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`") try: embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='tempbanned', response=await interaction.original_response(), duration=parsed_time) await target.send(embed=embed) except discord.errors.HTTPException: pass await interaction.guild.ban(target, reason=f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})", delete_message_seconds=delete_messages) moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'TEMPBAN', target.id, 0, parsed_time, reason) await interaction.edit_original_response(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await log(interaction, moderation_id) else: await interaction.response.send_message(content=f"{target.mention} has been banned!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='banned', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass await interaction.guild.ban(target, reason=f"Banned by {interaction.user.id} for: {reason}", delete_message_seconds=delete_messages) moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'BAN', 'USER', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been banned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") await log(interaction, moderation_id) @app_commands.command(name="unban") async def unban(self, interaction: discord.Interaction, target: discord.User, reason: str = None, silent: bool = None): """Unban a user. Parameters ----------- target: discord.user Who are you unbanning? reason: str Why are you unbanning this user? silent: bool Should the user be messaged?""" if not await check_moddable(target, interaction, ['ban_members']): return try: await interaction.guild.fetch_ban(target) except discord.errors.NotFound: await interaction.response.send_message(content=f"{target.mention} is not banned!", ephemeral=True) return if reason: await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id} for: {reason}") else: await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id}") reason = "No reason given." await interaction.response.send_message(content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='unbanned', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'UNBAN', 'USER', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been unbanned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") await log(interaction, moderation_id) @app_commands.command(name="history") async def history(self, interaction: discord.Interaction, target: discord.User = None, moderator: discord.User = None, pagesize: app_commands.Range[int, 1, 20] = None, page: int = 1, ephemeral: bool = None, inline: bool = None, export: bool = False): """List previous infractions. Parameters ----------- target: discord.User User whose infractions to query, overrides moderator if both are given moderator: discord.User Query by moderator pagesize: app_commands.Range[int, 1, 25] Amount of infractions to list per page page: int Page to select ephemeral: bool Hide the command response inline: bool Display infractions in a grid arrangement (does not look very good) export: bool Exports the server's entire moderation history to a JSON file""" if ephemeral is None: ephemeral = (await self.config.user(interaction.user).history_ephemeral() or await self.config.guild(interaction.guild).history_ephemeral() or False) if inline is None: inline = (await self.config.user(interaction.user).history_inline() or await self.config.guild(interaction.guild).history_inline() or False) if pagesize is None: if inline is True: pagesize = (await self.config.user(interaction.user).history_inline_pagesize() or await self.config.guild(interaction.guild).history_inline_pagesize() or 6) else: pagesize = (await self.config.user(interaction.user).history_pagesize() or await self.config.guild(interaction.guild).history_pagesize() or 5) await interaction.response.defer(ephemeral=ephemeral) permissions = check_permissions(interaction.client.user, ['embed_links'], interaction) if permissions: await interaction.followup.send(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return database = await connect() cursor = database.cursor() if target: query = """SELECT * FROM moderation_%s WHERE target_id = %s ORDER BY moderation_id DESC;""" cursor.execute(query, (interaction.guild.id, target.id)) elif moderator: query = """SELECT * FROM moderation_%s WHERE moderator_id = %s ORDER BY moderation_id DESC;""" cursor.execute(query, (interaction.guild.id, moderator.id)) else: query = """SELECT * FROM moderation_%s ORDER BY moderation_id DESC;""" cursor.execute(query, (interaction.guild.id,)) results = cursor.fetchall() result_dict_list = [] for result in results: case_dict = generate_dict(result) if case_dict['moderation_id'] == 0: continue result_dict_list.append(case_dict) if export: try: filename = str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"moderation_{interaction.guild.id}.json" with open(filename, "w", encoding="utf-8") as f: json.dump(result_dict_list, f, indent=2) await interaction.followup.send(file=discord.File(filename, f"moderation_{interaction.guild.id}.json"), ephemeral=ephemeral) os.remove(filename) return except json.JSONDecodeError as e: await interaction.followup.send(content=f"An error occured while exporting the moderation history.\nError:\n```{e}```", ephemeral=ephemeral) return case_quantity = len(result_dict_list) page_quantity = round(case_quantity / pagesize) start_index = (page - 1) * pagesize end_index = page * pagesize embed = discord.Embed(color=await self.bot.get_embed_color(None)) embed.set_author(icon_url=interaction.guild.icon.url, name='Infraction History') embed.set_footer(text=f"Page {page:,}/{page_quantity:,} | {case_quantity:,} Results") memory_dict = {} for case in result_dict_list[start_index:end_index]: if case['target_id'] not in memory_dict: if case['target_type'] == 'USER': memory_dict[str(case['target_id'])] = await fetch_user_dict(interaction, case['target_id']) elif case['target_type'] == 'CHANNEL': memory_dict[str(case['target_id'])] = await fetch_channel_dict(interaction, case['target_id']) target_user = memory_dict[str(case['target_id'])] if case['target_type'] == 'USER': target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" elif case['target_type'] == 'CHANNEL': target_name = f"`{target_user['mention']}`" if case['moderator_id'] not in memory_dict: memory_dict[str(case['moderator_id'])] = await fetch_user_dict(interaction, case['moderator_id']) moderator_user = memory_dict[str(case['moderator_id'])] moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" field_name = f"Case #{case['moderation_id']:,} ({str.title(case['moderation_type'])})" field_value = f"**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})" if len(case['reason']) > 125: field_value += f"\n**Reason:** `{str(case['reason'])[:125]}...`" else: field_value += f"\n**Reason:** `{str(case['reason'])}`" if case['duration'] != 'NULL': td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case["duration"].split(":"))}) duration_embed = f"{humanize.precisedelta(td)} | " if bool(case['expired']) is False else f"{humanize.precisedelta(td)} | Expired" field_value += f"\n**Duration:** {duration_embed}" field_value += f"\n**Timestamp:** | " if bool(case['resolved']): field_value += "\n**Resolved:** True" embed.add_field(name=field_name, value=field_value, inline=inline) await interaction.followup.send(embed=embed, ephemeral=ephemeral) @app_commands.command(name="resolve") async def resolve(self, interaction: discord.Interaction, case: int, reason: str = None): """Resolve a specific case. Parameters ----------- case: int Case number of the case you're trying to resolve reason: str Reason for resolving case""" permissions = check_permissions(interaction.client.user, ['embed_links', 'moderate_members', 'ban_members'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return conf = await check_conf(['mysql_database']) if conf: raise(LookupError) database = await connect() cursor = database.cursor() db = await self.config.mysql_database() query_1 = "SELECT * FROM moderation_%s WHERE moderation_id = %s;" cursor.execute(query_1, (interaction.guild.id, case)) result_1 = cursor.fetchone() if result_1 is None or case == 0: await interaction.response.send_message(content=f"There is no moderation with a case number of {case}.", ephemeral=True) return query_2 = "SELECT * FROM moderation_%s WHERE moderation_id = %s AND resolved = 0;" cursor.execute(query_2, (interaction.guild.id, case)) result_2 = cursor.fetchone() if result_2 is None: await interaction.response.send_message(content=f"This moderation has already been resolved!\nUse `/case {case}` for more information.", ephemeral=True) return case_dict = generate_dict(result_2) if reason is None: reason = "No reason given." changes: list = case_dict['changes'] if len(changes) > 25: await interaction.response.send_message(content="Due to limitations with Discord's embed system, you cannot edit a case more than 25 times.", ephemeral=True) return if not changes: changes.append( { 'type': "ORIGINAL", 'timestamp': case_dict['timestamp'], 'reason': case_dict['reason'], 'user_id': case_dict['moderator_id'] } ) changes.append( { 'type': "RESOLVE", 'timestamp': int(time.time()), 'reason': reason, 'user_id': interaction.user.id } ) if case_dict['moderation_type'] in ['UNMUTE', 'UNBAN']: await interaction.response.send_message(content="You cannot resolve this type of moderation!", ephemeral=True) if case_dict['moderation_type'] in ['MUTE', 'TEMPBAN', 'BAN']: if case_dict['moderation_type'] == 'MUTE': try: member = await interaction.guild.fetch_member(case_dict['target_id']) await member.timeout(None, reason=f"Case #{case:,} resolved by {interaction.user.id}") except discord.NotFound: pass if case_dict['moderation_type'] in ['TEMPBAN', 'BAN']: try: user = await interaction.client.fetch_user(case_dict['target_id']) await interaction.guild.unban(user, reason=f"Case #{case} resolved by {interaction.user.id}") except discord.NotFound: pass resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, changes = %s, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s" else: resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, changes = %s, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s" cursor.execute(resolve_query, (json.dumps(changes), interaction.user.id, reason, case_dict['moderation_id'])) database.commit() embed = await embed_factory('case', await self.bot.get_embed_color(None), interaction=interaction, case_dict=await fetch_case(case, interaction.guild.id)) await interaction.response.send_message(content=f"✅ Moderation #{case:,} resolved!", embed=embed) await log(interaction, case) cursor.close() database.close() @app_commands.command(name="case") @app_commands.choices(export=[ Choice(name='Export as File', value='file'), Choice(name='Export as Codeblock', value='codeblock') ]) async def case(self, interaction: discord.Interaction, case: int, ephemeral: bool = None, changes: bool = False, export: Choice[str] = None): """Check the details of a specific case. Parameters ----------- case: int What case are you looking up? ephemeral: bool Hide the command response changes: bool List the changes made to the case export: bool Export the case to a JSON file or codeblock""" permissions = check_permissions(interaction.client.user, ['embed_links'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return if ephemeral is None: ephemeral = (await self.config.user(interaction.user).history_ephemeral() or await self.config.guild(interaction.guild).history_ephemeral() or False) if case != 0: case_dict = await fetch_case(case, interaction.guild.id) if case_dict: if export: if export.value == 'file' or len(str(case_dict)) > 1800: filename = str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"moderation_{interaction.guild.id}_case_{case}.json" with open(filename, "w", encoding="utf-8") as f: json.dump(case_dict, f, indent=2) if export.value == 'codeblock': content = f"Case #{case:,} exported.\n*Case was too large to export as codeblock, so it has been uploaded as a `.json` file.*" else: content = f"Case #{case:,} exported." await interaction.response.send_message(content=content, file=discord.File(filename, f"moderation_{interaction.guild.id}_case_{case}.json"), ephemeral=ephemeral) os.remove(filename) return await interaction.response.send_message(content=f"```json\n{json.dumps(case_dict, indent=2)}```", ephemeral=ephemeral) return if changes: embed = await embed_factory('changes', await self.bot.get_embed_color(None), interaction=interaction, case_dict=case_dict) else: embed = await embed_factory('case', await self.bot.get_embed_color(None), interaction=interaction, case_dict=case_dict) await interaction.response.send_message(embed=embed, ephemeral=ephemeral) return await interaction.response.send_message(content=f"No case with case number `{case}` found.", ephemeral=True) @app_commands.command(name="edit") async def edit(self, interaction: discord.Interaction, case: int, reason: str, duration: str = None): """Edit the reason of a specific case. Parameters ----------- case: int What case are you editing? reason: str What is the new reason? duration: str What is the new duration? Does not reapply the moderation if it has already expired.""" permissions = check_permissions(interaction.client.user, ['embed_links'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return if case != 0: parsed_time = None case_dict = await fetch_case(case, interaction.guild.id) if case_dict: conf = await check_conf(['mysql_database']) if conf: raise(LookupError) if duration: try: parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True) except ValueError: await interaction.response.send_message("Please provide a valid duration!", ephemeral=True) return end_timestamp = case_dict['timestamp'] + parsed_time.total_seconds() if case_dict['type'] == 'MUTE': if (time.time() - case_dict['timestamp']) + parsed_time.total_seconds() > 2419200: await interaction.response.send_message("Please provide a duration that is less than 28 days from the initial moderation.") return try: member = await interaction.guild.fetch_member(case_dict['target_id']) await member.timeout(parsed_time, reason=f"Case #{case:,} edited by {interaction.user.id}") except discord.NotFound: pass changes: list = case_dict['changes'] if len(changes) > 25: await interaction.response.send_message(content="Due to limitations with Discord's embed system, you cannot edit a case more than 25 times.", ephemeral=True) return if not changes: changes.append( { 'type': "ORIGINAL", 'timestamp': case_dict['timestamp'], 'reason': case_dict['reason'], 'user_id': case_dict['moderator_id'], 'duration': case_dict['duration'], 'end_timestamp': case_dict['end_timestamp'] } ) if parsed_time: changes.append( { 'type': "EDIT", 'timestamp': int(time.time()), 'reason': reason, 'user_id': interaction.user.id, 'duration': parsed_time, 'end_timestamp': end_timestamp } ) else: changes.append( { 'type': "EDIT", 'timestamp': int(time.time()), 'reason': reason, 'user_id': interaction.user.id, 'duration': case_dict['duration'], 'end_timestamp': case_dict['end_timestamp'] } ) database = await connect() cursor = database.cursor() db = await self.config.mysql_database() if parsed_time: update_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET changes = %s, reason = %s, duration = %s, end_timestamp = %s WHERE moderation_id = %s" cursor.execute(update_query, (json.dumps(changes), reason, parsed_time, end_timestamp, case)) else: update_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET changes = %s, reason = %s WHERE moderation_id = %s" cursor.execute(update_query, (json.dumps(changes), reason, case)) database.commit() new_case = await fetch_case(case, interaction.guild.id) embed = await embed_factory('case', await self.bot.get_embed_color(None), interaction=interaction, case_dict=new_case) await interaction.response.send_message(content=f"✅ Moderation #{case:,} edited!", embed=embed, ephemeral=True) await log(interaction, case) cursor.close() database.close() return await interaction.response.send_message(content=f"No case with case number `{case}` found.", ephemeral=True) @tasks.loop(minutes=1) async def handle_expiry(self): conf = await check_conf(['mysql_database']) if conf: raise(LookupError) database = await connect() cursor = database.cursor() db = await self.config.mysql_database() guilds: list[discord.Guild] = self.bot.guilds for guild in guilds: if not await self.bot.cog_disabled_in_guild(self, guild): tempban_query = f"SELECT target_id, moderation_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= %s AND moderation_type = 'TEMPBAN' AND expired = 0" try: cursor.execute(tempban_query, (time.time(),)) result = cursor.fetchall() except mysql.connector.errors.ProgrammingError: continue target_ids = [row[0] for row in result] moderation_ids = [row[1] for row in result] for target_id, moderation_id in zip(target_ids, moderation_ids): user: discord.User = await self.bot.fetch_user(target_id) try: await guild.unban(user, reason=f"Automatic unban from case #{moderation_id}") embed = await embed_factory('message', self.bot.get_embed_color(None), guild, f'Automatic unban from case #{moderation_id}', 'unbanned') try: await user.send(embed=embed) except discord.errors.HTTPException: pass except [discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException] as e: print(f"Failed to unban {user.name}#{user.discriminator} ({user.id}) from {guild.name} ({guild.id})\n{e}") expiry_query = f"UPDATE `{db}`.`moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp != 0 AND end_timestamp <= %s AND expired = 0 AND moderation_type != 'BLACKLIST') OR (expired = 0 AND resolved = 1 AND moderation_type != 'BLACKLIST')" cursor.execute(expiry_query, (time.time(),)) blacklist_query = f"SELECT target_id, moderation_id, role_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= %s AND moderation_type = 'BLACKLIST' AND expired = 0" try: cursor.execute(blacklist_query, (time.time(),)) result = cursor.fetchall() except mysql.connector.errors.ProgrammingError: continue target_ids = [row[0] for row in result] moderation_ids = [row[1] for row in result] role_ids = [row[2] for row in result] for target_id, moderation_id, role_id in zip(target_ids, moderation_ids, role_ids): try: member: discord.Member = await guild.fetch_member(target_id) role: discord.Role = guild.get_role(role_id) if role is None: raise discord.errors.NotFound except [discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException]: continue database.commit() cursor.close() database.close() ####################################################################################################################### ### CONFIGURATION COMMANDS ####################################################################################################################### @commands.group(autohelp=True, aliases=['modset', 'moderationsettings']) async def moderationset(self, ctx: commands.Context): """Manage moderation commands.""" @moderationset.command(name='list', aliases=['view', 'show']) async def moderationset_list(self, ctx: commands.Context): """List all moderation settings.""" if ctx.guild: guild_settings = await self.config.guild(ctx.guild).all() guild_settings_string = "" for setting in guild_settings: if 'mysql' in setting or 'roles' in setting: continue if setting == 'log_channel': channel = ctx.guild.get_channel(guild_settings[setting]) guild_settings_string += f"**{setting}**: {channel.mention}\n" else: guild_settings_string += f"**{setting}**: {guild_settings[setting]}\n" user_settings = await self.config.user(ctx.author).all() user_settings_string = "" for setting in user_settings: user_settings_string += f"**{setting}**: {user_settings[setting]}\n" embed = discord.Embed(color=await self.bot.get_embed_color(None)) embed.set_author(icon_url=ctx.guild.icon.url, name=f"{ctx.guild.name} Moderation Settings") if ctx.guild: embed.add_field(name="Guild Settings", value=guild_settings_string) embed.add_field(name="User Settings", value=user_settings_string) await ctx.send(embed=embed) @moderationset.group(autohelp=True, name='history') async def moderationset_history(self, ctx: commands.Context): """Manage configuration for the /history command.""" @moderationset_history.command(name='ephemeral', aliases=['hidden', 'hide']) async def moderationset_history_user_ephemeral(self, ctx: commands.Context, enabled: bool): """Toggle if the /history command should be ephemeral.""" await self.config.user(ctx.author).history_ephemeral.set(enabled) await ctx.send(f"Ephemeral setting set to {enabled}") @moderationset_history.command(name='pagesize') async def moderationset_history_user_pagesize(self, ctx: commands.Context, pagesize: int): """Set the amount of cases to display per page.""" if pagesize > 20: await ctx.send("Pagesize cannot be greater than 20!") return if pagesize < 1: await ctx.send("Pagesize cannot be less than 1!") return await self.config.user(ctx.author).history_pagesize.set(pagesize) await ctx.send(f"Pagesize set to {await self.config.user(ctx.author).history_pagesize()}") @moderationset_history.group(name='inline') async def moderationset_history_inline(self, ctx: commands.Context): """Manage configuration for the /history command's inline argument.""" @moderationset_history_inline.command(name='toggle') async def moderationset_history_user_inline_toggle(self, ctx: commands.Context, enabled: bool): """Enable the /history command's inline argument by default.""" await self.config.user(ctx.author).history_inline.set(enabled) await ctx.send(f"Inline setting set to {enabled}") @moderationset_history_inline.command(name='pagesize') async def moderationset_history_user_inline_pagesize(self, ctx: commands.Context, pagesize: int): """Set the amount of cases to display per page.""" if pagesize > 20: await ctx.send("Pagesize cannot be greater than 20!") return if pagesize < 1: await ctx.send("Pagesize cannot be less than 1!") return await self.config.user(ctx.author).history_inline_pagesize.set(pagesize) await ctx.send(f"Inline pagesize set to {await self.config.user(ctx.author).history_inline_pagesize()}") @moderationset_history.group(autohelp=True, name='guild') @checks.admin() async def moderationset_history_guild(self, ctx: commands.Context): """Manage configuration for the /history command, per guild.""" @moderationset_history_guild.command(name='ephemeral', aliases=['hidden', 'hide']) @checks.admin() async def moderationset_history_guild_ephemeral(self, ctx: commands.Context, enabled: bool): """Toggle if the /history command should be ephemeral.""" await self.config.guild(ctx.guild).history_ephemeral.set(enabled) await ctx.send(f"Ephemeral setting set to {enabled}") @moderationset_history_guild.command(name='pagesize') @checks.admin() async def moderationset_history_guild_pagesize(self, ctx: commands.Context, pagesize: int): """Set the amount of cases to display per page.""" if pagesize > 20: await ctx.send("Pagesize cannot be greater than 20!") return if pagesize < 1: await ctx.send("Pagesize cannot be less than 1!") return await self.config.guild(ctx.guild).history_pagesize.set(pagesize) await ctx.send(f"Pagesize set to {await self.config.guild(ctx.guild).history_pagesize()}") @moderationset_history_guild.group(name='inline') @checks.admin() async def moderationset_history_guild_inline(self, ctx: commands.Context): """Manage configuration for the /history command's inline argument.""" @moderationset_history_guild_inline.command(name='toggle') @checks.admin() async def moderationset_history_guild_inline_toggle(self, ctx: commands.Context, enabled: bool): """Enable the /history command's inline argument by default.""" await self.config.guild(ctx.guild).history_inline.set(enabled) await ctx.send(f"Inline setting set to {enabled}") @moderationset_history_guild_inline.command(name='pagesize') @checks.admin() async def moderationset_history_guild_inline_pagesize(self, ctx: commands.Context, pagesize: int): """Set the amount of cases to display per page.""" if pagesize > 20: await ctx.send("Pagesize cannot be greater than 20!") return if pagesize < 1: await ctx.send("Pagesize cannot be less than 1!") return await self.config.guild(ctx.guild).history_inline_pagesize.set(pagesize) await ctx.send(f"Inline pagesize set to {await self.config.guild(ctx.guild).history_inline_pagesize()}") @moderationset.group(autohelp=True, name='immunity') @checks.admin() async def moderationset_immunity(self, ctx: commands.Context): """Manage configuration for immune roles.""" @moderationset_immunity.command(name='add') @checks.admin() async def moderationset_immunity_add(self, ctx: commands.Context, role: discord.Role): """Add a role to the immune roles list.""" immune_roles: list = await self.config.guild(ctx.guild).immune_roles() if role.id in immune_roles: await ctx.send("Role is already immune!") return immune_roles.append(role.id) await self.config.guild(ctx.guild).immune_roles.set(immune_roles) await ctx.send(f"Role {role.name} added to immune roles.") @moderationset_immunity.command(name='remove') @checks.admin() async def moderationset_immunity_remove(self, ctx: commands.Context, role: discord.Role): """Remove a role from the immune roles list.""" immune_roles: list = await self.config.guild(ctx.guild).immune_roles() if role.id not in immune_roles: await ctx.send("Role is not immune!") return immune_roles.remove(role.id) await self.config.guild(ctx.guild).immune_roles.set(immune_roles) await ctx.send(f"Role {role.name} removed from immune roles.") @moderationset_immunity.command(name='list') @checks.admin() async def moderationset_immunity_list(self, ctx: commands.Context): """List all immune roles.""" immune_roles: list = await self.config.guild(ctx.guild).immune_roles() if not immune_roles: await ctx.send("No immune roles set!") return role_list = "" for role_id in immune_roles: role = ctx.guild.get_role(role_id) if role: role_list += f"{role.mention}\n" if role_list: embed = discord.Embed(title="Immune Roles", description=role_list, color=await self.bot.get_embed_color(None)) await ctx.send(embed=embed) @moderationset.group(autohelp=True, name='blacklist') @checks.admin() async def moderationset_blacklist(self, ctx: commands.Context): """Manage configuration for the /blacklist command.""" @moderationset_blacklist.command(name='add') @checks.admin() async def moderationset_blacklist_add(self, ctx: commands.Context, role: discord.Role, duration: str): """Add a role to the blacklist.""" blacklist_roles: list = await self.config.guild(ctx.guild).blacklist_roles() for blacklist_role in blacklist_roles: if role.id == blacklist_role['role']: await ctx.send("Role already has an associated blacklist type!") return try: parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True) except ValueError: await ctx.send("Please provide a valid duration!") return blacklist_roles.append( { 'role': role.id, 'duration': str(parsed_time) } ) await self.config.guild(ctx.guild).blacklist_roles.set(blacklist_roles) await ctx.send(f"Role {role.mention} added as a blacklist type.", allowed_mentions=discord.AllowedMentions.none()) @moderationset_blacklist.command(name='remove') @checks.admin() async def moderationset_blacklist_remove(self, ctx: commands.Context, role: discord.Role): """Remove a role's blacklist type.""" blacklist_roles: list = await self.config.guild(ctx.guild).blacklist_roles() for blacklist_role in blacklist_roles: if role.id == blacklist_role['role']: blacklist_roles.remove(blacklist_role) await self.config.guild(ctx.guild).blacklist_roles.set(blacklist_roles) await ctx.send(f"Role {role.mention} removed from blacklist types.", allowed_mentions=discord.AllowedMentions.none()) return await ctx.send("Role does not have an associated blacklist type!") @moderationset_blacklist.command(name='list') @checks.admin() async def moderationset_blacklist_list(self, ctx: commands.Context): """List all blacklist types.""" blacklist_roles: list = await self.config.guild(ctx.guild).blacklist_roles() if not blacklist_roles: await ctx.send("No blacklist types set!") return blacklist_list = "" for blacklist_role in blacklist_roles: role = ctx.guild.get_role(blacklist_role['role']) if role: blacklist_list += f"{role.mention} - {blacklist_role['duration']}\n" if blacklist_list: embed = discord.Embed(title="Blacklist Types", description=blacklist_list, color=await self.bot.get_embed_color(None)) await ctx.send(embed=embed) @moderationset.command(name="ignorebots") @checks.admin() async def moderationset_ignorebots(self, ctx: commands.Context): """Toggle if the cog should ignore other bots' moderations.""" await self.config.guild(ctx.guild).ignore_other_bots.set(not await self.config.guild(ctx.guild).ignore_other_bots()) await ctx.send(f"Ignore bots setting set to {await self.config.guild(ctx.guild).ignore_other_bots()}") @moderationset.command(name="dm") @checks.admin() async def moderationset_dm(self, ctx: commands.Context): """Toggle automatically messaging moderated users. This option can be overridden by specifying the `silent` argument in any moderation command.""" await self.config.guild(ctx.guild).dm_users.set(not await self.config.guild(ctx.guild).dm_users()) await ctx.send(f"DM users setting set to {await self.config.guild(ctx.guild).dm_users()}") @moderationset.command(name="permissions") @checks.admin() async def moderationset_permissions(self, ctx: commands.Context): """Toggle whether the bot will check for discord permissions.""" await self.config.guild(ctx.guild).use_discord_permissions.set(not await self.config.guild(ctx.guild).use_discord_permissions()) await ctx.send(f"Use Discord Permissions setting set to {await self.config.guild(ctx.guild).use_discord_permissions()}") @moderationset.command(name="logchannel") @checks.admin() async def moderationset_logchannel(self, ctx: commands.Context, channel: discord.TextChannel = None): """Set a channel to log infractions to.""" if channel: await self.config.guild(ctx.guild).log_channel.set(channel.id) await ctx.send(f"Logging channel set to {channel.mention}.") else: await self.config.guild(ctx.guild).log_channel.set(" ") await ctx.send("Logging channel disabled.") @moderationset.command(name="mysql") @checks.is_owner() async def moderationset_mysql(self, ctx: commands.Context): """Configure MySQL connection details.""" await ctx.message.add_reaction("✅") await ctx.author.send(content="Click the button below to configure your MySQL connection details.", view=self.ConfigButtons(60)) class ConfigButtons(discord.ui.View): def __init__(self, timeout): super().__init__() self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912) @discord.ui.button(label="Edit", style=discord.ButtonStyle.success) async def config_button(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config)) class MySQLConfigModal(discord.ui.Modal, title="MySQL Database Configuration"): def __init__(self, config): super().__init__() self.config = config address = discord.ui.TextInput( label="Address", placeholder="Input your MySQL address here.", style=discord.TextStyle.short, required=False, max_length=300 ) database = discord.ui.TextInput( label="Database", placeholder="Input the name of your database here.", style=discord.TextStyle.short, required=False, max_length=300 ) username = discord.ui.TextInput( label="Username", placeholder="Input your MySQL username here.", style=discord.TextStyle.short, required=False, max_length=300 ) password = discord.ui.TextInput( label="Password", placeholder="Input your MySQL password here.", style=discord.TextStyle.short, required=False, max_length=300 ) async def on_submit(self, interaction: discord.Interaction): message = "" if self.address.value != "": await self.config.mysql_address.set(self.address.value) message += f"- Address set to\n - `{self.address.value}`\n" if self.database.value != "": await self.config.mysql_database.set(self.database.value) message += f"- Database set to\n - `{self.database.value}`\n" if self.username.value != "": await self.config.mysql_username.set(self.username.value) message += f"- Username set to\n - `{self.username.value}`\n" if self.password.value != "": await self.config.mysql_password.set(self.password.value) trimmed_password = self.password.value[:8] message += f"- Password set to\n - `{trimmed_password}` - Trimmed for security\n" if message == "": trimmed_password = str(await self.config.mysql_password())[:8] send = f"No changes were made.\nCurrent configuration:\n- Address:\n - `{await self.config.mysql_address()}`\n- Database:\n - `{await self.config.mysql_database()}`\n- Username:\n - `{await self.config.mysql_username()}`\n- Password:\n - `{trimmed_password}` - Trimmed for security" else: send = f"Configuration changed:\n{message}" await interaction.response.send_message(send, ephemeral=True) @moderationset.group(autohelp=True, name='import') @checks.admin() async def moderationset_import(self, ctx: commands.Context): """Import moderations from other bots.""" @moderationset_import.command(name="galacticbot") @checks.admin() async def moderationset_import_galacticbot(self, ctx: commands.Context): """Import moderations from GalacticBot.""" if ctx.message.attachments and ctx.message.attachments[0].content_type == 'application/json; charset=utf-8': message = await ctx.send("Are you sure you want to import GalacticBot moderations?\n**This will overwrite any moderations that already exist in this guild's moderation table.**\n*The import process will block the rest of your bot until it is complete.*") await message.edit(view=self.GalacticBotImportButtons(60, ctx, message)) else: await ctx.send("Please provide a valid GalacticBot moderation export file.") class GalacticBotImportButtons(discord.ui.View): def __init__(self, timeout, ctx, message): super().__init__() self.ctx: commands.Context = ctx self.message: discord.Message = message self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912) @discord.ui.button(label="Yes", style=discord.ButtonStyle.success) async def import_button_y(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument await self.message.delete() await interaction.response.send_message("Deleting original table...", ephemeral=True) database = await connect() cursor = database.cursor() query = f"DROP TABLE IF EXISTS moderation_{self.ctx.guild.id};" cursor.execute(query) cursor.close() database.commit() await interaction.edit_original_response(content="Creating new table...") await create_guild_table(self.ctx.guild) await interaction.edit_original_response(content="Importing moderations...") accepted_types = [ 'NOTE', 'WARN', 'MUTE', 'UNMUTE', 'KICK', 'SOFTBAN', 'BAN', 'UNBAN', 'SLOWMODE', 'LOCKDOWN' ] file = await self.ctx.message.attachments[0].read() data = sorted(json.loads(file), key=lambda x: x['case']) failed_cases = [] for case in data: if case['type'] not in accepted_types: continue timestamp = round(case['timestamp'] / 1000) try: if case['duration'] is not None and float(case['duration']) != 0: duration = timedelta(seconds=round(float(case['duration']) / 1000)) else: duration = 'NULL' except OverflowError: failed_cases.append(case['case']) continue metadata = { 'imported_from': 'GalacticBot' } if case['type'] == 'SLOWMODE': metadata['seconds'] = case['data']['seconds'] if case['resolved']: resolved = 1 resolved_by = None resolved_reason = None resolved_timestamp = None if case['changes']: for change in case['changes']: if change['type'] == 'RESOLVE': resolved_by = change['staff'] resolved_reason = change['reason'] resolved_timestamp = round(change['timestamp'] / 1000) break if resolved_by is None: resolved_by = '?' if resolved_reason is None: resolved_reason = 'Could not get resolve reason during moderation import.' if resolved_timestamp is None: resolved_timestamp = timestamp changes = [ { 'type': "ORIGINAL", 'reason': case['reason'], 'user_id': case['executor'], 'timestamp': timestamp }, { 'type': "RESOLVE", 'reason': resolved_reason, 'user_id': resolved_by, 'timestamp': resolved_timestamp } ] else: resolved = 0 resolved_by = 'NULL' resolved_reason = 'NULL' changes = [] if case['reason']: reason = case['reason'] else: reason = "NULL" await mysql_log( self.ctx.guild.id, case['executor'], case['type'], case['targetType'], case['target'], 0, duration, reason, timestamp=timestamp, resolved=resolved, resolved_by=resolved_by, resolved_reason=resolved_reason, changes=changes, metadata=metadata, database=database ) await interaction.edit_original_response(content="Import complete.") if failed_cases: await interaction.edit_original_response(content=f"Import complete.\n*Failed to import the following cases:*\n```{failed_cases}```") @discord.ui.button(label="No", style=discord.ButtonStyle.danger) async def import_button_n(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument await self.message.edit("Import cancelled.", view=None) await self.message.delete(10) await self.ctx.message.delete(10) @commands.command(aliases=["tdc"]) async def timedeltaconvert(self, ctx: commands.Context, *, duration: str): """This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object. **Example usage** `[p]timedeltaconvert 1 day 15hr 82 minutes 52s` **Output** `1 day, 16:22:52`""" try: parsed_time = parse(duration, as_timedelta=True, raise_exception=True) await ctx.send(f"`{str(parsed_time)}`") except ValueError: await ctx.send("Please provide a convertible value!")