GalaxyCogs/moderation/moderation.py
2023-12-14 14:57:28 -05:00

1087 lines
60 KiB
Python

import logging
import json
import time
import os
from datetime import datetime, timedelta, timezone
from typing import Union
import discord
import humanize
import mysql.connector
from discord.ext import tasks
from pytimeparse2 import disable_dateutil, parse
from redbot.core import app_commands, checks, Config, commands, data_manager
from redbot.core.app_commands import Choice
class Moderation(commands.Cog):
"""Custom moderation cog.
Developed by SeaswimmerTheFsh."""
def __init__(self, bot):
    """Store the bot, register the config defaults and start the expiry task.

    A single space (" ") is used as the "not configured" marker for the
    MySQL settings and for the log channel; other methods of this cog
    compare against that exact value.
    """
    self.bot = bot
    self.config = Config.get_conf(self, identifier=481923957134912)

    # Connection details for the MySQL backend, shared by every guild.
    global_defaults = {
        "mysql_address": " ",
        "mysql_database": " ",
        "mysql_username": " ",
        "mysql_password": " ",
    }
    self.config.register_global(**global_defaults)

    # Per-guild behaviour switches.
    guild_defaults = {
        "ignore_other_bots": True,
        "dm_users": True,
        "log_channel": " ",
    }
    self.config.register_guild(**guild_defaults)

    disable_dateutil()
    # NOTE(review): handle_expiry is defined outside this view; presumably a tasks.loop.
    self.handle_expiry.start()  # pylint: disable=no-member
    self.logger = logging.getLogger('red.seaswimmerthefsh.moderation')
async def cog_load(self):
    """Create the moderation table for every guild the bot is currently in.

    Does nothing (beyond logging) when the MySQL settings are unset, and
    stops quietly as soon as the database refuses the connection.
    """
    required_settings = [
        'mysql_address',
        'mysql_database',
        'mysql_username',
        'mysql_password'
    ]
    unset = await self.check_conf(required_settings)
    if unset:
        self.logger.fatal("Failed to create tables, due to MySQL connection configuration being unset.")
        return

    try:
        for guild in self.bot.guilds:
            # Guilds where the cog is disabled get no table.
            if await self.bot.cog_disabled_in_guild(self, guild):
                continue
            await self.create_guild_table(guild)
    except ConnectionRefusedError:
        return
async def cog_unload(self):
    """Cancel the expiry task that ``__init__`` started."""
    expiry_task = self.handle_expiry
    expiry_task.cancel()  # pylint: disable=no-member
@commands.Cog.listener('on_guild_join')
async def db_generate_guild_join(self, guild: discord.Guild):
    """Create the moderation table for a guild the bot has just joined."""
    if await self.bot.cog_disabled_in_guild(self, guild):
        return

    required_settings = [
        'mysql_address',
        'mysql_database',
        'mysql_username',
        'mysql_password'
    ]
    unset = await self.check_conf(required_settings)
    if unset:
        self.logger.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id)
        return

    try:
        await self.create_guild_table(guild)
    except ConnectionRefusedError:
        # The database is unreachable; nothing more can be done here.
        return
@commands.Cog.listener('on_audit_log_entry_create')
async def autologger(self, entry: discord.AuditLogEntry):
    """Log moderation actions that were performed without the bot ("right clicks").

    Kicks, bans, unbans and timeout changes found in the audit log are
    written to the guild's moderation table through ``mysql_log``. Every
    other audit-log action is ignored.
    """
    if await self.bot.cog_disabled_in_guild(self, entry.guild):
        return
    # Both are needed below for their IDs; the audit log may omit either.
    if entry.user is None or entry.target is None:
        return

    # Config.guild() takes the guild object itself, not its ID.
    if await self.config.guild(entry.guild).ignore_other_bots() is True:
        # The target can be a bare object without a ``bot`` attribute.
        if getattr(entry.user, 'bot', False) or getattr(entry.target, 'bot', False):
            return
    elif entry.user.id == self.bot.user.id:
        return

    duration = "NULL"
    if entry.reason:
        reason = entry.reason + " (This action was performed without the bot.)"
    else:
        reason = "This action was performed without the bot."

    if entry.action == discord.AuditLogAction.kick:
        moderation_type = 'KICK'
    elif entry.action == discord.AuditLogAction.ban:
        moderation_type = 'BAN'
    elif entry.action == discord.AuditLogAction.unban:
        moderation_type = 'UNBAN'
    elif entry.action == discord.AuditLogAction.member_update:
        # member_update also fires for changes unrelated to timeouts; the
        # diff only carries ``timed_out_until`` when the timeout changed.
        if not hasattr(entry.after, 'timed_out_until'):
            return
        timed_out_until = entry.after.timed_out_until
        if timed_out_until is not None:
            timed_out_until_aware = timed_out_until.replace(tzinfo=timezone.utc)
            remaining = timed_out_until_aware - datetime.now(tz=timezone.utc)
            # Rounded to whole minutes before storing.
            minutes = round(remaining.total_seconds() / 60)
            duration = timedelta(minutes=minutes)
            moderation_type = 'MUTE'
        else:
            moderation_type = 'UNMUTE'
    else:
        return

    await self.mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, 0, duration, reason)
async def connect(self):
    """Connect to the MySQL database and return the connection object.

    Raises:
        LookupError: when any of the four ``mysql_*`` settings is unset.
        ConnectionRefusedError: when the connector reports an error while
            connecting; the original error is chained as ``__cause__``.
    """
    unset = await self.check_conf([
        'mysql_address',
        'mysql_database',
        'mysql_username',
        'mysql_password'
    ])
    if unset:
        raise LookupError("MySQL connection details not set properly!")

    try:
        return mysql.connector.connect(
            host=await self.config.mysql_address(),
            user=await self.config.mysql_username(),
            password=await self.config.mysql_password(),
            database=await self.config.mysql_database()
        )
    except mysql.connector.Error as e:
        # Catch the connector's base error, not just ProgrammingError, so
        # an unreachable server also becomes the ConnectionRefusedError
        # that callers handle.
        self.logger.fatal("Unable to access the MySQL database!\nError:\n%s", e.msg)
        raise ConnectionRefusedError(f"Unable to access the MySQL Database!\n{e.msg}") from e
