forked from blizzthewolf/SeaCogs
1936 lines
90 KiB
Python
1936 lines
90 KiB
Python
# _____ _
|
|
# / ____| (_)
|
|
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
|
|
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
|
|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
|
|
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
|
|
|
|
import logging
|
|
import json
|
|
import time
|
|
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Union
|
|
import discord
|
|
import humanize
|
|
import mysql.connector
|
|
from discord.ext import tasks
|
|
from pytimeparse2 import disable_dateutil, parse
|
|
from redbot.core import app_commands, checks, Config, commands, data_manager
|
|
from redbot.core.app_commands import Choice
|
|
|
|
class Moderation(commands.Cog):
|
|
"""Custom moderation cog.
|
|
Developed by SeaswimmerTheFsh."""
|
|
|
|
async def red_delete_data_for_user(self, *, requester, user_id: int):
    """Delete the data this cog stores about a user.

    Every recognised requester clears the user's Config entry. A
    ``discord_deleted_user`` request additionally removes every moderation
    row in which the user appears as target or moderator.

    Parameters
    ----------
    requester:
        One of ``"discord_deleted_user"``, ``"owner"``, ``"user"`` or
        ``"user_strict"``. Any other value is logged as a warning and ignored.
    user_id: int
        ID of the user whose data should be removed.
    """
    if requester not in ("discord_deleted_user", "owner", "user", "user_strict"):
        self.logger.warning("Invalid requester passed to red_delete_data_for_user: %s", requester)
        return

    await self.config.user_from_id(user_id).clear()

    if requester != "discord_deleted_user":
        return

    database = await self.connect()
    try:
        cursor = database.cursor()
        try:
            cursor.execute("SHOW TABLES;")
            # Each row is a 1-tuple holding the table name. Only touch the
            # per-guild tables this cog creates (`moderation_<guild id>`).
            tables = [row[0] for row in cursor.fetchall() if str(row[0]).startswith("moderation_")]

            condition = "target_id = %s OR moderator_id = %s;"

            for table in tables:
                cursor.execute(f"DELETE FROM `{table}` WHERE {condition}", (user_id, user_id))

            database.commit()
        finally:
            cursor.close()
    finally:
        database.close()
|
|
|
|
def __init__(self, bot):
    """Keep a reference to the bot, register Config defaults and start `handle_expiry`."""
    self.bot = bot
    self.logger = logging.getLogger('red.sea.moderation')
    self.config = Config.get_conf(self, identifier=481923957134912)

    # A single space is the "not configured" sentinel that the rest of the
    # cog compares against (see check_conf and log).
    global_defaults = {
        "mysql_address": " ",
        "mysql_database": " ",
        "mysql_username": " ",
        "mysql_password": " ",
    }
    guild_defaults = {
        "use_discord_permissions": True,
        "ignore_other_bots": True,
        "dm_users": True,
        "log_channel": " ",
        "immune_roles": [],
        "history_ephemeral": False,
        "history_inline": False,
        "history_pagesize": 5,
        "history_inline_pagesize": 6,
        "blacklist_roles": [],
    }
    # NOTE(review): None presumably means "fall back to the guild value" - confirm against the history command.
    user_defaults = {
        "history_ephemeral": None,
        "history_inline": None,
        "history_pagesize": None,
        "history_inline_pagesize": None,
    }

    self.config.register_global(**global_defaults)
    self.config.register_guild(**guild_defaults)
    self.config.register_user(**user_defaults)

    disable_dateutil()
    self.handle_expiry.start()  # pylint: disable=no-member
|
|
|
|
async def cog_load(self):
    """Create the moderation table for every guild the bot is in, skipping guilds where the cog is disabled."""
    required_keys = ['mysql_address', 'mysql_database', 'mysql_username', 'mysql_password']
    unset_keys = await self.check_conf(required_keys)

    if unset_keys:
        self.logger.error("Failed to create tables, due to MySQL connection configuration being unset.")
        return

    current_guilds: list[discord.Guild] = self.bot.guilds

    try:
        for guild in current_guilds:
            if await self.bot.cog_disabled_in_guild(self, guild):
                continue
            await self.create_guild_table(guild)
    except ConnectionRefusedError:
        # connect() has already logged the failure; stop at the first refused connection.
        return
|
|
|
|
async def cog_unload(self):
    """Cancel the `handle_expiry` task that `__init__` started."""
    expiry_task = self.handle_expiry
    expiry_task.cancel()  # pylint: disable=no-member
|
|
|
|
@commands.Cog.listener('on_guild_join')
async def db_generate_guild_join(self, guild: discord.Guild):
    """Create the moderation table for a guild the bot has just joined."""
    if await self.bot.cog_disabled_in_guild(self, guild):
        return

    required_keys = ['mysql_address', 'mysql_database', 'mysql_username', 'mysql_password']
    unset_keys = await self.check_conf(required_keys)
    if unset_keys:
        self.logger.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id)
        return

    try:
        await self.create_guild_table(guild)
    except ConnectionRefusedError:
        # connect() has already logged the failure.
        return
|
|
|
|
@commands.Cog.listener('on_audit_log_entry_create')
async def autologger(self, entry: discord.AuditLogEntry):
    """Log moderations that were performed manually ("right clicks") instead of through the bot.

    Kicks, bans, unbans and timeout changes are written to the guild's
    moderation table via `mysql_log`. Every other audit log entry is ignored.
    """
    if await self.bot.cog_disabled_in_guild(self, entry.guild):
        return

    # Without a moderator or a target there is nothing meaningful to log.
    if entry.user is None or entry.target is None:
        return

    if await self.config.guild(entry.guild).ignore_other_bots() is True:
        # Audit log targets are not always users (channels, roles, ...), so
        # `.bot` may be missing; treat those as "not a bot".
        if getattr(entry.user, 'bot', False) or getattr(entry.target, 'bot', False):
            return
    else:
        if entry.user.id == self.bot.user.id:
            return

    duration = "NULL"

    if entry.reason:
        reason = entry.reason + " (This action was performed without the bot.)"
    else:
        reason = "This action was performed without the bot."

    if entry.action == discord.AuditLogAction.kick:
        moderation_type = 'KICK'

    elif entry.action == discord.AuditLogAction.ban:
        moderation_type = 'BAN'

    elif entry.action == discord.AuditLogAction.unban:
        moderation_type = 'UNBAN'

    elif entry.action == discord.AuditLogAction.member_update:
        # member_update also fires for nickname changes etc.; the diff only
        # carries the attributes that actually changed.
        if not hasattr(entry.after, 'timed_out_until'):
            return

        if entry.after.timed_out_until is not None:
            timed_out_until_aware = entry.after.timed_out_until.replace(tzinfo=timezone.utc)
            remaining = timed_out_until_aware - datetime.now(tz=timezone.utc)
            # Round to whole minutes so the stored duration matches what the moderator picked.
            duration = timedelta(minutes=round(remaining.total_seconds() / 60))
            moderation_type = 'MUTE'
        else:
            moderation_type = 'UNMUTE'
    else:
        return

    await self.mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, 0, duration, reason)
|
|
|
|
async def connect(self):
    """Connect to the configured MySQL database and return the connection object.

    Raises
    ------
    LookupError
        If any of the MySQL connection settings is still unset.
    ConnectionRefusedError
        If the connector reports an error while connecting (bad credentials,
        unknown database, unreachable server, ...).
    """
    unset_keys = await self.check_conf([
        'mysql_address',
        'mysql_database',
        'mysql_username',
        'mysql_password'
    ])

    if unset_keys:
        raise LookupError("MySQL connection details not set properly!")

    try:
        return mysql.connector.connect(
            host=await self.config.mysql_address(),
            user=await self.config.mysql_username(),
            password=await self.config.mysql_password(),
            database=await self.config.mysql_database()
        )
    # Catch the connector's base error: callers only handle
    # ConnectionRefusedError, and an unreachable server is not reported
    # as ProgrammingError.
    except mysql.connector.Error as e:
        self.logger.error("Unable to access the MySQL database!\nError:\n%s", e.msg)
        raise ConnectionRefusedError(f"Unable to access the MySQL Database!\n{e.msg}") from e
|
|
|
|
async def create_guild_table(self, guild: discord.Guild):
    """Ensure the `moderation_<guild id>` table exists for a guild.

    If probing the table fails with a ProgrammingError, the table is created
    together with its indexes and a placeholder row with ``moderation_id`` 0,
    so that `get_next_case_number` always finds a row.
    """
    table = f"moderation_{guild.id}"
    database = await self.connect()
    cursor = database.cursor()

    try:
        try:
            # Cheap existence probe; consume the result so nothing is left unread.
            cursor.execute(f"SELECT 1 FROM `{table}` LIMIT 1")
            cursor.fetchall()
            self.logger.debug("MySQL Table exists for server %s (%s)", guild.name, guild.id)

        except mysql.connector.errors.ProgrammingError:
            query = f"""
                CREATE TABLE `{table}` (
                    moderation_id INT UNIQUE PRIMARY KEY NOT NULL,
                    timestamp INT NOT NULL,
                    moderation_type LONGTEXT NOT NULL,
                    target_id LONGTEXT NOT NULL,
                    moderator_id LONGTEXT NOT NULL,
                    role_id LONGTEXT,
                    duration LONGTEXT,
                    end_timestamp INT,
                    reason LONGTEXT,
                    resolved BOOL NOT NULL,
                    resolved_by LONGTEXT,
                    resolve_reason LONGTEXT,
                    expired BOOL NOT NULL,
                    changes JSON NOT NULL,
                    metadata JSON NOT NULL
                )
            """
            cursor.execute(query)

            # Identifiers cannot be bound as parameters, so the table name is
            # interpolated. It is built from the numeric guild id only.
            cursor.execute(f"CREATE INDEX idx_target_id ON `{table}`(target_id(25));")
            cursor.execute(f"CREATE INDEX idx_moderator_id ON `{table}`(moderator_id(25));")
            cursor.execute(f"CREATE INDEX idx_moderation_id ON `{table}`(moderation_id);")

            insert_values = (0, 0, "NULL", 0, 0, 0, "NULL", 0, "NULL", 0, "NULL", "NULL", 0, json.dumps([]), json.dumps({}))
            # Derive the placeholders from the values so the counts cannot drift apart.
            placeholders = ", ".join(["%s"] * len(insert_values))
            insert_query = f"""
                INSERT INTO `{table}`
                (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata)
                VALUES ({placeholders})
            """
            cursor.execute(insert_query, insert_values)

            database.commit()

            self.logger.debug("MySQL Table (moderation_%s) created for %s (%s)", guild.id, guild.name, guild.id)
    finally:
        cursor.close()
        database.close()
