1297 lines
49 KiB
Python
1297 lines
49 KiB
Python
# _____ _
|
|
# / ____| (_)
|
|
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
|
|
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
|
|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
|
|
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
|
|
|
|
import json
|
|
import logging as py_logging
|
|
import os
|
|
import time
|
|
from datetime import datetime, timedelta, timezone
|
|
from math import ceil
|
|
from typing import List
|
|
|
|
import discord
|
|
from class_registry.registry import RegistryKeyError
|
|
from dateutil.parser import ParserError, parse
|
|
from discord.ext import tasks
|
|
from redbot.core import app_commands, commands, data_manager
|
|
from redbot.core.app_commands import Choice
|
|
from redbot.core.bot import Red
|
|
from redbot.core.commands.converter import parse_relativedelta, parse_timedelta
|
|
from redbot.core.utils.chat_formatting import bold, box, error, humanize_list, humanize_timedelta, warning
|
|
|
|
from .importers.aurora import ImportAuroraView
|
|
from .importers.galacticbot import ImportGalacticBotView
|
|
from .menus.addrole import Addrole
|
|
from .menus.guild import Guild
|
|
from .menus.immune import Immune
|
|
from .menus.overrides import Overrides
|
|
from .menus.types import Types
|
|
from .models.change import Change
|
|
from .models.moderation import Moderation
|
|
from .models.type import type_registry
|
|
from .utilities.config import config, register_config
|
|
from .utilities.factory import addrole_embed, case_factory, changes_factory, evidenceformat_factory, guild_embed, immune_embed, overrides_embed, type_embed
|
|
from .utilities.json import dump
|
|
from .utilities.logger import logger
|
|
from .utilities.moderate import moderate
|
|
from .utilities.utils import check_permissions, create_guild_table, log
|
|
|
|
|
|
class Aurora(commands.Cog):
|
|
"""Aurora is a fully-featured moderation system.
|
|
It is heavily inspired by GalacticBot, and is designed to be a more user-friendly alternative to Red's core Mod cogs.
|
|
This cog stores all of its data in an SQLite database."""
|
|
|
|
__author__ = ["Seaswimmer"]
|
|
__version__ = "3.0.0-indev2"
|
|
__documentation__ = "https://seacogs.coastalcommits.com/aurora/"
|
|
|
|
async def red_delete_data_for_user(self, *, requester, user_id: int):
    """Delete the data this cog stores about a user.

    Parameters
    ----------
    requester
        Who asked for the deletion. ``"discord_deleted_user"`` clears the
        user's config and deletes every moderation row that references the
        user; ``"owner"``, ``"user"`` and ``"user_strict"`` only clear the
        user's config. Any other value is logged as invalid.
    user_id: int
        ID of the user whose data should be removed.
    """
    if requester == "discord_deleted_user":
        await config.user_from_id(user_id).clear()

        # The cog stores its data in SQLite, so tables are listed through
        # sqlite_master (``SHOW TABLES`` is MySQL-only). Only the per-guild
        # ``moderation_<guild id>`` tables carry target_id / moderator_id.
        results = await Moderation.execute(
            query="SELECT name FROM sqlite_master WHERE type = 'table' AND name LIKE 'moderation_%';",
            return_obj=False,
        )
        tables = [row[0] for row in results]

        # SQLite uses ``?`` placeholders, like every other query in this cog.
        condition = "target_id = ? OR moderator_id = ?;"

        for table in tables:
            # ``table`` is already the table name; indexing it again would
            # only yield its first character.
            delete_query = f"DELETE FROM `{table}` WHERE {condition}"
            await Moderation.execute(query=delete_query, parameters=(user_id, user_id), return_obj=False)

    elif requester in ("owner", "user", "user_strict"):
        await config.user_from_id(user_id).clear()

    else:
        # Only reached for requesters that match none of the known values.
        logger.warning(
            "Invalid requester passed to red_delete_data_for_user: %s", requester
        )
|
|
|
|
def __init__(self, bot: Red) -> None:
    """Keep a reference to the bot and type registry, register the config, and start the expiry loop."""
    super().__init__()
    self.type_registry = type_registry
    self.bot = bot
    register_config(config)
    self.handle_expiry.start()
    # aiosqlite emits dozens of debug messages per query unless its logging
    # level is overridden. Aurora already logs (most of) its own SQL queries,
    # and what aiosqlite logs is of no use to the bot owner.
    # This is not a great solution, because the override also applies to any
    # other cog that uses aiosqlite. If you know of a better one, please say
    # so in Discord or in a CoastalCommits issue.
    aiosqlite_logger = py_logging.getLogger('aiosqlite')
    aiosqlite_logger.setLevel(py_logging.INFO)
|
|
|
|
def format_help_for_context(self, ctx: commands.Context) -> str:
    """Return the inherited help text followed by the cog's version, author and documentation link."""
    base_help = super().format_help_for_context(ctx) or ""
    # Add a separating newline only when the base text has no blank line yet.
    separator = "" if "\n\n" in base_help else "\n"
    lines = (
        f"{base_help}{separator}",
        f"{bold('Cog Version:')} {self.__version__}",
        f"{bold('Author:')} {humanize_list(self.__author__)}",
        f"{bold('Documentation:')} {self.__documentation__}",
    )
    return "\n".join(lines)
|
|
|
|
async def cog_load(self):
    """Call ``create_guild_table`` for every guild the bot is in where this cog is not disabled."""
    current_guilds: list[discord.Guild] = self.bot.guilds

    try:
        for guild in current_guilds:
            if await self.bot.cog_disabled_in_guild(self, guild):
                continue
            await create_guild_table(guild)
    except ConnectionRefusedError:
        # A refused connection aborts the whole pass without raising.
        return
|
|
|
|
async def cog_unload(self):
    """Cancel the ``handle_expiry`` loop that ``__init__`` started."""
    expiry_loop = self.handle_expiry
    expiry_loop.cancel()