async def create_guild_table(self, guild: discord.Guild):
    """Create the ``moderation_<guild id>`` table if it does not exist yet.

    A new table also gets three indexes and a placeholder row with
    ``moderation_id`` 0. The cursor and connection are always closed,
    even when a statement fails.
    """
    table = f"moderation_{guild.id}"
    database = await self.connect()
    try:
        cursor = database.cursor()
        try:
            try:
                # Cheap existence probe: one row at most, and the result is
                # read so nothing is left pending on the connection.
                cursor.execute(f"SELECT 1 FROM `{table}` LIMIT 1")
                cursor.fetchall()
                self.logger.info("MySQL Table exists for server %s (%s)", guild.name, guild.id)
            except mysql.connector.errors.ProgrammingError:
                query = f"""
                    CREATE TABLE `{table}` (
                        moderation_id INT UNIQUE PRIMARY KEY NOT NULL,
                        timestamp INT NOT NULL,
                        moderation_type LONGTEXT NOT NULL,
                        target_id LONGTEXT NOT NULL,
                        moderator_id LONGTEXT NOT NULL,
                        role_id LONGTEXT,
                        duration LONGTEXT,
                        end_timestamp INT,
                        reason LONGTEXT,
                        resolved BOOL NOT NULL,
                        resolved_by LONGTEXT,
                        resolve_reason LONGTEXT,
                        expired BOOL NOT NULL
                    )
                    """
                cursor.execute(query)
                # Identifiers cannot be bound as parameters, so the table
                # name is interpolated like in the other queries.
                cursor.execute(f"CREATE INDEX idx_target_id ON `{table}`(target_id(25));")
                cursor.execute(f"CREATE INDEX idx_moderator_id ON `{table}`(moderator_id(25));")
                cursor.execute(f"CREATE INDEX idx_moderation_id ON `{table}`(moderation_id);")
                insert_query = f"""
                    INSERT INTO `{table}`
                    (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                    """
                # Placeholder row so the "highest id + 1" lookup has a row to read.
                insert_values = (0, 0, "NULL", 0, 0, 0, "NULL", 0, "NULL", 0, "NULL", "NULL", 0)
                cursor.execute(insert_query, insert_values)
                database.commit()
                self.logger.info("MySQL Table (moderation_%s) created for %s (%s)", guild.id, guild.name, guild.id)
        finally:
            cursor.close()
    finally:
        database.close()
async def check_conf(self, config: list):
    """Return the names of the given global config options that are unset.

    An option counts as unset while it still holds the single-space
    default (" ").

    Args:
        config: names of global config options to check.

    Returns:
        The subset of ``config`` that is unset, in the same order; an
        empty list when everything is configured.
    """
    not_found_list = []
    for item in config:
        # Look the option up by name; ``self.config.item`` would read an
        # option literally called "item".
        if await getattr(self.config, item)() == " ":
            not_found_list.append(item)
    return not_found_list
def check_permissions(self, user: discord.User, permissions: list, ctx: Union[commands.Context, discord.Interaction] = None, guild: discord.Guild = None):
    """Return the first permission from ``permissions`` that ``user`` lacks.

    With ``ctx`` the permissions are resolved for ``ctx.channel``; with
    only ``guild`` the member's guild-wide permissions are used. Returns
    ``False`` when nothing is missing. Administrators never miss a
    permission. Raises ``KeyError`` when neither ``ctx`` nor ``guild`` is
    given.
    """
    if ctx:
        resolved = ctx.channel.permissions_for(ctx.guild.get_member(user.id))
    elif guild:
        resolved = guild.get_member(user.id).guild_permissions
    else:
        raise KeyError

    missing = (
        name for name in permissions
        if not getattr(resolved, name, False) and resolved.administrator is not True
    )
    return next(missing, False)
async def mysql_log(self, guild_id: str, author_id: str, moderation_type: str, target_id: int, role_id: int, duration, reason: str):
    """Insert a new case into the guild's moderation table.

    Args:
        guild_id: ID of the guild whose table receives the row.
        author_id: ID of the moderator.
        moderation_type: case type, e.g. ``'MUTE'`` or ``'BAN'``.
        target_id: ID of the moderated user.
        role_id: role ID, or 0 when no role is involved.
        duration: a ``timedelta``, or the string ``"NULL"`` for cases
            without a duration.
        reason: free-text reason.

    Returns:
        The ``moderation_id`` assigned to the new case.
    """
    timestamp = int(time.time())
    if duration != "NULL":
        # Plain epoch arithmetic; a round trip through naive local
        # datetimes would be skewed by DST changes.
        end_timestamp = timestamp + int(duration.total_seconds())
    else:
        end_timestamp = 0

    database = await self.connect()
    try:
        cursor = database.cursor()
        # NOTE(review): reading the next id and inserting are separate
        # statements, so two concurrent calls could pick the same id.
        moderation_id = await self.get_next_case_number(guild_id=guild_id, cursor=cursor)
        sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        val = (moderation_id, timestamp, moderation_type, target_id, author_id, role_id, duration, end_timestamp, str(reason), 0, "NULL", "NULL", 0)
        cursor.execute(sql, val)
        database.commit()
    finally:
        # Always release the connection, even if a statement failed.
        database.close()

    self.logger.debug("MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, 0, NULL, NULL, 0", guild_id, moderation_id, timestamp, moderation_type, target_id, author_id, role_id, duration, end_timestamp, reason)
    return moderation_id