|
|
|
|
async def check_conf(self, config: list):
    """Return the names from `config` whose stored value is still the unset sentinel.

    Parameters
    ----------
    config: list
        Names of Config values on ``self.config`` to check.

    Returns
    -------
    list
        The subset of `config` whose value equals ``" "`` (a single space).
        Empty when everything is configured.
    """
    not_found_list = []

    for item in config:
        # Look the value up by name. `self.config.item` would read a value
        # literally called "item", not the key held in the loop variable.
        if await getattr(self.config, item)() == " ":
            not_found_list.append(item)

    return not_found_list
|
|
|
|
def check_permissions(self, user: discord.User, permissions: list, ctx: Union[commands.Context, discord.Interaction] = None, guild: discord.Guild = None):
    """Return the first permission from `permissions` that `user` lacks, or False if none is missing.

    With `ctx`, permissions are resolved for ctx.channel; otherwise the
    member's guild-wide permissions in `guild` are used. Administrators are
    treated as holding every permission. Raises KeyError when neither `ctx`
    nor `guild` is given.
    """
    if ctx:
        granted = ctx.channel.permissions_for(ctx.guild.get_member(user.id))
    elif guild:
        granted = guild.get_member(user.id).guild_permissions
    else:
        raise KeyError

    for name in permissions:
        if getattr(granted, name, False):
            continue
        if granted.administrator is True:
            continue
        return name

    return False
|
|
|
|
async def check_moddable(self, target: Union[discord.User, discord.Member], interaction: discord.Interaction, permissions: list):
    """Check whether the invoking moderator (and the bot) may moderate `target`.

    When a check fails, an ephemeral explanation is sent as the interaction
    response and False is returned. Returns True when every check passes.
    """
    # check_permissions returns the name of the first missing permission (or
    # False), so report that name rather than the whole requested list.
    bot_missing = self.check_permissions(interaction.client.user, permissions, guild=interaction.guild)
    if bot_missing:
        await interaction.response.send_message(f"I do not have the `{bot_missing}` permission, required for this action.", ephemeral=True)
        return False

    if await self.config.guild(interaction.guild).use_discord_permissions() is True:
        user_missing = self.check_permissions(interaction.user, permissions, guild=interaction.guild)
        if user_missing:
            await interaction.response.send_message(f"You do not have the `{user_missing}` permission, required for this action.", ephemeral=True)
            return False

    if interaction.user.id == target.id:
        await interaction.response.send_message(content="You cannot moderate yourself!", ephemeral=True)
        return False

    if target.bot:
        await interaction.response.send_message(content="You cannot moderate bots!", ephemeral=True)
        return False

    # Role hierarchy and immunity only apply to users who are guild members.
    if isinstance(target, discord.Member):
        if interaction.user.top_role <= target.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True)
            return False

        if interaction.guild.get_member(interaction.client.user.id).top_role <= target.top_role:
            await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True)
            return False

        immune_roles = await self.config.guild(target.guild).immune_roles()

        if any(role.id in immune_roles for role in target.roles):
            await interaction.response.send_message(content="You cannot moderate members with an immune role!", ephemeral=True)
            return False

    return True
|
|
|
|
async def mysql_log(self, guild_id: str, author_id: str, moderation_type: str, target_id: int, role_id: int, duration, reason: str, database: mysql.connector.MySQLConnection = None, timestamp: int = None, resolved: bool = False, resolved_by: str = None, resolved_reason: str = None, expired: bool = None, changes: list = None, metadata: dict = None):
    """Insert a moderation case into the guild's table and return its case number.

    Parameters
    ----------
    duration:
        A ``timedelta``, or the string ``"NULL"`` for moderations without a duration.
    database:
        Optional open connection to reuse. When omitted, a connection is
        opened here and closed again before returning.
    timestamp:
        Unix timestamp of the moderation; defaults to the current time.
    expired:
        When falsy it is derived: 1 if the end timestamp already lies in the past, else 0.
    changes, metadata:
        Stored as JSON; default to an empty list / dict.

    Returns
    -------
    int
        The ``moderation_id`` assigned to the new row.
    """
    if changes is None:
        changes = []
    if metadata is None:
        metadata = {}

    if not timestamp:
        timestamp = int(time.time())

    if duration != "NULL":
        end_timedelta = datetime.fromtimestamp(timestamp) + duration
        end_timestamp = int(end_timedelta.timestamp())
    else:
        end_timestamp = 0

    if not expired:
        expired = 1 if int(time.time()) > end_timestamp else 0

    if resolved_by is None:
        resolved_by = "NULL"

    if resolved_reason is None:
        resolved_reason = "NULL"

    # Only close connections this call opened itself.
    close_db = not database
    if close_db:
        database = await self.connect()

    try:
        cursor = database.cursor()
        try:
            moderation_id = await self.get_next_case_number(guild_id=guild_id, cursor=cursor)

            sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
            val = (moderation_id, timestamp, moderation_type, target_id, author_id, role_id, duration, end_timestamp, reason, int(resolved), resolved_by, resolved_reason, expired, json.dumps(changes), json.dumps(metadata))
            cursor.execute(sql, val)
        finally:
            cursor.close()

        database.commit()
    finally:
        if close_db:
            database.close()

    self.logger.debug("MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s", guild_id, moderation_id, timestamp, moderation_type, target_id, author_id, role_id, duration, end_timestamp, reason, int(resolved), resolved_by, resolved_reason, expired, changes, metadata)

    return moderation_id
|
|
|
|
async def get_next_case_number(self, guild_id: str, cursor = None):
    """Return the next case number for a guild: the highest stored ``moderation_id`` plus one.

    Parameters
    ----------
    guild_id:
        ID of the guild whose ``moderation_<guild_id>`` table is queried.
    cursor:
        Optional open cursor to reuse. When omitted, a temporary connection
        is opened and closed again before returning.
    """
    database = None
    if not cursor:
        database = await self.connect()
        cursor = database.cursor()

    try:
        cursor.execute(f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1")
        row = cursor.fetchone()
    finally:
        # Release only what this call opened; a caller-supplied cursor stays open.
        if database is not None:
            cursor.close()
            database.close()

    # create_guild_table seeds a row with id 0, so `row` is normally present;
    # fall back to 1 if the table is empty.
    return row[0] + 1 if row else 1
|
|
|
|
def generate_dict(self, result):
    """Turn a row of the moderation table into a case dictionary.

    `result` is indexed positionally in the column order used by the
    table's INSERT statements; the `changes` and `metadata` columns are
    decoded from JSON.
    """
    column_names = (
        "moderation_id",
        "timestamp",
        "moderation_type",
        "target_id",
        "moderator_id",
        "role_id",
        "duration",
        "end_timestamp",
        "reason",
        "resolved",
        "resolved_by",
        "resolve_reason",
        "expired",
        "changes",
        "metadata",
    )
    case: dict = {name: result[position] for position, name in enumerate(column_names)}

    for json_column in ("changes", "metadata"):
        case[json_column] = json.loads(case[json_column])

    return case
|
|
|
|
async def fetch_user_dict(self, interaction: discord.Interaction, user_id: str):
    """Return a dict with `id`, `name` and `discriminator` for a user.

    ``'?'`` yields an "Unknown User" template. If Discord reports the user as
    not found, a "Deleted User" template carrying the given id is returned.
    """
    if user_id == '?':
        return {
            'id': '?',
            'name': 'Unknown User',
            'discriminator': '0'
        }

    # IDs read back from the database are strings, while the client's user
    # cache is keyed by int; convert so the cache lookup can actually hit.
    try:
        lookup_id = int(user_id)
    except (TypeError, ValueError):
        lookup_id = user_id

    try:
        user = interaction.client.get_user(lookup_id)
        if user is None:
            user = await interaction.client.fetch_user(user_id)

        user_dict = {
            'id': user.id,
            'name': user.name,
            'discriminator': user.discriminator
        }

    except discord.errors.NotFound:
        user_dict = {
            'id': user_id,
            'name': 'Deleted User',
            'discriminator': '0'
        }

    return user_dict
|
|
|
|
async def fetch_role_dict(self, interaction: discord.Interaction, role_id: str):
    """Return a dict with `id` and `name` for a role, or a "Deleted Role" template if the guild no longer has it."""
    # The guild's role lookup is keyed by int; ids may arrive as strings.
    try:
        lookup_id = int(role_id)
    except (TypeError, ValueError):
        lookup_id = role_id

    # get_role returns None for unknown roles instead of raising, so the
    # fallback has to be chosen with an explicit check.
    role = interaction.guild.get_role(lookup_id)

    if role is None:
        return {
            'id': role_id,
            'name': 'Deleted Role'
        }

    return {
        'id': role.id,
        'name': role.name
    }
|
|
|
|
async def embed_factory(self, embed_type: str, /, interaction: discord.Interaction = None, case_dict: dict = None, guild: discord.Guild = None, reason: str = None, moderation_type: str = None, response: discord.InteractionMessage = None, duration: timedelta = None, resolved: bool = False):
    """This method creates an embed from set parameters, meant for either moderation logging or contacting the moderated user.

    Valid arguments for 'embed_type':
    - 'message'
    - 'log' - WIP
    - 'case'
    - 'changes'

    Required arguments for 'message':
    - guild
    - reason
    - moderation_type
    - response
    - duration (optional)

    Required arguments for 'log':
    - interaction
    - case_dict
    - resolved (optional)

    Required arguments for 'case' & 'changes':
    - interaction
    - case_dict

    Raises TypeError for any other 'embed_type'."""

    def display_name(user: dict, *, code: bool = True) -> str:
        """Format a user dict as `name` or `name#discriminator`, optionally wrapped in backticks."""
        name = user['name'] if user['discriminator'] == "0" else f"{user['name']}#{user['discriminator']}"
        return f"`{name}`" if code else name

    def stored_duration(raw: str) -> timedelta:
        """Parse a stored 'H:M:S' duration string back into a timedelta."""
        return timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], raw.split(":"))})

    if embed_type == 'message':
        # Jump links are only useful to users who can still see the server.
        if moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]:
            guild_name = guild.name
        else:
            guild_name = f"[{guild.name}]({response.jump_url})"

        if moderation_type in ["tempbanned", "muted"] and duration:
            embed_duration = f" for {humanize.precisedelta(duration)}"
        else:
            embed_duration = ""

        embed_desc = "received a" if moderation_type == "note" else "been"

        embed = discord.Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=await self.bot.get_embed_color(None), timestamp=datetime.now())
        embed.add_field(name='Reason', value=f"`{reason}`")
        # Guilds without an icon have `guild.icon` set to None.
        embed.set_author(name=guild.name, icon_url=guild.icon.url if guild.icon else None)
        embed.set_footer(text=f"Case #{await self.get_next_case_number(guild.id):,}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&")

        return embed

    if embed_type == 'case':
        target_user = await self.fetch_user_dict(interaction, case_dict['target_id'])
        moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id'])

        target_name = display_name(target_user)
        moderator_name = display_name(moderator_user)

        embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=await self.bot.get_embed_color(None))
        embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"

        if case_dict['duration'] != 'NULL':
            td = stored_duration(case_dict["duration"])
            # Only cases that are still running get a relative end time.
            duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if bool(case_dict['expired']) is False else str(humanize.precisedelta(td))
            embed.description += f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"

        embed.description += f"\n**Changes:** {len(case_dict['changes'])}"

        # Metadata may be non-empty without carrying an import marker.
        imported_from = case_dict['metadata'].get('imported_from') if case_dict['metadata'] else None
        if imported_from:
            embed.description += f"\n**Imported From:** {imported_from}"

        embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)

        if case_dict['resolved'] == 1:
            resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by'])
            resolved_name = display_name(resolved_user)
            embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False)

        return embed

    if embed_type == 'changes':
        embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Changes", color=await self.bot.get_embed_color(None))

        # Cache of already fetched users, keyed by the stringified user id.
        memory_dict = {}
        field_names = {'ORIGINAL': 'Original', 'EDIT': 'Edit', 'RESOLVE': 'Resolve'}

        if case_dict['changes']:
            for change in case_dict['changes']:
                cache_key = str(change['user_id'])
                # Check with the same key the entry is stored under,
                # otherwise integer ids never hit the cache.
                if cache_key not in memory_dict:
                    memory_dict[cache_key] = await self.fetch_user_dict(interaction, change['user_id'])

                user = memory_dict[cache_key]
                name = display_name(user, code=False)

                timestamp = f"<t:{change['timestamp']}> | <t:{change['timestamp']}:R>"

                field_name = field_names.get(change['type'])
                if field_name is not None:
                    embed.add_field(name=field_name, value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False)

        else:
            embed.description = "*No changes have been made to this case.* 🙁"

        return embed

    if embed_type == 'log':
        target_user = await self.fetch_user_dict(interaction, case_dict['target_id'])
        moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id'])

        target_name = display_name(target_user)
        moderator_name = display_name(moderator_user, code=False)

        title = f"📕 Case #{case_dict['moderation_id']:,} Resolved" if resolved else f"📕 Case #{case_dict['moderation_id']:,}"
        embed = discord.Embed(title=title, color=await self.bot.get_embed_color(None))
        embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"

        if case_dict['duration'] != 'NULL':
            td = stored_duration(case_dict["duration"])
            if resolved:
                # `expired` comes from a BOOL column, so test its truthiness
                # rather than comparing against the string '0'.
                duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if bool(case_dict['expired']) is False else str(humanize.precisedelta(td))
                embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"
            else:
                embed.description = embed.description + f"\n**Duration:** {humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>"

        embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)

        if resolved:
            resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by'])
            resolved_name = display_name(resolved_user, code=False)
            embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False)

        return embed

    raise TypeError("'type' argument is invalid!")
