SeaCogs/aurora/utilities/utils.py
cswimr d600a10729
All checks were successful
Actions / Build Documentation (MkDocs) (pull_request) Successful in 29s
Actions / Lint Code (Ruff & Pylint) (pull_request) Successful in 51s
fix(aurora): don't try and use the attribute of a Member object when the object is a User object
2024-08-24 19:13:01 -04:00

294 lines
10 KiB
Python

# pylint: disable=cyclic-import
from datetime import datetime, timedelta
from typing import Optional, Tuple, Union
import aiosqlite
from dateutil.relativedelta import relativedelta as rd
from discord import File, Guild, Interaction, Member, SelectOption, TextChannel, User
from discord.errors import Forbidden
from redbot.core import commands, data_manager
from redbot.core.utils.chat_formatting import error
from ..models.type import Type
from ..utilities.config import config
from ..utilities.json import dumps
from ..utilities.logger import logger
def check_permissions(
    user: User,
    permissions: Tuple[str, ...],
    ctx: commands.Context | Interaction | None = None,
    guild: Guild | None = None,
) -> Union[bool, str]:
    """Find the first permission in ``permissions`` that ``user`` is missing.

    Permissions are resolved for ``ctx.channel`` when ``ctx`` is given,
    otherwise from the member's guild-wide permissions in ``guild``.

    Args:
        user: The user whose permissions are checked; only ``user.id`` is read.
        permissions: Names of permission attributes to look up on the resolved
            permissions object. Unknown names are treated as missing.
        ctx: Context or interaction supplying the guild and channel to check in.
        guild: Guild to check in when no ``ctx`` is supplied.

    Returns:
        The name of the first missing permission, or ``False`` when nothing is
        missing. Note the inverted truthiness: a truthy result means the check
        FAILED. Administrators never have a missing permission.

    Raises:
        KeyError: If neither ``ctx`` nor ``guild`` is provided.
    """
    # NOTE(review): get_member() is looked up by id and may not find the user
    # (e.g. they are not a member of the guild) - callers are assumed to pass
    # users that are members; confirm before using this with arbitrary users.
    if ctx:
        member = ctx.guild.get_member(user.id)
        resolved_permissions = ctx.channel.permissions_for(member)
    elif guild:
        member = guild.get_member(user.id)
        resolved_permissions = member.guild_permissions
    else:
        raise KeyError("check_permissions requires either `ctx` or `guild`")

    # The administrator flag overrides every individual permission, so it only
    # needs to be evaluated once rather than once per requested permission.
    if resolved_permissions.administrator is True:
        return False

    for permission in permissions:
        if not getattr(resolved_permissions, permission, False):
            return permission
    return False
async def check_moddable(
    target: Union[User, Member, TextChannel], ctx: commands.Context, permissions: Tuple[str, ...], moderation_type: Type,
) -> bool:
    """Check whether the invoking moderator may moderate ``target``.

    Each failed check sends an explanatory message through ``ctx`` and returns
    ``False``. The checks run in this order:

    1. The bot holds ``permissions`` guild-wide.
    2. If ``use_discord_permissions`` is enabled (the per-type setting, falling
       back to the guild setting when the per-type one is ``None``), the
       moderator holds ``permissions`` guild-wide.
    3. The moderator is not targeting themselves.
    4. A non-channel target is not a bot.
    5. For ``Member`` targets only: role hierarchy against the moderator (when
       ``respect_hierarchy`` is enabled), the target is not an administrator,
       the target's top role is below the bot's, and the target holds no
       immune role.

    Args:
        target: The user, member or text channel being moderated.
        ctx: Invocation context; supplies the guild, author and bot.
        permissions: Permission names both bot and (optionally) moderator need.
        moderation_type: Moderation type whose ``key`` selects per-type config.

    Returns:
        ``True`` when every check passes, otherwise ``False``.
    """
    is_channel = isinstance(target, TextChannel)

    use_discord_permissions = await config.custom("types", ctx.guild.id, moderation_type.key).use_discord_permissions()
    if use_discord_permissions is None:
        use_discord_permissions = await config.guild(ctx.guild).use_discord_permissions()

    # check_permissions returns the NAME of the first missing permission (or
    # False), so report that name instead of the whole requested tuple.
    missing_permission = check_permissions(ctx.bot.user, permissions, guild=ctx.guild)
    if missing_permission:
        await ctx.send(
            error(
                f"I do not have the `{missing_permission}` permission, required for this action."
            ),
            ephemeral=True,
        )
        return False

    if use_discord_permissions is True:
        missing_permission = check_permissions(ctx.author, permissions, guild=ctx.guild)
        if missing_permission:
            await ctx.send(
                error(
                    f"You do not have the `{missing_permission}` permission, required for this action."
                ),
                ephemeral=True,
            )
            return False

    if ctx.author.id == target.id:
        await ctx.send(
            content="You cannot moderate yourself!", ephemeral=True
        )
        return False

    if not is_channel and target.bot:
        await ctx.send(
            content="You cannot moderate bots!", ephemeral=True
        )
        return False

    # Role and guild-permission attributes only exist on Member objects, so the
    # remaining checks are skipped for plain User and TextChannel targets.
    if isinstance(target, Member):
        if ctx.author.top_role <= target.top_role and await config.guild(ctx.guild).respect_hierarchy() is True:
            await ctx.send(
                content=error(
                    "You cannot moderate members with a higher role than you!"
                ),
                ephemeral=True,
            )
            return False

        if target.guild_permissions.administrator:
            await ctx.send(
                content="You cannot moderate members with the Administrator permission!", ephemeral=True
            )
            return False

        if (
            ctx.guild.get_member(ctx.bot.user.id).top_role
            <= target.top_role
        ):
            await ctx.send(
                content=error(
                    "You cannot moderate members with a role higher than the bot!"
                ),
                ephemeral=True,
            )
            return False

        immune_roles = await config.guild(target.guild).immune_roles()
        if any(role.id in immune_roles for role in target.roles):
            await ctx.send(
                content=error("You cannot moderate members with an immune role!"),
                ephemeral=True,
            )
            return False

    return True
