import json
import sqlite3
from datetime import datetime, timedelta
from time import time
from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union

import discord
from aiosqlite import Cursor
from discord import NotFound
from redbot.core.bot import Red

from ..utilities.logger import logger
from ..utilities.utils import timedelta_to_string
from .base import AuroraGuildModel
from .change import Change
from .partials import PartialChannel, PartialRole, PartialUser


def _placeholders(count: int) -> str:
    """Return ``count`` comma-separated ``?`` SQL placeholders."""
    return ", ".join("?" for _ in range(count))


def _parse_duration(raw: str) -> timedelta:
    """Parse a stored duration string into a ``timedelta``.

    Accepts ``H:MM:SS`` as well as the ``str(timedelta)`` form
    ``"N day(s), H:MM:SS[.ffffff]"``, which earlier versions of
    ``Moderation.update`` could write for durations of a day or more.

    Raises:
        ValueError: If ``raw`` matches neither form.
    """
    days = 0
    clock = raw
    if "," in raw:
        day_part, clock = raw.split(",", 1)
        days = int(day_part.split()[0])
    hours, minutes, seconds = clock.strip().split(":")
    return timedelta(days=days, hours=int(hours), minutes=int(minutes), seconds=float(seconds))


class Moderation(AuroraGuildModel):
    """A moderation case stored as one row of a guild's ``moderation_<guild_id>`` table."""

    # NOTE(review): ``bot`` and ``guild_id`` are read throughout this class but not
    # declared here; presumably they come from AuroraGuildModel - confirm.
    moderation_id: int
    timestamp: datetime
    moderation_type: str
    # "USER" targets resolve to a PartialUser; any other value is treated as a channel.
    target_type: str
    target_id: int
    moderator_id: int
    role_id: Optional[int] = None
    duration: Optional[timedelta] = None
    end_timestamp: Optional[datetime] = None
    reason: Optional[str] = None
    resolved: bool
    resolved_by: Optional[int] = None
    resolve_reason: Optional[str] = None
    expired: bool
    changes: List["Change"]
    metadata: Dict

    @property
    def id(self) -> int:
        """Alias for ``moderation_id``."""
        return self.moderation_id

    @property
    def type(self) -> str:
        """Alias for ``moderation_type``."""
        return self.moderation_type

    @property
    def unix_timestamp(self) -> int:
        """The case timestamp as whole seconds since the epoch."""
        return int(self.timestamp.timestamp())

    async def get_moderator(self) -> "PartialUser":
        """Fetch the moderator who created this case."""
        return await PartialUser.from_id(self.bot, self.moderator_id)

    async def get_target(self) -> Union["PartialUser", "PartialChannel"]:
        """Fetch the target: a user when ``target_type`` is ``"USER"``, otherwise a channel."""
        if self.target_type == "USER":
            return await PartialUser.from_id(self.bot, self.target_id)
        return await PartialChannel.from_id(self.bot, self.target_id)

    async def get_resolved_by(self) -> Optional["PartialUser"]:
        """Fetch the user who resolved this case, or ``None`` if ``resolved_by`` is unset."""
        if self.resolved_by:
            return await PartialUser.from_id(self.bot, self.resolved_by)
        return None

    async def get_role(self) -> Optional["PartialRole"]:
        """Fetch the role attached to this case, or ``None`` if ``role_id`` is unset."""
        if self.role_id:
            return await PartialRole.from_id(self.bot, self.guild_id, self.role_id)
        return None

    def __str__(self) -> str:
        return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"

    def __int__(self) -> int:
        return self.moderation_id

    async def resolve(self, resolved_by: int, reason: str) -> None:
        """Mark this case as resolved, undo its effect on Discord, and persist it.

        A MUTE case has the member's timeout cleared; BAN and TEMPBAN cases have
        the target unbanned. A RESOLVE entry is appended to ``changes`` (preceded
        by an ORIGINAL entry if the case had no changes yet) and the row is saved
        through ``update()``.

        Args:
            resolved_by: ID of the user resolving the case.
            reason: Reason for the resolution; may be empty.

        Raises:
            ValueError: If the case is already resolved.
            TypeError: If the case is an UNMUTE or UNBAN case.
        """
        # Validate everything before touching state, so a rejected call does not
        # leave the object half-resolved.
        if self.resolved:
            raise ValueError("Case is already resolved!")
        if self.type in ["UNMUTE", "UNBAN"]:
            raise TypeError("Cannot resolve an unmute or unban case!")

        self.resolved = True
        self.resolved_by = resolved_by
        self.resolve_reason = reason

        audit_reason = f"Case {self.moderation_id} resolved by {resolved_by}{' for ' + reason if reason else ''}"

        if self.type == "MUTE":
            try:
                guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
                member = await guild.fetch_member(self.target_id)
                await member.timeout(None, reason=audit_reason)
            except NotFound:
                # Best effort: the guild or member no longer exists, nothing to undo.
                pass

        if self.type in ["BAN", "TEMPBAN"]:
            try:
                guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
                await guild.unban(await self.get_target(), reason=audit_reason)
            except NotFound:
                # Best effort: the guild or the ban no longer exists.
                pass

        if not self.changes:
            # Record the original state first so the history stays complete.
            self.changes.append(Change.from_dict(self.bot, {
                "type": "ORIGINAL",
                "timestamp": self.timestamp,
                "reason": self.reason,
                "user_id": self.moderator_id,
                "duration": self.duration,
                "end_timestamp": self.end_timestamp,
            }))
        self.changes.append(Change.from_dict(self.bot, {
            "type": "RESOLVE",
            "timestamp": datetime.now(),
            "reason": reason,
            "user_id": resolved_by,
        }))

        await self.update()

    async def update(self) -> None:
        """Write this case's current field values back to its database row."""
        from ..utilities.database import connect
        from ..utilities.json import dumps

        query = f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? WHERE moderation_id = ?;"

        parameters = (
            self.timestamp.timestamp(),
            self.moderation_type,
            self.target_type,
            self.moderator_id,
            self.role_id,
            # Same serialiser as log(), so from_result() can read the value back;
            # str(timedelta) yields "N days, H:MM:SS" for durations of a day or more.
            timedelta_to_string(self.duration) if self.duration else None,
            self.end_timestamp.timestamp() if self.end_timestamp else None,
            self.reason,
            self.resolved,
            self.resolved_by,
            self.resolve_reason,
            self.expired,
            dumps(self.changes),
            dumps(self.metadata),
            self.moderation_id,
        )

        database = await connect()
        try:
            await database.execute(query, parameters)
            await database.commit()
        finally:
            # Release the connection even if the statement fails.
            await database.close()

        logger.debug(
            "Row updated in moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
            self.guild_id,
            self.moderation_id,
            # Every bound value except the trailing WHERE parameter (the id above).
            *parameters[:-1],
        )

    @classmethod
    def from_dict(cls, bot: Red, data: dict) -> "Moderation":
        """Build a case from a mapping of field names to values."""
        return cls(bot=bot, **data)

    @classmethod
    def from_result(cls, bot: Red, result: Sequence, guild_id: int) -> "Moderation":
        """Build a case from one database row.

        Args:
            bot: The bot instance handed to the model.
            result: The row, indexed by position: 0 moderation_id, 1 timestamp,
                2 moderation_type, 3 target_type, 4 target_id, 5 moderator_id,
                6 role_id, 7 duration, 8 end_timestamp, 9 reason, 10 resolved,
                11 resolved_by, 12 resolve_reason, 13 expired, 14 changes (JSON),
                15 metadata (JSON).
            guild_id: ID of the guild whose table the row came from.

        Raises:
            ValueError: If the stored duration string cannot be parsed.
        """
        raw_duration = result[7]
        if raw_duration is not None and raw_duration != "NULL":
            duration = _parse_duration(raw_duration)
        else:
            duration = None

        # A NULL column or a JSON "null" both mean "no changes".
        raw_changes = json.loads(result[14]) if result[14] is not None else None
        change_obj_list = [Change.from_dict(bot=bot, data=change) for change in raw_changes or []]

        metadata = json.loads(result[15]) if result[15] is not None else {}

        case = {
            "moderation_id": int(result[0]),
            "guild_id": int(guild_id),
            "timestamp": datetime.fromtimestamp(result[1]),
            "moderation_type": str(result[2]),
            "target_type": str(result[3]),
            "target_id": int(result[4]),
            "moderator_id": int(result[5]),
            "role_id": int(result[6]) if result[6] is not None else None,
            "duration": duration,
            "end_timestamp": datetime.fromtimestamp(result[8]) if result[8] is not None else None,
            "reason": result[9],
            "resolved": bool(result[10]),
            "resolved_by": result[11],
            "resolve_reason": result[12],
            "expired": bool(result[13]),
            "changes": change_obj_list,
            "metadata": metadata or {},
        }
        return cls.from_dict(bot=bot, data=case)

    @classmethod
    async def execute(cls, bot: Red, guild_id: int, query: str, parameters: tuple | None = None, cursor: Cursor | None = None) -> Tuple["Moderation", ...]:
        """Run a SELECT-style query and convert each row into a ``Moderation``.

        Args:
            bot: The bot instance handed to each model.
            guild_id: ID of the guild the rows belong to.
            query: SQL to execute.
            parameters: Values bound to the query's placeholders.
            cursor: Existing cursor to run on. When omitted, a connection and a
                cursor are opened for this call and closed afterwards.

        Returns:
            The matching cases, skipping any row whose ``moderation_id`` is 0.
            Empty when nothing matched.
        """
        from ..utilities.database import connect

        logger.trace("Executing query: %s", query)
        logger.trace("With parameters: %s", parameters)
        if not parameters:
            parameters = ()

        if cursor is not None:
            # Caller owns the cursor; leave its lifetime alone.
            await cursor.execute(query, parameters)
            results = await cursor.fetchall()
        else:
            database = await connect()
            try:
                own_cursor = await database.cursor()
                try:
                    await own_cursor.execute(query, parameters)
                    results = await own_cursor.fetchall()
                finally:
                    await own_cursor.close()
            finally:
                await database.close()

        cases = (cls.from_result(bot=bot, result=row, guild_id=guild_id) for row in results)
        return tuple(case for case in cases if case.moderation_id != 0)

    @classmethod
    async def get_latest(cls, bot: Red, guild_id: int, limit: int | None = None, offset: int = 0, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation", ...]:
        """Return a guild's cases, newest (highest ``moderation_id``) first.

        Args:
            bot: The bot instance handed to each model.
            guild_id: ID of the guild to query.
            limit: Maximum number of cases; no limit when falsy.
            offset: Rows to skip. Only applied together with ``limit``.
            types: If given, restrict results to these moderation types.
            cursor: Optional existing cursor to run the query on.
        """
        # Materialise once: ``types`` may be a one-shot iterable.
        type_values = tuple(types) if types else ()
        params: list = []

        query = f"SELECT * FROM moderation_{guild_id}"
        if type_values:
            # WHERE has to come before ORDER BY for the statement to be valid.
            query += f" WHERE moderation_type IN ({_placeholders(len(type_values))})"
            params.extend(type_values)
        query += " ORDER BY moderation_id DESC"
        if limit:
            query += " LIMIT ? OFFSET ?"
            params.extend((limit, offset))
        query += ";"

        return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=tuple(params), cursor=cursor)

    @classmethod
    async def get_next_case_number(cls, bot: Red, guild_id: int, cursor: Cursor | None = None) -> int:
        """Return the latest ``moderation_id`` plus one, or 1 if the guild has no cases."""
        result = await cls.get_latest(bot=bot, guild_id=guild_id, cursor=cursor, limit=1)
        return (result[0].moderation_id + 1) if result else 1

    @classmethod
    async def find_by_id(cls, bot: Red, moderation_id: int, guild_id: int, cursor: Cursor | None = None) -> "Moderation":
        """Return the case with the given ID.

        Raises:
            ValueError: If no such case exists in the guild's table.
        """
        query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
        case = await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderation_id,), cursor=cursor)
        if case:
            return case[0]
        raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")

    @classmethod
    async def find_by_target(cls, bot: Red, guild_id: int, target: int, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation", ...]:
        """Return all cases against ``target``, newest first, optionally filtered by type."""
        # Materialise once: ``types`` may be a one-shot iterable.
        type_values = tuple(types) if types else ()

        query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?"
        if type_values:
            query += f" AND moderation_type IN ({_placeholders(len(type_values))})"
        query += " ORDER BY moderation_id DESC;"

        return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(target, *type_values), cursor=cursor)

    @classmethod
    async def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation", ...]:
        """Return all cases issued by ``moderator``, newest first, optionally filtered by type."""
        # Materialise once: ``types`` may be a one-shot iterable.
        type_values = tuple(types) if types else ()

        query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?"
        if type_values:
            query += f" AND moderation_type IN ({_placeholders(len(type_values))})"
        query += " ORDER BY moderation_id DESC;"

        return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderator, *type_values), cursor=cursor)

    @classmethod
    async def log(
        cls,
        bot: Red,
        guild_id: int,
        moderator_id: int,
        moderation_type: str,
        target_type: str,
        target_id: int,
        role_id: int | None = None,
        duration: timedelta | None = None,
        reason: str | None = None,
        database: sqlite3.Connection | None = None,
        timestamp: datetime | None = None,
        resolved: bool = False,
        resolved_by: int | None = None,
        resolved_reason: str | None = None,
        expired: bool | None = None,
        changes: list | None = None,
        metadata: dict | None = None,
        return_obj: bool = True,
    ) -> Union["Moderation", int]:
        """Insert a new case into the guild's moderation table.

        Placeholder values are normalised to ``None`` first: ``"NULL"`` for
        ``duration``, ``reason`` and ``resolved_reason``; ``"NULL"`` or ``"?"``
        for ``resolved_by``; ``0`` for ``role_id``.

        Args:
            bot: The bot instance.
            guild_id: ID of the guild the case belongs to.
            moderator_id: ID of the moderator issuing the case.
            moderation_type: Moderation type string stored with the case.
            target_type: Target type string stored with the case.
            target_id: ID of the target.
            role_id: ID of the role involved, if any.
            duration: Length of the moderation; sets ``end_timestamp``.
            reason: Reason for the moderation.
            database: Existing connection to use. It is committed but left open.
                When omitted, a connection is opened and closed by this call.
            timestamp: When the moderation happened, as a ``datetime`` or a unix
                timestamp. Defaults to now.
            resolved: Whether the case is already resolved.
            resolved_by: ID of the user who resolved the case.
            resolved_reason: Reason for the resolution.
            expired: Whether the case has expired. When falsy it is recomputed as
                ``timestamp > end_timestamp``, or ``False`` without an end timestamp.
            changes: Change history, stored as JSON.
            metadata: Extra data, stored as JSON.
            return_obj: Return the stored ``Moderation`` instead of just its ID.

        Returns:
            The new ``Moderation`` when ``return_obj`` is true, otherwise its ID.
        """
        from ..utilities.database import connect
        from ..utilities.json import dumps

        if not timestamp:
            timestamp = datetime.fromtimestamp(time())
        elif not isinstance(timestamp, datetime):
            timestamp = datetime.fromtimestamp(timestamp)

        if duration == "NULL":
            duration = None

        end_timestamp = timestamp + duration if duration is not None else None

        if not expired:
            expired = bool(timestamp > end_timestamp) if end_timestamp else False

        if reason == "NULL":
            reason = None

        if resolved_by in ["NULL", "?"]:
            resolved_by = None

        if resolved_reason == "NULL":
            resolved_reason = None

        if role_id == 0:
            role_id = None

        close_db = database is None
        if close_db:
            database = await connect()

        try:
            # Must be awaited: the bare call yields a coroutine, not an id.
            moderation_id = await cls.get_next_case_number(bot=bot, guild_id=guild_id)

            # Key order must match the column list in the INSERT below, because the
            # values are bound positionally.
            case = {
                "moderation_id": moderation_id,
                "timestamp": timestamp.timestamp(),
                "moderation_type": moderation_type,
                "target_type": target_type,
                "target_id": target_id,
                "moderator_id": moderator_id,
                "role_id": role_id,
                "duration": timedelta_to_string(duration) if duration else None,
                "end_timestamp": end_timestamp.timestamp() if end_timestamp else None,
                "reason": reason,
                "resolved": resolved,
                "resolved_by": resolved_by,
                "resolve_reason": resolved_reason,
                "expired": expired,
                "changes": dumps(changes),
                "metadata": dumps(metadata),
            }

            sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
            await database.execute(sql, tuple(case.values()))
            # Commit before closing so the insert is persisted.
            await database.commit()
        finally:
            # Only close a connection this call opened.
            if close_db:
                await database.close()

        logger.debug(
            "Row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
            guild_id,
            case["moderation_id"],
            case["timestamp"],
            case["moderation_type"],
            case["target_type"],
            case["target_id"],
            case["moderator_id"],
            case["role_id"],
            case["duration"],
            case["end_timestamp"],
            case["reason"],
            case["resolved"],
            case["resolved_by"],
            case["resolve_reason"],
            case["expired"],
            case["changes"],
            case["metadata"],
        )

        if return_obj:
            return await cls.find_by_id(bot=bot, moderation_id=moderation_id, guild_id=guild_id)
        return moderation_id