async def get_next_case_number(self, guild_id: str, cursor = None):
    """Return the next case number for a guild's moderation table.

    Args:
        guild_id: ID of the guild whose table is queried.
        cursor: an open cursor to reuse. When omitted, a temporary
            connection is opened and closed again before returning.

    Returns:
        The highest stored ``moderation_id`` plus one, or 1 when the
        table holds no rows.
    """
    own_database = None
    if not cursor:
        own_database = await self.connect()
        cursor = own_database.cursor()
    try:
        cursor.execute(f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1")
        row = cursor.fetchone()
    finally:
        # Only close what was opened here; a caller-supplied cursor stays open.
        if own_database is not None:
            cursor.close()
            own_database.close()
    return row[0] + 1 if row else 1
def generate_dict(self, result):
    """Turn a row of the moderation table into a dictionary keyed by column name.

    ``result`` is indexed by position (0-12), in the column order used
    when the table is created.
    """
    columns = (
        "moderation_id",
        "timestamp",
        "moderation_type",
        "target_id",
        "moderator_id",
        "role_id",
        "duration",
        "end_timestamp",
        "reason",
        "resolved",
        "resolved_by",
        "resolve_reason",
        "expired",
    )
    case: dict = {column: result[position] for position, column in enumerate(columns)}
    return case
async def fetch_user_dict(self, interaction: discord.Interaction, user_id: str):
    """Return a dictionary describing a user, or a "Deleted User" template.

    The client's cache is tried first, then the API. When the user is not
    found, the template carries the ``user_id`` exactly as it was passed.

    Returns:
        A dict with the keys ``id``, ``name`` and ``discriminator``.
    """
    # IDs read back from the database are strings, but the user cache is
    # keyed by int, so convert where possible.
    try:
        lookup_id = int(user_id)
    except (TypeError, ValueError):
        lookup_id = user_id

    try:
        user = interaction.client.get_user(lookup_id)
        if user is None:
            user = await interaction.client.fetch_user(lookup_id)
        user_dict = {
            'id': user.id,
            'name': user.name,
            'discriminator': user.discriminator
        }
    except discord.errors.NotFound:
        user_dict = {
            'id': user_id,
            'name': 'Deleted User',
            'discriminator': '0'
        }
    return user_dict
async def fetch_role_dict(self, interaction: discord.Interaction, role_id: str):
    """Return a dictionary describing a role, or a "Deleted Role" template.

    When the guild no longer has the role, the template carries the
    ``role_id`` exactly as it was passed.

    Returns:
        A dict with the keys ``id`` and ``name``.
    """
    # IDs read back from the database are strings; roles are looked up by int.
    try:
        lookup_id = int(role_id)
    except (TypeError, ValueError):
        lookup_id = role_id

    # get_role() reports a missing role by returning None, not by raising.
    role = interaction.guild.get_role(lookup_id)
    if role is None:
        return {
            'id': role_id,
            'name': 'Deleted Role'
        }
    return {
        'id': role.id,
        'name': role.name
    }
async def embed_factory(self, embed_type: str, /, interaction: discord.Interaction = None, case_dict: dict = None, guild: discord.Guild = None, reason: str = None, moderation_type: str = None, response: discord.InteractionMessage = None, duration: timedelta = None, resolved: bool = False):
    """Create an embed for moderation logging or for messaging the moderated user.

    Valid arguments for 'embed_type':
    - 'message'
    - 'log' - WIP
    - 'case'

    Required arguments for 'message':
    - guild
    - reason
    - moderation_type
    - response
    - duration (optional)

    Required arguments for 'log':
    - interaction
    - case_dict
    - resolved (optional)

    Required arguments for 'case':
    - interaction
    - case_dict

    Returns:
        The populated ``discord.Embed``.

    Raises:
        TypeError: when ``embed_type`` is not one of the values above.
    """
    if embed_type == 'message':
        # These actions leave the user outside the guild, so no jump link
        # to the response is added.
        if moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]:
            guild_name = guild.name
        else:
            guild_name = f"[{guild.name}]({response.jump_url})"

        if moderation_type in ["tempbanned", "muted"] and duration:
            embed_duration = f" for {humanize.precisedelta(duration)}"
        else:
            embed_duration = ""

        if moderation_type == "note":
            embed_desc = "received a"
        else:
            embed_desc = "been"

        embed = discord.Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=await self.bot.get_embed_color(None), timestamp=datetime.now())
        embed.add_field(name='Reason', value=f"`{reason}`")
        # Guilds without an icon have ``guild.icon`` set to None.
        embed.set_author(name=guild.name, icon_url=guild.icon.url if guild.icon else None)
        embed.set_footer(text=f"Case #{await self.get_next_case_number(guild.id)}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&")
        return embed

    if embed_type == 'case':
        target_user = await self.fetch_user_dict(interaction, case_dict['target_id'])
        moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id'])
        target_name = self._format_user_name(target_user)
        moderator_name = self._format_user_name(moderator_user)

        embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']}", color=await self.bot.get_embed_color(None))
        embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"

        if case_dict['duration'] != 'NULL':
            td = self._parse_case_duration(case_dict['duration'])
            # The relative end time is only shown while the case is still running.
            duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if bool(case_dict['expired']) is False else str(humanize.precisedelta(td))
            embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"

        embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)

        if case_dict['resolved'] == 1:
            resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by'])
            resolved_name = self._format_user_name(resolved_user)
            embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False)
        return embed

    if embed_type == 'log':
        target_user = await self.fetch_user_dict(interaction, case_dict['target_id'])
        moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id'])
        target_name = self._format_user_name(target_user)
        # In log embeds only the target is wrapped in backticks.
        moderator_name = self._format_user_name(moderator_user, code=False)

        title = f"📕 Case #{case_dict['moderation_id']} Resolved" if resolved else f"📕 Case #{case_dict['moderation_id']}"
        embed = discord.Embed(title=title, color=await self.bot.get_embed_color(None))
        embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"

        if case_dict['duration'] != 'NULL':
            td = self._parse_case_duration(case_dict['duration'])
            if resolved:
                # ``expired`` is a BOOL column, so test its truthiness (same
                # as the 'case' embed) rather than comparing with '0'.
                duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if bool(case_dict['expired']) is False else str(humanize.precisedelta(td))
                embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"
            else:
                embed.description = embed.description + f"\n**Duration:** {humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>"

        embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)

        if resolved:
            resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by'])
            resolved_name = self._format_user_name(resolved_user, code=False)
            embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False)
        return embed

    raise TypeError("'type' argument is invalid!")

def _format_user_name(self, user: dict, code: bool = True) -> str:
    """Return ``name`` or ``name#discriminator``, optionally wrapped in backticks."""
    # A discriminator of "0" means there is nothing to append.
    name = user['name'] if user['discriminator'] == "0" else f"{user['name']}#{user['discriminator']}"
    return f"`{name}`" if code else name

def _parse_case_duration(self, duration: str) -> timedelta:
    """Convert a stored ``hours:minutes:seconds`` duration string into a timedelta."""
    return timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], duration.split(":"))})
async def fetch_case(self, moderation_id: int, guild_id: str):
    """Fetch one case from the guild's moderation table.

    Args:
        moderation_id: the case number to look up.
        guild_id: ID of the guild whose table is queried.

    Returns:
        The case as a dictionary (see ``generate_dict``), or ``None`` when
        no row has that ``moderation_id``.
    """
    database = await self.connect()
    try:
        cursor = database.cursor()
        try:
            # The table name is an identifier and cannot be bound as a
            # parameter; only the case number is.
            query = f"SELECT * FROM `moderation_{guild_id}` WHERE moderation_id = %s;"
            cursor.execute(query, (moderation_id,))
            result = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        database.close()

    if result is None:
        return None
    return self.generate_dict(result)
