528 lines
21 KiB
Python
528 lines
21 KiB
Python
import json
|
|
import sqlite3
|
|
from datetime import datetime, timedelta
|
|
from time import time
|
|
from typing import Dict, Iterable, List, Optional, Tuple, Union
|
|
|
|
import discord
|
|
from aiosqlite import Connection, Cursor, OperationalError, Row
|
|
from aiosqlite import connect as aiosqlite_connect
|
|
from redbot.core import data_manager
|
|
from redbot.core.bot import Red
|
|
|
|
from ..utilities.logger import logger
|
|
from ..utilities.utils import timedelta_to_string
|
|
from .base import AuroraGuildModel
|
|
from .change import Change
|
|
from .partials import PartialChannel, PartialRole, PartialUser
|
|
from .type import Type, type_registry
|
|
|
|
|
|
class Moderation(AuroraGuildModel):
|
|
"""This class represents a moderation case in the database.
|
|
|
|
Attributes:
|
|
bot (Red): The bot instance.
|
|
guild (discord.Guild): The guild the case belongs to.
|
|
moderation_id (int): The ID of the moderation case.
|
|
timestamp (datetime): The timestamp of the case.
|
|
moderation_type (Type): The type of moderation case.
|
|
target_type (str): The type of target. Should be either `user` or `channel`.
|
|
target_id (int): The ID of the target.
|
|
moderator_id (int): The ID of the moderator who issued the case.
|
|
role_id (int): The ID of the role, if applicable.
|
|
duration (timedelta): The duration of the case, if applicable.
|
|
end_timestamp (datetime): The end timestamp of the case, if applicable.
|
|
reason (str): The reason for the case.
|
|
resolved (bool): Whether the case is resolved.
|
|
resolved_by (int): The ID of the user who resolved the case.
|
|
resolve_reason (str): The reason the case was resolved.
|
|
expired (bool): Whether the case is expired.
|
|
changes (List[Change]): A list of changes to the case.
|
|
metadata (Dict): A dictionary of metadata stored with the case.
|
|
|
|
Properties:
|
|
id (int): The ID of the case.
|
|
type (Type): The type of the case.
|
|
unix_timestamp (int): The timestamp of the case as a Unix timestamp.
|
|
|
|
Methods:
|
|
get_moderator: Gets the moderator who issued the case.
|
|
get_target: Gets the target of the case.
|
|
get_resolved_by: Gets the user who resolved the case.
|
|
get_role: Gets the role, if applicable.
|
|
resolve: Resolves the case.
|
|
update: Updates the case in the database.
|
|
|
|
Class Methods:
|
|
from_dict: Creates a `Moderation` object from a dictionary.
|
|
from_result: Creates a `Moderation` object from a database result.
|
|
execute: Executes a query on the database.
|
|
get_latest: Gets the latest cases from the database.
|
|
get_next_case_number: Gets the next case number to use.
|
|
find_by_id: Finds a case by its ID.
|
|
find_by_target: Finds cases by the target.
|
|
find_by_moderator: Finds cases by the moderator.
|
|
log: Logs a moderation case in the database.
|
|
|
|
Static Methods:
|
|
connect: Connects to the SQLite database.
|
|
"""
|
|
|
|
moderation_id: int
|
|
timestamp: datetime
|
|
moderation_type: Type
|
|
target_type: str
|
|
target_id: int
|
|
moderator_id: int
|
|
role_id: Optional[int] = None
|
|
duration: Optional[timedelta] = None
|
|
end_timestamp: Optional[datetime] = None
|
|
reason: Optional[str] = None
|
|
resolved: bool
|
|
resolved_by: Optional[int] = None
|
|
resolve_reason: Optional[str] = None
|
|
expired: bool
|
|
changes: List["Change"]
|
|
metadata: Dict
|
|
|
|
@property
|
|
def id(self) -> int:
|
|
return self.moderation_id
|
|
|
|
@property
|
|
def type(self) -> Type:
|
|
return self.moderation_type
|
|
|
|
@property
|
|
def unix_timestamp(self) -> int:
|
|
return int(self.timestamp.timestamp())
|
|
|
|
async def get_moderator(self) -> "PartialUser":
|
|
return await PartialUser.from_id(self.bot, self.moderator_id)
|
|
|
|
async def get_target(self) -> Union["PartialUser", "PartialChannel"]:
|
|
if self.target_type.lower() == "user":
|
|
return await PartialUser.from_id(self.bot, self.target_id)
|
|
return await PartialChannel.from_id(self.bot, self.target_id, self.guild)
|
|
|
|
async def get_resolved_by(self) -> Optional["PartialUser"]:
|
|
if self.resolved_by:
|
|
return await PartialUser.from_id(self.bot, self.resolved_by)
|
|
return None
|
|
|
|
async def get_role(self) -> Optional["PartialRole"]:
|
|
if self.role_id:
|
|
return await PartialRole.from_id(self.bot, self.guild, self.role_id)
|
|
return None
|
|
|
|
def __str__(self) -> str:
|
|
return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"
|
|
|
|
def __int__(self) -> int:
|
|
return self.moderation_id
|
|
|
|
async def resolve(self, resolved_by: int, reason: str) -> Tuple[bool, str]:
|
|
if self.resolved:
|
|
raise ValueError("Case is already resolved!")
|
|
|
|
self.resolved = True
|
|
self.resolved_by = resolved_by
|
|
self.resolve_reason = reason
|
|
|
|
success, msg = await self.type.resolve_handler(moderation=self, reason=reason)
|
|
|
|
if not self.changes:
|
|
self.changes.append(Change.from_dict(self.bot, {
|
|
"type": "ORIGINAL",
|
|
"timestamp": self.timestamp,
|
|
"reason": self.reason,
|
|
"user_id": self.moderator_id,
|
|
"duration": self.duration,
|
|
"end_timestamp": self.end_timestamp,
|
|
}))
|
|
self.changes.append(Change.from_dict(self.bot, {
|
|
"type": "RESOLVE",
|
|
"timestamp": datetime.now(),
|
|
"reason": reason,
|
|
"user_id": resolved_by,
|
|
}))
|
|
|
|
await self.update()
|
|
return success, msg
|
|
|
|
async def update(self) -> None:
|
|
from ..utilities.json import dumps
|
|
query = f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? WHERE moderation_id = ?;"
|
|
|
|
await self.execute(query, (
|
|
self.timestamp.timestamp(),
|
|
self.moderation_type.key,
|
|
self.target_type,
|
|
self.moderator_id,
|
|
self.role_id,
|
|
str(self.duration) if self.duration else None,
|
|
self.end_timestamp.timestamp() if self.end_timestamp else None,
|
|
self.reason,
|
|
self.resolved,
|
|
self.resolved_by,
|
|
self.resolve_reason,
|
|
self.expired,
|
|
dumps(self.changes),
|
|
dumps(self.metadata),
|
|
self.moderation_id,
|
|
))
|
|
|
|
logger.verbose("Row updated in moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
|
|
self.moderation_id,
|
|
self.guild_id,
|
|
self.timestamp.timestamp(),
|
|
self.moderation_type.key,
|
|
self.target_type,
|
|
self.moderator_id,
|
|
self.role_id,
|
|
str(self.duration) if self.duration else None,
|
|
self.end_timestamp.timestamp() if self.end_timestamp else None,
|
|
self.reason,
|
|
self.resolved,
|
|
self.resolved_by,
|
|
self.resolve_reason,
|
|
self.expired,
|
|
dumps(self.changes),
|
|
dumps(self.metadata),
|
|
)
|
|
|
|
@classmethod
|
|
async def from_dict(cls, bot: Red, data: dict) -> "Moderation":
|
|
if data.get("guild_id"):
|
|
try:
|
|
guild = bot.get_guild(data["guild_id"])
|
|
if not guild:
|
|
guild = await bot.fetch_guild(data["guild_id"])
|
|
except (discord.Forbidden, discord.HTTPException):
|
|
guild = None
|
|
data.update({"guild": guild})
|
|
return cls(bot=bot, **data)
|
|
|
|
@classmethod
|
|
async def from_result(cls, bot: Red, result: Iterable, guild_id: int) -> "Moderation":
|
|
if result[7] is not None and result[7] != "NULL":
|
|
hours, minutes, seconds = map(int, result[7].split(':'))
|
|
duration = timedelta(hours=hours, minutes=minutes, seconds=seconds)
|
|
else:
|
|
duration = None
|
|
|
|
if result[14] is not None:
|
|
changes = json.loads(result[14])
|
|
change_obj_list = []
|
|
if changes:
|
|
for change in changes:
|
|
change_obj_list.append(Change.from_dict(bot=bot, data=change))
|
|
|
|
if result[15] is not None:
|
|
metadata = json.loads(result[15])
|
|
else:
|
|
metadata = {}
|
|
|
|
moderation_type = str.lower(result[2])
|
|
if moderation_type in type_registry:
|
|
moderation_type = type_registry[moderation_type]
|
|
else:
|
|
logger.error("Unknown moderation type in case %s: %s", result[0], result[2])
|
|
|
|
case = {
|
|
"moderation_id": int(result[0]),
|
|
"guild_id": int(guild_id),
|
|
"timestamp": datetime.fromtimestamp(result[1]),
|
|
"moderation_type": moderation_type,
|
|
"target_type": str(result[3]),
|
|
"target_id": int(result[4]),
|
|
"moderator_id": int(result[5]),
|
|
"role_id": int(result[6]) if result[6] is not None else None,
|
|
"duration": duration,
|
|
"end_timestamp": datetime.fromtimestamp(result[8]) if result[8] is not None else None,
|
|
"reason": result[9],
|
|
"resolved": bool(result[10]),
|
|
"resolved_by": result[11],
|
|
"resolve_reason": result[12],
|
|
"expired": bool(result[13]),
|
|
"changes": change_obj_list,
|
|
"metadata": metadata if metadata else {},
|
|
}
|
|
return await cls.from_dict(bot=bot, data=case)
|
|
|
|
@staticmethod
|
|
async def connect() -> Connection:
|
|
"""Connects to the SQLite database, and returns a connection object."""
|
|
try:
|
|
connection = await aiosqlite_connect(
|
|
database=data_manager.cog_data_path(raw_name="Aurora") / "aurora.db"
|
|
)
|
|
return connection
|
|
|
|
except OperationalError as e:
|
|
logger.error("Unable to access the SQLite database!\nError:\n%s", e.msg)
|
|
raise ConnectionRefusedError(
|
|
f"Unable to access the SQLite Database!\n{e.msg}"
|
|
) from e
|
|
|
|
@classmethod
|
|
async def execute(cls, query: str, parameters: tuple | None = None, bot: Red | None = None, guild_id: int | None = None, cursor: Cursor | None = None, return_obj: bool = True) -> Union[Tuple["Moderation"], Iterable[Row]]:
|
|
"""Executes a query on the database.
|
|
|
|
Arguments:
|
|
query (str): The query to execute.
|
|
parameters (tuple): The parameters to pass to the query.
|
|
bot (Red): The bot instance.
|
|
guild_id (int): The ID of the guild to execute the query on.
|
|
cursor (Cursor): The cursor to use for the query.
|
|
return_obj (bool): Whether to return the case object(s). Defaults to `True`. If `False`, returns a `Iterable` of `aiosqlite.Row` objects.
|
|
Returns: The result of the query, either as a `Tuple` of `Moderation` objects or an `Iterable` of `aiosqlite.Row` objects.
|
|
"""
|
|
logger.trace("Executing query: \"%s\" with parameters \"%s\"", query, parameters)
|
|
if not parameters:
|
|
parameters = ()
|
|
if not cursor:
|
|
no_cursor = True
|
|
database = await cls.connect()
|
|
cursor = await database.cursor()
|
|
else:
|
|
no_cursor = False
|
|
|
|
try:
|
|
await cursor.execute(query, parameters)
|
|
except OperationalError as e:
|
|
logger.error("Error executing query: \"%s\" with parameters \"%s\"\nError:\n%s",
|
|
query, parameters, e)
|
|
raise OperationalError(f"Error executing query: \"{query}\" with parameters \"{parameters}\"") from e
|
|
results = await cursor.fetchall()
|
|
await database.commit()
|
|
if no_cursor:
|
|
await cursor.close()
|
|
await database.close()
|
|
|
|
if results and return_obj and bot and guild_id:
|
|
cases = []
|
|
for result in results:
|
|
if result[0] == 0:
|
|
continue
|
|
case = await cls.from_result(bot=bot, result=result, guild_id=guild_id)
|
|
cases.append(case)
|
|
return tuple(cases)
|
|
return results
|
|
|
|
@classmethod
|
|
async def get_latest(cls, bot: Red, guild_id: int, before: datetime = None, after: datetime = None, limit: int | None = None, offset: int = 0, types: Iterable[Type] | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
|
|
params = []
|
|
query = f"SELECT * FROM moderation_{guild_id}"
|
|
conditions = []
|
|
|
|
if types:
|
|
conditions.append(f"moderation_type IN ({', '.join(['?' for _ in types])})")
|
|
for t in types:
|
|
params.append(t.key)
|
|
if before:
|
|
conditions.append("timestamp < ?")
|
|
params.append(int(before.timestamp()))
|
|
if after:
|
|
conditions.append("timestamp > ?")
|
|
params.append(int(after.timestamp()))
|
|
|
|
if conditions:
|
|
query += " WHERE " + " AND ".join(conditions)
|
|
|
|
query += " ORDER BY moderation_id DESC"
|
|
|
|
if limit:
|
|
query += " LIMIT ? OFFSET ?"
|
|
params.extend((limit, offset))
|
|
query += ";"
|
|
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=tuple(params) if params else (), cursor=cursor)
|
|
|
|
@classmethod
|
|
async def get_next_case_number(cls, bot: Red, guild_id: int, cursor: Cursor | None = None) -> int:
|
|
result = await cls.get_latest(bot=bot, guild_id=guild_id, cursor=cursor, limit=1)
|
|
return (result[0].moderation_id + 1) if result else 1
|
|
|
|
@classmethod
|
|
async def find_by_id(cls, bot: Red, moderation_id: int, guild_id: int, cursor: Cursor | None = None) -> "Moderation":
|
|
query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
|
|
case = await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderation_id,), cursor=cursor)
|
|
if case:
|
|
return case[0]
|
|
raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")
|
|
|
|
@classmethod
|
|
async def find_by_target(cls, bot: Red, guild_id: int, target: int, before: datetime = None, after: datetime = None, types: Iterable[Type] | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
|
|
query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?"
|
|
params = [target]
|
|
if types:
|
|
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
|
|
for t in types:
|
|
params.append(t.key)
|
|
if before:
|
|
query += " AND timestamp < ?"
|
|
params.append(int(before.timestamp()))
|
|
if after:
|
|
query += " AND timestamp > ?"
|
|
params.append(int(after.timestamp()))
|
|
query += " ORDER BY moderation_id DESC;"
|
|
|
|
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=params, cursor=cursor)
|
|
|
|
@classmethod
|
|
async def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, before: datetime = None, after: datetime = None, types: Iterable[Type] | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
|
|
query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?"
|
|
params = [moderator]
|
|
if types:
|
|
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
|
|
for t in types:
|
|
params.append(t.key)
|
|
if before:
|
|
query += " AND timestamp < ?"
|
|
params.append(int(before.timestamp()))
|
|
if after:
|
|
query += " AND timestamp > ?"
|
|
params.append(int(after.timestamp()))
|
|
query += " ORDER BY moderation_id DESC;"
|
|
|
|
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=params, cursor=cursor)
|
|
|
|
@classmethod
|
|
async def log(
|
|
cls,
|
|
bot: Red,
|
|
guild_id: int,
|
|
moderator_id: int,
|
|
moderation_type: Type,
|
|
target_type: str,
|
|
target_id: int,
|
|
role_id: int | None = None,
|
|
duration: timedelta | None = None,
|
|
reason: str | None = None,
|
|
database: sqlite3.Connection | None = None,
|
|
timestamp: datetime | None = None,
|
|
resolved: bool = False,
|
|
resolved_by: int | None = None,
|
|
resolved_reason: str | None = None,
|
|
expired: bool | None = None,
|
|
changes: list | None = None,
|
|
metadata: dict | None = None,
|
|
return_obj: bool = True,
|
|
) -> Union["Moderation", int]:
|
|
"""Logs a moderation case in the database.
|
|
|
|
Args:
|
|
bot (Red): The bot instance.
|
|
guild_id (int): The ID of the guild to log the case in.
|
|
moderator_id (int): The ID of the moderator who issued the case.
|
|
moderation_type (Type): The type of moderation case. See `aurora.models.moderation_types` for the built-in options.
|
|
target_type (str): The type of target. Should be either `user` or `channel`.
|
|
target_id (int): The ID of the target.
|
|
role_id (int): The ID of the role, if applicable.
|
|
duration (timedelta): The duration of the case, if applicable.
|
|
reason (str): The reason for the case.
|
|
database (sqlite3.Connection): The database connection to use to log the case. A connection will be automatically created if not provided.
|
|
timestamp (datetime): The timestamp of the case. Will be automatically generated if not provided.
|
|
resolved (bool): Whether the case is resolved.
|
|
resolved_by (int): The ID of the user who resolved the case.
|
|
resolved_reason (str): The reason the case was resolved.
|
|
expired (bool): Whether the case is expired.
|
|
changes (list): A list of changes to log. You usually shouldn't pass this, as it's automatically generated by the `/edit` and `/resolve` commands.
|
|
metadata (dict): A dictionary of metadata to store with the case.
|
|
return_obj (bool): Whether to return the case object. Defaults to `True`. If `False`, returns the case ID.
|
|
|
|
Returns:
|
|
Union[Moderation, int]: The `Moderation` object if `return_obj` is `True`, otherwise the case ID.
|
|
"""
|
|
from ..utilities.json import dumps
|
|
if not timestamp:
|
|
timestamp = datetime.fromtimestamp(time())
|
|
elif not isinstance(timestamp, datetime):
|
|
timestamp = datetime.fromtimestamp(timestamp)
|
|
|
|
if duration == "NULL":
|
|
duration = None
|
|
|
|
if duration is not None:
|
|
end_timestamp = timestamp + duration
|
|
else:
|
|
duration = None
|
|
end_timestamp = None
|
|
|
|
if not expired:
|
|
if end_timestamp:
|
|
expired = bool(timestamp > end_timestamp)
|
|
else:
|
|
expired = False
|
|
|
|
if reason == "NULL":
|
|
reason = None
|
|
|
|
if resolved_by in ["NULL", "?"]:
|
|
resolved_by = None
|
|
|
|
if resolved_reason == "NULL":
|
|
resolved_reason = None
|
|
|
|
if role_id == 0:
|
|
role_id = None
|
|
|
|
if not database:
|
|
database = await cls.connect()
|
|
close_db = True
|
|
else:
|
|
close_db = False
|
|
|
|
moderation_id = await cls.get_next_case_number(bot=bot, guild_id=guild_id)
|
|
|
|
case = {
|
|
"moderation_id": moderation_id,
|
|
"timestamp": timestamp.timestamp(),
|
|
"moderation_type": moderation_type.key,
|
|
"target_type": target_type,
|
|
"target_id": target_id,
|
|
"moderator_id": moderator_id,
|
|
"role_id": role_id,
|
|
"duration": timedelta_to_string(duration) if duration else None,
|
|
"end_timestamp": end_timestamp.timestamp() if end_timestamp else None,
|
|
"reason": reason,
|
|
"resolved": resolved,
|
|
"resolved_by": resolved_by,
|
|
"resolve_reason": resolved_reason,
|
|
"expired": expired,
|
|
"changes": dumps(changes),
|
|
"metadata": dumps(metadata)
|
|
}
|
|
|
|
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
|
await database.execute(sql, tuple(case.values()))
|
|
|
|
await database.commit()
|
|
if close_db:
|
|
await database.close()
|
|
|
|
logger.verbose(
|
|
"Row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
|
|
guild_id,
|
|
case["moderation_id"],
|
|
case["timestamp"],
|
|
case["moderation_type"],
|
|
case["target_type"],
|
|
case["target_id"],
|
|
case["moderator_id"],
|
|
case["role_id"],
|
|
case["duration"],
|
|
case["end_timestamp"],
|
|
case["reason"],
|
|
case["resolved"],
|
|
case["resolved_by"],
|
|
case["resolve_reason"],
|
|
case["expired"],
|
|
case["changes"],
|
|
case["metadata"],
|
|
)
|
|
|
|
if return_obj:
|
|
return await cls.find_by_id(bot=bot, moderation_id=moderation_id, guild_id=guild_id)
|
|
return moderation_id
|