|
|
|
|
@commands.Cog.listener("on_guild_join")
async def db_generate_on_guild_join(self, guild: discord.Guild):
    """Call ``create_guild_table`` for a guild the bot has just joined, unless the cog is disabled there."""
    if await self.bot.cog_disabled_in_guild(self, guild):
        return

    try:
        await create_guild_table(guild)
    except ConnectionRefusedError:
        # A refused connection is ignored.
        return
|
|
|
|
@commands.Cog.listener("on_member_join")
async def addrole_on_member_join(self, member: discord.Member):
    """Re-apply active ADDROLE moderations when a member joins the server.

    Looks up every unexpired, unresolved ``ADDROLE`` case targeting the
    member in the guild's moderation table and adds the recorded role back.
    Does nothing when the cog is disabled in the guild.
    """
    if await self.bot.cog_disabled_in_guild(self, member.guild):
        return

    query = f"""SELECT moderation_id, role_id, reason FROM moderation_{member.guild.id} WHERE target_id = ? AND moderation_type = 'ADDROLE' AND expired = 0 AND resolved = 0;"""
    results = await Moderation.execute(query, (member.id,))
    for row in results:
        # Row layout follows the SELECT: (moderation_id, role_id, reason).
        role = member.guild.get_role(row[1])
        if role is None:
            # The role no longer exists in the guild; passing None to
            # add_roles would raise, so skip this case.
            continue
        reason = row[2]
        await member.add_roles(role, reason=f"Role automatically added on member rejoin for: {reason} (Case #{row[0]:,})")
|
|
|
|
@commands.Cog.listener("on_audit_log_entry_create")
async def autologger(self, entry: discord.AuditLogEntry):
    """Record kicks, bans, unbans and timeouts that were performed by hand rather than through the bot.

    Any ``AttributeError`` raised while inspecting the entry causes the
    entry to be ignored.
    """
    try:
        if await self.bot.cog_disabled_in_guild(self, entry.guild):
            return

        if await config.guild(entry.guild).ignore_other_bots() is True:
            # Skip anything done by, or to, a bot account.
            if entry.user.bot or entry.target.bot:
                return
        elif entry.user.id == self.bot.user.id:
            # Skip only the actions this bot performed itself.
            return

        duration = None

        manual_note = "This action was performed without the bot."
        reason = f"{entry.reason} ({manual_note})" if entry.reason else manual_note

        action = entry.action
        if action == discord.AuditLogAction.member_update:
            timed_out_until = entry.after.timed_out_until
            if timed_out_until is None:
                moderation_type = "UNMUTE"
            else:
                expires_at = timed_out_until.replace(tzinfo=timezone.utc)
                remaining = expires_at - datetime.now(tz=timezone.utc)
                # Round the remaining timeout to whole minutes.
                duration = timedelta(minutes=round(remaining.total_seconds() / 60))
                moderation_type = "MUTE"
        elif action == discord.AuditLogAction.kick:
            moderation_type = "KICK"
        elif action == discord.AuditLogAction.ban:
            moderation_type = "BAN"
        elif action == discord.AuditLogAction.unban:
            moderation_type = "UNBAN"
        else:
            return

        await Moderation.log(
            self.bot,
            entry.guild.id,
            entry.user.id,
            moderation_type,
            "USER",
            entry.target.id,
            None,
            duration,
            reason,
        )
    except AttributeError:
        return
|
|
|
|
#######################################################################################################################
|
|
### COMMANDS
|
|
#######################################################################################################################
|
|
|
|
@app_commands.command(name="note")
async def note(
    self,
    interaction: discord.Interaction,
    target: discord.User,
    reason: str,
    silent: bool | None = None,
):
    """Add a note to a user.

    Parameters
    -----------
    target: discord.User
        Who are you noting?
    reason: str
        Why are you noting this user?
    silent: bool
        Should the user be messaged?"""
    # Hand the request to `moderate` with the registered 'note' type.
    required_permissions = ["moderate_members"]
    note_type = type_registry['note']
    await moderate(
        ctx=interaction,
        moderation_type=note_type,
        permissions=required_permissions,
        target=target,
        reason=reason,
        silent=silent,
    )
|
|
|
|
@app_commands.command(name="warn")
async def warn(
    self,
    interaction: discord.Interaction,
    target: discord.Member,
    reason: str,
    silent: bool | None = None,
):
    """Warn a user.

    Parameters
    -----------
    target: discord.Member
        Who are you warning?
    reason: str
        Why are you warning this user?
    silent: bool
        Should the user be messaged?"""
    # Hand the request to `moderate` with the registered 'warn' type.
    required_permissions = ["moderate_members"]
    warn_type = type_registry['warn']
    await moderate(
        ctx=interaction,
        moderation_type=warn_type,
        permissions=required_permissions,
        target=target,
        reason=reason,
        silent=silent,
    )
|
|
|
|
@app_commands.command(name="addrole")
async def addrole(
    self,
    interaction: discord.Interaction,
    target: discord.Member,
    role: discord.Role,
    reason: str,
    duration: str | None = None,
    silent: bool | None = None,
):
    """Add a role to a user.

    Parameters
    -----------
    target: discord.Member
        Who are you adding a role to?
    role: discord.Role
        What role are you adding to the target?
    reason: str
        Why are you adding a role to this user?
    duration: str
        How long are you adding this role for?
    silent: bool
        Should the user be messaged?"""
    # Hand the request to `moderate` with the registered 'addrole' type.
    required_permissions = ["moderate_members", "manage_roles"]
    addrole_type = type_registry['addrole']
    await moderate(
        ctx=interaction,
        moderation_type=addrole_type,
        permissions=required_permissions,
        target=target,
        role=role,
        reason=reason,
        duration=duration,
        silent=silent,
    )