|
|
|
|
async def fetch_case(self, moderation_id: int, guild_id: str):
    """Fetch a case from the guild's moderation table.

    Returns the case dictionary built by `generate_dict`, or None when no
    row with that `moderation_id` exists.
    """
    database = await self.connect()
    try:
        cursor = database.cursor()
        try:
            # Table names cannot be bound as parameters; int() keeps the
            # interpolated identifier strictly numeric.
            query = f"SELECT * FROM `moderation_{int(guild_id)}` WHERE moderation_id = %s;"
            cursor.execute(query, (moderation_id,))
            result = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        database.close()

    if result is None:
        return None

    return self.generate_dict(result)
|
|
|
|
async def log(self, interaction: discord.Interaction, moderation_id: int, resolved: bool = False):
    """Send the case embed to the guild's configured logging channel.

    Does nothing when no log channel is configured, the channel no longer
    exists, the case cannot be found, or the bot is forbidden from sending
    to the channel.
    """
    logging_channel_id = await self.config.guild(interaction.guild).log_channel()
    if logging_channel_id == " ":
        return

    logging_channel = interaction.guild.get_channel(logging_channel_id)
    if logging_channel is None:
        # The configured channel was deleted or is not visible to the bot.
        self.logger.warning("Configured log channel %s was not found in guild %s.", logging_channel_id, interaction.guild.id)
        return

    case = await self.fetch_case(moderation_id, interaction.guild.id)
    if not case:
        return

    embed = await self.embed_factory('log', interaction=interaction, case_dict=case, resolved=resolved)
    try:
        await logging_channel.send(embed=embed)
    except discord.errors.Forbidden:
        return
