SeaCogs/aurora/models/moderation.py

397 lines
15 KiB
Python
Raw Normal View History

2024-05-06 15:11:11 -04:00
import json
import sqlite3
from datetime import datetime, timedelta
from time import time
from typing import Dict, Iterable, List, Optional, Tuple, Union
2024-05-04 13:41:11 -04:00
import discord
2024-06-05 00:14:43 -04:00
from aiosqlite import Cursor
2024-05-06 17:23:59 -04:00
from discord import NotFound
from redbot.core.bot import Red
2024-05-04 13:42:58 -04:00
2024-05-06 21:39:43 -04:00
from ..utilities.logger import logger
from ..utilities.utils import timedelta_to_string
2024-05-09 21:27:26 -04:00
from .base import AuroraGuildModel
from .change import Change
from .partials import PartialChannel, PartialRole, PartialUser
2024-05-04 13:41:11 -04:00
class Moderation(AuroraGuildModel):
2024-05-04 13:41:11 -04:00
moderation_id: int
timestamp: datetime
2024-05-04 13:41:11 -04:00
moderation_type: str
target_type: str
target_id: int
moderator_id: int
role_id: Optional[int] = None
duration: Optional[timedelta] = None
end_timestamp: Optional[datetime] = None
reason: Optional[str] = None
2024-05-04 13:41:11 -04:00
resolved: bool
resolved_by: Optional[int] = None
resolve_reason: Optional[str] = None
2024-05-04 13:41:11 -04:00
expired: bool
changes: List["Change"]
2024-05-04 13:41:11 -04:00
metadata: Dict
@property
def id(self) -> int:
return self.moderation_id
@property
def type(self) -> str:
return self.moderation_type
@property
def unix_timestamp(self) -> int:
return int(self.timestamp.timestamp())
async def get_moderator(self) -> "PartialUser":
return await PartialUser.from_id(self.bot, self.moderator_id)
async def get_target(self) -> Union["PartialUser", "PartialChannel"]:
if self.target_type == "USER":
return await PartialUser.from_id(self.bot, self.target_id)
2024-05-06 16:47:21 -04:00
return await PartialChannel.from_id(self.bot, self.target_id)
async def get_resolved_by(self) -> Optional["PartialUser"]:
if self.resolved_by:
return await PartialUser.from_id(self.bot, self.resolved_by)
return None
async def get_role(self) -> Optional["PartialRole"]:
if self.role_id:
return await PartialRole.from_id(self.bot, self.guild_id, self.role_id)
return None
def __str__(self) -> str:
2024-05-04 13:41:11 -04:00
return f"{self.moderation_type} {self.target_type} {self.target_id} {self.reason}"
def __int__(self) -> int:
return self.moderation_id
async def resolve(self, resolved_by: int, reason: str) -> None:
if self.resolved:
raise ValueError("Case is already resolved!")
self.resolved = True
self.resolved_by = resolved_by
self.resolve_reason = reason
if self.type in ["UNMUTE", "UNBAN"]:
raise TypeError("Cannot resolve an unmute or unban case!")
if self.type == "MUTE":
try:
guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
member = await guild.fetch_member(self.target_id)
await member.timeout(
2024-05-06 15:32:01 -04:00
None, reason=f"Case {self.moderation_id} resolved by {resolved_by}{' for '+ reason if reason else ''}"
)
except NotFound:
pass
if self.type in ["BAN", "TEMPBAN"]:
try:
guild: discord.Guild = await self.bot.fetch_guild(self.guild_id)
2024-05-06 15:32:01 -04:00
await guild.unban(await self.get_target(), reason=f"Case {self.moderation_id} resolved by {resolved_by}{' for '+ reason if reason else ''}")
except NotFound:
pass
if not self.changes:
self.changes.append(Change.from_dict(self.bot, {
"type": "ORIGINAL",
"timestamp": self.timestamp,
"reason": self.reason,
"user_id": self.moderator_id,
"duration": self.duration,
"end_timestamp": self.end_timestamp,
}))
self.changes.append(Change.from_dict(self.bot, {
"type": "RESOLVE",
"timestamp": datetime.now(),
"reason": reason,
"user_id": resolved_by,
}))
2024-06-05 00:14:43 -04:00
await self.update()
2024-06-05 00:14:43 -04:00
async def update(self) -> None:
2024-05-06 21:46:01 -04:00
from ..utilities.database import connect
from ..utilities.json import dumps
query = f"UPDATE moderation_{self.guild_id} SET timestamp = ?, moderation_type = ?, target_type = ?, moderator_id = ?, role_id = ?, duration = ?, end_timestamp = ?, reason = ?, resolved = ?, resolved_by = ?, resolve_reason = ?, expired = ?, changes = ?, metadata = ? WHERE moderation_id = ?;"
database = await connect()
await database.execute(query, (
self.timestamp.timestamp(),
self.moderation_type,
self.target_type,
self.moderator_id,
self.role_id,
str(self.duration) if self.duration else None,
self.end_timestamp.timestamp() if self.end_timestamp else None,
self.reason,
self.resolved,
self.resolved_by,
self.resolve_reason,
self.expired,
dumps(self.changes),
dumps(self.metadata),
self.moderation_id,
))
await database.commit()
await database.close()
2024-05-06 15:11:11 -04:00
logger.debug("Row updated in moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
self.moderation_id,
self.guild_id,
2024-05-06 14:55:36 -04:00
self.timestamp.timestamp(),
self.moderation_type,
self.target_type,
self.moderator_id,
self.role_id,
2024-05-06 14:55:36 -04:00
str(self.duration) if self.duration else None,
self.end_timestamp.timestamp() if self.end_timestamp else None,
self.reason,
self.resolved,
self.resolved_by,
self.resolve_reason,
self.expired,
2024-05-06 14:55:36 -04:00
dumps(self.changes),
dumps(self.metadata),
)
2024-05-06 14:55:36 -04:00
@classmethod
def from_dict(cls, bot: Red, data: dict) -> "Moderation":
return cls(bot=bot, **data)
@classmethod
def from_result(cls, bot: Red, result: Iterable, guild_id: int) -> "Moderation":
2024-06-04 16:52:19 -04:00
if result[7] is not None and result[7] != "NULL":
hours, minutes, seconds = map(int, result[7].split(':'))
duration = timedelta(hours=hours, minutes=minutes, seconds=seconds)
else:
duration = None
if result[14] is not None:
changes = json.loads(result[14])
change_obj_list = []
2024-05-24 03:53:38 -04:00
if changes:
for change in changes:
change_obj_list.append(Change.from_dict(bot=bot, data=change))
if result[15] is not None:
metadata = json.loads(result[15])
else:
metadata = {}
case = {
"moderation_id": int(result[0]),
"guild_id": int(guild_id),
"timestamp": datetime.fromtimestamp(result[1]),
"moderation_type": str(result[2]),
"target_type": str(result[3]),
"target_id": int(result[4]),
"moderator_id": int(result[5]),
"role_id": int(result[6]) if result[6] is not None else None,
"duration": duration,
"end_timestamp": datetime.fromtimestamp(result[8]) if result[8] is not None else None,
"reason": result[9],
"resolved": bool(result[10]),
"resolved_by": result[11],
"resolve_reason": result[12],
"expired": bool(result[13]),
"changes": change_obj_list,
"metadata": metadata if metadata else {},
}
return cls.from_dict(bot=bot, data=case)
@classmethod
async def execute(cls, bot: Red, guild_id: int, query: str, parameters: tuple | None = None, cursor: Cursor | None = None, return_obj: bool = True) -> Tuple["Moderation"]:
2024-05-06 21:46:01 -04:00
from ..utilities.database import connect
2024-06-04 15:43:15 -04:00
logger.trace("Executing query: %s", query)
logger.trace("With parameters: %s", parameters)
if not parameters:
parameters = ()
if not cursor:
no_cursor = True
2024-06-05 00:14:43 -04:00
database = await connect()
cursor = await database.cursor()
else:
no_cursor = False
2024-06-05 00:14:43 -04:00
await cursor.execute(query, parameters)
results = await cursor.fetchall()
if no_cursor:
2024-06-05 00:14:43 -04:00
await cursor.close()
await database.close()
if results and return_obj:
cases = []
for result in results:
case = cls.from_result(bot=bot, result=result, guild_id=guild_id)
if case.moderation_id != 0:
cases.append(case)
return tuple(cases)
return ()
@classmethod
2024-06-05 00:14:43 -04:00
async def get_latest(cls, bot: Red, guild_id: int, limit: int | None = None, offset: int = 0, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
params = []
query = f"SELECT * FROM moderation_{guild_id} ORDER BY moderation_id DESC"
if types:
query += f" WHERE moderation_type IN ({', '.join(['?' for _ in types])})"
params.extend(types)
if limit:
query += " LIMIT ? OFFSET ?"
params.extend((limit, offset))
query += ";"
2024-06-05 00:14:43 -04:00
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=tuple(params) if params else (), cursor=cursor)
@classmethod
2024-06-05 00:14:43 -04:00
async def get_next_case_number(cls, bot: Red, guild_id: int, cursor: Cursor | None = None) -> int:
result = await cls.get_latest(bot=bot, guild_id=guild_id, cursor=cursor, limit=1)
return (result[0].moderation_id + 1) if result else 1
@classmethod
2024-06-05 00:14:43 -04:00
async def find_by_id(cls, bot: Red, moderation_id: int, guild_id: int, cursor: Cursor | None = None) -> "Moderation":
query = f"SELECT * FROM moderation_{guild_id} WHERE moderation_id = ?;"
2024-06-05 00:14:43 -04:00
case = await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderation_id,), cursor=cursor)
if case:
return case[0]
raise ValueError(f"Case {moderation_id} not found in moderation_{guild_id}!")
@classmethod
2024-06-05 00:14:43 -04:00
async def find_by_target(cls, bot: Red, guild_id: int, target: int, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id} WHERE target_id = ?"
if types:
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
query += " ORDER BY moderation_id DESC;"
2024-06-05 00:14:43 -04:00
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(target, *types) if types else (target,), cursor=cursor)
@classmethod
2024-06-05 00:14:43 -04:00
async def find_by_moderator(cls, bot: Red, guild_id: int, moderator: int, types: Iterable | None = None, cursor: Cursor | None = None) -> Tuple["Moderation"]:
query = f"SELECT * FROM moderation_{guild_id} WHERE moderator_id = ?"
if types:
query += f" AND moderation_type IN ({', '.join(['?' for _ in types])})"
query += " ORDER BY moderation_id DESC;"
2024-06-05 00:14:43 -04:00
return await cls.execute(bot=bot, guild_id=guild_id, query=query, parameters=(moderator, *types) if types else (moderator,), cursor=cursor)
@classmethod
2024-06-05 00:14:43 -04:00
async def log(
cls,
bot: Red,
guild_id: int,
moderator_id: int,
moderation_type: str,
target_type: str,
target_id: int,
2024-05-24 03:46:20 -04:00
role_id: int | None = None,
2024-05-06 21:04:08 -04:00
duration: timedelta | None = None,
reason: str | None = None,
database: sqlite3.Connection | None = None,
timestamp: datetime | None = None,
resolved: bool = False,
2024-05-06 21:04:08 -04:00
resolved_by: int | None = None,
resolved_reason: str | None = None,
expired: bool | None = None,
changes: list | None = None,
metadata: dict | None = None,
return_obj: bool = True,
2024-06-04 23:55:55 -04:00
) -> Union["Moderation", int]:
2024-05-06 21:46:01 -04:00
from ..utilities.database import connect
from ..utilities.json import dumps
if not timestamp:
timestamp = datetime.fromtimestamp(time())
elif not isinstance(timestamp, datetime):
timestamp = datetime.fromtimestamp(timestamp)
2024-05-09 21:27:26 -04:00
if duration == "NULL":
duration = None
if duration is not None:
end_timestamp = timestamp + duration
else:
duration = None
end_timestamp = None
if not expired:
if end_timestamp:
expired = bool(timestamp > end_timestamp)
else:
expired = False
if reason == "NULL":
reason = None
2024-06-04 23:55:55 -04:00
if resolved_by in ["NULL", "?"]:
resolved_by = None
if resolved_reason == "NULL":
resolved_reason = None
if role_id == 0:
role_id = None
if not database:
2024-06-05 00:14:43 -04:00
database = await connect()
close_db = True
else:
close_db = False
2024-06-05 00:41:41 -04:00
moderation_id = await cls.get_next_case_number(bot=bot, guild_id=guild_id)
case = {
"moderation_id": moderation_id,
2024-05-06 14:55:36 -04:00
"timestamp": timestamp.timestamp(),
"moderation_type": moderation_type,
"target_type": target_type,
"target_id": target_id,
"moderator_id": moderator_id,
"role_id": role_id,
"duration": timedelta_to_string(duration) if duration else None,
2024-05-06 14:55:36 -04:00
"end_timestamp": end_timestamp.timestamp() if end_timestamp else None,
"reason": reason,
"resolved": resolved,
"resolved_by": resolved_by,
"resolve_reason": resolved_reason,
"expired": expired,
2024-05-06 14:55:36 -04:00
"changes": dumps(changes),
"metadata": dumps(metadata)
}
sql = f"INSERT INTO `moderation_{guild_id}` (moderation_id, timestamp, moderation_type, target_type, target_id, moderator_id, role_id, duration, end_timestamp, reason, resolved, resolved_by, resolve_reason, expired, changes, metadata) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
2024-06-05 00:14:43 -04:00
await database.execute(sql, tuple(case.values()))
2024-06-05 00:14:43 -04:00
await database.commit()
if close_db:
2024-06-05 00:14:43 -04:00
await database.close()
logger.verbose(
"Row inserted into moderation_%s!\n%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s",
guild_id,
2024-05-06 14:55:36 -04:00
case["moderation_id"],
case["timestamp"],
case["moderation_type"],
case["target_type"],
case["target_id"],
case["moderator_id"],
case["role_id"],
case["duration"],
case["end_timestamp"],
case["reason"],
case["resolved"],
case["resolved_by"],
case["resolve_reason"],
case["expired"],
case["changes"],
case["metadata"],
)
if return_obj:
2024-06-05 00:14:43 -04:00
return await cls.find_by_id(bot=bot, moderation_id=moderation_id, guild_id=guild_id)
2024-06-04 23:55:55 -04:00
return moderation_id