WIP: Refactor Aurora (3.0.0) #29

Draft
cswimr wants to merge 347 commits from aurora-pydantic into main
2 changed files with 199 additions and 305 deletions
Showing only changes of commit f8968e8e9e - Show all commits

View file

@ -11,6 +11,7 @@ import sqlite3
import time
from datetime import datetime, timedelta, timezone
from math import ceil
from typing import List
import discord
from discord import Object
@ -19,8 +20,7 @@ from redbot.core import app_commands, commands, data_manager
from redbot.core.app_commands import Choice
from redbot.core.bot import Red
from redbot.core.commands.converter import parse_relativedelta, parse_timedelta
from redbot.core.utils.chat_formatting import (box, error, humanize_list,
humanize_timedelta, warning)
from redbot.core.utils.chat_formatting import box, error, humanize_list, humanize_timedelta, warning
from aurora.importers.aurora import ImportAuroraView
from aurora.importers.galacticbot import ImportGalacticBotView
@ -30,19 +30,11 @@ from aurora.menus.immune import Immune
from aurora.menus.overrides import Overrides
from aurora.models import Change, Moderation
from aurora.utilities.config import config, register_config
from aurora.utilities.database import (connect, create_guild_table, fetch_case,
mysql_log)
from aurora.utilities.factory import (addrole_embed, case_factory,
changes_factory, evidenceformat_factory,
guild_embed, immune_embed,
message_factory, overrides_embed)
from aurora.utilities.json import dump, dumps
from aurora.utilities.database import connect, create_guild_table, fetch_case, mysql_log
from aurora.utilities.factory import addrole_embed, case_factory, changes_factory, evidenceformat_factory, guild_embed, immune_embed, message_factory, overrides_embed
from aurora.utilities.json import dump
from aurora.utilities.logger import logger
from aurora.utilities.utils import (check_moddable, check_permissions,
fetch_channel_dict, fetch_user_dict,
generate_dict, get_footer_image, log,
send_evidenceformat,
timedelta_from_relativedelta)
from aurora.utilities.utils import check_moddable, check_permissions, get_footer_image, log, send_evidenceformat, timedelta_from_relativedelta
class Aurora(commands.Cog):
@ -1122,15 +1114,15 @@ class Aurora(commands.Cog):
cursor.execute(query)
results = cursor.fetchall()
result_dict_list = []
moderation_list: List[Moderation] = []
for result in results:
case_dict = generate_dict(result)
if case_dict["moderation_id"] == 0:
continue
result_dict_list.append(case_dict)
if result["moderation_id"] != 0:
result.update({"guild_id": interaction.guild.id})
moderation = Moderation.from_dict(interaction.client, dict(result))
moderation_list.append(moderation)
case_quantity = len(result_dict_list)
case_quantity = len(moderation_list)
page_quantity = ceil(case_quantity / pagesize)
start_index = (page - 1) * pagesize
end_index = page * pagesize
@ -1143,74 +1135,44 @@ class Aurora(commands.Cog):
memory_dict = {}
for case in result_dict_list[start_index:end_index]:
if case["target_id"] not in memory_dict:
if case["target_type"] == "USER":
memory_dict[str(case["target_id"])] = await fetch_user_dict(
interaction.client, case["target_id"]
)
elif case["target_type"] == "CHANNEL":
memory_dict[str(case["target_id"])] = await fetch_channel_dict(
interaction.guild, case["target_id"]
)
target_user = memory_dict[str(case["target_id"])]
for mod in moderation_list[start_index:end_index]:
if mod.target_id not in memory_dict:
memory_dict.update({
str(mod.target_id): await mod.get_target()
})
target = memory_dict[str(mod.target_id)]
if case["target_type"] == "USER":
target_name = (
f"`{target_user['name']}`"
if target_user["discriminator"] == "0"
else f"`{target_user['name']}#{target_user['discriminator']}`"
)
elif case["target_type"] == "CHANNEL":
target_name = f"`{target_user['mention']}`"
if case["moderator_id"] not in memory_dict:
memory_dict[str(case["moderator_id"])] = await fetch_user_dict(
interaction.client, case["moderator_id"]
)
moderator_user = memory_dict[str(case["moderator_id"])]
moderator_name = (
f"`{moderator_user['name']}`"
if moderator_user["discriminator"] == "0"
else f"`{moderator_user['name']}#{moderator_user['discriminator']}`"
)
if mod.moderator_id not in memory_dict:
memory_dict.update({
str(mod.moderator_id): await mod.get_moderator()
})
moderator = memory_dict[str(mod.moderator_id)]
field_name = f"Case #{case['moderation_id']:,} ({str.title(case['moderation_type'])})"
field_value = f"**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})"
field_value = f"**Target:** `{target.name}` ({target.id})\n**Moderator:** `{moderator.name}` ({moderator.id})"
if len(case["reason"]) > 125:
field_value += f"\n**Reason:** `{str(case['reason'])[:125]}...`"
field_value += f"\n**Reason:** `{str(mod.reason)[:125]}...`"
else:
field_value += f"\n**Reason:** `{str(case['reason'])}`"
field_value += f"\n**Reason:** `{str(mod.reason)}`"
if case["duration"] != "NULL":
td = timedelta(
**{
unit: int(val)
for unit, val in zip(
["hours", "minutes", "seconds"], case["duration"].split(":")
)
}
)
if mod.duration:
duration_embed = (
f"{humanize_timedelta(timedelta=td)} | <t:{case['end_timestamp']}:R>"
if bool(case["expired"]) is False
else f"{humanize_timedelta(timedelta=td)} | Expired"
f"{humanize_timedelta(timedelta=mod.duration)} | <t:{int(mod.end_timestamp.timestamp())}:R>"
if mod.expired is False
else f"{humanize_timedelta(timedelta=mod.duration)} | Expired"
)
field_value += f"\n**Duration:** {duration_embed}"
field_value += (
f"\n**Timestamp:** <t:{case['timestamp']}> | <t:{case['timestamp']}:R>"
f"\n**Timestamp:** <t:{int(mod.timestamp.timestamp())}> | <t:{int(mod.timestamp.timestamp())}:R>"
)
if case["role_id"] != "0":
role = interaction.guild.get_role(int(case["role_id"]))
if role is not None:
field_value += f"\n**Role:** {role.mention}"
else:
field_value += f"\n**Role:** Deleted Role ({case['role_id']})"
if mod.role_id:
role = await mod.get_role()
field_value += f"\n**Role:** {role.mention} ({role.id})"
if bool(case["resolved"]):
if mod.resolved:
field_value += "\n**Resolved:** True"
embed.add_field(name=field_name, value=field_value, inline=inline)
@ -1243,39 +1205,14 @@ class Aurora(commands.Cog):
)
return
database = connect()
cursor = database.cursor()
query_1 = (
f"SELECT * FROM moderation_{interaction.guild.id} WHERE moderation_id = ?;"
)
cursor.execute(query_1, (case,))
result_1 = cursor.fetchone()
if result_1 is None or case == 0:
try:
moderation = Moderation.from_sql(interaction.client, case, interaction.guild.id)
except ValueError:
await interaction.response.send_message(
content=error(f"There is no moderation with a case number of {case}."),
ephemeral=True,
content=error(f"Case #{case:,} does not exist!"), ephemeral=True
)
return
query_2 = f"SELECT * FROM moderation_{interaction.guild.id} WHERE moderation_id = ? AND resolved = 0;"
cursor.execute(query_2, (case,))
result_2 = cursor.fetchone()
if result_2 is None:
await interaction.response.send_message(
content=error(
f"This moderation has already been resolved!\nUse `/case {case}` for more information."
),
ephemeral=True,
)
return
case_dict = generate_dict(result_2)
if reason is None:
reason = "No reason given."
changes: list = case_dict["changes"]
if len(changes) > 25:
if len(moderation.changes) > 25:
await interaction.response.send_message(
content=error(
"Due to limitations with Discord's embed system, you cannot edit a case more than 25 times."
@ -1283,68 +1220,18 @@ class Aurora(commands.Cog):
ephemeral=True,
)
return
if not changes:
changes.append(
{
"type": "ORIGINAL",
"timestamp": case_dict["timestamp"],
"reason": case_dict["reason"],
"user_id": case_dict["moderator_id"],
}
)
changes.append(
{
"type": "RESOLVE",
"timestamp": int(time.time()),
"reason": reason,
"user_id": interaction.user.id,
}
)
if case_dict["moderation_type"] in ["UNMUTE", "UNBAN"]:
await interaction.response.send_message(
content=error("You cannot resolve this type of moderation!"),
ephemeral=True,
)
return
if case_dict["moderation_type"] in ["MUTE", "TEMPBAN", "BAN"]:
if case_dict["moderation_type"] == "MUTE":
try:
member = await interaction.guild.fetch_member(
case_dict["target_id"]
)
await member.timeout(
None, reason=f"Case #{case:,} resolved by {interaction.user.id}"
)
except discord.NotFound:
pass
if case_dict["moderation_type"] in ["TEMPBAN", "BAN"]:
try:
user = await interaction.client.fetch_user(case_dict["target_id"])
await interaction.guild.unban(
user, reason=f"Case #{case} resolved by {interaction.user.id}"
)
except discord.NotFound:
pass
resolve_query = f"UPDATE `moderation_{interaction.guild.id}` SET resolved = 1, changes = ?, resolved_by = ?, resolve_reason = ? WHERE moderation_id = ?"
else:
resolve_query = f"UPDATE `moderation_{interaction.guild.id}` SET resolved = 1, changes = ?, resolved_by = ?, resolve_reason = ? WHERE moderation_id = ?"
cursor.execute(
resolve_query,
(
dumps(changes),
interaction.user.id,
reason,
case_dict["moderation_id"],
),
)
database.commit()
try:
await moderation.resolve(interaction.user.id, reason)
except (ValueError, TypeError) as e:
if e == ValueError:
await interaction.response.send_message(
content=error("This case has already been resolved!"), ephemeral=True
)
elif e == TypeError:
await interaction.response.send_message(
content=error("This case type cannot be resolved!"), ephemeral=True
)
embed = await case_factory(
interaction=interaction,
@ -1355,9 +1242,6 @@ class Aurora(commands.Cog):
)
await log(interaction, case, resolved=True)
cursor.close()
database.close()
@app_commands.command(name="case")
@app_commands.choices(
export=[
@ -1405,67 +1289,69 @@ class Aurora(commands.Cog):
or False
)
if case != 0:
try:
mod = Moderation.from_sql(interaction.client, case, interaction.guild.id)
if mod:
if export:
if export.value == "file" or len(mod.to_json(2)) > 1800:
filename = (
str(data_manager.cog_data_path(cog_instance=self))
+ str(os.sep)
+ f"moderation_{interaction.guild.id}_case_{case}.json"
)
except ValueError:
await interaction.response.send_message(
content=error(f"Case #{case:,} does not exist!"), ephemeral=True
)
return
with open(filename, "w", encoding="utf-8") as f:
mod.to_json(2, f)
if export.value == "codeblock":
content = f"Case #{case:,} exported.\n" + warning(
"Case was too large to export as codeblock, so it has been uploaded as a `.json` file."
)
else:
content = f"Case #{case:,} exported."
if export:
if export.value == "file" or len(mod.to_json(2)) > 1800:
filename = (
str(data_manager.cog_data_path(cog_instance=self))
+ str(os.sep)
+ f"moderation_{interaction.guild.id}_case_{case}.json"
)
await interaction.response.send_message(
content=content,
file=discord.File(
filename,
f"moderation_{interaction.guild.id}_case_{case}.json",
),
ephemeral=ephemeral,
)
os.remove(filename)
return
await interaction.response.send_message(
content=box(mod.to_json(2), 'json'),
ephemeral=ephemeral,
)
return
if changes:
embed = await changes_factory(
interaction=interaction, moderation=mod
)
await interaction.response.send_message(
embed=embed, ephemeral=ephemeral
)
elif evidenceformat:
content = await evidenceformat_factory(
interaction=interaction, moderation=mod
)
await interaction.response.send_message(
content=content, ephemeral=ephemeral
with open(filename, "w", encoding="utf-8") as f:
mod.to_json(2, f)
if export.value == "codeblock":
content = f"Case #{case:,} exported.\n" + warning(
"Case was too large to export as codeblock, so it has been uploaded as a `.json` file."
)
else:
embed = await case_factory(
interaction=interaction, moderation=mod
)
await interaction.response.send_message(
embed=embed, ephemeral=ephemeral
)
content = f"Case #{case:,} exported."
await interaction.response.send_message(
content=content,
file=discord.File(
filename,
f"moderation_{interaction.guild.id}_case_{case}.json",
),
ephemeral=ephemeral,
)
os.remove(filename)
return
await interaction.response.send_message(
content=f"No case with case number `{case}` found.", ephemeral=True
)
await interaction.response.send_message(
content=box(mod.to_json(2), 'json'),
ephemeral=ephemeral,
)
return
if changes:
embed = await changes_factory(
interaction=interaction, moderation=mod
)
await interaction.response.send_message(
embed=embed, ephemeral=ephemeral
)
elif evidenceformat:
content = await evidenceformat_factory(
interaction=interaction, moderation=mod
)
await interaction.response.send_message(
content=content, ephemeral=ephemeral
)
else:
embed = await case_factory(
interaction=interaction, moderation=mod
)
await interaction.response.send_message(
embed=embed, ephemeral=ephemeral
)
return
@app_commands.command(name="edit")
async def edit(
@ -1498,93 +1384,96 @@ class Aurora(commands.Cog):
)
return
if case != 0:
try:
moderation = Moderation.from_sql(interaction.client, case, interaction.guild.id)
old_moderation = moderation
if moderation:
if len(moderation.changes) > 25:
return await interaction.response.send_message(
content=error(
"Due to limitations with Discord's embed system, you cannot edit a case more than 25 times."
),
ephemeral=True,
)
if duration:
moderation.duration = parse_timedelta(duration)
if moderation.duration is None:
return await interaction.response.send_message(
error("Please provide a valid duration!"), ephemeral=True
)
except ValueError:
await interaction.response.send_message(
content=error(f"Case #{case:,} does not exist!"), ephemeral=True
)
return
moderation.end_timestamp = moderation.timestamp + moderation.duration.total_seconds()
if len(moderation.changes) > 25:
return await interaction.response.send_message(
content=error(
"Due to limitations with Discord's embed system, you cannot edit a case more than 25 times."
),
ephemeral=True,
)
if moderation.type == "MUTE":
if (
time.time() - moderation.unix_timestamp
) + moderation.duration.total_seconds() > 2419200:
return await interaction.response.send_message(
error(
"Please provide a duration that is less than 28 days from the initial moderation."
)
)
try:
member = await interaction.guild.fetch_member(
moderation.target_id
)
await member.timeout(
moderation.duration,
reason=f"Case #{case:,} edited by {interaction.user.id}",
)
except discord.NotFound:
pass
if reason:
moderation.reason = reason
if not moderation.changes:
moderation.changes.append(Change.from_dict(interaction.client, {
"type": "ORIGINAL",
"timestamp": old_moderation.timestamp,
"reason": old_moderation.reason,
"user_id": old_moderation.moderator_id,
"duration": old_moderation.duration,
"end_timestamp": old_moderation.end_timestamp,
}))
if duration:
moderation.changes.append(Change.from_dict(interaction.client, {
"type": "EDIT",
"timestamp": int(time.time()),
"reason": reason,
"user_id": interaction.user.id,
"duration": moderation.duration,
"end_timestamp": moderation.end_timestamp,
}))
else:
moderation.changes.append(Change.from_dict(interaction.client, {
"type": "EDIT",
"timestamp": int(time.time()),
"reason": reason,
"user_id": interaction.user.id,
"duration": moderation.duration,
"end_timestamp": moderation.end_timestamp,
}))
moderation.update()
embed = await case_factory(interaction=interaction, moderation=moderation)
await interaction.response.send_message(
content=f"✅ Moderation #{case:,} edited!",
embed=embed,
ephemeral=True,
if duration:
moderation.duration = parse_timedelta(duration)
if moderation.duration is None:
return await interaction.response.send_message(
error("Please provide a valid duration!"), ephemeral=True
)
await log(interaction, case)
return
moderation.end_timestamp = moderation.timestamp + moderation.duration.total_seconds()
if moderation.type == "MUTE":
if (
time.time() - moderation.unix_timestamp
) + moderation.duration.total_seconds() > 2419200:
return await interaction.response.send_message(
error(
"Please provide a duration that is less than 28 days from the initial moderation."
)
)
try:
member = await interaction.guild.fetch_member(
moderation.target_id
)
await member.timeout(
moderation.duration,
reason=f"Case #{case:,} edited by {interaction.user.id}",
)
except discord.NotFound:
pass
if reason:
moderation.reason = reason
if not moderation.changes:
moderation.changes.append(Change.from_dict(interaction.client, {
"type": "ORIGINAL",
"timestamp": old_moderation.timestamp,
"reason": old_moderation.reason,
"user_id": old_moderation.moderator_id,
"duration": old_moderation.duration,
"end_timestamp": old_moderation.end_timestamp,
}))
if duration:
moderation.changes.append(Change.from_dict(interaction.client, {
"type": "EDIT",
"timestamp": int(time.time()),
"reason": reason,
"user_id": interaction.user.id,
"duration": moderation.duration,
"end_timestamp": moderation.end_timestamp,
}))
else:
moderation.changes.append(Change.from_dict(interaction.client, {
"type": "EDIT",
"timestamp": int(time.time()),
"reason": reason,
"user_id": interaction.user.id,
"duration": moderation.duration,
"end_timestamp": moderation.end_timestamp,
}))
moderation.update()
embed = await case_factory(interaction=interaction, moderation=moderation)
await interaction.response.send_message(
content=error(f"No case with case number `{case}` found."), ephemeral=True
content=f"✅ Moderation #{case:,} edited!",
embed=embed,
ephemeral=True,
)
await log(interaction, case)
return
@tasks.loop(minutes=1)
async def handle_expiry(self):

View file

@ -90,6 +90,9 @@ class Moderation(AuroraGuildModel):
self.resolved_by = resolved_by
self.resolve_reason = reason
if self.type in ["UNMUTE", "UNBAN"]:
raise TypeError("Cannot resolve an unmute or unban case!")
if self.type == "MUTE":
try:
guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
@ -176,7 +179,7 @@ class Moderation(AuroraGuildModel):
return cls(bot=bot, **data)
@classmethod
def from_sql(cls, bot: Red, moderation_id: int, guild_id: int) -> Optional["Moderation"]:
def from_sql(cls, bot: Red, moderation_id: int, guild_id: int) -> "Moderation":
from aurora.utilities.database import connect
query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
@ -184,13 +187,13 @@ class Moderation(AuroraGuildModel):
cursor = database.cursor()
cursor.execute(query, (moderation_id,))
result = cursor.fetchone()
cursor.close()
if result:
if result and not moderation_id == 0:
case = generate_dict(bot, result, guild_id)
cursor.close()
return cls.from_dict(bot, case)
return None
raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")
@classmethod
def log(
@ -411,6 +414,8 @@ class PartialRole(AuroraGuildModel):
@property
def mention(self):
if self.name == "Deleted Role" or self.name == "Forbidden Role":
return self.name
return f"<@&{self.id}>"
def __str__(self):