|
|
|
|
#######################################################################################################################
|
|
### COMMANDS
|
|
#######################################################################################################################
|
|
|
|
@app_commands.command(name="note")
async def note(self, interaction: discord.Interaction, target: discord.User, reason: str, silent: bool = None):
    """Add a note to a user.

    Parameters
    -----------
    target: discord.User
        Who are you noting?
    reason: str
        Why are you noting this user?
    silent: bool
        Should the user be messaged?"""
    if not await self.check_moddable(target, interaction, ['moderate_members']):
        return

    # Respond first; the message is edited below once the case number is known.
    await interaction.response.send_message(content=f"{target.mention} has received a note!\n**Reason** - `{reason}`")

    # Fall back to the guild's DM setting when the moderator did not choose.
    if silent is None:
        silent = not await self.config.guild(interaction.guild).dm_users()
    if silent is False:
        try:
            embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='note', response=await interaction.original_response())
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Best effort: the user may have DMs closed.
            pass

    moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', target.id, 0, 'NULL', reason)
    await interaction.edit_original_response(content=f"{target.mention} has received a note! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
    await self.log(interaction, moderation_id)
|
|
|
|
@app_commands.command(name="warn")
async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
    """Warn a user.

    Parameters
    -----------
    target: discord.Member
        Who are you warning?
    reason: str
        Why are you warning this user?
    silent: bool
        Should the user be messaged?"""
    may_moderate = await self.check_moddable(target, interaction, ['moderate_members'])
    if not may_moderate:
        return

    # Respond first; the message is edited below once the case number is known.
    await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`")

    # Fall back to the guild's DM setting when the moderator did not choose.
    if silent is None:
        silent = not await self.config.guild(interaction.guild).dm_users()

    if silent is False:
        try:
            original_message = await interaction.original_response()
            dm_embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='warned', response=original_message)
            await target.send(embed=dm_embed)
        except discord.errors.HTTPException:
            # Best effort: the user may have DMs closed.
            pass

    case_number = await self.mysql_log(interaction.guild.id, interaction.user.id, 'WARN', target.id, 0, 'NULL', reason)
    await interaction.edit_original_response(content=f"{target.mention} has been warned! (Case `#{case_number:,}`)\n**Reason** - `{reason}`")
    await self.log(interaction, case_number)
|
|
|
|
async def blacklist_autocomplete(self, interaction: discord.Interaction, current: str,) -> list[app_commands.Choice[str]]:
    """Autocompletes a blacklist role.

    Offers the guild roles configured as blacklist roles whose name contains
    the text typed so far. Each choice's value is the role id as a string.
    """
    blacklist_roles = await self.config.guild(interaction.guild).blacklist_roles()

    if not blacklist_roles:
        return []

    # The blacklist command reads entries as dicts with an 'id' key; bare ids
    # are accepted too. NOTE(review): confirm the stored schema against the
    # command that fills `blacklist_roles`.
    configured_ids = {
        str(entry['id'] if isinstance(entry, dict) else entry)
        for entry in blacklist_roles
    }
    typed = current.casefold()

    choices = [
        app_commands.Choice(name=role.name, value=str(role.id))
        for role in interaction.guild.roles
        if str(role.id) in configured_ids and typed in role.name.casefold()
    ]

    # Discord accepts at most 25 autocomplete choices.
    return choices[:25]
|
|
|
|
@app_commands.command(name="blacklist")
@app_commands.autocomplete(role=blacklist_autocomplete)
async def blacklist(self, interaction: discord.Interaction, target: discord.Member, role: str, reason: str, silent: bool = None):
    """Add a blacklist role to a user.

    Parameters
    -----------
    target: discord.Member
        Who are you blacklisting?
    role: str
        What blacklist type are you applying to the target?
    reason: str
        Why are you blacklisting this user?
    silent: bool
        Should the user be messaged?"""
    # NOTE(review): `silent` is accepted but no DM is sent by this command - confirm whether that is intended.
    blacklist_roles = await self.config.guild(interaction.guild).blacklist_roles()

    if not blacklist_roles:
        await interaction.response.send_message(content="There are no blacklist types set for this server!", ephemeral=True)
        return

    # `role` arrives as a string; compare ids as strings so both int and str
    # stored ids match.
    matching_role = next((entry for entry in blacklist_roles if str(entry['id']) == str(role)), None)

    if not matching_role:
        await interaction.response.send_message(content="Please provide a valid blacklist type!", ephemeral=True)
        return

    if not await self.check_moddable(target, interaction, ['moderate_members', 'manage_roles']):
        return

    # Role lookups need the int id, and add_roles needs the Role object.
    role_obj = interaction.guild.get_role(int(matching_role['id']))
    if role_obj is None:
        await interaction.response.send_message(content="Please provide a valid blacklist type!", ephemeral=True)
        return

    if any(existing.id == role_obj.id for existing in target.roles):
        await interaction.response.send_message(content=f"{target.mention} already has the blacklist role!", ephemeral=True)
        return

    # NOTE(review): `duration` is passed on exactly as stored; mysql_log adds
    # it to a datetime, so it presumably has to be a timedelta - confirm.
    blacklist_duration = matching_role['duration']
    readable_duration = humanize.precisedelta(blacklist_duration)

    await target.add_roles(role_obj, reason=f"Blacklisted by {interaction.user.id} for {readable_duration} for: {reason}")
    await interaction.response.send_message(content=f"{target.mention} has been blacklisted with the {role_obj.name} role for {readable_duration}!\n**Reason** - `{reason}`")

    moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'BLACKLIST', target.id, role_obj.id, blacklist_duration, reason)
    await interaction.edit_original_response(content=f"{target.mention} has been blacklisted with the {role_obj.name} role for {readable_duration}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
    await self.log(interaction, moderation_id)
|
|
|
|
@app_commands.command(name="mute")
async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str, silent: bool = None):
    """Mute a user.

    Parameters
    -----------
    target: discord.Member
        Who are you muting?
    duration: str
        How long are you muting this user for?
    reason: str
        Why are you muting this user?
    silent: bool
        Should the user be messaged?"""
    if not await self.check_moddable(target, interaction, ['moderate_members']):
        return

    if target.is_timed_out() is True:
        await interaction.response.send_message(f"{target.mention} is already muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
        return

    try:
        parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
    except ValueError:
        await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
        return

    # Compare timedeltas directly so the 28-day limit from the message below
    # is actually enforced.
    if parsed_time > timedelta(days=28):
        await interaction.response.send_message("Please provide a duration that is less than 28 days.")
        return

    await target.timeout(parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}")

    # Respond first; the message is edited below once the case number is known.
    await interaction.response.send_message(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")

    # Fall back to the guild's DM setting when the moderator did not choose.
    if silent is None:
        silent = not await self.config.guild(interaction.guild).dm_users()
    if silent is False:
        try:
            embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='muted', response=await interaction.original_response(), duration=parsed_time)
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Best effort: the user may have DMs closed.
            pass

    moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', target.id, 0, parsed_time, reason)
    await interaction.edit_original_response(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
    await self.log(interaction, moderation_id)
|
|
|
|
@app_commands.command(name="unmute")
async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None, silent: bool = None):
    """Unmute a user.

    Parameters
    -----------
    target: discord.Member
        Who are you unmuting?
    reason: str
        Why are you unmuting this user?
    silent: bool
        Should the user be messaged?"""
    if not await self.check_moddable(target, interaction, ['moderate_members']):
        return

    if not target.is_timed_out():
        await interaction.response.send_message(f"{target.mention} is not muted!", allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
        return

    # Build the audit-log reason first; the reason-less variant previously
    # said "Unbanned by", which mislabelled the action in the audit log.
    if reason:
        audit_reason = f"Unmuted by {interaction.user.id} for: {reason}"
    else:
        audit_reason = f"Unmuted by {interaction.user.id}"
        reason = "No reason given."

    # Passing None clears the member's timeout.
    await target.timeout(None, reason=audit_reason)

    await interaction.response.send_message(content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`")

    # When the caller did not choose, fall back to the guild's dm_users setting.
    if silent is None:
        silent = not await self.config.guild(interaction.guild).dm_users()
    if silent is False:
        try:
            embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='unmuted', response=await interaction.original_response())
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Best effort: the target may have DMs closed.
            pass

    moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', target.id, 0, 'NULL', reason)
    # Re-render the public response now that the case number is known.
    await interaction.edit_original_response(content=f"{target.mention} has been unmuted! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
    await self.log(interaction, moderation_id)
|
|
|
|
@app_commands.command(name="kick")
async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
    """Kick a user.

    Parameters
    -----------
    target: discord.user
        Who are you kicking?
    reason: str
        Why are you kicking this user?
    silent: bool
        Should the user be messaged?"""
    allowed = await self.check_moddable(target, interaction, ['kick_members'])
    if not allowed:
        return

    guild = interaction.guild
    public_text = f"{target.mention} has been kicked!"
    await interaction.response.send_message(content=f"{public_text}\n**Reason** - `{reason}`")

    # An unspecified `silent` defers to the guild's dm_users setting.
    if silent is None:
        silent = not await self.config.guild(guild).dm_users()

    # The DM has to go out before the kick is issued.
    if silent is False:
        try:
            original = await interaction.original_response()
            notice = await self.embed_factory('message', guild=guild, reason=reason, moderation_type='kicked', response=original)
            await target.send(embed=notice)
        except discord.errors.HTTPException:
            pass

    await target.kick(reason=f"Kicked by {interaction.user.id} for: {reason}")

    case_number = await self.mysql_log(guild.id, interaction.user.id, 'KICK', target.id, 0, 'NULL', reason)
    # Append the case number to the already-sent response.
    await interaction.edit_original_response(content=f"{target.mention} has been kicked! (Case `#{case_number:,}`)\n**Reason** - `{reason}`")
    await self.log(interaction, case_number)
|
|
|
|
@app_commands.command(name="ban")
@app_commands.choices(delete_messages=[
    Choice(name="None", value=0),
    Choice(name='1 Hour', value=3600),
    Choice(name='12 Hours', value=43200),
    Choice(name='1 Day', value=86400),
    Choice(name='3 Days', value=259200),
    Choice(name='7 Days', value=604800),
])
async def ban(self, interaction: discord.Interaction, target: discord.User, reason: str, duration: str = None, delete_messages: Choice[int] = 0, silent: bool = None):
    """Ban a user.

    Parameters
    -----------
    target: discord.User
        Who are you banning?
    duration: str
        How long are you banning this user for?
    reason: str
        Why are you banning this user?
    delete_messages: Choice[int]
        How many days of messages to delete?
    silent: bool
        Should the user be messaged?"""
    if not await self.check_moddable(target, interaction, ['ban_members']):
        return

    # fetch_ban raises NotFound when the user is not banned, which is the
    # only case in which we continue.
    try:
        await interaction.guild.fetch_ban(target)
    except discord.errors.NotFound:
        pass
    else:
        await interaction.response.send_message(content=f"{target.mention} is already banned!", ephemeral=True)
        return

    parsed_time = None
    if duration:
        try:
            parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
        except ValueError:
            await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
            return

    # With a Choice[int] annotation the callback receives a Choice object when
    # an option is picked and the plain default (0) otherwise; Guild.ban needs
    # the integer number of seconds.
    delete_seconds = delete_messages.value if isinstance(delete_messages, Choice) else delete_messages

    # Everything that differs between a temporary and a permanent ban.
    if duration:
        headline = f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!"
        dm_type = 'tempbanned'
        embed_extras = {'duration': parsed_time}
        audit_reason = f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})"
        log_type = 'TEMPBAN'
        log_duration = parsed_time
    else:
        headline = f"{target.mention} has been banned!"
        dm_type = 'banned'
        embed_extras = {}
        audit_reason = f"Banned by {interaction.user.id} for: {reason}"
        log_type = 'BAN'
        log_duration = 'NULL'

    await interaction.response.send_message(content=f"{headline}\n**Reason** - `{reason}`")

    # `silent` is now honoured for temporary bans too (it was ignored before).
    if silent is None:
        silent = not await self.config.guild(interaction.guild).dm_users()
    if silent is False:
        # The DM has to go out before the ban is issued.
        try:
            embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type=dm_type, response=await interaction.original_response(), **embed_extras)
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Best effort: the target may have DMs closed.
            pass

    await interaction.guild.ban(target, reason=audit_reason, delete_message_seconds=delete_seconds)

    moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, log_type, target.id, 0, log_duration, reason)
    # Re-render the public response now that the case number is known.
    await interaction.edit_original_response(content=f"{headline} (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
    await self.log(interaction, moderation_id)
|
|
|
|
@app_commands.command(name="unban")
async def unban(self, interaction: discord.Interaction, target: discord.User, reason: str = None, silent: bool = None):
    """Unban a user.

    Parameters
    -----------
    target: discord.user
        Who are you unbanning?
    reason: str
        Why are you unbanning this user?
    silent: bool
        Should the user be messaged?"""
    allowed = await self.check_moddable(target, interaction, ['ban_members'])
    if not allowed:
        return

    guild = interaction.guild

    # fetch_ban raising NotFound means there is nothing to undo.
    try:
        await guild.fetch_ban(target)
    except discord.errors.NotFound:
        await interaction.response.send_message(content=f"{target.mention} is not banned!", ephemeral=True)
        return

    audit_reason = f"Unbanned by {interaction.user.id}"
    if reason:
        audit_reason += f" for: {reason}"
    await guild.unban(target, reason=audit_reason)
    if not reason:
        # Placeholder used in the response, the DM and the stored case.
        reason = "No reason given."

    await interaction.response.send_message(content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`")

    # An unspecified `silent` defers to the guild's dm_users setting.
    if silent is None:
        silent = not await self.config.guild(guild).dm_users()

    if silent is False:
        try:
            original = await interaction.original_response()
            notice = await self.embed_factory('message', guild=guild, reason=reason, moderation_type='unbanned', response=original)
            await target.send(embed=notice)
        except discord.errors.HTTPException:
            pass

    case_number = await self.mysql_log(guild.id, interaction.user.id, 'UNBAN', target.id, 0, 'NULL', reason)
    # Append the case number to the already-sent response.
    await interaction.edit_original_response(content=f"{target.mention} has been unbanned! (Case `#{case_number:,}`)\n**Reason** - `{reason}`")
    await self.log(interaction, case_number)
|
|
|
|
@app_commands.command(name="history")
async def history(self, interaction: discord.Interaction, target: discord.User = None, moderator: discord.User = None, pagesize: app_commands.Range[int, 1, 20] = None, page: int = 1, ephemeral: bool = None, inline: bool = None, export: bool = False):
    """List previous infractions.

    Parameters
    -----------
    target: discord.User
        User whose infractions to query, overrides moderator if both are given
    moderator: discord.User
        Query by moderator
    pagesize: app_commands.Range[int, 1, 20]
        Amount of infractions to list per page
    page: int
        Page to select
    ephemeral: bool
        Hide the command response
    inline: bool
        Display infractions in a grid arrangement (does not look very good)
    export: bool
        Exports the server's entire moderation history to a JSON file"""
    # Unset options resolve as: user setting, then guild setting, then default.
    if ephemeral is None:
        ephemeral = (await self.config.user(interaction.user).history_ephemeral()
                    or await self.config.guild(interaction.guild).history_ephemeral()
                    or False)

    if inline is None:
        inline = (await self.config.user(interaction.user).history_inline()
                    or await self.config.guild(interaction.guild).history_inline()
                    or False)

    if pagesize is None:
        if inline is True:
            pagesize = (await self.config.user(interaction.user).history_inline_pagesize()
                        or await self.config.guild(interaction.guild).history_inline_pagesize()
                        or 6)
        else:
            pagesize = (await self.config.user(interaction.user).history_pagesize()
                        or await self.config.guild(interaction.guild).history_pagesize()
                        or 5)

    await interaction.response.defer(ephemeral=ephemeral)

    permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction)
    if permissions:
        await interaction.followup.send(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
        return

    # Build the query from a common base; `target` takes precedence over
    # `moderator`, and with neither the whole guild table is listed.
    query = "SELECT * FROM moderation_%s"
    params = [interaction.guild.id]
    if target:
        query += " WHERE target_id = %s"
        params.append(target.id)
    elif moderator:
        query += " WHERE moderator_id = %s"
        params.append(moderator.id)
    query += " ORDER BY moderation_id DESC;"

    database = await self.connect()
    cursor = database.cursor()
    try:
        cursor.execute(query, tuple(params))
        results = cursor.fetchall()
    finally:
        # The handles were previously never released.
        cursor.close()
        database.close()

    # Rows with moderation_id 0 are skipped, as before.
    result_dict_list = [
        case_dict
        for case_dict in (self.generate_dict(result) for result in results)
        if case_dict['moderation_id'] != 0
    ]

    if export:
        export_name = f"moderation_{interaction.guild.id}.json"
        filename = os.path.join(str(data_manager.cog_data_path(cog_instance=self)), export_name)
        try:
            try:
                with open(filename, "w", encoding="utf-8") as f:
                    json.dump(result_dict_list, f, indent=2)
            except (TypeError, ValueError) as e:
                # json.dump signals unserialisable data with TypeError/ValueError
                # (JSONDecodeError, caught before, is a ValueError subclass).
                await interaction.followup.send(content=f"An error occured while exporting the moderation history.\nError:\n```{e}```", ephemeral=ephemeral)
                return

            await interaction.followup.send(file=discord.File(filename, export_name), ephemeral=ephemeral)
        finally:
            # Always remove the temporary export, even when sending failed.
            if os.path.exists(filename):
                os.remove(filename)
        return

    case_quantity = len(result_dict_list)
    # Ceiling division: a partially filled last page still counts as a page.
    page_quantity = max(1, -(-case_quantity // pagesize))
    start_index = (page - 1) * pagesize
    end_index = page * pagesize

    embed = discord.Embed(color=await self.bot.get_embed_color(None))
    # Guilds without a custom icon have `icon` set to None.
    icon_url = interaction.guild.icon.url if interaction.guild.icon else None
    embed.set_author(icon_url=icon_url, name='Infraction History')
    embed.set_footer(text=f"Page {page:,}/{page_quantity:,} | {case_quantity:,} Results")

    # Per-invocation cache of user lookups, keyed by the stringified user id.
    memory_dict = {}

    for case in result_dict_list[start_index:end_index]:
        target_key = str(case['target_id'])
        if target_key not in memory_dict:
            memory_dict[target_key] = await self.fetch_user_dict(interaction, case['target_id'])
        target_user = memory_dict[target_key]

        moderator_key = str(case['moderator_id'])
        if moderator_key not in memory_dict:
            memory_dict[moderator_key] = await self.fetch_user_dict(interaction, case['moderator_id'])
        moderator_user = memory_dict[moderator_key]

        # A discriminator of "0" is rendered as the bare name.
        target_name = target_user['name'] if target_user['discriminator'] == "0" else f"{target_user['name']}#{target_user['discriminator']}"
        moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}"

        field_name = f"Case #{case['moderation_id']:,} ({str.title(case['moderation_type'])})"
        field_value = f"**Target:** `{target_name}` ({target_user['id']})\n**Moderator:** `{moderator_name}` ({moderator_user['id']})"

        # Long reasons are truncated to 125 characters.
        reason_text = str(case['reason'])
        if len(reason_text) > 125:
            field_value += f"\n**Reason:** `{reason_text[:125]}...`"
        else:
            field_value += f"\n**Reason:** `{reason_text}`"

        if case['duration'] != 'NULL':
            # The duration is read as a colon-separated hours:minutes:seconds string.
            td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case["duration"].split(":"))})
            duration_embed = f"{humanize.precisedelta(td)} | <t:{case['end_timestamp']}:R>" if bool(case['expired']) is False else f"{humanize.precisedelta(td)} | Expired"
            field_value += f"\n**Duration:** {duration_embed}"

        field_value += f"\n**Timestamp:** <t:{case['timestamp']}> | <t:{case['timestamp']}:R>"

        if bool(case['resolved']):
            field_value += "\n**Resolved:** True"

        embed.add_field(name=field_name, value=field_value, inline=inline)

    await interaction.followup.send(embed=embed, ephemeral=ephemeral)