|
|
|
|
@app_commands.command(name="removerole")
async def removerole(
    self,
    interaction: discord.Interaction,
    target: discord.Member,
    role: discord.Role,
    reason: str,
    duration: str | None = None,
    silent: bool | None = None,
):
    """Remove a role from a user.

    Parameters
    -----------
    target: discord.Member
        Who are you removing a role from?
    role: discord.Role
        What role are you removing from the target?
    reason: str
        Why are you removing a role from this user?
    duration: str
        How long are you removing this role for?
    silent: bool
        Should the user be messaged?"""
    # Hand the request to `moderate` with the registered 'removerole' type.
    required_permissions = ["moderate_members", "manage_roles"]
    removerole_type = type_registry['removerole']
    await moderate(
        ctx=interaction,
        moderation_type=removerole_type,
        permissions=required_permissions,
        target=target,
        role=role,
        reason=reason,
        duration=duration,
        silent=silent,
    )
|
|
|
|
@app_commands.command(name="mute")
async def mute(
    self,
    interaction: discord.Interaction,
    target: discord.Member,
    duration: str,
    reason: str,
    silent: bool | None = None,
):
    """Mute a user.

    Parameters
    -----------
    target: discord.Member
        Who are you muting?
    duration: str
        How long are you muting this user for?
    reason: str
        Why are you muting this user?
    silent: bool
        Should the user be messaged?"""
    # The parameter descriptions above previously said "unbanning"
    # (copied from the unban command); they now describe muting.
    await moderate(
        ctx=interaction,
        target=target,
        silent=silent,
        permissions=["moderate_members"],
        moderation_type=type_registry['mute'],
        duration=duration,
        reason=reason,
    )
|
|
|
|
@app_commands.command(name="unmute")
async def unmute(
    self,
    interaction: discord.Interaction,
    target: discord.Member,
    reason: str | None = None,
    silent: bool | None = None,
):
    """Unmute a user.

    Parameters
    -----------
    target: discord.user
        Who are you unmuting?
    reason: str
        Why are you unmuting this user?
    silent: bool
        Should the user be messaged?"""
    # Hand the request to `moderate` with the registered 'unmute' type.
    required_permissions = ["moderate_members"]
    unmute_type = type_registry['unmute']
    await moderate(
        ctx=interaction,
        moderation_type=unmute_type,
        permissions=required_permissions,
        target=target,
        reason=reason,
        silent=silent,
    )
|
|
|
|
@app_commands.command(name="kick")
async def kick(
    self,
    interaction: discord.Interaction,
    target: discord.Member,
    reason: str,
    silent: bool | None = None,
):
    """Kick a user.

    Parameters
    -----------
    target: discord.user
        Who are you kicking?
    reason: str
        Why are you kicking this user?
    silent: bool
        Should the user be messaged?"""
    # Hand the request to `moderate` with the registered 'kick' type.
    required_permissions = ["kick_members"]
    kick_type = type_registry['kick']
    await moderate(
        ctx=interaction,
        moderation_type=kick_type,
        permissions=required_permissions,
        target=target,
        reason=reason,
        silent=silent,
    )
|
|
|
|
@app_commands.command(name="ban")
@app_commands.choices(
    delete_messages=[
        Choice(name="None", value=0),
        Choice(name="1 Hour", value=3600),
        Choice(name="12 Hours", value=43200),
        Choice(name="1 Day", value=86400),
        Choice(name="3 Days", value=259200),
        Choice(name="7 Days", value=604800),
    ]
)
async def ban(
    self,
    interaction: discord.Interaction,
    target: discord.User,
    reason: str,
    duration: str | None = None,
    delete_messages: Choice[int] | None = None,
    silent: bool | None = None,
):
    """Ban a user.

    Parameters
    -----------
    target: discord.user
        Who are you banning?
    duration: str
        How long are you banning this user for?
    reason: str
        Why are you banning this user?
    delete_messages: Choices[int]
        How many days of messages to delete?
    silent: bool
        Should the user be messaged?"""
    # A duration selects the 'tempban' type and is forwarded to `moderate`;
    # without one the 'ban' type is used and no duration keyword is passed.
    if duration:
        type_key = 'tempban'
        timing = {"duration": duration}
    else:
        type_key = 'ban'
        timing = {}

    await moderate(
        ctx=interaction,
        target=target,
        silent=silent,
        permissions=["ban_members"],
        moderation_type=type_registry[type_key],
        reason=reason,
        **timing,
        delete_messages=delete_messages,
    )
|
|
|
|
@app_commands.command(name="unban")
async def unban(
    self,
    interaction: discord.Interaction,
    target: discord.User,
    reason: str | None = None,
    silent: bool | None = None,
):
    """Unban a user.

    Parameters
    -----------
    target: discord.user
        Who are you unbanning?
    reason: str
        Why are you unbanning this user?
    silent: bool
        Should the user be messaged?"""
    # Hand the request to `moderate` with the registered 'unban' type.
    required_permissions = ["ban_members"]
    unban_type = type_registry['unban']
    await moderate(
        ctx=interaction,
        moderation_type=unban_type,
        permissions=required_permissions,
        target=target,
        reason=reason,
        silent=silent,
    )
|
|
|
|
@app_commands.command(name="slowmode")
async def slowmode(
    self,
    interaction: discord.Interaction,
    interval: int,
    channel: discord.TextChannel | None = None,
    reason: str | None = None,
):
    """Set the slowmode of a channel.

    Parameters
    -----------
    interval: int
        The slowmode interval in seconds
    channel: discord.TextChannel
        The channel to set the slowmode in
    reason: str
        Why are you setting the slowmode?"""
    # Default to the channel the command was used in.
    if channel is None:
        channel = interaction.channel

    await moderate(
        ctx=interaction,
        target=channel,
        silent=True,
        # Discord's permission flag is `manage_channels` (plural);
        # `manage_channel` is not a permission name.
        permissions=["manage_channels"],
        moderation_type=type_registry['slowmode'],
        interval=interval,
        reason=reason,
    )