async def log(self, interaction: discord.Interaction, moderation_id: int, resolved: bool = False):
    """Send a case embed to the guild's configured logging channel.

    Nothing is sent when no log channel is configured, the configured
    channel cannot be found in the guild, the case does not exist, or the
    bot is forbidden from posting there.
    """
    logging_channel_id = await self.config.guild(interaction.guild).log_channel()
    if logging_channel_id == " ":
        return

    logging_channel = interaction.guild.get_channel(logging_channel_id)
    if logging_channel is None:
        # The configured channel is not (or no longer) part of this guild.
        return

    case = await self.fetch_case(moderation_id, interaction.guild.id)
    if not case:
        return

    embed = await self.embed_factory('log', interaction=interaction, case_dict=case, resolved=resolved)
    try:
        await logging_channel.send(embed=embed)
    except discord.errors.Forbidden:
        return
#######################################################################################################################
### COMMANDS
#######################################################################################################################
@app_commands.command(name="note")
async def note(self, interaction: discord.Interaction, target: discord.User, reason: str, silent: bool = None):
    """Add a note to a user.
    Parameters
    -----------
    target: discord.User
        Who are you noting?
    reason: str
        Why are you noting this user?
    silent: bool
        Should the user be messaged?"""
    # The role-hierarchy checks only apply when the target is in this guild.
    target_member = interaction.guild.get_member(target.id)
    if target_member:
        if interaction.guild.get_member(interaction.client.user.id).top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True)
            return
        if interaction.user.top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True)
            return

    await interaction.response.send_message(content=f"{target.mention} has received a note!\n**Reason** - `{reason}`")

    # Without an explicit choice, fall back to the guild's dm_users setting.
    if silent is None:
        silent = not await self.config.guild(interaction.guild).dm_users()
    if silent is False:
        try:
            embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='note', response=await interaction.original_response())
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Best effort: the DM could not be delivered.
            pass

    moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', target.id, 0, 'NULL', reason)
    await interaction.edit_original_response(content=f"{target.mention} has received a note! (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
    await self.log(interaction, moderation_id)
@app_commands.command(name="warn")
async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
    """Warn a user.
    Parameters
    -----------
    target: discord.Member
        Who are you warning?
    reason: str
        Why are you warning this user?
    silent: bool
        Should the user be messaged?"""
    guild = interaction.guild

    # Role-hierarchy checks, applied only when the target is found in the guild.
    if guild.get_member(target.id):
        target_member = guild.get_member(target.id)
        bot_member = guild.get_member(interaction.client.user.id)
        if bot_member.top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True)
            return
        if interaction.user.top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True)
            return

    await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`")

    # Without an explicit choice, fall back to the guild's dm_users setting.
    if silent is None:
        silent = not await self.config.guild(guild).dm_users()
    if silent is False:
        try:
            dm_embed = await self.embed_factory('message', guild=guild, reason=reason, moderation_type='warned', response=await interaction.original_response())
            await target.send(embed=dm_embed)
        except discord.errors.HTTPException:
            pass

    case_number = await self.mysql_log(guild.id, interaction.user.id, 'WARN', target.id, 0, 'NULL', reason)
    await interaction.edit_original_response(content=f"{target.mention} has been warned! (Case `#{case_number}`)\n**Reason** - `{reason}`")
    await self.log(interaction, case_number)
