SeaCogs/aurora/utilities/utils.py

282 lines
10 KiB
Python
Raw Permalink Normal View History

2024-02-28 10:58:57 -05:00
# pylint: disable=cyclic-import
from datetime import datetime, timedelta
from typing import Optional, Tuple, Union
2023-12-18 17:24:40 -05:00
2024-06-08 20:12:22 -04:00
import aiosqlite
from dateutil.relativedelta import relativedelta as rd
from discord import File, Guild, Interaction, Member, SelectOption, TextChannel, User
from discord.errors import Forbidden
from redbot.core import commands, data_manager
from redbot.core.utils.chat_formatting import error
2023-12-18 17:24:40 -05:00
2024-05-06 21:39:43 -04:00
from ..utilities.config import config
2024-06-08 20:12:22 -04:00
from ..utilities.json import dumps
from ..utilities.logger import logger
def check_permissions(
user: User,
permissions: Tuple[str],
2024-05-06 21:04:08 -04:00
ctx: Union[commands.Context, Interaction] | None = None,
guild: Guild | None = None,
) -> Union[bool, str]:
"""Checks if a user has a specific permission (or a list of permissions) in a channel."""
if ctx:
member = ctx.guild.get_member(user.id)
resolved_permissions = ctx.channel.permissions_for(member)
elif guild:
member = guild.get_member(user.id)
resolved_permissions = member.guild_permissions
else:
raise (KeyError)
for permission in permissions:
if (
not getattr(resolved_permissions, permission, False)
and resolved_permissions.administrator is not True
):
return permission
return False
async def check_moddable(
target: Union[User, Member, TextChannel], interaction: Interaction, permissions: Tuple[str]
) -> bool:
"""Checks if a moderator can moderate a target."""
is_channel = isinstance(target, TextChannel)
if check_permissions(interaction.client.user, permissions, guild=interaction.guild):
await interaction.response.send_message(
2024-02-02 11:22:08 -05:00
error(
f"I do not have the `{permissions}` permission, required for this action."
),
ephemeral=True,
)
return False
if await config.guild(interaction.guild).use_discord_permissions() is True:
if check_permissions(interaction.user, permissions, guild=interaction.guild):
await interaction.response.send_message(
2024-02-02 11:22:08 -05:00
error(
f"You do not have the `{permissions}` permission, required for this action."
),
ephemeral=True,
)
return False
if interaction.user.id == target.id:
await interaction.response.send_message(
content="You cannot moderate yourself!", ephemeral=True
)
return False
if not is_channel and target.bot:
await interaction.response.send_message(
content="You cannot moderate bots!", ephemeral=True
)
return False
if isinstance(target, Member):
if interaction.user.top_role <= target.top_role and await config.guild(interaction.guild).respect_hierarchy() is True:
await interaction.response.send_message(
2024-02-02 11:22:08 -05:00
content=error(
"You cannot moderate members with a higher role than you!"
),
ephemeral=True,
)
return False
if (
interaction.guild.get_member(interaction.client.user.id).top_role
<= target.top_role
):
await interaction.response.send_message(
2024-02-02 11:22:08 -05:00
content=error(
"You cannot moderate members with a role higher than the bot!"
),
ephemeral=True,
)
return False
immune_roles = await config.guild(target.guild).immune_roles()
for role in target.roles:
if role.id in immune_roles:
await interaction.response.send_message(
content=error("You cannot moderate members with an immune role!"),
ephemeral=True,
)
return False
return True
async def log(interaction: Interaction, moderation_id: int, resolved: bool = False) -> None:
"""This function sends a message to the guild's configured logging channel when an infraction takes place."""
2024-05-06 21:39:43 -04:00
from ..models.moderation import Moderation
from .factory import log_factory
2023-12-17 02:36:18 -05:00
logging_channel_id = await config.guild(interaction.guild).log_channel()
if logging_channel_id != " ":
logging_channel = interaction.guild.get_channel(logging_channel_id)
try:
2024-06-05 00:14:43 -04:00
moderation = await Moderation.find_by_id(interaction.client, moderation_id, interaction.guild_id)
2024-02-02 11:22:08 -05:00
embed = await log_factory(
interaction=interaction, moderation=moderation, resolved=resolved
2024-02-02 11:22:08 -05:00
)
try:
await logging_channel.send(embed=embed)
except Forbidden:
return
except ValueError:
return
2024-02-02 11:22:08 -05:00
async def send_evidenceformat(interaction: Interaction, moderation_id: int) -> None:
"""This function sends an ephemeral message to the moderator who took the moderation action, with a pre-made codeblock for use in the mod-evidence channel."""
2024-05-06 21:39:43 -04:00
from ..models.moderation import Moderation
from .factory import evidenceformat_factory
2024-02-02 11:22:08 -05:00
send_evidence_bool = (
await config.user(interaction.user).auto_evidenceformat()
or await config.guild(interaction.guild).auto_evidenceformat()
2024-02-02 11:22:08 -05:00
or False
)
if send_evidence_bool is False:
return
2024-06-05 00:14:43 -04:00
moderation = await Moderation.find_by_id(interaction.client, moderation_id, interaction.guild.id)
2024-05-06 16:47:21 -04:00
content = await evidenceformat_factory(moderation=moderation)
await interaction.followup.send(content=content, ephemeral=True)
2024-02-02 11:22:08 -05:00
def get_bool_emoji(value: Optional[bool]) -> str:
"""Returns a unicode emoji based on a boolean value."""
match value:
case True:
return "\N{WHITE HEAVY CHECK MARK}"
case False:
return "\N{NO ENTRY SIGN}"
case _:
return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}"
2024-02-02 11:22:08 -05:00
def get_pagesize_str(value: Union[int, None]) -> str:
"""Returns a string based on a pagesize value."""
if value is None:
return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}"
return str(value) + " cases per page"
2024-02-02 11:22:08 -05:00
def create_pagesize_options() -> list[SelectOption]:
"""Returns a list of SelectOptions for pagesize configuration."""
options = []
options.append(
SelectOption(
label="Default",
value="default",
description="Reset the pagesize to the default value.",
)
)
for i in range(1, 21):
options.append(
SelectOption(
label=str(i),
value=str(i),
description=f"Set the pagesize to {i}.",
)
)
return options
def timedelta_from_relativedelta(relativedelta: rd) -> timedelta:
"""Converts a relativedelta object to a timedelta object."""
now = datetime.now()
then = now - relativedelta
return now - then
def timedelta_from_string(string: str) -> timedelta:
"""Converts a string to a timedelta object."""
hours, minutes, seconds = map(int, string.split(":"))
return timedelta(hours=hours, minutes=minutes, seconds=seconds)
2024-06-04 23:55:55 -04:00
def timedelta_to_string(td: timedelta) -> str:
"""Converts a timedelta object to a string."""
2024-06-04 23:55:55 -04:00
days = td.days * 24
hours, remainder = divmod(td.seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return f"{days + hours}:{minutes:02}:{seconds:02}"
def get_footer_image(coginstance: commands.Cog) -> File:
"""Returns the footer image for the embeds."""
image_path = data_manager.bundled_data_path(coginstance) / "arrow.png"
return File(image_path, filename="arrow.png", description="arrow")
2024-06-08 20:12:22 -04:00
async def create_guild_table(guild: Guild) -> None:
from ..models.moderation import Moderation
try:
await Moderation.execute(f"SELECT * FROM `moderation_{guild.id}`", return_obj=False)
logger.trace("SQLite Table exists for server %s (%s)", guild.name, guild.id)
except aiosqlite.OperationalError:
query = f"""
CREATE TABLE `moderation_{guild.id}` (
moderation_id INTEGER PRIMARY KEY,
timestamp INTEGER NOT NULL,
moderation_type TEXT NOT NULL,
target_type TEXT NOT NULL,
target_id INTEGER NOT NULL,
moderator_id INTEGER NOT NULL,
role_id INTEGER,
duration TEXT,
end_timestamp INTEGER,
reason TEXT,
resolved INTEGER NOT NULL,
resolved_by TEXT,
resolve_reason TEXT,
expired INTEGER NOT NULL,
changes JSON NOT NULL,
metadata JSON NOT NULL
)
"""
await Moderation.execute(query=query, return_obj=False)
index_query_1 = f"CREATE INDEX IF NOT EXISTS idx_target_id ON moderation_{guild.id}(target_id);"
await Moderation.execute(query=index_query_1, return_obj=False)
index_query_2 = f"CREATE INDEX IF NOT EXISTS idx_moderator_id ON moderation_{guild.id}(moderator_id);"
await Moderation.execute(query=index_query_2, return_obj=False)
index_query_3 = f"CREATE INDEX IF NOT EXISTS idx_moderation_id ON moderation_{guild.id}(moderation_id);"
await Moderation.execute(query=index_query_3, return_obj=False)
insert_query = f"""
INSERT INTO `moderation_{guild.id}`
(moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
insert_values = (
0,
0,
"NULL",
"NULL",
0,
0,
None,
None,
None,
None,
0,
None,
None,
0,
dumps([]),
dumps({}),
)
await Moderation.execute(query=insert_query, parameters=insert_values, return_obj=False)
logger.trace("SQLite Table created for server %s (%s)", guild.name, guild.id)