|
|
|
|
@app_commands.command(name="history")
async def history(
    self,
    interaction: discord.Interaction,
    target: discord.User | None = None,
    moderator: discord.User | None = None,
    pagesize: app_commands.Range[int, 1, 20] | None = None,
    page: int = 1,
    on: str | None = None,
    before: str | None = None,
    after: str | None = None,
    types: str | None = None,
    ephemeral: bool | None = None,
    inline: bool | None = None,
    export: bool = False,
):
    """List previous infractions.

    Parameters
    -----------
    target: discord.User
        User whose infractions to query, overrides moderator if both are given
    moderator: discord.User
        Query by moderator
    pagesize: app_commands.Range[int, 1, 20]
        Amount of infractions to list per page
    page: int
        Page to select
    on: str
        List infractions on a certain date
    before: str
        List infractions before a certain date
    after: str
        List infractions after a certain date
    types: str
        List infractions of specific types, comma separated
    ephemeral: bool
        Hide the command response
    inline: bool
        Display infractions in a grid arrangement (does not look very good)
    export: bool
        Exports the server's moderation history to a JSON file"""

    async def _parse_date(raw: str, parameter: str) -> datetime | None:
        """Parse a user-supplied date; on failure, tell the user and return None."""
        try:
            return parse(raw)
        except ParserError:
            message = f"Invalid date format for `{parameter}` parameter!"
        except OverflowError:
            message = "Date is too far in the future!"
        await interaction.response.send_message(content=error(message), ephemeral=True)
        return None

    # Unset display options fall back to the user's setting, then the
    # guild's, then a hard-coded default.
    if ephemeral is None:
        ephemeral = (
            await config.user(interaction.user).history_ephemeral()
            or await config.guild(interaction.guild).history_ephemeral()
            or False
        )

    if inline is None:
        inline = (
            await config.user(interaction.user).history_inline()
            or await config.guild(interaction.guild).history_inline()
            or False
        )

    if pagesize is None:
        if inline is True:
            pagesize = (
                await config.user(interaction.user).history_inline_pagesize()
                or await config.guild(interaction.guild).history_inline_pagesize()
                or 6
            )
        else:
            pagesize = (
                await config.user(interaction.user).history_pagesize()
                or await config.guild(interaction.guild).history_pagesize()
                or 5
            )

    # `on` takes precedence: it replaces `before`/`after` with the bounds
    # of that single day, so those two are only parsed when `on` is absent.
    if on:
        on = await _parse_date(on, "on")
        if on is None:
            return
        before = datetime.combine(on, datetime.max.time())
        after = datetime.combine(on, datetime.min.time())
    else:
        if before:
            before = await _parse_date(before, "before")
            if before is None:
                return
        if after:
            after = await _parse_date(after, "after")
            if after is None:
                return

    await interaction.response.defer(ephemeral=ephemeral)

    permissions = check_permissions(
        interaction.client.user, ["embed_links"], interaction
    )
    if permissions:
        await interaction.followup.send(
            error(
                f"I do not have the `{permissions}` permission, required for this action."
            ),
            ephemeral=True,
        )
        return

    # An export without an explicit type filter covers every type.
    if export and not types:
        types = 'all'

    type_list = []
    if types:
        for t in types.split(","):
            stripped = t.strip().lower()
            if stripped == "all":
                type_list = list(type_registry.values())
                break
            try:
                type_list.append(type_registry[stripped])
            except RegistryKeyError:
                # Unknown type names are ignored.
                continue
    else:
        for t in type_registry.values():
            if await config.custom("types", interaction.guild.id, t.key).show_in_history() is True:
                type_list.append(t)

    if target:
        filename = f"moderation_target_{str(target.id)}_{str(interaction.guild.id)}.json"
        moderations = await Moderation.find_by_target(bot=interaction.client, guild_id=interaction.guild.id, target=target.id, before=before, after=after, types=type_list)
    elif moderator:
        filename = f"moderation_moderator_{str(moderator.id)}_{str(interaction.guild.id)}.json"
        moderations = await Moderation.find_by_moderator(bot=interaction.client, guild_id=interaction.guild.id, moderator=moderator.id, before=before, after=after, types=type_list)
    else:
        filename = f"moderation_{str(interaction.guild.id)}.json"
        moderations = await Moderation.get_latest(bot=interaction.client, guild_id=interaction.guild.id, before=before, after=after, types=type_list)

    if export:
        filepath = (
            str(data_manager.cog_data_path(cog_instance=self))
            + str(os.sep)
            + filename
        )
        try:
            with open(filepath, "w", encoding="utf-8") as f:
                dump(obj=moderations, fp=f, indent=2)

            await interaction.followup.send(
                file=discord.File(fp=filepath, filename=filename),
                ephemeral=ephemeral,
            )
        except (TypeError, ValueError) as e:
            # Serialisation failures surface as TypeError/ValueError
            # (json.JSONDecodeError is itself a ValueError subclass).
            await interaction.followup.send(
                content=error(
                    "An error occured while exporting the moderation history.\nError:\n"
                )
                + box(text=str(e), lang="py"),
                ephemeral=ephemeral,
            )
        finally:
            # Always remove the temporary export file, even if sending failed.
            try:
                os.remove(filepath)
            except FileNotFoundError:
                pass  # the file was never created
        return

    case_quantity = len(moderations)
    page_quantity = ceil(case_quantity / pagesize)
    start_index = (page - 1) * pagesize
    end_index = page * pagesize

    embed = discord.Embed(color=await self.bot.get_embed_color(interaction.channel))
    # A guild without a custom icon has `icon` set to None.
    guild_icon = interaction.guild.icon
    embed.set_author(icon_url=guild_icon.url if guild_icon else None, name="Infraction History")
    embed.set_footer(
        text=f"Page {page:,}/{page_quantity:,} | {case_quantity:,} Results"
    )

    # Cache resolved targets/moderators by id so each one is only fetched
    # once per page.
    resolved_cache = {}

    for mod in moderations[start_index:end_index]:
        if mod.target_id not in resolved_cache:
            resolved_cache[mod.target_id] = await mod.get_target()
        case_target = resolved_cache[mod.target_id]

        if mod.moderator_id not in resolved_cache:
            resolved_cache[mod.moderator_id] = await mod.get_moderator()
        case_moderator = resolved_cache[mod.moderator_id]

        field_name = f"Case #{mod.id:,} ({mod.type.string.title()})"
        field_value = f"**Target:** `{case_target.name}` ({case_target.id})\n**Moderator:** `{case_moderator.name}` ({case_moderator.id})"

        # Long reasons are cut to 125 characters.
        reason_text = str(mod.reason)
        if len(reason_text) > 125:
            field_value += f"\n**Reason:** `{reason_text[:125]}...`"
        else:
            field_value += f"\n**Reason:** `{reason_text}`"

        if mod.duration:
            duration_embed = (
                f"{humanize_timedelta(timedelta=mod.duration)} | <t:{int(mod.end_timestamp.timestamp())}:R>"
                if mod.expired is False
                else f"{humanize_timedelta(timedelta=mod.duration)} | Expired"
            )
            field_value += f"\n**Duration:** {duration_embed}"

        field_value += (
            f"\n**Timestamp:** <t:{int(mod.timestamp.timestamp())}> | <t:{int(mod.timestamp.timestamp())}:R>"
        )

        if mod.role_id:
            role = await mod.get_role()
            field_value += f"\n**Role:** {role.mention} ({role.id})"

        if mod.resolved:
            field_value += "\n**Resolved:** True"

        embed.add_field(name=field_name, value=field_value, inline=inline)

    await interaction.followup.send(embed=embed, ephemeral=ephemeral)
