288 lines
10 KiB
Python
288 lines
10 KiB
Python
# pylint: disable=cyclic-import
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional, Tuple, Union
|
|
|
|
import aiosqlite
|
|
from dateutil.relativedelta import relativedelta as rd
|
|
from discord import File, Guild, Interaction, Member, SelectOption, TextChannel, User
|
|
from discord.errors import Forbidden
|
|
from redbot.core import commands, data_manager
|
|
from redbot.core.utils.chat_formatting import error
|
|
|
|
from ..models.type import Type
|
|
from ..utilities.config import config
|
|
from ..utilities.json import dumps
|
|
from ..utilities.logger import logger
|
|
|
|
|
|
def check_permissions(
    user: User,
    permissions: Tuple[str, ...],
    ctx: commands.Context | Interaction | None = None,
    guild: Guild | None = None,
) -> Union[bool, str]:
    """Check whether a user holds every permission named in ``permissions``.

    When ``ctx`` is given, permissions are resolved for the member in
    ``ctx.channel``; otherwise the member's guild-wide permissions in ``guild``
    are used.

    Args:
        user: The user whose permissions are checked (looked up by ``user.id``).
        permissions: Names of the permission attributes to test.
        ctx: Context or interaction supplying the guild and channel.
        guild: Guild to resolve the member in when ``ctx`` is not given.

    Returns:
        ``False`` when no permission is missing (or the member is an
        administrator), otherwise the name of the first missing permission.
        Note the inverted sense: a truthy result means the check FAILED.

    Raises:
        KeyError: If neither ``ctx`` nor ``guild`` is provided.
    """
    if ctx:
        member = ctx.guild.get_member(user.id)
        resolved_permissions = (
            ctx.channel.permissions_for(member) if member is not None else None
        )
    elif guild:
        member = guild.get_member(user.id)
        resolved_permissions = member.guild_permissions if member is not None else None
    else:
        raise KeyError("Either 'ctx' or 'guild' must be provided to resolve permissions.")

    if resolved_permissions is None:
        # The user is not a member of the guild, so they hold none of the
        # requested permissions; report the first one as missing.
        return next(iter(permissions), False)

    # Administrators pass every check; this does not depend on the loop variable.
    if resolved_permissions.administrator is True:
        return False

    for permission in permissions:
        if not getattr(resolved_permissions, permission, False):
            return permission

    return False
|
|
|
|
|
|
async def check_moddable(
    target: Union[User, Member, TextChannel],
    ctx: commands.Context,
    permissions: Tuple[str, ...],
    moderation_type: Type,
) -> bool:
    """Check whether the invoking moderator may moderate ``target``.

    Checks run in this order, and the first failure sends an ephemeral
    explanation through ``ctx.send`` and returns ``False``:

    1. The bot holds every permission in ``permissions`` guild-wide.
    2. If Discord permissions are enforced (per-type setting, falling back to
       the guild setting when the per-type value is ``None``), the author
       holds them too.
    3. The author is not the target.
    4. The target is not a bot (skipped for channels).
    5. For ``Member`` targets: role hierarchy against the author (only when
       the guild's ``respect_hierarchy`` setting is ``True``), role hierarchy
       against the bot, and the guild's immune roles.

    Args:
        target: The user, member or text channel to be moderated.
        ctx: The invoking command context.
        permissions: Names of the permissions the action requires.
        moderation_type: The moderation type; its ``key`` selects the
            per-type configuration.

    Returns:
        ``True`` if every check passes, ``False`` otherwise.
    """
    is_channel = isinstance(target, TextChannel)

    use_discord_permissions = await config.custom("types", ctx.guild.id, moderation_type.key).use_discord_permissions()
    if use_discord_permissions is None:
        use_discord_permissions = await config.guild(ctx.guild).use_discord_permissions()

    # check_permissions returns the NAME of the first missing permission (or
    # False), so report that name rather than the whole permissions tuple.
    missing_permission = check_permissions(ctx.bot.user, permissions, guild=ctx.guild)
    if missing_permission:
        await ctx.send(
            error(
                f"I do not have the `{missing_permission}` permission, required for this action."
            ),
            ephemeral=True,
        )
        return False

    if use_discord_permissions is True:
        missing_permission = check_permissions(ctx.author, permissions, guild=ctx.guild)
        if missing_permission:
            await ctx.send(
                error(
                    f"You do not have the `{missing_permission}` permission, required for this action."
                ),
                ephemeral=True,
            )
            return False

    if ctx.author.id == target.id:
        await ctx.send(
            content="You cannot moderate yourself!", ephemeral=True
        )
        return False

    if not is_channel and target.bot:
        await ctx.send(
            content="You cannot moderate bots!", ephemeral=True
        )
        return False

    if isinstance(target, Member):
        # The hierarchy setting is only read when the role comparison already fails.
        if ctx.author.top_role <= target.top_role and await config.guild(ctx.guild).respect_hierarchy() is True:
            await ctx.send(
                content=error(
                    "You cannot moderate members with a higher role than you!"
                ),
                ephemeral=True,
            )
            return False

        if (
            ctx.guild.get_member(ctx.bot.user.id).top_role
            <= target.top_role
        ):
            await ctx.send(
                content=error(
                    "You cannot moderate members with a role higher than the bot!"
                ),
                ephemeral=True,
            )
            return False

        immune_roles = await config.guild(target.guild).immune_roles()

        if any(role.id in immune_roles for role in target.roles):
            await ctx.send(
                content=error("You cannot moderate members with an immune role!"),
                ephemeral=True,
            )
            return False

    return True