@app_commands.command(name="mute")
async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str, silent: bool = None):
    """Mute a user.
    Parameters
    -----------
    target: discord.Member
        Who are you muting?
    duration: str
        How long are you muting this user for?
    reason: str
        Why are you muting this user?
    silent: bool
        Should the user be messaged?"""
    # The role-hierarchy checks only apply when the target is in this guild.
    target_member = interaction.guild.get_member(target.id)
    if target_member:
        if interaction.guild.get_member(interaction.client.user.id).top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True)
            return
        if interaction.user.top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True)
            return

    permissions = self.check_permissions(interaction.client.user, ['moderate_members'], interaction)
    if permissions:
        await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
        return

    if target.is_timed_out() is True:
        await interaction.response.send_message(f"{target.mention} is already muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
        return

    try:
        parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
    except ValueError:
        await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
        return

    # Compare timedeltas directly; the message below promises a 28-day cap.
    if parsed_time > timedelta(days=28):
        await interaction.response.send_message("Please provide a duration that is less than 28 days.", ephemeral=True)
        return

    await target.timeout(parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}")
    await interaction.response.send_message(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")

    # Without an explicit choice, fall back to the guild's dm_users setting.
    if silent is None:
        silent = not await self.config.guild(interaction.guild).dm_users()
    if silent is False:
        try:
            embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='muted', response=await interaction.original_response(), duration=parsed_time)
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Best effort: the DM could not be delivered.
            pass

    moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', target.id, 0, parsed_time, reason)
    await interaction.edit_original_response(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
    await self.log(interaction, moderation_id)
@app_commands.command(name="unmute")
async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None, silent: bool = None):
    """Unmute a user.
    Parameters
    -----------
    target: discord.user
        Who are you unmuting?
    reason: str
        Why are you unmuting this user?
    silent: bool
        Should the user be messaged?"""
    # The role-hierarchy checks only apply when the target is in this guild.
    target_member = interaction.guild.get_member(target.id)
    if target_member:
        if interaction.guild.get_member(interaction.client.user.id).top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True)
            return
        if interaction.user.top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True)
            return

    permissions = self.check_permissions(interaction.client.user, ['moderate_members'], interaction)
    if permissions:
        await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
        return

    if target.is_timed_out() is False:
        await interaction.response.send_message(f"{target.mention} is not muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
        return

    # Passing None to timeout() clears the timeout.
    if reason:
        await target.timeout(None, reason=f"Unmuted by {interaction.user.id} for: {reason}")
    else:
        await target.timeout(None, reason=f"Unmuted by {interaction.user.id}")
        reason = "No reason given."

    await interaction.response.send_message(content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`")

    # Without an explicit choice, fall back to the guild's dm_users setting.
    if silent is None:
        silent = not await self.config.guild(interaction.guild).dm_users()
    if silent is False:
        try:
            embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='unmuted', response=await interaction.original_response())
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Best effort: the DM could not be delivered.
            pass

    moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', target.id, 0, 'NULL', reason)
    await interaction.edit_original_response(content=f"{target.mention} has been unmuted! (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
    await self.log(interaction, moderation_id)
@app_commands.command(name="kick")
async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
    """Kick a user.
    Parameters
    -----------
    target: discord.user
        Who are you kicking?
    reason: str
        Why are you kicking this user?
    silent: bool
        Should the user be messaged?"""
    # The role-hierarchy checks only apply when the target is in this guild.
    target_member = interaction.guild.get_member(target.id)
    if target_member:
        if interaction.guild.get_member(interaction.client.user.id).top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True)
            return
        if interaction.user.top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True)
            return

    permissions = self.check_permissions(interaction.client.user, ['kick_members'], interaction)
    if permissions:
        await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
        return

    await interaction.response.send_message(content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`")

    # The DM is attempted before the kick itself.
    if silent is None:
        silent = not await self.config.guild(interaction.guild).dm_users()
    if silent is False:
        try:
            embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='kicked', response=await interaction.original_response())
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Best effort: the DM could not be delivered.
            pass

    # Member.kick() accepts ``reason`` as a keyword argument only.
    await target.kick(reason=f"Kicked by {interaction.user.id} for: {reason}")
    moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'KICK', target.id, 0, 'NULL', reason)
    await interaction.edit_original_response(content=f"{target.mention} has been kicked! (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
    await self.log(interaction, moderation_id)
@app_commands.command(name="ban")
@app_commands.choices(delete_messages=[
    Choice(name="None", value=0),
    Choice(name='1 Hour', value=3600),
    Choice(name='12 Hours', value=43200),
    Choice(name='1 Day', value=86400),
    Choice(name='3 Days', value=259200),
    Choice(name='7 Days', value=604800),
])
async def ban(self, interaction: discord.Interaction, target: discord.User, reason: str, duration: str = None, delete_messages: Choice[int] = 0, silent: bool = None):
    """Ban a user.
    Parameters
    -----------
    target: discord.user
        Who are you banning?
    duration: str
        How long are you banning this user for?
    reason: str
        Why are you banning this user?
    delete_messages: Choices[int]
        How many days of messages to delete?
    silent: bool
        Should the user be messaged?"""
    # The role-hierarchy checks only apply when the target is in this guild.
    target_member = interaction.guild.get_member(target.id)
    if target_member:
        if interaction.guild.get_member(interaction.client.user.id).top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True)
            return
        if interaction.user.top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True)
            return

    permissions = self.check_permissions(interaction.client.user, ['ban_members'], interaction)
    if permissions:
        await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
        return

    try:
        await interaction.guild.fetch_ban(target)
        await interaction.response.send_message(content=f"{target.mention} is already banned!", ephemeral=True)
        return
    except discord.errors.NotFound:
        # No existing ban: carry on.
        pass

    # A picked option arrives as a Choice object; the untouched default is
    # the plain int 0. Guild.ban() needs the number of seconds.
    delete_seconds = delete_messages.value if isinstance(delete_messages, Choice) else delete_messages

    if duration:
        try:
            parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
        except ValueError:
            await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
            return
        summary = f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!"
        dm_type = 'tempbanned'
        case_type = 'TEMPBAN'
        case_duration = parsed_time
        audit_reason = f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})"
    else:
        parsed_time = None
        summary = f"{target.mention} has been banned!"
        dm_type = 'banned'
        case_type = 'BAN'
        case_duration = 'NULL'
        audit_reason = f"Banned by {interaction.user.id} for: {reason}"

    await interaction.response.send_message(content=f"{summary}\n**Reason** - `{reason}`")

    # ``silent`` is honoured for temporary bans as well as permanent ones;
    # without an explicit choice the guild's dm_users setting decides.
    if silent is None:
        silent = not await self.config.guild(interaction.guild).dm_users()
    if silent is False:
        try:
            embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type=dm_type, response=await interaction.original_response(), duration=parsed_time)
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Best effort: the DM could not be delivered.
            pass

    await interaction.guild.ban(target, reason=audit_reason, delete_message_seconds=delete_seconds)
    moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, case_type, target.id, 0, case_duration, reason)
    await interaction.edit_original_response(content=f"{summary} (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
    await self.log(interaction, moderation_id)
@app_commands.command(name="unban")
async def unban(self, interaction: discord.Interaction, target: discord.User, reason: str = None, silent: bool = None):
    """Unban a user.

    Parameters
    -----------
    target: discord.user
        Who are you unbanning?
    reason: str
        Why are you unbanning this user?
    silent: bool
        Should the user be messaged?"""
    guild = interaction.guild

    # The role hierarchy is only checked when the target is found in the guild's member cache.
    target_member = guild.get_member(target.id)
    if target_member:
        bot_member = guild.get_member(interaction.client.user.id)
        if bot_member.top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True)
            return
        if interaction.user.top_role <= target_member.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True)
            return

    missing_permission = self.check_permissions(interaction.client.user, ['ban_members'], interaction)
    if missing_permission:
        await interaction.response.send_message(f"I do not have the `{missing_permission}` permission, required for this action.", ephemeral=True)
        return

    # fetch_ban raises NotFound when there is no ban entry for the target.
    try:
        await guild.fetch_ban(target)
    except discord.errors.NotFound:
        await interaction.response.send_message(content=f"{target.mention} is not banned!", ephemeral=True)
        return

    # The audit-log reason only carries the "for: ..." suffix when a reason was supplied.
    if reason:
        audit_reason = f"Unbanned by {interaction.user.id} for: {reason}"
    else:
        audit_reason = f"Unbanned by {interaction.user.id}"
        reason = "No reason given."
    await guild.unban(target, reason=audit_reason)

    await interaction.response.send_message(content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`")

    # When `silent` is not given explicitly, fall back to the guild's dm_users setting.
    if silent is None:
        silent = not await self.config.guild(guild).dm_users()
    if silent is False:
        try:
            notice = await self.embed_factory('message', guild=guild, reason=reason, moderation_type='unbanned', response=await interaction.original_response())
            await target.send(embed=notice)
        except discord.errors.HTTPException:
            # Best effort: failing to DM the user must not abort the unban flow.
            pass

    moderation_id = await self.mysql_log(guild.id, interaction.user.id, 'UNBAN', target.id, 0, 'NULL', reason)
    await interaction.edit_original_response(content=f"{target.mention} has been unbanned! (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
    await self.log(interaction, moderation_id)
@app_commands.command(name="history")
async def history(self, interaction: discord.Interaction, target: discord.User = None, moderator: discord.User = None, pagesize: app_commands.Range[int, 1, 25] = 5, page: int = 1, ephemeral: bool = False, export: bool = False):
    """List previous infractions.

    Parameters
    -----------
    target: discord.User
        User whose infractions to query, overrides moderator if both are given
    moderator: discord.User
        Query by moderator
    pagesize: app_commands.Range[int, 1, 25]
        Amount of infractions to list per page
    page: int
        Page to select
    ephemeral: bool
        Hide the command response
    export: bool
        Exports the server's entire moderation history to a JSON file"""
    await interaction.response.defer(ephemeral=ephemeral)

    permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction)
    if permissions:
        await interaction.followup.send(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
        return

    # `target` takes precedence over `moderator`; with neither, every case of the guild is listed.
    if target:
        query = "SELECT * FROM moderation_%s WHERE target_id = %s ORDER BY moderation_id DESC;"
        params = (interaction.guild.id, target.id)
    elif moderator:
        query = "SELECT * FROM moderation_%s WHERE moderator_id = %s ORDER BY moderation_id DESC;"
        params = (interaction.guild.id, moderator.id)
    else:
        query = "SELECT * FROM moderation_%s ORDER BY moderation_id DESC;"
        params = (interaction.guild.id,)

    # Always release the cursor and the connection, even if the query fails.
    database = await self.connect()
    try:
        cursor = database.cursor()
        try:
            cursor.execute(query, params)
            results = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        database.close()

    # Rows with moderation_id 0 are never shown to users.
    result_dict_list = []
    for result in results:
        case_dict = self.generate_dict(result)
        if case_dict['moderation_id'] == 0:
            continue
        result_dict_list.append(case_dict)

    if export:
        filename = str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"moderation_{interaction.guild.id}.json"
        try:
            try:
                with open(filename, "w", encoding="utf-8") as f:
                    json.dump(result_dict_list, f, indent=2)
            except (TypeError, ValueError) as e:
                # json.dump signals unserializable data with TypeError/ValueError
                # (JSONDecodeError is only raised while *decoding*).
                await interaction.followup.send(content=f"An error occured while exporting the moderation history.\nError:\n```{e}```", ephemeral=ephemeral)
                return
            await interaction.followup.send(file=discord.File(filename, f"moderation_{interaction.guild.id}.json"), ephemeral=ephemeral)
        finally:
            # Remove the temporary export file on both the success and the failure path.
            try:
                os.remove(filename)
            except FileNotFoundError:
                pass
        return

    case_quantity = len(result_dict_list)
    # Ceiling division: a partially filled last page still counts as a page.
    page_quantity = (case_quantity + pagesize - 1) // pagesize
    # Pages are 1-based; anything lower would yield negative slice indices.
    page = max(page, 1)
    start_index = (page - 1) * pagesize
    end_index = page * pagesize

    # Guilds without an icon have `icon` set to None.
    guild_icon = interaction.guild.icon
    embed = discord.Embed(color=await self.bot.get_embed_color(None))
    embed.set_author(icon_url=guild_icon.url if guild_icon else None, name='Infraction History')
    embed.set_footer(text=f"Page {page}/{page_quantity} | {case_quantity} Results")

    # Cache of already fetched users, keyed by the stringified user id, so that
    # each user is only fetched once per page.
    memory_dict = {}
    for case in result_dict_list[start_index:end_index]:
        target_key = str(case['target_id'])
        if target_key not in memory_dict:
            memory_dict[target_key] = await self.fetch_user_dict(interaction, case['target_id'])
        target_user = memory_dict[target_key]

        moderator_key = str(case['moderator_id'])
        if moderator_key not in memory_dict:
            memory_dict[moderator_key] = await self.fetch_user_dict(interaction, case['moderator_id'])
        moderator_user = memory_dict[moderator_key]

        # A discriminator of "0" means there is no discriminator to display.
        target_name = target_user['name'] if target_user['discriminator'] == "0" else f"{target_user['name']}#{target_user['discriminator']}"
        moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}"

        field_name = f"Case #{case['moderation_id']} ({str.title(case['moderation_type'])})"
        field_value = f"**Target:** `{target_name}` ({target_user['id']})\n**Moderator:** `{moderator_name}` ({moderator_user['id']})"

        # Long reasons are truncated to keep the embed field compact.
        reason_text = str(case['reason'])
        if len(reason_text) > 150:
            field_value += f"\n**Reason:** `{reason_text[:150]}...`"
        else:
            field_value += f"\n**Reason:** `{reason_text}`"

        if case['duration'] != 'NULL':
            # The duration is split on ":" into hours, minutes and seconds.
            td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case["duration"].split(":"))})
            if bool(case['expired']) is False:
                duration_embed = f"{humanize.precisedelta(td)} | <t:{case['end_timestamp']}:R>"
            else:
                duration_embed = f"{humanize.precisedelta(td)} | Expired"
            field_value = field_value + f"\n**Duration:** {duration_embed}"

        if bool(case['resolved']):
            field_value = field_value + "\n**Resolved:** True"

        embed.add_field(name=field_name, value=field_value, inline=False)

    await interaction.followup.send(embed=embed, ephemeral=ephemeral)
@app_commands.command(name="resolve")
async def resolve(self, interaction: discord.Interaction, case_number: int, reason: str = None):
    """Resolve a specific case.

    Parameters
    -----------
    case_number: int
        Case number of the case you're trying to resolve
    reason: str
        Reason for resolving case"""
    permissions = self.check_permissions(interaction.client.user, ['embed_links', 'moderate_members', 'ban_members'], interaction)
    if permissions:
        await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
        return

    conf = await self.check_conf(['mysql_database'])
    if conf:
        raise LookupError

    database = await self.connect()
    cursor = database.cursor()
    # The try/finally guarantees the cursor and connection are released on
    # every exit path, including the early returns below.
    try:
        db = await self.config.mysql_database()

        query_1 = "SELECT * FROM moderation_%s WHERE moderation_id = %s;"
        cursor.execute(query_1, (interaction.guild.id, case_number))
        result_1 = cursor.fetchone()
        if result_1 is None or case_number == 0:
            await interaction.response.send_message(content=f"There is no moderation with a case number of {case_number}.", ephemeral=True)
            return

        query_2 = "SELECT * FROM moderation_%s WHERE moderation_id = %s AND resolved = 0;"
        cursor.execute(query_2, (interaction.guild.id, case_number))
        result_2 = cursor.fetchone()
        if result_2 is None:
            await interaction.response.send_message(content=f"This moderation has already been resolved!\nUse `/case {case_number}` for more information.", ephemeral=True)
            return

        case = self.generate_dict(result_2)
        if reason is None:
            reason = "No reason given."

        if case['moderation_type'] in ['UNMUTE', 'UNBAN']:
            await interaction.response.send_message(content="You cannot resolve this type of moderation!", ephemeral=True)
            # Stop here: without this return the case would still be marked as
            # resolved and a second response to the same interaction would fail.
            return

        if case['moderation_type'] in ['MUTE', 'TEMPBAN', 'BAN']:
            if case['moderation_type'] == 'MUTE':
                try:
                    member = await interaction.guild.fetch_member(case['target_id'])
                    # Passing None as the timeout clears it.
                    await member.timeout(None, reason=f"Case #{case_number} resolved by {interaction.user.id}")
                except discord.NotFound:
                    pass
            if case['moderation_type'] in ['TEMPBAN', 'BAN']:
                try:
                    user = await interaction.client.fetch_user(case['target_id'])
                    await interaction.guild.unban(user, reason=f"Case #{case_number} resolved by {interaction.user.id}")
                except discord.NotFound:
                    pass
            # Punishments that were lifted above are additionally flagged as expired.
            resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, expired = 1, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s"
        else:
            resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s"

        cursor.execute(resolve_query, (interaction.user.id, reason, case_number))
        database.commit()

        # Re-read the row so the embed reflects the updated state.
        response_query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;"
        cursor.execute(response_query, (interaction.guild.id, case_number))
        result = cursor.fetchone()
        case_dict = self.generate_dict(result)

        embed = await self.embed_factory('case', interaction=interaction, case_dict=case_dict)
        await interaction.response.send_message(content=f"✅ Moderation #{case_number} resolved!", embed=embed)
        await self.log(interaction, case_number, True)
    finally:
        cursor.close()
        database.close()
@app_commands.command(name="case")
async def case(self, interaction: discord.Interaction, case_number: int, ephemeral: bool = False):
    """Check the details of a specific case.

    Parameters
    -----------
    case_number: int
        What case are you looking up?
    ephemeral: bool
        Hide the command response"""
    missing_permission = self.check_permissions(interaction.client.user, ['embed_links'], interaction)
    if missing_permission:
        await interaction.response.send_message(f"I do not have the `{missing_permission}` permission, required for this action.", ephemeral=True)
        return

    # Case number 0 is never looked up; it is reported as not found.
    found_case = None
    if case_number != 0:
        found_case = await self.fetch_case(case_number, interaction.guild.id)

    if not found_case:
        await interaction.response.send_message(content=f"No case with case number `{case_number}` found.", ephemeral=True)
        return

    case_embed = await self.embed_factory('case', interaction=interaction, case_dict=found_case)
    await interaction.response.send_message(embed=case_embed, ephemeral=ephemeral)
@tasks.loop(minutes=1)
async def handle_expiry(self):
    """Periodic task that processes expired moderations for every guild.

    For each guild where the cog is enabled this:
    - unbans users whose TEMPBAN has reached its end timestamp and tries to DM them,
    - flags elapsed or resolved non-BLACKLIST moderations as expired,
    - looks up the member and role of elapsed BLACKLIST moderations.

    Raises
    ------
    LookupError
        If the MySQL database name is not configured.
    """
    conf = await self.check_conf(['mysql_database'])
    if conf:
        raise LookupError

    database = await self.connect()
    cursor = database.cursor()
    # Release the cursor and connection even when a pass fails half-way.
    try:
        db = await self.config.mysql_database()

        guilds: list[discord.Guild] = self.bot.guilds
        for guild in guilds:
            if await self.bot.cog_disabled_in_guild(self, guild):
                continue

            tempban_query = f"SELECT target_id, moderation_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= %s AND moderation_type = 'TEMPBAN' AND expired = 0"
            try:
                cursor.execute(tempban_query, (time.time(),))
                result = cursor.fetchall()
            except mysql.connector.errors.ProgrammingError:
                # The query could not be run for this guild; skip it.
                continue

            for target_id, moderation_id in result:
                # An `except` clause needs an exception class or a *tuple* of
                # classes; fetch_user sits inside the try so that one unknown
                # user cannot abort the whole pass.
                try:
                    user: discord.User = await self.bot.fetch_user(target_id)
                    await guild.unban(user, reason=f"Automatic unban from case #{moderation_id}")
                    embed = await self.embed_factory('message', guild, f'Automatic unban from case #{moderation_id}', 'unbanned')
                    try:
                        await user.send(embed=embed)
                    except discord.errors.HTTPException:
                        # Best effort: the user may not accept DMs.
                        pass
                except (discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException) as e:
                    self.logger.error("Failed to unban user %s from %s (%s)\n%s", target_id, guild.name, guild.id, e)

            expiry_query = f"UPDATE `{db}`.`moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp != 0 AND end_timestamp <= %s AND expired = 0 AND moderation_type != 'BLACKLIST') OR (expired = 0 AND resolved = 1 AND moderation_type != 'BLACKLIST')"
            cursor.execute(expiry_query, (time.time(),))

            blacklist_query = f"SELECT target_id, moderation_id, role_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= %s AND moderation_type = 'BLACKLIST' AND expired = 0"
            try:
                cursor.execute(blacklist_query, (time.time(),))
                result = cursor.fetchall()
            except mysql.connector.errors.ProgrammingError:
                continue

            for target_id, moderation_id, role_id in result:
                try:
                    member: discord.Member = await guild.fetch_member(target_id)
                except (discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException):
                    continue
                role: discord.Role = guild.get_role(role_id)
                if role is None:
                    # The role is gone; nothing to act on for this case.
                    continue
                # NOTE(review): nothing is done with `member` and `role` yet, and the
                # expiry query above excludes BLACKLIST rows, so these cases are
                # re-selected on every run -- presumably role removal is still to
                # be implemented here; confirm.

        database.commit()
    finally:
        cursor.close()
        database.close()
