SeaCogs/aurora/aurora.py
2024-01-15 14:06:58 +00:00

1034 lines
48 KiB
Python

# _____ _
# / ____| (_)
# | (___ ___ __ _ _____ ___ _ __ ___ _ __ ___ ___ _ __
# \___ \ / _ \/ _` / __\ \ /\ / / | '_ ` _ \| '_ ` _ \ / _ \ '__|
# ____) | __/ (_| \__ \\ V V /| | | | | | | | | | | | __/ |
# |_____/ \___|\__,_|___/ \_/\_/ |_|_| |_| |_|_| |_| |_|\___|_|
import json
import os
import time
import sqlite3
from datetime import datetime, timedelta, timezone
from math import ceil
import discord
import humanize
from discord.ext import tasks
from pytimeparse2 import disable_dateutil, parse
from redbot.core import app_commands, commands, data_manager
from redbot.core.app_commands import Choice
from redbot.core.bot import Red
from redbot.core.utils.chat_formatting import box, error, warning
from .abc import CompositeMetaClass
from .configuration.commands import Configuration
from .utilities.config import config, register_config
from .utilities.database import connect, create_guild_table, fetch_case, mysql_log
from .utilities.factory import case_factory, changes_factory, evidenceformat_factory, message_factory
from .utilities.logger import logger
from .utilities.utils import convert_timedelta_to_str, check_moddable, check_permissions, fetch_channel_dict, fetch_user_dict, generate_dict, log, send_evidenceformat
class Aurora(Configuration, commands.Cog, metaclass=CompositeMetaClass): # pylint: disable=too-many-ancestors
"""Aurora is a fully-featured moderation system.
It is heavily inspired by GalacticBot, and is designed to be a more user-friendly alternative to Red's core Mod cogs.
This cog stores all of its data in an SQLite database."""
__author__ = "SeaswimmerTheFsh"
__version__ = "2.0.2"
async def red_delete_data_for_user(self, *, requester, user_id: int):
    """Delete the data this cog stores about a user.

    Every recognised requester clears the user's config entry. For
    ``discord_deleted_user`` the moderation tables are additionally purged of
    every row in which the user is either the target or the moderator.

    Parameters
    ----------
    requester:
        One of ``discord_deleted_user``, ``owner``, ``user`` or ``user_strict``.
        Any other value is logged as a warning and nothing is deleted.
    user_id: int
        ID of the user whose data should be removed.
    """
    if requester not in ("discord_deleted_user", "owner", "user", "user_strict"):
        logger.warning("Invalid requester passed to red_delete_data_for_user: %s", requester)
        return

    await config.user_from_id(user_id).clear()

    if requester != "discord_deleted_user":
        return

    database = connect()
    try:
        cursor = database.cursor()
        try:
            # SQLite has no `SHOW TABLES`; table names are listed in sqlite_master.
            cursor.execute("SELECT name FROM sqlite_master WHERE type = 'table';")
            # Only the per-guild `moderation_<guild id>` tables are touched.
            # NOTE(review): assumes every such table has target_id / moderator_id columns.
            tables = [row[0] for row in cursor.fetchall() if row[0].startswith("moderation_")]

            for table in tables:
                # sqlite3 uses `?` placeholders (qmark style), not `%s`.
                cursor.execute(
                    f"DELETE FROM `{table}` WHERE target_id = ? OR moderator_id = ?;",
                    (user_id, user_id),
                )

            database.commit()
        finally:
            cursor.close()
    finally:
        database.close()
def __init__(self, bot: Red):
    """Store the bot instance, run the module-level setup calls and start `handle_expiry`."""
    super().__init__()
    self.bot = bot

    # Module-level setup; the two calls do not depend on each other.
    disable_dateutil()
    register_config(config)

    self.handle_expiry.start()  # pylint: disable=no-member
async def cog_load(self):
    """This method prepares the database schema for all of the guilds the bot is currently in."""
    try:
        for guild in self.bot.guilds:
            # Guilds where the cog is disabled get no table.
            if await self.bot.cog_disabled_in_guild(self, guild):
                continue
            await create_guild_table(guild)
    except ConnectionRefusedError:
        # A refused connection aborts the remaining guilds without raising.
        return
async def cog_unload(self):
    """Cancel `handle_expiry` (started in `__init__`) when the cog is unloaded."""
    self.handle_expiry.cancel()  # pylint: disable=no-member
@commands.Cog.listener('on_guild_join')
async def db_generate_guild_join(self, guild: discord.Guild):
    """This method prepares the database schema whenever the bot joins a guild."""
    if await self.bot.cog_disabled_in_guild(self, guild):
        return

    try:
        await create_guild_table(guild)
    except ConnectionRefusedError:
        # A refused connection is ignored; nothing is created for this guild.
        pass