|
|
|
|
|
|
async def log(ctx: commands.Context, moderation_id: int, resolved: bool = False) -> None:
    """Send a log embed for a moderation to the guild's configured logging channel.

    Nothing is sent when the configured channel id equals ``" "``, when the
    channel cannot be found in the guild, when the bot is forbidden from
    sending there, or when looking up the moderation or building or sending
    the embed raises ``ValueError``.

    Args:
        ctx: The invoking command context; supplies the guild and the bot.
        moderation_id: Id of the moderation to log.
        resolved: Forwarded to ``log_factory``.
    """
    # Imported here rather than at module level (the file disables pylint's
    # cyclic-import warning).
    from ..models.moderation import Moderation
    from .factory import log_factory

    logging_channel_id = await config.guild(ctx.guild).log_channel()
    # NOTE(review): a single space appears to be the "no channel configured"
    # sentinel — confirm against the config defaults.
    if logging_channel_id == " ":
        return

    logging_channel = ctx.guild.get_channel(logging_channel_id)
    if logging_channel is None:
        # The configured channel no longer exists (or the id is unknown);
        # skip logging instead of failing with AttributeError on ``.send``.
        return

    try:
        moderation = await Moderation.find_by_id(ctx.bot, moderation_id, ctx.guild.id)
        embed = await log_factory(
            ctx=ctx, moderation=moderation, resolved=resolved
        )
        try:
            await logging_channel.send(embed=embed)
        except Forbidden:
            return
    except ValueError:
        return
|
|
|
|
|
|
async def send_evidenceformat(ctx: commands.Context, moderation_id: int) -> None:
    """Deliver the evidence-format text for a moderation to the acting moderator.

    The text is sent only if the ``auto_evidenceformat`` setting resolves to
    exactly ``True``. The user-level setting is consulted first; the
    guild-level one is read only when the user-level value is falsy. Slash
    invocations get an ephemeral reply; other invocations get a direct message.
    """
    from ..models.moderation import Moderation
    from .factory import evidenceformat_factory

    enabled = await config.user(ctx.author).auto_evidenceformat()
    if not enabled:
        # Only fall back to the guild setting when the user one is falsy.
        enabled = await config.guild(guild=ctx.guild).auto_evidenceformat()
    if enabled is not True:
        return

    case = await Moderation.find_by_id(ctx.bot, moderation_id, ctx.guild.id)
    content = await evidenceformat_factory(moderation=case)

    if ctx.interaction:
        await ctx.send(content=content, ephemeral=True)
    else:
        await ctx.author.send(content=content)
|
|
|
|
|
|
def get_bool_emoji(value: Optional[bool]) -> str:
    """Map a tri-state boolean onto a unicode emoji.

    ``True`` gives a check mark, ``False`` a no-entry sign, and anything else
    (including ``None``) a question mark.
    """
    # Identity checks mirror how ``match`` treats the True/False literal
    # patterns: only the bool singletons select the first two branches.
    if value is True:
        return "\N{WHITE HEAVY CHECK MARK}"
    if value is False:
        return "\N{NO ENTRY SIGN}"
    return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}"
|
|
|
|
|
|
def get_pagesize_str(value: Union[int, None]) -> str:
    """Render a pagesize setting as display text.

    ``None`` yields a question-mark emoji; any other value yields
    ``"<value> cases per page"``.
    """
    if value is not None:
        return f"{value!s} cases per page"
    return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}"
|
|
|
|
|
|
def create_pagesize_options() -> list[SelectOption]:
    """Build the SelectOptions offered when configuring the pagesize.

    The list starts with a "Default" entry (value ``"default"``), followed by
    one entry for each size from 1 to 20 inclusive.
    """
    reset_option = SelectOption(
        label="Default",
        value="default",
        description="Reset the pagesize to the default value.",
    )
    sized_options = [
        SelectOption(
            label=str(size),
            value=str(size),
            description=f"Set the pagesize to {size}.",
        )
        for size in range(1, 21)
    ]
    return [reset_option, *sized_options]
