misc(moderation): moved a bunch of functionality to other files to reduce the length of the main file and improve code maintainability
This commit is contained in:
parent
209ca2998d
commit
75109e0834
6 changed files with 652 additions and 515 deletions
3
moderation/config.py
Normal file
3
moderation/config.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
from Redbot.core import Config
|
||||
|
||||
config = Config.get_conf(None, identifier=481923957134912, cog_name="Moderation")
|
221
moderation/database.py
Normal file
221
moderation/database.py
Normal file
|
@ -0,0 +1,221 @@
|
|||
import json
|
||||
import time
|
||||
import mysql.connector
|
||||
from datetime import datetime
|
||||
from discord import Guild
|
||||
from .utils import generate_dict, get_next_case_number, check_conf
|
||||
from .config import config
|
||||
from .logger import logger
|
||||
|
||||
|
||||
async def connect():
|
||||
"""Connects to the MySQL database, and returns a connection object."""
|
||||
conf = await check_conf(
|
||||
["mysql_address", "mysql_database", "mysql_username", "mysql_password"]
|
||||
)
|
||||
|
||||
if conf:
|
||||
raise LookupError("MySQL connection details not set properly!")
|
||||
|
||||
try:
|
||||
connection = mysql.connector.connect(
|
||||
host=await config.mysql_address(),
|
||||
user=await config.mysql_username(),
|
||||
password=await config.mysql_password(),
|
||||
database=await config.mysql_database(),
|
||||
)
|
||||
|
||||
return connection
|
||||
|
||||
except mysql.connector.ProgrammingError as e:
|
||||
logger.error("Unable to access the MySQL database!\nError:\n%s", e.msg)
|
||||
raise ConnectionRefusedError(
|
||||
f"Unable to access the MySQL Database!\n{e.msg}"
|
||||
) from e
|
||||
|
||||
|
||||
async def create_guild_table(guild: Guild):
|
||||
database = await connect()
|
||||
cursor = database.cursor()
|
||||
|
||||
try:
|
||||
cursor.execute(f"SELECT * FROM `moderation_{guild.id}`")
|
||||
logger.debug("MySQL Table exists for server %s (%s)", guild.name, guild.id)
|
||||
|
||||
except mysql.connector.errors.ProgrammingError:
|
||||
query = f"""
|
||||
CREATE TABLE `moderation_{guild.id}` (
|
||||
moderation_id INT UNIQUE PRIMARY KEY NOT NULL,
|
||||
timestamp INT NOT NULL,
|
||||
moderation_type LONGTEXT NOT NULL,
|
||||
target_id LONGTEXT NOT NULL,
|
||||
moderator_id LONGTEXT NOT NULL,
|
||||
role_id LONGTEXT,
|
||||
duration LONGTEXT,
|
||||
end_timestamp INT,
|
||||
reason LONGTEXT,
|
||||
resolved BOOL NOT NULL,
|
||||
resolved_by LONGTEXT,
|
||||
resolve_reason LONGTEXT,
|
||||
expired BOOL NOT NULL,
|
||||
changes JSON NOT NULL,
|
||||
metadata JSON NOT NULL
|
||||
)
|
||||
"""
|
||||
cursor.execute(query)
|
||||
|
||||
index_query_1 = "CREATE INDEX idx_target_id ON moderation_%s(target_id(25));"
|
||||
cursor.execute(index_query_1, (guild.id,))
|
||||
index_query_2 = (
|
||||
"CREATE INDEX idx_moderator_id ON moderation_%s(moderator_id(25));"
|
||||
)
|
||||
cursor.execute(index_query_2, (guild.id,))
|
||||
index_query_3 = (
|
||||
"CREATE INDEX idx_moderation_id ON moderation_%s(moderation_id);"
|
||||
)
|
||||
cursor.execute(index_query_3, (guild.id,))
|
||||
|
||||
insert_query = f"""
|
||||
INSERT INTO `moderation_{guild.id}`
|
||||
(moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||
"""
|
||||
insert_values = (
|
||||
0,
|
||||
0,
|
||||
"NULL",
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
"NULL",
|
||||
0,
|
||||
"NULL",
|
||||
0,
|
||||
"NULL",
|
||||
"NULL",
|
||||
0,
|
||||
json.dumps([]),
|
||||
json.dumps({}),
|
||||
)
|
||||
cursor.execute(insert_query, insert_values)
|
||||
|
||||
database.commit()
|
||||
|
||||
logger.debug(
|
||||
"MySQL Table (moderation_%s) created for %s (%s)",
|
||||
guild.id,
|
||||
guild.name,
|
||||
guild.id,
|
||||
)
|
||||
|
||||
database.close()
|
||||
|
||||
|
||||
async def mysql_log(
|
||||
guild_id: str,
|
||||
author_id: str,
|
||||
moderation_type: str,
|
||||
target_id: int,
|
||||
role_id: int,
|
||||
duration,
|
||||
reason: str,
|
||||
database: mysql.connector.MySQLConnection = None,
|
||||
timestamp: int = None,
|
||||
resolved: bool = False,
|
||||
resolved_by: str = None,
|
||||
resolved_reason: str = None,
|
||||
expired: bool = None,
|
||||
changes: list = [],
|
||||
metadata: dict = {},
|
||||
): # pylint: disable=dangerous-default-argument
|
||||
if not timestamp:
|
||||
timestamp = int(time.time())
|
||||
|
||||
if duration != "NULL":
|
||||
end_timedelta = datetime.fromtimestamp(timestamp) + duration
|
||||
end_timestamp = int(end_timedelta.timestamp())
|
||||
else:
|
||||
end_timestamp = 0
|
||||
|
||||
if not expired:
|
||||
if int(time.time()) > end_timestamp:
|
||||
expired = 1
|
||||
else:
|
||||
expired = 0
|
||||
|
||||
if resolved_by is None:
|
||||
resolved_by = "NULL"
|
||||
|
||||
if resolved_reason is None:
|
||||
resolved_reason = "NULL"
|
||||
|
||||
if not database:
|
||||
database = await connect()
|
||||
close_db = True
|
||||
else:
|
||||
close_db = False
|
||||
cursor = database.cursor()
|
||||
|
||||
moderation_id = await get_next_case_number(guild_id=guild_id, cursor=cursor)
|
||||
|
||||
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
|
||||
val = (
|
||||
moderation_id,
|
||||
timestamp,
|
||||
moderation_type,
|
||||
target_id,
|
||||
author_id,
|
||||
role_id,
|
||||
duration,
|
||||
end_timestamp,
|
||||
reason,
|
||||
int(resolved),
|
||||
resolved_by,
|
||||
resolved_reason,
|
||||
expired,
|
||||
json.dumps(changes),
|
||||
json.dumps(metadata),
|
||||
)
|
||||
cursor.execute(sql, val)
|
||||
|
||||
cursor.close()
|
||||
database.commit()
|
||||
if close_db:
|
||||
database.close()
|
||||
|
||||
logger.debug(
|
||||
"MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
|
||||
guild_id,
|
||||
moderation_id,
|
||||
timestamp,
|
||||
moderation_type,
|
||||
target_id,
|
||||
author_id,
|
||||
role_id,
|
||||
duration,
|
||||
end_timestamp,
|
||||
reason,
|
||||
int(resolved),
|
||||
resolved_by,
|
||||
resolved_reason,
|
||||
expired,
|
||||
changes,
|
||||
metadata,
|
||||
)
|
||||
|
||||
return moderation_id
|
||||
|
||||
|
||||
async def fetch_case(moderation_id: int, guild_id: str):
|
||||
"""This method fetches a case from the database and returns the case's dictionary."""
|
||||
database = await connect()
|
||||
cursor = database.cursor()
|
||||
|
||||
query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;"
|
||||
cursor.execute(query, (guild_id, moderation_id))
|
||||
result = cursor.fetchone()
|
||||
|
||||
cursor.close()
|
||||
database.close()
|
||||
|
||||
return generate_dict(result)
|
152
moderation/embed_factory.py
Normal file
152
moderation/embed_factory.py
Normal file
|
@ -0,0 +1,152 @@
|
|||
import humanize
|
||||
from datetime import datetime, timedelta
|
||||
from discord import Color, Embed, Guild, Interaction, InteractionMessage
|
||||
from .utils import get_next_case_number, fetch_user_dict
|
||||
|
||||
async def embed_factory(embed_type: str, color: Color, /, interaction: Interaction = None, case_dict: dict = None, guild: Guild = None, reason: str = None, moderation_type: str = None, response: InteractionMessage = None, duration: timedelta = None, resolved: bool = False):
|
||||
"""This method creates an embed from set parameters, meant for either moderation logging or contacting the moderated user.
|
||||
|
||||
Valid arguments for 'embed_type':
|
||||
- 'message'
|
||||
- 'log' - WIP
|
||||
- 'case'
|
||||
- 'changes'
|
||||
|
||||
Required arguments for 'message':
|
||||
- guild
|
||||
- reason
|
||||
- moderation_type
|
||||
- response
|
||||
- duration (optional)
|
||||
|
||||
Required arguments for 'log':
|
||||
- interaction
|
||||
- case_dict
|
||||
- resolved (optional)
|
||||
|
||||
Required arguments for 'case' & 'changes':
|
||||
- interaction
|
||||
- case_dict"""
|
||||
if embed_type == 'message':
|
||||
|
||||
if moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]:
|
||||
guild_name = guild.name
|
||||
else:
|
||||
guild_name = f"[{guild.name}]({response.jump_url})"
|
||||
|
||||
if moderation_type in ["tempbanned", "muted"] and duration:
|
||||
embed_duration = f" for {humanize.precisedelta(duration)}"
|
||||
else:
|
||||
embed_duration = ""
|
||||
|
||||
if moderation_type == "note":
|
||||
embed_desc = "received a"
|
||||
else:
|
||||
embed_desc = "been"
|
||||
|
||||
embed = Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=color, timestamp=datetime.now())
|
||||
embed.add_field(name='Reason', value=f"`{reason}`")
|
||||
embed.set_author(name=guild.name, icon_url=guild.icon.url)
|
||||
embed.set_footer(text=f"Case #{await get_next_case_number(guild.id):,}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&")
|
||||
|
||||
return embed
|
||||
|
||||
if embed_type == 'case':
|
||||
target_user = await fetch_user_dict(interaction, case_dict['target_id'])
|
||||
moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id'])
|
||||
|
||||
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
|
||||
moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`"
|
||||
|
||||
embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=color)
|
||||
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
|
||||
|
||||
if case_dict['duration'] != 'NULL':
|
||||
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))})
|
||||
duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if bool(case_dict['expired']) is False else str(humanize.precisedelta(td))
|
||||
embed.description += f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"
|
||||
|
||||
embed.description += f"\n**Changes:** {len(case_dict['changes'])}"
|
||||
|
||||
if case_dict['metadata']:
|
||||
if case_dict['metadata']['imported_from']:
|
||||
embed.description += f"\n**Imported From:** {case_dict['metadata']['imported_from']}"
|
||||
|
||||
embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)
|
||||
|
||||
if case_dict['resolved'] == 1:
|
||||
resolved_user = await fetch_user_dict(interaction, case_dict['resolved_by'])
|
||||
resolved_name = f"`{resolved_user['name']}`" if resolved_user['discriminator'] == "0" else f"`{resolved_user['name']}#{resolved_user['discriminator']}`"
|
||||
embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False)
|
||||
|
||||
return embed
|
||||
|
||||
if embed_type == 'changes':
|
||||
embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Changes", color=color)
|
||||
|
||||
memory_dict = {}
|
||||
|
||||
if case_dict['changes']:
|
||||
for change in case_dict['changes']:
|
||||
if change['user_id'] not in memory_dict:
|
||||
memory_dict[str(change['user_id'])] = await fetch_user_dict(interaction, change['user_id'])
|
||||
|
||||
user = memory_dict[str(change['user_id'])]
|
||||
name = user['name'] if user['discriminator'] == "0" else f"{user['name']}#{user['discriminator']}"
|
||||
|
||||
timestamp = f"<t:{change['timestamp']}> | <t:{change['timestamp']}:R>"
|
||||
|
||||
if change['type'] == 'ORIGINAL':
|
||||
embed.add_field(name='Original', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False)
|
||||
|
||||
elif change['type'] == 'EDIT':
|
||||
embed.add_field(name='Edit', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False)
|
||||
|
||||
elif change['type'] == 'RESOLVE':
|
||||
embed.add_field(name='Resolve', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False)
|
||||
|
||||
else:
|
||||
embed.description = "*No changes have been made to this case.* 🙁"
|
||||
|
||||
return embed
|
||||
|
||||
if embed_type == 'log':
|
||||
if resolved:
|
||||
target_user = await fetch_user_dict(interaction, case_dict['target_id'])
|
||||
moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id'])
|
||||
|
||||
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
|
||||
moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}"
|
||||
|
||||
embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Resolved", color=color)
|
||||
|
||||
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
|
||||
|
||||
if case_dict['duration'] != 'NULL':
|
||||
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))})
|
||||
duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if case_dict["expired"] == '0' else str(humanize.precisedelta(td))
|
||||
embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"
|
||||
|
||||
embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)
|
||||
|
||||
resolved_user = await fetch_user_dict(interaction, case_dict['resolved_by'])
|
||||
resolved_name = resolved_user['name'] if resolved_user['discriminator'] == "0" else f"{resolved_user['name']}#{resolved_user['discriminator']}"
|
||||
embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False)
|
||||
else:
|
||||
target_user = await fetch_user_dict(interaction, case_dict['target_id'])
|
||||
moderator_user = await fetch_user_dict(interaction, case_dict['moderator_id'])
|
||||
|
||||
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
|
||||
moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}"
|
||||
|
||||
embed = Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=color)
|
||||
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
|
||||
|
||||
if case_dict['duration'] != 'NULL':
|
||||
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))})
|
||||
embed.description = embed.description + f"\n**Duration:** {humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>"
|
||||
|
||||
embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)
|
||||
return embed
|
||||
|
||||
raise(TypeError("'type' argument is invalid!"))
|
3
moderation/logger.py
Normal file
3
moderation/logger.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
import logging
|
||||
|
||||
logger = logging.getLogger("red.sea.moderation")
|
|
@ -5,12 +5,10 @@
|
|||
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
|
||||
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
|
||||
|
||||
import logging
|
||||
import json
|
||||
import time
|
||||
import os
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Union
|
||||
import discord
|
||||
import humanize
|
||||
import mysql.connector
|
||||
|
@ -18,6 +16,10 @@ from discord.ext import tasks
|
|||
from pytimeparse2 import disable_dateutil, parse
|
||||
from redbot.core import app_commands, checks, Config, commands, data_manager
|
||||
from redbot.core.app_commands import Choice
|
||||
from .database import connect, create_guild_table, fetch_case, mysql_log
|
||||
from .embed_factory import embed_factory
|
||||
from .utils import check_conf, check_permissions, check_moddable, fetch_user_dict, generate_dict, log
|
||||
from .logger import logger
|
||||
|
||||
class Moderation(commands.Cog):
|
||||
"""Custom moderation cog.
|
||||
|
@ -27,7 +29,7 @@ class Moderation(commands.Cog):
|
|||
if requester == "discord_deleted_user":
|
||||
await self.config.user_from_id(user_id).clear()
|
||||
|
||||
database = await self.connect()
|
||||
database = await connect()
|
||||
cursor = database.cursor()
|
||||
|
||||
cursor.execute("SHOW TABLES;")
|
||||
|
@ -49,7 +51,7 @@ class Moderation(commands.Cog):
|
|||
if requester == "user_strict":
|
||||
await self.config.user_from_id(user_id).clear()
|
||||
else:
|
||||
self.logger.warning("Invalid requester passed to red_delete_data_for_user: %s", requester)
|
||||
logger.warning("Invalid requester passed to red_delete_data_for_user: %s", requester)
|
||||
|
||||
def __init__(self, bot):
|
||||
self.bot = bot
|
||||
|
@ -80,11 +82,10 @@ class Moderation(commands.Cog):
|
|||
)
|
||||
disable_dateutil()
|
||||
self.handle_expiry.start() # pylint: disable=no-member
|
||||
self.logger = logging.getLogger('red.sea.moderation')
|
||||
|
||||
async def cog_load(self):
|
||||
"""This method prepares the database schema for all of the guilds the bot is currently in."""
|
||||
conf = await self.check_conf([
|
||||
conf = await check_conf([
|
||||
'mysql_address',
|
||||
'mysql_database',
|
||||
'mysql_username',
|
||||
|
@ -92,7 +93,7 @@ class Moderation(commands.Cog):
|
|||
])
|
||||
|
||||
if conf:
|
||||
self.logger.error("Failed to create tables, due to MySQL connection configuration being unset.")
|
||||
logger.error("Failed to create tables, due to MySQL connection configuration being unset.")
|
||||
return
|
||||
|
||||
guilds: list[discord.Guild] = self.bot.guilds
|
||||
|
@ -112,7 +113,7 @@ class Moderation(commands.Cog):
|
|||
async def db_generate_guild_join(self, guild: discord.Guild):
|
||||
"""This method prepares the database schema whenever the bot joins a guild."""
|
||||
if not await self.bot.cog_disabled_in_guild(self, guild):
|
||||
conf = await self.check_conf([
|
||||
conf = await check_conf([
|
||||
'mysql_address',
|
||||
'mysql_database',
|
||||
'mysql_username',
|
||||
|
@ -120,11 +121,11 @@ class Moderation(commands.Cog):
|
|||
|
||||
])
|
||||
if conf:
|
||||
self.logger.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id)
|
||||
logger.error("Failed to create a table for %s, due to MySQL connection configuration being unset.", guild.id)
|
||||
return
|
||||
|
||||
try:
|
||||
await self.create_guild_table(guild)
|
||||
await create_guild_table(guild)
|
||||
|
||||
except ConnectionRefusedError:
|
||||
return
|
||||
|
@ -169,446 +170,7 @@ class Moderation(commands.Cog):
|
|||
else:
|
||||
return
|
||||
|
||||
await self.mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, 0, duration, reason)
|
||||
|
||||
async def connect(self):
|
||||
"""Connects to the MySQL database, and returns a connection object."""
|
||||
conf = await self.check_conf([
|
||||
'mysql_address',
|
||||
'mysql_database',
|
||||
'mysql_username',
|
||||
'mysql_password'
|
||||
])
|
||||
|
||||
if conf:
|
||||
raise LookupError("MySQL connection details not set properly!")
|
||||
|
||||
try:
|
||||
connection = mysql.connector.connect(
|
||||
host=await self.config.mysql_address(),
|
||||
user=await self.config.mysql_username(),
|
||||
password=await self.config.mysql_password(),
|
||||
database=await self.config.mysql_database()
|
||||
)
|
||||
|
||||
return connection
|
||||
|
||||
except mysql.connector.ProgrammingError as e:
|
||||
self.logger.error("Unable to access the MySQL database!\nError:\n%s", e.msg)
|
||||
raise ConnectionRefusedError(f"Unable to access the MySQL Database!\n{e.msg}") from e
|
||||
|
||||
async def create_guild_table(self, guild: discord.Guild):
|
||||
database = await self.connect()
|
||||
cursor = database.cursor()
|
||||
|
||||
try:
|
||||
cursor.execute(f"SELECT * FROM `moderation_{guild.id}`")
|
||||
self.logger.debug("MySQL Table exists for server %s (%s)", guild.name, guild.id)
|
||||
|
||||
except mysql.connector.errors.ProgrammingError:
|
||||
query = f"""
|
||||
CREATE TABLE `moderation_{guild.id}` (
|
||||
moderation_id INT UNIQUE PRIMARY KEY NOT NULL,
|
||||
timestamp INT NOT NULL,
|
||||
moderation_type LONGTEXT NOT NULL,
|
||||
target_id LONGTEXT NOT NULL,
|
||||
moderator_id LONGTEXT NOT NULL,
|
||||
role_id LONGTEXT,
|
||||
duration LONGTEXT,
|
||||
end_timestamp INT,
|
||||
reason LONGTEXT,
|
||||
resolved BOOL NOT NULL,
|
||||
resolved_by LONGTEXT,
|
||||
resolve_reason LONGTEXT,
|
||||
expired BOOL NOT NULL,
|
||||
changes JSON NOT NULL,
|
||||
metadata JSON NOT NULL
|
||||
)
|
||||
"""
|
||||
cursor.execute(query)
|
||||
|
||||
index_query_1 = "CREATE INDEX idx_target_id ON moderation_%s(target_id(25));"
|
||||
cursor.execute(index_query_1, (guild.id,))
|
||||
index_query_2 = "CREATE INDEX idx_moderator_id ON moderation_%s(moderator_id(25));"
|
||||
cursor.execute(index_query_2, (guild.id,))
|
||||
index_query_3 = "CREATE INDEX idx_moderation_id ON moderation_%s(moderation_id);"
|
||||
cursor.execute(index_query_3, (guild.id,))
|
||||
|
||||
insert_query = f"""
|
||||
INSERT INTO `moderation_{guild.id}`
|
||||
(moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
|
||||
"""
|
||||
insert_values = (0, 0, "NULL", 0, 0, 0, "NULL", 0, "NULL", 0, "NULL", "NULL", 0, json.dumps([]), json.dumps({}))
|
||||
cursor.execute(insert_query, insert_values)
|
||||
|
||||
database.commit()
|
||||
|
||||
self.logger.debug("MySQL Table (moderation_%s) created for %s (%s)", guild.id, guild.name, guild.id)
|
||||
|
||||
database.close()
|
||||
|
||||
async def check_conf(self, config: list):
|
||||
"""Checks if any required config options are not set."""
|
||||
not_found_list = []
|
||||
|
||||
for item in config:
|
||||
if await self.config.item() == " ":
|
||||
not_found_list.append(item)
|
||||
|
||||
return not_found_list
|
||||
|
||||
def check_permissions(self, user: discord.User, permissions: list, ctx: Union[commands.Context, discord.Interaction] = None, guild: discord.Guild = None):
|
||||
"""Checks if a user has a specific permission (or a list of permissions) in a channel."""
|
||||
if ctx:
|
||||
member = ctx.guild.get_member(user.id)
|
||||
resolved_permissions = ctx.channel.permissions_for(member)
|
||||
|
||||
elif guild:
|
||||
member = guild.get_member(user.id)
|
||||
resolved_permissions = member.guild_permissions
|
||||
|
||||
else:
|
||||
raise(KeyError)
|
||||
|
||||
for permission in permissions:
|
||||
if not getattr(resolved_permissions, permission, False) and not resolved_permissions.administrator is True:
|
||||
return permission
|
||||
|
||||
return False
|
||||
|
||||
async def check_moddable(self, target: Union[discord.User, discord.Member], interaction: discord.Interaction, permissions: list):
|
||||
"""Checks if a moderator can moderate a target."""
|
||||
if self.check_permissions(interaction.client.user, permissions, guild=interaction.guild):
|
||||
await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
|
||||
return False
|
||||
|
||||
if await self.config.guild(interaction.guild).use_discord_permissions() is True:
|
||||
if self.check_permissions(interaction.user, permissions, guild=interaction.guild):
|
||||
await interaction.response.send_message(f"You do not have the `{permissions}` permission, required for this action.", ephemeral=True)
|
||||
return False
|
||||
|
||||
if interaction.user.id == target.id:
|
||||
await interaction.response.send_message(content="You cannot moderate yourself!", ephemeral=True)
|
||||
return False
|
||||
|
||||
if target.bot:
|
||||
await interaction.response.send_message(content="You cannot moderate bots!", ephemeral=True)
|
||||
return False
|
||||
|
||||
if isinstance(target, discord.Member):
|
||||
if interaction.user.top_role <= target.top_role:
|
||||
await interaction.response.send_message(content="You cannot moderate members with a higher role than you!", ephemeral=True)
|
||||
return False
|
||||
|
||||
if interaction.guild.get_member(interaction.client.user.id).top_role <= target.top_role:
|
||||
await interaction.response.send_message(content="You cannot moderate members with a role higher than the bot!", ephemeral=True)
|
||||
return False
|
||||
|
||||
immune_roles = await self.config.guild(target.guild).immune_roles()
|
||||
|
||||
for role in target.roles:
|
||||
if role.id in immune_roles:
|
||||
await interaction.response.send_message(content="You cannot moderate members with an immune role!", ephemeral=True)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
async def mysql_log(self, guild_id: str, author_id: str, moderation_type: str, target_id: int, role_id: int, duration, reason: str, database: mysql.connector.MySQLConnection = None, timestamp: int = None, resolved: bool = False, resolved_by: str = None, resolved_reason: str = None, expired: bool = None, changes: list = [], metadata: dict = {}): # pylint: disable=dangerous-default-argument
|
||||
if not timestamp:
|
||||
timestamp = int(time.time())
|
||||
|
||||
if duration != "NULL":
|
||||
end_timedelta = datetime.fromtimestamp(timestamp) + duration
|
||||
end_timestamp = int(end_timedelta.timestamp())
|
||||
else:
|
||||
end_timestamp = 0
|
||||
|
||||
if not expired:
|
||||
if int(time.time()) > end_timestamp:
|
||||
expired = 1
|
||||
else:
|
||||
expired = 0
|
||||
|
||||
if resolved_by is None:
|
||||
resolved_by = "NULL"
|
||||
|
||||
if resolved_reason is None:
|
||||
resolved_reason = "NULL"
|
||||
|
||||
if not database:
|
||||
database = await self.connect()
|
||||
close_db = True
|
||||
else:
|
||||
close_db = False
|
||||
cursor = database.cursor()
|
||||
|
||||
moderation_id = await self.get_next_case_number(guild_id=guild_id, cursor=cursor)
|
||||
|
||||
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
|
||||
val = (moderation_id, timestamp, moderation_type, target_id, author_id, role_id, duration, end_timestamp, reason, int(resolved), resolved_by, resolved_reason, expired, json.dumps(changes), json.dumps(metadata))
|
||||
cursor.execute(sql, val)
|
||||
|
||||
cursor.close()
|
||||
database.commit()
|
||||
if close_db:
|
||||
database.close()
|
||||
|
||||
self.logger.debug("MySQL row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s", guild_id, moderation_id, timestamp, moderation_type, target_id, author_id, role_id, duration, end_timestamp, reason, int(resolved), resolved_by, resolved_reason, expired, changes, metadata)
|
||||
|
||||
return moderation_id
|
||||
|
||||
async def get_next_case_number(self, guild_id: str, cursor = None):
|
||||
"""This method returns the next case number from the MySQL table for a specific guild."""
|
||||
if not cursor:
|
||||
database = await self.connect()
|
||||
cursor = database.cursor()
|
||||
cursor.execute(f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1")
|
||||
return cursor.fetchone()[0] + 1
|
||||
|
||||
def generate_dict(self, result):
|
||||
case: dict = {
|
||||
"moderation_id": result[0],
|
||||
"timestamp": result[1],
|
||||
"moderation_type": result[2],
|
||||
"target_id": result[3],
|
||||
"moderator_id": result[4],
|
||||
"role_id": result[5],
|
||||
"duration": result[6],
|
||||
"end_timestamp": result[7],
|
||||
"reason": result[8],
|
||||
"resolved": result[9],
|
||||
"resolved_by": result[10],
|
||||
"resolve_reason": result[11],
|
||||
"expired": result[12],
|
||||
"changes": json.loads(result[13]),
|
||||
"metadata": json.loads(result[14])
|
||||
}
|
||||
return case
|
||||
|
||||
async def fetch_user_dict(self, interaction: discord.Interaction, user_id: str):
|
||||
"""This method returns a dictionary containing either user information or a standard deleted user template."""
|
||||
if user_id == '?':
|
||||
user_dict = {
|
||||
'id': '?',
|
||||
'name': 'Unknown User',
|
||||
'discriminator':'0'
|
||||
}
|
||||
|
||||
else:
|
||||
try:
|
||||
user = interaction.client.get_user(user_id)
|
||||
if user is None:
|
||||
user = await interaction.client.fetch_user(user_id)
|
||||
|
||||
user_dict = {
|
||||
'id': user.id,
|
||||
'name': user.name,
|
||||
'discriminator': user.discriminator
|
||||
}
|
||||
|
||||
except discord.errors.NotFound:
|
||||
user_dict = {
|
||||
'id': user_id,
|
||||
'name': 'Deleted User',
|
||||
'discriminator': '0'
|
||||
}
|
||||
|
||||
return user_dict
|
||||
|
||||
async def fetch_role_dict(self, interaction: discord.Interaction, role_id: str):
|
||||
"""This method returns a dictionary containing either role information or a standard deleted role template."""
|
||||
try:
|
||||
role = interaction.guild.get_role(role_id)
|
||||
|
||||
role_dict = {
|
||||
'id': role.id,
|
||||
'name': role.name
|
||||
}
|
||||
|
||||
except discord.errors.NotFound:
|
||||
role_dict = {
|
||||
'id': role_id,
|
||||
'name': 'Deleted Role'
|
||||
}
|
||||
|
||||
return role_dict
|
||||
|
||||
async def embed_factory(self, embed_type: str, /, interaction: discord.Interaction = None, case_dict: dict = None, guild: discord.Guild = None, reason: str = None, moderation_type: str = None, response: discord.InteractionMessage = None, duration: timedelta = None, resolved: bool = False):
|
||||
"""This method creates an embed from set parameters, meant for either moderation logging or contacting the moderated user.
|
||||
|
||||
Valid arguments for 'embed_type':
|
||||
- 'message'
|
||||
- 'log' - WIP
|
||||
- 'case'
|
||||
- 'changes'
|
||||
|
||||
Required arguments for 'message':
|
||||
- guild
|
||||
- reason
|
||||
- moderation_type
|
||||
- response
|
||||
- duration (optional)
|
||||
|
||||
Required arguments for 'log':
|
||||
- interaction
|
||||
- case_dict
|
||||
- resolved (optional)
|
||||
|
||||
Required arguments for 'case' & 'changes':
|
||||
- interaction
|
||||
- case_dict"""
|
||||
if embed_type == 'message':
|
||||
|
||||
if moderation_type in ["kicked", "banned", "tempbanned", "unbanned"]:
|
||||
guild_name = guild.name
|
||||
else:
|
||||
guild_name = f"[{guild.name}]({response.jump_url})"
|
||||
|
||||
if moderation_type in ["tempbanned", "muted"] and duration:
|
||||
embed_duration = f" for {humanize.precisedelta(duration)}"
|
||||
else:
|
||||
embed_duration = ""
|
||||
|
||||
if moderation_type == "note":
|
||||
embed_desc = "received a"
|
||||
else:
|
||||
embed_desc = "been"
|
||||
|
||||
embed = discord.Embed(title=str.title(moderation_type), description=f"You have {embed_desc} {moderation_type}{embed_duration} in {guild_name}.", color=await self.bot.get_embed_color(None), timestamp=datetime.now())
|
||||
embed.add_field(name='Reason', value=f"`{reason}`")
|
||||
embed.set_author(name=guild.name, icon_url=guild.icon.url)
|
||||
embed.set_footer(text=f"Case #{await self.get_next_case_number(guild.id):,}", icon_url="https://cdn.discordapp.com/attachments/1070822161389994054/1159469476773904414/arrow-right-circle-icon-512x512-2p1e2aaw.png?ex=65312319&is=651eae19&hm=3cebdd28e805c13a79ec48ef87c32ca532ffa6b9ede2e48d0cf8e5e81f3a6818&")
|
||||
|
||||
return embed
|
||||
|
||||
if embed_type == 'case':
|
||||
target_user = await self.fetch_user_dict(interaction, case_dict['target_id'])
|
||||
moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id'])
|
||||
|
||||
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
|
||||
moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`"
|
||||
|
||||
embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=await self.bot.get_embed_color(None))
|
||||
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Resolved:** {bool(case_dict['resolved'])}\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
|
||||
|
||||
if case_dict['duration'] != 'NULL':
|
||||
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))})
|
||||
duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if bool(case_dict['expired']) is False else str(humanize.precisedelta(td))
|
||||
embed.description += f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"
|
||||
|
||||
embed.description += f"\n**Changes:** {len(case_dict['changes'])}"
|
||||
|
||||
if case_dict['metadata']:
|
||||
if case_dict['metadata']['imported_from']:
|
||||
embed.description += f"\n**Imported From:** {case_dict['metadata']['imported_from']}"
|
||||
|
||||
embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)
|
||||
|
||||
if case_dict['resolved'] == 1:
|
||||
resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by'])
|
||||
resolved_name = f"`{resolved_user['name']}`" if resolved_user['discriminator'] == "0" else f"`{resolved_user['name']}#{resolved_user['discriminator']}`"
|
||||
embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False)
|
||||
|
||||
return embed
|
||||
|
||||
if embed_type == 'changes':
|
||||
embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Changes", color=await self.bot.get_embed_color(None))
|
||||
|
||||
memory_dict = {}
|
||||
|
||||
if case_dict['changes']:
|
||||
for change in case_dict['changes']:
|
||||
if change['user_id'] not in memory_dict:
|
||||
memory_dict[str(change['user_id'])] = await self.fetch_user_dict(interaction, change['user_id'])
|
||||
|
||||
user = memory_dict[str(change['user_id'])]
|
||||
name = user['name'] if user['discriminator'] == "0" else f"{user['name']}#{user['discriminator']}"
|
||||
|
||||
timestamp = f"<t:{change['timestamp']}> | <t:{change['timestamp']}:R>"
|
||||
|
||||
if change['type'] == 'ORIGINAL':
|
||||
embed.add_field(name='Original', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False)
|
||||
|
||||
elif change['type'] == 'EDIT':
|
||||
embed.add_field(name='Edit', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False)
|
||||
|
||||
elif change['type'] == 'RESOLVE':
|
||||
embed.add_field(name='Resolve', value=f"**User:** `{name}` ({user['id']})\n**Reason:** {change['reason']}\n**Timestamp:** {timestamp}", inline=False)
|
||||
|
||||
else:
|
||||
embed.description = "*No changes have been made to this case.* 🙁"
|
||||
|
||||
return embed
|
||||
|
||||
if embed_type == 'log':
|
||||
if resolved:
|
||||
target_user = await self.fetch_user_dict(interaction, case_dict['target_id'])
|
||||
moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id'])
|
||||
|
||||
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
|
||||
moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}"
|
||||
|
||||
embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']:,} Resolved", color=await self.bot.get_embed_color(None))
|
||||
|
||||
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
|
||||
|
||||
if case_dict['duration'] != 'NULL':
|
||||
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))})
|
||||
duration_embed = f"{humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>" if case_dict["expired"] == '0' else str(humanize.precisedelta(td))
|
||||
embed.description = embed.description + f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"
|
||||
|
||||
embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)
|
||||
|
||||
resolved_user = await self.fetch_user_dict(interaction, case_dict['resolved_by'])
|
||||
resolved_name = resolved_user['name'] if resolved_user['discriminator'] == "0" else f"{resolved_user['name']}#{resolved_user['discriminator']}"
|
||||
embed.add_field(name='Resolve Reason', value=f"Resolved by {resolved_name} ({resolved_user['id']}) for:\n```{case_dict['resolve_reason']}```", inline=False)
|
||||
else:
|
||||
target_user = await self.fetch_user_dict(interaction, case_dict['target_id'])
|
||||
moderator_user = await self.fetch_user_dict(interaction, case_dict['moderator_id'])
|
||||
|
||||
target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
|
||||
moderator_name = moderator_user['name'] if moderator_user['discriminator'] == "0" else f"{moderator_user['name']}#{moderator_user['discriminator']}"
|
||||
|
||||
embed = discord.Embed(title=f"📕 Case #{case_dict['moderation_id']:,}", color=await self.bot.get_embed_color(None))
|
||||
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
|
||||
|
||||
if case_dict['duration'] != 'NULL':
|
||||
td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case_dict["duration"].split(":"))})
|
||||
embed.description = embed.description + f"\n**Duration:** {humanize.precisedelta(td)} | <t:{case_dict['end_timestamp']}:R>"
|
||||
|
||||
embed.add_field(name='Reason', value=f"```{case_dict['reason']}```", inline=False)
|
||||
return embed
|
||||
|
||||
raise(TypeError("'type' argument is invalid!"))
|
||||
|
||||
async def fetch_case(self, moderation_id: int, guild_id: str):
|
||||
"""This method fetches a case from the database and returns the case's dictionary."""
|
||||
database = await self.connect()
|
||||
cursor = database.cursor()
|
||||
|
||||
query = "SELECT * FROM moderation_%s WHERE moderation_id = %s;"
|
||||
cursor.execute(query, (guild_id, moderation_id))
|
||||
result = cursor.fetchone()
|
||||
|
||||
cursor.close()
|
||||
database.close()
|
||||
|
||||
return self.generate_dict(result)
|
||||
|
||||
async def log(self, interaction: discord.Interaction, moderation_id: int, resolved: bool = False):
|
||||
"""This method sends a message to the guild's configured logging channel when an infraction takes place."""
|
||||
logging_channel_id = await self.config.guild(interaction.guild).log_channel()
|
||||
if logging_channel_id != " ":
|
||||
logging_channel = interaction.guild.get_channel(logging_channel_id)
|
||||
|
||||
case = await self.fetch_case(moderation_id, interaction.guild.id)
|
||||
if case:
|
||||
embed = await self.embed_factory('log', interaction=interaction, case_dict=case, resolved=resolved)
|
||||
try:
|
||||
await logging_channel.send(embed=embed)
|
||||
except discord.errors.Forbidden:
|
||||
return
|
||||
await mysql_log(entry.guild.id, entry.user.id, moderation_type, entry.target.id, 0, duration, reason)
|
||||
|
||||
#######################################################################################################################
|
||||
### COMMANDS
|
||||
|
@ -626,7 +188,7 @@ class Moderation(commands.Cog):
|
|||
Why are you noting this user?
|
||||
silent: bool
|
||||
Should the user be messaged?"""
|
||||
if not await self.check_moddable(target, interaction, ['moderate_members']):
|
||||
if not await check_moddable(target, interaction, ['moderate_members']):
|
||||
return
|
||||
|
||||
await interaction.response.send_message(content=f"{target.mention} has recieved a note!\n**Reason** - `{reason}`")
|
||||
|
@ -635,14 +197,14 @@ class Moderation(commands.Cog):
|
|||
silent = not await self.config.guild(interaction.guild).dm_users()
|
||||
if silent is False:
|
||||
try:
|
||||
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='note', response=await interaction.original_response())
|
||||
embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='note', response=await interaction.original_response())
|
||||
await target.send(embed=embed)
|
||||
except discord.errors.HTTPException:
|
||||
pass
|
||||
|
||||
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', target.id, 0, 'NULL', reason)
|
||||
moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', target.id, 0, 'NULL', reason)
|
||||
await interaction.edit_original_response(content=f"{target.mention} has received a note! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
|
||||
await self.log(interaction, moderation_id)
|
||||
await log(interaction, moderation_id)
|
||||
|
||||
@app_commands.command(name="warn")
|
||||
async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
|
||||
|
@ -656,7 +218,7 @@ class Moderation(commands.Cog):
|
|||
Why are you warning this user?
|
||||
silent: bool
|
||||
Should the user be messaged?"""
|
||||
if not await self.check_moddable(target, interaction, ['moderate_members']):
|
||||
if not await check_moddable(target, interaction, ['moderate_members']):
|
||||
return
|
||||
|
||||
await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`")
|
||||
|
@ -665,14 +227,14 @@ class Moderation(commands.Cog):
|
|||
silent = not await self.config.guild(interaction.guild).dm_users()
|
||||
if silent is False:
|
||||
try:
|
||||
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='warned', response=await interaction.original_response())
|
||||
embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='warned', response=await interaction.original_response())
|
||||
await target.send(embed=embed)
|
||||
except discord.errors.HTTPException:
|
||||
pass
|
||||
|
||||
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'WARN', target.id, 0, 'NULL', reason)
|
||||
moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'WARN', target.id, 0, 'NULL', reason)
|
||||
await interaction.edit_original_response(content=f"{target.mention} has been warned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
|
||||
await self.log(interaction, moderation_id)
|
||||
await log(interaction, moderation_id)
|
||||
|
||||
async def blacklist_autocomplete(self, interaction: discord.Interaction, current: str,) -> list[app_commands.Choice[str]]:
|
||||
"""Autocompletes a blacklist role."""
|
||||
|
@ -715,7 +277,7 @@ class Moderation(commands.Cog):
|
|||
await interaction.response.send_message(content=f"Please provide a valid blacklist type!", ephemeral=True)
|
||||
return
|
||||
|
||||
if not await self.check_moddable(target, interaction, ['moderate_members', 'manage_roles']):
|
||||
if not await check_moddable(target, interaction, ['moderate_members', 'manage_roles']):
|
||||
return
|
||||
|
||||
if role in [role.id for role in target.roles]:
|
||||
|
@ -726,9 +288,9 @@ class Moderation(commands.Cog):
|
|||
await target.add_roles(role, reason=f"Blacklisted by {interaction.user.id} for {humanize.precisedelta(matching_role['duration'])} for: {reason}")
|
||||
await interaction.response.send_message(content=f"{target.mention} has been blacklisted with the {role_obj.name} role for {humanize.precisedelta(matching_role['duration'])}!\n**Reason** - `{reason}`")
|
||||
|
||||
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'BLACKLIST', target.id, role, matching_role['duration'], reason)
|
||||
moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'BLACKLIST', target.id, role, matching_role['duration'], reason)
|
||||
await interaction.edit_original_response(content=f"{target.mention} has been blacklisted with the {role_obj.name} role for {humanize.precisedelta(matching_role['duration'])}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
|
||||
await self.log(interaction, moderation_id)
|
||||
await log(interaction, moderation_id)
|
||||
|
||||
@app_commands.command(name="mute")
|
||||
async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str, silent: bool = None):
|
||||
|
@ -744,7 +306,7 @@ class Moderation(commands.Cog):
|
|||
Why are you unbanning this user?
|
||||
silent: bool
|
||||
Should the user be messaged?"""
|
||||
if not await self.check_moddable(target, interaction, ['moderate_members']):
|
||||
if not await check_moddable(target, interaction, ['moderate_members']):
|
||||
return
|
||||
|
||||
if target.is_timed_out() is True:
|
||||
|
@ -769,14 +331,14 @@ class Moderation(commands.Cog):
|
|||
silent = not await self.config.guild(interaction.guild).dm_users()
|
||||
if silent is False:
|
||||
try:
|
||||
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='muted', response=await interaction.original_response(), duration=parsed_time)
|
||||
embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='muted', response=await interaction.original_response(), duration=parsed_time)
|
||||
await target.send(embed=embed)
|
||||
except discord.errors.HTTPException:
|
||||
pass
|
||||
|
||||
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', target.id, 0, parsed_time, reason)
|
||||
moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', target.id, 0, parsed_time, reason)
|
||||
await interaction.edit_original_response(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
|
||||
await self.log(interaction, moderation_id)
|
||||
await log(interaction, moderation_id)
|
||||
|
||||
@app_commands.command(name="unmute")
|
||||
async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None, silent: bool = None):
|
||||
|
@ -790,7 +352,7 @@ class Moderation(commands.Cog):
|
|||
Why are you unmuting this user?
|
||||
silent: bool
|
||||
Should the user be messaged?"""
|
||||
if not await self.check_moddable(target, interaction, ['moderate_members']):
|
||||
if not await check_moddable(target, interaction, ['moderate_members']):
|
||||
return
|
||||
|
||||
if target.is_timed_out() is False:
|
||||
|
@ -809,14 +371,14 @@ class Moderation(commands.Cog):
|
|||
silent = not await self.config.guild(interaction.guild).dm_users()
|
||||
if silent is False:
|
||||
try:
|
||||
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='unmuted', response=await interaction.original_response())
|
||||
embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='unmuted', response=await interaction.original_response())
|
||||
await target.send(embed=embed)
|
||||
except discord.errors.HTTPException:
|
||||
pass
|
||||
|
||||
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', target.id, 0, 'NULL', reason)
|
||||
moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', target.id, 0, 'NULL', reason)
|
||||
await interaction.edit_original_response(content=f"{target.mention} has been unmuted! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
|
||||
await self.log(interaction, moderation_id)
|
||||
await log(interaction, moderation_id)
|
||||
|
||||
@app_commands.command(name="kick")
|
||||
async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
|
||||
|
@ -830,7 +392,7 @@ class Moderation(commands.Cog):
|
|||
Why are you kicking this user?
|
||||
silent: bool
|
||||
Should the user be messaged?"""
|
||||
if not await self.check_moddable(target, interaction, ['kick_members']):
|
||||
if not await check_moddable(target, interaction, ['kick_members']):
|
||||
return
|
||||
|
||||
await interaction.response.send_message(content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`")
|
||||
|
@ -839,16 +401,16 @@ class Moderation(commands.Cog):
|
|||
silent = not await self.config.guild(interaction.guild).dm_users()
|
||||
if silent is False:
|
||||
try:
|
||||
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='kicked', response=await interaction.original_response())
|
||||
embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='kicked', response=await interaction.original_response())
|
||||
await target.send(embed=embed)
|
||||
except discord.errors.HTTPException:
|
||||
pass
|
||||
|
||||
await target.kick(reason=f"Kicked by {interaction.user.id} for: {reason}")
|
||||
|
||||
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'KICK', target.id, 0, 'NULL', reason)
|
||||
moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'KICK', target.id, 0, 'NULL', reason)
|
||||
await interaction.edit_original_response(content=f"{target.mention} has been kicked! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
|
||||
await self.log(interaction, moderation_id)
|
||||
await log(interaction, moderation_id)
|
||||
|
||||
@app_commands.command(name="ban")
|
||||
@app_commands.choices(delete_messages=[
|
||||
|
@ -874,7 +436,7 @@ class Moderation(commands.Cog):
|
|||
How many days of messages to delete?
|
||||
silent: bool
|
||||
Should the user be messaged?"""
|
||||
if not await self.check_moddable(target, interaction, ['ban_members']):
|
||||
if not await check_moddable(target, interaction, ['ban_members']):
|
||||
return
|
||||
|
||||
try:
|
||||
|
@ -894,16 +456,16 @@ class Moderation(commands.Cog):
|
|||
await interaction.response.send_message(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")
|
||||
|
||||
try:
|
||||
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='tempbanned', response=await interaction.original_response(), duration=parsed_time)
|
||||
embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='tempbanned', response=await interaction.original_response(), duration=parsed_time)
|
||||
await target.send(embed=embed)
|
||||
except discord.errors.HTTPException:
|
||||
pass
|
||||
|
||||
await interaction.guild.ban(target, reason=f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})", delete_message_seconds=delete_messages)
|
||||
|
||||
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'TEMPBAN', target.id, 0, parsed_time, reason)
|
||||
moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'TEMPBAN', target.id, 0, parsed_time, reason)
|
||||
await interaction.edit_original_response(content=f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id}`)\n**Reason** - `{reason}`")
|
||||
await self.log(interaction, moderation_id)
|
||||
await log(interaction, moderation_id)
|
||||
else:
|
||||
await interaction.response.send_message(content=f"{target.mention} has been banned!\n**Reason** - `{reason}`")
|
||||
|
||||
|
@ -911,7 +473,7 @@ class Moderation(commands.Cog):
|
|||
silent = not await self.config.guild(interaction.guild).dm_users()
|
||||
if silent is False:
|
||||
try:
|
||||
embed = embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='banned', response=await interaction.original_response())
|
||||
embed = embed = await embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='banned', response=await interaction.original_response())
|
||||
await target.send(embed=embed)
|
||||
except discord.errors.HTTPException:
|
||||
pass
|
||||
|
@ -920,7 +482,7 @@ class Moderation(commands.Cog):
|
|||
|
||||
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'BAN', target.id, 0, 'NULL', reason)
|
||||
await interaction.edit_original_response(content=f"{target.mention} has been banned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
|
||||
await self.log(interaction, moderation_id)
|
||||
await log(interaction, moderation_id)
|
||||
|
||||
@app_commands.command(name="unban")
|
||||
async def unban(self, interaction: discord.Interaction, target: discord.User, reason: str = None, silent: bool = None):
|
||||
|
@ -934,7 +496,7 @@ class Moderation(commands.Cog):
|
|||
Why are you unbanning this user?
|
||||
silent: bool
|
||||
Should the user be messaged?"""
|
||||
if not await self.check_moddable(target, interaction, ['ban_members']):
|
||||
if not await check_moddable(target, interaction, ['ban_members']):
|
||||
return
|
||||
|
||||
try:
|
||||
|
@ -955,14 +517,14 @@ class Moderation(commands.Cog):
|
|||
silent = not await self.config.guild(interaction.guild).dm_users()
|
||||
if silent is False:
|
||||
try:
|
||||
embed = await self.embed_factory('message', guild=interaction.guild, reason=reason, moderation_type='unbanned', response=await interaction.original_response())
|
||||
embed = await embed_factory('message', await self.bot.get_embed_color(None), guild=interaction.guild, reason=reason, moderation_type='unbanned', response=await interaction.original_response())
|
||||
await target.send(embed=embed)
|
||||
except discord.errors.HTTPException:
|
||||
pass
|
||||
|
||||
moderation_id = await self.mysql_log(interaction.guild.id, interaction.user.id, 'UNBAN', target.id, 0, 'NULL', reason)
|
||||
moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'UNBAN', target.id, 0, 'NULL', reason)
|
||||
await interaction.edit_original_response(content=f"{target.mention} has been unbanned! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
|
||||
await self.log(interaction, moderation_id)
|
||||
await log(interaction, moderation_id)
|
||||
|
||||
@app_commands.command(name="history")
|
||||
async def history(self, interaction: discord.Interaction, target: discord.User = None, moderator: discord.User = None, pagesize: app_commands.Range[int, 1, 20] = None, page: int = 1, ephemeral: bool = None, inline: bool = None, export: bool = False):
|
||||
|
@ -1007,12 +569,12 @@ class Moderation(commands.Cog):
|
|||
|
||||
await interaction.response.defer(ephemeral=ephemeral)
|
||||
|
||||
permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction)
|
||||
permissions = check_permissions(interaction.client.user, ['embed_links'], interaction)
|
||||
if permissions:
|
||||
await interaction.followup.send(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
|
||||
return
|
||||
|
||||
database = await self.connect()
|
||||
database = await connect()
|
||||
cursor = database.cursor()
|
||||
|
||||
if target:
|
||||
|
@ -1037,7 +599,7 @@ class Moderation(commands.Cog):
|
|||
result_dict_list = []
|
||||
|
||||
for result in results:
|
||||
case_dict = self.generate_dict(result)
|
||||
case_dict = generate_dict(result)
|
||||
if case_dict['moderation_id'] == 0:
|
||||
continue
|
||||
result_dict_list.append(case_dict)
|
||||
|
@ -1070,11 +632,11 @@ class Moderation(commands.Cog):
|
|||
|
||||
for case in result_dict_list[start_index:end_index]:
|
||||
if case['target_id'] not in memory_dict:
|
||||
memory_dict[str(case['target_id'])] = await self.fetch_user_dict(interaction, case['target_id'])
|
||||
memory_dict[str(case['target_id'])] = await fetch_user_dict(interaction, case['target_id'])
|
||||
target_user = memory_dict[str(case['target_id'])]
|
||||
|
||||
if case['moderator_id'] not in memory_dict:
|
||||
memory_dict[str(case['moderator_id'])] = await self.fetch_user_dict(interaction, case['moderator_id'])
|
||||
memory_dict[str(case['moderator_id'])] = await fetch_user_dict(interaction, case['moderator_id'])
|
||||
moderator_user = memory_dict[str(case['moderator_id'])]
|
||||
|
||||
target_name = target_user['name'] if target_user['discriminator'] == "0" else f"{target_user['name']}#{target_user['discriminator']}"
|
||||
|
@ -1112,16 +674,16 @@ class Moderation(commands.Cog):
|
|||
Case number of the case you're trying to resolve
|
||||
reason: str
|
||||
Reason for resolving case"""
|
||||
permissions = self.check_permissions(interaction.client.user, ['embed_links', 'moderate_members', 'ban_members'], interaction)
|
||||
permissions = check_permissions(interaction.client.user, ['embed_links', 'moderate_members', 'ban_members'], interaction)
|
||||
if permissions:
|
||||
await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
|
||||
return
|
||||
|
||||
conf = await self.check_conf(['mysql_database'])
|
||||
conf = await check_conf(['mysql_database'])
|
||||
if conf:
|
||||
raise(LookupError)
|
||||
|
||||
database = await self.connect()
|
||||
database = await connect()
|
||||
cursor = database.cursor()
|
||||
db = await self.config.mysql_database()
|
||||
|
||||
|
@ -1139,7 +701,7 @@ class Moderation(commands.Cog):
|
|||
await interaction.response.send_message(content=f"This moderation has already been resolved!\nUse `/case {case}` for more information.", ephemeral=True)
|
||||
return
|
||||
|
||||
case_dict = self.generate_dict(result_2)
|
||||
case_dict = generate_dict(result_2)
|
||||
if reason is None:
|
||||
reason = "No reason given."
|
||||
|
||||
|
@ -1192,9 +754,9 @@ class Moderation(commands.Cog):
|
|||
cursor.execute(resolve_query, (json.dumps(changes), interaction.user.id, reason, case_dict['moderation_id']))
|
||||
database.commit()
|
||||
|
||||
embed = await self.embed_factory('case', interaction=interaction, case_dict=await self.fetch_case(case, interaction.guild.id))
|
||||
embed = await embed_factory('case', await self.bot.get_embed_color(None), interaction=interaction, case_dict=await self.fetch_case(case, interaction.guild.id))
|
||||
await interaction.response.send_message(content=f"✅ Moderation #{case:,} resolved!", embed=embed)
|
||||
await self.log(interaction, case, True)
|
||||
await log(interaction, case, True)
|
||||
|
||||
cursor.close()
|
||||
database.close()
|
||||
|
@ -1217,7 +779,7 @@ class Moderation(commands.Cog):
|
|||
List the changes made to the case
|
||||
export: bool
|
||||
Export the case to a JSON file or codeblock"""
|
||||
permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction)
|
||||
permissions = check_permissions(interaction.client.user, ['embed_links'], interaction)
|
||||
if permissions:
|
||||
await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
|
||||
return
|
||||
|
@ -1228,7 +790,7 @@ class Moderation(commands.Cog):
|
|||
or False)
|
||||
|
||||
if case != 0:
|
||||
case_dict = await self.fetch_case(case, interaction.guild.id)
|
||||
case_dict = await fetch_case(case, interaction.guild.id)
|
||||
if case_dict:
|
||||
if export:
|
||||
if export.value == 'file' or len(str(case_dict)) > 1800:
|
||||
|
@ -1249,9 +811,9 @@ class Moderation(commands.Cog):
|
|||
await interaction.response.send_message(content=f"```json\n{json.dumps(case_dict, indent=2)}```", ephemeral=ephemeral)
|
||||
return
|
||||
if changes:
|
||||
embed = await self.embed_factory('changes', interaction=interaction, case_dict=case_dict)
|
||||
embed = await embed_factory('changes', await self.bot.get_embed_color(None), interaction=interaction, case_dict=case_dict)
|
||||
else:
|
||||
embed = await self.embed_factory('case', interaction=interaction, case_dict=case_dict)
|
||||
embed = await embed_factory('case', await self.bot.get_embed_color(None), interaction=interaction, case_dict=case_dict)
|
||||
await interaction.response.send_message(embed=embed, ephemeral=ephemeral)
|
||||
return
|
||||
await interaction.response.send_message(content=f"No case with case number `{case}` found.", ephemeral=True)
|
||||
|
@ -1268,16 +830,16 @@ class Moderation(commands.Cog):
|
|||
What is the new reason?
|
||||
duration: str
|
||||
What is the new duration? Does not reapply the moderation if it has already expired."""
|
||||
permissions = self.check_permissions(interaction.client.user, ['embed_links'], interaction)
|
||||
permissions = check_permissions(interaction.client.user, ['embed_links'], interaction)
|
||||
if permissions:
|
||||
await interaction.response.send_message(f"I do not have the `{permissions}` permission, required for this action.", ephemeral=True)
|
||||
return
|
||||
|
||||
if case != 0:
|
||||
parsed_time = None
|
||||
case_dict = await self.fetch_case(case, interaction.guild.id)
|
||||
case_dict = await fetch_case(case, interaction.guild.id)
|
||||
if case_dict:
|
||||
conf = await self.check_conf(['mysql_database'])
|
||||
conf = await check_conf(['mysql_database'])
|
||||
if conf:
|
||||
raise(LookupError)
|
||||
|
||||
|
@ -1340,7 +902,7 @@ class Moderation(commands.Cog):
|
|||
}
|
||||
)
|
||||
|
||||
database = await self.connect()
|
||||
database = await connect()
|
||||
cursor = database.cursor()
|
||||
db = await self.config.mysql_database()
|
||||
|
||||
|
@ -1352,11 +914,11 @@ class Moderation(commands.Cog):
|
|||
cursor.execute(update_query, (json.dumps(changes), reason, case))
|
||||
database.commit()
|
||||
|
||||
new_case = await self.fetch_case(case, interaction.guild.id)
|
||||
embed = await self.embed_factory('case', interaction=interaction, case_dict=new_case)
|
||||
new_case = await fetch_case(case, interaction.guild.id)
|
||||
embed = await embed_factory('case', await self.bot.get_embed_color(None), interaction=interaction, case_dict=new_case)
|
||||
|
||||
await interaction.response.send_message(content=f"✅ Moderation #{case:,} edited!", embed=embed, ephemeral=True)
|
||||
await self.log(interaction, case)
|
||||
await log(interaction, case)
|
||||
|
||||
cursor.close()
|
||||
database.close()
|
||||
|
@ -1365,11 +927,11 @@ class Moderation(commands.Cog):
|
|||
|
||||
@tasks.loop(minutes=1)
|
||||
async def handle_expiry(self):
|
||||
conf = await self.check_conf(['mysql_database'])
|
||||
conf = await check_conf(['mysql_database'])
|
||||
if conf:
|
||||
raise(LookupError)
|
||||
|
||||
database = await self.connect()
|
||||
database = await connect()
|
||||
cursor = database.cursor()
|
||||
db = await self.config.mysql_database()
|
||||
|
||||
|
@ -1393,7 +955,7 @@ class Moderation(commands.Cog):
|
|||
try:
|
||||
await guild.unban(user, reason=f"Automatic unban from case #{moderation_id}")
|
||||
|
||||
embed = await self.embed_factory('message', guild, f'Automatic unban from case #{moderation_id}', 'unbanned')
|
||||
embed = await embed_factory('message', guild, f'Automatic unban from case #{moderation_id}', 'unbanned')
|
||||
|
||||
try:
|
||||
await user.send(embed=embed)
|
||||
|
@ -1790,14 +1352,13 @@ class Moderation(commands.Cog):
|
|||
"""Import moderations from GalacticBot."""
|
||||
if ctx.message.attachments and ctx.message.attachments[0].content_type == 'application/json; charset=utf-8':
|
||||
message = await ctx.send("Are you sure you want to import GalacticBot moderations?\n**This will overwrite any moderations that already exist in this guild's moderation table.**\n*The import process will block the rest of your bot until it is complete.*")
|
||||
await message.edit(view=self.GalacticBotImportButtons(60, ctx, message, self))
|
||||
await message.edit(view=self.GalacticBotImportButtons(60, ctx, message))
|
||||
else:
|
||||
await ctx.send("Please provide a valid GalacticBot moderation export file.")
|
||||
|
||||
class GalacticBotImportButtons(discord.ui.View):
|
||||
def __init__(self, timeout, ctx, message, cog_instance):
|
||||
def __init__(self, timeout, ctx, message):
|
||||
super().__init__()
|
||||
self.cog_instance = cog_instance
|
||||
self.ctx: commands.Context = ctx
|
||||
self.message: discord.Message = message
|
||||
self.config = Config.get_conf(None, cog_name='Moderation', identifier=481923957134912)
|
||||
|
@ -1807,7 +1368,7 @@ class Moderation(commands.Cog):
|
|||
await self.message.delete()
|
||||
await interaction.response.send_message("Deleting original table...", ephemeral=True)
|
||||
|
||||
database = await Moderation.connect(self.cog_instance)
|
||||
database = await connect()
|
||||
cursor = database.cursor()
|
||||
|
||||
query = f"DROP TABLE IF EXISTS moderation_{self.ctx.guild.id};"
|
||||
|
@ -1818,7 +1379,7 @@ class Moderation(commands.Cog):
|
|||
|
||||
await interaction.edit_original_response(content="Creating new table...")
|
||||
|
||||
await Moderation.create_guild_table(self.cog_instance, self.ctx.guild)
|
||||
await create_guild_table(self.ctx.guild)
|
||||
|
||||
await interaction.edit_original_response(content="Importing moderations...")
|
||||
|
||||
|
@ -1891,8 +1452,7 @@ class Moderation(commands.Cog):
|
|||
resolved_reason = 'NULL'
|
||||
changes = []
|
||||
|
||||
await Moderation.mysql_log(
|
||||
self.cog_instance,
|
||||
await mysql_log(
|
||||
self.ctx.guild.id,
|
||||
case['executor'],
|
||||
case['type'],
|
||||
|
|
198
moderation/utils.py
Normal file
198
moderation/utils.py
Normal file
|
@ -0,0 +1,198 @@
|
|||
import json
|
||||
from typing import Union
|
||||
from discord import User, Member, Interaction, Guild
|
||||
from discord.errors import NotFound, Forbidden
|
||||
from redbot.core import commands
|
||||
import database as db
|
||||
from .embed_factory import embed_factory
|
||||
from .config import config
|
||||
|
||||
|
||||
async def check_conf(config_list: list):
|
||||
"""Checks if any required config options are not set."""
|
||||
not_found_list = []
|
||||
|
||||
for item in config_list:
|
||||
if await config.item() == " ":
|
||||
not_found_list.append(item)
|
||||
|
||||
return not_found_list
|
||||
|
||||
|
||||
def check_permissions(
|
||||
user: User,
|
||||
permissions: list,
|
||||
ctx: Union[commands.Context, Interaction] = None,
|
||||
guild: Guild = None,
|
||||
):
|
||||
"""Checks if a user has a specific permission (or a list of permissions) in a channel."""
|
||||
if ctx:
|
||||
member = ctx.guild.get_member(user.id)
|
||||
resolved_permissions = ctx.channel.permissions_for(member)
|
||||
|
||||
elif guild:
|
||||
member = guild.get_member(user.id)
|
||||
resolved_permissions = member.guild_permissions
|
||||
|
||||
else:
|
||||
raise (KeyError)
|
||||
|
||||
for permission in permissions:
|
||||
if (
|
||||
not getattr(resolved_permissions, permission, False)
|
||||
and not resolved_permissions.administrator is True
|
||||
):
|
||||
return permission
|
||||
|
||||
return False
|
||||
|
||||
|
||||
async def check_moddable(
|
||||
target: Union[User, Member], interaction: Interaction, permissions: list
|
||||
):
|
||||
"""Checks if a moderator can moderate a target."""
|
||||
if check_permissions(interaction.client.user, permissions, guild=interaction.guild):
|
||||
await interaction.response.send_message(
|
||||
f"I do not have the `{permissions}` permission, required for this action.",
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
if await config.guild(interaction.guild).use_discord_permissions() is True:
|
||||
if check_permissions(interaction.user, permissions, guild=interaction.guild):
|
||||
await interaction.response.send_message(
|
||||
f"You do not have the `{permissions}` permission, required for this action.",
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
if interaction.user.id == target.id:
|
||||
await interaction.response.send_message(
|
||||
content="You cannot moderate yourself!", ephemeral=True
|
||||
)
|
||||
return False
|
||||
|
||||
if target.bot:
|
||||
await interaction.response.send_message(
|
||||
content="You cannot moderate bots!", ephemeral=True
|
||||
)
|
||||
return False
|
||||
|
||||
if isinstance(target, Member):
|
||||
if interaction.user.top_role <= target.top_role:
|
||||
await interaction.response.send_message(
|
||||
content="You cannot moderate members with a higher role than you!",
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
if (
|
||||
interaction.guild.get_member(interaction.client.user.id).top_role
|
||||
<= target.top_role
|
||||
):
|
||||
await interaction.response.send_message(
|
||||
content="You cannot moderate members with a role higher than the bot!",
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
immune_roles = await config.guild(target.guild).immune_roles()
|
||||
|
||||
for role in target.roles:
|
||||
if role.id in immune_roles:
|
||||
await interaction.response.send_message(
|
||||
content="You cannot moderate members with an immune role!",
|
||||
ephemeral=True,
|
||||
)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
async def get_next_case_number(guild_id: str, cursor=None):
|
||||
"""This method returns the next case number from the MySQL table for a specific guild."""
|
||||
if not cursor:
|
||||
database = await db.connect()
|
||||
cursor = database.cursor()
|
||||
cursor.execute(
|
||||
f"SELECT moderation_id FROM `moderation_{guild_id}` ORDER BY moderation_id DESC LIMIT 1"
|
||||
)
|
||||
return cursor.fetchone()[0] + 1
|
||||
|
||||
|
||||
def generate_dict(result):
|
||||
case: dict = {
|
||||
"moderation_id": result[0],
|
||||
"timestamp": result[1],
|
||||
"moderation_type": result[2],
|
||||
"target_id": result[3],
|
||||
"moderator_id": result[4],
|
||||
"role_id": result[5],
|
||||
"duration": result[6],
|
||||
"end_timestamp": result[7],
|
||||
"reason": result[8],
|
||||
"resolved": result[9],
|
||||
"resolved_by": result[10],
|
||||
"resolve_reason": result[11],
|
||||
"expired": result[12],
|
||||
"changes": json.loads(result[13]),
|
||||
"metadata": json.loads(result[14]),
|
||||
}
|
||||
return case
|
||||
|
||||
|
||||
async def fetch_user_dict(interaction: Interaction, user_id: str):
|
||||
"""This method returns a dictionary containing either user information or a standard deleted user template."""
|
||||
if user_id == "?":
|
||||
user_dict = {"id": "?", "name": "Unknown User", "discriminator": "0"}
|
||||
|
||||
else:
|
||||
try:
|
||||
user = interaction.client.get_user(user_id)
|
||||
if user is None:
|
||||
user = await interaction.client.fetch_user(user_id)
|
||||
|
||||
user_dict = {
|
||||
"id": user.id,
|
||||
"name": user.name,
|
||||
"discriminator": user.discriminator,
|
||||
}
|
||||
|
||||
except NotFound:
|
||||
user_dict = {
|
||||
"id": user_id,
|
||||
"name": "Deleted User",
|
||||
"discriminator": "0",
|
||||
}
|
||||
|
||||
return user_dict
|
||||
|
||||
|
||||
async def fetch_role_dict(interaction: Interaction, role_id: str):
|
||||
"""This method returns a dictionary containing either role information or a standard deleted role template."""
|
||||
try:
|
||||
role = interaction.guild.get_role(role_id)
|
||||
|
||||
role_dict = {"id": role.id, "name": role.name}
|
||||
|
||||
except NotFound:
|
||||
role_dict = {"id": role_id, "name": "Deleted Role"}
|
||||
|
||||
return role_dict
|
||||
|
||||
|
||||
async def log(interaction: Interaction, moderation_id: int, resolved: bool = False):
|
||||
"""This method sends a message to the guild's configured logging channel when an infraction takes place."""
|
||||
logging_channel_id = await config.guild(interaction.guild).log_channel()
|
||||
if logging_channel_id != " ":
|
||||
logging_channel = interaction.guild.get_channel(logging_channel_id)
|
||||
|
||||
case = await db.fetch_case(moderation_id, interaction.guild.id)
|
||||
if case:
|
||||
embed = await embed_factory(
|
||||
"log", interaction=interaction, case_dict=case, resolved=resolved
|
||||
)
|
||||
try:
|
||||
await logging_channel.send(embed=embed)
|
||||
except Forbidden:
|
||||
return
|
Loading…
Reference in a new issue