feat(aurora): finishing up moderation handlers
Some checks failed
Actions / Build Documentation (MkDocs) (pull_request) Failing after 28s
Actions / Lint Code (Ruff & Pylint) (pull_request) Failing after 46s

This commit is contained in:
Seaswimmer 2024-07-12 15:22:24 -04:00
parent 9d0f2e3887
commit e2b0fc999a
Signed by: cswimr
GPG key ID: 3813315477F26F82
9 changed files with 995 additions and 720 deletions

View file

@ -13,7 +13,6 @@ from datetime import datetime, timedelta, timezone
from math import ceil from math import ceil
import discord import discord
from discord import Object
from discord.ext import tasks from discord.ext import tasks
from redbot.core import app_commands, commands, data_manager from redbot.core import app_commands, commands, data_manager
from redbot.core.app_commands import Choice from redbot.core.app_commands import Choice
@ -29,14 +28,13 @@ from .menus.immune import Immune
from .menus.overrides import Overrides from .menus.overrides import Overrides
from .models.change import Change from .models.change import Change
from .models.moderation import Moderation from .models.moderation import Moderation
from .models.moderation_types import Ban, Tempban from .models.type import type_registry
from .utilities.config import config, register_config from .utilities.config import config, register_config
from .utilities.factory import addrole_embed, case_factory, changes_factory, evidenceformat_factory, guild_embed, immune_embed, message_factory, overrides_embed from .utilities.factory import addrole_embed, case_factory, changes_factory, evidenceformat_factory, guild_embed, immune_embed, overrides_embed
from .utilities.json import dump from .utilities.json import dump
from .utilities.logger import logger from .utilities.logger import logger
from .utilities.moderate import moderate from .utilities.moderate import moderate
from .utilities.registry import type_registry from .utilities.utils import check_permissions, create_guild_table, log
from .utilities.utils import check_moddable, check_permissions, create_guild_table, get_footer_image, log, send_evidenceformat
class Aurora(commands.Cog): class Aurora(commands.Cog):
@ -209,47 +207,15 @@ class Aurora(commands.Cog):
Why are you noting this user? Why are you noting this user?
silent: bool silent: bool
Should the user be messaged?""" Should the user be messaged?"""
if not await check_moddable(target, interaction, ["moderate_members"]): await moderate(
return ctx=interaction,
target=target,
await interaction.response.send_message( silent=silent,
content=f"{target.mention} has recieved a note!\n**Reason** - `{reason}`" permissions=["moderate_members"],
moderation_type=type_registry['note'],
reason=reason,
) )
if silent is None:
silent = not await config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await message_factory(
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
moderation_type="note",
response=await interaction.original_response(),
)
await target.send(embed=embed, file=get_footer_image(self))
except discord.errors.HTTPException:
pass
moderation = await Moderation.log(
interaction.client,
interaction.guild.id,
interaction.user.id,
"NOTE",
"USER",
target.id,
None,
None,
reason,
)
await interaction.edit_original_response(
content=f"{target.mention} has received a note! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`"
)
await log(interaction, moderation.id)
await send_evidenceformat(interaction, moderation.id)
@app_commands.command(name="warn") @app_commands.command(name="warn")
async def warn( async def warn(
self, self,
@ -268,47 +234,15 @@ class Aurora(commands.Cog):
Why are you warning this user? Why are you warning this user?
silent: bool silent: bool
Should the user be messaged?""" Should the user be messaged?"""
if not await check_moddable(target, interaction, ["moderate_members"]): await moderate(
return ctx=interaction,
target=target,
await interaction.response.send_message( silent=silent,
content=f"{target.mention} has been warned!\n**Reason** - `{reason}`" permissions=["moderate_members"],
moderation_type=type_registry['warn'],
reason=reason,
) )
if silent is None:
silent = not await config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await message_factory(
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
moderation_type="warned",
response=await interaction.original_response(),
)
await target.send(embed=embed, file=get_footer_image(self))
except discord.errors.HTTPException:
pass
moderation = await Moderation.log(
interaction.client,
interaction.guild.id,
interaction.user.id,
"WARN",
"USER",
target.id,
None,
None,
reason,
)
await interaction.edit_original_response(
content=f"{target.mention} has been warned! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`"
)
await log(interaction, moderation.id)
await send_evidenceformat(interaction, moderation.id)
@app_commands.command(name="addrole") @app_commands.command(name="addrole")
async def addrole( async def addrole(
self, self,
@ -333,87 +267,16 @@ class Aurora(commands.Cog):
How long are you adding this role for? How long are you adding this role for?
silent: bool silent: bool
Should the user be messaged?""" Should the user be messaged?"""
addrole_whitelist = await config.guild(interaction.guild).addrole_whitelist() await moderate(
ctx=interaction,
if not addrole_whitelist: target=target,
await interaction.response.send_message( silent=silent,
content=error("There are no whitelisted roles set for this server!"), permissions=["moderate_members", "manage_roles"],
ephemeral=True, moderation_type=type_registry['addrole'],
) reason=reason,
return role=role,
duration=duration
if duration is not None:
parsed_time = parse_timedelta(duration)
if parsed_time is None:
await interaction.response.send_message(
content=error("Please provide a valid duration!"), ephemeral=True
)
return
else:
parsed_time = None
if role.id not in addrole_whitelist:
await interaction.response.send_message(
content=error("That role isn't whitelisted!"), ephemeral=True
)
return
if not await check_moddable(
target, interaction, ["moderate_members", "manage_roles"]
):
return
if role.id in [user_role.id for user_role in target.roles]:
await interaction.response.send_message(
content=error(f"{target.mention} already has this role!"),
ephemeral=True,
)
return
await interaction.response.defer()
if silent is None:
silent = not await config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await message_factory(
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
moderation_type="addrole",
response=await interaction.original_response(),
duration=parsed_time,
role=role,
)
await target.send(embed=embed, file=get_footer_image(self))
except discord.errors.HTTPException:
pass
await target.add_roles(
role,
reason=f"Role added by {interaction.user.id}{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time != 'NULL' else ''} for: {reason}",
) )
response: discord.WebhookMessage = await interaction.followup.send(
content=f"{target.mention} has been given the {role.mention} role{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time != 'NULL' else ''}!\n**Reason** - `{reason}`"
)
moderation = await Moderation.log(
interaction.client,
interaction.guild.id,
interaction.user.id,
"ADDROLE",
"USER",
target.id,
role.id,
parsed_time,
reason,
)
await response.edit(
content=f"{target.mention} has been given the {role.mention} role{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time != 'NULL' else ''}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`",
)
await log(interaction, moderation.id)
await send_evidenceformat(interaction, moderation.id)
@app_commands.command(name="removerole") @app_commands.command(name="removerole")
async def removerole( async def removerole(
@ -439,87 +302,16 @@ class Aurora(commands.Cog):
How long are you removing this role for? How long are you removing this role for?
silent: bool silent: bool
Should the user be messaged?""" Should the user be messaged?"""
addrole_whitelist = await config.guild(interaction.guild).addrole_whitelist() await moderate(
ctx=interaction,
if not addrole_whitelist: target=target,
await interaction.response.send_message( silent=silent,
content=error("There are no whitelisted roles set for this server!"), permissions=["moderate_members", "manage_roles"],
ephemeral=True, moderation_type=type_registry['removerole'],
) reason=reason,
return role=role,
duration=duration
if duration is not None:
parsed_time = parse_timedelta(duration)
if parsed_time is None:
await interaction.response.send_message(
content=error("Please provide a valid duration!"), ephemeral=True
)
return
else:
parsed_time = None
if role.id not in addrole_whitelist:
await interaction.response.send_message(
content=error("That role isn't whitelisted!"), ephemeral=True
)
return
if not await check_moddable(
target, interaction, ["moderate_members", "manage_roles"]
):
return
if role.id not in [user_role.id for user_role in target.roles]:
await interaction.response.send_message(
content=error(f"{target.mention} does not have this role!"),
ephemeral=True,
)
return
await interaction.response.defer()
if silent is None:
silent = not await config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await message_factory(
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
moderation_type="removerole",
response=await interaction.original_response(),
duration=parsed_time,
role=role,
)
await target.send(embed=embed, file=get_footer_image(self))
except discord.errors.HTTPException:
pass
await target.remove_roles(
role,
reason=f"Role removed by {interaction.user.id}{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time != 'NULL' else ''} for: {reason}",
) )
response: discord.WebhookMessage = await interaction.followup.send(
content=f"{target.mention} has had the {role.mention} role removed{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time != 'NULL' else ''}!\n**Reason** - `{reason}`"
)
moderation = await Moderation.log(
interaction.client,
interaction.guild.id,
interaction.user.id,
"REMOVEROLE",
"USER",
target.id,
role.id,
parsed_time,
reason,
)
await response.edit(
content=f"{target.mention} has had the {role.mention} role removed{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time != 'NULL' else ''}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`",
)
await log(interaction, moderation.id)
await send_evidenceformat(interaction, moderation.id)
@app_commands.command(name="mute") @app_commands.command(name="mute")
async def mute( async def mute(
@ -542,73 +334,16 @@ class Aurora(commands.Cog):
Why are you unbanning this user? Why are you unbanning this user?
silent: bool silent: bool
Should the user be messaged?""" Should the user be messaged?"""
if not await check_moddable(target, interaction, ["moderate_members"]): await moderate(
return ctx=interaction,
target=target,
if target.is_timed_out() is True: silent=silent,
await interaction.response.send_message( permissions=["moderate_members"],
error(f"{target.mention} is already muted!"), moderation_type=type_registry['mute'],
allowed_mentions=discord.AllowedMentions(users=False), duration=duration,
ephemeral=True, reason=reason,
)
return
try:
parsed_time = parse_timedelta(duration, maximum=timedelta(days=28))
if parsed_time is None:
await interaction.response.send_message(
error("Please provide a valid duration!"), ephemeral=True
)
return
except commands.BadArgument:
await interaction.response.send_message(
error("Please provide a duration that is less than 28 days."), ephemeral=True
)
return
await target.timeout(
parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}"
) )
await interaction.response.send_message(
content=f"{target.mention} has been muted for {humanize_timedelta(timedelta=parsed_time)}!\n**Reason** - `{reason}`"
)
if silent is None:
silent = not await config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await message_factory(
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
moderation_type="muted",
response=await interaction.original_response(),
duration=parsed_time,
)
await target.send(embed=embed, file=get_footer_image(self))
except discord.errors.HTTPException:
pass
moderation = await Moderation.log(
interaction.client,
interaction.guild.id,
interaction.user.id,
"MUTE",
"USER",
target.id,
None,
parsed_time,
reason,
)
await interaction.edit_original_response(
content=f"{target.mention} has been muted for {humanize_timedelta(timedelta=parsed_time)}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`"
)
await log(interaction, moderation.id)
await send_evidenceformat(interaction, moderation.id)
@app_commands.command(name="unmute") @app_commands.command(name="unmute")
async def unmute( async def unmute(
self, self,
@ -627,63 +362,15 @@ class Aurora(commands.Cog):
Why are you unmuting this user? Why are you unmuting this user?
silent: bool silent: bool
Should the user be messaged?""" Should the user be messaged?"""
if not await check_moddable(target, interaction, ["moderate_members"]): await moderate(
return ctx=interaction,
target=target,
if target.is_timed_out() is False: silent=silent,
await interaction.response.send_message( permissions=["moderate_members"],
error(f"{target.mention} is not muted!"), moderation_type=type_registry['unmute'],
allowed_mentions=discord.AllowedMentions(users=False), reason=reason,
ephemeral=True,
)
return
if reason:
await target.timeout(
None, reason=f"Unmuted by {interaction.user.id} for: {reason}"
)
else:
await target.timeout(None, reason=f"Unbanned by {interaction.user.id}")
reason = "No reason given."
await interaction.response.send_message(
content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`"
) )
if silent is None:
silent = not await config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await message_factory(
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
moderation_type="unmuted",
response=await interaction.original_response(),
)
await target.send(embed=embed, file=get_footer_image(self))
except discord.errors.HTTPException:
pass
moderation = await Moderation.log(
interaction.client,
interaction.guild.id,
interaction.user.id,
"UNMUTE",
"USER",
target.id,
None,
None,
reason,
)
await interaction.edit_original_response(
content=f"{target.mention} has been unmuted! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`"
)
await log(interaction, moderation.id)
await send_evidenceformat(interaction, moderation.id)
@app_commands.command(name="kick") @app_commands.command(name="kick")
async def kick( async def kick(
self, self,
@ -702,49 +389,15 @@ class Aurora(commands.Cog):
Why are you kicking this user? Why are you kicking this user?
silent: bool silent: bool
Should the user be messaged?""" Should the user be messaged?"""
if not await check_moddable(target, interaction, ["kick_members"]): await moderate(
return ctx=interaction,
target=target,
await interaction.response.send_message( silent=silent,
content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`" permissions=["kick_members"],
moderation_type=type_registry['kick'],
reason=reason,
) )
if silent is None:
silent = not await config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await message_factory(
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
moderation_type="kicked",
response=await interaction.original_response(),
)
await target.send(embed=embed, file=get_footer_image(self))
except discord.errors.HTTPException:
pass
await target.kick(reason=f"Kicked by {interaction.user.id} for: {reason}")
moderation = await Moderation.log(
interaction.client,
interaction.guild.id,
interaction.user.id,
"KICK",
"USER",
target.id,
None,
None,
reason,
)
await interaction.edit_original_response(
content=f"{target.mention} has been kicked! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`"
)
await log(interaction, moderation.id)
await send_evidenceformat(interaction, moderation.id)
@app_commands.command(name="ban") @app_commands.command(name="ban")
@app_commands.choices( @app_commands.choices(
delete_messages=[ delete_messages=[
@ -781,22 +434,22 @@ class Aurora(commands.Cog):
Should the user be messaged?""" Should the user be messaged?"""
if duration: if duration:
await moderate( await moderate(
interaction, ctx=interaction,
target, target=target,
silent, silent=silent,
["ban_members"], permissions=["ban_members"],
Tempban, moderation_type=type_registry['tempban'],
reason=reason, reason=reason,
duration=duration, duration=duration,
delete_messages=delete_messages, delete_messages=delete_messages,
) )
else: else:
await moderate( await moderate(
interaction, ctx=interaction,
target, target=target,
silent, silent=silent,
["ban_members"], permissions=["ban_members"],
Ban, moderation_type=type_registry['ban'],
reason=reason, reason=reason,
delete_messages=delete_messages, delete_messages=delete_messages,
) )
@ -819,65 +472,15 @@ class Aurora(commands.Cog):
Why are you unbanning this user? Why are you unbanning this user?
silent: bool silent: bool
Should the user be messaged?""" Should the user be messaged?"""
if not await check_moddable(target, interaction, ["ban_members"]): await moderate(
return ctx=interaction,
target=target,
try: silent=silent,
await interaction.guild.fetch_ban(target) permissions=["ban_members"],
except discord.errors.NotFound: moderation_type=type_registry['unban'],
await interaction.response.send_message( reason=reason,
content=error(f"{target.mention} is not banned!"), ephemeral=True
)
return
if reason:
await interaction.guild.unban(
target, reason=f"Unbanned by {interaction.user.id} for: {reason}"
)
else:
await interaction.guild.unban(
target, reason=f"Unbanned by {interaction.user.id}"
)
reason = "No reason given."
await interaction.response.send_message(
content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`"
) )
if silent is None:
silent = not await config.guild(interaction.guild).dm_users()
if silent is False:
try:
embed = await message_factory(
bot=interaction.client,
color=await self.bot.get_embed_color(interaction.channel),
guild=interaction.guild,
moderator=interaction.user,
reason=reason,
moderation_type="unbanned",
response=await interaction.original_response(),
)
await target.send(embed=embed, file=get_footer_image(self))
except discord.errors.HTTPException:
pass
moderation = await Moderation.log(
interaction.client,
interaction.guild.id,
interaction.user.id,
"UNBAN",
"USER",
target.id,
None,
None,
reason,
)
await interaction.edit_original_response(
content=f"{target.mention} has been unbanned! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`"
)
await log(interaction, moderation.id)
await send_evidenceformat(interaction, moderation.id)
@app_commands.command(name="slowmode") @app_commands.command(name="slowmode")
async def slowmode( async def slowmode(
self, self,
@ -899,27 +502,15 @@ class Aurora(commands.Cog):
if channel is None: if channel is None:
channel = interaction.channel channel = interaction.channel
if not await check_moddable(channel, interaction, ["manage_channels"]): await moderate(
return ctx=interaction,
target=channel,
await channel.edit(slowmode_delay=interval) silent=True,
await interaction.response.send_message(f"Slowmode in {channel.mention} has been set to {interval} seconds!\n**Reason** - `{reason}`") permissions=["manage_channel"],
moderation_type=type_registry['slowmode'],
moderation = await Moderation.log( interval=interval,
interaction.client, reason=reason,
interaction.guild.id,
interaction.user.id,
"SLOWMODE",
"CHANNEL",
channel.id,
None,
None,
reason,
metadata={"interval": f"{interval} seconds"}
) )
await interaction.edit_original_response(content=f"Slowmode in {channel.mention} has been set to {interval} seconds! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`")
await log(interaction, moderation.id)
await send_evidenceformat(interaction, moderation.id)
@app_commands.command(name="history") @app_commands.command(name="history")
async def history( async def history(
@ -1132,7 +723,7 @@ class Aurora(commands.Cog):
return return
try: try:
await moderation.resolve(interaction.user.id, reason) success, msg = await moderation.resolve(interaction.user.id, reason)
except (ValueError, TypeError) as e: except (ValueError, TypeError) as e:
if e == ValueError: if e == ValueError:
await interaction.response.send_message( await interaction.response.send_message(
@ -1148,9 +739,10 @@ class Aurora(commands.Cog):
moderation=moderation, moderation=moderation,
) )
await interaction.response.send_message( await interaction.response.send_message(
content=f"✅ Moderation #{case:,} resolved!", embed=embed content=f"✅ Moderation #{case:,} resolved!\n" + error(f"Resolve handler returned an error message: `{msg}`") if success is False else "", embed=embed
) )
await log(interaction, case, resolved=True) ctx = await self.bot.get_context(interaction, cls=commands.Context)
await log(ctx=ctx, moderation_id=case, resolved=True)
@app_commands.command(name="case") @app_commands.command(name="case")
@app_commands.choices( @app_commands.choices(
@ -1320,27 +912,14 @@ class Aurora(commands.Cog):
moderation.end_timestamp = moderation.timestamp + moderation.duration.total_seconds() moderation.end_timestamp = moderation.timestamp + moderation.duration.total_seconds()
if moderation.type == "MUTE": try:
if ( success = await moderation.type.duration_edit_handler(interaction=interaction.client, old_moderation=old_moderation, new_moderation=moderation)
time.time() - moderation.unix_timestamp except NotImplementedError:
) + moderation.duration.total_seconds() > 2419200: return await interaction.response.send_message(
return await interaction.response.send_message( error("This case type does not support duration editing!"), ephemeral=True
error( )
"Please provide a duration that is less than 28 days from the initial moderation." if not success:
) return
)
try:
member = await interaction.guild.fetch_member(
moderation.target_id
)
await member.timeout(
moderation.duration,
reason=f"Case #{case:,} edited by {interaction.user.id}",
)
except discord.NotFound:
pass
if reason: if reason:
moderation.reason = reason moderation.reason = reason
@ -1392,147 +971,64 @@ class Aurora(commands.Cog):
global_unban_num = 0 global_unban_num = 0
global_addrole_num = 0 global_addrole_num = 0
global_removerole_num = 0 global_removerole_num = 0
global_other_num = 0
guilds: list[discord.Guild] = self.bot.guilds guilds: list[discord.Guild] = self.bot.guilds
for guild in guilds: for guild in guilds:
if not await self.bot.cog_disabled_in_guild(self, guild): if not await self.bot.cog_disabled_in_guild(self, guild):
time_per_guild = time.time() time_per_guild = time.time()
tempban_query = f"SELECT * FROM moderation_{guild.id} WHERE end_timestamp IS NOT NULL AND end_timestamp <= ? AND moderation_type = 'TEMPBAN' AND expired = 0" query = f"SELECT * FROM moderation_{guild.id} WHERE end_timestamp IS NOT NULL AND end_timestamp <= ? AND expired = 0"
tempbans = await Moderation.execute(bot=self.bot, guild_id=guild.id, query=tempban_query, parameters=(time.time(),)) moderations = await Moderation.execute(bot=self.bot, guild_id=guild.id, query=query, parameters=(time.time(),))
unban_num = 0 unban_num = 0
for moderation in tempbans:
user = self.bot.get_user(moderation.target_id)
if user is None:
try:
user = await self.bot.fetch_user(moderation.target_id)
except discord.errors.NotFound:
continue
name = (
f"{user.name}#{user.discriminator}"
if user.discriminator != "0"
else user.name
)
try:
await guild.unban(
user, reason=f"Automatic unban from case #{moderation.id}"
)
embed = await message_factory(
bot=self.bot,
color=await self.bot.get_embed_color(guild.channels[0]),
guild=guild,
reason=f"Automatic unban from case #{moderation.id}",
moderation_type="unbanned",
)
try:
await user.send(embed=embed, file=get_footer_image(self))
except discord.errors.HTTPException:
pass
logger.trace(
"Unbanned %s (%s) from %s (%s)",
name,
user.id,
guild.name,
guild.id,
)
unban_num += 1
except (
discord.errors.NotFound,
discord.errors.Forbidden,
discord.errors.HTTPException,
) as e:
logger.error(
"Failed to unban %s (%s) from %s (%s)\n%s",
name,
user.id,
guild.name,
guild.id,
e,
)
removerole_num = 0 removerole_num = 0
addrole_query = f"SELECT * FROM moderation_{guild.id} WHERE end_timestamp IS NOT NULL AND end_timestamp <= ? AND moderation_type = 'ADDROLE' AND expired = 0"
addroles = await Moderation.execute(bot=self.bot, guild_id=guild.id, query=addrole_query, parameters=(time.time(),))
for moderation in addroles:
try:
member = await guild.fetch_member(moderation.target_id)
await member.remove_roles(
Object(moderation.role_id), reason=f"Automatic role removal from case #{moderation.id}"
)
logger.trace(
"Removed role %s from %s (%s)",
moderation.role_id,
member.name,
member.id,
)
removerole_num = removerole_num + 1
except (
discord.errors.NotFound,
discord.errors.Forbidden,
discord.errors.HTTPException,
) as e:
logger.error(
"Removing the role %s from user %s failed due to: \n%s",
moderation.role_id,
moderation.target_id,
e,
)
continue
addrole_num = 0 addrole_num = 0
removerole_query = f"SELECT * FROM moderation_{guild.id} WHERE end_timestamp IS NOT NULL AND end_timestamp <= ? AND moderation_type = 'REMOVEROLE' AND expired = 0" other_num = 0
removeroles = await Moderation.execute(bot=self.bot, guild_id=guild.id, query=removerole_query, parameters=(time.time(),)) for moderation in moderations:
for moderation in removeroles:
try: try:
member = await guild.fetch_member(moderation.target_id) num = await moderation.type.expiry_handler(self.bot, guild, moderation)
except NotImplementedError:
await member.add_roles( logger.warning("Expiry handler not implemented for expirable moderation type %s", moderation.type.key)
Object(moderation.role_id), reason=f"Automatic role addition from case #{moderation.id}"
)
logger.trace("Added role %s to %s (%s)", moderation.role_id, member.name, member.id)
addrole_num = addrole_num + 1
except (
discord.errors.NotFound,
discord.errors.Forbidden,
discord.errors.HTTPException,
) as e:
logger.error("Adding the role %s to user %s failed due to: \n%s", moderation.role_id, moderation.target_id, e)
continue continue
match moderation.type.key:
case "tempban":
unban_num += num
case "addrole":
removerole_num += num
case "removerole":
addrole_num += num
case _:
other_num += num if isinstance(num, int) else 0
expiry_query = f"UPDATE `moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp IS NOT NULL AND end_timestamp <= ? AND expired = 0) OR (expired = 0 AND resolved = 1);" expiry_query = f"UPDATE `moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp IS NOT NULL AND end_timestamp <= ? AND expired = 0) OR (expired = 0 AND resolved = 1);"
await Moderation.execute(bot=self.bot, guild_id=guild.id, query=expiry_query, parameters=(time.time(),), return_obj=False) await Moderation.execute(bot=self.bot, guild_id=guild.id, query=expiry_query, parameters=(time.time(),), return_obj=False)
per_guild_completion_time = (time.time() - time_per_guild) * 1000 per_guild_completion_time = (time.time() - time_per_guild) * 1000
logger.debug( logger.debug(
"Completed expiry loop for %s (%s) in %sms with %s users unbanned, %s roles added, and %s roles removed", "Completed expiry loop for %s (%s) in %sms with %s users unbanned, %s roles added, and %s roles removed (%s other cases expired)",
guild.name, guild.name,
guild.id, guild.id,
f"{per_guild_completion_time:.6f}", f"{per_guild_completion_time:.6f}",
unban_num, unban_num,
addrole_num, addrole_num,
removerole_num, removerole_num,
other_num
) )
global_unban_num = global_unban_num + unban_num global_unban_num = global_unban_num + unban_num
global_addrole_num = global_addrole_num + addrole_num global_addrole_num = global_addrole_num + addrole_num
global_removerole_num = global_removerole_num + removerole_num global_removerole_num = global_removerole_num + removerole_num
global_other_num = global_other_num + other_num
completion_time = (time.time() - current_time) * 1000 completion_time = (time.time() - current_time) * 1000
logger.debug( logger.debug(
"Completed expiry loop in %sms with %s users unbanned, %s roles added, and %s roles removed", "Completed expiry loop in %sms with %s users unbanned, %s roles added, and %s roles removed (%s other cases expired)",
f"{completion_time:.6f}", f"{completion_time:.6f}",
global_unban_num, global_unban_num,
global_addrole_num, global_addrole_num,
global_removerole_num, global_removerole_num,
global_other_num
) )
######################################################################################################################## ########################################################################################################################

View file

@ -9,9 +9,8 @@ from redbot.core import commands, data_manager
from redbot.core.utils.chat_formatting import warning from redbot.core.utils.chat_formatting import warning
from ..models.moderation import Moderation from ..models.moderation import Moderation
from ..models.type import Type from ..models.type import Type, type_registry
from ..utilities.json import dump from ..utilities.json import dump
from ..utilities.registry import type_registry
from ..utilities.utils import create_guild_table, timedelta_from_string from ..utilities.utils import create_guild_table, timedelta_from_string

View file

@ -0,0 +1,2 @@
from .moderation_types import * # noqa: F403
# This just imports all the built-in moderation types so they can be registered, as they aren't imported anywhere else.

View file

@ -11,15 +11,64 @@ from redbot.core import data_manager
from redbot.core.bot import Red from redbot.core.bot import Red
from ..utilities.logger import logger from ..utilities.logger import logger
from ..utilities.registry import type_registry
from ..utilities.utils import timedelta_to_string from ..utilities.utils import timedelta_to_string
from .base import AuroraGuildModel from .base import AuroraGuildModel
from .change import Change from .change import Change
from .partials import PartialChannel, PartialRole, PartialUser from .partials import PartialChannel, PartialRole, PartialUser
from .type import Type from .type import Type, type_registry
class Moderation(AuroraGuildModel): class Moderation(AuroraGuildModel):
"""This class represents a moderation case in the database.
Attributes:
bot (Red): The bot instance.
guild (discord.Guild): The guild the case belongs to.
moderation_id (int): The ID of the moderation case.
timestamp (datetime): The timestamp of the case.
moderation_type (Type): The type of moderation case.
target_type (str): The type of target. Should be either `user` or `channel`.
target_id (int): The ID of the target.
moderator_id (int): The ID of the moderator who issued the case.
role_id (int): The ID of the role, if applicable.
duration (timedelta): The duration of the case, if applicable.
end_timestamp (datetime): The end timestamp of the case, if applicable.
reason (str): The reason for the case.
resolved (bool): Whether the case is resolved.
resolved_by (int): The ID of the user who resolved the case.
resolve_reason (str): The reason the case was resolved.
expired (bool): Whether the case is expired.
changes (List[Change]): A list of changes to the case.
metadata (Dict): A dictionary of metadata stored with the case.
Properties:
id (int): The ID of the case.
type (Type): The type of the case.
unix_timestamp (int): The timestamp of the case as a Unix timestamp.
Methods:
get_moderator: Gets the moderator who issued the case.
get_target: Gets the target of the case.
get_resolved_by: Gets the user who resolved the case.
get_role: Gets the role, if applicable.
resolve: Resolves the case.
update: Updates the case in the database.
Class Methods:
from_dict: Creates a `Moderation` object from a dictionary.
from_result: Creates a `Moderation` object from a database result.
execute: Executes a query on the database.
get_latest: Gets the latest cases from the database.
get_next_case_number: Gets the next case number to use.
find_by_id: Finds a case by its ID.
find_by_target: Finds cases by the target.
find_by_moderator: Finds cases by the moderator.
log: Logs a moderation case in the database.
Static Methods:
connect: Connects to the SQLite database.
"""
moderation_id: int moderation_id: int
timestamp: datetime timestamp: datetime
moderation_type: Type moderation_type: Type
@ -53,7 +102,7 @@ class Moderation(AuroraGuildModel):
return await PartialUser.from_id(self.bot, self.moderator_id) return await PartialUser.from_id(self.bot, self.moderator_id)
async def get_target(self) -> Union["PartialUser", "PartialChannel"]: async def get_target(self) -> Union["PartialUser", "PartialChannel"]:
if str.lower(self.target_type) == "user": if self.target_type.lower() == "user":
return await PartialUser.from_id(self.bot, self.target_id) return await PartialUser.from_id(self.bot, self.target_id)
return await PartialChannel.from_id(self.bot, self.target_id, self.guild) return await PartialChannel.from_id(self.bot, self.target_id, self.guild)
@ -73,7 +122,7 @@ class Moderation(AuroraGuildModel):
def __int__(self) -> int: def __int__(self) -> int:
return self.moderation_id return self.moderation_id
async def resolve(self, resolved_by: int, reason: str) -> None: async def resolve(self, resolved_by: int, reason: str) -> Tuple[bool, str]:
if self.resolved: if self.resolved:
raise ValueError("Case is already resolved!") raise ValueError("Case is already resolved!")
@ -81,7 +130,7 @@ class Moderation(AuroraGuildModel):
self.resolved_by = resolved_by self.resolved_by = resolved_by
self.resolve_reason = reason self.resolve_reason = reason
await self.type.resolve_handler(self.bot, self.guild, await self.get_target(), reason) success, msg = await self.type.resolve_handler(moderation=self, reason=reason)
# if self.type in ["UNMUTE", "UNBAN"]: # if self.type in ["UNMUTE", "UNBAN"]:
# raise TypeError("Cannot resolve an unmute or unban case!") # raise TypeError("Cannot resolve an unmute or unban case!")
@ -121,6 +170,7 @@ class Moderation(AuroraGuildModel):
})) }))
await self.update() await self.update()
return success, msg
async def update(self) -> None: async def update(self) -> None:
from ..utilities.json import dumps from ..utilities.json import dumps
@ -164,19 +214,19 @@ class Moderation(AuroraGuildModel):
) )
@classmethod @classmethod
def from_dict(cls, bot: Red, data: dict) -> "Moderation": async def from_dict(cls, bot: Red, data: dict) -> "Moderation":
if data.get("guild_id"): if data.get("guild_id"):
try: try:
guild: discord.Guild = bot.get_guild(data["guild_id"]) guild = bot.get_guild(data["guild_id"])
if not guild: if not guild:
guild = bot.fetch_guild(data["guild_id"]) guild = await bot.fetch_guild(data["guild_id"])
except (discord.Forbidden, discord.HTTPException): except (discord.Forbidden, discord.HTTPException):
guild = None guild = None
data.update({"guild": guild}) data.update({"guild": guild})
return cls(bot=bot, **data) return cls(bot=bot, **data)
@classmethod @classmethod
def from_result(cls, bot: Red, result: Iterable, guild_id: int) -> "Moderation": async def from_result(cls, bot: Red, result: Iterable, guild_id: int) -> "Moderation":
if result[7] is not None and result[7] != "NULL": if result[7] is not None and result[7] != "NULL":
hours, minutes, seconds = map(int, result[7].split(':')) hours, minutes, seconds = map(int, result[7].split(':'))
duration = timedelta(hours=hours, minutes=minutes, seconds=seconds) duration = timedelta(hours=hours, minutes=minutes, seconds=seconds)
@ -220,7 +270,7 @@ class Moderation(AuroraGuildModel):
"changes": change_obj_list, "changes": change_obj_list,
"metadata": metadata if metadata else {}, "metadata": metadata if metadata else {},
} }
return cls.from_dict(bot=bot, data=case) return await cls.from_dict(bot=bot, data=case)
@staticmethod @staticmethod
async def connect() -> Connection: async def connect() -> Connection:
@ -239,6 +289,17 @@ class Moderation(AuroraGuildModel):
@classmethod @classmethod
async def execute(cls, query: str, parameters: tuple | None = None, bot: Red | None = None, guild_id: int | None = None, cursor: Cursor | None = None, return_obj: bool = True) -> Union[Tuple["Moderation"], Iterable[Row]]: async def execute(cls, query: str, parameters: tuple | None = None, bot: Red | None = None, guild_id: int | None = None, cursor: Cursor | None = None, return_obj: bool = True) -> Union[Tuple["Moderation"], Iterable[Row]]:
"""Executes a query on the database.
Arguments:
query (str): The query to execute.
parameters (tuple): The parameters to pass to the query.
bot (Red): The bot instance.
guild_id (int): The ID of the guild to execute the query on.
cursor (Cursor): The cursor to use for the query.
return_obj (bool): Whether to return the case object(s). Defaults to `True`. If `False`, returns a `Iterable` of `aiosqlite.Row` objects.
Returns: The result of the query, either as a `Tuple` of `Moderation` objects or an `Iterable` of `aiosqlite.Row` objects.
"""
logger.trace("Executing query: \"%s\" with parameters \"%s\"", query, parameters) logger.trace("Executing query: \"%s\" with parameters \"%s\"", query, parameters)
if not parameters: if not parameters:
parameters = () parameters = ()
@ -261,18 +322,19 @@ class Moderation(AuroraGuildModel):
for result in results: for result in results:
if result[0] == 0: if result[0] == 0:
continue continue
case = cls.from_result(bot=bot, result=result, guild_id=guild_id) case = await cls.from_result(bot=bot, result=result, guild_id=guild_id)
cases.append(case) cases.append(case)
return tuple(cases) return tuple(cases)
return results return results
@classmethod @classmethod
async def get_latest(cls, bot: Red, guild_id: int, limit: int | None = None, offset: int = 0, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]: async def get_latest(cls, bot: Red, guild_id: int, limit: int | None = None, offset: int = 0, types: Iterable[Type] | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
params = [] params = []
query = f"SELECT * FROM moderation_{guild_id} ORDER BY moderation_id DESC" query = f"SELECT * FROM moderation_{guild_id} ORDER BY moderation_id DESC"
if types: if types:
query += f" WHERE moderation_type IN ({', '.join(['?' for _ in types])})" query += f" WHERE moderation_type IN ({', '.join(['?' for _ in types])})"
params.extend(types) for t in types:
params.append(t.key)
if limit: if limit:
query += " LIMIT ? OFFSET ?" query += " LIMIT ? OFFSET ?"
params.extend((limit, offset)) params.extend((limit, offset))
@ -293,22 +355,22 @@ class Moderation(AuroraGuildModel):
raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!") raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")
@classmethod @classmethod
async def find_by_target(cls, bot: Red, guild_id: int, target: int, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]: async def find_by_target(cls, bot: Red, guild_id: int, target: int, types: Iterable[Type] | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?" query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?"
if types: if types:
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})" query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
query += " ORDER BY moderation_id DESC;" query += " ORDER BY moderation_id DESC;"
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(target, *types) if types else (target,), cursor=cursor) return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(target, *[t.key for t in types]) if types else (target,), cursor=cursor)
@classmethod @classmethod
async def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]: async def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, types: Iterable[Type] | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?" query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?"
if types: if types:
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})" query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
query += " ORDER BY moderation_id DESC;" query += " ORDER BY moderation_id DESC;"
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderator, *types) if types else (moderator,), cursor=cursor) return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderator, *[t.key for t in types]) if types else (moderator,), cursor=cursor)
@classmethod @classmethod
async def log( async def log(
@ -332,6 +394,31 @@ class Moderation(AuroraGuildModel):
metadata: dict | None = None, metadata: dict | None = None,
return_obj: bool = True, return_obj: bool = True,
) -> Union["Moderation", int]: ) -> Union["Moderation", int]:
"""Logs a moderation case in the database.
Args:
bot (Red): The bot instance.
guild_id (int): The ID of the guild to log the case in.
moderator_id (int): The ID of the moderator who issued the case.
moderation_type (Type): The type of moderation case. See `aurora.models.moderation_types` for the built-in options.
target_type (str): The type of target. Should be either `user` or `channel`.
target_id (int): The ID of the target.
role_id (int): The ID of the role, if applicable.
duration (timedelta): The duration of the case, if applicable.
reason (str): The reason for the case.
database (sqlite3.Connection): The database connection to use to log the case. A connection will be automatically created if not provided.
timestamp (datetime): The timestamp of the case. Will be automatically generated if not provided.
resolved (bool): Whether the case is resolved.
resolved_by (int): The ID of the user who resolved the case.
resolved_reason (str): The reason the case was resolved.
expired (bool): Whether the case is expired.
changes (list): A list of changes to log. You usually shouldn't pass this, as it's automatically generated by the `/edit` and `/resolve` commands.
metadata (dict): A dictionary of metadata to store with the case.
return_obj (bool): Whether to return the case object. Defaults to `True`. If `False`, returns the case ID.
Returns:
Union[Moderation, int]: The `Moderation` object if `return_obj` is `True`, otherwise the case ID.
"""
from ..utilities.json import dumps from ..utilities.json import dumps
if not timestamp: if not timestamp:
timestamp = datetime.fromtimestamp(time()) timestamp = datetime.fromtimestamp(time())

View file

@ -1,19 +1,23 @@
from datetime import timedelta
from math import ceil from math import ceil
from time import time
from typing import Tuple
from discord import File, Guild, Member, TextChannel, User from discord import AllowedMentions, File, Interaction, Member, Object, Role, TextChannel, User
from discord.abc import Messageable from discord.abc import Messageable
from discord.errors import HTTPException, NotFound from discord.errors import Forbidden, HTTPException, NotFound
from redbot.core import app_commands, commands from redbot.core import app_commands, commands
from redbot.core.bot import Red from redbot.core.bot import Red
from redbot.core.commands.converter import parse_relativedelta from redbot.core.commands.converter import parse_relativedelta, parse_timedelta
from redbot.core.utils.chat_formatting import bold, error, humanize_timedelta, inline from redbot.core.utils.chat_formatting import bold, error, humanize_timedelta, inline
from ..utilities.factory import message_factory from ..utilities.config import config
from ..utilities.registry import type_registry from ..utilities.factory import message_factory, resolve_factory
from ..utilities.logger import logger
from ..utilities.utils import get_footer_image, log, send_evidenceformat, timedelta_from_relativedelta from ..utilities.utils import get_footer_image, log, send_evidenceformat, timedelta_from_relativedelta
from .moderation import Moderation from .moderation import Moderation
from .type import Type from .type import Type, type_registry
def get_icon(bot: Red) -> File: def get_icon(bot: Red) -> File:
@ -22,43 +26,597 @@ def get_icon(bot: Red) -> File:
return get_footer_image(cog) return get_footer_image(cog)
raise ValueError("Aurora cog not found. How was this managed?") raise ValueError("Aurora cog not found. How was this managed?")
@type_registry.register(key="note")
class Note(Type): class Note(Type):
key="note" key="note"
string="note" string="note"
verb="noted" verb="noted"
embed_desc="received a "
@classmethod
async def handler(cls, ctx: commands.Context, target: Member | User, silent: bool, reason: str) -> 'Note':
response = await ctx.send(
content=f"{target.mention} has {cls.embed_desc}{cls.string}!\n**Reason** - `{reason}`"
)
if silent is False:
try:
embed = await message_factory(
bot=ctx.bot,
color=await ctx.embed_color,
guild=ctx.guild,
moderator=ctx.author,
reason=reason,
moderation_type=cls(),
response=response,
)
await target.send(embed=embed, file=get_icon(ctx.bot))
except HTTPException:
pass
moderation = await Moderation.log(
bot=ctx.bot,
guild_id=ctx.guild.id,
moderator_id=ctx.author.id,
moderation_type=cls(),
target_type="user",
target_id=target.id,
role_id=None,
duration=None,
reason=reason,
)
await response.edit(
content=f"{target.mention} has {cls.embed_desc}{cls.string}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`"
)
await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
return cls()
@classmethod
async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
if await config.guild(moderation.guild).dm_users() is True:
try:
target = await moderation.bot.fetch_user(moderation.target_id)
embed = await resolve_factory(
moderation=moderation,
reason=reason
)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except (Forbidden, HTTPException, NotFound):
pass
return True, ""
@type_registry.register(key="warn")
class Warn(Type): class Warn(Type):
key="warn" key="warn"
string="warn" string="warn"
verb="warned" verb="warned"
@type_registry.register(key="addrole") @classmethod
async def handler(cls, ctx: commands.Context, target: Member | User, silent: bool, reason: str) -> 'Warn':
response = await ctx.send(
content=f"{target.mention} has {cls.embed_desc}{cls.verb}!\n**Reason** - `{reason}`"
)
if silent is False:
try:
embed = await message_factory(
bot=ctx.bot,
color=await ctx.embed_color,
guild=ctx.guild,
moderator=ctx.author,
reason=reason,
moderation_type=cls(),
response=response,
)
await target.send(embed=embed, file=get_icon(ctx.bot))
except HTTPException:
pass
moderation = await Moderation.log(
bot=ctx.bot,
guild_id=ctx.guild.id,
moderator_id=ctx.author.id,
moderation_type=cls(),
target_type="user",
target_id=target.id,
role_id=None,
duration=None,
reason=reason,
)
await response.edit(
content=f"{target.mention} has {cls.embed_desc}{cls.verb}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`"
)
await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
return cls()
@classmethod
async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
if await config.guild(moderation.guild).dm_users() is True:
try:
target = await moderation.bot.fetch_user(moderation.target_id)
embed = await resolve_factory(
moderation=moderation,
reason=reason
)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except (Forbidden, HTTPException, NotFound):
pass
return True, ""
class AddRole(Type): class AddRole(Type):
key="addrole" key="addrole"
string="addrole" string="addrole"
verb="added a role to" verb="added a role to"
embed_desc="been given the "
@classmethod
async def handler(cls, ctx: commands.Context, target: Member, role: Role, silent: bool, duration: str = None, reason: str = None):
addrole_whitelist = await config.guild(ctx.guild).addrole_whitelist()
if not addrole_whitelist:
await ctx.send(
content=error("There are no whitelisted roles set for this server!"),
ephemeral=True,
)
return
if duration is not None:
parsed_time = parse_timedelta(duration)
if parsed_time is None:
await ctx.send(
content=error("Please provide a valid duration!"), ephemeral=True
)
return
else:
parsed_time = None
if role.id not in addrole_whitelist:
await ctx.send(
content=error("That role isn't whitelisted!"), ephemeral=True
)
return
if role.id in [user_role.id for user_role in target.roles]:
await ctx.send(
content=error(f"{target.mention} already has this role!"),
ephemeral=True,
)
return
response = await ctx.send(
content=f"{target.mention} has {cls.embed_desc}{role.mention} role{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time != 'NULL' else ''}!\n**Reason** - `{reason}`"
)
if silent is False:
try:
embed = await message_factory(
bot=ctx.bot,
color=await ctx.embed_color(),
guild=ctx.guild,
moderator=ctx.author,
reason=reason,
moderation_type=cls(),
response=response,
duration=parsed_time,
role=role,
)
await target.send(embed=embed, file=get_icon(ctx.bot))
except HTTPException:
pass
await target.add_roles(
role,
reason=f"Role added by {ctx.author.id}{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time != 'NULL' else ''} for: {reason}",
)
moderation = await Moderation.log(
bot=ctx.bot,
guild_id=ctx.guild.id,
moderator_id=ctx.author.id,
moderation_type=cls(),
target_type="user",
target_id=target.id,
role_id=role.id,
duration=parsed_time,
reason=reason,
)
await response.edit(
content=f"{target.mention} has {cls.embed_desc}{role.mention} role{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time != 'NULL' else ''}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`",
)
await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
return cls()
@classmethod
async def duration_edit_handler(cls, interaction: Interaction, old_moderation: Moderation, new_moderation: Moderation) -> bool: # pylint: disable=unused-argument
return True
@classmethod
async def expiry_handler(cls, moderation: Moderation) -> int:
try:
target = await moderation.get_target()
await target.remove_roles(
Object(moderation.role_id), reason=f"Automatic role removal from case #{moderation.id}"
)
if await config.guild(moderation.guild).dm_users() is True:
try:
embed = await message_factory(
bot=moderation.bot,
color=await moderation.bot.get_embed_color(moderation.guild.channels[0]),
guild=moderation.guild,
reason=f"Automatic role removal from case #{moderation.id}",
moderation_type=type_registry["removerole"],
moderator=None,
duration=None,
response=None,
case=False,
)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except HTTPException:
pass
logger.trace(
"Removed role %s from %s (%s)",
moderation.role_id,
target.name,
target.id,
)
return 1
except (
NotFound,
Forbidden,
HTTPException,
) as e:
logger.error(
"Removing the role %s from user %s failed due to: \n%s",
moderation.role_id,
moderation.target_id,
e,
)
return 0
@type_registry.register(key="removerole")
class RemoveRole(Type): class RemoveRole(Type):
key="removerole" key="removerole"
string="removerole" string="removerole"
verb="removed a role from" verb="removed a role from"
embed_desc="had the "
@classmethod
async def handler(cls, ctx: commands.Context, target: Member, role: Role, silent: bool, duration: str = None, reason: str = None):
addrole_whitelist = await config.guild(ctx.guild).addrole_whitelist()
if not addrole_whitelist:
await ctx.send(
content=error("There are no whitelisted roles set for this server!"),
ephemeral=True,
)
return
if duration is not None:
parsed_time = parse_timedelta(duration)
if parsed_time is None:
await ctx.send(
content=error("Please provide a valid duration!"), ephemeral=True
)
return
else:
parsed_time = None
if role.id not in addrole_whitelist:
await ctx.send(
content=error("That role isn't whitelisted!"), ephemeral=True
)
return
if role.id not in [user_role.id for user_role in target.roles]:
await ctx.send(
content=error(f"{target.mention} does not have this role!"),
ephemeral=True,
)
return
response = await ctx.send(
content=f"{target.mention} has {cls.embed_desc}{role.mention} role removed{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time != 'NULL' else ''}!\n**Reason** - `{reason}`"
)
if silent is False:
try:
embed = await message_factory(
bot=ctx.bot,
color=await ctx.embed_color(),
guild=ctx.guild,
moderator=ctx.author,
reason=reason,
moderation_type="removerole",
response=response,
duration=parsed_time,
role=role,
)
await target.send(embed=embed, file=get_icon(ctx.bot))
except HTTPException:
pass
await target.remove_roles(
role,
reason=f"Role removed by {ctx.author.id}{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time != 'NULL' else ''} for: {reason}",
)
moderation = await Moderation.log(
bot=ctx.bot,
guild_id=ctx.guild.id,
moderator_id=ctx.author.id,
moderation_type=cls(),
target_type="user",
target_id=target.id,
role_id=role.id,
duration=parsed_time,
reason=reason,
)
await response.edit(
content=f"{target.mention} has {cls.embed_desc}{role.mention} role removed{' for ' + humanize_timedelta(timedelta=parsed_time) if parsed_time != 'NULL' else ''}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`",
)
await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
return cls()
@classmethod
async def duration_edit_handler(cls, interaction: Interaction, old_moderation: Moderation, new_moderation: Moderation) -> bool: # pylint: disable=unused-argument
return True
@classmethod
async def expiry_handler(cls, moderation: Moderation) -> int:
try:
target = await moderation.get_target()
await target.add_roles(
Object(moderation.role_id), reason=f"Automatic role addition from case #{moderation.id}"
)
if await config.guild(moderation.guild).dm_users() is True:
try:
embed = await message_factory(
bot=moderation.bot,
color=await moderation.bot.get_embed_color(moderation.guild.channels[0]),
guild=moderation.guild,
reason=f"Automatic role addition from case #{moderation.id}",
moderation_type=type_registry["addrole"],
moderator=None,
duration=None,
response=None,
case=False,
)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except HTTPException:
pass
logger.trace(
"Added role %s to %s (%s)",
moderation.role_id,
target.name,
target.id,
)
return 1
except (
NotFound,
Forbidden,
HTTPException,
) as e:
logger.error(
"Adding the role %s to user %s failed due to: \n%s",
moderation.role_id,
moderation.target_id,
e,
)
return 0
@classmethod
async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
try:
target = await moderation.get_target()
await target.add_roles(
Object(moderation.role_id), reason=reason
)
if await config.guild(moderation.guild).dm_users() is True:
try:
embed = await resolve_factory(
moderation=moderation,
reason=reason
)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except HTTPException:
pass
logger.trace(
"Added role %s to %s (%s)",
moderation.role_id,
target.name,
target.id,
)
return True, ""
except (NotFound, Forbidden, HTTPException) as e:
logger.error(
"Failed to add role %s to user %s (%s)\n%s",
moderation.role_id,
target.name,
target.id,
e,
)
return False, "Failed to add role to user."
@type_registry.register(key="mute")
class Mute(Type): class Mute(Type):
key="mute" key="mute"
string="mute" string="mute"
verb="muted" verb="muted"
@type_registry.register(key="unmute") @classmethod
async def handler(cls, ctx: commands.Context, target: Member, silent: bool, duration: str, reason: str = None):
if target.is_timed_out() is True:
await ctx.send(
error(f"{target.mention} is already muted!"),
allowed_mentions=AllowedMentions(users=False),
ephemeral=True,
)
return
try:
parsed_time = parse_timedelta(duration, maximum=timedelta(days=28))
if parsed_time is None:
await ctx.send(
error("Please provide a valid duration!"), ephemeral=True
)
return
except commands.BadArgument:
await ctx.send(
error("Please provide a duration that is less than 28 days."), ephemeral=True
)
return
await target.timeout(
parsed_time, reason=f"Muted by {ctx.author.id} for: {reason}"
)
response = await ctx.send(
content=f"{target.mention} has been muted for {humanize_timedelta(timedelta=parsed_time)}!\n**Reason** - `{reason}`"
)
if silent is False:
try:
embed = await message_factory(
bot=ctx.bot,
color=await ctx.embed_color(),
guild=ctx.guild,
moderator=ctx.author,
reason=reason,
moderation_type=cls(),
response=response,
duration=parsed_time,
)
await target.send(embed=embed, file=get_icon(ctx.bot))
except HTTPException:
pass
moderation = await Moderation.log(
bot=ctx.bot,
guild_id=ctx.guild.id,
moderator_id=ctx.author.id,
moderation_type=cls(),
target_type="user",
target_id=target.id,
role_id=None,
duration=parsed_time,
reason=reason,
)
await response.edit(
content=f"{target.mention} has been muted for {humanize_timedelta(timedelta=parsed_time)}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`"
)
await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
return cls()
@classmethod
async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
try:
target = await moderation.guild.fetch_member(moderation.target_id)
except (Forbidden, HTTPException, NotFound):
return False, "User is not in the server, so I cannot unmute them."
if target.is_timed_out() is False:
return True, ""
await target.timeout(None, reason=reason)
if await config.guild(moderation.guild).dm_users() is True:
try:
embed = await resolve_factory(
moderation=moderation,
reason=reason
)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except (Forbidden, HTTPException):
pass
return True, ""
@classmethod
async def duration_edit_handler(cls, interaction: Interaction, old_moderation: Moderation, new_moderation: Moderation) -> bool: # pylint: disable=unused-argument
if (
time() - new_moderation.unix_timestamp
) + new_moderation.duration.total_seconds() > 2419200:
await interaction.response.send_message(
content=error(
"Please provide a duration that is less than 28 days from the initial moderation."
),
ephemeral=True
)
return False
try:
member = await interaction.guild.fetch_member(
new_moderation.target_id
)
await member.timeout(
new_moderation.duration,
reason=f"Case #{new_moderation.id:,} edited by {interaction.user.id}",
)
except NotFound:
pass
return True
class Unmute(Type): class Unmute(Type):
key="unmute" key="unmute"
string="unmute" string="unmute"
verb="unmuted" verb="unmuted"
@type_registry.register(key="kick") @classmethod
async def handler(cls, ctx: commands.Context, target: Member, silent: bool, reason: str = None):
if target.is_timed_out() is False:
await ctx.send(
content=error(f"{target.mention} is not muted!"),
allowed_mentions=AllowedMentions(users=False),
ephemeral=True,
)
return
if reason:
await target.timeout(
None, reason=f"{cls.verb.title()} by {ctx.author.id} for: {reason}"
)
else:
await target.timeout(None, reason=f"{cls.verb.title()} by {ctx.author.id}")
reason = "No reason given."
response_message = await ctx.send(
content=f"{target.mention} has been {cls.verb}!\n**Reason** - `{reason}`"
)
if silent is False:
try:
embed = await message_factory(
bot=ctx.bot,
color=await ctx.embed_color(),
guild=ctx.guild,
moderator=ctx.author,
reason=reason,
moderation_type="unmuted",
response=response_message,
)
await target.send(embed=embed, file=get_icon(ctx.bot))
except HTTPException:
pass
moderation = await Moderation.log(
bot=ctx.bot,
guild_id=ctx.guild.id,
moderator_id=ctx.author.id,
moderation_type=cls(),
target_type="user",
target_id=target.id,
role_id=None,
duration=None,
reason=reason,
)
await response_message.edit(
content=f"{target.mention} has been {cls.verb}! (Case `#{moderation.id:,}`)\n**Reason** - `{reason}`"
)
await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
class Kick(Type): class Kick(Type):
key="kick" key="kick"
string="kick" string="kick"
@ -74,7 +632,7 @@ class Kick(Type):
try: try:
embed = await message_factory( embed = await message_factory(
bot=bot, bot=bot,
color=await bot.get_embed_color(ctx.channel), color=await ctx.embed_color(),
guild=ctx.guild, guild=ctx.guild,
reason=reason, reason=reason,
moderation_type=cls(), moderation_type=cls(),
@ -99,10 +657,24 @@ class Kick(Type):
reason=reason reason=reason
) )
await response_message.edit(content=f"{target.mention} has been {cls.verb}! (Case {inline(f'#{moderation.id}')})\n{bold('Reason:')} {inline(reason)}") await response_message.edit(content=f"{target.mention} has been {cls.verb}! (Case {inline(f'#{moderation.id}')})\n{bold('Reason:')} {inline(reason)}")
await log(ctx, moderation.id) await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx, moderation.id) await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
return cls return cls
@type_registry.register(key="ban")
@classmethod
async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
if await config.guild(moderation.guild).dm_users() is True:
try:
target = await moderation.bot.fetch_user(moderation.target_id)
embed = await resolve_factory(
moderation=moderation,
reason=reason
)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except (Forbidden, HTTPException, NotFound):
pass
return True, ""
class Ban(Type): class Ban(Type):
key="ban" key="ban"
string="ban" string="ban"
@ -129,7 +701,7 @@ class Ban(Type):
try: try:
embed = await message_factory( embed = await message_factory(
bot=bot, bot=bot,
color=await bot.get_embed_color(ctx.channel), color=await ctx.embed_color(),
guild=ctx.guild, guild=ctx.guild,
reason=reason, reason=reason,
moderation_type=cls(), moderation_type=cls(),
@ -154,34 +726,35 @@ class Ban(Type):
reason=reason reason=reason
) )
await response_message.edit(content=f"{target.mention} has been {cls.verb}! (Case {inline(f'#{moderation.id}')})\n{bold('Reason:')} {inline(reason)}") await response_message.edit(content=f"{target.mention} has been {cls.verb}! (Case {inline(f'#{moderation.id}')})\n{bold('Reason:')} {inline(reason)}")
await log(ctx, moderation.id) await log(ctx=ctx, moderation_id=moderation.id)
await send_evidenceformat(ctx, moderation.id) await send_evidenceformat(ctx=ctx, moderation_id=moderation.id)
return cls return cls
@classmethod @classmethod
async def resolve_handler(cls, bot: Red, guild: Guild, target: Member | User, reason: str | None = None) -> None: async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
try: try:
await guild.fetch_ban(user=target) target = await moderation.bot.fetch_user(moderation.target_id)
except NotFound: except (HTTPException, NotFound):
return return False, "Fetching the target failed, so I cannot unban them."
await guild.unban(user=target, reason=reason)
try: try:
embed = await message_factory( await moderation.guild.unban(user=target, reason=reason)
bot=bot, except (NotFound, Forbidden, HTTPException) as e:
color=await bot.get_embed_color(guild.channels[0]), if e == NotFound:
guild=guild, return True, ""
reason=reason, return False, "I do not have permission to unban this user."
moderation_type=Unban(),
moderator=None, if await config.guild(moderation.guild).dm_users() is True:
duration=None, try:
response=None embed = await resolve_factory(
) moderation=moderation,
await target.send(embed=embed, file=get_icon(bot)) reason=reason
except HTTPException: )
pass await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except (Forbidden, HTTPException):
pass
return True, ""
@type_registry.register(key="tempban")
class Tempban(Ban): class Tempban(Ban):
key="tempban" key="tempban"
string="tempban" string="tempban"
@ -216,7 +789,7 @@ class Tempban(Ban):
try: try:
embed = await message_factory( embed = await message_factory(
bot=bot, bot=bot,
color=await bot.get_embed_color(ctx.channel), color=await ctx.embed_color(),
guild=ctx.guild, guild=ctx.guild,
reason=reason, reason=reason,
moderation_type=cls(), moderation_type=cls(),
@ -245,7 +818,54 @@ class Tempban(Ban):
await send_evidenceformat(ctx, moderation.id) await send_evidenceformat(ctx, moderation.id)
return cls return cls
@type_registry.register(key="softban") @classmethod
async def expiry_handler(cls, moderation: Moderation) -> int:
reason = f"Automatic {Unban.string} from case #{moderation.id}"
try:
target = await moderation.get_target()
await moderation.guild.unban(user=target, reason=reason)
if await config.guild(moderation.guild).dm_users() is True:
try:
embed = await message_factory(
bot=moderation.bot,
color=await moderation.bot.get_embed_color(moderation.guild.channels[0]),
guild=moderation.guild,
reason=reason,
moderation_type=type_registry["unban"],
moderator=None,
duration=None,
response=None,
case=False,
)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except HTTPException:
pass
logger.trace(
"%s %s (%s) from %s (%s)",
Unban.verb.title(),
target.name,
target.id,
moderation.guild.name,
moderation.guild.id,
)
return 1
except (NotFound, Forbidden, HTTPException) as e:
logger.error(
"Failed to %s %s (%s) from %s (%s)\n%s",
Unban.string,
target.name,
target.id,
moderation.guild.name,
moderation.guild.id,
e,
)
return 0
@classmethod
async def duration_edit_handler(cls, interaction: Interaction, old_moderation: Moderation, new_moderation: Moderation) -> bool: # pylint: disable=unused-argument
return True
class Softban(Type): class Softban(Type):
key="softban" key="softban"
string="softban" string="softban"
@ -272,7 +892,7 @@ class Softban(Type):
try: try:
embed = await message_factory( embed = await message_factory(
bot=bot, bot=bot,
color=await bot.get_embed_color(ctx.channel), color=await ctx.embed_color(),
guild=ctx.guild, guild=ctx.guild,
reason=reason, reason=reason,
moderation_type=cls(), moderation_type=cls(),
@ -302,7 +922,20 @@ class Softban(Type):
await send_evidenceformat(ctx, moderation.id) await send_evidenceformat(ctx, moderation.id)
return cls return cls
@type_registry.register(key="unban") @classmethod
async def resolve_handler(cls, moderation: Moderation, reason: str) -> Tuple[bool, str]:
if await config.guild(moderation.guild).dm_users() is True:
try:
target = await moderation.bot.fetch_user(moderation.target_id)
embed = await resolve_factory(
moderation=moderation,
reason=reason
)
await target.send(embed=embed, file=get_icon(bot=moderation.bot))
except (Forbidden, HTTPException, NotFound):
pass
return True, ""
class Unban(Type): class Unban(Type):
key="unban" key="unban"
string="unban" string="unban"
@ -324,7 +957,7 @@ class Unban(Type):
try: try:
embed = await message_factory( embed = await message_factory(
bot=bot, bot=bot,
color=await bot.get_embed_color(ctx.channel), color=await ctx.embed_color(),
guild=ctx.guild, guild=ctx.guild,
reason=reason, reason=reason,
moderation_type=cls(), moderation_type=cls(),
@ -353,7 +986,6 @@ class Unban(Type):
await send_evidenceformat(ctx, moderation.id) await send_evidenceformat(ctx, moderation.id)
return cls return cls
@type_registry.register(key="slowmode")
class Slowmode(Type): class Slowmode(Type):
key="slowmode" key="slowmode"
string="slowmode" string="slowmode"
@ -393,7 +1025,6 @@ class Slowmode(Type):
await log(ctx, moderation.id) await log(ctx, moderation.id)
return cls return cls
@type_registry.register(key="lockdown")
class Lockdown(Type): class Lockdown(Type):
key="lockdown" key="lockdown"
string="lockdown" string="lockdown"

View file

@ -1,36 +1,82 @@
from typing import Any from abc import ABC
from typing import List, Tuple
from discord import Guild, Member, User from class_registry import AutoRegister, ClassRegistry
from discord import Interaction, Member, User
from discord.abc import Messageable from discord.abc import Messageable
from redbot.core import commands from redbot.core import commands
from redbot.core.bot import Red
type_registry: List['Type'] = ClassRegistry(attr_name='key', unique=True)
#@type_registry.register(key="type") class Type(ABC, AutoRegister(type_registry)):
class Type(object): """This is a base class for moderation types.
key = "type" # this should ALWAYS be the same as the registry key, and should NEVER be localized
Attributes:
key (str): The key to use for this type. This should be unique, as this is how the type is registered internally. Changing this key will break existing cases with this type.
string (str): The string to display for this type.
verb (str): The verb to use for this type.
embed_desc (str): The string to use for embed descriptions.
channel (bool): Whether this type targets channels or users. If this is `true` in a subclass, its overriden handler methods should be typed with `discord.abc.Messageable` instead of `discord.Member | discord.User`.
Properties:
name (str): The string to display for this type. This is the same as the `string` attribute.
"""
key = "type"
string = "type" string = "type"
verb = "typed" verb = "typed"
embed_desc = "been" embed_desc = "been "
channel = False # if this is True, the overridden handler methods should be typed with `discord.abc.Messageable` instead of `discord.Member | discord.User` channel = False
@property
def name(self) -> str:
"""Alias for the `string` attribute."""
return self.string
def __str__(self) -> str: def __str__(self) -> str:
return self.string return self.string
@classmethod @classmethod
async def handler(cls, ctx: commands.Context, target: Member | User | Messageable, silent: bool, **kwargs) -> 'Type': # pylint: disable=unused-argument async def handler(cls, ctx: commands.Context, target: Member | User | Messageable, silent: bool, **kwargs) -> 'Type': # pylint: disable=unused-argument
"""This method should be overridden by any child classes, but should retain the same starting keyword arguments.""" """This method should be overridden by any child classes, but should retain the same starting keyword arguments.
Arguments:
ctx (commands.Context): The context of the command.
target (discord.Member | discord.User | discord.abc.Messageable): The target of the moderation.
silent (bool): Whether details about the moderation should be DM'ed to the target of the moderation.
"""
raise NotImplementedError raise NotImplementedError
@classmethod @classmethod
async def resolve_handler(cls, bot: Red, guild: Guild, target: Member | User | Messageable, reason: str | None = None, **kwargs) -> Any: # pylint: disable=unused-argument async def resolve_handler(cls, moderation, reason: str) -> Tuple[bool, str]: # pylint: disable=unused-argument
"""This method should be overridden by any resolvable child classes, but should retain the same keyword arguments. """This method should be overridden by any resolvable child classes, but should retain the same keyword arguments.
If your moderation type should not be resolvable, do not override this.""" If your moderation type should not be resolvable, do not override this.
Arguments:
moderation (aurora.models.Moderation): The moderation to resolve.
reason (str): The reason for resolving the moderation.
"""
raise NotImplementedError raise NotImplementedError
@classmethod @classmethod
async def expiry_handler(cls, bot: Red, guild: Guild, target: Member | User | Messageable, **kwargs) -> Any: # pylint: disable=unused-argument async def expiry_handler(cls, moderation) -> int: # pylint: disable=unused-argument
"""This method should be overridden by any expirable child classes, but should retain the same keyword arguments. """This method should be overridden by any expirable child classes, but should retain the same keyword arguments and return an integer.
If your moderation type should not expire, do not override this.""" If your moderation type should not expire, do not override this, but also do not set an `end_timestamp` when you log your moderation.
Arguments:
moderation (aurora.models.Moderation): The moderation that is expiring.
"""
raise NotImplementedError
@classmethod
async def duration_edit_handler(cls, interaction: Interaction, old_moderation, new_moderation) -> bool: # pylint: disable=unused-argument
"""This method should be overridden by any child classes with editable durations, but should retain the same keyword arguments and should return True if the duration was successfully modified, or False if it was not.
If your moderation type's duration should not be editable, do not override this.
Arguments:
interaction (discord.Interaction): The interaction that triggered the duration edit.
old_moderation (aurora.models.Moderation): The old moderation, from before the `/edit` command was invoked.
new_moderation (aurora.models.Moderation): The current state of the moderation.
"""
raise NotImplementedError raise NotImplementedError

View file

@ -24,6 +24,7 @@ async def message_factory(
duration: timedelta | None = None, duration: timedelta | None = None,
response: Message | None = None, response: Message | None = None,
role: Role | None = None, role: Role | None = None,
case: bool = True,
) -> Embed: ) -> Embed:
"""This function creates a message from set parameters, meant for contacting the moderated user. """This function creates a message from set parameters, meant for contacting the moderated user.
@ -37,6 +38,7 @@ async def message_factory(
duration (timedelta, optional): The duration of the moderation. Defaults to None. duration (timedelta, optional): The duration of the moderation. Defaults to None.
response (Message, optional): The response message. Defaults to None. response (Message, optional): The response message. Defaults to None.
role (Role, optional): The role that was added or removed. Defaults to None. role (Role, optional): The role that was added or removed. Defaults to None.
case (bool, optional): Whether the message is for a moderation case. Defaults to True.
Returns: Returns:
@ -57,22 +59,9 @@ async def message_factory(
else: else:
embed_duration = "" embed_duration = ""
# if moderation_type.type == "note":
# embed_desc = "received a"
# elif moderation_type.type == "addrole":
# embed_desc = f"received the {role.name} role"
# title = "Role Added"
# verb = ""
# elif moderation_type.type == "removerole":
# embed_desc = f"lost the {role.name} role"
# title = "Role Removed"
# verb = ""
# else:
# embed_desc = "been"
embed = Embed( embed = Embed(
title=str.title(moderation_type.verb), title=str.title(moderation_type.verb),
description=f"You have {moderation_type.embed_desc} {moderation_type.verb}{embed_duration} in {guild_name}.", description=f"You have {moderation_type.embed_desc}{moderation_type.verb}{embed_duration} in {guild_name}.",
color=color, color=color,
timestamp=datetime.now(), timestamp=datetime.now(),
) )
@ -89,14 +78,43 @@ async def message_factory(
else: else:
embed.set_author(name=guild.name) embed.set_author(name=guild.name)
if case:
embed.set_footer(
text=f"Case #{await Moderation.get_next_case_number(bot=bot, guild_id=guild.id):,}",
icon_url="attachment://arrow.png",
)
return embed
async def resolve_factory(moderation: Moderation, reason: str) -> Embed:
"""This function creates a resolved embed from set parameters, meant for contacting the moderated user.
Args:
moderation (aurora.models.Moderation): The moderation object.
reason (str): The reason for resolving the moderation.
Returns: `discord.Embed`
"""
embed = Embed(
title=str.title(moderation.type.name) + " Resolved",
description=f"Your {moderation.type.name} in {moderation.guild.name} has been resolved.",
color=await moderation.bot.get_embed_color(moderation.guild.channels[0]),
timestamp=datetime.now(),
)
embed.add_field(name="Reason", value=f"`{reason}`", inline=False)
if moderation.guild.icon.url is not None:
embed.set_author(name=moderation.guild.name, icon_url=moderation.guild.icon.url)
else:
embed.set_author(name=moderation.guild.name)
embed.set_footer( embed.set_footer(
text=f"Case #{await Moderation.get_next_case_number(bot=bot, guild_id=guild.id):,}", text=f"Case #{moderation.id:,}",
icon_url="attachment://arrow.png", icon_url="attachment://arrow.png",
) )
return embed return embed
async def log_factory( async def log_factory(
ctx: commands.Context, moderation: Moderation, resolved: bool = False ctx: commands.Context, moderation: Moderation, resolved: bool = False
) -> Embed: ) -> Embed:

View file

@ -3,9 +3,8 @@ from typing import List, Union
import discord import discord
from redbot.core import app_commands, commands from redbot.core import app_commands, commands
from ..models.moderation_types import Type from ..models.type import Type, type_registry
from .config import config from .config import config
from .registry import type_registry
from .utils import check_moddable from .utils import check_moddable

View file

@ -1,3 +0,0 @@
from class_registry import ClassRegistry
type_registry = ClassRegistry()