|
|
|
|
@app_commands.command(name="resolve")
async def resolve(self, interaction: discord.Interaction, case: int, reason: str = None):
    """Resolve a specific case.

    Parameters
    -----------
    case: int
        Case number of the case you're trying to resolve
    reason: str
        Reason for resolving case"""
    permissions = self.check_permissions(interaction.client.user, ['embed_links', 'moderate_members', 'ban_members'], interaction)
    if permissions:
        await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
        return

    conf = await self.check_conf(['mysql_database'])
    if conf:
        raise LookupError

    # Case 0 is never resolvable; reject it before touching the database.
    if case == 0:
        await interaction.response.send_message(content=f"There is no moderation with a case number of {case}.", ephemeral=True)
        return

    database = await self.connect()
    cursor = database.cursor()
    try:
        db = await self.config.mysql_database()

        query_1 = "SELECT * FROM moderation_%s WHERE moderation_id = %s;"
        cursor.execute(query_1, (interaction.guild.id, case))
        result_1 = cursor.fetchone()
        if result_1 is None:
            await interaction.response.send_message(content=f"There is no moderation with a case number of {case}.", ephemeral=True)
            return

        query_2 = "SELECT * FROM moderation_%s WHERE moderation_id = %s AND resolved = 0;"
        cursor.execute(query_2, (interaction.guild.id, case))
        result_2 = cursor.fetchone()
        if result_2 is None:
            await interaction.response.send_message(content=f"This moderation has already been resolved!\nUse `/case {case}` for more information.", ephemeral=True)
            return

        case_dict = self.generate_dict(result_2)

        # Stop here for non-resolvable types. Previously execution fell
        # through, updated the row anyway and answered the interaction twice.
        if case_dict['moderation_type'] in ['UNMUTE', 'UNBAN']:
            await interaction.response.send_message(content="You cannot resolve this type of moderation!", ephemeral=True)
            return

        if reason is None:
            reason = "No reason given."

        changes: list = case_dict['changes']
        if len(changes) > 25:
            await interaction.response.send_message(content="Due to limitations with Discord's embed system, you cannot edit a case more than 25 times.", ephemeral=True)
            return
        if not changes:
            # First change on this case: record the original state as the baseline.
            changes.append(
                {
                    'type': "ORIGINAL",
                    'timestamp': case_dict['timestamp'],
                    'reason': case_dict['reason'],
                    'user_id': case_dict['moderator_id']
                }
            )
        changes.append(
            {
                'type': "RESOLVE",
                'timestamp': int(time.time()),
                'reason': reason,
                'user_id': interaction.user.id
            }
        )

        # Undo the moderation on Discord where that is possible.
        if case_dict['moderation_type'] == 'MUTE':
            try:
                member = await interaction.guild.fetch_member(case_dict['target_id'])
                await member.timeout(None, reason=f"Case #{case:,} resolved by {interaction.user.id}")
            except discord.NotFound:
                # Member is no longer in the guild; nothing to lift.
                pass
        elif case_dict['moderation_type'] in ['TEMPBAN', 'BAN']:
            try:
                user = await interaction.client.fetch_user(case_dict['target_id'])
                await interaction.guild.unban(user, reason=f"Case #{case} resolved by {interaction.user.id}")
            except discord.NotFound:
                # Unknown user or no ban on record; nothing to lift.
                pass

        # The same statement applies to every resolvable type (the former
        # if/else built two identical strings).
        resolve_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET resolved = 1, changes = %s, resolved_by = %s, resolve_reason = %s WHERE moderation_id = %s"
        cursor.execute(resolve_query, (json.dumps(changes), interaction.user.id, reason, case_dict['moderation_id']))
        database.commit()

        embed = await self.embed_factory('case', interaction=interaction, case_dict=await self.fetch_case(case, interaction.guild.id))
        await interaction.response.send_message(content=f"✅ Moderation #{case:,} resolved!", embed=embed)
        await self.log(interaction, case, True)
    finally:
        # Release the handles on every exit path, including early returns.
        cursor.close()
        database.close()
|
|
|
|
@app_commands.command(name="case")
@app_commands.choices(export=[
    Choice(name='Export as File', value='file'),
    Choice(name='Export as Codeblock', value='codeblock')
])
async def case(self, interaction: discord.Interaction, case: int, ephemeral: bool = None, changes: bool = False, export: Choice[str] = None):
    """Check the details of a specific case.

    Parameters
    -----------
    case: int
        What case are you looking up?
    ephemeral: bool
        Hide the command response
    changes: bool
        List the changes made to the case
    export: Choice[str]
        Export the case to a JSON file or codeblock"""
    permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction)
    if permissions:
        await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
        return

    # Unset option resolves as: user setting, then guild setting, then False.
    if ephemeral is None:
        ephemeral = (await self.config.user(interaction.user).history_ephemeral()
                    or await self.config.guild(interaction.guild).history_ephemeral()
                    or False)

    # Case 0 is never looked up.
    case_dict = await self.fetch_case(case, interaction.guild.id) if case != 0 else None
    if not case_dict:
        await interaction.response.send_message(content=f"No case with case number `{case}` found.", ephemeral=True)
        return

    if export:
        rendered = json.dumps(case_dict, indent=2)

        # Measure the text that would actually be sent: the indented JSON is
        # longer than str(case_dict), which the size check used to look at.
        if export.value == 'file' or len(rendered) > 1800:
            export_name = f"moderation_{interaction.guild.id}_case_{case}.json"
            filename = os.path.join(str(data_manager.cog_data_path(cog_instance=self)), export_name)

            if export.value == 'codeblock':
                content = f"Case #{case:,} exported.\n*Case was too large to export as codeblock, so it has been uploaded as a `.json` file.*"
            else:
                content = f"Case #{case:,} exported."

            try:
                with open(filename, "w", encoding="utf-8") as f:
                    f.write(rendered)

                await interaction.response.send_message(content=content, file=discord.File(filename, export_name), ephemeral=ephemeral)
            finally:
                # Always remove the temporary export, even when sending failed.
                if os.path.exists(filename):
                    os.remove(filename)
            return

        await interaction.response.send_message(content=f"```json\n{rendered}```", ephemeral=ephemeral)
        return

    embed_type = 'changes' if changes else 'case'
    embed = await self.embed_factory(embed_type, interaction=interaction, case_dict=case_dict)
    await interaction.response.send_message(embed=embed, ephemeral=ephemeral)
