import json
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta
from time import time
from typing import Dict, Iterable, List, Optional, Union

import discord
from discord import NotFound
from redbot.core.bot import Red

from aurora.models.base import AuroraGuildModel
from aurora.models.change import Change
from aurora.models.partials import PartialChannel, PartialRole, PartialUser
from aurora.utilities.logger import logger
from aurora.utilities.utils import get_next_case_number


def _parse_duration(value: str) -> timedelta:
    """Parse a duration string produced by ``str(timedelta)``.

    Accepts ``"H:MM:SS"``, ``"H:MM:SS.ffffff"`` and the day-prefixed form
    ``"D day[s], H:MM:SS[.ffffff]"``. An hour component of 24 or more
    (e.g. ``"168:00:00"``) is accepted as well.

    Raises:
        ValueError: If ``value`` does not match any of the forms above.
    """
    day_part, separator, clock = value.rpartition(",")
    days = int(day_part.split()[0]) if separator else 0
    hours, minutes, seconds = clock.strip().split(":")
    return timedelta(
        days=days,
        hours=int(hours),
        minutes=int(minutes),
        seconds=float(seconds),
    )


class Moderation(AuroraGuildModel):
    """A single moderation case stored in a guild's ``moderation_<guild_id>`` table."""

    moderation_id: int
    timestamp: datetime
    moderation_type: str
    target_type: str
    target_id: int
    moderator_id: int
    role_id: Optional[int] = None
    duration: Optional[timedelta] = None
    end_timestamp: Optional[datetime] = None
    reason: Optional[str] = None
    resolved: bool
    resolved_by: Optional[int] = None
    resolve_reason: Optional[str] = None
    expired: bool
    changes: List["Change"]
    metadata: Dict

    @property
    def id(self) -> int:
        """Alias for ``moderation_id``."""
        return self.moderation_id

    @property
    def type(self) -> str:
        """Alias for ``moderation_type``."""
        return self.moderation_type

    @property
    def unix_timestamp(self) -> int:
        """The case timestamp as whole seconds since the epoch."""
        return int(self.timestamp.timestamp())

    async def get_moderator(self) -> "PartialUser":
        """Build a ``PartialUser`` for ``moderator_id``."""
        return await PartialUser.from_id(self.bot, self.moderator_id)

    async def get_target(self) -> Union["PartialUser", "PartialChannel"]:
        """Build the target: a ``PartialUser`` when ``target_type`` is ``"USER"``, otherwise a ``PartialChannel``."""
        if self.target_type == "USER":
            return await PartialUser.from_id(self.bot, self.target_id)
        return await PartialChannel.from_id(self.bot, self.target_id)

    async def get_resolved_by(self) -> Optional["PartialUser"]:
        """Build a ``PartialUser`` for ``resolved_by``, or return ``None`` when it is unset."""
        if self.resolved_by:
            return await PartialUser.from_id(self.bot, self.resolved_by)
        return None

    async def get_role(self) -> Optional["PartialRole"]:
        """Build a ``PartialRole`` for ``role_id``, or return ``None`` when it is unset."""
        if self.role_id:
            return await PartialRole.from_id(self.bot, self.guild_id, self.role_id)
        return None

    def __str__(self):
        return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"

    async def resolve(self, resolved_by: int, reason: str):
        """Resolve this case, undo its effect on Discord, and persist the result.

        MUTE cases have the member's timeout cleared; BAN and TEMPBAN cases
        have the target unbanned. A RESOLVE entry is appended to ``changes``
        (preceded by an ORIGINAL entry when the change list is empty) and the
        row is written back via :meth:`update`.

        Args:
            resolved_by: ID of the user resolving the case.
            reason: Reason for resolving; may be empty.

        Raises:
            ValueError: If the case is already resolved.
            TypeError: If the case is an UNMUTE or UNBAN case.
        """
        # Validate everything before touching instance state so a rejected
        # call leaves the case exactly as it was.
        if self.resolved:
            raise ValueError("Case is already resolved!")
        if self.type in ("UNMUTE", "UNBAN"):
            raise TypeError("Cannot resolve an unmute or unban case!")

        audit_reason = f"Case {self.moderation_id} resolved by {resolved_by}{' for ' + reason if reason else ''}"

        if self.type == "MUTE":
            try:
                guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
                member = await guild.fetch_member(self.target_id)
                await member.timeout(None, reason=audit_reason)
            except NotFound:
                # Best effort: nothing to undo if Discord cannot find the guild/member.
                pass
        elif self.type in ("BAN", "TEMPBAN"):
            try:
                guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
                await guild.unban(await self.get_target(), reason=audit_reason)
            except NotFound:
                # Best effort: nothing to undo if Discord cannot find the guild/ban.
                pass

        # Only mark the case resolved once the Discord side did not raise.
        self.resolved = True
        self.resolved_by = resolved_by
        self.resolve_reason = reason

        if not self.changes:
            self.changes.append(Change.from_dict(self.bot, {
                "type": "ORIGINAL",
                "timestamp": self.timestamp,
                "reason": self.reason,
                "user_id": self.moderator_id,
                "duration": self.duration,
                "end_timestamp": self.end_timestamp,
            }))
        self.changes.append(Change.from_dict(self.bot, {
            "type": "RESOLVE",
            "timestamp": datetime.now(),
            "reason": reason,
            "user_id": resolved_by,
        }))

        self.update()

    def update(self):
        """Write this case's current field values back to its database row.

        Every column except ``moderation_id`` and ``target_id`` is updated.
        """
        from aurora.utilities.database import connect
        from aurora.utilities.json import dumps

        query = (
            f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, "
            "moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, "
            "resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? "
            "WHERE moderation_id = ?;"
        )
        values = (
            self.timestamp.timestamp(),
            self.moderation_type,
            self.target_type,
            self.moderator_id,
            self.role_id,
            str(self.duration) if self.duration else None,
            self.end_timestamp.timestamp() if self.end_timestamp else None,
            self.reason,
            self.resolved,
            self.resolved_by,
            self.resolve_reason,
            self.expired,
            dumps(self.changes),
            dumps(self.metadata),
        )

        # `with database` commits or rolls back but does not close the
        # connection; `closing` makes sure it is released as well.
        with closing(connect()) as database:
            with database:
                with closing(database.cursor()) as cursor:
                    cursor.execute(query, (*values, self.moderation_id))

        logger.debug(
            "Row updated in moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
            self.guild_id,
            self.moderation_id,
            *values,
        )

    @classmethod
    def from_dict(cls, bot: Red, data: dict) -> "Moderation":
        """Construct a case from a mapping of field names to values."""
        return cls(bot=bot, **data)

    @classmethod
    def from_result(cls, bot: Red, result: Iterable, guild_id: int) -> "Moderation":
        """Construct a case from a database row.

        Args:
            bot: The bot instance handed to the model.
            result: A row that supports integer indexing, with columns in the
                order used by the INSERT statement in :meth:`log`.
            guild_id: ID of the guild whose table the row came from.
        """
        duration = _parse_duration(result[7]) if result[7] is not None else None

        # The stored JSON may decode to null; treat that like an empty list.
        raw_changes = (json.loads(result[14]) or []) if result[14] else []
        change_obj_list = [Change.from_dict(bot=bot, data=change) for change in raw_changes]

        if result[15]:
            # Undo over-escaped JSON (objects stored as quoted strings inside a list).
            cleaned = result[15].replace('\\"', '"').replace('["{', '[{').replace('}"]', '}]')
            metadata = json.loads(cleaned) or {}
        else:
            metadata = {}

        case = {
            "moderation_id": int(result[0]),
            "guild_id": int(guild_id),
            "timestamp": datetime.fromtimestamp(result[1]),
            "moderation_type": str(result[2]),
            "target_type": str(result[3]),
            "target_id": int(result[4]),
            "moderator_id": int(result[5]),
            "role_id": int(result[6]) if result[6] is not None else None,
            "duration": duration,
            "end_timestamp": datetime.fromtimestamp(result[8]) if result[8] is not None else None,
            "reason": result[9],
            "resolved": bool(result[10]),
            "resolved_by": result[11],
            "resolve_reason": result[12],
            "expired": bool(result[13]),
            "changes": change_obj_list,
            "metadata": metadata,
        }
        return cls.from_dict(bot=bot, data=case)

    @classmethod
    def from_sql(cls, bot: Red, moderation_id: int, guild_id: int) -> "Moderation":
        """Load a case from the guild's moderation table.

        Raises:
            ValueError: If no row matches, or if ``moderation_id`` is 0.
        """
        from aurora.utilities.database import connect

        query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"

        with closing(connect()) as database:
            with closing(database.cursor()) as cursor:
                cursor.execute(query, (moderation_id,))
                result = cursor.fetchone()

        if result and not moderation_id == 0:
            return cls.from_result(bot, result, guild_id)
        raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")

    @classmethod
    def log(
        cls,
        bot: Red,
        guild_id: int,
        moderator_id: int,
        moderation_type: str,
        target_type: str,
        target_id: int,
        role_id: int,
        duration: timedelta = None,
        reason: str = None,
        database: sqlite3.Connection = None,
        timestamp: datetime = None,
        resolved: bool = False,
        resolved_by: int = None,
        resolved_reason: str = None,
        expired: bool = None,
        changes: list = None,
        metadata: dict = None,
    ) -> "Moderation":
        """Insert a new case into the guild's moderation table and return it.

        The string ``"NULL"`` for ``duration``, ``reason``, ``resolved_by`` or
        ``resolved_reason`` is treated as ``None``, and a ``role_id`` of 0 is
        stored as ``None``.

        Args:
            database: Connection to use. When omitted, a connection is opened
                and closed by this method. The insert is committed either way.
            timestamp: Case time as a ``datetime`` or a POSIX timestamp;
                defaults to the current time.
            expired: When falsy, recomputed as ``timestamp > end_timestamp``
                (``False`` when the case has no end timestamp).

        Returns:
            The stored case, re-read from the database.
        """
        from aurora.utilities.database import connect
        from aurora.utilities.json import dumps

        if not timestamp:
            timestamp = datetime.fromtimestamp(time())
        elif not isinstance(timestamp, datetime):
            timestamp = datetime.fromtimestamp(timestamp)

        if duration != "NULL" and duration is not None:
            end_timestamp = timestamp + duration
        else:
            duration = None
            end_timestamp = None

        if not expired:
            # Comparing against a missing end timestamp would raise TypeError.
            expired = end_timestamp is not None and timestamp > end_timestamp

        if reason == "NULL":
            reason = None

        if resolved_by == "NULL":
            resolved_by = None

        if resolved_reason == "NULL":
            resolved_reason = None

        if role_id == 0:
            role_id = None

        close_db = not database
        if close_db:
            database = connect()

        sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"

        try:
            cursor = database.cursor()
            try:
                moderation_id = get_next_case_number(guild_id=guild_id, cursor=cursor)

                # Key order must match the column order in `sql`.
                case = {
                    "moderation_id": moderation_id,
                    "timestamp": timestamp.timestamp(),
                    "moderation_type": moderation_type,
                    "target_type": target_type,
                    "target_id": target_id,
                    "moderator_id": moderator_id,
                    "role_id": role_id,
                    "duration": str(duration) if duration else None,
                    "end_timestamp": end_timestamp.timestamp() if end_timestamp else None,
                    "reason": reason,
                    "resolved": resolved,
                    "resolved_by": resolved_by,
                    "resolve_reason": resolved_reason,
                    "expired": expired,
                    "changes": dumps(changes),
                    "metadata": dumps(metadata),
                }
                cursor.execute(sql, tuple(case.values()))
            finally:
                cursor.close()
            database.commit()
        finally:
            # Only close a connection this method opened itself.
            if close_db:
                database.close()

        logger.debug(
            "Row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
            guild_id,
            *case.values(),
        )

        return cls.from_sql(bot=bot, moderation_id=moderation_id, guild_id=guild_id)