async def log(ctx: commands.Context, moderation_id: int, resolved: bool = False) -> None:
    """Send a log embed for a moderation case to the guild's configured logging channel.

    This is best-effort and returns silently when no logging channel is
    configured, the configured channel cannot be resolved, building the embed
    raises ``ValueError``, or sending is ``Forbidden``.

    Args:
        ctx: Invocation context; supplies the guild and bot.
        moderation_id: Identifier of the moderation case to log.
        resolved: Forwarded to ``log_factory`` when building the embed.
    """
    # Imported here rather than at module level; the module disables pylint's
    # cyclic-import check, so presumably this avoids an import cycle.
    from ..models.moderation import Moderation
    from .factory import log_factory

    logging_channel_id = await config.guild(ctx.guild).log_channel()
    # " " is the value treated as "no logging channel configured".
    if logging_channel_id == " ":
        return

    logging_channel = ctx.guild.get_channel(logging_channel_id)
    if logging_channel is None:
        # The configured channel does not resolve (e.g. it was deleted), so
        # there is nowhere to log to; skip the lookup and embed work entirely.
        return

    try:
        moderation = await Moderation.find_by_id(ctx.bot, moderation_id, ctx.guild.id)
        embed = await log_factory(
            ctx=ctx, moderation=moderation, resolved=resolved
        )
        try:
            await logging_channel.send(embed=embed)
        except Forbidden:
            return
    except ValueError:
        return
async def send_evidenceformat(ctx: commands.Context, moderation_id: int) -> None:
    """Send the moderator a pre-made evidence codeblock for the given moderation case.

    Nothing is sent unless the ``auto_evidenceformat`` setting resolves to
    exactly ``True``; the user's setting is consulted first, then the guild's.
    Interaction-based invocations get an ephemeral reply, all others get a
    direct message to the command author.
    """
    from ..models.moderation import Moderation
    from .factory import evidenceformat_factory

    enabled = (
        await config.user(ctx.author).auto_evidenceformat()
        or await config.guild(guild=ctx.guild).auto_evidenceformat()
        or False
    )
    if enabled is not True:
        return

    case = await Moderation.find_by_id(ctx.bot, moderation_id, ctx.guild.id)
    codeblock = await evidenceformat_factory(moderation=case)

    if ctx.interaction:
        await ctx.send(content=codeblock, ephemeral=True)
    else:
        await ctx.author.send(content=codeblock)
def get_bool_emoji(value: Optional[bool]) -> str:
    """Map a tri-state boolean to a unicode emoji.

    ``True`` gives a check mark, ``False`` a no-entry sign, and anything else
    (including ``None`` and truthy/falsy non-bool values) a question mark.
    """
    # Identity checks mirror how literal True/False patterns are matched.
    if value is True:
        return "\N{WHITE HEAVY CHECK MARK}"
    if value is False:
        return "\N{NO ENTRY SIGN}"
    return "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}"
def get_pagesize_str(value: Union[int, None]) -> str:
    """Describe a pagesize setting, or return a question-mark emoji when it is unset (``None``)."""
    return (
        "\N{BLACK QUESTION MARK ORNAMENT}\N{VARIATION SELECTOR-16}"
        if value is None
        else f"{value} cases per page"
    )
