import logging
import time
from datetime import datetime, timedelta, timezone
from typing import Union

import discord
import humanize
from prisma import Prisma
from discord.ext import tasks
from pytimeparse2 import disable_dateutil, parse
from redbot.core import app_commands, checks, Config, commands
from redbot.core.app_commands import Choice


class Moderation(commands.Cog):
    """Custom moderation cog.
    Developed by SeaswimmerTheFsh."""

    # Global config keys that must be set before a database connection is attempted.
    _MYSQL_CONF_KEYS = ('mysql_address', 'mysql_database', 'mysql_username', 'mysql_password')

    # Column order of a `moderation_<guild id>` row, as created by create_guild_table().
    _CASE_FIELDS = (
        "moderation_id", "timestamp", "moderation_type", "target_id",
        "moderator_id", "duration", "end_timestamp", "reason",
        "resolved", "resolved_by", "resolve_reason", "expired",
    )

    def __init__(self, bot):
        self.bot = bot
        self.config = Config.get_conf(self, identifier=481923957134912)
        # NOTE(review): the keys registered here are named database_*, while the
        # connection code below reads mysql_* keys - confirm which naming is intended.
        self.config.register_global(
            database_provider="sqlite",
            database_address="file:moderation.db",
            database_port=" ",
            database_name="moderation",
            database_username="red",
            database_password="red",
        )
        self.config.register_guild(
            ignore_other_bots=True,
            dm_users=True,
            log_channel=" ",
        )
        disable_dateutil()
        # The logger is created before the background task is started so that it
        # is always available by the time the task body runs.
        self.logger = logging.getLogger('red.seaswimmerthefsh.moderation')
        self.handle_expiry.start()  # pylint: disable=no-member

    async def cog_load(self):
        """This method prepares the database schema for all of the guilds the bot is currently in."""
        conf = await self.check_conf(self._MYSQL_CONF_KEYS)
        if conf:
            self.logger.critical("Failed to create tables, due to MySQL connection configuration being unset.")
            return
        guilds: list[discord.Guild] = self.bot.guilds
        try:
            for guild in guilds:
                if not await self.bot.cog_disabled_in_guild(self, guild):
                    await self.create_guild_table(guild)
        except ConnectionRefusedError:
            # connect() has already logged the failure; stop at the first refused connection.
            return

    async def cog_unload(self):
        """Stops the expiry background task when the cog is unloaded."""
        self.handle_expiry.cancel()  # pylint: disable=no-member

    @commands.Cog.listener('on_guild_join')
    async def db_generate_guild_join(self, guild: discord.Guild):
        """This method prepares the database schema whenever the bot joins a guild."""
        if await self.bot.cog_disabled_in_guild(self, guild):
            return
        conf = await self.check_conf(self._MYSQL_CONF_KEYS)
        if conf:
            self.logger.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id)
            return
        try:
            await self.create_guild_table(guild)
        except ConnectionRefusedError:
            # connect() has already logged the failure.
            return

    @commands.Cog.listener('on_audit_log_entry_create')
    async def autologger(self, entry: discord.AuditLogEntry):
        """This method automatically logs moderations done by users manually ("right clicks").

        Kick, ban, unban and timeout changes found in the audit log are written to
        the guild's moderation table. Every other audit log action is ignored.
        """
        if await self.bot.cog_disabled_in_guild(self, entry.guild):
            return

        # Config.guild() expects the guild object, not its ID.
        if await self.config.guild(entry.guild).ignore_other_bots() is True:
            # The target is not guaranteed to expose `.bot`, so fall back to False.
            if getattr(entry.user, 'bot', False) or getattr(entry.target, 'bot', False):
                return
        else:
            if entry.user.id == self.bot.user.id:
                return

        duration = "NULL"

        if entry.reason:
            reason = entry.reason + " (This action was performed without the bot.)"
        else:
            reason = "This action was performed without the bot."

        if entry.action == discord.AuditLogAction.kick:
            moderation_type = 'KICK'
        elif entry.action == discord.AuditLogAction.ban:
            moderation_type = 'BAN'
        elif entry.action == discord.AuditLogAction.unban:
            moderation_type = 'UNBAN'
        elif entry.action == discord.AuditLogAction.member_update:
            # member_update also fires for changes unrelated to timeouts; those
            # entries carry no `timed_out_until` attribute and are skipped.
            if not hasattr(entry.after, 'timed_out_until'):
                return
            if entry.after.timed_out_until is not None:
                timed_out_until_aware = entry.after.timed_out_until.replace(tzinfo=timezone.utc)
                duration_datetime = timed_out_until_aware - datetime.now(tz=timezone.utc)
                # Round to whole minutes so the stored duration has no stray seconds.
                minutes = round(duration_datetime.total_seconds() / 60)
                duration = timedelta(minutes=minutes)
                moderation_type = 'MUTE'
            else:
                moderation_type = 'UNMUTE'
        else:
            return

        await self.mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, duration, reason)

    async def connect(self):
        """Connects to the MySQL database, and returns a connection object.

        Raises:
            LookupError: if any of the required connection settings is unset.
            ConnectionRefusedError: if the database rejects the connection attempt.
        """
        conf = await self.check_conf(self._MYSQL_CONF_KEYS)
        if conf:
            raise LookupError("MySQL connection details not set properly!")

        # NOTE(review): `mysql.connector` is used here, but no matching import is
        # present in the import block at the top of this file - confirm it is imported.
        try:
            connection = mysql.connector.connect(
                host=await self.config.mysql_address(),
                user=await self.config.mysql_username(),
                password=await self.config.mysql_password(),
                database=await self.config.mysql_database()
            )
            return connection
        except mysql.connector.ProgrammingError as e:
            self.logger.critical("Unable to access the MySQL database!\nError:\n%s", e.msg)
            raise ConnectionRefusedError(f"Unable to access the MySQL Database!\n{e.msg}") from e

    async def create_guild_table(self, guild: discord.Guild):
        """Creates the `moderation_<guild id>` table for a guild if it does not exist yet.

        A new table is seeded with a placeholder row (case 0) so that
        get_next_case_number() always finds a row to count from.
        """
        database = await self.connect()
        try:
            cursor = database.cursor()

            try:
                # Only existence matters here, so read a single row and drain it.
                cursor.execute(f"SELECT 1 FROM `moderation_{guild.id}` LIMIT 1")
                cursor.fetchall()
                self.logger.info("MySQL Table exists for server %s (%s)", guild.name, guild.id)

            except mysql.connector.errors.ProgrammingError:
                query = f"""
                    CREATE TABLE `moderation_{guild.id}` (
                        moderation_id INT UNIQUE PRIMARY KEY NOT NULL,
                        timestamp INT NOT NULL,
                        moderation_type LONGTEXT NOT NULL,
                        target_id LONGTEXT NOT NULL,
                        moderator_id LONGTEXT NOT NULL,
                        duration LONGTEXT,
                        end_timestamp INT,
                        reason LONGTEXT,
                        resolved BOOL NOT NULL,
                        resolved_by LONGTEXT,
                        resolve_reason LONGTEXT,
                        expired BOOL NOT NULL
                    )
                """
                cursor.execute(query)

                index_query_1 = "CREATE INDEX idx_target_id ON moderation_%s(target_id(25));"
                cursor.execute(index_query_1, (guild.id,))
                index_query_2 = "CREATE INDEX idx_moderator_id ON moderation_%s(moderator_id(25));"
                cursor.execute(index_query_2, (guild.id,))
                index_query_3 = "CREATE INDEX idx_moderation_id ON moderation_%s(moderation_id);"
                cursor.execute(index_query_3, (guild.id,))

                insert_query = f"""
                    INSERT INTO `moderation_{guild.id}`
                    (moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """
                insert_values = (0, 0, "NULL", 0, 0, "NULL", 0, "NULL", 0, "NULL", "NULL", 0)
                cursor.execute(insert_query, insert_values)

                database.commit()

                self.logger.info("MySQL Table (moderation_%s) created for %s (%s)", guild.id, guild.name, guild.id)
        finally:
            # Close the connection even if one of the statements above fails.
            database.close()

    async def check_conf(self, config: list):
        """Checks if any required config options are not set.

        Args:
            config: names of global config values to inspect.

        Returns:
            The names whose stored value is missing (None) or blank; an empty
            list means every requested option is set.
        """
        not_found_list = []
        for item in config:
            # Look the value up by its name rather than reading a key literally called "item".
            value = await getattr(self.config, item)()
            if value is None or not str(value).strip():
                not_found_list.append(item)
        return not_found_list

    def check_permissions(self, user: discord.User, permissions: list, ctx: Union[commands.Context, discord.Interaction] = None, guild: discord.Guild = None):
        """Checks if a user has a specific permission (or a list of permissions) in a channel.

        Channel permissions are used when `ctx` is given, guild-wide permissions
        when only `guild` is given.

        Returns:
            The name of the first missing permission, or False when nothing is
            missing. Administrators are never reported as missing a permission.

        Raises:
            KeyError: if neither `ctx` nor `guild` is provided.
        """
        if ctx:
            member = ctx.guild.get_member(user.id)
            resolved_permissions = ctx.channel.permissions_for(member)
        elif guild:
            member = guild.get_member(user.id)
            resolved_permissions = member.guild_permissions
        else:
            raise KeyError("Either 'ctx' or 'guild' must be provided.")

        if resolved_permissions.administrator is True:
            return False

        for permission in permissions:
            if not getattr(resolved_permissions, permission, False):
                return permission

        return False

    async def mysql_log(self, guild_id: str, author_id: str, moderation_type: str, target_id: int, duration, reason: str):
        """Inserts a new case into the guild's moderation table.

        Args:
            duration: a timedelta for timed actions, or the string "NULL" when
                the action has no duration.

        Returns:
            The case number (moderation_id) assigned to the new row.
        """
        timestamp = int(time.time())

        if duration != "NULL":
            end_timedelta = datetime.fromtimestamp(timestamp) + duration
            end_timestamp = int(end_timedelta.timestamp())
        else:
            end_timestamp = 0

        database = await self.connect()
        try:
            cursor = database.cursor()

            moderation_id = await self.get_next_case_number(guild_id=guild_id, cursor=cursor)

            sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
            val = (moderation_id, timestamp, moderation_type, target_id, author_id, duration, end_timestamp, f"{reason}", 0, "NULL", "NULL", 0)
            cursor.execute(sql, val)

            database.commit()
        finally:
            # Never leave the connection open if the insert fails.
            database.close()

        self.logger.debug("MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, 0, NULL, NULL, 0", guild_id, moderation_id, timestamp, moderation_type, target_id, author_id, duration, end_timestamp, reason)

        return moderation_id

    async def get_next_case_number(self, guild_id: str, cursor=None):
        """This method returns the next case number from the MySQL table for a specific guild.

        Args:
            cursor: an existing cursor to reuse. When omitted, a connection is
                opened for this single query and closed again before returning.
        """
        database = None
        if not cursor:
            database = await self.connect()
            cursor = database.cursor()
        try:
            cursor.execute(f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1")
            return cursor.fetchone()[0] + 1
        finally:
            # Only close what was opened here; a caller-supplied cursor stays usable.
            if database is not None:
                database.close()

    def generate_dict(self, result):
        """Maps a row of the moderation table (indexable, 12 columns) to a case dictionary."""
        case: dict = {field: result[index] for index, field in enumerate(self._CASE_FIELDS)}
        return case

    async def fetch_user_dict(self, interaction: discord.Interaction, user_id: str):
        """This method returns a dictionary containing either user information or a standard deleted user template."""
        try:
            user = await interaction.client.fetch_user(user_id)
        except discord.errors.NotFound:
            return {
                'id': user_id,
                'name': 'Deleted User',
                'discriminator': '0'
            }

        return {
            'id': user.id,
            'name': user.name,
            'discriminator': user.discriminator
        }

    @staticmethod
    def _display_name(user_dict: dict) -> str:
        """Returns `name`, or `name#discriminator` when the discriminator is not "0"."""
        if user_dict['discriminator'] == "0":
            return user_dict['name']
        return f"{user_dict['name']}#{user_dict['discriminator']}"

    @staticmethod
    def _parse_duration(duration: str) -> timedelta:
        """Converts a stored "hours:minutes:seconds" duration string to a timedelta."""
        return timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], duration.split(":"))})

    @staticmethod
    def _is_unexpired(case_dict: dict) -> bool:
        """Returns True while a case's `expired` flag is 0, whether stored as an int or a string."""
        return str(case_dict["expired"]) == '0'

    async def embed_factory(self, embed_type: str, /, interaction: discord.Interaction = None, case_dict: dict = None, guild: discord.Guild = None, reason: str = None, moderation_type: str = None, response: discord.InteractionMessage = None, duration: timedelta = None, resolved: bool = False):
        """This method creates an embed from set parameters, meant for either moderation logging or contacting the moderated user.

        Valid arguments for 'embed_type':
        - 'message'
        - 'log' - WIP
        - 'case'

        Required arguments for 'message':
        - guild
        - reason
        - moderation_type
        - response
        - duration (optional)

        Required arguments for 'log':
        - interaction
        - case_dict
        - resolved (optional)

        Required arguments for 'case':
        - interaction
        - case_dict

        Raises:
            TypeError: if 'embed_type' is not one of the values listed above.
        """
        if embed_type == 'message':
            # Actions that remove the user from the guild cannot link back to the
            # response message, so only the plain guild name is shown for them.
            if moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]:
                guild_name = guild.name
            else:
                guild_name = f"[{guild.name}]({response.jump_url})"

            if moderation_type in ["tempbanned", "muted"] and duration:
                embed_duration = f" for {humanize.precisedelta(duration)}"
            else:
                embed_duration = ""

            if moderation_type == "note":
                embed_desc = "recieved a"
            else:
                embed_desc = "been"

            embed = discord.Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=await self.bot.get_embed_color(None), timestamp=datetime.now())
            embed.add_field(name='Reason', value=f"`{reason}`")
            # A guild without an icon has `icon` set to None.
            embed.set_author(name=guild.name, icon_url=guild.icon.url if guild.icon else None)
            embed.set_footer(text=f"Case #{await self.get_next_case_number(guild.id)}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&")

            return embed

        # NOTE(review): the "**Timestamp:** | " and "... | " fragments below look as
        # if a value is missing on either side of the pipe - confirm the intended text.
        if embed_type == 'case':
            target_user = await self.fetch_user_dict(interaction, case_dict['target_id'])
            moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id'])

            target_name = f"`{self._display_name(target_user)}`"
            moderator_name = self._display_name(moderator_user)

            embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']}", color=await self.bot.get_embed_color(None))
            embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** | "

            if case_dict['duration'] != 'NULL':
                td = self._parse_duration(case_dict["duration"])
                duration_embed = f"{humanize.precisedelta(td)} | " if self._is_unexpired(case_dict) else str(humanize.precisedelta(td))
                embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"

            embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)

            if case_dict['resolved'] == 1:
                resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by'])
                resolved_name = self._display_name(resolved_user)
                embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False)

            return embed

        if embed_type == 'log':
            target_user = await self.fetch_user_dict(interaction, case_dict['target_id'])
            moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id'])

            target_name = f"`{self._display_name(target_user)}`"
            moderator_name = self._display_name(moderator_user)

            if resolved:
                embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']} Resolved", color=await self.bot.get_embed_color(None))
                embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** | "

                if case_dict['duration'] != 'NULL':
                    td = self._parse_duration(case_dict["duration"])
                    duration_embed = f"{humanize.precisedelta(td)} | " if self._is_unexpired(case_dict) else str(humanize.precisedelta(td))
                    embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"

                embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)

                resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by'])
                resolved_name = self._display_name(resolved_user)
                embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False)
            else:
                embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']}", color=await self.bot.get_embed_color(None))
                embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** | "

                if case_dict['duration'] != 'NULL':
                    td = self._parse_duration(case_dict["duration"])
                    embed.description = embed.description + f"\n**Duration:** {humanize.precisedelta(td)} | "

                embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)
            return embed

        raise TypeError("'type' argument is invalid!")

    async def fetch_case(self, moderation_id: int, guild_id: str) -> Union[dict, None]:
        """This method fetches a case from the database and returns the case's dictionary.

        Returns:
            The case dictionary, or None when no row has the given case number.
        """
        database = await self.connect()
        try:
            cursor = database.cursor()

            query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;"
            cursor.execute(query, (guild_id, moderation_id))
            result = cursor.fetchone()

            cursor.close()
        finally:
            database.close()

        if result is None:
            return None
        return self.generate_dict(result)

    async def log(self, interaction: discord.Interaction, moderation_id: int, resolved: bool = False):
        """This method sends a message to the guild's configured logging channel when an infraction takes place."""
        logging_channel_id = await self.config.guild(interaction.guild).log_channel()
        if logging_channel_id == " ":
            # " " is the registered default, meaning no log channel has been configured.
            return

        logging_channel = interaction.guild.get_channel(logging_channel_id)
        if logging_channel is None:
            # The configured channel can no longer be resolved in this guild.
            return

        case = await self.fetch_case(moderation_id, interaction.guild.id)
        if case:
            embed = await self.embed_factory('log', interaction=interaction, case_dict=case, resolved=resolved)
            try:
                await logging_channel.send(embed=embed)
            except discord.errors.Forbidden:
                # Logging is best-effort: lacking access to the channel must not fail the command.
                return

    async def _deny_if_outranked(self, interaction: discord.Interaction, target) -> bool:
        """Replies with a refusal and returns True when the target outranks the bot or the invoker.

        Users who are not members of the guild have no roles to compare, so
        they always pass this check.
        """
        target_member = interaction.guild.get_member(target.id)
        if not target_member:
            return False
        if interaction.guild.get_member(interaction.client.user.id).top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True)
            return True
        if interaction.user.top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True)
            return True
        return False

    async def _deny_if_missing_permissions(self, interaction: discord.Interaction, required: list) -> bool:
        """Replies with the first permission the bot lacks and returns True; returns False if none is missing."""
        missing = self.check_permissions(interaction.client.user, required, interaction)
        if missing:
            await interaction.response.send_message(f"I do not have the `{missing}` permission, required for this action.", ephemeral=True)
            return True
        return False

    async def _notify_target(self, interaction: discord.Interaction, target, reason: str, moderation_type: str, silent, duration: timedelta = None):
        """Sends the moderated user a DM about the action unless the action is silent.

        When `silent` is None the guild's `dm_users` setting decides. Failures to
        deliver the DM are ignored on purpose.
        """
        if silent is None:
            silent = not await self.config.guild(interaction.guild).dm_users()
        if silent is False:
            try:
                embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type=moderation_type, response=await interaction.original_response(), duration=duration)
                await target.send(embed=embed)
            except discord.errors.HTTPException:
                # Best-effort only: the user may have DMs closed or have blocked the bot.
                pass

    @app_commands.command(name="note")
    async def note(self, interaction: discord.Interaction, target: discord.User, reason: str, silent: bool = None):
        """Add a note to a user.

        Parameters
        -----------
        target: discord.User
            Who are you noting?
        reason: str
            Why are you noting this user?
        silent: bool
            Should the user be messaged?"""
        if await self._deny_if_outranked(interaction, target):
            return

        await interaction.response.send_message(content=f"{target.mention} has recieved a note!\n**Reason** - `{reason}`")

        await self._notify_target(interaction, target, reason, 'note', silent)

        moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', target.id, 'NULL', reason)
        await self.log(interaction, moderation_id)

    @app_commands.command(name="warn")
    async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
        """Warn a user.

        Parameters
        -----------
        target: discord.Member
            Who are you warning?
        reason: str
            Why are you warning this user?
        silent: bool
            Should the user be messaged?"""
        if await self._deny_if_outranked(interaction, target):
            return

        await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`")

        await self._notify_target(interaction, target, reason, 'warned', silent)

        moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'WARN', target.id, 'NULL', reason)
        await self.log(interaction, moderation_id)

    @app_commands.command(name="mute")
    async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str, silent: bool = None):
        """Mute a user.

        Parameters
        -----------
        target: discord.Member
            Who are you muting?
        duration: str
            How long are you muting this user for?
        reason: str
            Why are you muting this user?
        silent: bool
            Should the user be messaged?"""
        if await self._deny_if_outranked(interaction, target):
            return

        if await self._deny_if_missing_permissions(interaction, ['moderate_members']):
            return

        if target.is_timed_out() is True:
            await interaction.response.send_message(f"{target.mention} is already muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
            return

        try:
            parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
        except ValueError:
            await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
            return

        # Reject anything longer than the 28 days named in the message below.
        if parsed_time > timedelta(days=28):
            await interaction.response.send_message("Please provide a duration that is less than 28 days.")
            return

        await target.timeout(parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}")

        await interaction.response.send_message(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")

        await self._notify_target(interaction, target, reason, 'muted', silent, duration=parsed_time)

        moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', target.id, parsed_time, reason)
        await self.log(interaction, moderation_id)

    @app_commands.command(name="unmute")
    async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None, silent: bool = None):
        """Unmute a user.

        Parameters
        -----------
        target: discord.user
            Who are you unmuting?
        reason: str
            Why are you unmuting this user?
        silent: bool
            Should the user be messaged?"""
        if await self._deny_if_outranked(interaction, target):
            return

        if await self._deny_if_missing_permissions(interaction, ['moderate_members']):
            return

        if target.is_timed_out() is False:
            await interaction.response.send_message(f"{target.mention} is not muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
            return

        if reason:
            await target.timeout(None, reason=f"Unmuted by {interaction.user.id} for: {reason}")
        else:
            await target.timeout(None, reason=f"Unmuted by {interaction.user.id}")
            reason = "No reason given."

        await interaction.response.send_message(content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`")

        await self._notify_target(interaction, target, reason, 'unmuted', silent)

        moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', target.id, 'NULL', reason)
        await self.log(interaction, moderation_id)

    @app_commands.command(name="kick")
    async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
        """Kick a user.

        Parameters
        -----------
        target: discord.user
            Who are you kicking?
        reason: str
            Why are you kicking this user?
        silent: bool
            Should the user be messaged?"""
        if await self._deny_if_outranked(interaction, target):
            return

        if await self._deny_if_missing_permissions(interaction, ['kick_members']):
            return

        await interaction.response.send_message(content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`")

        # The DM is attempted before the kick itself.
        await self._notify_target(interaction, target, reason, 'kicked', silent)

        # `reason` is keyword-only on Member.kick().
        await target.kick(reason=f"Kicked by {interaction.user.id} for: {reason}")

        moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'KICK', target.id, 'NULL', reason)
        await self.log(interaction, moderation_id)

    @app_commands.command(name="ban")
    @app_commands.choices(delete_messages=[
        Choice(name="None", value=0),
        Choice(name='1 Hour', value=3600),
        Choice(name='12 Hours', value=43200),
        Choice(name='1 Day', value=86400),
        Choice(name='3 Days', value=259200),
        Choice(name='7 Days', value=604800),
    ])
    async def ban(self, interaction: discord.Interaction, target: discord.User, reason: str, duration: str = None, delete_messages: Choice[int] = 0, silent: bool = None):
        """Ban a user.

        Parameters
        -----------
        target: discord.user
            Who are you banning?
        duration: str
            How long are you banning this user for?
        reason: str
            Why are you banning this user?
        delete_messages: Choices[int]
            How many days of messages to delete?
        silent: bool
            Should the user be messaged?"""
        if await self._deny_if_outranked(interaction, target):
            return

        if await self._deny_if_missing_permissions(interaction, ['ban_members']):
            return

        # Only the ban lookup sits in the try body, so a NotFound raised while
        # replying is not mistaken for "user is not banned".
        try:
            await interaction.guild.fetch_ban(target)
        except discord.errors.NotFound:
            pass
        else:
            await interaction.response.send_message(content=f"{target.mention} is already banned!", ephemeral=True)
            return

        # A selected choice arrives as a Choice object; the default is the plain int 0.
        delete_seconds = delete_messages.value if isinstance(delete_messages, Choice) else delete_messages

        if duration:
            try:
                parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
            except ValueError:
                await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
                return

            await interaction.response.send_message(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")

            await self._notify_target(interaction, target, reason, 'tempbanned', silent, duration=parsed_time)

            await interaction.guild.ban(target, reason=f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})", delete_message_seconds=delete_seconds)

            moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'TEMPBAN', target.id, parsed_time, reason)
        else:
            await interaction.response.send_message(content=f"{target.mention} has been banned!\n**Reason** - `{reason}`")

            await self._notify_target(interaction, target, reason, 'banned', silent)

            await interaction.guild.ban(target, reason=f"Banned by {interaction.user.id} for: {reason}", delete_message_seconds=delete_seconds)

            moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'BAN', target.id, 'NULL', reason)

        # Both permanent and temporary bans are reported to the log channel.
        await self.log(interaction, moderation_id)

    @app_commands.command(name="unban")
    async def unban(self, interaction: discord.Interaction, target: discord.User, reason: str = None, silent: bool = None):
        """Unban a user.

        Parameters
        -----------
        target: discord.user
            Who are you unbanning?
        reason: str
            Why are you unbanning this user?
        silent: bool
            Should the user be messaged?"""
        if await self._deny_if_outranked(interaction, target):
            return

        if await self._deny_if_missing_permissions(interaction, ['ban_members']):
            return

        try:
            await interaction.guild.fetch_ban(target)
        except discord.errors.NotFound:
            await interaction.response.send_message(content=f"{target.mention} is not banned!", ephemeral=True)
            return

        if reason:
            await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id} for: {reason}")
        else:
            await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id}")
            reason = "No reason given."

        await interaction.response.send_message(content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`")

        await self._notify_target(interaction, target, reason, 'unbanned', silent)

        moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNBAN', target.id, 'NULL', reason)
        await self.log(interaction, moderation_id)

    @app_commands.command(name="history")
    async def history(self, interaction: discord.Interaction, target: discord.User = None, moderator: discord.User = None, pagesize: app_commands.Range[int, 1, 25] = 5, page: int = 1, epheremal: bool = False):
        """List previous infractions.

        Parameters
        -----------
        target: discord.User
            User whose infractions to query, overrides moderator if both are given
        moderator: discord.User
            Query by moderator
        pagesize: app_commands.Range[int, 1, 25]
            Amount of infractions to list per page
        page: int
            Page to select
        epheremal: bool
            Hide the command response"""
        if await self._deny_if_missing_permissions(interaction, ['embed_links']):
            return

        database = await self.connect()
        try:
            cursor = database.cursor()

            if target:
                query = """SELECT * FROM moderation_%s WHERE target_id = %s ORDER BY moderation_id DESC;"""
                cursor.execute(query, (interaction.guild.id, target.id))
            elif moderator:
                query = """SELECT * FROM moderation_%s WHERE moderator_id = %s ORDER BY moderation_id DESC;"""
                cursor.execute(query, (interaction.guild.id, moderator.id))
            else:
                query = """SELECT * FROM moderation_%s ORDER BY moderation_id DESC;"""
                cursor.execute(query, (interaction.guild.id,))

            results = cursor.fetchall()
        finally:
            # The rows are fully fetched, so the connection is no longer needed.
            database.close()

        result_dict_list = [self.generate_dict(result) for result in results]

        if target or moderator:
            case_quantity = len(result_dict_list)
        else:
            case_quantity = len(result_dict_list) - 1  # account for case 0 technically existing

        # Ceiling division: a partially filled last page still counts as a page.
        page_quantity = (case_quantity + pagesize - 1) // pagesize
        start_index = (page - 1) * pagesize
        end_index = page * pagesize

        embed = discord.Embed(color=await self.bot.get_embed_color(None))
        # A guild without an icon has `icon` set to None.
        embed.set_author(icon_url=interaction.guild.icon.url if interaction.guild.icon else None, name='Infraction History')
        embed.set_footer(text=f"Page {page}/{page_quantity} | {case_quantity} Results")

        for case in result_dict_list[start_index:end_index]:
            # Case 0 is the placeholder row inserted when the table is created.
            if str(case['moderation_id']) == '0':
                continue

            target_user = await self.fetch_user_dict(interaction, case['target_id'])
            moderator_user = await self.fetch_user_dict(interaction, case['moderator_id'])

            target_name = f"`{self._display_name(target_user)}`"
            moderator_name = self._display_name(moderator_user)

            field_name = f"Case #{case['moderation_id']} ({str.title(case['moderation_type'])})"
            field_value = f"**Target:** `{target_name}` ({target_user['id']})\n**Moderator:** `{moderator_name}` ({moderator_user['id']})\n**Reason:** `{str(case['reason'])[:150]}`"

            if case['duration'] != 'NULL':
                td = self._parse_duration(case["duration"])
                duration_embed = f"{humanize.precisedelta(td)} | " if self._is_unexpired(case) else f"{humanize.precisedelta(td)} | Expired"
                field_value = field_value + f"\n**Duration:** {duration_embed}"

            if bool(case['resolved']):
                field_value = field_value + "\n**Resolved:** True"

            embed.add_field(name=field_name, value=field_value, inline=False)

        await interaction.response.send_message(embed=embed, ephemeral=epheremal)

    @app_commands.command(name="resolve")
    async def resolve(self, interaction: discord.Interaction, case_number: int, reason: str = None):
        """Resolve a specific case.

        Parameters
        -----------
        case_number: int
            Case number of the case you're trying to resolve
        reason: str
            Reason for resolving case"""
        permissions = self.check_permissions(interaction.client.user, ['embed_links', 'moderate_members', 'ban_members'], interaction)
        if permissions:
            await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
            return

        conf = await self.check_conf(['mysql_database'])
        if conf:
            raise(LookupError)

        database = await self.connect()
        cursor = database.cursor()
        db = await self.config.mysql_database()

        query_1 = "SELECT * FROM moderation_%s WHERE moderation_id = %s;"
        cursor.execute(query_1, (interaction.guild.id, case_number))
        result_1 = cursor.fetchone()
        if result_1 is None or case_number == 0:
            await interaction.response.send_message(content=f"There is no moderation with a case number of {case_number}.", ephemeral=True)
            return

        query_2 = "SELECT * FROM moderation_%s WHERE moderation_id = %s AND resolved = 0;"
        cursor.execute(query_2, (interaction.guild.id, case_number))
        result_2 = cursor.fetchone()
        if result_2 is None:
            await interaction.response.send_message(content=f"This moderation has already been resolved!\nUse `/case {case_number}` for more information.", ephemeral=True)
            return

        case = self.generate_dict(result_2)
        if reason is None:
            reason = "No reason given."
if case['moderation_type'] in ['UNMUTE', 'UNBAN']: await interaction.response.send_message(content="You cannot resolve this type of moderation!", ephemeral=True) if case['moderation_type'] in ['MUTE', 'TEMPBAN', 'BAN']: if case['moderation_type'] == 'MUTE': try: member = await interaction.guild.fetch_member(case['target_id']) await member.timeout(None, reason=f"Case #{case_number} resolved by {interaction.user.id}") except discord.NotFound: pass if case['moderation_type'] in ['TEMPBAN', 'BAN']: try: user = await interaction.client.fetch_user(case['target_id']) await interaction.guild.unban(user, reason=f"Case #{case_number} resolved by {interaction.user.id}") except discord.NotFound: pass resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, expired = 1, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s" else: resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s" cursor.execute(resolve_query, (interaction.user.id, reason, case_number)) database.commit() response_query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;" cursor.execute(response_query, (interaction.guild.id, case_number)) result = cursor.fetchone() case_dict = self.generate_dict(result) embed = await self.embed_factory('case', interaction=interaction, case_dict=case_dict) await interaction.response.send_message(content=f"✅ Moderation #{case_number} resolved!", embed=embed) await self.log(interaction, case_number, True) cursor.close() database.close() @app_commands.command(name="case") async def case(self, interaction: discord.Interaction, case_number: int, ephemeral: bool = False): """Check the details of a specific case. Parameters ----------- case_number: int What case are you looking up? 
ephemeral: bool Hide the command response""" permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return if case_number != 0: case = await self.fetch_case(case_number, interaction.guild.id) if case: embed = await self.embed_factory('case', interaction=interaction, case_dict=case) await interaction.response.send_message(embed=embed, ephemeral=ephemeral) return await interaction.response.send_message(content=f"No case with case number `{case_number}` found.", ephemeral=True) @tasks.loop(minutes=1) async def handle_expiry(self): conf = await self.check_conf(['mysql_database']) if conf: raise(LookupError) database = await self.connect() cursor = database.cursor() db = await self.config.mysql_database() guilds: list[discord.Guild] = self.bot.guilds for guild in guilds: if not await self.bot.cog_disabled_in_guild(self, guild): tempban_query = f"SELECT target_id, moderation_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= %s AND moderation_type = 'TEMPBAN' AND expired = 0" try: cursor.execute(tempban_query, (time.time(),)) result = cursor.fetchall() except mysql.connector.errors.ProgrammingError: continue target_ids = [row[0] for row in result] moderation_ids = [row[1] for row in result] for target_id, moderation_id in zip(target_ids, moderation_ids): user: discord.User = await self.bot.fetch_user(target_id) await guild.unban(user, reason=f"Automatic unban from case #{moderation_id}") embed = await self.embed_factory('message', guild, f'Automatic unban from case #{moderation_id}', 'unbanned') try: await user.send(embed=embed) except discord.errors.HTTPException: pass expiry_query = f"UPDATE `{db}`.`moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp != 0 AND end_timestamp <= %s AND expired = 0) OR (expired = 0 AND resolved = 1)" 
cursor.execute(expiry_query, (time.time(),)) database.commit() cursor.close() database.close() @commands.group(autohelp=True) @checks.admin() async def moderationset(self, ctx: commands.Context): """Manage moderation commands.""" @moderationset.command(name="ignorebots") @checks.admin() async def moderationset_ignorebots(self, ctx: commands.Context): """Toggle if the cog should ignore other bots' moderations.""" await self.config.guild(ctx.guild).ignore_other_bots.set(not await self.config.guild(ctx.guild).ignore_other_bots()) await ctx.send(f"Ignore bots setting set to {await self.config.guild(ctx.guild).ignore_other_bots()}") @moderationset.command(name="dm") @checks.admin() async def moderationset_dm(self, ctx: commands.Context): """Toggle automatically messaging moderated users. This option can be overridden by specifying the `silent` argument in any moderation command.""" await self.config.guild(ctx.guild).dm_users.set(not await self.config.guild(ctx.guild).dm_users()) await ctx.send(f"DM users setting set to {await self.config.guild(ctx.guild).dm_users()}") @moderationset.command(name="logchannel") @checks.admin() async def moderationset_logchannel(self, ctx: commands.Context, channel: discord.TextChannel = None): """Set a channel to log infractions to.""" if channel: await self.config.guild(ctx.guild).log_channel.set(channel.id) await ctx.send(f"Logging channel set to {channel.mention}.") else: await self.config.guild(ctx.guild).log_channel.set(" ") await ctx.send("Logging channel disabled.") @moderationset.command(name="mysql") @checks.is_owner() async def moderationset_mysql(self, ctx: commands.Context): """Configure MySQL connection details.""" await ctx.message.add_reaction("✅") await ctx.author.send(content="Click the button below to configure your MySQL connection details.", view=self.ConfigButtons(60)) class ConfigButtons(discord.ui.View): def __init__(self, timeout): super().__init__() self.config = Config.get_conf(None, cog_name='Moderation', 
identifier=481923957134912) @discord.ui.button(label="Edit", style=discord.ButtonStyle.success) async def config_button(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config)) class MySQLConfigModal(discord.ui.Modal, title="MySQL Database Configuration"): def __init__(self, config): super().__init__() self.config = config address = discord.ui.TextInput( label="Address", placeholder="Input your MySQL address here.", style=discord.TextStyle.short, required=False, max_length=300 ) database = discord.ui.TextInput( label="Database", placeholder="Input the name of your database here.", style=discord.TextStyle.short, required=False, max_length=300 ) username = discord.ui.TextInput( label="Username", placeholder="Input your MySQL username here.", style=discord.TextStyle.short, required=False, max_length=300 ) password = discord.ui.TextInput( label="Password", placeholder="Input your MySQL password here.", style=discord.TextStyle.short, required=False, max_length=300 ) async def on_submit(self, interaction: discord.Interaction): message = "" if self.address.value != "": await self.config.mysql_address.set(self.address.value) message += f"- Address set to\n - `{self.address.value}`\n" if self.database.value != "": await self.config.mysql_database.set(self.database.value) message += f"- Database set to\n - `{self.database.value}`\n" if self.username.value != "": await self.config.mysql_username.set(self.username.value) message += f"- Username set to\n - `{self.username.value}`\n" if self.password.value != "": await self.config.mysql_password.set(self.password.value) trimmed_password = self.password.value[:8] message += f"- Password set to\n - `{trimmed_password}` - Trimmed for security\n" if message == "": trimmed_password = str(await self.config.mysql_password())[:8] send = f"No changes were made.\nCurrent configuration:\n- Address:\n - `{await 
self.config.mysql_address()}`\n- Database:\n - `{await self.config.mysql_database()}`\n- Username:\n - `{await self.config.mysql_username()}`\n- Password:\n - `{trimmed_password}` - Trimmed for security" else: send = f"Configuration changed:\n{message}" await interaction.response.send_message(send, ephemeral=True) @commands.command(aliases=["tdc"]) async def timedeltaconvert(self, ctx: commands.Context, *, duration: str): """This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object. **Example usage** `[p]timedeltaconvert 1 day 15hr 82 minutes 52s` **Output** `1 day, 16:22:52`""" try: parsed_time = parse(duration, as_timedelta=True, raise_exception=True) await ctx.send(f"`{str(parsed_time)}`") except ValueError: await ctx.send("Please provide a convertible value!")