@commands.group(autohelp=True)
@checks.admin()
async def moderationset(self, ctx: commands.Context):
    """Manage moderation commands."""
    # Parent group only: the subcommands registered on it do the actual work.
@moderationset.command(name="ignorebots")
@checks.admin()
async def moderationset_ignorebots(self, ctx: commands.Context):
    """Toggle if the cog should ignore other bots' moderations."""
    setting = self.config.guild(ctx.guild).ignore_other_bots
    # Flip the stored flag, then read it back so the reply shows the persisted value.
    current = await setting()
    await setting.set(not current)
    await ctx.send(f"Ignore bots setting set to {await setting()}")
@moderationset.command(name="dm")
@checks.admin()
async def moderationset_dm(self, ctx: commands.Context):
    """Toggle automatically messaging moderated users.

    This option can be overridden by specifying the `silent` argument in any moderation command."""
    setting = self.config.guild(ctx.guild).dm_users
    # Flip the stored flag, then read it back so the reply shows the persisted value.
    current = await setting()
    await setting.set(not current)
    await ctx.send(f"DM users setting set to {await setting()}")
@moderationset.command(name="logchannel")
@checks.admin()
async def moderationset_logchannel(self, ctx: commands.Context, channel: discord.TextChannel = None):
    """Set a channel to log infractions to."""
    log_channel = self.config.guild(ctx.guild).log_channel

    if not channel:
        # A single space is the "unset" value this cog registers for log_channel.
        await log_channel.set(" ")
        await ctx.send("Logging channel disabled.")
        return

    await log_channel.set(channel.id)
    await ctx.send(f"Logging channel set to {channel.mention}.")
