SeaCogs/moderation/moderation.py

1774 lines
83 KiB
Python

# _____ _
# / ____| (_)
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
import logging
import json
import time
import os
from datetime import datetime, timedelta, timezone
from typing import Union
import discord
import humanize
import mysql.connector
from discord.ext import tasks
from pytimeparse2 import disable_dateutil, parse
from redbot.core import app_commands, checks, Config, commands, data_manager
from redbot.core.app_commands import Choice
class Moderation(commands.Cog):
"""Custom moderation cog.
Developed by SeaswimmerTheFsh."""
async def red_delete_data_for_user(self, *, requester, user_id: int):
if requester == "discord_deleted_user":
await self.config.user_from_id(user_id).clear()
database = await self.connect()
cursor = database.cursor()
cursor.execute("SHOW TABLES;")
tables = [table[0] for table in cursor.fetchall()]
condition = "target_id = %s OR moderator_id = %s;"
for table in tables:
delete_query = f"DELETE FROM {table[0]} WHERE {condition}"
cursor.execute(delete_query, (user_id, user_id))
database.commit()
cursor.close()
database.close()
if requester == "owner":
await self.config.user_from_id(user_id).clear()
if requester == "user":
await self.config.user_from_id(user_id).clear()
if requester == "user_strict":
await self.config.user_from_id(user_id).clear()
else:
self.logger.warning("Invalid requester passed to red_delete_data_for_user: %s", requester)
def __init__(self, bot):
self.bot = bot
self.config = Config.get_conf(self, identifier=481923957134912)
self.config.register_global(
mysql_address= " ",
mysql_database = " ",
mysql_username = " ",
mysql_password = " "
)
self.config.register_guild(
use_discord_permissions = True,
ignore_other_bots = True,
dm_users = True,
log_channel = " ",
immune_roles = [],
history_ephemeral = False,
history_inline = False,
history_pagesize = 5,
history_inline_pagesize = 6,
blacklist_roles = []
)
self.config.register_user(
history_ephemeral = None,
history_inline = None,
history_pagesize = None,
history_inline_pagesize = None
)
disable_dateutil()
self.handle_expiry.start() # pylint: disable=no-member
self.logger = logging.getLogger('red.seaswimmerthefsh.moderation')
async def cog_load(self):
"""This method prepares the database schema for all of the guilds the bot is currently in."""
conf = await self.check_conf([
'mysql_address',
'mysql_database',
'mysql_username',
'mysql_password'
])
if conf:
self.logger.fatal("Failed to create tables, due to MySQL connection configuration being unset.")
return
guilds: list[discord.Guild] = self.bot.guilds
try:
for guild in guilds:
if not await self.bot.cog_disabled_in_guild(self, guild):
await self.create_guild_table(guild)
except ConnectionRefusedError:
return
async def cog_unload(self):
self.handle_expiry.cancel() # pylint: disable=no-member
@commands.Cog.listener('on_guild_join')
async def db_generate_guild_join(self, guild: discord.Guild):
"""This method prepares the database schema whenever the bot joins a guild."""
if not await self.bot.cog_disabled_in_guild(self, guild):
conf = await self.check_conf([
'mysql_address',
'mysql_database',
'mysql_username',
'mysql_password'
])
if conf:
self.logger.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id)
return
try:
await self.create_guild_table(guild)
except ConnectionRefusedError:
return
@commands.Cog.listener('on_audit_log_entry_create')
async def autologger(self, entry: discord.AuditLogEntry):
"""This method automatically logs moderations done by users manually ("right clicks")."""
if not await self.bot.cog_disabled_in_guild(self, entry.guild):
if await self.config.guild(entry.guild).ignore_other_bots() is True:
if entry.user.bot or entry.target.bot:
return
else:
if entry.user.id == self.bot.user.id:
return
duration = "NULL"
if entry.reason:
reason = entry.reason + " (This action was performed without the bot.)"
else:
reason = "This action was performed without the bot."
if entry.action == discord.AuditLogAction.kick:
moderation_type = 'KICK'
elif entry.action == discord.AuditLogAction.ban:
moderation_type = 'BAN'
elif entry.action == discord.AuditLogAction.unban:
moderation_type = 'UNBAN'
elif entry.action == discord.AuditLogAction.member_update:
if entry.after.timed_out_until is not None:
timed_out_until_aware = entry.after.timed_out_until.replace(tzinfo=timezone.utc)
duration_datetime = timed_out_until_aware - datetime.now(tz=timezone.utc)
minutes = round(duration_datetime.total_seconds() / 60)
duration = timedelta(minutes=minutes)
moderation_type = 'MUTE'
else:
moderation_type = 'UNMUTE'
else:
return
await self.mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, 0, duration, reason)
async def connect(self):
"""Connects to the MySQL database, and returns a connection object."""
conf = await self.check_conf([
'mysql_address',
'mysql_database',
'mysql_username',
'mysql_password'
])
if conf:
raise LookupError("MySQL connection details not set properly!")
try:
connection = mysql.connector.connect(
host=await self.config.mysql_address(),
user=await self.config.mysql_username(),
password=await self.config.mysql_password(),
database=await self.config.mysql_database()
)
return connection
except mysql.connector.ProgrammingError as e:
self.logger.fatal("Unable to access the MySQL database!\nError:\n%s", e.msg)
raise ConnectionRefusedError(f"Unable to access the MySQL Database!\n{e.msg}") from e
async def create_guild_table(self, guild: discord.Guild):
database = await self.connect()
cursor = database.cursor()
try:
cursor.execute(f"SELECT * FROM `moderation_{guild.id}`")
self.logger.info("MySQL Table exists for server %s (%s)", guild.name, guild.id)
except mysql.connector.errors.ProgrammingError:
query = f"""
CREATE TABLE `moderation_{guild.id}` (
moderation_id INT UNIQUE PRIMARY KEY NOT NULL,
timestamp INT NOT NULL,
moderation_type LONGTEXT NOT NULL,
target_id LONGTEXT NOT NULL,
moderator_id LONGTEXT NOT NULL,
role_id LONGTEXT,
duration LONGTEXT,
end_timestamp INT,
reason LONGTEXT,
resolved BOOL NOT NULL,
resolved_by LONGTEXT,
resolve_reason LONGTEXT,
expired BOOL NOT NULL,
changes JSON NOT NULL
)
"""
cursor.execute(query)
index_query_1 = "CREATE INDEX idx_target_id ON moderation_%s(target_id(25));"
cursor.execute(index_query_1, (guild.id,))
index_query_2 = "CREATE INDEX idx_moderator_id ON moderation_%s(moderator_id(25));"
cursor.execute(index_query_2, (guild.id,))
index_query_3 = "CREATE INDEX idx_moderation_id ON moderation_%s(moderation_id);"
cursor.execute(index_query_3, (guild.id,))
insert_query = f"""
INSERT INTO `moderation_{guild.id}`
(moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
insert_values = (0, 0, "NULL", 0, 0, 0, "NULL", 0, "NULL", 0, "NULL", "NULL", 0, json.dumps([]))
cursor.execute(insert_query, insert_values)
database.commit()
self.logger.info("MySQL Table (moderation_%s) created for %s (%s)", guild.id, guild.name, guild.id)
database.close()
async def check_conf(self, config: list):
"""Checks if any required config options are not set."""
not_found_list = []
for item in config:
if await self.config.item() == " ":
not_found_list.append(item)
return not_found_list
def check_permissions(self, user: discord.User, permissions: list, ctx: Union[commands.Context, discord.Interaction] = None, guild: discord.Guild = None):
"""Checks if a user has a specific permission (or a list of permissions) in a channel."""
if ctx:
member = ctx.guild.get_member(user.id)
resolved_permissions = ctx.channel.permissions_for(member)
elif guild:
member = guild.get_member(user.id)
resolved_permissions = member.guild_permissions
else:
raise(KeyError)
for permission in permissions:
if not getattr(resolved_permissions, permission, False) and not resolved_permissions.administrator is True:
return permission
return False
async def check_moddable(self, target: Union[discord.User, discord.Member], interaction: discord.Interaction, permissions: list):
"""Checks if a moderator can moderate a target."""
if self.check_permissions(interaction.client.user, permissions, guild=interaction.guild):
await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
return False
if await self.config.guild(interaction.guild).use_discord_permissions() is True:
if self.check_permissions(interaction.user, permissions, guild=interaction.guild):
await interaction.response.send_message(f"You do not have the `{permissions}` permission, required for this action.", ephemeral=True)
return False
if interaction.user.id == target.id:
await interaction.response.send_message(content="You cannot moderate yourself!", ephemeral=True)
return False
if target.bot:
await interaction.response.send_message(content="You cannot moderate bots!", ephemeral=True)
return False
if isinstance(target, discord.Member):
if interaction.user.top_role <= target.top_role:
await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True)
return False
if interaction.guild.get_member(interaction.client.user.id).top_role <= target.top_role:
await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True)
return False
immune_roles = await self.config.guild(target.guild).immune_roles()
for role in target.roles:
if role.id in immune_roles:
await interaction.response.send_message(content="You cannot moderate members with an immune role!", ephemeral=True)
return False
return True
async def mysql_log(self, guild_id: str, author_id: str, moderation_type: str, target_id: int, role_id: int, duration, reason: str, database: mysql.connector.MySQLConnection = None, timestamp: int = None, resolved: bool = False, resolved_by: str = None, resolved_reason: str = None, expired: bool = None, changes: list = []):
if not timestamp:
timestamp = int(time.time())
if duration != "NULL":
end_timedelta = datetime.fromtimestamp(timestamp) + duration
end_timestamp = int(end_timedelta.timestamp())
else:
end_timestamp = 0
if not expired:
if int(time.time()) > end_timestamp:
expired = 1
else:
expired = 0
if resolved_by is None:
resolved_by = "NULL"
if resolved_reason is None:
resolved_reason = "NULL"
if not database:
database = await self.connect()
close_db = True
else:
close_db = False
cursor = database.cursor()
moderation_id = await self.get_next_case_number(guild_id=guild_id, cursor=cursor)
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
val = (moderation_id, timestamp, moderation_type, target_id, author_id, role_id, duration, end_timestamp, reason, int(resolved), resolved_by, resolved_reason, expired, json.dumps(changes))
cursor.execute(sql, val)
cursor.close()
database.commit()
if close_db:
database.close()
self.logger.debug("MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s", guild_id, moderation_id, timestamp, moderation_type, target_id, author_id, role_id, duration, end_timestamp, reason, int(resolved), resolved_by, resolved_reason, expired, changes)
return moderation_id
async def get_next_case_number(self, guild_id: str, cursor = None):
"""This method returns the next case number from the MySQL table for a specific guild."""
if not cursor:
database = await self.connect()
cursor = database.cursor()
cursor.execute(f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1")
return cursor.fetchone()[0] + 1
def generate_dict(self, result):
case: dict = {
"moderation_id": result[0],
"timestamp": result[1],
"moderation_type": result[2],
"target_id": result[3],
"moderator_id": result[4],
"role_id": result[5],
"duration": result[6],
"end_timestamp": result[7],
"reason": result[8],
"resolved": result[9],
"resolved_by": result[10],
"resolve_reason": result[11],
"expired": result[12],
"changes": json.loads(result[13])
}
return case
async def fetch_user_dict(self, interaction: discord.Interaction, user_id: str):
"""This method returns a dictionary containing either user information or a standard deleted user template."""
if user_id == '?':
user_dict = {
'id': '?',
'name': 'Unknown User',
'discriminator':'0'
}
else:
try:
user = interaction.client.get_user(user_id)
if user is None:
user = await interaction.client.fetch_user(user_id)
user_dict = {
'id': user.id,
'name': user.name,
'discriminator': user.discriminator
}
except discord.errors.NotFound:
user_dict = {
'id': user_id,
'name': 'Deleted User',
'discriminator': '0'
}
return user_dict
async def fetch_role_dict(self, interaction: discord.Interaction, role_id: str):
"""This method returns a dictionary containing either role information or a standard deleted role template."""
try:
role = interaction.guild.get_role(role_id)
role_dict = {
'id': role.id,
'name': role.name
}
except discord.errors.NotFound:
role_dict = {
'id': role_id,
'name': 'Deleted Role'
}
return role_dict
async def embed_factory(self, embed_type: str, /, interaction: discord.Interaction = None, case_dict: dict = None, guild: discord.Guild = None, reason: str = None, moderation_type: str = None, response: discord.InteractionMessage = None, duration: timedelta = None, resolved: bool = False):
"""This method creates an embed from set parameters, meant for either moderation logging or contacting the moderated user.
Valid arguments for 'embed_type':
- 'message'
- 'log' - WIP
- 'case'
- 'changes'
Required arguments for 'message':
- guild
- reason
- moderation_type
- response
- duration (optional)
Required arguments for 'log':
- interaction
- case_dict
- resolved (optional)
Required arguments for 'case' & 'changes':
- interaction
- case_dict"""
if embed_type == 'message':
if moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]:
guild_name = guild.name
else:
guild_name = f"[{guild.name}]({response.jump_url})"
if moderation_type in ["tempbanned", "muted"] and duration:
embed_duration = f" for {humanize.precisedelta(duration)}"
else:
embed_duration = ""
if moderation_type == "note":
embed_desc = "received a"
else:
embed_desc = "been"
embed = discord.Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=await self.bot.get_embed_color(None), timestamp=datetime.now())
embed.add_field(name='Reason', value=f"`{reason}`")
embed.set_author(name=guild.name, icon_url=guild.icon.url)
embed.set_footer(text=f"Case #{await self.get_next_case_number(guild.id)}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&")
return embed
if embed_type == 'case':
target_user = await self.fetch_user_dict(interaction, case_dict['target_id'])
moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id'])
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`"
embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']}", color=await self.bot.get_embed_color(None))
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
if case_dict['duration'] != 'NULL':
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))})
duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if bool(case_dict['expired']) is False else str(humanize.precisedelta(td))
embed.description += f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"
embed.description += f"\n**Changes:** {len(case_dict['changes'])}"
embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)
if case_dict['resolved'] == 1:
resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by'])
resolved_name = f"`{resolved_user['name']}`" if resolved_user['discriminator'] == "0" else f"`{resolved_user['name']}#{resolved_user['discriminator']}`"
embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False)
return embed
if embed_type == 'changes':
embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']} Changes", color=await self.bot.get_embed_color(None))
memory_dict = {}
if case_dict['changes']:
for change in case_dict['changes']:
if change['user_id'] not in memory_dict:
memory_dict[str(change['user_id'])] = await self.fetch_user_dict(interaction, change['user_id'])
user = memory_dict[str(change['user_id'])]
name = user['name'] if user['discriminator'] == "0" else f"{user['name']}#{user['discriminator']}"
timestamp = f"<t:{change['timestamp']}> | <t:{change['timestamp']}:R>"
if change['type'] == 'ORIGINAL':
embed.add_field(name='Original', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False)
elif change['type'] == 'EDIT':
embed.add_field(name='Edit', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False)
elif change['type'] == 'RESOLVE':
embed.add_field(name='Resolve', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False)
else:
embed.description = "*No changes have been made to this case.* 🙁"
return embed
if embed_type == 'log':
if resolved:
target_user = await self.fetch_user_dict(interaction, case_dict['target_id'])
moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id'])
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}"
embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']} Resolved", color=await self.bot.get_embed_color(None))
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
if case_dict['duration'] != 'NULL':
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))})
duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if case_dict["expired"] == '0' else str(humanize.precisedelta(td))
embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"
embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)
resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by'])
resolved_name = resolved_user['name'] if resolved_user['discriminator'] == "0" else f"{resolved_user['name']}#{resolved_user['discriminator']}"
embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False)
else:
target_user = await self.fetch_user_dict(interaction, case_dict['target_id'])
moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id'])
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}"
embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']}", color=await self.bot.get_embed_color(None))
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
if case_dict['duration'] != 'NULL':
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))})
embed.description = embed.description + f"\n**Duration:** {humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>"
embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)
return embed
raise(TypeError("'type' argument is invalid!"))
async def fetch_case(self, moderation_id: int, guild_id: str):
"""This method fetches a case from the database and returns the case's dictionary."""
database = await self.connect()
cursor = database.cursor()
query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;"
cursor.execute(query, (guild_id, moderation_id))
result = cursor.fetchone()
cursor.close()
database.close()
return self.generate_dict(result)
async def log(self, interaction: discord.Interaction, moderation_id: int, resolved: bool = False):
"""This method sends a message to the guild's configured logging channel when an infraction takes place."""
logging_channel_id = await self.config.guild(interaction.guild).log_channel()
if logging_channel_id != " ":
logging_channel = interaction.guild.get_channel(logging_channel_id)
case = await self.fetch_case(moderation_id, interaction.guild.id)
if case:
embed = await self.embed_factory('log', interaction=interaction, case_dict=case, resolved=resolved)
try:
await logging_channel.send(embed=embed)
except discord.errors.Forbidden:
return
#######################################################################################################################
### COMMANDS
#######################################################################################################################
@app_commands.command(name="note")
async def note(self, interaction: discord.Interaction, target: discord.User, reason: str, silent: bool = None):
"""Add a note to a user.
Parameters
-----------
target: discord.User
Who are you noting?
reason: str
Why are you noting this user?
silent: bool
Should the user be messaged?"""
if not await self.check_moddable(target, interaction, ['moderate_members']):
return
await interaction.response.send_message(content=f"{target.mention} has recieved a note!\n**Reason** - `{reason}`")
if silent is None:
silent = not await self.config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='note', response=await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', target.id, 0, 'NULL', reason)
await interaction.edit_original_response(content=f"{target.mention} has received a note! (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
await self.log(interaction, moderation_id)
@app_commands.command(name="warn")
async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
"""Warn a user.
Parameters
-----------
target: discord.Member
Who are you warning?
reason: str
Why are you warning this user?
silent: bool
Should the user be messaged?"""
if not await self.check_moddable(target, interaction, ['moderate_members']):
return
await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`")
if silent is None:
silent = not await self.config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='warned', response=await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'WARN', target.id, 0, 'NULL', reason)
await interaction.edit_original_response(content=f"{target.mention} has been warned! (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
await self.log(interaction, moderation_id)
# @app_commands.group(name="blacklist")
# async def blacklist(self, interaction: discord.Interaction):
# """Blacklist users."""
# @blacklist.command(name="add")
# async def blacklist_add(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
# """"""
@app_commands.command(name="mute")
async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str, silent: bool = None):
"""Mute a user.
Parameters
-----------
target: discord.Member
Who are you unbanning?
duration: str
How long are you muting this user for?
reason: str
Why are you unbanning this user?
silent: bool
Should the user be messaged?"""
if not await self.check_moddable(target, interaction, ['moderate_members']):
return
if target.is_timed_out() is True:
await interaction.response.send_message(f"{target.mention} is already muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
return
try:
parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
except ValueError:
await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
return
if parsed_time.total_seconds() / 1000 > 2419200000:
await interaction.response.send_message("Please provide a duration that is less than 28 days.")
return
await target.timeout(parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}")
await interaction.response.send_message(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")
if silent is None:
silent = not await self.config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='muted', response=await interaction.original_response(), duration=parsed_time)
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', target.id, 0, parsed_time, reason)
await interaction.edit_original_response(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
await self.log(interaction, moderation_id)
@app_commands.command(name="unmute")
async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None, silent: bool = None):
"""Unmute a user.
Parameters
-----------
target: discord.user
Who are you unmuting?
reason: str
Why are you unmuting this user?
silent: bool
Should the user be messaged?"""
if not await self.check_moddable(target, interaction, ['moderate_members']):
return
if target.is_timed_out() is False:
await interaction.response.send_message(f"{target.mention} is not muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
return
if reason:
await target.timeout(None, reason=f"Unmuted by {interaction.user.id} for: {reason}")
else:
await target.timeout(None, reason=f"Unbanned by {interaction.user.id}")
reason = "No reason given."
await interaction.response.send_message(content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`")
if silent is None:
silent = not await self.config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='unmuted', response=await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', target.id, 0, 'NULL', reason)
await interaction.edit_original_response(content=f"{target.mention} has been unmuted! (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
await self.log(interaction, moderation_id)
@app_commands.command(name="kick")
async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
"""Kick a user.
Parameters
-----------
target: discord.user
Who are you kicking?
reason: str
Why are you kicking this user?
silent: bool
Should the user be messaged?"""
if not await self.check_moddable(target, interaction, ['kick_members']):
return
await interaction.response.send_message(content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`")
if silent is None:
silent = not await self.config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='kicked', response=await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await target.kick(f"Kicked by {interaction.user.id} for: {reason}")
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'KICK', target.id, 0, 'NULL', reason)
await interaction.edit_original_response(content=f"{target.mention} has been kicked! (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
await self.log(interaction, moderation_id)
@app_commands.command(name="ban")
@app_commands.choices(delete_messages=[
Choice(name="None", value=0),
Choice(name='1 Hour', value=3600),
Choice(name='12 Hours', value=43200),
Choice(name='1 Day', value=86400),
Choice(name='3 Days', value=259200),
Choice(name='7 Days', value=604800),
])
async def ban(self, interaction: discord.Interaction, target: discord.User, reason: str, duration: str = None, delete_messages: Choice[int] = 0, silent: bool = None):
"""Ban a user.
Parameters
-----------
target: discord.user
Who are you banning?
duration: str
How long are you banning this user for?
reason: str
Why are you banning this user?
delete_messages: Choices[int]
How many days of messages to delete?
silent: bool
Should the user be messaged?"""
if not await self.check_moddable(target, interaction, ['ban_members']):
return
try:
await interaction.guild.fetch_ban(target)
await interaction.response.send_message(content=f"{target.mention} is already banned!", ephemeral=True)
return
except discord.errors.NotFound:
pass
if duration:
try:
parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
except ValueError:
await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
return
await interaction.response.send_message(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")
try:
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='tempbanned', response=await interaction.original_response(), duration=parsed_time)
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await interaction.guild.ban(target, reason=f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})", delete_message_seconds=delete_messages)
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'TEMPBAN', target.id, 0, parsed_time, reason)
await interaction.edit_original_response(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
await self.log(interaction, moderation_id)
else:
await interaction.response.send_message(content=f"{target.mention} has been banned!\n**Reason** - `{reason}`")
if silent is None:
silent = not await self.config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='banned', response=await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
await interaction.guild.ban(target, reason=f"Banned by {interaction.user.id} for: {reason}", delete_message_seconds=delete_messages)
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'BAN', target.id, 0, 'NULL', reason)
await interaction.edit_original_response(content=f"{target.mention} has been banned! (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
await self.log(interaction, moderation_id)
@app_commands.command(name="unban")
async def unban(self, interaction: discord.Interaction, target: discord.User, reason: str = None, silent: bool = None):
"""Unban a user.
Parameters
-----------
target: discord.user
Who are you unbanning?
reason: str
Why are you unbanning this user?
silent: bool
Should the user be messaged?"""
if not await self.check_moddable(target, interaction, ['ban_members']):
return
try:
await interaction.guild.fetch_ban(target)
except discord.errors.NotFound:
await interaction.response.send_message(content=f"{target.mention} is not banned!", ephemeral=True)
return
if reason:
await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id} for: {reason}")
else:
await interaction.guild.unban(target, reason=f"Unbanned by {interaction.user.id}")
reason = "No reason given."
await interaction.response.send_message(content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`")
if silent is None:
silent = not await self.config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='unbanned', response=await interaction.original_response())
await target.send(embed=embed)
except discord.errors.HTTPException:
pass
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNBAN', target.id, 0, 'NULL', reason)
await interaction.edit_original_response(content=f"{target.mention} has been unbanned! (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
await self.log(interaction, moderation_id)
@app_commands.command(name="history")
async def history(self, interaction: discord.Interaction, target: discord.User = None, moderator: discord.User = None, pagesize: app_commands.Range[int, 1, 25] = None, page: int = 1, ephemeral: bool = None, inline: bool = None, export: bool = False):
"""List previous infractions.
Parameters
-----------
target: discord.User
User whose infractions to query, overrides moderator if both are given
moderator: discord.User
Query by moderator
pagesize: app_commands.Range[int, 1, 25]
Amount of infractions to list per page
page: int
Page to select
ephemeral: bool
Hide the command response
inline: bool
Display infractions in a grid arrangement (does not look very good)
export: bool
Exports the server's entire moderation history to a JSON file"""
if ephemeral is None:
ephemeral = (await self.config.user(interaction.user).history_ephemeral()
or await self.config.guild(interaction.guild).history_ephemeral()
or False)
if inline is None:
inline = (await self.config.user(interaction.user).history_inline()
or await self.config.guild(interaction.guild).history_inline()
or False)
if pagesize is None:
if inline is True:
pagesize = (await self.config.user(interaction.user).history_inline_pagesize()
or await self.config.guild(interaction.guild).history_inline_pagesize()
or 6)
else:
pagesize = (await self.config.user(interaction.user).history_pagesize()
or await self.config.guild(interaction.guild).history_pagesize()
or 5)
await interaction.response.defer(ephemeral=ephemeral)
permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction)
if permissions:
await interaction.followup.send(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
return
database = await self.connect()
cursor = database.cursor()
if target:
query = """SELECT *
FROM moderation_%s
WHERE target_id = %s
ORDER BY moderation_id DESC;"""
cursor.execute(query, (interaction.guild.id, target.id))
elif moderator:
query = """SELECT *
FROM moderation_%s
WHERE moderator_id = %s
ORDER BY moderation_id DESC;"""
cursor.execute(query, (interaction.guild.id, moderator.id))
else:
query = """SELECT *
FROM moderation_%s
ORDER BY moderation_id DESC;"""
cursor.execute(query, (interaction.guild.id,))
results = cursor.fetchall()
result_dict_list = []
for result in results:
case_dict = self.generate_dict(result)
if case_dict['moderation_id'] == 0:
continue
result_dict_list.append(case_dict)
if export:
try:
filename = str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"moderation_{interaction.guild.id}.json"
with open(filename, "w", encoding="utf-8") as f:
json.dump(result_dict_list, f, indent=2)
await interaction.followup.send(file=discord.File(filename, f"moderation_{interaction.guild.id}.json"), ephemeral=ephemeral)
os.remove(filename)
return
except json.JSONDecodeError as e:
await interaction.followup.send(content=f"An error occured while exporting the moderation history.\nError:\n```{e}```", ephemeral=ephemeral)
return
case_quantity = len(result_dict_list)
page_quantity = round(case_quantity / pagesize)
start_index = (page - 1) * pagesize
end_index = page * pagesize
embed = discord.Embed(color=await self.bot.get_embed_color(None))
embed.set_author(icon_url=interaction.guild.icon.url, name='Infraction History')
embed.set_footer(text=f"Page {page}/{page_quantity} | {case_quantity} Results")
memory_dict = {}
for case in result_dict_list[start_index:end_index]:
if case['target_id'] not in memory_dict:
memory_dict[str(case['target_id'])] = await self.fetch_user_dict(interaction, case['target_id'])
target_user = memory_dict[str(case['target_id'])]
if case['moderator_id'] not in memory_dict:
memory_dict[str(case['moderator_id'])] = await self.fetch_user_dict(interaction, case['moderator_id'])
moderator_user = memory_dict[str(case['moderator_id'])]
target_name = target_user['name'] if target_user['discriminator'] == "0" else f"{target_user['name']}#{target_user['discriminator']}"
moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}"
field_name = f"Case #{case['moderation_id']} ({str.title(case['moderation_type'])})"
field_value = f"**Target:** `{target_name}` ({target_user['id']})\n**Moderator:** `{moderator_name}` ({moderator_user['id']})"
if len(case['reason']) > 150:
field_value += f"\n**Reason:** `{str(case['reason'])[:150]}...`"
else:
field_value += f"\n**Reason:** `{str(case['reason'])}`"
if case['duration'] != 'NULL':
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case["duration"].split(":"))})
duration_embed = f"{humanize.precisedelta(td)} | <t:{case['end_timestamp']}:R>" if bool(case['expired']) is False else f"{humanize.precisedelta(td)} | Expired"
field_value += f"\n**Duration:** {duration_embed}"
field_value += f"\n**Timestamp:** <t:{case['timestamp']}> | <t:{case['timestamp']}:R>"
if bool(case['resolved']):
field_value += "\n**Resolved:** True"
embed.add_field(name=field_name, value=field_value, inline=inline)
await interaction.followup.send(embed=embed, ephemeral=ephemeral)
@app_commands.command(name="resolve")
async def resolve(self, interaction: discord.Interaction, case: int, reason: str = None):
"""Resolve a specific case.
Parameters
-----------
case: int
Case number of the case you're trying to resolve
reason: str
Reason for resolving case"""
permissions = self.check_permissions(interaction.client.user, ['embed_links', 'moderate_members', 'ban_members'], interaction)
if permissions:
await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
return
conf = await self.check_conf(['mysql_database'])
if conf:
raise(LookupError)
database = await self.connect()
cursor = database.cursor()
db = await self.config.mysql_database()
query_1 = "SELECT * FROM moderation_%s WHERE moderation_id = %s;"
cursor.execute(query_1, (interaction.guild.id, case))
result_1 = cursor.fetchone()
if result_1 is None or case == 0:
await interaction.response.send_message(content=f"There is no moderation with a case number of {case}.", ephemeral=True)
return
query_2 = "SELECT * FROM moderation_%s WHERE moderation_id = %s AND resolved = 0;"
cursor.execute(query_2, (interaction.guild.id, case))
result_2 = cursor.fetchone()
if result_2 is None:
await interaction.response.send_message(content=f"This moderation has already been resolved!\nUse `/case {case}` for more information.", ephemeral=True)
return
case_dict = self.generate_dict(result_2)
if reason is None:
reason = "No reason given."
changes: list = case_dict['changes']
if len(changes) > 25:
await interaction.response.send_message(content="Due to limitations with Discord's embed system, you cannot edit a case more than 25 times.", ephemeral=True)
return
if not changes:
changes.append(
{
'type': "ORIGINAL",
'timestamp': case_dict['timestamp'],
'reason': case_dict['reason'],
'user_id': case_dict['moderator_id']
}
)
changes.append(
{
'type': "RESOLVE",
'timestamp': int(time.time()),
'reason': reason,
'user_id': interaction.user.id
}
)
if case_dict['moderation_type'] in ['UNMUTE', 'UNBAN']:
await interaction.response.send_message(content="You cannot resolve this type of moderation!", ephemeral=True)
if case_dict['moderation_type'] in ['MUTE', 'TEMPBAN', 'BAN']:
if case_dict['moderation_type'] == 'MUTE':
try:
member = await interaction.guild.fetch_member(case_dict['target_id'])
await member.timeout(None, reason=f"Case #{case} resolved by {interaction.user.id}")
except discord.NotFound:
pass
if case_dict['moderation_type'] in ['TEMPBAN', 'BAN']:
try:
user = await interaction.client.fetch_user(case_dict['target_id'])
await interaction.guild.unban(user, reason=f"Case #{case} resolved by {interaction.user.id}")
except discord.NotFound:
pass
resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, changes = %s, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s"
else:
resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, changes = %s, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s"
cursor.execute(resolve_query, (json.dumps(changes), interaction.user.id, reason, case_dict['moderation_id']))
database.commit()
embed = await self.embed_factory('case', interaction=interaction, case_dict=await self.fetch_case(case, interaction.guild.id))
await interaction.response.send_message(content=f"✅ Moderation #{case} resolved!", embed=embed)
await self.log(interaction, case, True)
cursor.close()
database.close()
@app_commands.command(name="case")
@app_commands.choices(export=[
Choice(name='Export as File', value='file'),
Choice(name='Export as Codeblock', value='codeblock')
])
async def case(self, interaction: discord.Interaction, case: int, ephemeral: bool = None, changes: bool = False, export: Choice[str] = None):
"""Check the details of a specific case.
Parameters
-----------
case: int
What case are you looking up?
ephemeral: bool
Hide the command response
changes: bool
List the changes made to the case
export: bool
Export the case to a JSON file or codeblock"""
permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction)
if permissions:
await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
return
if ephemeral is None:
ephemeral = (await self.config.user(interaction.user).history_ephemeral()
or await self.config.guild(interaction.guild).history_ephemeral()
or False)
if case != 0:
case_dict = await self.fetch_case(case, interaction.guild.id)
if case_dict:
if export:
if export.value == 'file' or len(str(case_dict)) > 1800:
filename = str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"moderation_{interaction.guild.id}_case_{case}.json"
with open(filename, "w", encoding="utf-8") as f:
json.dump(case_dict, f, indent=2)
if export.value == 'codeblock':
content = f"Case #{case} exported.\n*Case was too large to export as codeblock, so it has been uploaded as a `.json` file.*"
else:
content = f"Case #{case} exported."
await interaction.response.send_message(content=content, file=discord.File(filename, f"moderation_{interaction.guild.id}_case_{case}.json"), ephemeral=ephemeral)
os.remove(filename)
return
await interaction.response.send_message(content=f"```json\n{json.dumps(case_dict, indent=2)}```", ephemeral=ephemeral)
return
if changes:
embed = await self.embed_factory('changes', interaction=interaction, case_dict=case_dict)
else:
embed = await self.embed_factory('case', interaction=interaction, case_dict=case_dict)
await interaction.response.send_message(embed=embed, ephemeral=ephemeral)
return
await interaction.response.send_message(content=f"No case with case number `{case}` found.", ephemeral=True)
@app_commands.command(name="edit")
async def edit(self, interaction: discord.Interaction, case: int, reason: str):
"""Edit the reason of a specific case.
Parameters
-----------
case: int
What case are you editing?
reason: str
What is the new reason?"""
permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction)
if permissions:
await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
return
if case != 0:
case_dict = await self.fetch_case(case, interaction.guild.id)
if case_dict:
conf = await self.check_conf(['mysql_database'])
if conf:
raise(LookupError)
changes: list = case_dict['changes']
if len(changes) > 25:
await interaction.response.send_message(content="Due to limitations with Discord's embed system, you cannot edit a case more than 25 times.", ephemeral=True)
return
if not changes:
changes.append(
{
'type': "ORIGINAL",
'timestamp': case_dict['timestamp'],
'reason': case_dict['reason'],
'user_id': case_dict['moderator_id']
}
)
changes.append(
{
'type': "EDIT",
'timestamp': int(time.time()),
'reason': reason,
'user_id': interaction.user.id
}
)
database = await self.connect()
cursor = database.cursor()
db = await self.config.mysql_database()
update_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET changes = %s, reason = %s WHERE moderation_id = %s"
cursor.execute(update_query, (json.dumps(changes), reason, case))
database.commit()
new_case = await self.fetch_case(case, interaction.guild.id)
embed = await self.embed_factory('case', interaction=interaction, case_dict=new_case)
await interaction.response.send_message(content=f"✅ Moderation #{case} edited!", embed=embed, ephemeral=True)
await self.log(interaction, case)
cursor.close()
database.close()
return
await interaction.response.send_message(content=f"No case with case number `{case}` found.", ephemeral=True)
@tasks.loop(minutes=1)
async def handle_expiry(self):
conf = await self.check_conf(['mysql_database'])
if conf:
raise(LookupError)
database = await self.connect()
cursor = database.cursor()
db = await self.config.mysql_database()
guilds: list[discord.Guild] = self.bot.guilds
for guild in guilds:
if not await self.bot.cog_disabled_in_guild(self, guild):
tempban_query = f"SELECT target_id, moderation_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= %s AND moderation_type = 'TEMPBAN' AND expired = 0"
try:
cursor.execute(tempban_query, (time.time(),))
result = cursor.fetchall()
except mysql.connector.errors.ProgrammingError:
continue
target_ids = [row[0] for row in result]
moderation_ids = [row[1] for row in result]
for target_id, moderation_id in zip(target_ids, moderation_ids):
user: discord.User = await self.bot.fetch_user(target_id)
try:
await guild.unban(user, reason=f"Automatic unban from case #{moderation_id}")
embed = await self.embed_factory('message', guild, f'Automatic unban from case #{moderation_id}', 'unbanned')
try:
await user.send(embed=embed)
except discord.errors.HTTPException:
pass
except [discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException] as e:
print(f"Failed to unban {user.name}#{user.discriminator} ({user.id}) from {guild.name} ({guild.id})\n{e}")
expiry_query = f"UPDATE `{db}`.`moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp != 0 AND end_timestamp <= %s AND expired = 0 AND moderation_type != 'BLACKLIST') OR (expired = 0 AND resolved = 1 AND moderation_type != 'BLACKLIST')"
cursor.execute(expiry_query, (time.time(),))
blacklist_query = f"SELECT target_id, moderation_id, role_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= %s AND moderation_type = 'BLACKLIST' AND expired = 0"
try:
cursor.execute(blacklist_query, (time.time(),))
result = cursor.fetchall()
except mysql.connector.errors.ProgrammingError:
continue
target_ids = [row[0] for row in result]
moderation_ids = [row[1] for row in result]
role_ids = [row[2] for row in result]
for target_id, moderation_id, role_id in zip(target_ids, moderation_ids, role_ids):
try:
member: discord.Member = await guild.fetch_member(target_id)
role: discord.Role = guild.get_role(role_id)
if role is None:
raise discord.errors.NotFound
except [discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException]:
continue
database.commit()
cursor.close()
database.close()
#######################################################################################################################
### CONFIGURATION COMMANDS
#######################################################################################################################
@commands.group(autohelp=True, aliases=['modset', 'moderationsettings'])
async def moderationset(self, ctx: commands.Context):
"""Manage moderation commands."""
@moderationset.command(name='list', aliases=['view', 'show'])
async def moderationset_list(self, ctx: commands.Context):
"""List all moderation settings."""
if ctx.guild:
guild_settings = await self.config.guild(ctx.guild).all()
guild_settings_string = ""
for setting in guild_settings:
if 'mysql' in setting or 'roles' in setting:
continue
if setting == 'log_channel':
channel = ctx.guild.get_channel(guild_settings[setting])
guild_settings_string += f"**{setting}**: {channel.mention}\n"
else:
guild_settings_string += f"**{setting}**: {guild_settings[setting]}\n"
user_settings = await self.config.user(ctx.author).all()
user_settings_string = ""
for setting in user_settings:
user_settings_string += f"**{setting}**: {user_settings[setting]}\n"
embed = discord.Embed(color=await self.bot.get_embed_color(None))
embed.set_author(icon_url=ctx.guild.icon.url, name=f"{ctx.guild.name} Moderation Settings")
if ctx.guild:
embed.add_field(name="Guild Settings", value=guild_settings_string)
embed.add_field(name="User Settings", value=user_settings_string)
await ctx.send(embed=embed)
@moderationset.group(autohelp=True, name='history')
async def moderationset_history(self, ctx: commands.Context):
"""Manage configuration for the /history command."""
@moderationset_history.command(name='ephemeral', aliases=['hidden', 'hide'])
async def moderationset_history_user_ephemeral(self, ctx: commands.Context, enabled: bool):
"""Toggle if the /history command should be ephemeral."""
await self.config.user(ctx.author).history_ephemeral.set(enabled)
await ctx.send(f"Ephemeral setting set to {enabled}")
@moderationset_history.command(name='pagesize')
async def moderationset_history_user_pagesize(self, ctx: commands.Context, pagesize: int):
"""Set the amount of cases to display per page."""
await self.config.user(ctx.author).history_pagesize.set(pagesize)
await ctx.send(f"Pagesize set to {await self.config.user(ctx.author).history_pagesize()}")
@moderationset_history.group(name='inline')
async def moderationset_history_inline(self, ctx: commands.Context):
"""Manage configuration for the /history command's inline argument."""
@moderationset_history_inline.command(name='toggle')
async def moderationset_history_user_inline_toggle(self, ctx: commands.Context, enabled: bool):
"""Enable the /history command's inline argument by default."""
await self.config.user(ctx.author).history_inline.set(enabled)
await ctx.send(f"Inline setting set to {enabled}")
@moderationset_history_inline.command(name='pagesize')
async def moderationset_history_user_inline_pagesize(self, ctx: commands.Context, pagesize: int):
"""Set the amount of cases to display per page."""
await self.config.user(ctx.author).history_inline_pagesize.set(pagesize)
await ctx.send(f"Inline pagesize set to {await self.config.user(ctx.author).history_inline_pagesize()}")
@moderationset_history.group(autohelp=True, name='guild')
@checks.admin()
async def moderationset_history_guild(self, ctx: commands.Context):
"""Manage configuration for the /history command, per guild."""
@moderationset_history_guild.command(name='ephemeral', aliases=['hidden', 'hide'])
@checks.admin()
async def moderationset_history_guild_ephemeral(self, ctx: commands.Context, enabled: bool):
"""Toggle if the /history command should be ephemeral."""
await self.config.guild(ctx.guild).history_ephemeral.set(enabled)
await ctx.send(f"Ephemeral setting set to {enabled}")
@moderationset_history_guild.command(name='pagesize')
@checks.admin()
async def moderationset_history_guild_pagesize(self, ctx: commands.Context, pagesize: int):
"""Set the amount of cases to display per page."""
await self.config.guild(ctx.guild).history_pagesize.set(pagesize)
await ctx.send(f"Pagesize set to {await self.config.guild(ctx.guild).history_pagesize()}")
@moderationset_history_guild.group(name='inline')
@checks.admin()
async def moderationset_history_guild_inline(self, ctx: commands.Context):
"""Manage configuration for the /history command's inline argument."""
@moderationset_history_guild_inline.command(name='toggle')
@checks.admin()
async def moderationset_history_guild_inline_toggle(self, ctx: commands.Context, enabled: bool):
"""Enable the /history command's inline argument by default."""
await self.config.guild(ctx.guild).history_inline.set(enabled)
await ctx.send(f"Inline setting set to {enabled}")
@moderationset_history_guild_inline.command(name='pagesize')
@checks.admin()
async def moderationset_history_guild_inline_pagesize(self, ctx: commands.Context, pagesize: int):
"""Set the amount of cases to display per page."""
await self.config.guild(ctx.guild).history_inline_pagesize.set(pagesize)
await ctx.send(f"Inline pagesize set to {await self.config.guild(ctx.guild).history_inline_pagesize()}")
@moderationset.group(autohelp=True, name='immunity')
@checks.admin()
async def moderationset_immunity(self, ctx: commands.Context):
"""Manage configuration for immune roles."""
@moderationset_immunity.command(name='add')
@checks.admin()
async def moderationset_immunity_add(self, ctx: commands.Context, role: discord.Role):
"""Add a role to the immune roles list."""
immune_roles: list = await self.config.guild(ctx.guild).immune_roles()
if role.id in immune_roles:
await ctx.send("Role is already immune!")
return
immune_roles.append(role.id)
await self.config.guild(ctx.guild).immune_roles.set(immune_roles)
await ctx.send(f"Role {role.name} added to immune roles.")
@moderationset_immunity.command(name='remove')
@checks.admin()
async def moderationset_immunity_remove(self, ctx: commands.Context, role: discord.Role):
"""Remove a role from the immune roles list."""
immune_roles: list = await self.config.guild(ctx.guild).immune_roles()
if role.id not in immune_roles:
await ctx.send("Role is not immune!")
return
immune_roles.remove(role.id)
await self.config.guild(ctx.guild).immune_roles.set(immune_roles)
await ctx.send(f"Role {role.name} removed from immune roles.")
@moderationset_immunity.command(name='list')
@checks.admin()
async def moderationset_immunity_list(self, ctx: commands.Context):
"""List all immune roles."""
immune_roles: list = await self.config.guild(ctx.guild).immune_roles()
if not immune_roles:
await ctx.send("No immune roles set!")
return
role_list = ""
for role_id in immune_roles:
role = ctx.guild.get_role(role_id)
if role:
role_list += f"{role.mention}\n"
if role_list:
embed = discord.Embed(title="Immune Roles", description=role_list, color=await self.bot.get_embed_color(None))
await ctx.send(embed=embed)
@moderationset.group(autohelp=True, name='blacklist')
@checks.admin()
async def moderationset_blacklist(self, ctx: commands.Context):
"""Manage configuration for the /blacklist command."""
@moderationset_blacklist.command(name='add')
@checks.admin()
async def moderationset_blacklist_add(self, ctx: commands.Context, role: discord.Role, duration: str):
"""Add a role to the blacklist."""
blacklist_roles: list = await self.config.guild(ctx.guild).blacklist_roles()
for blacklist_role in blacklist_roles:
if role.id == blacklist_role['role']:
await ctx.send("Role already has an associated blacklist type!")
return
try:
parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
except ValueError:
await ctx.send("Please provide a valid duration!")
return
blacklist_roles.append(
{
'role': role.id,
'duration': str(parsed_time)
}
)
await self.config.guild(ctx.guild).blacklist_roles.set(blacklist_roles)
await ctx.send(f"Role {role.mention} added as a blacklist type.", allowed_mentions=discord.AllowedMentions.none())
@moderationset_blacklist.command(name='remove')
@checks.admin()
async def moderationset_blacklist_remove(self, ctx: commands.Context, role: discord.Role):
"""Remove a role's blacklist type."""
blacklist_roles: list = await self.config.guild(ctx.guild).blacklist_roles()
for blacklist_role in blacklist_roles:
if role.id == blacklist_role['role']:
blacklist_roles.remove(blacklist_role)
await self.config.guild(ctx.guild).blacklist_roles.set(blacklist_roles)
await ctx.send(f"Role {role.mention} removed from blacklist types.", allowed_mentions=discord.AllowedMentions.none())
return
await ctx.send("Role does not have an associated blacklist type!")
@moderationset_blacklist.command(name='list')
@checks.admin()
async def moderationset_blacklist_list(self, ctx: commands.Context):
"""List all blacklist types."""
blacklist_roles: list = await self.config.guild(ctx.guild).blacklist_roles()
if not blacklist_roles:
await ctx.send("No blacklist types set!")
return
blacklist_list = ""
for blacklist_role in blacklist_roles:
role = ctx.guild.get_role(blacklist_role['role'])
if role:
blacklist_list += f"{role.mention} - {blacklist_role['duration']}\n"
if blacklist_list:
embed = discord.Embed(title="Blacklist Types", description=blacklist_list, color=await self.bot.get_embed_color(None))
await ctx.send(embed=embed)
@moderationset.command(name="ignorebots")
@checks.admin()
async def moderationset_ignorebots(self, ctx: commands.Context):
"""Toggle if the cog should ignore other bots' moderations."""
await self.config.guild(ctx.guild).ignore_other_bots.set(not await self.config.guild(ctx.guild).ignore_other_bots())
await ctx.send(f"Ignore bots setting set to {await self.config.guild(ctx.guild).ignore_other_bots()}")
@moderationset.command(name="dm")
@checks.admin()
async def moderationset_dm(self, ctx: commands.Context):
"""Toggle automatically messaging moderated users.
This option can be overridden by specifying the `silent` argument in any moderation command."""
await self.config.guild(ctx.guild).dm_users.set(not await self.config.guild(ctx.guild).dm_users())
await ctx.send(f"DM users setting set to {await self.config.guild(ctx.guild).dm_users()}")
@moderationset.command(name="permissions")
@checks.admin()
async def moderationset_permissions(self, ctx: commands.Context):
"""Toggle whether the bot will check for discord permissions."""
await self.config.guild(ctx.guild).use_discord_permissions.set(not await self.config.guild(ctx.guild).use_discord_permissions())
await ctx.send(f"Use Discord Permissions setting set to {await self.config.guild(ctx.guild).use_discord_permissions()}")
@moderationset.command(name="logchannel")
@checks.admin()
async def moderationset_logchannel(self, ctx: commands.Context, channel: discord.TextChannel = None):
"""Set a channel to log infractions to."""
if channel:
await self.config.guild(ctx.guild).log_channel.set(channel.id)
await ctx.send(f"Logging channel set to {channel.mention}.")
else:
await self.config.guild(ctx.guild).log_channel.set(" ")
await ctx.send("Logging channel disabled.")
@moderationset.command(name="mysql")
@checks.is_owner()
async def moderationset_mysql(self, ctx: commands.Context):
"""Configure MySQL connection details."""
await ctx.message.add_reaction("")
await ctx.author.send(content="Click the button below to configure your MySQL connection details.", view=self.ConfigButtons(60))
class ConfigButtons(discord.ui.View):
def __init__(self, timeout):
super().__init__()
self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912)
@discord.ui.button(label="Edit", style=discord.ButtonStyle.success)
async def config_button(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument
await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config))
class MySQLConfigModal(discord.ui.Modal, title="MySQL Database Configuration"):
def __init__(self, config):
super().__init__()
self.config = config
address = discord.ui.TextInput(
label="Address",
placeholder="Input your MySQL address here.",
style=discord.TextStyle.short,
required=False,
max_length=300
)
database = discord.ui.TextInput(
label="Database",
placeholder="Input the name of your database here.",
style=discord.TextStyle.short,
required=False,
max_length=300
)
username = discord.ui.TextInput(
label="Username",
placeholder="Input your MySQL username here.",
style=discord.TextStyle.short,
required=False,
max_length=300
)
password = discord.ui.TextInput(
label="Password",
placeholder="Input your MySQL password here.",
style=discord.TextStyle.short,
required=False,
max_length=300
)
async def on_submit(self, interaction: discord.Interaction):
message = ""
if self.address.value != "":
await self.config.mysql_address.set(self.address.value)
message += f"- Address set to\n - `{self.address.value}`\n"
if self.database.value != "":
await self.config.mysql_database.set(self.database.value)
message += f"- Database set to\n - `{self.database.value}`\n"
if self.username.value != "":
await self.config.mysql_username.set(self.username.value)
message += f"- Username set to\n - `{self.username.value}`\n"
if self.password.value != "":
await self.config.mysql_password.set(self.password.value)
trimmed_password = self.password.value[:8]
message += f"- Password set to\n - `{trimmed_password}` - Trimmed for security\n"
if message == "":
trimmed_password = str(await self.config.mysql_password())[:8]
send = f"No changes were made.\nCurrent configuration:\n- Address:\n - `{await self.config.mysql_address()}`\n- Database:\n - `{await self.config.mysql_database()}`\n- Username:\n - `{await self.config.mysql_username()}`\n- Password:\n - `{trimmed_password}` - Trimmed for security"
else:
send = f"Configuration changed:\n{message}"
await interaction.response.send_message(send, ephemeral=True)
@moderationset.group(autohelp=True, name='import', hidden=True)
@checks.admin()
async def moderationset_import(self, ctx: commands.Context):
"""Import moderations from other bots."""
@moderationset_import.command(name="galacticbot")
@checks.admin()
async def moderationset_import_galacticbot(self, ctx: commands.Context):
"""Import moderations from GalacticBot. **UNFINISHED!**"""
if ctx.message.attachments and ctx.message.attachments[0].content_type == 'application/json; charset=utf-8':
message = await ctx.send("Are you sure you want to import GalacticBot moderations? **This will overwrite any moderations that already exist in this guild's moderation table.**")
await message.edit(view=self.GalacticBotImportButtons(60, ctx, message, self))
else:
await ctx.send("Please provide a valid GalacticBot moderation export file.")
class GalacticBotImportButtons(discord.ui.View):
def __init__(self, timeout, ctx, message, cog_instance):
super().__init__()
self.cog_instance = cog_instance
self.ctx: commands.Context = ctx
self.message: discord.Message = message
self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912)
@discord.ui.button(label="Yes", style=discord.ButtonStyle.success)
async def import_button_y(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument
await self.message.delete()
await interaction.response.send_message("Deleting original table...", ephemeral=True)
database = await Moderation.connect(self.cog_instance)
cursor = database.cursor()
query = f"DROP TABLE IF EXISTS moderation_{self.ctx.guild.id};"
cursor.execute(query)
cursor.close()
database.commit()
database.close()
await interaction.edit_original_response(content="Creating new table...")
await Moderation.create_guild_table(self.cog_instance, self.ctx.guild)
await interaction.edit_original_response(content="Importing moderations...")
accepted_types = [
'NOTE',
'WARN',
'MUTE',
'UNMUTE',
'KICK',
'SOFTBAN',
'BAN',
'UNBAN'
]
file = await self.ctx.message.attachments[0].read()
data = sorted(json.loads(file), key=lambda x: x['case'])
for case in data:
if case['type'] not in accepted_types:
continue
timestamp = case['timestamp'] / 1000
if case['duration']:
duration = timedelta(milliseconds=case['duration'])
if case['resolved']:
resolved = 1
for change in case['changes']:
if change['type'] == 'RESOLVE':
resolved_by = change['staff']
resolved_reason = change['reason']
resolve_timestamp = change['timestamp'] / 1000
break
if resolved_by is None:
resolved_by = '?'
if resolved_reason is None:
resolved_reason = 'Could not get resolve reason during moderation import.'
changes = [
{
'type': "ORIGINAL",
'reason': case['reason'],
'user_id': case['executor'],
'timestamp': timestamp
},
{
'type': "RESOLVE",
'reason': resolved_reason,
'user_id': resolved_by,
'timestamp': resolve_timestamp
}
]
await Moderation.mysql_log(self.cog_instance, self.ctx.guild, case['executor'], case['type'], case['target'], 0, duration, case['reason'], timestamp=timestamp, resolved=resolved, resolved_by=resolved_by, resolved_reason=resolved_reason, changes=changes)
await interaction.edit_original_response(content="Import complete.")
@discord.ui.button(label="No", style=discord.ButtonStyle.danger)
async def import_button_n(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument
await self.message.edit("Import cancelled.", view=None)
await self.message.delete(10)
await self.ctx.message.delete(10)
@commands.command(aliases=["tdc"])
async def timedeltaconvert(self, ctx: commands.Context, *, duration: str):
"""This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object.
**Example usage**
`[p]timedeltaconvert 1 day 15hr 82 minutes 52s`
**Output**
`1 day, 16:22:52`"""
try:
parsed_time = parse(duration, as_timedelta=True, raise_exception=True)
await ctx.send(f"`{str(parsed_time)}`")
except ValueError:
await ctx.send("Please provide a convertible value!")