GalaxyCogs/moderation/moderation.py

700 lines
37 KiB
Python
Raw Normal View History

import logging
import time
from datetime import datetime, timedelta, timezone
import discord
import humanize
import mysql.connector
from discord.ext import tasks
from pytimeparse2 import disable_dateutil, parse
from redbot.core import app_commands, checks, Config, commands
from redbot.core.app_commands import Choice
class Moderation(commands.Cog):
2023-09-27 13:24:57 -04:00
"""Custom cog moderation cog, meant to copy GalacticBot.
Developed by SeaswimmerTheFsh."""
def __init__(self, bot):
self.bot = bot
self.config = Config.get_conf(self, identifier=481923957134912)
self.config.register_global(
mysql_address= " ",
mysql_database = " ",
mysql_username = " ",
mysql_password = " "
)
self.config.register_guild(
ignore_other_bots = True,
dm_users = True
)
disable_dateutil()
self.handle_expiry.start() # pylint: disable=no-member
2023-10-06 10:25:13 -04:00
self.logger = logging.getLogger('red.seaswimmerthefsh.moderation')
async def cog_load(self):
"""This method prepares the database schema for all of the guilds the bot is currently in."""
conf = await self.check_conf([
'mysql_address',
'mysql_database',
'mysql_username',
'mysql_password'
])
if conf:
2023-10-06 10:25:13 -04:00
self.logger.fatal("Failed to create tables, due to MySQL connection configuration being unset.")
return
guilds: list[discord.Guild] = self.bot.guilds
try:
for guild in guilds:
await self.create_guild_table(guild)
except ConnectionRefusedError:
return
async def cog_unload(self):
self.handle_expiry.cancel() # pylint: disable=no-member
@commands.Cog.listener('on_guild_join')
async def db_generate_guild_join(self, guild: discord.Guild):
"""This method prepares the database schema whenever the bot joins a guild."""
conf = await self.check_conf([
'mysql_address',
'mysql_database',
'mysql_username',
'mysql_password'
])
if conf:
2023-10-06 10:25:13 -04:00
self.logger.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id)
return
try:
await self.create_guild_table(guild)
except ConnectionRefusedError:
return
@commands.Cog.listener('on_audit_log_entry_create')
async def autologger(self, entry: discord.AuditLogEntry):
"""This method automatically logs moderations done by users manually ("right clicks")."""
if await self.config.guild(entry.guild.id).ignore_other_bots() is True:
if entry.user.bot or entry.target.bot:
return
2023-10-04 21:32:33 -04:00
else:
if entry.user.id == self.bot.user.id:
return
duration = "NULL"
if entry.reason:
reason = entry.reason + " (This action was performed without the bot.)"
else:
reason = "This action was performed without the bot."
if entry.action == discord.AuditLogAction.kick:
moderation_type = 'KICK'
elif entry.action == discord.AuditLogAction.ban:
moderation_type = 'BAN'
elif entry.action == discord.AuditLogAction.unban:
moderation_type = 'UNBAN'
elif entry.action == discord.AuditLogAction.member_update:
2023-10-04 12:01:54 -04:00
if entry.after.timed_out_until is not None:
2023-10-04 12:22:21 -04:00
timed_out_until_aware = entry.after.timed_out_until.replace(tzinfo=timezone.utc)
duration_datetime = timed_out_until_aware - datetime.now(tz=timezone.utc)
minutes = round(duration_datetime.total_seconds() / 60)
duration = timedelta(minutes=minutes)
moderation_type = 'MUTE'
2023-10-04 12:01:54 -04:00
else:
moderation_type = 'UNMUTE'
else:
return
await self.mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, duration, reason)
async def connect(self):
"""Connects to the MySQL database, and returns a connection object."""
conf = await self.check_conf([
'mysql_address',
'mysql_database',
'mysql_username',
'mysql_password'
])
if conf:
raise LookupError("MySQL connection details not set properly!")
try:
connection = mysql.connector.connect(
host=await self.config.mysql_address(),
user=await self.config.mysql_username(),
password=await self.config.mysql_password(),
database=await self.config.mysql_database()
)
return connection
except mysql.connector.ProgrammingError as e:
2023-10-06 10:25:13 -04:00
self.logger.fatal("Unable to access the MySQL database!\nError:\n%s", e.msg)
2023-10-04 11:06:06 -04:00
raise ConnectionRefusedError(f"Unable to access the MySQL Database!\n{e.msg}") from e
async def create_guild_table(self, guild: discord.Guild):
database = await self.connect()
cursor = database.cursor()
try:
cursor.execute(f"SELECT * FROM `moderation_{guild.id}`")
2023-10-06 10:25:13 -04:00
self.logger.info("MySQL Table exists for server %s (%s)", guild.name, guild.id)
except mysql.connector.errors.ProgrammingError:
query = f"""
CREATE TABLE `moderation_{guild.id}` (
moderation_id INT UNIQUE PRIMARY KEY NOT NULL,
timestamp INT NOT NULL,
moderation_type LONGTEXT NOT NULL,
target_id LONGTEXT NOT NULL,
moderator_id LONGTEXT NOT NULL,
duration LONGTEXT,
end_timestamp INT,
reason LONGTEXT,
resolved BOOL NOT NULL,
resolved_by LONGTEXT,
resolve_reason LONGTEXT,
expired BOOL NOT NULL
)
"""
cursor.execute(query)
insert_query = f"""
INSERT INTO `moderation_{guild.id}`
(moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
insert_values = (0, 0, "NULL", 0, 0, "NULL", 0, "NULL", 0, "NULL", "NULL", 0)
cursor.execute(insert_query, insert_values)
database.commit()
database.close()
2023-10-06 10:25:13 -04:00
self.logger.info("MySQL Table (moderation_%s) created for %s (%s)", guild.id, guild.name, guild.id)
else:
database.close()
return
async def check_conf(self, config: list):
"""Checks if any required config options are not set."""
not_found_list = []
for item in config:
if await self.config.item() == " ":
not_found_list.append(item)
return not_found_list
async def mysql_log(self, guild_id: str, author_id: str, moderation_type: str, target_id: int, duration, reason: str):
timestamp = int(time.time())
if duration != "NULL":
end_timedelta = datetime.fromtimestamp(timestamp) + duration
end_timestamp = int(end_timedelta.timestamp())
else:
end_timestamp = 0
database = await self.connect()
cursor = database.cursor()
moderation_id = await self.get_next_case_number(guild_id=guild_id, cursor=cursor)
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
val = (moderation_id, timestamp, moderation_type, target_id, author_id, duration, end_timestamp, f"{reason}", 0, "NULL", "NULL", 0)
cursor.execute(sql, val)
database.commit()
database.close()
2023-10-06 10:25:13 -04:00
self.logger.debug("MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, 0, NULL, NULL, 0", guild_id, moderation_id, timestamp, moderation_type, target_id, author_id, duration, end_timestamp, reason)
async def get_next_case_number(self, guild_id: str, cursor = None):
"""This method returns the next case number from the MySQL table for a specific guild."""
if not cursor:
database = await self.connect()
cursor = database.cursor()
cursor.execute(f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1")
return cursor.fetchone()[0] + 1
def generate_dict(self, result):
case: dict = {
"moderation_id": result[0],
"timestamp": result[1],
"moderation_type": result[2],
"target_id": result[3],
"moderator_id": result[4],
"duration": result[5],
"end_timestamp": result[6],
"reason": result[7],
"resolved": result[8],
"resolved_by": result[9],
"resolve_reason": result[10],
"expired": result[11]
}
return case
async def fetch_user_dict(self, interaction: discord.Interaction, user_id: str):
"""This method returns a dictionary containing either user information or a standard deleted user template."""
try:
user = await interaction.client.fetch_user(user_id)
user_dict = {
'id': user.id,
'name': user.name,
'discriminator': user.discriminator
}
except discord.errors.NotFound:
user_dict = {
'id': user_id,
'name': 'Deleted User',
'discriminator': '0'
}
return user_dict
async def embed_factory(self, embed_type: str, /, interaction: discord.Interaction = None, case_dict: dict = None, guild: discord.Guild = None, reason: str = None, moderation_type: str = None, response: discord.InteractionMessage = None, duration: timedelta = None):
"""This method creates an embed from set parameters, meant for either moderation logging or contacting the moderated user.
2023-10-05 09:47:53 -04:00
Valid arguments for 'embed_type':
- 'message'
- 'list' - WIP
- 'case'
Required arguments for 'message':
- guild
- reason
- moderation_type
- response
- duration (optional)
Required arguments for 'case':
- interaction
- case_dict"""
2023-10-05 09:47:53 -04:00
if embed_type == 'message':
if moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]:
guild_name = guild.name
else:
guild_name = f"[{guild.name}]({response.jump_url})"
if moderation_type in ["tempbanned", "muted"] and duration:
embed_duration = f" for {humanize.precisedelta(duration)}"
else:
embed_duration = ""
2023-10-05 10:46:52 -04:00
if moderation_type == "note":
embed_desc = "recieved a"
else:
embed_desc = "been"
embed = discord.Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=await self.bot.get_embed_color(None), timestamp=datetime.now())
embed.add_field(name='Reason', value=f"`{reason}`")
embed.set_author(name=guild.name, icon_url=guild.icon.url)
embed.set_footer(text=f"Case #{await self.get_next_case_number(guild.id)}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&")
return embed
if embed_type == 'case':
target_user = await self.fetch_user_dict(interaction, case_dict['target_id'])
moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id'])
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}"
embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']}", color=await self.bot.get_embed_color(None))
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
if case_dict['duration'] != 'NULL':
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))})
duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if case_dict["expired"] == '0' else str(humanize.precisedelta(td))
embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"
embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)
if case_dict['resolved'] == 1:
resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by'])
resolved_name = resolved_user['name'] if resolved_user['discriminator'] == "0" else f"{resolved_user['name']}#{resolved_user['discriminator']}"
embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False)
return embed
2023-10-05 09:47:53 -04:00
raise(TypeError("'type' argument is invalid!"))
2023-10-05 10:46:52 -04:00
@app_commands.command(name="note")
async def note(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
2023-10-05 10:46:52 -04:00
"""Add a note to a user."""
await interaction.response.send_message(content=f"{target.mention} has recieved a note!\n**Reason** - `{reason}`")
if silent is None:
silent = not await self.config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='note', response=await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
2023-10-05 10:46:52 -04:00
await self.mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', target.id, 'NULL', reason)
@app_commands.command(name="warn")
async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
2023-10-04 16:39:58 -04:00
"""Warn a user."""
await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`")
if silent is None:
silent = not await self.config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='warned', response=await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await self.mysql_log(interaction.guild.id, interaction.user.id, 'WARN', target.id, 'NULL', reason)
2023-10-04 16:39:58 -04:00
@app_commands.command(name="mute")
async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str, silent: bool = None):
"""Mute a user."""
if target.is_timed_out() is True:
await interaction.response.send_message(f"{target.mention} is already muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
return
try:
parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
except ValueError:
2023-10-05 09:47:53 -04:00
await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
return
2023-10-04 21:51:54 -04:00
if parsed_time.total_seconds() / 1000 > 2419200000:
await interaction.response.send_message("Please provide a duration that is less than 28 days.")
return
await target.timeout(parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}")
await interaction.response.send_message(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")
if silent is None:
silent = not await self.config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='muted', response=await interaction.original_response(), duration=parsed_time)
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await self.mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', target.id, parsed_time, reason)
@app_commands.command(name="unmute")
async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None, silent: bool = None):
2023-10-04 16:43:15 -04:00
"""Unmute a user."""
if target.is_timed_out() is False:
await interaction.response.send_message(f"{target.mention} is not muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
2023-10-04 16:43:15 -04:00
return
if reason:
await target.timeout(None, reason=f"Unmuted by {interaction.user.id} for: {reason}")
else:
await target.timeout(None, reason=f"Unbanned by {interaction.user.id}")
2023-10-04 16:43:15 -04:00
reason = "No reason given."
await interaction.response.send_message(content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`")
if silent is None:
silent = not await self.config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='unmuted', response=await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', target.id, 'NULL', reason)
@app_commands.command(name="kick")
async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
"""Kick a user."""
await interaction.response.send_message(content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`")
if silent is None:
silent = not await self.config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='kicked', response=await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await target.kick(f"Kicked by {interaction.user.id} for: {reason}")
await self.mysql_log(interaction.guild.id, interaction.user.id, 'KICK', target.id, 'NULL', reason)
2023-10-04 16:43:15 -04:00
@app_commands.command(name="ban")
@app_commands.choices(delete_messages=[
Choice(name="None", value=0),
Choice(name='1 Hour', value=3600),
Choice(name='12 Hours', value=43200),
Choice(name='1 Day', value=86400),
Choice(name='3 Days', value=259200),
Choice(name='7 Days', value=604800),
])
async def ban(self, interaction: discord.Interaction, target: discord.User, reason: str, duration: str = None, delete_messages: Choice[int] = 0):
"""Ban a user."""
try:
await interaction.guild.fetch_ban(target)
await interaction.response.send_message(content=f"{target.mention} is already banned!", ephemeral=True)
return
except discord.errors.NotFound:
pass
if duration:
try:
parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
except ValueError:
await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
return
await interaction.response.send_message(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")
try:
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='tempbanned', response=await interaction.original_response(), duration=parsed_time)
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await interaction.guild.ban(target, reason=f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})", delete_message_seconds=delete_messages)
await self.mysql_log(interaction.guild.id, interaction.user.id, 'TEMPBAN', target.id, parsed_time, reason)
else:
await interaction.response.send_message(content=f"{target.mention} has been banned!\n**Reason** - `{reason}`")
if silent is None:
silent = not await self.config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='banned', response=await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await interaction.guild.ban(target, reason=f"Banned by {interaction.user.id} for: {reason}", delete_message_seconds=delete_messages)
await self.mysql_log(interaction.guild.id, interaction.user.id, 'BAN', target.id, 'NULL', reason)
@app_commands.command(name="unban")
async def unban(self, interaction: discord.Interaction, target: discord.User, reason: str = None):
"""Unban a user."""
try:
await interaction.guild.fetch_ban(target)
except discord.errors.NotFound:
await interaction.response.send_message(content=f"{target.mention} is not banned!", ephemeral=True)
return
if reason:
await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id} for: {reason}")
else:
await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id}")
reason = "No reason given."
await interaction.response.send_message(content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`")
if silent is None:
silent = not await self.config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='unbanned', response=await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNBAN', target.id, 'NULL', reason)
@app_commands.command(name="history")
async def history(self, interaction: discord.Interaction, target: discord.Member = None, moderator: discord.Member = None, pagesize: app_commands.Range[int, 1, 25] = 5, page: int = 1):
"""List previous infractions."""
database = await self.connect()
cursor = database.cursor()
if target:
query = """SELECT *
FROM moderation_%s
WHERE target_id = %s
ORDER BY moderation_id DESC;"""
cursor.execute(query, (interaction.guild.id, target.id))
elif moderator:
query = """SELECT *
FROM moderation_%s
WHERE moderator_id = %s
ORDER BY moderation_id DESC;"""
cursor.execute(query, (interaction.guild.id, moderator.id))
results = cursor.fetchall()
result_dict_list = []
for result in results:
case_dict = self.generate_dict(result)
result_dict_list.append(case_dict)
case_quantity = len(result_dict_list)
page_quantity = round(case_quantity / pagesize)
start_index = (page - 1) * pagesize
end_index = page * pagesize
embed = discord.Embed(color=await self.bot.get_embed_color(None))
embed.set_author(icon_url=interaction.guild.icon.url, name='Infraction History')
embed.set_footer(text=f"Page {page}/{page_quantity} | {case_quantity} Results")
for case in result_dict_list[start_index:end_index]:
target_user = await self.fetch_user_dict(interaction, case['target_id'])
moderator_user = await self.fetch_user_dict(interaction, case['moderator_id'])
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}"
field_name = f"Case #{case['moderation_id']} ({str.title(case['moderation_type'])})"
field_value = f"**Target:** `{target_name}` ({target_user['id']})\n**Moderator:** `{moderator_name}` ({moderator_user['id']})\n**Reason:** `{str(case['reason'])[:150]}`"
if case['duration'] != 'NULL':
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case["duration"].split(":"))})
duration_embed = f"{humanize.precisedelta(td)} | <t:{case['end_timestamp']}:R>" if case["expired"] == '0' else f"{humanize.precisedelta(td)} | Expired"
field_value = field_value + f"\n**Duration:** {duration_embed}"
if bool(case['resolved']):
field_value = field_value + "\n**Resolved:** True"
embed.add_field(name=field_name, value=field_value, inline=False)
await interaction.response.send_message(embed=embed)
@app_commands.command(name="resolve")
async def resolve(self, interaction: discord.Interaction, case_number: int, reason: str = None):
"""Resolve a specific case."""
2023-10-06 09:11:22 -04:00
conf = await self.check_conf(['mysql_database'])
if conf:
raise(LookupError)
database = await self.connect()
cursor = database.cursor()
2023-10-06 09:11:22 -04:00
db = await self.config.mysql_database()
query_1 = "SELECT * FROM moderation_%s WHERE moderation_id = %s;"
cursor.execute(query_1, (interaction.guild.id, case_number))
result_1 = cursor.fetchone()
if result_1 is None:
await interaction.response.send_message(content=f"There is no moderation with a case number of {case_number}.", ephemeral=True)
return
query_2 = "SELECT * FROM moderation_%s WHERE moderation_id = %s AND resolved = 0;"
cursor.execute(query_2, (interaction.guild.id, case_number))
result_2 = cursor.fetchone()
if result_2 is None:
await interaction.response.send_message(content=f"This moderation has already been resolved!\nUse `/case {case_number}` for more information.", ephemeral=True)
return
case = self.generate_dict(result_2)
if reason is None:
reason = "No reason given."
if case['moderation_type'] in ['UNMUTE', 'UNBAN']:
await interaction.response.send_message(content="You cannot resolve this type of moderation!", ephemeral=True)
if case['moderation_type'] in ['MUTE', 'TEMPBAN', 'BAN']:
if case['moderation_type'] == 'MUTE':
try:
member = await interaction.guild.fetch_member(case['target_id'])
await member.timeout(None, reason=f"Case #{case_number} resolved by {interaction.user.id}")
2023-10-06 09:04:02 -04:00
except discord.NotFound:
pass
if case['moderation_type'] in ['TEMPBAN', 'BAN']:
try:
user = await interaction.client.fetch_user(case['target_id'])
await interaction.guild.unban(user, reason=f"Case #{case_number} resolved by {interaction.user.id}")
2023-10-06 09:04:02 -04:00
except discord.NotFound:
pass
resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, expired = 1, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s"
else:
resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s"
cursor.execute(resolve_query, (interaction.user.id, reason, case_number))
database.commit()
response_query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;"
cursor.execute(response_query, (interaction.guild.id, case_number))
result = cursor.fetchone()
case_dict = self.generate_dict(result)
2023-10-06 09:36:23 -04:00
embed = await self.embed_factory('case', interaction=interaction, case_dict=case_dict)
await interaction.response.send_message(content=f"✅ Moderation #{case_number} resolved!", embed=embed)
cursor.close()
database.close()
2023-10-05 12:58:57 -04:00
@app_commands.command(name="case")
async def case(self, interaction: discord.Interaction, case_number: int, ephemeral: bool = False):
"""Check the details of a specific case."""
database = await self.connect()
cursor = database.cursor()
query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;"
cursor.execute(query, (interaction.guild.id, case_number))
result = cursor.fetchone()
cursor.close()
database.close()
if result:
2023-10-06 09:04:02 -04:00
case = self.generate_dict(result)
2023-10-06 09:36:23 -04:00
embed = await self.embed_factory('case', interaction=interaction, case_dict=case)
2023-10-05 12:58:57 -04:00
await interaction.response.send_message(embed=embed, ephemeral=ephemeral)
else:
await interaction.response.send_message(content=f"No case with case number `{case_number}` found.", ephemeral=True)
@tasks.loop(minutes=1)
async def handle_expiry(self):
conf = await self.check_conf(['mysql_database'])
if conf:
raise(LookupError)
database = await self.connect()
cursor = database.cursor()
db = await self.config.mysql_database()
guilds: list[discord.Guild] = self.bot.guilds
for guild in guilds:
tempban_query = f"SELECT target_id, moderation_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= %s AND moderation_type = 'TEMPBAN' AND expired = 0"
try:
cursor.execute(tempban_query, (time.time(),))
result = cursor.fetchall()
except mysql.connector.errors.ProgrammingError:
continue
target_ids = [row[0] for row in result]
moderation_ids = [row[1] for row in result]
for target_id, moderation_id in zip(target_ids, moderation_ids):
user: discord.User = await self.bot.fetch_user(target_id)
await guild.unban(user, reason=f"Automatic unban from case #{moderation_id}")
embed = await self.embed_factory('message', guild, f'Automatic unban from case #{moderation_id}', 'unbanned')
try:
await user.send(embed=embed)
except discord.errors.HTTPException:
pass
expiry_query = f"UPDATE `{db}`.`moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp != 0 AND end_timestamp <= %s AND expired = 0) OR (expired = 0 AND resolved = 1)"
cursor.execute(expiry_query, (time.time(),))
database.commit()
cursor.close()
database.close()
2023-10-05 12:58:57 -04:00
@commands.group(autohelp=True)
@checks.admin()
async def moderationset(self, ctx: commands.Context):
"""Manage moderation commands."""
@moderationset.command(name="ignorebots")
@checks.admin()
async def moderationset_ignorebots(self, ctx: commands.Context):
"""Toggle if the cog should ignore other bots' moderations."""
await self.config.guild(ctx.guild).ignore_other_bots.set(not await self.config.guild(ctx.guild).ignore_other_bots())
await ctx.send(f"Ignore bots setting set to {await self.config.guild(ctx.guild).ignore_other_bots()}")
@moderationset.command(name="dm")
@checks.admin()
async def moderationset_dm(self, ctx: commands.Context):
"""Toggle automatically messaging moderated users.
This option can be overridden by specifying the `silent` argument in any moderation command."""
await self.config.guild(ctx.guild).dm_users.set(not await self.config.guild(ctx.guild).dm_users())
await ctx.send(f"DM users setting set to {await self.config.guild(ctx.guild).dm_users()}")
@moderationset.command(name="mysql")
@checks.is_owner()
async def moderationset_mysql(self, ctx: commands.Context):
"""Configure MySQL connection details."""
await ctx.message.add_reaction("")
await ctx.author.send(content="Click the button below to configure your MySQL connection details.", view=self.ConfigButtons(60))
class ConfigButtons(discord.ui.View):
def __init__(self, timeout):
super().__init__()
self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912)
@discord.ui.button(label="Edit", style=discord.ButtonStyle.success)
async def config_button(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument
await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config))
class MySQLConfigModal(discord.ui.Modal, title="MySQL Database Configuration"):
def __init__(self, config):
super().__init__()
self.config = config
address = discord.ui.TextInput(
label="Address",
placeholder="Input your MySQL address here.",
style=discord.TextStyle.short,
required=False,
max_length=300
)
database = discord.ui.TextInput(
label="Database",
placeholder="Input the name of your database here.",
style=discord.TextStyle.short,
required=False,
max_length=300
)
username = discord.ui.TextInput(
label="Username",
placeholder="Input your MySQL username here.",
style=discord.TextStyle.short,
required=False,
max_length=300
)
password = discord.ui.TextInput(
label="Password",
placeholder="Input your MySQL password here.",
style=discord.TextStyle.short,
required=False,
max_length=300
)
async def on_submit(self, interaction: discord.Interaction):
message = ""
if self.address.value != "":
await self.config.mysql_address.set(self.address.value)
message += f"- Address set to\n - `{self.address.value}`\n"
if self.database.value != "":
await self.config.mysql_database.set(self.database.value)
message += f"- Database set to\n - `{self.database.value}`\n"
if self.username.value != "":
await self.config.mysql_username.set(self.username.value)
message += f"- Username set to\n - `{self.username.value}`\n"
if self.password.value != "":
await self.config.mysql_password.set(self.password.value)
trimmed_password = self.password.value[:8]
message += f"- Password set to\n - `{trimmed_password}` - Trimmed for security\n"
if message == "":
trimmed_password = str(await self.config.mysql_password())[:8]
send = f"No changes were made.\nCurrent configuration:\n- Address:\n - `{await self.config.mysql_address()}`\n- Database:\n - `{await self.config.mysql_database()}`\n- Username:\n - `{await self.config.mysql_username()}`\n- Password:\n - `{trimmed_password}` - Trimmed for security"
else:
send = f"Configuration changed:\n{message}"
await interaction.response.send_message(send, ephemeral=True)
2023-10-04 21:32:33 -04:00
@commands.command(aliases=["tdc"])
async def timedeltaconvert(self, ctx: commands.Context, *, duration: str):
"""This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object.
**Example usage**
`[p]timedeltaconvert 1 day 15hr 82 minutes 52s`
**Output**
`1 day, 16:22:52`"""
try:
parsed_time = parse(duration, as_timedelta=True, raise_exception=True)
await ctx.send(f"`{str(parsed_time)}`")
except ValueError:
await ctx.send("Please provide a convertible value!")