From 75109e0834d8ff71543aac9c1e4e87236bbb8095 Mon Sep 17 00:00:00 2001 From: SeaswimmerTheFsh Date: Sun, 17 Dec 2023 02:16:44 -0500 Subject: [PATCH] misc(moderation): moved a bunch of functionality to other files to reduce the length of the main file and improve code maintainability --- moderation/config.py | 3 + moderation/database.py | 221 ++++++++++++++ moderation/embed_factory.py | 152 ++++++++++ moderation/logger.py | 3 + moderation/moderation.py | 590 +++++------------------------------- moderation/utils.py | 198 ++++++++++++ 6 files changed, 652 insertions(+), 515 deletions(-) create mode 100644 moderation/config.py create mode 100644 moderation/database.py create mode 100644 moderation/embed_factory.py create mode 100644 moderation/logger.py create mode 100644 moderation/utils.py diff --git a/moderation/config.py b/moderation/config.py new file mode 100644 index 0000000..7571fa8 --- /dev/null +++ b/moderation/config.py @@ -0,0 +1,3 @@ +from Redbot.core import Config + +config = Config.get_conf(None, identifier=481923957134912, cog_name="Moderation") diff --git a/moderation/database.py b/moderation/database.py new file mode 100644 index 0000000..68ef0b3 --- /dev/null +++ b/moderation/database.py @@ -0,0 +1,221 @@ +import json +import time +import mysql.connector +from datetime import datetime +from discord import Guild +from .utils import generate_dict, get_next_case_number, check_conf +from .config import config +from .logger import logger + + +async def connect(): + """Connects to the MySQL database, and returns a connection object.""" + conf = await check_conf( + ["mysql_address", "mysql_database", "mysql_username", "mysql_password"] + ) + + if conf: + raise LookupError("MySQL connection details not set properly!") + + try: + connection = mysql.connector.connect( + host=await config.mysql_address(), + user=await config.mysql_username(), + password=await config.mysql_password(), + database=await config.mysql_database(), + ) + + return connection + + except mysql.connector.ProgrammingError as e: + logger.error("Unable to access the MySQL database!\nError:\n%s", e.msg) + raise ConnectionRefusedError( + f"Unable to access the MySQL Database!\n{e.msg}" + ) from e + + +async def create_guild_table(guild: Guild): + database = await connect() + cursor = database.cursor() + + try: + cursor.execute(f"SELECT * FROM `moderation_{guild.id}`") + logger.debug("MySQL Table exists for server %s (%s)", guild.name, guild.id) + + except mysql.connector.errors.ProgrammingError: + query = f""" + CREATE TABLE `moderation_{guild.id}` ( + moderation_id INT UNIQUE PRIMARY KEY NOT NULL, + timestamp INT NOT NULL, + moderation_type LONGTEXT NOT NULL, + target_id LONGTEXT NOT NULL, + moderator_id LONGTEXT NOT NULL, + role_id LONGTEXT, + duration LONGTEXT, + end_timestamp INT, + reason LONGTEXT, + resolved BOOL NOT NULL, + resolved_by LONGTEXT, + resolve_reason LONGTEXT, + expired BOOL NOT NULL, + changes JSON NOT NULL, + metadata JSON NOT NULL + ) + """ + cursor.execute(query) + + index_query_1 = "CREATE INDEX idx_target_id ON moderation_%s(target_id(25));" + cursor.execute(index_query_1, (guild.id,)) + index_query_2 = ( + "CREATE INDEX idx_moderator_id ON moderation_%s(moderator_id(25));" + ) + cursor.execute(index_query_2, (guild.id,)) + index_query_3 = ( + "CREATE INDEX idx_moderation_id ON moderation_%s(moderation_id);" + ) + cursor.execute(index_query_3, (guild.id,)) + + insert_query = f""" + INSERT INTO `moderation_{guild.id}` + (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + """ + insert_values = ( + 0, + 0, + "NULL", + 0, + 0, + 0, + "NULL", + 0, + "NULL", + 0, + "NULL", + "NULL", + 0, + json.dumps([]), + json.dumps({}), + ) + cursor.execute(insert_query, insert_values) + + database.commit() + + logger.debug( + "MySQL Table (moderation_%s) created for %s (%s)", + guild.id, + guild.name, + guild.id, + ) + + database.close() + + +async def mysql_log( + guild_id: str, + author_id: str, + moderation_type: str, + target_id: int, + role_id: int, + duration, + reason: str, + database: mysql.connector.MySQLConnection = None, + timestamp: int = None, + resolved: bool = False, + resolved_by: str = None, + resolved_reason: str = None, + expired: bool = None, + changes: list = [], + metadata: dict = {}, +): # pylint: disable=dangerous-default-argument + if not timestamp: + timestamp = int(time.time()) + + if duration != "NULL": + end_timedelta = datetime.fromtimestamp(timestamp) + duration + end_timestamp = int(end_timedelta.timestamp()) + else: + end_timestamp = 0 + + if not expired: + if int(time.time()) > end_timestamp: + expired = 1 + else: + expired = 0 + + if resolved_by is None: + resolved_by = "NULL" + + if resolved_reason is None: + resolved_reason = "NULL" + + if not database: + database = await connect() + close_db = True + else: + close_db = False + cursor = database.cursor() + + moderation_id = await get_next_case_number(guild_id=guild_id, cursor=cursor) + + sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" + val = ( + moderation_id, + timestamp, + moderation_type, + target_id, + author_id, + role_id, + duration, + end_timestamp, + reason, + int(resolved), + resolved_by, + resolved_reason, + expired, + json.dumps(changes), + json.dumps(metadata), + ) + cursor.execute(sql, val) + + cursor.close() + database.commit() + if close_db: + database.close() + + logger.debug( + "MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s", + guild_id, + moderation_id, + timestamp, + moderation_type, + target_id, + author_id, + role_id, + duration, + end_timestamp, + reason, + int(resolved), + resolved_by, + resolved_reason, + expired, + changes, + metadata, + ) + + return moderation_id + + +async def fetch_case(moderation_id: int, guild_id: str): + """This method fetches a case from the database and returns the case's dictionary.""" + database = await connect() + cursor = database.cursor() + + query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;" + cursor.execute(query, (guild_id, moderation_id)) + result = cursor.fetchone() + + cursor.close() + database.close() + + return generate_dict(result) diff --git a/moderation/embed_factory.py b/moderation/embed_factory.py new file mode 100644 index 0000000..69fac49 --- /dev/null +++ b/moderation/embed_factory.py @@ -0,0 +1,152 @@ +import humanize +from datetime import datetime, timedelta +from discord import Color, Embed, Guild, Interaction, InteractionMessage +from .utils import get_next_case_number, fetch_user_dict + +async def embed_factory(embed_type: str, color: Color, /, interaction: Interaction = None, case_dict: dict = None, guild: Guild = None, reason: str = None, moderation_type: str = None, response: InteractionMessage = None, duration: timedelta = None, resolved: bool = False): + """This method creates an embed from set parameters, meant for either moderation logging or contacting the moderated user. + + Valid arguments for 'embed_type': + - 'message' + - 'log' - WIP + - 'case' + - 'changes' + + Required arguments for 'message': + - guild + - reason + - moderation_type + - response + - duration (optional) + + Required arguments for 'log': + - interaction + - case_dict + - resolved (optional) + + Required arguments for 'case' & 'changes': + - interaction + - case_dict""" + if embed_type == 'message': + + if moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]: + guild_name = guild.name + else: + guild_name = f"[{guild.name}]({response.jump_url})" + + if moderation_type in ["tempbanned", "muted"] and duration: + embed_duration = f" for {humanize.precisedelta(duration)}" + else: + embed_duration = "" + + if moderation_type == "note": + embed_desc = "received a" + else: + embed_desc = "been" + + embed = Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=color, timestamp=datetime.now()) + embed.add_field(name='Reason', value=f"`{reason}`") + embed.set_author(name=guild.name, icon_url=guild.icon.url) + embed.set_footer(text=f"Case #{await get_next_case_number(guild.id):,}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&") + + return embed + + if embed_type == 'case': + target_user = await fetch_user_dict(interaction, case_dict['target_id']) + moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id']) + + target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" + moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" + + embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=color) + embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** | " + + if case_dict['duration'] != 'NULL': + td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) + duration_embed = f"{humanize.precisedelta(td)} | " if bool(case_dict['expired']) is False else str(humanize.precisedelta(td)) + embed.description += f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}" + + embed.description += f"\n**Changes:** {len(case_dict['changes'])}" + + if case_dict['metadata']: + if case_dict['metadata']['imported_from']: + embed.description += f"\n**Imported From:** {case_dict['metadata']['imported_from']}" + + embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False) + + if case_dict['resolved'] == 1: + resolved_user = await fetch_user_dict(interaction, case_dict['resolved_by']) + resolved_name = f"`{resolved_user['name']}`" if resolved_user['discriminator'] == "0" else f"`{resolved_user['name']}#{resolved_user['discriminator']}`" + embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False) + + return embed + + if embed_type == 'changes': + embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Changes", color=color) + + memory_dict = {} + + if case_dict['changes']: + for change in case_dict['changes']: + if change['user_id'] not in memory_dict: + memory_dict[str(change['user_id'])] = await fetch_user_dict(interaction, change['user_id']) + + user = memory_dict[str(change['user_id'])] + name = user['name'] if user['discriminator'] == "0" else f"{user['name']}#{user['discriminator']}" + + timestamp = f" | " + + if change['type'] == 'ORIGINAL': + embed.add_field(name='Original', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) + + elif change['type'] == 'EDIT': + embed.add_field(name='Edit', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) + + elif change['type'] == 'RESOLVE': + embed.add_field(name='Resolve', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) + + else: + embed.description = "*No changes have been made to this case.* 🙁" + + return embed + + if embed_type == 'log': + if resolved: + target_user = await fetch_user_dict(interaction, case_dict['target_id']) + moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id']) + + target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" + moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}" + + embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Resolved", color=color) + + embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** | " + + if case_dict['duration'] != 'NULL': + td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) + duration_embed = f"{humanize.precisedelta(td)} | " if case_dict["expired"] == '0' else str(humanize.precisedelta(td)) + embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}" + + embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False) + + resolved_user = await fetch_user_dict(interaction, case_dict['resolved_by']) + resolved_name = resolved_user['name'] if resolved_user['discriminator'] == "0" else f"{resolved_user['name']}#{resolved_user['discriminator']}" + embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False) + else: + target_user = await fetch_user_dict(interaction, case_dict['target_id']) + moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id']) + + target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" + moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}" + + embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=color) + embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** | " + + if case_dict['duration'] != 'NULL': + td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) + embed.description = embed.description + f"\n**Duration:** {humanize.precisedelta(td)} | " + + embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False) + return embed + + raise(TypeError("'type' argument is invalid!")) diff --git a/moderation/logger.py b/moderation/logger.py new file mode 100644 index 0000000..0357c60 --- /dev/null +++ b/moderation/logger.py @@ -0,0 +1,3 @@ +import logging + +logger = logging.getLogger("red.sea.moderation") diff --git a/moderation/moderation.py b/moderation/moderation.py index daf5da1..f5c1614 100644 --- a/moderation/moderation.py +++ b/moderation/moderation.py @@ -5,12 +5,10 @@ # ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ | # |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_| -import logging import json import time import os from datetime import datetime, timedelta, timezone -from typing import Union import discord import humanize import mysql.connector @@ -18,6 +16,10 @@ from discord.ext import tasks from pytimeparse2 import disable_dateutil, parse from redbot.core import app_commands, checks, Config, commands, data_manager from redbot.core.app_commands import Choice +from .database import connect, create_guild_table, fetch_case, mysql_log +from .embed_factory import embed_factory +from .utils import check_conf, check_permissions, check_moddable, fetch_user_dict, generate_dict, log +from .logger import logger class Moderation(commands.Cog): """Custom moderation cog. @@ -27,7 +29,7 @@ class Moderation(commands.Cog): if requester == "discord_deleted_user": await self.config.user_from_id(user_id).clear() - database = await self.connect() + database = await connect() cursor = database.cursor() cursor.execute("SHOW TABLES;") @@ -49,7 +51,7 @@ class Moderation(commands.Cog): if requester == "user_strict": await self.config.user_from_id(user_id).clear() else: - self.logger.warning("Invalid requester passed to red_delete_data_for_user: %s", requester) + logger.warning("Invalid requester passed to red_delete_data_for_user: %s", requester) def __init__(self, bot): self.bot = bot @@ -80,11 +82,10 @@ class Moderation(commands.Cog): ) disable_dateutil() self.handle_expiry.start() # pylint: disable=no-member - self.logger = logging.getLogger('red.sea.moderation') async def cog_load(self): """This method prepares the database schema for all of the guilds the bot is currently in.""" - conf = await self.check_conf([ + conf = await check_conf([ 'mysql_address', 'mysql_database', 'mysql_username', @@ -92,7 +93,7 @@ class Moderation(commands.Cog): ]) if conf: - self.logger.error("Failed to create tables, due to MySQL connection configuration being unset.") + logger.error("Failed to create tables, due to MySQL connection configuration being unset.") return guilds: list[discord.Guild] = self.bot.guilds @@ -112,7 +113,7 @@ class Moderation(commands.Cog): async def db_generate_guild_join(self, guild: discord.Guild): """This method prepares the database schema whenever the bot joins a guild.""" if not await self.bot.cog_disabled_in_guild(self, guild): - conf = await self.check_conf([ + conf = await check_conf([ 'mysql_address', 'mysql_database', 'mysql_username', @@ -120,11 +121,11 @@ class Moderation(commands.Cog): ]) if conf: - self.logger.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id) + logger.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id) return try: - await self.create_guild_table(guild) + await create_guild_table(guild) except ConnectionRefusedError: return @@ -169,446 +170,7 @@ class Moderation(commands.Cog): else: return - await self.mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, 0, duration, reason) - - async def connect(self): - """Connects to the MySQL database, and returns a connection object.""" - conf = await self.check_conf([ - 'mysql_address', - 'mysql_database', - 'mysql_username', - 'mysql_password' - ]) - - if conf: - raise LookupError("MySQL connection details not set properly!") - - try: - connection = mysql.connector.connect( - host=await self.config.mysql_address(), - user=await self.config.mysql_username(), - password=await self.config.mysql_password(), - database=await self.config.mysql_database() - ) - - return connection - - except mysql.connector.ProgrammingError as e: - self.logger.error("Unable to access the MySQL database!\nError:\n%s", e.msg) - raise ConnectionRefusedError(f"Unable to access the MySQL Database!\n{e.msg}") from e - - async def create_guild_table(self, guild: discord.Guild): - database = await self.connect() - cursor = database.cursor() - - try: - cursor.execute(f"SELECT * FROM `moderation_{guild.id}`") - self.logger.debug("MySQL Table exists for server %s (%s)", guild.name, guild.id) - - except mysql.connector.errors.ProgrammingError: - query = f""" - CREATE TABLE `moderation_{guild.id}` ( - moderation_id INT UNIQUE PRIMARY KEY NOT NULL, - timestamp INT NOT NULL, - moderation_type LONGTEXT NOT NULL, - target_id LONGTEXT NOT NULL, - moderator_id LONGTEXT NOT NULL, - role_id LONGTEXT, - duration LONGTEXT, - end_timestamp INT, - reason LONGTEXT, - resolved BOOL NOT NULL, - resolved_by LONGTEXT, - resolve_reason LONGTEXT, - expired BOOL NOT NULL, - changes JSON NOT NULL, - metadata JSON NOT NULL - ) - """ - cursor.execute(query) - - index_query_1 = "CREATE INDEX idx_target_id ON moderation_%s(target_id(25));" - cursor.execute(index_query_1, (guild.id,)) - index_query_2 = "CREATE INDEX idx_moderator_id ON moderation_%s(moderator_id(25));" - cursor.execute(index_query_2, (guild.id,)) - index_query_3 = "CREATE INDEX idx_moderation_id ON moderation_%s(moderation_id);" - cursor.execute(index_query_3, (guild.id,)) - - insert_query = f""" - INSERT INTO `moderation_{guild.id}` - (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - """ - insert_values = (0, 0, "NULL", 0, 0, 0, "NULL", 0, "NULL", 0, "NULL", "NULL", 0, json.dumps([]), json.dumps({})) - cursor.execute(insert_query, insert_values) - - database.commit() - - self.logger.debug("MySQL Table (moderation_%s) created for %s (%s)", guild.id, guild.name, guild.id) - - database.close() - - async def check_conf(self, config: list): - """Checks if any required config options are not set.""" - not_found_list = [] - - for item in config: - if await self.config.item() == " ": - not_found_list.append(item) - - return not_found_list - - def check_permissions(self, user: discord.User, permissions: list, ctx: Union[commands.Context, discord.Interaction] = None, guild: discord.Guild = None): - """Checks if a user has a specific permission (or a list of permissions) in a channel.""" - if ctx: - member = ctx.guild.get_member(user.id) - resolved_permissions = ctx.channel.permissions_for(member) - - elif guild: - member = guild.get_member(user.id) - resolved_permissions = member.guild_permissions - - else: - raise(KeyError) - - for permission in permissions: - if not getattr(resolved_permissions, permission, False) and not resolved_permissions.administrator is True: - return permission - - return False - - async def check_moddable(self, target: Union[discord.User, discord.Member], interaction: discord.Interaction, permissions: list): - """Checks if a moderator can moderate a target.""" - if self.check_permissions(interaction.client.user, permissions, guild=interaction.guild): - await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) - return False - - if await self.config.guild(interaction.guild).use_discord_permissions() is True: - if self.check_permissions(interaction.user, permissions, guild=interaction.guild): - await interaction.response.send_message(f"You do not have the `{permissions}` permission, required for this action.", ephemeral=True) - return False - - if interaction.user.id == target.id: - await interaction.response.send_message(content="You cannot moderate yourself!", ephemeral=True) - return False - - if target.bot: - await interaction.response.send_message(content="You cannot moderate bots!", ephemeral=True) - return False - - if isinstance(target, discord.Member): - if interaction.user.top_role <= target.top_role: - await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True) - return False - - if interaction.guild.get_member(interaction.client.user.id).top_role <= target.top_role: - await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True) - return False - - immune_roles = await self.config.guild(target.guild).immune_roles() - - for role in target.roles: - if role.id in immune_roles: - await interaction.response.send_message(content="You cannot moderate members with an immune role!", ephemeral=True) - return False - - return True - - async def mysql_log(self, guild_id: str, author_id: str, moderation_type: str, target_id: int, role_id: int, duration, reason: str, database: mysql.connector.MySQLConnection = None, timestamp: int = None, resolved: bool = False, resolved_by: str = None, resolved_reason: str = None, expired: bool = None, changes: list = [], metadata: dict = {}): # pylint: disable=dangerous-default-argument - if not timestamp: - timestamp = int(time.time()) - - if duration != "NULL": - end_timedelta = datetime.fromtimestamp(timestamp) + duration - end_timestamp = int(end_timedelta.timestamp()) - else: - end_timestamp = 0 - - if not expired: - if int(time.time()) > end_timestamp: - expired = 1 - else: - expired = 0 - - if resolved_by is None: - resolved_by = "NULL" - - if resolved_reason is None: - resolved_reason = "NULL" - - if not database: - database = await self.connect() - close_db = True - else: - close_db = False - cursor = database.cursor() - - moderation_id = await self.get_next_case_number(guild_id=guild_id, cursor=cursor) - - sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" - val = (moderation_id, timestamp, moderation_type, target_id, author_id, role_id, duration, end_timestamp, reason, int(resolved), resolved_by, resolved_reason, expired, json.dumps(changes), json.dumps(metadata)) - cursor.execute(sql, val) - - cursor.close() - database.commit() - if close_db: - database.close() - - self.logger.debug("MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s", guild_id, moderation_id, timestamp, moderation_type, target_id, author_id, role_id, duration, end_timestamp, reason, int(resolved), resolved_by, resolved_reason, expired, changes, metadata) - - return moderation_id - - async def get_next_case_number(self, guild_id: str, cursor = None): - """This method returns the next case number from the MySQL table for a specific guild.""" - if not cursor: - database = await self.connect() - cursor = database.cursor() - cursor.execute(f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1") - return cursor.fetchone()[0] + 1 - - def generate_dict(self, result): - case: dict = { - "moderation_id": result[0], - "timestamp": result[1], - "moderation_type": result[2], - "target_id": result[3], - "moderator_id": result[4], - "role_id": result[5], - "duration": result[6], - "end_timestamp": result[7], - "reason": result[8], - "resolved": result[9], - "resolved_by": result[10], - "resolve_reason": result[11], - "expired": result[12], - "changes": json.loads(result[13]), - "metadata": json.loads(result[14]) - } - return case - - async def fetch_user_dict(self, interaction: discord.Interaction, user_id: str): - """This method returns a dictionary containing either user information or a standard deleted user template.""" - if user_id == '?': - user_dict = { - 'id': '?', - 'name': 'Unknown User', - 'discriminator':'0' - } - - else: - try: - user = interaction.client.get_user(user_id) - if user is None: - user = await interaction.client.fetch_user(user_id) - - user_dict = { - 'id': user.id, - 'name': user.name, - 'discriminator': user.discriminator - } - - except discord.errors.NotFound: - user_dict = { - 'id': user_id, - 'name': 'Deleted User', - 'discriminator': '0' - } - - return user_dict - - async def fetch_role_dict(self, interaction: discord.Interaction, role_id: str): - """This method returns a dictionary containing either role information or a standard deleted role template.""" - try: - role = interaction.guild.get_role(role_id) - - role_dict = { - 'id': role.id, - 'name': role.name - } - - except discord.errors.NotFound: - role_dict = { - 'id': role_id, - 'name': 'Deleted Role' - } - - return role_dict - - async def embed_factory(self, embed_type: str, /, interaction: discord.Interaction = None, case_dict: dict = None, guild: discord.Guild = None, reason: str = None, moderation_type: str = None, response: discord.InteractionMessage = None, duration: timedelta = None, resolved: bool = False): - """This method creates an embed from set parameters, meant for either moderation logging or contacting the moderated user. - - Valid arguments for 'embed_type': - - 'message' - - 'log' - WIP - - 'case' - - 'changes' - - Required arguments for 'message': - - guild - - reason - - moderation_type - - response - - duration (optional) - - Required arguments for 'log': - - interaction - - case_dict - - resolved (optional) - - Required arguments for 'case' & 'changes': - - interaction - - case_dict""" - if embed_type == 'message': - - if moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]: - guild_name = guild.name - else: - guild_name = f"[{guild.name}]({response.jump_url})" - - if moderation_type in ["tempbanned", "muted"] and duration: - embed_duration = f" for {humanize.precisedelta(duration)}" - else: - embed_duration = "" - - if moderation_type == "note": - embed_desc = "received a" - else: - embed_desc = "been" - - embed = discord.Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=await self.bot.get_embed_color(None), timestamp=datetime.now()) - embed.add_field(name='Reason', value=f"`{reason}`") - embed.set_author(name=guild.name, icon_url=guild.icon.url) - embed.set_footer(text=f"Case #{await self.get_next_case_number(guild.id):,}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&") - - return embed - - if embed_type == 'case': - target_user = await self.fetch_user_dict(interaction, case_dict['target_id']) - moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id']) - - target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" - moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" - - embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=await self.bot.get_embed_color(None)) - embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** | " - - if case_dict['duration'] != 'NULL': - td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) - duration_embed = f"{humanize.precisedelta(td)} | " if bool(case_dict['expired']) is False else str(humanize.precisedelta(td)) - embed.description += f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}" - - embed.description += f"\n**Changes:** {len(case_dict['changes'])}" - - if case_dict['metadata']: - if case_dict['metadata']['imported_from']: - embed.description += f"\n**Imported From:** {case_dict['metadata']['imported_from']}" - - embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False) - - if case_dict['resolved'] == 1: - resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by']) - resolved_name = f"`{resolved_user['name']}`" if resolved_user['discriminator'] == "0" else f"`{resolved_user['name']}#{resolved_user['discriminator']}`" - embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False) - - return embed - - if embed_type == 'changes': - embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Changes", color=await self.bot.get_embed_color(None)) - - memory_dict = {} - - if case_dict['changes']: - for change in case_dict['changes']: - if change['user_id'] not in memory_dict: - memory_dict[str(change['user_id'])] = await self.fetch_user_dict(interaction, change['user_id']) - - user = memory_dict[str(change['user_id'])] - name = user['name'] if user['discriminator'] == "0" else f"{user['name']}#{user['discriminator']}" - - timestamp = f" | " - - if change['type'] == 'ORIGINAL': - embed.add_field(name='Original', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) - - elif change['type'] == 'EDIT': - embed.add_field(name='Edit', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) - - elif change['type'] == 'RESOLVE': - embed.add_field(name='Resolve', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False) - - else: - embed.description = "*No changes have been made to this case.* 🙁" - - return embed - - if embed_type == 'log': - if resolved: - target_user = await self.fetch_user_dict(interaction, case_dict['target_id']) - moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id']) - - target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" - moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}" - - embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Resolved", color=await self.bot.get_embed_color(None)) - - embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** | " - - if case_dict['duration'] != 'NULL': - td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) - duration_embed = f"{humanize.precisedelta(td)} | " if case_dict["expired"] == '0' else str(humanize.precisedelta(td)) - embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}" - - embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False) - - resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by']) - resolved_name = resolved_user['name'] if resolved_user['discriminator'] == "0" else f"{resolved_user['name']}#{resolved_user['discriminator']}" - embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False) - else: - target_user = await self.fetch_user_dict(interaction, case_dict['target_id']) - moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id']) - - target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" - moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}" - - embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=await self.bot.get_embed_color(None)) - embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** | " - - if case_dict['duration'] != 'NULL': - td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) - embed.description = embed.description + f"\n**Duration:** {humanize.precisedelta(td)} | " - - embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False) - return embed - - raise(TypeError("'type' argument is invalid!")) - - async def fetch_case(self, moderation_id: int, guild_id: str): - """This method fetches a case from the database and returns the case's dictionary.""" - database = await self.connect() - cursor = database.cursor() - - query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;" - cursor.execute(query, (guild_id, moderation_id)) - result = cursor.fetchone() - - cursor.close() - database.close() - - return self.generate_dict(result) - - async def log(self, interaction: discord.Interaction, moderation_id: int, resolved: bool = False): - """This method sends a message to the guild's configured logging channel when an infraction takes place.""" - logging_channel_id = await self.config.guild(interaction.guild).log_channel() - if logging_channel_id != " ": - logging_channel = interaction.guild.get_channel(logging_channel_id) - - case = await self.fetch_case(moderation_id, interaction.guild.id) - if case: - embed = await self.embed_factory('log', interaction=interaction, case_dict=case, resolved=resolved) - try: - await logging_channel.send(embed=embed) - except discord.errors.Forbidden: - return + await mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, 0, duration, reason) ####################################################################################################################### ### COMMANDS @@ -626,7 +188,7 @@ class Moderation(commands.Cog): Why are you noting this user? silent: bool Should the user be messaged?""" - if not await self.check_moddable(target, interaction, ['moderate_members']): + if not await check_moddable(target, interaction, ['moderate_members']): return await interaction.response.send_message(content=f"{target.mention} has recieved a note!\n**Reason** - `{reason}`") @@ -635,14 +197,14 @@ class Moderation(commands.Cog): silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: - embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='note', response=await interaction.original_response()) + embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='note', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass - moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', target.id, 0, 'NULL', reason) + moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has received a note! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") - await self.log(interaction, moderation_id) + await log(interaction, moderation_id) @app_commands.command(name="warn") async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None): @@ -656,7 +218,7 @@ class Moderation(commands.Cog): Why are you warning this user? silent: bool Should the user be messaged?""" - if not await self.check_moddable(target, interaction, ['moderate_members']): + if not await check_moddable(target, interaction, ['moderate_members']): return await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`") @@ -665,14 +227,14 @@ class Moderation(commands.Cog): silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: - embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='warned', response=await interaction.original_response()) + embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='warned', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass - moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'WARN', target.id, 0, 'NULL', reason) + moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'WARN', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been warned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") - await self.log(interaction, moderation_id) + await log(interaction, moderation_id) async def blacklist_autocomplete(self, interaction: discord.Interaction, current: str,) -> list[app_commands.Choice[str]]: """Autocompletes a blacklist role.""" @@ -715,7 +277,7 @@ class Moderation(commands.Cog): await interaction.response.send_message(content=f"Please provide a valid blacklist type!", ephemeral=True) return - if not await self.check_moddable(target, interaction, ['moderate_members', 'manage_roles']): + if not await check_moddable(target, interaction, ['moderate_members', 'manage_roles']): return if role in [role.id for role in target.roles]: @@ -726,9 +288,9 @@ class Moderation(commands.Cog): await target.add_roles(role, reason=f"Blacklisted by {interaction.user.id} for {humanize.precisedelta(matching_role['duration'])} for: {reason}") await interaction.response.send_message(content=f"{target.mention} has been blacklisted with the {role_obj.name} role for {humanize.precisedelta(matching_role['duration'])}!\n**Reason** - `{reason}`") - moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'BLACKLIST', target.id, role, matching_role['duration'], reason) + moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'BLACKLIST', target.id, role, matching_role['duration'], reason) await interaction.edit_original_response(content=f"{target.mention} has been blacklisted with the {role_obj.name} role for {humanize.precisedelta(matching_role['duration'])}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") - await self.log(interaction, moderation_id) + await log(interaction, moderation_id) @app_commands.command(name="mute") async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str, silent: bool = None): @@ -744,7 +306,7 @@ class Moderation(commands.Cog): Why are you unbanning this user? silent: bool Should the user be messaged?""" - if not await self.check_moddable(target, interaction, ['moderate_members']): + if not await check_moddable(target, interaction, ['moderate_members']): return if target.is_timed_out() is True: @@ -769,14 +331,14 @@ class Moderation(commands.Cog): silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: - embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='muted', response=await interaction.original_response(), duration=parsed_time) + embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='muted', response=await interaction.original_response(), duration=parsed_time) await target.send(embed=embed) except discord.errors.HTTPException: pass - moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', target.id, 0, parsed_time, reason) + moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', target.id, 0, parsed_time, reason) await interaction.edit_original_response(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") - await self.log(interaction, moderation_id) + await log(interaction, moderation_id) @app_commands.command(name="unmute") async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None, silent: bool = None): @@ -790,7 +352,7 @@ class Moderation(commands.Cog): Why are you unmuting this user? silent: bool Should the user be messaged?""" - if not await self.check_moddable(target, interaction, ['moderate_members']): + if not await check_moddable(target, interaction, ['moderate_members']): return if target.is_timed_out() is False: @@ -809,14 +371,14 @@ class Moderation(commands.Cog): silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: - embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='unmuted', response=await interaction.original_response()) + embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='unmuted', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass - moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', target.id, 0, 'NULL', reason) + moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been unmuted! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") - await self.log(interaction, moderation_id) + await log(interaction, moderation_id) @app_commands.command(name="kick") async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None): @@ -830,7 +392,7 @@ class Moderation(commands.Cog): Why are you kicking this user? silent: bool Should the user be messaged?""" - if not await self.check_moddable(target, interaction, ['kick_members']): + if not await check_moddable(target, interaction, ['kick_members']): return await interaction.response.send_message(content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`") @@ -839,16 +401,16 @@ class Moderation(commands.Cog): silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: - embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='kicked', response=await interaction.original_response()) + embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='kicked', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass await target.kick(reason=f"Kicked by {interaction.user.id} for: {reason}") - moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'KICK', target.id, 0, 'NULL', reason) + moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'KICK', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been kicked! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") - await self.log(interaction, moderation_id) + await log(interaction, moderation_id) @app_commands.command(name="ban") @app_commands.choices(delete_messages=[ @@ -874,7 +436,7 @@ class Moderation(commands.Cog): How many days of messages to delete? silent: bool Should the user be messaged?""" - if not await self.check_moddable(target, interaction, ['ban_members']): + if not await check_moddable(target, interaction, ['ban_members']): return try: @@ -894,16 +456,16 @@ class Moderation(commands.Cog): await interaction.response.send_message(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`") try: - embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='tempbanned', response=await interaction.original_response(), duration=parsed_time) + embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='tempbanned', response=await interaction.original_response(), duration=parsed_time) await target.send(embed=embed) except discord.errors.HTTPException: pass await interaction.guild.ban(target, reason=f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})", delete_message_seconds=delete_messages) - moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'TEMPBAN', target.id, 0, parsed_time, reason) + moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'TEMPBAN', target.id, 0, parsed_time, reason) await interaction.edit_original_response(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") - await self.log(interaction, moderation_id) + await log(interaction, moderation_id) else: await interaction.response.send_message(content=f"{target.mention} has been banned!\n**Reason** - `{reason}`") @@ -911,7 +473,7 @@ class Moderation(commands.Cog): silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: - embed = embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='banned', response=await interaction.original_response()) + embed = embed = await embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='banned', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass @@ -920,7 +482,7 @@ class Moderation(commands.Cog): moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'BAN', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been banned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") - await self.log(interaction, moderation_id) + await log(interaction, moderation_id) @app_commands.command(name="unban") async def unban(self, interaction: discord.Interaction, target: discord.User, reason: str = None, silent: bool = None): @@ -934,7 +496,7 @@ class Moderation(commands.Cog): Why are you unbanning this user? silent: bool Should the user be messaged?""" - if not await self.check_moddable(target, interaction, ['ban_members']): + if not await check_moddable(target, interaction, ['ban_members']): return try: @@ -955,14 +517,14 @@ class Moderation(commands.Cog): silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: - embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='unbanned', response=await interaction.original_response()) + embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='unbanned', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass - moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNBAN', target.id, 0, 'NULL', reason) + moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'UNBAN', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been unbanned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`") - await self.log(interaction, moderation_id) + await log(interaction, moderation_id) @app_commands.command(name="history") async def history(self, interaction: discord.Interaction, target: discord.User = None, moderator: discord.User = None, pagesize: app_commands.Range[int, 1, 20] = None, page: int = 1, ephemeral: bool = None, inline: bool = None, export: bool = False): @@ -1007,12 +569,12 @@ class Moderation(commands.Cog): await interaction.response.defer(ephemeral=ephemeral) - permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction) + permissions = check_permissions(interaction.client.user, ['embed_links'], interaction) if permissions: await interaction.followup.send(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return - database = await self.connect() + database = await connect() cursor = database.cursor() if target: @@ -1037,7 +599,7 @@ class Moderation(commands.Cog): result_dict_list = [] for result in results: - case_dict = self.generate_dict(result) + case_dict = generate_dict(result) if case_dict['moderation_id'] == 0: continue result_dict_list.append(case_dict) @@ -1070,11 +632,11 @@ class Moderation(commands.Cog): for case in result_dict_list[start_index:end_index]: if case['target_id'] not in memory_dict: - memory_dict[str(case['target_id'])] = await self.fetch_user_dict(interaction, case['target_id']) + memory_dict[str(case['target_id'])] = await fetch_user_dict(interaction, case['target_id']) target_user = memory_dict[str(case['target_id'])] if case['moderator_id'] not in memory_dict: - memory_dict[str(case['moderator_id'])] = await self.fetch_user_dict(interaction, case['moderator_id']) + memory_dict[str(case['moderator_id'])] = await fetch_user_dict(interaction, case['moderator_id']) moderator_user = memory_dict[str(case['moderator_id'])] target_name = target_user['name'] if target_user['discriminator'] == "0" else f"{target_user['name']}#{target_user['discriminator']}" @@ -1112,16 +674,16 @@ class Moderation(commands.Cog): Case number of the case you're trying to resolve reason: str Reason for resolving case""" - permissions = self.check_permissions(interaction.client.user, ['embed_links', 'moderate_members', 'ban_members'], interaction) + permissions = check_permissions(interaction.client.user, ['embed_links', 'moderate_members', 'ban_members'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return - conf = await self.check_conf(['mysql_database']) + conf = await check_conf(['mysql_database']) if conf: raise(LookupError) - database = await self.connect() + database = await connect() cursor = database.cursor() db = await self.config.mysql_database() @@ -1139,7 +701,7 @@ class Moderation(commands.Cog): await interaction.response.send_message(content=f"This moderation has already been resolved!\nUse `/case {case}` for more information.", ephemeral=True) return - case_dict = self.generate_dict(result_2) + case_dict = generate_dict(result_2) if reason is None: reason = "No reason given." @@ -1192,9 +754,9 @@ class Moderation(commands.Cog): cursor.execute(resolve_query, (json.dumps(changes), interaction.user.id, reason, case_dict['moderation_id'])) database.commit() - embed = await self.embed_factory('case', interaction=interaction, case_dict=await self.fetch_case(case, interaction.guild.id)) + embed = await embed_factory('case', await self.bot.get_embed_color(None), interaction=interaction, case_dict=await self.fetch_case(case, interaction.guild.id)) await interaction.response.send_message(content=f"✅ Moderation #{case:,} resolved!", embed=embed) - await self.log(interaction, case, True) + await log(interaction, case, True) cursor.close() database.close() @@ -1217,7 +779,7 @@ class Moderation(commands.Cog): List the changes made to the case export: bool Export the case to a JSON file or codeblock""" - permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction) + permissions = check_permissions(interaction.client.user, ['embed_links'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return @@ -1228,7 +790,7 @@ class Moderation(commands.Cog): or False) if case != 0: - case_dict = await self.fetch_case(case, interaction.guild.id) + case_dict = await fetch_case(case, interaction.guild.id) if case_dict: if export: if export.value == 'file' or len(str(case_dict)) > 1800: @@ -1249,9 +811,9 @@ class Moderation(commands.Cog): await interaction.response.send_message(content=f"```json\n{json.dumps(case_dict, indent=2)}```", ephemeral=ephemeral) return if changes: - embed = await self.embed_factory('changes', interaction=interaction, case_dict=case_dict) + embed = await embed_factory('changes', await self.bot.get_embed_color(None), interaction=interaction, case_dict=case_dict) else: - embed = await self.embed_factory('case', interaction=interaction, case_dict=case_dict) + embed = await embed_factory('case', await self.bot.get_embed_color(None), interaction=interaction, case_dict=case_dict) await interaction.response.send_message(embed=embed, ephemeral=ephemeral) return await interaction.response.send_message(content=f"No case with case number `{case}` found.", ephemeral=True) @@ -1268,16 +830,16 @@ class Moderation(commands.Cog): What is the new reason? duration: str What is the new duration? Does not reapply the moderation if it has already expired.""" - permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction) + permissions = check_permissions(interaction.client.user, ['embed_links'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return if case != 0: parsed_time = None - case_dict = await self.fetch_case(case, interaction.guild.id) + case_dict = await fetch_case(case, interaction.guild.id) if case_dict: - conf = await self.check_conf(['mysql_database']) + conf = await check_conf(['mysql_database']) if conf: raise(LookupError) @@ -1340,7 +902,7 @@ class Moderation(commands.Cog): } ) - database = await self.connect() + database = await connect() cursor = database.cursor() db = await self.config.mysql_database() @@ -1352,11 +914,11 @@ class Moderation(commands.Cog): cursor.execute(update_query, (json.dumps(changes), reason, case)) database.commit() - new_case = await self.fetch_case(case, interaction.guild.id) - embed = await self.embed_factory('case', interaction=interaction, case_dict=new_case) + new_case = await fetch_case(case, interaction.guild.id) + embed = await embed_factory('case', await self.bot.get_embed_color(None), interaction=interaction, case_dict=new_case) await interaction.response.send_message(content=f"✅ Moderation #{case:,} edited!", embed=embed, ephemeral=True) - await self.log(interaction, case) + await log(interaction, case) cursor.close() database.close() @@ -1365,11 +927,11 @@ class Moderation(commands.Cog): @tasks.loop(minutes=1) async def handle_expiry(self): - conf = await self.check_conf(['mysql_database']) + conf = await check_conf(['mysql_database']) if conf: raise(LookupError) - database = await self.connect() + database = await connect() cursor = database.cursor() db = await self.config.mysql_database() @@ -1393,7 +955,7 @@ class Moderation(commands.Cog): try: await guild.unban(user, reason=f"Automatic unban from case #{moderation_id}") - embed = await self.embed_factory('message', guild, f'Automatic unban from case #{moderation_id}', 'unbanned') + embed = await embed_factory('message', guild, f'Automatic unban from case #{moderation_id}', 'unbanned') try: await user.send(embed=embed) @@ -1790,14 +1352,13 @@ class Moderation(commands.Cog): """Import moderations from GalacticBot.""" if ctx.message.attachments and ctx.message.attachments[0].content_type == 'application/json; charset=utf-8': message = await ctx.send("Are you sure you want to import GalacticBot moderations?\n**This will overwrite any moderations that already exist in this guild's moderation table.**\n*The import process will block the rest of your bot until it is complete.*") - await message.edit(view=self.GalacticBotImportButtons(60, ctx, message, self)) + await message.edit(view=self.GalacticBotImportButtons(60, ctx, message)) else: await ctx.send("Please provide a valid GalacticBot moderation export file.") class GalacticBotImportButtons(discord.ui.View): - def __init__(self, timeout, ctx, message, cog_instance): + def __init__(self, timeout, ctx, message): super().__init__() - self.cog_instance = cog_instance self.ctx: commands.Context = ctx self.message: discord.Message = message self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912) @@ -1807,7 +1368,7 @@ class Moderation(commands.Cog): await self.message.delete() await interaction.response.send_message("Deleting original table...", ephemeral=True) - database = await Moderation.connect(self.cog_instance) + database = await connect() cursor = database.cursor() query = f"DROP TABLE IF EXISTS moderation_{self.ctx.guild.id};" @@ -1818,7 +1379,7 @@ class Moderation(commands.Cog): await interaction.edit_original_response(content="Creating new table...") - await Moderation.create_guild_table(self.cog_instance, self.ctx.guild) + await create_guild_table(self.ctx.guild) await interaction.edit_original_response(content="Importing moderations...") @@ -1891,8 +1452,7 @@ class Moderation(commands.Cog): resolved_reason = 'NULL' changes = [] - await Moderation.mysql_log( - self.cog_instance, + await mysql_log( self.ctx.guild.id, case['executor'], case['type'], diff --git a/moderation/utils.py b/moderation/utils.py new file mode 100644 index 0000000..a7e0528 --- /dev/null +++ b/moderation/utils.py @@ -0,0 +1,198 @@ +import json +from typing import Union +from discord import User, Member, Interaction, Guild +from discord.errors import NotFound, Forbidden +from redbot.core import commands +import database as db +from .embed_factory import embed_factory +from .config import config + + +async def check_conf(config_list: list): + """Checks if any required config options are not set.""" + not_found_list = [] + + for item in config_list: + if await config.item() == " ": + not_found_list.append(item) + + return not_found_list + + +def check_permissions( + user: User, + permissions: list, + ctx: Union[commands.Context, Interaction] = None, + guild: Guild = None, +): + """Checks if a user has a specific permission (or a list of permissions) in a channel.""" + if ctx: + member = ctx.guild.get_member(user.id) + resolved_permissions = ctx.channel.permissions_for(member) + + elif guild: + member = guild.get_member(user.id) + resolved_permissions = member.guild_permissions + + else: + raise (KeyError) + + for permission in permissions: + if ( + not getattr(resolved_permissions, permission, False) + and not resolved_permissions.administrator is True + ): + return permission + + return False + + +async def check_moddable( + target: Union[User, Member], interaction: Interaction, permissions: list +): + """Checks if a moderator can moderate a target.""" + if check_permissions(interaction.client.user, permissions, guild=interaction.guild): + await interaction.response.send_message( + f"I do not have the `{permissions}` permission, required for this action.", + ephemeral=True, + ) + return False + + if await config.guild(interaction.guild).use_discord_permissions() is True: + if check_permissions(interaction.user, permissions, guild=interaction.guild): + await interaction.response.send_message( + f"You do not have the `{permissions}` permission, required for this action.", + ephemeral=True, + ) + return False + + if interaction.user.id == target.id: + await interaction.response.send_message( + content="You cannot moderate yourself!", ephemeral=True + ) + return False + + if target.bot: + await interaction.response.send_message( + content="You cannot moderate bots!", ephemeral=True + ) + return False + + if isinstance(target, Member): + if interaction.user.top_role <= target.top_role: + await interaction.response.send_message( + content="You cannot moderate members with a higher role than you!", + ephemeral=True, + ) + return False + + if ( + interaction.guild.get_member(interaction.client.user.id).top_role + <= target.top_role + ): + await interaction.response.send_message( + content="You cannot moderate members with a role higher than the bot!", + ephemeral=True, + ) + return False + + immune_roles = await config.guild(target.guild).immune_roles() + + for role in target.roles: + if role.id in immune_roles: + await interaction.response.send_message( + content="You cannot moderate members with an immune role!", + ephemeral=True, + ) + return False + + return True + + +async def get_next_case_number(guild_id: str, cursor=None): + """This method returns the next case number from the MySQL table for a specific guild.""" + if not cursor: + database = await db.connect() + cursor = database.cursor() + cursor.execute( + f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1" + ) + return cursor.fetchone()[0] + 1 + + +def generate_dict(result): + case: dict = { + "moderation_id": result[0], + "timestamp": result[1], + "moderation_type": result[2], + "target_id": result[3], + "moderator_id": result[4], + "role_id": result[5], + "duration": result[6], + "end_timestamp": result[7], + "reason": result[8], + "resolved": result[9], + "resolved_by": result[10], + "resolve_reason": result[11], + "expired": result[12], + "changes": json.loads(result[13]), + "metadata": json.loads(result[14]), + } + return case + + +async def fetch_user_dict(interaction: Interaction, user_id: str): + """This method returns a dictionary containing either user information or a standard deleted user template.""" + if user_id == "?": + user_dict = {"id": "?", "name": "Unknown User", "discriminator": "0"} + + else: + try: + user = interaction.client.get_user(user_id) + if user is None: + user = await interaction.client.fetch_user(user_id) + + user_dict = { + "id": user.id, + "name": user.name, + "discriminator": user.discriminator, + } + + except NotFound: + user_dict = { + "id": user_id, + "name": "Deleted User", + "discriminator": "0", + } + + return user_dict + + +async def fetch_role_dict(interaction: Interaction, role_id: str): + """This method returns a dictionary containing either role information or a standard deleted role template.""" + try: + role = interaction.guild.get_role(role_id) + + role_dict = {"id": role.id, "name": role.name} + + except NotFound: + role_dict = {"id": role_id, "name": "Deleted Role"} + + return role_dict + + +async def log(interaction: Interaction, moderation_id: int, resolved: bool = False): + """This method sends a message to the guild's configured logging channel when an infraction takes place.""" + logging_channel_id = await config.guild(interaction.guild).log_channel() + if logging_channel_id != " ": + logging_channel = interaction.guild.get_channel(logging_channel_id) + + case = await db.fetch_case(moderation_id, interaction.guild.id) + if case: + embed = await embed_factory( + "log", interaction=interaction, case_dict=case, resolved=resolved + ) + try: + await logging_channel.send(embed=embed) + except Forbidden: + return