|
|
|
|
@history.autocomplete('types')
async def _history_types(self, interaction: discord.Interaction, current: str) -> List[app_commands.Choice[str]]:
    """Offer registered type keys as autocomplete choices for the ``types`` option (at most 25)."""
    available: List[str] = sorted(self.type_registry.keys())

    if not current.endswith(","):
        # No trailing comma: offer every type key as a standalone value.
        return [app_commands.Choice(name=key, value=key) for key in available][:25]

    # Trailing comma: drop the keys already typed and offer each remaining
    # key appended to the current input.
    for chosen in current.split(","):
        if chosen in available:
            available.remove(chosen)
    return [
        app_commands.Choice(name=current + key, value=current + key)
        for key in available
    ][:25]
|
|
|
|
@app_commands.command(name="resolve")
async def resolve(
    self, interaction: discord.Interaction, case: int, reason: str | None = None
):
    """Resolve a specific case.

    Parameters
    -----------
    case: int
        Case number of the case you're trying to resolve
    reason: str
        Reason for resolving case"""
    permissions = check_permissions(
        interaction.client.user,
        ("embed_links", "moderate_members", "ban_members"),
        interaction,
    )
    if permissions:
        await interaction.response.send_message(
            error(
                f"I do not have the `{permissions}` permission, required for this action."
            ),
            ephemeral=True,
        )
        return

    try:
        moderation = await Moderation.find_by_id(interaction.client, case, interaction.guild.id)
    except ValueError:
        await interaction.response.send_message(
            content=error(f"Case #{case:,} does not exist!"), ephemeral=True
        )
        return

    if len(moderation.changes) > 25:
        await interaction.response.send_message(
            content=error(
                "Due to limitations with Discord's embed system, you cannot edit a case more than 25 times."
            ),
            ephemeral=True,
        )
        return

    # Each failure gets its own handler and returns: comparing the caught
    # instance to the exception class is never true, and continuing would
    # reach the code below with `success`/`msg` unbound.
    try:
        success, msg = await moderation.resolve(interaction.user.id, reason)
    except ValueError:
        await interaction.response.send_message(
            content=error("This case has already been resolved!"), ephemeral=True
        )
        return
    except TypeError:
        await interaction.response.send_message(
            content=error("This case type cannot be resolved!"), ephemeral=True
        )
        return

    embed = await case_factory(
        interaction=interaction,
        moderation=moderation,
    )

    # Always confirm the resolution; append the handler's message only when
    # it reported a failure.
    content = f"✅ Moderation #{case:,} resolved!"
    if success is False:
        content += "\n" + error(f"Resolve handler returned an error message: `{msg}`")

    await interaction.response.send_message(content=content, embed=embed)
    ctx = await self.bot.get_context(interaction, cls=commands.Context)
    await log(ctx=ctx, moderation_id=case, resolved=True)
|
|
|
|
@app_commands.command(name="case")
@app_commands.choices(
    raw=[
        Choice(name="Export as Codeblock", value="codeblock"),
        Choice(name="Export as File", value="file"),
    ]
)
async def case(
    self,
    interaction: discord.Interaction,
    case: int,
    ephemeral: bool | None = None,
    evidenceformat: bool = False,
    changes: bool = False,
    raw: Choice[str] | None = None,
):
    """Check the details of a specific case.

    Parameters
    -----------
    case: int
        What case are you looking up?
    ephemeral: bool
        Hide the command response
    evidenceformat: bool
        Display the evidence format of the case
    changes: bool
        List the changes made to the case
    raw: Choice[str]
        Export the case to a JSON file or codeblock"""
    permissions = check_permissions(
        interaction.client.user, ["embed_links"], interaction
    )
    if permissions:
        await interaction.response.send_message(
            error(
                f"I do not have the `{permissions}` permission, required for this action."
            ),
            ephemeral=True,
        )
        return

    # An unset `ephemeral` falls back to the user's setting, then the guild's.
    if ephemeral is None:
        ephemeral = (
            await config.user(interaction.user).history_ephemeral()
            or await config.guild(interaction.guild).history_ephemeral()
            or False
        )

    try:
        mod = await Moderation.find_by_id(interaction.client, case, interaction.guild.id)
    except ValueError:
        await interaction.response.send_message(
            content=error(f"Case #{case:,} does not exist!"), ephemeral=True
        )
        return

    if raw:
        raw_json = mod.to_json(2)
        # Exports over 1800 characters are sent as a file even when a
        # codeblock was requested.
        if raw.value == "file" or len(raw_json) > 1800:
            export_name = f"moderation_{interaction.guild.id}_case_{case}.json"
            filename = (
                str(data_manager.cog_data_path(cog_instance=self))
                + str(os.sep)
                + export_name
            )

            try:
                with open(filename, "w", encoding="utf-8") as f:
                    mod.to_json(2, f)

                if raw.value == "codeblock":
                    content = f"Case #{case:,} exported.\n" + warning(
                        "Case was too large to export as codeblock, so it has been uploaded as a `.json` file."
                    )
                else:
                    content = f"Case #{case:,} exported."

                await interaction.response.send_message(
                    content=content,
                    file=discord.File(filename, export_name),
                    ephemeral=ephemeral,
                )
            finally:
                # Remove the temporary file even if writing or sending failed.
                try:
                    os.remove(filename)
                except FileNotFoundError:
                    pass  # the file was never created
            return

        await interaction.response.send_message(
            content=box(raw_json, 'json'),
            ephemeral=ephemeral,
        )
        return

    if changes:
        embed = await changes_factory(
            interaction=interaction, moderation=mod
        )
        await interaction.response.send_message(
            embed=embed, ephemeral=ephemeral
        )
    elif evidenceformat:
        content = await evidenceformat_factory(moderation=mod)
        await interaction.response.send_message(
            content=content, ephemeral=ephemeral
        )
    else:
        embed = await case_factory(
            interaction=interaction, moderation=mod
        )
        await interaction.response.send_message(
            embed=embed, ephemeral=ephemeral
        )