|
|
|
|
@app_commands.command(name="edit")
async def edit(self, interaction: discord.Interaction, case: int, reason: str, duration: str = None):
    """Edit the reason of a specific case.

    Parameters
    -----------
    case: int
        What case are you editing?
    reason: str
        What is the new reason?
    duration: str
        What is the new duration? Does not reapply the moderation if it has already expired."""
    permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction)
    if permissions:
        await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
        return

    # Case 0 is never editable.
    case_dict = await self.fetch_case(case, interaction.guild.id) if case != 0 else None
    if not case_dict:
        await interaction.response.send_message(content=f"No case with case number `{case}` found.", ephemeral=True)
        return

    conf = await self.check_conf(['mysql_database'])
    if conf:
        raise LookupError

    # Validate the edit limit before any side effect (it used to be checked
    # only after a mute had already been re-applied).
    changes: list = case_dict['changes']
    if len(changes) > 25:
        await interaction.response.send_message(content="Due to limitations with Discord's embed system, you cannot edit a case more than 25 times.", ephemeral=True)
        return

    parsed_time = None
    end_timestamp = None
    duration_text = None
    if duration:
        try:
            parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
        except ValueError:
            await interaction.response.send_message("Please provide a valid duration!", ephemeral=True)
            return

        total_seconds = int(parsed_time.total_seconds())
        # Whole-second unix timestamp, matching int(time.time()) used elsewhere.
        end_timestamp = int(case_dict['timestamp'] + total_seconds)
        # A timedelta is not JSON serialisable; store it in the change log as
        # an hours:minutes:seconds string instead.
        duration_text = f"{total_seconds // 3600}:{total_seconds % 3600 // 60:02}:{total_seconds % 60:02}"

        # Other blocks of this cog read the type from 'moderation_type';
        # the previous 'type' lookup did not match that key.
        if case_dict['moderation_type'] == 'MUTE':
            if (time.time() - case_dict['timestamp']) + parsed_time.total_seconds() > 2419200:
                await interaction.response.send_message("Please provide a duration that is less than 28 days from the initial moderation.")
                return

            try:
                member = await interaction.guild.fetch_member(case_dict['target_id'])
                # NOTE(review): this times the member out for `parsed_time`
                # counted from now, while end_timestamp is counted from the
                # original case timestamp — confirm which one is intended.
                await member.timeout(parsed_time, reason=f"Case #{case:,} edited by {interaction.user.id}")
            except discord.NotFound:
                # Member is no longer in the guild; only the record is updated.
                pass

    if not changes:
        # First edit on this case: record the original state as the baseline.
        changes.append(
            {
                'type': "ORIGINAL",
                'timestamp': case_dict['timestamp'],
                'reason': case_dict['reason'],
                'user_id': case_dict['moderator_id'],
                'duration': case_dict['duration'],
                'end_timestamp': case_dict['end_timestamp']
            }
        )

    # As before, a zero-length duration counts as "no new duration".
    has_new_duration = bool(parsed_time)
    changes.append(
        {
            'type': "EDIT",
            'timestamp': int(time.time()),
            'reason': reason,
            'user_id': interaction.user.id,
            'duration': duration_text if has_new_duration else case_dict['duration'],
            'end_timestamp': end_timestamp if has_new_duration else case_dict['end_timestamp']
        }
    )

    database = await self.connect()
    cursor = database.cursor()
    try:
        db = await self.config.mysql_database()

        if has_new_duration:
            update_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET changes = %s, reason = %s, duration = %s, end_timestamp = %s WHERE moderation_id = %s"
            cursor.execute(update_query, (json.dumps(changes), reason, parsed_time, end_timestamp, case))
        else:
            update_query = f"UPDATE `{db}`.`moderation_{interaction.guild.id}` SET changes = %s, reason = %s WHERE moderation_id = %s"
            cursor.execute(update_query, (json.dumps(changes), reason, case))
        database.commit()

        # Re-read the case so the embed shows the stored values.
        new_case = await self.fetch_case(case, interaction.guild.id)
        embed = await self.embed_factory('case', interaction=interaction, case_dict=new_case)

        await interaction.response.send_message(content=f"✅ Moderation #{case:,} edited!", embed=embed, ephemeral=True)
        await self.log(interaction, case)
    finally:
        # Release the handles even if the update or the response fails.
        cursor.close()
        database.close()
|
|
|
|
@tasks.loop(minutes=1)
async def handle_expiry(self):
    """Expire finished moderations in every guild; runs once per minute.

    For each guild where the cog is enabled this lifts temporary bans whose
    end_timestamp has passed, marks expired or resolved rows as expired, and
    looks up the member and role of expired blacklists.

    Raises LookupError when check_conf reports a problem with 'mysql_database'.
    """
    conf = await self.check_conf(['mysql_database'])
    if conf:
        raise LookupError

    database = await self.connect()
    cursor = database.cursor()
    try:
        db = await self.config.mysql_database()

        guilds: list[discord.Guild] = self.bot.guilds
        for guild in guilds:
            if await self.bot.cog_disabled_in_guild(self, guild):
                continue

            tempban_query = f"SELECT target_id, moderation_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= %s AND moderation_type = 'TEMPBAN' AND expired = 0"

            try:
                cursor.execute(tempban_query, (time.time(),))
                result = cursor.fetchall()
            except mysql.connector.errors.ProgrammingError:
                # The query failed for this guild; skip the guild entirely.
                continue

            for target_id, moderation_id in result:
                # An except clause needs a class or a tuple of classes; the
                # former list literal raised TypeError as soon as anything was
                # thrown. NotFound and Forbidden both derive from HTTPException.
                try:
                    user: discord.User = await self.bot.fetch_user(target_id)
                    await guild.unban(user, reason=f"Automatic unban from case #{moderation_id}")

                    embed = await self.embed_factory('message', guild, f'Automatic unban from case #{moderation_id}', 'unbanned')

                    try:
                        await user.send(embed=embed)
                    except discord.errors.HTTPException:
                        # Best effort: the user may have DMs closed.
                        pass
                except discord.errors.HTTPException as e:
                    # fetch_user itself may have failed, so only the id is logged.
                    self.logger.error("Failed to unban %s from %s (%s)\n%s", target_id, guild.name, guild.id, e)

            expiry_query = f"UPDATE `{db}`.`moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp != 0 AND end_timestamp <= %s AND expired = 0 AND moderation_type != 'BLACKLIST') OR (expired = 0 AND resolved = 1 AND moderation_type != 'BLACKLIST')"
            cursor.execute(expiry_query, (time.time(),))

            blacklist_query = f"SELECT target_id, moderation_id, role_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= %s AND moderation_type = 'BLACKLIST' AND expired = 0"
            try:
                cursor.execute(blacklist_query, (time.time(),))
                result = cursor.fetchall()
            except mysql.connector.errors.ProgrammingError:
                continue

            for target_id, moderation_id, role_id in result:
                try:
                    member: discord.Member = await guild.fetch_member(target_id)
                except discord.errors.HTTPException:
                    continue

                role: discord.Role = guild.get_role(role_id)
                if role is None:
                    # Skip directly: `raise discord.errors.NotFound` without
                    # arguments could not even construct the exception.
                    continue
                # NOTE(review): `member` and `role` are looked up but nothing
                # is done with them — blacklist expiry appears unfinished;
                # confirm the intended action before adding one.

        database.commit()
    finally:
        # Release the handles even if a guild iteration raised.
        cursor.close()
        database.close()
|
|
|
|
#######################################################################################################################
|
|
### CONFIGURATION COMMANDS
|
|
#######################################################################################################################
|
|
|
|
@commands.group(autohelp=True, aliases=['modset', 'moderationsettings'])
async def moderationset(self, ctx: commands.Context):
    """Manage moderation commands."""
    # Container group only: the subcommands register themselves through
    # @moderationset.command / @moderationset.group.
|
|
|
|
@moderationset.command(name='list', aliases=['view', 'show'])
async def moderationset_list(self, ctx: commands.Context):
    """List all moderation settings."""
    guild_settings_string = ""
    if ctx.guild:
        guild_settings = await self.config.guild(ctx.guild).all()

        guild_lines = []
        for setting, value in guild_settings.items():
            # Database credentials and role lists are deliberately not shown.
            if 'mysql' in setting or 'roles' in setting:
                continue
            if setting == 'log_channel':
                # The channel may be unset or deleted; show the stored value
                # in that case instead of crashing on `None.mention`.
                channel = ctx.guild.get_channel(value) if value else None
                if channel is not None:
                    value = channel.mention
            guild_lines.append(f"**{setting}**: {value}\n")
        guild_settings_string = "".join(guild_lines)

    user_settings = await self.config.user(ctx.author).all()
    user_settings_string = "".join(
        f"**{setting}**: {value}\n" for setting, value in user_settings.items()
    )

    embed = discord.Embed(color=await self.bot.get_embed_color(None))
    if ctx.guild:
        # Guilds without a custom icon have `icon` set to None.
        icon_url = ctx.guild.icon.url if ctx.guild.icon else None
        embed.set_author(icon_url=icon_url, name=f"{ctx.guild.name} Moderation Settings")
        embed.add_field(name="Guild Settings", value=guild_settings_string)
    else:
        # Invoked outside a guild: there is no guild name or icon to show.
        embed.set_author(name="Moderation Settings")
    embed.add_field(name="User Settings", value=user_settings_string)

    await ctx.send(embed=embed)
|
|
|
|
@moderationset.group(autohelp=True, name='history')
async def moderationset_history(self, ctx: commands.Context):
    """Manage configuration for the /history command."""
    # Container group only: subcommands attach via @moderationset_history.*.
|
|
|
|
@moderationset_history.command(name='ephemeral', aliases=['hidden', 'hide'])
async def moderationset_history_user_ephemeral(self, ctx: commands.Context, enabled: bool):
    """Toggle if the /history command should be ephemeral."""
    # Stored per user, under the invoking author.
    author_conf = self.config.user(ctx.author)
    await author_conf.history_ephemeral.set(enabled)
    confirmation = f"Ephemeral setting set to {enabled}"
    await ctx.send(confirmation)
|
|
|
|
@moderationset_history.command(name='pagesize')
async def moderationset_history_user_pagesize(self, ctx: commands.Context, pagesize: int):
    """Set the amount of cases to display per page."""
    # Accept only 1..20; anything else gets a rejection message.
    if pagesize > 20:
        rejection = "Pagesize cannot be greater than 20!"
    elif pagesize < 1:
        rejection = "Pagesize cannot be less than 1!"
    else:
        rejection = None

    if rejection is not None:
        await ctx.send(rejection)
        return

    author_conf = self.config.user(ctx.author)
    await author_conf.history_pagesize.set(pagesize)
    # Read the value back so the confirmation shows what was stored.
    stored = await author_conf.history_pagesize()
    await ctx.send(f"Pagesize set to {stored}")
|
|
|
|
@moderationset_history.group(name='inline')
async def moderationset_history_inline(self, ctx: commands.Context):
    """Manage configuration for the /history command's inline argument."""
    # Container group only: subcommands attach via @moderationset_history_inline.*.
|
|
|
|
@moderationset_history_inline.command(name='toggle')
async def moderationset_history_user_inline_toggle(self, ctx: commands.Context, enabled: bool):
    """Enable the /history command's inline argument by default."""
    # Stored per user, under the invoking author.
    author_conf = self.config.user(ctx.author)
    await author_conf.history_inline.set(enabled)
    confirmation = f"Inline setting set to {enabled}"
    await ctx.send(confirmation)
|
|
|
|
@moderationset_history_inline.command(name='pagesize')
async def moderationset_history_user_inline_pagesize(self, ctx: commands.Context, pagesize: int):
    """Set the amount of cases to display per page."""
    # Accept only 1..20; anything else gets a rejection message.
    if pagesize > 20:
        rejection = "Pagesize cannot be greater than 20!"
    elif pagesize < 1:
        rejection = "Pagesize cannot be less than 1!"
    else:
        rejection = None

    if rejection is not None:
        await ctx.send(rejection)
        return

    author_conf = self.config.user(ctx.author)
    await author_conf.history_inline_pagesize.set(pagesize)
    # Read the value back so the confirmation shows what was stored.
    stored = await author_conf.history_inline_pagesize()
    await ctx.send(f"Inline pagesize set to {stored}")
|
|
|
|
@moderationset_history.group(autohelp=True, name='guild')
@checks.admin()
async def moderationset_history_guild(self, ctx: commands.Context):
    """Manage configuration for the /history command, per guild."""
    # Container group only: subcommands attach via @moderationset_history_guild.*.
