feat(aurora): whole bunch of changes to the models and various other things

This commit is contained in:
Seaswimmer 2024-05-04 16:54:12 -04:00
parent 2dfc9d9824
commit 6147c8c6d5
Signed by untrusted user: cswimr
GPG key ID: 5D671B5D03D65A7F
6 changed files with 167 additions and 162 deletions

View file

@ -19,8 +19,7 @@ from redbot.core import app_commands, commands, data_manager
from redbot.core.app_commands import Choice
from redbot.core.bot import Red
from redbot.core.commands.converter import parse_relativedelta, parse_timedelta
from redbot.core.utils.chat_formatting import (box, error, humanize_list,
humanize_timedelta, warning)
from redbot.core.utils.chat_formatting import box, error, humanize_list, humanize_timedelta, warning
from aurora.importers.aurora import ImportAuroraView
from aurora.importers.galacticbot import ImportGalacticBotView
@ -28,21 +27,13 @@ from aurora.menus.addrole import Addrole
from aurora.menus.guild import Guild
from aurora.menus.immune import Immune
from aurora.menus.overrides import Overrides
from aurora.models import Moderation
from aurora.utilities.config import config, register_config
from aurora.utilities.database import (connect, create_guild_table, fetch_case,
mysql_log)
from aurora.utilities.factory import (addrole_embed, case_factory,
changes_factory, evidenceformat_factory,
guild_embed, immune_embed,
message_factory, overrides_embed)
from aurora.utilities.database import connect, create_guild_table, fetch_case, mysql_log
from aurora.utilities.factory import addrole_embed, case_factory, changes_factory, evidenceformat_factory, guild_embed, immune_embed, message_factory, overrides_embed
from aurora.utilities.json import dump, dumps
from aurora.utilities.logger import logger
from aurora.utilities.utils import (check_moddable, check_permissions,
convert_timedelta_to_str,
fetch_channel_dict, fetch_user_dict,
generate_dict, get_footer_image, log,
send_evidenceformat,
timedelta_from_relativedelta)
from aurora.utilities.utils import check_moddable, check_permissions, convert_timedelta_to_str, fetch_channel_dict, fetch_user_dict, generate_dict, get_footer_image, log, send_evidenceformat, timedelta_from_relativedelta
class Aurora(commands.Cog):
@ -1406,10 +1397,10 @@ class Aurora(commands.Cog):
)
if case != 0:
case_dict = await fetch_case(case, interaction.guild.id)
if case_dict:
mod = Moderation.from_sql(interaction.client, case, interaction.guild.id)
if mod:
if export:
if export.value == "file" or len(str(case_dict)) > 1800:
if export.value == "file" or len(mod.to_json(2)) > 1800:
filename = (
str(data_manager.cog_data_path(cog_instance=self))
+ str(os.sep)
@ -1417,8 +1408,7 @@ class Aurora(commands.Cog):
)
with open(filename, "w", encoding="utf-8") as f:
dump(case_dict, f, indent=2)
mod.to_json(2, f)
if export.value == "codeblock":
content = f"Case #{case:,} exported.\n" + warning(
"Case was too large to export as codeblock, so it has been uploaded as a `.json` file."
@ -1438,27 +1428,27 @@ class Aurora(commands.Cog):
os.remove(filename)
return
await interaction.response.send_message(
content=box(dumps(case_dict, indent=2), 'json'),
content=box(mod.to_json(2), 'json'),
ephemeral=ephemeral,
)
return
if changes:
embed = await changes_factory(
interaction=interaction, case_dict=case_dict
interaction=interaction, moderation=mod
)
await interaction.response.send_message(
embed=embed, ephemeral=ephemeral
)
elif evidenceformat:
content = await evidenceformat_factory(
interaction=interaction, case_dict=case_dict
interaction=interaction, moderation=mod
)
await interaction.response.send_message(
content=content, ephemeral=ephemeral
)
else:
embed = await case_factory(
interaction=interaction, case_dict=case_dict
interaction=interaction, moderation=mod
)
await interaction.response.send_message(
embed=embed, ephemeral=ephemeral

View file

@ -1,14 +1,18 @@
import json
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional, Union
from async_property import async_cached_property
from discord import Forbidden, HTTPException, InvalidData, NotFound
from pydantic import BaseModel
from redbot.core.bot import Red
from aurora.utilities.utils import generate_dict
class AuroraBaseModel(BaseModel):
"""Base class for all models in Aurora."""
class Moderation(AuroraBaseModel):
bot: Red
moderation_id: int
guild_id: int
timestamp: datetime
@ -27,11 +31,36 @@ class Moderation(AuroraBaseModel):
changes: List[Dict]
metadata: Dict
@property
def id(self) -> int:
return self.moderation_id
@property
def type(self) -> str:
return self.moderation_type
@async_cached_property
async def moderator(self) -> "PartialUser":
return await PartialUser.from_id(self.bot, self.moderator_id)
@async_cached_property
async def target(self) -> Union["PartialUser", "PartialChannel"]:
if self.target_type == "user":
return await PartialUser.from_id(self.bot, self.target_id)
else:
return await PartialChannel.from_id(self.bot, self.target_id)
@async_cached_property
async def resolved_by_user(self) -> Optional["PartialUser"]:
if self.resolved_by:
return await PartialUser.from_id(self.bot, self.resolved_by)
return None
def __str__(self):
return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"
@classmethod
def from_sql(cls, moderation_id: int, guild_id: int):
def from_sql(cls, bot: Red, moderation_id: int, guild_id: int) -> Optional["Moderation"]:
from aurora.utilities.database import connect
query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
@ -41,37 +70,64 @@ class Moderation(AuroraBaseModel):
result = cursor.fetchone()
if result:
if result[7] is not None:
hours, minutes, seconds = map(int, result[7].split(':'))
duration = timedelta(hours=hours, minutes=minutes, seconds=seconds)
else:
duration = None
case = {
"moderation_id": int(result[0]),
"guild_id": int(guild_id),
"timestamp": datetime.fromtimestamp(result[1]),
"moderation_type": str(result[2]),
"target_type": str(result[3]),
"target_id": int(result[4]),
"moderator_id": int(result[5]),
"role_id": int(result[6]) if result[6] is not None else None,
"duration": duration,
"end_timestamp": datetime.fromtimestamp(result[8]) if result[8] is not None else None,
"reason": result[9],
"resolved": bool(result[10]),
"resolved_by": result[11],
"resolve_reason": result[12],
"expired": bool(result[13]),
"changes": json.loads(result[14].strip('"').replace('\\"', '"')) if result[14] else [],
"metadata": json.loads(result[15].strip('"').replace('\\"', '"')) if result[15] else {},
}
case = generate_dict(result)
cursor.close()
return cls(**case)
return cls.from_dict(bot, case)
return None
def to_json(self, indent: int = None, file: bool = False):
@classmethod
def from_dict(cls, bot: Red, data: dict) -> "Moderation":
return cls(bot=bot, **data)
def to_json(self, indent: int = None, file: Any = None):
from aurora.utilities.json import dump, dumps
return dump(self.model_dump(), indent=indent) if file else dumps(self.model_dump(), indent=indent)
return dump(self.model_dump(exclude={"bot", "guild_id"}), file, indent=indent) if file else dumps(self.model_dump(exclude={"bot", "guild_id"}), indent=indent)
class PartialUser(AuroraBaseModel):
id: int
username: str
discriminator: int
@property
def name(self):
return f"{self.username}#{self.discriminator}" if self.discriminator == 0 else self.username
def __str__(self):
return self.name
@classmethod
async def from_id(cls, bot: Red, user_id: int) -> "PartialUser":
user = bot.get_user(user_id)
if not user:
try:
user = await bot.fetch_user(user_id)
return cls(id=user.id, username=user.name, discriminator=user.discriminator)
except NotFound:
return cls(id=user_id, username="Deleted User", discriminator=0)
class PartialChannel(AuroraBaseModel):
id: int
name: str
@property
def mention(self):
if self.name == "Deleted Channel" or self.name == "Forbidden Channel":
return self.name
return f"<#{self.id}>"
def __str__(self):
return self.mention
@classmethod
async def from_id(cls, bot: Red, channel_id: int) -> "PartialChannel":
user = bot.get_channel(channel_id)
if not user:
try:
user = await bot.fetch_channel(channel_id)
return cls(id=user.id, username=user.name, discriminator=user.discriminator)
except (NotFound, InvalidData, HTTPException, Forbidden) as e:
if e == Forbidden:
return cls(id=channel_id, name="Forbidden Channel")
return cls(id=channel_id, name="Deleted Channel")

View file

@ -1,13 +1,18 @@
# pylint: disable=cyclic-import
from datetime import datetime, timedelta
from typing import Union
from typing import Optional, Union
from discord import Color, Embed, Guild, Interaction, InteractionMessage, Member, Role, User
from discord import (Color, Embed, Guild, Interaction, InteractionMessage,
Member, Role, User)
from redbot.core import commands
from redbot.core.utils.chat_formatting import bold, box, error, humanize_timedelta, warning
from redbot.core.utils.chat_formatting import (bold, box, error,
humanize_timedelta, warning)
from aurora.models import Moderation, PartialChannel, PartialUser
from aurora.utilities.config import config
from aurora.utilities.utils import fetch_channel_dict, fetch_user_dict, get_bool_emoji, get_next_case_number, get_pagesize_str
from aurora.utilities.utils import (fetch_channel_dict, fetch_user_dict,
get_bool_emoji, get_next_case_number,
get_pagesize_str)
async def message_factory(
@ -94,7 +99,7 @@ async def message_factory(
async def log_factory(
interaction: Interaction, case_dict: dict, resolved: bool = False
interaction: Interaction, moderation: Moderation, resolved: bool = False
) -> Embed:
"""This function creates a log embed from set parameters, meant for moderation logging.
@ -103,113 +108,50 @@ async def log_factory(
case_dict (dict): The case dictionary.
resolved (bool, optional): Whether the case is resolved or not. Defaults to False.
"""
target: Union[PartialUser, PartialChannel] = await moderation.target
moderator: PartialUser = await moderation.moderator
if resolved:
if case_dict["target_type"] == "USER":
target_user = await fetch_user_dict(interaction.client, case_dict["target_id"])
target_name = (
f"`{target_user['name']}`"
if target_user["discriminator"] == "0"
else f"`{target_user['name']}#{target_user['discriminator']}`"
)
elif case_dict["target_type"] == "CHANNEL":
target_user = await fetch_channel_dict(interaction.guild, case_dict["target_id"])
if target_user["mention"]:
target_name = f"{target_user['mention']}"
else:
target_name = f"`{target_user['name']}`"
moderator_user = await fetch_user_dict(interaction.client, case_dict["moderator_id"])
moderator_name = (
f"`{moderator_user['name']}`"
if moderator_user["discriminator"] == "0"
else f"`{moderator_user['name']}#{moderator_user['discriminator']}`"
)
embed = Embed(
title=f"📕 Case #{case_dict['moderation_id']:,} Resolved",
title=f"📕 Case #{moderation.id:,} Resolved",
color=await interaction.client.get_embed_color(interaction.channel),
)
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
resolved_by: Optional[PartialUser] = await moderation.resolved_by_user
embed.description = f"**Type:** {str.title(moderation.moderation_type)}\n**Target:** {target.name} ({target.id})\n**Moderator:** {moderator.name} ({moderator.id})\n**Timestamp:** <t:{moderation.timestamp}> | <t:{moderation.timestamp}:R>"
if case_dict["duration"] != "NULL":
td = timedelta(
**{
unit: int(val)
for unit, val in zip(
["hours", "minutes", "seconds"],
case_dict["duration"].split(":"),
)
}
)
if moderation.duration is not None:
duration_embed = (
f"{humanize_timedelta(timedelta=td)} | <t:{case_dict['end_timestamp']}:R>"
if case_dict["expired"] == "0"
else str(humanize_timedelta(timedelta=td))
f"{humanize_timedelta(timedelta=moderation.duration)} | <t:{moderation.end_timestamp}:R>"
if not moderation.expired
else str(humanize_timedelta(timedelta=moderation.duration))
)
embed.description = (
embed.description
+ f"\n**Duration:** {duration_embed}\n**Expired:** {bool(case_dict['expired'])}"
+ f"\n**Duration:** {duration_embed}\n**Expired:** {moderation.expired}"
)
embed.add_field(name="Reason", value=box(case_dict["reason"]), inline=False)
embed.add_field(name="Reason", value=box(moderation.reason), inline=False)
resolved_user = await fetch_user_dict(interaction.client, case_dict["resolved_by"])
resolved_name = (
resolved_user["name"]
if resolved_user["discriminator"] == "0"
else f"{resolved_user['name']}#{resolved_user['discriminator']}"
)
embed.add_field(
name="Resolve Reason",
value=f"Resolved by `{resolved_name}` ({resolved_user['id']}) for:\n"
+ box(case_dict["resolve_reason"]),
value=f"Resolved by `{resolved_by.name}` ({resolved_by.id}) for:\n"
+ box(moderation.resolve_reason),
inline=False,
)
else:
if case_dict["target_type"] == "USER":
target_user = await fetch_user_dict(interaction.client, case_dict["target_id"])
target_name = (
f"`{target_user['name']}`"
if target_user["discriminator"] == "0"
else f"`{target_user['name']}#{target_user['discriminator']}`"
)
elif case_dict["target_type"] == "CHANNEL":
target_user = await fetch_channel_dict(interaction.guild, case_dict["target_id"])
if target_user["mention"]:
target_name = target_user["mention"]
else:
target_name = f"`{target_user['name']}`"
moderator_user = await fetch_user_dict(interaction.client, case_dict["moderator_id"])
moderator_name = (
f"`{moderator_user['name']}`"
if moderator_user["discriminator"] == "0"
else f"`{moderator_user['name']}#{moderator_user['discriminator']}`"
)
embed = Embed(
title=f"📕 Case #{case_dict['moderation_id']:,}",
title=f"📕 Case #{moderation.id:,}",
color=await interaction.client.get_embed_color(interaction.channel),
)
embed.description = f"**Type:** {str.title(case_dict['moderation_type'])}\n**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})\n**Timestamp:** <t:{case_dict['timestamp']}> | <t:{case_dict['timestamp']}:R>"
embed.description = f"**Type:** {str.title(moderation.type)}\n**Target:** {target.name} ({target.id})\n**Moderator:** {moderator.name} ({moderator.id})\n**Timestamp:** <t:{moderation.timestamp}> | <t:{moderation.timestamp}:R>"
if case_dict["duration"] != "NULL":
td = timedelta(
**{
unit: int(val)
for unit, val in zip(
["hours", "minutes", "seconds"],
case_dict["duration"].split(":"),
)
}
)
if moderation.duration:
embed.description = (
embed.description
+ f"\n**Duration:** {humanize_timedelta(timedelta=td)} | <t:{case_dict['end_timestamp']}:R>"
+ f"\n**Duration:** {humanize_timedelta(timedelta=moderation.duration)} | <t:{moderation.timestamp}:R>"
)
embed.add_field(name="Reason", value=box(case_dict["reason"]), inline=False)
embed.add_field(name="Reason", value=box(moderation.reason), inline=False)
return embed

View file

@ -1,7 +1,6 @@
# pylint: disable=cyclic-import
import json
from datetime import datetime
from datetime import timedelta as td
from datetime import datetime, timedelta
from typing import Optional, Union
from dateutil.relativedelta import relativedelta as rd
@ -10,7 +9,7 @@ from discord.errors import Forbidden, NotFound
from redbot.core import commands, data_manager
from redbot.core.utils.chat_formatting import error
from .config import config
from aurora.utilities.config import config
def check_permissions(
@ -125,24 +124,30 @@ async def get_next_case_number(guild_id: str, cursor=None) -> int:
return (result[0] + 1) if result else 1
def generate_dict(result) -> dict:
def generate_dict(result: dict, guild_id: int) -> dict:
if result[7] is not None:
hours, minutes, seconds = map(int, result[7].split(':'))
duration = timedelta(hours=hours, minutes=minutes, seconds=seconds)
else:
duration = None
case = {
"moderation_id": result[0],
"timestamp": result[1],
"moderation_type": result[2],
"target_type": result[3],
"target_id": result[4],
"moderator_id": result[5],
"role_id": result[6],
"duration": result[7],
"end_timestamp": result[8],
"moderation_id": int(result[0]),
"guild_id": int(guild_id),
"timestamp": datetime.fromtimestamp(result[1]),
"moderation_type": str(result[2]),
"target_type": str(result[3]),
"target_id": int(result[4]),
"moderator_id": int(result[5]),
"role_id": int(result[6]) if result[6] is not None else None,
"duration": duration,
"end_timestamp": datetime.fromtimestamp(result[8]) if result[8] is not None else None,
"reason": result[9],
"resolved": result[10],
"resolved": bool(result[10]),
"resolved_by": result[11],
"resolve_reason": result[12],
"expired": result[13],
"changes": json.loads(result[14]),
"metadata": json.loads(result[15]),
"expired": bool(result[13]),
"changes": json.loads(result[14].strip('"').replace('\\"', '"')) if result[14] else [],
"metadata": json.loads(result[15].strip('"').replace('\\"', '"')) if result[15] else {},
}
return case
@ -241,9 +246,9 @@ async def send_evidenceformat(interaction: Interaction, case_dict: dict) -> None
await interaction.followup.send(content=content, ephemeral=True)
def convert_timedelta_to_str(timedelta: td) -> str:
def convert_timedelta_to_str(td: timedelta) -> str:
"""This function converts a timedelta object to a string."""
total_seconds = int(timedelta.total_seconds())
total_seconds = int(td.total_seconds())
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
seconds = total_seconds % 60
@ -286,7 +291,7 @@ def create_pagesize_options() -> list[SelectOption]:
)
return options
def timedelta_from_relativedelta(relativedelta: rd) -> td:
def timedelta_from_relativedelta(relativedelta: rd) -> timedelta:
"""Converts a relativedelta object to a timedelta object."""
now = datetime.now()
then = now - relativedelta

13
poetry.lock generated
View file

@ -206,6 +206,17 @@ files = [
{file = "astroid-3.1.0.tar.gz", hash = "sha256:ac248253bfa4bd924a0de213707e7ebeeb3138abeb48d798784ead1e56d419d4"},
]
[[package]]
name = "async-property"
version = "0.2.2"
description = "Python decorator for async properties."
optional = false
python-versions = "*"
files = [
{file = "async_property-0.2.2-py2.py3-none-any.whl", hash = "sha256:8924d792b5843994537f8ed411165700b27b2bd966cefc4daeefc1253442a9d7"},
{file = "async_property-0.2.2.tar.gz", hash = "sha256:17d9bd6ca67e27915a75d92549df64b5c7174e9dc806b30a3934dc4ff0506380"},
]
[[package]]
name = "attrs"
version = "23.2.0"
@ -2545,4 +2556,4 @@ multidict = ">=4.0"
[metadata]
lock-version = "2.0"
python-versions = ">=3.11,<3.12"
content-hash = "67eb5e616951979332b6f32bcb39d85171cbf8377f566ea1862c51b5068b52f3"
content-hash = "05c89da1577b4a3507856338502218e0da92dd9785a5fc4a78d6cb59058d887f"

View file

@ -15,6 +15,7 @@ websockets = "^12.0"
pillow = "^10.3.0"
numpy = "^1.26.4"
pydantic = "^2.7.1"
async-property = "^0.2.2"
[tool.poetry.group.dev]
optional = true