def create_pagesize_options() -> list[SelectOption]:
    """Build the select-menu options for pagesize configuration.

    The first option resets the pagesize to its default; it is followed by one
    option for each pagesize from 1 through 20.
    """
    reset_option = SelectOption(
        label="Default",
        value="default",
        description="Reset the pagesize to the default value.",
    )
    sized_options = [
        SelectOption(
            label=str(size),
            value=str(size),
            description=f"Set the pagesize to {size}.",
        )
        for size in range(1, 21)
    ]
    return [reset_option, *sized_options]
def timedelta_from_relativedelta(relativedelta: rd) -> timedelta:
    """Convert a relativedelta into a plain timedelta.

    The delta is subtracted from the current local time and the distance back
    to "now" is returned, so the result is measured relative to the moment of
    the call.
    """
    reference = datetime.now()
    return reference - (reference - relativedelta)
def timedelta_from_string(string: str) -> timedelta:
    """Parse an ``hours:minutes:seconds`` string into a timedelta.

    Raises:
        ValueError: If the string does not contain exactly three
            colon-separated integers.
    """
    fields = [int(piece) for piece in string.split(":")]
    hours, minutes, seconds = fields
    return timedelta(hours=hours, minutes=minutes, seconds=seconds)
def timedelta_to_string(td: timedelta) -> str:
    """Format a timedelta as ``hours:MM:SS``.

    Whole days are folded into the hour count (one day becomes 24 hours) and
    microseconds are dropped.
    """
    # td.days and td.seconds together hold the whole-second part of the delta.
    whole_seconds = td.days * 86400 + td.seconds
    total_hours, leftover = divmod(whole_seconds, 3600)
    minutes, seconds = divmod(leftover, 60)
    return f"{total_hours}:{minutes:02}:{seconds:02}"
def get_footer_image(coginstance: commands.Cog) -> File:
    """Wrap the cog's bundled ``arrow.png`` in a ``File`` for use as an embed footer image."""
    bundled_dir = data_manager.bundled_data_path(coginstance)
    return File(bundled_dir / "arrow.png", filename="arrow.png", description="arrow")
async def create_guild_table(guild: Guild) -> None:
    """Ensure the per-guild moderation table exists, creating it if needed.

    Existence is probed with a cheap query; an ``aiosqlite.OperationalError``
    from the probe is taken to mean the table is missing. In that case the
    table and its indexes are created and a placeholder row with
    ``moderation_id`` 0 is inserted.

    Args:
        guild: The guild whose ``moderation_<guild id>`` table is ensured.
    """
    from ..models.moderation import Moderation

    table = f"moderation_{guild.id}"

    try:
        # Only existence matters here, so avoid fetching every stored case.
        await Moderation.execute(f"SELECT 1 FROM `{table}` LIMIT 1", return_obj=False)
        logger.trace("SQLite Table exists for server %s (%s)", guild.name, guild.id)
        return
    except aiosqlite.OperationalError:
        pass

    query = f"""
        CREATE TABLE `{table}` (
            moderation_id INTEGER PRIMARY KEY NOT NULL,
            timestamp INTEGER NOT NULL,
            moderation_type TEXT NOT NULL,
            target_type TEXT NOT NULL,
            target_id INTEGER NOT NULL,
            moderator_id INTEGER NOT NULL,
            role_id INTEGER,
            duration TEXT,
            end_timestamp INTEGER,
            reason TEXT,
            resolved INTEGER NOT NULL,
            resolved_by TEXT,
            resolve_reason TEXT,
            expired INTEGER NOT NULL,
            changes JSON NOT NULL,
            metadata JSON NOT NULL
        )
    """
    await Moderation.execute(query=query, return_obj=False)

    # SQLite index names are unique across the whole database, not per table.
    # A fixed name such as `idx_target_id` would make `IF NOT EXISTS` skip the
    # index for every guild after the first, so the guild id is part of the name.
    for column in ("target_id", "moderator_id", "moderation_id"):
        index_query = f"CREATE INDEX IF NOT EXISTS idx_{column}_{guild.id} ON {table}({column});"
        await Moderation.execute(query=index_query, return_obj=False)

    insert_query = f"""
        INSERT INTO `{table}`
        (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    # Placeholder row occupying moderation_id 0.
    # NOTE(review): presumably so real cases are numbered from 1 - confirm.
    insert_values = (
        0,  # moderation_id
        0,  # timestamp
        "NULL",  # moderation_type
        "NULL",  # target_type
        0,  # target_id
        0,  # moderator_id
        None,  # role_id
        None,  # duration
        None,  # end_timestamp
        None,  # reason
        0,  # resolved
        None,  # resolved_by
        None,  # resolve_reason
        0,  # expired
        dumps([]),  # changes
        dumps({}),  # metadata
    )
    await Moderation.execute(query=insert_query, parameters=insert_values, return_obj=False)

    logger.trace("SQLite Table created for server %s (%s)", guild.name, guild.id)