@commands.Cog.listener('on_audit_log_entry_create')
async def autologger(self, entry: discord.AuditLogEntry):
    """This method automatically logs moderations done by users manually ("right clicks").

    Only kick, ban, unban and timeout changes are logged; every other audit
    log entry is ignored. Depending on the guild's `ignore_other_bots`
    setting, entries involving any bot, or only entries created by this bot,
    are skipped.
    """
    if await self.bot.cog_disabled_in_guild(self, entry.guild):
        return

    # Classify the entry first so unrelated audit log entries return before
    # any attribute that may not exist on them is touched.
    duration = "NULL"
    if entry.action == discord.AuditLogAction.kick:
        moderation_type = 'KICK'
    elif entry.action == discord.AuditLogAction.ban:
        moderation_type = 'BAN'
    elif entry.action == discord.AuditLogAction.unban:
        moderation_type = 'UNBAN'
    elif entry.action == discord.AuditLogAction.member_update:
        # The diff only carries attributes that actually changed; a member
        # update without a timeout change (e.g. a nickname edit) is not a moderation.
        if not hasattr(entry.after, 'timed_out_until'):
            return
        if entry.after.timed_out_until is not None:
            timed_out_until_aware = entry.after.timed_out_until.replace(tzinfo=timezone.utc)
            remaining = timed_out_until_aware - datetime.now(tz=timezone.utc)
            # Round to whole minutes.
            duration = timedelta(minutes=round(remaining.total_seconds() / 60))
            moderation_type = 'MUTE'
        else:
            moderation_type = 'UNMUTE'
    else:
        return

    if await config.guild(entry.guild).ignore_other_bots() is True:
        # The target may be an object without a `bot` attribute; treat that as "not a bot".
        if getattr(entry.user, 'bot', False) or getattr(entry.target, 'bot', False):
            return
    elif entry.user.id == self.bot.user.id:
        return

    if entry.reason:
        reason = entry.reason + " (This action was performed without the bot.)"
    else:
        reason = "This action was performed without the bot."

    await mysql_log(entry.guild.id, entry.user.id, moderation_type, 'USER', entry.target.id, 0, duration, reason)