|
|
|
|
@app_commands.command(name="edit")
async def edit(
    self,
    interaction: discord.Interaction,
    case: int,
    reason: str,
    duration: str | None = None,
):
    """Edit the reason of a specific case.

    Parameters
    -----------
    case: int
        What case are you editing?
    reason: str
        What is the new reason?
    duration: str
        What is the new duration? Does not reapply the moderation if it has already expired.
    """
    # Local import: this is the only place in the cog that needs `copy`.
    import copy

    permissions = check_permissions(
        interaction.client.user, ["embed_links"], interaction
    )
    if permissions:
        await interaction.response.send_message(
            error(
                f"I do not have the `{permissions}` permission, required for this action."
            ),
            ephemeral=True,
        )
        return

    try:
        moderation = await Moderation.find_by_id(interaction.client, case, interaction.guild.id)
    except ValueError:
        await interaction.response.send_message(
            content=error(f"Case #{case:,} does not exist!"), ephemeral=True
        )
        return

    # Snapshot the case before it is modified. A plain assignment would
    # alias the same object, so the "ORIGINAL" change entry below would
    # record the edited values instead of the original ones.
    old_moderation = copy.copy(moderation)

    if len(moderation.changes) > 25:
        return await interaction.response.send_message(
            content=error(
                "Due to limitations with Discord's embed system, you cannot edit a case more than 25 times."
            ),
            ephemeral=True,
        )

    if duration:
        # Validate the new duration before touching the case.
        new_duration = parse_timedelta(duration)
        if new_duration is None:
            return await interaction.response.send_message(
                error("Please provide a valid duration!"), ephemeral=True
            )

        moderation.duration = new_duration
        # `timestamp` and `end_timestamp` are datetimes (the history
        # command calls `.timestamp()` on both), so add the timedelta
        # itself rather than its length in seconds.
        moderation.end_timestamp = moderation.timestamp + new_duration

        try:
            # NOTE(review): the `interaction` keyword receives
            # `interaction.client` here — confirm this is what the
            # handlers expect.
            success = await moderation.type.duration_edit_handler(interaction=interaction.client, old_moderation=old_moderation, new_moderation=moderation)
        except NotImplementedError:
            return await interaction.response.send_message(
                error("This case type does not support duration editing!"), ephemeral=True
            )
        if not success:
            return

    if reason:
        moderation.reason = reason

    # The first edit of a case also records its original state.
    if not moderation.changes:
        moderation.changes.append(Change.from_dict(interaction.client, {
            "type": "ORIGINAL",
            "timestamp": old_moderation.timestamp,
            "reason": old_moderation.reason,
            "user_id": old_moderation.moderator_id,
            "duration": old_moderation.duration,
            "end_timestamp": old_moderation.end_timestamp,
        }))

    # The edit entry is the same whether or not the duration changed.
    moderation.changes.append(Change.from_dict(interaction.client, {
        "type": "EDIT",
        "timestamp": int(time.time()),
        "reason": reason,
        "user_id": interaction.user.id,
        "duration": moderation.duration,
        "end_timestamp": moderation.end_timestamp,
    }))

    await moderation.update()
    embed = await case_factory(interaction=interaction, moderation=moderation)

    await interaction.response.send_message(
        content=f"✅ Moderation #{case:,} edited!",
        embed=embed,
        ephemeral=True,
    )
    await log(interaction, case)
