# SeaCogs/aurora/models/moderation.py
# (file-viewer header: 396 lines, 15 KiB, Python)

import json
import sqlite3
from datetime import datetime, timedelta
from time import time
from typing import Dict, Iterable, List, Optional, Tuple, Union
import discord
from aiosqlite import Cursor
from discord import NotFound
from redbot.core.bot import Red
from ..utilities.logger import logger
from ..utilities.utils import timedelta_to_string
from .base import AuroraGuildModel
from .change import Change
from .partials import PartialChannel, PartialRole, PartialUser
class Moderation(AuroraGuildModel):
    """One moderation case, backed by a row of a guild's ``moderation_<guild_id>`` table.

    Instances are normally built from database rows by :meth:`from_result`
    (via :meth:`execute` and the ``find_*`` / ``get_*`` helpers); new cases are
    written with :meth:`log`.

    ``self.bot`` and ``self.guild_id`` are used throughout but are not declared
    in this class -- presumably they are provided by ``AuroraGuildModel``;
    confirm against that base class.
    """

    # Case number; used as the key in every ``WHERE moderation_id = ?`` query.
    moderation_id: int
    # When the case was created.
    timestamp: datetime
    # Case kind; the values this class itself checks are "MUTE", "BAN",
    # "TEMPBAN", "UNMUTE" and "UNBAN".
    moderation_type: str
    # "USER" makes get_target() resolve a user; anything else resolves a channel.
    target_type: str
    target_id: int
    moderator_id: int
    role_id: Optional[int] = None
    # log() stores end_timestamp as timestamp + duration.
    duration: Optional[timedelta] = None
    end_timestamp: Optional[datetime] = None
    reason: Optional[str] = None
    resolved: bool
    # User id of whoever resolved the case, if it was resolved.
    resolved_by: Optional[int] = None
    resolve_reason: Optional[str] = None
    expired: bool
    # Edit history; resolve() appends "ORIGINAL" and "RESOLVE" entries.
    changes: List["Change"]
    # Free-form JSON object stored alongside the case.
    metadata: Dict

    @property
    def id(self) -> int:
        """Alias for ``moderation_id``."""
        return self.moderation_id

    @property
    def type(self) -> str:
        """Alias for ``moderation_type``."""
        return self.moderation_type

    @property
    def unix_timestamp(self) -> int:
        """Creation time as whole seconds since the Unix epoch."""
        return int(self.timestamp.timestamp())

    async def get_moderator(self) -> "PartialUser":
        """Look up the moderator of this case as a ``PartialUser``."""
        return await PartialUser.from_id(self.bot, self.moderator_id)

    async def get_target(self) -> Union["PartialUser", "PartialChannel"]:
        """Look up the target: a ``PartialUser`` for "USER" targets, else a ``PartialChannel``."""
        if self.target_type == "USER":
            return await PartialUser.from_id(self.bot, self.target_id)
        return await PartialChannel.from_id(self.bot, self.target_id)

    async def get_resolved_by(self) -> Optional["PartialUser"]:
        """Look up the resolving user, or return ``None`` when ``resolved_by`` is unset."""
        if self.resolved_by:
            return await PartialUser.from_id(self.bot, self.resolved_by)
        return None

    async def get_role(self) -> Optional["PartialRole"]:
        """Look up the role attached to this case, or return ``None`` when ``role_id`` is unset."""
        if self.role_id:
            return await PartialRole.from_id(self.bot, self.guild_id, self.role_id)
        return None

    def __str__(self) -> str:
        return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"

    def __int__(self) -> int:
        return self.moderation_id

    async def resolve(self, resolved_by: int, reason: str) -> None:
        """Mark the case resolved, undo the punishment on Discord, and save it.

        For "MUTE" cases the member's timeout is cleared; for "BAN" and
        "TEMPBAN" cases the target is unbanned.  A ``NotFound`` from Discord
        during either step is ignored (best effort).  A "RESOLVE" entry is then
        appended to ``changes`` (preceded by an "ORIGINAL" snapshot if the
        history was empty) and the row is written with :meth:`update`.

        Args:
            resolved_by: User id of the person resolving the case.
            reason: Why the case is being resolved; may be empty.

        Raises:
            ValueError: The case is already resolved.
            TypeError: The case is an "UNMUTE" or "UNBAN" case.
        """
        if self.resolved:
            raise ValueError("Case is already resolved!")
        # Validate before touching any state, so a rejected call does not leave
        # this instance flagged as resolved.
        if self.type in ("UNMUTE", "UNBAN"):
            raise TypeError("Cannot resolve an unmute or unban case!")

        self.resolved = True
        self.resolved_by = resolved_by
        self.resolve_reason = reason

        audit_reason = f"Case {self.moderation_id} resolved by {resolved_by}{' for '+ reason if reason else ''}"

        if self.type == "MUTE":
            try:
                guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
                member = await guild.fetch_member(self.target_id)
                # Passing None as the timeout clears it.
                await member.timeout(None, reason=audit_reason)
            except NotFound:
                pass

        if self.type in ("BAN", "TEMPBAN"):
            try:
                guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
                await guild.unban(await self.get_target(), reason=audit_reason)
            except NotFound:
                pass

        if not self.changes:
            # First edit of this case: snapshot the original values so the
            # history starts from the unmodified state.
            self.changes.append(Change.from_dict(self.bot, {
                "type": "ORIGINAL",
                "timestamp": self.timestamp,
                "reason": self.reason,
                "user_id": self.moderator_id,
                "duration": self.duration,
                "end_timestamp": self.end_timestamp,
            }))
        self.changes.append(Change.from_dict(self.bot, {
            "type": "RESOLVE",
            "timestamp": datetime.now(),
            "reason": reason,
            "user_id": resolved_by,
        }))

        await self.update()

    async def update(self) -> None:
        """Write this case's current field values back to its database row.

        The row is selected by ``moderation_id``.  ``target_id`` is not part of
        the ``UPDATE`` statement (NOTE(review): confirm that is intentional).
        """
        from ..utilities.database import connect
        from ..utilities.json import dumps

        query = f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? WHERE moderation_id = ?;"

        # Use the same duration serializer as log(): from_result() parses
        # "H:M:S", whereas str(timedelta) yields "N day(s), H:MM:SS" for
        # durations of a day or more.
        duration = timedelta_to_string(self.duration) if self.duration else None
        end_timestamp = self.end_timestamp.timestamp() if self.end_timestamp else None
        changes = dumps(self.changes)
        metadata = dumps(self.metadata)

        database = await connect()
        try:
            await database.execute(query, (
                self.timestamp.timestamp(),
                self.moderation_type,
                self.target_type,
                self.moderator_id,
                self.role_id,
                duration,
                end_timestamp,
                self.reason,
                self.resolved,
                self.resolved_by,
                self.resolve_reason,
                self.expired,
                changes,
                metadata,
                self.moderation_id,
            ))
            await database.commit()
        finally:
            # Always release the connection, even if the statement failed.
            await database.close()

        # The first placeholder is the table suffix, i.e. the guild id.
        logger.debug("Row updated in moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
            self.guild_id,
            self.moderation_id,
            self.timestamp.timestamp(),
            self.moderation_type,
            self.target_type,
            self.moderator_id,
            self.role_id,
            duration,
            end_timestamp,
            self.reason,
            self.resolved,
            self.resolved_by,
            self.resolve_reason,
            self.expired,
            changes,
            metadata,
        )

    @classmethod
    def from_dict(cls, bot: Red, data: dict) -> "Moderation":
        """Build a case from a mapping of field names to values."""
        return cls(bot=bot, **data)

    @classmethod
    def from_result(cls, bot: Red, result: Iterable, guild_id: int) -> "Moderation":
        """Build a case from one database row.

        Args:
            bot: Bot instance handed on to the model and its ``Change`` entries.
            result: The row; it is indexed by position, in the column order
                used by :meth:`log`'s ``INSERT`` (``moderation_id`` at 0 through
                ``metadata`` at 15).
            guild_id: Guild the row belongs to.
        """
        raw_duration = result[7]
        if raw_duration is not None and raw_duration != "NULL":
            clock = str(raw_duration)
            days = 0
            if "," in clock:
                # Legacy rows written with str(timedelta): "1 day, 2:03:04".
                day_part, clock = clock.split(",", 1)
                days = int(day_part.split()[0])
            hours, minutes, seconds = clock.strip().split(":")
            # float() also accepts the fractional seconds str(timedelta) can emit.
            duration = timedelta(days=days, hours=int(hours), minutes=int(minutes), seconds=float(seconds))
        else:
            duration = None

        # Both JSON columns may be NULL or the JSON literal null; treat either
        # as "no changes" / "no metadata".
        raw_changes = json.loads(result[14]) if result[14] is not None else None
        change_obj_list = [Change.from_dict(bot=bot, data=change) for change in (raw_changes or [])]
        metadata = json.loads(result[15]) if result[15] is not None else None

        case = {
            "moderation_id": int(result[0]),
            "guild_id": int(guild_id),
            "timestamp": datetime.fromtimestamp(result[1]),
            "moderation_type": str(result[2]),
            "target_type": str(result[3]),
            "target_id": int(result[4]),
            "moderator_id": int(result[5]),
            "role_id": int(result[6]) if result[6] is not None else None,
            "duration": duration,
            "end_timestamp": datetime.fromtimestamp(result[8]) if result[8] is not None else None,
            "reason": result[9],
            "resolved": bool(result[10]),
            "resolved_by": result[11],
            "resolve_reason": result[12],
            "expired": bool(result[13]),
            "changes": change_obj_list,
            "metadata": metadata or {},
        }
        return cls.from_dict(bot=bot, data=case)

    @staticmethod
    def _type_filter(types: Iterable | None) -> Tuple[str, tuple]:
        """Return ``("moderation_type IN (?, ...)", values)`` for *types*.

        *types* is materialised once so one-shot iterables (generators) work;
        an empty or missing filter yields ``("", ())``.
        """
        values = tuple(types) if types is not None else ()
        if not values:
            return "", ()
        return f"moderation_type IN ({', '.join('?' * len(values))})", values

    @classmethod
    async def execute(cls, bot: Red, guild_id: int, query: str, parameters: tuple | None = None, cursor: Cursor | None = None, return_obj: bool = True) -> Tuple["Moderation", ...]:
        """Run *query* and convert the resulting rows into cases.

        Args:
            bot: Bot instance handed to every created case.
            guild_id: Guild whose table is being queried.
            query: SQL text; it should select full rows, because they are fed
                to :meth:`from_result`.
            parameters: Values bound to the query's placeholders.
            cursor: Cursor to reuse.  When omitted, a connection and cursor
                are opened for this call and closed again afterwards.
            return_obj: When false, the query is still executed but an empty
                tuple is returned.

        Returns:
            The matching cases, excluding any row whose ``moderation_id`` is 0
            (presumably a placeholder row -- confirm against the table setup).
        """
        from ..utilities.database import connect

        logger.trace("Executing query: %s", query)
        logger.trace("With parameters: %s", parameters)
        if not parameters:
            parameters = ()

        database = None
        owned_cursor = None
        try:
            if not cursor:
                database = await connect()
                cursor = owned_cursor = await database.cursor()
            await cursor.execute(query, parameters)
            results = await cursor.fetchall()
        finally:
            # Only close what this call opened; a caller-supplied cursor stays open.
            if owned_cursor is not None:
                await owned_cursor.close()
            if database is not None:
                await database.close()

        if not (results and return_obj):
            return ()
        cases = (cls.from_result(bot=bot, result=result, guild_id=guild_id) for result in results)
        return tuple(case for case in cases if case.moderation_id != 0)

    @classmethod
    async def get_latest(cls, bot: Red, guild_id: int, limit: int | None = None, offset: int = 0, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation", ...]:
        """Return a guild's cases, newest (highest ``moderation_id``) first.

        Args:
            limit: Maximum number of rows; no limit when falsy.
            offset: Rows to skip; only applied together with *limit*.
            types: Optional ``moderation_type`` values to restrict the result to.
            cursor: Optional cursor to reuse (see :meth:`execute`).
        """
        params = []
        query = f"SELECT * FROM moderation_{guild_id}"
        clause, type_params = cls._type_filter(types)
        if clause:
            # WHERE has to precede ORDER BY in a SELECT statement.
            query += f" WHERE {clause}"
            params.extend(type_params)
        query += " ORDER BY moderation_id DESC"
        if limit:
            query += " LIMIT ? OFFSET ?"
            params.extend((limit, offset))
        query += ";"
        return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=tuple(params), cursor=cursor)

    @classmethod
    async def get_next_case_number(cls, bot: Red, guild_id: int, cursor: Cursor | None = None) -> int:
        """Return the latest case's id plus one, or 1 when no case is found."""
        result = await cls.get_latest(bot=bot, guild_id=guild_id, cursor=cursor, limit=1)
        return (result[0].moderation_id + 1) if result else 1

    @classmethod
    async def find_by_id(cls, bot: Red, moderation_id: int, guild_id: int, cursor: Cursor | None = None) -> "Moderation":
        """Return the case with the given id.

        Raises:
            ValueError: No such case was returned for the guild.
        """
        query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
        case = await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderation_id,), cursor=cursor)
        if case:
            return case[0]
        raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")

    @classmethod
    async def find_by_target(cls, bot: Red, guild_id: int, target: int, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation", ...]:
        """Return the cases against *target*, newest first, optionally filtered by type."""
        query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?"
        clause, type_params = cls._type_filter(types)
        if clause:
            query += f" AND {clause}"
        query += " ORDER BY moderation_id DESC;"
        return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(target, *type_params), cursor=cursor)

    @classmethod
    async def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation", ...]:
        """Return the cases issued by *moderator*, newest first, optionally filtered by type."""
        query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?"
        clause, type_params = cls._type_filter(types)
        if clause:
            query += f" AND {clause}"
        query += " ORDER BY moderation_id DESC;"
        return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderator, *type_params), cursor=cursor)

    @classmethod
    async def log(
        cls,
        bot: Red,
        guild_id: int,
        moderator_id: int,
        moderation_type: str,
        target_type: str,
        target_id: int,
        role_id: int | None = None,
        duration: timedelta | None = None,
        reason: str | None = None,
        database: sqlite3.Connection | None = None,
        timestamp: datetime | None = None,
        resolved: bool = False,
        resolved_by: int | None = None,
        resolved_reason: str | None = None,
        expired: bool | None = None,
        changes: list | None = None,
        metadata: dict | None = None,
        return_obj: bool = True,
    ) -> Union["Moderation", int]:
        """Insert a new case into the guild's table.

        The case number is taken from :meth:`get_next_case_number`.  The
        placeholder values ``"NULL"`` (for *duration*, *reason*,
        *resolved_reason*), ``"NULL"`` / ``"?"`` (for *resolved_by*) and ``0``
        (for *role_id*) are normalised to ``None`` before insertion.

        Args:
            database: Connection to reuse.  Its ``execute`` / ``commit`` /
                ``close`` are awaited, so despite the annotation it has to be
                an awaitable (aiosqlite-style) connection.  When omitted, a
                connection is opened and closed by this call.
            timestamp: Creation time as a ``datetime`` or a Unix timestamp;
                defaults to the current time.
            expired: Explicit expiry flag; computed when falsy.
            return_obj: Return the stored case (re-read with
                :meth:`find_by_id`) when true, otherwise just its id.

        Returns:
            The new ``Moderation`` or, with ``return_obj=False``, its id.
        """
        from ..utilities.database import connect
        from ..utilities.json import dumps

        if not timestamp:
            timestamp = datetime.fromtimestamp(time())
        elif not isinstance(timestamp, datetime):
            timestamp = datetime.fromtimestamp(timestamp)

        if duration == "NULL":
            duration = None
        end_timestamp = (timestamp + duration) if duration is not None else None

        if not expired:
            # NOTE(review): end_timestamp is timestamp + duration, so this is
            # only true for a negative duration.  Left unchanged because a
            # comparison against the current time would alter which rows are
            # stored as already expired.
            expired = bool(timestamp > end_timestamp) if end_timestamp else False

        if reason == "NULL":
            reason = None
        if resolved_by in ["NULL", "?"]:
            resolved_by = None
        if resolved_reason == "NULL":
            resolved_reason = None
        if role_id == 0:
            role_id = None

        close_db = not database
        if close_db:
            database = await connect()

        try:
            moderation_id = await cls.get_next_case_number(bot=bot, guild_id=guild_id)

            # Key order must match the column list of the INSERT below, since
            # the values are bound positionally.
            case = {
                "moderation_id": moderation_id,
                "timestamp": timestamp.timestamp(),
                "moderation_type": moderation_type,
                "target_type": target_type,
                "target_id": target_id,
                "moderator_id": moderator_id,
                "role_id": role_id,
                "duration": timedelta_to_string(duration) if duration else None,
                "end_timestamp": end_timestamp.timestamp() if end_timestamp else None,
                "reason": reason,
                "resolved": resolved,
                "resolved_by": resolved_by,
                "resolve_reason": resolved_reason,
                "expired": expired,
                "changes": dumps(changes),
                "metadata": dumps(metadata)
            }

            sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
            await database.execute(sql, tuple(case.values()))
            await database.commit()
        finally:
            # Close only a connection opened here, even when the insert failed.
            if close_db:
                await database.close()

        logger.verbose(
            "Row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
            guild_id,
            case["moderation_id"],
            case["timestamp"],
            case["moderation_type"],
            case["target_type"],
            case["target_id"],
            case["moderator_id"],
            case["role_id"],
            case["duration"],
            case["end_timestamp"],
            case["reason"],
            case["resolved"],
            case["resolved_by"],
            case["resolve_reason"],
            case["expired"],
            case["changes"],
            case["metadata"],
        )

        if return_obj:
            return await cls.find_by_id(bot=bot, moderation_id=moderation_id, guild_id=guild_id)
        return moderation_id