# _____ _ # / ____| (_) # | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __ # \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__| # ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ | # |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_| import logging import json import time import os from datetime import datetime, timedelta, timezone from typing import Union import discord import humanize import mysql.connector from discord.ext import tasks from pytimeparse2 import disable_dateutil, parse from redbot.core import app_commands, checks, Config, commands, data_manager from redbot.core.app_commands import Choice class Moderation(commands.Cog): """Custom moderation cog. Developed by SeaswimmerTheFsh.""" async def red_delete_data_for_user(self, *, requester, user_id: int): if requester == "discord_deleted_user": await self.config.user_from_id(user_id).clear() database = await self.connect() cursor = database.cursor() cursor.execute("SHOW TABLES;") tables = [table[0] for table in cursor.fetchall()] condition = "target_id = %s OR moderator_id = %s;" for table in tables: delete_query = f"DELETE FROM {table[0]} WHERE {condition}" cursor.execute(delete_query, (user_id, user_id)) database.commit() cursor.close() database.close() if requester == "owner": await self.config.user_from_id(user_id).clear() if requester == "user": await self.config.user_from_id(user_id).clear() if requester == "user_strict": await self.config.user_from_id(user_id).clear() else: self.logger.warning("Invalid requester passed to red_delete_data_for_user: %s", requester) def __init__(self, bot): self.bot = bot self.config = Config.get_conf(self, identifier=481923957134912) self.config.register_global( mysql_address= " ", mysql_database = " ", mysql_username = " ", mysql_password = " " ) self.config.register_guild( use_discord_permissions = True, ignore_other_bots = True, dm_users = True, log_channel = " ", immune_roles = [], history_ephemeral = False, history_inline = False, history_pagesize = 5, history_inline_pagesize = 6, blacklist_roles = [] ) self.config.register_user( history_ephemeral = None, history_inline = None, history_pagesize = None, history_inline_pagesize = None ) disable_dateutil() self.handle_expiry.start() # pylint: disable=no-member self.logger = logging.getLogger('red.seaswimmerthefsh.moderation') async def cog_load(self): """This method prepares the database schema for all of the guilds the bot is currently in.""" conf = await self.check_conf([ 'mysql_address', 'mysql_database', 'mysql_username', 'mysql_password' ]) if conf: self.logger.fatal("Failed to create tables, due to MySQL connection configuration being unset.") return guilds: list[discord.Guild] = self.bot.guilds try: for guild in guilds: if not await self.bot.cog_disabled_in_guild(self, guild): await self.create_guild_table(guild) except ConnectionRefusedError: return async def cog_unload(self): self.handle_expiry.cancel() # pylint: disable=no-member @commands.Cog.listener('on_guild_join') async def db_generate_guild_join(self, guild: discord.Guild): """This method prepares the database schema whenever the bot joins a guild.""" if not await self.bot.cog_disabled_in_guild(self, guild): conf = await self.check_conf([ 'mysql_address', 'mysql_database', 'mysql_username', 'mysql_password' ]) if conf: self.logger.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id) return try: await self.create_guild_table(guild) except ConnectionRefusedError: return @commands.Cog.listener('on_audit_log_entry_create') async def autologger(self, entry: discord.AuditLogEntry): """This method automatically logs moderations done by users manually ("right clicks").""" if not await self.bot.cog_disabled_in_guild(self, entry.guild): if await self.config.guild(entry.guild).ignore_other_bots() is True: if entry.user.bot or entry.target.bot: return else: if entry.user.id == self.bot.user.id: return duration = "NULL" if entry.reason: reason = entry.reason + " (This action was performed without the bot.)" else: reason = "This action was performed without the bot." if entry.action == discord.AuditLogAction.kick: moderation_type = 'KICK' elif entry.action == discord.AuditLogAction.ban: moderation_type = 'BAN' elif entry.action == discord.AuditLogAction.unban: moderation_type = 'UNBAN' elif entry.action == discord.AuditLogAction.member_update: if entry.after.timed_out_until is not None: timed_out_until_aware = entry.after.timed_out_until.replace(tzinfo=timezone.utc) duration_datetime = timed_out_until_aware - datetime.now(tz=timezone.utc) minutes = round(duration_datetime.total_seconds() / 60) duration = timedelta(minutes=minutes) moderation_type = 'MUTE' else: moderation_type = 'UNMUTE' else: return await self.mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, 0, duration, reason) async def connect(self): """Connects to the MySQL database, and returns a connection object.""" conf = await self.check_conf([ 'mysql_address', 'mysql_database', 'mysql_username', 'mysql_password' ]) if conf: raise LookupError("MySQL connection details not set properly!") try: connection = mysql.connector.connect( host=await self.config.mysql_address(), user=await self.config.mysql_username(), password=await self.config.mysql_password(), database=await self.config.mysql_database() ) return connection except mysql.connector.ProgrammingError as e: self.logger.fatal("Unable to access the MySQL database!\nError:\n%s", e.msg) raise ConnectionRefusedError(f"Unable to access the MySQL Database!\n{e.msg}") from e async def create_guild_table(self, guild: discord.Guild): database = await self.connect() cursor = database.cursor() try: cursor.execute(f"SELECT * FROM `moderation_{guild.id}`") self.logger.info("MySQL Table exists for server %s (%s)", guild.name, guild.id) except mysql.connector.errors.ProgrammingError: query = f""" CREATE TABLE `moderation_{guild.id}` ( moderation_id INT UNIQUE PRIMARY KEY NOT NULL, timestamp INT NOT NULL, moderation_type LONGTEXT NOT NULL, target_id LONGTEXT NOT NULL, moderator_id LONGTEXT NOT NULL, role_id LONGTEXT, duration LONGTEXT, end_timestamp INT, reason LONGTEXT, resolved BOOL NOT NULL, resolved_by LONGTEXT, resolve_reason LONGTEXT, expired BOOL NOT NULL, changes JSON NOT NULL ) """ cursor.execute(query) index_query_1 = "CREATE INDEX idx_target_id ON moderation_%s(target_id(25));" cursor.execute(index_query_1, (guild.id,)) index_query_2 = "CREATE INDEX idx_moderator_id ON moderation_%s(moderator_id(25));" cursor.execute(index_query_2, (guild.id,)) index_query_3 = "CREATE INDEX idx_moderation_id ON moderation_%s(moderation_id);" cursor.execute(index_query_3, (guild.id,)) insert_query = f""" INSERT INTO `moderation_{guild.id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ insert_values = (0, 0, "NULL", 0, 0, 0, "NULL", 0, "NULL", 0, "NULL", "NULL", 0, json.dumps([])) cursor.execute(insert_query, insert_values) database.commit() self.logger.info("MySQL Table (moderation_%s) created for %s (%s)", guild.id, guild.name, guild.id) database.close() async def check_conf(self, config: list): """Checks if any required config options are not set.""" not_found_list = [] for item in config: if await self.config.item() == " ": not_found_list.append(item) return not_found_list def check_permissions(self, user: discord.User, permissions: list, ctx: Union[commands.Context, discord.Interaction] = None, guild: discord.Guild = None): """Checks if a user has a specific permission (or a list of permissions) in a channel.""" if ctx: member = ctx.guild.get_member(user.id) resolved_permissions = ctx.channel.permissions_for(member) elif guild: member = guild.get_member(user.id) resolved_permissions = member.guild_permissions else: raise(KeyError) for permission in permissions: if not getattr(resolved_permissions, permission, False) and not resolved_permissions.administrator is True: return permission return False async def check_moddable(self, target: Union[discord.User, discord.Member], interaction: discord.Interaction, permissions: list): """Checks if a moderator can moderate a target.""" if self.check_permissions(interaction.client.user, permissions, guild=interaction.guild): await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return False if await self.config.guild(interaction.guild).use_discord_permissions() is True: if self.check_permissions(interaction.user, permissions, guild=interaction.guild): await interaction.response.send_message(f"You do not have the `{permissions}` permission, required for this action.", ephemeral=True) return False if interaction.user.id == target.id: await interaction.response.send_message(content="You cannot moderate yourself!", ephemeral=True) return False if target.bot: await interaction.response.send_message(content="You cannot moderate bots!", ephemeral=True) return False if isinstance(target, discord.Member): if interaction.user.top_role <= target.top_role: await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True) return False if interaction.guild.get_member(interaction.client.user.id).top_role <= target.top_role: await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True) return False immune_roles = await self.config.guild(target.guild).immune_roles() for role in target.roles: if role.id in immune_roles: await interaction.response.send_message(content="You cannot moderate members with an immune role!", ephemeral=True) return False return True async def mysql_log(self, guild_id: str, author_id: str, moderation_type: str, target_id: int, role_id: int, duration, reason: str, database: mysql.connector.MySQLConnection = None, timestamp: int = None, resolved: bool = False, resolved_by: str = None, resolved_reason: str = None, expired: bool = None, changes: list = []): if not timestamp: timestamp = int(time.time()) if duration != "NULL": end_timedelta = datetime.fromtimestamp(timestamp) + duration end_timestamp = int(end_timedelta.timestamp()) else: end_timestamp = 0 if not expired: if int(time.time()) > end_timestamp: expired = 1 else: expired = 0 if resolved_by is None: resolved_by = "NULL" if resolved_reason is None: resolved_reason = "NULL" if not database: database = await self.connect() close_db = True else: close_db = False cursor = database.cursor() moderation_id = await self.get_next_case_number(guild_id=guild_id, cursor=cursor) sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" val = (moderation_id, timestamp, moderation_type, target_id, author_id, role_id, duration, end_timestamp, reason, int(resolved), resolved_by, resolved_reason, expired, json.dumps(changes)) cursor.execute(sql, val) cursor.close() database.commit() if close_db: database.close() self.logger.debug("MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s", guild_id, moderation_id, timestamp, moderation_type, target_id, author_id, role_id, duration, end_timestamp, reason, int(resolved), resolved_by, resolved_reason, expired, changes) return moderation_id async def get_next_case_number(self, guild_id: str, cursor = None): """This method returns the next case number from the MySQL table for a specific guild.""" if not cursor: database = await self.connect() cursor = database.cursor() cursor.execute(f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1") return cursor.fetchone()[0] + 1 def generate_dict(self, result): case: dict = { "moderation_id": result[0], "timestamp": result[1], "moderation_type": result[2], "target_id": result[3], "moderator_id": result[4], "role_id": result[5], "duration": result[6], "end_timestamp": result[7], "reason": result[8], "resolved": result[9], "resolved_by": result[10], "resolve_reason": result[11], "expired": result[12], "changes": json.loads(result[13]) } return case async def fetch_user_dict(self, interaction: discord.Interaction, user_id: str): """This method returns a dictionary containing either user information or a standard deleted user template.""" if user_id == '?': user_dict = { 'id': '?', 'name': 'Unknown User', 'discriminator':'0' } else: try: user = interaction.client.get_user(user_id) if user is None: user = await interaction.client.fetch_user(user_id) user_dict = { 'id': user.id, 'name': user.name, 'discriminator': user.discriminator } except discord.errors.NotFound: user_dict = { 'id': user_id, 'name': 'Deleted User', 'discriminator': '0' } return user_dict async def fetch_role_dict(self, interaction: discord.Interaction, role_id: str): """This method returns a dictionary containing either role information or a standard deleted role template.""" try: role = interaction.guild.get_role(role_id) role_dict = { 'id': role.id, 'name': role.name } except discord.errors.NotFound: role_dict = { 'id': role_id, 'name': 'Deleted Role' } return role_dict async def embed_factory(self, embed_type: str, /, interaction: discord.Interaction = None, case_dict: dict = None, guild: discord.Guild = None, reason: str = None, moderation_type: str = None, response: discord.InteractionMessage = None, duration: timedelta = None, resolved: bool = False): """This method creates an embed from set parameters, meant for either moderation logging or contacting the moderated user. Valid arguments for 'embed_type': - 'message' - 'log' - WIP - 'case' - 'changes' Required arguments for 'message': - guild - reason - moderation_type - response - duration (optional) Required arguments for 'log': - interaction - case_dict - resolved (optional) Required arguments for 'case' & 'changes': - interaction - case_dict""" if embed_type == 'message': if moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]: guild_name = guild.name else: guild_name = f"[{guild.name}]({response.jump_url})" if moderation_type in ["tempbanned", "muted"] and duration: embed_duration = f" for {humanize.precisedelta(duration)}" else: embed_duration = "" if moderation_type == "note": embed_desc = "received a" else: embed_desc = "been" embed = discord.Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=await self.bot.get_embed_color(None), timestamp=datetime.now()) embed.add_field(name='Reason', value=f"`{reason}`") embed.set_author(name=guild.name, icon_url=guild.icon.url) embed.set_footer(text=f"Case #{await self.get_next_case_number(guild.id)}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&") return embed if embed_type == 'case': target_user = await self.fetch_user_dict(interaction, case_dict['target_id']) moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id']) target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']}", color=await self.bot.get_embed_color(None)) embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** | " if case_dict['duration'] != 'NULL': td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) duration_embed = f"{humanize.precisedelta(td)} | " if bool(case_dict['expired']) is False else str(humanize.precisedelta(td)) embed.description += f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}" embed.description += f"\n**Changes:** {len(case_dict['changes'])}" embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False) if case_dict['resolved'] == 1: resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by']) resolved_name = f"`{resolved_user['name']}`" if resolved_user['discriminator'] == "0" else f"`{resolved_user['name']}#{resolved_user['discriminator']}`" embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False) return embed if embed_type == 'changes': embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']} Changes", color=await self.bot.get_embed_color(None)) memory_dict = {} if case_dict['changes']: for change in case_dict['changes']: if change['user_id'] not in memory_dict: memory_dict[str(change['user_id'])] = await self.fetch_user_dict(interaction, change['user_id']) user = memory_dict[str(change['user_id'])] name = user['name'] if user['discriminator'] == "0" else f"{user['name']}#{user['discriminator']}" timestamp = f" | " if change['type'] == 'ORIGINAL': embed.add_field(name='Original', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) elif change['type'] == 'EDIT': embed.add_field(name='Edit', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) elif change['type'] == 'RESOLVE': embed.add_field(name='Resolve', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) else: embed.description = "*No changes have been made to this case.* 🙁" return embed if embed_type == 'log': if resolved: target_user = await self.fetch_user_dict(interaction, case_dict['target_id']) moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id']) target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}" embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']} Resolved", color=await self.bot.get_embed_color(None)) embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** | " if case_dict['duration'] != 'NULL': td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) duration_embed = f"{humanize.precisedelta(td)} | " if case_dict["expired"] == '0' else str(humanize.precisedelta(td)) embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}" embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False) resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by']) resolved_name = resolved_user['name'] if resolved_user['discriminator'] == "0" else f"{resolved_user['name']}#{resolved_user['discriminator']}" embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False) else: target_user = await self.fetch_user_dict(interaction, case_dict['target_id']) moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id']) target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}" embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']}", color=await self.bot.get_embed_color(None)) embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** | " if case_dict['duration'] != 'NULL': td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) embed.description = embed.description + f"\n**Duration:** {humanize.precisedelta(td)} | " embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False) return embed raise(TypeError("'type' argument is invalid!")) async def fetch_case(self, moderation_id: int, guild_id: str): """This method fetches a case from the database and returns the case's dictionary.""" database = await self.connect() cursor = database.cursor() query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;" cursor.execute(query, (guild_id, moderation_id)) result = cursor.fetchone() cursor.close() database.close() return self.generate_dict(result) async def log(self, interaction: discord.Interaction, moderation_id: int, resolved: bool = False): """This method sends a message to the guild's configured logging channel when an infraction takes place.""" logging_channel_id = await self.config.guild(interaction.guild).log_channel() if logging_channel_id != " ": logging_channel = interaction.guild.get_channel(logging_channel_id) case = await self.fetch_case(moderation_id, interaction.guild.id) if case: embed = await self.embed_factory('log', interaction=interaction, case_dict=case, resolved=resolved) try: await logging_channel.send(embed=embed) except discord.errors.Forbidden: return ####################################################################################################################### ### COMMANDS ####################################################################################################################### @app_commands.command(name="note") async def note(self, interaction: discord.Interaction, target: discord.User, reason: str, silent: bool = None): """Add a note to a user. Parameters ----------- target: discord.User Who are you noting? reason: str Why are you noting this user? silent: bool Should the user be messaged?""" if not await self.check_moddable(target, interaction, ['moderate_members']): return await interaction.response.send_message(content=f"{target.mention} has recieved a note!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='note', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has received a note! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) @app_commands.command(name="warn") async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None): """Warn a user. Parameters ----------- target: discord.Member Who are you warning? reason: str Why are you warning this user? silent: bool Should the user be messaged?""" if not await self.check_moddable(target, interaction, ['moderate_members']): return await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='warned', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'WARN', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been warned! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) # @app_commands.group(name="blacklist") # async def blacklist(self, interaction: discord.Interaction): # """Blacklist users.""" # @blacklist.command(name="add") # async def blacklist_add(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None): # """""" @app_commands.command(name="mute") async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str, silent: bool = None): """Mute a user. Parameters ----------- target: discord.Member Who are you unbanning? duration: str How long are you muting this user for? reason: str Why are you unbanning this user? silent: bool Should the user be messaged?""" if not await self.check_moddable(target, interaction, ['moderate_members']): return if target.is_timed_out() is True: await interaction.response.send_message(f"{target.mention} is already muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True) return try: parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True) except ValueError: await interaction.response.send_message("Please provide a valid duration!", ephemeral=True) return if parsed_time.total_seconds() / 1000 > 2419200000: await interaction.response.send_message("Please provide a duration that is less than 28 days.") return await target.timeout(parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}") await interaction.response.send_message(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='muted', response=await interaction.original_response(), duration=parsed_time) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', target.id, 0, parsed_time, reason) await interaction.edit_original_response(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) @app_commands.command(name="unmute") async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None, silent: bool = None): """Unmute a user. Parameters ----------- target: discord.user Who are you unmuting? reason: str Why are you unmuting this user? silent: bool Should the user be messaged?""" if not await self.check_moddable(target, interaction, ['moderate_members']): return if target.is_timed_out() is False: await interaction.response.send_message(f"{target.mention} is not muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True) return if reason: await target.timeout(None, reason=f"Unmuted by {interaction.user.id} for: {reason}") else: await target.timeout(None, reason=f"Unbanned by {interaction.user.id}") reason = "No reason given." await interaction.response.send_message(content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='unmuted', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been unmuted! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) @app_commands.command(name="kick") async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None): """Kick a user. Parameters ----------- target: discord.user Who are you kicking? reason: str Why are you kicking this user? silent: bool Should the user be messaged?""" if not await self.check_moddable(target, interaction, ['kick_members']): return await interaction.response.send_message(content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='kicked', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass await target.kick(f"Kicked by {interaction.user.id} for: {reason}") moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'KICK', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been kicked! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) @app_commands.command(name="ban") @app_commands.choices(delete_messages=[ Choice(name="None", value=0), Choice(name='1 Hour', value=3600), Choice(name='12 Hours', value=43200), Choice(name='1 Day', value=86400), Choice(name='3 Days', value=259200), Choice(name='7 Days', value=604800), ]) async def ban(self, interaction: discord.Interaction, target: discord.User, reason: str, duration: str = None, delete_messages: Choice[int] = 0, silent: bool = None): """Ban a user. Parameters ----------- target: discord.user Who are you banning? duration: str How long are you banning this user for? reason: str Why are you banning this user? delete_messages: Choices[int] How many days of messages to delete? silent: bool Should the user be messaged?""" if not await self.check_moddable(target, interaction, ['ban_members']): return try: await interaction.guild.fetch_ban(target) await interaction.response.send_message(content=f"{target.mention} is already banned!", ephemeral=True) return except discord.errors.NotFound: pass if duration: try: parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True) except ValueError: await interaction.response.send_message("Please provide a valid duration!", ephemeral=True) return await interaction.response.send_message(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`") try: embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='tempbanned', response=await interaction.original_response(), duration=parsed_time) await target.send(embed=embed) except discord.errors.HTTPException: pass await interaction.guild.ban(target, reason=f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})", delete_message_seconds=delete_messages) moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'TEMPBAN', target.id, 0, parsed_time, reason) await interaction.edit_original_response(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) else: await interaction.response.send_message(content=f"{target.mention} has been banned!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='banned', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass await interaction.guild.ban(target, reason=f"Banned by {interaction.user.id} for: {reason}", delete_message_seconds=delete_messages) moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'BAN', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been banned! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) @app_commands.command(name="unban") async def unban(self, interaction: discord.Interaction, target: discord.User, reason: str = None, silent: bool = None): """Unban a user. Parameters ----------- target: discord.user Who are you unbanning? reason: str Why are you unbanning this user? silent: bool Should the user be messaged?""" if not await self.check_moddable(target, interaction, ['ban_members']): return try: await interaction.guild.fetch_ban(target) except discord.errors.NotFound: await interaction.response.send_message(content=f"{target.mention} is not banned!", ephemeral=True) return if reason: await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id} for: {reason}") else: await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id}") reason = "No reason given." await interaction.response.send_message(content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='unbanned', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNBAN', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been unbanned! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) @app_commands.command(name="history") async def history(self, interaction: discord.Interaction, target: discord.User = None, moderator: discord.User = None, pagesize: app_commands.Range[int, 1, 25] = None, page: int = 1, ephemeral: bool = None, inline: bool = None, export: bool = False): """List previous infractions. Parameters ----------- target: discord.User User whose infractions to query, overrides moderator if both are given moderator: discord.User Query by moderator pagesize: app_commands.Range[int, 1, 25] Amount of infractions to list per page page: int Page to select ephemeral: bool Hide the command response inline: bool Display infractions in a grid arrangement (does not look very good) export: bool Exports the server's entire moderation history to a JSON file""" if ephemeral is None: ephemeral = (await self.config.user(interaction.user).history_ephemeral() or await self.config.guild(interaction.guild).history_ephemeral() or False) if inline is None: inline = (await self.config.user(interaction.user).history_inline() or await self.config.guild(interaction.guild).history_inline() or False) if pagesize is None: if inline is True: pagesize = (await self.config.user(interaction.user).history_inline_pagesize() or await self.config.guild(interaction.guild).history_inline_pagesize() or 6) else: pagesize = (await self.config.user(interaction.user).history_pagesize() or await self.config.guild(interaction.guild).history_pagesize() or 5) await interaction.response.defer(ephemeral=ephemeral) permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction) if permissions: await interaction.followup.send(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return database = await self.connect() cursor = database.cursor() if target: query = """SELECT * FROM moderation_%s WHERE target_id = %s ORDER BY moderation_id DESC;""" cursor.execute(query, (interaction.guild.id, target.id)) elif moderator: query = """SELECT * FROM moderation_%s WHERE moderator_id = %s ORDER BY moderation_id DESC;""" cursor.execute(query, (interaction.guild.id, moderator.id)) else: query = """SELECT * FROM moderation_%s ORDER BY moderation_id DESC;""" cursor.execute(query, (interaction.guild.id,)) results = cursor.fetchall() result_dict_list = [] for result in results: case_dict = self.generate_dict(result) if case_dict['moderation_id'] == 0: continue result_dict_list.append(case_dict) if export: try: filename = str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"moderation_{interaction.guild.id}.json" with open(filename, "w", encoding="utf-8") as f: json.dump(result_dict_list, f, indent=2) await interaction.followup.send(file=discord.File(filename, f"moderation_{interaction.guild.id}.json"), ephemeral=ephemeral) os.remove(filename) return except json.JSONDecodeError as e: await interaction.followup.send(content=f"An error occured while exporting the moderation history.\nError:\n```{e}```", ephemeral=ephemeral) return case_quantity = len(result_dict_list) page_quantity = round(case_quantity / pagesize) start_index = (page - 1) * pagesize end_index = page * pagesize embed = discord.Embed(color=await self.bot.get_embed_color(None)) embed.set_author(icon_url=interaction.guild.icon.url, name='Infraction History') embed.set_footer(text=f"Page {page}/{page_quantity} | {case_quantity} Results") memory_dict = {} for case in result_dict_list[start_index:end_index]: if case['target_id'] not in memory_dict: memory_dict[str(case['target_id'])] = await self.fetch_user_dict(interaction, case['target_id']) target_user = memory_dict[str(case['target_id'])] if case['moderator_id'] not in memory_dict: memory_dict[str(case['moderator_id'])] = await self.fetch_user_dict(interaction, case['moderator_id']) moderator_user = memory_dict[str(case['moderator_id'])] target_name = target_user['name'] if target_user['discriminator'] == "0" else f"{target_user['name']}#{target_user['discriminator']}" moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}" field_name = f"Case #{case['moderation_id']} ({str.title(case['moderation_type'])})" field_value = f"**Target:** `{target_name}` ({target_user['id']})\n**Moderator:** `{moderator_name}` ({moderator_user['id']})" if len(case['reason']) > 150: field_value += f"\n**Reason:** `{str(case['reason'])[:150]}...`" else: field_value += f"\n**Reason:** `{str(case['reason'])}`" if case['duration'] != 'NULL': td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case["duration"].split(":"))}) duration_embed = f"{humanize.precisedelta(td)} | " if bool(case['expired']) is False else f"{humanize.precisedelta(td)} | Expired" field_value += f"\n**Duration:** {duration_embed}" field_value += f"\n**Timestamp:** | " if bool(case['resolved']): field_value += "\n**Resolved:** True" embed.add_field(name=field_name, value=field_value, inline=inline) await interaction.followup.send(embed=embed, ephemeral=ephemeral) @app_commands.command(name="resolve") async def resolve(self, interaction: discord.Interaction, case: int, reason: str = None): """Resolve a specific case. Parameters ----------- case: int Case number of the case you're trying to resolve reason: str Reason for resolving case""" permissions = self.check_permissions(interaction.client.user, ['embed_links', 'moderate_members', 'ban_members'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return conf = await self.check_conf(['mysql_database']) if conf: raise(LookupError) database = await self.connect() cursor = database.cursor() db = await self.config.mysql_database() query_1 = "SELECT * FROM moderation_%s WHERE moderation_id = %s;" cursor.execute(query_1, (interaction.guild.id, case)) result_1 = cursor.fetchone() if result_1 is None or case == 0: await interaction.response.send_message(content=f"There is no moderation with a case number of {case}.", ephemeral=True) return query_2 = "SELECT * FROM moderation_%s WHERE moderation_id = %s AND resolved = 0;" cursor.execute(query_2, (interaction.guild.id, case)) result_2 = cursor.fetchone() if result_2 is None: await interaction.response.send_message(content=f"This moderation has already been resolved!\nUse `/case {case}` for more information.", ephemeral=True) return case_dict = self.generate_dict(result_2) if reason is None: reason = "No reason given." changes: list = case_dict['changes'] if len(changes) > 25: await interaction.response.send_message(content="Due to limitations with Discord's embed system, you cannot edit a case more than 25 times.", ephemeral=True) return if not changes: changes.append( { 'type': "ORIGINAL", 'timestamp': case_dict['timestamp'], 'reason': case_dict['reason'], 'user_id': case_dict['moderator_id'] } ) changes.append( { 'type': "RESOLVE", 'timestamp': int(time.time()), 'reason': reason, 'user_id': interaction.user.id } ) if case_dict['moderation_type'] in ['UNMUTE', 'UNBAN']: await interaction.response.send_message(content="You cannot resolve this type of moderation!", ephemeral=True) if case_dict['moderation_type'] in ['MUTE', 'TEMPBAN', 'BAN']: if case_dict['moderation_type'] == 'MUTE': try: member = await interaction.guild.fetch_member(case_dict['target_id']) await member.timeout(None, reason=f"Case #{case} resolved by {interaction.user.id}") except discord.NotFound: pass if case_dict['moderation_type'] in ['TEMPBAN', 'BAN']: try: user = await interaction.client.fetch_user(case_dict['target_id']) await interaction.guild.unban(user, reason=f"Case #{case} resolved by {interaction.user.id}") except discord.NotFound: pass resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, changes = %s, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s" else: resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, changes = %s, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s" cursor.execute(resolve_query, (json.dumps(changes), interaction.user.id, reason, case_dict['moderation_id'])) database.commit() embed = await self.embed_factory('case', interaction=interaction, case_dict=await self.fetch_case(case, interaction.guild.id)) await interaction.response.send_message(content=f"✅ Moderation #{case} resolved!", embed=embed) await self.log(interaction, case, True) cursor.close() database.close() @app_commands.command(name="case") @app_commands.choices(export=[ Choice(name='Export as File', value='file'), Choice(name='Export as Codeblock', value='codeblock') ]) async def case(self, interaction: discord.Interaction, case: int, ephemeral: bool = None, changes: bool = False, export: Choice[str] = None): """Check the details of a specific case. Parameters ----------- case: int What case are you looking up? ephemeral: bool Hide the command response changes: bool List the changes made to the case export: bool Export the case to a JSON file or codeblock""" permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return if ephemeral is None: ephemeral = (await self.config.user(interaction.user).history_ephemeral() or await self.config.guild(interaction.guild).history_ephemeral() or False) if case != 0: case_dict = await self.fetch_case(case, interaction.guild.id) if case_dict: if export: if export.value == 'file' or len(str(case_dict)) > 1800: filename = str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"moderation_{interaction.guild.id}_case_{case}.json" with open(filename, "w", encoding="utf-8") as f: json.dump(case_dict, f, indent=2) if export.value == 'codeblock': content = f"Case #{case} exported.\n*Case was too large to export as codeblock, so it has been uploaded as a `.json` file.*" else: content = f"Case #{case} exported." await interaction.response.send_message(content=content, file=discord.File(filename, f"moderation_{interaction.guild.id}_case_{case}.json"), ephemeral=ephemeral) os.remove(filename) return await interaction.response.send_message(content=f"```json\n{json.dumps(case_dict, indent=2)}```", ephemeral=ephemeral) return if changes: embed = await self.embed_factory('changes', interaction=interaction, case_dict=case_dict) else: embed = await self.embed_factory('case', interaction=interaction, case_dict=case_dict) await interaction.response.send_message(embed=embed, ephemeral=ephemeral) return await interaction.response.send_message(content=f"No case with case number `{case}` found.", ephemeral=True) @app_commands.command(name="edit") async def edit(self, interaction: discord.Interaction, case: int, reason: str): """Edit the reason of a specific case. Parameters ----------- case: int What case are you editing? reason: str What is the new reason?""" permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return if case != 0: case_dict = await self.fetch_case(case, interaction.guild.id) if case_dict: conf = await self.check_conf(['mysql_database']) if conf: raise(LookupError) changes: list = case_dict['changes'] if len(changes) > 25: await interaction.response.send_message(content="Due to limitations with Discord's embed system, you cannot edit a case more than 25 times.", ephemeral=True) return if not changes: changes.append( { 'type': "ORIGINAL", 'timestamp': case_dict['timestamp'], 'reason': case_dict['reason'], 'user_id': case_dict['moderator_id'] } ) changes.append( { 'type': "EDIT", 'timestamp': int(time.time()), 'reason': reason, 'user_id': interaction.user.id } ) database = await self.connect() cursor = database.cursor() db = await self.config.mysql_database() update_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET changes = %s, reason = %s WHERE moderation_id = %s" cursor.execute(update_query, (json.dumps(changes), reason, case)) database.commit() new_case = await self.fetch_case(case, interaction.guild.id) embed = await self.embed_factory('case', interaction=interaction, case_dict=new_case) await interaction.response.send_message(content=f"✅ Moderation #{case} edited!", embed=embed, ephemeral=True) await self.log(interaction, case) cursor.close() database.close() return await interaction.response.send_message(content=f"No case with case number `{case}` found.", ephemeral=True) @tasks.loop(minutes=1) async def handle_expiry(self): conf = await self.check_conf(['mysql_database']) if conf: raise(LookupError) database = await self.connect() cursor = database.cursor() db = await self.config.mysql_database() guilds: list[discord.Guild] = self.bot.guilds for guild in guilds: if not await self.bot.cog_disabled_in_guild(self, guild): tempban_query = f"SELECT target_id, moderation_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= %s AND moderation_type = 'TEMPBAN' AND expired = 0" try: cursor.execute(tempban_query, (time.time(),)) result = cursor.fetchall() except mysql.connector.errors.ProgrammingError: continue target_ids = [row[0] for row in result] moderation_ids = [row[1] for row in result] for target_id, moderation_id in zip(target_ids, moderation_ids): user: discord.User = await self.bot.fetch_user(target_id) try: await guild.unban(user, reason=f"Automatic unban from case #{moderation_id}") embed = await self.embed_factory('message', guild, f'Automatic unban from case #{moderation_id}', 'unbanned') try: await user.send(embed=embed) except discord.errors.HTTPException: pass except [discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException] as e: print(f"Failed to unban {user.name}#{user.discriminator} ({user.id}) from {guild.name} ({guild.id})\n{e}") expiry_query = f"UPDATE `{db}`.`moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp != 0 AND end_timestamp <= %s AND expired = 0 AND moderation_type != 'BLACKLIST') OR (expired = 0 AND resolved = 1 AND moderation_type != 'BLACKLIST')" cursor.execute(expiry_query, (time.time(),)) blacklist_query = f"SELECT target_id, moderation_id, role_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= %s AND moderation_type = 'BLACKLIST' AND expired = 0" try: cursor.execute(blacklist_query, (time.time(),)) result = cursor.fetchall() except mysql.connector.errors.ProgrammingError: continue target_ids = [row[0] for row in result] moderation_ids = [row[1] for row in result] role_ids = [row[2] for row in result] for target_id, moderation_id, role_id in zip(target_ids, moderation_ids, role_ids): try: member: discord.Member = await guild.fetch_member(target_id) role: discord.Role = guild.get_role(role_id) if role is None: raise discord.errors.NotFound except [discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException]: continue database.commit() cursor.close() database.close() ####################################################################################################################### ### CONFIGURATION COMMANDS ####################################################################################################################### @commands.group(autohelp=True, aliases=['modset', 'moderationsettings']) async def moderationset(self, ctx: commands.Context): """Manage moderation commands.""" @moderationset.command(name='list', aliases=['view', 'show']) async def moderationset_list(self, ctx: commands.Context): """List all moderation settings.""" if ctx.guild: guild_settings = await self.config.guild(ctx.guild).all() guild_settings_string = "" for setting in guild_settings: if 'mysql' in setting or 'roles' in setting: continue if setting == 'log_channel': channel = ctx.guild.get_channel(guild_settings[setting]) guild_settings_string += f"**{setting}**: {channel.mention}\n" else: guild_settings_string += f"**{setting}**: {guild_settings[setting]}\n" user_settings = await self.config.user(ctx.author).all() user_settings_string = "" for setting in user_settings: user_settings_string += f"**{setting}**: {user_settings[setting]}\n" embed = discord.Embed(color=await self.bot.get_embed_color(None)) embed.set_author(icon_url=ctx.guild.icon.url, name=f"{ctx.guild.name} Moderation Settings") if ctx.guild: embed.add_field(name="Guild Settings", value=guild_settings_string) embed.add_field(name="User Settings", value=user_settings_string) await ctx.send(embed=embed) @moderationset.group(autohelp=True, name='history') async def moderationset_history(self, ctx: commands.Context): """Manage configuration for the /history command.""" @moderationset_history.command(name='ephemeral', aliases=['hidden', 'hide']) async def moderationset_history_user_ephemeral(self, ctx: commands.Context, enabled: bool): """Toggle if the /history command should be ephemeral.""" await self.config.user(ctx.author).history_ephemeral.set(enabled) await ctx.send(f"Ephemeral setting set to {enabled}") @moderationset_history.command(name='pagesize') async def moderationset_history_user_pagesize(self, ctx: commands.Context, pagesize: int): """Set the amount of cases to display per page.""" await self.config.user(ctx.author).history_pagesize.set(pagesize) await ctx.send(f"Pagesize set to {await self.config.user(ctx.author).history_pagesize()}") @moderationset_history.group(name='inline') async def moderationset_history_inline(self, ctx: commands.Context): """Manage configuration for the /history command's inline argument.""" @moderationset_history_inline.command(name='toggle') async def moderationset_history_user_inline_toggle(self, ctx: commands.Context, enabled: bool): """Enable the /history command's inline argument by default.""" await self.config.user(ctx.author).history_inline.set(enabled) await ctx.send(f"Inline setting set to {enabled}") @moderationset_history_inline.command(name='pagesize') async def moderationset_history_user_inline_pagesize(self, ctx: commands.Context, pagesize: int): """Set the amount of cases to display per page.""" await self.config.user(ctx.author).history_inline_pagesize.set(pagesize) await ctx.send(f"Inline pagesize set to {await self.config.user(ctx.author).history_inline_pagesize()}") @moderationset_history.group(autohelp=True, name='guild') @checks.admin() async def moderationset_history_guild(self, ctx: commands.Context): """Manage configuration for the /history command, per guild.""" @moderationset_history_guild.command(name='ephemeral', aliases=['hidden', 'hide']) @checks.admin() async def moderationset_history_guild_ephemeral(self, ctx: commands.Context, enabled: bool): """Toggle if the /history command should be ephemeral.""" await self.config.guild(ctx.guild).history_ephemeral.set(enabled) await ctx.send(f"Ephemeral setting set to {enabled}") @moderationset_history_guild.command(name='pagesize') @checks.admin() async def moderationset_history_guild_pagesize(self, ctx: commands.Context, pagesize: int): """Set the amount of cases to display per page.""" await self.config.guild(ctx.guild).history_pagesize.set(pagesize) await ctx.send(f"Pagesize set to {await self.config.guild(ctx.guild).history_pagesize()}") @moderationset_history_guild.group(name='inline') @checks.admin() async def moderationset_history_guild_inline(self, ctx: commands.Context): """Manage configuration for the /history command's inline argument.""" @moderationset_history_guild_inline.command(name='toggle') @checks.admin() async def moderationset_history_guild_inline_toggle(self, ctx: commands.Context, enabled: bool): """Enable the /history command's inline argument by default.""" await self.config.guild(ctx.guild).history_inline.set(enabled) await ctx.send(f"Inline setting set to {enabled}") @moderationset_history_guild_inline.command(name='pagesize') @checks.admin() async def moderationset_history_guild_inline_pagesize(self, ctx: commands.Context, pagesize: int): """Set the amount of cases to display per page.""" await self.config.guild(ctx.guild).history_inline_pagesize.set(pagesize) await ctx.send(f"Inline pagesize set to {await self.config.guild(ctx.guild).history_inline_pagesize()}") @moderationset.group(autohelp=True, name='immunity') @checks.admin() async def moderationset_immunity(self, ctx: commands.Context): """Manage configuration for immune roles.""" @moderationset_immunity.command(name='add') @checks.admin() async def moderationset_immunity_add(self, ctx: commands.Context, role: discord.Role): """Add a role to the immune roles list.""" immune_roles: list = await self.config.guild(ctx.guild).immune_roles() if role.id in immune_roles: await ctx.send("Role is already immune!") return immune_roles.append(role.id) await self.config.guild(ctx.guild).immune_roles.set(immune_roles) await ctx.send(f"Role {role.name} added to immune roles.") @moderationset_immunity.command(name='remove') @checks.admin() async def moderationset_immunity_remove(self, ctx: commands.Context, role: discord.Role): """Remove a role from the immune roles list.""" immune_roles: list = await self.config.guild(ctx.guild).immune_roles() if role.id not in immune_roles: await ctx.send("Role is not immune!") return immune_roles.remove(role.id) await self.config.guild(ctx.guild).immune_roles.set(immune_roles) await ctx.send(f"Role {role.name} removed from immune roles.") @moderationset_immunity.command(name='list') @checks.admin() async def moderationset_immunity_list(self, ctx: commands.Context): """List all immune roles.""" immune_roles: list = await self.config.guild(ctx.guild).immune_roles() if not immune_roles: await ctx.send("No immune roles set!") return role_list = "" for role_id in immune_roles: role = ctx.guild.get_role(role_id) if role: role_list += f"{role.mention}\n" if role_list: embed = discord.Embed(title="Immune Roles", description=role_list, color=await self.bot.get_embed_color(None)) await ctx.send(embed=embed) @moderationset.group(autohelp=True, name='blacklist') @checks.admin() async def moderationset_blacklist(self, ctx: commands.Context): """Manage configuration for the /blacklist command.""" @moderationset_blacklist.command(name='add') @checks.admin() async def moderationset_blacklist_add(self, ctx: commands.Context, role: discord.Role, duration: str): """Add a role to the blacklist.""" blacklist_roles: list = await self.config.guild(ctx.guild).blacklist_roles() for blacklist_role in blacklist_roles: if role.id == blacklist_role['role']: await ctx.send("Role already has an associated blacklist type!") return try: parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True) except ValueError: await ctx.send("Please provide a valid duration!") return blacklist_roles.append( { 'role': role.id, 'duration': str(parsed_time) } ) await self.config.guild(ctx.guild).blacklist_roles.set(blacklist_roles) await ctx.send(f"Role {role.mention} added as a blacklist type.", allowed_mentions=discord.AllowedMentions.none()) @moderationset_blacklist.command(name='remove') @checks.admin() async def moderationset_blacklist_remove(self, ctx: commands.Context, role: discord.Role): """Remove a role's blacklist type.""" blacklist_roles: list = await self.config.guild(ctx.guild).blacklist_roles() for blacklist_role in blacklist_roles: if role.id == blacklist_role['role']: blacklist_roles.remove(blacklist_role) await self.config.guild(ctx.guild).blacklist_roles.set(blacklist_roles) await ctx.send(f"Role {role.mention} removed from blacklist types.", allowed_mentions=discord.AllowedMentions.none()) return await ctx.send("Role does not have an associated blacklist type!") @moderationset_blacklist.command(name='list') @checks.admin() async def moderationset_blacklist_list(self, ctx: commands.Context): """List all blacklist types.""" blacklist_roles: list = await self.config.guild(ctx.guild).blacklist_roles() if not blacklist_roles: await ctx.send("No blacklist types set!") return blacklist_list = "" for blacklist_role in blacklist_roles: role = ctx.guild.get_role(blacklist_role['role']) if role: blacklist_list += f"{role.mention} - {blacklist_role['duration']}\n" if blacklist_list: embed = discord.Embed(title="Blacklist Types", description=blacklist_list, color=await self.bot.get_embed_color(None)) await ctx.send(embed=embed) @moderationset.command(name="ignorebots") @checks.admin() async def moderationset_ignorebots(self, ctx: commands.Context): """Toggle if the cog should ignore other bots' moderations.""" await self.config.guild(ctx.guild).ignore_other_bots.set(not await self.config.guild(ctx.guild).ignore_other_bots()) await ctx.send(f"Ignore bots setting set to {await self.config.guild(ctx.guild).ignore_other_bots()}") @moderationset.command(name="dm") @checks.admin() async def moderationset_dm(self, ctx: commands.Context): """Toggle automatically messaging moderated users. This option can be overridden by specifying the `silent` argument in any moderation command.""" await self.config.guild(ctx.guild).dm_users.set(not await self.config.guild(ctx.guild).dm_users()) await ctx.send(f"DM users setting set to {await self.config.guild(ctx.guild).dm_users()}") @moderationset.command(name="permissions") @checks.admin() async def moderationset_permissions(self, ctx: commands.Context): """Toggle whether the bot will check for discord permissions.""" await self.config.guild(ctx.guild).use_discord_permissions.set(not await self.config.guild(ctx.guild).use_discord_permissions()) await ctx.send(f"Use Discord Permissions setting set to {await self.config.guild(ctx.guild).use_discord_permissions()}") @moderationset.command(name="logchannel") @checks.admin() async def moderationset_logchannel(self, ctx: commands.Context, channel: discord.TextChannel = None): """Set a channel to log infractions to.""" if channel: await self.config.guild(ctx.guild).log_channel.set(channel.id) await ctx.send(f"Logging channel set to {channel.mention}.") else: await self.config.guild(ctx.guild).log_channel.set(" ") await ctx.send("Logging channel disabled.") @moderationset.command(name="mysql") @checks.is_owner() async def moderationset_mysql(self, ctx: commands.Context): """Configure MySQL connection details.""" await ctx.message.add_reaction("✅") await ctx.author.send(content="Click the button below to configure your MySQL connection details.", view=self.ConfigButtons(60)) class ConfigButtons(discord.ui.View): def __init__(self, timeout): super().__init__() self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912) @discord.ui.button(label="Edit", style=discord.ButtonStyle.success) async def config_button(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config)) class MySQLConfigModal(discord.ui.Modal, title="MySQL Database Configuration"): def __init__(self, config): super().__init__() self.config = config address = discord.ui.TextInput( label="Address", placeholder="Input your MySQL address here.", style=discord.TextStyle.short, required=False, max_length=300 ) database = discord.ui.TextInput( label="Database", placeholder="Input the name of your database here.", style=discord.TextStyle.short, required=False, max_length=300 ) username = discord.ui.TextInput( label="Username", placeholder="Input your MySQL username here.", style=discord.TextStyle.short, required=False, max_length=300 ) password = discord.ui.TextInput( label="Password", placeholder="Input your MySQL password here.", style=discord.TextStyle.short, required=False, max_length=300 ) async def on_submit(self, interaction: discord.Interaction): message = "" if self.address.value != "": await self.config.mysql_address.set(self.address.value) message += f"- Address set to\n - `{self.address.value}`\n" if self.database.value != "": await self.config.mysql_database.set(self.database.value) message += f"- Database set to\n - `{self.database.value}`\n" if self.username.value != "": await self.config.mysql_username.set(self.username.value) message += f"- Username set to\n - `{self.username.value}`\n" if self.password.value != "": await self.config.mysql_password.set(self.password.value) trimmed_password = self.password.value[:8] message += f"- Password set to\n - `{trimmed_password}` - Trimmed for security\n" if message == "": trimmed_password = str(await self.config.mysql_password())[:8] send = f"No changes were made.\nCurrent configuration:\n- Address:\n - `{await self.config.mysql_address()}`\n- Database:\n - `{await self.config.mysql_database()}`\n- Username:\n - `{await self.config.mysql_username()}`\n- Password:\n - `{trimmed_password}` - Trimmed for security" else: send = f"Configuration changed:\n{message}" await interaction.response.send_message(send, ephemeral=True) @moderationset.group(autohelp=True, name='import', hidden=True) @checks.admin() async def moderationset_import(self, ctx: commands.Context): """Import moderations from other bots.""" @moderationset_import.command(name="galacticbot") @checks.admin() async def moderationset_import_galacticbot(self, ctx: commands.Context): """Import moderations from GalacticBot. **UNFINISHED!**""" if ctx.message.attachments and ctx.message.attachments[0].content_type == 'application/json; charset=utf-8': message = await ctx.send("Are you sure you want to import GalacticBot moderations? **This will overwrite any moderations that already exist in this guild's moderation table.**") await message.edit(view=self.GalacticBotImportButtons(60, ctx, message, self)) else: await ctx.send("Please provide a valid GalacticBot moderation export file.") class GalacticBotImportButtons(discord.ui.View): def __init__(self, timeout, ctx, message, cog_instance): super().__init__() self.cog_instance = cog_instance self.ctx: commands.Context = ctx self.message: discord.Message = message self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912) @discord.ui.button(label="Yes", style=discord.ButtonStyle.success) async def import_button_y(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument await self.message.delete() await interaction.response.send_message("Deleting original table...", ephemeral=True) database = await Moderation.connect(self.cog_instance) cursor = database.cursor() query = f"DROP TABLE IF EXISTS moderation_{self.ctx.guild.id};" cursor.execute(query) cursor.close() database.commit() await interaction.edit_original_response(content="Creating new table...") await Moderation.create_guild_table(self.cog_instance, self.ctx.guild) await interaction.edit_original_response(content="Importing moderations...") accepted_types = [ 'NOTE', 'WARN', 'MUTE', 'UNMUTE', 'KICK', 'SOFTBAN', 'BAN', 'UNBAN' ] file = await self.ctx.message.attachments[0].read() data = sorted(json.loads(file), key=lambda x: x['case']) for case in data: if case['type'] not in accepted_types: continue timestamp = case['timestamp'] / 1000 if case['duration']: duration = timedelta(milliseconds=case['duration']) if case['resolved']: resolved = 1 resolved_by = None resolved_reason = None if case['changes']: for change in case['changes']: if change['type'] == 'RESOLVE': resolved_by = change['staff'] resolved_reason = change['reason'] resolve_timestamp = change['timestamp'] / 1000 break if resolved_by is None: resolved_by = '?' if resolved_reason is None: resolved_reason = 'Could not get resolve reason during moderation import.' changes = [ { 'type': "ORIGINAL", 'reason': case['reason'], 'user_id': case['executor'], 'timestamp': timestamp }, { 'type': "RESOLVE", 'reason': resolved_reason, 'user_id': resolved_by, 'timestamp': resolve_timestamp } ] else: resolved = 0 resolved_by = 'NULL' resolved_reason = 'NULL' changes = [] await Moderation.mysql_log(self.cog_instance, self.ctx.guild.id, case['executor'], case['type'], case['target'], 0, duration, case['reason'], timestamp=timestamp, resolved=resolved, resolved_by=resolved_by, resolved_reason=resolved_reason, changes=changes, database=database) await interaction.edit_original_response(content="Import complete.") @discord.ui.button(label="No", style=discord.ButtonStyle.danger) async def import_button_n(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument await self.message.edit("Import cancelled.", view=None) await self.message.delete(10) await self.ctx.message.delete(10) @commands.command(aliases=["tdc"]) async def timedeltaconvert(self, ctx: commands.Context, *, duration: str): """This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object. **Example usage** `[p]timedeltaconvert 1 day 15hr 82 minutes 52s` **Output** `1 day, 16:22:52`""" try: parsed_time = parse(duration, as_timedelta=True, raise_exception=True) await ctx.send(f"`{str(parsed_time)}`") except ValueError: await ctx.send("Please provide a convertible value!")