SeaCogs/aurora/models/moderation.py

542 lines
22 KiB
Python
Raw Normal View History

2024-05-06 15:11:11 -04:00
import json
import sqlite3
from datetime import datetime, timedelta
from time import time
from typing import Dict, Iterable, List, Optional, Tuple, Union
2024-05-04 13:41:11 -04:00
import discord
2024-06-08 20:12:22 -04:00
from aiosqlite import Connection, Cursor, OperationalError, Row
from aiosqlite import connect as aiosqlite_connect
from redbot.core import data_manager
from redbot.core.bot import Red
2024-05-04 13:42:58 -04:00
2024-05-06 21:39:43 -04:00
from ..utilities.logger import logger
from ..utilities.utils import timedelta_to_string
2024-05-09 21:27:26 -04:00
from .base import AuroraGuildModel
from .change import Change
from .partials import PartialChannel, PartialRole, PartialUser
from .type import Type, type_registry
2024-05-04 13:41:11 -04:00
class Moderation(AuroraGuildModel):
"""This class represents a moderation case in the database.
Attributes:
bot (Red): The bot instance.
guild (discord.Guild): The guild the case belongs to.
moderation_id (int): The ID of the moderation case.
timestamp (datetime): The timestamp of the case.
moderation_type (Type): The type of moderation case.
target_type (str): The type of target. Should be either `user` or `channel`.
target_id (int): The ID of the target.
moderator_id (int): The ID of the moderator who issued the case.
role_id (int): The ID of the role, if applicable.
duration (timedelta): The duration of the case, if applicable.
end_timestamp (datetime): The end timestamp of the case, if applicable.
reason (str): The reason for the case.
resolved (bool): Whether the case is resolved.
resolved_by (int): The ID of the user who resolved the case.
resolve_reason (str): The reason the case was resolved.
expired (bool): Whether the case is expired.
changes (List[Change]): A list of changes to the case.
metadata (Dict): A dictionary of metadata stored with the case.
Properties:
id (int): The ID of the case.
type (Type): The type of the case.
unix_timestamp (int): The timestamp of the case as a Unix timestamp.
Methods:
get_moderator: Gets the moderator who issued the case.
get_target: Gets the target of the case.
get_resolved_by: Gets the user who resolved the case.
get_role: Gets the role, if applicable.
resolve: Resolves the case.
update: Updates the case in the database.
Class Methods:
from_dict: Creates a `Moderation` object from a dictionary.
from_result: Creates a `Moderation` object from a database result.
execute: Executes a query on the database.
get_latest: Gets the latest cases from the database.
get_next_case_number: Gets the next case number to use.
find_by_id: Finds a case by its ID.
find_by_target: Finds cases by the target.
find_by_moderator: Finds cases by the moderator.
log: Logs a moderation case in the database.
Static Methods:
connect: Connects to the SQLite database.
"""
2024-05-04 13:41:11 -04:00
moderation_id: int
timestamp: datetime
moderation_type: Type
2024-05-04 13:41:11 -04:00
target_type: str
target_id: int
moderator_id: int
role_id: Optional[int] = None
duration: Optional[timedelta] = None
end_timestamp: Optional[datetime] = None
reason: Optional[str] = None
2024-05-04 13:41:11 -04:00
resolved: bool
resolved_by: Optional[int] = None
resolve_reason: Optional[str] = None
2024-05-04 13:41:11 -04:00
expired: bool
changes: List["Change"]
2024-05-04 13:41:11 -04:00
metadata: Dict
@property
def id(self) -> int:
return self.moderation_id
@property
2024-07-06 12:27:59 -04:00
def type(self) -> Type:
return self.moderation_type
@property
def unix_timestamp(self) -> int:
return int(self.timestamp.timestamp())
async def get_moderator(self) -> "PartialUser":
return await PartialUser.from_id(self.bot, self.moderator_id)
async def get_target(self) -> Union["PartialUser", "PartialChannel"]:
if self.target_type.lower() == "user":
return await PartialUser.from_id(self.bot, self.target_id)
2024-06-05 23:13:23 -04:00
return await PartialChannel.from_id(self.bot, self.target_id, self.guild)
async def get_resolved_by(self) -> Optional["PartialUser"]:
if self.resolved_by:
return await PartialUser.from_id(self.bot, self.resolved_by)
return None
async def get_role(self) -> Optional["PartialRole"]:
if self.role_id:
2024-06-05 23:13:23 -04:00
return await PartialRole.from_id(self.bot, self.guild, self.role_id)
return None
def __str__(self) -> str:
2024-05-04 13:41:11 -04:00
return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"
def __int__(self) -> int:
return self.moderation_id
async def resolve(self, resolved_by: int, reason: str) -> Tuple[bool, str]:
if self.resolved:
raise ValueError("Case is already resolved!")
self.resolved = True
self.resolved_by = resolved_by
self.resolve_reason = reason
success, msg = await self.type.resolve_handler(moderation=self, reason=reason)
2024-07-06 12:15:39 -04:00
if not self.changes:
self.changes.append(Change.from_dict(self.bot, {
"type": "ORIGINAL",
"timestamp": self.timestamp,
"reason": self.reason,
"user_id": self.moderator_id,
"duration": self.duration,
"end_timestamp": self.end_timestamp,
}))
self.changes.append(Change.from_dict(self.bot, {
"type": "RESOLVE",
"timestamp": datetime.now(),
"reason": reason,
"user_id": resolved_by,
}))
2024-06-05 00:14:43 -04:00
await self.update()
return success, msg
2024-06-05 00:14:43 -04:00
async def update(self) -> None:
2024-05-06 21:46:01 -04:00
from ..utilities.json import dumps
query = f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? WHERE moderation_id = ?;"
2024-06-05 23:13:23 -04:00
await self.execute(query, (
self.timestamp.timestamp(),
self.moderation_type.key,
self.target_type,
self.moderator_id,
self.role_id,
str(self.duration) if self.duration else None,
self.end_timestamp.timestamp() if self.end_timestamp else None,
self.reason,
self.resolved,
self.resolved_by,
self.resolve_reason,
self.expired,
dumps(self.changes),
dumps(self.metadata),
self.moderation_id,
))
2024-06-05 23:13:23 -04:00
logger.verbose("Row updated in moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
self.moderation_id,
self.guild_id,
2024-05-06 14:55:36 -04:00
self.timestamp.timestamp(),
self.moderation_type.key,
self.target_type,
self.moderator_id,
self.role_id,
2024-05-06 14:55:36 -04:00
str(self.duration) if self.duration else None,
self.end_timestamp.timestamp() if self.end_timestamp else None,
self.reason,
self.resolved,
self.resolved_by,
self.resolve_reason,
self.expired,
2024-05-06 14:55:36 -04:00
dumps(self.changes),
dumps(self.metadata),
)
2024-05-06 14:55:36 -04:00
@classmethod
async def from_dict(cls, bot: Red, data: dict) -> "Moderation":
2024-06-05 23:13:23 -04:00
if data.get("guild_id"):
try:
guild = bot.get_guild(data["guild_id"])
2024-06-05 23:13:23 -04:00
if not guild:
guild = await bot.fetch_guild(data["guild_id"])
2024-06-05 23:13:23 -04:00
except (discord.Forbidden, discord.HTTPException):
guild = None
data.update({"guild": guild})
2024-05-06 14:55:36 -04:00
return cls(bot=bot, **data)
@classmethod
async def from_result(cls, bot: Red, result: Iterable, guild_id: int) -> "Moderation":
2024-06-04 16:52:19 -04:00
if result[7] is not None and result[7] != "NULL":
hours, minutes, seconds = map(int, result[7].split(':'))
duration = timedelta(hours=hours, minutes=minutes, seconds=seconds)
else:
duration = None
if result[14] is not None:
changes = json.loads(result[14])
change_obj_list = []
2024-05-24 03:53:38 -04:00
if changes:
for change in changes:
change_obj_list.append(Change.from_dict(bot=bot, data=change))
if result[15] is not None:
metadata = json.loads(result[15])
else:
metadata = {}
moderation_type = str.lower(result[2])
if moderation_type in type_registry:
moderation_type = type_registry[moderation_type]
else:
logger.error("Unknown moderation type in case %s: %s", result[0], result[2])
case = {
"moderation_id": int(result[0]),
"guild_id": int(guild_id),
"timestamp": datetime.fromtimestamp(result[1]),
"moderation_type": moderation_type,
"target_type": str(result[3]),
"target_id": int(result[4]),
"moderator_id": int(result[5]),
"role_id": int(result[6]) if result[6] is not None else None,
"duration": duration,
"end_timestamp": datetime.fromtimestamp(result[8]) if result[8] is not None else None,
"reason": result[9],
"resolved": bool(result[10]),
"resolved_by": result[11],
"resolve_reason": result[12],
"expired": bool(result[13]),
"changes": change_obj_list,
"metadata": metadata if metadata else {},
}
return await cls.from_dict(bot=bot, data=case)
2024-06-08 20:12:22 -04:00
@staticmethod
async def connect() -> Connection:
"""Connects to the SQLite database, and returns a connection object."""
try:
connection = await aiosqlite_connect(
database=data_manager.cog_data_path(raw_name="Aurora") / "aurora.db"
)
return connection
except OperationalError as e:
logger.error("Unable to access the SQLite database!\nError:\n%s", e.msg)
raise ConnectionRefusedError(
f"Unable to access the SQLite Database!\n{e.msg}"
) from e
@classmethod
2024-06-08 20:12:22 -04:00
async def execute(cls, query: str, parameters: tuple | None = None, bot: Red | None = None, guild_id: int | None = None, cursor: Cursor | None = None, return_obj: bool = True) -> Union[Tuple["Moderation"], Iterable[Row]]:
"""Executes a query on the database.
Arguments:
query (str): The query to execute.
parameters (tuple): The parameters to pass to the query.
bot (Red): The bot instance.
guild_id (int): The ID of the guild to execute the query on.
cursor (Cursor): The cursor to use for the query.
return_obj (bool): Whether to return the case object(s). Defaults to `True`. If `False`, returns a `Iterable` of `aiosqlite.Row` objects.
Returns: The result of the query, either as a `Tuple` of `Moderation` objects or an `Iterable` of `aiosqlite.Row` objects.
"""
2024-06-08 20:12:22 -04:00
logger.trace("Executing query: \"%s\" with parameters \"%s\"", query, parameters)
if not parameters:
parameters = ()
if not cursor:
no_cursor = True
2024-06-08 20:12:22 -04:00
database = await cls.connect()
2024-06-05 00:14:43 -04:00
cursor = await database.cursor()
else:
no_cursor = False
try:
await cursor.execute(query, parameters)
except OperationalError as e:
logger.error("Error executing query: \"%s\" with parameters \"%s\"\nError:\n%s",
query, parameters, e)
raise OperationalError(f"Error executing query: \"{query}\" with parameters \"{parameters}\"") from e
2024-06-05 00:14:43 -04:00
results = await cursor.fetchall()
2024-06-08 20:12:22 -04:00
await database.commit()
if no_cursor:
2024-06-05 00:14:43 -04:00
await cursor.close()
await database.close()
2024-06-08 20:12:22 -04:00
if results and return_obj and bot and guild_id:
cases = []
for result in results:
2024-07-06 13:15:06 -04:00
if result[0] == 0:
continue
case = await cls.from_result(bot=bot, result=result, guild_id=guild_id)
2024-07-06 13:15:06 -04:00
cases.append(case)
return tuple(cases)
2024-06-08 20:12:22 -04:00
return results
@classmethod
async def get_latest(cls, bot: Red, guild_id: int, before: datetime | None = None, after: datetime | None = None, limit: int | None = None, offset: int = 0, types: Iterable[Type] | None = None, expired: bool | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
params = []
query = f"SELECT * FROM moderation_{guild_id}"
conditions = []
if types:
if 'all' in types:
types = type_registry.keys()
logger.debug("Getting all types: %s", types)
conditions.append(f"moderation_type IN ({', '.join(['?' for _ in types])})")
params.extend([t.key for t in types])
if before:
conditions.append("timestamp < ?")
params.append(int(before.timestamp()))
if after:
conditions.append("timestamp > ?")
params.append(int(after.timestamp()))
if expired is not None:
conditions.append("expired = ?")
params.append(int(expired))
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY moderation_id DESC"
if limit:
query += " LIMIT ? OFFSET ?"
params.extend((limit, offset))
query += ";"
2024-06-05 00:14:43 -04:00
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=tuple(params) if params else (), cursor=cursor)
@classmethod
2024-06-05 00:14:43 -04:00
async def get_next_case_number(cls, bot: Red, guild_id: int, cursor: Cursor | None = None) -> int:
result = await cls.get_latest(bot=bot, guild_id=guild_id, cursor=cursor, limit=1)
return (result[0].moderation_id + 1) if result else 1
@classmethod
2024-06-05 00:14:43 -04:00
async def find_by_id(cls, bot: Red, moderation_id: int, guild_id: int, cursor: Cursor | None = None) -> "Moderation":
query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
2024-06-05 00:14:43 -04:00
case = await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderation_id,), cursor=cursor)
if case:
return case[0]
raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")
@classmethod
async def find_by_target(cls, bot: Red, guild_id: int, target: int, before: datetime = None, after: datetime = None, types: Iterable[Type] | None = None, expired: bool | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?"
params = [target]
if types:
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
for t in types:
params.append(t.key)
if before:
query += " AND timestamp < ?"
params.append(int(before.timestamp()))
if after:
query += " AND timestamp > ?"
params.append(int(after.timestamp()))
if expired is not None:
query += " AND expired = ?"
params.append(int(expired))
query += " ORDER BY moderation_id DESC;"
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=params, cursor=cursor)
@classmethod
async def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, before: datetime = None, after: datetime = None, types: Iterable[Type] | None = None, expired: bool | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?"
params = [moderator]
if types:
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
for t in types:
params.append(t.key)
if before:
query += " AND timestamp < ?"
params.append(int(before.timestamp()))
if after:
query += " AND timestamp > ?"
params.append(int(after.timestamp()))
if expired is not None:
query += " AND expired = ?"
params.append(int(expired))
query += " ORDER BY moderation_id DESC;"
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=params, cursor=cursor)
@classmethod
2024-06-05 00:14:43 -04:00
async def log(
cls,
bot: Red,
guild_id: int,
moderator_id: int,
2024-07-06 12:23:06 -04:00
moderation_type: Type,
target_type: str,
target_id: int,
2024-05-24 03:46:20 -04:00
role_id: int | None = None,
2024-05-06 21:04:08 -04:00
duration: timedelta | None = None,
reason: str | None = None,
database: sqlite3.Connection | None = None,
timestamp: datetime | None = None,
resolved: bool = False,
2024-05-06 21:04:08 -04:00
resolved_by: int | None = None,
resolved_reason: str | None = None,
expired: bool | None = None,
changes: list | None = None,
metadata: dict | None = None,
return_obj: bool = True,
2024-06-04 23:55:55 -04:00
) -> Union["Moderation", int]:
"""Logs a moderation case in the database.
Args:
bot (Red): The bot instance.
guild_id (int): The ID of the guild to log the case in.
moderator_id (int): The ID of the moderator who issued the case.
moderation_type (Type): The type of moderation case. See `aurora.models.moderation_types` for the built-in options.
target_type (str): The type of target. Should be either `user` or `channel`.
target_id (int): The ID of the target.
role_id (int): The ID of the role, if applicable.
duration (timedelta): The duration of the case, if applicable.
reason (str): The reason for the case.
database (sqlite3.Connection): The database connection to use to log the case. A connection will be automatically created if not provided.
timestamp (datetime): The timestamp of the case. Will be automatically generated if not provided.
resolved (bool): Whether the case is resolved.
resolved_by (int): The ID of the user who resolved the case.
resolved_reason (str): The reason the case was resolved.
expired (bool): Whether the case is expired.
changes (list): A list of changes to log. You usually shouldn't pass this, as it's automatically generated by the `/edit` and `/resolve` commands.
metadata (dict): A dictionary of metadata to store with the case.
return_obj (bool): Whether to return the case object. Defaults to `True`. If `False`, returns the case ID.
Returns:
Union[Moderation, int]: The `Moderation` object if `return_obj` is `True`, otherwise the case ID.
"""
2024-05-06 21:46:01 -04:00
from ..utilities.json import dumps
if not timestamp:
timestamp = datetime.fromtimestamp(time())
elif not isinstance(timestamp, datetime):
timestamp = datetime.fromtimestamp(timestamp)
2024-05-09 21:27:26 -04:00
if duration == "NULL":
duration = None
if duration is not None:
end_timestamp = timestamp + duration
else:
duration = None
end_timestamp = None
if not expired:
if end_timestamp:
expired = bool(timestamp > end_timestamp)
else:
expired = False
if reason == "NULL":
reason = None
2024-06-04 23:55:55 -04:00
if resolved_by in ["NULL", "?"]:
resolved_by = None
if resolved_reason == "NULL":
resolved_reason = None
if role_id == 0:
role_id = None
if not database:
2024-06-08 20:12:22 -04:00
database = await cls.connect()
close_db = True
else:
close_db = False
2024-06-05 00:41:41 -04:00
moderation_id = await cls.get_next_case_number(bot=bot, guild_id=guild_id)
case = {
"moderation_id": moderation_id,
2024-05-06 14:55:36 -04:00
"timestamp": timestamp.timestamp(),
2024-07-06 12:23:06 -04:00
"moderation_type": moderation_type.key,
"target_type": target_type,
"target_id": target_id,
"moderator_id": moderator_id,
"role_id": role_id,
"duration": timedelta_to_string(duration) if duration else None,
2024-05-06 14:55:36 -04:00
"end_timestamp": end_timestamp.timestamp() if end_timestamp else None,
"reason": reason,
"resolved": resolved,
"resolved_by": resolved_by,
"resolve_reason": resolved_reason,
"expired": expired,
2024-05-06 14:55:36 -04:00
"changes": dumps(changes),
"metadata": dumps(metadata)
}
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
2024-06-05 00:14:43 -04:00
await database.execute(sql, tuple(case.values()))
2024-06-05 00:14:43 -04:00
await database.commit()
if close_db:
2024-06-05 00:14:43 -04:00
await database.close()
logger.verbose(
"Row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
guild_id,
2024-05-06 14:55:36 -04:00
case["moderation_id"],
case["timestamp"],
case["moderation_type"],
case["target_type"],
case["target_id"],
case["moderator_id"],
case["role_id"],
case["duration"],
case["end_timestamp"],
case["reason"],
case["resolved"],
case["resolved_by"],
case["resolve_reason"],
case["expired"],
case["changes"],
case["metadata"],
)
if return_obj:
2024-06-05 00:14:43 -04:00
return await cls.find_by_id(bot=bot, moderation_id=moderation_id, guild_id=guild_id)
2024-06-04 23:55:55 -04:00
return moderation_id