|
|
|
|
@moderationset_history_guild.command(name='ephemeral', aliases=['hidden', 'hide'])
@checks.admin()
async def moderationset_history_guild_ephemeral(self, ctx: commands.Context, enabled: bool):
    """Toggle if the /history command should be ephemeral."""
    # Stored per guild, under the guild the command was invoked in.
    guild_conf = self.config.guild(ctx.guild)
    await guild_conf.history_ephemeral.set(enabled)
    confirmation = f"Ephemeral setting set to {enabled}"
    await ctx.send(confirmation)
|
|
|
|
@moderationset_history_guild.command(name='pagesize')
@checks.admin()
async def moderationset_history_guild_pagesize(self, ctx: commands.Context, pagesize: int):
    """Set the amount of cases to display per page."""
    # Accept only 1..20; anything else gets a rejection message.
    if pagesize > 20:
        rejection = "Pagesize cannot be greater than 20!"
    elif pagesize < 1:
        rejection = "Pagesize cannot be less than 1!"
    else:
        rejection = None

    if rejection is not None:
        await ctx.send(rejection)
        return

    guild_conf = self.config.guild(ctx.guild)
    await guild_conf.history_pagesize.set(pagesize)
    # Read the value back so the confirmation shows what was stored.
    stored = await guild_conf.history_pagesize()
    await ctx.send(f"Pagesize set to {stored}")
|
|
|
|
@moderationset_history_guild.group(name='inline')
|
|
@checks.admin()
|
|
async def moderationset_history_guild_inline(self, ctx: commands.Context):
    """Manage configuration for the /history command's inline argument."""
    # Group entry point only: the body is intentionally empty. The `toggle`
    # and `pagesize` subcommands registered on this group do the work.
|
|
|
|
@moderationset_history_guild_inline.command(name='toggle')
|
|
@checks.admin()
|
|
async def moderationset_history_guild_inline_toggle(self, ctx: commands.Context, enabled: bool):
    """Enable the /history command's inline argument by default."""
    # Store the guild-wide default, then confirm with the supplied value.
    guild_settings = self.config.guild(ctx.guild)
    await guild_settings.history_inline.set(enabled)

    confirmation = f"Inline setting set to {enabled}"
    await ctx.send(confirmation)
|
|
|
|
@moderationset_history_guild_inline.command(name='pagesize')
|
|
@checks.admin()
|
|
async def moderationset_history_guild_inline_pagesize(self, ctx: commands.Context, pagesize: int):
    """Set the amount of cases to display per page."""
    # Accept only sizes from 1 to 20; the upper bound is tested first.
    rejection = None
    if pagesize > 20:
        rejection = "Pagesize cannot be greater than 20!"
    elif pagesize < 1:
        rejection = "Pagesize cannot be less than 1!"

    if rejection is not None:
        await ctx.send(rejection)
        return

    guild_settings = self.config.guild(ctx.guild)
    await guild_settings.history_inline_pagesize.set(pagesize)

    # Report the value as stored in config, not the raw argument.
    stored_pagesize = await guild_settings.history_inline_pagesize()
    await ctx.send(f"Inline pagesize set to {stored_pagesize}")
|
|
|
|
@moderationset.group(autohelp=True, name='immunity')
|
|
@checks.admin()
|
|
async def moderationset_immunity(self, ctx: commands.Context):
    """Manage configuration for immune roles."""
    # Group entry point only: the body is intentionally empty. The `add`,
    # `remove` and `list` subcommands registered on this group do the work.
|
|
|
|
@moderationset_immunity.command(name='add')
|
|
@checks.admin()
|
|
async def moderationset_immunity_add(self, ctx: commands.Context, role: discord.Role):
    """Add a role to the immune roles list."""
    # The guild config stores immune roles as a list of role IDs.
    immune_setting = self.config.guild(ctx.guild).immune_roles
    current_ids: list = await immune_setting()

    if role.id not in current_ids:
        current_ids.append(role.id)
        await immune_setting.set(current_ids)
        await ctx.send(f"Role {role.name} added to immune roles.")
    else:
        # Nothing is written when the role is already present.
        await ctx.send("Role is already immune!")
|
|
|
|
@moderationset_immunity.command(name='remove')
|
|
@checks.admin()
|
|
async def moderationset_immunity_remove(self, ctx: commands.Context, role: discord.Role):
    """Remove a role from the immune roles list."""
    # The guild config stores immune roles as a list of role IDs.
    immune_setting = self.config.guild(ctx.guild).immune_roles
    current_ids: list = await immune_setting()

    if role.id in current_ids:
        current_ids.remove(role.id)
        await immune_setting.set(current_ids)
        await ctx.send(f"Role {role.name} removed from immune roles.")
    else:
        # Nothing is written when the role was never immune.
        await ctx.send("Role is not immune!")
|
|
|
|
@moderationset_immunity.command(name='list')
|
|
@checks.admin()
|
|
async def moderationset_immunity_list(self, ctx: commands.Context):
    """List all immune roles."""
    immune_roles: list = await self.config.guild(ctx.guild).immune_roles()

    # Stored IDs can outlive their roles; only roles the guild can still
    # resolve are listed.
    resolved_roles = (ctx.guild.get_role(role_id) for role_id in immune_roles or [])
    mentions = [role.mention for role in resolved_roles if role]

    if not mentions:
        # Covers both an empty list and a list made up solely of stale IDs,
        # so the command always replies instead of finishing silently.
        await ctx.send("No immune roles set!")
        return

    embed = discord.Embed(
        title="Immune Roles",
        description="\n".join(mentions),
        color=await self.bot.get_embed_color(None),
    )
    await ctx.send(embed=embed)
|
|
|
|
@moderationset.group(autohelp=True, name='blacklist')
|
|
@checks.admin()
|
|
async def moderationset_blacklist(self, ctx: commands.Context):
    """Manage configuration for the /blacklist command."""
    # Group entry point only: the body is intentionally empty. The `add`,
    # `remove` and `list` subcommands registered on this group do the work.
|
|
|
|
@moderationset_blacklist.command(name='add')
|
|
@checks.admin()
|
|
async def moderationset_blacklist_add(self, ctx: commands.Context, role: discord.Role, duration: str):
    """Add a role to the blacklist."""
    # Blacklist types are stored as a list of {'role': id, 'duration': str}.
    blacklist_setting = self.config.guild(ctx.guild).blacklist_roles
    entries: list = await blacklist_setting()

    # A role may carry at most one blacklist type.
    if any(role.id == entry['role'] for entry in entries):
        await ctx.send("Role already has an associated blacklist type!")
        return

    try:
        parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
    except ValueError:
        await ctx.send("Please provide a valid duration!")
        return

    # The duration is persisted in its string form, not as a timedelta.
    entries.append({'role': role.id, 'duration': str(parsed_time)})
    await blacklist_setting.set(entries)

    # Mention the role for readability without actually pinging anyone.
    await ctx.send(
        f"Role {role.mention} added as a blacklist type.",
        allowed_mentions=discord.AllowedMentions.none(),
    )
|
|
|
|
@moderationset_blacklist.command(name='remove')
|
|
@checks.admin()
|
|
async def moderationset_blacklist_remove(self, ctx: commands.Context, role: discord.Role):
    """Remove a role's blacklist type."""
    blacklist_setting = self.config.guild(ctx.guild).blacklist_roles
    entries: list = await blacklist_setting()

    # Find the first stored entry that belongs to this role, if any.
    matching_entry = next((entry for entry in entries if role.id == entry['role']), None)
    if matching_entry is None:
        await ctx.send("Role does not have an associated blacklist type!")
        return

    entries.remove(matching_entry)
    await blacklist_setting.set(entries)

    # Mention the role for readability without actually pinging anyone.
    await ctx.send(
        f"Role {role.mention} removed from blacklist types.",
        allowed_mentions=discord.AllowedMentions.none(),
    )
|
|
|
|
@moderationset_blacklist.command(name='list')
|
|
@checks.admin()
|
|
async def moderationset_blacklist_list(self, ctx: commands.Context):
    """List all blacklist types."""
    blacklist_roles: list = await self.config.guild(ctx.guild).blacklist_roles()

    # Stored entries can outlive their roles; only entries whose role the
    # guild can still resolve are listed.
    lines = []
    for blacklist_role in blacklist_roles or []:
        role = ctx.guild.get_role(blacklist_role['role'])
        if role:
            lines.append(f"{role.mention} - {blacklist_role['duration']}")

    if not lines:
        # Covers both an empty list and a list made up solely of stale
        # entries, so the command always replies instead of finishing silently.
        await ctx.send("No blacklist types set!")
        return

    embed = discord.Embed(
        title="Blacklist Types",
        description="\n".join(lines),
        color=await self.bot.get_embed_color(None),
    )
    await ctx.send(embed=embed)
|
|
|
|
@moderationset.command(name="ignorebots")
|
|
@checks.admin()
|
|
async def moderationset_ignorebots(self, ctx: commands.Context):
    """Toggle if the cog should ignore other bots' moderations."""
    # Flip the stored boolean, then report the value as read back from config.
    setting = self.config.guild(ctx.guild).ignore_other_bots
    flipped = not await setting()
    await setting.set(flipped)

    current = await setting()
    await ctx.send(f"Ignore bots setting set to {current}")
|
|
|
|
@moderationset.command(name="dm")
|
|
@checks.admin()
|
|
async def moderationset_dm(self, ctx: commands.Context):
    """Toggle automatically messaging moderated users.

    This option can be overridden by specifying the `silent` argument in any moderation command."""
    # Flip the stored boolean, then report the value as read back from config.
    setting = self.config.guild(ctx.guild).dm_users
    flipped = not await setting()
    await setting.set(flipped)

    current = await setting()
    await ctx.send(f"DM users setting set to {current}")
|
|
|
|
@moderationset.command(name="permissions")
|
|
@checks.admin()
|
|
async def moderationset_permissions(self, ctx: commands.Context):
    """Toggle whether the bot will check for discord permissions."""
    # Flip the stored boolean, then report the value as read back from config.
    setting = self.config.guild(ctx.guild).use_discord_permissions
    flipped = not await setting()
    await setting.set(flipped)

    current = await setting()
    await ctx.send(f"Use Discord Permissions setting set to {current}")
|
|
|
|
@moderationset.command(name="logchannel")
|
|
@checks.admin()
|
|
async def moderationset_logchannel(self, ctx: commands.Context, channel: discord.TextChannel = None):
    """Set a channel to log infractions to."""
    log_setting = self.config.guild(ctx.guild).log_channel

    if not channel:
        # Omitting the channel turns logging off; a single-space string is
        # the value this command stores to represent "no channel".
        await log_setting.set(" ")
        await ctx.send("Logging channel disabled.")
        return

    await log_setting.set(channel.id)
    await ctx.send(f"Logging channel set to {channel.mention}.")
|
|
|
|
@moderationset.command(name="mysql")
|
|
@checks.is_owner()
|
|
async def moderationset_mysql(self, ctx: commands.Context):
    """Configure MySQL connection details."""
    # Acknowledge in the invoking channel first.
    await ctx.message.add_reaction("✅")

    # The button that opens the credentials form is sent to the invoker by
    # direct message rather than posted in the channel.
    config_view = self.ConfigButtons(60)
    await ctx.author.send(
        content="Click the button below to configure your MySQL connection details.",
        view=config_view,
    )
