Seaswimmer
9c345ed96b
none of the options do anything yet, this is just creating the configuration keys and the menu to modify them
1227 lines
46 KiB
Python
1227 lines
46 KiB
Python
# _____ _
|
|
# / ____| (_)
|
|
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
|
|
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
|
|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
|
|
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
|
|
|
|
import json
|
|
import logging as py_logging
|
|
import os
|
|
import time
|
|
from datetime import datetime, timedelta, timezone
|
|
from math import ceil
|
|
|
|
import discord
|
|
from dateutil.parser import ParserError, parse
|
|
from discord.ext import tasks
|
|
from menus.types import Types
|
|
from redbot.core import app_commands, commands, data_manager
|
|
from redbot.core.app_commands import Choice
|
|
from redbot.core.bot import Red
|
|
from redbot.core.commands.converter import parse_relativedelta, parse_timedelta
|
|
from redbot.core.utils.chat_formatting import bold, box, error, humanize_list, humanize_timedelta, warning
|
|
|
|
from .importers.aurora import ImportAuroraView
|
|
from .importers.galacticbot import ImportGalacticBotView
|
|
from .menus.addrole import Addrole
|
|
from .menus.guild import Guild
|
|
from .menus.immune import Immune
|
|
from .menus.overrides import Overrides
|
|
from .models.change import Change
|
|
from .models.moderation import Moderation
|
|
from .models.type import type_registry
|
|
from .utilities.config import config, register_config
|
|
from .utilities.factory import addrole_embed, case_factory, changes_factory, evidenceformat_factory, guild_embed, immune_embed, overrides_embed, type_embed
|
|
from .utilities.json import dump
|
|
from .utilities.logger import logger
|
|
from .utilities.moderate import moderate
|
|
from .utilities.utils import check_permissions, create_guild_table, log
|
|
|
|
|
|
class Aurora(commands.Cog):
|
|
"""Aurora is a fully-featured moderation system.
|
|
It is heavily inspired by GalacticBot, and is designed to be a more user-friendly alternative to Red's core Mod cogs.
|
|
This cog stores all of its data in an SQLite database."""
|
|
|
|
__author__ = ["SeaswimmerTheFsh"]
|
|
__version__ = "2.4.2"
|
|
__documentation__ = "https://seacogs.coastalcommits.com/aurora/"
|
|
|
|
async def red_delete_data_for_user(self, *, requester, user_id: int):
|
|
if requester == "discord_deleted_user":
|
|
await config.user_from_id(user_id).clear()
|
|
|
|
results = await Moderation.execute(query="SHOW TABLES;", return_obj=False)
|
|
tables = [table[0] for table in results]
|
|
|
|
condition = "target_id = %s OR moderator_id = %s;"
|
|
|
|
for table in tables:
|
|
delete_query = f"DELETE FROM {table[0]} WHERE {condition}"
|
|
await Moderation.execute(query=delete_query, parameters=(user_id, user_id), return_obj=False)
|
|
|
|
if requester == "owner":
|
|
await config.user_from_id(user_id).clear()
|
|
if requester == "user":
|
|
await config.user_from_id(user_id).clear()
|
|
if requester == "user_strict":
|
|
await config.user_from_id(user_id).clear()
|
|
else:
|
|
logger.warning(
|
|
"Invalid requester passed to red_delete_data_for_user: %s", requester
|
|
)
|
|
|
|
def __init__(self, bot: Red) -> None:
|
|
super().__init__()
|
|
self.bot = bot
|
|
self.type_registry = type_registry
|
|
register_config(config)
|
|
self.handle_expiry.start()
|
|
# If we don't override aiosqlite's logging level, it will spam the console with dozens of debug messages per query.
|
|
# This is unnecessary because Aurora already logs all of its SQL queries (or at least, most of them),
|
|
# and the information that aiosqlite logs is not useful to the bot owner.
|
|
# This is a bad solution though as it overrides it for any other cogs that are using aiosqlite too.
|
|
# If there's a better solution that you're aware of, please let me know in Discord or in a CoastalCommits issue.
|
|
py_logging.getLogger('aiosqlite').setLevel(py_logging.INFO)
|
|
|
|
def format_help_for_context(self, ctx: commands.Context) -> str:
|
|
pre_processed = super().format_help_for_context(ctx) or ""
|
|
n = "\n" if "\n\n" not in pre_processed else ""
|
|
text = [
|
|
f"{pre_processed}{n}",
|
|
f"{bold('Cog Version:')} {self.__version__}",
|
|
f"{bold('Author:')} {humanize_list(self.__author__)}",
|
|
f"{bold('Documentation:')} {self.__documentation__}",
|
|
]
|
|
return "\n".join(text)
|
|
|
|
async def cog_load(self):
|
|
"""This method prepares the database schema for all of the guilds the bot is currently in."""
|
|
guilds: list[discord.Guild] = self.bot.guilds
|
|
|
|
try:
|
|
for guild in guilds:
|
|
if not await self.bot.cog_disabled_in_guild(self, guild):
|
|
await create_guild_table(guild)
|
|
|
|
except ConnectionRefusedError:
|
|
return
|
|
|
|
async def cog_unload(self):
|
|
self.handle_expiry.cancel()
|
|
|
|
@commands.Cog.listener("on_guild_join")
|
|
async def db_generate_guild_join(self, guild: discord.Guild):
|
|
"""This method prepares the database schema whenever the bot joins a guild."""
|
|
if not await self.bot.cog_disabled_in_guild(self, guild):
|
|
try:
|
|
await create_guild_table(guild)
|
|
except ConnectionRefusedError:
|
|
return
|
|
|
|
@commands.Cog.listener("on_member_join")
|
|
async def addrole_on_member_join(self, member: discord.Member):
|
|
"""This method automatically adds roles to users when they join the server."""
|
|
if not await self.bot.cog_disabled_in_guild(self, member.guild):
|
|
query = f"""SELECT moderation_id, role_id, reason FROM moderation_{member.guild.id} WHERE target_id = ? AND moderation_type = 'ADDROLE' AND expired = 0 AND resolved = 0;"""
|
|
results = Moderation.execute(query, (member.id,))
|
|
for row in results:
|
|
role = member.guild.get_role(row[1])
|
|
reason = row[2]
|
|
await member.add_roles(role, reason=f"Role automatically added on member rejoin for: {reason} (Case #{row[0]:,})")
|
|
|
|
@commands.Cog.listener("on_audit_log_entry_create")
|
|
async def autologger(self, entry: discord.AuditLogEntry):
|
|
"""This method automatically logs moderations done by users manually ("right clicks")."""
|
|
if not await self.bot.cog_disabled_in_guild(self, entry.guild):
|
|
if await config.guild(entry.guild).ignore_other_bots() is True:
|
|
if entry.user.bot or entry.target.bot:
|
|
return
|
|
else:
|
|
if entry.user.id == self.bot.user.id:
|
|
return
|
|
|
|
duration = None
|
|
|
|
if entry.reason:
|
|
reason = entry.reason + " (This action was performed without the bot.)"
|
|
|
|
else:
|
|
reason = "This action was performed without the bot."
|
|
|
|
if entry.action == discord.AuditLogAction.kick:
|
|
moderation_type = "KICK"
|
|
|
|
elif entry.action == discord.AuditLogAction.ban:
|
|
moderation_type = "BAN"
|
|
|
|
elif entry.action == discord.AuditLogAction.unban:
|
|
moderation_type = "UNBAN"
|
|
|
|
elif entry.action == discord.AuditLogAction.member_update:
|
|
if entry.after.timed_out_until is not None:
|
|
timed_out_until_aware = entry.after.timed_out_until.replace(
|
|
tzinfo=timezone.utc
|
|
)
|
|
duration_datetime = timed_out_until_aware - datetime.now(
|
|
tz=timezone.utc
|
|
)
|
|
minutes = round(duration_datetime.total_seconds() / 60)
|
|
duration = timedelta(minutes=minutes)
|
|
moderation_type = "MUTE"
|
|
else:
|
|
moderation_type = "UNMUTE"
|
|
else:
|
|
return
|
|
|
|
await Moderation.log(
|
|
self.bot,
|
|
entry.guild.id,
|
|
entry.user.id,
|
|
moderation_type,
|
|
"USER",
|
|
entry.target.id,
|
|
None,
|
|
duration,
|
|
reason,
|
|
)
|
|
|
|
#######################################################################################################################
|
|
### COMMANDS
|
|
#######################################################################################################################
|
|
|
|
@app_commands.command(name="note")
|
|
async def note(
|
|
self,
|
|
interaction: discord.Interaction,
|
|
target: discord.User,
|
|
reason: str,
|
|
silent: bool | None = None,
|
|
):
|
|
"""Add a note to a user.
|
|
|
|
Parameters
|
|
-----------
|
|
target: discord.User
|
|
Who are you noting?
|
|
reason: str
|
|
Why are you noting this user?
|
|
silent: bool
|
|
Should the user be messaged?"""
|
|
await moderate(
|
|
ctx=interaction,
|
|
target=target,
|
|
silent=silent,
|
|
permissions=["moderate_members"],
|
|
moderation_type=type_registry['note'],
|
|
reason=reason,
|
|
)
|
|
|
|
@app_commands.command(name="warn")
|
|
async def warn(
|
|
self,
|
|
interaction: discord.Interaction,
|
|
target: discord.Member,
|
|
reason: str,
|
|
silent: bool | None = None,
|
|
):
|
|
"""Warn a user.
|
|
|
|
Parameters
|
|
-----------
|
|
target: discord.Member
|
|
Who are you warning?
|
|
reason: str
|
|
Why are you warning this user?
|
|
silent: bool
|
|
Should the user be messaged?"""
|
|
await moderate(
|
|
ctx=interaction,
|
|
target=target,
|
|
silent=silent,
|
|
permissions=["moderate_members"],
|
|
moderation_type=type_registry['warn'],
|
|
reason=reason,
|
|
)
|
|
|
|
@app_commands.command(name="addrole")
|
|
async def addrole(
|
|
self,
|
|
interaction: discord.Interaction,
|
|
target: discord.Member,
|
|
role: discord.Role,
|
|
reason: str,
|
|
duration: str | None = None,
|
|
silent: bool | None = None,
|
|
):
|
|
"""Add a role to a user.
|
|
|
|
Parameters
|
|
-----------
|
|
target: discord.Member
|
|
Who are you adding a role to?
|
|
role: discord.Role
|
|
What role are you adding to the target?
|
|
reason: str
|
|
Why are you adding a role to this user?
|
|
duration: str
|
|
How long are you adding this role for?
|
|
silent: bool
|
|
Should the user be messaged?"""
|
|
await moderate(
|
|
ctx=interaction,
|
|
target=target,
|
|
silent=silent,
|
|
permissions=["moderate_members", "manage_roles"],
|
|
moderation_type=type_registry['addrole'],
|
|
reason=reason,
|
|
role=role,
|
|
duration=duration
|
|
)
|
|
|
|
@app_commands.command(name="removerole")
|
|
async def removerole(
|
|
self,
|
|
interaction: discord.Interaction,
|
|
target: discord.Member,
|
|
role: discord.Role,
|
|
reason: str,
|
|
duration: str | None = None,
|
|
silent: bool | None = None,
|
|
):
|
|
"""Remove a role from a user.
|
|
|
|
Parameters
|
|
-----------
|
|
target: discord.Member
|
|
Who are you removing a role from?
|
|
role: discord.Role
|
|
What role are you removing from the target?
|
|
reason: str
|
|
Why are you removing a role from this user?
|
|
duration: str
|
|
How long are you removing this role for?
|
|
silent: bool
|
|
Should the user be messaged?"""
|
|
await moderate(
|
|
ctx=interaction,
|
|
target=target,
|
|
silent=silent,
|
|
permissions=["moderate_members", "manage_roles"],
|
|
moderation_type=type_registry['removerole'],
|
|
reason=reason,
|
|
role=role,
|
|
duration=duration
|
|
)
|
|
|
|
@app_commands.command(name="mute")
|
|
async def mute(
|
|
self,
|
|
interaction: discord.Interaction,
|
|
target: discord.Member,
|
|
duration: str,
|
|
reason: str,
|
|
silent: bool | None = None,
|
|
):
|
|
"""Mute a user.
|
|
|
|
Parameters
|
|
-----------
|
|
target: discord.Member
|
|
Who are you unbanning?
|
|
duration: str
|
|
How long are you muting this user for?
|
|
reason: str
|
|
Why are you unbanning this user?
|
|
silent: bool
|
|
Should the user be messaged?"""
|
|
await moderate(
|
|
ctx=interaction,
|
|
target=target,
|
|
silent=silent,
|
|
permissions=["moderate_members"],
|
|
moderation_type=type_registry['mute'],
|
|
duration=duration,
|
|
reason=reason,
|
|
)
|
|
|
|
@app_commands.command(name="unmute")
|
|
async def unmute(
|
|
self,
|
|
interaction: discord.Interaction,
|
|
target: discord.Member,
|
|
reason: str | None = None,
|
|
silent: bool | None = None,
|
|
):
|
|
"""Unmute a user.
|
|
|
|
Parameters
|
|
-----------
|
|
target: discord.user
|
|
Who are you unmuting?
|
|
reason: str
|
|
Why are you unmuting this user?
|
|
silent: bool
|
|
Should the user be messaged?"""
|
|
await moderate(
|
|
ctx=interaction,
|
|
target=target,
|
|
silent=silent,
|
|
permissions=["moderate_members"],
|
|
moderation_type=type_registry['unmute'],
|
|
reason=reason,
|
|
)
|
|
|
|
@app_commands.command(name="kick")
|
|
async def kick(
|
|
self,
|
|
interaction: discord.Interaction,
|
|
target: discord.Member,
|
|
reason: str,
|
|
silent: bool | None = None,
|
|
):
|
|
"""Kick a user.
|
|
|
|
Parameters
|
|
-----------
|
|
target: discord.user
|
|
Who are you kicking?
|
|
reason: str
|
|
Why are you kicking this user?
|
|
silent: bool
|
|
Should the user be messaged?"""
|
|
await moderate(
|
|
ctx=interaction,
|
|
target=target,
|
|
silent=silent,
|
|
permissions=["kick_members"],
|
|
moderation_type=type_registry['kick'],
|
|
reason=reason,
|
|
)
|
|
|
|
@app_commands.command(name="ban")
|
|
@app_commands.choices(
|
|
delete_messages=[
|
|
Choice(name="None", value=0),
|
|
Choice(name="1 Hour", value=3600),
|
|
Choice(name="12 Hours", value=43200),
|
|
Choice(name="1 Day", value=86400),
|
|
Choice(name="3 Days", value=259200),
|
|
Choice(name="7 Days", value=604800),
|
|
]
|
|
)
|
|
async def ban(
|
|
self,
|
|
interaction: discord.Interaction,
|
|
target: discord.User,
|
|
reason: str,
|
|
duration: str | None = None,
|
|
delete_messages: Choice[int] | None = None,
|
|
silent: bool | None = None,
|
|
):
|
|
"""Ban a user.
|
|
|
|
Parameters
|
|
-----------
|
|
target: discord.user
|
|
Who are you banning?
|
|
duration: str
|
|
How long are you banning this user for?
|
|
reason: str
|
|
Why are you banning this user?
|
|
delete_messages: Choices[int]
|
|
How many days of messages to delete?
|
|
silent: bool
|
|
Should the user be messaged?"""
|
|
if duration:
|
|
await moderate(
|
|
ctx=interaction,
|
|
target=target,
|
|
silent=silent,
|
|
permissions=["ban_members"],
|
|
moderation_type=type_registry['tempban'],
|
|
reason=reason,
|
|
duration=duration,
|
|
delete_messages=delete_messages,
|
|
)
|
|
else:
|
|
await moderate(
|
|
ctx=interaction,
|
|
target=target,
|
|
silent=silent,
|
|
permissions=["ban_members"],
|
|
moderation_type=type_registry['ban'],
|
|
reason=reason,
|
|
delete_messages=delete_messages,
|
|
)
|
|
|
|
@app_commands.command(name="unban")
|
|
async def unban(
|
|
self,
|
|
interaction: discord.Interaction,
|
|
target: discord.User,
|
|
reason: str | None = None,
|
|
silent: bool | None = None,
|
|
):
|
|
"""Unban a user.
|
|
|
|
Parameters
|
|
-----------
|
|
target: discord.user
|
|
Who are you unbanning?
|
|
reason: str
|
|
Why are you unbanning this user?
|
|
silent: bool
|
|
Should the user be messaged?"""
|
|
await moderate(
|
|
ctx=interaction,
|
|
target=target,
|
|
silent=silent,
|
|
permissions=["ban_members"],
|
|
moderation_type=type_registry['unban'],
|
|
reason=reason,
|
|
)
|
|
|
|
@app_commands.command(name="slowmode")
|
|
async def slowmode(
|
|
self,
|
|
interaction: discord.Interaction,
|
|
interval: int,
|
|
channel: discord.TextChannel | None = None,
|
|
reason: str | None = None,
|
|
):
|
|
"""Set the slowmode of a channel.
|
|
|
|
Parameters
|
|
-----------
|
|
interval: int
|
|
The slowmode interval in seconds
|
|
channel: discord.TextChannel
|
|
The channel to set the slowmode in
|
|
reason: str
|
|
Why are you setting the slowmode?"""
|
|
if channel is None:
|
|
channel = interaction.channel
|
|
|
|
await moderate(
|
|
ctx=interaction,
|
|
target=channel,
|
|
silent=True,
|
|
permissions=["manage_channel"],
|
|
moderation_type=type_registry['slowmode'],
|
|
interval=interval,
|
|
reason=reason,
|
|
)
|
|
|
|
@app_commands.command(name="history")
|
|
async def history(
|
|
self,
|
|
interaction: discord.Interaction,
|
|
target: discord.User | None = None,
|
|
moderator: discord.User | None = None,
|
|
pagesize: app_commands.Range[int, 1, 20] | None = None,
|
|
page: int = 1,
|
|
on: str | None = None,
|
|
before: str | None = None,
|
|
after: str | None = None,
|
|
ephemeral: bool | None = None,
|
|
inline: bool | None = None,
|
|
export: bool = False,
|
|
):
|
|
"""List previous infractions.
|
|
|
|
Parameters
|
|
-----------
|
|
target: discord.User
|
|
User whose infractions to query, overrides moderator if both are given
|
|
moderator: discord.User
|
|
Query by moderator
|
|
pagesize: app_commands.Range[int, 1, 25]
|
|
Amount of infractions to list per page
|
|
page: int
|
|
Page to select
|
|
on: str
|
|
List infractions on a certain date
|
|
before: str
|
|
List infractions before a certain date
|
|
after: str
|
|
List infractions after a certain date
|
|
ephemeral: bool
|
|
Hide the command response
|
|
inline: bool
|
|
Display infractions in a grid arrangement (does not look very good)
|
|
export: bool
|
|
Exports the server's moderation history to a JSON file"""
|
|
if ephemeral is None:
|
|
ephemeral = (
|
|
await config.user(interaction.user).history_ephemeral()
|
|
or await config.guild(interaction.guild).history_ephemeral()
|
|
or False
|
|
)
|
|
|
|
if inline is None:
|
|
inline = (
|
|
await config.user(interaction.user).history_inline()
|
|
or await config.guild(interaction.guild).history_inline()
|
|
or False
|
|
)
|
|
|
|
if pagesize is None:
|
|
if inline is True:
|
|
pagesize = (
|
|
await config.user(interaction.user).history_inline_pagesize()
|
|
or await config.guild(interaction.guild).history_inline_pagesize()
|
|
or 6
|
|
)
|
|
else:
|
|
pagesize = (
|
|
await config.user(interaction.user).history_pagesize()
|
|
or await config.guild(interaction.guild).history_pagesize()
|
|
or 5
|
|
)
|
|
|
|
if before and not on:
|
|
try:
|
|
before = parse(before)
|
|
except (ParserError, OverflowError) as e:
|
|
if e == ParserError:
|
|
await interaction.response.send_message(
|
|
content=error("Invalid date format for `before` parameter!"), ephemeral=True
|
|
)
|
|
return
|
|
if e == OverflowError:
|
|
await interaction.response.send_message(
|
|
content=error("Date is too far in the future!"), ephemeral=True
|
|
)
|
|
return
|
|
|
|
if after and not on:
|
|
try:
|
|
after = parse(after)
|
|
except (ParserError, OverflowError) as e:
|
|
if e == ParserError:
|
|
await interaction.response.send_message(
|
|
content=error("Invalid date format for `after` parameter!"), ephemeral=True
|
|
)
|
|
return
|
|
if e == OverflowError:
|
|
await interaction.response.send_message(
|
|
content=error("Date is too far in the future!"), ephemeral=True
|
|
)
|
|
return
|
|
|
|
if on:
|
|
try:
|
|
on = parse(on)
|
|
except (ParserError, OverflowError) as e:
|
|
if e == ParserError:
|
|
await interaction.response.send_message(
|
|
content=error("Invalid date format for `on` parameter!"), ephemeral=True
|
|
)
|
|
return
|
|
if e == OverflowError:
|
|
await interaction.response.send_message(
|
|
content=error("Date is too far in the future!"), ephemeral=True
|
|
)
|
|
return
|
|
|
|
before = datetime.combine(on, datetime.max.time())
|
|
after = datetime.combine(on, datetime.min.time())
|
|
|
|
await interaction.response.defer(ephemeral=ephemeral)
|
|
|
|
permissions = check_permissions(
|
|
interaction.client.user, ["embed_links"], interaction
|
|
)
|
|
if permissions:
|
|
await interaction.followup.send(
|
|
error(
|
|
f"I do not have the `{permissions}` permission, required for this action."
|
|
),
|
|
ephemeral=True,
|
|
)
|
|
return
|
|
|
|
if target:
|
|
filename = f"moderation_target_{str(target.id)}_{str(interaction.guild.id)}.json"
|
|
moderations = await Moderation.find_by_target(bot=interaction.client, guild_id=interaction.guild.id, target=target.id, before=before, after=after)
|
|
elif moderator:
|
|
filename = f"moderation_moderator_{str(moderator.id)}_{str(interaction.guild.id)}.json"
|
|
moderations = await Moderation.find_by_moderator(bot=interaction.client, guild_id=interaction.guild.id, moderator=moderator.id, before=before, after=after)
|
|
else:
|
|
filename = f"moderation_{str(interaction.guild.id)}.json"
|
|
moderations = await Moderation.get_latest(bot=interaction.client, guild_id=interaction.guild.id, before=before, after=after)
|
|
|
|
if export:
|
|
try:
|
|
filepath = (
|
|
str(data_manager.cog_data_path(cog_instance=self))
|
|
+ str(os.sep)
|
|
+ filename
|
|
)
|
|
|
|
with open(filepath, "w", encoding="utf-8") as f:
|
|
dump(obj=moderations, fp=f, indent=2)
|
|
|
|
await interaction.followup.send(
|
|
file=discord.File(
|
|
fp=filepath, filename=filename
|
|
),
|
|
ephemeral=ephemeral,
|
|
)
|
|
|
|
os.remove(filepath)
|
|
except json.JSONDecodeError as e:
|
|
await interaction.followup.send(
|
|
content=error(
|
|
"An error occured while exporting the moderation history.\nError:\n"
|
|
)
|
|
+ box(text=e, lang="py"),
|
|
ephemeral=ephemeral,
|
|
)
|
|
return
|
|
|
|
case_quantity = len(moderations)
|
|
page_quantity = ceil(case_quantity / pagesize)
|
|
start_index = (page - 1) * pagesize
|
|
end_index = page * pagesize
|
|
|
|
embed = discord.Embed(color=await self.bot.get_embed_color(interaction.channel))
|
|
embed.set_author(icon_url=interaction.guild.icon.url, name="Infraction History")
|
|
embed.set_footer(
|
|
text=f"Page {page:,}/{page_quantity:,} | {case_quantity:,} Results"
|
|
)
|
|
|
|
memory_dict = {}
|
|
|
|
for mod in moderations[start_index:end_index]:
|
|
if mod.target_id not in memory_dict:
|
|
memory_dict.update({
|
|
str(mod.target_id): await mod.get_target()
|
|
})
|
|
target = memory_dict[str(mod.target_id)]
|
|
|
|
if mod.moderator_id not in memory_dict:
|
|
memory_dict.update({
|
|
str(mod.moderator_id): await mod.get_moderator()
|
|
})
|
|
moderator = memory_dict[str(mod.moderator_id)]
|
|
|
|
field_name = f"Case #{mod.id:,} ({mod.type.string.title()})"
|
|
field_value = f"**Target:** `{target.name}` ({target.id})\n**Moderator:** `{moderator.name}` ({moderator.id})"
|
|
|
|
if len(str(mod.reason)) > 125:
|
|
field_value += f"\n**Reason:** `{str(mod.reason)[:125]}...`"
|
|
else:
|
|
field_value += f"\n**Reason:** `{str(mod.reason)}`"
|
|
|
|
if mod.duration:
|
|
duration_embed = (
|
|
f"{humanize_timedelta(timedelta=mod.duration)} | <t:{int(mod.end_timestamp.timestamp())}:R>"
|
|
if mod.expired is False
|
|
else f"{humanize_timedelta(timedelta=mod.duration)} | Expired"
|
|
)
|
|
field_value += f"\n**Duration:** {duration_embed}"
|
|
|
|
field_value += (
|
|
f"\n**Timestamp:** <t:{int(mod.timestamp.timestamp())}> | <t:{int(mod.timestamp.timestamp())}:R>"
|
|
)
|
|
|
|
if mod.role_id:
|
|
role = await mod.get_role()
|
|
field_value += f"\n**Role:** {role.mention} ({role.id})"
|
|
|
|
if mod.resolved:
|
|
field_value += "\n**Resolved:** True"
|
|
|
|
embed.add_field(name=field_name, value=field_value, inline=inline)
|
|
|
|
await interaction.followup.send(embed=embed, ephemeral=ephemeral)
|
|
|
|
@app_commands.command(name="resolve")
|
|
async def resolve(
|
|
self, interaction: discord.Interaction, case: int, reason: str | None = None
|
|
):
|
|
"""Resolve a specific case.
|
|
|
|
Parameters
|
|
-----------
|
|
case: int
|
|
Case number of the case you're trying to resolve
|
|
reason: str
|
|
Reason for resolving case"""
|
|
permissions = check_permissions(
|
|
interaction.client.user,
|
|
("embed_links", "moderate_members", "ban_members"),
|
|
interaction,
|
|
)
|
|
if permissions:
|
|
await interaction.response.send_message(
|
|
error(
|
|
f"I do not have the `{permissions}` permission, required for this action."
|
|
),
|
|
ephemeral=True,
|
|
)
|
|
return
|
|
|
|
try:
|
|
moderation = await Moderation.find_by_id(interaction.client, case, interaction.guild.id)
|
|
except ValueError:
|
|
await interaction.response.send_message(
|
|
content=error(f"Case #{case:,} does not exist!"), ephemeral=True
|
|
)
|
|
return
|
|
if len(moderation.changes) > 25:
|
|
await interaction.response.send_message(
|
|
content=error(
|
|
"Due to limitations with Discord's embed system, you cannot edit a case more than 25 times."
|
|
),
|
|
ephemeral=True,
|
|
)
|
|
return
|
|
|
|
try:
|
|
success, msg = await moderation.resolve(interaction.user.id, reason)
|
|
except (ValueError, TypeError) as e:
|
|
if e == ValueError:
|
|
await interaction.response.send_message(
|
|
content=error("This case has already been resolved!"), ephemeral=True
|
|
)
|
|
elif e == TypeError:
|
|
await interaction.response.send_message(
|
|
content=error("This case type cannot be resolved!"), ephemeral=True
|
|
)
|
|
|
|
embed = await case_factory(
|
|
interaction=interaction,
|
|
moderation=moderation,
|
|
)
|
|
await interaction.response.send_message(
|
|
content=f"✅ Moderation #{case:,} resolved!\n" + error(f"Resolve handler returned an error message: `{msg}`") if success is False else "", embed=embed
|
|
)
|
|
ctx = await self.bot.get_context(interaction, cls=commands.Context)
|
|
await log(ctx=ctx, moderation_id=case, resolved=True)
|
|
|
|
@app_commands.command(name="case")
|
|
@app_commands.choices(
|
|
raw=[
|
|
Choice(name="Export as Codeblock", value="codeblock"),
|
|
Choice(name="Export as File", value="file"),
|
|
]
|
|
)
|
|
async def case(
|
|
self,
|
|
interaction: discord.Interaction,
|
|
case: int,
|
|
ephemeral: bool | None = None,
|
|
evidenceformat: bool = False,
|
|
changes: bool = False,
|
|
raw: Choice[str] | None = None,
|
|
):
|
|
"""Check the details of a specific case.
|
|
|
|
Parameters
|
|
-----------
|
|
case: int
|
|
What case are you looking up?
|
|
ephemeral: bool
|
|
Hide the command response
|
|
evidenceformat: bool
|
|
Display the evidence format of the case
|
|
changes: bool
|
|
List the changes made to the case
|
|
raw: bool
|
|
Export the case to a JSON file or codeblock"""
|
|
permissions = check_permissions(
|
|
interaction.client.user, ["embed_links"], interaction
|
|
)
|
|
if permissions:
|
|
await interaction.response.send_message(
|
|
error(
|
|
f"I do not have the `{permissions}` permission, required for this action."
|
|
),
|
|
ephemeral=True,
|
|
)
|
|
return
|
|
|
|
if ephemeral is None:
|
|
ephemeral = (
|
|
await config.user(interaction.user).history_ephemeral()
|
|
or await config.guild(interaction.guild).history_ephemeral()
|
|
or False
|
|
)
|
|
|
|
try:
|
|
mod = await Moderation.find_by_id(interaction.client, case, interaction.guild.id)
|
|
except ValueError:
|
|
await interaction.response.send_message(
|
|
content=error(f"Case #{case:,} does not exist!"), ephemeral=True
|
|
)
|
|
return
|
|
|
|
if raw:
|
|
if raw.value == "file" or len(mod.to_json(2)) > 1800:
|
|
filename = (
|
|
str(data_manager.cog_data_path(cog_instance=self))
|
|
+ str(os.sep)
|
|
+ f"moderation_{interaction.guild.id}_case_{case}.json"
|
|
)
|
|
|
|
with open(filename, "w", encoding="utf-8") as f:
|
|
mod.to_json(2, f)
|
|
if raw.value == "codeblock":
|
|
content = f"Case #{case:,} exported.\n" + warning(
|
|
"Case was too large to export as codeblock, so it has been uploaded as a `.json` file."
|
|
)
|
|
else:
|
|
content = f"Case #{case:,} exported."
|
|
|
|
await interaction.response.send_message(
|
|
content=content,
|
|
file=discord.File(
|
|
filename,
|
|
f"moderation_{interaction.guild.id}_case_{case}.json",
|
|
),
|
|
ephemeral=ephemeral,
|
|
)
|
|
|
|
os.remove(filename)
|
|
return
|
|
await interaction.response.send_message(
|
|
content=box(mod.to_json(2), 'json'),
|
|
ephemeral=ephemeral,
|
|
)
|
|
return
|
|
if changes:
|
|
embed = await changes_factory(
|
|
interaction=interaction, moderation=mod
|
|
)
|
|
await interaction.response.send_message(
|
|
embed=embed, ephemeral=ephemeral
|
|
)
|
|
elif evidenceformat:
|
|
content = await evidenceformat_factory(moderation=mod)
|
|
await interaction.response.send_message(
|
|
content=content, ephemeral=ephemeral
|
|
)
|
|
else:
|
|
embed = await case_factory(
|
|
interaction=interaction, moderation=mod
|
|
)
|
|
await interaction.response.send_message(
|
|
embed=embed, ephemeral=ephemeral
|
|
)
|
|
return
|
|
|
|
@app_commands.command(name="edit")
|
|
async def edit(
|
|
self,
|
|
interaction: discord.Interaction,
|
|
case: int,
|
|
reason: str,
|
|
duration: str | None = None,
|
|
):
|
|
"""Edit the reason of a specific case.
|
|
|
|
Parameters
|
|
-----------
|
|
case: int
|
|
What case are you editing?
|
|
reason: str
|
|
What is the new reason?
|
|
duration: str
|
|
What is the new duration? Does not reapply the moderation if it has already expired.
|
|
"""
|
|
permissions = check_permissions(
|
|
interaction.client.user, ["embed_links"], interaction
|
|
)
|
|
if permissions:
|
|
await interaction.response.send_message(
|
|
error(
|
|
f"I do not have the `{permissions}` permission, required for this action."
|
|
),
|
|
ephemeral=True,
|
|
)
|
|
return
|
|
|
|
try:
|
|
moderation = await Moderation.find_by_id(interaction.client, case, interaction.guild.id)
|
|
old_moderation = moderation
|
|
except ValueError:
|
|
await interaction.response.send_message(
|
|
content=error(f"Case #{case:,} does not exist!"), ephemeral=True
|
|
)
|
|
return
|
|
|
|
if len(moderation.changes) > 25:
|
|
return await interaction.response.send_message(
|
|
content=error(
|
|
"Due to limitations with Discord's embed system, you cannot edit a case more than 25 times."
|
|
),
|
|
ephemeral=True,
|
|
)
|
|
|
|
if duration:
|
|
moderation.duration = parse_timedelta(duration)
|
|
if moderation.duration is None:
|
|
return await interaction.response.send_message(
|
|
error("Please provide a valid duration!"), ephemeral=True
|
|
)
|
|
|
|
moderation.end_timestamp = moderation.timestamp + moderation.duration.total_seconds()
|
|
|
|
try:
|
|
success = await moderation.type.duration_edit_handler(interaction=interaction.client, old_moderation=old_moderation, new_moderation=moderation)
|
|
except NotImplementedError:
|
|
return await interaction.response.send_message(
|
|
error("This case type does not support duration editing!"), ephemeral=True
|
|
)
|
|
if not success:
|
|
return
|
|
|
|
if reason:
|
|
moderation.reason = reason
|
|
|
|
if not moderation.changes:
|
|
moderation.changes.append(Change.from_dict(interaction.client, {
|
|
"type": "ORIGINAL",
|
|
"timestamp": old_moderation.timestamp,
|
|
"reason": old_moderation.reason,
|
|
"user_id": old_moderation.moderator_id,
|
|
"duration": old_moderation.duration,
|
|
"end_timestamp": old_moderation.end_timestamp,
|
|
}))
|
|
if duration:
|
|
moderation.changes.append(Change.from_dict(interaction.client, {
|
|
"type": "EDIT",
|
|
"timestamp": int(time.time()),
|
|
"reason": reason,
|
|
"user_id": interaction.user.id,
|
|
"duration": moderation.duration,
|
|
"end_timestamp": moderation.end_timestamp,
|
|
}))
|
|
else:
|
|
moderation.changes.append(Change.from_dict(interaction.client, {
|
|
"type": "EDIT",
|
|
"timestamp": int(time.time()),
|
|
"reason": reason,
|
|
"user_id": interaction.user.id,
|
|
"duration": moderation.duration,
|
|
"end_timestamp": moderation.end_timestamp,
|
|
}))
|
|
|
|
await moderation.update()
|
|
embed = await case_factory(interaction=interaction, moderation=moderation)
|
|
|
|
await interaction.response.send_message(
|
|
content=f"✅ Moderation #{case:,} edited!",
|
|
embed=embed,
|
|
ephemeral=True,
|
|
)
|
|
await log(interaction, case)
|
|
|
|
return
|
|
|
|
@tasks.loop(minutes=1)
|
|
async def handle_expiry(self):
|
|
await self.bot.wait_until_red_ready()
|
|
current_time = time.time()
|
|
global_unban_num = 0
|
|
global_addrole_num = 0
|
|
global_removerole_num = 0
|
|
global_other_num = 0
|
|
|
|
guilds: list[discord.Guild] = self.bot.guilds
|
|
for guild in guilds:
|
|
if not await self.bot.cog_disabled_in_guild(self, guild):
|
|
time_per_guild = time.time()
|
|
|
|
query = f"SELECT * FROM moderation_{guild.id} WHERE end_timestamp IS NOT NULL AND end_timestamp <= ? AND expired = 0"
|
|
moderations = await Moderation.execute(bot=self.bot, guild_id=guild.id, query=query, parameters=(time.time(),))
|
|
|
|
unban_num = 0
|
|
removerole_num = 0
|
|
addrole_num = 0
|
|
other_num = 0
|
|
for moderation in moderations:
|
|
try:
|
|
num = await moderation.type.expiry_handler(self.bot, guild, moderation)
|
|
except NotImplementedError:
|
|
logger.warning("Expiry handler not implemented for expirable moderation type %s", moderation.type.key)
|
|
continue
|
|
match moderation.type.key:
|
|
case "tempban":
|
|
unban_num += num
|
|
case "addrole":
|
|
removerole_num += num
|
|
case "removerole":
|
|
addrole_num += num
|
|
case _:
|
|
other_num += num if isinstance(num, int) else 0
|
|
|
|
expiry_query = f"UPDATE `moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp IS NOT NULL AND end_timestamp <= ? AND expired = 0) OR (expired = 0 AND resolved = 1);"
|
|
await Moderation.execute(bot=self.bot, guild_id=guild.id, query=expiry_query, parameters=(time.time(),), return_obj=False)
|
|
|
|
per_guild_completion_time = (time.time() - time_per_guild) * 1000
|
|
logger.debug(
|
|
"Completed expiry loop for %s (%s) in %sms with %s users unbanned, %s roles added, and %s roles removed (%s other cases expired)",
|
|
guild.name,
|
|
guild.id,
|
|
f"{per_guild_completion_time:.6f}",
|
|
unban_num,
|
|
addrole_num,
|
|
removerole_num,
|
|
other_num
|
|
)
|
|
global_unban_num = global_unban_num + unban_num
|
|
global_addrole_num = global_addrole_num + addrole_num
|
|
global_removerole_num = global_removerole_num + removerole_num
|
|
global_other_num = global_other_num + other_num
|
|
|
|
|
|
completion_time = (time.time() - current_time) * 1000
|
|
logger.debug(
|
|
"Completed expiry loop in %sms with %s users unbanned, %s roles added, and %s roles removed (%s other cases expired)",
|
|
f"{completion_time:.6f}",
|
|
global_unban_num,
|
|
global_addrole_num,
|
|
global_removerole_num,
|
|
global_other_num
|
|
)
|
|
|
|
########################################################################################################################
|
|
### Configuration Commands #
|
|
########################################################################################################################
|
|
|
|
@commands.group(autohelp=True, aliases=["moderation", "mod"])
|
|
async def aurora(self, ctx: commands.Context):
|
|
"""Settings and miscellaneous commands for Aurora."""
|
|
|
|
@aurora.group(autohelp=True, name="settings", aliases=["config", "options", "set"])
|
|
async def aurora_settings(self, ctx: commands.Context):
|
|
"""Configure Aurora's settings."""
|
|
|
|
@aurora_settings.command(name="overrides", aliases=["override", "user"])
|
|
async def aurora_settings_overrides(self, ctx: commands.Context):
|
|
"""Manage Aurora's user overriddable settings."""
|
|
msg = await ctx.send(embed=await overrides_embed(ctx))
|
|
await msg.edit(view=Overrides(ctx, msg, 60))
|
|
|
|
@aurora_settings.command(name="guild", aliases=["server"])
|
|
@commands.admin_or_permissions(manage_guild=True)
|
|
@commands.guild_only()
|
|
async def aurora_settings_guild(self, ctx: commands.Context):
|
|
"""Manage Aurora's guild settings."""
|
|
msg = await ctx.send(embed=await guild_embed(ctx))
|
|
await msg.edit(view=Guild(ctx, msg, 60))
|
|
|
|
@aurora_settings.command(name="type")
|
|
@commands.admin_or_permissions(manage_guild=True)
|
|
@commands.guild_only()
|
|
async def aurora_settings_type(self, ctx: commands.Context, moderation_type: str):
|
|
"""Manage configuration options for specific moderation types.
|
|
|
|
See [the documentation](https://seacogs.coastalcommits.com/Aurora/Types) for a list of built-in moderation types."""
|
|
registered_type = type_registry.get(moderation_type)
|
|
if not registered_type:
|
|
types = "`, `".join(type_registry.keys())
|
|
await ctx.send(error("`moderation_type` is not a valid moderation type.\nValid types are:\n" + types))
|
|
return
|
|
msg = await ctx.send(embed=await type_embed(ctx, registered_type))
|
|
await msg.edit(view=Types(ctx, msg, registered_type))
|
|
|
|
@aurora_settings.command(name="addrole", aliases=["removerole"])
|
|
@commands.admin_or_permissions(manage_guild=True)
|
|
@commands.guild_only()
|
|
async def aurora_settings_addrole(self, ctx: commands.Context):
|
|
"""Manage the addrole whitelist.
|
|
|
|
Roles added to this list are also applied to `/removerole`."""
|
|
msg = await ctx.send(embed=await addrole_embed(ctx))
|
|
await msg.edit(view=Addrole(ctx, msg, 60))
|
|
|
|
@aurora_settings.command(name="immunity")
|
|
@commands.admin_or_permissions(manage_guild=True)
|
|
@commands.guild_only()
|
|
async def aurora_settings_immunity(self, ctx: commands.Context):
|
|
"""Manage the immunity whitelist."""
|
|
msg = await ctx.send(embed=await immune_embed(ctx))
|
|
await msg.edit(view=Immune(ctx, msg, 60))
|
|
|
|
@aurora.group(autohelp=True, name="import")
|
|
@commands.admin()
|
|
@commands.guild_only()
|
|
async def aurora_import(self, ctx: commands.Context):
|
|
"""Import moderation history from other bots."""
|
|
|
|
@aurora_import.command(name="aurora")
|
|
@commands.admin()
|
|
async def aurora_import_aurora(self, ctx: commands.Context):
|
|
"""Import moderation history from another bot using Aurora."""
|
|
if (
|
|
ctx.message.attachments
|
|
and ctx.message.attachments[0].content_type
|
|
== "application/json; charset=utf-8"
|
|
):
|
|
message = await ctx.send(
|
|
warning(
|
|
"Are you sure you want to import moderations from another bot?\n**This will overwrite any moderations that already exist in this guild's moderation table.**\n*The import process will block the rest of your bot until it is complete.*"
|
|
)
|
|
)
|
|
await message.edit(view=ImportAuroraView(60, ctx, message))
|
|
else:
|
|
await ctx.send(error("Please provide a valid Aurora export file."))
|
|
|
|
@aurora_import.command(name="galacticbot")
|
|
@commands.admin()
|
|
async def aurora_import_galacticbot(self, ctx: commands.Context):
|
|
"""Import moderation history from GalacticBot."""
|
|
if (
|
|
ctx.message.attachments
|
|
and ctx.message.attachments[0].content_type
|
|
== "application/json; charset=utf-8"
|
|
):
|
|
message = await ctx.send(
|
|
warning(
|
|
"Are you sure you want to import GalacticBot moderations?\n**This will overwrite any moderations that already exist in this guild's moderation table.**\n*The import process will block the rest of your bot until it is complete.*"
|
|
)
|
|
)
|
|
await message.edit(view=ImportGalacticBotView(60, ctx, message))
|
|
else:
|
|
await ctx.send(
|
|
error("Please provide a valid GalacticBot moderation export file.")
|
|
)
|
|
|
|
@aurora.command(aliases=["tdc", "td", "timedeltaconvert"])
|
|
async def timedelta(self, ctx: commands.Context, *, duration: str) -> None:
|
|
"""Convert a string to a timedelta.
|
|
|
|
This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object.
|
|
You cannot convert years or months as they are not fixed units. Use `[p]aurora relativedelta` for that.
|
|
|
|
**Example usage**
|
|
`[p]aurora timedelta 1 day 15hr 82 minutes 52s`
|
|
**Output**
|
|
`1 day, 16:22:52`"""
|
|
parsed_time = parse_timedelta(duration)
|
|
if parsed_time is None:
|
|
await ctx.send(error("Please provide a convertible value!"))
|
|
return
|
|
await ctx.send(f"`{parsed_time}`")
|
|
|
|
@aurora.command(aliases=["rdc", "rd", "relativedeltaconvert"])
|
|
async def relativedelta(self, ctx: commands.Context, *, duration: str) -> None:
|
|
"""Convert a string to a relativedelta.
|
|
|
|
This command converts a duration to a [`relativedelta`](https://dateutil.readthedocs.io/en/stable/relativedelta.html) Python object.
|
|
|
|
**Example usage**
|
|
`[p]aurora relativedelta 3 years 1 day 15hr 82 minutes 52s`
|
|
**Output**
|
|
`relativedelta(years=+3, days=+1, hours=+15, minutes=+82, seconds=+52)`"""
|
|
parsed_time = parse_relativedelta(duration)
|
|
if parsed_time is None:
|
|
await ctx.send(error("Please provide a convertible value!"))
|
|
return
|
|
await ctx.send(f"`{parsed_time}`")
|