@moderationset.command(name="mysql")
@checks.is_owner()
async def moderationset_mysql(self, ctx: commands.Context):
    """Configure MySQL connection details."""
    # An empty string is not a valid emoji and makes add_reaction fail, so
    # acknowledge the command with a real one.
    await ctx.message.add_reaction("✅")
    # The configuration button is sent by DM, not posted in the invoking channel.
    await ctx.author.send(content="Click the button below to configure your MySQL connection details.", view=self.ConfigButtons(60))
class ConfigButtons(discord.ui.View):
    """View holding the "Edit" button that opens the MySQL configuration modal."""

    def __init__(self, timeout):
        # Forward the timeout to the View; previously it was accepted but
        # silently dropped, so the library default applied instead.
        super().__init__(timeout=timeout)
        self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912)

    @discord.ui.button(label="Edit", style=discord.ButtonStyle.success)
    async def config_button(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument
        """Open the MySQL configuration modal for the user who clicked."""
        await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config))
class MySQLConfigModal(discord.ui.Modal, title="MySQL Database Configuration"):
    """Modal asking for the MySQL connection details; blank fields are left unchanged."""

    # All four inputs are optional so a user can update any subset of the settings.
    address = discord.ui.TextInput(
        label="Address",
        placeholder="Input your MySQL address here.",
        style=discord.TextStyle.short,
        required=False,
        max_length=300,
    )
    database = discord.ui.TextInput(
        label="Database",
        placeholder="Input the name of your database here.",
        style=discord.TextStyle.short,
        required=False,
        max_length=300,
    )
    username = discord.ui.TextInput(
        label="Username",
        placeholder="Input your MySQL username here.",
        style=discord.TextStyle.short,
        required=False,
        max_length=300,
    )
    password = discord.ui.TextInput(
        label="Password",
        placeholder="Input your MySQL password here.",
        style=discord.TextStyle.short,
        required=False,
        max_length=300,
    )

    def __init__(self, config):
        super().__init__()
        self.config = config

    async def on_submit(self, interaction: discord.Interaction):
        """Store every non-empty field and reply with a summary of what changed."""
        changes = []

        new_address = self.address.value
        if new_address != "":
            await self.config.mysql_address.set(new_address)
            changes.append(f"- Address set to\n - `{new_address}`\n")

        new_database = self.database.value
        if new_database != "":
            await self.config.mysql_database.set(new_database)
            changes.append(f"- Database set to\n - `{new_database}`\n")

        new_username = self.username.value
        if new_username != "":
            await self.config.mysql_username.set(new_username)
            changes.append(f"- Username set to\n - `{new_username}`\n")

        new_password = self.password.value
        if new_password != "":
            await self.config.mysql_password.set(new_password)
            # Only the first eight characters of the password are echoed back.
            trimmed_password = new_password[:8]
            changes.append(f"- Password set to\n - `{trimmed_password}` - Trimmed for security\n")

        message = "".join(changes)
        if message != "":
            send = f"Configuration changed:\n{message}"
        else:
            # Nothing was entered: show the currently stored configuration instead.
            trimmed_password = str(await self.config.mysql_password())[:8]
            send = f"No changes were made.\nCurrent configuration:\n- Address:\n - `{await self.config.mysql_address()}`\n- Database:\n - `{await self.config.mysql_database()}`\n- Username:\n - `{await self.config.mysql_username()}`\n- Password:\n - `{trimmed_password}` - Trimmed for security"

        await interaction.response.send_message(send, ephemeral=True)