|
|
|
|
class ConfigButtons(discord.ui.View):
    """View holding the "Edit" button that opens the MySQL configuration modal."""

    def __init__(self, timeout):
        # Forward the caller's timeout to the View; previously it was dropped
        # and the view fell back to discord.py's default timeout instead.
        super().__init__(timeout=timeout)
        # The view has no cog reference, so it opens the cog's config by
        # name and identifier.
        self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912)

    @discord.ui.button(label="Edit", style=discord.ButtonStyle.success)
    async def config_button(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument
        """Respond to the button press by showing the MySQL configuration modal."""
        await interaction.response.send_modal(Moderation.MySQLConfigModal(self.config))
|
|
|
|
class MySQLConfigModal(discord.ui.Modal, title="MySQL Database Configuration"):
    """Modal form for editing the stored MySQL connection details.

    All four fields are optional; a field left blank keeps its stored value.
    """

    def __init__(self, config):
        super().__init__()
        self.config = config

    address = discord.ui.TextInput(
        label="Address",
        placeholder="Input your MySQL address here.",
        style=discord.TextStyle.short,
        required=False,
        max_length=300
    )
    database = discord.ui.TextInput(
        label="Database",
        placeholder="Input the name of your database here.",
        style=discord.TextStyle.short,
        required=False,
        max_length=300
    )
    username = discord.ui.TextInput(
        label="Username",
        placeholder="Input your MySQL username here.",
        style=discord.TextStyle.short,
        required=False,
        max_length=300
    )
    password = discord.ui.TextInput(
        label="Password",
        placeholder="Input your MySQL password here.",
        style=discord.TextStyle.short,
        required=False,
        max_length=300
    )

    async def on_submit(self, interaction: discord.Interaction):
        """Save every non-blank field and reply with an ephemeral summary."""
        applied = []

        new_address = self.address.value
        if new_address != "":
            await self.config.mysql_address.set(new_address)
            applied.append(f"- Address set to\n - `{new_address}`\n")

        new_database = self.database.value
        if new_database != "":
            await self.config.mysql_database.set(new_database)
            applied.append(f"- Database set to\n - `{new_database}`\n")

        new_username = self.username.value
        if new_username != "":
            await self.config.mysql_username.set(new_username)
            applied.append(f"- Username set to\n - `{new_username}`\n")

        new_password = self.password.value
        if new_password != "":
            await self.config.mysql_password.set(new_password)
            # NOTE(review): only the first eight characters are echoed, which
            # means a password of eight characters or fewer is shown in full.
            password_preview = new_password[:8]
            applied.append(f"- Password set to\n - `{password_preview}` - Trimmed for security\n")

        if applied:
            send = "Configuration changed:\n" + "".join(applied)
        else:
            # Nothing was entered: show what is currently stored instead.
            password_preview = str(await self.config.mysql_password())[:8]
            current_address = await self.config.mysql_address()
            current_database = await self.config.mysql_database()
            current_username = await self.config.mysql_username()
            send = (
                "No changes were made.\nCurrent configuration:\n"
                f"- Address:\n - `{current_address}`\n"
                f"- Database:\n - `{current_database}`\n"
                f"- Username:\n - `{current_username}`\n"
                f"- Password:\n - `{password_preview}` - Trimmed for security"
            )

        await interaction.response.send_message(send, ephemeral=True)
|
|
|
|
@moderationset.group(autohelp=True, name='import')
|
|
@checks.admin()
|
|
async def moderationset_import(self, ctx: commands.Context):
    """Import moderations from other bots."""
    # Group entry point only: the body is intentionally empty. Each supported
    # source bot is a subcommand registered on this group (e.g. `galacticbot`).
|
|
|
|
@moderationset_import.command(name="galacticbot")
|
|
@checks.admin()
|
|
async def moderationset_import_galacticbot(self, ctx: commands.Context):
    """Import moderations from GalacticBot."""
    # Only the first attachment is considered, and it must carry exactly this
    # content type to be accepted as an export file.
    attachments = ctx.message.attachments
    has_export = bool(attachments) and attachments[0].content_type == 'application/json; charset=utf-8'
    if not has_export:
        await ctx.send("Please provide a valid GalacticBot moderation export file.")
        return

    # Send the warning first, then attach the Yes/No buttons to it so the
    # view can hold a reference to the prompt message.
    message = await ctx.send("Are you sure you want to import GalacticBot moderations?\n**This will overwrite any moderations that already exist in this guild's moderation table.**\n*The import process will block the rest of your bot until it is complete.*")
    confirm_view = self.GalacticBotImportButtons(60, ctx, message, self)
    await message.edit(view=confirm_view)
|
|
|
|
class GalacticBotImportButtons(discord.ui.View):
    """Yes/No confirmation for replacing a guild's moderation table with a GalacticBot export.

    Parameters
    ----------
    timeout:
        Seconds before the view stops accepting button presses.
    ctx:
        Context of the command invocation; its message carries the export file.
    message:
        The prompt message the buttons are attached to.
    cog_instance:
        The Moderation cog, used to reach its database helpers.
    """

    def __init__(self, timeout, ctx, message, cog_instance):
        # Forward the caller's timeout to the View; previously it was dropped
        # and the view fell back to discord.py's default timeout instead.
        super().__init__(timeout=timeout)
        self.cog_instance = cog_instance
        self.ctx: commands.Context = ctx
        self.message: discord.Message = message
        self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912)

    async def interaction_check(self, interaction: discord.Interaction) -> bool:
        """Allow only the user who invoked the import command to press the buttons."""
        # Confirming drops the guild's moderation table, so a bystander who
        # can merely see the prompt must not be able to trigger it.
        if interaction.user.id == self.ctx.author.id:
            return True
        await interaction.response.send_message("Only the user who started this import can use these buttons.", ephemeral=True)
        return False

    @discord.ui.button(label="Yes", style=discord.ButtonStyle.success)
    async def import_button_y(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument
        """Drop the guild's moderation table, recreate it and import every supported case."""
        await self.message.delete()
        await interaction.response.send_message("Deleting original table...", ephemeral=True)

        database = await Moderation.connect(self.cog_instance)
        try:
            cursor = database.cursor()

            query = f"DROP TABLE IF EXISTS moderation_{self.ctx.guild.id};"
            cursor.execute(query)

            cursor.close()
            database.commit()

            await interaction.edit_original_response(content="Creating new table...")

            await Moderation.create_guild_table(self.cog_instance, self.ctx.guild)

            await interaction.edit_original_response(content="Importing moderations...")

            # Cases of any other type are skipped without being reported.
            accepted_types = [
                'NOTE',
                'WARN',
                'MUTE',
                'UNMUTE',
                'KICK',
                'SOFTBAN',
                'BAN',
                'UNBAN'
            ]

            file = await self.ctx.message.attachments[0].read()
            # Import in ascending case-number order.
            data = sorted(json.loads(file), key=lambda x: x['case'])

            failed_cases = []

            for case in data:
                if case['type'] not in accepted_types:
                    continue

                # Export timestamps are divided by 1000 before being stored.
                timestamp = round(case['timestamp'] / 1000)

                try:
                    if case['duration'] is not None and float(case['duration']) != 0:
                        duration = timedelta(seconds=round(float(case['duration']) / 1000))
                    else:
                        # The string 'NULL' is what gets handed to mysql_log
                        # for cases without a duration.
                        duration = 'NULL'
                except OverflowError:
                    # Duration too large for a timedelta: report the case
                    # number at the end and carry on with the next case.
                    failed_cases.append(case['case'])
                    continue

                if case['resolved']:
                    resolved = 1
                    resolved_by = None
                    resolved_reason = None
                    resolved_timestamp = None
                    if case['changes']:
                        # Take the details from the first RESOLVE change.
                        for change in case['changes']:
                            if change['type'] == 'RESOLVE':
                                resolved_by = change['staff']
                                resolved_reason = change['reason']
                                resolved_timestamp = round(change['timestamp'] / 1000)
                                break
                    # Placeholders for whatever the export did not provide.
                    if resolved_by is None:
                        resolved_by = '?'
                    if resolved_reason is None:
                        resolved_reason = 'Could not get resolve reason during moderation import.'
                    if resolved_timestamp is None:
                        resolved_timestamp = timestamp
                    changes = [
                        {
                            'type': "ORIGINAL",
                            'reason': case['reason'],
                            'user_id': case['executor'],
                            'timestamp': timestamp
                        },
                        {
                            'type': "RESOLVE",
                            'reason': resolved_reason,
                            'user_id': resolved_by,
                            'timestamp': resolved_timestamp
                        }
                    ]
                else:
                    resolved = 0
                    resolved_by = 'NULL'
                    resolved_reason = 'NULL'
                    changes = []

                await Moderation.mysql_log(
                    self.cog_instance,
                    self.ctx.guild.id,
                    case['executor'],
                    case['type'],
                    case['target'],
                    0,
                    duration,
                    case['reason'],
                    timestamp=timestamp,
                    resolved=resolved,
                    resolved_by=resolved_by,
                    resolved_reason=resolved_reason,
                    changes=changes,
                    metadata={
                        'imported_from': 'GalacticBot'
                    },
                    database=database
                )
        finally:
            # The connection opened for the import was previously never
            # closed; release it whether or not the import succeeded.
            database.close()

        await interaction.edit_original_response(content="Import complete.")
        if failed_cases:
            await interaction.edit_original_response(content=f"Import complete.\n*Failed to import the following cases:*\n```{failed_cases}```")

    @discord.ui.button(label="No", style=discord.ButtonStyle.danger)
    async def import_button_n(self, interaction: discord.Interaction, button: discord.ui.Button): # pylint: disable=unused-argument
        """Cancel the import and clean up the prompt and the invoking message."""
        # Editing through the interaction response both updates the prompt
        # (the message these buttons are attached to) and acknowledges the
        # button press, so the client does not show "interaction failed".
        await interaction.response.edit_message(content="Import cancelled.", view=None)
        # `delay` is keyword-only in discord.py 2.x; passing it positionally
        # raised TypeError.
        await self.message.delete(delay=10)
        await self.ctx.message.delete(delay=10)
|
|
|
|
@commands.command(aliases=["tdc"])
|
|
async def timedeltaconvert(self, ctx: commands.Context, *, duration: str):
    """This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object.

    **Example usage**
    `[p]timedeltaconvert 1 day 15hr 82 minutes 52s`
    **Output**
    `1 day, 16:22:52`"""
    try:
        # raise_exception=True turns unparseable input into a ValueError
        # instead of a silent failure.
        converted = parse(duration, as_timedelta=True, raise_exception=True)
        reply = f"`{converted}`"
        await ctx.send(reply)
    except ValueError:
        await ctx.send("Please provide a convertible value!")
|