#######################################################################################################################
### COMMANDS
#######################################################################################################################
@app_commands.command(name="note")
async def note(self, interaction: discord.Interaction, target: discord.User, reason: str, silent: bool = None):
    """Add a note to a user.

    Parameters
    -----------
    target: discord.User
        Who are you noting?
    reason: str
        Why are you noting this user?
    silent: bool
        Should the user be messaged?"""
    # The docstring above is read by discord.py for the slash command and
    # parameter descriptions, so its wording is user-facing.
    if not await check_moddable(target, interaction, ['moderate_members']):
        return

    await interaction.response.send_message(content=f"{target.mention} has received a note!\n**Reason** - `{reason}`")

    # When the caller did not choose, fall back to the guild's `dm_users` setting.
    if silent is None:
        silent = not await config.guild(interaction.guild).dm_users()
    if silent is False:
        try:
            embed = await message_factory(await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type='note', response=await interaction.original_response())
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Best-effort DM: a failed send must not abort the moderation.
            pass

    moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'NOTE', 'USER', target.id, 0, 'NULL', reason)
    # Now that the case exists, add its number to the public response.
    await interaction.edit_original_response(content=f"{target.mention} has received a note! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
    await log(interaction, moderation_id)

    case = await fetch_case(moderation_id, interaction.guild.id)
    await send_evidenceformat(interaction, case)
@app_commands.command(name="warn")
async def warn(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
    """Warn a user.

    Parameters
    -----------
    target: discord.Member
        Who are you warning?
    reason: str
        Why are you warning this user?
    silent: bool
        Should the user be messaged?"""
    moddable = await check_moddable(target, interaction, ['moderate_members'])
    if not moddable:
        return

    await interaction.response.send_message(content=f"{target.mention} has been warned!\n**Reason** - `{reason}`")

    # No explicit choice: defer to the guild's `dm_users` setting.
    if silent is None:
        silent = not await config.guild(interaction.guild).dm_users()

    if not silent:
        try:
            embed_color = await self.bot.get_embed_color(interaction.channel)
            original = await interaction.original_response()
            dm_embed = await message_factory(
                embed_color,
                guild=interaction.guild,
                moderator=interaction.user,
                reason=reason,
                moderation_type='warned',
                response=original,
            )
            await target.send(embed=dm_embed)
        except discord.errors.HTTPException:
            # DM delivery is best-effort.
            pass

    case_id = await mysql_log(interaction.guild.id, interaction.user.id, 'WARN', 'USER', target.id, 0, 'NULL', reason)
    await interaction.edit_original_response(content=f"{target.mention} has been warned! (Case `#{case_id:,}`)\n**Reason** - `{reason}`")
    await log(interaction, case_id)

    case_data = await fetch_case(case_id, interaction.guild.id)
    await send_evidenceformat(interaction, case_data)
@app_commands.command(name="blacklist")
async def blacklist(self, interaction: discord.Interaction, target: discord.Member, role: str, reason: str, duration: str = None, silent: bool = None):
    """Add a blacklist role to a user.

    Parameters
    -----------
    target: discord.Member
        Who are you blacklisting?
    role: str
        What blacklist role are you applying to the target?
    reason: str
        Why are you blacklisting this user?
    duration: str
        How long are you blacklisting this user for?
    silent: bool
        Should the user be messaged?"""
    blacklist_roles = await config.guild(interaction.guild).blacklist_roles()

    if not blacklist_roles:
        await interaction.response.send_message(content=error("There are no blacklist roles set for this server!"), ephemeral=True)
        return

    # `role` arrives as a string while stored IDs may be integers, so compare string forms.
    matching_role = next((role_dict for role_dict in blacklist_roles if str(role_dict['id']) == str(role)), None)

    # add_roles() needs a role object, not the raw ID string.
    role_obj = None
    if matching_role:
        try:
            role_obj = interaction.guild.get_role(int(matching_role['id']))
        except (TypeError, ValueError):
            role_obj = None

    if role_obj is None:
        await interaction.response.send_message(content=error("Please provide a valid blacklist role!"), ephemeral=True)
        return

    if not await check_moddable(target, interaction, ['moderate_members', 'manage_roles']):
        return

    if role_obj in target.roles:
        await interaction.response.send_message(content=error(f"{target.mention} already has the blacklist role!"), ephemeral=True)
        return

    # An explicit duration overrides the one configured for the role.
    if duration:
        try:
            blacklist_duration = parse(sval=duration, as_timedelta=True, raise_exception=True)
        except ValueError:
            await interaction.response.send_message(error("Please provide a valid duration!"), ephemeral=True)
            return
    else:
        # NOTE(review): the stored value's type is not visible here; it is passed on
        # unchanged to precisedelta() and mysql_log() - confirm both accept it.
        blacklist_duration = matching_role['duration']

    await target.add_roles(role_obj, reason=f"Blacklisted by {interaction.user.id} for {humanize.precisedelta(blacklist_duration)} for: {reason}")

    # Respond before building the DM: original_response() only exists after a response was sent.
    await interaction.response.send_message(content=f"{target.mention} has been blacklisted with the {role_obj.name} role for {humanize.precisedelta(blacklist_duration)}!\n**Reason** - `{reason}`")

    if silent is None:
        silent = not await config.guild(interaction.guild).dm_users()
    if silent is False:
        try:
            embed = await message_factory(await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type='blacklisted', response=await interaction.original_response(), duration=blacklist_duration)
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Best-effort DM.
            pass

    moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'BLACKLIST', 'USER', target.id, role_obj.id, blacklist_duration, reason)
    await interaction.edit_original_response(content=f"{target.mention} has been blacklisted with the {role_obj.name} role for {humanize.precisedelta(blacklist_duration)}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
    await log(interaction, moderation_id)

    case = await fetch_case(moderation_id, interaction.guild.id)
    await send_evidenceformat(interaction, case)
@app_commands.command(name="mute")
async def mute(self, interaction: discord.Interaction, target: discord.Member, duration: str, reason: str, silent: bool = None):
    """Mute a user.

    Parameters
    -----------
    target: discord.Member
        Who are you muting?
    duration: str
        How long are you muting this user for?
    reason: str
        Why are you muting this user?
    silent: bool
        Should the user be messaged?"""
    if not await check_moddable(target, interaction, ['moderate_members']):
        return

    if target.is_timed_out() is True:
        await interaction.response.send_message(error(f"{target.mention} is already muted!"), allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
        return

    try:
        parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
    except ValueError:
        await interaction.response.send_message(error("Please provide a valid duration!"), ephemeral=True)
        return

    # 28 days expressed in seconds (28 * 24 * 60 * 60).
    if parsed_time.total_seconds() > 2419200:
        await interaction.response.send_message(error("Please provide a duration that is less than 28 days."), ephemeral=True)
        return

    await target.timeout(parsed_time, reason=f"Muted by {interaction.user.id} for: {reason}")

    await interaction.response.send_message(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}!\n**Reason** - `{reason}`")

    # When the caller did not choose, fall back to the guild's `dm_users` setting.
    if silent is None:
        silent = not await config.guild(interaction.guild).dm_users()
    if silent is False:
        try:
            embed = await message_factory(await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type='muted', response=await interaction.original_response(), duration=parsed_time)
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Best-effort DM.
            pass

    moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'MUTE', 'USER', target.id, 0, parsed_time, reason)
    await interaction.edit_original_response(content=f"{target.mention} has been muted for {humanize.precisedelta(parsed_time)}! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
    await log(interaction, moderation_id)

    case = await fetch_case(moderation_id, interaction.guild.id)
    await send_evidenceformat(interaction, case)
@app_commands.command(name="unmute")
async def unmute(self, interaction: discord.Interaction, target: discord.Member, reason: str = None, silent: bool = None):
    """Unmute a user.

    Parameters
    -----------
    target: discord.user
        Who are you unmuting?
    reason: str
        Why are you unmuting this user?
    silent: bool
        Should the user be messaged?"""
    if not await check_moddable(target, interaction, ['moderate_members']):
        return

    if target.is_timed_out() is False:
        await interaction.response.send_message(error(f"{target.mention} is not muted!"), allowed_mentions=discord.AllowedMentions(users=False), ephemeral=True)
        return

    # The audit-log reason only carries a "for: ..." suffix when a reason was given.
    audit_reason = f"Unmuted by {interaction.user.id}"
    if reason:
        audit_reason += f" for: {reason}"
    else:
        reason = "No reason given."

    # Passing None to timeout() clears the member's timeout.
    await target.timeout(None, reason=audit_reason)

    await interaction.response.send_message(content=f"{target.mention} has been unmuted!\n**Reason** - `{reason}`")

    if silent is None:
        silent = not await config.guild(interaction.guild).dm_users()
    if silent is False:
        try:
            embed = await message_factory(await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type='unmuted', response=await interaction.original_response())
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Best-effort DM.
            pass

    moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, 'UNMUTE', 'USER', target.id, 0, 'NULL', reason)
    await interaction.edit_original_response(content=f"{target.mention} has been unmuted! (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
    await log(interaction, moderation_id)

    case = await fetch_case(moderation_id, interaction.guild.id)
    await send_evidenceformat(interaction, case)
@app_commands.command(name="kick")
async def kick(self, interaction: discord.Interaction, target: discord.Member, reason: str, silent: bool = None):
    """Kick a user.

    Parameters
    -----------
    target: discord.user
        Who are you kicking?
    reason: str
        Why are you kicking this user?
    silent: bool
        Should the user be messaged?"""
    moddable = await check_moddable(target, interaction, ['kick_members'])
    if not moddable:
        return

    await interaction.response.send_message(content=f"{target.mention} has been kicked!\n**Reason** - `{reason}`")

    # No explicit choice: defer to the guild's `dm_users` setting.
    if silent is None:
        silent = not await config.guild(interaction.guild).dm_users()

    # The DM is attempted before the kick itself.
    if not silent:
        try:
            embed_color = await self.bot.get_embed_color(interaction.channel)
            original = await interaction.original_response()
            dm_embed = await message_factory(
                embed_color,
                guild=interaction.guild,
                moderator=interaction.user,
                reason=reason,
                moderation_type='kicked',
                response=original,
            )
            await target.send(embed=dm_embed)
        except discord.errors.HTTPException:
            # DM delivery is best-effort.
            pass

    await target.kick(reason=f"Kicked by {interaction.user.id} for: {reason}")

    case_id = await mysql_log(interaction.guild.id, interaction.user.id, 'KICK', 'USER', target.id, 0, 'NULL', reason)
    await interaction.edit_original_response(content=f"{target.mention} has been kicked! (Case `#{case_id:,}`)\n**Reason** - `{reason}`")
    await log(interaction, case_id)

    case_data = await fetch_case(case_id, interaction.guild.id)
    await send_evidenceformat(interaction, case_data)
@app_commands.command(name="ban")
@app_commands.choices(delete_messages=[
    Choice(name="None", value=0),
    Choice(name='1 Hour', value=3600),
    Choice(name='12 Hours', value=43200),
    Choice(name='1 Day', value=86400),
    Choice(name='3 Days', value=259200),
    Choice(name='7 Days', value=604800),
])
async def ban(self, interaction: discord.Interaction, target: discord.User, reason: str, duration: str = None, delete_messages: Choice[int] = None, silent: bool = None):
    """Ban a user.

    Parameters
    -----------
    target: discord.user
        Who are you banning?
    duration: str
        How long are you banning this user for?
    reason: str
        Why are you banning this user?
    delete_messages: Choices[int]
        How many days of messages to delete?
    silent: bool
        Should the user be messaged?"""
    if not await check_moddable(target, interaction, ['ban_members']):
        return

    delete_messages_seconds = 0 if delete_messages is None else delete_messages.value

    # Keep only fetch_ban() inside the try so a NotFound raised while
    # responding is not mistaken for "user is not banned".
    try:
        await interaction.guild.fetch_ban(target)
    except discord.errors.NotFound:
        pass
    else:
        await interaction.response.send_message(content=error(f"{target.mention} is already banned!"), ephemeral=True)
        return

    # A duration turns the ban into a temporary ban; both variants share the
    # flow below and differ only in wording and in what gets logged.
    if duration:
        try:
            parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
        except ValueError:
            await interaction.response.send_message(error("Please provide a valid duration!"), ephemeral=True)
            return

        outcome = f"{target.mention} has been banned for {humanize.precisedelta(parsed_time)}!"
        dm_type = 'tempbanned'
        dm_kwargs = {'duration': parsed_time}
        audit_reason = f"Tempbanned by {interaction.user.id} for: {reason} (Duration: {parsed_time})"
        log_type = 'TEMPBAN'
        log_duration = parsed_time
    else:
        outcome = f"{target.mention} has been banned!"
        dm_type = 'banned'
        dm_kwargs = {}
        audit_reason = f"Banned by {interaction.user.id} for: {reason}"
        log_type = 'BAN'
        log_duration = 'NULL'

    await interaction.response.send_message(content=f"{outcome}\n**Reason** - `{reason}`")

    # `silent` applies to temporary bans as well as permanent ones.
    if silent is None:
        silent = not await config.guild(interaction.guild).dm_users()
    if silent is False:
        try:
            embed = await message_factory(await self.bot.get_embed_color(interaction.channel), guild=interaction.guild, moderator=interaction.user, reason=reason, moderation_type=dm_type, response=await interaction.original_response(), **dm_kwargs)
            await target.send(embed=embed)
        except discord.errors.HTTPException:
            # Best-effort DM, attempted before the ban itself.
            pass

    await interaction.guild.ban(target, reason=audit_reason, delete_message_seconds=delete_messages_seconds)

    moderation_id = await mysql_log(interaction.guild.id, interaction.user.id, log_type, 'USER', target.id, 0, log_duration, reason)
    await interaction.edit_original_response(content=f"{outcome} (Case `#{moderation_id:,}`)\n**Reason** - `{reason}`")
    await log(interaction, moderation_id)

    case = await fetch_case(moderation_id, interaction.guild.id)
    await send_evidenceformat(interaction, case)
@app_commands.command(name="unban")
async def unban(self, interaction: discord.Interaction, target: discord.User, reason: str = None, silent: bool = None):
    """Unban a user.

    Parameters
    -----------
    target: discord.user
        Who are you unbanning?
    reason: str
        Why are you unbanning this user?
    silent: bool
        Should the user be messaged?"""
    moddable = await check_moddable(target, interaction, ['ban_members'])
    if not moddable:
        return

    # fetch_ban() raising NotFound means there is no ban to lift.
    try:
        await interaction.guild.fetch_ban(target)
    except discord.errors.NotFound:
        await interaction.response.send_message(content=error(f"{target.mention} is not banned!"), ephemeral=True)
        return

    # Only a supplied reason is appended to the audit-log text.
    audit_reason = f"Unbanned by {interaction.user.id}"
    if reason:
        audit_reason = f"{audit_reason} for: {reason}"
    else:
        reason = "No reason given."
    await interaction.guild.unban(target, reason=audit_reason)

    await interaction.response.send_message(content=f"{target.mention} has been unbanned!\n**Reason** - `{reason}`")

    if silent is None:
        silent = not await config.guild(interaction.guild).dm_users()

    if not silent:
        try:
            embed_color = await self.bot.get_embed_color(interaction.channel)
            original = await interaction.original_response()
            dm_embed = await message_factory(
                embed_color,
                guild=interaction.guild,
                moderator=interaction.user,
                reason=reason,
                moderation_type='unbanned',
                response=original,
            )
            await target.send(embed=dm_embed)
        except discord.errors.HTTPException:
            # DM delivery is best-effort.
            pass

    case_id = await mysql_log(interaction.guild.id, interaction.user.id, 'UNBAN', 'USER', target.id, 0, 'NULL', reason)
    await interaction.edit_original_response(content=f"{target.mention} has been unbanned! (Case `#{case_id:,}`)\n**Reason** - `{reason}`")
    await log(interaction, case_id)

    case_data = await fetch_case(case_id, interaction.guild.id)
    await send_evidenceformat(interaction, case_data)
@app_commands.command(name="history")
async def history(self, interaction: discord.Interaction, target: discord.User = None, moderator: discord.User = None, pagesize: app_commands.Range[int, 1, 20] = None, page: int = 1, ephemeral: bool = None, inline: bool = None, export: bool = False):
    """List previous infractions.

    Parameters
    -----------
    target: discord.User
        User whose infractions to query, overrides moderator if both are given
    moderator: discord.User
        Query by moderator
    pagesize: app_commands.Range[int, 1, 20]
        Amount of infractions to list per page
    page: int
        Page to select
    ephemeral: bool
        Hide the command response
    inline: bool
        Display infractions in a grid arrangement (does not look very good)
    export: bool
        Exports the server's entire moderation history to a JSON file"""
    # Unset display options fall back to the user's setting, then the guild's, then a default.
    if ephemeral is None:
        ephemeral = (await config.user(interaction.user).history_ephemeral()
                     or await config.guild(interaction.guild).history_ephemeral()
                     or False)

    if inline is None:
        inline = (await config.user(interaction.user).history_inline()
                  or await config.guild(interaction.guild).history_inline()
                  or False)

    if pagesize is None:
        if inline is True:
            pagesize = (await config.user(interaction.user).history_inline_pagesize()
                        or await config.guild(interaction.guild).history_inline_pagesize()
                        or 6)
        else:
            pagesize = (await config.user(interaction.user).history_pagesize()
                        or await config.guild(interaction.guild).history_pagesize()
                        or 5)

    await interaction.response.defer(ephemeral=ephemeral)

    permissions = check_permissions(interaction.client.user, ['embed_links'], interaction)
    if permissions:
        await interaction.followup.send(error(f"I do not have the `{permissions}` permission, required for this action."), ephemeral=True)
        return

    # Run the query and release the connection before any Discord I/O, on every path.
    table = f"moderation_{interaction.guild.id}"
    database = connect()
    try:
        if export:
            # Row objects convert to dicts for the JSON export.
            database.row_factory = sqlite3.Row
        cursor = database.cursor()
        try:
            if export or not (target or moderator):
                cursor.execute(f"SELECT * FROM {table} ORDER BY moderation_id DESC;")
            elif target:
                cursor.execute(f"SELECT * FROM {table} WHERE target_id = ? ORDER BY moderation_id DESC;", (target.id,))
            else:
                cursor.execute(f"SELECT * FROM {table} WHERE moderator_id = ? ORDER BY moderation_id DESC;", (moderator.id,))
            results = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        database.close()

    if export:
        cases = [dict(result) for result in results]
        filename = str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"moderation_{interaction.guild.id}.json"
        try:
            with open(filename, "w", encoding="utf-8") as f:
                json.dump(cases, f, indent=2)
            await interaction.followup.send(file=discord.File(filename, f"moderation_{interaction.guild.id}.json"), ephemeral=ephemeral)
        except (TypeError, ValueError, OSError) as e:
            # json.dump raises TypeError/ValueError for unserialisable data; OSError covers file access.
            await interaction.followup.send(content=error("An error occurred while exporting the moderation history.\nError:\n") + box(str(e), 'py'), ephemeral=ephemeral)
        finally:
            # Always remove the temporary export file.
            if os.path.exists(filename):
                os.remove(filename)
        return

    # Case 0 is skipped.
    result_dict_list = [case_dict for case_dict in map(generate_dict, results) if case_dict['moderation_id'] != 0]

    case_quantity = len(result_dict_list)
    page_quantity = ceil(case_quantity / pagesize)
    start_index = (page - 1) * pagesize
    end_index = page * pagesize

    embed = discord.Embed(color=await self.bot.get_embed_color(interaction.channel))
    # A guild without an icon has `icon` set to None.
    icon_url = interaction.guild.icon.url if interaction.guild.icon else None
    embed.set_author(icon_url=icon_url, name='Infraction History')
    embed.set_footer(text=f"Page {page:,}/{page_quantity:,} | {case_quantity:,} Results")

    # Cache of already fetched user/channel dicts, keyed by the stringified ID.
    memory_dict = {}

    for case in result_dict_list[start_index:end_index]:
        target_key = str(case['target_id'])
        if target_key not in memory_dict:
            if case['target_type'] == 'USER':
                memory_dict[target_key] = await fetch_user_dict(interaction, case['target_id'])
            elif case['target_type'] == 'CHANNEL':
                memory_dict[target_key] = await fetch_channel_dict(interaction, case['target_id'])
        target_user = memory_dict[target_key]

        if case['target_type'] == 'USER':
            target_name = f"`{target_user['name']}`" if target_user['discriminator'] == "0" else f"`{target_user['name']}#{target_user['discriminator']}`"
        elif case['target_type'] == 'CHANNEL':
            target_name = f"`{target_user['mention']}`"

        moderator_key = str(case['moderator_id'])
        if moderator_key not in memory_dict:
            memory_dict[moderator_key] = await fetch_user_dict(interaction, case['moderator_id'])
        moderator_user = memory_dict[moderator_key]
        moderator_name = f"`{moderator_user['name']}`" if moderator_user['discriminator'] == "0" else f"`{moderator_user['name']}#{moderator_user['discriminator']}`"

        field_name = f"Case #{case['moderation_id']:,} ({str.title(case['moderation_type'])})"
        field_value = f"**Target:** {target_name} ({target_user['id']})\n**Moderator:** {moderator_name} ({moderator_user['id']})"

        # Stringify once so a non-string reason cannot break len().
        reason_text = str(case['reason'])
        if len(reason_text) > 125:
            field_value += f"\n**Reason:** `{reason_text[:125]}...`"
        else:
            field_value += f"\n**Reason:** `{reason_text}`"

        if case['duration'] != 'NULL':
            # The duration is split on ":" into hours, minutes and seconds.
            td = timedelta(**{unit: int(val) for unit, val in zip(["hours", "minutes", "seconds"], case["duration"].split(":"))})
            duration_embed = f"{humanize.precisedelta(td)} | <t:{case['end_timestamp']}:R>" if bool(case['expired']) is False else f"{humanize.precisedelta(td)} | Expired"
            field_value += f"\n**Duration:** {duration_embed}"

        field_value += f"\n**Timestamp:** <t:{case['timestamp']}> | <t:{case['timestamp']}:R>"

        if bool(case['resolved']):
            field_value += "\n**Resolved:** True"

        embed.add_field(name=field_name, value=field_value, inline=inline)

    await interaction.followup.send(embed=embed, ephemeral=ephemeral)
@app_commands.command(name="resolve")
async def resolve(self, interaction: discord.Interaction, case: int, reason: str = None):
    """Resolve a specific case.

    Parameters
    -----------
    case: int
        Case number of the case you're trying to resolve
    reason: str
        Reason for resolving case"""
    permissions = check_permissions(interaction.client.user, ['embed_links', 'moderate_members', 'ban_members'], interaction)
    if permissions:
        await interaction.response.send_message(error(f"I do not have the `{permissions}` permission, required for this action."), ephemeral=True)
        return

    database = connect()
    cursor = database.cursor()
    # try/finally so the cursor and connection are released on every early return too.
    try:
        query_1 = f"SELECT * FROM moderation_{interaction.guild.id} WHERE moderation_id = ?;"
        cursor.execute(query_1, (case,))
        result_1 = cursor.fetchone()
        if result_1 is None or case == 0:
            await interaction.response.send_message(content=error(f"There is no moderation with a case number of {case}."), ephemeral=True)
            return

        query_2 = f"SELECT * FROM moderation_{interaction.guild.id} WHERE moderation_id = ? AND resolved = 0;"
        cursor.execute(query_2, (case,))
        result_2 = cursor.fetchone()
        if result_2 is None:
            await interaction.response.send_message(content=error(f"This moderation has already been resolved!\nUse `/case {case}` for more information."), ephemeral=True)
            return

        case_dict = generate_dict(result_2)
        if reason is None:
            reason = "No reason given."

        changes: list = case_dict['changes']
        if len(changes) > 25:
            await interaction.response.send_message(content=error("Due to limitations with Discord's embed system, you cannot edit a case more than 25 times."), ephemeral=True)
            return
        if not changes:
            # The first change entry records the case as originally created.
            changes.append(
                {
                    'type': "ORIGINAL",
                    'timestamp': case_dict['timestamp'],
                    'reason': case_dict['reason'],
                    'user_id': case_dict['moderator_id']
                }
            )
        changes.append(
            {
                'type': "RESOLVE",
                'timestamp': int(time.time()),
                'reason': reason,
                'user_id': interaction.user.id
            }
        )

        if case_dict['moderation_type'] in ['UNMUTE', 'UNBAN']:
            await interaction.response.send_message(content=error("You cannot resolve this type of moderation!"), ephemeral=True)
            return

        # Resolving a mute or ban also lifts it; a missing member or ban is ignored.
        if case_dict['moderation_type'] == 'MUTE':
            try:
                member = await interaction.guild.fetch_member(case_dict['target_id'])
                await member.timeout(None, reason=f"Case #{case:,} resolved by {interaction.user.id}")
            except discord.NotFound:
                pass

        if case_dict['moderation_type'] in ['TEMPBAN', 'BAN']:
            try:
                user = await interaction.client.fetch_user(case_dict['target_id'])
                await interaction.guild.unban(user, reason=f"Case #{case} resolved by {interaction.user.id}")
            except discord.NotFound:
                pass

        # The same UPDATE is used for every moderation type.
        resolve_query = f"UPDATE `moderation_{interaction.guild.id}` SET resolved = 1, changes = ?, resolved_by = ?, resolve_reason = ? WHERE moderation_id = ?"
        cursor.execute(resolve_query, (json.dumps(changes), interaction.user.id, reason, case_dict['moderation_id']))
        database.commit()

        embed = await case_factory(interaction=interaction, case_dict=await fetch_case(case, interaction.guild.id))
        await interaction.response.send_message(content=f"✅ Moderation #{case:,} resolved!", embed=embed)

        await log(interaction, case, resolved=True)
    finally:
        cursor.close()
        database.close()
@app_commands.command(name="case")
@app_commands.choices(export=[
    Choice(name='Export as File', value='file'),
    Choice(name='Export as Codeblock', value='codeblock')
])
async def case(self, interaction: discord.Interaction, case: int, ephemeral: bool = None, evidenceformat: bool = False, changes: bool = False, export: Choice[str] = None):
    """Check the details of a specific case.

    Parameters
    -----------
    case: int
        What case are you looking up?
    ephemeral: bool
        Hide the command response
    changes: bool
        List the changes made to the case
    export: bool
        Export the case to a JSON file or codeblock"""
    permissions = check_permissions(interaction.client.user, ['embed_links'], interaction)
    if permissions:
        await interaction.response.send_message(error(f"I do not have the `{permissions}` permission, required for this action."), ephemeral=True)
        return

    # Unset: fall back to the user's setting, then the guild's, then False.
    if ephemeral is None:
        ephemeral = (await config.user(interaction.user).history_ephemeral()
                     or await config.guild(interaction.guild).history_ephemeral()
                     or False)

    # Case 0 is never looked up and is reported as not found.
    case_dict = await fetch_case(case, interaction.guild.id) if case != 0 else None
    if not case_dict:
        await interaction.response.send_message(content=f"No case with case number `{case}` found.", ephemeral=True)
        return

    if export:
        # Measure the JSON that would actually be sent in the codeblock.
        payload = json.dumps(case_dict, indent=2)

        if export.value == 'file' or len(payload) > 1800:
            filename = str(data_manager.cog_data_path(cog_instance=self)) + str(os.sep) + f"moderation_{interaction.guild.id}_case_{case}.json"
            try:
                with open(filename, "w", encoding="utf-8") as f:
                    json.dump(case_dict, f, indent=2)

                if export.value == 'codeblock':
                    content = f"Case #{case:,} exported.\n" + warning("Case was too large to export as codeblock, so it has been uploaded as a `.json` file.")
                else:
                    content = f"Case #{case:,} exported."

                await interaction.response.send_message(content=content, file=discord.File(filename, f"moderation_{interaction.guild.id}_case_{case}.json"), ephemeral=ephemeral)
            finally:
                # Remove the temporary file even when sending fails.
                if os.path.exists(filename):
                    os.remove(filename)
            return

        # Pass the JSON text itself; wrapping it in a set literal would send the set's repr.
        await interaction.response.send_message(content=box(payload), ephemeral=ephemeral)
        return

    if changes:
        embed = await changes_factory(interaction=interaction, case_dict=case_dict)
        await interaction.response.send_message(embed=embed, ephemeral=ephemeral)
    elif evidenceformat:
        content = await evidenceformat_factory(interaction=interaction, case_dict=case_dict)
        await interaction.response.send_message(content=content, ephemeral=ephemeral)
    else:
        embed = await case_factory(interaction=interaction, case_dict=case_dict)
        await interaction.response.send_message(embed=embed, ephemeral=ephemeral)
@app_commands.command(name="edit")
async def edit(self, interaction: discord.Interaction, case: int, reason: str, duration: str = None):
    """Edit the reason of a specific case.

    Parameters
    -----------
    case: int
        What case are you editing?
    reason: str
        What is the new reason?
    duration: str
        What is the new duration? Does not reapply the moderation if it has already expired."""
    # The confirmation below is sent as an embed, so refuse early when the
    # bot is missing the embed_links permission.
    permissions = check_permissions(interaction.client.user, ['embed_links'], interaction)
    if permissions:
        await interaction.response.send_message(error(f"I do not have the `{permissions}` permission, required for this action."), ephemeral=True)
        return

    # Case number 0 is never looked up; it is reported as "not found" just
    # like a case number that has no row.
    case_dict = await fetch_case(case, interaction.guild.id) if case != 0 else None
    if not case_dict:
        await interaction.response.send_message(content=error(f"No case with case number `{case}` found."), ephemeral=True)
        return

    # --- Validation: nothing below this comment block may have side effects
    # until every check has passed. ---
    parsed_time = None
    end_timestamp = None
    is_mute = case_dict['moderation_type'] == 'MUTE'
    if duration:
        try:
            parsed_time = parse(sval=duration, as_timedelta=True, raise_exception=True)
        except ValueError:
            await interaction.response.send_message(error("Please provide a valid duration!"), ephemeral=True)
            return

        # The new end time is measured from the case's original timestamp.
        end_timestamp = case_dict['timestamp'] + parsed_time.total_seconds()

        # 2419200 seconds == 28 days.
        if is_mute and (time.time() - case_dict['timestamp']) + parsed_time.total_seconds() > 2419200:
            await interaction.response.send_message(error("Please provide a duration that is less than 28 days from the initial moderation."), ephemeral=True)
            return

    changes: list = case_dict['changes']
    if len(changes) > 25:
        await interaction.response.send_message(content=error("Due to limitations with Discord's embed system, you cannot edit a case more than 25 times."), ephemeral=True)
        return

    # --- Side effects start here. ---
    if duration and is_mute:
        # NOTE(review): the timeout is given the raw duration, while
        # end_timestamp above is measured from the original case timestamp --
        # confirm these two are meant to agree.
        try:
            member = await interaction.guild.fetch_member(case_dict['target_id'])
            await member.timeout(parsed_time, reason=f"Case #{case:,} edited by {interaction.user.id}")
        except discord.NotFound:
            # The target is no longer in the guild; only the stored case is updated.
            pass

    # The first edit also records the pre-edit state so the history is complete.
    if not changes:
        changes.append(
            {
                'type': "ORIGINAL",
                'timestamp': case_dict['timestamp'],
                'reason': case_dict['reason'],
                'user_id': case_dict['moderator_id'],
                'duration': case_dict['duration'],
                'end_timestamp': case_dict['end_timestamp']
            }
        )

    # Without a new (non-zero) duration the edit entry carries the case's
    # existing duration and end timestamp forward unchanged.
    if parsed_time:
        new_duration = convert_timedelta_to_str(parsed_time)
        new_end_timestamp = end_timestamp
    else:
        new_duration = case_dict['duration']
        new_end_timestamp = case_dict['end_timestamp']

    changes.append(
        {
            'type': "EDIT",
            'timestamp': int(time.time()),
            'reason': reason,
            'user_id': interaction.user.id,
            'duration': new_duration,
            'end_timestamp': new_end_timestamp
        }
    )

    database = connect()
    cursor = database.cursor()
    try:
        if parsed_time:
            update_query = f"UPDATE `moderation_{interaction.guild.id}` SET changes = ?, reason = ?, duration = ?, end_timestamp = ? WHERE moderation_id = ?"
            cursor.execute(update_query, (json.dumps(changes), reason, new_duration, new_end_timestamp, case))
        else:
            update_query = f"UPDATE `moderation_{interaction.guild.id}` SET changes = ?, reason = ? WHERE moderation_id = ?"
            cursor.execute(update_query, (json.dumps(changes), reason, case))
        database.commit()
    finally:
        # Release the connection even if the update fails, and before the
        # Discord calls below, none of which use this cursor.
        cursor.close()
        database.close()

    # Re-read the case so the confirmation embed is built from the stored row.
    new_case = await fetch_case(case, interaction.guild.id)
    embed = await case_factory(interaction=interaction, case_dict=new_case)
    await interaction.response.send_message(content=f"✅ Moderation #{case:,} edited!", embed=embed, ephemeral=True)
    await log(interaction, case)
@tasks.loop(minutes=1)
async def handle_expiry(self):
    """Expire finished moderations in every guild the bot is in.

    Runs once a minute. For each guild where this cog is not disabled it:

    1. selects unexpired TEMPBAN cases whose end timestamp has passed,
       unbans each target and tries to DM them an "unbanned" embed;
    2. marks as expired every non-BLACKLIST case that has either passed its
       end timestamp or been resolved;
    3. selects unexpired BLACKLIST cases whose end timestamp has passed and
       looks up each case's role. Nothing further is done with the role in
       this method.

    All updates are committed once, after every guild has been processed.
    """
    current_time = time.time()
    database = connect()
    cursor = database.cursor()
    global_num = 0

    try:
        guilds: list[discord.Guild] = self.bot.guilds
        for guild in guilds:
            if await self.bot.cog_disabled_in_guild(self, guild):
                continue

            time_per_guild = time.time()

            tempban_query = f"SELECT target_id, moderation_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= ? AND moderation_type = 'TEMPBAN' AND expired = 0"
            try:
                cursor.execute(tempban_query, (time.time(),))
                result = cursor.fetchall()
            except sqlite3.OperationalError:
                # The query could not run for this guild (presumably its
                # table does not exist yet); there is nothing to expire.
                continue

            num = 0
            for target_id, moderation_id in result:
                # fetch_user is an HTTP call and fails for deleted or unknown
                # accounts. One bad row must not abort the whole task.
                try:
                    user: discord.User = await self.bot.fetch_user(target_id)
                except discord.errors.HTTPException as e:
                    logger.error("Failed to fetch user %s for case #%s in %s (%s)\n%s", target_id, moderation_id, guild.name, guild.id, e)
                    continue

                name = f"{user.name}#{user.discriminator}" if user.discriminator != "0" else user.name
                try:
                    await guild.unban(user, reason=f"Automatic unban from case #{moderation_id}")

                    # NOTE(review): guild.channels[0] raises IndexError for a
                    # guild with no channels -- confirm whether that can occur.
                    embed = await message_factory(await self.bot.get_embed_color(guild.channels[0]), guild=guild, reason=f'Automatic unban from case #{moderation_id}', moderation_type='unbanned')

                    # The DM is best-effort; a failure to deliver it is ignored.
                    try:
                        await user.send(embed=embed)
                    except discord.errors.HTTPException:
                        pass

                    logger.debug("Unbanned %s (%s) from %s (%s)", name, user.id, guild.name, guild.id)
                    num = num + 1
                except (discord.errors.NotFound, discord.errors.Forbidden, discord.errors.HTTPException) as e:
                    logger.error("Failed to unban %s (%s) from %s (%s)\n%s", name, user.id, guild.name, guild.id, e)

            # Runs regardless of whether the unbans above succeeded.
            expiry_query = f"UPDATE `moderation_{guild.id}` SET expired = 1 WHERE (end_timestamp != 0 AND end_timestamp <= ? AND expired = 0 AND moderation_type != 'BLACKLIST') OR (expired = 0 AND resolved = 1 AND moderation_type != 'BLACKLIST')"
            cursor.execute(expiry_query, (time.time(),))

            blacklist_query = f"SELECT target_id, moderation_id, role_id FROM moderation_{guild.id} WHERE end_timestamp != 0 AND end_timestamp <= ? AND moderation_type = 'BLACKLIST' AND expired = 0"
            try:
                cursor.execute(blacklist_query, (time.time(),))
                result = cursor.fetchall()
            except sqlite3.OperationalError:
                # Fall through with no rows so this guild's unbans are still
                # logged and counted below.
                result = []

            for _target_id, _moderation_id, role_id in result:
                role: discord.Role = guild.get_role(role_id)
                if role is None:
                    # The role no longer exists in the guild; skip this case.
                    continue
                # NOTE(review): nothing is done with the role here, and these
                # cases are never marked expired by this method -- confirm
                # whether role removal is still to be implemented.

            per_guild_completion_time = (time.time() - time_per_guild) * 1000
            logger.debug("Completed expiry loop for %s (%s) in %sms with %s users unbanned", guild.name, guild.id, f"{per_guild_completion_time:.6f}", num)
            global_num = global_num + num

        database.commit()
    finally:
        # Always release the connection, including when a guild raised.
        cursor.close()
        database.close()

    completion_time = (time.time() - current_time) * 1000
    logger.debug("Completed expiry loop in %sms with %s users unbanned", f"{completion_time:.6f}", global_num)
@commands.command(aliases=["tdc"])
async def timedeltaconvert(self, ctx: commands.Context, *, duration: str):
    """This command converts a duration to a [`timedelta`](https://docs.python.org/3/library/datetime.html#datetime.timedelta) Python object.
    **Example usage**
    `[p]timedeltaconvert 1 day 15hr 82 minutes 52s`
    **Output**
    `1 day, 16:22:52`"""
    # The keyword-only `duration` captures the rest of the message, so a
    # multi-word duration arrives as a single string.
    try:
        delta = parse(duration, as_timedelta=True, raise_exception=True)
        # Reply with the parsed value's string form inside inline code.
        await ctx.send(f"`{delta}`")
    except ValueError:
        failure_notice = error("Please provide a convertible value!")
        await ctx.send(failure_notice)