|
|
|
|
@tasks.loop(minutes=1)
async def handle_expiry(self) -> None:
    """Run expiry handlers for lapsed moderations and flag them as expired.

    Scheduled every minute. For each guild in which the cog is not disabled,
    this selects the unexpired moderations whose ``end_timestamp`` has passed,
    awaits each one's ``moderation.type.expiry_handler``, and then sets
    ``expired = 1`` on those rows as well as on unexpired rows that are
    already resolved. Per-guild and overall tallies go to the debug log.

    A moderation type whose handler raises ``NotImplementedError`` is logged
    as a warning and skipped; its row is still flagged as expired by the
    UPDATE statement.
    """
    await self.bot.wait_until_red_ready()
    loop_started = time.time()

    # Maps a moderation type key to the tally its expiry feeds. The two role
    # keys are crossed on purpose: an expiring "addrole" case is tallied as a
    # removed role and an expiring "removerole" case as an added role.
    tally_for_key = {"tempban": "unban", "addrole": "removerole", "removerole": "addrole"}
    totals = {"unban": 0, "addrole": 0, "removerole": 0, "other": 0}

    guilds: list[discord.Guild] = self.bot.guilds
    for guild in guilds:
        if await self.bot.cog_disabled_in_guild(self, guild):
            continue

        guild_started = time.time()
        # Use ONE cutoff for both the SELECT and the UPDATE. Sampling the clock
        # separately for each statement lets a moderation whose end_timestamp
        # falls between the two readings be flagged as expired without its
        # expiry handler ever having run.
        cutoff = guild_started

        query = f"SELECT * FROM moderation_{guild.id} WHERE end_timestamp IS NOT NULL AND end_timestamp <= ? AND expired = 0"
        moderations = await Moderation.execute(bot=self.bot, guild_id=guild.id, query=query, parameters=(cutoff,))

        counts = dict.fromkeys(totals, 0)
        for moderation in moderations:
            try:
                num = await moderation.type.expiry_handler(moderation)
            except NotImplementedError:
                logger.warning("Expiry handler not implemented for expirable moderation type %s", moderation.type.key)
                continue
            # A handler result that is not an integer is tallied as zero so it
            # cannot break the loop with a TypeError.
            tally = tally_for_key.get(moderation.type.key, "other")
            counts[tally] += num if isinstance(num, int) else 0

        expiry_query = f"UPDATE `moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp IS NOT NULL AND end_timestamp <= ? AND expired = 0) OR (expired = 0 AND resolved = 1);"
        await Moderation.execute(bot=self.bot, guild_id=guild.id, query=expiry_query, parameters=(cutoff,), return_obj=False)

        per_guild_completion_time = (time.time() - guild_started) * 1000
        logger.debug(
            "Completed expiry loop for %s (%s) in %sms with %s users unbanned, %s roles added, and %s roles removed (%s other cases expired)",
            guild.name,
            guild.id,
            f"{per_guild_completion_time:.6f}",
            counts["unban"],
            counts["addrole"],
            counts["removerole"],
            counts["other"],
        )
        for tally, amount in counts.items():
            totals[tally] += amount

    completion_time = (time.time() - loop_started) * 1000
    logger.debug(
        "Completed expiry loop in %sms with %s users unbanned, %s roles added, and %s roles removed (%s other cases expired)",
        f"{completion_time:.6f}",
        totals["unban"],
        totals["addrole"],
        totals["removerole"],
        totals["other"],
    )
|
|
|
|
########################################################################################################################
|
|
### Configuration Commands #
|
|
########################################################################################################################
|
|
|
|
@commands.group(autohelp=True, aliases=["moderation", "mod"])
async def aurora(self, ctx: commands.Context) -> None:
    """Settings and miscellaneous commands for Aurora."""
    # Intentionally empty: this coroutine only anchors the command group.
    # Subcommands attach to it through the `@aurora.group(...)` decorators
    # elsewhere in this class.
|
|
|
|
@aurora.group(autohelp=True, name="settings", aliases=["config", "options", "set"])
async def aurora_settings(self, ctx: commands.Context) -> None:
    """Configure Aurora's settings."""
    # Intentionally empty: this coroutine only anchors the `settings`
    # subgroup. The individual settings menus are registered on it with
    # `@aurora_settings.command(...)`.
|
|
|
|
@aurora_settings.command(name="overrides", aliases=["override", "user"])
async def aurora_settings_overrides(self, ctx: commands.Context) -> None:
    """Manage Aurora's user overridable settings."""
    # The embed has to be sent before the view is attached, because the
    # Overrides view is constructed with the message it will live on.
    msg = await ctx.send(embed=await overrides_embed(ctx))
    # NOTE(review): 60 is presumably the view timeout in seconds - confirm
    # against the Overrides constructor.
    await msg.edit(view=Overrides(ctx, msg, 60))
|
|
|
|
@aurora_settings.command(name="guild", aliases=["server"])
@commands.admin_or_permissions(manage_guild=True)
@commands.guild_only()
async def aurora_settings_guild(self, ctx: commands.Context) -> None:
    """Manage Aurora's guild settings."""
    # Build and post the settings embed first; the Guild view is constructed
    # with the posted message, so it can only be attached afterwards.
    settings_embed = await guild_embed(ctx)
    posted = await ctx.send(embed=settings_embed)
    settings_view = Guild(ctx, posted, 60)
    await posted.edit(view=settings_view)
|
|
|
|
@aurora_settings.command(name="type")
@commands.admin_or_permissions(manage_guild=True)
@commands.guild_only()
async def aurora_settings_type(self, ctx: commands.Context, moderation_type: str) -> None:
    """Manage configuration options for specific moderation types.

    See [the documentation](https://seacogs.coastalcommits.com/Aurora/Types) for a list of built-in moderation types, or run this command with a junk argument (`awasd` or something) to see a list of valid types."""
    try:
        registered_type = type_registry.get(moderation_type)
    except RegistryKeyError:
        # Unknown key: reply with every registered key so the caller can pick
        # a valid one, then fall out of the command.
        valid_types = "`, `".join(type_registry.keys())
        rejection = error(f"`{moderation_type}` is not a valid moderation type.\nValid types are:\n`{valid_types}`")
        await ctx.send(rejection)
    else:
        # Known key: post the type's embed, then attach its settings view to
        # the posted message.
        settings_embed = await type_embed(ctx, registered_type)
        posted = await ctx.send(embed=settings_embed)
        await posted.edit(view=Types(ctx, posted, registered_type))
|
|
|
|
@aurora_settings.command(name="addrole", aliases=["removerole"])
@commands.admin_or_permissions(manage_guild=True)
@commands.guild_only()
async def aurora_settings_addrole(self, ctx: commands.Context) -> None:
    """Manage the addrole whitelist.

    Roles added to this list are also applied to `/removerole`."""
    # Post the whitelist embed first; the Addrole view is constructed with
    # the posted message, so it is attached in a follow-up edit.
    whitelist_embed = await addrole_embed(ctx)
    posted = await ctx.send(embed=whitelist_embed)
    whitelist_view = Addrole(ctx, posted, 60)
    await posted.edit(view=whitelist_view)