@moderationset.group(autohelp=True)
@checks.admin()
async def moderationset_import(self, ctx: commands.Context):
    """Import moderations from other bots."""
    # Parent group only: each supported source bot is a subcommand of this group.
@moderationset_import.command(name="galacticbot")
@checks.admin()
async def moderationset_import_galacticbot(self, ctx: commands.Context):
    """Import moderations from GalacticBot. **UNFINISHED!**"""
    # Ask for confirmation through a Yes/No button view before doing anything.
    confirmation_view = self.GalacticBotImportButtons(60)
    await ctx.send(
        "Are you sure you want to import GalacticBot moderations? This will overwrite any moderations that already exist in the database.",
        view=confirmation_view,
    )
class GalacticBotImportButtons(discord.ui.View):
    """Yes/No confirmation view for the GalacticBot import command."""

    def __init__(self, timeout):
        # Forward the timeout to the View; previously it was accepted but
        # silently dropped, so the library default applied instead.
        super().__init__(timeout=timeout)
        self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912)

    @discord.ui.button(label="Yes", style=discord.ButtonStyle.success)
    async def import_button_y(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument
        # NOTE(review): opens the MySQL configuration modal and performs no import --
        # looks like a placeholder for the unfinished import feature; confirm.
        await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config))

    @discord.ui.button(label="No", style=discord.ButtonStyle.danger)
    async def import_button_n(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument
        # NOTE(review): "No" also opens the MySQL configuration modal -- presumably a
        # placeholder as well; confirm the intended cancel behaviour.
        await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config))
@commands.command(aliases=["tdc"])
async def timedeltaconvert(self, ctx: commands.Context, *, duration: str):
    """This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object.

    **Example usage**
    `[p]timedeltaconvert 1 day 15hr 82 minutes 52s`
    **Output**
    `1 day, 16:22:52`"""
    # raise_exception=True makes parse() raise ValueError for input it cannot convert.
    try:
        converted = parse(duration, as_timedelta=True, raise_exception=True)
        reply = f"`{converted}`"
        await ctx.send(reply)
    except ValueError:
        await ctx.send("Please provide a convertible value!")