GalaxyCogs/moderation/moderation.py

495 lines
25 KiB
Python
Raw Normal View History

import logging
import time
from datetime import datetime, timedelta, timezone
import discord
import humanize
import mysql.connector
from pytimeparse2 import disable_dateutil, parse
from redbot.core import app_commands, checks, Config, commands
from redbot.core.app_commands import Choice
class Moderation(commands.Cog):
2023-09-27 13:24:57 -04:00
"""Custom cog moderation cog, meant to copy GalacticBot.
Developed by SeaswimmerTheFsh."""
def __init__(self, bot):
self.bot = bot
self.config = Config.get_conf(self, identifier=481923957134912)
self.config.register_global(
mysql_address= " ",
mysql_database = " ",
mysql_username = " ",
mysql_password = " ",
ignore_other_bots = True
)
disable_dateutil()
async def cog_load(self):
"""This method prepares the database schema for all of the guilds the bot is currently in."""
conf = await self.check_conf([
'mysql_address',
'mysql_database',
'mysql_username',
'mysql_password'
])
if conf:
logging.fatal("Failed to create tables, due to MySQL connection configuration being unset.")
return
guilds: list[discord.Guild] = self.bot.guilds
try:
for guild in guilds:
await self.create_guild_table(guild)
except ConnectionRefusedError:
return
@commands.Cog.listener('on_guild_join')
async def db_generate_guild_join(self, guild: discord.Guild):
"""This method prepares the database schema whenever the bot joins a guild."""
conf = await self.check_conf([
'mysql_address',
'mysql_database',
'mysql_username',
'mysql_password'
])
if conf:
logging.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id)
return
try:
await self.create_guild_table(guild)
except ConnectionRefusedError:
return
@commands.Cog.listener('on_audit_log_entry_create')
async def autologger(self, entry: discord.AuditLogEntry):
"""This method automatically logs moderations done by users manually ("right clicks")."""
if await self.config.ignore_other_bots() is True:
if entry.user.bot or entry.target.bot:
return
2023-10-04 21:32:33 -04:00
else:
if entry.user.id == self.bot.user.id:
return
duration = "NULL"
if entry.reason:
reason = entry.reason + " (This action was performed without the bot.)"
else:
reason = "This action was performed without the bot."
if entry.action == discord.AuditLogAction.kick:
moderation_type = 'KICK'
elif entry.action == discord.AuditLogAction.ban:
moderation_type = 'BAN'
elif entry.action == discord.AuditLogAction.unban:
moderation_type = 'UNBAN'
elif entry.action == discord.AuditLogAction.member_update:
2023-10-04 12:01:54 -04:00
if entry.after.timed_out_until is not None:
2023-10-04 12:22:21 -04:00
timed_out_until_aware = entry.after.timed_out_until.replace(tzinfo=timezone.utc)
duration_datetime = timed_out_until_aware - datetime.now(tz=timezone.utc)
minutes = round(duration_datetime.total_seconds() / 60)
duration = timedelta(minutes=minutes)
moderation_type = 'MUTE'
2023-10-04 12:01:54 -04:00
else:
moderation_type = 'UNMUTE'
else:
return
await self.mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, duration, reason)
async def connect(self):
"""Connects to the MySQL database, and returns a connection object."""
conf = await self.check_conf([
'mysql_address',
'mysql_database',
'mysql_username',
'mysql_password'
])
if conf:
raise LookupError("MySQL connection details not set properly!")
try:
connection = mysql.connector.connect(
host=await self.config.mysql_address(),
user=await self.config.mysql_username(),
password=await self.config.mysql_password(),
database=await self.config.mysql_database()
)
return connection
except mysql.connector.ProgrammingError as e:
logging.fatal("Unable to access the MySQL database!\nError:\n%s", e.msg)
2023-10-04 11:06:06 -04:00
raise ConnectionRefusedError(f"Unable to access the MySQL Database!\n{e.msg}") from e
async def create_guild_table(self, guild: discord.Guild):
database = await self.connect()
cursor = database.cursor()
try:
cursor.execute(f"SELECT * FROM `moderation_{guild.id}`")
2023-10-04 09:15:49 -04:00
logging.info("MySQL Table exists for server %s (%s)", guild.name, guild.id)
except mysql.connector.errors.ProgrammingError:
query = f"""
CREATE TABLE `moderation_{guild.id}` (
moderation_id INT UNIQUE PRIMARY KEY NOT NULL,
timestamp INT NOT NULL,
moderation_type LONGTEXT NOT NULL,
target_id LONGTEXT NOT NULL,
moderator_id LONGTEXT NOT NULL,
duration LONGTEXT,
end_timestamp INT,
reason LONGTEXT,
resolved BOOL NOT NULL,
resolve_reason LONGTEXT,
expired BOOL NOT NULL
)
"""
cursor.execute(query)
insert_query = f"""
INSERT INTO `moderation_{guild.id}`
(moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolve_reason, expired)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
insert_values = (0, 0, "NULL", 0, 0, "NULL", 0, "NULL", 0, "NULL", 0)
cursor.execute(insert_query, insert_values)
database.commit()
database.close()
logging.info("MySQL Table (moderation_%s) created for %s (%s)", guild.id, guild.name, guild.id)
else:
database.close()
return
async def check_conf(self, config: list):
"""Checks if any required config options are not set."""
not_found_list = []
for item in config:
if await self.config.item() == " ":
not_found_list.append(item)
return not_found_list
async def mysql_log(self, guild_id: str, author_id: str, moderation_type: str, target_id: int, duration, reason: str):
timestamp = int(time.time())
if duration != "NULL":
end_timedelta = datetime.fromtimestamp(timestamp) + duration
end_timestamp = int(end_timedelta.timestamp())
else:
end_timestamp = 0
database = await self.connect()
cursor = database.cursor()
moderation_id = await self.get_next_case_number(guild_id=guild_id, cursor=cursor)
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolve_reason, expired) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
val = (moderation_id, timestamp, moderation_type, target_id, author_id, duration, end_timestamp, f"{reason}", 0, "NULL", 0)
cursor.execute(sql, val)
database.commit()
database.close()
logging.debug("MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, 0, NULL", guild_id, moderation_id, timestamp, moderation_type, target_id, author_id, duration, end_timestamp, reason)
async def get_next_case_number(self, guild_id: str, cursor = None):
"""This method returns the next case number from the MySQL table for a specific guild."""
if not cursor:
database = await self.connect()
cursor = database.cursor()
cursor.execute(f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1")
return cursor.fetchone()[0] + 1
2023-10-05 12:58:57 -04:00
async def embed_factory(self, embed_type: str, guild: discord.Guild, reason: str, moderation_type: str, response: discord.InteractionMessage = None, duration: timedelta = None):
"""This method creates an embed from set parameters, meant for either moderation logging or contacting the moderated user.
2023-10-05 09:47:53 -04:00
Valid arguments for 'embed_type':
- 'message'
- 'log' - WIP
- 'case' - WIP"""
2023-10-05 09:47:53 -04:00
if embed_type == 'message':
if moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]:
guild_name = guild.name
else:
guild_name = f"[{guild.name}]({response.jump_url})"
if moderation_type in ["tempbanned", "muted"] and duration:
embed_duration = f" for {humanize.precisedelta(duration)}"
else:
embed_duration = ""
2023-10-05 10:46:52 -04:00
if moderation_type == "note":
embed_desc = "recieved a"
else:
embed_desc = "been"
embed = discord.Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=await self.bot.get_embed_color(None), timestamp=datetime.now())
embed.add_field(name='Reason', value=f"`{reason}`")
embed.set_author(name=guild.name, icon_url=guild.icon.url)
embed.set_footer(text=f"Case #{await self.get_next_case_number(guild.id)}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&")
return embed
2023-10-05 09:47:53 -04:00
raise(TypeError("'type' argument is invalid!"))
2023-10-05 10:46:52 -04:00
@app_commands.command(name="note")
async def note(self, interaction: discord.Interaction, target: discord.Member, reason: str):
"""Add a note to a user."""
await interaction.response.send_message(content=f"{target.mention} has recieved a note!\n**Reason** - `{reason}`")
try:
embed = await self.embed_factory('message', interaction.guild, reason, 'note', await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await self.mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', target.id, 'NULL', reason)
@app_commands.command(name="warn")
async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str):
2023-10-04 16:39:58 -04:00
"""Warn a user."""
await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`")
2023-10-04 16:39:58 -04:00
try:
embed = await self.embed_factory('message', interaction.guild, reason, 'warned', await interaction.original_response())
2023-10-04 16:39:58 -04:00
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await self.mysql_log(interaction.guild.id, interaction.user.id, 'WARN', target.id, 'NULL', reason)
2023-10-04 16:39:58 -04:00
@app_commands.command(name="mute")
async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str):
"""Mute a user."""
if target.is_timed_out() is True:
await interaction.response.send_message(f"{target.mention} is already muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
return
try:
parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
except ValueError:
2023-10-05 09:47:53 -04:00
await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
return
2023-10-04 21:51:54 -04:00
if parsed_time.total_seconds() / 1000 > 2419200000:
await interaction.response.send_message("Please provide a duration that is less than 28 days.")
return
await target.timeout(parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}")
await interaction.response.send_message(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")
try:
embed = await self.embed_factory('message', interaction.guild, reason, 'muted', await interaction.original_response(), parsed_time)
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await self.mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', target.id, parsed_time, reason)
@app_commands.command(name="unmute")
async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None):
2023-10-04 16:43:15 -04:00
"""Unmute a user."""
if target.is_timed_out() is False:
await interaction.response.send_message(f"{target.mention} is not muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
2023-10-04 16:43:15 -04:00
return
if reason:
await target.timeout(None, reason=f"Unmuted by {interaction.user.id} for: {reason}")
else:
await target.timeout(None, reason=f"Unbanned by {interaction.user.id}")
2023-10-04 16:43:15 -04:00
reason = "No reason given."
await interaction.response.send_message(content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`")
try:
embed = await self.embed_factory('message', interaction.guild, reason, 'unmuted', await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', target.id, 'NULL', reason)
@app_commands.command(name="kick")
async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str):
"""Kick a user."""
await interaction.response.send_message(content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`")
2023-10-04 16:43:15 -04:00
try:
embed = await self.embed_factory('message', interaction.guild, reason, 'kicked', await interaction.original_response())
2023-10-04 16:43:15 -04:00
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await target.kick(f"Kicked by {interaction.user.id} for: {reason}")
await self.mysql_log(interaction.guild.id, interaction.user.id, 'KICK', target.id, 'NULL', reason)
2023-10-04 16:43:15 -04:00
@app_commands.command(name="ban")
@app_commands.choices(delete_messages=[
Choice(name="None", value=0),
Choice(name='1 Hour', value=3600),
Choice(name='12 Hours', value=43200),
Choice(name='1 Day', value=86400),
Choice(name='3 Days', value=259200),
Choice(name='7 Days', value=604800),
])
async def ban(self, interaction: discord.Interaction, target: discord.User, reason: str, duration: str = None, delete_messages: Choice[int] = 0):
"""Ban a user."""
try:
await interaction.guild.fetch_ban(target)
await interaction.response.send_message(content=f"{target.mention} is already banned!", ephemeral=True)
return
except discord.errors.NotFound:
pass
if duration:
try:
parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
except ValueError:
await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
return
await interaction.response.send_message(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")
try:
embed = await self.embed_factory('message', interaction.guild, reason, 'tempbanned', await interaction.original_response(), parsed_time)
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await interaction.guild.ban(target.id, reason=f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})", delete_message_seconds=delete_messages)
await self.mysql_log(interaction.guild.id, interaction.user.id, 'TEMPBAN', target.id, parsed_time, reason)
else:
await interaction.response.send_message(content=f"{target.mention} has been banned!\n**Reason** - `{reason}`")
try:
embed = await self.embed_factory('message', interaction.guild, reason, 'banned', await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await interaction.guild.ban(target, reason=f"Banned by {interaction.user.id} for: {reason}", delete_message_seconds=delete_messages)
await self.mysql_log(interaction.guild.id, interaction.user.id, 'BAN', target.id, 'NULL', reason)
@app_commands.command(name="unban")
async def unban(self, interaction: discord.Interaction, target: discord.User, reason: str = None):
"""Unban a user."""
try:
await interaction.guild.fetch_ban(target)
except discord.errors.NotFound:
await interaction.response.send_message(content=f"{target.mention} is not banned!", ephemeral=True)
return
if reason:
await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id} for: {reason}")
else:
await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id}")
reason = "No reason given."
await interaction.response.send_message(content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`")
try:
embed = await self.embed_factory('message', interaction.guild, reason, 'unbanned', await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNBAN', target.id, 'NULL', reason)
2023-10-05 12:58:57 -04:00
@app_commands.command(name="case")
async def case(self, interaction: discord.Interaction, case_number: int, ephemeral: bool = False):
"""Check the details of a specific case."""
database = await self.connect()
cursor = database.cursor()
query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;"
cursor.execute(query, (interaction.guild.id, case_number))
result = cursor.fetchone()
cursor.close()
database.close()
if result:
case = {
"moderation_id": result[0],
"timestamp": result[1],
"moderation_type": result[2],
"target_id": result[3],
"moderator_id": result[4],
"duration": result[5],
"end_timestamp": result[6],
"reason": result[7],
"resolved": result[8],
"resolve_reason": result[9],
"expired": result[10]
}
try:
target = await interaction.client.fetch_user(case["target_id"])
2023-10-05 12:58:57 -04:00
except discord.errors.NotFound:
target = discord.User(id=case["target_id"], name="Deleted User", discriminator="0")
try:
moderator = await interaction.client.fetch_user(case["moderator_id"])
2023-10-05 12:58:57 -04:00
except discord.errors.NotFound:
moderator = discord.User(id=case["moderator_id"], name="Deleted User", discriminator="0")
target_name = target.name if target.discriminator == "0" else f"{target.name}#{target.discriminator}"
moderator_name = moderator.name if moderator.discriminator == "0" else f"{moderator.name}#{moderator.discriminator}"
embed = discord.Embed(title=f"📕 Case #{case['moderation_id']}", color=await self.bot.get_embed_color(None))
embed.description = f"**Type:** {str.title(case['moderation_type'])}\n**Target:** {target_name} ({target.id})\n**Moderator:** {moderator_name} ({moderator.id})\n**Resolved:** {bool(case['resolved'])}\n**Timestamp:** <t:{case['timestamp']}> | <t:{case['timestamp']}:R>"
2023-10-05 12:58:57 -04:00
if case['duration'] != 'NULL':
expired = True if case["end_timestamp"] <= time.time() or case['resolved'] == 1 else False
embed.description = embed.description + f"\n**Duration:** {humanize.precisedelta(case['duration'])}\n**Expired:** {expired}"
2023-10-05 12:58:57 -04:00
embed.add_field(name='Reason', value=f"```{case['reason']}```")
if case['resolved'] == 1:
embed.add_field(name='Resolve Reason', value=f"```{case['resolve_reason']}```")
await interaction.response.send_message(embed=embed, ephemeral=ephemeral)
else:
await interaction.response.send_message(content=f"No case with case number `{case_number}` found.", ephemeral=True)
@commands.group(autohelp=True)
@checks.admin()
async def moderationset(self, ctx: commands.Context):
"""Manage moderation commands."""
@moderationset.command(name="ignorebots")
@checks.admin()
async def moderationset_ignorebots(self, ctx: commands.Context):
await self.config.ignore_other_bots.set(not await self.config.ignore_other_bots())
await ctx.send(f"Ignore bots setting set to {await self.config.ignore_other_bots()}")
@moderationset.command(name="mysql")
@checks.is_owner()
async def moderationset_mysql(self, ctx: commands.Context):
"""Configure MySQL connection details."""
await ctx.message.add_reaction("")
await ctx.author.send(content="Click the button below to configure your MySQL connection details.", view=self.ConfigButtons(60))
class ConfigButtons(discord.ui.View):
def __init__(self, timeout):
super().__init__()
self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912)
@discord.ui.button(label="Edit", style=discord.ButtonStyle.success)
async def config_button(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument
await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config))
class MySQLConfigModal(discord.ui.Modal, title="MySQL Database Configuration"):
def __init__(self, config):
super().__init__()
self.config = config
address = discord.ui.TextInput(
label="Address",
placeholder="Input your MySQL address here.",
style=discord.TextStyle.short,
required=False,
max_length=300
)
database = discord.ui.TextInput(
label="Database",
placeholder="Input the name of your database here.",
style=discord.TextStyle.short,
required=False,
max_length=300
)
username = discord.ui.TextInput(
label="Username",
placeholder="Input your MySQL username here.",
style=discord.TextStyle.short,
required=False,
max_length=300
)
password = discord.ui.TextInput(
label="Password",
placeholder="Input your MySQL password here.",
style=discord.TextStyle.short,
required=False,
max_length=300
)
async def on_submit(self, interaction: discord.Interaction):
message = ""
if self.address.value != "":
await self.config.mysql_address.set(self.address.value)
message += f"- Address set to\n - `{self.address.value}`\n"
if self.database.value != "":
await self.config.mysql_database.set(self.database.value)
message += f"- Database set to\n - `{self.database.value}`\n"
if self.username.value != "":
await self.config.mysql_username.set(self.username.value)
message += f"- Username set to\n - `{self.username.value}`\n"
if self.password.value != "":
await self.config.mysql_password.set(self.password.value)
trimmed_password = self.password.value[:8]
message += f"- Password set to\n - `{trimmed_password}` - Trimmed for security\n"
if message == "":
trimmed_password = str(await self.config.mysql_password())[:8]
send = f"No changes were made.\nCurrent configuration:\n- Address:\n - `{await self.config.mysql_address()}`\n- Database:\n - `{await self.config.mysql_database()}`\n- Username:\n - `{await self.config.mysql_username()}`\n- Password:\n - `{trimmed_password}` - Trimmed for security"
else:
send = f"Configuration changed:\n{message}"
await interaction.response.send_message(send, ephemeral=True)
2023-10-04 21:32:33 -04:00
@commands.command(aliases=["tdc"])
async def timedeltaconvert(self, ctx: commands.Context, *, duration: str):
"""This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object.
**Example usage**
`[p]timedeltaconvert 1 day 15hr 82 minutes 52s`
**Output**
`1 day, 16:22:52`"""
try:
parsed_time = parse(duration, as_timedelta=True, raise_exception=True)
await ctx.send(f"`{str(parsed_time)}`")
except ValueError:
await ctx.send("Please provide a convertible value!")