|
|
|
|
@aurora_settings.command(name="immunity")
@commands.admin_or_permissions(manage_guild=True)
@commands.guild_only()
async def aurora_settings_immunity(self, ctx: commands.Context) -> None:
    """Manage the immunity whitelist."""
    # Post the immunity embed first; the Immune view is constructed with the
    # posted message, so it is attached in a follow-up edit.
    immunity_embed = await immune_embed(ctx)
    posted = await ctx.send(embed=immunity_embed)
    immunity_view = Immune(ctx, posted, 60)
    await posted.edit(view=immunity_view)
|
|
|
|
@aurora.group(autohelp=True, name="import")
@commands.admin()
@commands.guild_only()
async def aurora_import(self, ctx: commands.Context) -> None:
    """Import moderation history from other bots."""
    # Intentionally empty: this coroutine only anchors the `import` subgroup.
    # The importers themselves are registered on it with
    # `@aurora_import.command(...)`.
|
|
|
|
@aurora_import.command(name="aurora")
@commands.admin()
async def aurora_import_aurora(self, ctx: commands.Context) -> None:
    """Import moderation history from another bot using Aurora."""
    invalid_file = error("Please provide a valid Aurora export file.")

    attachments = ctx.message.attachments
    if not attachments or attachments[0].content_type != "application/json; charset=utf-8":
        await ctx.send(invalid_file)
        return

    file = await attachments[0].read()
    try:
        # Entries are ordered by their case number before being handed to the
        # import view.
        data: list[dict] = sorted(json.loads(file), key=lambda entry: entry["moderation_id"])
    except (ValueError, KeyError, TypeError):
        # The upload is untrusted. ValueError covers undecodable bytes and
        # malformed JSON, KeyError an entry without "moderation_id", and
        # TypeError a payload that is not a list of mappings (or whose ids
        # cannot be ordered). All of them mean "not an Aurora export".
        await ctx.send(invalid_file)
        return

    message = await ctx.send(
        warning(
            "Are you sure you want to import moderations from another bot?\n**This will overwrite any moderations that already exist in this guild's moderation table.**\n*The import process will block the rest of your bot until it is complete.*"
        )
    )
    # The confirmation view is constructed with the warning message, so it is
    # attached after that message has been sent.
    await message.edit(view=ImportAuroraView(60, ctx, message, data))
|
|
|
|
@aurora_import.command(name="galacticbot")
@commands.admin()
async def aurora_import_galacticbot(self, ctx: commands.Context) -> None:
    """Import moderation history from GalacticBot."""
    uploads = ctx.message.attachments
    # Guard clause: reject a message with no attachment, or one whose first
    # attachment is not reported as UTF-8 JSON.
    if not uploads or uploads[0].content_type != "application/json; charset=utf-8":
        rejection = error("Please provide a valid GalacticBot moderation export file.")
        await ctx.send(rejection)
        return

    confirmation = warning(
        "Are you sure you want to import GalacticBot moderations?\n**This will overwrite any moderations that already exist in this guild's moderation table.**\n*The import process will block the rest of your bot until it is complete.*"
    )
    message = await ctx.send(confirmation)
    # The confirmation view is constructed with the warning message, so it is
    # attached after that message has been sent.
    await message.edit(view=ImportGalacticBotView(60, ctx, message))
|
|
|
|
@aurora.group(autohelp=True, name="convert")
async def aurora_convert(self, ctx: commands.Context) -> None:
    """Convert strings to various Python objects."""
    # Intentionally empty: this coroutine only anchors the `convert` subgroup.
    # The converters are registered on it with `@aurora_convert.command(...)`.
|
|
|
|
@aurora_convert.command(aliases=["dt"])
async def datetime(self, ctx: commands.Context, *, date: str) -> None:
    """Convert a string to a datetime object.

    This command converts a date to a [`datetime`](https://docs.python.org/3/library/datetime.html#datetime.datetime) Python object.

    **Example usage**
    `[p]aurora convert datetime 08/20/2024`
    **Output**
    `2024-08-20 00:00:00`"""
    # Each failure gets its own handler. Comparing the caught exception
    # instance against the exception class (`e == ParserError`) is always
    # False, which left the user without any reply on bad input.
    try:
        parsed_date = parse(date)
    except ParserError:
        await ctx.send(error("Invalid date format!"))
    except OverflowError:
        await ctx.send(error("Date is too far in the future!"))
    else:
        await ctx.send(f"`{parsed_date}`")
|
|
|
|
@aurora_convert.command(aliases=["td"])
async def timedelta(self, ctx: commands.Context, *, duration: str) -> None:
    """Convert a string to a timedelta.

    This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object.
    You cannot convert years or months as they are not fixed units. Use `[p]aurora convert relativedelta` for that.

    **Example usage**
    `[p]aurora convert timedelta 1 day 15hr 82 minutes 52s`
    **Output**
    `1 day, 16:22:52`"""
    parsed_time = parse_timedelta(duration)
    # A None result is treated as "nothing convertible in the input".
    if parsed_time is None:
        await ctx.send(error("Please provide a convertible value!"))
        return
    await ctx.send(f"`{parsed_time}`")
|
|
|
|
@aurora_convert.command(aliases=["rd"])
async def relativedelta(self, ctx: commands.Context, *, duration: str) -> None:
    """Convert a string to a relativedelta.

    This command converts a duration to a [`relativedelta`](https://dateutil.readthedocs.io/en/stable/relativedelta.html) Python object.

    **Example usage**
    `[p]aurora convert relativedelta 3 years 1 day 15hr 82 minutes 52s`
    **Output**
    `relativedelta(years=+3, days=+1, hours=+15, minutes=+82, seconds=+52)`"""
    parsed_time = parse_relativedelta(duration)
    # A None result is treated as "nothing convertible in the input".
    if parsed_time is None:
        await ctx.send(error("Please provide a convertible value!"))
        return
    await ctx.send(f"`{parsed_time}`")
|