import logging import json import time import os from datetime import datetime, timedelta, timezone from typing import Union import discord import humanize import mysql.connector from discord.ext import tasks from pytimeparse2 import disable_dateutil, parse from redbot.core import app_commands, checks, Config, commands, data_manager from redbot.core.app_commands import Choice class Moderation(commands.Cog): """Custom moderation cog. Developed by SeaswimmerTheFsh.""" def __init__(self, bot): self.bot = bot self.config = Config.get_conf(self, identifier=481923957134912) self.config.register_global( mysql_address= " ", mysql_database = " ", mysql_username = " ", mysql_password = " " ) self.config.register_guild( ignore_other_bots = True, dm_users = True, log_channel = " " ) disable_dateutil() self.handle_expiry.start() # pylint: disable=no-member self.logger = logging.getLogger('red.seaswimmerthefsh.moderation') async def cog_load(self): """This method prepares the database schema for all of the guilds the bot is currently in.""" conf = await self.check_conf([ 'mysql_address', 'mysql_database', 'mysql_username', 'mysql_password' ]) if conf: self.logger.fatal("Failed to create tables, due to MySQL connection configuration being unset.") return guilds: list[discord.Guild] = self.bot.guilds try: for guild in guilds: if not await self.bot.cog_disabled_in_guild(self, guild): await self.create_guild_table(guild) except ConnectionRefusedError: return async def cog_unload(self): self.handle_expiry.cancel() # pylint: disable=no-member @commands.Cog.listener('on_guild_join') async def db_generate_guild_join(self, guild: discord.Guild): """This method prepares the database schema whenever the bot joins a guild.""" if not await self.bot.cog_disabled_in_guild(self, guild): conf = await self.check_conf([ 'mysql_address', 'mysql_database', 'mysql_username', 'mysql_password' ]) if conf: self.logger.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id) return try: await self.create_guild_table(guild) except ConnectionRefusedError: return @commands.Cog.listener('on_audit_log_entry_create') async def autologger(self, entry: discord.AuditLogEntry): """This method automatically logs moderations done by users manually ("right clicks").""" if not await self.bot.cog_disabled_in_guild(self, entry.guild): if await self.config.guild(entry.guild.id).ignore_other_bots() is True: if entry.user.bot or entry.target.bot: return else: if entry.user.id == self.bot.user.id: return duration = "NULL" if entry.reason: reason = entry.reason + " (This action was performed without the bot.)" else: reason = "This action was performed without the bot." if entry.action == discord.AuditLogAction.kick: moderation_type = 'KICK' elif entry.action == discord.AuditLogAction.ban: moderation_type = 'BAN' elif entry.action == discord.AuditLogAction.unban: moderation_type = 'UNBAN' elif entry.action == discord.AuditLogAction.member_update: if entry.after.timed_out_until is not None: timed_out_until_aware = entry.after.timed_out_until.replace(tzinfo=timezone.utc) duration_datetime = timed_out_until_aware - datetime.now(tz=timezone.utc) minutes = round(duration_datetime.total_seconds() / 60) duration = timedelta(minutes=minutes) moderation_type = 'MUTE' else: moderation_type = 'UNMUTE' else: return await self.mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, 0, duration, reason) async def connect(self): """Connects to the MySQL database, and returns a connection object.""" conf = await self.check_conf([ 'mysql_address', 'mysql_database', 'mysql_username', 'mysql_password' ]) if conf: raise LookupError("MySQL connection details not set properly!") try: connection = mysql.connector.connect( host=await self.config.mysql_address(), user=await self.config.mysql_username(), password=await self.config.mysql_password(), database=await self.config.mysql_database() ) return connection except mysql.connector.ProgrammingError as e: self.logger.fatal("Unable to access the MySQL database!\nError:\n%s", e.msg) raise ConnectionRefusedError(f"Unable to access the MySQL Database!\n{e.msg}") from e async def create_guild_table(self, guild: discord.Guild): database = await self.connect() cursor = database.cursor() try: cursor.execute(f"SELECT * FROM `moderation_{guild.id}`") self.logger.info("MySQL Table exists for server %s (%s)", guild.name, guild.id) except mysql.connector.errors.ProgrammingError: query = f""" CREATE TABLE `moderation_{guild.id}` ( moderation_id INT UNIQUE PRIMARY KEY NOT NULL, timestamp INT NOT NULL, moderation_type LONGTEXT NOT NULL, target_id LONGTEXT NOT NULL, moderator_id LONGTEXT NOT NULL, role_id LONGTEXT, duration LONGTEXT, end_timestamp INT, reason LONGTEXT, resolved BOOL NOT NULL, resolved_by LONGTEXT, resolve_reason LONGTEXT, expired BOOL NOT NULL, changes JSON NOT NULL ) """ cursor.execute(query) index_query_1 = "CREATE INDEX idx_target_id ON moderation_%s(target_id(25));" cursor.execute(index_query_1, (guild.id,)) index_query_2 = "CREATE INDEX idx_moderator_id ON moderation_%s(moderator_id(25));" cursor.execute(index_query_2, (guild.id,)) index_query_3 = "CREATE INDEX idx_moderation_id ON moderation_%s(moderation_id);" cursor.execute(index_query_3, (guild.id,)) insert_query = f""" INSERT INTO `moderation_{guild.id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ insert_values = (0, 0, "NULL", 0, 0, 0, "NULL", 0, "NULL", 0, "NULL", "NULL", 0, json.dumps([])) cursor.execute(insert_query, insert_values) database.commit() self.logger.info("MySQL Table (moderation_%s) created for %s (%s)", guild.id, guild.name, guild.id) database.close() async def check_conf(self, config: list): """Checks if any required config options are not set.""" not_found_list = [] for item in config: if await self.config.item() == " ": not_found_list.append(item) return not_found_list def check_permissions(self, user: discord.User, permissions: list, ctx: Union[commands.Context, discord.Interaction] = None, guild: discord.Guild = None): """Checks if a user has a specific permission (or a list of permissions) in a channel.""" if ctx: member = ctx.guild.get_member(user.id) resolved_permissions = ctx.channel.permissions_for(member) elif guild: member = guild.get_member(user.id) resolved_permissions = member.guild_permissions else: raise(KeyError) for permission in permissions: if not getattr(resolved_permissions, permission, False) and not resolved_permissions.administrator is True: return permission return False async def mysql_log(self, guild_id: str, author_id: str, moderation_type: str, target_id: int, role_id: int, duration, reason: str): timestamp = int(time.time()) if duration != "NULL": end_timedelta = datetime.fromtimestamp(timestamp) + duration end_timestamp = int(end_timedelta.timestamp()) else: end_timestamp = 0 database = await self.connect() cursor = database.cursor() moderation_id = await self.get_next_case_number(guild_id=guild_id, cursor=cursor) sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" val = (moderation_id, timestamp, moderation_type, target_id, author_id, role_id, duration, end_timestamp, f"{reason}", 0, "NULL", "NULL", 0, []) cursor.execute(sql, val) database.commit() database.close() self.logger.debug("MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, 0, NULL, NULL, 0, []", guild_id, moderation_id, timestamp, moderation_type, target_id, author_id, role_id, duration, end_timestamp, reason) return moderation_id async def get_next_case_number(self, guild_id: str, cursor = None): """This method returns the next case number from the MySQL table for a specific guild.""" if not cursor: database = await self.connect() cursor = database.cursor() cursor.execute(f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1") return cursor.fetchone()[0] + 1 def generate_dict(self, result): case: dict = { "moderation_id": result[0], "timestamp": result[1], "moderation_type": result[2], "target_id": result[3], "moderator_id": result[4], "role_id": result[5], "duration": result[6], "end_timestamp": result[7], "reason": result[8], "resolved": result[9], "resolved_by": result[10], "resolve_reason": result[11], "expired": result[12], "changes": json.loads(result[13]) } return case async def fetch_user_dict(self, interaction: discord.Interaction, user_id: str): """This method returns a dictionary containing either user information or a standard deleted user template.""" try: user = interaction.client.get_user(user_id) if user is None: user = await interaction.client.fetch_user(user_id) user_dict = { 'id': user.id, 'name': user.name, 'discriminator': user.discriminator } except discord.errors.NotFound: user_dict = { 'id': user_id, 'name': 'Deleted User', 'discriminator': '0' } return user_dict async def fetch_role_dict(self, interaction: discord.Interaction, role_id: str): """This method returns a dictionary containing either role information or a standard deleted role template.""" try: role = interaction.guild.get_role(role_id) role_dict = { 'id': role.id, 'name': role.name } except discord.errors.NotFound: role_dict = { 'id': role_id, 'name': 'Deleted Role' } return role_dict async def embed_factory(self, embed_type: str, /, interaction: discord.Interaction = None, case_dict: dict = None, guild: discord.Guild = None, reason: str = None, moderation_type: str = None, response: discord.InteractionMessage = None, duration: timedelta = None, resolved: bool = False): """This method creates an embed from set parameters, meant for either moderation logging or contacting the moderated user. Valid arguments for 'embed_type': - 'message' - 'log' - WIP - 'case' Required arguments for 'message': - guild - reason - moderation_type - response - duration (optional) Required arguments for 'log': - interaction - case_dict - resolved (optional) Required arguments for 'case': - interaction - case_dict""" if embed_type == 'message': if moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]: guild_name = guild.name else: guild_name = f"[{guild.name}]({response.jump_url})" if moderation_type in ["tempbanned", "muted"] and duration: embed_duration = f" for {humanize.precisedelta(duration)}" else: embed_duration = "" if moderation_type == "note": embed_desc = "received a" else: embed_desc = "been" embed = discord.Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=await self.bot.get_embed_color(None), timestamp=datetime.now()) embed.add_field(name='Reason', value=f"`{reason}`") embed.set_author(name=guild.name, icon_url=guild.icon.url) embed.set_footer(text=f"Case #{await self.get_next_case_number(guild.id)}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&") return embed if embed_type == 'case': target_user = await self.fetch_user_dict(interaction, case_dict['target_id']) moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id']) target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`" embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']}", color=await self.bot.get_embed_color(None)) embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** | " if case_dict['duration'] != 'NULL': td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) duration_embed = f"{humanize.precisedelta(td)} | " if bool(case_dict['expired']) is False else str(humanize.precisedelta(td)) embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}" embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False) if case_dict['resolved'] == 1: resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by']) resolved_name = f"`{resolved_user['name']}`" if resolved_user['discriminator'] == "0" else f"`{resolved_user['name']}#{resolved_user['discriminator']}`" embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False) return embed if embed_type == 'log': if resolved: target_user = await self.fetch_user_dict(interaction, case_dict['target_id']) moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id']) target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}" embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']} Resolved", color=await self.bot.get_embed_color(None)) embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** | " if case_dict['duration'] != 'NULL': td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) duration_embed = f"{humanize.precisedelta(td)} | " if case_dict["expired"] == '0' else str(humanize.precisedelta(td)) embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}" embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False) resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by']) resolved_name = resolved_user['name'] if resolved_user['discriminator'] == "0" else f"{resolved_user['name']}#{resolved_user['discriminator']}" embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False) else: target_user = await self.fetch_user_dict(interaction, case_dict['target_id']) moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id']) target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`" moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}" embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']}", color=await self.bot.get_embed_color(None)) embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** | " if case_dict['duration'] != 'NULL': td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))}) embed.description = embed.description + f"\n**Duration:** {humanize.precisedelta(td)} | " embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False) return embed raise(TypeError("'type' argument is invalid!")) async def fetch_case(self, moderation_id: int, guild_id: str): """This method fetches a case from the database and returns the case's dictionary.""" database = await self.connect() cursor = database.cursor() query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;" cursor.execute(query, (guild_id, moderation_id)) result = cursor.fetchone() cursor.close() database.close() return self.generate_dict(result) async def log(self, interaction: discord.Interaction, moderation_id: int, resolved: bool = False): """This method sends a message to the guild's configured logging channel when an infraction takes place.""" logging_channel_id = await self.config.guild(interaction.guild).log_channel() if logging_channel_id != " ": logging_channel = interaction.guild.get_channel(logging_channel_id) case = await self.fetch_case(moderation_id, interaction.guild.id) if case: embed = await self.embed_factory('log', interaction=interaction, case_dict=case, resolved=resolved) try: await logging_channel.send(embed=embed) except discord.errors.Forbidden: return ####################################################################################################################### ### COMMANDS ####################################################################################################################### @app_commands.command(name="note") async def note(self, interaction: discord.Interaction, target: discord.User, reason: str, silent: bool = None): """Add a note to a user. Parameters ----------- target: discord.User Who are you noting? reason: str Why are you noting this user? silent: bool Should the user be messaged?""" if interaction.guild.get_member(target.id): target_member = interaction.guild.get_member(target.id) if interaction.guild.get_member(interaction.client.user.id).top_role <= target_member.top_role: await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True) return if interaction.user.top_role <= target_member.top_role: await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True) return await interaction.response.send_message(content=f"{target.mention} has recieved a note!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='note', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has received a note! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) @app_commands.command(name="warn") async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None): """Warn a user. Parameters ----------- target: discord.Member Who are you warning? reason: str Why are you warning this user? silent: bool Should the user be messaged?""" if interaction.guild.get_member(target.id): target_member = interaction.guild.get_member(target.id) if interaction.guild.get_member(interaction.client.user.id).top_role <= target_member.top_role: await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True) return if interaction.user.top_role <= target_member.top_role: await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True) return await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='warned', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'WARN', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been warned! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) @app_commands.command(name="mute") async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str, silent: bool = None): """Mute a user. Parameters ----------- target: discord.Member Who are you unbanning? duration: str How long are you muting this user for? reason: str Why are you unbanning this user? silent: bool Should the user be messaged?""" if interaction.guild.get_member(target.id): target_member = interaction.guild.get_member(target.id) if interaction.guild.get_member(interaction.client.user.id).top_role <= target_member.top_role: await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True) return if interaction.user.top_role <= target_member.top_role: await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True) return permissions = self.check_permissions(interaction.client.user, ['moderate_members'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return if target.is_timed_out() is True: await interaction.response.send_message(f"{target.mention} is already muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True) return try: parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True) except ValueError: await interaction.response.send_message("Please provide a valid duration!", ephemeral=True) return if parsed_time.total_seconds() / 1000 > 2419200000: await interaction.response.send_message("Please provide a duration that is less than 28 days.") return await target.timeout(parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}") await interaction.response.send_message(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='muted', response=await interaction.original_response(), duration=parsed_time) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', target.id, 0, parsed_time, reason) await interaction.edit_original_response(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) @app_commands.command(name="unmute") async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None, silent: bool = None): """Unmute a user. Parameters ----------- target: discord.user Who are you unmuting? reason: str Why are you unmuting this user? silent: bool Should the user be messaged?""" if interaction.guild.get_member(target.id): target_member = interaction.guild.get_member(target.id) if interaction.guild.get_member(interaction.client.user.id).top_role <= target_member.top_role: await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True) return if interaction.user.top_role <= target_member.top_role: await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True) return permissions = self.check_permissions(interaction.client.user, ['moderate_members'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return if target.is_timed_out() is False: await interaction.response.send_message(f"{target.mention} is not muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True) return if reason: await target.timeout(None, reason=f"Unmuted by {interaction.user.id} for: {reason}") else: await target.timeout(None, reason=f"Unbanned by {interaction.user.id}") reason = "No reason given." await interaction.response.send_message(content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='unmuted', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been unmuted! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) @app_commands.command(name="kick") async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None): """Kick a user. Parameters ----------- target: discord.user Who are you kicking? reason: str Why are you kicking this user? silent: bool Should the user be messaged?""" if interaction.guild.get_member(target.id): target_member = interaction.guild.get_member(target.id) if interaction.guild.get_member(interaction.client.user.id).top_role <= target_member.top_role: await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True) return if interaction.user.top_role <= target_member.top_role: await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True) return permissions = self.check_permissions(interaction.client.user, ['kick_members'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return await interaction.response.send_message(content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='kicked', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass await target.kick(f"Kicked by {interaction.user.id} for: {reason}") moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'KICK', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been kicked! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) @app_commands.command(name="ban") @app_commands.choices(delete_messages=[ Choice(name="None", value=0), Choice(name='1 Hour', value=3600), Choice(name='12 Hours', value=43200), Choice(name='1 Day', value=86400), Choice(name='3 Days', value=259200), Choice(name='7 Days', value=604800), ]) async def ban(self, interaction: discord.Interaction, target: discord.User, reason: str, duration: str = None, delete_messages: Choice[int] = 0, silent: bool = None): """Ban a user. Parameters ----------- target: discord.user Who are you banning? duration: str How long are you banning this user for? reason: str Why are you banning this user? delete_messages: Choices[int] How many days of messages to delete? silent: bool Should the user be messaged?""" if interaction.guild.get_member(target.id): target_member = interaction.guild.get_member(target.id) if interaction.guild.get_member(interaction.client.user.id).top_role <= target_member.top_role: await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True) return if interaction.user.top_role <= target_member.top_role: await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True) return permissions = self.check_permissions(interaction.client.user, ['ban_members'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return try: await interaction.guild.fetch_ban(target) await interaction.response.send_message(content=f"{target.mention} is already banned!", ephemeral=True) return except discord.errors.NotFound: pass if duration: try: parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True) except ValueError: await interaction.response.send_message("Please provide a valid duration!", ephemeral=True) return await interaction.response.send_message(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`") try: embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='tempbanned', response=await interaction.original_response(), duration=parsed_time) await target.send(embed=embed) except discord.errors.HTTPException: pass await interaction.guild.ban(target, reason=f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})", delete_message_seconds=delete_messages) moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'TEMPBAN', target.id, 0, parsed_time, reason) await interaction.edit_original_response(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) else: await interaction.response.send_message(content=f"{target.mention} has been banned!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='banned', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass await interaction.guild.ban(target, reason=f"Banned by {interaction.user.id} for: {reason}", delete_message_seconds=delete_messages) moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'BAN', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been banned! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) @app_commands.command(name="unban") async def unban(self, interaction: discord.Interaction, target: discord.User, reason: str = None, silent: bool = None): """Unban a user. Parameters ----------- target: discord.user Who are you unbanning? reason: str Why are you unbanning this user? silent: bool Should the user be messaged?""" if interaction.guild.get_member(target.id): target_member = interaction.guild.get_member(target.id) if interaction.guild.get_member(interaction.client.user.id).top_role <= target_member.top_role: await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True) return if interaction.user.top_role <= target_member.top_role: await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True) return permissions = self.check_permissions(interaction.client.user, ['ban_members'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return try: await interaction.guild.fetch_ban(target) except discord.errors.NotFound: await interaction.response.send_message(content=f"{target.mention} is not banned!", ephemeral=True) return if reason: await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id} for: {reason}") else: await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id}") reason = "No reason given." await interaction.response.send_message(content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`") if silent is None: silent = not await self.config.guild(interaction.guild).dm_users() if silent is False: try: embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='unbanned', response=await interaction.original_response()) await target.send(embed=embed) except discord.errors.HTTPException: pass moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNBAN', target.id, 0, 'NULL', reason) await interaction.edit_original_response(content=f"{target.mention} has been unbanned! (Case `#{moderation_id}`)\n**Reason** - `{reason}`") await self.log(interaction, moderation_id) @app_commands.command(name="history") async def history(self, interaction: discord.Interaction, target: discord.User = None, moderator: discord.User = None, pagesize: app_commands.Range[int, 1, 25] = 5, page: int = 1, ephemeral: bool = False, inline: bool = False, export: bool = False): """List previous infractions. Parameters ----------- target: discord.User User whose infractions to query, overrides moderator if both are given moderator: discord.User Query by moderator pagesize: app_commands.Range[int, 1, 25] Amount of infractions to list per page page: int Page to select ephemeral: bool Hide the command response inline: bool Display infractions in a grid arrangement (does not look very good) export: bool Exports the server's entire moderation history to a JSON file""" await interaction.response.defer(ephemeral=ephemeral) permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction) if permissions: await interaction.followup.send(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return database = await self.connect() cursor = database.cursor() if target: query = """SELECT * FROM moderation_%s WHERE target_id = %s ORDER BY moderation_id DESC;""" cursor.execute(query, (interaction.guild.id, target.id)) elif moderator: query = """SELECT * FROM moderation_%s WHERE moderator_id = %s ORDER BY moderation_id DESC;""" cursor.execute(query, (interaction.guild.id, moderator.id)) else: query = """SELECT * FROM moderation_%s ORDER BY moderation_id DESC;""" cursor.execute(query, (interaction.guild.id,)) results = cursor.fetchall() result_dict_list = [] for result in results: case_dict = self.generate_dict(result) if case_dict['moderation_id'] == 0: continue result_dict_list.append(case_dict) if export: try: filename = str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"moderation_{interaction.guild.id}.json" with open(filename, "w", encoding="utf-8") as f: json.dump(result_dict_list, f, indent=2) await interaction.followup.send(file=discord.File(filename, f"moderation_{interaction.guild.id}.json"), ephemeral=ephemeral) os.remove(filename) return except json.JSONDecodeError as e: await interaction.followup.send(content=f"An error occured while exporting the moderation history.\nError:\n```{e}```", ephemeral=ephemeral) return case_quantity = len(result_dict_list) page_quantity = round(case_quantity / pagesize) start_index = (page - 1) * pagesize end_index = page * pagesize embed = discord.Embed(color=await self.bot.get_embed_color(None)) embed.set_author(icon_url=interaction.guild.icon.url, name='Infraction History') embed.set_footer(text=f"Page {page}/{page_quantity} | {case_quantity} Results") memory_dict = {} for case in result_dict_list[start_index:end_index]: if case['target_id'] not in memory_dict: memory_dict[str(case['target_id'])] = await self.fetch_user_dict(interaction, case['target_id']) target_user = memory_dict[str(case['target_id'])] if case['moderator_id'] not in memory_dict: memory_dict[str(case['moderator_id'])] = await self.fetch_user_dict(interaction, case['moderator_id']) moderator_user = memory_dict[str(case['moderator_id'])] target_name = target_user['name'] if target_user['discriminator'] == "0" else f"{target_user['name']}#{target_user['discriminator']}" moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}" field_name = f"Case #{case['moderation_id']} ({str.title(case['moderation_type'])})" field_value = f"**Target:** `{target_name}` ({target_user['id']})\n**Moderator:** `{moderator_name}` ({moderator_user['id']})" if len(case['reason']) > 150: field_value += f"\n**Reason:** `{str(case['reason'])[:150]}...`" else: field_value += f"\n**Reason:** `{str(case['reason'])}`" if case['duration'] != 'NULL': td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case["duration"].split(":"))}) duration_embed = f"{humanize.precisedelta(td)} | " if bool(case['expired']) is False else f"{humanize.precisedelta(td)} | Expired" field_value += f"\n**Duration:** {duration_embed}" field_value += f"\n**Timestamp:** | " if bool(case['resolved']): field_value += "\n**Resolved:** True" embed.add_field(name=field_name, value=field_value, inline=inline) await interaction.followup.send(embed=embed, ephemeral=ephemeral) @app_commands.command(name="resolve") async def resolve(self, interaction: discord.Interaction, case_number: int, reason: str = None): """Resolve a specific case. Parameters ----------- case_number: int Case number of the case you're trying to resolve reason: str Reason for resolving case""" permissions = self.check_permissions(interaction.client.user, ['embed_links', 'moderate_members', 'ban_members'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return conf = await self.check_conf(['mysql_database']) if conf: raise(LookupError) database = await self.connect() cursor = database.cursor() db = await self.config.mysql_database() query_1 = "SELECT * FROM moderation_%s WHERE moderation_id = %s;" cursor.execute(query_1, (interaction.guild.id, case_number)) result_1 = cursor.fetchone() if result_1 is None or case_number == 0: await interaction.response.send_message(content=f"There is no moderation with a case number of {case_number}.", ephemeral=True) return query_2 = "SELECT * FROM moderation_%s WHERE moderation_id = %s AND resolved = 0;" cursor.execute(query_2, (interaction.guild.id, case_number)) result_2 = cursor.fetchone() if result_2 is None: await interaction.response.send_message(content=f"This moderation has already been resolved!\nUse `/case {case_number}` for more information.", ephemeral=True) return case = self.generate_dict(result_2) if reason is None: reason = "No reason given." if case['moderation_type'] in ['UNMUTE', 'UNBAN']: await interaction.response.send_message(content="You cannot resolve this type of moderation!", ephemeral=True) if case['moderation_type'] in ['MUTE', 'TEMPBAN', 'BAN']: if case['moderation_type'] == 'MUTE': try: member = await interaction.guild.fetch_member(case['target_id']) await member.timeout(None, reason=f"Case #{case_number} resolved by {interaction.user.id}") except discord.NotFound: pass if case['moderation_type'] in ['TEMPBAN', 'BAN']: try: user = await interaction.client.fetch_user(case['target_id']) await interaction.guild.unban(user, reason=f"Case #{case_number} resolved by {interaction.user.id}") except discord.NotFound: pass resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, expired = 1, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s" else: resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s" cursor.execute(resolve_query, (interaction.user.id, reason, case_number)) database.commit() response_query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;" cursor.execute(response_query, (interaction.guild.id, case_number)) result = cursor.fetchone() case_dict = self.generate_dict(result) embed = await self.embed_factory('case', interaction=interaction, case_dict=case_dict) await interaction.response.send_message(content=f"✅ Moderation #{case_number} resolved!", embed=embed) await self.log(interaction, case_number, True) cursor.close() database.close() @app_commands.command(name="case") async def case(self, interaction: discord.Interaction, case_number: int, ephemeral: bool = False): """Check the details of a specific case. Parameters ----------- case_number: int What case are you looking up? ephemeral: bool Hide the command response""" permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction) if permissions: await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True) return if case_number != 0: case = await self.fetch_case(case_number, interaction.guild.id) if case: embed = await self.embed_factory('case', interaction=interaction, case_dict=case) await interaction.response.send_message(embed=embed, ephemeral=ephemeral) return await interaction.response.send_message(content=f"No case with case number `{case_number}` found.", ephemeral=True) @tasks.loop(minutes=1) async def handle_expiry(self): conf = await self.check_conf(['mysql_database']) if conf: raise(LookupError) database = await self.connect() cursor = database.cursor() db = await self.config.mysql_database() guilds: list[discord.Guild] = self.bot.guilds for guild in guilds: if not await self.bot.cog_disabled_in_guild(self, guild): tempban_query = f"SELECT target_id, moderation_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= %s AND moderation_type = 'TEMPBAN' AND expired = 0" try: cursor.execute(tempban_query, (time.time(),)) result = cursor.fetchall() except mysql.connector.errors.ProgrammingError: continue target_ids = [row[0] for row in result] moderation_ids = [row[1] for row in result] for target_id, moderation_id in zip(target_ids, moderation_ids): user: discord.User = await self.bot.fetch_user(target_id) try: await guild.unban(user, reason=f"Automatic unban from case #{moderation_id}") embed = await self.embed_factory('message', guild, f'Automatic unban from case #{moderation_id}', 'unbanned') try: await user.send(embed=embed) except discord.errors.HTTPException: pass except [discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException] as e: print(f"Failed to unban {user.name}#{user.discriminator} ({user.id}) from {guild.name} ({guild.id})\n{e}") expiry_query = f"UPDATE `{db}`.`moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp != 0 AND end_timestamp <= %s AND expired = 0 AND moderation_type != 'BLACKLIST') OR (expired = 0 AND resolved = 1 AND moderation_type != 'BLACKLIST')" cursor.execute(expiry_query, (time.time(),)) blacklist_query = f"SELECT target_id, moderation_id, role_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= %s AND moderation_type = 'BLACKLIST' AND expired = 0" try: cursor.execute(blacklist_query, (time.time(),)) result = cursor.fetchall() except mysql.connector.errors.ProgrammingError: continue target_ids = [row[0] for row in result] moderation_ids = [row[1] for row in result] role_ids = [row[2] for row in result] for target_id, moderation_id, role_id in zip(target_ids, moderation_ids, role_ids): try: member: discord.Member = await guild.fetch_member(target_id) role: discord.Role = guild.get_role(role_id) if role is None: raise discord.errors.NotFound except [discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException]: continue database.commit() cursor.close() database.close() @commands.group(autohelp=True) @checks.admin() async def moderationset(self, ctx: commands.Context): """Manage moderation commands.""" @moderationset.command(name="ignorebots") @checks.admin() async def moderationset_ignorebots(self, ctx: commands.Context): """Toggle if the cog should ignore other bots' moderations.""" await self.config.guild(ctx.guild).ignore_other_bots.set(not await self.config.guild(ctx.guild).ignore_other_bots()) await ctx.send(f"Ignore bots setting set to {await self.config.guild(ctx.guild).ignore_other_bots()}") @moderationset.command(name="dm") @checks.admin() async def moderationset_dm(self, ctx: commands.Context): """Toggle automatically messaging moderated users. This option can be overridden by specifying the `silent` argument in any moderation command.""" await self.config.guild(ctx.guild).dm_users.set(not await self.config.guild(ctx.guild).dm_users()) await ctx.send(f"DM users setting set to {await self.config.guild(ctx.guild).dm_users()}") @moderationset.command(name="logchannel") @checks.admin() async def moderationset_logchannel(self, ctx: commands.Context, channel: discord.TextChannel = None): """Set a channel to log infractions to.""" if channel: await self.config.guild(ctx.guild).log_channel.set(channel.id) await ctx.send(f"Logging channel set to {channel.mention}.") else: await self.config.guild(ctx.guild).log_channel.set(" ") await ctx.send("Logging channel disabled.") @moderationset.command(name="mysql") @checks.is_owner() async def moderationset_mysql(self, ctx: commands.Context): """Configure MySQL connection details.""" await ctx.message.add_reaction("✅") await ctx.author.send(content="Click the button below to configure your MySQL connection details.", view=self.ConfigButtons(60)) class ConfigButtons(discord.ui.View): def __init__(self, timeout): super().__init__() self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912) @discord.ui.button(label="Edit", style=discord.ButtonStyle.success) async def config_button(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config)) class MySQLConfigModal(discord.ui.Modal, title="MySQL Database Configuration"): def __init__(self, config): super().__init__() self.config = config address = discord.ui.TextInput( label="Address", placeholder="Input your MySQL address here.", style=discord.TextStyle.short, required=False, max_length=300 ) database = discord.ui.TextInput( label="Database", placeholder="Input the name of your database here.", style=discord.TextStyle.short, required=False, max_length=300 ) username = discord.ui.TextInput( label="Username", placeholder="Input your MySQL username here.", style=discord.TextStyle.short, required=False, max_length=300 ) password = discord.ui.TextInput( label="Password", placeholder="Input your MySQL password here.", style=discord.TextStyle.short, required=False, max_length=300 ) async def on_submit(self, interaction: discord.Interaction): message = "" if self.address.value != "": await self.config.mysql_address.set(self.address.value) message += f"- Address set to\n - `{self.address.value}`\n" if self.database.value != "": await self.config.mysql_database.set(self.database.value) message += f"- Database set to\n - `{self.database.value}`\n" if self.username.value != "": await self.config.mysql_username.set(self.username.value) message += f"- Username set to\n - `{self.username.value}`\n" if self.password.value != "": await self.config.mysql_password.set(self.password.value) trimmed_password = self.password.value[:8] message += f"- Password set to\n - `{trimmed_password}` - Trimmed for security\n" if message == "": trimmed_password = str(await self.config.mysql_password())[:8] send = f"No changes were made.\nCurrent configuration:\n- Address:\n - `{await self.config.mysql_address()}`\n- Database:\n - `{await self.config.mysql_database()}`\n- Username:\n - `{await self.config.mysql_username()}`\n- Password:\n - `{trimmed_password}` - Trimmed for security" else: send = f"Configuration changed:\n{message}" await interaction.response.send_message(send, ephemeral=True) @moderationset.group(autohelp=True) @checks.admin() async def moderationset_import(self, ctx: commands.Context): """Import moderations from other bots.""" @moderationset_import.command(name="galacticbot") @checks.admin() async def moderationset_import_galacticbot(self, ctx: commands.Context): """Import moderations from GalacticBot. **UNFINISHED!**""" await ctx.send("Are you sure you want to import GalacticBot moderations? This will overwrite any moderations that already exist in the database.", view=self.GalacticBotImportButtons(60)) class GalacticBotImportButtons(discord.ui.View): def __init__(self, timeout): super().__init__() self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912) @discord.ui.button(label="Yes", style=discord.ButtonStyle.success) async def import_button_y(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config)) @discord.ui.button(label="No", style=discord.ButtonStyle.danger) async def import_button_n(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config)) @commands.command(aliases=["tdc"]) async def timedeltaconvert(self, ctx: commands.Context, *, duration: str): """This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object. **Example usage** `[p]timedeltaconvert 1 day 15hr 82 minutes 52s` **Output** `1 day, 16:22:52`""" try: parsed_time = parse(duration, as_timedelta=True, raise_exception=True) await ctx.send(f"`{str(parsed_time)}`") except ValueError: await ctx.send("Please provide a convertible value!")