SeaCogs/aurora/models/moderation.py
Seaswimmer 14750787b2
Some checks failed
Actions / Build Documentation (MkDocs) (pull_request) Failing after 28s
Actions / Lint Code (Ruff & Pylint) (pull_request) Successful in 39s
fix(aurora): cast timestamps to integers
2024-08-10 13:34:15 -04:00

520 lines
21 KiB
Python

import json
import sqlite3
from datetime import datetime, timedelta
from time import time
from typing import Dict, Iterable, List, Optional, Tuple, Union
import discord
from aiosqlite import Connection, Cursor, OperationalError, Row
from aiosqlite import connect as aiosqlite_connect
from redbot.core import data_manager
from redbot.core.bot import Red
from ..utilities.logger import logger
from ..utilities.utils import timedelta_to_string
from .base import AuroraGuildModel
from .change import Change
from .partials import PartialChannel, PartialRole, PartialUser
from .type import Type, type_registry
class Moderation(AuroraGuildModel):
"""This class represents a moderation case in the database.
Attributes:
bot (Red): The bot instance.
guild (discord.Guild): The guild the case belongs to.
moderation_id (int): The ID of the moderation case.
timestamp (datetime): The timestamp of the case.
moderation_type (Type): The type of moderation case.
target_type (str): The type of target. Should be either `user` or `channel`.
target_id (int): The ID of the target.
moderator_id (int): The ID of the moderator who issued the case.
role_id (int): The ID of the role, if applicable.
duration (timedelta): The duration of the case, if applicable.
end_timestamp (datetime): The end timestamp of the case, if applicable.
reason (str): The reason for the case.
resolved (bool): Whether the case is resolved.
resolved_by (int): The ID of the user who resolved the case.
resolve_reason (str): The reason the case was resolved.
expired (bool): Whether the case is expired.
changes (List[Change]): A list of changes to the case.
metadata (Dict): A dictionary of metadata stored with the case.
Properties:
id (int): The ID of the case.
type (Type): The type of the case.
unix_timestamp (int): The timestamp of the case as a Unix timestamp.
Methods:
get_moderator: Gets the moderator who issued the case.
get_target: Gets the target of the case.
get_resolved_by: Gets the user who resolved the case.
get_role: Gets the role, if applicable.
resolve: Resolves the case.
update: Updates the case in the database.
Class Methods:
from_dict: Creates a `Moderation` object from a dictionary.
from_result: Creates a `Moderation` object from a database result.
execute: Executes a query on the database.
get_latest: Gets the latest cases from the database.
get_next_case_number: Gets the next case number to use.
find_by_id: Finds a case by its ID.
find_by_target: Finds cases by the target.
find_by_moderator: Finds cases by the moderator.
log: Logs a moderation case in the database.
Static Methods:
connect: Connects to the SQLite database.
"""
moderation_id: int
timestamp: datetime
moderation_type: Type
target_type: str
target_id: int
moderator_id: int
role_id: Optional[int] = None
duration: Optional[timedelta] = None
end_timestamp: Optional[datetime] = None
reason: Optional[str] = None
resolved: bool
resolved_by: Optional[int] = None
resolve_reason: Optional[str] = None
expired: bool
changes: List["Change"]
metadata: Dict
@property
def id(self) -> int:
return self.moderation_id
@property
def type(self) -> Type:
return self.moderation_type
@property
def unix_timestamp(self) -> int:
return int(self.timestamp.timestamp())
async def get_moderator(self) -> "PartialUser":
return await PartialUser.from_id(self.bot, self.moderator_id)
async def get_target(self) -> Union["PartialUser", "PartialChannel"]:
if self.target_type.lower() == "user":
return await PartialUser.from_id(self.bot, self.target_id)
return await PartialChannel.from_id(self.bot, self.target_id, self.guild)
async def get_resolved_by(self) -> Optional["PartialUser"]:
if self.resolved_by:
return await PartialUser.from_id(self.bot, self.resolved_by)
return None
async def get_role(self) -> Optional["PartialRole"]:
if self.role_id:
return await PartialRole.from_id(self.bot, self.guild, self.role_id)
return None
def __str__(self) -> str:
return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"
def __int__(self) -> int:
return self.moderation_id
async def resolve(self, resolved_by: int, reason: str) -> Tuple[bool, str]:
if self.resolved:
raise ValueError("Case is already resolved!")
self.resolved = True
self.resolved_by = resolved_by
self.resolve_reason = reason
success, msg = await self.type.resolve_handler(moderation=self, reason=reason)
if not self.changes:
self.changes.append(Change.from_dict(self.bot, {
"type": "ORIGINAL",
"timestamp": self.timestamp,
"reason": self.reason,
"user_id": self.moderator_id,
"duration": self.duration,
"end_timestamp": self.end_timestamp,
}))
self.changes.append(Change.from_dict(self.bot, {
"type": "RESOLVE",
"timestamp": datetime.now(),
"reason": reason,
"user_id": resolved_by,
}))
await self.update()
return success, msg
async def update(self) -> None:
from ..utilities.json import dumps
query = f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? WHERE moderation_id = ?;"
await self.execute(query, (
self.timestamp.timestamp(),
self.moderation_type.key,
self.target_type,
self.moderator_id,
self.role_id,
str(self.duration) if self.duration else None,
self.end_timestamp.timestamp() if self.end_timestamp else None,
self.reason,
self.resolved,
self.resolved_by,
self.resolve_reason,
self.expired,
dumps(self.changes),
dumps(self.metadata),
self.moderation_id,
))
logger.verbose("Row updated in moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
self.moderation_id,
self.guild_id,
self.timestamp.timestamp(),
self.moderation_type.key,
self.target_type,
self.moderator_id,
self.role_id,
str(self.duration) if self.duration else None,
self.end_timestamp.timestamp() if self.end_timestamp else None,
self.reason,
self.resolved,
self.resolved_by,
self.resolve_reason,
self.expired,
dumps(self.changes),
dumps(self.metadata),
)
@classmethod
async def from_dict(cls, bot: Red, data: dict) -> "Moderation":
if data.get("guild_id"):
try:
guild = bot.get_guild(data["guild_id"])
if not guild:
guild = await bot.fetch_guild(data["guild_id"])
except (discord.Forbidden, discord.HTTPException):
guild = None
data.update({"guild": guild})
return cls(bot=bot, **data)
@classmethod
async def from_result(cls, bot: Red, result: Iterable, guild_id: int) -> "Moderation":
if result[7] is not None and result[7] != "NULL":
hours, minutes, seconds = map(int, result[7].split(':'))
duration = timedelta(hours=hours, minutes=minutes, seconds=seconds)
else:
duration = None
if result[14] is not None:
changes = json.loads(result[14])
change_obj_list = []
if changes:
for change in changes:
change_obj_list.append(Change.from_dict(bot=bot, data=change))
if result[15] is not None:
metadata = json.loads(result[15])
else:
metadata = {}
moderation_type = str.lower(result[2])
if moderation_type in type_registry:
moderation_type = type_registry[moderation_type]
else:
logger.error("Unknown moderation type in case %s: %s", result[0], result[2])
case = {
"moderation_id": int(result[0]),
"guild_id": int(guild_id),
"timestamp": datetime.fromtimestamp(result[1]),
"moderation_type": moderation_type,
"target_type": str(result[3]),
"target_id": int(result[4]),
"moderator_id": int(result[5]),
"role_id": int(result[6]) if result[6] is not None else None,
"duration": duration,
"end_timestamp": datetime.fromtimestamp(result[8]) if result[8] is not None else None,
"reason": result[9],
"resolved": bool(result[10]),
"resolved_by": result[11],
"resolve_reason": result[12],
"expired": bool(result[13]),
"changes": change_obj_list,
"metadata": metadata if metadata else {},
}
return await cls.from_dict(bot=bot, data=case)
@staticmethod
async def connect() -> Connection:
"""Connects to the SQLite database, and returns a connection object."""
try:
connection = await aiosqlite_connect(
database=data_manager.cog_data_path(raw_name="Aurora") / "aurora.db"
)
return connection
except OperationalError as e:
logger.error("Unable to access the SQLite database!\nError:\n%s", e.msg)
raise ConnectionRefusedError(
f"Unable to access the SQLite Database!\n{e.msg}"
) from e
@classmethod
async def execute(cls, query: str, parameters: tuple | None = None, bot: Red | None = None, guild_id: int | None = None, cursor: Cursor | None = None, return_obj: bool = True) -> Union[Tuple["Moderation"], Iterable[Row]]:
"""Executes a query on the database.
Arguments:
query (str): The query to execute.
parameters (tuple): The parameters to pass to the query.
bot (Red): The bot instance.
guild_id (int): The ID of the guild to execute the query on.
cursor (Cursor): The cursor to use for the query.
return_obj (bool): Whether to return the case object(s). Defaults to `True`. If `False`, returns a `Iterable` of `aiosqlite.Row` objects.
Returns: The result of the query, either as a `Tuple` of `Moderation` objects or an `Iterable` of `aiosqlite.Row` objects.
"""
logger.trace("Executing query: \"%s\" with parameters \"%s\"", query, parameters)
if not parameters:
parameters = ()
if not cursor:
no_cursor = True
database = await cls.connect()
cursor = await database.cursor()
else:
no_cursor = False
try:
await cursor.execute(query, parameters)
except OperationalError as e:
logger.error("Error executing query: \"%s\" with parameters \"%s\"\nError:\n%s",
query, parameters, e)
raise OperationalError(f"Error executing query: \"{query}\" with parameters \"{parameters}\"") from e
results = await cursor.fetchall()
await database.commit()
if no_cursor:
await cursor.close()
await database.close()
if results and return_obj and bot and guild_id:
cases = []
for result in results:
if result[0] == 0:
continue
case = await cls.from_result(bot=bot, result=result, guild_id=guild_id)
cases.append(case)
return tuple(cases)
return results
@classmethod
async def get_latest(cls, bot: Red, guild_id: int, before: datetime = None, after: datetime = None, limit: int | None = None, offset: int = 0, types: Iterable[Type] | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
params = []
query = f"SELECT * FROM moderation_{guild_id} ORDER BY moderation_id DESC"
if types:
query += f" WHERE moderation_type IN ({', '.join(['?' for _ in types])})"
for t in types:
params.append(t.key)
if before:
query += " WHERE timestamp < ?"
params.append(int(before.timestamp()))
if after:
query += " WHERE timestamp > ?"
params.append(int(after.timestamp()))
if limit:
query += " LIMIT ? OFFSET ?"
params.extend((limit, offset))
query += ";"
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=tuple(params) if params else (), cursor=cursor)
@classmethod
async def get_next_case_number(cls, bot: Red, guild_id: int, cursor: Cursor | None = None) -> int:
result = await cls.get_latest(bot=bot, guild_id=guild_id, cursor=cursor, limit=1)
return (result[0].moderation_id + 1) if result else 1
@classmethod
async def find_by_id(cls, bot: Red, moderation_id: int, guild_id: int, cursor: Cursor | None = None) -> "Moderation":
query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
case = await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderation_id,), cursor=cursor)
if case:
return case[0]
raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")
@classmethod
async def find_by_target(cls, bot: Red, guild_id: int, target: int, before: datetime = None, after: datetime = None, types: Iterable[Type] | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?"
params = [target]
if types:
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
for t in types:
params.append(t.key)
if before:
query += " AND timestamp < ?"
params.append(int(before.timestamp()))
if after:
query += " AND timestamp > ?"
params.append(int(after.timestamp()))
query += " ORDER BY moderation_id DESC;"
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=params, cursor=cursor)
@classmethod
async def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, before: datetime = None, after: datetime = None, types: Iterable[Type] | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?"
params = [moderator]
if types:
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
for t in types:
params.append(t.key)
if before:
query += " AND timestamp < ?"
params.append(int(before.timestamp()))
if after:
query += " AND timestamp > ?"
params.append(int(after.timestamp()))
query += " ORDER BY moderation_id DESC;"
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=params, cursor=cursor)
@classmethod
async def log(
cls,
bot: Red,
guild_id: int,
moderator_id: int,
moderation_type: Type,
target_type: str,
target_id: int,
role_id: int | None = None,
duration: timedelta | None = None,
reason: str | None = None,
database: sqlite3.Connection | None = None,
timestamp: datetime | None = None,
resolved: bool = False,
resolved_by: int | None = None,
resolved_reason: str | None = None,
expired: bool | None = None,
changes: list | None = None,
metadata: dict | None = None,
return_obj: bool = True,
) -> Union["Moderation", int]:
"""Logs a moderation case in the database.
Args:
bot (Red): The bot instance.
guild_id (int): The ID of the guild to log the case in.
moderator_id (int): The ID of the moderator who issued the case.
moderation_type (Type): The type of moderation case. See `aurora.models.moderation_types` for the built-in options.
target_type (str): The type of target. Should be either `user` or `channel`.
target_id (int): The ID of the target.
role_id (int): The ID of the role, if applicable.
duration (timedelta): The duration of the case, if applicable.
reason (str): The reason for the case.
database (sqlite3.Connection): The database connection to use to log the case. A connection will be automatically created if not provided.
timestamp (datetime): The timestamp of the case. Will be automatically generated if not provided.
resolved (bool): Whether the case is resolved.
resolved_by (int): The ID of the user who resolved the case.
resolved_reason (str): The reason the case was resolved.
expired (bool): Whether the case is expired.
changes (list): A list of changes to log. You usually shouldn't pass this, as it's automatically generated by the `/edit` and `/resolve` commands.
metadata (dict): A dictionary of metadata to store with the case.
return_obj (bool): Whether to return the case object. Defaults to `True`. If `False`, returns the case ID.
Returns:
Union[Moderation, int]: The `Moderation` object if `return_obj` is `True`, otherwise the case ID.
"""
from ..utilities.json import dumps
if not timestamp:
timestamp = datetime.fromtimestamp(time())
elif not isinstance(timestamp, datetime):
timestamp = datetime.fromtimestamp(timestamp)
if duration == "NULL":
duration = None
if duration is not None:
end_timestamp = timestamp + duration
else:
duration = None
end_timestamp = None
if not expired:
if end_timestamp:
expired = bool(timestamp > end_timestamp)
else:
expired = False
if reason == "NULL":
reason = None
if resolved_by in ["NULL", "?"]:
resolved_by = None
if resolved_reason == "NULL":
resolved_reason = None
if role_id == 0:
role_id = None
if not database:
database = await cls.connect()
close_db = True
else:
close_db = False
moderation_id = await cls.get_next_case_number(bot=bot, guild_id=guild_id)
case = {
"moderation_id": moderation_id,
"timestamp": timestamp.timestamp(),
"moderation_type": moderation_type.key,
"target_type": target_type,
"target_id": target_id,
"moderator_id": moderator_id,
"role_id": role_id,
"duration": timedelta_to_string(duration) if duration else None,
"end_timestamp": end_timestamp.timestamp() if end_timestamp else None,
"reason": reason,
"resolved": resolved,
"resolved_by": resolved_by,
"resolve_reason": resolved_reason,
"expired": expired,
"changes": dumps(changes),
"metadata": dumps(metadata)
}
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
await database.execute(sql, tuple(case.values()))
await database.commit()
if close_db:
await database.close()
logger.verbose(
"Row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
guild_id,
case["moderation_id"],
case["timestamp"],
case["moderation_type"],
case["target_type"],
case["target_id"],
case["moderator_id"],
case["role_id"],
case["duration"],
case["end_timestamp"],
case["reason"],
case["resolved"],
case["resolved_by"],
case["resolve_reason"],
case["expired"],
case["changes"],
case["metadata"],
)
if return_obj:
return await cls.find_by_id(bot=bot, moderation_id=moderation_id, guild_id=guild_id)
return moderation_id