|
|
|
|
def timedelta_from_relativedelta(relativedelta: rd) -> timedelta:
    """Turn a relativedelta into a fixed timedelta.

    The span is measured backwards from the current local time: it is the
    difference between now and ``now - relativedelta``.
    """
    reference = datetime.now()
    return reference - (reference - relativedelta)
|
|
|
|
def timedelta_from_string(string: str) -> timedelta:
    """Parse an ``H:M:S`` string into a timedelta.

    The string must contain exactly three colon-separated integers; anything
    else raises ``ValueError``.
    """
    hours, minutes, seconds = (int(field) for field in string.split(":"))
    return timedelta(seconds=seconds, minutes=minutes, hours=hours)
|
|
|
|
def timedelta_to_string(td: timedelta) -> str:
    """Format a timedelta as ``H:MM:SS``.

    Days are folded into the hour count and microseconds are ignored.
    """
    whole_seconds = td.days * 86400 + td.seconds
    total_hours, leftover = divmod(whole_seconds, 3600)
    minutes, seconds = divmod(leftover, 60)
    return f"{total_hours}:{minutes:02}:{seconds:02}"
|
|
|
|
def get_footer_image(coginstance: commands.Cog) -> File:
    """Build the ``arrow.png`` attachment used as the embeds' footer image.

    The file is read from the bundled data path of ``coginstance``.
    """
    filename = "arrow.png"
    bundled_dir = data_manager.bundled_data_path(coginstance)
    return File(bundled_dir / filename, filename=filename, description="arrow")
|
|
|
|
async def create_guild_table(guild: Guild) -> None:
    """Ensure the per-guild ``moderation_<guild id>`` SQLite table exists.

    If probing the table raises ``aiosqlite.OperationalError``, the table is
    created together with indexes on ``target_id``, ``moderator_id`` and
    ``moderation_id``, and a placeholder row with ``moderation_id`` 0 is
    inserted.

    Args:
        guild: The guild whose table should exist.
    """
    from ..models.moderation import Moderation

    table = f"moderation_{guild.id}"

    try:
        # A single-row probe is enough to detect a missing table; there is no
        # need to read every stored moderation.
        await Moderation.execute(f"SELECT 1 FROM `{table}` LIMIT 1", return_obj=False)
        logger.trace("SQLite Table exists for server %s (%s)", guild.name, guild.id)

    except aiosqlite.OperationalError:
        query = f"""
            CREATE TABLE `{table}` (
                moderation_id INTEGER PRIMARY KEY,
                timestamp INTEGER NOT NULL,
                moderation_type TEXT NOT NULL,
                target_type TEXT NOT NULL,
                target_id INTEGER NOT NULL,
                moderator_id INTEGER NOT NULL,
                role_id INTEGER,
                duration TEXT,
                end_timestamp INTEGER,
                reason TEXT,
                resolved INTEGER NOT NULL,
                resolved_by TEXT,
                resolve_reason TEXT,
                expired INTEGER NOT NULL,
                changes JSON NOT NULL,
                metadata JSON NOT NULL
            )
        """
        await Moderation.execute(query=query, return_obj=False)

        # SQLite index names are unique per database, not per table. With a
        # shared name, ``IF NOT EXISTS`` would silently skip index creation for
        # every guild after the first, so the guild id is part of the name.
        for column in ("target_id", "moderator_id", "moderation_id"):
            index_query = (
                f"CREATE INDEX IF NOT EXISTS idx_{column}_{guild.id} "
                f"ON `{table}`({column});"
            )
            await Moderation.execute(query=index_query, return_obj=False)

        insert_query = f"""
            INSERT INTO `{table}`
            (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        # Placeholder row occupying moderation_id 0.
        # NOTE(review): presumably so real cases start at id 1 — confirm.
        insert_values = (
            0,  # moderation_id
            0,  # timestamp
            "NULL",  # moderation_type
            "NULL",  # target_type
            0,  # target_id
            0,  # moderator_id
            None,  # role_id
            None,  # duration
            None,  # end_timestamp
            None,  # reason
            0,  # resolved
            None,  # resolved_by
            None,  # resolve_reason
            0,  # expired
            dumps([]),  # changes
            dumps({}),  # metadata
        )
        await Moderation.execute(query=insert_query, parameters=insert_values, return_obj=False)

        logger.trace("SQLite Table created for server %s (%s)", guild.name, guild.id)
|