SeaCogs/aurora/aurora.py
cswimr 43e82c9eb5
Some checks failed
Actions / Build Documentation (MkDocs) (pull_request) Successful in 28s
Actions / Lint Code (Ruff & Pylint) (pull_request) Failing after 52s
feat(aurora): minify export json to reduce file sizes
2024-08-22 15:59:23 -04:00

1335 lines
51 KiB
Python

# _____ _
# / ____| (_)
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
import json
import logging as py_logging
import os
import time
from datetime import datetime, timedelta, timezone
from math import ceil
from typing import List, Union
import discord
from class_registry.registry import RegistryKeyError
from dateutil.parser import ParserError, parse
from discord.ext import tasks
from redbot.core import app_commands, commands, data_manager
from redbot.core.app_commands import Choice
from redbot.core.bot import Red
from redbot.core.commands.converter import parse_relativedelta, parse_timedelta
from redbot.core.utils.chat_formatting import bold, box, error, humanize_list, humanize_timedelta, warning
from .importers.aurora import ImportAuroraView
from .importers.galacticbot import ImportGalacticBotView
from .menus.addrole import Addrole
from .menus.guild import Guild
from .menus.immune import Immune
from .menus.overrides import Overrides
from .menus.types import Types
from .models.change import Change
from .models.moderation import Moderation
from .models.type import Type, type_registry
from .utilities.config import config, register_config
from .utilities.factory import addrole_embed, case_factory, changes_factory, evidenceformat_factory, guild_embed, immune_embed, overrides_embed, type_embed
from .utilities.json import dump
from .utilities.logger import logger
from .utilities.utils import check_moddable, check_permissions, create_guild_table, log, timedelta_from_relativedelta, timedelta_to_string
class Aurora(commands.Cog):
"""Aurora is a fully-featured moderation system.
It is heavily inspired by GalacticBot, and is designed to be a more user-friendly alternative to Red's core Mod cogs.
This cog stores all of its data in an SQLite database."""
__author__ = ["Seaswimmer"]
__version__ = "3.0.0-indev20"
__documentation__ = "https://seacogs.coastalcommits.com/aurora/"
async def red_delete_data_for_user(self, *, requester, user_id: int):
if requester == "discord_deleted_user":
await config.user_from_id(user_id).clear()
results = await Moderation.execute(query="SHOW TABLES;", return_obj=False)
tables = [table[0] for table in results]
condition = "target_id = %s OR moderator_id = %s;"
for table in tables:
delete_query = f"DELETE FROM {table[0]} WHERE {condition}"
await Moderation.execute(query=delete_query, parameters=(user_id, user_id), return_obj=False)
if requester == "owner":
await config.user_from_id(user_id).clear()
if requester == "user":
await config.user_from_id(user_id).clear()
if requester == "user_strict":
await config.user_from_id(user_id).clear()
else:
logger.warning(
"Invalid requester passed to red_delete_data_for_user: %s", requester
)
def __init__(self, bot: Red) -> None:
super().__init__()
self.bot = bot
self.type_registry = type_registry
register_config(config)
self.handle_expiry.start()
# If we don't override aiosqlite's logging level, it will spam the console with dozens of debug messages per query.
# This is unnecessary because Aurora already logs all of its SQL queries (or at least, most of them),
# and the information that aiosqlite logs is not useful to the bot owner.
# This is a bad solution though as it overrides it for any other cogs that are using aiosqlite too.
# If there's a better solution that you're aware of, please let me know in Discord or in a CoastalCommits issue.
py_logging.getLogger('aiosqlite').setLevel(py_logging.INFO)
def format_help_for_context(self, ctx: commands.Context) -> str:
pre_processed = super().format_help_for_context(ctx) or ""
n = "\n" if "\n\n" not in pre_processed else ""
text = [
f"{pre_processed}{n}",
f"{bold('Cog Version:')} {self.__version__}",
f"{bold('Author:')} {humanize_list(self.__author__)}",
f"{bold('Documentation:')} {self.__documentation__}",
]
return "\n".join(text)
async def cog_load(self):
"""This method prepares the database schema for all of the guilds the bot is currently in."""
guilds: list[discord.Guild] = self.bot.guilds
try:
for guild in guilds:
if not await self.bot.cog_disabled_in_guild(self, guild):
await create_guild_table(guild)
except ConnectionRefusedError:
return
async def cog_unload(self):
self.handle_expiry.cancel()
@staticmethod
async def moderate(ctx: Union[commands.Context, discord.Interaction], target: discord.Member | discord.User | discord.abc.Messageable, silent: bool | None, permissions: List[str], moderation_type: Type | str, **kwargs) -> None | Type:
"""This function is used to moderate users.
It checks if the target can be moderated, then calls the handler method of the moderation type specified.
Args:
ctx (Union[commands.Context, discord.Interaction]): The context of the command. If this is a `discord.Interaction` object, it will be converted to a `commands.Context` object. Additionally, if the interaction orignated from a context menu, the `ctx.author` attribute will be overriden to `interaction.user`.
target (discord.Member, discord.User, discord.abc.Messageable): The target user or channel to moderate.
silent (bool | None): Whether to send the moderation action to the target.
permissions (List[str]): The permissions required to moderate the target.
moderation_type (Type): The moderation type (handler) to use. See `aurora.models.moderation_types` for some examples.
**kwargs: The keyword arguments to pass to the handler method.
"""
if isinstance(moderation_type, str):
moderation_type = type_registry[str.lower(moderation_type)]
if isinstance(ctx, discord.Interaction):
interaction = ctx
ctx = await commands.Context.from_interaction(interaction)
if isinstance(interaction.command, app_commands.ContextMenu):
ctx.author = interaction.user
if not await check_moddable(target=target, ctx=ctx, permissions=permissions, moderation_type=moderation_type):
return
if silent is None:
dm_users = await config.custom("types", ctx.guild.id, moderation_type.key).dm_users()
if dm_users is None:
dm_users = await config.guild(ctx.guild).dm_users()
silent = not dm_users
return await moderation_type.handler(
ctx=ctx,
target=target,
silent=silent,
**kwargs
)
@commands.Cog.listener("on_guild_join")
async def db_generate_on_guild_join(self, guild: discord.Guild):
"""This method prepares the database schema whenever the bot joins a guild."""
if not await self.bot.cog_disabled_in_guild(self, guild):
try:
await create_guild_table(guild)
except ConnectionRefusedError:
return
@commands.Cog.listener("on_member_join")
async def addrole_on_member_join(self, member: discord.Member):
"""This method automatically adds roles to users when they join the server."""
if not await self.bot.cog_disabled_in_guild(self, member.guild):
query = f"""SELECT moderation_id, role_id, reason FROM moderation_{member.guild.id} WHERE target_id = ? AND moderation_type = 'ADDROLE' AND expired = 0 AND resolved = 0;"""
results = await Moderation.execute(query, (member.id,))
for row in results:
role = member.guild.get_role(row[1])
reason = row[2]
await member.add_roles(role, reason=f"Role automatically added on member rejoin for: {reason} (Case #{row[0]:,})")
@commands.Cog.listener("on_audit_log_entry_create")
async def autologger(self, entry: discord.AuditLogEntry):
"""This method automatically logs moderations done by users manually ("right clicks")."""
try:
if not await self.bot.cog_disabled_in_guild(self, entry.guild):
if await config.guild(entry.guild).ignore_other_bots() is True:
if entry.user.bot or entry.target.bot:
return
else:
if entry.user.id == self.bot.user.id:
return
duration = None
if entry.reason:
reason = entry.reason + " (This action was performed without the bot.)"
else:
reason = "This action was performed without the bot."
if entry.action == discord.AuditLogAction.kick:
moderation_type = "KICK"
elif entry.action == discord.AuditLogAction.ban:
moderation_type = "BAN"
elif entry.action == discord.AuditLogAction.unban:
moderation_type = "UNBAN"
elif entry.action == discord.AuditLogAction.member_update:
if entry.after.timed_out_until is not None:
timed_out_until_aware = entry.after.timed_out_until.replace(
tzinfo=timezone.utc
)
duration_datetime = timed_out_until_aware - datetime.now(
tz=timezone.utc
)
minutes = round(duration_datetime.total_seconds() / 60)
duration = timedelta(minutes=minutes)
moderation_type = "MUTE"
else:
moderation_type = "UNMUTE"
else:
return
await Moderation.log(
self.bot,
entry.guild.id,
entry.user.id,
moderation_type,
"USER",
entry.target.id,
None,
duration,
reason,
)
except AttributeError:
return
#######################################################################################################################
### COMMANDS
#######################################################################################################################
@app_commands.command(name="note")
async def note(
self,
interaction: discord.Interaction,
target: discord.User,
reason: str,
silent: bool | None = None,
):
"""Add a note to a user.
Parameters
-----------
target: discord.User
Who are you noting?
reason: str
Why are you noting this user?
silent: bool
Should the user be messaged?"""
await self.moderate(
ctx=interaction,
target=target,
silent=silent,
permissions=["moderate_members"],
moderation_type=type_registry['note'],
reason=reason,
)
@app_commands.command(name="warn")
async def warn(
self,
interaction: discord.Interaction,
target: discord.Member,
reason: str,
silent: bool | None = None,
):
"""Warn a user.
Parameters
-----------
target: discord.Member
Who are you warning?
reason: str
Why are you warning this user?
silent: bool
Should the user be messaged?"""
await self.moderate(
ctx=interaction,
target=target,
silent=silent,
permissions=["moderate_members"],
moderation_type=type_registry['warn'],
reason=reason,
)
@app_commands.command(name="addrole")
async def addrole(
self,
interaction: discord.Interaction,
target: discord.Member,
role: discord.Role,
reason: str,
duration: str | None = None,
silent: bool | None = None,
):
"""Add a role to a user.
Parameters
-----------
target: discord.Member
Who are you adding a role to?
role: discord.Role
What role are you adding to the target?
reason: str
Why are you adding a role to this user?
duration: str
How long are you adding this role for?
silent: bool
Should the user be messaged?"""
await self.moderate(
ctx=interaction,
target=target,
silent=silent,
permissions=["moderate_members", "manage_roles"],
moderation_type=type_registry['addrole'],
reason=reason,
role=role,
duration=duration
)
@app_commands.command(name="removerole")
async def removerole(
self,
interaction: discord.Interaction,
target: discord.Member,
role: discord.Role,
reason: str,
duration: str | None = None,
silent: bool | None = None,
):
"""Remove a role from a user.
Parameters
-----------
target: discord.Member
Who are you removing a role from?
role: discord.Role
What role are you removing from the target?
reason: str
Why are you removing a role from this user?
duration: str
How long are you removing this role for?
silent: bool
Should the user be messaged?"""
await self.moderate(
ctx=interaction,
target=target,
silent=silent,
permissions=["moderate_members", "manage_roles"],
moderation_type=type_registry['removerole'],
reason=reason,
role=role,
duration=duration
)
@app_commands.command(name="mute")
async def mute(
self,
interaction: discord.Interaction,
target: discord.Member,
duration: str,
reason: str,
silent: bool | None = None,
):
"""Mute a user.
Parameters
-----------
target: discord.Member
Who are you unbanning?
duration: str
How long are you muting this user for?
reason: str
Why are you unbanning this user?
silent: bool
Should the user be messaged?"""
await self.moderate(
ctx=interaction,
target=target,
silent=silent,
permissions=["moderate_members"],
moderation_type=type_registry['mute'],
duration=duration,
reason=reason,
)
@app_commands.command(name="unmute")
async def unmute(
self,
interaction: discord.Interaction,
target: discord.Member,
reason: str | None = None,
silent: bool | None = None,
):
"""Unmute a user.
Parameters
-----------
target: discord.user
Who are you unmuting?
reason: str
Why are you unmuting this user?
silent: bool
Should the user be messaged?"""
await self.moderate(
ctx=interaction,
target=target,
silent=silent,
permissions=["moderate_members"],
moderation_type=type_registry['unmute'],
reason=reason,
)
@app_commands.command(name="kick")
async def kick(
self,
interaction: discord.Interaction,
target: discord.Member,
reason: str,
silent: bool | None = None,
):
"""Kick a user.
Parameters
-----------
target: discord.user
Who are you kicking?
reason: str
Why are you kicking this user?
silent: bool
Should the user be messaged?"""
await self.moderate(
ctx=interaction,
target=target,
silent=silent,
permissions=["kick_members"],
moderation_type=type_registry['kick'],
reason=reason,
)
@app_commands.command(name="ban")
@app_commands.choices(
delete_messages=[
Choice(name="None", value=0),
Choice(name="1 Hour", value=3600),
Choice(name="12 Hours", value=43200),
Choice(name="1 Day", value=86400),
Choice(name="3 Days", value=259200),
Choice(name="7 Days", value=604800),
]
)
async def ban(
self,
interaction: discord.Interaction,
target: discord.User,
reason: str,
duration: str | None = None,
delete_messages: Choice[int] | None = None,
silent: bool | None = None,
):
"""Ban a user.
Parameters
-----------
target: discord.user
Who are you banning?
duration: str
How long are you banning this user for?
reason: str
Why are you banning this user?
delete_messages: Choices[int]
How many days of messages to delete?
silent: bool
Should the user be messaged?"""
if duration:
await self.moderate(
ctx=interaction,
target=target,
silent=silent,
permissions=["ban_members"],
moderation_type=type_registry['tempban'],
reason=reason,
duration=duration,
delete_messages=delete_messages,
)
else:
await self.moderate(
ctx=interaction,
target=target,
silent=silent,
permissions=["ban_members"],
moderation_type=type_registry['ban'],
reason=reason,
delete_messages=delete_messages,
)
@app_commands.command(name="unban")
async def unban(
self,
interaction: discord.Interaction,
target: discord.User,
reason: str | None = None,
silent: bool | None = None,
):
"""Unban a user.
Parameters
-----------
target: discord.user
Who are you unbanning?
reason: str
Why are you unbanning this user?
silent: bool
Should the user be messaged?"""
await self.moderate(
ctx=interaction,
target=target,
silent=silent,
permissions=["ban_members"],
moderation_type=type_registry['unban'],
reason=reason,
)
@app_commands.command(name="slowmode")
async def slowmode(
self,
interaction: discord.Interaction,
interval: int,
channel: discord.TextChannel | None = None,
reason: str | None = None,
):
"""Set the slowmode of a channel.
Parameters
-----------
interval: int
The slowmode interval in seconds
channel: discord.TextChannel
The channel to set the slowmode in
reason: str
Why are you setting the slowmode?"""
if channel is None:
channel = interaction.channel
await self.moderate(
ctx=interaction,
target=channel,
silent=True,
permissions=["manage_channel"],
moderation_type=type_registry['slowmode'],
interval=interval,
reason=reason,
)
@app_commands.command(name="history")
async def history(
self,
interaction: discord.Interaction,
target: discord.User | None = None,
moderator: discord.User | None = None,
pagesize: app_commands.Range[int, 1, 20] | None = None,
page: int = 1,
on: str | None = None,
before: str | None = None,
after: str | None = None,
expired: bool | None = None,
types: str | None = None,
ephemeral: bool | None = None,
inline: bool | None = None,
export: bool = False,
):
"""List previous infractions.
Parameters
-----------
target: discord.User
User whose infractions to query, overrides moderator if both are given
moderator: discord.User
Query by moderator
pagesize: app_commands.Range[int, 1, 25]
Amount of infractions to list per page
page: int
Page to select
on: str
List infractions on a certain date
before: str
List infractions before a certain date
after: str
List infractions after a certain date
expired: bool
List expired or unexpired infractions
types: str
List infractions of specific types, comma separated
ephemeral: bool
Hide the command response
inline: bool
Display infractions in a grid arrangement (does not look very good)
export: bool
Exports the server's moderation history to a JSON file"""
if ephemeral is None:
ephemeral = (
await config.user(interaction.user).history_ephemeral()
or await config.guild(interaction.guild).history_ephemeral()
or False
)
if inline is None:
inline = (
await config.user(interaction.user).history_inline()
or await config.guild(interaction.guild).history_inline()
or False
)
if pagesize is None:
if inline is True:
pagesize = (
await config.user(interaction.user).history_inline_pagesize()
or await config.guild(interaction.guild).history_inline_pagesize()
or 6
)
else:
pagesize = (
await config.user(interaction.user).history_pagesize()
or await config.guild(interaction.guild).history_pagesize()
or 5
)
if before and not on:
try:
before = parse(before)
except (ParserError, OverflowError) as e:
if e == ParserError:
await interaction.response.send_message(
content=error("Invalid date format for `before` parameter!"), ephemeral=True
)
return
if e == OverflowError:
await interaction.response.send_message(
content=error("Date is too far in the future!"), ephemeral=True
)
return
if after and not on:
try:
after = parse(after)
except (ParserError, OverflowError) as e:
if e == ParserError:
await interaction.response.send_message(
content=error("Invalid date format for `after` parameter!"), ephemeral=True
)
return
if e == OverflowError:
await interaction.response.send_message(
content=error("Date is too far in the future!"), ephemeral=True
)
return
if on:
try:
on = parse(on)
except (ParserError, OverflowError) as e:
if e == ParserError:
await interaction.response.send_message(
content=error("Invalid date format for `on` parameter!"), ephemeral=True
)
return
if e == OverflowError:
await interaction.response.send_message(
content=error("Date is too far in the future!"), ephemeral=True
)
return
before = datetime.combine(on, datetime.max.time())
after = datetime.combine(on, datetime.min.time())
await interaction.response.defer(ephemeral=ephemeral)
permissions = check_permissions(
interaction.client.user, ["embed_links"], interaction
)
if permissions:
await interaction.followup.send(
error(
f"I do not have the `{permissions}` permission, required for this action."
),
ephemeral=True,
)
return
if export and not types:
types = 'all'
type_list = []
registry_values = type_registry.values()
if types:
for t in types.split(","):
stripped = t.strip().lower()
if stripped == "all":
type_list.clear()
type_list.extend((t for t in registry_values))
break
try:
type_list.append(type_registry[stripped])
except RegistryKeyError:
continue
else:
for t in registry_values:
if await config.custom("types", interaction.guild.id, t.key).show_in_history() is True:
type_list.append(t)
if target:
filename = f"moderation_target_{str(target.id)}_{str(interaction.guild.id)}.json"
moderations = await Moderation.find_by_target(bot=interaction.client, guild_id=interaction.guild.id, target=target.id, before=before, after=after, types=type_list, expired=expired)
elif moderator:
filename = f"moderation_moderator_{str(moderator.id)}_{str(interaction.guild.id)}.json"
moderations = await Moderation.find_by_moderator(bot=interaction.client, guild_id=interaction.guild.id, moderator=moderator.id, before=before, after=after, types=type_list, expired=expired)
else:
filename = f"moderation_{str(interaction.guild.id)}.json"
moderations = await Moderation.get_latest(bot=interaction.client, guild_id=interaction.guild.id, before=before, after=after, types=type_list, expired=expired)
if export:
try:
filepath = (
str(data_manager.cog_data_path(cog_instance=self))
+ str(os.sep)
+ filename
)
with open(filepath, "w", encoding="utf-8") as f:
dump(obj=moderations, fp=f)
await interaction.followup.send(
file=discord.File(
fp=filepath, filename=filename
),
ephemeral=ephemeral,
)
os.remove(filepath)
except json.JSONDecodeError as e:
await interaction.followup.send(
content=error(
"An error occured while exporting the moderation history.\nError:\n"
)
+ box(text=e, lang="py"),
ephemeral=ephemeral,
)
return
case_quantity = len(moderations)
page_quantity = ceil(case_quantity / pagesize)
start_index = (page - 1) * pagesize
end_index = page * pagesize
embed = discord.Embed(color=await self.bot.get_embed_color(interaction.channel))
embed.set_author(icon_url=interaction.guild.icon.url, name="Infraction History")
embed.set_footer(
text=f"Page {page:,}/{page_quantity:,} | {case_quantity:,} Results"
)
memory_dict = {}
for mod in moderations[start_index:end_index]:
if mod.target_id not in memory_dict:
memory_dict.update({
str(mod.target_id): await mod.get_target()
})
target = memory_dict[str(mod.target_id)]
if mod.moderator_id not in memory_dict:
memory_dict.update({
str(mod.moderator_id): await mod.get_moderator()
})
moderator = memory_dict[str(mod.moderator_id)]
field_name = f"Case #{mod.id:,} ({mod.type.string.title()})"
field_value = f"**Target:** `{target.name}` ({target.id})\n**Moderator:** `{moderator.name}` ({moderator.id})"
if len(str(mod.reason)) > 125:
field_value += f"\n**Reason:** `{str(mod.reason)[:125]}...`"
else:
field_value += f"\n**Reason:** `{str(mod.reason)}`"
if mod.duration:
duration_embed = (
f"{humanize_timedelta(timedelta=mod.duration)} | <t:{int(mod.end_timestamp.timestamp())}:R>"
if mod.expired is False
else f"{humanize_timedelta(timedelta=mod.duration)} | Expired"
)
field_value += f"\n**Duration:** {duration_embed}"
field_value += (
f"\n**Timestamp:** <t:{int(mod.timestamp.timestamp())}> | <t:{int(mod.timestamp.timestamp())}:R>"
)
if mod.role_id:
role = await mod.get_role()
field_value += f"\n**Role:** {role.mention} ({role.id})"
if mod.resolved:
field_value += "\n**Resolved:** True"
embed.add_field(name=field_name, value=field_value, inline=inline)
await interaction.followup.send(embed=embed, ephemeral=ephemeral)
@history.autocomplete('types')
async def _history_types(self, interaction: discord.Interaction, current: str) -> List[app_commands.Choice[str]]: # pylint: disable=unused-argument
types: List[str] = sorted(self.type_registry.keys())
choices = []
if current.endswith(","):
for c in current.split(","):
if c in types:
types.remove(c)
for t in types:
choices.append(app_commands.Choice(name=current+t, value=current+t))
else:
choices.append(app_commands.Choice(name="all", value="all"))
for t in types:
if t.startswith(current):
choices.append(app_commands.Choice(name=t, value=t))
return choices[:25]
@app_commands.command(name="resolve")
async def resolve(
self, interaction: discord.Interaction, case: int, reason: str = "No reason provided."
):
"""Resolve a specific case.
Parameters
-----------
case: int
Case number of the case you're trying to resolve
reason: str
Reason for resolving case"""
permissions = check_permissions(
interaction.client.user,
("embed_links", "moderate_members", "ban_members"),
interaction,
)
if permissions:
await interaction.response.send_message(
error(
f"I do not have the `{permissions}` permission, required for this action."
),
ephemeral=True,
)
return
try:
moderation = await Moderation.find_by_id(interaction.client, case, interaction.guild.id)
except ValueError:
await interaction.response.send_message(
content=error(f"Case #{case:,} does not exist!"), ephemeral=True
)
return
if len(moderation.changes) > 25:
await interaction.response.send_message(
content=error(
"Due to limitations with Discord's embed system, you cannot edit a case more than 25 times."
),
ephemeral=True,
)
return
try:
success, msg = await moderation.resolve(interaction.user.id, reason)
except (ValueError, TypeError) as e:
if e == ValueError:
await interaction.response.send_message(
content=error("This case has already been resolved!"), ephemeral=True
)
elif e == TypeError:
await interaction.response.send_message(
content=error("This case type cannot be resolved!"), ephemeral=True
)
embed = await case_factory(
interaction=interaction,
moderation=moderation,
)
await interaction.response.send_message(
content=f"✅ Moderation #{case:,} resolved!\n" + error(f"Resolve handler returned an error message: `{msg}`") if success is False else "", embed=embed
)
ctx = await self.bot.get_context(interaction, cls=commands.Context)
await log(ctx=ctx, moderation_id=case, resolved=True)
@app_commands.command(name="case")
@app_commands.choices(
raw=[
Choice(name="Export as Codeblock", value="codeblock"),
Choice(name="Export as File", value="file"),
]
)
async def case(
self,
interaction: discord.Interaction,
case: int,
ephemeral: bool | None = None,
evidenceformat: bool = False,
changes: bool = False,
raw: Choice[str] | None = None,
):
"""Check the details of a specific case.
Parameters
-----------
case: int
What case are you looking up?
ephemeral: bool
Hide the command response
evidenceformat: bool
Display the evidence format of the case
changes: bool
List the changes made to the case
raw: bool
Export the case to a JSON file or codeblock"""
permissions = check_permissions(
interaction.client.user, ["embed_links"], interaction
)
if permissions:
await interaction.response.send_message(
error(
f"I do not have the `{permissions}` permission, required for this action."
),
ephemeral=True,
)
return
if ephemeral is None:
ephemeral = (
await config.user(interaction.user).history_ephemeral()
or await config.guild(interaction.guild).history_ephemeral()
or False
)
try:
mod = await Moderation.find_by_id(interaction.client, case, interaction.guild.id)
except ValueError:
await interaction.response.send_message(
content=error(f"Case #{case:,} does not exist!"), ephemeral=True
)
return
if raw:
if raw.value == "file" or len(mod.to_json(2)) > 1800:
filename = (
str(data_manager.cog_data_path(cog_instance=self))
+ str(os.sep)
+ f"moderation_{interaction.guild.id}_case_{case}.json"
)
with open(filename, "w", encoding="utf-8") as f:
mod.to_json(2, f)
if raw.value == "codeblock":
content = f"Case #{case:,} exported.\n" + warning(
"Case was too large to export as codeblock, so it has been uploaded as a `.json` file."
)
else:
content = f"Case #{case:,} exported."
await interaction.response.send_message(
content=content,
file=discord.File(
filename,
f"moderation_{interaction.guild.id}_case_{case}.json",
),
ephemeral=ephemeral,
)
os.remove(filename)
return
await interaction.response.send_message(
content=box(mod.to_json(2), 'json'),
ephemeral=ephemeral,
)
return
if changes:
embed = await changes_factory(
interaction=interaction, moderation=mod
)
await interaction.response.send_message(
embed=embed, ephemeral=ephemeral
)
elif evidenceformat:
content = await evidenceformat_factory(moderation=mod)
await interaction.response.send_message(
content=content, ephemeral=ephemeral
)
else:
embed = await case_factory(
interaction=interaction, moderation=mod
)
await interaction.response.send_message(
embed=embed, ephemeral=ephemeral
)
return
@app_commands.command(name="edit")
async def edit(
self,
interaction: discord.Interaction,
case: int,
reason: str | None = None,
duration: str | None = None,
):
"""Edit the reason of a specific case.
Parameters
-----------
case: int
What case are you editing?
reason: str
What is the new reason?
duration: str
What is the new duration? Does not reapply the moderation if it has already expired.
"""
permissions = check_permissions(
interaction.client.user, ["embed_links"], interaction
)
if permissions:
await interaction.response.send_message(
error(
f"I do not have the `{permissions}` permission, required for this action."
),
ephemeral=True,
)
return
try:
moderation = await Moderation.find_by_id(interaction.client, case, interaction.guild.id)
old_moderation = moderation.model_copy()
except ValueError:
await interaction.response.send_message(
content=error(f"Case #{case:,} does not exist!"), ephemeral=True
)
return
if len(moderation.changes) > 25:
return await interaction.response.send_message(
content=error(
"Due to limitations with Discord's embed system, you cannot edit a case more than 25 times."
),
ephemeral=True,
)
if duration:
try:
parsed_time = parse_relativedelta(argument=duration)
if parsed_time is None:
raise commands.BadArgument()
moderation.duration = timedelta_from_relativedelta(relativedelta=parsed_time)
except (commands.BadArgument, ValueError):
return await interaction.response.send_message(
error("Please provide a valid duration!"), ephemeral=True
)
moderation.end_timestamp = moderation.timestamp + timedelta(seconds=moderation.duration.total_seconds())
try:
success = await moderation.type.duration_edit_handler(interaction=interaction, old_moderation=old_moderation, new_moderation=moderation)
except NotImplementedError:
return await interaction.response.send_message(
error("This case type does not support duration editing!"), ephemeral=True
)
if not success:
return
if reason:
moderation.reason = reason
if not reason and not duration:
return await interaction.response.send_message(
error("Please provide a new reason or duration to edit this case!"), ephemeral=True
)
if not moderation.changes:
moderation.changes.append(Change.from_dict(interaction.client, {
"type": "ORIGINAL",
"timestamp": old_moderation.timestamp,
"reason": old_moderation.reason,
"user_id": old_moderation.moderator_id,
"duration": timedelta_to_string(old_moderation.duration) if old_moderation.duration else None,
"end_timestamp": old_moderation.end_timestamp,
}))
moderation.changes.append(Change.from_dict(interaction.client, {
"type": "EDIT",
"timestamp": int(time.time()),
"reason": reason if reason else None,
"user_id": interaction.user.id,
"duration": timedelta_to_string(moderation.duration) if duration else None,
"end_timestamp": moderation.end_timestamp if duration else None,
}))
await moderation.update()
embed = await case_factory(interaction=interaction, moderation=moderation)
await interaction.response.send_message(
content=f"✅ Moderation #{case:,} edited!",
embed=embed,
ephemeral=True,
)
await log(await self.bot.get_context(interaction), case)
return
@tasks.loop(minutes=1)
async def handle_expiry(self):
await self.bot.wait_until_red_ready()
current_time = time.time()
global_unban_num = 0
global_addrole_num = 0
global_removerole_num = 0
global_other_num = 0
guilds: list[discord.Guild] = self.bot.guilds
for guild in guilds:
if not await self.bot.cog_disabled_in_guild(self, guild):
time_per_guild = time.time()
query = f"SELECT * FROM moderation_{guild.id} WHERE end_timestamp IS NOT NULL AND end_timestamp <= ? AND expired = 0"
moderations = await Moderation.execute(bot=self.bot, guild_id=guild.id, query=query, parameters=(time.time(),))
unban_num = 0
removerole_num = 0
addrole_num = 0
other_num = 0
for moderation in moderations:
try:
num = await moderation.type.expiry_handler(moderation)
except NotImplementedError:
logger.warning("Expiry handler not implemented for expirable moderation type %s", moderation.type.key)
continue
match moderation.type.key:
case "tempban":
unban_num += num
case "addrole":
removerole_num += num
case "removerole":
addrole_num += num
case _:
other_num += num if isinstance(num, int) else 0
expiry_query = f"UPDATE `moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp IS NOT NULL AND end_timestamp <= ? AND expired = 0) OR (expired = 0 AND resolved = 1);"
await Moderation.execute(bot=self.bot, guild_id=guild.id, query=expiry_query, parameters=(time.time(),), return_obj=False)
per_guild_completion_time = (time.time() - time_per_guild) * 1000
logger.debug(
"Completed expiry loop for %s (%s) in %sms with %s users unbanned, %s roles added, and %s roles removed (%s other cases expired)",
guild.name,
guild.id,
f"{per_guild_completion_time:.6f}",
unban_num,
addrole_num,
removerole_num,
other_num
)
global_unban_num = global_unban_num + unban_num
global_addrole_num = global_addrole_num + addrole_num
global_removerole_num = global_removerole_num + removerole_num
global_other_num = global_other_num + other_num
completion_time = (time.time() - current_time) * 1000
logger.debug(
"Completed expiry loop in %sms with %s users unbanned, %s roles added, and %s roles removed (%s other cases expired)",
f"{completion_time:.6f}",
global_unban_num,
global_addrole_num,
global_removerole_num,
global_other_num
)
########################################################################################################################
### Configuration Commands #
########################################################################################################################
@commands.group(autohelp=True, aliases=["moderation", "mod"])
async def aurora(self, ctx: commands.Context):
"""Settings and miscellaneous commands for Aurora."""
@aurora.group(autohelp=True, name="settings", aliases=["config", "options", "set"])
async def aurora_settings(self, ctx: commands.Context):
"""Configure Aurora's settings."""
@aurora_settings.command(name="overrides", aliases=["override", "user"])
async def aurora_settings_overrides(self, ctx: commands.Context):
"""Manage Aurora's user overriddable settings."""
msg = await ctx.send(embed=await overrides_embed(ctx))
await msg.edit(view=Overrides(ctx, msg, 60))
@aurora_settings.command(name="guild", aliases=["server"])
@commands.admin_or_permissions(manage_guild=True)
@commands.guild_only()
async def aurora_settings_guild(self, ctx: commands.Context):
"""Manage Aurora's guild settings."""
msg = await ctx.send(embed=await guild_embed(ctx))
await msg.edit(view=Guild(ctx, msg, 60))
@aurora_settings.command(name="type")
@commands.admin_or_permissions(manage_guild=True)
@commands.guild_only()
async def aurora_settings_type(self, ctx: commands.Context, moderation_type: str):
"""Manage configuration options for specific moderation types.
See [the documentation](https://seacogs.coastalcommits.com/Aurora/Types) for a list of built-in moderation types, or run this command with a junk argument (`awasd` or something) to see a list of valid types."""
try:
registered_type = type_registry.get(moderation_type)
except RegistryKeyError:
types = "`, `".join(type_registry.keys())
await ctx.send(error(f"`{moderation_type}` is not a valid moderation type.\nValid types are:\n`{types}`"))
return
msg = await ctx.send(embed=await type_embed(ctx, registered_type))
await msg.edit(view=Types(ctx, msg, registered_type))
@aurora_settings.command(name="addrole", aliases=["removerole"])
@commands.admin_or_permissions(manage_guild=True)
@commands.guild_only()
async def aurora_settings_addrole(self, ctx: commands.Context):
"""Manage the addrole whitelist.
Roles added to this list are also applied to `/removerole`."""
msg = await ctx.send(embed=await addrole_embed(ctx))
await msg.edit(view=Addrole(ctx, msg, 60))
@aurora_settings.command(name="immunity")
@commands.admin_or_permissions(manage_guild=True)
@commands.guild_only()
async def aurora_settings_immunity(self, ctx: commands.Context):
"""Manage the immunity whitelist."""
msg = await ctx.send(embed=await immune_embed(ctx))
await msg.edit(view=Immune(ctx, msg, 60))
@aurora.group(autohelp=True, name="import")
@commands.admin()
@commands.guild_only()
async def aurora_import(self, ctx: commands.Context):
"""Import moderation history from other bots."""
@aurora_import.command(name="aurora")
@commands.admin()
async def aurora_import_aurora(self, ctx: commands.Context):
"""Import moderation history from another bot using Aurora."""
if (
ctx.message.attachments
and ctx.message.attachments[0].content_type
== "application/json; charset=utf-8"
):
file = await ctx.message.attachments[0].read()
data: list[dict] = sorted(json.loads(file), key=lambda x: x["moderation_id"])
message = await ctx.send(
warning(
"Are you sure you want to import moderations from another bot?\n**This will overwrite any moderations that already exist in this guild's moderation table.**\n*The import process will block the rest of your bot until it is complete.*"
)
)
await message.edit(view=ImportAuroraView(60, ctx, message, data))
else:
await ctx.send(error("Please provide a valid Aurora export file."))
@aurora_import.command(name="galacticbot")
@commands.admin()
async def aurora_import_galacticbot(self, ctx: commands.Context):
"""Import moderation history from GalacticBot."""
if (
ctx.message.attachments
and ctx.message.attachments[0].content_type
== "application/json; charset=utf-8"
):
message = await ctx.send(
warning(
"Are you sure you want to import GalacticBot moderations?\n**This will overwrite any moderations that already exist in this guild's moderation table.**\n*The import process will block the rest of your bot until it is complete.*"
)
)
await message.edit(view=ImportGalacticBotView(60, ctx, message))
else:
await ctx.send(
error("Please provide a valid GalacticBot moderation export file.")
)
@aurora.group(autohelp=True, name="convert")
async def aurora_convert(self, ctx: commands.Context):
"""Convert strings to various Python objects."""
@aurora_convert.command(aliases=["dt"])
async def datetime(self, ctx: commands.Context, *, date: str) -> None:
"""Convert a string to a datetime object.
This command converts a date to a [`datetime`](https://docs.python.org/3/library/datetime.html#datetime.datetime) Python object.
**Example usage**
`[p]aurora datetime 08/20/2024`
**Output**
`2024-08-20 12:00:00`"""
try:
parsed_date = parse(date)
await ctx.send(f"`{parsed_date}`")
except (ParserError, OverflowError) as e:
if e == ParserError:
await ctx.send(error("Invalid date format!"))
if e == OverflowError:
await ctx.send(error("Date is too far in the future!"))
@aurora_convert.command(aliases=["td"])
async def timedelta(self, ctx: commands.Context, *, duration: str) -> None:
"""Convert a string to a timedelta.
This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object.
You cannot convert years or months as they are not fixed units. Use `[p]aurora relativedelta` for that.
**Example usage**
`[p]aurora timedelta 1 day 15hr 82 minutes 52s`
**Output**
`1 day, 16:22:52`"""
parsed_time = parse_timedelta(duration)
if parsed_time is None:
await ctx.send(error("Please provide a convertible value!"))
return
await ctx.send(f"`{parsed_time}`")
@aurora_convert.command(aliases=["rd"])
async def relativedelta(self, ctx: commands.Context, *, duration: str) -> None:
"""Convert a string to a relativedelta.
This command converts a duration to a [`relativedelta`](https://dateutil.readthedocs.io/en/stable/relativedelta.html) Python object.
**Example usage**
`[p]aurora relativedelta 3 years 1 day 15hr 82 minutes 52s`
**Output**
`relativedelta(years=+3, days=+1, hours=+15, minutes=+82, seconds=+52)`"""
parsed_time = parse_relativedelta(duration)
if parsed_time is None:
await ctx.send(error("Please provide a convertible value!"))
return
await ctx.send(f"`{parsed_time}`")